# **Chapter 3. Apache Spark’s Structured APIs**.
## In this Notebook I solve the exercises shown in End-to-End DataFrame Example


In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.ml.stat import Correlation

In [2]:
FIRE_INCIDENTS_PATH = '../Data/Fire_Incidents.csv'

In [3]:
spark = (SparkSession
    .builder
    .appName("LearningDataFrames")
    .getOrCreate()
)

Create the schema for the fire incidents data.

In [4]:
fire_schema = StructType([
    StructField('CallNumber', IntegerType(), True),
    StructField('UnitID', StringType(), True),
    StructField('IncidentNumber', IntegerType(), True),
    StructField('CallType', StringType(), True),
    StructField('CallDate', StringType(), True),
    StructField('WatchDate', StringType(), True),
    StructField('ReceivedDtTm', StringType(), True),
    StructField('EntryDtTm', StringType(), True),
    StructField('DistpatchDtTm', StringType(), True),
    StructField('ResponseDtTm', StringType(), True),
    StructField('OnSceneDtTm', StringType(), True),
    StructField('TransportDtTm', StringType(), True),
    StructField('HospitalDtTm', StringType(), True),
    StructField('CallFinalDisposition', StringType(), True),
    StructField('AvailableDtTm', StringType(), True),
    StructField('Address', StringType(), True),
    StructField('City', StringType(), True),
    StructField('Zipcode', IntegerType(), True),
    StructField('Battalion', StringType(), True),
    StructField('StationArea', StringType(), True),
    StructField('Box', StringType(), True),
    StructField('OriginalPriority', StringType(), True),
    StructField('Priority', StringType(), True),
    StructField('FinalPriority', IntegerType(), True),
    StructField('ALSUnit', BooleanType(), True),
    StructField('CallTypeGroup', StringType(), True),
    StructField('NumAlarms', IntegerType(), True),
    StructField('UnitType', StringType(), True),
    StructField('UnitSequenceInCallDispatch', IntegerType(), True),
    StructField('FirePreventionDistrict', StringType(), True),
    StructField('SupervisorDistrict', StringType(), True),
    StructField('Neighborhood', StringType(), True),
    StructField('Location', StringType(), True),
    StructField('RowID', StringType(), True),
    StructField('Delay', FloatType(), True)
])

Firstly, load the data and have a look

In [5]:
fire_df = spark.read.csv(FIRE_INCIDENTS_PATH, header=True, schema=fire_schema)

In [6]:
fire_df.limit(5).toPandas().head()  # I convert the data to Pandas DFs so that I can get more readable output.

Unnamed: 0,CallNumber,UnitID,IncidentNumber,CallType,CallDate,WatchDate,ReceivedDtTm,EntryDtTm,DistpatchDtTm,ResponseDtTm,...,CallTypeGroup,NumAlarms,UnitType,UnitSequenceInCallDispatch,FirePreventionDistrict,SupervisorDistrict,Neighborhood,Location,RowID,Delay
0,210690030,T03,21030278,Alarms,03/10/2021,03/09/2021,03/10/2021 12:16:03 AM,03/10/2021 12:18:36 AM,03/10/2021 12:19:01 AM,03/10/2021 12:20:15 AM,...,Alarm,1,TRUCK,3,2,6,Tenderloin,210690030-T03,POINT (-122.41697932641094 37.7770834641944),36.0
1,203421272,BLS841,20139667,Medical Incident,12/07/2020,12/07/2020,12/07/2020 11:26:17 AM,12/07/2020 11:26:17 AM,12/07/2020 11:26:17 AM,12/07/2020 11:26:17 AM,...,,1,CHIEF,1,4,2,Marina,203421272-BLS841,,
2,213602525,T10,21160001,Structure Fire,12/26/2021,12/26/2021,12/26/2021 11:30:51 PM,12/26/2021 11:32:21 PM,12/26/2021 11:34:41 PM,12/26/2021 11:38:10 PM,...,Alarm,1,TRUCK,4,5,5,Lone Mountain/USF,213602525-T10,POINT (-122.45094859355395 37.77283315421379),18.0
3,210683285,58,21030264,Structure Fire,03/09/2021,03/09/2021,03/09/2021 11:06:09 PM,03/09/2021 11:08:23 PM,03/09/2021 11:08:35 PM,03/09/2021 11:08:43 PM,...,Alarm,1,MEDIC,10,2,6,Tenderloin,210683285-58,POINT (-122.41324374873464 37.78295349313507),36.0
4,203341976,VAN1,20136406,Medical Incident,11/29/2020,11/29/2020,11/29/2020 04:59:48 PM,11/29/2020 04:59:48 PM,11/29/2020 04:59:48 PM,11/29/2020 04:59:48 PM,...,,1,SUPPORT,1,2,6,Tenderloin,203341976-VAN1,,


## End-to-End DF processing

Firstly, change the dates to DateType.


In [7]:
fire_ts_2018_df = (fire_df
    .withColumnRenamed("Delay", "ResponseDelayedinMins")
    .withColumn("IncidentDate", F.to_timestamp(F.col("CallDate"), "MM/dd/yyyy"))
    .drop("CallDate")
    .withColumn("OnWatchDate", F.to_timestamp(F.col("WatchDate"), "MM/dd/yyyy"))
    .drop("WatchDate")
    .withColumn("AvailableDtTS", F.to_timestamp(F.col("AvailableDtTm"), "MM/dd/yyyy hh:mm:ss a"))
    .drop("AvailableDtTm")
    .where(F.year("IncidentDate") == 2018)
)

In [8]:
fire_ts_2018_df.limit(5).toPandas().head()  # Let's see what's inside.

Unnamed: 0,CallNumber,UnitID,IncidentNumber,CallType,ReceivedDtTm,EntryDtTm,DistpatchDtTm,ResponseDtTm,OnSceneDtTm,TransportDtTm,...,UnitSequenceInCallDispatch,FirePreventionDistrict,SupervisorDistrict,Neighborhood,Location,RowID,ResponseDelayedinMins,IncidentDate,OnWatchDate,AvailableDtTS
0,182231460,B07,18094142,Alarms,08/11/2018 11:38:04 AM,08/11/2018 11:39:47 AM,08/11/2018 11:40:09 AM,08/11/2018 11:40:52 AM,,,...,3,7,1,Outer Richmond,182231460-B07,POINT (-122.489016145519 37.780861021477),29.0,2018-08-11,2018-08-11,2018-08-11 11:57:43
1,180150449,E36,18006326,Structure Fire,01/15/2018 05:38:00 AM,01/15/2018 05:38:00 AM,01/15/2018 05:38:11 AM,01/15/2018 05:40:32 AM,01/15/2018 05:43:01 AM,,...,1,2,6,Tenderloin,180150449-E36,POINT (-122.41371122073 37.779666042606),36.0,2018-01-15,2018-01-14,2018-01-15 05:43:05
2,180102336,D2,18004376,Structure Fire,01/10/2018 02:08:26 PM,01/10/2018 02:10:41 PM,01/10/2018 02:11:11 PM,,,,...,10,6,9,Mission,180102336-D2,POINT (-122.417362867532 37.753108051231),20.0,2018-01-10,2018-01-10,2018-01-10 14:14:39
3,180412083,E32,18017674,Other,02/10/2018 02:31:02 PM,02/10/2018 02:31:02 PM,02/10/2018 02:31:02 PM,02/10/2018 02:31:02 PM,02/10/2018 02:31:02 PM,,...,1,6,8,Bernal Heights,180412083-E32,POINT (-122.425793479077 37.735985677747),2.0,2018-02-10,2018-02-10,2018-02-10 14:38:01
4,181121425,E37,18047664,Alarms,04/22/2018 11:38:42 AM,04/22/2018 11:40:29 AM,04/22/2018 11:41:00 AM,04/22/2018 11:42:37 AM,04/22/2018 11:44:38 AM,,...,1,10,10,Potrero Hill,181121425-E37,POINT (-122.40654101432 37.756508001322),20.0,2018-04-22,2018-04-22,2018-04-22 11:51:52


1. What were all the different types of fire calls in 2018?

In [9]:
(fire_ts_2018_df
    .select("CallType")
    .distinct()
    .limit(5)
    .toPandas()
    .head()
)

Unnamed: 0,CallType
0,Alarms
1,HazMat
2,Vehicle Fire
3,Other
4,Outside Fire


2. What months within the year 2018 saw the highest number of fire calls?


In [10]:
(fire_ts_2018_df
    .groupBy(F.month("IncidentDate"))
    .count()
    .orderBy(F.desc("count"))
    .limit(5)
    .toPandas()
    .head()
)

Unnamed: 0,month(IncidentDate),count
0,1,27027
1,3,26606
2,10,26536
3,11,26307
4,5,26297


3. Which neighborhood in San Francisco generated the most fire calls in 2018?

In [11]:
(fire_ts_2018_df
    .groupBy("Neighborhood")
    .count()
    .orderBy(F.desc("count"))
    .limit(5)
    .toPandas()
    .head()
)

Unnamed: 0,Neighborhood,count
0,Tenderloin,43894
1,South of Market,32620
2,Mission,27108
3,Financial District/South Beach,23870
4,Bayview Hunters Point,15667


4. Which neighborhoods had the worst response times to fire calls in 2018?


In [13]:
(fire_ts_2018_df
    .select("ResponseDelayedinMins", "IncidentDate", "Neighborhood")
    .groupBy("Neighborhood")
    .max("ResponseDelayedinMins")
    .orderBy(F.desc("max(ResponseDelayedinMins)"))
    .limit(5)
    .toPandas()
    .head()
)

Unnamed: 0,Neighborhood,max(ResponseDelayedinMins)
0,West of Twin Peaks,41.0
1,Visitacion Valley,40.0
2,Western Addition,39.0
3,Twin Peaks,38.0
4,Treasure Island,37.0


5. Which week in the year in 2018 had the most fire calls?

In [14]:
(fire_ts_2018_df
    .select("IncidentDate")
    .groupBy(F.weekofyear("IncidentDate"))
    .count()
    .orderBy(F.desc("count"))
    .limit(5)
    .toPandas()
    .head()
)

Unnamed: 0,weekofyear(IncidentDate),count
0,1,7545
1,25,6425
2,49,6354
3,22,6328
4,13,6321


6. Is there a correlation between neighborhood, zip code, and number of fire calls?

*I mean it makes no sense so there shouldn't be one*

In [15]:
(fire_df
 .groupBy("Zipcode")
 .count()
 .agg(F.corr("Zipcode", "count").alias('c')).collect()
)

[Row(c=-0.5240421601797752)]

7. How can we use Parquet files or SQL tables to store this data and read it back?

The benefit is that it writes down the schema -- we don't have to redefine it.

In [16]:
# fireDF.write.format("parquet").save(parquetPath)
# fire_df.write.format("parquet").save(parquet_path)
