In [1]:
import os, sys
import pyspark
from pyspark.sql import SQLContext

from pyspark.sql import SparkSession # to create dataframes

sc = pyspark.SparkContext.getOrCreate()
#sqlContext = SQLContext(sc)
sqlContext = SparkSession.builder.appName("test").enableHiveSupport().getOrCreate()

import pandas as pd
pd.options.display.max_rows = 999
pd.options.display.max_columns = 999

import pyspark.sql.functions as F # to work with dataframes siimilar to rdd.map()
from pyspark.sql.functions import lit # to fill new column with custom values
from pyspark.sql.functions import col # to select a dataframe column

from matplotlib import pyplot as plt
import seaborn as sns
%matplotlib inline

In [2]:
# to read parquet file

df1 = sqlContext.read.parquet('/home/shashank/Documents/gitWorkspace/SFFD-Spark-Project/Data/FireDepartmentBig.parquet')
df1 = df1.repartition(8)
df1.rdd.getNumPartitions()

8

# Data description:


## Column Name  |  Description  |  Data Type

Call Number  |  A unique 9-digit number assigned by the 911 Dispatch Center (DEM) to this call. These number are used for both Police and Fire calls.  |  Plain Text

Unit ID  |  A unique code assigned for every unit  |  Plain Text

Incident Number  |  A unique 8-digit number assigned by DEM to this Fire incident.  |  Plain Text

Call Type  |  Call Type  |  Plain Text

Call Date  |  Date the call is received at the 911 Dispatch Center. Used for reporting purposes.  |  Date & Time

Watch Date  |  Watch date when the call is received. Watch date starts at 0800 each morning and ends at 0800 the next day.  |  Date & Time

Received DtTm  |  Date and time of call is received at the 911 Dispatch Center.  |  Date & Time

Entry DtTm  |  Date and time the 911 operator submits the entry of the initical call information into the CAD system  |  Date & Time

Dispatch DtTm  |  Date and time the 911 operator dispatches this unit to the call.  |  Date & Time

Response DtTm  |  Date and time this unit acknowledges the dispatch and records that the unit is en route to the location of the call.  |  Date & Time

On Scene DtTm  |  Date and time the unit records arriving to the location of the incident  |  Date & Time

Transport DtTm  |  If this unit is an ambulance, date and time the unit begins the transport unit arrives to hospital  |  Date & Time

Hospital DtTm  |  If this unit is an ambulance, date and time the unit arrives to the hospital.  |  Date & Time

Call Final Disposition  |  Disposition of the call (Code). For example TH2: Transport to Hospital - Code 2, FIR: Resolved by Fire Department  |  Plain Text

Available DtTm  |  \Date and time this unit is not longer assigned to this call and it is available for another dispatch.  |  Date & Time

Address  |  Address of incident (note: address and location generalized to mid-block of street, intersection or nearest call box location, to protect caller privacy).  |  Plain Text

City  |  City of incident  |  Plain Text

Zipcode of Incident  |  Zipcode of incident  |  Plain Text

Battalion  |  Emergency Response District (There are 9 Fire Emergency Response Districts)  |  Plain Text

Station Area  |  Fire Station First Response Area associated with the address of the incident  |  Plain Text

Box  |  Fire box associated with the address of the incident. A box is the smallest area used to divide the City. Each box is associated with a unique unit dispatch order. The City is divided into more than 2,400 boxes.  |  Plain Text

Original Priority  |  Initial call priority (Code 2: Non-Emergency or Code 3:Emergency).  |  Plain Text

Priority  |  Call priority (Code 2: Non-Emergency or Code 3:Emergency).  |  Plain Text

Final Priority  |  Final call priority (Code 2: Non-Emergency or Code 3:Emergency).  |  Plain Text

ALS Unit  |  Does this unit includes ALS (Advance Life Support) resources? Is there a paramedic in this unit?  |  Checkbox

Call Type Group  |  Call types are divided into four main groups: Fire, Alarm, Potential Life Threatening and Non Life Threatening.  |  Plain Text

Number of Alarms  |  Number of alarms associated with the incident. This is a number between 1 and 5.  |  Number

Unit Type  |  Unit type  |  Plain Text

Unit sequence in call dispatch  |  A number that indicates the order this unit was assigned to this call  |  Number

Fire Prevention District  |  Bureau of Fire Prevention District associated with this address  |  Plain Text

Supervisor District  |  Supervisor District associated with this address (note: these are the districts created in 2012).  |  Plain Text

Neighborhooods - Analysis Boundaries  |  Neighborhood District associated with this address  |  Plain Text

Location  |  Location of incident (note: address and location generalized to mid-block of street, intersection or nearest call box location, to protect caller privacy).  |  Location

RowID  |  Unique Call Number and Unit ID combination  |  Plain Text


## Aim is to predict how long it takes for the SFFD to reach the location of incident from the moment call is received by 911.
#### Drop the following because we are interested in prediction of : (On_Scene_DtTm - Received_DtTm)

    -Call_Date
    -Watch_Date
    -Entry_DtTm
    -Dispatch_DtTm
    -Response_DtTm
    -Transport_DtTm
    -Hospital_DtTm
    
#### Drop City because we are looking only at one city, San Franscio
#### Drop Call Final Disposition, Priority and Final Priority because these are not known at the time of dispatch hence, should not be used to predict the response time
#### Drop Unit_sequence_in_call_dispatch since we are interested in the minimum time taken by the first unit.
#### Drop Address since we will be using zip, box and coordinates to locate the incident.
#### Drop Neighborhooods_-_Analysis_Boundaries since a single location can have multiple neighborhoods and Supervisor District is already good indicator of the travelling distance.

In [3]:
print("distinct call number =", df1.select('Call_Number').distinct().count())
print("distinct incident number = ", df1.select('Incident_Number').distinct().count())

('distinct call number =', 2224130)
('distinct incident number = ', 2224130)


#### Since both Call_Number and Incident_Number represent the same event, we can drop Call_Number

In [4]:
df1 = df1.drop('Call_Date', 'Watch_Date', 'Entry_DtTm', 'Dispatch_DtTm',\
               'Response_DtTm', 'Transport_DtTm', 'Hospital_DtTm', \
               'City', 'Priority', 'Final_Priority', 'ALS_Unit', 'Address',  \
               'Unit_sequence_in_call_dispatch', 'Neighborhooods_-_Analysis_Boundaries', \
               'Battalion', 'Call_Number', 'Call_Final_Disposition', '__index_level_0__', 
              )

In [5]:
df1.printSchema()

root
 |-- Unit_ID: string (nullable = true)
 |-- Incident_Number: integer (nullable = true)
 |-- Call_Type: string (nullable = true)
 |-- Received_DtTm: string (nullable = true)
 |-- On_Scene_DtTm: string (nullable = true)
 |-- Available_DtTm: string (nullable = true)
 |-- Zipcode_of_Incident: integer (nullable = true)
 |-- Station_Area: string (nullable = true)
 |-- Box: string (nullable = true)
 |-- Original_Priority: string (nullable = true)
 |-- Call_Type_Group: string (nullable = true)
 |-- Number_of_Alarms: integer (nullable = true)
 |-- Unit_Type: string (nullable = true)
 |-- Fire_Prevention_District: string (nullable = true)
 |-- Supervisor_District: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- RowID: string (nullable = true)



In [6]:
# change format of dates from string to datetime/ date

# Received_DtTm
df1 = df1.withColumn('Received_DtTm', F.to_timestamp('Received_DtTm', 'MM/dd/yyyy HH:mm'))
# On_Scene_DtTm
df1 = df1.withColumn('On_Scene_DtTm', F.to_timestamp('On_Scene_DtTm', 'MM/dd/yyyy HH:mm'))
# Available_DtTm
df1 = df1.withColumn('Available_DtTm', F.to_timestamp('Available_DtTm', 'MM/dd/yyyy HH:mm'))

# month of the year when call was received
df1 = df1.withColumn('Received_month', F.month('Received_DtTm'))

# Hour of the day when call was received
df1 = df1.withColumn('Received_Hour', F.hour('Received_DtTm'))

# response_time = On_Scene_DtTm - Received_DtTm
df1 = df1.withColumn('response_time', (F.unix_timestamp('On_Scene_DtTm') - F.unix_timestamp('Received_DtTm'))/60)

df1.limit(5).toPandas()

Unnamed: 0,Unit_ID,Incident_Number,Call_Type,Received_DtTm,On_Scene_DtTm,Available_DtTm,Zipcode_of_Incident,Station_Area,Box,Original_Priority,Call_Type_Group,Number_of_Alarms,Unit_Type,Fire_Prevention_District,Supervisor_District,Location,RowID,Received_month,Received_Hour,response_time
0,RS2,2043687,Structure Fire,2002-05-27 06:02:00,NaT,2002-05-27 06:10:00,94132,19,8482,3,,1,RESCUE SQUAD,8,7,"(37.7175134679807, -122.480594574461)",021470272-RS2,5,6,
1,E36,1044168,Medical Incident,2001-05-23 07:58:00,2001-05-23 08:03:00,2001-05-23 08:04:00,94102,36,3111,3,,1,ENGINE,2,6,"(37.7764405100838, -122.418481123408)",011430359-E36,5,7,5.0
2,A208,1072234,Medical Incident,2001-08-22 10:28:00,2001-08-22 10:49:00,2001-08-22 11:39:00,94107,25,2641,1,,1,PRIVATE,10,10,"(37.7507996891376, -122.389470116786)",012340152-A208,8,10,21.0
3,B02,1044407,Alarms,2001-05-24 04:34:00,2001-05-24 04:40:00,2001-05-24 04:46:00,94102,1,1456,3,,1,CHIEF,3,6,"(37.7825474000421, -122.412247935495)",011440330-B02,5,4,6.0
4,E29,14055998,Medical Incident,2014-06-11 09:16:00,2014-06-11 09:21:00,2014-06-11 09:29:00,94103,29,2331,3,Potentially Life-Threatening,1,ENGINE,3,6,"(37.7707086968349, -122.407716820686)",141623389-E29,6,9,5.0


In [7]:
df1.groupBy("Unit_Type").agg({"response_time":"avg", "Unit_Type":"count"}).show()

+--------------+----------------+------------------+
|     Unit_Type|count(Unit_Type)|avg(response_time)|
+--------------+----------------+------------------+
|       AIRPORT|           19372|11.548808920425747|
|         MEDIC|         1521025| 9.186744684767747|
|         CHIEF|          361838| 7.384741039684828|
|  RESCUE SQUAD|           89073| 5.077862398347747|
|RESCUE CAPTAIN|          167919| 8.557951030447583|
|         TRUCK|          530221| 5.972022632939403|
| INVESTIGATION|            5209| 40.52706819779705|
|        ENGINE|         1887109| 5.531574799705148|
|       SUPPORT|           25998| 13.04386146113062|
|       PRIVATE|          308063|6.1669380461433265|
+--------------+----------------+------------------+



###### Looking at the above table, 'AIRPORT' and 'INVESTIGATION' unit types are clearly outliers. Since, the datapoints are also very low, we can drop them.

In [8]:
df1 = df1.filter(df1['Unit_Type'] != "AIRPORT").filter(df1['Unit_Type'] != "INVESTIGATION")

fig = plt.figure(figsize= (15,5))

plt.subplot(1, 2, 1)
sns.distplot(df1.filter(df1['response_time'] <> 0 ).select('response_time').collect())
plt.xlabel('Response time in mins')

plt.subplot(1, 2, 2)
sns.distplot(df1.filter(df1['response_time'] < 100 ).select('response_time').collect())
plt.xlabel('Response time in mins')

plt.show()

sns.distplot(df1.filter(df1['response_time'] > 0 ).filter(df1['response_time'] <= 60 ).select('response_time').collect())
plt.xlabel('Response time in mins')
plt.show()

df1.filter(df1['response_time'] > 0 ).filter(df1['response_time'] <= 60 ).select('response_time').describe().show()

In [9]:
print('total number of data points = ', df1.count())
print('total data points being included = ', df1.filter(df1['response_time'] > 0 )\
      .filter(df1['response_time'] <= 60 ).count())

('total number of data points = ', 4891246)
('total data points being included = ', 3690460)


###### From the above plot we can safely say, response time of most of our data points lie below 60 minutes. We can drop all the points where we don't have response time or response time is negative or response time is more than 1 hour.

In [10]:
df1 = df1.filter(df1['response_time'] > 0 ).filter(df1['response_time'] <= 60 )
df1.limit(5).toPandas()

Unnamed: 0,Unit_ID,Incident_Number,Call_Type,Received_DtTm,On_Scene_DtTm,Available_DtTm,Zipcode_of_Incident,Station_Area,Box,Original_Priority,Call_Type_Group,Number_of_Alarms,Unit_Type,Fire_Prevention_District,Supervisor_District,Location,RowID,Received_month,Received_Hour,response_time
0,E01,18098628,Medical Incident,2018-08-22 02:58:00,2018-08-22 03:07:00,2018-08-22 03:22:00,94102,3,1456,3,Potentially Life-Threatening,1,ENGINE,3,6,"(37.78316342713403, -122.41159927893747)",182342552-E01,8,2,9.0
1,T17,7028711,Traffic Collision,2007-04-10 02:12:00,2007-04-10 02:28:00,2007-04-10 03:29:00,94124,44,6542,3,,1,TRUCK,10,10,"(37.7177400893424, -122.398434316401)",071000025-T17,4,2,16.0
2,T03,12111685,Traffic Collision,2012-12-01 11:08:00,2012-12-01 11:13:00,2012-12-01 11:13:00,94109,3,1636,3,Potentially Life-Threatening,1,TRUCK,4,3,"(37.788699045225, -122.420306883643)",123360141-T03,12,11,5.0
3,RC2,7093141,Medical Incident,2007-11-20 09:14:00,2007-11-20 09:28:00,2007-11-20 09:41:00,94122,23,7632,3,,1,RESCUE CAPTAIN,8,4,"(37.7559394117862, -122.501380811138)",073240093-RC2,11,9,14.0
4,53,17008092,Medical Incident,2017-01-18 12:30:00,2017-01-18 12:43:00,2017-01-18 02:01:00,94111,13,1133,2,Potentially Life-Threatening,1,MEDIC,1,3,"(37.795466113999, -122.39677157393)",170181628-53,1,12,13.0


In [11]:
df3 = df1.filter(df1['Received_DtTm'] >= '2018-01-01 00:00:00' )

#df3 = df3.repartition(1)
#df3.write.parquet('/home/shashank/Documents/gitWorkspace/SFFD-Spark-Project/Data/FireDepartmentSmall.parquet')
#df3.write.csv('/home/shashank/Documents/gitWorkspace/SFFD-Spark-Project/Data/FireDepartmentSmall.csv', header=True)

In [12]:
# for now use a small sample 
df1 = df3
df1.count()

298562

### Create avg_response_history for every box

In [13]:
df1.registerTempTable('temptable')

df2 = sqlContext.sql("\
SELECT \
    A.RowID as temp_rowid,\
    SUM(B.response_time)/COUNT(B.response_time) AS avg_response_history \
FROM \
    temptable A JOIN temptable B ON A.Zipcode_of_Incident = B.Zipcode_of_Incident \
WHERE \
     (B.Received_DtTm >= A.Received_DtTm - INTERVAL 1 DAY) and (B.Received_DtTm < A.Received_DtTm) \
GROUP BY A.RowID\
")

df1 = df1.join(df2, df1.RowID == df2.temp_rowid, 'outer')
df1 = df1.drop('temp_rowid')

df1.limit(5).toPandas()

Unnamed: 0,Unit_ID,Incident_Number,Call_Type,Received_DtTm,On_Scene_DtTm,Available_DtTm,Zipcode_of_Incident,Station_Area,Box,Original_Priority,Call_Type_Group,Number_of_Alarms,Unit_Type,Fire_Prevention_District,Supervisor_District,Location,RowID,Received_month,Received_Hour,response_time,avg_response_history
0,AM124,18000033,Medical Incident,2018-01-01 12:45:00,2018-01-01 12:54:00,2018-01-01 01:39:00,94109,41,1631,3,Potentially Life-Threatening,1,PRIVATE,1,3,"(37.798971055891634, -122.4206888025825)",180010233-AM124,1,12,9.0,9.960526
1,KM01,18000080,Medical Incident,2018-01-01 02:11:00,2018-01-01 02:18:00,2018-01-01 03:29:00,94112,15,8473,3,Potentially Life-Threatening,1,PRIVATE,9,7,"(37.722866055573746, -122.45621512101437)",180010565-KM01,1,2,7.0,6.5
2,B03,18000111,Alarms,2018-01-01 03:02:00,2018-01-01 03:09:00,2018-01-01 03:14:00,94105,35,2147,3,Alarm,1,CHIEF,3,6,"(37.78579181631806, -122.39692077599344)",180010741-B03,1,3,7.0,17.615385
3,T05,18000132,Alarms,2018-01-01 04:24:00,2018-01-01 04:31:00,2018-01-01 04:35:00,94115,5,3624,3,Alarm,1,TRUCK,5,5,"(37.78116830793363, -122.43480568645582)",180010937-T05,1,4,7.0,8.611111
4,E42,18000498,Medical Incident,2018-01-01 11:43:00,2018-01-01 11:49:00,2018-01-02 12:12:00,94124,42,6475,3,Potentially Life-Threatening,1,ENGINE,10,10,"(37.7277278991841, -122.39761127578647)",180014190-E42,1,11,6.0,9.058824


In [19]:
# save the file for model fitting
#df1.write.parquet('/home/shashank/Documents/gitWorkspace/SFFD-Spark-Project/Data/FireSmallCleaned.parquet')
df1.write.csv('/home/shashank/Documents/gitWorkspace/SFFD-Spark-Project/Data/FireSmallCleaned.csv', header=True)