# Project Title
### Data Engineering Capstone Project

#### Project Summary
This project builds a data lake ETL pipeline that processes, cleans, and stores US I94 immigration data. The resulting tables can be used to analyze immigration flows into the USA.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
import pandas as pd
import re
import os
import glob
import configparser
from datetime import datetime, timedelta, date
from pyspark.sql import SparkSession
from pyspark.sql import types as t
import pyspark.sql.functions as func
from pyspark.sql.functions import udf, col, monotonically_increasing_id
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format

In [2]:
config = configparser.ConfigParser()
config.read('dl.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['aws_key']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['aws_secret']

output_data = config['AWS']['out_data']

input_I94 = config['INPUT_DATA']['i94_data']
input_airport = config['INPUT_DATA']['airport_data']
input_country = config['INPUT_DATA']['country_code']
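
The cell above expects a `dl.cfg` file with an `AWS` and an `INPUT_DATA` section. A minimal sketch of such a file and an early validation step could look like the following. The section and key names are taken from the cell above; all values are placeholders, and `load_config`, `REQUIRED_KEYS`, and `SAMPLE_CFG` are hypothetical names used only for illustration.

```python
import configparser

# Hypothetical contents of dl.cfg. Section and key names follow the cell above;
# every value is a placeholder and must be replaced with real settings.
SAMPLE_CFG = """
[AWS]
aws_key = YOUR_ACCESS_KEY_ID
aws_secret = YOUR_SECRET_ACCESS_KEY
out_data = s3a://your-bucket/output/

[INPUT_DATA]
i94_data = immigration_data_sample.csv
airport_data = airport-codes_csv.csv
country_code = country_code.json
"""

REQUIRED_KEYS = {
    "AWS": ("aws_key", "aws_secret", "out_data"),
    "INPUT_DATA": ("i94_data", "airport_data", "country_code"),
}


def load_config(text):
    """Parse the config text and fail early if a required key is missing."""
    parser = configparser.ConfigParser()
    parser.read_string(text)
    for section, keys in REQUIRED_KEYS.items():
        for key in keys:
            if not parser.has_option(section, key):
                raise KeyError(f"dl.cfg is missing [{section}] {key}")
    return parser


config_sketch = load_config(SAMPLE_CFG)
print(config_sketch["AWS"]["out_data"])
```

Checking the keys up front gives a clear error message before any Spark job starts, instead of a `KeyError` halfway through the notebook.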

### Step 1: Scope the Project and Gather Data

#### Scope 
The scope of this project is to create a data lake solution with an ETL pipeline that processes, cleans, and stores the I94 immigration data, world weather data, and airport codes.

The purpose of the pipeline is to process the data and store it as parquet files organized in a star schema.

Tools: 
* pandas
* python
* pyspark to handle the large amount of data
* AWS S3 to store the files

#### Describe and Gather Data 

The I94 Immigration dataset must be purchased. You can order it with this order form: https://travel.trade.gov/research/cat/doc_order_form.pdf 

##### The I94 immigration data set comes from https://travel.trade.gov/research/reports/i94/historical/2016.html
* It contains immigration data for the year 2016, from January to December.
* Each file has about 3 million rows
* 28 columns (arrival date, visa types, airport, gender ...)

##### The I94_country data set also comes from https://travel.trade.gov/research/reports/i94/historical/2016.html
* It contains the country codes from the I94 file
* This dataset has 289 rows
* 2 columns (I94_code and I94_country)
  * I copied this data manually from the I94 description file, which comes with the I94 dataset
  
##### The I94_port dataset comes also from 
* It contains the arrival ports from the I94 file
* This dataset has rows
* 3 Columns (port_code, city, state\country)
  * I copied this data manually from the I94 description file, which comes with the I94 dataset

##### The Airport dataset comes from https://datahub.io/core/airport-codes#data
* It contains airport data from around the world
* This dataset has about 55,000 rows.
* 12 Columns (airporttype, Name, country, Region ...)


### Read in the data

In [3]:
# build or create a spark session
from pyspark.sql import SparkSession
spark = (
    SparkSession.builder
    .config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11")
    .enableHiveSupport()
    .getOrCreate()
)

In [4]:
# Read in the I94 data
df_i94 = spark.read.format('csv').option('header',True).load("immigration_data_sample.csv")
df_i94.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- cicid: string (nullable = true)
 |-- i94yr: string (nullable = true)
 |-- i94mon: string (nullable = true)
 |-- i94cit: string (nullable = true)
 |-- i94res: string (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: string (nullable = true)
 |-- i94mode: string (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: string (nullable = true)
 |-- i94bir: string (nullable = true)
 |-- i94visa: string (nullable = true)
 |-- count: string (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: string (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)

In [5]:
df_airport = spark.read.format('csv').option('header',True).load('airport-codes_csv.csv')
df_airport.printSchema()

root
 |-- ident: string (nullable = true)
 |-- type: string (nullable = true)
 |-- name: string (nullable = true)
 |-- elevation_ft: string (nullable = true)
 |-- continent: string (nullable = true)
 |-- iso_country: string (nullable = true)
 |-- iso_region: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- gps_code: string (nullable = true)
 |-- iata_code: string (nullable = true)
 |-- local_code: string (nullable = true)
 |-- coordinates: string (nullable = true)



In [7]:
country_spark = spark.read.json("country_code.json",multiLine=True)
country_spark.printSchema()

root
 |-- alpha_2: string (nullable = true)
 |-- alpha_3: string (nullable = true)
 |-- country_name: string (nullable = true)
 |-- numeric: string (nullable = true)



### Step 2: Explore and Assess the Data
#### Explore the Data 
##### Data quality issues:
 1. I94 Data
  * Missing data (null or NaN)
  * Data type errors (solved with a new data schema)
 2. Airport data
  * No action needed; the data comes clean
 3. Country_code
  * No action needed; the data comes clean

#### Cleaning Steps
1. I94 Data
 * Replace all null or NaN values with 0 (numeric columns) or N/D (text columns)
2. Airport Data
 * No cleaning needed
3. Country_code
 * No cleaning needed
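
The data type issue listed above comes from the CSV reader loading every column as a string, so numeric fields arrive as text such as `2016.0`. The following plain-Python sketch shows the conversion logic on a few values from the sample; `float_string_to_int` is a hypothetical helper and not part of the notebook. In Spark, a comparable result could likely be obtained by casting each column first to `double` and then to `int`.

```python
def float_string_to_int(value, default=0):
    """Convert strings such as '2016.0' to 2016; use default for missing values."""
    if value is None or str(value).strip() == "":
        return default
    return int(float(value))


# Sample values taken from the I94 sample file (all read as strings).
row = {"cicid": "4084316.0", "i94yr": "2016.0", "i94mon": "4.0", "i94bir": "61.0"}
typed_row = {name: float_string_to_int(value) for name, value in row.items()}
print(typed_row)  # {'cicid': 4084316, 'i94yr': 2016, 'i94mon': 4, 'i94bir': 61}
```

With typed columns, later filters can compare against numbers instead of strings like `'2.0'`.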

#### Clean datasets and show the first two rows

In [17]:
# Fill nulls with '0' for numeric columns and 'N/D' for string columns ('9' for i94mode)
df_i94_clean = df_i94.na.fill({'i94cit':'0', 'i94res':'0', 'i94port':'N/D', 'arrdate':'0', 'i94mode':'9', 'i94addr':'N/D', 'depdate':'0', 'i94bir':'0',\
                'i94visa':'0', 'count':'0', 'dtadfile':'N/D', 'visapost':'N/D', 'occup':'N/D', 'entdepa':'N/D', 'entdepd':'N/D',\
                'entdepu':'N/D', 'matflag':'N/D', 'biryear':'0', 'dtaddto':'0', 'gender':'N/D', 'insnum':'0', 'airline':'N/D',\
                'admnum':'0', 'fltno':'N/D', 'visatype':'N/D'})
df_i94_clean.show(2, truncate=False)

+-------+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+-------------+-----+--------+
|_c0    |cicid    |i94yr |i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear|dtaddto |gender|insnum|airline|admnum       |fltno|visatype|
+-------+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+-------------+-----+--------+
|2027561|4084316.0|2016.0|4.0   |209.0 |209.0 |HHW    |20566.0|1.0    |HI     |20573.0|61.0  |2.0    |1.0  |20160422|N/D     |N/D  |G      |O      |N/D    |M      |1955.0 |07202016|F     |0     |JL     |56582674633.0|00782|WT      |
|2171295|4422636.0|2016.0|4.0   |582.0 |582.0 |MCA    |20567.0|1.0  
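
The fill dictionary in the cell above is written out by hand. As a sketch, the same mapping could be built from two column lists, which may make it easier to see which columns receive which placeholder. The column names and fill values are copied from the cell above; `build_fill_values`, `ZERO_FILL`, and `TEXT_FILL` are hypothetical names.

```python
# Columns filled with '0' in the cell above.
ZERO_FILL = ["i94cit", "i94res", "arrdate", "depdate", "i94bir", "i94visa",
             "count", "biryear", "dtaddto", "insnum", "admnum"]
# Columns filled with 'N/D' in the cell above.
TEXT_FILL = ["i94port", "i94addr", "dtadfile", "visapost", "occup", "entdepa",
             "entdepd", "entdepu", "matflag", "gender", "airline", "fltno",
             "visatype"]


def build_fill_values(zero_cols, text_cols, overrides=None):
    """Return a column -> placeholder mapping usable with DataFrame.na.fill."""
    values = {name: "0" for name in zero_cols}
    values.update({name: "N/D" for name in text_cols})
    values.update(overrides or {})
    return values


# i94mode is the one column with its own placeholder ('9').
fill_values = build_fill_values(ZERO_FILL, TEXT_FILL, {"i94mode": "9"})
print(len(fill_values))  # 25

# With a Spark session available, the call would be the same as above:
# df_i94_clean = df_i94.na.fill(fill_values)
```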

In [35]:
# No cleaning needed
df_airport.show(10, truncate=False)

+-----+-------------+----------------------------------+------------+---------+-----------+----------+------------+--------+---------+----------+---------------------------------------+
|ident|type         |name                              |elevation_ft|continent|iso_country|iso_region|municipality|gps_code|iata_code|local_code|coordinates                            |
+-----+-------------+----------------------------------+------------+---------+-----------+----------+------------+--------+---------+----------+---------------------------------------+
|00A  |heliport     |Total Rf Heliport                 |11          |NA       |US         |US-PA     |Bensalem    |00A     |null     |00A       |-74.93360137939453, 40.07080078125     |
|00AA |small_airport|Aero B Ranch Airport              |3435        |NA       |US         |US-KS     |Leoti       |00AA    |null     |00AA      |-101.473911, 38.704022                 |
|00AK |small_airport|Lowell Field                      |450         |N

In [10]:
country_spark.show(2, truncate=False)

+-------+-------+------------+-------+
|alpha_2|alpha_3|country_name|numeric|
+-------+-------+------------+-------+
|AF     |AFG    |Afghanistan |004    |
|AL     |ALB    |Albania     |008    |
+-------+-------+------------+-------+
only showing top 2 rows



In [11]:
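# create a timestamp column from a SAS date (days since 1960-01-01)
@udf(t.TimestampType())
def get_timestamp(sas_days):
    # values arrive as strings such as '20566.0', so parse via float first;
    # the parameter is not called `date` to avoid shadowing datetime.date
    arr_float = float(sas_days)
    return datetime(1960, 1, 1) + timedelta(days=int(arr_float))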

In [18]:
df_i94_clean = df_i94_clean.withColumn('arr_date', get_timestamp(df_i94_clean.arrdate))
df_i94_clean = df_i94_clean.withColumn('dep_date', get_timestamp(df_i94_clean.depdate))
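
The `arrdate` and `depdate` columns hold SAS dates, that is, the number of days since 1960-01-01. The conversion can be tried outside Spark with the sketch below. One deviation from the UDF in the notebook is labeled here as an assumption: the sketch treats `0` (the placeholder written by the null fill) as missing and returns `None`, whereas the UDF maps it to 1960-01-01. `sas_days_to_datetime` is a hypothetical name.

```python
from datetime import datetime, timedelta

SAS_EPOCH = datetime(1960, 1, 1)


def sas_days_to_datetime(value):
    """Return the datetime for a SAS day count, or None when the value is missing."""
    if value is None:
        return None
    days = int(float(value))
    if days == 0:
        # Assumption: '0' is the null placeholder, not a real date.
        return None
    return SAS_EPOCH + timedelta(days=days)


print(sas_days_to_datetime("20566.0"))  # 2016-04-22 00:00:00
print(sas_days_to_datetime("0"))        # None
```

Returning `None` for the placeholder would keep missing departure dates from showing up as 1960-01-01 in the analytic tables; whether that is preferable depends on how the downstream queries treat nulls.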

#### Write staging tables to AWS S3 in parquet format

In [19]:
# write all datasets to .parquet files to my AWS S3 bucket
df_i94_clean.write.save(output_data + "stagging_i94.parquet", format= "parquet", mode= "overwrite")
df_airport.write.save(output_data + "stagging_airport.parquet", format= "parquet", mode= "overwrite")
country_spark.write.save(output_data + "stagging_country.parquet", format="parquet", mode= "overwrite")
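
The paths above are built by string concatenation, which only works if `out_data` in `dl.cfg` ends with a slash. A small helper, sketched below under the hypothetical name `table_path`, would make the result independent of that detail. The bucket name is a placeholder.

```python
def table_path(base, name):
    """Join an output location and a file name with exactly one slash."""
    return base.rstrip("/") + "/" + name


print(table_path("s3a://your-bucket/output", "stagging_i94.parquet"))
print(table_path("s3a://your-bucket/output/", "stagging_i94.parquet"))
# both print: s3a://your-bucket/output/stagging_i94.parquet
```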

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
The I94 immigration data model is a star schema with four dimension tables and one fact table:

* Dimensions tables:
 * Admission table
 * Country table
 * Airports table
 * Time table
* Fact table:
 * Immigration table
![Staging Tables](Stagging_tables.png)


![Table Schema](I94_schema.png)
#### 3.2 Mapping Out Data Pipelines
1. Set the input and output data paths and the AWS credentials in `dl.cfg`.
2. The script reads the raw data into Spark dataframes, cleans it, and writes it to parquet staging files.
3. Read the staging files with Spark and extract the columns needed for the analytic tables.
4. Write the tables back as parquet files.

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [33]:
# read in the clean datasets
df_i94 = spark.read.format('parquet').load(output_data + "stagging_i94.parquet")
df_airport = spark.read.format('parquet').load(output_data + "stagging_airport.parquet")
df_country = spark.read.format('parquet').load(output_data + "stagging_country.parquet")

# add arr_date to the i94_stagging table

# extract columns to create time table
time_table = df_i94.select("arr_date",
                          dayofmonth('arr_date').alias('day'),
                          weekofyear("arr_date").alias("week"),
                          date_format("arr_date", "u").alias("weekday"),
                          month("arr_date").alias("month"),
                          year("arr_date").alias("year")
                          )

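# write time table to parquet files partitioned by year and month
# (save() returns None, so its result is not assigned back to time_table)
time_table.write.partitionBy("year","month").format('parquet').save(output_data+"time_table.parquet", mode = 'overwrite')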

# extract columns for the airport table
airport_table = df_airport.select(['type','name','iso_country','municipality', 'iata_code'])

# write airport table to parquet file
airport_table.write.format('parquet').save(output_data + 'airport_table.parquet', mode= 'overwrite')

# extract columns for admission table and rename some of them
admission_table = df_i94.select(['admnum','cicid', 'dep_date', 'gender', 'biryear', 'i94mode', 'i94visa','visatype', 'visapost'])\
                    .withColumnRenamed('dep_date','departure_date')\
                    .withColumnRenamed('biryear', 'birth_year')\
                    .withColumnRenamed('i94mode', 'mode')\
                    .withColumnRenamed('i94visa', 'visa_code')

# write admission table to parquet file
admission_table.write.format("parquet").save(output_data + 'admission_table.parquet', mode= 'overwrite')

# extract columns for country table and rename some of them
country_table = df_country.select(['country_name','alpha_2','alpha_3','numeric'])\
                    .withColumnRenamed('alpha_2', 'two_letter_code')\
                    .withColumnRenamed('alpha_3', 'three_letter_code')\
                    .withColumnRenamed('numeric', 'three_digit_code')

# write country table to parquet file
country_table.write.format('parquet').save(output_data + 'country_table.parquet', mode='overwrite')

# extract columns for immigration table and rename some of them
immigration_table = df_i94.select(
         monotonically_increasing_id().alias('id'),
         'arr_date',\
         'dep_date',\
         'i94yr', \
         'i94mon',\
         'i94port',\
         'i94cit',\
         'i94mode',\
         'admnum',\
         'i94visa',\
         'i94bir')\
         .withColumnRenamed('arr_date','arrival_date')\
         .withColumnRenamed('dep_date','departure_date')\
         .withColumnRenamed('i94yr', 'year')\
         .withColumnRenamed('i94mon', 'month')\
         .withColumnRenamed('i94port', 'airport')\
         .withColumnRenamed('i94cit', 'origin_country')\
         .withColumnRenamed('i94mode', 'mode')\
         .withColumnRenamed('i94visa', 'visa_code')\
         .withColumnRenamed('i94bir', 'age')

# write immigration table to parquet file partitioned by year and month
immigration_table.write.partitionBy('year','month').format('parquet')\
    .save(output_data + 'immigration_table.parquet', mode='overwrite')

In [50]:
# read in time_table for quality check
time_table = spark.read.format('parquet').load(output_data + "time_table.parquet")
time_table.show(2)

+-------------------+---+----+-------+----+-----+
|           arr_date|day|week|weekday|year|month|
+-------------------+---+----+-------+----+-----+
|2016-04-22 00:00:00| 22|  16|      5|2016|    4|
|2016-04-23 00:00:00| 23|  16|      6|2016|    4|
+-------------------+---+----+-------+----+-----+
only showing top 2 rows
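
As a cross-check on the output above, the same calendar fields can be derived in plain Python. This is a sketch and not part of the pipeline; `time_columns` is a hypothetical helper. It assumes that Spark's `weekofyear` follows ISO 8601 week numbering and that the `u` pattern yields the day number of the week with Monday as 1, which matches the two rows shown.

```python
from datetime import datetime


def time_columns(ts):
    """Derive the time-table fields from a datetime using ISO 8601 conventions."""
    _, iso_week, iso_weekday = ts.isocalendar()
    return {
        "day": ts.day,
        "week": iso_week,
        "weekday": iso_weekday,  # 1 = Monday ... 7 = Sunday
        "month": ts.month,
        "year": ts.year,
    }


print(time_columns(datetime(2016, 4, 22)))
# {'day': 22, 'week': 16, 'weekday': 5, 'month': 4, 'year': 2016}
```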



#### 4.2 Data Quality Checks

* Unit tests for the scripts to ensure they are doing the right thing
* Source/Count checks to ensure completeness

In [22]:
# Quality Check for admission table rows > 0
admission_table.createOrReplaceTempView("admission_table")
admission_table_check = spark.sql("""
    SELECT  COUNT(*)
    FROM admission_table
""")
admission_table_check.show(1)

+--------+
|count(1)|
+--------+
|    1000|
+--------+



In [44]:
admission = spark.sql("""
    SELECT COUNT (*)
    FROM admission_table
    WHERE admnum == ""
    LIMIT 5
    """)
admission.show()

+--------+
|count(1)|
+--------+
|       0|
+--------+



In [45]:
# Quality Check for country table rows > 0
country_table.createOrReplaceTempView("country_table")
country_table_check = spark.sql("""
        SELECT COUNT(*)
        FROM country_table
        """)
country_table_check.show(1)

+--------+
|count(1)|
+--------+
|       1|
+--------+



In [46]:
country_table = spark.sql("""
    SELECT *
    FROM country_table
    """)
country_table.show()

+--------------------+---------------+-----------------+----------------+
|        country_name|two_letter_code|three_letter_code|three_digit_code|
+--------------------+---------------+-----------------+----------------+
|United States of ...|             US|              USA|             840|
+--------------------+---------------+-----------------+----------------+



In [47]:
# Quality Check for airport table rows > 0
airport_table.createOrReplaceTempView("airport_table")
airport_table_check = spark.sql("""
        SELECT COUNT(*)
        FROM airport_table
        """)
airport_table_check.show(1)

+--------+
|count(1)|
+--------+
|   55075|
+--------+



In [48]:
air_table = spark.sql("""
    SELECT COUNT (*)
    FROM airport_table
    WHERE iso_country == 'US'
    LIMIT 5
    """)
air_table.show()

+--------+
|count(1)|
+--------+
|   22757|
+--------+



In [51]:
# Quality Check for time table rows > 0
time_table.createOrReplaceTempView("time_table")
time_table_check = spark.sql("""
        SELECT COUNT(*)
        FROM time_table
        """)
time_table_check.show(1)

+--------+
|count(1)|
+--------+
|    1000|
+--------+



In [52]:
time = spark.sql("""
    SELECT COUNT (*)
    FROM time_table
    WHERE day == '23'
""")
time.show()

+--------+
|count(1)|
+--------+
|      37|
+--------+



In [53]:
# Quality Check for immigration table rows > 0
immigration_table.createOrReplaceTempView("immigration_table")
immigration_table_check = spark.sql("""
        SELECT COUNT(*)
        FROM immigration_table
        """)
immigration_table_check.show(1)

+--------+
|count(1)|
+--------+
|    1000|
+--------+



In [67]:
immi_table = spark.sql("""
    SELECT COUNT (*)
    FROM immigration_table
    WHERE visa_code == '2.0' AND mode == '3.0'
    """)
immi_table.show()

+--------+
|count(1)|
+--------+
|      23|
+--------+
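
The row count checks above are run and inspected one table at a time. A minimal sketch of an automated version follows: it calls `count()` on each table and raises an error if any table is empty. `check_row_counts` is a hypothetical helper, and `FakeTable` is a stand-in for a Spark DataFrame so that the sketch runs without a Spark session; the counts are the ones reported above.

```python
def check_row_counts(tables, minimum=1):
    """Return {name: count}; raise ValueError if a table has fewer than minimum rows."""
    counts = {name: table.count() for name, table in tables.items()}
    failed = {name: n for name, n in counts.items() if n < minimum}
    if failed:
        raise ValueError(f"Row count check failed: {failed}")
    return counts


class FakeTable:
    """Stand-in for a Spark DataFrame; only count() is needed here."""

    def __init__(self, rows):
        self._rows = rows

    def count(self):
        return self._rows


result = check_row_counts({
    "admission_table": FakeTable(1000),
    "airport_table": FakeTable(55075),
})
print(result)  # {'admission_table': 1000, 'airport_table': 55075}
```

In the notebook, the dictionary would hold the real dataframes, for example `{"admission_table": admission_table, "time_table": time_table}`.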



#### 4.3 Data dictionary 
The data dictionary is provided as an Excel file named Data Dictionary.xlsx.

### Step 5: Complete Project Write Up

#### Why did I choose these tools?
* Pandas: one of the most widely used data manipulation libraries built on Python
* Python: one of the most common programming languages for data analysis
* PySpark (Spark): Spark can process large amounts of data (big data) quickly
* AWS S3: AWS lets you store large amounts of data in the cloud at relatively low cost

#### How often should the dataset be updated?
* The data is published monthly, so it should be updated monthly

#### What should I do in the following scenarios?
* The data was increased by 100x
 * In this case you should scale up the AWS resources (more storage and more compute power, i.e. CPUs)
* The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * In this case you can implement a DAG that updates the data on a daily basis
* The database needed to be accessed by 100+ people.
 * If the data is already stored in the cloud, this should not be a problem, but you can also create additional clusters for access