
### DEND Capstone Project
#### ETL Pipeline - Immigration and Temperature Data

##### Project Summary
The goal of this capstone project is to create an ETL pipeline that combines I94 immigration data, city temperature data and demographics data into a fact table (a star schema is used because it is a simple model) that is optimized for queries analyzing immigration events. The resulting data will then be used to answer questions about immigration behavior based on destination temperature and population size.

The project consists of the following steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Packages used.
import re
import pandas as pd
import os
import glob
from datetime import datetime, timedelta
from pprint import pprint
from pyspark.sql import SparkSession
from pyspark.sql.functions import count, col, udf, year, month, avg, dayofweek, weekofyear, isnull
from pyspark.sql.types import StringType, IntegerType

### Step 1: Scope the Project and Gather Data

### Scope
The first dimension table aggregates the I94 immigration data by destination city.
The second dimension table aggregates the city temperature data by city.
The third dimension table aggregates the demographics data by city.
The fact table joins the above dimensions on destination city.
Finally, we will query the data to determine whether the choice of destination city is related to its temperature and population size. Spark is used to process the data.
### Describe and Gather Data
**I94 Immigration Data:** This data comes from the US National Tourism and Trade Office (in SAS binary database format). The data is available [here](https://travel.trade.gov/research/reports/i94/historical/2016.html).
### Key Notes:

* i94yr = 4 digit year
* i94mon = numeric month
* i94cit = 3 digit code of origin country (country of citizenship)
* i94port = 3 character code of destination USA city
* arrdate = arrival date in the USA
* i94mode = 1 digit travel code
* depdate = departure date from the USA
* i94visa = reason for immigration
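The arrdate and depdate values are SAS numeric dates, i.e. the number of days since 1960-01-01 (for example, 20545 in the sample rows is 2016-04-01), and i94mode and i94visa are numeric codes. The sketch below decodes a single record in plain Python. The label dictionaries follow I94_SAS_Labels_Description.SAS as we understand it and should be checked against that file; the helper names are hypothetical. In Spark, the same logic could be wrapped in a `udf`.

```python
from datetime import date, timedelta

# SAS stores dates as a count of days since 1960-01-01.
SAS_EPOCH = date(1960, 1, 1)

# Code labels as listed in I94_SAS_Labels_Description.SAS (verify against the file).
I94MODE_LABELS = {1: "Air", 2: "Sea", 3: "Land", 9: "Not reported"}
I94VISA_LABELS = {1: "Business", 2: "Pleasure", 3: "Student"}


def sas_to_date(sas_days):
    """Convert a SAS numeric date (int/float, or None if missing) to a date."""
    if sas_days is None:
        return None
    return SAS_EPOCH + timedelta(days=int(sas_days))


def decode_label(value, labels):
    """Map a numeric code (stored as a float in the SAS file) to its label."""
    if value is None:
        return None
    return labels.get(int(value))


def decode_record(record):
    """Return a copy of an I94 record (a dict) with readable dates and codes."""
    decoded = dict(record)
    decoded["arrdate"] = sas_to_date(record.get("arrdate"))
    decoded["depdate"] = sas_to_date(record.get("depdate"))
    decoded["i94mode"] = decode_label(record.get("i94mode"), I94MODE_LABELS)
    decoded["i94visa"] = decode_label(record.get("i94visa"), I94VISA_LABELS)
    return decoded


# arrdate, depdate and i94mode come from the sample row with cicid 15;
# the i94visa value is illustrative only.
row = {"i94port": "WAS", "arrdate": 20545.0, "depdate": 20691.0,
       "i94mode": 1.0, "i94visa": 2.0}
print(decode_record(row))
```

For this row, the arrival date decodes to 2016-04-01 and the departure date decodes to 2016-08-25.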

**World Temperature Data:** This dataset comes from Kaggle (comma-separated values file). You can read more about it [here](https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data).

### Key Notes:

* AverageTemperature = average temperature
* City = city name
* Country = country name
* Latitude = latitude
* Longitude = longitude

**U.S. City Demographic Data:** This data comes from OpenDataSoft (semicolon-separated values file). You can read more about it [here](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/).

### Key Notes:

* Total Population = total city population
* City = city name

In [2]:
# Create a Spark session that includes the spark-sas7bdat package for reading SAS files
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11") \
    .enableHiveSupport() \
    .getOrCreate()

In [3]:
# Immigration data: i94_files lists every SAS file in the folder, but only the April 2016 file is loaded
i94_files = glob.glob("../../data/18-83510-I94-Data-2016/*.sas7bdat")
i94_fname = "../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat"
immigration_df = spark.read.format("com.github.saurfang.sas.spark").load(i94_fname)

In [4]:
immigration_df.limit(5).toPandas()

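| | cicid | i94yr | i94mon | i94cit | i94res | i94port | arrdate | i94mode | i94addr | depdate | ... | entdepu | matflag | biryear | dtaddto | gender | insnum | airline | admnum | fltno | visatype |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 6.0 | 2016.0 | 4.0 | 692.0 | 692.0 | XXX | 20573.0 |  |  |  | ... | U |  | 1979.0 | 10282016 |  |  |  | 1897628000.0 |  | B2 |
| 1 | 7.0 | 2016.0 | 4.0 | 254.0 | 276.0 | ATL | 20551.0 | 1.0 | AL |  | ... | Y |  | 1991.0 | D/S | M |  |  | 3736796000.0 | 296.0 | F1 |
| 2 | 15.0 | 2016.0 | 4.0 | 101.0 | 101.0 | WAS | 20545.0 | 1.0 | MI | 20691.0 | ... |  | M | 1961.0 | 09302016 | M |  | OS | 666643200.0 | 93.0 | B2 |
| 3 | 16.0 | 2016.0 | 4.0 | 101.0 | 101.0 | NYC | 20545.0 | 1.0 | MA | 20567.0 | ... |  | M | 1988.0 | 09302016 |  |  | AA | 92468460000.0 | 199.0 | B2 |
| 4 | 17.0 | 2016.0 | 4.0 | 101.0 | 101.0 | NYC | 20545.0 | 1.0 | MA | 20567.0 | ... |  | M | 2012.0 | 09302016 |  |  | AA | 92468460000.0 | 199.0 | B2 |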


In [None]:
# Demographics data (semicolon-delimited CSV with a header row)
demographics_file_name = "us-cities-demographics.csv"
demographics_df = spark.read.format("csv").option("delimiter", ";").option("header", "true").load(demographics_file_name)

In [None]:
demographics_df.limit(5).toPandas()

In [5]:
# Temperature Data
temperature_fname = "../../data2/GlobalLandTemperaturesByCity.csv"
temperature_df = spark.read.format("csv").option("delimiter", ",").option("header", "true").load(temperature_fname)

In [6]:
temperature_df.limit(5).toPandas()

| | dt | AverageTemperature | AverageTemperatureUncertainty | City | Country | Latitude | Longitude |
|---|---|---|---|---|---|---|---|
| 0 | 1743-11-01 | 6.068 | 1.737 | Århus | Denmark | 57.05N | 10.33E |
| 1 | 1743-12-01 |  |  | Århus | Denmark | 57.05N | 10.33E |
| 2 | 1744-01-01 |  |  | Århus | Denmark | 57.05N | 10.33E |
| 3 | 1744-02-01 |  |  | Århus | Denmark | 57.05N | 10.33E |
| 4 | 1744-03-01 |  |  | Århus | Denmark | 57.05N | 10.33E |


### Step 2: Explore and Assess the Data
#### Explore the Data 

**I94 immigration data** - Remove records whose destination city code (i94port) is invalid or null.
I94_SAS_Labels_Description.SAS lists all valid and invalid codes.

**Temperature Data** - Remove rows with a null AverageTemperature and duplicate locations; add an i94port code based on the city name.

**Demographics Data** - Remove duplicate cities; add an i94port code based on the city name.

In [7]:
# Performing cleaning tasks here


# Create a dictionary of valid i94port codes: {code: [location name]}
re_obj = re.compile(r'\'(.*)\'.*\'(.*)\'')
valid_i94port_dict = {}
with open('valid_i94ports.txt') as f:
    for line in f:
        port = re_obj.search(line)
        if port is None:  # skip blank or malformed lines
            continue
        valid_i94port_dict[port.group(1)] = [port.group(2)]

In [8]:
# Clean Immigration Data
def clean_i94_dataframe(i94_dataframe):
    """
    Input: I94 immigration dataframe
    Output: I94 immigration dataframe containing only rows with a valid i94port
    """
    # Keep only rows whose i94port is a valid code (null codes are dropped as well)
    valid_ports = list(valid_i94port_dict.keys())
    return i94_dataframe.filter(i94_dataframe.i94port.isin(valid_ports))

In [9]:
# Filtering i94 data .. 
immi94_df = clean_i94_dataframe(immigration_df)
immi94_df.limit(5).toPandas()

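| | cicid | i94yr | i94mon | i94cit | i94res | i94port | arrdate | i94mode | i94addr | depdate | ... | entdepu | matflag | biryear | dtaddto | gender | insnum | airline | admnum | fltno | visatype |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 7.0 | 2016.0 | 4.0 | 254.0 | 276.0 | ATL | 20551.0 | 1.0 | AL |  | ... | Y |  | 1991.0 | D/S | M |  |  | 3736796000.0 | 296 | F1 |
| 1 | 15.0 | 2016.0 | 4.0 | 101.0 | 101.0 | WAS | 20545.0 | 1.0 | MI | 20691.0 | ... |  | M | 1961.0 | 09302016 | M |  | OS | 666643200.0 | 93 | B2 |
| 2 | 16.0 | 2016.0 | 4.0 | 101.0 | 101.0 | NYC | 20545.0 | 1.0 | MA | 20567.0 | ... |  | M | 1988.0 | 09302016 |  |  | AA | 92468460000.0 | 199 | B2 |
| 3 | 17.0 | 2016.0 | 4.0 | 101.0 | 101.0 | NYC | 20545.0 | 1.0 | MA | 20567.0 | ... |  | M | 2012.0 | 09302016 |  |  | AA | 92468460000.0 | 199 | B2 |
| 4 | 18.0 | 2016.0 | 4.0 | 101.0 | 101.0 | NYC | 20545.0 | 1.0 | MI | 20555.0 | ... |  | M | 1959.0 | 09302016 |  |  | AZ | 92471040000.0 | 602 | B1 |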


In [10]:
# Remove temperature rows where AverageTemperature is null
temperature_df_filtered = temperature_df.filter(temperature_df.AverageTemperature.isNotNull())
temperature_df_filtered.show(3)

+----------+------------------+-----------------------------+-----+-------+--------+---------+
|        dt|AverageTemperature|AverageTemperatureUncertainty| City|Country|Latitude|Longitude|
+----------+------------------+-----------------------------+-----+-------+--------+---------+
|1743-11-01|             6.068|           1.7369999999999999|Århus|Denmark|  57.05N|   10.33E|
|1744-04-01|5.7879999999999985|           3.6239999999999997|Århus|Denmark|  57.05N|   10.33E|
|1744-05-01|            10.644|           1.2830000000000001|Århus|Denmark|  57.05N|   10.33E|
+----------+------------------+-----------------------------+-----+-------+--------+---------+
only showing top 3 rows



In [11]:
# Remove duplicate locations
temperature_df = temperature_df_filtered.dropDuplicates(['City', 'Country'])
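`dropDuplicates(['City', 'Country'])` keeps one row per location, and Spark does not guarantee which row survives, so the remaining AverageTemperature is a single monthly reading (for Allentown, the 1743-11-01 value in the `temperature_df.show(3)` output below). The Scope instead describes aggregating the temperature data by city. A plain-Python sketch of that aggregation follows. The function name is hypothetical. In Spark the equivalent would be roughly `temperature_df_filtered.groupBy("City", "Country").agg(avg("AverageTemperature"))`.

```python
from collections import defaultdict


def average_by_city(rows):
    """Mean AverageTemperature per (City, Country), skipping missing readings.

    `rows` is an iterable of dicts keyed by the CSV column names; temperatures
    may be strings (as read from the CSV) or None.
    """
    sums = defaultdict(float)
    counts = defaultdict(int)
    for row in rows:
        temp = row["AverageTemperature"]
        if temp is None or temp == "":
            continue  # missing reading, like the nulls filtered out above
        key = (row["City"], row["Country"])
        sums[key] += float(temp)
        counts[key] += 1
    return {key: sums[key] / counts[key] for key in sums}


# Readings taken from the Århus sample rows shown above.
rows = [
    {"City": "Århus", "Country": "Denmark", "AverageTemperature": "6.068"},
    {"City": "Århus", "Country": "Denmark", "AverageTemperature": None},
    {"City": "Århus", "Country": "Denmark", "AverageTemperature": "5.7879999999999985"},
    {"City": "Århus", "Country": "Denmark", "AverageTemperature": "10.644"},
]
print(average_by_city(rows))
```

Averaging over all readings gives one representative temperature per city instead of an arbitrary month.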

In [12]:
temperature_df_filtered.count()

8235082

In [13]:
temperature_df.show(3)

+----------+--------------------+-----------------------------+---------+-------------+--------+---------+
|        dt|  AverageTemperature|AverageTemperatureUncertainty|     City|      Country|Latitude|Longitude|
+----------+--------------------+-----------------------------+---------+-------------+--------+---------+
|1743-11-01|               3.264|                        1.665|Allentown|United States|  40.99N|   74.56W|
|1779-11-01|0.011999999999999985|                        2.714|   Atyrau|   Kazakhstan|  47.42N|   50.92E|
|1825-01-01|  26.069000000000003|                         2.16|  Bintulu|     Malaysia|   2.41N|  113.30E|
+----------+--------------------+-----------------------------+---------+-------------+--------+---------+
only showing top 3 rows



In [None]:
demographics_df.count()

In [None]:
# Drop duplicate cities in the demographics data
demographics_df = demographics_df.dropDuplicates(['City'])

In [None]:
demographics_df.count()

In [14]:
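@udf(returnType=StringType())
def get_imm94port(city):
    """
    Input: City name

    Output: First i94port code whose location name contains the city name
            (case-insensitive), or None if there is no match

    """
    if city is None:  # null city names would otherwise raise an AttributeError
        return None

    for key in valid_i94port_dict:
        if city.lower() in valid_i94port_dict[key][0].lower():
            return key
    return None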
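`get_imm94port` returns the first code whose location *contains* the city name, and it ignores Country. As a result, a short name can match a longer one ("york" is contained in "new york, ny"), and cities with the same name in different countries receive the same code. A stricter, hypothetical alternative compares the city with the part of each location before the first comma. It is shown in plain Python; in Spark it could be wrapped with `udf` in the same way as the cell above.

```python
def match_i94port(city, port_dict):
    """Return the code whose location name equals `city` (case-insensitive).

    `port_dict` has the same shape as valid_i94port_dict: {code: [location]},
    where a location looks like 'SEATTLE, WA'. Only the part before the first
    comma is compared. Returns None when there is no exact match.
    """
    if city is None:
        return None
    target = city.strip().lower()
    for code, locations in port_dict.items():
        name = locations[0].split(",")[0].strip().lower()
        if name == target:
            return code
    return None


# Hypothetical sample in the same shape as valid_i94port_dict.
ports = {"NYC": ["NEW YORK, NY"], "SEA": ["SEATTLE, WA"]}

print(match_i94port("Seattle", ports))    # -> SEA (exact name match)
print(match_i94port("York", ports))       # -> None (no exact match)
print("york" in ports["NYC"][0].lower())  # -> True (the substring test used by get_imm94port)
```

Exact matching trades recall for precision: city names spelled differently in the two sources will no longer match.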

In [15]:
# Add an i94port code based on the city name
temperature_df = temperature_df.withColumn("i94port", get_imm94port(temperature_df.City))

In [16]:
temperature_df.show(3)

+----------+--------------------+-----------------------------+---------+-------------+--------+---------+-------+
|        dt|  AverageTemperature|AverageTemperatureUncertainty|     City|      Country|Latitude|Longitude|i94port|
+----------+--------------------+-----------------------------+---------+-------------+--------+---------+-------+
|1743-11-01|               3.264|                        1.665|Allentown|United States|  40.99N|   74.56W|   null|
|1779-11-01|0.011999999999999985|                        2.714|   Atyrau|   Kazakhstan|  47.42N|   50.92E|   null|
|1825-01-01|  26.069000000000003|                         2.16|  Bintulu|     Malaysia|   2.41N|  113.30E|   null|
+----------+--------------------+-----------------------------+---------+-------------+--------+---------+-------+
only showing top 3 rows



In [17]:
temperature_df.count()

3490

In [18]:
# Remove rows without an i94port code
temperature_df = temperature_df.filter(temperature_df.i94port.isNotNull())

In [19]:
temperature_df.show(2)

+----------+------------------+-----------------------------+-------+-------------+--------+---------+-------+
|        dt|AverageTemperature|AverageTemperatureUncertainty|   City|      Country|Latitude|Longitude|i94port|
+----------+------------------+-----------------------------+-------+-------------+--------+---------+-------+
|1852-07-01|            15.488|                        1.395|  Perth|    Australia|  31.35S|  114.97E|    PER|
|1828-01-01|            -1.977|                        2.551|Seattle|United States|  47.42N|  121.97W|    SEA|
+----------+------------------+-----------------------------+-------+-------------+--------+---------+-------+
only showing top 2 rows



In [None]:
# Add an i94port code to the demographics data based on the city name
demographics_df = demographics_df.withColumn("i94port", get_imm94port(demographics_df.City))

In [None]:
demographics_df.limit(3).toPandas()

In [None]:
# Remove rows without an i94port code
demographics_df = demographics_df.filter(demographics_df.i94port.isNotNull())

In [None]:
# Rename the column to remove the space
demographics_df = demographics_df.withColumnRenamed('Total Population', 'total_population')
demographics_df.limit(5).toPandas()

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model

##### A star schema was chosen because it keeps queries simple: analyses are run against a single fact table.
**Fact Table - This will contain information from the I94 immigration data joined with the city temperature data and demographics data on i94port.**

Columns:

* i94yr = 4 digit year
* i94mon = numeric month
* i94cit = 3 digit code of origin country
* i94port = 3 character code of destination city
* arrdate = arrival date
* i94mode = 1 digit travel code
* depdate = departure date
* i94visa = reason for immigration
* AverageTemperature = average temperature of destination city
* Total Population = total population of destination city

**1st Dimension Table - This will contain events from the I94 immigration data.**

Columns:

* i94yr = 4 digit year
* i94mon = numeric month
* i94cit = 3 digit code of origin country
* i94port = 3 character code of destination city
* arrdate = arrival date
* i94mode = 1 digit travel code
* depdate = departure date
* i94visa = reason for immigration

**2nd Dimension Table - This will contain city temperature data.**

Columns:

* i94port = 3 character code of destination city (mapped from cleaned up immigration data)
* AverageTemperature = average temperature
* City = city name
* Country = country name
* Latitude = latitude
* Longitude = longitude

**3rd Dimension Table - This will contain demographics data.**

Columns:

* i94port = 3 character code of destination city (mapped from cleaned up immigration data)
* Total Population = total city population
* City = city name



#### 3.2 Mapping Out Data Pipelines
* List the steps necessary to pipeline the data into the chosen data model.

**Pipeline Steps:**

1. Clean the I94 data (done above).
2. Clean the temperature data (done above).
3. Clean the demographics data (done above).
4. Create the immigration dimension table by selecting columns from the immigration dataframe and write it to Parquet files partitioned by i94port.
5. Create the temperature dimension table by selecting columns from the temperature dataframe and write it to Parquet files partitioned by i94port.
6. Create the demographics dimension table by selecting columns from the demographics dataframe and write it to Parquet files partitioned by i94port.
7. Create the fact table by joining the immigration, temperature and demographics dimension tables on i94port and write it to Parquet files partitioned by i94port.
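Step 7 joins the three tables with inner joins on i94port (the SQL query in Step 4 uses plain `JOIN`). Two consequences are worth checking. First, immigration records whose port has no temperature or demographics row are dropped. Second, nothing in the cleaning guarantees that i94port is unique in the dimension tables (for example, same-named cities in different countries receive the same code). When a port matches several dimension rows, each immigration record is repeated once per matching row. The plain-Python sketch below reproduces this behavior; the function names and toy values are hypothetical.

```python
from collections import defaultdict


def index_by_port(rows):
    """Group dimension rows by their i94port value."""
    index = defaultdict(list)
    for row in rows:
        index[row["i94port"]].append(row)
    return index


def build_fact_rows(immigration, temperature, demographics):
    """Inner-join immigration rows with both dimensions on i94port."""
    temp_by_port = index_by_port(temperature)
    demo_by_port = index_by_port(demographics)
    facts = []
    for imm in immigration:
        port = imm["i94port"]
        # .get() returns [] for unmatched ports, so those records are dropped.
        for temp in temp_by_port.get(port, []):
            for demo in demo_by_port.get(port, []):
                facts.append({
                    "i94port": port,
                    "year": imm["i94yr"],
                    "temperature": temp["AverageTemperature"],
                    "total_population": demo["total_population"],
                })
    return facts


# Toy data: two temperature rows share the code NYC, and ATL has no dimension rows.
immigration = [{"i94port": "NYC", "i94yr": 2016},
               {"i94port": "NYC", "i94yr": 2016},
               {"i94port": "ATL", "i94yr": 2016}]
temperature = [{"i94port": "NYC", "AverageTemperature": 10.0},
               {"i94port": "NYC", "AverageTemperature": 12.0}]
demographics = [{"i94port": "NYC", "total_population": 1000}]

facts = build_fact_rows(immigration, temperature, demographics)
print(len(facts))  # 2 NYC records x 2 temperature rows; the ATL record is dropped
```

Comparing the fact table's row count with the immigration table's row count is a quick way to detect this fan-out.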

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

#### Step 1:

In [20]:
# Columns needed
immigration_columns = ["i94port", "i94mon", "i94cit", "arrdate", "i94yr", "i94mode", "depdate", "i94visa"]

In [21]:
# Extract columns for immigration dimension table
immi94_table = immi94_df.select(immigration_columns)

In [22]:
# Partitioning immigration dimension table to parquet files by i94port
immi94_table.write.mode("append").partitionBy(immigration_columns[0]).parquet("/results/immigration.parquet")

#### Step 2:

In [23]:
temperature_columns = ["i94port", "AverageTemperature", "City", "Country", "Latitude", "Longitude"]

In [None]:
# Extract columns for temperature dimension table
temperature_table = temperature_df.select(temperature_columns)

In [24]:
# Partitioning temperature dimension table to parquet files by i94port
temperature_table.write.mode("append").partitionBy(temperature_columns[0]).parquet("/results/temperature.parquet")

#### Step 3:


In [None]:
demographics_columns = ["i94port", "total_population", "City"]

In [None]:
# Extract columns for demographics dimension table
demographics_table = demographics_df.select(demographics_columns)

In [None]:
# Partitioning demographics dimension table to parquet files by i94port
demographics_table.write.mode("append").partitionBy(demographics_columns[0]).parquet("/results/demographics.parquet")

#### Step 4:


In [None]:
temp_views = ["immi94_temp_view", "temperature_temp_view", "demographic_temp_view"]

In [25]:
# Immigration temporary view.
immi94_df.createOrReplaceTempView(temp_views[0])

In [None]:
# Temperature temporary view.
temperature_df.createOrReplaceTempView(temp_views[1])

In [None]:
# Demographics temporary view.
demographics_df.createOrReplaceTempView(temp_views[2])

In [None]:
joining_query = """
SELECT  
        demographic_temp_view.total_population as total_population,
        immi94_temp_view.i94port as i94port,
        immi94_temp_view.i94yr as year,
        immi94_temp_view.i94mon as month,
        immi94_temp_view.i94cit as city,
        immi94_temp_view.arrdate as arrival_date,
        immi94_temp_view.depdate as departure_date,
        immi94_temp_view.i94visa as reason,
        temperature_temp_view.AverageTemperature as temperature,
        temperature_temp_view.Latitude as latitude,
        temperature_temp_view.Longitude as longitude
        
FROM immi94_temp_view

JOIN temperature_temp_view ON (immi94_temp_view.i94port = temperature_temp_view.i94port)
JOIN demographic_temp_view ON (immi94_temp_view.i94port = demographic_temp_view.i94port)
"""

In [26]:
# Joining immigration, temperature and demographics views into the fact table.
fact_table = spark.sql(joining_query)

In [27]:
# Writing fact table into parquet files partitioned by i94port
fact_table.write.mode("append").partitionBy("i94port").parquet("/results/fact.parquet")

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

In [28]:
def data_quality_check(df, title):
    '''
    Input: Dataframe(Spark), dataframe title.
    Output: Print whether data quality pass or fail with records counted.
    '''
    
    result = df.count()
    if result == 0:
        print("FAILED!! 0 records found in {} table".format(title))
    else:
        print("PASSED... {} records in {} table".format(result, title))

In [29]:
# Perform quality checks here
data_quality_check(immi94_df, "immigration")

PASSED... 3088544 records in immigration table


In [30]:
data_quality_check(temperature_df, "temperature")

PASSED... 207 records in temperature table


In [None]:
data_quality_check(demographics_df, "demographics")
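`data_quality_check` only verifies that each table is non-empty. The integrity constraints listed in 4.2 could be checked as well. For example, the join key i94port should never be null and, where expected, should be unique. A hypothetical helper is sketched below in plain Python on toy rows. With Spark dataframes, the same counts can be obtained with `df.filter(col('i94port').isNull()).count()` and by comparing `df.count()` with `df.select('i94port').distinct().count()`.

```python
def check_key_column(rows, key, title, require_unique=False):
    """Return failure messages for a table's key column (empty list = passed).

    `rows` is a list of dicts; nulls are always reported, duplicates only
    when `require_unique` is True.
    """
    failures = []
    values = [row.get(key) for row in rows]
    nulls = sum(1 for value in values if value is None)
    if nulls:
        failures.append("{} null {} value(s) in {} table".format(nulls, key, title))
    non_null = [value for value in values if value is not None]
    duplicates = len(non_null) - len(set(non_null))
    if require_unique and duplicates:
        failures.append("{} duplicate {} value(s) in {} table".format(duplicates, key, title))
    return failures


# Toy rows: one null code and one repeated code.
demographics = [{"i94port": "NYC"}, {"i94port": "SEA"},
                {"i94port": None}, {"i94port": "SEA"}]
for message in check_key_column(demographics, "i94port", "demographics", require_unique=True):
    print("FAILED!!", message)
```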

#### 4.3 Data dictionary 
**Fact Table** - Joining the I94 immigration data, city temperature data and demographics data on i94port codes.

Columns:

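* total_population = total population of destination city
* i94port = 3 character code of destination city
* year = 4 digit year (i94yr)
* month = numeric month (i94mon)
* city = 3 digit code of origin country (i94cit)
* arrival_date = arrival date (arrdate)
* departure_date = departure date (depdate)
* reason = reason for immigration (i94visa)
* temperature = average temperature of destination city (AverageTemperature)
* latitude = latitude of destination city
* longitude = longitude of destination city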

**1st Dimension Table** - The I94 immigration data events.

Columns:

* i94yr = 4 digit year
* i94mon = numeric month
* i94cit = 3 digit code of origin country
* i94port = 3 character code of destination city
* arrdate = arrival date
* i94mode = 1 digit travel code
* depdate = departure date
* i94visa = reason for immigration

**2nd Dimension Table** - The city temperature data.

Columns:

* i94port = destination city code (mapped from cleaned up immigration data)
* AverageTemperature = average temperature
* City = city name
* Country = country name
* Latitude = latitude
* Longitude = longitude

**3rd Dimension Table** - The demographics data.

Columns:

* i94port = destination city code (mapped from cleaned up immigration data)
* total_population = total city population
* City = city name


### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.

For this project, we used Spark because it can read multiple file formats (SAS, CSV, etc.) and handles large amounts of data. Spark was used to load the input files into dataframes, and a standard SQL join in Spark SQL was used to create the fact table.
* Propose how often the data should be updated and why.

Since the raw I94 files are organized by month, the data should be updated monthly.
### Scenarios
* Write a description of how you would approach the problem differently under the following scenarios:
1. The data was increased by 100x.
    - Load the data into Amazon Redshift, an analytical database optimized for aggregations and read-heavy workloads.
2. The data populates a dashboard that must be updated on a daily basis by 7am every day.
    - Schedule the pipeline as an Airflow DAG that runs daily before 7am, with task retries and email alerts on failure.
    - Run daily data quality checks; if they fail, email the operators and freeze the dashboard.
3. The database needed to be accessed by 100+ people.
    - Use Redshift, which offers good read performance and can add capacity for many concurrent queries (concurrency scaling).