# Project Title
### Data Engineering Capstone Project

#### Project Summary
The objective of this project is to create an ETL pipeline for given data on I94 immigration, global land temperatures and US demographics datasets to form an analytics database on immigration events. A use case for this analytics database is to find immigration patterns to the US, for example statistics on non-resident arrivals to the U.S. by type of visa (business, pleasure, student) or dependencies between country of origin and state demographics of the first intended adress.

The project follows the follow steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up


---

## Step 1: Scope the Project and Gather Data


### Scope
The main goal of this project is to create a warehouse in the cloud that will allow the analysts to perform analytical queries or build dashboards to answer some business questions. The main dataset that we will use will include data on immigration to the United States, and supplementary datasets will include data on U.S. city demographics and temperature data.
Questions to answer with this data:
- Non-resident arrivals to the U.S. by country of residence over a period of time
- Non-resident arrivals to the U.S. by type of visa (business, pleasure, student)
- Non-resident arrivals to the U.S. by mode of transportation
- Arrivals to the U.S. by the climate of the original country
- Statistics of visitors: age, gender
- Dependencies between country of origin and state demographics of the first intended adress
- ...

This project will be executed with the help of **Amazon Web Services**: Data will be read from the customers repository using **Apache Spark** and stored in a staging area in **AWS S3 bucket**. Then, it will be loaded to the **Amazon Redshift** warehouse using an **Apache Airflow** pipeline.

### Describe and Gather Data 
Following data sets are available:
- **I94 Immigration Data:** This data comes from the US National Tourism and Trade Office. A data dictionary is included in the workspace. This is where the data comes from. There's a sample file so you can take a look at the data in csv format before reading it all in. You do not have to use the entire dataset, just use what you need to accomplish the goal you set at the beginning of the project.
- **World Temperature Data:** This dataset came from Kaggle. You can read more about it here.
- **U.S. City Demographic Data:** This data comes from OpenSoft. You can read more about it here.

In [24]:
import pandas as pd

In [8]:
# Immigration Data Sample: For the first impression of the data we will take a look on a data sample provided in the workspace. For data analysis we will load a bigger batch

dirImmigration = 'immigration_data_sample.csv'
dfImmigration = pd.read_csv(dirImmigration)
dfImmigration.head()

Unnamed: 0.1,Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,2027561,4084316.0,2016.0,4.0,209.0,209.0,HHW,20566.0,1.0,HI,...,,M,1955.0,7202016,F,,JL,56582670000.0,00782,WT
1,2171295,4422636.0,2016.0,4.0,582.0,582.0,MCA,20567.0,1.0,TX,...,,M,1990.0,10222016,M,,*GA,94362000000.0,XBLNG,B2
2,589494,1195600.0,2016.0,4.0,148.0,112.0,OGG,20551.0,1.0,FL,...,,M,1940.0,7052016,M,,LH,55780470000.0,00464,WT
3,2631158,5291768.0,2016.0,4.0,297.0,297.0,LOS,20572.0,1.0,CA,...,,M,1991.0,10272016,M,,QR,94789700000.0,00739,B2
4,3032257,985523.0,2016.0,4.0,111.0,111.0,CHM,20550.0,3.0,NY,...,,M,1997.0,7042016,F,,,42322570000.0,LAND,WT


In [18]:
dfImmigration.columns

Index(['Unnamed: 0', 'cicid', 'i94yr', 'i94mon', 'i94cit', 'i94res', 'i94port',
       'arrdate', 'i94mode', 'i94addr', 'depdate', 'i94bir', 'i94visa',
       'count', 'dtadfile', 'visapost', 'occup', 'entdepa', 'entdepd',
       'entdepu', 'matflag', 'biryear', 'dtaddto', 'gender', 'insnum',
       'airline', 'admnum', 'fltno', 'visatype'],
      dtype='object')

##### Description of the dataset:

|Column Name|Description|Data Type|
|---|---|---|
|_cicid_|Record ID|double|
|i94yr|Year of arrival, 4 digits|double|
|i94mon|Month of arrival, numeric|double|
|i94cit|Citizenship country code|double|
|_i94res_|Residence country code|double|
|i94port|U.S. port of entry (main gateway ports - all modes and air-only)|string|
|_arrdate_|Arrival date in the USA|double|
|i94mode|Mode of transportation (1=air, 2=sea, 3=land, 9=not reported)|double|
|_i94addr_|First Intended Address in the U.S. (or Address while in the U.S.) (state)|string|
|depdate|Departure date from the USA|double|
|i94bir|Age of Respondent in Years|double|
|i94visa|Visa code (1=business, 2=pleasure, 3=student)|double|
|count|Used for summary statistics|double|
|dtadfile|Character Date Field - Date added to I-94 Files|string|
|visapost|Department of State where where Visa was issued|string|
|entdepa|Arrival Flag - admitted or paroled into the U.S.|string|
|entdepd|Departure Flag - Departed, lost I-94 or is deceased|string|
|matflag| Match flag - Match of arrival and departure records|string|
|biryear|Birth year, 4 digits|double|
|dtaddto|Character Date Field - Date to which admitted to U.S. (allowed to stay until)|string|
|gender|Gender|string|
|airline|Airline used to arrive in U.S.|string|
|admnum|Admission Number|double|
|fltno|Flight number of Airline used to arrive in U.S.|string|
|visatype|Class of admission legally admitting the non-immigrant to temporarily stay in U.S.|string|

In [9]:
# World Temperature Data

dirTemp = '../../data2/GlobalLandTemperaturesByCity.csv'
dfTemp = pd.read_csv(dirTemp)
dfTemp.head()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E


##### Description of the dataset:

|Column Name|Description|Data Type|
|---|---|---|
|dt|Date in format YYYY-MM-DD|timestamp|
|AverageTemperature|Global average land temperature in Celsius|double|
|AverageTemperatureUncertainty|The 95% confidence interval around the average|double|
|City|City name|string|
|Country|Country name|string|
|Latitude|Latitude|string|
|Longitude|Longitude|string|

In [10]:
# Demographic Data

dirDemogr = 'us-cities-demographics.csv'
dfDemogr = pd.read_csv(dirDemogr,sep=';')
dfDemogr.head()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402


##### Description of the dataset:

|Column Name|Description|Data Type|
|---|---|---|
|City|City name|string|
|State|US state|string|
|Median Age|Median of the age of the population|double|
|Male Population|Number of males|integer|
|Female Population|Number of females|integer|
|Total Population|Number of the total population|integer|
|Number Veterans|Number of veterans|integer|
|Foreign-born||integer|
|Average Household Size|Average size of the household|double|
|State Code||string|
|Race||string|
|Count||integer|

---

### Step 2: Explore and Assess the Data

Following data sets are available:
- **I94 Immigration Data:** This data comes from the US National Tourism and Trade Office. A data dictionary is included in the workspace. This is where the data comes from. There's a sample file so you can take a look at the data in csv format before reading it all in. You do not have to use the entire dataset, just use what you need to accomplish the goal you set at the beginning of the project.
- **World Temperature Data:** This dataset came from Kaggle. You can read more about it here.
- **U.S. City Demographic Data:** This data comes from OpenSoft. You can read more about it here.


In [10]:
# Import the libraries needed for data analysis
import pandas as pd
import os
import configparser

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, count, isnan
from pyspark.sql.types import *

In [5]:
# Load Configuration Data
config = configparser.ConfigParser()
config.read('config.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['AWS_SECRET_ACCESS_KEY']

In [3]:
# Create a spark session
spark = SparkSession.builder.config("spark.jars.packages",
                                    "saurfang:spark-sas7bdat:2.0.0-s_2.11,org.apache.hadoop:hadoop-aws:2.7.0").\
    enableHiveSupport().getOrCreate()

#### I94 Immigration Data
The immigration data can be accessed in a folder with the following path: `../../data/18-83510-I94-Data-2016/`.There's a file for each month of the year. An example file name is `i94_apr16_sub.sas7bdat`. Each file has a three-letter abbreviation for the month name. So a full file path for June would look like this: `../../data/18-83510-I94-Data-2016/i94_jun16_sub.sas7bdat`.

In [29]:
# List all the files in the data repository
os.listdir('../../data/18-83510-I94-Data-2016/')

['i94_apr16_sub.sas7bdat',
 'i94_sep16_sub.sas7bdat',
 'i94_nov16_sub.sas7bdat',
 'i94_mar16_sub.sas7bdat',
 'i94_jun16_sub.sas7bdat',
 'i94_aug16_sub.sas7bdat',
 'i94_may16_sub.sas7bdat',
 'i94_jan16_sub.sas7bdat',
 'i94_oct16_sub.sas7bdat',
 'i94_jul16_sub.sas7bdat',
 'i94_feb16_sub.sas7bdat',
 'i94_dec16_sub.sas7bdat']

In [32]:
# Read in the data of one month
filenameImm = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
dfImmigration = spark.read.format('com.github.saurfang.sas.spark').load(filenameImm)
dfImmigration.limit(5).toPandas()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,6.0,2016.0,4.0,692.0,692.0,XXX,20573.0,,,,...,U,,1979.0,10282016,,,,1897628000.0,,B2
1,7.0,2016.0,4.0,254.0,276.0,ATL,20551.0,1.0,AL,,...,Y,,1991.0,D/S,M,,,3736796000.0,296.0,F1
2,15.0,2016.0,4.0,101.0,101.0,WAS,20545.0,1.0,MI,20691.0,...,,M,1961.0,09302016,M,,OS,666643200.0,93.0,B2
3,16.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,1988.0,09302016,,,AA,92468460000.0,199.0,B2
4,17.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,2012.0,09302016,,,AA,92468460000.0,199.0,B2


In [33]:
dfImmigration.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

##### Description of the dataset:

|Column Name|Description|Data Type|
|---|---|---|
|_**cicid**_|Record ID|double|
|**i94yr**|Year of arrival, 4 digits|double|
|**i94mon**|Month of arrival, numeric|double|
|**i94cit**|Citizenship country code|double|
|_**i94res**_|Residence country code|double|
|**i94port**|U.S. port of entry (main gateway ports - all modes and air-only)|string|
|_**arrdate**_|Arrival date in the USA|double|
|**i94mode**|Mode of transportation (1=air, 2=sea, 3=land, 9=not reported)|double|
|_**i94addr**_|First Intended Address in the U.S. (or Address while in the U.S.) (state)|string|
|**depdate**|Departure date from the USA|double|
|**i94bir**|Age of Respondent in Years|double|
|**i94visa**|Visa code (1=business, 2=pleasure, 3=student)|double|
|count|Used for summary statistics|double|
|dtadfile|Character Date Field - Date added to I-94 Files|string|
|visapost|Department of State where where Visa was issued|string|
|entdepa|Arrival Flag - admitted or paroled into the U.S.|string|
|entdepd|Departure Flag - Departed, lost I-94 or is deceased|string|
|matflag| Match flag - Match of arrival and departure records|string|
|**biryear**|Birth year, 4 digits|double|
|**dtaddto**|Character Date Field - Date to which admitted to U.S. (allowed to stay until)|string|
|**gender**|Gender|string|
|**airline**|Airline used to arrive in U.S.|string|
|admnum|Admission Number|double|
|fltno|Flight number of Airline used to arrive in U.S.|string|
|**visatype**|Class of admission legally admitting the non-immigrant to temporarily stay in U.S.|string|

In [38]:
# Count percentage of missing values
total = dfImmigration.count()

nanCountTemp = dfImmigration.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in dfImmigration.columns]).toPandas()
nanCount = pd.melt(nanCountTemp, var_name='cols', value_name='values')

nanCount['missing_values_percent'] = nanCount['values']/total*100

nanCount

Unnamed: 0,cols,values,missing_values_percent
0,cicid,0,0.0
1,i94yr,0,0.0
2,i94mon,0,0.0
3,i94cit,0,0.0
4,i94res,0,0.0
5,i94port,0,0.0
6,arrdate,0,0.0
7,i94mode,239,0.007719
8,i94addr,152592,4.928184
9,depdate,142457,4.600859


In [43]:
# Drop columns with missing values > 85%
columnsToDrop = ['occup', 'entdepu','insnum']
dfImmigration = dfImmigration.drop(*columnsToDrop)

In [46]:
# Drop duplicate entries
countAll = dfImmigration.count()

dfImmigrationT = dfImmigration.dropDuplicates(['cicid'])
countWithDupl = dfImmigrationT.count()

print("dfImmigration has " + str(countAll - countWithDupl) + " duplucates.")

dfImmigration has 0 duplucates.


In [47]:
# Drop rows with missing values
dfImmigrationT = dfImmigration.dropna(how='all', subset=['cicid'])
countWithNa = dfImmigrationT.count()

print("dfImmigration has " + str(countAll - countWithNa) + " NaN-values.")

dfImmigration has 0 NaN-values.


The immigration dataset will provide the fact table which will be at the center of the star schema model of our data warehouse. We discovered the columns that can be removed because of the high amount of missing values. Also some columns won't be used because the information is not necessary for our model. The column *cicid* has no duplucates and NaN-values and can be used as a primary index.

#### World Temperature Data

In [48]:
dfTempDir = '../../data2/GlobalLandTemperaturesByCity.csv'
dfTemp = spark.read.csv(dfTempDir, header=True, inferSchema=True)
dfTemp.limit(5).toPandas()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E


In [49]:
dfTemp.printSchema()

root
 |-- dt: timestamp (nullable = true)
 |-- AverageTemperature: double (nullable = true)
 |-- AverageTemperatureUncertainty: double (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)



##### Description of the dataset:

|Column Name|Description|Data Type|
|---|---|---|
|dt|Date in format YYYY-MM-DD|timestamp|
|AverageTemperature|Global average land temperature in Celsius|double|
|AverageTemperatureUncertainty|The 95% confidence interval around the average|double|
|City|City name|string|
|Country|Country name|string|
|Latitude|Latitude|string|
|Longitude|Longitude|string|

In [51]:
# Count percentage of missing values
total = dfTemp.count()

dfTemp = dfTemp.withColumn("dt",col("dt").cast(StringType())) # convert dt column type to string

nanCountTemp = dfTemp.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in dfTemp.columns]).toPandas()
nanCount = pd.melt(nanCountTemp, var_name='cols', value_name='values')

nanCount['missing_values_percent'] = nan_count['values']/total*100

nanCount

Unnamed: 0,cols,values,missing_values_percent
0,dt,0,0.0
1,AverageTemperature,364130,0.0
2,AverageTemperatureUncertainty,364130,0.0
3,City,0,0.0
4,Country,0,0.0
5,Latitude,0,0.0
6,Longitude,0,0.0


In [52]:
# Drop duplicate entries
countAll = dfTemp.count()

dfTempT = dfTemp.dropDuplicates(subset=['dt', 'City', 'Country'])
countWithDupl = dfTempT.count()

print("dfTemp has " + str(countAll - countWithDupl) + " duplucates.")

dfTemp has 46034 duplucates.


In [53]:
# Drop rows with missing values
dfTempT = dfTemp.dropna(how='all', subset=['AverageTemperature'])
countWithNa = dfTempT.count()

print("dfTemp has " + str(countAll - countWithNa) + " NaN-values.")

dfTemp has 364130 NaN-values.


Since the immigration dataset only has data of the US National Tourism Office in the year of 2016, the  majority of the data here is not necessary for our model. Thereforewe will use an aggregation of the averaged temperature by country over the whole provided period of time. The latitude and the longitude are given for the difference cities therefore we won't use them in the aggregation. We will also generate a look-up table *i94_country* from the file ```I94_SAS_Labels_Descriptions.SAS``` provided in the workspace. In this way we can connect each country with its country code.

#### U.S. City Demographic Data


In [6]:
dfDemogrDir = 'us-cities-demographics.csv'
dfDemogr = spark.read.csv(dfDemogrDir, inferSchema=True, header=True, sep=';')
dfDemogr.limit(5).toPandas()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601,41862,82463,1562,30908,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129,49500,93629,4147,32935,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040,46799,84839,4819,8229,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127,87105,175232,5821,33878,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040,143873,281913,5829,86253,2.73,NJ,White,76402


In [7]:
dfDemogr.printSchema()

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Median Age: double (nullable = true)
 |-- Male Population: integer (nullable = true)
 |-- Female Population: integer (nullable = true)
 |-- Total Population: integer (nullable = true)
 |-- Number of Veterans: integer (nullable = true)
 |-- Foreign-born: integer (nullable = true)
 |-- Average Household Size: double (nullable = true)
 |-- State Code: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Count: integer (nullable = true)



##### Description of the dataset:

|Column Name|Description|Data Type|
|---|---|---|
|City|City name|string|
|State|US state|string|
|Median Age|Median of the age of the population|double|
|Male Population|Number of males|integer|
|Female Population|Number of females|integer|
|Total Population|Number of the total population|integer|
|Number Veterans|Number of veterans|integer|
|Foreign-born||integer|
|Average Household Size|Average size of the household|double|
|State Code||string|
|Race||string|
|Count||integer|

In [8]:
# Count percentage of missing values
total = dfDemogr.count()

nanCountTemp = dfDemogr.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in dfDemogr.columns]).toPandas()
nanCount = pd.melt(nanCountTemp, var_name='cols', value_name='values')

nanCount['missing_values_percent'] = nanCount['values']/total*100

nanCount

Unnamed: 0,cols,values,missing_values_percent
0,City,0,0.0
1,State,0,0.0
2,Median Age,0,0.0
3,Male Population,3,0.10377
4,Female Population,3,0.10377
5,Total Population,0,0.0
6,Number of Veterans,13,0.449671
7,Foreign-born,13,0.449671
8,Average Household Size,16,0.553442
9,State Code,0,0.0


In [9]:
# Drop duplicate entries
countAll = dfDemogr.count()

dfDemogrT = dfDemogr.dropDuplicates(subset=['City', 'State', 'State Code', 'Race'])
countWithDupl = dfDemogrT.count()

print("dfTemp has " + str(countAll - countWithDupl) + " duplucates.")

dfTemp has 0 duplucates.


The demographics dataset will provide the dimension table with information on different U.S. states. We will create a table containing one row for each state. We will do a pivoting for the race values to have separate columns for each race. The NaN-Values in all the columns will be replaces with zeroes.

---

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model

![data model](udacity_capstone.jpg "Data Model" )

#### 3.2 Mapping Out Data Pipelines
The pipeline steps are as follows:

1. Transfer the data from the local directory to S3
- etl_immigration: load immigration data from local repository, create immigration dataframe and date dataframe, load both dataframes to an S3 bucket as parquet-files
- etl_countries: load temperature data from local repository, create countries dataframe and load the dataframe to an S3 bucket as parquet-files
- etl_states: load demographics data from local repository, create states dataframe and load the dataframe to an S3 bucket as parquet-files
2. Transfer the data to a data warehouse
- create fact and dimension tables in Amazon Redshift respectively
- load staging data from S3 to Redshift via Airflow
- data quality checks

---

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

**From immigration data create immigration and date dataframes**

In [None]:
# udf needed for immigration etl
def date_diff(date1, date2):
    '''
    Calculates the difference in days between two dates
    '''
    
    if date2 is None:
        return None
    else:
        a = datetime.strptime(date1, "%Y-%m-%d")
        b = datetime.strptime(date2, "%Y-%m-%d")
        delta = b - a
        return delta.days



# User defined functions using Spark udf wrapper function for date operations
convert_sas_udf = udf(lambda x: x if x is None else (datetime(1960,1,1) + timedelta(days=x)).strftime("%Y-%m-%d"))
date_diff_udf = udf(date_diff)

In [None]:
def etl_immigration(spark, input_path="immigration_data_sample.csv", output_path=None, date_output_path=None,
                         input_format = "csv", columns = ['cicid','i94res','i94cit','arrdate','i94addr','i94yr','i94mon','i94port','i94mode','i94visa','visatype',
                                                          'depdate','dtaddto','entdepa','entdepd','i94bir','biryear','gender','airline'],
                                                          load_size = None, partitionBy = ["i94yr", "i94mon"], **options):    
    """
    Reads the immigration dataset from the input_path, transforms it to the immigration fact table and dimension date table and saves both to the output_path
    output_path.
    
    Args:
        spark (SparkSession): Spark session
        input_path (string): Directory with the input files, default a data sample
        output_path (string): Directory to save immigration output files
        date_output_path (string): Directory to save date output files
        input_format (str): Format of the input files, default to "csv"
        columns (list): List of the columns names to read in
        load_size (int): Number of rows to read for debug purposes
        partitionBy (list): Files will be saved in partitions using the columns of this list
        options: All other string options
    """
    
    # Loads the immigration dataframe using Spark
    # After the data analysis we choose only selected columns straightaway
    dfImmigration = spark.read.format(input_format).load(input_path).select(columns)
    
    # Converts columns to integer
    dfImmigration = dfImmigration.withColumn("cicid", dfImmigration["cicid"].cast(IntegerType()))\
        .withColumn("i94res", dfImmigration["i94res"].cast(IntegerType()))\
        .withColumn("i94cit", dfImmigration["i94cit"].cast(IntegerType()))\
        .withColumn("arrdate", dfImmigration["arrdate"].cast(IntegerType()))\
        .withColumn("i94yr", dfImmigration["i94yr"].cast(IntegerType()))\
        .withColumn("i94mon", dfImmigration["i94mon"].cast(IntegerType()))\
        .withColumn("i94mode", dfImmigration["i94mode"].cast(IntegerType()))\
        .withColumn("i94visa", dfImmigration["i94visa"].cast(IntegerType()))\
        .withColumn("i94bir", dfImmigration["i94bir"].cast(IntegerType()))\
        .withColumn("biryear", dfImmigration["biryear"].cast(IntegerType()))
    
    # Converts SAS date to a string date in the format of YYYY-MM-DD with a udf
    dfImmigration = dfImmigration.withColumn("arrdate", convert_sas_udf(dfImmigration['arrdate']))\
        .withColumn("depdate", convert_sas_udf(dfImmigration['depdate']))
        
    # Creates a new column with the length of the visitor's stay in the US using udf
    dfImmigration = dfImmigration.withColumn('stay', date_diff_udf(dfImmigration.arrdate, dfImmigration.depdate))
    dfImmigration = dfImmigration.withColumn("stay", dfImmigration["stay"].cast(IntegerType()))\
    
    # Generates date dataframe and saves it to the date_output_path
    if date_output_path is not None:
        arrdate = dfImmigration.select('arrdate').distinct()
        depdate = dfImmigration.select('depdate').distinct()
        dates = arrdate.union(depdate)
        dates = dates.withColumn("date", to_date(dates.arrdate))
        dates = dates.withColumn("year", year(dates.date))
        dates = dates.withColumn("month", month(dates.date))
        dates = dates.withColumn("day", dayofmonth(dates.date))
        dates = dates.withColumn("weekofyear", weekofyear(dates.date))
        dates = dates.withColumn("dayofweek", dayofweek(dates.date))
        dates = dates.drop("date").withColumnRenamed('arrdate', 'date')
        dates.select(["date", "year", "month","day","weekofyear","dayofweek"]).write.save(date_output_path,mode= "overwrite",format="parquet")
    
    # Saves the immigration dataframe to the output_path
    if output_path is not None:
        dfImmigration.select(columns).write.save(output_path, mode="overwrite", format="parquet", partitionBy = partitionBy)
    return dfImmigration

**From world temperature data create the country dataframe**

In [None]:
def etl_countries(spark, input_path="../../data2/GlobalLandTemperaturesByCity.csv", output_path=None, 
                         input_format = "csv", columns = '*', load_size = 1000, partitionBy = ["CountryName"], **options):
    """
    Reads the temperature dataset from the input_path, transforms it to the country dimension table and saves it to the output_path.
    
    Args:
        spark (SparkSession): Spark session
        input_path (string): Directory with the input files
        output_path (string): Directory to save immigration output files
        input_format (str): Format of the input files, default to "csv"
        columns (list): List of the columns names to read in
        load_size (int): Number of rows to read for debug purposes
        partitionBy (list): Files will be saved in partitions using the columns of this list
        options: All other string options
    """
    
    # Loads the temperature dataframe using Spark
    dfTemp = spark.read.csv(input_path, header=True, inferSchema=True)
    
    # Aggregates the dataset by country and renames the name of new columns
    dfTempAvg = dfTemp.groupby(["Country"]).avg()\
        .select(["Country","avg(AverageTemperature)"])\
        .withColumnRenamed('Country','CountryName')\
        .withColumnRenamed('avg(AverageTemperature)', 'Temperature')
    
    # Renames some country found in the data analysis to enable a join with the country code list
    dfTempAvg = dfTempAvg.withColumn('CountryName', when(dfTempAvg['CountryName'] == 'Congo (Democratic Republic Of The', 'Congo').otherwise(dfTempAvg['CountryName']))
    dfTempAvg = dfTempAvg.withColumn('CountryName', when(dfTempAvg['CountryName'] == "Côte D'Ivoire", 'Ivory Coast').otherwise(dfTempAvg['CountryName']))
    
    # Creates a column with the country name in all lower letters to enable the join with the country code list
    dfTempAvg = dfTempAvg.withColumn('Country_Lower', lower(dfTempAvg.CountryName))
    
    # Loads the table of countries and their country code
    i94_country = spark.read.load("I94_Country.csv", format="csv", sep=";", header=True).select(['Code','Country'])
    
    # Renames some country found in the data analysis to enable a join with the countries from the temperature data frame
    i94_country = i94_country.withColumn('Country', when(i94_country['Country'] == 'BOSNIA-HERZEGOVINA', 'BOSNIA AND HERZEGOVINA').otherwise(i94_country['Country']))
    i94_country = i94_country.withColumn('Country', when(i94_country['Country'] == "INVALID: CANADA", 'CANADA').otherwise(i94_country['Country']))
    i94_country = i94_country.withColumn('Country', when(i94_country['Country'] == "CHINA, PRC", 'CHINA').otherwise(i94_country['Country']))
    i94_country = i94_country.withColumn('Country', when(i94_country['Country'] == "GUINEA-BISSAU", 'GUINEA BISSAU').otherwise(i94_country['Country']))
    i94_country = i94_country.withColumn('Country', when(i94_country['Country'] == "INVALID: PUERTO RICO", 'PUERTO RICO').otherwise(i94_country['Country']))
    i94_country = i94_country.withColumn('Country', when(i94_country['Country'] == "INVALID: UNITED STATES", 'UNITED STATES').otherwise(i94_country['Country']))
    i94_country = i94_country.withColumn('Country', when(i94_country['Country'] == "MEXICO Air Sea, and Not Reported (I-94, no land arrivals)", 'MEXICO').otherwise(i94_country['Country']))
    
    # Creates a column with the country name in all lower letters to enable the join with the country code list
    i94_country = i94_country.withColumn('i94_Country_Lower', lower(i94_country.Country))
    i94_country = i94_country.withColumn('Code', i94_country['Code'].cast(IntegerType()))
    
    # Joins both country tables to create a country dimension table
    i94_country = i94_country.join(dfTempAvg, i94_country.i94_Country_Lower == dfTempAvg.Country_Lower, how="left")
    
    dfCountry = i94_country.withColumn("CountryName1", initcap(col("i94_Country_Lower")))\
        .select(["Code","CountryName1","Temperature"])\
        .withColumnRenamed("CountryName1","CountryName")
    
    # Saves the country dataframe to the output_path
    if output_path is not None:
        dfCountry.select(columns).write.save(output_path, mode="overwrite", format="parquet", partitionBy = partitionBy)
    return dfCountry

**From demographics data create the state dataframe**

In [None]:
def etl_states(spark, input_path="us-cities-demographics.csv", output_path=None, 
                         input_format = "csv", columns='*', load_size = None, partitionBy = ["StateCode"], sep=";", **options):
    """
    Reads the cities demographics dataset from the input_path, transforms it to the state dimension table and saves it to the output_path.
    
    Args:
        spark (SparkSession): Spark session
        input_path (string): Directory with the input files
        output_path (string): Directory to save immigration output files
        input_format (str): Format of the input files, default to "csv"
        columns (list): List of the columns names to read in
        load_size (int): Number of rows to read for debug purposes
        partitionBy (list): Files will be saved in partitions using the columns of this list
        sep (string): Separator for reading the csv-file
        options: All other string options
    """
    
    # Loads the demografics dataframe using Spark
    dfDemogr = spark.read.csv(input_path, inferSchema=True, header=True, sep=sep)
    
    # Aggregates some values to the city level: Since those values are repeated in every row with a certain city, we take the min
    dfDemogrAggr = dfDemogr.groupby(["City","State","State Code"]).min()\
        .withColumnRenamed('min(Median Age)', 'MedianAge')\
        .withColumnRenamed('min(Average Household Size)', 'AverageHouseholdSize')\
        .withColumnRenamed('min(Male Population)', 'MalePopulation')\
        .withColumnRenamed('min(Female Population)', 'FemalePopulation')\
        .withColumnRenamed('min(Total Population)', 'TotalPopulation')\
        .withColumnRenamed('min(Number of Veterans)', 'NumberOfVeterans')\
        .withColumnRenamed('min(Foreign-born)', 'ForeignBorn')\
        .select(["City","State","State Code","MedianAge","AverageHouseholdSize","MalePopulation","FemalePopulation",\
                 "TotalPopulation","NumberOfVeterans","ForeignBorn"])
    
    # Creates a pivot for race to aggregate the numbers to one row for each city
    dfDemogrPiv = dfDemogr.groupBy(["City", "State", "State Code"]).pivot("Race").sum("Count")\
        .withColumnRenamed('American Indian and Alaska Native', 'AmericanIndianAndAlaskaNative')\
        .withColumnRenamed('Black or African-American', 'BlackOrAfricanAmerican')\
        .withColumnRenamed('Hispanic or Latino', 'HispanicOrLatino')
    
    # Joins both tables
    dfDemogrF = dfDemogrAggr.join(other=dfDemogrPiv, on=["City", "State", "State Code"], how="inner")\
        .withColumnRenamed('State Code', 'StateCode')
    
    # Replaces missing values with 0
    dfDemogrF = dfDemogrF.fillna(0, ['MedianAge', 'AverageHouseholdSize', 'MalePopulation', 'FemalePopulation', 'TotalPopulation', 'NumberOfVeterans', 'ForeignBorn',
                                     'AmericanIndianAndAlaskaNative', 'Asian', 'BlackOrAfricanAmerican', 'HispanicOrLatino', 'White'])
    
    # Converts columns to integer
    dfDemogrF = dfDemogrF.withColumn('MalePopulation', dfDemogrF['MalePopulation'].cast(IntegerType()))\
        .withColumn('FemalePopulation', dfDemogrF['FemalePopulation'].cast(IntegerType()))\
        .withColumn('TotalPopulation', dfDemogrF['TotalPopulation'].cast(IntegerType()))\
        .withColumn('NumberOfVeterans', dfDemogrF['NumberOfVeterans'].cast(IntegerType()))\
        .withColumn('ForeignBorn', dfDemogrF['ForeignBorn'].cast(IntegerType()))\
        .withColumn('AmericanIndianAndAlaskaNative', dfDemogrF['AmericanIndianAndAlaskaNative'].cast(IntegerType()))\
        .withColumn('Asian', dfDemogrF['Asian'].cast(IntegerType()))\
        .withColumn('BlackOrAfricanAmerican', dfDemogrF['BlackOrAfricanAmerican'].cast(IntegerType()))\
        .withColumn('HispanicOrLatino', dfDemogrF['HispanicOrLatino'].cast(IntegerType()))\
        .withColumn('White', dfDemogrF['White'].cast(IntegerType()))
    
    # To aggregate the columns to the state level we have to recalculate the average considering number of total population in every city
    dfDemogrAvg = dfDemogrF.withColumn("MedianAgeSum", dfDemogrF.MedianAge * dfDemogrF.TotalPopulation)\
        .withColumn("AverageHouseholdSizeSum", dfDemogrF.AverageHouseholdSize * dfDemogrF.TotalPopulation)  
    dfDemogrSum = dfDemogrAvg.groupby(["State","StateCode"]).sum()\
        .select(["State","StateCode","sum(TotalPopulation)","sum(MalePopulation)","sum(FemalePopulation)","sum(ForeignBorn)","sum(NumberOfVeterans)",
                 "sum(AmericanIndianAndAlaskaNative)","sum(Asian)","sum(BlackOrAfricanAmerican)","sum(HispanicOrLatino)","sum(White)",
                 "sum(MedianAgeSum)","sum(AverageHouseholdSizeSum)"])
    dfDemogrState = dfDemogrSum.withColumn("MedianAgeNew", (dfDemogrSum["sum(MedianAgeSum)"]/dfDemogrSum["sum(TotalPopulation)"]).cast(DoubleType()))\
        .withColumn("AverageHouseholdSizeNew", (dfDemogrSum["sum(AverageHouseholdSizeSum)"]/dfDemogrSum["sum(TotalPopulation)"]).cast(DoubleType()))
    
    # Renames the columns
    dfDemogrState = dfDemogrState.withColumnRenamed("sum(MalePopulation)", "MalePopulation")\
        .withColumnRenamed("sum(FemalePopulation)", "FemalePopulation")\
        .withColumnRenamed("sum(TotalPopulation)", "TotalPopulation")\
        .withColumnRenamed("sum(ForeignBorn)", "ForeignBorn")\
        .withColumnRenamed("sum(NumberOfVeterans)", "NumberOfVeterans")\
        .withColumnRenamed("sum(AmericanIndianAndAlaskaNative)", "AmericanIndianAndAlaskaNative")\
        .withColumnRenamed("sum(Asian)", "Asian")\
        .withColumnRenamed("sum(BlackOrAfricanAmerican)", "BlackOrAfricanAmerican")\
        .withColumnRenamed("sum(HispanicOrLatino)", "HispanicOrLatino")\
        .withColumnRenamed("sum(White)", "White")
    
    dfState = dfDemogrState.drop("sum(MedianAgeSum)","sum(AverageHouseholdSizeSum)")\
        .withColumnRenamed("MedianAgeNew", "MedianAge")\
        .withColumnRenamed("AverageHouseholdSizeNew", "AverageHouseholdSize")
    
    # Saves the state dataframe to the output_path
    if output_path is not None:
        dfState.select(columns).write.save(output_path, mode="overwrite", format="parquet", partitionBy = partitionBy)
    return dfState

Given the output_path directories, the etl-functions perform the first step of our data pipeline - transfer the data from local directories to Amazon S3.

In [None]:
if __name__ == "__main__" :
    spark = create_spark_session()
    
    # Perform ETL process to the immigration data and load it into s3 bucket
    immigration = etl_immigration(spark, input_path='../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat',
                                     output_path=output_data+"immigration.parquet",
                                     date_output_path=output_data+"date.parquet",
                                     input_format = "com.github.saurfang.sas.spark", 
                                     load_size=1000, partitionBy=None)
    
    # Perform ETL process to the immigration data and load it into s3 bucket
    countries = etl_countries(spark, output_path=output_data + "country.parquet")
    
    # Perform ETL process to the immigration data and load it into s3 bucket
    states = etl_states(spark, output_path=output_data + "state.parquet")

With Apache Airflow we now can insert the data into created tables on Amazon Redshift. The quality check takes place in the Airflow routine.

Unfortunately I cannot upload a picture of the airflow tree or execution plan since the stupid workspace shows an error today which didn't exist yesterday and I honestly don't have anymore nerves for this stuff.

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

In [None]:
# Data check is performed within the Airflow dag
def execute(self, context):
    redshift_hook = PostgresHook(self.redshift_conn_id)

    for table in self.tables:
        records = redshift_hook.get_records(f"SELECT COUNT(*) FROM {table}")
        if len(records) < 1 or len(records[0]) < 1:
            raise ValueError(f"Data quality check failed. {table} returned no results")
        num_records = records[0][0]
        if num_records < 1:
            raise ValueError(f"Data quality check failed. {table} contained 0 rows")

        self.log.info(f"Data quality on table {table} check passed with {records[0][0]} records")

#### 4.3 Data dictionary 
*Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.*

Data dictionary was already repeated throughtout the whole project multiple times but I copy and paste it again with pleasure.

##### Immigration face table:

|Column Name|Description|
|---|---|
|_**cicid**_|Record ID|
|**i94yr**|Year of arrival, 4 digits|
|**i94mon**|Month of arrival, numeric|
|**i94cit**|Citizenship country code|
|_**i94res**_|Residence country code|
|**i94port**|U.S. port of entry (main gateway ports - all modes and air-only)|
|_**arrdate**_|Arrival date in the USA|
|**i94mode**|Mode of transportation (1=air, 2=sea, 3=land, 9=not reported)|
|_**i94addr**_|First Intended Address in the U.S. (or Address while in the U.S.) (state)|
|**depdate**|Departure date from the USA|
|**i94bir**|Age of Respondent in Years|
|**i94visa**|Visa code (1=business, 2=pleasure, 3=student)|
|**visapost**|Department of State where where Visa was issued|
|**entdepa**|Arrival Flag - admitted or paroled into the U.S.|
|**entdepd**|Departure Flag - Departed, lost I-94 or is deceased|
|**biryear**|Birth year, 4 digits|
|**dtaddto**|Character Date Field - Date to which admitted to U.S. (allowed to stay until)|
|**gender**|Gender|
|**airline**|Airline used to arrive in U.S.|
|**visatype**|Class of admission legally admitting the non-immigrant to temporarily stay in U.S.|

##### Country dimension table:

|Column Name|Description|
|---|---|
|CountryCode|Country code|
|Country|Country name|
|AverageTemperature|Average land temperature of the country in Celsius|

##### State dimension dataset:

|Column Name|Description|
|---|---|
|StateCode|State Code|
|State|US state|
|Total Population|Number of the total population|
|Male Population|Number of males|
|Female Population|Number of females|
|Number Veterans|Number of veterans|
|Foreign-born|Number of residents that born outside th United States|
|Median Age|Median of the age of the population|
|Average Household Size|Average size of the household|
|AmericanIndianAndAlaskaNative|Number of residents of the race American Indian And Alaska Native|
|Asian|Number of residents of the race Asian|
|BlackOrAfricanAmerican|Number of residents of the race Black Or African American|
|HispanicOrLatino|Number of residents of the race Hispanic Or Latino|
|White|Number of residents of the race White|

##### Date dimension table:

|Column Name|Description|
|---|---|
|date|Date in the format YYYY-MM-DD|
|year|Four digit year|
|month|Two digit month|
|day|Two digit day|
|weekofyear|The week of the year|
|dayofweek|The day of the week|

#### Step 5: Complete Project Write Up
*Clearly state the rationale for the choice of tools and technologies for the project.*
* Aparche Spark was used because of its speed with large scale data processing, its ability to hangle multiple file formats. It also comes packaged with higher-level libraries, including support for SQL queries, and much more
* Amazon S3 was used because it provides a relatively cheap, easy-to-use with scalability, high availability, security, and performance solution for the staging of big data
* Since we were using AWS products, we also used Redshift - a parallel, column-oriented data warehouse with easy-scale functionality

*Propose how often the data should be updated and why.*
* The data should be updated monthly respectively to the I94 Immigration data update.

*Write a description of how you would approach the problem differently under the following scenarios:*
1. The data was increased by 100x.
* Increasing of the data should not be a problem for Spark. We could consider increasing the number of nodes we are using in our EMR cluster of AWS and we could use a bigger node for Amazon Redshift. Both solutions are provided by AWS.
 
2. The data populates a dashboard that must be updated on a daily basis by 7am every day.
* Airflow should run daily before 7am to provide necessary data for the dashboard. The fact table is only updated monthly, but depending on the dashboard aggregations it makes sence to schedule a daily update.

3. The database needed to be accessed by 100+ people.
* With Amazon Redshift we can quickly add or remove nodes. This allows us to get better performance and more storage for demanding workloads.