# Project Title
### Data Engineering Capstone Project

#### Project Summary

This project runs production-level code to build a data warehouse from I94 immigration, airport, and U.S. city demographic data.

In [11]:
# Import packages
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
from pyspark.sql.types import StructType, StructField, DoubleType, StringType, IntegerType, DateType, TimestampType

from datetime import datetime, timedelta
import pandas as pd
import numpy as np
from collections import OrderedDict
import psycopg2

# Set defaults
# Use the fully-qualified option names: the bare "max_rows" / "max_columns"
# patterns match more than one option on pandas versions that also define
# styler.render.* options, and set_option raises OptionError in that case.
pd.set_option("display.max_rows", 1000)
pd.set_option("display.max_columns", 1000)

# Build spark session
# The spark-sas7bdat package provides the 'com.github.saurfang.sas.spark'
# reader format used when loading the immigration files.
spark = (
    SparkSession.builder
    .config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11")
    .enableHiveSupport()
    .getOrCreate()
)

In [12]:
spark_files = ['../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat']

def load_data(spark_files):
    '''
    Input: spark_files is a list of file names for i94 data

    Output: pandas dataframes, in this order: i94 data, airport data, city data
    '''
    # Every source is capped at 1k rows to preserve memory space
    sample_rows = 1000

    # Immigration data: read the SAS file(s) through Spark, keep a small
    # sample, then pull that sample into pandas
    sas_reader = spark.read.format('com.github.saurfang.sas.spark')
    im_sample = sas_reader.load(spark_files, forceLowercaseNames=True).limit(sample_rows)
    im_df = im_sample.toPandas()

    # CSV sources: only two fixed files, so each one is read directly
    airport_df = pd.read_csv('airport-codes_csv.csv', nrows=sample_rows)
    city_df = pd.read_csv('us-cities-demographics.csv', delimiter=';', nrows=sample_rows)

    return im_df, airport_df, city_df

In [13]:
def __clean_col_names(df):
    '''
    Input: dataframe whose column names should be standardized and cleaned
    Output: the same dataframe object, with its column names lowercased and
            spaces replaced by underscores (the rename happens in place)
    '''
    cleaned_names = []
    for raw_name in df.columns:
        cleaned_names.append(raw_name.lower().replace(' ', '_'))

    df.columns = cleaned_names
    return df

### Step 2: Prep and Clean Data

#### Immigration Data

In [14]:
def _sas_days_to_date(values):
    '''
    Input:  Series holding either numeric day counts or date-like values
    Output: Series of datetime.date objects (NaT where the input is missing)
    '''
    if pd.api.types.is_numeric_dtype(values):
        # A bare pd.to_datetime() reads plain numbers as nanoseconds since
        # 1970-01-01, which collapses every row onto the same day.
        # NOTE(review): assumes numeric values are raw SAS date values (days
        # since 1960-01-01), as the data is loaded from a .sas7bdat file -
        # confirm against the source data.
        parsed = pd.to_datetime(values, unit='D', origin='1960-01-01')
    else:
        parsed = pd.to_datetime(values)
    return parsed.dt.date


def prep_clean_im_df(im_df):
    '''
    Input:  Immigration data dataframe
    Output: Cleaned and prepped dataframe (a new object; the input dataframe
            is left unmodified)
    '''
    # Work on a copy so the caller's dataframe is not changed as a side effect
    im_df = im_df.copy()

    # Update dtypes
    im_df['cicid'] = im_df['cicid'].astype(object)
    for date_col in ('arrdate', 'depdate'):
        im_df[date_col] = _sas_days_to_date(im_df[date_col])
    im_df['dtadfile'] = pd.to_datetime(im_df['dtadfile']).dt.date
    # i94mode is intentionally not cast: its int conversion is disabled in
    # the original code
    for int_col in ('i94yr', 'i94mon', 'i94cit', 'i94res',
                    'i94bir', 'i94visa', 'count', 'biryear'):
        im_df[int_col] = im_df[int_col].astype(int)

    # Replace the 'D/S' placeholder with a sentinel date, then parse the
    # column as MMDDYYYY
    im_df['dtaddto'] = np.where(im_df['dtaddto'] == 'D/S', '01011900', im_df['dtaddto'])
    im_df['dtaddto'] = pd.to_datetime(im_df['dtaddto'], format='%m%d%Y').dt.date

    # Drop all blank columns and rename as needed.
    # rename() must not be called with inplace=True here: it would return
    # None, and the dataframe would be lost before the clean-up step below.
    im_df = im_df.drop(columns=['occup', 'insnum']).rename(columns={'i94bir': 'age'})

    # Clean column names
    im_df = __clean_col_names(im_df)

    return im_df

#### City Data

In [15]:
def prep_clean_city(city_df):
    '''
    Input:  Geo data dataframe
    Output: Dataframe with standardized column names
    '''
    # Column-name clean-up is the only preparation step for this dataset
    return __clean_col_names(city_df)

#### Airport Data

In [16]:
def prep_clean_airport(airport_df):
    '''
    Input:  Airport data dataframe
    Output: Cleaned and prepped dataframe (a new object; the input dataframe
            is left unmodified)
    '''
    # Work on a copy so the derived columns are not added to the caller's frame
    airport_df = airport_df.copy()

    # Everything after the first two characters of the local code
    airport_df['airline'] = airport_df['local_code'].str[2:]

    # Split the comma-separated coordinates into two columns and strip the
    # padding that a ", " separator would otherwise leave on the values.
    # NOTE(review): the first component is stored as latitude and the second
    # as longitude - confirm the component order used by the source file.
    coord_parts = airport_df['coordinates'].str.split(",", expand=True)
    airport_df['latitude'] = coord_parts[0].str.strip()
    airport_df['longitude'] = coord_parts[1].str.strip()

    airport_df = airport_df.drop(columns=['coordinates', 'continent', 'iata_code'])

    # Clean column names
    airport_df = __clean_col_names(airport_df)

    return airport_df

#### Date Data

##### Create dataframe to be used for date and time dimension table by:
1. Create master date column to match date fields on
2. Create components from each date to be stored as separate columns

In [17]:
# adapted from here: https://stackoverflow.com/questions/34898525/generate-list-of-months-between-interval-in-python
dates = ["1900-01-01", "2020-01-01"]


def __get_all_dates(dates):
    '''
    Input:  List of two dates, first as start date and second as end date
    Output: Keys view of an ordered dictionary holding every day between the
            start and end date, inclusive, as "YYYY-MM-DD" strings
    '''
    # format dates provided, assign them start and end signifiers
    start, end = [datetime.strptime(day, "%Y-%m-%d") for day in dates]
    # + 1 so that the end date itself is generated; without it range() stops
    # one day short of the documented inclusive interval
    total_days = (end - start).days + 1
    # the ordered dictionary keeps the days in calendar order
    return OrderedDict.fromkeys(
        (start + timedelta(offset)).strftime("%Y-%m-%d") for offset in range(total_days)
    ).keys()


def prep_clean_dates(dates):
    '''
    Input:  List of two dates, first as start date and second as end date
    Output: Dataframe with one row per day in that range and the date part
            components (month, day, year, weekday, week) as separate columns
    '''
    all_days = __get_all_dates(dates)
    dates_df = pd.DataFrame(list(all_days), columns=['dates_master'])
    dates_df['dates_master'] = pd.to_datetime(dates_df['dates_master'])

    date_parts = dates_df['dates_master'].dt
    dates_df['month'] = date_parts.month
    dates_df['day'] = date_parts.day
    dates_df['year'] = date_parts.year
    dates_df['weekday'] = date_parts.weekday
    # Series.dt.week was removed in pandas 2.0; isocalendar().week is its
    # replacement. Fall back to .week on pandas versions that predate it.
    if hasattr(date_parts, 'isocalendar'):
        dates_df['week'] = date_parts.isocalendar().week.astype(int)
    else:
        dates_df['week'] = date_parts.week
    # Store plain date objects (no time component) in the master column
    dates_df['dates_master'] = (date_parts.date).astype(object)

    return dates_df

### Convert to Spark and write to tables

In [18]:
def create_fact_im(im_df):
    '''
    Input: Pandas immigration dataframe
    Output: Spark immigration dataframe built with the fact table schema.
            Nothing is persisted here: the parquet write is disabled (see the
            comment at the end of the function).
    '''
    # Create the conceptual fact table schema in Spark
    im_Schema = StructType([
        StructField("cicid", StringType(), True),
        StructField("i94yr", IntegerType(), True),
        StructField("i94mon", IntegerType(), True),
        StructField("i94cit", IntegerType(), True),
        StructField("i94res", StringType(), True),
        StructField("i94port", StringType(), True),
        StructField("arrdate", DateType(), True),
        StructField("i94mode", StringType(), True),
        StructField("i94addr", StringType(), True),
        StructField("depdate", StringType(), True),
        StructField("i94bir", IntegerType(), True),
        StructField("i94visa", IntegerType(), True),
        StructField("count", IntegerType(), True),
        StructField("dtadfile", StringType(), True),
        StructField("visapost", StringType(), True),
        StructField("entdepa", StringType(), True),
        StructField("entdepd", StringType(), True),
        StructField("entdepu", StringType(), True),
        StructField("matflag", StringType(), True),
        StructField("biryear", IntegerType(), True),
        StructField("dtaddto", DateType(), True),
        StructField("gender", StringType(), True),
        StructField("airline", StringType(), True),
        StructField("admnum", DoubleType(), True),
        StructField("fltno", StringType(), True),
        StructField("visatype", StringType(), True),
    ])

    spark_im_df = spark.createDataFrame(im_df, schema=im_Schema)

    # The write below is disabled in this notebook; re-enable it to persist
    # the fact table alongside the dimension tables.
    #spark_im_df.write.mode("append").parquet("/results/fact_immigration.parquet")

    # Hand the dataframe back so the work is not discarded and the caller can
    # inspect, validate, or persist it.
    return spark_im_df

In [19]:
def create_dim_date(dates_df):
    '''
    Input: Pandas dates dataframe
    Output: None.  Spark dataframe is appended to the dim_dates parquet table
    '''
    # Conceptual date dimension table: (field name, Spark type) pairs,
    # every field nullable
    date_fields = [
        ("dates_master", StringType()),
        ("month", IntegerType()),
        ("day", IntegerType()),
        ("year", IntegerType()),
        ("weekday", StringType()),
        ("week", StringType()),
    ]
    dates_Schema = StructType(
        [StructField(field_name, field_type, True) for field_name, field_type in date_fields]
    )

    spark_dates_df = spark.createDataFrame(dates_df, schema=dates_Schema)

    # write table
    dates_writer = spark_dates_df.write.mode("append")
    dates_writer.parquet("/results/dim_dates.parquet")

In [20]:
def create_dim_airport(airport_df):
    '''
    Input: Pandas airports dataframe; must contain every column named in the
           schema below
    Output: None.  Spark dataframe is appended to the dim_airport parquet table
    '''

    # Create the conceptual airports dimension table
    airport_Schema = StructType([
        StructField("ident", StringType(), True),
        StructField("type", StringType(), True),
        StructField("name", StringType(), True),
        StructField("elevation_ft", DoubleType(), True),
        StructField("iso_country", StringType(), True),
        StructField("iso_region", StringType(), True),
        StructField("municipality", StringType(), True),
        StructField("gps_code", StringType(), True),
        StructField("local_code", StringType(), True),
        StructField("latitude", StringType(), True),
        StructField("longitude", StringType(), True),
        StructField("airline", StringType(), True),
    ])

    # Select the pandas columns by schema field name. prep_clean_airport adds
    # 'airline' before 'latitude'/'longitude', while the schema lists
    # 'airline' last; since all three are strings, a positional match would
    # silently store each value under the wrong field name.
    field_names = [field.name for field in airport_Schema.fields]
    spark_airport_df = spark.createDataFrame(airport_df[field_names], schema=airport_Schema)

    spark_airport_df.write.mode("append").parquet("/results/dim_airport.parquet")

In [21]:
def create_dim_geo(city_df):
    '''
    Input: Pandas geography and population dataframe
    Output: None.  Spark dataframe is appended to the dim_geo parquet table
    '''
    # Conceptual geography dimension table: (field name, Spark type) pairs,
    # every field nullable
    geo_fields = [
        ("city", StringType()),
        ("state", StringType()),
        ("median_age", DoubleType()),
        ("male_population", DoubleType()),
        ("female_population", DoubleType()),
        ("total_population", IntegerType()),
        ("number_of_veterans", DoubleType()),
        ("foreign_born", DoubleType()),
        ("average_household_size", DoubleType()),
        ("state_code", StringType()),
        ("race", StringType()),
        ("count", IntegerType()),
    ]
    geo_Schema = StructType(
        [StructField(field_name, field_type, True) for field_name, field_type in geo_fields]
    )

    spark_geo_df = spark.createDataFrame(city_df, schema=geo_Schema)

    # write table
    geo_writer = spark_geo_df.write.mode("append")
    geo_writer.parquet("/results/dim_geo.parquet")

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

In [25]:
def quality_check(df, name=None):
    '''
    Input: Dataframe (pandas, or any object whose count() returns its number
           of rows, such as a Spark dataframe); optional name used to label
           the printed message instead of the dataframe itself
    Output: True when the dataframe holds at least one row, otherwise False.
            The outcome is also printed.
    '''
    # pandas' count() returns per-column counts (a Series), which cannot be
    # used as an `if` condition, so use len() for pandas frames. The count is
    # taken once and reused for the message.
    row_count = len(df) if isinstance(df, pd.DataFrame) else df.count()
    label = df if name is None else name

    if row_count == 0:
        print("Data quality check failed for {} with zero records".format(label))
        return False

    print("Data quality check passed for {} with {} records".format(label, row_count))
    return True

### Run Pipelines to Model the Data 

In [None]:
def run_pipeline(spark_files, dates):
    '''
    Input: spark_files is a list of file names for i94 data; dates is a list
           of two dates (start, end) used to build the date dimension
    Output: None.  Loads and cleans every dataset, builds the fact and
            dimension tables, then runs the data quality check on each
            cleaned dataframe
    '''
    # Every step returns its result, so each one must be captured; without
    # the assignments the names used below would be undefined.
    im_df, airport_df, city_df = load_data(spark_files)

    im_df = prep_clean_im_df(im_df)
    city_df = prep_clean_city(city_df)
    airport_df = prep_clean_airport(airport_df)
    dates_df = prep_clean_dates(dates)

    create_fact_im(im_df)
    create_dim_date(dates_df)
    create_dim_airport(airport_df)
    create_dim_geo(city_df)

    for table_df in (im_df, city_df, dates_df, airport_df):
        quality_check(table_df)

#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

#### Immigration Fact Table
- cicid: ID number for each applicant
- i94yr: 4-digit year of the I94 record
- i94mon: Numeric month of the I94 record
- i94cit: 3 digit code of original citizenship country
- i94res: 3 digit code of original residence country
- i94port: 3 digit code of US city upon arrival
- arrdate: Date arrived to US
- i94mode: 1 digit travel code indicating travel type (air, land, sea)
- i94addr: State on i94 visa, 2 digit code
- depdate: Departure date
- i94bir: Age
- i94visa: Reason requesting i94 visa (1-3, indicating business, pleasure, or student)
- count: 1 for all records; used for quick counts
- dtadfile: Date added to file
- visapost: Department of State office issuing the visa
- entdepa: Arrival flag, either admitted or not
- entdepd: Departure flag (indicating departed, lost I-94, or deceased)
- entdepu: Update flag (indicating apprehended, overstayed, adjusted to perm residence)
- matflag: Match flag (match of arrival and departure records)
- biryear: 4 digit year where born
- dtaddto: Date allowed to stay in US until
- gender: Gender of i94 visa holder
- airline: Airline used to arrive in U.S.
- admnum: Admission number
- fltno: Flight number of Airline used to arrive in U.S.
- visatype: type of i94 visa (letter + number)

#### Dimension Airport Table
- ident: Alphanumeric code identifying airport
- type: Type of airport (e.g. helipad, small airport, closed)
- name: Name of airport
- elevation_ft: Elevation where airport is located, in feet
- iso_country: Country code of airport
- iso_region: Region code of airport, combination of 2 digit country code and 2 digit state abbreviation
- municipality: Locality / city where airport is located
- gps_code: Alphanumeric code identifying the airport
- local_code: Alphanumeric code identifying the airport
- airline: Local code with its first two characters removed, since most local codes are padded with leading zeros; intended to facilitate joins on city code
- latitude: Latitude of airport
- longitude: Longitude of airport

**Dropped**: coordinates, continent, and iata code columns.  IATA code would be highly useful for matching out to other datasets 
but unfortunately all codes were missing in this dataset.  The continent column was the same value since all destination airports are in the same continent.  Coordinates was dropped because it was separated into latitude and longitude columns.

#### Dimension Geo Table
- city: city
- state: state (no abbreviation)
- median_age: median age for the combination of city, state, population, and race slice the row represents
- male_population: male population for the combination of city, state, population, and race slice the row represents
- female_population: female population for the combination of city, state, population, and race slice the row represents
- total_population: total population for the combination of city, state, population, and race slice the row represents
- number_of_veterans: number of veterans for the combination of city, state, population, and race slice the row represents
- foreign_born: number of foreign-born residents for the combination of city, state, population, and race slice the row represents
- average_household_size: typical number of people living in one home for the combination of city, state, population, and race slice the row represents
- state_code: 2 digit state abbreviation
- race: Race of the population data represented in the city-state row
- count: Number of i94 visas for that city state

**Dropped:** No columns dropped.

#### Dimension Date Table
- dates_master: YYYY-MM-DD formatted list of all days between 1900-01-01 and 2020-01-01
- month: Month of date in dates_master
- day: Day of date in dates_master
- year: Year of date in dates_master
- weekday: Day of week of date in dates_master, as a number (Monday = 0 through Sunday = 6) stored as a string
- week: Week ID for the date in dates_master

**Dropped:** No columns dropped.

#### Step 5: Project Write Up.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.

Rationale for choice of tools and technologies:  
- A data warehouse was used rather than a data lake because this project had a pre-defined purpose of preparing immigration i94 data for predictive modeling.  Data lakes are more commonly used for vast pools of data with no defined structure.  Cleaning and dropping fields happens after extracting the data from the lake, not prior to creating the lake.  The warehouse structure allows for dropping all blank fields.
- Spark was used given the size of the i94 data.  For purpose of code development, a single file was used.  In production, the entire dataset should be processed and stored.  Spark's parallelization and distributed computing capabilities make it the ideal tool for working with data of this scale.
- Python was also used for quick analysis and quality checks.  In the development phase especially, this was ideal for getting to the answer that drives business value quickly.
- A STAR schema was used for the tables to facilitate easy analysis and logical, structured data organization.  The fact table is the i94 immigration data.  The dimensions tables include geographic data, airport code data, and date data.

Update schedule:
- i94 data should be updated as frequently as each new refresh is available from the government.  This data comprises the fact table, which is the heart of the data warehouse.  It is also the central piece around which all modeling questions are based.  Given this key role, it needs to be as complete and current as possible as consistently as possible.
- Geographic data and airport data are unlikely to change significantly as often.  Geographic data does contain population information, so an annual refresh would be ideal.  Alternately, if an annual refresh is not possible, updates should use the latest U.S. census data and mirror the census data update schedule.  Airport data should be monitored annually as well, especially for closures of smaller locations.  This is the data that is most likely to stay consistent so this refresh can take lower priority compared to i94 data and geographic data.

Contingency Scenario Planning:
- If the data was increased 100x, I recommend continuing to stick with the Redshift platform but adapting the number of clusters and workers to optimize job performance.
- If a dashboard must be updated by 7am daily, I recommend scheduling this job to run automatically overnight.  If the data quality check is not passed, a set of rules should be set up so that the dashboard is not populated and the same data from the prior day remains on the dashboard.  Slightly old data is better than bad data!  The analyst or data scientist could then look at the failures upon arriving at the office that morning.  In the event that this delay is not feasible for the business need, and provided it fits with the data availability / refresh schedule, I recommend running the job at the end of the prior day (say 4pm on Monday for the Tuesday 7am refresh).  This would allow someone to manually review any flags thrown during the ingest and update process.  Then, a separate job could just refresh the GUI prior to 7am, essentially separating the data ingest and check process from the dashboard update process.
- If the database needs to be accessed by 100+ people, I recommend using Redshift's autoscaling feature to accommodate sudden, unanticipated changes in access and querying.  Redshift also offers good read performance, so it would be a better fit than Apache Cassandra, which offers better write performance.