### Data Engineering Capstone Project

#### Summary
This project aims to have an end-to-end data pipeline for the US Immigration Services in order them to utilize their immigration data combined with demographics and weather based information provided from different sources.

The ETL pipeline has 3 main phases, that will be explained in detail

- Data Gathering (Extraction)
- Data Exploration, Cleaning & Joins (Transformation)
- Data Ingestion (Loading)

Contents:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
#import libraries

import pandas as pd
import datetime as dt
import configparser
import os

from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql.functions import *
from pyspark.sql.types import *


In [2]:
#declare a config file to get AWS credentials

config = configparser.ConfigParser()
config.read('config.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['AWS_SECRET_ACCESS_KEY']

In [3]:
#Initialize Spark, with additional SAS libraries
spark = SparkSession.builder.config("spark.jars.packages",
                                        "saurfang:spark-sas7bdat:2.0.0-s_2.11,org.apache.hadoop:hadoop-aws:2.7.0")\
.enableHiveSupport().getOrCreate()

### Step 1: Project Scope

This project aims to have an end-to-end data pipeline for the US Immigration Services in order them to utilize their immigration data combined with demographics and weather based information provided from different sources. By this way, the Business Intelligence and Data Analytics layers that are fed by the DWH would be able to create insights from immigration data enriched with weather, demographic and time information. 
- Some example insights would be like:
	 - The correlations of temperature data between US States/Cities and the immigrant countries. (Do people coming from warm weather prefer to immigrate to warm climates?)
	 - Time series analysis of target and destination cities based on climate.
	 - The effect of temperature on immigrated states? (number of immigrations, number of countries immigrated from etc.)
	 - Some predictional models can be used to forecast the immigration numbers in future. Example; The relation between temperature and population density of cities in time can be used to predict the effect populations to immigrate from very hot to mild weather states/cities due to global warming.

The ETL pipeline that is is scheduled on Airflow has 3 main phases;

- **Data Gathering (Extraction)**: I assumed the data consisting from 4 different source files resides on different sources, so I uploaded 1 source file to Amazon S3 and kept 3 on Udacity servers. The extraction phase runs in parallel and fetches its source data at the same time frame.  The data model has 4 dimension tables and 1 fact table. So as in total, there are 5 seperate batch jobs. At first, four of these process', which will be used to load the dimension dataframes are triggered from Airflow. Each Spark job loads the necessary file and creates it's dataframe, which consists of uncleaned raw data. (The raw data in the dataframes will be cleaned at first and then will be converted to parquet files and these files will be loaded into tables)

- **Data Exploration, Cleaning & Joins (Transformation)**:  After the 4 dimension data frames are loaded as dataframes (raw data), the data is analysed and some cleaning steps are taken. The exploration and cleaning steps are explained in more detail in the code as comments. After the cleaning steps are completed, these 4 dataframes are loaded into Amazon S3 buckets as parquet files. The fact table process starts after the dimension steps are completed. The data quality checks are made at the end of each seperate process. (As a difference, the quality check step in the .ipynb code is performed as the final step for all of the dataframes created)

[![End-to-End Flow](https://github.com/saygincelebi/Udacity-Data-Engineering-Nanodegree-Capstone-Project/blob/main/diagrams/ETL_flow.png?raw=true "End-to-End Flow")](https://github.com/saygincelebi/Udacity-Data-Engineering-Nanodegree-Capstone-Project/blob/main/diagrams/ETL_flow.png?raw=true "End-to-End Flow")

###### *The first 4 process are executed 5 times. 4 times for dimensions (in parallel) and one for the fact table. After the files are loaded into S3 buckets, the last process is also executed 5 times. 4 times for dimensions (in parallel) and one for the fact table.*

- **Data Ingestion (Loading)**: In the last phase, all of the dimension and fact Parquet files residing on the S3 buckets are ingested into the Redshift tables. Similar to the previous steps, the dimension tables are loaded in parallel and the fact table is loaded after all the dimensions are loaded. Instead of using Spark ,this step is direct executed from Airflow using the StageToRedshiftOperator. 

[![Airflow - DAG](https://github.com/saygincelebi/Udacity-Data-Engineering-Nanodegree-Capstone-Project/blob/main/diagrams/Airflow%20-%20Dag.PNG?raw=true "Airflow - DAG: ETL_Flow")](https://github.com/saygincelebi/Udacity-Data-Engineering-Nanodegree-Capstone-Project/blob/main/diagrams/Airflow%20-%20Dag.PNG?raw=true "Airflow - DAG")

###### *The gray  boxes are bash operators that run the Spark scripts in order to load the files to S3, the green boxes are StagetoRedshift operators that are used to ingest these files to Redshift.*

#### The Datasets

The project has 4 main data sources. I assumed there are two different physical data sources. One is the Udacity servers, the other is the Amazon S3 servers. One of the goals of the project is to combine the data gathered from different data sources.

- **US Immigration Data**
	- This data is provided by the US National Tourism and Trade Office.  (https://travel.trade.gov/research/reports/i94/historical/2016.html) A data dictionary is included in the workspace. For this project a partition containing a month of historical data is used.  (~9.3 million records) 
	- The data is ingested from the Udacity servers during the flow. (Location: ../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat)

- **U.S. City Demographic Data**
	- This data is provided by the US Census Bureau's 2015 American Community Survey.  (https://public.opendatasoft.com/explore/dataset/us-cities-demographics/information/) This dataset contains information about the demographics of all US cities and census-designated places with a population greater or equal to 65,000. (~3000 records) 
	- The data is ingested from the Amazon S3 servers during the flow.

- **World Temperature Data**
	- This data is provided by the  Berkeley Earth Surface Temperature Study.  (https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data) It combines 1.6 billion temperature reports from 16 pre-existing archives. (~8.6 million records) It is nicely packaged and allows for slicing into interesting subsets (for example by country). They publish the source data and the code for the transformations they applied. They also use methods that allow weather observations from shorter time series to be included, meaning fewer observations need to be thrown away.
	- The data is ingested from the Udacity servers during the flow.  (Location: ../../data2/GlobalLandTemperaturesByCity.csv)

- **Country Codes**
	- This data is based on the US Immigration data provided by US National Tourism and Trade Office.  (https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data) It has 3 digit country codes for 289 countries (Including Non Country Codes)
	- The data is ingested from the Udacity servers during the flow. The original file is the ../../data/I94_SAS_Labels_Descriptions.SAS file but I have provided a simplified .csv version that only contains the country codes.


In [4]:
#Reading the immigration dataset (Data resides on Udacity servers)
file_name = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
df_immigration =spark.read.format('com.github.saurfang.sas.spark').load(file_name)

In [6]:
#Checking the data
df_immigration.limit(10).toPandas()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,6.0,2016.0,4.0,692.0,692.0,XXX,20573.0,,,,...,U,,1979.0,10282016,,,,1897628000.0,,B2
1,7.0,2016.0,4.0,254.0,276.0,ATL,20551.0,1.0,AL,,...,Y,,1991.0,D/S,M,,,3736796000.0,296.0,F1
2,15.0,2016.0,4.0,101.0,101.0,WAS,20545.0,1.0,MI,20691.0,...,,M,1961.0,09302016,M,,OS,666643200.0,93.0,B2
3,16.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,1988.0,09302016,,,AA,92468460000.0,199.0,B2
4,17.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,2012.0,09302016,,,AA,92468460000.0,199.0,B2
5,18.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MI,20555.0,...,,M,1959.0,09302016,,,AZ,92471040000.0,602.0,B1
6,19.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,NJ,20558.0,...,,M,1953.0,09302016,,,AZ,92471400000.0,602.0,B2
7,20.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,NJ,20558.0,...,,M,1959.0,09302016,,,AZ,92471610000.0,602.0,B2
8,21.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,NY,20553.0,...,,M,1970.0,09302016,,,AZ,92470800000.0,602.0,B2
9,22.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,NY,20562.0,...,,M,1968.0,09302016,,,AZ,92478490000.0,608.0,B1


In [5]:
#Reading the Global Land Temperatures by City dataset #Reading the immigration dataset (Data resides on Udacity servers)
file_name = '../../data2/GlobalLandTemperaturesByCity.csv'
df_temperature = spark.read.csv(file_name, header=True, inferSchema=True)

In [6]:
#Checking the data
df_temperature.limit(10).toPandas()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E
5,1744-04-01,5.788,3.624,Århus,Denmark,57.05N,10.33E
6,1744-05-01,10.644,1.283,Århus,Denmark,57.05N,10.33E
7,1744-06-01,14.051,1.347,Århus,Denmark,57.05N,10.33E
8,1744-07-01,16.082,1.396,Århus,Denmark,57.05N,10.33E
9,1744-08-01,,,Århus,Denmark,57.05N,10.33E


In [7]:
#Reading the immigration dataset (Data resides on Amazon S3 servers)
file_name = "s3a://celebis/source/demographics/us-cities-demographics.csv"
df_demographics = spark.read.csv(file_name, inferSchema=True, header=True, sep=';')

In [8]:
# #Checking the data
df_demographics.limit(10).toPandas()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601,41862,82463,1562,30908,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129,49500,93629,4147,32935,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040,46799,84839,4819,8229,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127,87105,175232,5821,33878,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040,143873,281913,5829,86253,2.73,NJ,White,76402
5,Peoria,Illinois,33.1,56229,62432,118661,6634,7517,2.4,IL,American Indian and Alaska Native,1343
6,Avondale,Arizona,29.1,38712,41971,80683,4815,8355,3.18,AZ,Black or African-American,11592
7,West Covina,California,39.8,51629,56860,108489,3800,37038,3.56,CA,Asian,32716
8,O'Fallon,Missouri,36.0,41762,43270,85032,5783,3269,2.77,MO,Hispanic or Latino,2583
9,High Point,North Carolina,35.5,51751,58077,109828,5204,16315,2.65,NC,Asian,11060


### Step 2: Exploring and Assessing the Data
For each dataframe the exploration and cleaning steps are different.
Each of these steps are documented in comments.

#### Cleaning Steps
Cleaning steps are also documented in comments.

**Immigration Data**

In [9]:
#finding the number of nan and null rows for each column
df_count_nan = df_immigration.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_immigration.columns]).toPandas()
    
#converting the dataframe to long format from wide format
df_count_nan = pd.melt(df_count_nan, var_name='columns', value_name='values')

#counting total number of records in the dataframe
no_of_recs = df_immigration.count()
    
#finding the percentage of missing values for each column
df_count_nan['% missing values'] = 100*df_count_nan['values']/no_of_recs

In [20]:
#Displaying the % of missing values for each column
df_count_nan

Unnamed: 0,columns,values,% missing values
0,cicid,0,0.0
1,i94yr,0,0.0
2,i94mon,0,0.0
3,i94cit,0,0.0
4,i94res,0,0.0
5,i94port,0,0.0
6,arrdate,0,0.0
7,i94mode,239,0.007719
8,i94addr,152592,4.928184
9,depdate,142457,4.600859


In [10]:
#finding the columns that have %90 or more missing values 
missing_vcolumns=list(df_count_nan.loc[df_count_nan['% missing values'] >= 90]['columns'])

In [12]:
#Displaying these columns, if there are any
missing_vcolumns

['occup', 'entdepu', 'insnum']

In [15]:
#Dropping these columns
df_immigration = df_immigration.drop(*missing_vcolumns)

In [16]:
#Assuming the 'cicid' column is the primary key, we drop duplicate entries, if there are any
df_immigration = df_immigration.dropDuplicates(['cicid'])

In [17]:
#Assuming the 'cicid' column is the primary key, we drop these entries with null values, if there are any
df_immigration = df_immigration.dropna(how='all', subset=['cicid'])

In [18]:
#We drop entries with all null values, if there are any
df_immigration = df_immigration.dropna(how='all')

In [19]:
#Checking the number of rows after dropping missing values
count_df_immigration= df_immigration.count()
print('{:,}'.format(count_df_immigration))

3,096,313


**Temperature Data**

In [23]:
#Converting date column type to string
df_temperature = df_temperature.withColumn("dt",col("dt").cast(StringType()))

In [24]:
#finding the number of nan and null rows
df_count_nan = df_temperature.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_temperature.columns]).toPandas()
    
#converting the dataframe to long format from wide format
df_count_nan = pd.melt(df_count_nan, var_name='columns', value_name='values')
    
#counting total number of records in the dataframe
no_of_recs = df_temperature.count()
    
#finding the percentage of missing values for each column
df_count_nan['% missing values'] = 100*df_count_nan['values']/no_of_recs

In [25]:
#Displaying the % of missing values for each column
df_count_nan

Unnamed: 0,columns,values,% missing values
0,dt,0,0.0
1,AverageTemperature,364130,4.234458
2,AverageTemperatureUncertainty,364130,4.234458
3,City,0,0.0
4,Country,0,0.0
5,Latitude,0,0.0
6,Longitude,0,0.0


In [27]:
#finding the columns that have %90 or more missing values
missing_vcolumns=list(df_count_nan.loc[df_count_nan['% missing values'] >= 90]['columns'])

In [29]:
#Displaying these columns, if there are any
missing_vcolumns

[]

In [30]:
#Dropping these columns
df_temperature = df_temperature.drop(*missing_vcolumns)

In [31]:
#We drop these entries with null values, if there are any
df_temperature = df_temperature.dropna(subset=['AverageTemperature','AverageTemperatureUncertainty'])

In [32]:
#We drop entries with duplicate values, if there are any (we need one temperature record per city, country & date)
df_temperature = df_temperature.drop_duplicates(subset=['dt', 'City', 'Country'])

In [20]:
#We drop entries with all null values, if there are any
df_temperature = df_temperature.dropna(how='all')

In [33]:
#Checking the number of rows after dropping missing values
count_df_temperature= df_temperature.count()
print('{:,}'.format(count_df_temperature))

8,190,783


In [34]:
#finding the number of nan and null rows for each column
df_count_nan = df_demographics.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_demographics.columns]).toPandas()
    
#converting the dataframe to long format from wide format
df_count_nan = pd.melt(df_count_nan, var_name='columns', value_name='values')
    
#counting total number of records in the dataframe
no_of_recs = df_demographics.count()
    
#finding the percentage of missing values for each column
df_count_nan['% missing values'] = 100*df_count_nan['values']/no_of_recs

In [35]:
#Displaying the % of missing values for each column
df_count_nan

Unnamed: 0,columns,values,% missing values
0,City,0,0.0
1,State,0,0.0
2,Median Age,0,0.0
3,Male Population,3,0.10377
4,Female Population,3,0.10377
5,Total Population,0,0.0
6,Number of Veterans,13,0.449671
7,Foreign-born,13,0.449671
8,Average Household Size,16,0.553442
9,State Code,0,0.0


In [36]:
#finding the columns that have %90 or more missing values
missing_vcolumns=list(df_count_nan.loc[df_count_nan['% missing values'] > 90]['columns'])

In [37]:
#Displaying these columns, if there are any
missing_vcolumns

[]

In [38]:
#Dropping these columns
df_demographics = df_demographics.drop(*missing_vcolumns)

In [39]:
#finding the columns that have missing values
missing_vcolumns=list(df_count_nan.loc[df_count_nan['% missing values'] > 0]['columns'])

In [42]:
#displaying these columns
missing_vcolumns

['Male Population',
 'Female Population',
 'Number of Veterans',
 'Foreign-born',
 'Average Household Size']

In [43]:
#We drop these entries with null values
df_demographics = df_demographics.dropna(subset=missing_vcolumns)

In [44]:
#Assuming the 'City', 'State', 'State Code', 'Race' columns make the primary key, we drop these entries with null values, if there are any
df_demographics= df_demographics.drop_duplicates(subset=['City', 'State', 'State Code', 'Race'])

In [45]:
#We drop entries with all null values, if there are any
df_demographics = df_demographics.dropna(how='all')

In [52]:
#Checking the number of rows after dropping missing values
count_df_demographics= df_demographics.count()
print('{:,}'.format(count_df_demographics))

2,875


<b><i>Data dictionary for Immigration Data</i></b>

<table class="tg" align="left">
  <tr>
    <th class="tg-0pky">Feature</th>
    <th class="tg-0pky">Description</th>
  </tr>
 <tr><td class="tg-0pky">cicid</td><td class="tg-0pky">Unique record ID</td>
 <tr><td class="tg-0pky">i94yr</td><td class="tg-0pky">4 digit year</td>
 <tr><td class="tg-0pky">i94mon</td><td class="tg-0pky">Numeric month</td>
 <tr><td class="tg-0pky">i94cit</td><td class="tg-0pky">3 digit code for immigrant country of birth</td>
 <tr><td class="tg-0pky">i94res</td><td class="tg-0pky">3 digit code for immigrant country of residence </td>
 <tr><td class="tg-0pky">i94port</td><td class="tg-0pky">Port of admission</td>
 <tr><td class="tg-0pky">arrdate</td><td class="tg-0pky">Arrival Date in the USA</td>
 <tr><td class="tg-0pky">i94mode</td><td class="tg-0pky">Mode of transportation (1 = Air; 2 = Sea; 3 = Land; 9 = Not reported)</td>
 <tr><td class="tg-0pky">i94addr</td><td class="tg-0pky">USA State of arrival</td>
 <tr><td class="tg-0pky">depdate</td><td class="tg-0pky">Departure Date from the USA</td>
 <tr><td class="tg-0pky">i94bir</td><td class="tg-0pky">Age of Respondent in Years</td>
 <tr><td class="tg-0pky">i94visa</td><td class="tg-0pky">Visa codes collapsed into three categories</td>
 <tr><td class="tg-0pky">count</td><td class="tg-0pky">Field used for summary statistics</td>
 <tr><td class="tg-0pky">dtadfile</td><td class="tg-0pky">Character Date Field - Date added to I-94 Files</td>
 <tr><td class="tg-0pky">visapost</td><td class="tg-0pky">Department of State where where Visa was issued </td>
 <tr><td class="tg-0pky">occup</td><td class="tg-0pky">Occupation that will be performed in U.S</td>
 <tr><td class="tg-0pky">entdepa</td><td class="tg-0pky">Arrival Flag - admitted or paroled into the U.S.</td>
 <tr><td class="tg-0pky">entdepd</td><td class="tg-0pky">Departure Flag - Departed, lost I-94 or is deceased</td>
 <tr><td class="tg-0pky">entdepu</td><td class="tg-0pky">Update Flag - Either apprehended, overstayed, adjusted to perm residence</td>
 <tr><td class="tg-0pky">matflag</td><td class="tg-0pky">Match flag - Match of arrival and departure records</td>
 <tr><td class="tg-0pky">biryear</td><td class="tg-0pky">4 digit year of birth</td>
 <tr><td class="tg-0pky">dtaddto</td><td class="tg-0pky">Character Date Field - Date to which admitted to U.S. (allowed to stay until)</td>
 <tr><td class="tg-0pky">gender</td><td class="tg-0pky">Non-immigrant sex</td>
 <tr><td class="tg-0pky">insnum</td><td class="tg-0pky">INS number</td>
 <tr><td class="tg-0pky">airline</td><td class="tg-0pky">Airline used to arrive in U.S.</td>
 <tr><td class="tg-0pky">admnum</td><td class="tg-0pky">Admission Number</td>
 <tr><td class="tg-0pky">fltno</td><td class="tg-0pky">Flight number of Airline used to arrive in U.S.</td>
 <tr><td class="tg-0pky">visatype</td><td class="tg-0pky">Class of admission legally admitting the non-immigrant to temporarily stay in U.S.</td>
</table>

<b><i>Data dictionary for Temperature Data</i></b>

<table class="tg" align="left">
  <tr>
    <th class="tg-0pky">Feature</th>
    <th class="tg-0pky">Description</th>
  </tr>
 <tr><td class="tg-0pky">dt</td><td class="tg-0pky">Date</td>
 <tr><td class="tg-0pky">AverageTemperature</td><td class="tg-0pky">Global average land temperature in celsius</td>
 <tr><td class="tg-0pky">AverageTemperatureUncertainty</td><td class="tg-0pky">95% confidence interval around the average</td>
 <tr><td class="tg-0pky">City</td><td class="tg-0pky">Name of City</td>
 <tr><td class="tg-0pky">Country</td><td class="tg-0pky">Name of Country</td>
 <tr><td class="tg-0pky">Latitude</td><td class="tg-0pky">City Latitude</td>
 <tr><td class="tg-0pky">Longitude</td><td class="tg-0pky">City Longitude</td>
</table>

<b><i>Data dictionary for Demographics Data</i></b>

<table class="tg" align="left">
  <tr>
    <th class="tg-0pky">Feature</th>
    <th class="tg-0pky">Description</th>
  </tr>
 <tr><td class="tg-0pky">City</td><td class="tg-0pky">City Name</td>
 <tr><td class="tg-0pky">State</td><td class="tg-0pky">US State where city is located</td>
 <tr><td class="tg-0pky">Median Age</td><td class="tg-0pky">Median age of the population</td>
 <tr><td class="tg-0pky">Male Population</td><td class="tg-0pky">Count of male population</td>
 <tr><td class="tg-0pky">Female Population</td><td class="tg-0pky">Count of female population</td>
 <tr><td class="tg-0pky">Total Population</td><td class="tg-0pky">Count of total population</td>
 <tr><td class="tg-0pky">Number of Veterans</td><td class="tg-0pky">Count of total Veterans</td>
 <tr><td class="tg-0pky">Foreign born</td><td class="tg-0pky">Count of residents of the city that were not born in the city</td>
 <tr><td class="tg-0pky">Average Household Size</td><td class="tg-0pky">Average city household size</td>
 <tr><td class="tg-0pky">State Code</td><td class="tg-0pky">Code of the US state</td>
 <tr><td class="tg-0pky">Race</td><td class="tg-0pky">Respondent race</td>
 <tr><td class="tg-0pky">Count</td><td class="tg-0pky">Count of city's individual per race</td>
</table>

### Step 3: The Data Model
#### 3.1 Conceptual Data Model

A Star Schema has been created in order to provide the reporing and analytics features described in Step 1.

- **dim_calendar:** Time dimension table (Load Type: Truncate Insert)
- **dim_country:** Country & average temperature dimension table (Load Type: Truncate Insert)
- **dim_visatype:** Visa type dimension table (Load Type: Truncate Insert)
- **dim_demographics:**  Demographics dimension table (Load Type: Truncate Insert)
- **fact_immigration**: Immigration fact table (Load Type: Insert Append)

The PK, FK, Indexes and Relations are described in more detail here: [Logical Data Model](https://github.com/saygincelebi/Udacity-Data-Engineering-Nanodegree-Capstone-Project/blob/main/diagrams/Logical%20Data%20Model.txt "Logical Data Model")

[![ER Diagram](https://github.com/saygincelebi/Udacity-Data-Engineering-Nanodegree-Capstone-Project/blob/main/diagrams/ER%20Diagram.png?raw=true "ER Diagram")](https://github.com/saygincelebi/Udacity-Data-Engineering-Nanodegree-Capstone-Project/blob/main/diagrams/ER%20Diagram.png?raw=true "ER Diagram")

#### 3.2 Mapping Out Data Pipelines
As described in Step 1, the data pipeline is as follows:

1.  Loading files into dataframes (raw data)
2.  Analyzing dataframes
3.  Cleaning dataframes
4.  Checking dataframes
5.  Ingesting dim and fact data as parquet files on S3 buckets
6.  Writing files to Redshift tables


### Step 4: Running the Pipelines to Model the Data 
#### 4.1 Creating the data model
The functions that will be creating the files on S3 buckets that will be loaded into the Star Schema on Redshift are defined here.

In [57]:
#Type convertion for Numeric and String columns in order to prevent errors while loading data into Redshift
def cast_type(df, cols):
    """
    in: df: spark dataframe
    in: cols: columns and types to be casted
    out: spark dataframe with casted types
    """
    for x,y in cols.items():
        if x in df.columns:
            df = df.withColumn(x, df[x].cast(y))
    return df

In [65]:
#Saving the dataframe to S3 (or another file system)
def save_df(df, output_path, mode = "overwrite", output_format = "parquet", columns = '*', partitionBy=None, **options):
    """
    in: df: spark dataframe to be written into files
    in: output_path: path to be written
    in: parameters
    """
    df.select(columns).write.save(output_path, mode= mode, format=output_format, partitionBy = partitionBy, **options)

In [55]:
#S3 output path for dimension files
dim_output_data = "s3a://celebis/dimensions/"

In [56]:
#S3 output path for fact files
fact_output_data = "s3a://celebis/facts/"

In [59]:
#This function creates an immigration calendar dataframe based on arrival date
def create_dim_calendar(df, dim_output_data):
    """
    in:df: spark dataframe of immigration events
    in:dim_output_data: path to write dimension dataframe to
    out: spark dataframe calendar dimension
    """
    #Creating a udf to convert arrival date in SAS format to datetime object
    get_datetime = udf(lambda x: (dt.datetime(1960, 1, 1).date() + dt.timedelta(x)).isoformat() if x else None)

    #Creating initial calendar df from arrdate column
    dim_calendar_df = df.select(['arrdate']).withColumn("arrdate", get_datetime(df.arrdate)).distinct()

    #Expanding df by adding other calendar columns
    dim_calendar_df = dim_calendar_df.withColumn('arrival_day', dayofmonth('arrdate'))
    dim_calendar_df = dim_calendar_df.withColumn('arrival_week', weekofyear('arrdate'))
    dim_calendar_df = dim_calendar_df.withColumn('arrival_month', month('arrdate'))
    dim_calendar_df = dim_calendar_df.withColumn('arrival_year', year('arrdate'))
    dim_calendar_df = dim_calendar_df.withColumn('arrival_weekday', dayofweek('arrdate'))

    #Creating an id field in calendar df
    dim_calendar_df = dim_calendar_df.withColumn('id', monotonically_increasing_id())
    
    integer_cols = ['arrival_day','arrival_week','arrival_month','arrival_year','arrival_weekday']

    #Type conversion for integer columns  
    calendar_df = cast_type(dim_calendar_df, dict(zip(integer_cols, len(integer_cols)*[IntegerType()])))
 
    varchar_cols = ['id']
    #Type conversion for varchar columns
    dim_calendar_df = cast_type(calendar_df, dict(zip(varchar_cols, len(varchar_cols)*[StringType()])))
    
    #Writing the calendar dimension to parquet file
    dim_calendar_df.write.parquet(dim_output_data + "dim_calendar", mode="overwrite")

    return dim_calendar_df


In [60]:
#Creating the df_dim_calendar dataframe and writing the files to S3
df_dim_calendar = create_dim_calendar(df_immigration, dim_output_data)

In [61]:
#Checking the schema
df_dim_calendar.printSchema()

root
 |-- arrdate: string (nullable = true)
 |-- arrival_day: integer (nullable = true)
 |-- arrival_week: integer (nullable = true)
 |-- arrival_month: integer (nullable = true)
 |-- arrival_year: integer (nullable = true)
 |-- arrival_weekday: integer (nullable = true)
 |-- id: string (nullable = false)



In [62]:
#Checking the data
df_dim_calendar.limit(5).toPandas()

Unnamed: 0,arrdate,arrival_day,arrival_week,arrival_month,arrival_year,arrival_weekday,id
0,2016-04-22,22,16,4,2016,6,8589934592
1,2016-04-15,15,15,4,2016,6,25769803776
2,2016-04-18,18,16,4,2016,2,42949672960
3,2016-04-09,9,14,4,2016,7,68719476736
4,2016-04-11,11,15,4,2016,2,85899345920


In [66]:
#This function calculates average temperature data for a given country
def aggregate_temperature_data(df):
    """    
    in: df: spark dataframe of global temperature data
    out: spark dataframe consisting of countries average temperatures
    """
    new_df = df.select(['Country', 'AverageTemperature']).groupby('Country').avg()
    
    new_df = new_df.withColumnRenamed('avg(AverageTemperature)', 'average_temperature')
    
    return new_df

In [67]:
#This function creates a country dimension from the immigration and global land temperatures data.
def create_dim_country(df_imm, df_temp, dim_output_data):
    """This function creates a country dimension from the immigration and global land temperatures data.
    
    in: df: spark dataframe of immigration events
    in: temp_df: spark dataframe of global land temperatures data.
    in: dim_output_data: path to write dimension dataframe to
    out: spark dataframe representing calendar dimension
    """
    #udf that returns the country name
    @udf()
    def get_country_names(code):
        name = mapping_codes[mapping_codes['code']==code]['Name'].iloc[0]
        
        if name:
            return name.title()
        return None
    
    #udf that returns the average temperature for the given country
    @udf('string')
    def get_country_average_temp(country_name):
        print("Processing average temperature for: ", country_name)
        avg_temperature = df_temp_agg[df_temp_agg['Country']==country_name]['average_temperature']
        if not avg_temperature.empty:
            return str(avg_temperature.iloc[0])
        return None
    
    #Gets the aggregated temperature data
    df_temp_agg = aggregate_temperature_data(df_temp).toPandas()
    
    #country_codes.csv holds the country codes
    file_name = "country_codes.csv"
    
    mapping_codes = pd.read_csv(file_name)
        
    #Select and renames the i94res column as country_code
    dim_country_temp = df_imm.select(['i94res']).distinct() \
                .withColumnRenamed('i94res', 'country_code')
    
    #Creates the country_name column
    dim_country_temp = dim_country_temp.withColumn('country_name', get_country_names(dim_country_temp.country_code))
    
    #Creates the average_temperature column
    dim_country_temp = dim_country_temp.withColumn('average_temperature', get_country_average_temp(dim_country_temp.country_name))
    
    
    integer_cols = ['country_code']
    #Type conversion for integer columns
    dim_country_temp = cast_type(dim_country_temp, dict(zip(integer_cols, len(integer_cols)*[IntegerType()])))
    
    float_cols = ['average_temperature']
    #Type conversion for float columns  
    dim_country_temp = cast_type(dim_country_temp, dict(zip(float_cols, len(float_cols)*[DoubleType()])))
    
    #Writes the dimension to a parquet file
    dim_country_temp.write.parquet(dim_output_data + "dim_country", mode="overwrite")

    return dim_country_temp


In [68]:
#Creating the df_dim_country dataframe and writing the files to S3
df_dim_country = create_dim_country(df_immigration, df_temperature, dim_output_data)

In [70]:
#Checking the schema
df_dim_country.printSchema()

root
 |-- country_code: integer (nullable = true)
 |-- country_name: string (nullable = true)
 |-- average_temperature: double (nullable = true)



In [71]:
#Checking the data
df_dim_country.show(5)

+------------+------------+-------------------+
|country_code|country_name|average_temperature|
+------------+------------+-------------------+
|         692|     Ecuador|      20.5391705374|
|         299|    Mongolia|     -3.36548531952|
|         576| El Salvador|      25.2628525509|
|         735|  Montenegro|      10.2210401137|
|         206|   Hong Kong|      21.4236961538|
+------------+------------+-------------------+
only showing top 5 rows



In [72]:
#This function creates a visa type dimension from the immigration data.
def create_dim_visa_type(df_imm, dim_output_data):
    """
    in: df_imm: immigration dataframe
    in: dim_output_data: path to write dimension dataframe to
    out: visa type dataframe
    """
    #Creates visatype df from visatype column
    visatype_df = df_imm.select(['visatype']).distinct()
    
    #Adding an id column
    visatype_df = visatype_df.withColumn('visa_type_key', monotonically_increasing_id())
    
    varchar_cols = ['visa_type_key']
    #Type conversion for varchar columns
    visatype_df = cast_type(visatype_df, dict(zip(varchar_cols, len(varchar_cols)*[StringType()])))
    
    #Writing dimension to parquet file
    visatype_df.write.parquet(dim_output_data + "dim_visatype", mode="overwrite")
    
    return visatype_df


In [73]:
#Creating the df_dim_visatype dataframe and writing the files to S3
df_dim_visatype = create_dim_visa_type(df_immigration, dim_output_data)

In [74]:
#Checking the schema
df_dim_visatype.printSchema()

root
 |-- visatype: string (nullable = true)
 |-- visa_type_key: string (nullable = false)



In [75]:
#Checking the data
df_dim_visatype.show(n=5)

+--------+-------------+
|visatype|visa_type_key|
+--------+-------------+
|      F2| 103079215104|
|     GMB| 352187318272|
|      B2| 369367187456|
|      F1| 498216206336|
|     CPL| 601295421440|
+--------+-------------+
only showing top 5 rows



In [76]:
#This function creates a us demographics dimension table from the us cities demographics data.
def create_dim_demographics(df_demo, dim_output_data):
    """
    in: df_demo: spark dataframe of us demographics survey data
    in: dim_output_data: path to write dimension dataframe to
    out: spark dataframe representing demographics dimension
    """
    df_demo = df_demo.withColumnRenamed('Median Age','median_age') \
            .withColumnRenamed('Male Population', 'male_population') \
            .withColumnRenamed('Female Population', 'female_population') \
            .withColumnRenamed('Total Population', 'total_population') \
            .withColumnRenamed('Number of Veterans', 'number_of_veterans') \
            .withColumnRenamed('Foreign-born', 'foreign_born') \
            .withColumnRenamed('Average Household Size', 'average_household_size') \
            .withColumnRenamed('State Code', 'state_code')

    #Adding an id column
    df_demo = df_demo.withColumn('id', monotonically_increasing_id())
    

    varchar_cols = ['id']
    #Type conversion for varchar columns
    df_demo = cast_type(df_demo, dict(zip(varchar_cols, len(varchar_cols)*[StringType()])))
    
    integer_cols = ['male_population','female_population', 'total_population', 'number_of_veterans', 'foreign_born', 'count']
    #Type conversion for integer columns
    df_demo = cast_type(df_demo, dict(zip(integer_cols, len(integer_cols)*[IntegerType()])))
    
    float_cols = ['median_age', 'average_household_size']
    #Type conversion for float columns
    df_demo = cast_type(df_demo, dict(zip(float_cols, len(float_cols)*[DoubleType()])))
    
    #Writing dimension to parquet file
    df_demo.write.parquet(dim_output_data + "dim_demographics", mode="overwrite")
    
    return df_demo

In [77]:
#Creating the df_dim_demographics dataframe and writing the files to S3
df_dim_demographics = create_dim_demographics(df_demographics, dim_output_data)

In [79]:
#Checking the schema
df_dim_demographics.printSchema()

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- median_age: double (nullable = true)
 |-- male_population: integer (nullable = true)
 |-- female_population: integer (nullable = true)
 |-- total_population: integer (nullable = true)
 |-- number_of_veterans: integer (nullable = true)
 |-- foreign_born: integer (nullable = true)
 |-- average_household_size: double (nullable = true)
 |-- state_code: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Count: integer (nullable = true)
 |-- id: string (nullable = false)



In [80]:
#Checking the data
df_dim_demographics.limit(5).toPandas()

Unnamed: 0,City,State,median_age,male_population,female_population,total_population,number_of_veterans,foreign_born,average_household_size,state_code,Race,Count,id
0,Quincy,Massachusetts,41.0,44129,49500,93629,4147,32935,2.39,MA,White,58723,0
1,Wilmington,North Carolina,35.5,52346,63601,115947,5908,7401,2.24,NC,Asian,3152,1
2,Tampa,Florida,35.3,175517,193511,369028,20636,58795,2.47,FL,Hispanic or Latino,95154,2
3,Gastonia,North Carolina,36.9,35527,39023,74550,3537,5715,2.67,NC,Asian,2788,3
4,Tyler,Texas,33.9,50422,53283,103705,4813,8225,2.59,TX,American Indian and Alaska Native,1057,4


In [81]:
#This function creates an country dimension from the immigration and global land temperatures data.
def create_fact_immigration(spark, df_imm, dim_output_data):
    """
    in: spark: spark session
    in: df_imm: spark dataframe of immigration events.
    in: output_data: path to write dimension dataframe to
    out: spark dataframe representing calendar dimension
    """
 
    #Gets the dim_visatype_df dimension
    dim_visatype_df = spark.read.parquet(dim_output_data + "dim_visatype")

    #Creates a view for dim_visatype_df dimension
    dim_visatype_df.createOrReplaceTempView("v_visa")

    #Create a udf to convert arrival date in SAS format to datetime object
    get_datetime = udf(lambda x: (dt.datetime(1960, 1, 1).date() + dt.timedelta(x)).isoformat() if x else None)

    #Renames columns to align with data model
    df_imm = df_imm.withColumnRenamed('ccid', 'record_id') \
        .withColumnRenamed('i94res', 'country_residence_code') \
        .withColumnRenamed('i94addr', 'state_code')

    #Creates an immigration view
    df_imm.createOrReplaceTempView("v_immigration")

    #Creates the visa_type key
    df_imm = spark.sql(
        """
        SELECT 
            v_immigration.*, v_visa.visa_type_key
        FROM v_immigration
        LEFT JOIN v_visa ON v_visa.visatype=v_immigration.visatype
        """
    )

    #Converts arrival date into datetime object
    df_imm = df_imm.withColumn("arrdate", get_datetime(df_imm.arrdate))

    #Drop visatype key
    df_imm = df_imm.drop(df_imm.visatype)

    int_cols = ['cicid','i94yr','i94mon','i94','i94mode','i94bir','i94visa','biryear','admnum']
    #Type conversion for integer columns
    df_imm = cast_type(df_imm, dict(zip(int_cols, len(int_cols)*[IntegerType()])))
    
    varchar_cols = ['visa_type_key']
    #Type conversion for varchar columns
    df_imm = cast_type(df_imm, dict(zip(varchar_cols, len(varchar_cols)*[StringType()])))
    
    #Writes dimension to parquet file
    df_imm.write.parquet(fact_output_data + "fact_immigration", mode="overwrite")

    return df_imm


In [82]:
#Creating the df_fact_immigration dataframe and writing the files to S3
df_fact_immigration = create_fact_immigration(spark,df_immigration, dim_output_data)

In [83]:
#Checking the schema
df_fact_immigration.printSchema()

root
 |-- cicid: integer (nullable = true)
 |-- i94yr: integer (nullable = true)
 |-- i94mon: integer (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- country_residence_code: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: string (nullable = true)
 |-- i94mode: integer (nullable = true)
 |-- state_code: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: integer (nullable = true)
 |-- i94visa: integer (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: integer (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: integer (nullable = true)
 |-- fltno: string (nullable = true)
 |-- visa_type_key: string (nullable = true)

In [84]:
#Checking the data
df_fact_immigration.limit(5).toPandas()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,country_residence_code,i94port,arrdate,i94mode,state_code,depdate,...,entdepa,entdepd,matflag,biryear,dtaddto,gender,airline,admnum,fltno,visa_type_key
0,299,2016,4,103.0,103.0,NYC,2016-04-01,1,NY,20550.0,...,O,O,M,1962,6292016,,OS,2147483647,87,884763262976
1,305,2016,4,103.0,103.0,NYC,2016-04-01,1,NY,20555.0,...,O,O,M,1953,6292016,,OS,2147483647,87,884763262976
2,496,2016,4,103.0,103.0,CHI,2016-04-01,1,IL,20548.0,...,O,O,M,1952,6292016,,OS,2147483647,65,738734374912
3,558,2016,4,103.0,103.0,SFR,2016-04-01,1,CA,20547.0,...,G,O,M,1974,6292016,M,LH,2147483647,454,738734374912
4,596,2016,4,103.0,103.0,NAS,2016-04-01,1,FL,20547.0,...,G,N,M,1992,6292016,M,UP,2147483647,221,884763262976


#### 4.2 Data Quality Checks

 * Unit tests have been performed in the notebook
 * Count checks are implemented, exception is thrown if no data is loaded

In [88]:
#Checking the number of rows in each dataframe one by one. Throws exception if unloaded table detected.

df_check = {
    
    'df_dim_visatype': df_dim_visatype,
    'df_dim_country': df_dim_country,
    'df_dim_demographics': df_dim_demographics,
    'df_dim_calendar': df_dim_calendar,
    'df_fact_immigration': df_fact_immigration,
}
for df_name, df in df_check.items():
 
    num_of_rows = df.count()

    if num_of_rows == 0:
        raise ValueError (f"{df_name} has zero records. Data quality check failed. ")
    else:
        print(f"{df_name} has {num_of_rows:,} records. Data quality check successful. ") 
        

df_dim_visatype has 17 records. Data quality check successful. 
df_dim_country has 229 records. Data quality check successful. 
df_dim_demographics has 2,875 records. Data quality check successful. 
df_dim_calendar has 30 records. Data quality check successful. 
df_fact_immigration has 3,096,313 records. Data quality check successful. 


 #### 4.3 Data Dictionary
 ##### Dim Calendar (Source: Immigration Dataset)
 
<table class="tg" align="left">
  <tr>
    <th class="tg-0pky">Feature</th>
    <th class="tg-0pky">Description</th>
  </tr>
 <tr><td class="tg-0pky">id</td><td class="tg-0pky">Unique id</td></tr>
 <tr><td class="tg-0pky">arrdate</td><td class="tg-0pky">Arrival date into US</td></tr>    
 <tr><td class="tg-0pky">arrival_year</td><td class="tg-0pky">Arrival year into US</td></tr>
 <tr><td class="tg-0pky">arrival_month</td><td class="tg-0pky">Arrival MonthS</td></tr>
 <tr><td class="tg-0pky">arrival_day</td><td class="tg-0pky">Arrival Day</td></tr>
 <tr><td class="tg-0pky">arrival_week</td><td class="tg-0pky">Arrival Week</td></tr>
 <tr><td class="tg-0pky">arrival_weekday</td><td class="tg-0pky">Arrival WeekDay</td></tr>
</table>


##### Dim Demographics (Source: Demographics Data)

<table class="tg" align="left">
  <tr>
    <th class="tg-0pky">Feature</th>
    <th class="tg-0pky">Description</th>
  </tr>
 <tr><td class="tg-0pky">id</td><td class="tg-0pky">Record id</td>
 <tr><td class="tg-0pky">state_code</td><td class="tg-0pky">US state code </td>
 <tr><td class="tg-0pky">City</td><td class="tg-0pky">City Name</td>
 <tr><td class="tg-0pky">State</td><td class="tg-0pky">US State where city is located</td>
 <tr><td class="tg-0pky">Median Age</td><td class="tg-0pky">Median age of the population</td>
 <tr><td class="tg-0pky">Male Population</td><td class="tg-0pky">Count of male population</td>
 <tr><td class="tg-0pky">Female Population</td><td class="tg-0pky">Count of female population</td>
 <tr><td class="tg-0pky">Total Population</td><td class="tg-0pky">Count of total population</td>
 <tr><td class="tg-0pky">Number of Veterans</td><td class="tg-0pky">Count of total Veterans</td>
 <tr><td class="tg-0pky">Foreign born</td><td class="tg-0pky">Count of residents of the city that were not born in the city</td>
 <tr><td class="tg-0pky">Average Household Size</td><td class="tg-0pky">Average city household size</td>
 <tr><td class="tg-0pky">Race</td><td class="tg-0pky">Respondent race</td>
 <tr><td class="tg-0pky">Count</td><td class="tg-0pky">Count of city's individual per race</td>
</table>

##### Dim Country (Source: Temperature Data and 94_SAS_Labels_Descriptions.SAS file)

<table class="tg" align="left">
  <tr>
    <th class="tg-0pky">Feature</th>
    <th class="tg-0pky">Description</th>
  </tr>
 <tr><td class="tg-0pky">country_code</td><td class="tg-0pky">Unique country code</td></tr>
 <tr><td class="tg-0pky">country_name</td><td class="tg-0pky">Name of country</td></tr>    
 <tr><td class="tg-0pky">average_temperature</td><td class="tg-0pky">Average temperature of country</td></tr>
</table>

##### Dim Visa Type (Source: Immigration Data)
<table class="tg" align="left">
  <tr>
    <th class="tg-0pky">Feature</th>
    <th class="tg-0pky">Description</th>
  </tr>
 <tr><td class="tg-0pky">visa_type_key</td><td class="tg-0pky">Unique id for each visa issued</td></tr>
 <tr><td class="tg-0pky">visa_type</td><td class="tg-0pky">Name of visa</td></tr>
</table>

##### Fact Immigration (Source: Immigration Data)

<table class="tg" align="left">
  <tr>
    <th class="tg-0pky">Feature</th>
    <th class="tg-0pky">Description</th>
  </tr>
 <tr><td class="tg-0pky">record_id</td><td class="tg-0pky">Unique record ID</td></tr>
 <tr><td class="tg-0pky">country_residence_code</td><td class="tg-0pky">3 digit code for immigrant country of residence </td></tr>    
 <tr><td class="tg-0pky">visa_type_key</td><td class="tg-0pky">A numerical key that links to the visa_type dimension table</td></tr>
 <tr><td class="tg-0pky">state_code</td><td class="tg-0pky">US state of arrival</td></tr>
 <tr><td class="tg-0pky">i94yr</td><td class="tg-0pky">4 digit year</td></tr>
 <tr><td class="tg-0pky">i94mon</td><td class="tg-0pky">Numeric month</td></tr>
 <tr><td class="tg-0pky">i94port</td><td class="tg-0pky">Port of admission</td></tr>
 <tr><td class="tg-0pky">arrdate</td><td class="tg-0pky">Arrival Date in the USA</td></tr>
 <tr><td class="tg-0pky">i94mode</td><td class="tg-0pky">Mode of transportation (1 = Air; 2 = Sea; 3 = Land; 9 = Not reported)</td></tr>
 <tr><td class="tg-0pky">i94addr</td><td class="tg-0pky">USA State of arrival</td></tr>
 <tr><td class="tg-0pky">depdate</td><td class="tg-0pky">Departure Date from the USA</td></tr>
 <tr><td class="tg-0pky">i94bir</td><td class="tg-0pky">Age of Respondent in Years</td></tr>
 <tr><td class="tg-0pky">i94visa</td><td class="tg-0pky">Visa codes collapsed into three categories</td></tr>
 <tr><td class="tg-0pky">count</td><td class="tg-0pky">Field used for summary statistics</td></tr>
 <tr><td class="tg-0pky">dtadfile</td><td class="tg-0pky">Character Date Field - Date added to I-94 Files</td></tr>
 <tr><td class="tg-0pky">visapost</td><td class="tg-0pky">Department of State where where Visa was issued </td></tr>
 <tr><td class="tg-0pky">occup</td><td class="tg-0pky">Occupation that will be performed in U.S</td></tr>
 <tr><td class="tg-0pky">entdepa</td><td class="tg-0pky">Arrival Flag - admitted or paroled into the U.S.</td></tr>
 <tr><td class="tg-0pky">entdepd</td><td class="tg-0pky">Departure Flag - Departed, lost I-94 or is deceased</td></tr>
 <tr><td class="tg-0pky">entdepu</td><td class="tg-0pky">Update Flag - Either apprehended, overstayed, adjusted to perm residence</td></tr>
 <tr><td class="tg-0pky">matflag</td><td class="tg-0pky">Match flag - Match of arrival and departure records</td></tr>
 <tr><td class="tg-0pky">biryear</td><td class="tg-0pky">4 digit year of birth</td></tr>
 <tr><td class="tg-0pky">dtaddto</td><td class="tg-0pky">Character Date Field - Date to which admitted to U.S. (allowed to stay until)</td></tr>
 <tr><td class="tg-0pky">gender</td><td class="tg-0pky">Non-immigrant sex</td></tr>
</table>

#### Step 5: Complete Project Write Up

* The architecture designed for this project mainly combines most of the technologies covered in the Data Engineering Nano Degree program. Airflow provides the data pipeline orchestration, scheduling utilities as well as the predefined operators give flexibility to process and ingest data. Spark enables us to process greater amounts of data in parallel. S3 is a great storage option for staging massive amounts of data.  Redshift is a scalable datawarehouse platform that would be used for repoting purposes. Developing an end to end pipeline gives the chance to utilize these technologies all at once with great compatibility.

* The main data source for this project was the immigration dataset, which was provided in monthly partitions. The flow is scheduled to work once in a month but it can definetely be converted into weekly or daily schedules by adjusting loading mechanism of the source files.

* Scenarios:

 * If the data was increased by 100x I believe this architecture would still perform well. Some performance issues would have probably occured on Spark servers which could be solved easily by some capacity increase. I would also make daily batches instead of running monthly.
 
 * If the data populates a dashboard that must be updated on a daily basis by 7am every day, this means we would need daily schedules, that would probably start running after 00:00 AM just after the source files are created.
 
 * If the database needed to be accessed by 100+ people, Redshift would still perform well with this Star schema model. Of course some database and query optimizations would improve performance.  Also improvements on server capacites can be considered when query loads are more than the servers can handle.