# Building a Data Warehouse of US Visitor Information
### Data Engineering Capstone Project

#### Project Summary
The primary objective of this data engineering capstone project is to apply what I have learned in the program and use its stacks and techniques to solve a problem. The project uses datasets provided by Udacity. The main dataset contains data on immigration to the United States, and the supplementary datasets contain data on airport codes, United States city demographics, and temperatures.

#### The project follows these steps:
* Step 1: Scope the Project and Gather Data
  + Identify and gather the data to be used for the project
  + Explain what end use case the data is prepared for
* Step 2: Explore and Assess the Data
  + Explore the data to identify data quality issues, like missing values or duplicated data
  + Document the steps necessary to clean the data
* Step 3: Define the Data Model
  + Map out the conceptual data model
  + List the steps necessary to pipeline the data into the data model
* Step 4: Run ETL to Model the Data
  + Create the data pipelines and the data model to include a data dictionary
  + Run data quality checks to ensure the pipeline ran as expected
* Step 5: Complete Project Write Up


```python
# Imports and installs
from pyspark.sql import SparkSession, SQLContext, GroupedData
from pyspark.sql.functions import *

from etl.extract import Extract
from glob import glob
import pandas as pd
import re
```

```python
# Initiate the Spark session
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    # spark-sas7bdat: reader package for SAS .sas7bdat files
    .config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11")
    # Broadcast join timeout, in seconds
    .config("spark.sql.broadcastTimeout", "36000")
    .enableHiveSupport()
    .getOrCreate()
)
```

### Step 1: Scope the Project and Gather Data

#### Scope

In order to build a data warehouse containing US visitor information, the following information will be aggregated:
- immigration data by cities in the States
- temperature data in respect to the individual city
- demographics and airports information

The end product is a master table that joins the immigration and temperature data. The process runs in a local Apache Spark session.

#### Describe and Gather Data 

The I94 Immigration Data (`sas_data`) comes from [the US National Tourism and Trade Office website](https://travel.trade.gov/research/reports/i94/historical/2016.html) and includes details on incoming visitors and their ports of entry. The key fields are:
- i94yr = 4-digit year
- i94mon = numeric month
- i94cit = 3-digit code of the country of origin
- i94port = 3-character code of the destination US city (port of entry)
- arrdate = arrival date in the USA, stored as a SAS date (days since 1960-01-01)
- i94mode = 1-digit travel mode code
- depdate = departure date from the USA, stored as a SAS date
- i94visa = reason for immigration (visa category code)

U.S. City Demographic Data (`us_cities_demographics`) comes from OpenSoft and includes data by city, state, age, population, veteran status and race.

Airport Code Table (`airport_codes`) comes from datahub.io and includes airport codes and the corresponding cities.

Temperature Data (`temperature_data`) comes from Kaggle in CSV format and includes the average temperature, city, country, latitude and longitude.
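
Because `arrdate` and `depdate` are SAS dates, they must be converted before they can be split into year, month and day. The snippet below is a minimal, standard-library sketch of that conversion; the function name `sas_to_date` is hypothetical and is not part of the project's `etl` package.

```python
import math
from datetime import date, timedelta
from typing import Optional

# SAS date values count days since 1960-01-01.
SAS_EPOCH = date(1960, 1, 1)


def sas_to_date(sas_days: Optional[float]) -> Optional[date]:
    """Convert a SAS numeric date (e.g. arrdate or depdate) to a calendar date.

    Missing values (None or NaN) are returned as None.
    """
    if sas_days is None or math.isnan(sas_days):
        return None
    return SAS_EPOCH + timedelta(days=int(sas_days))


print(sas_to_date(20545.0))  # 2016-04-01
print(sas_to_date(None))     # None
```

In Spark, the same conversion can be written as the SQL expression `date_add(to_date('1960-01-01'), CAST(arrdate AS INT))`, for example inside `expr(...)`.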

```python
# File paths for the data sources
paths = {
    "us_cities_demographics": "us-cities-demographics.csv",
    "airport_codes": "airport-codes_csv.csv",
    "sas_data": "sas_data/",
    "temperature_data": "/data2/GlobalLandTemperaturesByCity.csv",
}
```

```python
# Read in the data
extract = Extract(spark, paths)

demographics = extract.get_cities_demographics()
airport_codes = extract.get_airports_codes()
immigration_data = extract.get_immigration()
temperature_data = extract.get_temperature_data()
```

```python
demographics.printSchema()
```

```text
root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Median Age: string (nullable = true)
 |-- Male Population: string (nullable = true)
 |-- Female Population: string (nullable = true)
 |-- Total Population: string (nullable = true)
 |-- Number of Veterans: string (nullable = true)
 |-- Foreign-born: string (nullable = true)
 |-- Average Household Size: string (nullable = true)
 |-- State Code: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Count: string (nullable = true)
```

```python
airport_codes.printSchema()
```

```text
root
 |-- ident: string (nullable = true)
 |-- type: string (nullable = true)
 |-- name: string (nullable = true)
 |-- elevation_ft: string (nullable = true)
 |-- continent: string (nullable = true)
 |-- iso_country: string (nullable = true)
 |-- iso_region: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- gps_code: string (nullable = true)
 |-- iata_code: string (nullable = true)
 |-- local_code: string (nullable = true)
 |-- coordinates: string (nullable = true)
```

```python
immigration_data.printSchema()
```

```text
root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable =
```
```python
temperature_data.printSchema()
```

```text
root
 |-- dt: string (nullable = true)
 |-- AverageTemperature: string (nullable = true)
 |-- AverageTemperatureUncertainty: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
```
### Step 2: Explore and Assess the Data
#### Explore the Data 
- The `printSchema()` function shows the structure and data types of each DataFrame.
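
`printSchema()` only shows structure; missing values and duplicates still have to be counted. A minimal, standard-library sketch of those two checks follows, assuming each dataset is available as a list of dictionaries (one per row). The helper names and sample rows are made up for illustration.

```python
from collections import Counter
from typing import Dict, List, Optional

Row = Dict[str, Optional[str]]


def null_ratios(rows: List[Row]) -> Dict[str, float]:
    """Share of missing values (None or empty string) per column."""
    if not rows:
        return {}
    missing = Counter()
    for row in rows:
        for column, value in row.items():
            if value is None or value == "":
                missing[column] += 1
    return {column: missing[column] / len(rows) for column in rows[0]}


def duplicate_count(rows: List[Row], key_columns: List[str]) -> int:
    """Number of rows whose key columns repeat an earlier row."""
    seen = set()
    duplicates = 0
    for row in rows:
        key = tuple(row[column] for column in key_columns)
        if key in seen:
            duplicates += 1
        else:
            seen.add(key)
    return duplicates


sample = [
    {"ident": "X1", "iata_code": None, "municipality": "Town A"},
    {"ident": "X2", "iata_code": "", "municipality": "Town B"},
    {"ident": "X1", "iata_code": None, "municipality": "Town A"},
]
print(null_ratios(sample))                 # {'ident': 0.0, 'iata_code': 1.0, 'municipality': 0.0}
print(duplicate_count(sample, ["ident"]))  # 1
```

With Spark DataFrames, the equivalents are `df.filter(col(c).isNull()).count()` for each column and `df.count() - df.dropDuplicates(key_columns).count()`.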

#### Cleaning Steps
- `data_cleaning.py`, in the `etl` folder, contains the cleaning steps applied to each dataset.
- Cleaning steps include, but are not limited to, filling null values with 0 and grouping by city and state
- Filtering the airports dataset to keep only US airports
- Renaming columns to readable names
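
The cleaning steps listed above can be sketched in plain Python. This is a hypothetical illustration of the same ideas (normalizing column names, filling missing numeric values with 0, keeping only US airports), not the contents of `data_cleaning.py`.

```python
import re
from typing import Dict, Iterable, List, Optional


def normalise_column(name: str) -> str:
    """Replace runs of whitespace with underscores, e.g. 'Median Age' -> 'Median_Age'."""
    return re.sub(r"\s+", "_", name.strip())


def clean_row(row: Dict[str, Optional[str]], numeric_columns: Iterable[str]) -> Dict[str, object]:
    """Rename columns, then fill missing numeric values with 0."""
    cleaned: Dict[str, object] = {normalise_column(k): v for k, v in row.items()}
    for column in numeric_columns:
        if cleaned.get(column) in (None, ""):
            cleaned[column] = 0
    return cleaned


def us_airports(rows: List[Dict[str, str]]) -> List[Dict[str, str]]:
    """Keep only airports located in the United States."""
    return [row for row in rows if row.get("iso_country") == "US"]


print(clean_row({"Median Age": "33.8", "Number of Veterans": None}, ["Number_of_Veterans"]))
# {'Median_Age': '33.8', 'Number_of_Veterans': 0}
```

In PySpark, the corresponding calls are `withColumnRenamed`, `fillna(0, subset=[...])` and `filter(col("iso_country") == "US")`. One caveat: `fillna(0)` ignores columns whose type does not match the fill value, and the raw demographics columns are all read as strings (see the schema above), so they need to be cast to numeric types first.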

```python
# Perform the cleaning tasks with the predefined functions
from etl.data_cleaning import Cleaner

demographics = Cleaner.get_cities_demographics(demographics)
airport_codes = Cleaner.get_airports(airport_codes)
immigration_data = Cleaner.get_immigration(immigration_data)
temperature_data = Cleaner.get_temperature(temperature_data)
```

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model

**Star Schema Dimension Tables:**

- `demographics_dim`: State_code, State, Total_Population, Male_Population, Female_Population, American_Indian_and_Alaska_Native, Asian, Black_or_African-American, Hispanic_or_Latino, White, Male_Population_Ratio, Female_Population_Ratio, American_Indian_and_Alaska_Native_Ratio, Asian_Ratio, Black_or_African-American_Ratio, Hispanic_or_Latino_Ratio, White_Ratio.

- `airports_dim`: ident, type, name, elevation_ft, continent, iso_country, iso_region, municipality, gps_code, iata_code, local_code, coordinates.

- `temperature_dim`: dt, AverageTemperature, AverageTemperatureUncertainty, City, Country, Latitude, Longitude, cod_port (the i94port code).

**Fact Table:**

- `immigration_fact`: cic_id, cod_port, cod_state, visapost, matflag, dtaddto, gender, airline, admnum, fltno, visatype, cod_visa, cod_mode, cod_country_origin, cod_country_cit, year, month, bird_year, age, counter, arrival_date, departure_date, arrival_year, arrival_month, arrival_day.
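
To show how the star schema is meant to be queried, here is a small in-memory SQLite stand-in with made-up rows. It assumes that `immigration_fact.cod_state` references `demographics_dim.State_code`, and it reproduces only a few columns of each table.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE demographics_dim (
        State_code TEXT PRIMARY KEY,
        State TEXT,
        Total_Population REAL
    );
    CREATE TABLE immigration_fact (
        cic_id INTEGER PRIMARY KEY,
        cod_port TEXT,
        cod_state TEXT REFERENCES demographics_dim (State_code),
        arrival_year INTEGER,
        arrival_month INTEGER
    );
    """
)
conn.executemany(
    "INSERT INTO demographics_dim VALUES (?, ?, ?)",
    [("AA", "State A", 1000.0), ("BB", "State B", 500.0)],
)
conn.executemany(
    "INSERT INTO immigration_fact VALUES (?, ?, ?, ?, ?)",
    [(1, "P1", "AA", 2016, 4), (2, "P1", "AA", 2016, 4), (3, "P2", "BB", 2016, 4)],
)

# Arrivals per state, normalized by the state's population from the dimension table.
rows = conn.execute(
    """
    SELECT d.State,
           COUNT(*) AS arrivals,
           COUNT(*) * 1000.0 / d.Total_Population AS arrivals_per_1000
    FROM immigration_fact AS f
    JOIN demographics_dim AS d ON f.cod_state = d.State_code
    GROUP BY d.State, d.Total_Population
    ORDER BY arrivals DESC
    """
).fetchall()
print(rows)  # [('State A', 2, 2.0), ('State B', 1, 2.0)]
```

The fact table holds one row per admission event, and descriptive attributes are pulled in from the dimensions only when a query needs them.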


#### 3.2 Mapping Out Data Pipelines
- Transform data:
Group the demographics dataset by state and calculate the totals and ratios for every race in every state. Transform the immigration dataset to split the arrival date into separate columns (year, month, day), which are used to partition the dataset.

- Generate the model (star schema):
Write all dimension tables as Parquet.
Write the fact table as Parquet, partitioned by the year, month and day of the arrival date.
Insert into the fact table only rows whose dimension keys exist in the dimension tables, to preserve integrity and consistency.
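
Partitioning the fact table by arrival date turns each arrival day into its own directory. The sketch below shows the Hive-style `column=value` layout that Spark's `DataFrameWriter.partitionBy` produces. The helper names are hypothetical, and the partition columns are assumed to be `arrival_year`, `arrival_month` and `arrival_day` from the fact table schema.

```python
from datetime import date
from typing import Dict


def arrival_partitions(arrival_date: date) -> Dict[str, int]:
    """Split an arrival date into the year/month/day partition columns."""
    return {
        "arrival_year": arrival_date.year,
        "arrival_month": arrival_date.month,
        "arrival_day": arrival_date.day,
    }


def partition_path(base: str, partitions: Dict[str, int]) -> str:
    """Build a Hive-style partition directory such as base/arrival_year=2016/..."""
    parts = [f"{column}={value}" for column, value in partitions.items()]
    return "/".join([base.rstrip("/")] + parts)


print(partition_path("./output/fact/immigrations_fact.parquet", arrival_partitions(date(2016, 4, 1))))
# ./output/fact/immigrations_fact.parquet/arrival_year=2016/arrival_month=4/arrival_day=1
```

In PySpark, the columns can be derived with `year`, `month` and `dayofmonth` from `pyspark.sql.functions` and written with `df.write.partitionBy("arrival_year", "arrival_month", "arrival_day").parquet(path)`. A `left_semi` join against a dimension table keeps only fact rows whose keys exist there. Queries that filter on arrival date can then skip whole directories.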

```python
from etl.transform import Transformer

demographics = Transformer.demographics(demographics)
immigration_data = Transformer.immigrants(immigration_data)
temperature_data = Transformer.temperatue(temperature_data)
```
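
The state-level aggregation has one subtlety. The source demographics file has one row per city and race (the `Race` and `Count` columns), and the city-level population columns repeat on each of those rows. Summing them directly would count each city once per race. The plain-Python sketch below, with made-up rows, takes population figures once per city and race counts from every row. It illustrates the idea and is not the code of `Transformer.demographics`. It assumes the values are already numeric (after cleaning) and that ratios are computed against `Total_Population`.

```python
from collections import defaultdict
from typing import Dict, List


def state_demographics(rows: List[Dict]) -> Dict[str, Dict]:
    """Aggregate city/race rows into one record of totals and ratios per state."""
    city_population = {}  # (state_code, city) -> (total, male, female), kept once per city
    race_counts: Dict[str, Dict[str, int]] = defaultdict(lambda: defaultdict(int))
    state_names = {}
    for row in rows:
        code = row["State Code"]
        city_population[(code, row["City"])] = (
            row["Total Population"], row["Male Population"], row["Female Population"],
        )
        race_counts[code][row["Race"]] += row["Count"]
        state_names[code] = row["State"]

    totals: Dict[str, List[int]] = defaultdict(lambda: [0, 0, 0])
    for (code, _city), populations in city_population.items():
        for i, value in enumerate(populations):
            totals[code][i] += value

    result = {}
    for code, (total, male, female) in totals.items():
        record = {
            "State": state_names[code],
            "Total_Population": total,
            "Male_Population": male,
            "Female_Population": female,
            "Male_Population_Ratio": male / total,
            "Female_Population_Ratio": female / total,
        }
        for race, count in race_counts[code].items():
            column = race.replace(" ", "_")  # "Black or African-American" -> "Black_or_African-American"
            record[column] = count
            record[column + "_Ratio"] = count / total
        result[code] = record
    return result


def _row(city, total, male, female, race, count):
    """Build one made-up source row for a fictional state 'AA'."""
    return {"City": city, "State": "State A", "State Code": "AA", "Total Population": total,
            "Male Population": male, "Female Population": female, "Race": race, "Count": count}


sample_rows = [
    _row("City 1", 100, 40, 60, "White", 70),
    _row("City 1", 100, 40, 60, "Asian", 30),
    _row("City 2", 300, 160, 140, "White", 150),
]
# Total_Population is 400 (City 1 counted once), not 500.
print(state_demographics(sample_rows)["AA"])
```

In Spark, the same idea can be expressed by taking distinct (state, city, population) rows before summing populations, and pivoting on `Race` to sum `Count`.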

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

```python
# Output paths for the dimension and fact tables
destination_paths = {
    "demographics": "./output/dimensions/demographics.parquet",
    "airports": "./output/dimensions/airports.parquet",
    "temperature": "./output/dimensions/temperature.parquet",
    "facts": "./output/fact/immigrations_fact.parquet",
}
```

```python
# Build the model with the predefined functions
from etl.models import Model

model = Model(spark, destination_paths)

model.modelize(immigration_data, demographics, airport_codes, temperature_data)
```

```text
writing demographics parquet....
writing airports parquet....
writing temperature parquet....
writing facts parquet....
```

#### 4.2 Data Quality Checks
Data quality checks ensure that the pipeline ran as expected. Such checks can include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/count checks to ensure completeness

Two checks are run below: a row count check confirming that every dimension and fact table contains data, and an integrity check between the fact table and the dimension tables.

```python
# Load the output tables for the data quality checks
from etl.quality_check import QualityCheck

checker = QualityCheck(spark, destination_paths)
immigration_fact = checker.get_facts()
dim_demographics, dim_airports, dim_temperature = checker.get_dimensions()
```

```python
# Check for value existence (row count check) in every table
print(checker.row_count_check(dim_demographics),
      checker.row_count_check(dim_airports),
      checker.row_count_check(dim_temperature),
      checker.row_count_check(immigration_fact))
```

```text
True True True True
```
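
A row count check can go further than testing for non-empty tables: comparing against the source count also catches joins that unexpectedly multiply rows. The following is a minimal sketch, assuming the counts come from `DataFrame.count()` on the source and output tables, and assuming each output table is filtered or aggregated from a single source. `count_check` is a hypothetical stand-alone function, not the `row_count_check` method of `QualityCheck`.

```python
from typing import Optional


def count_check(table_name: str, output_count: int, source_count: Optional[int] = None) -> bool:
    """Return True if the output table is non-empty and not larger than its source."""
    if output_count <= 0:
        print(f"{table_name}: FAILED, table is empty")
        return False
    if source_count is not None and output_count > source_count:
        # More rows than the source usually means a join fanned out.
        print(f"{table_name}: FAILED, {output_count} rows > {source_count} source rows")
        return False
    print(f"{table_name}: passed with {output_count} rows")
    return True


count_check("airports_dim", 8, source_count=10)  # prints "airports_dim: passed with 8 rows"
```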


```python
# Check referential integrity between the fact and dimension tables
checker.integrity_checker(immigration_fact, dim_demographics, dim_airports, dim_temperature)
```

```text
True
```
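
A referential integrity check requires every foreign key in the fact table to match a key in its dimension. A plain-Python sketch with made-up rows follows. Which fact column maps to which dimension key (here `cod_state` and `cod_port`) is an assumption, and null keys are treated as allowed because the fact columns are nullable.

```python
from typing import Dict, Iterable, List, Set, Tuple


def orphan_keys(fact_rows: List[Dict], fk: str, dim_keys: Set[str]) -> Set[str]:
    """Foreign-key values in the fact table that have no matching dimension key."""
    return {row[fk] for row in fact_rows if row[fk] is not None and row[fk] not in dim_keys}


def integrity_check(fact_rows: List[Dict], relations: Iterable[Tuple[str, Set[str]]]) -> bool:
    """True if no relation has orphaned foreign keys."""
    return all(not orphan_keys(fact_rows, fk, keys) for fk, keys in relations)


facts = [
    {"cic_id": 1, "cod_state": "AA", "cod_port": "P1"},
    {"cic_id": 2, "cod_state": "ZZ", "cod_port": "P1"},  # "ZZ" is missing from the dimension
    {"cic_id": 3, "cod_state": None, "cod_port": "P2"},
]
relations = [("cod_state", {"AA", "BB"}), ("cod_port", {"P1", "P2"})]
print(orphan_keys(facts, "cod_state", {"AA", "BB"}))  # {'ZZ'}
print(integrity_check(facts, relations))              # False
```

On Spark DataFrames, the same check can be written as `fact.join(dim, fact.cod_state == dim.State_code, "left_anti").count() == 0`.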

#### 4.3 Data dictionary 
The schemas of the output tables below serve as the data dictionary for the model. The origin of each source field is described in Step 1, and the derived columns (totals, ratios, arrival date parts) are described in Step 3.

```python
dim_demographics.printSchema()
```

```text
root
 |-- State_code: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Total_Population: double (nullable = true)
 |-- Male_Population: double (nullable = true)
 |-- Female_Population: double (nullable = true)
 |-- American_Indian_and_Alaska_Native: long (nullable = true)
 |-- Asian: long (nullable = true)
 |-- Black_or_African-American: long (nullable = true)
 |-- Hispanic_or_Latino: long (nullable = true)
 |-- White: long (nullable = true)
 |-- Male_Population_Ratio: double (nullable = true)
 |-- Female_Population_Ratio: double (nullable = true)
 |-- American_Indian_and_Alaska_Native_Ratio: double (nullable = true)
 |-- Asian_Ratio: double (nullable = true)
 |-- Black_or_African-American_Ratio: double (nullable = true)
 |-- Hispanic_or_Latino_Ratio: double (nullable = true)
 |-- White_Ratio: double (nullable = true)
```

```python
dim_airports.printSchema()
```

```text
root
 |-- ident: string (nullable = true)
 |-- type: string (nullable = true)
 |-- name: string (nullable = true)
 |-- elevation_ft: float (nullable = true)
 |-- continent: string (nullable = true)
 |-- iso_country: string (nullable = true)
 |-- iso_region: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- gps_code: string (nullable = true)
 |-- iata_code: string (nullable = true)
 |-- local_code: string (nullable = true)
 |-- coordinates: string (nullable = true)
```

```python
dim_temperature.printSchema()
```

```text
root
 |-- dt: string (nullable = true)
 |-- AverageTemperature: string (nullable = true)
 |-- AverageTemperatureUncertainty: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- cod_port: string (nullable = true)
```

```python
immigration_fact.printSchema()
```

```text
root
 |-- cic_id: integer (nullable = true)
 |-- cod_port: string (nullable = true)
 |-- cod_state: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = true)
 |-- fltno: string (nullable = true)
 |-- visatype: string (nullable = true)
 |-- cod_visa: integer (nullable = true)
 |-- cod_mode: integer (nullable = true)
 |-- cod_country_origin: integer (nullable = true)
 |-- cod_country_cit: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- bird_year: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- counter: integer (nullable = true)
 |-- arrival_date: date (nullable = true)
 |-- departure_date: date (nullable = true)
 |-- arrival_year: integer (nullable = true)
 |-- arrival_month: integer (nullable = true)
 |-- arrival_d
```

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
  * The data was increased by 100x.
  * The data populates a dashboard that must be updated on a daily basis by 7am every day.
  * The database needed to be accessed by 100+ people.

1. The choice of tools and technologies for this project is based on Apache Spark. Spark scales to large volumes of data and provides a rich API for transforming raw data into a useful form. I also considered using AWS services, but for this particular dataset and task a locally run process was deemed sufficient.

2. The data can be updated at whatever frequency is needed. Apache Airflow can be set up to run the data ingestion on a monthly basis, matching the month-level granularity of the immigration data (i94yr, i94mon).

3. A 100x increase in data volume would not be a major issue for the pipeline. If the Spark process were migrated to AWS, scaling would mainly be a matter of cost: only the number of nodes in the EMR cluster would need to increase to handle the additional data.

4. Apache Airflow can be set up to run the pipeline every day, scheduled so that it finishes and refreshes the dashboard before 7am.

5. To provide a database that many users can access, migrating the process to AWS and integrating the S3, EMR and Redshift services would be sufficient.