# Project Title
### Data Engineering Capstone Project

In [1]:
# Do all imports and installs here
import pandas as pd, re
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from os import walk, path, listdir
from os.path import isfile, join
from pyspark.sql.functions import date_add, expr, col, create_map, lit
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, LongType, DoubleType
from itertools import chain
from pyspark.sql.types import *
from pyspark.sql.functions import date_format, upper
from pyspark.sql.functions import col, unix_timestamp, to_date

# Step 1: Scope the Project and Gather Data
## Scope
In this project, I plan to perform EDA and build a pipeline to process 4 disparate data sources into an OLAP (Online analytical processing) data layer:
1. I94 Immigration Data: This data comes from the US National Tourism and Trade Office
2. World Temperature Data: This dataset came from Kaggle
3. U.S. City Demographic Data: This data comes from OpenSoft
4. Airport Code Table: This is a simple table of airport codes and corresponding cities

## Describe and Gather Data
The I94 immigration data comes from the US National Tourism and Trade Office, but has been provided as part of this project in a series of SAS files (.sas7bdat). I have also been given a SAS report (I94_SAS_Labels_Descriptions.SAS) that provides valid and invalid values for a number of fields in this dataset (I have manually transformed this report into seperate .txt files for readability)

The data contains 28 fields, and there is multiple files with the file directory to consider. but I believe the below are likely to be of most relevance:

- cicid - Identifier for immigrating person to the US
- arrdate - arrival date
- i94port - airport that immigrating person arrived into
- count - count of people immigrating with?
- airline - airline travelled with
- fltno - flight number
- i94cit - Country of immigrating person
- depdate - Departure date
- i94visa - length of visa
- occup - occupation
- gender - gender
- visatype - type of visa
- i94addr - US address of immigrating person

The historical World Temperature Data comes from Kaggle, but has been provided as part of this project in csv format. The table only contains 7 fields, but I think the below will be most relevant:

- AverageTemperature = Average temperature
- City = City name
- Country = Country name (non US countries are probably not relevant)
- Date = Date of temperature

The U.S. City Demographic comes from OpenSoft, but has been provided as part of this project in csv format. The table only contains 12 fields, but I think the below will be most relevant: 
- City - City name                 
- State - State name             
- Median Age - Median age 
- Male Population - Male Population size       
- Female Population - Female Population size
- Foreign-born - Number of foreign born people

The Airport Code Table comes from DataHub, but has been provided as part of this project in csv format. The table contains 12 fields, but I think the below will be most relevant:
- name - Airport name
- type - type of airport
- elevation_ft - elevant of airport (in ft)
- iso_country - country of airport
- iso_region - region of airport
- municipality - district of airport

There is **no common join key throughout the different data sources**. However, they all relate to locations (location of immigration, location of temperature, location of city, location of airport) therefore I'm hopeful I can find a way to join these data sources

We'll start by checking we can read in all datasets and doing some basic checks to validate the data

## Exploratory Data Anlaysis (EDA)

### Airport Data

In [2]:
# Read in the airport data form csv into Pandas for exploration
fname = './airport-codes_csv.csv'
df_airports = pd.read_csv(fname, sep=',')

In [3]:
# Start with a simple row count
df_airports.shape[0]

55075

In [4]:
# Data has been read in without any errors, display first few rows
df_airports.head()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11.0,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450.0,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820.0,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237.0,,US,US-AR,Newport,,,,"-91.254898, 35.6087"


In [5]:
# Continent and iata_code appear to have lots of NaN values. A summary of the tabel will show how well populated each of the fields are
df_airports.count()

ident           55075
type            55075
name            55075
elevation_ft    48069
continent       27356
iso_country     54828
iso_region      55075
municipality    49399
gps_code        41030
iata_code        9189
local_code      28686
coordinates     55075
dtype: int64

In [6]:
# continent, iata_code and local_code look to be poorly populated, so may not be fit for use (but are out of my scope). Check for duplication (assuming each airport has a unqiue 'ident' key)
len(df_airports[['ident']])-len(df_airports[['ident']].drop_duplicates())

0

In [7]:
# ident look to be the primary key. Check datatypes have been assigned correctly
df_airports.dtypes

ident            object
type             object
name             object
elevation_ft    float64
continent        object
iso_country      object
iso_region       object
municipality     object
gps_code         object
iata_code        object
local_code       object
coordinates      object
dtype: object

In [8]:
# Datatypes have not been assigned correctly and most numeric fields are showing as object/string. I will need to consider this as part of the data cleansing

Findings from airport data:
- ident is the primary key
- continent, iata_code and local_code but are not included in my scope
- Pandas cannot infer the datatypes correctly

### Cities Data

In [9]:
# Read in the cities data from csv into Pandas for exploration
fname = './us-cities-demographics.csv'
df_cities = pd.read_csv(fname, sep=';')

In [10]:
# Start with a simple row count
df_cities.shape[0]

2891

In [11]:
# Data has been read in without any errors, display first few rows
df_cities.head()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402


In [12]:
# Data look to be of reasonable quality. Summary check will confirm this
df_cities.count()

City                      2891
State                     2891
Median Age                2891
Male Population           2888
Female Population         2888
Total Population          2891
Number of Veterans        2878
Foreign-born              2878
Average Household Size    2875
State Code                2891
Race                      2891
Count                     2891
dtype: int64

In [13]:
# Check for duplication (assuming a city name could be used in more than 1 state?)
len(df_cities[['City', 'State']])-len(df_cities[['City', 'State']].drop_duplicates())

2295

In [14]:
# There definitely looks to be a lot of duplication in this data. So will need to dedup as part of the data cleansing
# City and State should be the primary key. Check datatypes have been assigned correctly
df_cities.dtypes

City                       object
State                      object
Median Age                float64
Male Population           float64
Female Population         float64
Total Population            int64
Number of Veterans        float64
Foreign-born              float64
Average Household Size    float64
State Code                 object
Race                       object
Count                       int64
dtype: object

Findings from cities data:
- All fields are well populated
- Dataset contains a large number of duplicates
- Pandas can infer the schema

### Global Land Temperature (by City) Data

In [15]:
# Read in the temperature data from csv into Pandas for exploration
fname = '../../data2/GlobalLandTemperaturesByCity.csv'
df_temperature = pd.read_csv(fname, sep=',')

In [16]:
# Start with a simple row count
df_temperature.shape[0]

8599212

In [17]:
# Data has been read in without any errors, display first few entries
df_temperature.head()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E


In [18]:
# Temperature fields (AverageTemperature and Average TemperatureUncertainty) appear to have lots of NaN values. These will likely need filtering out as part of data cleansing. A summary of the tabel will show how well populated each of the fields are
df_temperature.count()

dt                               8599212
AverageTemperature               8235082
AverageTemperatureUncertainty    8235082
City                             8599212
Country                          8599212
Latitude                         8599212
Longitude                        8599212
dtype: int64

In [19]:
# The above confirms that the temperature fields have 4% null values (not drastic, but the data is irrelevant for me without this). Quick check if this relates to a particular city/date/country that I don't need
notnull_temperature_df = df_temperature[df_temperature['AverageTemperature'].isnull()]

unique_city_num = notnull_temperature_df['City'].unique().shape[0]
unique_date_num = notnull_temperature_df['dt'].unique().shape[0]
unique_country_num = notnull_temperature_df['Country'].unique().shape[0]

print('Number of unique cities with NaN is:' + str(unique_city_num))
print('Number of unique dates with NaN is:' + str(unique_date_num))
print('Number of unique Countries with NaN is:' + str(unique_country_num))

Number of unique cities with NaN is:3323
Number of unique dates with NaN is:1772
Number of unique Countries with NaN is:157


In [20]:
# Doesn't look to be a specific issue and more of a general DQ issue that I should filter out in data cleansing. The remaining fields look to be well populated, but I'll an additional check to confirm that there aren't duplicates to consider
len(df_temperature[['dt','City','Country','Latitude','Longitude']])-len(df_temperature[['dt','City','Country','Latitude','Longitude']].drop_duplicates())

0

In [21]:
# No duplication. A final describe may provide some final insight
df_temperature.describe()

Unnamed: 0,AverageTemperature,AverageTemperatureUncertainty
count,8235082.0,8235082.0
mean,16.72743,1.028575
std,10.35344,1.129733
min,-42.704,0.034
25%,10.299,0.337
50%,18.831,0.591
75%,25.21,1.349
max,39.651,15.396


In [22]:
# We're only actually interested in temperatures for the US, find corresponding value
df_temperature['Country'].unique()
# Country == 'United States'
df_US = df_temperature[df_temperature['Country']=='United States'].count()
df_US

dt                               687289
AverageTemperature               661524
AverageTemperatureUncertainty    661524
City                             687289
Country                          687289
Latitude                         687289
Longitude                        687289
dtype: int64

Global Temperature data observations:
- some minor data quality issues with the temperature fields that should probably be filtered out. 
- The other fields seem fine and there is no identified duplication. 
- Once filtered down for united states, the dataset isn't too large. But may benefit from parallel processing (e.g. Spark)

### I94 Immegration Data
It has already been suggested that this data is quite large, and so it will be best to use Spark to perform any EDA for this

In [23]:
# Create Spark session with SAS7BDAT jar
spark = SparkSession\
.builder \
.config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11") \
.enableHiveSupport().getOrCreate()

In [24]:
# List of files in directory
immegration_path = '../../data/18-83510-I94-Data-2016'
immegration_files = [f for f in listdir(immegration_path) if isfile(join(immegration_path, f))]
print(immegration_files)

['i94_apr16_sub.sas7bdat', 'i94_sep16_sub.sas7bdat', 'i94_nov16_sub.sas7bdat', 'i94_mar16_sub.sas7bdat', 'i94_jun16_sub.sas7bdat', 'i94_aug16_sub.sas7bdat', 'i94_may16_sub.sas7bdat', 'i94_jan16_sub.sas7bdat', 'i94_oct16_sub.sas7bdat', 'i94_jul16_sub.sas7bdat', 'i94_feb16_sub.sas7bdat', 'i94_dec16_sub.sas7bdat']


In [25]:
# There are monthly files for the I94 data, we will just examine one at this stage
immigration_data = path.join(immegration_path,immegration_files[0])
df_immigration = spark.read.format('com.github.saurfang.sas.spark').load(immigration_data)

In [26]:
# Start with a simple row count
df_immigration.count()

3096313

In [27]:
# Given that there is 12 months of files, we can expect there to be around 36m rows of I94 data. 
# Data has been read in without any errors, display first few entries
df_immigration.show(5)

+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|        admnum|fltno|visatype|
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|  6.0|2016.0|   4.0| 692.0| 692.0|    XXX|20573.0|   null|   null|   null|  37.0|    2.0|  1.0|    null|    null| null|      T|   null|      U|   null| 1979.0|10282016|  null|  null|   null| 1.897628485E9| null|      B2|
|  7.0|2016.0|   4.0| 254.0| 276.0|    ATL|20551.0|    1.0|     AL|   null|  25.0|    3.0|  1.0|20130811|     SE

In [28]:
# Given the size and nature of this data. I'd expect there to be some null values.
# In I94_SAS_Labels_Descriptions.SAS, we can see that there is expected invalid values in the columns i94addr, i94port, i94cit & i94res. These should be filtered out during the data cleansing
# Check for duplicates (assume cicid is the primary key)
df_immigration.count() - df_immigration.dropDuplicates(['cicid']).count()


0

In [29]:
# There looks to be duplicate values in this data that should be filtered out 

Immigration Data Findings:
- There could be around 36m rows of this data to process, definitely warrents using spark for parralel efficiencies
- There are multiple files to consider
- There are invalid values that need filtering out of the data
- The date columns are in SAS data format (number of days since 1st Jan 1960)

# Step 2: Clean the Data and Data Tranformation
I plan to perform the following data cleaning.

For the I94 immigration data:
- DROP rows where i94port do not contain a valid value
- DROP rows where i94cit or i94 res do not contain valid values
- DROP rows where i94addr does not contain valid values
- change date columns from SAS dates to readable dates
- get dictionary values rather than keys where possible
- Derive a CITY and REGION code as foreign keys for temperature, cities and airport data 

For the temperature data:
- DROP rows where AverageTemperature is NaN
- DROP rows where country is not United States
- Upper case STRING columns that may be required as keys
- Cast dates to date format for readability

For the cities data:
- DROP duplicate rows based on City and State
- Assign a valid schema
- Upper case STRING columns that may be required as keys

For the airport data:
- Assign a valid schema
- Derive a REGION code as foreign keys for immigrant data 

Note, dictionaries have been created manually for i94addrl, i94cntyl, i94prtl. These contain the valid values from I94_SAS_Labels_Descriptions.SAS

### i94 Immigration Data

In [30]:


def get_valid_codes(filepath: str, delim:str ="="):
    """
    Reads a csv/txt dictionary file and transforms to dictionary
    
    args:
        filepath: file name & path
        delim: delimeter
        
    returns:
        dictionary
    """
    # Read txt file
    valid_df = pd.read_csv(filepath, sep='=', header=None)
    # convert df to dictionary 
    valid_dict = dict(zip(valid_df[0], valid_df[1]))
    return valid_dict

def read_and_clean_i94_data(filepath: str):
    """
    reads a single i94 SAS file and performs data cleansing:
        - DROP rows where i94port do not contain a valid value
        - DROP rows where i94cit or i94 res do not contain valid values
        - DROP rows where i94addr does not contain valid values
        - change date columns from SAS dates to readable dates
        - get dictionary values rather than keys where possible
        - Derive a CITY and REGION code as foreign keys for temperature, cities and airport data
    
    args:
        filepath: file name & path
        
    returns:
        Spark DataFrame
    """
    
    # get valid code dictionaries
    i94port_dict = get_valid_codes('./i94prtl_valid.txt')
    i94cit_i94res_dict = get_valid_codes('./i94cntyl_valid.txt')
    i94addr_dict = get_valid_codes('./i94addrl_valid.txt')
    
    # map i94port values
    mapping_expr = create_map([lit(x) for x in chain(*i94port_dict.items())])
    
    # Read I94 data into Spark
    df_immigration = spark.read.format('com.github.saurfang.sas.spark').load(filepath)

    # Select required columns and filter invalid codes
    df_immigration = (
        df_immigration.select(
            [
                'cicid',
                'arrdate',
                'i94port',
                col('count').alias('cnt'),
                'airline', 
                'fltno',
                'i94cit', 
                'depdate',
                'i94visa',
                'occup',
                'gender',
                'visatype',
                'i94addr',
                
            ]
        )
        # Filter out entries where i94port is invalid
        .filter(df_immigration.i94port.isin(list(i94port_dict.keys())))
        # Filter out entries where i94cit is invalid
        .filter(df_immigration.i94cit.isin(list(i94cit_i94res_dict.keys())))
        # Filter out entries where i94res is invalid
        .filter(df_immigration.i94res.isin(list(i94cit_i94res_dict.keys())))
        # Filter out entries where i94addr is invalid
        .filter(df_immigration.i94addr.isin(list(i94addr_dict.keys())))
        # Convert SAS dates to meaningful dates
        .withColumn('arrdate', expr("date_add(to_date('1960-01-01'), arrdate)"))
        .withColumn('depdate', expr("date_add(to_date('1960-01-01'), depdate)"))
        # Lookup i94port to get City and Region
        .withColumn("CityRegion", mapping_expr.getItem(col("i94port")))
    )
    
    # Create CITY and REGION keys
    df_immigration = (
        df_immigration
        .withColumn("City", expr("trim(split(CityRegion, ',')[0])"))
        .withColumn("Region", expr("trim(split(CityRegion, ',')[1])"))
        .drop("CityRegion")
    )
    
    return df_immigration

### temperature Data

In [31]:

def read_and_clean_temperature_data(filepath: str):
    """
    reads temperature csv file and performs data cleansing:
        - DROP rows where AverageTemperature is NaN
        - DROP rows where country is not United States
        - Upper case STRING columns that may be required as keys
        - Cast dates to date format for readability
    
    args:
        filepath: file name & path
        
    returns:
        Spark DataFrame
    """    
    # Read temperature data into Spark
    df_temperature = (
        spark.read.format("csv") 
        .option("header",True)
        .load(filepath)  
    )
    
    # Filter missing temperatures and United States Cities
    df_temperature = df_temperature.filter(
        "AverageTemperature IS NOT NULL AND Country == 'United States'"
    )
    
    
    # Covert dt to readable date and reorder fields
    df_temperature = (
        df_temperature.select(['Country', 'City', 'dt', 'AverageTemperature']) \
        .withColumn('dt', to_date(unix_timestamp(col('dt'), 'yyyy-MM-dd').cast("timestamp")))
        .withColumn('Country', upper(col('Country')))
        .withColumn('City', upper(col('City')))
    )
    
    return df_temperature

### Cities Data

In [32]:


def read_and_clean_cities_data(filepath: str):
    """
    reads cities csv file and performs data cleansing:
        -DROP duplicate rows based on City and State
        -Assign a valid schema
        -Upper case STRING columns that may be required as keys
    
    args:
        filepath: file name & path
        
    returns:
        Spark DataFrame
    """ 
    
    # Define schema
    citySchema = StructType(
        [
            StructField('City', StringType(), True),
            StructField('State', StringType(), True),
            StructField('MedianAge', DoubleType(), True),
            StructField('MalePopulation', LongType(), True),
            StructField('FemalePopulation', LongType(), True),
            StructField('NumberOfVeterans', LongType(), True),
            StructField('Foreign-born', StringType(), True),     
            StructField('AverageHouseholdSize', IntegerType(), True),
            StructField('Race', StringType(), True),
            StructField('Count', StringType(), True)
        ]                     
    )
    
    # Read cities data into Spark and assign schema
    df_cities = (
        spark.read.format("csv") 
        .option("header",True)
        .option("delimiter", ";")
        .schema(citySchema)
        .load(filepath)  
        .select(
            ['City','State','MedianAge','MalePopulation','FemalePopulation','Foreign-born']
        )
    )
    
    # Drop Duplicates and UPPER case keys
    df_cities = (
        df_cities
        .withColumn('City', upper(col('City')))
        .withColumn('State', upper(col('State')))
        .drop_duplicates(subset=['City', 'State'])
    )
    
    return df_cities

### Airport Data

In [33]:
def read_and_clean_airport_data(filepath: str):
    """
    reads airport csv file and performs data cleansing:
        -Assign a valid schema
        -Derive a REGION code as foreign keys for immigrant data
    args:
        filepath: file name & path
        
    returns:
        Spark DataFrame
    """     
    
    # Define schema    
    airportSchema = StructType(
        [
            StructField('ident', StringType(), True),
            StructField('type', StringType(), True),
            StructField('name', StringType(), True),
            StructField('elevation_ft', IntegerType(), True),
            StructField('continent', StringType(), True),
            StructField('iso_country', StringType(), True),
            StructField('iso_region', StringType(), True),     
            StructField('municipality', StringType(), True),
            StructField('gps_code', StringType(), True),
            StructField('iata_code', StringType(), True),
            StructField('local_code', StringType(), True),
            StructField('coordinates', StringType(), True),
        ]                     
    )
    
    # Read cities data into Spark and assign schema
    df_airport = (
        spark.read.format("csv") 
        .option("header",True)
        .schema(airportSchema)
        .load(filepath)  
        .select(
            ['name','type','elevation_ft','iso_country','iso_region','municipality']
        )
    )
    
    # Derive REGION key
    df_airport = (
        df_airport
        .withColumn("Region", expr("trim(split(iso_region, '-')[1])"))
        .drop("iso_region")
    )
    
    return df_airport

# Step 3: Define the Data Model
## 3.1 Conceptual Data Model

I have decided to create 5 tables based on a STAR schema design.

- The FACT table will a record of people immigrating into the United States and high level attributes
- There will then be 4 DIMENSION tables that will contain additional attributes about:
    - The people immigrating
    - The airports they're immigrating to
    - The temperature where they're immigrating to
    - The cities they're immigrating to
    
The IMMIGRATION_FACT fact table will contain the events from the I94 immigration data (people immigrating to the United States). The data dictionary and lineage for this table will be will be:

| Table            | Variable         | DataType    | Source File             | Source Variable  | Description           |
| ---------------- | ---------------- | ----------- | ----------------------- | ---------------- | --------------------- |
| IMMIGRATION_FACT | cicid 			  | STRING      | i94_<MONYY>_sub.sas7bdat| cicid 			 | Citizen ID            |
| IMMIGRATION_FACT | arrdate          | STRING      | i94_<MONYY>_sub.sas7bdat| arrdate          | Arrival Date          |
| IMMIGRATION_FACT | i94port          | DOUBLE      | i94_<MONYY>_sub.sas7bdat| i94port          | Arrival Airport       |
| IMMIGRATION_FACT | count            | LONG        | i94_<MONYY>_sub.sas7bdat| count            | Arrival Count         |
| IMMIGRATION_FACT | airline          | LONG        | i94_<MONYY>_sub.sas7bdat| airline          | Airline Name          |
| IMMIGRATION_FACT | fltno            | STRING      | i94_<MONYY>_sub.sas7bdat| fltno            | Flight Number         |
| IMMIGRATION_FACT | i94cit           | STRING      | i94_<MONYY>_sub.sas7bdat| i94cit           | Immigrating Country   |

The IMMIGRANT_DIM dimension table will contain additional immigrant level attibutes. The data dictionary and lineage for this table will be will be:

| Table            | Variable         | DataType    | Source File             | Source Variable  | Description           |
| ---------------- | ---------------- | ----------- | ----------------------- | ---------------- | --------------------- |
| IMMIGRATION_DIM  | cicid 			  | STRING      | i94_<MONYY>_sub.sas7bdat| cicid 			 | Citizen ID            |
| IMMIGRATION_DIM  | arrdate          | STRING      | i94_<MONYY>_sub.sas7bdat| arrdate          | Arrival Date          |
| IMMIGRATION_DIM  | i94port          | DOUBLE      | i94_<MONYY>_sub.sas7bdat| i94port          | Arrival Airport       |
| IMMIGRATION_DIM  | depdate          | STRING      | i94_<MONYY>_sub.sas7bdat| depdate          | Departure Date        |
| IMMIGRATION_DIM  | i94visa          | STRING      | i94_<MONYY>_sub.sas7bdat| i94visa          | Visa Type 1           |
| IMMIGRATION_DIM  | occup            | STRING      | i94_<MONYY>_sub.sas7bdat| occup            | Occupation            |
| IMMIGRATION_DIM  | gender           | STRING      | i94_<MONYY>_sub.sas7bdat| gender           | Gender                |
| IMMIGRATION_DIM  | visatype         | STRING      | i94_<MONYY>_sub.sas7bdat| visatype         | Visa Type 2           |
| IMMIGRATION_DIM  | i94addr          | STRING      | i94_<MONYY>_sub.sas7bdat| i94addr          | Immigrating Address   |

The CITIES_DIM table will be a dimension table contains cities and city level attributes. The data dictionary and lineage for this table will be will be:

| Table            | Variable         | DataType    | Source File               | Source Variable  | Description           |
| ---------------- | ---------------- | ----------- | ------------------------- | ---------------- | --------------------- |
| CITY_DIM         | City             | STRING      | us-cities-demographics.csv| City             | City                  |
| CITY_DIM         | State            | STRING      | us-cities-demographics.csv| State            | State                 |
| CITY_DIM         | MedianAge        | DOUBLE      | us-cities-demographics.csv| MedianAge        | Median Age            |
| CITY_DIM         | MalePopulation   | LONG        | us-cities-demographics.csv| MalePopulation   | Male Population Count |
| CITY_DIM         | FemalePopulation | LONG        | us-cities-demographics.csv| FemalePopulation | FemalePopulation      |
| CITY_DIM         | Foreign-born     | STRING      | us-cities-demographics.csv| Foreign-born     | Foreign-born          |


The TEMPERATUTE_DIM dimension will be a dimension table containing historic temperatures corresponding to a City. The data dictionary and lineage for this table will be will be:

| Table            | Variable           | DataType    | Source File                     | Source Variable    | Description           |
| ---------------- | ------------------ | ----------- | ------------------------------- | ------------------ | --------------------- |
| TEMPERATURE_DIM  | Country            | STRING      | GlobalLandTemperaturesByCity.csv| Country            | Country               |
| TEMPERATURE_DIM  | dt                 | DATE        | GlobalLandTemperaturesByCity.csv| dt                 | date                  |
| TEMPERATURE_DIM  | AverageTemperature | DOUBLE      | GlobalLandTemperaturesByCity.csv| AverageTemperature | Average Temperature   |
| TEMPERATURE_DIM  | City               | STRING      | GlobalLandTemperaturesByCity.csv| City               | City                  |

The AIRPORT_DIM dimension will be a dimension table containing airport level information corresponding to a Region. The data dictionary and lineage for this table will be will be:
    
| Table            | Variable         | DataType    | Source File      | Source Variable  | Description           |
| ---------------- | ---------------- | ----------- | ---------------- | ---------------- | --------------------- |
| AIRPORT_DIM      | name             | STRING      | airport-codes_csv| name             | airport name          |
| AIRPORT_DIM      | type             | STRING      | airport-codes_csv| type             | airport type          |
| AIRPORT_DIM      | elevation_ft     | DOUBLE      | airport-codes_csv| elevation_ft     | Elevation (ft)        |
| AIRPORT_DIM      | iso_country      | STRING      | airport-codes_csv| iso_country      | Country               |
| AIRPORT_DIM      | region           | STRING      | airport-codes_csv| isoregion        | region			      |
| AIRPORT_DIM      | municipality     | STRING      | airport-codes_csv| municipality     | district              |  

 All tables will be saved to Parquet files under "/data_model" and will be partitioned accordingly.

## 3.2 Mapping Out Data Pipelines
The pipeline steps are described below:

1. Loop through available I94 immigration files, extract into Spark dataframe, clean/transform and load to parquet
2. Get distinct foreign key values from immigration data
3. Extract into Spark DataFrame, Transform and Load temperature data
4. Extract into Spark DataFrame, Transform and Load city data
5. Extract into Spark DataFrame, Transform and Load airport data

# Step 4: Run Pipelines to Model the Data

## 4.1 Create the data model
Build the data pipelines to create the data model.

### 1. Loop through available I94 immigration files, read into Spark dataframe, clean/transform and load to parquet

In [36]:
immigration_path = '../../data/18-83510-I94-Data-2016'

# to run for all files uncomment the below
immigration_files = [f for f in listdir(immegration_path) if isfile(join(immegration_path, f))]
# immigration_files = ['i94_jan16_sub.sas7bdat', 'i94_feb16_sub.sas7bdat', 'i94_mar16_sub.sas7bdat']

def extract_transform_load_immigrations(file_list: list):
    """
    Takes a list of  immegration filepaths and loops through:
        - Loading into Spark Dataframe clean
        - Seperate out into FACT and DIMENSION data
        - Write to parquet
    args:
        file_list: A list of file paths
        
    returns:
        None
    """  
    
    #Loop through file lits
    for f in file_list:
        print('processing file: ' + f)
        try:
            # get full file path
            filepath=join(immigration_path, f)
            
            # read and clean i94 data
            stg_immigration = read_and_clean_i94_data(filepath)
            
        except:
            raise Exception('failed to read i94 data') 
        try:
            # Extract columns for immigration dimension and fact table
            immigration_fact_table = stg_immigration.select(["cicid", "arrdate", "i94port", "cnt", "airline", "fltno", "i94cit", "City", "Region"])
            immigration_dim_table = stg_immigration.select(["cicid", "arrdate", "depdate", "i94visa", "occup", "gender", "visatype", "i94addr"])
            
            # Write temperature dimension table to parquet files partitioned by arrdate
            immigration_fact_table.write.mode("append").partitionBy("arrdate").parquet("/data_model/immigration_fact.parquet")
            immigration_dim_table.write.mode("append").partitionBy("arrdate").parquet("/data_model/immigration_dim.parquet")
        except:
            raise Exception('failed to write fact and dimension table to parquet')
    
        print('finished processing immigration file: ' + f)
    print('finished processing all immigration files')
    
extract_transform_load_immigrations(immigration_files)

processing file: i94_apr16_sub.sas7bdat
finished processing immigration file: i94_apr16_sub.sas7bdat
processing file: i94_sep16_sub.sas7bdat
finished processing immigration file: i94_sep16_sub.sas7bdat
processing file: i94_nov16_sub.sas7bdat
finished processing immigration file: i94_nov16_sub.sas7bdat
processing file: i94_mar16_sub.sas7bdat
finished processing immigration file: i94_mar16_sub.sas7bdat
processing file: i94_jun16_sub.sas7bdat
finished processing immigration file: i94_jun16_sub.sas7bdat
processing file: i94_aug16_sub.sas7bdat
finished processing immigration file: i94_aug16_sub.sas7bdat
processing file: i94_may16_sub.sas7bdat
finished processing immigration file: i94_may16_sub.sas7bdat
processing file: i94_jan16_sub.sas7bdat
finished processing immigration file: i94_jan16_sub.sas7bdat
processing file: i94_oct16_sub.sas7bdat
finished processing immigration file: i94_oct16_sub.sas7bdat
processing file: i94_jul16_sub.sas7bdat
finished processing immigration file: i94_jul16_sub

### 2. Get distinct foreign key values from immigration data

In [38]:
# For the other dimension tables, we want to perform an inner join to the full fact table to remove all the values from the dimension tables that aren't relevant data because they won't match to the fact table. 
# This will minize the amount of data needed to be stored

distinct_fact_cities = spark.read.parquet("/data_model/immigration_fact.parquet/").select('City').distinct()
distinct_fact_region = spark.read.parquet("/data_model/immigration_fact.parquet/").select('Region').distinct()

### 3. Extract into Spark DataFrame, Transform and Load temperature data

In [45]:
# Extract columns for temperature dimension table
stg_temperature_dim_table = read_and_clean_temperature_data('../../data2/GlobalLandTemperaturesByCity.csv').alias('stg_temperature')

temperature_dim_table = (
    stg_temperature_dim_table
    .join(distinct_fact_cities, on = ['City'], how='inner')
    .select('stg_temperature.*')
)

temperature_dim_table.write.mode("overwrite").partitionBy("City").parquet("/data_model/temperature_dim.parquet")

### 4. Extract into Spark DataFrame, Transform and Load Cities data

In [40]:
# Extract columns for city dimension table
stg_city_dim_table = read_and_clean_cities_data('./us-cities-demographics.csv').alias('stg_city')

city_dim_table = (
    stg_city_dim_table
    .join(distinct_fact_cities, on = ['City'], how='inner')
    .select('stg_city.*')
)

city_dim_table.write.mode("overwrite").parquet("/data_model/city_dim.parquet")

### 5. Extract into Spark DataFrame, Transform and Load airport data

In [47]:
# Extract columns for airport dimension table
stg_airport_dim_table = read_and_clean_airport_data('./airport-codes_csv.csv').alias('stg_airport')

airport_dim_table = (
    stg_airport_dim_table
    .join(distinct_fact_region, on = ['Region'], how='inner')
    .select('stg_airport.*')
)

airport_dim_table.write.mode("overwrite").parquet("/data_model/airport_dim.parquet")

## 4.2 Data Quality Checks
The data quality check will ensure there are adequate number of entries in each table.

In [49]:
def perform_data_quality_checks(table_name, primary_key=None):
    '''
    Input: Spark dataframe, description of Spark dataframe
    
    Output: Print outcome of data quality check
    
    '''
    parquet_path = "/data_model/" + table_name + ".parquet/"
    # first check table exists
    try:
        df = spark.read.parquet(parquet_path)
        print(f'Data Quality check passed: {table_name} exists and can be read by spark')
    except:
        raise Exception(f'Data Quality check failed: table does not exist or failed to read {table_name}')
    
    # Next check table is not empty
    try:
        result = df.count() 
        if result == 0:
            raise Exception(f'Data quality check failed: {table_name} has zero rows')
        else:
            print(f'Data quality check passed: {table_name} has {result} records')
    except:
        raise Exception(f'Data quality check failed: Failed to run row count')
    
    # Next check the primary key is valid
    if primary_key != None:
        try:
            result = df[primary_key].count() - df.dropna(subset=primary_key).count()
            if result >0:
                raise Exception(f'Data Quality check failed: {primary_key} contains null values for table {table_name}')
            else:
                print(f'Data Quality check passed: {primary_key} contains no null values for table {table_name}')
        except:
            raise Exception(f'Data quality check failed: Failed to null value primary key check for table {table_name}')
        try:
            df.count() - df.dropDuplicates(primary_key).count()
            if result > 0:
                raise Exception(f'Data Quality check failed: {table_name} has duplicates for primary key {primary_key}')
            else:
                print(f'Data Quality check passed: {table_name} has no duplicates')
        except:
            raise Exception((f'Data Quality check failed: failed to run duplicates check'))
            
    return print(f'Data Qualkity checks complete and passed for table {table_name}')
    
# Perform data quality check
perform_data_quality_checks("immigration_fact", ['cicid'])
perform_data_quality_checks("immigration_dim", ['cicid'])
perform_data_quality_checks("temperature_dim", ['City','dt'])
perform_data_quality_checks("city_dim", ['City'])
perform_data_quality_checks("airport_dim", ['name'])
                      

Data Quality check passed: immigration_fact exists and can be read by spark
Data quality check passed: immigration_fact has 33658753 records
Data Quality check passed: ['cicid'] contains no null values for table immigration_fact
Data Quality check passed: immigration_fact has no duplicates
Data Qualkity checks complete and passed for table immigration_fact
Data Quality check passed: immigration_dim exists and can be read by spark
Data quality check passed: immigration_dim has 33658753 records
Data Quality check passed: ['cicid'] contains no null values for table immigration_dim
Data Quality check passed: immigration_dim has no duplicates
Data Qualkity checks complete and passed for table immigration_dim
Data Quality check passed: temperature_dim exists and can be read by spark
Data quality check passed: temperature_dim has 209783 records
Data Quality check passed: ['City', 'dt'] contains no null values for table temperature_dim
Data Quality check passed: temperature_dim has no duplicat

## 4.3 Data dictionary
See Section 3.1 for data dictionaries

# Step 5: Complete Project Write Up

## Clearly state the rationale for the choice of tools and technologies for the project.
After performing some basic EDA and analysising the size of the data, Spark was chosen over Pandas because of it's efficiency benefits when processing large ammounts of data and it's ability to support multiple file formats (including .csv, .txt and .sas7bdat). Pyspark was chosen as a the programming tool/language throughout the pipeline in order to remain consistent and avoid changing between pyspark, sparkSQL and pyspark. The workspace resources proved capabale for handling the volume of data and given time constraints and that all the data was already present in the workspace, I avoided using cloud services (but this would be a consideration in the future to further expand my skillset).

## Propose how often the data should be updated and why.
Because the immigration files appear to be delivered in monthly batches, I propose this pipeline should be run on a monthly basis. There may be a requirement to update the airport, cities and temperature data on an ad-hoc or more frequent basis.

## Write a description of how you would approach the problem differently under the following scenarios:

### The data was increased by 100x.
If the data was increased by 100x, it is unlikely we would be able to continue to efficiently process the data using the available workspace resources, unless we were to break the data down into smaller chunks but then we may result in inefficiencies due to the number of partitions. It is likely we would consider using a remote spark cluster mode (with a cluster manager such as Yarn). This could be part of a business' exist on-premise infrastructure or exist in a public or private cloud (such as AWS, GCP or Azure).

### The data populates a dashboard that must be updated on a daily basis by 7am every day.
Currently this pipeline is being manually executed, so it is probably not suitable for a 7am SLA. A scheduling tool such as Airflow would be suitable for this scenario and would allow the pipeline to either run on a schedule or be triggered by the data updates.

### The database needed to be accessed by 100+ people.
If the database needed to be accessed by 100+ people, we could consider rendering the parquet files to HDFS and so that we could control user permission and give read access to the users that need it. We could also use technology such as HIVE to build a SQL-like layer on top of the data, allowing for for users with various skillsets to leverage the data. We could also consider other Big Data technology such a Google BigQuery or AWS Redshift.