### Data Engineering Capstone Project

#### Project Summary

In this project, I designed a star schema for a database that combines U.S. I94 immigration data, country codes, demographic summaries of the destination states, and visa type definitions, and I built a cloud ETL pipeline on **AWS**. Specifically, data is extracted and preprocessed using **Apache Spark (PySpark)** in the provided workspace, written to an **S3 bucket**, and queried using **Redshift**. The goal is an immigration database from which the immigration and geographical information needed by downstream services, such as analytics, can be retrieved quickly.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
import pandas as pd
from datetime import datetime
from pyspark.sql.types import *
from pyspark.sql import SparkSession
import configparser
import os
from pyspark.sql.functions import isnan, when, count, col, expr, year, month, date_format
from pyspark.sql import SQLContext

### Step 1: Scope the Project and Gather Data

I worked with four datasets:

- I94 Immigration: This dataset is provided by Udacity and contains more than 3 million rows, so it qualifies as "big". It describes foreign visitors who entered the U.S., with personally identifiable information (PII) removed. I use it as the main fact table and connect the dimension tables to it.
- US city demographic: This dataset is also provided by Udacity and contains city-level demographic data for the U.S. I aggregated it to the **state** level and connected it to the entry state of the I94 records.
- Country code: This dataset is extracted from the given I94 label ```.SAS``` file. It maps the country codes used by the I94 immigration dataset to country names.
- Visa definition: This dataset is collected from the [I-94 Arrivals Program page](https://www.trade.gov/i-94-arrivals-program) and contains the definitions of the visa types that appear in the I94 immigration dataset.

### Step 2: Explore and Assess the Data

#### Country code data

In [14]:
with open('I94_SAS_Labels_Descriptions.SAS') as f:
    f = f.read().replace('\t', '')

In [15]:
head = f.index("i94cntyl")
tail = f.find(";", head, len(f))
content = [s.replace("'", "") for s in f[head:tail].split("\n")[1:]]
l = []
for c in content:
    code, country = c.split("=")
    code = int(code.strip())
    country = country.strip()
    l.append((code, country))
df_country = pd.DataFrame.from_records(l, columns=["country_code", "country_name"])
df_country.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 289 entries, 0 to 288
Data columns (total 2 columns):
country_code    289 non-null int64
country_name    289 non-null object
dtypes: int64(1), object(1)
memory usage: 4.6+ KB
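
The parsing above assumes that every line between `i94cntyl` and the closing `;` has the form `code = 'name'`. A slightly more defensive variant is sketched below: it matches each line with a regular expression and skips lines that do not fit. The sample text, its codes, and the helper name `parse_label_block` are made up for illustration and are not part of the project.

```python
import re

import pandas as pd

# Made-up excerpt in the same shape as the i94cntyl block of the SAS label file.
SAMPLE = """value i94cntyl
   582 =  'MEXICO'
   236 =  'AFGHANISTAN'

   101 =  'ALBANIA'
;
"""

# One label line: an integer code, '=', then a single-quoted name.
LABEL_RE = re.compile(r"^\s*(\d+)\s*=\s*'(.*)'\s*$")


def parse_label_block(text: str, block_name: str) -> pd.DataFrame:
    """Return (code, name) rows found between block_name and the next ';'."""
    head = text.index(block_name)
    tail = text.find(";", head)
    rows = []
    # The first line is the block header itself, so it is skipped.
    for line in text[head:tail].splitlines()[1:]:
        match = LABEL_RE.match(line)
        if match:  # blank or malformed lines are skipped instead of raising
            rows.append((int(match.group(1)), match.group(2).strip()))
    return pd.DataFrame.from_records(rows, columns=["country_code", "country_name"])


df_sample = parse_label_block(SAMPLE, "i94cntyl")
```

Skipping unmatched lines trades strictness for robustness; if silent drops are a concern, the skipped lines could be collected and reported instead.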


#### State demographic data

In [16]:
df_city_demo = pd.read_csv("us-cities-demographics.csv", delimiter=";")

In [17]:
columns = ["State Code","Male Population", "Female Population", "Total Population", "Number of Veterans"]
df_state_demo = df_city_demo[columns].groupby("State Code").sum().astype(int).reset_index()
df_state_demo.columns = ["_".join(c.lower().split(" ")) for c in columns]
df_state_demo.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 49 entries, 0 to 48
Data columns (total 5 columns):
state_code            49 non-null object
male_population       49 non-null int64
female_population     49 non-null int64
total_population      49 non-null int64
number_of_veterans    49 non-null int64
dtypes: int64(4), object(1)
memory usage: 2.0+ KB
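
One thing worth checking before trusting the state totals: if `us-cities-demographics.csv` lists each city once per race group (an assumption about the file layout that is not verified here), the population columns repeat on each of those rows, and a plain `groupby(...).sum()` counts every city several times. A minimal sketch on made-up numbers, deduplicating on city and state before aggregating:

```python
import pandas as pd

# Made-up rows: "Springfield" appears twice (once per race group) with identical
# population columns, which is the layout assumed in the text above.
df_city = pd.DataFrame(
    {
        "City": ["Springfield", "Springfield", "Shelbyville"],
        "State Code": ["XX", "XX", "XX"],
        "Race": ["Group A", "Group B", "Group A"],
        "Male Population": [100, 100, 40],
        "Female Population": [110, 110, 60],
        "Total Population": [210, 210, 100],
        "Number of Veterans": [10, 10, 5],
    }
)

columns = ["State Code", "Male Population", "Female Population",
           "Total Population", "Number of Veterans"]

# Keep one row per city before summing so repeated rows are not double counted.
df_unique = df_city.drop_duplicates(subset=["City", "State Code"])
df_state = df_unique[columns].groupby("State Code").sum().astype(int).reset_index()
df_state.columns = ["_".join(c.lower().split(" ")) for c in columns]

# Compare against the naive sum over all rows.
naive_total = int(df_city[columns].groupby("State Code").sum()["Total Population"].iloc[0])
dedup_total = int(df_state["total_population"].iloc[0])
```

If the file turns out to have exactly one row per city, the deduplication step is a no-op and the original aggregation is unchanged.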


In [18]:
del df_city_demo

#### Visa data

In [19]:
with open("visainfo.txt") as f:
    f = f.read().split("\n")

In [20]:
columns = f[0].split(";")
content = []
for l in f[1:]:
    content.append(l.split(";"))
df_visa = pd.DataFrame(content, columns=columns)
df_visa.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 20 entries, 0 to 19
Data columns (total 2 columns):
visa_type    20 non-null object
visa_def     20 non-null object
dtypes: object(2)
memory usage: 400.0+ bytes


#### I94 immigration data

In [33]:
# Option 1: Local load

In [35]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["PATH"] = "/opt/conda/bin:/opt/spark-2.4.3-bin-hadoop2.7/bin:/opt/conda/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/lib/jvm/java-8-openjdk-amd64/bin"
os.environ["SPARK_HOME"] = "/opt/spark-2.4.3-bin-hadoop2.7"
os.environ["HADOOP_HOME"] = "/opt/spark-2.4.3-bin-hadoop2.7"
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df_final = spark.read.load('./sas_data')

In [36]:
field_list = ['cicid', 'i94cit', 'i94res', 'i94port', 'i94mode', 'i94addr', 'gender', 'visatype']
df_final = df_final.select(field_list)

In [34]:
# Option 2: Cloud load

In [None]:
config = configparser.ConfigParser()
# Use a context manager so the credentials file is closed after reading
with open('aws/credentials.cfg') as cfg_file:
    config.read_file(cfg_file)
output_data = config['AWS']['S3_BUCKET']

# Building a spark session with a connection to AWS S3
spark = SparkSession.builder \
        .config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11,org.apache.hadoop:hadoop-aws:2.7.2") \
        .config("spark.hadoop.fs.s3a.access.key",config['AWS']['AWS_ACCESS_KEY_ID']) \
        .config("spark.hadoop.fs.s3a.secret.key",config['AWS']['AWS_SECRET_ACCESS_KEY']) \
        .getOrCreate()

In [None]:
month_list = ["jan","feb","mar","apr","may","jun","jul","aug","sep","oct","nov","dec"]
field_list = ['cicid', 'i94cit', 'i94res', 'i94port', 'i94mode', 'i94addr', 'gender', 'visatype']

# The loop variable is named month_name so it does not shadow pyspark's month()
for month_name in month_list:
    fname = f'../../data/18-83510-I94-Data-2016/i94_{month_name}16_sub.sas7bdat'
    df_temp = spark.read.format('com.github.saurfang.sas.spark').load(fname)
    df_temp = df_temp.select(field_list)

    if month_name == 'jan':
        df_final = df_temp
    else:
        df_final = df_final.union(df_temp)

In [None]:
# Drop all records whose cicid appears more than once (LEFT ANTI JOIN against the duplicated ids)

In [37]:
df_final = df_final.cache()
df_final_dup = df_final.groupby('cicid').count().filter(col('count')>1)
df_final2 = df_final.join(df_final_dup, on=['cicid'], how='left_anti')
df_final2 = df_final2.cache()
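
Note that the left anti join above removes every row whose `cicid` occurs more than once, including the first occurrence, rather than keeping one copy. The same semantics can be illustrated on a small pandas frame. This is a pandas analogue with made-up values, not the Spark code used in this notebook:

```python
import pandas as pd

df = pd.DataFrame({"cicid": [1, 2, 2, 3], "visatype": ["B2", "F1", "F1", "WT"]})

# Keys that occur more than once, mirroring groupby('cicid').count().filter(count > 1).
key_counts = df.groupby("cicid").size()
dup_keys = key_counts[key_counts > 1].index

# Anti join: keep only rows whose key is NOT among the duplicated keys.
df_anti = df[~df["cicid"].isin(dup_keys)].reset_index(drop=True)

# For comparison, drop_duplicates keeps one copy of each duplicated key.
df_keep_first = df.drop_duplicates(subset="cicid").reset_index(drop=True)
```

Which behaviour is appropriate depends on whether a duplicated id signals an unreliable record (drop all copies) or a harmless repeat (keep one).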

In [38]:
# FILL NA

In [39]:
df_final2 = df_final2.fillna({'i94cit':-1, 'i94mode':-1, 'i94addr':'99'})

In [40]:
# CONVERT TO INTEGER

In [41]:
cast_lst = ['cicid','i94cit','i94res','i94mode']

for col_name in cast_lst:
    df_final2 = df_final2.withColumn(col_name, col(col_name).cast('int'))

In [42]:
# Save to .parquet

In [43]:
output_data = "./data/"

In [46]:
df_final2 = df_final2.coalesce(4)
df_final2.write.parquet(f"{output_data}I94-Data")

In [47]:
print(df_final2.count())
df_final2.printSchema()

3096313
root
 |-- cicid: integer (nullable = true)
 |-- i94cit: integer (nullable = true)
 |-- i94res: integer (nullable = true)
 |-- i94port: string (nullable = true)
 |-- i94mode: integer (nullable = true)
 |-- i94addr: string (nullable = false)
 |-- gender: string (nullable = true)
 |-- visatype: string (nullable = true)



In [48]:
sc = spark.sparkContext
sqlCtx = SQLContext(sc)

In [49]:
schema = StructType([ \
    StructField("state_code", StringType(), True), \
    StructField("male_population", IntegerType(), True), \
    StructField("female_population", IntegerType(), True), \
    StructField("total_population", IntegerType(), True), \
    StructField("number_of_veterans", IntegerType(), True) \
  ])

spark_state_demo = sqlCtx.createDataFrame(df_state_demo, schema=schema)
spark_state_demo.write.parquet(f"{output_data}demographics")

print(spark_state_demo.count())
spark_state_demo.printSchema()

49
root
 |-- state_code: string (nullable = true)
 |-- male_population: integer (nullable = true)
 |-- female_population: integer (nullable = true)
 |-- total_population: integer (nullable = true)
 |-- number_of_veterans: integer (nullable = true)



In [51]:
schema = StructType([ \
    StructField("country_code", IntegerType(), True), \
    StructField("country_name", StringType(), True) \
  ])

spark_country = sqlCtx.createDataFrame(df_country, schema=schema)
spark_country.write.parquet(f"{output_data}country")

print(spark_country.count())
spark_country.printSchema()

289
root
 |-- country_code: integer (nullable = true)
 |-- country_name: string (nullable = true)



In [52]:
schema = StructType([ \
    StructField("visa_type", StringType(), True), \
    StructField("visa_def", StringType(), True) \
  ])

spark_visa = sqlCtx.createDataFrame(df_visa, schema=schema)
spark_visa.write.parquet(f"{output_data}visa")

print(spark_visa.count())
spark_visa.printSchema()

20
root
 |-- visa_type: string (nullable = true)
 |-- visa_def: string (nullable = true)



In [21]:
# Create metadata.json

In [53]:
df_metadata = pd.DataFrame([("fact_immigration", df_final2.count()), 
                            ("dim_country", spark_country.count()), 
                            ("dim_visa", spark_visa.count()), 
                            ("dim_state_demo", spark_state_demo.count())], columns=["table", "rows"])
df_metadata.to_json("./metadata.json")

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model

The data model is designed as follows:
- Fact table:
    - fact_immigration: comes from the I94 immigration data.
- Dimension table:
    - dim_country: comes from the I94 label SAS file.
    - dim_visa: comes from the collected visa definitions.
    - dim_state_demo: comes from the aggregation of city-level demographic information.
    
![schema](./DB_schema.svg)

#### 3.2 Mapping Out Data Pipelines

The ETL pipeline is as follows: 
- Create staging, dimension and fact tables.
- Load processed ```.parquet``` files from S3.
- Insert the loaded data to the corresponding dimension and fact tables in Redshift.

![pipeline](./Pipeline.svg)
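
The load step can be expressed as a Redshift `COPY` from a Parquet prefix in S3. The sketch below only builds the statement text. The staging table name, bucket, IAM role ARN, and the helper `build_copy_sql` are placeholders; the project's `etl.py` is not shown here and may be written differently.

```python
def build_copy_sql(table: str, s3_prefix: str, iam_role_arn: str) -> str:
    """Build a Redshift COPY statement that loads Parquet files from S3."""
    return (
        f"COPY {table}\n"
        f"FROM '{s3_prefix}'\n"
        f"IAM_ROLE '{iam_role_arn}'\n"
        "FORMAT AS PARQUET;"
    )


# Placeholder values for illustration only.
copy_sql = build_copy_sql(
    table="staging_immigration",
    s3_prefix="s3://example-bucket/I94-Data/",
    iam_role_arn="arn:aws:iam::123456789012:role/example-redshift-role",
)
print(copy_sql)
```

The resulting string would then be executed through whatever database connection the pipeline uses.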

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [3]:
# Create Redshift cluster

In [None]:
!python ./operations/create_redshift_cluster.py create

In [None]:
# Create tables

In [None]:
!python ./operations/create_tables.py

In [None]:
# Perform ETL pipeline

In [None]:
!python ./operations/etl.py

#### 4.2 Data Quality Checks

Two data quality checks are performed:
- whether each created table exists in the database.
- whether the number of records in each table matches the row count recorded during preprocessing.
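
These two checks can be sketched against a database connection as follows. The example uses an in-memory SQLite database as a stand-in for Redshift so that it runs anywhere. The function name `check_tables`, the table contents, and the expected counts are illustrative, and `data_quality.py` itself may be implemented differently.

```python
import sqlite3


def check_tables(conn, expected_rows):
    """Return a list of problems; an empty list means all checks passed."""
    problems = []
    cur = conn.cursor()
    for table, expected in expected_rows.items():
        # Check 1: the table exists (querying a missing table raises an error).
        try:
            cur.execute(f"SELECT COUNT(*) FROM {table}")
        except sqlite3.Error:
            problems.append(f"{table}: table not found")
            continue
        # Check 2: the row count matches the count recorded during preprocessing.
        actual = cur.fetchone()[0]
        if actual != expected:
            problems.append(f"{table}: expected {expected} rows, found {actual}")
    return problems


# Toy database: dim_visa exists with 2 rows, dim_country is missing.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE dim_visa (visa_type TEXT, visa_def TEXT)")
conn.executemany("INSERT INTO dim_visa VALUES (?, ?)", [("A", "x"), ("B", "y")])

problems = check_tables(conn, {"dim_visa": 3, "dim_country": 289})
```

In practice the expected counts would come from the `metadata.json` file written during preprocessing, and the exception type would be the one raised by the Redshift driver in use.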

In [None]:
# Perform data quality check

In [None]:
!python ./operations/data_quality.py

In [None]:
# Delete (the expensive) Redshift cluster

In [None]:
!python ./operations/create_redshift_cluster.py delete

#### 4.3 Data dictionary

Fact table:
- fact_immigration:
    - **cicid**: unique id for each visitor.
    - i94cit: country code for visitor's citizenship.
    - i94res: country code for visitor's residence.
    - i94port: id of entry port.
    - i94mode: mode of entry.
    - i94addr: abbreviation of entry state.
    - gender: gender of visitor.
    - visatype: visa type of entry.

Dimension tables:
- dim_country:
    - **country_code**: code of country.
    - country_name: name of country.
- dim_visa:
    - **visa_type**: type of visa.
    - visa_def: definition of visa type.
- dim_state_demo:
    - **state_code**: abbreviation of state.
    - male_population: total population of males in the state.
    - female_population: total population of females in the state.
    - total_population: total population of the state.
    - number_of_veterans: total number of veterans in the state.

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
    * The fact table is by far the largest component of the database, so it is more efficient to store compact codes in it and keep the detailed descriptions in separate dimension tables. This is an ideal use case for a star schema, which avoids storing duplicated information. Tables are connected via keys (bold in the tables and figures above), and they can be joined to answer more complex questions. For example, a data scientist who wants to know which country sends the most F1 students can run
    ```
    SELECT 
          c.country_name AS Country, 
          SUM(CASE WHEN i.visatype = 'F1' THEN 1 ELSE 0 END) AS Students 
    FROM fact_immigration i 
    JOIN dim_country c 
    ON i.i94cit = c.country_code 
    GROUP BY Country 
    ORDER BY Students DESC LIMIT 1
    ```
    * Since the I94 data is large, I use Apache Spark to process it. The other datasets are much smaller, so I simply process them with pandas.
    * I save the processed data in ```.parquet``` format in an S3 bucket, because the columnar ```.parquet``` format speeds up queries that read only a few columns and Redshift can load data directly from S3.
    * Redshift is a powerful tool for data warehousing. It is scalable, distributed and cost-effective, which makes it a good fit for hosting the database.
* Propose how often the data should be updated and why.
    * Generally, a weekly or monthly update should be sufficient unless the queries are time-critical.
* Write a description of how you would approach the problem differently under the following scenarios:
    * The data was increased by 100x: Process all datasets in the cloud. Since the other steps already run in the cloud in a distributed manner, the rest of the pipeline should scale well enough.
    * The data populates a dashboard that must be updated on a daily basis by 7am every day: Schedule the execution of pipelines using tools like Apache Airflow.
    * The database needed to be accessed by 100+ people: Change the number and type of hosting nodes in Redshift.
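
To make the star-schema join concrete, the sketch below runs a version of the F1 query, with the visa type compared as a string, on a few made-up rows. SQLite (in memory) stands in for Redshift, the tables carry only the columns the query needs, and the country codes and names are invented for the example.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE dim_country (country_code INTEGER PRIMARY KEY, country_name TEXT);
    CREATE TABLE fact_immigration (cicid INTEGER PRIMARY KEY, i94cit INTEGER, visatype TEXT);

    INSERT INTO dim_country VALUES (1, 'Country A'), (2, 'Country B');
    INSERT INTO fact_immigration VALUES
        (10, 1, 'F1'), (11, 1, 'B2'), (12, 2, 'F1'), (13, 2, 'F1');
    """
)

# Count F1 entries per citizenship country and keep the country with the most.
query = """
SELECT c.country_name AS country,
       SUM(CASE WHEN i.visatype = 'F1' THEN 1 ELSE 0 END) AS students
FROM fact_immigration i
JOIN dim_country c ON i.i94cit = c.country_code
GROUP BY c.country_name
ORDER BY students DESC
LIMIT 1
"""
top_country, students = conn.execute(query).fetchone()
```

The fact table stores only the integer code `i94cit`; the readable name is looked up in `dim_country` at query time, which is the storage saving the star schema is meant to provide.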