# 2016 Immigration data model
### Data Engineering Capstone Project

#### Project Summary
This project is an interpretation of the Udacity-provided data engineering capstone project.
I used the immigration dataset along with the temperature dataset to develop analytics tables that give insight into immigration trends in the US based on city temperatures.
Using a Postgres database, the data is first ingested into staging tables and then normalized into a star schema for analytics.
A summary table is created for quick analytics.

Note: this notebook gives an overview of the process. The full pipeline is run through a Python script (and can also be run through Airflow), as loading the full dataset here would take a considerable amount of time.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [216]:
# dependencies
from datetime import datetime, timedelta
import re
import pandas as pd
import psycopg2 as ps
from io import StringIO
import os

pd.set_option('display.max_columns', None)  # or 1000
pd.set_option('display.max_rows', None)

### Step 1: Scope the Project and Gather Data

#### Scope 
For this project we will aggregate the immigration and temperature datasets provided by Udacity to create the fact and dimension tables. The two datasets will be joined on destination city to form the summary table.
The final database is optimized to query on immigration events to determine if temperature affects the selection of destination cities. Postgres is used to process the data.


#### Describe and Gather Data 
The immigration dataset comes from the US National Tourism and Trade Office ([see here](https://travel.trade.gov/research/reports/i94/historical/2016.html)). It is provided in SAS7BDAT format.
Relevant attributes include:

- i94yr = 4 digit year
- i94mon = numeric month
- i94cit = 3 digit code of country of citizenship
- i94port = 3 character code of destination USA city
- arrdate = arrival date in the USA
- i94mode = 1 digit travel code
- depdate = departure date from the USA
- i94visa = reason for immigration
- gender = immigrant gender
- biryear = 4 digit year of birth

The temperature data comes from [Kaggle](https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data). It is provided in csv format. Relevant attributes include:

- AverageTemperature = average temperature
- City = city name
- Country = country name
- Latitude = latitude
- Longitude = longitude

#### Immigration data

In [217]:
# load immigration data into dataframe
# Note here we only load the example as demonstration (see README)
immi_df = pd.read_csv('./data/immigration_data_sample.csv')

In [218]:
# Visualize dataframe
pd.set_option('display.max_columns', 30)
display(immi_df.head())

Unnamed: 0.1,Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,dtadfile,visapost,occup,entdepa,entdepd,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,2027561,4084316.0,2016.0,4.0,209.0,209.0,HHW,20566.0,1.0,HI,20573.0,61.0,2.0,1.0,20160422,,,G,O,,M,1955.0,7202016,F,,JL,56582670000.0,00782,WT
1,2171295,4422636.0,2016.0,4.0,582.0,582.0,MCA,20567.0,1.0,TX,20568.0,26.0,2.0,1.0,20160423,MTR,,G,R,,M,1990.0,10222016,M,,*GA,94362000000.0,XBLNG,B2
2,589494,1195600.0,2016.0,4.0,148.0,112.0,OGG,20551.0,1.0,FL,20571.0,76.0,2.0,1.0,20160407,,,G,O,,M,1940.0,7052016,M,,LH,55780470000.0,00464,WT
3,2631158,5291768.0,2016.0,4.0,297.0,297.0,LOS,20572.0,1.0,CA,20581.0,25.0,2.0,1.0,20160428,DOH,,G,O,,M,1991.0,10272016,M,,QR,94789700000.0,00739,B2
4,3032257,985523.0,2016.0,4.0,111.0,111.0,CHM,20550.0,3.0,NY,20553.0,19.0,2.0,1.0,20160406,,,Z,K,,M,1997.0,7042016,F,,,42322570000.0,LAND,WT


#### US city demographics data

In [219]:
# load demographics data
dem_df = pd.read_csv('./data/us-cities-demographics.csv', delimiter=';')

In [220]:
# Visualize dataframe
display(dem_df.head())

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402


#### Global temperature data

In [221]:
# load temperature data into dataframe
temp_df = pd.read_csv('./data/GlobalLandTemperaturesByCity.csv')

In [222]:
# Visualize dataframe
temp_df.head()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E


### Step 2: Explore and Assess the Data
#### Immigration data 
For the immigration data, we want to drop all entries with invalid destination port or origin country codes (e.g., XXX, 11, etc.) as described in I94_SAS_Labels_Description.SAS.

In [223]:
i94port = pd.read_csv('./data/i94port.csv')
i94port.head()

Unnamed: 0,code,city,state
0,ALC,ALCAN,AK
1,ANC,ANCHORAGE,AK
2,BAR,BAKER AAF - BAKER ISLAND,AK
3,DAC,DALTONS CACHE,AK
4,PIZ,DEW STATION PT LAY DEW,AK


In [224]:
i94res = pd.read_csv('./data/i94res.csv', sep=';')
i94res.head()

Unnamed: 0,id,country
0,582,MEXICO
1,236,AFGHANISTAN
2,101,ALBANIA
3,316,ALGERIA
4,102,ANDORRA


In [225]:
df = immi_df[immi_df.i94port.isin(list(i94port.code))]
df = df[df.i94res.isin(list(i94res.id))]
immi_df = df[df.i94cit.isin(list(i94res.id))]
display(immi_df.head())

Unnamed: 0.1,Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,dtadfile,visapost,occup,entdepa,entdepd,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,2027561,4084316.0,2016.0,4.0,209.0,209.0,HHW,20566.0,1.0,HI,20573.0,61.0,2.0,1.0,20160422,,,G,O,,M,1955.0,7202016,F,,JL,56582670000.0,00782,WT
1,2171295,4422636.0,2016.0,4.0,582.0,582.0,MCA,20567.0,1.0,TX,20568.0,26.0,2.0,1.0,20160423,MTR,,G,R,,M,1990.0,10222016,M,,*GA,94362000000.0,XBLNG,B2
3,2631158,5291768.0,2016.0,4.0,297.0,297.0,LOS,20572.0,1.0,CA,20581.0,25.0,2.0,1.0,20160428,DOH,,G,O,,M,1991.0,10272016,M,,QR,94789700000.0,00739,B2
4,3032257,985523.0,2016.0,4.0,111.0,111.0,CHM,20550.0,3.0,NY,20553.0,19.0,2.0,1.0,20160406,,,Z,K,,M,1997.0,7042016,F,,,42322570000.0,LAND,WT
5,721257,1481650.0,2016.0,4.0,577.0,577.0,ATL,20552.0,1.0,GA,20606.0,51.0,2.0,1.0,20160408,,,T,N,,M,1965.0,10072016,M,,DL,736852600.0,910,B2


#### Temperature data
For the temperature data, we are only interested in unique values for United States cities.
The data is filtered by:
- Country equals United States
- Remove duplicates (city, country)
- Remove rows with NaN average temperature values

In [226]:
df = temp_df[temp_df.Country == 'United States']
df = df.drop_duplicates(['City', 'Country'])
temp_df = df[pd.notnull(df.AverageTemperature)]
display(temp_df.head())

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
47555,1820-01-01,2.101,3.217,Abilene,United States,32.95N,100.53W
137066,1743-11-01,3.209,1.961,Akron,United States,40.99N,80.95W
168075,1820-01-01,-3.42,3.182,Albuquerque,United States,34.56N,107.03W
187528,1743-11-01,5.339,1.828,Alexandria,United States,39.38N,76.99W
202251,1743-11-01,3.264,1.665,Allentown,United States,40.99N,74.56W
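
Note that `drop_duplicates` keeps only the first record per city, so the `AverageTemperature` shown above is a single early reading, and a city whose first record is NaN is dropped entirely by the last filter. A minimal alternative sketch, which is not the approach used in the cell above, removes missing readings first and then averages per city. The function name `us_city_avg_temp` is hypothetical.

```python
import pandas as pd


def us_city_avg_temp(temp_df):
    """Return the mean temperature per US city, ignoring missing readings.

    Hypothetical helper: it assumes the column names of
    GlobalLandTemperaturesByCity.csv (City, Country, AverageTemperature).
    """
    us = temp_df[temp_df["Country"] == "United States"]
    # Drop missing readings before aggregating so that no city is lost
    # just because its earliest record is NaN.
    us = us.dropna(subset=["AverageTemperature"])
    return (
        us.groupby(["City", "Country"], as_index=False)["AverageTemperature"]
        .mean()
    )
```

As in the cell above, cities are identified by name and country only, so US cities that share a name are still merged into one row.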


### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
The data model is presented in the image below.
![alt text](./images/ERDiagram.png)
A simple star schema is chosen so that the data can be quickly queried to gain insights into immigration patterns based on temperature and destination without many joins, while still remaining in a normalized form.

An aggregated summary table is also created as part of the data quality check.
![alt text](./images/Summary.png)

#### 3.2 Mapping Out Data Pipelines
The full data pipeline is handled directly in PostgreSQL.
The choice to use Postgres directly for the ETL process is motivated by the ability to easily handle and manipulate the large dataset, perform the necessary checks, and enforce data quality.
As the dataset was not too large, Postgres was chosen over Spark.
To handle the full ingestion, Airflow can be used to parallelize and schedule the ETL process, but for now a Python script is used.

The pipeline consists of three major steps: staging the data, transforming the data, and aggregation.
The data is staged into staging tables as raw data (with only a few columns removed from the immigration dataset) and is subsequently transformed into the star schema using SQL commands in the following order:
1. load countries dimension table
2. join temperature, cities and countries data to load cities dimension table
3. transform immigration data and load immigration fact table
4. aggregate data on destination city to create summary table
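
The ordered transform steps above can be sketched as a small runner that executes each SQL file in turn and stops at the first failure. This is an illustrative sketch, not the project's actual script: the function name `run_steps` and the `PIPELINE_STEPS` list are hypothetical, while the file paths are the ones used in the Step 4 cells of this notebook. It assumes `conn` is a DB-API connection such as the one returned by `psycopg2.connect`.

```python
from pathlib import Path

# Transform steps in the order listed above (paths as used in Step 4).
PIPELINE_STEPS = [
    "./sql/load_countries.sql",
    "./sql/load_cities.sql",
    "./sql/load_immigration.sql",
    "./sql/load_summary.sql",
]


def run_steps(conn, steps=PIPELINE_STEPS):
    """Execute each SQL file in order; roll back and stop on the first error.

    Returns the list of steps that were committed successfully.
    """
    completed = []
    cur = conn.cursor()
    for path in steps:
        try:
            cur.execute(Path(path).read_text())
            conn.commit()
            completed.append(path)
        except Exception as exc:
            # Later steps depend on earlier ones, so do not continue.
            conn.rollback()
            print("Step failed: {}: {}".format(path, exc))
            break
    return completed
```

Stopping at the first failure matters here because each step reads from tables that the previous step populated.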


### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [227]:
# connection (credentials are read from environment variables)
try:
    conn = ps.connect("postgresql://{}:{}@{}/{}".format(
        os.getenv('DBUSER'), os.getenv('DBPW'), os.getenv('DBHOST'), os.getenv('DB')))
    cur = conn.cursor()
except Exception as e:
    print('Error connecting to database: ' + str(e))

#### Create tables

In [228]:
try:
    with open ('./sql/create_staging.sql', 'r') as f:
        cur.execute(f.read())
    with open ('./sql/create_analytics.sql', 'r') as f:
        cur.execute(f.read())
    conn.commit()
    print('Tables created')
except Exception as e:
    conn.rollback()
    print('Error creating tables: ' + str(e))

Tables created


#### Stage temperature and city/country data

In [229]:
# Generic function to load staging tables
def load_data(csv, table:str, delimiter:str=','):
    sql = """
    COPY {}
    FROM STDIN DELIMITER '{}' CSV HEADER
    """.format(table, delimiter)
    cur.copy_expert(sql, csv)
    conn.commit()
    csv.close()

In [230]:
# stage main dimensions
load_data(open('./data/GlobalLandTemperaturesByCity.csv', 'r', encoding="ISO-8859-1"), 'staging.city_temp')
load_data(open('./data/i94port.csv', 'r', encoding="ISO-8859-1"), 'staging.i94port')
load_data(open('./data/i94res.csv', 'r', encoding="ISO-8859-1"), 'staging.i94res', ';')

In [231]:
# remove unnecessary columns from immigration data to clean data slightly
df = pd.read_csv('./data/immigration_data_sample.csv', encoding="ISO-8859-1")
df = df[[
    'i94yr','i94mon','i94cit','i94res','i94port','arrdate',
    'i94mode','depdate','i94bir','i94visa','gender', 'biryear'
]]

csv = StringIO(newline='')
df.to_csv(csv, sep=',', index=False)
csv.seek(0)
load_data(csv, 'staging.immigration')

#### Transform data

In [232]:
# transform and load countries
try:
    with open ('./sql/load_countries.sql', 'r') as f:
        cur.execute(f.read())
    conn.commit()
    print('Countries data inserted')
except Exception as e:
    conn.rollback()
    print('Failed to transform data: ' + str(e))

Countries data inserted


In [233]:
# transform and load cities and temperature
try:
    with open ('./sql/load_cities.sql', 'r') as f:
        cur.execute(f.read())
    conn.commit()
    print('Cities data inserted')
except Exception as e:
    conn.rollback()
    print('Failed to transform data: ' + str(e))

Cities data inserted


In [234]:
# transform and load immigration fact table
try:
    with open ('./sql/load_immigration.sql', 'r') as f:
        cur.execute(f.read())
    conn.commit()
    print('Immigration data inserted')
except Exception as e:
    conn.rollback()
    print('Failed to transform data: ' + str(e))

Immigration data inserted


In [235]:
# transform and summarize
try:
    with open ('./sql/load_summary.sql', 'r') as f:
        cur.execute(f.read())
    conn.commit()
    print('Summary data inserted')
except Exception as e:
    conn.rollback()
    print('Failed to transform data: ' + str(e))

Summary data inserted


#### 4.2 Data Quality Checks
Data quality is ensured by the relational database using the following constraints:
- Unique country in countries table 
- Unique city & state in cities table
- Not null constraints on full summary table

Aside from this, the data quality is checked after running the pipeline:
- Count of summary table rows (if rows are present, all tables have data that fits the data model)
- Check that data is from 2016
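
The two post-run checks above could be expressed in Python roughly as follows. This is a hedged sketch: the contents of `data_quality.sql` are not shown in this notebook, the function name `run_quality_checks` and the default table name `summary` are hypothetical, and only the `i94yr` column is taken from the data dictionary. It assumes `cur` is a DB-API cursor.

```python
def run_quality_checks(cur, table="summary"):
    """Raise ValueError if the summary table is empty or has non-2016 rows.

    `table` is a hypothetical name and must come from trusted code,
    since it is formatted directly into the SQL text.
    """
    # Check 1: the summary table contains rows.
    cur.execute("SELECT COUNT(*) FROM {}".format(table))
    total = cur.fetchone()[0]
    if total == 0:
        raise ValueError("{} is empty".format(table))

    # Check 2: every row is from 2016.
    cur.execute("SELECT COUNT(*) FROM {} WHERE i94yr <> 2016".format(table))
    bad_years = cur.fetchone()[0]
    if bad_years > 0:
        raise ValueError(
            "{} rows in {} are not from 2016".format(bad_years, table)
        )
    return total
```

Raising an exception keeps the behaviour compatible with the `try`/`except` pattern used in the cells of this notebook.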

 


In [236]:
# Perform quality checks here
try:
    with open ('./sql/data_quality.sql', 'r') as f:
        cur.execute(f.read())
    conn.commit()
    print('Data Pipeline ran ok')
except Exception as e:
    conn.rollback()
    print('Data Pipeline unsuccessful: ' + str(e))

Data Pipeline ran ok


In [237]:
conn.close()

#### 4.3 Data dictionary 

##### Countries table
- id: primary key
- country: country name

##### Cities table
- id: primary key
- city: city name
- state: state name
- country_id: country of city, foreign key to countries.id
- avg_temp: city average temperature

##### Visas table
- id: primary key
- visa: visa purpose

##### Modes table
- id: primary key
- mode: mode of transport

##### Immigrants table
- id: primary key
- i94yr: 4 digit year, always 2016
- i94mon: numeric month, 1-12
- i94cit: immigrant's country of citizenship; foreign key to countries.id
- i94res: immigrant's country of residence outside US; foreign key to countries.id
- i94port: port of entry; foreign key to cities.id
- arrdate: arrival date of immigrant where 20454 == 1/1/2016
- i94mode: mode of arrival; foreign key to mode.id
- depdate: departure date of immigrant where 20454 == 1/1/2016
- i94visa: purpose of visa; foreign key to visa.id
- biryear: four-digit year of birth
- gender: immigrant gender, M or F
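
The `arrdate` and `depdate` values are SAS dates, i.e. day counts from 1960-01-01, which is why 20454 corresponds to 1/1/2016. A minimal sketch of the conversion is given below; the helper name `sas_to_date` is hypothetical and is not part of the project's SQL.

```python
from datetime import date, timedelta

# SAS dates count days from 1 January 1960.
SAS_EPOCH = date(1960, 1, 1)


def sas_to_date(days):
    """Convert a SAS day count (int or float) to a date; None/NaN gives None."""
    if days is None or days != days:  # NaN is the only value not equal to itself
        return None
    return SAS_EPOCH + timedelta(days=int(days))
```

For example, `sas_to_date(20454)` gives `date(2016, 1, 1)`, and the `arrdate` of 20566.0 in the first sample row gives `date(2016, 4, 22)`, which matches that row's `dtadfile` value of 20160422.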

##### Summary table
- id: primary key
- i94yr: 4 digit year, always 2016
- i94mon: numeric month, 1-12
- i94cit: immigrant's country of citizenship
- i94res: immigrant's country of residence outside US
- i94port: port of entry
- i94mode: mode of arrival
- i94visa: purpose of visa
- avg_temp: arrival city average temperature
- count: number of immigrants

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
Python and Pandas were used for exploring and staging the data because they can easily read all the provided data formats and load them into a database.

A relational database was chosen to easily enforce data quality as well as to normalize the data, which avoids having to update multiple tables.

The choice to use Postgres directly for the ETL process is motivated by the ability to easily handle and manipulate the large dataset, perform the necessary checks, and enforce data quality through the data model itself.
In addition, because the dataset was not extremely large (over 1e6 data points, well within the capabilities of Postgres), Postgres was chosen over Spark.

To handle the full ingestion, a Python script was developed for simplicity; it follows the same structure as presented in this notebook.

* Propose how often the data should be updated and why.
The data should be updated when new immigration data becomes available. To keep ingestion speeds optimized, monthly or quarterly updates are suggested; these can be fed into the live database.

* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 
 *While Postgres is still capable of handling this quantity of data, a significant increase in data size (100x)
 would cause a bottleneck in the staging process.
 This step could no longer be done in a single batch; the data would have to be chunked and staged incrementally.
 Alternatively, a switch to a Redshift cluster (instead of a Postgres server) would also provide a solution, as staging can be done much faster and the database is horizontally scalable as well.
 The increased size would also warrant the use of big data tools such as Spark, as staging the data may cause a bottleneck in the process.*
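
Chunked, incremental staging could look roughly like the sketch below. It is an assumption-laden illustration rather than part of the current pipeline: the generator name `iter_csv_chunks` and the default `chunksize` are hypothetical. It reads the source CSV in pieces with pandas and yields one in-memory CSV buffer per piece.

```python
from io import StringIO

import pandas as pd


def iter_csv_chunks(source, columns, chunksize=100_000):
    """Yield in-memory CSV buffers holding `chunksize` rows each.

    Only `columns` are read, and they are written in the given order so
    that each buffer lines up with the staging table's column order.
    """
    for chunk in pd.read_csv(source, usecols=columns, chunksize=chunksize):
        buf = StringIO(newline='')
        chunk[columns].to_csv(buf, sep=',', index=False)
        buf.seek(0)
        yield buf
```

Each buffer could then be passed to the `load_data` function from Step 4, for example `load_data(buf, 'staging.immigration')`, so that only one chunk is held in memory and committed at a time.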


 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 
 *If the data needs to populate a dashboard daily, we could use a scheduling tool such as Airflow to run the ETL pipeline overnight; setting up a simple cron job may also suffice.*
 
 
 * The database needed to be accessed by 100+ people.
 
 *Accessibility of the data should not be too much of an issue, but the default limit for Postgres is around 100 concurrent connections (the `max_connections` setting).
 For more users, it is recommended to create an API layer in front of the database to balance the load on the database, or to consider a multi-zone deployment where the database is replicated in multiple zones so that the usage load can be balanced across these nodes using a load balancer.*
