# Immigration Database
### Data Engineering Capstone Project

#### Project Summary
In the final program of this course we will attempt to set up a data-warehouse in Redshift where analysts can run queries against a dataset provided by Udacity. The goal of this dataset will be to provide a basis for analysis for immigration data of the US. The dataset contains 3 different sources, namely 1. Immigration data from I94 filings from the United States government from 2016, 2. US City demographic data and 3. airport data containing basic airport and city data. We will first perform ETL that processes the data in Spark where it is then stored in s3. From there, we schedule ingestion from S3 into Redshift using Airflow.

The idea of automation on the latter part is as follows; given infrequent releases of these data sources (in the real world) we would have a notebook (like this one) ready to run whenever a new batch of data is available, this could even be from different sources or formats over time as especially government datasets are notoriously tricky to handle in terms of changing schemas or column categories. After verifying the ETL process in a notebook like this the data would be put in S3. Since we will require a fixed format to be present in S3 we can then safely the ingestion part and we have an automatic process that picks up this data and updates the tables in Redshift.

The project follows the follow steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [51]:
# imports
import configparser
import pandas as pd
import glob
from pyspark.sql import SparkSession
from pyspark import SparkConf
import re
import os
from pyspark.sql.functions import count, col, udf, year, month, avg, round, dayofweek, weekofyear, isnull, when, isnan
from pyspark.sql.types import StringType, IntegerType
from datetime import datetime, timedelta

In [52]:
config = configparser.ConfigParser()
config.read('dl.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['AWS_SECRET_ACCESS_KEY']

In [53]:
# sessions
conf = SparkConf()
conf.set(
    "spark.jars.packages",
    "saurfang:spark-sas7bdat:2.0.0-s_2.11,org.apache.hadoop:hadoop-aws:2.7.0")

"""Create a apache spark session."""
spark = SparkSession \
    .builder \
    .config(conf=conf) \
    .enableHiveSupport() \
    .getOrCreate()
spark

# Step 1: Scope the Project and Gather Data

#### Scope 
- Goal: providing a data source for analysts to analyze migration patterns using different input sources like the ones listed below.
- Tools: Spark for preprocessing/sanity checks and dumping the data in AWS S3, then Airflow/Python to move the data from S3 to Redshift.
- Considerations: it would be cleaner to put everything in 1 pipeline (i.e. start from local files -> ETL -> S3 -> Redshift) but this would make the project much more complex while adding little to my knowledge. In practice I'm seeing that in the companies I have worked these pipelines are often cut in pieces (i.e. different teams own the local files -> S3 part, then another owns the S3 to redshift), so splitting the project like this makes sense in my opinion.

#### Describe and Gather Data 
- I94 Immigration Data: 
    Source: The U.S. National Tourism and Trade Office and contains various statistics on international visitor arrival in USA. The dataset contains data from 2016. 
    [https://travel.trade.gov/research/reports/i94/historical/2016.html]
- U.S. City Demographic Data: 
    Source: OpenSoft and contains information about the demographics of all US cities such as average age, sex etc. 
    [https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/]

In [54]:
# Immigration data

# use the glob below to obtain all files, here we will be working with one file only
# immigration_files = glob.glob("../../data/18-83510-I94-Data-2016/*.sas7bdat")
immigration_files = "../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat"
immigration_df = spark.read.format("com.github.saurfang.sas.spark").load(immigration_files)

In [55]:
# Demographics data
demographics_file = "us-cities-demographics.csv"
demographics_df = (spark
                       .read
                       .format("csv")
                        .option("delimiter", ";")
                        .option("header", "true")
                        .load(demographics_file)
                  )

# Step 2: Explore and Assess the Data

### Immigration data

For the purpose of our project we are especially interested in data that describes origin and destination, as well as a few basic facts about the immigrants. We potentially want to do side-analysis about airports as well, so we are also including fields related to those. Therefore we will be using the following fields:
- cicid -> immigration_id
- i94yr -> immigration_year
- i94mnth -> immigration_month
- i94res -> origin_country
- i94port -> arrival_port
- arrdate -> arrival_date
- i94mode -> port_type
- i94addr -> state_of_residence
- depdate -> departure_date
- i94bir -> immigrant_age
- i94visa -> visitor_type
- gender -> gender
- airline -> airline_code
- fltno -> airline_flight_number
- visatype -> visa_type

Some of these need a bit of work since they are codes, the description file I94_SAS_Labels_Descriptions.SAS provides some conversions.

In [56]:
# we get a list of valid ports according to the description
# if a port is not in this list or it's from the part of the list
# with invalid codes we put it as 'Other'
def get_valid_port_list():
    i94_sas_label_descriptions = "I94_SAS_Labels_Descriptions.SAS"
    with open(i94_sas_label_descriptions) as f:
        lines = f.readlines()

    re_compiled = re.compile(r"\'(.*)\'.*\'(.*)\'")
    valid_ports = {}
    # line 302 to 892 describes valid ports
    for line in lines[302:893]:
        results = re_compiled.search(line)
        if 'UNKNOWN' or 'UNIDENTIFIED' or 'NOT REPORTED' in results.group(2):
            pass
        # removing whitespace for readability
        valid_ports[results.group(1)] = re.sub("(\s+)", "", results.group(2))
    return valid_ports
valid_ports = get_valid_port_list()

@udf(StringType())
def get_valid_port(x):
    if x in valid_ports.keys():
        return x
    else:
        return "Other"

In [57]:
# we get a list of valid countries according to the description
# if a country is not in this list or it's from the part of the list
# with invalid codes we put it as 'Other'
def get_valid_country_list():
    i94_sas_label_descriptions = "I94_SAS_Labels_Descriptions.SAS"
    with open(i94_sas_label_descriptions) as f:
        lines = f.readlines()

    re_compiled = re.compile(r"([0-9]+).*\'(.*)\'")
    valid_ports = {}
    for line in lines[9:245]:
        results = re_compiled.search(line)
        valid_ports[int(results.group(1))] = re.sub("(\s+)", "", results.group(2))
    return valid_ports
valid_countries = get_valid_country_list()

@udf(StringType())
def get_valid_country(x):
    if int(x) in valid_countries.keys():
        return valid_countries[x]
    else:
        return "Other"

In [58]:
# Parse the 'arrdate' and 'depdate' fields to a valid date
@udf(StringType())
def convert_to_datetime(x):
    if x:
        return (datetime(1960, 1, 1).date() + timedelta(x)).isoformat()
    return None

In [59]:
# Parse the 'i94mode' field to a valid port_type
@udf(StringType())
def convert_mode(x):
    mode_dict = {
        1.0 : 'Air',
        2.0 : 'Sea',
        3.0 : 'Land',
        9.0 : 'Not reported'
    }
    if x in mode_dict.keys():
        return mode_dict[x]
    return 'Not reported'

In [60]:
# Parse the 'i94visa' field to a valid visitor type
@udf(StringType())
def convert_visa(x):
    mode_dict = {
        1 : 'Business',
        2 : 'Pleasure',
        3 : 'Student'
    }
    if x in mode_dict.keys():
        return mode_dict[x]
    return 'Unknown'

In [61]:
# create port from city
@udf(StringType())
def city_to_port(city):
    for key in valid_ports:
        if city.lower() in valid_ports[key].lower():
            return key

In [62]:
# convert fields
immigration_df_clean = ((immigration_df
        .withColumnRenamed('cicid', 'immigration_id')
        .withColumnRenamed('i94yr', 'immigration_year') 
        .withColumnRenamed('i94mon', 'immigration_month')
        .withColumn('origin_country', get_valid_country(col('i94res')))
        .withColumn('arrival_port', get_valid_port(col('i94port')))
        .withColumn('arrival_date', convert_to_datetime(col('arrdate')))
        .withColumn('port_type', convert_mode(col('i94mode')))
        .withColumnRenamed('i94addr', 'state_of_residence')
        .withColumn('departure_date', convert_to_datetime(col('depdate')))
        .withColumnRenamed('i94bir', 'immigrant_age')
        .withColumn('visitor_type', convert_visa(col('i94visa')))
        .withColumnRenamed('airline', 'airline_code')
        .withColumnRenamed('fltno', 'airline_flight_number')
        .withColumnRenamed('visatype', 'visa_type')                       
).select('immigration_id', 'immigration_year'
         , 'immigration_month', 'origin_country'
         , 'arrival_port', 'arrival_date', 'port_type'
         , 'state_of_residence'
         , 'immigrant_age', 'visitor_type', 'airline_code'
         , 'airline_flight_number', 'visa_type', 'gender'))

In [63]:
immigration_df_clean.show(n=5)

+--------------+----------------+-----------------+--------------+------------+------------+------------+------------------+-------------+------------+------------+---------------------+---------+------+
|immigration_id|immigration_year|immigration_month|origin_country|arrival_port|arrival_date|   port_type|state_of_residence|immigrant_age|visitor_type|airline_code|airline_flight_number|visa_type|gender|
+--------------+----------------+-----------------+--------------+------------+------------+------------+------------------+-------------+------------+------------+---------------------+---------+------+
|           6.0|          2016.0|              4.0|       ECUADOR|         XXX|  2016-04-29|Not reported|              null|         37.0|    Pleasure|        null|                 null|       B2|  null|
|           7.0|          2016.0|              4.0|    SOUTHKOREA|         ATL|  2016-04-07|         Air|                AL|         25.0|     Student|        null|                0029

This is looking good already! For some columns it makes sense to have nulls but for some we might need to replace them. In the UDFs we already took care of some cases (i.e. unknown arrival ports are denoted as 'Other' etc.), let's see where we still have nulls: 

In [64]:
immigration_df_clean.select([count(when(isnull(c), c)).alias(c) for c in immigration_df_clean.columns]).show()

+--------------+----------------+-----------------+--------------+------------+------------+---------+------------------+-------------+------------+------------+---------------------+---------+------+
|immigration_id|immigration_year|immigration_month|origin_country|arrival_port|arrival_date|port_type|state_of_residence|immigrant_age|visitor_type|airline_code|airline_flight_number|visa_type|gender|
+--------------+----------------+-----------------+--------------+------------+------------+---------+------------------+-------------+------------+------------+---------------------+---------+------+
|             0|               0|                0|             0|           0|           0|        0|            152592|          802|           0|       83627|                19549|        0|414269|
+--------------+----------------+-----------------+--------------+------------+------------+---------+------------------+-------------+------------+------------+---------------------+---------+---

We make the following choices:
- state_of_residence: we don't know, so we put Unknown
- departure_date: likely the immigrant hasn't departed (i.e. not yet or permanent immigration), so we put NA
- immigrant_age: we put -1 to make clear that we don't know
- airline_code: this one can be either because it's not entered correctly (we have less missing flight numbers than airline codes so this happens) but can also be because the country is not entered via air, so putting Unknown to be safe
- airfline_number: same as above
- gender: unknown gender we put X generally denoted as unknown (or not willing to provide) gender

In [65]:
immigration_df_clean = (immigration_df_clean
                            .fillna({'immigrant_age' : -1, 'state_of_residence' : 'unknown'
                                     , 'airline_code' : 'Unkown'
                                     , 'airline_flight_number' : 'Unknown', 'gender' : 'X'}))

In [66]:
immigration_df_clean.createOrReplaceTempView('immigration_df_clean')
immigration_df_clean = spark.sql('select row_number() over (order by "immigration_id") as immigrant_id, * from immigration_df_clean')

In [67]:
#final data
immigration_df_clean.show(5)

+------------+--------------+----------------+-----------------+--------------+------------+------------+------------+------------------+-------------+------------+------------+---------------------+---------+------+
|immigrant_id|immigration_id|immigration_year|immigration_month|origin_country|arrival_port|arrival_date|   port_type|state_of_residence|immigrant_age|visitor_type|airline_code|airline_flight_number|visa_type|gender|
+------------+--------------+----------------+-----------------+--------------+------------+------------+------------+------------------+-------------+------------+------------+---------------------+---------+------+
|           1|           6.0|          2016.0|              4.0|       ECUADOR|         XXX|  2016-04-29|Not reported|           unknown|         37.0|    Pleasure|      Unkown|              Unknown|       B2|     X|
|           2|     3308035.0|          2016.0|              4.0|        FRANCE|         NYC|  2016-04-18|         Air|              

### Demographics data

The demographics data is fairly straightforward, we just read in the data and rename a few columns. There are a few duplicates which we drop.

In [69]:
demographics_df.show(5)

+----------------+-------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+-----+
|            City|        State|Median Age|Male Population|Female Population|Total Population|Number of Veterans|Foreign-born|Average Household Size|State Code|                Race|Count|
+----------------+-------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+-----+
|   Silver Spring|     Maryland|      33.8|          40601|            41862|           82463|              1562|       30908|                   2.6|        MD|  Hispanic or Latino|25924|
|          Quincy|Massachusetts|      41.0|          44129|            49500|           93629|              4147|       32935|                  2.39|        MA|               White|58723|
|          Hoover|      Alabama|      38.5|          38040| 

In [70]:
demographics_df_clean = (demographics_df.select('City', 'State Code',
                                       'Male Population',
                                       'Female Population',
                                       'Total Population',
                                       'Foreign-born',
                                       'Race',
                                        'Count') 
            .withColumnRenamed('City', 'city') 
            .withColumnRenamed('State Code', 'state') 
            .withColumnRenamed('Male Population', 'male_population') 
            .withColumnRenamed('Female Population', 'female_population') 
            .withColumnRenamed('Total Population', 'total_population') 
            .withColumnRenamed('Foreign-born', 'foreign_born')
            .withColumnRenamed('Race', 'race')
            .withColumnRenamed('Count', 'n_persons')
            .withColumn('port', city_to_port('City')))

In [75]:
#final data
demographics_df_clean.show(5)

+----------------+-----+---------------+-----------------+----------------+------------+--------------------+---------+----+
|            city|state|male_population|female_population|total_population|foreign_born|                race|n_persons|port|
+----------------+-----+---------------+-----------------+----------------+------------+--------------------+---------+----+
|   Silver Spring|   MD|          40601|            41862|           82463|       30908|  Hispanic or Latino|    25924|null|
|          Quincy|   MA|          44129|            49500|           93629|       32935|               White|    58723|null|
|          Hoover|   AL|          38040|            46799|           84839|        8229|               Asian|     4759|null|
|Rancho Cucamonga|   CA|          88127|            87105|          175232|       33878|Black or African-...|    24437|null|
|          Newark|   NJ|         138040|           143873|          281913|       86253|               White|    76402| NEW|


# Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
For this project we choose a star model with dimension and fact tables. A star model makes sense in this case since we have a single unique object of study (the immigrant) and based on a few basic dimensions about the immigrant we can then extend those to more facts about the dimension. The dimension table is the immigrant table with an unique ID per immigrant. This table then contains a few fields that allow to join on fact tables that contain more information about that dimension. We propose the following:

Dimension Table: immigrant
- immigrant_id (to get to immigrant info, to get to arrival info)
- arrival_port (to get to city info)
- state_of_residence (to get to city info)
- arrival_date (to get to arrival time date)

Fact table: immigrant_stats
- immigrant_id
- immigration_id
- gender
- age
- origin_country
- visa_type
- visitor_type

Fact table: city_pop
- city
- port
- state
- male population
- female population
- total population
- foreign born

Fact table: city_demographics
- city
- port
- state
- race
- n_persons

Fact table: arrival_info
- immigrant_id
- arrival_port
- port_type
- airline_code
- airline_flight_number

Fact table: arrival_date
- arrival_date
- immigration_year
- immigration_date



#### 3.2 Mapping Out Data Pipelines
1. First, we load the two cleaned spark tables for immigration and demographics data to S3
2. Then we use Airflow to schedule staging tables to Redshift
3. In the next step we create the star schema as outline above
4. Finally, we add quality checks at the end

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model

Step 1 below, please see ########### for steps 2-4

In [72]:
bucket = 's3a://jehofman-udacity-dend-capstone-project/'

demographics_df_clean.write.json(
            os.path.join(
                bucket,
                'demographics/'),
            'overwrite')

In [73]:
immigration_df_clean.write.json(
            os.path.join(
                bucket,
                'immigration/'),
            'overwrite')

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

In [74]:
# Perform quality checks here

#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

#### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.