# Project Title
### Data Engineering Capstone Project

#### Project Summary
The aim of the project is to create an ETL pipeline script that maps Immigration and Airport data to a star schema database. The resulting source-of-truth tables enable efficient analysis of the data.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
import configparser
import os
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType, TimestampType
from datetime import datetime, date, time, timedelta
import glob

ModuleNotFoundError: No module named 'pyspark'

In [None]:
# AWS credentials are read from dl.cfg; do not hard-code keys in the notebook
config = configparser.ConfigParser()
config.read('dl.cfg')
os.environ['AWS_ACCESS_KEY_ID']=config['aws']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['aws']['AWS_SECRET_ACCESS_KEY']

# using two packages, one for reading database storage file sas7bdat, and one for s3 connection
spark = SparkSession.builder.\
config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11,org.apache.hadoop:hadoop-aws:2.7.0" )\
.enableHiveSupport().getOrCreate()
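
The `dl.cfg` file itself is not shown in this notebook. As a minimal sketch, assuming it is an INI file with a single `[aws]` section holding the two keys read above, it could be parsed as follows. The key values are placeholders, and `load_aws_credentials` is a hypothetical helper name.

```python
import configparser

# Assumed layout of dl.cfg; the values are placeholders, not real keys.
SAMPLE_CFG = """
[aws]
AWS_ACCESS_KEY_ID = <your-access-key-id>
AWS_SECRET_ACCESS_KEY = <your-secret-access-key>
"""

def load_aws_credentials(cfg_text):
    """Parse INI text and return the two AWS settings as a dict."""
    parser = configparser.ConfigParser()
    parser.read_string(cfg_text)
    section = parser["aws"]
    return {
        "AWS_ACCESS_KEY_ID": section["AWS_ACCESS_KEY_ID"],
        "AWS_SECRET_ACCESS_KEY": section["AWS_SECRET_ACCESS_KEY"],
    }

credentials = load_aws_credentials(SAMPLE_CFG)
```

Keeping the keys in a file that is excluded from version control avoids exposing them when the notebook is shared.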

#### Scope of the Project
We first clean the Immigration and Airport data. Next, we extract valid US-related records by matching them against the US State table. Then we use the US Immigration, US Airport, and US State tables to create dimension tables. We create the fact table by joining the first two tables. Finally, we perform quality checks on the data.

#### Data Description
For this project, three datasets are used:

*I94 Immigration Data* : This data comes from the US National Tourism and Trade Office. It includes international visitor arrival statistics on select countries, type of visa, mode of transportation, age groups, and states visited (first intended address only). Source website: (https://travel.trade.gov/research/reports/i94/historical/2016.html)

*US State Table* : This table has a list of valid US state names and their abbreviations. Source website: (http://worldpopulationreview.com/states/state-abbreviations/ )

*Airport Code Table* : This is a simple table of airport codes and corresponding cities. Source website: (https://datahub.io/core/airport-codes#data)

The aim of the project is to create source-of-truth tables for analyzing US immigration and airport data using a star schema model. We first clean the Immigration and Airport data. Next, we extract valid US-related records by matching them against the US State table. Then we use the US Immigration, US Airport, and US State tables to create dimension tables. Finally, we create the fact table by joining the immigration and airport tables.

In [None]:
# Assumption for the project: only the April 2016 immigration file is used. To process all available data, simply use immigration_files.
# Immigration Data
immigration_files= glob.glob('../../data/18-83510-I94-Data-2016/*.sas7bdat')
immigration_fname = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'

immigration_df = spark.read.format('com.github.saurfang.sas.spark').load(immigration_fname)

In [None]:
state_fname = 'state_abbreviation.csv'
df_state =spark.read.format('csv').option('header', 'True').load(state_fname)

In [None]:
# Airport Code Data
airport_fname = 'airport-codes_csv.csv'
airport_df =spark.read.format('csv').option('header', 'True').load(airport_fname)

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps
Document steps necessary to clean the data

In [8]:
# Cleaning immigration data
# Collect the valid state codes from the Spark dataframe column into a Python set
valid_state_codes = set(df_state.select('code').toPandas()['code'])

# UDF that maps state codes not found in the state table to 'other'
@udf(StringType())
def validate_state(i94addr):
    if i94addr in valid_state_codes:
        return i94addr
    return 'other'


# UDF that converts a SAS numeric date (days since 1960-01-01) to an ISO date string
@udf(StringType())
def parse_date(arrdate):
    if arrdate is not None:
        return (datetime(1960, 1, 1).date() + timedelta(days=arrdate)).isoformat()
    return None

# Drop rows with a missing port or state
immigration_df_valid = immigration_df.dropna(how='any', subset=['i94port', 'i94addr'])

# Replace invalid state codes with 'other'
immigration_df_valid = immigration_df_valid.withColumn("i94addr", validate_state(immigration_df_valid.i94addr))
# Convert the arrival date to ISO format (YYYY-MM-DD)
immigration_df_valid = immigration_df_valid.withColumn("arrdate", parse_date(immigration_df_valid.arrdate))

# Only keep US-related immigration data
immigration_df_valid = immigration_df_valid.filter(immigration_df_valid.i94addr != 'other')
immigration_df_valid.show(1)

+-----+------+------+------+------+-------+----------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+-------+------+------+-------+------------+-----+--------+
|cicid| i94yr|i94mon|i94cit|i94res|i94port|   arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear|dtaddto|gender|insnum|airline|      admnum|fltno|visatype|
+-----+------+------+------+------+-------+----------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+-------+------+------+-------+------------+-----+--------+
|  7.0|2016.0|   4.0| 254.0| 276.0|    ATL|2016-04-07|    1.0|     AL|   null|  25.0|    3.0|  1.0|20130811|     SEO| null|      G|   null|      Y|   null| 1991.0|    D/S|     M|  null|   null|3.73679633E9|00296|      F1|
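+-----+------+------+------+------+-------+----------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+-------+------+------+-------+------------+-----+--------+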
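
The date conversion inside `parse_date` can be checked without Spark. The sketch below assumes, as the UDF above does, that `arrdate` is a SAS numeric date counting days since 1960-01-01. `sas_days_to_iso` is a hypothetical helper name, not part of the notebook.

```python
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)  # SAS numeric dates count days from this day

def sas_days_to_iso(days):
    """Convert a SAS day count to an ISO date string, or None if missing."""
    if days is None:
        return None
    return (SAS_EPOCH + timedelta(days=days)).isoformat()

print(sas_days_to_iso(20551.0))  # 2016-04-07
print(sas_days_to_iso(None))     # None
```

The value 20551 corresponds to the arrival date in the sample row shown above.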

In [9]:
# Cleaning airport data
# Drop rows with a missing region, country, or local code
airport_df_valid = airport_df.dropna(how="any", subset=['iso_region', 'iso_country', 'local_code'])

# Keep US airports only
airport_df_valid = airport_df_valid.filter(airport_df_valid.iso_country == "US")

# Extract the state abbreviation, e.g. US-FL -> FL
get_state_name = udf(lambda x: x.split('US-')[1])
airport_df_valid = airport_df_valid.withColumn('state', get_state_name(airport_df_valid.iso_region))
# Use the validate_state UDF to map invalid states to 'other'
airport_df_valid = airport_df_valid.withColumn("state", validate_state(airport_df_valid.state))

# Only keep airports located in a valid US state
us_airport_df = airport_df_valid.filter(airport_df_valid.state != 'other')
us_airport_df.show(2)

+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+-----+
|ident|         type|                name|elevation_ft|continent|iso_country|iso_region|municipality|gps_code|iata_code|local_code|         coordinates|state|
+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+-----+
|  00A|     heliport|   Total Rf Heliport|          11|       NA|         US|     US-PA|    Bensalem|     00A|     null|       00A|-74.9336013793945...|   PA|
| 00AA|small_airport|Aero B Ranch Airport|        3435|       NA|         US|     US-KS|       Leoti|    00AA|     null|      00AA|-101.473911, 38.7...|   KS|
+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+-----+
only showing top 2 rows
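
The lambda used for `get_state_name` raises an `IndexError` if it ever receives a region code without the `US-` prefix. A slightly more defensive variant could look like the sketch below. `extract_state` is a hypothetical name, and returning `None` for unexpected input is an assumption about the desired behavior.

```python
def extract_state(iso_region):
    """Return the state part of a 'US-XX' region code, or None otherwise."""
    prefix = "US-"
    if iso_region is None or not iso_region.startswith(prefix):
        return None
    return iso_region[len(prefix):]

print(extract_state("US-FL"))  # FL
print(extract_state("CA-ON"))  # None
```

Wrapped with `udf(extract_state, StringType())`, it could be used in place of the lambda.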



### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model and Data Dictionary
A star schema is chosen as the data model because it is simple yet effective. Users can write simple queries that join the fact and dimension tables to analyze the data.

Following are the tables of the schema, along with their data dictionaries
* Immigration Dimension table 
    - cicid:  identifier 
    - i94yr:  year
    - i94mon: numeric month
    - i94cit: country of citizenship code
    - i94res: country of residence code
    - i94port: port of entry code
    - arrdate: arrival date , Y-m-d format
    - i94mode: travel mode code, i.e. land, flight, etc
    - i94addr: final state destination in us, filtered to only valid states
    - depdate: Departure Date from the USA
    - i94bir:  age
    - i94visa: visa code
    - count:   record count, used for summary statistics
    - dtadfile: Character Date Field - Date added to I-94 Files
    - visapost: department that issued visa
    - occup: occupation
    - entdepa: Arrival Flag
    - entdepd: Departure Flag
    - entdepu: Update Flag
    - matflag: Match flag
    - biryear: birth year
    - dtaddto: Date to which admitted to U.S.
    - gender:  gender
    - insnum: INS number
    - airline: airline used
    - admnum: admission number
    - fltno: flight number
    - visatype: Class of admission legally admitting the non-immigrant to temporarily stay in U.S
    
    
* Airport dimension table
    - ident: table identifier
    - type: type of airport
    - name: airport name
    - elevation_ft: elevation in feet
    - continent: continent
    - iso_country: country
    - iso_region: ISO region code, e.g. US-PA
    - municipality: city
    - gps_code: gps code
    - iata_code: IATA airport code
    - local_code: port code
    - coordinates: longitude, latitude
    - state: state the airport is located in
    
    
* State dimension table
    - state: state full name
    - abbrev: state abbreviation
    - code: state code
    
    
* Fact table
    - year
    - month
    - arrival_date
    - immigration_state
    - airport_code
    - gender
    - airport_state

#### 3.2 Mapping Out Data Pipelines
Here are the steps necessary to pipeline the data into the star schema:

* The Immigration dimension table is created from the cleansed dataframe and saved as a parquet file on S3, partitioned by year, month, and state ("i94yr", "i94mon", "i94addr").
* The Airport dimension table is created from the cleansed dataframe and saved as a parquet file on S3, partitioned by state.
* The State dimension table is created from the state dataframe and saved as a parquet file on S3.
* The fact table is created by joining the immigration and airport tables on port code (i94port = local_code) and saved as a parquet file on S3, partitioned by year, month, and airport_state.
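
Spark's `partitionBy` writes one sub-directory per partition value, named `column=value`. As a rough illustration of the resulting layout, the sketch below builds such a path by hand. `partition_path` is a hypothetical helper, the base path is the `output_path` used in this notebook, and the values come from the sample immigration row shown in Step 2.

```python
import posixpath

def partition_path(base, table, partitions):
    """Build a Hive-style partition directory path from (column, value) pairs."""
    parts = ["{}={}".format(column, value) for column, value in partitions]
    return posixpath.join(base, table, *parts)

path = partition_path(
    "s3a://dend-capstone/data-model/",
    "immigration.parquet",
    [("i94yr", 2016.0), ("i94mon", 4.0), ("i94addr", "AL")],
)
print(path)
# s3a://dend-capstone/data-model/immigration.parquet/i94yr=2016.0/i94mon=4.0/i94addr=AL
```

Queries that filter on the partition columns only need to read the matching directories.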


### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [10]:
output_path = "s3a://dend-capstone/data-model/"

In [13]:
# create view & fact table
immigration_df_valid.createOrReplaceTempView("immigration")
us_airport_df.createOrReplaceTempView("airport")
df_state.createOrReplaceTempView("state")

fact_table = spark.sql("""
    SELECT i94yr as year, 
           i94mon as month,
           arrdate as arrival_date,
           i94addr as immigration_state,
           i94port as airport_code,
           gender,
           state as airport_state
    FROM immigration i
    JOIN airport a
    ON i.i94port == a.local_code
""").dropDuplicates()

In [14]:
# write dimension tables to s3
# partitionBy() must be called on the writer before parquet(), which performs the write
immigration_df_valid.\
    write.mode("overwrite").\
    partitionBy("i94yr", "i94mon", "i94addr").\
    parquet(os.path.join(output_path, 'immigration.parquet'))

us_airport_df.\
    write.mode("overwrite").\
    partitionBy("state").\
    parquet(os.path.join(output_path, 'airport.parquet'))

df_state.\
    write.mode("overwrite").\
    parquet(os.path.join(output_path, 'state.parquet'))

# write fact table to s3
fact_table.\
    write.mode("overwrite").\
    partitionBy("year", "month", "airport_state").\
    parquet(os.path.join(output_path, 'fact.parquet'))

#### 4.2 Data Quality Checks
 
For quality checks, we can verify the data in the fact dataframe.
We can also verify the write to S3 by reading the parquet files back and performing a count.

In [16]:
fact_table.createOrReplaceTempView("fact")
arrived_settled_same_state = spark.sql("""
    select * 
    from fact
    where fact.immigration_state == fact.airport_state
""")
arrived_settled_same_state.show(5)

+------+-----+------------+-----------------+------------+------+-------------+
|  year|month|arrival_date|immigration_state|airport_code|gender|airport_state|
+------+-----+------------+-----------------+------------+------+-------------+
|2016.0|  4.0|  2016-04-01|               TX|         DAL|     M|           TX|
|2016.0|  4.0|  2016-04-03|               TX|         HOU|     F|           TX|
|2016.0|  4.0|  2016-04-06|               NY|         HPN|     M|           NY|
|2016.0|  4.0|  2016-04-11|               UT|         SLC|     M|           UT|
|2016.0|  4.0|  2016-04-11|               FL|         OPF|     F|           FL|
+------+-----+------------+-----------------+------------+------+-------------+
only showing top 5 rows



In [15]:
# verify the data in our parquet files
state_parquet = spark.read.parquet(output_path + "state.parquet")
airport_parquet = spark.read.parquet(output_path + "airport.parquet")

print ("Number of records in state table: ", state_parquet.count())
print ("Number of records in airport table: ", airport_parquet.count())

Number of records in state table:  51
Number of records in airport table:  21236
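
The counts printed above can be turned into a check that fails loudly. This is a minimal sketch, not part of the original pipeline: `check_row_count` is a hypothetical helper that relies only on the `count()` method, which Spark dataframes provide, and the `minimum` threshold is an assumption.

```python
def check_row_count(df, table_name, minimum=1):
    """Raise ValueError if df.count() is below minimum; return the count otherwise."""
    n_rows = df.count()
    if n_rows < minimum:
        raise ValueError(
            "Data quality check failed: {} has {} rows, expected at least {}".format(
                table_name, n_rows, minimum
            )
        )
    return n_rows
```

With the dataframes read above, this would be called as `check_row_count(state_parquet, "state")` and `check_row_count(airport_parquet, "airport")`.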


### Step 5: Complete Project Write Up
* **Clearly state the rationale for the choice of tools and technologies for the project.**
    -  I used Apache Spark to read, clean, and transform the data and to write the parquet files. Spark's schema-on-read let me do all the transformations without an additional database: I could process the raw data as if I were working with a traditional database.
    
    
* **Propose how often the data should be updated and why.**
    - Since the organization stores data every month, this ETL pipeline can be run monthly.
    
    
* Write a description of how you would approach the problem differently under the following scenarios:
 * **The data was increased by 100x.**
     - We can use a resource manager that schedules these jobs across a cluster. We can run Spark in YARN mode and increase the number of nodes available on the cluster.
 * **The data populates a dashboard that must be updated on a daily basis by 7am every day.**
     - The ETL pipeline for the immigration data should be scheduled to run overnight using an orchestration tool like Apache Airflow. The other transformations, for the airport and state data, can be scheduled yearly because those datasets change rarely.
 
 * **The database needed to be accessed by 100+ people.**
     - We can use a resource manager that schedules these jobs across a cluster. We can run Spark in YARN mode and increase the number of nodes available on the cluster.