# Project Title
### Data Engineering Capstone Project

#### Project Summary
This project is in fulfilment of an imagined  requirement to create a data pipeline to populate a data warehouse for the data scientist team at Widgets Inc. The data warehouse is dedicated to the purpose of collecting various data about state regions within the United States for the purpose of data analysis by the data science team.

This project has also been deployed to Github, and is available at XXXXX

The data pipeline will follow these essential steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

### Step 1: Scope the Project and Gather Data

#### Scope 
The data warehouse will be populated by Pyspark as an engine for various ETL jobs (Extract, Tranform, and Load). At some points, Pandas will be used for data frame manipulation. The Pyspark pipeline will produce source-of-truth tables and one analytics summary table in order to facilitate the jobs of the staff data scientist team.

#### Describe and Gather Data 
The data will be sourced from the following locations:
- Listing of states and state abbreviations for states in the United States of America, plus the District of Columbia. 
- I94 Immigration Data
- U.S. City Demographic Data

#### 1.1 Import libraries

In [24]:
import pandas as pd
import os
import configparser
import datetime as dt
import time
from pyspark.sql.functions import isnan, when, count, col, udf, dayofmonth, dayofweek, month, year, weekofyear, avg, monotonically_increasing_id
from pyspark.sql import SparkSession, SQLContext, GroupedData, HiveContext
#from pyspark.sql.functions import col,isnan, when, count

spark = SparkSession.builder.\
config("spark.jars.repositories", "https://repos.spark-packages.org/").\
config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\
enableHiveSupport().getOrCreate()

#### 1.2 Import: State names and abbreviations
- Sourced from World Population Review (https://worldpopulationreview.com/states/state-abbreviations)
- Provided in JSON format
- 51 total records (50 states + District of Columbia)

In [25]:
json_url = "https://worldpopulationreview.com/static/states/abbr-name.json"
from pyspark import SparkFiles
spark.sparkContext.addFile(json_url)

states_json_df = spark.read.option("multiline","true").option("inferSchema","true").json("file://"+SparkFiles.get("abbr-name.json"))
states_pandas = states_json_df.toPandas()

statesArr = []
for val in states_pandas:
    statesArr.append({'state_code': str(val), 'state_name': states_pandas[val][0]})

states_pandas = pd.DataFrame(statesArr, columns=['state_code', 'state_name'])
states_df=spark.createDataFrame(states_pandas) 
states_df.show()

+----------+--------------------+
|state_code|          state_name|
+----------+--------------------+
|        AK|              Alaska|
|        AL|             Alabama|
|        AR|            Arkansas|
|        AZ|             Arizona|
|        CA|          California|
|        CO|            Colorado|
|        CT|         Connecticut|
|        DC|District Of Columbia|
|        DE|            Delaware|
|        FL|             Florida|
|        GA|             Georgia|
|        HI|              Hawaii|
|        IA|                Iowa|
|        ID|               Idaho|
|        IL|            Illinois|
|        IN|             Indiana|
|        KS|              Kansas|
|        KY|            Kentucky|
|        LA|           Louisiana|
|        MA|       Massachusetts|
+----------+--------------------+
only showing top 20 rows



#### 1.3 Import: I94 Immigration data.
- Provided by the the US National Tourism and Trade Office
- 3,096,313 total records
- CSV source file

In [26]:
spark = SparkSession.builder.\
config("spark.jars.repositories", "https://repos.spark-packages.org/").\
config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\
enableHiveSupport().getOrCreate()

file_name = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
immigration_df = spark.read.format('com.github.saurfang.sas.spark').load(file_name)
immigration_df.show(n=5)

+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|        admnum|fltno|visatype|
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|  6.0|2016.0|   4.0| 692.0| 692.0|    XXX|20573.0|   null|   null|   null|  37.0|    2.0|  1.0|    null|    null| null|      T|   null|      U|   null| 1979.0|10282016|  null|  null|   null| 1.897628485E9| null|      B2|
|  7.0|2016.0|   4.0| 254.0| 276.0|    ATL|20551.0|    1.0|     AL|   null|  25.0|    3.0|  1.0|20130811|     SE

#### 1.4 Import: US city demographic data
- Provided by OpenSoft, via a US Census Bureau's 2015 American Community Survey.
- 2,891 total records
- CSV source file

In [27]:
### Load U.S. City Demographic Data
file_name = "us-cities-demographics.csv"
demographics_df = spark.read.csv(file_name, inferSchema=True, header=True, sep=';')

# display the first five records
demographics_df.limit(5).toPandas()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601,41862,82463,1562,30908,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129,49500,93629,4147,32935,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040,46799,84839,4819,8229,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127,87105,175232,5821,33878,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040,143873,281913,5829,86253,2.73,NJ,White,76402


### Step 2: Explore and Assess the Data
#### Explore the Data 
Below, we will explore the data to identify data quality issues, like missing values and duplicate data, and document steps necessary to clean the data.

#### Review schemas

In [28]:
### Temperature schema
states_df.printSchema()
immigration_df.printSchema()
demographics_df.printSchema()

root
 |-- state_code: string (nullable = true)
 |-- state_name: string (nullable = true)

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string

#### Look for null values

In [29]:
def get_nulls(input_df):
    '''
    Looks for missing data in input dataframe, and returns a dataframe with the number of missing values in each column'
    
    Parameters
    ----------
    df : DataFrame
        input dataFrame
    '''

    input_df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in input_df.columns]).show()

In [30]:
print("Found the following null values in States data")
get_nulls(states_df)
print("Found the following null values in Immigration data")
get_nulls(immigration_df)
print("Found the following null values in Demographics data")
get_nulls(demographics_df)

Found the following null values in States data
+----------+----------+
|state_code|state_name|
+----------+----------+
|         0|         0|
+----------+----------+

Found the following null values in Immigration data
+-----+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-------+-------+-------+-------+-------+-------+-------+------+-------+-------+------+-----+--------+
|cicid|i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|  occup|entdepa|entdepd|entdepu|matflag|biryear|dtaddto|gender| insnum|airline|admnum|fltno|visatype|
+-----+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-------+-------+-------+-------+-------+-------+-------+------+-------+-------+------+-----+--------+
|    0|    0|     0|     0|     0|      0|      0|    239| 152592| 142457|   802|      0|    0|       1| 1881250|3088187|    238| 13842

#### Look for duplicate values

In [31]:
print("Found " + str(states_df.groupBy("state_code").count().filter("count > 1").count()) + " duplicates from states data")
print("Found " + str(immigration_df.groupBy("cicid").count().filter("count > 1").count()) + " duplicates from immigration data")
print("Found " + str(demographics_df.groupBy("City", "State", "Race").count().filter("count > 1").count()) +  " duplicates from demographics data")

Found 0 duplicates from states data
Found 0 duplicates from immigration data
Found 0 duplicates from demographics data


#### Plan cleaning steps

We have shown that there are no duplicate values from the source data. Therefore, the only steps required for cleaning the data is to remove the null values.

### Step 3: Define the Data Model

#### 3.1 Conceptual Data Model
The data will be modelled as a Star Schema for better support for OLAP queries, i.e. for overall design elegance and optimized use in data analytics. The fact table and dimension tables are documented below.

##### Fact Table

*state_analysis*
- state_code
- avg_percent_foreign_born
- avg_percent_veterans
- avg_household_size
- avg_median_age
- recent_period_total_immigrants
- recent_period_total_immigrants_month
- recent_period_total_immigrants_year


##### Dimension Tables

*states*
- state_code
- title

*city_demographics*
- city
- state_code
- total_population
- total_foreign_born
- percent_foreign_born
- total_veterans
- percent_veterans
- household_size
- median_age

*i94_immigration_monthly*
- i94_id
- year
- month
- city_port_code
- state_code
- birth_year
- gender
- visa_type

#### 3.2 Mapping Out Data Pipelines

List the steps necessary to pipeline the data into the chosen data model

- Create dimension tables as subsets of the relevant data from source files.
- Perform data clean up, as following the requirements documented in Step 2.
- Create fact table, generated as summary analytics from dimension tables
- Save final parquet files from each table created.
- Run quality control checks on the final parquet files.
- Summarize output as data dictionary.

### Step 4: Run ETL to Model the Data

#### 4.1 Create dimension tables as subsets of the relevant data from source files.

In [32]:
# Create Spark SQL views
sqlContext = SQLContext(spark)

# Create Dimension tables
states_df.createOrReplaceTempView("states")
immigration_df.createOrReplaceTempView("i94_immigration_monthly_source")
demographics_df.createOrReplaceTempView("city_demographics_source")

#allow unlimited time for SQL joins and parquet writes.
sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", "0")

In [33]:
immigration_df = spark.sql("""
    SELECT
        cicid as i94_id, i94yr as year, i94mon as month, i94port as city_port_code,
        i94addr as state_code, biryear as birth_year, gender, visatype as visa_type
    FROM i94_immigration_monthly_source
""").toDF('i94_id', 'year', 'month', 'city_port_code', 'state_code', 'birth_year', 'gender', 'visa_type')
immigration_df.createOrReplaceTempView("i94_immigration_monthly")
immigration_df.show(5)

+------+------+-----+--------------+----------+----------+------+---------+
|i94_id|  year|month|city_port_code|state_code|birth_year|gender|visa_type|
+------+------+-----+--------------+----------+----------+------+---------+
|   6.0|2016.0|  4.0|           XXX|      null|    1979.0|  null|       B2|
|   7.0|2016.0|  4.0|           ATL|        AL|    1991.0|     M|       F1|
|  15.0|2016.0|  4.0|           WAS|        MI|    1961.0|     M|       B2|
|  16.0|2016.0|  4.0|           NYC|        MA|    1988.0|  null|       B2|
|  17.0|2016.0|  4.0|           NYC|        MA|    2012.0|  null|       B2|
+------+------+-----+--------------+----------+----------+------+---------+
only showing top 5 rows



In [34]:
city_demographics_df = spark.sql("""
        SELECT
            City as city, `State Code` as state_code,
            `Total Population` as total_population, `Foreign-born` as total_foreign_born,
            (`Foreign-born` / `Total Population`) AS percent_foreign_born,
            `Number of Veterans` as total_veterans,
            (`Number of Veterans` / `Total Population`) AS percent_veterans,
            `Average Household Size` AS household_size, `Median Age` AS median_age
        FROM city_demographics_source
""").toDF('city', 'state_code', 'total_population', 'total_foreign_born', 'percent_foreign_born',
          'total_veterans', 'percent_veterans', 'household_size', 'median_age')
city_demographics_df.createOrReplaceTempView("city_demographics")
city_demographics_df.show(5)


+----------------+----------+----------------+------------------+--------------------+--------------+--------------------+--------------+----------+
|            city|state_code|total_population|total_foreign_born|percent_foreign_born|total_veterans|    percent_veterans|household_size|median_age|
+----------------+----------+----------------+------------------+--------------------+--------------+--------------------+--------------+----------+
|   Silver Spring|        MD|           82463|             30908|  0.3748105210821823|          1562|0.018941828456398628|           2.6|      33.8|
|          Quincy|        MA|           93629|             32935|  0.3517606724412308|          4147| 0.04429183265868481|          2.39|      41.0|
|          Hoover|        AL|           84839|              8229| 0.09699548556677944|          4819|0.056801706762220204|          2.58|      38.5|
|Rancho Cucamonga|        CA|          175232|             33878|  0.1933322680788897|          5821| 0.03

#### 4.2 Perform data clean up

In [35]:
# Drop nulls
city_demographics_df.na.drop()
immigration_df.na.drop()
states_df.na.drop()

DataFrame[state_code: string, state_name: string]

#### 4.3 Create fact table, generated as summary analytics from dimension tables

In [36]:
state_analysis_df = spark.sql("""
               
WITH
    i94_summary AS (
        SELECT
            state_code, COUNT(*) as recent_period_total_immigrants, 
            AVG(month) as recent_period_total_immigrants_month,
            AVG(year) as recent_period_total_immigrants_year
        FROM i94_immigration_monthly 
        GROUP BY state_code
    ),
    demographics_summary AS (
        SELECT 
            state_code,
            AVG(percent_foreign_born) as avg_percent_foreign_born,
            AVG(percent_veterans) as avg_percent_veterans,
            AVG(household_size) as avg_household_size,
            AVG(median_age) as avg_median_age
        FROM city_demographics 
        GROUP BY state_code
    )
    SELECT
        demographics_summary.*, recent_period_total_immigrants,
        recent_period_total_immigrants_year, recent_period_total_immigrants_month
    FROM demographics_summary
    INNER JOIN i94_summary
    ON demographics_summary.state_code = i94_summary.state_code
    ORDER BY demographics_summary.state_code
    

""").toDF('state_code', 'avg_percent_foreign_born', 'avg_percent_veterans', 'avg_household_size',
          'avg_median_age', 'recent_period_total_immigrants',
          'recent_period_total_immigrants_month', 'recent_period_total_immigrants_year'
)
state_analysis_df.createOrReplaceTempView("state_analysis")
state_analysis_df.show(5)

+----------+------------------------+--------------------+------------------+-----------------+------------------------------+------------------------------------+-----------------------------------+
|state_code|avg_percent_foreign_born|avg_percent_veterans|avg_household_size|   avg_median_age|recent_period_total_immigrants|recent_period_total_immigrants_month|recent_period_total_immigrants_year|
+----------+------------------------+--------------------+------------------+-----------------+------------------------------+------------------------------------+-----------------------------------+
|        AK|     0.11134434791342337| 0.09204037563400794|              2.77|             32.2|                          1604|                              2016.0|                                4.0|
|        AL|     0.04998828196121399| 0.06790675684503047|2.4300000000000006|36.16176470588235|                          8188|                              2016.0|                                4.0|


#### 4.4 Save final parquet files from each table created

In [37]:
# Write fact table to parquet
states_df.write.mode('overwrite').parquet("states")
immigration_df.write.mode('overwrite').parquet("immigration")
city_demographics_df.write.mode('overwrite').parquet("demographics")
state_analysis_df.write.mode('overwrite').parquet("state_analysis")

#### 4.5 Run quality control checks on the final parquet files

In [38]:
def compare_counts_df_parquet(input_df, parquetName):
    '''
    Looks for missing data in input dataframe, and returns a dataframe with the number of missing values in each column'
    
    Parameters
    ----------
    df : DataFrame
        input dataFrame
    '''
    areCountsEqual = input_df.count() == spark.read.parquet(parquetName).count()
    print("Counts for " + parquetName + (" are" if areCountsEqual else " are not") + " the same")

In [39]:
compare_counts_df_parquet(states_df, "states")
compare_counts_df_parquet(immigration_df, "immigration")
compare_counts_df_parquet(city_demographics_df, "demographics")
compare_counts_df_parquet(state_analysis_df, "state_analysis")

Counts for states are the same
Counts for immigration are the same
Counts for demographics are the same
Counts for state_analysis are the same


In [40]:
def compare_columns_df_parquet(input_df, parquetName):
    '''
    Looks for missing data in input dataframe, and returns a dataframe with the number of missing values in each column'
    
    Parameters
    ----------
    df : DataFrame
        input dataFrame
    '''
    parquet_fields = spark.read.parquet(parquetName).schema.fields
    df_fields = input_df.schema.fields

    areCountsEqual = input_df.count() == spark.read.parquet(parquetName).count()
    print("The schema fields for " + parquetName + (" are" if parquet_fields == df_fields else " are not") + " the same")

In [41]:
#Fix nullable column mismatch, which appears later in parquet
state_analysis_df.schema['recent_period_total_immigrants'].nullable = True

# Validate that schema is the same in parquet files, compared to data frame sources
compare_columns_df_parquet(states_df, "states")
compare_columns_df_parquet(immigration_df, "immigration")
compare_columns_df_parquet(city_demographics_df, "demographics")
compare_columns_df_parquet(state_analysis_df, "state_analysis")

The schema fields for states are the same
The schema fields for immigration are the same
The schema fields for demographics are the same
The schema fields for state_analysis are the same


#### 4.6 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from.


##### states

| Column    | Type    | Description                                     |
|-----------|---------|-------------------------------------------------|
| state_code | date    | Primary key. Full date in the format YYYY-MM-DD |
| title       | integer | Day of the month of full_date                   |


##### i94_immigration_monthly

| Column   | Type    | Description                             |
|----------|---------|-----------------------------------------|
| i94_id    | integer    | Id                                  |
| year    | integer | Year of arrival                          |
| month   | integer | Month of arrival                         |
| city_port_code   | integer | Entrant origin country code     |
| state_code  | string  | US state code of arrival             |
| birth_year  | date    | Birth year of applicant              |
| gender   | string  | Gender / sex of entrant                 |
| visatype | string  | Type of visa granted                    |

##### city_demographics

| Column            | Type    | Description                                                |
|-------------------|---------|------------------------------------------------------------|
| city              | string  | US city name                                               |
| state_code        | string  | State where the city is located                            |
| total_population  | integer | Total population in city                                   |
| total_foreign_born| integer | Number of people born outside the US                       |
| percent_foreign_born  | double | Percent of people born outside the US                  |
| total_veterans    | integer | Total veteran population in city                           |
| percent_veterans    | double | Percent veteran population in city                       |
| household_size    | integer | Average size of the household                              |

##### state_analysis

| Column   | Type    | Description                             |
|----------|---------|-----------------------------------------|
| state_code                           | integer | State abbreviation                         |
| avg_percent_foreign_born             | float   | Average of cities foreign born percent     |
| avg_percent_veterans                 | float   | Average of cities veterans percent         |
| avg_household_size                   | float   | Average of cities household size           |
| avg_median_age                       | integer | Average of cities median age               |
| recent_period_total_immigrants       | integer | Total number of immigrants in recent period|
| recent_period_total_immigrants_month | integer | Recent period month from i94 data          |
| recent_period_total_immigrants_year  | integer | Recent period year from i94 data           |

### Step 5: Complete Project Write Up

#### 5.1 Rationale for choice of tools and technologies

- Python has a number of libraries available for data engineering, and a robust set of built in tools for data processing.
- Pandas facilitates powerful manipulation of data frames.
- Spark is a specialized tool for manipulating large data sets from a wide array of possible different sources. This enables efficient data wrangling and scaling potential with the use of clusters of nodes.

In the future, we may also incorporate Apache Airflow for better management and monitoring of DAG pipelines.

#### 5.2 How often data should be updated

That data used in this sample is now quite dated, and is intended for demonstration purposes only. For actual production scenarios, new data sets should be downloaded and provided for the pipelines on a periodic basis. The demographics data is generated from census data, which is provided every 10 years; therefore this data should be refreshed every 10 years. The immigration data contains information for one month; therefore, this data should be refreshed monthly, as new monthly data is continuously published.

#### 5.3 Scenario: Data increased by 100x

Because we are using Spark, we have the option of deploying additional nodes so that we can handle more compute intensive requirements, such as source data being increased by 100 fold. Additionally, we may use EC2 clusters for additional compute power if our current infrastructure does not support the increased load. However, the use of cloud technologies such as EC2 will also imply additional operational expenses.

#### 5.4 Scenario: Data populates a dashboard that must be updated on a daily basis by 7am every day

In order for us to populate a dashboard that must be updated on a daily basis at 7 am every day, we have two main options. The first option is to  use a task scheduler like crontab to automatically run this Python script at the desired schedule. Another option is to make use of Apache Airflow. Aside from the other benefits of Apache Airflow previously indicated, it also has native support for running pipelines on a preset schedule.

#### 5.5 Scenario: Database needs to be accessed by 100+ people

Because we have made use of the Star Schema, this data is already optimized for use by a large number of people, such as more than 100 people. Note that we would need to define user roles for this audience, such as Read only access. In case we would like to further optimize accessibility, we may also consider hosting this data in the cloud. This option could be particularly useful if the business does not already have the infrastructure in place for hosting databases for access by a wide number of people. However, again please note that the use of cloud technologies will also imply increased operational expenses.