# Project Title
### Data Engineering Capstone Project

#### Project Summary
This project gathers together data on immigration into the US, as well as demographics of the destination cities and the source countries, including mean 21st century temperature.

The data are processed using PySpark.

In [1]:
# Do all imports and installs here
import numpy as np
import os
import pandas as pd
import re

import pyspark.sql.functions as f

from datetime import timedelta
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import col, udf
from pyspark.sql.types import DateType, IntegerType, StringType, FloatType

In [2]:
# Create Spark session
spark = SparkSession.builder.\
        config("spark.jars.repositories", "https://repos.spark-packages.org/").\
        config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\
        enableHiveSupport().getOrCreate()

# Step 1: Import and prepare data
First of all, we will read in the data, and perform necessary filters and data transformations

## Immigration data
These data comes from the US National Tourism and Trade Office and details arriving immigrants to the US in the year 2016
These data are available [here](https://travel.trade.gov/research/reports/i94/historical/2016.html).

In [3]:
immigration_file = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'

staging_immigration = (spark.
                  read.
                  format('com.github.saurfang.sas.spark').
                  load(immigration_file))

In [4]:
staging_immigration.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

In [5]:
staging_immigration.limit(5).toPandas()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,6.0,2016.0,4.0,692.0,692.0,XXX,20573.0,,,,...,U,,1979.0,10282016,,,,1897628000.0,,B2
1,7.0,2016.0,4.0,254.0,276.0,ATL,20551.0,1.0,AL,,...,Y,,1991.0,D/S,M,,,3736796000.0,296.0,F1
2,15.0,2016.0,4.0,101.0,101.0,WAS,20545.0,1.0,MI,20691.0,...,,M,1961.0,09302016,M,,OS,666643200.0,93.0,B2
3,16.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,1988.0,09302016,,,AA,92468460000.0,199.0,B2
4,17.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,2012.0,09302016,,,AA,92468460000.0,199.0,B2


Of all these columns, those that we are interested in are:
  - i94cit
  - i94res
  - arrdate
  - depdate
  - i94mode
  - gender
  
`i94mode` will be used to filter to select only immigrants arriving by plane.

In [6]:
staging_immigration = (staging_immigration.
                       select("cicid", "i94yr", "i94mon", "i94res",
                              "i94port", "i94bir", "i94visa",
                              "gender","arrdate", "depdate", "i94mode"))

As we are interested in analysing arrival ports, we will drop any missing values

In [7]:
staging_immigration = staging_immigration.dropna(subset=["i94port"])

In [8]:
staging_immigration.groupBy("i94mode").count().show()

+-------+-------+
|i94mode|  count|
+-------+-------+
|   null|    239|
|    1.0|2994505|
|    3.0|  66660|
|    2.0|  26349|
|    9.0|   8560|
+-------+-------+



The key for these data is available in the file `I94_SAS_Labels_Descriptions.SAS`. 
We can use regular expressions to extract the key:value pairs

In [9]:
with open("I94_SAS_Labels_Descriptions.SAS") as sas_dict:
    sas_labels = [re.sub("[^a-zA-Z0-9,= ]*|[ {2+}]", "", x).split("=") for x in sas_dict.readlines()]

def sas_labels_df(sas_labels_subset):
    """
    This function takes a sub-range of the SAS labels text file and converts to a 
    dataframe
    """
    return pd.DataFrame(sas_labels_subset, columns = ["code", "key"]).set_index("code")

In [10]:
mode_codes = sas_labels_df(sas_labels[972:976])
mode_codes

Unnamed: 0_level_0,key
code,Unnamed: 1_level_1
1,Air
2,Sea
3,Land
9,Not reported


Therefore, we will filter the immigration data where `i94mode == 1`

In [11]:
staging_immigration = (staging_immigration.
                       filter("i94mode == 1"))

This filter is evaluated lazily, so is only performed once necessary.

Next we need to convert the date columns `arrdate` and `depdate` into a usable format.
SAS stores dates as days since 1/1/1960 which we will need to convert first. We use a 
user-defined function, 'udf', to allow this function to be applied directly on the Spark 
dataframe.

In [12]:
def sas_date_to_timestamp(x):
    """
    Converts a value x in SAS date format to 
    a standard Python date
    SAS date formats are integers recorded in 
    days since January 1st 1960
    """
    if x is None:
        return None
    origin_ts = pd.Timestamp("1960-01-01")
    return origin_ts + timedelta(x)

udf_sas_date_to_timestamp = udf(sas_date_to_timestamp, DateType())

In [13]:
staging_immigration = (staging_immigration.
                       withColumn("arrival_date", udf_sas_date_to_timestamp(f.col("arrdate"))).
                       withColumn("departure_date", udf_sas_date_to_timestamp(f.col("depdate")))
                       )

In [14]:
# Want to remove cases where immigrant left before they arrived
staging_immigration = (staging_immigration.
                       filter("arrdate < depdate")
                       )

In [15]:
staging_immigration.show(n = 5)

+-----+------+------+------+-------+------+-------+------+-------+-------+-------+------------+--------------+
|cicid| i94yr|i94mon|i94res|i94port|i94bir|i94visa|gender|arrdate|depdate|i94mode|arrival_date|departure_date|
+-----+------+------+------+-------+------+-------+------+-------+-------+-------+------------+--------------+
| 15.0|2016.0|   4.0| 101.0|    WAS|  55.0|    2.0|     M|20545.0|20691.0|    1.0|  2016-04-01|    2016-08-25|
| 16.0|2016.0|   4.0| 101.0|    NYC|  28.0|    2.0|  null|20545.0|20567.0|    1.0|  2016-04-01|    2016-04-23|
| 17.0|2016.0|   4.0| 101.0|    NYC|   4.0|    2.0|  null|20545.0|20567.0|    1.0|  2016-04-01|    2016-04-23|
| 18.0|2016.0|   4.0| 101.0|    NYC|  57.0|    1.0|  null|20545.0|20555.0|    1.0|  2016-04-01|    2016-04-11|
| 19.0|2016.0|   4.0| 101.0|    NYC|  63.0|    2.0|  null|20545.0|20558.0|    1.0|  2016-04-01|    2016-04-14|
+-----+------+------+------+-------+------+-------+------+-------+-------+-------+------------+--------------+
o

Next, to add context to the source of the immigrants, we decode the `i94res` variable

In [16]:
country_codes = sas_labels_df(sas_labels[9:298])
# Remove some invalid codes
country_codes = country_codes[~country_codes.key.str.contains("INVALID|No Country|should not show")]
# Convert to title case to conform with other datasets
country_codes.key = country_codes.key.str.title()
# Remove unnecessary space
country_codes.key = country_codes.key.str.strip()
country_codes.reset_index(inplace = True)
# Convert country code to integer
country_codes.code = country_codes.code.astype('int')
country_codes.set_index("code", inplace = True)

In [17]:
valid_country_codes = list(country_codes.index)

In [18]:
staging_immigration = (staging_immigration.
                  filter(staging_immigration.i94res.isin(valid_country_codes))
                 )

## Temperature data
We will use the temperature data to give context on the cities in which immigrants are arriving as well as the countries they are coming from

In [19]:
temp_file = '../../data2/GlobalLandTemperaturesByCity.csv'

staging_temp = (spark.
               read.
               option("header", True).
               option("inferSchema", True).
               csv(temp_file))

In [20]:
staging_temp.printSchema()

root
 |-- dt: timestamp (nullable = true)
 |-- AverageTemperature: double (nullable = true)
 |-- AverageTemperatureUncertainty: double (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)



In [21]:
staging_temp.createOrReplaceTempView("temperature")

spark.sql("""
    SELECT MAX(dt)
    FROM temperature
""").collect()

[Row(max(dt)=datetime.datetime(2013, 9, 1, 0, 0))]

As the temperature data only go as far as 2013, while the immigration data relate to 2016, we will take an average temperature value of the 21st century for each City.
We will also filter to only include countries with a valid country code. We are interested in the mean temperatures for cities in the US and countries elsewhere 

In [22]:
valid_countries = list(country_codes.key.str.strip())
country_temp = (staging_temp.
           filter("dt >= '2000/01/01'").
           filter(staging_temp.Country.isin(valid_countries)).
           groupBy("Country").
           agg(f.mean("AverageTemperature").alias("mean_temperature")))
us_city_temp = (staging_temp.
           filter("dt >= '2000/01/01'").
           filter("Country == 'United States'").
           groupBy("City").
           agg(f.mean("AverageTemperature").alias("mean_temperature")))

In [23]:
us_city_temp.limit(10).toPandas()

Unnamed: 0,City,mean_temperature
0,Worcester,8.890026
1,Charleston,19.56715
2,Corona,16.817961
3,Springfield,12.003608
4,Tempe,21.974248
5,North Las Vegas,18.297157
6,Thornton,9.813471
7,Phoenix,21.974248
8,Hollywood,23.840542
9,Pembroke Pines,23.840542


In [24]:
country_temp.limit(10).toPandas()

Unnamed: 0,Country,mean_temperature
0,Chad,28.036576
1,Russia,4.733811
2,Paraguay,23.626738
3,Yemen,26.730773
4,Senegal,26.820206
5,Sweden,6.881589
6,Guyana,27.464941
7,Burma,26.77733
8,Philippines,27.188468
9,Eritrea,24.945757


We can look at summaries of the mean_temperature value to ensure we have reasonable results

In [25]:
us_city_temp.describe("mean_temperature").collect()

[Row(summary='count', mean_temperature='248'),
 Row(summary='mean', mean_temperature='15.221801703098478'),
 Row(summary='stddev', mean_temperature='4.437567673987982'),
 Row(summary='min', mean_temperature='-1.0546776315789472'),
 Row(summary='max', mean_temperature='23.84054248366012')]

In [26]:
country_temp.describe("mean_temperature").collect()

[Row(summary='count', mean_temperature='149'),
 Row(summary='mean', mean_temperature='19.398531163156033'),
 Row(summary='stddev', mean_temperature='7.5070008952120535'),
 Row(summary='min', mean_temperature='-1.8284473684210532'),
 Row(summary='max', mean_temperature='30.102092105263154')]

## Demographics data
Next, read in the demographics data which outlines the demographics for each US city 

In [27]:
staging_demo = (spark.
           read.
           option("delimiter", ';').
           option("header", True).
           option("inferSchema", True).
           csv("us-cities-demographics.csv"))

As can be seen below, the data storage could be improved by gathering the race data into columns to avoid duplication of the other data

In [28]:
staging_demo.filter("City == 'Silver Spring'").toPandas()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601,41862,82463,1562,30908,2.6,MD,Hispanic or Latino,25924
1,Silver Spring,Maryland,33.8,40601,41862,82463,1562,30908,2.6,MD,White,37756
2,Silver Spring,Maryland,33.8,40601,41862,82463,1562,30908,2.6,MD,Black or African-American,21330
3,Silver Spring,Maryland,33.8,40601,41862,82463,1562,30908,2.6,MD,American Indian and Alaska Native,1084
4,Silver Spring,Maryland,33.8,40601,41862,82463,1562,30908,2.6,MD,Asian,8841


In [29]:
staging_demo_race_pivot = (staging_demo.
                           groupBy("City", "State").
                           pivot("Race").
                           agg(f.first("Count")))
staging_demo = (staging_demo.
                groupBy("City", "State").
                agg(f.first("Median Age").alias("median_age"),
                    f.first("Male Population").alias("male_population"),
                    f.first("Female Population").alias("female_population"),
                    f.first("Total Population").alias("total_population"),
                    f.first("Average Household Size").alias("mean_household_size"),
                    f.first("State Code").alias("state_code")
                   )
               ).join(staging_demo_race_pivot, on = ["City", "State"])

In [30]:
# Add the average temperature for US cities
staging_demo = staging_demo.join(us_city_temp, on = ["City"])

Finally, we'll add a row number which creates a unique code for each city

In [31]:
window = Window.orderBy(f.col('City'), f.col('state_code'))
staging_demo = staging_demo.withColumn('city_code', f.row_number().over(window))

In [32]:
staging_demo.limit(10).toPandas()

Unnamed: 0,City,State,median_age,male_population,female_population,total_population,mean_household_size,state_code,American Indian and Alaska Native,Asian,Black or African-American,Hispanic or Latino,White,mean_temperature,city_code
0,Abilene,Texas,31.3,65212,60664,125876,2.64,TX,1813,2929,14449,33222,95487,17.826222,1
1,Akron,Ohio,38.1,96886,100667,197553,2.24,OH,1845,9033,66551,3684,129192,10.857634,2
2,Albuquerque,New Mexico,36.0,273323,285808,559131,2.49,NM,32243,25140,26774,271854,411847,12.027353,3
3,Alexandria,Virginia,36.6,74989,78522,153511,2.2,VA,1133,13315,37168,25573,106215,13.139941,4
4,Amarillo,Texas,33.8,99391,100260,199651,2.64,TX,4260,8563,14050,65392,174214,16.00949,5
5,Anaheim,California,33.6,179603,171135,350738,3.45,CA,2489,53270,9775,201593,259820,16.817961,6
6,Anchorage,Alaska,32.2,152945,145750,298695,2.77,AK,36339,36825,23107,27261,212696,-1.054678,7
7,Ann Arbor,Michigan,28.1,58789,58281,117070,2.17,MI,1935,18797,9577,5888,90173,9.851516,8
8,Antioch,California,34.0,54733,55809,110542,3.31,CA,3462,14333,23227,35563,51151,15.108948,9
9,Arlington,Texas,32.6,191060,197062,388122,2.89,TX,2918,31804,92955,112078,251155,16.05719,10


## Airport data
Finally, we're going to get the airport data. We will only use this to identify the city of arrival for people in the immigration data. As such, we only need US airports.


In [33]:
airport_file = "airport-codes_csv.csv"
staging_airport = spark.read.option("header", True).option("inferSchema", True).csv(airport_file)
staging_airport = (staging_airport.
             filter("iso_country == 'US'").
             withColumnRenamed("municipality", "City").
             select("iata_code", "Name", "City", "type").
             dropna(subset = ["iata_code"]))
staging_airport = staging_airport.join(staging_demo.select("City", "city_code"), on = ["City"])

In [34]:
staging_airport.limit(10).toPandas()

Unnamed: 0,City,iata_code,Name,type,city_code
0,Abilene,DYS,Dyess Air Force Base,large_airport,1
1,Abilene,ABI,Abilene Regional Airport,medium_airport,1
2,Akron,CAK,Akron Canton Regional Airport,medium_airport,2
3,Akron,AKC,Akron Fulton International Airport,medium_airport,2
4,Akron,AKO,Colorado Plains Regional Airport,small_airport,2
5,Albuquerque,ABQ,Albuquerque International Sunport,large_airport,3
6,Alexandria,ESF,Esler Regional Airport,medium_airport,4
7,Alexandria,AXN,Chandler Field,medium_airport,4
8,Alexandria,AEX,Alexandria International Airport,medium_airport,4
9,Amarillo,TDW,Tradewind Airport,small_airport,5


### ETL
#### Table schema - snowflake

 - Fact table - immigration
  - immigrant_id [int]
  - arrival_city_code [int]
  - airport_code [int]
  - arrival_date [timestamp]
  - departure_date [timestamp]
  - source_country [int]
  - age [int]
  - gender [string]
  
 - Dimension table - city
  - city_code [int]
  - city_name [string]
  - population [int]
  - male_population [int]
  - female_population [int]
  - mean_temperature [float]
  - american_indian_alaska [int]
  - asian [int]
  - black_african_american [int]
  - hispanic_latino [int]
  - white [int]
  
 - Dimension table - airport
  - airport_code [int]
  - airport_name [string]
  - city_code [int]
  - airport_type [string]
  
 - Dimension table - country
  - country_code [int]
  - country_name [string]
  - mean_temperature [float]
  
In order to add arrival city code we'll need to do a join between the immigration staging table and the airport staging table

In [35]:
airport_city_key = staging_airport.select(f.col("iata_code").alias("i94port"), 
                                     f.col("City").alias("arrival_city"),
                                    f.col("city_code"))
immigration_table = (staging_immigration.
                  join(airport_city_key, on = ["i94port"]).
                 select(f.col("cicid").alias("immigrant_id"),
                       f.col("city_code").alias("arrival_city_code"),
                       f.col("i94port").alias("airport_code"),
                       f.col("arrival_date"),
                       f.col("departure_date"),
                       f.col("i94res").alias("source_country_code"),
                       f.col("i94bir").alias("age"),
                       f.col("gender")))

immigration_table.write.mode("overwrite").partitionBy("arrival_city_code", "airport_code").parquet("tables/immigration")

In [36]:
demo_table = (staging_demo.
             select(f.col("city_code"),
                    f.col("state_code"),
                    f.col("City").alias("city_name"),
                    f.col("total_population"),
                    f.col("male_population"),
                    f.col("female_population"),
                    f.col("mean_temperature"),
                    f.col("American Indian and Alaska Native").alias("american_indian_alaska"),
                    f.col("Asian").alias("asian"),
                    f.col("Black or African-American").alias("black_african_american"),
                    f.col("Hispanic or Latino").alias("hispanic_latino"),
                    f.col("White").alias("white")
                   )
             )
demo_table.write.mode("overwrite").parquet("tables/city")

In [37]:
airport_table = staging_airport.select(f.col("iata_code").alias("airport_code"),
                 f.col("Name").alias("airport_name"),
                 f.col("city_code"),
                 f.col("type").alias("airport_type"))

airport_table.write.mode("overwrite").parquet("tables/airport")

In [47]:
country_table = spark.createDataFrame(country_codes.reset_index())

country_table = (country_table.
                 join(country_temp.
                      select(f.col("Country").alias("key"), 
                             "mean_temperature"),
                      on = ["key"]).
                 select(f.col("code").alias("country_code"),
                       f.col("key").alias("country_name"),
                       "mean_temperature")
                )
country_table.write.mode("overwrite").parquet("tables/country")

## Data Quality Checks

In [39]:
immigration_table.createOrReplaceTempView("immigration")
airport_table.createOrReplaceTempView("airport")

In [40]:
# Check the data looks as it should
spark.sql("""
SELECT *
FROM immigration
LIMIT 10
"""
).toPandas()

Unnamed: 0,immigrant_id,arrival_city_code,airport_code,arrival_date,departure_date,source_country_code,age,gender
0,9732.0,211,SNA,2016-04-01,2016-04-03,111.0,30.0,M
1,12310.0,211,SNA,2016-04-01,2016-04-18,114.0,37.0,F
2,16573.0,211,SNA,2016-04-01,2016-04-15,582.0,51.0,M
3,22742.0,211,SNA,2016-04-01,2016-04-04,129.0,48.0,M
4,139792.0,211,SNA,2016-04-01,2016-04-03,575.0,41.0,M
5,140354.0,211,SNA,2016-04-01,2016-04-03,575.0,31.0,F
6,141463.0,211,SNA,2016-04-01,2016-04-06,577.0,36.0,M
7,141464.0,211,SNA,2016-04-01,2016-04-16,577.0,51.0,M
8,141795.0,211,SNA,2016-04-01,2016-04-16,582.0,36.0,F
9,142454.0,211,SNA,2016-04-01,2016-04-11,582.0,42.0,M


In [41]:
# Check no cases where arrival is after departure
spark.sql("""
SELECT *
FROM immigration
WHERE arrival_date > departure_date
"""
).toPandas()

Unnamed: 0,immigrant_id,arrival_city_code,airport_code,arrival_date,departure_date,source_country_code,age,gender


In [45]:
# Check the joins can be performed
spark.sql("""
SELECT airport.airport_name, COUNT(1) AS COUNT
FROM immigration
LEFT JOIN airport
ON immigration.airport_code = airport.airport_code
GROUP BY airport.airport_name
ORDER BY COUNT DESC
LIMIT 10
"""
).toPandas()

Unnamed: 0,airport_name,COUNT
0,Miami International Airport,328470
1,San Fernando Airport,145994
2,Orlando Executive Airport,142804
3,Lakefront Airport,131177
4,William P Hobby Airport,95188
5,Hartsfield Jackson Atlanta International Airport,88566
6,Dallas Love Field,67819
7,General Edward Lawrence Logan International Ai...,53976
8,Seattle Tacoma International Airport,44585
9,Coleman A. Young Municipal Airport,32966


#### 4.3 Data dictionary 
The data dictionary is available in `data_dictionary.md`

#### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.

PySpark was chosen as it is designed for handling large amounts of data quickly.
The flexibility of using pandas/Python syntax or SQL syntax helps facilitate analysis by end users.
PySpark also integrates well with cloud data services. 

### If the data were increased 100x
It would be sensible to move the data storage and preparation to a cloud service such as an EC2 cluster running Redshift. This would allow to parallelise the Spark processes and quickly process and filter the data.

### The data populates a dashboard that must be updated on a daily basis by 7am every day.
An Apache Airflow pipeline could be set up with operators for reading and preparing the data
The data pipeline could integrate dependencies to ensure the data are correctly loaded

`load_data` >>`process_data` >> `store_data` >> `quality_checks` >> `push_to_dashboard`

As speed would be of importance for any dashboard, it may worthwhile considering storing the data in an Apache Cassandra database which may be less efficient in storage as data are duplicated across nodes to ensure availability

### The database needed to be accessed by 100+ people.
A cloud server would be the only option, with an EC2 cluster running Redshift which is able to scale to accommodate the demands.