### Data Engineering Capstone Project

#### Project Summary

The objective of this project was to create an ETL pipeline for I94 immigration, global land temperatures, US demographics and aiport datasets to form an analytics database on immigration events. This analytics database could be useful for usecases such as studying the effects of temperature on the volume of travellers, how seasonality affects travel, the differences between people travelling from different climates, etc. 

Following are the datasets being used to create the ETL pipeline:

[I94 Immigration Data](https://travel.trade.gov/research/reports/i94/historical/2016.html): This data comes from the US National Tourism and Trade Office and includes the contents of the i94 form on entry to the united states. A data dictionary is included in the workspace.

[World Temperature Data](https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data): This dataset comes from Kaggle and includes the temperatures of various cities in the world from 1743 to 2013.

[U.S. City Demographic Data](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export): This data comes from OpenSoft. It contains information about the demographics of all US cities and census-designated places with a population greater or equal to 65,000 and comes from the US Census Bureau's 2015 American Community Survey.

[Airport Codes](https://datahub.io/core/airport-codes#data): This is a simple table of airport codes and corresponding cities.

The project follows the follow steps:
* Scope the Project and Gather Data
* Explore and Assess the Data
* Define the Data Model
* Run ETL to Model the Data
* Complete Project Write Up

In [1]:
# Do all imports and installs here
import pandas as pd
import os
import re
from datetime import timedelta, datetime

from pyspark.python.pyspark.shell import spark
from pyspark.sql import SparkSession
from pyspark.sql.functions import count, col, udf, year, month, avg, round
from pyspark.sql.types import StringType, IntegerType
from pyspark.sql import functions as F
from pyspark.sql.functions import lit

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.4.3
      /_/

Using Python version 3.6.3 (default, Dec  9 2017 04:28:46)
SparkSession available as 'spark'.


In [2]:
# Create Spark session
spark = SparkSession.builder.\
config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11")\
.enableHiveSupport().getOrCreate()

### Scope the Project and Gather Data
The scope of this project is to create an ETL pipeline from four different data sources, create fact and dimensional tables, and be able to do different sorts of analysis on US immigration using factors of city, temperature, demographics. 

In [3]:
# Read airport data
airport_fname = "airport-codes_csv.csv"
airport_df = spark.read.format("csv").option("delimiter", ",").option("header", "true").load(airport_fname)

In [4]:
# Read immigration data
i94_fname = "immigration_data_sample.csv"
i94_df = spark.read.format("csv").option("delimiter", ",").option("header", "true").load(i94_fname)

In [5]:
# Read temperature data
temperature_fname = "GlobalLandTemperaturesByCity.csv"
temperature_df = spark.read.format("csv").option("delimiter", ",").option("header", "true").load(temperature_fname)

In [6]:
# Read demographics data
demographics_fname = "us-cities-demographics.csv"
demographics_df = spark.read.format("csv").option("delimiter", ";").option("header", "true").load(demographics_fname)

In [7]:
	
#from pyspark.sql import SparkSession
#spark = SparkSession.builder.\
#config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11")\
#.enableHiveSupport().getOrCreate()
#df_spark =spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')


In [8]:
#write to parquet
#df_spark.write.parquet("sas_data")
#df_spark=spark.read.parquet("sas_data")

### Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps


### Airports Data

In [9]:
# Explore aiport data
airport_df.count()

55075

In [10]:
airport_df.limit(5).toPandas()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237,,US,US-AR,Newport,,,,"-91.254898, 35.6087"


In [11]:
# Clean airport data
# Extract only US data
cleaned_airport_df = airport_df.withColumn("iso_country", F.when(F.col("iso_country")=='US','US').otherwise("Other"))
# Remove all other countries data
cleaned_airport_df = cleaned_airport_df.filter(cleaned_airport_df.iso_country != 'Other')

# Remove any missing values
cleaned_airport_df = cleaned_airport_df.dropna(how="any", subset=["name","type","iso_country"])

staging_airport_df = cleaned_airport_df.select(col("name").alias("airport_name"),
                                              col("type").alias("type"),
                                              col("iso_country").alias("country"),
                                              col("iso_region").alias("state"),
                                              col("gps_code").alias("gps_code"),
                                              col("local_code").alias("local_code"),
                                              col("coordinates").alias("coordinates")).drop_duplicates()

In [12]:
staging_airport_df.count()

22755

In [13]:
staging_airport_df.limit(5).toPandas()

Unnamed: 0,airport_name,type,country,state,gps_code,local_code,coordinates
0,Tampa General Hospital Brandon Healthplex Heli...,heliport,US,US-FL,04FD,04FD,"-82.336981, 27.929372"
1,Alum Ridge STOLport,small_airport,US,US-VA,09VA,09VA,"-80.493896484375, 36.97650146484375"
2,Walters Airport,small_airport,US,US-MD,0MD6,0MD6,"-77.10579681396484, 39.38119888305664"
3,Sooter Airport,small_airport,US,US-KS,12KS,12KS,"-97.48780059814453, 37.935298919677734"
4,Hock Airport,small_airport,US,US-NE,13NE,13NE,"-100.80899810791016, 40.31669998168945"


In [14]:
staging_airport_df.printSchema()

root
 |-- airport_name: string (nullable = true)
 |-- type: string (nullable = true)
 |-- country: string (nullable = false)
 |-- state: string (nullable = true)
 |-- gps_code: string (nullable = true)
 |-- local_code: string (nullable = true)
 |-- coordinates: string (nullable = true)



### Immigration Data

In [15]:
# Explore immigration data
i94_df.count()

1000

In [16]:
i94_df.limit(5).toPandas()

Unnamed: 0,_c0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,2027561,4084316.0,2016.0,4.0,209.0,209.0,HHW,20566.0,1.0,HI,...,,M,1955.0,7202016,F,,JL,56582674633.0,00782,WT
1,2171295,4422636.0,2016.0,4.0,582.0,582.0,MCA,20567.0,1.0,TX,...,,M,1990.0,10222016,M,,*GA,94361995930.0,XBLNG,B2
2,589494,1195600.0,2016.0,4.0,148.0,112.0,OGG,20551.0,1.0,FL,...,,M,1940.0,7052016,M,,LH,55780468433.0,00464,WT
3,2631158,5291768.0,2016.0,4.0,297.0,297.0,LOS,20572.0,1.0,CA,...,,M,1991.0,10272016,M,,QR,94789696030.0,00739,B2
4,3032257,985523.0,2016.0,4.0,111.0,111.0,CHM,20550.0,3.0,NY,...,,M,1997.0,7042016,F,,,42322572633.0,LAND,WT


In [17]:
# Create list of valid ports
i94_sas_label_descriptions_fname = "I94_SAS_Labels_Descriptions.SAS"
with open(i94_sas_label_descriptions_fname) as f:
    lines = f.readlines()

re_compiled = re.compile(r"\'(.*)\'.*\'(.*)\'")
valid_ports = {}
for line in lines[302:961]:
    results = re_compiled.search(line)
    valid_ports[results.group(1)] = results.group(2)
print(len(valid_ports))
#print(valid_ports)

659


In [18]:
# Create list of valid states
valid_states = demographics_df.toPandas()["State Code"].unique()
print(len(valid_states))
#print(valid_states)

49


In [19]:
# Create udf to convert SAS date to PySpark date 
# @udf(StringType())
# def convert_datetime(x):
#     print(x)
#     if x:
#         return (datetime.datetime(1960, 1, 1).date() + datetime.timedelta(x)).isoformat()
#     return None

def convert_to_integer(df, cols):
    """
    Convert the column to integer
    Args:
        df (:obj:`SparkDataFrame`): Spark dataframe to be processed.
        cols (:obj:`list`): List of column names that should be converted to integer
    Returns:
        df (:obj:`SparkDataFrame`): Processed Spark dataframe
    """

    for col in [col for col in cols if col in df.columns]:
        df = df.withColumn(col, df[col].cast("integer"))
    return df


def convert_to_date(df, cols):
    """
    Convert the columns to date
    Args:
        df (:obj:`SparkDataFrame`): Spark dataframe to be processed.
        cols (:obj:`list`): List of column names that should be converted to date
    Returns:
        df (:obj:`SparkDataFrame`): Processed Spark dataframe
    """

    for col in [col for col in cols if col in df.columns]:
        df = df.withColumn(col, CONVERT_SAS_UDF(df[col]))
    return df


# Converts SAS dates into string dates in the format YYYY-MM-DD
CONVERT_SAS_UDF = udf(lambda x: x if x is None else (
    timedelta(days=x) + datetime(1960, 1, 1)).strftime("%Y-%m-%d"))

In [20]:
# Create udf to validate state
@udf(StringType())
def validate_state(x):  
    if x in valid_states:
        return x
    return 'other'


In [21]:
i94_df.count()

1000

In [22]:
# Clean immigration data
# Remove any missing values for following columns
cleaned_i94_df = i94_df.dropna(how="any", subset=["i94port", "i94addr", "gender"])

In [23]:
cleaned_i94_df.limit(5).toPandas()

Unnamed: 0,_c0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,2027561,4084316.0,2016.0,4.0,209.0,209.0,HHW,20566.0,1.0,HI,...,,M,1955.0,7202016,F,,JL,56582674633.0,00782,WT
1,2171295,4422636.0,2016.0,4.0,582.0,582.0,MCA,20567.0,1.0,TX,...,,M,1990.0,10222016,M,,*GA,94361995930.0,XBLNG,B2
2,589494,1195600.0,2016.0,4.0,148.0,112.0,OGG,20551.0,1.0,FL,...,,M,1940.0,7052016,M,,LH,55780468433.0,00464,WT
3,2631158,5291768.0,2016.0,4.0,297.0,297.0,LOS,20572.0,1.0,CA,...,,M,1991.0,10272016,M,,QR,94789696030.0,00739,B2
4,3032257,985523.0,2016.0,4.0,111.0,111.0,CHM,20550.0,3.0,NY,...,,M,1997.0,7042016,F,,,42322572633.0,LAND,WT


In [24]:
cleaned_i94_df.count()

814

In [25]:
# Extract valid states
cleaned_i94_df = cleaned_i94_df.withColumn("i94addr", validate_state(cleaned_i94_df.i94addr))

# only keep US related immigration data
cleaned_i94_df = cleaned_i94_df.filter(cleaned_i94_df.i94addr != 'other')

# Convert arrival_date (SAS format) to PySpark format
# cleaned_i94_df = cleaned_i94_df.withColumn("arrdate", convert_datetime(cleaned_i94_df.arrdate).cast("string"))
date_cols = ['arrdate']
int_cols = ['arrdate']
cleaned_i94_df = convert_to_integer(cleaned_i94_df, int_cols)
cleaned_i94_df = convert_to_date(cleaned_i94_df, date_cols)

In [26]:
# cleaned_i94_df.limit(5).toPandas()
staging_i94_df = cleaned_i94_df.select(col("cicid").alias("id"),
                                       col("arrdate").alias("date"),
                                       col("i94port").alias("city_code"),
                                       col("i94addr").alias("state_code"),
                                       col("i94bir").alias("age"),
                                       col("gender").alias("gender"),
                                       col("visatype").alias("visa_type"),
                                       "count").drop_duplicates()
staging_i94_df.limit(2).toPandas()

Unnamed: 0,id,date,city_code,state_code,age,gender,visa_type,count
0,5418580.0,2016-04-29,SFR,CA,44.0,F,B2,1.0
1,3814096.0,2016-04-20,MIA,FL,29.0,M,B2,1.0


In [27]:
staging_i94_df.count()

779

In [28]:
staging_i94_df.limit(5).toPandas()

Unnamed: 0,id,date,city_code,state_code,age,gender,visa_type,count
0,5418580.0,2016-04-29,SFR,CA,44.0,F,B2,1.0
1,3814096.0,2016-04-20,MIA,FL,29.0,M,B2,1.0
2,3554693.0,2016-04-19,BLA,WA,24.0,F,B2,1.0
3,4700630.0,2016-04-25,NYC,NY,29.0,F,WT,1.0
4,5135218.0,2016-04-27,MIA,CA,50.0,F,B2,1.0


In [29]:
staging_i94_df.printSchema()

root
 |-- id: string (nullable = true)
 |-- date: string (nullable = true)
 |-- city_code: string (nullable = true)
 |-- state_code: string (nullable = true)
 |-- age: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- visa_type: string (nullable = true)
 |-- count: string (nullable = true)



### Temperature Data

In [30]:
# Explore temperature data
temperature_df.count()

8599212

In [31]:
temperature_df.limit(5).toPandas()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E


In [32]:
# Create udf to map city full name to city port abbreviation
@udf(StringType())
def city_to_port(city):
    for key in valid_ports:
        if city.lower() in valid_ports[key].lower():
            return key

In [33]:
# Clean temperature data
# Using temperatures from United States
# Map full name to city port abbreviation
# Remove invalid ports
cleaned_temperature_df = temperature_df.filter(temperature_df["Country"] == "United States") \
    .withColumn("year", year(temperature_df['dt'])) \
    .withColumn("month", month(temperature_df["dt"])) \
    .withColumn("i94port", city_to_port(temperature_df["City"])) \
    .withColumn("AverageTemperature", col("AverageTemperature").cast("float")) \
    .dropna(how='any', subset=["i94port"])

# Only use temperatures from 2013 (as it is the last year in the dataset)
cleaned_temperature_df = cleaned_temperature_df.filter(cleaned_temperature_df["year"] == 2013)

staging_temperature_df = cleaned_temperature_df.select(col("year"), col("month"), col("i94port").alias("city_code"),
                                         round(col("AverageTemperature"), 1).alias("avg_temperature"),
                                         col("Latitude").alias("lat"), col("Longitude").alias("long")).drop_duplicates()

In [34]:
staging_temperature_df.count()

1044

In [35]:
staging_temperature_df.limit(5).toPandas()

Unnamed: 0,year,month,city_code,avg_temperature,lat,long
0,2013,4,COL,16.9,32.95N,85.21W
1,2013,1,DAB,0.5,39.38N,83.24W
2,2013,1,ONT,6.8,34.56N,116.76W
3,2013,2,POM,5.8,45.81N,123.46W
4,2013,5,PRO,14.3,42.59N,72.00W


In [36]:
staging_temperature_df.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- city_code: string (nullable = true)
 |-- avg_temperature: float (nullable = true)
 |-- lat: string (nullable = true)
 |-- long: string (nullable = true)



### Demographics Data

In [37]:
# Explore temperature data
demographics_df.count()

2891

In [38]:
demographics_df.limit(5).toPandas()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601,41862,82463,1562,30908,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129,49500,93629,4147,32935,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040,46799,84839,4819,8229,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127,87105,175232,5821,33878,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040,143873,281913,5829,86253,2.73,NJ,White,76402


In [39]:
# Clean demographics data
# Calculate percentages of numeric columns and create new ones
cleaned_demographics_df = demographics_df.withColumn("median_age", demographics_df['Median Age']) \
    .withColumn("pct_male_pop", (demographics_df['Male Population'] / demographics_df['Total Population']) * 100) \
    .withColumn("pct_female_pop", (demographics_df['Female Population'] / demographics_df['Total Population']) * 100) \
    .withColumn("pct_veterans", (demographics_df['Number of Veterans'] / demographics_df['Total Population']) * 100) \
    .withColumn("pct_foreign_born", (demographics_df['Foreign-born'] / demographics_df['Total Population']) * 100) \
    .withColumn("pct_race", (demographics_df['Count'] / demographics_df['Total Population']) * 100) \
    .withColumn("city_code", city_to_port(demographics_df["City"])) \
    .dropna(how='any', subset=["city_code"])

cleaned_demographics_df = cleaned_demographics_df.select(col("City").alias("city_name"), col("State Code").alias("state_code"), 
                         "median_age", "pct_male_pop", "pct_female_pop","pct_veterans", 
                         "pct_foreign_born", col("Total Population").alias("total_pop"), 
                         col("Race").alias("race"), "pct_race").drop_duplicates()

In [40]:
cleaned_demographics_df.count()

883

In [41]:
cleaned_demographics_df.limit(5).toPandas()

Unnamed: 0,city_name,state_code,median_age,pct_male_pop,pct_female_pop,pct_veterans,pct_foreign_born,total_pop,race,pct_race
0,Reno,NV,35.7,50.650464,49.349536,7.077861,15.35352,241443,American Indian and Alaska Native,1.603691
1,Springfield,IL,38.8,47.22814,52.77186,6.387458,3.619418,117809,Hispanic or Latino,2.324101
2,Columbia,MD,37.9,50.452801,49.547199,6.307325,22.469966,103467,White,56.388027
3,Newark,NJ,34.6,48.965461,51.034539,2.067659,30.595609,281913,Asian,2.606833
4,Tampa,FL,35.3,47.561974,52.438026,5.591988,15.932395,369028,White,69.791994


In [42]:
# Pivot the race column
pivot_demographics_df = cleaned_demographics_df.groupBy("city_name", "state_code", "median_age", "pct_male_pop",
                                        "pct_female_pop","pct_veterans", "pct_foreign_born", "total_pop").pivot("Race").avg("pct_race")

pivot_demographics_df = pivot_demographics_df.withColumn("city_code", city_to_port(pivot_demographics_df["city_name"])) \
    .dropna(how='any', subset=["city_code"])

staging_demographics_df = pivot_demographics_df.select("city_code", "state_code", "city_name", "median_age",
                                    round(col("pct_male_pop"), 1).alias("pct_male_pop"),
                                    round(col("pct_female_pop"), 1).alias("pct_female_pop"),
                                    round(col("pct_veterans"), 1).alias("pct_veterans"),
                                    round(col("pct_veterans"), 1).alias("pct_foreign_born"),
                                    round(col("American Indian and Alaska Native"), 1).alias("pct_native_american"),
                                    round(col("Asian"), 1).alias("pct_asian"),
                                    round(col("Black or African-American"), 1).alias("pct_black"),
                                    round(col("Hispanic or Latino"), 1).alias("pct_hispanic_or_latino"),
                                    round(col("White"), 1).alias("pct_white"), "total_pop")

In [43]:
staging_demographics_df.count()

180

In [44]:
staging_demographics_df.limit(5).toPandas()

Unnamed: 0,city_code,state_code,city_name,median_age,pct_male_pop,pct_female_pop,pct_veterans,pct_foreign_born,pct_native_american,pct_asian,pct_black,pct_hispanic_or_latino,pct_white,total_pop
0,TUC,AZ,Tucson,33.6,49.8,50.2,7.2,7.2,4.6,4.6,6.4,43.5,76.1,531674
1,MCA,TX,Allen,37.2,52.3,47.7,3.6,3.6,0.2,16.1,13.4,10.8,71.2,98138
2,CRP,TX,Corpus Christi,35.0,49.5,50.5,7.7,7.7,0.9,2.8,4.6,61.9,90.3,324082
3,FMY,FL,Fort Myers,37.3,49.8,50.2,5.8,5.8,,4.8,23.4,24.1,67.8,74015
4,ORL,FL,Orlando,33.1,48.3,51.7,4.7,4.7,0.9,4.1,25.1,33.0,66.1,270917


In [45]:
staging_demographics_df.printSchema()

root
 |-- city_code: string (nullable = true)
 |-- state_code: string (nullable = true)
 |-- city_name: string (nullable = true)
 |-- median_age: string (nullable = true)
 |-- pct_male_pop: double (nullable = true)
 |-- pct_female_pop: double (nullable = true)
 |-- pct_veterans: double (nullable = true)
 |-- pct_foreign_born: double (nullable = true)
 |-- pct_native_american: double (nullable = true)
 |-- pct_asian: double (nullable = true)
 |-- pct_black: double (nullable = true)
 |-- pct_hispanic_or_latino: double (nullable = true)
 |-- pct_white: double (nullable = true)
 |-- total_pop: string (nullable = true)



###  Define the Data Model
####  Conceptual Data Model

The data model to be used for this ETL is the star schema. It is simple, effective, will process quickly, and the joins will be simpler. 

Following are the staging and final tables used in the schema.

#### Staging Tables:
| table name   |  columns             |
| ------------ | ----------------    | 
| airports     | airport_name, type, country, state, gps_code, local_code, coordinates |
| immigration  | id, date, city_code, state_code, age, gender, visa_type, count |
| temperature  | year, month, city_code, avg_temperature, lat, long | 
| demographics | city_code, state_code, city_name, median_age, pcs_male_pop, pct_female_pop, pct_veterans, pct_foreign_born, pct_native_american, pct_asian, pct_black, pct_hispanic_or_latino, pct_white, total_pop |

#### Final Tables:
| table name | columns | type |
|-------------|----------|-----------|
| immigration_df | id, state_code, city_code, date, count | fact|
| immigrant_df | id, gender, age, visa_type | dimensional |
| city_stat_df | city_code, state_code, city_name, median_age, pct_male_pop, pct_female_pop, pct_veterans, pct_foreign_born, pct_native_american, pct_asian, pct_black, pct_hispanic_or_latino, pct_white, total_pop, lat, long | dimensional |
| monthly_city_temp | city_code, year, month, avg_temperature | dimensional | 


#### Mapping Out Data Pipelines
The steps necessary to pipeline the data into star schema model:
- Clean the data of nulls, duplicates, fix data_types
- Load the staging tables
- Create fact and dimensional tables
- Save the processed data into parquet files for further uses

###  Run Pipelines to Model the Data 
#### Create the data model
Build the data pipelines to create the data model.

In [46]:
# Create fact table for immigration
immigration_df = staging_i94_df.select("id", "state_code", "city_code", "date", "count").drop_duplicates()
immigration_df.limit(5).toPandas()

Unnamed: 0,id,state_code,city_code,date,count
0,264030.0,FL,FMY,2016-04-02,1.0
1,1364080.0,CA,TOR,2016-04-08,1.0
2,834035.0,PA,NEW,2016-04-05,1.0
3,2573307.0,PR,CLM,2016-04-14,1.0
4,5835717.0,FL,DET,2016-04-30,1.0


In [47]:
immigration_df.printSchema()

root
 |-- id: string (nullable = true)
 |-- state_code: string (nullable = true)
 |-- city_code: string (nullable = true)
 |-- date: string (nullable = true)
 |-- count: string (nullable = true)



In [48]:
# Create dimension table for immigrant
immigrant_df = staging_i94_df.select("id", "gender", "age", "visa_type").drop_duplicates()
immigrant_df.limit(5).toPandas()

Unnamed: 0,id,gender,age,visa_type
0,969559.0,M,71.0,B2
1,647849.0,M,32.0,WT
2,4877171.0,M,11.0,WT
3,3162474.0,F,61.0,B2
4,2557045.0,F,73.0,B2


In [49]:
immigrant_df.printSchema()

root
 |-- id: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- age: string (nullable = true)
 |-- visa_type: string (nullable = true)



In [50]:
# Create dimension table for city
city_stat_df = staging_demographics_df.join(staging_temperature_df, "city_code") \
    .select("city_code", "state_code", "city_name", "median_age", "pct_male_pop", "pct_female_pop", "pct_veterans",
           "pct_foreign_born", "pct_native_american", "pct_asian", "pct_black",
           "pct_hispanic_or_latino", "pct_white", "total_pop", "lat", "long").drop_duplicates()
city_stat_df.limit(5).toPandas()

Unnamed: 0,city_code,state_code,city_name,median_age,pct_male_pop,pct_female_pop,pct_veterans,pct_foreign_born,pct_native_american,pct_asian,pct_black,pct_hispanic_or_latino,pct_white,total_pop,lat,long
0,BRO,TX,Brownsville,30.6,47.7,52.3,2.3,2.3,0.6,0.9,0.7,92.5,95.0,183888,26.52N,96.72W
1,HSV,WI,Madison,30.7,49.2,50.8,3.9,3.9,0.9,9.6,8.2,7.9,82.1,248956,34.56N,85.62W
2,ATL,GA,Atlanta,33.8,48.3,51.7,4.0,4.0,1.0,5.2,52.9,4.0,42.3,463875,34.56N,83.68W
3,NEW,NJ,Newark,34.6,49.0,51.0,2.1,2.1,0.8,2.6,51.4,35.6,27.1,281913,40.99N,74.56W
4,NWH,CT,New Haven,29.9,48.9,51.1,2.0,2.0,1.7,6.1,33.3,33.4,43.2,130310,40.99N,72.43W


In [51]:
immigration_df.count()

779

In [54]:
# Create dimension table for monthly city temperature
monthly_city_temperature_df = staging_temperature_df.select("city_code", "year", "month", "avg_temperature").drop_duplicates()
monthly_city_temperature_df.limit(5).toPandas()

Unnamed: 0,city_code,year,month,avg_temperature
0,SAA,2013,6,18.6
1,PHI,2013,5,16.6
2,BOS,2013,5,14.3
3,BUR,2013,3,14.5
4,RNO,2013,2,4.7


In [55]:
monthly_city_temperature_df.printSchema()

root
 |-- city_code: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- avg_temperature: float (nullable = true)



#### Save tables to parquet files

In [56]:
# Write fact table to parquet
immigration_df.write.mode("overwrite").partitionBy("state_code", "city_code").parquet("immigration")

# Write dimension tables to parquet
immigrant_df.write.mode("overwrite").partitionBy("gender", "age").parquet("immigrants")
city_stat_df.write.mode("overwrite").partitionBy("state_code").parquet("cities")
monthly_city_temperature_df.write.mode("overwrite").parquet("monthly_city_temperatues")

#### Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

In [57]:
# Perform quality checks here
# Check is the tables exist
def table_exists(df):
    if df is not None:
        return True
    else:
        return False
        
if  table_exists(immigration_df) & table_exists(immigrant_df) & table_exists(city_stat_df) & table_exists(monthly_city_temperature_df):
    print("data quality check passed")
    print("fact and dimensional tables exist")
    print()
else:
    print("data quality check failed")
    print("tables are missing")

data quality check passed
fact and dimensional tables exist



In [58]:
# make sure the tables are not empty
def table_not_empty(df):
    return df.count() != 0 

if table_not_empty(immigration_df) & table_not_empty(immigrant_df) & table_not_empty(city_stat_df) & table_not_empty(monthly_city_temperature_df):
    print("data quality check passed")
    print("dimension tables and fact table contain records")
    print()
else:
    print("data quality check failed")
    print("records do not exist")

data quality check passed
dimension tables and fact table contain records



### Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

Brief description of data files with column names and their description, alng with an ER diagram

![Database schema](CapstoneErDiagram.png)
##### Fact Table

    immigration_df
        id: id
        state_code: state code of arrival city
        city_code: city port code of arrival city
        date: date of arrival
        count: count of immigrant's entries into the US

##### Dimensional Tables
   
    immigrant_df
        id: id of immigrant
        gender: gender of immigrant
        age: age of immigrant
        visa_type: immigrant's visa type

    city_stat_df
        city_code: city port code
        state_code: state code of the city
        city_name: name of the city
        median_age: median age of the city
        pct_male_pop: percentage of male population
        pct_female_pop: percentage of population in percentage
        pct_veterans: percentage of veteran population in percentage
        pct_foreign_born: percentage of foreign born population
        pct_native_american: percentage of native american population
        pct_asian: percentage of asian population
        pct_black: percentage of black population
        pct_hispanic_or_latino: percentage of hispanic population
        pct_white: percentage of white population
        total_pop: city's total population
        lat: latitude of the city
        long: longitude of the city

    monthly_city_temp_df
        city_code: city port code
        year: 
        month: month 
        avg_temperature: average temperature of city for given month

#### Examples of queries a user will be able to perform from the given schema

In [59]:
# finding the top 10 busiest destination airport
city_stat_df.groupBy("city_code").count().sort("count", ascending=False).limit(10).show()

+---------+-----+
|city_code|count|
+---------+-----+
|      SPI|    9|
|      JAC|    8|
|      RCM|    4|
|      COL|    4|
|      PIA|    4|
|      HSV|    4|
|      CAE|    3|
|      RST|    2|
|      FAY|    2|
|      KAN|    2|
+---------+-----+



In [60]:
# the most common visit types of travellers
immigrant_df.groupBy("visa_type").count().sort("count", ascending=False).show()

+---------+-----+
|visa_type|count|
+---------+-----+
|       WT|  344|
|       B2|  309|
|       WB|   55|
|       B1|   55|
|       F1|    9|
|       F2|    3|
|       E2|    3|
|       M1|    1|
+---------+-----+



###  Project Write Up

* The rationale for choosing Apache Spark:
     - ability to handle multiple file formats with large datasets
     - unified analytics engine for big data
     - easy processing of large input files into dataframes
     - easy manipulation of data 


* The decision for how often the data should be updated must be based on factors such as how often new data will be available, and what kind of analytics will be performed. For these datasets, a monthly update cycle might be optimal.

* If the data was increased by 100x volume, the data would be stored in something like Amazon S3 bucket and loaded onto the staging tables. Spark would still be used as the data processing framework as it is able to handle very large datadets. 

* If the data populates a dashboard that must be updated on a daily basis by 7am every day, a workflow management platform would have to be used such as Apache Airflow, where data pipelines could be built, scheduled and automated. Data quality checks can also be scheduled. 

* If the database needed to be accessed by 100+ people, the analytics database would have to be moved to a proudction scale warehouse on the cloud such as Amazon Redshift that would allow efficient access of the database by many people. It also has auto-scaling capabilities and great read performance.