# U.S. Immigrants Analysis
### Data Engineering Capstone Project

#### Project Summary
I implemented the Udacity-provided Data Engineering capstone project using the following datasets:
* I94 Immigration Data: This data comes from the US National Tourism and Trade Office.
* U.S. City Demographic Data: This data comes from OpenSoft.
* World Temperature Data: This dataset comes from Kaggle.
* Airport Code Data: This is a simple table of airport codes and corresponding cities.
* I94_SAS_Labels_Descriptions.SAS: I used the code descriptions available in this file.

I used AWS EMR to preprocess the above datasets and store the results in S3. An Apache Airflow data pipeline then loads this data into AWS Redshift fact and dimension tables and applies data quality checks. The resulting Redshift data warehouse can be used for further analysis of the data with SQL.

The project steps are:
* Step 1: Scope the Project and Gather Data
    - Since the scope of the project will be highly dependent on the data, these two things happen simultaneously. In this step, you’ll:
        - Identify and gather the data you'll be using for your project (at least two sources and more than 1 million rows). See Project Resources for ideas of what data you can use.
        - Explain what end use cases you'd like to prepare the data for (e.g., analytics table, app back-end, source-of-truth database, etc.)

* Step 2: Explore and Assess the Data
    - Explore the data to identify data quality issues, like missing values, duplicate data, etc.
    - Document steps necessary to clean the data

* Step 3: Define the Data Model
    - Map out the conceptual data model and explain why you chose that model
    - List the steps necessary to pipeline the data into the chosen data model

* Step 4: Run ETL to Model the Data
    - Create the data pipelines and the data model
    - Include a data dictionary
    - Run data quality checks to ensure the pipeline ran as expected:
        - Integrity constraints on the relational database (e.g., unique key, data type, etc.)
        - Unit tests for the scripts to ensure they are doing the right thing
        - Source/count checks to ensure completeness

* Step 5: Complete Project Write Up
    - What's the goal? What queries will you want to run? How would Spark or Airflow be incorporated? Why did you choose the model you chose?
    - Clearly state the rationale for the choice of tools and technologies for the project.
    - Document the steps of the process.
    - Propose how often the data should be updated and why.
    - Post your write-up and final data model in a GitHub repo.
    - Include a description of how you would approach the problem differently under the following scenarios:
        - If the data were increased by 100x.
        - If the pipelines were run on a daily basis by 7am.
        - If the database needed to be accessed by 100+ people.

```python
import re
from collections import defaultdict
from datetime import datetime, timedelta
import psycopg2

import pandas as pd
pd.set_option('display.max_columns', 28)
```

### Step 1: Scope the Project and Gather Data

#### Scope 
Deliver a cloud-based data warehouse (DWH) solution that supports integration with BI tools. I chose AWS Redshift as the DWH.

Users of the DWH should be able to analyze the datamart and answer questions such as:
- Applicants by nationality, origin, and airline
- Correlations between the destination in the U.S. and the applicant's country
- Correlations involving climate, using the average temperature of the applicant's country
- Correlations between the applicant's demographics and the states visited in the U.S.

#### Describe and Gather Data 
- I94 Immigration Data: This data comes from the US National Tourism and Trade Office.
- U.S. City Demographic Data: This data comes from OpenSoft.
- World Temperature Data: This dataset comes from Kaggle.
- Airport Code Data: This is a simple table of airport codes and corresponding cities.
- I94_SAS_Labels_Descriptions.SAS: I used the code descriptions available in this file.
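The label file maps the numeric and letter codes in the I94 data to readable descriptions. The sketch below shows one way to turn those descriptions into lookup dictionaries. It assumes a specific file layout: each lookup is a `value <name>` block of `code = 'description'` lines (for example `'AL'='ALABAMA'` or `582 =  'MEXICO ...'`) that ends with `;`. The function name `parse_sas_labels` is hypothetical and is not taken from the project code.

```python
import re
from collections import defaultdict

# Assumed layout: "value <name>" opens a block, "code = 'description'" lines follow, ";" closes it.
SECTION_RE = re.compile(r"^\s*value\s+\$?(\w+)", re.IGNORECASE)
ENTRY_RE = re.compile(r"^\s*'?(?P<code>[^'=\s]+)'?\s*=\s*'(?P<desc>[^']*)'")


def parse_sas_labels(text):
    """Return {block_name: {code: description}} for every value block in the text."""
    blocks = defaultdict(dict)
    current = None
    for line in text.splitlines():
        header = SECTION_RE.match(line)
        if header:
            current = header.group(1).lower()
            continue
        if current is None:
            continue  # outside a value block (comments, blank lines, ...)
        entry = ENTRY_RE.match(line)
        if entry:
            # Descriptions are padded with spaces in the file, so strip them
            blocks[current][entry.group("code")] = entry.group("desc").strip()
        if line.strip().endswith(";"):
            current = None  # end of the current value block
    return dict(blocks)
```

If the state block is named `i94addrl` (an assumption), then `parse_sas_labels(open("I94_SAS_Labels_Descriptions.SAS").read())["i94addrl"]` would return a `{'AL': 'ALABAMA', ...}` mapping. That mapping can be written out as a small CSV lookup table. Note that descriptions containing a single quote would be truncated by this simple regex.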

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

In the following sections I briefly describe each dataset and summarize the reasoning behind my decision on which data to use.
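To spot missing values and duplicates, pandas users would typically run `df.isnull().mean() * 100` (percentage missing per column) and `df.duplicated(subset=['cicid']).sum()`. Below is a dependency-free sketch of the same two checks over rows represented as dictionaries. The helper names `is_missing` and `profile_records` are hypothetical, and `cicid` is assumed to be the key that should be unique.

```python
import math
from collections import Counter


def is_missing(value):
    """Treat None, empty strings and NaN floats as missing."""
    return value is None or value == "" or (isinstance(value, float) and math.isnan(value))


def profile_records(records, key):
    """Return (percent missing per column, {key value: count} for duplicated keys)."""
    columns = sorted({column for row in records for column in row})
    total = len(records)
    missing_pct = {
        column: (
            round(100.0 * sum(is_missing(row.get(column)) for row in records) / total, 2)
            if total else 0.0
        )
        for column in columns
    }
    # Any key value seen more than once violates the expected uniqueness
    key_counts = Counter(row.get(key) for row in records)
    duplicates = {value: n for value, n in key_counts.items() if n > 1}
    return missing_pct, duplicates
```

Columns with a high missing percentage (for example `occup` or `insnum` in the sample rows) are candidates to drop. Duplicated keys indicate records that must be deduplicated before the key can serve as a primary key.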

#### U.S. Immigration data

```python
# Read the U.S. immigration data (April 2016 SAS file)
us_immigration_fname = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
us_immigration_df = pd.read_sas(us_immigration_fname, format='sas7bdat', encoding="ISO-8859-1")
```

```python
us_immigration_df.head()
```

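|   | cicid | i94yr | i94mon | i94cit | i94res | i94port | arrdate | i94mode | i94addr | depdate | i94bir | i94visa | count | dtadfile | visapost | occup | entdepa | entdepd | entdepu | matflag | biryear | dtaddto | gender | insnum | airline | admnum | fltno | visatype |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 6.0 | 2016.0 | 4.0 | 692.0 | 692.0 | XXX | 20573.0 |  |  |  | 37.0 | 2.0 | 1.0 |  |  |  | T |  | U |  | 1979.0 | 10282016 |  |  |  | 1897628000.0 |  | B2 |
| 1 | 7.0 | 2016.0 | 4.0 | 254.0 | 276.0 | ATL | 20551.0 | 1.0 | AL |  | 25.0 | 3.0 | 1.0 | 20130811.0 | SEO |  | G |  | Y |  | 1991.0 | D/S | M |  |  | 3736796000.0 | 296.0 | F1 |
| 2 | 15.0 | 2016.0 | 4.0 | 101.0 | 101.0 | WAS | 20545.0 | 1.0 | MI | 20691.0 | 55.0 | 2.0 | 1.0 | 20160401.0 |  |  | T | O |  | M | 1961.0 | 09302016 | M |  | OS | 666643200.0 | 93.0 | B2 |
| 3 | 16.0 | 2016.0 | 4.0 | 101.0 | 101.0 | NYC | 20545.0 | 1.0 | MA | 20567.0 | 28.0 | 2.0 | 1.0 | 20160401.0 |  |  | O | O |  | M | 1988.0 | 09302016 |  |  | AA | 92468460000.0 | 199.0 | B2 |
| 4 | 17.0 | 2016.0 | 4.0 | 101.0 | 101.0 | NYC | 20545.0 | 1.0 | MA | 20567.0 | 4.0 | 2.0 | 1.0 | 20160401.0 |  |  | O | O |  | M | 2012.0 | 09302016 |  |  | AA | 92468460000.0 | 199.0 | B2 |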


#### The immigration dataset is our fact table (star schema in the data warehouse)

#### Data dictionary

- cicid - ID that uniquely identifies one record in the dataset
- i94yr - 4-digit year
- i94mon - Numeric month
- i94cit - 3-digit code of the immigrant's country of birth (citizenship)
- i94res - 3-digit code of the immigrant's country of residence
- i94port - Port admitted through
- arrdate - Arrival date in the USA (SAS numeric date: days since 1960-01-01)
- i94mode - Mode of transportation (1 = Air; 2 = Sea; 3 = Land; 9 = Not reported)
- i94addr - State of arrival
- depdate - Departure date from the USA (SAS numeric date)
- i94bir - Age of respondent in years
- i94visa - Visa codes collapsed into three categories (1 = Business; 2 = Pleasure; 3 = Student)
- count - Used for summary statistics
- dtadfile - Character date field: date added to the I-94 files (YYYYMMDD)
- visapost - Department of State office where the visa was issued
- occup - Occupation that will be performed in the U.S.
- entdepa - Arrival flag: whether admitted or paroled into the U.S.
- entdepd - Departure flag: whether departed, lost visa, or deceased
- entdepu - Update flag: update of visa, either apprehended, overstayed, or updated to PR (permanent residence)
- matflag - Match flag: match of arrival and departure records
- biryear - 4-digit year of birth
- dtaddto - Character date field: date until which the visitor is admitted to stay in the U.S. (MMDDYYYY)
- gender - Gender
- insnum - INS number
- airline - Airline used to arrive in the U.S.
- admnum - Admission number; should be unique and not nullable
- fltno - Flight number of the airline used to arrive in the U.S.
- visatype - Class of admission legally admitting the non-immigrant to temporarily stay in the U.S.
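`arrdate` and `depdate` are SAS numeric dates, meaning days counted from 1960-01-01. In the sample rows above, `arrdate` 20545.0 decodes to 2016-04-01, which agrees with `dtadfile` 20160401 on the same rows. The following is a minimal sketch of that conversion. It also computes the number of days between arrival and departure. The later `stay` column is assumed to be derived this way, and the helper names are hypothetical.

```python
import math
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)  # SAS numeric dates count days since this date


def sas_to_date(sas_days):
    """Convert a SAS numeric date such as arrdate=20545.0 to a datetime.date (None if missing)."""
    if sas_days is None or math.isnan(sas_days):
        return None
    return SAS_EPOCH + timedelta(days=int(sas_days))


def stay_in_days(arrdate, depdate):
    """Days between arrival and departure, or None when either date is missing."""
    arrival, departure = sas_to_date(arrdate), sas_to_date(depdate)
    if arrival is None or departure is None:
        return None
    return (departure - arrival).days
```

Applied to row 2 of the sample, `sas_to_date(20691.0)` gives 2016-08-25 and `stay_in_days(20545.0, 20691.0)` gives 146. Row 0 has no `depdate`, so its stay is `None`.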

### Global Temperature data 
In this project, we use the city-level data available in `../../data2/GlobalLandTemperaturesByCity.csv`.

#### Data dictionary
- dt - Date in format YYYY-MM-DD
- AverageTemperature - Average temperature of the city on a given date, in degrees Celsius
- AverageTemperatureUncertainty - 95% confidence interval around the average temperature
- City - City Name
- Country - Country Name
- Latitude - Latitude
- Longitude - Longitude

```python
temperature_fname = '../../data2/GlobalLandTemperaturesByCity.csv'
world_temperature_df = pd.read_csv(temperature_fname)

# Group by country: average the temperature over all cities and dates of each country.
# Latitude/Longitude keep the last non-null value of each country group (an approximation
# of the country's location based on one of its cities).
world_temperature_df = world_temperature_df.groupby(["Country"]).agg({
    "AverageTemperature": "mean",
    "Latitude": "last",
    "Longitude": "last",
}).reset_index()
```

```python
world_temperature_df.head()
```

|   | Country | AverageTemperature | Latitude | Longitude |
|---|---|---|---|---|
| 0 | Afghanistan | 13.816497 | 36.17N | 69.61E |
| 1 | Albania | 15.525828 | 40.99N | 19.17E |
| 2 | Algeria | 17.763206 | 31.35N | 5.65E |
| 3 | Angola | 21.759716 | 15.27S | 14.17E |
| 4 | Argentina | 16.999216 | 26.52S | 64.48W |
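The `Latitude` and `Longitude` columns hold strings such as `36.17N` or `64.48W` rather than numbers. The sketch below converts them to signed decimal degrees, using the convention that south and west are negative, so that they can be stored in numeric columns. The function `parse_coordinate` is a hypothetical helper and is not part of the project's ETL code.

```python
def parse_coordinate(value):
    """Convert strings such as '36.17N' or '64.48W' to signed decimal degrees."""
    value = value.strip().upper()
    if not value or value[-1] not in "NSEW":
        raise ValueError(f"unexpected coordinate format: {value!r}")
    degrees = float(value[:-1])
    # South latitudes and west longitudes are negative by convention
    return -degrees if value[-1] in "SW" else degrees
```

With pandas, it could be applied as `world_temperature_df["Latitude"].map(parse_coordinate)`. For example, Angola's `15.27S` becomes `-15.27`.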


### Airports data 
Let's read the data available in `airport-codes_csv.csv`.

#### Data dictionary
 - ident - Unique identifier for the airport
 - type - Airport type
 - name - Airport Name
 - elevation_ft - Elevation of the airport, in feet
 - continent - Continent
 - iso_country - ISO code of airport country
 - iso_region - ISO code for airport region
 - municipality - City
 - gps_code - GPS code of airport
 - iata_code - IATA code of airport
 - local_code - Local code of airport
 - coordinates - GPS coordinates of the airport, as "longitude, latitude"

```python
airports_df = pd.read_csv("airport-codes_csv.csv")
```

```python
airports_df.head()
```

|   | ident | type | name | elevation_ft | continent | iso_country | iso_region | municipality | gps_code | iata_code | local_code | coordinates |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 00A | heliport | Total Rf Heliport | 11.0 |  | US | US-PA | Bensalem | 00A |  | 00A | -74.93360137939453, 40.07080078125 |
| 1 | 00AA | small_airport | Aero B Ranch Airport | 3435.0 |  | US | US-KS | Leoti | 00AA |  | 00AA | -101.473911, 38.704022 |
| 2 | 00AK | small_airport | Lowell Field | 450.0 |  | US | US-AK | Anchor Point | 00AK |  | 00AK | -151.695999146, 59.94919968 |
| 3 | 00AL | small_airport | Epps Airpark | 820.0 |  | US | US-AL | Harvest | 00AL |  | 00AL | -86.77030181884766, 34.86479949951172 |
| 4 | 00AR | closed | Newport Hospital & Clinic Heliport | 237.0 |  | US | US-AR | Newport |  |  |  | -91.254898, 35.6087 |


#### After exploring the airport data, I realized it could not be joined with the immigration data because the two datasets share no common key.
So I decided to discard the airport data from the data model.

### U.S. City Demographic data 
This data comes from OpenSoft. It contains demographic (census) information about U.S. cities.
Let's read the data available in `us-cities-demographics.csv`.

#### Data dictionary 
- City
- State
- Median Age - Median age of the population
- Male Population
- Female Population
- Total Population
- Number of Veterans
- Foreign-born
- Average Household Size
- State Code - Two-letter state code; the common key shared with the immigration dataset (`i94addr`)
- Race
- Count - Number of residents of the given race

```python
usa_cities_demographics_df = pd.read_csv("us-cities-demographics.csv", sep=";")
```

```python
usa_cities_demographics_df.head()
```

|   | City | State | Median Age | Male Population | Female Population | Total Population | Number of Veterans | Foreign-born | Average Household Size | State Code | Race | Count |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | Silver Spring | Maryland | 33.8 | 40601.0 | 41862.0 | 82463 | 1562.0 | 30908.0 | 2.6 | MD | Hispanic or Latino | 25924 |
| 1 | Quincy | Massachusetts | 41.0 | 44129.0 | 49500.0 | 93629 | 4147.0 | 32935.0 | 2.39 | MA | White | 58723 |
| 2 | Hoover | Alabama | 38.5 | 38040.0 | 46799.0 | 84839 | 4819.0 | 8229.0 | 2.58 | AL | Asian | 4759 |
| 3 | Rancho Cucamonga | California | 34.5 | 88127.0 | 87105.0 | 175232 | 5821.0 | 33878.0 | 3.18 | CA | Black or African-American | 24437 |
| 4 | Newark | New Jersey | 34.6 | 138040.0 | 143873.0 | 281913 | 5829.0 | 86253.0 | 2.73 | NJ | White | 76402 |
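The city-level columns of this dataset are repeated on every race row of a city. The state dimension described later has one column per race (`white`, `hispanicorlatino`, ...). Building that dimension therefore means pivoting `Race`/`Count` into columns while counting each city's population only once. This is a dependency-free sketch of that aggregation. The actual `etl_states_data` logic is not shown in this notebook, so the helper names are hypothetical and only `totalpopulation` is carried over for brevity.

```python
from collections import defaultdict


def race_column(race):
    """'Black or African-American' -> 'blackorafricanamerican' (lowercase, letters only)."""
    return "".join(ch for ch in race.lower() if ch.isalnum())


def build_state_rows(city_rows):
    """Aggregate city-level rows into {state code: {column: value}}.

    Each city appears once per race, so the population is added once per city,
    while the race counts are summed over every row.
    """
    states = defaultdict(lambda: {"totalpopulation": 0, "races": defaultdict(int)})
    seen_cities = set()
    for row in city_rows:
        code = row["State Code"]
        state = states[code]
        if (row["City"], code) not in seen_cities:
            seen_cities.add((row["City"], code))
            state["totalpopulation"] += int(float(row["Total Population"]))
        state["races"][race_column(row["Race"])] += int(float(row["Count"]))
    return {
        code: {"totalpopulation": s["totalpopulation"], **s["races"]}
        for code, s in states.items()
    }
```

The same once-per-city rule would apply to the other population columns (male, female, veterans, foreign-born). Summing them over every row would multiply each city's figures by the number of races listed for it.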


### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
I am using 3 data sources provided by Udacity: I94 Immigration Data, World Temperature Data, and U.S. City Demographic Data. I also used the descriptions from the file I94_SAS_Labels_Descriptions.SAS.

I am following the star schema concept: one central fact table that references the surrounding dimension tables through foreign keys.

Tables in the data model are:
* IMMIGRATION - the fact table, built from the U.S. immigration dataset
* STATE - dimension table created from the U.S. City Demographic Data
* COUNTRY - dimension table created from the World Temperature Data
* DATE - dimension table extracted from the immigration dataset
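The star schema above can be sketched as DDL, together with one analytical query from the scope section ("applicants by nationality/origin"). This is a hypothetical, trimmed-down sketch that shows only a few of the documented columns. The project's actual table definitions are not shown in this notebook, and the column types are assumptions. The SQL is kept generic so that it runs on both Redshift (through psycopg2) and SQLite.

```python
# Hypothetical, trimmed-down DDL: only a handful of the documented columns are shown.
STAR_SCHEMA_DDL = [
    """CREATE TABLE country (
        code INTEGER PRIMARY KEY,
        country VARCHAR(64),
        temperature DOUBLE PRECISION
    )""",
    """CREATE TABLE state (
        code CHAR(2) PRIMARY KEY,
        state VARCHAR(64),
        totalpopulation BIGINT
    )""",
    """CREATE TABLE immigration (
        cicid BIGINT PRIMARY KEY,
        i94res INTEGER REFERENCES country (code),
        i94addr CHAR(2) REFERENCES state (code),
        arrdate DATE,
        stay INTEGER
    )""",
]

# Example analytical query: number of applicants by country of residence.
APPLICANTS_BY_RESIDENCE = """
    SELECT c.country, COUNT(*) AS applicants
    FROM immigration i
    JOIN country c ON i.i94res = c.code
    GROUP BY c.country
    ORDER BY applicants DESC, c.country
"""


def create_schema(connection):
    """Create the tables through a DB-API 2.0 connection (e.g. psycopg2 for Redshift)."""
    cursor = connection.cursor()
    try:
        for statement in STAR_SCHEMA_DDL:
            cursor.execute(statement)
    finally:
        cursor.close()
    connection.commit()
```

Every analytical question then follows the same pattern: join the fact table to one or more dimensions and aggregate.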

#### 3.2 Mapping Out Data Pipelines
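List the steps necessary to pipeline the data into the chosen data model:
* Preprocess the raw datasets with Spark on AWS EMR, building the immigration, date, country, and state tables, and write them to S3 in Parquet format.
* Use an Apache Airflow DAG to load the Parquet data from S3 into the Redshift tables.
* Run data quality checks (row counts) on every Redshift table.

The actual ETL was implemented in a dedicated file, **etl/s3_to_redshift_etl.py**, and run on AWS EMR. The ETL functions are also explored in this notebook.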
```python
from etl.s3_to_redshift_etl import (
    etl_immigration_data, etl_countries_data, etl_states_data,
    create_spark_session, save, read_data, cast_type,
    capitalize_udf, change_field_value_condition,
)
```

```python
# Spark session
import os
import re
import configparser
from datetime import timedelta, datetime

from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    udf, col, when, lower, isnull, year, month, dayofmonth, hour,
    weekofyear, dayofweek, date_format, to_date,
)
from pyspark.sql.types import StructField, StructType, IntegerType, DoubleType

spark = create_spark_session()
```

```python
spark
```

**ETL U.S. Immigration dataset**

Load the immigration SAS file into a Spark DataFrame, then save the processed immigration and date DataFrames to Amazon S3 in Parquet format.

```python
immigration = etl_immigration_data(
    spark,
    input_path='../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat',
    output_path="s3a://udemy-data-engineer-capstone/immigration.parquet",
    date_output_path="s3a://udemy-data-engineer-capstone/date.parquet",
    input_format="com.github.saurfang.sas.spark",
    load_size=1000,
    partitionBy=None,
    columns_to_save='*',
)
```

**Countries dataset**

Load the global temperature dataset into a Spark DataFrame and save the resulting country DataFrame to Amazon S3 in Parquet format.

```python
countries = etl_countries_data(spark, output_path="s3a://udemy-data-engineer-capstone/country.parquet")
```

**States dataset**

Load the U.S. cities demographics dataset into a Spark DataFrame, join it with `I94ADDR.csv`, and save the merged DataFrame to the S3 bucket.

```python
states = etl_states_data(spark, output_path="s3a://udemy-data-engineer-capstone/state.parquet")
```

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

I used **Apache Airflow** to build a DAG that extracts the data from S3 and loads it into staging tables of the same name in Amazon Redshift.
As a final step, the DAG runs data quality checks (simple row counts) to ensure completeness.

The Airflow pipeline code is in the `airflow` folder.

**Important files:**
 - airflow/dags/dag_capstone_s3_to_redshift.py - The main DAG file to load data from S3 to Redshift
 - airflow/dags/plugins/stage_redshift.py - Custom operator that performs the s3 to redshift transfer
 - airflow/dags/plugins/data_quality.py - Custom operator that checks redshift data quality 
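The internals of `stage_redshift.py` are not shown here. At its core, an S3-to-Redshift transfer issues a Redshift `COPY` command. The hypothetical helper below builds such a statement for the Parquet outputs written by Spark. One detail it handles is that Spark writes through the Hadoop S3A connector (`s3a://` paths, as in the ETL cells above), whereas `COPY` expects `s3://` URLs. Role-based authentication is assumed, and the IAM role ARN is a placeholder.

```python
def build_copy_statement(table, s3_path, iam_role_arn):
    """Return a Redshift COPY statement that loads a Parquet dataset from S3."""
    # Spark writes through the Hadoop S3A connector (s3a://); COPY expects s3:// URLs
    if s3_path.startswith("s3a://"):
        s3_path = "s3://" + s3_path[len("s3a://"):]
    if not s3_path.startswith("s3://"):
        raise ValueError(f"expected an S3 path, got {s3_path!r}")
    return (
        f"COPY {table} "
        f"FROM '{s3_path}' "
        f"IAM_ROLE '{iam_role_arn}' "
        "FORMAT AS PARQUET;"
    )
```

For example, `build_copy_statement("immigration", "s3a://udemy-data-engineer-capstone/immigration.parquet", role_arn)` yields a `COPY immigration FROM 's3://udemy-data-engineer-capstone/immigration.parquet' ...` statement. The `FROM` path acts as a key prefix, so all the part files Spark wrote under it are loaded.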

#### 4.2 Data Quality Checks
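- All tables declare a PRIMARY KEY constraint on the column that should uniquely identify each record.
- The immigration fact table declares FOREIGN KEY constraints that reference the dimension tables (country, state).

Note that Amazon Redshift does not enforce primary key, unique, or foreign key constraints. It treats them as informational hints for the query planner, so uniqueness and referential integrity have to be verified with queries.

In the data quality check step, I verified that every Redshift table in the data model was actually loaded by running a row count check on each table.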
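The count check could be extended with uniqueness and orphan-key checks, because Redshift will not reject duplicates or dangling foreign keys on its own. Below is a minimal sketch that runs SQL checks through any DB-API 2.0 connection (psycopg2 for Redshift). It is not the project's `data_quality.py` operator, and the function name and check list are hypothetical.

```python
def run_quality_checks(connection, checks):
    """Run SQL checks and return a list of failure messages (empty when all pass).

    checks: list of (description, sql, predicate); the predicate receives the first
    column of the first result row and returns True when the check passes.
    """
    failures = []
    cursor = connection.cursor()
    try:
        for description, sql, predicate in checks:
            cursor.execute(sql)
            row = cursor.fetchone()
            value = row[0] if row else None
            if not predicate(value):
                failures.append(f"{description}: got {value!r}")
    finally:
        cursor.close()
    return failures


CHECKS = [
    ("immigration table is not empty",
     "SELECT COUNT(*) FROM immigration",
     lambda n: n > 0),
    ("cicid is unique",
     "SELECT COUNT(*) FROM (SELECT cicid FROM immigration GROUP BY cicid HAVING COUNT(*) > 1) d",
     lambda n: n == 0),
    ("every i94addr exists in state",
     "SELECT COUNT(*) FROM immigration i LEFT JOIN state s ON i.i94addr = s.code "
     "WHERE i.i94addr IS NOT NULL AND s.code IS NULL",
     lambda n: n == 0),
]
```

Inside an Airflow task, a non-empty failure list would typically be turned into an exception so that the DAG run is marked as failed.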
 


#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

**Immigration** fact table
- cicid - primary key
- i94yr - year
- i94mon - month
- i94cit - 3-digit code of the country where the visitor was born; FK to the country dimension table
- i94res - 3-digit code of the country where the visitor resides; FK to the country dimension table
- arrdate - arrival date in the USA; FK to the date dimension table
- i94mode - mode of transportation (1 = air; 2 = sea; 3 = land; 9 = not reported)
- i94addr - state of arrival; FK to the state dimension table
- depdate - departure date from the USA; FK to the date dimension table
- i94bir - age of respondent in years
- i94visa - visa codes collapsed into three categories (1 = business; 2 = pleasure; 3 = student)
- biryear - 4-digit year of birth
- gender - gender
- airline - airline used to arrive in the U.S.
- fltno - flight number of the airline used to arrive in the U.S.
- visatype - class of admission legally admitting the non-immigrant to temporarily stay in the U.S.
- stay - number of days spent in the U.S.

**Country** dimension table
- code - country code; this is the PK
- country - country name
- temperature - average temperature (°C) of the country between 1743 and 2013
- latitude - GPS latitude
- longitude - GPS longitude

**State** dimension table
- code						- primary key. this is the code of the state as in i94addr lookup table
- state						- name of the state
- blackorafricanamerican	- number of residents of the race black or african american
- white						- number of residents of the race white
- foreignborn				- number of residents that born outside th united states
- americanindianandalaskanative	- number of residents of the race american indian and alaska native
- hispanicorlatino	- number of residents of the race hispanic or latino
- asian	- number of residents of the race asian
- numberveterans	- number of residents that are war veterans
- femalepopulation	- number of female population
- malepopulation	- number of male population
- totalpopulation	- number total of the population

**Date** dimension table
- date - Date in the format YYYY-MM-DD. This is the PK.
- day - Two digit day
- month - Two digit month
- year - Four digit for the year
- weekofyear - The week of the year
- dayofweek - The day of the week
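The notebook imports Spark's `dayofmonth`, `month`, `year`, `weekofyear` and `dayofweek`, which suggests the date dimension is derived with them. In Spark, `weekofyear` follows ISO 8601 week numbering and `dayofweek` returns 1 = Sunday through 7 = Saturday. The sketch below reproduces these columns in plain Python from the SAS numeric `arrdate`/`depdate` values. It assumes those Spark conventions apply to the real table, and the helper names are hypothetical.

```python
import math
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)  # SAS numeric dates count days since this date


def date_dimension_row(sas_days):
    """Build one date-dimension row from a SAS numeric date."""
    d = SAS_EPOCH + timedelta(days=int(sas_days))
    return {
        "date": d.isoformat(),
        "day": d.day,
        "month": d.month,
        "year": d.year,
        "weekofyear": d.isocalendar()[1],     # ISO 8601 week number
        "dayofweek": d.isoweekday() % 7 + 1,  # 1 = Sunday ... 7 = Saturday
    }


def build_date_dimension(sas_values):
    """Unique, sorted date rows from arrdate/depdate values, skipping missing ones."""
    unique_days = {int(v) for v in sas_values if v is not None and not math.isnan(v)}
    return [date_dimension_row(v) for v in sorted(unique_days)]
```

For the sample arrival date 20545.0, this yields `{'date': '2016-04-01', 'day': 1, 'month': 4, 'year': 2016, 'weekofyear': 13, 'dayofweek': 6}`, since 2016-04-01 was a Friday. Deduplicating before building rows keeps `date` usable as the table's key.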

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
    - Python - the project is written in Python
    - Pandas - used for the initial data exploration
    - Apache Spark (AWS EMR) - PySpark is used to preprocess the data and store it in S3
    - AWS S3 - intermediate data store for the data preprocessed by Spark
    - AWS Redshift - final datamart that stores the processed and cleaned data as fact and dimension tables, ready to be analyzed with SQL or the user's favorite BI tool

* Propose how often the data should be updated and why.
    - Monthly is a good starting point, since the I94 immigration data is provided as one file per month (e.g., `i94_apr16_sub.sas7bdat`).

* Write a description of how you would approach the problem differently under the following scenarios:
    * The data were increased by 100x.
        - The pipeline is built on EMR, S3, and Redshift, all of which scale out: add nodes to the EMR cluster for the Spark preprocessing, rely on S3 for storage, and resize the Redshift cluster as the warehouse grows.
        - Further improvements: use a more efficient compression codec for the Parquet files in S3, and write the output as multiple partitioned files so that Redshift's COPY command loads them in parallel.
    * The data populates a dashboard that must be updated on a daily basis by 7am every day.
        - Schedule the Airflow DAG to run daily, early enough to finish before 7am (for example with the cron expression `0 5 * * *`), and configure alerting (such as failure emails or SLAs) so that late or failed runs are noticed before the deadline.
    * The database needed to be accessed by 100+ people.
        - Redshift supports up to 500 connections. Workload management (WLM) queues and Concurrency Scaling help serve many simultaneous queries, and elastic resize makes it easy to add capacity when needed.