Spark program to analyze the San Francisco Fire Department calls dataset

Importing libraries

In [None]:
from pyspark.sql.types import *
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql import functions as F

In [None]:
# Create the SparkSession for this notebook, or reuse one that is already running.
spark = (
    SparkSession.builder
    .appName("SF_Fire")
    .getOrCreate()
)

In [None]:
# Explicit schema for the fire-calls CSV. Spark applies a user-supplied CSV
# schema to the file's columns by position, so this order must match the
# column order in the file. Every field is declared nullable. The date/time
# columns are read as plain strings.
fire_schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in [
        ("CallNumber", IntegerType()),
        ("UnitID", StringType()),
        ("IncidentNumber", IntegerType()),
        ("CallType", StringType()),
        ("CallDate", StringType()),
        ("WatchDate", StringType()),
        ("CallFinalDisposition", StringType()),
        ("AvailableDtTm", StringType()),
        ("Address", StringType()),
        ("City", StringType()),
        ("Zipcode", IntegerType()),
        ("Battalion", StringType()),
        ("StationArea", StringType()),
        ("Box", StringType()),
        ("OriginalPriority", StringType()),
        ("Priority", StringType()),
        ("FinalPriority", IntegerType()),
        ("ALSUnit", BooleanType()),
        ("CallTypeGroup", StringType()),
        ("NumAlarms", IntegerType()),
        ("UnitType", StringType()),
        ("UnitSequenceInCallDispatch", IntegerType()),
        ("FirePreventionDistrict", StringType()),
        ("SupervisorDistrict", StringType()),
        ("Neighborhood", StringType()),
        ("Location", StringType()),
        ("RowID", StringType()),
        ("Delay", FloatType()),
    ]
])

In [None]:
# Load the CSV into a DataFrame using the explicit `fire_schema`. The file's
# first line is treated as a header row rather than data.
sf_fire_file = "file:///SparkCourse/Fire-data.csv"
fire_df = (
    spark.read
    .option("header", "true")
    .schema(fire_schema)
    .csv(sf_fire_file)
)

In [None]:
# Incidents whose call type is anything other than "Medical Incident".
# Rows with a null CallType are also dropped, because `!=` on null is not true.
few_fire_df = (
    fire_df
    .select("IncidentNumber", "AvailableDtTm", "CallType")
    .filter(F.col("CallType") != "Medical Incident")
)
few_fire_df.show(5, truncate=False)

In [None]:
# Distinct call types, ignoring rows with no CallType. The cell prints the
# count of distinct values, a preview of the full DataFrame, and then a
# sample of the distinct values themselves.
call_types_df = fire_df.select("CallType").where(F.col("CallType").isNotNull())
call_types_df.agg(F.countDistinct("CallType").alias("Distinct CallTypes")).show()
fire_df.show(5, truncate=False)
call_types_df.distinct().show(5, truncate=False)

In [None]:
# Give the "Delay" column a self-describing name, then show responses whose
# delay exceeded 5 (minutes, per the new column name).
delay_col_name = "ResponseDelayedinMins"
new_fire_df = fire_df.withColumnRenamed("Delay", delay_col_name)
(new_fire_df
    .select(delay_col_name)
    .where(F.col(delay_col_name) > 5)
    .show(5))

In [None]:
# Replace the string date/time columns with parsed timestamp columns.
# The three new columns are appended after the remaining original columns,
# in the order IncidentDate, OnWatchDate, AvailableDtTS.
# NOTE(review): in Spark's datetime patterns "h" is the 1-12 clock hour.
# If AvailableDtTm uses a 24-hour clock, "H" is the correct letter. Confirm
# both patterns against the actual CSV contents; values that do not match
# parse to null.
fire_ts_df = (
    new_fire_df
    .withColumn("IncidentDate", F.to_timestamp(F.col("CallDate"), "M/d/y"))
    .withColumn("OnWatchDate", F.to_timestamp(F.col("WatchDate"), "M/d/y"))
    .withColumn("AvailableDtTS", F.to_timestamp(F.col("AvailableDtTm"), "M/d/y h:m"))
    .drop("CallDate", "WatchDate", "AvailableDtTm")
)

In [None]:
# Preview the converted timestamp columns, then rank call types by frequency
# (most frequent first), ignoring rows with no CallType.
# The column is spelled exactly as it was created ("AvailableDtTS"). The old
# spelling "AvailableDtTs" only resolved because spark.sql.caseSensitive
# defaults to false.
fire_ts_df.select("IncidentDate", "OnWatchDate", "AvailableDtTS").show(5, truncate=False)
(fire_ts_df
    .select("CallType")
    .where(F.col("CallType").isNotNull())
    .groupBy("CallType")
    .count()
    .orderBy("count", ascending=False)
    .show(n=10, truncate=False))

In [None]:
# Overall aggregates: total alarms, plus the mean, minimum and maximum
# response delay.
delay_column = "ResponseDelayedinMins"
fire_ts_df.select(
    F.sum("NumAlarms"),
    F.avg(delay_column),
    F.min(delay_column),
    F.max(delay_column),
).show()

In [None]:
# What were all the different types of fire calls in the year of 2018?
# Filter on IncidentDate *before* projecting down to CallType, so the
# predicate only references a column that is still in the DataFrame.
# The original order depended on Spark's analyzer re-adding the
# already-dropped column.
(fire_ts_df
    .where(F.year(F.col("IncidentDate")) == 2018)
    .select("CallType")
    .distinct()
    .show())

In [None]:
# What months within the year 2018 saw the highest number of fire calls?
# Tag each call with its month number and keep only 2018 calls. Then count
# calls per month and list the busiest months first.
in_2018 = F.year(F.col("IncidentDate")) == 2018
ne_fire = fire_ts_df.withColumn("Month", F.month("IncidentDate")).filter(in_2018)
sa = (
    ne_fire
    .groupBy("Month")
    .agg(F.count("Month").alias("Count"))
    .orderBy("Count", ascending=False)
)
sa.show()

In [None]:
# Shut down the SparkSession built at the start of the notebook. This also
# stops its underlying SparkContext. The previous `sc.stop()` raised
# NameError because no `sc` variable is ever defined here.
spark.stop()