**Aggregate functions**

In [1]:
import findspark
findspark.init()
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.getOrCreate()

In [3]:
data_path = 'Data\sf-fire-calls.csv'
df1 = spark.read.option('samplingRatio', 0.01)\
                .option('header', 'true')\
                .csv(data_path)

In [4]:
df1.printSchema()

root
 |-- CallNumber: string (nullable = true)
 |-- UnitID: string (nullable = true)
 |-- IncidentNumber: string (nullable = true)
 |-- CallType: string (nullable = true)
 |-- CallDate: string (nullable = true)
 |-- WatchDate: string (nullable = true)
 |-- CallFinalDisposition: string (nullable = true)
 |-- AvailableDtTm: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Zipcode: string (nullable = true)
 |-- Battalion: string (nullable = true)
 |-- StationArea: string (nullable = true)
 |-- Box: string (nullable = true)
 |-- OriginalPriority: string (nullable = true)
 |-- Priority: string (nullable = true)
 |-- FinalPriority: string (nullable = true)
 |-- ALSUnit: string (nullable = true)
 |-- CallTypeGroup: string (nullable = true)
 |-- NumAlarms: string (nullable = true)
 |-- UnitType: string (nullable = true)
 |-- UnitSequenceInCallDispatch: string (nullable = true)
 |-- FirePreventionDistrict: string (nullable = true)
 |-- Sup

In [5]:
from pyspark.sql.functions import *

df1.select('CallNumber', 'Location', 'RowID')\
   .where(col('CallNumber') == '20110016')\
   .show(5, truncate=False)

+----------+-------------------------------------+-------------+
|CallNumber|Location                             |RowID        |
+----------+-------------------------------------+-------------+
|20110016  |(37.7895840679362, -122.428071912459)|020110016-T13|
+----------+-------------------------------------+-------------+



In [6]:
df1.select('UnitType')\
   .where(df1['UnitType'] != 'ENGINE')\
   .agg(countDistinct('UnitType').alias('Distinct_UnitType'))\
   .show()

+-----------------+
|Distinct_UnitType|
+-----------------+
|                9|
+-----------------+



In [7]:
df1.select('UnitType').distinct().show()
# df1.select('UnitType').where(df1['UnitType'].isNotNull()).distinct().show(10,truncate=False)

+--------------+
|      UnitType|
+--------------+
|       AIRPORT|
|         MEDIC|
|         CHIEF|
|  RESCUE SQUAD|
|RESCUE CAPTAIN|
|         TRUCK|
| INVESTIGATION|
|        ENGINE|
|       SUPPORT|
|       PRIVATE|
+--------------+



In [8]:
df1.withColumnRenamed('UnitType', 'UnitType_').select('UnitType_').show()

+--------------+
|     UnitType_|
+--------------+
|         TRUCK|
|         MEDIC|
|         MEDIC|
|        ENGINE|
|         CHIEF|
|         TRUCK|
|        ENGINE|
|        ENGINE|
|        ENGINE|
|        ENGINE|
|         MEDIC|
|         TRUCK|
|        ENGINE|
|        ENGINE|
|         CHIEF|
|         CHIEF|
|        ENGINE|
|RESCUE CAPTAIN|
|        ENGINE|
|         MEDIC|
+--------------+
only showing top 20 rows



In [9]:
df1.groupBy('UnitType')\
   .agg(avg('Delay')).show()

+--------------+------------------+
|      UnitType|        avg(Delay)|
+--------------+------------------+
|       AIRPORT| 6.375418990603358|
|         MEDIC| 3.702183630021639|
|         CHIEF| 3.937236628227981|
|  RESCUE SQUAD|3.7178353647933635|
|RESCUE CAPTAIN| 6.469999208860462|
|         TRUCK|3.7983520445111414|
| INVESTIGATION| 34.90993382913908|
|        ENGINE|3.7321145478603457|
|       SUPPORT| 10.90008075285945|
|       PRIVATE| 4.116082512702082|
+--------------+------------------+



In [10]:
# Convert dates columns from string to timestamps.
# As you can see "CallDate" is string type but it worth to be timestamps.
df1 = df1.withColumn('CallDate', to_timestamp(df1['CallDate'], 'MM/dd/yyy'))
# As noticed it has been changed.
df1.printSchema()

root
 |-- CallNumber: string (nullable = true)
 |-- UnitID: string (nullable = true)
 |-- IncidentNumber: string (nullable = true)
 |-- CallType: string (nullable = true)
 |-- CallDate: timestamp (nullable = true)
 |-- WatchDate: string (nullable = true)
 |-- CallFinalDisposition: string (nullable = true)
 |-- AvailableDtTm: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Zipcode: string (nullable = true)
 |-- Battalion: string (nullable = true)
 |-- StationArea: string (nullable = true)
 |-- Box: string (nullable = true)
 |-- OriginalPriority: string (nullable = true)
 |-- Priority: string (nullable = true)
 |-- FinalPriority: string (nullable = true)
 |-- ALSUnit: string (nullable = true)
 |-- CallTypeGroup: string (nullable = true)
 |-- NumAlarms: string (nullable = true)
 |-- UnitType: string (nullable = true)
 |-- UnitSequenceInCallDispatch: string (nullable = true)
 |-- FirePreventionDistrict: string (nullable = true)
 |-- 

In [11]:
df1.select(year('CallDate').alias('Year'))\
   .distinct()\
   .orderBy('Year').show()

+----+
|Year|
+----+
|2000|
|2001|
|2002|
|2003|
|2004|
|2005|
|2006|
|2007|
|2008|
|2009|
|2010|
|2011|
|2012|
|2013|
|2014|
|2015|
|2016|
|2017|
|2018|
+----+



In [12]:
# To see the most common fire type.
df1.select('CallType')\
   .groupBy('CallType')\
   .agg(count('CallType').alias('Count_type'))\
   .orderBy('Count_type', ascending=False).show(truncate=False)

+-------------------------------+----------+
|CallType                       |Count_type|
+-------------------------------+----------+
|Medical Incident               |113794    |
|Structure Fire                 |23319     |
|Alarms                         |19406     |
|Traffic Collision              |7013      |
|Citizen Assist / Service Call  |2524      |
|Other                          |2166      |
|Outside Fire                   |2094      |
|Vehicle Fire                   |854       |
|Gas Leak (Natural and LP Gases)|764       |
|Water Rescue                   |755       |
|Odor (Strange / Unknown)       |490       |
|Electrical Hazard              |482       |
|Elevator / Escalator Rescue    |453       |
|Smoke Investigation (Outside)  |391       |
|Fuel Spill                     |193       |
|HazMat                         |124       |
|Industrial Accidents           |94        |
|Explosion                      |89        |
|Train / Rail Incident          |57        |
|Aircraft 

In [13]:
df1.groupBy('Zipcode')\
   .agg(count('Zipcode').alias('Count'))\
   .orderBy('Count', ascending=False).show()

+-------+-----+
|Zipcode|Count|
+-------+-----+
|  94102|21840|
|  94103|20897|
|  94110|14801|
|  94109|14686|
|  94124| 9236|
|  94112| 8421|
|  94115| 7812|
|  94107| 6941|
|  94122| 6355|
|  94133| 6246|
|  94117| 5804|
|  94114| 5175|
|  94118| 5157|
|  94134| 5009|
|  94121| 4555|
|  94132| 4321|
|  94105| 4236|
|  94108| 4084|
|  94116| 3933|
|  94123| 3719|
+-------+-----+
only showing top 20 rows



In [14]:
df1.groupBy('CallType')\
   .agg(count('Zipcode'), max('Delay'), avg('Delay'))\
   .orderBy(count('Zipcode')).show(truncate=False)

+--------------------------------------------+--------------+----------+------------------+
|CallType                                    |count(Zipcode)|max(Delay)|avg(Delay)        |
+--------------------------------------------+--------------+----------+------------------+
|Administrative                              |3             |31.983334 |12.261111333333332|
|Aircraft Emergency                          |3             |7.75      |3.7731481499999995|
|Mutual Aid / Assist Outside Agency          |9             |9.9       |38.416666311111115|
|Confined Space / Structure Collapse         |13            |7.2166667 |6.915384576923078 |
|Marine Fire                                 |14            |9.883333  |6.928571314285715 |
|Suspicious Package                          |15            |9.883333  |6.5766667199999995|
|Oil Spill                                   |21            |7.7833333 |4.977777761904761 |
|Extrication / Entrapped (Machinery, Vehicle)|28            |9.35      |4.391666