# San Francisco Fire Department Data Analysis

The SF OpenData project was launched in 2009 and contains hundreds of datasets from the City and County of San Francisco. Open government data has the potential to improve residents' quality of life, make government services more efficient, support better public decisions, and even foster new local businesses and services.

## Loading in the Data

Run the following two cells to access the San Francisco Fire data set.

In [None]:
ACCESSY_KEY_ID = "AKIAJBRYNXGHORDHZB4A"
SECERET_ACCESS_KEY = "a0BzE1bSegfydr3%2FGE3LSPM6uIV5A4hOUfpH8aFF" 

mounts_list = [
{'bucket':'databricks-corp-training/sf_open_data/', 'mount_folder':'/mnt/sf_open_data'}
]

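for mount_point in mounts_list:
  bucket = mount_point['bucket']
  mount_folder = mount_point['mount_folder']
  try:
    # If the folder is already mounted, unmount it so the mount below starts clean.
    dbutils.fs.ls(mount_folder)
    dbutils.fs.unmount(mount_folder)
  except Exception:
    # MOUNT_FOLDER does not exist yet, so there is nothing to unmount.
    pass
  finally:
    # Always (re)mount the bucket, whether or not it was mounted before.
    dbutils.fs.mount("s3a://"+ ACCESSY_KEY_ID + ":" + SECERET_ACCESS_KEY + "@" + bucket,mount_folder)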
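
The mount cell above builds an `s3a://` URI by concatenating the key ID, the secret, and the bucket. The sketch below shows one way to build the same kind of URI without hardcoding credentials in the notebook. The environment variable names and the helper `build_s3a_uri` are hypothetical, and the helper assumes the secret is supplied raw (not already percent-encoded, unlike the value in the cell above).

```python
import os
from urllib.parse import quote


def build_s3a_uri(access_key: str, secret_key: str, bucket: str) -> str:
    """Return an s3a:// URI with the credentials embedded before the bucket."""
    # Percent-encode the raw secret so characters such as "/" do not break the URI.
    encoded_secret = quote(secret_key, safe="")
    return "s3a://" + access_key + ":" + encoded_secret + "@" + bucket


# Hypothetical environment variable names; placeholders are used when they are unset.
access_key = os.environ.get("SF_DATA_ACCESS_KEY_ID", "EXAMPLEKEYID")
secret_key = os.environ.get("SF_DATA_SECRET_ACCESS_KEY", "abc/def+ghi")

uri = build_s3a_uri(access_key, secret_key, "databricks-corp-training/sf_open_data/")

# Print only the part after the credentials so the secret never reaches the output.
print("s3a://<credentials>@" + uri.split("@", 1)[1])
```

The resulting string could be passed as the first argument to `dbutils.fs.mount` in place of the inline concatenation.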

In [None]:
%fs ls /mnt/sf_open_data/fire_dept_calls_for_service/

path,name,size
dbfs:/mnt/sf_open_data/fire_dept_calls_for_service/Fire_Department_Calls_for_Service.csv,Fire_Department_Calls_for_Service.csv,1634673683


In [None]:
df = spark.read.csv("dbfs:/mnt/sf_open_data/fire_dept_calls_for_service/Fire_Department_Calls_for_Service.csv", header = True, inferSchema = True)
df.printSchema()

In [None]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, BooleanType

fireSchema = StructType([StructField('CallNumber', IntegerType(), True),
                     StructField('UnitID', StringType(), True),
                     StructField('IncidentNumber', IntegerType(), True),
                     StructField('CallType', StringType(), True),                  
                     StructField('CallDate', StringType(), True),       
                     StructField('WatchDate', StringType(), True),       
                     StructField('ReceivedDtTm', StringType(), True),       
                     StructField('EntryDtTm', StringType(), True),       
                     StructField('DispatchDtTm', StringType(), True),       
                     StructField('ResponseDtTm', StringType(), True),       
                     StructField('OnSceneDtTm', StringType(), True),       
                     StructField('TransportDtTm', StringType(), True),                  
                     StructField('HospitalDtTm', StringType(), True),       
                     StructField('CallFinalDisposition', StringType(), True),       
                     StructField('AvailableDtTm', StringType(), True),       
                     StructField('Address', StringType(), True),       
                     StructField('City', StringType(), True),       
                     StructField('ZipcodeofIncident', IntegerType(), True),       
                     StructField('Battalion', StringType(), True),                 
                     StructField('StationArea', StringType(), True),       
                     StructField('Box', StringType(), True),       
                     StructField('OriginalPriority', StringType(), True),       
                     StructField('Priority', StringType(), True),       
                     StructField('FinalPriority', IntegerType(), True),       
                     StructField('ALSUnit', BooleanType(), True),       
                     StructField('CallTypeGroup', StringType(), True),
                     StructField('NumberofAlarms', IntegerType(), True),
                     StructField('UnitType', StringType(), True),
                     StructField('Unitsequenceincalldispatch', IntegerType(), True),
                     StructField('FirePreventionDistrict', StringType(), True),
                     StructField('SupervisorDistrict', StringType(), True),
                     StructField('NeighborhoodDistrict', StringType(), True),
                     StructField('Location', StringType(), True),
                     StructField('RowID', StringType(), True)])

In [None]:
df = spark.read.csv("dbfs:/mnt/sf_open_data/fire_dept_calls_for_service/Fire_Department_Calls_for_Service.csv", header=True, schema = fireSchema)
df.cache()

In [None]:
display(df.head(5))

In [None]:
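# Feature transformers used in the modeling cells further down.
from pyspark.ml.feature import StringIndexer, VectorAssembler
# Column functions used for the date conversions and the year/month queries.
from pyspark.sql.functions import to_date, unix_timestamp, year, month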

pattern1 = "M/d/yyyy"
pattern2 = "M/d/yyyy h:m:s a"

In [None]:
df = df.withColumn("CallDateTS", to_date(df["CallDate"], pattern1)).drop(df.CallDate)
df = df.withColumn("WatchDateTS", to_date(df["WatchDate"], pattern1)).drop(df.WatchDate)
df = df.withColumn("ReceivedDateTS", unix_timestamp(df["ReceivedDtTm"], pattern2).cast("timestamp")).drop(df.ReceivedDtTm)
df = df.withColumn("EntryDateTS", unix_timestamp(df["EntryDtTm"], pattern2).cast("timestamp")).drop(df.EntryDtTm)
df = df.withColumn("DispatchDateTS", unix_timestamp(df["DispatchDtTm"], pattern2).cast("timestamp")).drop(df.DispatchDtTm)
df = df.withColumn("ResponseDateTS", unix_timestamp(df["ResponseDtTm"], pattern2).cast("timestamp")).drop(df.ResponseDtTm)
df = df.withColumn("OnSceneDateTS", unix_timestamp(df["OnSceneDtTm"], pattern2).cast("timestamp")).drop(df.OnSceneDtTm)
df = df.withColumn("TransportDateTS", unix_timestamp(df["TransportDtTm"], pattern2).cast("timestamp")).drop(df.TransportDtTm)
df = df.withColumn("HospitalDateTS", unix_timestamp(df["HospitalDtTm"], pattern2).cast("timestamp")).drop(df.HospitalDtTm)
df = df.withColumn("AvailableDateTS", unix_timestamp(df["AvailableDtTm"], pattern2).cast("timestamp")).drop(df.AvailableDtTm)
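
To make the two patterns concrete, here is a small plain-Python sketch of the same parsing step. The `strptime` formats are assumed equivalents of `pattern1` and `pattern2`, the sample strings are made up, and returning `None` for unparsable input is meant to mirror Spark producing null in its default (non-ANSI) mode.

```python
from datetime import datetime

# Assumed strptime equivalents of the Spark patterns used above.
DATE_FORMAT = "%m/%d/%Y"                   # pattern1: "M/d/yyyy"
TIMESTAMP_FORMAT = "%m/%d/%Y %I:%M:%S %p"  # pattern2: "M/d/yyyy h:m:s a"


def parse_call_date(text):
    """Parse a date-only string; return None when it does not match."""
    try:
        return datetime.strptime(text, DATE_FORMAT).date()
    except (TypeError, ValueError):
        return None


def parse_call_timestamp(text):
    """Parse a 12-hour timestamp string; return None when it does not match."""
    try:
        return datetime.strptime(text, TIMESTAMP_FORMAT)
    except (TypeError, ValueError):
        return None


# Made-up sample values in the same shape as the raw columns.
print(parse_call_date("01/11/2002"))
print(parse_call_timestamp("01/11/2002 01:51:44 PM"))
print(parse_call_timestamp(None))
```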

How many incidents were there at each zip code?

In [None]:
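# Each row is one unit response, so count distinct incident numbers per zip code.
display(df.select("ZipcodeofIncident", "IncidentNumber").distinct().groupBy("ZipcodeofIncident").count().orderBy("ZipcodeofIncident"))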

Show that all twelve months are accounted for in the call-date column of the Fire Service Calls data.

In [None]:
display(df.groupby(year('CallDateTS')).count().orderBy(year('CallDateTS')))

In [None]:
display(df.groupby(month('CallDateTS')).count().orderBy(month('CallDateTS')))

How many service calls were placed each month in 2001?

In [None]:
df.filter(year('CallDateTS') == 2001).groupBy(month('CallDateTS')).count().orderBy(month('CallDateTS')).show()

In [None]:
display(df.filter(year('CallDateTS') == 2001).groupBy(month('CallDateTS')).count().orderBy(month('CallDateTS')))
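
The filter, group, count, and sort chain above can be hard to read at first. The following plain-Python sketch performs the same steps on a handful of made-up dates; `calls_per_month` is a hypothetical helper, not part of the notebook.

```python
from collections import Counter
from datetime import date

# Made-up call dates standing in for the CallDateTS column.
call_dates = [
    date(2001, 1, 5),
    date(2001, 1, 20),
    date(2001, 2, 3),
    date(2002, 1, 1),
    None,  # rows whose date failed to parse
]


def calls_per_month(dates, year):
    """Count calls per month for one year, ordered by month number."""
    counts = Counter(d.month for d in dates if d is not None and d.year == year)
    return sorted(counts.items())


for month_number, count in calls_per_month(call_dates, 2001):
    print(month_number, count)
```

Months with no calls simply do not appear in the result, which is also how the grouped Spark query behaves.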

How many fires were there in each month of 2015? (Filter on Call Type Group == 'Fire' and add .show() to the end of your command.)

In [None]:
df.filter(year('CallDateTS') == 2015).filter(df['CallTypeGroup'] =="Fire").groupby(month('CallDateTS')).count()\
      .orderBy(month('CallDateTS')).show()
#display(df.filter(year('CallDateTS') == 2015).filter(df['CallTypeGroup'] =="Fire").groupby(month('CallDateTS')).count()\
#      .orderBy(month('CallDateTS')))

## Joins and Filters

In this problem, we will modify the dataset above to predict false alarms. The data first needs some preparation; in particular, every variable passed to the model must be numeric.

In [None]:
%fs ls /mnt/sf_open_data/fire_incidents

In [None]:
incidentsDF = spark.read.csv('dbfs:/mnt/sf_open_data/fire_incidents/Fire_Incidents.csv', header =True, inferSchema = True)\
                        .withColumnRenamed("Incident Number", "IncidentNumber")\
                        .cache()

In [None]:
incidentsDF.printSchema()

Join the fire data set with the incidents data on IncidentNumber so that we only include data that has an incident number in both tables.  Name the resulting table "DF".

In [None]:
DF = df.join(incidentsDF, "IncidentNumber").select(df["IncidentNumber"], df["OriginalPriority"], df["NumberofAlarms"], df['UnitID'], df['ZipcodeofIncident'], df['Battalion'], df['StationArea'], df['NeighborhoodDistrict'], incidentsDF["Primary Situation"], incidentsDF["Exposure Number"], incidentsDF["Fire Fatalities"], incidentsDF["Fire Injuries"]).dropDuplicates()
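
As a rough illustration of what the inner join and `dropDuplicates()` do, the sketch below joins two tiny made-up tables in plain Python. The helper names and the sample rows are hypothetical; only the idea (keep rows whose key appears in both tables, then remove exact duplicates) is taken from the cell above.

```python
def inner_join(left_rows, right_rows, key="IncidentNumber"):
    """Keep only combinations whose key value appears in both tables."""
    right_by_key = {}
    for row in right_rows:
        right_by_key.setdefault(row[key], []).append(row)
    joined = []
    for left in left_rows:
        for right in right_by_key.get(left[key], []):
            joined.append({**left, **right})
    return joined


def drop_duplicates(rows):
    """Remove rows that are identical in every column, keeping the first one."""
    seen, unique = set(), []
    for row in rows:
        fingerprint = tuple(sorted(row.items()))
        if fingerprint not in seen:
            seen.add(fingerprint)
            unique.append(row)
    return unique


# Made-up rows: incident 2 has no incident record, incident 3 has no call.
calls = [
    {"IncidentNumber": 1, "Battalion": "B02"},
    {"IncidentNumber": 1, "Battalion": "B02"},
    {"IncidentNumber": 2, "Battalion": "B10"},
]
incidents = [
    {"IncidentNumber": 1, "Fire Injuries": 0},
    {"IncidentNumber": 3, "Fire Injuries": 1},
]

result = drop_duplicates(inner_join(calls, incidents))
print(result)
```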

In [None]:
DF = DF.withColumn("FA", (DF["Primary Situation"] == '700 - false alarm or false call, other').cast('integer'))
#DF = DF.withColumn("FA", (DF["Primary Situation"].like('700 %')).cast('integer'))
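
The two labeling variants above (exact equality and the commented-out `like('700 %')`) can disagree. This plain-Python sketch, using made-up situation strings, shows how: the exact match is sensitive to the wording and casing of the description, while the prefix match only looks at the leading code. Returning `None` for missing values is meant to mirror Spark yielding null for a null input.

```python
FALSE_ALARM_LABEL = "700 - false alarm or false call, other"


def exact_match_label(situation):
    """1 only when the whole string equals the expected label."""
    if situation is None:
        return None
    return int(situation == FALSE_ALARM_LABEL)


def prefix_match_label(situation):
    """1 when the string starts with the code '700 ', like SQL LIKE '700 %'."""
    if situation is None:
        return None
    return int(situation.startswith("700 "))


# Hypothetical values; check the real column before choosing a variant.
samples = [
    "700 - false alarm or false call, other",
    "700 - False alarm or false call, other",
    "111 - building fire",
]
for text in samples:
    print(text, exact_match_label(text), prefix_match_label(text))
```

Checking the distinct values of the "Primary Situation" column first would show which variant fits the actual data.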

In [None]:
Battalion_indexer = StringIndexer(inputCol='Battalion', outputCol="BattIndex")
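
`StringIndexer` turns the string-valued Battalion column into numeric indices. By default it orders labels by descending frequency, so the most common label gets index 0. The plain-Python sketch below imitates that behavior on made-up values; breaking ties alphabetically is an assumption here, and `fit_string_index` is a hypothetical helper.

```python
from collections import Counter


def fit_string_index(values):
    """Map each label to an index, most frequent label first."""
    counts = Counter(values)
    # Sort by descending count; ties are broken alphabetically (assumption).
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(position) for position, label in enumerate(ordered)}


# Made-up battalion values.
battalions = ["B02", "B10", "B02", "B01", "B02", "B10"]

index = fit_string_index(battalions)
print(index)
print([index[b] for b in battalions])
```

Because the indices reflect frequency rather than any natural order, a linear model such as logistic regression will treat them as if they were ordered quantities, which is worth keeping in mind when interpreting the coefficients.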

In [None]:
assembler = VectorAssembler(inputCols= [ 'NumberofAlarms', 'ZipcodeofIncident', 'BattIndex', 'Exposure Number', 'Fire Fatalities', 'Fire Injuries'], outputCol="features")

In [None]:
from pyspark.ml.classification import LogisticRegression
log_reg = LogisticRegression(featuresCol='features', labelCol='FA')

In [None]:
from pyspark.ml import Pipeline

pipeline = Pipeline(stages=[Battalion_indexer, assembler, log_reg])

In [None]:
train, test = DF.randomSplit([0.7, 0.3])
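
A random split like the one above assigns each row independently, so the 70/30 proportions are approximate and the result changes between runs unless a seed is fixed. The following plain-Python sketch illustrates that idea only; it is not Spark's implementation, and `random_split` is a hypothetical helper.

```python
import random


def random_split(rows, train_fraction=0.7, seed=None):
    """Assign each row to train with probability train_fraction, else to test."""
    rng = random.Random(seed)
    train_rows, test_rows = [], []
    for row in rows:
        if rng.random() < train_fraction:
            train_rows.append(row)
        else:
            test_rows.append(row)
    return train_rows, test_rows


rows = list(range(1000))  # stand-in for the rows of DF
train_rows, test_rows = random_split(rows, 0.7, seed=42)

# The sizes are close to 700 and 300 but are not guaranteed to be exact.
print(len(train_rows), len(test_rows))
```

In the notebook itself, `randomSplit` also accepts a `seed` argument, for example `DF.randomSplit([0.7, 0.3], seed=42)`, which makes the split repeatable.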