# ETL Pipeline Comparing United States Immigration Data to City Demographics
### Data Engineering Capstone Project

#### Project Summary
This capstone project builds an ETL pipeline from the [I94 immigration data](https://travel.trade.gov/research/reports/i94/historical/2016.html) and [US cities demographics data](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/). The data is combined into a database that can be queried to answer questions about possible links between immigration and the demographics of the destination city or state.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
import pandas as pd
from pyspark.sql import SparkSession
import pyspark.sql.functions as sf

### Step 1: Scope the Project and Gather Data

#### Scope 
The project selects I94 immigration data to form the first dimension table. The second dimension table uses US cities demographic data. The two dimension tables are joined on their state columns to form a fact table. This fact table shows the total number of immigrants for each state and each state's total population. The fact table is designed to answer questions such as:
* Which states have the most incoming immigrants?
* Is there some correlation between number of immigrants and state populations?

Spark is used as the engine to process data in the various tables.

#### Describe and Gather Data 
The [I94 immigration data](https://travel.trade.gov/research/reports/i94/historical/2016.html) is sourced from the US National Travel and Tourism Office. The data is stored in SAS7BDAT, a binary database storage format.

The [US cities demographics data](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/) is sourced from OpenDataSoft. This dataset contains demographic information for all US cities and census-designated places with a population greater than or equal to 65,000. The data comes from the US Census Bureau's 2015 American Community Survey.

In [2]:
# Read into Pandas April 2016 immigration data
fname = "../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat"
df_immigration = pd.read_sas(fname, 'sas7bdat', encoding="ISO-8859-1")

In [3]:
# Display some rows from the April 2016 immigration data for exploration and verification
df_immigration.head()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,6.0,2016.0,4.0,692.0,692.0,XXX,20573.0,,,,...,U,,1979.0,10282016,,,,1897628000.0,,B2
1,7.0,2016.0,4.0,254.0,276.0,ATL,20551.0,1.0,AL,,...,Y,,1991.0,D/S,M,,,3736796000.0,296.0,F1
2,15.0,2016.0,4.0,101.0,101.0,WAS,20545.0,1.0,MI,20691.0,...,,M,1961.0,09302016,M,,OS,666643200.0,93.0,B2
3,16.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,1988.0,09302016,,,AA,92468460000.0,199.0,B2
4,17.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,2012.0,09302016,,,AA,92468460000.0,199.0,B2
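
The arrdate and depdate columns in the preview above look like five-digit numbers because SAS stores dates as a count of days since 1 January 1960. The following is a minimal sketch of that conversion, not part of the pipeline itself; the helper name `sas_days_to_date` is hypothetical.

```python
from datetime import date, timedelta

# SAS stores dates as the number of days since 1960-01-01.
SAS_EPOCH = date(1960, 1, 1)


def sas_days_to_date(days):
    """Convert a SAS day count (float, int, None or NaN) to a date."""
    if days is None or days != days:  # NaN is the only value not equal to itself
        return None
    return SAS_EPOCH + timedelta(days=int(days))


# arrdate values taken from the rows displayed above
for raw in (20573.0, 20551.0, 20545.0):
    print(raw, "->", sas_days_to_date(raw))
```

With this conversion the arrdate value 20545.0 corresponds to 1 April 2016, which is consistent with the file holding April 2016 arrivals.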


In [4]:
# Read in the city demographics data
fname = "./us-cities-demographics.csv"
df_cities = pd.read_csv(fname, sep=';')

In [5]:
# Display some city demographics rows for exploration and verification
df_cities.head()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402


In [6]:
# Create a spark session
spark = SparkSession.builder.\
config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11")\
.enableHiveSupport().getOrCreate()

In [7]:
#write to parquet
#df_spark.write.parquet("sas_data")
#df_spark=spark.read.parquet("sas_data")

### Step 2: Explore and Assess the Data
#### Explore the Data 
Some rows in the immigration data have Null (NaN) values in the i94addr column (state abbreviations). This column is important because it is the join condition for creating the fact table, so rows with Null values are filtered out. The documentation (I94_SAS_Labels_Descriptions.SAS) also states that a value of "99" is applied to rows where this column is invalid. These rows are filtered out as well, along with rows that have a Null visatype.

For the US demographic data, any state code column values having a Null (NaN) value were filtered out as this column is used to join in the fact table creation.
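
The state filtering rules for the immigration data can be illustrated on a handful of rows without Spark. This is only a sketch of the logic: the first five rows reuse the cicid, i94addr and visatype values from the pandas preview above, the last row is a made-up example of the "99" placeholder, and the helper `has_valid_state` is hypothetical.

```python
# Rows 1-5 reuse values from the pandas preview; the last row is invented
# to show how the "99" placeholder is treated.
rows = [
    {"cicid": 6, "i94addr": None, "visatype": "B2"},
    {"cicid": 7, "i94addr": "AL", "visatype": "F1"},
    {"cicid": 15, "i94addr": "MI", "visatype": "B2"},
    {"cicid": 16, "i94addr": "MA", "visatype": "B2"},
    {"cicid": 17, "i94addr": "MA", "visatype": "B2"},
    {"cicid": 999, "i94addr": "99", "visatype": "B2"},  # hypothetical row
]

INVALID_STATE = "99"


def has_valid_state(row):
    """True when the destination state is present and not the '99' marker."""
    state = row["i94addr"]
    return state is not None and state != INVALID_STATE


kept = [r for r in rows if has_valid_state(r)]
dropped = len(rows) - len(kept)

print("kept cicids:", [r["cicid"] for r in kept])
print("dropped rows:", dropped)
```

Counting the dropped rows per rule before filtering is a cheap way to confirm that the cleaning step removes only what it is meant to remove.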

#### Cleaning Steps
The cells below clean each dataset and display a few rows to verify the result.

In [8]:
# Clean the immigration data

def clean_immigration_data(file):
    '''
        Input: The path to the immigration file
        Output: Spark dataframe with valid columns of immigration data
    '''
    df_spark = spark.read.format('com.github.saurfang.sas.spark').load(file)
    df_spark = df_spark.filter((df_spark.i94addr.isNotNull()) & (df_spark.i94addr != "99") & (df_spark.visatype.isNotNull()))
    return df_spark

In [9]:
# Test the cleaning of the data

test_file = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat' 
df_test = clean_immigration_data(test_file)
df_test.show(5)

+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|        admnum|fltno|visatype|
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|  7.0|2016.0|   4.0| 254.0| 276.0|    ATL|20551.0|    1.0|     AL|   null|  25.0|    3.0|  1.0|20130811|     SEO| null|      G|   null|      Y|   null| 1991.0|     D/S|     M|  null|   null|  3.73679633E9|00296|      F1|
| 15.0|2016.0|   4.0| 101.0| 101.0|    WAS|20545.0|    1.0|     MI|20691.0|  55.0|    2.0|  1.0|20160401|    nul

In [10]:
# Clean the city data
cities_file = "./us-cities-demographics.csv"
df_cities = spark.read.load(cities_file, format="csv", sep=";", inferSchema="true", header="true")

# Remove NULLs if found
df_cities = df_cities.filter(df_cities['State Code'].isNotNull())

In [11]:
# Test the cleaning of the data
df_cities.show(5)

+----------------+-------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+-----+
|            City|        State|Median Age|Male Population|Female Population|Total Population|Number of Veterans|Foreign-born|Average Household Size|State Code|                Race|Count|
+----------------+-------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+-----+
|   Silver Spring|     Maryland|      33.8|          40601|            41862|           82463|              1562|       30908|                   2.6|        MD|  Hispanic or Latino|25924|
|          Quincy|Massachusetts|      41.0|          44129|            49500|           93629|              4147|       32935|                  2.39|        MA|               White|58723|
|          Hoover|      Alabama|      38.5|          38040| 

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
The immigration data dimension table contains data from the I94 immigration data.  The following columns are extracted from the source data:
* i94yr   = year (4 digit value)
* i94mon  = month (numeric value)
* i94addr = destination state (State abbreviation)
* i94cit  = 3-digit code for the traveler's country of citizenship (stored as origin_city)
* visatype = Class of admission (text)

The demographic data dimension table contains data from the US city demographic data set.  The following columns are extracted from the source data:
* State Code = State abbreviation (text)
* City = (text)
* Race = (text)
* Median Age = (decimal)
* Male Population = (integer)
* Female Population = (integer)
* Number of Veterans = (integer)
* Foreign-born = (integer)
* Count = (integer)

The fact table contains the joined information from the immigration and demographic dim tables.  The following columns compose the fact table:
* State = (text)
* Total_Immigrants = Total number of immigrants for that particular state (integer)
* Total_Population = Total population of that state (based on cities with populations > 65K) (integer)
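
The demographics file appears to hold one row per city and race category, so there is more than one way to arrive at a state total. The sketch below contrasts summing the per-race Count column (what this project's fact table query does) with summing each city's Total Population once. All rows and numbers here are invented for illustration; whether the two totals agree in the real data depends on whether the race categories overlap, which is worth checking against the source.

```python
from collections import defaultdict

# Hypothetical rows: (state_code, city, race, total_population, count).
# The values are invented and do not come from the dataset.
rows = [
    ("XX", "Alpha", "White", 100_000, 70_000),
    ("XX", "Alpha", "Asian", 100_000, 20_000),
    ("XX", "Alpha", "Hispanic or Latino", 100_000, 30_000),
    ("XX", "Beta", "White", 80_000, 60_000),
    ("XX", "Beta", "Asian", 80_000, 25_000),
]

# Option 1: sum the per-race Count column for each state.
sum_of_counts = defaultdict(int)
for state, _city, _race, _total, count in rows:
    sum_of_counts[state] += count

# Option 2: keep Total Population once per (state, city), then sum per state.
city_totals = {(state, city): total for state, city, _race, total, _count in rows}
sum_of_city_totals = defaultdict(int)
for (state, _city), total in city_totals.items():
    sum_of_city_totals[state] += total

print(dict(sum_of_counts))
print(dict(sum_of_city_totals))
```

In this made-up example the two totals differ because one resident can fall into more than one category.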

#### 3.2 Mapping Out Data Pipelines
1. Clean the source data.
    * Call clean_immigration_data on the source file to clean the I94 immigration data and return a Spark dataframe.
    * Clean the source city data as shown in Step 2, which produces the Spark dataframe df_cities.
2. Create the immigration dimension table by selecting stated columns in section 3.1 from the Spark dataframe.  Write to parquet files which are partitioned by State.
3. Create the demographics dimension table by selecting stated columns in section 3.1 from the Spark dataframe. Write to parquet files which are partitioned by State_Code. 
4. Create the fact table by joining the immigration and demographic dimension tables on the State and State_Code columns.  Write the results to parquet files partitioned by State.
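
When Spark writes with `partitionBy`, each distinct value of the partition column gets its own `column=value` sub-directory under the output path. The sketch below only builds those directory names to show the resulting layout; the helper `partition_dir` is hypothetical and does not touch the file system.

```python
from pathlib import PurePosixPath


def partition_dir(base_path, column, value):
    """Return the sub-directory that holds one partition value."""
    return str(PurePosixPath(base_path) / f"{column}={value}")


# Output paths as used later in this notebook, with "CA" as an example value
print(partition_dir("/results/immigration_data.parquet", "State", "CA"))
print(partition_dir("/results/state_counts.parquet", "State_Code", "CA"))
print(partition_dir("/results/fact.parquet", "State", "CA"))
```

A query that filters on the partition column only needs to read the matching sub-directories, which is the main reason for partitioning by state here.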

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [12]:
# Swap in the commented-out path below to process the full set of monthly files
#im_data = '/data/18-83510-I94-Data-2016/*.sas7bdat'
im_data = '/data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'

# Clean I94 immigration data and store as Spark dataframe
df_im = clean_immigration_data(im_data)

# Extract columns, groupby and/or aggregate data for the dimension table
im_dim_table = df_im.select(sf.col("i94yr").alias("Year"),
                            sf.col("i94mon").alias("Month"),
                            sf.col("i94addr").alias("State"), 
                            sf.col("i94cit").alias("origin_city"),
                            sf.col("visatype"))

# Write the immigration dim table to parquet files partitioned by State
im_dim_table.write.mode("append").partitionBy("State").parquet("/results/immigration_data.parquet")


In [13]:
# Select a subset of the columns for the dim table
cities_table = df_cities.select(sf.col("State Code").alias("State_Code"), 
                             sf.col("City"), 
                             sf.col("Race"),
                             sf.col("Median Age").alias("Median_Age"),
                             sf.col("Male Population").alias("Male_Population"),
                             sf.col("Female Population").alias("Female_Population"),
                             sf.col("Number of Veterans").alias("Veterans"),
                             sf.col("Foreign-born").alias("Foreign_born"),
                             sf.col("Count"))

# Write the demographics dim table to parquet files partitioned by State_Code
cities_table.write.mode("append").partitionBy("State_Code").parquet("/results/state_counts.parquet")

In [14]:

# Create some temp views
im_dim_table.createOrReplaceTempView("immigration_view")
cities_table.createOrReplaceTempView("demographic_view")

# Create the fact table 
fact_table = spark.sql('''
SELECT iv.State,
       iv.Total_Immigrants,
       dv.Total_Population
FROM (SELECT State, COUNT(*) AS Total_Immigrants FROM immigration_view GROUP BY State) iv
JOIN (SELECT State_Code, SUM(Count) AS Total_Population FROM demographic_view GROUP BY State_Code) dv
    ON (iv.State = dv.State_Code)
ORDER BY dv.Total_Population DESC
''')

# Write fact table to parquet files partitioned by State
fact_table.write.mode("append").partitionBy("State").parquet("/results/fact.parquet")
fact_table.show(5)

+-----+----------------+----------------+
|State|Total_Immigrants|Total_Population|
+-----+----------------+----------------+
|   CA|          470386|        31753718|
|   TX|          134321|        20029645|
|   NY|          553677|        11377068|
|   FL|          621701|         8664477|
|   AZ|           20218|         5754881|
+-----+----------------+----------------+
only showing top 5 rows
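
The questions from Step 1 can be explored directly on the fact table. The sketch below uses only the five rows displayed above, so it illustrates the calculation rather than supporting any conclusion; the immigrant counts also cover a single month (April 2016). The `pearson` helper is written out by hand and is not part of the pipeline.

```python
from math import sqrt

# (state, total_immigrants, total_population) from the five rows shown above
rows = [
    ("CA", 470386, 31753718),
    ("TX", 134321, 20029645),
    ("NY", 553677, 11377068),
    ("FL", 621701, 8664477),
    ("AZ", 20218, 5754881),
]


def pearson(xs, ys):
    """Pearson correlation coefficient of two equal-length sequences."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    return cov / sqrt(var_x * var_y)


immigrants = [r[1] for r in rows]
population = [r[2] for r in rows]
r_value = pearson(immigrants, population)

# Immigrants per 1,000 residents, ranked from highest to lowest
per_thousand = {state: 1000 * imm / pop for state, imm, pop in rows}
ranked = sorted(per_thousand, key=per_thousand.get, reverse=True)

print(f"Pearson r over these five states: {r_value:.3f}")
for state in ranked:
    print(state, round(per_thousand[state], 1))
```

Running the same calculation over all 49 rows of the fact table would give a more meaningful picture than this five-row sample.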



#### 4.2 Data Quality Checks
A data quality check function, data_quality_check, verifies that the supplied Spark dataframe has at least one row of data.


In [15]:
def data_quality_check(df, desc):
    '''
    Input: Spark dataframe, description of Spark dataframe
    Output: Print outcome of data quality check
    '''
    
    result_counts = df.count()
    if result_counts == 0:
        print("Data quality check failed for {} with zero records".format(desc))
    else:
        print("Data quality check passed for {} with {} records".format(desc, result_counts))
    return 0

# Perform data quality check for dim and fact tables
data_quality_check(im_dim_table, "immigration table")
data_quality_check(cities_table, "demographic table")
data_quality_check(fact_table, "fact table")

Data quality check passed for immigration table with 2943669 records
Data quality check passed for demographic table with 2891 records
Data quality check passed for fact table with 49 records


0
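
The check above prints its outcome, which is easy to miss in an unattended run. One possible variant, sketched here under the assumption that a failed check should stop the pipeline, raises an exception instead. The names `require_rows` and `FakeFrame` are hypothetical; `FakeFrame` is only a stand-in with a `count()` method so the sketch runs without a Spark session.

```python
def require_rows(df, desc, min_rows=1):
    """Raise ValueError if df has fewer than min_rows rows; return the count."""
    n = df.count()
    if n < min_rows:
        raise ValueError(
            f"Data quality check failed for {desc}: "
            f"{n} rows, expected at least {min_rows}"
        )
    return n


class FakeFrame:
    """Stand-in for a Spark dataframe; only implements count()."""

    def __init__(self, n):
        self._n = n

    def count(self):
        return self._n


print(require_rows(FakeFrame(49), "fact table"))

try:
    require_rows(FakeFrame(0), "empty table")
except ValueError as err:
    print(err)
```

Because the function only calls `count()`, it should accept the Spark dataframes from this notebook unchanged.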

#### 4.3 Data dictionary 
The immigration data dimension table contains data from the I94 immigration data: 
* i94yr   = year (4 digit value)
* i94mon  = month (numeric value)
* i94addr = destination state (State abbreviation)
* i94cit  = 3-digit code for the traveler's country of citizenship (stored as origin_city)
* visatype = Class of admission (text)

The demographic data dimension table contains data from the US city demographic data set: 
* State Code = State abbreviation (text)
* City = City name (text)
* Race = Racial category (text)
* Median Age = Median age for the particular city (decimal)
* Male Population = The total number of males in the population for the particular city (integer)
* Female Population = The total number of females in the population for the particular city (integer)
* Number of Veterans = Number of military veterans for the particular city (integer)
* Foreign-born = Number of foreign-born residents for the particular city (integer)
* Count = Total number of people based on their racial category (integer)

The fact table contains the joined information from the immigration and demographic dim tables.  The following columns compose the fact table:
* State = State abbreviation (text)
* Total_Immigrants = Total number of immigrants for that particular state (integer)
* Total_Population = Total population of that state (based on cities with populations > 65K) (integer)

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
    * Spark was selected as the backbone technology for the project because it handles multiple large data file formats (SAS and CSV files). The project also uses Spark SQL to process the large input data into dataframes.

* Propose how often the data should be updated and why.
    * Since the immigration data set comes with a monthly subscription model that contains the latest month and year-to-date, updating the data monthly is suggested.  A higher frequency would not provide any new information at the expense of extra overhead.

* Write a description of how you would approach the problem differently under the following scenarios:
    * The data was increased by 100x.
        * With a 100x increase, the immigration dataset may become prohibitively large to process in single batches. Other third-party tools may be required to cope with the increased data size. Running Spark in cluster mode is one option to consider.
    * The data populates a dashboard that must be updated on a daily basis by 7am every day.
        * Airflow might be a good choice if a daily update on a set schedule is required. Airflow could manage the ETL pipeline and schedule the data updates.
    * The database needed to be accessed by 100+ people.
        * The parquet files could be published to HDFS, where users could be granted read access to the data.
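
For the monthly update suggested above, a small helper can work out which monthly files still need to be loaded. This is a sketch under a stated assumption: the naming pattern `i94_{mon}{yy}_sub.sas7bdat` is inferred from the single file name used in this project (`i94_apr16_sub.sas7bdat`) and may not hold for every month. The function names and the `loaded` set are hypothetical.

```python
# Assumed naming pattern, inferred from the one file name used in this project.
FILE_PATTERN = "i94_{mon}{yy}_sub.sas7bdat"

MONTH_ABBR = ("jan", "feb", "mar", "apr", "may", "jun",
              "jul", "aug", "sep", "oct", "nov", "dec")


def monthly_file_name(year, month):
    """Build the assumed file name for a given year and month (1-12)."""
    return FILE_PATTERN.format(mon=MONTH_ABBR[month - 1], yy=f"{year % 100:02d}")


def months_to_load(already_loaded, year, through_month):
    """Months of `year` up to `through_month` that have not been loaded yet."""
    return [m for m in range(1, through_month + 1)
            if (year, m) not in already_loaded]


# Hypothetical record of months that were already processed
loaded = {(2016, 1), (2016, 2), (2016, 3)}

for month in months_to_load(loaded, 2016, 4):
    print(monthly_file_name(2016, month))
```

Tracking loaded months this way would also help avoid writing the same month twice, since the parquet writes in this notebook use append mode.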