# Data Engineering Capstone Project

## Project Summary
The objective of this project is to collect immigration-related data and build a simple data warehouse where it can be analysed quickly and efficiently.

Project Files:  
* `Capstone_Project.ipynb`: Current notebook. It documents all the steps taken in this project.  
* `Creating_Data_Warehouse.ipynb`: This notebook illustrates the steps to create a new Amazon Redshift cluster.  
* `Data_Dictionary.ipynb`: This notebook contains the data dictionary of the data model of this project.  
* `etl.py`, `sql_queries.py` and `dwh.cfg`: The Python and configuration files for the ETL process.
* `data_model.png`: The diagram of the data model.
* `raw_data` folder: The raw dataset provided by Udacity for this project.
* `csv_files` folder: The CSV files produced by the cleaning process.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

## Step 1: Scope the Project and Gather Data

The process first uses Spark and Pandas to read, parse, and clean the data stored in a local directory. The data is then transferred to Amazon S3, ready to be loaded into Amazon Redshift. The final solution consists of several tables of U.S. immigration information that can be queried with SQL for further analysis.

The data in this project consists of information related to immigration in the United States:
* I94 Immigration Data - This data comes from the US National Tourism and Trade Office. It contains international visitor arrival statistics by world region and selected countries, including the type of visa, the mode of transportation, the age groups, the states visited, and the top ports of entry for immigration into the United States. The data was collected from [here](https://travel.trade.gov/research/reports/i94/historical/2016.html).
* U.S. City Demographic Data - This dataset contains information about the demographics of all US cities and census-designated places with a population greater than or equal to 65,000. The dataset can be accessed [here](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/).
* Airport Code Table - This dataset contains a simple table of airport codes, and corresponding cities. The data can be accessed [here](https://datahub.io/core/airport-codes#data).


In [2]:
# Do all imports and installs here
import pandas as pd
import boto3
import json
import psycopg2
import pandas.io.sql as sqlio
import configparser
from datetime import datetime

import os
import pyspark.sql
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import DateType
from pyspark.sql.functions import expr , udf ,trim ,year, month, dayofmonth

In [13]:
# Load parameters from configuration files
config = configparser.ConfigParser()
config.read_file(open('dwh.cfg'))

KEY = config.get('AWS','KEY')
SECRET = config.get('AWS','SECRET')
S3_BUCKET = config.get('S3', 'S3_BUCKET')

First, we load the data stored in the local directory.

#### Load immigrations data


In [6]:
# Get a spark session
spark = SparkSession \
    .builder \
    .config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11")\
    .getOrCreate()

In [7]:
# Get the list of immigration data in the folder
immi_path = '../../data/18-83510-I94-Data-2016/'
immig_files = [os.path.join(immi_path, f) for f in os.listdir(immi_path)]

The immigration data for June 2016 (`i94_jun16_sub.sas7bdat`) contains extra columns, namely *'validres','delete_days','delete_mexl','delete_dup','delete_recdup'* and *'delete_visa'*. These columns are dropped before the data is combined with the immigration data of the other months.

In [8]:
# Read 'i94_jun16_sub.sas7bdat' and drop its extra columns.
# Select the file by name: os.listdir() returns entries in arbitrary order,
# so a fixed list index is not reliable.
jun_file = os.path.join(immi_path, 'i94_jun16_sub.sas7bdat')
df_immig = spark.read.format('com.github.saurfang.sas.spark') \
                .option("inferSchema", "true") \
                .option("dateFormat", "yyyyMMdd") \
                .load(jun_file) \
                .drop('validres','delete_days','delete_mexl','delete_dup','delete_recdup','delete_visa')

# Remove 'i94_jun16_sub.sas7bdat' from the file list since it has been read
immig_files.remove(jun_file)

# Combine with the rest of the SAS files
for immig_file in immig_files:
    df = spark.read.format('com.github.saurfang.sas.spark') \
                .option("inferSchema", "true") \
                .option("dateFormat", "yyyyMMdd") \
                .load(immig_file)

    df_immig = df_immig.union(df)

The descriptor file, `I94_SAS_Labels_Descriptions.SAS`, contains the data dictionary of the immigration data. We parse it manually and save the information as several CSV files in the `./csv_files/` folder. The following files are the results of the manual parsing and will be loaded as dimension tables in the later ETL process:
* `arrival_mode.csv`
* `countries.csv`
* `ports.csv`
* `states.csv`
* `visa_type.csv`
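
The notebook parses the descriptor file by hand. As a rough sketch of how the same extraction could be scripted, the block below pulls `code = 'label'` pairs out of a few lines and writes them as CSV. The sample lines, the regular expression, and the helper name `parse_label_block` are assumptions for illustration; the real layout of `I94_SAS_Labels_Descriptions.SAS` may need a different pattern.

```python
import csv
import io
import re

# Matches lines such as  'ALC' = 'ALCAN, AK'  or  1 = 'Air'  (assumed layout)
PAIR_RE = re.compile(r"^\s*'?([^'=]+?)'?\s*=\s*'([^']*)'")


def parse_label_block(lines):
    """Return (code, label) tuples for every line that looks like code = 'label'."""
    pairs = []
    for line in lines:
        match = PAIR_RE.match(line)
        if match:
            code, label = match.groups()
            pairs.append((code.strip(), label.strip()))
    return pairs


# Hypothetical sample lines, written to resemble a SAS value block
sample = [
    "value i94model",
    "\t1 = 'Air'",
    "\t2 = 'Sea'",
    "\t3 = 'Land'",
    "\t9 = 'Not reported' ;",
    "   'ALC'\t=\t'ALCAN, AK             '",
]

rows = parse_label_block(sample)

# Write the pairs as CSV text (an in-memory buffer stands in for a file)
buffer = io.StringIO()
writer = csv.writer(buffer)
writer.writerow(["code", "label"])
writer.writerows(rows)
print(buffer.getvalue())
```

Lines without an `=` sign, such as the `value ...` header, are skipped, and padding inside the quoted label is stripped.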

#### Load demographics data and airport codes

In [9]:
# Load demographics data and airport_codes csv 
df_demog = pd.read_csv('./raw_data/us-cities-demographics.csv', delimiter=';')
df_port = pd.read_csv('./raw_data/airport-codes_csv.csv')

## Step 2: Explore and Assess the Data

Next, we explore the data and apply the steps needed to clean it.

#### Immigrations Data


In [11]:
# Number of rows
df_immig.count()

40790529

In [10]:
# Display a few rows of data
df_immig.show(3)

+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+---------------+-----+--------+
|cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|         admnum|fltno|visatype|
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+---------------+-----+--------+
|  4.0|2016.0|   6.0| 135.0| 135.0|    XXX|20612.0|   null|   null|   null|  59.0|    2.0|  1.0|    null|    null| null|      Z|   null|      U|   null| 1957.0|10032016|  null|  null|   null|1.4938462027E10| null|      WT|
|  5.0|2016.0|   6.0| 135.0| 135.0|    XXX|20612.0|   null|   null|   null|  50.0|    2.0|  1.0|    null|   

* Some columns, such as *'i94yr'*, *'i94mon'* and *'i94bir'*, are stored as floating-point numbers. We will cast them to integer type.
* The various dates in the data are stored in SAS date format, which is the number of days since 1960-01-01. We will transform them to date type.
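
The SAS date rule can be checked in plain Python before applying it in Spark. This is a minimal sketch; the helper name `sas_to_date` is hypothetical and not part of the notebook.

```python
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)


def sas_to_date(sas_days):
    """Convert a SAS numeric date (days since 1960-01-01) to a date, or None."""
    if sas_days is None:
        return None
    return SAS_EPOCH + timedelta(days=int(sas_days))


# 20612.0 is the 'arrdate' value shown in the sample rows
print(sas_to_date(20612.0))  # 2016-06-07
```

The Spark expression `date_add(to_date('1960-01-01'), arrdate)` used in the notebook applies the same offset to a whole column.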

In [10]:
# Update data type of several columns
df_immig = df_immig.withColumn('cicid', df_immig["cicid"].cast("bigint")) \
                .withColumn('i94yr', df_immig["i94yr"].cast("int"))\
                .withColumn('i94mon', df_immig["i94mon"].cast("int"))\
                .withColumn('i94cit', df_immig["i94cit"].cast("int"))\
                .withColumn('i94res', df_immig["i94res"].cast("int"))\
                .withColumn('i94mode', df_immig["i94mode"].cast("int"))\
                .withColumn('i94bir', df_immig["i94bir"].cast("int"))\
                .withColumn('i94visa', df_immig["i94visa"].cast("int"))\
                .withColumn('count', df_immig["count"].cast("int"))\
                .withColumn('biryear', df_immig["biryear"].cast("int"))\
                .withColumn('arrdate', expr("date_add(to_date('1960-01-01'), arrdate)")) \
                .withColumn('depdate', expr("date_add(to_date('1960-01-01'), depdate)")) \
                .withColumn('dtaddto', expr("to_date(dtaddto,'MMddyyyy')"))

df_immig.printSchema()

root
 |-- cicid: long (nullable = true)
 |-- i94yr: integer (nullable = true)
 |-- i94mon: integer (nullable = true)
 |-- i94cit: integer (nullable = true)
 |-- i94res: integer (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: date (nullable = true)
 |-- i94mode: integer (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: date (nullable = true)
 |-- i94bir: integer (nullable = true)
 |-- i94visa: integer (nullable = true)
 |-- count: integer (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: integer (nullable = true)
 |-- dtaddto: date (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable =

In [23]:
# Display a few rows of the cleaned data
df_immig.show(3)

+-----+-----+------+------+------+-------+----------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+----------+------+------+-------+---------------+-----+--------+
|cicid|i94yr|i94mon|i94cit|i94res|i94port|   arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear|   dtaddto|gender|insnum|airline|         admnum|fltno|visatype|
+-----+-----+------+------+------+-------+----------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+----------+------+------+-------+---------------+-----+--------+
|   22| 2016|     8|   323|   323|    NYC|2016-08-01|      1|     FL|   null|    23|      3|    1|20160801|     RID| null|      U|   null|   null|   null|   1993|      null|     M|  null|     EK| 6.451049563E10|  201|      F1|
|   55| 2016|     8|   209|   209|    AGA|2016-08-01|      1|     CA|   null|    41|      2|

The processed immigration data is stored as Parquet files and transferred to an Amazon S3 bucket.

In [11]:
# Store the immigrations data as parquet
df_immig.write.mode("overwrite").parquet("immigrations_parquet")

In [14]:
# Transfer the parquet files to S3 bucket

session = boto3.Session(
                aws_access_key_id = KEY,
                aws_secret_access_key = SECRET)

s3 = session.resource('s3')
bucket = s3.Bucket(S3_BUCKET)

path = './immigrations_parquet/'
for subdir, dirs, files in os.walk(path):
    for file in files:
        file_name, file_extension = os.path.splitext(file)
        full_path = os.path.join(subdir, file)
        if file_extension == '.parquet':
            with open(full_path, 'rb') as data:
                bucket.put_object(Key = 'immigrations_parquet/' + full_path[len(path):], Body=data)
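
The mapping from local paths to S3 keys can be tested without touching AWS. The sketch below is one possible way to factor it out; the helper `iter_upload_keys` is hypothetical, and the temporary directory with empty files only stands in for the Spark output folder.

```python
import os
import tempfile


def iter_upload_keys(root, extension, prefix=""):
    """Yield (local_path, s3_key) for every file under root with the given extension."""
    for subdir, _dirs, files in os.walk(root):
        for name in sorted(files):
            if os.path.splitext(name)[1] != extension:
                continue
            local_path = os.path.join(subdir, name)
            # S3 keys always use forward slashes, whatever the local separator is
            relative = os.path.relpath(local_path, root).replace(os.sep, "/")
            yield local_path, prefix + relative


# Demonstration with empty placeholder files
with tempfile.TemporaryDirectory() as root:
    for name in ("part-0000.parquet", "_SUCCESS", "part-0000.parquet.crc"):
        open(os.path.join(root, name), "w").close()
    keys = [key for _path, key in iter_upload_keys(root, ".parquet", "immigrations_parquet/")]

print(keys)
```

Each yielded pair could then be passed to `bucket.put_object(Key=..., Body=...)` as in the cell above.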

#### Demographics Data

In [17]:
# Display a few rows of data
df_demog.head()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402


In [18]:
# Display the dimension of the data
df_demog.shape

(2891, 12)

The demographics data needs no changes. Nevertheless, we store it as a CSV file in the `./csv_files` folder.

In [19]:
# Store the data as csv file
df_demog.to_csv('./csv_files/us-cities-demographics.csv', index = False)

#### Airport Codes

In [20]:
# Display a few rows of data
df_port.head()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11.0,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450.0,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820.0,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237.0,,US,US-AR,Newport,,,,"-91.254898, 35.6087"


In [21]:
# Display the dimension of the data
df_port.shape

(55075, 12)

For airports located in the U.S., the *'iso_region'* column contains the state code after the country prefix (for example, `US-PA`). We extract the state code so that it can be used to join with other tables in later analysis.

In [121]:
# Splitting column 'iso_region' to extract state code
df_port['iso_region'] = df_port['iso_region'].apply(lambda x: x.split('-')[1])
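
The cell above applies the split to every row, including airports outside the U.S. and any row whose region is missing. A more defensive variant might look like the sketch below; the helper `state_code_from_region` is hypothetical and changes the behaviour by returning `None` for non-U.S. rows.

```python
def state_code_from_region(iso_region, iso_country):
    """Return the state code for a U.S. region such as 'US-PA', otherwise None."""
    if iso_country != "US" or not isinstance(iso_region, str):
        return None
    _prefix, separator, code = iso_region.partition("-")
    if not separator or not code:
        return None
    return code


print(state_code_from_region("US-PA", "US"))         # PA
print(state_code_from_region("GB-ENG", "GB"))        # None
print(state_code_from_region(float("nan"), "US"))    # None
```

With pandas it could be applied row by row, for example through `df_port.apply(..., axis=1)`, at the cost of being slower than a vectorised string operation.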

In [123]:
# Store the data as csv file
df_port.to_csv('./csv_files/airport-codes.csv', index = False)

Now we transfer all csv files in `./csv_files` folder to S3 bucket.

In [75]:
# Transfer all csv files to S3 bucket
path = './csv_files/'
for subdir, dirs, files in os.walk(path):
    for file in files:
        file_name, file_extension = os.path.splitext(file)
        full_path = os.path.join(subdir, file)
        if file_extension == '.csv':
            with open(full_path, 'rb') as data:
                bucket.put_object(Key = full_path[len(path):], Body=data)

## Step 3: Define the Data Model
### 3.1 Conceptual Data Model
<img src="data_model.png">

### 3.2 Mapping Out Data Pipelines
To pipeline the data into the chosen data model:
1. Create or connect to an Amazon Redshift Cluster.
2. Drop any existing tables.
3. Create the tables in the defined data model.
4. Load the data from Amazon S3 to the tables.
5. Run data quality checks.
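
Step 4 is typically done with Redshift `COPY` statements. The sketch below only builds the SQL text; it does not show what `etl.py` or `sql_queries.py` actually contain. The helper `build_copy_sql`, the bucket name, and the IAM role ARN are placeholders, and authenticating through `IAM_ROLE` is an assumption.

```python
def build_copy_sql(table, bucket, key, iam_role, file_format):
    """Build a Redshift COPY statement for a Parquet or CSV source on S3."""
    source = "s3://{}/{}".format(bucket, key)
    if file_format == "parquet":
        format_clause = "FORMAT AS PARQUET"
    elif file_format == "csv":
        # Skip the header row written by DataFrame.to_csv
        format_clause = "CSV IGNOREHEADER 1"
    else:
        raise ValueError("Unsupported format: {}".format(file_format))
    return "COPY {} FROM '{}' IAM_ROLE '{}' {};".format(
        table, source, iam_role, format_clause
    )


# Placeholder values, not taken from dwh.cfg
ROLE = "arn:aws:iam::123456789012:role/exampleRedshiftRole"

fact_sql = build_copy_sql("immigrations", "example-bucket", "immigrations_parquet/", ROLE, "parquet")
dim_sql = build_copy_sql("ports", "example-bucket", "ports.csv", ROLE, "csv")
print(fact_sql)
print(dim_sql)
```

The generated statements would be executed with a database cursor once the target tables exist.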

## Step 4: Run Pipelines to Model the Data 
### 4.1 Create the data model


The necessary steps to create a new Amazon Redshift cluster are outlined in a separate notebook, `Creating_Data_Warehouse.ipynb`. After creating the Redshift cluster, run `etl.py` to create and load the data model.

In [23]:
%run etl.py

Database connected
Dropping existing tables...
All existing tables dropped
Creating tables...
All tables created
Inserting data into tables...
All data inserted into tables


### 4.2 Data Quality Checks
To ensure the pipeline ran as expected, we first check that the tables are not empty.

In [30]:
# Connect to the database
conn = psycopg2.connect("host={} dbname={} user={} password={} port={}".format(*config['CLUSTER'].values()))
cur = conn.cursor()
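
The connection string above depends on the order of the entries in the `[CLUSTER]` section. A sketch that reads each value by name is shown below; the key names (`HOST`, `DB_NAME`, `DB_USER`, `DB_PASSWORD`, `DB_PORT`) and all sample values are assumptions, since the contents of `dwh.cfg` are not shown here.

```python
import configparser

# Hypothetical configuration text standing in for dwh.cfg
SAMPLE_CFG = """
[CLUSTER]
HOST = example-cluster.abc123.us-west-2.redshift.amazonaws.com
DB_NAME = dwh
DB_USER = dwhuser
DB_PASSWORD = not-a-real-password
DB_PORT = 5439
"""


def build_dsn(parser):
    """Build a libpq-style connection string from named [CLUSTER] entries."""
    cluster = parser["CLUSTER"]
    return "host={} dbname={} user={} password={} port={}".format(
        cluster["HOST"],
        cluster["DB_NAME"],
        cluster["DB_USER"],
        cluster["DB_PASSWORD"],
        cluster["DB_PORT"],
    )


sample_config = configparser.ConfigParser()
sample_config.read_string(SAMPLE_CFG)
dsn = build_dsn(sample_config)
print(dsn)
```

Reading values by name keeps the connection string correct even if the entries in the file are reordered.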

In [25]:
# Ensure that every table contains at least one record

table_names = ['immigrations', 'arrival_mode', 'countries', 'ports', 'states', 'visa_type', 'airport_codes', 'demographics']

for table in table_names:
    sql = 'SELECT COUNT(*) FROM {};'.format(table)
    cur.execute(sql)
    count = cur.fetchone()

    if (count[0] == 0):
        raise Exception('There are no records for table {}'.format(table))    
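
The check above stops at the first empty table. A variant that reports every empty table at once is sketched below. An in-memory SQLite database stands in for the Redshift connection so the example runs locally; the helper `find_empty_tables` and the demo column names are hypothetical.

```python
import sqlite3


def find_empty_tables(cursor, table_names):
    """Return the names of all tables that contain no rows."""
    empty = []
    for table in table_names:
        # Table names cannot be bound as query parameters, so pass trusted names only
        cursor.execute("SELECT COUNT(*) FROM {};".format(table))
        if cursor.fetchone()[0] == 0:
            empty.append(table)
    return empty


# Local stand-in for the warehouse: one populated table, one empty table
demo_conn = sqlite3.connect(":memory:")
demo_cur = demo_conn.cursor()
demo_cur.execute("CREATE TABLE states (state_code TEXT, state_name TEXT)")
demo_cur.execute("CREATE TABLE ports (port_code TEXT, port_name TEXT)")
demo_cur.execute("INSERT INTO states VALUES ('FL', 'FLORIDA')")

empty_tables = find_empty_tables(demo_cur, ["states", "ports"])
print(empty_tables)  # ['ports']
demo_conn.close()
```

Because the helper only relies on `execute` and `fetchone`, the same function should also work with the `psycopg2` cursor used in this notebook.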
        

We also run a few SQL queries to ensure the database is working properly.

In [29]:
# 1. Top 10 ports of entry where immigrants arrived

sql = """
        SELECT TOP 10 ports.port_name, COUNT(cicid) AS count
        FROM immigrations immi 
        JOIN ports ON immi.port = ports.port_code
        GROUP BY ports.port_name
        ORDER BY COUNT(cicid) DESC
"""

sqlio.read_sql_query(sql, conn)

Unnamed: 0,port_name,count
0,"NEW YORK, NY",6058033
1,"MIAMI, FL",4735010
2,"LOS ANGELES, CA",4187439
3,"SAN FRANCISCO, CA",2091507
4,"HONOLULU, HI",2064457
5,"NEWARK/TETERBORO, NJ",1707321
6,"CHICAGO, IL",1615110
7,"ORLANDO, FL",1569443
8,"HOUSTON, TX",1235650
9,"AGANA, GU",1231022


In [27]:
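# 2. Compare the total number of foreign-born residents with the total number of immigrant arrivals in each U.S. state
# Note: the demographics data holds one row per city and race, so SUM(foreign_born) counts each city more than once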

sql = """
        WITH table1 AS (SELECT state_code AS state, SUM(foreign_born) AS total_foreign_born
                        FROM demographics
                        GROUP BY state_code),
              table2 AS (SELECT state, COUNT (cicid) AS total_immigrations
                          FROM immigrations
                          GROUP BY state)

        SELECT table1.state, table2.total_immigrations, table1.total_foreign_born
        FROM table1 
        JOIN table2 ON table2.state = table1.state
        ORDER BY table2.total_immigrations DESC
        LIMIT 10
"""

sqlio.read_sql_query(sql, conn)

Unnamed: 0,state,total_immigrations,total_foreign_born
0,FL,7571672,7845566
1,NY,6174793,17186873
2,CA,5928310,37059662
3,HI,2153992,506560
4,TX,1545945,14498054
5,NV,1274193,2406685
6,IL,964594,4632600
7,MA,952068,2573815
8,NJ,888263,2327750
9,WA,733776,2204810


In [28]:
# 3. Compare the total number of immigrants who arrived by air with the number of large or medium airports in each U.S. state

sql = """
        WITH table1 AS (SELECT iso_region, COUNT(ident) AS number_of_airports
                        FROM  airport_codes
                        WHERE iso_country = 'US' AND (type = 'large_airport' OR type = 'medium_airport')
                        GROUP BY iso_region
                        ORDER BY COUNT (ident) DESC),
              table2 AS (SELECT state, COUNT (cicid) AS total_immigrations
                         FROM immigrations
                         WHERE arrival_mode = 1
                         GROUP BY state)

        SELECT table2.state, table2.total_immigrations, table1.number_of_airports
        FROM table1 
        JOIN table2 ON table2.state = table1.iso_region
        ORDER BY table2.total_immigrations DESC
        LIMIT 10
"""

sqlio.read_sql_query(sql, conn)

Unnamed: 0,state,total_immigrations,number_of_airports
0,FL,7500392,47
1,NY,5958773,26
2,CA,5828506,67
3,HI,2132758,17
4,TX,1494769,56
5,NV,1264391,12
6,IL,946468,16
7,MA,910989,11
8,NJ,836683,9
9,WA,546812,22


### 4.3 Data dictionary 

The data dictionary of the data model can be found in another notebook, `Data_Dictionary.ipynb`.

## Step 5: Complete Project Write Up

#### Tools and Technologies
* Spark and Pandas: They are used to load and explore the raw data stored in a local directory. Because the data set is relatively small and structured, it fits in the memory of a local machine. Spark can also handle the case where the data grows substantially in the future.
* Amazon S3 and Amazon Redshift: We store the data in an S3 bucket and process it using a Redshift cluster. Both tools are scalable and handle multiple concurrent accesses efficiently.

#### Data Update Schedule
* *immigrations*: Since thousands of immigrants arrive in the U.S. every day, this table should be updated daily, provided the data is available.
* *demographics*: Since this data is based on U.S. Census Bureau's survey, it should be updated when new data is released by the bureau.
* *airport_codes*: As this information doesn't change rapidly, an annual update would be an appropriate choice.


#### Scenario Handling

* **The data was increased by 100x**  
As Amazon Redshift is simple and quick to scale, we could move to a higher-specification Redshift cluster, for example by changing the number or type of nodes. If the data no longer fit in local memory, we could run Spark on Amazon EMR for the data exploration and cleaning steps.
* **The data populates a dashboard that must be updated on a daily basis by 7am every day**  
We could set up an Apache Airflow pipeline to automate the whole ETL process. The pipeline could be scheduled to run early every morning so that the dashboard is populated before 7am.
* **The database needed to be accessed by 100+ people**  
Amazon Redshift is designed to provide consistently fast performance, even with thousands of concurrent queries. If necessary, we could enable the [concurrency scaling feature](https://docs.aws.amazon.com/redshift/latest/dg/concurrency-scaling.html) of Amazon Redshift, so that additional cluster capacity is added to process an increase in concurrent read queries.
