## Example Large Dataframe from CSV with defined Schema

In [1]:
# Prerequisites
from pyspark.sql import SparkSession
from pyspark.sql.functions import * 
from pyspark.sql.types import *


In [2]:
# Start (or reuse) a SparkSession whose master is "local"
spark = (
    SparkSession.builder
    .master("local")
    .getOrCreate()
)
# Report the Spark version the session is running
print("Spark Version: ", spark.version)

Spark Version:  3.5.0


### Read CSV file with Schema

In [3]:
# Programmatically define the schema: one (column name, Spark type) pair per
# field, listed in column order. Every field is declared nullable.
fire_schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in [
        ('CallNumber', IntegerType()),
        ('UnitID', StringType()),
        ('IncidentNumber', IntegerType()),
        ('CallType', StringType()),
        ('CallDate', StringType()),
        ('WatchDate', StringType()),
        ('CallFinalDisposition', StringType()),
        ('AvailableDtTm', StringType()),
        ('Address', StringType()),
        ('City', StringType()),
        ('Zipcode', IntegerType()),
        ('Battalion', StringType()),
        ('StationArea', StringType()),
        ('Box', StringType()),
        ('OriginalPriority', StringType()),
        ('Priority', StringType()),
        ('FinalPriority', IntegerType()),
        ('ALSUnit', BooleanType()),
        ('CallTypeGroup', StringType()),
        ('NumAlarms', IntegerType()),
        ('UnitType', StringType()),
        ('UnitSequenceInCallDispatch', IntegerType()),
        ('FirePreventionDistrict', StringType()),
        ('SupervisorDistrict', StringType()),
        ('Neighborhood', StringType()),
        ('Location', StringType()),
        ('RowID', StringType()),
        ('Delay', FloatType()),
    ]
])


In [4]:
# Read the CSV through the DataFrameReader interface: the first line is a
# header row and the columns are typed by the predefined fire_schema
df_sf_fire = spark.read.csv(
    path="data/sf_fire_calls.csv",
    schema=fire_schema,
    header=True,
)
df_sf_fire.show()


+----------+------+--------------+----------------+----------+----------+--------------------+--------------------+--------------------+----+-------+---------+-----------+----+----------------+--------+-------------+-------+-------------+---------+--------------+--------------------------+----------------------+------------------+--------------------+--------------------+-------------+---------+
|CallNumber|UnitID|IncidentNumber|        CallType|  CallDate| WatchDate|CallFinalDisposition|       AvailableDtTm|             Address|City|Zipcode|Battalion|StationArea| Box|OriginalPriority|Priority|FinalPriority|ALSUnit|CallTypeGroup|NumAlarms|      UnitType|UnitSequenceInCallDispatch|FirePreventionDistrict|SupervisorDistrict|        Neighborhood|            Location|        RowID|    Delay|
+----------+------+--------------+----------------+----------+----------+--------------------+--------------------+--------------------+----+-------+---------+-----------+----+----------------+--------+

In [15]:
# Count the rows of the DataFrame, then report the total
row_count = df_sf_fire.count()
print("Number of rows: ", row_count)

Number of rows:  175296


In [16]:
# Print the schema of the loaded DataFrame (column names, types, nullability)
df_sf_fire.printSchema()

root
 |-- CallNumber: integer (nullable = true)
 |-- UnitID: string (nullable = true)
 |-- IncidentNumber: integer (nullable = true)
 |-- CallType: string (nullable = true)
 |-- CallDate: string (nullable = true)
 |-- WatchDate: string (nullable = true)
 |-- CallFinalDisposition: string (nullable = true)
 |-- AvailableDtTm: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Zipcode: integer (nullable = true)
 |-- Battalion: string (nullable = true)
 |-- StationArea: string (nullable = true)
 |-- Box: string (nullable = true)
 |-- OriginalPriority: string (nullable = true)
 |-- Priority: string (nullable = true)
 |-- FinalPriority: integer (nullable = true)
 |-- ALSUnit: boolean (nullable = true)
 |-- CallTypeGroup: string (nullable = true)
 |-- NumAlarms: integer (nullable = true)
 |-- UnitType: string (nullable = true)
 |-- UnitSequenceInCallDispatch: integer (nullable = true)
 |-- FirePreventionDistrict: string (nullable = true)
 

### Save it to Parquet file (Parquet file contains the schema)

In [6]:
# Persist the DataFrame as Parquet. mode("overwrite") replaces any output left
# by a previous run; the default save mode raises an error when the path
# already exists, so re-running this cell would otherwise fail.
(
    df_sf_fire.write
    .format("parquet")
    .mode("overwrite")
    .save("data/sf_fire_calls.parquet")
)

## Transformations and Actions

### Projections and Filters

A projection returns only a chosen subset of columns, while a filter returns only the rows matching a certain relational condition. In Spark, projections are done with the select() method, while filters can be expressed using the filter() or where() method.

In [5]:
# Project three columns and keep only the calls whose type is not "Medical Incident"
df_sf_fire_few = (
    df_sf_fire
    .select("IncidentNumber", "AvailableDtTm", "CallType")
    .filter(col("CallType") != "Medical Incident")
)
# Display the first 10 rows without truncating cell values
df_sf_fire_few.show(n=10, truncate=False)

+--------------+----------------------+--------------+
|IncidentNumber|AvailableDtTm         |CallType      |
+--------------+----------------------+--------------+
|2003235       |01/11/2002 01:51:44 AM|Structure Fire|
|2003250       |01/11/2002 04:16:46 AM|Vehicle Fire  |
|2003259       |01/11/2002 06:01:58 AM|Alarms        |
|2003279       |01/11/2002 08:03:26 AM|Structure Fire|
|2003301       |01/11/2002 09:46:44 AM|Alarms        |
|2003304       |01/11/2002 09:58:53 AM|Alarms        |
|2003382       |01/11/2002 02:59:04 PM|Structure Fire|
|2003408       |01/11/2002 04:09:08 PM|Structure Fire|
|2003408       |01/11/2002 04:09:08 PM|Structure Fire|
|2003408       |01/11/2002 04:09:08 PM|Structure Fire|
+--------------+----------------------+--------------+
only showing top 10 rows



In [6]:
# Count the number of distinct, non-null CallType values
(
    df_sf_fire
    .select("CallType")
    .filter(col("CallType").isNotNull())
    .agg(countDistinct("CallType").alias("DistincCallTypes"))
    .show()
)

+----------------+
|DistincCallTypes|
+----------------+
|              30|
+----------------+



In [7]:
# Show up to 20 of the distinct, non-null call types without truncation
(
    df_sf_fire
    .select("CallType")
    .filter(col("CallType").isNotNull())
    .distinct()
    .show(n=20, truncate=False)
)

+--------------------------------------------+
|CallType                                    |
+--------------------------------------------+
|Elevator / Escalator Rescue                 |
|Marine Fire                                 |
|Aircraft Emergency                          |
|Confined Space / Structure Collapse         |
|Administrative                              |
|Alarms                                      |
|Odor (Strange / Unknown)                    |
|Citizen Assist / Service Call               |
|HazMat                                      |
|Watercraft in Distress                      |
|Explosion                                   |
|Oil Spill                                   |
|Vehicle Fire                                |
|Suspicious Package                          |
|Extrication / Entrapped (Machinery, Vehicle)|
|Other                                       |
|Outside Fire                                |
|Traffic Collision                           |
|Assist Polic

### Renaming, adding, and dropping columns
Spaces in column names can be problematic, especially when saving a DataFrame to a Parquet file (which prohibits this).

In [8]:
# Give the Delay column a more descriptive name: ResponseDelayedInMins
df_sf_fire_delay_mins = df_sf_fire.withColumnRenamed(
    "Delay", "ResponseDelayedInMins"
)

# Show the first 10 values of the renamed column that are greater than 5
(
    df_sf_fire_delay_mins
    .select("ResponseDelayedInMins")
    .filter(col("ResponseDelayedInMins") > 5)
    .show(n=10, truncate=False)
)

+---------------------+
|ResponseDelayedInMins|
+---------------------+
|5.35                 |
|6.25                 |
|5.2                  |
|5.6                  |
|7.25                 |
|11.916667            |
|5.116667             |
|8.633333             |
|95.28333             |
|5.45                 |
+---------------------+
only showing top 10 rows



### Converting Column Data

The columns CallDate, WatchDate, and AvailableDtTm are strings rather than either Unix timestamps or SQL dates, both of which Spark supports and can easily manipulate during transformations or actions.

In [9]:
# Parse the string date/time columns into timestamp columns under new names,
# then drop the original string columns in a single step
df_sf_fire_timestamps = (
    df_sf_fire_delay_mins
    .withColumn("IncidentDate", to_timestamp(col("CallDate"), "MM/dd/yyyy"))
    .withColumn("OnWatchDate", to_timestamp(col("WatchDate"), "MM/dd/yyyy"))
    .withColumn("AvailableDtTS", to_timestamp(col("AvailableDtTm"), "MM/dd/yyyy hh:mm:ss a"))
    .drop("CallDate", "WatchDate", "AvailableDtTm")
)

# Preview the three converted columns
(
    df_sf_fire_timestamps
    .select("IncidentDate", "OnWatchDate", "AvailableDtTS")
    .show(n=5, truncate=False)
)


+-------------------+-------------------+-------------------+
|IncidentDate       |OnWatchDate        |AvailableDtTS      |
+-------------------+-------------------+-------------------+
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 01:51:44|
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 03:01:18|
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 02:39:50|
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 04:16:46|
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 06:01:58|
+-------------------+-------------------+-------------------+
only showing top 5 rows



With converted timestamps one can query using functions from pyspark.sql.functions, such as year(), dayofmonth(), dayofyear(), and dayofweek().

In [10]:
# List the first 5 distinct incident years in ascending order
(
    df_sf_fire_timestamps
    .select(year(col("IncidentDate")))
    .distinct()
    .sort(year(col("IncidentDate")))
    .show(n=5)
)

+------------------+
|year(IncidentDate)|
+------------------+
|              2000|
|              2001|
|              2002|
|              2003|
|              2004|
+------------------+
only showing top 5 rows



### Aggregations
groupBy(), orderBy(), and count() can aggregate by column names and then further aggregate counts across them.


In [11]:
# Rank the non-null call types by how often they occur, most frequent first
df_sf_fire_calltypes = (
    df_sf_fire_timestamps
    .select("CallType")
    .filter(col("CallType").isNotNull())
    .groupBy("CallType")
    .count()
    .sort(col("count").desc())
)

# Display the 10 most common call types without truncation
df_sf_fire_calltypes.show(10, False)


+-------------------------------+------+
|CallType                       |count |
+-------------------------------+------+
|Medical Incident               |113794|
|Structure Fire                 |23319 |
|Alarms                         |19406 |
|Traffic Collision              |7013  |
|Citizen Assist / Service Call  |2524  |
|Other                          |2166  |
|Outside Fire                   |2094  |
|Vehicle Fire                   |854   |
|Gas Leak (Natural and LP Gases)|764   |
|Water Rescue                   |755   |
+-------------------------------+------+
only showing top 10 rows



### Other Common Operations

The DataFrame API provides descriptive statistical methods like min(), max(), sum(), and avg().

In [14]:
# Aggregate over all fire calls: the total number of alarms plus the mean,
# smallest and largest response delay.
# NOTE: sum, min and max here are the Spark column functions brought in by the
# wildcard import from pyspark.sql.functions, which shadow Python's built-ins.
(
    df_sf_fire_timestamps
    .select(
        sum("NumAlarms"),
        avg("ResponseDelayedInMins"),
        min("ResponseDelayedInMins"),
        max("ResponseDelayedInMins"),
    )
    .show(truncate=False)
)


+--------------+--------------------------+--------------------------+--------------------------+
|sum(NumAlarms)|avg(ResponseDelayedInMins)|min(ResponseDelayedInMins)|max(ResponseDelayedInMins)|
+--------------+--------------------------+--------------------------+--------------------------+
|176170        |3.892364154521585         |0.016666668               |1844.55                   |
+--------------+--------------------------+--------------------------+--------------------------+



For more advanced statistical methods, check out the DataFrame's stat property and methods such as describe(), corr(), cov(), sampleBy(), approxQuantile(), and freqItems().

### More Interesting Data Analysis examples

Which zip codes in San Francisco accounted for the most fire calls, and what type were they?<br />

- Filter out rows where CallType is null
- Group them by CallType and Zip code
- Count them and display them in descending order

In [17]:
# Which zip codes accounted for the most fire calls, and of which call type?
# The column is referenced as "Zipcode", exactly as declared in the schema, so
# the lookup does not depend on Spark's case-insensitive name resolution.
(
    df_sf_fire_timestamps
    .select("CallType", "Zipcode")
    .where(col("CallType").isNotNull())
    .groupBy("CallType", "Zipcode")
    .count()
    .orderBy("count", ascending=False)
    .show(n=10, truncate=False)
)

+----------------+-------+-----+
|CallType        |ZipCode|count|
+----------------+-------+-----+
|Medical Incident|94102  |16130|
|Medical Incident|94103  |14775|
|Medical Incident|94110  |9995 |
|Medical Incident|94109  |9479 |
|Medical Incident|94124  |5885 |
|Medical Incident|94112  |5630 |
|Medical Incident|94115  |4785 |
|Medical Incident|94122  |4323 |
|Medical Incident|94107  |4284 |
|Medical Incident|94133  |3977 |
+----------------+-------+-----+
only showing top 10 rows



What San Francisco neighborhoods are in the zip codes 94102 and 94103?

In [20]:
# List distinct (Neighborhood, Zipcode) pairs for zip codes 94102 and 94103
(
    df_sf_fire_timestamps
    .select("Neighborhood", "Zipcode")
    .filter(col("Zipcode").isin(94102, 94103))
    .distinct()
    .show(n=10, truncate=False)
)

+------------------------------+-------+
|Neighborhood                  |Zipcode|
+------------------------------+-------+
|Potrero Hill                  |94103  |
|Western Addition              |94102  |
|Tenderloin                    |94102  |
|Nob Hill                      |94102  |
|Castro/Upper Market           |94103  |
|South of Market               |94102  |
|South of Market               |94103  |
|Hayes Valley                  |94103  |
|Financial District/South Beach|94102  |
|Mission Bay                   |94103  |
+------------------------------+-------+
only showing top 10 rows



Which week in year 2018 had the most fire calls?

In [22]:
# Count the 2018 calls per week of the year and show the 10 busiest weeks
(
    df_sf_fire_timestamps
    .where(year(col("IncidentDate")) == 2018)
    .groupBy(weekofyear(col("IncidentDate")))
    .count()
    .sort(col("count").desc())
    .show(n=10)
)

+------------------------+-----+
|weekofyear(IncidentDate)|count|
+------------------------+-----+
|                      22|  259|
|                      40|  255|
|                      43|  250|
|                      25|  249|
|                       1|  246|
|                      44|  244|
|                      13|  243|
|                      32|  243|
|                      11|  240|
|                       5|  236|
+------------------------+-----+
only showing top 10 rows



Which neighborhoods in San Francisco had the worst response time in 2017?

In [25]:
# Show the 10 longest response delays recorded in 2017 with their neighborhood.
# The year filter runs before the projection because IncidentDate is not one of
# the selected columns; filtering first keeps every column reference resolvable
# against the DataFrame it is applied to.
(
    df_sf_fire_timestamps
    .filter(year("IncidentDate") == 2017)
    .select("Neighborhood", "ResponseDelayedInMins")
    .orderBy("ResponseDelayedInMins", ascending=False)
    .show(10, False)
)

+------------------------------+---------------------+
|Neighborhood                  |ResponseDelayedInMins|
+------------------------------+---------------------+
|South of Market               |1844.55              |
|Chinatown                     |931.45               |
|Mission                       |314.95               |
|Bayview Hunters Point         |308.56668            |
|Mission                       |302.88333            |
|Financial District/South Beach|171.7                |
|Financial District/South Beach|135.03334            |
|Lakeshore                     |92.28333             |
|Excelsior                     |89.51667             |
|Castro/Upper Market           |88.71667             |
+------------------------------+---------------------+
only showing top 10 rows

