# Mapping Immigration Traffic with Airlines
### Data Engineering Capstone Project

#### Project Summary
In this project, we map the I94 immigration traffic and airlines datasets into a star schema for easier analysis.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

```python
# Import needed libraries
import configparser
import os
from datetime import datetime, date, time, timedelta
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, TimestampType, DateType, IntegerType
from pyspark.sql.functions import udf, col, year, month, dayofmonth, weekofyear, date_format, row_number
from pyspark.sql.window import Window
```

### Step 1: Scope the Project and Gather Data

#### Scope 
We want to implement an ETL process that builds a star-schema data model optimized for analyzing immigration traffic by airline.
We use Spark to clean and transform the datasets and write the final results to an S3 bucket as parquet files.
> Requirements: spark-sas7bdat-2.1.0-s_2.11.jar, parso-2.0.10.jar, and Java 1.8.

#### Describe and Gather Data 
Two datasets are used in the project:
1. I94 Immigration Data: This data comes from the US National Tourism and Trade Office and includes international visitor arrival statistics on selected countries, type of visa, mode of transportation, age groups, and states visited (first intended address only). Source: (https://travel.trade.gov/research/reports/i94/historical/2016.html)
2. Airlines Data: This dataset comes from Kaggle. You can read more about it here: (https://www.kaggle.com/open-flights/airline-database).

```python
# Read AWS config
config = configparser.ConfigParser()
config.read('dl.cfg')
os.environ['AWS_ACCESS_KEY_ID'] = config['aws']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY'] = config['aws']['AWS_SECRET_ACCESS_KEY']

# Create the Spark session with the SAS reader and S3 (s3a) support
spark = SparkSession \
    .builder \
    .appName("Immigration Airline Analysis") \
    .config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11,org.apache.hadoop:hadoop-aws:2.7.0") \
    .config("spark.driver.memory", "5g") \
    .config("spark.executor.memory", "5g") \
    .config("spark.files.fetchTimeout", "3m") \
    .config("spark.dynamicAllocation.executorIdleTimeout", "3m") \
    .enableHiveSupport().getOrCreate()

# Read Airlines data
dfAirlines = spark.read.format('csv').option('header', 'True').load("airlines.csv")

# For simplicity, read only one monthly file (August 2016)
dfImmigration = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_aug16_sub.sas7bdat')
```
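The cell above expects a `dl.cfg` file with an `[aws]` section. A minimal sketch of that file and of the parsing step, assuming only the section and key names the notebook reads; the credential values are placeholders and `load_aws_credentials` is a hypothetical helper:

```python
import configparser
import os

# Hypothetical dl.cfg contents: the [aws] section and the two key names match
# what the notebook reads; the values are placeholders, not real credentials.
SAMPLE_CFG = """
[aws]
AWS_ACCESS_KEY_ID = your-access-key-id
AWS_SECRET_ACCESS_KEY = your-secret-access-key
"""

AWS_KEYS = ("AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY")


def load_aws_credentials(cfg_text):
    """Parse the [aws] section and export both keys as environment variables."""
    config = configparser.ConfigParser()
    config.read_string(cfg_text)
    aws = config["aws"]  # raises KeyError if the section is missing
    creds = {key: aws[key] for key in AWS_KEYS}  # option lookup is case-insensitive
    os.environ.update(creds)
    return creds
```

With a real file, `config.read('dl.cfg')` silently skips a missing file and returns an empty list, so a `KeyError` on `config['aws']` is often the first sign of a wrong path.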

### Step 2: Explore and Assess the Data
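#### Explore the Data 
The cleaning code addresses these data quality issues:
- Airlines: rows with a null IATA code and IATA codes that appear more than once.
- Immigration: records missing the port of entry (`i94port`), the address state (`i94addr`), or the airline code (`airline`), and arrival dates (`arrdate`) stored as SAS numeric dates.

#### Cleaning Steps
- Airlines: drop rows with a null `IATA`, drop duplicate `IATA` codes, rename `Airline ID` to `Airline_ID`, and cast it to integer.
- Immigration: drop rows missing `i94port`, `i94addr`, or `airline`; map airline codes not found in the Airlines dataset to `other`; convert `arrdate` to a date; cast `cicid`, `i94yr`, and `i94mon` to integer.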

```python
# Cleaning Airlines data

# Drop rows with a null IATA code
dfAirlines = dfAirlines.dropna(how='any', subset=['IATA'])

# Drop duplicated IATA codes
dfAirlines = dfAirlines.dropDuplicates(['IATA'])

# Remove the space from the column name and cast it to integer
dfAirlines = dfAirlines.withColumnRenamed("Airline ID", "Airline_ID")
dfAirlines = dfAirlines.withColumn("Airline_ID", dfAirlines.Airline_ID.cast(IntegerType()))

# Collect all IATA codes into a set, used to filter the Immigration dataset
airline_codes = {row['IATA'] for row in dfAirlines.select('IATA').collect()}

# Show dataset
dfAirlines.show(3)
```

```text
+----------+--------------------+-----+----+----+----------+-------------+------+
|Airline_ID|                Name|Alias|IATA|ICAO|  Callsign|      Country|Active|
+----------+--------------------+-----+----+----+----------+-------------+------+
|      5172|Open Skies Consul...|   \N|  1L| OSY|OPEN SKIES|United States|     N|
|      5002|           Tiara Air|   \N|  3P| TNM|     TIARA|        Aruba|     Y|
|     11963|         Starline.kz| null|  DZ|  \N|     ALUNK|   Kazakhstan|     Y|
+----------+--------------------+-----+----+----+----------+-------------+------+
only showing top 3 rows
```
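The airline cleaning rules can be checked without a Spark session. This is a minimal pure-Python sketch, assuming rows arrive as dictionaries keyed by the `airlines.csv` column names; `clean_airlines` and `to_int_or_none` are hypothetical helpers, and the cast only approximates Spark's default (non-ANSI) behavior of turning unparseable strings into null:

```python
# Pure-Python sketch of the airline cleaning rules; runs without Spark.
# Column names come from airlines.csv; sample rows in the test are hypothetical.

def to_int_or_none(value):
    """Approximate Spark's non-ANSI cast to int: unparseable values become None."""
    try:
        return int(value)
    except (TypeError, ValueError):
        return None


def clean_airlines(rows):
    """Return (cleaned rows, set of IATA codes) after applying the cleaning steps."""
    seen = set()
    cleaned = []
    for row in rows:
        iata = row.get("IATA")
        if iata is None:        # dropna(how='any', subset=['IATA'])
            continue
        if iata in seen:        # dropDuplicates(['IATA']): this sketch keeps the first row
            continue
        seen.add(iata)
        out = {k: v for k, v in row.items() if k != "Airline ID"}
        out["Airline_ID"] = to_int_or_none(row.get("Airline ID"))  # rename + cast
        cleaned.append(out)
    return cleaned, seen
```

Spark's `dropDuplicates` does not guarantee which of the duplicate rows survives; the sketch keeps the first one so its output is deterministic.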



```python
# Cleaning Immigration data

# Drop rows with missing port, address, or airline values
dfImmigration = dfImmigration.dropna(how='any', subset=['i94port', 'i94addr', 'airline'])

# UDF: keep airline codes found in the Airlines dataset, map the rest to 'other'
@udf(StringType())
def validate_airline(airline):
    if airline in airline_codes:
        return airline
    return 'other'

# UDF: convert a SAS date (days since 1960-01-01) to a Python date
@udf(DateType())
def parse_date(arrdate):
    if arrdate is not None:
        return date(1960, 1, 1) + timedelta(days=int(arrdate))
    return None

# Replace unknown airline codes with 'other'
dfImmigration = dfImmigration.withColumn("airline", validate_airline(dfImmigration.airline))
# Convert arrdate to a standard date
dfImmigration = dfImmigration.withColumn("arrdate", parse_date(dfImmigration.arrdate))

# Cast columns to integer
dfImmigration = dfImmigration.withColumn("cicid", dfImmigration.cicid.cast(IntegerType()))
dfImmigration = dfImmigration.withColumn("i94yr", dfImmigration.i94yr.cast(IntegerType()))
dfImmigration = dfImmigration.withColumn("i94mon", dfImmigration.i94mon.cast(IntegerType()))

# Show dataset
dfImmigration.show(3)
```

```text
+-----+-----+------+------+------+-------+----------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+---------------+-----+--------+
|cicid|i94yr|i94mon|i94cit|i94res|i94port|   arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|         admnum|fltno|visatype|
+-----+-----+------+------+------+-------+----------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+---------------+-----+--------+
|   22| 2016|     8| 323.0| 323.0|    NYC|2016-08-01|    1.0|     FL|   null|  23.0|    3.0|  1.0|20160801|     RID| null|      U|   null|   null|   null| 1993.0|     D/S|     M|  null|     EK| 6.451049563E10|  201|      F1|
|   55| 2016|     8| 209.0| 209.0|    AGA|2016-08-01|    1.0|     CA|   null|  41.0|    2.0|  1.0|20
```
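The logic inside the two immigration UDFs can be unit-tested as plain functions. A minimal sketch, assuming hypothetical standalone versions of the UDF bodies; it relies on SAS storing dates as a day count from 1960-01-01, and it checks `is None` explicitly so a day count of 0 (1960-01-01 itself) is not treated as missing:

```python
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)  # SAS date values count days from this date


def validate_airline(airline, airline_codes):
    """Keep airline codes present in the Airlines dataset; map anything else to 'other'."""
    return airline if airline in airline_codes else "other"


def parse_sas_date(arrdate):
    """Convert a SAS day count (often read as a float such as 20667.0) to a date."""
    if arrdate is None:
        return None
    return SAS_EPOCH + timedelta(days=int(arrdate))
```

For example, `parse_sas_date(20667.0)` gives `date(2016, 8, 1)`, the arrival date shown for the first rows above.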

```python
# Create the Date dataframe
dfDate = dfImmigration.select("arrdate").dropDuplicates() \
        .withColumn("year", year(col("arrdate"))).withColumn("month", month(col("arrdate"))) \
        .withColumn("day", dayofmonth(col("arrdate"))).withColumn("week", weekofyear(col("arrdate"))) \
        .withColumn("weekday", date_format(col("arrdate"), 'E'))
dfDate = dfDate.withColumnRenamed("arrdate", "arrive_date")

# Show data
dfDate.show(3)
```

```text
+-----------+----+-----+---+----+-------+
|arrive_date|year|month|day|week|weekday|
+-----------+----+-----+---+----+-------+
| 2016-08-15|2016|    8| 15|  33|    Mon|
| 2016-08-31|2016|    8| 31|  35|    Wed|
| 2016-08-23|2016|    8| 23|  34|    Tue|
+-----------+----+-----+---+----+-------+
only showing top 3 rows
```
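The DateDim attributes can be reproduced with the standard library, which is handy for checking the Spark output. A minimal sketch with hypothetical helpers `date_dim_row` and `build_date_dim`; it assumes ISO-8601 week numbering (Monday-based, which is how Spark's `weekofyear` defines weeks) and uses a fixed weekday table so the result does not depend on the locale:

```python
from datetime import date

WEEKDAY_ABBR = ("Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun")


def date_dim_row(d):
    """Build one DateDim row from an arrival date."""
    return {
        "arrive_date": d,
        "year": d.year,
        "month": d.month,
        "day": d.day,
        "week": d.isocalendar()[1],            # ISO-8601 week number
        "weekday": WEEKDAY_ABBR[d.weekday()],  # date.weekday(): Monday == 0
    }


def build_date_dim(dates):
    """Drop nulls and duplicates (like dropDuplicates) and derive the attributes."""
    return [date_dim_row(d) for d in sorted({d for d in dates if d is not None})]
```

Applied to 2016-08-15 and 2016-08-31, this yields weeks 33 and 35 on a Monday and a Wednesday, matching the rows shown above.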



### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
We convert the datasets into a star schema so that users can analyze the data with simple queries that join the fact table to the dimension tables.

Data model Diagram:
<img src="ERD.png">
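A typical star-schema query joins the fact table to its dimensions and aggregates. The sketch below uses SQLite from the standard library as a stand-in for Spark SQL over the parquet tables; the table and column names follow the notebook (only a subset of columns is created), while the rows and the query itself are hypothetical examples:

```python
import sqlite3


def arrivals_per_airline_weekday(conn):
    """Count arrivals per airline name and weekday via fact-to-dimension joins."""
    query = """
        SELECT a.Name, d.weekday, COUNT(*) AS arrivals
        FROM ImmigrationAirline f
        JOIN airlineDim a ON f.IATA = a.IATA
        JOIN dateDim d ON f.arrive_date = d.arrive_date
        GROUP BY a.Name, d.weekday
        ORDER BY arrivals DESC, a.Name
    """
    return conn.execute(query).fetchall()


def demo():
    """Build tiny in-memory tables with hypothetical rows and run the query."""
    conn = sqlite3.connect(":memory:")
    conn.executescript("""
        CREATE TABLE airlineDim (IATA TEXT PRIMARY KEY, Name TEXT);
        CREATE TABLE dateDim (arrive_date TEXT PRIMARY KEY, weekday TEXT);
        CREATE TABLE ImmigrationAirline (ImmAirline_key INTEGER, IATA TEXT,
                                         cicid INTEGER, arrive_date TEXT);
        INSERT INTO airlineDim VALUES ('EK', 'Emirates'), ('LH', 'Lufthansa');
        INSERT INTO dateDim VALUES ('2016-08-01', 'Mon'), ('2016-08-02', 'Tue');
        INSERT INTO ImmigrationAirline VALUES
            (1, 'EK', 1, '2016-08-01'), (2, 'EK', 2, '2016-08-01'),
            (3, 'LH', 3, '2016-08-02');
    """)
    try:
        return arrivals_per_airline_weekday(conn)
    finally:
        conn.close()
```

Here `demo()` returns `[('Emirates', 'Mon', 2), ('Lufthansa', 'Tue', 1)]`; the same SQL should run in `spark.sql` once the tables are registered as views.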

#### 3.2 Mapping Out Data Pipelines
The model has four tables:
- Fact table (ImmigrationAirline): created by joining ImmigrationDim and AirlineDim on the airline IATA code; saved as parquet files on S3, partitioned by `arrive_date` and `IATA`.
- ImmigrationDim dimension table: created from the cleaned Immigration dataframe; saved as parquet files on S3, partitioned by `arrdate` and `airline`.
- AirlineDim dimension table: created from the cleaned Airlines dataframe; saved as parquet files on S3.
- DateDim dimension table: created from the Immigration `arrdate` column; saved as parquet files on S3.
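Partitioning writes each table as Hive-style `column=value` subdirectories, one level per partition column in the order given. A minimal sketch of the resulting paths, using a hypothetical `partition_path` helper; it ignores the escaping Spark applies to special characters (such as `/` or `=`) in partition values:

```python
def partition_path(base, table, partition_values):
    """Return the directory a row lands in for the given (column, value) pairs."""
    parts = [f"{column}={value}" for column, value in partition_values]
    return "/".join([base.rstrip("/"), table, *parts])
```

For the fact table, a row for EK arriving on 2016-08-01 would land under `.../ImmigrationAirline.parquet/arrive_date=2016-08-01/IATA=EK`. Queries that filter on `arrive_date` can then skip the other date directories (partition pruning).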


### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

```python
S3_output_path = "s3a://udacity-ende-capstone/data-model/"
```

```python
# Register temp views and build the fact table
dfImmigration.createOrReplaceTempView("immigrationDim")
dfAirlines.createOrReplaceTempView("airlineDim")

ImmigrationAirline_fact = spark.sql("""
    SELECT
           a.IATA,
           i.cicid,
           i.arrdate AS arrive_date
    FROM immigrationDim i
    JOIN airlineDim a
    ON i.airline = a.IATA
""").dropDuplicates()

# Add a serial surrogate key. A window with orderBy but no partitionBy
# moves all rows into a single partition, which can be slow on large data.
window = Window.orderBy(col('IATA'))
ImmigrationAirline_fact = ImmigrationAirline_fact.withColumn('ImmAirline_key', row_number().over(window))

# Show fact data
ImmigrationAirline_fact.show(3)
```

```text
+----+------+-----------+--------------+
|IATA| cicid|arrive_date|ImmAirline_key|
+----+------+-----------+--------------+
|  2B| 13685| 2016-08-01|             1|
|  2D|303516| 2016-08-02|             2|
|  2D|404694| 2016-08-02|             3|
+----+------+-----------+--------------+
only showing top 3 rows
```
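`row_number()` over a window numbers rows 1..n in window order. Ordering by `IATA` alone leaves rows with the same code tied, so their keys are not guaranteed to be stable between runs. A minimal pure-Python sketch of the idea, using a hypothetical `add_serial_key` helper that breaks ties on `cicid`:

```python
def add_serial_key(rows, key_name="ImmAirline_key"):
    """Assign 1-based serial keys after sorting, like row_number() over a window.
    Sorting on (IATA, cicid) instead of IATA alone removes ties, so the
    same input always gets the same keys."""
    ordered = sorted(rows, key=lambda r: (r["IATA"], r["cicid"]))
    return [{**row, key_name: i} for i, row in enumerate(ordered, start=1)]
```

In Spark, the equivalent change would be `Window.orderBy(col('IATA'), col('cicid'))`; with the three rows above, it reproduces the keys 1, 2, 3 in the order shown.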



```python
# Write dimension tables to S3 (partitionBy must be called before parquet())
dfImmigration.write.mode("overwrite") \
    .partitionBy("arrdate", "airline") \
    .parquet(os.path.join(S3_output_path, 'immigrationDim.parquet'))

dfAirlines.write.mode("overwrite") \
    .parquet(os.path.join(S3_output_path, 'airlineDim.parquet'))

dfDate.write.mode("overwrite") \
    .parquet(os.path.join(S3_output_path, 'dateDim.parquet'))

# Write fact table to S3
ImmigrationAirline_fact.write.mode("overwrite") \
    .partitionBy("arrive_date", "IATA") \
    .parquet(os.path.join(S3_output_path, 'ImmigrationAirline.parquet'))
```


#### 4.2 Data Quality Checks
To check data quality, we verify that the fact table contains rows and join it back to the AirlineDim dimension table to confirm that the IATA codes map correctly.  
We can also read the parquet files back after writing them to confirm that the row counts are correct.
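The checks described above can be expressed as small functions that return failure messages instead of only printing. A minimal sketch with hypothetical helpers `run_quality_checks` and `check_row_count`; the surrogate-key uniqueness test is an extra check not performed in the notebook:

```python
def run_quality_checks(fact_rows, airline_codes, key_name="ImmAirline_key"):
    """Return failure messages for the fact table; an empty list means all checks passed."""
    failures = []
    if not fact_rows:
        failures.append("fact table has no rows")
    # Referential integrity: every fact IATA code must exist in airlineDim
    orphans = sorted({r["IATA"] for r in fact_rows} - set(airline_codes))
    if orphans:
        failures.append(f"IATA codes missing from airlineDim: {orphans}")
    # Extra check (not in the notebook): surrogate keys must be unique
    keys = [r[key_name] for r in fact_rows]
    if len(keys) != len(set(keys)):
        failures.append(f"duplicate values in {key_name}")
    return failures


def check_row_count(table, written, read_back):
    """Compare the row count before writing with the count read back from parquet."""
    if written == read_back:
        return []
    return [f"{table}: wrote {written} rows but read back {read_back}"]
```

In Spark, the orphan check can be written as `ImmigrationAirline_fact.join(dfAirlines, on="IATA", how="left_anti").count() == 0`.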

```python
# Make sure the fact table has rows
fact_total = ImmigrationAirline_fact.count()
if fact_total == 0:
    print("Quality check failed: the fact table has no rows.")
else:
    print(f"Quality check passed: the fact table has {fact_total} rows.")

# Check that the fact data maps correctly to the airline dimension
ImmigrationAirline_fact.createOrReplaceTempView("fact")
fact_airline = spark.sql("""
    SELECT f.IATA, a.IATA AS dim_IATA, f.arrive_date
    FROM fact f
    JOIN airlineDim a
    ON f.IATA = a.IATA
""")
fact_airline.show(5)
```

```text
+----+--------+-----------+
|IATA|dim_IATA|arrive_date|
+----+--------+-----------+
|  LH|      LH| 2016-08-01|
|  AA|      AA| 2016-08-01|
|  7C|      7C| 2016-08-01|
|  7C|      7C| 2016-08-01|
|  7C|      7C| 2016-08-01|
+----+--------+-----------+
only showing top 5 rows
```



```python
# Verify the data in the parquet files (immigrationDim as an example)
immigrationDim_parquet = spark.read.parquet(S3_output_path + "immigrationDim.parquet")
print("Number of records in immigrationDim table:", immigrationDim_parquet.count())
```


### Step 5: Complete Project Write Up

* Clearly state the rationale for the choice of tools and technologies for the project.  
We used Apache Spark to read, clean, and transform the data and to create the parquet files. Spark's schema-on-read approach lets us do all the transformations without loading the data into a separate database first. With Spark, we could process the raw files with DataFrames and SQL as if we were working with a traditional database.

* Propose how often the data should be updated and why.  
Because new immigration data arrives every month, the ETL pipeline can run monthly.
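A monthly run has to pick the right input file. This sketch assumes every monthly file follows the naming of the August 2016 file read in Step 1 (`i94_<mon><yy>_sub.sas7bdat`) and lives in the same directory; both the pattern and the hypothetical helpers `previous_month` and `monthly_i94_file` are assumptions, not part of the original pipeline:

```python
from datetime import date, timedelta

# Assumption: monthly files are named like i94_aug16_sub.sas7bdat
MONTH_ABBR = ("jan", "feb", "mar", "apr", "may", "jun",
              "jul", "aug", "sep", "oct", "nov", "dec")
DATA_DIR = "../../data/18-83510-I94-Data-2016"  # directory used in Step 1


def previous_month(run_date):
    """Return (year, month) of the month before run_date, i.e. the last complete month."""
    last_day_prev = run_date.replace(day=1) - timedelta(days=1)
    return last_day_prev.year, last_day_prev.month


def monthly_i94_file(year, month, data_dir=DATA_DIR):
    """Build the input path for one month's I94 file under the naming assumption."""
    return f"{data_dir}/i94_{MONTH_ABBR[month - 1]}{year % 100:02d}_sub.sas7bdat"
```

A run on 2016-09-03 would process `previous_month(date(2016, 9, 3))`, i.e. August 2016, whose path matches the file loaded in Step 1.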

* Write a description of how you would approach the problem differently under the following scenarios:  
   * The data was increased by 100x.  
     We would run the job on a managed Spark cluster such as AWS EMR and add nodes as needed.
   * The data populates a dashboard that must be updated on a daily basis by 7am every day.  
     The ETL pipeline could be scheduled with an orchestration tool such as Apache Airflow to run daily, starting early enough to finish before 7:00 am. Slower-changing sources such as the Airlines data could be refreshed yearly.
   * The database needed to be accessed by 100+ people.  
     We could store the data in a distributed database such as Cassandra or Redshift and add nodes when needed.

