# US Immigration Data Warehouse
### Data Engineering Capstone Project

#### Project Summary
The purpose of this data engineering capstone project is to give students a chance to combine what they have learned throughout the program. The project will be an important part of a learner's portfolio and will help in achieving data engineering career goals. I chose the dataset provided by Udacity, which covers US immigration. The goal is to build a scalable data warehouse that data analysts and data scientists can use to derive insights in a Jupyter notebook or to implement machine learning algorithms.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [2]:
# install additional library
import sys
!{sys.executable} -m pip install --upgrade pip
!{sys.executable} -m pip install "notebook>=5.3" "ipywidgets>=7.5"
!{sys.executable} -m pip install plotly==4.5.0
!{sys.executable} -m pip install pyspark
!{sys.executable} -m pip install pyarrow



In [3]:
# Do all imports here
import pandas as pd

from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql.functions import when, count, col, isnan, sum, max, min, expr, desc, isnull, round, lit, avg, first
from pyspark.sql.functions import datediff, year, month, dayofmonth, quarter, format_number, upper
from pyspark.sql.types import IntegerType
from pyspark.sql import SQLContext

import plotly.express as px
import plotly.offline as pyo

# Set notebook mode to work in offline
pyo.init_notebook_mode(connected=True)

In [4]:
# Create Spark Session
# Executor memory is set on the builder because it is read when the application starts
spark = SparkSession.builder \
    .config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11") \
    .config("spark.executor.memory", "16g") \
    .enableHiveSupport() \
    .getOrCreate()

# Enable Arrow-based columnar data transfers
spark.conf.set("spark.sql.execution.arrow.enabled", "true")

# Change format display
spark.conf.set('spark.sql.repl.eagerEval.enabled', True)

### Step 1: Scope the Project and Gather Data

#### Scope 
Explain what you plan to do in the project in more detail. What data do you use? What does your end solution look like? What tools did you use?

#### Describe and Gather Data 
In the following sections we will describe each data set.

##### I94 Immigration Data

This data comes from the US National Tourism and Trade Office. It covers overseas visitor arrivals by country of residence, port of entry, mode of transportation, type of visa, and more. The data is located in ```../../data/18-83510-I94-Data-2016/```. In this project we do not use the entire dataset, only the data for April 2016. The workspace also contains a sample of this dataset.

**Data Dictionary**

| Column Name  | Description                                                                           | Data Type   |
|--------------|---------------------------------------------------------------------------------------|-------------|
| CICID        | Unique ID (Serial Primary Key)                                                        | text        |
| I94YR        | Year (4 digits)                                                                       | integer     |
| I94MON       | Numeric Month                                                                         | integer     |
| I94CIT       | Born country (3 digits)                                                               | text        |
| I94RES       | Residence country (3 digits)                                                          | text        |
| I94PORT      | Port (3 digits)                                                                       | text        |
| ARRDATE      | Arrival Date (SAS date format)                                                        | text        |
| I94MODE      | Mode of Transportation (1 = Air; 2 = Sea; 3 = Land; 9 = Not reported)                 | integer     |
| I94ADDR      | State of Arrival                                                                      | text        |
| DEPDATE      | Departure Date from the USA (SAS date format)                                         | text        |
| I94BIR       | Age of Respondent in Years                                                            | integer     |
| I94VISA      | Visa codes collapsed into three categories: (1 = Business; 2 = Pleasure; 3 = Student) | integer     |
| COUNT        | Used for summary statistics                                                           | integer     |
| DTADFILE     | Date added to I-94 Files                                                              | text        |
| VISAPOST     | Department of State where Visa was issued                                             | text        |
| OCCUP        | Occupation that will be performed in U.S                                              | text        |
| ENTDEPA      | Arrival Flag - admitted or paroled into the U.S.                                      | text        |
| ENTDEPD      | Departure Flag - Departed, lost I-94 or is deceased                                   | text        |
| ENTDEPU      | Update Flag - Either apprehended, overstayed, adjusted to perm residence              | text        |
| MATFLAG      | Match flag - Match of arrival and departure records                                   | text        |
| BIRYEAR      | 4 digit year of birth                                                                 | integer     |
| DTADDTO      | Date to which admitted to U.S. (allowed to stay until)                                | text        |
| GENDER       | Non-immigrant sex                                                                     | text        |
| INSNUM       | INS number                                                                            | text        |
| AIRLINE      | Airline used to arrive in U.S.                                                        | text        |
| ADMNUM       | Admission Number                                                                      | double      |
| FLTNO        | Flight number of Airline used to arrive in U.S.                                       | text        |
| VISATYPE     | Class of admission legally admitting the non-immigrant to temporarily stay in U.S.    | text        |

In [7]:
# Ingest April 2016 data using Spark
df_immigration = spark.read.format('com.github.saurfang.sas.spark')\
                 .load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')
df_immigration.write.mode('overwrite').parquet("sas_data")
df_immigration=spark.read.parquet("sas_data")
df_immigration.limit(5)

cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,dtadfile,visapost,occup,entdepa,entdepd,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
5748517.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,CA,20582.0,40.0,1.0,1.0,20160430,SYD,,G,O,,M,1976.0,10292016,F,,QF,94953870030.0,11,B1
5748518.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,NV,20591.0,32.0,1.0,1.0,20160430,SYD,,G,O,,M,1984.0,10292016,F,,VA,94955622830.0,7,B1
5748519.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,WA,20582.0,29.0,1.0,1.0,20160430,SYD,,G,O,,M,1987.0,10292016,M,,DL,94956406530.0,40,B1
5748520.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,WA,20588.0,29.0,1.0,1.0,20160430,SYD,,G,O,,M,1987.0,10292016,F,,DL,94956451430.0,40,B1
5748521.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,WA,20588.0,28.0,1.0,1.0,20160430,SYD,,G,O,,M,1988.0,10292016,M,,DL,94956388130.0,40,B1


##### Global Land Temperatures By City Data

This data comes from a [Kaggle](https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data) dataset, which repackages data from Berkeley Earth, an organization affiliated with Lawrence Berkeley National Laboratory. The Berkeley Earth Surface Temperature Study combines 1.6 billion temperature reports from 16 pre-existing archives. Several files are available in the Kaggle dataset, but we use only GlobalLandTemperaturesByCity. The data is located in ```../../data2/```, which contains a single file called ```GlobalLandTemperaturesByCity.csv```.

**Data Dictionary**

| Column Name                   | Description                                    | Data Type     |
|-------------------------------|------------------------------------------------|---------------|
| dt                            | Date (YYYY-MM-DD)                              | date          |
| AverageTemperature            | Global average temperature in Celsius          | decimal       |
| AverageTemperatureUncertainty | The 95% confidence interval around the average | decimal       |
| City                          | City Name                                      | text          |
| Country                       | Country Name                                   | text          |
| Latitude                      | Latitude with hemisphere suffix (e.g. 57.05N)  | text          |
| Longitude                     | Longitude with hemisphere suffix (e.g. 10.33E) | text          |

In [15]:
# Read sample data Global Temperature by City
fname = '../../data2/GlobalLandTemperaturesByCity.csv'
df_temperature = spark.read.csv(fname, header=True, inferSchema=True)
df_temperature.limit(5)

dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
1743-11-01 00:00:00,6.068,1.737,Århus,Denmark,57.05N,10.33E
1743-12-01 00:00:00,,,Århus,Denmark,57.05N,10.33E
1744-01-01 00:00:00,,,Århus,Denmark,57.05N,10.33E
1744-02-01 00:00:00,,,Århus,Denmark,57.05N,10.33E
1744-03-01 00:00:00,,,Århus,Denmark,57.05N,10.33E


##### U.S. City Demographic Data

This [data](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/information/) comes from the US Census Bureau's 2015 American Community Survey. The dataset contains demographic information for all US cities and census-designated places with a population greater than or equal to 65,000. The data is located in the workspace.

**Data Dictionary**

| Column Name            | Description                                        | Data Type      |
|------------------------|----------------------------------------------------|----------------|
| City                   | City Name                                          | text           |
| State                  | USA State                                          | text           |
| Median Age             | The median age of population                       | decimal        |
| Male Population        | Number of Male Population                          | integer        |
| Female Population      | Number of Female Population                        | integer        |
| Total Population       | Number of Total Population                         | integer        |
| Number of Veterans     | Number of Veterans                                 | integer        |
| Foreign-born           | Number of residents born outside the United States | integer        |
| Average Household Size | Average number of people per household             | decimal        |
| State Code             | Code of the state (2 digits)                       | text           |
| Race                   | Race                                               | text           |
| Count                  | Number of residents of the given race              | integer        |

In [8]:
# Read sample data U.S City Demographic
df_demographic = spark.read.csv("us-cities-demographics.csv", sep=";", header=True, inferSchema=True)
df_demographic.limit(5)

City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
Silver Spring,Maryland,33.8,40601,41862,82463,1562,30908,2.6,MD,Hispanic or Latino,25924
Quincy,Massachusetts,41.0,44129,49500,93629,4147,32935,2.39,MA,White,58723
Hoover,Alabama,38.5,38040,46799,84839,4819,8229,2.58,AL,Asian,4759
Rancho Cucamonga,California,34.5,88127,87105,175232,5821,33878,3.18,CA,Black or African-...,24437
Newark,New Jersey,34.6,138040,143873,281913,5829,86253,2.73,NJ,White,76402


##### Airport Code Data

This data comes from [DataHub](https://datahub.io/core/airport-codes#data). An airport code may be either an IATA code, a three-letter code used in passenger reservation, ticketing and baggage-handling systems, or an ICAO code, a four-letter code used by ATC systems and for airports that do not have an IATA airport code.

**Data Dictionary**

| Column Name  | Description                       | Data Type |
|--------------|-----------------------------------|-----------|
| ident        | Unique identifier                 | text      |
| type         | Type of the airport               | text      |
| name         | Airport Name                      | text      |
| elevation_ft | Altitude of the airport           | integer   |
| continent    | Continent                         | text      |
| iso_country  | ISO code of the country           | text      |
| iso_region   | ISO code of the region            | text      |
| municipality | City where the airport is located | text      |
| gps_code     | GPS Code                          | text      |
| iata_code    | IATA Code                         | text      |
| local_code   | Local Code                        | text      |
| coordinates  | GPS coordinates                   | text      |

In [9]:
# Read sample data Airport Code
df_airport = spark.read.csv("airport-codes_csv.csv", header=True, inferSchema=True)
df_airport.limit(5)

ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
00A,heliport,Total Rf Heliport,11,,US,US-PA,Bensalem,00A,,00A,-74.9336013793945...
00AA,small_airport,Aero B Ranch Airport,3435,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.7..."
00AK,small_airport,Lowell Field,450,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 5..."
00AL,small_airport,Epps Airpark,820,,US,US-AL,Harvest,00AL,,00AL,-86.7703018188476...
00AR,closed,Newport Hospital ...,237,,US,US-AR,Newport,,,,"-91.254898, 35.6087"


### Step 2: Explore and Assess the Data
### Explore the Data 
We will explore each dataset and identify data quality issues, like missing values, duplicate data and other issues.

#### I94 Immigration Data

**EDA**

First we need to identify the primary key of this data. The statements below confirm that ```cicid``` is the primary key, since the number of unique ```cicid``` values equals the total number of rows.

In [10]:
# count the total number of records
total_immigration = df_immigration.count()
total_cicid = df_immigration.select("cicid").distinct().count()
print("Total rows in immigration table: {}".format(total_immigration))
print("Total unique cicid in immigration table: {}".format(total_cicid))

Total rows in immigration table: 3096313
Total unique cicid in immigration table: 3096313
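
If the two counts ever differ, it helps to know which key values are repeated. The following is a minimal plain-Python sketch of that check, independent of Spark; the helper `duplicate_keys` and the sample records are hypothetical and not part of the project code.

```python
from collections import Counter


def duplicate_keys(rows, key):
    """Return the sorted key values that occur more than once in `rows`."""
    counts = Counter(row[key] for row in rows)
    return sorted(value for value, n in counts.items() if n > 1)


# Hypothetical sample records, not taken from the dataset
sample = [
    {"cicid": 1, "i94port": "LOS"},
    {"cicid": 2, "i94port": "LOS"},
    {"cicid": 2, "i94port": "NYC"},
]

print(duplicate_keys(sample, "cicid"))  # [2]
```

An empty result means the column can serve as a primary key for the rows that were checked.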


Some columns should have ```Integer``` type but Spark loaded them as ```Double```. We need to correct these columns.

In [11]:
# cast to IntegerType
cols = ['cicid', 'i94yr', 'i94mon', 'i94cit', 'i94res', 'i94mode','i94bir','i94visa', 'biryear']

for column in cols:
    df_immigration = df_immigration.withColumn(column,round(df_immigration[column]).cast(IntegerType()))


After exploring the unique values of each column and how each column is used, we discard the columns that carry no useful information for this analysis.

In [12]:
# columns that need to be removed
cols = ['count', 'dtadfile', 'entdepa','entdepd','matflag','dtaddto','admnum']

# drop these columns
df_immigration = df_immigration.drop(*cols)

The ```I94_SAS_Labels_Description.SAS``` file in the workspace contains additional information that helps us create dimension tables. The first dimension table is **Country**. ```i94cit``` and ```i94res``` refer to this table.

In [13]:
# get first dimension: Country
df_country = spark.read.csv("Country_Name.csv", sep=",", header=True, inferSchema=True)
df_country.printSchema()
df_country.limit(5)

root
 |-- CountryCode: integer (nullable = true)
 |-- CountryName: string (nullable = true)



CountryCode,CountryName
582,"MEXICO Air Sea, a..."
236,AFGHANISTAN
101,ALBANIA
316,ALGERIA
102,ANDORRA


Next dimension table is **Port**. ```i94port``` will refer to this table.

In [26]:
# get next dimension: Port
df_port = spark.read.csv("Port_Name.csv", sep=",", header=True, inferSchema=True)

# rename the columns so it will align with Redshift schema
df_port = df_port.withColumnRenamed("PortCode","port_code") \
                 .withColumnRenamed("PortName","port_name") 

df_port.printSchema()
df_port.limit(5)

root
 |-- port_code: string (nullable = true)
 |-- port_name: string (nullable = true)



port_code,port_name
ALC,"ALCAN, AK"
ANC,"ANCHORAGE, AK"
BAR,BAKER AAF - BAKER...
DAC,"DALTONS CACHE, AK"
PIZ,DEW STATION PT LA...


Because ```arrdate``` and ```depdate``` use the SAS date format (the number of days since 1960-01-01), we need to convert them to a standard SQL date.

In [27]:
# convert SAS date format to standard date format
# cast to int first: arrdate and depdate are loaded as double
df_immigration = df_immigration.withColumn('arrival_date', expr("date_add('1960-01-01', cast(arrdate as int))"))
df_immigration = df_immigration.withColumn('departure_date', expr("date_add('1960-01-01', cast(depdate as int))"))
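
The same conversion can be checked outside Spark. The sketch below uses only the Python standard library; the helper name `sas_to_date` is an assumption made for illustration. It applies the rule that a SAS date is a day offset from 1960-01-01, using the `arrdate` value from the sample rows above.

```python
from datetime import date, timedelta

# SAS dates count days from this epoch
SAS_EPOCH = date(1960, 1, 1)


def sas_to_date(sas_days):
    """Convert a SAS date value (days since 1960-01-01) to a date.

    None is passed through so that missing departure dates stay missing.
    """
    if sas_days is None:
        return None
    return SAS_EPOCH + timedelta(days=int(sas_days))


print(sas_to_date(20574.0))  # 2016-04-30
```

The result agrees with the `dtadfile` value `20160430` shown for the same sample rows.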

We require ```departure_date``` to be later than ```arrival_date```. Rows that violate this rule are dropped.

In [28]:
# remove rows where duration_stay <= 0
# keep rows where duration_stay is null, as departure_date may be missing
df_immigration = df_immigration.withColumn('duration_stay', datediff(col('departure_date'),col('arrival_date')))
df_immigration = df_immigration.filter('duration_stay > 0 or duration_stay is null') 
df_immigration.count()

3095938

As a best practice, it is useful to have a calendar dimension table. It helps when building time-based reports and measures such as month-to-date or year-to-date. We create this dimension table from ```arrival_date``` and ```departure_date```.

In [30]:
# create dimension date, combine arrival and departure date
df_temp  = df_immigration.select(col("arrival_date").alias("date_deparr")).distinct()
df_temp2 = df_immigration.select(col("departure_date").alias("date_deparr")).distinct()
df_calendar = df_temp.union(df_temp2)
df_calendar = df_calendar.distinct()

# derive year, month, day and quarter from date
df_calendar = df_calendar.withColumn("year_deparr", year(col("date_deparr"))) \
.withColumn("month_deparr", month(col("date_deparr"))) \
.withColumn("day_deparr", dayofmonth(col("date_deparr"))) \
.withColumn("quarter_deparr", quarter(col("date_deparr")))

df_calendar.printSchema()
df_calendar.limit(5)

root
 |-- date_deparr: date (nullable = true)
 |-- year_deparr: integer (nullable = true)
 |-- month_deparr: integer (nullable = true)
 |-- day_deparr: integer (nullable = true)
 |-- quarter_deparr: integer (nullable = true)



date_deparr,year_deparr,month_deparr,day_deparr,quarter_deparr
2016-04-25,2016,4,25,2
2016-05-03,2016,5,3,2
2016-08-15,2016,8,15,3
2016-08-31,2016,8,31,3
2016-07-26,2016,7,26,3
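
For reference, the derived calendar attributes can be reproduced for a single date in plain Python. This is a sketch only; `calendar_row` is a hypothetical helper, and the quarter is computed as `(month - 1) // 3 + 1`, which matches the quarter values in the output above.

```python
from datetime import date


def calendar_row(d):
    """Derive the calendar dimension attributes for one date."""
    return {
        "date_deparr": d,
        "year_deparr": d.year,
        "month_deparr": d.month,
        "day_deparr": d.day,
        # months 1-3 -> quarter 1, 4-6 -> 2, 7-9 -> 3, 10-12 -> 4
        "quarter_deparr": (d.month - 1) // 3 + 1,
    }


row = calendar_row(date(2016, 8, 15))
print(row["year_deparr"], row["month_deparr"], row["day_deparr"], row["quarter_deparr"])
# 2016 8 15 3
```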


In [46]:
# remove rows based on missing values
df_calendar = df_calendar.na.drop(subset=["date_deparr"])

# count null values
df_calendar.filter("date_deparr is null").count()

0

Next dimension table is **Mode of Transportation**. ```i94mode``` will refer to this table.

In [31]:
# create dimension table
df_mode_trans = spark.createDataFrame([(1, "Air"), (2, "Sea"), (3, "Land"), (9, "Not reported")], ("mode_code", "mode_name"))
df_mode_trans = df_mode_trans.withColumn("mode_code",col("mode_code").cast(IntegerType()))
df_mode_trans.printSchema()
df_mode_trans.limit(5)

root
 |-- mode_code: integer (nullable = true)
 |-- mode_name: string (nullable = true)



mode_code,mode_name
1,Air
2,Sea
3,Land
9,Not reported


We assume that null values in ```i94mode``` are equivalent to 9 (Not reported), so we replace the nulls with that code.

In [12]:
df_immigration = df_immigration.fillna({'i94mode':9})
df_immigration.select("i94mode").distinct().limit(5)

i94mode
1.0
3.0
2.0
9.0


Next dimension table is **State**. ```i94addr``` will refer to this table.

In [33]:
# get State description
df_state = spark.read.csv("State_Name.csv", sep=",", header=True, inferSchema=True)
df_state.printSchema()
df_state.limit(5)

root
 |-- StateCode: string (nullable = true)
 |-- StateName: string (nullable = true)



StateCode,StateName
AL,ALABAMA
AK,ALASKA
AZ,ARIZONA
AR,ARKANSAS
CA,CALIFORNIA


Replace invalid values of ```i94addr``` with 99 (All Other Codes).

In [14]:
# collect valid code of State
list_state = df_state.select("StateCode").rdd.flatMap(lambda x: x).collect()

# replace invalid code with 99 (All Other Codes)
df_immigration = df_immigration.withColumn('i94addr', \
                   when(df_immigration.i94addr.isin(list_state),df_immigration['i94addr']).otherwise('99'))
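
The replacement rule is simple enough to state in plain Python. The sketch below is illustrative only: `normalize_state` is a hypothetical helper, and the set of valid codes is just the first five entries shown in the State sample above. As in the Spark expression, a missing code also ends up as `'99'`.

```python
def normalize_state(code, valid_codes, fallback="99"):
    """Keep a state code if it is in the valid set, otherwise use the fallback."""
    return code if code in valid_codes else fallback


# First five codes from the State_Name.csv sample shown earlier
valid = {"AL", "AK", "AZ", "AR", "CA"}

print([normalize_state(c, valid) for c in ["CA", "XX", None]])
# ['CA', '99', '99']
```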

Now we need to check whether there are invalid values of Age ( ```i94bir``` ).

In [15]:
df_immigration.where('i94bir is null or i94bir < 0').groupBy('i94bir').count().limit(5)

i94bir,count
,800
-3.0,1


We assume that these are invalid data and we need to remove these rows.

In [16]:
# drop rows where Age Is Null or Age < 0
df_immigration = df_immigration.filter('i94bir is not null and i94bir >= 0')

Next dimension table is **Visa Category**. ```i94visa``` will refer to this table.

In [41]:
# create dimension table
df_visa_cat = spark.createDataFrame([(1, "Business"), (2, "Pleasure"), (3, "Student")], ("visacat_code", "visa_category"))
df_visa_cat = df_visa_cat.withColumn("visacat_code",col("visacat_code").cast(IntegerType()))
df_visa_cat.printSchema()
df_visa_cat.limit(5)

root
 |-- visacat_code: integer (nullable = true)
 |-- visa_category: string (nullable = true)



visacat_code,visa_category
1,Business
2,Pleasure
3,Student


The last column we need to check is ```gender```. It contains many invalid values: (null, 'U', 'X').

In [18]:
df_immigration.groupBy('gender').count().limit(10)

gender,count
F,1302257
,414268
M,1376762
U,243
X,1607


Because we lack the information needed to resolve these invalid values, we replace them with the constant 'N/A'.

In [19]:
# valid gender
list_gender = ['F', 'M']

# replace invalid code with N/A
df_immigration = df_immigration.withColumn('gender', \
                   when(df_immigration.gender.isin(list_gender),df_immigration['gender']).otherwise('N/A'))

In [20]:
df_immigration.groupBy('gender').count().limit(10)

gender,count
F,1302257
M,1376762
,416118


**Missing Values**

We need to compute the percentage of missing values for every column. We assume that only columns with at most 30% missing values are acceptable.
Columns with more than 30% missing values are removed.
As shown below, four columns exceed 30% missing values: visapost, occup, entdepu, insnum.

In [21]:
# count missing value for each column
df_nan_imm = df_immigration.select([count(when(col(c).isNull(), c))\
            .alias(c) for c in df_immigration.columns]).toPandas()





In [22]:
# Calculate missing values percentage for all column
df_nan_imm_trans = df_nan_imm.transpose()
df_nan_imm_trans.reset_index(inplace=True)
df_nan_imm_trans.columns = ['columns','% Missing Values']
df_nan_imm_trans['% Missing Values'] = df_nan_imm_trans['% Missing Values'] * 100 / total_immigration

# Many immigration columns have less than 2 percent missing values,
# so we exclude them from the bar chart
df_nan_imm_trans = df_nan_imm_trans[df_nan_imm_trans['% Missing Values'] >= 2]

# Visualize missing values percentage as a bar chart
fig = px.bar(df_nan_imm_trans, x="columns", y="% Missing Values", color='columns')
fig.show()

According to our assumption, we remove the columns with more than 30% missing values.

In [23]:
# columns with over 30% missing values
cols = ['visapost', 'occup', 'entdepu','insnum']

# drop these columns
df_immigration = df_immigration.drop(*cols)
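
Instead of reading the column names off the chart, the list could also be derived from the null counts. The following is a minimal sketch in plain Python, assuming the counts are available as a dictionary; `columns_over_threshold` is a hypothetical helper and the numbers in the example are made up, not measured from the immigration data.

```python
def columns_over_threshold(null_counts, total_rows, threshold_pct=30.0):
    """Return the sorted columns whose missing percentage exceeds the threshold."""
    if total_rows <= 0:
        raise ValueError("total_rows must be positive")
    return sorted(
        column
        for column, nulls in null_counts.items()
        if 100.0 * nulls / total_rows > threshold_pct
    )


# Hypothetical null counts for a table with 1000 rows
example_counts = {"occup": 990, "insnum": 960, "gender": 130, "airline": 300}

print(columns_over_threshold(example_counts, 1000))  # ['insnum', 'occup']
```

A column at exactly 30% is kept, which follows the stated rule of accepting missing percentages below or equal to 30%.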

In [24]:
# display final schema
df_immigration.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = false)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = true)
 |-- fltno: string (nullable = true)
 |-- visatype: string (nullable = true)
 |-- arrival_date: date (nullable = true)
 |-- departure_date: date (n

**Duplicate Data**

No duplicate rows were found in the immigration data when comparing entire rows.

In [69]:
# Check duplicate data
df_rm_duplicate = df_immigration.distinct()
if (df_rm_duplicate.count() == df_immigration.count()):
    print("Duplicate data not exist")
else:
    print("Duplicate data exist")

Duplicate data not exist


#### Global Land Temperatures By City Data

**EDA**

The dataset is quite large (more than 8 million records) and we do not need all of it. We reduce it based on a few assumptions.

In [16]:
# count the total number of records
total_temperature = df_temperature.count()
print(total_temperature)

8599212


In [17]:
# Get Minimum Date and Maximum Date
min_date, max_date = df_temperature.select(min("dt"), max("dt")).first()
print("Minimum date : {}".format(min_date))
print("Maximum date : {}".format(max_date))

Minimum date : 1743-11-01 00:00:00
Maximum date : 2013-09-01 00:00:00


We focus on April 2016 for the immigration data, but the temperature data ends in 2013. For now we assume that the average temperature from 2010 through 2013 can represent the temperature in 2016, since the gap is only 3 to 6 years. Based on this assumption, we remove all data prior to 2010.

In [18]:
# Remove all dates prior to 2010
df_temperature = df_temperature.filter(df_temperature.dt >= '2010-01-01')

print("Total records from 2010 until 2013 : {}".format(df_temperature.count()))

Total records from 2010 until 2013 : 157950


Create a new table that contains the average temperature, latitude and longitude, grouped by country. Later, ```df_country``` can look up its values from this table.

In [19]:
# Aggregate by Country
df_temperature = df_temperature.groupby(["Country"]).agg(avg("AverageTemperature").alias("AverageTemperature"), \
                                        first("Latitude").alias("Latitude"), \
                                        first("Longitude").alias("Longitude") \
                                       )

# uppercase Country so it can be joined with the Country dimension table later
df_temperature = df_temperature.withColumn('Country',upper(col('Country'))) \
                 .withColumn('AverageTemperature',round('AverageTemperature',2))
df_temperature.limit(5)

Country,AverageTemperature,Latitude,Longitude
CHAD,27.99,8.84N,15.41E
PARAGUAY,23.42,24.92S,58.52W
RUSSIA,4.78,53.84N,91.36E
YEMEN,26.9,13.66N,45.41E
SENEGAL,26.89,15.27N,17.50W
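
Latitude and longitude are kept here as strings with a hemisphere suffix. If numeric coordinates were needed later, one possible conversion is sketched below in plain Python. It is not part of the pipeline, and `parse_coordinate` is a hypothetical helper that treats S and W as negative values.

```python
def parse_coordinate(text):
    """Convert a coordinate such as '24.92S' into a signed float (-24.92).

    Empty or missing input returns None.
    """
    if not text:
        return None
    value = float(text[:-1])
    hemisphere = text[-1].upper()
    if hemisphere not in ("N", "S", "E", "W"):
        raise ValueError("unexpected hemisphere suffix: {!r}".format(text))
    # South and West are represented as negative numbers
    return -value if hemisphere in ("S", "W") else value


print(parse_coordinate("8.84N"), parse_coordinate("58.52W"))  # 8.84 -58.52
```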


In [20]:
df_temperature.printSchema()

root
 |-- Country: string (nullable = true)
 |-- AverageTemperature: double (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)



Integrate ```df_country``` with ```df_temperature```

In [21]:
# Left Join df_country with df_temperature
df_country = df_country.join(df_temperature, df_country['CountryName']==df_temperature['Country'],how='left')

# drop these columns
cols = ['Country']
df_country = df_country.drop(*cols)

df_country.printSchema()
df_country.limit(5)

root
 |-- CountryCode: integer (nullable = true)
 |-- CountryName: string (nullable = true)
 |-- AverageTemperature: double (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)



CountryCode,CountryName,AverageTemperature,Latitude,Longitude
151,ARMENIA,10.31,40.99N,44.73E
512,BAHAMAS,25.44,24.92N,78.03W
739,INVALID: DRONNING...,,,
373,SOUTH AFRICA,16.95,26.52S,28.66E
914,No Country Code (...,,,
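
The empty temperature columns in the output come from the left join: every country row is kept, and rows without an exactly matching name receive nulls. The sketch below illustrates that behaviour in plain Python. `left_join` is a hypothetical helper, the two temperatures are taken from the output above, and the unmatched row is invented for illustration.

```python
def left_join(countries, temperature_by_name):
    """Keep every (code, name) pair; attach the temperature or None when unmatched."""
    return [
        (code, name, temperature_by_name.get(name))
        for code, name in countries
    ]


temperature_by_name = {"ARMENIA": 10.31, "SOUTH AFRICA": 16.95}
countries = [
    (151, "ARMENIA"),
    (373, "SOUTH AFRICA"),
    (0, "EXAMPLE UNMATCHED NAME"),  # hypothetical row with no temperature data
]

joined = left_join(countries, temperature_by_name)
print(joined[2])  # (0, 'EXAMPLE UNMATCHED NAME', None)
```

Because the match is on the exact string, country names that differ in spelling or carry extra text stay unmatched.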


We need to rename the columns of ```df_country``` so it will align with Redshift schema

In [22]:
# rename columns align with Redshift table schema
df_country = df_country.withColumnRenamed("CountryCode","country_code") \
                       .withColumnRenamed("CountryName","country_name") \
                       .withColumnRenamed("AverageTemperature","avg_temperature") \
                       .withColumnRenamed("Latitude","latitude") \
                       .withColumnRenamed("Longitude","longitude")
df_country.printSchema()

root
 |-- country_code: integer (nullable = true)
 |-- country_name: string (nullable = true)
 |-- avg_temperature: double (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)



**Missing Values**

We need to compute the percentage of missing values for every column. We assume that only columns with at most 30% missing values are acceptable. Here we can see that the aggregated temperature table has no missing values.

In [31]:
# count missing value for each column
total_temperature = df_temperature.count()
df_nan_temp = df_temperature.select([count(when(col(c).isNull(), c))\
            .alias(c) for c in df_temperature.columns]).toPandas()





In [32]:
# Calculate missing values percentage for all column
df_nan_temp_trans = df_nan_temp.transpose()
df_nan_temp_trans.reset_index(inplace=True)
df_nan_temp_trans.columns = ['columns','% Missing Values']
df_nan_temp_trans['% Missing Values'] = df_nan_temp_trans['% Missing Values'] * 100 / total_temperature

# Visualize missing values percentage as a bar chart
fig2 = px.bar(df_nan_temp_trans, x="columns", y="% Missing Values", color='columns')
fig2.show()

**Duplicate Data**

No duplicate data was found in the global temperature data.

In [14]:
# Check duplicate data
df_rm_duplicate = df_temperature.select("Country").distinct()
if (df_rm_duplicate.count() == df_temperature.count()):
    print("Duplicate data not exist")
else:
    print("Duplicate data exist")

Duplicate data not exist


#### U.S. City Demographic Data


**Missing Values**

We need to compute the percentage of missing values for every column. We assume that only columns with at most 30% missing values are acceptable.
Columns with more than 30% missing values are removed. Here the missing percentages are very small, so instead of dropping columns we drop the affected rows.

In [34]:
# count missing value for each column
total_demographic = df_demographic.count()
df_nan_dmgr = df_demographic.select([count(when(col(c).isNull(), c))\
            .alias(c) for c in df_demographic.columns]).toPandas()
print("Total demographic rows: {}".format(total_demographic))





Total demographic rows: 2891


In [35]:
# Calculate missing values percentage for all column
df_nan_dmgr_trans = df_nan_dmgr.transpose()
df_nan_dmgr_trans.reset_index(inplace=True)
df_nan_dmgr_trans.columns = ['columns','% Missing Values']
df_nan_dmgr_trans['% Missing Values'] = 100 * (df_nan_dmgr_trans['% Missing Values'] / total_demographic)

# Visualize missing values percentage as a bar chart
fig3 = px.bar(df_nan_dmgr_trans, x="columns", y="% Missing Values", color='columns', range_y=(0,100))
fig3.show()

In [36]:
# remove rows based on missing values
df_demographic = df_demographic.na.drop(subset=("Male Population","Female Population","Number of Veterans","Foreign-born","Average Household Size"))
print("Total records after drop rows : {}".format(df_demographic.count()))

Total records after drop rows : 2875


**Duplicated data**

If we sample some data below, we can see that for a given ```City``` and ```State``` most of the information is duplicated; only ```Race``` and ```Count``` differ. It is better to pivot the ```Race``` column so that each city is aggregated into one row.

In [37]:
df_demographic.filter("State = 'Maryland' and City='Waldorf'").limit(20)

City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
Waldorf,Maryland,33.6,35640,39872,75512,6932,5954,2.69,MD,Hispanic or Latino,4810
Waldorf,Maryland,33.6,35640,39872,75512,6932,5954,2.69,MD,Asian,4100
Waldorf,Maryland,33.6,35640,39872,75512,6932,5954,2.69,MD,White,26788
Waldorf,Maryland,33.6,35640,39872,75512,6932,5954,2.69,MD,Black or African-...,47334
Waldorf,Maryland,33.6,35640,39872,75512,6932,5954,2.69,MD,American Indian a...,1918


In [38]:
# pivot operation
groupcol = ('City', 'State', 'Median Age', 'Male Population', 'Female Population', 'Total Population', 'Number of Veterans', \
  'Foreign-born', 'Average Household Size', 'State Code')
aggrcol = sum('Count')

df_demographic = df_demographic.groupBy(*groupcol).pivot("Race").agg(aggrcol)
df_demographic.filter("State = 'Maryland' and City='Waldorf'").limit(20)

City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,American Indian and Alaska Native,Asian,Black or African-American,Hispanic or Latino,White
Waldorf,Maryland,33.6,35640,39872,75512,6932,5954,2.69,MD,1918,4100,47334,4810,26788


We want to integrate this data with ```df_state```, so we need to aggregate it by ```State``` and drop ```City```.

In [39]:
# group by State
df_demographic = df_demographic.groupBy("State Code","State").agg( \
    avg("Median Age").alias("avg_medianage"),
    sum("Male Population").alias("total_male"),
    sum("Female Population").alias("total_female"),
    sum("Total Population").alias("total_population"),
    sum("Number of Veterans").alias("total_veteran"),
    sum("Foreign-born").alias("total_foreignborn"),
    sum("American Indian and Alaska Native").alias("total_americannative"),
    sum("Asian").alias("total_asian"),
    sum("Black or African-American").alias("total_african"),
    sum("Hispanic or Latino").alias("total_hispanic"),
    sum("White").alias("total_white"),
    min("Average Household Size").alias("min_avghousesize"),
    max("Average Household Size").alias("max_avghousesize")                                                         
)

# upper case and rounding
df_demographic = df_demographic.withColumn('State',upper(col('State'))) \
                     .withColumn('avg_medianage',round('avg_medianage',2))

df_demographic.printSchema()
df_demographic.limit(5)

root
 |-- State Code: string (nullable = true)
 |-- State: string (nullable = true)
 |-- avg_medianage: double (nullable = true)
 |-- total_male: long (nullable = true)
 |-- total_female: long (nullable = true)
 |-- total_population: long (nullable = true)
 |-- total_veteran: long (nullable = true)
 |-- total_foreignborn: long (nullable = true)
 |-- total_americannative: long (nullable = true)
 |-- total_asian: long (nullable = true)
 |-- total_african: long (nullable = true)
 |-- total_hispanic: long (nullable = true)
 |-- total_white: long (nullable = true)
 |-- min_avghousesize: double (nullable = true)
 |-- max_avghousesize: double (nullable = true)



State Code,State,avg_medianage,total_male,total_female,total_population,total_veteran,total_foreignborn,total_americannative,total_asian,total_african,total_hispanic,total_white,min_avghousesize,max_avghousesize
MT,MONTANA,35.5,87707,93587,181294,13854,5977,9684,4165,3349,10000,169026,2.15,2.4
NC,NORTH CAROLINA,33.79,1466105,1594094,3060199,166146,379327,35209,178740,1029446,354409,1790136,2.18,2.72
MD,MARYLAND,36.37,627951,684178,1312129,64143,229794,16155,128839,573768,138644,594522,2.48,2.95
CO,COLORADO,35.82,1454619,1481050,2935669,187896,337631,62613,148790,208043,703722,2463916,2.24,2.97
CT,CONNECTICUT,34.96,432157,453424,885581,24953,225866,10729,48311,231822,309992,505674,2.48,2.86


Integrate with ```df_state```

In [40]:
# Left Join df_state with df_demographic
df_state = df_state.join(df_demographic,df_state['StateCode'] == df_demographic['State Code'],how='left')

# drop these columns
cols = ['State Code', 'State']
df_state = df_state.drop(*cols)

# rename column so it will align with Redshift schema
df_state = df_state.withColumnRenamed("StateCode","state_code") \
                   .withColumnRenamed("StateName","state_name")

df_state.printSchema()
df_state.limit(5)

root
 |-- state_code: string (nullable = true)
 |-- state_name: string (nullable = true)
 |-- avg_medianage: double (nullable = true)
 |-- total_male: long (nullable = true)
 |-- total_female: long (nullable = true)
 |-- total_population: long (nullable = true)
 |-- total_veteran: long (nullable = true)
 |-- total_foreignborn: long (nullable = true)
 |-- total_americannative: long (nullable = true)
 |-- total_asian: long (nullable = true)
 |-- total_african: long (nullable = true)
 |-- total_hispanic: long (nullable = true)
 |-- total_white: long (nullable = true)
 |-- min_avghousesize: double (nullable = true)
 |-- max_avghousesize: double (nullable = true)



state_code,state_name,avg_medianage,total_male,total_female,total_population,total_veteran,total_foreignborn,total_americannative,total_asian,total_african,total_hispanic,total_white,min_avghousesize,max_avghousesize
AL,ALABAMA,36.23,497248,552381,1049629,71543,52154,8084,28769,521068,39313,498920,2.18,2.67
AK,ALASKA,32.2,152945,145750,298695,27492,33258,36339,36825,23107,27261,212696,2.77,2.77
AZ,ARIZONA,35.04,2227455,2272087,4499542,264505,682313,129708,229183,296222,1508157,3591611,2.17,3.44
AR,ARKANSAS,32.77,286479,303400,589879,31704,62108,9381,22062,149608,77813,384733,2.28,3.04
CA,CALIFORNIA,36.18,12278281,12544179,24822460,928270,7448257,401386,4543730,2047009,9856464,14905129,2.0,4.78


#### Airport Data

**EDA**

As our fact table focuses on US immigration data, we should only include US airport data and exclude airports in other countries.

In [71]:
# include US airport data only
df_airport = df_airport.filter(df_airport.iso_country == 'US')
df_airport.count()

22757

When we check the airport types below, we assume that immigrants do not enter the USA through a balloonport, closed airport, heliport, seaplane_base, or small_airport.

In [72]:
df_airport.select("type").distinct().show()

+--------------+
|          type|
+--------------+
| large_airport|
|   balloonport|
| seaplane_base|
|      heliport|
|        closed|
|medium_airport|
| small_airport|
+--------------+



In [74]:
# Exclude balloonport, seaplane_base, heliport,closed and small airport
df_airport = df_airport.filter(col('type').isin(['large_airport','medium_airport']))
df_airport.limit(5)

ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
5A8,medium_airport,Aleknagik / New A...,66,,US,US-AK,Aleknagik,5A8,WKK,5A8,"-158.617996216, 5..."
K79J,medium_airport,South Alabama Reg...,310,,US,US-AL,Andalusia/Opp,K79J,,79J,"-86.393799, 31.3088"
KABE,medium_airport,Lehigh Valley Int...,393,,US,US-PA,Allentown,KABE,ABE,ABE,-75.4408035278320...
KABI,medium_airport,Abilene Regional ...,1791,,US,US-TX,Abilene,KABI,ABI,ABI,-99.6819000244000...
KABQ,large_airport,Albuquerque Inter...,5355,,US,US-NM,Albuquerque,KABQ,ABQ,ABQ,"-106.609001, 35.0..."


When we try to integrate this data with the fact table ```df_immigration```, we find that the airport codes (```ident```, ```gps_code```, ```iata_code```, and ```local_code```) do not always match the ```i94port``` column of ```df_immigration```. See the sample below, where LAX does not match LOS. In the end, we conclude that we won't use this data in our model.

In [81]:
# Airport: Los Angeles
df_airport.filter("name like '%Los Angeles%'").limit(5)

ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
KLAX,large_airport,Los Angeles Inter...,125,,US,US-CA,Los Angeles,KLAX,LAX,LAX,"-118.4079971, 33...."


In [83]:
# Port: Los Angeles
df_port.filter("PortName like '%LOS ANGELES%'").limit(5)

PortCode,PortName
LOS,"LOS ANGELES, CA"
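To make the mismatch concrete, here is a plain-Python sketch that compares the four airport code columns against the port codes. The Los Angeles values are copied from the two samples above. The helper names `matching_columns` and `match_rate` are hypothetical and not part of the project scripts.

```python
# Values copied from the Los Angeles samples above
airport_row = {"ident": "KLAX", "gps_code": "KLAX", "iata_code": "LAX", "local_code": "LAX"}
port_codes = {"LOS"}  # PortCode for "LOS ANGELES, CA"

def matching_columns(row, port_codes):
    """Return the airport code columns whose value appears among the port codes."""
    return [column for column, code in row.items() if code in port_codes]

def match_rate(rows, port_codes):
    """Fraction of airports with at least one code column matching a port code."""
    if not rows:
        return 0.0
    matched = sum(1 for row in rows if matching_columns(row, port_codes))
    return matched / len(rows)

print(matching_columns(airport_row, port_codes))
print(match_rate([airport_row], port_codes))
```

Running the same comparison over all filtered airports would give a rough measure of how much of the fact table could be joined on port code.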


### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model


![Star Schema](images/Star_Schema.png)

We use a star schema because it improves the performance of the queries that data analysts and data scientists will run. The immigration fact table is the center of the data model, and the dimension tables support it with text descriptions and other attributes. We need to make sure there is no many-to-many relationship between the fact table and any dimension table (it is always one-to-many). That is why I aggregate the demographic table by state: if we kept the city information in the demographic table, the relationship would become many-to-many. Such relationships cause several problems in a dimensional model: they lose the simplicity of the star schema structure, make queries more complex to write, and degrade query performance by adding more joins.
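The effect of the dimension grain can be illustrated with a small plain-Python join. This is a sketch under stated assumptions: the fact rows, the `join` helper, and the second city are hypothetical, while the Maryland and Waldorf population figures come from the outputs shown earlier.

```python
from collections import defaultdict

def join(fact_rows, dim_rows, key):
    """Inner-join fact rows to dimension rows on a shared key column."""
    index = defaultdict(list)
    for dim_row in dim_rows:
        index[dim_row[key]].append(dim_row)
    return [{**f, **d} for f in fact_rows for d in index.get(f[key], [])]

# Hypothetical fact rows: one row per arrival
fact = [{"arrival_id": 1, "state_code": "MD"}, {"arrival_id": 2, "state_code": "MD"}]

# State grain: exactly one row per state code
dim_state = [{"state_code": "MD", "total_population": 1312129}]

# City grain: several rows per state code ("City B" and its value are made up)
dim_city = [
    {"state_code": "MD", "city": "Waldorf", "total_population": 75512},
    {"state_code": "MD", "city": "City B", "total_population": 1000},
]

print(len(join(fact, dim_state, "state_code")))  # one output row per fact row
print(len(join(fact, dim_city, "state_code")))   # fact rows are multiplied
```

With the state-grain dimension every fact row appears once; with the city-grain dimension each fact row is repeated once per city, which would inflate any count or sum computed over the joined result.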

#### 3.2 Mapping Out Data Pipelines
*List the steps necessary to pipeline the data into the chosen data model*

To carry out all the transformations of the datasets, we developed ```cleansing_data.py``` (the script is located in ```/home/workspace/airflow/dags/scripts```). We also use Airflow to manage the flow of the whole process; the DAG script is ```etl_capstone.py```, which is located in ```/home/workspace/airflow/dags```.

Here is the summary of the process:

- All data sources (CSV files and SAS file) are stored in S3 using the custom Airflow operator ```LoadFileIntoS3Operator```
- Create a new EMR cluster using the Airflow operator ```EmrCreateJobFlowOperator```
- Move all raw data from S3 into EMR HDFS
- Run the PySpark script ```cleansing_data.py``` on this new EMR cluster:
  - Get Country information through function ```getCountry(input_file)```
  - Get Port information through function ```getPort(input_file)```
  - Get State information through function ```getState(input_file)```
  - Get Mode of Transportation through function ```getModeTrans()```
  - Get Visa Category through function ```getVisaCat()```
  - Get Temperature through function ```getTemperature(input_file)```
  - Get Demographic through function ```getDemographic(input_file)```
  - Get Immigration through function ```getImmigration(input_file)```
  - Get Calendar through function ```getCalendar(immigration_table)```
  - Join Country with Temperature; the result becomes the Country dimension table
  - Join State with Demographic; the result becomes the State dimension table
  - Store the fact table and all dimension tables as parquet files (saved into HDFS)
- Move all parquet files from EMR HDFS into S3 (clean data)
- Terminate the EMR cluster using the Airflow operator ```EmrTerminateJobFlowOperator```
- Move all parquet files from S3 into Redshift using the custom Airflow operator ```ParquetToRedshiftOperator``` (make sure that the schema has already been created in Redshift using the SQL statements in ```home/workspace/airflow/create_table_capstone.sql```)
- Check data quality in Redshift using the custom Airflow operator ```DataQualityOperator```
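The top-level steps above form a linear chain of dependencies. The sketch below writes that chain as a dependency graph using the standard-library `graphlib` module (Python 3.9+) and prints the resulting execution order. The task names are hypothetical labels for the listed steps, not the task ids used in ```etl_capstone.py```, and this is not Airflow code.

```python
from graphlib import TopologicalSorter

# Each task maps to the set of tasks that must finish before it starts.
# Task names are hypothetical labels for the steps listed above.
dependencies = {
    "create_emr_cluster": {"load_files_into_s3"},
    "copy_s3_to_hdfs": {"create_emr_cluster"},
    "run_cleansing_data": {"copy_s3_to_hdfs"},
    "copy_hdfs_to_s3": {"run_cleansing_data"},
    "terminate_emr_cluster": {"copy_hdfs_to_s3"},
    "load_parquet_into_redshift": {"terminate_emr_cluster"},
    "check_data_quality": {"load_parquet_into_redshift"},
}

order = list(TopologicalSorter(dependencies).static_order())
for step, task in enumerate(order, start=1):
    print(step, task)
```

In an Airflow DAG the same ordering would be declared between operator instances rather than between strings.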

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
*Build the data pipelines to create the data model.*

The datasets and the PySpark script are loaded into S3 and then copied to EMR. We use Spark on EMR to extract, transform, and store the provided datasets in the AWS S3 staging area. We then extract the data from S3 and load it into tables of the same name in Amazon Redshift. As a final step, we check data quality to ensure there are no null values in the primary keys.

![AWS Diagram](images/AWS_Diagram.png)

Below we show the pipeline we developed using Apache Airflow.

![Airflow Diagram](images/Airflow_Diagram.png)

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

The data quality check here makes sure that there are no null values in the primary key of any Redshift table.

![Data_Quality](images/Data_Quality.png)
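The null-key check can be sketched as one counting query per table. The example below is a minimal illustration that uses an in-memory SQLite database as a stand-in for Redshift so that it runs anywhere. The table names, column names, and helper functions are assumptions, and this is not the implementation of ```DataQualityOperator```.

```python
import sqlite3

def null_key_count(conn, table, key_column):
    """Count rows whose primary-key column is NULL."""
    query = f'SELECT COUNT(*) FROM "{table}" WHERE "{key_column}" IS NULL'
    return conn.execute(query).fetchone()[0]

def run_quality_checks(conn, checks):
    """Return the (table, key_column) pairs that contain NULL keys."""
    return [(t, k) for t, k in checks if null_key_count(conn, t, k) > 0]

# SQLite stands in for Redshift; tables and rows are hypothetical test data
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE dim_state (state_code TEXT, state_name TEXT)")
conn.executemany("INSERT INTO dim_state VALUES (?, ?)",
                 [("MD", "MARYLAND"), (None, "UNKNOWN")])
conn.execute("CREATE TABLE dim_port (port_code TEXT, port_name TEXT)")
conn.execute("INSERT INTO dim_port VALUES ('LOS', 'LOS ANGELES, CA')")

failures = run_quality_checks(conn, [("dim_state", "state_code"),
                                     ("dim_port", "port_code")])
print("Failed checks:", failures)
```

A pipeline task built on this idea would typically raise an error when the list of failures is not empty, so that the DAG run is marked as failed.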

#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

### Step 5: Complete Project Write Up


**Clearly state the rationale for the choice of tools and technologies for the project.**

We are using cloud computing technology because it provides a low-cost, scalable, and highly reliable infrastructure platform.

Why we use the following services:

**Airflow**: We use Airflow as our data pipeline orchestrator so that we can easily orchestrate and monitor data processes. Airflow provides many built-in operators that help us process data in S3, EMR, and Redshift.

**S3**: Provides an optimal foundation for a data lake because of its virtually unlimited scalability. We can seamlessly and nondisruptively increase storage from gigabytes to petabytes of content, paying only for what we use. Amazon S3 is designed to provide 99.999999999% durability.

**Spark**: As we need to process a lot of data (around 3 million records), we need to use this big data framework. Spark provides an interface for programming entire clusters with implicit data parallelism and fault tolerance.

**EMR**: Lets us get the most out of Spark on a flexible big data platform. We can set up an EMR cluster, use the engine, and terminate it programmatically through a Python script. It is very flexible and cheap.

**Redshift**: Redshift is a massively parallel, column-oriented data warehouse that is easy to scale.




**Propose how often the data should be updated and why.**

We should update the data daily (once per day), as the data sources usually change every day.

**Write a description of how you would approach the problem differently under the following scenarios:**

- The data was increased by 100x

There will be no issue if we use cloud technology. 

The total volume of data and the number of objects we can store in S3 are unlimited. Individual Amazon S3 objects can range in size from a minimum of 0 bytes to a maximum of 5 terabytes. We can enable EMR managed scaling or use automatic scaling with a custom policy to handle the increase in data processing. With Redshift, we can resize our clusters by adding more nodes or changing node types in just a few hours.

- The data populates a dashboard that must be updated on a daily basis by 7am every day

We can activate the Service Level Agreement (SLA) feature in Airflow. It sends emails when a task exceeds its expected time frame, measured from the start of the DAG execution.
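As a rough sketch of how the 7am deadline could translate into an SLA window, the fragment below builds a `default_args` dictionary of the kind passed to an Airflow DAG. It assumes an Airflow version that supports the `sla` task argument. The 04:00 schedule, the three-hour window, and the email address are hypothetical values chosen for illustration.

```python
from datetime import datetime, timedelta

# Hypothetical schedule: the DAG run is scheduled for 04:00
dag_start = datetime(2020, 1, 1, 4, 0)

# Arguments applied to every task in the DAG (all values are assumptions)
default_args = {
    "email": ["data-team@example.com"],  # placeholder recipient for SLA-miss emails
    "sla": timedelta(hours=3),           # tasks should finish within 3 hours of the start
}

# Latest acceptable finish time implied by the SLA window
deadline = dag_start + default_args["sla"]
print("Dashboard data must be ready by", deadline.time())
```

Choosing the start time and the SLA window so that their sum is 07:00 ties the alert directly to the dashboard requirement.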

- The database needed to be accessed by 100+ people

Amazon S3’s massive scale enables us to spread load evenly, so that no individual application is affected by traffic spikes.

In Redshift, elastic resize lets us quickly increase or decrease the number of compute nodes (doubling or halving the original cluster’s node count) or even change the node type. We can expand the cluster to provide additional processing power for an expected increase in workload, such as Black Friday for internet shopping or a championship game for a team’s web business.