# Project Title
### Data Engineering Capstone Project

#### Project Summary
The goal of this project is to build an ETL pipeline that combines the I94 immigration dataset with the city temperature data from Kaggle, so that users can analyse the relationship between immigration behaviour and the temperature of a location.


The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [93]:
import pandas as pd
import os
import re
from functools import reduce
from collections import OrderedDict

from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import udf, col, lower

### Step 1: Scope the Project and Gather Data

#### Scope 
Build an ETL pipeline that cleans the I94 immigration dataset and the Kaggle city temperature data, loads them into dimension tables, and joins them into a fact table.

#### Describe and Gather Data 
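Describe the data sets you're using. Where did they come from? What type of information is included?

We use two datasets: the I94 immigration data for April 2016 (`i94_apr16_sub.sas7bdat`, SAS format), whose codes are described in `I94_SAS_Labels_Descriptions.SAS`, and the Kaggle `GlobalLandTemperaturesByCity.csv` file, which records monthly average land temperatures per city together with the country and coordinates.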

#### I94 immigration dataset

```
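I94YR   - 4 digit year
I94MON  - Numeric month
I94CIT  - Country of origin (3 digit code)
I94PORT - Port of entry (3 character code)
I94MODE - Mode of travel (1 = Air, 2 = Sea, 3 = Land, 9 = Not reported)
I94ADDR - US state of address
I94BIR  - Age of respondent in years
I94VISA - Purpose of visit. Visa codes collapsed into three categories (1 = Business, 2 = Pleasure, 3 = Student).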
```
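The numeric `I94MODE` and `I94VISA` codes are easier to read once decoded. A minimal sketch, assuming the code meanings listed in `I94_SAS_Labels_Descriptions.SAS` (1 = Air, 2 = Sea, 3 = Land, 9 = Not reported for mode; 1 = Business, 2 = Pleasure, 3 = Student for visa). The dictionary names and the helper `decode` are hypothetical, and the values are passed as floats because that is how they are read from the SAS file.

```python
import math
from typing import Optional

# Code meanings assumed from the I94 SAS label file
I94MODE_LABELS = {1: "Air", 2: "Sea", 3: "Land", 9: "Not reported"}
I94VISA_LABELS = {1: "Business", 2: "Pleasure", 3: "Student"}


def decode(value: Optional[float], labels: dict) -> Optional[str]:
    """Map a float-encoded code (e.g. 1.0, as read from SAS) to its label.

    Returns None for missing values (None or NaN) and for unknown codes.
    """
    if value is None or (isinstance(value, float) and math.isnan(value)):
        return None
    return labels.get(int(value))


print(decode(1.0, I94MODE_LABELS))           # Air
print(decode(3.0, I94VISA_LABELS))           # Student
print(decode(float("nan"), I94MODE_LABELS))  # None
```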

In [3]:
sas_header_file = 'I94_SAS_Labels_Descriptions.SAS'

fname = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
df = pd.read_sas(fname, format='sas7bdat', encoding="ISO-8859-1")

In [4]:
print(df.shape)
df.head()

(3096313, 28)


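|   | cicid | i94yr | i94mon | i94cit | i94res | i94port | arrdate | i94mode | i94addr | depdate | ... | entdepu | matflag | biryear | dtaddto | gender | insnum | airline | admnum | fltno | visatype |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 6.0 | 2016.0 | 4.0 | 692.0 | 692.0 | XXX | 20573.0 |  |  |  | ... | U |  | 1979.0 | 10282016 |  |  |  | 1897628000.0 |  | B2 |
| 1 | 7.0 | 2016.0 | 4.0 | 254.0 | 276.0 | ATL | 20551.0 | 1.0 | AL |  | ... | Y |  | 1991.0 | D/S | M |  |  | 3736796000.0 | 296.0 | F1 |
| 2 | 15.0 | 2016.0 | 4.0 | 101.0 | 101.0 | WAS | 20545.0 | 1.0 | MI | 20691.0 | ... |  | M | 1961.0 | 09302016 | M |  | OS | 666643200.0 | 93.0 | B2 |
| 3 | 16.0 | 2016.0 | 4.0 | 101.0 | 101.0 | NYC | 20545.0 | 1.0 | MA | 20567.0 | ... |  | M | 1988.0 | 09302016 |  |  | AA | 92468460000.0 | 199.0 | B2 |
| 4 | 17.0 | 2016.0 | 4.0 | 101.0 | 101.0 | NYC | 20545.0 | 1.0 | MA | 20567.0 | ... |  | M | 2012.0 | 09302016 |  |  | AA | 92468460000.0 | 199.0 | B2 |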


In [6]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11") \
    .enableHiveSupport() \
    .getOrCreate()
df_spark = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')


In [11]:
# Write to Parquet (overwrite so the cell can be re-run), then read it back
df_spark.write.mode("overwrite").parquet("sas_data")
df_spark = spark.read.parquet("sas_data")

#### Temperature data

In [7]:
temperature_fname = '../../data2/GlobalLandTemperaturesByCity.csv'
temperature_df = pd.read_csv(temperature_fname)
temperature_df.head()

|   | dt | AverageTemperature | AverageTemperatureUncertainty | City | Country | Latitude | Longitude |
|---|---|---|---|---|---|---|---|
| 0 | 1743-11-01 | 6.068 | 1.737 | Århus | Denmark | 57.05N | 10.33E |
| 1 | 1743-12-01 |  |  | Århus | Denmark | 57.05N | 10.33E |
| 2 | 1744-01-01 |  |  | Århus | Denmark | 57.05N | 10.33E |
| 3 | 1744-02-01 |  |  | Århus | Denmark | 57.05N | 10.33E |
| 4 | 1744-03-01 |  |  | Århus | Denmark | 57.05N | 10.33E |


### Step 2: Explore and Assess the Data
#### Explore the Data 
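Identify data quality issues, like missing values, duplicate data, etc.

Below we inspect the distinct values of the key I94 columns and look for missing values in the temperature data. The I94 data has missing travel modes (`NaN` in `i94mode`), and the temperature data has missing `AverageTemperature` values.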

#### Cleaning Steps
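Document steps necessary to clean the data

The cleaning steps are:
1. Keep only I94 rows whose `i94cit` and `i94port` codes appear in `I94_SAS_Labels_Descriptions.SAS`.
2. Drop I94 rows with a missing `i94mode`.
3. Drop temperature rows with a missing `AverageTemperature`.
4. Keep only the columns used by the dimension and fact tables.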
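Missing values and duplicate rows can be counted with a small helper before cleaning. A minimal sketch over plain Python rows (dicts), so it runs without pandas or Spark; in the notebook the pandas equivalents would be `df.isnull().sum()` and `df.duplicated().sum()`. The names `is_missing` and `profile_rows` are hypothetical.

```python
import math
from collections import Counter
from typing import Any, Dict, List


def is_missing(value: Any) -> bool:
    """Treat None and float NaN as missing values."""
    return value is None or (isinstance(value, float) and math.isnan(value))


def profile_rows(rows: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Count missing values per column and rows that repeat an earlier row."""
    columns = sorted({key for row in rows for key in row})
    missing = {c: sum(is_missing(row.get(c)) for row in rows) for c in columns}

    def row_key(row: Dict[str, Any]) -> tuple:
        # Normalise missing values to None so that rows containing NaN compare equal
        return tuple(None if is_missing(row.get(c)) else row.get(c) for c in columns)

    counts = Counter(row_key(row) for row in rows)
    duplicates = sum(n - 1 for n in counts.values() if n > 1)
    return {"rows": len(rows), "missing": missing, "duplicates": duplicates}


sample = [
    {"i94port": "NYC", "i94mode": 1.0},
    {"i94port": "NYC", "i94mode": 1.0},
    {"i94port": "XXX", "i94mode": float("nan")},
]
print(profile_rows(sample))
# {'rows': 3, 'missing': {'i94mode': 1, 'i94port': 0}, 'duplicates': 1}
```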

In [7]:
df.columns

Index(['cicid', 'i94yr', 'i94mon', 'i94cit', 'i94res', 'i94port', 'arrdate',
       'i94mode', 'i94addr', 'depdate', 'i94bir', 'i94visa', 'count',
       'dtadfile', 'visapost', 'occup', 'entdepa', 'entdepd', 'entdepu',
       'matflag', 'biryear', 'dtaddto', 'gender', 'insnum', 'airline',
       'admnum', 'fltno', 'visatype'],
      dtype='object')

In [8]:
df.i94yr.unique()

array([ 2016.])

In [9]:
df.i94mon.unique()

array([ 4.])

In [10]:
df.i94cit.unique()

array([ 692.,  254.,  101.,  102.,  103.,  104.,  105.,  107.,  108.,
        109.,  110.,  111.,  112.,  113.,  114.,  115.,  116.,  117.,
        118.,  119.,  121.,  122.,  123.,  124.,  126.,  127.,  128.,
        129.,  130.,  131.,  133.,  135.,  140.,  141.,  145.,  147.,
        148.,  151.,  152.,  153.,  154.,  155.,  156.,  157.,  159.,
        162.,  163.,  164.,  165.,  166.,  167.,  201.,  203.,  204.,
        206.,  207.,  209.,  213.,  218.,  220.,  236.,  242.,  244.,
        245.,  249.,  250.,  251.,  253.,  255.,  256.,  257.,  258.,
        260.,  261.,  262.,  263.,  266.,  267.,  268.,  272.,  273.,
        274.,  296.,  297.,  298.,  299.,  301.,  304.,  310.,  311.,
        315.,  316.,  320.,  323.,  324.,  326.,  329.,  332.,  336.,
        338.,  339.,  340.,  342.,  343.,  344.,  345.,  348.,  350.,
        352.,  368.,  369.,  370.,  371.,  372.,  373.,  375.,  376.,
        382.,  383.,  384.,  385.,  386.,  387.,  388.,  389.,  390.,
        391.,  392.,

In [11]:
df.i94port.unique()

array(['XXX', 'ATL', 'WAS', 'NYC', 'TOR', 'BOS', 'HOU', 'MIA', 'CHI',
       'LOS', 'CLT', 'DEN', 'DAL', 'DET', 'NEW', 'FTL', 'LVG', 'ORL',
       'NOL', 'PIT', 'SFR', 'SPM', 'POO', 'PHI', 'SEA', 'SLC', 'TAM',
       'HAM', 'NAS', 'VCV', 'MAA', 'AUS', 'HHW', 'OGG', 'PHO', 'SDP',
       'SFB', 'EDA', 'MON', 'CLG', 'DUB', 'FMY', 'YGF', 'SAJ', 'CIN',
       'BAL', 'RDU', 'WPB', 'STT', 'OAK', 'NSV', 'SNA', 'OTT', 'X96',
       '5KE', 'CLE', 'HAR', 'PSP', 'CHR', 'HAL', 'SAA', 'KOA', 'SHA',
       'WIN', 'BGM', 'NCA', 'OPF', 'SAI', 'JFA', 'AGA', 'ONT', 'CLM',
       'STL', 'W55', 'CHS', 'SNJ', 'SRQ', 'ANC', 'LNB', 'LIH', 'MIL',
       'INP', 'KAN', 'ROC', 'SAC', 'BRO', 'LAR', 'RNO', 'SGR', 'ELP',
       'MCA', 'MDT', 'SPE', 'FPR', 'SYR', 'ICT', 'MLB', 'ADS', 'TUC',
       'DLR', 'CAE', 'CHA', 'HSV', 'WIL', 'HPN', 'HEF', 'BRG', 'BED',
       'DAB', 'JAC', 'FRB', 'SWF', 'KEY', 'PTK', 'MWH', 'X44', 'MYR',
       'APF', 'ATW', 'PVD', 'BUF', 'PIE', 'MHT', 'BDL', 'NYL', 'VNY',
       '5T6', 'LEX',

In [12]:
df.i94mode.unique()

array([ nan,   1.,   2.,   9.,   3.])

In [13]:
df.i94visa.unique()

array([ 2.,  3.,  1.])

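We will retrieve the valid country codes (`i94cit`) and port codes (`i94port`) from `I94_SAS_Labels_Descriptions.SAS` and use them to clean our data. Note that the variable `valid_cities` below actually holds country codes.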

In [8]:
with open(sas_header_file) as f:
    lines = f.readlines()

In [85]:

regex_matcher = re.compile(r'\s*(\d*).*\'(.*)\'')

valid_cities = OrderedDict()
for line in lines[11:298]:
    match = regex_matcher.search(line)
    abbreviation = match.group(1)
    description = match.group(2)
    valid_cities[abbreviation]=[description]


print(list(valid_cities.keys())[:10])
print(list(valid_cities.values())[:10])

['101', '316', '102', '324', '529', '518', '687', '151', '532', '438']
[['ALBANIA'], ['ALGERIA'], ['ANDORRA'], ['ANGOLA'], ['ANGUILLA'], ['ANTIGUA-BARBUDA'], ['ARGENTINA '], ['ARMENIA'], ['ARUBA'], ['AUSTRALIA']]


Example entries in `valid_cities` at this point (the keys are country codes):

```
{'101': ['ALBANIA'], '316': ['ALGERIA'], '102': ['ANDORRA'], '324': ['ANGOLA'], '529': ['ANGUILLA'], '518': ['ANTIGUA-BARBUDA'], '687': ['ARGENTINA '], '151': ['ARMENIA'], '532': ['ARUBA'], '438': ['AUSTRALIA'], '103': ['AUSTRIA'], '152': ['AZERBAIJAN'], '512': ['BAHAMAS'], '298': ['BAHRAIN'], '274': ['BANGLADESH'], '513': ['BARBADOS'], ...}
```
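Slicing `lines[11:298]` ties the parser to the exact layout of the label file, and `match.group(...)` raises an `AttributeError` on any line the regex does not match. A minimal sketch of a more tolerant parser that skips non-matching lines, strips padding such as the trailing space in `'ARGENTINA '`, and stores float keys directly. The function name is hypothetical and the sample lines only mimic the `   101 =  'ALBANIA'` layout. Applying it to the whole file would also pick up other numeric sections (such as the travel-mode codes), so it should still be restricted to the country block.

```python
import re
from typing import Dict, Iterable

# Matches label lines such as:   101 =  'ALBANIA'
COUNTRY_LINE = re.compile(r"^\s*(\d+)\s*=\s*'([^']*)'")


def parse_country_codes(lines: Iterable[str]) -> Dict[float, str]:
    """Return {code: name} for each line that looks like a numeric code entry."""
    codes: Dict[float, str] = {}  # dicts keep insertion order (Python 3.7+)
    for line in lines:
        match = COUNTRY_LINE.match(line)
        if match is None:
            continue  # skip comments, headers and blank lines instead of failing
        # Float keys match i94cit, which is read from SAS as a double
        codes[float(match.group(1))] = match.group(2).strip()
    return codes


sample = [
    "/* I94CIT & I94RES */",
    "   101 =  'ALBANIA'",
    "   687 =  'ARGENTINA '",
    "",
    "   316 =  'ALGERIA'",
]
print(parse_country_codes(sample))
# {101.0: 'ALBANIA', 687.0: 'ARGENTINA', 316.0: 'ALGERIA'}
```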

In [86]:
# Convert the string keys to floats to match i94cit, which is read as a double,
# and unwrap the single-element value lists
adjusted_cities = OrderedDict()
for key, value in valid_cities.items():
    adjusted_cities[float(key)] = value[0]

valid_cities = adjusted_cities
print(list(valid_cities.keys())[:10])
print(list(valid_cities.values())[:10])

[101.0, 316.0, 102.0, 324.0, 529.0, 518.0, 687.0, 151.0, 532.0, 438.0]
['ALBANIA', 'ALGERIA', 'ANDORRA', 'ANGOLA', 'ANGUILLA', 'ANTIGUA-BARBUDA', 'ARGENTINA ', 'ARMENIA', 'ARUBA', 'AUSTRALIA']


In [13]:
regex_matcher = re.compile(r'\'(.*)\'.*\'(.*)\'')
valid_ports = {}
for line in lines[303:962]:
    match = regex_matcher.search(line)
    abbreviation = match.group(1)
    description = match.group(2)
    valid_ports[abbreviation]=[description]


# print(valid_ports)

Example valid ports
```
{'ANC': ['ANCHORAGE, AK         '], 'BAR': ['BAKER AAF - BAKER ISLAND, AK'], 'DAC': ['DALTONS CACHE, AK     '], 'PIZ': ['DEW STATION PT LAY DEW, AK'], 'DTH': ['DUTCH HARBOR, AK      '], 'EGL': ['EAGLE, AK             '], 'FRB': ['FAIRBANKS, AK         '], 'HOM': ['HOMER, AK             '], 'HYD': ['HYDER, AK             '], 'JUN': ['JUNEAU, AK            '],
```
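The port descriptions are padded with spaces and combine a place name with a state or country suffix (for example `'ANCHORAGE, AK         '`). A substring test, as in `map_city`, can return the wrong port when one place name is contained in a longer description. A minimal sketch that strips the padding and splits off the suffix after the last comma, so place names can be compared exactly; the function name is hypothetical, and descriptions without a comma keep an empty suffix.

```python
from typing import Dict, List, Tuple


def split_port_descriptions(ports: Dict[str, List[str]]) -> Dict[str, Tuple[str, str]]:
    """Turn {'ANC': ['ANCHORAGE, AK   ']} into {'ANC': ('ANCHORAGE', 'AK')}."""
    result: Dict[str, Tuple[str, str]] = {}
    for code, (description,) in ports.items():
        # Split on the last comma: 'BAKER AAF - BAKER ISLAND, AK' -> place, suffix
        place, sep, suffix = description.strip().rpartition(",")
        if not sep:
            # No comma at all: treat the whole description as the place name
            place, suffix = suffix, ""
        result[code] = (place.strip(), suffix.strip())
    return result


sample = {
    'ANC': ['ANCHORAGE, AK         '],
    'BAR': ['BAKER AAF - BAKER ISLAND, AK'],
    'JUN': ['JUNEAU, AK            '],
}
ports = split_port_descriptions(sample)
# Exact, case-insensitive lookup from place name to port code
by_place = {place.lower(): code for code, (place, _) in ports.items()}
print(by_place["juneau"])  # JUN
```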

We clean the country (`i94cit`) and port (`i94port`) codes so that only codes listed in the SAS label file remain, and we remove rows with a missing `i94mode` (`NaN` or null). We also keep only the columns that will be used in the fact or dimension tables.
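The row-level rules above (valid country code, valid port code, non-missing `i94mode`) can be expressed as a single predicate. A minimal sketch over plain Python rows, assuming country codes are floats and port codes are strings as in `valid_cities` and `valid_ports`; the function name `is_valid_row` and the sample code sets are hypothetical. In the Spark pipeline the same rules are applied with `DataFrame.filter`.

```python
import math
from typing import Any, Container, Dict


def is_valid_row(row: Dict[str, Any], country_codes: Container[float], port_codes: Container[str]) -> bool:
    """Apply the three cleaning rules to one I94 record."""
    mode = row.get("i94mode")
    # Missing SAS values may surface as None (Spark) or NaN (pandas); treat both as missing
    mode_present = mode is not None and not (isinstance(mode, float) and math.isnan(mode))
    return (
        row.get("i94cit") in country_codes
        and row.get("i94port") in port_codes
        and mode_present
    )


countries = {101.0, 254.0}  # illustrative subset of country codes
ports = {"NYC", "WAS"}      # illustrative subset of port codes
rows = [
    {"i94cit": 101.0, "i94port": "NYC", "i94mode": 1.0},
    {"i94cit": 101.0, "i94port": "WAS", "i94mode": float("nan")},
    {"i94cit": 999.0, "i94port": "NYC", "i94mode": 1.0},
]
print([is_valid_row(r, countries, ports) for r in rows])  # [True, False, False]
```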

In [14]:
spark = SparkSession.builder \
    .config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11") \
    .enableHiveSupport() \
    .getOrCreate()


In [53]:
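from pyspark.sql.functions import col, isnan

def load_spark_df(file):
    # Read a SAS7BDAT file via the spark-sas7bdat package
    return spark.read.format('com.github.saurfang.sas.spark').load(file)

def clean_data(df):
    target_columns = ['i94yr', 'i94mon', 'i94cit', 'i94port', 'i94mode', 'i94bir', 'arrdate', 'i94visa']

    # Keep only country and port codes listed in the SAS label file
    df = df.filter(col('i94cit').isin(list(valid_cities.keys())))
    df = df.filter(col('i94port').isin(list(valid_ports.keys())))
    # Drop rows with a missing travel mode (null or NaN)
    df = df.filter(col('i94mode').isNotNull() & ~isnan(col('i94mode')))

    return df.select(*target_columns)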

In [54]:
immigration_df = clean_data(load_spark_df(fname))
immigration_df.show(10)

+------+------+------+-------+-------+------+-------+-------+
| i94yr|i94mon|i94cit|i94port|i94mode|i94bir|arrdate|i94visa|
+------+------+------+-------+-------+------+-------+-------+
|2016.0|   4.0| 101.0|    WAS|    1.0|  55.0|20545.0|    2.0|
|2016.0|   4.0| 101.0|    NYC|    1.0|  28.0|20545.0|    2.0|
|2016.0|   4.0| 101.0|    NYC|    1.0|   4.0|20545.0|    2.0|
|2016.0|   4.0| 101.0|    NYC|    1.0|  57.0|20545.0|    1.0|
|2016.0|   4.0| 101.0|    NYC|    1.0|  63.0|20545.0|    2.0|
|2016.0|   4.0| 101.0|    NYC|    1.0|  57.0|20545.0|    2.0|
|2016.0|   4.0| 101.0|    NYC|    1.0|  46.0|20545.0|    2.0|
|2016.0|   4.0| 101.0|    NYC|    1.0|  48.0|20545.0|    1.0|
|2016.0|   4.0| 101.0|    NYC|    1.0|  52.0|20545.0|    2.0|
|2016.0|   4.0| 101.0|    TOR|    1.0|  33.0|20545.0|    2.0|
+------+------+------+-------+-------+------+-------+-------+
only showing top 10 rows



In [None]:
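@udf()
def map_city(city):
    # Return the first port code whose description contains the city name
    if city is None:
        return None
    for key in valid_ports:
        if city.lower() in valid_ports[key][0].lower():
            return key
    return None

def map_df(df):
    # Hypothetical completion of this unfinished cell: tag each row with a port code
    return df.withColumn("PortCode", map_city(col("City")))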

In [57]:
temp_df = pd.read_csv(temperature_fname, sep=',')
temp_df.head()


|   | dt | AverageTemperature | AverageTemperatureUncertainty | City | Country | Latitude | Longitude |
|---|---|---|---|---|---|---|---|
| 0 | 1743-11-01 | 6.068 | 1.737 | Århus | Denmark | 57.05N | 10.33E |
| 1 | 1743-12-01 |  |  | Århus | Denmark | 57.05N | 10.33E |
| 2 | 1744-01-01 |  |  | Århus | Denmark | 57.05N | 10.33E |
| 3 | 1744-02-01 |  |  | Århus | Denmark | 57.05N | 10.33E |
| 4 | 1744-03-01 |  |  | Århus | Denmark | 57.05N | 10.33E |


In [73]:
# Use the pandas reductions, which skip NaN values explicitly
print(temp_df.AverageTemperature.min())
print(temp_df.AverageTemperature.max())

print("Check for non numeric AverageTemperature")
print(temp_df[pd.isnull(temp_df.AverageTemperature)][:3])

print("Check for non numeric AverageTemperatureUncertainty")
print(temp_df[pd.isnull(temp_df.AverageTemperatureUncertainty)][:3])

# Coordinates are strings such as '57.05N' and '10.33E', so flag values
# that do not follow that format (missing values are flagged too)
print("Check for non-standard Latitude")
print(temp_df[~temp_df.Latitude.str.match(r'^\d+(\.\d+)?[NS]$', na=False)])

print("Check for non-standard Longitude")
print(temp_df[~temp_df.Longitude.str.match(r'^\d+(\.\d+)?[EW]$', na=False)])


-42.70399999999999
39.650999999999996
Check for non numeric AverageTemperature
           dt  AverageTemperature  AverageTemperatureUncertainty   City  \
1  1743-12-01                 NaN                            NaN  Århus   
2  1744-01-01                 NaN                            NaN  Århus   
3  1744-02-01                 NaN                            NaN  Århus   

   Country Latitude Longitude  
1  Denmark   57.05N    10.33E  
2  Denmark   57.05N    10.33E  
3  Denmark   57.05N    10.33E  
Check for non numeric AverageTemperatureUncertainty
           dt  AverageTemperature  AverageTemperatureUncertainty   City  \
1  1743-12-01                 NaN                            NaN  Århus   
2  1744-01-01                 NaN                            NaN  Århus   
3  1744-02-01                 NaN                            NaN  Århus   

   Country Latitude Longitude  
1  Denmark   57.05N    10.33E  
2  Denmark   57.05N    10.33E  
3  Denmark   57.05N    10.33E  
Check for n

We drop rows with a null `AverageTemperature` and keep only the columns of the temperature dataset that are used by the dimension and fact tables.
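`Latitude` and `Longitude` are stored as strings with a hemisphere suffix (`57.05N`, `10.33E`), so they cannot be compared numerically as-is. A minimal sketch that converts them to signed decimal degrees, treating `S` and `W` as negative; the function name is hypothetical and malformed values raise `ValueError`. In Spark the same logic could be wrapped in a UDF or written with built-in string functions such as `regexp_extract`.

```python
def to_signed_degrees(value: str) -> float:
    """Convert '57.05N' -> 57.05 and '10.33W' -> -10.33 (S and W are negative)."""
    text = value.strip().upper()
    if not text or text[-1] not in "NSEW":
        raise ValueError(f"expected a value ending in N, S, E or W: {value!r}")
    degrees = float(text[:-1])  # raises ValueError if the number part is malformed
    return -degrees if text[-1] in "SW" else degrees


print(to_signed_degrees("57.05N"))  # 57.05
print(to_signed_degrees("10.33E"))  # 10.33
print(to_signed_degrees("34.56S"))  # -34.56
```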

In [74]:

def csv_to_spark_df(file):
    # Read the CSV with a header row; all columns are loaded as strings
    return spark.read.format("csv").option("header", "true").load(file)

def clean_temp_data(df):
    target_columns = ['dt', 'AverageTemperature', 'City', 'Country', 'Latitude', 'Longitude']
    # Empty CSV fields are read as null, so drop rows without a temperature
    df = df.filter(col('AverageTemperature').isNotNull())

    return df.select(*target_columns)

temp_df = clean_temp_data(csv_to_spark_df(temperature_fname))
temp_df.show(3)

+----------+------------------+-----+-------+--------+---------+
|        dt|AverageTemperature| City|Country|Latitude|Longitude|
+----------+------------------+-----+-------+--------+---------+
|1743-11-01|             6.068|Århus|Denmark|  57.05N|   10.33E|
|1744-04-01|5.7879999999999985|Århus|Denmark|  57.05N|   10.33E|
|1744-05-01|            10.644|Århus|Denmark|  57.05N|   10.33E|
+----------+------------------+-----+-------+--------+---------+
only showing top 3 rows



In [132]:
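@udf()
def map_country(country):
    # Return the I94 code whose description matches the country name (case-insensitive,
    # ignoring padding such as the trailing space in 'ARGENTINA ')
    if country is None:
        return None
    for key, value in valid_cities.items():
        if country.strip().lower() == value.strip().lower():
            return key
    return None

def map_temp_data(df):
    # Tag each temperature row with its I94 country code and drop rows without a match
    df = df.withColumn("CityCode", map_country(df.Country))
    return df.filter(df.CityCode.isNotNull())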
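`map_country` scans every entry of `valid_cities` for each row. A minimal sketch of building a normalised name-to-code dictionary once, so each lookup is a single dictionary access; names are upper-cased and stripped, which also handles padded entries such as `'ARGENTINA '`. The helper names are hypothetical. In Spark, the equivalent idea would be to turn the dictionary into a small DataFrame and join on it (optionally hinted with `pyspark.sql.functions.broadcast`) instead of calling a Python UDF per row.

```python
from typing import Dict, Optional


def build_name_index(codes: Dict[float, str]) -> Dict[str, float]:
    """Invert {code: name} into {NORMALISED NAME: code}."""
    index: Dict[str, float] = {}
    for code, name in codes.items():
        # Keep the first code if two entries normalise to the same name
        index.setdefault(name.strip().upper(), code)
    return index


def lookup_code(index: Dict[str, float], country: Optional[str]) -> Optional[float]:
    """Return the I94 code for a country name, or None if absent or unknown."""
    if country is None:
        return None
    return index.get(country.strip().upper())


index = build_name_index({108.0: "DENMARK", 687.0: "ARGENTINA "})
print(lookup_code(index, "Denmark"))    # 108.0
print(lookup_code(index, "Argentina"))  # 687.0
print(lookup_code(index, "Atlantis"))   # None
```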



In [133]:
temp_df = map_temp_data(temp_df)
temp_df.show(5)

+----------+------------------+-----+-------+--------+---------+--------+--------+
|        dt|AverageTemperature| City|Country|Latitude|Longitude|PortCode|CityCode|
+----------+------------------+-----+-------+--------+---------+--------+--------+
|1743-11-01|             6.068|Århus|Denmark|  57.05N|   10.33E|   108.0|   108.0|
|1744-04-01|5.7879999999999985|Århus|Denmark|  57.05N|   10.33E|   108.0|   108.0|
|1744-05-01|            10.644|Århus|Denmark|  57.05N|   10.33E|   108.0|   108.0|
|1744-06-01|14.050999999999998|Århus|Denmark|  57.05N|   10.33E|   108.0|   108.0|
|1744-07-01|            16.082|Århus|Denmark|  57.05N|   10.33E|   108.0|   108.0|
+----------+------------------+-----+-------+--------+---------+--------+--------+
only showing top 5 rows



### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model

Dimension Tables

Immigration will contain the following data from the I94 dataset.
```
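I94YR   - 4 digit year
I94MON  - Numeric month
I94CIT  - Country of origin (3 digit code)
I94PORT - Port of entry (3 character code)
I94MODE - Mode of travel (1 = Air, 2 = Sea, 3 = Land, 9 = Not reported)
I94BIR  - Age of respondent in years
ARRDATE - Arrival date (SAS numeric date)
I94VISA - Purpose of visit (1 = Business, 2 = Pleasure, 3 = Student)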
```
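`ARRDATE` is stored as a SAS date, i.e. the number of days since 1960-01-01; for example, 20545.0 in the sample rows corresponds to 2016-04-01, the first day of the April 2016 file. A minimal sketch of the conversion in plain Python (the function name is hypothetical); in Spark SQL the same offset could be computed with the built-in `date_add` function.

```python
from datetime import date, timedelta
from typing import Optional

SAS_EPOCH = date(1960, 1, 1)  # SAS stores dates as days since this day


def sas_to_date(days: Optional[float]) -> Optional[date]:
    """Convert a SAS date number such as 20545.0 to a datetime.date."""
    if days is None or days != days:  # days != days is only true for NaN
        return None
    return SAS_EPOCH + timedelta(days=int(days))


print(sas_to_date(20545.0))  # 2016-04-01
print(sas_to_date(20691.0))  # 2016-08-25
```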

Temperature will contain the following columns from the temperature dataset.

```
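dt                 - Month of recording (first day of the month)
CityCode           - I94 country code matched from Country
AverageTemperature - Average temperature for the month (degrees Celsius)
City               - City name
Country            - Country name
Latitude           - Latitude with hemisphere suffix (e.g. 57.05N)
Longitude          - Longitude with hemisphere suffix (e.g. 10.33E)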
```

Fact Table:
```
I94YR   - 4 digit year
I94MON  - Numeric month
I94CIT  - Country of origin (3 digit code)
I94PORT - Port of entry (3 character code)
I94MODE - Mode of travel (1 = Air, 2 = Sea, 3 = Land, 9 = Not reported)
I94BIR  - Age of respondent in years
ARRDATE - Arrival date (SAS numeric date)
I94VISA - Purpose of visit (1 = Business, 2 = Pleasure, 3 = Student)
AverageTemperature - Average temperature for the month (degrees Celsius)
```


#### 3.2 Mapping Out Data Pipelines

 1. Data cleaning
 2. Transform columns to standardised codes (e.g. country code)
 3. Transform to dimension tables
 4. Create the fact table by joining on the I94 country code
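Joining on the country code alone matches every immigration record with every temperature reading recorded for that country, which multiplies the row count. One alternative, sketched here under the assumption that a per-country average for the arrival month is the desired measure, is to aggregate temperatures by (country code, calendar month) first and then join on both keys. The sketch uses plain Python rows, hypothetical function names, and assumes `CityCode` is a float (in the notebook the UDF produces a string, which would need casting). In Spark the same shape would be a `groupBy(...).agg(avg(...))` followed by a join on two columns.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

Key = Tuple[float, int]  # (I94 country code, calendar month)


def monthly_country_averages(temps: List[dict]) -> Dict[Key, float]:
    """Average AverageTemperature per (CityCode, month of dt) across all years."""
    totals: Dict[Key, float] = defaultdict(float)
    counts: Dict[Key, int] = defaultdict(int)
    for row in temps:
        # dt is formatted 'YYYY-MM-DD', so characters 5-6 hold the month
        key = (row["CityCode"], int(row["dt"][5:7]))
        totals[key] += row["AverageTemperature"]
        counts[key] += 1
    return {key: totals[key] / counts[key] for key in totals}


def join_on_country_and_month(immigration: List[dict], averages: Dict[Key, float]) -> List[dict]:
    """Inner join: keep immigration rows whose (i94cit, i94mon) has an average."""
    joined = []
    for row in immigration:
        key = (row["i94cit"], int(row["i94mon"]))
        if key in averages:
            joined.append({**row, "temperature": averages[key]})
    return joined


temps = [
    {"CityCode": 108.0, "dt": "1744-04-01", "AverageTemperature": 5.0},
    {"CityCode": 108.0, "dt": "1745-04-01", "AverageTemperature": 7.0},
    {"CityCode": 108.0, "dt": "1744-05-01", "AverageTemperature": 10.0},
]
immigration = [{"i94cit": 108.0, "i94mon": 4.0}, {"i94cit": 101.0, "i94mon": 4.0}]
print(join_on_country_and_month(immigration, monthly_country_averages(temps)))
# [{'i94cit': 108.0, 'i94mon': 4.0, 'temperature': 6.0}]
```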

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [137]:
immigration_df.write \
    .mode("append") \
    .partitionBy("i94port") \
    .parquet("/tables/immigration.parquet")


In [None]:
temp_df.write \
    .mode("append") \
    .partitionBy("CityCode") \
    .parquet("/tables/temperature.parquet")

In [None]:
# Create temporary views
immigration_df.createOrReplaceTempView("immigration_view")
temp_df.createOrReplaceTempView("temperature_view")

# Create the fact table by joining on the I94 country code
# (`city` holds the i94cit country code; depdate is not kept by clean_data)
fact_table = spark.sql("""
SELECT immigration_view.i94yr AS year,
       immigration_view.i94mon AS month,
       immigration_view.i94cit AS city,
       immigration_view.i94port AS port,
       immigration_view.i94mode AS travel_mode,
       immigration_view.i94bir AS age,
       immigration_view.arrdate AS arrival_date,
       immigration_view.i94visa AS reason,
       temperature_view.AverageTemperature AS temperature
FROM immigration_view
JOIN temperature_view ON (immigration_view.i94cit = temperature_view.CityCode)
""")

# Write fact table 
fact_table.write \
    .mode("append") \
    .partitionBy("city") \
    .parquet("/results/fact.parquet")

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
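Run Quality Checks: a count check that the fact table is non-empty, and an integrity check that compares the number of distinct country codes in the fact table with the source tables.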

In [None]:
def check_count(df):
    return df.count() > 0

print(check_count(fact_table))

In [None]:
# Integrity Check
imm_city_count = df_immigration.select(col("i94cit")).distinct().count()
temp_city_count = temp_df.select(col("CityCode")).distinct().count()
fact_city_count = fact_table.select(col("city")).distinct().count()

print(temp_city_count <= imm_city_count)
print(fact_city_count <= imm_city_count)
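The count and integrity checks can be wrapped so that a failure stops the pipeline instead of only printing `False`. A minimal sketch in plain Python with hypothetical names: `check_non_empty` mirrors `check_count`, and `check_keys_subset` encodes the rule that an inner join cannot produce keys missing from either input. With Spark, the inputs could be the collected distinct codes, e.g. `[r.city for r in fact_table.select("city").distinct().collect()]`.

```python
from typing import Hashable, Iterable, Sized


class DataQualityError(Exception):
    """Raised when a data quality check fails."""


def check_non_empty(name: str, rows: Sized) -> None:
    """Fail if a table has no rows (the same idea as check_count)."""
    if len(rows) == 0:
        raise DataQualityError(f"{name} is empty")


def check_keys_subset(name: str, child: Iterable[Hashable], parent: Iterable[Hashable]) -> None:
    """Fail if child contains keys that never appear in parent."""
    unexpected = set(child) - set(parent)
    if unexpected:
        examples = sorted(unexpected, key=repr)[:3]
        raise DataQualityError(f"{name}: {len(unexpected)} unexpected key(s), e.g. {examples}")


# Illustrative values standing in for distinct country codes from each table
fact_codes = [108.0, 101.0]
immigration_codes = [101.0, 108.0, 254.0]
check_non_empty("fact_table", fact_codes)
check_keys_subset("fact vs immigration", fact_codes, immigration_codes)
print("all checks passed")
```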

#### 4.3 Data dictionary 


Dimension Tables

Immigration will contain the following data from the I94 dataset.
```
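I94YR   - 4 digit year
I94MON  - Numeric month
I94CIT  - Country of origin (3 digit code)
I94PORT - Port of entry (3 character code)
I94MODE - Mode of travel (1 = Air, 2 = Sea, 3 = Land, 9 = Not reported)
I94BIR  - Age of respondent in years
ARRDATE - Arrival date (SAS date: days since 1960-01-01)
I94VISA - Purpose of visit (1 = Business, 2 = Pleasure, 3 = Student)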
```

Temperature will contain the following columns from the temperature dataset.

```
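dt                 - Month of recording (first day of the month)
CityCode           - I94 country code matched from Country
AverageTemperature - Average temperature for the month (degrees Celsius)
City               - City name
Country            - Country name
Latitude           - Latitude with hemisphere suffix (e.g. 57.05N)
Longitude          - Longitude with hemisphere suffix (e.g. 10.33E)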
```

Fact Table:
```
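I94YR   - 4 digit year
I94MON  - Numeric month
I94CIT  - Country of origin (3 digit code)
I94PORT - Port of entry (3 character code)
I94MODE - Mode of travel (1 = Air, 2 = Sea, 3 = Land, 9 = Not reported)
I94BIR  - Age of respondent in years
ARRDATE - Arrival date (SAS date: days since 1960-01-01)
I94VISA - Purpose of visit (1 = Business, 2 = Pleasure, 3 = Student)
AverageTemperature - Average temperature for the month (degrees Celsius)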
```


### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.

First, we use Pandas to explore the data, get a better sense of it, and identify potential issues to fix. We use Apache Spark for the overall data pipeline because it can process large datasets in parallel. We store the outputs in Parquet, a columnar format that Spark reads and writes efficiently and that can be kept in S3.

* Propose how often the data should be updated and why.

The data should be updated monthly, because the I94 data comes in monthly files (e.g. `i94_apr16_sub.sas7bdat`) and the temperature data records monthly averages.


* Write a description of how you would approach the problem differently under the following scenarios:
  * The data was increased by 100x.

    We could resize the running Spark cluster, adding core nodes until processing time meets the business requirements, and keep the data partitioned in Parquet so the additional nodes can work in parallel.

  * The data populates a dashboard that must be updated on a daily basis by 7am every day.

    We could schedule the pipeline as an Apache Airflow job that starts early enough to finish before 7am, then notify the dashboard so it reloads the data from our database.

  * The database needed to be accessed by 100+ people.

    We could provision read replicas of the database so that queries from many users are spread across several instances.