In [None]:
import os
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, StringType, DoubleType, TimestampType
from pyspark.sql.functions import col, to_timestamp, month, year
from delta.pip_utils import configure_spark_with_delta_pip

In [None]:
APP_NAME = "spark-sql"

SHARED_DIRECTORY = "data"
# Name of the Delta table directory created under each layer directory
# (bronze/silver/gold). Despite the "FILE" in the name, it is a directory.
OUTPUT_FILE_NAME = "311_service_requests/"

# Input CSV selection: the small sample is used during development; set
# USE_FULL_DATASET = True for the end-to-end run on the full export.
USE_FULL_DATASET = False
FULL_INPUT_FILE_NAME = "erm2-nwe9_version_176301.csv"
DEV_INPUT_FILE_NAME = "small_erm2-nwe9_version_176301.csv"
INPUT_FILE_NAME = FULL_INPUT_FILE_NAME if USE_FULL_DATASET else DEV_INPUT_FILE_NAME

# Bronze file paths: raw CSV in, Delta table out
BRONZE_INPUT_FILE_PATH = os.path.join(SHARED_DIRECTORY, "raw", INPUT_FILE_NAME)
BRONZE_OUTPUT_FILE_PATH = os.path.join(SHARED_DIRECTORY, "bronze", OUTPUT_FILE_NAME)

# Silver file paths: reads the bronze table, writes the cleaned table
SILVER_INPUT_FILE_PATH = BRONZE_OUTPUT_FILE_PATH
SILVER_OUTPUT_FILE_PATH = os.path.join(SHARED_DIRECTORY, "silver", OUTPUT_FILE_NAME)

# Gold file paths: reads the silver table. The output path lives under "gold"
# so a future gold write can never clobber the silver table.
# TODO(review): no cell writes GOLD_OUTPUT_FILE_PATH yet.
GOLD_INPUT_FILE_PATH = SILVER_OUTPUT_FILE_PATH
GOLD_OUTPUT_FILE_PATH = os.path.join(SHARED_DIRECTORY, "gold", OUTPUT_FILE_NAME)

In [None]:
  # Initialize SparkSession
# Build a SparkSession with the Delta Lake SQL extension and Delta catalog enabled.
# The master URL defaults to a local standalone master but can be overridden via
# the SPARK_MASTER_URL environment variable (e.g. "local[*]" when no cluster runs).
SPARK_MASTER_URL = os.environ.get("SPARK_MASTER_URL", "spark://localhost:7077")

spark_builder = (
    SparkSession.builder
    .master(SPARK_MASTER_URL)
    .appName(APP_NAME)
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)

# Per the delta-spark docs, configure_spark_with_delta_pip adds the Delta Lake
# JAR packages matching the installed pip package to the builder before the
# session is created.
spark = configure_spark_with_delta_pip(spark_builder).getOrCreate()
# Allow long schema/plan strings to be printed in full; the bronze schema is wide.
spark.conf.set("spark.sql.debug.maxToStringFields", 1000)

In [None]:
# Column spec for the raw 311 CSV, listed in file column order; every field is nullable.
# NOTE(review): with header=True and an explicit schema, Spark by default ignores the
# header names and assigns fields by position, so this order must match the file.
input_csv_file_schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in [
        ("unique_key", StringType()),
        ("created_date", TimestampType()),
        ("closed_date", TimestampType()),
        ("agency", StringType()),
        ("agency_name", StringType()),
        ("complaint_type", StringType()),
        ("descriptor", StringType()),
        ("location_type", StringType()),
        ("incident_zip", StringType()),
        ("incident_address", StringType()),
        ("street_name", StringType()),
        ("cross_street_1", StringType()),
        ("cross_street_2", StringType()),
        ("intersection_street_1", StringType()),
        ("intersection_street_2", StringType()),
        ("address_type", StringType()),
        ("city", StringType()),
        ("landmark", StringType()),
        ("facility_type", StringType()),
        ("status", StringType()),
        ("due_date", TimestampType()),
        ("resolution_description", StringType()),
        ("resolution_action_updated_date", TimestampType()),
        ("community_board", StringType()),
        ("bbl", StringType()),
        ("borough", StringType()),
        ("x_coordinate_state_plane", StringType()),
        ("y_coordinate_state_plane", StringType()),
        ("open_data_channel_type", StringType()),
        ("park_facility_name", StringType()),
        ("park_borough", StringType()),
        ("vehicle_type", StringType()),
        ("taxi_company_borough", StringType()),
        ("taxi_pick_up_location", StringType()),
        ("bridge_highway_name", StringType()),
        ("bridge_highway_direction", StringType()),
        ("road_ramp", StringType()),
        ("bridge_highway_segment", StringType()),
        ("latitude", DoubleType()),
        ("longitude", DoubleType()),
        ("location_city", StringType()),
        ("location", StringType()),
        ("location_address", StringType()),
        ("location_zip", StringType()),
        ("location_state", StringType()),
    ]
])

# Load the raw CSV using the explicit schema above (no type inference).
bronze_df = spark.read.csv(
    BRONZE_INPUT_FILE_PATH,
    schema=input_csv_file_schema,
    header=True,
    inferSchema=False,
)

# Dev-sample recipe: the small CSV in data/raw was produced from 1000 rows
# (presumably of the full export) with:
# bronze_df.limit(1000).toPandas().to_csv(os.path.join(SHARED_DIRECTORY, "raw", "small_erm2-nwe9_version_176301.csv"), index=False)

# Preview a few rows, then persist as a Delta table, replacing any previous run.
bronze_df.show(10)
bronze_df.write.save(BRONZE_OUTPUT_FILE_PATH, format="delta", mode="overwrite")

In [None]:
# Silver layer: clean the bronze Delta table.
#  - drop rows with no complaint_type
#  - make sure created_date / closed_date are timestamps
#  - derive month/year from created_date (used as partition columns when saving)
#  - keep a single row per unique_key
# The bronze schema already types both date columns as TimestampType, so the cast
# is a no-op there. A plain cast is used instead of to_timestamp(col, 'yyyy-MM-dd'):
# both leave timestamps unchanged, but the date-only pattern cannot parse string
# values that include a time of day.
silver_df = (
    spark.read
    .format("delta")
    .load(SILVER_INPUT_FILE_PATH)
    .filter(col("complaint_type").isNotNull())
    .withColumn("created_date", col("created_date").cast("timestamp"))
    .withColumn("closed_date", col("closed_date").cast("timestamp"))
    .withColumn("month", month("created_date"))
    .withColumn("year", year("created_date"))
    .dropDuplicates(["unique_key"])
)
silver_df.show(10)

In [None]:
# Persist the silver table as Delta, replacing prior contents, partitioned into
# one directory per (year, month) value derived in the transform step.
silver_df.write.save(
    SILVER_OUTPUT_FILE_PATH,
    format="delta",
    mode="overwrite",
    partitionBy=["year", "month"],
)

In [None]:
# Gold layer (placeholder): load the Delta table at GOLD_INPUT_FILE_PATH and
# preview it. No aggregation is applied yet. TODO(review): add gold transforms.
gold_df = spark.read.load(GOLD_INPUT_FILE_PATH, format="delta")
gold_df.show(n=10)