# Project Title
### Data Engineering Capstone Project

#### Project Summary
The project aims to build a cloud based datawarehouse in Redshift for analysing International visitor statistics by the US National Tourism and Trade Office. They would like to understand the distribution of visitors around all the airports in the US and the seasonal change in number of vistors. This would enable them to assign approriate number of personnel so visitor entry is efficiently handled. 

The project follows the follow steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
import pandas as pd  
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
from pyspark.sql import functions as F

In [2]:
# Read AWS config to get access id
config = configparser.ConfigParser()

config.read('dw.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['AWS_SECRET_ACCESS_KEY']

### Step 1: Scope the Project and Gather Data

#### Scope 

The US Tourism and Trade office would like to understand the following visitor statistics using the I-94 vistor arrival data.

1. Visitor arrival by month and year

2. Visitors by arrival City and by date

3. Visitors by Country and date

The proposed solution is to use AWS to build a cloud Data warehouse to anwer their questions. The use of AWS cloud technology enables linear scalabity as the data set grows. AWS also supports a variety of techonologies such as S3, Reshift, EMR and Cassandra suitable for handling massive amounts of data and also to proivde fast response times to end user queries. 

Initial data analysis is done by using PySpark in the local notebook and rest of the data transformation is handled by Redshift SQL during the ETL stage.

The data will be loaded into fact and dimension tables in an Amazon Redshift data warehouse hosted in the cloud. Data is first copied to Amazon S3 service. Using S3 service will enable to scale the data staging area in case huge gigabyte size files are required to be processed. Also, the storage can be scaled back once the processing is done to save cost.

Data will be first loaded into staging tables in Amazon Redshift using Copy statements. Following this, data cleansing can be perfomed and data loaded into fact and dimension tables using insert statements. The same SQL queries can be used to create ETL pipelines if required at a later stage.


#### Describe and Gather Data

The project uses the following Udacity supplied data sets to build the data warehouse.

1. I94 immigration SAS data files
   This data is provided by the US travel department and is available here - https://travel.trade.gov/research/reports/i94/historical/2016.html
2. Airport data from the supplie airport-codes_csv.csv
3. The aiport data is augmented by a manually created airport.csv file from the provided data dictionary.
4. Country codes data in csv data format created using I94_SAS_Labels_description dictionary
5. State demographics in the supploed us-cities-demographics.csv file
6. The State data is augmented using states.txt data in csv format created using I94_SAS_Labels_description dictionary

In [2]:
# Read in the data here
i94_fname = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
df_i94 = pd.read_sas(i94_fname, 'sas7bdat', encoding="ISO-8859-1")
df_i94.head()

In [3]:
# Initialize Spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.\
config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11,org.apache.hadoop:hadoop-aws:2.7.1").\
config("spark.hadoop.fs.s3a.awsAccessKeyId", os.environ['AWS_ACCESS_KEY_ID']).\
config("spark.hadoop.fs.s3a.awsSecretAccessKey", os.environ['AWS_SECRET_ACCESS_KEY']).\
enableHiveSupport().getOrCreate()
    

In [4]:
# Read i94 sas data for 2 months - Mar 2016 and Apr 2016 and write to local parquet files
sas_files=["../../data/18-83510-I94-Data-2016/i94_mar16_sub.sas7bdat","../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat"]
for sasfile in sas_files:
    df_spark_i94 = spark.read.format('com.github.saurfang.sas.spark').load(sasfile)
    print(f"Record count for {sasfile} is ", df_spark_i94.count())
    df_spark_i94.write.mode("append").parquet("sas_parquet/")
   

Record count for ../../data/18-83510-I94-Data-2016/i94_mar16_sub.sas7bdat is  3157072
Record count for ../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat is  3096313


### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps
Document steps necessary to clean the data

In [22]:
# Performing cleaning tasks here
# Read parquet data
df_all_i94 = spark.read.parquet("sas_parquet/")
df_all_i94.count()

6253385

In [23]:
# Clean i94 data
## Get distinct values for the columns selected - remove duplicates and limit count to 1.1 million rows.
df_all_i94.createOrReplaceTempView("visitor_i94_table")
i94_table = spark.sql("select distinct cicid, i94yr, i94mon,i94cit,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,visapost, \
gender,airline,fltno,visatype from visitor_i94_table limit 1100000")
print("i94 table count:",i94_table.count())

i94 table count: 1100000


In [24]:
i94_table.show()
#Write i94 parquet to AWS S3 
i94_table.write.mode("overwrite").parquet("s3a://ctsprojbucket/i94visitors/")

+------+------+------+------+-------+-------+-------+-------+-------+------+-------+--------+------+-------+-----+--------+
| cicid| i94yr|i94mon|i94cit|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|visapost|gender|airline|fltno|visatype|
+------+------+------+------+-------+-------+-------+-------+-------+------+-------+--------+------+-------+-----+--------+
|  48.0|2016.0|   4.0| 101.0|    NYC|20545.0|    1.0|     NY|20572.0|  68.0|    2.0|     FLR|     M|     AA|00199|      B2|
| 270.0|2016.0|   4.0| 103.0|    NYC|20545.0|    1.0|     NY|20560.0|  33.0|    2.0|    null|     M|     LH|00410|      WT|
| 424.0|2016.0|   3.0| 103.0|    NYC|20514.0|    1.0|     NY|20574.0|  37.0|    2.0|     VNN|     M|     OS|00087|      B2|
| 746.0|2016.0|   4.0| 103.0|    LOS|20545.0|    1.0|   null|20546.0|  12.0|    2.0|    null|  null|     AS|00291|      WT|
|1324.0|2016.0|   3.0| 107.0|    LOS|20514.0|    1.0|     CA|20538.0|  33.0|    2.0|    null|  null|     LH|00456|      B2|
|1372.0|

In [20]:
# Gather data from manually prepared airports file
airport_local_df=spark.read.csv('./airport.csv', header=True, sep='|')
airport_local_df.createOrReplaceTempView("local_airport_table")

#Clean airport data from supplied airports file
airport_df = spark.read.csv('./airport-codes_csv.csv', header=True)
airport_df.createOrReplaceTempView("airport_table")
orig_airport_df = spark.sql("SELECT * from airport_table")
print("Original airport count ",orig_airport_df.count())

# Merge the 2 airport datasets
# Augment supplied airports with missing airports from manually created airport list
# Assign row number by iata_code using row_number Window function
clean_airport_df = spark.sql("select a.*, row_number() over(partition by iata_code order by municipality) as rownum\
                               from (SELECT distinct iata_code,ident,type,name,continent,iso_country,iso_region,\
                               municipality,gps_code,local_code,coordinates \
                               from airport_table where iso_country = 'US' and iata_code is not null\
                               union all \
                              select iata_code,null,null,null,null,null,null,municipality,null,null,null from local_airport_table\
                              where iata_code not in (select distinct iata_code from airport_table where iata_code is not null)\
                              ) a\
                              ")

clean_airport_df.describe
print("Clean airport count ",clean_airport_df.count())

# Eliminate duplicates by selecting 1st row from every iata_code group
clean_airport_df.createOrReplaceTempView("clean_airport_table")
nondup_airport_df = spark.sql("select iata_code,ident,type,name,continent,iso_country,iso_region,\
                               municipality,gps_code,local_code,coordinates from clean_airport_table \
                                where rownum=1")
nondup_airport_df.count()
nondup_airport_df.show()

Original airport count  55075
Clean airport count  2133


2128

In [21]:
# Write to local parquet first since the file is small and directly writing to S3 is taking forever
nondup_airport_df.write.mode("overwrite").parquet("airports/")

# Read from parquet
df_airports = spark.read.parquet("airports/")

#Write Airports to S3
df_airports.describe
df_airports.write.mode("overwrite").parquet("s3a://ctsprojbucket/airports/")
df_airports.show()

+---------+-----+--------------+--------------------+---------+-----------+----------+----------------+--------+----------+--------------------+
|iata_code|ident|          type|                name|continent|iso_country|iso_region|    municipality|gps_code|local_code|         coordinates|
+---------+-----+--------------+--------------------+---------+-----------+----------+----------------+--------+----------+--------------------+
|      ARC| PARC|medium_airport|Arctic Village Ai...|       NA|         US|     US-AK|  Arctic Village|    PARC|       ARC|-145.578995, 68.1147|
|      BKD| KBKD| small_airport|Stephens County A...|       NA|         US|     US-TX|    Breckenridge|    KBKD|       BKD|-98.8909988403000...|
|      CHZ| K2S7| small_airport|Chiloquin State A...|       NA|         US|     US-OR|       Chiloquin|    K2S7|       2S7|-121.879062653, 4...|
|      CIU| KCIU|medium_airport|Chippewa County I...|       NA|         US|     US-MI| Sault Ste Marie|    KCIU|       CIU|-84.472

In [14]:
# Read locally prepared state file
local_state_df = spark.read.csv('states.txt',header=True)
local_state_df.createOrReplaceTempView("local_state_table")
local_state_df.show()


+----------+-----------------+
|state_code|            state|
+----------+-----------------+
|        AL|          ALABAMA|
|        AK|           ALASKA|
|        AZ|          ARIZONA|
|        AR|         ARKANSAS|
|        CA|       CALIFORNIA|
|        CO|         COLORADO|
|        CT|      CONNECTICUT|
|        DE|         DELAWARE|
|        DC|DIST. OF COLUMBIA|
|        FL|          FLORIDA|
|        GA|          GEORGIA|
|        GU|             GUAM|
|        HI|           HAWAII|
|        ID|            IDAHO|
|        IL|         ILLINOIS|
|        IN|          INDIANA|
|        IA|             IOWA|
|        KS|           KANSAS|
|        KY|         KENTUCKY|
|        LA|        LOUISIANA|
+----------+-----------------+
only showing top 20 rows



In [15]:
# US cities state demographics data
# Augment with data from manually preparted states file - add only missing states

us_city_demo_df = spark.read.csv('us-cities-demographics.csv',sep=';',header=True)
us_city_demo_df.describe
us_city_demo_df.createOrReplaceTempView("us_city_state_table")
us_state_table = spark.sql("select `State Code` as state_cd,State,sum(`Total Population`) as total_popln,\
                            sum(`Foreign-born`) as foreign_born_popln\
                           from us_city_state_table\
                           group by `State Code`,State\
                           union all\
                           select state_code,state,null,null\
                           from local_state_table \
                           where state_code not in (select distinct `State Code` from us_city_state_table)\
                           ")
print("State count is ",us_state_table.count())
us_state_table.show()

# Write to local parquet first since the file is small and directly writing to S3 is taking forever
us_state_table.write.mode("overwrite").parquet("state_parquet/")


State count is  55
+--------+--------------------+------------+------------------+
|state_cd|               State| total_popln|foreign_born_popln|
+--------+--------------------+------------+------------------+
|      MT|             Montana|    906470.0|           29885.0|
|      NC|      North Carolina| 1.5300995E7|         1896635.0|
|      MD|            Maryland|   6560645.0|         1148970.0|
|      CO|            Colorado| 1.4678345E7|         1688155.0|
|      CT|         Connecticut|   4355096.0|         1114250.0|
|      IL|            Illinois|  2.251439E7|         4632600.0|
|      NJ|          New Jersey|   6931024.0|         2327750.0|
|      DE|            Delaware|    359785.0|           16680.0|
|      DC|District of Columbia|   3361140.0|          475585.0|
|      AR|            Arkansas|   2882889.0|          307753.0|
|      TN|           Tennessee| 1.0690165E7|          900149.0|
|      LA|           Louisiana|   6502975.0|          417095.0|
|      AK|           

In [16]:
# Upload States data to S3
df_states = spark.read.parquet("state_parquet/")
df_states.write.mode("overwrite").parquet("s3a://ctsprojbucket/states/")

In [9]:
# Write countries to local parquet first
df_cc = spark.read.csv('countrycodes.csv',header=False,sep='|')
print ("Country code count is ",df_cc.count())
df_cc.write.mode("overwrite").parquet("country_parquet/")

# Upload countries data to S3
df_countries = spark.read.parquet("country_parquet/")
df_countries.write.mode("overwrite").parquet("s3a://ctsprojbucket/countries/")

Country code count is  288


### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model

The Data model consists of the following fact and dimension tables. 

##### Fact table - i94visitors_fact
This fact table contains all the detailed i94 visitor data from the SAS files. The source data is loaded into a staging table. ETL is performed using SQL as data is loaded into the fact table.
The fact table contains most of the columns from the source SAS files so no detail is lost. All analytical queries can be run on this fact table. The data model can be enhanced by building 
summary fact tables to speed up queries if required.

##### airports_dim
This dimension table is loaded from airport-codes csv file after cleaning up non US airports. The airports dimension provides details about each of the airports in the US and can help disect the fact table by airport

##### states_dim
The states dimension provides state level demographics such as total population and foreign born population and can provide insights into number of visitors arriving in a state and its foreign born population. 

##### dates_dim
The dates dimension is useful for summarizing the visitors data by arrival dates, month, year, etc. This will provide visibility into how the volume of visitors changes over the course of the year and months within that year.


##### countries_dim
The Countries dimension lists the countries of origin for the visitors. Data in this dimension can be enahnced further as required. This dimension helps in summarizing the data based on the visitors' countries and can be used to check where people are visiting from. 


The data model would provide US travel department with a lot of insights such as where people are arriving from and during what season. Although data for all air travellers is not captured here, it would still help various US travel departments to better provision their airport resources and personnel and also help the states involved in performing future capacity planning and upgrades of their airports.  The US embassies in various countries can all augment their staff and capacity based on this data.


#### 3.2 Mapping Out Data Pipelines

The data pipeline is described briefly below
1. Data is first staged in Amazon S3 in parquet format. 
2. The data is then loaded into fact and dimension tables in Amazon Redshift for analysis by end users. 
3. Apache airflow is used to build the data pipelne. 
4. The stage to Redshift operator is reused while loading data eliminating redundant code. 
5. Any ETL while loading data is performed by SQL queries. 
6. The airflow pipeline is called capstone dag and is available under the project submission folders



### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [None]:

# THe data pipeline for the ETL is implemented using Apache airflow
# The Dag called capstone is available under the dags folder. 
# https://github.com/learnds/DataEngineer/blob/master/capstone/dags/capstone.py
# The table creation script is available in create_tables.sql



#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

# Perform quality checks here
# Data quality checks are performed in the Airflow Dag
1. The fact table - i94visitors_fact is checked for null values for 'reasonforvisit' column. A threshold can be set to fail the Dag if the count of null/missing values exceeds the execpted count.
2. The fact table - i94visitors_fact is checked for missing dimension values by querying for '-1' in airportid or stateid columns. A threshold value of 10000 is set. If the counts exceed this threshold, the dag fails.

#### 4.3 Data dictionary 

There are 4 dimension tables and 1 fact table

##### Dimension tables

##### dates_dim
The dates_dim dimension table contains date related information for querying by date, day, month, year, etc.
date - primary key date column used as the primary key
day - day of the week
week - week of the year
month - month of the year
year - 4 digit year

##### states_dim
This table contains state level demographics for 50 US states along with Washington DC, Virgin Islands and a few other territories around the US.
The table may be useful in determining if there is a "causal" relationship between the nationality of the pepple arriving at a particular airport and the nationality of the immigrants living in that state. 

stateid - Primary key, 2 letter state id for the various states in the US and surrounding independent areas
statename - Name of the state
totalpopulation - Total population of the state
foreignborn - Total foreign born populaton 

##### airports_dim
This table contains valueable information about various airports in the US. This will be useful in charting traffic across various airports in the country

airportid - Primary key, Airport Id usually consists of 3 letters used by the airline industry
identifier - internal identification number of the airport
type - type and size of the airport, usually small, medium, heliport, etc
name - Name of the airport
continent - Redundant column indicating continent
isocountry - Redudant column, only stores US in this exercise
isoregion - Region of the airport usually derived from the 2 letter State 
municipality - City/Town where the airport is located
gpscode - GPS code for the airport
localcode - Local code for the airport
coordinates - Latitiude/Longitude coordinates for the airport

##### countries_dim
This contains the country codes and correspoding names of the countries people arrive from. It is vert useful in understanding traffic volume from various countries.
The dimension can be expanded to include other additional columns.

countryid - Primary key, Numerical country id used in i94 visitors souce data
countryname - name of the country the person arrived from

##### i94visitors_fact

This is a detailed Fact table that stores visitor level information for each traveller visiting the US.

visitorid - The unique visitor id derived by applying md5 to the i94 cicid raw column from the dataset
arrivaldate - The arrivaldate of the visitor derived from the SAS date. It provides complete date information including day of the month.
airportid - The airport id where the visitor arrived. Stores '-1' for invalid airports
stateid - The state id where the visitor arrived. Stores '-1' for invalid or missing states
visitorcountryid - The id corresponding the country of the visitor
i94date - Partial i94 date rounded to the 1st of the month
arrivalmode - mode of arrival of the visitor - land, air, sea, etc
reasonforvisit - Reason for the traveler's visit - Business, pleasure, etc
departuredate - the date the visitor left the country
visitorage - Age of the visitor in years
visatype - Visa type used by the visitor - Visitor, business visa, etc
visaissuedloc - Location in the visitors country where the visa was issued.
gender - gender of the visitor
airline - The airline used by the visitor
fltno - The flight number of the flight on which the visitor arrived.



#### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
A cloud based solution such as AWS provides high scalability and high availability. Nodes can be added and removed dynamically, storage can be expanded with virtually as much storage as you afford.
Data is initially staged in Amazon S3 which provides high performance storage and supports parquet format to handle massive amounts of data.
Redshift is a very popular AWS database for hosting Data Warehouses. It is highly scalable and a clusted can be spun up in minutes. It provides many ways to partition data as well. Since it is directly licensed by Amazon, no additional licensing costs are incurred apart from the usage fee.
The ETL pipeline is handled by Apache Airflow that is a fairly mature Opensource tool for handling DAGs. It can seamlessly connect to AWS to run ETL at various schedules.



* Propose how often the data should be updated and why.
Data as in the supplied dataset can be loaded on a monthly basis. The SAS files are partitioned by month and are probably available at the beginning of the following month. Initial analysis shows that the monthly volume is around 3 million rows which is pretty small and is easily handled by a 2 node Redshift cluster.


* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 If the data set increases by 100x, that would amount to 300 million rows per month. While this can be easily ingested into a 16 node (or larger) high performance SSD Redshift cluster, resources may not see full utilization after the load is complete. It is best to receive smaller SAS datasets on a daily basis that would amount to 10 million rows and can be efficiently loaded using a 8 node cluster.
 
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 The SLA here is load the dashboard data by 7 am. The Airflow DAG can be set up with a 'sla' parameter setting to ensure loads complete by 6 am to provide some buffer. A 'sla_miss_callback' function can be implemented in the DAG to alery oncall personnel in case the loads do not complete on time. 
 
 * The database needed to be accessed by 100+ people.
 This is a read heavy scenario. 
 A 16 or 32 node Redshift cluster can be created to support 100's of users.
 Depending on the queries, additional summary tables also called cubes can be created as part of the ETL to support summarization queries. 
 Smaller  tables can use the distribution feature and larger tables can use the sorted key feature to store data in the sorted order to  improve query performance for useds.
 Redshift materialized views can also be created to support such summarizations transparaently. 
 If the users need to query detailed data, one or more Cassandra tables can be created based on the user queries on appropriately sized cluster to provide response times of less than a few seconds or less than a second.