# US-Immigration 2016 

## Data Engineering Capstone Project

### Project Summary

This project uses Spark to build a data warehouse about immigration to the USA in 2016. It combines the following data domains into a single warehouse:
- Immigration events
- Airports
- Airlines
- US cities/states demographics
- World countries temperature data


### Project Structure

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

# Step 1: Scope the Project and Gather Data

## Scope

The goal of this project is to provide an enriched view over the immigration datasets by:
- providing additional data about the demographics of the state the visitor is heading to,
- providing airport information related to the port of entry of the visitor,
- providing airline information related to the airline that brought the visitor to the USA,
- providing information related to the country of origin of the visitor.

## Out-of-scope Note

This project focuses mainly on the ETL process using Spark.
The project has been designed to be extensible to run on AWS using S3, EMR, and Airflow; however, the implementation of this extension is out of scope.

## Example Queries

1. How many visitors arrived in the US with Lufthansa?
1. How many visitors arrived in California?
1. How many visitors arrived by sea?
1. Does the ethnic profile of a US state correlate with the number of visitors?
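
Once the warehouse is built, the first three questions reduce to simple aggregations over the fact table, joined to a dimension where a readable name is needed. The sketch below is only an illustration: it uses Python's built-in `sqlite3` and a handful of made-up rows instead of the real parquet files. The table and column names follow the model described in Step 3, and the `i94mode` coding (2 = Sea) comes from the data dictionary. The row counts are toy values, not results from the dataset.

```python
import sqlite3

# Toy stand-in for the warehouse: illustrative rows only, not real results.
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE f_immigration (cicid INTEGER, i94mode INTEGER, i94addr TEXT, airline TEXT);
    CREATE TABLE dim_airlines (iata TEXT, name TEXT);
    INSERT INTO f_immigration VALUES
        (1, 1, 'CA', 'LH'),
        (2, 1, 'CA', 'QF'),
        (3, 2, 'FL', NULL),
        (4, 1, 'NY', 'LH');
    INSERT INTO dim_airlines VALUES ('LH', 'Lufthansa'), ('QF', 'Qantas');
""")

def count(sql, params=()):
    """Run a COUNT(*) query and return the single number it yields."""
    return conn.execute(sql, params).fetchone()[0]

# 1. Visitors who arrived with Lufthansa (fact table joined to the airline dimension).
lufthansa = count(
    "SELECT COUNT(*) FROM f_immigration f "
    "JOIN dim_airlines a ON f.airline = a.iata WHERE a.name = ?",
    ("Lufthansa",),
)
# 2. Visitors heading to California.
california = count("SELECT COUNT(*) FROM f_immigration WHERE i94addr = ?", ("CA",))
# 3. Visitors who arrived by sea (i94mode = 2).
by_sea = count("SELECT COUNT(*) FROM f_immigration WHERE i94mode = 2")

print(lufthansa, california, by_sea)
```

The same queries could be expressed with Spark SQL against the parquet tables; the join keys stay the same.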

## Datasets

This project combines six datasets:

1. **U.S. City Demographic Data (demographics)**: comes from OpenDataSoft and includes data by city, state, age, population, veteran status, and race.

2. **I94 Immigration Data (immigration)**: comes from the US National Travel and Tourism Office and includes details on incoming immigrants and their ports of entry.

3. **Airport Code Table (airports)**: comes from datahub.io and includes airport codes and corresponding cities - [source](https://datahub.io/core/airport-codes#data)

4. **Countries (countries)**: comes from the file `I94_SAS_Labels_Descriptions.SAS`

5. **World Temperature Data (temperature)**: comes from a Kaggle dataset - [source](https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data)

6. **Airlines (airlines)**: comes from OpenFlights - [source](https://raw.githubusercontent.com/jpatokal/openflights/master/data/airlines.dat)

This project uses a star schema with `immigration` as the fact table, whereas the other datasets serve as four dimensions:
- `dim_demographics`
- `dim_airports`
- `dim_airlines`
- `dim_countries` (a join of **temperature** and **countries**)


# Step 2: Explore and Assess the Data

In this step, each dataset is:
- displayed to explore it visually
- cleaned to remove empty fields and correct missing data


In [1]:
from helper.common import *
from helper.loading import *
from helper.cleaning import *
from helper.transform import *
from helper.data_checks import *

In [2]:
spark = create_spark_session()

## Step 2.1 Loading data from sources and browsing contents

In this section, we load data from four different formats:
- `csv`
- `json`
- `parquet` files (previously processed from a `sas7bdat` file)
- custom format `dat` for the **airlines** dataset
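
The `dat` file is a header-less, comma-separated file that marks missing values with `\N`, as the airlines sample output in this section shows. The sketch below illustrates one way such a file could be parsed with the standard `csv` module. The real `load_airlines` helper is not shown in this document, so the function name `parse_airlines` and the choice to map `\N` to `None` are assumptions made for illustration.

```python
import csv
import io

# Column names as they appear in the airlines sample output.
AIRLINE_COLUMNS = ["Airline_ID", "Name", "Alias", "IATA", "ICAO", "Callsign", "Country", "Active"]

def parse_airlines(lines):
    """Parse header-less airline rows into dicts, mapping the \\N marker to None."""
    rows = []
    for record in csv.reader(lines):
        values = [None if value == r"\N" else value for value in record]
        rows.append(dict(zip(AIRLINE_COLUMNS, values)))
    return rows

# Two sample lines in the same layout as the file (quoted fields, \N for nulls).
sample = io.StringIO(
    '3,"1Time Airline",\\N,"1T","RNX","NEXTIME","South Africa","Y"\n'
    '2,"135 Airways",\\N,"","GNL","GENERAL","United States","N"\n'
)
airlines_sample = parse_airlines(sample)
print(airlines_sample[0]["Name"], airlines_sample[0]["Alias"])
```

Note that an empty string (such as the missing IATA code of the second row) is kept as-is here; whether it should also count as null is a cleaning decision.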

In [3]:
demographics = read_json(spark, "./data/us-cities-demographics.json").select('fields.*')
print_data(demographics)

Num rows =  2891


Unnamed: 0,average_household_size,city,count,female_population,foreign_born,male_population,median_age,number_of_veterans,race,state,state_code,total_population
0,2.73,Newark,76402,143873,86253,138040,34.6,5829,White,New Jersey,NJ,281913
1,2.4,Peoria,1343,62432,7517,56229,33.1,6634,American Indian and Alaska Native,Illinois,IL,118661
2,2.77,O'Fallon,2583,43270,3269,41762,36.0,5783,Hispanic or Latino,Missouri,MO,85032
3,2.48,Hampton,70303,70240,6204,66214,35.5,19638,Black or African-American,Virginia,VA,136454
4,2.29,Lakewood,33630,76576,14169,76013,37.7,9988,Hispanic or Latino,Colorado,CO,152589
5,2.68,Mesa,16044,236835,57492,234998,36.9,31808,American Indian and Alaska Native,Arizona,AZ,471833
6,2.55,Bryan,11914,40345,12014,41761,29.4,3602,Black or African-American,Texas,TX,82106
7,3.12,Garland,27217,120430,62975,116406,34.5,10407,Asian,Texas,TX,236836
8,2.22,Springfield,3871,62170,4264,55639,38.8,7525,Asian,Illinois,IL,117809
9,2.38,Flint,657,49313,2138,48984,35.3,3757,Asian,Michigan,MI,98297


In [4]:
airports = read_csv(spark, "./data/airport-codes_csv.csv")
print_data(airports)

Num rows =  56191


Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237,,US,US-AR,Newport,,,,"-91.254898, 35.6087"
5,00AS,small_airport,Fulton Airport,1100,,US,US-OK,Alex,00AS,,00AS,"-97.8180194, 34.9428028"
6,00AZ,small_airport,Cordes Airport,3810,,US,US-AZ,Cordes,00AZ,,00AZ,"-112.16500091552734, 34.305599212646484"
7,00CA,small_airport,Goldstone /Gts/ Airport,3038,,US,US-CA,Barstow,00CA,,00CA,"-116.888000488, 35.350498199499995"
8,00CL,small_airport,Williams Ag Airport,87,,US,US-CA,Biggs,00CL,,00CL,"-121.763427, 39.427188"
9,00CN,heliport,Kitchen Creek Helibase Heliport,3350,,US,US-CA,Pine Valley,00CN,,00CN,"-116.4597417, 32.7273736"


In [5]:
countries = read_csv(spark, "./data/countries.csv") # obtained from I94_SAS_Labels_Descriptions.SAS
print_data(countries)

Num rows =  289


Unnamed: 0,code,country_name
0,582,"MEXICO Air Sea, and Not Reported (I-94, no lan..."
1,236,AFGHANISTAN
2,101,ALBANIA
3,316,ALGERIA
4,102,ANDORRA
5,324,ANGOLA
6,529,ANGUILLA
7,518,ANTIGUA-BARBUDA
8,687,ARGENTINA
9,151,ARMENIA


In [6]:
airlines = load_airlines(spark, "./data/airlines.dat")
print_data(airlines)

Num rows =  6162


Unnamed: 0,Airline_ID,Name,Alias,IATA,ICAO,Callsign,Country,Active
0,-1,Unknown,\N,-,,\N,\N,Y
1,1,Private flight,\N,-,,,,Y
2,2,135 Airways,\N,,GNL,GENERAL,United States,N
3,3,1Time Airline,\N,1T,RNX,NEXTIME,South Africa,Y
4,4,2 Sqn No 1 Elementary Flying Training School,\N,,WYT,,United Kingdom,N
5,5,213 Flight Unit,\N,,TFU,,Russia,N
6,6,223 Flight Unit State Airline,\N,,CHD,CHKALOVSK-AVIA,Russia,N
7,7,224th Flight Unit,\N,,TTF,CARGO UNIT,Russia,N
8,8,247 Jet Ltd,\N,,TWF,CLOUD RUNNER,United Kingdom,N
9,9,3D Aviation,\N,,SEC,SECUREX,United States,N


In [None]:
# OPTIONAL STEP! Load i94_apr16_sub.sas7bdat and save as parquet

# df_spark = spark.read.format('com.github.saurfang.sas.spark').load('./data/i94_apr16_sub.sas7bdat')
# df_spark.write.parquet("./data/sas_data")

In [7]:
immigration = spark.read.parquet("./data/sas_data")
print_data(immigration)

Num rows =  3096313


Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,5748517.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,CA,20582.0,...,,M,1976.0,10292016,F,,QF,94953870000.0,11,B1
1,5748518.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,NV,20591.0,...,,M,1984.0,10292016,F,,VA,94955620000.0,7,B1
2,5748519.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,WA,20582.0,...,,M,1987.0,10292016,M,,DL,94956410000.0,40,B1
3,5748520.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,WA,20588.0,...,,M,1987.0,10292016,F,,DL,94956450000.0,40,B1
4,5748521.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,WA,20588.0,...,,M,1988.0,10292016,M,,DL,94956390000.0,40,B1
5,5748522.0,2016.0,4.0,245.0,464.0,HHW,20574.0,1.0,HI,20579.0,...,,M,1959.0,10292016,M,,NZ,94981800000.0,10,B2
6,5748523.0,2016.0,4.0,245.0,464.0,HHW,20574.0,1.0,HI,20586.0,...,,M,1950.0,10292016,F,,NZ,94979690000.0,10,B2
7,5748524.0,2016.0,4.0,245.0,464.0,HHW,20574.0,1.0,HI,20586.0,...,,M,1975.0,10292016,F,,NZ,94979750000.0,10,B2
8,5748525.0,2016.0,4.0,245.0,464.0,HOU,20574.0,1.0,FL,20581.0,...,,M,1989.0,10292016,M,,NZ,94973250000.0,28,B2
9,5748526.0,2016.0,4.0,245.0,464.0,LOS,20574.0,1.0,CA,20581.0,...,,M,1990.0,10292016,F,,NZ,95013550000.0,2,B2


In [8]:
temperature = read_csv(spark, "./data/GlobalLandTemperaturesByCity.csv")
print_data(temperature)

Num rows =  8599212


Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E
5,1744-04-01,5.787999999999999,3.624,Århus,Denmark,57.05N,10.33E
6,1744-05-01,10.644,1.283,Århus,Denmark,57.05N,10.33E
7,1744-06-01,14.050999999999998,1.347,Århus,Denmark,57.05N,10.33E
8,1744-07-01,16.082,1.396,Århus,Denmark,57.05N,10.33E
9,1744-08-01,,,Århus,Denmark,57.05N,10.33E


## Step 2.2 Cleaning data and displaying post-cleaning results

The cleaning steps are dataset-specific and are briefly described next to each cleaning cell in the notebook.

### Cleaning demographics dataset

Set correct formats in numbers and dates and select only important columns.
Fill null values with zeros in numeric fields.

In [10]:
demographics = clean_demographics(demographics)
print_data(demographics)

Num rows =  2891


Unnamed: 0,average_household_size,city,count,female_population,foreign_born,male_population,median_age,number_of_veterans,race,state,state_code,total_population
0,2.73,Newark,76402,143873,86253,138040,34.599998,5829,White,New Jersey,NJ,281913
1,2.4,Peoria,1343,62432,7517,56229,33.099998,6634,American Indian and Alaska Native,Illinois,IL,118661
2,2.77,O'Fallon,2583,43270,3269,41762,36.0,5783,Hispanic or Latino,Missouri,MO,85032
3,2.48,Hampton,70303,70240,6204,66214,35.5,19638,Black or African-American,Virginia,VA,136454
4,2.29,Lakewood,33630,76576,14169,76013,37.700001,9988,Hispanic or Latino,Colorado,CO,152589
5,2.68,Mesa,16044,236835,57492,234998,36.900002,31808,American Indian and Alaska Native,Arizona,AZ,471833
6,2.55,Bryan,11914,40345,12014,41761,29.4,3602,Black or African-American,Texas,TX,82106
7,3.12,Garland,27217,120430,62975,116406,34.5,10407,Asian,Texas,TX,236836
8,2.22,Springfield,3871,62170,4264,55639,38.799999,7525,Asian,Illinois,IL,117809
9,2.38,Flint,657,49313,2138,48984,35.299999,3757,Asian,Michigan,MI,98297


### Cleaning airports dataset

Filter out rows with null values in the fields intended as foreign keys.
Set correct formats for numbers and dates, and select only the important columns.

In [11]:
airports = clean_airports(airports)
print_data(airports)

Num rows =  14215


Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
1,00AK,small_airport,Lowell Field,450.0,,US,AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
2,00AL,small_airport,Epps Airpark,820.0,,US,AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
3,00AS,small_airport,Fulton Airport,1100.0,,US,OK,Alex,00AS,,00AS,"-97.8180194, 34.9428028"
4,00AZ,small_airport,Cordes Airport,3810.0,,US,AZ,Cordes,00AZ,,00AZ,"-112.16500091552734, 34.305599212646484"
5,00CA,small_airport,Goldstone /Gts/ Airport,3038.0,,US,CA,Barstow,00CA,,00CA,"-116.888000488, 35.350498199499995"
6,00CL,small_airport,Williams Ag Airport,87.0,,US,CA,Biggs,00CL,,00CL,"-121.763427, 39.427188"
7,00FA,small_airport,Grass Patch Airport,53.0,,US,FL,Bushnell,00FA,,00FA,"-82.21900177001953, 28.64550018310547"
8,00FL,small_airport,River Oak Airport,35.0,,US,FL,Okeechobee,00FL,,00FL,"-80.96920013427734, 27.230899810791016"
9,00GA,small_airport,Lt World Airport,700.0,,US,GA,Lithonia,00GA,,00GA,"-84.06829833984375, 33.76750183105469"


### Cleaning immigration dataset

Rename columns to remove white spaces.
Set correct formats in numbers and dates and select only project-relevant columns.

In particular, remove the following columns: `['admnum', 'biryear', 'count',
'dtaddto', 'entdepa', 'entdepd', 'entdepu', 'insnum', 'matflag', 'occup']`. The `dtadfile` column is kept, as the output below shows.

In [12]:
immigration = clean_immigration(immigration)
print_data(immigration)

Num rows =  2943721


Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,dtadfile,gender,airline,fltno,visatype
0,5748517,2016,4,245,438,LOS,2016-04-30,1,CA,2016-05-08,40,1,20160430,F,QF,11,B1
1,5748518,2016,4,245,438,LOS,2016-04-30,1,NV,2016-05-17,32,1,20160430,F,VA,7,B1
2,5748519,2016,4,245,438,LOS,2016-04-30,1,WA,2016-05-08,29,1,20160430,M,DL,40,B1
3,5748520,2016,4,245,438,LOS,2016-04-30,1,WA,2016-05-14,29,1,20160430,F,DL,40,B1
4,5748521,2016,4,245,438,LOS,2016-04-30,1,WA,2016-05-14,28,1,20160430,M,DL,40,B1
5,5748522,2016,4,245,464,HHW,2016-04-30,1,HI,2016-05-05,57,2,20160430,M,NZ,10,B2
6,5748523,2016,4,245,464,HHW,2016-04-30,1,HI,2016-05-12,66,2,20160430,F,NZ,10,B2
7,5748524,2016,4,245,464,HHW,2016-04-30,1,HI,2016-05-12,41,2,20160430,F,NZ,10,B2
8,5748525,2016,4,245,464,HOU,2016-04-30,1,FL,2016-05-07,27,2,20160430,M,NZ,28,B2
9,5748526,2016,4,245,464,LOS,2016-04-30,1,CA,2016-05-07,26,2,20160430,F,NZ,2,B2
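
The date conversion visible in the output above (`arrdate` 20574.0 becoming 2016-04-30) is consistent with SAS numeric dates, which count days since 1960-01-01. The actual implementation inside `clean_immigration` is not shown in this document; the following is a minimal standard-library sketch of the conversion, with `sas_to_date` as a hypothetical helper name.

```python
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)  # SAS numeric dates count days from this day

def sas_to_date(sas_days):
    """Convert a SAS day count (possibly a float such as 20574.0) to a date."""
    if sas_days is None:
        return None  # keep missing departure dates as missing
    return SAS_EPOCH + timedelta(days=int(sas_days))

print(sas_to_date(20574.0))  # arrdate of the first sample row
print(sas_to_date(20582.0))  # depdate of the first sample row
```

In Spark, the same idea could be expressed with a date-add expression on a literal 1960-01-01 date instead of a Python function.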


### Cleaning airlines dataset

Filter out rows with null values in the fields intended as foreign keys.


In [13]:
airlines = clean_airlines(airlines)
print_data(airlines)

Num rows =  1535


Unnamed: 0,Airline_ID,Name,IATA,ICAO,Callsign,Country,Active
0,3,1Time Airline,1T,RNX,NEXTIME,South Africa,Y
1,10,40-Mile Air,Q5,MLA,MILE-AIR,United States,Y
2,13,Ansett Australia,AN,AAA,ANSETT,Australia,Y
3,14,Abacus International,1B,,,Singapore,Y
4,15,Abelag Aviation,W9,AAB,ABG,Belgium,N
5,21,Aigle Azur,ZI,AAF,AIGLE AZUR,France,Y
6,22,Aloha Airlines,AQ,AAH,ALOHA,United States,Y
7,24,American Airlines,AA,AAL,AMERICAN,United States,Y
8,28,Asiana Airlines,OZ,AAR,ASIANA,Republic of Korea,Y
9,29,Askari Aviation,4K,AAS,AL-AAS,Pakistan,Y


### Cleaning temperature dataset

Rename inconsistent values in the fields intended as foreign keys.
Aggregate the city-level records to the country level.

In [14]:
temperature = clean_temperature(temperature)
print_data(temperature)

Num rows =  159


Unnamed: 0,Country,avg_temperature,latitude,longitude,country_name_lower
0,Chad,27.189829,8.84N,15.41E,chad
1,Paraguay,22.784014,24.92S,56.75W,paraguay
2,Russia,3.347268,52.24N,47.30E,russia
3,Yemen,25.768408,13.66N,45.41E,yemen
4,Senegal,25.984177,15.27N,17.50W,senegal
5,Sweden,5.665518,58.66N,12.31E,sweden
6,Guyana,26.549849,7.23N,57.57W,guyana
7,Burma,26.01684,20.09N,92.13E,burma
8,Eritrea,24.001516,15.27N,39.17E,eritrea
9,Philippines,26.516462,15.27N,120.83E,philippines


### Cleaning countries dataset

Rename inconsistent values in the fields intended as foreign keys.
Set correct formats for numeric fields.

In [15]:
countries = clean_countries(countries)
print_data(countries)

Num rows =  289


Unnamed: 0,code,country_name
0,582,MEXICO
1,236,AFGHANISTAN
2,101,ALBANIA
3,316,ALGERIA
4,102,ANDORRA
5,324,ANGOLA
6,529,ANGUILLA
7,518,ANTIGUA-BARBUDA
8,687,ARGENTINA
9,151,ARMENIA


# Step 3: Define the Data Model

## 3.1. Conceptual Data Model

_Map out the conceptual data model and explain why you chose that model._

I chose to implement a data model based on a **star schema**.

### 3.1.1. Dimension Tables:

#### dim_demographics

This table is the result of aggregating the demographics dataset by the `state` column.
The fields `male_population`, `female_population`, `total_population`, `number_of_veterans`, and `foreign_born` were first aggregated per city using the `first` function, since they are repeated across the different rows of the same city.
The fields `median_age` and `average_household_size` were aggregated using the `avg` function to estimate the real values for the state.
The individual races were pivoted into separate columns with `pivot`, so that the ethnic profile of each state is available.

Structure:
- `state_code` **(FK)**
- `state`
- `hispanic_or_latino`
- `foreign_born`
- `asian`
- `male_population`
- `average_household_size`
- `total_population`
- `female_population`
- `black_or_african_american`
- `median_age`
- `number_of_veterans`
- `american_indian_and_alaska_native`
- `white`
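
The aggregation described for `dim_demographics` can be illustrated on plain Python rows. This is a sketch under assumptions, not the code of `transform_demographics`: it assumes that city-level totals are taken once per city and then summed per state, and that `median_age` is averaged over cities. Two of the toy rows reuse values from the sample output in Step 2; the extra Peoria row is made up to show a city that appears with two races.

```python
from collections import defaultdict

# Toy input: one row per (city, race). Rows 1 and 3 reuse sample values from Step 2;
# the count in row 2 is made up to show a city with two race rows.
rows = [
    {"state_code": "IL", "city": "Peoria", "race": "American Indian and Alaska Native",
     "count": 1343, "total_population": 118661, "median_age": 33.1},
    {"state_code": "IL", "city": "Peoria", "race": "Asian",
     "count": 100, "total_population": 118661, "median_age": 33.1},
    {"state_code": "IL", "city": "Springfield", "race": "Asian",
     "count": 3871, "total_population": 117809, "median_age": 38.8},
]

def column_name(race):
    """Turn a race label into a snake_case column name."""
    return race.lower().replace("-", "_").replace(" ", "_")

def aggregate_by_state(rows):
    cities = {}                                    # (state, city) -> first row seen
    races = defaultdict(lambda: defaultdict(int))  # state -> race column -> count
    for row in rows:
        cities.setdefault((row["state_code"], row["city"]), row)
        races[row["state_code"]][column_name(row["race"])] += row["count"]

    result = {}
    for state, pivoted in races.items():
        state_cities = [r for (s, _), r in cities.items() if s == state]
        result[state] = {
            # city-level totals are counted once per city, then summed
            "total_population": sum(r["total_population"] for r in state_cities),
            # averaging city medians only approximates the state median
            "median_age": sum(r["median_age"] for r in state_cities) / len(state_cities),
            **pivoted,
        }
    return result

print(aggregate_by_state(rows)["IL"])
```

Taking the first value per city before summing avoids counting a city's population once for every race row it appears in.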
 
#### dim_airlines

Table `dim_airlines` was not processed further, apart from the cleaning steps described in Step 2.

Structure:
- `airline_id`
- `name`
- `IATA` **(FK)**
- `ICAO`
- `callsign`
- `country`
- `active`
  
#### dim_airports

Table `dim_airports` was not processed further, apart from the cleaning steps described in Step 2.

Structure:
- `ident`
- `type`
- `name`
- `elevation_ft`
- `continent` 
- `iso_country`
- `iso_region`
- `municipality`
- `gps_code`
- `iata_code`
- `local_code` **(FK)**
- `coordinates`
    
#### dim_countries

Table `dim_countries` was built by joining the `countries` and `temperature` data. This connects each country code to useful (although limited) information about the geographical position and the temperature of the country.
The tables were joined on a temporary field, `country_name_lower`, to avoid case mismatches. The field was later dropped from the resulting table so that the data is not duplicated.

Structure:
  - `code` **(FK)**
  - `country`
  - `avg_temperature`
  - `latitude`
  - `longitude`
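
The case-insensitive join can be sketched in plain Python as a left join on a lower-cased name, so that countries without temperature data are kept with empty values (as for Aruba in the Step 4 output). The function name and the title-casing of the resulting name are illustrative assumptions; the numbers follow the sample outputs, and the upper-case input spellings are assumed from the other country names shown in Step 2.

```python
# Toy inputs shaped like the cleaned tables.
countries = [
    {"code": 110, "country_name": "FINLAND"},
    {"code": 532, "country_name": "ARUBA"},
]
temperature = [
    {"Country": "Finland", "avg_temperature": 3.711645,
     "latitude": "60.27N", "longitude": "25.95E"},
]

def join_countries_temperature(countries, temperature):
    """Left-join countries to temperature on a lower-cased country name."""
    by_name = {t["Country"].lower(): t for t in temperature}
    joined = []
    for c in countries:
        # An empty dict yields None for every temperature field of unmatched countries.
        t = by_name.get(c["country_name"].lower(), {})
        joined.append({
            "code": c["code"],
            "country": c["country_name"].title(),
            "avg_temperature": t.get("avg_temperature"),
            "latitude": t.get("latitude"),
            "longitude": t.get("longitude"),
        })
    return joined

dim_countries_sample = join_countries_temperature(countries, temperature)
for row in dim_countries_sample:
    print(row)
```

The lower-cased key exists only inside the lookup dictionary, which mirrors dropping `country_name_lower` after the join.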


### 3.1.2. Fact Table:

#### f_immigration

Table `f_immigration` went through one transformation before reaching its final form.
The `arrdate` field (containing the arrival date of the visitor) was parsed, and its three parts (year, month, day) were extracted for later use in partitioning the dataset. This produced three additional technical fields: `arrival_year`, `arrival_month`, `arrival_day`.

Structure:
- `cicid`
- `i94yr`
- `i94mon`
- `i94cit` **(FK)** - `dim_countries`
- `i94res`
- `i94port` **(FK)** - `dim_airports`
- `arrdate`
- `i94mode`
- `i94addr` **(FK)** - `dim_demographics`
- `depdate`
- `i94bir`
- `i94visa`
- `dtadfile`
- `gender`
- `airline` **(FK)** - `dim_airlines`
- `fltno`
- `visatype`
- `arrival_year`
- `arrival_month`
- `arrival_day`
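
To make the role of the three technical fields concrete, the sketch below derives them from an arrival date and builds the directory that a Hive-style partitioned write (the layout Spark's `partitionBy` produces) would use for that day. Both function names are hypothetical; the base path matches the one used in section 4.2.

```python
from datetime import date

def arrival_parts(arrdate):
    """Return the three technical columns derived from an arrival date."""
    return {
        "arrival_year": arrdate.year,
        "arrival_month": arrdate.month,
        "arrival_day": arrdate.day,
    }

def partition_path(base, parts):
    """Build a Hive-style partition directory: one column=value level per column."""
    return base + "".join(f"/{name}={value}" for name, value in parts.items())

parts = arrival_parts(date(2016, 4, 30))
print(partition_path("./model/facts.parquet", parts))
```

Because each day lands in its own directory, a query filtered on these columns only needs to read the matching directories.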

## 3.2 Mapping Out Data Pipelines
_List the steps necessary to pipeline the data into the chosen data model_

To accomplish the goal of the ETL process, I developed a set of functions in a package called `helper`. The package contains five modules: `loading`, `cleaning`, `transform`, `data_checks`, and `common`, which gathers shared functions. The open-source framework Apache Spark was the main tool in this process.

The documentation of the functions can be found in the docstrings of each module and function in the `helper` package.

The code of `helper` is a base for running the pipeline on an EMR cluster in the AWS Cloud. Note that demonstration-ready code for this is not in the scope of this project. I envision the following conceptual DAG for processing in the cloud:

(pseudocode)
```
start >> read_demographics
start >> read_immigration
start >> read_countries
start >> read_airlines
start >> read_airports
start >> read_temperature

read_demographics >> clean_demographics
read_immigration >> clean_immigration
read_countries >> clean_countries
read_airlines >> clean_airlines
read_airports >> clean_airports
read_temperature >> clean_temperature

clean_demographics >> transform_demographics
clean_immigration >> transform_immigration
clean_countries >> transform_countries
clean_temperature >> transform_temperature

transform_countries >> countries_and_temperature_ready
transform_temperature >> countries_and_temperature_ready

countries_and_temperature_ready >> transform_join_temperature_countries

transform_join_temperature_countries >> build_fact_ready
transform_demographics >> build_fact_ready
transform_immigration >> build_fact_ready
clean_airlines >> build_fact_ready
clean_airports >> build_fact_ready

build_fact_ready >> build_fact

build_fact >> end
```
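
The dependencies in the pseudocode above can be checked without Airflow. The sketch below encodes the same edges as a plain dictionary and uses `graphlib.TopologicalSorter` from the Python standard library (Python 3.9+) to produce one valid execution order. It is only a way to validate the graph, not an Airflow DAG definition.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

datasets = ["demographics", "immigration", "countries", "airlines", "airports", "temperature"]

# Each task maps to the set of tasks that must finish before it can start.
dag = {}
for name in datasets:
    dag[f"read_{name}"] = {"start"}
    dag[f"clean_{name}"] = {f"read_{name}"}
for name in ["demographics", "immigration", "countries", "temperature"]:
    dag[f"transform_{name}"] = {f"clean_{name}"}

dag["countries_and_temperature_ready"] = {"transform_countries", "transform_temperature"}
dag["transform_join_temperature_countries"] = {"countries_and_temperature_ready"}
dag["build_fact_ready"] = {
    "transform_join_temperature_countries",
    "transform_demographics",
    "transform_immigration",
    "clean_airlines",   # airlines and airports have no transform step
    "clean_airports",
}
dag["build_fact"] = {"build_fact_ready"}
dag["end"] = {"build_fact"}

# static_order() raises CycleError if the graph contains a cycle.
order = list(TopologicalSorter(dag).static_order())
print(order)
```

Tasks at the same depth (for example the six `read_*` tasks) have no mutual dependencies, so a scheduler is free to run them in parallel.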
 

# Step 4: Run Pipelines to Model the Data 

## 4.1 Create the data model

Run the data pipelines to create the data model.

In [16]:
demographics = transform_demographics(demographics)
print_data(demographics)

Num rows =  49


Unnamed: 0,state_code,state,hispanic_or_latino,foreign_born,asian,male_population,average_household_size,total_population,female_population,black_or_african_american,median_age,number_of_veterans,american_indian_and_alaska_native,white
0,MT,Montana,10000,5977,4165,87707,2.275,181294,93587,3349,35.5,13854,9684,169026
1,NC,North Carolina,354409,379327,178740,1466105,2.475,3060199,1594094,1029446,33.785715,166146,35209,1790136
2,MD,Maryland,138644,229794,128839,627951,2.655,1312129,684178,573768,36.37,64143,16155,594522
3,CO,Colorado,703722,337631,148790,1454619,2.56,2935669,1481050,208043,35.81875,187896,62613,2463916
4,CT,Connecticut,309992,225866,48311,432157,2.6625,885581,453424,231822,34.9625,24953,10729,505674
5,IL,Illinois,1215659,941735,374589,2218541,2.723158,4562312,2343771,1130574,35.847369,146701,35097,2620068
6,NJ,New Jersey,600437,477028,116844,705736,2.965833,1428908,723172,452202,35.125,30195,11350,615083
7,DE,Delaware,5516,3336,1193,32680,2.45,71957,39277,44182,36.400002,3063,414,23743
8,DC,District of Columbia,71129,95117,35072,319705,2.24,672228,352523,328786,33.799999,25963,6130,285402
9,AR,Arkansas,77813,62108,22062,286479,2.53,589879,303400,149608,32.766666,31704,9381,384733


In [17]:
immigration = transform_immigration(immigration)
print_data(immigration)

Num rows =  2943721


Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,dtadfile,gender,airline,fltno,visatype,arrival_year,arrival_month,arrival_day
0,5748517,2016,4,245,438,LOS,2016-04-30,1,CA,2016-05-08,40,1,20160430,F,QF,11,B1,2016,4,30
1,5748518,2016,4,245,438,LOS,2016-04-30,1,NV,2016-05-17,32,1,20160430,F,VA,7,B1,2016,4,30
2,5748519,2016,4,245,438,LOS,2016-04-30,1,WA,2016-05-08,29,1,20160430,M,DL,40,B1,2016,4,30
3,5748520,2016,4,245,438,LOS,2016-04-30,1,WA,2016-05-14,29,1,20160430,F,DL,40,B1,2016,4,30
4,5748521,2016,4,245,438,LOS,2016-04-30,1,WA,2016-05-14,28,1,20160430,M,DL,40,B1,2016,4,30
5,5748522,2016,4,245,464,HHW,2016-04-30,1,HI,2016-05-05,57,2,20160430,M,NZ,10,B2,2016,4,30
6,5748523,2016,4,245,464,HHW,2016-04-30,1,HI,2016-05-12,66,2,20160430,F,NZ,10,B2,2016,4,30
7,5748524,2016,4,245,464,HHW,2016-04-30,1,HI,2016-05-12,41,2,20160430,F,NZ,10,B2,2016,4,30
8,5748525,2016,4,245,464,HOU,2016-04-30,1,FL,2016-05-07,27,2,20160430,M,NZ,28,B2,2016,4,30
9,5748526,2016,4,245,464,LOS,2016-04-30,1,CA,2016-05-07,26,2,20160430,F,NZ,2,B2,2016,4,30


In [18]:
temperature = transform_temperature(temperature)
countries = transform_countries(countries)

countries = transform_join_temperature_countries(temperature, countries)
print_data(countries)

Num rows =  290


Unnamed: 0,code,Country,avg_temperature,latitude,longitude
0,532,Aruba,,,
1,110,Finland,3.711645,60.27N,25.95E
2,438,Australia,16.701462,34.56S,138.16E
3,113,Greece,16.347483,37.78N,24.41E
4,126,Portugal,14.749675,40.99N,8.52W
5,737,Invalid: Midway Islands,,,
6,251,Israel,19.007697,31.35N,33.93E
7,849,No Country Code (849),,,
8,162,Ukraine,7.822184,49.03N,29.39E
9,508,Netherlands Antilles,,,


In [19]:
facts = build_facts(immigration, demographics, airports, airlines, countries)
print_data(facts)

Num rows =  1195976


Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,dtadfile,gender,airline,fltno,visatype,arrival_year,arrival_month,arrival_day
0,4557215,2016,4,392,392,ATL,2016-04-24,1,LA,2016-07-25,44,3,20160424,M,DL,83,F1,2016,4,24
1,708837,2016,4,392,111,NEW,2016-04-04,1,NJ,2016-04-27,55,2,20160404,F,TP,213,B2,2016,4,4
2,3550514,2016,4,392,388,NEW,2016-04-19,1,NJ,2016-08-15,27,2,20160419,F,DL,21,B2,2016,4,19
3,1425101,2016,4,392,392,NEW,2016-04-08,1,VA,2016-04-23,50,1,20160408,M,AF,682,B1,2016,4,8
4,1425105,2016,4,392,392,CLT,2016-04-08,1,VA,,71,2,20160408,F,AF,54,B2,2016,4,8
5,1425103,2016,4,392,392,CLM,2016-04-08,1,MD,2016-05-04,33,2,20160408,M,SA,207,B2,2016,4,8
6,2951955,2016,4,392,392,ATL,2016-04-16,1,WA,2016-05-03,41,2,20160416,,DL,684,B2,2016,4,16
7,2377310,2016,4,392,388,ATL,2016-04-13,1,AL,2016-04-18,57,1,20160413,F,DL,83,B1,2016,4,13
8,3908785,2016,4,392,245,MIA,2016-04-21,1,PA,,28,2,20160421,F,AT,200,B2,2016,4,21
9,3908787,2016,4,392,392,FMY,2016-04-21,1,PA,,3,2,20160421,,AT,200,B2,2016,4,21


## 4.2 Store the resulting tables in parquet files

In [20]:
output_path = "./model"

demographics.write.mode('overwrite').parquet(output_path+"/demographics.parquet")
airports.write.mode('overwrite').parquet(output_path+"/airports.parquet")
airlines.write.mode('overwrite').parquet(output_path+"/airlines.parquet")
countries.write.mode('overwrite').parquet(output_path+"/countries.parquet")
facts.write.partitionBy("arrival_year", "arrival_month", "arrival_day")\
    .mode('overwrite').parquet(output_path+"/facts.parquet")

## 4.3 Data Quality Checks

_Explain the data quality checks you'll perform to ensure the pipeline ran as expected._ 

I implement two basic quality checks:
- Count checks to ensure the tables are not empty. The threshold is 0 rather than an exact expected row count, because the number of entries in the `immigration` dataset can vary from day to day. This is implemented by the function `has_more_rows`.
- Integrity checks on the relational model: verifying that every foreign key in the fact table has a matching non-null key in the corresponding dimension table. This is implemented by the function `check_integrity`.
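
The logic of the two checks can be sketched on plain Python rows. The functions below are hypothetical stand-ins, not the `has_more_rows` and `check_integrity` helpers themselves, whose code is not shown in this document; the sample keys are taken from the outputs above.

```python
def has_more_rows_than(rows, threshold):
    """Count check: True when the table holds more than `threshold` rows."""
    return len(rows) > threshold

def missing_keys(fact_rows, fk, dim_rows, key):
    """Integrity check: fact values of `fk` with no non-null `key` match in the dimension."""
    known = {row[key] for row in dim_rows if row[key] is not None}
    return {row[fk] for row in fact_rows} - known

# Sample keys from the outputs shown above.
facts_sample = [{"cicid": 5748517, "airline": "QF"}, {"cicid": 5748519, "airline": "DL"}]
airlines_dim_sample = [{"IATA": "QF"}, {"IATA": "DL"}, {"IATA": "NZ"}]

print(has_more_rows_than(facts_sample, 0))
print(missing_keys(facts_sample, "airline", airlines_dim_sample, "IATA"))
```

Returning the set of unmatched keys, rather than only a boolean, makes a failed check easier to diagnose.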
 
Run Quality Checks

### 4.3.1 Count Quality Checks


In [21]:
# Load data from parquet files
dim_demographics = spark.read.parquet("./model/demographics.parquet")
dim_airports = spark.read.parquet("./model/airports.parquet")
dim_airlines = spark.read.parquet("./model/airlines.parquet")
dim_countries = spark.read.parquet("./model/countries.parquet")

has_more_rows(dim_demographics, 0)
has_more_rows(dim_airports, 0)
has_more_rows(dim_airlines, 0)
has_more_rows(dim_countries, 0)

True

In [22]:
f_immigrations = spark.read.parquet("./model/facts.parquet")
has_more_rows(f_immigrations, 0)

True

### 4.3.2 Integrity Quality Checks

In [23]:
check_integrity(f_immigrations, dim_demographics, dim_airports, dim_airlines, dim_countries)

True

## 4.4 Data Dictionary

##### Immigration - facts

| Column Name | Description |
| :--- | :--- |
| CICID | Record ID |
| I94YR | 4 digit year |
| I94MON | Numeric month |
| I94CIT | Country of citizenship |
| I94RES | Country of residence |
| I94PORT | Airport of admission into the USA |
| ARRDATE | Arrival date in the USA |
| I94MODE | Mode of transportation (1 = Air; 2 = Sea; 3 = Land; 9 = Not reported) |
| I94ADDR | State of arrival |
| DEPDATE | Departure date |
| I94BIR | Age of the visitor |
| I94VISA | Visa codes: (1 = Business; 2 = Pleasure; 3 = Student) |
| DTADFILE | Character date field |
| GENDER | Gender of the visitor |
| AIRLINE | Airline used to arrive in the U.S. |
| FLTNO | Flight number of Airline used to arrive in U.S. |
| VISATYPE | Class of admission legally admitting the non-immigrant to temporarily stay in U.S. |
| arrival_year | Numeric year of the arrival (used for data partitioning) |
| arrival_month | Numeric month of the arrival (used for data partitioning) |
| arrival_day | Numeric day of the arrival (used for data partitioning) |

##### State Demographics - dimension


| Column Name | Description |
| :--- | :--- |
| STATE_CODE | Two-letter code of the state |
| STATE | Name of the state |
| MEDIAN_AGE | Median age in the state (estimation) |
| AVERAGE_HOUSEHOLD_SIZE | Average number of people living in a household in the state (estimation) |
| TOTAL_POPULATION | Number of residents |
| FEMALE_POPULATION | Number of female residents |
| MALE_POPULATION | Number of male residents |
| NUMBER_OF_VETERANS | Number of veteran residents |
| BLACK_OR_AFRICAN_AMERICAN | Number of residents belonging to this ethnic group |
| HISPANIC_OR_LATINO | Number of residents belonging to this ethnic group |
| ASIAN | Number of residents belonging to this ethnic group |
| AMERICAN_INDIAN_AND_ALASKA_NATIVE | Number of residents belonging to this ethnic group |
| WHITE | Number of residents belonging to this ethnic group |
| FOREIGN_BORN | Number of residents born outside of the US |

##### Airports - dimension


| Column Name | Description |
| :--- | :--- |
| IDENT | Identification code |
| TYPE | Type of the airport |
| NAME | Name of the Airport |
| ELEVATION_FT | Elevation above the sea level in feet |
| CONTINENT | Continent code |
| ISO_COUNTRY | Country code according to ISO |
| ISO_REGION | Region code according to ISO |
| MUNICIPALITY | Municipality where the airport is located |
| GPS_CODE | GPS code |
| IATA_CODE | Code of the airport assigned by International Air Transport Association |
| LOCAL_CODE | Local code of the airport |
| COORDINATES | GPS coordinates - longitude and latitude |

##### Airlines - dimension

| Column Name | Description |
| :--- | :--- |
| AIRLINE_ID | ID of the airline |
| NAME | Name of the airline |
| IATA | Code of the airline assigned by International Air Transport Association |
| ICAO | Code of the airline assigned by International Civil Aviation Organization |
| CALLSIGN | Callsign of the airline |
| COUNTRY | Country where the airline is registered |
| ACTIVE | Letter 'Y' if the airline still operates, 'N' otherwise |


##### Countries - dimension

| Column Name | Description |
| :--- | :--- |
| CODE | Country Code |
| COUNTRY | Country Name |
| AVG_TEMPERATURE | Average temperature of the country between 1743 and 2013 |
| LATITUDE | GPS Latitude |
| LONGITUDE | GPS Longitude |

# Step 5: Complete Project Write Up

##### Clearly state the rationale for the choice of tools and technologies for the project.

This project aims to use technologies that can run both locally and in the cloud. Choosing *Spark* lets us run the project in a Docker container and later promote it to an EMR cluster running on AWS with little effort. The `parquet` files used by *Spark* perform better than raw data formats and promise scalability up to many terabytes of data.

##### Propose how often the data should be updated and why.

Since we receive one file per month, it seems reasonable to update the model monthly. However, since the immigration data is partitioned by day, one could also update the dataset every day.

##### Write a description of how you would approach the problem differently under the following scenarios:

1. The data was increased by 100x.

    Scaling the whole pipeline should be relatively easy. The solution can be moved to the AWS Cloud, for example to an EMR cluster, and the cluster can scale horizontally by adding new nodes. Should the data be loaded into Redshift with Airflow orchestrating the process, both solutions can scale well beyond the resulting 300 million rows.


2. The data populates a dashboard that must be updated on a daily basis by 7am every day.

    The entire ETL process takes up to 30 minutes inside Docker on a 6-core, 12 GB laptop, so processing new data every morning would not cause any problems. The data could be stored on S3, processed on EMR with Airflow orchestrating the steps, and loaded into Redshift. The entire process should not exceed 1 hour for the current size of the data.


3. The database needed to be accessed by 100+ people.

    I assume that the data would need to be stored in Redshift on AWS. The Redshift cluster would need to be scaled out to support the increased number of read requests per unit of time.
 