# Project Title
### Data Engineering Capstone Project

#### Project Summary
The purpose of this project is to build an ETL pipeline that extracts data from multiple datasets and transforms it into analytical tables containing information on immigration to the United States. The analytical tables are meant to be stored in a data lake on S3. In this workspace, the data is instead written to the local output folder, which stands in for an S3 bucket.

The project follows the steps outlined below:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

### Step 1: Scope the Project and Gather Data

#### Scope 
The goal of the project is to build a data lake on Amazon S3 containing analytical tables. The project uses the datasets provided by Udacity and an external dataset on airline routes.

Datasets - 
1. Immigration data from 2016 
2. Demographics of US cities
3. Airport data for airports across the globe
4. Temperature data 
5. Airline Routes

The project analyzes the immigration data and builds a data lake of aggregated data that describes the people travelling into and out of the US: which countries they come from, which cities they settle in, which ports of entry are popular, and so on.
 
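The end solution is a data lake containing the following tables - 
1. immigration_data_with_demographics - Demographics data grouped by state is joined with immigration data to compare how immigration impacts a state's demographics. 
2. immigration_analysis_based_on_airlines - Immigration data is joined with airline route data to analyze the countries and cities from which visitors travel to the US. 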


Tools used for this project are - 
1. Jupyter Notebooks 
2. pandas library for preliminary data analysis
3. Apache Spark for data processing and transformation of the data 

#### Describe and Gather Data 
In the sections below, I use the pandas library to explore the datasets and look at data samples. 

#### Immigration Data for 2016
This data comes from the US National Tourism and Trade Office. The dataset consists of files in SAS7BDAT format, one file for each month of the year. The files are in the data/18-83510-I94-Data-2016 folder of the project workspace. 
The data dictionary for this dataset is provided in the file I94_SAS_Labels_Descriptions.SAS. 
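
Since there is one file per month, the full set of paths can be generated rather than typed out. The sketch below assumes that every month follows the naming pattern of the April file used in this notebook (`i94_apr16_sub.sas7bdat`); `monthly_sas_paths` is a hypothetical helper and is not part of the original notebook.

```python
import posixpath

# Three-letter month abbreviations, assumed to match the file names in the data folder.
MONTHS = ["jan", "feb", "mar", "apr", "may", "jun",
          "jul", "aug", "sep", "oct", "nov", "dec"]

def monthly_sas_paths(base_dir, year_suffix="16"):
    """Build one path per month, following the pattern of i94_apr16_sub.sas7bdat."""
    return [
        posixpath.join(base_dir, f"i94_{month}{year_suffix}_sub.sas7bdat")
        for month in MONTHS
    ]

paths = monthly_sas_paths("../../data/18-83510-I94-Data-2016")
print(len(paths))   # 12
print(paths[3])     # ../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat
```

Only the April file is actually read in the cells that follow, so whether the other eleven files exist under these names should be checked before looping over the list.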

In [1]:
from datetime import datetime, timedelta, date
import os
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.types import LongType, IntegerType, StringType
from pyspark.sql.functions import col, udf, year, date_format, sum, avg

In [2]:
# Here's example data from one file 
immi_df = pd.read_sas('file://localhost/data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat', 
                          format='sas7bdat', 
                          index=None,
                          encoding='ISO-8859-1', 
                          chunksize=None, 
                          iterator=False
                     )

In [4]:
immi_df.head()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,6.0,2016.0,4.0,692.0,692.0,XXX,20573.0,,,,...,U,,1979.0,10282016,,,,1897628000.0,,B2
1,7.0,2016.0,4.0,254.0,276.0,ATL,20551.0,1.0,AL,,...,Y,,1991.0,D/S,M,,,3736796000.0,296.0,F1
2,15.0,2016.0,4.0,101.0,101.0,WAS,20545.0,1.0,MI,20691.0,...,,M,1961.0,09302016,M,,OS,666643200.0,93.0,B2
3,16.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,1988.0,09302016,,,AA,92468460000.0,199.0,B2
4,17.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,2012.0,09302016,,,AA,92468460000.0,199.0,B2


In [6]:
immi_df.shape

(3096313, 28)

#### World Temperature Data 
This dataset comes from Kaggle datasets. It is available at https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data. The dataset is provided in the project workspace as GlobalLandTemperaturesByCity.csv. 

In [7]:
weather_data_fname = '../../data2/GlobalLandTemperaturesByCity.csv'
weather_df = pd.read_csv(weather_data_fname)

In [8]:
weather_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 8599212 entries, 0 to 8599211
Data columns (total 7 columns):
dt                               object
AverageTemperature               float64
AverageTemperatureUncertainty    float64
City                             object
Country                          object
Latitude                         object
Longitude                        object
dtypes: float64(2), object(5)
memory usage: 459.2+ MB


In [9]:
weather_df.head()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E


#### Airport Codes
This is a dataset of airport codes (IATA and GPS codes) from across the globe. It also includes name and type of the airport and the region and municipality in which it is located. 
It comes from https://datahub.io/core/airport-codes#data. 

In [10]:
airport_codes_fname = 'airport-codes_csv.csv'
airport_codes_df = pd.read_csv(airport_codes_fname)

In [11]:
airport_codes_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 55075 entries, 0 to 55074
Data columns (total 12 columns):
ident           55075 non-null object
type            55075 non-null object
name            55075 non-null object
elevation_ft    48069 non-null float64
continent       27356 non-null object
iso_country     54828 non-null object
iso_region      55075 non-null object
municipality    49399 non-null object
gps_code        41030 non-null object
iata_code       9189 non-null object
local_code      28686 non-null object
coordinates     55075 non-null object
dtypes: float64(1), object(11)
memory usage: 5.0+ MB


In [12]:
airport_codes_df.head()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11.0,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450.0,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820.0,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237.0,,US,US-AR,Newport,,,,"-91.254898, 35.6087"


#### Demographics Data for US Cities 
This dataset contains information about the demographics of all US cities and census-designated places with a population greater than or equal to 65,000. This data comes from the US Census Bureau's 2015 American Community Survey. 
The dataset is available at https://public.opendatasoft.com/explore/dataset/us-cities-demographics/information/. 

In [13]:
demographics_fname = 'us-cities-demographics.csv'
demographics_df = pd.read_csv(demographics_fname, sep=';')

In [14]:
demographics_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2891 entries, 0 to 2890
Data columns (total 12 columns):
City                      2891 non-null object
State                     2891 non-null object
Median Age                2891 non-null float64
Male Population           2888 non-null float64
Female Population         2888 non-null float64
Total Population          2891 non-null int64
Number of Veterans        2878 non-null float64
Foreign-born              2878 non-null float64
Average Household Size    2875 non-null float64
State Code                2891 non-null object
Race                      2891 non-null object
Count                     2891 non-null int64
dtypes: float64(6), int64(2), object(4)
memory usage: 271.1+ KB


In [15]:
demographics_df.head()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402


#### Flight Route Database
As of January 2012, the OpenFlights/Airline Route Mapper Route Database contains 59,036 routes between 3,209 airports on 531 airlines spanning the globe. The routes.csv file loaded below has 67,663 rows. 
Source of dataset - https://www.kaggle.com/open-flights/flight-route-database

In [16]:
flight_routes_fname = 'routes.csv'
flight_routes_df = pd.read_csv(flight_routes_fname, sep=',')

In [17]:
flight_routes_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 67663 entries, 0 to 67662
Data columns (total 9 columns):
airline                   67663 non-null object
airline_id                67663 non-null object
source_airport            67663 non-null object
source_airport_id         67663 non-null object
destination_airport       67663 non-null object
destination_airport_id    67663 non-null object
codeshare                 14597 non-null object
stops                     67663 non-null int64
equipment                 67645 non-null object
dtypes: int64(1), object(8)
memory usage: 4.6+ MB


In [18]:
flight_routes_df.head()

Unnamed: 0,airline,airline_id,source_airport,source_airport_id,destination_airport,destination_airport_id,codeshare,stops,equipment
0,2B,410,AER,2965,KZN,2990,,0,CR2
1,2B,410,ASF,2966,KZN,2990,,0,CR2
2,2B,410,ASF,2966,MRV,2962,,0,CR2
3,2B,410,CEK,2968,KZN,2990,,0,CR2
4,2B,410,CEK,2968,OVB,4078,,0,CR2


#### Convert the data from SAS to Parquet
All subsequent work in this project is done in Spark. In this step, we read the SAS file and save it in Parquet format in the sas_data folder in the project workspace. 

In [19]:
spark = SparkSession.builder.\
config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11")\
.enableHiveSupport().getOrCreate()

In [20]:
df_spark=spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')

In [21]:
df_spark.count()

3096313

In [22]:
df_spark.write.parquet("sas_data", mode='overwrite')

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

1. Many columns in the immigration dataset are sparsely populated. Counting the non-null values in the April 2016 dataset shows many nulls in the columns insnum, occup, and entdepu. We drop these columns, along with other columns that the analytical tables do not need. 
2. Remove rows with null values in the columns of interest. 
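
The null counts mentioned in the first point are not shown in this notebook. As a rough illustration of the idea, the sketch below computes the fraction of missing values per column on a handful of toy rows in plain Python. The rows, the helper name `null_fraction_by_column`, and the 0.5 cut-off are all assumptions made for the example; on the real data the same question would be answered with Spark.

```python
def null_fraction_by_column(rows):
    """Return {column: fraction of rows where the value is None}."""
    if not rows:
        return {}
    total = len(rows)
    return {
        column: sum(1 for row in rows if row.get(column) is None) / total
        for column in rows[0].keys()
    }

# Toy rows for illustration only; they are not taken from the I94 files.
sample_rows = [
    {"cicid": 1, "insnum": None, "occup": None, "gender": "M"},
    {"cicid": 2, "insnum": None, "occup": "STU", "gender": None},
    {"cicid": 3, "insnum": None, "occup": None, "gender": "F"},
    {"cicid": 4, "insnum": "123", "occup": None, "gender": "F"},
]

fractions = null_fraction_by_column(sample_rows)
# Flag columns that are missing in at least half of the rows (arbitrary threshold).
sparse_columns = [column for column, fraction in fractions.items() if fraction >= 0.5]
print(sparse_columns)   # ['insnum', 'occup']
```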

#### Cleaning Steps
Document steps necessary to clean the data
1. Drop duplicate rows from each dataset, if any. 
2. From the immigration dataset, drop columns that are not relevant for analysis, e.g. insnum and occup. 
3. From the immigration dataset, drop rows with nulls in columns that are relevant for analysis. 
4. From the airport dataset, keep only records with a non-null iata_code. 
5. The weather dataset has records going back to the 1700s. Keep only records from the year 2000 onwards. 

In [23]:
immigration_df=spark.read.parquet("sas_data")

In [24]:
# Performing cleaning tasks here
immigration_df = immigration_df.dropDuplicates()
immigration_df = immigration_df.drop('insnum', 'occup', 'entdepa', 'entdepd', 'entdepu')
# drop rows which have null values in columns relevant for analysis. 
immigration_df = immigration_df.na.drop(how='any', thresh=None, 
                             subset=['i94mode', 'i94addr', 'i94bir', 'i94cit', 'i94res', 'i94visa', 'biryear', 'gender', 'airline', 'fltno', 'visatype']
                            )

In [25]:
airport_codes_df = spark.read.format("csv")\
                      .option("header", "true")\
                      .option("inferSchema", "true")\
                      .load("/home/workspace/airport-codes_csv.csv")

In [26]:
airport_codes_df = airport_codes_df.dropna(how='any', subset=['iata_code'])

In [27]:
flight_routes_df = spark.read.format("csv")\
                      .option("header", "true")\
                      .option("inferSchema", "true")\
                      .load("/home/workspace/routes.csv")

In [28]:
# dropDuplicates() returns a new DataFrame, so assign the result back
flight_routes_df = flight_routes_df.dropDuplicates()
flight_routes_df.count()

67663

In [29]:
flight_routes_df.columns

['airline',
 'airline_id',
 'source_airport',
 'source_airport_id',
 'destination_airport',
 'destination_airport_id',
 'codeshare',
 'stops',
 'equipment']

In [30]:
demographics_df = spark.read.format("csv")\
                     .option("sep", ";")\
                     .option("header", "true")\
                     .option("inferSchema", "true")\
                     .load("/home/workspace/us-cities-demographics.csv")

In [31]:
demographics_df = demographics_df.sort(demographics_df['State Code'], demographics_df['Race'])

In [32]:
demographics_df = demographics_df.withColumnRenamed('State', 'state')\
                                 .withColumnRenamed('State Code', 'state_code')\
                                 .withColumnRenamed('City', 'city')\
                                 .withColumnRenamed('Median Age', 'median_age')\
                                 .withColumnRenamed('Male Population', 'male_population')\
                                 .withColumnRenamed('Female Population', 'female_population')\
                                 .withColumnRenamed('Total Population', 'total_population')\
                                 .withColumnRenamed('Number of Veterans', 'number_of_veterans')\
                                 .withColumnRenamed('Foreign-born', 'foreign_born')\
                                 .withColumnRenamed('Average Household Size', 'average_household_size')\
                                 .withColumnRenamed('Race', 'race')\
                                 .withColumnRenamed('Count', 'count')                   

In [33]:
demographics_df.head()

Row(city='Anchorage', state='Alaska', median_age=32.2, male_population=152945, female_population=145750, total_population=298695, number_of_veterans=27492, foreign_born=33258, average_household_size=2.77, state_code='AK', race='American Indian and Alaska Native', count=36339)

In [34]:
weather_df = spark.read.format("csv")\
                .option("header", "true")\
                .option("inferSchema", "true")\
                .load("../../data2/GlobalLandTemperaturesByCity.csv")

In [35]:
weather_df.count()

8599212

In [36]:
weather_df = weather_df.filter(year(weather_df.dt)>=2000) 

In [37]:
weather_df.count()

579150

In [38]:
weather_df = weather_df.withColumnRenamed('AverageTemperature', 'average_temperature')\
                       .withColumnRenamed('AverageTemperatureUncertainty', 'average_temperature_uncertainty')\
                       .withColumnRenamed('City', 'city')\
                       .withColumnRenamed('Country', 'country')\
                       .withColumnRenamed('Latitude', 'latitude')\
                       .withColumnRenamed('Longitude', 'longitude')

In [39]:
weather_df.head()

Row(dt=datetime.datetime(2000, 1, 1, 0, 0), average_temperature=3.065, average_temperature_uncertainty=0.37200000000000005, city='Århus', country='Denmark', latitude='57.05N', longitude='10.33E')

In [40]:
ports_df = spark.read.format("csv")\
                    .option("header", "true")\
                    .option("inferSchema", "true")\
                    .load("/home/workspace/port_mappings.csv")

In [41]:
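# Strip surrounding whitespace; pass nulls through instead of failing on None
trim = udf(lambda c: c.strip() if c is not None else None, StringType())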
ports_df = ports_df.withColumn('airport_code', trim(ports_df['airport_code'])) \
                    .withColumn('city', trim(ports_df['city'])) \
                    .withColumn('state', trim(ports_df['state']))
ports_df = ports_df.withColumnRenamed('airport_code', 'port_of_entry_code')

In [42]:
ports_df.show()

+------------------+--------------------+-----+
|port_of_entry_code|                city|state|
+------------------+--------------------+-----+
|               ALC|               ALCAN|   AK|
|               ANC|           ANCHORAGE|   AK|
|               BAR|BAKER AAF - BAKER...|   AK|
|               DAC|       DALTONS CACHE|   AK|
|               PIZ|DEW STATION PT LA...|   AK|
|               DTH|        DUTCH HARBOR|   AK|
|               EGL|               EAGLE|   AK|
|               FRB|           FAIRBANKS|   AK|
|               HOM|               HOMER|   AK|
|               HYD|               HYDER|   AK|
|               JUN|              JUNEAU|   AK|
|               5KE|           KETCHIKAN|   AK|
|               KET|           KETCHIKAN|   AK|
|               MOS|MOSES POINT INTER...|   AK|
|               NIK|             NIKISKI|   AK|
|               NOM|                 NOM|   AK|
|               PKC|         POKER CREEK|   AK|
|               ORI|      PORT LIONS SPB

In [43]:
countries_df=spark.read.format("csv")\
                    .option("header", "true")\
                    .option("inferSchema", "true")\
                    .load("/home/workspace/countries.csv")

In [44]:
countries_df.head(6)

[Row(country_code=582, country='MEXICO Air Sea and Not Reported (I-94 no land arrivals)'),
 Row(country_code=236, country='AFGHANISTAN'),
 Row(country_code=101, country='ALBANIA'),
 Row(country_code=316, country='ALGERIA'),
 Row(country_code=102, country='ANDORRA'),
 Row(country_code=324, country='ANGOLA')]

In [45]:
output_path = '/home/workspace/output/'

In [46]:
flight_routes_df.write.csv(os.path.join(output_path, 'flight_routes'), mode='overwrite', header=True)
demographics_df.write.csv(os.path.join(output_path, 'demographics_data'), mode='overwrite', header=True) 
weather_df.write.csv(os.path.join(output_path, 'weather_data'), mode='overwrite', header=True) 
ports_df.write.csv(os.path.join(output_path, 'ports_data'), mode='overwrite', header=True) 
countries_df.write.csv(os.path.join(output_path, 'countries_data'), mode='overwrite', header=True) 

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model

<img src="dend_capstone_data_model-analytics2.png">


##### 3.1.1 Analytical Tables 
1. immigration_data_with_demographics
Data is stored in the output/immigration_demographics/ folder, partitioned by immigration year and immigration month. The table is formed by joining the immigration data with the demographics data grouped by state. It can be used to query immigration data and compare it with the demographic information of a state. 

2. immigration_analysis_based_on_airlines
Data is stored in the output/immigration_airlines/ folder, partitioned by immigration year and immigration month. The table is formed by joining the immigration dataset with a dataset of airlines and their flights to the US from countries outside the US. It can be used to analyze the countries and cities from which visitors travel to the US. 
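
Partitioning by year and month means that Spark writes each table as a tree of `column=value` directories. The sketch below shows where one partition would be expected to land, both for the local output folder and for an S3 location. `partition_dir` is a hypothetical helper and `s3a://example-bucket` is a placeholder bucket name, not something defined in this project.

```python
import posixpath

def partition_dir(base_path, table_folder, year, month):
    """Directory expected to hold one (year, month) partition of a table."""
    return posixpath.join(
        base_path,
        table_folder,
        f"immigration_year={year}",
        f"immigration_month={month}",
    )

# Local workspace folder used in this notebook.
local_dir = partition_dir("/home/workspace/output", "immigration_demographics", 2016, 4)
print(local_dir)
# /home/workspace/output/immigration_demographics/immigration_year=2016/immigration_month=4

# Same layout under a placeholder S3 bucket.
s3_dir = partition_dir("s3a://example-bucket", "immigration_airlines", 2016, 4)
print(s3_dir)
# s3a://example-bucket/immigration_airlines/immigration_year=2016/immigration_month=4
```

A query that filters on immigration_year and immigration_month only needs to read the matching directories, which is the main reason for choosing these two columns as partition keys.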


#### 3.2 Mapping Out Data Pipelines
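The data is loaded into the data lake using an ETL approach: the source datasets are read with Spark, cleaned and joined, and the resulting tables are written to the output folder.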


### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [47]:
# Read all the needed datasets 
demographics_data_df = spark.read.format("csv")\
                .option("header", "true")\
                .option("inferSchema", "true")\
                .load(os.path.join(output_path, 'demographics_data'))

In [48]:
demographics_data_df.head()

Row(city='Folsom', state='California', median_age=40.9, male_population=41051, female_population=35317, total_population=76368, number_of_veterans=4187, foreign_born=13234, average_household_size=2.62, state_code='CA', race='American Indian and Alaska Native', count=998)

In [49]:
demographics_df_by_state = demographics_data_df.groupBy('state_code').agg(
    avg('median_age'),
    sum('male_population'), sum('female_population'), sum('total_population'),
    sum('number_of_veterans'), sum('foreign_born'), avg('average_household_size'), sum('count')                                                           
).sort(
    'state_code'
)
demographics_df_by_state = demographics_df_by_state.select(
     demographics_df_by_state['state_code'],
     demographics_df_by_state['avg(median_age)'].alias('average_median_age'), 
     demographics_df_by_state['sum(male_population)'].alias('total_male_population'),
     demographics_df_by_state['sum(female_population)'].alias('total_female_population'),
     demographics_df_by_state['sum(total_population)'].alias('total_population'),
     demographics_df_by_state['sum(number_of_veterans)'].alias('total_number_of_veterans'),
     demographics_df_by_state['sum(foreign_born)'].alias('total_foreign_born'),
     demographics_df_by_state['avg(average_household_size)'].alias('average_household_size'),
     demographics_df_by_state['sum(count)'].alias('total_count_by_race')   
)

In [50]:
demographics_df_by_state.head()

Row(state_code='AK', average_median_age=32.2, total_male_population=764725, total_female_population=728750, total_population=1493475, total_number_of_veterans=137460, total_foreign_born=166290, average_household_size=2.77, total_count_by_race=336228)
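
One thing to keep in mind when reading these totals: the demographics file has one row per city and race, and city-level columns such as total_population repeat on each of those rows. Summing them directly therefore counts a city once per race row. The AK output above is consistent with this, since 1,493,475 is exactly five times Anchorage's total population of 298,695. The sketch below contrasts the direct sum with a sum taken after reducing the data to one row per city. It uses toy rows and plain Python to illustrate the idea and is not the notebook's Spark code.

```python
from collections import defaultdict

# Toy rows: one row per (city, race); total_population repeats on each row.
rows = [
    {"state_code": "XX", "city": "A", "total_population": 100, "race": "r1", "count": 60},
    {"state_code": "XX", "city": "A", "total_population": 100, "race": "r2", "count": 40},
    {"state_code": "XX", "city": "B", "total_population": 50, "race": "r1", "count": 50},
]

def naive_state_population(rows):
    """Sum total_population over every row (repeats each city once per race row)."""
    totals = defaultdict(int)
    for row in rows:
        totals[row["state_code"]] += row["total_population"]
    return dict(totals)

def deduplicated_state_population(rows):
    """Keep one total_population per (state, city) before summing."""
    per_city = {(r["state_code"], r["city"]): r["total_population"] for r in rows}
    totals = defaultdict(int)
    for (state_code, _city), population in per_city.items():
        totals[state_code] += population
    return dict(totals)

print(naive_state_population(rows))          # {'XX': 250}
print(deduplicated_state_population(rows))   # {'XX': 150}
```

The per-race count column is different: it is specific to each row, so summing it across rows does not repeat anything.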

In [51]:
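# UDF for converting SAS numeric dates (days since 1960-01-01) to ISO date format YYYY-MM-DD
def convert_sasdate(sas_date):
    try:
        obj_date = date(1960, 1, 1) + timedelta(days=sas_date)
        return obj_date.isoformat()
    except (TypeError, ValueError, OverflowError):
        # Null or out-of-range values (e.g. a missing depdate) become an empty string
        return ''
convert_date_udf = udf(convert_sasdate, StringType())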
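
As a quick check of the conversion outside Spark, the sketch below applies the same arithmetic to the arrdate and depdate values of the sample record shown later in this notebook (20545.0 and 20552.0). `sas_days_to_iso` is a stand-alone copy written for this illustration; the name is an assumption and it is not used elsewhere in the notebook.

```python
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)  # SAS dates count days from this date

def sas_days_to_iso(sas_days):
    """Convert a SAS day count to 'YYYY-MM-DD'; return '' when it cannot be converted."""
    try:
        return (SAS_EPOCH + timedelta(days=sas_days)).isoformat()
    except (TypeError, ValueError, OverflowError):
        return ""

print(sas_days_to_iso(20545.0))   # 2016-04-01
print(sas_days_to_iso(20552.0))   # 2016-04-08
print(repr(sas_days_to_iso(None)))  # ''
```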

In [52]:
# UDF for getting the country name from the country codes in the I94res or I94cit columns.
# countries_dict is built in the next cell and must exist before the UDF is applied.
def get_country_name(country_code_from_i94_record):
    # dict.get returns None for unknown codes, which Spark stores as null
    return countries_dict.get(country_code_from_i94_record)
get_country_name_udf = udf(get_country_name, StringType())

In [53]:
# Collect the small country mapping to the driver as {country_code: country}
countries_dict = {row['country_code']: row['country'] for row in countries_df.collect()}
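
The lookup itself is a plain dictionary access. The sketch below reproduces it with three of the code/name pairs shown in the countries_df.head(6) output above, to make the behaviour for unmapped codes explicit. `sample_countries` and `lookup_country` are names chosen for this illustration, and 999 is just an arbitrary code that is absent from the sample.

```python
# A few code/name pairs as shown in the countries_df.head(6) output.
sample_countries = {
    582: "MEXICO Air Sea and Not Reported (I-94 no land arrivals)",
    236: "AFGHANISTAN",
    101: "ALBANIA",
}

def lookup_country(country_code):
    """Return the country name for an I94 code, or None when the code is not mapped."""
    return sample_countries.get(country_code)

print(lookup_country(101))   # ALBANIA
print(lookup_country(999))   # None
```

Codes without a mapping come back as None, so the citizenship and country_of_residence columns can contain nulls even though the numeric codes they are derived from were non-null.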

In [54]:
immigration_df = immigration_df.withColumn('cicid', immigration_df['cicid'].cast(LongType())) \
                       .withColumn('I94yr', immigration_df['i94yr'].cast(IntegerType())) \
                       .withColumn('I94mon', immigration_df['i94mon'].cast(IntegerType())) \
                       .withColumn('I94cit', immigration_df['i94cit'].cast(IntegerType())) \
                       .withColumn('I94res', immigration_df['i94res'].cast(IntegerType())) \
                       .withColumnRenamed('i94port', 'I94port') \
                       .withColumn('I94mode', immigration_df['i94mode'].cast(IntegerType())) \
                       .withColumn('I94bir', immigration_df['i94bir'].cast(IntegerType())) \
                       .withColumn('biryear', immigration_df['biryear'].cast(IntegerType())) \
                       .withColumn('I94visa', immigration_df['i94visa'].cast(IntegerType())) \
                       .withColumn('admnum', immigration_df['admnum'].cast(LongType()))

In [55]:
immigration_df.head()

Row(cicid=1508, I94yr=2016, I94mon=4, I94cit=104, I94res=104, I94port='NYC', arrdate=20545.0, I94mode=1, i94addr='NY', depdate=20552.0, I94bir=16, I94visa=2, count=1.0, dtadfile='20160401', visapost=None, matflag='M', biryear=2000, dtaddto='06292016', gender='F', airline='LX', admnum=55416411533, fltno='00016', visatype='WT')

In [56]:
immigration_df = immigration_df.withColumn('I94res', get_country_name_udf(immigration_df['I94res']))

In [57]:
immigration_df =  immigration_df.withColumn('I94cit', get_country_name_udf(immigration_df['I94cit']))

In [58]:
immigration_df.count()

2492328

#### Build the table immigration_data_with_demographics

In [59]:
immigration_data_with_demographics=immigration_df.select(
    'cicid', 'I94yr', 'I94mon', 'I94cit', 'I94res', 'I94port', 'i94addr', 'I94bir', 'gender'
).join(
    demographics_df_by_state, 
    immigration_df.i94addr==demographics_df_by_state.state_code
)

In [60]:
immigration_data_with_demographics = immigration_data_with_demographics.select(
    immigration_data_with_demographics.cicid.alias('immigration_id'),
    immigration_data_with_demographics.I94yr.alias('immigration_year'),
    immigration_data_with_demographics.I94mon.alias('immigration_month'), 
    immigration_data_with_demographics.I94cit.alias('citizenship'),
    immigration_data_with_demographics.I94res.alias('country_of_residence'), 
    immigration_data_with_demographics.I94port.alias('port_of_entry'),
    immigration_data_with_demographics.i94addr.alias('destination_state_on_i94'), 
    immigration_data_with_demographics.I94bir.alias('age'), 
    immigration_data_with_demographics.gender, 
    immigration_data_with_demographics.state_code,
    immigration_data_with_demographics.average_median_age,
    immigration_data_with_demographics.total_male_population,
    immigration_data_with_demographics.total_female_population,
    immigration_data_with_demographics.total_population,
    immigration_data_with_demographics.total_number_of_veterans,
    immigration_data_with_demographics.total_foreign_born, 
    immigration_data_with_demographics.average_household_size,
    immigration_data_with_demographics.total_count_by_race
)

In [61]:
immigration_data_with_demographics.head()

Row(immigration_id=1508, immigration_year=2016, immigration_month=4, citizenship='BELGIUM', country_of_residence='BELGIUM', port_of_entry='NYC', destination_state_on_i94='NY', age=16, gender='F', state_code='NY', average_median_age=35.57037037037038, total_male_population=23422799, total_female_population=25579256, total_population=49002055, total_number_of_veterans=1019097, total_foreign_born=17186873, average_household_size=2.77037037037037, total_count_by_race=11377068)

In [62]:
immigration_data_with_demographics.count()

2378241

In [63]:
immigration_data_with_demographics.write.partitionBy('immigration_year', 'immigration_month').parquet(
    os.path.join(output_path, 'immigration_demographics/'), 
    mode='overwrite'
)

#### Build the table immigration_analysis_based_on_airlines

In [64]:
immigration_df = immigration_df.withColumn('arrdate', convert_date_udf(immigration_df['arrdate']))

In [65]:
immigration_df = immigration_df.withColumn('depdate', convert_date_udf(immigration_df['depdate']))

In [66]:
# Filter the airports data to large airports only. The assumption here is that international flights take off from 
# airports categorized as large_airport in the dataset. 
airport_codes_df = airport_codes_df.filter(
    airport_codes_df.iata_code.isNotNull()
).filter(
    airport_codes_df.type == 'large_airport'
)
# Create a list of airport codes for US airports 
airports_in_us = airport_codes_df.filter(airport_codes_df.iso_country=='US').select('iata_code').distinct().collect()
airports_in_us_list = [row.iata_code for row in airports_in_us]

In [69]:
large_airports_outside_us = airport_codes_df.filter(airport_codes_df.iso_country != 'US')
flights_from_large_airports_outside_us = large_airports_outside_us.join(
    flight_routes_df, 
    large_airports_outside_us.iata_code==flight_routes_df.source_airport
)
flights_to_airports_in_us=flights_from_large_airports_outside_us.where(
    flights_from_large_airports_outside_us.destination_airport.isin(airports_in_us_list)
)
flights_to_airports_in_us = flights_to_airports_in_us.select(
            'iata_code',  'name', 'iso_country', 'iso_region', 'municipality',
            flights_to_airports_in_us.airline.alias('airline_for_flights_to_us'), 
            'source_airport', 
            'destination_airport'
)

In [70]:
immigration_analysis_based_on_airlines = immigration_df.select(
    'cicid', 'I94yr', 'I94mon', 'I94cit', 'I94res', 'I94port', 'I94bir',  'gender', 'arrdate', 'depdate', 'airline', 'fltno'
).join(
    flights_to_airports_in_us, 
    immigration_df.airline==flights_to_airports_in_us.airline_for_flights_to_us
)

In [71]:
immigration_analysis_based_on_airlines = immigration_analysis_based_on_airlines.select(
    immigration_analysis_based_on_airlines.cicid.alias('immigration_id'),
    immigration_analysis_based_on_airlines.I94yr.alias('immigration_year'),
    immigration_analysis_based_on_airlines.I94mon.alias('immigration_month'), 
    immigration_analysis_based_on_airlines.I94cit.alias('citizenship'),
    immigration_analysis_based_on_airlines.I94res.alias('country_of_residence'), 
    immigration_analysis_based_on_airlines.I94port.alias('port_of_entry'),
    immigration_analysis_based_on_airlines.gender,
    immigration_analysis_based_on_airlines.arrdate.alias('arrival_date'),
    immigration_analysis_based_on_airlines.depdate.alias('departure_date'), 
    immigration_analysis_based_on_airlines.airline, 
    immigration_analysis_based_on_airlines.fltno.alias('flight_number'),
    immigration_analysis_based_on_airlines.iata_code.alias('iata_code_origination_airport'),  
    immigration_analysis_based_on_airlines.name.alias('origination_airport_name'), 
    immigration_analysis_based_on_airlines.iso_country.alias('origination_airport_country'), 
    immigration_analysis_based_on_airlines.iso_region.alias('origination_airport_region'), 
    immigration_analysis_based_on_airlines.municipality.alias('origination_airport_municipality'), 
    immigration_analysis_based_on_airlines.destination_airport.alias('iata_code_destination_airport')
)

In [72]:
immigration_analysis_based_on_airlines.head(6)

[Row(immigration_id=52676, immigration_year=2016, immigration_month=4, citizenship='INDONESIA', country_of_residence='INDONESIA', port_of_entry='LOS', gender='M', arrival_date='2016-04-01', departure_date='2016-06-06', airline='CI', flight_number='00006', iata_code_origination_airport='GUA', origination_airport_name='La Aurora Airport', origination_airport_country='GT', origination_airport_region='GT-GU', origination_airport_municipality='Guatemala City', iata_code_destination_airport='LAX'),
 Row(immigration_id=52676, immigration_year=2016, immigration_month=4, citizenship='INDONESIA', country_of_residence='INDONESIA', port_of_entry='LOS', gender='M', arrival_date='2016-04-01', departure_date='2016-06-06', airline='CI', flight_number='00006', iata_code_origination_airport='TPE', origination_airport_name='Taiwan Taoyuan International Airport', origination_airport_country='TW', origination_airport_region='TW-TAO', origination_airport_municipality='Taipei', iata_code_destination_airport=

In [None]:
immigration_analysis_based_on_airlines.write.partitionBy('immigration_year', 'immigration_month').parquet(
    os.path.join(output_path, 'immigration_airlines/'), 
    mode='overwrite'
)

In [53]:
immigration_analysis_based_on_airlines.count()

218944973
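
This count is roughly 88 times the 2,492,328 cleaned immigration records. That is what one would expect from joining on the airline code alone: each immigration record is paired with every route that its airline flies into the US, which is also why immigration_id 52676 appears more than once in the head(6) output above. The toy example below reproduces the effect in plain Python; `inner_join` is a naive helper written for this illustration and the rows are made up.

```python
def inner_join(left_rows, right_rows, left_key, right_key):
    """Naive inner join: one output row per matching (left, right) pair."""
    return [
        {**left, **right}
        for left in left_rows
        for right in right_rows
        if left[left_key] == right[right_key]
    ]

# Toy data: two travellers on airline "AA", which flies three routes to the US.
travellers = [
    {"cicid": 1, "airline": "AA"},
    {"cicid": 2, "airline": "AA"},
]
routes = [
    {"airline_for_flights_to_us": "AA", "source_airport": "S1"},
    {"airline_for_flights_to_us": "AA", "source_airport": "S2"},
    {"airline_for_flights_to_us": "AA", "source_airport": "S3"},
]

joined = inner_join(travellers, routes, "airline", "airline_for_flights_to_us")
print(len(joined))                             # 6 rows from 2 travellers
print(len({row["cicid"] for row in joined}))   # 2 distinct travellers
```

As a consequence, immigration_id is not unique in this table, and each row describes a route the traveller's airline operates rather than the route the traveller actually flew.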

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Source/Count checks to ensure completeness
 
Run Quality Checks

In [78]:
# Perform quality checks here
# check that the dataset PK is unique
assert immigration_data_with_demographics.count() == immigration_data_with_demographics.select('immigration_id').distinct().count()
# check that essential fields like state code have valid values
assert immigration_data_with_demographics.filter(immigration_data_with_demographics.state_code.isNull()).count() == 0
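
The assertions above cover uniqueness and null checks, but not the source/count check mentioned in the list. A minimal sketch of all three kinds of check is given below in plain Python, using the row counts reported earlier in this notebook (2,492,328 cleaned immigration rows and 2,378,241 rows after the join with demographics). The helper names are assumptions made for this example.

```python
def check_unique(rows, key):
    """True when no two rows share the same value for key."""
    values = [row[key] for row in rows]
    return len(values) == len(set(values))

def check_not_null(rows, column):
    """True when column is populated in every row."""
    return all(row.get(column) is not None for row in rows)

def rows_dropped(source_count, target_count):
    """Number of source rows that did not reach the target table."""
    return source_count - target_count

# Counts reported earlier in this notebook.
cleaned_immigration_rows = 2492328
rows_after_demographics_join = 2378241
print(rows_dropped(cleaned_immigration_rows, rows_after_demographics_join))   # 114087
```

The 114,087 missing rows are what an inner join would drop when the i94addr value of a record has no matching state_code in the grouped demographics data. Whether that loss is acceptable depends on the analysis, so it is worth tracking as an explicit number rather than leaving it implicit.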

#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

| Column        | Purpose                              | Source         | 
|---------------|--------------------------------------|-----------------|
|immigration_id | unique identifier for the record; the PK of the dataset|maps to cicid column in immigration_data |
|immigration_year| year in which I94 was issued  | maps to i94yr in immigration_data | 
|immigration_month| month in which I94 was issued  | maps to i94mon in immigration_data | 
|citizenship| Visitor's country of citizenship| maps to i94cit in immigration_data. It is transformed into a country name using the countries mapping given in I94_SAS_Labels_Descriptions| 
|country_of_residence| Visitor's country of residence| maps to i94res in immigration_data. It is transformed into a country name using the countries mapping given in I94_SAS_Labels_Descriptions | 
|port_of_entry| 3-character city code where the I94 is issued| maps to i94port in immigration_data | 
|address_on_i94| 2-character state code corresponding to the visitor's address in the US during their stay| maps to i94addr in immigration_data | 
|age| visitor's age| maps to i94bir in immigration_data | 
|gender| visitor's gender| maps to gender in immigration_data | 
|state_code| 2 character state code for demographics | maps to state code in demographics data | 
|average_median_age| Average of the median ages reported for a state | obtained by grouping demographics data for a state and taking an average of median age | 
|total_male_population| Total male population in a state | obtained by grouping demographics data for a state and totaling the male population| 
|total_female_population| Total female population in a state | obtained by grouping demographics data for a state and totaling the female population| 
|total_population|Total population in a state | obtained by grouping demographics data for a state and totaling the population| 
|total_number_of_veterans|Total number of veterans in a state | obtained by grouping demographics data for a state and totaling the number of veterans column| 
|total_foreign_born|Total number of foreign born people in a state | obtained by grouping demographics data for a state and totaling the numbers in foreign_born | 
|average_household_size|Average Household size in a state | obtained by grouping demographics data for a state and taking an average of average_household_size |
|total_count_by_race|Total number of people listed by race in demographics | obtained by grouping demographics data for a state and totaling the count of people of different races | 
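
A data dictionary can also serve as a lightweight schema check. The sketch below compares a table's column names with the columns listed in the dictionary above. The helper `compare_columns` is hypothetical, and the sample column list is made up for illustration.

```python
# Column names copied from the data dictionary above.
EXPECTED_COLUMNS = [
    "immigration_id", "immigration_year", "immigration_month", "citizenship",
    "country_of_residence", "port_of_entry", "address_on_i94", "age", "gender",
    "state_code", "average_median_age", "total_male_population",
    "total_female_population", "total_population", "total_number_of_veterans",
    "total_foreign_born", "average_household_size", "total_count_by_race",
]


def compare_columns(actual_columns, expected_columns=EXPECTED_COLUMNS):
    """Hypothetical helper: list columns missing from, or unexpected in, a table."""
    actual, expected = set(actual_columns), set(expected_columns)
    return {
        "missing": sorted(expected - actual),
        "unexpected": sorted(actual - expected),
    }


# Made-up example: a table that lacks 'age' and has an extra 'visa_type'.
sample = [c for c in EXPECTED_COLUMNS if c != "age"] + ["visa_type"]
print(compare_columns(sample))
# prints {'missing': ['age'], 'unexpected': ['visa_type']}
```

For a Spark DataFrame, the `columns` attribute provides the list to pass as `actual_columns`.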

### Step 5: Complete Project Write Up
 
#### Rationale of Tools 
I used Spark to run the ETL on the various datasets and stored the resulting analytical tables on S3. This split separates compute from storage. I could have stored the data in a data warehouse such as Redshift, but that would have made scaling more complex and costly. 

The decision to use Spark came from the need to process large datasets. Spark SQL makes it easy to run SQL-like queries and operations on DataFrames. Spark also works well with S3, as a cluster can read partitioned data from S3 folders in parallel. 
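
To illustrate what "partitioned data" means here: `partitionBy('immigration_year', 'immigration_month')`, as used for the `immigration_airlines/` output earlier, writes nested folders named `column=value`. The sketch below builds the folder for one partition. The helper `partition_path` and the bucket name are hypothetical placeholders.

```python
import posixpath


def partition_path(base_path, table, year, month):
    """Hypothetical helper: build the folder Spark writes for one partition.

    partitionBy('immigration_year', 'immigration_month') creates nested
    directories named column=value under the table folder.
    """
    return posixpath.join(
        base_path,
        table,
        f"immigration_year={year}",
        f"immigration_month={month}",
    )


# 's3a://example-bucket/output' is a placeholder, not the project's bucket.
print(partition_path("s3a://example-bucket/output", "immigration_airlines", 2016, 4))
# prints s3a://example-bucket/output/immigration_airlines/immigration_year=2016/immigration_month=4
```

Because each month lives in its own folder, a query that filters on `immigration_year` and `immigration_month` only needs to read the matching folders rather than the whole table.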

I decided to build my analytics database as a data lake on S3 for the following reasons - 
- Scalability and availability - S3 scales to very large data volumes without capacity planning and is highly available. 
- Storage costs - S3 storage is cheap, so large amounts of data can be stored without a substantial increase in costs.
- Schema on read - I can store data in flexible forms without confining it to predefined tables. 
- Integration - Data on S3 is easily integrated with data warehouses and ML libraries. Storing the data on S3 does not prevent me from doing more powerful analytics or ML training, as S3 is widely supported by libraries, frameworks and data warehouses. 

#### Data Update Frequency 
The data can be updated weekly. The monthly datasets provided as examples were a few hundred GB in size. A weekly ETL run keeps the data fresh, keeps each run short, and updates the data lake frequently. If analytics are needed closer to real time, the ETL pipeline can run once a day. 

#### Scenarios 
##### Data increased by 100x 
As the ETL is built in Spark, an increase in data can be handled by increasing the size of the EMR cluster that runs the Spark ETL job. 

##### Data populates a dashboard that must be updated on a daily basis by 7am every day 
To meet a daily 7am deadline, the ETL process needs to run overnight. It can be modeled as a DAG of tasks in a workflow scheduler such as Airflow, and the DAG can then be scheduled to run overnight. There can be two DAGs - 
- the first one populates the data lake in S3, and 
- the second one triggers after the data population is completed. It runs an ELT pipeline on the data lake to load the data into a data warehouse such as Amazon Redshift. Additional DAGs, or additional tasks in the same DAG, can transform the data into the format required for the dashboard. 

Both DAGs are scheduled to run daily. 
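
The ordering described above can be sketched as a small dependency graph. This is plain Python using the standard library `graphlib` module (Python 3.9+), not Airflow code, and the task names are hypothetical. It only shows the order in which a scheduler would have to run the steps.

```python
from graphlib import TopologicalSorter

# Hypothetical task names; each task maps to the set of tasks it depends on.
dependencies = {
    "populate_data_lake": set(),
    "load_redshift": {"populate_data_lake"},
    "transform_for_dashboard": {"load_redshift"},
}

# static_order() yields each task only after all of its dependencies.
run_order = list(TopologicalSorter(dependencies).static_order())
print(run_order)
# prints ['populate_data_lake', 'load_redshift', 'transform_for_dashboard']
```

In a workflow scheduler, the same dependencies would be declared between tasks or DAGs, and the first task would need to start early enough overnight for the whole chain to finish before 7am.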

##### The database needed to be accessed by 100+ people
If the number of users interested in analysing the data increases to 100+, it would be best to store the data in a highly distributed and scalable database such as Cassandra. Depending on the users' use cases, the data can be loaded from the data lake into Cassandra in the form the users need, or it can be stored directly in Cassandra. 