## Immigration & Weather Analysis
### Data Engineering Capstone Project

#### Project Summary
We use two data sources, I94 immigration data (in SAS format) and a world temperature dataset (in CSV format), to create two dimension tables and one fact table that joins the two datasets. The resulting model helps us determine whether there is any correlation between the place visited and its weather.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
import pandas as pd
import psycopg2
from collections import defaultdict
from datetime import datetime, timedelta


### Step 1: Scope the Project and Gather Data

#### Scope 
Explain what you plan to do in the project in more detail. What data do you use? What does your end solution look like? What tools did you use?

#### Describe and Gather Data 
Describe the data sets you're using. Where did it come from? What type of information is included? 

The datasets used in this project are:
* I94 Immigration Data: This data comes from the US National Tourism and Trade Office. A data dictionary is included in the workspace.
* World Temperature Data: This dataset comes from Kaggle.

For a detailed data dictionary, refer to the file I94_SAS_Labels_Descriptions.SAS in the workspace.

Note: The file porti94.txt was created from the data dictionary and is used to check for valid ports. It can be considered a third, manually created data source (a .txt file).
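
As a rough illustration of how a file like porti94.txt could be produced, the sketch below parses port lines of the form `'CODE' = 'DESCRIPTION'` into a JSON-ready dictionary. The line layout, the sample lines, and the helper name `parse_port_labels` are assumptions for illustration, not the exact procedure used here. Each value is stored as a one-element list because the lookup code later in this notebook reads `i94portvalid[key][0]`.

```python
import json
import re

# Assumed layout of a port line in the label file:   'ALC'	=	'ALCAN, AK             '
PORT_LINE = re.compile(r"^\s*'([^']+)'\s*=\s*'([^']*)'")

def parse_port_labels(text):
    """Return {port_code: [description]} for every line that matches PORT_LINE."""
    ports = {}
    for line in text.splitlines():
        match = PORT_LINE.match(line)
        if match:
            code, label = match.groups()
            ports[code.strip()] = [label.strip()]
    return ports

# Illustrative lines only; pass the port section of the label file in practice
sample = """
   'ALC'	=	'ALCAN, AK             '
   'DAL'	=	'DALLAS, TX            '
"""
ports = parse_port_labels(sample)
print(json.dumps(ports, indent=2))
```

Only the port section of the label file should be passed in, since other sections of the data dictionary use a similar `code = 'label'` layout.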


In [2]:
# Read in the temperature data
temp_data = '../../data2/GlobalLandTemperaturesByCity.csv'
df_temp_data = pd.read_csv(temp_data, sep=',')

In [3]:
df_temp_data.head(10)

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E
5,1744-04-01,5.788,3.624,Århus,Denmark,57.05N,10.33E
6,1744-05-01,10.644,1.283,Århus,Denmark,57.05N,10.33E
7,1744-06-01,14.051,1.347,Århus,Denmark,57.05N,10.33E
8,1744-07-01,16.082,1.396,Århus,Denmark,57.05N,10.33E
9,1744-08-01,,,Århus,Denmark,57.05N,10.33E


In [4]:
# Read in the data here
i94_df = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
df_imm = pd.read_sas(i94_df, 'sas7bdat', encoding="ISO-8859-1")

In [5]:
df_imm.head(10)

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,6.0,2016.0,4.0,692.0,692.0,XXX,20573.0,,,,...,U,,1979.0,10282016,,,,1897628000.0,,B2
1,7.0,2016.0,4.0,254.0,276.0,ATL,20551.0,1.0,AL,,...,Y,,1991.0,D/S,M,,,3736796000.0,296.0,F1
2,15.0,2016.0,4.0,101.0,101.0,WAS,20545.0,1.0,MI,20691.0,...,,M,1961.0,09302016,M,,OS,666643200.0,93.0,B2
3,16.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,1988.0,09302016,,,AA,92468460000.0,199.0,B2
4,17.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,2012.0,09302016,,,AA,92468460000.0,199.0,B2
5,18.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MI,20555.0,...,,M,1959.0,09302016,,,AZ,92471040000.0,602.0,B1
6,19.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,NJ,20558.0,...,,M,1953.0,09302016,,,AZ,92471400000.0,602.0,B2
7,20.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,NJ,20558.0,...,,M,1959.0,09302016,,,AZ,92471610000.0,602.0,B2
8,21.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,NY,20553.0,...,,M,1970.0,09302016,,,AZ,92470800000.0,602.0,B2
9,22.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,NY,20562.0,...,,M,1968.0,09302016,,,AZ,92478490000.0,608.0,B1


The code below does not run; the issue has been raised with Udacity Support.

In [2]:
from pyspark.sql.functions import udf
from pyspark.sql import SparkSession
spark = SparkSession.builder.\
config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11")\
.enableHiveSupport().getOrCreate()


#df_spark =spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')


In [3]:
#write to parquet
#df_spark.write.parquet("sas_data")
#df_spark=spark.read.parquet("sas_data")

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.
1. We need to identify all the invalid ports in the I94 dataset and remove those rows
2. We need to remove all temperature rows whose average temperature is NaN
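
Before cleaning, it helps to measure how large each issue is. The sketch below counts both kinds of problem on small samples using plain Python, so it runs without Spark. The CSV rows are copied from the temperature preview above; the set of valid ports and the helper names are hypothetical and only serve the illustration.

```python
import csv
import io

# A few rows of the temperature file, as shown in the preview above
sample_csv = """dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1743-12-01,,,Århus,Denmark,57.05N,10.33E
1744-01-01,,,Århus,Denmark,57.05N,10.33E
1744-04-01,5.788,3.624,Århus,Denmark,57.05N,10.33E
"""

def count_missing(rows, column):
    """Count rows whose value in `column` is empty or the literal 'NaN'."""
    return sum(1 for row in rows if row[column] in ("", "NaN"))

def count_invalid_ports(port_codes, valid_ports):
    """Count port codes that are not in the set of valid ports."""
    return sum(1 for code in port_codes if code not in valid_ports)

rows = list(csv.DictReader(io.StringIO(sample_csv)))
missing = count_missing(rows, "AverageTemperature")

# Hypothetical valid-port set, for illustration only
valid = {"ATL", "WAS", "NYC"}
invalid = count_invalid_ports(["XXX", "ATL", "WAS", "NYC"], valid)

print(missing, invalid)
```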


#### Cleaning Steps
Document steps necessary to clean the data
1. Create a dictionary of valid ports
2. Create function to weed out invalid ports
3. Pass dataset through this function to get cleansed data
4. Clean the temperature dataset by filtering out the rows that have 'NaN' values
5. Create a function that returns the corresponding port code for a city in the temperature dataset
6. Add a new column to the temperature dataset that holds the port code for each city
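
Steps 5 and 6 depend on a city-to-port lookup. A minimal plain-Python sketch of that lookup is shown below, assuming the port dictionary maps each code to a list whose first element is the port description. The function name `find_port` and the sample entries are made up for illustration.

```python
def find_port(city, valid_ports):
    """Return the first port code whose description contains the city name."""
    if not city:  # None or empty string: nothing to match
        return None
    needle = city.lower()
    for code, labels in valid_ports.items():
        if needle in labels[0].lower():
            return code
    return None

# Illustrative entries only
valid_ports = {"DAL": ["DALLAS, TX"], "ALC": ["ALCAN, AK"]}

print(find_port("Dallas", valid_ports))
print(find_port("Paris", valid_ports))
print(find_port(None, valid_ports))
```

Because the match is a substring test on the city name alone, the Country column is ignored, so same-named cities in different countries receive the same port code. Comparing on both city and state or country would be stricter, at the cost of fewer matches.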

In [4]:
# Performing cleaning tasks here


# valid ports - This JSON file was created from the SAS file for one time reference
import json

i94portvalid = {}
with open('porti94.txt') as f:
    contents = f.read()
    i94portvalid = json.loads(contents)

In [5]:
# Define immigration data cleansing function
def clean_imm_data(file):
    '''    
    This function takes the immigration data file & outputs the dataframe with valid i94 ports 
    '''    
    # Read I94 data into Spark
    df_imm = spark.read.format('com.github.saurfang.sas.spark').load(file)

    # Filter out entries where i94port is invalid
    df_imm = df_imm.filter(df_imm.i94port.isin(list(i94portvalid.keys())))

    return df_imm

In [6]:
# Clean temperature data
df_temp_data = spark.read.format("csv").option("header", "true").load("../../data2/GlobalLandTemperaturesByCity.csv")

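# Drop rows with a missing average temperature (empty CSV fields are read as null)
df_temp_data = df_temp_data.filter(df_temp_data.AverageTemperature.isNotNull() & (df_temp_data.AverageTemperature != 'NaN'))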

In [7]:
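@udf()
def get_port(city):
    # Return the first port code whose description contains the city name
    if city is None:
        return None
    for key in i94portvalid:
        if city.lower() in i94portvalid[key][0].lower():
            return key
    return None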

In [8]:
# Add port information to temperature data
df_temp_data = df_temp_data.withColumn("i94port", get_port(df_temp_data.City))
df_temp_data = df_temp_data.filter(df_temp_data.i94port.isNotNull())
df_temp_data.show()

+----------+------------------+-----------------------------+--------+--------------+--------+---------+-------+
|        dt|AverageTemperature|AverageTemperatureUncertainty|    City|       Country|Latitude|Longitude|i94port|
+----------+------------------+-----------------------------+--------+--------------+--------+---------+-------+
|1743-11-01|             8.758|                        1.886|Aberdeen|United Kingdom|  57.05N|    1.48W|    ABE|
|1744-04-01|6.0699999999999985|           2.9339999999999997|Aberdeen|United Kingdom|  57.05N|    1.48W|    ABE|
|1744-05-01|             7.751|                        1.494|Aberdeen|United Kingdom|  57.05N|    1.48W|    ABE|
|1744-06-01|             10.62|                        1.574|Aberdeen|United Kingdom|  57.05N|    1.48W|    ABE|
|1744-07-01|             12.35|                        1.591|Aberdeen|United Kingdom|  57.05N|    1.48W|    ABE|
|1744-09-01|            11.224|           1.6059999999999999|Aberdeen|United Kingdom|  57.05N|  

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model

Dimension Table 1 - Immigration Table

* i94yr -> 4-digit year
* i94mon -> numeric month
* i94cit -> country of citizenship code
* i94port -> port code
* arrdate -> arrival date (SAS date number)
* i94mode -> mode of travel code
* depdate -> departure date (SAS date number)
* i94visa -> visa category code (reason for travel)
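
The arrdate and depdate values in the immigration preview (for example 20545.0) are SAS date numbers, which count days since 1960-01-01. The sketch below converts them to calendar dates. The helper name `sas_to_date` is hypothetical, and treating missing values as `None` is an assumption about how they should be handled.

```python
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)  # day 0 of the SAS date scale

def sas_to_date(days):
    """Convert a SAS date number to a date; return None for missing values."""
    if days is None or days != days:  # NaN is the only value not equal to itself
        return None
    return SAS_EPOCH + timedelta(days=int(days))

print(sas_to_date(20545.0))  # 2016-04-01
```

The result falls in April 2016, which agrees with the i94yr and i94mon values of the same rows.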



Dimension Table 2 - Temperature Table

* AverageTemperature -> average temperature
* City -> City
* Country -> Country
* Latitude -> Latitude
* Longitude -> Longitude
* i94port -> port code



Fact Table - Join both datasets on Port Code

* i94yr -> year
* i94mon -> month
* i94cit -> city (country of citizenship code)
* i94port -> i94port
* arrdate -> arrival_date
* depdate -> departure_date
* i94visa -> reason
* AverageTemperature -> temperature
* Latitude -> latitude
* Longitude -> longitude




#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

1. Load, clean & create Immigration data as required
2. Create Temperature Table
3. Create temporary views of both tables 
4. Use the temporary views to create the fact table
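
Steps 3 and 4 can be tried out on a tiny scale before running them in Spark. The sketch below uses Python's built-in sqlite3 module in place of Spark SQL, purely so that it runs anywhere; the table names mirror the temporary views, the column list is shortened, and all rows are made up for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE immigration_view (i94yr REAL, i94mon REAL, i94port TEXT, arrdate REAL);
CREATE TABLE temperature_view (AverageTemperature REAL, City TEXT, i94port TEXT);
""")

# Made-up rows: two Dallas arrivals, one New York arrival, two Dallas temperatures
conn.executemany(
    "INSERT INTO immigration_view VALUES (?, ?, ?, ?)",
    [(2016.0, 4.0, "DAL", 20545.0), (2016.0, 4.0, "DAL", 20546.0), (2016.0, 4.0, "NYC", 20545.0)],
)
conn.executemany(
    "INSERT INTO temperature_view VALUES (?, ?, ?)",
    [(18.5, "Dallas", "DAL"), (19.1, "Dallas", "DAL")],
)

fact_rows = conn.execute("""
SELECT i.i94yr AS year, i.i94mon AS month, i.i94port AS i94port,
       i.arrdate AS arrival_date, t.AverageTemperature AS temperature
FROM immigration_view AS i
JOIN temperature_view AS t ON i.i94port = t.i94port
WHERE i.i94port = 'DAL'
""").fetchall()

print(len(fact_rows))  # 4
```

Since the join key is only i94port, every immigration row pairs with every temperature row for that port, which is why two arrivals and two temperature readings produce four fact rows.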

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [9]:
# Write code here
#1 Load, clean & select Immigration data as required

imm_data = '/data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
df_imm = clean_imm_data(imm_data)
imm_table = df_imm.select(["i94yr", "i94mon", "i94cit", "i94port", "arrdate", "i94mode", "depdate", "i94visa"])
imm_table.write.mode("append").partitionBy("i94port").parquet("/results/immigration.parquet")


In [None]:
#2 Create Temperature Table

temperature_table = df_temp_data.select(["AverageTemperature", "City", "Country", "Latitude", "Longitude", "i94port"])
temperature_table.write.mode("append").partitionBy("i94port").parquet("/results/temperature.parquet")

In [None]:
# Create temporary views of the immigration and temperature data

df_imm.where("i94port = 'DAL'").createOrReplaceTempView("immigration_view")
df_temp_data.where("i94port = 'DAL'").createOrReplaceTempView("temperature_view")

In [None]:
# Test ONLY
#spark.sql('select * from immigration_view join temperature_view on immigration_view.i94port = temperature_view.i94port')

#spark.sql('select * from temperature_view')

In [None]:
# Create the fact table by joining the immigration and temperature views (only checking Dallas, TX due to performance issues)

fact_table = spark.sql('''
select immigration_view.i94yr as year,
       immigration_view.i94mon as month,
       immigration_view.i94cit as city,
       immigration_view.i94port as i94port,
       immigration_view.arrdate as arrival_date,
       immigration_view.depdate as departure_date,
       immigration_view.i94visa as reason,
       temperature_view.AverageTemperature as temperature,
       temperature_view.Latitude as latitude,
       temperature_view.Longitude as longitude
from immigration_view
JOIN temperature_view ON immigration_view.i94port = temperature_view.i94port
WHERE immigration_view.i94port  in ('DAL')
''')

fact_table.write.mode("append").partitionBy("i94port").parquet("/results/fact.parquet")

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
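
Two of the checks listed above, a completeness check and a unique-key check, might look like the sketch below. It works on plain lists of dictionaries so it can be run without Spark. The helper names are hypothetical, and using cicid as the record identifier is an assumption.

```python
def check_not_empty(rows, name):
    """Completeness check: fail loudly when a table has no rows."""
    if len(rows) == 0:
        raise ValueError("{} has zero records".format(name))
    return len(rows)

def check_unique(rows, key, name):
    """Integrity check: every value of `key` must appear exactly once."""
    values = [row[key] for row in rows]
    duplicates = len(values) - len(set(values))
    if duplicates:
        raise ValueError("{} has {} duplicate {} values".format(name, duplicates, key))
    return len(values)

# Illustrative rows; cicid is assumed to identify a record
immigration_rows = [{"cicid": 6.0, "i94port": "XXX"}, {"cicid": 7.0, "i94port": "ATL"}]

print(check_not_empty(immigration_rows, "immigration table"))
print(check_unique(immigration_rows, "cicid", "immigration table"))
```

Raising an exception instead of printing a message lets a scheduler or test runner stop the pipeline as soon as a check fails.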
 
Run Quality Checks

In [None]:
# Perform quality checks here

def quality_check_func(df, desc):
    '''
    This function inputs the dataframe and its description and outputs the result of the data quality check
    '''
    
    result = df.count()
    if result == 0:
        print("Data quality check failed for {} with zero records".format(desc))
    else:
        print("Data quality check passed for {} with {} records".format(desc, result))
    return 0

In [None]:
# Perform quality check
quality_check_func(df_imm, "immigration table")
quality_check_func(df_temp_data, "temperature table")

#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

A data dictionary is provided in the workspace as a .SAS file, as discussed above. Section 3.1 also explains all the fields used in the project's tables.

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
-> I used Spark in this project because it handles multiple file formats (here SAS, via the spark-sas7bdat package, and CSV), and the same code can be adapted to scale to larger data.

* Propose how often the data should be updated and why.
-> The grain of the model is monthly, so the data should be updated on a monthly schedule.

* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
-> We would move to a managed data warehouse service such as Amazon Redshift, which is designed to handle large datasets.

 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
-> We can use a scheduling tool such as Airflow to achieve this target.

 * The database needed to be accessed by 100+ people.
-> The same Redshift database can be used to satisfy this requirement as well.

THE END