### ![Spark Logo](http://curriculum-release.s3-website-us-west-2.amazonaws.com/wiki-book/general/logo_spark.png) + ![SF Open Data Logo](http://curriculum-release.s3-website-us-west-2.amazonaws.com/sf_open_data_meetup/logo_sfopendata.png)

## Exploring the City of San Francisco public data with Apache Spark 2.0

The SF OpenData project was launched in 2009 and contains hundreds of datasets from the city and county of San Francisco. Open government data has the potential to increase the quality of life for residents and to lead to more efficient government services, better public decisions, and even new local businesses and services.

It was the 4th of July a couple of days ago, so SF residents enjoyed a fireworks show:

![Fireworks](http://curriculum-release.s3-website-us-west-2.amazonaws.com/sf_open_data_meetup/fireworks.png)

How did the 4th of July holiday affect demand for Firefighters?

## Introduction to Spark

Our software tool to do the data analysis will be Apache Spark:

![About Spark](http://curriculum-release.s3-website-us-west-2.amazonaws.com/wiki-book/book_intro/spark_about.png)

*(Spark 2.0.0 is in release candidate status)*

Spark is a unified processing engine that can analyze big data using SQL, machine learning, graph processing, or real-time stream analysis:

![Spark Engines](http://curriculum-release.s3-website-us-west-2.amazonaws.com/wiki-book/book_intro/spark_4engines.png)

We will mostly focus on Spark SQL and DataFrames this evening.

Spark can read from many different databases and file systems and run in various environments:

![Spark Goal](http://curriculum-release.s3-website-us-west-2.amazonaws.com/wiki-book/book_intro/spark_goal.png)

Although Spark supports four languages (Scala, Java, Python, R), tonight we will use Python.
Broadly speaking, there are **2 APIs** for interacting with Spark:
- **DataFrames/SQL/Datasets:** general, higher-level API for users of Spark
- **RDD:** a lower-level API for Spark internals and advanced programming

A Spark cluster is made of one Driver and many Executor JVMs (Java virtual machines):

![Spark Physical Cluster, slots](http://curriculum-release.s3-website-us-west-2.amazonaws.com/wiki-book/book_intro/spark_cluster_slots.png)

The Driver sends Tasks to the empty slots on the Executors when work has to be done:

![Spark Physical Cluster, tasks](http://curriculum-release.s3-website-us-west-2.amazonaws.com/wiki-book/book_intro/spark_cluster_tasks.png)

In Databricks Community Edition, everyone gets a local mode cluster, where the Driver and Executor code run in the same JVM. Local mode clusters are typically used for prototyping and learning Spark:

![Notebook + Micro Cluster](http://curriculum-release.s3-website-us-west-2.amazonaws.com/wiki-book/book_intro/notebook_microcluster.png)

![Databricks](http://curriculum-release.s3-website-us-west-2.amazonaws.com/wiki-book/book_intro/databricks_about.png)

## Introduction to Fire Department Calls for Service

The latest July 6th, 2016 copy of the "Fire Department Calls for Service" data set has been uploaded to S3. You can see the data with the `%fs ls` command:

In [22]:
%fs ls /mnt/sf_open_data/fire_dept_calls_for_service/

path,name,size
dbfs:/mnt/sf_open_data/fire_dept_calls_for_service/Fire_Department_Calls_for_Service.csv,Fire_Department_Calls_for_Service.csv,1634673683


Note, you can also access the 1.6 GB of data directly from sfgov.org via this link: https://data.sfgov.org/Public-Safety/Fire-Department-Calls-for-Service/nuek-vuh3

The entry point into all functionality in Spark 2.0 is the new SparkSession class:

In [25]:
spark

Using the SparkSession, create a DataFrame from the CSV file by inferring the schema:

In [27]:
fireServiceCallsDF = spark.read.csv('/mnt/sf_open_data/fire_dept_calls_for_service/Fire_Department_Calls_for_Service.csv', header=True, inferSchema=True)

Notice that the above cell takes ~15 seconds to run because it infers the schema by sampling the file and reading through it.

Inferring the schema works for ad hoc analysis against smaller datasets. But when working on multi-TB+ data, it's better to provide an **explicit pre-defined schema manually**, so there's no inferring cost:

In [29]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, BooleanType

In [30]:
# Note that we are removing all space characters from the col names to prevent errors when writing to Parquet later

fireSchema = StructType([StructField('CallNumber', IntegerType(), True),
                     StructField('UnitID', StringType(), True),
                     StructField('IncidentNumber', IntegerType(), True),
                     StructField('CallType', StringType(), True),                  
                     StructField('CallDate', StringType(), True),       
                     StructField('WatchDate', StringType(), True),       
                     StructField('ReceivedDtTm', StringType(), True),       
                     StructField('EntryDtTm', StringType(), True),       
                     StructField('DispatchDtTm', StringType(), True),       
                     StructField('ResponseDtTm', StringType(), True),       
                     StructField('OnSceneDtTm', StringType(), True),       
                     StructField('TransportDtTm', StringType(), True),                  
                     StructField('HospitalDtTm', StringType(), True),       
                     StructField('CallFinalDisposition', StringType(), True),       
                     StructField('AvailableDtTm', StringType(), True),       
                     StructField('Address', StringType(), True),       
                     StructField('City', StringType(), True),       
                     StructField('ZipcodeofIncident', IntegerType(), True),       
                     StructField('Battalion', StringType(), True),                 
                     StructField('StationArea', StringType(), True),       
                     StructField('Box', StringType(), True),       
                     StructField('OriginalPriority', StringType(), True),       
                     StructField('Priority', StringType(), True),       
                     StructField('FinalPriority', IntegerType(), True),       
                     StructField('ALSUnit', BooleanType(), True),       
                     StructField('CallTypeGroup', StringType(), True),
                     StructField('NumberofAlarms', IntegerType(), True),
                     StructField('UnitType', StringType(), True),
                     StructField('Unitsequenceincalldispatch', IntegerType(), True),
                     StructField('FirePreventionDistrict', StringType(), True),
                     StructField('SupervisorDistrict', StringType(), True),
                     StructField('NeighborhoodDistrict', StringType(), True),
                     StructField('Location', StringType(), True),
                     StructField('RowID', StringType(), True)])

In [31]:
#Notice that no job is run this time
fireServiceCallsDF = spark.read.csv('/mnt/sf_open_data/fire_dept_calls_for_service/Fire_Department_Calls_for_Service.csv', header=True, schema=fireSchema)

Look at the first 5 records in the DataFrame:

In [33]:
display(fireServiceCallsDF.limit(5))

CallNumber,UnitID,IncidentNumber,CallType,CallDate,WatchDate,ReceivedDtTm,EntryDtTm,DispatchDtTm,ResponseDtTm,OnSceneDtTm,TransportDtTm,HospitalDtTm,CallFinalDisposition,AvailableDtTm,Address,City,ZipcodeofIncident,Battalion,StationArea,Box,OriginalPriority,Priority,FinalPriority,ALSUnit,CallTypeGroup,NumberofAlarms,UnitType,Unitsequenceincalldispatch,FirePreventionDistrict,SupervisorDistrict,NeighborhoodDistrict,Location,RowID
142480332,B02,14086309,Alarms,09/05/2014,09/04/2014,09/05/2014 03:15:13 AM,09/05/2014 03:17:26 AM,09/05/2014 03:18:18 AM,09/05/2014 03:20:30 AM,09/05/2014 03:24:11 AM,04/25/2016 01:15:16 PM,04/25/2016 01:15:16 PM,Fire,09/05/2014 03:33:20 AM,1600 Block of HAIGHT ST,San Francisco,94117,B05,12,4525,3,3,3,True,Alarm,1,CHIEF,3,5,5,Haight Ashbury,"(37.7695711762103, -122.449920089485)",142480332-B02
153022542,T02,15115908,Structure Fire,10/29/2015,10/29/2015,10/29/2015 03:39:06 PM,10/29/2015 03:39:25 PM,10/29/2015 03:39:49 PM,10/29/2015 03:40:55 PM,10/29/2015 03:43:21 PM,04/25/2016 01:07:30 PM,04/25/2016 01:07:30 PM,Fire,10/29/2015 03:51:21 PM,BATTERY ST/VALLEJO ST,San Francisco,94111,B01,13,1155,3,3,3,False,Alarm,1,TRUCK,4,1,3,Financial District/South Beach,"(37.7995314468258, -122.401240243673)",153022542-T02
143451112,AM04,14122741,Medical Incident,12/11/2014,12/11/2014,12/11/2014 09:02:07 AM,12/11/2014 09:03:01 AM,12/11/2014 09:03:11 AM,12/11/2014 09:06:19 AM,12/11/2014 09:20:16 AM,12/11/2014 09:20:26 AM,12/11/2014 09:43:41 AM,Code 2 Transport,12/11/2014 10:06:26 AM,300 Block of BUENA VISTA AVE,San Francisco,94117,B05,21,5136,3,3,3,False,Potentially Life-Threatening,1,PRIVATE,1,5,8,Castro/Upper Market,"(37.7668035178194, -122.440704687809)",143451112-AM04
141660300,E01,14057129,Medical Incident,06/15/2014,06/14/2014,06/15/2014 02:04:57 AM,06/15/2014 02:06:42 AM,06/15/2014 02:10:01 AM,06/15/2014 02:12:55 AM,06/15/2014 02:24:55 AM,04/25/2016 01:16:45 PM,04/25/2016 01:16:45 PM,Code 2 Transport,06/15/2014 02:51:39 AM,0 Block of HALLAM ST,San Francisco,94103,B03,1,2313,2,2,2,True,Non Life-threatening,1,ENGINE,2,2,6,South of Market,"(37.7756902570435, -122.408609057895)",141660300-E01
152633454,E36,15100829,Outside Fire,09/20/2015,09/20/2015,09/20/2015 08:15:00 PM,09/20/2015 08:15:53 PM,09/20/2015 08:16:17 PM,09/20/2015 08:18:07 PM,04/25/2016 01:08:14 PM,04/25/2016 01:08:14 PM,04/25/2016 01:08:14 PM,Fire,09/20/2015 08:22:11 PM,MARKET ST/VAN NESS AV,San Francisco,94103,B02,36,3211,3,3,3,True,Fire,1,ENGINE,1,2,6,Mission,"(37.7751470741622, -122.419255607214)",152633454-E36


Print just the column names in the DataFrame:

In [35]:
fireServiceCallsDF.columns

Count how many rows there are in total in the DataFrame (and see how long it takes to do a full scan from remote disk/S3):

In [37]:
fireServiceCallsDF.count()

There are over 4 million rows in the DataFrame and it takes ~14 seconds to do a full read of it.

Open the Apache Spark 2.0 early release documentation in new tabs, so you can easily reference the API guide:

1) Spark 2.0 preview docs: https://people.apache.org/~pwendell/spark-nightly/spark-master-docs/

2) DataFrame user documentation: https://people.apache.org/~pwendell/spark-nightly/spark-master-docs/latest/sql-programming-guide.html

3) PySpark API 2.0 docs: https://people.apache.org/~pwendell/spark-nightly/spark-master-docs/latest/api/python/index.html

### ![Spark Logo Tiny](http://curriculum-release.s3-website-us-west-2.amazonaws.com/wiki-book/general/logo_spark_tiny.png) **Analysis with PySpark DataFrames API**

#### ![Spark Operations](http://curriculum-release.s3-website-us-west-2.amazonaws.com/wiki-book/general/spark_ta.png)

DataFrames support two types of operations: *transformations* and *actions*.

Transformations, like `select()` or `filter()`, create a new DataFrame from an existing one.

Actions, like `show()` or `count()`, return a value with results to the user. Other actions, like `write.save()`, write the DataFrame to distributed storage (like S3 or HDFS).

#### ![Spark T/A](http://curriculum-release.s3-website-us-west-2.amazonaws.com/wiki-book/pagecounts/trans_and_actions.png)

Transformations contribute to a query plan, but nothing is executed until an action is called.
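To make the laziness concrete, here is a toy model in plain Python. It is only a sketch of the idea, not how Spark is implemented: the `LazyFrame` class and its `runs` counter are hypothetical names invented for this illustration, and the rows are a hand-typed subset of the sample records shown earlier.

```python
class LazyFrame:
    """Toy stand-in for a DataFrame: transformations are recorded, actions run them."""

    runs = 0  # how many times any recorded plan has actually been executed

    def __init__(self, rows, plan=()):
        self._rows = rows
        self._plan = plan  # tuple of pending transformation steps

    def select(self, *cols):
        # Transformation: returns a new LazyFrame with one more step in its plan
        def step(rows):
            return [{c: r[c] for c in cols} for r in rows]
        return LazyFrame(self._rows, self._plan + (step,))

    def filter(self, predicate):
        # Transformation: nothing is evaluated here either
        def step(rows):
            return [r for r in rows if predicate(r)]
        return LazyFrame(self._rows, self._plan + (step,))

    def count(self):
        # Action: only now is the recorded plan executed
        LazyFrame.runs += 1
        rows = self._rows
        for step in self._plan:
            rows = step(rows)
        return len(rows)


calls = [
    {'CallType': 'Alarms', 'UnitType': 'CHIEF'},
    {'CallType': 'Structure Fire', 'UnitType': 'TRUCK'},
    {'CallType': 'Medical Incident', 'UnitType': 'PRIVATE'},
    {'CallType': 'Medical Incident', 'UnitType': 'ENGINE'},
    {'CallType': 'Outside Fire', 'UnitType': 'ENGINE'},
]

medical = (LazyFrame(calls)
           .filter(lambda r: r['CallType'] == 'Medical Incident')
           .select('CallType'))
runs_before_action = LazyFrame.runs  # still 0: only a plan has been built
medical_count = medical.count()      # the action triggers execution
print(runs_before_action, medical_count, LazyFrame.runs)
```

In Spark the recorded plan is additionally optimized before it runs, which this toy version does not attempt.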

**Q-1) How many different types of calls were made to the Fire Department?**

In [46]:
# Use the .select() transformation to pull out just the 'CallType' column, then call the show action
fireServiceCallsDF.select('CallType').show(5)

In [47]:
# Add the .distinct() transformation to keep only distinct rows
# The False below expands the ASCII column width to fit the full text in the output

fireServiceCallsDF.select('CallType').distinct().show(35, False)

**Q-2) How many incidents of each call type were there?**

In [49]:
# Note that .count() is actually a transformation here: called on grouped data, it returns a new DataFrame rather than a number

display(fireServiceCallsDF.select('CallType').groupBy('CallType').count().orderBy("count", ascending=False))

CallType,count
Medical Incident,2590898
Structure Fire,560055
Alarms,432235
Traffic Collision,158525
Other,64156
Citizen Assist / Service Call,62031
Outside Fire,46273
Administrative,30129
,26961
Vehicle Fire,20001


Seems like the SF Fire department is called for medical incidents far more than any other type. Note that the above command took about 14 seconds to execute. In an upcoming section, we'll cache the data into memory for up to 100x speed increases.

### ![Spark Logo Tiny](http://curriculum-release.s3-website-us-west-2.amazonaws.com/wiki-book/general/logo_spark_tiny.png) **Doing Date/Time Analysis**

**Q-3) How many years of Fire Service Calls are in the data file?**

Notice that the date or time columns are currently being interpreted as strings, rather than date or time objects:

In [54]:
fireServiceCallsDF.printSchema()

Let's use the `unix_timestamp()` function to parse each string into seconds since the Unix epoch, then cast the result to a timestamp:

https://people.apache.org/~pwendell/spark-nightly/spark-master-docs/latest/api/python/pyspark.sql.html?highlight=spark#pyspark.sql.functions.from_unixtime

In [56]:
from pyspark.sql.functions import *

In [57]:
# Note that PySpark uses the Java Simple Date Format patterns

from_pattern1 = 'MM/dd/yyyy'
to_pattern1 = 'yyyy-MM-dd'

from_pattern2 = 'MM/dd/yyyy hh:mm:ss aa'
to_pattern2 = 'MM/dd/yyyy hh:mm:ss aa'


fireServiceCallsTsDF = fireServiceCallsDF \
  .withColumn('CallDateTS', unix_timestamp(fireServiceCallsDF['CallDate'], from_pattern1).cast("timestamp")) \
  .drop('CallDate') \
  .withColumn('WatchDateTS', unix_timestamp(fireServiceCallsDF['WatchDate'], from_pattern1).cast("timestamp")) \
  .drop('WatchDate') \
  .withColumn('ReceivedDtTmTS', unix_timestamp(fireServiceCallsDF['ReceivedDtTm'], from_pattern2).cast("timestamp")) \
  .drop('ReceivedDtTm') \
  .withColumn('EntryDtTmTS', unix_timestamp(fireServiceCallsDF['EntryDtTm'], from_pattern2).cast("timestamp")) \
  .drop('EntryDtTm') \
  .withColumn('DispatchDtTmTS', unix_timestamp(fireServiceCallsDF['DispatchDtTm'], from_pattern2).cast("timestamp")) \
  .drop('DispatchDtTm') \
  .withColumn('ResponseDtTmTS', unix_timestamp(fireServiceCallsDF['ResponseDtTm'], from_pattern2).cast("timestamp")) \
  .drop('ResponseDtTm') \
  .withColumn('OnSceneDtTmTS', unix_timestamp(fireServiceCallsDF['OnSceneDtTm'], from_pattern2).cast("timestamp")) \
  .drop('OnSceneDtTm') \
  .withColumn('TransportDtTmTS', unix_timestamp(fireServiceCallsDF['TransportDtTm'], from_pattern2).cast("timestamp")) \
  .drop('TransportDtTm') \
  .withColumn('HospitalDtTmTS', unix_timestamp(fireServiceCallsDF['HospitalDtTm'], from_pattern2).cast("timestamp")) \
  .drop('HospitalDtTm') \
  .withColumn('AvailableDtTmTS', unix_timestamp(fireServiceCallsDF['AvailableDtTm'], from_pattern2).cast("timestamp")) \
  .drop('AvailableDtTm')  

In [58]:
fireServiceCallsTsDF.printSchema()
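To see what the two source patterns mean, here is a rough plain-Python equivalent using `datetime.strptime`. This is only an analogy: Python uses `%`-style directives rather than Java date patterns, and the names `PY_DATE` and `PY_DATETIME` are made up for this sketch. The sample strings come from the first record displayed earlier.

```python
from datetime import datetime

# Approximate strptime equivalents of the Java patterns used above:
#   'MM/dd/yyyy'             -> '%m/%d/%Y'
#   'MM/dd/yyyy hh:mm:ss aa' -> '%m/%d/%Y %I:%M:%S %p'  (12-hour clock plus AM/PM)
PY_DATE = '%m/%d/%Y'
PY_DATETIME = '%m/%d/%Y %I:%M:%S %p'

call_date = datetime.strptime('09/05/2014', PY_DATE)
received = datetime.strptime('09/05/2014 03:15:13 AM', PY_DATETIME)
transport = datetime.strptime('04/25/2016 01:15:16 PM', PY_DATETIME)

# Once parsed, the values print in the 24-hour form seen in the timestamp columns
print(call_date.isoformat(sep=' '))
print(received.isoformat(sep=' '))
print(transport.isoformat(sep=' '))
```

The `01:15:16 PM` value becoming `13:15:16` is the same 12-hour to 24-hour change that is visible in the converted DataFrame.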

Notice that the formatting of the timestamps is now different:

In [60]:
display(fireServiceCallsTsDF.limit(5))

CallNumber,UnitID,IncidentNumber,CallType,CallFinalDisposition,Address,City,ZipcodeofIncident,Battalion,StationArea,Box,OriginalPriority,Priority,FinalPriority,ALSUnit,CallTypeGroup,NumberofAlarms,UnitType,Unitsequenceincalldispatch,FirePreventionDistrict,SupervisorDistrict,NeighborhoodDistrict,Location,RowID,CallDateTS,WatchDateTS,ReceivedDtTmTS,EntryDtTmTS,DispatchDtTmTS,ResponseDtTmTS,OnSceneDtTmTS,TransportDtTmTS,HospitalDtTmTS,AvailableDtTmTS
142480332,B02,14086309,Alarms,Fire,1600 Block of HAIGHT ST,San Francisco,94117,B05,12,4525,3,3,3,True,Alarm,1,CHIEF,3,5,5,Haight Ashbury,"(37.7695711762103, -122.449920089485)",142480332-B02,2014-09-05 00:00:00,2014-09-04 00:00:00,2014-09-05 03:15:13,2014-09-05 03:17:26,2014-09-05 03:18:18,2014-09-05 03:20:30,2014-09-05 03:24:11,2016-04-25 13:15:16,2016-04-25 13:15:16,2014-09-05 03:33:20
153022542,T02,15115908,Structure Fire,Fire,BATTERY ST/VALLEJO ST,San Francisco,94111,B01,13,1155,3,3,3,False,Alarm,1,TRUCK,4,1,3,Financial District/South Beach,"(37.7995314468258, -122.401240243673)",153022542-T02,2015-10-29 00:00:00,2015-10-29 00:00:00,2015-10-29 15:39:06,2015-10-29 15:39:25,2015-10-29 15:39:49,2015-10-29 15:40:55,2015-10-29 15:43:21,2016-04-25 13:07:30,2016-04-25 13:07:30,2015-10-29 15:51:21
143451112,AM04,14122741,Medical Incident,Code 2 Transport,300 Block of BUENA VISTA AVE,San Francisco,94117,B05,21,5136,3,3,3,False,Potentially Life-Threatening,1,PRIVATE,1,5,8,Castro/Upper Market,"(37.7668035178194, -122.440704687809)",143451112-AM04,2014-12-11 00:00:00,2014-12-11 00:00:00,2014-12-11 09:02:07,2014-12-11 09:03:01,2014-12-11 09:03:11,2014-12-11 09:06:19,2014-12-11 09:20:16,2014-12-11 09:20:26,2014-12-11 09:43:41,2014-12-11 10:06:26
141660300,E01,14057129,Medical Incident,Code 2 Transport,0 Block of HALLAM ST,San Francisco,94103,B03,1,2313,2,2,2,True,Non Life-threatening,1,ENGINE,2,2,6,South of Market,"(37.7756902570435, -122.408609057895)",141660300-E01,2014-06-15 00:00:00,2014-06-14 00:00:00,2014-06-15 02:04:57,2014-06-15 02:06:42,2014-06-15 02:10:01,2014-06-15 02:12:55,2014-06-15 02:24:55,2016-04-25 13:16:45,2016-04-25 13:16:45,2014-06-15 02:51:39
152633454,E36,15100829,Outside Fire,Fire,MARKET ST/VAN NESS AV,San Francisco,94103,B02,36,3211,3,3,3,True,Fire,1,ENGINE,1,2,6,Mission,"(37.7751470741622, -122.419255607214)",152633454-E36,2015-09-20 00:00:00,2015-09-20 00:00:00,2015-09-20 20:15:00,2015-09-20 20:15:53,2015-09-20 20:16:17,2015-09-20 20:18:07,2016-04-25 13:08:14,2016-04-25 13:08:14,2016-04-25 13:08:14,2015-09-20 20:22:11


Finally, calculate how many distinct years of data are in the CSV file:

In [62]:
fireServiceCallsTsDF.select(year('CallDateTS')).distinct().orderBy('year(CallDateTS)').show()

**Q-4) How many service calls were logged in the past 7 days?**

Note that today, July 6th, is the 188th day of the year (2016 is a leap year).

Filter the DF down to just 2016 and days of the year from 180 onward:

In [65]:
fireServiceCallsTsDF.filter(year('CallDateTS') == '2016').filter(dayofyear('CallDateTS') >= 180).select(dayofyear('CallDateTS')).distinct().orderBy('dayofyear(CallDateTS)').show()

In [66]:
fireServiceCallsTsDF.filter(year('CallDateTS') == '2016').filter(dayofyear('CallDateTS') >= 180).groupBy(dayofyear('CallDateTS')).count().orderBy('dayofyear(CallDateTS)').show()

Note above that July 4th, 2016 was the 186th day of the year.

Visualize the results in a bar graph:

In [69]:
display(fireServiceCallsTsDF.filter(year('CallDateTS') == '2016').filter(dayofyear('CallDateTS') >= 180).groupBy(dayofyear('CallDateTS')).count().orderBy('dayofyear(CallDateTS)'))

dayofyear(CallDateTS),count
180,753
181,731
182,797
183,847
184,729
185,797
186,958
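Day-of-year numbers are easy to get wrong by one in a leap year such as 2016, so it can help to map them back to calendar dates. The following is a small standard-library sketch; the helper name `day_of_year_to_date` is made up for this example.

```python
from datetime import date, timedelta


def day_of_year_to_date(year, day_of_year):
    """Convert a 1-based day-of-year number into a calendar date."""
    return date(year, 1, 1) + timedelta(days=day_of_year - 1)


# Day of year for July 4th, 2016 (leap year, so February has 29 days)
july_4_day = date(2016, 7, 4).timetuple().tm_yday

# Calendar dates for the day-of-year values in the result above
for day in range(180, 187):
    print(day, day_of_year_to_date(2016, day).isoformat())
```

Under this mapping, day 180 is June 28th and day 186 is July 4th, so the 958 calls on day 186 are the holiday itself.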


### ![Spark Logo Tiny](http://curriculum-release.s3-website-us-west-2.amazonaws.com/wiki-book/general/logo_spark_tiny.png) **Memory, Caching and write to Parquet**

The DataFrame is currently made up of 13 partitions:

In [72]:
fireServiceCallsTsDF.rdd.getNumPartitions()

![Partitions](http://curriculum-release.s3-website-us-west-2.amazonaws.com/sf_open_data_meetup/df_13_parts.png)

In [74]:
fireServiceCallsTsDF.repartition(6).createOrReplaceTempView("fireServiceVIEW")

In [75]:
spark.catalog.cacheTable("fireServiceVIEW")

In [76]:
# Call .count() to materialize the cache
spark.table("fireServiceVIEW").count()

In [77]:
fireServiceDF = spark.table("fireServiceVIEW")

In [78]:
# Note that the full scan + count in memory takes < 1 second!

fireServiceDF.count()

In [79]:
spark.catalog.isCached("fireServiceVIEW")

The 6 partitions are now cached in memory. Each partition should be roughly 15 to 200 MB in memory; larger than 100 MB is better.

![6 Partitions](http://curriculum-release.s3-website-us-west-2.amazonaws.com/sf_open_data_meetup/df_6_parts.png)

Use the Spark UI to see the 6 partitions in memory:

![Mem UI](http://curriculum-release.s3-website-us-west-2.amazonaws.com/sf_open_data_meetup/mem_ui.png)

Now that our data has the correct data types for each column and is correctly partitioned, let's write it out in Parquet format for future loading:

In [85]:
%fs ls /tmp/

path,name,size
dbfs:/tmp/fireServiceParquet/,fireServiceParquet/,0
dbfs:/tmp/hive/,hive/,0


In [86]:
fireServiceDF.write.format('parquet').save('/tmp/fireServiceParquet/')

Now the directory should contain 6 Snappy-compressed Parquet files (one for each partition):

In [88]:
%fs ls /tmp/fireServiceParquet/

path,name,size
dbfs:/tmp/fireServiceParquet/_SUCCESS,_SUCCESS,0
dbfs:/tmp/fireServiceParquet/_committed_1575499903064082612,_committed_1575499903064082612,618
dbfs:/tmp/fireServiceParquet/_started_1575499903064082612,_started_1575499903064082612,0
dbfs:/tmp/fireServiceParquet/part-00000-tid-1575499903064082612-99dcf718-5583-4ce4-a55a-b39daa31db76-1578-c000.snappy.parquet,part-00000-tid-1575499903064082612-99dcf718-5583-4ce4-a55a-b39daa31db76-1578-c000.snappy.parquet,76880038
dbfs:/tmp/fireServiceParquet/part-00001-tid-1575499903064082612-99dcf718-5583-4ce4-a55a-b39daa31db76-1579-c000.snappy.parquet,part-00001-tid-1575499903064082612-99dcf718-5583-4ce4-a55a-b39daa31db76-1579-c000.snappy.parquet,76413904
dbfs:/tmp/fireServiceParquet/part-00002-tid-1575499903064082612-99dcf718-5583-4ce4-a55a-b39daa31db76-1580-c000.snappy.parquet,part-00002-tid-1575499903064082612-99dcf718-5583-4ce4-a55a-b39daa31db76-1580-c000.snappy.parquet,76771725
dbfs:/tmp/fireServiceParquet/part-00003-tid-1575499903064082612-99dcf718-5583-4ce4-a55a-b39daa31db76-1581-c000.snappy.parquet,part-00003-tid-1575499903064082612-99dcf718-5583-4ce4-a55a-b39daa31db76-1581-c000.snappy.parquet,76871756
dbfs:/tmp/fireServiceParquet/part-00004-tid-1575499903064082612-99dcf718-5583-4ce4-a55a-b39daa31db76-1582-c000.snappy.parquet,part-00004-tid-1575499903064082612-99dcf718-5583-4ce4-a55a-b39daa31db76-1582-c000.snappy.parquet,76863346
dbfs:/tmp/fireServiceParquet/part-00005-tid-1575499903064082612-99dcf718-5583-4ce4-a55a-b39daa31db76-1583-c000.snappy.parquet,part-00005-tid-1575499903064082612-99dcf718-5583-4ce4-a55a-b39daa31db76-1583-c000.snappy.parquet,76371486
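As a rough size comparison, the byte counts from the two `%fs ls` listings can be added up in plain Python. This is only back-of-the-envelope arithmetic: the Parquet files hold the converted timestamp columns rather than the original strings, so the ratio is indicative, not a like-for-like measurement.

```python
# Sizes in bytes, copied from the %fs ls listings in this notebook
csv_bytes = 1634673683
parquet_part_bytes = [
    76880038, 76413904, 76771725,
    76871756, 76863346, 76371486,
]

parquet_total = sum(parquet_part_bytes)
ratio = csv_bytes / parquet_total

print(f'CSV:     {csv_bytes / 1e6:.0f} MB')
print(f'Parquet: {parquet_total / 1e6:.0f} MB across {len(parquet_part_bytes)} files')
print(f'The CSV is about {ratio:.1f}x larger than the Parquet output')
```

The six part files are also close to equal in size, which is what you would hope for after `repartition(6)`.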


Here's how you can easily read the parquet file from S3 in the future:

In [90]:
tempDF = spark.read.parquet('/tmp/fireServiceParquet/')

In [91]:
display(tempDF.limit(2))

CallNumber,UnitID,IncidentNumber,CallType,CallFinalDisposition,Address,City,ZipcodeofIncident,Battalion,StationArea,Box,OriginalPriority,Priority,FinalPriority,ALSUnit,CallTypeGroup,NumberofAlarms,UnitType,Unitsequenceincalldispatch,FirePreventionDistrict,SupervisorDistrict,NeighborhoodDistrict,Location,RowID,CallDateTS,WatchDateTS,ReceivedDtTmTS,EntryDtTmTS,DispatchDtTmTS,ResponseDtTmTS,OnSceneDtTmTS,TransportDtTmTS,HospitalDtTmTS,AvailableDtTmTS
113510161,RC2,11116461,Medical Incident,Other,100 Block of GOLDEN GATE AVE,SF,94102,B99,51,4616,3,E,3,True,,1,RESCUE CAPTAIN,3,2,6,Tenderloin,"(37.7820224371777, -122.413054148253)",113510161-RC2,2011-12-17 00:00:00,2011-12-17 00:00:00,2011-12-17 10:20:51,2011-12-17 10:22:25,2011-12-17 10:23:01,2011-12-17 10:25:02,2011-12-17 10:29:22,2016-04-25 14:01:06,2016-04-25 14:01:06,2011-12-17 10:53:30
112170309,81,11071813,Medical Incident,Other,1400 Block of ALABAMA ST,SF,94110,B06,9,5615,1,1,2,True,,1,MEDIC,3,6,9,Mission,"(37.7489077084041, -122.41053305539)",112170309-81,2011-08-05 00:00:00,2011-08-05 00:00:00,2011-08-05 18:47:50,2011-08-05 18:50:11,2011-08-05 18:50:57,2011-08-05 18:51:06,2016-04-25 14:03:19,2016-04-25 14:03:19,2016-04-25 14:03:19,2016-04-25 14:03:19


Did you know that the new vectorized Parquet decoder in Spark 2.0 has improved Parquet scan throughput by 3x?

### ![Spark Logo Tiny](http://curriculum-release.s3-website-us-west-2.amazonaws.com/wiki-book/general/logo_spark_tiny.png) **SQL Queries**

In [94]:
%sql SELECT count(*) FROM fireServiceVIEW;

count(1)
4091248


Expand the 'Spark Jobs' in the cell above to see that 7 tasks were launched to run the count: 6 tasks to read the data from each of the 6 partitions and do a pre-aggregation on each partition, then a final task to aggregate the count from all 6 tasks:

![Job details](http://curriculum-release.s3-website-us-west-2.amazonaws.com/sf_open_data_meetup/6_tasks.png)

You can use the Spark Stages UI to see the 6 tasks launched in the middle stage:

![Event Timeline](http://curriculum-release.s3-website-us-west-2.amazonaws.com/sf_open_data_meetup/event_timeline.png)
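The 6 + 1 task pattern can be mimicked in plain Python as a partial count per partition followed by one final sum. This is a conceptual sketch only: the round-robin split below is an assumption made for illustration, and the real partition sizes in the cluster will differ.

```python
TOTAL_ROWS = 4091248   # row count reported by the SQL query above
NUM_PARTITIONS = 6

# Assumed round-robin split of row ids across partitions (illustration only)
partitions = [range(i, TOTAL_ROWS, NUM_PARTITIONS) for i in range(NUM_PARTITIONS)]

# Stage 1: one task per partition computes a partial count
partial_counts = [len(p) for p in partitions]

# Stage 2: a single task adds up the partial counts
final_count = sum(partial_counts)

tasks_launched = len(partial_counts) + 1
print(partial_counts, final_count, tasks_launched)
```

Only the six small partial counts have to move between tasks, not the rows themselves, which is why the final aggregation is cheap.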

**Q-5) Which neighborhood in SF generated the most calls last year?**

In [100]:
%sql SELECT `NeighborhoodDistrict`, count(`NeighborhoodDistrict`) AS Neighborhood_Count FROM fireServiceVIEW WHERE year(`CallDateTS`) == '2015' GROUP BY `NeighborhoodDistrict` ORDER BY Neighborhood_Count DESC LIMIT 15;

NeighborhoodDistrict,Neighborhood_Count
Tenderloin,39607
South of Market,30550
Mission,26638
Financial District/South Beach,21301
Bayview Hunters Point,14678
Sunset/Parkside,11265
Western Addition,10514
Nob Hill,10344
Outer Richmond,7802
Hayes Valley,7594


Expand the Spark Job details in the cell above and notice that the last stage uses 200 partitions! This default is non-optimal, given that we only have ~1.6 GB of data and 3 slots.

Change the `spark.sql.shuffle.partitions` option to 6:

In [102]:
spark.conf.get("spark.sql.shuffle.partitions")

In [103]:
spark.conf.set("spark.sql.shuffle.partitions", 6)

In [104]:
spark.conf.get("spark.sql.shuffle.partitions")

Re-run the same SQL query and notice the speed increase:

In [106]:
%sql SELECT `NeighborhoodDistrict`, count(`NeighborhoodDistrict`) AS Neighborhood_Count FROM fireServiceVIEW WHERE year(`CallDateTS`) == '2015' GROUP BY `NeighborhoodDistrict` ORDER BY Neighborhood_Count DESC LIMIT 15;

NeighborhoodDistrict,Neighborhood_Count
Tenderloin,39607
South of Market,30550
Mission,26638
Financial District/South Beach,21301
Bayview Hunters Point,14678
Sunset/Parkside,11265
Western Addition,10514
Nob Hill,10344
Outer Richmond,7802
Hayes Valley,7594


SQL also has some handy commands like `DESC` (describe) to see the schema + data types for the table:

In [108]:
%sql DESC fireServiceVIEW;

col_name,data_type,comment
CallNumber,int,
UnitID,string,
IncidentNumber,int,
CallType,string,
CallFinalDisposition,string,
Address,string,
City,string,
ZipcodeofIncident,int,
Battalion,string,
StationArea,string,


### ![Spark Logo Tiny](http://curriculum-release.s3-website-us-west-2.amazonaws.com/wiki-book/general/logo_spark_tiny.png) **Spark Internals and SQL UI**

![Catalyst](http://curriculum-release.s3-website-us-west-2.amazonaws.com/sf_open_data_meetup/catalyst.png)

In [111]:
# Note that a SQL query simply returns a DataFrame
spark.sql("SELECT `NeighborhoodDistrict`, count(`NeighborhoodDistrict`) AS Neighborhood_Count FROM fireServiceVIEW WHERE year(`CallDateTS`) == '2015' GROUP BY `NeighborhoodDistrict` ORDER BY Neighborhood_Count DESC LIMIT 15")

The `explain()` method can be called on a DataFrame to understand its logical + physical plans:

In [113]:
spark.sql("SELECT `NeighborhoodDistrict`, count(`NeighborhoodDistrict`) AS Neighborhood_Count FROM fireServiceVIEW WHERE year(`CallDateTS`) == '2015' GROUP BY `NeighborhoodDistrict` ORDER BY Neighborhood_Count DESC LIMIT 15").explain(True)

You can view the visual representation of the SQL Query plan from the Spark UI:

![SQL Plan](http://curriculum-release.s3-website-us-west-2.amazonaws.com/sf_open_data_meetup/sql_query_plan.png)

### ![Spark Logo Tiny](http://curriculum-release.s3-website-us-west-2.amazonaws.com/wiki-book/general/logo_spark_tiny.png) **DataFrame Joins**

**Q-6) What was the primary non-medical reason most people called the fire department from the Tenderloin last year?**

The "Fire Incidents" data includes a summary of each (non-medical) incident to which the SF Fire Department responded.

Let's do a join to the Fire Incidents data on the "Incident Number" column:

https://data.sfgov.org/Public-Safety/Fire-Incidents/wr8u-xric

Read the Fire Incidents CSV file into a DataFrame:

In [121]:
incidentsDF = spark.read.csv('/mnt/sf_open_data/fire_incidents/Fire_Incidents.csv', header=True, inferSchema=True).withColumnRenamed('Incident Number', 'IncidentNumber').cache()

In [122]:
incidentsDF.printSchema()

In [123]:
# Materialize the cache
incidentsDF.count()

In [124]:
display(incidentsDF.limit(3))

IncidentNumber,Exposure Number,Address,Incident Date,Call Number,Alarm DtTm,Arrival DtTm,Close DtTm,City,Zipcode,Battalion,Station Area,Box,Suppression Units,Suppression Personnel,EMS Units,EMS Personnel,Other Units,Other Personnel,First Unit On Scene,Estimated Property Loss,Estimated Contents Loss,Fire Fatalities,Fire Injuries,Civilian Fatalities,Civilian Injuries,Number of Alarms,Primary Situation,Mutual Aid,Action Taken Primary,Action Taken Secondary,Action Taken Other,Detector Alerted Occupants,Property Use,Area of Fire Origin,Ignition Cause,Ignition Factor Primary,Ignition Factor Secondary,Heat Source,Item First Ignited,Human Factors Associated with Ignition,Structure Type,Structure Status,Floor of Fire Origin,Fire Spread,No Flame Spead,Number of floors with minimum damage,Number of floors with significant damage,Number of floors with heavy damage,Number of floors with extreme damage,Detectors Present,Detector Type,Detector Operation,Detector Effectiveness,Detector Failure Reason,Automatic Extinguishing System Present,Automatic Extinguishing Sytem Type,Automatic Extinguishing Sytem Perfomance,Automatic Extinguishing Sytem Failure Reason,Number of Sprinkler Heads Operating,Supervisor District,Neighborhood District,Location
9030109,0,310 Colon Av.,04/12/2009,91020273,04/12/2009 06:09:13 PM,04/12/2009 06:13:45 PM,04/12/2009 07:23:13 PM,SF,,B09,15,,1,5,0,0,0,0,T15,,,0,0,0,0,,551 - assist pd or other govern. agency,none,52 - forcible entry,-,-,-,"000 - property use, other",,,,,,,,,,,,,,,,,,,,,,,,,,,,,
13067402,0,20 Lansdale Av,07/18/2013,131990117,07/18/2013 10:32:03 AM,07/18/2013 10:37:15 AM,07/18/2013 10:39:55 AM,SF,,B09,39,8571.0,3,11,0,0,0,0,E39,,,0,0,0,0,,745 - alarm system sounded/no fire-accidental,none,86 - investigate,-,-,-,429 - multifamily dwellings,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
12044490,0,7th St. / Folsom St.,05/13/2012,121340051,05/13/2012 03:55:37 AM,05/13/2012 04:01:57 AM,05/13/2012 04:05:44 AM,SF,94103.0,B03,1,,3,10,0,0,0,0,B03,,,0,0,0,0,,"711 - municipal alarm system, street box false",none,86 - investigate,-,-,-,963 - street or road in commercial area,,,,,,,,,,,,,,,,,,,,,,,,,,,6.0,South of Market,"(37.7767460000297, -122.407844)"


In [125]:
joinedDF = fireServiceDF.join(incidentsDF, fireServiceDF.IncidentNumber == incidentsDF.IncidentNumber)

In [126]:
display(joinedDF.limit(3))

CallNumber,UnitID,IncidentNumber,CallType,CallFinalDisposition,Address,City,ZipcodeofIncident,Battalion,StationArea,Box,OriginalPriority,Priority,FinalPriority,ALSUnit,CallTypeGroup,NumberofAlarms,UnitType,Unitsequenceincalldispatch,FirePreventionDistrict,SupervisorDistrict,NeighborhoodDistrict,Location,RowID,CallDateTS,WatchDateTS,ReceivedDtTmTS,EntryDtTmTS,DispatchDtTmTS,ResponseDtTmTS,OnSceneDtTmTS,TransportDtTmTS,HospitalDtTmTS,AvailableDtTmTS,IncidentNumber.1,Exposure Number,Address.1,Incident Date,Call Number,Alarm DtTm,Arrival DtTm,Close DtTm,City.1,Zipcode,Battalion.1,Station Area,Box.1,Suppression Units,Suppression Personnel,EMS Units,EMS Personnel,Other Units,Other Personnel,First Unit On Scene,Estimated Property Loss,Estimated Contents Loss,Fire Fatalities,Fire Injuries,Civilian Fatalities,Civilian Injuries,Number of Alarms,Primary Situation,Mutual Aid,Action Taken Primary,Action Taken Secondary,Action Taken Other,Detector Alerted Occupants,Property Use,Area of Fire Origin,Ignition Cause,Ignition Factor Primary,Ignition Factor Secondary,Heat Source,Item First Ignited,Human Factors Associated with Ignition,Structure Type,Structure Status,Floor of Fire Origin,Fire Spread,No Flame Spead,Number of floors with minimum damage,Number of floors with significant damage,Number of floors with heavy damage,Number of floors with extreme damage,Detectors Present,Detector Type,Detector Operation,Detector Effectiveness,Detector Failure Reason,Automatic Extinguishing System Present,Automatic Extinguishing Sytem Type,Automatic Extinguishing Sytem Perfomance,Automatic Extinguishing Sytem Failure Reason,Number of Sprinkler Heads Operating,Supervisor District,Neighborhood District,Location.1
30010041,RC2,3000027,Medical Incident,Other,0 Block of PARKER AVE,SF,94118,B07,10,4436,3,3,3,True,,1,RESCUE CAPTAIN,3,7,2,Presidio Heights,"(37.7851399842228, -122.454627027921)",030010041-RC2,2003-01-01 00:00:00,2002-12-31 00:00:00,2003-01-01 00:42:41,2003-01-01 00:43:39,2003-01-01 00:43:53,2016-04-25 21:09:36,2016-04-25 21:09:36,2016-04-25 21:09:36,2016-04-25 21:09:36,2003-01-01 00:54:51,3000027,0,2 Parker Av.,01/01/2003,30010041,01/01/2003 12:43:53 AM,01/01/2003 12:47:00 AM,01/01/2003 01:43:22 AM,SF,,B07,10,4436,1,4,1,4,0,0,E10,0,0.0,0,0,0,0,,"463 - vehicle accident, general cleanup",none,"10 - fire, other",-,-,-,"960 - street, other",-,-,-,-,-,-,-,-,-,,-,,,,,,-,-,-,-,-,-,-,-,-,,,,
30010041,E10,3000027,Medical Incident,Other,0 Block of PARKER AVE,SF,94118,B07,10,4436,3,3,3,True,,1,ENGINE,1,7,2,Presidio Heights,"(37.7851399842228, -122.454627027921)",030010041-E10,2003-01-01 00:00:00,2002-12-31 00:00:00,2003-01-01 00:42:41,2003-01-01 00:43:39,2003-01-01 00:43:53,2003-01-01 00:45:10,2003-01-01 00:47:00,2016-04-25 21:09:36,2016-04-25 21:09:36,2003-01-01 00:58:18,3000027,0,2 Parker Av.,01/01/2003,30010041,01/01/2003 12:43:53 AM,01/01/2003 12:47:00 AM,01/01/2003 01:43:22 AM,SF,,B07,10,4436,1,4,1,4,0,0,E10,0,0.0,0,0,0,0,,"463 - vehicle accident, general cleanup",none,"10 - fire, other",-,-,-,"960 - street, other",-,-,-,-,-,-,-,-,-,,-,,,,,,-,-,-,-,-,-,-,-,-,,,,
30010041,M38,3000027,Medical Incident,Other,0 Block of PARKER AVE,SF,94118,B07,10,4436,3,3,3,True,,1,MEDIC,2,7,2,Presidio Heights,"(37.7851399842228, -122.454627027921)",030010041-M38,2003-01-01 00:00:00,2002-12-31 00:00:00,2003-01-01 00:42:41,2003-01-01 00:43:39,2003-01-01 00:43:53,2003-01-01 00:45:16,2003-01-01 00:50:04,2003-01-01 01:25:04,2003-01-01 01:25:08,2003-01-01 00:54:07,3000027,0,2 Parker Av.,01/01/2003,30010041,01/01/2003 12:43:53 AM,01/01/2003 12:47:00 AM,01/01/2003 01:43:22 AM,SF,,B07,10,4436,1,4,1,4,0,0,E10,0,0.0,0,0,0,0,,"463 - vehicle accident, general cleanup",none,"10 - fire, other",-,-,-,"960 - street, other",-,-,-,-,-,-,-,-,-,,-,,,,,,-,-,-,-,-,-,-,-,-,,,,


In [127]:
# Note: the joined DataFrame has only about 1.1 million rows because this is an inner join, which keeps only calls with a matching incident (the original Fire Service Calls data had 4+ million rows)
joinedDF.count()
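
Why an inner join shrinks the row count can be seen in a small plain-Python sketch of the same semantics. The records below are made up for illustration (only incident number 3000027 and its three unit IDs echo the output above), and the sketch assumes at most one incident row per incident number, which the real data may not guarantee.

```python
# Hypothetical miniature versions of the two tables (not real data).
calls = [
    {"IncidentNumber": 3000027, "UnitID": "RC2"},
    {"IncidentNumber": 3000027, "UnitID": "E10"},
    {"IncidentNumber": 3000027, "UnitID": "M38"},
    {"IncidentNumber": 9999999, "UnitID": "T01"},  # no matching incident
]
incidents = [
    {"IncidentNumber": 3000027,
     "Primary Situation": "463 - vehicle accident, general cleanup"},
]

# Index the incidents by the join key (assumes one row per incident number).
incidents_by_number = {row["IncidentNumber"]: row for row in incidents}

# Inner join: keep a call only if its key also appears in the incidents.
joined = [
    {**call, **incidents_by_number[call["IncidentNumber"]]}
    for call in calls
    if call["IncidentNumber"] in incidents_by_number
]

print(len(calls), len(joined))
```

The unmatched call is dropped, while the single incident row is repeated once for each unit dispatched to it, just as in the three joined rows displayed above.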

In [128]:
joinedDF.filter(year('CallDateTS') == 2015).filter(col('NeighborhoodDistrict') == 'Tenderloin').count()

In [129]:
display(joinedDF.filter(year('CallDateTS') == 2015).filter(col('NeighborhoodDistrict') == 'Tenderloin').groupBy('Primary Situation').count().orderBy(desc('count')).limit(10))

Primary Situation,count
"700 false alarm or false call, other",1389
"711 municipal alarm system, malicious false alarm",1012
"743 smoke detector activation, no fire - unintentional",574
735 alarm system sounded due to malfunction,569
"113 cooking fire, confined to container",489
"500 service call, other",489
"745 alarm system activation, no fire - unintentional",438
733 smoke detector activation due to malfunction,404
"100 fire, other",190
"600 good intent call, other",179


Most of these calls were false alarms!
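
A quick check of that claim, using only the ten counts shown in the output above. The sketch assumes that every situation code in the 700 series denotes a false alarm, which matches the labels displayed here; the resulting share is relative to the top 10 situations only, not to all Tenderloin calls.

```python
# Counts copied from the Tenderloin top-10 output above.
top10 = {
    "700 false alarm or false call, other": 1389,
    "711 municipal alarm system, malicious false alarm": 1012,
    "743 smoke detector activation, no fire - unintentional": 574,
    "735 alarm system sounded due to malfunction": 569,
    "113 cooking fire, confined to container": 489,
    "500 service call, other": 489,
    "745 alarm system activation, no fire - unintentional": 438,
    "733 smoke detector activation due to malfunction": 404,
    "100 fire, other": 190,
    "600 good intent call, other": 179,
}

# Assumption: a code starting with "7" (the 700 series) is a false alarm.
false_alarms = sum(n for label, n in top10.items() if label.startswith("7"))
total = sum(top10.values())
share = false_alarms / total

print(false_alarms, total, round(share, 3))  # 4386 5733 0.765
```

Under that assumption, roughly three quarters of the calls in the top 10 situations were false alarms.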

What do residents of Russian Hill call the fire department for?

In [132]:
display(joinedDF.filter(year('CallDateTS') == 2015).filter(col('NeighborhoodDistrict') == 'Russian Hill').groupBy('Primary Situation').count().orderBy(desc('count')).limit(10))

Primary Situation,count
"500 service call, other",332
"700 false alarm or false call, other",320
"711 municipal alarm system, malicious false alarm",144
111 building fire,104
322 motor vehicle accident with injuries,94
"113 cooking fire, confined to container",79
"100 fire, other",61
"745 alarm system activation, no fire - unintentional",60
"600 good intent call, other",57
323 motor vehicle/pedestrian accident (mv ped),54


### ![Spark Logo Tiny](http://curriculum-release.s3-website-us-west-2.amazonaws.com/wiki-book/general/logo_spark_tiny.png) **Convert a Spark DataFrame to a Pandas DataFrame**

In [134]:
import pandas as pd

In [135]:
pandas2016DF = joinedDF.filter(year('CallDateTS') == 2016).toPandas()

In [136]:
pandas2016DF.dtypes

In [137]:
pandas2016DF.head()

In [138]:
pandas2016DF.describe()
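
The three inspection calls above can also be tried without a cluster on a small hand-built pandas DataFrame. The rows below are invented stand-ins; only the column names are borrowed from the joined data. Keep in mind that `toPandas()` collects the whole result onto the driver, so it is best applied after filtering, as was done here with the 2016 rows.

```python
import pandas as pd

# Invented sample rows; only the column names come from the joined data.
sampleDF = pd.DataFrame({
    "CallType": ["Medical Incident", "Structure Fire", "Alarms"],
    "NumberofAlarms": [1, 2, 1],
    "Suppression Units": [1, 4, 2],
})

print(sampleDF.dtypes)          # one dtype per column
print(sampleDF.head())          # the first rows (up to five)

summary = sampleDF.describe()   # by default, statistics for numeric columns only
print(summary)
```

By default `describe()` skips the text column and reports count, mean, standard deviation, minimum, quartiles, and maximum for the numeric ones.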

### **Keep Hacking!**