# Project Title
### Data Engineering Capstone Project

#### Author and date
Kristina Matiukhina, 20-Feb-2020

#### Project Summary
The purpose of the data engineering capstone project is to give students a chance to combine what they've learned throughout the program.

In this project, students can choose to complete the project provided for them, or define the scope and data for a project of their own design. Either way, they'll be expected to go through the same steps outlined below.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

```python
import pandas as pd
import configparser
import os
from datetime import datetime
from pyspark.sql import SparkSession, types as T
from pyspark.sql.functions import udf, to_date, split, year, length
from pyspark import SparkContext
```

### Step 1: Scope the Project and Gather Data

#### Scope 
The analytics team wants to know insights about:
- What are the top visa type, type of landing (land, sea, etc.), entry point (port) and city, and how do they change over the months?

To answer this question, we will use Spark to explore, clean and aggregate the data. Ideally, the information is analyzed every year (this is how often a new report is posted on the US National Travel and Tourism Office website) and appended to the monthly_analysis file in the analytics folder. The data is not partitioned, just appended, because we don't expect this file to grow enormously: every year adds 12 new records with 6 columns (month, year, visa type, type of landing, entry point, city).


#### Describe and Gather Data 
The US government stores information about visitors. It contains a high-level description of each visitor, such as age, gender, visa type or the occupation they will have in the US. It also contains the entry point and the mode of travel used to enter the country, and updated information about when the visitor left. This information is provided on the US National Travel and Tourism Office website https://travel.trade.gov.

In addition, we have a list of all CBP port codes, each represented by 3 letters, with the corresponding city and state. The file was created from https://redbus2us.com/travel/usa/us-customs-and-border-protection-cbp-codes-port-of-entry-stamp/.

```python
spark = SparkSession.builder \
    .config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11") \
    .enableHiveSupport().getOrCreate()
df_spark = spark.read.format('com.github.saurfang.sas.spark') \
    .load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')
df_spark.head()
```

```
Row(cicid=6.0, i94yr=2016.0, i94mon=4.0, i94cit=692.0, i94res=692.0, i94port='XXX', arrdate=20573.0, i94mode=None, i94addr=None, depdate=None, i94bir=37.0, i94visa=2.0, count=1.0, dtadfile=None, visapost=None, occup=None, entdepa='T', entdepd=None, entdepu='U', matflag=None, biryear=1979.0, dtaddto='10282016', gender=None, insnum=None, airline=None, admnum=1897628485.0, fltno=None, visatype='B2')
```

### Step 2: Explore and Assess the Data
#### Explore the Data 
##### US immigration data
The column names in this set are not very intuitive, and many values can only be interpreted with the data dictionary provided with the set, I94_SAS_Labels_Descriptions.

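`arrdate` is in SAS date format (the number of days since 1960-01-01), so to use it we need to add that many days to 1960-01-01. The converted date can then be checked against `i94yr` and `i94mon`:
```python
from datetime import datetime, timedelta
from pyspark.sql import types as T
from pyspark.sql.functions import udf

# SAS dates count days since 1960-01-01; nulls stay null
get_datestamp = udf(lambda x: (datetime(1960, 1, 1) + timedelta(days=int(x))).date()
                    if x is not None else None, T.DateType())
checked = df_spark.withColumn("arrival_date", get_datestamp(df_spark.arrdate))
# rows where the converted date disagrees with i94mon OR i94yr
checked.where("month(arrival_date) != i94mon OR year(arrival_date) != i94yr").take(5)
```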
However, after exploring the data, there is no discrepancy between `i94yr`/`i94mon` and the year/month of `arrdate`. So, for our aggregations we can use `i94yr` and `i94mon` directly, without any manipulation of the `arrdate` column.
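As a quick sanity check on the conversion, here is the same arithmetic in plain Python, applied to the `arrdate` value of the sample row shown earlier (20573.0, with `i94yr=2016.0` and `i94mon=4.0`). The helper `sas_to_date` is a hypothetical name used only for illustration:

```python
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)  # day 0 of the SAS date system

def sas_to_date(days):
    """Convert a SAS date (days since 1960-01-01, possibly a float) to a date."""
    if days is None:
        return None
    return SAS_EPOCH + timedelta(days=int(days))

d = sas_to_date(20573.0)
print(d)  # 2016-04-29, i.e. April 2016, consistent with i94yr and i94mon
```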

For some columns we also need the data dictionary, I94_SAS_Labels_Descriptions.sas, to translate short codes into more descriptive values. We use it for i94mode and i94visa.

To translate i94port, we use CBP_codes.csv as a lookup dictionary and as a second data source.

##### CBP codes data
This is a fairly straightforward data source. It contains CBP codes and their locations. City and state are stored in the same column, "Location", so we need to split them into two columns. Some locations are empty or have a state part longer than a 2-letter code, so we will need to investigate and clean those up.

#### Cleaning Steps
##### US immigration data
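Filter out records whose i94mode is 9.0 ('Not reported'). Because Spark evaluates `!=` against a null as null, rows with a missing i94mode are dropped by this filter as well.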

```python
df_spark = df_spark.filter(df_spark["i94mode"] != 9.0)
```

Select columns required for aggregations.

```python
immig_df = df_spark.selectExpr("int(cicid) as id", "int(i94yr) as year",
                               "int(i94mon) as month", "i94port as port",
                               "int(i94mode) as i94mode", "int(i94visa) as i94visa")
```

Replace i94mode and i94visa with values from data dictionary.

```python
# replace i94mode codes with labels from the data dictionary
get_landing_type = udf(lambda x:
                       "Air" if x == 1 else
                       "Sea" if x == 2 else
                       "Land" if x == 3 else
                       "Not reported", T.StringType())
immig_df = immig_df.withColumn("landing_type",
                               get_landing_type(immig_df.i94mode))

# replace i94visa codes with labels from the data dictionary
get_visa_type = udf(lambda x:
                    "Business" if x == 1 else
                    "Pleasure" if x == 2 else
                    "Student" if x == 3 else
                    "Not reported", T.StringType())
immig_df = immig_df.withColumn("visa_type",
                               get_visa_type(immig_df.i94visa))
```
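The two conditional chains above are plain code-to-label lookups. A minimal sketch of the same mapping as dictionaries, which keeps the labels in one place and can be tested without Spark. The names `I94MODE`, `I94VISA` and `decode` are hypothetical; the labels are the ones used in the UDFs above, plus 9 = 'Not reported' for i94mode as described in the cleaning step:

```python
# Code-to-label lookups copied from the UDFs above (hypothetical names)
I94MODE = {1: "Air", 2: "Sea", 3: "Land", 9: "Not reported"}
I94VISA = {1: "Business", 2: "Pleasure", 3: "Student"}

def decode(code, mapping, default="Not reported"):
    """Translate a numeric SAS code (int or float such as 2.0) into its label."""
    if code is None:
        return default
    return mapping.get(int(code), default)

print(decode(2.0, I94MODE))   # Sea
print(decode(3, I94VISA))     # Student
print(decode(None, I94VISA))  # Not reported
```

In the notebook, such a function could be wrapped as `udf(lambda x: decode(x, I94MODE), T.StringType())`.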

```python
immig_df = immig_df.selectExpr("id", "year", "month", "port", "landing_type",
                               "visa_type")
immig_df.take(10)
```

```
[Row(id=7, year=2016, month=4, port='ATL', landing_type='Air', visa_type='Student'),
 Row(id=15, year=2016, month=4, port='WAS', landing_type='Air', visa_type='Pleasure'),
 Row(id=16, year=2016, month=4, port='NYC', landing_type='Air', visa_type='Pleasure'),
 Row(id=17, year=2016, month=4, port='NYC', landing_type='Air', visa_type='Pleasure'),
 Row(id=18, year=2016, month=4, port='NYC', landing_type='Air', visa_type='Business'),
 Row(id=19, year=2016, month=4, port='NYC', landing_type='Air', visa_type='Pleasure'),
 Row(id=20, year=2016, month=4, port='NYC', landing_type='Air', visa_type='Pleasure'),
 Row(id=21, year=2016, month=4, port='NYC', landing_type='Air', visa_type='Pleasure'),
 Row(id=22, year=2016, month=4, port='NYC', landing_type='Air', visa_type='Business'),
 Row(id=23, year=2016, month=4, port='NYC', landing_type='Air', visa_type='Pleasure')]
```

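Store the data as Parquet files partitioned by year and month. With `mode="ignore"`, the write is silently skipped if `immigration_data/` already exists, so loading another year later would need a different save mode, such as `"append"`.

```python
immig_df.write.partitionBy("year", "month") \
    .parquet("immigration_data/", mode="ignore")
```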
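`partitionBy` writes Hive-style directories named `column=value`, which is why Step 4 can read a single year from `immigration_data/year=2016`. When such a sub-directory is read directly, Spark's partition discovery only infers partition columns below the given path (unless the `basePath` option is set), so `year` does not come back as a column, and Step 4 adds it again by hand. A small sketch of how the directory names are formed; `partition_path` is a hypothetical helper, not a Spark API:

```python
import posixpath

def partition_path(base, **partitions):
    """Build a Hive-style partition directory such as base/year=2016/month=4."""
    parts = [f"{name}={value}" for name, value in partitions.items()]
    return posixpath.join(base, *parts)

print(partition_path("immigration_data", year=2016))           # immigration_data/year=2016
print(partition_path("immigration_data", year=2016, month=4))  # immigration_data/year=2016/month=4
```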

##### CBP codes data
Read the CSV data and filter out codes which expired before the year under consideration (2016 in this project).

```python
cbp_df = spark.read.load("CBP_codes.csv", format="csv", sep=",", header="true")
cbp_df.head(6)
```

```
[Row(Code='ABE', Location='Aberdeen, WA'),
 Row(Code='ABQ', Location='Albuquerque, NM'),
 Row(Code='ADT', Location='Amistad Dam, TX'),
 Row(Code='ALP', Location='Alpena, MI'),
 Row(Code='AGN', Location='Algonac, MI'),
 Row(Code='AKR', Location='Akron, OH')]
```

Split "Location" into "city" and "state_code"; records without a valid US state code are deleted after cleaning.

```python
# "Aberdeen, WA" -> city "Aberdeen", state_code "WA"
split_col = split(cbp_df['Location'], ', ')
cbp_df = cbp_df.withColumn('city', split_col.getItem(0))
cbp_df = cbp_df.withColumn('state_code', split_col.getItem(1))
```
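To see what the split produces, here is a plain-Python sketch of the same logic: the part before the first `', '` becomes the city, the next part becomes the state, and a location without a comma gets no state at all (Spark's `getItem(1)` likewise returns null there when ANSI mode is off). `split_location` is a hypothetical helper, and "Anytown" is a made-up value:

```python
def split_location(location):
    """Mimic split(Location, ', ') followed by getItem(0) and getItem(1)."""
    if location is None:
        return None, None
    parts = location.split(", ")
    city = parts[0]
    state = parts[1] if len(parts) > 1 else None  # missing item -> None (null in Spark)
    return city, state

print(split_location("Aberdeen, WA"))           # ('Aberdeen', 'WA')
print(split_location("Saipan Island, Saipan"))  # ('Saipan Island', 'Saipan')
print(split_location("Anytown"))                # ('Anytown', None)
```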

Find all state codes which were not split cleanly, i.e. whose length is not 2.

```python
a = cbp_df.withColumn("ln", length("state_code"))
a.select("city", "state_code").where("ln != 2").toPandas()
```

|   | city | state_code |
|---|---|---|
| 0 | Saipan Island | Saipan |
| 1 | Agana | Guam |


Clean the 42 records found in the previous step and, as a precaution, all Canadian provinces. Non-US locations are mapped to None so that they can be filtered out.

```python
# map irregular state values to a state/territory code; None marks non-US locations
clean_states = udf(lambda x:
    "TX" if x == "B&M Bridge"
                   or x == "Fort Duncan Bridge"
                   or x == "TX Bridge of Americas"
                   or x == "Gateway Bridge"
                   or x == "TX Juarez-Lincoln Bridge"
                   or x == "Columbia Bridge"
                   or x == "World Trade Bridge"
                   or x == "El Paso"
    else "ME" if x == "ME Ferry Terminal"
    else "VI" if x == "St. Thomas"
                   or x == "St. Croix"
                   or x == "St. John"
                   or x == "USVI"
    else None if x == "Ireland"
                   or x == "AFB DE"
                   or x == "AB Canada" or x == "Alberta Canada" or x == "AB"
                   or x == "Nova Scotia" or x == "NS" or x == "NL"
                   or x == "Quebec" or x == "QC"
                   or x == "Canada" or x == "NB" or x == "MB" or x == "PE"
                   or x == "Island"
                   or x == "SK. Canada" or x == "SK"
                   or x == "IAP at Edinburg"
                   or x == "Bahamas"
                   or x == "Bermuda"
                   or x == "Italy"
                   or x == "Ontario Canada" or x == "ON"
                   or x == "BC Canada" or x == "BC"
    else "MI" if x == "Ambassador Bridge"
                   or x == "M"
                   or x == "Windsor Tunnel"
                   or x == "MI Kent County Intl Airport"
    else "Saipan" if x == "Saipan Intl Arpt"
    else "CA" if x == "Truck Crossing"
    else "NY" if x == "Rainbow Bridge"
    else "DC" if x == "Washington DC"
    else "ND" if x == "ND (Int'l Airport)"
    else x, T.StringType())
cbp_df = cbp_df.withColumn("state_code", clean_states(cbp_df.state_code))
```
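The conditional chain above is a lookup table in disguise. A sketch of the same mapping as a dictionary, with the values copied from the UDF above; the names `_GROUPS`, `STATE_FIXES` and `clean_state` are hypothetical. A dictionary is easier to extend and can be unit tested without Spark, and the function could be registered with `udf(clean_state, T.StringType())`:

```python
# Irregular state values grouped by the code they should become (copied from the UDF above).
# None marks non-US locations, which are filtered out afterwards.
_GROUPS = {
    "TX": ["B&M Bridge", "Fort Duncan Bridge", "TX Bridge of Americas",
           "Gateway Bridge", "TX Juarez-Lincoln Bridge", "Columbia Bridge",
           "World Trade Bridge", "El Paso"],
    "ME": ["ME Ferry Terminal"],
    "VI": ["St. Thomas", "St. Croix", "St. John", "USVI"],
    None: ["Ireland", "AFB DE", "AB Canada", "Alberta Canada", "AB",
           "Nova Scotia", "NS", "NL", "Quebec", "QC", "Canada", "NB", "MB",
           "PE", "Island", "SK. Canada", "SK", "IAP at Edinburg", "Bahamas",
           "Bermuda", "Italy", "Ontario Canada", "ON", "BC Canada", "BC"],
    "MI": ["Ambassador Bridge", "M", "Windsor Tunnel",
           "MI Kent County Intl Airport"],
    "Saipan": ["Saipan Intl Arpt"],
    "CA": ["Truck Crossing"],
    "NY": ["Rainbow Bridge"],
    "DC": ["Washington DC"],
    "ND": ["ND (Int'l Airport)"],
}
# Invert to {raw value: cleaned value}
STATE_FIXES = {raw: fixed for fixed, raws in _GROUPS.items() for raw in raws}

def clean_state(value):
    """Return the cleaned state code; values not in the table pass through unchanged."""
    return STATE_FIXES.get(value, value)
```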

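Filter out records whose state_code is null (non-US locations) and keep one row per code.

```python
# the cleaning UDF returns None (null) for non-US locations
cbp_df = cbp_df.filter(cbp_df['state_code'].isNotNull())

cbp_df = cbp_df.select('code', 'city', 'state_code').dropDuplicates(["code"])
```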

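Store the cleaned file.

```python
os.makedirs('port_codes', exist_ok=True)  # to_csv does not create missing folders
cbp_df.toPandas().to_csv('./port_codes/port_codes.csv', header=True, index=False)
```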

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
The data model is very simple, as it was created only to answer the question the analytics team asked.
##### Staging-like tables
These tables hold the cleaned data that the aggregations need.
- One dimension table which stores the port codes. The data is stored in the port_codes folder (the CSV columns are named `code`, `city` and `state_code`).

  | port_code | city | state |
  |---|---|---|
  | string, PK | string | string |

- One fact table which stores the cleaned immigration data, ready for aggregation. The data is stored in the immigration_data folder.

  | id | year | month | port | landing_type | visa_type |
  |---|---|---|---|---|---|
  | int, PK | int | int | string | string | string |

##### Result table
The analytics team asked for the top visa type, landing type, port and the port's city for each month of each year, so we add one more result table (or view) with the following columns:
- year
- month
- visa_type - the most common visa type
- landing_type - the most common landing type
- port - the most frequently used entry port
- city - the city associated with that port

As mentioned earlier, the data will be processed once a year and the aggregations will run for the 12 new months only, so we just append the data to the file instead of overwriting it.
The data is stored in the analytics folder.

#### 3.2 Mapping Out Data Pipelines
Steps:
1. Read the data for the new year from the immigration_data folder
2. Read the data from port_codes
3. Join both sets on immigration_data.port and port_codes.port_code. It should be an inner join, to get rid of records in the immigration table which do not have a valid US port.
4. Run separate aggregations for each month: one to identify the top visa type, another to identify the top landing type, and the last one to identify the top port and its city
5. Merge all 3 aggregations into one row per month and add the year
6. Append the new data to the existing file in the analytics folder
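Step 4 needs the most frequent value *per month*, not a single winner for the whole year. A plain-Python sketch of that per-month "argmax" on made-up records; `top_per_month` is a hypothetical helper. In Spark, the usual equivalent is `row_number()` over a `Window.partitionBy('month')` ordered by count descending, keeping row 1:

```python
from collections import Counter, defaultdict

def top_per_month(records):
    """records: iterable of (month, value) pairs.
    Returns {month: most frequent value in that month}."""
    counts = defaultdict(Counter)
    for month, value in records:
        counts[month][value] += 1
    # most_common(1) returns [(value, count)] for the highest count
    return {month: c.most_common(1)[0][0] for month, c in counts.items()}

# made-up sample: April is dominated by Pleasure, May by Business
records = [(4, "Pleasure"), (4, "Pleasure"), (4, "Business"),
           (5, "Business"), (5, "Business"), (5, "Student")]
print(top_per_month(records))  # {4: 'Pleasure', 5: 'Business'}
```

Ties are broken here by whichever value was seen first; a Spark window breaks ties arbitrarily unless more ordering columns are added.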


### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model

```python
from pyspark.sql import Window
from pyspark.sql.functions import col, row_number

spark = SparkSession.builder \
    .config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11") \
    .enableHiveSupport().getOrCreate()

# set which year is picked up
cur_year = 2016
immig_filepath = 'immigration_data/year=' + str(cur_year)
# read both required data sets
immig_df = spark.read.parquet(immig_filepath)
p_df = spark.read.load("./port_codes/port_codes.csv", format="csv",
                       sep=",", header="true")

# inner join drops immigration records without a valid US port
res_df = immig_df.join(p_df, immig_df.port == p_df.code, how='inner')

def top_per_month(df, *cols):
    """Most frequent combination of `cols` in every month.
    A global orderBy(...).limit(1) would return only one row for the whole year."""
    w = Window.partitionBy('month').orderBy(col('count').desc())
    return (df.groupBy('month', *cols).count()
              .withColumn('rn', row_number().over(w))
              .where(col('rn') == 1)
              .select('month', *cols))

# aggregations
top_visa = top_per_month(res_df, 'visa_type')
top_landing = top_per_month(res_df, 'landing_type')
top_port = top_per_month(res_df, 'port', 'city')

# join all aggregations together and add the year column
year_expr = 'int(' + str(cur_year) + ') as year'
agg_df = top_visa.join(top_landing, on=['month'], how='inner') \
    .join(top_port, on=['month'], how='inner') \
    .selectExpr(year_expr, "month", "visa_type", "landing_type", "port", "city")
```


```python
# append data to the analytics file; write the header only when creating it
if not os.path.isfile('analytics.csv'):
    agg_df.toPandas().to_csv('analytics.csv', header=True, index=False)
else:
    agg_df.toPandas().to_csv('analytics.csv', mode="a", header=False, index=False)
```
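The cell above appends unconditionally, so running it twice for the same year duplicates that year's rows (Test2 below would then fail). A minimal sketch of an idempotent append using the standard `csv` module, assuming the file has a `year` column as written above; `FIELDS`, `loaded_years` and `append_year` are hypothetical names:

```python
import csv
import os

FIELDS = ["year", "month", "visa_type", "landing_type", "port", "city"]

def loaded_years(path):
    """Years already present in the analytics CSV (empty set if the file is missing)."""
    if not os.path.isfile(path):
        return set()
    with open(path, newline="") as f:
        return {int(row["year"]) for row in csv.DictReader(f)}

def append_year(path, year, rows):
    """Append `rows` (dicts keyed by FIELDS) unless `year` is already loaded.
    Returns True if anything was written."""
    if year in loaded_years(path):
        return False
    new_file = not os.path.isfile(path)
    with open(path, "a", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=FIELDS)
        if new_file:
            writer.writeheader()
        writer.writerows(rows)
    return True
```

In the notebook, `rows` could come from `agg_df.toPandas().to_dict('records')`.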

#### 4.2 Data Quality Checks
The following checks verify that the pipeline ran as expected:
 * Integrity constraint: every code in the port codes file is unique (Test1)
 * Completeness: analytics.csv contains exactly one record per year and month pair (Test2)
 * Completeness: analytics.csv contains no empty values (Test3)
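The uniqueness and completeness rules checked in the cells below can also be written as plain Python functions, which makes them easy to unit test without Spark. A sketch, assuming rows are dicts such as those produced by `csv.DictReader`; `REQUIRED`, `duplicate_keys` and `rows_with_empty_values` are hypothetical names:

```python
from collections import Counter

REQUIRED = ("year", "month", "visa_type", "landing_type", "port", "city")

def duplicate_keys(rows, key_cols):
    """Key tuples (e.g. (year, month)) that appear in more than one row."""
    counts = Counter(tuple(row[c] for c in key_cols) for row in rows)
    return sorted(k for k, n in counts.items() if n > 1)

def rows_with_empty_values(rows, cols=REQUIRED):
    """Rows where any required column is missing, None or an empty string."""
    return [row for row in rows if any(row.get(c) in (None, "") for c in cols)]
```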
 
Run Quality Checks

```python
spark = SparkSession.builder \
    .config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11") \
    .enableHiveSupport().getOrCreate()

# Uniqueness of the port codes is very important,
# especially because we inner join those codes with the immigration data.
# We don't want results for a duplicate code to be counted twice,
# as the analytics team might get the wrong top information.
print("Test1: Codes from port codes file should be all unique")
p_df = spark.read.load("./port_codes/port_codes.csv", format="csv",
                       sep=",", header="true")
if p_df.count() <= 0:
    raise Exception("Test1 is failed: there is no data read from port_codes")
elif p_df.count() != p_df.select('code').distinct().count():
    raise Exception("Test1 is failed: there are duplicate codes in port_codes")
else:
    print("Test1 is passed")
```

```
Test1: Codes from port codes file should be all unique
Test1 is passed
```


```python
print("Test2: Analytics file should have 1 record for each month and year pair")
p_df = spark.read.load("./analytics.csv", format="csv", sep=",", header="true")
if p_df.count() <= 0:
    raise Exception("Test2 is failed: there is no data read from analytics.csv")
elif p_df.groupBy('year', 'month').count().filter('count > 1').count() != 0:
    raise Exception("Test2 is failed: there are duplicate month and year"
                    " combinations in analytics.csv")
else:
    print("Test2 is passed")
```

```
Test2: Analytics file should have 1 record for each month and year pair
Test2 is passed
```


```python
print("Test3: Analytics file should not have empty values")
p_df = spark.read.load("./analytics.csv", format="csv", sep=",", header="true")
required = ["year", "month", "visa_type", "landing_type", "port", "city"]
# fail if any required column contains a null
if any(p_df.filter(p_df[c].isNull()).count() > 0 for c in required):
    raise Exception("Test3 is failed: there are empty values in analytics.csv")
else:
    print("Test3 is passed")
```

```
Test3: Analytics file should not have empty values
Test3 is passed
```


#### 4.3 Data dictionary 

##### Immigration Data
Stored in immigration_data folder and partitioned by year and then month. 

***id***

a unique integer value for every immigration record. It comes from the raw data set downloaded from the US National Travel and Tourism Office website (https://travel.trade.gov).

***year***

4 digit year

***month*** 

numeric month

***port***

3 letter abbreviation for the entry port name

***landing_type***

the mode of transport used to enter the US.

Allowed values are: *Air, Sea, Land*

***visa_type*** 

category of visa used to enter the US.

Allowed values: *Business, Pleasure, Student*


##### Port Data
Stores only US entry port information, in CSV format, in the port_codes folder. The columns are port_code, city and state (written to the CSV as `code`, `city` and `state_code`).

***port_code***

unique 3-letter abbreviation of the entry port name

***city***

city associated with the entry port

***state*** 

2-letter state or territory abbreviation associated with the entry port.

Exceptions are: *Guam, Saipan*


##### Analytics Data
Provides the analytics team with the required insights about the most common characteristics of entries into the US, calculated for each month of each year.
It is located in analytics.csv.

***year***

4 digit year

***month*** 

numeric month

***visa_type*** 

category of visa used most often to enter the US within the given month and year

Allowed values: *Business, Pleasure, Student*

***landing_type***

mode of transport used most often to enter the US within the given month and year

Allowed values are: *Air, Sea, Land*

***port***

3-letter abbreviation of the entry port used most often to enter the US within the given month and year
  
***city***

city associated with that entry port

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.

1. As the data required for analytics is very small and grows by at most 12 rows per year, we can use the CSV format, which is widely used by analytics teams in various companies.
For the same reason, I don't see the point of creating databases or warehouses to store this information. The data can easily be cleaned and kept as a backup in folders, but it is used only once, for this analysis. Each new set of data will be analyzed every year and appended to the existing CSV. Spark is fast enough to process one year's worth of data.

2. I would suggest creating a scheduler which picks up the new immigration data file and runs all steps once a year, as the US government provides this data only once a year, for the previous year. It is much better to automate this process, to avoid running the ETL on the same year twice or more and, as a result, appending duplicate information to the analytics file.

3.1 If the data increased by 100x, there are two things I would change:
- store the immigration data in Parquet, partitioned by month and year, right away, and then clean each partition separately
- run the analytical aggregations on one month's worth of data at a time instead of the whole year at once
- Note: eventually it might be necessary to scale the Spark cluster and add nodes.

3.2 This question is not relevant to my solution.

3.3 It is possible to create a small view in Postgres or Redshift so that people can access the analytics data while making sure no one changes anything. In other words, replace the CSV with a view.