## Building an ETL Pipeline for Immigration and Temperature Data using Spark
### Data Engineering Capstone Project

#### Project Summary
The goal of this project is to build an ETL pipeline that integrates I94 immigration data with city temperature data. The resulting schema serves users who want to explore whether weather influences immigration decisions.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

```python
# Do all imports and installs here
import re

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
```


### Step 1: Project scope and project data

The aim of this project is to use Spark to create an ETL pipeline that serves users seeking information on weather-influenced immigration statistics.
In this project, the provided I94 immigration data and the Kaggle temperature data are cleaned into two dimension tables, which are then joined on the destination port (i94port) to form the fact table. With this schema, users can query the database to see whether there is a relationship between the cities people migrate to and the temperature of those cities.

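The I94 immigration data is sourced from the US National Tourism and Trade Office and is provided in SAS7BDAT, a binary SAS database format. Its attributes include the arrival year and month (i94yr, i94mon), the traveller's country-of-citizenship code (i94cit), the port of entry (i94port), the arrival and departure dates (arrdate, depdate), the travel mode (i94mode), and the visa category (i94visa).

The temperature data comes from Kaggle (GlobalLandTemperaturesByCity.csv). Its attributes include:

* AverageTemperature
* City
* Country
* Latitude
* Longitude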



### Immigration data

```python
# Read April 2016 I94 immigration data
fname = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
df_immidata = pd.read_sas(fname, format='sas7bdat', encoding="ISO-8859-1")
```

```python
df_immidata.head()
```

| | cicid | i94yr | i94mon | i94cit | i94res | i94port | arrdate | i94mode | i94addr | depdate | ... | entdepu | matflag | biryear | dtaddto | gender | insnum | airline | admnum | fltno | visatype |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 6.0 | 2016.0 | 4.0 | 692.0 | 692.0 | XXX | 20573.0 | | | | ... | U | | 1979.0 | 10282016 | | | | 1897628000.0 | | B2 |
| 1 | 7.0 | 2016.0 | 4.0 | 254.0 | 276.0 | ATL | 20551.0 | 1.0 | AL | | ... | Y | | 1991.0 | D/S | M | | | 3736796000.0 | 296.0 | F1 |
| 2 | 15.0 | 2016.0 | 4.0 | 101.0 | 101.0 | WAS | 20545.0 | 1.0 | MI | 20691.0 | ... | | M | 1961.0 | 09302016 | M | | OS | 666643200.0 | 93.0 | B2 |
| 3 | 16.0 | 2016.0 | 4.0 | 101.0 | 101.0 | NYC | 20545.0 | 1.0 | MA | 20567.0 | ... | | M | 1988.0 | 09302016 | | | AA | 92468460000.0 | 199.0 | B2 |
| 4 | 17.0 | 2016.0 | 4.0 | 101.0 | 101.0 | NYC | 20545.0 | 1.0 | MA | 20567.0 | ... | | M | 2012.0 | 09302016 | | | AA | 92468460000.0 | 199.0 | B2 |
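The arrdate and depdate values above (for example 20545.0) are SAS date values, that is, the number of days since 1 January 1960, so 20545.0 corresponds to 1 April 2016. The pipeline keeps them as raw numbers. If calendar dates are wanted, a minimal conversion in plain Python could look like the sketch below; `sas_to_date` is a hypothetical helper name, and wrapping it in a Spark `udf` would be one way to apply it to a dataframe column.

```python
from datetime import date, timedelta

# SAS counts dates as days elapsed since this epoch
SAS_EPOCH = date(1960, 1, 1)


def sas_to_date(sas_days):
    """Convert a SAS day count (int or float, possibly missing) to a date."""
    if sas_days is None or sas_days != sas_days:  # missing value or NaN
        return None
    return SAS_EPOCH + timedelta(days=int(sas_days))


print(sas_to_date(20545.0))  # 2016-04-01
print(sas_to_date(20691.0))  # 2016-08-25
```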


```python
# Read the SAS label file, which lists the valid code values (e.g. for i94port)
i94_label = 'I94_SAS_Labels_Descriptions.SAS'
with open(i94_label) as f:
    lines = f.readlines()
```

### Temperature data


```python
# Read temperature data
fname = '../../data2/GlobalLandTemperaturesByCity.csv'
df_tempdata = pd.read_csv(fname, sep=',')
```

```python
df_tempdata.head()
```

| | dt | AverageTemperature | AverageTemperatureUncertainty | City | Country | Latitude | Longitude |
|---|---|---|---|---|---|---|---|
| 0 | 1743-11-01 | 6.068 | 1.737 | Århus | Denmark | 57.05N | 10.33E |
| 1 | 1743-12-01 | | | Århus | Denmark | 57.05N | 10.33E |
| 2 | 1744-01-01 | | | Århus | Denmark | 57.05N | 10.33E |
| 3 | 1744-02-01 | | | Århus | Denmark | 57.05N | 10.33E |
| 4 | 1744-03-01 | | | Århus | Denmark | 57.05N | 10.33E |
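Latitude and Longitude arrive as strings with a hemisphere suffix (57.05N, 10.33E), and the pipeline carries them through unchanged. If numeric coordinates are needed, for example to compute distances between cities, a small converter such as the hypothetical `parse_coordinate` below could be applied. It assumes every value is a number followed by one of N, S, E or W.

```python
def parse_coordinate(value):
    """Convert a coordinate string such as '57.05N' or '10.33W' to a signed float."""
    value = value.strip()
    hemisphere = value[-1].upper()
    if hemisphere not in ("N", "S", "E", "W"):
        raise ValueError(f"Unexpected coordinate format: {value!r}")
    number = float(value[:-1])
    # South latitudes and West longitudes are negative by convention
    return -number if hemisphere in ("S", "W") else number


print(parse_coordinate("57.05N"), parse_coordinate("10.33E"))  # 57.05 10.33
```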


```python
# Start a Spark session with the spark-sas7bdat package so Spark can read SAS files
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11") \
    .enableHiveSupport() \
    .getOrCreate()
df_spark = spark.read.format('com.github.saurfang.sas.spark') \
    .load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')
```


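```python
# Optional: cache the raw I94 data as Parquet for faster re-reads.
# parquet() expects a directory path; "sas_data" is only an example name.
#df_spark.write.parquet("sas_data")
#df_spark = spark.read.parquet("sas_data")
```
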
### Step 2: Exploring data and removing invalid data points

Spark is used to process the data, which is explored for missing and invalid values. For the I94 immigration data, all records whose i94port is missing or not a valid port code are dropped. For the temperature data, all records with a missing AverageTemperature are dropped, duplicate (City, Country) rows are removed, and only cities that can be mapped to an i94port are kept.

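```python
# Build a dictionary of valid i94port codes from the SAS label file.
# The slice lines[302:962] covers the i94port section, whose entries look like 'CODE' = 'DESCRIPTION'
re_comp = re.compile(r'\'(.*)\'.*\'(.*)\'')
vports = {}
for line in lines[302:962]:
    match = re_comp.search(line)
    if match:  # skip any line that is not a code entry
        # Each value is a one-element list holding the port description
        vports[match.group(1)] = [match.group(2)]
```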
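Slicing `lines[302:962]` ties the code to one exact version of the label file. A more defensive parser, sketched below under the assumption that each port entry has the form `'CODE' = 'DESCRIPTION'` on its own line, keeps only lines that match that pattern. It still needs to be given the i94port section, since other sections of the file may use the same quoting style. `parse_port_lines` is a hypothetical name, and unlike `vports` it stores each description as a stripped string rather than a one-element list.

```python
import re

# Assumed shape of an i94port entry, e.g.:   'ATL'	=	'ATLANTA, GA'
PORT_LINE = re.compile(r"^\s*'([^']*)'\s*=\s*'([^']*)'")


def parse_port_lines(lines):
    """Map port codes to descriptions, skipping lines that are not code entries."""
    ports = {}
    for line in lines:
        match = PORT_LINE.match(line)
        if match:
            ports[match.group(1)] = match.group(2).strip()
    return ports


sample = [
    "/* a comment line */\n",
    "\t'ATL'\t=\t'ATLANTA, GA'\n",
    "\t'NYC'\t=\t'NEW YORK, NY  '\n",
    ";\n",
]
print(parse_port_lines(sample))  # {'ATL': 'ATLANTA, GA', 'NYC': 'NEW YORK, NY'}
```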

```python
# Load I94 data into Spark and keep only rows with a valid i94port
def file_to_spark_df(file):
    '''Read a SAS7BDAT file into a Spark dataframe using the spark-sas7bdat reader.'''
    return spark.read.format('com.github.saurfang.sas.spark').load(file)

def prepared_i94(file):
    '''
    Input: path to an I94 SAS7BDAT file
    Output: Spark dataframe containing only rows whose i94port is a valid port code
    '''
    df_immigration = file_to_spark_df(file)
    # Rows with a null i94port are removed too, because isin() evaluates to null for them
    return df_immigration.filter(df_immigration.i94port.isin(list(vports.keys())))
```

```python
# Testing function
immigration_test_file = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
df_immigration_test = prepared_i94(immigration_test_file)
df_immigration_test.show()
```

```text
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|        admnum|fltno|visatype|
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|  6.0|2016.0|   4.0| 692.0| 692.0|    XXX|20573.0|   null|   null|   null|  37.0|    2.0|  1.0|    null|    null| null|      T|   null|      U|   null| 1979.0|10282016|  null|  null|   null| 1.897628485E9| null|      B2|
|  7.0|2016.0|   4.0| 254.0| 276.0|    ATL|20551.0|    1.0|     AL|   null|  25.0|    3.0|  1.0|20130811|     SE
```

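```python
# Cleaning temperature data
df_cleantemp = spark.read.format("csv").option("header", "true").load("../../data2/GlobalLandTemperaturesByCity.csv")
# Empty AverageTemperature fields are read as null; drop them along with any literal 'NaN' strings
df_cleantemp = df_cleantemp.filter(df_cleantemp.AverageTemperature.isNotNull() & (df_cleantemp.AverageTemperature != 'NaN'))
# Keep one row per (City, Country); dropDuplicates does not guarantee which monthly reading survives
df_cleantemp = df_cleantemp.dropDuplicates(['City', 'Country'])
```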
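Because `dropDuplicates(['City', 'Country'])` keeps a single arbitrary monthly reading per city, AverageTemperature in the temperature table is one observation rather than a true average. If a per-city mean is preferred, the aggregation would look like the plain-Python sketch below; in Spark the equivalent would be a `groupBy("City", "Country")` followed by `avg` on AverageTemperature cast to double. This is an alternative to the approach used above, and the function name and sample rows are made up for illustration.

```python
from collections import defaultdict


def average_by_city(rows):
    """Mean AverageTemperature per (City, Country), skipping missing readings."""
    totals = defaultdict(lambda: [0.0, 0])  # (City, Country) -> [sum, count]
    for row in rows:
        temp = row.get("AverageTemperature")
        if temp is None or temp == "":
            continue  # missing reading
        key = (row["City"], row["Country"])
        totals[key][0] += float(temp)
        totals[key][1] += 1
    return {key: total / count for key, (total, count) in totals.items()}


# Made-up readings for one hypothetical city
rows = [
    {"City": "Example City", "Country": "Exampleland", "AverageTemperature": "10.0"},
    {"City": "Example City", "Country": "Exampleland", "AverageTemperature": ""},
    {"City": "Example City", "Country": "Exampleland", "AverageTemperature": "20.0"},
]
print(average_by_city(rows))  # {('Example City', 'Exampleland'): 15.0}
```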

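```python
# Add the i94port code for each city and remove rows with no matching i94port
@udf()
def get_i94port(city):
    '''
    Input: City name
    Output: Corresponding i94port, or None if no port description contains the city name

    Note: this is a substring match; the first port whose description
    contains the city name is returned.
    '''
    if city is None:
        return None
    for key in vports:
        if city.lower() in vports[key][0].lower():
            return key
    return None

df_cleantemp = df_cleantemp.withColumn("i94port", get_i94port(df_cleantemp.City))
# The UDF returns null when there is no match; keep only matched rows
df_cleantemp = df_cleantemp.filter(df_cleantemp.i94port.isNotNull())
df_cleantemp.head()
```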
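Since `get_i94port` uses substring matching, a short or common city name can be assigned the port of a different city whose description happens to contain it. The sketch below contrasts that behaviour with a stricter comparison against the city part of the description (the text before the comma). Both functions are hypothetical, they take plain strings rather than the one-element lists stored in `vports`, and the `ports` dictionary is a made-up two-entry subset.

```python
def match_port_substring(city, ports):
    """Mimics get_i94port: first code whose description contains the city name."""
    for code, description in ports.items():
        if city.lower() in description.lower():
            return code
    return None


def match_port_exact(city, ports):
    """Stricter variant: compare with the text before the comma, e.g. 'ATLANTA'."""
    for code, description in ports.items():
        if city.strip().lower() == description.split(",")[0].strip().lower():
            return code
    return None


ports = {"ATL": "ATLANTA, GA", "NYC": "NEW YORK, NY"}  # hypothetical subset
print(match_port_substring("York", ports))  # NYC
print(match_port_exact("York", ports))      # None
print(match_port_exact("New York", ports))  # NYC
```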

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model

Fact table: This joins the I94 immigration data with the city temperature data on i94port and holds attributes from both.

Attributes:

* i94yr = 4-digit year
* i94mon = month
* i94cit = 3-digit code of the traveller's country of citizenship
* i94port = 3-character code of the port of entry (destination city)
* i94mode = 1-digit travel mode code
* arrdate = arrival date (SAS date value)
* depdate = departure date (SAS date value)
* i94visa = visa category (immigration purpose)
* AverageTemperature = average temperature of the destination city

First dimension table: This contains attributes from the I94 immigration data.

Attributes:

* i94yr = 4-digit year
* i94mon = month
* i94cit = 3-digit code of the traveller's country of citizenship
* i94port = 3-character code of the port of entry (destination city)
* i94mode = 1-digit travel mode code
* arrdate = arrival date (SAS date value)
* depdate = departure date (SAS date value)
* i94visa = visa category (immigration purpose)

Second dimension table: This contains attributes from the Kaggle city temperature data.

Attributes:

* AverageTemperature = average temperature of the city
* City = city name
* Country = country name
* Latitude = latitude
* Longitude = longitude
* i94port = 3-character code of the port of entry (destination city)
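The three tables can be written down as record types to make the grain and the join key explicit. The following is a plain-Python sketch of the conceptual model above, not code from the pipeline: the class names are hypothetical, the field types are assumptions, and `build_fact_rows` performs the same inner join on i94port that the fact-table query uses. Immigration rows without a matching city temperature are dropped, and a port matched by several cities yields one fact row per city.

```python
from dataclasses import asdict, dataclass
from typing import List, Optional


@dataclass
class ImmigrationRecord:
    """One row of the immigration dimension table (field types are assumptions)."""
    i94yr: int
    i94mon: int
    i94cit: int
    i94port: str
    i94mode: Optional[int]
    arrdate: int
    depdate: Optional[int]
    i94visa: int


@dataclass
class CityTemperature:
    """One row of the temperature dimension table."""
    AverageTemperature: float
    City: str
    Country: str
    Latitude: str
    Longitude: str
    i94port: str


@dataclass
class FactRow:
    """Immigration attributes plus the destination city's average temperature."""
    i94yr: int
    i94mon: int
    i94cit: int
    i94port: str
    i94mode: Optional[int]
    arrdate: int
    depdate: Optional[int]
    i94visa: int
    AverageTemperature: float


def build_fact_rows(immigration: List[ImmigrationRecord],
                    temperatures: List[CityTemperature]) -> List[FactRow]:
    """Inner join on i94port; unmatched immigration rows are dropped."""
    by_port = {}
    for temp in temperatures:
        by_port.setdefault(temp.i94port, []).append(temp)
    return [
        FactRow(**asdict(record), AverageTemperature=temp.AverageTemperature)
        for record in immigration
        for temp in by_port.get(record.i94port, [])
    ]


# Made-up example rows: one matched port (ATL) and one unmatched port (XXX)
immigration = [
    ImmigrationRecord(2016, 4, 254, "ATL", 1, 20551, None, 3),
    ImmigrationRecord(2016, 4, 692, "XXX", None, 20573, None, 2),
]
temperatures = [CityTemperature(0.0, "Atlanta", "United States", "0.00N", "0.00W", "ATL")]
print(len(build_fact_rows(immigration, temperatures)))  # 1
```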

#### 3.2 Mapping Out Data Pipelines
Steps to create the data pipeline:

* The I94 data is cleaned to create the Spark dataframe df_immigration for each month.
* The temperature data is cleaned to create the Spark dataframe df_cleantemp.
* The immigration dimension table is created by selecting attributes from df_immigration and writing them to Parquet files partitioned by i94port.
* The temperature dimension table is created by selecting attributes from df_cleantemp and writing them to Parquet files partitioned by i94port.
* The fact table is created by joining the immigration and temperature data on i94port and writing the result to Parquet files partitioned by the destination port code.
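Every write in the pipeline uses `partitionBy`, which makes Spark lay the output out as one subdirectory per distinct value, named `<column>=<value>` (for example `i94port=ATL`), with the partition column stored in the directory name rather than inside the Parquet files. Queries that filter on that column can then skip the other directories. The stdlib sketch below only imitates that layout to make it concrete; `partition_paths` is a hypothetical helper and writes nothing to disk.

```python
from collections import defaultdict


def partition_paths(rows, base, column):
    """Group rows the way a Hive-style partitioned write lays out directories.

    Each distinct value of `column` becomes a directory named
    f"{base}/{column}={value}", and the partition column is removed from the
    rows themselves because its value is encoded in the directory name.
    """
    layout = defaultdict(list)
    for row in rows:
        path = f"{base}/{column}={row[column]}"
        layout[path].append({k: v for k, v in row.items() if k != column})
    return dict(layout)


rows = [
    {"i94port": "ATL", "i94yr": 2016, "i94mon": 4},
    {"i94port": "NYC", "i94yr": 2016, "i94mon": 4},
    {"i94port": "ATL", "i94yr": 2016, "i94mon": 4},
]
for path, records in partition_paths(rows, "/results/immigration.parquet", "i94port").items():
    print(path, len(records))
# /results/immigration.parquet/i94port=ATL 2
# /results/immigration.parquet/i94port=NYC 1
```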

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

```python
# Step 1: immigration dimension table
immigration_data = '/data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
# Clean I94 immigration data and store as Spark dataframe
df_immigration = prepared_i94(immigration_data)
# Extract columns for immigration dimension table
immigration_table = df_immigration.select(["i94yr", "i94mon", "i94cit", "i94port", "arrdate", "i94mode", "depdate", "i94visa"])
# Write immigration dimension table to parquet files partitioned by i94port
immigration_table.write.mode("append").partitionBy("i94port").parquet("/results/immigration.parquet")
```

```python
# Step 2: temperature dimension table
# Extract columns for temperature dimension table
temp_table = df_cleantemp.select(["AverageTemperature", "City", "Country", "Latitude", "Longitude", "i94port"])
# Write temperature dimension table to parquet files partitioned by i94port
temp_table.write.mode("append").partitionBy("i94port").parquet("/results/temperature.parquet")
```

```python
# Step 3: fact table
# Create temporary views of the immigration and temperature data
df_immigration.createOrReplaceTempView("immigration_view")
df_cleantemp.createOrReplaceTempView("temp_view")
```

```python
# Create the fact table by joining the immigration and temperature views on i94port.
# If several cities map to the same i94port, each immigration row appears once per matching city.
fact_table = spark.sql('''
SELECT immigration_view.arrdate as arrival_date,
       immigration_view.depdate as departure_date,
       immigration_view.i94yr as arrival_year,
       immigration_view.i94mon as arrival_month,
       immigration_view.i94cit as origin_city,
       immigration_view.i94port as destin_city,
       immigration_view.i94visa as visa,
       temp_view.AverageTemperature,
       temp_view.Latitude,
       temp_view.Longitude
FROM immigration_view
JOIN temp_view ON (immigration_view.i94port = temp_view.i94port)
''')

# i94port is aliased to destin_city in the query, so partition by that column name
fact_table.write.mode("append").partitionBy("destin_city").parquet("/results/fact.parquet")
```

#### 4.2 Data Quality Checks

```python
# Performing quality checks
def data_quality_check(df, description):
    '''
    Input: Spark dataframe, description of Spark dataframe
    Output: Print outcome of data quality check; return True if it passed
    '''
    result = df.count()
    if result == 0:
        print("Data quality check failed for {} with zero records".format(description))
        return False
    print("Data quality check passed for {} with {} records".format(description, result))
    return True

data_quality_check(df_immigration, "immigration table")
data_quality_check(df_cleantemp, "temperature table")
data_quality_check(fact_table, "fact table")
```
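The count check above only catches empty tables. A second useful check is that the join key is never null. The sketch below expresses both checks over plain lists of dicts so it runs without Spark; in the pipeline the null count would come from something like `df.filter(df.i94port.isNull()).count()`. `run_quality_checks` is a hypothetical name, and returning failure messages instead of only printing them lets a scheduler fail the run when the list is non-empty.

```python
def run_quality_checks(rows, key_column, description):
    """Return failure messages for one table; an empty list means every check passed."""
    failures = []
    # Check 1: the table must not be empty
    if not rows:
        failures.append(f"{description}: no records")
    # Check 2: the join/partition key must never be null
    missing = sum(1 for row in rows if row.get(key_column) is None)
    if missing:
        failures.append(f"{description}: {missing} record(s) with null {key_column}")
    return failures


sample = [{"i94port": "ATL"}, {"i94port": None}]
print(run_quality_checks(sample, "i94port", "immigration table"))
# ['immigration table: 1 record(s) with null i94port']
```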

#### 4.3 Data dictionary 

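Fact Table is created by joining the I94 immigration data to the city temperature data on i94port.

* arrival_date (immigration_view.arrdate) = arrival date (SAS date value)
* departure_date (immigration_view.depdate) = departure date (SAS date value)
* arrival_year (immigration_view.i94yr) = arrival year
* arrival_month (immigration_view.i94mon) = arrival month
* origin_city (immigration_view.i94cit) = 3-digit code of the traveller's country of citizenship
* destin_city (immigration_view.i94port) = 3-character code of the port of entry (destination city)
* visa (immigration_view.i94visa) = visa category
* AverageTemperature (temp_view.AverageTemperature) = average temperature of the destination city
* Latitude (temp_view.Latitude) = latitude of the destination city
* Longitude (temp_view.Longitude) = longitude of the destination city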

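Dimension Table 1 contains the I94 immigration data.

* i94yr = arrival year
* i94mon = arrival month
* i94cit = 3-digit code of the traveller's country of citizenship
* i94port = 3-character code of the port of entry (destination city)
* arrdate = arrival date (SAS date value)
* i94mode = 1-digit travel mode code
* depdate = departure date (SAS date value)
* i94visa = visa category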
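i94mode and i94visa are stored as numeric codes (often as floats such as 1.0). According to the I94 label file, the travel modes are 1 = Air, 2 = Sea, 3 = Land and 9 = Not reported, and the visa categories are 1 = Business, 2 = Pleasure and 3 = Student. A lookup like the sketch below could turn them into readable labels; the dictionaries were transcribed by hand and should be checked against I94_SAS_Labels_Descriptions.SAS, and `decode` is a hypothetical helper.

```python
# Code values transcribed from I94_SAS_Labels_Descriptions.SAS (verify before use)
I94MODE = {1: "Air", 2: "Sea", 3: "Land", 9: "Not reported"}
I94VISA = {1: "Business", 2: "Pleasure", 3: "Student"}


def decode(code, mapping):
    """Translate a numeric I94 code (often stored as a float such as 1.0) to its label."""
    if code is None or code != code:  # missing value or NaN
        return None
    return mapping.get(int(code), "Unknown")


print(decode(1.0, I94MODE), decode(2.0, I94VISA))  # Air Pleasure
```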

Dimension Table 2 contains the city temperature data.

* i94port = destination city (port of entry code) from the immigration data
* AverageTemperature = average temperature
* City = city name
* Country = country name
* Latitude = latitude
* Longitude = longitude


### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
  * The data was increased by 100x.
  * The data populates a dashboard that must be updated on a daily basis by 7am every day.
  * The database needed to be accessed by 100+ people.

#### Rationale for the choice of tools and technologies

For this project, pandas was used for data exploration because it is efficient and convenient for inspecting data that fits in memory. Spark was used to build the data pipeline because it processes large datasets in parallel and, with the spark-sas7bdat package, can read the SAS7BDAT immigration files directly.


#### Proposed update frequency

The I94 immigration data used to build the schema is captured monthly, so I propose updating the data monthly as well.
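With monthly updates, each run only needs the path of the new month's file. Assuming every month follows the same naming pattern as the April file used here (`i94_apr16_sub.sas7bdat`), the paths can be generated rather than hard-coded. `monthly_i94_files` is a hypothetical helper; each path could then be passed to `prepared_i94`, with the existing append-mode writes adding the new month to the tables.

```python
# Lower-case month abbreviations, matching names such as i94_apr16_sub.sas7bdat
MONTHS = ["jan", "feb", "mar", "apr", "may", "jun",
          "jul", "aug", "sep", "oct", "nov", "dec"]


def monthly_i94_files(year, base_dir):
    """Return the assumed path of each monthly I94 file for `year`."""
    yy = f"{year % 100:02d}"
    return [f"{base_dir}/i94_{month}{yy}_sub.sas7bdat" for month in MONTHS]


files = monthly_i94_files(2016, "../../data/18-83510-I94-Data-2016")
print(files[3])  # ../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat
```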


#### Scenario 1: The data is increased by 100x

If the data increased by 100x, I would use Amazon Redshift to improve processing efficiency, since Redshift is a columnar, massively parallel data warehouse built to handle large analytical workloads.

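#### Scenario 2: The data populates a dashboard updated daily by 7am

If the data populates a dashboard that must be updated daily by 7am, I would orchestrate the pipeline with Apache Airflow and schedule it to run daily, early enough to finish before 7am. Airflow makes the pipeline easier to manage through scheduling, task dependencies, retries, and monitoring.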

#### Scenario 3: The database is accessed by 100+ people

If the database is to be accessed by 100+ people, I would still use Spark but ensure its settings are tuned for a high volume of reads and writes.

