# San Francisco Fire Calls

This notebook is the end-to-end example from Chapter 3, showing how to use DataFrame and Spark SQL for common data analytics patterns and operations on a [San Francisco Fire Department Calls](https://data.sfgov.org/Public-Safety/Fire-Department-Calls-for-Service/nuek-vuh3) dataset.

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

spark = SparkSession.builder.appName('San Francisco Fire Calls').getOrCreate()

In [8]:
%ls -lrt "/home/karthik/SparkCourse/pyspark notebooks/data"

total 56216
-rw-rw-rw- 1 karthik karthik  1284872 May 12 16:04 mnm_dataset.csv
-rw-rw-rw- 1 karthik karthik 44530123 May 12 16:39 sf-fire-calls.csv
-rw-rw-rw- 1 karthik karthik 11745628 May 12 16:39 sf-fire-incidents.csv


In [10]:
filePath =  "/home/karthik/SparkCourse/pyspark notebooks/data/sf-fire-calls.csv"

In [11]:
fire_schema = StructType([StructField('CallNumber', IntegerType(), True),
                     StructField('UnitID', StringType(), True),
                     StructField('IncidentNumber', IntegerType(), True),
                     StructField('CallType', StringType(), True),                  
                     StructField('CallDate', StringType(), True),      
                     StructField('WatchDate', StringType(), True),
                     StructField('CallFinalDisposition', StringType(), True),
                     StructField('AvailableDtTm', StringType(), True),
                     StructField('Address', StringType(), True),       
                     StructField('City', StringType(), True),       
                     StructField('Zipcode', IntegerType(), True),       
                     StructField('Battalion', StringType(), True),                 
                     StructField('StationArea', StringType(), True),       
                     StructField('Box', StringType(), True),       
                     StructField('OriginalPriority', StringType(), True),       
                     StructField('Priority', StringType(), True),       
                     StructField('FinalPriority', IntegerType(), True),       
                     StructField('ALSUnit', BooleanType(), True),       
                     StructField('CallTypeGroup', StringType(), True),
                     StructField('NumAlarms', IntegerType(), True),
                     StructField('UnitType', StringType(), True),
                     StructField('UnitSequenceInCallDispatch', IntegerType(), True),
                     StructField('FirePreventionDistrict', StringType(), True),
                     StructField('SupervisorDistrict', StringType(), True),
                     StructField('Neighborhood', StringType(), True),
                     StructField('Location', StringType(), True),
                     StructField('RowID', StringType(), True),
                     StructField('Delay', FloatType(), True)])

In [12]:
data = spark.read.csv(filePath,header=True)

In [13]:
data.show(5,False)

+----------+------+--------------+----------------+----------+----------+--------------------+----------------------+---------------------------+----+-------+---------+-----------+----+----------------+--------+-------------+-------+-------------+---------+--------+--------------------------+----------------------+------------------+---------------------+-------------------------------------+-------------+---------+
|CallNumber|UnitID|IncidentNumber|CallType        |CallDate  |WatchDate |CallFinalDisposition|AvailableDtTm         |Address                    |City|Zipcode|Battalion|StationArea|Box |OriginalPriority|Priority|FinalPriority|ALSUnit|CallTypeGroup|NumAlarms|UnitType|UnitSequenceInCallDispatch|FirePreventionDistrict|SupervisorDistrict|Neighborhood         |Location                             |RowID        |Delay    |
+----------+------+--------------+----------------+----------+----------+--------------------+----------------------+---------------------------+----+-------+--

In [14]:
data.count()

175296

In [15]:
data.printSchema()

root
 |-- CallNumber: string (nullable = true)
 |-- UnitID: string (nullable = true)
 |-- IncidentNumber: string (nullable = true)
 |-- CallType: string (nullable = true)
 |-- CallDate: string (nullable = true)
 |-- WatchDate: string (nullable = true)
 |-- CallFinalDisposition: string (nullable = true)
 |-- AvailableDtTm: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Zipcode: string (nullable = true)
 |-- Battalion: string (nullable = true)
 |-- StationArea: string (nullable = true)
 |-- Box: string (nullable = true)
 |-- OriginalPriority: string (nullable = true)
 |-- Priority: string (nullable = true)
 |-- FinalPriority: string (nullable = true)
 |-- ALSUnit: string (nullable = true)
 |-- CallTypeGroup: string (nullable = true)
 |-- NumAlarms: string (nullable = true)
 |-- UnitType: string (nullable = true)
 |-- UnitSequenceInCallDispatch: string (nullable = true)
 |-- FirePreventionDistrict: string (nullable = true)
 |-- Sup

Filter out "Medical Incident" call types

Note that `where()` is an alias for `filter()` on the DataFrame. Both accept either a Column expression or a SQL expression string; check the relevant documentation for details.

In [22]:
few_fire_df = data.select("IncidentNumber","AvailableDtTm","CallType").filter(col("CallType") != "Medical Incident")

few_fire_df.count()

61502

In [23]:
few_fire_df.show(10,False)

+--------------+----------------------+--------------+
|IncidentNumber|AvailableDtTm         |CallType      |
+--------------+----------------------+--------------+
|2003235       |01/11/2002 01:51:44 AM|Structure Fire|
|2003250       |01/11/2002 04:16:46 AM|Vehicle Fire  |
|2003259       |01/11/2002 06:01:58 AM|Alarms        |
|2003279       |01/11/2002 08:03:26 AM|Structure Fire|
|2003301       |01/11/2002 09:46:44 AM|Alarms        |
|2003304       |01/11/2002 09:58:53 AM|Alarms        |
|2003382       |01/11/2002 02:59:04 PM|Structure Fire|
|2003408       |01/11/2002 04:09:08 PM|Structure Fire|
|2003408       |01/11/2002 04:09:08 PM|Structure Fire|
|2003408       |01/11/2002 04:09:08 PM|Structure Fire|
+--------------+----------------------+--------------+
only showing top 10 rows



Q-1) How many distinct types of calls were made to the Fire Department?

To be safe, let's exclude null values in that column from the count.

In [25]:
data.select("CallType").where(col("CallType").isNotNull()).distinct().count()

30

In [27]:
data.select("CallType").where(col("CallType").isNotNull()).distinct().show(30,False)

+--------------------------------------------+
|CallType                                    |
+--------------------------------------------+
|Elevator / Escalator Rescue                 |
|Marine Fire                                 |
|Aircraft Emergency                          |
|Confined Space / Structure Collapse         |
|Administrative                              |
|Alarms                                      |
|Odor (Strange / Unknown)                    |
|Citizen Assist / Service Call               |
|HazMat                                      |
|Watercraft in Distress                      |
|Explosion                                   |
|Oil Spill                                   |
|Vehicle Fire                                |
|Suspicious Package                          |
|Extrication / Entrapped (Machinery, Vehicle)|
|Other                                       |
|Outside Fire                                |
|Traffic Collision                           |
|Assist Polic

Q-3) Which calls had a response delay of more than 5 minutes?

1. Rename the column `Delay` to `ResponseDelayedinMins`; this returns a new DataFrame
2. Find all calls where the response to the fire site was delayed by more than 5 minutes

In [29]:
new_fire_df = data.withColumnRenamed("Delay","ResponseDelayedinMins")

new_fire_df.select("ResponseDelayedinMins").filter(col("ResponseDelayedinMins")>5).show(10,False)

+---------------------+
|ResponseDelayedinMins|
+---------------------+
|6.25                 |
|7.25                 |
|11.916667            |
|8.633333             |
|95.28333             |
|7.6                  |
|6.133333             |
|6.9166665            |
|6.35                 |
|7.983333             |
+---------------------+
only showing top 10 rows



Let's do some ETL:

1. Transform the string dates to Spark Timestamp data type so we can make some time-based queries later
2. Return the transformed DataFrame
3. Cache the new DataFrame

In [30]:
new_fire_df.columns

['CallNumber',
 'UnitID',
 'IncidentNumber',
 'CallType',
 'CallDate',
 'WatchDate',
 'CallFinalDisposition',
 'AvailableDtTm',
 'Address',
 'City',
 'Zipcode',
 'Battalion',
 'StationArea',
 'Box',
 'OriginalPriority',
 'Priority',
 'FinalPriority',
 'ALSUnit',
 'CallTypeGroup',
 'NumAlarms',
 'UnitType',
 'UnitSequenceInCallDispatch',
 'FirePreventionDistrict',
 'SupervisorDistrict',
 'Neighborhood',
 'Location',
 'RowID',
 'ResponseDelayedinMins']

In [45]:
fire_df_ts = (new_fire_df
 .withColumn("IncidentDate",to_date(col("CallDate"),"MM/dd/yyyy")).drop("CallDate")
 .withColumn("OnWatchDate",to_date(col("WatchDate"),"MM/dd/yyyy")).drop("WatchDate")
 .withColumn("AvailableDtTs",to_timestamp(col("AvailableDtTm"),"MM/dd/yyyy hh:mm:ss a")).drop("AvailableDtTm")
)

fire_df_ts.printSchema()

root
 |-- CallNumber: string (nullable = true)
 |-- UnitID: string (nullable = true)
 |-- IncidentNumber: string (nullable = true)
 |-- CallType: string (nullable = true)
 |-- CallFinalDisposition: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Zipcode: string (nullable = true)
 |-- Battalion: string (nullable = true)
 |-- StationArea: string (nullable = true)
 |-- Box: string (nullable = true)
 |-- OriginalPriority: string (nullable = true)
 |-- Priority: string (nullable = true)
 |-- FinalPriority: string (nullable = true)
 |-- ALSUnit: string (nullable = true)
 |-- CallTypeGroup: string (nullable = true)
 |-- NumAlarms: string (nullable = true)
 |-- UnitType: string (nullable = true)
 |-- UnitSequenceInCallDispatch: string (nullable = true)
 |-- FirePreventionDistrict: string (nullable = true)
 |-- SupervisorDistrict: string (nullable = true)
 |-- Neighborhood: string (nullable = true)
 |-- Location: string (nullable = true)


In [46]:
fire_df_ts.select("IncidentNumber","IncidentDate","OnWatchDate","AvailableDtTs").show(10,False)

+--------------+------------+-----------+-------------------+
|IncidentNumber|IncidentDate|OnWatchDate|AvailableDtTs      |
+--------------+------------+-----------+-------------------+
|2003235       |2002-01-11  |2002-01-10 |2002-01-11 01:51:44|
|2003241       |2002-01-11  |2002-01-10 |2002-01-11 03:01:18|
|2003242       |2002-01-11  |2002-01-10 |2002-01-11 02:39:50|
|2003250       |2002-01-11  |2002-01-10 |2002-01-11 04:16:46|
|2003259       |2002-01-11  |2002-01-10 |2002-01-11 06:01:58|
|2003279       |2002-01-11  |2002-01-11 |2002-01-11 08:03:26|
|2003301       |2002-01-11  |2002-01-11 |2002-01-11 09:46:44|
|2003304       |2002-01-11  |2002-01-11 |2002-01-11 09:58:53|
|2003343       |2002-01-11  |2002-01-11 |2002-01-11 12:06:57|
|2003348       |2002-01-11  |2002-01-11 |2002-01-11 13:08:40|
+--------------+------------+-----------+-------------------+
only showing top 10 rows



In [47]:
fire_df_ts.cache()

DataFrame[CallNumber: string, UnitID: string, IncidentNumber: string, CallType: string, CallFinalDisposition: string, Address: string, City: string, Zipcode: string, Battalion: string, StationArea: string, Box: string, OriginalPriority: string, Priority: string, FinalPriority: string, ALSUnit: string, CallTypeGroup: string, NumAlarms: string, UnitType: string, UnitSequenceInCallDispatch: string, FirePreventionDistrict: string, SupervisorDistrict: string, Neighborhood: string, Location: string, RowID: string, ResponseDelayedinMins: string, IncidentDate: date, OnWatchDate: date, AvailableDtTs: timestamp]

In [48]:
fire_df_ts.columns

['CallNumber',
 'UnitID',
 'IncidentNumber',
 'CallType',
 'CallFinalDisposition',
 'Address',
 'City',
 'Zipcode',
 'Battalion',
 'StationArea',
 'Box',
 'OriginalPriority',
 'Priority',
 'FinalPriority',
 'ALSUnit',
 'CallTypeGroup',
 'NumAlarms',
 'UnitType',
 'UnitSequenceInCallDispatch',
 'FirePreventionDistrict',
 'SupervisorDistrict',
 'Neighborhood',
 'Location',
 'RowID',
 'ResponseDelayedinMins',
 'IncidentDate',
 'OnWatchDate',
 'AvailableDtTs']

**Q-4) What were the most common call types?**

List them in descending order

In [54]:
fire_df_ts.select("CallType").groupBy("CallType").count().orderBy("count",ascending=False).show(10,False)

+-------------------------------+------+
|CallType                       |count |
+-------------------------------+------+
|Medical Incident               |113794|
|Structure Fire                 |23319 |
|Alarms                         |19406 |
|Traffic Collision              |7013  |
|Citizen Assist / Service Call  |2524  |
|Other                          |2166  |
|Outside Fire                   |2094  |
|Vehicle Fire                   |854   |
|Gas Leak (Natural and LP Gases)|764   |
|Water Rescue                   |755   |
+-------------------------------+------+
only showing top 10 rows



**Q-4a) What zip codes accounted for most common calls?**

Let's investigate which zip codes in San Francisco accounted for the most fire calls and what type they were.

1. Filter out rows with a null CallType
2. Group them by CallType and Zip code
3. Count them and display them in descending order

It seems like the most common calls were all related to Medical Incident, and the top two zip codes are 94102 and 94103.

In [55]:
(fire_df_ts
 .select("CallType","ZipCode")
 .where(col("CallType").isNotNull())
 .groupBy("CallType","ZipCode")
 .count()
 .orderBy("count",ascending=False)
 .show(10,False)
)

+----------------+-------+-----+
|CallType        |ZipCode|count|
+----------------+-------+-----+
|Medical Incident|94102  |16130|
|Medical Incident|94103  |14775|
|Medical Incident|94110  |9995 |
|Medical Incident|94109  |9479 |
|Medical Incident|94124  |5885 |
|Medical Incident|94112  |5630 |
|Medical Incident|94115  |4785 |
|Medical Incident|94122  |4323 |
|Medical Incident|94107  |4284 |
|Medical Incident|94133  |3977 |
+----------------+-------+-----+
only showing top 10 rows



**Q-4b) What San Francisco neighborhoods are in the zip codes 94102 and 94103?**

Let's find out the neighborhoods associated with these two zip codes. In all likelihood, these are some of the contested neighborhoods with high reported crime.

In [61]:
fire_df_ts.select("Neighborhood","ZipCode").where(expr("ZipCode == 94102 OR ZipCode == 94103")).distinct().show(10,False)

+------------------------------+-------+
|Neighborhood                  |ZipCode|
+------------------------------+-------+
|Mission Bay                   |94103  |
|Financial District/South Beach|94103  |
|Castro/Upper Market           |94103  |
|Western Addition              |94102  |
|Nob Hill                      |94102  |
|South of Market               |94103  |
|Potrero Hill                  |94103  |
|Hayes Valley                  |94103  |
|South of Market               |94102  |
|Tenderloin                    |94102  |
+------------------------------+-------+
only showing top 10 rows



**Q-5) What were the total number of alarms and the average, min, and max of the response times for calls?**

Let's use the built-in Spark SQL functions to compute the sum, avg, min, and max of a few columns:

* Total number of alarms
* The average, min, and max delay in response time before the Fire Dept arrived at the scene of the call

In [64]:
fire_df_ts.select(sum("NumAlarms"),mean("ResponseDelayedinMins"),min("ResponseDelayedinMins"),max("ResponseDelayedinMins")).show()

+--------------+--------------------------+--------------------------+--------------------------+
|sum(NumAlarms)|avg(ResponseDelayedinMins)|min(ResponseDelayedinMins)|max(ResponseDelayedinMins)|
+--------------+--------------------------+--------------------------+--------------------------+
|      176170.0|        3.8923641541750413|               0.016666668|                      99.9|
+--------------+--------------------------+--------------------------+--------------------------+



**Q-6a) How many distinct years of data are in the CSV file?**

We can use the Spark SQL `year()` function on the `IncidentDate` date column.

In all, we have fire calls from the years 2000-2018.

In [66]:
fire_df_ts.select(year("IncidentDate")).distinct().orderBy(year("IncidentDate")).show(100,False)

+------------------+
|year(IncidentDate)|
+------------------+
|2000              |
|2001              |
|2002              |
|2003              |
|2004              |
|2005              |
|2006              |
|2007              |
|2008              |
|2009              |
|2010              |
|2011              |
|2012              |
|2013              |
|2014              |
|2015              |
|2016              |
|2017              |
|2018              |
+------------------+



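**Q-6b) What week of the year in 2018 had the most fire calls?**

**Note**: Week 22 had the most calls. Week 1, the New Year's week, is also in the top ten, which makes sense given the fireworks. `weekofyear()` follows ISO 8601 week numbering, so July 4, 2018 falls in week 27, not week 25, and week 27 does not appear in the top ten.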

In [68]:
fire_df_ts.filter(year("IncidentDate")==2018).groupBy(weekofyear("IncidentDate")).count().orderBy("count",ascending=False).show(10,False)

+------------------------+-----+
|weekofyear(IncidentDate)|count|
+------------------------+-----+
|22                      |259  |
|40                      |255  |
|43                      |250  |
|25                      |249  |
|1                       |246  |
|44                      |244  |
|32                      |243  |
|13                      |243  |
|11                      |240  |
|5                       |236  |
+------------------------+-----+
only showing top 10 rows
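
To relate the week numbers above to calendar dates, Python's standard library can compute ISO 8601 week numbers, which is the numbering Spark documents for `weekofyear()`. The three dates below are chosen for illustration only.

```python
from datetime import date

# ISO 8601 week number for a few 2018 dates.
sample_dates = (date(2018, 1, 1), date(2018, 6, 20), date(2018, 7, 4))
iso_weeks = {d.isoformat(): d.isocalendar()[1] for d in sample_dates}
print(iso_weeks)
```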



**Q-7) What neighborhoods in San Francisco had the worst response time in 2018?**

Among the first ten 2018 rows shown, the Fire Dept arrived in Presidio Heights in less than 3 mins, while a call in Mission Bay took more than 6 mins.

In [69]:
fire_df_ts.select("Neighborhood","ResponseDelayedinMins").filter(year("IncidentDate") == 2018).show(10,False)

+------------------------------+---------------------+
|Neighborhood                  |ResponseDelayedinMins|
+------------------------------+---------------------+
|Presidio Heights              |2.8833334            |
|Mission Bay                   |6.3333335            |
|Chinatown                     |2.65                 |
|Financial District/South Beach|3.5333333            |
|Tenderloin                    |1.1                  |
|Bayview Hunters Point         |4.05                 |
|Inner Richmond                |2.5666666            |
|Inner Sunset                  |1.4                  |
|Sunset/Parkside               |2.6666667            |
|South of Market               |1.7666667            |
+------------------------------+---------------------+
only showing top 10 rows



**Q-8a) How can we use Parquet files or a SQL table to store data and read it back?**

In [72]:
fire_df_ts.write.format("parquet").mode("overwrite").save("/home/karthik/SparkCourse/pyspark notebooks/parquet")

In [77]:
%ls -lrt "/home/karthik/SparkCourse/pyspark notebooks/parquet/"

total 8104
-rw-r--r-- 1 karthik karthik 8296054 May 12 18:01 part-00000-d8db17a8-420d-4ebe-a2c1-e9d44aa5de50-c000.snappy.parquet
-rw-r--r-- 1 karthik karthik       0 May 12 18:01 _SUCCESS


In [78]:
fire_df_ts.write.format("parquet").mode("overwrite").saveAsTable("fire_calls")

In [80]:
spark.sql("CACHE TABLE fire_calls")

DataFrame[]

In [82]:
spark.sql("SELECT * FROM fire_calls LIMIT 10").show(truncate=False)

+----------+------+--------------+----------------+--------------------+---------------------------+----+-------+---------+-----------+----+----------------+--------+-------------+-------+-------------+---------+--------+--------------------------+----------------------+------------------+------------------------------+-------------------------------------+-------------+---------------------+------------+-----------+-------------------+
|CallNumber|UnitID|IncidentNumber|CallType        |CallFinalDisposition|Address                    |City|Zipcode|Battalion|StationArea|Box |OriginalPriority|Priority|FinalPriority|ALSUnit|CallTypeGroup|NumAlarms|UnitType|UnitSequenceInCallDispatch|FirePreventionDistrict|SupervisorDistrict|Neighborhood                  |Location                             |RowID        |ResponseDelayedinMins|IncidentDate|OnWatchDate|AvailableDtTs      |
+----------+------+--------------+----------------+--------------------+---------------------------+----+-------+-----

**Q-8c) How can we read data from a Parquet file?**

Note that we don't have to specify the schema here, since it's stored as part of the Parquet metadata.

In [84]:
new_par_df = spark.read.format("parquet").load("/home/karthik/SparkCourse/pyspark notebooks/parquet")

new_par_df.show(10,False)

+----------+------+--------------+----------------+--------------------+---------------------------+----+-------+---------+-----------+----+----------------+--------+-------------+-------+-------------+---------+--------+--------------------------+----------------------+------------------+------------------------------+-------------------------------------+-------------+---------------------+------------+-----------+-------------------+
|CallNumber|UnitID|IncidentNumber|CallType        |CallFinalDisposition|Address                    |City|Zipcode|Battalion|StationArea|Box |OriginalPriority|Priority|FinalPriority|ALSUnit|CallTypeGroup|NumAlarms|UnitType|UnitSequenceInCallDispatch|FirePreventionDistrict|SupervisorDistrict|Neighborhood                  |Location                             |RowID        |ResponseDelayedinMins|IncidentDate|OnWatchDate|AvailableDtTs      |
+----------+------+--------------+----------------+--------------------+---------------------------+----+-------+-----