# DWH augmentation for US immigration data


## Project Summary
In this project, we transform US immigration data for use in a data warehouse. We also enrich this data with auxiliary information about weather, demographics and airports. 

The project consists of the following steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

What is _not_ covered here is the actual upload of the data into a suitable database and the execution of test queries.

In [1]:
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import isnan, when, count, col
import pyspark.sql.functions as f
import pyspark.sql.types as t
from datetime import datetime
from datetime import timedelta

In [None]:
spark = SparkSession.builder.\
config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11")\
.enableHiveSupport().getOrCreate()

## Step 1: Scope of the project

The following data sets were provided by Udacity:
* I94 Immigration Data: Immigration data from the US National Tourism and Trade Office in parquet file format. A data dictionary is included.
* World Temperature Data: A temperature dataset from [Kaggle](https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data).
* U.S. City Demographic Data: A US demographic dataset from [OpenDataSoft](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/).
* Airport Code Table: A table of airport codes and corresponding cities from [datahub](https://datahub.io/core/airport-codes#data).

The goal is to model this data and provide connections between the data fields that can be used by a data scientist who works with a DWH. This means that the relevant data from the temperature, the demographic and the airport data sets should be linked to the immigration data, which acts as the main fact table. 

We will use PySpark for the main fact table (immigration data) and Pandas for the dimension tables. 

### Loading provided data

In [None]:
df_sas = spark.read.parquet("sas_data")
df_airport = pd.read_csv("data/airport-codes_csv.csv")
df_demo = pd.read_csv("data/us-cities-demographics.csv", sep=';')
df_weather = pd.read_csv("temperature_data/GlobalLandTemperaturesByCountry.csv")

### Generating additional data from the SAS dictionary

Here, we extract lookup tables from the SAS dictionary using a custom function. We also perform some basic cleaning steps on this data.

In [None]:
def clean_field(df, column, regex):
    '''Extracts the first regex group from a column, strips whitespace and converts to upper case.
    Returns a new Series and leaves df unchanged.'''
    return df[column].str.extract(regex, expand=False).str.strip().str.upper()

# Read the SAS label descriptions; the line ranges below are specific to this file.
with open('data/I94_SAS_Labels_Descriptions.SAS') as file:
    lines = file.readlines()

# Country codes (used by i94cit and i94res)
df_cntyl = pd.DataFrame(lines[9:297])
df_cntyl = df_cntyl[0].str.split("=", n=1, expand=True)
df_cntyl.columns = ['i94cntyl', 'country']
df_cntyl['country'] = clean_field(df_cntyl, 'country', r'\'([^\']+)\'')
df_cntyl['i94cntyl'] = df_cntyl['i94cntyl'].str.strip().astype(int)

# Ports of entry (i94port), split into port name and state code
df_port = pd.DataFrame(lines[302:962])
df_port = df_port[0].str.split("=", n=1, expand=True)
df_port_comma_split = df_port[1].str.split(",", n=1, expand=True)
df_port[1] = df_port_comma_split[0]
df_port[2] = df_port_comma_split[1]
df_port.columns = ['i94port', 'port', 'addr']
df_port['i94port'] = clean_field(df_port, 'i94port', r'\'([^\']+)\'')
df_port['port'] = clean_field(df_port, 'port', r'\'([^\']+)')
df_port['addr'] = clean_field(df_port, 'addr', r'([^\']+)\'')

# Transportation modes (i94mode); the key is cast to int to match the fact table
df_mode = pd.DataFrame(lines[972:976])
df_mode = df_mode[0].str.split("=", n=1, expand=True)
df_mode.columns = ['i94mode', 'mode']
df_mode['mode'] = clean_field(df_mode, 'mode', r'\'([^\']+)\'')
df_mode['i94mode'] = clean_field(df_mode, 'i94mode', r'\s+([^\']+)').astype(int)

# State codes (i94addr)
df_addr = pd.DataFrame(lines[981:1036])
df_addr = df_addr[0].str.split("=", n=1, expand=True)
df_addr.columns = ['i94addr', 'state']
df_addr['i94addr'] = clean_field(df_addr, 'i94addr', r'\'([^\']+)\'')
df_addr['state'] = clean_field(df_addr, 'state', r'\'([^\']+)\'')

# Visa categories (i94visa); the key is stripped and cast to int to match the fact table
df_visa = pd.DataFrame(lines[1046:1049])
df_visa = df_visa[0].str.split("=", n=1, expand=True)
df_visa.columns = ['i94visa', 'visa']
df_visa['visa'] = clean_field(df_visa, 'visa', r'([^\']+)\n')
df_visa['i94visa'] = df_visa['i94visa'].str.strip().astype(int)
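The hardcoded line ranges (`lines[9:297]`, `lines[302:962]`, ...) tie the extraction to one exact version of the labels file. As a minimal sketch of a more robust alternative, the hypothetical helper `parse_sas_value_block` below locates a lookup table by its SAS `value <name>` header instead of by line number. It assumes that each table is declared as `value <name>` (or `value $<name>` for character codes), that entries have the form `code = 'label'`, and that a block ends at the first `;` after its header; the actual block names in the file (e.g. `i94cntyl`) should be checked before relying on it. The visa codes, whose labels are not quoted (judging by the regex above), may need separate handling.

```python
import re


def parse_sas_value_block(text, name):
    '''Hypothetical helper: returns {code: LABEL} for the SAS "value <name>" block in text.

    Assumptions: entries look like  code = 'label'  and the block ends at the
    first ';' after the header line. Raises ValueError if the block is missing.'''
    # Locate a header such as "value i94model" or "value $i94prtl" at a line start
    header = re.search(r'^\s*value\s+\$?' + re.escape(name) + r'\b', text,
                       re.IGNORECASE | re.MULTILINE)
    if header is None:
        raise ValueError(f'value block {name!r} not found')
    # str.index also raises ValueError if the terminating ';' is missing
    body = text[header.end():text.index(';', header.end())]

    mapping = {}
    for line in body.splitlines():
        if '=' not in line:
            continue  # skip blank lines and anything that is not an entry
        code, label = line.split('=', 1)
        # Strip surrounding whitespace and single quotes, then the padding inside the quotes
        code = code.strip().strip("'").strip()
        label = label.strip().strip("'").strip().upper()
        mapping[code] = label
    return mapping
```

Usage would be along the lines of reading the whole file with `fh.read()` and calling `parse_sas_value_block(text, 'i94cntyl')`; the resulting dict can then be turned into a dataframe with `pd.DataFrame(list(mapping.items()), columns=[...])`.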
    

## Step 2: Data exploration and cleaning

### Immigration data

First, we show the schema and some rows of the immigration data to get a feel for it.

In [None]:
# rows of data
df_sas.count()

In [None]:
# schema
df_sas.printSchema()

In [None]:
# first 10 rows of data
with pd.option_context('display.max_rows', None, 'display.max_columns', None):
    display(df_sas.limit(10).toPandas())

We may also decide to drop fields that have a significant fraction of null values and perform some additional cleanup steps.
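The rule applied below can be sketched in plain Python: a value counts as missing if it is null or NaN (mirroring `isnan(c) | col(c).isNull()`), and a column is dropped if its missing fraction exceeds a threshold (0.9, as in the notebook). The helpers `null_fractions` and `columns_to_drop` are hypothetical and work on a list of dicts rather than a Spark dataframe; they only illustrate the logic.

```python
import math


def null_fractions(rows, columns):
    '''Hypothetical helper: fraction of rows where each column is None or NaN.'''
    missing = {c: 0 for c in columns}
    for row in rows:
        for c in columns:
            v = row.get(c)
            if v is None or (isinstance(v, float) and math.isnan(v)):
                missing[c] += 1
    # Assumes rows is non-empty
    return {c: missing[c] / len(rows) for c in columns}


def columns_to_drop(fractions, threshold=0.9):
    '''Hypothetical helper: columns whose null fraction exceeds the threshold.'''
    return [c for c, frac in fractions.items() if frac > threshold]
```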

In [None]:
# Count the relative number of null values 
df_sas_total_rows = df_sas.count()
df_sas_nulls = df_sas.select([(count(when(isnan(c) | col(c).isNull(), c))/df_sas_total_rows).alias(c) for c in df_sas.columns]).toPandas()

# Drop columns with over 90% null values. 
# Note: This step is for demonstration purposes; in a real project I would leave
# this decision to a data scientist.
empty_cols = []
for c in df_sas_nulls.columns:
    if df_sas_nulls[c][0] > 0.9:
        empty_cols.append(c)
print(empty_cols)
df_sas_clean_a = df_sas.drop(*empty_cols)
       

In [None]:
# Drop rows without an id (cicid), then drop rows with duplicate ids
df_sas_clean_b = df_sas_clean_a.dropna(how='all', subset=['cicid']).dropDuplicates(['cicid'])

In [None]:
# Convert double columns back to their original integer format.
# admnum values can exceed the 32-bit integer range, so admnum is cast to long instead.
int_cols = ["cicid", "i94yr", "i94mon", "i94cit", "i94res", "arrdate", "i94mode",
            "i94bir", "count", "i94visa", "depdate", "biryear"]
df_sas_clean_c = df_sas_clean_b
for c in int_cols:
    df_sas_clean_c = df_sas_clean_c.withColumn(c, col(c).cast('integer'))
df_sas_clean_c = df_sas_clean_c.withColumn("admnum", col("admnum").cast('long'))

In [None]:
# Convert SAS dates (days since 1960-01-01) to dates; missing values stay null
SAS_EPOCH = datetime(1960, 1, 1)

def date_add_(days):
    '''Converts SAS time to a date; returns None for missing values.'''
    if days is None:
        return None
    return (SAS_EPOCH + timedelta(days=days)).date()

date_add_udf = f.udf(date_add_, t.DateType())

df_sas_clean_d = df_sas_clean_c.withColumn('arrdate', date_add_udf('arrdate'))\
    .withColumn('depdate', date_add_udf('depdate'))

# Drop year and mon columns
df_sas_clean_e = df_sas_clean_d.drop('i94yr','i94mon')

Here, I have decided against keeping the year and month columns (or generating an additional day column). The weather data does not cover the period of the immigration data, so a direct join on dates would not make sense, even from a technical perspective. Instead, I leave it up to the data scientist on the receiving end of the data to process the date values and join them as desired.
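The conversion performed by the `date_add_` UDF can be checked in plain Python. The sketch below (the name `sas_to_date` is hypothetical) maps a SAS day count to a date and passes missing values through, which matters because columns such as `depdate` may contain nulls. As an untested alternative, Spark SQL's built-in `date_add`, e.g. `f.expr("date_add(to_date('1960-01-01'), arrdate)")`, may avoid the Python UDF altogether; this should be verified on the Spark version in use.

```python
from datetime import date, timedelta

# SAS stores dates as the number of days since 1960-01-01
SAS_EPOCH = date(1960, 1, 1)


def sas_to_date(days):
    '''Hypothetical helper: converts a SAS day count (int or float) to a date.

    None and NaN are returned as None instead of raising a TypeError.'''
    if days is None or days != days:  # days != days is only True for NaN
        return None
    return SAS_EPOCH + timedelta(days=int(days))
```

For example, `sas_to_date(20545)` gives `date(2016, 4, 1)`.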

### Remaining data

For the weather and cntyl data, the country column is capitalized to enable joins. We also convert the weather date string to datetime format and drop rows without a temperature value. The demographic column names contain spaces and capital letters, so we rename them to be more DWH-friendly.

In [None]:
df_weather.columns=['date','average_temperature','average_temperature_uncertainty','country']
df_weather['country'] = df_weather['country'].str.upper().astype(str)
df_weather['date'] = pd.to_datetime(df_weather['date'])
df_weather=df_weather[df_weather['average_temperature'].notnull()]

df_cntyl['country'] = df_cntyl['country'].str.upper().astype(str)


In [None]:
df_demo.columns=['city', 'state', 'median_age', 'male_population', 'female_population',
       'total_population', 'number_of_veterans', 'foreign_born',
       'average_household_size', 'state_code', 'race', 'count']

## Step 3: Defining the Data Model

Three major steps will be necessary to relate the selected data:

### Connecting the weather data

Here, we will have to add the (somewhat obscure) i94 country code to the weather data, so that DWH users can use it for a join. We can also remove countries from the data that do not appear in the immigration data.

### Connecting the demographic data

For the demographic data, a state code is already available, which can be used for joining the data to the immigration data. However, the demographic data is listed by city (as opposed to state). Hence, we will need to perform aggregations on a state level. 

### Connecting the airport data

In this case, it will be necessary to map the airport codes used in the immigration data to the airport codes in the airport data.

### Data sources from the SAS data dictionary

These sources can mostly be used as-is.
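To make the connections above concrete, the hypothetical mapping below lists which immigration column is meant to join to which dimension key (the fields marked in bold in the data dictionary), together with a small helper that measures how many fact keys actually find a partner. A low match rate would be an early sign of a join problem. This is a sketch over plain Python lists; with Spark, the same ratio could be computed from a left join.

```python
# Hypothetical summary of the join keys: fact column -> (dimension table, dimension key).
# df_demo_race_dwh can be joined via i94addr -> state_code in the same way as df_demo_dwh.
JOIN_KEYS = {
    'i94cit': ('df_weather_dwh', 'i94cntyl'),
    'i94res': ('df_weather_dwh', 'i94cntyl'),
    'i94addr': ('df_demo_dwh', 'state_code'),
    'i94port': ('df_airport_dwh', 'i94port'),
    'i94mode': ('df_mode_dwh', 'i94mode'),
    'i94visa': ('df_visa_dwh', 'i94visa'),
}


def match_rate(fact_keys, dim_keys):
    '''Hypothetical helper: share of non-null fact keys found among the dimension keys.'''
    present = [k for k in fact_keys if k is not None]
    if not present:
        return 0.0
    dim = set(dim_keys)
    return sum(k in dim for k in present) / len(present)
```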

## Step 4: Run Pipelines to Model the Data 

Here, we build the data pipelines to create the data model. We also define a testing function to perform some basic data quality checks on a dataframe.

In [None]:
def dataframe_quality_check(df):
    '''Generates description of dataframe argument.'''
    
    if(df.index.is_unique):
        print("The dataframe has a unique index.")
    else:
        print("Warning: The dataframe does not have a unique index.")
    
    col_summary = dict()
    for c in df.columns:
        col_attributes = dict()
        col_attributes['dtype'] = df[c].dtype
        col_attributes['count'] = df[c].count()
        col_attributes['count_null'] = df[c].size - col_attributes['count']
        col_attributes['unique_values'] = df[c].nunique()
    
        col_summary[c] = col_attributes
    return pd.DataFrame(col_summary).transpose()
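`dataframe_quality_check` reports issues but never stops execution. In an automated pipeline (see the scenarios in Step 5), a check that fails loudly may be preferable. The hypothetical `assert_quality` below is a minimal sketch of that idea on a list of dicts: it raises `ValueError` for an empty table, a duplicated key, or nulls in required columns.

```python
def assert_quality(rows, key, required=()):
    '''Hypothetical strict check: raises ValueError instead of printing a warning.'''
    if not rows:
        raise ValueError('table is empty')
    # The key column must be unique, like a dataframe index
    keys = [r.get(key) for r in rows]
    if len(set(keys)) != len(keys):
        raise ValueError(f'duplicate values in key column {key!r}')
    # Required columns must not contain nulls
    for c in required:
        missing = sum(r.get(c) is None for r in rows)
        if missing:
            raise ValueError(f'{missing} null value(s) in required column {c!r}')
```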

### Immigration data

With the cleanups we already did, the immigration data should actually be fine as-is.

In [None]:
df_immigration_dwh = df_sas_clean_e

### Weather data

In order to join the weather data to the immigration data, the cntyl country code needs to be available in the weather data. This is done via a join on the country field. Ideally, this join would be fuzzy, but for now we perform an exact join.

We leave the actual aggregation of weather data over time to the data scientist. This implies that each country appears multiple times, so the country cannot be used as an index column.
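A fuzzy join could be sketched with the standard library's `difflib.get_close_matches`: exact matches are kept first, and other names are mapped to the most similar cntyl name above a similarity cutoff. The helper `fuzzy_country_map` and the cutoff of 0.8 are assumptions for illustration. String similarity can also produce false matches (for example, NIGER and NIGERIA are very similar), so the resulting mapping should be reviewed before use.

```python
import difflib


def fuzzy_country_map(names, candidates, cutoff=0.8):
    '''Hypothetical helper: maps each name to its closest candidate, or None.

    Exact matches win; otherwise difflib picks the candidate with the highest
    similarity ratio, provided it reaches the cutoff.'''
    candidate_set = set(candidates)
    mapping = {}
    for name in names:
        if name in candidate_set:
            mapping[name] = name
            continue
        best = difflib.get_close_matches(name, candidates, n=1, cutoff=cutoff)
        mapping[name] = best[0] if best else None
    return mapping
```

The resulting dict could then be applied to the weather `country` column (e.g. with `Series.map`) before the merge.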

In [None]:
df_weather_dwh = pd.merge(left=df_weather, right=df_cntyl, 
                          left_on='country', right_on='country',
                          how='left')

In [None]:
df_weather_dwh.head(5)

In [None]:
# Only keep countries that appear in the immigration data (as i94cit or i94res)
i94cit_codes = df_immigration_dwh.select("i94cit").distinct().toPandas()['i94cit']
i94res_codes = df_immigration_dwh.select("i94res").distinct().toPandas()['i94res']
i94cntyl_in_sas = set(pd.concat([i94cit_codes, i94res_codes]).dropna().astype(int))

df_weather_dwh = df_weather_dwh[df_weather_dwh['i94cntyl'].notnull()]
df_weather_dwh = df_weather_dwh[df_weather_dwh['i94cntyl'].isin(i94cntyl_in_sas)]
# After the left join the code is a float column; restore the integer type
df_weather_dwh = df_weather_dwh.astype({'i94cntyl': int}).reset_index(drop=True)

In [None]:
dataframe_quality_check(df_weather_dwh)

The data looks fine.

### Demographic data
We aggregate the city-level numeric data to the state level. Since this data set does not include state-wide totals, we express the male population, female population, number of veterans and foreign-born population as fractions of the total population of the covered cities.
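One detail matters for the sums: in the source CSV each city appears once per race, with the city-level figures repeated on every row, so summing all rows would count a city several times. The plain-Python sketch below (the helper `state_fractions` is hypothetical and works on a list of dicts) keeps one row per (city, state) and then divides the summed counts by the summed total population per state.

```python
from collections import defaultdict


def state_fractions(rows, count_cols):
    '''Hypothetical helper: per-state sums of city counts divided by total population.

    rows: dicts with 'city', 'state_code', 'total_population' and count_cols.
    Rows repeating the same (city, state_code) are counted only once.'''
    seen = set()
    totals = defaultdict(lambda: defaultdict(float))
    for r in rows:
        key = (r['city'], r['state_code'])
        if key in seen:
            continue  # same city figures repeated on another race row
        seen.add(key)
        for c in ['total_population', *count_cols]:
            if r.get(c) is not None:
                totals[r['state_code']][c] += r[c]
    return {
        state: {c: t[c] / t['total_population'] for c in count_cols}
        for state, t in totals.items()
    }
```

Without the deduplication, the Texas example in the test below would yield 240/500 = 0.48 instead of 200/400 = 0.5.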

In [None]:
# The CSV has one row per (city, race) with the city-level figures repeated on
# every race row, so keep a single row per city before aggregating to state level.
df_demo_city = df_demo.drop_duplicates(subset=['city', 'state_code'])

sum_cols = ['male_population', 'female_population', 'total_population',
            'number_of_veterans', 'foreign_born']
median_cols = ['median_age', 'average_household_size']

df_demo_dwh = df_demo_city[['state_code', 'state']].drop_duplicates().set_index('state_code')\
    .join(df_demo_city.groupby('state_code')[sum_cols].sum())\
    .join(df_demo_city.groupby('state_code')[median_cols].median())

# Express the population counts as fractions of the total population
for c in ['male_population', 'female_population', 'number_of_veterans', 'foreign_born']:
    df_demo_dwh[c] = df_demo_dwh[c] / df_demo_dwh['total_population']

df_demo_dwh = df_demo_dwh.drop(['total_population'], axis=1)

In [None]:
df_demo_dwh.head(5)

In [None]:
dataframe_quality_check(df_demo_dwh)

In addition to the data above, we can extract the "race distribution" of each state in a similar fashion. This table acts as an additional dimension table for each state code.
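The computation in the next cell can be summarized in plain Python: for each (state, race) pair, the summed race count is divided by the summed race counts of the whole state. Note that the denominator is the sum of the race counts, not the total population. The helper `race_fractions` is hypothetical and only mirrors the pandas `groupby`/`join` logic on a list of dicts.

```python
from collections import defaultdict


def race_fractions(rows):
    '''Hypothetical helper: count(state, race) / count(state) for each (state, race).'''
    by_race = defaultdict(int)
    by_state = defaultdict(int)
    for r in rows:
        by_race[(r['state_code'], r['race'])] += r['count']
        by_state[r['state_code']] += r['count']
    return {key: n / by_state[key[0]] for key, n in by_race.items()}
```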

In [None]:
df_demo_race_dwh = pd.DataFrame(df_demo.groupby(['state_code', 'race'])['count'].agg('sum'))\
.join(df_demo.groupby(['state_code'])['count'].agg('sum'), rsuffix='_total')

df_demo_race_dwh['fraction']=df_demo_race_dwh['count']/df_demo_race_dwh['count_total']
df_demo_race_dwh = pd.DataFrame(df_demo_race_dwh['fraction'])

In [None]:
df_demo_race_dwh.head(10)

In [None]:
dataframe_quality_check(df_demo_race_dwh)

### Airport data

Previously, we extracted df_port from the SAS dictionary:

In [None]:
df_port.head()

We can attempt to combine this information with the available airport information:

In [None]:
df_airport_dwh = pd.merge(left=df_port, right=df_airport, 
                          left_on='i94port', right_on='ident',
                          how='left')
df_airport_dwh = df_airport_dwh.drop(['ident'], axis=1)

The join is actually successful in some cases:

In [None]:
df_airport_dwh[df_airport_dwh['type'].notnull()].head(3)

In [None]:
dataframe_quality_check(df_airport_dwh)

We can use an additional check: The iso_region field from df_airport should match with the addr field from df_port. Let us check the cases where this is _not_ true.
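The check boils down to comparing the part of `iso_region` after the country prefix (e.g. `CA` in `US-CA`, extracted with the same regex `-([^-]+)` as in the next cell) with the SAS state code. The helpers `region_state` and `is_false_join` are hypothetical and operate on single values rather than dataframe columns.

```python
import re


def region_state(iso_region):
    '''Hypothetical helper: part after the country prefix, e.g. "US-CA" -> "CA".'''
    m = re.search(r'-([^-]+)', iso_region or '')
    return m.group(1).strip().upper() if m else None


def is_false_join(iso_region, addr):
    '''Hypothetical helper: True if the airport region disagrees with the SAS state code.'''
    return region_state(iso_region) != addr
```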

In [None]:
df_airport_dwh['iso_region_state'] = clean_field(df_airport_dwh, 'iso_region', r'-([^-]+)')
df_airport_dwh_outliers = df_airport_dwh[\
    (df_airport_dwh['type'].notnull())\
    & (df_airport_dwh['iso_region_state'] != df_airport_dwh['addr'])]

print(len(df_airport_dwh_outliers))
df_airport_dwh_outliers.head(10)

Many of these airports are not even in the US, which clearly indicates a false join. This makes it hard to trust the data generated by the join. We should _at least_ remove these cases, even though doing so would leave very little data to work with. I will leave this as an open question and simply flag the rows with an improper join.

In [None]:
df_airport_dwh['false_join'] = df_airport_dwh.index.isin(df_airport_dwh_outliers.index.tolist())

### Data from SAS dictionary

#### Mode data

The mode data can be taken as-is with the correct index.

In [None]:
df_mode_dwh=df_mode.set_index('i94mode')

In [None]:
df_mode_dwh.head()

#### Visa data

The visa data can be taken as-is with the correct index.

In [None]:
df_visa_dwh = df_visa.set_index('i94visa')

In [None]:
df_visa_dwh.head()

#### State data

This data only contains information which is already available. Hence, it will not be used.

In [None]:
df_addr.head()

### Data dictionary
Below, for each field we describe what the data contains and where it comes from. 

*Note:* The **bold** values link to other fact/dimension tables.

#### df_immigration_dwh

This is the fact table with the immigration data. 

| field | type | description | origin |
| --- | --- | --- | --- |
| cicid | int | unique id | original parquet/sas files
| **i94cit** | int | country code (birth) | original parquet/sas files
| **i94res** | int | country code (residence) | original parquet/sas files
| **i94port** | string | arrival airport | original parquet/sas files
| arrdate | date | arrival date | original parquet/sas files
| **i94mode** | int | mode of transportation | original parquet/sas files
| **i94addr** | string | arrival state code | original parquet/sas files
| depdate | date | departure date | original parquet/sas files
| i94bir | int | age of respondent in years | original parquet/sas files
| **i94visa** | int | visa type | original parquet/sas files
| count | int | summary statistics | original parquet/sas files
| dtadfile | string | character date field | original parquet/sas files
| visapost | string | Department of State where Visa was issued | original parquet/sas files
| entdepa | string | Arrival Flag | original parquet/sas files
| entdepd | string | Departure Flag | original parquet/sas files
| matflag | string | Match flag | original parquet/sas files
| biryear | int | 4 digit year of birth | original parquet/sas files
| dtaddto | string | character date field | original parquet/sas files
| gender | string | Non-immigrant sex | original parquet/sas files
| airline | string | Airline used to arrive in US | original parquet/sas files
| admnum | int | Admission number | original parquet/sas files
| fltno | string | Flight number | original parquet/sas files
| visatype | string | class of admission | original parquet/sas files

#### df_weather_dwh

This is a fact table with weather data by country and date. 

| field | type | description | origin |
| --- | --- | --- | --- |
| index | int | unique id | generated |
| date | date | date of record | weather data |
| average_temperature | numeric |average temperature |weather data |
| average_temperature_uncertainty |  numeric | average temperature uncertainty  | weather data |
| country | string | country | weather data |
| **i94cntyl** | int | cntyl country code | SAS description via join | 

#### df_demo_dwh

This is a dimension table that provides demographic data for the US states.

| field | type | description | origin |
| --- | --- | --- | --- |
| **state_code** | string | US state code | demographic data |
| state | string | US state | demographic data |
| male_population | numeric | fraction of males |demographic data |
| female_population | numeric | fraction of females |demographic data |
| number_of_veterans | numeric | fraction of veterans | demographic data |
| foreign_born | numeric | fraction of foreign borns | demographic data |
| median_age | numeric | median age | demographic data |
| average_household_size | numeric | average household size | demographic data |

#### df_demo_race_dwh

This is a dimension table that provides racial data for the US states. 

| field | type | description | origin |
| --- | --- | --- | --- |
| **state_code** | string | US state code | demographic data | 
| race | string | race (forms unique index combined with state_code) | demographic data | 
| fraction | numeric | fraction of the state's summed race counts | demographic data | 

#### df_mode_dwh

This is a dimension table that provides the full form of the transportation mode.

| field | type | description | origin |
| --- | --- | --- | --- |
| **i94mode** | int | mode of transportation (numeric) | SAS description | 
| mode | string | mode of transportation | SAS description | 

#### df_visa_dwh

This is a dimension table that provides the full form of the visa type.

| field | type | description | origin |
| --- | --- | --- | --- |
| **i94visa** | int | visa type (numeric) | SAS description | 
| visa | string | visa type | SAS description | 

#### df_airport_dwh

This is a dimension table that provides detailed information for airports.

| field | type | description | origin |
| --- | --- | --- | --- |
| i94port | string | airport code | SAS description |
| port | string | short airport name | SAS description |
| addr | string | US state code | SAS description |
| type | string | airport type | airport data |
| name | string | full airport name | airport data |
| elevation_ft | numeric | airport elevation | airport data |
| continent | string | continent | airport data |
| iso_country | string | country of airport | airport data |
| iso_region | string | region of airport | airport data |
| municipality | string | municipality of airport | airport data |
| gps_code | string | airport gps code | airport data |
| iata_code | string | airport iata code | airport data |
| local_code | string | airport local code | airport data |
| coordinates | string | airport gps coordinates | airport data |
| false_join | bool | indicates if airport data might be faulty | generated |

## Step 5: Project Summary


### Outline

In this project, we enriched US immigration data with dimension data from three other data sets:
* Temperature data of origin countries
* US state demographic data
* Airport data

The data can be joined by DWH users via specific columns in the immigration data. 

We did not cover the actual upload of data to a suitable DWH. At this stage of the project, the data is available in PySpark and Pandas dataframes. 

### Choice of tools
The immigration data contains over 3 million rows. With its scalable big-data capabilities, (Py)Spark is a natural choice for the data wrangling in such a scenario. I decided to use Pandas for the remaining data sets, since they could quickly be loaded as Pandas dataframes and Pandas syntax is slightly more concise than Spark's. 

To simplify the development, all steps are performed in a Jupyter Notebook. Of course, in a production environment, one should use more robust and version-friendly tools, such as a proper Python project with .py scripts.


### Data updates
The data used in this project is static, so regular updates were not considered. If regular updates were required, an automated data pipeline would need to be established; this is covered below. 


### Outlook

We briefly describe how this project would need to be adapted in three different scenarios.

#### Scenario 1: Increase in data volume by 100x
If the immigration data increased by a factor of 100, Spark would still be the proper tool. To perform the cleaning steps above efficiently, it may be necessary to use an actual Spark cluster and distribute the computation across multiple nodes.

We do not expect the data used for dimensions to grow by a factor of 100, since it already covers the whole world or at least all US states. However, in the unlikely case that this data eventually exceeds the available memory, it will be necessary to migrate the computations from Pandas to PySpark. 


#### Scenario 2: The data populates a dashboard that must be updated on a daily basis by 7am every day
In this scenario, a transition to a data orchestration platform, such as Apache Airflow, is required to reliably run the ETL pipelines and report any issues along the way. Alternatively, one could simply run this script as a cron job (provided the input data is updated), but a dedicated tool is clearly the better choice.

Additionally, one should consider adjusting the code to allow for incremental updates. 
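One common way to make the immigration load incremental is a high-water mark: remember the latest `arrdate` already loaded and only process newer rows on the next run. The helper `incremental_batch` below is a hypothetical sketch on a list of dicts; it assumes that new records always arrive with later arrival dates, so late-arriving records for already loaded dates would be missed and would need a different strategy (e.g. reprocessing a trailing window).

```python
from datetime import date


def incremental_batch(rows, last_loaded, date_col='arrdate'):
    '''Hypothetical high-water-mark filter.

    Returns the rows newer than last_loaded and the updated watermark;
    rows without a date are skipped.'''
    new_rows = [r for r in rows
                if r.get(date_col) is not None and r[date_col] > last_loaded]
    watermark = max((r[date_col] for r in new_rows), default=last_loaded)
    return new_rows, watermark
```

The returned watermark would be persisted (e.g. in the orchestration tool's metadata) and passed in on the next run.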

#### Scenario 3: The database needs to be accessed by 100+ people
If many people need access, the data should be made available on a scalable DWH platform, such as Amazon Redshift. In this case, the easiest solution would be to  
1. Move the data to S3 buckets in a suitable format (e.g. CSV).
2. Load the data from S3 to Redshift with an ETL pipeline extension (ideally via Airflow or similar). 
