In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
# Obtain the notebook's Spark session (created on first use, otherwise the
# already-running one is returned) under the application name 'fireCalls'.
spark = (
    SparkSession.builder
    .appName('fireCalls')
    .getOrCreate()
)

In [None]:
# Explicit schema for the fire-calls data: one (column name, Spark type) pair
# per column, in file order. Every column is declared nullable.
fire_schema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in [
        ('CallNumber', IntegerType()),
        ('UnitID', StringType()),
        ('IncidentNumber', IntegerType()),
        ('CallType', StringType()),
        # Date/time columns are read as plain strings here.
        ('CallDate', StringType()),
        ('WatchDate', StringType()),
        ('CallFinalDisposition', StringType()),
        ('AvailableDtTm', StringType()),
        ('Address', StringType()),
        ('City', StringType()),
        ('Zipcode', IntegerType()),
        ('Battalion', StringType()),
        ('StationArea', StringType()),
        ('Box', StringType()),
        ('OriginalPriority', StringType()),
        ('Priority', StringType()),
        ('FinalPriority', IntegerType()),
        ('ALSUnit', BooleanType()),
        ('CallTypeGroup', StringType()),
        ('NumAlarms', IntegerType()),
        ('UnitType', StringType()),
        ('UnitSequenceInCallDispatch', IntegerType()),
        ('FirePreventionDistrict', StringType()),
        ('SupervisorDistrict', StringType()),
        ('Neighborhood', StringType()),
        ('Location', StringType()),
        ('RowID', StringType()),
        ('Delay', FloatType()),
    ]
])

In [None]:
# Load the fire-calls CSV using the explicit schema instead of type inference;
# header=True tells the reader that the first line holds column names.
fire_df = spark.read.csv(
    "/databricks-datasets/learning-spark-v2/sf-fire/sf-fire-calls.csv",
    schema=fire_schema,
    header=True,
)

In [None]:
# Peek at the first row of fire_df as a quick sanity check of the load.
fire_df.head()

In [None]:
# List the column names of fire_df.
fire_df.columns

In [None]:
# Keep a few columns for every call that is not a medical incident.
#
# "Medical Incident" is a value of the CallType column, so the filter is
# applied to CallType. Comparing it against CallTypeGroup (which holds coarser
# categories) never matches and therefore excludes no medical incidents.
# The filter runs before the projection so it can reference CallType even
# though that column is not part of the selected output.
# Rows whose CallType is null are dropped as well, because a comparison with
# null is not true.
few_fire_df = (
    fire_df
    .where(fire_df["CallType"] != "Medical Incident")
    .select("IncidentNumber", "AvailableDtTm", "CallTypeGroup")
)
few_fire_df.show(5, truncate=False)

In [None]:
# Count how many distinct call types occur in the data using countDistinct().
# NOTE: this wildcard import also rebinds names such as sum, min and max to
# the Spark column functions for the rest of the notebook.
from pyspark.sql.functions import *
distCallType2 = (
    fire_df
    .select('CallType')
    .filter(col('CallType').isNotNull())
    .agg(countDistinct('CallType'))
)
distCallType2.show()

In [None]:
# Show up to 10 of the distinct, non-null CallType values.
(
    fire_df
    .select("CallType")
    .filter(col('CallType').isNotNull())
    .distinct()
    .show(10)
)

In [None]:
# Rename the Delay column to ResponseDelayedinMin, then show a few calls whose
# delay is greater than 5 (minutes, going by the new column name).
new_fire_df = fire_df.withColumnRenamed('Delay', 'ResponseDelayedinMin')
(
    new_fire_df
    .select('ResponseDelayedinMin')
    .filter(col('ResponseDelayedinMin') > 5)
    .show(5, truncate=False)
)

In [None]:
# Parse the three string date columns into timestamp columns under new names,
# dropping each original string column once it has been converted.
fire_ts_df = (
    new_fire_df
    # CallDate -> IncidentDate (date only)
    .withColumn('IncidentDate', to_timestamp(col('CallDate'), 'MM/dd/yyyy'))
    .drop('CallDate')
    # WatchDate -> OnWatchDate (date only)
    .withColumn('OnWatchDate', to_timestamp(col('WatchDate'), 'MM/dd/yyyy'))
    .drop('WatchDate')
    # AvailableDtTm -> AvailableDtTS (date plus 12-hour clock time with AM/PM)
    .withColumn(
        'AvailableDtTS',
        to_timestamp(col('AvailableDtTm'), 'MM/dd/yyyy hh:mm:ss a'),
    )
    .drop('AvailableDtTm')
)
fire_ts_df.select('IncidentDate', 'OnWatchDate', 'AvailableDtTS').show(5, truncate=False)

In [None]:
# List the distinct years that appear in IncidentDate, sorted by year.
(
    fire_ts_df
    .select(year('IncidentDate'))
    .distinct()
    .orderBy(year(col('IncidentDate')))
    .show()
)

In [None]:
# What were the most common types of fire calls?
# Count rows per non-null CallType and show the 10 largest counts first.
(
    fire_ts_df
    .select('CallType')
    .filter(col('CallType').isNotNull())
    .groupBy('CallType')
    .count()
    .orderBy('count', ascending=False)
    .show(10, truncate=False)
)

In [None]:
# Compute the total number of alarms, plus the average, minimum and maximum
# response delay, across all fire calls.
import pyspark.sql.functions as F
fire_ts_df.select(
    F.sum('NumAlarms'),
    F.avg('ResponseDelayedinMin'),
    F.min('ResponseDelayedinMin'),
    F.max('ResponseDelayedinMin'),
).show()