# Project Title
### Data Engineering Capstone Project

#### Project Summary
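This project builds an automated Spark ETL pipeline that combines the I94 immigration data with world temperature data, so that analysts can explore whether immigration is affected by the weather at the destination.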

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
import pandas as pd
import os
import re
from functools import reduce

from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import udf

### Step 1: Scope the Project and Gather Data

In this project, we will use the I94 Immigration Data and World Temperature Data to explore the relationship between immigration and temperature, to see whether immigration is affected by the weather. To support the analysis, we will build an automated pipeline that refreshes the data.


#### Scope

Using the I94 Immigration dataset and the city temperature data provided by Udacity, we will create an ETL pipeline. The pipeline includes designing the dimensional model schema, creating the database and tables based on that schema, staging the data, and loading the data. Because the data is large, we use Spark to speed up processing. The dimensional model also helps business decision teams quickly locate the data they need and write their queries.


#### Describe and Gather Data

The I94 data comes from the US National Tourism and Trade Office.


The World Temperature Data comes from Kaggle.


#### Use Case

A typical user of this database is a data scientist who wants to investigate whether the weather at the destination (for example, warmer or drier locations) is associated with more immigrant arrivals.


In [2]:

# Spark session with the spark-sas7bdat package, needed to read SAS files
spark = (SparkSession.builder
         .config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11")
         .enableHiveSupport()
         .getOrCreate())
# Load the April 2016 I94 immigration data
df_spark = spark.read.format('com.github.saurfang.sas.spark').load(
    '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')

In [3]:
df_spark.show()

+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|        admnum|fltno|visatype|
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|  6.0|2016.0|   4.0| 692.0| 692.0|    XXX|20573.0|   null|   null|   null|  37.0|    2.0|  1.0|    null|    null| null|      T|   null|      U|   null| 1979.0|10282016|  null|  null|   null| 1.897628485E9| null|      B2|
|  7.0|2016.0|   4.0| 254.0| 276.0|    ATL|20551.0|    1.0|     AL|   null|  25.0|    3.0|  1.0|20130811|     SE

In [4]:
df_temp = spark.read.format("csv").option("header", "true").load("../../data2/GlobalLandTemperaturesByCity.csv")

In [5]:
df_temp.show()

+----------+-------------------+-----------------------------+-----+-------+--------+---------+
|        dt| AverageTemperature|AverageTemperatureUncertainty| City|Country|Latitude|Longitude|
+----------+-------------------+-----------------------------+-----+-------+--------+---------+
|1743-11-01|              6.068|           1.7369999999999999|Århus|Denmark|  57.05N|   10.33E|
|1743-12-01|               null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-01-01|               null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-02-01|               null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-03-01|               null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-04-01| 5.7879999999999985|           3.6239999999999997|Århus|Denmark|  57.05N|   10.33E|
|1744-05-01|             10.644|           1.2830000000000001|Århus|Denmark|  57.05N|   10.33E|
|1744-06-01| 14.050999999999998|        

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps
Document steps necessary to clean the data

After inspecting the data, we find that the destination city code (i94port) in the I94 data has some invalid values, such as 99 and NaN. The temperature data has missing average temperatures and duplicate locations. We also need to add the corresponding i94 port code to each location in the temperature data.

In [6]:
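sas_header_file = 'I94_SAS_Labels_Descriptions.SAS'
with open(sas_header_file) as f:
    lines = f.readlines()

# A port line is expected to contain two quoted strings: the port code and its description
re_obj = re.compile(r'\'(.*)\'.*\'(.*)\'')
valid_ports = {}
# The slice 302:961 covers the i94port definitions in the label file
for line in lines[302:961]:
    match = re_obj.search(line)
    if match is None:
        continue  # skip lines that do not contain two quoted strings
    valid_ports[match.group(1)] = [match.group(2)]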
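
The parsing step above can be tried in isolation on a few lines. The following is a minimal sketch, assuming each port line has the form `'CODE' = 'DESCRIPTION'`. The sample lines and the helper name `parse_port_lines` are illustrative and are not part of the notebook. Unlike the cell above, the sketch strips padding and stores plain strings rather than one-element lists.

```python
import re

# Same pattern as in the notebook: capture the two quoted strings on a line
PORT_LINE = re.compile(r"'(.*)'.*'(.*)'")


def parse_port_lines(lines):
    """Map port code -> port description for every line that matches."""
    ports = {}
    for line in lines:
        match = PORT_LINE.search(line)
        if match is None:
            continue  # comments, blank lines, and other non-port lines
        ports[match.group(1).strip()] = match.group(2).strip()
    return ports


# Illustrative sample lines (assumed format, padded like a SAS label file)
sample = [
    "   'ALC'\t=\t'ALCAN, AK             '",
    "   'SEA'\t=\t'SEATTLE, WA           '",
    "/* not a port line */",
]
ports = parse_port_lines(sample)
print(ports)
```

Skipping non-matching lines keeps the loop from failing with an `AttributeError` if the hard-coded line range ever includes a line without two quoted strings.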

In [8]:
# Keep only rows whose i94port is one of the valid port codes
cleaned_immigration = df_spark.filter(df_spark.i94port.isin(list(valid_ports)))

In [9]:
cleaned_immigration.show()

+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|        admnum|fltno|visatype|
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|  6.0|2016.0|   4.0| 692.0| 692.0|    XXX|20573.0|   null|   null|   null|  37.0|    2.0|  1.0|    null|    null| null|      T|   null|      U|   null| 1979.0|10282016|  null|  null|   null| 1.897628485E9| null|      B2|
|  7.0|2016.0|   4.0| 254.0| 276.0|    ATL|20551.0|    1.0|     AL|   null|  25.0|    3.0|  1.0|20130811|     SE

In [17]:
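@udf()
def get_i94port(city):
    '''
    Input: City name

    Output: Corresponding i94port, or None if no port description contains the city
    '''
    if city is None:
        return None
    # Substring match against each port description; the first hit wins
    for key in valid_ports:
        if city.lower() in valid_ports[key][0].lower():
            return key
    return None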
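
Because `get_i94port` uses a substring test, a short city name can match an unrelated port description. The sketch below compares that behavior with a stricter variant that only accepts an exact match on the text before the first comma. It is plain Python rather than a Spark UDF, the helper names are hypothetical, and the two port entries are made-up sample data, not values from the SAS label file.

```python
def city_to_port_substring(city, ports):
    """Substring lookup, mirroring the matching rule used by get_i94port."""
    if not city:
        return None
    wanted = city.lower()
    for code, description in ports.items():
        if wanted in description.lower():
            return code
    return None


def city_to_port_exact(city, ports):
    """Stricter lookup: the city must equal the text before the first comma."""
    if not city:
        return None
    wanted = city.strip().lower()
    for code, description in ports.items():
        port_city = description.split(",")[0].strip().lower()
        if port_city == wanted:
            return code
    return None


# Made-up sample data for illustration only
sample_ports = {"NYC": "NEW YORK, NY", "YOR": "YORK, PA"}

print(city_to_port_substring("York", sample_ports))  # NYC (false positive)
print(city_to_port_exact("York", sample_ports))      # YOR
```

The exact variant maps fewer cities, so it trades coverage for precision; which one is appropriate depends on how the temperature data will be used.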

In [18]:
# remove rows with a missing average temperature
cleaned_temp = df_temp.filter(df_temp.AverageTemperature.isNotNull())
# keep one row per (City, Country); which reading is kept is arbitrary
cleaned_temp = cleaned_temp.dropDuplicates(['City', 'Country'])
# add i94 port column
cleaned_temp = cleaned_temp.withColumn("i94port", get_i94port(cleaned_temp.City))
# remove cities that could not be mapped to a port
cleaned_temp = cleaned_temp.filter(cleaned_temp.i94port.isNotNull())

In [19]:
cleaned_temp.show()

+----------+-------------------+-----------------------------+---------+--------------------+--------+---------+-------+
|        dt| AverageTemperature|AverageTemperatureUncertainty|     City|             Country|Latitude|Longitude|i94port|
+----------+-------------------+-----------------------------+---------+--------------------+--------+---------+-------+
|1856-01-01|             26.901|                        1.359|      Ife|             Nigeria|   7.23N|    4.05E|    888|
|1852-07-01|             15.488|                        1.395|    Perth|           Australia|  31.35S|  114.97E|    PER|
|1828-01-01|             -1.977|                        2.551|  Seattle|       United States|  47.42N|  121.97W|    SEA|
|1743-11-01|              2.767|                        1.905| Hamilton|              Canada|  42.59N|   80.73W|    HAM|
|1849-01-01|  7.399999999999999|                        2.699|  Ontario|       United States|  34.56N|  116.76W|    ONT|
|1821-11-01|              2.322|

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model

#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

In this case, we will build one fact table and two dimension tables.


##### Fact Table

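1. fctImmigration
- year = 4 digit year (from i94yr)
- month = numeric month (from i94mon)
- city = 3 digit code of the traveler's country of citizenship (from i94cit)
- i94port = 3 character code of the port of entry, used as the destination city
- arrival_date = arrival date (from arrdate)
- departure_date = departure date (from depdate)
- reason = visa category code, the reason for travel (from i94visa)
- temperature = average temperature of destination city (from AverageTemperature)
- latitude = latitude of destination city (from Latitude)
- longitude = longitude of destination city (from Longitude)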


##### Dimension Table

1. dimTemperature

- i94port = 3 character code of the port of entry, used as the destination city (mapped from the city name using the valid I94 port codes)
- AverageTemperature = average temperature
- City = city name
- Country = country name
- Latitude = latitude
- Longitude = longitude


2. dimImmigration

- i94yr = 4 digit year
- i94mon = numeric month
- i94cit = 3 digit code of the traveler's country of citizenship
- i94port = 3 character code of the port of entry, used as the destination city
- arrdate = arrival date
- i94mode = 1 digit code for the mode of travel
- depdate = departure date
- i94visa = visa category code (reason for travel)
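
The arrival and departure dates appear in the sample output as numbers such as `20573.0`. Assuming these are SAS date values, which count days from 1 January 1960, they can be converted to calendar dates as in the following minimal sketch. The helper name `sas_to_date` is hypothetical and not part of the notebook.

```python
from datetime import date, timedelta

# SAS date values count days from this epoch
SAS_EPOCH = date(1960, 1, 1)


def sas_to_date(days):
    """Convert a SAS numeric date (days since 1960-01-01) to a date, or None."""
    if days is None:
        return None
    return SAS_EPOCH + timedelta(days=int(days))


print(sas_to_date(20573.0))  # 2016-04-29
print(sas_to_date(None))     # None
```

In the pipeline the same logic could be wrapped in a UDF, in the same way `get_i94port` is, so that the model stores readable dates.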


### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.



The procedure for ETL:


1. Extract the respondent fields from the cleaned I94 table to form the immigration dimension table.
2. Extract the unique, non-duplicate records from the temperature Spark dataframe, map an i94 port to each record, and use the result as the temperature dimension table.
3. Use Spark SQL to join the two dimension tables on i94 port; the result is the fact table.

In [20]:
# build dimension tables

dimTemperature = cleaned_temp.select(['i94port', 'AverageTemperature', 'City', 'Country', 'Latitude', 'Longitude'])
dimTemperature.write.mode("append").partitionBy("i94port").parquet("/results/temperature.parquet")


dimImmigration = cleaned_immigration.select(['i94yr','i94mon',"i94cit", "i94port", "arrdate", "i94mode", "depdate", "i94visa"])
dimImmigration.write.mode("append").partitionBy("i94port").parquet("/results/immigration.parquet")

In [21]:
# write spark sql for fact table
dimImmigration.createOrReplaceTempView("immigration_view")
dimTemperature.createOrReplaceTempView("temp_view")


fctImmigration = spark.sql('''
SELECT immigration_view.i94yr as year,
       immigration_view.i94mon as month,
       immigration_view.i94cit as city,
       immigration_view.i94port as i94port,
       immigration_view.arrdate as arrival_date,
       immigration_view.depdate as departure_date,
       immigration_view.i94visa as reason,
       temp_view.AverageTemperature as temperature,
       temp_view.Latitude as latitude,
       temp_view.Longitude as longitude
FROM immigration_view
JOIN temp_view ON (immigration_view.i94port = temp_view.i94port)
''')


# write table into parquet files
fctImmigration.write.mode("append").partitionBy("i94port").parquet("/results/fact.parquet")

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

In [None]:
# Perform quality checks here

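def quality_check(df, description):
    '''
    Input: Spark dataframe, description of Spark dataframe
    Output: Print outcome of data quality check; return True if the check passed
    '''

    result = df.count()
    if result == 0:
        print("Data quality check failed for {} with zero records".format(description))
        return False
    print("Data quality check passed for {} with {} records".format(description, result))
    return True

# Perform data quality check on the source data and on the model tables
quality_check(df_spark, "immigration table")
quality_check(df_temp, "temperature table")
quality_check(dimImmigration, "immigration dimension table")
quality_check(dimTemperature, "temperature dimension table")
quality_check(fctImmigration, "fact table")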
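
The check above only confirms that a table is not empty. The integrity constraints listed in this section, such as a unique key, can be sketched in the same style. The helper names below are hypothetical. They rely only on the `count()` and `dropDuplicates()` methods of a Spark dataframe, and they raise an error instead of printing so that a failed check stops the run.

```python
def check_not_empty(df, description):
    """Raise if the dataframe has no rows; otherwise return the row count."""
    count = df.count()
    if count == 0:
        raise ValueError("{} is empty".format(description))
    return count


def check_unique_key(df, key_columns, description):
    """Raise if any combination of key_columns appears more than once."""
    total = df.count()
    distinct = df.dropDuplicates(key_columns).count()
    if total != distinct:
        raise ValueError(
            "{} has {} rows with a duplicated key {}".format(
                description, total - distinct, key_columns
            )
        )
    return total


# Example usage in the notebook (assumes the tables built in Step 4.1):
# check_not_empty(fctImmigration, "fact table")
# check_unique_key(dimTemperature, ["i94port"], "temperature dimension table")
```

A uniqueness check on `i94port` is relevant here because several cities can map to the same port code, and a duplicated key in the dimension table multiplies rows in the join that builds the fact table.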

#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

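##### Fact Table

1. fctImmigration
- year = 4 digit year (from i94yr)
- month = numeric month (from i94mon)
- city = 3 digit code of the traveler's country of citizenship (from i94cit)
- i94port = 3 character code of the port of entry, used as the destination city
- arrival_date = arrival date (from arrdate)
- departure_date = departure date (from depdate)
- reason = visa category code, the reason for travel (from i94visa)
- temperature = average temperature of destination city (from AverageTemperature)
- latitude = latitude of destination city (from Latitude)
- longitude = longitude of destination city (from Longitude)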


##### Dimension Table

1. dimTemperature

- i94port = 3 character code of the port of entry, used as the destination city (mapped from the city name using the valid I94 port codes)
- AverageTemperature = average temperature
- City = city name
- Country = country name
- Latitude = latitude
- Longitude = longitude


2. dimImmigration

- i94yr = 4 digit year
- i94mon = numeric month
- i94cit = 3 digit code of the traveler's country of citizenship
- i94port = 3 character code of the port of entry, used as the destination city
- arrdate = arrival date
- i94mode = 1 digit code for the mode of travel
- depdate = departure date
- i94visa = visa category code (reason for travel)
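
The Latitude and Longitude fields are stored as text with a hemisphere suffix, for example `57.05N` and `10.33E` in the sample output. If numeric coordinates are needed for analysis, they could be derived as in this minimal sketch. The helper name `parse_coordinate` is hypothetical, and the sign convention (south and west negative) is an assumption of the sketch rather than something the notebook defines.

```python
def parse_coordinate(value):
    """Convert text such as '57.05N' or '121.97W' to a signed float, or None."""
    if not value:
        return None
    number, hemisphere = float(value[:-1]), value[-1].upper()
    if hemisphere not in ("N", "S", "E", "W"):
        raise ValueError("unexpected hemisphere in {!r}".format(value))
    # South and west are represented as negative values
    return -number if hemisphere in ("S", "W") else number


print(parse_coordinate("57.05N"))   # 57.05
print(parse_coordinate("121.97W"))  # -121.97
```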

### Step 5: Complete Project Write Up

- Clearly state the rationale for the choice of tools and technologies for the project

This project processes a large amount of data, so Spark is a good fit for reducing the run time. Working in Python in a notebook also makes it easy to inspect the data after every operation and confirm that it is correct.

- Propose how often the data should be updated and why.

The update frequency should follow the business requirements. For example, if the data feeds a monthly report, a monthly update is sufficient.

- Write a description of how you would approach the problem differently under the following scenarios:
    
    - The data was increased by 100x.
    
    We should consider scaling out the number of Spark workers that process the data.
    
    - The data populates a dashboard that must be updated on a daily basis by 7am every day.
    
    To trigger the data pipeline at a specific time every day, we can use a scheduler such as Airflow, which is well suited to controlling the run schedule.
    
    - The database needed to be accessed by 100+ people.
    
    To let 100+ people access the database, the Spark driver can serve their queries, provided it is configured correctly for that level of concurrent access.
