In [1]:
from pyspark.sql.session import SparkSession
from pyspark.sql.context import SQLContext

import warnings
warnings.filterwarnings("ignore")

In [2]:
spark = SparkSession.builder.appName("dataframe_analysis").getOrCreate()

sqlcontext = SQLContext(spark)

In [3]:
sf_fire_calls_df = spark.read.csv("../data/fire_department_calls.csv", header = True, inferSchema = True)

In [4]:
sf_fire_calls_df.show(5)

+-----------+-------+---------------+-----------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+----------------------+-------------------+--------------------+----+-------------------+---------+------------+----+-----------------+--------+--------------+--------+---------------+----------------+---------+------------------------------+------------------------+-------------------+---------------------------------+-------------+--------------------+-------------------+-------------------+-------------------------+
|call_number|unit_id|incident_number|        call_type|          call_date|         watch_date|      received_dttm|         entry_dttm|      dispatch_dttm|      response_dttm|      on_scene_dttm|     transport_dttm|      hospital_dttm|call_final_disposition|     available_dttm|             address|city|zipcode_of_incident|battalion|station_

In [5]:
#Before doing any transformations, we can use the cache() method. 
#It will cache the dataframe into memory, and as i want to run several analysis transformations on the same dataframe
#It makes sense to cache the dataframe in memory and speed up my analysis execution

sf_fire_calls_df.cache()

DataFrame[call_number: int, unit_id: string, incident_number: int, call_type: string, call_date: timestamp, watch_date: timestamp, received_dttm: timestamp, entry_dttm: timestamp, dispatch_dttm: timestamp, response_dttm: timestamp, on_scene_dttm: timestamp, transport_dttm: timestamp, hospital_dttm: timestamp, call_final_disposition: string, available_dttm: timestamp, address: string, city: string, zipcode_of_incident: int, battalion: string, station_area: int, box: int, original_priority: string, priority: string, final_priority: int, als_unit: boolean, call_type_group: string, number_of_alarms: int, unit_type: string, unit_sequence_in_call_dispatch: int, fire_prevention_district: string, supervisor_district: string, neighborhoods_analysis_boundaries: string, rowid: string, case_location: string, data_as_of: timestamp, data_loaded_at: timestamp, computed_region_ajp5_b2md: int]

Q1. How many distint types of calls were made to the FIRE DEPARTMENT?

In [6]:
q1_df = sf_fire_calls_df.where("call_type is not null")\
                        .select("call_type")\
                        .distinct()

#count() is an action, it is an operation that kick off a Spark job execution. It does not create a newly transform df
answer_q1 = q1_df.count()

print(answer_q1)

18


Q2. What were distinct types of calls made to the FIRE DEPARTMENT ?

In [7]:
#if we want to give an alias to the columns we are selecting, we must slice the df
q2_df = sf_fire_calls_df.where("call_type is not null")\
                        .select(sf_fire_calls_df["call_type"].alias("disctinct_call_type"))\
                        .distinct()

q2_df.show()

+--------------------+
| disctinct_call_type|
+--------------------+
|Elevator / Escala...|
|Structure Fire / ...|
|      Administrative|
|              Alarms|
|Odor (Strange / U...|
|Citizen Assist / ...|
|           Explosion|
|        Vehicle Fire|
|               Other|
|        Outside Fire|
|   Traffic Collision|
|Gas Leak (Natural...|
|        Water Rescue|
|   Electrical Hazard|
|Industrial Accidents|
|    Medical Incident|
|Smoke Investigati...|
|Train / Rail Inci...|
+--------------------+



Q3. What were the most common call types?

In [8]:
q3_df = sf_fire_calls_df.select("call_type")\
                        .where("call_type is not null")\
                        .groupBy("call_type").count().withColumnRenamed("count", "n#_of_call_types")\
                        .orderBy("count", ascending=False)

q3_df.show()

+--------------------+----------------+
|           call_type|n#_of_call_types|
+--------------------+----------------+
|    Medical Incident|             600|
|Structure Fire / ...|             147|
|              Alarms|             112|
|   Traffic Collision|              72|
|Citizen Assist / ...|              18|
|               Other|              13|
|        Vehicle Fire|               8|
|Odor (Strange / U...|               7|
|Gas Leak (Natural...|               6|
|        Outside Fire|               4|
|Smoke Investigati...|               4|
|        Water Rescue|               2|
|   Electrical Hazard|               2|
|Elevator / Escala...|               1|
|           Explosion|               1|
|      Administrative|               1|
|Industrial Accidents|               1|
|Train / Rail Inci...|               1|
+--------------------+----------------+



Q4. What zipcodes accounted for most common calls ?

In [11]:
q4_df = sf_fire_calls_df.select("zipcode_of_incident")\
                        .where("zipcode_of_incident is not null")\
                        .groupBy("zipcode_of_incident").count().withColumnRenamed("count","n#_of_zipcodes")\
                        .orderBy("count", ascending = False)

q4_df.show()

+-------------------+--------------+
|zipcode_of_incident|n#_of_zipcodes|
+-------------------+--------------+
|              94103|           120|
|              94102|           108|
|              94109|            95|
|              94110|            93|
|              94124|            53|
|              94107|            49|
|              94112|            45|
|              94133|            39|
|              94115|            37|
|              94122|            37|
|              94118|            33|
|              94134|            31|
|              94121|            29|
|              94132|            28|
|              94117|            26|
|              94114|            25|
|              94116|            24|
|              94108|            24|
|              94131|            24|
|              94105|            21|
+-------------------+--------------+
only showing top 20 rows



Q5. What San Francisco neighborhoods are in zip codes 94102 and 94103?

In [13]:
q5_df = sf_fire_calls_df.select("zipcode_of_incident","neighborhoods_analysis_boundaries")\
                        .where("zipcode_of_incident in (94102,94103)")

q5_df.show()

+-------------------+---------------------------------+
|zipcode_of_incident|neighborhoods_analysis_boundaries|
+-------------------+---------------------------------+
|              94102|                     Hayes Valley|
|              94102|                       Tenderloin|
|              94103|             Financial Distric...|
|              94102|                       Tenderloin|
|              94103|                  South of Market|
|              94103|                  South of Market|
|              94102|                 Western Addition|
|              94103|                  South of Market|
|              94102|                       Tenderloin|
|              94102|                     Hayes Valley|
|              94103|                          Mission|
|              94102|                     Hayes Valley|
|              94103|                  South of Market|
|              94103|                  South of Market|
|              94102|                       Tend