In [38]:
import os

import traceback

from datetime import datetime, time
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.types import (StructType, StructField, IntegerType, StringType, BooleanType)
from pyspark.sql import functions as F

Define file paths

In [39]:
current_dir = os.getcwd()
file_name = '../csv_files/chapter14/south_dublin_lib.csv'
file_path = os.path.join(current_dir, file_name)

Custom functions

In [40]:
def create_timestamp_df(spark: SparkSession) -> DataFrame:
      schema = StructType([
            StructField('id', IntegerType(), False),
            StructField('date_str', StringType(), False)
      ])

      rows = [
            (1, "2019-03-11 14:30:00"),
            (2, "2019-04-27 16:00:00"),
            (3, "2020-01-26 05:00:00")
      ]

      df = spark.createDataFrame(rows, schema)
      df = df.withColumn("date", F.to_timestamp(F.col("date_str"))).drop("date_str")
      return df

def parse_time_range(time_range_str):
      """
      Parses a time range string into meaningful components, handling both simple and complex cases.

      For example:
          Input: "09:45-20:00"
          Output: ("09:45-20:00", None, None)

          Input: "10:00-17:00 (16:00 July and August) - closed for lunch 12:30-13:00"
          Output: ("10:00-17:00", "(16:00 July and August)", "closed for lunch 12:30-13:00")
      """
      parts = time_range_str.split(" - ")

      # Ensure unpacking is controlled based on how many parts we actually have
      main_hours = parts[0].strip() if len(parts) > 0 else None  # Always take the first part
      additional_info = parts[1].strip() if len(parts) > 1 else None  # Optional second part
      lunch_break = parts[2].strip() if len(parts) > 2 else None  # Optional third part

      return main_hours, additional_info, lunch_break

def udf_is_open(expected_timestamp,
                opening_hours_monday,
                opening_hours_tuesday,
                opening_hours_wednesday,
                opening_hours_thursday,
                opening_hours_friday,
                opening_hours_saturday,
                opening_hours_sunday):
      try:
            weekday_switch = {
                  "Monday": opening_hours_monday,
                  "Tuesday": opening_hours_tuesday,
                  "Wednesday": opening_hours_wednesday,
                  "Thursday": opening_hours_thursday,
                  "Friday": opening_hours_friday,
                  "Saturday": opening_hours_saturday,
                  "Sunday": opening_hours_sunday
            }
            weekday = expected_timestamp.strftime("%A")
            corresponding_opening_hours = weekday_switch.get(weekday)
            if corresponding_opening_hours == "Closed":
                  return False

            main_hours, additional_info, lunch_break = parse_time_range(corresponding_opening_hours)

            # For now, if we can not get main_hours, this means we will have no information of opening and close time then we assume that is "Closed"
            if main_hours is None:
                  return False

            # Extract start time and end time
            start_time_str, end_time_str = main_hours.split('-')
            start_hour, start_minute = map(int, start_time_str.split(':'))
            end_hour, end_minute = map(int, end_time_str.split(':'))

            # Create time objects for comparison
            start_time = time(start_hour, start_minute)
            end_time = time(end_hour, end_minute)
            expected_timestamp_time = expected_timestamp.time()

            if start_time <= expected_timestamp_time <= end_time:
                  return True
            return False
      except ValueError as e:
            return False  # Or handle appropriately in your logic

Ingesting and Do Data Analysis

In [41]:
spark = SparkSession.builder.appName("Spark with UDF Analysis").getOrCreate()
drop_cols = [ 'Address1', 'Address2', 'Town', 'Postcode', 'County', 'Phone', 'Email', 'Website', 'Image', 'WGS84_Latitude', 'WGS84_Longitude']
df = (spark.read.format("csv")
      .option("header", True)
      .option("inferSchema", True)
      .option("encoding", "cp1252")
      .load(file_path)).drop(*drop_cols)
df.printSchema()

timestamp_df = create_timestamp_df(spark)
timestamp_df.printSchema()

# Join 2 dfs for having data with dates
final_df = df.crossJoin(timestamp_df).drop("id")
final_df.printSchema()

25/01/02 20:58:48 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
25/01/02 20:58:48 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
25/01/02 20:58:48 WARN Utils: Service 'SparkUI' could not bind on port 4042. Attempting port 4043.
25/01/02 20:58:48 WARN Utils: Service 'SparkUI' could not bind on port 4043. Attempting port 4044.
25/01/02 20:58:48 WARN Utils: Service 'SparkUI' could not bind on port 4044. Attempting port 4045.
25/01/02 20:58:48 WARN Utils: Service 'SparkUI' could not bind on port 4045. Attempting port 4046.
25/01/02 20:58:48 WARN Utils: Service 'SparkUI' could not bind on port 4046. Attempting port 4047.
25/01/02 20:58:48 WARN Utils: Service 'SparkUI' could not bind on port 4047. Attempting port 4048.
25/01/02 20:58:48 WARN Utils: Service 'SparkUI' could not bind on port 4048. Attempting port 4049.


root
 |-- Council_ID: string (nullable = true)
 |-- Administrative_Authority: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Opening_Hours_Monday: string (nullable = true)
 |-- Opening_Hours_Tuesday: string (nullable = true)
 |-- Opening_Hours_Wednesday: string (nullable = true)
 |-- Opening_Hours_Thursday: string (nullable = true)
 |-- Opening_Hours_Friday: string (nullable = true)
 |-- Opening_Hours_Saturday: string (nullable = true)

root
 |-- id: integer (nullable = false)
 |-- date: timestamp (nullable = true)

root
 |-- Council_ID: string (nullable = true)
 |-- Administrative_Authority: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Opening_Hours_Monday: string (nullable = true)
 |-- Opening_Hours_Tuesday: string (nullable = true)
 |-- Opening_Hours_Wednesday: string (nullable = true)
 |-- Opening_Hours_Thursday: string (nullable = true)
 |-- Opening_Hours_Friday: string (nullable = true)
 |-- Opening_Hours_Saturday: string (nullable = true)


In [42]:
final_df.show(5)

+----------+------------------------+-------------------+--------------------+---------------------+-----------------------+----------------------+--------------------+----------------------+-------------------+
|Council_ID|Administrative_Authority|               Name|Opening_Hours_Monday|Opening_Hours_Tuesday|Opening_Hours_Wednesday|Opening_Hours_Thursday|Opening_Hours_Friday|Opening_Hours_Saturday|               date|
+----------+------------------------+-------------------+--------------------+---------------------+-----------------------+----------------------+--------------------+----------------------+-------------------+
|       SD1|    South Dublin Coun...|     County Library|         09:45-20:00|          09:45-20:00|            09:45-20:00|           09:45-20:00|         09:45-16:30|           09:45-16:30|2019-03-11 14:30:00|
|       SD2|    South Dublin Coun...|  Ballyroan Library|         09:45-20:00|          09:45-20:00|            09:45-20:00|           09:45-20:00|     

Register UDF and work with UDF via dataframe API

In [43]:
is_open = F.udf(udf_is_open, BooleanType())
final_df = final_df.withColumn("is_opening", is_open(F.col("date"),
                                                  F.col("Opening_Hours_Monday"),
                                                  F.col("Opening_Hours_Tuesday"),
                                                  F.col("Opening_Hours_Wednesday"),
                                                  F.col("Opening_Hours_Thursday"),
                                                  F.col("Opening_Hours_Friday"),
                                                  F.col("Opening_Hours_Saturday"),
                                                  F.lit("Closed")))
final_df.show()

+----------+------------------------+--------------------+--------------------+---------------------+-----------------------+----------------------+--------------------+----------------------+-------------------+----------+
|Council_ID|Administrative_Authority|                Name|Opening_Hours_Monday|Opening_Hours_Tuesday|Opening_Hours_Wednesday|Opening_Hours_Thursday|Opening_Hours_Friday|Opening_Hours_Saturday|               date|is_opening|
+----------+------------------------+--------------------+--------------------+---------------------+-----------------------+----------------------+--------------------+----------------------+-------------------+----------+
|       SD1|    South Dublin Coun...|      County Library|         09:45-20:00|          09:45-20:00|            09:45-20:00|           09:45-20:00|         09:45-16:30|           09:45-16:30|2019-03-11 14:30:00|      true|
|       SD2|    South Dublin Coun...|   Ballyroan Library|         09:45-20:00|          09:45-20:00|   

Register UDF via SQL

In [44]:
spark.udf.register("is_open", is_open)
final_df.createOrReplaceTempView("final_df")
sql_query = """
      SELECT *, is_open(date, Opening_Hours_Monday, Opening_Hours_Tuesday, Opening_Hours_Wednesday, Opening_Hours_Thursday, Opening_Hours_Friday, Opening_Hours_Saturday, 'Closed') AS is_opening FROM final_df
"""
result = spark.sql(sql_query)
result.show()

+----------+------------------------+--------------------+--------------------+---------------------+-----------------------+----------------------+--------------------+----------------------+-------------------+----------+----------+
|Council_ID|Administrative_Authority|                Name|Opening_Hours_Monday|Opening_Hours_Tuesday|Opening_Hours_Wednesday|Opening_Hours_Thursday|Opening_Hours_Friday|Opening_Hours_Saturday|               date|is_opening|is_opening|
+----------+------------------------+--------------------+--------------------+---------------------+-----------------------+----------------------+--------------------+----------------------+-------------------+----------+----------+
|       SD1|    South Dublin Coun...|      County Library|         09:45-20:00|          09:45-20:00|            09:45-20:00|           09:45-20:00|         09:45-16:30|           09:45-16:30|2019-03-11 14:30:00|      true|      true|
|       SD2|    South Dublin Coun...|   Ballyroan Library|  

Stop Session

In [45]:
spark.stop()