# ETL Pipeline for Immigration and Temperature Data
### Data Engineering Capstone Project

#### Project Summary
The goal of this project is to create an ETL pipeline that uses I94 immigration data and city temperature data to build a database optimized for queries on immigration events. The database can help answer questions that relate immigration behavior to destination temperature, for example: do people tend to immigrate to warmer places?

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
import pandas as pd, re
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf

### Step 1: Scope the Project and Gather Data

#### Scope 
In this project, we will clean the I94 immigration data and organize it by destination city (`i94port`) to form our first dimension table. Next, we will clean the city temperature data and map each city to its `i94port` code to form the second dimension table. The two datasets will be joined on destination city (`i94port`) to form the fact table. The final database is optimized for queries on immigration events, so that we can ask whether temperature affects the choice of destination city. Spark will be used to process the data.

#### Describe and Gather Data 
The I94 immigration [data](https://travel.trade.gov/research/reports/i94/historical/2016.html) comes from the US National Travel and Tourism Office. It is provided in SAS7BDAT [format](https://cran.r-project.org/web/packages/sas7bdat/vignettes/sas7bdat.pdf), a binary file format that SAS uses to store datasets. Some relevant attributes include:

* `i94yr` = 4 digit year
* `i94mon` = numeric month
* `i94cit` = 3 digit code of origin city
* `i94port` = 3 character code of destination USA city
* `arrdate` = arrival date in the USA
* `i94mode` = 1 digit travel code
* `depdate` = departure date from the USA
* `i94visa` = reason for immigration

The temperature [data](https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data) comes from Kaggle. It is provided in CSV format. Some relevant attributes include:

* `AverageTemperature` = average temperature
* `City` = city name
* `Country` = country name
* `Latitude` = latitude
* `Longitude` = longitude

In [2]:
# Read June 2016 I94 immigration data into Pandas for exploration
filename_1 = '../../data/18-83510-I94-Data-2016/i94_jun16_sub.sas7bdat'
df_sas = pd.read_sas(filename_1, format='sas7bdat', encoding='ISO-8859-1')

In [3]:
df_sas.head()

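|   | cicid | i94yr | i94mon | i94cit | i94res | i94port | arrdate | i94mode | i94addr | depdate | ... | entdepu | matflag | biryear | dtaddto | gender | insnum | airline | admnum | fltno | visatype |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 4.0 | 2016.0 | 6.0 | 135.0 | 135.0 | XXX | 20612.0 |  |  |  | ... | U |  | 1957.0 | 10032016 |  |  |  | 14938460000.0 |  | WT |
| 1 | 5.0 | 2016.0 | 6.0 | 135.0 | 135.0 | XXX | 20612.0 |  |  |  | ... | U |  | 1966.0 | 10032016 |  |  |  | 17460060000.0 |  | WT |
| 2 | 6.0 | 2016.0 | 6.0 | 213.0 | 213.0 | XXX | 20609.0 |  |  |  | ... | U |  | 1989.0 | D/S |  |  |  | 1679298000.0 |  | F1 |
| 3 | 7.0 | 2016.0 | 6.0 | 213.0 | 213.0 | XXX | 20611.0 |  |  |  | ... | U |  | 1993.0 | D/S |  |  |  | 1140963000.0 |  | F1 |
| 4 | 16.0 | 2016.0 | 6.0 | 245.0 | 245.0 | XXX | 20632.0 |  |  |  | ... | U |  | 1992.0 | D/S |  |  |  | 1934535000.0 |  | F1 |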
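The first rows show `arrdate` as a plain number, for example 20612.0, even though this file holds June 2016 arrivals. Assume these are SAS date values, that is, days counted from 1960-01-01. That assumption fits the data, because day 20612 falls in June 2016. Under it, the values can be decoded with the standard library. `sas_date_to_iso` is a hypothetical helper and is not part of the notebook:

```python
import math
from datetime import date, timedelta

# Assumption: I94 date fields (arrdate, depdate) are SAS dates,
# i.e. the number of days elapsed since 1960-01-01.
SAS_EPOCH = date(1960, 1, 1)

def sas_date_to_iso(value):
    """Convert a SAS numeric date such as 20612.0 to 'YYYY-MM-DD', or None if missing."""
    if value is None or (isinstance(value, float) and math.isnan(value)):
        return None  # missing dates (shown as NaN by pandas) stay missing
    return (SAS_EPOCH + timedelta(days=int(value))).isoformat()

print(sas_date_to_iso(20612.0))       # 2016-06-07
print(sas_date_to_iso(float("nan")))  # None
```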


In [4]:
# Read temperature data into Pandas for exploration
filename_2 = '../../data2/GlobalLandTemperaturesByCity.csv'

df_temp = pd.read_csv(filename_2)

In [5]:
df_temp.head()

|   | dt | AverageTemperature | AverageTemperatureUncertainty | City | Country | Latitude | Longitude |
|---|---|---|---|---|---|---|---|
| 0 | 1743-11-01 | 6.068 | 1.737 | Århus | Denmark | 57.05N | 10.33E |
| 1 | 1743-12-01 |  |  | Århus | Denmark | 57.05N | 10.33E |
| 2 | 1744-01-01 |  |  | Århus | Denmark | 57.05N | 10.33E |
| 3 | 1744-02-01 |  |  | Århus | Denmark | 57.05N | 10.33E |
| 4 | 1744-03-01 |  |  | Århus | Denmark | 57.05N | 10.33E |
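`Latitude` and `Longitude` are stored as strings with a hemisphere suffix, such as `57.05N` and `10.33E`, so they cannot be compared numerically as-is. The sketch below converts them to signed decimal degrees. It assumes every value ends in exactly one of N, S, E or W, and `parse_coordinate` is a hypothetical helper:

```python
def parse_coordinate(text):
    """Convert a coordinate like '57.05N' or '10.33E' to signed decimal degrees."""
    text = text.strip()
    value, hemisphere = float(text[:-1]), text[-1].upper()
    if hemisphere not in "NSEW":
        raise ValueError(f"unexpected hemisphere in {text!r}")
    # By convention, south latitudes and west longitudes are negative.
    return -value if hemisphere in "SW" else value

print(parse_coordinate("57.05N"), parse_coordinate("10.33E"))  # 57.05 10.33
```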


In [6]:
# Create Spark session with SAS7BDAT jar
spark = SparkSession\
.builder \
.config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11") \
.enableHiveSupport().getOrCreate()

### Step 2: Explore and Assess the Data
#### Explore and Clean the Data
For the I94 immigration data, we will drop every entry whose destination city code `i94port` is not one of the valid codes listed in I94_SAS_Labels_Description.SAS. Invalid values include XXX, 99, and similar codes. For the temperature data, we will first drop every entry where `AverageTemperature` is missing. We will then drop entries with duplicate locations (the same `City` and `Country`). Finally, we will add the `i94port` code of each location.
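Dropping duplicate locations keeps one arbitrary monthly reading per city. Suppose a single representative temperature per city is wanted instead. One alternative, which is not the approach used in this notebook, is to average all non-missing readings per (`City`, `Country`). The plain-Python sketch below shows the idea on toy rows. Only the 6.068 value comes from the data above; the helper name is hypothetical:

```python
from collections import defaultdict

def mean_temperature_by_city(rows):
    """Average non-missing AverageTemperature values per (City, Country).

    `rows` is an iterable of dicts keyed by the CSV column names; a missing
    temperature is assumed to be None or an empty string.
    """
    totals = defaultdict(lambda: [0.0, 0])  # (City, Country) -> [sum, count]
    for row in rows:
        temp = row.get("AverageTemperature")
        if temp in (None, ""):
            continue  # same rule as the cleaning step: skip missing readings
        key = (row["City"], row["Country"])
        totals[key][0] += float(temp)
        totals[key][1] += 1
    return {key: total / count for key, (total, count) in totals.items()}

# Toy rows for illustration.
sample = [
    {"City": "Århus", "Country": "Denmark", "AverageTemperature": "6.068"},
    {"City": "Århus", "Country": "Denmark", "AverageTemperature": ""},
    {"City": "Århus", "Country": "Denmark", "AverageTemperature": "2.0"},
]
print(mean_temperature_by_city(sample))
```

In Spark, the same aggregation maps to `df_temp.groupBy("City", "Country").agg(avg(col("AverageTemperature").cast("double")))`, with `avg` and `col` imported from `pyspark.sql.functions`.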

In [7]:
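# Perform cleaning tasks here

# Build a dict of valid port codes from the i94port section of
# I94_SAS_Labels_Description.SAS, e.g. lines like  'ALC' = 'ALCAN, AK'
reg_obj = re.compile(r'\'(.*)\'.*\'(.*)\'')
i94port_valid = {}
with open('i94port_valid.txt') as f:
    for line in f:
        match = reg_obj.search(line)
        if match:  # skip blank or non-matching lines instead of raising TypeError
            i94port_valid[match[1]] = [match[2]]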
        
def clean_i94_data(file):
    '''
    Input: Path to I94 immigration data file
    
    Output: Spark dataframe of I94 immigration
    data with valid i94port
    '''
    
    #I94 data into spark
    df_immigration = spark.read.format('com.github.saurfang.sas.spark').load(file)
    
    #Filter entries where i94port is not valid
    df_immigration = df_immigration.filter(df_immigration.i94port.isin(list(i94port_valid.keys())))
    return df_immigration
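The regex in the cell above expects label lines of the form `'CODE' = 'DESCRIPTION'`. The exact layout of `i94port_valid.txt` is an assumption here. The standalone sketch below uses the same pattern on an in-memory list of lines. It skips lines that do not match and trims padding inside the quotes. It also keeps the one-element list shape that `i94port_valid` uses, and `parse_port_lines` is a hypothetical name:

```python
import re

# Assumed line format, e.g.:  'ALC'	=	'ALCAN, AK'
PORT_LINE = re.compile(r"'(.*)'.*'(.*)'")

def parse_port_lines(lines):
    """Return {code: [description]} for every line matching the assumed format."""
    ports = {}
    for line in lines:
        match = PORT_LINE.search(line)
        if match is None:
            continue  # blank lines and comments carry no port code
        # Same {code: [description]} shape as i94port_valid in the notebook.
        ports[match[1].strip()] = [match[2].strip()]
    return ports

sample_lines = ["'ALC'\t=\t'ALCAN, AK'", "", "/* comment */", "'NYC'\t=\t'NEW YORK, NY'"]
print(parse_port_lines(sample_lines))
```

In the notebook, the same helper could be fed the open file directly, as in `with open('i94port_valid.txt') as f: i94port_valid = parse_port_lines(f)`.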

In [8]:
# Clean temperature data
df_temp=spark.read.format("csv").option("header", "true").load("../../data2/GlobalLandTemperaturesByCity.csv")

# Drop entries with a missing average temperature (Spark reads empty CSV fields as null)
df_temp = df_temp.filter(df_temp.AverageTemperature.isNotNull())

# Remove duplicate locations
df_temp=df_temp.dropDuplicates(['City', 'Country'])

@udf()
def get_i94port(city):
    '''
    Input: City name
    
    Output: Corresponding i94port
    
    '''
    
    for key in i94port_valid:
        if city.lower() in i94port_valid[key][0].lower():
            return key

# Add the i94port code based on city name
df_temp = df_temp.withColumn("i94port", get_i94port(df_temp.City))

# Remove entries with no matching i94port code (the UDF returned None, i.e. null)
df_temp = df_temp.filter(df_temp.i94port.isNotNull())
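`get_i94port` maps a city by substring, through `city.lower() in description.lower()`. This means a partial name can also match. For example, a city named `York` would map to a port whose description is `NEW YORK, NY`. Neither that function nor the sketch below checks `Country`. The sketch is a stricter alternative, not the notebook's method. It assumes descriptions follow the `CITY, STATE` pattern and compares only the text before the first comma. `get_i94port_exact` is a hypothetical name:

```python
def get_i94port_exact(city, ports):
    """Return the i94port code whose city part equals `city` (case-insensitive), else None.

    `ports` maps code -> [description], with descriptions like 'NEW YORK, NY'.
    """
    if city is None:
        return None
    target = city.strip().lower()
    for code, (description,) in ports.items():
        # Compare only the city part, i.e. the text before the first comma.
        if description.split(",")[0].strip().lower() == target:
            return code
    return None

example_ports = {"NYC": ["NEW YORK, NY"], "LOS": ["LOS ANGELES, CA"]}
print(get_i94port_exact("New York", example_ports))  # NYC
print(get_i94port_exact("York", example_ports))      # None
```

To use it in Spark, it could be wrapped the same way as the original, for example `udf(lambda city: get_i94port_exact(city, i94port_valid))`.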

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
The first dimension table will contain events from the I94 immigration data. The columns below will be extracted from the immigration dataframe:
* `i94yr` = four digit year
* `i94mon` = numeric month
* `i94cit` = three digit code of origin city
* `i94port` = three character code of destination city
* `arrdate` = arrival date
* `i94mode` = one digit travel code
* `depdate` = departure date
* `i94visa` = reason for immigration

The second dimension table will contain city temperature data. The columns below will be extracted from the temperature dataframe:

* `i94port` = 3 character code of destination city (mapped from immigration data during cleanup step)
* `AverageTemperature` = average temperature
* `City` = city name
* `Country` = country name
* `Latitude` = latitude
* `Longitude` = longitude

The fact table will contain information from the I94 immigration data joined with the city temperature data on `i94port`:
* `i94yr` = four digit year
* `i94mon` = numeric month
* `i94cit` = three digit code of origin city
* `i94port` = three character code of destination city
* `arrdate` = arrival date
* `i94mode` = one digit travel code
* `depdate` = departure date
* `i94visa` = reason for immigration
* `AverageTemperature` = average temperature of destination city

The tables will be saved to Parquet files partitioned by city (`i94port`). 

#### 3.2 Mapping Out Data Pipelines
The pipeline steps are described below:
1. Clean I94 data as described in step 2 to create Spark dataframe `df_immigration` for each month
2. Clean temperature data as described in step 2 to create Spark dataframe `df_temp` (already performed)
3. Create immigration dimension table by selecting relevant columns from `df_immigration` and write to parquet file partitioned by `i94port`
4. Create temperature dimension table by selecting relevant columns from `df_temp` and write to parquet file partitioned by `i94port`
5. Create fact table by joining immigration and temperature dimension tables on `i94port` and write to parquet file partitioned by `i94port`
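Step 1 cleans one file per month, while the cells below process only April. The sketch below builds the full list of monthly paths. It assumes the remaining files follow the naming of the two files used in this notebook, `i94_apr16_sub.sas7bdat` and `i94_jun16_sub.sas7bdat`. `monthly_i94_paths` is a hypothetical helper:

```python
# Assumption: every monthly file is named i94_<mon><yy>_sub.sas7bdat,
# like i94_apr16_sub.sas7bdat and i94_jun16_sub.sas7bdat in this notebook.
MONTHS = ["jan", "feb", "mar", "apr", "may", "jun",
          "jul", "aug", "sep", "oct", "nov", "dec"]

def monthly_i94_paths(base_dir, year_suffix="16"):
    """Build one I94 file path per month for step 1 of the pipeline."""
    return [f"{base_dir}/i94_{month}{year_suffix}_sub.sas7bdat" for month in MONTHS]

paths = monthly_i94_paths("/data/18-83510-I94-Data-2016")
# In the notebook, each path would then run through steps 1 and 3, e.g.:
# for path in paths:
#     df_immigration = clean_i94_data(path)
#     ...select columns and append to /results/immigration.parquet
```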

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [9]:
# Path to I94 immigration data
#immigration_data = '/data/18-83510-I94-Data-2016/*.sas7bdat'
data_immigration = '/data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'

#Clean I94 Immigration data and store as Spark Dataframe
df_immigration = clean_i94_data(data_immigration)

#Select columns for immigration dimension columns
immigration_table = df_immigration.select(["i94yr", "i94mon", "i94cit", "i94port", "arrdate", "i94mode", "depdate", "i94visa"])

#Write immigration dimension to parquet files partitioned by 'i94port'
immigration_table.write.mode("append").partitionBy("i94port").parquet("/results/immigration.parquet")

In [10]:
#Select columns for temperature dimension table
temp_table = df_temp.select(["AverageTemperature", "City", "Country","Latitude", "Longitude", "i94port"])

#Write temperature dimension table to parquet files partitioned by 'i94port'
temp_table.write.mode("append").partitionBy("i94port").parquet("/results/temperature.parquet")

In [11]:
#Create temporary views of the immigration and temperature data
df_immigration.createOrReplaceTempView("immigration_view")
df_temp.createOrReplaceTempView("temp_view")

#Create the fact table by joining the immigration and temperature views
fact_table = spark.sql('''
SELECT immigration_view.i94yr as year,
       immigration_view.i94mon as month,
       immigration_view.i94cit as city,
       immigration_view.i94port as i94port,
       immigration_view.arrdate as arrival_date,
       immigration_view.i94mode as travel_mode,
       immigration_view.depdate as departure_date,
       immigration_view.i94visa as reason,
       temp_view.AverageTemperature as temperature,
       temp_view.Latitude as latitude,
       temp_view.Longitude as longitude
FROM immigration_view
INNER JOIN temp_view ON (immigration_view.i94port = temp_view.i94port)
''')
#Write fact table to parquet files partitioned by i94port
fact_table.write.mode("append").partitionBy("i94port").parquet("/results/fact.parquet")
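Because this is an inner join, it has two effects on the fact table. First, immigration rows whose `i94port` has no temperature entry are dropped. Second, if several cities were mapped to the same `i94port`, each immigration row is repeated once per match. The toy hash join below is plain Python, not Spark's implementation, and it makes both effects visible. All rows and names are made up for illustration:

```python
def inner_join_on_port(immigration_rows, temperature_rows):
    """Join two lists of dicts on 'i94port' with INNER JOIN semantics (toy hash join)."""
    # Build phase: index temperature rows by port code.
    by_port = {}
    for temp in temperature_rows:
        by_port.setdefault(temp["i94port"], []).append(temp)
    # Probe phase: emit one output row per matching temperature row.
    joined = []
    for event in immigration_rows:
        for temp in by_port.get(event["i94port"], []):
            joined.append({**event, "temperature": temp["AverageTemperature"]})
    return joined

# Toy rows: NYC has two temperature rows, ATL has none.
immigration = [{"i94port": "NYC", "i94yr": 2016}, {"i94port": "ATL", "i94yr": 2016}]
temperature = [
    {"i94port": "NYC", "City": "New York", "AverageTemperature": "12.3"},
    {"i94port": "NYC", "City": "York", "AverageTemperature": "9.1"},
]
result = inner_join_on_port(immigration, temperature)
print(len(result))  # 2: the NYC event appears twice and the ATL event is dropped
```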

#### 4.2 Data Quality Checks
Data quality checks help confirm that the pipeline ran as expected. Options include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/count checks to ensure completeness

The quality check run in this notebook is a source/count check: it reports a failure for any table with zero records.
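The unique-key idea and a stricter count check can be sketched in plain Python on lists of dicts. `duplicate_keys` and `require_records` are hypothetical helpers. Unlike `quality_check`, `require_records` raises an exception, so a failing check stops the run instead of only printing:

```python
from collections import Counter

def duplicate_keys(rows, key):
    """Unique-key check: return {value: count} for values of `key` seen more than once."""
    counts = Counter(row[key] for row in rows)
    return {value: n for value, n in counts.items() if n > 1}

def require_records(rows, description):
    """Count check that raises on an empty table instead of only printing."""
    if not rows:
        raise ValueError(f"Data quality check failed for {description} with zero records")
    return len(rows)

# Toy temperature rows (made up): two cities share the NYC code.
temp_rows = [{"i94port": "NYC"}, {"i94port": "NYC"}, {"i94port": "ATL"}]
print(duplicate_keys(temp_rows, "i94port"))             # {'NYC': 2}
print(require_records(temp_rows, "temperature table"))  # 3
```

In Spark, a duplicate-key check on the temperature table could be written as `df_temp.groupBy("i94port").count().filter(col("count") > 1)`, with `col` from `pyspark.sql.functions`.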
 
Run Quality Checks

In [None]:
# Perform quality checks here
def quality_check(df, description):
    '''
    Input: spark dataframe, description of spark dataframe
    
    output: Print the outcome of data quality check
    '''
    record_count=df.count()
    if record_count == 0:
        print("Data quality check failed for {} with zero records".format(description))
    else:
        print(" Data quality check passed for {} with {} records".format(description, record_count))
            
#Perform data quality checks
quality_check(df_immigration, "immigration table")
quality_check(df_temp, "temperature table")
quality_check(fact_table, "fact table")

 Data quality check passed for immigration table with 3088544 records


#### 4.3 Data dictionary 
The first dimension table will contain events from the I94 immigration data. The columns below will be extracted from the immigration dataframe:
* `i94yr` = four digit year
* `i94mon` = numeric month
* `i94cit` = three digit code of origin city
* `i94port` = three character code of destination city
* `arrdate` = arrival date
* `i94mode` = one digit travel code
* `depdate` = departure date
* `i94visa` = reason for immigration

The second dimension table will contain city temperature data. The columns below will be extracted from the temperature dataframe:

* `i94port` = three character code of destination city (mapped from immigration data during cleanup step)
* `AverageTemperature` = average temperature
* `City` = city name
* `Country` = country name
* `Latitude` = latitude
* `Longitude` = longitude

The fact table will contain information from the I94 immigration data joined with the city temperature data on `i94port`:
* `i94yr` = four digit year
* `i94mon` = numeric month
* `i94cit` = three digit code of origin city
* `i94port` = three character code of destination city
* `arrdate` = arrival date
* `i94mode` = one digit travel code
* `depdate` = departure date
* `i94visa` = reason for immigration
* `AverageTemperature` = average temperature of destination city

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.

Spark was chosen because it can read multiple file formats, including SAS through the spark-sas7bdat package, and it can process large amounts of data. Spark SQL was chosen to load the large input files into dataframes and to combine them with standard SQL join operations into additional tables.

* Propose how often the data should be updated and why.

The data should be updated monthly. This matches the cadence of the raw I94 data, which is provided as one file per month.

* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 
 Our data would be stored in an Amazon S3 bucket and loaded into staging tables. We would still use Spark as our data processing platform, since it is well suited to very large datasets.
 
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 
 We would use Apache Airflow to schedule the ETL and data quality validation as a daily pipeline that finishes before 7am.
 
 * The database needed to be accessed by 100+ people.
 
 Once the data is ready to be accessed, it would be loaded into an Amazon Redshift cluster. Redshift is a PostgreSQL-compatible data warehouse that supports many concurrent users.