# Temperature and US Immigration
### Data Engineering Capstone Project

#### Project Summary

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

### Step 1: Scope the Project and Gather Data

#### Scope 

For this project, we aggregate the I94 immigration data by destination city to form the first dimension table. We then aggregate the city temperature data by city to create the second dimension table. Both tables are joined on destination city to form the fact table. We use Spark to process the data.

#### Describe and Gather Data 

The I94 immigration data comes from the US National Tourism and Trade Office website. It is provided in SAS7BDAT format.

**Key Notes:**
- i94yr = 4 digit year
- i94mon = numeric month
- i94cit = 3 digit code of origin city
- i94port = 3 character code of destination USA city
- arrdate = arrival date in the USA
- i94mode = 1 digit travel code
- depdate = departure date from the USA
- i94visa = reason for immigration
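
The `arrdate` and `depdate` values are numeric rather than calendar dates. Assuming they follow the SAS date convention (a count of days since 1960-01-01), which is consistent with the April 2016 sample rows shown in the Immigration Data section, they could be converted along these lines. The helper name `sas_to_date` is illustrative and not part of the original notebook.

```python
from datetime import date, timedelta

# SAS stores dates as the number of days since 1960-01-01 (assumed to apply here).
SAS_EPOCH = date(1960, 1, 1)

def sas_to_date(days):
    """Convert a SAS numeric date to a datetime.date; return None for missing values."""
    if days is None or days != days:  # days != days is True only for NaN
        return None
    return SAS_EPOCH + timedelta(days=int(days))

print(sas_to_date(20545.0))  # 2016-04-01
```

In Spark the same arithmetic could be wrapped in a UDF or expressed with date functions; the plain-Python version is only meant to show the conversion itself.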

The temperature data set comes from [Kaggle](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/). It is provided in CSV format.

**Key Notes:**
- AverageTemperature = average temperature
- City = city name
- Country = country name
- Latitude = latitude
- Longitude = longitude

#### Import libraries and create Spark session

In [1]:
# Import all dependencies
import re
import pandas as pd
import psycopg2
from collections import defaultdict
from datetime import datetime, timedelta
from pyspark.sql.functions import udf

In [2]:
# Create a Spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder\
            .config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11")\
            .enableHiveSupport().getOrCreate()

#### Immigration Data

In [3]:
# Read the immigration data for April 2016
#file_name_immigration = 'data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
file_name_immigration = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
df_immigration = pd.read_sas(file_name_immigration, 'sas7bdat', encoding="ISO-8859-1")

# Show immigration data
df_immigration.head()

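|  | cicid | i94yr | i94mon | i94cit | i94res | i94port | arrdate | i94mode | i94addr | depdate | ... | entdepu | matflag | biryear | dtaddto | gender | insnum | airline | admnum | fltno | visatype |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 6.0 | 2016.0 | 4.0 | 692.0 | 692.0 | XXX | 20573.0 |  |  |  | ... | U |  | 1979.0 | 10282016 |  |  |  | 1897628000.0 |  | B2 |
| 1 | 7.0 | 2016.0 | 4.0 | 254.0 | 276.0 | ATL | 20551.0 | 1.0 | AL |  | ... | Y |  | 1991.0 | D/S | M |  |  | 3736796000.0 | 296.0 | F1 |
| 2 | 15.0 | 2016.0 | 4.0 | 101.0 | 101.0 | WAS | 20545.0 | 1.0 | MI | 20691.0 | ... |  | M | 1961.0 | 09302016 | M |  | OS | 666643200.0 | 93.0 | B2 |
| 3 | 16.0 | 2016.0 | 4.0 | 101.0 | 101.0 | NYC | 20545.0 | 1.0 | MA | 20567.0 | ... |  | M | 1988.0 | 09302016 |  |  | AA | 92468460000.0 | 199.0 | B2 |
| 4 | 17.0 | 2016.0 | 4.0 | 101.0 | 101.0 | NYC | 20545.0 | 1.0 | MA | 20567.0 | ... |  | M | 2012.0 | 09302016 |  |  | AA | 92468460000.0 | 199.0 | B2 |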


#### Temperature data

In [4]:
# Read the global temperature data
#file_name_temperature = 'data2/GlobalLandTemperaturesByCity.csv'
file_name_temperature = '../../data2/GlobalLandTemperaturesByCity.csv'
df_temperature = pd.read_csv(file_name_temperature, sep=',')

# Displays temperature data
df_temperature.head()

|  | dt | AverageTemperature | AverageTemperatureUncertainty | City | Country | Latitude | Longitude |
|---|---|---|---|---|---|---|---|
| 0 | 1743-11-01 | 6.068 | 1.737 | Århus | Denmark | 57.05N | 10.33E |
| 1 | 1743-12-01 |  |  | Århus | Denmark | 57.05N | 10.33E |
| 2 | 1744-01-01 |  |  | Århus | Denmark | 57.05N | 10.33E |
| 3 | 1744-02-01 |  |  | Århus | Denmark | 57.05N | 10.33E |
| 4 | 1744-03-01 |  |  | Århus | Denmark | 57.05N | 10.33E |


### Step 2: Explore and Assess the Data

#### Explore the Data 

<b>I94 immigration data</b> - drop all records whose destination city code (i94port) is not a valid value (for example XXX, 99, or NaN).

<b>Temperature Data</b> - drop all records where AverageTemperature is NaN, remove duplicate locations, and add the i94port code of the location to each entry.

In [5]:
# Extract valid port codes and descriptions using a regular expression
reg_exp_port = re.compile(r'\'(.*)\'.*\'(.*)\'')
port_valid = {}
with open('I94port_valid.txt') as f:
    for line in f:
        match = reg_exp_port.search(line)
        # Skip lines that do not contain two quoted fields (e.g. blank lines)
        if match is None:
            continue
        port_valid[match[1]] = [match[2]]
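
To make the parsing step easier to follow, here is a small self-contained sketch that applies the same pattern to a few in-memory lines. The sample lines and the helper `parse_ports` are assumptions made for illustration; the actual layout of `I94port_valid.txt` may differ.

```python
import re

# Same pattern as in the cell above: two single-quoted fields per line.
reg_exp_port = re.compile(r'\'(.*)\'.*\'(.*)\'')

# Hypothetical sample lines; the real file may be laid out differently.
sample_lines = [
    "   'ATL'\t=\t'ATLANTA, GA            '",
    "   'NYC'\t=\t'NEW YORK, NY           '",
    "this line has no quoted fields",
]

def parse_ports(lines):
    """Map each port code to a one-element list holding its description."""
    ports = {}
    for line in lines:
        match = reg_exp_port.search(line)
        if match is None:
            continue  # ignore lines without two quoted fields
        ports[match[1]] = [match[2]]
    return ports

sample_ports = parse_ports(sample_lines)
print(sorted(sample_ports))  # ['ATL', 'NYC']
```

Note that the captured description keeps any padding inside the quotes, so callers may want to call `.strip()` on it before comparing.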

In [6]:
# Keep only immigration records with valid ports
def clean_immigration_data(file):
    '''
    Read immigration data in SAS format with Spark and keep only
    the rows whose i94port is a valid port code.
    
    Input:
    * file: path to the immigration file in SAS format
    
    Output:
    * df_immigration: the dataframe filtered to valid ports
    '''
    
    # read the file in SAS format
    df_immigration = spark.read.format('com.github.saurfang.sas.spark').load(file)
    
    # keep only the rows whose i94port is in the list of valid ports
    df_immigration = df_immigration.filter(df_immigration.i94port.isin(list(port_valid.keys())))
    
    return df_immigration

In [7]:
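@udf()
def get_key_port(city):
    '''
    Return the code of the first valid port whose description contains the city name.
    
    Input:
    * city: the city name
    
    Output:
    * key: the matching port code, or None if no port matches
    '''
    
    # A missing city name cannot match any port
    if city is None:
        return None
    
    # Compare the city against each port description
    for key in port_valid:
        if city.lower() in port_valid[key][0].lower():
            return key
    
    # No port description contains the city name
    return None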
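
Because `get_key_port` uses a substring test, a city name can match a port whose description merely contains it. The sketch below contrasts that behaviour with a stricter comparison against the text before the first comma. The sample dictionary and both helper names are hypothetical; they only mirror the `{code: [description]}` shape of `port_valid`.

```python
# Hypothetical port descriptions in the same {code: [description]} shape as port_valid.
sample_ports = {
    "ATL": ["ATLANTA, GA"],
    "NYC": ["NEW YORK, NY"],
    "ONT": ["ONTARIO, CA"],
}

def match_substring(city, ports):
    """Substring match, mirroring the logic of get_key_port."""
    if city is None:
        return None
    for key, values in ports.items():
        if city.lower() in values[0].lower():
            return key
    return None

def match_city_part(city, ports):
    """Stricter match: compare against the text before the first comma only."""
    if city is None:
        return None
    wanted = city.strip().lower()
    for key, values in ports.items():
        city_part = values[0].split(",")[0].strip().lower()
        if city_part == wanted:
            return key
    return None

print(match_substring("York", sample_ports))     # NYC
print(match_city_part("York", sample_ports))     # None
print(match_city_part("Atlanta", sample_ports))  # ATL
```

Whether the stricter rule is preferable depends on how the port descriptions are written; it is shown here only as one possible way to reduce false matches.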

In [8]:
# load temperature data with csv format
#df_temperature_cleaned = spark.read.format("csv").option("header", "true").load("data2/GlobalLandTemperaturesByCity.csv")
df_temperature_cleaned = spark.read.format("csv").option("header", "true").load("../../data2/GlobalLandTemperaturesByCity.csv")

# Exclude rows with a missing average temperature (null or the string 'NaN')
df_temperature_cleaned = df_temperature_cleaned.filter(
    df_temperature_cleaned.AverageTemperature.isNotNull()
    & (df_temperature_cleaned.AverageTemperature != 'NaN')
)

# Keep one row per (City, Country) pair
df_temperature_cleaned = df_temperature_cleaned.dropDuplicates(['City', 'Country'])

# Create i94port column 
df_temperature_cleaned = df_temperature_cleaned.withColumn("i94port", get_key_port(df_temperature_cleaned.City))

# Exclude cities that did not match any port (the UDF returns null for them)
df_temperature_cleaned = df_temperature_cleaned.filter(df_temperature_cleaned.i94port.isNotNull())

# Show a part of the data
df_temperature_cleaned.show()

+----------+-------------------+-----------------------------+---------+--------------------+--------+---------+-------+
|        dt| AverageTemperature|AverageTemperatureUncertainty|     City|             Country|Latitude|Longitude|i94port|
+----------+-------------------+-----------------------------+---------+--------------------+--------+---------+-------+
|1852-07-01|             15.488|                        1.395|    Perth|           Australia|  31.35S|  114.97E|    PER|
|1828-01-01|             -1.977|                        2.551|  Seattle|       United States|  47.42N|  121.97W|    SEA|
|1743-11-01|              2.767|                        1.905| Hamilton|              Canada|  42.59N|   80.73W|    HAM|
|1849-01-01|  7.399999999999999|                        2.699|  Ontario|       United States|  34.56N|  116.76W|    ONT|
|1821-11-01|              2.322|                        2.375|  Spokane|       United States|  47.42N|  117.24W|    SPO|
|1843-01-01| 18.874000000000002|

### Step 3: Define the Data Model

#### Data Model Justification

For this project (I94 immigration to the USA), a star schema is used.
It lets us use fact and dimension tables to answer business questions.
Star schemas are common in ETL processes because they keep the critical information available in the fact table in a clean form, which allows flexible queries.
The schema is optimized to answer questions like:
Which airports receive the most immigration to the US according to the I94 data?
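
As an illustration of the kind of question a fact table is meant to answer, the sketch below counts arrivals per destination port. It uses Python's built-in `sqlite3` instead of Spark so that it runs on its own, and the table contents are invented for the example.

```python
import sqlite3

# In-memory stand-in for the fact table; the rows are invented for illustration.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE fact (i94port TEXT, year INTEGER, month INTEGER, temperature REAL)")
conn.executemany(
    "INSERT INTO fact VALUES (?, ?, ?, ?)",
    [
        ("NYC", 2016, 4, 10.5),
        ("NYC", 2016, 4, 10.5),
        ("NYC", 2016, 4, 10.5),
        ("ATL", 2016, 4, 16.2),
        ("SEA", 2016, 4, 9.1),
        ("SEA", 2016, 4, 9.1),
    ],
)

# Count arrivals per destination port, busiest first.
top_ports = conn.execute(
    """
    SELECT i94port, COUNT(*) AS arrivals
    FROM fact
    GROUP BY i94port
    ORDER BY arrivals DESC, i94port
    """
).fetchall()

print(top_ports)  # [('NYC', 3), ('SEA', 2), ('ATL', 1)]
conn.close()
```

The same `GROUP BY` query should carry over to Spark SQL once the fact table is registered as a temporary view.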

#### 3.1 Conceptual Data Model

**Fact Table** - This will contain information from the I94 immigration data joined with the city temperature data on i94port

Columns:
- i94yr = 4 digit year
- i94mon = numeric month
- i94cit = 3 digit code of origin city
- i94port = 3 character code of destination city
- arrdate = arrival date
- i94mode = 1 digit travel code
- depdate = departure date
- i94visa = reason for immigration
- AverageTemperature = average temperature of destination city

**1st Dimension Table** - This will contain events from the I94 immigration data.

Columns:
- i94yr = 4 digit year
- i94mon = numeric month
- i94cit = 3 digit code of origin city
- i94port = 3 character code of destination city
- arrdate = arrival date
- i94mode = 1 digit travel code
- depdate = departure date
- i94visa = reason for immigration

**2nd Dimension Table** - This will contain city temperature data.

Columns:
- i94port = 3 character code of destination city (mapped from cleaned up immigration data)
- AverageTemperature = average temperature
- City = city name
- Country = country name
- Latitude = latitude
- Longitude = longitude

#### 3.2 Mapping Out Data Pipelines

Pipeline Steps:
1. Clean I94 data as described in step 2 to create Spark dataframe df_immigration for each month.
2. Clean temperature data as described in step 2 to create Spark dataframe df_temperature_cleaned (this was already performed).
3. Create immigration dimension table by selecting relevant columns from df_immigration and write to parquet file partitioned by i94port.
4. Create temperature dimension table by selecting relevant columns from df_temperature_cleaned and write to parquet file partitioned by i94port.
5. Create fact table by joining immigration and temperature dimension tables on i94port and write to parquet file partitioned by i94port.
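
Pipeline step 1 mentions one dataframe per month, while the notebook only loads April. A possible way to enumerate the monthly inputs is sketched below. It assumes that every month follows the naming pattern of the April file (`i94_apr16_sub.sas7bdat`), which is not confirmed by the source, and the helper `monthly_immigration_files` is hypothetical.

```python
# Month abbreviations as they would appear in the file names (assumed naming scheme).
MONTHS = ["jan", "feb", "mar", "apr", "may", "jun",
          "jul", "aug", "sep", "oct", "nov", "dec"]

def monthly_immigration_files(base_dir, year_suffix="16"):
    """Build one SAS file path per month, following the pattern of the April file."""
    return [f"{base_dir}/i94_{month}{year_suffix}_sub.sas7bdat" for month in MONTHS]

files = monthly_immigration_files("../../data/18-83510-I94-Data-2016")
print(files[3])  # ../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat

# Each path could then be passed to clean_immigration_data(...) and the
# result appended to the parquet output, as done for April in Step 4.
```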

### Step 4: Run Pipelines to Model the Data 

#### 4.1 Create the data model


##### Step 1:

In [9]:
# read the immigration data for April 2016 in SAS format
#filename_immigration = 'data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
filename_immigration = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'

# filter data from valid ports
df_immigration_cleaned = clean_immigration_data(filename_immigration)

# select the columns: i94yr, i94mon, i94cit, i94port, arrdate, i94mode, depdate, i94visa
df_immigration = df_immigration_cleaned.select(["i94yr", "i94mon", "i94cit", "i94port", "arrdate", "i94mode", "depdate", "i94visa"])

# write results (partitioned by i94port) in parquet format
# complete path: /results/immigration.parquet
df_immigration.write.mode("append").partitionBy("i94port").parquet("results/immigration.parquet")

##### Step 2:

In [10]:
# select the columns: AverageTemperature, City, Country, Latitude, Longitude, i94port
df_temperature = df_temperature_cleaned.select(["AverageTemperature", "City", "Country", "Latitude", "Longitude", "i94port"])

# write temperature_table results (partitioned by i94port) in parquet format
# complete path: /results/temperature.parquet
df_temperature.write.mode("append").partitionBy("i94port").parquet("results/temperature.parquet")

##### Step 3:

In [11]:
# create temporary tables to work with sparksql
df_immigration.createOrReplaceTempView("df_immigration")
df_temperature_cleaned.createOrReplaceTempView("df_temperature")

# create the fact table with Spark SQL
# select the columns: year, month, city, i94port, arrival_date,
#                     departure_date, reason, temperature, latitude, longitude
fact_table = spark.sql('''
    SELECT  dfi.i94yr as year
            ,dfi.i94mon as month
            ,dfi.i94cit as city
            ,dfi.i94port as i94port
            ,dfi.arrdate as arrival_date
            ,dfi.depdate as departure_date
            ,dfi.i94visa as reason
            ,dft.AverageTemperature as temperature
            ,dft.Latitude as latitude
            ,dft.Longitude as longitude
    FROM    df_immigration dfi
            JOIN df_temperature dft
                ON dfi.i94port = dft.i94port
''')

# write fact_table results (partitioned by i94port) in parquet format
# complete path: /results/fact.parquet
fact_table.write.mode("append").partitionBy("i94port").parquet("results/fact.parquet")
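
The `JOIN` in the query above is an inner join, so immigration rows whose `i94port` has no matching temperature row do not appear in the fact table. The sketch below shows one way to quantify that effect by comparing row counts before and after the join. It uses `sqlite3` with invented rows purely for illustration; the port code `XYZ` is made up.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE immigration (i94port TEXT, arrdate REAL)")
conn.execute("CREATE TABLE temperature (i94port TEXT, AverageTemperature REAL)")

# Invented rows: 'XYZ' has no temperature entry, so the inner join drops it.
conn.executemany("INSERT INTO immigration VALUES (?, ?)",
                 [("NYC", 20545.0), ("ATL", 20551.0), ("XYZ", 20573.0)])
conn.executemany("INSERT INTO temperature VALUES (?, ?)",
                 [("NYC", 10.5), ("ATL", 16.2)])

total_rows = conn.execute("SELECT COUNT(*) FROM immigration").fetchone()[0]
joined_rows = conn.execute("""
    SELECT COUNT(*)
    FROM immigration dfi
    JOIN temperature dft ON dfi.i94port = dft.i94port
""").fetchone()[0]

print(total_rows, joined_rows)  # 3 2
conn.close()
```

In Spark, comparing `df_immigration.count()` with `fact_table.count()` would give the equivalent figure.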

#### 4.2 Data Quality Checks

Run Quality Checks

In [12]:
def quality_check(df, description):
    '''
    Basic data quality check: count the rows in the dataframe
    and report whether it contains any records.
    
    Input: 
    * df: the dataframe
    * description: a description of the dataframe
    '''
    
    # check if the dataframe has data or not
    result = df.count()
    if result == 0:
        print(f"Data quality check failed for {description} with zero records")
    else:
        print(f"Data quality check passed for {description} with {result} records")

In [13]:
# verifies the data quality for the tables of: immigration and temperature
quality_check(df_immigration, "immigration table")
quality_check(df_temperature, "temperature table")

Data quality check passed for immigration table with 3088544 records
Data quality check passed for temperature table with 207 records
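
The check above only prints its result. If the pipeline were run unattended, a variant that raises on failure might be easier for a scheduler to detect. The sketch below is one such variant; `quality_check_strict` and the `FakeFrame` stand-in are hypothetical names, and `FakeFrame` exists only so the example runs without Spark.

```python
def quality_check_strict(df, description, min_rows=1):
    """Raise ValueError if df has fewer than min_rows rows; otherwise return the count."""
    result = df.count()
    if result < min_rows:
        raise ValueError(
            f"Data quality check failed for {description}: "
            f"{result} records, expected at least {min_rows}"
        )
    return result

class FakeFrame:
    """Minimal stand-in for a Spark dataframe, used only to demonstrate the check."""
    def __init__(self, n_rows):
        self.n_rows = n_rows

    def count(self):
        return self.n_rows

print(quality_check_strict(FakeFrame(207), "temperature table"))  # 207
```

Any object with a `count()` method works here, so the same function could be called with `df_immigration`, `df_temperature`, or `fact_table`.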


#### 4.3 Data dictionary

**Fact Table** - This will contain information from the I94 immigration data joined with the city temperature data on i94port

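Columns:
- year = 4 digit year (from i94yr)
- month = numeric month (from i94mon)
- city = 3 digit code of origin city (from i94cit)
- i94port = 3 character code of destination city
- arrival_date = arrival date (from arrdate)
- departure_date = departure date (from depdate)
- reason = reason for immigration (from i94visa)
- temperature = average temperature of destination city (from AverageTemperature)
- latitude = latitude of destination city (from Latitude)
- longitude = longitude of destination city (from Longitude)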

**1st Dimension Table** - This will contain events from the I94 immigration data.

Columns:
- i94yr = 4 digit year
- i94mon = numeric month
- i94cit = 3 digit code of origin city
- i94port = 3 character code of destination city
- arrdate = arrival date
- i94mode = 1 digit travel code
- depdate = departure date
- i94visa = reason for immigration


**2nd Dimension Table** - This will contain city temperature data.

Columns:
- i94port = 3 character code of destination city (mapped from cleaned up immigration data)
- AverageTemperature = average temperature
- City = city name
- Country = country name
- Latitude = latitude
- Longitude = longitude

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project. <br>
Spark is used because it can handle the different file formats involved (SAS, CSV, etc.).
* Propose how often the data should be updated and why. <br>
Since the raw files are organized by month, the data should continue to be pulled monthly.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x. <br>
 Amazon Redshift could be used. Redshift is an analytical database optimized for large workloads.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day. <br>
 Apache Airflow could be used. The DAG would be scheduled to run every day, and an email would be sent if a run fails.
 * The database needed to be accessed by 100+ people. <br>
 Amazon Redshift is useful for this task as it has good read performance.