# Project Title
### Data Engineering Capstone Project

#### Project Summary
The goal of this project is to create an ETL pipeline that uses I94 immigration data and city temperature data to form a database for the analysis of immigration events. This database can be used to answer questions relating immigration behavior to the temperature of the destination.


The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [3]:
# Do all imports and installs here
import pandas as pd
import re
import psycopg2
from collections import defaultdict
from datetime import datetime, timedelta
from pyspark.sql.functions import udf

### Step 1: Scope the Project and Gather Data

#### Scope 

In this project, we form the first dimension table by aggregating I94 immigration data by destination city, and the second dimension table by aggregating city temperature data by city. To form the fact table, we join these two datasets on the destination city. The final database supports queries on immigration events, which helps to determine how temperature affects the choice of destination city.
We will use Spark for the data processing.

#### Describe and Gather Data 
The US National Tourism and Trade Office provides the I94 immigration data. The data is in SAS7BDAT format, a binary database storage format.
The temperature data comes from Kaggle and is in CSV format.


In [5]:
# Read in the data here
fname = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
df = pd.read_sas(fname, 'sas7bdat', encoding="ISO-8859-1")

In [6]:
df.head()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,6.0,2016.0,4.0,692.0,692.0,XXX,20573.0,,,,...,U,,1979.0,10282016,,,,1897628000.0,,B2
1,7.0,2016.0,4.0,254.0,276.0,ATL,20551.0,1.0,AL,,...,Y,,1991.0,D/S,M,,,3736796000.0,296.0,F1
2,15.0,2016.0,4.0,101.0,101.0,WAS,20545.0,1.0,MI,20691.0,...,,M,1961.0,09302016,M,,OS,666643200.0,93.0,B2
3,16.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,1988.0,09302016,,,AA,92468460000.0,199.0,B2
4,17.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,2012.0,09302016,,,AA,92468460000.0,199.0,B2
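
The `arrdate` and `depdate` values in the sample above (for example 20545.0) are consistent with SAS date values, which count days since 1960-01-01. The following is a minimal sketch of a conversion to calendar dates under that assumption; the helper name `sas_to_date` is hypothetical and not part of the notebook.

```python
from datetime import date, timedelta

# Assumption: arrdate/depdate are SAS date values (days since 1960-01-01).
SAS_EPOCH = date(1960, 1, 1)

def sas_to_date(sas_days):
    """Convert a SAS day count (e.g. 20573.0) to a date; None and NaN give None."""
    if sas_days is None or sas_days != sas_days:  # NaN is not equal to itself
        return None
    return SAS_EPOCH + timedelta(days=int(sas_days))

print(sas_to_date(20545.0))  # 2016-04-01
print(sas_to_date(20573.0))  # 2016-04-29
```

Both sample values fall in April 2016, which matches the `i94_apr16_sub` file name.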


In [None]:
# Read in the temperature data
fname = '../../data2/GlobalLandTemperaturesByCity.csv'
df_temp = pd.read_csv(fname, sep=',')

In [None]:
# Display first 5 rows of df_temp
df_temp.head()

In [8]:
# Create Spark session with SAS7BDAT jar
from pyspark.sql import SparkSession
spark = SparkSession.builder.\
config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11")\
.enableHiveSupport().getOrCreate()

In [11]:
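# Read the I94 SAS file with Spark, write it to Parquet, and read it back
df_spark = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')
df_spark.write.parquet("sas_data")
df_spark = spark.read.parquet("sas_data")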

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps
* I94 immigration data: drop all rows where the destination city code i94port is not a valid value.
* Temperature data: drop all rows where AverageTemperature is NaN, drop duplicate locations, and add the i94port code of the location to each entry.

In [None]:
# Create dictionary of valid i94port entries: {port code: [port name]}
re_obj = re.compile(r'\'(.*)\'.*\'(.*)\'')
i94port_valid = {}
with open('i94port_valid.txt') as f:
    for line in f:
        match = re_obj.search(line)
        # Skip lines that do not contain two quoted fields
        if match:
            i94port_valid[match[1]] = [match[2]]
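
To illustrate what the regular expression extracts, here is a minimal sketch that runs the same pattern on a few in-memory lines. The sample lines are hypothetical and only assume that `i94port_valid.txt` holds one `'CODE' = 'NAME'` pair per line; the helper `parse_port_lines` is not part of the notebook.

```python
import re

# Same pattern as the notebook: two single-quoted fields on one line.
PORT_LINE = re.compile(r"'(.*)'.*'(.*)'")

def parse_port_lines(lines):
    """Build {port_code: [port_name]} and skip lines that do not match."""
    ports = {}
    for line in lines:
        match = PORT_LINE.search(line)
        if match is None:
            continue  # blank or malformed line
        # strip() removes padding inside the quotes, if the file has any
        ports[match.group(1)] = [match.group(2).strip()]
    return ports

# Hypothetical sample lines, assumed to mirror i94port_valid.txt.
sample = [
    "   'ATL'\t=\t'ATLANTA, GA            '",
    "   'NYC'\t=\t'NEW YORK, NY           '",
    "",
]
ports = parse_port_lines(sample)
print(ports)
```

Keeping each value as a one-element list matches how `i94port_valid[key][0]` is read later in the notebook.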


In [None]:
# Cleaning I94 immigration data
def clean_i94_data(file):
    '''    
    Input: Path to access the I94 immigration data file
    Output: Spark dataframe of I94 immigration data with valid i94port in it
    '''    
    # Read I94 data into Spark
    df_immigration = spark.read.format('com.github.saurfang.sas.spark').load(file)

    # Filter out entries where i94port is invalid
    df_immigration = df_immigration.filter(df_immigration.i94port.isin(list(i94port_valid.keys())))
    return df_immigration

# Test the cleaning function on the April 2016 file
immigration_test_file = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'

df_immigration_test = clean_i94_data(immigration_test_file)
df_immigration_test.show()

In [None]:
# Clean temperature data
df_temp = spark.read.format("csv").option("header", "true").load("../../data2/GlobalLandTemperaturesByCity.csv")

# Filter out data points with a missing or NaN average temperature
df_temp = df_temp.filter(df_temp.AverageTemperature.isNotNull() & (df_temp.AverageTemperature != 'NaN'))

# Remove duplicate locations from the dataset
df_temp = df_temp.dropDuplicates(['City', 'Country'])

In [None]:
@udf()
def get_i94port(city):
    '''
    Input: City name
    Output: Corresponding i94port code, or None if no port matches
    '''
    if city is None:
        return None
    for key in i94port_valid:
        if city.lower() in i94port_valid[key][0].lower():
            return key
    return None

# Add i94port code based on city name
df_temp = df_temp.withColumn("i94port", get_i94port(df_temp.City))

# Remove data points with no i94port code
df_temp = df_temp.filter(df_temp.i94port.isNotNull())

# Show results
df_temp.show()
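
The substring test in `get_i94port` returns the first port whose description contains the city name, so a short name such as "York" would also match "NEW YORK, NY". The following is a minimal sketch of a stricter lookup in plain Python, assuming port descriptions follow a "CITY, STATE" pattern. The helpers `build_city_index` and `lookup_port` and the sample entries are hypothetical.

```python
def build_city_index(ports):
    """Map the lower-cased city name (text before the first comma) to a port code."""
    index = {}
    for code, names in ports.items():
        city = names[0].split(",")[0].strip().lower()
        index.setdefault(city, code)  # keep the first code seen for a city
    return index

def lookup_port(city, index):
    """Return the port code for an exact city-name match, else None."""
    if city is None:
        return None
    return index.get(city.strip().lower())

# Hypothetical entries in the same shape as i94port_valid.
ports = {"NYC": ["NEW YORK, NY"], "ATL": ["ATLANTA, GA"]}
index = build_city_index(ports)
print(lookup_port("New York", index))  # NYC
print(lookup_port("York", index))      # None
```

An exact match trades recall for precision: cities whose port description is worded differently from the temperature dataset would be dropped rather than mismatched.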

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model

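The data model consists of one fact table and two dimension tables:

* The fact table contains information from the I94 immigration data joined with the city temperature data on i94port. Its columns are year, month, city, i94port, arrival_date, departure_date, reason, temperature, latitude, and longitude.
* The first dimension table contains events from the I94 immigration data. The following columns are extracted from the immigration dataframe: i94yr, i94mon, i94cit, i94port, arrdate, i94mode, depdate, and i94visa.
* The second dimension table contains city temperature data. The following columns are extracted from the temperature dataframe: AverageTemperature, City, Country, Latitude, Longitude, and i94port.

The tables are saved to Parquet files partitioned by city (i94port).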

#### 3.2 Mapping Out Data Pipelines
The pipeline steps are described below:

* Clean the I94 data as described in Step 2 to create the Spark dataframe df_immigration for each month.
* Clean the temperature data as described in Step 2 to create the Spark dataframe df_temp (already performed).
* Create the immigration dimension table by selecting the relevant columns from df_immigration, and write it to a Parquet file partitioned by i94port.
* Create the temperature dimension table by selecting the relevant columns from df_temp, and write it to a Parquet file partitioned by i94port.
* Create the fact table by joining the immigration and temperature dimension tables on i94port, and write it to a Parquet file partitioned by i94port.
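
The first step runs once per month. As a minimal sketch, the monthly input paths could be generated as shown below. This assumes the other monthly files follow the same naming pattern as `i94_apr16_sub.sas7bdat`, which the notebook does not confirm; `monthly_i94_paths` is a hypothetical helper.

```python
MONTHS = ["jan", "feb", "mar", "apr", "may", "jun",
          "jul", "aug", "sep", "oct", "nov", "dec"]

def monthly_i94_paths(base_dir, year_suffix="16"):
    """Return one SAS file path per month, following the April file's naming."""
    # Assumption: every month is named i94_<mon><yy>_sub.sas7bdat.
    return [f"{base_dir}/i94_{m}{year_suffix}_sub.sas7bdat" for m in MONTHS]

paths = monthly_i94_paths("../../data/18-83510-I94-Data-2016")
print(paths[3])  # ../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat

# In the notebook, each path could then be passed to clean_i94_data:
# for path in paths:
#     df_immigration = clean_i94_data(path)
```

Because the tables are written with `mode("append")`, each monthly run would add its rows to the existing Parquet output.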

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

#### Step : 1

In [None]:
# Path to the dataset 
immigration_data = '/data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'

# Clean I94 immigration data and store as Spark dataframe
df_immigration = clean_i94_data(immigration_data)

# Extract columns for immigration dimension table
immigration_table = df_immigration.select(["i94yr", "i94mon", "i94cit", "i94port", "arrdate", "i94mode", "depdate", "i94visa"])

# Write immigration dimension table to parquet files partitioned by i94port
immigration_table.write.mode("append").partitionBy("i94port").parquet("/results/immigration.parquet")

In [None]:
#### Step : 2

In [None]:
# Extract columns for temperature dimension table
temp_table = df_temp.select(["AverageTemperature", "City", "Country", "Latitude", "Longitude", "i94port"])

# Write temperature dimension table to parquet files partitioned by i94port
temp_table.write.mode("append").partitionBy("i94port").parquet("/results/temperature.parquet")

In [None]:
#### Step : 3

In [None]:
# Create temporary views
df_immigration.createOrReplaceTempView("immigration_view")
df_temp.createOrReplaceTempView("temp_view")

In [None]:
# Create the fact table by joining the views of immigration and temperature tables
fact_table = spark.sql('''
SELECT immigration_view.i94yr as year,
       immigration_view.i94mon as month,
       immigration_view.i94cit as city,
       immigration_view.i94port as i94port,
       immigration_view.arrdate as arrival_date,
       immigration_view.depdate as departure_date,
       immigration_view.i94visa as reason,
       temp_view.AverageTemperature as temperature,
       temp_view.Latitude as latitude,
       temp_view.Longitude as longitude
FROM immigration_view
JOIN temp_view ON (immigration_view.i94port = temp_view.i94port)
''')

In [None]:
# Write fact table to parquet files partitioned by i94port
fact_table.write.mode("append").partitionBy("i94port").parquet("/results/fact.parquet")

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

In [None]:
def quality_check(df, description):
    '''
    Input: Spark dataframe, description of Spark dataframe
    Output: Print outcome of data quality check
    '''
    
    result = df.count()
    if result == 0:
        print("Data quality check failed for {} with zero records".format(description))
    else:
        print("Data quality check passed for {} with {} records".format(description, result))
    return 0

# Perform data quality check
quality_check(df_immigration, "immigration table")
quality_check(df_temp, "temperature table")
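
`quality_check` prints its result and returns 0 in both cases, so a failed check does not stop the run. The following is a minimal sketch of a variant that raises an exception instead. It relies only on the dataframe's `count()` method; `require_rows` and the `FakeFrame` stand-in are hypothetical names used for illustration.

```python
def require_rows(df, description):
    """Raise ValueError if df.count() is zero; otherwise return the row count."""
    rows = df.count()
    if rows == 0:
        raise ValueError(
            "Data quality check failed for {} with zero records".format(description)
        )
    return rows

class FakeFrame:
    """Hypothetical stand-in for a Spark dataframe; only count() is provided."""
    def __init__(self, rows):
        self._rows = rows

    def count(self):
        return self._rows

print(require_rows(FakeFrame(3), "immigration table"))  # 3
```

In the notebook, the same function could be called as `require_rows(df_immigration, "immigration table")`, so that an empty table halts the pipeline rather than only printing a message.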

#### 4.3 Data dictionary 
Fact Table - This will contain information from the I94 immigration data joined with the city temperature data on i94port.
Columns:
* i94yr = 4 digit year
* i94mon = numeric month
* i94cit = 3 digit code of origin city
* i94port = 3 character code of destination city
* arrdate = arrival date
* i94mode = 1 digit travel code
* depdate = departure date
* i94visa = reason for immigration
* AverageTemperature = average temperature of destination city

1st Dimension Table - This will contain events from the I94 immigration data.
Columns:
* i94yr = 4 digit year
* i94mon = numeric month
* i94cit = 3 digit code of origin city
* i94port = 3 character code of destination city
* arrdate = arrival date
* i94mode = 1 digit travel code
* depdate = departure date
* i94visa = reason for immigration

2nd Dimension Table - This will contain city temperature data.
Columns:
* i94port = 3 character code of destination city (mapped from cleaned up immigration data)
* AverageTemperature = average temperature
* City = city name
* Country = country name
* Latitude = latitude
* Longitude = longitude

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
Rationale: We used Spark because it can read multiple file formats (SAS, CSV, etc.) that contain large amounts of data. Spark SQL loads the input files into dataframes, and standard SQL join operations combine them to create the tables.

* Propose how often the data should be updated and why.
Schedule: The data should be updated monthly, because the raw files are organized by month.

* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 Load the data into Amazon Redshift, a distributed data warehouse that can scale to the larger volume.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 Use Airflow to schedule and automate the pipeline so that it completes before 7am each day.
 * The database needed to be accessed by 100+ people.
 We could publish the Parquet files to HDFS and give read access to the users who need it.