##### 1. Create a data frame using the /databricks-datasets/learning-spark-v2/sf-fire/sf-fire-calls.csv

In [0]:
# Read the San Francisco fire-calls CSV. The first line supplies the column
# names, and column types are inferred from the data instead of being declared.
raw_fire_df = (
    spark.read
    .format("csv")
    .options(header="true", inferSchema="true")
    .load("/databricks-datasets/learning-spark-v2/sf-fire/sf-fire-calls.csv")
)

##### 2. Rename the columns to remove the spaces from the column names

In [0]:
# Header as it appears in the CSV -> space-free replacement name.
# Most replacements simply drop the spaces; "Zipcode of Incident" is shortened
# to "Zipcode" and "Unit sequence in call dispatch" is also re-capitalized.
column_renames = {
    "Call Number": "CallNumber",
    "Unit ID": "UnitID",
    "Incident Number": "IncidentNumber",
    "Call Date": "CallDate",
    "Watch Date": "WatchDate",
    "Call Final Disposition": "CallFinalDisposition",
    "Available DtTm": "AvailableDtTm",
    "Zipcode of Incident": "Zipcode",
    "Station Area": "StationArea",
    "Final Priority": "FinalPriority",
    "ALS Unit": "ALSUnit",
    "Call Type Group": "CallTypeGroup",
    "Unit sequence in call dispatch": "UnitSequenceInCallDispatch",
    "Fire Prevention District": "FirePreventionDistrict",
    "Supervisor District": "SupervisorDistrict",
}

# Apply the renames one at a time, starting from the raw frame; raw_fire_df
# itself is left untouched.
renamed_fire_df = raw_fire_df
for old_name, new_name in column_renames.items():
    renamed_fire_df = renamed_fire_df.withColumnRenamed(old_name, new_name)

##### 3. Transform the data frame to fix the date and timestamp column types

In [0]:
# Import the functions module under an alias rather than importing `round` by
# name: a bare `round` from pyspark.sql.functions would replace Python's
# built-in round() in the notebook namespace for every later cell.
from pyspark.sql import functions as F

# Convert CallDate and WatchDate to dates and AvailableDtTm to a timestamp using
# the patterns given below, and round Delay to two decimal places.
fire_df = (
    renamed_fire_df
    .withColumn("CallDate", F.to_date("CallDate", "MM/dd/yyyy"))
    .withColumn("WatchDate", F.to_date("WatchDate", "MM/dd/yyyy"))
    # 12-hour clock with an AM/PM marker ("hh" + "a").
    .withColumn("AvailableDtTm", F.to_timestamp("AvailableDtTm", "MM/dd/yyyy hh:mm:ss a"))
    .withColumn("Delay", F.round("Delay", 2))
)

##### 4. Verify your data frame

In [0]:
# Render fire_df so the renamed columns and the converted date/timestamp
# values can be checked by eye before running the question cells below.
display(fire_df)

##### 5. Cache your dataframe

In [0]:
# Ask Spark to cache fire_df; every question cell below queries this frame.
# NOTE(review): Spark documents cache() as lazy, so expect the data to be
# materialized by the first action that follows rather than by this call — confirm.
fire_df.cache()

##### Q1. How many distinct types of calls were made to the Fire Department?

In [0]:
# Distinct non-null CallType values; the number of rows is the answer to Q1.
q1_df = (
    fire_df
    .filter(fire_df["CallType"].isNotNull())
    .select("CallType")
    .distinct()
)
print(q1_df.count())

##### Q2. What were the distinct types of calls made to the Fire Department?

In [0]:
from pyspark.sql.functions import expr
# Same distinct, non-null call types as Q1, this time listed rather than
# counted, with the column exposed as "distinct_call_type".
q2_df = (
    fire_df
    .filter("CallType is not null")
    .selectExpr("CallType as distinct_call_type")
    .distinct()
)
display(q2_df)

##### Q3. Find all calls with a response delay greater than 5 minutes.

In [0]:
# Keep only the calls whose Delay exceeds 5, then show the call number next to
# its delay (show() prints the first rows only).
(
    fire_df
    .filter(fire_df["Delay"] > 5)
    .select("CallNumber", "Delay")
    .show()
)

##### Q4. What were the most common call types?

In [0]:
# Count the calls per non-null CallType and list the most frequent types first.
(
    fire_df
    .select("CallType")
    .filter("CallType is not null")
    .groupBy("CallType")
    .count()
    .sort("count", ascending=False)
    .show()
)

##### Q5. What zip codes accounted for the most common calls?

In [0]:
# Count the calls per (CallType, Zipcode) pair, most frequent pairs first.
# The column is spelled "Zipcode" in both select() and groupBy(), matching the
# name assigned in the rename step. The previous "ZipCode"/"Zipcode" mix only
# resolved because Spark matches column names case-insensitively by default,
# and would fail with spark.sql.caseSensitive enabled.
(
    fire_df
    .select("CallType", "Zipcode")
    .where("CallType is not null")
    .groupBy("CallType", "Zipcode")
    .count()
    .orderBy("count", ascending=False)
    .show()
)

##### Q6. What San Francisco neighborhoods are in the zip codes 94102 and 94103?

In [0]:
# List each distinct (Zipcode, Neighborhood) pair for the two zip codes of
# interest, printing the values in full instead of truncating them.
(
    fire_df
    .select("Zipcode", "Neighborhood")
    .filter("Zipcode IN (94102, 94103)")
    .distinct()
    .show(truncate=False)
)

##### Q7. What were the sum of all call alarms and the average, min and max of the response times for calls?

In [0]:
# Use the aliased functions module instead of importing sum/min/max by name:
# a bare import would replace Python's built-in sum(), min() and max() in the
# notebook namespace for every cell that runs afterwards.
from pyspark.sql import functions as F

# One-row summary: total of NumAlarms plus the average, smallest and largest Delay.
(
    fire_df
    .select(
        F.sum("NumAlarms"),
        F.avg("Delay"),
        F.min("Delay"),
        F.max("Delay"),
    )
    .show()
)

##### Q8. How many distinct years of data are in the CSV file?

In [0]:
from pyspark.sql.functions import year
# Distinct call years, oldest first. The year is given an explicit name so the
# sort refers to a column that is actually present in the projected result,
# rather than to CallDate, which no longer exists after select()/distinct().
# Rows without a CallDate are skipped so a null "year" is neither listed nor
# counted, in line with how Q1/Q2 exclude null call types.
call_years_df = (
    fire_df
    .where("CallDate is not null")
    .select(year("CallDate").alias("CallYear"))
    .distinct()
    .orderBy("CallYear")
)
call_years_df.show()

# The question asks "how many", so report the number of distinct years as well.
print(call_years_df.count())

##### Q9. What week of the year in 2018 had the most fire calls?

In [0]:
from pyspark.sql.functions import weekofyear
# Restrict to calls dated 2018, count them per week-of-year, and list the
# busiest weeks first.
(
    fire_df
    .where(year("CallDate") == 2018)
    .groupBy(weekofyear("CallDate").alias("week_year"))
    .count()
    .sort("count", ascending=False)
    .show()
)

##### Q10. What neighborhoods in San Francisco had the worst response time in 2018?

In [0]:
# Imported here so this cell does not depend on an import made in an earlier cell.
from pyspark.sql.functions import year

# Ten longest delays among calls dated 2018, shown untruncated.
# The year filter runs before select() because CallDate is not one of the
# selected columns; filtering afterwards relied on Spark resolving a column
# that had already been projected away.
# Rows are individual calls, so the same neighborhood can appear more than once.
(
    fire_df
    .filter(year("CallDate") == 2018)
    .select("Neighborhood", "Delay")
    .orderBy("Delay", ascending=False)
    .show(10, False)
)