# US I94 Immigration Insights 
### Data Engineering Capstone Project

#### Project Summary
This Project creates a Data Lake type of ETL pipeline to process, clean and store data related to US I94 Immigration data. Data can be used to analyse immigration flow to and from US through different airports.  

The project follows the follow steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Import necessary libraries
import pandas as pd
import re
from pyspark.sql import SparkSession
import os
import glob
import configparser
from datetime import datetime, timedelta, date
from pyspark.sql import types as t
from pyspark.sql.functions import udf, col, monotonically_increasing_id
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear

In [2]:
# Read config file
config = configparser.ConfigParser()
config.read_file(open('dl.cfg'))

os.environ["AWS_ACCESS_KEY_ID"]= config['AWS']['AWS_ACCESS_KEY_ID']
os.environ["AWS_SECRET_ACCESS_KEY"]= config['AWS']['AWS_SECRET_ACCESS_KEY']

# NOTE: Use these if using AWS S3 as a storage
INPUT_DATA = config['AWS']['INPUT_DATA']
OUTPUT_DATA = config['AWS']['OUTPUT_DATA']

# NOTE: Use these if using local storage
INPUT_DATA_LOCAL          = config['LOCAL']['INPUT_DATA_LOCAL']
INPUT_DATA_I94_LOCAL      = config['LOCAL']['INPUT_DATA_I94_LOCAL']
INPUT_DATA_AIRPORT_LOCAL  = config['LOCAL']['INPUT_DATA_AIRPORT_LOCAL']
INPUT_DATA_COUNTRY_LOCAL  = config['LOCAL']['INPUT_DATA_COUNTRY_LOCAL']
INPUT_DATA_AIRPORT_I94_LOCAL  = config['LOCAL']['INPUT_DATA_AIRPORT_I94_LOCAL']
INPUT_DATA_COUNTRY_I94_LOCAL  = config['LOCAL']['INPUT_DATA_COUNTRY_I94_LOCAL']
OUTPUT_DATA_LOCAL         = config['LOCAL']['OUTPUT_DATA_LOCAL']

# NOTE: Use these when storing data on server.
INPUT_DATA_I94_SERVER     = config['SERVER']['INPUT_DATA_I94_SERVER']
INPUT_DATA_AIRPORT_SERVER = config['SERVER']['INPUT_DATA_AIRPORT_SERVER']
INPUT_DATA_COUNTRY_SERVER = config['SERVER']['INPUT_DATA_COUNTRY_SERVER']
OUTPUT_DATA_SERVER        = config['SERVER']['OUTPUT_DATA_SERVER']

# NOTE: Use these when storing data on AWS.
INPUT_DATA_I94            = config['AWS']['INPUT_DATA_I94']
INPUT_DATA_AIRPORT        = config['AWS']['INPUT_DATA_AIRPORT']
INPUT_DATA_COUNTRY        = config['AWS']['INPUT_DATA_COUNTRY']
OUTPUT_DATA               = config['AWS']['OUTPUT_DATA']

DATA_LOCATION             = config['COMMON']['DATA_LOCATION']
DATA_STORAGE              = config['COMMON']['DATA_STORAGE']

#print(AWS_ACCESS_KEY_ID)
#print(AWS_SECRET_ACCESS_KEY)

print(INPUT_DATA_LOCAL)
print(INPUT_DATA_I94_LOCAL)
print(INPUT_DATA_AIRPORT_LOCAL)
print(INPUT_DATA_COUNTRY_LOCAL)
print(INPUT_DATA_AIRPORT_I94_LOCAL)
print(INPUT_DATA_COUNTRY_I94_LOCAL)
print(OUTPUT_DATA_LOCAL)
print(DATA_LOCATION)
print(DATA_STORAGE)

data/
data/18-83510-I94-Data-2016/i94_jan16_sub.sas7bdat
data/airport_codes.csv
data/iso-3166-country-codes.csv
data/i94_airport_codes.xlsx
data/i94_country_codes.xlsx
data/output_data/
local
parquet


### Step 1: Scope the Project and Gather Data

#### Scope 
_Explain what you plan to do in the project in more detail. What data do you use? What is your end solution look like? What tools did you use? etc_

Scope of the project is to create an ETL pipeline for processing, cleaning and storing data related to US I94 immigration data, and country codes.  

Output of the ETL pipeline: processed data stored in Star schema model to parquet files. 
Tools: python, pandas, pyspark

#### Describe and Gather Data 
_Describe the data sets you're using. Where did it come from? What type of information is included?_

Project's data contains the following pieces:
* **data/18-83510-I94-Data-2016/**: US I94 immigration data from 2016 (Jan-Dec).
    * Source: https://travel.trade.gov/research/reports/i94/historical/2016.html
    * Description: I94_SAS_Labels_Descriptions.txt file contains descriptions for the I94 data.
        * I94 dataset has SAS7BDAT file per each month of the year (e.g. i94_jan16_sub.sas7bdat).
        * Each file contains about 3M rows
        * Data has 28 columns containing information about event date, arriving person, airport, airline, etc.
    * I94 immigration data example:
    * ![I94-immigration-data example](./Udacity-DEND-Project-Capstone-I94ImmigrationData-20190812-2.png)
    * NOTE: This data is behind a pay wall and need to be purchased to get access. Data is available for Udacity DEND course.
    
* **data/i94_airport_codes.xlsx**: Airport codes and related cities defined in I94 data description file.
    * Source: https://travel.trade.gov/research/reports/i94/historical/2016.html
    * Description: I94 Airport codes data contains information about different airports around the world.
        * Columns: i94port, i94_airport_name
        * Data has 660 rows and 2 columns.
    * NOTE: I94 data uses its own codes for airports instead of using standard codes (like IATA). Therefore, I94 airport codes have been taken from I94 data description file and processed for ETL use.  
    * Airport Code example:
    * ![I94-AirportCode-data example](./Udacity-DEND-Project-Capstone-I94AirportCodeData-20190813-4.png)

* **data/i94_country_codes.xlsx**: Country codes defined in US I94 Immigration data description file. 
    * Source: https://travel.trade.gov/research/reports/i94/historical/2016.html
    * Description: I94 Country codes data contains information about countries people come to US from.
        * Columns: i94cit, i94_country_code
        * Data has 289 rows and 2 columns.
    * NOTE: I94 data uses its own codes for countries instead of using ISO-3166 standard codes. Therefore, I94 country codes have been taken from I94 data description file and processed for ETL use.
    * Country Code example:
    * ![CountryCode-data example](./Udacity-DEND-Project-Capstone-I94CountryCodeData-20190813-5.png)    
  
* **data/airport-codes.csv**: Airport codes and related cities.
    * Source: https://datahub.io/core/airport-codes#data
    * Description: Airpot codes data contains information about different airports around the world.
        * Columns: Airport code, name, type, location, etc.
        * Data has 48304 rows and 12 columns.
    * Airport Code example:
    * ![AirportCode-data example](./Udacity-DEND-Project-Capstone-AirportCodeData-20190812-3.png)

* **data/iso-3166-country-codes.json**: World country codes (ISO-3166)
    * Source: https://github.com/lukes/ISO-3166-Countries-with-Regional-Codes
    ISO-3166-1 and ISO-3166-2 Country and Dependent Territories Lists with UN Regional Codes
    * ISO-3166: https://www.iso.org/iso-3166-country-codes.html
    * Country Code example:
    * ![CountryCode-data example](./Udacity-DEND-Project-Capstone-CountryCodesData-20190804-4.png)

### 1.1 Define config and read in data

In [3]:
# Set config
if DATA_LOCATION == "local":
    input_data        = INPUT_DATA_LOCAL
    i94_data          = INPUT_DATA_I94_LOCAL
    airport_codes     = INPUT_DATA_AIRPORT_LOCAL
    country_codes     = INPUT_DATA_COUNTRY_LOCAL
    airport_codes_i94 = INPUT_DATA_AIRPORT_I94_LOCAL
    country_codes_i94 = INPUT_DATA_COUNTRY_I94_LOCAL
    output_data       = OUTPUT_DATA_LOCAL
elif DATA_LOCATION == "server":
    input_data_bucket = INPUT_DATA_SERVER
    i94_data          = INPUT_DATA_I94_SERVER
    airport_codes     = INPUT_DATA_AIRPORT_SERVER
    country_codes     = INPUT_COUNTRY_SERVER
    airport_codes_i94 = INPUT_DATA_AIRPORT_I94_SERVER
    country_codes_i94 = INPUT_DATA_COUNTRY_I94_SERVER
    output_data       = OUTPUT_DATA_SERVER
elif DATA_LOCATION == "aws":
    input_data_bucket = INPUT_DATA
    i94_data          = INPUT_DATA_I94
    airport_codes     = INPUT_DATA_AIRPORT
    country_codes     = INPUT_COUNTRY
    airport_codes_i94 = INPUT_DATA_AIRPORT_I94
    country_codes_i94 = INPUT_DATA_COUNTRY_I94
    output_data       = OUTPUT_DATA
    
if DATA_STORAGE == "postgresql":
    pass
elif DATA_STORAGE == "parquet":
    data_storage      = DATA_STORAGE

In [None]:
# Read I94 immigration data:
#i94_df = pd.read_sas(i94_data, 'sas7bdat', encoding='ISO-8859-1')

In [4]:
# Read airport code data:
airport_codes_df = pd.read_csv(airport_codes, header=0, sep=',')

# Read Global country codes data:
#country_codes_df = pd.read_json(country_codes, orient="records")
country_codes_df = pd.read_csv(country_codes, header=0)

# Read I94 Airport codes data:
airport_codes_i94_df = pd.read_excel(airport_codes_i94, header=0, index_col=0)

# Read I94 Country codes data:
country_codes_i94_df = pd.read_excel(country_codes_i94, header=0, index_col=0)

### 1.2 Show data snippets

In [None]:
# I94 immigration data
#i94_df.head()

In [5]:
airport_codes_df.head()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11.0,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450.0,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820.0,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237.0,,US,US-AR,Newport,,,,"-91.254898, 35.6087"


In [12]:
country_codes_df.head()

Unnamed: 0,name,alpha-2,alpha-3,country-code,iso_3166-2,region,sub-region,intermediate-region,region-code,sub-region-code,intermediate-region-code
0,Afghanistan,AF,AFG,4,ISO 3166-2:AF,Asia,Southern Asia,,142.0,34.0,
1,Åland Islands,AX,ALA,248,ISO 3166-2:AX,Europe,Northern Europe,,150.0,154.0,
2,Albania,AL,ALB,8,ISO 3166-2:AL,Europe,Southern Europe,,150.0,39.0,
3,Algeria,DZ,DZA,12,ISO 3166-2:DZ,Africa,Northern Africa,,2.0,15.0,
4,American Samoa,AS,ASM,16,ISO 3166-2:AS,Oceania,Polynesia,,9.0,61.0,


In [7]:
airport_codes_i94_df.head()

Unnamed: 0_level_0,i94_airport_name
i94port,Unnamed: 1_level_1
'ALC',"'ALCAN, AK '"
'ANC',"'ANCHORAGE, AK '"
'BAR',"'BAKER AAF - BAKER ISLAND, AK'"
'DAC',"'DALTONS CACHE, AK '"
'PIZ',"'DEW STATION PT LAY DEW, AK'"


In [8]:
country_codes_i94_df.head()

Unnamed: 0_level_0,i94_country_name
i94cit,Unnamed: 1_level_1
582,MEXICO'
236,'AFGHANISTAN'
101,'ALBANIA'
316,'ALGERIA'
102,'ANDORRA'


### 1.3 Create Spark session

In [5]:
#spark = SparkSession.builder\
#                     .config("spark.jars.packages","org.apache.hadoop:hadoop-aws:2.7.0")\
#                     .getOrCreate()
#from pyspark.sql import SparkSession
spark = SparkSession.builder\
                    .config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11")\
                    .enableHiveSupport().getOrCreate()

### 1.4 Read I94 immigration data to Spark

In [6]:
i94_schema = t.StructType([
                            t.StructField("alpha-2", t.StringType(), False),
                            t.StructField("alpha-3", t.StringType(), False),
                            t.StructField("country-code", t.IntegerType(), False),
                            t.StructField("intermediate-region", t.StringType(), False),
                            t.StructField("intermediate-region-code", t.StringType(), False),
                            t.StructField("iso-3166-2", t.StringType(), False),
                            t.StructField("name", t.StringType(), False),
                            t.StructField("region", t.StringType(), True),
                            t.StructField("region-code", t.StringType(), True),
                            t.StructField("sub-region", t.StringType(), True),
                            t.StructField("sub-region-code", t.StringType(), True),
                        ])

In [7]:
i94_df_spark =spark.read.format('com.github.saurfang.sas.spark').load(i94_data)

In [8]:
i94_df_spark.printSchema()
i94_df_spark.show(5, truncate=False)

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

### 1.5 Read Airport code data to Spark

In [9]:
#airport_schema = t.StructType([
#                            t.StructField("dt", t.StringType(), False),
#                            t.StructField("AverageTemperature", t.FloatType(), True),
#                            t.StructField("AverageTemperatureUncertainty", t.FloatType(), True),
#                            t.StructField("City", t.StringType(), False),
#                            t.StructField("Country", t.StringType(), False),
#                            t.StructField("Latitude", t.StringType(), False),
#                            t.StructField("Longitude", t.StringType(), False),
#                        ])
airport_codes_iata_df_spark = spark.read.csv(airport_codes, header=True)

In [10]:
airport_codes_iata_df_spark.printSchema()
airport_codes_iata_df_spark.show(5, truncate=False)

root
 |-- ident: string (nullable = true)
 |-- type: string (nullable = true)
 |-- name: string (nullable = true)
 |-- elevation_ft: string (nullable = true)
 |-- continent: string (nullable = true)
 |-- iso_country: string (nullable = true)
 |-- iso_region: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- gps_code: string (nullable = true)
 |-- iata_code: string (nullable = true)
 |-- local_code: string (nullable = true)
 |-- coordinates: string (nullable = true)

+-----+-------------+----------------------------------+------------+---------+-----------+----------+------------+--------+---------+----------+-------------------------------------+
|ident|type         |name                              |elevation_ft|continent|iso_country|iso_region|municipality|gps_code|iata_code|local_code|coordinates                          |
+-----+-------------+----------------------------------+------------+---------+-----------+----------+------------+--------+---------+---

### 1.6 Read ISO Country Code data to Spark

In [45]:
country_code_schema = t.StructType([
                            t.StructField("name", t.StringType(), False),
                            t.StructField("alpha_2", t.StringType(), False),
                            t.StructField("alpha_3", t.StringType(), False),
                            t.StructField("country_code", t.StringType(), False),
                            t.StructField("iso_3166_2", t.StringType(), False),
                            t.StructField("region", t.StringType(), True),
                            t.StructField("sub_region", t.StringType(), True),
                            t.StructField("intermediate_region", t.StringType(), False),
                            t.StructField("region_code", t.StringType(), True),
                            t.StructField("sub_region_code", t.StringType(), True),
                            t.StructField("intermediate_region_code", t.StringType(), False)
                        ])
country_codes_iso_df_spark = spark.createDataFrame(country_codes_df, schema=country_code_schema)

In [46]:
country_codes_iso_df_spark.printSchema()
country_codes_iso_df_spark.show(5, truncate=False)

root
 |-- name: string (nullable = false)
 |-- alpha_2: string (nullable = false)
 |-- alpha_3: string (nullable = false)
 |-- country_code: string (nullable = false)
 |-- iso_3166_2: string (nullable = false)
 |-- region: string (nullable = true)
 |-- sub_region: string (nullable = true)
 |-- intermediate_region: string (nullable = false)
 |-- region_code: string (nullable = true)
 |-- sub_region_code: string (nullable = true)
 |-- intermediate_region_code: string (nullable = false)

+--------------+-------+-------+------------+-------------+-------+---------------+-------------------+-----------+---------------+------------------------+
|name          |alpha_2|alpha_3|country_code|iso_3166_2   |region |sub_region     |intermediate_region|region_code|sub_region_code|intermediate_region_code|
+--------------+-------+-------+------------+-------------+-------+---------------+-------------------+-----------+---------------+------------------------+
|Afghanistan   |AF     |AFG    |4      

### 1.7 Read I94 Airport code data to Spark

In [13]:
# Cleaning I94 Airport data first
ac = {"i94port_clean": [], "i94_airport_name_clean": [], "i94_state_clean": []}
codes = []
names = []
states = []
for index, row in airport_codes_i94_df.iterrows():
    y = re.sub("'", "", index)
    x = re.sub("'", "", row[0])
    z = re.sub("'", "", row[0]).split(",")
    y = y.strip()
    z[0] = z[0].strip()
    
    if len(z) == 2:
        codes.append(y)
        names.append(z[0])
        z[1] = z[1].strip()
        states.append(z[1])
    else:
        codes.append(y)
        names.append(z[0])
        states.append("NaN")

ac["i94port_clean"] = codes
ac["i94_airport_name_clean"] = names
ac["i94_state_clean"] = states

airport_codes_i94_df_clean = pd.DataFrame.from_dict(ac)

ac_path = output_data + "/airport_codes_i94_clean.csv"
airport_codes_i94_df_clean.to_csv(ac_path, sep=',')

In [14]:
airport_codes_i94_schema = t.StructType([
                            t.StructField("i94_port", t.StringType(), False),
                            t.StructField("i94_airport_name", t.StringType(), False),
                            t.StructField("i94_airport_state", t.StringType(), False)
                        ])
airport_codes_i94_df_spark = spark.createDataFrame(airport_codes_i94_df_clean, schema=airport_codes_i94_schema)

In [15]:
airport_codes_i94_df_spark.printSchema()
airport_codes_i94_df_spark.show(5, truncate=False)

root
 |-- i94_port: string (nullable = false)
 |-- i94_airport_name: string (nullable = false)
 |-- i94_airport_state: string (nullable = false)

+--------+------------------------+-----------------+
|i94_port|i94_airport_name        |i94_airport_state|
+--------+------------------------+-----------------+
|ALC     |ALCAN                   |AK               |
|ANC     |ANCHORAGE               |AK               |
|BAR     |BAKER AAF - BAKER ISLAND|AK               |
|DAC     |DALTONS CACHE           |AK               |
|PIZ     |DEW STATION PT LAY DEW  |AK               |
+--------+------------------------+-----------------+
only showing top 5 rows



### 1.8 Read I94 Country Code data to Spark

In [53]:
#country_codes_i94_df.count()

In [16]:
# Cleaning I94 Country Code data first
cc = {"i94cit_clean": [],
      "i94_country_name_clean": [],
      "iso_country_code_clean" : []
      }
ccodes = []
cnames = []
ccodes_iso = []

for index, row in country_codes_i94_df.iterrows():
    cname = re.sub("'", "", row[0]).strip()
    ccode_iso = row[1]
    ccodes.append(index)
    cnames.append(cname)
    ccodes_iso.append(ccode_iso)

cc["i94cit_clean"] = ccodes
cc["i94_country_name_clean"] = cnames
cc["iso_country_code_clean"] = ccodes_iso
country_codes_i94_df_clean = pd.DataFrame.from_dict(cc)

cc_path = input_data + "/country_codes_i94_clean.csv"
country_codes_i94_df_clean.to_csv(cc_path, sep=',')

In [17]:
country_codes_i94_schema = t.StructType([
                            t.StructField("i94_cit", t.StringType(), False),
                            t.StructField("i94_country_name", t.StringType(), False),
                            t.StructField("iso_country_code", t.StringType(), False)
                        ])
country_codes_i94_df_spark = spark.createDataFrame(country_codes_i94_df_clean, schema=country_codes_i94_schema)

In [18]:
country_codes_i94_df_spark.printSchema()
country_codes_i94_df_spark.show(5, truncate=False)

root
 |-- i94_cit: string (nullable = false)
 |-- i94_country_name: string (nullable = false)
 |-- iso_country_code: string (nullable = false)

+-------+----------------+----------------+
|i94_cit|i94_country_name|iso_country_code|
+-------+----------------+----------------+
|582    |MEXICO          |484             |
|236    |AFGHANISTAN     |4               |
|101    |ALBANIA         |8               |
|316    |ALGERIA         |12              |
|102    |ANDORRA         |20              |
+-------+----------------+----------------+
only showing top 5 rows



### 1.9 Write Spark DataFrames to parquet files

In [19]:
start_time = datetime.now().strftime('%Y-%m-%d-%H-%M-%S-%f')
print(start_time)

2019-08-21-08-25-59-465542


In [20]:
# Write I94 Immigration data to parquet file:
i94_df_path = output_data + "i94_staging.parquet" + "_" + start_time
print(f"OUTPUT: {i94_df_path}")
i94_df_spark.write.mode("overwrite").parquet(i94_df_path)
print("Writing DONE.")

# Read parquet file back to Spark:
i94_df_spark = spark.read.parquet(i94_df_path)

OUTPUT: data/output_data/i94_staging.parquet_2019-08-21-08-25-59-465542
Writing DONE.


In [42]:
#i94_df_spark.printSchema()
#i94_df_spark.show(5, truncate=False)

In [21]:
# Write I94 Airport data to parquet file:
airport_codes_i94_df_path = output_data + "airport_codes_i94_staging.parquet" + "_" + start_time
print(f"OUTPUT: {airport_codes_i94_df_path}")
airport_codes_i94_df_spark.write.mode("overwrite").parquet(airport_codes_i94_df_path)
print("Writing DONE.")

# Read parquet file back to Spark:
airport_codes_i94_df_spark = spark.read.parquet(airport_codes_i94_df_path)

OUTPUT: data/output_data/airport_codes_i94_staging.parquet_2019-08-21-08-25-59-465542
Writing DONE.


In [50]:
#airport_codes_i94_df_spark.printSchema()
#airport_codes_i94_df_spark.show(5, truncate=False)

In [22]:
# Write i94 Country data to parquet file:
country_codes_i94_df_path = output_data + "country_codes_i94_staging.parquet" + "_" + start_time
print(f"OUTPUT: {country_codes_i94_df_path}")
country_codes_i94_df_spark.write.mode("overwrite").parquet(country_codes_i94_df_path)
print("Writing DONE.")

# Read parquet file back to Spark:
country_codes_i94_df_spark = spark.read.parquet(country_codes_i94_df_path)

OUTPUT: data/output_data/country_codes_i94_staging.parquet_2019-08-21-08-25-59-465542
Writing DONE.


In [45]:
#country_codes_i94_df_spark.printSchema()
#country_codes_i94_df_spark.show(5, truncate=False)

In [23]:
# Write IATA Airport data to parquet file:
airport_codes_iata_df_path = output_data + "airport_codes_iata_staging.parquet" + "_" + start_time
print(f"OUTPUT: {airport_codes_iata_df_path}")
airport_codes_iata_df_spark.write.mode("overwrite").parquet(airport_codes_iata_df_path)
print("Writing DONE.")

# Read parquet file back to Spark:
airport_codes_iata_df_spark = spark.read.parquet(airport_codes_iata_df_path)

OUTPUT: data/output_data/airport_codes_iata_staging.parquet_2019-08-21-08-25-59-465542
Writing DONE.


In [63]:
#airport_codes_iata_df_spark.printSchema()
#airport_codes_iata_df_spark.show(5, truncate=False)

In [47]:
# Write ISO-3166 Country Code data to parquet file:
country_codes_iso_df_path = output_data + "country_codes_iso_staging.parquet" + "_" + start_time
print(f"OUTPUT: {country_codes_iso_df_path}")
country_codes_iso_df_spark.write.mode("overwrite").parquet(country_codes_iso_df_path)
print("Writing DONE.")

# Read parquet file back to Spark:
country_code_iso_df_spark = spark.read.parquet(country_codes_iso_df_path)

OUTPUT: data/output_data/country_codes_iso_staging.parquet_2019-08-21-08-33-50-378418
Writing DONE.


In [56]:
#country_codes_iso_df_spark.printSchema()
#country_codes_iso_df_spark.show(5, truncate=False)

--------------------
### Step 2: Explore and Assess the Data
#### Explore the Data 
_Identify data quality issues, like missing values, duplicate data, etc._

Input data has the following quality issues:
    
* I94 Immigration data: 
    * Most of the columns are missing some info (nulls).
    * All missing info need to be filled-in to avoid errors further in the pipeline. 
    
* I94 Airport data: 
    * Data has quote marks and extra white spaces aftwer copy-paste operation.
    * Original data was cleaned-up already before importing to Spark.
    
* I94 Country code data: 
    * Data has quote marks and extra white spaces aftwer copy-paste operation.
    * Original data was cleaned-up already before importing to Spark.

* ISO3166 Country data:
    * Antarctica (row) is missing data from some columns.

#### Cleaning Steps
_Document steps necessary to clean the data_

Input data needs the following cleaning operations:
* I94 data:
    * All missing (null) data is handled in all columns. 
    * Nulls are replaced with either NA (string), or 0.0 (double).
    
* I94 Airport data: 
    * Remove quote marks and extra white spaces from the data.
* I94 Country Code data: 
    * Remove quote marks and extra white spaces from the data.
* ISO Country Code data:
    * No action required. Antarctica is handled as a special case to avoid duplicate data.

### 2.1 Cleaning the data

#### 2.1.1 Clean I94 Immigration data

In [25]:
# Cleaning i94 data
i94_df_spark_clean = i94_df_spark.na.fill({'i94mode': 0.0, 'i94addr': 'NA','depdate': 0.0, 'i94bir': 'NA', \
                        'i94visa': 0.0, 'count': 0.0, 'dtadfile': 'NA', 'visapost': 'NA', \
                        'occup': 'NA', 'entdepa': 'NA', 'entdepd': 'NA', 'entdepu': 'NA', \
                        'matflag': 'NA', 'biryear': 0.0, 'dtaddto': 'NA', 'gender': 'NA', \
                        'insnum': 'NA', 'airline': 'NA', 'admnum': 0.0, 'fltno': 'NA', 'visatype': 'NA'})
print("Filling NULLs DONE.")

Filling NULLs DONE.


In [20]:
# No cleaning actions. All necessary columns have clean data.
#i94_df_spark_clean.createOrReplaceTempView("immigrants_table_DF")
#immigrants_table_check = spark.sql("""
#    SELECT  cicid, i94yr, i94mon, i94cit, i94res, i94port, arrdate, \
#            i94mode, airline, fltno, depdate, i94bir, i94visa, gender,  \
#            visatype, admnum
#    FROM immigrants_table_DF
#    WHERE   cicid == null OR arrdate == null OR i94port == null \
#            OR fltno == null OR i94mode == null OR admnum == null \
#            OR gender == null OR admnum == null 
#    ORDER BY arrdate
#""")
#immigrants_table_check.printSchema()
#immigrants_table_check.show(20)

In [123]:
#i94_df_spark_clean.show(10, truncate=False)

+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+------------+-----+--------+---------+----------+-------------------+
|cicid|i94yr |i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear|dtaddto |gender|insnum|airline|admnum      |fltno|visatype|person_id|i94res_str|arrival_ts         |
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+------------+-----+--------+---------+----------+-------------------+
|7.0  |2016.0|1.0   |101.0 |101.0 |BOS    |20465.0|1.0    |MA     |0.0    |20.0  |3.0    |1.0  |NA      |NA      |NA   |T      |NA     |NA     |NA     |1996.0 |D/S     |M     |NA    |LH     |3.46608285E8|424  |F1     

#### 2.1.2 Clean I94 Airport data

In [28]:
# No further cleaning required.
airport_codes_i94_df_clean.head(10)

Unnamed: 0,i94port_clean,i94_airport_name_clean,i94_state_clean
0,ALC,ALCAN,AK
1,ANC,ANCHORAGE,AK
2,BAR,BAKER AAF - BAKER ISLAND,AK
3,DAC,DALTONS CACHE,AK
4,PIZ,DEW STATION PT LAY DEW,AK
5,DTH,DUTCH HARBOR,AK
6,EGL,EAGLE,AK
7,FRB,FAIRBANKS,AK
8,HOM,HOMER,AK
9,HYD,HYDER,AK


#### 2.1.3 Clean I94 Country Code data

In [27]:
# No further cleaning required.
country_codes_i94_df_clean.head(10)

Unnamed: 0,i94cit_clean,i94_country_name_clean,iso_country_code_clean
0,582,MEXICO,484
1,236,AFGHANISTAN,4
2,101,ALBANIA,8
3,316,ALGERIA,12
4,102,ANDORRA,20
5,324,ANGOLA,24
6,529,ANGUILLA,660
7,518,ANTIGUA-BARBUDA,28
8,687,ARGENTINA,32
9,151,ARMENIA,51


#### 2.1.4 Clean ISO Country Codes data

In [48]:
country_codes_iso_df_spark.printSchema()
country_codes_iso_df_spark.show(5, truncate=False)
country_codes_iso_df_spark.count()

root
 |-- name: string (nullable = false)
 |-- alpha_2: string (nullable = false)
 |-- alpha_3: string (nullable = false)
 |-- country_code: string (nullable = false)
 |-- iso_3166_2: string (nullable = false)
 |-- region: string (nullable = true)
 |-- sub_region: string (nullable = true)
 |-- intermediate_region: string (nullable = false)
 |-- region_code: string (nullable = true)
 |-- sub_region_code: string (nullable = true)
 |-- intermediate_region_code: string (nullable = false)

+--------------+-------+-------+------------+-------------+-------+---------------+-------------------+-----------+---------------+------------------------+
|name          |alpha_2|alpha_3|country_code|iso_3166_2   |region |sub_region     |intermediate_region|region_code|sub_region_code|intermediate_region_code|
+--------------+-------+-------+------------+-------------+-------+---------------+-------------------+-----------+---------------+------------------------+
|Afghanistan   |AF     |AFG    |4      

249

In [53]:
# Cleaning ISO-3166 country code data
country_codes_iso_df_spark_clean = country_codes_iso_df_spark\
                                        .na.fill({  'name': 'NA', \
                                                    'alpha_2': 'NA', \
                                                    'alpha_3': 'NA', \
                                                    'country_code': 0, \
                                                    'iso_3166_2': 'NA', \
                                                    'region': 'NA', \
                                                    'sub_region': 'NA', \
                                                    'intermediate_region': 'NA', \
                                                    'region_code': 'NA', \
                                                    'sub_region_code': 'NA', \
                                                    'intermediate_region_code': 'NA'})
print("Filling NULLs DONE.")

Filling NULLs DONE.


In [55]:
#country_codes_iso_df_spark_clean.show(20)

---------
### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
_Map out the conceptual data model and explain why you chose that model_

I94 Immigration Insights data models is a star models consisting of 4 Dimensions table and 1 Fact table:
  * Dimensions tables:
      * admissions table
      * countries table
      * airports table
      * time table
  * Fact table:
      * immigrations table
      
ERD for the project:
![I94-ImmigrationInsights data schema as ER Diagram](./Udacity-DEND-Project-Capstone-ERD-20190820v11.png)

#### 3.2 Mapping Out Data Pipelines
_List the steps necessary to pipeline the data into the chosen data model_
* First, ETL script reads in configuration settings (dl.cfg). Script also re-orders I94 inout files to process them in right order (Jan => Dec).
* ETL script takes input data (I94 data, I94 country data, I94 airport data, ISO-3166 country data, IATA airport data).
* Raw input data is read into pandas dataframe, and from there to Spark dataframe and stored into parquet staging files.
* Staging parquet files are read back to Spark dataframes and cleaned (when necessary) and some further data is extracted from the original data.
* Each star schema table is processed in order: admissions => countries => airports => time => immigrations
* Finally, data quality checks are run for each table to validate the output (key columns don't have nulls, each table has content). A summary of the quality check is provided and written in console.

-----------
### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [34]:
# Write code here
start_time = datetime.now().strftime('%Y-%m-%d-%H-%M-%S-%f')
print(start_time)

2019-08-21-08-33-50-378418


#### 4.1.1 Create admissions table + write to parquet file

In [35]:
# Create table
#i94_df_spark = i94_df_spark.withColumn("person_id", monotonically_increasing_id())
i94_df_spark_clean.createOrReplaceTempView("admissions_table_DF")
admissions_table = spark.sql("""
    SELECT  DISTINCT admnum   AS admission_nbr,
                     i94res   AS country_code, 
                     i94bir   AS age, 
                     i94visa  AS visa_code, 
                     visatype AS visa_type, 
                     gender   AS person_gender
    FROM admissions_table_DF
    ORDER BY country_code
""")
admissions_table.printSchema()
#admissions_table.show(20)

root
 |-- admission_nbr: double (nullable = false)
 |-- country_code: double (nullable = true)
 |-- age: double (nullable = true)
 |-- visa_code: double (nullable = false)
 |-- visa_type: string (nullable = false)
 |-- person_gender: string (nullable = false)



In [36]:
# Write admissions_table to parquet file:
admissions_table_path = output_data + "admissions_table.parquet" + "_" + start_time
print(f"OUTPUT: {admissions_table_path}")
admissions_table.write.mode("overwrite").parquet(admissions_table_path)
print("Writing DONE.")

# Read parquet file back to Spark:
admissions_table_df = spark.read.parquet(admissions_table_path)

OUTPUT: data/output_data/admissions_table.parquet_2019-08-21-08-33-50-378418
Writing DONE.


#### 4.1.2 Create countries table + write to parquet file

In [42]:
country_codes_i94_df_spark.printSchema()
country_codes_i94_df_spark.show(5)
country_codes_i94_df_spark.count()

root
 |-- i94_cit: string (nullable = true)
 |-- i94_country_name: string (nullable = true)
 |-- iso_country_code: string (nullable = true)

+-------+--------------------+----------------+
|i94_cit|    i94_country_name|iso_country_code|
+-------+--------------------+----------------+
|    527|TURKS AND CAICOS ...|             796|
|    420|              TUVALU|             798|
|    352|              UGANDA|             800|
|    162|             UKRAINE|             804|
|    296|UNITED ARAB EMIRATES|             784|
+-------+--------------------+----------------+
only showing top 5 rows



289

In [56]:
country_codes_iso_df_spark.printSchema()
country_codes_iso_df_spark.show(5)
country_codes_iso_df_spark.count()

root
 |-- name: string (nullable = false)
 |-- alpha_2: string (nullable = false)
 |-- alpha_3: string (nullable = false)
 |-- country_code: string (nullable = false)
 |-- iso_3166_2: string (nullable = false)
 |-- region: string (nullable = true)
 |-- sub_region: string (nullable = true)
 |-- intermediate_region: string (nullable = false)
 |-- region_code: string (nullable = true)
 |-- sub_region_code: string (nullable = true)
 |-- intermediate_region_code: string (nullable = false)

+--------------+-------+-------+------------+-------------+-------+---------------+-------------------+-----------+---------------+------------------------+
|          name|alpha_2|alpha_3|country_code|   iso_3166_2| region|     sub_region|intermediate_region|region_code|sub_region_code|intermediate_region_code|
+--------------+-------+-------+------------+-------------+-------+---------------+-------------------+-----------+---------------+------------------------+
|   Afghanistan|     AF|    AFG|       

249

In [57]:
# Join tables
country_codes_i94_df_spark_joined = country_codes_i94_df_spark\
                                        .join(country_codes_iso_df_spark, \
                                            (country_codes_i94_df_spark.iso_country_code == \
                                                    country_codes_iso_df_spark.country_code))

In [58]:
# Create table
country_codes_i94_df_spark_joined.createOrReplaceTempView("countries_table_DF")
countries_table = spark.sql("""
        SELECT DISTINCT i94_cit          AS country_code,
                        i94_country_name AS country_name,
                        iso_country_code AS iso_ccode,
                        alpha_2          AS iso_alpha_2,
                        alpha_3          AS iso_alpha_3,
                        iso_3166_2       AS iso_3166_2_code,
                        name             AS iso_country_name,
                        region           AS iso_region,
                        sub_region       AS iso_sub_region,
                        region_code      AS iso_region_code,
                        sub_region_code  AS iso_sub_region_code
        FROM countries_table_DF          AS countries
        ORDER BY country_name
    
""")
countries_table.printSchema()
countries_table.show(20)
countries_table.count()

root
 |-- country_code: string (nullable = true)
 |-- country_name: string (nullable = true)
 |-- iso_ccode: string (nullable = true)
 |-- iso_alpha_2: string (nullable = false)
 |-- iso_alpha_3: string (nullable = false)
 |-- iso_3166_2_code: string (nullable = false)
 |-- iso_country_name: string (nullable = false)
 |-- iso_region: string (nullable = true)
 |-- iso_sub_region: string (nullable = true)
 |-- iso_region_code: string (nullable = true)
 |-- iso_sub_region_code: string (nullable = true)

+------------+---------------+---------+-----------+-----------+---------------+-------------------+----------+--------------------+---------------+-------------------+
|country_code|   country_name|iso_ccode|iso_alpha_2|iso_alpha_3|iso_3166_2_code|   iso_country_name|iso_region|      iso_sub_region|iso_region_code|iso_sub_region_code|
+------------+---------------+---------+-----------+-----------+---------------+-------------------+----------+--------------------+---------------+--------

229

In [59]:
# Write countries_table to parquet file:
countries_table_path = output_data + "countries_table.parquet" + "_" + start_time
print(f"OUTPUT: {countries_table_path}")
countries_table.write.mode("overwrite").parquet(countries_table_path)
print("Writing DONE.")

# Read parquet file back to Spark:
countries_table_df = spark.read.parquet(countries_table_path)


OUTPUT: data/output_data/countries_table.parquet_2019-08-21-08-33-50-378418
Writing DONE.


#### 4.1.3 Create airports table + write to parquet file

In [32]:
#airport_codes_i94_df_spark.printSchema()
#airport_codes_i94_df_spark.show(15, truncate=False)

root
 |-- i94port: string (nullable = true)
 |-- i94_airport_name: string (nullable = true)
 |-- i94_airport_state: string (nullable = true)

+-------+--------------------------+-----------------+
|i94port|i94_airport_name          |i94_airport_state|
+-------+--------------------------+-----------------+
|ELM    |REGIONAL ARPT - HORSEHEAD |NY               |
|ROC    |ROCHESTER                 |NY               |
|ROU    |ROUSES POINT              |NY               |
|SWF    |STEWART - ORANGE CNTY     |NY               |
|SYR    |SYRACUSE                  |NY               |
|THO    |THOUSAND ISLAND BRIDGE    |NY               |
|TRO    |TROUT RIVER               |NY               |
|WAT    |WATERTOWN                 |NY               |
|HPN    |WESTCHESTER - WHITE PLAINS|NY               |
|WRB    |WHIRLPOOL BRIDGE          |NY               |
|YOU    |YOUNGSTOWN                |NY               |
|AKR    |AKRON                     |OH               |
|ATB    |ASHTABULA               

In [60]:
# Create table
airport_codes_i94_df_spark.createOrReplaceTempView("airports_table_DF")
airports_table = spark.sql("""
    SELECT DISTINCT  i94_port          AS airport_id, 
                     i94_airport_name  AS airport_name,
                     i94_airport_state AS airport_state
    FROM airports_table_DF             AS airports
    ORDER BY airport_name
""")

airports_table.printSchema()
airports_table.show(20)
airports_table.count()

root
 |-- airport_id: string (nullable = true)
 |-- airport_name: string (nullable = true)
 |-- airport_state: string (nullable = true)

+----------+--------------------+-------------+
|airport_id|        airport_name|airport_state|
+----------+--------------------+-------------+
|       ABE|            ABERDEEN|           WA|
|       ADS|ADDISON AIRPORT- ...|           TX|
|       AGA|               AGANA|           GU|
|       AGU|           AGUADILLA|           PR|
|       BOI|AIR TERM. (GOWEN ...|           ID|
|       AKR|               AKRON|           OH|
|       CAK|               AKRON|           OH|
|       ALA|          ALAMAGORDO|     NM (BPS)|
|       ALB|              ALBANY|           NY|
|       CHO|ALBEMARLE CHARLOT...|           VA|
|       ABQ|         ALBUQUERQUE|           NM|
|       ABG|              ALBURG|           VT|
|       ABS|      ALBURG SPRINGS|           VT|
|       ALC|               ALCAN|           AK|
|       AXB|      ALEXANDRIA BAY|           NY|

660

In [61]:
# Write airports_table to parquet file:
airports_table_path = output_data + "airports_table.parquet" + "_" + start_time
print(f"OUTPUT: {airports_table_path}")
airports_table.write.mode("overwrite").parquet(airports_table_path)
print("Writing DONE.")

# Read parquet file back to Spark:
airports_table_df = spark.read.parquet(airports_table_path)

OUTPUT: data/output_data/airports_table.parquet_2019-08-21-08-33-50-378418
Writing DONE.


#### 4.1.4 Create time table + write to parquet file

In [62]:
@udf(t.TimestampType())
def get_timestamp (arrdate):
    arrdate_int = int(arrdate)
    return (datetime(1960,1,1) + timedelta(days=arrdate_int))
    
i94_df_spark_clean = i94_df_spark_clean.withColumn("arrival_time", get_timestamp(i94_df_spark.arrdate))
print("New column creation DONE.")

New column creation DONE.


In [180]:
#i94_df_spark_clean.printSchema()

In [63]:
i94_df_spark_clean.createOrReplaceTempView("time_table_DF")
time_table = spark.sql("""
    SELECT DISTINCT  arrival_time           AS arrival_ts, 
                     hour(arrival_time)       AS hour, 
                     day(arrival_time)        AS day, 
                     weekofyear(arrival_time) AS week,
                     month(arrival_time)      AS month,
                     year(arrival_time)       AS year,
                     dayofweek(arrival_time)  AS weekday
    FROM time_table_DF
""")
time_table.printSchema()
#time_table.show(5, truncate=False)

root
 |-- arrival_ts: timestamp (nullable = true)
 |-- hour: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- weekday: integer (nullable = true)



In [64]:
# Write time_table to parquet file:
time_table_path = output_data + "time_table.parquet" + "_" + start_time
print(f"OUTPUT: {time_table_path}")
time_table.write.mode("append").partitionBy("year", "month")\
                                  .parquet(time_table_path)
print("Writing DONE.")

# Read parquet file back to Spark:
time_table_df = spark.read.parquet(time_table_path)

OUTPUT: data/output_data/time_table.parquet_2019-08-21-08-33-50-378418
Writing DONE.


#### 4.1.5 Create immigrations table + write to parquet file

In [181]:
#i94_df_spark_clean.printSchema()

In [182]:
#countries_table_df.printSchema()

In [None]:
# Create "depdate_ts" column at this phase (before joining tables)
# Create "immigration_id" column at this phase (before joining tables)

In [65]:
i94_df_spark_joined = i94_df_spark_clean.join(country_codes_i94_df_spark, \
                                              (i94_df_spark_clean.i94cit == country_codes_i94_df_spark.i94_cit))\
                                        .join(airport_codes_i94_df_spark, \
                                              (i94_df_spark_clean.i94port == airport_codes_i94_df_spark.i94_port))\
                                        .join(time_table_df, \
                                                            i94_df_spark_clean.arrival_time == \
                                                            time_table_df.arrival_ts)

In [66]:
i94_df_spark_joined.printSchema()
#i94_df_spark_joined.show(5, truncate=False)

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = false)
 |-- i94addr: string (nullable = false)
 |-- depdate: double (nullable = false)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = false)
 |-- count: double (nullable = false)
 |-- dtadfile: string (nullable = false)
 |-- visapost: string (nullable = false)
 |-- occup: string (nullable = false)
 |-- entdepa: string (nullable = false)
 |-- entdepd: string (nullable = false)
 |-- entdepu: string (nullable = false)
 |-- matflag: string (nullable = false)
 |-- biryear: double (nullable = false)
 |-- dtaddto: string (nullable = false)
 |-- gender: string (nullable = false)
 |-- insnum: string (nullable = false)
 |-- airline: string (nullable = false)
 |-- admnum: do

In [67]:
i94_df_spark_joined = i94_df_spark_joined.withColumn("immigration_id", monotonically_increasing_id())
print("New column creation DONE.")

New column creation DONE.


In [183]:
#i94_df_spark_joined.printSchema()
#i94_df_spark_joined.show(5, truncate=False)

In [68]:
#i94_df_spark_joined.createOrReplaceTempView("immigrants_table_DF")
#immigrants_table_check = spark.sql("""
#    SELECT  cicid, i94yr, i94mon, i94cit, i94res, i94port, arrdate, \
#            i94mode, airline, fltno, depdate, i94bir, i94visa, gender,  \
#            visatype, admnum
#    FROM immigrants_table_DF
#    WHERE   cicid == null OR arrdate == null OR i94port == null \
#            OR fltno == null OR i94mode == null OR admnum == null \
#            OR gender == null OR admnum == null OR depdate == null
#    ORDER BY arrdate
#""")
#immigrants_table_check.printSchema()
#immigrants_table_check.show(20)

In [69]:
@udf(t.TimestampType())
def get_timestamp2 (depdate):
    if depdate == "null":
        depdate_int = 0
    else:
        depdate_int = int(depdate)
    return (datetime(1960,1,1) + timedelta(days=depdate_int))

i94_df_spark_joined = i94_df_spark_joined.withColumn("departure_date", get_timestamp2(i94_df_spark_joined.depdate))
print("New column creation DONE.")

New column creation DONE.


In [72]:
#i94_df_spark_joined.printSchema()

In [73]:

i94_df_spark_joined.createOrReplaceTempView("immigrations_table_DF")
immigrations_table = spark.sql("""
    SELECT DISTINCT  immigration_id AS immigration_id, 
                     arrival_time   AS arrival_time,
                     year           AS arrival_year,
                     month          AS arrival_month,
                     i94_port       AS airport_id,
                     i94_cit        AS country_code,
                     admnum         AS admission_nbr,
                     i94mode        AS arrival_mode,
                     departure_date AS departure_date,
                     airline        AS airline,
                     fltno          AS flight_nbr
                    
    FROM immigrations_table_DF immigrants
    ORDER BY arrival_time
""")
immigrations_table.printSchema()

root
 |-- immigration_id: long (nullable = false)
 |-- arrival_time: timestamp (nullable = true)
 |-- arrival_year: integer (nullable = true)
 |-- arrival_month: integer (nullable = true)
 |-- airport_id: string (nullable = true)
 |-- country_code: string (nullable = true)
 |-- admission_nbr: double (nullable = false)
 |-- arrival_mode: double (nullable = false)
 |-- departure_date: timestamp (nullable = true)
 |-- airline: string (nullable = false)
 |-- flight_nbr: string (nullable = false)



In [74]:
#immigrations_table.show(5, truncate=False)

+--------------+-------------------+------------+-------------+----------+------------+-------------+------------+-------------------+-------+----------+
|immigration_id|arrival_time       |arrival_year|arrival_month|airport_id|country_code|admission_nbr|arrival_mode|departure_date     |airline|flight_nbr|
+--------------+-------------------+------------+-------------+----------+------------+-------------+------------+-------------------+-------+----------+
|11714         |2016-01-01 00:00:00|2016        |1            |SPM       |135         |2.85902085E8 |1.0         |1960-01-01 00:00:00|BA     |243       |
|33049         |2016-01-01 00:00:00|2016        |1            |NYC       |245         |2.85799385E8 |1.0         |2016-03-19 00:00:00|KE     |85        |
|21283         |2016-01-01 00:00:00|2016        |1            |SFR       |213         |2.83135585E8 |1.0         |2016-01-03 00:00:00|AI     |173       |
|3945          |2016-01-01 00:00:00|2016        |1            |SPM       |11

In [98]:
# Write immigrants_table to parquet file:
immigrations_table_path = output_data + "immigrations_table.parquet" + "_" + start_time
print(f"OUTPUT: {immigrations_table_path}")
immigrations_table.write.mode("append").partitionBy("arrival_year", "arrival_month")\
                                          .parquet(immigrations_table_path)
print("Writing DONE.")

# Read parquet file back to Spark:
immigrations_table_df = spark.read.parquet(immigrations_table_path)

OUTPUT: data/output_data/immigrations_table.parquet_2019-08-21-08-33-50-378418
Writing DONE.


#### 4.2 Data Quality Checks
_Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:_
 * _Integrity constraints on the relational database (e.g., unique key, data type, etc.)_
 * _Unit tests for the scripts to ensure they are doing the right thing_
 * _Source/Count checks to ensure completeness_
 
_Run Quality Checks_

**Data quality checks:**
 * Check that all primary and secondary keys in star schema dimension and fact tables have values.
 * Check that all tables have more than 0 rows.

In [100]:
round_ts = start_time
results = { "round_ts": round_ts,
            "admissions_count": 0,
            "admissions": "",
            "countries_count": 0,
            "countries": "",
            "airports_count": 0,
            "airports": "",
            "time_count": 0,
            "time": "",
            "immigrations_count": 0,
            "immigrations": ""}

#### 4.2.1 Quality checks for admissions table

In [101]:
# Check that key fields have valid values (no nulls or empty)
admissions_table_df.createOrReplaceTempView("admissions_table_DF")
admissions_table_check1 = spark.sql("""
    SELECT  COUNT(*)
    FROM admissions_table_DF
    WHERE   admission_nbr IS NULL OR admission_nbr == "" OR 
            country_code IS NULL OR country_code == "" 
""")
admissions_table_check1.show(1)
admissions_table_check1.collect()[0][0]

+--------+
|count(1)|
+--------+
|       0|
+--------+



0

In [102]:
# Check that table has > 0 rows
admissions_table_df.createOrReplaceTempView("admissions_table_DF")
admissions_table_check2 = spark.sql("""
    SELECT  COUNT(*)
    FROM admissions_table_DF
""")
admissions_table_check2.show(1)
admissions_table_check2.collect()[0][0]

+--------+
|count(1)|
+--------+
| 2833480|
+--------+



2833480

In [103]:
if admissions_table_check1.collect()[0][0] > 0 & admissions_table_check2.collect()[0][0] < 1:
    results['admissions_count'] = admissions_table_check2.collect()[0][0]
    results['admissions'] = "NOK"
else:
    results['admissions_count'] = admissions_table_check2.collect()[0][0]
    results['admissions'] = "OK"

print(f"RESULTS: {results}")

RESULTS: {'round_ts': '2019-08-21-08-33-50-378418', 'admissions_count': 2833480, 'admissions': 'OK', 'countries_count': 0, 'countries': '', 'airports_count': 0, 'airports': '', 'time_count': 0, 'time': '', 'immigrations_count': 0, 'immigrations': ''}


#### 4.2.2 Quality checks for countries table

In [104]:
# Check that key fields have valid values (no nulls or empty)
countries_table_df.createOrReplaceTempView("countries_table_DF")
countries_table_check1 = spark.sql("""
    SELECT  COUNT(*)
    FROM countries_table_DF
    WHERE   country_code IS NULL OR country_code == ""
""")
countries_table_check1.show(1)
countries_table_check1.collect()[0][0]

+--------+
|count(1)|
+--------+
|       0|
+--------+



0

In [105]:
# Check that table has > 0 rows
countries_table_df.createOrReplaceTempView("countries_table_DF")
countries_table_check2 = spark.sql("""
    SELECT  COUNT(*)
    FROM countries_table_DF
""")
countries_table_check2.show(1)
countries_table_check2.collect()[0][0]

+--------+
|count(1)|
+--------+
|     229|
+--------+



229

In [106]:
if countries_table_check1.collect()[0][0] > 0 & countries_table_check2.collect()[0][0] < 1:
    results['countries_count'] = countries_table_check2.collect()[0][0]
    results['countries'] = "NOK"
else:
    results['countries_count'] = countries_table_check2.collect()[0][0]
    results['countries'] = "OK"

print(f"RESULTS: {results}")

RESULTS: {'round_ts': '2019-08-21-08-33-50-378418', 'admissions_count': 2833480, 'admissions': 'OK', 'countries_count': 229, 'countries': 'OK', 'airports_count': 0, 'airports': '', 'time_count': 0, 'time': '', 'immigrations_count': 0, 'immigrations': ''}


#### 4.2.3 Quality checks for airports table

In [107]:
# Check that key fields have valid values (no nulls or empty)
airports_table_df.createOrReplaceTempView("airports_table_DF")
airports_table_check1 = spark.sql("""
    SELECT  COUNT(*)
    FROM airports_table_DF
    WHERE   airport_id IS NULL OR airport_id == "" OR
            airport_name IS NULL OR airport_name == ""
""")
countries_table_check2.show(1)
countries_table_check2.collect()[0][0]

+--------+
|count(1)|
+--------+
|     229|
+--------+



229

In [108]:
# Check that table has > 0 rows
airports_table_df.createOrReplaceTempView("airports_table_DF")
airports_table_check2 = spark.sql("""
    SELECT  COUNT(*)
    FROM airports_table_DF
""")
airports_table_check2.show(1)
airports_table_check2.collect()[0][0]

+--------+
|count(1)|
+--------+
|     660|
+--------+



660

In [109]:
if airports_table_check1.collect()[0][0] > 0 & airports_table_check2.collect()[0][0] < 1:
    results['airports_count'] = airports_table_check2.collect()[0][0]
    results['airports'] = "NOK"
else:
    results['airports_count'] = airports_table_check2.collect()[0][0]
    results['airports'] = "OK"

print(f"RESULTS: {results}")

RESULTS: {'round_ts': '2019-08-21-08-33-50-378418', 'admissions_count': 2833480, 'admissions': 'OK', 'countries_count': 229, 'countries': 'OK', 'airports_count': 660, 'airports': 'OK', 'time_count': 0, 'time': '', 'immigrations_count': 0, 'immigrations': ''}


#### 4.2.4 Quality checks for time table

In [110]:
# Check that key fields have valid values (no nulls or empty)
time_table_df.createOrReplaceTempView("time_table_DF")
time_table_check1 = spark.sql("""
    SELECT  COUNT(*)
    FROM time_table_DF
    WHERE   arrival_ts IS NULL OR arrival_ts == ""
""")
time_table_check1.show(1)
time_table_check1.collect()[0][0]

+--------+
|count(1)|
+--------+
|       0|
+--------+



0

In [111]:
# Check that table has > 0 rows
time_table_df.createOrReplaceTempView("time_table_DF")
time_table_check2 = spark.sql("""
    SELECT  COUNT(*)
    FROM time_table_DF
""")
time_table_check2.show(1)
time_table_check2.collect()[0][0]

+--------+
|count(1)|
+--------+
|      31|
+--------+



31

In [112]:
if time_table_check1.collect()[0][0] > 0 & time_table_check2.collect()[0][0] < 1:
    results['time_count'] = time_table_check2.collect()[0][0]
    results['time'] = "NOK"
else:
    results['time_count'] = time_table_check2.collect()[0][0]
    results['time'] = "OK"

print(f"RESULTS: {results}")

RESULTS: {'round_ts': '2019-08-21-08-33-50-378418', 'admissions_count': 2833480, 'admissions': 'OK', 'countries_count': 229, 'countries': 'OK', 'airports_count': 660, 'airports': 'OK', 'time_count': 31, 'time': 'OK', 'immigrations_count': 0, 'immigrations': ''}


#### 4.2.1 Quality checks for immigrations table

In [99]:
#immigrations_table_path = "data/output_data/immigrations_table.parquet_2019-08-15-12-22-26-417652"
#print(f"OUTPUT: {immigrations_table_path}")
#immigrations_table_df = spark.read.parquet(immigrations_table_path)

In [113]:
immigrations_table_df.count()

2450639

In [115]:
# Check that key fields have valid values (no nulls or empty)
immigrations_table_df.createOrReplaceTempView("immigrations_table_DF")
immigrations_table_check1 = spark.sql("""
    SELECT  COUNT(*)
    FROM immigrations_table_DF
        WHERE   immigration_id IS NULL OR immigration_id == "" OR
                arrival_time IS NULL OR arrival_time == "" OR
                arrival_year IS NULL OR arrival_year == "" OR
                arrival_month IS NULL OR arrival_month == "" OR
                airport_id IS NULL OR airport_id == "" OR
                country_code IS NULL OR country_code == "" OR
                admission_nbr IS NULL OR admission_nbr == ""
""")
immigrations_table_check1.show(1)
immigrations_table_check1.collect()[0][0]

+--------+
|count(1)|
+--------+
|       0|
+--------+



0

In [116]:
# Check that table has > 0 rows
immigrations_table_df.createOrReplaceTempView("immigrations_table_DF")
immigrations_table_check2 = spark.sql("""
    SELECT  COUNT(*)
    FROM immigrations_table_DF
""")
immigrations_table_check2.show(1)
immigrations_table_check2.collect()[0][0]

+--------+
|count(1)|
+--------+
| 2450639|
+--------+



2450639

In [118]:
if immigrations_table_check1.collect()[0][0] > 0 & immigrations_table_check2.collect()[0][0] < 1:
    results['immigrations_count'] = immigrations_table_check2.collect()[0][0]
    results['immigrations'] = "NOK"
else:
    results['immigrations_count'] = immigrations_table_check2.collect()[0][0]
    results['immigrations'] = "OK"

print(f"RESULTS: {results}")

RESULTS: {'round_ts': '2019-08-21-08-33-50-378418', 'admissions_count': 2833480, 'admissions': 'OK', 'countries_count': 229, 'countries': 'OK', 'airports_count': 660, 'airports': 'OK', 'time_count': 31, 'time': 'OK', 'immigrations_count': 2450639, 'immigrations': 'OK'}


#### 4.3 Data dictionary 
_Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file._

Data Dictionary for the project is described in **data_dictionary.json** file stored in the project root directory.

#### Step 5: Complete Project Write Up
* _Clearly state the rationale for the choice of tools and technologies for the project._
* _Propose how often the data should be updated and why._
* _Write a description of how you would approach the problem differently under the following scenarios:_
 * _The data was increased by 100x._
 * _The data populates a dashboard that must be updated on a daily basis by 7am every day._
 * _The database needed to be accessed by 100+ people._
 
**Rationale for the tools selection:**
* Python, Pandas and Spark were natural choises to process project's input data since it contains all necessary (and easy to use) libraries to read, clean, process, and form DB tables.
* Since the data set was still limited, local and server storage was used in storing, reading, writing the input and output data. 
* Input data could have been stored in AWS without big problems (excluded in this project). 
* Output data could have been easily written to AWS after processing (excluded in this project). Experiences have shown that it's better to write parquet files locally first and only after that write them to cloud storage (as a bulk oparation) to avoid delays and extra costs caused by AWS S3.

**How often ETL script should be run:**
* ETL script should be run monthly basis (assuming that new I94 data is available once per month).

**Other scenarions (what to consider in them):**
* Data is 100x: 
    * Input data should be stoted in cloud storage e.g. AWS S3
    * Clustered Spark should be used to enable parallel processing of the data.
    * Clustered Cloud DB e.g. AWS Redshift should be used to store the data during the processing (staging and final tables).  
    * Output data (parquet files) should be stored to Cloud storage e.g. AWS S3 for easy access or to a Cloud DB for further analysis. AWS Redshift is very expensive for storing the data, so maybe some SQL DB (e.g. AWS RDS) should be used. 
    
* Data is used in dashboard and updated every day 07:00AM:
    * ETl script should be refactored to process only the changed inout information instead of processing all the inout files as it does now to minimise the used time and comouting resources.
    * Output data should be stored and updated in a Cloud DB (e.g. AWS RDS) to make it available all times for the dashboard.
    * Possibly this "always available" DB (serving the dashboard) would contain a latest sub-set of all available data to make it fast perfoming and easier to manage.

* DB is accessed by 100+ people:
    * Output data should be stored in a Cloud DB (e.g. AWS RDS) to make it "always available" for further analysis. Tools should be provided for the end-users to access the output DB. 
    * Potentially, some new tables could be created to serve the most used queries better.

**Potential further work:** 
* ETL pipeline script could be re-factored
    * make it more modular (split functions to separate files/classes)
    * combine functions to have fewer, more general purpose functions instead of several specific function per ETL steps 
    
* IATA airport data could be (semi-manually) mapped to I94 airport data to add more value for the analysis and enable further data merges.

* Other data e.g. daily weather data could be combined as inout data to provide insights about the weather immigrants experienced when they entered US. 

    