# Temperature and US Immigration Data ETL Pipeline
### Data Engineering Capstone Project

#### Project Summary
I94 immigration data and city temperature data will be used to create a database that is optimized to query and analyze immigration events. An ETL pipeline is to be built with these two data sources to create the database. Finally, the database will be used to assess how immigration behaviour relates to the temperatures of destination cities.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
import pandas as pd
import re
import psycopg2
from collections import defaultdict
from datetime import datetime, timedelta
from pyspark.sql.functions import udf

### Step 1: Scope the Project and Gather Data

#### Scope 
We will create 2 dimension tables and 1 fact table. I94 immigration data is to be aggregated by destination city, and temperature data is to be aggregated by city; these form the 2 dimension tables. The two tables are joined on destination city to form the fact table. The final database can then be queried on immigration events to determine whether temperature affects the choice of destination city.

#### Describe and Gather Data 
I94 immigration data comes from the [US National Tourism and Trade Office website](https://travel.trade.gov/research/reports/i94/historical/2016.html). It is provided in SAS7BDAT format, a binary database storage format used by SAS.

The temperature data is a Kaggle data set that includes temperatures in cities around the world. It can be found here: https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data

#### Key Notes: I94 Immigration Data
* i94yr = 4 digit year
* i94mon = numeric month
* i94cit = 3 digit code of origin country (country of citizenship)
* i94port = 3 character code of destination USA city (port of entry)
* arrdate = arrival date in the USA (SAS numeric date: days since 1960-01-01)
* i94mode = 1 digit travel mode code
* depdate = departure date from the USA (SAS numeric date)
* i94visa = reason for immigration

The temperature data set comes from [Kaggle](https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data). It is in CSV format.

#### Key Notes: Temperature Data
* dt = date of the monthly measurement
* AverageTemperature = average temperature (°C)
* City = city name
* Country = country name
* Latitude = latitude
* Longitude = longitude

#### Immigration Data

In [6]:
# Read in the data here
immigration = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
df_immigration = pd.read_sas(immigration, format='sas7bdat', encoding="ISO-8859-1")

In [7]:
df_immigration.head()

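| | cicid | i94yr | i94mon | i94cit | i94res | i94port | arrdate | i94mode | i94addr | depdate | ... | entdepu | matflag | biryear | dtaddto | gender | insnum | airline | admnum | fltno | visatype |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 6.0 | 2016.0 | 4.0 | 692.0 | 692.0 | XXX | 20573.0 | | | | ... | U | | 1979.0 | 10282016 | | | | 1897628000.0 | | B2 |
| 1 | 7.0 | 2016.0 | 4.0 | 254.0 | 276.0 | ATL | 20551.0 | 1.0 | AL | | ... | Y | | 1991.0 | D/S | M | | | 3736796000.0 | 296.0 | F1 |
| 2 | 15.0 | 2016.0 | 4.0 | 101.0 | 101.0 | WAS | 20545.0 | 1.0 | MI | 20691.0 | ... | | M | 1961.0 | 09302016 | M | | OS | 666643200.0 | 93.0 | B2 |
| 3 | 16.0 | 2016.0 | 4.0 | 101.0 | 101.0 | NYC | 20545.0 | 1.0 | MA | 20567.0 | ... | | M | 1988.0 | 09302016 | | | AA | 92468460000.0 | 199.0 | B2 |
| 4 | 17.0 | 2016.0 | 4.0 | 101.0 | 101.0 | NYC | 20545.0 | 1.0 | MA | 20567.0 | ... | | M | 2012.0 | 09302016 | | | AA | 92468460000.0 | 199.0 | B2 |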
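The `arrdate` and `depdate` values above are SAS numeric dates, i.e. the number of days since 1960-01-01; for example, 20545.0 corresponds to 2016-04-01. A minimal sketch of the conversion in plain Python, assuming missing values arrive as `None` or NaN; the helper name `sas_to_date` is hypothetical and not part of the pipeline:

```python
import math
from datetime import date, timedelta
from typing import Optional

# SAS stores dates as a count of days since this epoch
SAS_EPOCH = date(1960, 1, 1)


def sas_to_date(sas_days: Optional[float]) -> Optional[date]:
    """Convert a SAS numeric date (days since 1960-01-01) to a datetime.date.

    Returns None for missing values (None or NaN), e.g. an empty depdate.
    """
    if sas_days is None or math.isnan(sas_days):
        return None
    return SAS_EPOCH + timedelta(days=int(sas_days))


# Values taken from the df_immigration.head() output above
print(sas_to_date(20545.0))       # 2016-04-01
print(sas_to_date(20691.0))       # 2016-08-25
print(sas_to_date(float("nan")))  # None
```

In Spark, the same arithmetic could likely be expressed with the SQL function `date_add`, e.g. `expr("date_add(to_date('1960-01-01'), cast(arrdate as int))")`; this is a suggestion, not a step of the original pipeline.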


#### Temperature Data

In [8]:
# Read in the temperature data
temperature_data = '../../data2/GlobalLandTemperaturesByCity.csv'
df_temperature_data = pd.read_csv(temperature_data, sep=',')

In [5]:
# Display first 5 rows of df_temperature_data
df_temperature_data.head()

| | dt | AverageTemperature | AverageTemperatureUncertainty | City | Country | Latitude | Longitude |
|---|---|---|---|---|---|---|---|
| 0 | 1743-11-01 | 6.068 | 1.737 | Århus | Denmark | 57.05N | 10.33E |
| 1 | 1743-12-01 | | | Århus | Denmark | 57.05N | 10.33E |
| 2 | 1744-01-01 | | | Århus | Denmark | 57.05N | 10.33E |
| 3 | 1744-02-01 | | | Århus | Denmark | 57.05N | 10.33E |
| 4 | 1744-03-01 | | | Århus | Denmark | 57.05N | 10.33E |


In [12]:
# Create Spark session with the spark-sas7bdat package so Spark can read SAS7BDAT files
from pyspark.sql import SparkSession
spark = (
    SparkSession.builder
    .config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11")
    .enableHiveSupport()
    .getOrCreate()
)

### Step 2: Explore and Assess the Data
#### Explore the Data 
**I94 immigration data** - drop all data points where the destination city code i94port is not a valid value (e.g. XXX, 99, NaN), as described in I94_SAS_Labels_Description.SAS.

**Temperature Data** - drop all data points where AverageTemperature is NaN, drop duplicate locations, and add the i94port code of the location to each entry.

In [13]:
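# Build a dictionary of valid i94port codes from i94port.txt
# Each matching line yields: code -> [location label]
re_obj = re.compile(r'\'(.*)\'.*\'(.*)\'')
i94portvalid = {}
with open('i94port.txt') as f:
    for data in f:
        match = re_obj.search(data)
        if match is None:
            continue  # skip lines without a quoted code/label pair
        i94portvalid[match[1]] = [match[2]]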
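The regular expression above expects each line of `i94port.txt` to hold a quoted code followed by a quoted location label. The file itself is not shown here; the sketch below assumes lines in the style of `I94_SAS_Labels_Description.SAS` (for example `'ATL'	=	'ATLANTA, GA'`). The sample lines and the helper `parse_port_lines` are illustrative, and unlike the notebook cell the sketch also strips padding spaces inside the quoted labels:

```python
import re

# Same pattern as the notebook cell: captures the quoted code and the quoted label
PORT_PATTERN = re.compile(r"'(.*)'.*'(.*)'")


def parse_port_lines(lines):
    """Return {code: [label]} for every line containing a quoted code/label pair."""
    ports = {}
    for line in lines:
        match = PORT_PATTERN.search(line)
        if match is None:
            continue  # comments, blank lines, etc.
        code, label = match.group(1).strip(), match.group(2).strip()
        ports[code] = [label]  # list value mirrors the layout of i94portvalid
    return ports


# Hypothetical sample lines in the assumed file format
sample_lines = [
    "'ALC'\t=\t'ALCAN, AK             '",
    "'ATL'\t=\t'ATLANTA, GA'",
    "",
]
print(parse_port_lines(sample_lines))
# {'ALC': ['ALCAN, AK'], 'ATL': ['ATLANTA, GA']}
```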

In [14]:
# Clean immigration data
def clean_immigration_data(file):
    '''    
    Input: Path to immigration data file
    Output: Spark dataframe of immigration data with valid i94port
    '''    
    # Read I94 data into Spark
    df_immigration = spark.read.format('com.github.saurfang.sas.spark').load(file)

    # Filter out entries where i94port is invalid
    df_immigration = df_immigration.filter(df_immigration.i94port.isin(list(i94portvalid.keys())))

    return df_immigration

In [15]:
# Test function
immigration_test_file = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat' 
df_immigration_test = clean_immigration_data(immigration_test_file)
df_immigration_test.show()

+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|        admnum|fltno|visatype|
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|  7.0|2016.0|   4.0| 254.0| 276.0|    ATL|20551.0|    1.0|     AL|   null|  25.0|    3.0|  1.0|20130811|     SEO| null|      G|   null|      Y|   null| 1991.0|     D/S|     M|  null|   null|  3.73679633E9|00296|      F1|
| 15.0|2016.0|   4.0| 101.0| 101.0|    WAS|20545.0|    1.0|     MI|20691.0|  55.0|    2.0|  1.0|20160401|    nul

In [16]:
# Clean temperature data
df_temperature_data = spark.read.format("csv").option("header", "true").load("../../data2/GlobalLandTemperaturesByCity.csv")

In [17]:
# Filter out data points with a missing (NaN) average temperature;
# Spark's CSV reader loads the empty fields as null
df_temperature_data = df_temperature_data.filter(df_temperature_data.AverageTemperature.isNotNull())

In [18]:
@udf()
def get_i94port(city):
    '''
    Input: City name
    Output: Corresponding i94port code, or None if no port label contains the city name
    '''
    if city is None:
        return None
    for key in i94portvalid:
        if city.lower() in i94portvalid[key][0].lower():
            return key
    return None
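`get_i94port` matches by substring, so a city gets a port's code whenever its name appears anywhere inside that port's label; the output further below shows Aberdeen, United Kingdom mapped to ABE. A plain-Python sketch comparing the notebook's substring rule with a stricter, hypothetical rule that compares the city name against the label text before the first comma. The port labels are illustrative:

```python
def substring_match(city, ports):
    """Notebook's rule: first port whose label contains the city name."""
    if city is None:
        return None
    for code, labels in ports.items():
        if city.lower() in labels[0].lower():
            return code
    return None


def exact_city_match(city, ports):
    """Stricter rule: city must equal the label text before the first comma."""
    if city is None:
        return None
    target = city.strip().lower()
    for code, labels in ports.items():
        if labels[0].split(",")[0].strip().lower() == target:
            return code
    return None


# Illustrative labels in the same {code: [label]} layout as i94portvalid
ports = {"NYC": ["NEW YORK, NY"], "ABE": ["ABERDEEN, WA"]}

print(substring_match("York", ports))       # NYC (false positive)
print(exact_city_match("York", ports))      # None
print(exact_city_match("New York", ports))  # NYC
print(exact_city_match("Aberdeen", ports))  # ABE
```

Even exact matching still maps Aberdeen, United Kingdom to ABE, so filtering the temperature data to US rows before adding i94port (assuming the Country column spells it `United States`) would be a further safeguard beyond the original pipeline.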

In [19]:
# Add i94port code based on city name
df_temperature_data = df_temperature_data.withColumn("i94port", get_i94port(df_temperature_data.City))
df_temperature_data.show()

+----------+-------------------+-----------------------------+-----+-------+--------+---------+-------+
|        dt| AverageTemperature|AverageTemperatureUncertainty| City|Country|Latitude|Longitude|i94port|
+----------+-------------------+-----------------------------+-----+-------+--------+---------+-------+
|1743-11-01|              6.068|           1.7369999999999999|Århus|Denmark|  57.05N|   10.33E|   null|
|1744-04-01| 5.7879999999999985|           3.6239999999999997|Århus|Denmark|  57.05N|   10.33E|   null|
|1744-05-01|             10.644|           1.2830000000000001|Århus|Denmark|  57.05N|   10.33E|   null|
|1744-06-01| 14.050999999999998|                        1.347|Århus|Denmark|  57.05N|   10.33E|   null|
|1744-07-01|             16.082|                        1.396|Århus|Denmark|  57.05N|   10.33E|   null|
|1744-09-01| 12.780999999999999|                        1.454|Århus|Denmark|  57.05N|   10.33E|   null|
|1744-10-01|               7.95|                         1.63|År

In [20]:
# Remove data points with no i94port code (the UDF returns null when no port matches)
df_temperature_data = df_temperature_data.filter(df_temperature_data.i94port.isNotNull())

In [22]:
# Show results
df_temperature_data.show()

+----------+------------------+-----------------------------+--------+--------------+--------+---------+-------+
|        dt|AverageTemperature|AverageTemperatureUncertainty|    City|       Country|Latitude|Longitude|i94port|
+----------+------------------+-----------------------------+--------+--------------+--------+---------+-------+
|1743-11-01|             8.758|                        1.886|Aberdeen|United Kingdom|  57.05N|    1.48W|    ABE|
|1744-04-01|6.0699999999999985|           2.9339999999999997|Aberdeen|United Kingdom|  57.05N|    1.48W|    ABE|
|1744-05-01|             7.751|                        1.494|Aberdeen|United Kingdom|  57.05N|    1.48W|    ABE|
|1744-06-01|             10.62|                        1.574|Aberdeen|United Kingdom|  57.05N|    1.48W|    ABE|
|1744-07-01|             12.35|                        1.591|Aberdeen|United Kingdom|  57.05N|    1.48W|    ABE|
|1744-09-01|            11.224|           1.6059999999999999|Aberdeen|United Kingdom|  57.05N|  

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
**Fact Table** - I94 immigration data joined with the city temperature data on i94port. Columns:

* i94yr = 4 digit year
* i94mon = numeric month
* i94cit = 3 digit code of origin country (country of citizenship)
* i94port = 3 character code of destination USA city
* arrdate = arrival date in the USA
* i94mode = 1 digit travel mode code
* depdate = departure date from the USA
* i94visa = reason for immigration
* AverageTemperature = average temperature of destination city

**Dimension Table** - I94 immigration events. Columns:

* i94yr = 4 digit year
* i94mon = numeric month
* i94cit = 3 digit code of origin country (country of citizenship)
* i94port = 3 character code of destination USA city
* arrdate = arrival date in the USA
* i94mode = 1 digit travel mode code
* depdate = departure date from the USA
* i94visa = reason for immigration
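The `i94mode` and `i94visa` columns hold numeric codes. In I94_SAS_Labels_Description.SAS, i94mode uses 1 = Air, 2 = Sea, 3 = Land, 9 = Not reported, and i94visa uses 1 = Business, 2 = Pleasure, 3 = Student. A small lookup sketch that could add readable labels to the immigration dimension table; the dictionary and function names are hypothetical:

```python
import math

# Code tables from I94_SAS_Labels_Description.SAS
I94MODE = {1: "Air", 2: "Sea", 3: "Land", 9: "Not reported"}
I94VISA = {1: "Business", 2: "Pleasure", 3: "Student"}


def decode(code, table):
    """Map a float code such as 1.0 to its label; None for missing or unknown codes."""
    if code is None or (isinstance(code, float) and math.isnan(code)):
        return None
    return table.get(int(code))


print(decode(1.0, I94MODE))           # Air
print(decode(3.0, I94VISA))           # Student
print(decode(float("nan"), I94MODE))  # None
```

In Spark, the same mapping could be applied with a UDF or by joining against a small lookup dataframe.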

**Dimension Table** - temperature data. Columns:

* i94port = 3 character code of destination city (mapped from the city name using the valid i94port labels)
* AverageTemperature = average temperature
* City = city name
* Country = country name
* Latitude = latitude
* Longitude = longitude

#### 3.2 Mapping Out Data Pipelines
Pipeline Steps:
 1. Clean I94 data as described in Step 2 to create Spark dataframe df_immigration for each month.
 2. Clean temperature data as described in Step 2 to create Spark dataframe df_temperature_data (already performed above).
 3. Create immigration dimension table by selecting relevant columns from df_immigration and write to parquet file partitioned by i94port.
 4. Create temperature dimension table by selecting relevant columns from df_temperature_data and write to parquet file partitioned by i94port.
 5. Create fact table by joining immigration and temperature dimension tables on i94port and write to parquet file partitioned by i94port.
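Step 1 runs once per month. The only file name shown in this notebook is `i94_apr16_sub.sas7bdat`; assuming the other months follow the same `i94_<mon>16_sub.sas7bdat` pattern, a sketch of building the list of monthly input paths (the directory comes from the test cell in Step 2; `monthly_i94_paths` is a hypothetical helper):

```python
from pathlib import PurePosixPath

# Assumption: monthly files are named like i94_apr16_sub.sas7bdat
MONTHS = ["jan", "feb", "mar", "apr", "may", "jun",
          "jul", "aug", "sep", "oct", "nov", "dec"]


def monthly_i94_paths(base_dir, year_suffix="16"):
    """Return one SAS7BDAT path per month, in calendar order."""
    return [str(PurePosixPath(base_dir) / f"i94_{mon}{year_suffix}_sub.sas7bdat")
            for mon in MONTHS]


paths = monthly_i94_paths("../../data/18-83510-I94-Data-2016")
print(paths[3])  # ../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat
```

Each path could then be passed to `clean_immigration_data` and the result written with the `mode("append")` parquet writes used in Step 4.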
    
    

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [23]:
# Path to I94 immigration data 
immigration_data = '/data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'

In [24]:
# Clean I94 immigration data and store as Spark dataframe
df_immigration = clean_immigration_data(immigration_data)

In [25]:
# Extract columns for immigration dimension table
immigration_table = df_immigration.select(["i94yr", "i94mon", "i94cit", "i94port", "arrdate", "i94mode", "depdate", "i94visa"])

In [26]:
# Write immigration dimension table to parquet files partitioned by i94port
immigration_table.write.mode("append").partitionBy("i94port").parquet("/results/immigration.parquet")

In [None]:
# Extract columns for temperature dimension table
temp_table = df_temperature_data.select(["AverageTemperature", "City", "Country", "Latitude", "Longitude", "i94port"])

In [None]:
# Write temperature dimension table to parquet files partitioned by i94port
temp_table.write.mode("append").partitionBy("i94port").parquet("/results/temperature.parquet")

In [None]:
# Create temporary views of the immigration and temperature data
df_immigration.createOrReplaceTempView("immigration_view")
df_temperature_data.createOrReplaceTempView("temperature_view")

In [None]:
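# Create the fact table by joining the immigration view with temperature aggregated to one row per i94port
# (temperature_view holds one row per city per month, so joining it directly would duplicate each immigration record)
fact_table = spark.sql('''
select immigration_view.i94yr as year,
       immigration_view.i94mon as month,
       immigration_view.i94cit as city,
       immigration_view.i94port as i94port,
       immigration_view.arrdate as arrival_date,
       immigration_view.depdate as departure_date,
       immigration_view.i94mode as travel_mode,
       immigration_view.i94visa as reason,
       temp_by_port.AverageTemperature as temperature,
       temp_by_port.Latitude as latitude,
       temp_by_port.Longitude as longitude
from immigration_view
JOIN (
    select i94port,
           avg(cast(AverageTemperature as double)) as AverageTemperature,
           first(Latitude) as Latitude,
           first(Longitude) as Longitude
    from temperature_view
    group by i94port
) temp_by_port ON (immigration_view.i94port = temp_by_port.i94port)
''')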
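`temperature_view` holds one row per city per month, so an inner join on i94port alone pairs every immigration record with every temperature record for its port. Aggregating temperature to one row per i94port first, as the scope in Step 1 describes, keeps one fact row per immigration record. A small sketch of the row-count arithmetic with hypothetical per-port counts:

```python
def inner_join_rows(left_counts, right_counts):
    """Rows produced by an inner equi-join, given row counts per join key."""
    return sum(n * right_counts[key]
               for key, n in left_counts.items() if key in right_counts)


# Hypothetical row counts per i94port
immigration_rows = {"ATL": 3, "NYC": 2}
monthly_temperature_rows = {"ATL": 100, "NYC": 50, "BOS": 10}
aggregated_temperature_rows = {port: 1 for port in monthly_temperature_rows}

print(inner_join_rows(immigration_rows, monthly_temperature_rows))     # 400
print(inner_join_rows(immigration_rows, aggregated_temperature_rows))  # 5
```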

In [None]:
# Write fact table to parquet files partitioned by i94port
fact_table.write.mode("append").partitionBy("i94port").parquet("/results/fact.parquet")

#### 4.2 Data Quality Checks
The data quality checks below help ensure the pipeline ran as expected. They could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

In [None]:
# Perform quality checks here
def quality_check(df, description):
    '''
    Input: Spark dataframe, description of Spark dataframe
    Output: True if the dataframe contains at least one record, otherwise False;
            the outcome is also printed
    '''
    result = df.count()
    if result == 0:
        print("Data quality check failed for {} with zero records".format(description))
        return False
    print("Data quality check passed for {} with {} records".format(description, result))
    return True

In [None]:
quality_check(df_immigration, "immigration table")
quality_check(df_temperature_data, "temperature table")
quality_check(fact_table, "fact table")
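Beyond row counts, the integrity constraints listed above (for example, a join key that must never be null, or a record id that should be unique) can be checked in a similar way. A plain-Python sketch on rows represented as dictionaries, such as a small sample collected with `[row.asDict() for row in df.limit(1000).collect()]`; the helper names and sample rows are hypothetical:

```python
from collections import Counter


def null_count(rows, column):
    """Number of rows where `column` is missing or None."""
    return sum(1 for row in rows if row.get(column) is None)


def duplicate_keys(rows, columns):
    """Key tuples that appear more than once across `columns`."""
    counts = Counter(tuple(row.get(c) for c in columns) for row in rows)
    return [key for key, n in counts.items() if n > 1]


# Hypothetical sample rows from the immigration table
rows = [
    {"cicid": 7.0, "i94port": "ATL"},
    {"cicid": 15.0, "i94port": "WAS"},
    {"cicid": 15.0, "i94port": None},
]
print(null_count(rows, "i94port"))      # 1
print(duplicate_keys(rows, ["cicid"]))  # [(15.0,)]
```

On the full Spark dataframe, the null check corresponds to `df.filter(df.i94port.isNull()).count()`.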

#### 4.3 Data dictionary 
**Fact Table** - I94 immigration data joined with the city temperature data on i94port. Columns:

* i94yr = 4 digit year
* i94mon = numeric month
* i94cit = 3 digit code of origin country (country of citizenship)
* i94port = 3 character code of destination USA city
* arrdate = arrival date in the USA
* i94mode = 1 digit travel mode code
* depdate = departure date from the USA
* i94visa = reason for immigration
* AverageTemperature = average temperature of destination city

**Dimension Table** - I94 immigration events. Columns:

* i94yr = 4 digit year
* i94mon = numeric month
* i94cit = 3 digit code of origin country (country of citizenship)
* i94port = 3 character code of destination USA city
* arrdate = arrival date in the USA
* i94mode = 1 digit travel mode code
* depdate = departure date from the USA
* i94visa = reason for immigration

**Dimension Table** - temperature data. Columns:

* i94port = 3 character code of destination city (mapped from the city name using the valid i94port labels)
* AverageTemperature = average temperature
* City = city name
* Country = country name
* Latitude = latitude
* Longitude = longitude

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
 1. I used Spark because it can easily handle multiple file formats (SAS, CSV, etc.) containing large amounts of data. Spark SQL was used to load the input files into dataframes and to build the tables with standard SQL join operations.
 
* Propose how often the data should be updated and why.
 1. Since the raw files are provided monthly, the data should continue to be pulled monthly.


### Scenarios
* Write a description of how you would approach the problem differently under the following scenarios:
 1. The data was increased by 100x.
  * Amazon Redshift would be a fast and reliable option: it is a columnar analytical database built for clustered, read-heavy workloads, so the solution above would continue to work well.
 2. The data populates a dashboard that must be updated on a daily basis by 7am every day.
  * Apache Airflow could be used to schedule and monitor the whole process, for example by defining the pipeline as a DAG with retries and email alerts on failure, scheduled so that it finishes before 7am.
 3. The database needed to be accessed by 100+ people.
  * A cloud-based data warehouse such as Amazon Redshift could help here, since it serves many concurrent read queries efficiently and its capacity can be scaled as demand grows.