## Required Imports

In [1]:
import os 
import pandas as pd
import pyspark

from pyspark.sql import SparkSession, SQLContext
from pyspark.context import SparkContext
from pyspark.sql.functions import isnan, when, count, col, isnull, format_number, countDistinct, avg
from pyspark.sql.functions import format_number,dayofmonth,hour,dayofyear,month,year,weekofyear,date_format

In [3]:
## Create a spark session
spark = SparkSession.builder.appName("Operations").getOrCreate()

## Spark DataFrame - Basic operations

In [4]:
## Load data into a spark dataframe
df = spark.read.csv('Seattle_Real_Time_Fire_911_Calls.csv',inferSchema = True, header = True)

In [5]:
df.printSchema()

root
 |-- Address: string (nullable = true)
 |-- Type: string (nullable = true)
 |-- Datetime: string (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- Report Location: string (nullable = true)
 |-- Incident Number: string (nullable = true)



In [6]:
df.columns

['Address',
 'Type',
 'Datetime',
 'Latitude',
 'Longitude',
 'Report Location',
 'Incident Number']

In [10]:
print(df.describe().show())

+-------+--------------------+-----------+--------------------+-------------------+-------------------+--------------------+---------------+
|summary|             Address|       Type|            Datetime|           Latitude|          Longitude|     Report Location|Incident Number|
+-------+--------------------+-----------+--------------------+-------------------+-------------------+--------------------+---------------+
|  count|             1519421|    1519478|             1519478|            1518925|            1518925|             1518949|        1519478|
|   mean|              4809.5|       null|                null|  47.62000760881998|-122.33107982340742|                null|           null|
| stddev|   6351.940215398757|       null|                null|0.05518436853345341|0.03115947279882827|                null|           null|
|    min|"47�31'53n / 122�...|1RED 1 Unit|01/01/2004 01:05:...|          47.251232|        -122.476944|POINT (-121.89549...|               |
|    max|w st

In [11]:
df.toPandas().describe().transpose()

Unnamed: 0,count,mean,std,min,25%,50%,75%,max
Latitude,1518925.0,47.620008,0.055184,47.251232,47.589017,47.613375,47.661661,47.776695
Longitude,1518925.0,-122.33108,0.031159,-122.476944,-122.348859,-122.330541,-122.312433,-121.895499


In [12]:
df.toPandas().info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1519478 entries, 0 to 1519477
Data columns (total 7 columns):
 #   Column           Non-Null Count    Dtype  
---  ------           --------------    -----  
 0   Address          1519421 non-null  object 
 1   Type             1519478 non-null  object 
 2   Datetime         1519478 non-null  object 
 3   Latitude         1518925 non-null  float64
 4   Longitude        1518925 non-null  float64
 5   Report Location  1518949 non-null  object 
 6   Incident Number  1519478 non-null  object 
dtypes: float64(2), object(5)
memory usage: 81.1+ MB


## Filter operations

In [17]:
df.filter("Latitude>46").show(n = 5, truncate = False, vertical = True) # For better output view

-RECORD 0----------------------------------------
 Address         | 105 5th Av S                  
 Type            | Triaged Incident              
 Datetime        | 03/15/2020 12:16:00 AM        
 Latitude        | 47.601643                     
 Longitude       | -122.327656                   
 Report Location | POINT (-122.327656 47.601643) 
 Incident Number | F200026453                    
-RECORD 1----------------------------------------
 Address         | Nw 59th St / 15th Av Nw       
 Type            | Triaged Incident              
 Datetime        | 10/11/2019 11:34:00 PM        
 Latitude        | 47.67163                      
 Longitude       | -122.376212                   
 Report Location | POINT (-122.376212 47.67163)  
 Incident Number | F190108891                    
-RECORD 2----------------------------------------
 Address         | 1431 Minor Av                 
 Type            | Aid Response                  
 Datetime        | 12/27/2019 01:24:00 AM        


In [18]:
df.show(n = 5, truncate = False, vertical = True)

-RECORD 0----------------------------------------
 Address         | 105 5th Av S                  
 Type            | Triaged Incident              
 Datetime        | 03/15/2020 12:16:00 AM        
 Latitude        | 47.601643                     
 Longitude       | -122.327656                   
 Report Location | POINT (-122.327656 47.601643) 
 Incident Number | F200026453                    
-RECORD 1----------------------------------------
 Address         | Nw 59th St / 15th Av Nw       
 Type            | Triaged Incident              
 Datetime        | 10/11/2019 11:34:00 PM        
 Latitude        | 47.67163                      
 Longitude       | -122.376212                   
 Report Location | POINT (-122.376212 47.67163)  
 Incident Number | F190108891                    
-RECORD 2----------------------------------------
 Address         | 1431 Minor Av                 
 Type            | Aid Response                  
 Datetime        | 12/27/2019 01:24:00 AM        


In [19]:
df.filter("Latitude>46").select('Address').show(n = 5)

+--------------------+
|             Address|
+--------------------+
|        105 5th Av S|
|Nw 59th St / 15th...|
|       1431 Minor Av|
|         2209 1st Av|
|         1402 3rd Av|
+--------------------+
only showing top 5 rows



In [20]:
## Grab a single column
df['Type']

Column<b'Type'>

In [21]:
type(df['Type'])

pyspark.sql.column.Column

In [22]:
df.select('Type')

DataFrame[Type: string]

In [23]:
type(df.select('Type'))

pyspark.sql.dataframe.DataFrame

In [24]:
df.select('Type').show(n = 5, truncate = False, vertical = True)

-RECORD 0----------------
 Type | Triaged Incident 
-RECORD 1----------------
 Type | Triaged Incident 
-RECORD 2----------------
 Type | Aid Response     
-RECORD 3----------------
 Type | Aid Response     
-RECORD 4----------------
 Type | Rescue Elevator  
only showing top 5 rows



In [25]:
df.head(5) # Return list of row objects

[Row(Address='105 5th Av S', Type='Triaged Incident', Datetime='03/15/2020 12:16:00 AM', Latitude=47.601643, Longitude=-122.327656, Report Location='POINT (-122.327656 47.601643)', Incident Number='F200026453'),
 Row(Address='Nw 59th St / 15th Av Nw', Type='Triaged Incident', Datetime='10/11/2019 11:34:00 PM', Latitude=47.67163, Longitude=-122.376212, Report Location='POINT (-122.376212 47.67163)', Incident Number='F190108891'),
 Row(Address='1431 Minor Av', Type='Aid Response', Datetime='12/27/2019 01:24:00 AM', Latitude=47.613247, Longitude=-122.327187, Report Location='POINT (-122.327187 47.613247)', Incident Number='F190136279'),
 Row(Address='2209 1st Av', Type='Aid Response', Datetime='03/15/2020 12:19:00 AM', Latitude=47.612669, Longitude=-122.345319, Report Location='POINT (-122.345319 47.612669)', Incident Number='F200026454'),
 Row(Address='1402 3rd Av', Type='Rescue Elevator', Datetime='05/30/2019 11:22:00 AM', Latitude=47.608766, Longitude=-122.336894, Report Location='POIN

In [26]:
# Select multiple columns
df.select(['Address','Type'])

DataFrame[Address: string, Type: string]

In [27]:
df.select(['Address','Type']).show()

+--------------------+--------------------+
|             Address|                Type|
+--------------------+--------------------+
|        105 5th Av S|    Triaged Incident|
|Nw 59th St / 15th...|    Triaged Incident|
|       1431 Minor Av|        Aid Response|
|         2209 1st Av|        Aid Response|
|         1402 3rd Av|     Rescue Elevator|
|         500 17th Av|        Aid Response|
|       8319 8th Av S|    Triaged Incident|
| 201 Occidental Av S|    Triaged Incident|
|         1922 9th Av|Automatic Fire Al...|
|         1526 3rd Av|        Aid Response|
|10th Av E / E Lyn...|    Triaged Incident|
|     1504 Ne 90th St|Automatic Medical...|
|S Weller St / 5th...|      Medic Response|
|4225 Roosevelt Wa...|        Aid Response|
|3401 W Government...| Aid Response Yellow|
|    4120 Stone Way N|        Aid Response|
|Post Av / Columbi...|        Aid Response|
|   3204 Sw Morgan St|        Aid Response|
|   10756 Alton Av Ne|        Aid Response|
|          410 9th Av|        Tr

## Rename a column

In [29]:
df.withColumnRenamed('Address','Location').show(n = 5, truncate = False, vertical = True)

-RECORD 0----------------------------------------
 Location        | 105 5th Av S                  
 Type            | Triaged Incident              
 Datetime        | 03/15/2020 12:16:00 AM        
 Latitude        | 47.601643                     
 Longitude       | -122.327656                   
 Report Location | POINT (-122.327656 47.601643) 
 Incident Number | F200026453                    
-RECORD 1----------------------------------------
 Location        | Nw 59th St / 15th Av Nw       
 Type            | Triaged Incident              
 Datetime        | 10/11/2019 11:34:00 PM        
 Latitude        | 47.67163                      
 Longitude       | -122.376212                   
 Report Location | POINT (-122.376212 47.67163)  
 Incident Number | F190108891                    
-RECORD 2----------------------------------------
 Location        | 1431 Minor Av                 
 Type            | Aid Response                  
 Datetime        | 12/27/2019 01:24:00 AM        


In [30]:
df.columns

['Address',
 'Type',
 'Datetime',
 'Latitude',
 'Longitude',
 'Report Location',
 'Incident Number']

### Using SQL

To use SQL queries directly with the dataframe, we  need to register it to a temporary view

In [31]:
# Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("fire_incidents")

In [32]:
sql_results = spark.sql("SELECT * FROM fire_incidents")

In [34]:
sql_results.show()

+--------------------+--------------------+--------------------+---------+-----------+--------------------+---------------+
|             Address|                Type|            Datetime| Latitude|  Longitude|     Report Location|Incident Number|
+--------------------+--------------------+--------------------+---------+-----------+--------------------+---------------+
|        105 5th Av S|    Triaged Incident|03/15/2020 12:16:...|47.601643|-122.327656|POINT (-122.32765...|     F200026453|
|Nw 59th St / 15th...|    Triaged Incident|10/11/2019 11:34:...| 47.67163|-122.376212|POINT (-122.37621...|     F190108891|
|       1431 Minor Av|        Aid Response|12/27/2019 01:24:...|47.613247|-122.327187|POINT (-122.32718...|     F190136279|
|         2209 1st Av|        Aid Response|03/15/2020 12:19:...|47.612669|-122.345319|POINT (-122.34531...|     F200026454|
|         1402 3rd Av|     Rescue Elevator|05/30/2019 11:22:...|47.608766|-122.336894|POINT (-122.33689...|     F190056035|
|       

## Using SQL syntax

In [35]:
spark.sql("SELECT * FROM fire_incidents WHERE Type = 'Aid Response'").show(n = 5, truncate = False, vertical = True)

-RECORD 0----------------------------------------
 Address         | 1431 Minor Av                 
 Type            | Aid Response                  
 Datetime        | 12/27/2019 01:24:00 AM        
 Latitude        | 47.613247                     
 Longitude       | -122.327187                   
 Report Location | POINT (-122.327187 47.613247) 
 Incident Number | F190136279                    
-RECORD 1----------------------------------------
 Address         | 2209 1st Av                   
 Type            | Aid Response                  
 Datetime        | 03/15/2020 12:19:00 AM        
 Latitude        | 47.612669                     
 Longitude       | -122.345319                   
 Report Location | POINT (-122.345319 47.612669) 
 Incident Number | F200026454                    
-RECORD 2----------------------------------------
 Address         | 500 17th Av                   
 Type            | Aid Response                  
 Datetime        | 05/30/2019 11:23:00 AM        


## Missing and Null data

In [36]:
# Columns with missing data
df.select([count(when(isnan(c), c)).alias(c) for c in df.columns]).show()

+-------+----+--------+--------+---------+---------------+---------------+
|Address|Type|Datetime|Latitude|Longitude|Report Location|Incident Number|
+-------+----+--------+--------+---------+---------------+---------------+
|      0|   0|       0|       0|        0|              0|              0|
+-------+----+--------+--------+---------+---------------+---------------+



In [37]:
df.select([count(when(isnull(c), c)).alias(c) for c in df.columns]).show()

+-------+----+--------+--------+---------+---------------+---------------+
|Address|Type|Datetime|Latitude|Longitude|Report Location|Incident Number|
+-------+----+--------+--------+---------+---------------+---------------+
|     57|   0|       0|     553|      553|            529|              0|
+-------+----+--------+--------+---------+---------------+---------------+



In [38]:
# Address Null
df.where(col("Address").isNull()).show()

+-------+--------------------+--------------------+--------+---------+---------------+---------------+
|Address|                Type|            Datetime|Latitude|Longitude|Report Location|Incident Number|
+-------+--------------------+--------------------+--------+---------+---------------+---------------+
|   null|    Triaged Incident|12/27/2019 07:20:...|    null|     null|           null|     F190136329|
|   null|    Triaged Incident|12/27/2019 07:12:...|    null|     null|           null|     F190136323|
|   null|    Triaged Incident|12/27/2019 07:13:...|    null|     null|           null|     F190136324|
|   null|    Triaged Incident|12/27/2019 07:32:...|    null|     null|           null|     F190136333|
|   null|    Triaged Incident|12/27/2019 07:33:...|    null|     null|           null|     F190136336|
|   null|    Triaged Incident|12/27/2019 07:15:...|    null|     null|           null|     F190136326|
|   null|LINK - Link Contr...|08/31/2019 10:07:...|    null|     null|   

In [39]:
# Total rows in dataframe
df.count()

1519478

In [40]:
# Rows after droping null values
df.na.drop(subset=["Address",'Latitude','Longitude','Report Location']).count()

1518915

In [41]:
# Drop rows with null values
df_clean = df.na.drop(subset=["Address",'Latitude','Longitude','Report Location'])

In [42]:
df_clean.count()

1518915

## Groupby and Aggregate Functions

In [43]:
df.groupBy("Type")

<pyspark.sql.group.GroupedData at 0x122719a50>

In [44]:
df.groupBy("Type").count().show()

+--------------------+-----+
|                Type|count|
+--------------------+-----+
|  Mutual Aid- Marine|    1|
|    Rescue Saltwater|   41|
|     3RED - 1 +1 + 1| 1156|
|        Chimney Fire|  521|
|        Call on hold|    4|
|      Tunnel Standby|   10|
|           MCI Major|    3|
|Aid Response Freeway| 1431|
|Medic Response, 7...|17313|
|Scenes Of Violenc...|   36|
|     Auto Fire Alarm|86979|
|Natur Gas Leak In...|    9|
|Mutual Aid- Task ...|    5|
|Motor Vehicle Inc...|   64|
|         Aid Service|  175|
|Natur Gas Leak In...|    5|
|    Aircraft Standby|    7|
|Investigate In Se...|  889|
|Hazardous Mat, Sp...|  237|
|Boat Under 50' Fi...|   47|
+--------------------+-----+
only showing top 20 rows



In [45]:
df.select(countDistinct("Type")).show()

+--------------------+
|count(DISTINCT Type)|
+--------------------+
|                 230|
+--------------------+



## Orderby on dataframe with no null values

In [46]:
df_clean.orderBy("Latitude").show(n=20) #Ascending

+--------------------+--------------------+--------------------+---------+-----------+--------------------+---------------+
|             Address|                Type|            Datetime| Latitude|  Longitude|     Report Location|Incident Number|
+--------------------+--------------------+--------------------+---------+-----------+--------------------+---------------+
|   16399 Se 391st St|    Triaged Incident|06/03/2020 01:20:...|47.251232|-122.122934|POINT (-122.12293...|     V200054734|
|       1211 Alder Ct|        Aid Response|01/27/2004 08:53:...|47.257397|-122.314562|POINT (-122.31456...|     V040009360|
|    1914 Virginia Ct|        Aid Response|12/24/2003 11:30:...|47.258946|-122.305833|POINT (-122.30583...|     F030097654|
|      344 50th St Se|    Triaged Incident|12/12/2019 06:08:...| 47.26188|-122.225729|POINT (-122.22572...|     V190131076|
|      8329 44th Av S|Automatic Fire Al...|05/22/2004 11:30:...|47.262472|-122.279823|POINT (-122.27982...|     V040049648|
|       

In [47]:
df_clean.orderBy(df_clean["Latitude"].desc()).show() #Descending

+--------------------+--------------------+--------------------+---------+-----------+--------------------+---------------+
|             Address|                Type|            Datetime| Latitude|  Longitude|     Report Location|Incident Number|
+--------------------+--------------------+--------------------+---------+-----------+--------------------+---------------+
|20315 Greenwood Av N|   Single Medic Unit|11/18/2010 02:37:...|47.776695|-122.355919|POINT (-122.35591...|     B100106494|
|20315 Greenwood Av N|        Aid Response|12/05/2010 10:50:...|47.776695|-122.355919|POINT (-122.35591...|     B100112370|
|     420 Nw 203rd St|   Mutual Aid, Medic|12/06/2006 07:21:...|47.776643|-122.363061|POINT (-122.36306...|     B060118580|
|     420 Nw 203rd St|   Mutual Aid, Medic|07/16/2006 07:45:...|47.776643|-122.363061|POINT (-122.36306...|     B060068467|
|    1849 Nw 201st St|  Mutual Aid- Ladder|08/15/2019 07:35:...|47.775249|-122.380734|POINT (-122.38073...|     B190087193|
|    220