# Udacity Data Engineering Capstone Project

## US Immigrations, Demographics and Airports

The project provides a database of United States (US) immigration data from 2016, city demographics from 2015 and airport data for data analysts, statisticians and researchers in that field.

----

## Table of Contents

1. [Scope](#scope)
1. [Tools & Technologies](#tools-tech)
1. [Data Wrangling](#wrangling)
<br>3.1. [Gathering and Assessing Data](#gathering-assessing)
<br>3.2. [Cleaning Data](#cleaning)
<br>3.2.1. [I94 Immigration](#c-immigration)
<br>3.2.2. [Airport Codes](#c-airport)
<br>3.2.3. [US City Demographic](#c-demographic)
1. [Data Model Definition](#model-definition)
<br>4.1. [Conceptual Data Model](#model-conceptual)
<br>4.2. [ETL Definition](#etl-definition)
1. [Run ETL Pipeline](#etl-run)
<br>5.1. [Data Model Creation](#model-creation)
<br>5.2. [Data Dictionary](#dict)
<br>5.3. [Data Quality Checks](#quality)
1. [Further Scenarios and Approaches](#scenarios)
1. [Known Issues](#issues)

In [1]:
from pathlib import Path
import collections
import os 
import re
import datetime
import pandas as pd
import json
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.sql.window import Window
import utils

<a id='scope'></a>
## 1. Scope

The goal of the project is to provide a database of United States (US) immigration data from 2016, city demographics from 2015 and airport data for data analysts, statisticians and researchers in that field.

The process of the database creation is outlined and implemented in this notebook.

The database will consist of analytic tables stored in a data lake in Apache Parquet format. Fact and dimension tables will be created (star schema). The database could be used for the following use cases and queries:
1. Is there a relation between US immigration and US city demographics?
1. How long is the average stay of immigrants in the US and how does it vary per month?
1. Which airport is used most frequently by immigrants?
1. Which type of visa is most popular?
1. How does the city population impact the number and size of airports in a municipality?
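
As an illustration of the fourth question, here is a minimal sketch using only the standard library. The `visatypes` list is copied from the five rows of the sample CSV preview in this notebook and merely stands in for the real fact table, so the result says nothing about the full dataset.

```python
from collections import Counter

# visatype values of the first five rows of the sample CSV (illustrative only)
visatypes = ["WT", "B2", "WT", "B2", "WT"]

# Count each visa type and pick the most frequent one
visa_counts = Counter(visatypes)
most_popular = visa_counts.most_common(1)[0]
print(most_popular)
```

On the real data the same aggregation would more likely be a group-by on the fact table.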

<a id='tools-tech'></a>
## 2. Tools & Technologies
1. Python with Pandas for data wrangling of small datasets
1. Pyspark for data wrangling and ETL steps to allow parallel processing of large datasets
1. Parquet for storing analytics tables
1. Optional: AWS S3 as storage (data lake)

Pandas was chosen because it handles small datasets conveniently in memory. Pyspark was chosen because it is built for processing large datasets in parallel.

Both Pandas and Pyspark can read the formats of the raw datasets provided. To store the target data model as a star schema for OLAP and BI purposes, the Apache Parquet format was chosen. Parquet stores data in a columnar (column-oriented) format and supports partitioning, which speeds up analytics queries, processing and joins with Pyspark in a compute cluster environment. For data storage AWS S3 was chosen, because it is an object store suitable as a data lake, with good performance and interoperability.
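
To make the partitioning idea concrete, the following sketch builds the Hive-style directory names (`column=value`) that partitioned Parquet datasets use on disk. The helper `partition_path` and the choice of `i94yr` and `i94mon` as partition columns are assumptions for illustration, not necessarily the layout used later in this project.

```python
from pathlib import PurePosixPath

def partition_path(root, **partitions):
    """Build a Hive-style partition directory such as root/i94yr=2016/i94mon=4."""
    path = PurePosixPath(root)
    for column, value in partitions.items():
        # Each partition column becomes one directory level named column=value
        path = path / f"{column}={value}"
    return str(path)

example = partition_path("data/processed/f_us_immigrations_pq", i94yr=2016, i94mon=4)
print(example)
```

A query that filters on the partition columns only needs to read the matching directories, which is where the speed-up comes from.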

In [2]:
### Constants used within the project below

## ------ EDIT THE FOLLOWING CONSTANTS AS REQUIRED ------

# Set this to True, if Udacity workspace is used. For Udacity workspace symlinks will be created automatically
UDACITY_WS = True
# Set this to True, if data is stored on S3
S3 = False
# Set this to True, to use only a sample file for world temperature dataset
SAMPLE_TEMPERATURE = False
# Set this to True, to use only a sample file for US immigration dataset
SAMPLE_IMMIGRATION = True
# Set this to True, to use only a single month file for US immigration dataset
SAMPLE_IMMIGRATION_SAS = False

In [3]:
if UDACITY_WS:
    !ln -s /data/18-83510-I94-Data-2016/ /home/workspace/data/raw/
    !ln -s /data/I94_SAS_Labels_Descriptions.SAS /home/workspace/data/raw/
    !ln -s /data2/GlobalLandTemperaturesByCity.csv /home/workspace/data/raw/

ln: failed to create symbolic link '/home/workspace/data/raw/18-83510-I94-Data-2016': File exists
ln: failed to create symbolic link '/home/workspace/data/raw/I94_SAS_Labels_Descriptions.SAS': File exists
ln: failed to create symbolic link '/home/workspace/data/raw/GlobalLandTemperaturesByCity.csv': File exists
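
The `File exists` messages above appear because the cell was rerun after the links had already been created. A possible way to make this step idempotent is sketched below; `ensure_symlink` is a hypothetical helper, and the demonstration uses a temporary directory rather than the workspace paths.

```python
import os
import tempfile
from pathlib import Path

def ensure_symlink(target, link_dir):
    """Create link_dir/<target name> pointing at target unless it already exists."""
    link = Path(link_dir) / Path(target).name
    if link.is_symlink() or link.exists():
        return False  # nothing to do, the link (or a file) is already there
    os.symlink(target, link)
    return True

# Demonstration in a throwaway directory instead of /home/workspace/data/raw
with tempfile.TemporaryDirectory() as tmp:
    source = Path(tmp) / "source.csv"
    source.write_text("dummy")
    raw_dir = Path(tmp) / "raw"
    raw_dir.mkdir()
    first_run = ensure_symlink(source, raw_dir)   # creates the link
    second_run = ensure_symlink(source, raw_dir)  # skips silently
print(first_run, second_run)
```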


In [4]:
## ------ DO NOT CHANGE THE FOLLOWING CONSTANTS ------

# Relative data dirs
DATA_RAW = Path(f"s3://{utils.S3_BUCKET_NAME}/raw") if S3 else Path("data/raw")
DATA_RAW_AIRPORT_CSV = DATA_RAW / "airport-codes.csv"
DATA_RAW_IMMIGRATION_SAMPLE_CSV = DATA_RAW / "immigration_data_sample.csv"
DATA_RAW_IMMIGRATION_SAS_DIR = DATA_RAW / "18-83510-I94-Data-2016"
DATA_RAW_IMMIGRATION_SAS_DATA_DICT = DATA_RAW / "I94_SAS_Labels_Descriptions.SAS"
DATA_RAW_TEMP_CSV = DATA_RAW / "GlobalLandTemperaturesByCity.csv"
DATA_RAW_TEMP_SAMPLE_CSV = DATA_RAW / "global-land-temperature-by-city-sample.csv"
DATA_RAW_US_DEMO_CSV = DATA_RAW / "us-cities-demographics.csv"

DATA_CLEANED = Path(f"s3://{utils.S3_BUCKET_NAME}/cleaned") if S3 else Path("data/cleaned")
DATA_CLEANED_IMMIGRATION = DATA_CLEANED / "immigration"
if SAMPLE_IMMIGRATION:
    DATA_CLEANED_IMMIGRATION_PARQUET = str(DATA_CLEANED_IMMIGRATION / "sample_csv_pq")
elif SAMPLE_IMMIGRATION_SAS:
    DATA_CLEANED_IMMIGRATION_PARQUET = str(DATA_CLEANED_IMMIGRATION / "sample_month_pq")
else:
    DATA_CLEANED_IMMIGRATION_PARQUET = str(DATA_CLEANED_IMMIGRATION / "complete_pq")
DATA_CLEANED_AIRPORT_PARQUET = str(DATA_CLEANED / "airport_pq")
DATA_CLEANED_DEMO_PARQUET = str(DATA_CLEANED / "demographic_pq")

DATA_PROCESSED = Path(f"s3://{utils.S3_BUCKET_NAME}/processed") if S3 else Path("data/processed")
DATA_PROCESSED_IMMIGRATION_DATA_DICT = DATA_PROCESSED / "immigration_data_dict"
if SAMPLE_IMMIGRATION:
    DATA_PROCESSED_IMMIGRATION = DATA_PROCESSED / "immigration_sample_csv"
elif SAMPLE_IMMIGRATION_SAS:
    DATA_PROCESSED_IMMIGRATION = DATA_PROCESSED / "immigration_sample_month"
else:
    DATA_PROCESSED_IMMIGRATION = DATA_PROCESSED / "immigration_complete"
DATA_PROCESSED_F_IMMIGRATIONS = str(DATA_PROCESSED_IMMIGRATION / "f_us_immigrations_pq")
DATA_PROCESSED_D_AIRPORTS = str(DATA_PROCESSED / "d_us_airports_pq")
DATA_PROCESSED_D_DEMOGRAPHIC = str(DATA_PROCESSED / "d_us_demographics_pq")
DATA_PROCESSED_D_VISITORS = str(DATA_PROCESSED / "d_visitors_pq")

for d in [DATA_RAW_IMMIGRATION_SAS_DIR, DATA_PROCESSED_IMMIGRATION_DATA_DICT, DATA_PROCESSED_IMMIGRATION]:
    os.makedirs(d, exist_ok=True)
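
One caveat when `S3 = True`: `pathlib` treats `s3://bucket/raw` as an ordinary relative path and collapses the double slash after the scheme. The sketch below shows this with `PurePosixPath` (which behaves like `Path` on POSIX systems for this purpose) and a possible alternative based on `posixpath.join`. The bucket name and the helper `join_uri` are hypothetical.

```python
import posixpath
from pathlib import PurePosixPath

bucket = "my-bucket"  # hypothetical bucket name

# pathlib normalizes the path and drops one slash after "s3:"
collapsed = str(PurePosixPath(f"s3://{bucket}/raw") / "airport-codes.csv")

def join_uri(base, *parts):
    """Join URI segments as plain strings so the scheme separator stays intact."""
    return posixpath.join(base, *parts)

kept = join_uri(f"s3://{bucket}/raw", "airport-codes.csv")
print(collapsed)
print(kept)
```

Note also that `os.makedirs` only creates local directories, so the loop above is meaningful for the local setup only.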

In [5]:
### Initialize Spark - Adapt, if using AWS EMR Spark Cluster
spark = SparkSession.builder \
    .config("spark.jars.repositories", "https://repos.spark-packages.org/") \
    .config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11") \
    .enableHiveSupport() \
    .getOrCreate()

<a id='wrangling'></a>
## 3. Data Wrangling

<a id='gathering-assessing'></a>
### 3.1. Gathering and Assessing Data

The following datasets will be used:
* I94 Immigration: This dataset comes from the [US National Tourism and Trade Office](https://travel.trade.gov/research/reports/i94/historical/2016.html). It consists of:
    * Sample CSV file containing 1000 rows
    * SAS files partitioned by month for the complete year 2016
    * Data dictionary
* World Temperature: This dataset comes from [Kaggle](https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data). It provides monthly average temperature data for cities around the globe.
* Airport Codes: This is a simple table of airport codes and corresponding cities. It comes from [DataHub](https://datahub.io/core/airport-codes#data).
* US City Demographic: This data comes from [OpenDataSoft](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/). It covers demographic information about cities collected by the US Census Bureau's 2015 American Community Survey.

In the following, the data will be gathered and explored using Pandas and Pyspark. This covers visual and programmatic assessment in order to identify quality and tidiness issues. The findings will be summarized and cleaning actions defined in the next section.

**Load the datasets initially to allow cross-dataset checks**

In [6]:
df_immigration_sample = pd.read_csv(DATA_RAW_IMMIGRATION_SAMPLE_CSV)
df_temp = pd.read_csv(DATA_RAW_TEMP_SAMPLE_CSV) if SAMPLE_TEMPERATURE else pd.read_csv(DATA_RAW_TEMP_CSV)
df_airport = pd.read_csv(DATA_RAW_AIRPORT_CSV)
df_demo = pd.read_csv(DATA_RAW_US_DEMO_CSV, sep=";")

**I94 Immigration**

In [7]:
df_immigration_sample.head()

Unnamed: 0.1,Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,2027561,4084316.0,2016.0,4.0,209.0,209.0,HHW,20566.0,1.0,HI,...,,M,1955.0,7202016,F,,JL,56582670000.0,00782,WT
1,2171295,4422636.0,2016.0,4.0,582.0,582.0,MCA,20567.0,1.0,TX,...,,M,1990.0,10222016,M,,*GA,94362000000.0,XBLNG,B2
2,589494,1195600.0,2016.0,4.0,148.0,112.0,OGG,20551.0,1.0,FL,...,,M,1940.0,7052016,M,,LH,55780470000.0,00464,WT
3,2631158,5291768.0,2016.0,4.0,297.0,297.0,LOS,20572.0,1.0,CA,...,,M,1991.0,10272016,M,,QR,94789700000.0,00739,B2
4,3032257,985523.0,2016.0,4.0,111.0,111.0,CHM,20550.0,3.0,NY,...,,M,1997.0,7042016,F,,,42322570000.0,LAND,WT


In [8]:
df_immigration_sample.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1000 entries, 0 to 999
Data columns (total 29 columns):
Unnamed: 0    1000 non-null int64
cicid         1000 non-null float64
i94yr         1000 non-null float64
i94mon        1000 non-null float64
i94cit        1000 non-null float64
i94res        1000 non-null float64
i94port       1000 non-null object
arrdate       1000 non-null float64
i94mode       1000 non-null float64
i94addr       941 non-null object
depdate       951 non-null float64
i94bir        1000 non-null float64
i94visa       1000 non-null float64
count         1000 non-null float64
dtadfile      1000 non-null int64
visapost      382 non-null object
occup         4 non-null object
entdepa       1000 non-null object
entdepd       954 non-null object
entdepu       0 non-null float64
matflag       954 non-null object
biryear       1000 non-null float64
dtaddto       1000 non-null object
gender        859 non-null object
insnum        35 non-null float64
airline       967 non

In [9]:
df_immigration_sample.describe()

Unnamed: 0.1,Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,arrdate,i94mode,depdate,i94bir,i94visa,count,dtadfile,entdepu,biryear,insnum,admnum
count,1000.0,1000.0,1000.0,1000.0,1000.0,1000.0,1000.0,1000.0,951.0,1000.0,1000.0,1000.0,1000.0,0.0,1000.0,35.0,1000.0
mean,1542097.0,3040461.0,2016.0,4.0,302.928,298.262,20559.68,1.078,20575.037855,42.382,1.859,1.0,20160420.0,,1973.618,3826.857143,69372370000.0
std,915287.9,1799818.0,0.0,0.0,206.485285,202.12039,8.995027,0.485955,24.211234,17.903424,0.386353,0.0,49.51657,,17.903424,221.742583,23381340000.0
min,10925.0,13208.0,2016.0,4.0,103.0,103.0,20545.0,1.0,20547.0,1.0,1.0,1.0,20160400.0,,1923.0,3468.0,0.0
25%,721442.2,1412170.0,2016.0,4.0,135.0,131.0,20552.0,1.0,20561.0,30.75,2.0,1.0,20160410.0,,1961.0,3668.0,55993010000.0
50%,1494568.0,2941176.0,2016.0,4.0,213.0,213.0,20560.0,1.0,20570.0,42.0,2.0,1.0,20160420.0,,1974.0,3887.0,59314770000.0
75%,2360901.0,4694151.0,2016.0,4.0,438.0,438.0,20567.25,1.0,20580.0,55.0,2.0,1.0,20160420.0,,1985.25,3943.0,93436230000.0
max,3095749.0,6061994.0,2016.0,4.0,746.0,696.0,20574.0,9.0,20715.0,93.0,3.0,1.0,20160800.0,,2015.0,4686.0,95021510000.0


Switch to Spark here: the SAS files are very large, so doing the assessment directly in Spark avoids porting a Pandas implementation to Spark later.

In [10]:
sas_files = os.listdir(DATA_RAW_IMMIGRATION_SAS_DIR)
sas_files

['i94_apr16_sub.sas7bdat',
 'i94_sep16_sub.sas7bdat',
 'i94_nov16_sub.sas7bdat',
 'i94_mar16_sub.sas7bdat',
 'i94_jun16_sub.sas7bdat',
 'i94_aug16_sub.sas7bdat',
 'i94_may16_sub.sas7bdat',
 'i94_jan16_sub.sas7bdat',
 'i94_oct16_sub.sas7bdat',
 'i94_jul16_sub.sas7bdat',
 'i94_feb16_sub.sas7bdat',
 'i94_dec16_sub.sas7bdat']

In [11]:
def get_immigration_df(sas_files):
    """Return the immigration data as a Spark DataFrame, honoring the sample flags."""
    # Pyspark 3.x: df_immigration = spark.read.format('com.github.saurfang.sas.spark').load(str(DATA_RAW_IMMIGRATION_SAS_DIR), pathGlobFilter="*.sas7bdat")
    if SAMPLE_IMMIGRATION:
        # 1000-row sample CSV; without a schema all columns are read as strings
        return spark.read.option("header", True).csv(str(DATA_RAW_IMMIGRATION_SAMPLE_CSV))
    # Only the first monthly file if SAMPLE_IMMIGRATION_SAS is set, otherwise all of them
    files = sas_files[:1] if SAMPLE_IMMIGRATION_SAS else sas_files
    df = None
    for f in files:
        df_month = spark.read.format('com.github.saurfang.sas.spark').load(str(DATA_RAW_IMMIGRATION_SAS_DIR / f))
        df = df_month if df is None else df.unionByName(df_month)
    return df

In [12]:
df_immigration = get_immigration_df(sas_files)
df_immigration.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- cicid: string (nullable = true)
 |-- i94yr: string (nullable = true)
 |-- i94mon: string (nullable = true)
 |-- i94cit: string (nullable = true)
 |-- i94res: string (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: string (nullable = true)
 |-- i94mode: string (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: string (nullable = true)
 |-- i94bir: string (nullable = true)
 |-- i94visa: string (nullable = true)
 |-- count: string (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: string (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = tru

In [13]:
df_immigration.count()

1000

In [14]:
len(df_immigration.columns)

29

In [15]:
df_immigration.head(5)

[Row(_c0='2027561', cicid='4084316.0', i94yr='2016.0', i94mon='4.0', i94cit='209.0', i94res='209.0', i94port='HHW', arrdate='20566.0', i94mode='1.0', i94addr='HI', depdate='20573.0', i94bir='61.0', i94visa='2.0', count='1.0', dtadfile='20160422', visapost=None, occup=None, entdepa='G', entdepd='O', entdepu=None, matflag='M', biryear='1955.0', dtaddto='07202016', gender='F', insnum=None, airline='JL', admnum='56582674633.0', fltno='00782', visatype='WT'),
 Row(_c0='2171295', cicid='4422636.0', i94yr='2016.0', i94mon='4.0', i94cit='582.0', i94res='582.0', i94port='MCA', arrdate='20567.0', i94mode='1.0', i94addr='TX', depdate='20568.0', i94bir='26.0', i94visa='2.0', count='1.0', dtadfile='20160423', visapost='MTR', occup=None, entdepa='G', entdepd='R', entdepu=None, matflag='M', biryear='1990.0', dtaddto='10222016', gender='M', insnum=None, airline='*GA', admnum='94361995930.0', fltno='XBLNG', visatype='B2'),
 Row(_c0='589494', cicid='1195600.0', i94yr='2016.0', i94mon='4.0', i94cit='148.
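
The `arrdate` and `depdate` values look like SAS date numbers, i.e. days counted from 1960-01-01. This reading is an assumption at this point, but it is consistent with the first row, whose `dtadfile` is `20160422`. A minimal sketch of the conversion, with the hypothetical helper `sas_to_date`:

```python
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)  # SAS counts dates as days since this day

def sas_to_date(value):
    """Convert a SAS date number (possibly a string like '20566.0') to a date."""
    if value is None:
        return None
    return SAS_EPOCH + timedelta(days=int(float(value)))

arrival = sas_to_date("20566.0")    # arrdate of the first sample row
departure = sas_to_date("20573.0")  # depdate of the first sample row
stay_days = (departure - arrival).days
print(arrival, departure, stay_days)
```

The difference between the two dates is the length of stay, which is what the second use case in the scope section asks for.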

In [16]:
def print_null_nan_values(df):
    df.select([F.count(F.when(F.isnan(c) |  F.col(c).isNull(), c)).alias(c) for c in df.columns if "date" not in c]).show()

In [17]:
def print_null_values_date(df):
    df.select([F.count(F.when( F.col(c).isNull(), c)).alias(c) for c in df.columns if "date" in c]).show()

In [18]:
print_null_nan_values(df_immigration)

+---+-----+-----+------+------+------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+-------+------+------+-------+------+-----+--------+
|_c0|cicid|i94yr|i94mon|i94cit|i94res|i94port|i94mode|i94addr|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear|dtaddto|gender|insnum|airline|admnum|fltno|visatype|
+---+-----+-----+------+------+------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+-------+------+------+-------+------+-----+--------+
|  0|    0|    0|     0|     0|     0|      0|      0|     59|     0|      0|    0|       0|     618|  996|      0|     46|   1000|     46|      0|      0|   141|   965|     33|     0|    8|       0|
+---+-----+-----+------+------+------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+-------+------+------+-------+------+-----+--------+
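
To read the counts above as shares, the sketch below divides the non-zero counts by the 1000 sample rows and lists the columns that are mostly empty. The 50% threshold is a hypothetical cut-off chosen for illustration, not a rule taken from this project.

```python
# Non-zero null/NaN counts copied from the output above (sample of 1000 rows)
null_counts = {
    "i94addr": 59, "visapost": 618, "occup": 996, "entdepd": 46,
    "entdepu": 1000, "matflag": 46, "gender": 141, "insnum": 965,
    "airline": 33, "fltno": 8,
}
TOTAL_ROWS = 1000
THRESHOLD = 0.5  # hypothetical cut-off for "mostly missing"

# Share of missing values per column
missing_share = {col: n / TOTAL_ROWS for col, n in null_counts.items()}
mostly_missing = sorted(col for col, share in missing_share.items() if share > THRESHOLD)
print(mostly_missing)
```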


In [19]:
print_null_values_date(df_immigration)

+-------+-------+
|arrdate|depdate|
+-------+-------+
|      0|     49|
+-------+-------+



In [20]:
df_immigration.filter( F.col("i94addr").isNull()).head(5)

[Row(_c0='1339656', cicid='2711583.0', i94yr='2016.0', i94mon='4.0', i94cit='148.0', i94res='112.0', i94port='FTL', arrdate='20559.0', i94mode='2.0', i94addr=None, depdate='20565.0', i94bir='54.0', i94visa='2.0', count='1.0', dtadfile='20160415', visapost=None, occup=None, entdepa='G', entdepd='O', entdepu=None, matflag='M', biryear='1962.0', dtaddto='07132016', gender='F', insnum=None, airline='VES', admnum='56175860733.0', fltno='93724', visatype='WT'),
 Row(_c0='2938436', cicid='5960799.0', i94yr='2016.0', i94mon='4.0', i94cit='245.0', i94res='245.0', i94port='SAI', arrdate='20545.0', i94mode='1.0', i94addr=None, depdate='20550.0', i94bir='30.0', i94visa='2.0', count='1.0', dtadfile='20160615', visapost=None, occup=None, entdepa='P', entdepd='D', entdepu=None, matflag='M', biryear='1986.0', dtaddto='04132016', gender='M', insnum='3882', airline='MU', admnum='44162582033.0', fltno='00763', visatype='CP'),
 Row(_c0='1808498', cicid='3669540.0', i94yr='2016.0', i94mon='4.0', i94cit='12

Read the data dictionary to understand the data

In [21]:
with open(DATA_RAW_IMMIGRATION_SAS_DATA_DICT) as f:
    immigration_labels = f.read()
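
The label file is SAS source text in which codes are mapped to labels with lines of the form `582 =  'MEXICO ...'`. One possible way to turn such lines into a lookup table is sketched below. The excerpt is copied from the file content, while the regular expression and the helper `parse_numeric_codes` are assumptions for illustration. If the file contains further `value` blocks, the text would have to be split per block first.

```python
import re

# Excerpt of the label file content
sample_labels = """
/* I94CIT & I94RES - This format shows all the valid and invalid codes for processing */
  value i94cntyl
   582 =  'MEXICO Air Sea, and Not Reported (I-94, no land arrivals)'
   236 =  'AFGHANISTAN'
   687 =  'ARGENTINA '
"""

# Numeric code, equals sign, single-quoted label
CODE_LABEL = re.compile(r"^\s*(\d+)\s*=\s*'([^']*)'", re.MULTILINE)

def parse_numeric_codes(text):
    """Return {code: label} for lines of the form  123 =  'LABEL'."""
    return {int(code): label.strip() for code, label in CODE_LABEL.findall(text)}

country_codes = parse_numeric_codes(sample_labels)
print(country_codes)
```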

In [22]:
immigration_labels

"libname library 'Your file location' ;\nproc format library=library ;\n\n/* I94YR - 4 digit year */\n\n/* I94MON - Numeric month */\n\n/* I94CIT & I94RES - This format shows all the valid and invalid codes for processing */\n  value i94cntyl\n   582 =  'MEXICO Air Sea, and Not Reported (I-94, no land arrivals)'\n   236 =  'AFGHANISTAN'\n   101 =  'ALBANIA'\n   316 =  'ALGERIA'\n   102 =  'ANDORRA'\n   324 =  'ANGOLA'\n   529 =  'ANGUILLA'\n   518 =  'ANTIGUA-BARBUDA'\n   687 =  'ARGENTINA '\n   151 =  'ARMENIA'\n   532 =  'ARUBA'\n   438 =  'AUSTRALIA'\n   103 =  'AUSTRIA'\n   152 =  'AZERBAIJAN'\n   512 =  'BAHAMAS'\n   298 =  'BAHRAIN'\n   274 =  'BANGLADESH'\n   513 =  'BARBADOS'\n   104 =  'BELGIUM'\n   581 =  'BELIZE'\n   386 =  'BENIN'\n   509 =  'BERMUDA'\n   153 =  'BELARUS'\n   242 =  'BHUTAN'\n   688 =  'BOLIVIA'\n   717 =  'BONAIRE, ST EUSTATIUS, SABA' \n   164 =  'BOSNIA-HERZEGOVINA'\n   336 =  'BOTSWANA'\n   689 =  'BRAZIL'\n   525 =  'BRITISH VIRGIN ISLANDS'\n   217 =  '

**World Temperature**

In [23]:
df_temp.head(5)

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E


In [24]:
df_temp.tail(5)

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
8599207,2013-05-01,11.464,0.236,Zwolle,Netherlands,52.24N,5.26E
8599208,2013-06-01,15.043,0.261,Zwolle,Netherlands,52.24N,5.26E
8599209,2013-07-01,18.775,0.193,Zwolle,Netherlands,52.24N,5.26E
8599210,2013-08-01,18.025,0.298,Zwolle,Netherlands,52.24N,5.26E
8599211,2013-09-01,,,Zwolle,Netherlands,52.24N,5.26E


In [25]:
df_temp.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 8599212 entries, 0 to 8599211
Data columns (total 7 columns):
dt                               object
AverageTemperature               float64
AverageTemperatureUncertainty    float64
City                             object
Country                          object
Latitude                         object
Longitude                        object
dtypes: float64(2), object(5)
memory usage: 459.2+ MB


In [26]:
df_temp.describe()

Unnamed: 0,AverageTemperature,AverageTemperatureUncertainty
count,8235082.0,8235082.0
mean,16.72743,1.028575
std,10.35344,1.129733
min,-42.704,0.034
25%,10.299,0.337
50%,18.831,0.591
75%,25.21,1.349
max,39.651,15.396


In [27]:
(df_temp.Country == 'United States').sum()

687289

In [28]:
df_temp[df_temp.Country == 'United States'].dt.tail(5)

8439242    2013-05-01
8439243    2013-06-01
8439244    2013-07-01
8439245    2013-08-01
8439246    2013-09-01
Name: dt, dtype: object

In [29]:
(pd.to_datetime(df_temp.dt) >= pd.Timestamp(datetime.date(2016,1,1))).sum()

0

**Airport Codes**

In [30]:
df_airport.head(5)

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11.0,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450.0,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820.0,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237.0,,US,US-AR,Newport,,,,"-91.254898, 35.6087"


In [31]:
df_airport.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 55075 entries, 0 to 55074
Data columns (total 12 columns):
ident           55075 non-null object
type            55075 non-null object
name            55075 non-null object
elevation_ft    48069 non-null float64
continent       27356 non-null object
iso_country     54828 non-null object
iso_region      55075 non-null object
municipality    49399 non-null object
gps_code        41030 non-null object
iata_code       9189 non-null object
local_code      28686 non-null object
coordinates     55075 non-null object
dtypes: float64(1), object(11)
memory usage: 5.0+ MB


In [32]:
df_airport.describe()

Unnamed: 0,elevation_ft
count,48069.0
mean,1240.789677
std,1602.363459
min,-1266.0
25%,205.0
50%,718.0
75%,1497.0
max,22000.0


In [33]:
df_airport.iso_country.unique()

array(['US', 'PR', 'MH', 'MP', 'GU', 'SO', 'AQ', 'GB', 'PG', 'AD', 'SD',
       'SA', 'AE', 'SS', 'ES', 'CN', 'AF', 'LK', 'SB', 'CO', 'AU', 'MG',
       'TD', 'AL', 'AM', 'MX', 'MZ', 'PW', 'NR', 'AO', 'AR', 'AS', 'AT',
       'ZZ', 'GA', 'AZ', 'BA', 'BB', 'BE', 'DE', 'BF', 'BG', 'GL', 'BH',
       'BI', 'IS', 'BJ', 'OM', 'XK', 'BM', 'KE', 'PH', 'BO', 'BR', 'BS',
       'CV', 'BW', 'FJ', 'BY', 'UA', 'LR', 'BZ', 'CA', 'CD', 'CF', 'CG',
       'MR', 'CH', 'CL', 'CM', 'MA', 'CR', 'CU', 'CY', 'CZ', 'SK', 'PA',
       'DZ', 'ID', 'GH', 'RU', 'CI', 'DK', 'NG', 'DO', 'NE', 'HR', 'TN',
       'TG', 'EC', 'EE', 'FI', 'EG', 'GG', 'JE', 'IM', 'FK', 'EH', 'NL',
       'IE', 'FO', 'LU', 'NO', 'PL', 'ER', 'MN', 'PT', 'SE', 'ET', 'LV',
       'LT', 'ZA', 'SZ', 'GQ', 'SH', 'MU', 'IO', 'ZM', 'FM', 'KM', 'YT',
       'RE', 'TF', 'ST', 'FR', 'SC', 'ZW', 'MW', 'LS', nan, 'ML', 'GM',
       'GE', 'GF', 'SL', 'GW', 'GN', 'SN', 'GR', 'GT', 'TZ', 'GY', 'SR',
       'DJ', 'HK', 'LY', 'HN', 'VN', 'KZ', 'RW', 'HT

In [34]:
airport_us_mask = df_airport.iso_country == 'US'

In [35]:
airport_us_mask.sum()

22757

In [36]:
df_us_airport = df_airport[airport_us_mask].copy()
df_us_airport.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 22757 entries, 0 to 54896
Data columns (total 12 columns):
ident           22757 non-null object
type            22757 non-null object
name            22757 non-null object
elevation_ft    22518 non-null float64
continent       1 non-null object
iso_country     22757 non-null object
iso_region      22757 non-null object
municipality    22655 non-null object
gps_code        20984 non-null object
iata_code       2019 non-null object
local_code      21236 non-null object
coordinates     22757 non-null object
dtypes: float64(1), object(11)
memory usage: 2.3+ MB


In [37]:
df_us_airport.municipality.nunique()

8738

In [38]:
df_us_airport.iso_region.nunique(), df_us_airport.iso_region.unique()

(52, array(['US-PA', 'US-KS', 'US-AK', 'US-AL', 'US-AR', 'US-OK', 'US-AZ',
        'US-CA', 'US-CO', 'US-FL', 'US-GA', 'US-HI', 'US-ID', 'US-IN',
        'US-IL', 'US-KY', 'US-LA', 'US-MD', 'US-MI', 'US-MN', 'US-MO',
        'US-MT', 'US-NJ', 'US-NC', 'US-NY', 'US-OH', 'US-OR', 'US-SC',
        'US-SD', 'US-TX', 'US-TN', 'US-UT', 'US-VA', 'US-WA', 'US-WI',
        'US-WV', 'US-WY', 'US-CT', 'US-IA', 'US-MA', 'US-ME', 'US-NE',
        'US-NH', 'US-NM', 'US-NV', 'US-MS', 'US-ND', 'US-VT', 'US-RI',
        'US-DC', 'US-DE', 'US-U-A'], dtype=object))

In [39]:
airport_us_state_codes = df_us_airport.iso_region.str.replace("US-", "").unique()
airport_us_state_codes

array(['PA', 'KS', 'AK', 'AL', 'AR', 'OK', 'AZ', 'CA', 'CO', 'FL', 'GA',
       'HI', 'ID', 'IN', 'IL', 'KY', 'LA', 'MD', 'MI', 'MN', 'MO', 'MT',
       'NJ', 'NC', 'NY', 'OH', 'OR', 'SC', 'SD', 'TX', 'TN', 'UT', 'VA',
       'WA', 'WI', 'WV', 'WY', 'CT', 'IA', 'MA', 'ME', 'NE', 'NH', 'NM',
       'NV', 'MS', 'ND', 'VT', 'RI', 'DC', 'DE', 'U-A'], dtype=object)

In [40]:
i94addr_unique_sample = df_immigration_sample[df_immigration_sample.i94addr.notnull()].i94addr.unique()
i94addr_unique_sample

array(['HI', 'TX', 'FL', 'CA', 'NY', 'GA', 'IL', 'MA', 'NV', 'PA', 'GU',
       'NC', 'NJ', 'VT', 'WA', 'NE', 'VA', 'MP', 'IN', 'MO', 'MI', 'OR',
       'MN', 'UN', 'ID', 'AZ', 'KY', 'SC', 'MS', 'MD', 'TN', 'OH', 'CT',
       'KS', 'DC', 'IA', 'LA', 'VQ', 'PR', 'CO', 'AL', 'SW', 'NM', 'UT',
       'OK', 'NH', 'TE', 'ME', 'AR', 'RI', 'WI'], dtype=object)

In [41]:
{ausc for ausc in airport_us_state_codes if ausc not in i94addr_unique_sample}, {sc for sc in i94addr_unique_sample  if sc not in airport_us_state_codes}

({'AK', 'DE', 'MT', 'ND', 'SD', 'U-A', 'WV', 'WY'},
 {'GU', 'MP', 'PR', 'SW', 'TE', 'UN', 'VQ'})

In [42]:
demo_state_code_unique = df_demo["State Code"].unique()
demo_state_code_unique

array(['MD', 'MA', 'AL', 'CA', 'NJ', 'IL', 'AZ', 'MO', 'NC', 'PA', 'KS',
       'FL', 'TX', 'VA', 'NV', 'CO', 'MI', 'CT', 'MN', 'UT', 'AR', 'TN',
       'OK', 'WA', 'NY', 'GA', 'NE', 'KY', 'SC', 'LA', 'NM', 'IA', 'RI',
       'PR', 'DC', 'WI', 'OR', 'NH', 'ND', 'DE', 'OH', 'ID', 'IN', 'AK',
       'MS', 'HI', 'SD', 'ME', 'MT'], dtype=object)

In [43]:
{ausc for ausc in airport_us_state_codes if ausc not in demo_state_code_unique}, {sc for sc in demo_state_code_unique  if sc not in airport_us_state_codes}

({'U-A', 'VT', 'WV', 'WY'}, {'PR'})
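
The two comparisons above follow the same pattern and can be expressed with set differences. The helper `compare_codes` is hypothetical, and the inputs are small illustrative subsets rather than the full arrays.

```python
def compare_codes(left, right):
    """Return (codes only in left, codes only in right) as sets."""
    left, right = set(left), set(right)
    return left - right, right - left

# Small illustrative subsets, not the full arrays from above
airport_codes = ["PA", "VT", "WY", "U-A", "CA"]
demo_codes = ["PA", "CA", "PR"]

only_airport, only_demo = compare_codes(airport_codes, demo_codes)
print(only_airport, only_demo)
```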

In [44]:
print(df_us_airport[df_us_airport.iso_region == "US-VT"].shape)
df_us_airport[df_us_airport.iso_region == "US-VT"].head(5)

(102, 12)


Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
305,04VT,small_airport,Lightning Bolt Field Airport,2156.0,,US,US-VT,Cabot,04VT,,04VT,"-72.26360321039999, 44.4011001587"
365,05VT,heliport,Port of Highgate Springs Heliport,68.0,,US,US-VT,Highgate Springs,05VT,,05VT,"-73.08670043945312, 45.013301849365234"
478,07VT,small_airport,Meadow STOLport,801.0,,US,US-VT,Jericho,07VT,,07VT,"-72.91470336914062, 44.447200775146484"
632,0B7,small_airport,Warren-Sugarbush Airport,1470.0,,US,US-VT,Warren,0B7,,0B7,"-72.8271026611, 44.1166992188"
1715,18VT,heliport,Springfield Hospital Heliport,589.0,,US,US-VT,Springfield,18VT,,18VT,"-72.494588, 43.298465"


In [45]:
print(df_us_airport[df_us_airport.iso_region == "US-WV"].shape)
df_us_airport[df_us_airport.iso_region == "US-WV"].head(5)

(140, 12)


Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
60,00WV,small_airport,Lazy J. Aerodrome,2060.0,,US,US-WV,Beverly,00WV,,00WV,"-79.86609649658203, 38.82889938354492"
310,04WV,heliport,West Virginia Univ. Hosp. Inc. Gnd. Pad #2 Hel...,1137.0,,US,US-WV,Morgantown,04WV,,04WV,"-79.9561111111, 39.6536111111"
1374,12WV,small_airport,Crazy Horse Airport,603.0,,US,US-WV,Hamlin,12WV,,12WV,"-82.115, 38.3421667"
1482,14P,small_airport,Boggs Field,927.0,,US,US-WV,Spencer,14P,,14P,"-81.34940338134766, 38.82389831542969"
1497,14WV,heliport,Snowshoe Resort/Topof the World Heliport,4792.0,,US,US-WV,Linwood,14WV,,14WV,"-79.99579620360001, 38.4006004333"


In [46]:
print(df_us_airport[df_us_airport.iso_region == "US-WY"].shape)
df_us_airport[df_us_airport.iso_region == "US-WY"].head(5)

(127, 12)


Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
61,00WY,heliport,Mountain View Regional Hospital Heliport,5210.0,,US,US-WY,Casper,00WY,,00WY,"-106.224443, 42.840361"
125,01WY,small_airport,Keyhole Airport,4250.0,,US,US-WY,Moorcroft,01WY,,01WY,"-104.81099700927734, 44.347198486328125"
1171,0WY0,small_airport,Freedom Air Ranch Airport,5698.0,,US,US-WY,Freedom,0WY0,,0WY0,"-111.038056, 43.039722"
1172,0WY1,small_airport,Dorsey Creek Ranch Airport,4017.0,,US,US-WY,Basin,0WY1,,0WY1,"-108.163333, 44.411111"
1250,10WY,small_airport,Willow Creek Ranch Airport,5521.0,,US,US-WY,Kaycee,10WY,,10WY,"-106.823611, 43.423333"


In [47]:
df_us_airport[df_us_airport.iso_region == "PR"]

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates


In [48]:
df_us_airport[df_us_airport.iso_region == "US-U-A"]

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
49318,US-0303,large_airport,atl,,,US,US-U-A,,,,,"-84.375, 33.137551"
49334,US-0319,small_airport,Ronnie Cole,900.0,,US,US-U-A,Greenfield,,,,"-85.72268, 39.831008"
49399,US-0384,closed,Beacon Station Air Strip,,,US,US-U-A,Baker,,,,"-116.20977, 35.13047"
49625,US-0610,closed,Erase Me 13,,,US,US-U-A,,,,,"0, 0.4"
49757,US-0742,medium_airport,34S Airport,,,US,US-U-A,,,,,"-16.875, 19.145168"
49811,US-0796,closed,Not DTW although Detroit can be just as isolated,,,US,US-U-A,,,,,"0.8, 0.6"
49820,US-0805,small_airport,Twin Cities,,,US,US-U-A,"Tabor City,NC",K5J9,,,"0, 0"
49894,US-0879,medium_airport,Buy Lyrica online (Pregabalin) 150mg capsules ...,,,US,US-U-A,,,,,"3.1666666666667, -1.1666666666667"
49895,US-0880,medium_airport,Buy Lyrica online. Discount Lyrica 75 mg onlin...,0.0,,US,US-U-A,,,,,"-2.1666666666667, -2.1666666666667"
49947,US-0932,small_airport,CLE,,,US,US-U-A,Cleveland,,,,"0, 0"


Double-check whether iata_code or local_code matches the immigration i94port code

In [49]:
i94port_unique_sample = df_immigration_sample.i94port.unique()
i94port_unique_sample, len(i94port_unique_sample)

(array(['HHW', 'MCA', 'OGG', 'LOS', 'CHM', 'ATL', 'SFR', 'NYC', 'CHI',
        'PHI', 'FTL', 'BOS', 'SAI', 'NAS', 'SEA', 'ORL', 'PSP', 'HOU',
        'NEW', 'BAL', 'SNJ', 'DET', 'AGA', 'LVG', 'MIA', 'SDP', 'VCV',
        'DUB', 'PEM', 'TAM', 'BLA', 'WAS', 'KOA', 'DAL', 'SHA', 'SPM',
        'NIA', 'PHR', 'MIL', 'SLC', 'CLT', 'EPI', 'SNA', 'MON', 'DLR',
        'SFB', 'OPF', 'X96', 'CLM', 'LIH', 'DEN', 'PHO', 'POO', 'NOL',
        'WPB', 'PBB', 'TOR', 'MAA', 'RNO', 'FMY', 'HIG', 'OAK', 'OTM',
        'ONT', 'SRQ', 'LLB', 'NCA', 'SUM', 'STR', 'HAM'], dtype=object), 70)

In [50]:
df_airport.iata_code.isin(i94port_unique_sample).sum(), df_airport.local_code.isin(i94port_unique_sample).sum(), df_airport.ident.isin(i94port_unique_sample).sum(), \
df_airport.gps_code.isin(i94port_unique_sample).sum()

(60, 63, 2, 0)

In [51]:
airport_code_in_i94port_mask = (df_airport.local_code.isin(i94port_unique_sample) | df_airport.iata_code.isin(i94port_unique_sample) | 
                                df_airport.ident.isin(i94port_unique_sample) | df_airport.gps_code.isin(i94port_unique_sample))

In [52]:
df_airport[airport_code_in_i94port_mask][["local_code", "iata_code", "ident", "gps_code"]].nunique()

local_code    49
iata_code     65
ident         93
gps_code      69
dtype: int64

In [53]:
(airport_us_mask & airport_code_in_i94port_mask).sum()

36

In [54]:
df_airport[airport_us_mask & airport_code_in_i94port_mask][["local_code", "iata_code", "ident", "gps_code"]].nunique()

local_code    35
iata_code     33
ident         36
gps_code      35
dtype: int64

In [55]:
df_us_airport.query("iata_code == 'LAX'")

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
27795,KLAX,large_airport,Los Angeles International Airport,125.0,,US,US-CA,Los Angeles,KLAX,LAX,LAX,"-118.4079971, 33.94250107"


In [56]:
df_airport.query("iata_code == 'LOS' or local_code == 'LOS' or gps_code == 'LOS' or ident == 'LOS'")

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
17551,DNMM,large_airport,Murtala Muhammed International Airport,135.0,AF,NG,NG-LA,Lagos,DNMM,LOS,,"3.321160078048706, 6.5773701667785645"
34652,MX-0438,small_airport,Los Charcos Airstrip,8879.0,,MX,MX-DUR,Mezquital,,,LOS,"-104.293973, 23.000115"


In [57]:
df_us_airport.query("iata_code == 'LOS' or local_code == 'LOS' or gps_code == 'LOS' or ident == 'LOS'")

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates


In [58]:
airports_la_municipality = df_us_airport[df_us_airport.municipality.notnull() & df_us_airport.municipality.str.upper().str.contains("LOS ANGELES")]
print(airports_la_municipality.shape)
airports_la_municipality.head(5)

(68, 12)


Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
71,01CN,heliport,Los Angeles County Sheriff's Department Heliport,300.0,,US,US-CA,Los Angeles,01CN,,01CN,"-118.15399932861328, 34.03779983520508"
639,0CA0,closed,Drew Medical Center Heliport,180.0,,US,US-CA,Los Angeles,,,,"-118.241997, 33.923302"
641,0CA2,heliport,Va Greater Los Angeles Health Care Center Heli...,294.0,,US,US-CA,West Los Angeles,0CA2,,0CA2,"-118.45600128173828, 34.049198150634766"
666,0CL7,heliport,Good Samaritan Hospital Heliport,473.0,,US,US-CA,Los Angeles,0CL7,,0CL7,"-118.264967, 34.054901"
1459,14L,heliport,Devonshire Area Heliport,1012.0,,US,US-CA,Los Angeles,14L,,14L,"-118.53099822998047, 34.256900787353516"


**US City Demographic**

In [59]:
df_demo = pd.read_csv(DATA_RAW_US_DEMO_CSV, sep=";")
df_demo.head(5)

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402


In [60]:
df_demo.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2891 entries, 0 to 2890
Data columns (total 12 columns):
City                      2891 non-null object
State                     2891 non-null object
Median Age                2891 non-null float64
Male Population           2888 non-null float64
Female Population         2888 non-null float64
Total Population          2891 non-null int64
Number of Veterans        2878 non-null float64
Foreign-born              2878 non-null float64
Average Household Size    2875 non-null float64
State Code                2891 non-null object
Race                      2891 non-null object
Count                     2891 non-null int64
dtypes: float64(6), int64(2), object(4)
memory usage: 271.1+ KB


In [61]:
df_demo.describe()

Unnamed: 0,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,Count
count,2891.0,2888.0,2888.0,2891.0,2878.0,2878.0,2875.0,2891.0
mean,35.494881,97328.43,101769.6,198966.8,9367.832523,40653.6,2.742543,48963.77
std,4.401617,216299.9,231564.6,447555.9,13211.219924,155749.1,0.433291,144385.6
min,22.9,29281.0,27348.0,63215.0,416.0,861.0,2.0,98.0
25%,32.8,39289.0,41227.0,80429.0,3739.0,9224.0,2.43,3435.0
50%,35.3,52341.0,53809.0,106782.0,5397.0,18822.0,2.65,13780.0
75%,38.0,86641.75,89604.0,175232.0,9368.0,33971.75,2.95,54447.0
max,70.5,4081698.0,4468707.0,8550405.0,156961.0,3212500.0,4.98,3835726.0


In [62]:
df_demo.City.nunique()

567

In [63]:
df_demo[df_demo.City.duplicated()].sort_values(by="City").head(10)

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
1403,Abilene,Texas,31.3,65212.0,60664.0,125876,9367.0,8129.0,2.64,TX,Hispanic or Latino,33222
1533,Abilene,Texas,31.3,65212.0,60664.0,125876,9367.0,8129.0,2.64,TX,White,95487
2880,Abilene,Texas,31.3,65212.0,60664.0,125876,9367.0,8129.0,2.64,TX,Black or African-American,14449
2727,Abilene,Texas,31.3,65212.0,60664.0,125876,9367.0,8129.0,2.64,TX,Asian,2929
2325,Akron,Ohio,38.1,96886.0,100667.0,197553,12878.0,10024.0,2.24,OH,Hispanic or Latino,3684
2175,Akron,Ohio,38.1,96886.0,100667.0,197553,12878.0,10024.0,2.24,OH,Asian,9033
1972,Akron,Ohio,38.1,96886.0,100667.0,197553,12878.0,10024.0,2.24,OH,American Indian and Alaska Native,1845
1100,Akron,Ohio,38.1,96886.0,100667.0,197553,12878.0,10024.0,2.24,OH,White,129192
554,Alafaya,Florida,33.5,39504.0,45760.0,85264,4176.0,15842.0,2.94,FL,Asian,10336
1868,Alafaya,Florida,33.5,39504.0,45760.0,85264,4176.0,15842.0,2.94,FL,Black or African-American,6577


In [64]:
{v for v in df_demo["State Code"].unique() if v not in i94addr_unique_sample}

{'AK', 'DE', 'MT', 'ND', 'SD'}

In [65]:
airport_uniq_municipality = df_us_airport[df_us_airport.municipality.notnull()].municipality.unique()
len(airport_uniq_municipality)

8738

In [66]:
df_demo_uniq_city = df_demo.City.drop_duplicates()
df_demo_uniq_city.shape, df_demo_uniq_city.isin(airport_uniq_municipality).sum()

((567,), 477)

<a id='cleaning'></a>
### 3.2. Cleaning Data

The following issues have been identified for each dataset and will be addressed during cleaning.
   
**I94 Immigration**
1. Process the immigration data dictionary and store the value mapping of each column as a Python dictionary in a JSON file to improve readability and programmatic accessibility
1. Select only the columns relevant to the project scope and rename them to improve readability and programmatic accessibility (apply snake case)
1. Set proper data types for each column, e.g. datetime, int
1. Compute the missing US state codes based on the `port_id` and the data dictionary
1. Compute the city name based on the `port_id` and the data dictionary
1. Drop duplicates
1. Store a cleaned version of the dataset partitioned by `year`, `month` and `state_code`

**World Temperature**
1. The immigration data provided covers the year 2016. No temperature data for cities in the United States is available for 2016. Hence, the temperature data will not be processed or considered further.

**Airport Codes**
1. The `port_id` of the immigration data does not map reliably to the different airport codes, and a code-based mapping might lead to false positives. In particular, the columns `ident` and `gps_code` do not seem suitable for mapping to `port_id`, so they will be dropped. Since the immigration data dictionary has now been processed, a mapping via the municipality will be tried as a further assessment. In any case, `municipality` will be kept, because it is required for joining with the US city demographic data.
1. The US `iso_region` values do not necessarily map to every `state_code` in the immigration and demographics data, but the data for those states will still be kept.
1. Select only the columns relevant to the project scope and rename them to improve readability and programmatic accessibility (apply snake case).
1. Filter for airports in the US and drop the column `iso_country`
1. Remove the prefix `US-` from the `state_code` to simplify joining with the immigration and demographics data
1. Store the values of `municipality` in upper case to simplify joining with the immigration data
1. Drop duplicates
1. Store a cleaned version of the dataset partitioned by `state_code` and `municipality`
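
The airport steps listed above can be sketched on a tiny pandas frame before they are applied with Spark further down. This is only an illustration under assumptions: the two sample rows reuse values from the airport previews earlier in this notebook, the name `airport_sketch` is hypothetical, and the anchored pattern `^US-` is a stricter variant chosen for the sketch.

```python
import pandas as pd

# Two sample rows; values are taken from the airport previews shown earlier
df = pd.DataFrame({
    "iso_country": ["US", "NG"],
    "iso_region": ["US-CA", "NG-LA"],
    "municipality": ["Los Angeles", "Lagos"],
    "iata_code": ["LAX", "LOS"],
})

airport_sketch = (
    df[df["iso_country"] == "US"]            # keep US airports only
    .drop(columns=["iso_country"])           # constant after filtering
    .rename(columns={"iso_region": "state_code"})
    .assign(
        # anchored pattern, so only a leading "US-" is removed
        state_code=lambda d: d["state_code"].str.replace(r"^US-", "", regex=True),
        # upper case simplifies joining with the immigration city names
        municipality=lambda d: d["municipality"].str.upper(),
    )
    .drop_duplicates()
    .reset_index(drop=True)
)
print(airport_sketch)
```

Only the Los Angeles row survives the filter, with `state_code` reduced to `CA` and the municipality in upper case.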

**US City Demographic**
1. Since the immigration data dictionary has now been processed and immigration city names exist, a mapping to the US demographic `City` will be tried as a further assessment. In any case, `City` will be kept, because it is required for joining with the US immigration `city` and the airport `municipality`.
1. Select only the columns relevant to the project scope and rename them to improve readability and programmatic accessibility (apply snake case).
1. The data is grouped by `Race`, and only `Count` depends on the race. All other columns are duplicated across these rows. The duplicates can be dropped, because `Race`-related data is out of scope.
1. Store the values of `city` in upper case to simplify joining with the immigration and airport data
1. Store a cleaned version of the dataset partitioned by `state_code`
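
The demographic steps listed above can be sketched in pandas as follows. This is a minimal illustration, not the cleaning code used later: the helper `to_snake_case` and the name `demo_sketch` are hypothetical, and the sample rows are a subset of the Abilene and Akron rows from the duplicate check above.

```python
import re
import pandas as pd

def to_snake_case(name):
    # Hypothetical helper: "Median Age" -> "median_age", "Foreign-born" -> "foreign_born"
    return re.sub(r"[^0-9a-zA-Z]+", "_", name.strip()).lower()

# One row per (city, race); every column except Race and Count repeats per city
df = pd.DataFrame({
    "City": ["Abilene", "Abilene", "Akron", "Akron"],
    "State Code": ["TX", "TX", "OH", "OH"],
    "Median Age": [31.3, 31.3, 38.1, 38.1],
    "Total Population": [125876, 125876, 197553, 197553],
    "Race": ["White", "Asian", "White", "Asian"],
    "Count": [95487, 2929, 129192, 9033],
})

demo_sketch = (
    df.drop(columns=["Race", "Count"])       # race-related data is out of scope
    .rename(columns=to_snake_case)           # apply snake case to all column names
    .drop_duplicates()                       # collapses to one row per city
    .assign(city=lambda d: d["city"].str.upper())
    .reset_index(drop=True)
)
print(demo_sketch)
```

After dropping `Race` and `Count`, the four sample rows collapse to one row per city.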

<a id='c-immigration'></a>
#### 3.2.1. I94 Immigration

Clean the immigration data dictionary

In [67]:
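def parse_label_values(raw_values):
    # Parse lines of the form `key = 'value'` into a dict; lines without "=" are skipped
    values = dict()
    for v in raw_values.split("\n"):
        if "=" not in v:
            continue
        key_val = v.replace("\"","").replace("'","").strip().split("=")
        try:
            # Numeric codes (e.g. country ids) become int keys
            key = int(key_val[0].strip())
        except ValueError:
            # Non-numeric codes (e.g. port or state codes) stay strings
            key = key_val[0].strip()
        values[key] = key_val[1].replace(";","").strip()
    return values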
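
A quick usage example may help to see what the parser produces. The helper is repeated here so the snippet runs on its own. The raw text is an assumption about how a value block in the label file looks; the expected result matches the `I94MODE` values shown further down.

```python
def parse_label_values(raw_values):
    # Copy of the helper above: parse `key = 'value'` lines into a dict
    values = dict()
    for v in raw_values.split("\n"):
        if "=" not in v:
            continue
        key_val = v.replace("\"", "").replace("'", "").strip().split("=")
        try:
            key = int(key_val[0].strip())
        except ValueError:
            key = key_val[0].strip()
        values[key] = key_val[1].replace(";", "").strip()
    return values

# Assumed layout of a value block (illustrative, not copied from the label file)
raw_i94mode = """
    1 = 'Air'
    2 = 'Sea'
    3 = 'Land'
    9 = 'Not reported' ;
"""

parsed_modes = parse_label_values(raw_i94mode)
print(parsed_modes)
```

Numeric keys are converted to `int`, quotes and the trailing semicolon are stripped, and blank lines are ignored.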

In [68]:
def get_key_by_value(d, value):
    # Reverse lookup: return the single key that maps to the given value
    keys = [k for k, v in d.items() if v == value]
    if len(keys) > 1:
        raise ValueError(f"Multiple keys found for value {value}")
    if not keys:
        raise KeyError(f"No key found for value {value}")
    return keys[0]

In [69]:
def compute_immigration_data_dictionary():
    labels = dict()
    for line in [l.strip() for l in immigration_labels.split("/*")[1:]]:
        match = re.match(r"\b[A-Z0-9]+\b", line)
        if not match:
            continue
        values = dict()
        split_by_value = [v.strip() for v in line.split("value ")]
        split_by_colon = split_by_value[0].split(":")
        if len(split_by_value) == 2:
            values = parse_label_values(split_by_value[1])
        if len(split_by_colon) == 2:
            values = parse_label_values(split_by_colon[1])
        if len(split_by_value) == 2 and len(split_by_colon) == 2:
            raise Exception("ERROR: It's not expected that the source file contains values and a colon")
        if "&" in split_by_colon[0]:
            label_desc = ["_".join([v.strip() for v in split_by_colon[0].replace("&","").split()][:2])] + [" ".join(split_by_colon[0].split()[4:]).replace("*/","").strip()]
        else:
            label_desc = [split_by_colon[0].split()[0]] + [" ".join(split_by_colon[0].split()[2:]).replace("*/","").strip()]
        print(f"\nLabel: {label_desc[0]}\nDesc: {label_desc[1]}")
        labels[label_desc[0]] = dict()
        labels[label_desc[0]]["desc"] = label_desc[1]
        labels[label_desc[0]]["values"] = values
        if values:
            print(f"Max. 10 random values of {len(values)}: {list(values.items())[:10]}")
            with open(DATA_PROCESSED_IMMIGRATION_DATA_DICT / f"{label_desc[0]}.json", "w") as f:
                json.dump(values, f)
    return labels

In [70]:
labels = compute_immigration_data_dictionary()


Label: I94YR
Desc: 4 digit year

Label: I94MON
Desc: Numeric month

Label: I94CIT_I94RES
Desc: This format shows all the valid and invalid codes for processing
Max. 10 random values of 289: [(582, 'MEXICO Air Sea, and Not Reported (I-94, no land arrivals)'), (236, 'AFGHANISTAN'), (101, 'ALBANIA'), (316, 'ALGERIA'), (102, 'ANDORRA'), (324, 'ANGOLA'), (529, 'ANGUILLA'), (518, 'ANTIGUA-BARBUDA'), (687, 'ARGENTINA'), (151, 'ARMENIA')]

Label: I94PORT
Desc: This format shows all the valid and invalid codes for processing
Max. 10 random values of 660: [('ALC', 'ALCAN, AK'), ('ANC', 'ANCHORAGE, AK'), ('BAR', 'BAKER AAF - BAKER ISLAND, AK'), ('DAC', 'DALTONS CACHE, AK'), ('PIZ', 'DEW STATION PT LAY DEW, AK'), ('DTH', 'DUTCH HARBOR, AK'), ('EGL', 'EAGLE, AK'), ('FRB', 'FAIRBANKS, AK'), ('HOM', 'HOMER, AK'), ('HYD', 'HYDER, AK')]

Label: ARRDATE
Desc: the Arrival Date in the USA. It is a SAS date numeric field that a permament format has not been applied. Please apply whichever date format work

Inspect the data dict results

In [71]:
labels.keys()

dict_keys(['I94YR', 'I94MON', 'I94CIT_I94RES', 'I94PORT', 'ARRDATE', 'I94MODE', 'I94ADDR', 'DEPDATE', 'I94BIR', 'I94VISA', 'COUNT', 'DTADFILE', 'VISAPOST', 'OCCUP', 'ENTDEPA', 'ENTDEPD', 'ENTDEPU', 'MATFLAG', 'BIRYEAR', 'DTADDTO', 'GENDER', 'INSNUM', 'AIRLINE', 'ADMNUM', 'FLTNO', 'VISATYPE'])

In [72]:
i94cit_94res = labels.get("I94CIT_I94RES").get("values")
i94cit_94res

{582: 'MEXICO Air Sea, and Not Reported (I-94, no land arrivals)',
 236: 'AFGHANISTAN',
 101: 'ALBANIA',
 316: 'ALGERIA',
 102: 'ANDORRA',
 324: 'ANGOLA',
 529: 'ANGUILLA',
 518: 'ANTIGUA-BARBUDA',
 687: 'ARGENTINA',
 151: 'ARMENIA',
 532: 'ARUBA',
 438: 'AUSTRALIA',
 103: 'AUSTRIA',
 152: 'AZERBAIJAN',
 512: 'BAHAMAS',
 298: 'BAHRAIN',
 274: 'BANGLADESH',
 513: 'BARBADOS',
 104: 'BELGIUM',
 581: 'BELIZE',
 386: 'BENIN',
 509: 'BERMUDA',
 153: 'BELARUS',
 242: 'BHUTAN',
 688: 'BOLIVIA',
 717: 'BONAIRE, ST EUSTATIUS, SABA',
 164: 'BOSNIA-HERZEGOVINA',
 336: 'BOTSWANA',
 689: 'BRAZIL',
 525: 'BRITISH VIRGIN ISLANDS',
 217: 'BRUNEI',
 105: 'BULGARIA',
 393: 'BURKINA FASO',
 243: 'BURMA',
 375: 'BURUNDI',
 310: 'CAMEROON',
 326: 'CAPE VERDE',
 526: 'CAYMAN ISLANDS',
 383: 'CENTRAL AFRICAN REPUBLIC',
 384: 'CHAD',
 690: 'CHILE',
 245: 'CHINA, PRC',
 721: 'CURACAO',
 270: 'CHRISTMAS ISLAND',
 271: 'COCOS ISLANDS',
 691: 'COLOMBIA',
 317: 'COMOROS',
 385: 'CONGO',
 467: 'COOK ISLANDS',
 575: 

In [73]:
i94addr = labels.get("I94ADDR").get("values")
i94addr

{'AL': 'ALABAMA',
 'AK': 'ALASKA',
 'AZ': 'ARIZONA',
 'AR': 'ARKANSAS',
 'CA': 'CALIFORNIA',
 'CO': 'COLORADO',
 'CT': 'CONNECTICUT',
 'DE': 'DELAWARE',
 'DC': 'DIST. OF COLUMBIA',
 'FL': 'FLORIDA',
 'GA': 'GEORGIA',
 'GU': 'GUAM',
 'HI': 'HAWAII',
 'ID': 'IDAHO',
 'IL': 'ILLINOIS',
 'IN': 'INDIANA',
 'IA': 'IOWA',
 'KS': 'KANSAS',
 'KY': 'KENTUCKY',
 'LA': 'LOUISIANA',
 'ME': 'MAINE',
 'MD': 'MARYLAND',
 'MA': 'MASSACHUSETTS',
 'MI': 'MICHIGAN',
 'MN': 'MINNESOTA',
 'MS': 'MISSISSIPPI',
 'MO': 'MISSOURI',
 'MT': 'MONTANA',
 'NC': 'N. CAROLINA',
 'ND': 'N. DAKOTA',
 'NE': 'NEBRASKA',
 'NV': 'NEVADA',
 'NH': 'NEW HAMPSHIRE',
 'NJ': 'NEW JERSEY',
 'NM': 'NEW MEXICO',
 'NY': 'NEW YORK',
 'OH': 'OHIO',
 'OK': 'OKLAHOMA',
 'OR': 'OREGON',
 'PA': 'PENNSYLVANIA',
 'PR': 'PUERTO RICO',
 'RI': 'RHODE ISLAND',
 'SC': 'S. CAROLINA',
 'SD': 'S. DAKOTA',
 'TN': 'TENNESSEE',
 'TX': 'TEXAS',
 'UT': 'UTAH',
 'VT': 'VERMONT',
 'VI': 'VIRGIN ISLANDS',
 'VA': 'VIRGINIA',
 'WV': 'W. VIRGINIA',
 'WA': 'WAS

In [74]:
i94port = labels.get("I94PORT").get("values")
i94port

{'ALC': 'ALCAN, AK',
 'ANC': 'ANCHORAGE, AK',
 'BAR': 'BAKER AAF - BAKER ISLAND, AK',
 'DAC': 'DALTONS CACHE, AK',
 'PIZ': 'DEW STATION PT LAY DEW, AK',
 'DTH': 'DUTCH HARBOR, AK',
 'EGL': 'EAGLE, AK',
 'FRB': 'FAIRBANKS, AK',
 'HOM': 'HOMER, AK',
 'HYD': 'HYDER, AK',
 'JUN': 'JUNEAU, AK',
 '5KE': 'KETCHIKAN, AK',
 'KET': 'KETCHIKAN, AK',
 'MOS': 'MOSES POINT INTERMEDIATE, AK',
 'NIK': 'NIKISKI, AK',
 'NOM': 'NOM, AK',
 'PKC': 'POKER CREEK, AK',
 'ORI': 'PORT LIONS SPB, AK',
 'SKA': 'SKAGWAY, AK',
 'SNP': 'ST. PAUL ISLAND, AK',
 'TKI': 'TOKEEN, AK',
 'WRA': 'WRANGELL, AK',
 'HSV': 'MADISON COUNTY - HUNTSVILLE, AL',
 'MOB': 'MOBILE, AL',
 'LIA': 'LITTLE ROCK, AR (BPS)',
 'ROG': 'ROGERS ARPT, AR',
 'DOU': 'DOUGLAS, AZ',
 'LUK': 'LUKEVILLE, AZ',
 'MAP': 'MARIPOSA AZ',
 'NAC': 'NACO, AZ',
 'NOG': 'NOGALES, AZ',
 'PHO': 'PHOENIX, AZ',
 'POR': 'PORTAL, AZ',
 'SLU': 'SAN LUIS, AZ',
 'SAS': 'SASABE, AZ',
 'TUC': 'TUCSON, AZ',
 'YUI': 'YUMA, AZ',
 'AND': 'ANDRADE, CA',
 'BUR': 'BURBANK, CA',
 '

In [75]:
i94visa = labels.get("I94VISA").get("values")
i94visa

{1: 'Business', 2: 'Pleasure', 3: 'Student'}

In [76]:
i94modes = labels.get("I94MODE").get("values")
i94modes

{1: 'Air', 2: 'Sea', 3: 'Land', 9: 'Not reported'}

In [77]:
mode_air_key = get_key_by_value(i94modes, "Air")
mode_air_key

1

Clean the immigration dataset

In [78]:
@F.udf(T.DateType())
def udf_sas_numeric_to_datetime(x):
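    # SAS numeric dates count the days since 1960-01-01
    try:
        return datetime.datetime(1960, 1, 1) + datetime.timedelta(days=int(float(x)))
    except (TypeError, ValueError, OverflowError):
        # Missing or non-numeric input yields a null date
        return None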
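
The conversion can be checked outside Spark with plain Python. The sketch below uses a hypothetical helper `sas_days_to_date` that mirrors the UDF above; the sample value 20545 is chosen for illustration because it corresponds to the first day of April 2016, the month covered by the sample.

```python
import datetime

# SAS numeric dates count the days since this epoch
SAS_EPOCH = datetime.date(1960, 1, 1)

def sas_days_to_date(x):
    # Hypothetical plain-Python twin of the Spark UDF
    try:
        return SAS_EPOCH + datetime.timedelta(days=int(float(x)))
    except (TypeError, ValueError, OverflowError):
        # None, text or NaN cannot be converted
        return None

print(sas_days_to_date(20545.0))   # 2016-04-01
print(sas_days_to_date(None))      # None
```

Returning `None` for unparsable input keeps the column nullable, which is why `departure_date` can still contain nulls after cleaning.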

In [79]:
def extract_state_code_from_port(x):
    return x.split(",")[-1].replace("WASHINGTON DC","DC").replace("WASHINGTON #INTL", "DC").split("(")[0].split("#")[0].strip()

In [80]:
def extract_city_from_port(x):
    comma_split = x.split(",")
    last_seq_comma_split_removed = ",".join(comma_split[:-1]) if len(comma_split) > 1 else x
    return last_seq_comma_split_removed.strip()

In [81]:
def clean_city(city):
    blacklist = ["HWY. STATION", "#", "INTL", "Collapsed", "AIRP", "No PORT Code", "UNIDENTIFED", "UNKNOWN"]
    for sub_str in blacklist:
        if sub_str.lower() in city.lower():
            return None
    return city.replace("MUNICIPAL", "")
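
To see how the three helpers above behave together, the following sketch applies them to a few port labels taken from the `I94PORT` dictionary output. The functions are repeated from the cells above so the snippet runs on its own; the loop and the `results` dictionary are only for illustration.

```python
def extract_state_code_from_port(x):
    # Last comma-separated part, without "(...)" or "#..." suffixes
    return x.split(",")[-1].replace("WASHINGTON DC", "DC").replace("WASHINGTON #INTL", "DC").split("(")[0].split("#")[0].strip()

def extract_city_from_port(x):
    # Everything before the last comma, or the whole label if there is no comma
    comma_split = x.split(",")
    return (",".join(comma_split[:-1]) if len(comma_split) > 1 else x).strip()

def clean_city(city):
    # Labels that name an airport or an unknown port are not usable as city names
    blacklist = ["HWY. STATION", "#", "INTL", "Collapsed", "AIRP", "No PORT Code", "UNIDENTIFED", "UNKNOWN"]
    for sub_str in blacklist:
        if sub_str.lower() in city.lower():
            return None
    return city.replace("MUNICIPAL", "")

ports = ["ANCHORAGE, AK", "LITTLE ROCK, AR (BPS)", "MARIPOSA AZ", "CANNON INTL - RENO/TAHOE, NV"]
results = {p: (clean_city(extract_city_from_port(p)), extract_state_code_from_port(p)) for p in ports}
for port, (city, state) in results.items():
    print(f"{port!r}: city={city!r}, state={state!r}")
```

Two edge cases are visible here. A label without a comma (`MARIPOSA AZ`) yields no valid state code, so the lookup against `i94addr` in the UDF below rejects it. A label containing `INTL` is blacklisted, so its city becomes `None` even though the state code is still extracted.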

In [82]:
@F.udf(T.StringType())
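def udf_get_missing_us_state_codes(x):
    # x is a struct of (port_id, state_code)
    state_code = None
    try:
        port = x[0]
        state_code = x[1]
        extracted_state_code = extract_state_code_from_port(i94port.get(port)) if port else None
        return extracted_state_code if extracted_state_code in i94addr.keys() else state_code
    except Exception:
        # Unknown port or malformed input: fall back to the original state code
        return state_code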

In [83]:
@F.udf(T.StringType())
def udf_get_city_based_on_port(x):
    try:
        port = i94port.get(x)
        city = extract_city_from_port(port)
        return clean_city(city)
    except Exception:
        return None

In [84]:
def clean_df_immigration(df):
    # Define relevant columns and new name
    relevant_old_to_new_cols = collections.OrderedDict({
        "i94yr": "year",
        "i94mon": "month",
        "i94addr": "state_code",
        "cicid": "cicid",
        "i94cit": "citizenship_country_id",
        "i94res": "residency_country_id",
        "i94port": "port_id",
        "arrdate": "arrival_date",
        "i94mode": "mode",
        "depdate": "departure_date",
        "i94bir": "age",
        "biryear": "birth_year",
        "i94visa": "visa_id",
        "gender": "gender",
    })
    # Select old columns
    df_cleaned = df.select(*relevant_old_to_new_cols.keys())
    
    # Rename columns
    df_cleaned = df_cleaned.toDF(*relevant_old_to_new_cols.values())
    
    # Cast the data types
    df_cleaned = df_cleaned \
        .withColumn("year", df_cleaned.year.cast('int')) \
        .withColumn("month", df_cleaned.month.cast('int')) \
        .withColumn("cicid", df_cleaned.cicid.cast('int')) \
        .withColumn("citizenship_country_id", df_cleaned.citizenship_country_id.cast('int')) \
        .withColumn("residency_country_id", df_cleaned.residency_country_id.cast('int')) \
        .withColumn("mode", df_cleaned.mode.cast('int')) \
        .withColumn("visa_id", df_cleaned.visa_id.cast('int')) \
        .withColumn("age", df_cleaned.age.cast('int')) \
        .withColumn("birth_year", df_cleaned.birth_year.cast('int')) \
        .withColumn("arrival_date", udf_sas_numeric_to_datetime("arrival_date")) \
        .withColumn("departure_date", udf_sas_numeric_to_datetime("departure_date")) \
        .filter( F.col("mode") == mode_air_key) \
        .dropDuplicates()
    
    # Compute missing us state codes
    df_cleaned = df_cleaned \
        .withColumn("state_code", udf_get_missing_us_state_codes(F.struct(["port_id", "state_code"])))
    
    # Compute city based on port
    df_cleaned = df_cleaned \
        .withColumn("city", udf_get_city_based_on_port("port_id"))
    
    return df_cleaned

In [85]:
df_immigration_cleaned = clean_df_immigration(df_immigration)

Inspect the dataset after cleaning

In [86]:
df_immigration_cleaned.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- state_code: string (nullable = true)
 |-- cicid: integer (nullable = true)
 |-- citizenship_country_id: integer (nullable = true)
 |-- residency_country_id: integer (nullable = true)
 |-- port_id: string (nullable = true)
 |-- arrival_date: date (nullable = true)
 |-- mode: integer (nullable = true)
 |-- departure_date: date (nullable = true)
 |-- age: integer (nullable = true)
 |-- birth_year: integer (nullable = true)
 |-- visa_id: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- city: string (nullable = true)



In [87]:
df_immigration_cleaned.head(10)

[Row(year=2016, month=4, state_code='CA', cicid=861557, citizenship_country_id=209, residency_country_id=209, port_id='SDP', arrival_date=datetime.date(2016, 4, 5), mode=1, departure_date=datetime.date(2016, 4, 8), age=46, birth_year=1970, visa_id=2, gender='M', city='SAN DIEGO'),
 Row(year=2016, month=4, state_code='NY', cicid=2161755, citizenship_country_id=123, residency_country_id=123, port_id='NYC', arrival_date=datetime.date(2016, 4, 12), mode=1, departure_date=datetime.date(2016, 4, 14), age=60, birth_year=1956, visa_id=1, gender=None, city='NEW YORK'),
 Row(year=2016, month=4, state_code='DC', cicid=2903779, citizenship_country_id=135, residency_country_id=135, port_id='WAS', arrival_date=datetime.date(2016, 4, 16), mode=1, departure_date=datetime.date(2016, 6, 8), age=60, birth_year=1956, visa_id=2, gender='F', city='WASHINGTON DC'),
 Row(year=2016, month=4, state_code='CA', cicid=3380572, citizenship_country_id=266, residency_country_id=266, port_id='LOS', arrival_date=dateti

In [88]:
df_immigration_cleaned.count()

962

In [89]:
print_null_nan_values(df_immigration_cleaned)

+----+-----+----------+-----+----------------------+--------------------+-------+----+---+----------+-------+------+----+
|year|month|state_code|cicid|citizenship_country_id|residency_country_id|port_id|mode|age|birth_year|visa_id|gender|city|
+----+-----+----------+-----+----------------------+--------------------+-------+----+---+----------+-------+------+----+
|   0|    0|        10|    0|                     0|                   0|      0|   0|  0|         0|      0|   140|   1|
+----+-----+----------+-----+----------------------+--------------------+-------+----+---+----------+-------+------+----+



In [90]:
print_null_values_date(df_immigration_cleaned)

+------------+--------------+
|arrival_date|departure_date|
+------------+--------------+
|           0|            40|
+------------+--------------+



In [91]:
df_immigration_cleaned.filter( F.col("state_code").isNull()).head(5)

[Row(year=2016, month=4, state_code=None, cicid=6045611, citizenship_country_id=254, residency_country_id=276, port_id='SAI', arrival_date=datetime.date(2016, 4, 23), mode=1, departure_date=datetime.date(2016, 4, 26), age=4, birth_year=2012, visa_id=2, gender='M', city='SAIPAN'),
 Row(year=2016, month=4, state_code=None, cicid=5963522, citizenship_country_id=245, residency_country_id=245, port_id='SAI', arrival_date=datetime.date(2016, 4, 9), mode=1, departure_date=datetime.date(2016, 4, 11), age=28, birth_year=1988, visa_id=2, gender='F', city='SAIPAN'),
 Row(year=2016, month=4, state_code=None, cicid=6046172, citizenship_country_id=254, residency_country_id=276, port_id='SAI', arrival_date=datetime.date(2016, 4, 25), mode=1, departure_date=datetime.date(2016, 4, 28), age=11, birth_year=2005, visa_id=2, gender='X', city='SAIPAN'),
 Row(year=2016, month=4, state_code=None, cicid=5963414, citizenship_country_id=245, residency_country_id=245, port_id='SAI', arrival_date=datetime.date(201

In [92]:
df_immigration_cleaned.filter(F.col("city").isNull()).head(1)

[Row(year=2016, month=4, state_code='NV', cicid=2424686, citizenship_country_id=582, residency_country_id=582, port_id='RNO', arrival_date=datetime.date(2016, 4, 13), mode=1, departure_date=datetime.date(2016, 4, 20), age=16, birth_year=2000, visa_id=2, gender='M', city=None)]

In [93]:
port_id_no_city = df_immigration_cleaned.filter( F.col("city").isNull()).head(1)[0][df_immigration_cleaned.columns.index('port_id')]
p = i94port.get(port_id_no_city)
c = extract_city_from_port(p)
cc = clean_city(c)
print(port_id_no_city, p, c, cc)

RNO CANNON INTL - RENO/TAHOE, NV CANNON INTL - RENO/TAHOE None


In [94]:
df_immigration_cleaned.filter(F.col("gender").isNull()).head(5)

[Row(year=2016, month=4, state_code='NY', cicid=2161755, citizenship_country_id=123, residency_country_id=123, port_id='NYC', arrival_date=datetime.date(2016, 4, 12), mode=1, departure_date=datetime.date(2016, 4, 14), age=60, birth_year=1956, visa_id=1, gender=None, city='NEW YORK'),
 Row(year=2016, month=4, state_code='FL', cicid=3849271, citizenship_country_id=124, residency_country_id=124, port_id='MIA', arrival_date=datetime.date(2016, 4, 21), mode=1, departure_date=datetime.date(2016, 4, 22), age=64, birth_year=1952, visa_id=2, gender=None, city='MIAMI'),
 Row(year=2016, month=4, state_code='GA', cicid=3870400, citizenship_country_id=148, residency_country_id=112, port_id='ATL', arrival_date=datetime.date(2016, 4, 21), mode=1, departure_date=datetime.date(2016, 4, 27), age=35, birth_year=1981, visa_id=2, gender=None, city='ATLANTA'),
 Row(year=2016, month=4, state_code='CA', cicid=3722852, citizenship_country_id=438, residency_country_id=438, port_id='LOS', arrival_date=datetime.d

In [95]:
df_immigration_cleaned.filter(F.col("departure_date").isNull()).head(5)

[Row(year=2016, month=4, state_code='CA', cicid=3380572, citizenship_country_id=266, residency_country_id=266, port_id='LOS', arrival_date=datetime.date(2016, 4, 18), mode=1, departure_date=None, age=23, birth_year=1993, visa_id=3, gender='M', city='LOS ANGELES'),
 Row(year=2016, month=4, state_code='NY', cicid=5020169, citizenship_country_id=113, residency_country_id=113, port_id='NYC', arrival_date=datetime.date(2016, 4, 27), mode=1, departure_date=None, age=33, birth_year=1983, visa_id=2, gender='M', city='NEW YORK'),
 Row(year=2016, month=4, state_code='NY', cicid=1553520, citizenship_country_id=692, residency_country_id=692, port_id='NYC', arrival_date=datetime.date(2016, 4, 8), mode=1, departure_date=None, age=1, birth_year=2015, visa_id=2, gender='F', city='NEW YORK'),
 Row(year=2016, month=4, state_code='GU', cicid=5957654, citizenship_country_id=254, residency_country_id=276, port_id='SAI', arrival_date=datetime.date(2016, 4, 12), mode=1, departure_date=None, age=20, birth_yea

In [96]:
assert df_immigration_cleaned.count() == df_immigration_cleaned.select(F.countDistinct("cicid")).collect()[0][0]

Save the cleaned dataset

In [97]:
df_immigration_cleaned.write.partitionBy("year", "month", "state_code").mode("overwrite").parquet(DATA_CLEANED_IMMIGRATION_PARQUET)

<a id='c-airport'></a>
#### 3.2.2. Airport Codes

Inspect the mapping between the immigration and airport data

In [98]:
def get_df_map_immigration_airport(df_us_airport, i94port_unique_sample):
    json_map_immigration_airport = {"i94p": [], "city": [], "cleaned_city": [], "airport_name_codes": [], "airport_index_codes": [], "airport_codes": [],
                                    "airport_index_municipality": [], "airport_municipality": []}
    airport_code_query = "iata_code == '{port}' or local_code == '{port}' or gps_code == '{port}' or ident == '{port}'"
    for i94p in i94port_unique_sample:
        cq = df_us_airport.query(airport_code_query.format(port=i94p))
        ic = cq.index.values
        an = df_us_airport.loc[ic][["name"]].values if len(ic) > 0 else None
        ac = df_us_airport.loc[ic][["local_code", "iata_code", "ident", "gps_code"]].values if len(ic) > 0 else None
        p = i94port.get(i94p)
        c = extract_city_from_port(p)
        cc = clean_city(c)
        mm = df_us_airport.municipality.notnull() & df_us_airport.municipality.str.upper().str.contains(cc) if cc else None
        mq = df_us_airport[mm] if cc else None
        im = mq.index.values if cc else None
        am = mq.municipality.values if cc and len(im) > 0 else None
        json_map_immigration_airport["i94p"].append(i94p)
        json_map_immigration_airport["city"].append(c)
        json_map_immigration_airport["cleaned_city"].append(cc)
        json_map_immigration_airport["airport_index_codes"].append(ic)
        json_map_immigration_airport["airport_name_codes"].append(an)
        json_map_immigration_airport["airport_codes"].append(ac)
        json_map_immigration_airport["airport_index_municipality"].append(im)
        json_map_immigration_airport["airport_municipality"].append(am)
    return pd.DataFrame(json_map_immigration_airport)

In [99]:
df_map_immigration_airport = get_df_map_immigration_airport(df_us_airport, i94port_unique_sample)
print(df_map_immigration_airport.shape)
df_map_immigration_airport.head(10)

(70, 8)


Unnamed: 0,i94p,city,cleaned_city,airport_name_codes,airport_index_codes,airport_codes,airport_index_municipality,airport_municipality
0,HHW,HONOLULU,HONOLULU,[[Stan Stamper Municipal Airport]],[27465],"[[HHW, HUJ, KHHW, KHHW]]","[22948, 22954, 22962, 22963, 22970, 22971, 387...","[Honolulu, Honolulu, Honolulu, Honolulu, Honol..."
1,MCA,MCALLEN,MCALLEN,,[],,[],
2,OGG,KAHULUI - MAUI,KAHULUI - MAUI,[[Kahului Airport]],[38756],"[[OGG, OGG, PHOG, PHOG]]",[],
3,LOS,LOS ANGELES,LOS ANGELES,,[],,"[71, 639, 641, 666, 1459, 2500, 2565, 2751, 29...","[Los Angeles, Los Angeles, West Los Angeles, L..."
4,CHM,CHAMPLAIN,CHAMPLAIN,,[],,[],
5,ATL,ATLANTA,ATLANTA,[[Hartsfield Jackson Atlanta International Air...,[26128],"[[ATL, ATL, KATL, KATL]]","[1914, 3708, 3956, 4313, 5840, 5869, 5978, 632...","[Atlanta, Atlanta, Atlanta, Atlanta, Atlanta, ..."
6,SFR,SAN FRANCISCO,SAN FRANCISCO,[[San Fernando Airport]],[42481],"[[nan, SFR, SFR, nan]]","[1560, 13681, 13682, 13685, 15239, 29940, 4920...","[San Francisco, San Francisco, San Francisco, ..."
7,NYC,NEW YORK,NEW YORK,,[],,"[2746, 5445, 7431, 7432, 7433, 25070, 25072, 2...","[New York Mills, New York Mills, New York, New..."
8,CHI,CHICAGO,CHICAGO,,[],,"[4748, 5326, 5350, 5366, 6937, 7333, 9996, 239...","[Chicago, Chicago/Schaumburg, West Chicago, Ch..."
9,PHI,PHILADELPHIA,PHILADELPHIA,,[],,"[292, 353, 1238, 1359, 2082, 2218, 2219, 2235,...","[Philadelphia, Philadelphia, Philadelphia, Phi..."


Clean the airport dataset

In [100]:
def clean_df_airport(csv_file):
    df_cleaned = spark.read.option("header", True).csv(csv_file)
    airport_columns = ["iso_region", "municipality", "iata_code", "local_code", "name", "type", "iso_country"]
    df_cleaned = df_cleaned \
        .select(*airport_columns) \
        .filter( F.col("iso_country") == "US") \
        .withColumnRenamed("iso_region", "state_code") \
        .withColumn("municipality", F.upper( F.col("municipality"))) \
        .dropDuplicates()
    airport_columns.remove("iso_region") 
    airport_columns.append("state_code")
    df_cleaned = df_cleaned \
        .select(*airport_columns) \
        .withColumn("state_code", F.regexp_replace("state_code", "US-", "")) 
    airport_spark_obsolete_columns = ["iso_country"]
    return df_cleaned.drop(*airport_spark_obsolete_columns)

In [101]:
df_airport_cleaned = clean_df_airport(str(DATA_RAW_AIRPORT_CSV))

Inspect the dataset after cleaning

In [102]:
df_airport_cleaned.printSchema()

root
 |-- municipality: string (nullable = true)
 |-- iata_code: string (nullable = true)
 |-- local_code: string (nullable = true)
 |-- name: string (nullable = true)
 |-- type: string (nullable = true)
 |-- state_code: string (nullable = true)



In [103]:
df_airport_cleaned.count()

22754

In [104]:
df_airport_cleaned.head(10)

[Row(municipality='VICTORY BIBLE CAMP', iata_code=None, local_code='0AK6', name='Victory Airport', type='small_airport', state_code='AK'),
 Row(municipality='BROADVIEW', iata_code=None, local_code='0IL1', name='Loyola University Medical Center Heliport', type='heliport', state_code='IL'),
 Row(municipality='SEQUIM', iata_code=None, local_code='0WN0', name="Rucilla's Roost Airport", type='small_airport', state_code='WA'),
 Row(municipality='MONTPELIER', iata_code=None, local_code='16ID', name='Bear Lake Memorial Hospital Helipad', type='heliport', state_code='ID'),
 Row(municipality='LANCASTER', iata_code=None, local_code='1CL1', name='Little Buttes Antique Airfield', type='small_airport', state_code='CA'),
 Row(municipality='MOJAVE', iata_code=None, local_code='1CL2', name='Pontious Airport', type='small_airport', state_code='CA'),
 Row(municipality='CAMBRIDGE', iata_code=None, local_code='1MD1', name='Big Oak Farm Airport', type='small_airport', state_code='MD'),
 Row(municipality='NE

In [105]:
print_null_nan_values(df_airport_cleaned)

+------------+---------+----------+----+----+----------+
|municipality|iata_code|local_code|name|type|state_code|
+------------+---------+----------+----+----+----------+
|         102|    20736|      1520|   0|   0|         0|
+------------+---------+----------+----+----+----------+



Save the cleaned dataset

In [106]:
df_airport_cleaned.write.partitionBy("state_code", "municipality").mode("overwrite").parquet(DATA_CLEANED_AIRPORT_PARQUET)

<a id='c-demographic'></a>
#### 3.2.3. US City Demographic

Inspect how well the cities in the immigration dataset map to the cities in the demographic dataset

In [107]:
def get_immigration_sample_cleaned_cities(i94port_unique_sample):
    immigration_sample_cleaned_cities = set()
    for i94p in i94port_unique_sample:
        p = i94port.get(i94p)
        c = extract_city_from_port(p)
        cc = clean_city(c)
        if cc:
            immigration_sample_cleaned_cities.add(cc)
    print(f"Number of cleaned cities in immigration sample df: {len(immigration_sample_cleaned_cities)}")
    return immigration_sample_cleaned_cities

In [108]:
def get_demo_uniq_cities_set(df_demo_uniq_city):
    demo_uniq_city_set = set(df_demo_uniq_city.str.upper().values)
    print(f"Number of cities in demo df: {len(demo_uniq_city_set)}")
    return demo_uniq_city_set

In [109]:
i_cities = get_immigration_sample_cleaned_cities(i94port_unique_sample)
d_cities = get_demo_uniq_cities_set(df_demo_uniq_city)
i_cities_in_d = {ic for ic in i_cities for dc in d_cities if ic in dc}
print(f"Number of immigration cities in demographic df: {len(i_cities_in_d)} ({len(i_cities_in_d)/len(i_cities)*100}%)")

Number of cleaned cities in immigration sample df: 68
Number of cities in demo df: 567
Number of immigration cities in demographic df: 36 (52.94117647058824%)
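Note that the comprehension above counts an immigration city as matched when its name is a substring of any demographic city name (`ic in dc`), so the 36 matches may include partial hits. A minimal sketch of the difference between substring and exact matching, using made-up city names rather than the real datasets:

```python
# Hypothetical city names for illustration only; not taken from the datasets.
i_cities_example = {"YORK", "MIAMI", "NOGALES"}
d_cities_example = {"NEW YORK", "MIAMI", "MIAMI BEACH"}

# Substring matching, as in the cell above: "YORK" matches "NEW YORK".
substring_matches = {ic for ic in i_cities_example for dc in d_cities_example if ic in dc}

# Exact matching via set intersection: only identical names match.
exact_matches = i_cities_example & d_cities_example

print(sorted(substring_matches))  # ['MIAMI', 'YORK']
print(sorted(exact_matches))      # ['MIAMI']
```

Whether substring matching is desirable depends on how city names are spelled in both sources; exact matching is stricter and may undercount.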


Clean the demographic df

In [110]:
def clean_df_demo(csv_file):
    df_cleaned = spark.read.csv(csv_file, sep=";", header=True)
    demo_org_columns = ["State Code", "City", "Median Age", "Male Population", "Female Population", "Total Population", 
                        "Average Household Size", "Foreign-born", "Number of Veterans"]
    demo_unordered_cols = dict()
    for c in demo_org_columns:
        demo_unordered_cols[c] = c.replace("-","_").replace(" ","_").lower()
    demo_relevant_old_to_new_cols = collections.OrderedDict(demo_unordered_cols)
    df_cleaned = df_cleaned.select(*demo_relevant_old_to_new_cols.keys()).dropDuplicates()
    df_cleaned = df_cleaned.toDF(*demo_relevant_old_to_new_cols.values())
    df_cleaned = df_cleaned \
        .withColumn("city", F.upper( F.col("city"))) \
        .withColumn("median_age", df_cleaned.median_age.cast('int')) \
        .withColumn("male_population", df_cleaned.male_population.cast('int')) \
        .withColumn("female_population", df_cleaned.female_population.cast('int')) \
        .withColumn("total_population", df_cleaned.total_population.cast('int')) \
        .withColumn("foreign_born", df_cleaned.foreign_born.cast('int')) \
        .withColumn("number_of_veterans", df_cleaned.number_of_veterans.cast('int')) \
        .withColumn("average_household_size", df_cleaned.average_household_size.cast('double'))
    return df_cleaned

In [111]:
df_demo_cleaned = clean_df_demo(str(DATA_RAW_US_DEMO_CSV))

Inspect the dataset after cleaning

In [112]:
df_demo_cleaned.printSchema()

root
 |-- state_code: string (nullable = true)
 |-- city: string (nullable = true)
 |-- median_age: integer (nullable = true)
 |-- male_population: integer (nullable = true)
 |-- female_population: integer (nullable = true)
 |-- total_population: integer (nullable = true)
 |-- average_household_size: double (nullable = true)
 |-- foreign_born: integer (nullable = true)
 |-- number_of_veterans: integer (nullable = true)



In [113]:
df_demo_cleaned.count()

596

In [114]:
df_demo_cleaned.select([F.count(F.when(F.isnan(c) |  F.col(c).isNull(), c)).alias(c) for c in df_demo_cleaned.columns]).show()

+----------+----+----------+---------------+-----------------+----------------+----------------------+------------+------------------+
|state_code|city|median_age|male_population|female_population|total_population|average_household_size|foreign_born|number_of_veterans|
+----------+----+----------+---------------+-----------------+----------------+----------------------+------------+------------------+
|         0|   0|         0|              1|                1|               0|                     8|           7|                 7|
+----------+----+----------+---------------+-----------------+----------------+----------------------+------------+------------------+



In [115]:
df_demo_cleaned.head(10)

[Row(state_code='RI', city='PROVIDENCE', median_age=29, male_population=89090, female_population=90114, total_population=179204, average_household_size=2.72, foreign_born=53532, number_of_veterans=4933),
 Row(state_code='TX', city='LEWISVILLE', median_age=31, male_population=52776, female_population=52032, total_population=104808, average_household_size=2.78, foreign_born=24865, number_of_veterans=4211),
 Row(state_code='UT', city='LAYTON', median_age=29, male_population=37748, female_population=36394, total_population=74142, average_household_size=3.24, foreign_born=4268, number_of_veterans=3811),
 Row(state_code='PR', city='SAN JUAN', median_age=41, male_population=155408, female_population=186829, total_population=342237, average_household_size=None, foreign_born=None, number_of_veterans=None),
 Row(state_code='FL', city='WESTON', median_age=38, male_population=32956, female_population=36991, total_population=69947, average_household_size=3.34, foreign_born=30876, number_of_veterans

Save the cleaned dataset

In [116]:
df_demo_cleaned.write.partitionBy("state_code").mode("overwrite").parquet(DATA_CLEANED_DEMO_PARQUET)

<a id='model-definition'></a>
## 4. Data Model Definition
<a id='model-conceptual'></a>
### 4.1. Conceptual Data Model

![](docs/conceptual_erd_star.drawio.png)

The illustration above shows a conceptual ERD of the database. A star schema with fact and dimension tables was defined to allow analytic queries as stated within the project scope. The schema is optimized for OLAP and BI.


<a id='etl-definition'></a>
### 4.2. ETL Definition

The prerequisite for the ETL pipeline is that the raw data has been cleaned and stored in Apache Parquet format as described in step 3.2 of this notebook.

The tree of the data directory is illustrated and described in the [README.md](README.md#data-directory-tree).

The ETL will consist of the following steps to pipeline the cleaned data into the chosen data model:
1. Select the data of the cleaned immigration dataset `df_immigration_cleaned` to create the `d_visitors` dimension table and compute a unique id for each record. Partition the data by `year` and `month`. Without the unique id, the records would appear to contain duplicates because the column `cicid` has been dropped (e.g. two persons with the same characteristics visited the US on the same date through the same port).
1. Select the data of the cleaned immigration dataset `df_immigration_cleaned` to create the `f_us_immigrations` fact table. Drop duplicate records. Afterwards, compute a unique id for each record. Partition the data by `year`, `month` and `state_code`.
1. Select the data of the cleaned airport dataset `df_airport_cleaned` to create the `d_us_airports` dimension table. Drop duplicate records. Afterwards, compute a unique id for each record. Partition the data by `state_code` and `municipality`.
1. Select the data of the cleaned US city demographic dataset `df_demo_cleaned` to create the `d_us_demo` dimension table. Drop duplicate records. Afterwards, compute a unique id for each record. Partition the data by `state_code`.
1. Finally, compute checks to assure data quality.
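
The "drop duplicates, then compute a unique id" pattern from the steps above can be illustrated without Spark. The following is a plain-Python sketch on made-up records, not the pipeline implementation itself (which uses `F.monotonically_increasing_id()` and `F.row_number()` in section 5.1); the helper name `add_unique_ids` is an assumption for illustration.

```python
def add_unique_ids(records, drop_duplicates=True):
    """Return records as dicts with a sequential 'id' starting at 1.

    Hypothetical helper: mirrors the idea of dropDuplicates() followed by
    row_number(), but operates on a list of tuples instead of a DataFrame.
    """
    if drop_duplicates:
        # dict.fromkeys keeps the first occurrence and preserves order
        records = list(dict.fromkeys(records))
    return [{"id": i, "record": r} for i, r in enumerate(records, start=1)]


# Two visitors with identical characteristics (made-up values)
visitors = [("NYC", "2016-04-01", "F", 34), ("NYC", "2016-04-01", "F", 34)]

# Dimension table d_visitors: keep both rows, the id tells them apart
d_visitors_example = add_unique_ids(visitors, drop_duplicates=False)

# Other tables: identical rows collapse before ids are assigned
deduplicated_example = add_unique_ids(visitors, drop_duplicates=True)

print(len(d_visitors_example), len(deduplicated_example))  # 2 1
```

This shows why `d_visitors` skips the deduplication step: two legitimately distinct visitors would otherwise be merged into one row.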

<a id='etl-run'></a>
## 5. Run ETL Pipeline
<a id='model-creation'></a>
### 5.1. Data Model Creation

In [117]:
# Uncomment the following lines to read the parquet files and skip the previous cleaning steps
# df_immigration_cleaned = spark.read.parquet(DATA_CLEANED_IMMIGRATION_PARQUET)
# df_airport_cleaned = spark.read.parquet(DATA_CLEANED_AIRPORT_PARQUET)
# df_demo_cleaned = spark.read.parquet(DATA_CLEANED_DEMO_PARQUET)

Compute dimension table d_visitors

In [118]:
def generate_d_visitors(df_immigration_cleaned):
    d_visitors_partition_cols = ["year", "month"]
    d_visitors_cols = d_visitors_partition_cols + ["id", "port_id", "arrival_date", "departure_date", "citizenship_country_id",
                                                   "residency_country_id", "age", "birth_year", "gender", "visa_id"]
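    # monotonically_increasing_id() is unique but not consecutive, so it is only used for ordering
    d_visitors = df_immigration_cleaned.select(*df_immigration_cleaned.columns) \
        .withColumn("monotonically_increasing_id", F.monotonically_increasing_id())
    # row_number() over that ordering yields consecutive ids starting at 1
    window = Window.orderBy( F.col('monotonically_increasing_id'))
    d_visitors = d_visitors.withColumn('id', F.row_number().over(window))
    # keep only the target columns in the defined order
    d_visitors = d_visitors.drop(*[c for c in d_visitors.columns if c not in d_visitors_cols]).select(*d_visitors_cols)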
    d_visitors.write.partitionBy(*d_visitors_partition_cols).mode("overwrite").parquet(DATA_PROCESSED_D_VISITORS)
    return d_visitors

In [119]:
d_visitors = generate_d_visitors(df_immigration_cleaned)

Compute fact table f_us_immigrations

In [120]:
def generate_f_us_immigrations(df_immigration_cleaned):
    f_us_immigrations_partition_cols = ["year", "month", "state_code"]
    f_us_immigrations_cols = f_us_immigrations_partition_cols + ["id", "city", "port_id"]
    f_us_immigrations = df_immigration_cleaned.select(*[c for c in f_us_immigrations_cols if c != "id"]) \
        .dropDuplicates() \
        .withColumn("monotonically_increasing_id", F.monotonically_increasing_id())
    window = Window.orderBy( F.col('monotonically_increasing_id'))
    f_us_immigrations = f_us_immigrations.withColumn('id', F.row_number().over(window))
    f_us_immigrations = f_us_immigrations.drop(*[c for c in f_us_immigrations.columns if c not in f_us_immigrations_cols]).select(*f_us_immigrations_cols)
    f_us_immigrations.write.partitionBy(*f_us_immigrations_partition_cols).mode("overwrite").parquet(DATA_PROCESSED_F_IMMIGRATIONS)
    return f_us_immigrations

In [121]:
f_us_immigrations = generate_f_us_immigrations(df_immigration_cleaned)

Compute dimension table d_us_airports

In [122]:
def generate_d_us_airports(df_airport_cleaned):
    d_us_airports_partition_cols = ["state_code", "municipality"]
    d_us_airports_cols = d_us_airports_partition_cols + ["id", "iata_code", "local_code", "name", "type"]
    d_us_airports = df_airport_cleaned.select(*[c for c in d_us_airports_cols if c != "id"]) \
        .dropDuplicates() \
        .withColumn("monotonically_increasing_id", F.monotonically_increasing_id())
    window = Window.orderBy( F.col('monotonically_increasing_id'))
    d_us_airports = d_us_airports.withColumn('id', F.row_number().over(window))
    d_us_airports = d_us_airports.drop(*[c for c in d_us_airports.columns if c not in d_us_airports_cols]).select(*d_us_airports_cols)
    d_us_airports.write.partitionBy(*d_us_airports_partition_cols).mode("overwrite").parquet(DATA_PROCESSED_D_AIRPORTS)
    return d_us_airports

In [123]:
d_us_airports = generate_d_us_airports(df_airport_cleaned)

Compute dimension table d_us_demo

In [124]:
def generate_d_us_demo(df_demo_cleaned):
    d_us_demo_partition_cols = ["state_code"]
    d_us_demo_cols = d_us_demo_partition_cols + ["id", "city", "median_age", "male_population", "female_population", 
                                                 "total_population", "average_household_size", 
                                                 "foreign_born", "number_of_veterans"]
    d_us_demo = df_demo_cleaned.select(*[c for c in d_us_demo_cols if c != "id"]) \
        .dropDuplicates() \
        .withColumn("monotonically_increasing_id", F.monotonically_increasing_id())
    window = Window.orderBy( F.col('monotonically_increasing_id'))
    d_us_demo = d_us_demo.withColumn('id', F.row_number().over(window))
    d_us_demo = d_us_demo.drop(*[c for c in d_us_demo.columns if c not in d_us_demo_cols]).select(*d_us_demo_cols)
    d_us_demo.write.partitionBy(*d_us_demo_partition_cols).mode("overwrite").parquet(DATA_PROCESSED_D_DEMOGRAPHIC)
    return d_us_demo

In [125]:
d_us_demo = generate_d_us_demo(df_demo_cleaned)

<a id='dict'></a>
### 5.2. Data Dictionary 

For each analytic table of the data model, the column name, data type and a short description are listed below. Some columns only hold an id; the corresponding value can be retrieved using the JSON dictionaries stored in `data/processed/immigration_data_dict`.

Since PySpark was used, the data types listed for each table refer to [Spark data types](https://spark.apache.org/docs/latest/sql-ref-datatypes.html).
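
As a sketch of how such an id could be resolved: the snippet below assumes the JSON dictionaries are flat objects mapping a code to its label. That layout and the sample entries are assumptions for illustration; the actual files in `data/processed/immigration_data_dict` may be structured differently.

```python
import json

# Assumed layout: a flat JSON object {code: label}. Sample content is made up;
# in practice it would be read from a file such as I94PORT.json.
i94port_json = '{"NYC": "NEW YORK, NY", "CHI": "CHICAGO, IL"}'
port_lookup = json.loads(i94port_json)


def resolve_id(lookup, key, default=None):
    """Return the label for an id, or a default if the id is unknown."""
    # JSON object keys are always strings, so integer ids are converted first
    return lookup.get(str(key), default)


print(resolve_id(port_lookup, "NYC"))             # NEW YORK, NY
print(resolve_id(port_lookup, "XXX", "unknown"))  # unknown
```

The `str(key)` conversion matters for columns such as `visa_id`, which are stored as `IntegerType` while JSON keys are strings.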

### US Immigration Fact Table: `f_us_immigrations`

| Table Column | Data Type    | Description  |
| ------------ | ------------ | ------------ |
| year | `IntegerType`  | 4-digit year, primary partition key |
| month | `IntegerType` | Numeric month, secondary partition key |
| state_code | `StringType` | US state code, tertiary partition key, use the json dictionary `data/processed/immigration_data_dict/I94ADDR.json` to retrieve the US state name based on the code |
| id | `IntegerType` | Auto generated unique id  |
| city | `StringType` | Name of city extracted from json dictionary `data/processed/immigration_data_dict/I94PORT.json` based on the `port_id` |
| port_id | `StringType` |  Port id for which the name can be retrieved using the json dictionary `data/processed/immigration_data_dict/I94PORT.json` |


### Visitor Dimension Table: `d_visitors`

| Table Column | Data Type    | Description  |
| ------------ | ------------ | ------------ |
| year | `IntegerType`  | 4-digit year, primary partition key |
| month | `IntegerType` | Numeric month, secondary partition key |
| id | `IntegerType` | Auto generated unique id  |
| port_id | `StringType` |  Port id for which the name can be retrieved using the json dictionary `data/processed/immigration_data_dict/I94PORT.json` |
| arrival_date | `DateType` |  Arrival date |
| departure_date |  `DateType` |  Departure date |
| citizenship_country_id | `IntegerType` | Citizenship country id for which the name can be retrieved using the json dictionary `data/processed/immigration_data_dict/I94CIT_I94RES.json` |
| residency_country_id | `IntegerType` |  Residency country id for which the name can be retrieved using the json dictionary `data/processed/immigration_data_dict/I94CIT_I94RES.json` |
| age | `IntegerType`|  Age on arrival date |
| birth_year | `IntegerType`|  Birth year |
| gender | `StringType` |  Sex |
| visa_id | `IntegerType` |  Visa id for which the name can be retrieved using the json dictionary `data/processed/immigration_data_dict/I94VISA.json` |


### US Airport Dimension Table: `d_us_airports`

| Table Column | Data Type    | Description  |
| ------------ | ------------ | ------------ |
| state_code | `StringType` | US state code, primary partition key, use the json dictionary `data/processed/immigration_data_dict/I94ADDR.json` to retrieve the US state name based on the code |
| municipality | `StringType` | Municipality in which the Airport is located, secondary partition key |
| id | `IntegerType` | Auto generated unique id  |
| iata_code | `StringType` | "International Air Transport Association" code |
| local_code | `StringType` | Local code used in the US |
| name | `StringType` |  Name |
| type | `StringType` |  Type, e.g. large, small, closed Airport |

### US City Demographic Dimension Table: `d_us_demo`

| Table Column | Data Type    | Description  |
| ------------ | ------------ | ------------ |
| state_code | `StringType` | US state code, primary partition key, use the json dictionary `data/processed/immigration_data_dict/I94ADDR.json` to retrieve the US state name based on the code |
| id | `IntegerType` | Auto generated unique id  |
| city | `StringType` | Name of city |
| median_age | `IntegerType` | Median age of the city population |
| male_population | `IntegerType` | Male population of the city |
| female_population | `IntegerType` | Female population of the city |
| total_population | `IntegerType` | Total population of the city |
| average_household_size | `DoubleType` | Average household size of the city |
| foreign_born | `IntegerType` | Persons not born in the US with residency in the city|
| number_of_veterans | `IntegerType` | Number of veterans with residency in the city|


<a id='quality'></a>
### 5.3. Data Quality Checks

To assure data quality, the following checks will be performed:
1. Verify that each table is not empty
1. Verify that for each table a unique id per row has been computed
1. Manually verify that the tables match the defined data model and data types
1. Run an analytics query joining multiple tables to provide evidence: Do immigrants travel to cities with generally more immigrants and more airports?
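
The first two checks can be bundled into one reusable function. The following is a plain-Python sketch on lists of dicts standing in for tables; `check_table` is a hypothetical helper, and a Spark version would rely on `count()` and `F.countDistinct("id")` instead of `len()`.

```python
def check_table(name, rows, id_column="id"):
    """Raise ValueError if the table is empty or its ids are not unique."""
    row_count = len(rows)
    if row_count == 0:
        raise ValueError(f"{name}: table is empty")
    distinct_ids = len({row[id_column] for row in rows})
    if distinct_ids != row_count:
        raise ValueError(f"{name}: {row_count} rows but {distinct_ids} distinct ids")
    return row_count


# Made-up rows for illustration
good_table = [{"id": 1, "city": "NEW YORK"}, {"id": 2, "city": "CHICAGO"}]
bad_table = [{"id": 1, "city": "NEW YORK"}, {"id": 1, "city": "CHICAGO"}]

print(check_table("good_table", good_table))  # 2

try:
    check_table("bad_table", bad_table)
except ValueError as err:
    print(err)  # bad_table: 2 rows but 1 distinct ids
```

Raising an exception with the table name makes a failing check easier to locate than a bare `assert`.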

**5.3.1. Verify that each table is not empty**

In [126]:
d_visitors_count = d_visitors.count()
d_visitors_count

962

In [127]:
assert d_visitors_count > 0

In [128]:
f_us_immigrations_count = f_us_immigrations.count()
f_us_immigrations_count

68

In [129]:
assert f_us_immigrations_count > 0

In [130]:
d_us_airports_count = d_us_airports.count()
d_us_airports_count

22754

In [131]:
assert d_us_airports_count > 0

In [132]:
d_us_demo_count = d_us_demo.count()
d_us_demo_count

596

In [133]:
assert d_us_demo_count > 0

**5.3.2. Verify that for each table a unique id per row has been computed**

In [134]:
d_visitors_count_id = d_visitors.select(F.countDistinct("id")).collect()[0][0]
d_visitors_count_id

962

In [135]:
assert d_visitors_count == d_visitors_count_id

In [136]:
f_us_immigrations_count_id = f_us_immigrations.select(F.countDistinct("id")).collect()[0][0]
f_us_immigrations_count_id

68

In [137]:
assert f_us_immigrations_count == f_us_immigrations_count_id

In [138]:
d_us_airports_count_id = d_us_airports.select(F.countDistinct("id")).collect()[0][0]
d_us_airports_count_id

22754

In [139]:
assert d_us_airports_count == d_us_airports_count_id

In [140]:
d_us_demo_count_id = d_us_demo.select(F.countDistinct("id")).collect()[0][0]
d_us_demo_count_id

596

In [141]:
assert d_us_demo_count == d_us_demo_count_id

**5.3.3. Manually verify that the tables match the defined data model and data types**

In [142]:
d_visitors.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- id: integer (nullable = true)
 |-- port_id: string (nullable = true)
 |-- arrival_date: date (nullable = true)
 |-- departure_date: date (nullable = true)
 |-- citizenship_country_id: integer (nullable = true)
 |-- residency_country_id: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- birth_year: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- visa_id: integer (nullable = true)



In [143]:
f_us_immigrations.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- state_code: string (nullable = true)
 |-- id: integer (nullable = true)
 |-- city: string (nullable = true)
 |-- port_id: string (nullable = true)



In [144]:
d_us_airports.printSchema()

root
 |-- state_code: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- id: integer (nullable = true)
 |-- iata_code: string (nullable = true)
 |-- local_code: string (nullable = true)
 |-- name: string (nullable = true)
 |-- type: string (nullable = true)



In [145]:
d_us_demo.printSchema()

root
 |-- state_code: string (nullable = true)
 |-- id: integer (nullable = true)
 |-- city: string (nullable = true)
 |-- median_age: integer (nullable = true)
 |-- male_population: integer (nullable = true)
 |-- female_population: integer (nullable = true)
 |-- total_population: integer (nullable = true)
 |-- average_household_size: double (nullable = true)
 |-- foreign_born: integer (nullable = true)
 |-- number_of_veterans: integer (nullable = true)



**5.3.4. Run an analytics query joining multiple tables to provide evidence**

Do immigrants travel to cities with generally more immigrants or to US states with the most airports?

In [146]:
count_visitors_per_city = f_us_immigrations.alias("f") \
    .join(d_visitors.alias("dv"), f_us_immigrations.port_id == d_visitors.port_id) \
    .select("f.year", "f.month", F.col("f.city").alias("City")) \
    .filter((f_us_immigrations.year == 2016) & (f_us_immigrations.month == 4)) \
    .groupBy("City").count() \
    .distinct() \
    .orderBy(F.desc("count"))

count_visitor_city_foreign_born = count_visitors_per_city.alias("cvpc") \
    .join(d_us_demo.alias("dd"), count_visitors_per_city.City == d_us_demo.city) \
    .select("cvpc.City", F.col("dd.foreign_born").alias("Foreign Born Count per Year"), F.col("count").alias("Visitor Count for April 2016")) \
    .distinct() \
    .orderBy(F.desc("Foreign Born Count per Year"))

count_visitor_city_foreign_born.show()

+-------------+---------------------------+----------------------------+
|         City|Foreign Born Count per Year|Visitor Count for April 2016|
+-------------+---------------------------+----------------------------+
|     NEW YORK|                    3212500|                         155|
|  LOS ANGELES|                    1485425|                         104|
|      HOUSTON|                     696210|                          30|
|      CHICAGO|                     573463|                          45|
|     SAN JOSE|                     401493|                           2|
|    SAN DIEGO|                     373842|                           2|
|       DALLAS|                     326825|                          18|
|      PHOENIX|                     300702|                          15|
|SAN FRANCISCO|                     297199|                          54|
|        MIAMI|                     260789|                         110|
|  SAN ANTONIO|                     208046|        

In [147]:
count_visitor_city_foreign_born.orderBy("Foreign Born Count per Year").show()

+---------------+---------------------------+----------------------------+
|           City|Foreign Born Count per Year|Visitor Count for April 2016|
+---------------+---------------------------+----------------------------+
|       PORTLAND|                       9229|                           4|
|       COLUMBUS|                      10376|                           1|
|     FORT MYERS|                      15365|                           6|
|    NEW ORLEANS|                      21679|                           1|
|      VANCOUVER|                      21748|                           6|
|WEST PALM BEACH|                      30675|                           3|
|        ATLANTA|                      32016|                          37|
| SALT LAKE CITY|                      32166|                           1|
|        MCALLEN|                      37691|                           1|
|        DETROIT|                      39861|                          12|
|FORT LAUDERDALE|        

In [148]:
count_visitors_per_state = f_us_immigrations.alias("f") \
    .join(d_visitors.alias("dv"), f_us_immigrations.port_id == d_visitors.port_id) \
    .select("f.year", "f.month", F.col("f.state_code").alias("state_code")) \
    .filter((f_us_immigrations.year == 2016) & (f_us_immigrations.month == 4)) \
    .groupBy("state_code").count() \
    .distinct() \
    .orderBy(F.desc("count"))

count_airports_per_state = d_us_airports.alias("da") \
    .select("state_code") \
    .groupBy("state_code").count() \
    .distinct() \
    .orderBy(F.desc("count"))

count_visitor_state_airports = count_visitors_per_state.alias("cvps") \
    .join(count_airports_per_state.alias("caps"), count_visitors_per_state.state_code == count_airports_per_state.state_code) \
    .select(F.col("cvps.state_code").alias("State Code"), F.col("caps.count").alias("Airport Count"), F.col("cvps.count").alias("Visitor Count for April 2016")) \
    .distinct() \
    .orderBy(F.desc("Airport Count"))

count_visitor_state_airports.show()

+----------+-------------+----------------------------+
|State Code|Airport Count|Visitor Count for April 2016|
+----------+-------------+----------------------------+
|        TX|         2276|                          51|
|        CA|         1088|                         183|
|        FL|          967|                         206|
|        PA|          918|                          11|
|        IL|          902|                          60|
|        OH|          799|                           5|
|        NY|          668|                         169|
|        WI|          624|                           1|
|        LA|          592|                           1|
|        WA|          578|                          16|
|        MN|          569|                           4|
|        MI|          549|                          12|
|        GA|          522|                          41|
|        CO|          505|                           2|
|        OR|          492|                      

In [149]:
count_visitor_state_airports.orderBy("Airport Count").show()

+----------+-------------+----------------------------+
|State Code|Airport Count|Visitor Count for April 2016|
+----------+-------------+----------------------------+
|        DC|           21|                          24|
|        HI|           64|                          52|
|        VT|          102|                           9|
|        NV|          156|                          23|
|        UT|          170|                           1|
|        MD|          257|                           2|
|        MA|          257|                          29|
|        AZ|          359|                          15|
|        NJ|          442|                          45|
|        NC|          473|                           6|
|        OR|          492|                           4|
|        CO|          505|                           2|
|        GA|          522|                          41|
|        MI|          549|                          12|
|        MN|          569|                      

<a id='scenarios'></a>
## 6. Further Scenarios and Approaches

### Data Update Frequency
1. The immigration data is grouped by year and month, so it would make sense to append new data on a monthly basis. It would be important to ensure that the id columns of the visitors dimension table and the US immigrations fact table stay unique.
1. The airport data can be updated independently of the other datasets. If new information is available, existing records should be updated (e.g. an airport closes) or new records added (e.g. a new airport opens). Nevertheless, it would make sense to align the update frequency with the immigration data as long as it is part of the same data pipeline.
1. The processed US demographic data covers the year 2015. The table should be updated based on the frequency of the US census surveys. An additional column to group and partition the data by year or month would be required so that new survey data can be appended. The updates can be done independently of the other raw dataset updates, which would require an additional pipeline.

In any case, the cleaning and ETL implementation would have to be changed so that the existing database is updated. Tools like Airflow could be used to schedule those updates and to run multiple pipelines.
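
One way to keep ids unique across monthly appends is to continue numbering from the highest id already stored. A minimal plain-Python sketch with made-up values; `assign_ids_for_append` is a hypothetical helper, and in Spark the starting point could be obtained with an aggregate such as `F.max("id")` on the existing table.

```python
def assign_ids_for_append(existing_max_id, new_records):
    """Number new records starting right after the current maximum id."""
    return [
        {"id": existing_max_id + offset, **record}
        for offset, record in enumerate(new_records, start=1)
    ]


# Illustrative state: the stored table currently ends at id 962
existing_max_id = 962
may_records = [{"year": 2016, "month": 5}, {"year": 2016, "month": 5}]

appended = assign_ids_for_append(existing_max_id, may_records)
print([row["id"] for row in appended])  # [963, 964]
```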
 
### The data was increased by 100x.
For a smooth data assessment, PySpark would be required instead of Pandas. Pandas could still be used on sample data. Currently, a single-node Spark (standalone installation) can process the data in an appropriate time. This will no longer be possible when the data increases by 100x. A Spark cluster that distributes the processing across multiple nodes would be required, e.g. AWS EMR could be leveraged. AWS S3 would be mandatory as the data storage layer for the Spark cluster. Using a local disk would not be recommended because of network latency and disk I/O, and because S3 offers options like backup, lifecycle rules and data tiering.

### The pipelines would be run on a daily basis by 7 am every day.
A workflow engine, e.g. Airflow or Luigi, should be used to create a DAG (directed acyclic graph) defining the data pipeline steps. Such tools are made for scheduling tasks or workflows described as DAGs. Additionally, they provide monitoring of data pipelines and a web GUI, which increases maintainability and accessibility.
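
To illustrate what such a DAG expresses, the sketch below orders the pipeline steps of this notebook by their dependencies using Python's standard-library `graphlib` (Python 3.9+). It is not Airflow code; the task names are assumptions, and `0 7 * * *` is the usual cron notation for "every day at 07:00".

```python
from graphlib import TopologicalSorter

# Cron expression for "daily at 07:00" (minute hour day-of-month month day-of-week)
DAILY_7AM = "0 7 * * *"

# Hypothetical task names; each task maps to the set of tasks it depends on
pipeline = {
    "clean_immigration": set(),
    "clean_airports": set(),
    "clean_demographics": set(),
    "build_d_visitors": {"clean_immigration"},
    "build_f_us_immigrations": {"clean_immigration"},
    "build_d_us_airports": {"clean_airports"},
    "build_d_us_demo": {"clean_demographics"},
    "quality_checks": {
        "build_d_visitors",
        "build_f_us_immigrations",
        "build_d_us_airports",
        "build_d_us_demo",
    },
}

# static_order() yields every task after all of its dependencies
execution_order = list(TopologicalSorter(pipeline).static_order())
print(execution_order[-1])  # quality_checks
```

A workflow engine adds scheduling, retries and monitoring on top of this ordering, and can run independent tasks such as the three cleaning steps in parallel.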

### The database needed to be accessed by 100+ people.
A large analytics team should use a Spark cluster, e.g. AWS EMR, instead of a single-node Spark (standalone installation) so that the processing can be distributed within the cluster. Additionally, an AWS S3 data lake will be mandatory. Furthermore, the ETL could be adapted to load the data into a traditional cloud data warehouse (e.g. AWS Redshift), which is built for massively parallel processing. The costs, the organization's data engineering knowledge and the data consumers (BI apps, e.g. Metabase, Superset) are factors to consider for such a decision.

<a id='issues'></a>
## 7. Known Issues
1. No global temperature data for US cities was available for the year 2016, so no average temperature data was considered in the target data model.
1. The `port_id` in `f_us_immigrations` does not necessarily match `iata_code` or `local_code` in `d_us_airports`, because the immigration dataset partially uses a different encoding.
1. The `city` in `f_us_immigrations` does not necessarily match the `municipality` in `d_us_airports`. Further cleaning would be required.
1. The `city` and `port_id` in `f_us_immigrations` contain cities and ports outside the US. Further cleaning would be required.
1. Some `state_code` values do not exist in all datasets, e.g. `U-A` only exists in `d_us_airports`, but not in `f_us_immigrations` and `d_us_demo`.
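
Mismatches like the one in the last item can be located with set differences over the distinct `state_code` values of each table. A plain-Python sketch with made-up code sets; in Spark the sets could be collected from `select("state_code").distinct()` on each table.

```python
# Made-up samples of distinct state codes per table
state_codes = {
    "f_us_immigrations": {"NY", "CA", "TX"},
    "d_us_demo": {"NY", "CA", "TX", "PR"},
    "d_us_airports": {"NY", "CA", "TX", "U-A"},
}

# Codes present in every table
common_codes = set.intersection(*state_codes.values())

# Codes that are missing from at least one other table
unmatched_codes = {
    table: sorted(codes - common_codes) for table, codes in state_codes.items()
}

print(unmatched_codes)
# {'f_us_immigrations': [], 'd_us_demo': ['PR'], 'd_us_airports': ['U-A']}
```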