In [1]:
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Fire Example').getOrCreate()

In [2]:
# import packages containing builtin functions and datatypes
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [3]:
df = spark.read.csv('D:/Dataset/sf-fire-calls.csv', header = True, inferSchema = True)
df.printSchema()

root
 |-- CallNumber: integer (nullable = true)
 |-- UnitID: string (nullable = true)
 |-- IncidentNumber: integer (nullable = true)
 |-- CallType: string (nullable = true)
 |-- CallDate: string (nullable = true)
 |-- WatchDate: string (nullable = true)
 |-- CallFinalDisposition: string (nullable = true)
 |-- AvailableDtTm: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Zipcode: integer (nullable = true)
 |-- Battalion: string (nullable = true)
 |-- StationArea: string (nullable = true)
 |-- Box: string (nullable = true)
 |-- OriginalPriority: string (nullable = true)
 |-- Priority: string (nullable = true)
 |-- FinalPriority: integer (nullable = true)
 |-- ALSUnit: boolean (nullable = true)
 |-- CallTypeGroup: string (nullable = true)
 |-- NumAlarms: integer (nullable = true)
 |-- UnitType: string (nullable = true)
 |-- UnitSequenceInCallDispatch: integer (nullable = true)
 |-- FirePreventionDistrict: string (nullable = true)
 

In [5]:
# Data cleaning - Select only required columns using select() and storing as new dataframe
clean_df = df.select('CallType', 'CallDate', 'City', 'Zipcode', 'Neighborhood', 'Delay' )

# Add a new column 'Date' using method 'withColumn()'.
# It takes two parameters, new column name and values to that column. Here we pass 'to_date()' method
# as second argument. to_date() takes two parameters. First is the date in string datatype and second 
# is the required format in date datatype.

clean_df = clean_df.withColumn('Date', to_date(col('CallDate'), 'MM/dd/yyyy') )
clean_df.printSchema()

# Drop the original column, 'CallDate' (date) in stringtype from dataframe
clean_df = clean_df.drop('CallDate')
clean_df.printSchema()

root
 |-- CallType: string (nullable = true)
 |-- CallDate: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Zipcode: integer (nullable = true)
 |-- Neighborhood: string (nullable = true)
 |-- Delay: double (nullable = true)
 |-- Date: date (nullable = true)

root
 |-- CallType: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Zipcode: integer (nullable = true)
 |-- Neighborhood: string (nullable = true)
 |-- Delay: double (nullable = true)
 |-- Date: date (nullable = true)



In [6]:
# Add new columns, 'Year', 'Month' and 'Week' to existing dataframe

clean_df = clean_df.withColumn('Year', year(col('Date')))\
    .withColumn('Month', month(col('Date')))\
    .withColumn('Week', weekofyear(col('Date')))
clean_df.show()

+----------------+----+-------+--------------------+---------+----------+----+-----+----+
|        CallType|City|Zipcode|        Neighborhood|    Delay|      Date|Year|Month|Week|
+----------------+----+-------+--------------------+---------+----------+----+-----+----+
|  Structure Fire|  SF|  94109|     Pacific Heights|     2.95|2002-01-11|2002|    1|   2|
|Medical Incident|  SF|  94124|Bayview Hunters P...|      4.7|2002-01-11|2002|    1|   2|
|Medical Incident|  SF|  94102|          Tenderloin|2.4333334|2002-01-11|2002|    1|   2|
|    Vehicle Fire|  SF|  94110|      Bernal Heights|      1.5|2002-01-11|2002|    1|   2|
|          Alarms|  SF|  94109|    Western Addition|3.4833333|2002-01-11|2002|    1|   2|
|  Structure Fire|  SF|  94105|Financial Distric...|     1.75|2002-01-11|2002|    1|   2|
|          Alarms|  SF|  94112|Oceanview/Merced/...|2.7166667|2002-01-11|2002|    1|   2|
|          Alarms|  SF|  94102|          Tenderloin|1.7833333|2002-01-11|2002|    1|   2|
|Medical I

In [8]:
# Add new column 'Season' by user defined function

# creating normal python function. In this case mapSeason(). Input to this function is values
# from column Month. Output of this is season name (StringType)

def mapSeason(data):
    if 2 < data < 6:
        return 'Spring'
    elif 5 < data < 9:
        return 'Summer'
    elif 8 < data < 12 :
        return 'Autumn'
    else:
        return 'Winter'
    
# Register python function as UDF (user defined function) using built-in function udf()
# udf() takes 2 parameter. First is python function and second is return type of function.

seasons_udf = udf(mapSeason, StringType())

# Call the user defined function (season_udf()) and create new column 'Season'
clean_df = clean_df.withColumn('Season', seasons_udf(col('Month')))
clean_df.filter(col('Month') == 7).show()

+--------------------+----+-------+--------------------+----------+----------+----+-----+----+------+
|            CallType|City|Zipcode|        Neighborhood|     Delay|      Date|Year|Month|Week|Season|
+--------------------+----+-------+--------------------+----------+----------+----+-----+----+------+
|    Medical Incident|  SF|  94110|             Mission| 2.4833333|2002-07-01|2002|    7|  27|Summer|
|    Medical Incident|  SF|  94102|          Tenderloin|       4.3|2002-07-01|2002|    7|  27|Summer|
|    Medical Incident|  SF|  94103|     South of Market|      6.55|2002-07-01|2002|    7|  27|Summer|
|    Medical Incident|  SF|  94114| Castro/Upper Market|      2.45|2002-07-01|2002|    7|  27|Summer|
|    Medical Incident|  SF|  94122|     Sunset/Parkside| 3.2666667|2002-07-01|2002|    7|  27|Summer|
|    Medical Incident|  SF|  94109|        Russian Hill| 3.1333334|2002-07-01|2002|    7|  27|Summer|
|    Medical Incident|  SF|  94109|          Tenderloin|       8.1|2002-07-01|2002

In [9]:
# 1.	Get yearly count of fire calls
clean_df.select('Year')\
    .groupBy('Year')\
    .count()\
    .orderBy('Year').show()

+----+-----+
|Year|count|
+----+-----+
|2000| 5459|
|2001| 7713|
|2002| 8090|
|2003| 8499|
|2004| 8283|
|2005| 8282|
|2006| 8174|
|2007| 8255|
|2008| 8869|
|2009| 8789|
|2010| 9341|
|2011| 9735|
|2012| 9674|
|2013|10020|
|2014|10775|
|2015|11458|
|2016|11609|
|2017|12135|
|2018|10136|
+----+-----+



In [10]:
# 2.	What were all the different types of fire calls in 2018?
clean_df.select('CallType')\
    .where(col('Year') == 2018)\
    .distinct().show(truncate=False)

+-------------------------------+
|CallType                       |
+-------------------------------+
|Elevator / Escalator Rescue    |
|Alarms                         |
|Odor (Strange / Unknown)       |
|Citizen Assist / Service Call  |
|HazMat                         |
|Explosion                      |
|Vehicle Fire                   |
|Suspicious Package             |
|Other                          |
|Outside Fire                   |
|Traffic Collision              |
|Assist Police                  |
|Gas Leak (Natural and LP Gases)|
|Water Rescue                   |
|Electrical Hazard              |
|Structure Fire                 |
|Medical Incident               |
|Fuel Spill                     |
|Smoke Investigation (Outside)  |
|Train / Rail Incident          |
+-------------------------------+



In [None]:
# 3.	Which week in the year in 2018 had the most fire calls?
clean_df.select('Week')\
    .where(col('Year') == 2018)\
    .groupBy('Week')\
    .count()\
    .orderBy('count', ascending=False).collect()

In [None]:
# 4.	Get monthly count of fire calls based on year
clean_df.select('Month')\
    .where(col('Year') == 2017)\
    .groupBy('Month')\
    .count()\
    .orderBy('Month').show()

In [None]:
# 5.	Give monthly report of fire call types for selected year
clean_df.select('Month', 'CallType')\
    .where(col('Year') == 2017)\
    .groupBy('Month', 'CallType')\
    .count()\
    .orderBy('Month').show()

In [None]:
# 6.	Give top five fire call types for every season of selected year (seasons are like Spring, summer, fall winter etc).
from pyspark.sql.window import Window

season_df = clean_df.select('Season', 'CallType')\
    .where(col('Year') == 2017)\
    .groupBy('Season', 'CallType')\
    .count()

windowPartition = Window.partitionBy('Season').orderBy(col("count").desc())
season_df = season_df.withColumn('Rank', rank().over(windowPartition))\
    .where(col('Rank') <= 5)
season_df.show()

In [None]:
# 8.	What months within the year 2018 saw the highest number of fire calls?
clean_df.select('Month').filter(col('Year') == 2018).groupBy('Month').count().orderBy(col('count').desc()).show()

In [None]:
# 9.	Find which type of fire call is major calltype in each year
maj_fire = clean_df.select('CallType', 'Year')\
    .groupBy('Year','CallType')\
    .count().orderBy('Year', col('count').desc())
maj_fire.show()

In [None]:
win = Window.partitionBy('Year').orderBy(col("count").desc())
maj_fire.withColumn('Rank',rank().over(win)).where(col('Rank') ==1).orderBy('Year').show()

In [None]:
# 10.	Find out average delay in response for each call type
avg_delay = clean_df.select('CallType', 'Delay')\
    .groupBy('CallType')\
    .agg(avg('Delay').alias('Avg Delay')).orderBy(col('Avg Delay').desc())
avg_delay.show(30, truncate=False)

In [None]:
#11.	Find which calltype has maximum average delay time.
avg_delay.collect()[0][0]

In [None]:
# 12.	Which neighborhood in San Francisco generated the most fire calls in 2018?
clean_df.select('City', 'Year','Neighborhood')\
    .filter((col('year') == 2018) & (col('City') == 'San Francisco'))\
    .groupBy('Neighborhood').count().orderBy(col('count').desc()).collect()[0][0]