# Data Engineering Capstone Project

#### Project Summary
The purpose of this project is to create a dataset to ease query for jobs. For that we don't need normalized schema, instead star schema is created because of it's scalability in terms of reading and querying. This star schema can be further modified to create more summarized data such as OLAP cubes for BI and analytics (not in scope of this project). Here are few examples of kind of analysis that can be done on the final data to help answer patterns such as below:

- How many visas were issued overall or over a period of time? *(dim: visa, dim: date, fact: immigration records)*
- What mode immigrants used to travel the most? *(dim: mode, fact: immigration records)*
- Which state/port received most no. of non-immigrants during a period of time? *(dim: state, dim: airport, fact immigration records)*
- What kind of occupation for a specific visa immigrants had? *(dim: non_immigrant, fact: immigration records)*
- Metric w.r.t date like the number of immigrants entered in a year, quarter, month, week? *(dim: date, fact: immigration record)*



The project follows the follow steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [11]:
import os
import configparser
import pandas as d
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from datetime import datetime
from pyspark.sql.types import IntegerType, StringType, DoubleType, LongType

In [12]:
config = configparser.ConfigParser()
config.read_file(open('CapstoneProject/conf/config.cfg'))

In [13]:
spark = SparkSession.builder\
                     .config("spark.jars.packages","com.amazonaws:aws-java-sdk:1.7.4,org.apache.hadoop:hadoop-aws:2.7.3")\
                     .getOrCreate()
spark._jsc.hadoopConfiguration().set(
            'fs.s3a.impl', 'org.apache.hadoop.fs.s3native.NativeS3FileSystem')
spark._jsc.hadoopConfiguration().set(
            'fs.s3a.awsAccessKeyId', config['AWS']['access_key_id'])
spark._jsc.hadoopConfiguration().set(
            'fs.s3a.awsSecretAccessKey', config['AWS']['secret_access_key'])

log4jLogger = spark._jvm.org.apache.log4j
log = log4jLogger.LogManager.getLogger(__name__)
log.setLevel(log4jLogger.Level.WARN)


## Step 1: Scope the Project and Gather Data

### Scope 
The scope of this project is to analyze the raw data for immigration, airlines, states provided/gathered from the sources mentioned below. Transform and clean the data to load into schema on read in s3. This data once transformed can be further used to create summary tables/df for analytics.

### Describe and Gather Data 
##### I94 Immigration Data: 
This data comes from the [US National Tourism and Trade Office](https://www.trade.gov/national-travel-and-tourism-office).
The data is provided in both sas and parquet format. We would be using parquet format for this project. This data contains event information related to immigrants. Their entry date, visa, mode of entry, country of origin/residence, their return date, airlines, modes of travel etc. All the event related information is provided in the data files and are already stored in s3.<br><br>
##### U.S. City Demographic Data: 
This data comes from [OpenSoft](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/), file is delimited by semicolon contains information related to city, state, male/female population, racial population and other demographic information. <br><br>
##### Airport Code Table: 
This is a simple table of airport codes and corresponding cities. It comes from [datahub.io](https://datahub.io/core/airport-codes#data). Data is stored in CSV format.<br><br>
##### Other Data Sources:
Following data sources are coming from I94_SAS_Labels_Descriptions.SAS file transformed into CSV<br>
- Countries : Contains country code and name
- States : Contains state code and name
- Visa : Contain Visa code and type
- Mode : Contains mode code and mode name
- Ports : Contains port code and port name/state


<br>Here is the how the data looks like:

In [6]:
immigration_data="s3a://capstoneprojectsource/sas_data"
visa_data="s3a://capstoneprojectsource/source/visa.csv"
mode_data="s3a://capstoneprojectsource/source/mode.csv"
country_data="s3a://capstoneprojectsource/source/countries.csv"
states_data="s3a://capstoneprojectsource/source/states.csv"
ports_data="s3a://capstoneprojectsource/source/all_ports.csv"
airport_data="s3a://capstoneprojectsource/source/airport-codes_csv.csv"
city_data="s3a://capstoneprojectsource/source/us-cities-demographics.csv"

In [8]:
immigration_data_df = spark.read.load(immigration_data, header=True)
immigration_data_df.count()

3096313

In [9]:
immigration_data_df.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

In [40]:
visa_data_df = spark.read.csv(visa_data, header=True)
visa_data_df.printSchema()

root
 |-- code: string (nullable = true)
 |-- visa_desc: string (nullable = true)



In [41]:
mode_data_df = spark.read.csv(mode_data, header=True)
mode_data_df.printSchema()

root
 |-- code: string (nullable = true)
 |-- mode: string (nullable = true)



In [42]:
country_data_df = spark.read.csv(country_data, header=True)
country_data_df.printSchema()

root
 |-- code: string (nullable = true)
 |-- name: string (nullable = true)



In [43]:
states_data_df = spark.read.csv(states_data, header=True)
states_data_df.printSchema()

root
 |-- code: string (nullable = true)
 |-- state: string (nullable = true)



In [44]:
ports_data_df = spark.read.option("delimiter", ";").csv(ports_data, header=True)
ports_data_df.printSchema()

root
 |-- code: string (nullable = true)
 |-- ports: string (nullable = true)



In [45]:
airport_data_df = spark.read.csv(airport_data, header=True)
airport_data_df.printSchema()

root
 |-- ident: string (nullable = true)
 |-- type: string (nullable = true)
 |-- name: string (nullable = true)
 |-- elevation_ft: string (nullable = true)
 |-- continent: string (nullable = true)
 |-- iso_country: string (nullable = true)
 |-- iso_region: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- gps_code: string (nullable = true)
 |-- iata_code: string (nullable = true)
 |-- local_code: string (nullable = true)
 |-- coordinates: string (nullable = true)



In [46]:
city_data_df = spark.read.option("delimiter", ";").csv(city_data, header=True)
city_data_df.printSchema()

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Median Age: string (nullable = true)
 |-- Male Population: string (nullable = true)
 |-- Female Population: string (nullable = true)
 |-- Total Population: string (nullable = true)
 |-- Number of Veterans: string (nullable = true)
 |-- Foreign-born: string (nullable = true)
 |-- Average Household Size: string (nullable = true)
 |-- State Code: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Count: string (nullable = true)



### Step 2: Explore and Assess the Data
#### Explore the Data 

##### I94 Immigration data :
- The dataset contains event data for the individuals that entered the united states. The information contains their year of birth, date of arrival,  departure, visa, country of origin or residence, gender, airline, mode/port of entrance.
- Information that is not related to event but are related to immigrant can be put in a dimension table like year of birth, occupation, residence of immigrant etc. This will help any kind of analytics to be performed on immigrant type/country/occupation. 
- Rest of the event related information can be used to generate the fact data for any further analytics on this dataset.

In [43]:
immigration_data_df.show(5)

+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|    cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|        admnum|fltno|visatype|
+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|5748517.0|2016.0|   4.0| 245.0| 438.0|    LOS|20574.0|    1.0|     CA|20582.0|  40.0|    1.0|  1.0|20160430|     SYD| null|      G|      O|   null|      M| 1976.0|10292016|     F|  null|     QF|9.495387003E10|00011|      B1|
|5748518.0|2016.0|   4.0| 245.0| 438.0|    LOS|20574.0|    1.0|     NV|20591.0|  32.0|    1.0|  

##### Visa code and types :
- This dataset contains type of visa codes that are allowed with their respective descriptions.
- This data can be combined with I94 immigration dataset to add the types of visa along with code and desc.
- This can be a dimension for analysing the number and types of visa that have been issued/arrived.

In [73]:
visa_data_df.show(5)

+----+---------+
|code|visa_desc|
+----+---------+
|   1| Business|
|   2| Pleasure|
|   3|  Student|
+----+---------+



##### Mode of Arrival
- Contains types of mode that can be used to enter the country.
- Can be further combined with even data for any information related mode of arrival.

In [74]:
mode_data_df.show(5)

+----+------------+
|code|        mode|
+----+------------+
|   1|         Air|
|   2|         Sea|
|   3|        Land|
|   9|Not reported|
+----+------------+



##### List of Countries :
- Dataset contains list of country codes along with names.
- This data can be used to analyse I94 data related country of origin / residence of immigrants

In [14]:
country_data_df.show(5)

+----+--------------+
|code|          name|
+----+--------------+
| 582|MEXICO Air Sea|
| 236|   AFGHANISTAN|
| 101|       ALBANIA|
| 316|       ALGERIA|
| 102|       ANDORRA|
+----+--------------+
only showing top 5 rows



##### List of states :
- Lists of states codes and name of valid states, can be combined with city demographics data to create on dimension.

In [15]:
states_data_df.show(5)

+----+----------+
|code|     state|
+----+----------+
|  AL|   ALABAMA|
|  AK|    ALASKA|
|  AZ|   ARIZONA|
|  AR|  ARKANSAS|
|  CA|CALIFORNIA|
+----+----------+
only showing top 5 rows



##### Ports :
- List of ports that have all valid and invalid ports. This can be used to make sure I94 event data has only valid codes.

In [16]:
ports_data_df.show(5)

+----+--------------------+
|code|               ports|
+----+--------------------+
| ALC|ALCAN, AK        ...|
| ANC|ANCHORAGE, AK    ...|
| BAR|BAKER AAF - BAKER...|
| DAC|DALTONS CACHE, AK...|
| PIZ|DEW STATION PT LA...|
+----+--------------------+
only showing top 5 rows



##### List of airports :
- Name of airports along with data like type of airport, iata_code, elevation, co-ordinates etc.
- This joined with event data would get the information for specific airport 

In [17]:
airport_data_df.show(5)

+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
|ident|         type|                name|elevation_ft|continent|iso_country|iso_region|municipality|gps_code|iata_code|local_code|         coordinates|
+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
|  00A|     heliport|   Total Rf Heliport|          11|       NA|         US|     US-PA|    Bensalem|     00A|     null|       00A|-74.9336013793945...|
| 00AA|small_airport|Aero B Ranch Airport|        3435|       NA|         US|     US-KS|       Leoti|    00AA|     null|      00AA|-101.473911, 38.7...|
| 00AK|small_airport|        Lowell Field|         450|       NA|         US|     US-AK|Anchor Point|    00AK|     null|      00AK|-151.695999146, 5...|
| 00AL|small_airport|        Epps Airpark|         820|       NA|         US|     

##### City Demographic information
- Dataset contains city demographic information. 
- City and State related data can be combined and put into one dimension for further analysis with event data.

In [18]:
city_data_df.show(5)

+----------------+-------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+-----+
|            City|        State|Median Age|Male Population|Female Population|Total Population|Number of Veterans|Foreign-born|Average Household Size|State Code|                Race|Count|
+----------------+-------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+-----+
|   Silver Spring|     Maryland|      33.8|          40601|            41862|           82463|              1562|       30908|                   2.6|        MD|  Hispanic or Latino|25924|
|          Quincy|Massachusetts|      41.0|          44129|            49500|           93629|              4147|       32935|                  2.39|        MA|               White|58723|
|          Hoover|      Alabama|      38.5|          38040| 

#### Clean and Transform Data

Generic steps to be performed on each dataset :
- Find nulls and replace with proper code, date or 'UNKNOWN' if no values is known at this moment
- Give proper alias to the columns
- Select only required columns in intermediate df(s), eliminate the ones that are not needed
- Typecast the columns whenever required


<br>Here are the clean and transform steps performed on each of the dataset to create dim and fact tables
##### UDF to remove the '0' from leading position

In [51]:
remove_padding = F.udf(lambda x: x.lstrip('0') if x else '0', StringType())

##### visa dim : 
- change the data type of visa code to integer
- assign proper alias to columns
- join with immigiration event data to also get the visa type 
- create a dim table / df for visa

In [43]:
imm_df_stg = (
    immigration_data_df
    .select(
        F.col("i94visa").cast(IntegerType()).alias("code"),
        F.col("visatype").alias("visa_type")
    )
).distinct()


visa_dim = (
    visa_data_df
    .join(imm_df_stg, visa_data_df.code == imm_df_stg.code, how = 'left')
    .select(
        visa_data_df.code.cast(IntegerType()).alias("code"),
        imm_df_stg.visa_type.alias("type"),
        visa_data_df.visa_desc.alias("desc")
    )
)


visa_dim.show(5)

+----+----+--------+
|code|type|    desc|
+----+----+--------+
|   1|  I1|Business|
|   1|  B1|Business|
|   1|  E1|Business|
|   1|  WB|Business|
|   1|  E2|Business|
+----+----+--------+
only showing top 5 rows



##### mode dim:
- assign appropiate datatype to code
- assign meaningful aliases to columns

In [45]:
mode_dim = (
    mode_data_df
    .select(
        F.col("code").cast(IntegerType()).alias("code"), 
        F.col("mode")
    )
)
mode_dim.show(5)

+----+------------+
|code|        mode|
+----+------------+
|   1|         Air|
|   2|         Sea|
|   3|        Land|
|   9|Not reported|
+----+------------+



##### country dim:
- Select only valid codes to create this data set eliminate the invalid code.
- Typecast code to have integer data type

In [46]:
country_dim = (
    country_data_df
    .select(
        F.col("code").cast(IntegerType()), 
        F.col("name"))
    .filter(~F.col("name").contains("No Country Code"))
    .filter(~F.col("name").contains("INVALID"))
)
country_dim.show(5)

+----+--------------+
|code|          name|
+----+--------------+
| 582|MEXICO Air Sea|
| 236|   AFGHANISTAN|
| 101|       ALBANIA|
| 316|       ALGERIA|
| 102|       ANDORRA|
+----+--------------+
only showing top 5 rows



##### ports dim:
- Select only valid codes to create ports dim

In [47]:
ports_dim = (
    ports_data_df
    .filter(~F.col("ports").contains("No PORT Code"))
)

ports_dim.show(5)

+----+--------------------+
|code|               ports|
+----+--------------------+
| ALC|ALCAN, AK        ...|
| ANC|ANCHORAGE, AK    ...|
| BAR|BAKER AAF - BAKER...|
| DAC|DALTONS CACHE, AK...|
| PIZ|DEW STATION PT LA...|
+----+--------------------+
only showing top 5 rows



##### airports dim:
- Select records that have proper IATA Code
- Split coordinates to separate columns latitude and longitude
- Typecast the columns whereever needed
- Remove any duplicate records
- Removing any leading '0' from local codes
- Replace nulls with proper code : 'UNKNOWN'
- Assign appropiate alias where necessary

In [59]:
airports_dim = (
    airport_data_df
    .select(airport_data_df.columns)
    .withColumn("local_code", F.when(F.col("local_code").isNull(), 'UNK').otherwise(F.col("local_code")).alias("local_code"))
    .withColumn("local_code", remove_padding("local_code"))
    .withColumn("latitude", F.split("coordinates",',')[0])
    .withColumn("longitude", F.split("coordinates",',')[1])
    .select(
        F.col("ident"), 
        F.col("type"), 
        F.col("name"),
        F.col("elevation_ft"), 
        F.col("iso_country"),
        F.when(F.col("iso_region").isNull(), 'UNK').otherwise(F.col("iso_region")).alias("iso_region"),
        F.when(F.col("municipality").isNull(), 'UNK').otherwise(F.col("municipality")).alias("municipality"),
        F.when(F.col("gps_code").isNull(), 'UNK').otherwise(F.col("gps_code")).alias("gps_code"), 
        F.col("iata_code"), 
        F.col("local_code"),
        F.col("latitude").cast(DoubleType()), 
        F.col("longitude").cast(DoubleType())
    )
    .where(
        F.col("iata_code").isNotNull())
    .distinct()

)

airports_dim.show(5)

+-----+-------------+--------------------+------------+-----------+----------+------------+--------+---------+----------+------------------+-----------------+
|ident|         type|                name|elevation_ft|iso_country|iso_region|municipality|gps_code|iata_code|local_code|          latitude|        longitude|
+-----+-------------+--------------------+------------+-----------+----------+------------+--------+---------+----------+------------------+-----------------+
|  AQY|small_airport|    Girdwood Airport|         150|         US|     US-AK|    Girdwood|     UNK|      AQY|       AQY|       -149.126007|        60.966099|
| AYHH|small_airport|    Honinabi Airport|         452|         PG|    PG-WPD|    Honinabi|    AYHH|      HNN|       HBI|          142.1771|         -16.2457|
| BIDV|small_airport| DjÃºpivogur Airport|           9|         IS|      IS-7| DjÃºpivogur|    BIDV|      DJU|       UNK|-14.28279972076416|64.64420318603516|
| CAM3|small_airport|      Duncan Airport|    

##### states dim (states + city demographic data):
- Get the state data from states df and city data from city df and combine those to create one dimension that has state_code, state_name, list of cities in states, their respective population
- Assign appropiate alias where necessary
- Remove duplicates from states and city df before joining these two datasets

In [62]:
city_df_ = (
    city_data_df
    .select(
        F.col("city"), 
        F.col("State Code").alias("state_code"),
        F.col("Total Population").alias("total_city_pop")
    )
    .distinct()
)

city_df_.show(2)



states_df_ = (
    states_data_df
    .select(
        F.col("code"), 
        F.col("state")
    )
    .distinct()
)

states_df_.show(2)



states_dim = (
    states_df_
    .join(city_df_, states_df_.code == city_df_.state_code,how='left')
    .select(
        states_df_.code.alias("state_code"),
        states_df_.state.alias("state_name"),
        city_df_.city,
        city_df_.total_city_pop
    )
)

states_dim.show(5)

+----------+----------+--------------+
|      city|state_code|total_city_pop|
+----------+----------+--------------+
|   Suffolk|        VA|         88161|
|Enterprise|        NV|        135474|
+----------+----------+--------------+
only showing top 2 rows

+----+---------+
|code|    state|
+----+---------+
|  TN|TENNESSEE|
|  KS|   KANSAS|
+----+---------+
only showing top 2 rows

+----------+----------+------------+--------------+
|state_code|state_name|        city|total_city_pop|
+----------+----------+------------+--------------+
|        TN| TENNESSEE|Murfreesboro|        126121|
|        TN| TENNESSEE|Johnson City|         65369|
|        TN| TENNESSEE|     Memphis|        655760|
|        TN| TENNESSEE| Chattanooga|        176597|
|        TN| TENNESSEE|     Jackson|         66980|
+----------+----------+------------+--------------+
only showing top 5 rows



##### non immigrant dim:
- This data is retrived from I94 immigration event data
- Data that is relevent to immigrant like cicid, birth year, gender, occup, country of residence that is not going to change frequently can be put in separate dimension for analysis on immigrants related data
- Retrive the above columns, typecast and assign aliases wherever necessary
- Replace nulls with code : UNKNOWN or UNK

In [47]:
non_imm_dim = (
    immigration_data_df
    .select(
        F.col("cicid").cast(IntegerType()),
        F.col("biryear").alias("birth_year").cast(IntegerType()),
        F.col("i94bir").cast(IntegerType()).alias("age_in_years"),
        F.col("gender"),
        F.when(F.col("occup").isNull(),'UNK').otherwise(F.col("occup")).alias("occup"),
        F.col("i94res").alias("country_of_res").cast(IntegerType())
    )
)

non_imm_dim.show(5)

+-------+----------+------------+------+-----+--------------+
|  cicid|birth_year|age_in_years|gender|occup|country_of_res|
+-------+----------+------------+------+-----+--------------+
|5748517|      1976|          40|     F|  UNK|           438|
|5748518|      1984|          32|     F|  UNK|           438|
|5748519|      1987|          29|     M|  UNK|           438|
|5748520|      1987|          29|     F|  UNK|           438|
|5748521|      1988|          28|     M|  UNK|           438|
+-------+----------+------------+------+-----+--------------+
only showing top 5 rows



##### date dim:
- This dimension would be useful when any analysis needs to be performed w.r.t specific date, month, year or quarter
- There are other ways to create time dim but for this specific use case get all the arrival and departure dates from i94 immigrantion data
- Get a union of both the data sets, eliminate duplicates
- Extract year, month, day, datekey, quarter, day_of_month, day_of_week, week_of_year from the date and assign to appropiate columns

In [72]:
# Get the arrival and departure dates from Immigration dataset, combine and remove dups

time_df_a = (
    immigration_data_df
    .select(
        F.col("arrdate").alias("date"))
    .where(F.col("arrdate").isNotNull())
)

time_df_d = (
    immigration_data_df
    .select(
        F.col("depdate").alias("date"))
    .where(F.col("depdate").isNotNull())
)

time_df_ = (
    time_df_a
    .union(time_df_d)
    .distinct()
)


# Transform the date in mm/dd/yyyy format

time_df_ = (
    time_df_
    .select(
        F.col("date").cast(IntegerType())
    )
)

time_df_ = (
    time_df_
    .withColumn("sas_date", F.to_date(F.lit("01/01/1960"), "MM/dd/yyyy"))
)

time_df_ = (
    time_df_
    .withColumn("date", F.expr("date_add(sas_date, date)"))
)


# Derive the required columns from date

date_dim = (
    time_df_
    .select("date")
    .withColumn("year", F.year("date"))
    .withColumn("month", F.month("date"))
    .withColumn("day", F.dayofmonth("date"))
    .withColumn("datekey", F.date_format(F.col("date"), "yyyyMMdd"))
    .withColumn("quarter", F.quarter("date"))
    .withColumn("day_of_month", F.dayofmonth("date"))
    .withColumn("day_of_week", F.dayofweek("date"))
    .withColumn("week_of_year", F.weekofyear("date"))
    .sort("datekey")
)

date_dim.show(5)

+----------+----+-----+---+--------+-------+------------+-----------+------------+
|      date|year|month|day| datekey|quarter|day_of_month|day_of_week|week_of_year|
+----------+----+-----+---+--------+-------+------------+-----------+------------+
|2001-07-20|2001|    7| 20|20010720|      3|          20|          6|          29|
|2012-04-12|2012|    4| 12|20120412|      2|          12|          5|          15|
|2012-04-14|2012|    4| 14|20120414|      2|          14|          7|          15|
|2014-04-22|2014|    4| 22|20140422|      2|          22|          3|          17|
|2014-04-24|2014|    4| 24|20140424|      2|          24|          5|          17|
+----------+----+-----+---+--------+-------+------------+-----------+------------+
only showing top 5 rows



##### immigration dataset - fact:
- Transform arrival and departure date to mm/dd/yyyy format
- Remove leading '0' from flight no
- Rename the columns to more readable names
- Assign UNKNOWN to nulls values if the value is not known
- Where ever the mode of travel is not 'Air' assign 'Not Applicable' to airline Column
- Cast col adm_num to long type and cicid, country_of_origin, arr_mode_code, visa_code to Integer
- Get the distinct ports from ports dim and local code from airport dim and create an intermediate data frame that has union of list of codes from both airport and port dims
- semi left join following dims to ensure the immigration fact table has valid data (foriegn key dependency)
	- mode_dim on mode_code
	- country_dim on country code
	- visa_dim on visa code
	- intermediate ports dim (airports_dim local codes + port from ports_dim) on port code
	- states_dim on state_code

In [86]:
# Clean and transform the columns
df_imm_stg = (
    immigration_data_df
    .select(
        immigration_data_df.columns)
    .withColumn("arr_date", F.col("arrdate").cast(IntegerType()))
    .withColumn("dep_date", F.col("depdate").cast(IntegerType()))
    .withColumn("sas_date", F.to_date(F.lit("01/01/1960"), "MM/dd/yyyy"))
    .withColumn("arr_date", F.expr("date_add(sas_date, arr_date)"))
    .withColumn("dep_date", F.expr("date_add(sas_date, dep_date)"))
    .withColumn("flt_no", remove_padding("fltno"))
    .select(F.col("admnum").cast(LongType()).alias("adm_num"),
            F.col("cicid").cast(IntegerType()).alias("cic_id"),
            F.col("i94cit").cast(IntegerType()).alias("country_of_origin"),
            F.col("i94port").alias("arr_port"),
            F.col("i94mode").cast(IntegerType()).alias("arr_mode_code"),
            F.when(F.col("i94addr").isNull(), 'UNK').otherwise(F.col("i94addr")).alias("arr_state_code"),
            F.col("arr_date"),
            F.when(F.col("dep_date").isNull(), F.to_date(F.lit("12/31/9999"), "MM/dd/yyyy")).otherwise(F.col("dep_date")).alias("dep_date"),
            F.col("i94visa").cast(IntegerType()).alias("visa_code"),
            F.col("visatype"), 
            F.when(F.col("visapost").isNull(), 'UNK').otherwise(F.col("visapost")).alias("visa_issuing_state"),
            F.when((F.col("airline").isNull() & (F.col("i94mode").cast(IntegerType()) != 1)), 'Not Applicable').otherwise(F.col("airline")).alias("airline"),
            F.col("flt_no"),
            F.to_date(F.lit(F.col("dtadfile")),'yyyymmdd').alias("date_added")
           )
    .distinct()
)

# Create intermediate df to get the codes from both airport and ports dimension to be joined with immigration data in next step, 
# this will ensure we get only valid codes in immigration fact table
df_ports_stg = (
    ports_dim
    .select(
        F.col("code"))
)

df_airports_stg = (
    airports_dim
    .select(
        F.col("local_code"))
)

df_ports_validation = (
    df_ports_stg
    .select('code')
    .union(df_airports_stg
           .select('local_code'))
    .distinct()
)

# join with rest of the dimension tables to filter out any invalid records that are not present in dimension data sets
immigration_fact = (
    df_imm_stg
    .join(mode_dim, df_imm_stg.arr_mode_code == mode_dim.code, how="leftsemi")
    .join(country_dim, df_imm_stg.country_of_origin == country_dim.code, how="leftsemi")
    .join(df_ports_validation, df_ports_validation.code == df_imm_stg.arr_port, how="leftsemi")
    .join(visa_dim, visa_dim.code == df_imm_stg.visa_code, how="leftsemi")
    .join(states_dim, states_dim.state_code == df_imm_stg.arr_state_code, how="leftsemi")
    .select(df_imm_stg.columns)
)

immigration_fact.show(5)

+-----------+-------+-----------------+--------+-------------+--------------+----------+----------+---------+--------+------------------+-------+------+----------+
|    adm_num| cic_id|country_of_origin|arr_port|arr_mode_code|arr_state_code|  arr_date|  dep_date|visa_code|visatype|visa_issuing_state|airline|flt_no|date_added|
+-----------+-------+-----------------+--------+-------------+--------------+----------+----------+---------+--------+------------------+-------+------+----------+
|94166868730|3867886|              135|     NYC|            1|            NY|2016-04-21|2016-08-28|        1|      B1|               LND|     BA|   113|2016-01-21|
|95002496530|5749447|              251|     NEW|            1|            PA|2016-04-30|2016-05-05|        1|      B1|               TLV|     UA|    85|2016-01-30|
|94965421030|5756497|              260|     NYC|            1|            OH|2016-04-30|2016-05-11|        1|      B1|               MNL|     KE|    81|2016-01-30|
|94954405430|575

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
The purpose of creating this section is to provide an overview of the data design for this specific project and how it is going to be categorized for analytics.

Here are the dimensions that are identified from the immigration data:
- Visa
- Mode of travel
- List of Countries
- Port of entry:
	- Ports
	- Airports 
- States
- Immigrant
- Date

Dataset that has log of each entry and exit of an immigrant (source for fact table):
- I94 Data

Following diagram shows the relationship between dimensions and fact, along with their respective primary and foreign keys.

Schema to be stored in 1NF and would require minimal joins to fetch the information.



<!-- ![alt text](images/data_model.jpeg) -->

<img src="images/data_model.jpeg" alt="conceptual DM" width="1000"/>

<br>

#### 3.2 Mapping Out Data Pipelines

The data pipeline would have following steps:
1) Read the source and load the data into intermediate dataframes, assuming that the data is stored in s3 and the s3 would be used as source.
2) Clean the data and transform the columns/data in desired format.
3) Load the data in s3 into a 'star schema' which is also a 'schema on read' . This schema can be further analyzed using spark/panda for analytics or can be further transformed and loaded into olap cubes in a redshift dataware house.

The target schema will have data in parquet format with fact and date dimension partitioned by year and month.
The fact table will have data that is present in dimension table to ensure the referential integrity although it's not enforced but the pipeline is design to ensure this.


There are also additional steps to be included in pipeline pertaining to project setup and cluster creation and other validations those also needs to be included as part of this pipeline to ensure all the pieces work together end to end to minimize the errors and the steps in pipeline would be orchestrated with the help of **Apache Airflow**.<br>




##### ETL DAG:

**upload_bootstrap_task_to_s3** :  Shell script to be uploaded in s3 which runs during the creation of EMR cluster which setup the project workspace and also install any requirements/dependencies for python modules to load and run properly.

**create_project_package** : Bash command to package the project into a tar file.

**upload_to_s3** :  Upload the project tar to s3 which is used by the bootstrap script to setup project space on EMR. Though zip files can be accessed from s3 directly while invoking the spark-submit but this step ensures there are no import errors when the spark runs.

**run_etl_spark** : Invokes etl.py from project /scripts directory which includes following steps:
1) Creates spark session 
2) Extract the data from s3 into spark data frames
3) Clean and transform the data into dim and fact tables
4) Load the data back in s3 in parquet format, creating a schema on read

**watch_step_etl** : This step ensures the validation wont's run until the output is uploaded in s3.

**run_data_validation_spark** : This steps performs validation on the output schema/tables.

**watch_step_validate** : This step ensures the cluster won’t terminate until spark job is completed.
 
**terminate_cluster** : Issue terminate cluster when the spark job is completed.


<img src="images/airflow_pipeline.png" alt="pipeline" width="1500"/>


### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
The data pipeline as mentioned above is created in Airflow and is manual for now since this is just a sample project to show the creation of ETL end to end.<br>
Here is what project structure looks like : 
- **CapstoneProject** : contains ETL code in python
- **dags** : contains airflow dags
- **logs** : contains airflow workflow logs
- **plugins**: contains airflow helpers and operators

<img src="images/project.png" alt="etl" width="600"/>


<br>Here is what ETL code sturture in **CapstoneProject** looks like :

<img src="images/ETL.png" alt="etl" width="400"/>


<br><br>To start the apache airflow on local host intiate docker compose up in detached mode. 

This will start  all the services defined in a docker-compose.yml along with the latest code in project directory.

In [24]:
!docker-compose up -d

Docker Compose is now in the Docker CLI, try `docker compose up`

Creating network "docker_default" with the default driver
Creating docker_postgres_1 ... 
Creating docker_redis_1    ... 
[1Bting docker_redis_1    ... [32mdone[0m[1A[2KCreating docker_flower_1   ... 
Creating docker_airflow-init_1 ... 
Creating docker_airflow-worker_1 ... 
Creating docker_airflow-scheduler_1 ... 
Creating docker_airflow-webserver_1 ... 
[1Bting docker_airflow-webserver_1 ... [32mdone[0m[4A[2K[2A[2K[5A[2K[1A[2K

<br><br>Airflow dags can be initiated using airflow API(s) programitically. 

Here are the steps to run the dags:

1) Check if the dag "Spark_S3_ETL" exists in airflow 

In [25]:
!curl 'http://localhost:8080/api/v1/dags/Spark_S3_ETL' -H 'content-type: application/json' --user "airflow:airflow"

{
  "dag_id": "Spark_S3_ETL",
  "description": "Load and transform data S3 using spark and airflow",
  "file_token": "Ii9vcHQvYWlyZmxvdy9kYWdzL2RhZy5weSI.cXPa1UbBIsJCgfCxsH65tlZvqB4",
  "fileloc": "/opt/airflow/dags/dag.py",
  "is_paused": false,
  "is_subdag": false,
  "owners": [
    "CapstoneProject"
  ],
  "root_dag_id": null,
  "schedule_interval": {
    "__type": "CronExpression",
    "value": "@once"
  },
  "tags": []
}


<br><br> 
2) If dag runs are paused make sure to unpause

In [15]:
!curl -X PATCH -d '{"is_paused": false}' 'http://localhost:8080/api/v1/dags/Spark_S3_ETL?update_mask=is_paused' -H 'content-type: application/json' --user "airflow:airflow"

{
  "dag_id": "Spark_S3_ETL",
  "description": "Load and transform data S3 using spark and airflow",
  "file_token": "Ii9vcHQvYWlyZmxvdy9kYWdzL2RhZy5weSI.iCswl9I_DJedNikdzjNIehRDrjs",
  "fileloc": "/opt/airflow/dags/dag.py",
  "is_paused": false,
  "is_subdag": false,
  "owners": [
    "CapstoneProject"
  ],
  "root_dag_id": null,
  "schedule_interval": {
    "__type": "CronExpression",
    "value": "@once"
  },
  "tags": []
}


<br><br>
3) Intiate the dag run by the execution date

In [27]:
!curl -X POST -d '{"execution_date": "2021-07-09T13:21:00Z", "conf": {}}' 'http://localhost:8080/api/v1/dags/Spark_S3_ETL/dagRuns' -H 'content-type: application/json' --user "airflow:airflow"

{
  "conf": {},
  "dag_id": "Spark_S3_ETL",
  "dag_run_id": "manual__2021-07-09T13:21:00+00:00",
  "end_date": null,
  "execution_date": "2021-07-09T13:21:00+00:00",
  "external_trigger": true,
  "start_date": "2021-07-09T20:21:39.730742+00:00",
  "state": "running"
}


<br><br>
4) Check the status of dag execution to ensure it was a success and validate the data in s3.

In [28]:
!curl 'http://localhost:8080/api/v1/dags/Spark_S3_ETL/dagRuns/manual__2021-07-09T13:21:00+00:00' -H 'content-type: application/json' --user "airflow:airflow"

{
  "conf": {},
  "dag_id": "Spark_S3_ETL",
  "dag_run_id": "manual__2021-07-09T13:21:00+00:00",
  "end_date": "2021-07-09T20:21:48.011080+00:00",
  "execution_date": "2021-07-09T13:21:00+00:00",
  "external_trigger": true,
  "start_date": "2021-07-09T20:21:39.730742+00:00",
  "state": "failed"
}


<br><br>
Stop the docker down when the job is finished

In [30]:
!docker-compose down

Stopping docker_airflow-webserver_1 ... 
Stopping docker_airflow-scheduler_1 ... 
Stopping docker_airflow-worker_1    ... 
Stopping docker_flower_1            ... 
Stopping docker_redis_1             ... 
Stopping docker_postgres_1          ... 
[2Bping docker_redis_1             ... [32mdone[0m[4A[2K[5A[2K[6A[2K[1A[2K[2A[2KRemoving docker_airflow-webserver_1 ... 
Removing docker_airflow-scheduler_1 ... 
Removing docker_airflow-worker_1    ... 
Removing docker_airflow-init_1      ... 
Removing docker_flower_1            ... 
Removing docker_redis_1             ... 
Removing docker_postgres_1          ... 
[7BRemoving network docker_default ... [32mdone[0m[3A[2K[5A[2K[4A[2K


#### 4.2 Data Quality Checks

Following data quality checks are performed after the ETL pipe line is finished. The step has run manually here to show the output however it is part of data pipeline and is orchestrated by APACHE AIRFLOW. Please check the pipeline image in previous section for more information.

STEPS:
- Data Validation 1:
    - A check is performed on all the relevent dimensions to ensure the referential integrity. To make sure respective FK(s) in fact table are valid and are present in dimension table.
- Data Validation 2:
    - Check on primary keys to ensure there are no nulls.
- Data Validation 3:
    - Dimension table are queried to make sure the count after load is not 0.

SCRIPTS INVOLVED:
- validator.py : driver script that invokes data_validator.py
- data_validator.py : script that invokes spark.py and load_schema.py to create spark session and read schema respectively. Also run all the checks mentioned above
- load_schema.py: script that reads the data from s3

In [7]:
%run CapstoneProject/validator.py

Data Integrity Validation Completed : visa_dim
Data Integrity Validation Completed : mode_dim
Data Integrity Validation Completed : country_dim
Data Integrity Validation Completed : ports and airports dim
Data Integrity Validation Completed : states_dim
Not Null Validation Passed : visa_dim
Not Null Validation Passed : mode_dim
Not Null Validation Passed : country_dim
Not Null Validation Passed : ports_dim
Not Null Validation Passed : airports_dim
Not Null Validation Passed : states_dim
Not Null Validation Passed : non_imm_dim
Not Null Validation Passed : adm_num
Check Rows Validation Passed : visa_dim
Check Rows Validation Passed : mode_dim
Check Rows Validation Passed : country_dim
Check Rows Validation Passed : ports_dim
Check Rows Validation Passed : airports_dim
Check Rows Validation Passed : states_dim
Check Rows Validation Passed : non_imm_dim


#### 4.3 Data dictionary 
**non_imm_dim**: Dimension table that stored immigrant’s information<br>
*cicid* : client id<br>
*birth_year* : year of birth<br>
*age_in_years* : age of immigrant in years<br>
*gender* : gender of immigrant<br>
*occup* : occupation of immigrant<br>
*country_of_res* : Immigrant’s country of residence<br>

**date_dim**: Date dimension that would help with date wise reporting<br>
*date* : date of arr/dep<br>
*year* : year of arr/dep<br>
*month* : month of arr/dep<br>
*day* : day of arr/dep<br>
*datekey* : date in integer format<br>
*quarter* : quarter of arr/dep<br>
*day_of_month* : day of month of arr/dep<br>
*day_of_week* : day of week of arr/dep<br>
*week_of_year* : week of the year of arr/dep<br>

**mode_dim** : This dimension contains mode of travel along with the code<br>
*code* : number associated to mode of travel<br>
*mode* : mode description<br>

**states_dim** : Contains information related to states<br>
*state_code* : unique code associated to state<br>
*state_name* : name of the state<br>
*city* :  city name <br>
*total_city_pop* : Population of city<br>


**airports_dim** : This dimension contains information related to airports<br>
*ident* : Unique code to identify airport<br>
*type* : size of airport<br>
*name* : name of the airport<br>
*elevation_ft* : elevation in feet<br>
*iso_county* : country (ISO-2)<br>
*iso_region* : region (ISO-2)<br>
*municipality* :  municipality<br>
*gps_code* : gps code<br>
*iata_code* : location identifier<br>
*local_code* : local code for the airport<br>
*latitude* : latitude<br>
*longitude* : longitude<br>

**country_dim** : This dimension contains list of countries<br>
*code* : unique for the country<br>
*name* : name of country<br>

**ports_dim** : This dimension contains port related information, to identify port of entry<br>
*code* : unique code associated to port<br>
*ports* : port name<br>

**visa_dim** : Contains information related to visa<br>
*code* :  Unique visa code<br>
*type* : Type of visa<br>
*desc* : Visa desc<br>

**immigration_fact** : This table contains the entry and exit records<br>
*adm_num* : admission record number<br>
*cicid* : client id<br>
*country_of_origin* : country of origin of non-immigrant<br>
*arr_port* : arrival port of immigrant<br>
*arr_mode_code* : arrival mode <br>
*arr_state_code* : state where immigrant arrived<br>
*arr_date* : date of arrival<br>
*dep_date* : departure date<br>
*visa_code* : visa that was issued to immigrant<br>
*visatype* : visa type<br>
*visa_issuing_state* : Visa issues state<br>
*airline* : airline used for travel<br>
*flt_no* : flight number <br>
*date_added* : date record was added in system<br>

#### Step 5: Project Write Up
<!-- * Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people. -->

1) The scope of this project is to have a simple ETL pipeline which reads the data from data files transform and create star schema in s3. This schema can be accessed by pandas or spark or other technologies to be processed further for analytics.The data stored in s3 is structured, this pipeline can be also be modifed to stored semi structure data and to have raw, semi structured and strutured data to create data lakes. It's versatile due to the technologies used in the project.

    - **SPARK** : Since we are dealing with huge data here and reading the data from data files, spark was a clear choice here as it support wide range of data formats and also the in memory processing is much faster. With pyspark it's easy to use dataframe APIs for processing large chunk of structured or semi-structured data.
    - **AWS EMR** : With spark we just need the cluster for it's processing power and for that EMR was an easy choice. EMR clusters are easy to setup, can scale up and down depending on the workload and it has very good integration with APACHE AIRFLOW which is used as orchestrator for this project.
    - **AWS S3** : S3 is used for creating schema on read. It's cheap storage, easy to setup and easily accessible from EMR clusters.
    - **APACHE AIRFLOW** : It is an open source tool for orchestrating complex data pipelines. It already has wide range of operators and apis that minimize the need of coding or creating custom operator. In this project we used s3 hooks and EMR operators. since python is the main lauguage used to create pipeline and apache airflow is written in python it was easy to integrate with this project. It also has CLI and API support to trigger the pipeline programatically.

2) The data should be updated on **montly basis**. While analyzing the data I felt that montly load would make more sense then daily. The data size would be manageable and it's not business critical and I am assuming that it would be used mostly to check monthly, quatertly or yearly metric. Hence, it makes more sense to have the load when complete montly's data arrive on 1st of every month at midnight. We can use Apache Airflow to ingest the data. Also the fact table is paritioned by year and month, it would be easy to create new parition for each month.

3) Spark is used to populate s3 schema, which make it easier to deal with the large datasets even when the data is **increased by 100x**. The EMR clusters can scale up and down depending on the workload provided autoscale is configured.

    Spark also makes it easy and efficient to read the schema from s3 to perform any further analysis on data. The Data can be read and loaded into dataframes, can be used to generate the intermediate data cubes or it can be further transformed and loaded into RDS.  



4) With conventional databases as the traffic grows, bandwidth utilization increases hence this add the need to upgrade database instances to instance types with more bandwidth. This adds downtime and other setbacks that can hurt the business if data is business critical.<br>
    **S3 is used as database / storage** option for final dataset which is both **scalable and distributed** can easily achieve thousands of transactions per second in request performance when uploading and retrieving the data from storage. Data stored in S3 are widely accessible through RESTful APIs. It can serve as a central static asset repository that knows to partition storage automatically to increase performance as the number of requests increases over time. <br>
    Hence, the data needed to be accessed by 100+ people won't be a problem with s3 object.


5) Airflow is used as an orchestrator and jobs can be **scheduled** in airflow to run the pipeline whenever needed. The dag for this pipeline is not scheduled and is externally triggered. However, using Airflow it can be sheduled to run once a @month at midnight of 1st day for the month. It would require following changes in DAG :
>      with DAG('Spark_S3_ETL', 
>          default_args=default_args, 
>          description='Load and transform data S3 using spark and airflow', 
>          schedule_interval='@montly' 
>        ) as dag:

<br>

#### Step 6: Query on final dataset
Here are examples of some of the use cases we mentioned in first section. This model has 7 dimensions visa, mode, country, ports, airports, states, date, non_imm. We can used these descriptive attributes to generate multiple metrics with the help of data in fact table. Here are few exmaples of information retrived from final dataset. 

In [134]:
visa_dim = spark.read.parquet("s3a://capstoneprojectsource/immigration_info/visa_dim.parquet")
mode_dim =  spark.read.parquet("s3a://capstoneprojectsource/immigration_info/mode_dim.parquet")
ports_dim = spark.read.parquet("s3a://capstoneprojectsource/immigration_info/ports_dim.parquet")
airports_dim = spark.read.parquet("s3a://capstoneprojectsource/immigration_info/airports_dim.parquet")
date_dim = spark.read.parquet("s3a://capstoneprojectsource/immigration_info/date_dim.parquet")
immigration_fact = spark.read.parquet("s3a://capstoneprojectsource/immigration_info/immigration_fact.parquet")

<br>

##### 1) How many visas were issued overall or over a period of time? In the month of April, 2016 (dim: visa, dim: date, fact: immigration records)

##### a) No. of visa issued - visa category wise


In [184]:
df_visa = (
    immigration_fact
    .join(visa_dim, immigration_fact.visa_code==visa_dim.code, how="leftsemi")
    .join(date_dim, immigration_fact.arr_date == date_dim.date, how="leftsemi")
    .select(immigration_fact.columns)
    .where((immigration_fact.month==4) & (immigration_fact.year==2016))
)

df_visa_cat = df_visa.groupBy("visa_code").agg(F.count("*").alias("count"))
df_visa_cat.show()

+---------+-------+
|visa_code|  count|
+---------+-------+
|        1| 430501|
|        3|  35535|
|        2|2085134|
+---------+-------+



##### b) Total no. of visas issued 


In [142]:
df_visa_cat.select("count").groupBy().sum().show()

[Row(sum(count)=2551170)]

<br>

##### 2) What mode immigrants used the most to travel ? (dim: mode_dim, fact: immigration_fact)


In [196]:
df_mode = (
    immigration_fact
    .join(mode_dim, immigration_fact["arr_mode_code"] == mode_dim["code"])
    .select(immigration_fact.arr_mode_code, mode_dim.mode)
    .groupBy(mode_dim.mode)
    .agg(F.count("arr_mode_code").alias("No_of_entries"))
    .orderBy(F.col("No_of_entries").desc())
)

df_mode.show()

print("Most Frequest mode of Travel")
df_mode.show(1)

+------------+-------------+
|        mode|No_of_entries|
+------------+-------------+
|         Air|      2497553|
|        Land|        42370|
|         Sea|         6045|
|Not reported|         5202|
+------------+-------------+

Most Frequest mode of Travel
+----+-------------+
|mode|No_of_entries|
+----+-------------+
| Air|      2497553|
+----+-------------+
only showing top 1 row



<br>

##### 3) Which port received most no. of non-immigrants during a period of time? (dim: state, dim: airport, dim:date, fact immigration records)
Data for 14th week of year (in April, 2016)

In [197]:
all_ports = (ports_dim
             .select(F.col("code"), F.trim(F.col("ports")).alias("name"))
             .unionAll(airports_dim.select(F.col("local_code").alias("code"), F.col("name")))
            ).distinct()
df_ports = (
    immigration_fact
    .join(all_ports, immigration_fact["arr_port"] == all_ports["code"])
    .join(date_dim, immigration_fact["arr_date"] == date_dim["date"])
    .select(all_ports.name.alias("port_name"), date_dim.week_of_year)
    .where(date_dim.week_of_year == 14)
    .groupBy("port_name")
    .agg(F.count("port_name").alias("no_of_entries"))
    .orderBy(F.col("no_of_entries").desc())
)

print("Port with most no. of entries in the 14th week of year")
df_ports.show(1)

Port with most no. of entries in the 14th week of year
+------------+-------------+
|   port_name|no_of_entries|
+------------+-------------+
|NEW YORK, NY|        96527|
+------------+-------------+
only showing top 1 row

