# Project Title
### Data Engineering Capstone Project

#### Project Summary
In this project, I94 immigration data and city temperature data are used to build a database for querying and analyzing immigration events. An ETL pipeline is built to create the database from these two sources, and the database is then used to assess how immigration behaviour relates to the temperature at the destination.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

```python
# Do all imports and installs here
import re
import pandas as pd
import psycopg2
from collections import defaultdict
from datetime import datetime, timedelta
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
pd.set_option('display.max_columns', None)
```

### Step 1: Scope the Project and Gather Data

#### Scope 
1. In this project, we will aggregate the immigration data by destination city to form the immigration dimension table.
2. Similarly, we will create the second dimension table, i.e. the temperature table, aggregated by city.
3. Lastly, a fact table will be created by joining the immigration and temperature tables to obtain a star schema. This makes it possible to determine whether temperature affects the selection of destination cities for immigration.
4. All processing of the provided datasets is done with Spark because of its ability to handle multiple data formats.

#### Describe and Gather Data 
This project uses the following datasets:

1. I94 Immigration Data: This data comes from the US National Tourism and Trade Office. [Link](https://travel.trade.gov/research/reports/i94/historical/2016.html)
2. World Temperature Data: This dataset comes from Kaggle. [Link](https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data)

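```python
# Read in the data here
fname = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
# Pass format by keyword: newer pandas releases make every read_sas argument after the path keyword-only
df_immigration = pd.read_sas(fname, format='sas7bdat', encoding="ISO-8859-1")
```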

```python
df_immigration.head()
```

| |cicid|i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear|dtaddto|gender|insnum|airline|admnum|fltno|visatype|
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|0|6.0|2016.0|4.0|692.0|692.0|XXX|20573.0||||37.0|2.0|1.0||||T||U||1979.0|10282016||||1897628000.0||B2|
|1|7.0|2016.0|4.0|254.0|276.0|ATL|20551.0|1.0|AL||25.0|3.0|1.0|20130811.0|SEO||G||Y||1991.0|D/S|M|||3736796000.0|296.0|F1|
|2|15.0|2016.0|4.0|101.0|101.0|WAS|20545.0|1.0|MI|20691.0|55.0|2.0|1.0|20160401.0|||T|O||M|1961.0|09302016|M||OS|666643200.0|93.0|B2|
|3|16.0|2016.0|4.0|101.0|101.0|NYC|20545.0|1.0|MA|20567.0|28.0|2.0|1.0|20160401.0|||O|O||M|1988.0|09302016|||AA|92468460000.0|199.0|B2|
|4|17.0|2016.0|4.0|101.0|101.0|NYC|20545.0|1.0|MA|20567.0|4.0|2.0|1.0|20160401.0|||O|O||M|2012.0|09302016|||AA|92468460000.0|199.0|B2|


```python
spark = SparkSession.builder \
    .config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11") \
    .enableHiveSupport() \
    .getOrCreate()
```

```python
fname = '../../data2/GlobalLandTemperaturesByCity.csv'
df_temperatures = pd.read_csv(fname)
```

```python
df_temperatures.head()
```

| |dt|AverageTemperature|AverageTemperatureUncertainty|City|Country|Latitude|Longitude|
|---|---|---|---|---|---|---|---|
|0|1743-11-01|6.068|1.737|Århus|Denmark|57.05N|10.33E|
|1|1743-12-01|||Århus|Denmark|57.05N|10.33E|
|2|1744-01-01|||Århus|Denmark|57.05N|10.33E|
|3|1744-02-01|||Århus|Denmark|57.05N|10.33E|
|4|1744-03-01|||Århus|Denmark|57.05N|10.33E|


### Step 2: Explore and Assess the Data
#### Explore the Data 
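The outputs below reveal the main data quality issues: the temperature file has many rows with a missing AverageTemperature and one row per city per month (so each city appears many times), and each immigration record's i94port has to be checked against the list of valid I94 port codes.

#### Cleaning Steps
1. Immigration data: keep only records whose i94port appears in the I94 port list read from I94ports.txt.
2. Temperature data: drop rows without an AverageTemperature value and keep one row per (City, Country).
3. Temperature data: map each city to an I94 port code and drop cities without a match.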

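```python
# Build a lookup {port code: [port name]} from the I94 port label file.
# Lines that do not match the 'CODE' ... 'NAME' pattern are skipped.
re_obj = re.compile(r'\'(.*)\'.*\'(.*)\'')
I94ports = {}
with open('I94ports.txt') as f:
    for line in f:
        match = re_obj.search(line)
        if match is None:
            continue
        I94ports[match[1]] = [match[2]]
```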
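The cell above assumes every useful line of I94ports.txt has the form 'CODE' = 'NAME' (the exact file layout is an assumption here, inferred from the regex). A slightly stricter parser is sketched below: the code group is non-greedy and the "=" separator is explicit, so a quote inside a port name cannot leak into the code group. Unlike the notebook, which stores each name as a one-element list, this hypothetical parse_port_labels helper stores plain, whitespace-stripped strings.

```python
import re

# Hypothetical stricter pattern: non-greedy code group, explicit "=" separator,
# greedy name group so an apostrophe inside the name is kept.
PORT_LINE = re.compile(r"'(.*?)'\s*=\s*'(.*)'")


def parse_port_labels(lines):
    """Return {port_code: port_name} for lines shaped like 'CODE' = 'NAME'."""
    ports = {}
    for line in lines:
        match = PORT_LINE.search(line)
        if match is None:  # blank lines, comments, anything else
            continue
        ports[match.group(1).strip()] = match.group(2).strip()
    return ports


# Assumed sample lines (tab-separated, padded names)
sample = [
    "\t'ALC'\t=\t'ALCAN, AK             '",
    "\t'SEA'\t=\t'SEATTLE, WA           '",
    "/* not a port line */",
]
print(parse_port_labels(sample))  # {'ALC': 'ALCAN, AK', 'SEA': 'SEATTLE, WA'}
```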

```python
df_immigration_new = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')
df_immigration_new = df_immigration_new.filter(df_immigration_new.i94port.isin(list(I94ports.keys())))
df_immigration_new.limit(20).toPandas()
```

| |cicid|i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear|dtaddto|gender|insnum|airline|admnum|fltno|visatype|
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|0|6.0|2016.0|4.0|692.0|692.0|XXX|20573.0||||37.0|2.0|1.0||||T||U||1979.0|10282016||||1897628000.0||B2|
|1|7.0|2016.0|4.0|254.0|276.0|ATL|20551.0|1.0|AL||25.0|3.0|1.0|20130811.0|SEO||G||Y||1991.0|D/S|M|||3736796000.0|296.0|F1|
|2|15.0|2016.0|4.0|101.0|101.0|WAS|20545.0|1.0|MI|20691.0|55.0|2.0|1.0|20160401.0|||T|O||M|1961.0|09302016|M||OS|666643200.0|93.0|B2|
|3|16.0|2016.0|4.0|101.0|101.0|NYC|20545.0|1.0|MA|20567.0|28.0|2.0|1.0|20160401.0|||O|O||M|1988.0|09302016|||AA|92468460000.0|199.0|B2|
|4|17.0|2016.0|4.0|101.0|101.0|NYC|20545.0|1.0|MA|20567.0|4.0|2.0|1.0|20160401.0|||O|O||M|2012.0|09302016|||AA|92468460000.0|199.0|B2|
|5|18.0|2016.0|4.0|101.0|101.0|NYC|20545.0|1.0|MI|20555.0|57.0|1.0|1.0|20160401.0|||O|O||M|1959.0|09302016|||AZ|92471040000.0|602.0|B1|
|6|19.0|2016.0|4.0|101.0|101.0|NYC|20545.0|1.0|NJ|20558.0|63.0|2.0|1.0|20160401.0|||O|K||M|1953.0|09302016|||AZ|92471400000.0|602.0|B2|
|7|20.0|2016.0|4.0|101.0|101.0|NYC|20545.0|1.0|NJ|20558.0|57.0|2.0|1.0|20160401.0|||O|K||M|1959.0|09302016|||AZ|92471610000.0|602.0|B2|
|8|21.0|2016.0|4.0|101.0|101.0|NYC|20545.0|1.0|NY|20553.0|46.0|2.0|1.0|20160401.0|||O|O||M|1970.0|09302016|||AZ|92470800000.0|602.0|B2|
|9|22.0|2016.0|4.0|101.0|101.0|NYC|20545.0|1.0|NY|20562.0|48.0|1.0|1.0|20160401.0|||O|O||M|1968.0|09302016|||AZ|92478490000.0|608.0|B1|


#### Clean Temperature Data

```python
df_temperature_new = spark.read.format("csv").option("header", "true").load('../../data2/GlobalLandTemperaturesByCity.csv')
df_temperature_new.limit(20).toPandas()
```

| |dt|AverageTemperature|AverageTemperatureUncertainty|City|Country|Latitude|Longitude|
|---|---|---|---|---|---|---|---|
|0|1743-11-01|6.068|1.737|Århus|Denmark|57.05N|10.33E|
|1|1743-12-01|||Århus|Denmark|57.05N|10.33E|
|2|1744-01-01|||Århus|Denmark|57.05N|10.33E|
|3|1744-02-01|||Århus|Denmark|57.05N|10.33E|
|4|1744-03-01|||Århus|Denmark|57.05N|10.33E|
|5|1744-04-01|5.787999999999999|3.624|Århus|Denmark|57.05N|10.33E|
|6|1744-05-01|10.644|1.283|Århus|Denmark|57.05N|10.33E|
|7|1744-06-01|14.050999999999998|1.347|Århus|Denmark|57.05N|10.33E|
|8|1744-07-01|16.082|1.396|Århus|Denmark|57.05N|10.33E|
|9|1744-08-01|||Århus|Denmark|57.05N|10.33E|


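```python
# Keep rows that have a temperature reading (Spark reads empty CSV fields as null),
# then keep one row per (City, Country)
df_temperature_new = df_temperature_new \
    .filter(df_temperature_new.AverageTemperature.isNotNull()) \
    .dropDuplicates(['City', 'Country'])
df_temperature_new.limit(20).toPandas()
```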

| |dt|AverageTemperature|AverageTemperatureUncertainty|City|Country|Latitude|Longitude|
|---|---|---|---|---|---|---|---|
|0|1743-11-01|3.264|1.665|Allentown|United States|40.99N|74.56W|
|1|1779-11-01|0.0119999999999999|2.714|Atyrau|Kazakhstan|47.42N|50.92E|
|2|1825-01-01|26.069000000000003|2.16|Bintulu|Malaysia|2.41N|113.30E|
|3|1825-01-01|26.517|2.584|Butterworth|Malaysia|5.63N|100.09E|
|4|1845-01-01|24.995|1.871|Cainta|Philippines|15.27N|120.83E|
|5|1825-01-01|24.753|2.152|Ciamis|Indonesia|7.23S|107.84E|
|6|1850-01-01|22.121|1.5730000000000002|Dodoma|Tanzania|5.63S|35.52E|
|7|1840-01-01|6.2250000000000005|2.112|Fuling|China|29.74N|107.08E|
|8|1841-01-01|-0.3360000000000001|2.695|Fuyang|China|32.95N|115.85E|
|9|1856-01-01|26.901|1.359|Ife|Nigeria|7.23N|4.05E|
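dropDuplicates(['City', 'Country']) keeps a single, arbitrary monthly reading per city (for example the 1743-11-01 reading for Allentown above), so AverageTemperature ends up as one month's value rather than a city average. If a per-city mean is wanted instead, the logic is sketched below in plain Python on made-up round readings; in Spark the same result would come from a groupBy('City', 'Country') with pyspark.sql.functions.avg over the AverageTemperature column cast to double, which also skips nulls. The average_by_city helper is hypothetical.

```python
from collections import defaultdict


def average_by_city(rows):
    """Mean AverageTemperature per (City, Country), skipping missing readings."""
    totals = defaultdict(float)
    counts = defaultdict(int)
    for row in rows:
        value = row.get("AverageTemperature")
        if value in (None, ""):  # missing readings, like several Århus months above
            continue
        key = (row["City"], row["Country"])
        totals[key] += float(value)
        counts[key] += 1
    return {key: totals[key] / counts[key] for key in totals}


# Made-up readings shaped like the CSV rows (values are strings, as read from the file)
rows = [
    {"City": "Århus", "Country": "Denmark", "AverageTemperature": "6.0"},
    {"City": "Århus", "Country": "Denmark", "AverageTemperature": ""},
    {"City": "Århus", "Country": "Denmark", "AverageTemperature": "4.0"},
]
print(average_by_city(rows))  # {('Århus', 'Denmark'): 5.0}
```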


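```python
@udf()
def get_i94port(city):
    """Return the first I94 port code whose name contains `city` (case-insensitive), else None."""
    if city is None:
        return None
    for key in I94ports:
        if city.lower() in I94ports[key][0].lower():
            return key
    return None
```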
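get_i94port uses a substring test, so short city names can match unrelated ports: in the temperature table further down, Ica (Peru) is assigned CHI, presumably because "ica" occurs inside the Chicago port name. A stricter, hypothetical alternative compares the city with the part of the port name before the first comma. It is written against plain {code: name} strings; with the notebook's I94ports dictionary the name would be I94ports[key][0], and the function could be wrapped with @udf() in the same way.

```python
def match_port(city, ports):
    """Hypothetical stricter lookup: the city must equal the part of the
    port name before the first comma (case-insensitive). Returns None if absent."""
    if not city:
        return None
    target = city.strip().lower()
    for code, name in ports.items():
        if name.split(",")[0].strip().lower() == target:
            return code
    return None


ports = {"CHI": "CHICAGO, IL", "SEA": "SEATTLE, WA"}  # assumed label values
print("ica" in ports["CHI"].lower())  # True: a substring test would accept Ica -> CHI
print(match_port("Ica", ports))       # None
print(match_port("Seattle", ports))   # SEA
```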

```python
df_temperature_new = df_temperature_new.withColumn("i94port", get_i94port(df_temperature_new.City))
# The UDF returns None (SQL null) for cities without a matching port; drop those rows
df_temperature_new = df_temperature_new.filter(df_temperature_new.i94port.isNotNull())
df_temperature_new.show(10)
```

```text
+----------+------------------+-----------------------------+---------+--------------------+--------+---------+-------+
|        dt|AverageTemperature|AverageTemperatureUncertainty|     City|             Country|Latitude|Longitude|i94port|
+----------+------------------+-----------------------------+---------+--------------------+--------+---------+-------+
|1856-01-01|            26.901|                        1.359|      Ife|             Nigeria|   7.23N|    4.05E|    888|
|1852-07-01|            15.488|                        1.395|    Perth|           Australia|  31.35S|  114.97E|    PER|
|1828-01-01|            -1.977|                        2.551|  Seattle|       United States|  47.42N|  121.97W|    SEA|
|1743-11-01|             2.767|                        1.905| Hamilton|              Canada|  42.59N|   80.73W|    HAM|
|1849-01-01| 7.399999999999999|                        2.699|  Ontario|       United States|  34.56N|  116.76W|    ONT|
|1821-11-01|             2.322|         
```

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
#### Fact Table - I94 immigration data joined with the city temperature data on i94port. Columns:

1. i94port = destination port (city) code
2. i94yr = 4-digit year
3. i94mon = numeric month
4. i94cit = 3-digit origin country code
5. arrdate = arrival date (SAS date number)
6. i94mode = mode of arrival code (1 = Air, 2 = Sea, 3 = Land, 9 = Not reported)
7. depdate = departure date from the USA
8. i94visa = visa category code (1 = Business, 2 = Pleasure, 3 = Student)
9. City = city name
10. Latitude = latitude
11. AverageTemperature = average temperature of the destination city
12. Longitude = longitude
13. Country = country name
            
#### Dimension Table - temperature data. Columns:

1. i94port = destination port code, found by matching the city name against the I94 port list
2. AverageTemperature = average temperature
3. City = city name
4. Country = country name
5. Latitude = latitude
6. Longitude = longitude

#### Dimension Table - I94 immigration events. Columns:

1. i94yr = year
2. i94mon = month
3. i94cit = 3-digit origin country code
4. i94port = destination port (city) code
5. arrdate = arrival date
6. i94mode = mode of arrival code
7. i94addr = US state of address (two-letter code)
8. depdate = departure date
9. i94visa = visa category code
10. visatype = type of visa (e.g. B1, B2, F1)
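arrdate and depdate appear to be SAS date numbers, i.e. days counted from 1960-01-01: the sample arrdate 20545 corresponds to 2016-04-01, which matches the dtadfile value 20160401 in the same rows. A minimal conversion using datetime and timedelta (both already imported in the first cell) might look like this; sas_to_date is a hypothetical helper name.

```python
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)  # SAS dates count days from this day


def sas_to_date(days):
    """Convert a SAS date number such as arrdate=20545.0 to a datetime.date.
    Returns None for missing values (None or NaN, e.g. an empty depdate)."""
    if days is None or days != days:  # NaN is the only value not equal to itself
        return None
    return SAS_EPOCH + timedelta(days=int(days))


print(sas_to_date(20545.0))  # 2016-04-01
print(sas_to_date(20567.0))  # 2016-04-23
```

In Spark the same function could be registered with @udf() like get_i94port, or the arithmetic could be done in Spark SQL.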

#### Use Case
The data model above is used to determine whether temperature affects the selection of destination cities for immigration; for that purpose it is organized as a star schema with two dimension tables and one fact table, as described above.
The model can also be used to query the temperature of popular immigration destinations, which could be useful for both immigrants and regulators. Regulators can also look up arrival and departure dates, visa categories and modes of entry to improve decision making.
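As an illustration of the intended use, the query below counts arrivals per destination next to that destination's temperature. It runs against a tiny in-memory SQLite table that borrows the fact table's column names; the rows are made up (only the temperatures for SEA and SPO are taken from the sample output in Step 4). The same SELECT should also work through spark.sql against the facts view registered in Step 4.

```python
import sqlite3

# In-memory stand-in for the fact table, reusing its column names.
# The rows are made up for illustration.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE facts (destination_city_code TEXT, City TEXT, Temperature REAL, visa REAL)"
)
conn.executemany(
    "INSERT INTO facts VALUES (?, ?, ?, ?)",
    [
        ("SEA", "Seattle", -1.977, 2.0),
        ("SEA", "Seattle", -1.977, 1.0),
        ("SPO", "Spokane", 2.322, 2.0),
    ],
)

# Arrivals per destination alongside the destination's temperature.
query = """
    SELECT destination_city_code, City, Temperature, COUNT(*) AS arrivals
    FROM facts
    GROUP BY destination_city_code, City, Temperature
    ORDER BY arrivals DESC
"""
results = conn.execute(query).fetchall()
for row in results:
    print(row)
```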

#### 3.2 Mapping Out Data Pipelines

1. Immigration data is cleaned by removing records whose i94port is not a valid port code.
2. Temperature data is cleaned by removing rows without a valid AverageTemperature, dropping duplicates on City and Country, and mapping each city to an I94 port code.
3. The temperature dimension table is created from the cleaned temperature data and written to Parquet.
4. The immigration dimension table is created from the cleaned immigration data and written to Parquet.
5. The fact table is created by joining the temperature and immigration dimension tables on i94port and written to Parquet.

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

#### First we will build the Temperature dimension Table

```python
df_temperature_new.createOrReplaceTempView('Temperature')
```

```python
query = """SELECT i94port, AverageTemperature AS Average_Temerature, City, Country, Latitude, Longitude,
           AverageTemperatureUncertainty AS Average_Temperature_Uncertainity
           FROM Temperature"""
df_final_temperature = spark.sql(query)
df_final_temperature.limit(10).toPandas()
```

| |i94port|Average_Temerature|City|Country|Latitude|Longitude|Average_Temperature_Uncertainity|
|---|---|---|---|---|---|---|---|
|0|888|26.901|Ife|Nigeria|7.23N|4.05E|1.359|
|1|PER|15.488|Perth|Australia|31.35S|114.97E|1.395|
|2|SEA|-1.977|Seattle|United States|47.42N|121.97W|2.551|
|3|HAM|2.767|Hamilton|Canada|42.59N|80.73W|1.905|
|4|ONT|7.399999999999999|Ontario|United States|34.56N|116.76W|2.699|
|5|SPO|2.322|Spokane|United States|47.42N|117.24W|2.375|
|6|MAA|18.874|Abu Dhabi|United Arab Emirates|24.92N|54.98E|2.017|
|7|ANA|25.229|Anaco|Venezuela|8.84N|64.05W|1.094|
|8|CHI|9.904|Ica|Peru|13.66S|75.14W|1.4369999999999998|
|9|NOG|9.833|Nogales|United States|31.35N|111.20W|2.182|


```python
df_final_temperature.write.partitionBy('i94port').mode('append').parquet('Final_data_Tables/temperature.parquet')
```

#### Now Create Immigration dimension Table

```python
df_immigration_new.createOrReplaceTempView('Immigration_data')
```

```python
query = "SELECT i94port, i94yr, i94mon, i94cit, arrdate, i94mode, i94addr, depdate, i94visa, visatype FROM Immigration_data"
df_final_immigration = spark.sql(query)
df_final_immigration.limit(10).toPandas()
```

| |i94port|i94yr|i94mon|i94cit|arrdate|i94mode|i94addr|depdate|i94visa|visatype|
|---|---|---|---|---|---|---|---|---|---|---|
|0|XXX|2016.0|4.0|692.0|20573.0||||2.0|B2|
|1|ATL|2016.0|4.0|254.0|20551.0|1.0|AL||3.0|F1|
|2|WAS|2016.0|4.0|101.0|20545.0|1.0|MI|20691.0|2.0|B2|
|3|NYC|2016.0|4.0|101.0|20545.0|1.0|MA|20567.0|2.0|B2|
|4|NYC|2016.0|4.0|101.0|20545.0|1.0|MA|20567.0|2.0|B2|
|5|NYC|2016.0|4.0|101.0|20545.0|1.0|MI|20555.0|1.0|B1|
|6|NYC|2016.0|4.0|101.0|20545.0|1.0|NJ|20558.0|2.0|B2|
|7|NYC|2016.0|4.0|101.0|20545.0|1.0|NJ|20558.0|2.0|B2|
|8|NYC|2016.0|4.0|101.0|20545.0|1.0|NY|20553.0|2.0|B2|
|9|NYC|2016.0|4.0|101.0|20545.0|1.0|NY|20562.0|1.0|B1|


```python
df_final_immigration.write.partitionBy('i94port').mode('append').parquet('Final_data_Tables/Immigration.parquet')
```

#### Create the Fact table by joining Immigration Table and Temperature Table on i94port

```python
df_final_temperature.createOrReplaceTempView('temperature')
df_final_immigration.createOrReplaceTempView('immigration')
```

```python
query = '''SELECT im.i94port AS destination_city_code,
            im.i94yr AS year, im.i94mon AS month, im.i94cit AS origin_city_code,
            im.arrdate AS arrival_date,
            im.i94mode AS travel_code,
            im.depdate AS departure_date,
            im.i94visa AS visa,
            tm.City,
            tm.Latitude,
            tm.Average_Temerature AS Temperature,
            tm.Longitude, tm.Country
            FROM immigration im JOIN temperature tm
            ON (im.i94port = tm.i94port)
'''

df_fact = spark.sql(query)
df_fact.limit(10).toPandas()
```

| |destination_city_code|year|month|origin_city_code|arrival_date|travel_code|departure_date|visa|City|Latitude|Temperature|Longitude|Country|
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|0|SNA|2016.0|4.0|111.0|20545.0|1.0|20547.0|1.0|San Antonio|29.74N|7.168999999999999|97.85W|United States|
|1|SNA|2016.0|4.0|114.0|20545.0|1.0|20562.0|2.0|San Antonio|29.74N|7.168999999999999|97.85W|United States|
|2|SNA|2016.0|4.0|117.0|20545.0|1.0|20559.0|2.0|San Antonio|29.74N|7.168999999999999|97.85W|United States|
|3|SNA|2016.0|4.0|129.0|20545.0|1.0|20548.0|2.0|San Antonio|29.74N|7.168999999999999|97.85W|United States|
|4|SNA|2016.0|4.0|575.0|20545.0|1.0|20547.0|1.0|San Antonio|29.74N|7.168999999999999|97.85W|United States|
|5|SNA|2016.0|4.0|575.0|20545.0|1.0|20547.0|2.0|San Antonio|29.74N|7.168999999999999|97.85W|United States|
|6|SNA|2016.0|4.0|577.0|20545.0|1.0|20550.0|1.0|San Antonio|29.74N|7.168999999999999|97.85W|United States|
|7|SNA|2016.0|4.0|577.0|20545.0|1.0|20560.0|2.0|San Antonio|29.74N|7.168999999999999|97.85W|United States|
|8|SNA|2016.0|4.0|582.0|20545.0|1.0|20560.0|1.0|San Antonio|29.74N|7.168999999999999|97.85W|United States|
|9|SNA|2016.0|4.0|582.0|20545.0|1.0||2.0|San Antonio|29.74N|7.168999999999999|97.85W|United States|
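The JOIN in the fact query is an inner join, which has two consequences sketched below with a small hash join in plain Python: immigration records whose i94port has no temperature row are dropped, and if more than one temperature row maps to the same i94port (for example two cities named Hamilton in different countries), each immigration record is repeated once per match. The data and the inner_join helper are hypothetical; Spark chooses its own physical join strategy, but the resulting rows of an inner join are the same.

```python
def inner_join(left, right, key):
    """Hash inner join of two lists of dicts on `key`,
    equivalent in result to JOIN ... ON (left.key = right.key)."""
    index = {}
    for right_row in right:
        index.setdefault(right_row[key], []).append(right_row)
    joined = []
    for left_row in left:
        # No match: the left row is dropped. Several matches: it is repeated.
        for right_row in index.get(left_row[key], []):
            joined.append({**left_row, **right_row})
    return joined


# Hypothetical rows: one port with two matching cities, one port with none.
immigration = [{"i94port": "HAM", "cicid": 1}, {"i94port": "XXX", "cicid": 2}]
temperature = [
    {"i94port": "HAM", "City": "Hamilton", "Country": "Canada"},
    {"i94port": "HAM", "City": "Hamilton", "Country": "New Zealand"},
]
joined_rows = inner_join(immigration, temperature, "i94port")
for row in joined_rows:
    print(row["cicid"], row["City"], row["Country"])
```

Grouping the temperature table by i94port and looking for counts above one is a quick way to spot this kind of fan-out before building the fact table.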


```python
df_fact.write.partitionBy('destination_city_code').mode('append').parquet('Final_data_Tables/Immigration_Temperature.parquet')
```

```python
df_fact.createOrReplaceTempView('facts')
```

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
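Two of the checks listed above, a source/count check and a NULL check on a key column, are sketched below against an in-memory SQLite table standing in for the Spark views; with Spark the same SQL could be run through spark.sql(...).collect(). The helper names and sample rows are hypothetical, and table and column names are interpolated directly into the SQL, so they must come from trusted code rather than user input.

```python
import sqlite3


def check_row_count(conn, table):
    """Source/count check: fail if the table is empty; return the row count."""
    (count,) = conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()
    if count == 0:
        raise ValueError(f"Data Quality Check Failed: {table} contains no rows")
    return count


def check_no_nulls(conn, table, column):
    """Integrity check: fail if a key column contains NULL values."""
    (nulls,) = conn.execute(
        f"SELECT COUNT(*) FROM {table} WHERE {column} IS NULL"
    ).fetchone()
    if nulls:
        raise ValueError(f"Data Quality Check Failed: {table}.{column} has {nulls} NULL values")


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE temperature (i94port TEXT, City TEXT)")
conn.executemany(
    "INSERT INTO temperature VALUES (?, ?)", [("SEA", "Seattle"), (None, "Nowhere")]
)
print(check_row_count(conn, "temperature"))  # 2
try:
    check_no_nulls(conn, "temperature", "i94port")
except ValueError as err:
    print(err)
```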
 
Run Quality Checks

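```python
def checkDataQuality(table):
    """Raise ValueError if the Spark SQL view `table` has no rows."""
    query = f"SELECT COUNT(*) FROM {table}"
    # spark.sql returns a DataFrame; collect the single count value before comparing
    records = spark.sql(query).collect()[0][0]
    if records == 0:
        raise ValueError(f"Data Quality Check Failed and {table} table contains no results")
    print(f"Data Quality Check Passed for {table} table")
```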

```python
checkDataQuality('temperature')
checkDataQuality('immigration')
checkDataQuality('facts')
```

```text
Data Quality Check Passed for temperature table
Data Quality Check Passed for immigration table
Data Quality Check Passed for facts table
```


#### 4.3 Data dictionary 

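#### Fact Table - I94 immigration data joined with the city temperature data on i94port. Columns (output name, with source column where renamed):

1. destination_city_code (source: i94port) = destination port (city) code
2. year (source: i94yr) = 4-digit year
3. month (source: i94mon) = numeric month
4. origin_city_code (source: i94cit) = 3-digit origin country code
5. arrival_date (source: arrdate) = arrival date as a SAS date number (days since 1960-01-01)
6. travel_code (source: i94mode) = mode of arrival code (1 = Air, 2 = Sea, 3 = Land, 9 = Not reported)
7. departure_date (source: depdate) = departure date from the USA
8. visa (source: i94visa) = visa category code (1 = Business, 2 = Pleasure, 3 = Student)
9. City = city name
10. Latitude = latitude
11. Temperature (source: AverageTemperature) = average temperature of the destination city
12. Longitude = longitude
13. Country = country name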
            
#### Dimension Table - temperature data. Columns:

1. i94port = destination port code, found by matching the city name against the I94 port list
2. Average_Temerature (source: AverageTemperature) = average temperature
3. City = city name
4. Country = country name
5. Latitude = latitude
6. Longitude = longitude
7. Average_Temperature_Uncertainity (source: AverageTemperatureUncertainty) = uncertainty of the average temperature

#### Dimension Table - I94 immigration events. Columns:

1. i94yr = year
2. i94mon = month
3. i94cit = 3-digit origin country code
4. i94port = destination port (city) code
5. arrdate = arrival date (SAS date number)
6. i94mode = mode of arrival code
7. i94addr = US state of address (two-letter code)
8. depdate = departure date (SAS date number)
9. i94visa = visa category code
10. visatype = type of visa (e.g. B1, B2, F1)

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
  * For this project, we used the following:
    * Spark is used to process the immigration and temperature data, since it can read both input formats (SAS through the spark-sas7bdat package, and CSV).
    * Spark SQL is used to query the DataFrames created from the input files and build the fact and dimension tables with standard SQL.

* Propose how often the data should be updated and why.
  * The raw I94 data is delivered as one file per month (e.g. i94_apr16_sub.sas7bdat), so the data should be updated monthly.

* Write a description of how you would approach the problem differently under the following scenarios:
  * The data was increased by 100x.
    * If the data increased by 100x, Amazon Redshift would be used, as it is an analytical (columnar) database designed for large-scale queries.
  * The data populates a dashboard that must be updated on a daily basis by 7am every day.
    * To populate the dashboard, Apache Airflow would be used: a DAG scheduled to run daily at a fixed time, early enough to finish by 7am, and configured to send email alerts when a task fails.
  * The database needed to be accessed by 100+ people.
    * Use Amazon Redshift, because it can scale to handle many concurrent users and heavy query workloads.
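For the monthly update, each run needs the path of that month's raw file. A minimal sketch, assuming every month follows the i94_apr16_sub.sas7bdat naming pattern and sits in the same directory (both are assumptions, since only the April 2016 file is used here); a scheduler such as an Airflow DAG could pass its run date to this hypothetical i94_file_for helper.

```python
from datetime import date

# Month abbreviations as they appear in i94_apr16_sub.sas7bdat; hard-coded
# so the result does not depend on the system locale (unlike strftime("%b")).
MONTHS = ("jan", "feb", "mar", "apr", "may", "jun",
          "jul", "aug", "sep", "oct", "nov", "dec")


def i94_file_for(run_date, base_dir="../../data/18-83510-I94-Data-2016"):
    """Hypothetical helper: path of the monthly I94 file for run_date's month,
    assuming the i94_<mon><yy>_sub.sas7bdat naming pattern holds for every month."""
    month = MONTHS[run_date.month - 1]
    return f"{base_dir}/i94_{month}{run_date.year % 100:02d}_sub.sas7bdat"


print(i94_file_for(date(2016, 4, 1)))
# ../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat
```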