# Project Title
### Data Engineering Capstone Project

#### Project Summary
This is the Capstone project for the Udacity Data Engineer Nanodegree, in which we work on datasets with millions of rows and apply the tools, technologies and knowledge gained throughout the course.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [38]:
# Do all imports and installs here
import pandas as pd
import re

from pyspark.sql.functions import udf

### Step 1: Scope the Project and Gather Data

#### Scope 
The scope of the project is to analyse the effect of temperature on immigration by city. We create dimension and fact tables on which analytical queries can be run to study how the temperature in different cities has affected immigration to them.

#### Describe and Gather Data 
1. The immigration data comes from https://travel.trade.gov/research/reports/i94/historical/2016.html and is located in the Udacity workspace at '../../data/18-83510-I94-Data-2016/'. The file format is 'sas7bdat'.
2. The temperature data is taken from Kaggle: https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data 

In [39]:
# Read in the immigration data here
immigration_filename = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
df_im = pd.read_sas(immigration_filename, 'sas7bdat', encoding="ISO-8859-1")

In [41]:
df_im.head()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,6.0,2016.0,4.0,692.0,692.0,XXX,20573.0,,,,...,U,,1979.0,10282016,,,,1897628000.0,,B2
1,7.0,2016.0,4.0,254.0,276.0,ATL,20551.0,1.0,AL,,...,Y,,1991.0,D/S,M,,,3736796000.0,296.0,F1
2,15.0,2016.0,4.0,101.0,101.0,WAS,20545.0,1.0,MI,20691.0,...,,M,1961.0,09302016,M,,OS,666643200.0,93.0,B2
3,16.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,1988.0,09302016,,,AA,92468460000.0,199.0,B2
4,17.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,2012.0,09302016,,,AA,92468460000.0,199.0,B2
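
The `arrdate` and `depdate` values above are not calendar dates but SAS numeric dates, that is, a count of days since 1960-01-01. A minimal sketch of the conversion follows; the helper name `sas_days_to_date` is hypothetical and not part of the project code.

```python
from datetime import date, timedelta

# SAS stores dates as the number of days since this epoch.
SAS_EPOCH = date(1960, 1, 1)

def sas_days_to_date(days):
    """Convert a SAS numeric date (days since 1960-01-01) to a date.

    Returns None for missing values (None or NaN).
    """
    if days is None or days != days:  # NaN is the only value not equal to itself
        return None
    return SAS_EPOCH + timedelta(days=int(days))

print(sas_days_to_date(20545.0))  # 2016-04-01
```

The result agrees with the `i94yr` and `i94mon` columns of the same rows (2016, month 4).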


In [42]:
temperature_filename = '../../data2/GlobalLandTemperaturesByCity.csv'
df_temp = pd.read_csv(temperature_filename, sep=',')

In [43]:
df_temp.head()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E


In [44]:
# Create Spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11") \
    .enableHiveSupport() \
    .getOrCreate()
df_spark = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')


In [46]:
# Write to parquet and read the result back
df_spark.write.parquet("sas_data")
df_spark=spark.read.parquet("sas_data")

### Step 2: Explore and Assess the Data
#### Explore and Clean the Data 
1. For the immigration data, we clean it by identifying invalid i94port codes, comparing them against the valid codes in the i94port.txt file.
2. For the temperature data, we clean it by dropping rows with NaN values for AverageTemperature, dropping duplicate City and Country combinations, and adding the corresponding i94port for each location.


In [47]:
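# Create dictionary of valid i94port codes
# Each line is expected to contain two quoted fields: the port code and its description
re_obj = re.compile(r'\'(.*)\'.*\'(.*)\'')
i94port_valid = {}
with open('i94port.txt') as f:
    for line in f:
        match = re_obj.search(line)
        # Skip lines that do not match instead of failing on None
        if match:
            i94port_valid[match[1]] = [match[2]]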
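
To illustrate what the regular expression captures, here is a small self-contained sketch. The sample lines are an assumption about the layout of `i94port.txt` (a quoted code, an equals sign, and a quoted description); the real file may differ. Unlike the cell above, the sketch strips the padding from the description and stores it as a plain string.

```python
import re

# Same pattern as above: two quoted groups on one line.
pattern = re.compile(r"'(.*)'.*'(.*)'")

# Hypothetical sample lines; the exact layout of i94port.txt is assumed.
sample_lines = [
    "   'ATL'\t=\t'ATLANTA, GA            '",
    "   'NYC'\t=\t'NEW YORK, NY           '",
    "this line has no quoted fields",
]

ports = {}
for line in sample_lines:
    match = pattern.search(line)
    if match is None:
        continue  # skip lines that do not look like a code/description pair
    ports[match.group(1)] = match.group(2).strip()

print(ports)  # {'ATL': 'ATLANTA, GA', 'NYC': 'NEW YORK, NY'}
```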

In [48]:
# Clean I94 immigration data
def clean_i94_data(immigration_file):
    '''    
    Params: 
        immigration_file: Filepath of file containing immigration data
    Output: Cleaned I94 immigration spark dataframe
    '''    
    # Read I94 data into Spark
    df_immigration = spark.read.format('com.github.saurfang.sas.spark').load(immigration_file)

    # Filter out entries where i94port is invalid
    df_immigration = df_immigration.filter(df_immigration.i94port.isin(list(i94port_valid.keys())))

    return df_immigration

In [49]:
immigration_test_file = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat' 
df_immigration_cleaned = clean_i94_data(immigration_test_file)
df_immigration_cleaned.show()

+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|        admnum|fltno|visatype|
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|  7.0|2016.0|   4.0| 254.0| 276.0|    ATL|20551.0|    1.0|     AL|   null|  25.0|    3.0|  1.0|20130811|     SEO| null|      G|   null|      Y|   null| 1991.0|     D/S|     M|  null|   null|  3.73679633E9|00296|      F1|
| 15.0|2016.0|   4.0| 101.0| 101.0|    WAS|20545.0|    1.0|     MI|20691.0|  55.0|    2.0|  1.0|20160401|    nul

In [50]:
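# Clean temperature data
df_temperature = spark.read.format("csv").option("header", "true").load("../../data2/GlobalLandTemperaturesByCity.csv")
# Drop rows without an average temperature (comparing a null value with != also evaluates to null, so those rows are filtered out)
df_temperature = df_temperature.filter(df_temperature.AverageTemperature != 'NaN')
# Keep a single row per (City, Country) pair
df_temperature = df_temperature.dropDuplicates(['City', 'Country'])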
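
`dropDuplicates(['City', 'Country'])` keeps a single reading per city, and the code does not control which one, so the retained temperature may come from any month or year. If a representative value is wanted instead, one option is to average the readings per city. The sketch below shows that idea in plain Python on made-up rows; it is an alternative to the cleaning step above, not a description of it. In Spark the same idea would be a `groupBy` on City and Country followed by an average.

```python
from collections import defaultdict

# Made-up rows: (City, Country, AverageTemperature as read from the CSV).
# None stands for a missing reading.
rows = [
    ("Perth", "Australia", "15.0"),
    ("Perth", "Australia", None),
    ("Perth", "Australia", "16.0"),
    ("Seattle", "United States", "-2.0"),
]

readings = defaultdict(list)
for city, country, temp in rows:
    if temp is None:
        continue  # drop missing readings, as the cleaning step does
    readings[(city, country)].append(float(temp))

# One averaged value per (City, Country) instead of one arbitrary row.
mean_temperature = {key: sum(vals) / len(vals) for key, vals in readings.items()}

print(mean_temperature[("Perth", "Australia")])  # 15.5
```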

In [51]:
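# UDF to get i94port for a city
@udf()
def get_i94port(city):
    '''
    Params: 
        city: City name
    Output: Corresponding i94port, or None if no port description contains the city name
    '''
    # Guard against null city values, which would fail on .lower()
    if city is None:
        return None
    for key in i94port_valid:
        if city.lower() in i94port_valid[key][0].lower():
            return key
    return None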
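
Because the UDF uses a substring test, a city name matches any port whose description merely contains it. A stricter variant, sketched below in plain Python, compares the city with the part of the description before the first comma. The helpers `match_port_substring` and `match_port_exact` and the sample dictionary are hypothetical, and the "CITY, STATE" description layout is an assumption.

```python
# Hypothetical excerpt in the same shape as i94port_valid: code -> [description].
sample_ports = {
    "NYC": ["NEW YORK, NY"],
    "YOR": ["YORK, PA"],  # made-up entry to show the substring pitfall
}

def match_port_substring(city, ports):
    """Mirror of the substring logic used in get_i94port."""
    for code, values in ports.items():
        if city.lower() in values[0].lower():
            return code
    return None

def match_port_exact(city, ports):
    """Match only when the city equals the text before the first comma."""
    wanted = city.strip().lower()
    for code, values in ports.items():
        if values[0].split(",")[0].strip().lower() == wanted:
            return code
    return None

print(match_port_substring("York", sample_ports))  # NYC
print(match_port_exact("York", sample_ports))      # YOR
```

The exact match trades recall for precision: cities whose port description is spelled differently would no longer be mapped.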

In [52]:
# Add i94port code to temperature dataframe
df_temperature = df_temperature.withColumn("i94port", get_i94port(df_temperature.City))
# Keep only cities that could be mapped to a port
df_temperature = df_temperature.filter(df_temperature.i94port.isNotNull())
df_temperature.show()

+----------+-------------------+-----------------------------+---------+--------------------+--------+---------+-------+
|        dt| AverageTemperature|AverageTemperatureUncertainty|     City|             Country|Latitude|Longitude|i94port|
+----------+-------------------+-----------------------------+---------+--------------------+--------+---------+-------+
|1852-07-01|             15.488|                        1.395|    Perth|           Australia|  31.35S|  114.97E|    PER|
|1828-01-01|             -1.977|                        2.551|  Seattle|       United States|  47.42N|  121.97W|    SEA|
|1743-11-01|              2.767|                        1.905| Hamilton|              Canada|  42.59N|   80.73W|    HAM|
|1849-01-01|  7.399999999999999|                        2.699|  Ontario|       United States|  34.56N|  116.76W|    ONT|
|1821-11-01|              2.322|                        2.375|  Spokane|       United States|  47.42N|  117.24W|    SPO|
|1843-01-01| 18.874000000000002|

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
<b>Dimension Tables</b>:<br/>
    1. <i>Immigration_dimension</i>: I94 immigration data <br/>
        Columns:
            <code>
            i94yr = 4 digit year
            i94mon = numeric month
            i94cit = 3 digit code of origin country (country of citizenship)
            i94port = 3 character code of destination city
            arrdate = arrival date
            i94mode = 1 digit travel code
            depdate = departure date
            i94visa = reason for immigration 
            </code>
            <br/><br/>
    2. <i>Temperature_dimension</i>: City <-> Temperature data<br/>
         Columns:
            <code>
            i94port = 3 character code of destination city (mapped from cleaned up immigration data)
            AverageTemperature = average temperature
            City = city name
            Country = country name
            Latitude = latitude
            Longitude = longitude
            </code>
            <br/><br/><br/>
<b>Fact table</b>:<br/>
    <i>immigration_city_temperature_fact</i>: Fact table built by joining the immigration and temperature dimension tables on i94port <br/>
        Columns:
            <code>
            i94yr = 4 digit year
            i94mon = numeric month
            i94cit = 3 digit code of origin country (country of citizenship)
            i94port = 3 character code of destination city
            arrdate = arrival date
            i94mode = 1 digit travel code
            depdate = departure date
            i94visa = reason for immigration
            AverageTemperature = average temperature of destination city
            </code>


#### 3.2 Mapping Out Data Pipelines
Pipeline Steps:

    1. Clean I94 immigration data
    2. Clean temperature data
    3. Create immigration dimension table by selecting relevant columns from df_immigration and write to parquet file partitioned by i94port
    4. Create temperature dimension table by selecting relevant columns from df_temperature and write to parquet file partitioned by i94port
    5. Create fact table by joining immigration and temperature dimension tables on i94port and write to parquet file partitioned by i94port

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [54]:
# Load and clean the immigration data, then write it as parquet partitioned by i94port in the results folder
immigration_data = '/data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
df_immigration = clean_i94_data(immigration_data)
immigration_table = df_immigration.select(["i94yr", "i94mon", "i94cit", "i94port", "arrdate", "i94mode", "depdate", "i94visa"])
immigration_table.write.mode("append").partitionBy("i94port").parquet("/results/immigration.parquet")

In [55]:
# Select columns from temperature dataframe and write as parquet in results folder partitioned by i94port
temperature_table = df_temperature.select(["AverageTemperature", "City", "Country", "Latitude", "Longitude", "i94port"])
temperature_table.write.mode("append").partitionBy("i94port").parquet("/results/temperature.parquet")

In [56]:
# Create a fact table by joining the 2 dataframes by first creating a temporary view for each and 
# write to results folder as parquet file partitioned by i94port
df_immigration.createOrReplaceTempView("immigration_view")
df_temperature.createOrReplaceTempView("temperature_view")

immigration_temperature_fact_table = spark.sql('''
select immigration_view.i94yr as year,
       immigration_view.i94mon as month,
       immigration_view.i94cit as city,
       immigration_view.i94port as i94port,
       immigration_view.arrdate as arrival_date,
       immigration_view.depdate as departure_date,
       immigration_view.i94visa as reason,
       temperature_view.AverageTemperature as temperature,
       temperature_view.Latitude as latitude,
       temperature_view.Longitude as longitude
from immigration_view
JOIN temperature_view ON (immigration_view.i94port = temperature_view.i94port)
''')

immigration_temperature_fact_table.write.mode("append").partitionBy("i94port").parquet("/results/immigration_temperature_fact.parquet")

#### 4.2 Data Quality Checks
The following data quality checks are performed to ensure the pipeline ran as expected:
 * Source/Count checks to ensure completeness
 
Run Quality Checks

In [58]:
def count_rows(output_file_path, description):
    '''
    Params: 
        output_file_path: Output parquet file to read and check for rows
        description: Label used in the printed message
    Output: Prints whether the quality check passed, i.e. whether the result contains any rows
    '''
    dataframe = spark.read.parquet(output_file_path)
    result = dataframe.count()
    if result == 0:
        print("Data quality check failed for {} with zero records".format(description))
    else:
        print("Data quality check passed for {} with {} records".format(description, result))

count_rows("/results/immigration.parquet", "Immigration data check")
count_rows("/results/temperature.parquet", "Temperature data check")
count_rows("/results/immigration_temperature_fact.parquet", "Immigration Temperature Fact data check")


Data quality check passed for Immigration data check with 3088544 records
Data quality check passed for Temperature data check with 207 records
Data quality check passed for Immigration Temperature Fact data check with 4130593 records
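
A row count alone cannot detect duplicated rows. The fact table above has more rows than the immigration table. One possible cause is several temperature rows sharing the same `i94port`, since an inner join then repeats each matching immigration row; re-running the `append` writes would be another. The plain-Python sketch below shows the effect and a simple uniqueness check on made-up rows. The helper `duplicate_keys` is hypothetical.

```python
from collections import Counter

# Made-up rows: (i94port, value). Two temperature rows share the key "HAM".
immigration = [("HAM", "traveller 1"), ("SEA", "traveller 2")]
temperature = [("HAM", 2.767), ("HAM", 21.3), ("SEA", -1.977)]

def duplicate_keys(rows):
    """Return the join keys that occur more than once, sorted."""
    counts = Counter(key for key, _ in rows)
    return sorted(key for key, n in counts.items() if n > 1)

# Inner join on the key: every matching pair produces one output row.
joined = [(k1, a, b) for k1, a in immigration for k2, b in temperature if k1 == k2]

print(duplicate_keys(temperature))    # ['HAM']
print(len(immigration), len(joined))  # 2 3
```

Running an equivalent uniqueness check on the temperature table before the join would show whether this explanation applies here.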


#### 4.3 Data dictionary 
<b>Dimension Tables</b>:<br/>
    1. <i>Immigration_dimension</i>: I94 immigration data <br/>
        Columns:
            <code>
            i94yr = 4 digit year
            i94mon = numeric month
            i94cit = 3 digit code of origin country (country of citizenship)
            i94port = 3 character code of destination city
            arrdate = arrival date
            i94mode = 1 digit travel code
            depdate = departure date
            i94visa = reason for immigration 
            </code>
            <br/><br/>
    2. <i>Temperature_dimension</i>: City <-> Temperature data<br/>
         Columns:
            <code>
            i94port = 3 character code of destination city (mapped from cleaned up immigration data)
            AverageTemperature = average temperature
            City = city name
            Country = country name
            Latitude = latitude
            Longitude = longitude
            </code>
            <br/><br/><br/>
<b>Fact table</b>:<br/>
    <i>immigration_city_temperature_fact</i>: Fact table built by joining the immigration and temperature dimension tables on i94port <br/>
        Columns:
            <code>
            i94yr = 4 digit year
            i94mon = numeric month
            i94cit = 3 digit code of origin country (country of citizenship)
            i94port = 3 character code of destination city
            arrdate = arrival date
            i94mode = 1 digit travel code
            depdate = departure date
            i94visa = reason for immigration
            AverageTemperature = average temperature of destination city
            </code>


### Step 5: Complete Project Write Up
* Rationale for the choice of tools and technologies for the project: <br/>
    We used the Python pandas library to explore and test the dataset.
    We used Spark to read and transform the dataset of over a million rows, as it is designed for big data problems and handles large volumes of data by processing them in parallel. It also supports reading multiple file formats such as CSV, JSON and Parquet and, in our case through the spark-sas7bdat package, sas7bdat for the immigration data provided by the source.
* Propose how often the data should be updated and why<br/>
    The source immigration data is published monthly and quarterly, so we can update our data store by running the ETL monthly.
* Description of how you would approach the problem differently under the following scenarios:<br/>
 * The data was increased by 100x:<br/>
     In such cases, we can write the output parquet files to Amazon S3, which scales with the data volume and offers cost-effective, resilient cloud storage.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day<br/>
     This can be achieved by running the ETL process in Airflow and scheduling it so that it completes before 7am every day.
 * The database needed to be accessed by 100+ people<br/>
     The dimension and fact tables can be created in Redshift. Copying S3 data to Redshift is easy and fast, and Redshift scales to serve numerous concurrent users.
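
For the monthly update proposed above, the ETL needs the input file for the month being processed. A minimal sketch follows, assuming the other monthly files follow the same naming pattern as `i94_apr16_sub.sas7bdat`; only the April file is used in this notebook, and the helper `i94_monthly_filename` is hypothetical.

```python
# Lowercase month abbreviations, assumed to match the file naming pattern.
MONTHS = ("jan", "feb", "mar", "apr", "may", "jun",
          "jul", "aug", "sep", "oct", "nov", "dec")

def i94_monthly_filename(year, month):
    """Build a file name following the 'i94_apr16_sub.sas7bdat' pattern."""
    if not 1 <= month <= 12:
        raise ValueError("month must be between 1 and 12")
    return "i94_{}{:02d}_sub.sas7bdat".format(MONTHS[month - 1], year % 100)

print(i94_monthly_filename(2016, 4))  # i94_apr16_sub.sas7bdat
```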
     