# Data Engineering Capstone Project

#### Project Summary
Our objective is to gain insight into tourism by finding out how many people travel to each destination (state) and what the weather conditions (average temperature) are like there at that time, so that tourism operators can adjust their offerings accordingly.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# install pandas package, required for .toPandas() functionality
sc.install_pypi_package("pandas==0.25.1") 

SparkSession available as 'spark'.


Collecting pandas==0.25.1
  Using cached https://files.pythonhosted.org/packages/7e/ab/ea76361f9d3e732e114adcd801d2820d5319c23d0ac5482fa3b412db217e/pandas-0.25.1-cp37-cp37m-manylinux1_x86_64.whl
Collecting python-dateutil>=2.6.1 (from pandas==0.25.1)
  Using cached https://files.pythonhosted.org/packages/36/7a/87837f39d0296e723bb9b62bbb257d0355c7f6128853c78955f57342a56d/python_dateutil-2.8.2-py2.py3-none-any.whl
Installing collected packages: python-dateutil, pandas
Successfully installed pandas-0.25.1 python-dateutil-2.8.2

In [2]:
# note: importing abs from pyspark.sql.functions shadows Python's built-in abs() in this notebook
from pyspark.sql.functions import udf, to_date, col, month, year, dayofmonth, split, format_string, abs, isnan, when, count, substring, length, regexp_extract, monotonically_increasing_id
from datetime import datetime
from datetime import timedelta

import pyspark.sql.types as t
# import pandas as pd
import numpy as np
import os
import logging
import configparser

In [3]:
# settings: the name of the S3 bucket that data is read from and written to
# config = configparser.ConfigParser()
# config.read('settings.cfg')

# ensure that the environment variables are set before the spark session is started, otherwise s3 cannot be accessed
# os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['AWS_ACCESS_KEY_ID']
# os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['AWS_SECRET_ACCESS_KEY']
# s3_bucket=config['AWS']['AWS_S3_BUCKET_LOC']
s3_bucket='jjudacitydatalake'

### Step 1: Scope the Project and Gather Data

#### Scope 
We want to gain insight into tourism in the United States: how many people are traveling to each destination, and what are the weather conditions like at the arrival location? The goal is for tourism operators to gain insight into why people travel there so they can adjust their tourism offerings.

We use the immigration dataset provided by Udacity (originally from the US National Tourism and Trade Office) which can be found here: https://travel.trade.gov/research/reports/i94/historical/2016.html. 

Our goal is to run a one-time analysis on the full dataset (i.e. the whole year). We want to match the immigration data to historical temperatures.
One particular example is to count how many people are arriving in a certain state per time period (year, month) and what the expected historical average temperature is like at that period in time.

The final data model provides information on immigration data as well as temperature data. Each record of the immigration data contains information about the specifics of an arrival into the US (such as airline, port of arrival, state) as well as some specifics on the person arriving (such as gender, birth year). 
Temperature data is originally provided on a City/Country basis, which makes it hard to join with the immigration data. The geographic information contained in the immigration data is the state, so our ETL uses coordinate data to link City/Country information to a state (with the airport code table as an intermediate step).
An example analysis of how a user may use the data model to gain insight is given below in the notebook. See also the table design below in the notebook for information on how users might link the available information together.

We'll store the immigration data and the temperature data on S3 with a suitable partitioning for efficient processing. The processing itself will be done via Apache Spark on an EMR cluster. Results will be written back to S3.

Users will be able to read in the processed parquet files on S3 from the /capstone/processed/ folders. With a tool like Power BI, they will be able to read in the star schema and perform their analysis.
One example of an analysis is shown in this notebook (see below).

#### Describe and Gather Data 
##### Immigration data
Describe the data sets you're using. Where did it come from? What type of information is included? 

We use the immigration dataset provided by Udacity (originally from the US National Tourism and Trade Office), which can be found here: https://travel.trade.gov/research/reports/i94/historical/2016.html. The US National Tourism and Trade Office provides this dataset to the public so that third parties can gain useful insights. Each entry in the dataset is an arrival into the USA, with additional data such as gender, date of arrival, airline and purpose of visit (we filter on tourism/pleasure).

The immigration data was provided on a separate disk in the Udacity workspace. I read this data in with pandas and rewrote it, without modifications, as parquet files in the capstone/staging/i94_parquet folder of my S3 bucket.

##### Temperature data
This dataset was provided by Udacity and is originally sourced from Kaggle. More information can be found here: https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data.




In [4]:
def readMultipleParquet(listPaths):
    """
    Given a list of paths, return Spark Dataframe for processing
    """
    sc.setJobGroup("Read", "Reading multiple parquet files")
    return spark.read.parquet(*listPaths)


In [5]:
def sasDateToDatetime(sasdate):
    """
    Given a SAS date value (the number of days since 1960-01-01), return the corresponding date, or None for nulls
    """
    return None if sasdate is None else (datetime(1960, 1, 1) + timedelta(days=sasdate)).date()

valid_us_states = ["AL", "AK", "AZ", "AR", "CA", "CO", "CT", "DC", "DE", "FL", "GA", 
          "HI", "ID", "IL", "IN", "IA", "KS", "KY", "LA", "ME", "MD", 
          "MA", "MI", "MN", "MS", "MO", "MT", "NE", "NV", "NH", "NJ", 
          "NM", "NY", "NC", "ND", "OH", "OK", "OR", "PA", "RI", "SC", 
          "SD", "TN", "TX", "UT", "VT", "VA", "WA", "WV", "WI", "WY"]

sasdate_udf = udf(sasDateToDatetime, t.DateType())

def read_immigration_staging(listPaths):
    """
    Given a list of paths to the SAS immigration data stored in parquet format, read and transform the immigration staging data and return the raw and transformed DataFrames.
    """
    
    sc.setJobGroup("Read", "Read raw immigration staging data")

    raw_data = readMultipleParquet(listPaths)

    print(f"Number of raw rows read: {raw_data.count()}")
    
    sc.setJobGroup("Read", "Read and transform immigration staging data")
    
    final_data = raw_data.\
        withColumn('arrdate_dt', sasdate_udf('arrdate')).\
        withColumn('depdate_dt', sasdate_udf('depdate')).\
        withColumn('arrdate_dayofmonth', dayofmonth(col('arrdate_dt'))).\
        withColumn('arrdate_month', month(col('arrdate_dt'))).\
        withColumn('arrdate_year', year(col('arrdate_dt'))).\
        withColumn('state', when(~col('i94addr').isin(valid_us_states), 'other').otherwise(col('i94addr'))).\
        fillna('other', subset='state').\
        fillna('unknown', subset='gender').\
        dropDuplicates().\
        select('i94port', 'biryear', 'gender', 'airline', 'i94visa', 'arrdate_dt', 'depdate_dt', 'arrdate_dayofmonth', 'arrdate_month', 'arrdate_year', 'state').\
        filter(col('i94visa') == 2).\
        withColumn('id_imm', monotonically_increasing_id())

    print(f"Number of rows in final selected dataset: {final_data.count()}")
    
    return raw_data, final_data

immigration_raw_data, immigration_final_data = read_immigration_staging([f's3://{s3_bucket}/capstone/staging/i94_parquet/i94_apr16_sub.sas7bdat', f's3://{s3_bucket}/capstone/staging/i94_parquet/i94_may16_sub.sas7bdat'])



Number of raw rows read: 6540562
Number of rows in final selected dataset: 5388905

## Examine the raw immigration data. 

In [6]:
immigration_raw_data.limit(10).show(truncate=False)

immigration_raw_data.printSchema()

+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+---------------+-----+--------+
|cicid    |i94yr |i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear|dtaddto |gender|insnum|airline|admnum         |fltno|visatype|
+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+---------------+-----+--------+
|1689141.0|2016.0|5.0   |135.0 |135.0 |LVG    |20583.0|1.0    |NE     |20590.0|27.0  |2.0    |1.0  |20160509|null    |null |G      |O      |null   |M      |1989.0 |08062016|M     |null  |VS     |6.0079838633E10|00043|WT      |
|1689142.0|2016.0|5.0   |135.0 |135.0 |LVG    |20583.0|1.0    |NE     |20590.0|26.0  |2.0   

# Dataset enhancement

### This is part of Step 2: Explore and Assess the Data
Identify data quality issues, like missing values, duplicate data, etc. & Document steps necessary to clean the data.
These steps are performed in the function read_immigration_staging.

## Tourism filtering
We are interested in tourism, and according to the data description this means filtering on i94visa = 2 (Pleasure).

<code>
/* I94VISA - Visa codes collapsed into three categories:
   1 = Business
   2 = Pleasure
   3 = Student
*/
</code>

## Adding useful date fields
The arrdate and depdate fields are stored as doubles that represent the number of days since 1 January 1960 (the SAS date epoch).
We add the columns arrdate_dt and depdate_dt, which parse them into proper dates, as well as day-of-month, month and year columns extracted from arrdate_dt.
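As a worked example of the SAS date conversion, the minimal pure-Python sketch below mirrors what sasdate_udf does, using the arrdate and depdate values of the first raw row shown earlier (20583.0 and 20590.0). The helper name sas_to_date is hypothetical; the result for 20583.0 agrees with that row's dtadfile value of 20160509.

```python
from datetime import date, timedelta

# SAS stores dates as the number of days since 1960-01-01 (the SAS epoch)
SAS_EPOCH = date(1960, 1, 1)

def sas_to_date(sas_days):
    """Convert a SAS day count (int or float such as 20583.0) to a date; None stays None."""
    if sas_days is None:
        return None
    return SAS_EPOCH + timedelta(days=sas_days)

print(sas_to_date(20583.0))  # 2016-05-09 (arrdate of the first raw row)
print(sas_to_date(20590.0))  # 2016-05-16 (depdate of the same row)
```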

## Fixing US states
The US contains 50 states; together with the District of Columbia (DC), our valid_us_states list holds 51 codes. The field i94addr can contain invalid data (i.e. entries outside this list). If this happens we replace the value with 'other'.
Null values are also replaced with 'other'.
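The state clean-up can be sketched in plain Python as below (normalize_state is a hypothetical helper, not part of the notebook). In Spark, `~col('i94addr').isin(valid_us_states)` evaluates to null rather than True for a null i94addr, so `when()` falls through to `otherwise()` and keeps the null; the subsequent `fillna('other', subset='state')` is what turns it into 'other'. The sketch folds both steps into one function.

```python
# Same codes as valid_us_states in the notebook: the 50 states plus DC
VALID_US_STATES = {
    "AL", "AK", "AZ", "AR", "CA", "CO", "CT", "DC", "DE", "FL", "GA",
    "HI", "ID", "IL", "IN", "IA", "KS", "KY", "LA", "ME", "MD",
    "MA", "MI", "MN", "MS", "MO", "MT", "NE", "NV", "NH", "NJ",
    "NM", "NY", "NC", "ND", "OH", "OK", "OR", "PA", "RI", "SC",
    "SD", "TN", "TX", "UT", "VT", "VA", "WA", "WV", "WI", "WY",
}

def normalize_state(i94addr):
    """Map nulls and codes outside VALID_US_STATES to 'other'; keep valid codes."""
    if i94addr is None or i94addr not in VALID_US_STATES:
        return "other"
    return i94addr

print([normalize_state(v) for v in ["NY", "XX", None]])
```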

## Dropping duplicates
We drop duplicate rows in the immigration dataset. Each row ought to be unique.

## Gender
Sometimes gender is null. As there's no way of enriching the data, we substitute null values with 'unknown'.

## Examine result after data enhancement

In [7]:
immigration_final_data.limit(10).toPandas()

  i94port  biryear   gender airline  ...  arrdate_month arrdate_year state  id_imm
0     NYC   1975.0        M      AM  ...              5         2016    NY       0
1     NYC   1950.0        M      FI  ...              5         2016    NY       1
2     NYC   1984.0        M      VS  ...              5         2016    NY       2
3     NYC   1949.0        M      BA  ...              5         2016    NY       3
4     ORL   1969.0        F      VS  ...              5         2016    FL       4
5     ORL   1993.0  unknown      BA  ...              5         2016    FL       5
6     MIA   1987.0        M      BA  ...              5         2016    FL       6
7     MIA   1947.0        F      BA  ...              5         2016    FL       7
8     TAM   1975.0  unknown      BA  ...              5         2016    FL       8
9     MIA   2001.0        F      AB  ...              5         2016    FL       9

[10 rows x 12 columns]

In [8]:
immigration_final_data.limit(10).show(truncate=False)

immigration_final_data.printSchema()


+-------+-------+-------+-------+-------+----------+----------+------------------+-------------+------------+-----+----------+
|i94port|biryear|gender |airline|i94visa|arrdate_dt|depdate_dt|arrdate_dayofmonth|arrdate_month|arrdate_year|state|id_imm    |
+-------+-------+-------+-------+-------+----------+----------+------------------+-------------+------------+-----+----------+
|LVG    |1951.0 |M      |VS     |2.0    |2016-05-09|2016-05-22|9                 |5            |2016        |NV   |8589934592|
|ORL    |1994.0 |unknown|BA     |2.0    |2016-05-09|2016-05-23|9                 |5            |2016        |FL   |8589934593|
|MIA    |1994.0 |M      |BA     |2.0    |2016-05-09|2016-05-14|9                 |5            |2016        |FL   |8589934594|
|SFR    |1986.0 |unknown|BA     |2.0    |2016-05-09|2016-05-31|9                 |5            |2016        |CA   |8589934595|
|PIT    |1992.0 |M      |ZX     |2.0    |2016-05-09|2016-05-27|9                 |5            |2016        |PA

# Database normalization
Above is our selection of useful columns. We picked these because they contain useful information about the people arriving as well as the immigration details: port of arrival (i94port), visa type (i94visa) and airline.

In order to properly store this information we apply the database normalization technique covered earlier in order to come up with a robust database design. We do this to minimize duplicate data, to minimize data modification issues and to simplify queries.

The table above contains a lot of information: port of arrival, birth year and gender of the person entering, the airline the person arrived with, visa type (tourism in this case), arrival and departure dates, the day of month, month and year of the arrival date, the destination state and the unique id id_imm.
It covers several topics: ports of arrival, the person entering, dates, airlines and states.

Later on we split this table into separate tables that match these topics.

<img src="https://i.imgur.com/jHBgMur.png">


# Read in immigration data

Explore data skewness and find partition/clustering keys in order to parallelize processing.

I have a gut feeling that the number of people arriving each month and day of month ought to be relatively constant. Let's validate this assumption:

In [9]:
immigration_final_data.createOrReplaceTempView("immdata")
dataSkewQuery = spark.sql("""
select arrdate_month as month, arrdate_dayofmonth as day, count(*) as immnum
from immdata
group by month, day
order by month asc, day asc
""")
dataSkewQuery.toPandas()

    month  day  immnum
0       4    1   95254
1       4    2   83883
2       4    3   66613
3       4    4   69904
4       4    5   74432
..    ...  ...     ...
56      5   27  107975
57      5   28  104771
58      5   29   75577
59      5   30   64529
60      5   31   69553

[61 rows x 3 columns]

## Examine seasonality in the data: how many visitors arrive in the US each month?

In [10]:
immigration_final_data.createOrReplaceTempView("immdata")
seasonality_table = spark.sql("""
select arrdate_month as month, count(*) as immnum
from immdata
group by month
order by month asc
""")
seasonality_table.toPandas()

   month   immnum
0      4  2530868
1      5  2858037
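The monthly totals double as a quick consistency check: if every row has a non-null arrival month, they should add up to the "Number of rows in final selected dataset" printed earlier (5388905). The sketch below performs that check with the numbers from the outputs above and computes the month-over-month change.

```python
# Monthly totals from the seasonality table above
monthly_totals = {4: 2530868, 5: 2858037}
final_row_count = 5388905  # "Number of rows in final selected dataset"

# The per-month counts should account for every row in the final dataset
assert sum(monthly_totals.values()) == final_row_count

growth = monthly_totals[5] / monthly_totals[4] - 1
print(f"May vs April: {growth:+.1%}")
```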

# Read in temperature data

In [6]:
def read_temperature_staging(listPaths):
    """
    Given a list of paths to csv files, read in the temperature data, transform it and return the raw and transformed DataFrames.
    """

    sc.setJobGroup("Read", "Read raw temperature staging data")
    # DataFrameReader.csv() accepts a single path or a list of paths; its second positional parameter is the schema
    raw_data = spark.read.option("header", "true").csv(listPaths)

    sc.setJobGroup("Read", "Read and transform temperature staging data")
    # Latitude/Longitude are strings with a hemisphere letter (e.g. '57.05N'): extract the numeric part
    # and round to whole degrees so it can be matched against the absolute airport coordinates
    final_data = raw_data.\
        filter(col('Country') == 'United States').\
        select(to_date(col("dt"), "yyyy-MM-dd").alias("dt"), 'AverageTemperature', 'City', 'Country', 'Latitude', 'Longitude').\
        withColumn('dayofmonth', dayofmonth(col('dt'))).\
        withColumn('month', month(col('dt'))).\
        withColumn('year', year(col('dt'))).\
        withColumn("latitude_rounded", format_string("%.0f", regexp_extract(col('Latitude'), r'\d+\.\d+', 0).cast(t.DoubleType()))).\
        withColumn("longitude_rounded", format_string("%.0f", regexp_extract(col('Longitude'), r'\d+\.\d+', 0).cast(t.DoubleType()))).\
        dropna()
    
    return raw_data, final_data
#     raw_data.limit(10).show(truncate=False)

temperature_raw_data, temperature_final_data = read_temperature_staging([f's3://{s3_bucket}/capstone/staging/temperature_data/GlobalLandTemperaturesByCity.csv'])

In [7]:

temperature_final_data.createOrReplaceTempView("tempdata_coord")
temp_table = spark.sql("""
select dayofmonth, month, latitude_rounded as lat, longitude_rounded as long, avg(AverageTemperature) as AvgTemp
from tempdata_coord
group by lat, long, month, dayofmonth
order by lat asc, long asc, month asc, dayofmonth asc
""")

# temp_table = temp_table.withColumn("id_temp_coord", monotonically_increasing_id())
temp_table.show(truncate=False)

+----------+-----+---+----+------------------+
|dayofmonth|month|lat|long|AvgTemp           |
+----------+-----+---+----+------------------+
|1         |1    |27 |81  |17.598033333333337|
|1         |2    |27 |81  |18.65918828451882 |
|1         |3    |27 |81  |20.510491666666674|
|1         |4    |27 |81  |22.596529166666656|
|1         |5    |27 |81  |25.009309623430973|
|1         |6    |27 |81  |27.00768049792531 |
|1         |7    |27 |81  |27.85293775933608 |
|1         |8    |27 |81  |27.92431535269709 |
|1         |9    |27 |81  |26.861252100840332|
|1         |10   |27 |81  |23.967126582278475|
|1         |11   |27 |81  |20.727659663865527|
|1         |12   |27 |81  |18.027008403361332|
|1         |1    |27 |82  |18.587668103448276|
|1         |2    |27 |82  |18.725623376623375|
|1         |3    |27 |82  |19.821219827586205|
|1         |4    |27 |82  |21.626489270386273|
|1         |5    |27 |82  |24.153780172413803|
|1         |6    |27 |82  |26.069188841201726|
|1         |7

# Read in airport code data

# Mapping coordinates to states
Rounding the lat/long coordinates to 0 decimals (whole degrees) introduces some noise: a single rounded lat/long pair can contain airports in several states, which is undesirable.

Below we count the number of airports per state within each lat/long pair and select the state with the highest count as the most representative state for that coordinate.
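The majority vote can be sketched in plain Python as follows. The function most_common_state and the sample airports are hypothetical; coordinates are strings because `format_string` produces strings. One difference from the Spark version is worth noting: the SQL max()-plus-join approach keeps every state that ties for the highest count, whereas this sketch keeps only the first state encountered (`Counter.most_common` orders equal counts by first occurrence).

```python
from collections import Counter, defaultdict

def most_common_state(airports):
    """
    airports: iterable of (lat_rounded, long_rounded, state) tuples.
    Returns {(lat, long): state}, keeping for each rounded coordinate the state
    with the most airports; ties go to the state encountered first.
    """
    counts = defaultdict(Counter)
    for lat, long, state in airports:
        counts[(lat, long)][state] += 1
    return {coord: c.most_common(1)[0][0] for coord, c in counts.items()}

# Hypothetical sample: one cell shared by NY and NJ airports, one CA-only cell
sample = [("41", "74", "NY"), ("41", "74", "NJ"), ("41", "74", "NJ"), ("34", "118", "CA")]
print(most_common_state(sample))
```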

In [8]:
def read_airport_codes_staging(listPaths):
    """
    Given a list of strings to airport code csv files, return the raw and final dataframes of airport codes.
    """
    
    sc.setJobGroup("Read", "Read raw airport code staging data")
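    # DataFrameReader.csv() accepts a single path or a list of paths; its second positional parameter is the schema
    raw_data = spark.read.option("header", "true").csv(listPaths)
    
    # coordinates are specified as "longitude, latitude"; abs() below drops the sign so that the values
    # line up with the temperature coordinates, whose hemisphere letter (N/S/E/W) is stripped
    coordinates_split = split(raw_data['coordinates'], ',')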
    region_split = split(raw_data['iso_region'], '-')

    sc.setJobGroup("Read", "Read and transform airport code staging data")
    final_data = raw_data.\
                        filter(col('iso_country') == 'US').\
                        withColumn("latitude", format_string("%.0f", abs(coordinates_split.getItem(1).cast(t.DoubleType())))).\
                        withColumn("longitude", format_string("%.0f", abs(coordinates_split.getItem(0).cast(t.DoubleType())))).\
                        withColumn("state", region_split.getItem(1)).\
                        withColumn('state', when(~col('state').isin(valid_us_states), 'other').otherwise(col('state'))).\
                        fillna('other', subset='state')
    
    return raw_data, final_data

airport_codes_raw_data, airport_codes_final_data = read_airport_codes_staging([f's3://{s3_bucket}/capstone/staging/airportcodes_data/airport-codes_csv.csv'])

In [9]:
def create_temperature_table():
    """
    Using the global DataFrames airport_codes_final_data and temperature_final_data, join them on their rounded coordinates and return a
    DataFrame with the columns day of month, month, state, the applicable average temperature and an id_temp key
    """
    sc.setJobGroup("Transform", "Create temperature helper table")
    
    temperature_final_data.createOrReplaceTempView("tempdata_coord")
    temp_table = spark.sql("""
    select dayofmonth, month, latitude_rounded as lat, longitude_rounded as long, avg(AverageTemperature) as AvgTemp
    from tempdata_coord
    group by lat, long, month, dayofmonth
    order by lat asc, long asc, month asc, dayofmonth asc
    """)

    airport_codes_final_data.createOrReplaceTempView("aircodes")
    # count the number of states for each lat/long pair
    aircode_table1 = spark.sql("""
    select latitude, longitude, state, count(state) as num
    from aircodes
    group by latitude, longitude, state
    order by latitude, longitude, state
    """)

    airport_codes_final_data.createOrReplaceTempView("aircodes")
    # determine the maximum count per lat/long pair
    aircode_table2 = spark.sql("""
    select latitude as lat, longitude as long, max(num) as maxPerLatLong from (
        select latitude, longitude, state, count(state) as num
        from aircodes
        group by latitude, longitude, state
        order by latitude, longitude, state
    )
    group by lat, long
    order by lat, long
    """)

    # join both tables to get the state with the most counts for each lat/long pair
    # (if several states tie for the maximum count, all of them are kept)
    aircode_table3 = aircode_table1.\
        join(aircode_table2, [aircode_table1.latitude == aircode_table2.lat, aircode_table1.longitude == aircode_table2.long, aircode_table1.num == aircode_table2.maxPerLatLong]).\
        drop('long', 'lat', 'num', 'maxPerLatLong')

    # finally, join both together on coordinates
    state_temp = temp_table.join(aircode_table3, [temp_table.lat == aircode_table3.latitude, temp_table.long == aircode_table3.longitude])

    state_temp.createOrReplaceTempView("state_temp")
    state_temp2 = spark.sql("""
    select dayofmonth, month, state, avg(AvgTemp) as avg_temp
    from state_temp
    group by dayofmonth, month, state
    order by dayofmonth, month, state
    """)
    
    state_temp2 = state_temp2.withColumn("id_temp", monotonically_increasing_id())
    
    return state_temp2


In [10]:
state_temp = create_temperature_table()

state_temp.show(truncate=False)

+----------+-----+-----+--------------------+----------+
|dayofmonth|month|state|avg_temp            |id_temp   |
+----------+-----+-----+--------------------+----------+
|1         |1    |AK   |-15.762100000000002 |0         |
|1         |1    |AL   |7.015045627376424   |1         |
|1         |1    |AR   |4.856640816326534   |2         |
|1         |1    |AZ   |9.667974719101123   |3         |
|1         |1    |CA   |7.45849751585624    |4         |
|1         |1    |CO   |-4.064113748795792  |5         |
|1         |1    |CT   |-0.9482965779467679 |6         |
|1         |1    |FL   |15.287752127682106  |7         |
|1         |1    |GA   |7.619192648922688   |8         |
|1         |1    |IA   |-7.177857954545457  |9         |
|1         |1    |IL   |-3.804348052771056  |10        |
|1         |1    |IN   |-2.4980444139589815 |11        |
|1         |1    |KS   |-1.9120205612310877 |12        |
|1         |1    |KY   |0.8350912547528511  |13        |
|1         |1    |LA   |10.2561

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model

We have 3 sets of data:
1. immigration data
2. temperature data
3. airport codes

Our immigration data consists of arrival date fields (date column, month, day of month, year), some personal information (gender, birth year, destination state) and the airline people flew with.

The temperature data consists of temperatures linked to cities and countries. I first tried to join on latitude and longitude rounded to 2 decimals, but this is too fine-grained.
Two decimal places of latitude correspond to a real-world distance of approximately 1.11 km, and at that precision there is no overlap between the places where temperatures are measured and airport locations.
One decimal place corresponds to about 11.1 km and zero decimals to about 111 km, which is why we round to whole degrees.
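The distances quoted above follow from the circumference of the Earth divided by 360. The sketch below reproduces them, assuming a spherical Earth with a mean radius of 6371 km. It also shows that the figures hold for latitude only: a degree of longitude shrinks with the cosine of the latitude, so a whole-degree cell in the US is narrower east to west than it is tall.

```python
import math

# Assumption: spherical Earth with mean radius 6371 km
EARTH_RADIUS_KM = 6371.0

def km_per_degree_latitude():
    """Length of one degree of latitude along a meridian."""
    return 2 * math.pi * EARTH_RADIUS_KM / 360

def km_per_degree_longitude(lat_deg):
    """Length of one degree of longitude, which shrinks with the cosine of the latitude."""
    return km_per_degree_latitude() * math.cos(math.radians(lat_deg))

for decimals in (2, 1, 0):
    step = 10 ** -decimals  # size of the rounding cell in degrees
    print(f"{decimals} decimals: ~{km_per_degree_latitude() * step:.2f} km of latitude")

print(f"1 degree of longitude at 40N: ~{km_per_degree_longitude(40):.1f} km")
```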

The airport code data is composed of iso-region (e.g. US-PA) and coordinates data (longitude, latitude).

In the end we'd like to relate the immigration data to temperatures.
We cannot do this directly, so the goal is: 
1. read in immigration data, extract state.
2. read in airport codes, extract latitude, longitude, round to zero decimals and extract state
3. read in temperature data, extract latitude, longitude, round to zero decimals

Using #3 we can create a helper table with columns: dayofmonth, month, lat, long, AvgTemp.
Joining this helper table with #2 on the rounded coordinates maps each coordinate to a state, so we can determine an average temperature for each day of month, month and state combination.

Finally, we can link immigration data to average temperatures via the day of month and month (of the arrival dates) and the state.
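A minimal pure-Python sketch of this final linking step, assuming plain dicts that use the column names of this notebook; the function attach_avg_temp and the sample values are hypothetical. It follows left-join semantics: every immigration record is kept, and it gets None when no temperature exists for its (day of month, month, state) combination.

```python
def attach_avg_temp(immigration_rows, state_temps):
    """
    immigration_rows: dicts with 'id_imm', 'arrdate_dayofmonth', 'arrdate_month', 'state'.
    state_temps: dicts with 'dayofmonth', 'month', 'state', 'avg_temp'.
    Returns (id_imm, avg_temp) pairs; avg_temp is None when no temperature matches.
    """
    lookup = {(r["dayofmonth"], r["month"], r["state"]): r["avg_temp"] for r in state_temps}
    return [
        (r["id_imm"], lookup.get((r["arrdate_dayofmonth"], r["arrdate_month"], r["state"])))
        for r in immigration_rows
    ]

# Hypothetical sample: only the first arrival has a matching temperature
imm = [
    {"id_imm": 0, "arrdate_dayofmonth": 1, "arrdate_month": 5, "state": "FL"},
    {"id_imm": 1, "arrdate_dayofmonth": 9, "arrdate_month": 5, "state": "NV"},
]
temps = [{"dayofmonth": 1, "month": 5, "state": "FL", "avg_temp": 25.0}]
print(attach_avg_temp(imm, temps))
```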

For a schema, see: <img src="https://i.imgur.com/jHBgMur.png">

#### 3.2 Mapping Out Data Pipelines
##### The list below also serves as the data dictionary

1. Read in immigration data, temperature data, airport codes data
2. Select relevant columns
    1. immigration data: i94port|biryear|gender|airline|i94visa|arrdate_dt|depdate_dt|arrdate_dayofmonth|arrdate_month|arrdate_year|state
    1. temperature data: dayofmonth|month|lat|long|AvgTemp
    1. airport codes data: lat|long|state
3. create fact and dimension tables
    1. see image above for how to define them and their relationships


### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

# Create dimension tables

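Use .persist() on the dimension tables to optimize performance: Spark then caches each table the first time it is computed instead of recomputing it from the immigration data for every subsequent action.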

In [11]:
def create_dim_state():
    """
    Arguments: none. Return state dataframe.
    """
    sc.setJobGroup("Read", "Read and transform dim_state")
    
    return immigration_final_data.\
            select('state').\
            distinct().\
            withColumn("id_state", monotonically_increasing_id())
    
def create_dim_time():
    """
    Arguments: none. Return time dataframe.
    """
    sc.setJobGroup("Read", "Read and transform dim_time")
    return immigration_final_data.\
            select(col('arrdate_dt').alias('date'), col('arrdate_dayofmonth').alias('day_of_month'), col('arrdate_month').alias('month'), col('arrdate_year').alias('year')).\
            distinct().\
            withColumn("id_time", monotonically_increasing_id())
    
def create_dim_person():
    """
    Arguments: none. Return person dataframe.
    """    
    sc.setJobGroup("Read", "Read and transform dim_person")
    return immigration_final_data.\
            select('gender', 'biryear', 'id_imm').\
            withColumn("id_person", monotonically_increasing_id())
    
def create_dim_ports():
    """
    Arguments: none. Return ports dataframe.
    """    
    sc.setJobGroup("Read", "Read and transform dim_ports")
    # note: DataFrame.alias() names the DataFrame rather than a column, so the column keeps the name i94port
    return immigration_final_data.\
            select('i94port').\
            distinct().\
            withColumn("id_port", monotonically_increasing_id())
    
def create_dim_airlines():
    """
    Arguments: none. Return airline dataframe.
    """    
    sc.setJobGroup("Read", "Read and transform dim_airlines")
    return immigration_final_data.\
            select('airline').\
            distinct().\
            withColumn("id_airline", monotonically_increasing_id())
    
def create_fact_temp():
    """
    Arguments: none. Return temperature dataframe.
    """    
    sc.setJobGroup("Read", "Read and transform fact_temp")
    
    return state_temp
    

In [12]:
dim_state = create_dim_state()
dim_state.persist()
dim_state.limit(10).show(truncate=False)


+-----+------------+
|state|id_state    |
+-----+------------+
|LA   |77309411328 |
|NJ   |137438953472|
|OR   |257698037760|
|RI   |403726925824|
|KY   |420906795008|
|WY   |420906795009|
|NH   |438086664192|
|MI   |463856467968|
|NV   |472446402560|
|MT   |575525617664|
+-----+------------+
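The id_state values above are unique but far from consecutive. The PySpark documentation for monotonically_increasing_id() describes the current implementation as putting the partition ID in the upper 31 bits and the record number within each partition in the lower 33 bits. The hypothetical helper below decodes an ID under that assumption; for example, WY's id 420906795009 decodes to partition 49, record 1. The same documentation notes that the function is non-deterministic because its result depends on partition IDs, which is one more reason to persist these tables before joining on their IDs.

```python
RECORD_BITS = 33  # lower bits hold the record number within a partition

def decode_monotonic_id(value):
    """Split an ID into (partition_index, record_within_partition)."""
    return value >> RECORD_BITS, value & ((1 << RECORD_BITS) - 1)

for state, id_state in [("LA", 77309411328), ("KY", 420906795008), ("WY", 420906795009)]:
    print(state, decode_monotonic_id(id_state))
```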

In [13]:
dim_time = create_dim_time()
dim_time.persist()
dim_time.limit(10).show(truncate=False)

+----------+------------+-----+----+------------+
|date      |day_of_month|month|year|id_time     |
+----------+------------+-----+----+------------+
|2016-04-13|13          |4    |2016|34359738368 |
|2016-04-04|4           |4    |2016|51539607552 |
|2016-05-27|27          |5    |2016|51539607553 |
|2016-04-30|30          |4    |2016|77309411328 |
|2016-05-17|17          |5    |2016|85899345920 |
|2016-05-01|1           |5    |2016|146028888064|
|2016-04-16|16          |4    |2016|137438953472|
|2016-05-02|2           |5    |2016|163208757248|
|2016-04-08|8           |4    |2016|154618822656|
|2016-04-20|20          |4    |2016|369367187456|
+----------+------------+-----+----+------------+

In [14]:
dim_person = create_dim_person()
dim_person.persist()
dim_person.limit(10).show(truncate=False)

+------+-------+------+---------+
|gender|biryear|id_imm|id_person|
+------+-------+------+---------+
|F     |1981.0 |0     |0        |
|F     |1970.0 |1     |1        |
|F     |1975.0 |2     |2        |
|M     |1975.0 |3     |3        |
|F     |1963.0 |4     |4        |
|M     |1960.0 |5     |5        |
|F     |1957.0 |6     |6        |
|F     |1959.0 |7     |7        |
|M     |1977.0 |8     |8        |
|F     |1964.0 |9     |9        |
+------+-------+------+---------+

In [15]:
dim_ports = create_dim_ports()
dim_ports.persist()
dim_ports.limit(10).show(truncate=False)

+-------+------------+
|i94port|id_port     |
+-------+------------+
|DNS    |8589934592  |
|MOR    |8589934593  |
|HEL    |8589934594  |
|SNA    |34359738368 |
|PTK    |34359738369 |
|DLB    |68719476736 |
|ABS    |68719476737 |
|PVD    |103079215104|
|MYR    |103079215105|
|OAK    |111669149696|
+-------+------------+

In [16]:
dim_airlines = create_dim_airlines()
dim_airlines.persist()
dim_airlines.limit(10).show(truncate=False)

+-------+-----------+
|airline|id_airline |
+-------+-----------+
|DZ     |0          |
|MM     |1          |
|LT     |2          |
|FI     |34359738368|
|AZ     |34359738369|
|R0E    |34359738370|
|B01    |34359738371|
|IC     |34359738372|
|FYG    |51539607552|
|743    |51539607553|
+-------+-----------+

In [17]:
fact_temp = create_fact_temp()
# fact_temp.show(444,truncate=False)
print(f"No of records in fact_temp: {fact_temp.count()}")

No of records in fact_temp: 444

# Create fact table

The fact_imm table has 7 columns:
1. id_imm
1. id_state
1. id_time
1. id_person
1. id_port
1. id_airline
1. id_temp

These columns were chosen so that all relevant immigration and temperature data can be linked together.
All joins are left joins: every entry in immigration_final_data must be preserved so that no immigration records are lost.
Some of the input data is also incomplete. For instance, the fact_temp table has only 444 records in total, because the dataset does not define a temperature for every state on every (day of month, month) combination. Completing the input data would solve this problem.
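A left join keeps the fact table at one row per immigration record only if each join key is unique on the dimension side. A key that appears twice in a dimension would duplicate the matching immigration rows. The sketch below illustrates both cases with plain Python lists. The sample ports and IDs are made up for illustration, and `left_join` is a hypothetical helper, not Spark's implementation.

```python
def left_join(left, right, key):
    """Left join two lists of dicts on `key`.

    Every left row appears at least once: unmatched rows get None for the
    right-hand columns, and a key that occurs n times on the right yields n rows.
    """
    right_cols = {c for row in right for c in row if c != key}
    index = {}
    for row in right:
        index.setdefault(row[key], []).append(row)

    joined = []
    for row in left:
        matches = index.get(row[key], [])
        if not matches:
            # no match: keep the left row, fill right-hand columns with None
            joined.append({**row, **{c: None for c in right_cols}})
        for match in matches:
            joined.append({**row, **match})
    return joined


# illustrative data only
immigration = [{"id_imm": 0, "i94port": "NYC"},
               {"id_imm": 1, "i94port": "MIA"},
               {"id_imm": 2, "i94port": "XXX"}]  # port missing from the dimension

unique_ports = [{"i94port": "NYC", "id_port": 10}, {"i94port": "MIA", "id_port": 11}]
duplicate_ports = unique_ports + [{"i94port": "NYC", "id_port": 12}]

print(len(left_join(immigration, unique_ports, "i94port")))     # 3: row count preserved
print(len(left_join(immigration, duplicate_ports, "i94port")))  # 4: duplicate key adds a row
```

This is why `fact_imm` is expected to have exactly as many rows (5388905) as `dim_person`, which holds one row per immigration record. In Spark, you could guard against duplicate keys before joining with, for example, `dim_ports.groupBy("i94port").count().filter("count > 1").count() == 0`. For `fact_temp`, group by the combination `"dayofmonth", "month", "state"` instead.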


In [20]:
def create_fact_imm():
    """
    Arguments: none. Return fact_imm dataframe.
    """    
    sc.setJobGroup("Read", "Read and transform fact_imm")
    
    # left joins keep every immigration record; inner joins would drop records because the temperature and airport code data are incomplete
    return immigration_final_data.\
            join(dim_time, [immigration_final_data.arrdate_dt == dim_time.date], "left").\
            join(dim_airlines, [immigration_final_data.airline == dim_airlines.airline], "left").\
            join(dim_ports, [immigration_final_data.i94port == dim_ports.i94port], "left").\
            join(dim_state, [immigration_final_data.state == dim_state.state], "left").\
            join(fact_temp, [immigration_final_data.arrdate_dayofmonth == fact_temp.dayofmonth, immigration_final_data.arrdate_month == fact_temp.month, immigration_final_data.state == fact_temp.state], "left").\
            join(dim_person, [immigration_final_data.id_imm == dim_person.id_imm], "left").\
            select(immigration_final_data.id_imm, dim_state.id_state, 'id_time', 'id_person', 'id_port', 'id_airline', 'id_temp')

fact_imm = create_fact_imm()
print(f"No of records in fact_imm: {fact_imm.count()}")


No of records in fact_imm: 5388905

# Write the dimension/fact tables to S3

In [None]:
def writeToS3(displayName, destPath, writeMode, fmt, dataFrame):
    """
    Write dataFrame to the S3 path destPath in format fmt (e.g. 'parquet'),
    using writeMode 'append' or 'overwrite'. displayName is only used for logging.
    Returns None.
    """

    logging.info(f"Writing {displayName} to {destPath} with format {fmt}, mode {writeMode}.")

    dataFrame.write.format(fmt).mode(writeMode).save(destPath)
    
    
# write out dimension/fact tables to s3
writeToS3('dim_state', f"s3a://{s3_bucket}/capstone/processed/dim_state", 'overwrite', 'parquet', dim_state)
writeToS3('dim_time', f"s3a://{s3_bucket}/capstone/processed/dim_time", 'overwrite', 'parquet', dim_time)
writeToS3('dim_person', f"s3a://{s3_bucket}/capstone/processed/dim_person", 'overwrite', 'parquet', dim_person)
writeToS3('dim_airlines', f"s3a://{s3_bucket}/capstone/processed/dim_airlines", 'overwrite', 'parquet', dim_airlines)
writeToS3('dim_ports', f"s3a://{s3_bucket}/capstone/processed/dim_ports", 'overwrite', 'parquet', dim_ports)

writeToS3('fact_imm', f"s3a://{s3_bucket}/capstone/processed/fact_imm", 'overwrite', 'parquet', fact_imm)
writeToS3('fact_temp', f"s3a://{s3_bucket}/capstone/processed/fact_temp", 'overwrite', 'parquet', fact_temp)


In [21]:

def recordCount(table_name):
    """
    Given a dataframe table_name, return the number of records in it
    """
    return table_name.count()

def checkNumberOfRows(actual_count, expected_count):
    """
    Given a number actual_count, compare to expected_count and raise ValueError exception if it differs.
    """
    if actual_count != expected_count:
        logging.error(f"The number of records found is {actual_count}, differing from expected value {expected_count}")
        raise ValueError(f"The number of records found is {actual_count}, differing from expected value {expected_count}")
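`checkNumberOfRows` is plain Python, so it can be unit-tested without a Spark session, which covers the "unit tests" item in the checklist below. The following is a minimal sketch using the standard `unittest` module. The function is copied from the cell above, logging call included, so the block runs on its own. The test class name is hypothetical.

```python
import logging
import unittest


def checkNumberOfRows(actual_count, expected_count):
    """Copy of the notebook function: raise ValueError if the counts differ."""
    if actual_count != expected_count:
        logging.error(f"The number of records found is {actual_count}, differing from expected value {expected_count}")
        raise ValueError(f"The number of records found is {actual_count}, differing from expected value {expected_count}")


class CheckNumberOfRowsTest(unittest.TestCase):
    def test_matching_counts_pass(self):
        # matching counts: no exception, function returns None
        self.assertIsNone(checkNumberOfRows(52, 52))

    def test_mismatched_counts_raise(self):
        with self.assertRaises(ValueError):
            checkNumberOfRows(51, 52)


if __name__ == "__main__":
    # argv and exit=False let this run inside a notebook cell without
    # parsing the kernel's arguments or exiting the interpreter
    unittest.main(argv=["ignored"], exit=False)
```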


#### 4.2 Data Quality Checks
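The data quality checks below ensure the pipeline ran as expected. The notebook implements source/count checks. It compares each table's record count with its expected value, and it compares the number of distinct states in dim_state with `len(valid_us_states) + 1`. Useful checks in general include: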
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
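The first item mentions unique-key constraints. Spark DataFrames and Parquet files do not enforce primary keys, so such constraints have to be checked explicitly. The sketch below shows a uniqueness and not-null check on a surrogate-key column, in pure Python. The helper name `check_surrogate_key` is hypothetical, and the sample rows are copied from the `dim_ports` output above. In the notebook, the same checks could be expressed in Spark as `df.groupBy(key).count().filter("count > 1").count()` and `df.filter(col(key).isNull()).count()`, both of which should be 0.

```python
from collections import Counter


def check_surrogate_key(rows, key):
    """Raise ValueError if `key` is missing/None in any row or is not unique."""
    values = [row.get(key) for row in rows]
    null_count = sum(value is None for value in values)
    if null_count:
        raise ValueError(f"{null_count} row(s) have a null {key}")
    duplicates = [value for value, n in Counter(values).items() if n > 1]
    if duplicates:
        raise ValueError(f"duplicate {key} values: {sorted(duplicates)}")


# rows copied from the dim_ports output (i94port, id_port)
dim_ports_sample = [{"i94port": "DNS", "id_port": 8589934592},
                    {"i94port": "MOR", "id_port": 8589934593}]
check_surrogate_key(dim_ports_sample, "id_port")  # passes silently
```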
 

In [22]:
# perform data quality checks
spark.sparkContext.setJobGroup("DataQuality", "Counting number of records in tables")

expectedRowCount = {dim_state : 52, 
                    fact_temp : 444, 
                    dim_time : 61, 
                    fact_imm : 5388905,
                    dim_airlines : 622,
                    dim_person : 5388905,
                    dim_ports : 314,}

for obj in [dim_state, fact_temp, dim_time, fact_imm, dim_airlines, dim_person, dim_ports]:
    print(f"Evaluating {obj}")
    logging.info(f"Evaluating {obj}")
    numRows = recordCount(obj)

    checkNumberOfRows(numRows, expectedRowCount[obj])

Evaluating DataFrame[state: string, id_state: bigint]
Evaluating DataFrame[dayofmonth: int, month: int, state: string, avg_temp: double, id_temp: bigint]
Evaluating DataFrame[date: date, day_of_month: int, month: int, year: int, id_time: bigint]
Evaluating DataFrame[id_imm: bigint, id_state: bigint, id_time: bigint, id_person: bigint, id_port: bigint, id_airline: bigint, id_temp: bigint]
Evaluating DataFrame[airline: string, id_airline: bigint]
Evaluating DataFrame[gender: string, biryear: double, id_imm: bigint, id_person: bigint]
Evaluating DataFrame[i94port: string, id_port: bigint]

In [23]:
spark.sparkContext.setJobGroup("DataQuality", "Counting total number of distinct states")
dim_state.createOrReplaceTempView("state")
numDistinctStates = spark.sql("""
select count(distinct state) 
from state
""")

checkNumberOfRows(numDistinctStates.collect()[0]['count(DISTINCT state)'], len(valid_us_states) + 1)

#### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
In this project we chose AWS S3 for fast and efficient storage, combined with an AWS EMR cluster for efficient processing of large datasets.
S3 is also used to store the results.

Because the data is relatively static (immigration data grows slowly, one day at a time), this analysis was set up as a one-off job: we put the data in place, run the analysis once and proceed with the results.

* Propose how often the data should be updated and why.
The temperature data is currently averaged over many years, so there is no need to update it often unless weather conditions across the US change significantly.

Immigration data grows by a full year of records every year, so a yearly update might be appropriate.

* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.<br>
If the data increased by 100x, I would split it into smaller partitions and add nodes to the EMR cluster for faster processing. I would also consider moving the data from S3 to HDFS to reduce I/O across the network.
Currently we also process all information at once and in full. It would be more efficient to partition the data by time and process only the new data, with a workflow management system such as Apache Airflow orchestrating the runs. The Uber case (one of the topics suggested to me) suggests another improvement. It would probably be more efficient to skip the transformation step during loading and copy the raw bulk data unprocessed to S3 (ELT rather than ETL). The new bulk data could then be copied to Hadoop and transformed there.
The current partition key (month, day of month) partitions by time, so with a 100x increase in data each partition would also become 100x larger. An additional subdivision would therefore help. Partitioning by state as well might be useful, since we often group by state.
Lastly, I'd more strictly enforce a schema to ensure data quality.
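Spark's `DataFrameWriter.partitionBy` writes Hive-style subdirectories named `column=value`. The sketch below requires no Spark and mimics only that directory layout. It shows how adding `state` as a second partition column splits each time partition into smaller per-state directories. The helper `partition_path` and the sample rows are hypothetical. In Spark, the corresponding write for a DataFrame that carries `month` and `state` columns would be `df.write.partitionBy("month", "state").mode("overwrite").parquet(path)`.

```python
from collections import Counter


def partition_path(row, partition_cols):
    """Build the Hive-style relative directory a row would be written to."""
    return "/".join(f"{c}={row[c]}" for c in partition_cols)


# hypothetical sample rows
rows = [{"month": 4, "state": "FL"}, {"month": 4, "state": "NY"},
        {"month": 4, "state": "FL"}, {"month": 5, "state": "CA"}]

by_month = Counter(partition_path(r, ["month"]) for r in rows)
by_month_state = Counter(partition_path(r, ["month", "state"]) for r in rows)
print(by_month)        # month=4 holds 3 rows, month=5 holds 1
print(by_month_state)  # month=4 is now split across state=FL and state=NY
```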
 
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.<br>
I'd create an Apache Airflow workflow that schedules this task so it completes before 7am. Furthermore, I'd process the historical data once and rewrite the code so that only the newly added daily data is processed.
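The "process only the newly added data" idea can be sketched with a high-water mark. The job remembers the last date it processed, and each run selects only later dates. The function name `dates_to_process` and the way the mark would be stored are hypothetical. In Airflow, logic like this would typically run inside a DAG with a daily cron schedule, for example `0 6 * * *` (06:00), which leaves some margin before 7am.

```python
from datetime import date, timedelta


def dates_to_process(available_dates, last_processed):
    """Return the dates newer than the high-water mark, oldest first."""
    return sorted(d for d in available_dates
                  if last_processed is None or d > last_processed)


available = [date(2016, 4, 1) + timedelta(days=i) for i in range(5)]  # 2016-04-01 .. 2016-04-05

# first run: no high-water mark yet, so the whole history is processed once
print(dates_to_process(available, None))
# later runs: only the days after the last processed date
print(dates_to_process(available, date(2016, 4, 3)))  # 2016-04-04 and 2016-04-05
```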
 
 * The database needed to be accessed by 100+ people.<br>
If the database needed to be accessed by 100+ people, I'd run the analysis once, cache the results and serve them from the cache. In the current setup there is no user input that could change the output. This reflects the heavy-read scenario described in the feedback.
Generally, when optimizing for heavy reads, serving the data should require as few operations as possible. In this case I'd make sure the data is horizontally scalable, so I'd look at choosing proper partition/clustering keys and perhaps move the data into something like Apache Cassandra.
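Because the output does not depend on user input, the expensive aggregation can be computed once and every later request served from the stored result. The minimal in-process sketch below uses `functools.lru_cache`. `run_analysis` is a hypothetical stand-in for the Spark query, and its rows are taken from the analysis output further down. A deployment serving 100+ users would more likely keep the result in a shared store, such as a database table, than in a single process's memory.

```python
from functools import lru_cache

calls = {"analysis": 0}  # counts how often the expensive step actually runs


@lru_cache(maxsize=None)
def run_analysis():
    """Hypothetical stand-in for the expensive Spark aggregation; result is cached."""
    calls["analysis"] += 1
    # (month, state, tourist_num); a tuple so callers cannot modify the cached result
    return ((4, "FL", 22934), (5, "FL", 18995))


def serve_request():
    """Every reader gets the cached result; the analysis itself runs only once."""
    return run_analysis()


for _ in range(100):
    serve_request()
print(calls["analysis"])  # 1
```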

# Analysis example

Which state receives the most tourists per month, and what is the temperature like at the destination?

In [25]:
sc.setJobGroup("Analysis", "Counting number of tourists per month and the average temperature therein")

fact_imm.createOrReplaceTempView("fact_imm")
fact_temp.createOrReplaceTempView("fact_temp")

analysis1 = spark.sql("""
select ft.month, ft.state, avg(ft.avg_temp) as avg_temp, count(fi.id_imm) as tourist_num 
from fact_imm fi
join fact_temp ft on (fi.id_temp = ft.id_temp)
group by month, state
order by tourist_num desc
""")
analysis1.limit(10).show(truncate=False)


+-----+-----+------------------+-----------+
|month|state|avg_temp          |tourist_num|
+-----+-----+------------------+-----------+
|4    |FL   |20.952401390146782|22934      |
|5    |FL   |24.05730552869499 |18995      |
|4    |NY   |6.426848223164618 |17658      |
|4    |CA   |12.953549933932452|13897      |
|5    |NY   |13.02698528746373 |12758      |
|5    |CA   |16.180092082676367|12066      |
|5    |NV   |21.31980813953488 |4081       |
|4    |TX   |18.75916808141302 |3873       |
|5    |TX   |22.84649000785298 |3000       |
|4    |IL   |11.008175343667743|2622       |
+-----+-----+------------------+-----------+
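The query computes `avg(ft.avg_temp)` over the joined rows, that is, once per immigration record. The reported `avg_temp` is therefore weighted by the number of arrivals per day rather than being a plain average of the daily temperatures. Also, because the query uses an inner join, arrivals without a matching `id_temp` are not counted in `tourist_num`. The pure-Python sketch below uses made-up numbers to show the difference between the two averages. The helper names are hypothetical.

```python
from collections import defaultdict

# (month, day_of_month, state, daily average temperature), one tuple per arrival.
# Made-up illustrative data: two arrivals on a 20-degree day, one on a 26-degree day.
arrivals = [(4, 1, "FL", 20.0), (4, 1, "FL", 20.0), (4, 2, "FL", 26.0)]


def per_arrival_average(rows):
    """Mimic the SQL: avg over joined rows grouped by (month, state), plus row count."""
    sums, counts = defaultdict(float), defaultdict(int)
    for month, _day, state, temp in rows:
        sums[(month, state)] += temp
        counts[(month, state)] += 1
    return {k: (sums[k] / counts[k], counts[k]) for k in sums}


def per_day_average(rows):
    """Average each day's temperature once, regardless of how many people arrived."""
    daily = {(month, day, state): temp for month, day, state, temp in rows}
    sums, counts = defaultdict(float), defaultdict(int)
    for (month, _day, state), temp in daily.items():
        sums[(month, state)] += temp
        counts[(month, state)] += 1
    return {k: sums[k] / counts[k] for k in sums}


print(per_arrival_average(arrivals))  # {(4, 'FL'): (22.0, 3)}
print(per_day_average(arrivals))      # {(4, 'FL'): 23.0}
```

Which average is appropriate depends on the question. The arrival-weighted value describes the weather tourists actually experienced on arrival, while the per-day value describes the destination's climate over the period.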