# Capstone Project: ETL Pipeline for Fire Department Calls and Temperature Data

## Project Summary
The goals of this project are to:
* Download the San Francisco Fire Department Calls-For-Service data and the global land temperature by city data from Kaggle, and load the raw datasets to AWS S3.
* Create an ETL pipeline that extracts the data, processes it using Spark, and loads it back into a set of dimensional tables in S3. <br>

This will allow the analytics team to find insights into the fire calls for service, e.g.:
* What were the most common types of fire calls?<br> 
* Which months of 2019 saw the highest number of fire calls? <br>
* What could be improved to reduce the number of fire incidents?<br>
* Does temperature affect the number of fire incidents?<br>


The project follows these steps:
* Step 1: Scope the Project and Gather Data<br>
* Step 2: Explore and Assess the Data<br>
* Step 3: Define the Data Model<br>
* Step 4: Run ETL to Model the Data<br>
* Step 5: Complete Project Write Up<br>

## Import Modules

In [20]:
# Do all imports and installs here
import pandas as pd
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

from pyspark.sql.types import *
import pyspark.sql.functions as F

import configparser
import os

## Create Spark Session

In [21]:
def create_spark_session():
    spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()
    return spark

spark = create_spark_session()

## Set AWS Credentials

In [None]:
config = configparser.ConfigParser()

config.read_file(open('aws/dl.cfg'))

os.environ["AWS_ACCESS_KEY_ID"]= config['AWS']['AWS_ACCESS_KEY_ID']
os.environ["AWS_SECRET_ACCESS_KEY"]= config['AWS']['AWS_SECRET_ACCESS_KEY']
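
The notebook does not show `aws/dl.cfg` itself. A minimal sketch of the layout the cell above expects, assuming a single `[AWS]` section holding the two keys that are read; the values are placeholders, `load_aws_credentials` is a hypothetical helper, and the file content is inlined with `read_string` only so the example is self-contained:

```python
import configparser

# Hypothetical contents of aws/dl.cfg; the key values are placeholders.
SAMPLE_CFG = """
[AWS]
AWS_ACCESS_KEY_ID = YOUR_ACCESS_KEY_ID
AWS_SECRET_ACCESS_KEY = YOUR_SECRET_ACCESS_KEY
"""

def load_aws_credentials(cfg_text):
    """Parse the config text and return the two AWS settings as a dict."""
    parser = configparser.ConfigParser()
    parser.read_string(cfg_text)
    section = parser["AWS"]
    return {
        "AWS_ACCESS_KEY_ID": section["AWS_ACCESS_KEY_ID"],
        "AWS_SECRET_ACCESS_KEY": section["AWS_SECRET_ACCESS_KEY"],
    }

credentials = load_aws_credentials(SAMPLE_CFG)
print(sorted(credentials))
```

Keeping the real file out of version control avoids publishing the keys along with the notebook.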

## Step 1: Scope the Project and Gather Data

### Scope
In this project, we select call-level columns from the fire call data to form the first dimension table. Next, we add year and month columns to the city temperature data to form the second dimension table. The two datasets are then joined on city, year, and month to form the fact table. The final database is optimized for queries on fire call events that determine whether temperature affects the number of fire incidents. Spark is used to process the data.

 
### Describe and Gather Data
**Data Source 1** <br>
San Francisco Fire Calls-For-Service <br>
https://www.kaggle.com/san-francisco/sf-fire-data-incidents-violations-and-more
    
**Content** <br>
Fire Calls-For-Service includes all fire units responses to calls. Each record includes the call number, incident number, address, unit identifier, call type, and disposition. All relevant time intervals are also included.

**Data Source 2**<br>
Global Land Temperatures By City <br>
https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data?select=GlobalLandTemperaturesByCity.csv

**Content** <br>
The temperature data comes from Kaggle and is provided in CSV format. Some relevant attributes include:
* AverageTemperature: average temperature
* City: city name
* Country: country name
* Latitude: latitude
* Longitude: longitude

### Read in fire calls data

In [22]:
# Read in the fire calls data 
# input_path = "input/fire-department-calls-for-service.csv"

input_path = "s3a://udacity-dend-capstone-project/firecall_data/"
fire_df =spark.read.csv(input_path, header=True, mode="DROPMALFORMED")
fire_df.count()

5319351

In [65]:
fire_df.printSchema()

root
 |-- Call Number: string (nullable = true)
 |-- Unit ID: string (nullable = true)
 |-- Incident Number: string (nullable = true)
 |-- Call Type: string (nullable = true)
 |-- Call Date: string (nullable = true)
 |-- Watch Date: string (nullable = true)
 |-- Received DtTm: string (nullable = true)
 |-- Entry DtTm: string (nullable = true)
 |-- Dispatch DtTm: string (nullable = true)
 |-- Response DtTm: string (nullable = true)
 |-- On Scene DtTm: string (nullable = true)
 |-- Transport DtTm: string (nullable = true)
 |-- Hospital DtTm: string (nullable = true)
 |-- Call Final Disposition: string (nullable = true)
 |-- Available DtTm: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Zipcode of Incident: string (nullable = true)
 |-- Battalion: string (nullable = true)
 |-- Station Area: string (nullable = true)
 |-- Box: string (nullable = true)
 |-- Original Priority: string (nullable = true)
 |-- Priority: string (nullable = 

### Read in temperature data

In [23]:
# Read in temperature data
# temperature_input_path = "input/GlobalLandTemperaturesByCity.csv"

temperature_input_path = "s3a://udacity-dend-capstone-project/temp_data/"
df_t =spark.read.csv(temperature_input_path, header=True)

In [6]:
df_t.count()

8599212

In [67]:
df_t.show(5, False)

+----------+------------------+-----------------------------+-----+-------+--------+---------+
|dt        |AverageTemperature|AverageTemperatureUncertainty|City |Country|Latitude|Longitude|
+----------+------------------+-----------------------------+-----+-------+--------+---------+
|1743-11-01|6.068             |1.7369999999999999           |Århus|Denmark|57.05N  |10.33E   |
|1743-12-01|null              |null                         |Århus|Denmark|57.05N  |10.33E   |
|1744-01-01|null              |null                         |Århus|Denmark|57.05N  |10.33E   |
|1744-02-01|null              |null                         |Århus|Denmark|57.05N  |10.33E   |
|1744-03-01|null              |null                         |Århus|Denmark|57.05N  |10.33E   |
+----------+------------------+-----------------------------+-----+-------+--------+---------+
only showing top 5 rows



## Step 2: Cleaning the data
For the temperature data, drop all entries where AverageTemperature is missing, remove duplicate rows, then add year and month as new columns.

In [24]:
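# Filter out entries with a missing average temperature.
# Rows where the value is null are dropped too, because a comparison with null is never true.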
df_t=df_t.filter(df_t.AverageTemperature != 'NaN')
df_t.show(5,False)

+----------+------------------+-----------------------------+-----+-------+--------+---------+
|dt        |AverageTemperature|AverageTemperatureUncertainty|City |Country|Latitude|Longitude|
+----------+------------------+-----------------------------+-----+-------+--------+---------+
|1743-11-01|6.068             |1.7369999999999999           |Århus|Denmark|57.05N  |10.33E   |
|1744-04-01|5.7879999999999985|3.6239999999999997           |Århus|Denmark|57.05N  |10.33E   |
|1744-05-01|10.644            |1.2830000000000001           |Århus|Denmark|57.05N  |10.33E   |
|1744-06-01|14.050999999999998|1.347                        |Århus|Denmark|57.05N  |10.33E   |
|1744-07-01|16.082            |1.396                        |Århus|Denmark|57.05N  |10.33E   |
+----------+------------------+-----------------------------+-----+-------+--------+---------+
only showing top 5 rows



In [10]:
df_t.count()

8235082
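
The filter above compares a string column with the literal `'NaN'`, yet the sample output shows `null` values being removed and the row count falls. That follows from SQL null semantics, which Spark applies: comparing null with anything yields null, and a filter keeps only rows whose condition is true. A plain-Python sketch of that rule, with `sql_not_equal` as a hypothetical helper and `None` standing in for null:

```python
def sql_not_equal(value, other):
    """SQL-style inequality: a comparison involving None (null) yields None."""
    if value is None or other is None:
        return None
    return value != other

# Sample values modelled on the AverageTemperature column, which is read as strings.
temperatures = ["6.068", None, "NaN", "5.7879999999999985"]

# A filter keeps a row only when the condition is exactly True.
kept = [t for t in temperatures if sql_not_equal(t, "NaN") is True]
print(kept)
```

Both the null entry and the literal `"NaN"` entry are dropped; only the two numeric strings remain.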

In [25]:
# Remove duplicate rows for the same date, city, and country
df_t=df_t.dropDuplicates(['dt', 'City', 'Country'])

In [11]:
df_t.count()

8190783

In [26]:
# Add 2 new columns "year" and "month" to df_t
df_t = df_t.withColumn("year", F.year(df_t.dt)) \
                 .withColumn("month", F.month(df_t.dt))

In [13]:
df_t.show(2, False)

+----------+------------------+-----------------------------+-----------+-------+--------+---------+----+-----+
|dt        |AverageTemperature|AverageTemperatureUncertainty|City       |Country|Latitude|Longitude|year|month|
+----------+------------------+-----------------------------+-----------+-------+--------+---------+----+-----+
|1743-11-01|7.03              |1.611                        |Bielefeld  |Germany|52.24N  |7.88E    |1743|11   |
|1743-11-01|7.318             |1.764                        |Kaliningrad|Russia |55.45N  |19.84E   |1743|11   |
+----------+------------------+-----------------------------+-----------+-------+--------+---------+----+-----+
only showing top 2 rows



In [27]:
# Query temperature info for city "San Francisco"
df_t_san = (df_t.select('dt','year','month', 'Country', 'City', "AverageTemperature").where(df_t.City == "San Francisco")).sort('dt',ascending=False)

In [16]:
df_t_san.show(24)

+----------+----+-----+-------------+-------------+------------------+
|        dt|year|month|      Country|         City|AverageTemperature|
+----------+----+-----+-------------+-------------+------------------+
|2013-09-01|2013|    9|United States|San Francisco|            20.471|
|2013-08-01|2013|    8|United States|San Francisco|19.730999999999998|
|2013-07-01|2013|    7|United States|San Francisco|20.656999999999996|
|2013-06-01|2013|    6|United States|San Francisco|            19.759|
|2013-05-01|2013|    5|United States|San Francisco|            17.434|
|2013-04-01|2013|    4|United States|San Francisco|15.995999999999999|
|2013-03-01|2013|    3|United States|San Francisco|13.505999999999998|
|2013-02-01|2013|    2|United States|San Francisco|            10.229|
|2013-01-01|2013|    1|United States|San Francisco|              8.32|
|2012-12-01|2012|   12|United States|San Francisco|              8.95|
|2012-11-01|2012|   11|United States|San Francisco|            13.367|
|2012-

## Step 3: Define the Data Model


### 3.1 Conceptual Data Model
The first dimension table will contain events from the fire department calls data. The columns below will be extracted from the fire calls dataframe:

>CallNumber<br>
>City<br>
>CallType<br>
>year<br>
>month<br>
>Neighborhooods-AnalysisBoundaries<br>
>received_dt<br>
>available_dt<br>
       
The second dimension table will contain city temperature data. The columns below will be extracted from the temperature dataframe:
>dt<br>
>year<br>
>month<br>
>AverageTemperature <br>
>City <br>
>Country <br>

The fact table will contain information from the fire calls data joined with the city temperature data on City, year, and month:

>CallNumber<br>
>City<br>
>CallType<br>
>year<br>
>month<br>
>AverageTemperature<br>
>Country<br>
>dt<br>


The tables will be saved as Parquet files.

### 3.2 Mapping Out Data Pipelines
The pipeline steps are described below:
    
* Clean the temperature data as described in Step 2 to create the Spark dataframe df_t
* Create the fire call dimension table by selecting columns from fire_df and write it to a Parquet file
* Create the temperature dimension table by selecting columns from df_t and write it to a Parquet file
* Create the fact table by joining the fire call and temperature dimension tables on [City, year, month] and write it to a Parquet file

## Step 4: Run Pipelines to Model the Data 
### 4.1 Create the data model
Build the data pipelines to create the data model.

In [118]:
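# Build the fire call dimension table: select call-level columns, derive year and month, rename columns
fire_table = fire_df.select("Call Number", "Call Date", "Call Type", "City", "Neighborhooods - Analysis Boundaries", "Received DtTm", "Available DtTm") \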
                        .withColumn("year", F.year("Call Date")) \
                        .withColumn("month", F.month("Call Date")) \
                        .withColumnRenamed("Call Number", "callnumber") \
                        .withColumnRenamed("Call Date", "calldate") \
                        .withColumnRenamed("Call Type", "calltype") \
                        .withColumnRenamed("Neighborhooods - Analysis Boundaries", "Neighborhooods") \
                        .withColumnRenamed("Received DtTm", "received_dt") \
                        .withColumnRenamed("Available DtTm", "available_dt") \
                        .distinct()

In [87]:
fire_table.show(5, False)

+-----------+-----------------------+----------------+-------------+------------------------------------+-----------------------+-----------------------+----+-----+
|Call Number|Call Date              |Call Type       |City         |Neighborhooods - Analysis Boundaries|Received DtTm          |Available DtTm         |year|month|
+-----------+-----------------------+----------------+-------------+------------------------------------+-----------------------+-----------------------+----+-----+
|201903333  |2020-07-08T00:00:00.000|Medical Incident|San Francisco|Tenderloin                          |2020-07-08T23:08:23.000|2020-07-08T23:27:53.000|2020|7    |
|201903063  |2020-07-08T00:00:00.000|Structure Fire  |San Francisco|Outer Richmond                      |2020-07-08T20:53:55.000|2020-07-08T21:18:31.000|2020|7    |
|201901433  |2020-07-08T00:00:00.000|Medical Incident|San Francisco|Bayview Hunters Point               |2020-07-08T12:22:43.000|2020-07-08T12:44:13.000|2020|7    |
|201900691

In [91]:
# Build a time table from the timestamps at which calls were received
received_timetable = fire_df.select(
                                F.col("Received DtTm").alias("received_time"),
                                F.year("Received DtTm").alias('year'),
                                F.month("Received DtTm").alias('month'),
                                F.dayofmonth("Received DtTm").alias('day'),
                                F.hour("Received DtTm").alias("hour"),
                                F.minute("Received DtTm").alias("minute"),
                                F.second("Received DtTm").alias("second"),
                                F.weekofyear("Received DtTm").alias("week"),
                                F.date_format(F.col("Received DtTm"), "E").alias("weekday")
                                ).distinct()

In [92]:
received_timetable.show(5, False)

+-----------------------+----+-----+---+----+------+------+----+-------+
|received_time          |year|month|day|hour|minute|second|week|weekday|
+-----------------------+----+-----+---+----+------+------+----+-------+
|2020-07-08T16:06:09.000|2020|7    |8  |16  |6     |9     |28  |Wed    |
|2020-07-08T07:55:06.000|2020|7    |8  |7   |55    |6     |28  |Wed    |
|2020-07-08T02:03:35.000|2020|7    |8  |2   |3     |35    |28  |Wed    |
|2020-07-07T16:57:30.000|2020|7    |7  |16  |57    |30    |28  |Tue    |
|2020-07-07T13:00:33.000|2020|7    |7  |13  |0     |33    |28  |Tue    |
+-----------------------+----+-----+---+----+------+------+----+-------+
only showing top 5 rows
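
As a cross-check on the derived columns, the same fields can be reproduced with Python's standard library for the first row shown above. This is only a sketch: `split_timestamp` is a hypothetical helper, not part of the pipeline, and it assumes the `week` column follows ISO 8601 week numbering, which agrees with the value 28 in the output.

```python
from datetime import datetime

# Fixed names avoid any dependence on the system locale.
DAY_NAMES = ["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"]

def split_timestamp(value):
    """Break a timestamp string such as 2020-07-08T16:06:09.000 into time-table fields."""
    ts = datetime.strptime(value, "%Y-%m-%dT%H:%M:%S.%f")
    return {
        "year": ts.year, "month": ts.month, "day": ts.day,
        "hour": ts.hour, "minute": ts.minute, "second": ts.second,
        "week": ts.isocalendar()[1],        # ISO 8601 week number
        "weekday": DAY_NAMES[ts.weekday()], # Monday is index 0
    }

row = split_timestamp("2020-07-08T16:06:09.000")
print(row)
```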



In [94]:
# Build a time table from the timestamps at which units became available again
available_timetable = fire_df.select(
                                F.col("Available DtTm").alias("available_time"),
                                F.year("Available DtTm").alias('year'),
                                F.month("Available DtTm").alias('month'),
                                F.dayofmonth("Available DtTm").alias('day'),
                                F.hour("Available DtTm").alias("hour"),
                                F.minute("Available DtTm").alias("minute"),
                                F.second("Available DtTm").alias("second"),
                                F.weekofyear("Available DtTm").alias("week"),
                                F.date_format(F.col("Available DtTm"), "E").alias("weekday")
                                ).distinct()

In [95]:
available_timetable.show(5, False)

+-----------------------+----+-----+---+----+------+------+----+-------+
|available_time         |year|month|day|hour|minute|second|week|weekday|
+-----------------------+----+-----+---+----+------+------+----+-------+
|2020-07-08T20:54:28.000|2020|7    |8  |20  |54    |28    |28  |Wed    |
|2020-07-08T19:53:37.000|2020|7    |8  |19  |53    |37    |28  |Wed    |
|2020-07-08T17:52:20.000|2020|7    |8  |17  |52    |20    |28  |Wed    |
|2020-07-08T12:27:52.000|2020|7    |8  |12  |27    |52    |28  |Wed    |
|2020-07-08T12:27:08.000|2020|7    |8  |12  |27    |8     |28  |Wed    |
+-----------------------+----+-----+---+----+------+------+----+-------+
only showing top 5 rows



In [117]:
df_t.show(5,False)

+----------+------------------+-----------------------------+-----------+-------------+--------+---------+----+-----+
|dt        |AverageTemperature|AverageTemperatureUncertainty|City       |Country      |Latitude|Longitude|year|month|
+----------+------------------+-----------------------------+-----------+-------------+--------+---------+----+-----+
|1743-11-01|7.03              |1.611                        |Bielefeld  |Germany      |52.24N  |7.88E    |1743|11   |
|1743-11-01|7.318             |1.764                        |Kaliningrad|Russia       |55.45N  |19.84E   |1743|11   |
|1743-11-01|3.544             |1.764                        |Mulhouse   |France       |47.42N  |8.29E    |1743|11   |
|1743-11-01|18.722            |2.302                        |Orlando    |United States|28.13N  |80.91W   |1743|11   |
|1743-11-01|6.425             |1.6280000000000001           |Remscheid  |Germany      |50.63N  |6.34E    |1743|11   |
+----------+------------------+-------------------------

In [119]:
# Build the temperature dimension table from the cleaned temperature data
temperature_table = df_t.select("dt", "year", "month", "City", "Country", "AverageTemperature").distinct()

In [129]:
fire_table.printSchema()

root
 |-- callnumber: string (nullable = true)
 |-- calldate: string (nullable = true)
 |-- calltype: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Neighborhooods: string (nullable = true)
 |-- received_dt: string (nullable = true)
 |-- available_dt: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)



In [130]:
temperature_table.printSchema()

root
 |-- dt: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- AverageTemperature: string (nullable = true)



In [154]:
# Create the fact table by joining the fire call and temperature dimension tables
fact_joined_table = fire_table.join(temperature_table, ["City", "year", "month"], how='inner')
fire_temp_table = fact_joined_table.select("callnumber", "City", "calltype", "year", "month", "dt","AverageTemperature")

In [155]:
fire_temp_table.show(1, False)

+----------+-------------+----------------+----+-----+----------+------------------+
|callnumber|City         |calltype        |year|month|dt        |AverageTemperature|
+----------+-------------+----------------+----+-----+----------+------------------+
|010120303 |San Francisco|Medical Incident|2001|1    |2001-01-01|8.516             |
+----------+-------------+----------------+----+-----+----------+------------------+
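
The effect of the inner join can be illustrated without Spark. The sketch below uses plain Python dictionaries and a few rows taken from the outputs above; `inner_join` is a hypothetical helper and assumes at most one temperature row per key. It shows that a call is kept only when a temperature row exists for the same city, year, and month, so calls dated after the latest temperature reading (September 2013 for San Francisco in the sample above) drop out of the fact table.

```python
def inner_join(calls, temps, keys=("City", "year", "month")):
    """Keep each call that has a temperature row with identical key values."""
    # Assumes at most one temperature row per (City, year, month).
    lookup = {tuple(t[k] for k in keys): t for t in temps}
    joined = []
    for call in calls:
        match = lookup.get(tuple(call[k] for k in keys))
        if match is not None:
            joined.append({**call, "dt": match["dt"],
                           "AverageTemperature": match["AverageTemperature"]})
    return joined

calls = [
    {"callnumber": "010120303", "City": "San Francisco",
     "calltype": "Medical Incident", "year": 2001, "month": 1},
    {"callnumber": "201903333", "City": "San Francisco",
     "calltype": "Medical Incident", "year": 2020, "month": 7},
]
temps = [
    {"dt": "2001-01-01", "City": "San Francisco", "year": 2001, "month": 1,
     "AverageTemperature": "8.516"},
    {"dt": "2013-09-01", "City": "San Francisco", "year": 2013, "month": 9,
     "AverageTemperature": "20.471"},
]

fact_rows = inner_join(calls, temps)
print(fact_rows)
```

Only the 2001 call survives the join; the 2020 call has no temperature row for its month.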



### Write to AWS S3 Bucket

In [None]:
# Write the tables to Parquet files
# fire_table.write.parquet("/results/firecall.parquet")

# The trailing slash is required because the paths below are built by string concatenation
output_data = "s3a://udacity-dend-capstone-project/results/"

received_timetable.write.parquet(output_data + "received_time/" + "received_time.parquet")
available_timetable.write.parquet(output_data + "available_time/" + "available_time.parquet")

fire_table.write.parquet(output_data + "firecalls/" + "firecall.parquet")
temperature_table.write.parquet(output_data + "temp/" + "temperature.parquet")
fire_temp_table.write.parquet(output_data + "fire_temp/" + "fire_temp.parquet")
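
Output paths built by string concatenation depend on the prefix ending in a slash. A small helper, sketched here under the assumption that each table goes to `<prefix>/<folder>/<file>`, makes the result independent of how `output_data` is written; `table_path` is a hypothetical name.

```python
def table_path(prefix, folder, filename):
    """Join an S3 prefix, folder, and file name with exactly one slash between parts."""
    return "/".join([prefix.rstrip("/"), folder.strip("/"), filename])

with_slash = table_path("s3a://udacity-dend-capstone-project/results/",
                        "firecalls/", "firecall.parquet")
without_slash = table_path("s3a://udacity-dend-capstone-project/results",
                           "firecalls", "firecall.parquet")
print(with_slash)
print(without_slash)
```

Both calls produce the same path, so a write such as `fire_table.write.parquet(table_path(output_data, "firecalls", "firecall.parquet"))` would land in the intended folder either way.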

### 4.2 Data Quality Checks
Data quality checks confirm that the pipeline ran as expected. They could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/count checks to ensure completeness
 
The check run below is a count check: each table must contain at least one record.

In [None]:
# Perform quality checks here


def quality_check(df, description):
    '''
    Input: Spark dataframe
    Output: Print outcome of data quality check
    
    '''
    
    result = df.count()
    if result == 0:
        print("Data quality check failed for {} with zero records".format(description))
    else:
        print("Data quality check passed for {} with {} records".format(description, result))
    return 0

# Perform data quality check
quality_check(fire_table, "fire calls table")
quality_check(temperature_table, "temperature table")
quality_check(fire_temp_table, "fire calls and temperature joined table")
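
The count check above only prints its outcome. One possible variant, sketched below, returns a boolean so that a calling script can stop when a table is empty. It relies only on the object having a `count()` method, as a Spark dataframe does; `has_records` is a hypothetical name, and `FakeTable` is a hypothetical stand-in used so the example runs without a Spark session.

```python
def has_records(df, description):
    """Return True when df.count() is positive, printing the outcome either way."""
    records = df.count()
    if records == 0:
        print("Data quality check failed for {} with zero records".format(description))
        return False
    print("Data quality check passed for {} with {} records".format(description, records))
    return True

class FakeTable:
    """Hypothetical stand-in for a dataframe: exposes only count()."""
    def __init__(self, n):
        self._n = n

    def count(self):
        return self._n

results = [has_records(FakeTable(3), "non-empty table"),
           has_records(FakeTable(0), "empty table")]
print(results)
```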

### 4.3 Data dictionary 
**fire calls table**
* CallNumber: A unique 9-digit number assigned by the 911 Dispatch Center (DEM) to this call. These numbers are used for both Police and Fire calls.<br>
* City: name of city<br>
* CallType: the type of the call<br>
* year: 4 digit year<br>
* month: numeric month<br>
* Neighborhooods-AnalysisBoundaries: Text, Neighborhood District associated with this address<br>
* received_dt: Date and time the call is received at the 911 Dispatch Center.<br>
* available_dt: Date and time this unit is no longer assigned to this call and is available for another dispatch.<br>
       
**temperature data table**
* dt: Date of the temperature record<br>
* year: 4 digit year<br>
* month: numeric month<br>
* AverageTemperature: Float, average temperature <br>
* City: name of city<br>
* Country: name of country<br>

**joined table**

* CallNumber: A unique 9-digit number assigned by the 911 Dispatch Center (DEM) to this call. These numbers are used for both Police and Fire calls.<br>
* City: name of city<br>
* CallType: Text, type of call<br>
* year: 4 digit year<br>
* month: numeric month<br>
* AverageTemperature: average temperature<br>
* Country: name of country<br>
* dt: Date of the temperature record<br>


## Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.<br>
Spark was chosen since it can easily handle large amounts of data. <br> <br>

* Propose how often the data should be updated and why.<br>
The data should be updated monthly, since the temperature data holds one record per city per month and the fact table is joined on year and month.<br> <br> 

* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.<br>
 Use AWS EMR with S3 to run the Spark jobs on a cluster. <br><br>

 * The data populates a dashboard that must be updated on a daily basis by 7am.<br>
 Use Apache Airflow to schedule the pipeline so that it finishes before 7am. <br><br>

 * The database needed to be accessed by 100+ people.<br>
 Store the Parquet files in AWS S3 and give users read access.