# U.S. Visitors Data-Warehousing 
## (Data Engineering Capstone Project)

### Project Summary

The purpose of the data engineering capstone project is to give students a chance to combine what they've learned throughout the program. This project will be an important part of their portfolio that will help them achieve their data engineering-related career goals.

The project aims to create a data warehouse for U.S. immigration data, so that information about people visiting the U.S. can be understood and queried easily.

### Project Steps
The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

### Step 1: Scope the Project and Gather Data

#### Scope 

The plan in this project is to create a data warehouse in the cloud, so that analytics and BI tools can run queries on U.S. immigration data for international visitors.

#### Data Sets

The following data sets, provided in the project workspace, are analyzed for this project. The Exploratory Data Analysis (EDA) is performed in the next section.

-   **I94 Immigration Data:** This data comes from the US National Tourism and Trade Office. A data dictionary is included in the workspace. [This](https://travel.trade.gov/research/reports/i94/historical/2016.html) is where the data comes from. There's a sample file so you can take a look at the data in csv format before reading it all in. You do not have to use the entire dataset, just use what you need to accomplish the goal you set at the beginning of the project.
-   **World Temperature Data:** This dataset came from Kaggle. You can read more about it [here](https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data).
-   **U.S. City Demographic Data:** This data comes from OpenDataSoft. You can read more about it [here](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/).
-   **Airport Code Table:** This is a simple table of airport codes and corresponding cities. It comes from [here](https://datahub.io/core/airport-codes#data).

#### End Solution

The end solution would be based on **AWS (Amazon Web Services) Cloud**. In the first phase of the project, we would perform **ETL** processes on the given datasets and stage the data model in **Amazon S3 buckets**. In the second phase, we would load the dimension and fact tables from the **AWS S3 buckets** into **AWS Redshift** using a data pipeline built with **Apache Airflow**, and finally check the data quality.



![](./images/Architecture.jpg)

Users could then easily query the data warehouse for information such as visitors' origin, the distribution of visitors by nationality, the correlation between the source country and the destination state in the U.S., a comparison of the climates of their home regions, a comparison of the demographics of the states visited, etc.

### Step 2: Explore and Assess the Data

In the following section, I perform an Exploratory Data Analysis on the data sets provided by Udacity and discuss which data sets are useful and how we can clean or transform them into something meaningful.


### 1. Immigration Data

Form I-94, the Arrival-Departure Record Card, is a form used by U.S. Customs and Border Protection (CBP) intended to keep track of the arrival and departure to/from the United States of people who are not United States citizens or lawful permanent residents. 

This data comes from the US National Tourism and Trade Office. For each month of 2016 there is a 'sas7bdat' file in the folder **'../../data/18-83510-I94-Data-2016/'**. I have used April 2016 for this project, which has 3,096,313 entries.

For the capstone project, the immigration data is treated as the main data set, and it fills the fact table of the star schema.

```python
# Importing Libraries
import pandas as pd
```

```python
# Reading Immigration Data (format passed by keyword, which newer pandas versions require)
immigration_fname = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
df_immigration = pd.read_sas(immigration_fname, format='sas7bdat', encoding="ISO-8859-1")
df_immigration.head()
```

```python
# Checking which columns tend to be empty
df_immigration.isnull().sum()
```

|  Column Name | Description  |
|---|---|
|   **cicid**       |  Unique Identifier for the Immigration Data |
|   **i94yr**       |   Year |
|   **i94mon**      |   Month |
|   **i94cit**      |   Country Code (Country of Citizenship)|
|   **i94res**      |   Country Code (Country of Residence)|
|   **i94port**     |   Port City of Entry into U.S.|
|   **arrdate**     |   Arrival Date in US (SAS date format)|
|   **i94mode**     |   Mode of transportation (1: Air, 2: Sea, 3: Land, 9: Not reported) |
|   **i94addr**     |   State of Arrival|
|   **depdate**     |   Departure Date from US |
|   **i94bir**      |   Age of Respondent in Years |
|   **i94visa**     |   Visa codes collapsed into three categories (1 = Business, 2 = Pleasure, 3 = Student)|
|   **count**       |   Used for Summary Statistics |
|   **dtadfile**    |   Character Date Field - Date added to I-94 files |
|   **visapost**    |   Department of State where Visa was issued |
|   **occup**       |   Occupation that will be performed in U.S. |
|   **entdepa**     |   Arrival Flag - admitted or paroled into the U.S. |
|   **entdepd**     |   Departure Flag - Departed, lost I-94 or is deceased |
|   **entdepu**     |   Update Flag - Either apprehended, overstayed, adjusted to perm residence |
|   **matflag**     |   Match flag - Match of arrival and departure records |
|   **biryear**     |   4 digit year of birth |
|   **dtaddto**     |   Character Date Field - Date until which allowed to stay in U.S. |
|   **gender**      |   Non-immigrant sex |
|   **insnum**      |   INS number |
|   **airline**     |   Airline used to arrive in U.S. |
|   **admnum**      |   Admission Number |
|   **fltno**       |    Flight number of Airline used to arrive in U.S. |
|   **visatype**    |   Class of admission legally admitting the non-immigrant to temporarily stay in U.S. |

*Assumptions were made where the column description was not clear enough.*
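The `arrdate` column (and `depdate` in the raw file) holds SAS dates, i.e. the number of days since 1960-01-01, while `i94mode` and `i94visa` hold numeric codes that pandas reads as floats. A minimal plain-Python sketch of decoding them, using the code meanings listed in the table above; the helper names are hypothetical and not part of the project's `etl` module:

```python
import math
from datetime import date, timedelta

# SAS numeric dates count days since 1960-01-01.
SAS_EPOCH = date(1960, 1, 1)

# Code meanings taken from the column descriptions in the table above.
I94MODE = {1: "Air", 2: "Sea", 3: "Land", 9: "Not reported"}
I94VISA = {1: "Business", 2: "Pleasure", 3: "Student"}


def is_missing(value):
    """True for None or a float NaN (how pandas represents empty cells)."""
    return value is None or (isinstance(value, float) and math.isnan(value))


def sas_to_date(days):
    """Convert a SAS date such as 20545.0 to a datetime.date (None if missing)."""
    if is_missing(days):
        return None
    return SAS_EPOCH + timedelta(days=int(days))


def decode(value, mapping):
    """Map a float code such as 1.0 to its label; missing or unknown codes give None."""
    if is_missing(value):
        return None
    return mapping.get(int(value))


print(sas_to_date(20545.0))   # 2016-04-01
print(decode(1.0, I94MODE))   # Air
print(decode(2.0, I94VISA))   # Pleasure
```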

### 2. Temperature Data

This dataset came from Kaggle. You can read more about it [here](https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data). The raw data comes from the [Berkeley Earth data page.](http://berkeleyearth.org/archive/data/)

For the capstone project, we will be using the file from the workspace "../../data2/GlobalLandTemperaturesByCity.csv".
The data set provides city temperatures from 1743 to 2013, but the immigration data is from 2016, so we can't use it directly as a table. Instead, we can aggregate the city-level data to get the average temperature per country and create a new dimension table.
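Because the temperature records end in 2013, the country-level table simply averages all available readings per country. A plain-Python sketch of that aggregation which, like pandas' default `mean`, skips missing `AverageTemperature` values; the function name and sample rows are made up for illustration:

```python
from collections import defaultdict


def average_by_country(rows):
    """Average AverageTemperature per Country, ignoring missing readings.

    `rows` is an iterable of dicts with at least the keys 'Country' and
    'AverageTemperature' (None when missing). Countries with no valid
    reading are omitted here (pandas would return NaN for them instead).
    """
    totals = defaultdict(float)
    counts = defaultdict(int)
    for row in rows:
        temp = row["AverageTemperature"]
        if temp is None:
            continue  # skip missing values, as pandas' mean() does by default
        totals[row["Country"]] += temp
        counts[row["Country"]] += 1
    return {country: totals[country] / counts[country] for country in counts}


# Hypothetical sample rows (not real measurements).
sample = [
    {"Country": "Denmark", "AverageTemperature": 6.0},
    {"Country": "Denmark", "AverageTemperature": None},
    {"Country": "Denmark", "AverageTemperature": 8.0},
    {"Country": "Egypt", "AverageTemperature": 22.5},
]
print(average_by_country(sample))  # {'Denmark': 7.0, 'Egypt': 22.5}
```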


```python
temperature_fname = '../../data2/GlobalLandTemperaturesByCity.csv'
df_temperature = pd.read_csv(temperature_fname)
df_temperature.head()
```

```python
# Checking which columns tend to be empty
df_temperature.isnull().sum()
```

|  Column Name | Description  |
|---|---|
|   **dt**       |  Date in 'YYYY-MM-DD' format |
|   **AverageTemperature**       |   Average temperature in degrees Celsius |
|   **AverageTemperatureUncertainty**      |   Average temperature uncertainty in degrees Celsius |
|   **City**      |   City name |
|   **Country**      |   Country name |
|   **Latitude**      |   Latitude |
|   **Longitude**      |   Longitude |

```python
df_temperature_by_country = df_temperature.groupby(["Country"]).agg({"AverageTemperature": "mean", 
                                                                        "Latitude": "first", "Longitude": "first"})
```

```python
df_temperature_by_country.head()
```

### 3. U.S. Cities Demographic Data

This data comes from OpenDataSoft. You can read more about it [here](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/).

For the capstone project, we will be using the file from the workspace **"us-cities-demographics.csv".**
The data set contains city demographics. To use it with our immigration fact table, we can aggregate certain columns to get state-level information, which becomes a new dimension table.
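One caveat when aggregating: in the published file the city-level population columns are typically repeated on several rows per city, one for each `Race` value, so summing them directly would over-count. A minimal plain-Python sketch that counts each (City, State Code) pair once before summing per state; it assumes that repetition, and the function name and rows are invented for illustration:

```python
from collections import defaultdict


def aggregate_by_state(rows, columns):
    """Sum the given city-level columns per 'State Code', counting each city once."""
    seen_cities = set()
    totals = defaultdict(lambda: dict.fromkeys(columns, 0))
    for row in rows:
        city_key = (row["City"], row["State Code"])
        if city_key in seen_cities:
            continue  # same city repeated for another Race value
        seen_cities.add(city_key)
        for column in columns:
            # Treat missing values (None) as 0 for this sketch.
            totals[row["State Code"]][column] += row[column] or 0
    return dict(totals)


# Hypothetical rows: Springfield, IL appears twice, once per Race value.
rows = [
    {"City": "Springfield", "State Code": "IL", "Race": "White",
     "Total Population": 100, "Foreign-born": 10},
    {"City": "Springfield", "State Code": "IL", "Race": "Asian",
     "Total Population": 100, "Foreign-born": 10},
    {"City": "Peoria", "State Code": "IL", "Race": "White",
     "Total Population": 50, "Foreign-born": 5},
    {"City": "Springfield", "State Code": "MA", "Race": "White",
     "Total Population": 80, "Foreign-born": 8},
]
print(aggregate_by_state(rows, ["Total Population", "Foreign-born"]))
```

In pandas, the same effect could be obtained with `drop_duplicates(subset=["City", "State Code"])` before a `groupby("State Code").sum()`.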

```python
df_cities = pd.read_csv("us-cities-demographics.csv", delimiter=";")
df_cities.head()
```

```python
# Checking which columns tend to be empty
df_cities.isnull().sum()
```

|  Column Name | Description  |
|---|---|
|   **City**      |   City name |
|   **State**         |   U.S. State of city |
|   **Median Age**      |   Median of the age of population |
|   **Male Population** |   Size of male population |
|   **Female Population**      |   Size of female population |
|   **Total Population**      |   Size of total population |
|   **Number of Veterans**      |   Number of Veterans in city |
|   **Foreign-born**      |   Number of foreign-borns in city |
|   **Average Household Size**      |   Average size of a household |
|   **State Code**      |   Code of the state of city |
|   **Race**      |   Race category (the file has one row per city and race) |
|   **Count**      |   Number of people of that race in the city |

### 4. Airport Data

This is a simple table of airport codes and corresponding cities. It comes from [here](https://datahub.io/core/airport-codes#data).

The airport codes may refer either to the IATA airport code, a three-letter code used in passenger reservation, ticketing and baggage-handling systems, or to the ICAO airport code, a four-letter code used by air traffic control (ATC) systems and for airports that do not have an IATA code (from Wikipedia).
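As a rough illustration of the two formats, here is a hypothetical helper that guesses whether a code looks like an IATA (three-letter) or ICAO (four-letter) code. It only checks the shape of the string, not whether the code is actually assigned to an airport:

```python
def code_kind(code):
    """Classify an airport code by its shape: 'IATA', 'ICAO' or 'other'."""
    code = (code or "").strip().upper()
    if len(code) == 3 and code.isalpha():
        return "IATA"   # e.g. JFK
    if len(code) == 4 and code.isalpha():
        return "ICAO"   # e.g. KJFK
    return "other"      # e.g. local identifiers such as '00A', or missing values


print(code_kind("JFK"), code_kind("KJFK"), code_kind("00A"))  # IATA ICAO other
```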



```python
df_airport = pd.read_csv("airport-codes_csv.csv")
df_airport.head()
```

|  Column Name | Description  |
|---|---|
|   **ident**      |   Unique identifier |
|   **type**         |   Type of the airport |
|   **name**      |   Name of the airport |
|   **elevation_ft** |   Elevation of the airport in feet |
|   **continent**      |   Continent of the airport |
|   **iso_country**      |   ISO Code of the country of the airport |
|   **iso_region**      |   ISO Code of the region of the airport |
|   **municipality**      |   City of the airport |
|   **gps_code**      |   GPS Code of the airport |
|   **iata_code**      |   IATA Code of the airport |
|   **local_code**      |   Local Code of the airport |
|   **coordinates**      |   GPS coordinates of the airport |

For the capstone project, we will not use this data set, because none of its available keys seems to match a column of the immigration fact table.
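A quick way to back up a "keys don't match" judgement is to measure what share of the fact table's distinct keys can be found in a candidate dimension key column. A minimal sketch; the function is hypothetical and the example codes are placeholders, not values taken from the data sets:

```python
def key_coverage(fact_keys, dim_keys):
    """Return the share of distinct, non-missing fact keys present in dim_keys."""
    fact = {k.strip().upper() for k in fact_keys if k}
    dim = {k.strip().upper() for k in dim_keys if k}
    if not fact:
        return 0.0
    return len(fact & dim) / len(fact)


# Placeholder values: i94port-style codes vs. iata_code-style codes.
ports = ["NYC", "LOS", "MIA", "XXX"]
iata_codes = ["JFK", "LAX", "MIA", None]
print(key_coverage(ports, iata_codes))  # 0.25
```

A low share on the real columns (for example distinct `i94port` values against `iata_code` or `local_code`) would support leaving the airport table out of the model.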

### 5. I94 Labels

There is also an I94 label descriptions file, **I94_SAS_Labels_Descriptions.SAS**, provided in the workspace. For the capstone project, I extracted the label descriptions from this file and saved the labels in corresponding CSVs in the i94_labels folder, so they can later be used as lookups.
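The extraction script itself is not shown in this notebook. A minimal sketch of how code/label pairs could be pulled out with a regular expression, assuming the `code = 'label'` line layout of the SAS `value` blocks (for example `'AL'='ALABAMA'` or `582 =  'MEXICO ...'`); the function names and CSV header are hypothetical, not the author's script:

```python
import csv
import io
import re

# Matches lines such as:  582 =  'MEXICO'   or   'AL'='ALABAMA'
LABEL_LINE = re.compile(r"^\s*'?([^'=\s]+)'?\s*=\s*'([^']*)'")


def parse_labels(lines):
    """Yield (code, label) pairs from the lines of one SAS `value` block."""
    for line in lines:
        match = LABEL_LINE.match(line)
        if match:
            code, label = match.groups()
            yield code.strip(), label.strip()  # labels are often padded with spaces


def labels_to_csv(pairs):
    """Render (code, label) pairs as CSV text with a header row."""
    buffer = io.StringIO()
    writer = csv.writer(buffer, lineterminator="\n")
    writer.writerow(["code", "label"])
    writer.writerows(pairs)
    return buffer.getvalue()


# A few lines in the assumed layout of the I94ADDR block.
block = ["  value i94addrl", "\t'AL'='ALABAMA'", "\t'AK'='ALASKA'", ";"]
print(labels_to_csv(parse_labels(block)))
```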

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
The following diagram shows the star schema data model used for the capstone project, with the **Immigration** table at the center as the fact table and the **Country_Temperature** and **State_Demographics** tables as dimension tables.

![](./images/Data_Model.jpg)
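To illustrate the kind of query this star schema is meant for (visitors per country of residence and destination state), here is a tiny in-memory model using Python's built-in `sqlite3` as a stand-in for Redshift. Column names follow the data dictionary in section 4.3, but the column subset, types and rows are invented for the example:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE country_temperature (Code INTEGER PRIMARY KEY, Country TEXT,
                                      AverageTemperature REAL);
    CREATE TABLE state_demographics (Code TEXT PRIMARY KEY, State TEXT,
                                     Total_Population INTEGER);
    CREATE TABLE immigration (ID INTEGER PRIMARY KEY, RES_COUNTRYCODE INTEGER,
                              ARR_STATECODE TEXT, VISA_CATEGORY INTEGER);

    -- Made-up rows for illustration only.
    INSERT INTO country_temperature VALUES (1, 'Country A', 10.0), (2, 'Country B', 25.0);
    INSERT INTO state_demographics VALUES ('NY', 'New York', 1000), ('FL', 'Florida', 800);
    INSERT INTO immigration VALUES
        (1, 1, 'NY', 2), (2, 1, 'NY', 1), (3, 2, 'FL', 2), (4, 1, 'FL', 2);
""")

# Visitors per (country of residence, destination state): fact joined to both dimensions.
query = """
    SELECT c.Country, s.State, COUNT(*) AS visitors
    FROM immigration AS i
    JOIN country_temperature AS c ON i.RES_COUNTRYCODE = c.Code
    JOIN state_demographics AS s ON i.ARR_STATECODE = s.Code
    GROUP BY c.Country, s.State
    ORDER BY visitors DESC, c.Country, s.State
"""
for row in conn.execute(query):
    print(row)
```

In Redshift, the ALL distribution style chosen for the dimension tables keeps a full copy of each dimension on every node, so joins like these do not need to redistribute dimension rows.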

#### 3.2 Mapping Out Data Pipelines
Several Python functions were written using PySpark to perform the initial ETL (Extract, Transform and Load) processing of the data sets and store them as Parquet files.
These functions can be found in the **etl** folder.
Following are the steps to perform ETL using those Python functions:

```python
# Loading the functions
from etl.etl import create_spark_session, etl_immigration, etl_temperature_by_country, etl_state_demographics
from pyspark.sql.functions import isnan, isnull, when, count, col
```

```python
# Creating the Spark Session
spark = create_spark_session()
```

#### ETL processing for 'Immigration' Table
In this step we perform the following tasks:
- Extract the immigration data from the *'sas7bdat'* file.
- Transform the selected columns to appropriate data types.
- Load the result into an AWS S3 bucket in Parquet format.
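The code of `etl_immigration` lives in the `etl` folder and is not reproduced here. As a hedged illustration of the transform step, the plain-Python sketch below renames raw columns to fact-table names and casts them. Only the `cicid`, `i94yr`, `i94mon` and `arrdate` renames are documented in the data dictionary; the other pairs in `RENAMES` are assumptions based on the column descriptions, and the function is hypothetical:

```python
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)  # SAS dates count days from this date

# Raw column -> fact-table column. Only cicid, i94yr, i94mon and arrdate are
# documented renames; the remaining pairs are assumptions for this sketch.
RENAMES = {
    "cicid": "ID", "i94yr": "YEAR", "i94mon": "MONTH",
    "arrdate": "ARR_DATE", "depdate": "DEP_DATE",
    "i94port": "ARR_CITYCODE", "i94addr": "ARR_STATECODE",
    "i94res": "RES_COUNTRYCODE", "gender": "RES_GENDER",
}
INT_COLUMNS = {"ID", "YEAR", "MONTH", "RES_COUNTRYCODE"}
DATE_COLUMNS = {"ARR_DATE", "DEP_DATE"}


def is_missing(value):
    """None or a float NaN (NaN is the only value not equal to itself)."""
    return value is None or value != value


def transform_record(raw):
    """Rename and cast one raw I94 record (a dict) into a fact-table row."""
    row = {}
    for source, target in RENAMES.items():
        value = raw.get(source)
        if is_missing(value):
            row[target] = None  # keep missing values as NULL
        elif target in INT_COLUMNS:
            row[target] = int(value)  # SAS numerics arrive as floats, e.g. 2016.0
        elif target in DATE_COLUMNS:
            day = SAS_EPOCH + timedelta(days=int(value))
            row[target] = day.strftime("%m-%d-%Y")  # 'MM-DD-YYYY' as in the data dictionary
        else:
            row[target] = value
    return row


raw = {"cicid": 6.0, "i94yr": 2016.0, "i94mon": 4.0, "arrdate": 20545.0,
       "depdate": None, "i94port": "NYC", "i94addr": "NY", "i94res": 438.0,
       "gender": "F"}
print(transform_record(raw))
```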

```python
# Perform ETL processing for Immigration Fact Table for April 2016
immigration_table = etl_immigration(spark, 
                input_data="../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat", 
                output_path="parquet-output/immigration")
                #output_path="s3a://zafarsohaib/immigration")
immigration_table.show(n=5)
```

```python
# Check for NULL values 
immigration_table.select([count(when(isnull(c), c)).alias(c) for c in immigration_table.columns]).show()
```

#### ETL Processing for 'Country_Temperature' Table
In this step we perform the following tasks:

- Load the CSV files of the global temperatures and the I94CIT_I94RES labels.
- Aggregate the temperature data set by country and rename the new columns.
- Join the two data sets on the country name.
- Save the resulting Parquet files to the staging area in Amazon S3.
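The join on country name only succeeds when both files spell a country the same way: the temperature file uses names like 'Denmark', whereas the labels extracted from the SAS file are upper-case. A plain-Python sketch of a normalised inner join, assuming trimming and upper-casing are enough (countries named differently in the two files would still need a manual mapping); the function, codes and values are hypothetical:

```python
def join_country_temperature(avg_by_country, labels):
    """Inner-join country averages with I94 country labels on normalised names.

    avg_by_country: {country name: average temperature}, e.g. from a groupby
    labels: iterable of (code, label) pairs, e.g. rows of I94CIT_I94RES.csv
    """
    # Normalise names on both sides: trim whitespace and compare upper-case.
    by_name = {name.strip().upper(): temp for name, temp in avg_by_country.items()}
    rows = []
    for code, label in labels:
        temp = by_name.get(label.strip().upper())
        if temp is None:
            continue  # label without temperature data: dropped, as in an inner join
        rows.append({"Code": int(code), "Country": label.strip().title(),
                     "AverageTemperature": temp})
    return rows


# Made-up codes and values, for illustration only.
averages = {"Denmark": 7.0, "Egypt": 22.5}
labels = [("101", "DENMARK"), ("102", "EGYPT "), ("103", "ATLANTIS")]
print(join_country_temperature(averages, labels))
```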

```python
# Perform ETL processing for Country_Temperature Dimension Table
country_temperature_table = etl_temperature_by_country(spark, 
                          input_data="../../data2/GlobalLandTemperaturesByCity.csv", 
                          label_data="i94_labels/I94CIT_I94RES.csv",
                          output_path="parquet-output/country_temperature")
                          #output_path="s3a://zafarsohaib/country_temperature")

country_temperature_table.show(n=5)
```

```python
# Check for NULL values 
country_temperature_table.select([count(when(isnull(c), c)).alias(c) for c in country_temperature_table.columns]).show()
```

```python
# Reading back from parquet to verify the schema of the table just written
#par = spark.read.parquet("s3a://zafarsohaib/country_temperature")
par = spark.read.parquet("parquet-output/country_temperature")
par.printSchema()
```

#### ETL Processing for 'State_Demographics' Table
In this step we perform the following tasks:

- Load the CSV files of the state demographics and the I94ADDR labels.
- Aggregate the demographics data set by state and rename the new columns.
- Join the two data sets on the state name.
- Save the resulting Parquet files to the staging area in Amazon S3.

```python
# Perform ETL processing for State_Demographics Dimension Table
state_demographics_table = etl_state_demographics(spark, 
                       input_data="us-cities-demographics.csv", 
                       label_data="i94_labels/I94ADDR.csv",
                       output_path="parquet-output/state_demographics")
                       #output_path="s3a://zafarsohaib/state_demographics")
state_demographics_table.show(n=5)
```

```python
# Check for NULL values 
state_demographics_table.select([count(when(isnull(c), c)).alias(c) for c in state_demographics_table.columns]).show()
```

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
The data pipeline can be divided into two phases. The first phase performs the ETL processes using Spark on an AWS EMR cluster and stages the resulting Parquet files in AWS S3 buckets. The second phase uses an Apache Airflow pipeline to load the files from the S3 buckets into AWS Redshift, and finally checks the tables to ensure data quality.

![](./images/Architecture.jpg)

### Airflow DAG

The first step after execution starts is to create the tables in Redshift (if they don't already exist); the SQL queries to create the tables are stored in the file **"/airflow/dags/create_tables.sql"**. (*NOTE: As our dimension tables are not too big, we chose an ALL distribution style to speed up our analytical queries with joins.*) After that, we load the fact and dimension tables from the Parquet files stored on AWS S3, and in the last step we run the data quality checks explained in the next section.
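The create and load statements are not reproduced in this write-up. A hedged sketch of what they could look like for one dimension table, generated as strings in Python: `DISTSTYLE ALL` for the small dimension, and Redshift's `COPY ... FORMAT AS PARQUET` to load the staged files. The IAM role ARN is a placeholder, the column types are assumptions, and the helper names are hypothetical rather than taken from `create_tables.sql` or the DAG's operators:

```python
def create_dimension_sql(table, columns):
    """CREATE TABLE statement for a small dimension replicated to every node."""
    cols = ",\n    ".join(f"{name} {sql_type}" for name, sql_type in columns)
    return (f"CREATE TABLE IF NOT EXISTS {table} (\n    {cols}\n)\n"
            "DISTSTYLE ALL;")


def copy_parquet_sql(table, s3_path, iam_role_arn):
    """COPY statement that loads Parquet files staged in S3 into Redshift."""
    return (f"COPY {table}\n"
            f"FROM '{s3_path}'\n"
            f"IAM_ROLE '{iam_role_arn}'\n"
            "FORMAT AS PARQUET;")


# Assumed column types for the Country_Temperature dimension.
columns = [("Code", "INTEGER NOT NULL"), ("Country", "VARCHAR(256)"),
           ("AverageTemperature", "DOUBLE PRECISION"),
           ("Latitude", "VARCHAR(16)"), ("Longitude", "VARCHAR(16)")]
print(create_dimension_sql("country_temperature", columns))
print(copy_parquet_sql("country_temperature",
                       "s3://zafarsohaib/country_temperature/",
                       "arn:aws:iam::123456789012:role/example-redshift-role"))
```

Two details worth checking against the Redshift documentation: `COPY` uses `s3://` paths (Spark's `s3a://` scheme is specific to Hadoop), and when loading Parquet, columns are matched by position, so the table's column order must follow the Parquet schema.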

![](./images/DAG.jpg)

#### 4.2 Data Quality Checks

After all the tables are loaded into the Redshift data warehouse, the last step of the Airflow pipeline, called "Run_data_quality_checks", verifies that each table's record count is as expected.
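The operator behind "Run_data_quality_checks" is not shown here. A minimal sketch of a record-count check written against any DB-API style connection and demonstrated with `sqlite3`; the function name, table names and minimum counts are illustrative assumptions:

```python
import sqlite3


def check_record_counts(conn, expected_min):
    """Raise ValueError if any table has fewer rows than its expected minimum.

    `conn` is any DB-API 2.0 connection; `expected_min` maps a table name
    (from trusted configuration, never user input) to its minimum row count.
    """
    failures = []
    cursor = conn.cursor()
    for table, minimum in expected_min.items():
        cursor.execute(f"SELECT COUNT(*) FROM {table}")
        (count,) = cursor.fetchone()
        if count < minimum:
            failures.append(f"{table}: {count} rows, expected at least {minimum}")
    if failures:
        raise ValueError("Data quality check failed: " + "; ".join(failures))
    return True


# Demo against an in-memory SQLite database with made-up tables.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE immigration (ID INTEGER)")
conn.execute("CREATE TABLE state_demographics (Code TEXT)")
conn.executemany("INSERT INTO immigration VALUES (?)", [(1,), (2,)])

print(check_record_counts(conn, {"immigration": 1}))  # True
try:
    check_record_counts(conn, {"immigration": 1, "state_demographics": 1})
except ValueError as err:
    print(err)  # state_demographics has 0 rows, so the check fails
```

Inside Airflow, raising an exception from a task marks it as failed, so logic like this could sit in the custom operator behind that task.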

#### 4.3 Data dictionary 
Below you can find a data dictionary for the data model. For each field, there is a brief description of what the data is and where it came from. 

#### Fact Table: Immigration 

|  Column Name | Description  |
|---|---|
|   **ID**      |   Primary Key (renamed from: cicid) |
|   **YEAR**         |   4-digit Year (renamed from: i94yr) |
|   **MONTH**      |  Month Number (renamed from: i94mon)  |
|   **ARR_DATE** |   Arrival Date 'MM-DD-YYYY' (renamed from: arrdate)  |
|   **DEP_DATE**      |   Departure Date 'MM-DD-YYYY' |
|   **ARR_CITYCODE**      |   Arrival City Code |
|   **ARR_STATECODE**      |   Arrival State Code |
|   **ARR_MODE**      |   Arrival Transport Mode |
|   **ARR_FLIGHT**      |   Arrival Flight Number |
|   **ARR_AIRLINE**      |   Arrival Airline |
|   **RES_BIRTHYEAR**      |   Respondent's Birth Year |
|   **RES_COUNTRYCODE**      |   Respondent's Residence Country Code |
|   **RES_GENDER**      |   Respondent's Gender |
|   **VISA_EXPIRYDATE**      |   VISA Expiry Date |
|   **VISA_ISSUESTATE**      |   VISA Issue State |
|   **VISA_CATEGORY**      |   VISA Category |
|   **VISA_TYPE**      |   VISA Type |

#### Dimension Table: Country_Temperature 

|  Column Name | Description  |
|---|---|
|   **Code**      |   Country Code (Foreign Key) |
|   **Country**         |   Country Name |
|   **AverageTemperature**      |  Average Temperature in Celsius  |
|   **Latitude** |   GPS Latitude   |
|   **Longitude**      |   GPS Longitude |


#### Dimension Table: State_Demographics 

|  Column Name | Description  |
|---|---|
|   **Code**      |   State Code (Foreign Key) |
|   **State**         | State Name   |
|   **Male_Population**      |  Number of Males in State  |
|   **Female_Population** |   Number of Females in State |
|   **Total_Population**      |   Total population of State |
|   **Veterans**      |   Number of Veterans in State |
|   **Foreign_born**      |   Number of foreign-born residents in State |


### Step 5: Complete Project Write Up

***Clearly state the rationale for the choice of tools and technologies for the project.***

The whole capstone project is designed to be deployed on the AWS Cloud. AWS Cloud services were chosen because of the following benefits:

1. **Trade capital expense for variable expense –** Instead of having to invest heavily in data centers and servers before you know how you’re going to use them, you can pay only when you consume computing resources, and pay only for how much you consume.

2. **Benefit from massive economies of scale –** By using cloud computing, you can achieve a lower variable cost than you can get on your own. Because usage from hundreds of thousands of customers is aggregated in the cloud, providers such as AWS can achieve higher economies of scale, which translates into lower pay-as-you-go prices.

3. **Stop guessing capacity –** Eliminate guessing on your infrastructure capacity needs. When you make a capacity decision prior to deploying an application, you often end up either sitting on expensive idle resources or dealing with limited capacity. With cloud computing, these problems go away. You can access as much or as little capacity as you need, and scale up and down as required with only a few minutes’ notice.

4. **Increase speed and agility –** In a cloud computing environment, new IT resources are only a click away, which means that you reduce the time to make those resources available to your developers from weeks to just minutes. This results in a dramatic increase in agility for the organization, since the cost and time it takes to experiment and develop is significantly lower.

5. **Stop spending money running and maintaining data centers –** Focus on projects that differentiate your business, not the infrastructure. Cloud computing lets you focus on your own customers, rather than on the heavy lifting of racking, stacking, and powering servers.

6. **Go global in minutes –** Easily deploy your application in multiple regions around the world with just a few clicks. This means you can provide lower latency and a better experience for your customers at minimal cost.

In particular, we are using the following AWS Services:

**Amazon S3** has a simple web services interface that you can use to store and retrieve any amount of data, at any time, from anywhere on the web. It gives any developer access to the same highly scalable, reliable, fast, inexpensive data storage infrastructure that Amazon uses to run its own global network of web sites.

**Amazon Elastic MapReduce (EMR)** is an Amazon Web Services (AWS) tool for big data processing and analysis. Amazon EMR offers an expandable, low-configuration service as an easier alternative to running in-house cluster computing.

**Redshift** is very fast at loading data and querying it for analytical and reporting purposes. Redshift has a Massively Parallel Processing (MPP) architecture, which allows data to be loaded quickly. Using the same architecture, Redshift distributes and parallelizes queries across multiple nodes. It is a horizontally scalable, fully managed data warehouse with transparent pay-as-you-go pricing.

**Apache Spark** is an open-source, distributed, general-purpose cluster-computing framework. Spark provides an interface for programming entire clusters with implicit data parallelism and fault tolerance, and it offers a Python API called PySpark.

***Propose how often the data should be updated and why.***

The data would be updated on a monthly basis, as we receive one file per month.
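Since one SAS file arrives per month, each monthly run needs to locate that month's input. A small sketch that builds the path, assuming every month follows the naming pattern of the April file used above (`i94_apr16_sub.sas7bdat`); the helper is hypothetical:

```python
DATA_DIR = "../../data/18-83510-I94-Data-2016"
MONTH_ABBR = ("jan", "feb", "mar", "apr", "may", "jun",
              "jul", "aug", "sep", "oct", "nov", "dec")


def i94_path(year, month, data_dir=DATA_DIR):
    """Return the assumed SAS file path for one month, e.g. (2016, 4) -> .../i94_apr16_sub.sas7bdat."""
    if not 1 <= month <= 12:
        raise ValueError(f"month must be between 1 and 12, got {month}")
    # Naming pattern inferred from the April 2016 file; other months are assumed to follow it.
    return f"{data_dir}/i94_{MONTH_ABBR[month - 1]}{year % 100:02d}_sub.sas7bdat"


print(i94_path(2016, 4))
```

In a monthly Airflow DAG, `year` and `month` could be taken from the run's logical date, so each run picks up its own file.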

***Write a description of how you would approach the problem differently under the following scenarios:***

***- The data was increased by 100x.***

Since the project is deployed in the cloud, handling more data is mostly a matter of provisioning more resources: we can increase the number of nodes in the EMR cluster and resize the Redshift cluster to larger, storage-optimized node types.

***- The data populates a dashboard that must be updated on a daily basis by 7am every day.***

The Airflow DAG can be scheduled to run daily, early enough to finish before 7am.

***- The database needed to be accessed by 100+ people.***

The Amazon Redshift cluster can be extended with more compute nodes. There is also an option of "Elastic Resize" available. Elastic resize significantly improves your ability to scale your Amazon Redshift clusters on-demand. Together with features such as Amazon Redshift Spectrum, it enables you to independently scale storage and compute so that you can adapt to the evolving analytical needs of your business.