In [1]:
%fs ls /mnt/Fire_Department_Calls_for_Service

Note, you can also access the 1.6 GB of data directly from sfgov.org via this link: https://data.sfgov.org/Public-Safety/Fire-Department-Calls-for-Service/nuek-vuh3

In [3]:
spark

Using the SparkSession, create a DataFrame from the CSV file by inferring the schema:

In [5]:
fireServiceCallsDF = spark.read.csv('dbfs:/mnt/Fire_Department_Calls_for_Service/Fire_Department_Calls_for_Service.csv', header=True, inferSchema=True)

Notice that the above cell takes ~15 seconds to run because it infers the schema by sampling the file and reading through it.

Inferring the schema works for ad hoc analysis against smaller datasets. But when working on multi-TB+ data, it's better to provide an **explicit pre-defined schema manually**, so there's no inferring cost:
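
To make the cost of inference concrete, here is a toy sketch in plain Python (not Spark's actual implementation). The three-row sample and the helper names `infer_type` and `infer_schema` are made up for illustration; the point is only that guessing types requires reading the data first, while an explicit schema is known before any row is read.

```python
import csv
import io

# Hypothetical three-row sample; the real file has millions of rows.
SAMPLE_CSV = "CallNumber,UnitID,ALSUnit\n1001,E01,true\n1002,T02,false\n1003,M03,true\n"

def infer_type(values):
    """Guess a column type by inspecting every value in the column."""
    if all(v.lstrip("-").isdigit() for v in values):
        return "int"
    if all(v.lower() in ("true", "false") for v in values):
        return "bool"
    return "string"

def infer_schema(text):
    """Toy inference: read all rows first, then guess each column's type."""
    rows = list(csv.DictReader(io.StringIO(text)))  # the extra pass over the data
    return {name: infer_type([row[name] for row in rows]) for name in rows[0]}

# With an explicit schema nothing has to be scanned: the types are given up front.
EXPLICIT_SCHEMA = {"CallNumber": "int", "UnitID": "string", "ALSUnit": "bool"}

inferred = infer_schema(SAMPLE_CSV)
print(inferred)
```

On a few rows the extra pass is negligible, but it grows with the size of the input, which is why the explicit schema pays off on large files.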

In [7]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, BooleanType

In [8]:
# Note that we are removing all space characters from the col names to prevent errors when writing to Parquet later

fireSchema = StructType([StructField('CallNumber', IntegerType(), True),
                     StructField('UnitID', StringType(), True),
                     StructField('IncidentNumber', IntegerType(), True),
                     StructField('CallType', StringType(), True),                  
                     StructField('CallDate', StringType(), True),       
                     StructField('WatchDate', StringType(), True),       
                     StructField('ReceivedDtTm', StringType(), True),       
                     StructField('EntryDtTm', StringType(), True),       
                     StructField('DispatchDtTm', StringType(), True),       
                     StructField('ResponseDtTm', StringType(), True),       
                     StructField('OnSceneDtTm', StringType(), True),       
                     StructField('TransportDtTm', StringType(), True),                  
                     StructField('HospitalDtTm', StringType(), True),       
                     StructField('CallFinalDisposition', StringType(), True),       
                     StructField('AvailableDtTm', StringType(), True),       
                     StructField('Address', StringType(), True),       
                     StructField('City', StringType(), True),       
                     StructField('ZipcodeofIncident', IntegerType(), True),       
                     StructField('Battalion', StringType(), True),                 
                     StructField('StationArea', StringType(), True),       
                     StructField('Box', StringType(), True),       
                     StructField('OriginalPriority', StringType(), True),       
                     StructField('Priority', StringType(), True),       
                     StructField('FinalPriority', IntegerType(), True),       
                     StructField('ALSUnit', BooleanType(), True),       
                     StructField('CallTypeGroup', StringType(), True),
                     StructField('NumberofAlarms', IntegerType(), True),
                     StructField('UnitType', StringType(), True),
                     StructField('Unitsequenceincalldispatch', IntegerType(), True),
                     StructField('FirePreventionDistrict', StringType(), True),
                     StructField('SupervisorDistrict', StringType(), True),
                     StructField('NeighborhoodDistrict', StringType(), True),
                     StructField('Location', StringType(), True),
                     StructField('RowID', StringType(), True)])

In [9]:
#Notice that no job is run this time
fireServiceCallsDF = spark.read.csv('/FileStore/tables/Fire_Department_Calls_for_Service.csv', header=True, schema=fireSchema)

In [10]:
display(fireServiceCallsDF.limit(5))

In [11]:
fireServiceCallsDF.columns

Count the total number of rows in the DataFrame (and see how long a full scan from remote disk/S3 takes):

In [13]:
fireServiceCallsDF.count()

There are over 4.6 million rows in the DataFrame and it takes ~14 seconds to do a full read of it.

DataFrames support two types of operations: *transformations* and *actions*.

Transformations, like `select()` or `filter()`, create a new DataFrame from an existing one.

Actions, like `show()` or `count()`, return a value with results to the user. Other actions like `save()` write the DataFrame to distributed storage (like S3 or HDFS).

####![Spark T/A](http://curriculum-release.s3-website-us-west-2.amazonaws.com/wiki-book/pagecounts/trans_and_actions.png)

Transformations contribute to a query plan, but nothing is executed until an action is called.
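
As a rough analogy in plain Python (this is not how Spark is implemented), generators behave the same way: building the pipeline does no work, and only consuming it does. The call type values and the `log` list below are made up for illustration.

```python
log = []  # records when a row is actually read

def read_rows():
    # Hypothetical sample values, not rows from the dataset
    for call_type in ["Medical Incident", "Alarms", "Medical Incident", "Structure Fire"]:
        log.append("read " + call_type)
        yield call_type

# "Transformation": defining the filter builds a pipeline but reads nothing yet
medical_only = (c for c in read_rows() if c == "Medical Incident")
work_before_action = len(log)

# "Action": consuming the pipeline forces every step to execute
medical_count = sum(1 for _ in medical_only)
work_after_action = len(log)

print(work_before_action, medical_count, work_after_action)
```

No rows are read until the count is requested, at which point all four rows flow through the filter.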

**Q-1) How many different types of calls were made to the Fire Department?**

In [19]:
# Use the .select() transformation to pull out just the 'CallType' column, then call the show action
fireServiceCallsDF.select('CallType').show(5)

In [20]:
# Add the .distinct() transformation to keep only distinct rows
# The False below expands the ASCII column width to fit the full text in the output

fireServiceCallsDF.select('CallType').distinct().show(35,False)

**Q-2) How many incidents of each call type were there?**

In [22]:
#Note that .count() is actually a transformation here

display(fireServiceCallsDF.select('CallType').groupBy('CallType').count().orderBy("count", ascending=False))

It seems the SF Fire Department is called for medical incidents far more often than for any other type.

**Date/Time Analysis**

**Q-3) How many years of Fire Service Calls are in the data file?**

Notice that the date and time columns are currently being interpreted as strings, rather than date or time objects:

In [27]:
fireServiceCallsDF.printSchema()

Let's use the unix_timestamp() function to convert the string into a timestamp:

https://people.apache.org/~pwendell/spark-nightly/spark-master-docs/latest/api/python/pyspark.sql.html?highlight=spark#pyspark.sql.functions.from_unixtime

In [29]:
from pyspark.sql.functions import *

In [30]:
# Note that PySpark uses the Java Simple Date Format patterns

from_pattern1 = 'MM/dd/yyyy'
to_pattern1 = 'yyyy-MM-dd'

from_pattern2 = 'MM/dd/yyyy hh:mm:ss aa'
to_pattern2 = 'MM/dd/yyyy hh:mm:ss aa'


fireServiceCallsTsDF = fireServiceCallsDF \
  .withColumn('CallDateTS', unix_timestamp(fireServiceCallsDF['CallDate'], from_pattern1).cast("timestamp")) \
  .drop('CallDate') \
  .withColumn('WatchDateTS', unix_timestamp(fireServiceCallsDF['WatchDate'], from_pattern1).cast("timestamp")) \
  .drop('WatchDate') \
  .withColumn('ReceivedDtTmTS', unix_timestamp(fireServiceCallsDF['ReceivedDtTm'], from_pattern2).cast("timestamp")) \
  .drop('ReceivedDtTm') \
  .withColumn('EntryDtTmTS', unix_timestamp(fireServiceCallsDF['EntryDtTm'], from_pattern2).cast("timestamp")) \
  .drop('EntryDtTm') \
  .withColumn('DispatchDtTmTS', unix_timestamp(fireServiceCallsDF['DispatchDtTm'], from_pattern2).cast("timestamp")) \
  .drop('DispatchDtTm') \
  .withColumn('ResponseDtTmTS', unix_timestamp(fireServiceCallsDF['ResponseDtTm'], from_pattern2).cast("timestamp")) \
  .drop('ResponseDtTm') \
  .withColumn('OnSceneDtTmTS', unix_timestamp(fireServiceCallsDF['OnSceneDtTm'], from_pattern2).cast("timestamp")) \
  .drop('OnSceneDtTm') \
  .withColumn('TransportDtTmTS', unix_timestamp(fireServiceCallsDF['TransportDtTm'], from_pattern2).cast("timestamp")) \
  .drop('TransportDtTm') \
  .withColumn('HospitalDtTmTS', unix_timestamp(fireServiceCallsDF['HospitalDtTm'], from_pattern2).cast("timestamp")) \
  .drop('HospitalDtTm') \
  .withColumn('AvailableDtTmTS', unix_timestamp(fireServiceCallsDF['AvailableDtTm'], from_pattern2).cast("timestamp")) \
  .drop('AvailableDtTm')  
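
For readers more familiar with Python's own date parsing, the sketch below shows what `from_pattern1` and `from_pattern2` mean using `datetime.strptime`. The sample strings are hypothetical values in the same layout, not rows taken from the dataset, and the constant names are made up for this example.

```python
from datetime import datetime

# Python strptime equivalents of the two Java-style patterns used above
DATE_FORMAT = "%m/%d/%Y"                  # MM/dd/yyyy
DATETIME_FORMAT = "%m/%d/%Y %I:%M:%S %p"  # MM/dd/yyyy hh:mm:ss aa (12-hour clock with AM/PM)

# Hypothetical sample values in the same layout as the CSV columns
call_date = datetime.strptime("07/04/2017", DATE_FORMAT)
received = datetime.strptime("07/04/2017 03:25:10 PM", DATETIME_FORMAT)

print(call_date.isoformat())
print(received.isoformat())
```

The lowercase `hh` in the Java-style pattern is the 12-hour clock, which is why the AM/PM marker is needed to recover the hour of the day.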

In [31]:
fireServiceCallsTsDF.printSchema()

Notice that the formatting of the timestamps is now different:

In [33]:
display(fireServiceCallsTsDF.limit(5))

Finally, calculate how many distinct years of data are in the CSV file:

In [35]:
fireServiceCallsTsDF.select(year('CallDateTS')).distinct().orderBy('year(CallDateTS)').show()

**Q-4) How many service calls were logged in the past 7 days?**

Note that July 4th is the 185th day of the year.

Filter the DF down to just 2017 and days of the year greater than or equal to 180:

In [38]:
fireServiceCallsTsDF.filter(year('CallDateTS') == '2017').filter(dayofyear('CallDateTS') >= 180).select(dayofyear('CallDateTS')).distinct().orderBy('dayofyear(CallDateTS)').show()

In [39]:
fireServiceCallsTsDF.filter(year('CallDateTS') == '2017').filter(dayofyear('CallDateTS') >= 180).groupBy(dayofyear('CallDateTS')).count().orderBy('dayofyear(CallDateTS)').show()

Note above that July 4th, 2017 was the 185th day of the year.
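
The day numbers can be double-checked with Python's standard library. This small sketch confirms the day of year for July 4th, 2017 and shows which calendar date the threshold of 180 corresponds to; the variable names are made up for this example.

```python
from datetime import date, timedelta

# Day of year for July 4th, 2017 (January 1st is day 1)
july_4th = date(2017, 7, 4)
day_number = july_4th.timetuple().tm_yday
print(day_number)

# Calendar date of day 180, the lower bound used in the filter
threshold_date = date(2017, 1, 1) + timedelta(days=180 - 1)
print(threshold_date)
```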

Visualize the results in a bar graph:

In [42]:
display(fireServiceCallsTsDF.filter(year('CallDateTS') == '2017').filter(dayofyear('CallDateTS') >= 180).groupBy(dayofyear('CallDateTS')).count().orderBy('dayofyear(CallDateTS)'))

https://www.cbsnews.com/news/san-francisco-has-second-straight-day-of-triple-digit-temperatures/

The DataFrame is currently comprised of 13 partitions:

In [45]:
fireServiceCallsTsDF.rdd.getNumPartitions()

![Partitions](http://curriculum-release.s3-website-us-west-2.amazonaws.com/sf_open_data_meetup/df_13_parts.png)

In [47]:
fireServiceCallsTsDF.repartition(6).createOrReplaceTempView("fireServiceVIEW")

In [48]:
spark.catalog.cacheTable("fireServiceVIEW")

In [49]:
# Call .count() to materialize the cache
spark.table("fireServiceVIEW").count()

In [50]:
fireServiceDF = spark.table("fireServiceVIEW")

In [51]:
# Note that the full scan + count in memory takes < 1 second!

fireServiceDF.count()

In [52]:
spark.catalog.isCached("fireServiceVIEW")

The 6 partitions are now cached in memory:

![6 Partitions](http://curriculum-release.s3-website-us-west-2.amazonaws.com/sf_open_data_meetup/df_6_parts.png)

Use the Spark UI to see the 6 partitions in memory:

![Mem UI](http://curriculum-release.s3-website-us-west-2.amazonaws.com/sf_open_data_meetup/mem_ui.png)

Now that each date and time column has the correct type and the data is partitioned the way we want, let's write it out as Parquet files for future loading:

In [58]:
%fs ls /mnt/

In [59]:
fireServiceDF.write.format('parquet').save('/mnt/Fire_Department_Calls_for_Service2/')

Now the directory should contain 6 .gz compressed Parquet files (one for each partition):

In [61]:
%fs ls /mnt/Fire_Department_Calls_for_Service2/

Here's how you can easily read the Parquet files back from S3 in the future:

In [63]:
tempDF = spark.read.parquet('/mnt/Fire_Department_Calls_for_Service2/')

In [64]:
display(tempDF.limit(2))

**SQL Queries**

In [66]:
spark.sql("SELECT count(*) FROM fireServiceVIEW").show()

In [67]:
%sql SELECT count(*) FROM fireServiceVIEW;

Expand 'Spark Jobs' in the cell above to see that 7 tasks were launched to run the count: 6 tasks to read the data from each of the 6 partitions and pre-aggregate the count on each partition, then a final task to combine the counts from all 6 tasks:

![Job details](http://curriculum-release.s3-website-us-west-2.amazonaws.com/sf_open_data_meetup/6_tasks.png)

You can use the Spark Stages UI to see the 6 tasks launched in the middle stage:

![Event Timeline](http://curriculum-release.s3-website-us-west-2.amazonaws.com/sf_open_data_meetup/event_timeline.png)
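
The 6 + 1 task layout described above can be sketched in plain Python. This is only an illustration of two-stage aggregation, not Spark's execution engine, and the partition sizes are made up.

```python
# Hypothetical rows spread over 6 partitions (the sizes are made up)
partitions = [list(range(n)) for n in (5, 3, 4, 6, 2, 4)]

# Stage 1: one task per partition computes a partial count locally
partial_counts = [len(rows) for rows in partitions]

# Stage 2: a single final task adds up the partial counts
total = sum(partial_counts)

# One task per partition plus the final aggregation task
tasks_launched = len(partial_counts) + 1
print(partial_counts, total, tasks_launched)
```

Only six small numbers have to be moved to the final task, rather than the rows themselves, which is what makes the pre-aggregation worthwhile.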

**Q-5) Which neighborhood in SF generated the most calls last year?**

In [73]:
%sql SELECT `NeighborhoodDistrict`, count(`NeighborhoodDistrict`) AS Neighborhood_Count FROM fireServiceVIEW WHERE year(`CallDateTS`) == '2015' GROUP BY `NeighborhoodDistrict` ORDER BY Neighborhood_Count DESC LIMIT 15;

Expand the Spark Job details in the cell above and notice that the last stage uses 200 partitions! This default is not optimal, given that we only have ~1.6 GB of data and 3 slots.
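
A back-of-the-envelope sketch of why this matters: each shuffle partition becomes one task, and only as many tasks as there are slots can run at once. The helper `task_waves` is a hypothetical name, and the model ignores differences in task duration and per-task scheduling overhead.

```python
import math

def task_waves(num_partitions, num_slots):
    """Minimum number of rounds needed to run one task per partition."""
    return math.ceil(num_partitions / num_slots)

slots = 3
waves_default = task_waves(200, slots)  # default spark.sql.shuffle.partitions
waves_tuned = task_waves(6, slots)      # the value chosen in this notebook
print(waves_default, waves_tuned)
```

With so little data, most of the 200 tasks would handle only a sliver of rows, so the scheduling overhead of the extra rounds dominates the useful work.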

Change the `spark.sql.shuffle.partitions` option to 6:

In [75]:
spark.conf.get("spark.sql.shuffle.partitions")

In [76]:
spark.conf.set("spark.sql.shuffle.partitions", 6)

In [77]:
spark.conf.get("spark.sql.shuffle.partitions")

Re-run the same SQL query and notice the speed increase:

In [79]:
%sql SELECT `NeighborhoodDistrict`, count(`NeighborhoodDistrict`) AS Neighborhood_Count FROM fireServiceVIEW WHERE year(`CallDateTS`) == '2015' GROUP BY `NeighborhoodDistrict` ORDER BY Neighborhood_Count DESC LIMIT 15;

SQL also has some handy commands like `DESC` (describe) to see the schema + data types for the table:

In [81]:
%sql DESC fireServiceVIEW;

**DataFrame Joins**

**Q-6) What was the primary non-medical reason most people called the fire department from the Tenderloin last year?**

The "Fire Incidents" data includes a summary of each (non-medical) incident to which the SF Fire Department responded.

Let's do a join to the Fire Incidents data on the "Incident Number" column:

https://data.sfgov.org/Public-Safety/Fire-Incidents/wr8u-xric

Read the Fire Incidents CSV file into a DataFrame:

In [87]:
incidentsDF = spark.read.csv('/mnt/Fire_Department_Calls_for_Service/Fire_Incidents.csv', header=True, inferSchema=True).withColumnRenamed('Incident Number', 'IncidentNumber').cache()

In [88]:
incidentsDF.printSchema()

In [89]:
# Materialize the cache
incidentsDF.count()

In [90]:
display(incidentsDF.limit(3))

In [91]:
joinedDF = fireServiceDF.join(incidentsDF, fireServiceDF.IncidentNumber == incidentsDF.IncidentNumber)

In [92]:
display(joinedDF.limit(3))

In [93]:
# Note that the joined DF has only 1.1 million rows because we did an inner join (the original Fire Service Calls data had 4+ million rows)
joinedDF.count()
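
The drop in row count follows from inner-join semantics: a call survives only if its incident number also appears in the Fire Incidents data, which covers non-medical incidents only. The miniature tables below are made up to illustrate this in plain Python.

```python
# Hypothetical miniature tables keyed by IncidentNumber (all values are made up)
calls = [
    {"IncidentNumber": 1, "CallType": "Medical Incident"},
    {"IncidentNumber": 2, "CallType": "Structure Fire"},
    {"IncidentNumber": 2, "CallType": "Structure Fire"},  # second unit, same incident
    {"IncidentNumber": 3, "CallType": "Alarms"},
]
incidents = {2: "Building fire", 3: "False alarm"}  # IncidentNumber -> Primary Situation

# Inner join: keep a call only if its IncidentNumber exists on the other side
joined = [
    {**call, "Primary Situation": incidents[call["IncidentNumber"]]}
    for call in calls
    if call["IncidentNumber"] in incidents
]
print(len(calls), len(joined))
```

The medical call has no matching incident record and is dropped, while both calls for incident 2 are kept.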

In [94]:
joinedDF.filter(year('CallDateTS') == '2017').filter(col('NeighborhoodDistrict') == 'Tenderloin').count()

In [95]:
display(joinedDF.filter(year('CallDateTS') == '2017').filter(col('NeighborhoodDistrict') == 'Tenderloin').groupBy('Primary Situation').count().orderBy(desc("count")).limit(10))

Most of the calls were False Alarms!

What do residents of Russian Hill call the fire department for?

In [98]:
display(joinedDF.filter(year('CallDateTS') == '2017').filter(col('NeighborhoodDistrict') == 'Russian Hill').groupBy('Primary Situation').count().orderBy(desc("count")).limit(10))

**Convert a Spark DataFrame to a Pandas DataFrame**

In [100]:
import pandas as pd
from sklearn import svm, tree
# train_test_split lives in sklearn.model_selection since scikit-learn 0.18;
# the old sklearn.cross_validation module was removed in 0.20
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_score
from sklearn.neighbors import KNeighborsClassifier


In [101]:
p17 = joinedDF.filter(year('CallDateTS') == '2017').toPandas()

In [102]:
p17.dtypes

In [103]:
p17.columns
columns = ['CallType', 'CallFinalDisposition',  'StationArea', 'OriginalPriority', 'Priority', 'FinalPriority', 'ALSUnit','CallTypeGroup', 'NumberofAlarms', 'UnitType', 'EMS Units', 'Ignition Cause']
p17 = p17[columns]
p17.columns

In [104]:
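# Encode each column's distinct values as integer codes so scikit-learn can use them
for column in columns:
    distinct_values = list(set(p17[column]))
    value_to_code = {value: code for code, value in enumerate(distinct_values)}
    print(value_to_code)
    print(column)
    try:
        p17.replace({column: value_to_code}, inplace=True)
    except Exception:
        # Leave the column unchanged if pandas rejects the mapping
        # (for example, when a numeric column's values overlap with the codes)
        continue
p17['ALSUnit'] = p17['ALSUnit'].astype(int)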

In [105]:

columns = ['CallType', 'CallFinalDisposition',  'StationArea', 'OriginalPriority', 'Priority', 'FinalPriority', 'CallTypeGroup', 'NumberofAlarms', 'UnitType', 'EMS Units', 'Ignition Cause']
X = p17[columns]
Y = p17['ALSUnit']
X_train, X_test, y_train, y_test = train_test_split(X, Y, test_size=0.2, random_state=42)
clf = tree.DecisionTreeClassifier()
clf = clf.fit(X_train, y_train)
y_pred = clf.predict(X_test)
print(precision_score(y_test, y_pred))
print(accuracy_score(y_test, y_pred))


In [106]:
columns = ['UnitType', 'CallFinalDisposition',  'StationArea', 'OriginalPriority', 'Priority', 'FinalPriority', 'CallTypeGroup', 'NumberofAlarms', 'ALSUnit', 'EMS Units', 'Ignition Cause']
X = p17[columns]
Y = p17['CallType']
X_train, X_test, y_train, y_test = train_test_split(X, Y, test_size=0.2, random_state=42)
clf = svm.SVC()
clf = clf.fit(X_train, y_train)
y_pred = clf.predict(X_test)
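# CallType has more than two classes, so precision_score needs an explicit averaging mode
precision_score(y_test, y_pred, average='weighted')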

In [107]:
columns = ['CallType', 'CallFinalDisposition',  'StationArea', 'OriginalPriority', 'Priority', 'FinalPriority', 'CallTypeGroup', 'NumberofAlarms', 'ALSUnit', 'EMS Units', 'Ignition Cause']
X = p17[columns]
Y = p17['UnitType']
X_train, X_test, y_train, y_test = train_test_split(X, Y, test_size=0.3, random_state=42)
neigh = KNeighborsClassifier(n_neighbors=10)
neigh.fit(X_train, y_train) 
y_pred = neigh.predict(X_test)
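# UnitType has more than two classes, so precision_score needs an explicit averaging mode
print(precision_score(y_test, y_pred, average='weighted'))
print(accuracy_score(y_test, y_pred))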

In [108]:
columns = ['CallType', 'CallFinalDisposition',  'StationArea', 'OriginalPriority', 'Priority', 'FinalPriority', 'CallTypeGroup', 'NumberofAlarms', 'ALSUnit', 'EMS Units', 'Ignition Cause', 'UnitType']
X = p17[columns]
Y = p17['UnitType']
from sklearn.cluster import KMeans
import matplotlib.pyplot as plt
import numpy as np
X_train, X_test, y_train, y_test = train_test_split(X, Y, test_size=0.3, random_state=42)
# KMeans is unsupervised: the y argument is accepted for API consistency but ignored
kmeans = KMeans(n_clusters=3, random_state=0).fit(X_train, y_train)
pred= kmeans.predict(X_test)
kmeans.cluster_centers_

In [109]:
p17.describe()