# i94 data modeling
### Data Engineering Capstone Project

#### Project Summary

This project uses the Udacity-provided dataset of i94 updates, along with supplementary data about city demographics, temperatures and airports.

The purpose of this project is to clean the dataset using Spark and write it out as dimensional tables in a star schema, stored as parquet files.

The cleaned tables can then be used to query patterns of immigration with respect to geography, time and busiest airports, using simple joins between a fact table and the relevant dimension tables.

The following are the stages we go through in this notebook to set up the pipeline.

* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [2]:
# Do all imports and installs here
import re
import os
import psycopg2
import pandas as pd
import datetime
import configparser

pd.options.display.max_columns = 90
pd.options.display.max_rows = 90


In [3]:
config = configparser.ConfigParser()
config.read('dl.cfg')

['dl.cfg']

In [4]:
os.environ['AWS_ACCESS_KEY_ID']=config.get('AWS', 'AWS_ACCESS_KEY_ID')
os.environ['AWS_SECRET_ACCESS_KEY']=config.get('AWS', 'AWS_SECRET_ACCESS_KEY')

In [5]:
MODE='LOCAL' # 'S3'

In [6]:
DATA = config.get(MODE, 'DATA')
DATA_LABELS = config.get(MODE, 'DATA_LABELS')
US_DEMO_DATA = config.get(MODE, 'US_DEMO_DATA')
GLOBAL_TEMP_DATA = config.get(MODE, 'GLOBAL_TEMP_DATA')
AIRPORT_DATA = config.get(MODE, 'AIRPORT_DATA')

OUTPUT_DIR = config.get(MODE, 'OUTPUT_DIR')
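
The cells above assume a `dl.cfg` file with an `AWS` section plus one section per mode. The file itself is not shown in this notebook, so the sketch below is only a guess at its layout: the section and key names are the ones read above, while every value (paths, bucket name, credentials) and the helper `load_paths` are hypothetical placeholders.

```python
import configparser

# Hypothetical dl.cfg contents; only the section and key names come from the notebook.
SAMPLE_CFG = """
[AWS]
AWS_ACCESS_KEY_ID = <your-access-key-id>
AWS_SECRET_ACCESS_KEY = <your-secret-access-key>

[LOCAL]
DATA = data/i94/sample.sas7bdat
DATA_LABELS = data/i94_labels.SAS
US_DEMO_DATA = data/us-cities-demographics.csv
GLOBAL_TEMP_DATA = data/city-temperatures.csv
AIRPORT_DATA = data/airport-codes.csv
OUTPUT_DIR = output/

[S3]
DATA = s3a://example-bucket/i94/
DATA_LABELS = s3a://example-bucket/i94_labels.SAS
US_DEMO_DATA = s3a://example-bucket/us-cities-demographics.csv
GLOBAL_TEMP_DATA = s3a://example-bucket/city-temperatures.csv
AIRPORT_DATA = s3a://example-bucket/airport-codes.csv
OUTPUT_DIR = s3a://example-bucket/output/
"""

sample_config = configparser.ConfigParser()
sample_config.read_string(SAMPLE_CFG)


def load_paths(cfg, mode):
    """Collect all data locations for one mode ('LOCAL' or 'S3') into a dict."""
    keys = ['DATA', 'DATA_LABELS', 'US_DEMO_DATA',
            'GLOBAL_TEMP_DATA', 'AIRPORT_DATA', 'OUTPUT_DIR']
    return {key: cfg.get(mode, key) for key in keys}


local_paths = load_paths(sample_config, 'LOCAL')
s3_paths = load_paths(sample_config, 'S3')
```

Keeping both modes in one file means that switching between a local run and an S3 run only requires changing `MODE`.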


### Step 1: Scope the Project and Gather Data

#### Scope 

The goal of this project is to build an ETL pipeline that pulls from five different data sources (a folder with i94 SAS files, its associated label file, and csv files for city temperatures, demographics and airport codes), then cleans the data and builds dimensional models in a star schema that is optimized for querying.

The final solution will consist of an i94 update fact table that describes an arrival date and departure date pair (if departure has occurred, else None) for a set of admission numbers. This fact table can be joined with dimension tables that store details about the city and date of arrival/departure, as well as details of the admission number holder.

We will primarily use Spark SQL to build the ETL pipeline, because the final data can be large, the cleanup required can be quite heavy, and the original data isn't well normalized.

#### Describe and Gather Data 

- [I94 Immigration Data](https://travel.trade.gov/research/reports/i94/historical/2016.html): this dataset is available for purchase from the US National Tourism and Trade Office, and a sample of it has been provided by Udacity. It includes details on arrival and departure date pairs for i94 admission numbers. The i94 is valid from the time of entry into the US until exit, and updates are also recorded when the departure is to Canada/Mexico.

- [U.S. City Demographic Data](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/information/): this dataset comes from OpenDataSoft and contains city-wise statistics for each race.

- [Airport Code Table](https://datahub.io/core/airport-codes#data): this dataset comes from datahub.io and includes city, coordinates and elevation for airport codes (IATA and GPS).

- [World Temperature Data](https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data): this dataset contains the average temperature at a given city on a particular date.

In [7]:
# Spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.\
config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11")\
.enableHiveSupport().getOrCreate()


In [8]:
# Pyspark imports
from pyspark.sql.functions import udf, col, monotonically_increasing_id, isnan, when, count, countDistinct, split, upper
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format, dayofweek
from pyspark.sql.types import (
    StructType as R, StructField as Fld, 
    DecimalType as Decimal, DoubleType as Dbl, 
    StringType as Str, 
    ShortType as Short, IntegerType as Int, LongType as Long, 
    DateType as Date,
    TimestampType as TimeStamp,
)

Loading a sample file from i94 folder into a spark dataframe and getting a spark SQL temp view:

In [9]:
df_spark = spark.read.format('com.github.saurfang.sas.spark').load(DATA)

In [9]:
!rm -rf sas_data

In [10]:
df_spark.write.parquet("sas_data")

In [11]:
df_spark=spark.read.parquet("sas_data")
df_spark.createOrReplaceTempView("staging_i94data")

Extracting the code-value lookups for the i94 table columns from the SAS labels file.

In [42]:
def extract_sas_value(sas_file, value_name):
    """
    Extract key-value pairs for a given SAS variable (value) from the SAS file
    :param sas_file: name of the SAS label file
    :type sas_file: str
    :param value_name: name of the SAS variable (value) without the $ prefix
    :type value_name: str
    :return: list of key-value pairs for all possible codes and their details
    :rtype: list
    """
    capturing = False
    header_re = re.compile(r'(value\s+\$?{})'.format(value_name))
    k_v_re = re.compile(r"'?(?P<code>[.\w\s]+)'?\s*=\s*'(?P<value>[:&/#\w.,\s()-]+)'\s*(?P<sep>;?)")
    k_v_pairs = list()
    with open(sas_file, 'r') as file:
        for line in file:
            line = line.strip()
            if capturing: # If header was encountered and terminating semicolon wasn't encountered yet
                k_v_match = k_v_re.match(line) 
                if k_v_match: # Extract code and value and append to k_v_pairs list
                    k_v_pairs.append({'code': str(k_v_match.group('code')).strip(), 'value': str(k_v_match.group('value')).strip()})
                    if k_v_match.group('sep') == ';': # If terminating semicolon after the last k-v pair
                        capturing = False
                        break
                elif line.startswith(';'): # If terminating semicolon alone present in the line
                    capturing = False
                    break
                else:
                    raise ValueError("Cannot parse SAS file. The line {} does not match pattern {}".format(line, k_v_re.pattern))
            else:
                if header_re.match(line):
                    capturing = True
        if capturing:  # Did not encounter terminating semicolon before end of file
            raise ValueError("Incomplete SAS file")
    return k_v_pairs
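
To see what the key-value pattern above captures, here is a minimal sketch that applies the same regular expression to a few lines written in the style of the SAS label file. The sample lines and their spacing are assumptions for illustration; only the codes `ALC` and `236` are taken from the lookups shown later in this notebook.

```python
import re

# Same key-value pattern as in extract_sas_value above
k_v_re = re.compile(r"'?(?P<code>[.\w\s]+)'?\s*=\s*'(?P<value>[:&/#\w.,\s()-]+)'\s*(?P<sep>;?)")

# Illustrative lines in the style of the SAS label file (exact spacing is an assumption)
sample_lines = [
    "'ALC' = 'ALCAN, AK             '",  # quoted code, padded value
    "236 =  'AFGHANISTAN'",              # unquoted numeric code
    "9 = 'Not reported' ;",              # last pair, followed by the terminating semicolon
]

parsed = []
for line in sample_lines:
    m = k_v_re.match(line.strip())
    # Codes and values are stripped, exactly as extract_sas_value does
    parsed.append((m.group('code').strip(), m.group('value').strip(), m.group('sep')))

for code, value, sep in parsed:
    print(repr(code), repr(value), repr(sep))
```

The `sep` group is what lets the parser stop at the end of a value block: it is empty for every pair except the one that carries the closing semicolon.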

In [43]:
df_i94cntyl = spark.createDataFrame(pd.DataFrame(extract_sas_value(DATA_LABELS, "i94cntyl")))
df_i94prtl = spark.createDataFrame(pd.DataFrame(extract_sas_value(DATA_LABELS, "i94prtl")))
df_i94model = spark.createDataFrame(pd.DataFrame(extract_sas_value(DATA_LABELS, "i94model")))
df_i94addrl = spark.createDataFrame(pd.DataFrame(extract_sas_value(DATA_LABELS, "i94addrl")))

In [44]:
# Create the visa type lookup manually since it was specified in comment
df_i94visa = spark.createDataFrame(pd.DataFrame([{'code': '1', 'value': 'Business'}, {'code': '2', 'value': 'Pleasure'}, {'code': '3', 'value': 'Student'}]))

Loading other data sources in their entirety into spark.

In [15]:
df_airport = spark.read.csv(AIRPORT_DATA, header="true")
df_airport.createOrReplaceTempView("staging_airport_codes")

In [78]:
df_us_city_demo = spark.read.csv(US_DEMO_DATA, header="true", sep=';')
df_us_city_demo.createOrReplaceTempView("staging_us_city_demo")

In [27]:
df_city_temp = spark.read.csv(GLOBAL_TEMP_DATA, header="true")
df_city_temp.createOrReplaceTempView("staging_city_temp")

### Step 2: Explore and Assess the Data
#### Explore the Data 
Let's explore each of the dataframes using printSchema, head, describe, and countDistinct calls.

****staging_i94data****

In [18]:
df_spark.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = true)
 |-- fltno: string (nullable = true)
 |-- visatype: string (nullable = true)

In [19]:
df_spark.limit(5).toPandas()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,dtadfile,visapost,occup,entdepa,entdepd,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,5748517.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,CA,20582.0,40.0,1.0,1.0,20160430,SYD,,G,O,,M,1976.0,10292016,F,,QF,94953870000.0,11,B1
1,5748518.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,NV,20591.0,32.0,1.0,1.0,20160430,SYD,,G,O,,M,1984.0,10292016,F,,VA,94955620000.0,7,B1
2,5748519.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,WA,20582.0,29.0,1.0,1.0,20160430,SYD,,G,O,,M,1987.0,10292016,M,,DL,94956410000.0,40,B1
3,5748520.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,WA,20588.0,29.0,1.0,1.0,20160430,SYD,,G,O,,M,1987.0,10292016,F,,DL,94956450000.0,40,B1
4,5748521.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,WA,20588.0,28.0,1.0,1.0,20160430,SYD,,G,O,,M,1988.0,10292016,M,,DL,94956390000.0,40,B1


The schema is auto-inferred from the metadata in the SAS file, and some of the datatypes aren't accurate. Our ideal schema is shown below.

In [20]:
i94_schema = R([
    Fld("cicid", Long(), nullable=False),  
    Fld("i94yr", Short()),
    Fld("i94mon", Short()),
    Fld("i94cit", Short()),
    Fld("i94res", Short()),    
    Fld("i94port", Str()),
    Fld("arrdate", Int()),
    Fld("i94mode", Short()),    
    Fld("i94addr", Str()),   
    Fld("depdate", Int()),
    Fld("i94bir", Dbl()),
    Fld("i94visa", Short()),
    Fld("count", Dbl()),
    Fld("dtadfile", Long()),
    Fld("visapost", Str()),
    Fld("occup", Str()),
    Fld("entdepa", Str()),
    Fld("entdepd", Str()),    
    Fld("entdepu", Str()),   
    Fld("matflag", Str()),
    Fld("biryear", Short()),
    Fld("dtaddto", Long()),
    Fld("gender", Str()),
    Fld("insnum", Str()),
    Fld("airline", Str()),
    Fld("admnum", Long()),    
    Fld("fltno", Str()),
    Fld("visatype", Str()),
])


Also, arrdate and depdate are SAS numeric date fields, which appear to hold the number of days elapsed since Jan 1, 1960. We can convert these two fields to Spark DateType and add a separate dimension table that maps each date to dayofweek, year, month etc.
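
As a quick sanity check of the "days since Jan 1, 1960" reading, the sketch below converts the arrdate and depdate of the first sample row shown above in plain Python. The helper name `sas_days_to_date` is hypothetical and not part of the pipeline.

```python
import datetime

SAS_EPOCH = datetime.date(1960, 1, 1)


def sas_days_to_date(days):
    """Convert a SAS numeric date (days since 1960-01-01) to a date; None stays None."""
    if days is None:
        return None
    return SAS_EPOCH + datetime.timedelta(days=int(days))


arrival = sas_days_to_date(20574.0)    # arrdate of the first sample row
departure = sas_days_to_date(20582.0)  # depdate of the first sample row
print(arrival, departure)              # 2016-04-30 2016-05-08
```

The arrival date agrees with the dtadfile value of 20160430 on the same row, which supports the interpretation.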

In [21]:
df_spark.describe().toPandas()


Unnamed: 0,summary,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,dtadfile,visapost,occup,entdepa,entdepd,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,count,3096313.0,3096313.0,3096313.0,3096313.0,3096313.0,3096313,3096313.0,3096074.0,2943721,2953856.0,3095511.0,3096313.0,3096313.0,3096312.0,1215063,8126,3096075,2957884,392,2957884,3095511.0,3095836,2682044,113708,3012686,3096313.0,3076764,3096313
1,mean,3078651.879075533,2016.0,4.0,304.9069344733559,303.28381949757664,,20559.84854179794,1.0736897761487614,51.652482269503544,20573.95283554784,41.767614458485205,1.8453925685161676,1.0,20160424.766168267,999.0,885.675,,,,,1974.2323855415148,8291120.333841449,,4131.050016327899,59.477601493233784,70828850110.76167,1360.2463696420555,
2,stddev,1763278.0997498818,0.0,3.5220513889588873e-16,210.02688853063336,208.583212927889,,8.77733947482668,0.5158963131657286,42.979062313709846,29.35696848162746,17.420260534588273,0.3983910200540981,8.805128472397217e-17,50.0151344998323,0.0,264.6551105950961,,,,,17.420260534588174,1656502.4244924926,,8821.743471773656,172.63339952061742,22154415947.55769,5852.676345633787,
3,min,6.0,2016.0,4.0,101.0,101.0,5KE,20545.0,1.0,..,15176.0,-3.0,1.0,1.0,20130811.0,999,049,A,D,U,M,1902.0,/ 183D,F,0,*FF,0.0,00000,B1
4,max,6102785.0,2016.0,4.0,999.0,760.0,YSL,20574.0,9.0,ZU,45427.0,114.0,3.0,1.0,20160919.0,ZZZ,WTR,Z,W,Y,M,2019.0,D/S,X,YM0167,ZZ,99915565930.0,ZZZ,WT


The following fields have missing values, ranging from a single row to almost the entire column: i94mode, i94addr, depdate, i94bir, visapost, occup, entdepa, entdepd, entdepu, matflag, dtadfile, biryear, dtaddto, gender, insnum, airline, fltno.
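
The extent of the gaps can be read off the count row of the describe output above: the number of missing values in a column is the total row count minus that column's non-null count. A minimal sketch for a handful of columns, with the counts copied from that output (the variable names are only illustrative):

```python
TOTAL_ROWS = 3096313  # count of cicid, which has no missing values

# Non-null counts copied from the describe() output above
non_null_counts = {
    'i94mode': 3096074,
    'i94addr': 2943721,
    'depdate': 2953856,
    'dtadfile': 3096312,
    'occup': 8126,
    'entdepu': 392,
    'gender': 2682044,
    'insnum': 113708,
}

# Missing rows per column, plus the share of the table they represent
missing = {col: TOTAL_ROWS - n for col, n in non_null_counts.items()}
missing_pct = {col: round(100.0 * m / TOTAL_ROWS, 2) for col, m in missing.items()}

# Columns ordered from most to least missing
most_missing = sorted(missing, key=missing.get, reverse=True)
for col in most_missing:
    print(col, missing[col], missing_pct[col])
```

This separates columns that are almost empty (entdepu, occup, insnum) from those with only a few gaps (i94mode, dtadfile).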

Missing depdate makes sense since not everyone who arrived has necessarily left the US, so we will set it to 0 (start of the SAS epoch) to denote a missing value, and we will filter out this date in queries. (We will never see a genuine depdate of 1/1/1960 unless we ingest a data file for 1960.)

i94bir and biryear convey the same information and both have the same number of missing values. We will hold onto just one of these.

Since visapost, dtadfile, occup, entdepa, entdepd, entdepu, and dtaddto are not used by CIC and all of them have missing values (occup and entdepu are almost entirely empty), we will drop these.

insnum is missing for most rows and doesn't appear significant to our analysis, so we will drop this column.


That leaves us with the following columns:

In [22]:
i94_cols_some_missing = ['depdate', 'i94mode', 'i94addr', 'matflag', 'biryear', 'gender', 'airline', 'fltno'] 

i94_cols_complete = ['cicid', 'admnum', 'i94yr', 'i94mon', 'i94cit', 'i94res', 'i94port', 'i94visa', 'visatype', 'arrdate']

i94_cols = i94_cols_complete + i94_cols_some_missing

We will fill the missing values in these columns with an appropriate default for each column, falling back to 0 for numeric columns.

Let's investigate these for unique counts.

In [23]:
df_spark.select([countDistinct(c).alias(c) for c in i94_cols]).toPandas()

Unnamed: 0,cicid,admnum,i94yr,i94mon,i94cit,i94res,i94port,i94visa,visatype,arrdate,depdate,i94mode,i94addr,matflag,biryear,gender,airline,fltno
0,3096313,3075579,1,1,243,229,299,3,17,30,235,4,457,1,112,4,534,7152


Let's investigate one instance of duplicate admnum

In [24]:
spark.sql("""
    SELECT CAST(admnum AS BIGINT), COUNT(admnum)
     FROM staging_i94data
     GROUP BY admnum
     HAVING COUNT(admnum) > 1
""").limit(5).toPandas()

Unnamed: 0,admnum,count(admnum)
0,59171236833,2
1,59171145033,2
2,42327034033,2
3,43877718733,2
4,42354885033,2


In [25]:
# Investigating 59171236833 from example above
spark.sql("""
    SELECT *
    FROM staging_i94data
    WHERE CAST(admnum AS BIGINT) = 59171236833
""").limit(5).toPandas()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,dtadfile,visapost,occup,entdepa,entdepd,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,5754147.0,2016.0,4.0,254.0,276.0,TOR,20574.0,1.0,NY,20579.0,33.0,1.0,1.0,20160430,,,H,O,,M,1983.0,7222016,F,,RS,59171240000.0,7612,WB
1,4547695.0,2016.0,4.0,254.0,276.0,NYC,20568.0,1.0,NY,20571.0,33.0,1.0,1.0,20160424,,,G,O,,M,1983.0,7222016,F,,KE,59171240000.0,81,WB


It looks like this admnum belongs to a female traveller who arrived at NYC, departed three days later, arrived at Toronto three days after that, and departed five days later. So duplicate admnum entries are caused by arrivals and departures via Canada or Mexico for the same person. We can therefore move admnum, biryear and gender to a separate dimension table, as these won't change for a given admission number (whereas visa and address related information might, and arrival/departure will for each new cicid). That leaves us with date, geography and visa related updates in our fact table, which can be joined to the dimension table that has admnum as its primary key. i94yr and i94mon will continue to serve as partition keys for our fact table.

In [26]:
df_i94cntyl.limit(2).toPandas()

Unnamed: 0,code,value
0,582,"MEXICO Air Sea, and Not Reported (I-94, no lan..."
1,236,AFGHANISTAN


i94cit and i94res codes map to countries, which could also be a column in other dimension tables. Rather than inject the i94 country code into all potential dimension tables, or have a three-way join using the df_i94cntyl lookup, we will simply convert the i94cit and i94res fields into countries by joining with the lookup while creating our fact table (invalid codes will not match and can be dealt with on a per-query basis).

In [27]:
df_i94prtl.limit(2).toPandas()

Unnamed: 0,code,value
0,ALC,"ALCAN, AK"
1,ANC,"ANCHORAGE, AK"


Similarly, the three-letter city code used in i94port only matches the IATA code in the airport table, and has no equivalent in the city tables. So we will keep the three-letter port code, but will also denormalize it into city name and state code (state code only for US cities), so that it can be joined with the US demographics and global temperature tables.

In [28]:
df_i94addrl.limit(2).toPandas()

Unnamed: 0,code,value
0,AL,ALABAMA
1,AK,ALASKA


i94addrl uses the two-letter state code, which is commonplace, so we will keep this as a separate dimension table.

Similarly, i94visa and i94model, while useful for other queries, don't need to serve as join keys with other dimensions so we will keep them normalized.

****staging_airport_codes****

In [29]:
df_airport.printSchema()

root
 |-- ident: string (nullable = true)
 |-- type: string (nullable = true)
 |-- name: string (nullable = true)
 |-- elevation_ft: string (nullable = true)
 |-- continent: string (nullable = true)
 |-- iso_country: string (nullable = true)
 |-- iso_region: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- gps_code: string (nullable = true)
 |-- iata_code: string (nullable = true)
 |-- local_code: string (nullable = true)
 |-- coordinates: string (nullable = true)



In [30]:
df_airport.limit(5).toPandas()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237,,US,US-AR,Newport,,,,"-91.254898, 35.6087"


Our ideal schema would be as shown below

In [31]:
airport_schema = R([
    Fld("ident", Str()),  
    Fld("type", Str()),
    Fld("name", Str()),
    Fld("elevation_ft", Int()),
    Fld("continent", Str()),    
    Fld("iso_country", Str()),
    Fld("iso_region", Str()),
    Fld("municipality", Str()),
    Fld("gps_code", Str()),
    Fld("iata_code", Str()),    
    Fld("local_code", Str()),
    Fld("coordinates", Str()),
])

coordinates is not normalized, and it is not useful for our intended nature of queries so we will drop this column.

In [32]:
df_airport.describe().toPandas()

Unnamed: 0,summary,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,count,55075,55075,55075,48069.0,55075,55075,55075,49399,41030,9189,28686,55075
1,mean,2.3873375337777779E8,,,1240.7896773388254,,,,,2.1920446610204083E8,0.0,8.580556178571428E7,
2,stddev,9.492375382267495E8,,,1602.3634593484112,,,,,9.1123224377024E8,0.0,5.747026415216717E8,
3,min,00A,balloonport,"""""""Der Dingel"""" Airfield""",-1.0,AF,AD,AD-04,'S Gravenvoeren,0000,-,-,"-0.004722000099718571, 9.425000190734863"
4,max,spgl,small_airport,Çá¸¾á¸á¸ á¸®á¸Ç{+91-9680118734} GiRLFRieNd...,999.0,SA,ZZ,ZZ-U-A,Å½ocene,ZYYY,ZZV,ZZV,"99.9555969238, 8.47115039825"


In [33]:
df_airport.select([countDistinct(c).alias(c) for c in df_airport.columns]).toPandas()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,55075,7,52144,5449,7,244,2810,27133,40850,9042,27436,54874


As we see, ident is unique for every row and can serve as the primary key for this table, whereas airport names need not be unique. Only 9189 airports in this table have IATA codes that would map to an i94 port directly. While joining on iata_code may seem to leave the bulk of the airports in this table unused, aggregating elevation to a higher level such as city or country does not make sense.

We do see that there are some duplicates in iata_code column so we need to investigate what we need to do with those.


In [34]:
spark.sql("""
    SELECT iata_code, COUNT(iata_code)
     FROM staging_airport_codes
     GROUP BY iata_code
     HAVING COUNT(iata_code) > 1
""").limit(5).toPandas()

Unnamed: 0,iata_code,count(iata_code)
0,CLG,2
1,MUP,2
2,CMN,2
3,SVD,2
4,IST,2


In [35]:
# Investigating CLG from example above
spark.sql("""
    SELECT *
    FROM staging_airport_codes
    WHERE iata_code = "CLG"
""").limit(5).toPandas()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,CLG,closed,Coalinga Airport,,,US,US-CA,,,CLG,,"-120.360116959, 36.1580433385"
1,KC80,small_airport,New Coalinga Municipal Airport,622.0,,US,US-CA,Coalinga,,CLG,C80,"-120.29399871826172, 36.16310119628906"


In this instance, the duplicate is because one of the entries is no longer valid as the airport is closed. Let's filter this table to remove all closed airports and check the duplicate count again.

In [36]:
df_airport = df_airport.filter('type != "closed"')
df_airport.createOrReplaceTempView("staging_airport_codes")

In [37]:
df_airport.describe().toPandas()

Unnamed: 0,summary,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,count,51469,51469,51469,45455.0,51469,51469,51469,46431,40286,8910,28187,51469
1,mean,2.4983764688372093E8,,,1253.1317346826531,,,,,2.2853231572340426E8,0.0,8.649200628E7,
2,stddev,9.701128365731996E8,,,1616.855980247137,,,,,9.296601891065612E8,0.0,5.769634728494486E8,
3,min,00A,balloonport,"""""""Der Dingel"""" Airfield""",-1.0,AF,AD,AD-04,108 Mile,00A,-,-,"-0.004722000099718571, 9.425000190734863"
4,max,spgl,small_airport,Çá¸¾á¸á¸ á¸®á¸Ç{+91-9680118734} GiRLFRieNd...,999.0,SA,ZW,ZW-U-A,Å½ocene,ZYYY,ZZV,ZZV,"99.9555969238, 8.47115039825"


In [38]:
df_airport.select([countDistinct(c).alias(c) for c in df_airport.columns]).toPandas()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,51469,6,48807,5396,7,243,2756,25840,40149,8798,27001,51299


In [39]:
spark.sql("""
    SELECT iata_code, COUNT(iata_code)
     FROM staging_airport_codes
     GROUP BY iata_code
     HAVING COUNT(iata_code) > 1
""").limit(5).toPandas()

Unnamed: 0,iata_code,count(iata_code)
0,CMN,2
1,IST,2
2,PCO,2
3,IZA,2
4,ULG,2


In [40]:
# Investigating CMN and IZA from the example above
spark.sql("""
    SELECT *
    FROM staging_airport_codes
    WHERE iata_code = "CMN" OR iata_code = "IZA"
""").limit(5).toPandas()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,CMN,large_airport,CMN,,AF,MA,MA-CAS,,,CMN,,"0, 0"
1,GMMN,large_airport,Mohammed V International Airport,656.0,AF,MA,MA-CAS,Casablanca,GMMN,CMN,,"-7.589970111846924, 33.36750030517578"
2,SBZM,medium_airport,Presidente Itamar Franco Airport,1348.0,SA,BR,BR-MG,Juiz de Fora,SBZM,IZA,,"-43.173069, -21.513086"
3,SDZY,medium_airport,Zona da Mata Regional Airport,1348.0,SA,BR,BR-MG,Juiz De Fora,SBZM,IZA,,"-43.1730575562, -21.5130558014"


We see that these duplicates are mostly the result of duplicate or incorrect entries for the same airport. Since we are interested in airport type and elevation from a query standpoint, we will filter out entries that are missing these values and then drop the remaining duplicates in iata_code.
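
The rule just described can be sketched in plain Python on the four rows shown above (only the relevant columns are kept). The helper `dedupe_airports` is hypothetical; in Spark the same idea would presumably be a filter on non-null type and elevation_ft followed by `dropDuplicates(['iata_code'])`, which keeps an arbitrary row per code rather than the first one.

```python
# Relevant columns of the CMN and IZA rows from the output above
rows = [
    {'ident': 'CMN', 'type': 'large_airport', 'elevation_ft': None, 'iata_code': 'CMN'},
    {'ident': 'GMMN', 'type': 'large_airport', 'elevation_ft': '656.0', 'iata_code': 'CMN'},
    {'ident': 'SBZM', 'type': 'medium_airport', 'elevation_ft': '1348.0', 'iata_code': 'IZA'},
    {'ident': 'SDZY', 'type': 'medium_airport', 'elevation_ft': '1348.0', 'iata_code': 'IZA'},
]


def dedupe_airports(rows):
    """Drop rows missing type or elevation, then keep the first row per iata_code."""
    kept = {}
    for row in rows:
        if row['type'] is None or row['elevation_ft'] is None:
            continue  # incomplete entry, e.g. the placeholder CMN row
        kept.setdefault(row['iata_code'], row)
    return list(kept.values())


deduped = dedupe_airports(rows)
print([r['ident'] for r in deduped])  # ['GMMN', 'SBZM']
```

Filtering before deduplicating matters: it ensures that the surviving CMN row is the complete GMMN entry rather than the placeholder with no elevation.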

****staging_us_city_demo****

In [41]:
df_us_city_demo.printSchema()

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Median Age: string (nullable = true)
 |-- Male Population: string (nullable = true)
 |-- Female Population: string (nullable = true)
 |-- Total Population: string (nullable = true)
 |-- Number of Veterans: string (nullable = true)
 |-- Foreign-born: string (nullable = true)
 |-- Average Household Size: string (nullable = true)
 |-- State Code: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Count: string (nullable = true)



In [42]:
df_us_city_demo.limit(5).toPandas()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601,41862,82463,1562,30908,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129,49500,93629,4147,32935,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040,46799,84839,4819,8229,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127,87105,175232,5821,33878,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040,143873,281913,5829,86253,2.73,NJ,White,76402


This would be the ideal schema we would refer to when copying from this table.

In [43]:
us_city_demo_schema = R([
    Fld("City", Str()),  
    Fld("State", Str()),
    Fld("Median Age", Dbl()),
    Fld("Male Population", Long()),
    Fld("Female Population", Long()),    
    Fld("Total Population", Long()),
    Fld("Number of Veterans", Long()),
    Fld("Foreign-born", Long()),
    Fld("Average Household Size", Dbl()),
    Fld("State Code", Str()),    
    Fld("Race", Str()),
    Fld("Count", Long()),
])

In [44]:
df_us_city_demo.describe().toPandas()

Unnamed: 0,summary,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,count,2891,2891,2891.0,2888.0,2888.0,2891.0,2878.0,2878.0,2875.0,2891,2891,2891.0
1,mean,,,35.49488066413016,97328.4262465374,101769.6308864266,198966.77931511588,9367.832522585128,40653.59867963864,2.742542608695655,,,48963.77447250087
2,stddev,,,4.401616730099886,216299.93692873296,231564.5725714828,447555.9296335903,13211.21992386408,155749.1036650984,0.4332910878973046,,,144385.58856460615
3,min,Abilene,Alabama,22.9,100135.0,100260.0,100247.0,10001.0,10024.0,2.0,AK,American Indian and Alaska Native,100055.0
4,max,Yuma,Wisconsin,70.5,99967.0,99430.0,99897.0,9988.0,9929.0,4.98,WI,White,99948.0


In [45]:
df_us_city_demo.select([countDistinct(c).alias(c) for c in df_us_city_demo.columns]).toPandas()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,567,49,180,593,594,594,577,587,161,49,5,2785


There are only 5 distinct races, and each row appears to hold information per race, per city (since the total number of rows is roughly 5 * the number of distinct cities). We will confirm this by looking at one city.

In [46]:
df_us_city_demo.filter('city == "Johnson City"').toPandas()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Johnson City,Tennessee,38.2,31019,34350,65369,5038,2878,2.18,TN,Asian,1877
1,Johnson City,Tennessee,38.2,31019,34350,65369,5038,2878,2.18,TN,American Indian and Alaska Native,400
2,Johnson City,Tennessee,38.2,31019,34350,65369,5038,2878,2.18,TN,White,59147
3,Johnson City,Tennessee,38.2,31019,34350,65369,5038,2878,2.18,TN,Hispanic or Latino,1114
4,Johnson City,Tennessee,38.2,31019,34350,65369,5038,2878,2.18,TN,Black or African-American,6016


This confirms that all of the columns except the Count and Race columns are duplicated across the rows of a city. It would therefore make sense to pivot on the Race column. We will then convert the city name to uppercase, so that we can join with our fact table on city name and state code (for US ports only). Joining on the i94addrl state code or the state name would also make sense, but state level figures can easily be derived from the city level demographic table using averages for the aggregation, so we will keep this table atomic at the city level.
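
To make the intended pivot concrete, here is a minimal plain-Python sketch on the Johnson City rows shown above. The helper `pivot_on_race` is hypothetical; in Spark this would presumably be a `groupBy` on the city level columns followed by `pivot('Race')`.

```python
# (City, State Code, Race, Count) taken from the Johnson City rows above
rows = [
    ('Johnson City', 'TN', 'Asian', 1877),
    ('Johnson City', 'TN', 'American Indian and Alaska Native', 400),
    ('Johnson City', 'TN', 'White', 59147),
    ('Johnson City', 'TN', 'Hispanic or Latino', 1114),
    ('Johnson City', 'TN', 'Black or African-American', 6016),
]


def pivot_on_race(rows):
    """Collapse one row per race into one record per (upper-cased city, state code)."""
    pivoted = {}
    for city, state_code, race, count in rows:
        key = (city.upper(), state_code)  # upper-case to match the i94 port city names
        pivoted.setdefault(key, {})[race] = count
    return pivoted


city_demo = pivot_on_race(rows)
print(city_demo[('JOHNSON CITY', 'TN')]['White'])  # 59147
```

After the pivot each city occupies a single row, so the join with the fact table no longer multiplies rows by the number of races.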

****staging_city_temp****

In [47]:
df_city_temp.printSchema()

root
 |-- dt: string (nullable = true)
 |-- AverageTemperature: string (nullable = true)
 |-- AverageTemperatureUncertainty: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)



In [48]:
df_city_temp.limit(5).toPandas()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E


This would be our preferred schema for the city temperature table.

In [49]:
city_temp_schema = R([
    Fld("dt", Date()),  
    Fld("AverageTemperature", Dbl()),
    Fld("AverageTemperatureUncertainty", Dbl()),
    Fld("City", Str()),
    Fld("Country", Str()),    
    Fld("Latitude", Str()),
    Fld("Longitude", Str()),
])

In [50]:
df_city_temp.describe().toPandas()

Unnamed: 0,summary,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,count,8599212,8235082.0,8235082.0,8599212,8599212,8599212,8599212
1,mean,,16.72743263625098,1.0285747414537,,,,
2,stddev,,10.353442482534351,1.1297332887133802,,,,
3,min,1743-11-01,-0.0009999999999994,0.034,A Coruña,Afghanistan,0.80N,0.00W
4,max,2013-09-01,9.999,9.998,Ürümqi,Zimbabwe,8.84S,99.91E


In [51]:
df_city_temp.select([countDistinct(c).alias(c) for c in df_city_temp.columns]).toPandas()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,3239,113163,10902,3448,159,73,1227


Since the main value add of this table is the average temperature, we will drop rows with nulls in the average temperature column. We will then join on dt, city and country with the fact table to get temperature information for the city of immigration. As with elevation, aggregating to state or country level doesn't make sense.

Again, we will drop the latitude and longitude, as we can't join them directly to the fact table.

#### Cleaning Steps

****staging_i94data****

- Fill na

In [52]:
# ['depdate', 'i94mode', 'i94addr', 'matflag', 'gender', 'airline', 'fltno'] 
df_spark = df_spark.na.fill({'depdate': 0.0, 'i94mode': 0.0, 'matflag': 'N', 'gender': 'U', 'airline': 'unknown', 'fltno': 'unknown'})

- Change arrdate and depdate to DateType objects

In [53]:
# depdate nulls were filled with 0.0 above, so int(x) is safe here; they map to 1960-01-01
get_date = udf(lambda x: datetime.datetime(1960,1,1) + datetime.timedelta(days=int(x)), Date())

In [54]:
df_spark = df_spark.withColumn('arrival_date', get_date(df_spark.arrdate)).withColumn('departure_date', get_date(df_spark.depdate))

- Denormalize i94cit and i94res columns

In [55]:
df_spark = df_spark.join(df_i94cntyl, df_spark.i94cit == df_i94cntyl.code).drop('code', 'i94cit').withColumnRenamed('value', 'i94_cit')
df_spark = df_spark.join(df_i94cntyl, df_spark.i94res == df_i94cntyl.code).drop('code', 'i94res').withColumnRenamed('value', 'i94_res')
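
Note that `DataFrame.join` performs an inner join by default, so rows whose code has no match in the lookup are dropped rather than kept with a null country. The sketch below illustrates the difference in plain Python; the two records and the helper names are hypothetical, and only the code 236 comes from the lookup shown earlier. If unmatched rows should survive for per-query handling, passing `how='left'` to the join would be one option.

```python
# Lookup with the one code shown earlier, and two hypothetical records
country_lookup = {236: 'AFGHANISTAN'}
records = [
    {'cicid': 1, 'i94cit': 236.0},  # valid code
    {'cicid': 2, 'i94cit': 0.0},    # code absent from the lookup
]


def inner_join(records, lookup):
    """Keep only records whose code is present in the lookup."""
    return [dict(r, i94_cit=lookup[int(r['i94cit'])])
            for r in records if int(r['i94cit']) in lookup]


def left_join(records, lookup):
    """Keep every record; unmatched codes get None as the country."""
    return [dict(r, i94_cit=lookup.get(int(r['i94cit']))) for r in records]


inner_rows = inner_join(records, country_lookup)
left_rows = left_join(records, country_lookup)
print(len(inner_rows), len(left_rows))  # 1 2
```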

- Denormalize i94port

In [56]:
@udf(Str())
def get_i94_port_state(prt):
    """Parse a port label such as 'ALCAN, AK' into a 'city;state_code;country' string."""
    us_state_code_re = re.compile(r'(?P<state_code>[A-Z]{2}$)')
    us_state_code_with_brackets_re = re.compile(r'(?P<state_code>[A-Z]{2})\s\(')
    # Strip each part so that state and country values carry no leading whitespace
    port_arr = [part.strip() for part in str(prt).split(',')]
    if len(port_arr) == 1:  # No comma: only the port name is known
        return "{};unknown;unknown".format(port_arr[0])
    if len(port_arr) == 2:
        # A bare two-letter code, or one followed by a bracketed note, is treated as a US state
        m = us_state_code_re.match(port_arr[1]) or us_state_code_with_brackets_re.match(port_arr[1])
        if m:
            return "{};{};UNITED STATES".format(port_arr[0], m.group('state_code'))
        return "{};unknown;{}".format(port_arr[0], port_arr[1])  # Otherwise the second part is the country
    if len(port_arr) == 3:
        return "{};{};{}".format(port_arr[0], port_arr[1], port_arr[2])
    return None  # Labels with more than two commas are left unparsed

In [57]:
df_i94prtl = df_i94prtl.withColumn('temp', get_i94_port_state(df_i94prtl.value))

In [58]:
split_i94_port_col = split(df_i94prtl.temp, ';')

In [59]:
df_i94prtl = df_i94prtl.withColumn('i94_port_city', split_i94_port_col.getItem(0)).withColumn('i94_port_state_code', split_i94_port_col.getItem(1)).withColumn('i94_port_country', split_i94_port_col.getItem(2)).drop('temp')

In [60]:
df_i94prtl.limit(5).toPandas()

Unnamed: 0,code,value,i94_port_city,i94_port_state_code,i94_port_country
0,ALC,"ALCAN, AK",ALCAN,AK,UNITED STATES
1,ANC,"ANCHORAGE, AK",ANCHORAGE,AK,UNITED STATES
2,BAR,"BAKER AAF - BAKER ISLAND, AK",BAKER AAF - BAKER ISLAND,AK,UNITED STATES
3,DAC,"DALTONS CACHE, AK",DALTONS CACHE,AK,UNITED STATES
4,PIZ,"DEW STATION PT LAY DEW, AK",DEW STATION PT LAY DEW,AK,UNITED STATES
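The parsing rules in `get_i94_port_state` can be exercised without Spark. The sketch below mirrors them in plain Python under a few assumptions: the name `parse_port` is hypothetical, every part is stripped of whitespace, labels with more than three comma-separated parts fall back to "unknown" (the UDF returns nothing for them), and the non-US and bracketed example labels are made up for illustration.

```python
import re

# Same two patterns as the UDF: a bare 2-letter state code, or one followed by " (".
STATE_ONLY = re.compile(r'(?P<state_code>[A-Z]{2})$')
STATE_WITH_BRACKETS = re.compile(r'(?P<state_code>[A-Z]{2})\s\(')


def parse_port(value):
    """Split a port label into (city, state_code, country)."""
    parts = [p.strip() for p in str(value).split(',')]
    if len(parts) == 1:
        return (parts[0], 'unknown', 'unknown')
    if len(parts) == 2:
        m = STATE_ONLY.match(parts[1]) or STATE_WITH_BRACKETS.match(parts[1])
        if m:
            return (parts[0], m.group('state_code'), 'UNITED STATES')
        return (parts[0], 'unknown', parts[1])
    if len(parts) == 3:
        return tuple(parts)
    # Assumption: anything longer is treated as unparseable.
    return (parts[0], 'unknown', 'unknown')


print(parse_port("ALCAN, AK"))            # label from the table above
print(parse_port("SOMEPORT, TX (BPS)"))   # hypothetical bracketed label
print(parse_port("TORONTO, CANADA"))      # hypothetical non-US label
```

Returning a tuple instead of a ';'-joined string avoids the later split step, though a Spark UDF would then need a struct return type.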


In [61]:
df_spark = df_spark.join(df_i94prtl, df_spark.i94port == df_i94prtl.code).drop('code', 'value')

In [62]:
df_spark.limit(5).toPandas()

Unnamed: 0,cicid,i94yr,i94mon,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,dtadfile,visapost,occup,entdepa,entdepd,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype,arrival_date,departure_date,i94_cit,i94_res,i94_port_city,i94_port_state_code,i94_port_country
0,4041803.0,2016.0,4.0,BGM,20566.0,1.0,CA,20581.0,49.0,1.0,1.0,20160422,BEN,,G,R,,M,1967.0,10212016,F,,*GA,94264470000.0,OEIPW,B1,2016-04-22,2016-05-07,GERMANY,GERMANY,BANGOR,ME,UNITED STATES
1,4041804.0,2016.0,4.0,BGM,20566.0,1.0,CA,0.0,38.0,1.0,1.0,20160422,FRN,,G,,,N,1978.0,10212016,M,,*GA,94265530000.0,OEIPW,B1,2016-04-22,1960-01-01,GERMANY,GERMANY,BANGOR,ME,UNITED STATES
2,4041805.0,2016.0,4.0,BGM,20566.0,1.0,CA,20581.0,45.0,1.0,1.0,20160422,FRN,,G,R,,M,1971.0,10212016,M,,*GA,94264770000.0,OEIPW,B1,2016-04-22,2016-05-07,GERMANY,GERMANY,BANGOR,ME,UNITED STATES
3,4041806.0,2016.0,4.0,BGM,20566.0,1.0,CA,20581.0,25.0,1.0,1.0,20160422,MUN,,G,R,,M,1991.0,10212016,M,,*GA,94264620000.0,OEIPW,B1,2016-04-22,2016-05-07,GERMANY,GERMANY,BANGOR,ME,UNITED STATES
4,452706.0,2016.0,4.0,BGM,20547.0,1.0,VA,20549.0,38.0,1.0,1.0,20160403,OSL,,G,R,,M,1978.0,10022016,M,,*GA,92692550000.0,MBAEP,B1,2016-04-03,2016-04-05,NORWAY,NORWAY,BANGOR,ME,UNITED STATES


In [63]:
# Write the cleaned staging table out to parquet
df_spark.write.parquet(OUTPUT_DIR + 'staging_i94data')

****staging_airport_codes****

- Filter rows missing iata_code, elevation and drop duplicates on iata code

In [64]:
df_airport = df_airport.filter(df_airport.iata_code.isNotNull())

In [65]:
df_airport = df_airport.filter(df_airport.elevation_ft.isNotNull())

In [66]:
df_airport = df_airport.dropDuplicates(['iata_code'])

In [67]:
df_airport.limit(5).toPandas()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,VAAK,medium_airport,Akola Airport,999,AS,IN,IN-MM,Akola,VAAK,AKD,,"77.058601, 20.698999"
1,KBGM,medium_airport,Greater Binghamton/Edwin A Link field,1636,,US,US-NY,Binghamton,KBGM,BGM,BGM,"-75.97979736, 42.20869827"
2,YBRL,small_airport,Borroloola Airport,55,OC,AU,AU-NT,,YBRL,BOX,,"136.302001953125, -16.075300216674805"
3,2TE0,small_airport,Eagle Air Park,15,,US,US-TX,Brazoria,2TE0,BZT,2TE0,"-95.579696655273, 28.982200622559"
4,YPCC,medium_airport,Cocos (Keeling) Islands Airport,10,AS,CC,CC-U-A,Cocos (Keeling) Islands,YPCC,CCK,,"96.8339004517, -12.1883001328"


In [103]:
df_airport.printSchema()

root
 |-- ident: string (nullable = true)
 |-- type: string (nullable = true)
 |-- name: string (nullable = true)
 |-- elevation_ft: string (nullable = true)
 |-- continent: string (nullable = true)
 |-- iso_country: string (nullable = true)
 |-- iso_region: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- gps_code: string (nullable = true)
 |-- iata_code: string (nullable = true)
 |-- local_code: string (nullable = true)
 |-- coordinates: string (nullable = true)



In [68]:
df_airport.write.parquet(OUTPUT_DIR + 'staging_airport_codes')

****staging_us_city_demo****

- Pivot race column

In [79]:
df_us_city_demo = df_us_city_demo.withColumn("Count", df_us_city_demo.Count.cast(Long()))

In [80]:
df_us_city_demo = df_us_city_demo.groupBy("City", "State", "Median Age", "Male Population", "Female Population", "Total Population", "Number of Veterans", "Foreign-born", "Average Household size", "State Code").pivot("Race").sum("Count")

In [81]:
for col in df_us_city_demo.columns:
    df_us_city_demo = df_us_city_demo.withColumnRenamed(col, col.lower().replace(" ", "_").replace("-", "_"))

In [82]:
df_us_city_demo.filter('city == "Johnson City"').toPandas()

Unnamed: 0,city,state,median_age,male_population,female_population,total_population,number_of_veterans,foreign_born,average_household_size,state_code,american_indian_and_alaska_native,asian,black_or_african_american,hispanic_or_latino,white
0,Johnson City,Tennessee,38.2,31019,34350,65369,5038,2878,2.18,TN,400,1877,6016,1114,59147
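The pivot and rename steps turn one row per (city, race) into one row per city with a column per race. A small plain-Python sketch of that reshaping follows. The helper names are hypothetical, the counts come from the Johnson City row above, and the original race labels (such as "Black or African-American") are inferred from the resulting column names rather than confirmed.

```python
def snake_case(name):
    """Same renaming rule as the loop above: lower-case, spaces and hyphens to '_'."""
    return name.lower().replace(" ", "_").replace("-", "_")


def pivot_race(rows, key_fields):
    """Collapse long rows (one per race) into one wide dict per key."""
    wide = {}
    for row in rows:
        key = tuple(row[f] for f in key_fields)
        out = wide.setdefault(key, {snake_case(f): row[f] for f in key_fields})
        col = snake_case(row["Race"])
        out[col] = out.get(col, 0) + row["Count"]  # sum, like pivot(...).sum("Count")
    return list(wide.values())


long_rows = [
    {"City": "Johnson City", "State Code": "TN", "Race": "White", "Count": 59147},
    {"City": "Johnson City", "State Code": "TN", "Race": "Asian", "Count": 1877},
    {"City": "Johnson City", "State Code": "TN", "Race": "Black or African-American", "Count": 6016},
]
wide_rows = pivot_race(long_rows, ["City", "State Code"])
print(wide_rows)
```

A race that is absent for a city simply has no key in this sketch; in the Spark pivot the corresponding column is null.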


In [83]:
df_us_city_demo.write.parquet(OUTPUT_DIR + 'staging_us_city_demo')

****staging_city_temp****

In [76]:
df_city_temp.count()

8599212

In [77]:
df_city_temp = df_city_temp.filter(df_city_temp.AverageTemperature.isNotNull())

In [78]:
df_city_temp.count()

8235082

In [79]:
df_city_temp.limit(5).toPandas()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1744-04-01,5.787999999999999,3.624,Århus,Denmark,57.05N,10.33E
2,1744-05-01,10.644,1.283,Århus,Denmark,57.05N,10.33E
3,1744-06-01,14.050999999999998,1.347,Århus,Denmark,57.05N,10.33E
4,1744-07-01,16.082,1.396,Århus,Denmark,57.05N,10.33E


In [88]:
df_city_temp.write.parquet(OUTPUT_DIR + 'staging_city_temp')

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model

The final data model consists of one fact table, i94_event_table, and the following dimension tables:

- admissions_table (join on admnum = admnum)
- i94_modes_table (join on code = i94mode)
- visa_codes_table (join on code = i94visa)
- arrivals_date_table (join on dt = arrival_date)
- departure_date_table (join on dt = departure_date)
- airport_table (join on iata_code = i94port)
- us_city_demo_table (join on city = i94_port_city & state_code = i94_port_state_code where i94_port_country = "UNITED STATES")
- city_temp_table (join on city = i94_port_city & dt = arrival_date / dt = departure_date)

The schema for each of these tables is printed before writing out to parquet.

This star schema was chosen to denormalize the data for analysis, while keeping each dimension atomic to the level that can be derived from its data source.
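To illustrate how a fact row picks up descriptions from the small code dimensions, here is a plain-Python sketch of the two code joins. The dictionaries hold the values that i94_modes_table and visa_codes_table contain in this notebook; the function name `describe_event` and the output keys `mode` and `visa` are hypothetical.

```python
# Contents of the two code dimensions, keyed by code.
I94_MODES = {1: "Air", 2: "Sea", 3: "Land", 9: "Not reported"}
VISA_CODES = {1: "Business", 2: "Pleasure", 3: "Student"}


def describe_event(event):
    """Attach dimension descriptions to one fact row (a dict).

    Unknown codes map to None, which is how a left join would behave.
    """
    enriched = dict(event)
    enriched["mode"] = I94_MODES.get(event["i94mode"])
    enriched["visa"] = VISA_CODES.get(event["i94visa"])
    return enriched


event = {"cicid": 5205011, "i94port": "NYC", "i94mode": 1, "i94visa": 2}
print(describe_event(event))
```

In Spark the same lookup would be a join between the fact table and each dimension on the keys listed above.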

#### 3.2 Mapping Out Data Pipelines
The data pipeline consists of the following steps:
 - Clean the data from source and write the staging tables to parquet (done in the previous section).
 - Load the staging tables into spark and extract the values for each dimension table and fact table with appropriate type-casting.
 - Write the final tables out to parquet.
 

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [30]:
df_spark = spark.read.parquet(OUTPUT_DIR + "staging_i94data")

##### Fact Table

****i94_event_table****

In [31]:
i94_event_table = df_spark.select(
                    df_spark.cicid.cast(Long()),
                    df_spark.i94yr.cast(Short()),
                    df_spark.i94mon.cast(Short()),
                    df_spark.i94port,
                    df_spark.i94_port_city,
                    df_spark.i94_port_state_code,
                    df_spark.i94_port_country,
                    df_spark.i94mode.cast(Short()),
                    df_spark.i94addr,
                    df_spark.i94visa.cast(Short()),
                    df_spark.matflag,
                    df_spark.airline,
                    df_spark.admnum.cast(Long()),    
                    df_spark.fltno,
                    df_spark.visatype,
                    df_spark.arrival_date,
                    df_spark.departure_date,
                    df_spark.i94_cit,
                    df_spark.i94_res
)

In [32]:
i94_event_table.printSchema()

root
 |-- cicid: long (nullable = true)
 |-- i94yr: short (nullable = true)
 |-- i94mon: short (nullable = true)
 |-- i94port: string (nullable = true)
 |-- i94_port_city: string (nullable = true)
 |-- i94_port_state_code: string (nullable = true)
 |-- i94_port_country: string (nullable = true)
 |-- i94mode: short (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- i94visa: short (nullable = true)
 |-- matflag: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: long (nullable = true)
 |-- fltno: string (nullable = true)
 |-- visatype: string (nullable = true)
 |-- arrival_date: date (nullable = true)
 |-- departure_date: date (nullable = true)
 |-- i94_cit: string (nullable = true)
 |-- i94_res: string (nullable = true)



In [33]:
i94_event_table.limit(5).toPandas()

Unnamed: 0,cicid,i94yr,i94mon,i94port,i94_port_city,i94_port_state_code,i94_port_country,i94mode,i94addr,i94visa,matflag,airline,admnum,fltno,visatype,arrival_date,departure_date,i94_cit,i94_res
0,4858657,2016,4,MND,MINOT,ND,UNITED STATES,1,CA,2,M,*GA,94621955130,N45NY,B2,2016-04-26,2016-06-07,HUNGARY,HUNGARY
1,5205011,2016,4,NYC,NEW YORK,NY,UNITED STATES,1,IL,2,M,BA,59404336933,00295,WT,2016-04-28,2016-05-06,FINLAND,FINLAND
2,514902,2016,4,NYC,NEW YORK,NY,UNITED STATES,1,IL,2,M,PD,92660857830,00389,B2,2016-04-03,2016-04-29,MONGOLIA,MONGOLIA
3,5205012,2016,4,NYC,NEW YORK,NY,UNITED STATES,1,IL,2,M,BA,59404617333,00295,WT,2016-04-28,2016-05-06,FINLAND,FINLAND
4,514903,2016,4,NYC,NEW YORK,NY,UNITED STATES,1,LA,1,M,CX,92689435830,00890,B1,2016-04-03,2016-05-07,MONGOLIA,MONGOLIA


In [39]:
i94_event_table.write.parquet(OUTPUT_DIR + 'i94_event_table')

##### Dimension Tables

****admissions_table****

In [18]:
# Create admissions dimension table

In [19]:
admissions_table = df_spark.select('admnum', 'biryear', 'gender').distinct()

In [20]:
admissions_table = admissions_table.withColumn("admnum", admissions_table.admnum.cast(Long())).withColumn("biryear", admissions_table.biryear.cast(Short()))

In [21]:
admissions_table.printSchema()

root
 |-- admnum: long (nullable = true)
 |-- biryear: short (nullable = true)
 |-- gender: string (nullable = true)



In [22]:
admissions_table.limit(5).toPandas()

Unnamed: 0,admnum,biryear,gender
0,56404026333,1976,F
1,56438878033,1953,F
2,94924819030,1983,M
3,56096764233,1962,M
4,92608941430,1950,F


In [23]:
admissions_table.write.parquet(OUTPUT_DIR + 'admissions_table')

****i94_modes_table****

In [50]:
i94_modes_table = df_i94model.select(
    df_i94model.code.cast(Short()),
    df_i94model.value,            
)

In [51]:
i94_modes_table.printSchema()

root
 |-- code: short (nullable = true)
 |-- value: string (nullable = true)



In [106]:
i94_modes_table.limit(5).toPandas()

Unnamed: 0,code,value
0,1,Air
1,2,Sea
2,3,Land
3,9,Not reported


In [52]:
i94_modes_table.write.parquet(OUTPUT_DIR + 'i94_modes_table')

****visa_codes_table****

In [46]:
visa_codes_table = df_i94visa.select(
    df_i94visa.code.cast(Short()),
    df_i94visa.value,            
)

In [49]:
visa_codes_table.printSchema()

root
 |-- code: short (nullable = true)
 |-- value: string (nullable = true)



In [105]:
visa_codes_table.limit(5).toPandas()

Unnamed: 0,code,value
0,1,Business
1,2,Pleasure
2,3,Student


In [47]:
visa_codes_table.write.parquet(OUTPUT_DIR + 'visa_codes_table')

****arrivals_date_table****

In [118]:
arrival_table = df_spark.selectExpr('arrival_date AS dt', 
                                 'dayofmonth(arrival_date) AS day', 
                                 'weekofyear(arrival_date) AS week',
                                 'month(arrival_date) AS month',
                                 'year(arrival_date) AS year',
                                 'dayofweek(arrival_date) AS weekday'                                          
                                       )

In [119]:
arrival_table.printSchema()

root
 |-- dt: date (nullable = true)
 |-- day: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- weekday: integer (nullable = true)



In [120]:
arrival_table.limit(5).toPandas()

Unnamed: 0,dt,day,week,month,year,weekday
0,2016-04-26,26,17,4,2016,3
1,2016-04-28,28,17,4,2016,5
2,2016-04-03,3,13,4,2016,1
3,2016-04-28,28,17,4,2016,5
4,2016-04-03,3,13,4,2016,1


In [30]:
arrival_table.write.parquet(OUTPUT_DIR + 'arrivals_date_table')
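The sample output above repeats some dates (2016-04-28 and 2016-04-03 each appear twice), which suggests the table holds one row per event rather than one per date. The following plain-Python sketch derives the same attributes for a de-duplicated set of dates. The function names are hypothetical; the weekday formula reproduces the 1 = Sunday to 7 = Saturday numbering seen in the output, and the week number is the ISO 8601 week.

```python
import datetime


def date_dim_row(d):
    """Return (dt, day, week, month, year, weekday) for one date."""
    iso_week = d.isocalendar()[1]     # ISO 8601 week number
    weekday = d.isoweekday() % 7 + 1  # 1 = Sunday ... 7 = Saturday
    return (d, d.day, iso_week, d.month, d.year, weekday)


def build_date_dim(dates):
    """One row per distinct, non-null date, sorted by date."""
    return [date_dim_row(d) for d in sorted({d for d in dates if d is not None})]


arrivals = [
    datetime.date(2016, 4, 26),
    datetime.date(2016, 4, 28),
    datetime.date(2016, 4, 3),
    datetime.date(2016, 4, 28),
]
for row in build_date_dim(arrivals):
    print(row)
```

With one row per date, a join from the fact table to the date dimension returns each event exactly once.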

****departure_date_table****

In [31]:
departure_table = df_spark.selectExpr('departure_date AS dt', 
                                    'dayofmonth(departure_date) AS day', 
                                    'weekofyear(departure_date) AS week',
                                    'month(departure_date) AS month',
                                    'year(departure_date) AS year',
                                    'dayofweek(departure_date) AS weekday'                                          
                                       )

In [103]:
departure_table.printSchema()

root
 |-- dt: date (nullable = true)
 |-- day: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- weekday: integer (nullable = true)



In [32]:
departure_table.limit(5).toPandas()

Unnamed: 0,dt,day,week,month,year,weekday
0,2016-06-07,7,23,6,2016,3
1,2016-05-06,6,18,5,2016,6
2,2016-04-29,29,17,4,2016,6
3,2016-05-06,6,18,5,2016,6
4,2016-05-07,7,18,5,2016,7


In [34]:
departure_table.write.parquet(OUTPUT_DIR + 'departure_date_table')

****airport_table****

In [53]:
df_airport_codes = spark.read.parquet(OUTPUT_DIR + 'staging_airport_codes')

In [55]:
airport_table = df_airport_codes.select(
    df_airport_codes.ident,
    df_airport_codes.type,
    df_airport_codes.name,
    df_airport_codes.elevation_ft.cast(Int()),
    df_airport_codes.continent,
    df_airport_codes.iso_country,
    df_airport_codes.iso_region,
    df_airport_codes.municipality,
    df_airport_codes.gps_code,
    df_airport_codes.iata_code,    
    df_airport_codes.local_code
)

In [56]:
airport_table.printSchema()

root
 |-- ident: string (nullable = true)
 |-- type: string (nullable = true)
 |-- name: string (nullable = true)
 |-- elevation_ft: integer (nullable = true)
 |-- continent: string (nullable = true)
 |-- iso_country: string (nullable = true)
 |-- iso_region: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- gps_code: string (nullable = true)
 |-- iata_code: string (nullable = true)
 |-- local_code: string (nullable = true)



In [58]:
airport_table.limit(5).toPandas()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code
0,EDKA,small_airport,Aachen-Merzbrück Airport,623,EU,DE,DE-NW,Aachen,EDKA,AAH,
1,KAHC,small_airport,Amedee Army Air Field,4012,,US,US-CA,Herlong,KAHC,AHC,AHC
2,KANP,small_airport,Lee Airport,34,,US,US-MD,Annapolis,KANP,ANP,ANP
3,KASN,small_airport,Talladega Municipal Airport,529,,US,US-AL,Talladega,KASN,ASN,ASN
4,AZB,small_airport,Amazon Bay Airport,12,OC,PG,PG-CPM,,,AZB,AZB


In [66]:
airport_table.write.parquet(OUTPUT_DIR + 'airport_table')

****us_city_demo_table****

In [90]:
df_us_city_demo = spark.read.parquet(OUTPUT_DIR + 'staging_us_city_demo')

In [91]:
us_city_demo_schema = R([
    Fld("City", Str()),  
    Fld("State", Str()),
    Fld("Median Age", Dbl()),
    Fld("Male Population", Long()),
    Fld("Female Population", Long()),    
    Fld("Total Population", Long()),
    Fld("Number of Veterans", Long()),
    Fld("Foreign-born", Long()),
    Fld("Average Household Size", Dbl()),
    Fld("State Code", Str()),    
    Fld("Race", Str()),
    Fld("Count", Long()),
])

In [95]:
us_city_demo_table = df_us_city_demo.select(
    upper(df_us_city_demo.city).alias('city'),
    df_us_city_demo.state,
    df_us_city_demo.median_age.cast(Dbl()),
    df_us_city_demo.male_population.cast(Long()),
    df_us_city_demo.female_population.cast(Long()),
    df_us_city_demo.total_population.cast(Long()),
    df_us_city_demo.number_of_veterans.cast(Long()),
    df_us_city_demo.foreign_born.cast(Long()),
    df_us_city_demo.average_household_size.cast(Dbl()),
    df_us_city_demo.state_code,
    df_us_city_demo.american_indian_and_alaska_native,
    df_us_city_demo.asian,
    df_us_city_demo.black_or_african_american,    
    df_us_city_demo.hispanic_or_latino,
    df_us_city_demo.white
)

In [96]:
us_city_demo_table.printSchema()

root
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- median_age: double (nullable = true)
 |-- male_population: long (nullable = true)
 |-- female_population: long (nullable = true)
 |-- total_population: long (nullable = true)
 |-- number_of_veterans: long (nullable = true)
 |-- foreign_born: long (nullable = true)
 |-- average_household_size: double (nullable = true)
 |-- state_code: string (nullable = true)
 |-- american_indian_and_alaska_native: long (nullable = true)
 |-- asian: long (nullable = true)
 |-- black_or_african_american: long (nullable = true)
 |-- hispanic_or_latino: long (nullable = true)
 |-- white: long (nullable = true)



In [97]:
us_city_demo_table.limit(5).toPandas()

Unnamed: 0,city,state,median_age,male_population,female_population,total_population,number_of_veterans,foreign_born,average_household_size,state_code,american_indian_and_alaska_native,asian,black_or_african_american,hispanic_or_latino,white
0,INDIO,California,35.9,43803,43723,87526,3647,22538,3.08,CA,1112.0,2866,3300,58852,48562
1,EAU CLAIRE,Wisconsin,32.0,32327,35106,67433,3261,2833,2.4,WI,876.0,4202,725,1488,62194
2,THOUSAND OAKS,California,44.8,65113,64216,129329,6323,25330,2.79,CA,2690.0,16134,3229,26754,108375
3,MOUNT PLEASANT,South Carolina,42.7,39429,41880,81309,3981,4701,2.53,SC,,1827,3730,2569,75908
4,THE WOODLANDS,Texas,38.8,56083,63061,119144,8351,21188,2.93,TX,2427.0,8828,6420,19574,106202


In [98]:
us_city_demo_table.write.parquet(OUTPUT_DIR + 'us_city_demo_table')

****city_temp_table****

In [99]:
df_city_temp = spark.read.parquet(OUTPUT_DIR + 'staging_city_temp')

In [100]:
city_temp_table = df_city_temp.select(
    df_city_temp.dt,
    df_city_temp.AverageTemperature.cast(Dbl()).alias('average_temperature'),
    df_city_temp.AverageTemperatureUncertainty.cast(Dbl()).alias('average_temperature_uncertainty'),    
    upper(df_city_temp.City).alias('city'),
    df_city_temp.Country.alias('country')
)

In [104]:
city_temp_table.printSchema()

root
 |-- dt: string (nullable = true)
 |-- average_temperature: double (nullable = true)
 |-- average_temperature_uncertainty: double (nullable = true)
 |-- city: string (nullable = true)
 |-- country: string (nullable = true)



In [101]:
city_temp_table.limit(5).toPandas()

Unnamed: 0,dt,average_temperature,average_temperature_uncertainty,city,country
0,1743-11-01,6.068,1.737,ÅRHUS,Denmark
1,1744-04-01,5.788,3.624,ÅRHUS,Denmark
2,1744-05-01,10.644,1.283,ÅRHUS,Denmark
3,1744-06-01,14.051,1.347,ÅRHUS,Denmark
4,1744-07-01,16.082,1.396,ÅRHUS,Denmark


In [102]:
city_temp_table.write.parquet(OUTPUT_DIR + 'city_temp_table')

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

In [112]:
i94_event_table.count()

2710124

****admissions_table check****

In [108]:
admissions_table.count()

2693681

****date check****

In [123]:
i94_event_table.join(arrival_table, i94_event_table.arrival_date == arrival_table.dt).limit(5).toPandas()

Unnamed: 0,cicid,i94yr,i94mon,i94port,i94_port_city,i94_port_state_code,i94_port_country,i94mode,i94addr,i94visa,matflag,biryear,airline,admnum,fltno,visatype,arrival_date,departure_date,i94_cit,i94_res,dt,day,week,month,year,weekday
0,4858657,2016,4,MND,MINOT,ND,UNITED STATES,1,CA,2,M,1967,*GA,94621955130,N45NY,B2,2016-04-26,2016-06-07,HUNGARY,HUNGARY,2016-04-26,26,17,4,2016,3
1,4858657,2016,4,MND,MINOT,ND,UNITED STATES,1,CA,2,M,1967,*GA,94621955130,N45NY,B2,2016-04-26,2016-06-07,HUNGARY,HUNGARY,2016-04-26,26,17,4,2016,3
2,4858657,2016,4,MND,MINOT,ND,UNITED STATES,1,CA,2,M,1967,*GA,94621955130,N45NY,B2,2016-04-26,2016-06-07,HUNGARY,HUNGARY,2016-04-26,26,17,4,2016,3
3,4858657,2016,4,MND,MINOT,ND,UNITED STATES,1,CA,2,M,1967,*GA,94621955130,N45NY,B2,2016-04-26,2016-06-07,HUNGARY,HUNGARY,2016-04-26,26,17,4,2016,3
4,4858657,2016,4,MND,MINOT,ND,UNITED STATES,1,CA,2,M,1967,*GA,94621955130,N45NY,B2,2016-04-26,2016-06-07,HUNGARY,HUNGARY,2016-04-26,26,17,4,2016,3
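The same cicid appears in every row of the join output above, which is what a join produces when the dimension side holds the same dt more than once. A minimal sketch of the completeness and unique-key checks from the list above follows, written against plain lists of dicts. The function names and the sample rows are hypothetical.

```python
from collections import Counter


def duplicated_keys(rows, key):
    """Return {value: occurrences} for key values that appear more than once."""
    counts = Counter(row[key] for row in rows)
    return {k: n for k, n in counts.items() if n > 1}


def check_table(rows, key):
    """Return a list of problems; an empty list means the checks passed."""
    problems = []
    if not rows:
        problems.append("table is empty")
    dupes = duplicated_keys(rows, key)
    if dupes:
        problems.append("{} duplicated value(s) in '{}'".format(len(dupes), key))
    return problems


date_rows = [{"dt": "2016-04-26"}, {"dt": "2016-04-28"}, {"dt": "2016-04-28"}]
print(check_table(date_rows, "dt"))
```

On the Spark tables, an equivalent check would compare the total row count with the distinct count of the key column.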


****airport_table****

In [125]:
i94_event_table.join(airport_table, i94_event_table.i94port == airport_table.iata_code).limit(5).toPandas()

Unnamed: 0,cicid,i94yr,i94mon,i94port,i94_port_city,i94_port_state_code,i94_port_country,i94mode,i94addr,i94visa,matflag,biryear,airline,admnum,fltno,visatype,arrival_date,departure_date,i94_cit,i94_res,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code
0,4858657,2016,4,MND,MINOT,ND,UNITED STATES,1,CA,2,M,1967,*GA,94621955130,N45NY,B2,2016-04-26,2016-06-07,HUNGARY,HUNGARY,SK-217,small_airport,Medina Airport,1315,SA,CO,CO-CUN,Medina,,MND,MND
1,3486659,2016,4,STT,ST THOMAS,VI,UNITED STATES,2,VQ,2,N,1969,VES,56430066033,MANAD,WT,2016-04-19,1960-01-01,FINLAND,FINLAND,TIST,medium_airport,Cyril E. King Airport,23,,VI,VI-U-A,"Charlotte Amalie, Harry S. Truman Airport",TIST,STT,STT
2,1571855,2016,4,STT,ST THOMAS,VI,UNITED STATES,2,VQ,1,M,1974,VES,93094492430,MANAD,B1,2016-04-09,2016-05-02,FRANCE,FRANCE,TIST,medium_airport,Cyril E. King Airport,23,,VI,VI-U-A,"Charlotte Amalie, Harry S. Truman Airport",TIST,STT,STT
3,3486660,2016,4,STT,ST THOMAS,VI,UNITED STATES,2,VQ,2,N,1984,VES,56429847033,MANAD,WT,2016-04-19,1960-01-01,FINLAND,FINLAND,TIST,medium_airport,Cyril E. King Airport,23,,VI,VI-U-A,"Charlotte Amalie, Harry S. Truman Airport",TIST,STT,STT
4,5018850,2016,4,STT,ST THOMAS,VI,UNITED STATES,2,VQ,2,M,1956,VES,59320390333,MANAD,WT,2016-04-27,2016-04-28,FRANCE,FRANCE,TIST,medium_airport,Cyril E. King Airport,23,,VI,VI-U-A,"Charlotte Amalie, Harry S. Truman Airport",TIST,STT,STT


****us_city_demo_table****

In [10]:
i94_event_table = spark.read.parquet(OUTPUT_DIR + 'i94_event_table')

In [23]:
city_temp_table = spark.read.parquet(OUTPUT_DIR + 'city_temp_table')

In [22]:
i94_event_table.join(us_city_demo_table, [i94_event_table.i94_port_city == us_city_demo_table.city, i94_event_table.i94_port_state_code == us_city_demo_table.state_code, i94_event_table.i94_port_country == "UNITED STATES"]).limit(5).toPandas()

Unnamed: 0,cicid,i94yr,i94mon,i94port,i94_port_city,i94_port_state_code,i94_port_country,i94mode,i94addr,i94visa,matflag,biryear,airline,admnum,fltno,visatype,arrival_date,departure_date,i94_cit,i94_res,city,state,median_age,male_population,female_population,total_population,number_of_veterans,foreign_born,average_household_size,state_code,american_indian_and_alaska_native,asian,black_or_african_american,hispanic_or_latino,white
0,5205011,2016,4,NYC,NEW YORK,NY,UNITED STATES,1,IL,2,M,1950,BA,59404336933,295,WT,2016-04-28,2016-05-06,FINLAND,FINLAND,NEW YORK,New York,36.0,4081698,4468707,8550405,156961,3212500,2.68,NY,90923,1304564,2192248,2485125,3835726
1,514902,2016,4,NYC,NEW YORK,NY,UNITED STATES,1,IL,2,M,1955,PD,92660857830,389,B2,2016-04-03,2016-04-29,MONGOLIA,MONGOLIA,NEW YORK,New York,36.0,4081698,4468707,8550405,156961,3212500,2.68,NY,90923,1304564,2192248,2485125,3835726
2,5205012,2016,4,NYC,NEW YORK,NY,UNITED STATES,1,IL,2,M,2005,BA,59404617333,295,WT,2016-04-28,2016-05-06,FINLAND,FINLAND,NEW YORK,New York,36.0,4081698,4468707,8550405,156961,3212500,2.68,NY,90923,1304564,2192248,2485125,3835726
3,514903,2016,4,NYC,NEW YORK,NY,UNITED STATES,1,LA,1,M,1963,CX,92689435830,890,B1,2016-04-03,2016-05-07,MONGOLIA,MONGOLIA,NEW YORK,New York,36.0,4081698,4468707,8550405,156961,3212500,2.68,NY,90923,1304564,2192248,2485125,3835726
4,5205013,2016,4,NYC,NEW YORK,NY,UNITED STATES,1,IL,2,M,2007,BA,59404542133,295,WT,2016-04-28,2016-05-06,FINLAND,FINLAND,NEW YORK,New York,36.0,4081698,4468707,8550405,156961,3212500,2.68,NY,90923,1304564,2192248,2485125,3835726


****city_temp_table****

In [24]:
i94_event_table.join(city_temp_table, [i94_event_table.i94_port_city == city_temp_table.city, i94_event_table.arrival_date == city_temp_table.dt]).limit(5).toPandas()

Unnamed: 0,cicid,i94yr,i94mon,i94port,i94_port_city,i94_port_state_code,i94_port_country,i94mode,i94addr,i94visa,matflag,biryear,airline,admnum,fltno,visatype,arrival_date,departure_date,i94_cit,i94_res,dt,average_temperature,average_temperature_uncertainty,city,country


The join returns no rows, so there is no overlap between the i94 arrivals and the city temperature data on city and date.
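One way to probe an empty join like this is to compare the key sets directly. The temperature samples shown earlier are all dated on the first of a month, so the sketch below truncates arrival dates to the month start before comparing. The function names are hypothetical and the temperature rows are made-up stand-ins; whether the real temperature data covers 2016 at all is something this check would reveal, not something assumed here.

```python
import datetime


def month_start(d):
    """Truncate a date to the first day of its month."""
    return d.replace(day=1)


def overlapping_keys(events, temps):
    """Return the (city, month start) keys present on both sides."""
    event_keys = {(e["city"], month_start(e["arrival_date"])) for e in events}
    temp_keys = {(t["city"], t["dt"]) for t in temps}
    return event_keys & temp_keys


events = [{"city": "NEW YORK", "arrival_date": datetime.date(2016, 4, 28)}]
temps = [{"city": "NEW YORK", "dt": datetime.date(2013, 9, 1)}]  # hypothetical row
print(overlapping_keys(events, temps))  # empty: the months do not line up
```

If the key sets do overlap once dates are truncated, the join condition rather than the data is the likely cause of the empty result.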

#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

****i94_event_table****  
 |-- cicid: primary id  
 |-- i94yr: year of i94 arrival  
 |-- i94mon: month of i94 arrival  
 |-- i94port: port of entry code for i94 arrival  
 |-- i94_port_city: city for i94 arrival  
 |-- i94_port_state_code: 2 letter state code for i94 arrival  
 |-- i94_port_country: country for i94 arrival  
 |-- i94mode: code for mode of transportation (see i94_modes_table)  
 |-- i94addr: US state of the visitor's address  
 |-- i94visa: visa type for i94 entry  
 |-- matflag: matched or no-match for i94 arrival and departure   
 |-- airline: airline used for arrival or departure  
 |-- admnum: admission number  
 |-- fltno: flight number  
 |-- visatype: visa category  
 |-- arrival_date: date of i94 arrival  
 |-- departure_date: date of i94 departure  
 |-- i94_cit: country of origin  
 |-- i94_res: country of residence  
 
 ****admissions_table****  
 |-- admnum: admission number  
 |-- biryear: year of birth of applicant  
 |-- gender: gender of applicant  
 
 ****i94_modes_table****  
 |-- code: i94 mode  
 |-- value: description of i94 mode  
 
 ****visa_codes_table****  
 |-- code: visa code  
 |-- value: description of visa code  
 
 ****arrivals_date_table / departure_date_table****  
 |-- dt: date  
 |-- day: day  
 |-- week: week number  
 |-- month: month  
 |-- year: year  
 |-- weekday: day of the week as an integer (1 = Sunday, ..., 7 = Saturday)  
 
 ****airport_table****  
 |-- ident: primary key  
 |-- type: airport type (e.g. small_airport, medium_airport)  
 |-- name: airport name  
 |-- elevation_ft: elevation of airport  
 |-- continent: continent of airport  
 |-- iso_country: country of airport  
 |-- iso_region: region of airport  
 |-- municipality: airport municipality  
 |-- gps_code: gps_code of airport  
 |-- iata_code: iata_code of airport  
 |-- local_code: local code  
 
 ****us_city_demo_table****  
 |-- city: city  
 |-- state: state  
 |-- median_age: median age in that city  
 |-- male_population: count of male population  
 |-- female_population: count of female population  
 |-- total_population: total city population  
 |-- number_of_veterans: number of veterans in city  
 |-- foreign_born: number of foreign born  
 |-- average_household_size: average number of people per household  
 |-- state_code: state code  
 |-- american_indian_and_alaska_native: count of this demographic  
 |-- asian: same as above  
 |-- black_or_african_american: same as above  
 |-- hispanic_or_latino: same as above  
 |-- white: same as above  
 
 ****city_temp_table****  
 |-- dt: date  
 |-- average_temperature: average temperature of the city  
 |-- average_temperature_uncertainty: uncertainty in temperature  
 |-- city: city  
 |-- country: country  

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.

Spark is a good fit for this project: its analytics and summary functions make it easy to inspect the data and identify what needs cleaning. The data is already partitioned by year and month, but the sources are varied and unclean, which makes a data lake processed with Spark a good choice. Also, since the tables are being built for analytics, a star schema is preferable to a snowflake schema.

The data should ideally be updated monthly, since arrival and departure dates are usually a few weeks apart.

 * The data was increased by 100x.
    - In this case, we could schedule a periodic refresh with Airflow and process the updates in batches.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
    - In this case, we would again use an Airflow pipeline and schedule a daily refresh that finishes by 7am every day.
 * The database needed to be accessed by 100+ people.
    - An ACID-compliant transactional database would be better suited to this high-availability use case.