In [51]:
import pandas as pd
import os
import re
from functools import reduce

from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import udf

### Step 1: Scope the Project and Gather Data

#### Scope 

The goal is to create an ETL pipeline using the Udacity provided I94 immigration dataset and the city temperature data from Kaggle to allow users to make queries to see there is a correlation between destination temperature and immigration statistics.

#### Describe and Gather Data 

The I94 immigration data comes from the US National Tourism and Trade Office. It is provided in SAS7BDAT format which is a binary database storage format. Some relevant attributes include:

    i94yr = 4 digit year
    i94mon = numeric month
    i94cit = 3 digit code of origin city
    i94port = 3 character code of destination USA city
    arrdate = arrival date in the USA
    i94mode = 1 digit travel code
    depdate = departure date from the USA
    i94visa = reason for immigration

The temperature data comes from Kaggle. It is provided in csv format. Some relevant attributes include:

    AverageTemperature = average temperature
    City = city name
    Country = country name
    Latitude= latitude
    Longitude = longitude

In [11]:
sas_filenames = [os.path.join(os.getcwd(), 'data/18-83510-I94-Data-2016', fn) for fn in os.listdir('data/18-83510-I94-Data-2016')]
sas_header_file = 'data/I94_SAS_Labels_Descriptions.SAS'

In [12]:
sas_filenames

['/Users/krchia/udacity-capstone/data/18-83510-I94-Data-2016/i94_nov16_sub.sas7bdat',
 '/Users/krchia/udacity-capstone/data/18-83510-I94-Data-2016/i94_dec16_sub.sas7bdat',
 '/Users/krchia/udacity-capstone/data/18-83510-I94-Data-2016/i94_sep16_sub.sas7bdat',
 '/Users/krchia/udacity-capstone/data/18-83510-I94-Data-2016/i94_aug16_sub.sas7bdat',
 '/Users/krchia/udacity-capstone/data/18-83510-I94-Data-2016/i94_may16_sub.sas7bdat',
 '/Users/krchia/udacity-capstone/data/18-83510-I94-Data-2016/i94_jun16_sub.sas7bdat',
 '/Users/krchia/udacity-capstone/data/18-83510-I94-Data-2016/i94_oct16_sub.sas7bdat',
 '/Users/krchia/udacity-capstone/data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat',
 '/Users/krchia/udacity-capstone/data/18-83510-I94-Data-2016/i94_jan16_sub.sas7bdat',
 '/Users/krchia/udacity-capstone/data/18-83510-I94-Data-2016/i94_jul16_sub.sas7bdat',
 '/Users/krchia/udacity-capstone/data/18-83510-I94-Data-2016/i94_mar16_sub.sas7bdat',
 '/Users/krchia/udacity-capstone/data/18-83510-I94-Dat

In [13]:
exploratory_file = sas_filenames[0]
df = pd.read_sas(exploratory_file, 'sas7bdat', encoding="ISO-8859-1")

In [21]:
with open(sas_header_file) as f:
    lines = f.readlines()    

comments = [line for line in lines if '/*' in line and '*/\n' in line]
regexp = re.compile(r'^/\*\s+(?P<code>.+?)\s+-\s+(?P<description>.+)\s+\*/$')
matches = [regexp.match(c) for c in comments]

for m in matches:
    print(m.group("code"), ":", m.group('description'))

I94YR : 4 digit year
I94MON : Numeric month
I94CIT & I94RES : This format shows all the valid and invalid codes for processing
I94PORT : This format shows all the valid and invalid codes for processing
I94MODE : There are missing values as well as not reported (9)
I94BIR : Age of Respondent in Years
COUNT : Used for summary statistics
DTADFILE : Character Date Field - Date added to I-94 Files - CIC does not use
VISAPOST : Department of State where where Visa was issued - CIC does not use
OCCUP : Occupation that will be performed in U.S. - CIC does not use
ENTDEPA : Arrival Flag - admitted or paroled into the U.S. - CIC does not use
ENTDEPD : Departure Flag - Departed, lost I-94 or is deceased - CIC does not use
ENTDEPU : Update Flag - Either apprehended, overstayed, adjusted to perm residence - CIC does not use
MATFLAG : Match flag - Match of arrival and departure records
BIRYEAR : 4 digit year of birth
DTADDTO : Character Date Field - Date to which admitted to U.S. (allowed to stay un

In [37]:
### i94_port_lines = (303, 962)
re_obj = re.compile(r'\'(.*)\'.*\'(.*)\'')
valid_ports = {}
for line in lines[302:961]:
    match = re_obj.search(line)
    valid_ports[match.group(1)]=[match.group(2)]
from pprint import pprint
pprint(valid_ports)

{'.GA': ['No PORT Code (.GA)'],
 '060': ['No PORT Code (60)'],
 '48Y': ['PINECREEK BORDER ARPT, MN'],
 '5KE': ['KETCHIKAN, AK'],
 '5T6': ['No PORT Code (5T6)'],
 '74S': ['No PORT Code (74S)'],
 '888': ['UNIDENTIFED AIR / SEAPORT'],
 'A2A': ['No PORT Code (A2A)'],
 'ABE': ['ABERDEEN, WA          '],
 'ABG': ['ALBURG, VT            '],
 'ABQ': ['ALBUQUERQUE, NM       '],
 'ABS': ['ALBURG SPRINGS, VT    '],
 'ACY': ['POMONA FIELD - ATLANTIC CITY, NJ'],
 'ADS': ['ADDISON AIRPORT- ADDISON, TX'],
 'ADT': ['AMISTAD DAM, TX       '],
 'ADU': ['No PORT Code (ADU)'],
 'ADW': ['ANDREWS AFB, MD'],
 'AFW': ['FORT WORTH ALLIANCE, TX'],
 'AG': ['No PORT Code (AG)'],
 'AG0': ['MAGNOLIA, AR'],
 'AGA': ['AGANA, GU             '],
 'AGM': ['ALGOMA, WI            '],
 'AGN': ['ALGONAC, MI           '],
 'AGS': ['BUSH FIELD - AUGUSTA, GA'],
 'AGU': ['AGUADILLA, PR         '],
 'AKR': ['AKRON, OH             '],
 'AKT': ['No PORT Code (AKT)'],
 'ALA': ['ALAMAGORDO, NM (BPS)'],
 'ALB': ['ALBANY, NY          

In [15]:
print(df.shape)
df.head()

(2914926, 28)


Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,8.0,2016.0,11.0,126.0,689.0,CHA,20759.0,2.0,,,...,,,1954.0,02082017,F,2771.0,,13295810000.0,,WT
1,18.0,2016.0,11.0,582.0,582.0,NOG,20759.0,9.0,FL,,...,,M,1969.0,11222016,M,1163.0,,13616910000.0,,CP
2,19.0,2016.0,11.0,582.0,582.0,NOG,20759.0,9.0,CA,,...,,M,1982.0,03032017,M,1442.0,,13642680000.0,,CP
3,65.0,2016.0,11.0,213.0,213.0,NEW,20759.0,1.0,CA,,...,,,1990.0,D/S,M,,UA,12924810000.0,49.0,F1
4,67.0,2016.0,11.0,687.0,687.0,ATL,20759.0,1.0,AL,,...,,,1993.0,D/S,M,,DL,12924810000.0,110.0,F1


### Step 2: Explore and Assess the Data
#### Explore the Data 
Comments below. We will ultimately use Spark for processing in the data pipeline, but will use Pandas to explore the data first.

In [16]:
df.columns

Index(['cicid', 'i94yr', 'i94mon', 'i94cit', 'i94res', 'i94port', 'arrdate',
       'i94mode', 'i94addr', 'depdate', 'i94bir', 'i94visa', 'count',
       'dtadfile', 'visapost', 'occup', 'entdepa', 'entdepd', 'entdepu',
       'matflag', 'biryear', 'dtaddto', 'gender', 'insnum', 'airline',
       'admnum', 'fltno', 'visatype'],
      dtype='object')

In [17]:
df.biryear.unique()

array([1954., 1969., 1982., 1990., 1993., 1978., 1967., 2000., 1985.,
       1981., 1984., 1989., 1999., 1975., 1986., 1992., 1943., 1980.,
       1948., 1965., 1997., 1950., 1949., 1996., 1991., 1964., 1988.,
       1962., 1955., 1939., 1972., 1970., 1977., 1974., 1963., 1956.,
       1995., 1994., 1951., 1966., 1961., 1979., 1958., 1987., 1959.,
       1983., 1971., 1936., 1968., 1946., 1960., 1947., 1976., 1944.,
       1957., 2001., 1998., 2011., 1952., 2005., 2002., 1953., 1973.,
       1929., 1941., 2008., 1945., 2010., 2014., 2015., 2012., 1940.,
       2007., 1937., 2006., 2009., 2004., 1942., 2016., 1938., 2003.,
       2013.,   nan, 1932., 1933., 1928., 1934., 1919., 1935., 1931.,
       1925., 1930., 1927., 1924., 1920., 1926., 1921., 1918., 1922.,
       1923., 1917., 1904., 1912., 1901., 2017., 1916., 1911., 1906.,
       1905.])

In [18]:
df.i94bir.unique()

array([ 62.,  47.,  34.,  26.,  23.,  38.,  49.,  16.,  31.,  35.,  32.,
        27.,  17.,  41.,  30.,  24.,  73.,  36.,  68.,  51.,  19.,  66.,
        67.,  20.,  25.,  52.,  28.,  54.,  61.,  77.,  44.,  46.,  39.,
        42.,  53.,  60.,  21.,  22.,  65.,  50.,  55.,  37.,  58.,  29.,
        57.,  33.,  45.,  80.,  48.,  70.,  56.,  69.,  40.,  72.,  59.,
        15.,  18.,   5.,  64.,  11.,  14.,  63.,  43.,  87.,  75.,   8.,
        71.,   6.,   2.,   1.,   4.,  76.,   9.,  79.,  10.,   7.,  12.,
        74.,   0.,  78.,  13.,   3.,  nan,  84.,  83.,  88.,  82.,  97.,
        81.,  85.,  91.,  86.,  89.,  92.,  96.,  90.,  95.,  98.,  94.,
        93.,  99., 112., 104., 115.,  -1., 100., 105., 110., 111.])

In [19]:
df.i94port.unique()

array(['CHA', 'NOG', 'NEW', 'ATL', 'BOS', 'MIA', 'FTL', 'NYC', 'DAL',
       'HOU', 'ORL', 'LOS', 'SFR', 'PHI', 'CHI', 'SAJ', 'WAS', 'DET',
       'SFB', 'SPM', 'AGA', 'SAI', 'HHW', 'DEN', 'LVG', 'NOL', 'SEA',
       'TAM', 'RDU', 'SDP', 'SAC', 'PHO', 'BAL', 'ELP', 'ONT', 'AUS',
       'SLC', 'SNJ', 'NCA', 'XXX', 'SNA', 'STT', 'FMY', 'PVD', 'TST',
       'CLT', 'OAK', 'SHA', 'MAA', 'CHM', 'PEV', 'NYL', 'LAR', 'OTT',
       'BRO', 'SYS', 'FPT', 'BUF', 'SAV', 'TUC', 'CLE', 'NSV', 'OGG',
       'VCV', 'X96', 'WPB', 'HAR', 'CLM', 'TOR', 'POO', 'CIN', 'W55',
       'PIT', 'STL', 'MDT', 'PSP', 'OTM', 'TEC', 'PEM', 'BLA', 'LNB',
       'DOU', 'GAL', 'ABQ', 'MCA', 'OPF', '5T6', 'ANC', 'KAN', 'MMU',
       'RFD', 'FPR', 'CLS', 'INT', 'RNO', 'CHL', 'SYR', 'ROC', 'HPN',
       'BED', 'INP', 'MIL', 'HIG', 'LAU', 'SGR', 'BGM', 'GRB', 'PTK',
       'SRQ', 'CRQ', 'DAB', 'JAC', 'SUM', 'YGF', 'DUB', 'OKC', 'SAA',
       'KEY', 'ALB', 'NOR', 'AXB', 'CRP', 'ORO', 'SWE', 'MEM', 'FAJ',
       'MON', 'CHR',

For the I94 immigration data, we can clean the following columns: 

Clean `biryear` by ensuring data values not existing between 1900 and 2016 are turned to NULL. 

Similarly for `i194bir` with values outside 0 to 116. For `i94port`, drop rows with 'XXX' (unknown as specified in header file)

Also, not all columns are relevant. Keep only those that will be used in the dimension table.

In [69]:
spark = SparkSession.builder \
    .config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11") \
    .enableHiveSupport() \
    .getOrCreate()

def file_to_spark_df(file):
    return spark.read.format('com.github.saurfang.sas.spark').load(file)

def clean_i94_data(df):
    columns_to_keep = {'i94yr', 'i94mon', 'i94cit', 'i94port', 'i94mode', 'i94bir', 'arrdate', 'depdate', 'i94visa'}
    temp = df.filter(df.i94port.isin(list(valid_port.keys())))
    return temp.select([c for c in temp.columns if c in columns_to_keep])

In [71]:
immigration_df = clean_i94_data(file_to_spark_df(sas_filenames[0]))
immigration_df.show()

+------+------+------+-------+-------+-------+-------+------+-------+
| i94yr|i94mon|i94cit|i94port|arrdate|i94mode|depdate|i94bir|i94visa|
+------+------+------+-------+-------+-------+-------+------+-------+
|2016.0|  11.0| 126.0|    CHA|20759.0|    2.0|   null|  62.0|    2.0|
|2016.0|  11.0| 582.0|    NOG|20759.0|    9.0|   null|  47.0|    2.0|
|2016.0|  11.0| 582.0|    NOG|20759.0|    9.0|   null|  34.0|    2.0|
|2016.0|  11.0| 213.0|    NEW|20759.0|    1.0|   null|  26.0|    3.0|
|2016.0|  11.0| 687.0|    ATL|20759.0|    1.0|   null|  23.0|    3.0|
|2016.0|  11.0| 254.0|    BOS|20759.0|    1.0|20828.0|  38.0|    3.0|
|2016.0|  11.0| 692.0|    MIA|20759.0|    1.0|20768.0|  49.0|    1.0|
|2016.0|  11.0| 690.0|    ATL|20759.0|    1.0|20848.0|  16.0|    2.0|
|2016.0|  11.0| 696.0|    ATL|20759.0|    1.0|20937.0|  31.0|    2.0|
|2016.0|  11.0| 692.0|    FTL|20759.0|    1.0|   null|  35.0|    2.0|
|2016.0|  11.0| 687.0|    MIA|20759.0|    1.0|20764.0|  38.0|    2.0|
|2016.0|  11.0| 689.

In [10]:
temp_df = pd.read_csv('data/GlobalLandTemperaturesByCity.csv', sep=',')

In [11]:
temp_df.head()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E


In [21]:
unique_temps = temp_df.AverageTemperature.unique()
print(max(unique_temps))
print(min(unique_temps))

39.650999999999996
-42.70399999999999


For the temperature data, we can clean the following columns:

Values for `AverageTemperatures` seem to be reasonable so no need to remove outliers

Drop rows with `AverageTemperature == NaN`

Drop rows with duplicate locations

In [77]:
@udf()
def city_to_port(city):
    for key in valid_port:
        if city.lower() in valid_port[key][0].lower():
            return key

def csv_to_spark_df(file):
    return spark.read.format("csv").option("header", "true").load(file)

def clean_temp_data(temp_df):
    temp_df = temp_df.filter(temp_df.AverageTemperature != 'NaN')
    temp_df = temp_df.dropDuplicates(['City', 'Country'])
    temp_df = temp_df.withColumn("i94port", city_to_port(temp_df.City))
    return temp_df.filter(temp_df.i94port != 'null')

temp_df = clean_temp_data(csv_to_spark_df("data/GlobalLandTemperaturesByCity.csv"))
# temp_df.show()

In [64]:
%env
%env IPYTHON=1
%env PYSPARK_PYTHON=/usr/local/bin/python3
%env PYSPARK_DRIVER_PYTHON=/usr/local/bin/python3
%env PYSPARK_DRIVER_PYTHON_OPTS="notebook"

env: IPYTHON=1
env: PYSPARK_PYTHON=/usr/local/bin/python3
env: PYSPARK_DRIVER_PYTHON=/usr/local/bin/python3
env: PYSPARK_DRIVER_PYTHON_OPTS="notebook"


### Step 3: Define the Data Model

#### 3.1 Conceptual Data Model

Dimension Tables

`dim_demographics` will contain the following columns from the I94 data. 

- `I94YR` : 4 digit year
- `I94MON` : Numeric month
- `I94CIT` : This format shows all the valid and invalid codes for processing
- `I94PORT` : This format shows all the valid and invalid codes for processing
- `I94MODE` : There are missing values as well as not reported (9)
- `I94BIR` : Age of Respondent in Years
- `ARRDATE` : Arrival date
- `DEPDATE` : Departure date
- `I94VISA` : Visa code (Business/Pleasure/Student)

`dim_temperature` will contain the following columns from the temperature dataset. Most are self explanatory names.

- `I94PORT` : map the city/country/location to the corresponding I94 port code.
- `AverageTemperature`
- `City`
- `Country`
- `Latitude`
- `Longitude`

Fact Tables

`fact_immigration` allows queries in line with the intended purpose of the project:

- `I94YR` : 4 digit year
- `I94MON` : Numeric month
- `I94CIT` : This format shows all the valid and invalid codes for processing
- `I94PORT` : This format shows all the valid and invalid codes for processing
- `I94MODE` : There are missing values as well as not reported (9)
- `I94BIR` : Age of Respondent in Years
- `ARRDATE` : Arrival date
- `DEPDATE` : Departure date
- `I94VISA` : Visa code (Business/Pleasure/Student)
- `AverageTemperature`

#### 3.2 Mapping Out Data Pipelines

1. clean the data and create the dimension table for both i94 and temperature datasets
2. create fact table by joining both dimension tables on i94port and write to parquet file partitioned by i94port


### Step 4: Run Pipelines to Model the Data
#### 4.1 Create the data model

Build the data pipelines to create the data model.


In [None]:
immigration_df = clean_i94_data(file_to_spark_df(sas_filenames[0]))
immigration_df.write \
    .mode("append") \
    .partitionBy("i94port") \
    .parquet("/tables/immigration.parquet")

temp_df = clean_temp_data(csv_to_spark_df("data/GlobalLandTemperaturesByCity.csv"))
temp_df.write \
    .mode("append") \
    .partitionBy("i94port") \
    .parquet("/tables/temperature.parquet")

# Create temporary views of the immigration and temperature data
immigration_df.createOrReplaceTempView("immigration_view")
temp_df.createOrReplaceTempView("temp_view")

# Create the fact table by joining the immigration and temperature views
fact_table = spark.sql("""
SELECT immigration_view.i94yr as year,
       immigration_view.i94mon as month,
       immigration_view.i94cit as city,
       immigration_view.i94port as i94port,
       immigration_view.i94mode as i94mode,
       immigration_view.i94bir as i94bir,
       immigration_view.arrdate as arrival_date,
       immigration_view.depdate as departure_date,
       immigration_view.i94visa as reason,
       temp_view.AverageTemperature as temperature,
FROM immigration_view
JOIN temp_view ON (immigration_view.i94port = temp_view.i94port)
""")

# Write fact table to parquet files partitioned by i94port
fact_table.write \
    .mode("append") \
    .partitionBy("i94port") \
    .parquet("/results/fact.parquet")

#### 4.2 Data Quality Checks

In [None]:
def count_check(df):
    return df.count() == 0 

def integrity_check(df_immigration, df_temp):
    return df_immigration.select(col("i94port")).distinct() \
         .join(df_temp, df_immigration["i94port"] == df_temp["i94port"], "left_anti") \
         .count() == 0

def quality_check(df_immigration, df_temp):
    return count_check(df_immigration) and count_check(df_temp) \
        and integrity_check(df_immigration, df_temp)

# Perform data quality check
quality_check(df_immigration, df_temp)

#### 4.3 Data dictionary

Dimension Tables

`dim_demographics` will contain the following columns from the I94 data. 

- `I94YR` : 4 digit year
- `I94MON` : Numeric month
- `I94CIT` : This format shows all the valid and invalid codes for processing
- `I94PORT` : This format shows all the valid and invalid codes for processing
- `I94MODE` : There are missing values as well as not reported (9)
- `I94BIR` : Age of Respondent in Years
- `ARRDATE` : Arrival date
- `DEPDATE` : Departure date
- `I94VISA` : Visa code (Business/Pleasure/Student)

`dim_temperature` will contain the following columns from the temperature dataset. Most are self explanatory names.

- `I94PORT` : map the city/country/location to the corresponding I94 port code.
- `AverageTemperature`
- `City`
- `Country`
- `Latitude`
- `Longitude`

Fact Tables

`fact_immigration` allows queries in line with the intended purpose of the project:

- `I94YR` : 4 digit year
- `I94MON` : Numeric month
- `I94CIT` : This format shows all the valid and invalid codes for processing
- `I94PORT` : This format shows all the valid and invalid codes for processing
- `I94MODE` : There are missing values as well as not reported (9)
- `I94BIR` : Age of Respondent in Years
- `ARRDATE` : Arrival date
- `DEPDATE` : Departure date
- `I94VISA` : Visa code (Business/Pleasure/Student)
- `AverageTemperature`