In [22]:
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('RDD Exampl').getOrCreate()

In [23]:
from pyspark.sql.functions import col
from pyspark.sql.functions import weekofyear
from pyspark.sql.functions import month
from pyspark.sql.functions import year, to_date
from pyspark.sql.functions import when
from pyspark.sql.functions import avg
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number
from pyspark.sql.functions import countDistinct
from pyspark.sql.functions import count

In [24]:
df = spark.read.csv("/home/sois/Documents/lab/sf-fire-calls.csv", header=True, inferSchema=True)
df.show(5)

+----------+------+--------------+----------------+----------+----------+--------------------+--------------------+--------------------+----+-------+---------+-----------+----+----------------+--------+-------------+-------+-------------+---------+--------+--------------------------+----------------------+------------------+--------------------+--------------------+-------------+---------+
|CallNumber|UnitID|IncidentNumber|        CallType|  CallDate| WatchDate|CallFinalDisposition|       AvailableDtTm|             Address|City|Zipcode|Battalion|StationArea| Box|OriginalPriority|Priority|FinalPriority|ALSUnit|CallTypeGroup|NumAlarms|UnitType|UnitSequenceInCallDispatch|FirePreventionDistrict|SupervisorDistrict|        Neighborhood|            Location|        RowID|    Delay|
+----------+------+--------------+----------------+----------+----------+--------------------+--------------------+--------------------+----+-------+---------+-----------+----+----------------+--------+------------

In [25]:
#1. Get yearly count of fire calls

In [26]:
df = df.withColumn("CallDate", to_date("CallDate", "MM/dd/yyyy"))
yearly_calls = df.withColumn("Year", year("CallDate")).groupBy("Year").count().orderBy("Year")
yearly_calls.show()

+----+-----+
|Year|count|
+----+-----+
|2000| 5459|
|2001| 7713|
|2002| 8090|
|2003| 8499|
|2004| 8283|
|2005| 8282|
|2006| 8174|
|2007| 8255|
|2008| 8869|
|2009| 8789|
|2010| 9341|
|2011| 9735|
|2012| 9674|
|2013|10020|
|2014|10775|
|2015|11458|
|2016|11609|
|2017|12135|
|2018|10136|
+----+-----+



In [27]:
#2. What were all the different types of fire calls in 2018?

In [28]:
fire_calls_2018 = df.filter(year("CallDate") == 2018).select("CallType").distinct().filter(col("CallType").isNotNull())
fire_calls_2018.show(1)

+--------------------+
|            CallType|
+--------------------+
|Elevator / Escala...|
+--------------------+
only showing top 1 row



In [29]:
#3. Which week in the year in 2018 had the most fire calls?

In [30]:
weekly_counts_2018 = df.filter(year("CallDate") == 2018).withColumn("Week", weekofyear("CallDate")).groupBy("Week").count().orderBy(col("count").desc())
weekly_counts_2018.show(1)

+----+-----+
|Week|count|
+----+-----+
|  22|  259|
+----+-----+
only showing top 1 row



In [31]:
#4. Get monthly count of fire calls based on year

In [32]:
monthly_counts = df.withColumn("Year", year("CallDate")).withColumn("Month", month("CallDate")).groupBy("Year", "Month").count().orderBy("Year", "Month")
monthly_counts.show()

+----+-----+-----+
|Year|Month|count|
+----+-----+-----+
|2000|    4|  335|
|2000|    5|  680|
|2000|    6|  585|
|2000|    7|  668|
|2000|    8|  678|
|2000|    9|  655|
|2000|   10|  620|
|2000|   11|  595|
|2000|   12|  643|
|2001|    1|  622|
|2001|    2|  613|
|2001|    3|  692|
|2001|    4|  636|
|2001|    5|  682|
|2001|    6|  672|
|2001|    7|  646|
|2001|    8|  660|
|2001|    9|  577|
|2001|   10|  673|
|2001|   11|  619|
+----+-----+-----+
only showing top 20 rows



In [33]:
#5. Give monthly report of fire call types for selected year

In [34]:
monthly_report = df.filter(year("CallDate") == 2018).withColumn("Month", month("CallDate")).groupBy("Month", "CallType").count().orderBy("Month", "count", ascending=[True, False])
monthly_report.show(truncate=False)

+-----+-------------------------------+-----+
|Month|CallType                       |count|
+-----+-------------------------------+-----+
|1    |Medical Incident               |692  |
|1    |Alarms                         |122  |
|1    |Structure Fire                 |91   |
|1    |Traffic Collision              |42   |
|1    |Citizen Assist / Service Call  |15   |
|1    |Outside Fire                   |14   |
|1    |Gas Leak (Natural and LP Gases)|5    |
|1    |Water Rescue                   |4    |
|1    |Vehicle Fire                   |4    |
|1    |Electrical Hazard              |3    |
|1    |Elevator / Escalator Rescue    |3    |
|1    |Other                          |3    |
|1    |Smoke Investigation (Outside)  |3    |
|1    |Odor (Strange / Unknown)       |2    |
|1    |Train / Rail Incident          |2    |
|1    |Fuel Spill                     |1    |
|1    |HazMat                         |1    |
|2    |Medical Incident               |635  |
|2    |Alarms                     

In [35]:
#6. Give top five fire call types for every season of selected year (seasons are like Spring, summer, fall winter etc).

In [36]:
df_2018 = df.filter(year("CallDate") == 2018).withColumn("Month", month("CallDate")).withColumn("Season", when(col("Month").isin(12, 1, 2), "Winter").when(col("Month").isin(3, 4, 5), "Spring").when(col("Month").isin(6, 7, 8), "Summer").when(col("Month").isin(9, 10, 11), "Fall"))
windowSpec = Window.partitionBy("Season").orderBy(col("count").desc())
season_top5 = df_2018.groupBy("Season", "CallType").count().withColumn("rank", row_number().over(windowSpec)).filter(col("rank") <= 5)
season_top5.select("Season", "CallType", "count").orderBy("Season", "count", ascending=[True, False]).show(truncate=False)

+------+-----------------+-----+
|Season|CallType         |count|
+------+-----------------+-----+
|Fall  |Medical Incident |1514 |
|Fall  |Alarms           |251  |
|Fall  |Structure Fire   |201  |
|Fall  |Traffic Collision|100  |
|Fall  |Outside Fire     |39   |
|Spring|Medical Incident |2110 |
|Spring|Alarms           |333  |
|Spring|Structure Fire   |261  |
|Spring|Traffic Collision|133  |
|Spring|Other            |36   |
|Summer|Medical Incident |2053 |
|Summer|Alarms           |336  |
|Summer|Structure Fire   |262  |
|Summer|Traffic Collision|121  |
|Summer|Outside Fire     |61   |
|Winter|Medical Incident |1327 |
|Winter|Alarms           |224  |
|Winter|Structure Fire   |182  |
|Winter|Traffic Collision|79   |
|Winter|Outside Fire     |28   |
+------+-----------------+-----+



In [37]:
#7. Whether fire type calls are seasonal?

In [38]:
seasonal_call_counts = df_2018.groupBy("Season", "CallType").count().orderBy("CallType", "Season")
seasonal_call_counts.show(truncate=False)

+------+-----------------------------+-----+
|Season|CallType                     |count|
+------+-----------------------------+-----+
|Fall  |Alarms                       |251  |
|Spring|Alarms                       |333  |
|Summer|Alarms                       |336  |
|Winter|Alarms                       |224  |
|Winter|Assist Police                |1    |
|Fall  |Citizen Assist / Service Call|35   |
|Spring|Citizen Assist / Service Call|26   |
|Summer|Citizen Assist / Service Call|25   |
|Winter|Citizen Assist / Service Call|27   |
|Fall  |Electrical Hazard            |5    |
|Spring|Electrical Hazard            |6    |
|Summer|Electrical Hazard            |11   |
|Winter|Electrical Hazard            |8    |
|Fall  |Elevator / Escalator Rescue  |8    |
|Spring|Elevator / Escalator Rescue  |13   |
|Summer|Elevator / Escalator Rescue  |10   |
|Winter|Elevator / Escalator Rescue  |5    |
|Spring|Explosion                    |1    |
|Fall  |Fuel Spill                   |1    |
|Spring|Fu

In [39]:
#8. What months within the year 2018 saw the highest number of fire calls?

In [40]:
df_2018.groupBy("Month").count().orderBy(col("count").desc()).show()

+-----+-----+
|Month|count|
+-----+-----+
|   10| 1068|
|    5| 1047|
|    3| 1029|
|    8| 1021|
|    1| 1007|
|    7|  974|
|    6|  974|
|    9|  951|
|    4|  947|
|    2|  919|
|   11|  199|
+-----+-----+



In [41]:
#9. Find which type of fire call is major calltype in each year

In [46]:
df_with_year = df.withColumn("Year", year("CallDate"))
major_calltypes_by_year = df_with_year.groupBy("Year", "CallType").count()
window_year = Window.partitionBy("Year").orderBy(col("count").desc())
top_calltype_per_year = major_calltypes_by_year.withColumn("rank", row_number().over(window_year)).filter(col("rank") == 1)
top_calltype_per_year.select("Year", "CallType", "count").orderBy("Year").show()

+----+----------------+-----+
|Year|        CallType|count|
+----+----------------+-----+
|2000|Medical Incident| 3408|
|2001|Medical Incident| 4653|
|2002|Medical Incident| 5046|
|2003|Medical Incident| 5056|
|2004|Medical Incident| 5137|
|2005|Medical Incident| 5084|
|2006|Medical Incident| 5027|
|2007|Medical Incident| 5114|
|2008|Medical Incident| 5692|
|2009|Medical Incident| 5671|
|2010|Medical Incident| 6186|
|2011|Medical Incident| 6413|
|2012|Medical Incident| 6296|
|2013|Medical Incident| 6690|
|2014|Medical Incident| 7176|
|2015|Medical Incident| 7812|
|2016|Medical Incident| 7999|
|2017|Medical Incident| 8330|
|2018|Medical Incident| 7004|
+----+----------------+-----+



In [47]:
#10. Find out average delay in response for each call type

In [48]:
avg_delay_per_calltype = df.groupBy("CallType").agg(avg("Delay").alias("AvgDelay")).orderBy("AvgDelay", ascending=False)
avg_delay_per_calltype.show(truncate=False)

+-----------------------------------+------------------+
|CallType                           |AvgDelay          |
+-----------------------------------+------------------+
|Mutual Aid / Assist Outside Agency |38.416666311111115|
|Assist Police                      |26.981903994285716|
|Train / Rail Incident              |16.4520467631579  |
|Administrative                     |12.261111333333332|
|HazMat                             |7.527016126612901 |
|Marine Fire                        |6.928571314285715 |
|Confined Space / Structure Collapse|6.915384576923078 |
|Watercraft in Distress             |6.8869048178571415|
|Suspicious Package                 |6.576666720000001 |
|High Angle Rescue                  |6.0489583750000016|
|Water Rescue                       |5.5077483421457005|
|Other                              |5.505155432421966 |
|Fuel Spill                         |5.492227982383421 |
|Citizen Assist / Service Call      |5.473342576604607 |
|Electrical Hazard             

In [49]:
#11. Find which calltype has maximum average delay time.

In [50]:
avg_delay_per_calltype.orderBy(col("AvgDelay").desc()).show(1, truncate=False)

+----------------------------------+------------------+
|CallType                          |AvgDelay          |
+----------------------------------+------------------+
|Mutual Aid / Assist Outside Agency|38.416666311111115|
+----------------------------------+------------------+
only showing top 1 row



In [51]:
#12. Which neighborhood in San Francisco generated the most fire calls in 2018?

In [52]:
df.filter((year("CallDate") == 2018) & (col("City") == "San Fransisco")).groupBy("Neighborhood").count().orderBy(col("count").desc()).show(1, truncate=False)

+------------+-----+
|Neighborhood|count|
+------------+-----+
+------------+-----+



In [53]:
#13. Which neighborhoods had the worst response times to fire calls in 2018?

In [54]:
df.filter(year("CallDate") == 2018).groupBy("Neighborhood").agg(avg("Delay").alias("AvgDelay")).orderBy(col("AvgDelay").desc()).show(truncate=False)

+------------------------------+------------------+
|Neighborhood                  |AvgDelay          |
+------------------------------+------------------+
|Chinatown                     |6.190314097905761 |
|Presidio                      |5.829227041449275 |
|Treasure Island               |5.4537037124999985|
|McLaren Park                  |4.744047642857143 |
|Bayview Hunters Point         |4.620561956877396 |
|Presidio Heights              |4.594131472394366 |
|Inner Sunset                  |4.438095199935065 |
|Inner Richmond                |4.364728682713179 |
|Financial District/South Beach|4.344084618290153 |
|Haight Ashbury                |4.266428599285713 |
|Seacliff                      |4.261111146666667 |
|West of Twin Peaks            |4.190952390857143 |
|Potrero Hill                  |4.190555557428572 |
|Pacific Heights               |4.180453718900522 |
|Tenderloin                    |4.101519516597269 |
|Oceanview/Merced/Ingleside    |3.947242180719426 |
|Excelsior  

In [55]:
#14. Find out calltype whose average response delay time is maximum, increases, decreases or has no relation over years.

In [56]:
call_delay_by_year = df.withColumn("Year", year("CallDate")).groupBy("Year", "CallType").agg(avg("Delay").alias("AvgDelay")).orderBy("CallType", "Year")
call_delay_by_year.show(truncate=False)

+----+------------------+------------------+
|Year|CallType          |AvgDelay          |
+----+------------------+------------------+
|2005|Administrative    |31.983334         |
|2006|Administrative    |1.8               |
|2017|Administrative    |3.0               |
|2000|Aircraft Emergency|3.905555533333333 |
|2001|Aircraft Emergency|2.616666675       |
|2002|Aircraft Emergency|4.14666662        |
|2003|Aircraft Emergency|13.166667         |
|2004|Aircraft Emergency|2.5916667         |
|2005|Aircraft Emergency|4.29166675        |
|2006|Aircraft Emergency|3.2111111166666664|
|2007|Aircraft Emergency|3.094444333333333 |
|2009|Aircraft Emergency|3.0083335         |
|2011|Aircraft Emergency|3.5944443333333336|
|2012|Aircraft Emergency|4.65833335        |
|2013|Aircraft Emergency|2.4333334         |
|2014|Aircraft Emergency|7.75              |
|2015|Aircraft Emergency|1.1333333         |
|2000|Alarms            |3.011146839308682 |
|2001|Alarms            |2.6232156389407737|
|2002|Alar

In [57]:
#15. For each year find out which city has more calltypes

In [58]:
df.withColumn("Year", year("CallDate")).groupBy("Year", "City").agg(countDistinct("CallType").alias("UniqueCallTypes")).orderBy("Year", col("UniqueCallTypes").desc()).show(truncate=False)

+----+----+---------------+
|Year|City|UniqueCallTypes|
+----+----+---------------+
|2000|SF  |18             |
|2000|TI  |6              |
|2000|DC  |2              |
|2000|HP  |2              |
|2000|PR  |2              |
|2000|FM  |1              |
|2000|SFO |1              |
|2000|YB  |1              |
|2001|SF  |20             |
|2001|TI  |6              |
|2001|DC  |2              |
|2001|YB  |2              |
|2001|BN  |1              |
|2001|SFO |1              |
|2001|FM  |1              |
|2001|HP  |1              |
|2001|PR  |1              |
|2002|SF  |20             |
|2002|TI  |5              |
|2002|HP  |3              |
+----+----+---------------+
only showing top 20 rows



In [59]:
#16. For every year find count of calltypes for 5 cities which has more calls.

In [60]:
top_5_cities = df.groupBy("City").count().orderBy(col("count").desc()).limit(5).select("City").rdd.flatMap(lambda x: x).collect()
df.filter(col("City").isin(top_5_cities)).withColumn("Year", year("CallDate")).groupBy("Year", "City", "CallType").count().orderBy("Year", "City", col("count").desc()).show(truncate=False)

+----+----+-------------------------------+-----+
|Year|City|CallType                       |count|
+----+----+-------------------------------+-----+
|2000|SF  |Medical Incident               |3399 |
|2000|SF  |Structure Fire                 |1002 |
|2000|SF  |Alarms                         |620  |
|2000|SF  |Citizen Assist / Service Call  |124  |
|2000|SF  |Other                          |99   |
|2000|SF  |Outside Fire                   |67   |
|2000|SF  |Vehicle Fire                   |30   |
|2000|SF  |Odor (Strange / Unknown)       |19   |
|2000|SF  |Elevator / Escalator Rescue    |15   |
|2000|SF  |Smoke Investigation (Outside)  |12   |
|2000|SF  |Fuel Spill                     |12   |
|2000|SF  |Gas Leak (Natural and LP Gases)|11   |
|2000|SF  |Water Rescue                   |7    |
|2000|SF  |Electrical Hazard              |6    |
|2000|SF  |Oil Spill                      |5    |
|2000|SF  |Industrial Accidents           |4    |
|2000|SF  |Train / Rail Incident          |2    |


In [61]:
#17. Is there a correlation between neighborhood, zip code, and number of fire calls?

In [62]:
df.groupBy("Neighborhood", "Zipcode").count().orderBy(col("count").desc()).show(truncate=False)

+------------------------------+-------+-----+
|Neighborhood                  |Zipcode|count|
+------------------------------+-------+-----+
|Tenderloin                    |94102  |17084|
|South of Market               |94103  |13762|
|Mission                       |94110  |10444|
|Bayview Hunters Point         |94124  |9150 |
|Mission                       |94103  |5445 |
|Tenderloin                    |94109  |5377 |
|Financial District/South Beach|94105  |4235 |
|Outer Richmond                |94121  |4121 |
|Nob Hill                      |94109  |3983 |
|Castro/Upper Market           |94114  |3946 |
|Western Addition              |94115  |3934 |
|North Beach                   |94133  |3706 |
|Sunset/Parkside               |94122  |3404 |
|Marina                        |94123  |3360 |
|Excelsior                     |94112  |3237 |
|Bernal Heights                |94110  |3109 |
|Sunset/Parkside               |94116  |3025 |
|Hayes Valley                  |94102  |2814 |
|Lakeshore   