# Project Title
### Data Engineering Capstone Project

#### Project Summary
The purpose of this data engineering capstone project is to create a database schema and ETL pipeline for analyzing immigration data in the United States. We use three datasets (immigration data, city temperature data, and airport data) to build a database optimized for querying and analyzing immigration events.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
import pandas as pd
import psycopg2
from collections import defaultdict
from datetime import datetime, timedelta
import pyspark

### Step 1: Scope the Project and Gather Data

#### Scope 
This project enriches the U.S. I94 immigration data with other data, such as temperature data, to provide a broader base for immigration analysis. We create one fact table and two dimension tables: the immigration data forms the fact table, and the temperature table and the airports (destination city) table form the dimension tables.

#### Data description

#### I94 Immigration Data
This data comes from the U.S. National Office of Tourism and Trade. A sample file allows you to examine the data in csv format before reading it in full. 

In [2]:
# Read in the data here
immigration = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
df_immigration = pd.read_sas(immigration, 'sas7bdat', encoding="ISO-8859-1")

In [3]:
pd.options.display.max_columns = None
df_immigration.head(20)

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,dtadfile,visapost,occup,entdepa,entdepd,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,6.0,2016.0,4.0,692.0,692.0,XXX,20573.0,,,,37.0,2.0,1.0,,,,T,,U,,1979.0,10282016,,,,1897628000.0,,B2
1,7.0,2016.0,4.0,254.0,276.0,ATL,20551.0,1.0,AL,,25.0,3.0,1.0,20130811.0,SEO,,G,,Y,,1991.0,D/S,M,,,3736796000.0,296.0,F1
2,15.0,2016.0,4.0,101.0,101.0,WAS,20545.0,1.0,MI,20691.0,55.0,2.0,1.0,20160401.0,,,T,O,,M,1961.0,09302016,M,,OS,666643200.0,93.0,B2
3,16.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,28.0,2.0,1.0,20160401.0,,,O,O,,M,1988.0,09302016,,,AA,92468460000.0,199.0,B2
4,17.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,4.0,2.0,1.0,20160401.0,,,O,O,,M,2012.0,09302016,,,AA,92468460000.0,199.0,B2
5,18.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MI,20555.0,57.0,1.0,1.0,20160401.0,,,O,O,,M,1959.0,09302016,,,AZ,92471040000.0,602.0,B1
6,19.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,NJ,20558.0,63.0,2.0,1.0,20160401.0,,,O,K,,M,1953.0,09302016,,,AZ,92471400000.0,602.0,B2
7,20.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,NJ,20558.0,57.0,2.0,1.0,20160401.0,,,O,K,,M,1959.0,09302016,,,AZ,92471610000.0,602.0,B2
8,21.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,NY,20553.0,46.0,2.0,1.0,20160401.0,,,O,O,,M,1970.0,09302016,,,AZ,92470800000.0,602.0,B2
9,22.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,NY,20562.0,48.0,1.0,1.0,20160401.0,,,O,O,,M,1968.0,09302016,,,AZ,92478490000.0,608.0,B1


#### World Temperature Data

This dataset came from Kaggle. You can read more about it here.

In [4]:
df_temp = pd.read_csv('./GlobalLandTemperaturesByMajorCity.csv')

In [5]:
df_temp.head(20)

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1849-01-01,26.704,1.435,Abidjan,Côte D'Ivoire,5.63N,3.23W
1,1849-02-01,27.434,1.362,Abidjan,Côte D'Ivoire,5.63N,3.23W
2,1849-03-01,28.101,1.612,Abidjan,Côte D'Ivoire,5.63N,3.23W
3,1849-04-01,26.14,1.387,Abidjan,Côte D'Ivoire,5.63N,3.23W
4,1849-05-01,25.427,1.2,Abidjan,Côte D'Ivoire,5.63N,3.23W
5,1849-06-01,24.844,1.402,Abidjan,Côte D'Ivoire,5.63N,3.23W
6,1849-07-01,24.058,1.254,Abidjan,Côte D'Ivoire,5.63N,3.23W
7,1849-08-01,23.576,1.265,Abidjan,Côte D'Ivoire,5.63N,3.23W
8,1849-09-01,23.662,1.226,Abidjan,Côte D'Ivoire,5.63N,3.23W
9,1849-10-01,25.263,1.175,Abidjan,Côte D'Ivoire,5.63N,3.23W
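
The `Latitude` and `Longitude` columns above are stored as strings with a hemisphere suffix (`5.63N`, `3.23W`), which makes them awkward to compare or plot. The following is a minimal sketch, in plain Python, of converting them to signed decimal degrees. The function name `parse_coordinate` is hypothetical and not part of the notebook, and the sketch assumes every value ends in one of `N`, `S`, `E`, or `W`.

```python
def parse_coordinate(value: str) -> float:
    """Convert a string such as '5.63N' or '3.23W' to signed decimal degrees."""
    value = value.strip()
    magnitude, hemisphere = float(value[:-1]), value[-1].upper()
    if hemisphere not in ("N", "S", "E", "W"):
        raise ValueError(f"unexpected hemisphere suffix in {value!r}")
    # South and west are negative by convention.
    return -magnitude if hemisphere in ("S", "W") else magnitude


print(parse_coordinate("5.63N"))   # 5.63
print(parse_coordinate("3.23W"))   # -3.23
```

The same logic could be wrapped in a pandas `apply` or a Spark UDF if the numeric values are needed in the dimension table.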


#### Airport Code Table

This is a simple table of airport codes and corresponding cities. 

In [6]:
df_airport_codes = pd.read_csv("./airport-codes_csv.csv")

In [7]:
df_airport_codes.head(20)

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11.0,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450.0,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820.0,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237.0,,US,US-AR,Newport,,,,"-91.254898, 35.6087"
5,00AS,small_airport,Fulton Airport,1100.0,,US,US-OK,Alex,00AS,,00AS,"-97.8180194, 34.9428028"
6,00AZ,small_airport,Cordes Airport,3810.0,,US,US-AZ,Cordes,00AZ,,00AZ,"-112.16500091552734, 34.305599212646484"
7,00CA,small_airport,Goldstone /Gts/ Airport,3038.0,,US,US-CA,Barstow,00CA,,00CA,"-116.888000488, 35.350498199499995"
8,00CL,small_airport,Williams Ag Airport,87.0,,US,US-CA,Biggs,00CL,,00CL,"-121.763427, 39.427188"
9,00CN,heliport,Kitchen Creek Helibase Heliport,3350.0,,US,US-CA,Pine Valley,00CN,,00CN,"-116.4597417, 32.7273736"
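
Two columns in this table need a little parsing before they can be used for joins: `iso_region` (for example `US-PA`) embeds the state code, and `coordinates` packs two numbers into one string. The sketch below shows one way to split them. The names `AirportLocation` and `parse_airport_location` are hypothetical, and the longitude-first ordering is an assumption based on the sample rows above (the Pennsylvania heliport starts with about -74.9), so it should be verified against the dataset documentation.

```python
from typing import NamedTuple, Optional


class AirportLocation(NamedTuple):
    state: Optional[str]   # two-letter state code, or None outside the US
    longitude: float
    latitude: float


def parse_airport_location(iso_region: str, coordinates: str) -> AirportLocation:
    """Split 'US-PA' and '-74.93, 40.07' into a state code and two floats."""
    country, _, region = iso_region.partition("-")
    state = region if country == "US" and region else None
    # Assumption: the string is ordered "longitude, latitude".
    longitude, latitude = (float(part) for part in coordinates.split(","))
    return AirportLocation(state, longitude, latitude)


location = parse_airport_location("US-PA", "-74.93360137939453, 40.07080078125")
print(location.state)      # PA
print(location.latitude)   # 40.07080078125
```

The extracted state code has the same two-letter form as the `i94addr` values in the immigration data, which could make it a candidate join key.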


#### Create Spark session

In [8]:
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .config("spark.jars.repositories", "https://repos.spark-packages.org/")
    .config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11")
    .enableHiveSupport()
    .getOrCreate()
)

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.
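
As one way to make this exploration concrete, the sketch below counts missing values per column and repeated keys in a small CSV sample using only the standard library. The function name `profile_rows` and the three sample rows are made up for illustration and are not real immigration records. Treating the strings `""` and `"NaN"` as missing is an assumption.

```python
import csv
import io
from collections import Counter


def profile_rows(rows, key):
    """Return (missing-value count per column, number of repeated keys)."""
    missing = Counter()
    seen = Counter()
    for row in rows:
        for column, value in row.items():
            # Assumption: blank cells and the literal 'NaN' both mean "missing".
            if value is None or value.strip() in ("", "NaN"):
                missing[column] += 1
        seen[row[key]] += 1
    duplicates = sum(count - 1 for count in seen.values() if count > 1)
    return dict(missing), duplicates


# Made-up sample in the shape of the immigration file.
sample = io.StringIO(
    "cicid,i94port,gender\n"
    "1,NYC,M\n"
    "2,ATL,\n"
    "2,ATL,\n"
)
missing, duplicates = profile_rows(csv.DictReader(sample), key="cicid")
print(missing)      # {'gender': 2}
print(duplicates)   # 1
```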

#### Cleaning Steps
Document steps necessary to clean the data

-Immigration data

In [9]:
# Perform cleaning tasks here
# Load the dataset with Spark
df_immigration_data = spark.read.format("csv").option("header", "true").load("./immigration_data_sample.csv")
# Drop rows with a missing cicid value
df_immigration_data = df_immigration_data.filter(df_immigration_data.cicid != 'NaN')
# Preview the same sample file with pandas
df_im = pd.read_csv("./immigration_data_sample.csv")
df_im.head()

Unnamed: 0.1,Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,dtadfile,visapost,occup,entdepa,entdepd,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,2027561,4084316.0,2016.0,4.0,209.0,209.0,HHW,20566.0,1.0,HI,20573.0,61.0,2.0,1.0,20160422,,,G,O,,M,1955.0,7202016,F,,JL,56582670000.0,00782,WT
1,2171295,4422636.0,2016.0,4.0,582.0,582.0,MCA,20567.0,1.0,TX,20568.0,26.0,2.0,1.0,20160423,MTR,,G,R,,M,1990.0,10222016,M,,*GA,94362000000.0,XBLNG,B2
2,589494,1195600.0,2016.0,4.0,148.0,112.0,OGG,20551.0,1.0,FL,20571.0,76.0,2.0,1.0,20160407,,,G,O,,M,1940.0,7052016,M,,LH,55780470000.0,00464,WT
3,2631158,5291768.0,2016.0,4.0,297.0,297.0,LOS,20572.0,1.0,CA,20581.0,25.0,2.0,1.0,20160428,DOH,,G,O,,M,1991.0,10272016,M,,QR,94789700000.0,00739,B2
4,3032257,985523.0,2016.0,4.0,111.0,111.0,CHM,20550.0,3.0,NY,20553.0,19.0,2.0,1.0,20160406,,,Z,K,,M,1997.0,7042016,F,,,42322570000.0,LAND,WT
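
The `arrdate` and `depdate` columns above hold values such as `20566.0` rather than readable dates. SAS stores dates as the number of days since 1 January 1960, so they can be converted with `datetime`. The sketch below is a plain-Python illustration. The helper name `sas_to_date` is hypothetical, and mapping blank or NaN inputs to `None` is an assumption about how missing departures should be handled.

```python
from datetime import date, timedelta
from typing import Optional

SAS_EPOCH = date(1960, 1, 1)  # day zero for SAS date values


def sas_to_date(value) -> Optional[date]:
    """Convert a SAS day count (e.g. 20573.0) to a date; blanks become None."""
    if value is None or value == "":
        return None
    days = float(value)
    if days != days:  # NaN is the only float not equal to itself
        return None
    return SAS_EPOCH + timedelta(days=int(days))


print(sas_to_date(20545.0))    # 2016-04-01
print(sas_to_date("20573.0"))  # 2016-04-29
```

The first result agrees with the `dtadfile` value `20160401` that appears next to `arrdate` 20545 in the full dataset preview, which is a useful sanity check on the epoch.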


-Temperature data

In [10]:
# Perform cleaning tasks here
# Load the dataset with Spark
df_temperature_data = spark.read.format("csv").option("header", "true").load("./GlobalLandTemperaturesByMajorCity.csv")
# Drop rows with missing temperature values
df_temperature_data = df_temperature_data.filter(df_temperature_data.AverageTemperature != 'NaN')
df_temperature_data.show()


+----------+------------------+-----------------------------+-------+-------------+--------+---------+
|        dt|AverageTemperature|AverageTemperatureUncertainty|   City|      Country|Latitude|Longitude|
+----------+------------------+-----------------------------+-------+-------------+--------+---------+
|1849-01-01|            26.704|                        1.435|Abidjan|Côte D'Ivoire|   5.63N|    3.23W|
|1849-02-01|            27.434|                        1.362|Abidjan|Côte D'Ivoire|   5.63N|    3.23W|
|1849-03-01|            28.101|                        1.612|Abidjan|Côte D'Ivoire|   5.63N|    3.23W|
|1849-04-01|             26.14|           1.3869999999999998|Abidjan|Côte D'Ivoire|   5.63N|    3.23W|
|1849-05-01|            25.427|                          1.2|Abidjan|Côte D'Ivoire|   5.63N|    3.23W|
|1849-06-01|            24.844|                        1.402|Abidjan|Côte D'Ivoire|   5.63N|    3.23W|
|1849-07-01|24.058000000000003|                        1.254|Abidjan|Côte

In [11]:
#Filter on United States
df_temperature_data = df_temperature_data.filter(df_temperature_data.Country == 'United States')
df_temperature_data.show()

+----------+------------------+-----------------------------+-------+-------------+--------+---------+
|        dt|AverageTemperature|AverageTemperatureUncertainty|   City|      Country|Latitude|Longitude|
+----------+------------------+-----------------------------+-------+-------------+--------+---------+
|1743-11-01|             5.436|                        2.205|Chicago|United States|  42.59N|   87.27W|
|1744-04-01|             8.766|                        2.357|Chicago|United States|  42.59N|   87.27W|
|1744-05-01|            11.605|                        2.102|Chicago|United States|  42.59N|   87.27W|
|1744-06-01|            17.965|                        1.989|Chicago|United States|  42.59N|   87.27W|
|1744-07-01|             21.68|                        1.786|Chicago|United States|  42.59N|   87.27W|
|1744-09-01|             17.03|                        1.927|Chicago|United States|  42.59N|   87.27W|
|1744-10-01|            10.662|                        2.165|Chicago|Unit
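
The filtered data still holds one row per city per month going back to the 1700s, whereas the immigration data only carries a month number (`i94mon`). One possible way to make the two comparable is to collapse the readings into a single average per city and calendar month. The sketch below shows that aggregation in plain Python. The function name `monthly_average_by_city` is hypothetical, and averaging across all years is an assumption rather than something the notebook does.

```python
from collections import defaultdict


def monthly_average_by_city(rows):
    """Collapse (dt, city, temperature) readings to one mean per (city, month)."""
    totals = defaultdict(lambda: [0.0, 0])  # (city, month) -> [sum, count]
    for dt, city, temperature in rows:
        month = int(dt[5:7])  # dt looks like '1744-04-01'
        bucket = totals[(city, month)]
        bucket[0] += float(temperature)
        bucket[1] += 1
    return {key: total / count for key, (total, count) in totals.items()}


# First three Chicago rows from the output above.
readings = [
    ("1743-11-01", "Chicago", "5.436"),
    ("1744-04-01", "Chicago", "8.766"),
    ("1744-05-01", "Chicago", "11.605"),
]
averages = monthly_average_by_city(readings)
print(sorted(averages))
```

In Spark the equivalent would be a `groupBy` on city and month followed by an average, which keeps the temperature dimension at one row per key.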

-Airport codes data

In [15]:
# Load the dataset with Spark
df_airport_codes = spark.read.format("csv").option("header", "true").load("./airport-codes_csv.csv")
# Drop rows with a missing IATA code
df_airport_codes = df_airport_codes.filter(df_airport_codes.iata_code != 'NaN')
df_airport_codes.show()
df_ = pd.read_csv("./airport-codes_csv.csv")



+-----+-------------+--------------------+------------+---------+-----------+----------+---------------+--------+---------+----------+--------------------+
|ident|         type|                name|elevation_ft|continent|iso_country|iso_region|   municipality|gps_code|iata_code|local_code|         coordinates|
+-----+-------------+--------------------+------------+---------+-----------+----------+---------------+--------+---------+----------+--------------------+
|  03N|small_airport|      Utirik Airport|           4|       OC|         MH|    MH-UTI|  Utirik Island|    K03N|      UTK|       03N|  169.852005, 11.222|
| 07FA|small_airport|Ocean Reef Club A...|           8|       NA|         US|     US-FL|      Key Largo|    07FA|      OCA|      07FA|-80.274803161621,...|
|  0AK|small_airport|Pilot Station Air...|         305|       NA|         US|     US-AK|  Pilot Station|    null|      PQS|       0AK|-162.899994, 61.9...|
| 0CO2|small_airport|Crested Butte Air...|        8980|       NA

In [13]:
# Filter on United States
df_airport_codes = df_airport_codes.filter(df_airport_codes.iso_country == 'US')
df_airport_codes.show()

+-----+-------------+--------------------+------------+---------+-----------+----------+---------------+--------+---------+----------+--------------------+
|ident|         type|                name|elevation_ft|continent|iso_country|iso_region|   municipality|gps_code|iata_code|local_code|         coordinates|
+-----+-------------+--------------------+------------+---------+-----------+----------+---------------+--------+---------+----------+--------------------+
| 07FA|small_airport|Ocean Reef Club A...|           8|       NA|         US|     US-FL|      Key Largo|    07FA|      OCA|      07FA|-80.274803161621,...|
|  0AK|small_airport|Pilot Station Air...|         305|       NA|         US|     US-AK|  Pilot Station|    null|      PQS|       0AK|-162.899994, 61.9...|
| 0CO2|small_airport|Crested Butte Air...|        8980|       NA|         US|     US-CO|  Crested Butte|    0CO2|      CSE|      0CO2|-106.928341, 38.8...|
| 0TE7|small_airport|   LBJ Ranch Airport|        1515|       NA

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model

<h3>Schema for Immigration Data Analysis</h3>

![image info](star_schema.png)


#### 3.2 Mapping Out Data Pipelines
Data pipeline steps
<ul>
 <li>Clean immigration data</li>
 <li>Clean temperature data</li>
 <li>Clean airports data</li>
 <li>Create the fact table: immigration table</li>
 <li>Create two dimension tables: temperature table and airports table</li>
 <li>Run data quality checks to ensure the pipeline ran as expected</li>
    </ul>

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [18]:
# Create the fact table from the immigration data
immigration_table = df_immigration_data.select(["i94yr", "i94mon", "i94cit", "i94port", "arrdate", "i94mode", "depdate", "i94visa"])
immigration_table.createOrReplaceTempView("immigration_view")

In [19]:
# Create a dimension table from the airport codes data
airports_table = df_airport_codes.select(['iata_code', 'name', 'type', 'local_code', 'coordinates', 'municipality'])
airports_table.createOrReplaceTempView("airports_view")

In [23]:
# Create a dimension table from the temperature data
temp_table = df_temperature_data.select(["AverageTemperature", "City", "Country", "Latitude", "Longitude"])
temp_table.createOrReplaceTempView("temperature_view")
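
To illustrate how a fact table and a dimension table in this model might be queried together, the sketch below joins arrivals to airports and counts arrivals per city. It uses the standard-library `sqlite3` module as a stand-in for Spark SQL so that it runs anywhere, and the inserted rows are made up. Joining `i94port` to `iata_code` is an assumption: port codes such as `NYC` or `XXX` in the samples are not necessarily airport IATA codes, so a real pipeline would likely need a mapping table.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE immigration (i94port TEXT, i94mon INTEGER, i94visa INTEGER);
    CREATE TABLE airports (iata_code TEXT PRIMARY KEY, municipality TEXT);
    """
)
# Made-up rows for illustration only.
conn.executemany(
    "INSERT INTO immigration VALUES (?, ?, ?)",
    [("ATL", 4, 3), ("ATL", 4, 2), ("XXX", 4, 2)],
)
conn.executemany("INSERT INTO airports VALUES (?, ?)", [("ATL", "Atlanta")])

query = """
    SELECT a.municipality, COUNT(*) AS arrivals
    FROM immigration AS i
    JOIN airports AS a ON i.i94port = a.iata_code
    GROUP BY a.municipality
    ORDER BY arrivals DESC
"""
result = conn.execute(query).fetchall()
print(result)  # [('Atlanta', 2)]
```

With the temporary views registered above, a similar statement could be passed to `spark.sql(...)` using `immigration_view` and `airports_view` as the table names. The unmatched `XXX` row disappears under an inner join, which is one reason to compare row counts before and after joining.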

#### 4.2 Data Quality Checks
The following data quality checks help ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
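
A minimal sketch of two of the checks listed above, a count check and a unique-key check, written so that a failure raises an error instead of only printing a number. The function names `check_row_count` and `check_unique` are hypothetical, and the example values are made up.

```python
def check_row_count(table_name, row_count, minimum=1):
    """Fail if a table has fewer rows than expected."""
    if row_count < minimum:
        raise ValueError(
            f"{table_name}: expected at least {minimum} rows, found {row_count}"
        )
    return row_count


def check_unique(table_name, keys):
    """Fail if any key value occurs more than once."""
    keys = list(keys)
    duplicates = len(keys) - len(set(keys))
    if duplicates:
        raise ValueError(f"{table_name}: {duplicates} duplicate key value(s)")
    return len(keys)


# Made-up inputs; in the notebook these would come from the Spark tables.
check_row_count("immigration", 1000)
check_unique("airports", ["OCA", "PQS", "CSE"])
print("all checks passed")
```

In the notebook, the count could be supplied as `immigration_table.count()`, and the key list could be collected from the `iata_code` column of the airports table.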
 
Run Quality Checks

In [26]:
print("The number of records for immigration table: ", immigration_table.count(), " rows")

The number of records for immigration table:  1000  rows


In [27]:
print("The number of records for airports table: ", airports_table.count(), " rows")

The number of records for airports table:  9189  rows


In [28]:
print("The number of records for temperature table: ", temp_table.count(), " rows")

The number of records for temperature table:  8237  rows


#### 4.3 Data dictionary 
Create a data dictionary for your data model. <br>

<h3>Schema for Immigration Data Analysis</h3>
Using the U.S. I94 immigration, airport, and temperature datasets, we create a star schema optimized for immigration analysis queries. It includes the following tables.

![image info](star_schema.png)
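
One possible way to keep the data dictionary next to the code is a plain Python mapping from table to column to description, with a small check that every selected column is documented. The sketch below covers the columns selected in Step 4.1. The descriptions are a reading of the column names and sample values, not text from the notebook, and should be verified against the I94 label descriptions and the Kaggle dataset page. The names `DATA_DICTIONARY` and `undocumented_columns` are hypothetical.

```python
DATA_DICTIONARY = {
    "immigration": {
        "i94yr": "Four-digit year of arrival",
        "i94mon": "Numeric month of arrival",
        "i94cit": "Code for the visitor's country of citizenship",
        "i94port": "Code for the port of entry",
        "arrdate": "Arrival date as a SAS day count (days since 1960-01-01)",
        "i94mode": "Mode of travel code (1 air, 2 sea, 3 land, 9 not reported)",
        "depdate": "Departure date as a SAS day count",
        "i94visa": "Visa category code (1 business, 2 pleasure, 3 student)",
    },
    "airports": {
        "iata_code": "IATA airport code",
        "name": "Airport name",
        "type": "Facility type, e.g. small_airport or heliport",
        "local_code": "Local airport code",
        "coordinates": "Coordinate pair stored as a single string",
        "municipality": "City or town served by the airport",
    },
    "temperature": {
        "AverageTemperature": "Average land temperature (assumed Celsius)",
        "City": "City name",
        "Country": "Country name",
        "Latitude": "Latitude with hemisphere suffix, e.g. 42.59N",
        "Longitude": "Longitude with hemisphere suffix, e.g. 87.27W",
    },
}


def undocumented_columns(table, selected):
    """Return the selected columns that have no dictionary entry."""
    documented = DATA_DICTIONARY.get(table, {})
    return [column for column in selected if column not in documented]


print(undocumented_columns("immigration", ["i94yr", "i94port", "cicid"]))  # ['cicid']
```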

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x. <br>
 If Spark cannot process a dataset 100 times larger on the current setup, we could load the data into Amazon Redshift, a data warehouse built for analytical queries over large datasets.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.<br>
 We could use Apache Airflow to schedule the ETL pipeline so that it runs daily and finishes before 7am.
 * The database needed to be accessed by 100+ people.<br>
 Amazon Redshift could serve this number of users with good query performance.