I-94 US Immigration Insights


In [1]:

# Import necessary libraries
import pandas as pd
import re
from pyspark.sql import SparkSession
import os
import glob
import configparser
from datetime import datetime, timedelta, date
from pyspark.sql import types as t
from pyspark.sql.functions import udf, col, monotonically_increasing_id
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear

In [2]:
config = configparser.ConfigParser()
config.read_file(open('dl.cfg'))

os.environ["AWS_ACCESS_KEY_ID"]= config['AWS']['AWS_ACCESS_KEY_ID']
os.environ["AWS_SECRET_ACCESS_KEY"]= config['AWS']['AWS_SECRET_ACCESS_KEY']


# NOTE: Use these if using local storage
INPUT_DATA_LOCAL          = config['LOCAL']['INPUT_DATA_LOCAL']
INPUT_DATA_I94_LOCAL      = config['LOCAL']['INPUT_DATA_I94_LOCAL']
INPUT_DATA_AIRPORT_LOCAL  = config['LOCAL']['INPUT_DATA_AIRPORT_LOCAL']
INPUT_DATA_IMMIGRATION    = config['LOCAL']['INPUT_DATA_IMMIGRATION']
INPUT_DATA_US_CITIES      = config['LOCAL']['INPUT_DATA_US_CITIES']
INPUT_COUNTRY_CODES       = config['LOCAL']['INPUT_COUNTRY_CODES']


**Step 1: Scope the Project and Gather Data**
Scope

Scope :- 

Scope of the project is to create an ETL pipeline for processing, cleaning and storing data related to US I94 immigration data, and country codes.

Output of the ETL pipeline: processed data stored in Star schema model to parquet files.

**-- data/18-83510-I94-Data-2016/: US I94 immigration data from 2016 (Jan-Dec).**

        -Source: https://travel.trade.gov/research/reports/i94/historical/2016.html
        -Description: I94_SAS_Labels_Descriptions.txt file contains descriptions for the I94 data.
        -I94 dataset has SAS7BDAT file per each month of the year (e.g. i94_jan16_sub.sas7bdat).
        -Each file contains about 3M rows

**--i94_airport_codes.csv: Airport codes and related cities defined in I94 data description file.**
        -Description: I94 Airport codes data contains information about different airports around the world.
        -Columns: i94port, i94_airport_name
        -Data has 660 rows and 2 columns.

**Country Codes**
-- file :- I94_SAS_Labels_Description.SAS


**Define config and read in data**

In [3]:
# Set config

input_data        = INPUT_DATA_LOCAL
i94_data          = INPUT_DATA_I94_LOCAL
airport_codes     = INPUT_DATA_AIRPORT_LOCAL
immigration_data  = INPUT_DATA_IMMIGRATION
us_cities_data    = INPUT_DATA_US_CITIES
country_codes_i94 = INPUT_COUNTRY_CODES


In [4]:

# Read airport code data:
airport_codes_df = pd.read_csv(airport_codes, header=0, sep=',')

# Read Global country codes data:
#country_codes_df  = pd.read_sas(country_codes_i94, 'sas7bdat', encoding = 'ISO-8859-1')

# Read Immigration Data
immigration_data_df = pd.read_csv(immigration_data,header=0, sep=',')

# US Cities
us_cities_demo = pd.read_csv(us_cities_data,header=0, sep=',')

** Check that the Data has been Read Properly**

In [5]:

airport_codes_df.head()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11.0,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450.0,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820.0,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237.0,,US,US-AR,Newport,,,,"-91.254898, 35.6087"


In [6]:
airport_codes_df.head()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11.0,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450.0,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820.0,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237.0,,US,US-AR,Newport,,,,"-91.254898, 35.6087"


** Create Spark session **

In [7]:
spark = SparkSession.builder\
                    .config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11")\
                    .enableHiveSupport().getOrCreate()

** Read I94 immigration data to Spark**

In [9]:
i94_schema = t.StructType([
                            t.StructField("alpha-2", t.StringType(), False),
                            t.StructField("alpha-3", t.StringType(), False),
                            t.StructField("country-code", t.IntegerType(), False),
                            t.StructField("intermediate-region", t.StringType(), False),
                            t.StructField("intermediate-region-code", t.StringType(), False),
                            t.StructField("iso-3166-2", t.StringType(), False),
                            t.StructField("name", t.StringType(), False),
                            t.StructField("region", t.StringType(), True),
                            t.StructField("region-code", t.StringType(), True),
                            t.StructField("sub-region", t.StringType(), True),
                            t.StructField("sub-region-code", t.StringType(), True),
                        ])

i94_data= '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'

i94_df_spark =spark.read.format('com.github.saurfang.sas.spark').load(i94_data)

In [10]:

i94_df_spark.printSchema()
i94_df_spark.show(5, truncate=False)

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

** Read Airport code data to Spark **

In [11]:
airport_codes_iata_df_spark = spark.read.csv(airport_codes, header=True)

airport_codes_iata_df_spark.printSchema()
airport_codes_iata_df_spark.show(5, truncate=False)

root
 |-- ident: string (nullable = true)
 |-- type: string (nullable = true)
 |-- name: string (nullable = true)
 |-- elevation_ft: string (nullable = true)
 |-- continent: string (nullable = true)
 |-- iso_country: string (nullable = true)
 |-- iso_region: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- gps_code: string (nullable = true)
 |-- iata_code: string (nullable = true)
 |-- local_code: string (nullable = true)
 |-- coordinates: string (nullable = true)

+-----+-------------+----------------------------------+------------+---------+-----------+----------+------------+--------+---------+----------+-------------------------------------+
|ident|type         |name                              |elevation_ft|continent|iso_country|iso_region|municipality|gps_code|iata_code|local_code|coordinates                          |
+-----+-------------+----------------------------------+------------+---------+-----------+----------+------------+--------+---------+---

 ** Write Spark DataFrames to parquet files**

In [18]:
start_time = datetime.now().strftime('%Y-%m-%d-%H-%M-%S-%f')
print(start_time)

2020-05-18-08-32-26-103634


In [19]:
# Write I94 Immigration data to parquet file:
i94_df_path = './' + "i94_staging.parquet" + "_" + start_time
print(f"OUTPUT: {i94_df_path}")
i94_df_spark.write.mode("overwrite").parquet(i94_df_path)
print("Writing DONE.")

# Read parquet file back to Spark:
i94_df_spark = spark.read.parquet(i94_df_path)

OUTPUT: ./i94_staging.parquet_2020-05-18-08-32-26-103634
Writing DONE.


In [22]:
# Write I94 Airport data to parquet file:
airport_codes_i94_df_path = './' + "airport_codes_i94_staging.parquet" + "_" + start_time
print(f"OUTPUT: {airport_codes_i94_df_path}")
#airport_codes_i94_df_spark.write.mode("overwrite").parquet(airport_codes_i94_df_path)
print("Writing DONE.")

# Read parquet file back to Spark:
#airport_codes_i94_df_spark = spark.read.parquet(airport_codes_i94_df_path)

OUTPUT: ./airport_codes_i94_staging.parquet_2020-05-18-08-32-26-103634
Writing DONE.


** Step 2: Explore and Assess the Data ** 
    

**Cleaning the data**

In [21]:
# Cleaning i94 data
i94_df_spark_clean = i94_df_spark.na.fill({'i94mode': 0.0, 'i94addr': 'NA','depdate': 0.0, 'i94bir': 'NA', \
                        'i94visa': 0.0, 'count': 0.0, 'dtadfile': 'NA', 'visapost': 'NA', \
                        'occup': 'NA', 'entdepa': 'NA', 'entdepd': 'NA', 'entdepu': 'NA', \
                        'matflag': 'NA', 'biryear': 0.0, 'dtaddto': 'NA', 'gender': 'NA', \
                        'insnum': 'NA', 'airline': 'NA', 'admnum': 0.0, 'fltno': 'NA', 'visatype': 'NA'})
print("Filling NULLs DONE.")

Filling NULLs DONE.


**Step 3: Define the Data Model**

I94 Immigration Insights data models is a star models consisting of 4 Dimensions table and 1 Fact table:

Dimensions tables:
admissions table
countries table
airports table
time table
Fact table:
immigrations table
ERD for the project:

Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

First, ETL script reads in configuration settings (dl.cfg). Script also re-orders I94 inout files to process them in right order (Jan => Dec).
ETL script takes input data (I94 data, I94 country data, I94 airport data, ISO-3166 country data, IATA airport data).
Raw input data is read into pandas dataframe, and from there to Spark dataframe and stored into parquet staging files.
Staging parquet files are read back to Spark dataframes and cleaned (when necessary) and some further data is extracted from the original data.
Each star schema table is processed in order: admissions => countries => airports => time => immigrations
Finally, data quality checks are run for each table to validate the output (key columns don't have nulls, each table has content). A summary of the quality check is provided and written in console.

** Step 4: Run Pipelines to Model the Data** 

In [25]:
i94_df_spark_clean.createOrReplaceTempView("admissions_table_DF")
admissions_table = spark.sql("""
    SELECT  DISTINCT admnum   AS admission_nbr,
                     i94res   AS country_code, 
                     i94bir   AS age, 
                     i94visa  AS visa_code, 
                     visatype AS visa_type, 
                     gender   AS person_gender
    FROM admissions_table_DF
    ORDER BY country_code
""")
admissions_table.printSchema()

root
 |-- admission_nbr: double (nullable = false)
 |-- country_code: double (nullable = true)
 |-- age: double (nullable = true)
 |-- visa_code: double (nullable = false)
 |-- visa_type: string (nullable = false)
 |-- person_gender: string (nullable = false)



In [28]:
admissions_table_path = './'+ "admissions_table.parquet" + "_" + start_time
print(f"OUTPUT: {admissions_table_path}")
admissions_table.write.mode("overwrite").parquet(admissions_table_path)
print("Writing DONE.")

# Read parquet file back to Spark:
admissions_table_df = spark.read.parquet(admissions_table_path)

OUTPUT: ./admissions_table.parquet_2020-05-18-08-32-26-103634
Writing DONE.


In [None]:
# Join tables
country_codes_i94_df_spark_joined = country_codes_i94_df_spark\
                                        .join(country_codes_iso_df_spark, \
                                            (country_codes_i94_df_spark.iso_country_code == \
                                                    country_codes_iso_df_spark.country_code))

 **Step 4: Run Pipelines to Model the Data**

Extracting data from SAS Documents and writing as CSV files to S3 immigration bucket
Extracting remaining CSV and PARQUET files from S3 immigration bucket
Writing CSV and PARQUET files from S3 to Redshift
Performing data quality checks on the newly created tables

**Data quality checks include**

Integrity constraints on the relational database (e.g., unique key, data type, etc.)
Unit tests for the scripts to ensure they are doing the right thing
Source/Count checks to ensure completeness

In [None]:
for table in tables['table_name'][1:]:
    column = get_col_from_table(table)
    sql = f"SELECT COUNT(DISTINCT {column}) FROM {table};"
    c = eng.execute(sql).fetchone()[0]
    print(f"{table}:\n\t{c} distinct rows for column {column}\n")