# ETL Pipeline - Temperature and US Immigration Data
### Data Engineering Capstone Project

#### Project Summary
This project builds an ETL pipeline that uses I94 immigration and temperature data to create a database optimized for analyzing immigration events. The resulting fact table is used to examine whether the temperature of a city influences immigrants' choice of destination.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [2]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.1.2.tar.gz (212.4 MB)
Collecting py4j==0.10.9
  Downloading py4j-0.10.9-py2.py3-none-any.whl (198 kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.1.2-py2.py3-none-any.whl size=212880768 sha256=5b22cd96ed3cac794c6414a176934eead8c7d4d678bf78ee344adff3b4794a40
  Stored in directory: /root/.cache/pip/wheels/a5/0a/c1/9561f6fecb759579a7d863dcd846daaa95f598744e71b02c77
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.1.2


In [2]:
import re
from collections import defaultdict
from datetime import datetime, timedelta

import pandas as pd
# Explicit imports avoid shadowing Python built-ins (sum, min, max, ...) with Spark functions
from pyspark.sql.functions import col, udf
import pyspark.sql.functions as F
import pyspark.sql.types as t

In [3]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.\
enableHiveSupport().getOrCreate()
spark

### Step 1: Scope the Project and Gather Data

#### Scope 
For this project we created two dimension tables and a fact table. One dimension table holds I94 immigration data organized by destination city; the other holds temperature data organized by city. Joining the two tables on the city code (i94port) produces the fact table. The last step creates a database for querying immigration events and checking whether temperature influences immigrants' choice of destination.

#### Describe and Gather Data
- I94 Immigration Data: provided in SAS7BDAT format, comes from the [US National Tourism and Trade Office website](https://travel.trade.gov/research/reports/i94/historical/2016.html).
- Daily Temperature of Major Cities: this dataset comes from [Kaggle](https://www.kaggle.com/sudalairajkumar/daily-temperature-of-major-cities) in CSV format.

*Immigration data - Key Notes:*

- i94yr = 4 digit year
- i94mon = numeric month
- i94cit = 3 digit code of origin city
- i94port = 3 character code of destination USA city
- arrdate = arrival date in the USA
- i94mode = 1 digit travel mode code (1 = Air, 2 = Sea, 3 = Land, 9 = Not reported)
- depdate = departure date from the USA
- i94visa = reason for immigration (1 = Business, 2 = Pleasure, 3 = Student)

*Temperature data - Key Notes:*

- Region = continent name
- Country = country name
- State = state name
- City = city name
- Month = number of the month
- Day = number of the day
- Year = year
- AvgTemperature = average temperature

#### Immigration Data

In [None]:
df_immigration = pd.read_sas('/content/drive/MyDrive/Data-Engineering-Nanodegree/Capstone_Project/data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat', 'sas7bdat', encoding='ISO-8859-1')

In [None]:
df_immigration.head()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,dtadfile,visapost,occup,entdepa,entdepd,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,6.0,2016.0,4.0,692.0,692.0,XXX,20573.0,,,,37.0,2.0,1.0,,,,T,,U,,1979.0,10282016,,,,1897628000.0,,B2
1,7.0,2016.0,4.0,254.0,276.0,ATL,20551.0,1.0,AL,,25.0,3.0,1.0,20130811.0,SEO,,G,,Y,,1991.0,D/S,M,,,3736796000.0,296.0,F1
2,15.0,2016.0,4.0,101.0,101.0,WAS,20545.0,1.0,MI,20691.0,55.0,2.0,1.0,20160401.0,,,T,O,,M,1961.0,09302016,M,,OS,666643200.0,93.0,B2
3,16.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,28.0,2.0,1.0,20160401.0,,,O,O,,M,1988.0,09302016,,,AA,92468460000.0,199.0,B2
4,17.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,4.0,2.0,1.0,20160401.0,,,O,O,,M,2012.0,09302016,,,AA,92468460000.0,199.0,B2


#### Temperature Data

In [5]:
df_temperature = pd.read_csv('/content/drive/MyDrive/Datasets/city_temperature.csv')



In [12]:
df_temperature.head()

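|   | Region | Country | State | City    | Month | Day | Year | AvgTemperature |
|---|--------|---------|-------|---------|-------|-----|------|----------------|
| 0 | Africa | Algeria |       | Algiers | 1     | 1   | 1995 | 64.2           |
| 1 | Africa | Algeria |       | Algiers | 1     | 2   | 1995 | 49.4           |
| 2 | Africa | Algeria |       | Algiers | 1     | 3   | 1995 | 48.8           |
| 3 | Africa | Algeria |       | Algiers | 1     | 4   | 1995 | 46.4           |
| 4 | Africa | Algeria |       | Algiers | 1     | 5   | 1995 | 47.9           |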


### Step 2: Explore and Assess the Data
#### Explore the Data 
*Immigration data* - keep only rows with a valid i94port, drop rows without a departure date, rename columns to readable names, extract the day of the month from the arrival date, and convert the SAS numeric dates to `YYYY-MM-DD` format.

*Temperature Data* - drop rows where AvgTemperature is null, keep only US cities, add the matching i94port code to each row, and drop rows where i94port is null.

In [4]:
with open('I94_SAS_Labels_Descriptions.SAS') as f:
    lines = f.readlines()

# lines[303:962] holds the I94PORT entries, each shaped like 'CODE' = 'CITY, STATE'
i94ports = {}
for line in lines[303:962]:
    parts = line.split("'")
    # parts[1] is the port code; parts[3] is "CITY, STATE", of which only the city is kept
    i94ports[parts[1]] = [parts[3].split(',')[0]]
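
The lookup above depends on every label line containing two quoted fields. As a minimal sketch, the same extraction can be written with a regular expression that skips lines that do not match instead of raising an `IndexError`. The helper `parse_port_line` and the sample lines are hypothetical; they assume the `'CODE' = 'CITY, STATE'` layout implied by the `split("'")` indexing.

```python
import re

# Hypothetical pattern for lines shaped like:  'ATL' = 'ATLANTA, GA   '
PORT_LINE = re.compile(r"'([^']+)'\s*=\s*'([^']*)'")

def parse_port_line(line):
    """Return (code, city) for a label line, or None if it does not match."""
    match = PORT_LINE.search(line)
    if match is None:
        return None
    code = match.group(1).strip()
    city = match.group(2).split(',')[0].strip()  # drop the state suffix
    return code, city

# Illustrative sample lines, not copied from the real labels file
sample_lines = [
    "   'ATL'\t=\t'ATLANTA, GA            '\n",
    "   'NYC'\t=\t'NEW YORK, NY           '\n",
    "a line without quoted fields\n",
]

ports = {}
for line in sample_lines:
    parsed = parse_port_line(line)
    if parsed is not None:
        code, city = parsed
        ports[code] = [city]  # same list-valued shape as i94ports

print(ports)
```

Unlike the cell above, this sketch also strips surrounding whitespace from the city name, which may or may not be wanted when matching against the temperature data.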

In [5]:
def cleaner_immigration_data(spark, path, i94port):
    '''    
    Input:
        spark = Spark Session
        path = I94 immigration file path
        i94port = dict mapping i94 port codes to city names
    
    Output:
        spark_df = Spark DataFrame
    '''    
    spark_df = spark.read.parquet(path)
    spark_df = spark_df.filter(spark_df.i94port.isin(list(i94port.keys())))

    spark_df = spark_df.withColumn('year', col('i94yr').cast('integer')).drop('i94yr') \
                       .withColumn('month', col('i94mon').cast('integer')).drop('i94mon') \
                       .withColumn('visa', col('i94visa').cast('integer')).drop('i94visa') \
                       .withColumn('mode', col('i94mode').cast('integer')).drop('i94mode') \
                       .withColumn('origin_country', col('i94res').cast('integer')).drop('i94res') \
                       .withColumn('origin_city', col('i94cit').cast('integer')).drop('i94cit') \
                       .withColumn('age', col('i94bir').cast('integer')).drop('i94bir') \
                       .withColumn('arrival_date', col('arrdate').cast('integer')).drop('arrdate') \
                       .withColumn('departure_date', col('depdate').cast('integer')).drop('depdate')
    
    spark_df = spark_df.filter(spark_df.departure_date.isNotNull())

    def extract_day(days):
        # SAS numeric dates count days since 1960-01-01
        date_format = datetime.strptime('1960-01-01', "%Y-%m-%d")+timedelta(days)
        return date_format.day

    # extract_day returns an int, so declare an integer return type
    day_udf = udf(extract_day, t.IntegerType())

    spark_df = spark_df.withColumn('arrival_day', day_udf('arrival_date'))

    def convert_date(days):
        date_format = datetime.strptime('1960-01-01', "%Y-%m-%d")+timedelta(days)
        return date_format.strftime('%Y-%m-%d')

    date_udf = udf(convert_date, t.StringType())

    spark_df = spark_df.withColumn('arrival_date', date_udf('arrival_date')) \
                        .withColumn('departure_date', date_udf('departure_date'))
    
    return spark_df.select(col('i94port'), col('year'),
                           col('month'), col('arrival_day'), col('origin_country'),
                           col('origin_city'), col('arrival_date'),
                           col('departure_date'), col('visa'), col('mode'),
                           col('age'), col('gender'))
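
The two UDFs above both rely on the SAS convention that a numeric date is a count of days since 1960-01-01. A minimal sketch of that conversion outside Spark, using a hypothetical helper `sas_to_date` and the `arrdate` value 20545.0 from the sample rows shown earlier:

```python
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)  # SAS numeric dates count days from this date

def sas_to_date(days):
    """Convert a SAS day count to a date; None stays None."""
    if days is None:
        return None
    return SAS_EPOCH + timedelta(days=int(days))

arrival = sas_to_date(20545.0)  # arrdate taken from the sample rows
print(arrival.isoformat(), arrival.day)  # 2016-04-01 1
```

The result agrees with the `dtadfile` value (20160401) recorded for the same rows. The `None` guard is an addition of this sketch; the UDFs above would raise on a null input.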

In [6]:
df_imm = cleaner_immigration_data(spark, '/content/drive/MyDrive/Data-Engineering-Nanodegree/Capstone_Project/sas_data', i94ports)
df_imm.show()

+-------+----+-----+-----------+--------------+-----------+------------+--------------+----+----+---+------+
|i94port|year|month|arrival_day|origin_country|origin_city|arrival_date|departure_date|visa|mode|age|gender|
+-------+----+-----+-----------+--------------+-----------+------------+--------------+----+----+---+------+
|    LOS|2016|    4|         30|           438|        245|  2016-04-30|    2016-05-08|   1|   1| 40|     F|
|    LOS|2016|    4|         30|           438|        245|  2016-04-30|    2016-05-17|   1|   1| 32|     F|
|    LOS|2016|    4|         30|           438|        245|  2016-04-30|    2016-05-08|   1|   1| 29|     M|
|    LOS|2016|    4|         30|           438|        245|  2016-04-30|    2016-05-14|   1|   1| 29|     F|
|    LOS|2016|    4|         30|           438|        245|  2016-04-30|    2016-05-14|   1|   1| 28|     M|
|    HHW|2016|    4|         30|           464|        245|  2016-04-30|    2016-05-05|   2|   1| 57|     M|
|    HHW|2016|    4

In [12]:
@udf()
def get_i94_port(city):
    '''
    Input: City name
    Output: City i94port code
    
    '''
    for key in i94ports:
        if city.lower() == i94ports[key][0].lower():
            return key


def cleaner_temperature_data(spark, path):
    '''    
    Input:
        spark = Spark Session
        path = temperature file path
    
    Output:
        spark_df = Spark DataFrame
    '''    
    spark_df = spark.read.format('csv').option('header', 'true').load(path)
    spark_df = spark_df.filter(spark_df.AvgTemperature.isNotNull())
    spark_df = spark_df.filter(spark_df.Country == 'US')
    spark_df = spark_df.withColumn('i94port', get_i94_port(spark_df.City)) \
                       .withColumnRenamed('AvgTemperature', 'temperature') \
                       .withColumnRenamed('City', 'city') \
                       .withColumnRenamed('Country', 'country') \
                       .withColumnRenamed('Year', 'year') \
                       .withColumnRenamed('Month', 'month') \
                       .withColumnRenamed('Day', 'day')
    
    spark_df = spark_df.filter(spark_df.i94port.isNotNull())

    return spark_df.select(col('i94port'), col('temperature'), col('city'),
                           col('country'), col('year'),
                           col('month'), col('day'))
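
`get_i94_port` scans the whole `i94ports` dictionary for every row. One possible alternative, sketched here under assumptions, inverts the mapping once so each lookup is a single dictionary access. The names `i94ports_sample`, `city_to_port` and `lookup_port` are hypothetical, and the three entries are illustrative only.

```python
# Illustrative subset with the same shape as i94ports (code -> [city name])
i94ports_sample = {
    'ATL': ['ATLANTA'],
    'NYC': ['NEW YORK'],
    'LOS': ['LOS ANGELES'],
}

# Invert once: lower-cased city name -> port code.
# setdefault keeps the first code seen when two ports share a city name.
city_to_port = {}
for code, (city,) in i94ports_sample.items():
    city_to_port.setdefault(city.strip().lower(), code)

def lookup_port(city):
    """Return the port code for a city name, or None if unknown or missing."""
    if city is None:
        return None
    return city_to_port.get(city.strip().lower())

print(lookup_port('Atlanta'), lookup_port('Algiers'), lookup_port(None))
```

The function could be wrapped in a UDF in the same way as `get_i94_port`. Note that this sketch strips whitespace and tolerates a null city, which the original loop does not.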

In [13]:
df_temp = cleaner_temperature_data(spark, '/content/drive/MyDrive/Datasets/city_temperature.csv')
df_temp.show()

+-------+-----------+----------+-------+----+-----+---+
|i94port|temperature|      city|country|year|month|day|
+-------+-----------+----------+-------+----+-----+---+
|    BHX|       50.7|Birmingham|     US|1995|    1|  1|
|    BHX|       37.2|Birmingham|     US|1995|    1|  2|
|    BHX|       33.2|Birmingham|     US|1995|    1|  3|
|    BHX|       33.3|Birmingham|     US|1995|    1|  4|
|    BHX|       26.4|Birmingham|     US|1995|    1|  5|
|    BHX|       41.5|Birmingham|     US|1995|    1|  6|
|    BHX|       45.0|Birmingham|     US|1995|    1|  7|
|    BHX|       36.2|Birmingham|     US|1995|    1|  8|
|    BHX|       46.9|Birmingham|     US|1995|    1|  9|
|    BHX|       46.4|Birmingham|     US|1995|    1| 10|
|    BHX|       58.8|Birmingham|     US|1995|    1| 11|
|    BHX|       64.4|Birmingham|     US|1995|    1| 12|
|    BHX|       64.4|Birmingham|     US|1995|    1| 13|
|    BHX|       59.7|Birmingham|     US|1995|    1| 14|
|    BHX|       48.1|Birmingham|     US|1995|   

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model

The purpose of this database is to combine immigration and temperature events using a star schema, in which the data is modeled as two dimension tables linked to a fact table. The dimension tables contain the characteristics of an event. The fact table stores the facts that occurred and the key (i94port) for the corresponding characteristics in the dimension tables.

Fact Table - I94 immigration data joined with the city temperature data on i94port:
- i94port = 3 character code of destination USA city
- year = 4 digit year (i94yr)
- month = numeric month (i94mon)
- arrival_day = arrival day in the USA
- departure_date = departure date from the USA (depdate)
- visa = visa type (reason for immigration)
- mode = transport mode
- city = city name
- temperature = average temperature of destination city

Dimension Table - I94 immigration data Events:
- i94port = 3 character code of destination USA city
- year = 4 digit year (i94yr)
- month = numeric month (i94mon)
- arrival_day = arrival day in the USA
- origin_country = 3 digit code of origin country (i94res)
- origin_city = 3 digit code of origin city (i94cit)
- arrival_date = arrival date in the USA (arrdate)
- departure_date = departure date from the USA (depdate)
- mode = transport mode (i94mode)
- visa = reason for immigration (i94visa)
- age = immigrant age (i94bir)
- gender = immigrant gender (gender)

Dimension Table - Temperature data:
- i94port = 3 character code of destination city
- temperature = average temperature
- city = city name
- country = country name
- year = year
- month = month
- day = day

#### 3.2 Mapping Out Data Pipelines
Pipeline Steps:

- Clean immigration data to create Spark dataframe **df_imm**.
- Clean temperature data to create Spark dataframe **df_temp**.
- Create immigration dimension table by selecting columns from **df_imm** and write to parquet file partitioned by i94port.
- Create temperature dimension table by selecting columns from **df_temp** and write to parquet file partitioned by i94port.
- Create fact table by joining immigration and temperature dimension tables on i94port, keeping only rows where the arrival date (year, month, day) equals the temperature date, and write to parquet file partitioned by i94port.
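
The join in the last step can be sketched in plain Python to make the matching rule explicit: an arrival is kept only when a temperature reading exists for the same port, year, month and day. The rows below are hypothetical miniature data, and the sketch assumes at most one reading per port and day.

```python
# Hypothetical miniature rows; the values are illustrative only
immigration_rows = [
    {'i94port': 'NYC', 'year': 2016, 'month': 4, 'arrival_day': 1, 'visa': 2},
    {'i94port': 'ATL', 'year': 2016, 'month': 4, 'arrival_day': 2, 'visa': 3},
]
temperature_rows = [
    {'i94port': 'NYC', 'year': 2016, 'month': 4, 'day': 1, 'temperature': 50.0},
    {'i94port': 'NYC', 'year': 2016, 'month': 4, 'day': 2, 'temperature': 45.0},
]

# Index temperature readings by (port, year, month, day)
temperature_by_key = {
    (r['i94port'], r['year'], r['month'], r['day']): r['temperature']
    for r in temperature_rows
}

# Inner join: keep an arrival only when a reading exists for that port and day
fact_rows = []
for row in immigration_rows:
    key = (row['i94port'], row['year'], row['month'], row['arrival_day'])
    if key in temperature_by_key:
        fact_rows.append({**row, 'temperature': temperature_by_key[key]})

print(fact_rows)
```

Here the ATL arrival is dropped because no temperature row matches it, which is the same effect an inner join has on ports or dates missing from the temperature data.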

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [9]:
class DataPipeline:
    '''
    Pipeline to extract data from Spark dataframe and write to the database
    Inputs: 
        spark = Spark Session
        spark_df = spark dataframe from which the data is extracted
        select_columns = name of columns that should be selected from spark_df
        write_path = path where the data will be written
        fact_table = fact table for quality check
    '''   
    def __init__(self, spark):
        self.spark = spark
    
    def select(self, spark_df, select_columns):
        self.spark_df = spark_df
        self.columns = select_columns

        self.table = self.spark_df.select(self.columns)
        
        return self.table
        
    def write(self, write_path):
        self.write_path = write_path
        self.table.write.mode('append').partitionBy('i94port').parquet(f'{self.write_path}.parquet')

    def quality_check(self, fact_table):
        if self.spark_df.count() == 0:
            print('Warning! The dataframe has zero records.')
        else:
            print('The dataframe has records.')
        
        # Use the session stored on the instance rather than the notebook-level global
        check_table = self.spark.read.parquet(f'{self.write_path}.parquet')

        check_integrity = fact_table.select(col('i94port')).distinct() \
                                       .join(check_table, fact_table['i94port'] == check_table['i94port'], 'left_anti') \
                                       .count() == 0
        if check_integrity:
            print('The model has no integrity restrictions.')
        else:
            print('Warning! The model has integrity restrictions.')

##### Step 1: Immigration dimension table

In [10]:
immigration_columns = ['i94port', 'year', 'month', 'arrival_day', 'origin_country',
                       'origin_city', 'arrival_date', 'departure_date',
                       'visa', 'mode', 'age', 'gender']
immigration_table = DataPipeline(spark)
immigration_table.select(df_imm, immigration_columns)
immigration_table.write('immigration')

##### Step 2: Temperature dimension table

In [14]:
temperature_columns = ['i94port', 'temperature', 'city', 'country', 'year', 'month', 'day']
temperature_table = DataPipeline(spark)
temperature_table.select(df_temp, temperature_columns)
temperature_table.write('temperature')

##### Step 3: Fact table

In [15]:
df_imm.createOrReplaceTempView('imm_view')
df_temp.createOrReplaceTempView('temp_view')

In [18]:
fact = spark.sql('''
SELECT imm_view.i94port as i94port,
       imm_view.year as year,
       imm_view.month as month,
       imm_view.arrival_day as arrival_day,
       imm_view.departure_date as departure_date,
       imm_view.visa as visa,
       imm_view.mode as mode,
       temp_view.city as city,
       temp_view.temperature as temperature
FROM imm_view
JOIN temp_view ON (imm_view.i94port = temp_view.i94port)
WHERE imm_view.year = temp_view.year AND imm_view.month = temp_view.month AND imm_view.arrival_day = temp_view.day
''')

In [19]:
fact.write.mode('append').partitionBy('i94port').parquet('fact.parquet')

#### 4.2 Data Quality Checks

Run Quality Checks

In [20]:
immigration_table.quality_check(fact)

The dataframe has records.
The model has no integrity restrictions.


In [21]:
temperature_table.quality_check(fact)

The dataframe has records.
The model has no integrity restrictions.
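
The integrity check in `quality_check` is a left anti join: it looks for i94port values that appear in the fact table but not in the written dimension table, and passes when there are none. A minimal sketch of the same idea with Python sets, using a hypothetical helper `missing_ports` and illustrative port lists:

```python
def missing_ports(fact_ports, dimension_ports):
    """Ports referenced by the fact table but absent from the dimension table."""
    return set(fact_ports) - set(dimension_ports)

fact_ports = ['NYC', 'ATL', 'NYC']       # illustrative values
dimension_ports = ['NYC', 'ATL', 'LOS']  # illustrative values

orphans = missing_ports(fact_ports, dimension_ports)
print('integrity ok' if not orphans else f'missing ports: {sorted(orphans)}')
```

A dimension port with no matching fact row (LOS here) does not fail the check; only fact keys without a dimension row do.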


#### 4.3 Data dictionary 
* *Fact Table - I94 immigration data joined with the city temperature data on i94port:*

| **field**      | **type** | **description**                                  |
|----------------|----------|--------------------------------------------------|
| i94port        | string   | i94 code (city code)                             |
| year           | int      | year                                             |
| month          | int      | month                                            |
| arrival_day    | int      | day of the month of arrival in the USA           |
| departure_date | date     | departure date from the USA                      |
| visa           | int      | visa code (1=Business, 2=Pleasure, 3=Student)    |
| mode           | int      | mode code (1=Air, 2=Sea, 3=Land, 9=Not reported) |
| city           | string   | destination city                                 |
| temperature    | numeric  | average temperature of destination city          |


* *Dimension Table 1 - I94 immigration data Events:*

| **field**      | **type** | **description**                                  |
|----------------|----------|--------------------------------------------------|
| i94port        | string   | i94 code (city code)                             |
| year           | int      | year                                             |
| month          | int      | month                                            |
| arrival_day    | int      | day of the month of arrival in the USA           |
| origin_country | int      | country code                                     |
| origin_city    | int      | city code                                        |
| arrival_date   | date     | arrival date in the USA                          |
| departure_date | date     | departure date from the USA                      |
| mode           | int      | mode code (1=Air, 2=Sea, 3=Land, 9=Not reported) |
| visa           | int      | visa code (1=Business, 2=Pleasure, 3=Student)    |
| age            | int      | immigrant age                                    |
| gender         | string   | immigrant gender                                 |

* *Dimension Table 2 - Temperature data:*

| **field**           | **type** | **description**                         |
|---------------------|----------|-----------------------------------------|
| i94port             | string   | i94 code (city code)                    |
| temperature  | numeric  | average temperature of destination city |
| city                | string   | destination city                               |
| country             | string   | destination country                           |
| year            | int   | year                    |
| month           | int   | number of the month                   |
| day           | int   | number of the day                   |

### Step 5: Complete Project Write Up
**Choice of tools**

Apache Spark was used in the project because it handles large amounts of data and many file formats. In addition, its spark.sql library offers many tools for transforming data, such as performing joins and creating tables.

**Data updates**

Because the data is monthly, the pipeline should ideally be run once a month.

**Adapting the project to different scenarios**

*1. Increase in data volume by 100x*

In this scenario Spark remains the tool to use. To run this pipeline on a dataset 100x larger, a real Spark cluster must be used so that the computation is distributed across several nodes.

*2. The data populates a dashboard that must be updated on a daily basis by 7am every day*

For this case the ideal tool is Apache Airflow. With it you can reliably schedule and run ETL pipelines and report any problems along the way.

*3. The database needs to be accessed by 100+ people*

One option would be to put the data into S3 and create a pipeline with Airflow or AWS Step Functions to move the data from S3 to a scalable data warehouse on Amazon Redshift. If the database only serves queries and receives no inserts or updates, the data can be periodically copied to a NoSQL database such as Cassandra.