In [1]:
# Bootstrap step: findspark.init() is called before any pyspark import so that
# the pyspark package of the local Spark installation is importable.
# NOTE(review): presumably relies on SPARK_HOME (or a default install location)
# being discoverable on this machine — confirm.
import findspark
findspark.init()

In [4]:
import os

import pandas as pd
import pyspark as ps
from pyspark.sql import Row
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [1]:
# Root locations for the notebook's input data, anchored at the current user's
# home directory so that no absolute machine-specific path is hard-coded.
HOME = os.path.expanduser("~")
DATA_DIR = os.path.join(HOME, "Data")

In [2]:
# Sub-directory of DATA_DIR from which the fire-calls CSV is read.
databricks_dir = os.path.join(DATA_DIR, "databricks")

### Reading DataFrame

We can also let Spark infer the schema, but a better option is to define the column types programmatically.

The spark.read.csv() function reads in the CSV file and returns a DataFrame of rows and named columns with the types dictated in the schema.

In [5]:
# Programmatic schema definition for the fire-calls CSV.
# Built incrementally with StructType.add(name, data_type, nullable); each call
# appends one StructField and returns the same StructType, so the calls chain.
# Every column is declared nullable. The three date/time columns are read as
# plain strings here.
fire_schema = (
    StructType()
    .add("CallNumber", IntegerType(), True)
    .add("UnitID", StringType(), True)
    .add("IncidentNumber", IntegerType(), True)
    .add("CallType", StringType(), True)
    .add("CallDate", StringType(), True)
    .add("WatchDate", StringType(), True)
    .add("CallFinalDisposition", StringType(), True)
    .add("AvailableDtTm", StringType(), True)
    .add("Address", StringType(), True)
    .add("City", StringType(), True)
    .add("Zipcode", IntegerType(), True)
    .add("Battalion", StringType(), True)
    .add("StationArea", StringType(), True)
    .add("Box", StringType(), True)
    .add("OriginalPriority", StringType(), True)
    .add("Priority", StringType(), True)
    .add("FinalPriority", IntegerType(), True)
    .add("ALSUnit", BooleanType(), True)
    .add("CallTypeGroup", StringType(), True)
    .add("NumAlarms", IntegerType(), True)
    .add("UnitType", StringType(), True)
    .add("UnitSequenceInCallDispatch", IntegerType(), True)
    .add("FirePreventionDistrict", StringType(), True)
    .add("SupervisorDistrict", StringType(), True)
    .add("Neighborhood", StringType(), True)
    .add("Location", StringType(), True)
    .add("RowID", StringType(), True)
    .add("Delay", FloatType(), True)
)

In [6]:
# Obtain the SparkSession entry point. Nothing earlier in the notebook creates
# `spark`, so on a fresh kernel the read below would raise NameError.
# getOrCreate() returns the already-active session if there is one, so this is
# safe to re-run and also works in environments that pre-define a session.
spark = SparkSession.builder.appName("sf-fire-calls").getOrCreate()

# Use the DataFrameReader interface to read the CSV file. The first line of the
# file is treated as a header and the explicit fire_schema is applied instead
# of inferring column types.
sf_fire_file = os.path.join(databricks_dir, r"sf-fire-calls.csv")
fire_df = spark.read.csv(sf_fire_file, header=True, schema=fire_schema)

To write the DataFrame into an external data source in your format of choice, you can use the DataFrameWriter interface. Like DataFrameReader, it supports multiple data sources. Parquet, a popular columnar format, is the default format; it uses snappy compression to compress the data. If the DataFrame is written as Parquet, the schema is preserved as part of the Parquet metadata. In this case, subsequent reads back into a DataFrame do not require you to manually supply a schema.

### Saving a DataFrame as a Parquet file or SQL table

In [9]:
# Disabled example: write fire_df out in Parquet format via the DataFrameWriter.
# NOTE(review): presumably left commented out so that re-running the notebook
# does not attempt to write to a path that already exists — confirm before
# enabling.
# NOTE(review): the target name starts with 'f-' while the source CSV is named
# 'sf-fire-calls.csv'; possibly a typo — confirm.
# parquet_path = os.path.join(databricks_dir, r'f-fire-calls.parquet')
# fire_df.write.format("parquet").save(parquet_path)

### Transformations and actions

Now that the fire calls are loaded into a DataFrame, the first step is to examine its columns. Are they of the correct types? Do any of them need to be converted to different types? Do they have null values?

#### Projections and filters

In relational parlance, a projection returns only a chosen subset of columns, while a filter (selection) returns only the rows matching a certain relational condition. In Spark, projections are done with the select() method, while filters can be expressed using the filter() or where() method. 

In [10]:
# Projection: keep three columns. Filter: keep every call whose type is
# anything other than "Medical Incident". where() is an alias of filter().
not_medical = col("CallType") != "Medical Incident"
few_fire_df = fire_df.select(
    "IncidentNumber", "AvailableDtTm", "CallType"
).where(not_medical)
few_fire_df.show(n=5, truncate=False)

+--------------+----------------------+--------------+
|IncidentNumber|AvailableDtTm         |CallType      |
+--------------+----------------------+--------------+
|2003235       |01/11/2002 01:51:44 AM|Structure Fire|
|2003250       |01/11/2002 04:16:46 AM|Vehicle Fire  |
|2003259       |01/11/2002 06:01:58 AM|Alarms        |
|2003279       |01/11/2002 08:03:26 AM|Structure Fire|
|2003301       |01/11/2002 09:46:44 AM|Alarms        |
+--------------+----------------------+--------------+
only showing top 5 rows



What if we want to know how many distinct CallTypes were recorded as the causes of the fire calls? These simple and expressive queries do the job:

In [11]:
# Count how many different (non-null) call types occur in the data set.
call_type = col("CallType")
distinct_count_df = (
    fire_df.select(call_type)
    .filter(call_type.isNotNull())
    .agg(countDistinct(call_type).alias("DistinctCallTypes"))
)
distinct_count_df.show()



+-----------------+
|DistinctCallTypes|
+-----------------+
|               30|
+-----------------+





We can list the distinct call types in the data set using these queries:

In [15]:
# List ten of the distinct (non-null) call types without truncating the text.
distinct_call_types_df = (
    fire_df.select("CallType")
    .filter(col("CallType").isNotNull())
    .distinct()
)
distinct_call_types_df.show(n=10, truncate=False)

+-----------------------------------+
|CallType                           |
+-----------------------------------+
|Elevator / Escalator Rescue        |
|Marine Fire                        |
|Aircraft Emergency                 |
|Confined Space / Structure Collapse|
|Administrative                     |
|Alarms                             |
|Odor (Strange / Unknown)           |
|Citizen Assist / Service Call      |
|HazMat                             |
|Watercraft in Distress             |
+-----------------------------------+
only showing top 10 rows



#### Renaming, adding, and dropping columns

By specifying the desired column names in the schema with StructField, as we did, we effectively changed all names in the resulting DataFrame.

Alternatively, you could selectively rename columns with the **withColumnRenamed()** method. For instance, let’s change the name of our Delay column to ResponseDelayedinMins and take a look at the response times that were longer than five minutes:

In [13]:
# Rename Delay -> ResponseDelayedinMins; new_fire_df is reused by later cells.
new_fire_df = fire_df.withColumnRenamed("Delay", "ResponseDelayedinMins")

# Show a few responses whose delay exceeded five minutes.
slow_responses_df = new_fire_df.select("ResponseDelayedinMins").filter(
    col("ResponseDelayedinMins") > 5
)
slow_responses_df.show(n=5, truncate=False)

+---------------------+
|ResponseDelayedinMins|
+---------------------+
|5.35                 |
|6.25                 |
|5.2                  |
|5.6                  |
|7.25                 |
+---------------------+
only showing top 5 rows



To convert the date and time columns, which were read in as strings, into a more usable format, **pyspark.sql.functions** provides a set of to/from date/timestamp functions such as to_timestamp() and to_date():

    * Convert the existing column’s data type from string to a Spark-supported timestamp.
    * Use the new format specified in the format string "MM/dd/yyyy" or "MM/dd/yyyy hh:mm:ss a" where appropriate.
    * After converting to the new data type, drop() the old column and append the new one specified in the first argument to the withColumn() method.
    * Assign the new modified DataFrame to fire_ts_df.

In [12]:
# Parse the three string date columns into timestamp columns under new names,
# then remove the original string columns. The new columns are appended in the
# order IncidentDate, OnWatchDate, AvailableDtTS.
date_fmt = "MM/dd/yyyy"
datetime_fmt = "MM/dd/yyyy hh:mm:ss a"

fire_ts_df = (
    new_fire_df
    .withColumn("IncidentDate", to_timestamp(col("CallDate"), date_fmt))
    .withColumn("OnWatchDate", to_timestamp(col("WatchDate"), date_fmt))
    .withColumn("AvailableDtTS", to_timestamp(col("AvailableDtTm"), datetime_fmt))
    .drop("CallDate", "WatchDate", "AvailableDtTm")
)

# Display only the freshly converted columns.
converted_df = fire_ts_df.select("IncidentDate", "OnWatchDate", "AvailableDtTS")
converted_df.show(n=5, truncate=False)

NameError: name 'new_fire_df' is not defined

Now that we have modified the dates, we can query using functions from pyspark.sql.functions like dayofmonth(), dayofyear(), and dayofweek() to explore our data further. 

We could find out how many calls were logged in the last seven days, or we could see how many years’ worth of Fire Department calls are included in the data set with this query:

In [21]:
# List every calendar year that appears in IncidentDate, in ascending order.
incident_year = year("IncidentDate")
years_df = fire_ts_df.select(incident_year).distinct().orderBy(incident_year)
years_df.show()



+------------------+
|year(IncidentDate)|
+------------------+
|              2000|
|              2001|
|              2002|
|              2003|
|              2004|
|              2005|
|              2006|
|              2007|
|              2008|
|              2009|
|              2010|
|              2011|
|              2012|
|              2013|
|              2014|
|              2015|
|              2016|
|              2017|
|              2018|
+------------------+



                                                                                

#### Aggregations

Let’s take our first question: what were the most common types of fire calls?

In [22]:
# Tally calls per (non-null) call type and show the ten most frequent ones.
call_type_counts_df = (
    fire_ts_df.select("CallType")
    .filter(col("CallType").isNotNull())
    .groupBy("CallType")
    .count()
)
call_type_counts_df.orderBy(col("count").desc()).show(n=10, truncate=False)

+-------------------------------+------+
|CallType                       |count |
+-------------------------------+------+
|Medical Incident               |113794|
|Structure Fire                 |23319 |
|Alarms                         |19406 |
|Traffic Collision              |7013  |
|Citizen Assist / Service Call  |2524  |
|Other                          |2166  |
|Outside Fire                   |2094  |
|Vehicle Fire                   |854   |
|Gas Leak (Natural and LP Gases)|764   |
|Water Rescue                   |755   |
+-------------------------------+------+
only showing top 10 rows



21/10/27 23:36:14 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 142029 ms exceeds timeout 120000 ms
21/10/27 23:36:14 WARN SparkContext: Killing executors is not supported by current scheduler.


The DataFrame API also offers the collect() method, but for extremely large DataFrames this is resource-heavy (expensive) and dangerous, as it can cause out-of-memory (OOM) exceptions. Unlike count(), which returns a single number to the driver, collect() returns a collection of all the Row objects in the entire DataFrame or Dataset. If you want to take a peek at some Row records you’re better off with take(n), which will return only the first n Row objects of the DataFrame.

In [23]:
# Summary statistics over the whole data set: total number of alarms and the
# average, minimum and maximum response delay.
# The aggregates are referenced through the `F` alias for
# pyspark.sql.functions (imported in the notebook's import cell); without that
# import this cell raises NameError on a fresh kernel. Going through `F` also
# keeps sum/min/max visibly distinct from the Python built-ins of the same
# names.
(fire_ts_df
  .select(F.sum("NumAlarms"), F.avg("ResponseDelayedinMins"),
    F.min("ResponseDelayedinMins"), F.max("ResponseDelayedinMins"))
  .show())

+--------------+--------------------------+--------------------------+--------------------------+
|sum(NumAlarms)|avg(ResponseDelayedinMins)|min(ResponseDelayedinMins)|max(ResponseDelayedinMins)|
+--------------+--------------------------+--------------------------+--------------------------+
|        176170|         3.892364154521585|               0.016666668|                   1844.55|
+--------------+--------------------------+--------------------------+--------------------------+



In [26]:
# Print the column names, types and nullability of the converted DataFrame.
fire_ts_df.printSchema()

root
 |-- CallNumber: integer (nullable = true)
 |-- UnitID: string (nullable = true)
 |-- IncidentNumber: integer (nullable = true)
 |-- CallType: string (nullable = true)
 |-- CallFinalDisposition: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Zipcode: integer (nullable = true)
 |-- Battalion: string (nullable = true)
 |-- StationArea: string (nullable = true)
 |-- Box: string (nullable = true)
 |-- OriginalPriority: string (nullable = true)
 |-- Priority: string (nullable = true)
 |-- FinalPriority: integer (nullable = true)
 |-- ALSUnit: boolean (nullable = true)
 |-- CallTypeGroup: string (nullable = true)
 |-- NumAlarms: integer (nullable = true)
 |-- UnitType: string (nullable = true)
 |-- UnitSequenceInCallDispatch: integer (nullable = true)
 |-- FirePreventionDistrict: string (nullable = true)
 |-- SupervisorDistrict: string (nullable = true)
 |-- Neighborhood: string (nullable = true)
 |-- Location: string (nullable =