# Project Title
### Data Engineering Capstone Project

#### Project Summary
* The purpose of this project is to understand immigration trends in the US using a data pipeline built with Spark, with the output stored in the Parquet file format.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Import libraries, packages and modules
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, isnan, trim, when
from Load_Source import LoadSource
from Clean_Source import CleanSource
from Transform_Source import TransformSource
from Model_Source import ModelSource
from Build_Pipeline import BuildPipeline
from Check_Validity import CheckValidity

In [2]:
# Initialize the Spark session with the spark-sas7bdat package for reading SAS files
spark = (
    SparkSession.builder
    .config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11")
    .enableHiveSupport()
    .getOrCreate()
)

### Step 1: Project Scope and Data Gathering
* I have used the following 3 data sources:
 * I94 Immigration 
 * U.S. City Demographic
 * Airport Code Table
* I used these data sources to build a data model for understanding immigration patterns in the US.
* I used Spark to extract and transform the data and to build the data model, and stored the output in the Parquet file format. Spark was chosen for its ability to process massive datasets, and Parquet because its columnar storage format makes querying efficient.

#### Data Description
* The following is a brief description of where each data source comes from and what information it includes:
 * I94 Immigration Data: This data comes from the US National Travel and Tourism Office. The data contains international
   visitor arrival statistics by world regions and select countries (including top 20), type of visa, mode of
   transportation, age groups, states visited (first intended address only), and the top ports of entry (for select
   countries).
 * U.S. City Demographic Data: This data comes from OpenDataSoft. This dataset contains information about the demographics of
    all US cities and census-designated places with a population greater than or equal to 65,000. The figures come from the US
    Census Bureau's 2015 American Community Survey.
 * Airport Code Table: This is a simple table of airport codes and corresponding cities.

* Refer to the Load_Source.py file for the class and methods that load the SAS data and the CSV files.

In [3]:
SrcLd1 = LoadSource("input_sources/sas_data",spark)
immi_df  = SrcLd1.load_sas_data()
# Cache data
immi_df.cache()

SrcLd2 = LoadSource("input_sources/us-cities-demographics.csv",spark)
demog_df  = SrcLd2.load_csv_data()

SrcLd3 = LoadSource("input_sources/airport-codes_csv.csv",spark)
airport_df  = SrcLd3.load_csv_data(",")

In [4]:
immi_df.show(10)

+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|    cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|        admnum|fltno|visatype|
+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|5748517.0|2016.0|   4.0| 245.0| 438.0|    LOS|20574.0|    1.0|     CA|20582.0|  40.0|    1.0|  1.0|20160430|     SYD| null|      G|      O|   null|      M| 1976.0|10292016|     F|  null|     QF|9.495387003E10|00011|      B1|
|5748518.0|2016.0|   4.0| 245.0| 438.0|    LOS|20574.0|    1.0|     NV|20591.0|  32.0|    1.0|  

In [5]:
demog_df.show(10)

+----------------+--------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+-----+
|            City|         State|Median Age|Male Population|Female Population|Total Population|Number of Veterans|Foreign-born|Average Household Size|State Code|                Race|Count|
+----------------+--------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+-----+
|   Silver Spring|      Maryland|      33.8|          40601|            41862|           82463|              1562|       30908|                   2.6|        MD|  Hispanic or Latino|25924|
|          Quincy| Massachusetts|      41.0|          44129|            49500|           93629|              4147|       32935|                  2.39|        MA|               White|58723|
|          Hoover|       Alabama|      38.5|          3

In [6]:
airport_df.show(10)

+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
|ident|         type|                name|elevation_ft|continent|iso_country|iso_region|municipality|gps_code|iata_code|local_code|         coordinates|
+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
|  00A|     heliport|   Total Rf Heliport|          11|       NA|         US|     US-PA|    Bensalem|     00A|     null|       00A|-74.9336013793945...|
| 00AA|small_airport|Aero B Ranch Airport|        3435|       NA|         US|     US-KS|       Leoti|    00AA|     null|      00AA|-101.473911, 38.7...|
| 00AK|small_airport|        Lowell Field|         450|       NA|         US|     US-AK|Anchor Point|    00AK|     null|      00AK|-151.695999146, 5...|
| 00AL|small_airport|        Epps Airpark|         820|       NA|         US|     

### Step 2: Data Cleaning Steps
* The following steps were taken to clean the data:
 * Drop missing values
 * Drop NaN values
 * Select unique records
 * Rename specific columns
 * Select specific columns for each dataset
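
The cleaning steps above can be illustrated on plain Python rows. This is only a sketch of the logic: the notebook's actual implementation lives in Clean_Source.py (not shown here) and runs on Spark DataFrames. The helper name `clean_rows` is hypothetical, the sample rows are made up, and selecting the columns before dropping incomplete rows is an assumption of this sketch.

```python
import math


def clean_rows(rows, keep_columns, renames=None):
    """Apply the cleaning steps to a list of dict rows (hypothetical helper).

    keep_columns: source columns to keep, in output order.
    renames: optional {old_name: new_name} mapping applied to the kept columns.
    """
    renames = renames or {}
    seen = set()
    cleaned = []
    for row in rows:
        # Select only the columns of interest.
        values = tuple(row.get(column) for column in keep_columns)
        # Drop rows that contain a missing value (None) or NaN.
        if any(v is None or (isinstance(v, float) and math.isnan(v)) for v in values):
            continue
        # Keep unique records only.
        if values in seen:
            continue
        seen.add(values)
        # Rename specific columns.
        cleaned.append({renames.get(c, c): v for c, v in zip(keep_columns, values)})
    return cleaned


# Made-up sample rows, for illustration only.
sample = [
    {"City": "Phoenix", "State Code": "AZ", "Count": 669914},
    {"City": "Phoenix", "State Code": "AZ", "Count": 669914},        # duplicate
    {"City": "Sampletown", "State Code": None, "Count": 1},          # missing value
    {"City": "Exampleville", "State Code": "MA", "Count": float("nan")},  # NaN
]
result = clean_rows(sample, ["City", "State Code", "Count"], {"State Code": "state_code"})
print(result)
```

Only the first Phoenix row survives, with `State Code` renamed to `state_code`.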

* Refer to the Clean_Source.py file for the class and methods that clean the source data using the steps above.

In [7]:
clean = CleanSource(airport_df,immi_df,demog_df)
airport_clean = clean.clean_airport()
immi_clean = clean.clean_immigration()
immi_clean.cache()
demog_clean = clean.clean_demographics()

Total records in Airport-Codes table before cleaning is 55075
Total records in Airport-Codes table after cleaning is 28221
Total records in SAS-immigration table before cleaning is 3096313
Total records in SAS-immigration table after cleaning is 2431703
Total records in US-demographics table before cleaning is 2891
Total records in US-demographics table after cleaning is 2875


In [8]:
immi_clean.show(5)

+------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+------+--------+--------+
| cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|gender|visatype|dtadfile|
+------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+------+--------+--------+
| 258.0|2016.0|   4.0| 103.0| 103.0|    NYC|20545.0|    1.0|     NY|20554.0|  55.0|    2.0|     M|      WT|20160401|
| 569.0|2016.0|   4.0| 103.0| 103.0|    SFR|20545.0|    1.0|     CA|20562.0|  28.0|    2.0|     M|      WT|20160401|
|1080.0|2016.0|   4.0| 104.0| 104.0|    NEW|20545.0|    1.0|     NY|20552.0|  18.0|    2.0|     F|      WT|20160401|
|1393.0|2016.0|   4.0| 104.0| 104.0|    NYC|20545.0|    1.0|     NY|20550.0|  15.0|    2.0|     F|      WT|20160401|
|1417.0|2016.0|   4.0| 104.0| 104.0|    NYC|20545.0|    1.0|     NY|20551.0|  47.0|    2.0|     F|      WT|20160401|
+------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+------+--------+--------+

In [9]:
demog_clean.show(5)

+-------------+----------+----------+---------------+-----------------+----------------+------------------+------------+----------+--------------------+------+
|         City|     State|Median Age|Male Population|Female Population|Total Population|Number of Veterans|Foreign-born|state_code|                Race| Count|
+-------------+----------+----------+---------------+-----------------+----------------+------------------+------------+----------+--------------------+------+
|      Phoenix|   Arizona|      33.8|         786833|           776168|         1563001|             72388|      300702|        AZ|  Hispanic or Latino|669914|
|   Round Rock|     Texas|      34.9|          56646|            59193|          115839|              6804|       18237|        TX|Black or African-...| 13636|
|   Union City|California|      38.5|          38599|            35911|           74510|              1440|       32752|        CA|Black or African-...|  5508|
|Santa Clarita|California|      38.1|   

In [10]:
airport_clean.show(5)

+-----+-------------+--------------------+------------+---------+-----------+----------+----------+--------------------+
|ident|         type|                name|elevation_ft|continent|iso_country|iso_region|local_code|         coordinates|
+-----+-------------+--------------------+------------+---------+-----------+----------+----------+--------------------+
| 00IS|small_airport|Hayenga's Cant Fi...|         820|       NA|         US|     US-IL|      00IS|-89.1229019165039...|
| 03ID|small_airport|Flying Y Ranch Ai...|        3180|       NA|         US|     US-ID|      03ID|-116.532997131347...|
| 09NY|     heliport|Spring Lake Fire ...|         254|       NA|         US|     US-NY|      09NY|-74.0488967895507...|
|  0L4|small_airport|Lida Junction Air...|        4684|       NA|         US|     US-NV|       0L4|-117.191001892089...|
| 16TS|small_airport|  Pineridge STOLport|         420|       NA|         US|     US-TX|      16TS|-95.3180007934570...|
+-----+-------------+--------------------+------------+---------+-----------+----------+----------+--------------------+

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
* The model uses the following star schema:
* Fact Table
 * immi_fact (**i94addr, i94port, visa_code, visa_mode**, cicid, i94cit, i94res, arrdate, depdate, i94bir, gender, visatype, dtadfile, arrival_yr, arrival_mon)
* Dimension Tables
 * demog_dim (**state_code**, race, male_pop, female_pop, total_pop, foreign_born_pop, veteran_pop)
 * airport_dim (**local_code**, ident, type, name, elevation_ft, continent, iso_country, iso_region, coordinates)
* visa_code: the i94visa column mapped to its label using the I94_SAS_Labels_Descriptions file
* visa_mode: the i94mode column mapped to its label using the I94_SAS_Labels_Descriptions file
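
The arrdate and depdate values in the fact table look like SAS date values, that is, day counts starting from 1 January 1960. As a quick sanity check (not part of the original notebook), the hypothetical helper below converts them with the standard library. An arrdate of 20545 becomes 2016-04-01, which agrees with the dtadfile value 20160401 in the rows shown earlier. How arrival_yr and arrival_mon are actually derived is not shown in this notebook, so reading them off arrdate is an assumption.

```python
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)  # SAS date values count days from this day


def sas_to_date(sas_days):
    """Convert a SAS numeric date (e.g. 20545.0) to a datetime.date."""
    return SAS_EPOCH + timedelta(days=int(sas_days))


arrival = sas_to_date(20545.0)
departure = sas_to_date(20554.0)
print(arrival, arrival.year, arrival.month)  # 2016-04-01 2016 4
print((departure - arrival).days)            # 9
```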


* Reasons for the model choice:
 * The star schema separates business process data into a fact table, which records the measurable events, and dimension
   tables, which hold the descriptive attributes related to the fact table
 * I wanted to analyze the following trends using the above data model:
     * The number of male vs. female arrivals at different airport types in the US per month (use immi_fact and airport_dim)
     * The airports with the highest number of immigrant arrivals per month (use immi_fact and airport_dim)
     * The relationship between the foreign-born population and the number of immigrant arrivals per US state
       (use immi_fact and demog_dim)
     * The relationship between the male population and the number of men arriving in the US per state (use immi_fact and demog_dim)
     * The relationship between the female population and the number of women arriving in the US per state (use immi_fact and demog_dim)
 * With this star schema, the analyses above can reveal key trends in immigration activity in the US, which can
   guide business decisions
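
As an illustration of the first question, the sketch below joins fact rows to the airport dimension and counts arrivals by airport type, gender and month. It uses plain Python dictionaries rather than Spark, the rows and port codes are made up, and joining i94port to local_code is an assumption based on the keys highlighted in the model above.

```python
from collections import Counter

# Made-up rows that follow the immi_fact and airport_dim column names.
immi_fact = [
    {"i94port": "AAA", "gender": "M", "arrival_mon": 4},
    {"i94port": "AAA", "gender": "F", "arrival_mon": 4},
    {"i94port": "BBB", "gender": "F", "arrival_mon": 4},
    {"i94port": "ZZZ", "gender": "M", "arrival_mon": 4},  # no matching airport
]
airport_dim = [
    {"local_code": "AAA", "type": "large_airport"},
    {"local_code": "BBB", "type": "small_airport"},
]

# Build a lookup on the dimension key, then probe it for each fact row.
# This behaves like an inner join: fact rows without a matching airport are skipped.
type_by_code = {row["local_code"]: row["type"] for row in airport_dim}

arrivals = Counter()
for row in immi_fact:
    airport_type = type_by_code.get(row["i94port"])
    if airport_type is not None:
        arrivals[(airport_type, row["gender"], row["arrival_mon"])] += 1

for key, total in sorted(arrivals.items()):
    print(key, total)
```

On Spark the same shape would be a join on the key columns followed by a grouped count; the dictionary lookup here plays the role of the join.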

#### 3.2 Mapping Out Data Pipelines
* The following steps are taken to pipeline the data:
* Transformations
   1. Group the clean US demographics data by state_code and race to get the corresponding metrics
   2. Map i94visa to the following visa categories (visa_code) as per the I94_SAS_Labels_Descriptions file:
        * 1 = Business
        * 2 = Pleasure
        * 3 = Student
   3. Map i94mode to the following modes of transportation (visa_mode) as per the I94_SAS_Labels_Descriptions file:
        * 1 = 'Air'
        * 2 = 'Sea'
        * 3 = 'Land'
        * 9 = 'Not reported'
* Model Data
   1. Model the fact table using primary key and foreign key relations
   2. Generate the fact table
   3. Generate the dimension tables
* Build Pipelines
   1. Generate the immigration fact table in Parquet format, partitioned by "i94addr", "arrival_yr", "arrival_mon"
   2. Generate the demographics dimension table in Parquet format, partitioned by "state_code", "race"
   3. Generate the airport dimension table in Parquet format, partitioned by "local_code", "iso_region"
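
The two code mappings in the transformation steps can be sketched with plain dictionaries. The helper names are hypothetical, and returning None for missing or unlisted codes is an assumption; the notebook's own implementation is in Transform_Source.py and is not shown here.

```python
# Labels as listed in the transformation steps.
VISA_CODES = {1: "Business", 2: "Pleasure", 3: "Student"}
VISA_MODES = {1: "Air", 2: "Sea", 3: "Land", 9: "Not reported"}


def _lookup(mapping, raw_value):
    """Return the label for a numeric code such as 2.0, or None if unknown."""
    if raw_value is None:
        return None
    return mapping.get(int(raw_value))


def map_visa_code(i94visa):
    """Map an i94visa value to its visa category (hypothetical helper)."""
    return _lookup(VISA_CODES, i94visa)


def map_visa_mode(i94mode):
    """Map an i94mode value to its mode of transportation (hypothetical helper)."""
    return _lookup(VISA_MODES, i94mode)


print(map_visa_code(2.0), map_visa_mode(1.0))  # Pleasure Air
```

The codes arrive as floats (for example 2.0) in the cleaned immigration data, which is why the sketch casts to int before the lookup.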

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model

* Refer to the Transform_Source.py file for the class and methods that transform the source data using the steps above.

In [11]:
# Transform Data
transformer = TransformSource(immi_clean,demog_clean,airport_clean)
demog_trans = transformer.transform_demog()
immi_trans = transformer.transform_immi()
demog_trans.show(5)
immi_trans.show(5)

+----------+--------------------+--------+----------+---------+----------------+-----------+
|state_code|                race|male_pop|female_pop|total_pop|foreign_born_pop|veteran_pop|
+----------+--------------------+--------+----------+---------+----------------+-----------+
|        NJ|American Indian a...|  600089|    615303|  1215392|          419638|      25852|
|        KY|American Indian a...|  452483|    477394|   929877|           66488|      56025|
|        OK|Black or African-...|  714573|    734422|  1448995|          151174|      95468|
|        CA|American Indian a...|12019895|  12288108| 24308003|         7310648|     909205|
|        NH|               White|   97771|    100427|   198198|           27199|      11005|
+----------+--------------------+--------+----------+---------+----------------+-----------+
only showing top 5 rows

+------+------+------+-------+-------+-------+-------+------+------+--------+--------+---------+---------+----------+-----------+
| cicid|

* Refer to the Model_Source.py file for the class and methods that model the transformed data into a star schema using the steps above.

In [12]:
model = ModelSource(immi_trans,demog_trans,airport_clean)
immi_fact,demog_dim,airport_dim = model.star_model()

In [13]:
immi_fact.show(5)

+------+------+------+-------+-------+-------+-------+------+------+--------+--------+---------+---------+----------+-----------+
| cicid|i94cit|i94res|i94port|arrdate|i94addr|depdate|i94bir|gender|visatype|dtadfile|visa_code|visa_mode|arrival_yr|arrival_mon|
+------+------+------+-------+-------+-------+-------+------+------+--------+--------+---------+---------+----------+-----------+
| 569.0| 103.0| 103.0|    SFR|20545.0|     CA|20562.0|  28.0|     M|      WT|20160401| Pleasure|      Air|      2016|          4|
|1080.0| 104.0| 104.0|    NEW|20545.0|     NY|20552.0|  18.0|     F|      WT|20160401| Pleasure|      Air|      2016|          4|
|1793.0| 104.0| 104.0|    BLA|20545.0|     CA|20551.0|  26.0|     M|      WT|20160401| Pleasure|     Land|      2016|          4|
|3000.0| 108.0| 108.0|    NEW|20545.0|     NY|20553.0|  26.0|     F|      WT|20160401| Pleasure|      Air|      2016|          4|
|3474.0| 108.0| 108.0|    SFR|20545.0|     AZ|20553.0|  13.0|     M|      WT|20160401| Ple

In [14]:
demog_dim.show(5)

+----------+--------------------+--------+----------+---------+----------------+-----------+
|state_code|                race|male_pop|female_pop|total_pop|foreign_born_pop|veteran_pop|
+----------+--------------------+--------+----------+---------+----------------+-----------+
|        NJ|American Indian a...|  600089|    615303|  1215392|          419638|      25852|
|        KY|American Indian a...|  452483|    477394|   929877|           66488|      56025|
|        OK|Black or African-...|  714573|    734422|  1448995|          151174|      95468|
|        CA|American Indian a...|12019895|  12288108| 24308003|         7310648|     909205|
|        NH|               White|   97771|    100427|   198198|           27199|      11005|
+----------+--------------------+--------+----------+---------+----------------+-----------+
only showing top 5 rows



In [15]:
airport_dim.show(5)

+-----+-------------+--------------------+------------+---------+-----------+----------+----------+--------------------+
|ident|         type|                name|elevation_ft|continent|iso_country|iso_region|local_code|         coordinates|
+-----+-------------+--------------------+------------+---------+-----------+----------+----------+--------------------+
| 00IS|small_airport|Hayenga's Cant Fi...|         820|       NA|         US|     US-IL|      00IS|-89.1229019165039...|
| 03ID|small_airport|Flying Y Ranch Ai...|        3180|       NA|         US|     US-ID|      03ID|-116.532997131347...|
| 09NY|     heliport|Spring Lake Fire ...|         254|       NA|         US|     US-NY|      09NY|-74.0488967895507...|
|  0L4|small_airport|Lida Junction Air...|        4684|       NA|         US|     US-NV|       0L4|-117.191001892089...|
| 16TS|small_airport|  Pineridge STOLport|         420|       NA|         US|     US-TX|      16TS|-95.3180007934570...|
+-----+-------------+--------------------+------------+---------+-----------+----------+----------+--------------------+

* Refer to the Build_Pipeline.py file for the class and methods that generate the output files using the steps above.

In [16]:
build = BuildPipeline(immi_fact,demog_dim,airport_dim,"/output_stg")
build.gen_immi_fact()
build.gen_demog_dim()
build.gen_airport_dim()

Begin:writing Immigration Fact table to location /output_stg
End:File written to /output_stg
Begin:writing Demog Dimension table to location /output_stg
End:File written to /output_stg
Begin:writing Airport Dimension table to location /output_stg
End:File written to /output_stg
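
Spark writes partitioned Parquet output as nested `column=value` directories, one level per partition column. The hypothetical helper below only builds such a path to show the layout to expect for the fact table. The `immi_fact.parquet` folder name is taken from the validity check in section 4.2, and the data file names inside each directory are generated by Spark.

```python
from posixpath import join


def partition_path(root, partition_columns, row):
    """Build the Hive-style directory a row would land in (hypothetical helper)."""
    parts = [f"{column}={row[column]}" for column in partition_columns]
    return join(root, *parts)


row = {"i94addr": "CA", "arrival_yr": 2016, "arrival_mon": 4}
path = partition_path(
    "/output_stg/immi_fact.parquet",
    ["i94addr", "arrival_yr", "arrival_mon"],
    row,
)
print(path)  # /output_stg/immi_fact.parquet/i94addr=CA/arrival_yr=2016/arrival_mon=4
```

A query that filters on the partition columns only needs to read the matching directories.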


#### 4.2 Data Quality Checks
The following kinds of data quality checks help ensure the pipeline ran as expected:
 * Integrity constraints on the tables (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
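
A minimal sketch of such checks on plain Python rows is shown below. The function names and sample rows are hypothetical; the notebook's own checks are implemented in Check_Validity.py and run against the Parquet output.

```python
def check_not_empty(rows):
    """Count check: the table must contain at least one record."""
    return len(rows) > 0


def check_key_not_null(rows, key):
    """The key column must be present and non-null in every record."""
    return all(row.get(key) is not None for row in rows)


def check_unique(rows):
    """No two records may be identical."""
    distinct = {tuple(sorted(row.items())) for row in rows}
    return len(distinct) == len(rows)


def check_integrity(fact_rows, fact_key, dim_rows, dim_key):
    """Every fact key value must exist in the dimension table."""
    dim_keys = {row[dim_key] for row in dim_rows}
    return all(row[fact_key] in dim_keys for row in fact_rows)


# Made-up rows, for illustration only.
demog_dim = [{"state_code": "CA", "race": "White"}, {"state_code": "NY", "race": "White"}]
immi_fact = [{"cicid": 1, "i94addr": "CA"}, {"cicid": 2, "i94addr": "NY"}]

checks = {
    "demog_dim not empty": check_not_empty(demog_dim),
    "demog_dim key not null": check_key_not_null(demog_dim, "state_code"),
    "demog_dim unique": check_unique(demog_dim),
    "fact -> demog integrity": check_integrity(immi_fact, "i94addr", demog_dim, "state_code"),
}
all_passed = all(checks.values())
print(checks)
print("Tests passed" if all_passed else "Tests failed")
```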
 
Run Quality Checks

* Refer to the Check_Validity.py file for the class and methods that check the validity of the output files.

In [17]:
validity = CheckValidity(spark,"/output_stg/immi_fact.parquet","/output_stg/demog_dim.parquet","/output_stg/airport_dim.parquet")
validity.check_demog_dim()
validity.check_airport_dim()
validity.check_fact_table()
validity.check_integrity()

Distinct records in demog_dim is 240
Distinct state_code values in demog_dim Table is 48
Checking for unique records
Checking if primary key is not null
Table check complete
Distinct records in airport_dim is 28221
Distinct local_code values in airport_dim Table is 26999
Checking for unique records
Checking if primary key is not null
Table check complete
Distinct records in immi_fact is 1536265
Distinct i94addr values in immi_fact Table is 48
Checking for unique records
Checking if primary key is not null
Table check complete
Checking integrity constraints
Tests passed


True

#### 4.3 Data dictionary 

* Please refer to the Data_Dictionary.md file

### Step 5: Complete Project Write Up
* I used Apache Spark for the ETL process because it can load and process very large datasets, up to petabytes, by distributing the work across a multi-node cluster.
* The data needs to be updated every month, since the immigration fact table is partitioned by month. If weekly granularity is required, the table can instead be partitioned by week and updated weekly.
* I would use the following approach under these scenarios:
    * <b>The data was increased by 100x</b> - I would use AWS + Spark, storing the input and output data in S3 buckets
      with appropriately partitioned columns
    * <b>The data populates a dashboard that must be updated on a daily basis by 7am every day</b> - I would use AWS + Spark +
      Airflow to schedule the ETL job so that it completes by 7 am every day; Airflow also makes it possible to visualize the
      ETL workflow and debug it when necessary
    * <b>The database needed to be accessed by 100+ people</b> - I would use AWS + Spark + Airflow and store the output tables in
      Redshift so that 100+ people can run analytical queries against them