# ETL Pipeline for Immigration and Average Temperature
### Data Engineering Capstone Project

#### Project Summary
The analytics team is analysing seasonal patterns of immigration from one country to another in relation to the temperature at the destination. The project builds an ETL pipeline that creates an analytics database of immigration events and the temperature at their destination.

#### Project Datasets:
The project uses the I94 immigration data, the Global Land Temperatures by City data, and the I94 port data. The data is located at:
* **Immigration data**: `../../data/18-83510-I94-Data-2016/`
* **Temperature data**: `../../data2/GlobalLandTemperaturesByCity.csv`
* **Valid I94 port data**: `i94port_valid.txt`, extracted from the file I94_SAS_Labels_Descriptions

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

### Create a spark session

In [1]:
from pyspark.sql import SparkSession

MAX_MEMORY = "1g"

spark = SparkSession.builder \
            .config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11") \
            .config("spark.executor.memory", MAX_MEMORY) \
            .config("spark.driver.memory", MAX_MEMORY) \
            .getOrCreate()

## Step 1: Scope the Project and Gather Data

### Scope of the Project:
- Use Spark to load the data into dataframes.
- Cleanse the global temperature data.
- Create a dataframe containing the fields City, Country, Port from the i94port_valid.txt file.
- Create a dataframe from the temperature data: City, Country, Port, Year, Month, Average Temperature
- Cleanse the I94 immigration data.
- Create a dataframe for I94 immigration: Port, Year, Month, Count
- Create a fact table from the immigration and temperature dataframes: Port, Month, Average Temperature, Count

### Describe and Gather Data:
I94 immigration [data](https://travel.trade.gov/research/reports/i94/historical/2016.html) comes from the US National Tourism and Trade Office. Data is provided in SAS7BDAT format. Some relevant attributes include:

* **i94yr**: 4 digit year
* **i94mon**: numeric month
* **i94port**: 3-character code of the destination city in the USA
* **arrdate**: arrival date in the USA

The temperature [data](https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data) comes from Kaggle. Data is provided in csv format. Some relevant attributes include:

* **AverageTemperature**: average temperature
* **City**: city name
* **Country**: country name
* **Latitude**: latitude
* **Longitude**: longitude


## Step 2: Explore and Assess the Data

###  Load Temperature data

In [2]:
# Read csv Global Land Temperatures by City
global_temp_df = spark.read \
                .options(header=True, delimiter=',', inferSchema=True, dateFormat="yyyy-MM-dd") \
                .csv('../../data2/GlobalLandTemperaturesByCity.csv')

global_temp_df.show()

+-------------------+-------------------+-----------------------------+-----+-------+--------+---------+
|                 dt| AverageTemperature|AverageTemperatureUncertainty| City|Country|Latitude|Longitude|
+-------------------+-------------------+-----------------------------+-----+-------+--------+---------+
|1743-11-01 00:00:00|              6.068|           1.7369999999999999|Århus|Denmark|  57.05N|   10.33E|
|1743-12-01 00:00:00|               null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-01-01 00:00:00|               null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-02-01 00:00:00|               null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-03-01 00:00:00|               null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-04-01 00:00:00| 5.7879999999999985|           3.6239999999999997|Århus|Denmark|  57.05N|   10.33E|
|1744-05-01 00:00:00|             10.644|           1.2

###  2.1: Data cleaning for Temperature data
Some City/Country combinations correspond to multiple locations (e.g. Springfield, United States). To distinguish between them, we would need to take Latitude/Longitude into consideration. To keep things simple, we skip the entries that belong to these City/Country combinations.

In [3]:
# Remove records with no average temperature (dropna drops both null and NaN values)
global_temp_df = global_temp_df.dropna(subset=["AverageTemperature"])

# City/Country combinations that map to more than one (Latitude, Longitude) location
duplicate_df = global_temp_df \
    .select("City", "Country", "Latitude", "Longitude") \
    .distinct() \
    .groupBy("City", "Country") \
    .count() \
    .filter("count > 1")
duplicate_df.show()

+-----------+-------------+-----+
|       City|      Country|count|
+-----------+-------------+-----+
|     Jining|        China|    2|
|  Arlington|United States|    2|
|Springfield|United States|    3|
|     Haikou|        China|    2|
|   Pasadena|United States|    2|
|  Rongcheng|        China|    3|
|    Luoyang|        China|    2|
|     Yichun|        China|    2|
|   Richmond|United States|    2|
|   Haicheng|        China|    2|
|      Depok|    Indonesia|    2|
|      Taman|    Indonesia|    2|
|  Yingcheng|        China|    2|
|     Aurora|United States|    2|
|     Peoria|United States|    2|
|     Suzhou|        China|    2|
|   Columbus|United States|    2|
|   Glendale|United States|    2|
+-----------+-------------+-----+



In [4]:
# Remove the ambiguous City/Country combinations with a left anti join
temp_df = global_temp_df.join(duplicate_df, ["City", "Country"], "leftanti")

# Verify that no ambiguous City/Country combinations remain
temp_df \
    .select("City", "Country", "Latitude", "Longitude") \
    .distinct() \
    .groupBy("City", "Country") \
    .count() \
    .filter("count > 1") \
    .show()

+----+-------+-----+
|City|Country|count|
+----+-------+-----+
+----+-------+-----+
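The de-duplication above can be illustrated without Spark. The following is a minimal pure-Python sketch of the same idea: collect the distinct (Latitude, Longitude) pairs per (City, Country), mark the keys with more than one pair as ambiguous, and drop their rows (the equivalent of the `leftanti` join). The helper name `drop_ambiguous_cities` is hypothetical, and the Springfield coordinates are made-up sample values, not rows from the real dataset.

```python
from collections import defaultdict

# Sample rows: (City, Country, Latitude, Longitude, AverageTemperature).
# The Springfield coordinates are hypothetical; the Århus rows follow the output above.
rows = [
    ("Springfield", "United States", "39.38N", "89.64W", 10.1),
    ("Springfield", "United States", "42.59N", "72.00W", 8.7),
    ("Århus", "Denmark", "57.05N", "10.33E", 6.068),
    ("Århus", "Denmark", "57.05N", "10.33E", 5.788),
]

def drop_ambiguous_cities(rows):
    """Drop rows whose (City, Country) maps to more than one (Latitude, Longitude)."""
    locations = defaultdict(set)
    for city, country, lat, lon, _ in rows:
        locations[(city, country)].add((lat, lon))
    ambiguous = {key for key, locs in locations.items() if len(locs) > 1}
    # Keep only rows whose key is not ambiguous (like a left anti join)
    kept = [row for row in rows if (row[0], row[1]) not in ambiguous]
    return kept, ambiguous

kept, ambiguous = drop_ambiguous_cities(rows)
print(sorted(ambiguous))
print(len(kept))
```

Using a set per key means repeated monthly readings at the same coordinates do not count as separate locations, which matches the `.distinct()` step in the Spark version.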



###  2.2: Create a dataframe containing the fields City, Country, Port from i94port_valid.txt

In [5]:
def get_city_country_tuple(location):
    '''
    Given a location string, return a (city, country) tuple, or None if the
    string contains no comma. A two-letter suffix is treated as a US state code. Eg.
    
    Input: ANACORTES, WA
    Output: (ANACORTES, UNITED STATES)
    
    Input: DUBLIN, IRELAND
    Output: (DUBLIN, IRELAND)
    '''
    
    vals = location.rsplit(',', 1)
    if len(vals) != 2:
        return None
    
    city = vals[0].strip()
    country = vals[1].strip()
    
    # A two-letter code (e.g. WA) is a US state abbreviation
    if len(country) == 2:
        country = "UNITED STATES"
        
    return (city, country)

In [6]:
import re

# First quoted token is the port code, second is the location ("CITY, STATE" or "CITY, COUNTRY")
reg_str = re.compile(r'\'(.*)\'.*\'(.*)\'')
port_locations = []

# Use i94port_valid.txt to build a list of (city, country, port) tuples.
# This is used below to get the port based on city and country.
with open('i94port_valid.txt') as f:
    for line in f:
        line = line.upper()
        matched_str = reg_str.search(line)
        if matched_str is None:
            # Skip blank or malformed lines instead of raising AttributeError
            continue
        
        port = matched_str.group(1).strip()
        location = matched_str.group(2).strip()
        
        tup = get_city_country_tuple(location)
        
        if tup:
            city, country = tup
            port_locations.append((city, country, port))
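The regular expression and `get_city_country_tuple` can be exercised without the real file. The sketch below folds both into one hypothetical helper, `parse_port_lines`, and runs it on a few illustrative lines written in the assumed `'CODE' = 'CITY, REGION'` format of i94port_valid.txt; the sample lines are illustrations, not copies of the file.

```python
import re

# Same pattern as above: the first quoted token is the port code,
# the second quoted token is the "CITY, STATE-OR-COUNTRY" location.
PORT_LINE = re.compile(r"'(.*)'.*'(.*)'")

def parse_port_lines(lines):
    """Return (city, country, port) tuples for lines with a 'CITY, REGION' location."""
    result = []
    for line in lines:
        match = PORT_LINE.search(line.upper())
        if match is None:  # blank or malformed line
            continue
        port, location = match.group(1).strip(), match.group(2).strip()
        parts = location.rsplit(",", 1)
        if len(parts) != 2:  # no state/country part, e.g. "NOT REPORTED/UNKNOWN"
            continue
        city, region = parts[0].strip(), parts[1].strip()
        # A two-letter region is treated as a US state code
        country = "UNITED STATES" if len(region) == 2 else region
        result.append((city, country, port))
    return result

# Illustrative lines in the assumed format of i94port_valid.txt
sample = [
    "'ALC'\t=\t'ALCAN, AK             '",
    "'DUB'\t=\t'Dublin, Ireland'",
    "",
    "'XXX'\t=\t'NOT REPORTED/UNKNOWN'",
]
print(parse_port_lines(sample))
# [('ALCAN', 'UNITED STATES', 'ALC'), ('DUBLIN', 'IRELAND', 'DUB')]
```

The `None` check matters because `re.search` returns `None` for a line with no quoted tokens, and calling `.group()` on it would raise an `AttributeError`.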

In [7]:
# Create a dataframe of valid ports from the i94port_valid.txt file.
port_location_schema = ["City", "Country", "Port"]
port_location_df = spark.createDataFrame(data=port_locations, schema = port_location_schema)

port_location_df.printSchema()
port_location_df.show(truncate=False)

root
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Port: string (nullable = true)

+------------------------+-------------+----+
|City                    |Country      |Port|
+------------------------+-------------+----+
|ALCAN                   |UNITED STATES|ALC |
|ANCHORAGE               |UNITED STATES|ANC |
|BAKER AAF - BAKER ISLAND|UNITED STATES|BAR |
|DALTONS CACHE           |UNITED STATES|DAC |
|DEW STATION PT LAY DEW  |UNITED STATES|PIZ |
|DUTCH HARBOR            |UNITED STATES|DTH |
|EAGLE                   |UNITED STATES|EGL |
|FAIRBANKS               |UNITED STATES|FRB |
|HOMER                   |UNITED STATES|HOM |
|HYDER                   |UNITED STATES|HYD |
|JUNEAU                  |UNITED STATES|JUN |
|KETCHIKAN               |UNITED STATES|5KE |
|KETCHIKAN               |UNITED STATES|KET |
|MOSES POINT INTERMEDIATE|UNITED STATES|MOS |
|NIKISKI                 |UNITED STATES|NIK |
|NOM                     |UNITED STATES|NOM |
|POKER CR

### 2.3: Create a dataframe for I94 immigration
Extract the fields (Port, Year, Month, Count) for arrivals from the I94 immigration data.

In [8]:
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType

def filter_i94_data(file, spark_session):
    '''
    Read an I94 sas7bdat file and count the arrivals per port, year and month.
    
    file : I94 immigration data file
    spark_session : Spark session
    
    Return: I94 immigration dataframe with columns Port, Year, Month, count
    
    Note: invalid port codes are not removed here; they drop out of the fact
    table through the inner join with the valid port locations.
    '''
    
    # Read I94 data from the file and load into a Spark dataframe
    i94_immigration_df = spark_session.read.format('com.github.saurfang.sas.spark').load(file)

    # Keep only records with an arrival date (dropna removes null and NaN values),
    # then count arrivals per (Port, Year, Month)
    i94_immigration_df = i94_immigration_df \
        .dropna(subset=["arrdate"]) \
        .select(
            col("i94port").alias("Port"),
            col("i94yr").cast(IntegerType()).alias("Year"),
            col("i94mon").cast(IntegerType()).alias("Month")
        ) \
        .groupBy(["Port", "Year", "Month"]) \
        .count() \
        .orderBy(["Port", "Year", "Month"])

    return i94_immigration_df
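The filter-and-count logic of `filter_i94_data` can be sketched in plain Python. This is a minimal illustration, assuming records arrive as dicts keyed by the I94 column names; the records themselves are hypothetical, and a missing arrival date is represented as either `None` or NaN, mirroring how Spark can hold null or NaN in a double column.

```python
import math
from collections import Counter

# Hypothetical I94 records; arrdate is None or NaN when the arrival date is missing.
records = [
    {"i94port": "ANC", "i94yr": 2016.0, "i94mon": 7.0, "arrdate": 20636.0},
    {"i94port": "ANC", "i94yr": 2016.0, "i94mon": 7.0, "arrdate": 20640.0},
    {"i94port": "ANC", "i94yr": 2016.0, "i94mon": 8.0, "arrdate": float("nan")},
    {"i94port": "ABQ", "i94yr": 2016.0, "i94mon": 1.0, "arrdate": 20455.0},
    {"i94port": "ABQ", "i94yr": 2016.0, "i94mon": 2.0, "arrdate": None},
]

def has_value(x):
    """A value counts as present if it is neither None nor NaN."""
    return x is not None and not math.isnan(x)

def count_arrivals(records):
    """Count records per (Port, Year, Month), skipping rows without an arrival date."""
    counts = Counter(
        (r["i94port"], int(r["i94yr"]), int(r["i94mon"]))
        for r in records
        if has_value(r["arrdate"])
    )
    # Sorted like the notebook's orderBy(["Port", "Year", "Month"])
    return sorted(counts.items())

for (port, year, month), n in count_arrivals(records):
    print(port, year, month, n)
# ABQ 2016 1 1
# ANC 2016 7 2
```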

In [9]:
import os

files = os.listdir('../../data/18-83510-I94-Data-2016/')
files

['i94_jul16_sub.sas7bdat',
 'i94_jun16_sub.sas7bdat',
 'i94_mar16_sub.sas7bdat',
 'i94_apr16_sub.sas7bdat',
 'i94_may16_sub.sas7bdat',
 'i94_oct16_sub.sas7bdat',
 'i94_jan16_sub.sas7bdat',
 'i94_feb16_sub.sas7bdat',
 'i94_aug16_sub.sas7bdat',
 'i94_nov16_sub.sas7bdat',
 'i94_sep16_sub.sas7bdat',
 'i94_dec16_sub.sas7bdat']

In [10]:
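# Use a thread pool to build the dataframe for each file concurrently.
# Spark transformations are lazy, so the threads mainly overlap opening each
# file and building its query plan; the aggregation itself runs when an
# action such as show() or write is called.

from multiprocessing.pool import ThreadPool

def process_file(file_name):
    '''
        Build the I94 immigration dataframe for one file
        Input: file_name
        Return: i94 immigration dataframe
    '''
    
    file_path = os.path.join('../../data/18-83510-I94-Data-2016/', file_name)
    return filter_i94_data(file_path, spark)

# The context manager shuts the pool down once all files are processed
with ThreadPool(len(files)) as pool:
    df_coll = pool.map(process_file, files)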

In [11]:
# Union the per-file immigration dataframes into a single dimension
# dataframe (df2.select(df1.columns) aligns the column order before the union)

from functools import reduce

dim_immigration_df = reduce(lambda df1, df2: df1.union(df2.select(df1.columns)), df_coll)
dim_immigration_df = dim_immigration_df.orderBy(["Port", "Year", "Month"])

dim_immigration_df.show()

+----+----+-----+-----+
|Port|Year|Month|count|
+----+----+-----+-----+
| 48Y|2016|    7|    2|
| 5KE|2016|    3|    2|
| 5KE|2016|    4|    3|
| 5KE|2016|    6|   18|
| 5KE|2016|    7|   25|
| 5KE|2016|    8|    9|
| 5KE|2016|    9|    1|
| 5T6|2016|    1|   12|
| 5T6|2016|    2|   18|
| 5T6|2016|    3|   20|
| 5T6|2016|    4|    4|
| 5T6|2016|    5|   11|
| 5T6|2016|    6|   12|
| 5T6|2016|    7|   18|
| 5T6|2016|    8|   12|
| 5T6|2016|    9|   11|
| 5T6|2016|   10|   16|
| 5T6|2016|   11|   14|
| 5T6|2016|   12|   13|
| 74S|2016|    5|    1|
+----+----+-----+-----+
only showing top 20 rows
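The thread-pool-then-union pattern used in the two cells above can be sketched with the standard library. This is a minimal illustration, not the notebook's Spark code: `process_file_stub` is a hypothetical stand-in for `filter_i94_data`, and the per-file rows are copied from the output above. List concatenation is used for the merge because, like `DataFrame.union`, it keeps every row rather than combining duplicates.

```python
from concurrent.futures import ThreadPoolExecutor
from functools import reduce

# Per-file (Port, Year, Month, count) rows, taken from the output above.
FAKE_FILES = {
    "i94_jan16_sub.sas7bdat": [("5T6", 2016, 1, 12)],
    "i94_feb16_sub.sas7bdat": [("5T6", 2016, 2, 18)],
    "i94_mar16_sub.sas7bdat": [("5T6", 2016, 3, 20), ("5KE", 2016, 3, 2)],
}

def process_file_stub(file_name):
    """Hypothetical stand-in for filter_i94_data: return the rows for one file."""
    return FAKE_FILES[file_name]

# One worker per file, as with ThreadPool(len(files)) in the notebook
with ThreadPoolExecutor(max_workers=len(FAKE_FILES)) as pool:
    partials = list(pool.map(process_file_stub, FAKE_FILES))

# Union all per-file results, then sort by Port, Year, Month
merged = sorted(reduce(lambda a, b: a + b, partials, []))
for row in merged:
    print(row)
```

`Executor.map` returns results in input order regardless of which thread finishes first, so the union is deterministic before sorting.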



## Step 3: Define the Data Model

![ER diagram](ER-diagram.png "ER diagram")

### Dimension Tables
* **port_location_df** - global I94 port location  
    *City, Country, Port*
    
* **dim_temp_df** - global average temperature  
    *City, Country, Year, Month, AverageTemperature*

* **dim_immigration_df** - I94 immigration only for arrivals  
    *Port, Year, Month, count*

### Fact Table
* **avg_temp_fact_df** - monthly arrivals at each port with the average temperature of its city  
    *Port, Month, AverageTemperature, Count*
    
### Snowflake Schema based model:
With this project, I wanted to find the effect of temperature on the number of passengers arriving at a port. The available data was:
1. Dim table #1 (port_location): The mapping of each port to the city/country it is located in. It is created from the i94port_valid.txt file, which contains only valid I94 port locations.
2. Dim table #2 (dim_temp): The average monthly temperature of a city/country for a given month and year. It is extracted from the global temperature data.
3. Dim table #3 (dim_immigration): The count of passengers arriving at a port in a given year/month. It is extracted from the I94 immigration data.

These tables share keys that can be joined to obtain the desired outcome. I chose to do it in two steps:
1. Create another dim table (avg_temp) by joining port_location (Dim table #1) and dim_temp (Dim table #2) on the fields *city* and *country*. This provides the average monthly temperature at a port for a given year and month.
2. Join the dim table from above with dim_immigration (Dim table #3) on port and month to get the desired outcome. The temperature and immigration data cover different years, so year cannot be used as a join key (see the caveat in Step 4).
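Step 1 is an inner join on (City, Country). The sketch below shows it as a simple hash join in plain Python, assuming the rows have the same shape as `port_location_df` and `dim_temp_df`; the helper name and the temperature values are hypothetical, while the two KETCHIKAN ports come from the port output in Step 2.2.

```python
# Rows shaped like port_location_df (City, Country, Port)
sample_ports = [
    ("BUFFALO", "UNITED STATES", "BUF"),
    ("KETCHIKAN", "UNITED STATES", "5KE"),
    ("KETCHIKAN", "UNITED STATES", "KET"),
    ("DUBLIN", "IRELAND", "DUB"),
]
# Rows shaped like dim_temp_df (City, Country, Year, Month, AverageTemperature); values are made up
sample_temps = [
    ("BUFFALO", "UNITED STATES", 2010, 1, -4.0),
    ("BUFFALO", "UNITED STATES", 2010, 2, -3.5),
    ("KETCHIKAN", "UNITED STATES", 2010, 1, 0.5),
    ("OSLO", "NORWAY", 2010, 1, -5.0),
]

def join_ports_with_temperatures(ports, temps):
    """Inner join on (City, Country) -> (City, Country, Port, Year, Month, AverageTemperature)."""
    # Build side: one city can have several ports (e.g. KETCHIKAN -> 5KE, KET)
    ports_by_place = {}
    for city, country, port in ports:
        ports_by_place.setdefault((city, country), []).append(port)
    # Probe side: emit one output row per matching port
    joined = []
    for city, country, year, month, temp in temps:
        for port in ports_by_place.get((city, country), []):
            joined.append((city, country, port, year, month, temp))
    return joined

avg_temp_rows = join_ports_with_temperatures(sample_ports, sample_temps)
for row in avg_temp_rows:
    print(row)
```

Cities without a port (OSLO) and ports without temperature data (DUBLIN here) are dropped, which is the inner-join behaviour of the notebook's `port_location_df.join(dim_temp_df, ['City','Country'])`.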

Reasons for choosing a snowflake schema:
* The temperature dimension needed an intermediate dim table (avg_temp) to link ports to cities.
* Normalised dimension tables avoid redundant data, which makes the model easier to maintain and reduces disk usage.

## Step 4: Run Pipelines to Model the Data 

In [12]:
# Write the immigration dimension table to parquet files partitioned by Port
# (overwrite so that re-running the notebook does not duplicate data)
dim_immigration_df.write.mode("overwrite").partitionBy("Port").parquet("results/dim_immigration.parquet")

### 4.1: Create a dataframe containing: Year, Month, City, Country, Average Temperature

In [13]:
from pyspark.sql.functions import year, month, upper, col

# Create a dimension dataframe from the temperature dataframe

dim_temp_df = temp_df \
    .select(
        year("dt").alias('Year'), 
        month("dt").alias('Month'),
        upper(col("City")).alias('City'),
        upper(col("Country")).alias('Country'),
        "AverageTemperature"
    ) \
    .groupBy("City", "Country", "Year", "Month") \
    .avg("AverageTemperature") \
    .withColumnRenamed('avg(AverageTemperature)', 'AverageTemperature') \
    .orderBy("City", "Country", "Year", "Month")

dim_temp_df.printSchema()
dim_temp_df.show(n = 10)

root
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- AverageTemperature: double (nullable = true)

+--------+-------+----+-----+------------------+
|    City|Country|Year|Month|AverageTemperature|
+--------+-------+----+-----+------------------+
|A CORUÑA|  SPAIN|1743|   11|10.779000000000002|
|A CORUÑA|  SPAIN|1744|    4|            13.325|
|A CORUÑA|  SPAIN|1744|    5|              12.9|
|A CORUÑA|  SPAIN|1744|    6|             16.41|
|A CORUÑA|  SPAIN|1744|    7|            17.992|
|A CORUÑA|  SPAIN|1744|    9|16.067999999999998|
|A CORUÑA|  SPAIN|1744|   10|12.904000000000002|
|A CORUÑA|  SPAIN|1744|   11|            11.028|
|A CORUÑA|  SPAIN|1744|   12|             8.798|
|A CORUÑA|  SPAIN|1745|    1| 7.651999999999999|
+--------+-------+----+-----+------------------+
only showing top 10 rows
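The `year`/`month` extraction followed by `groupBy(...).avg(...)` can be sketched in plain Python. This is a minimal illustration with hypothetical readings: names are upper-cased as in the Spark cell, and any readings that share the same (CITY, COUNTRY, Year, Month) key are averaged together.

```python
from collections import defaultdict
from datetime import date

# Hypothetical (dt, City, Country, AverageTemperature) readings
readings = [
    (date(2010, 1, 1), "Buffalo", "United States", -4.0),
    (date(2010, 1, 1), "BUFFALO", "United States", -3.0),
    (date(2010, 2, 1), "Buffalo", "United States", -3.5),
]

def monthly_average(readings):
    """Group by (CITY, COUNTRY, Year, Month) after upper-casing and average the temperatures."""
    groups = defaultdict(list)
    for dt, city, country, temp in readings:
        groups[(city.upper(), country.upper(), dt.year, dt.month)].append(temp)
    return {key: sum(temps) / len(temps) for key, temps in groups.items()}

result = monthly_average(readings)
for key, avg in sorted(result.items()):
    print(key, avg)
# ('BUFFALO', 'UNITED STATES', 2010, 1) -3.5
# ('BUFFALO', 'UNITED STATES', 2010, 2) -3.5
```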



In [14]:
# Add the Port column to dim_temp_df by joining with the port locations on City and Country
avg_temp_df = port_location_df.join(dim_temp_df, ['City','Country'])

avg_temp_df.printSchema()
avg_temp_df.show(n = 10)

# Write the average temperature dimension table to parquet files partitioned by Port
# (overwrite so that re-running the notebook does not duplicate data)
avg_temp_df.write.mode("overwrite").partitionBy("Port").parquet("results/dim_temperature.parquet")

root
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Port: string (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- AverageTemperature: double (nullable = true)

+-------+-------------+----+----+-----+------------------+
|   City|      Country|Port|Year|Month|AverageTemperature|
+-------+-------------+----+----+-----+------------------+
|BUFFALO|UNITED STATES| BUF|1743|   11|1.6879999999999995|
|BUFFALO|UNITED STATES| BUF|1744|    4| 7.699999999999998|
|BUFFALO|UNITED STATES| BUF|1744|    5|            13.613|
|BUFFALO|UNITED STATES| BUF|1744|    6|            19.437|
|BUFFALO|UNITED STATES| BUF|1744|    7|            20.682|
|BUFFALO|UNITED STATES| BUF|1744|    9|            13.662|
|BUFFALO|UNITED STATES| BUF|1744|   10| 7.449000000000002|
|BUFFALO|UNITED STATES| BUF|1744|   11|1.7279999999999998|
|BUFFALO|UNITED STATES| BUF|1744|   12|            -4.157|
|BUFFALO|UNITED STATES| BUF|1745|    1|           

### 4.2: Create a fact table from the immigration and temperature dataframes: Port, Month, Average Temperature, Count

### Caveat
The global temperature data is available only up to 2013, while the immigration data is available only for 2016. So we cannot join the two dataframes on year.

### Approach
We will:
1. Use only the temperatures for the year 2010, as a proxy for the 2016 temperatures
2. Use the monthly immigration data for the year 2016
3. Create a fact table with cols: [Port, Month, AverageTemperature, Count]
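The caveat and the approach can be demonstrated in plain Python. The sketch below assumes rows shaped like the per-port temperature and immigration tables: a join on (Port, Year, Month) comes back empty because the years never overlap, while filtering temperatures to one year and joining on (Port, Month) produces fact rows. The helper names are hypothetical; the ANC values are copied from the fact table output further below, and the 2009 row is made up.

```python
# (Port, Year, Month, AverageTemperature); 2010 ANC values match the fact output, 2009 is made up
temp_rows = [
    ("ANC", 2010, 6, 9.654),
    ("ANC", 2010, 7, 10.757),
    ("ANC", 2009, 7, 11.0),
]
# (Port, Year, Month, count) for 2016, also from the fact output
immigration_rows = [
    ("ANC", 2016, 6, 4341),
    ("ANC", 2016, 7, 6555),
]

def join_on_year(temps, immigration):
    """Naive join on (Port, Year, Month): empty, because the years never overlap."""
    temp_keys = {(port, year, month) for port, year, month, _ in temps}
    return [row for row in immigration if row[:3] in temp_keys]

def build_fact(temps, immigration, temp_year=2010):
    """Join on (Port, Month), using one temperature year as a stand-in for 2016."""
    temp_by_key = {(port, month): t for port, year, month, t in temps if year == temp_year}
    return [
        (port, month, temp_by_key[(port, month)], count)
        for port, _year, month, count in immigration
        if (port, month) in temp_by_key
    ]

print(join_on_year(temp_rows, immigration_rows))  # []
print(build_fact(temp_rows, immigration_rows))
# [('ANC', 6, 9.654, 4341), ('ANC', 7, 10.757, 6555)]
```

Filtering to a single year before the join keeps (Port, Month) unique on the temperature side, so each immigration row matches at most one temperature.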

In [15]:
from pyspark.sql.functions import year, month, upper, col

# only take the temperatures for the year 2010 into consideration

monthly_avg_temp_df = avg_temp_df \
    .filter(avg_temp_df.Year == 2010) \
    .select(
        "Port",
        "Month",
        "AverageTemperature"
    ) \
    .orderBy("Port", "Month")

monthly_avg_temp_df.show()

+----+-----+------------------+
|Port|Month|AverageTemperature|
+----+-----+------------------+
| ABQ|    1|            -0.581|
| ABQ|    2|             1.537|
| ABQ|    3| 5.497999999999998|
| ABQ|    4|            11.025|
| ABQ|    5|            15.283|
| ABQ|    6|            22.448|
| ABQ|    7|             23.39|
| ABQ|    8|22.215999999999998|
| ABQ|    9|            20.075|
| ABQ|   10|             12.93|
| ABQ|   11|4.1770000000000005|
| ABQ|   12|             3.202|
| AKR|    1|              -3.7|
| AKR|    2|            -3.411|
| AKR|    3| 5.372000000000001|
| AKR|    4|            12.232|
| AKR|    5|17.128999999999998|
| AKR|    6|            21.936|
| AKR|    7|24.203000000000003|
| AKR|    8|            23.502|
+----+-----+------------------+
only showing top 20 rows



### Use the monthly immigration data for the year 2016
The monthly immigration data covers 2016 only, so no additional filtering is needed.

### Create a fact table with cols: [Port, Month, AverageTemperature, Count]

In [16]:
from pyspark.sql.functions import col

fact_df = dim_immigration_df.join(monthly_avg_temp_df, ['Port', 'Month'])

avg_temp_fact_df = fact_df.select(
        "Port",
        "Month",
        "AverageTemperature",
        col("count").alias("Count")
    ) \
    .orderBy(["Port", "Month"])

avg_temp_fact_df.show()

# Write the fact table to parquet files partitioned by Port
# (overwrite so that re-running the notebook does not duplicate data)
avg_temp_fact_df.write.mode("overwrite").partitionBy("Port").parquet("results/avg_temp_fact.parquet")

+----+-----+-------------------+-----+
|Port|Month| AverageTemperature|Count|
+----+-----+-------------------+-----+
| ABQ|    1|             -0.581|   16|
| ABQ|    2|              1.537|    3|
| ABQ|    3|  5.497999999999998|    1|
| ABQ|    4|             11.025|    3|
| ABQ|    6|             22.448|    1|
| ABQ|    7|              23.39|    9|
| ABQ|    8| 22.215999999999998|    8|
| ABQ|    9|             20.075|    1|
| ABQ|   10|              12.93|    7|
| ABQ|   11| 4.1770000000000005|    5|
| ABQ|   12|              3.202|    7|
| ANC|    1|            -13.011|  109|
| ANC|    2|             -7.597|  124|
| ANC|    3|              -7.39|   96|
| ANC|    4|-1.2220000000000002|   91|
| ANC|    5| 6.3629999999999995| 1897|
| ANC|    6|              9.654| 4341|
| ANC|    7|             10.757| 6555|
| ANC|    8|             10.384| 5736|
| ANC|    9|  6.252000000000001| 2805|
+----+-----+-------------------+-----+
only showing top 20 rows



### 4.3: Data Quality Checks

In [17]:
def quality_check(df, df_name):
    '''
    Check that a dataframe is not empty and print the result.
    
    df: dataframe
    df_name: dataframe name (String)
    '''
    
    output = df.count()
    if output == 0:
        print("No records found in {}! Data quality check failed.".format(df_name))
    else:
        print("Number of records for {} = {} Data quality check passed. Hurray!!".format(df_name, output))

# Check for data quality
quality_check(dim_temp_df, "average temperature table")
quality_check(dim_immigration_df, "immigration table")
quality_check(avg_temp_fact_df, "fact table")

Number of records for average temperature table = 8146263 Data quality check passed. Hurray!!
Number of records for immigration table = 3649 Data quality check passed. Hurray!!
Number of records for fact table = 939 Data quality check passed. Hurray!!
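A non-empty table can still contain bad rows. The sketch below shows a few additional checks on the fact table, assuming (Port, Month) should be unique and every row should carry a valid month, a temperature and a positive count. It is written in plain Python over (Port, Month, AverageTemperature, Count) tuples; the helper name is hypothetical. In Spark the same checks could be expressed with, for example, `df.filter(col("Port").isNull()).count()` and `df.groupBy("Port", "Month").count().filter("count > 1").count()`.

```python
def check_fact_rows(rows):
    """Return a list of problems found in (Port, Month, AverageTemperature, Count) rows."""
    problems = []
    seen = set()
    for port, month, temp, count in rows:
        key = (port, month)
        if port is None or month is None or temp is None:
            problems.append(f"null key or temperature in {key}")
        if month is not None and not 1 <= month <= 12:
            problems.append(f"month out of range in {key}")
        if count is None or count <= 0:
            problems.append(f"non-positive count in {key}")
        if key in seen:
            problems.append(f"duplicate key {key}")
        seen.add(key)
    return problems

# Rows copied from the fact table output above pass all checks
print(check_fact_rows([("ABQ", 1, -0.581, 16), ("ABQ", 2, 1.537, 3)]))  # []
```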


### 4.4: Data Dictionary

Temperature dimension dataframe - data dictionary

* `City` = city name
* `Country` = country name
* `Year` = 4-digit year
* `Month` = numeric month
* `Port` = 3-character code of the destination city
* `AverageTemperature` = average monthly temperature

I94 immigration dataframe - data dictionary
* `Year` = 4-digit year
* `Month` = numeric month
* `Port` = 3-character code of the destination city
* `count` = number of arrivals at the I94 port

Fact table - data dictionary
* `Port` = 3-character code of the destination city
* `Month` = numeric month
* `AverageTemperature` = average temperature of the destination city
* `Count` = number of arrivals at the I94 port

## Step 5: Complete Project Write Up

**Clearly state the rationale for the choice of tools and technologies for the project.**
   * Spark was chosen for the project because it processes data in parallel across multiple threads (and nodes) and keeps data in memory, which speeds up processing. It also reads many data formats, such as SAS binary files (via the spark-sas7bdat package), CSV, JSON, and Parquet.
    
**Propose how often the data should be updated and why.**
   * The temperature data should be updated daily and the immigration data monthly, since that matches how often each source is currently refreshed.
    
**Write a description of how you would approach the problem differently under the following scenarios:**
 * **The data was increased by 100x.**
     * Spark can still handle the volume, but the number of nodes in the cluster would need to increase, and the local cluster manager would need to be replaced with YARN.
 
 * **The data populates a dashboard that must be updated on a daily basis by 7am every day.**
     * I would schedule and run the ETL pipeline with Apache Airflow, on a daily schedule that completes before 7am.
     
 * **The database needed to be accessed by 100+ people.**
     * I would load the output tables into Amazon Redshift so that 100+ people can access them, as it performs well, scales with demand, and is reliable.