# Analyzing SF Fire report using AWS EMR and Spark

## Steps 

1. Create an EMR cluster in the AWS console

Open the Amazon EMR console at https://console.aws.amazon.com/elasticmapreduce/

[Steps to create an EMR cluster](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-gs-launch-sample-cluster.html)

[Steps to create a Jupyter notebook](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-managed-notebooks-create.html)

An S3 bucket is created along with the EMR cluster. You can specify an existing bucket name if you already have one; otherwise a new S3 bucket is created.

2. Upload the data file to S3
    - Launch an EC2 instance
    - Configure the AWS CLI
    
            aws configure
    
        Provide the access key ID, secret access key, and region name (the same region as the EC2 instance).
        
    - Download the file from its URL to the EC2 instance's local disk
    
            wget 'https://data.sfgov.org/api/views/nuek-vuh3/rows.csv?accessType=DOWNLOAD' -O sffire.csv

    - Upload the file to S3 using the CLI
    
            aws s3 cp sffire.csv s3://bucketname/sffire.csv
3. Upload this notebook to your Jupyter server and execute the cells
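
Step 2 can also be scripted. The sketch below only assembles the same `wget` and `aws s3 cp` invocations listed in that step and prints them. `build_commands`, `run_all`, and the `dry_run` flag are hypothetical names introduced for illustration, and `bucketname` is the placeholder used in the step itself. With `dry_run=False` it assumes `wget` and a configured AWS CLI are available on the machine.

```python
import shlex
import subprocess

# Download URL taken from step 2.
DATA_URL = "https://data.sfgov.org/api/views/nuek-vuh3/rows.csv?accessType=DOWNLOAD"


def build_commands(bucket, local_file="sffire.csv", key="sffire.csv"):
    """Return the download and upload commands from step 2 as argument lists."""
    download = ["wget", DATA_URL, "-O", local_file]
    upload = ["aws", "s3", "cp", local_file, f"s3://{bucket}/{key}"]
    return [download, upload]


def run_all(bucket, dry_run=True):
    """Print each command; execute it only when dry_run is False."""
    for cmd in build_commands(bucket):
        print(shlex.join(cmd))
        if not dry_run:
            subprocess.run(cmd, check=True)


if __name__ == "__main__":
    # Placeholder bucket name; nothing is executed in dry-run mode.
    run_all("bucketname")
```

Keeping the commands as argument lists avoids shell quoting problems with the `?` in the download URL.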


In [97]:
import os

In [9]:
df = spark.read.csv('s3://aws-emr-resources-504809190933-us-west-2/Data/sffire.csv',header=True,inferSchema=True)

In [10]:
df.show(1)

+-----------+-------+---------------+---------+----------+----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------+-------------+----------------------+--------------------+-------------------+-------------+-------------------+---------+------------+----+-----------------+--------+--------------+--------+---------------+----------------+---------+------------------------------+------------------------+-------------------+------------------------------------+--------------------+-------------+
|Call Number|Unit ID|Incident Number|Call Type| Call Date|Watch Date|       Received DtTm|          Entry DtTm|       Dispatch DtTm|       Response DtTm|       On Scene DtTm|Transport DtTm|Hospital DtTm|Call Final Disposition|      Available DtTm|            Address|         City|Zipcode of Incident|Battalion|Station Area| Box|Original Priority|Priority|Final Priority|ALS Unit|Call Type Group|Number of Alarms|Unit Type|Unit 

In [18]:
for s in str(df.schema).split('),'):
    print(s)

StructType(List(StructField(Call Number,IntegerType,true
StructField(Unit ID,StringType,true
StructField(Incident Number,IntegerType,true
StructField(Call Type,StringType,true
StructField(Call Date,StringType,true
StructField(Watch Date,StringType,true
StructField(Received DtTm,StringType,true
StructField(Entry DtTm,StringType,true
StructField(Dispatch DtTm,StringType,true
StructField(Response DtTm,StringType,true
StructField(On Scene DtTm,StringType,true
StructField(Transport DtTm,StringType,true
StructField(Hospital DtTm,StringType,true
StructField(Call Final Disposition,StringType,true
StructField(Available DtTm,StringType,true
StructField(Address,StringType,true
StructField(City,StringType,true
StructField(Zipcode of Incident,IntegerType,true
StructField(Battalion,StringType,true
StructField(Station Area,StringType,true
StructField(Box,StringType,true
StructField(Original Priority,StringType,true
StructField(Priority,StringType,true
StructField(Final Priority,IntegerType,true
Struc

# Tasks

1. What were all the different types of fire calls in 2018?

2. Which months in 2018 saw the highest number of fire calls?

3. Which neighborhood in SF generated the most fire calls in 2018?

4. Which neighborhoods in SF had the worst response time to fire calls in 2018?

5. Which week of 2018 had the most fire calls?

6. Is there a correlation between neighborhood, zip code, and fire calls?

7. How can we use Parquet files or SQL tables to store this data and read it back?

In [21]:
df = df.cache()

In [22]:
df.select('call type').distinct().show()

+--------------------+
|           call type|
+--------------------+
|Elevator / Escala...|
|         Marine Fire|
|  Aircraft Emergency|
|Confined Space / ...|
|      Administrative|
|              Alarms|
|Odor (Strange / U...|
|Lightning Strike ...|
|Citizen Assist / ...|
|              HazMat|
|Watercraft in Dis...|
|           Explosion|
|           Oil Spill|
|        Vehicle Fire|
|  Suspicious Package|
|   Train / Rail Fire|
|Extrication / Ent...|
|               Other|
|        Outside Fire|
|   Traffic Collision|
+--------------------+
only showing top 20 rows

In [71]:
sdf = df.select('call type','call date','Neighborhooods - Analysis Boundaries','Response DtTm','Zipcode of Incident','Received DtTm')

In [72]:
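# Note: max here is Spark's aggregate function and shadows Python's built-in max.
from pyspark.sql.functions import avg, col, max, month, to_timestamp, weekofyear, year

# Parse the string date columns into timestamps and keep only calls made in 2018.
sdf = (sdf
       .withColumn('call_date', to_timestamp(col('call date'), 'MM/dd/yyyy')).drop('call date')
       .withColumn('res_date', to_timestamp(col('Response DtTm'), 'MM/dd/yyyy hh:mm:ss')).drop('Response DtTm')
       .withColumn('recv_date', to_timestamp(col('Received DtTm'), 'MM/dd/yyyy hh:mm:ss')).drop('Received DtTm')
       .filter(year('call_date') == 2018))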
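
In Spark datetime patterns, `hh` is the 12-hour clock hour, `HH` is the 24-hour hour of day, and `a` is the AM/PM marker. If the raw `Response DtTm` and `Received DtTm` strings carry an AM/PM suffix (an assumption; the raw strings are not shown in this notebook), a pattern ending in `hh:mm:ss` cannot tell morning from afternoon, and `'MM/dd/yyyy hh:mm:ss a'` would be the matching pattern. The sketch below uses Python's `strptime` as a stand-in, with `%I` playing the role of `hh` and `%p` the role of `a`. The sample strings are hypothetical, and Spark's own parser may handle trailing text differently depending on the version.

```python
from datetime import datetime

# Python equivalents of the Spark patterns (the mapping is for illustration only).
WITH_MARKER = "%m/%d/%Y %I:%M:%S %p"  # Spark: 'MM/dd/yyyy hh:mm:ss a'
NO_MARKER = "%m/%d/%Y %I:%M:%S"       # Spark: 'MM/dd/yyyy hh:mm:ss'


def parse(text, fmt):
    """Parse a timestamp string with the given strptime format."""
    return datetime.strptime(text, fmt)


# Hypothetical sample values; check the raw CSV for the real layout.
afternoon = parse("02/01/2018 03:28:30 PM", WITH_MARKER)
ambiguous = parse("02/01/2018 03:28:30", NO_MARKER)
midnight = parse("02/02/2018 12:37:53", NO_MARKER)

print(afternoon.hour)  # 15: the marker moves the hour into the afternoon
print(ambiguous.hour)  # 3: without a marker the hour is read as AM
print(midnight.hour)   # 0: 12 on a 12-hour clock with no marker becomes midnight
```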

In [73]:
sdf=sdf.cache()
sdf.show(10)

+----------------+------------------------------------+-------------------+-------------------+-------------------+-------------------+
|       call type|Neighborhooods - Analysis Boundaries|Zipcode of Incident|          call_date|           res_date|          recv_date|
+----------------+------------------------------------+-------------------+-------------------+-------------------+-------------------+
|          Alarms|                          Tenderloin|              94109|2018-02-02 00:00:00|2018-02-02 00:37:53|2018-02-02 00:33:20|
|Medical Incident|                             Mission|              94110|2018-02-01 00:00:00|2018-02-01 02:42:25|2018-02-01 02:39:08|
|Medical Incident|                      Haight Ashbury|              94117|2018-02-01 00:00:00|2018-02-01 03:07:42|2018-02-01 03:03:58|
|Medical Incident|                Financial Distric...|              94111|2018-02-01 00:00:00|2018-02-01 03:28:30|2018-02-01 03:25:11|
|Medical Incident|                     South of 

## Q.1 What were all the different types of fire calls in 2018?

In [50]:
sdf.select('call type').distinct().show(truncate=False)

+--------------------------------------------+
|call type                                   |
+--------------------------------------------+
|Elevator / Escalator Rescue                 |
|Marine Fire                                 |
|Confined Space / Structure Collapse         |
|Administrative                              |
|Alarms                                      |
|Odor (Strange / Unknown)                    |
|Citizen Assist / Service Call               |
|HazMat                                      |
|Watercraft in Distress                      |
|Explosion                                   |
|Vehicle Fire                                |
|Suspicious Package                          |
|Train / Rail Fire                           |
|Extrication / Entrapped (Machinery, Vehicle)|
|Other                                       |
|Outside Fire                                |
|Traffic Collision                           |
|Assist Police                               |
|Gas Leak (Na

## Q.2 Which months in 2018 saw the highest number of fire calls?

In [56]:
# Highest monthly call count in 2018 (max is pyspark.sql.functions.max, not the built-in).
maxcall = sdf.groupBy(month('call_date')).count().agg(max('count')).first()[0]

In [65]:
sdf.groupBy(month('call_date')).count().orderBy('month(call_date)').show()

sdf.groupBy(month('call_date')).count().filter(col('count')==maxcall).show()

+----------------+-----+
|month(call_date)|count|
+----------------+-----+
|               1|27027|
|               2|24252|
|               3|26606|
|               4|25565|
|               5|26297|
|               6|26189|
|               7|25964|
|               8|25341|
|               9|24602|
|              10|26536|
|              11|26307|
|              12|26014|
+----------------+-----+

+----------------+-----+
|month(call_date)|count|
+----------------+-----+
|               1|27027|
+----------------+-----+
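
The two cells above first compute the maximum monthly count and then filter on it. Unlike sorting and taking the first row, that approach returns every month that ties for the maximum. The same logic in plain Python, using the counts from the first table, might look like this (`months_with_max` is a hypothetical helper name):

```python
# Monthly call counts copied from the groupBy output above.
monthly_counts = {
    1: 27027, 2: 24252, 3: 26606, 4: 25565, 5: 26297, 6: 26189,
    7: 25964, 8: 25341, 9: 24602, 10: 26536, 11: 26307, 12: 26014,
}


def months_with_max(counts):
    """Return all months whose count equals the maximum, in ascending order."""
    top = max(counts.values())
    return sorted(m for m, c in counts.items() if c == top)


print(months_with_max(monthly_counts))  # [1]
```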

## Q.3 Which neighborhood in SF generated the most fire calls in 2018?

In [70]:
sdf.groupBy('Neighborhooods - Analysis Boundaries').count().sort('count',ascending=False).take(1)

[Row(Neighborhooods - Analysis Boundaries='Tenderloin', count=43894)]

## Q.4 Which neighborhoods in SF had the worst response time to fire calls in 2018?

In [83]:
resp_time_df=sdf.withColumn('res_time',col('res_date').cast("long")-col('recv_date').cast("long"))
resp_time_df.select('Neighborhooods - Analysis Boundaries','res_time').sort('res_time',ascending=False).show()

+------------------------------------+--------+
|Neighborhooods - Analysis Boundaries|res_time|
+------------------------------------+--------+
|                  West of Twin Peaks|   88445|
|                             Mission|   87956|
|                           Chinatown|   87292|
|                             Mission|   83365|
|                  West of Twin Peaks|   75599|
|                           Chinatown|   73617|
|                           Chinatown|   72687|
|                           Chinatown|   72676|
|                           Chinatown|   72602|
|                           Chinatown|   72348|
|                     Sunset/Parkside|   70644|
|                Financial Distric...|   68384|
|                Financial Distric...|   68378|
|                Financial Distric...|   68378|
|                Financial Distric...|   68378|
|                Financial Distric...|   68378|
|                Financial Distric...|   68378|
|                Financial Distric...|  

In [86]:
resp_time_df.groupBy('Neighborhooods - Analysis Boundaries').agg(avg('res_time')).sort('avg(res_time)',ascending=False).show()

+------------------------------------+------------------+
|Neighborhooods - Analysis Boundaries|     avg(res_time)|
+------------------------------------+------------------+
|                     Treasure Island| 695.8151029748284|
|                            Presidio|430.21208141825343|
|                             Portola| 328.8081374956156|
|                        Russian Hill| 308.4644152311877|
|                Financial Distric...| 283.6726095402547|
|                     Sunset/Parkside|268.83954664668414|
|                      Bernal Heights|247.84918032786885|
|                Oceanview/Merced/...|244.77176948464077|
|                           Chinatown|240.81956299078183|
|                              Marina|239.95751469353485|
|                          Noe Valley|226.03992395437263|
|                 Castro/Upper Market| 224.5785999003488|
|                             Mission|224.21023607176582|
|                Bayview Hunters P...| 213.5516316787653|
|             
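
Casting a Spark timestamp to `long` yields seconds since the Unix epoch, so `res_time` is the gap between `recv_date` and `res_date` in seconds. As a sanity check, the sketch below recomputes the value for the first row of the `sdf.show(10)` output with the standard library and converts the Treasure Island average to minutes. `response_seconds` is a hypothetical helper name.

```python
from datetime import datetime

FMT = "%Y-%m-%d %H:%M:%S"


def response_seconds(received, responded):
    """Seconds elapsed between the received and response timestamps."""
    delta = datetime.strptime(responded, FMT) - datetime.strptime(received, FMT)
    return int(delta.total_seconds())


# First row of the sdf.show(10) output (an Alarms call in the Tenderloin).
secs = response_seconds("2018-02-02 00:33:20", "2018-02-02 00:37:53")
print(secs)  # 273

# Average for Treasure Island from the table above, expressed in minutes.
print(round(695.8151029748284 / 60, 1))  # 11.6
```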

## Q.5 Which week of 2018 had the most fire calls?

In [89]:
sdf.groupBy(weekofyear('call_date')).count().sort('count',ascending=False).show()

+---------------------+-----+
|weekofyear(call_date)|count|
+---------------------+-----+
|                    1| 7545|
|                   25| 6425|
|                   49| 6354|
|                   22| 6328|
|                   13| 6321|
|                   27| 6289|
|                   40| 6252|
|                   44| 6250|
|                   16| 6217|
|                   46| 6209|
|                   43| 6200|
|                    5| 6160|
|                   18| 6152|
|                   48| 6142|
|                    2| 6109|
|                    9| 6079|
|                   21| 6073|
|                   45| 6050|
|                    6| 6025|
|                    8| 6014|
+---------------------+-----+
only showing top 20 rows
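
One caveat when reading this table: Spark's `weekofyear` follows ISO 8601, where weeks start on Monday and week 1 is the first week with more than three days in the year. Under that rule, 31 December 2018 belongs to week 1 (of ISO year 2019), so the week 1 bucket here may cover eight calendar days of 2018 instead of seven, which could partly explain its higher count. The standard-library check below shows the ISO numbering; `iso_year_week` is a hypothetical helper name.

```python
from datetime import date


def iso_year_week(d):
    """Return the (ISO year, ISO week number) pair for a date."""
    iso = d.isocalendar()
    return (iso[0], iso[1])


print(iso_year_week(date(2018, 1, 1)))    # (2018, 1)
print(iso_year_week(date(2018, 12, 30)))  # (2018, 52)
print(iso_year_week(date(2018, 12, 31)))  # (2019, 1)
```

Grouping by both the ISO year and the week number, rather than the week number alone, would keep such boundary days separate.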

## Q.7 How can we use Parquet files or SQL tables to store this data and read it back?

In [94]:
sdf_new = (sdf.withColumnRenamed('call type','call_type')
.withColumnRenamed('Neighborhooods - Analysis Boundaries','Neighborhooods')
.withColumnRenamed('Zipcode of Incident','Zipcode'))

In [95]:

parquet_path = 's3://aws-emr-resources-504809190933-us-west-2/Data/SFFire.parquet'
sdf_new.write.format('parquet').save(parquet_path)

In [96]:
spark.read.parquet(parquet_path).show()

+----------------+--------------------+-------+-------------------+-------------------+-------------------+
|       call_type|      Neighborhooods|Zipcode|          call_date|           res_date|          recv_date|
+----------------+--------------------+-------+-------------------+-------------------+-------------------+
|          Alarms|          Tenderloin|  94109|2018-02-02 00:00:00|2018-02-02 00:37:53|2018-02-02 00:33:20|
|Medical Incident|             Mission|  94110|2018-02-01 00:00:00|2018-02-01 02:42:25|2018-02-01 02:39:08|
|Medical Incident|      Haight Ashbury|  94117|2018-02-01 00:00:00|2018-02-01 03:07:42|2018-02-01 03:03:58|
|Medical Incident|Financial Distric...|  94111|2018-02-01 00:00:00|2018-02-01 03:28:30|2018-02-01 03:25:11|
|Medical Incident|     South of Market|  94103|2018-02-01 00:00:00|2018-02-01 03:46:29|2018-02-01 03:44:30|
|Medical Incident|Bayview Hunters P...|  94124|2018-02-01 00:00:00|2018-02-01 03:57:24|2018-02-01 03:53:31|
|Medical Incident|      Oute