# Project: I94 Immigration
### Data Engineering Capstone Project

#### Project Summary
The US National Tourism and Trade Office wants to store its immigration data on a cloud service that supports fast ingestion, transformation and loading, so that the data can feed its reports and other systems that track people entering and leaving the US.
The data will also help the office examine how foreign visitors and immigrants relate to other aspects, such as US city demographics and the airports in each region.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [3]:
# Do all imports and installs here

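import configparser
import datetime
import os

import pandas as pd
from pyspark.sql import SparkSession
# Import only the names used below; a wildcard import from pyspark.sql.functions
# shadows Python built-ins such as sum, min and max.
from pyspark.sql.functions import col, count, isnull, regexp_extract, udf, when
from pyspark.sql.types import TimestampType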

In [4]:
# Connect to AWS Credentials

config = configparser.ConfigParser()
config.read('dl.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['AWS_SECRET_ACCESS_KEY']
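
The cell above expects `dl.cfg` to contain an `[AWS]` section with the two keys it reads. The following is a minimal sketch of that layout, parsed from a string so it runs without a file; `SAMPLE_CFG`, `load_aws_credentials` and the placeholder values are assumptions for illustration, not part of the project.

```python
import configparser

# Hypothetical dl.cfg contents; the values are placeholders, not real credentials.
SAMPLE_CFG = """
[AWS]
AWS_ACCESS_KEY_ID = YOUR_ACCESS_KEY_ID
AWS_SECRET_ACCESS_KEY = YOUR_SECRET_ACCESS_KEY
"""


def load_aws_credentials(cfg_text):
    """Parse the [AWS] section and return (access_key_id, secret_access_key)."""
    parser = configparser.ConfigParser()
    parser.read_string(cfg_text)
    aws = parser["AWS"]
    return aws["AWS_ACCESS_KEY_ID"], aws["AWS_SECRET_ACCESS_KEY"]


key_id, secret = load_aws_credentials(SAMPLE_CFG)
print(key_id)
```

`configparser` keeps quotation marks as part of the value, so the keys in `dl.cfg` should be written without quotes.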

### Step 1: Scope the Project and Gather Data

#### Scope 

1. What Data?
- I94 Immigration Data
- U.S. City Demographic Data
- Airport Code Table

2. What tool?
- The main data format is SAS and the data volume is expected to grow, so we will use Spark to ingest and transform the data.
- After transformation, Spark will load the processed data into an S3 data lake as Parquet files, which supports fast writes and reads; the data is then queried in place on S3.



#### Describe and Gather Data 
Describe the data sets you're using. Where did it come from? What type of information is included? 

- I94 Immigration Data: This data comes from the US National Tourism and Trade Office and is the main data source used for analysis. The data is in SAS format.
    - **Data Dictionary**: All column definitions and sample values are stored in this file: [I94_SAS_Labels_Descriptions](I94_SAS_Labels_Descriptions.SAS)
- U.S. City Demographic Data: This data comes from OpenSoft and includes detailed information about US cities, such as age, population and race.
- Airport Code Table: This is a simple table of airport codes and corresponding cities.

In [5]:
spark = SparkSession.builder.\
config("spark.jars.repositories", "https://repos.spark-packages.org/").\
config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\
config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem").\
enableHiveSupport().getOrCreate()

# raw_df = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')


In [6]:
#write to parquet
# raw_df.write.parquet("i94_data")
print("DONE Saving file in parquet format")

immigration_df=spark.read.parquet("i94_data")
print("DONE Reading Immigration data")

airport_df = spark.read.format("csv").option("header", "true").load('airport-codes_csv.csv')
print("DONE Reading Airport data")

uscity_df = spark.read.options(header='true', delimiter=";").csv("us-cities-demographics.csv")
print("DONE Reading US Cities data")

DONE Saving file in parquet format
DONE Reading Immigration data
DONE Reading Airport data
DONE Reading US Cities data


### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps
Document steps necessary to clean the data

In [5]:
print(f'Immigration df length: {immigration_df.count()}')
print(f'Airport df length: {airport_df.count()}')
print(f'US Cities df length: {uscity_df.count()}')

Immigration df length: 3096313
Airport df length: 55075
US Cities df length: 2891


In [6]:
# Exploring data

# Count NULLs in each column in Immigration DF
immigration_df.select([count(when(isnull(c), c)).alias(c) for c in immigration_df.columns]).show()

+-----+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+-------+------+------+-------+------+-----+--------+
|cicid|i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear|dtaddto|gender|insnum|airline|admnum|fltno|visatype|
+-----+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+-------+------+------+-------+------+-----+--------+
|    0|    0|     0|     0|     0|      0|      0|      1|      3|      8|     0|      0|    0|       1|      76|  100|      0|      8|     98|      8|      0|      0|    27|   100|      2|     0|    1|       0|
+-----+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+-------+------+------+-------+------+-----+--------+

- Based on the NULL check, we exclude these columns from transformation and analysis because they have too many NULLs: insnum, entdepu, entdepd, occup
- Next, we choose the critical columns for the fact_immigration table
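
One way to make the "too many NULLs" rule explicit is a threshold on the NULL fraction per column. The sketch below is plain Python over counts copied from the output above; the function name, the 0.9 threshold and the assumption that the counts were taken on 100 rows are illustrative choices, not something the notebook defines.

```python
def columns_over_null_threshold(null_counts, total_rows, threshold=0.9):
    """Return column names whose NULL fraction is at or above `threshold`."""
    if total_rows <= 0:
        raise ValueError("total_rows must be positive")
    return [name for name, nulls in null_counts.items()
            if nulls / total_rows >= threshold]


# NULL counts copied from the output above for a few columns.
null_counts = {"insnum": 100, "occup": 100, "entdepu": 98,
               "visapost": 76, "gender": 27, "entdepd": 8}

# Assumption: the counts above were computed on 100 rows.
print(columns_over_null_threshold(null_counts, total_rows=100))
# ['insnum', 'occup', 'entdepu']
```

Under these assumptions `entdepd` falls well below the threshold, so excluding it would need a separate criterion.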

In [19]:
# Count NULLs in each column in airport_df
airport_df.select([count(when(isnull(c), c)).alias(c) for c in airport_df.columns]).show()

+-----+----+----+------------+---------+-----------+----------+------------+--------+---------+----------+-----------+
|ident|type|name|elevation_ft|continent|iso_country|iso_region|municipality|gps_code|iata_code|local_code|coordinates|
+-----+----+----+------------+---------+-----------+----------+------------+--------+---------+----------+-----------+
|    0|   0|   0|        7006|        0|          0|         0|        5676|   14045|    45886|     26389|          0|
+-----+----+----+------------+---------+-----------+----------+------------+--------+---------+----------+-----------+



In [20]:
uscity_df.select([count(when(isnull(c), c)).alias(c) for c in uscity_df.columns]).show()

+----+-----+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+----+-----+
|City|State|Median Age|Male Population|Female Population|Total Population|Number of Veterans|Foreign-born|Average Household Size|State Code|Race|Count|
+----+-----+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+----+-----+
|   0|    0|         0|              3|                3|               0|                13|          13|                    16|         0|   0|    0|
+----+-----+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+----+-----+



### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
[Conceptual Data Model](DataModel.png)
<img src="DataModel.png"/>


#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

In [7]:
#Select columns and rename for better understanding. Then drop duplicates if any based on cicid column
fact_immigration_df = immigration_df.select("cicid"\
                                  ,"i94yr"\
                                  ,"i94mon"\
                                  ,col("i94res").alias("CountryID")\
                                  ,col("i94port").alias("PortID")\
                                  ,col("arrdate").alias("ArrivalDate")\
                                  ,col("depdate").alias("DepartureDate")\
                                  ,col("i94addr").alias("DestinationState")\
                                  ,"i94mode"\
                                  ,col("i94visa").alias("VisaCat")\
                                  ,"visatype").dropDuplicates(["cicid"])
fact_immigration_df.show(5)

+-----+------+------+---------+------+-----------+-------------+----------------+-------+-------+--------+
|cicid| i94yr|i94mon|CountryID|PortID|ArrivalDate|DepartureDate|DestinationState|i94mode|VisaCat|visatype|
+-----+------+------+---------+------+-----------+-------------+----------------+-------+-------+--------+
| 67.0|2016.0|   4.0|    103.0|   ATL|    20545.0|      20580.0|              AL|    1.0|    2.0|      WT|
| 70.0|2016.0|   4.0|    103.0|   ATL|    20545.0|      20567.0|              FL|    1.0|    2.0|      WT|
| 69.0|2016.0|   4.0|    103.0|   ATL|    20545.0|      20560.0|              FL|    1.0|    2.0|      WT|
|  7.0|2016.0|   4.0|    276.0|   ATL|    20551.0|         null|              AL|    1.0|    3.0|      F1|
|124.0|2016.0|   4.0|    103.0|   NEW|    20545.0|      20554.0|              NJ|    1.0|    2.0|      WT|
+-----+------+------+---------+------+-----------+-------------+----------------+-------+-------+--------+
only showing top 5 rows



In [20]:
# Check the schema
fact_immigration_df.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- CountryID: double (nullable = true)
 |-- PortID: string (nullable = true)
 |-- ArrivalDate: double (nullable = true)
 |-- DepartureDate: double (nullable = true)
 |-- DestinationState: string (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- VisaCat: double (nullable = true)
 |-- visatype: string (nullable = true)



In [8]:
# From the schema, the date columns should be converted to timestamps.
# Create a UDF that converts a SAS numeric date (days since 1960-01-01) to a timestamp.

@udf(TimestampType())
def sas_to_timestamp(x):
    # Missing or unparseable values map to the sentinel 1900-01-01.
    sentinel = datetime.datetime(1900, 1, 1)
    if x is None:
        return sentinel
    try:
        return datetime.datetime(1960, 1, 1) + datetime.timedelta(days=int(float(x)))
    except (TypeError, ValueError, OverflowError):
        return sentinel
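
The conversion itself is simple date arithmetic: a SAS date is a count of days since 1960-01-01. The sketch below shows the same arithmetic in plain Python so it can be checked without Spark. `sas_days_to_date` is a hypothetical helper, and returning `None` for missing values (instead of the 1900-01-01 sentinel used in the UDF) is an alternative design, not the notebook's behavior.

```python
import datetime

SAS_EPOCH = datetime.date(1960, 1, 1)  # SAS dates count days from this day


def sas_days_to_date(value):
    """Convert a SAS day count (int, float or numeric string) to a date, or None."""
    if value is None:
        return None
    try:
        return SAS_EPOCH + datetime.timedelta(days=int(float(value)))
    except (TypeError, ValueError, OverflowError):
        # Non-numeric input, NaN, infinity or an out-of-range day count
        return None


print(sas_days_to_date(20545.0))  # 2016-04-01
print(sas_days_to_date(20580.0))  # 2016-05-06
print(sas_days_to_date(None))     # None
```

Keeping missing departures as NULL avoids mixing a placeholder date into later date arithmetic, at the cost of having to handle NULLs in queries.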

In [9]:
fact_immigration_df = fact_immigration_df.withColumn("ArrivalDate",sas_to_timestamp(col("ArrivalDate")))\
                                        .withColumn("DepartureDate",sas_to_timestamp(col("DepartureDate")))

fact_immigration_df.show(3)

+-----+------+------+---------+------+-------------------+-------------------+----------------+-------+-------+--------+
|cicid| i94yr|i94mon|CountryID|PortID|        ArrivalDate|      DepartureDate|DestinationState|i94mode|VisaCat|visatype|
+-----+------+------+---------+------+-------------------+-------------------+----------------+-------+-------+--------+
| 67.0|2016.0|   4.0|    103.0|   ATL|2016-04-01 00:00:00|2016-05-06 00:00:00|              AL|    1.0|    2.0|      WT|
| 70.0|2016.0|   4.0|    103.0|   ATL|2016-04-01 00:00:00|2016-04-23 00:00:00|              FL|    1.0|    2.0|      WT|
| 69.0|2016.0|   4.0|    103.0|   ATL|2016-04-01 00:00:00|2016-04-16 00:00:00|              FL|    1.0|    2.0|      WT|
+-----+------+------+---------+------+-------------------+-------------------+----------------+-------+-------+--------+
only showing top 3 rows



In [10]:
dim_airport = airport_df.select(regexp_extract(col("iso_region"), ".*-(.*)",1).alias("StateID")\
                          ,"type"\
                          ,"name"\
                          ,"iso_country"\
                          ,"gps_code"\
                          ,"coordinates"
                         ).dropDuplicates()
dim_airport.show(3)

+-------+-------------+--------------------+-----------+--------+--------------------+
|StateID|         type|                name|iso_country|gps_code|         coordinates|
+-------+-------------+--------------------+-----------+--------+--------------------+
|     ME|seaplane_base|Saint Peter's Sea...|         US|    01ME|-68.5002975463867...|
|     AR|     heliport|Saline Memorial H...|         US|    04AR|-92.586047, 34.57...|
|     NV|small_airport|Kingston Ranch Ai...|         US|    04NV|-115.665000916, 3...|
+-------+-------------+--------------------+-----------+--------+--------------------+
only showing top 3 rows
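
The `regexp_extract` call above keeps whatever follows the last hyphen of `iso_region`. The plain-Python sketch below mirrors that pattern for illustration; `state_from_iso_region` is a hypothetical helper and the sample inputs are illustrative values in the `iso_region` format.

```python
import re

# Same pattern as the regexp_extract call above; the greedy ".*-" runs up to the last hyphen.
REGION_PATTERN = re.compile(r".*-(.*)")


def state_from_iso_region(iso_region):
    """Return the text after the last hyphen, "" if there is no hyphen, None for None."""
    if iso_region is None:
        return None
    match = REGION_PATTERN.match(iso_region)
    return match.group(1) if match else ""


print(state_from_iso_region("US-ME"))   # ME
print(state_from_iso_region("GB-ENG"))  # ENG
```

Because the second example is not a US state, `StateID` only lines up with US state codes for rows where `iso_country` is `US`; filtering on that column before joining to state-level data may be worth considering.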



In [11]:
dim_immigrants = immigration_df.select("cicid"\
                            ,col("i94bir").alias("Age")\
                            ,col("biryear").alias("BirthYear")\
                            ,"gender"
                            )
dim_immigrants.show(2)

+-----+----+---------+------+
|cicid| Age|BirthYear|gender|
+-----+----+---------+------+
|  6.0|37.0|   1979.0|  null|
|  7.0|25.0|   1991.0|     M|
+-----+----+---------+------+
only showing top 2 rows



In [12]:
dim_cities = uscity_df.select("State Code"\
                        ,"State"\
                        ,"City"\
                        ,"Median Age"\
                        ,"Total Population" \
                        ,"Foreign-born"\
                        ,"Race"\
                        ,"Count"
                        )

dim_cities.show(2)

+----------+-------------+-------------+----------+----------------+------------+------------------+-----+
|State Code|        State|         City|Median Age|Total Population|Foreign-born|              Race|Count|
+----------+-------------+-------------+----------+----------------+------------+------------------+-----+
|        MD|     Maryland|Silver Spring|      33.8|           82463|       30908|Hispanic or Latino|25924|
|        MA|Massachusetts|       Quincy|      41.0|           93629|       32935|             White|58723|
+----------+-------------+-------------+----------+----------------+------------+------------------+-----+
only showing top 2 rows



### Step 4: Run Pipelines to Model the Data 
#### 4.1 Load data to S3 Data Lake

In [None]:

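# Assumes `output_data` holds the output location (for example an S3 prefix ending in "/");
# it is not defined in the cells above.
fact_immigration_df.write \
        .partitionBy("i94yr", "i94mon") \
        .mode("overwrite") \
        .parquet(output_data + 'fact_immigration')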
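
`partitionBy` writes Hive-style directories of the form `column=value`. The sketch below only builds the expected directory name as a string, to show how the partition values end up in the path; `partition_path` and the bucket name are hypothetical. Since `i94yr` and `i94mon` are doubles in the schema above, the directories would be expected to carry a trailing `.0` unless the columns are cast to integers first.

```python
def partition_path(base, table, **partitions):
    """Build a Hive-style partition directory: base/table/col=value/..."""
    parts = [f"{col}={value}" for col, value in partitions.items()]
    return "/".join([base.rstrip("/"), table] + parts)


# Hypothetical bucket name; double-typed partition values keep their ".0".
print(partition_path("s3a://my-bucket/", "fact_immigration", i94yr=2016.0, i94mon=4.0))
# s3a://my-bucket/fact_immigration/i94yr=2016.0/i94mon=4.0

# After casting the partition columns to integers.
print(partition_path("s3a://my-bucket/", "fact_immigration", i94yr=2016, i94mon=4))
# s3a://my-bucket/fact_immigration/i94yr=2016/i94mon=4
```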

In [None]:
dim_airport.write \
        .mode("overwrite") \
        .parquet(output_data + 'dim_airport')

In [None]:
dim_immigrants.write \
        .mode("overwrite") \
        .parquet(output_data + 'dim_immigrants')

In [None]:
dim_cities.write \
        .mode("overwrite") \
        .parquet(output_data + 'dim_cities')

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

In [53]:
def records_count_check(df_list):
    for df in df_list:
        # Recover the variable name of the DataFrame from the notebook's globals
        dfname = [x for x in globals() if globals()[x] is df][0]
        count = df.count()
        if count < 1:
            print(f"ERROR: Table {dfname} has 0 records")
        else:
            print(f"COUNT CHECKED: Table {dfname} has {count} records")

In [54]:
def col_dtype_check(df, cols, col_type):
    dtypes = dict(df.dtypes)  # column name -> Spark type name
    for name in cols:
        actual = dtypes[name]
        if actual == col_type:
            print(f"CORRECT DATA TYPE: column {name} checked! Type {col_type}")
        else:
            print(f"FAILED DATA TYPE CHECK: column {name} has dtype {actual}, expected {col_type}")

In [55]:
records_count_check([dim_airport, dim_cities])


COUNT CHECKED: Table dim_airport has 54968 records
COUNT CHECKED: Table dim_cities has 2891 records


In [57]:
# print(dict(fact_immigration_df.dtypes))
col_dtype_check(fact_immigration_df, ['ArrivalDate','DepartureDate'],'timestamp')

CORRECT DATA TYPE: column ArrivalDate checked! Type timestamp
CORRECT DATA TYPE: column DepartureDate checked! Type timestamp
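
The check above prints its result, which is easy to miss in a scheduled run. A possible variant, sketched below, returns the mismatches so the caller can fail the pipeline. It works on the list of `(column, dtype)` pairs that `DataFrame.dtypes` returns, so it can be tried without a Spark session; `find_dtype_mismatches` is a hypothetical name and the `dtypes` list is an abbreviated copy of the schema shown earlier.

```python
def find_dtype_mismatches(dtypes, expected):
    """Compare (column, dtype) pairs against the expected types.

    Returns {column: (expected, actual)} for every mismatch; actual is None
    when the column is missing.
    """
    actual = dict(dtypes)
    return {col: (want, actual.get(col))
            for col, want in expected.items()
            if actual.get(col) != want}


# Abbreviated shape of fact_immigration_df.dtypes after the date conversion.
dtypes = [("cicid", "double"), ("ArrivalDate", "timestamp"), ("DepartureDate", "timestamp")]

print(find_dtype_mismatches(dtypes, {"ArrivalDate": "timestamp", "DepartureDate": "timestamp"}))
# {}
print(find_dtype_mismatches(dtypes, {"cicid": "int"}))
# {'cicid': ('int', 'double')}
```

A caller could raise an exception whenever the returned dictionary is non-empty.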


In [58]:
# Create table/view for querying in SQL
fact_immigration_df.createOrReplaceTempView("fact_immigration_table")
dim_immigrants.createOrReplaceTempView("dim_immigrants_table")
dim_airport.createOrReplaceTempView("dim_airport_table")
dim_cities.createOrReplaceTempView("dim_cities_table")


In [59]:
#Count records of each table

spark.sql("""
SELECT "fact_immigration_table" as name, COUNT(*)
FROM fact_immigration_table
UNION
SELECT "dim_immigrants_table" as name, COUNT(*)
FROM dim_immigrants_table
UNION
SELECT "dim_airport_table" as name, COUNT(*)
FROM dim_airport_table
UNION
SELECT "dim_cities_table" as name, COUNT(*)
FROM dim_cities_table
""").show()

+--------------------+--------+
|                name|count(1)|
+--------------------+--------+
|dim_immigrants_table|     100|
|   dim_airport_table|   54968|
|fact_immigration_...|     100|
|    dim_cities_table|    2891|
+--------------------+--------+



In [60]:
#Do some analysis queries

#Query 1: TOP STATE with most immigration
spark.sql("""
SELECT f.DestinationState, count(*)
FROM fact_immigration_table f
GROUP BY f.DestinationState
ORDER BY COUNT(*) DESC
LIMIT 10
""").show()


+----------------+--------+
|DestinationState|count(1)|
+----------------+--------+
|              NJ|      15|
|              NY|      14|
|              MI|      12|
|              FL|      12|
|              MA|      11|
|              TX|       5|
|              NH|       4|
|              CA|       4|
|            null|       3|
|              CT|       3|
+----------------+--------+



In [61]:
#Query 2: TOP MONTHs with most Immigration application
spark.sql("""
SELECT f.i94mon, count(*)
FROM fact_immigration_table f
GROUP BY f.i94mon
ORDER BY COUNT(*) DESC
LIMIT 10
""").show()

+------+--------+
|i94mon|count(1)|
+------+--------+
|   4.0|     100|
+------+--------+



In [62]:
#Query 3: Immigration applicants by age and gender
spark.sql("""
SELECT i.age, i.gender, count(*)
FROM dim_immigrants_table i
GROUP BY i.age, i.gender
ORDER BY COUNT(*) DESC
LIMIT 10
""").show()

+----+------+--------+
| age|gender|count(1)|
+----+------+--------+
|55.0|  null|       3|
|40.0|     M|       2|
|57.0|  null|       2|
|49.0|     F|       2|
|55.0|     M|       2|
|48.0|     M|       2|
|66.0|  null|       2|
|61.0|     F|       2|
|27.0|     M|       2|
|58.0|     M|       2|
+----+------+--------+



#### 4.3 Data dictionary

1. fact_immigration:

 - cicid: unique ID of each profile
 - i94yr: year of application
 - i94mon: month of application
 - CountryID: country ID of the applicant
 - PortID: port ID of the applicant
 - ArrivalDate: date the applicant arrived in the US
 - DepartureDate: date the applicant departed from the US
 - DestinationState: state the applicant arrived in
 - i94mode: how the applicant arrived in the US
 - VisaCat: visa category of the applicant
 - visatype: visa type of the applicant

2. dim_airport:

 - StateID: state ID of the airport
 - type: type of airport
 - name: name of the airport
 - iso_country: country the airport belongs to
 - gps_code: GPS code of the airport
 - coordinates: longitude and latitude of the airport

3. dim_immigrants:

 - cicid: unique ID of each profile
 - Age: age of the applicant
 - BirthYear: birth year of the applicant
 - gender: gender of the applicant

4. dim_cities:

 - State Code: state code
 - State: state name
 - City: city name
 - Median Age: median age of the city
 - Total Population: total population of the city
 - Foreign-born: number of foreign-born residents of the city
 - Race: race category that the row's Count refers to
 - Count: number of people of that race in the city

#### Step 5: Complete Project Write Up

##### The choice of tools, technologies:

- The main data format is SAS and the data volume is expected to grow, so we use a Spark cluster to ingest and transform the data. Spark is a general-purpose distributed data processing engine suitable for a wide range of workloads. Its in-memory processing saves time on repeated transformations, and a Spark cluster is easy to scale up for larger data volumes in the future.
- After transformation, Spark loads the processed data into an S3 data lake as Parquet files, which supports fast writes and reads; the data is queried in place on S3. S3 serves as a data lake at a lower cost than a data warehouse such as Redshift, and it is easily accessible and integrates with other platforms.

##### Jars required for Spark to connect to S3

- aws-java-sdk-1.7.4.2.jar
- hadoop-aws-2.7.3.jar

##### How often the data should be updated: 

- The fact table should be updated **monthly**, as shown in the documentation for the I-94 Arrivals Program data
- Dimension tables can be updated on request

##### In case the data was increased by 100x:

- The Spark cluster, hosted on an EMR cluster, should be scaled up accordingly to process the larger data volume

##### In case the data populates a dashboard that must be updated on a daily basis by 7am every day:


- A job scheduler such as Apache Airflow will help. This requires setting up a DAG that runs the Python scripts and Spark jobs on a daily schedule, triggered early enough that the run completes before 7am.
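
To meet a 7am deadline, the daily trigger has to account for how long the pipeline runs. Airflow accepts cron expressions for a DAG's schedule, so one rough way to pick the trigger time is sketched below. `cron_for_deadline`, the 90-minute runtime and the 30-minute buffer are hypothetical values for illustration; the real runtime would have to be measured.

```python
import datetime


def cron_for_deadline(deadline, expected_runtime, safety_buffer):
    """Return a daily cron expression that starts early enough to meet `deadline`."""
    anchor = datetime.datetime.combine(datetime.date(2000, 1, 1), deadline)
    start = anchor - expected_runtime - safety_buffer
    if start.date() != anchor.date():
        raise ValueError("job would have to start on the previous day")
    return f"{start.minute} {start.hour} * * *"


# Hypothetical figures: 90 minutes of runtime plus a 30-minute buffer.
print(cron_for_deadline(datetime.time(7, 0),
                        datetime.timedelta(minutes=90),
                        datetime.timedelta(minutes=30)))
# 0 5 * * *
```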

##### In case the database needed to be accessed by 100+ people: 

  * Because the data is stored in S3 and queried in place, 100 or more people can access it once they are granted the appropriate IAM roles or policies
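
As an illustration of what such access could look like, the sketch below builds a read-only IAM policy document for one prefix of the data lake. The bucket name, prefix and function name are hypothetical; `s3:ListBucket` and `s3:GetObject` are the standard S3 actions for listing and reading objects.

```python
import json


def read_only_s3_policy(bucket, prefix=""):
    """Build an IAM policy document granting list and read access to one bucket prefix."""
    object_arn = f"arn:aws:s3:::{bucket}/{prefix}*"
    return {
        "Version": "2012-10-17",
        "Statement": [
            # Listing is granted on the bucket itself
            {"Effect": "Allow", "Action": ["s3:ListBucket"],
             "Resource": [f"arn:aws:s3:::{bucket}"]},
            # Reading is granted on the objects under the prefix
            {"Effect": "Allow", "Action": ["s3:GetObject"],
             "Resource": [object_arn]},
        ],
    }


# Hypothetical bucket and prefix
policy = read_only_s3_policy("my-i94-data-lake", "fact_immigration/")
print(json.dumps(policy, indent=2))
```

Attaching one such policy to a group or role, instead of to each user, keeps access manageable as the number of readers grows.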