# Project Title
### Data Engineering Capstone Project

#### Project Summary

This project exercises an ETL pipeline on Spark based on large datasets (immigration and weather data). 

What can the data be used for?

The database created from the initial datasets will be used to analyse correlations between immigration data (e.g. gender) and temperature in major cities.

Examples of questions the data model should be able to answer:

* number of visitors per month, per year in a given city, state or country
* seasonality of visits, depending on weather temperatures over the course of a week, month, year
* main reasons for visits, based on visa types ([B1, B2, WT, WB, others](https://isso.ucsf.edu/what-b-1b-2-and-wbwt-status))
* impact of weather on tourism / regional economy by looking at the most/least popular destinations (city, state or country)

Who can use it? 

The produced data can be used by a variety of end users such as:
* Government workers: the data could help the immigration department to anticipate and re-evaluate future workloads, resource capacity and quotas when processing visa requests.
* Tourism & Affairs workers: the data could help public or private organisations to understand the influence of climate, or the success of development efforts in the tourism industry.
* Economists, historians: given that the weather dataset goes back to ~1700, studying migration trends could be another angle of analysis for historians or economists looking at the impact of global changes and their effects on mass population movements.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
import pandas as pd
import datetime
from pyspark.sql import SparkSession
from pyspark.sql.functions import isnan, when, count, col, udf, date_format, hour, year, month, weekofyear, dayofmonth, from_unixtime


### Step 1: Scope the Project and Gather Data

#### Scope 
Explain what you plan to do in the project in more detail. What data do you use? What does your end solution look like? What tools did you use?

#### Describe and Gather Data 
Describe the data sets you're using. Where did it come from? What type of information is included? 

##### I94 Immigration Data

[This](https://travel.trade.gov/research/reports/i94/historical/2016.html) data comes from the US National Tourism and Trade Office. A data dictionary is included in the workspace. A sample file is available so you can take a look at the data in CSV format before reading it all in. You do not have to use the entire dataset; just use what you need to accomplish the goal you set at the beginning of the project.

Format:

```
,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,dtadfile,visapost,occup,entdepa,entdepd,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
2027561,4084316.0,2016.0,4.0,209.0,209.0,HHW,20566.0,1.0,HI,20573.0,61.0,2.0,1.0,20160422,,,G,O,,M,1955.0,07202016,F,,JL,56582674633.0,00782,WT
```

##### World Temperature Data

This dataset came from Kaggle. You can read more about it [here](https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data).

##### U.S. City Demographic Data

This data comes from OpenDataSoft. You can read more about it [here](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/).

Format:

```
City;State;Median Age;Male Population;Female Population;Total Population;Number of Veterans;Foreign-born;Average Household Size;State Code;Race;Count
Newark;New Jersey;34.6;138040;143873;281913;5829;86253;2.73;NJ;White;76402
```

##### Airport Code Table

This is a simple table of airport codes and corresponding cities. It comes from [here](https://datahub.io/core/airport-codes#data).

Format: 

```
ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
00A,heliport,Total Rf Heliport,11,NA,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
```

In [2]:
spark = SparkSession\
    .builder\
    .config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11")\
    .enableHiveSupport()\
    .getOrCreate()

In [3]:
travel_df = spark\
        .read\
        .format("com.github.saurfang.sas.spark")\
        .load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')

In [4]:
temp_df = spark\
        .read\
        .option("header", "true")\
        .csv('../../data2/GlobalLandTemperaturesByCity.csv')

In [5]:
demog_df = spark\
        .read\
        .option("header", "true")\
        .option("delimiter", ";")\
        .csv('./us-cities-demographics.csv')
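
To see what the `header` and `delimiter` options do, the following minimal sketch parses the demographics sample row from the "Format" block with Python's standard `csv` module instead of Spark. It is an illustration only: the variable names are hypothetical, and the numeric columns are cast by hand because every field is read as a string.

```python
import csv
import io

# Sample taken from the "Format" block of the demographics section.
SAMPLE = (
    "City;State;Median Age;Male Population;Female Population;Total Population;"
    "Number of Veterans;Foreign-born;Average Household Size;State Code;Race;Count\n"
    "Newark;New Jersey;34.6;138040;143873;281913;5829;86253;2.73;NJ;White;76402\n"
)

# DictReader treats the first line as the header; the delimiter is ';' here.
reader = csv.DictReader(io.StringIO(SAMPLE), delimiter=";")
rows = list(reader)

# Every value is read as a string, so numeric columns need an explicit cast.
first = rows[0]
total_population = int(first["Total Population"])
median_age = float(first["Median Age"])

print(first["City"], first["State Code"], total_population, median_age)
```

The same explicit casting would be needed on the Spark side before doing arithmetic on these columns.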

In [6]:
# Downsize datasets to 10k records during development
# This is also to avoid filling up the Udacity workspace disk space as parquet files are persisted and used during the tests.
demog_df = demog_df.limit(10000)
travel_df = travel_df.limit(10000)
temp_df = temp_df.limit(10000)

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps
Document steps necessary to clean the data

In [7]:
def debug_df(spark_df):
    """
    Show statistics on a dataframe.
    Do not run this function in production as it runs full scans on the dataset
    """
    print("Dataframe row count: {}".format(spark_df.count()))
    # show() and printSchema() print directly and return None,
    # so call them on their own instead of formatting their return value
    print("Dataframe first 2 rows:")
    spark_df.show(n=2)
    print("Dataframe schema:")
    spark_df.printSchema()

##### Travel / Immigration


In [8]:
def get_valid_codes(file, isnumeric):
    """
    Read the first ';'-separated column of a file and return its values as a list
    """
    valid_codes = []
    with open(file) as codes:
        for code in codes:
            col1 = code.split(';')[0].rstrip("\n")
            # Numeric codes are stored as floats to match the double columns of the I94 data
            valid_codes.append(float(col1) if isnumeric else col1)
    return valid_codes

In [9]:
@udf
def sasdate_to_datetime(sas_date):
    """
    Convert SAS Date to Spark Date
    """
    epoch = datetime.datetime(1960, 1, 1).date()
    date = epoch + datetime.timedelta(sas_date)
    return date.isoformat()

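@udf
def sanitize(label):
    """
    Utility function to normalise locations (countries or cities):
    upper-case the label and trim surrounding whitespace
    """
    # Guard against nulls, which would otherwise raise an AttributeError
    if label is None:
        return None
    return label.strip().upper()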

# Map each port code (first column) to its city name (second column)
cities = {}
with open("./i94codes/I94PORT.codes") as codes:
    for code in codes:
        data = code.split(';')
        col1 = data[0].rstrip()
        col2 = data[1].rstrip("\n").strip()
        cities[col1] = col2

@udf
def citycode_to_name(city_code):
    """
    Return city name from code
    """
    return cities[city_code]
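
The SAS date conversion can be checked outside Spark. The sketch below is a plain-Python version of the same arithmetic, assuming SAS dates count days from 1960-01-01 as the UDF above does; the function name `sas_to_iso` and the `None` guard are additions made for illustration. It is applied to the `arrdate` value of the sample I94 row, whose `dtadfile` field reads `20160422`.

```python
import datetime

SAS_EPOCH = datetime.date(1960, 1, 1)  # day 0 of the SAS date scale


def sas_to_iso(sas_date):
    """Convert a SAS day count (int or float) to an ISO date string."""
    if sas_date is None:
        return None  # keep missing values missing instead of raising
    return (SAS_EPOCH + datetime.timedelta(days=int(sas_date))).isoformat()


# arrdate of the sample I94 row shown in the "Format" block.
print(sas_to_iso(20566.0))
```

The printed date is 2016-04-22, which agrees with the `dtadfile` value of that row.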

In [10]:
debug_df(travel_df)

Dataframe row count: 10000
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+-------------+-----+--------+
|cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|       admnum|fltno|visatype|
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+-------------+-----+--------+
|  6.0|2016.0|   4.0| 692.0| 692.0|    XXX|20573.0|   null|   null|   null|  37.0|    2.0|  1.0|    null|    null| null|      T|   null|      U|   null| 1979.0|10282016|  null|  null|   null|1.897628485E9| null|      B2|
|  7.0|2016.0|   4.0| 254.0| 276.0|    ATL|20551.0|    1.0|     AL|   null|  25.0|    3.0

In [11]:
# The *.codes files are an extraction of the valid codes from I94_SAS_Labels_Descriptions.SAS
# This approach has been taken due to the short list of codes and to avoid relying on the I94_SAS_Labels_Descriptions.SAS formats
cit_res_codes = get_valid_codes("./i94codes/I94CIT-I94RES.codes", isnumeric=True)
port_codes = get_valid_codes("./i94codes/I94PORT.codes", isnumeric=False)
mode_codes = get_valid_codes("./i94codes/I94MODE.codes", isnumeric=True)
addr_codes = get_valid_codes("./i94codes/I94ADDR.codes", isnumeric=False)

travel_df = travel_df\
    .withColumn("i94port", sanitize(travel_df.i94port))\
    .filter(travel_df.i94cit.isin(cit_res_codes))\
    .filter(travel_df.i94res.isin(cit_res_codes))\
    .filter(travel_df.i94port.isin(port_codes))\
    .filter(travel_df.i94mode.isin(mode_codes))\
    .filter(travel_df.i94addr.isin(addr_codes))\
    .where(travel_df.arrdate.isNotNull())

travel_df = travel_df\
    .withColumn("city_name", citycode_to_name(travel_df.i94port))\
    .withColumn("date", sasdate_to_datetime(travel_df.arrdate))
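
The effect of the chained filters can be illustrated in plain Python. In this sketch the valid code sets are hypothetical stand-ins for the contents of the `*.codes` files, the `i94cit`/`i94res` checks are left out for brevity, and the two rows reuse values from the `debug_df` output above. A null code fails the membership test, which mirrors how Spark drops rows where `isin` evaluates to null.

```python
# Hypothetical stand-ins for the lists loaded from the *.codes files.
VALID_PORTS = {"ATL", "WAS"}
VALID_MODES = {1.0, 2.0, 3.0, 9.0}
VALID_ADDRS = {"AL", "MI"}


def is_valid(row):
    """Keep a row only when every coded field holds a known code and arrdate is set."""
    return (
        row["i94port"] in VALID_PORTS
        and row["i94mode"] in VALID_MODES
        and row["i94addr"] in VALID_ADDRS
        and row["arrdate"] is not None
    )


rows = [
    {"cicid": 6.0, "i94port": "XXX", "i94mode": None, "i94addr": None, "arrdate": 20573.0},
    {"cicid": 7.0, "i94port": "ATL", "i94mode": 1.0, "i94addr": "AL", "arrdate": 20551.0},
]

kept = [row["cicid"] for row in rows if is_valid(row)]
print(kept)
```

Under these assumed code sets only the second row survives, which is the kind of reduction seen in the row count after cleaning.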

In [12]:
debug_df(travel_df)

Dataframe row count: 9586
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+-------------+----------+
|cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|        admnum|fltno|visatype|    city_name|      date|
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+-------------+----------+
| 15.0|2016.0|   4.0| 101.0| 101.0|    WAS|20545.0|    1.0|     MI|20691.0|  55.0|    2.0|  1.0|20160401|    null| null|      T|      O|   null|      M| 1961.0|09302016|     M|  null|     OS|  6.66643185E8|   93|      B2|WASHINGTON D

##### Temperatures

In [13]:
debug_df(temp_df)

Dataframe row count: 10000
+----------+------------------+-----------------------------+-----+-------+--------+---------+
|        dt|AverageTemperature|AverageTemperatureUncertainty| City|Country|Latitude|Longitude|
+----------+------------------+-----------------------------+-----+-------+--------+---------+
|1743-11-01|             6.068|           1.7369999999999999|Århus|Denmark|  57.05N|   10.33E|
|1743-12-01|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
+----------+------------------+-----------------------------+-----+-------+--------+---------+
only showing top 2 rows

Dataframe first 2 rows: None
root
 |-- dt: string (nullable = true)
 |-- AverageTemperature: string (nullable = true)
 |-- AverageTemperatureUncertainty: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)

Dataframe schema: None


In [14]:
temp_df = temp_df\
    .dropDuplicates(['dt', 'Latitude', 'Longitude'])\
    .dropDuplicates(['dt', 'City', 'Country'])\
    .where(temp_df.AverageTemperature.isNotNull())\
    .withColumn("City", sanitize(temp_df['City']))\
    .withColumn("Country", sanitize(temp_df['Country']))\
    .withColumn("month", month(temp_df['dt']))\
    .withColumn("year", year(temp_df['dt']))
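
As a rough illustration of these cleaning steps, the sketch below applies the same ideas (deduplicate, drop missing readings, normalise names, derive `month` and `year`) to a small in-memory list. The first two rows come from the `debug_df` output above; the third is a hypothetical duplicate added to exercise the deduplication. The function name `clean` and the output keys are assumptions, and the order of operations is simplified compared with the Spark version.

```python
raw = [
    {"dt": "1743-11-01", "AverageTemperature": "6.068", "City": "Århus", "Country": "Denmark"},
    {"dt": "1743-12-01", "AverageTemperature": None, "City": "Århus", "Country": "Denmark"},
    # Hypothetical duplicate of the first reading, added to show deduplication.
    {"dt": "1743-11-01", "AverageTemperature": "6.068", "City": "Århus", "Country": "Denmark"},
]


def clean(rows):
    """Drop duplicates and missing readings, normalise names, derive month/year."""
    seen = set()
    cleaned = []
    for row in rows:
        key = (row["dt"], row["City"], row["Country"])
        if key in seen or row["AverageTemperature"] is None:
            continue
        seen.add(key)
        year, month, _day = (int(part) for part in row["dt"].split("-"))
        cleaned.append({
            "dt": row["dt"],
            # The CSV reader yields strings, so cast the reading explicitly.
            "avg_temperature": float(row["AverageTemperature"]),
            "city": row["City"].strip().upper(),
            "country": row["Country"].strip().upper(),
            "month": month,
            "year": year,
        })
    return cleaned


result = clean(raw)
print(result)
```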

In [15]:
debug_df(temp_df)

Dataframe row count: 9848
+----------+------------------+-----------------------------+-----+-------+--------+---------+-----+----+
|        dt|AverageTemperature|AverageTemperatureUncertainty| City|Country|Latitude|Longitude|month|year|
+----------+------------------+-----------------------------+-----+-------+--------+---------+-----+----+
|1743-11-01|             6.068|           1.7369999999999999|ÅRHUS|DENMARK|  57.05N|   10.33E|   11|1743|
|1743-11-01|            10.013|                        2.291|ÇORLU| TURKEY|  40.99N|   27.69E|   11|1743|
+----------+------------------+-----------------------------+-----+-------+--------+---------+-----+----+
only showing top 2 rows

Dataframe first 2 rows: None
root
 |-- dt: string (nullable = true)
 |-- AverageTemperature: string (nullable = true)
 |-- AverageTemperatureUncertainty: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: str

##### Demographics

In [16]:
debug_df(demog_df)

Dataframe row count: 2891
+-------------+-------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+------------------+-----+
|         City|        State|Median Age|Male Population|Female Population|Total Population|Number of Veterans|Foreign-born|Average Household Size|State Code|              Race|Count|
+-------------+-------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+------------------+-----+
|Silver Spring|     Maryland|      33.8|          40601|            41862|           82463|              1562|       30908|                   2.6|        MD|Hispanic or Latino|25924|
|       Quincy|Massachusetts|      41.0|          44129|            49500|           93629|              4147|       32935|                  2.39|        MA|             White|58723|
+-------------+-------------+----------+---------------+---

In [17]:
demog_df = demog_df\
    .dropDuplicates(['City', 'State Code'])\
    .withColumn('City', sanitize(demog_df.City))

In [18]:
debug_df(demog_df)

Dataframe row count: 596
+-------+-----+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+-----+
|   City|State|Median Age|Male Population|Female Population|Total Population|Number of Veterans|Foreign-born|Average Household Size|State Code|                Race|Count|
+-------+-----+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+-----+
|ABILENE|Texas|      31.3|          65212|            60664|          125876|              9367|        8129|                  2.64|        TX|American Indian a...| 1813|
|  AKRON| Ohio|      38.1|          96886|           100667|          197553|             12878|       10024|                  2.24|        OH|Black or African-...|66551|
+-------+-----+----------+---------------+-----------------+----------------+------------------+------------+-----------

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model

The model will use a star schema. Aggregating or partitioning by city is already almost possible across the staging frames/tables created previously. Time is also a good candidate for a dimension, to look at trends over time (e.g. temperature vs. travel dates).

A star schema was chosen over a snowflake schema. Both are commonly used models for data warehousing, but the star model:
* decreases the chance of data integrity problems and is simpler to maintain
* reduces query complexity and query response time (fewer JOINs)

We will not include any information about traveler origin to avoid any sort of bias on origins or genders.

The star schema includes the following tables:

* `location_dim` table: `city_name`, `state`, `country_name`, `longitude`, `latitude`
* `time_dim`: `date`, `day`, `weekday`, `week`, `month`, `year`
* `temperature_dim`: `city_name`, `avg_temperature`, `month`, `year`
* `traveler_fact`: `id`, `arrival_date`, `departure_date`, `visa_type`, `month`, `year`
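
To make the intended use of the star schema more concrete, here is a minimal sketch that answers "number of visitors per month in a given city, alongside the average temperature" with a single join. It uses an in-memory SQLite database rather than Spark, assumes the fact table carries a `city_name` column (as the Step 4 query produces), and all inserted rows are hypothetical values invented to make the query runnable.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE traveler_fact (id INTEGER, visa_type TEXT, month INTEGER, year INTEGER, city_name TEXT);
CREATE TABLE temperature_dim (city_name TEXT, avg_temperature REAL, month INTEGER, year INTEGER);
""")

# Hypothetical rows, invented only to make the query runnable.
conn.executemany(
    "INSERT INTO traveler_fact VALUES (?, ?, ?, ?, ?)",
    [(1, "B2", 4, 2016, "ATLANTA"), (2, "WT", 4, 2016, "ATLANTA"), (3, "B2", 4, 2016, "HONOLULU")],
)
conn.executemany(
    "INSERT INTO temperature_dim VALUES (?, ?, ?, ?)",
    [("ATLANTA", 17.5, 4, 2016), ("HONOLULU", 24.0, 4, 2016)],
)

# One join between the fact table and a dimension is enough for this question.
visitors = conn.execute("""
SELECT f.city_name, f.year, f.month, COUNT(*) AS visitors, t.avg_temperature
FROM traveler_fact f
JOIN temperature_dim t
  ON t.city_name = f.city_name AND t.month = f.month AND t.year = f.year
GROUP BY f.city_name, f.year, f.month, t.avg_temperature
ORDER BY visitors DESC, f.city_name
""").fetchall()

print(visitors)
```

The same query shape should carry over to Spark SQL once the Parquet tables are registered as views.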



#### 3.2 Mapping Out Data Pipelines

The data pipeline will have the following steps listed in order:

1. Clean data
2. Store data into staging tables (schema-on-read with Spark)
3. Transform the 3NF staging tables into a star schema with Spark SQL
4. Add a simple data quality check that guarantees the presence of rows in the target tables after transformation
5. Partition data per city, month, year
6. Save data tables into Parquet files that can be re-used for a future analysis.

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model with the use of temporary views created from Spark dataframes.

In [19]:
temp_staging = "temp_staging"
temp_df.createOrReplaceTempView(temp_staging)

demog_staging = "demog_staging"
demog_df.createOrReplaceTempView(demog_staging)

travel_staging = "travel_staging"
travel_df.createOrReplaceTempView(travel_staging)

In [20]:
temp_df.printSchema()
demog_df.printSchema()
travel_df.printSchema()

root
 |-- dt: string (nullable = true)
 |-- AverageTemperature: string (nullable = true)
 |-- AverageTemperatureUncertainty: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Median Age: string (nullable = true)
 |-- Male Population: string (nullable = true)
 |-- Female Population: string (nullable = true)
 |-- Total Population: string (nullable = true)
 |-- Number of Veterans: string (nullable = true)
 |-- Foreign-born: string (nullable = true)
 |-- Average Household Size: string (nullable = true)
 |-- State Code: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Count: string (nullable = true)

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon

In [21]:
spark.sql("""
SELECT DISTINCT
    cicid as id, 
    visatype as visa_type, 
    i94mon as month, 
    i94yr as year, 
    city_name  -- derived from i94port during cleaning; i94cit is a country-of-citizenship code
FROM {}
"""\
    .format(travel_staging))\
    .write\
    .partitionBy("month", "year")\
    .parquet("dim_fact/traveler_fact.parquet", mode="Overwrite")

In [22]:
spark.sql("""
SELECT DISTINCT
   city as city_name,
   AverageTemperature as avg_temperature, 
   month, 
   year
FROM {}
"""\
    .format(temp_staging))\
    .write\
    .partitionBy("month", "year")\
    .parquet("dim_fact/temperature_dim.parquet", mode="Overwrite")

In [27]:
spark.sql("""
SELECT DISTINCT
    demographics.city as city_name,
    country as country_name,
    state as state_name,
    longitude, 
    latitude
FROM {} demographics
JOIN {} temperatures ON
    demographics.city = temperatures.city
"""\
    .format(demog_staging, temp_staging))\
    .write\
    .partitionBy("state_name", "country_name")\
    .parquet("dim_fact/location_dim.parquet", mode="Overwrite")

In [24]:
spark.sql("""
SELECT DISTINCT
    date,
    dayofmonth(date) as day,
    dayofweek(date) as weekday,
    weekofyear(date) as week,
    month(date) as month,
    year(date) as year
FROM {}
"""\
    .format(travel_staging))\
    .write\
    .partitionBy("month", "year")\
    .parquet("dim_fact/time_dim.parquet", mode="Overwrite")
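
The columns of `time_dim` can be reproduced for a single date in plain Python, which is handy for sanity-checking the Spark output. This is a sketch only; `time_dim_row` is a hypothetical helper. Note that Spark's `dayofweek` numbers Sunday as 1 and Saturday as 7, whereas Python's `isoweekday()` numbers Monday as 1, so the value is remapped.

```python
import datetime


def time_dim_row(iso_date):
    """Build one time_dim record from an ISO date string."""
    d = datetime.date.fromisoformat(iso_date)
    return {
        "date": iso_date,
        "day": d.day,
        # Remap Monday=1..Sunday=7 to Spark's Sunday=1..Saturday=7.
        "weekday": d.isoweekday() % 7 + 1,
        # ISO 8601 week number, the convention Spark's weekofyear follows.
        "week": d.isocalendar()[1],
        "month": d.month,
        "year": d.year,
    }


row = time_dim_row("2016-04-22")
print(row)
```

For 2016-04-22, a Friday, this gives `weekday` 6 and `week` 16.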

#### 4.2 Data Quality Checks

In [28]:
def it_should_have_records(parquet_file):
    """
    Verify that the number of rows in the table is greater than 0
    """
    # Count once: each count() call triggers a scan of the parquet data
    record_count = spark.read.parquet(parquet_file).count()
    if record_count <= 0:
        raise ValueError("The number of records in {} must be greater than 0".format(parquet_file))
    print("Test passed: {} records found for {}".format(record_count, parquet_file))

it_should_have_records("dim_fact/time_dim.parquet")
it_should_have_records("dim_fact/temperature_dim.parquet")
it_should_have_records("dim_fact/traveler_fact.parquet")

Test passed: 1 records found for dim_fact/time_dim.parquet
Test passed: 9848 records found for dim_fact/temperature_dim.parquet
Test passed: 9586 records found for dim_fact/traveler_fact.parquet
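
A row-count check could be complemented with key-level checks. The following is a minimal sketch of that idea on plain Python lists rather than Parquet files; the three helper functions and the sample rows are hypothetical and are not part of the pipeline above.

```python
def check_not_empty(rows, name):
    """Fail when a table has no rows (same idea as the row-count check)."""
    if not rows:
        raise ValueError("{} must contain at least one record".format(name))


def check_no_nulls(rows, column, name):
    """Fail when any row is missing a value in the given column."""
    missing = sum(1 for row in rows if row.get(column) is None)
    if missing:
        raise ValueError("{}.{} has {} missing values".format(name, column, missing))


def check_unique(rows, column, name):
    """Fail when a value appears more than once in the given column."""
    values = [row[column] for row in rows]
    if len(values) != len(set(values)):
        raise ValueError("{}.{} contains duplicates".format(name, column))


# Hypothetical rows standing in for traveler_fact.
traveler_fact = [
    {"id": 15.0, "visa_type": "B2", "month": 4, "year": 2016},
    {"id": 16.0, "visa_type": "WT", "month": 4, "year": 2016},
]

check_not_empty(traveler_fact, "traveler_fact")
check_no_nulls(traveler_fact, "visa_type", "traveler_fact")
check_unique(traveler_fact, "id", "traveler_fact")
print("All checks passed")
```

Each helper raises `ValueError` on failure, so a scheduler or test runner would mark the run as failed without extra handling.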


#### 4.3 Data dictionary 

Brief description of what the data is and where it came from:

`location_dim` table: 
* `city_name`: City name
* `state`: State
* `country_name`: Country name
* `longitude`: geo-localisation info.
* `latitude`: geo-localisation info.

`time_dim` table: 
* `date`: ISO date
* `day`: day
* `weekday`: day of the week
* `week`: week of the year
* `month`: month
* `year`: year

`temperature_dim` table: 
* `city_name`: City name
* `avg_temperature`: average temperature for the month
* `month`: month of the year
* `year`: year

`traveler_fact` table: 
* `id`: traveler unique id
* `arrival_date`: traveler arrival date
* `visa_type`: traveler visa type (e.g. B2)
* `month`: visa delivery month
* `year`: visa delivery year


### Step 5: Complete Project Write Up
* Rationale for the choice of tools and technologies for the project:

Spark was chosen to enable MPP (Massively Parallel Processing) due to the size of the dataset.

Spark SQL simplifies reading values from different file formats (SAS, CSV, JSON) and provides schema-on-read, which reduces the time spent modelling tables.

UDFs (user-defined functions) were also used in Spark to process existing or new columns that needed particular processing (e.g. date conversions).

The data should be updated on a regular basis. The current project does not integrate with AWS S3, but this change would be fairly seamless and would ensure that the pipeline can get data from different sources that are updated by upstream producers. How frequently the data needs updating depends on when consumers need it. Depending on the frequency, the pipeline could use a time-based approach in Apache Airflow to split the work into small tasks within DAGs that can then be executed quickly.

Changes in frequency could affect the resources (memory, disk) of AWS EMR or Airflow workers, so these requirements will need to be continuously monitored.

 * If the data was increased by 100x, it is recommended to monitor and re-calibrate the current resources (memory, disk, CPU) to accommodate the increased volume, whether on Spark nodes or Airflow workers. CPU will likely be the most impacted metric, and data distribution (related to partitioning and replication) will need to be analysed, with a likely increase in the number of use cases. It is important to measure and observe these changes because relying on managed cloud services does not solve all these challenges if configuration and data processing performance are not aligned with usage. See e.g. [Optimize Amazon S3 for High Concurrency in Distributed Workloads](https://aws.amazon.com/blogs/big-data/optimizing-amazon-s3-for-high-concurrency-in-distributed-workloads/)
 
 * If the data populates a dashboard that must be updated by 7am every day, Airflow schedules and automated alerts (e.g. Slack) would help to quickly narrow down the errors preventing data processing.
 
 * If the database needed to be accessed by 100+ people, a data warehouse such as Amazon Redshift or Google BigQuery should be considered, as these systems are designed to optimise data storage and read access for a large number of concurrent users.