# Immigrant and Temperature Facts Exploration - An ETL Pipeline
### Data Engineering Capstone Project

#### Project Summary
The main target of the project is to build a data pipeline around the I94 immigration data and the city temperature data. The result is a database, populated by this ETL pipeline, that is optimized for running queries and other analytical operations to gather facts and insights from the data. Using this database we can analyze the temperature behaviour of the cities that immigrants travel to.
The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
import pandas as pd
import re
from collections import defaultdict
from datetime import datetime, timedelta
from pyspark.sql.functions import udf,col
import pyspark.sql.functions as F
from pyspark.sql.types import StringType

### Step 1: Scope the Project and Gather Data

#### **Scope**

<font size="3">
    For this Capstone project, I am using the I94 immigration data, which contains details about each arrival, e.g. when the immigrant arrived, where they came from, and which US city (port of entry) they arrived at. I am combining it with the city temperature dataset from Kaggle. The data can be aggregated by <strong><i>destination city</i></strong> in the USA, so that we can analyze the temperature behaviour of each destination city alongside the immigration records into that city. To deal with this large amount of data, we use Spark as the processing tool.
</font>

#### Describe and Gather Data
<br/>
<div>
<strong> Dataset Name : Immigration Data</strong>
<br>
<font size="3">
    This data comes from the <a href="https://travel.trade.gov/research/reports/i94/historical/2016.html">US National Tourism and Trade Office</a>. It contains details of immigrants arriving in the USA. The fields used in this project are described below:<br />
        <em>Dataset Format</em> : "SAS7BDAT" (A binary format)

<br />
    <table>
        <thead>
            <tr>
                <td>Field Name</td>
                <td>Field Description</td>
            </tr>
        </thead>
        <tbody>
            <tr>
                <td>i94yr</td>
                <td> Year (Numeric)</td>
            </tr>
            <tr>
                <td>i94mon</td>
                <td>Month (Numeric)</td>
            </tr><tr>
                <td>i94cit</td>
                <td>3-digit code for the immigrant's country of origin (Numeric)</td>
            </tr>
            <tr>
                <td>i94res</td>
                <td>3-digit code for the immigrant's country of residence (Numeric)</td>
            </tr>
            <tr>
                <td>i94port</td>
                <td>Destination City Code (3 Character)</td>
            </tr>
            <tr>
                <td>arrdate</td>
                <td>Arrival date in the USA (SAS numeric date)</td>
            </tr>
            <tr>
                <td>i94mode</td>
                <td>Mode of travel (Numeric: 1 = Air, 2 = Sea, 3 = Land)</td>
            </tr>
            <tr>
                <td>i94addr</td>
                <td>USA state code of the immigrant's address</td>
            </tr>
            <tr>
                <td>depdate</td>
                <td>Departure date from the USA (SAS numeric date)</td>
            </tr>
            <tr>
                <td>i94visa</td>
                <td>Purpose of the visit (Numeric: 1 = Business, 2 = Pleasure, 3 = Student)</td>
            </tr>
        </tbody>
    </table>
    <br />
</font>
</div>
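In the I94 files, `arrdate` and `depdate` are stored as SAS numeric dates, i.e. the number of days since 1960-01-01. The sketch below converts such a value into a Python `date` with the `datetime`/`timedelta` modules already imported in the first cell. The helper name `sas_date_to_date` is hypothetical, and mapping missing values (None or NaN) to `None` is an assumption about how they should be handled.

```python
import math
from datetime import date, datetime, timedelta
from typing import Optional

SAS_EPOCH = datetime(1960, 1, 1)  # SAS dates count days from this epoch


def sas_date_to_date(sas_days) -> Optional[date]:
    """Convert a SAS numeric date (days since 1960-01-01) to a datetime.date.

    Returns None for missing values (None or NaN).
    """
    if sas_days is None or (isinstance(sas_days, float) and math.isnan(sas_days)):
        return None
    return (SAS_EPOCH + timedelta(days=int(sas_days))).date()


print(sas_date_to_date(20545.0))       # 2016-04-01
print(sas_date_to_date(float("nan")))  # None
```

In Spark the same arithmetic can be expressed without a Python UDF through the built-in `date_add` SQL function, e.g. `F.expr("date_add(to_date('1960-01-01'), cast(arrdate as int))")`.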

<div>
<strong> Dataset Name : Temperature Data</strong>
<br>
<font size="3">
    This data comes from <a href="https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data">Kaggle</a>. It contains monthly average land temperatures for cities around the world. The fields used in this project are described below:<br />
        <em>Dataset Format</em> : "CSV" (Comma Separated Values)

<br />
    <table>
        <thead>
            <tr>
                <td>Field Name</td>
                <td>Field Description</td>
            </tr>
        </thead>
        <tbody>
            <tr>
                <td>dt</td>
                <td>Date of the measurement (one row per city per month)</td>
            </tr>
            <tr>
                <td>AverageTemperature</td>
                <td>Average temperature of the city (°C)</td>
            </tr>
            <tr>
                <td>City</td>
                <td>Name of the city</td>
            </tr>
            <tr>
                <td>Country</td>
                <td>Country where city is located</td>
            </tr>
            <tr>
                <td>Latitude</td>
                <td>The latitude</td>
            </tr>
            <tr>
                <td>Longitude</td>
                <td>The Longitude</td>
            </tr>
        </tbody>
    </table>
    <br />
</font>
</div>

In [2]:
#Defining all the properties that are being used in the project
props = {
    'immDataPath':'../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat',
    'tempDataPath':'../../data2/GlobalLandTemperaturesByCity.csv',
    'immDestCodeMappingPath':'imm_dest_mapping.csv',
    'outputBasePath':'/output'
}

In [None]:
# Reading the immigration data from the path
immigrationDf = pd.read_sas(props['immDataPath'], format='sas7bdat', encoding="ISO-8859-1")


In [None]:
immigrationDf.head(10)

In [None]:
# Reading the Kaggle's temperature data
tempDataPathDf = pd.read_csv(props['tempDataPath'], sep=',')

In [None]:
tempDataPathDf.head(10)

In [None]:
# Creating a Spark session (with the spark-sas7bdat package) and loading the immigrant data
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11") \
    .enableHiveSupport() \
    .getOrCreate()
df_spark = spark.read.format('com.github.saurfang.sas.spark').load(props['immDataPath'])

### Step 2: Explore and Assess the Data
#### Explore the Data 

##### Cleaning Steps

We extracted the destination mapping data from "I94_SAS_Labels_Description.SAS" into `imm_dest_mapping.csv`, and we convert it into a dictionary that maps each destination code to its city name.
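The contents of `imm_dest_mapping.csv` are not shown here. As a sketch, assuming the port section of the SAS labels file consists of lines such as `'ALC'	=	'ALCAN, AK'`, the code-to-city dictionary could be built directly with the `re` module imported in the first cell. The function name `parse_port_labels` and the exact line format are assumptions.

```python
import re

# Assumed line shape: 'CODE' = 'CITY, STATE'  (tabs or spaces around '=')
PORT_LINE = re.compile(r"^\s*'([^']*)'\s*=\s*'([^']*)'")


def parse_port_labels(lines):
    """Build {port_code: city_name} from SAS label lines.

    The city name is the text before the first comma, stripped of whitespace,
    mirroring the split(',')[0].strip() step applied to destination_df.
    Lines that do not match the assumed pattern are skipped.
    """
    mapping = {}
    for line in lines:
        match = PORT_LINE.match(line)
        if match:
            code, dest = match.groups()
            mapping[code.strip()] = dest.split(",")[0].strip()
    return mapping


sample = [
    "\t'ALC'\t=\t'ALCAN, AK'",
    "\t'NYC'\t=\t'NEW YORK, NY'",
    "/* comment line */",
]
print(parse_port_labels(sample))  # {'ALC': 'ALCAN', 'NYC': 'NEW YORK'}
```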

##### Processing the Destination city codes for mapping

In [None]:
# Performing cleaning tasks here
destination_df = pd.read_csv(props['immDestCodeMappingPath'],sep='=',names=['dest_code','dest'])
destination_df.head()

In [None]:
# The city name is the part of the destination before the first comma (e.g. 'ALCAN, AK' -> 'ALCAN')
destination_df['city'] = destination_df['dest'].str.split(',').str[0].str.strip()

In [None]:
dest_mapping = dict(zip(destination_df.dest_code,destination_df.city))

In [None]:
# A utility method to filter immigration data by destination codes
def filter_data_by_destcode(sparkSession, input_path, key_mapping):
    '''
    Reads the i94 data and keeps only the rows with a valid destination code.

    :param sparkSession: The active Spark session
    :param input_path: The input path of the SAS dataset to be filtered
    :param key_mapping: The key value mapping of the destination codes and destination names
    :return: The filtered dataframe
    '''
    # Read the data with the spark-sas7bdat data source
    df = sparkSession.read.format('com.github.saurfang.sas.spark').load(input_path)

    # Keep only rows whose i94port is a known destination code
    df = df.filter(df.i94port.isin(list(key_mapping.keys())))
    return df

##### Cleaning temperature data

In [None]:
# Creating a spark dataframe from the input temperature data
temp_df = spark.read.format("csv").option("header", "true").load(props['tempDataPath'])

In [None]:
# Filter out rows with a missing temperature or city, then keep one row per (City, Country) pair
temp_df = temp_df.filter(F.col('AverageTemperature').isNotNull() & (F.col('AverageTemperature') != 'NaN'))
temp_df = temp_df.filter(F.col('City').isNotNull())
temp_df = temp_df.dropDuplicates(['City', 'Country'])
temp_df.head()
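`dropDuplicates(['City', 'Country'])` keeps one arbitrary row per city, i.e. a single month's reading. If a representative figure per city is wanted instead, one alternative (not what this notebook does) is to average all valid readings. The plain-Python sketch below shows the idea with `defaultdict` on made-up rows; the helper name `average_by_city` is hypothetical. In Spark the analogous step would be `groupBy('City', 'Country').agg(F.avg('AverageTemperature'))` after casting `AverageTemperature` to a double.

```python
from collections import defaultdict


def average_by_city(rows):
    """Average AverageTemperature per (City, Country), skipping missing readings.

    rows: iterable of dicts with 'City', 'Country' and 'AverageTemperature' keys,
    where the temperature is a string (as read from the CSV) or None/empty if missing.
    """
    totals = defaultdict(float)
    counts = defaultdict(int)
    for row in rows:
        temp = row.get("AverageTemperature")
        if temp in (None, ""):
            continue  # skip missing readings
        key = (row["City"], row["Country"])
        totals[key] += float(temp)
        counts[key] += 1
    return {key: totals[key] / counts[key] for key in totals}


# Illustrative values only, not taken from the dataset
sample = [
    {"City": "Boston", "Country": "United States", "AverageTemperature": "2.0"},
    {"City": "Boston", "Country": "United States", "AverageTemperature": "4.0"},
    {"City": "Boston", "Country": "United States", "AverageTemperature": ""},
]
print(average_by_city(sample))  # {('Boston', 'United States'): 3.0}
```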

In [None]:
#Retrieves the reverse mapping Destination Name -> Destination Code for Optimization
dest_code_mapping = dict([(value, key) for key, value in dest_mapping.items()])
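Inverting `dest_mapping` with `dict(...)` keeps only the last code for any city name that appears more than once, so earlier codes are silently dropped. Whether the port list actually contains such repeats is not checked here; the sketch below (hypothetical helper, made-up codes) inverts the mapping into lists so that repeated city names become visible before choosing how to handle them.

```python
from collections import defaultdict


def invert_with_collisions(code_to_city):
    """Invert {code: city} into {city: [codes]} so repeated city names are visible."""
    city_to_codes = defaultdict(list)
    for code, city in code_to_city.items():
        city_to_codes[city].append(code)
    return dict(city_to_codes)


mapping = {"AAA": "SPRINGFIELD", "BBB": "SPRINGFIELD", "CCC": "DENVER"}  # made-up codes
inverted = invert_with_collisions(mapping)
collisions = {city: codes for city, codes in inverted.items() if len(codes) > 1}
print(collisions)  # {'SPRINGFIELD': ['AAA', 'BBB']}
```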

In [None]:
# A Spark UDF that maps a city name to its destination code.
# The lookup keys are normalised once (stripped, lower-cased) so each row is a single dict lookup.
normalized_dest_code_mapping = {key.strip().lower(): value for key, value in dest_code_mapping.items()}

@udf(StringType())
def get_dest_code(city_name):
    '''
    Retrieves the destination code from the city name.

    :param city_name: Name of the city whose destination code is needed
    :return: The destination code, or None if the city is not a known destination
    '''
    if city_name is None:
        return None
    return normalized_dest_code_mapping.get(city_name.strip().lower())

In [None]:
#Creating a new column in the dataframe containing the destination code
temp_df = temp_df.withColumn("destCode", get_dest_code(temp_df.City))

In [None]:
# Remove data points where the destination code is null (city not found in the mapping)
temp_df = temp_df.filter(temp_df.destCode.isNotNull())

In [None]:
# Show results
temp_df.show(5)

**Immigrant Data Cleaning**

In [None]:
# Clean I94 immigration data and store as Spark dataframe
imm_df = filter_data_by_destcode(spark,props['immDataPath'],dest_mapping)

In [None]:
# A Spark UDF that decodes the numeric i94visa code
@udf(StringType())
def get_visa_purpose(visa_code):
    '''
    Retrieves the visa purpose from the visa_code.

    :param visa_code: The numeric i94visa code (1, 2 or 3)
    :return: The visa purpose, or 'NaN' if the code is missing or unknown
    '''
    purpose = 'NaN'
    try:
        visa_code = int(visa_code)
    except (TypeError, ValueError):
        return purpose
    if visa_code == 1:
        purpose = 'Business'
    elif visa_code == 2:
        purpose = 'Pleasure'
    elif visa_code == 3:
        purpose = 'Student'
    return purpose

In [None]:
# A Spark UDF that decodes the numeric i94mode code
@udf(StringType())
def get_travel_mode(travel_code):
    '''
    Retrieves the travel_mode_type from the travel_code.

    :param travel_code: The travel_code for which travel_mode_type needs to be retrieved
    :return: The travel_mode_type ('Not reported' for missing or unknown codes)
    '''
    travel_mode = 'Not reported'
    try:
        travel_code = int(travel_code)
        if travel_code == 1:
            travel_mode = 'Air'
        elif travel_code == 2:
            travel_mode = 'Sea'
        elif travel_code == 3:
            travel_mode = 'Land'
    except (TypeError, ValueError):
        print(f"Invalid travel code {travel_code}")
    return travel_mode

In [None]:
imm_df = imm_df.withColumn('i94visa',get_visa_purpose(imm_df.i94visa))
imm_df = imm_df.withColumn('i94mode',get_travel_mode(imm_df.i94mode))
imm_df.show(5)
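Both UDFs decode a small numeric code through an `if`/`elif` chain. A table-driven variant keeps the code lists in one place and treats missing or non-numeric codes uniformly. This is a plain-Python sketch with hypothetical names (`VISA_PURPOSE`, `TRAVEL_MODE`, `decode`); a one-argument wrapper around `decode` could be registered with `udf(..., StringType())` in the same way as above.

```python
VISA_PURPOSE = {1: "Business", 2: "Pleasure", 3: "Student"}
TRAVEL_MODE = {1: "Air", 2: "Sea", 3: "Land"}


def decode(code, mapping, default="Not reported"):
    """Map a numeric code (int, float such as 1.0, or numeric string) to its label."""
    try:
        return mapping.get(int(float(code)), default)
    except (TypeError, ValueError):
        # None, NaN and other non-numeric input fall back to the default label
        return default


print(decode(1.0, TRAVEL_MODE))   # Air
print(decode("2", VISA_PURPOSE))  # Pleasure
print(decode(None, TRAVEL_MODE))  # Not reported
```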

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model


##### **The Fact Table** - This will contain information from the I94 immigration data joined with the city temperature data on i94port


Column Details:
- immTempId : The unique id for the fact table
- city_code : The destination city code for the immigrants
- city : The city name referring to the city code
- immigrant_arrival_date : Arrival date of immigrants on the destination city 
- month : The immigration month
- year: The immigration year
- immigrant_departure_date : Departure date of immigrants
- travel_mode : Mode of travel
- visa_purpose : The visa purpose
- visa_type : The visa type
- temperature : The average temperature of destination city.
- latitude : The latitude coordinates of destination city
- longitude : The longitude coordinates of destination city


##### Dimension Table 1 - Data from the I94 immigration data.

Column Details:
- immigrantId : The unique id for the immigrant
- i94port : 3 character city code
- i94cit : 3-digit code for the immigrant's country of origin
- arrdate : the arrival date
- i94mon : immigration month
- i94yr : immigration year
- depdate : departure date
- i94mode : mode of travel
- i94visa : visa purpose 
- visatype : visa type

##### Dimension Table 2 - Data containing city temperature information.
Column Details:
- tempId : The unique id for a particular city's temperature data
- destCode : The destination code for a particular city
- AverageTemperature : Average temperature of the city
- City : The city referring to the destCode
- Country : Country where city is located
- Latitude : The latitude coordinates of city
- Longitude : The longitude coordinates of city

#### 3.2 Mapping Out Data Pipelines


#### We follow the steps below to pipeline the data into the data model:
- First, we create dataframes for both the immigrant and the temperature data using the Spark session and the input data paths.
- Then we clean the temperature data and keep only the rows for cities that match an immigrant destination code.
- Then we clean the immigrant data and map the coded values (visa purpose, travel mode) to readable labels.
- Then we create the dimension tables with the required schema by selecting specific fields from each dataframe and writing them in Parquet (a columnar storage format) to the '/output' directory.
- Finally, we create the fact table by joining the immigrant and temperature data on the destination code (i94port = destCode).
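The final step is an inner join on `i94port == destCode`. The plain-Python sketch below mimics that join as a hash join over small lists of made-up rows (the helper name `inner_join` is hypothetical) to show which rows survive: immigrant rows whose port has no temperature row are dropped, and a port matching several temperature rows fans out into several fact rows.

```python
from collections import defaultdict


def inner_join(imm_rows, temp_rows):
    """Inner-join immigration rows to temperature rows on i94port == destCode."""
    temp_by_code = defaultdict(list)
    for t in temp_rows:
        temp_by_code[t["destCode"]].append(t)
    joined = []
    for imm in imm_rows:
        # No matching destCode means the immigrant row is dropped
        for t in temp_by_code.get(imm["i94port"], []):
            joined.append({**imm, **t})
    return joined


# Made-up rows for illustration
imm_rows = [{"immigrantId": 0, "i94port": "NYC"}, {"immigrantId": 1, "i94port": "ZZZ"}]
temp_rows = [{"destCode": "NYC", "City": "New York", "AverageTemperature": 10.5}]
facts = inner_join(imm_rows, temp_rows)
print(len(facts), facts[0]["City"])  # 1 New York
```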

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

##### Creating data model for immigration table

In [None]:
#Adding a unique id to the immigrant dataframe
imm_df = imm_df.withColumn('immigrantId', F.monotonically_increasing_id())
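`monotonically_increasing_id()` produces IDs that are unique and increasing but not consecutive. Per the Spark documentation, the current implementation puts the partition ID in the upper 31 bits and the record number within each partition in the lower 33 bits. The sketch below reproduces that arithmetic in plain Python (hypothetical helper name) to show where the gaps come from; it illustrates the documented layout and is not Spark code.

```python
def spark_style_id(partition_id, row_in_partition):
    """Combine a partition index (upper 31 bits) and a row index (lower 33 bits)."""
    return (partition_id << 33) + row_in_partition


print(spark_style_id(0, 0))  # 0
print(spark_style_id(0, 1))  # 1
print(spark_style_id(1, 0))  # 8589934592
```

The IDs are therefore not dense row numbers. If consecutive numbers are required, `row_number()` over a window is one option, at the cost of moving data into a single partition when the window has no `partitionBy`.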

In [None]:
# Extract columns for immigration dimension table
imm_select_cols = ['immigrantId','i94port','i94cit','arrdate','i94mon','i94yr','depdate','i94mode','i94visa','visatype']
imm_table = imm_df.select(imm_select_cols).distinct()

In [None]:
# Writing immigration dataframe (dim table) partitioned by i94port
imm_table.write \
        .partitionBy('i94port') \
        .format('parquet') \
        .mode("overwrite") \
        .save(props['outputBasePath'] + '/immigration')
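`partitionBy('i94port')` writes one Hive-style sub-directory per distinct value, e.g. `/output/immigration/i94port=NYC/`, and the partition column is encoded in the directory name rather than stored in the Parquet files. Queries that filter on `i94port` can then skip the other directories. The plain-Python sketch below (hypothetical helper, made-up rows) groups rows the same way to show the resulting layout.

```python
from collections import defaultdict


def partition_layout(rows, base_path, partition_col):
    """Group rows into Hive-style partition directories: <base>/<col>=<value>."""
    layout = defaultdict(list)
    for row in rows:
        value = row[partition_col]
        # The partition value lives in the directory name, not in the stored row
        stored = {k: v for k, v in row.items() if k != partition_col}
        layout[f"{base_path}/{partition_col}={value}"].append(stored)
    return dict(layout)


rows = [{"immigrantId": 0, "i94port": "NYC"}, {"immigrantId": 1, "i94port": "LOS"}]
for path, part in partition_layout(rows, "/output/immigration", "i94port").items():
    print(path, part)
```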

##### Creating data model for temperature table

In [None]:
# Adding a unique id to the temperature dataframe
temp_df = temp_df.withColumn('tempId', F.monotonically_increasing_id())

In [None]:
# Extract columns for temperature dimension table
temp_select_cols = ['tempId','destCode','AverageTemperature','City','Country','Latitude','Longitude']
temp_table = temp_df.select(temp_select_cols).distinct()


In [None]:
# Write temperature dimension table to parquet files partitioned by destCode
temp_table.write \
        .partitionBy('destCode') \
        .format('parquet') \
        .mode("overwrite") \
        .save(props['outputBasePath'] + '/temperature')

##### Creating fact table using the dimension tables

In [None]:
# Joining tables by destination code
imm_temp_df = imm_df.join(temp_df, imm_df.i94port == temp_df.destCode)
imm_temp_df = imm_temp_df.withColumn('immTempId', F.monotonically_increasing_id())

In [None]:
imm_temp_df.show()

In [None]:
imm_temp_table = imm_temp_df.select(
    'immTempId',
    col('i94port').alias('city_code'),
    col('City').alias('city'),
    col('arrdate').alias('immigrant_arrival_date'),
    col('i94mon').alias('month'),
    col('i94yr').alias('year'),
    col('depdate').alias('immigrant_departure_date'),
    col('i94mode').alias('travel_mode'),
    col('i94visa').alias('visa_purpose'),
    col('visatype').alias('visa_type'),
    col('AverageTemperature').alias('temperature'),
    col('Latitude').alias('latitude'),
    col('Longitude').alias('longitude')
).distinct()

In [None]:
imm_temp_table.write \
        .partitionBy('city_code') \
        .format('parquet') \
        .mode("overwrite") \
        .save(props['outputBasePath'] + '/city_temp_facts')

#### 4.2 Data Quality Checks

In [None]:
# A method to perform data quality checks
def perform_quality_check(df, table_name):
    '''
    Computes the quality check and logs the result for input dataframe.
    
    :param df: Input spark dataframe
    :param table_name: The table name referring to the dataframe, used when printing the data quality check
    :return: None
    '''
    print(f"===========Data Quality Check started for {table_name} table==============")
    rowCount = df.count()
    
    if rowCount == 0:
        print(f"Data quality check failed for {table_name} with zero records")
    else:
        print(f"Data quality check passed for {table_name} with {rowCount} records")
    print("\n\n")

In [None]:
# Perform data quality checks on the fact and dimension tables that were written to parquet
perform_quality_check(imm_table, 'immigration')
perform_quality_check(temp_table, 'temperature')
perform_quality_check(imm_temp_table, 'city_temp_facts')
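A non-zero row count does not catch duplicate or missing keys. A slightly stricter check, sketched in plain Python over a list of row dicts (the helper name `check_table` is hypothetical), also verifies that the ID column is non-null and unique, and raises instead of only printing. In Spark the same conditions could be computed with `df.filter(col(key).isNull()).count()` and by comparing `df.count()` with `df.select(key).distinct().count()`.

```python
def check_table(rows, table_name, key):
    """Raise ValueError if the table is empty or its key column has nulls/duplicates."""
    if not rows:
        raise ValueError(f"{table_name}: no records")
    keys = [row.get(key) for row in rows]
    if any(k is None for k in keys):
        raise ValueError(f"{table_name}: null values in {key}")
    if len(set(keys)) != len(keys):
        raise ValueError(f"{table_name}: duplicate values in {key}")
    print(f"{table_name}: {len(rows)} records passed")


check_table([{"tempId": 0}, {"tempId": 1}], "temperature", "tempId")
```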

#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

##### **The Fact Table** - This will contain information from the I94 immigration data joined with the city temperature data on i94port


Column Details:
- immTempId : The unique id for the fact table
- city_code : The destination city code for the immigrants
- city : The city name referring to the city code
- immigrant_arrival_date : Arrival date of immigrants on the destination city 
- month : The immigration month
- year: The immigration year
- immigrant_departure_date : Departure date of immigrants
- travel_mode : Mode of travel
- visa_purpose : The visa purpose
- visa_type : The visa type
- temperature : The average temperature of destination city.
- latitude : The latitude coordinates of destination city
- longitude : The longitude coordinates of destination city

The schema details for the fact table are described below - 

In [None]:
imm_temp_table.printSchema()

##### Dimension Table 1 - Data from the I94 immigration data.

Column Details:
- immigrantId : The unique id for the immigrant
- i94port : 3 character city code
- i94cit : 3-digit code for the immigrant's country of origin
- arrdate : the arrival date
- i94mon : immigration month
- i94yr : immigration year
- depdate : departure date
- i94mode : mode of travel
- i94visa : visa purpose 
- visatype : visa type

The schema details for the immigration dimension table are described below - 

In [None]:
imm_table.printSchema()

##### Dimension Table 2 - Data containing city temperature information.
Column Details:
- tempId : The unique id for a particular city's temperature data
- destCode : The destination code for a particular city
- AverageTemperature : Average temperature of the city
- City : The city referring to the destCode
- Country : Country where city is located
- Latitude : The latitude coordinates of city
- Longitude : The longitude coordinates of city

The schema details for the temperature dimension table are described below - 

In [None]:
temp_table.printSchema()

### Step 5: Complete Project Write Up

#### Clearly state the rationale for the choice of tools and technologies for the project.
I used Spark for this project for the following reasons:
- It can easily read input data in different file formats, e.g. SAS and CSV.
- It is fast, easy to use, and efficient for processing large amounts of data.
- If the data volume or the number of users grows, the same code can be reused by changing the Spark configuration and cluster size.

#### Propose how often the data should be updated and why.
As the raw I94 files are organized by month (e.g. `i94_apr16_sub.sas7bdat`), the data can be updated on a monthly basis.
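The input path in `props` follows a month-based naming pattern (`i94_apr16_sub.sas7bdat`). Assuming the other months use the same pattern, which is not verified here, a monthly run could derive the file name for a given month as sketched below. The helper name `i94_file_name` is hypothetical; an explicit month list is used instead of `%b` because `%b` depends on the locale.

```python
from datetime import date

MONTH_ABBR = ["jan", "feb", "mar", "apr", "may", "jun",
              "jul", "aug", "sep", "oct", "nov", "dec"]


def i94_file_name(run_date):
    """Build an i94_<mon><yy>_sub.sas7bdat name for the month of run_date (assumed pattern)."""
    return f"i94_{MONTH_ABBR[run_date.month - 1]}{run_date:%y}_sub.sas7bdat"


print(i94_file_name(date(2016, 4, 1)))  # i94_apr16_sub.sas7bdat
```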

#### How the approach would change under the following scenarios:


<strong>Scenario 1: The data was increased by 100x </strong><br>
<font size="3">
Solution: We can load our data into Amazon Redshift. It is optimized for large analytical workloads and operations such as aggregating data.
</font>
<br/><br/>
<strong>Scenario 2: The data populates a dashboard that must be updated on a daily basis by 7am every day.</strong><br>
<font size="3">
    Solution: This can be handled with <b> Airflow</b> by creating a DAG that is scheduled to run daily, early enough to finish before 7am. We can also add task retries, run the data quality checks as tasks, and send emails in case of failures. The dashboard can also indicate when the latest run has failed, so users know the data may be stale.
</font>
<br/><br/>
<strong>Scenario 3: The database needed to be accessed by 100+ people.</strong><br>
<font size="3">
Solution: This can also be achieved with Amazon Redshift, which is designed to serve many concurrent analytical queries and can add capacity (for example through concurrency scaling) as the number of users grows.
</font>