# U.S. I94 Immigrant Travel Data Warehouse Build
### A Data Engineering Project

#### Project Summary
This project receives U.S. I94 immigrant data and prepares it for analysis. The data represents every immigrant departure at every port by date. It also includes other pertinent information, such as the address where the immigrant stayed in the U.S. and whether the record was matched to an arrival I94. The project adds dimensions for state-of-residence demographics and average monthly temperatures at the departure port.

The model is used to answer analytical questions such as:
- Which airports handle the most or the fewest immigrants in a given month?
- Is immigrant travel correlated with city temperatures at the destination (for example, for tourist travel)?
- Which airports may need more or fewer immigration officers assigned?
- Which states are gaining or losing immigrants?

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
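# All imports and installs here
from pyspark.sql import SparkSession
from pyspark.sql.types import DoubleType
# Import only the column functions this notebook uses. A wildcard import would
# silently shadow Python built-ins such as sum, max and round; abs is imported
# on purpose because joinAndCleanupAirportAndTempData applies it to columns.
from pyspark.sql.functions import abs, col, month, split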

### Step 1: Scope the Project and Gather Data

#### Scope 
Model a star schema for the I94 immigrant travel data. The fact table aggregates I94 travel data by month, airport and address. The model has two dimensions:
1. state_demographics: population data aggregated by state, corresponding to the immigrant's state of residence (address).
2. I94 port city details: the city each airport is located in, along with that city's weather data aggregated by month where available.

The main reason to choose a star schema over other schemas, such as snowflake, is query speed: the dimension tables are denormalized, so analytical queries need fewer joins. Our combined city, airport and temperature dimension is deliberately denormalized for this reason.
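As a sketch of how the star schema answers the questions above, the snippet below runs two of them against tiny in-memory SQLite copies of the fact table and the state dimension. SQLite stands in for Spark SQL here (an assumption for illustration only; the notebook itself uses Hive tables), and the rows are copied from the df_i94_aggregates and state_demographics sample outputs shown in Step 4.

```python
import sqlite3

# Tiny in-memory stand-ins for the fact table and one dimension (subset of columns).
# Rows are copied from the August 2016 sample outputs shown in Step 4.
conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE df_i94_aggregates (
    i94yr REAL, i94mon REAL, i94port TEXT, i94addr TEXT, "count" INTEGER);
CREATE TABLE state_demographics (
    state_code TEXT, total_population INTEGER, foreign_born INTEGER);
""")
conn.executemany(
    "INSERT INTO df_i94_aggregates VALUES (?, ?, ?, ?, ?)",
    [(2016.0, 8.0, "SPM", "MN", 8026),
     (2016.0, 8.0, "ATL", "NY", 4583),
     (2016.0, 8.0, "BOS", "OH", 842),
     (2016.0, 8.0, "SEA", "DE", 166),
     (2016.0, 8.0, "ATL", "SO", 54)],
)
conn.executemany(
    "INSERT INTO state_demographics VALUES (?, ?, ?)",
    [("OH", 2433689, 175219), ("DE", 71957, 3336)],
)

# Fact-only question: busiest ports in a given month.
busiest_ports = conn.execute("""
    SELECT i94port, SUM("count") AS travellers
    FROM df_i94_aggregates
    WHERE i94yr = 2016 AND i94mon = 8
    GROUP BY i94port
    ORDER BY travellers DESC
    LIMIT 3
""").fetchall()

# Star join: travellers per state of address, next to that state's population.
by_state = conn.execute("""
    SELECT f.i94addr, SUM(f."count") AS travellers, d.total_population
    FROM df_i94_aggregates AS f
    JOIN state_demographics AS d ON f.i94addr = d.state_code
    WHERE f.i94mon = 8
    GROUP BY f.i94addr, d.total_population
    ORDER BY travellers DESC
""").fetchall()

print(busiest_ports)  # [('SPM', 8026), ('ATL', 4637), ('BOS', 842)]
print(by_state)       # [('OH', 842, 2433689), ('DE', 166, 71957)]
```

Note that the inner join drops address codes with no matching state_code (SO and MN here); a left join would keep them for inspection.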

#### Describe and Gather Data 
The I94 immigrant travel data is a collection of U.S. government immigration data for 2016 departures. It includes the port of departure, the departure date, the arrival date where available, gender, year of birth and airline details.
The airport codes data includes US airport city and state information, including geographic coordinates.
The US cities demographics data contains population counts for males, females, veterans and foreign-born individuals by city.
The global land temperatures data contains monthly average temperatures by city, along with each city's latitude and longitude.

### Step 2: Exploring and Assessing the Data

#### Common utility functions

In [2]:
# Create spark session
def getSparkSession():
    sparkSession = SparkSession.builder.\
    config("spark.jars.repositories", "https://repos.spark-packages.org/").\
    config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\
    enableHiveSupport().getOrCreate()
    return sparkSession

In [3]:
# Read SAS7BDAT data into a Spark DataFrame
def readSAS7BDATtoSparkDF(sparksession,fname):
    df_spark = sparksession.read.format('com.github.saurfang.sas.spark').load(fname)
    return df_spark

In [4]:
# Read CSV into a Spark DataFrame (header and schema inference are now honored)
def readCSVtoSparkDF(sparksession,fname,delimiter_=';',header_=True,inferSchema_=True):
    sparkDF = sparksession.read \
        .option("delimiter", delimiter_) \
        .option("header", header_) \
        .option("inferSchema", inferSchema_) \
        .csv(fname)
    return sparkDF

In [5]:
# Clean I94 travel data: count records per year, month, port and address
def cleanI94TravelData(I94TravelDataDF):
    return I94TravelDataDF \
        .groupBy("i94yr","i94mon","i94port","i94addr") \
        .count()

In [6]:
# Clean city demographics data
def cleanCityDemographicsData(cityDemographicsDataDF):
    cityDemographicsDataDF = cityDemographicsDataDF.drop('Race','Count','Average Household Size','Median Age','City','State')
    cityDemographicsDataDF = cityDemographicsDataDF.drop_duplicates()
    cleanedCityDemographicsDataDF = cityDemographicsDataDF \
        .groupby("State Code") \
        .sum("Male Population","Female Population","Total Population","Number of Veterans","Foreign-born") \
        .withColumnRenamed("sum(Male Population)", "male_population") \
        .withColumnRenamed("sum(Female Population)", "female_population") \
        .withColumnRenamed("sum(Total Population)", "total_population") \
        .withColumnRenamed("sum(Number of Veterans)", "num_of_vets") \
        .withColumnRenamed("sum(Foreign-born)", "foreign_born") \
        .withColumnRenamed("State Code","state_code")
    return cleanedCityDemographicsDataDF

In [7]:
# Clean airport codes data
def cleanAirportCodes(airportCodesDF):
    airportCodesDF = airportCodesDF.dropna(how='any',subset=['iata_code']).filter(airportCodesDF.iso_country == 'US')
    airportCodesDF = airportCodesDF \
        .withColumn('Longitude', split(col("coordinates"),",").getItem(0).cast(DoubleType())) \
        .withColumn('Latitude',  split(col("coordinates"),",").getItem(1).cast(DoubleType()))
    airportCodesDF = airportCodesDF.drop("continent","iso_country","coordinates","ident")
    return airportCodesDF

In [8]:
# Clean global temperature by city data step 1 filter
def filterGlobalTemperaturesByCityDF(globalLandTemperaturesByCityDF):
    globalLandTemperaturesByCityDF = globalLandTemperaturesByCityDF.dropna(how='any')
    globalLandTemperaturesByCityDF = globalLandTemperaturesByCityDF.filter(globalLandTemperaturesByCityDF.Country == "United States")
    filteredGlobalLandTemperaturesByCityDF = globalLandTemperaturesByCityDF.withColumn('month',month(globalLandTemperaturesByCityDF.dt))
    return filteredGlobalLandTemperaturesByCityDF

In [9]:
# Clean global temperature by city data step 2: parse coordinates such as '39.38N' and '83.24W' into signed degrees (US data is all north and west)
def replaceGlobalLandTemperaturesByCity(filteredGlobalLandTemperaturesByCityDF):
    filteredGlobalLandTemperaturesByCityDF = filteredGlobalLandTemperaturesByCityDF \
    .withColumn('absLongitude', (split(col("Longitude"),"W").getItem(0).cast(DoubleType())*-1 )) \
    .withColumn('absLatitude',split(col("Latitude"),"N").getItem(0).cast(DoubleType()))
    replacedGlobalLandTemperaturesByCityDF = filteredGlobalLandTemperaturesByCityDF.drop("dt","Longitude","Latitude")
    return replacedGlobalLandTemperaturesByCityDF

In [10]:
# Clean global temperature by city data step 3 groupby
def groupByGlobalLandTemperaturesByCity(replacedGlobalLandTemperaturesByCityDF):
    groupbyGlobalLandTemperaturesByCityDF = replacedGlobalLandTemperaturesByCityDF \
    .groupby("month","Country","City","absLongitude","absLatitude").mean("AverageTemperature","AverageTemperatureUncertainty") \
    .withColumnRenamed("avg(AverageTemperature)","avg_temp") \
    .withColumnRenamed("avg(AverageTemperatureUncertainty)","avg_temp_uncertainty")
    return groupbyGlobalLandTemperaturesByCityDF

In [11]:
def joinAndCleanupAirportAndTempData(airportCodesDF,groupByGlobalLandTemperaturesByCityDF):
    tempsDF = groupByGlobalLandTemperaturesByCityDF
    # Left join on city name so airports without temperature data are kept
    joinedDF = airportCodesDF.join(tempsDF, airportCodesDF.municipality == tempsDF.City, how='left')
    # City names repeat across states (e.g. Toledo): keep a matched temperature row only when its
    # coordinates are within 2 degrees of the airport, and keep airports with no match at all
    joinedDF = joinedDF.filter(
        joinedDF.absLongitude.isNull() |
        ((abs(joinedDF.absLongitude - joinedDF.Longitude) < 2.0) &
         (abs(joinedDF.absLatitude - joinedDF.Latitude) < 2.0)))
    # Drop unneeded columns and rename to the target table's column names
    i94port_cities_mon_avg_tempsDF = joinedDF \
        .drop("type","elevation_ft","gps_code","local_code","Longitude","Latitude","Country","City","absLongitude","absLatitude") \
        .withColumnRenamed("iso_region","state") \
        .withColumnRenamed("municipality","city") \
        .withColumnRenamed("iata_code","i94prtl")
    return i94port_cities_mon_avg_tempsDF

In [47]:
# Quality check: confirm that the given table exists
def checkTableExists(table_name):
    list_of_tables = sparkSession.sql("SHOW TABLES")
    return list_of_tables.filter(list_of_tables.tableName == table_name).count() == 1

In [76]:
# Quality check: confirm a table has the expected number of rows
def checkTableRowCountMatches(table_name,expected_count):
    query = f"SELECT COUNT(*) AS actual_count FROM {table_name}"
    actual_count = sparkSession.sql(query).first()["actual_count"]
    return actual_count == expected_count

In [None]:
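# Quality check: confirm a column has the expected number of distinct non-null values
# (SELECT DISTINCT would otherwise count NULL as one more value)
def checkDistinctCountByColumnMatches(tablename,columnname,expected_count):
    query = f"SELECT DISTINCT {columnname} FROM {tablename} WHERE {columnname} IS NOT NULL"
    return spark.sql(query).count() == expected_count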

#### Explore the Data 

##### Explore I94 Travel Data

In [18]:
# Get spark session
sparkSession = getSparkSession()

In [19]:
# Read a random month for exploration
I94TravelDataDF = readSAS7BDATtoSparkDF(sparkSession,'../../data/18-83510-I94-Data-2016/i94_nov16_sub.sas7bdat')

In [20]:
# Explore records and compare with provided I94_SAS_Labels_Descriptions.SAS
I94TravelDataDF.filter(I94TravelDataDF.i94port == "ATL").filter(I94TravelDataDF.i94addr == "FL").filter(I94TravelDataDF.airline == "DL").filter(I94TravelDataDF.depdate == 20852.0).show()

+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+---------------+-----+--------+
|    cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|         admnum|fltno|visatype|
+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+---------------+-----+--------+
|   2347.0|2016.0|  11.0| 577.0| 577.0|    ATL|20759.0|    1.0|     FL|20852.0|  36.0|    1.0|  1.0|20161101|     GTM| null|      T|      I|   null|      M| 1980.0|04302017|     M|  null|     DL|1.2928767685E10|  906|      B1|
|2462421.0|2016.0|  11.0| 135.0| 135.0|    ATL|20772.0|    1.0|     FL|20852.0|  33.0|    2.

##### Explore City Demographics Data

In [21]:
# Get spark session
sparkSession = getSparkSession()

In [22]:
# Read city demographics data
cityDemographicsDataDF = readCSVtoSparkDF(sparkSession,"us-cities-demographics.csv",';',True,True)

In [23]:
# Explore a random city (Toledo)
cityDemographicsDataDF.filter(cityDemographicsDataDF.City == "Toledo").show()

+------+-----+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+------+
|  City|State|Median Age|Male Population|Female Population|Total Population|Number of Veterans|Foreign-born|Average Household Size|State Code|                Race| Count|
+------+-----+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+------+
|Toledo| Ohio|      36.1|         135455|           144323|          279778|             15286|        9257|                  2.29|        OH|American Indian a...|  3942|
|Toledo| Ohio|      36.1|         135455|           144323|          279778|             15286|        9257|                  2.29|        OH|  Hispanic or Latino| 23614|
|Toledo| Ohio|      36.1|         135455|           144323|          279778|             15286|        9257|                  2.29|        OH|   

##### Explore Airport Codes data

In [24]:
# Get spark session
sparkSession = getSparkSession()

In [25]:
# Read airport codes data
airportCodesDF = readCSVtoSparkDF(sparkSession,"airport-codes_csv.csv",',',True,True)

In [26]:
# Sample a region's airports
airportCodesDF.filter(airportCodesDF.iso_region == "US-GA").filter(airportCodesDF.type == "large_airport").show()

+-----+-------------+--------------------+------------+---------+-----------+----------+-------------+--------+---------+----------+--------------------+
|ident|         type|                name|elevation_ft|continent|iso_country|iso_region| municipality|gps_code|iata_code|local_code|         coordinates|
+-----+-------------+--------------------+------------+---------+-----------+----------+-------------+--------+---------+----------+--------------------+
| KAGS|large_airport|Augusta Regional ...|         144|       NA|         US|     US-GA|      Augusta|    KAGS|      AGS|       AGS|-81.9645004272461...|
| KATL|large_airport|Hartsfield Jackso...|        1026|       NA|         US|     US-GA|      Atlanta|    KATL|      ATL|       ATL| -84.428101, 33.6367|
| KMGE|large_airport|Dobbins Air Reser...|        1068|       NA|         US|     US-GA|     Marietta|    KMGE|      MGE|       MGE|-84.51629639, 33....|
| KSAV|large_airport|Savannah Hilton H...|          50|       NA|         US

##### Explore City Temperature Data

In [27]:
# Get spark session
sparkSession = getSparkSession()

In [28]:
# Read global land temperatures by city data
globalLandTemperaturesByCityDF = readCSVtoSparkDF(sparkSession,'../../data2/GlobalLandTemperaturesByCity.csv',',',True,True)

In [29]:
#Sample data
globalLandTemperaturesByCityDF.show()

+-------------------+-------------------+-----------------------------+-----+-------+--------+---------+
|                 dt| AverageTemperature|AverageTemperatureUncertainty| City|Country|Latitude|Longitude|
+-------------------+-------------------+-----------------------------+-----+-------+--------+---------+
|1743-11-01 00:00:00|              6.068|           1.7369999999999999|Århus|Denmark|  57.05N|   10.33E|
|1743-12-01 00:00:00|               null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-01-01 00:00:00|               null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-02-01 00:00:00|               null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-03-01 00:00:00|               null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-04-01 00:00:00| 5.7879999999999985|           3.6239999999999997|Århus|Denmark|  57.05N|   10.33E|
|1744-05-01 00:00:00|             10.644|           1.2

### Notes:

1.  The airport data includes both the city and the state, but the weather data only includes the city, so the state of each weather record is unknown. Joining the two on city name alone gives ambiguous results, because some city names (for example, Toledo) appear in more than one state.
The only other information available to match an airport's city with its weather is the geographic coordinates. These are formatted differently in the two data sets (a signed "longitude, latitude" string in the airport codes versus values with an N/S/E/W suffix, such as 57.05N, in the weather data), so they need reformatting before they can be used in a join.

2.  There is a lot of missing data. For the weather data, any record missing a temperature is removed.
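The coordinate alignment in note 1 can be sketched in plain Python, without Spark. This is an illustration under stated assumptions: the function names are hypothetical, the 2-degree tolerance mirrors joinAndCleanupAirportAndTempData, and the raw string "83.24W" is inferred from the parsed Columbus values (-83.24, 39.38) in the temperature sample. Unlike replaceGlobalLandTemperaturesByCity, which only handles N and W, this sketch also handles S and E.

```python
def parse_temperature_coordinate(value):
    """Convert a weather-data coordinate such as '57.05N' or '83.24W' to signed degrees."""
    degrees, hemisphere = float(value[:-1]), value[-1].upper()
    # South and West become negative, matching the signed convention of the
    # airport codes data (e.g. ATL is "-84.428101, 33.6367").
    return -degrees if hemisphere in ("S", "W") else degrees


def within_tolerance(airport_lon, airport_lat, temp_lon, temp_lat, tolerance=2.0):
    """True when both coordinates differ by less than `tolerance` degrees."""
    # Written without abs() so it still works if a Spark abs shadows the built-in.
    return (-tolerance < airport_lon - temp_lon < tolerance
            and -tolerance < airport_lat - temp_lat < tolerance)


# ATL from the airport codes sample versus the Columbus temperature record.
atl_lon, atl_lat = -84.428101, 33.6367
columbus_lon = parse_temperature_coordinate("83.24W")
columbus_lat = parse_temperature_coordinate("39.38N")
print(within_tolerance(atl_lon, atl_lat, columbus_lon, columbus_lat))  # False
```

The longitudes are close (about 1.2 degrees apart) but the latitudes differ by more than 5 degrees, so this temperature record would not be attached to ATL.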

#### Cleaning Steps

##### Steps to clean the I94 Travel Data
The main step here is to identify the needed columns; many of the missing values are in columns our data model does not need. The data is then grouped by year, month, airport and address, and the records in each group are counted.
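The aggregation that cleanI94TravelData performs with groupBy(...).count() can be sketched in plain Python with collections.Counter. The helper name aggregate_i94 is hypothetical; the two ATL/FL records come from the November exploration sample, and the third record is hypothetical. Like Spark's groupBy, the sketch keeps a missing key as its own group, which is why the fact table sample contains a row such as VCV with a null i94addr.

```python
from collections import Counter

# Grouping keys used by cleanI94TravelData.
KEY_COLUMNS = ("i94yr", "i94mon", "i94port", "i94addr")


def aggregate_i94(records):
    """Count records per (year, month, port, address); a missing value groups as None."""
    return Counter(tuple(record.get(column) for column in KEY_COLUMNS) for record in records)


records = [
    # The two ATL/FL rows from the November sample (other columns omitted).
    {"cicid": 2347.0, "i94yr": 2016.0, "i94mon": 11.0, "i94port": "ATL", "i94addr": "FL"},
    {"cicid": 2462421.0, "i94yr": 2016.0, "i94mon": 11.0, "i94port": "ATL", "i94addr": "FL"},
    # Hypothetical record without an address, to show that nulls form their own group.
    {"i94yr": 2016.0, "i94mon": 11.0, "i94port": "VCV", "i94addr": None},
]
print(aggregate_i94(records))
# Counter({(2016.0, 11.0, 'ATL', 'FL'): 2, (2016.0, 11.0, 'VCV', None): 1})
```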

In [30]:
#Clean,De-dup,aggregate I94TravelData
cleanedI94TravelDataDF = cleanI94TravelData(I94TravelDataDF)

In [31]:
# validate the needed columns are present
cleanedI94TravelDataDF.filter(col("i94port") == "DET").filter(col("i94addr") == "RI").show()

+------+------+-------+-------+-----+
| i94yr|i94mon|i94port|i94addr|count|
+------+------+-------+-------+-----+
|2016.0|  11.0|    DET|     RI|   33|
+------+------+-------+-------+-----+



##### Clean City Demographics Data

City demographics data is denormalized and has many duplicates: each city appears once per race category. The cleaning steps are:
1. Identify the fields our data model needs (state code and the male, female, total, veteran and foreign-born populations).
2. Drop the unneeded fields, then drop duplicates.
3. Finally, group by state code and sum the populations.
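The three steps above can be sketched in plain Python to show why de-duplication must come before summing. The function name state_totals is hypothetical; the Toledo figures come from the exploration sample, and the second race label is a placeholder because the sample output truncates it. Because City is dropped before de-duplicating (as in cleanCityDemographicsData), two different cities in the same state with identical figures would also collapse into one row; this sketch reproduces that behavior.

```python
DROPPED = ("Race", "Count", "Average Household Size", "Median Age", "City", "State")
SUMMED = ("Male Population", "Female Population", "Total Population",
          "Number of Veterans", "Foreign-born")


def state_totals(rows):
    """Drop per-race/per-city fields, de-duplicate, then sum populations by State Code."""
    # Steps 1-2: keep only the needed fields and remove exact duplicates.
    unique_rows = {
        tuple(sorted((key, value) for key, value in row.items() if key not in DROPPED))
        for row in rows
    }
    # Step 3: group by state code and sum each population column.
    totals = {}
    for items in unique_rows:
        row = dict(items)
        bucket = totals.setdefault(row["State Code"], dict.fromkeys(SUMMED, 0))
        for column in SUMMED:
            bucket[column] += row[column]
    return totals


toledo = {"City": "Toledo", "State": "Ohio", "Median Age": 36.1,
          "Male Population": 135455, "Female Population": 144323,
          "Total Population": 279778, "Number of Veterans": 15286,
          "Foreign-born": 9257, "Average Household Size": 2.29, "State Code": "OH"}
rows = [dict(toledo, Race="Hispanic or Latino", Count=23614),
        dict(toledo, Race="(another race category)", Count=3942)]
print(state_totals(rows)["OH"]["Total Population"])  # 279778, not 2 * 279778
```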

In [32]:
# Clean city demographics data
cleanedCityDemographicsDataDF = cleanCityDemographicsData(cityDemographicsDataDF)

In [33]:
# Sample the data
cleanedCityDemographicsDataDF.filter(cleanedCityDemographicsDataDF.state_code == "AZ").show()

+----------+---------------+-----------------+----------------+-----------+------------+
|state_code|male_population|female_population|total_population|num_of_vets|foreign_born|
+----------+---------------+-----------------+----------------+-----------+------------+
|        AZ|        2227455|          2272087|         4499542|     264505|      682313|
+----------+---------------+-----------------+----------------+-----------+------------+



##### Clean Airport Codes Data

As noted during exploration, this data contains non-US airports as well as non-airports such as heliports. Clean it up by:
1. Restricting to US airports, as we are only interested in I94 departures from the US.
2. Keeping only rows with an iata_code, since that is the code available for the departure port. Rows without it cannot be linked to travel data and are noise for our data model.
3. Splitting the coordinates column into numeric Longitude and Latitude columns for the temperature join.
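The same row-level rules as cleanAirportCodes can be sketched in plain Python for a single record. The function name clean_airport is hypothetical; the ATL values come from the Georgia sample above (the truncated name column is omitted), and the heliport row is hypothetical.

```python
DROPPED_COLUMNS = ("continent", "iso_country", "coordinates", "ident")


def clean_airport(row):
    """Return a cleaned copy of one airport row, or None if the row is filtered out."""
    # Keep US rows that have an IATA code. Empty strings count as missing here,
    # whereas Spark's dropna only removes nulls.
    if row.get("iso_country") != "US" or not row.get("iata_code"):
        return None
    # 'coordinates' is "longitude, latitude"; float() tolerates the space after the comma.
    longitude, latitude = (float(part) for part in row["coordinates"].split(","))
    cleaned = {key: value for key, value in row.items() if key not in DROPPED_COLUMNS}
    cleaned["Longitude"], cleaned["Latitude"] = longitude, latitude
    return cleaned


atl = {"ident": "KATL", "type": "large_airport", "elevation_ft": 1026,
       "continent": "NA", "iso_country": "US", "iso_region": "US-GA",
       "municipality": "Atlanta", "gps_code": "KATL", "iata_code": "ATL",
       "local_code": "ATL", "coordinates": "-84.428101, 33.6367"}
# Hypothetical heliport row without an IATA code.
heliport = dict(atl, ident="XHEL", type="heliport", iata_code=None)

print(clean_airport(atl)["Latitude"])  # 33.6367
print(clean_airport(heliport))         # None
```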

In [37]:
# Clean airport codes data
cleanedAirportCodesDF = cleanAirportCodes(airportCodesDF)

In [38]:
# Sample airport
cleanedAirportCodesDF.filter(cleanedAirportCodesDF.iata_code == "HOU").show()

+-------------+--------------------+------------+----------+------------+--------+---------+----------+------------+-----------+
|         type|                name|elevation_ft|iso_region|municipality|gps_code|iata_code|local_code|   Longitude|   Latitude|
+-------------+--------------------+------------+----------+------------+--------+---------+----------+------------+-----------+
|large_airport|William P Hobby A...|          46|     US-TX|     Houston|    KHOU|      HOU|       HOU|-95.27890015|29.64539909|
+-------------+--------------------+------------+----------+------------+--------+---------+----------+------------+-----------+



##### Clean City Temperature Data
We are only interested in the United States. The cleanup steps are:
1. Filter and keep only United States data.
2. Drop any records with null temperatures, as they are meaningless in this context.
3. Extract the month into its own column, since the data model groups by month.
4. Convert the Latitude and Longitude strings (e.g. 39.38N, 83.24W) into signed numbers for the join with airport coordinates.
5. To handle duplicates (many years of readings per city and month), take the average of the temperatures and of the temperature uncertainty.
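The filtering, month extraction and averaging can be sketched in plain Python. The function name monthly_city_averages and the reading helper are hypothetical. The rows are the Århus readings from the sample output; since that sample has no US rows, the example also calls the function with country="Denmark" to show the averaging itself.

```python
from collections import defaultdict
from statistics import mean


def monthly_city_averages(rows, country="United States"):
    """Mean temperature and uncertainty per (City, Latitude, Longitude, month)."""
    grouped = defaultdict(list)
    for row in rows:
        # Keep one country and drop readings with a missing measurement.
        if row["Country"] != country:
            continue
        if row["AverageTemperature"] is None or row["AverageTemperatureUncertainty"] is None:
            continue
        # 'dt' looks like '1743-11-01 00:00:00'; characters 5-6 hold the month.
        reading_month = int(row["dt"][5:7])
        key = (row["City"], row["Latitude"], row["Longitude"], reading_month)
        grouped[key].append((row["AverageTemperature"], row["AverageTemperatureUncertainty"]))
    # Collapse all years of the same city and month into one averaged row.
    return {key: (mean(t for t, _ in values), mean(u for _, u in values))
            for key, values in grouped.items()}


def reading(dt, temperature, uncertainty):
    """One Århus row from the GlobalLandTemperaturesByCity.csv sample."""
    return {"dt": dt, "AverageTemperature": temperature,
            "AverageTemperatureUncertainty": uncertainty, "City": "Århus",
            "Country": "Denmark", "Latitude": "57.05N", "Longitude": "10.33E"}


rows = [reading("1743-11-01 00:00:00", 6.068, 1.7369999999999999),
        reading("1743-12-01 00:00:00", None, None),
        reading("1744-04-01 00:00:00", 5.7879999999999985, 3.6239999999999997)]

print(monthly_city_averages(rows))                     # {} (no US rows)
print(monthly_city_averages(rows, country="Denmark"))  # months 11 and 4; null row dropped
```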

In [39]:
# Filter, replace columns and finally groupby
filteredGlobalLandTemperaturesByCityDF = filterGlobalTemperaturesByCityDF(globalLandTemperaturesByCityDF)
replacedGlobalLandTemperaturesByCityDF = replaceGlobalLandTemperaturesByCity(filteredGlobalLandTemperaturesByCityDF)
groupByGlobalLandTemperaturesByCityDF = groupByGlobalLandTemperaturesByCity(replacedGlobalLandTemperaturesByCityDF)

In [40]:
# Sample the city temperature aggregation
groupByGlobalLandTemperaturesByCityDF.show()

+-----+-------------+------------+------------+-----------+--------------------+--------------------+
|month|      Country|        City|absLongitude|absLatitude|            avg_temp|avg_temp_uncertainty|
+-----+-------------+------------+------------+-----------+--------------------+--------------------+
|    1|United States|  Alexandria|      -76.99|      39.38|-0.35399619771863117|  1.7332661596958165|
|    7|United States| Chula Vista|     -117.77|      32.95|  19.191678787878775|  0.8804181818181819|
|    5|United States|    Columbus|      -83.24|      39.38|  16.823312499999993|  1.2040703124999985|
|   10|United States|    Columbus|      -83.24|      39.38|  11.967081081081082|   1.280857142857143|
|    3|United States|     Concord|     -122.03|      37.78|  11.944357575757572|  0.5757030303030299|
|    3|United States|      Dayton|      -83.24|      39.38|   4.976736641221376|  1.4362671755725183|
|   12|United States|Fort Collins|     -104.38|      40.99|  -2.595175000000001|  

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
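The conceptual model is a star schema. The fact table df_i94_aggregates holds I94 record counts by year, month, port and state of address. It is intended to join to state_demographics through i94addr = state_code, and to i94port_cities_mon_avg_temps through i94port = i94prtl and i94mon = month. A star schema keeps each analytical query to at most one join per dimension.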

![title](data_model_i94_analytics.JPG)

#### 3.2 Mapping Out Data Pipelines
Steps to pipeline the data into the data model:

1. Drop and re-create the Spark tables needed, using Hive.
2. Read the us-cities demographics data.
3. Clean the us-cities demographics data.
4. Aggregate the us-cities demographics data by state.
5. Load the us-cities demographics data into the state_demographics table.

6. Read the airport codes data.
7. Clean up the airport data.
8. Align the coordinates data for joining.

9. Read the global cities temperature data.
10. Clean up and aggregate the temperature data.
11. Align the coordinates data for joining.

12. Perform a left join of the airport codes data and the cities temperature data on city name, keeping a temperature match only when its coordinates lie within 2 degrees of the airport. The left join ensures airport data is retained even if no temperature data is available for that city.
13. Load this data into the i94port_cities_mon_avg_temps table.

14. Read and clean up the I94 travel data.
15. Drop unneeded columns and group by year, month, airport and address.
16. Load the fact table df_i94_aggregates.

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [12]:
# Get spark session
spark = getSparkSession()

In [13]:
# Drop tables if they exist
spark.sql("DROP TABLE IF EXISTS state_demographics")
spark.sql("DROP TABLE IF EXISTS df_i94_aggregates")
spark.sql("DROP TABLE IF EXISTS i94port_cities_mon_avg_temps")


DataFrame[]

In [14]:
# Define tables
create_state_demographics_sql = """CREATE TABLE IF NOT EXISTS state_demographics (
                                      state_code STRING, 
                                      male_population BIGINT, 
                                      female_population BIGINT, 
                                      total_population BIGINT, 
                                      num_of_vets BIGINT, 
                                      foreign_born BIGINT) USING hive"""

create_df_i94_aggregates_sql = """CREATE TABLE IF NOT EXISTS df_i94_aggregates (
                                     i94yr DOUBLE, 
                                     i94mon DOUBLE, 
                                     i94port STRING, 
                                     i94addr STRING, 
                                     count BIGINT) USING hive"""

create_i94port_cities_mon_avg_temps_sql = """CREATE TABLE IF NOT EXISTS i94port_cities_mon_avg_temps(
                                            name STRING,
                                            state STRING,
                                            city STRING,
                                            i94prtl STRING,
                                            month BIGINT,
                                            avg_temp FLOAT,
                                            avg_temp_uncertainty FLOAT) USING hive"""

In [15]:
# Create tables
spark.sql(create_state_demographics_sql)
spark.sql(create_df_i94_aggregates_sql)
spark.sql(create_i94port_cities_mon_avg_temps_sql)

DataFrame[]

In [16]:
# ETL for state demographics data
# Get spark session
sparkSession = getSparkSession()
# Read, transform and load city demographics data
cityDemographicsDataDF = readCSVtoSparkDF(sparkSession,"us-cities-demographics.csv",';',True,True)
cleanedCityDemographicsDataDF = cleanCityDemographicsData(cityDemographicsDataDF)
cleanedCityDemographicsDataDF.createOrReplaceTempView("state_demographics_tmp")
spark.sql("INSERT INTO state_demographics SELECT * FROM state_demographics_tmp")

DataFrame[]

In [17]:
# ETL for i94port_cities_mon_avg_temps
# Get spark session
sparkSession = getSparkSession()
# Read airport codes data
airportCodesDF = readCSVtoSparkDF(sparkSession,"airport-codes_csv.csv",',',True,True)
# Read global temperatures by city data
globalLandTemperaturesByCityDF = readCSVtoSparkDF(sparkSession,'../../data2/GlobalLandTemperaturesByCity.csv',',',True,True)
# Clean airport codes data
cleanedAirportCodesDF = cleanAirportCodes(airportCodesDF)
# Clean global temperatures by city data
filteredGlobalLandTemperaturesByCityDF = filterGlobalTemperaturesByCityDF(globalLandTemperaturesByCityDF)
replacedGlobalLandTemperaturesByCityDF = replaceGlobalLandTemperaturesByCity(filteredGlobalLandTemperaturesByCityDF)
groupByGlobalLandTemperaturesByCityDF = groupByGlobalLandTemperaturesByCity(replacedGlobalLandTemperaturesByCityDF)
# Join airport and temperature data and further cleanup
i95port_cities_mon_avg_tempsDF = joinAndCleanupAirportAndTempData(cleanedAirportCodesDF,groupByGlobalLandTemperaturesByCityDF)
# load
i95port_cities_mon_avg_tempsDF.createOrReplaceTempView("i94port_cities_mon_avg_temps_tmpview")
spark.sql("INSERT INTO i94port_cities_mon_avg_temps SELECT * FROM i94port_cities_mon_avg_temps_tmpview")

DataFrame[]

In [21]:
# ETL for df_i94_aggregates
# Get spark session
sparkSession = getSparkSession()
# List of data files
fnames = [
    '../../data/18-83510-I94-Data-2016/i94_jan16_sub.sas7bdat',
    '../../data/18-83510-I94-Data-2016/i94_feb16_sub.sas7bdat',
    '../../data/18-83510-I94-Data-2016/i94_mar16_sub.sas7bdat',
    '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat',
    '../../data/18-83510-I94-Data-2016/i94_may16_sub.sas7bdat',
    '../../data/18-83510-I94-Data-2016/i94_jun16_sub.sas7bdat',
    '../../data/18-83510-I94-Data-2016/i94_jul16_sub.sas7bdat',
    '../../data/18-83510-I94-Data-2016/i94_aug16_sub.sas7bdat',
    '../../data/18-83510-I94-Data-2016/i94_sep16_sub.sas7bdat',
    '../../data/18-83510-I94-Data-2016/i94_oct16_sub.sas7bdat',
    '../../data/18-83510-I94-Data-2016/i94_nov16_sub.sas7bdat',
    '../../data/18-83510-I94-Data-2016/i94_dec16_sub.sas7bdat'
]
# Read, Transform and Load
for fname in fnames:
    I94TravelDataDF = readSAS7BDATtoSparkDF(sparkSession,fname)
    cleanedI94TravelDataDF = cleanI94TravelData(I94TravelDataDF)
    cleanedI94TravelDataDF.createOrReplaceTempView("i94tempTable")
    spark.sql("INSERT INTO df_i94_aggregates SELECT * FROM i94tempTable")
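
The twelve file names above follow one pattern (i94_<month>16_sub.sas7bdat), so the list can be generated instead of typed by hand. This is a minimal sketch assuming every month follows that pattern; DATA_DIR and MONTHS are hypothetical names.

```python
DATA_DIR = "../../data/18-83510-I94-Data-2016"
MONTHS = ("jan", "feb", "mar", "apr", "may", "jun",
          "jul", "aug", "sep", "oct", "nov", "dec")

# Same twelve paths as the hand-written list, built from the naming pattern.
fnames = [f"{DATA_DIR}/i94_{mon}16_sub.sas7bdat" for mon in MONTHS]
print(fnames[0])  # ../../data/18-83510-I94-Data-2016/i94_jan16_sub.sas7bdat
```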

In [22]:
#Sample the data
spark.sql("SELECT * FROM state_demographics").show()

+----------+---------------+-----------------+----------------+-----------+------------+
|state_code|male_population|female_population|total_population|num_of_vets|foreign_born|
+----------+---------------+-----------------+----------------+-----------+------------+
|        IL|        2218541|          2343771|         4562312|     146701|      941735|
|        ME|          31480|            35392|           66872|       3666|        9229|
|        DE|          32680|            39277|           71957|       3063|        3336|
|        OH|        1177546|          1256143|         2433689|     127372|      175219|
|        LA|         626998|           673597|         1300595|      69771|       83419|
|        NM|         409010|           430032|          839042|      60474|       89112|
|        IA|         361176|           372635|          733811|      39898|       63687|
|        OK|         714573|           734422|         1448995|      95468|      151174|
|        AL|         

In [23]:
#Sample the data
spark.sql("SELECT * FROM i94port_cities_mon_avg_temps").show()

+--------------------+-----+------------+-------+-----+----------+--------------------+
|                name|state|        city|i94prtl|month|  avg_temp|avg_temp_uncertainty|
+--------------------+-----+------------+-------+-----+----------+--------------------+
|Jacksonville Exec...|US-FL|Jacksonville|    CRG|   10| 22.314194|           1.2847413|
|Jacksonville Exec...|US-FL|Jacksonville|    CRG|    3|   17.5267|           1.4525381|
|Jacksonville Exec...|US-FL|Jacksonville|    CRG|    8| 26.977701|              1.1355|
|Jacksonville Exec...|US-FL|Jacksonville|    CRG|   12|15.2025175|           1.6224138|
|Jacksonville Exec...|US-FL|Jacksonville|    CRG|   11|   18.5359|           1.4591423|
|Jacksonville Exec...|US-FL|Jacksonville|    CRG|    7| 26.932032|           1.2254186|
|Jacksonville Exec...|US-FL|Jacksonville|    CRG|    2| 15.430149|            1.696229|
|Jacksonville Exec...|US-FL|Jacksonville|    CRG|    5|  23.44645|           1.1750898|
|Jacksonville Exec...|US-FL|Jack

In [24]:
#Sample the data
spark.sql("SELECT * FROM df_i94_aggregates").show()

+------+------+-------+-------+-----+
| i94yr|i94mon|i94port|i94addr|count|
+------+------+-------+-------+-----+
|2016.0|   8.0|    SPM|     MN| 8026|
|2016.0|   8.0|    SAI|     NH|   35|
|2016.0|   8.0|    ELP|     TX|  110|
|2016.0|   8.0|    ATL|     NY| 4583|
|2016.0|   8.0|    BOS|     OH|  842|
|2016.0|   8.0|    SEA|     DE|  166|
|2016.0|   8.0|    DAL|     TR|   39|
|2016.0|   8.0|    DEN|     TN|   28|
|2016.0|   8.0|    XXX|     VA|   36|
|2016.0|   8.0|    ATL|     SO|   54|
|2016.0|   8.0|    DEN|     AK|   32|
|2016.0|   8.0|    ORL|     UN|   53|
|2016.0|   8.0|    VCV|   null| 1038|
|2016.0|   8.0|    POO|     PA|   18|
|2016.0|   8.0|    FTL|     WV|   20|
|2016.0|   8.0|    JAC|     GA|    3|
|2016.0|   8.0|    MAA|     RI|    1|
|2016.0|   8.0|    MAF|     WI|    1|
|2016.0|   8.0|    HIG|     PR|    3|
|2016.0|   8.0|    CLE|     GA|    1|
+------+------+-------+-------+-----+
only showing top 20 rows



#### 4.2 Data Quality Checks
Data quality checks confirm that the pipeline ran as expected. These include:
- Sampling each table to verify that actual data was loaded.
- Counting rows to verify approximate expected row counts.
- Joining tables to get a complete record for sample cities.
- Validating that the complete record is unique per month, for completeness.
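Running each check by hand makes a failure easy to miss. A small runner can collect results in one place; this is a sketch, and run_quality_checks is a hypothetical helper. The stand-in lambdas below would, in the notebook, wrap calls such as checkTableExists or checkTableRowCountMatches.

```python
def run_quality_checks(checks):
    """Run (name, zero-argument callable) pairs and return the names that failed."""
    failures = []
    for name, check in checks:
        try:
            passed = bool(check())
        except Exception as error:  # a check that crashes counts as failed
            print(f"{name}: raised {error!r}")
            passed = False
        print(f"{name}: {'PASS' if passed else 'FAIL'}")
        if not passed:
            failures.append(name)
    return failures


# Stand-in checks; in the notebook these could be, for example,
# lambda: checkTableExists("state_demographics") or
# lambda: checkTableRowCountMatches("state_demographics", 49).
failures = run_quality_checks([
    ("table exists", lambda: True),
    ("row count", lambda: False),
    ("broken check", lambda: 1 / 0),
])
print(failures)  # ['row count', 'broken check']
```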
 
Run Quality Checks

In [48]:
#Check existence of the table
print (checkTableExists("state_demographics"))

True


In [50]:
#Check existence of the table
print (checkTableExists("i94port_cities_mon_avg_temps"))

True


In [51]:
#Check existence of the table
print (checkTableExists("df_i94_aggregates"))

True


In [75]:
#Check row count matches for a given table
print (checkTableRowCountMatches("state_demographics",49))

True


In [90]:
# Check that we have all 12 months of data
print (checkDistinctCountByColumnMatches("i94port_cities_mon_avg_temps","month",12))

True


In [91]:
# Check that we have all 12 months of data
print (checkDistinctCountByColumnMatches("df_i94_aggregates","i94mon",12))

True


#### 4.3 Data dictionary 

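#### Table: df_i94_aggregates

| Column | Type | Description |
|---|---|---|
| i94yr | DOUBLE | Four-digit year of the I94 record (e.g. 2016.0) |
| i94mon | DOUBLE | Month number, 1 to 12 |
| i94port | STRING | I94 port code (e.g. ATL) |
| i94addr | STRING | State of the immigrant's U.S. address; some values, such as UN or null, are not state codes |
| count | BIGINT | Number of I94 records for that year, month, port and address |

#### Table: state_demographics

| Column | Type | Description |
|---|---|---|
| state_code | STRING | Two-letter state code |
| male_population | BIGINT | Sum of the male population of the state's cities listed in us-cities-demographics.csv |
| female_population | BIGINT | Sum of the female population of those cities |
| total_population | BIGINT | Sum of the total population of those cities |
| num_of_vets | BIGINT | Sum of the number of veterans in those cities |
| foreign_born | BIGINT | Sum of the foreign-born population of those cities |

#### Table: i94port_cities_mon_avg_temps

| Column | Type | Description |
|---|---|---|
| name | STRING | Airport name |
| state | STRING | ISO region of the airport (e.g. US-FL) |
| city | STRING | Municipality of the airport |
| i94prtl | STRING | Airport IATA code |
| month | BIGINT | Month number, 1 to 12 |
| avg_temp | FLOAT | Mean of the monthly average temperatures for that city and month across all years in GlobalLandTemperaturesByCity.csv, in degrees Celsius |
| avg_temp_uncertainty | FLOAT | Mean of the corresponding AverageTemperatureUncertainty values |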

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
    * The data was increased by 100x.
    * The data populates a dashboard that must be updated on a daily basis by 7am every day.
    * The database needed to be accessed by 100+ people.

#### Tools
For data exploration, Python pandas was sufficient and efficient for small data sets such as the airport codes. For larger data sets requiring many group-by and analytical queries, Apache Spark is the appropriate tool.

For the global temperature data, Apache Spark is needed because of its volume and the amount of work required to align it with the cities data.

Enabling Hive support in Spark allowed us to create persistent tables and run SQL directly against Spark DataFrames registered as temporary views.


#### Frequency of Update
The fact table should be updated at least monthly, to receive each new month of I94 data.
The city demographics data should be updated as often as a census is taken or other trusted population estimates are received.
The temperature data can be updated every few years.
The airport data should be updated every few years as well, or whenever a new file is delivered.


#### Preparing for 100x data growth
In case of 100x growth in the amount of data, we would run Spark on a cluster with 12 workers and partition the data by month, so that each month's data can be processed by a different worker. The data is cleanly delineated by month with minimal cross-referencing, so this improves efficiency and keeps processing within the memory available on each worker.
If a larger cluster is needed, we can further partition the data by month and by airport. This takes advantage of Spark clusters with more than 12 workers.
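Partitioning by month, then by month and airport, can be pictured as a Hive-style directory layout with one column=value level per partition column; Spark's DataFrameWriter.partitionBy (for example, cleanedI94TravelDataDF.write.partitionBy("i94mon", "i94port").parquet(path)) writes output in that layout. The plain-Python sketch below only counts how many partitions each scheme yields; partition_path, partition_sizes and the ROOT location are hypothetical, and the rows are a few August records from the df_i94_aggregates sample.

```python
from collections import defaultdict


def partition_path(root, record, partition_columns):
    """Hive-style partition directory, e.g. root/i94mon=8.0/i94port=ATL."""
    return "/".join([root, *(f"{column}={record[column]}" for column in partition_columns)])


def partition_sizes(records, root, partition_columns):
    """Number of records that would land in each partition directory."""
    sizes = defaultdict(int)
    for record in records:
        sizes[partition_path(root, record, partition_columns)] += 1
    return dict(sizes)


ROOT = "/data/df_i94_aggregates"  # hypothetical output location
records = [
    {"i94mon": 8.0, "i94port": "ATL", "i94addr": "NY"},
    {"i94mon": 8.0, "i94port": "ATL", "i94addr": "SO"},
    {"i94mon": 8.0, "i94port": "DEN", "i94addr": "TN"},
    {"i94mon": 8.0, "i94port": "DEN", "i94addr": "AK"},
]
print(partition_sizes(records, ROOT, ["i94mon"]))             # one partition
print(partition_sizes(records, ROOT, ["i94mon", "i94port"]))  # two partitions
```

Adding the airport level multiplies the number of independent units of work, which is what lets a cluster with more than 12 workers stay busy.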


#### Updating Dashboard by 7am
I propose we implement the ETL in Apache Airflow because of the 7am SLA. Airflow retries failed tasks, which improves the likelihood of completing by 7am. It also makes it easy to run independent tasks in parallel, and it can send alerts when a task fails and the pipeline cannot continue.
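Retries only help if the run starts early enough for them to finish. The sketch below works out the latest start time that still meets 7am in the worst case. retries, retry_delay and email_on_failure are real Airflow BaseOperator arguments that would be passed through DAG(..., default_args=default_args); their values here, the one-hour runtime and the latest_start helper are assumptions for illustration.

```python
from datetime import datetime, timedelta

# Real Airflow BaseOperator argument names; the values are assumptions.
default_args = {
    "retries": 2,
    "retry_delay": timedelta(minutes=15),
    "email_on_failure": True,
}


def latest_start(deadline, runtime, retries, retry_delay):
    """Latest start that still meets `deadline` if every attempt except the last fails."""
    attempts = retries + 1
    worst_case = runtime * attempts + retry_delay * retries
    return deadline - worst_case


# Hypothetical: the ETL takes about one hour per attempt.
start_by = latest_start(
    deadline=datetime(2016, 8, 1, 7, 0),
    runtime=timedelta(hours=1),
    retries=default_args["retries"],
    retry_delay=default_args["retry_delay"],
)
print(start_by.strftime("%H:%M"))  # 03:30
```

With two retries, three one-hour attempts plus two 15-minute delays take 3.5 hours, so the schedule would need to start the run by 3:30am.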

#### Database Accessed by 100+ People
For this we would write out the tables and load them into a data warehouse such as Amazon Redshift on AWS, which allows many analytical queries to run simultaneously.