# Project Title
### Data Engineering Capstone Project

#### Project Summary

The goal of this project is to create a data mart for the large US travel (I94) dataset that serves as a "golden record" for reporting and analytics.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [19]:
# Do all imports and installs here
import configparser
import os
from datetime import date, datetime, timedelta

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.sql.types import StringType, IntegerType
from pyspark.sql.functions import udf, col, split, monotonically_increasing_id
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
from pyspark.sql.functions import sum as Fsum
# The wildcard import exposes Spark's sum(), avg(), etc. unqualified;
# note that it shadows Python built-ins such as sum()
from pyspark.sql.functions import *

In [3]:
# Configuring the AWS Credentials
config = configparser.ConfigParser()
config.read('dw.cfg')
os.environ['AWS_ACCESS_KEY_ID']=config['CREDENTIALS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['CREDENTIALS']['AWS_SECRET_ACCESS_KEY']

### Step 1: Scope the Project and Gather Data

#### Scope 
I am going to load the data into the workspace, examine it, clean the datasets, join them where necessary, apply transformations, design a star schema for the data mart, and load the data into it.
I used the US Airport Codes, US Travel, and US Cities Demographics datasets. The end solution is a data mart built with Apache Spark and stored on Amazon S3.

#### Describe and Gather Data 
I used three datasets:

* US Travel Data (I94): This data comes from the US National Tourism and Trade Office. A data dictionary is included in the workspace, along with a sample file in CSV format for a quick look before reading in the full data. This project uses the April 2016 extract (i94_apr16_sub.sas7bdat).

* US Cities Demographics Data: This dataset contains demographic information, such as median age, male and female population, number of veterans, foreign-born population, and average household size, for major US cities and their states. This data comes from OpenSoft.

* US Airport Codes Data: This is a simple table of airport codes and corresponding cities.


In [4]:
# https://spark-packages.org/package/saurfang/spark-sas7bdat
# Initializing a Spark Session
spark = SparkSession.builder\
            .config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11,org.apache.hadoop:hadoop-aws:2.7.3")\
            .enableHiveSupport()\
            .getOrCreate()

In [5]:
# Loading the US Travel data that is in sas7bdat format
i94_df = spark.read.format('com.github.saurfang.sas.spark')\
                .option('header','True')\
                .load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')

In [6]:
# Loading the cities demographics data
cities_df = spark.read.format('csv')\
                    .option('header','True')\
                    .load('us-cities-demographics.csv')
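The cities file appears to be semicolon-delimited: Spark's CSV reader splits on commas by default, so each row lands in a single column whose name is the entire header line (the "blob" handled in Step 2). Spark's CSV reader accepts a `sep` option (for example `.option('sep', ';')`) that would split the fields at load time instead. The standard-library sketch below shows the same effect on a hypothetical one-row excerpt; the helper `read_semicolon_csv` and all values are illustrative assumptions.

```python
import csv
import io

# Hypothetical excerpt with the same header as us-cities-demographics.csv (values are illustrative only)
SAMPLE = (
    "City;State;Median Age;Male Population;Female Population;Total Population;"
    "Number of Veterans;Foreign-born;Average Household Size;State Code;Race;Count\n"
    "Sampleville;Sample State;35.5;1000;1100;2100;150;300;2.5;SS;Asian;400\n"
)

# With the default comma delimiter the whole header stays in one field (the "blob")
default_fields = next(csv.reader(io.StringIO(SAMPLE)))

def read_semicolon_csv(text):
    """Parse semicolon-delimited text into a list of dicts keyed by the header row."""
    return list(csv.DictReader(io.StringIO(text), delimiter=";"))

rows = read_semicolon_csv(SAMPLE)
print(len(default_fields))  # 1
print(rows[0]["City"], rows[0]["State Code"], rows[0]["Total Population"])  # Sampleville SS 2100
```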

In [7]:
# Loading the airport codes data
airport_df = spark.read.format('csv')\
                    .option('header','True')\
                    .load('airport-codes_csv.csv')

### Step 2: Explore and Assess the Data

#### Explore the Data

* Missing values
* Unclean data: the cities demographics dataset loads as a single column blob that needs to be split into separate columns
* Dates stored in SAS date format (days since 1960-01-01)

#### Cleaning Steps

* Created a Spark UDF to convert the arrival date from SAS format
* Removed rows with missing values in key columns and dropped duplicate rows
* Transformed the single column blob into multi-column data that is reusable for reporting and analytics
* Created a new state-level dataset from the US Cities Demographics data that fits the data mart better

In [8]:
i94_df.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

In [19]:
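# Missing-values estimation
# Register the raw I94 DataFrame as a view so Spark SQL can query it by name
i94_df.createOrReplaceTempView("i94_df")
columnlist = i94_df.columns

for i in columnlist:
    # Only null rows pass the WHERE filter, so a column with no nulls returns None (zero missing values)
    a = spark.sql("select sum(case when `{0}` is null then 1 else 0 end) as {0}_missing from i94_df \
                where `{0}` is null".format(i)).collect()
    print(i, a)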

cicid [Row(cicid_missing=None)]
i94yr [Row(i94yr_missing=None)]
i94mon [Row(i94mon_missing=None)]
i94cit [Row(i94cit_missing=None)]
i94res [Row(i94res_missing=None)]
i94port [Row(i94port_missing=None)]
i94addr [Row(i94addr_missing=152592)]
depdate [Row(depdate_missing=142457)]
i94bir [Row(i94bir_missing=802)]
i94visa [Row(i94visa_missing=None)]
count [Row(count_missing=None)]
dtadfile [Row(dtadfile_missing=1)]
visapost [Row(visapost_missing=1881250)]
occup [Row(occup_missing=3088187)]
entdepa [Row(entdepa_missing=238)]
entdepd [Row(entdepd_missing=138429)]
entdepu [Row(entdepu_missing=3095921)]
matflag [Row(matflag_missing=138429)]
biryear [Row(biryear_missing=802)]
dtaddto [Row(dtaddto_missing=477)]
gender [Row(gender_missing=414269)]
insnum [Row(insnum_missing=2982605)]
airline [Row(airline_missing=83627)]
admnum [Row(admnum_missing=None)]
fltno [Row(fltno_missing=19549)]
visatype [Row(visatype_missing=None)]
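The loop above runs one Spark query per column, and its WHERE filter turns "no missing values" into None. A single pass can count nulls for every column at once; in PySpark that could be one `select` containing `F.sum(F.col(c).isNull().cast("int"))` for each column `c`, which reports 0 for complete columns. The plain-Python sketch below shows the single-pass idea with a hypothetical helper `count_missing`, using three records taken from the sample rows shown in this notebook.

```python
def count_missing(rows, columns):
    """Count None values per column in a single pass over the rows."""
    missing = {c: 0 for c in columns}
    for row in rows:
        for c in columns:
            if row.get(c) is None:
                missing[c] += 1
    return missing

# Three records shaped like a few I94 columns (values from the sample rows above)
records = [
    {"cicid": 6.0, "i94addr": None, "depdate": None},
    {"cicid": 7.0, "i94addr": "AL", "depdate": None},
    {"cicid": 15.0, "i94addr": "MI", "depdate": 20691.0},
]
print(count_missing(records, ["cicid", "i94addr", "depdate"]))
# {'cicid': 0, 'i94addr': 1, 'depdate': 2}
```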


In [27]:
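# Data cleaning: drop rows that are missing values in key columns
# StateCode is derived from iso_region (e.g. "US-MN" -> "MN") here so it exists before dropna uses it
airport_df = airport_df.withColumn("StateCode", split(col("iso_region"), "-").getItem(1))
i94_df_clean = i94_df.dropna(how="any", subset=["i94addr", "depdate"])
airport_df_clean = airport_df.dropna(how="any", subset=["iso_region", "local_code", "StateCode"])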
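`dropna(how="any", subset=[...])` keeps a row only if none of the listed columns is missing; columns outside the subset may still be null. The plain-Python sketch below mirrors that rule with a hypothetical helper `drop_rows_with_nulls` on records from the sample rows above. It only checks for None, whereas Spark's implementation also drops rows with NaN in the listed numeric columns.

```python
def drop_rows_with_nulls(rows, subset):
    """Keep rows where every column in `subset` is non-null (mirrors how="any")."""
    return [row for row in rows if all(row.get(c) is not None for c in subset)]

# Records shaped like a few I94 columns (values from the sample rows above)
records = [
    {"cicid": 6.0, "i94addr": None, "depdate": None},
    {"cicid": 7.0, "i94addr": "AL", "depdate": None},
    {"cicid": 15.0, "i94addr": "MI", "depdate": 20691.0},
]
kept = drop_rows_with_nulls(records, ["i94addr", "depdate"])
print([r["cicid"] for r in kept])  # [15.0]
```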

In [25]:
# Examining the data 
#airport_df.groupBy("continent").agg(countDistinct("name")).toPandas()
airport_df.filter(airport_df.local_code == "STP").toPandas()

| | ident | type | name | elevation_ft | continent | iso_country | iso_region | municipality | gps_code | iata_code | local_code | coordinates | StateCode |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | KSTP | medium_airport | St Paul Downtown Holman Field | 705 | | US | US-MN | St Paul | KSTP | STP | STP | -93.05999755859375, 44.93450164794922 | MN |


In [9]:
i94_df.show(3)

+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+-------------+-----+--------+
|cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|       admnum|fltno|visatype|
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+-------------+-----+--------+
|  6.0|2016.0|   4.0| 692.0| 692.0|    XXX|20573.0|   null|   null|   null|  37.0|    2.0|  1.0|    null|    null| null|      T|   null|      U|   null| 1979.0|10282016|  null|  null|   null|1.897628485E9| null|      B2|
|  7.0|2016.0|   4.0| 254.0| 276.0|    ATL|20551.0|    1.0|     AL|   null|  25.0|    3.0|  1.0|20130811|     SEO| n

In [26]:
# Transforming a single column blob into a multi-column data set
# The file is semicolon-delimited, so each row arrives as one string in a column named after the full header
blob_col = 'City;State;Median Age;Male Population;Female Population;Total Population;Number of Veterans;Foreign-born;Average Household Size;State Code;Race;Count'
new_cols = ['City', 'State', 'MedianAge', 'MalePop', 'FemalePop', 'TotalPop',
            'NumberofVeterans', 'ForeignBorn', 'AvgHouseholdSize', 'StateCode', 'Race', 'Count']

# Split the blob on ';' and expose each field as its own column, in header order
split_col = split(cities_df[blob_col], ';')
for idx, name in enumerate(new_cols):
    cities_df = cities_df.withColumn(name, split_col.getItem(idx))

# Drop the original blob column now that its fields are split out
cities_df_clean = cities_df.drop(blob_col)

# Derive a two-letter state code from iso_region (e.g. "US-MN" -> "MN")
airport_df = airport_df.withColumn("StateCode", split(col("iso_region"), "-").getItem(1))

In [11]:
# Roll the city-level demographics up to one row per state
states_df = cities_df_clean.groupBy("State", "StateCode") \
    .agg(F.sum("MalePop").alias("Male"),
         F.sum("FemalePop").alias("Female"),
         F.sum("TotalPop").alias("Population"),
         F.sum("NumberofVeterans").alias("Veterans"),
         F.sum("ForeignBorn").alias("AlienPopulation"),
         F.avg("AvgHouseholdSize").alias("AvgHouseHoldSize"),
         F.avg("MedianAge").alias("AverageAge")
     )

In [12]:
#airport_df.toPandas()
states_df.toPandas()

| | State | StateCode | Male | Female | Population | Veterans | AlienPopulation | AvgHouseHoldSize | AverageAge |
|---|---|---|---|---|---|---|---|---|---|
| 0 | Mississippi | MS | 527627.0 | 613916.0 | 1141543.0 | 67314.0 | 21233.0 | 2.601111 | 33.211111 |
| 1 | Utah | UT | 2586752.0 | 2532925.0 | 5119677.0 | 193165.0 | 651811.0 | 3.156875 | 30.8625 |
| 2 | South Dakota | SD | 613590.0 | 611900.0 | 1225490.0 | 80435.0 | 76545.0 | 2.345 | 37.05 |
| 3 | Kentucky | KY | 2262415.0 | 2386970.0 | 4649385.0 | 280125.0 | 332440.0 | 2.395 | 35.95 |
| 4 | California | CA | 61055672.0 | 62388681.0 | 123444353.0 | 4617022.0 | 37059662.0 | 3.095325 | 36.173964 |
| 5 | Nebraska | NE | 1786665.0 | 1819500.0 | 3606165.0 | 195985.0 | 356105.0 | 2.435 | 33.25 |
| 6 | New Hampshire | NH | 488855.0 | 502135.0 | 990990.0 | 55025.0 | 135995.0 | 2.43 | 37.8 |
| 7 | Delaware | DE | 163400.0 | 196385.0 | 359785.0 | 15315.0 | 16680.0 | 2.45 | 36.4 |
| 8 | Minnesota | MN | 3478803.0 | 3565362.0 | 7044165.0 | 321738.0 | 1069888.0 | 2.496852 | 35.57963 |
| 9 | North Carolina | NC | 7330525.0 | 7970470.0 | 15300995.0 | 830730.0 | 1896635.0 | 2.475 | 33.785714 |
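The `Race` and `Count` columns suggest that the cities file holds one row per city per race, with the city-level population figures repeated on each row. If that holds, summing `TotalPop` over every row counts each city several times, which would explain totals such as California's 123,444,353 in the table above, far above the state's actual population of roughly 39 million. Assuming that layout, one possible fix is to keep a single row per city before aggregating, for example with `cities_df_clean.dropDuplicates(["City", "StateCode"])` in Spark. The plain-Python sketch below shows the effect with a hypothetical helper `state_population` and hypothetical rows.

```python
def state_population(rows, dedupe=True):
    """Sum TotalPop per state, optionally keeping only one row per (City, StateCode)."""
    seen = set()
    totals = {}
    for row in rows:
        key = (row["City"], row["StateCode"])
        if dedupe and key in seen:
            continue  # same city already counted under another Race row
        seen.add(key)
        totals[row["StateCode"]] = totals.get(row["StateCode"], 0) + row["TotalPop"]
    return totals

# Hypothetical rows: one city repeated once per Race value, plus a second city
rows = [
    {"City": "Sampleville", "StateCode": "SS", "Race": "Asian", "TotalPop": 2100},
    {"City": "Sampleville", "StateCode": "SS", "Race": "White", "TotalPop": 2100},
    {"City": "Sampleville", "StateCode": "SS", "Race": "Hispanic or Latino", "TotalPop": 2100},
    {"City": "Otherton", "StateCode": "SS", "Race": "Asian", "TotalPop": 900},
]
print(state_population(rows, dedupe=False))  # {'SS': 7200}
print(state_population(rows))                # {'SS': 3000}
```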


In [30]:
@udf(StringType())
def arrival_date(arrdate):
    # SAS dates count days since 1960-01-01; return the arrival date as an ISO "YYYY-MM-DD" string
    if arrdate:
        return (datetime(1960, 1, 1).date() + timedelta(days=arrdate)).isoformat()
    return None

i94_df_clean = i94_df_clean.withColumn("arrdate", arrival_date(i94_df_clean.arrdate))
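SAS stores dates as the number of days since 1960-01-01, so an `arrdate` of 20545.0 corresponds to 2016-04-01, consistent with the April 2016 file. The conversion can be checked with the standard library alone, as in the sketch below (the helper name `sas_date_to_iso` is hypothetical). In Spark the same conversion could also be done without a Python UDF, for instance with the SQL expression `date_add(to_date('1960-01-01'), cast(arrdate as int))` passed to `F.expr`, which avoids sending every row through Python.

```python
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)  # day 0 in SAS date values

def sas_date_to_iso(sas_days):
    """Convert a SAS date (days since 1960-01-01) to an ISO 'YYYY-MM-DD' string."""
    if sas_days is None:
        return None
    return (SAS_EPOCH + timedelta(days=int(sas_days))).isoformat()

for value in (20545.0, 20551.0, 20573.0):
    print(value, sas_date_to_iso(value))
# 20545.0 2016-04-01
# 20551.0 2016-04-07
# 20573.0 2016-04-29
```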

In [14]:
# initialize spark sql
i94_df_clean.createOrReplaceTempView("i94")
cities_df_clean.createOrReplaceTempView("cities")
airport_df_clean.createOrReplaceTempView("airport")
states_df.createOrReplaceTempView("states")


### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model

The conceptual data model is a star schema that combines all three datasets to support in-depth reporting and analytics on travellers, airports, state-level demographics, and other relevant travel information.

##### 3.1.1 Conceptual Data Model along with Data Dictionary

* Fact_i94 - fact table
   * cicid: CIC_UniqueID
   * i94cit: origin country code
   * i94res: country-of-residence code
   * i94port: port of entry
   * i94addr: US state code of the traveller's destination address (can differ from the port-of-entry state)
   * visapost: Department of State post where the visa was issued
   * admnum: admission number for that entry


* DimAirport - Airport Dimension Table
   * i94Port: PortOfEntry
   * name: name of the airport 
   * coordinates: longitude and latitude coordinates (in that order)
   * iso_region: region of the airport
   * municipality: municipality where the airport is located
   * gps_code: GPS code of the airport
   * type: type of airport (e.g., small_airport, medium_airport, large_airport)
   
* DimTraveller - Traveller Dimension Table
   * cicid: CIC_UniqueID of the traveller
   * gender: gender of the traveller
   * biryear: birth year of the traveller
   * visatype: visa type of the traveller
   * occup: occupation of the traveller


* DimTravelFlags - Various Flags used for the Travel
   * cicid: CIC_UniqueID
   * ENTDEPA: ArrivalFlag
   * ENTDEPD: DepartureFlag
   * ENTDEPU: UpdateFlag
   * MATFLAG: MatchFlag

* DimAirline - Airline Dimension Table (the airline the traveller flew with)
   * cicid: CIC_UniqueID
   * arrdate: ArrivalDate
   * AIRLINE: Airline
   * FLTNO: FlightNumber of that airline

* DimTravelInfo - Dimension that contains travel information
   * cicid: CIC_UniqueID
   * VISATYPE: TypeOfVisa
   * ADMNUM: AdmissionNumber
   * VISAPOST: VisaIssuance
   * I94VISA: PurposeOfTravel
   * I94MODE: ModeOfTravel

* DimStateInfo - Dimension about State demographic information
   * State: State 
   * StateCode: Short code of every state
   * Male: Male population in that state
   * Female: Female population in that state
   * Population: Total population in that state
   * Veterans: Veteran population in that state
   * AlienPopulation: Foreign born population in that state
   * AvgHouseHoldSize: average size of a household in that state
   * AverageAge: average age of a person in that state

#### 3.2 Mapping Out Data Pipelines

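The following steps take the raw data to the fact and dimension tables of the data mart:
* Load the I94 SAS data, the cities demographics CSV, and the airport codes CSV into Spark DataFrames.
* Clean the data: drop rows missing key columns, split the cities blob into columns, derive a StateCode for airports, and convert the SAS arrival date.
* Aggregate the city demographics to one row per state.
* Register the DataFrames as temporary views and build the fact and dimension tables with Spark SQL, dropping duplicate rows.
* Write the tables to S3 as Parquet.
* Run data quality checks on the fact and dimension tables.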


In [6]:
df.head()

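| | cicid | i94yr | i94mon | i94cit | i94res | i94port | arrdate | i94mode | i94addr | depdate | ... | entdepu | matflag | biryear | dtaddto | gender | insnum | airline | admnum | fltno | visatype |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 6.0 | 2016.0 | 4.0 | 692.0 | 692.0 | XXX | 20573.0 | | | | ... | U | | 1979.0 | 10282016 | | | | 1897628000.0 | | B2 |
| 1 | 7.0 | 2016.0 | 4.0 | 254.0 | 276.0 | ATL | 20551.0 | 1.0 | AL | | ... | Y | | 1991.0 | D/S | M | | | 3736796000.0 | 296.0 | F1 |
| 2 | 15.0 | 2016.0 | 4.0 | 101.0 | 101.0 | WAS | 20545.0 | 1.0 | MI | 20691.0 | ... | | M | 1961.0 | 09302016 | M | | OS | 666643200.0 | 93.0 | B2 |
| 3 | 16.0 | 2016.0 | 4.0 | 101.0 | 101.0 | NYC | 20545.0 | 1.0 | MA | 20567.0 | ... | | M | 1988.0 | 09302016 | | | AA | 92468460000.0 | 199.0 | B2 |
| 4 | 17.0 | 2016.0 | 4.0 | 101.0 | 101.0 | NYC | 20545.0 | 1.0 | MA | 20567.0 | ... | | M | 2012.0 | 09302016 | | | AA | 92468460000.0 | 199.0 | B2 |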


### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [15]:
# Build the fact and dimension tables of the data mart from the temporary views

fact_i94 = spark.sql("""
Select 
    cicid as CIC_UniqueID,
    i94cit,
    i94res,
    i94port,
    i94addr,
    visapost,
    admnum
From 
    i94
""").dropDuplicates()

dimAirport = spark.sql("""
Select
    a.i94Port as PortOfEntry,
    b.name as AirportName,
    b.coordinates as Coordinates,
    b.iso_region as Region,
    b.municipality as Municipality,
    b.gps_code as GPSCode,
    b.type as AirportType
From
    i94 a
Join
    airport b
On
    a.i94Port == b.local_code
""").dropDuplicates()

dimTraveller = spark.sql("""
Select
    cicid as CIC_UniqueID,
    gender,
    biryear,
    visatype,
    occup
From
    i94
""").dropDuplicates()

dimTravelFlags = spark.sql("""
Select
    cicid as CIC_UniqueID,
    ENTDEPA as ArrivalFlag,
    ENTDEPD as DepartureFlag,
    ENTDEPU as UpdateFlag,
    MATFLAG as MatchFlag
From
    i94
""").dropDuplicates()

dimAirline = spark.sql("""
Select
    cicid as CIC_UniqueID,
    arrdate,
    AIRLINE as Airline,
    FLTNO as FlightNumber
From
    i94
""").dropDuplicates()

dimTravelInfo = spark.sql("""
Select
    cicid as CIC_UniqueID,
    VISATYPE as TypeOfVisa,
    ADMNUM as AdmissionNumber,
    VISAPOST as VisaIssuance,
    I94VISA as PurposeOfTravel,
    I94MODE as ModeOfTravel
From
    i94
""").dropDuplicates()

dimStateInfo = spark.sql("""
Select
    State,
    StateCode,
    Male,
    Female,
    Population,
    Veterans,
    AlienPopulation,
    AvgHouseHoldSize,
    AverageAge
From
    States
""").dropDuplicates()

In [29]:
# Write the data mart to an S3 bucket as Parquet files
# s3a:// is the S3 filesystem provided by the hadoop-aws package loaded in the Spark session
DestBucket = "s3a://stagingi94/"

# (DataFrame, output name, partition columns)
# CIC_UniqueID is unique per record, so partitioning on it would create one directory per row;
# it is therefore left out of the partition columns below.
# dimDate is not built anywhere in this notebook, so it is not written.
tables = [
    (fact_i94, "fact_i94", ["i94cit", "i94res", "i94port", "i94addr"]),
    (dimAirport, "dimAirport", ["Region", "Municipality", "AirportType"]),
    (dimTraveller, "dimTraveller", ["biryear", "visatype"]),
    (dimTravelFlags, "dimTravelFlags", []),
    (dimAirline, "dimAirline", ["Airline"]),
    (dimTravelInfo, "dimTravelInfo", ["TypeOfVisa", "VisaIssuance"]),
    (dimStateInfo, "dimStateInfo", ["State"]),
]

for table_df, name, partition_cols in tables:
    writer = table_df.write.mode("overwrite")
    # partitionBy() configures the writer, so it must be called before parquet() runs the write
    if partition_cols:
        writer = writer.partitionBy(*partition_cols)
    writer.parquet(os.path.join(DestBucket, name + ".parquet"))
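Spark's `partitionBy` writes Hive-style subdirectories, one per distinct combination of partition values (for example `dimAirline.parquet/Airline=AA/`). A column that is unique per record, such as `CIC_UniqueID`, therefore produces one directory per row, which makes it a poor partition key. The plain-Python sketch below lists the directories a given partition choice would create; the helper `partition_dirs` is hypothetical, and the rows are taken from the sample I94 rows shown in Step 3.

```python
def partition_dirs(rows, partition_cols):
    """List the distinct Hive-style partition paths (col=value/...) a write would create."""
    paths = {
        "/".join(f"{c}={row[c]}" for c in partition_cols)
        for row in rows
    }
    return sorted(paths)

# Rows with the same CIC IDs and airlines as the sample I94 records
rows = [
    {"CIC_UniqueID": 15.0, "Airline": "OS"},
    {"CIC_UniqueID": 16.0, "Airline": "AA"},
    {"CIC_UniqueID": 17.0, "Airline": "AA"},
]
print(partition_dirs(rows, ["Airline"]))            # ['Airline=AA', 'Airline=OS']
print(len(partition_dirs(rows, ["CIC_UniqueID"])))  # 3
```

With millions of traveller records, the second choice grows to one directory per record, while the first grows only with the number of airlines.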

#### 4.2 Data Quality Checks

For data quality checks, I first join the fact table to a dimension table to verify that the records match as expected, and then check the fact table for duplicate CIC IDs (CIC_UniqueID).
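Another check worth considering concerns dimAirport, which is built in 4.1 by an inner join of I94 port codes (`i94port`, e.g. `NYC`, `WAS`) to the airport table's `local_code`. The two columns may come from different code systems, so some ports might not find a match and would silently drop out of the dimension. A match-rate check quantifies this; the sketch below uses a hypothetical helper `match_rate` and hypothetical code lists.

```python
def match_rate(port_codes, airport_codes):
    """Return the share of distinct port codes found among airport codes, plus the unmatched ones."""
    ports = set(port_codes)
    matched = ports & set(airport_codes)
    unmatched = sorted(ports - matched)
    return len(matched) / len(ports) if ports else 0.0, unmatched

# Hypothetical inputs: port codes like those in the sample rows and a few airport local codes
rate, missing = match_rate(["ATL", "NYC", "WAS", "NYC"], ["ATL", "STP", "JFK"])
print(rate, missing)  # 0.3333333333333333 ['NYC', 'WAS']
```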

In [16]:
fact_i94.createOrReplaceTempView("dq_test")
dimStateInfo.createOrReplaceTempView("dq_test1")
dq_testquery = spark.sql("""
select 
    a.CIC_UniqueID,
    a.i94port,
    a.i94addr,
    b.State,
    b.StateCode
From dq_test a
Join dq_test1 b
on a.i94addr == b.StateCode
""")
dq_testquery.show(3)

+------------+-------+-------+--------+---------+
|CIC_UniqueID|i94port|i94addr|   State|StateCode|
+------------+-------+-------+--------+---------+
|        91.0|    CLT|     IN| Indiana|       IN|
|       151.0|    NEW|     NY|New York|       NY|
|       938.0|    NEW|     NY|New York|       NY|
+------------+-------+-------+--------+---------+
only showing top 3 rows
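Showing three matched rows confirms that the join works, but not how many fact rows fail to match a state. Counting unmatched rows is a stronger check; in Spark this could be a `left_anti` join, e.g. `fact_i94.join(dimStateInfo, fact_i94.i94addr == dimStateInfo.StateCode, "left_anti").count()`, where zero is the goal. The plain-Python sketch below shows the same anti-join logic with a hypothetical helper `unmatched_rows`; the third row uses a hypothetical state code.

```python
def unmatched_rows(fact_rows, dim_keys, fact_key):
    """Return fact rows whose key has no match in the dimension (an anti-join)."""
    keys = set(dim_keys)
    return [row for row in fact_rows if row[fact_key] not in keys]

fact = [
    {"CIC_UniqueID": 91.0, "i94addr": "IN"},
    {"CIC_UniqueID": 151.0, "i94addr": "NY"},
    {"CIC_UniqueID": 999.0, "i94addr": "XX"},  # hypothetical code with no state match
]
orphans = unmatched_rows(fact, ["IN", "NY"], "i94addr")
print(len(orphans), [r["CIC_UniqueID"] for r in orphans])  # 1 [999.0]
```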



In [18]:
# DQ Check #2
# look for duplications
dq_testquery2 = spark.sql("""
select CIC_UniqueID, count(*) as count from dq_test group by CIC_UniqueID order by count
""")
dq_testquery2.show(3)

+------------+-----+
|CIC_UniqueID|count|
+------------+-----+
|     12467.0|    1|
|     35804.0|    1|
|     52086.0|    1|
+------------+-----+
only showing top 3 rows
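The second check orders groups by `count` ascending, so the first rows shown are the smallest groups; counts of 1 there do not rule out duplicates further down. Ordering by `count DESC`, or filtering with `HAVING count(*) > 1` and expecting no rows, makes the check conclusive. Note also that `dropDuplicates()` in 4.1 removes only fully identical rows, so two rows with the same CIC ID but different attributes would survive it. The plain-Python sketch below applies the "more than one row" rule with a hypothetical helper `duplicate_keys`; the last ID in the list is a deliberately repeated value for illustration.

```python
from collections import Counter

def duplicate_keys(keys):
    """Return {key: count} for every key that appears more than once."""
    return {key: n for key, n in Counter(keys).items() if n > 1}

ids = [12467.0, 35804.0, 52086.0, 35804.0]
dupes = duplicate_keys(ids)
print(dupes)  # {35804.0: 2}
```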



### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
In this project, I used Spark to build the data mart and to write the ETL pipeline that loads the cleaned data into it. Spark took only a few minutes to load and process a dataset of about 3.1 million rows.
* Propose how often the data should be updated and why.
The file naming (e.g., i94_apr16_sub.sas7bdat) shows that the dataset is prepared monthly, so running this ETL process once a month, as each new file arrives, would be ideal. Running the pipeline in a development environment first would help catch issues before they reach production.
* Write a description of how you would approach the problem differently under the following scenarios:
   * The data was increased by 100x.
      * Provision an Amazon EMR cluster with one master node and 15-20 worker nodes on xlarge-class instances (for example, m5.xlarge).
      * Use Apache Airflow to schedule and monitor the workflows.
      * Use the cluster's resource manager (YARN on EMR) to monitor node utilization and physical memory consumption, and adjust the workload accordingly.
   * The data populates a dashboard that must be updated on a daily basis by 7am every day.
      * Use Apache Airflow to schedule the workflow so that it finishes before 7am, and send email alerts when a run fails.
      * If the data is refreshed every day (which does not seem to be the case, since the datasets are prepared monthly), identify the datasets that change most often (the US travel data) and schedule those to update more frequently.
      * Some data does not change every day; for instance, airport codes rarely change and can be treated as static.
   * The database needed to be accessed by 100+ people.
      * Enable server-side encryption to protect the data at rest.
      * Enable replication and backups to speed up recovery.
      * Increase the number of worker nodes in EMR and increase the HDFS replication factor.
      * Store the data on HDFS and expose it through Hive tables so users can query it with SQL.