# Building a Data Warehouse with U.S. Immigration Data
### Data Engineering Capstone Project

#### Project Summary
This project uses the U.S. immigration dataset and several supplementary datasets to help understand immigration trends in the U.S. The data is aggregated and stored in a data warehouse to support analytics such as:
- understanding which airports are the most popular. Knowing this can help, for example, to prioritize airport infrastructure improvements
- understanding which states are the most popular
- understanding how travel trends depend on area demographics
- understanding how many students and business travellers there are, and which areas they travel to

The project consists of the following steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [None]:
import pandas as pd
import numpy as np
import os
import glob
import hashlib
import uuid
import re
import configparser
import psycopg2
import pyspark.sql.functions as F
from pyspark.sql.types import *

### Step 1: Scope the Project and Gather Data

#### Scope 
In this project, data is ingested from the source files into S3, cleaned with Spark, saved as Parquet, and then loaded into an Amazon Redshift data warehouse, where data analysts can analyse it.
The process is as follows:
1. Ingest data from the source files (Parquet, CSV) into the raw data lake.
2. Read raw data from the data lake, clean it and save it back to S3 (cleansed layer).
3. Build the data model from the cleaned data and save it back to S3 (application layer).
4. Load data from the application layer into Redshift for analysis, using a star schema.

#### Data Description
Currently data is collected from 3 sources:
- __U.S. National Travel and Tourism Office__: [link](https://www.trade.gov/national-travel-and-tourism-office)
 I-94 is the U.S. Visitor Arrivals Program, which provides a count of visitor arrivals to the United States (with stays of one night or more, visiting under certain visa types) to calculate U.S. travel and tourism volume exports. It includes the person's country of citizenship and country of residence, arrival and departure dates, arrival state, personal details (birth year, gender) and visa category.
 This data is in Parquet format in the *sas_data* folder. Some columns (country, state, port, visa type) are stored as codes. The definitions for these codes are in the *sas_metadata* folder, in a separate CSV file for each column.
- __U.S. City Demographic Data__: [link](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/)
Contains the following data:
 *City*, *State*, *Median Age*, *Male Population*, *Female Population*, *Total Population*, *Number of Veterans*, *Foreign-born*, *Average Household Size*, *State Code*, *Race*, *Count*
- __Airport Code Table__: [link](https://datahub.io/core/airport-codes#data)
Contains the following data: 
 *ident*, *type*, *name*, *elevation_ft*, *continent*, *iso_country*, *iso_region*, *municipality*, *gps_code*, *iata_code*, *local_code*, *coordinates*

In [None]:
CONFIG_FILE_KEY = 'config.cfg'

config = configparser.ConfigParser()
config.read(CONFIG_FILE_KEY)

os.environ['AWS_ACCESS_KEY_ID'] = config.get('AWS', 'AWS_ACCESS_KEY_ID')
os.environ['AWS_SECRET_ACCESS_KEY'] = config.get('AWS', 'AWS_SECRET_ACCESS_KEY')

In [None]:
DATALAKE_RAW_PATH = config.get('S3', 'DATALAKE_RAW_PATH')
DATALAKE_CLEAN_PATH = config.get('S3', 'DATALAKE_CLEAN_PATH')
DATALAKE_APPLICATION_PATH = config.get('S3', 'DATALAKE_APPLICATION_PATH')
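The notebook reads its settings from a config.cfg file with three sections. Below is a minimal sketch of a layout consistent with the section and key names this notebook reads: [AWS], [S3] and [CLUSTER]. Every value in it is a made-up placeholder. The missing_config_keys helper is a hypothetical addition. It reports absent keys before any AWS call is made.

```python
import configparser

# Hypothetical config.cfg layout: section/key names match what the notebook reads,
# but every value is a placeholder.
SAMPLE_CONFIG = """
[AWS]
AWS_ACCESS_KEY_ID = <your-access-key-id>
AWS_SECRET_ACCESS_KEY = <your-secret-access-key>
IAM_ROLE = arn:aws:iam::000000000000:role/example-redshift-role

[S3]
DATALAKE_RAW_PATH = s3a://example-bucket/raw
DATALAKE_CLEAN_PATH = s3a://example-bucket/clean
DATALAKE_APPLICATION_PATH = s3a://example-bucket/application

[CLUSTER]
DWH_ENDPOINT = example-cluster.example.us-west-2.redshift.amazonaws.com
DWH_USER = example_user
DWH_PASSWORD = <your-password>
DWH_PORT = 5439
DWH_NAME = example_db
"""

# Keys the notebook expects, grouped by section
REQUIRED_KEYS = {
    'AWS': ['AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY', 'IAM_ROLE'],
    'S3': ['DATALAKE_RAW_PATH', 'DATALAKE_CLEAN_PATH', 'DATALAKE_APPLICATION_PATH'],
    'CLUSTER': ['DWH_ENDPOINT', 'DWH_USER', 'DWH_PASSWORD', 'DWH_PORT', 'DWH_NAME'],
}

def missing_config_keys(cfg):
    """Return 'SECTION.KEY' for every required key that cfg does not define."""
    return [f"{section}.{key}"
            for section, keys in REQUIRED_KEYS.items()
            for key in keys
            if not cfg.has_option(section, key)]

# Parsed into a separate object so the notebook's real `config` is not overwritten
sample_config = configparser.ConfigParser()
sample_config.read_string(SAMPLE_CONFIG)
```

Option names are case-insensitive in configparser, so config.get('AWS', 'AWS_ACCESS_KEY_ID') works regardless of how the keys are capitalised in the file.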

In [None]:
#Read sample data
fname = 'immigration_data_sample.csv'
df = pd.read_csv(fname)

In [None]:
df.head()

In [None]:
from pyspark.sql import SparkSession

# Both packages go into ONE comma-separated value: setting "spark.jars.packages"
# twice keeps only the last value, which would silently drop spark-sas7bdat.
spark = (SparkSession.builder
    .appName('Udacity Capstone Project')
    .master('local[4]')
    .config("spark.jars.repositories", "https://repos.spark-packages.org/")
    .config("spark.jars.packages",
            "saurfang:spark-sas7bdat:2.0.0-s_2.11,org.apache.hadoop:hadoop-aws:2.7.0")
    .enableHiveSupport()
    .getOrCreate())

In [None]:
spark

In [None]:
df_spark = spark.read.parquet('sas_data/*')

In [None]:
df_spark.printSchema()

In [None]:
# Make sure we can see all columns
pd.set_option('display.max_columns', 50)
pd.set_option('display.max_colwidth', 100)

In [None]:
df_spark.head(2)

In [None]:
# Convert double columns to integers. admnum (the admission number) is too large
# for a 32-bit IntegerType, so it is cast to LongType instead.
int_columns = ['cicid', 'i94yr', 'i94mon', 'i94cit', 'i94res', 'arrdate', 'i94mode',
               'depdate', 'i94bir', 'i94visa', 'count', 'biryear']
for c in int_columns:
    df_spark = df_spark.withColumn(c, F.col(c).cast(IntegerType()))
df_spark = df_spark.withColumn('admnum', F.col('admnum').cast(LongType()))

In [None]:
df_spark.head(5)

#### Save I-94 Immigration data to S3

In [None]:
df_spark.write \
    .partitionBy("i94yr", "i94mon") \
    .save(path=DATALAKE_RAW_PATH+'/i94-immigration', format='parquet', mode='overwrite')
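Here is a rough illustration of what partitionBy produces. Spark writes one Hive-style directory per combination of partition values, named column=value. The partition columns are stored only in those directory names, not inside the Parquet files. The helper below is a simplified, hypothetical sketch of that naming scheme. It uses the directory Spark uses for NULLs. It does not model the escaping Spark applies to special characters in values.

```python
def partition_path(base_path, row, partition_cols):
    """Return the Hive-style directory that partitionBy(*partition_cols) would put `row` in.

    Simplified sketch: value escaping is not modelled; None maps to Spark's
    __HIVE_DEFAULT_PARTITION__ directory.
    """
    parts = []
    for col in partition_cols:
        value = row[col]
        if value is None:
            value = "__HIVE_DEFAULT_PARTITION__"
        parts.append(f"{col}={value}")
    return "/".join([base_path.rstrip("/")] + parts)
```

For example, a record with i94yr=2016 and i94mon=4 lands under .../i94-immigration/i94yr=2016/i94mon=4/. A reader that filters on these columns can then skip whole directories.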

### Read U.S. demographics data

In [None]:
demographics_data_fname = 'us-cities-demographics.csv'
df_demographics_data = spark.read.format("csv") \
  .option("sep", ";")\
  .option("inferSchema", "true")\
  .option("header", "true")\
  .load(demographics_data_fname)
df_demographics_data.head(5)

In [None]:
df_demographics_data.columns

In [None]:
df_demographics_data.dtypes

In [None]:
# Replace spaces in columns names with underscores
df_demographics_data = df_demographics_data.toDF(*(c.replace(' ', '_') for c in df_demographics_data.columns))

In [None]:
df_demographics_data.show(5)

#### Save U.S demographics data to S3

In [None]:
df_demographics_data.coalesce(4).write \
    .partitionBy("State_Code") \
    .save(path=DATALAKE_RAW_PATH+'/us-demographics', format='parquet', mode='overwrite')

### Load Airport codes

In [None]:
airport_codes_data_fname = 'airport-codes_csv.csv'
df_airport_codes_data = spark.read.format("csv") \
  .option("sep", ",")\
  .option("inferSchema", "true")\
  .option("header", "true")\
  .load(airport_codes_data_fname)

In [None]:
df_airport_codes_data.head(5)

#### Save Airport data to S3

In [None]:
df_airport_codes_data.write \
    .save(path=DATALAKE_RAW_PATH+'/airports', format='parquet', mode='overwrite')

### Load SAS metadata files

In [None]:
BASE_METADATA_PATH = './sas_metadata'
sas_metadata_files = glob.glob(BASE_METADATA_PATH+"/*.csv")

[os.path.basename(x) for x in sas_metadata_files]

In [None]:
metadataSchema = StructType([
    StructField("Code", StringType(), False),
    StructField("Value", StringType(), False)
])

sas_metadata_dfs = dict()
for fname in [os.path.basename(x) for x in sas_metadata_files]:
    print(f"{BASE_METADATA_PATH}/{fname}, key={fname.split('.')[0]}")
    sas_metadata_dfs[fname.split('.')[0]] = spark.read.format("csv") \
      .option("sep", "=")\
      .option("header", "false")\
      .load(f'{BASE_METADATA_PATH}/{fname}', schema=metadataSchema)


In [None]:
sas_metadata_dfs['i94addrl'].toPandas().shape

In [None]:
# Have a look at the data
sas_metadata_dfs['i94cntyl'].head(5)

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps
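These steps are performed to clean the data:
1. Remove the columns that won't be used in the final model
2. Check for nulls and remove rows with nulls
3. Remove duplicates
4. Filter out data that isn't needed (for example, non-U.S. airports)
5. Convert SAS date values in the immigration data to calendar dates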

#### Clean Immigration data

In [None]:
# 1. Remove columns that are not needed
columns_to_drop = ['count', 'dtadfile', 'visapost', 'occup','entdepa', 'entdepd', 'entdepu', 'matflag', 'dtaddto', 'insnum', \
                   'airline', 'admnum', 'fltno']
df_spark_clean = df_spark.drop(*columns_to_drop)
#df_spark_clean.columns

In [None]:
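# Build a Column expression that is true when a column is NULL or NaN.
# (A UDF cannot be used for this: UDFs receive plain Python values, not Columns.)
def is_null_or_nan(col_name):
    return F.isnan(F.col(col_name)) | F.col(col_name).isNull()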

In [None]:
# 2. Check for null and missing values
df_spark_clean.select([F.count(F.when(F.isnan(c) | F.col(c).isNull(), c)).alias(c) for c in df_spark_clean.columns]).toPandas().head()
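The null check above works as follows. F.when(condition, c) returns NULL wherever the condition is false, and F.count skips NULLs, so each column's count equals the number of NULL or NaN values. The sketch below is a plain-Python equivalent over a list of dicts. It is for illustration only; the rows in the usage check are made up.

```python
import math

def count_null_or_nan(rows, columns):
    """Per column, count values that are missing, None, or float NaN.

    Mirrors count(when(isnan(c) | c.isNull(), c)) in the Spark cell above.
    """
    counts = {c: 0 for c in columns}
    for row in rows:
        for c in columns:
            value = row.get(c)  # a missing key behaves like NULL
            if value is None or (isinstance(value, float) and math.isnan(value)):
                counts[c] += 1
    return counts
```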

In [None]:
df_spark_clean.count()

In [None]:
# 3. Drop rows with NULLs in these columns. There are relatively few of them,
# so dropping them won't noticeably affect the analysis.
df_spark_clean_no_na = df_spark_clean.na.drop(subset=["i94mode", 'i94addr', 'i94bir', 'biryear', 'gender'])

In [None]:
df_spark_clean_no_na.count()

In [None]:
# 4. Drop duplicates
df_spark_clean_no_na = df_spark_clean_no_na.drop_duplicates()

__5. Parse dates__

According to [this paper](http://www.scsug.org/wp-content/uploads/2018/10/Horstman_SCSUG2018_Dating_for_SAS_Programmers.pdf), a SAS date value is stored as the number of days since January 1, 1960. Thus, January 1, 1960 corresponds to a value of zero, and January 2, 1960 is represented as 1.
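The same conversion can be checked outside Spark. A SAS date is a day offset from January 1, 1960, so adding it to that epoch gives the calendar date. The Spark cell below does the same thing with date_add. The helper name sas_to_date is hypothetical. The helper returns None for missing values, matching date_add with a NULL argument.

```python
import math
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)  # SAS day 0

def sas_to_date(sas_days):
    """Convert a SAS date value (days since 1960-01-01) to a datetime.date.

    Missing values (None or NaN) return None, like date_add(..., NULL) in Spark SQL.
    """
    if sas_days is None or (isinstance(sas_days, float) and math.isnan(sas_days)):
        return None
    return SAS_EPOCH + timedelta(days=int(sas_days))
```

For example, sas_to_date(20545) returns datetime.date(2016, 4, 1).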

In [None]:
BASE_SAS_DATE = '1960-01-01'
df_spark_clean_no_na = df_spark_clean_no_na.withColumn('arrival_date', F.expr(f"date_add(to_date('{BASE_SAS_DATE}'), arrdate)")) \
                        .withColumn('departure_date', F.expr(f"date_add(to_date('{BASE_SAS_DATE}'), depdate)")) \
                        .drop('arrdate') \
                        .drop('depdate')

In [None]:
df_spark_clean_no_na.head()

#### Clean U.S. demographics data

In [None]:
# Check for NULLs and missing values
df_demographics_data.select([F.count(F.when(F.isnan(c) | F.col(c).isNull(), c)).alias(c) for c in df_demographics_data.columns]).show()

In [None]:
# Drop rows with missing values
df_demographics_data_no_na = df_demographics_data.na.drop()

In [None]:
# Drop duplicates
df_demographics_data_clean = df_demographics_data_no_na.drop_duplicates()
df_demographics_data_clean.show(5)

In [None]:
# Verify the states are correct
df_demographics_data_no_na.select('State_Code').distinct().sort('State_Code').show()

In [None]:
df_demographics_data_no_na.count()

In [None]:
df_demographics_data_no_na.head()

In [None]:
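# Build on the de-duplicated DataFrame (not df_demographics_data_no_na), so duplicates stay removed
df_demographics_data_clean = df_demographics_data_clean \
    .coalesce(1)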

In [None]:
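# Partition by a copy of State_Code so that State_Code itself stays in the Parquet files
# (partition columns are stored only in the directory names)
df_demographics_data_clean \
    .withColumn("state_abbr", F.col('State_Code')) \
    .write \
    .partitionBy("state_abbr") \
    .save(path=DATALAKE_CLEAN_PATH+'/us-demographics', format='parquet', mode='overwrite')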

In [None]:
df_demographics_data_clean.show(5)

#### Clean airports data

In [None]:
# Check for NULLs and missing values
df_airport_codes_data.select([F.count(F.when(F.isnan(c) | F.col(c).isNull(), c)).alias(c) for c in df_airport_codes_data.columns]).show()

In [None]:
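# Drop rows with a missing value in any column. Note that this keeps only airports
# with every field populated (for example, an IATA code and a local code).
df_airport_codes_data_no_na = df_airport_codes_data.na.drop()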

In [None]:
# Drop duplicates
df_airport_codes_data_clean = df_airport_codes_data_no_na.drop_duplicates()

In [None]:
df_airport_codes_data_clean.toPandas().head()

The airport data covers airports all over the world.
This project only needs U.S. airports, so we keep only those, which also reduces the size of the data set.

In [None]:
# First, let's see what countries are there
df_airport_codes_data_clean.select('iso_country').distinct().sort('iso_country').show(200)

In [None]:
df_airport_codes_data_clean = df_airport_codes_data_clean.filter(F.col('iso_country')=='US')

In [None]:
df_airport_codes_data_clean.count()

Column iso_region has the format "US-StateCode". Let's extract the state code into a separate column.

In [None]:
# First, let's have a look at all the state codes
df_airport_codes_data_clean.select('iso_region').distinct().sort('iso_region').show(60)

In [None]:
df_airport_codes_data_clean = df_airport_codes_data_clean \
                                .withColumn("state_code", F.split(F.col("iso_region"), "-").getItem(1)) \
                                .drop('iso_region')

In [None]:
df_airport_codes_data_clean.show(5)

In [None]:
# Remove 'continent', 'iso_country' columns since we're only using US airports data
df_airport_codes_data_clean = df_airport_codes_data_clean.drop(*['continent', 'iso_country'])

In [None]:
df_airport_codes_data_clean.show(2)

#### Save cleaned airports data

In [None]:
df_airport_codes_data_clean = df_airport_codes_data_clean \
    .withColumn('state', F.col('state_code')) \
    .coalesce(1)

In [None]:
df_airport_codes_data_clean \
    .write \
    .partitionBy("state") \
    .mode('overwrite') \
    .parquet(DATALAKE_CLEAN_PATH+'/airports')

#### Clean metadata
First, we need to remove extra characters (single quotes, padding whitespace) from the values in
'i94cntyl.csv', 'i94visa.csv', 'i94prtl.csv', 'i94model.csv' and 'i94addrl.csv'.
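The cleaning logic in the cells below can be tried on single lines with Python's re module. The sketch reuses the notebook's patterns as raw strings, with one adjustment. The airport code pattern drops the possessive "+" from {2,3}+, which Java regex (used by Spark) accepts. Python's re module only supports possessive quantifiers from Python 3.11. The sample lines in the usage check are illustrative, and the helper names are hypothetical. Spark's CSV reader does not treat single quotes as quote characters, so they stay in the values and have to be stripped by the regex.

```python
import re

# Patterns from the Spark cells below (airport code pattern without the possessive '+')
COUNTRY_CODE = re.compile(r'(\d+)')
COUNTRY_VALUE = re.compile(r'([a-zA-Z0-9\.,\s\-\(\)]+[a-zA-Z0-9\.\)])')
PORT_CODE = re.compile(r'([A-Za-z\d]{2,3})')

def extract(pattern, text):
    """Mimic Spark's regexp_extract(col, pattern, 1): group 1 of the first match, or ''."""
    match = pattern.search(text)
    return match.group(1) if match else ''

def parse_metadata_line(line, code_pattern, value_pattern):
    """Split a 'code = value' line on the first '=' and clean both sides."""
    code, value = line.split('=', 1)
    return extract(code_pattern, code), extract(value_pattern, value)

def split_city_state(value):
    """'CITY, ST' -> ('CITY', 'ST'); the state is None when there is no ', '."""
    parts = value.split(', ')
    return parts[0], (parts[1] if len(parts) > 1 else None)
```

For example, the line "   236 =  'AFGHANISTAN'" parses to ('236', 'AFGHANISTAN'). The trailing padding inside quotes is dropped because the value must end with a letter, digit, "." or ")".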

In [None]:
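def clean_extra_characters(val):
    # Strip surrounding spaces and single quotes; pass NULLs through unchanged
    return val.strip(" '") if val is not None else None

clean_extra_characters_udf = F.udf(clean_extra_characters, StringType())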

__Clean countries__

In [None]:
sas_metadata_dfs['i94cntyl'].head(5)

In [None]:
sas_countries_df = sas_metadata_dfs['i94cntyl'].withColumn('Code', F.regexp_extract(F.col('Code'), r'(\d+)', 1)) \
                    .withColumn('Value', F.regexp_extract(F.col('Value'), r'([a-zA-Z0-9\.,\s\-\(\)]+[a-zA-Z0-9\.\)])', 1))
sas_countries_df.head(5)

__Clean airports (from SAS metadata)__

In [None]:
sas_metadata_dfs['i94prtl'].head(5)

In [None]:
sas_airports_df = sas_metadata_dfs['i94prtl'].withColumn('Code', F.regexp_extract(F.col('Code'), r'([A-Za-z\d]{2,3}+)', 1)) \
                    .withColumn('Value', F.regexp_extract(F.col('Value'), r'([a-zA-Z0-9\.,\s\-\(\)]+[a-zA-Z0-9\.\)])', 1))
sas_airports_df.head(5)

The Value column of the airports metadata has the format "City, State". Let's split it into two separate columns.

In [None]:
sas_airports_df = sas_airports_df.select('Code', \
                       F.split(F.col("Value"),", ").getItem(0).alias("City"), \
                       F.split(F.col("Value"),", ").getItem(1).alias("StateCode")
                      )

Verify that the split worked as expected.

In [None]:
sas_airports_df.head(5)

__Clean states__

In [None]:
sas_metadata_dfs['i94addrl'].head(5)

In [None]:
sas_states_df = sas_metadata_dfs['i94addrl'].withColumn('Code', F.regexp_extract(F.col('Code'), r'([A-Za-z\d]{2}+)', 1)) \
                    .withColumn('Value', F.regexp_extract(F.col('Value'), r'([a-zA-Z0-9\.\s\-]+[a-zA-Z0-9\.\)])', 1))
sas_states_df.head(5)

__Clean modes of arrival__

In [None]:
sas_metadata_dfs['i94model'].head(5)

In [None]:
sas_modes_df = sas_metadata_dfs['i94model'].withColumn('Code', F.regexp_extract(F.col('Code'), r'(\d{1})', 1)) \
                    .withColumn('Value', F.regexp_extract(F.col('Value'), r'([a-zA-Z0-9\s]+[a-zA-Z0-9])', 1))
sas_modes_df.head(5)

__Clean visa type__

In [None]:
sas_metadata_dfs['i94visa'].head(5)

In [None]:
sas_visa_types_df = sas_metadata_dfs['i94visa'].withColumn('Code', F.regexp_extract(F.col('Code'), r'(\d{1})', 1)) \
                    .withColumn('Value', F.regexp_extract(F.col('Value'), r'([a-zA-Z0-9]+)', 1))
sas_visa_types_df.head(5)

#### Join the metadata with immigration data

In [None]:
sas_countries_df.createOrReplaceTempView("SAS_Countries")
sas_airports_df.createOrReplaceTempView("SAS_Airports")
sas_states_df.createOrReplaceTempView("SAS_States")
sas_visa_types_df.createOrReplaceTempView("SAS_VisaTypes")
sas_modes_df.createOrReplaceTempView("SAS_Modes")
df_spark_clean_no_na.createOrReplaceTempView("Immigration")

In [None]:
df_immigration_clean = spark.sql("""
    SELECT cicid, i94yr, i94mon, i94port, 
        M.Value as mode, i94addr,
        V.Value as visa_code,
        C1.Value as i94_cit_country,
        C2.Value as i94_res_country,
        i94bir, biryear, gender, 
        visatype, arrival_date, departure_date
    FROM Immigration I 
    INNER JOIN SAS_Countries C1 on I.i94cit = C1.Code
    INNER JOIN SAS_Countries C2 on I.i94res = C2.Code 
    INNER JOIN SAS_VisaTypes V on I.i94visa = V.Code
    INNER JOIN SAS_Modes M on I.i94mode = M.Code
    """)

In [None]:
df_immigration_clean.show(5)

#### Save cleaned immigration data

In [None]:
df_immigration_clean = df_immigration_clean \
    .withColumn('year_part', F.col('i94yr')) \
    .withColumn('month_part', F.col('i94mon')) \
    .coalesce(1)

In [None]:
df_immigration_clean \
    .write \
    .partitionBy("year_part", "month_part") \
    .save(path=DATALAKE_CLEAN_PATH+'/immigration', format='parquet', mode='overwrite')

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
In this project we're building a data pipeline for analysing the flow of immigration to the U.S., and one data set contains this data. The other data sets contain complementary data. This fits well into a star schema with the immigration table as the fact table and the data from the other data sets as dimensions. Therefore, the model is as follows:

##### Fact Table
- __F_Immigration__ - immigration data.
Columns: _cicid_, _year_, _month_, _residence_country_, _citizenship_country_, _arrival_date_, _departure_date_, _arrival_mode_, _arrival_port_, _age_, _visa_type_, _birth_year_, _visa_code_, _gender_, _arrival_state_

##### Dimension Tables
- __D_Airport__ - U.S. airports.
Columns: _identifier_, _type_, _airport_name_, _municipality_, _iata_code_, _state_code_
- __D_Demographics__ - U.S. demographics by city.
Columns: _city_, _state_, _median_age_, _male_population_, _female_population_, _total_population_, _num_veterans_, _foreign_born_, _avg_household_size_, _state_code_, _race_
- __D_US_State__ - U.S. states - abbreviations and full name.
Columns: _state_code_, _state_name_
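As an illustration of how an analyst might query this star schema, the sketch below asks which states are the most popular. It joins the fact table to the state dimension and aggregates. It runs against an in-memory SQLite database containing a few made-up rows in a subset of the model's columns. The real tables live in Redshift, and the helper names are hypothetical.

```python
import sqlite3

def build_demo_db():
    """Create an in-memory database with made-up rows in a subset of the model's columns."""
    db = sqlite3.connect(":memory:")
    db.executescript("""
        CREATE TABLE d_us_state (state_key TEXT, state_code TEXT, state_name TEXT);
        CREATE TABLE f_immigration (immigration_key TEXT, cicid INTEGER,
                                    arrival_state TEXT, visa_category TEXT);
        INSERT INTO d_us_state VALUES
            ('s1', 'CA', 'California'), ('s2', 'NY', 'New York'), ('s3', 'FL', 'Florida');
        INSERT INTO f_immigration VALUES
            ('i1', 1, 'CA', 'Pleasure'), ('i2', 2, 'CA', 'Business'),
            ('i3', 3, 'NY', 'Pleasure'), ('i4', 4, 'FL', 'Student'),
            ('i5', 5, 'CA', 'Student');
    """)
    return db

def top_arrival_states(db, limit=3):
    """Rank states by number of arrivals: join the fact table to the state dimension."""
    query = """
        SELECT s.state_name, COUNT(*) AS arrivals
        FROM f_immigration f
        JOIN d_us_state s ON f.arrival_state = s.state_code
        GROUP BY s.state_name
        ORDER BY arrivals DESC, s.state_name
        LIMIT ?
    """
    return db.execute(query, (limit,)).fetchall()
```

The same shape of query (a fact table joined to one or more dimensions, then GROUP BY) answers the other questions from the project summary, such as counting arrivals per visa category.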


#### 3.2 Mapping Out Data Pipelines
Here are the steps necessary to pipeline the data into the data model:
1. Load data from the source files (SAS data from Parquet; demographics, airport and SAS metadata from CSV) and save it to S3 (raw data lake).
2. Read raw data from the data lake, clean it and save it back to S3 (cleansed layer).
3. Build the data model (the star schema described above) from the cleaned data and save it back to S3 (application layer).
4. Load data from the application layer into Redshift for analysis.
5. Run data quality checks.

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
The cleaned data saved to S3 in the previous step would normally be read back from S3 here. Since all of it is already loaded in this notebook, for simplicity we reuse the in-memory DataFrames instead of reloading them from S3.

In [None]:
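# Random surrogate keys. Marked non-deterministic so Spark's optimizer does not assume
# repeated calls return the same value. Note that keys are regenerated whenever a
# DataFrame is recomputed (for example, on each write or count).
generate_unique_id = F.udf(lambda: str(uuid.uuid4()), StringType()).asNondeterministic()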
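If surrogate keys need to stay stable across re-runs, one alternative technique (not the one used in this notebook) is to hash the natural key instead of generating a random UUID. The same input then always yields the same key. The sketch below uses hashlib.md5. The helper name deterministic_key is hypothetical. In Spark, the analogous expression would be F.md5(F.concat_ws('|', *cols)). Like concat_ws, the helper skips None parts, which means inputs that differ only in where a None appears map to the same key.

```python
import hashlib

def deterministic_key(*natural_key_parts):
    """MD5 hex digest of the '|'-joined natural key; None parts are skipped (like concat_ws)."""
    joined = "|".join(str(part) for part in natural_key_parts if part is not None)
    return hashlib.md5(joined.encode("utf-8")).hexdigest()
```

For example, deterministic_key('CA') returns the same 32-character key on every run, so a state dimension rebuilt next month keeps its keys.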

In [None]:
def save_dataframe_to_s3_as_json(df, dest_path):
    # Spark writes JSON Lines (one object per line); the CSV-only "header" option is not needed
    df.write \
        .mode('overwrite') \
        .json(dest_path)
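The application layer is written as JSON Lines, one JSON object per line. With FORMAT AS JSON 'auto', Redshift's COPY matches top-level keys to column names, which is why every column in the tables below is aliased to a lowercase name. By default, Spark's JSON writer also omits fields whose value is null. The sketch below produces the same line format with the standard json module. The helper name to_json_lines is hypothetical.

```python
import json

def to_json_lines(records):
    """Serialise dicts as JSON Lines, omitting None fields as Spark's JSON writer does by default."""
    lines = [json.dumps({k: v for k, v in record.items() if v is not None})
             for record in records]
    return "\n".join(lines) + "\n"
```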

### Create Immigration fact table

In [None]:
f_immigration = df_immigration_clean.withColumn("immigration_key", generate_unique_id()) \
                        .select(F.col('immigration_key'), \
                        F.col('cicid'), \
                        F.col('i94yr').alias('year'), \
                        F.col('i94mon').alias('month'), \
                        F.col('i94_cit_country').alias('citizenship_country'),\
                        F.col('i94_res_country').alias('residence_country'), \
                        F.col('visa_code').alias('visa_category'), \
                        F.col('visatype').alias('visa_type'), \
                        F.col('arrival_date'), \
                        F.col('departure_date'), \
                        F.col('mode').alias('arrival_mode'), \
                        F.col('i94port').alias('arrival_port'), \
                        F.col('i94bir').alias('age'), \
                        F.col('biryear').alias('birth_year'), \
                        F.col('gender'), \
                        F.col('i94addr').alias('arrival_state'))

### Save Immigration table to S3 (application layer, to be loaded into Redshift for analysis)

In [None]:
save_dataframe_to_s3_as_json(f_immigration, DATALAKE_APPLICATION_PATH+'/f_immigration')

### Create Demographics dimension table

In [None]:
df_demographics_data_clean.show(5)

In [None]:
dim_demographics = df_demographics_data_clean.withColumn("demographics_key", generate_unique_id()) \
                        .select(F.col('demographics_key'), \
                        F.col('City').alias('city'), \
                        F.col('State').alias('state'), \
                        F.col('Median_Age').alias('median_age'),\
                        F.col('Male_Population').alias('male_population'), \
                        F.col('Female_Population').alias('female_population'), \
                        F.col('Total_Population').alias('total_population'), \
                        F.col('Number_of_Veterans').alias('num_veterans'), \
                        F.col('Foreign-born').alias('foreign_born'), \
                        F.col('Average_Household_Size').alias('avg_household_size'), \
                        F.col('State_Code').alias('state_code'), \
                        F.col('Race').alias('race'))

In [None]:
save_dataframe_to_s3_as_json(dim_demographics, DATALAKE_APPLICATION_PATH+'/d_demographics')

### Create Airports dimension table

In [None]:
# Rename columns and select only columns that are needed for the model
dim_airports = df_airport_codes_data_clean.withColumn("airport_key", generate_unique_id()) \
                    .select(F.col('airport_key'), \
                    F.col('ident').alias('identifier'), \
                    F.col('type').alias('type'), \
                    F.col('name').alias('airport_name'),\
                    F.col('state_code'), \
                    F.col('municipality'), \
                    F.col('iata_code'))

In [None]:
save_dataframe_to_s3_as_json(dim_airports, DATALAKE_APPLICATION_PATH+'/d_airport')

### Create States dimension table

In [None]:
dim_airports.createOrReplaceTempView("Airports")
dim_demographics.createOrReplaceTempView("Demographics")

In [None]:
dim_states = spark.sql("""
    SELECT DISTINCT A.state_code,
        D.state AS state_name
    FROM Airports A
    LEFT JOIN Demographics D ON D.state_code = A.state_code
    ORDER BY A.state_code
    """)
dim_states.show(51)

As we can see, 3 states don't have corresponding state names. Let's fill them in.

In [None]:
dim_states_updated = spark.createDataFrame([
    ('VT', "Vermont"),
    ('WV', "West Virginia"),
    ('WY', "Wyoming")
], ("state_code", "state_name"))

In [None]:
dim_states.createOrReplaceTempView("US_States")
states_without_null_names = spark.sql("SELECT * FROM US_States WHERE state_name IS NOT NULL")
dim_states_final = states_without_null_names.union(dim_states_updated)
dim_states_final.head(51)

In [None]:
dim_states = dim_states_final.withColumn("state_key", generate_unique_id()) \
                            .select(F.col('state_key'), F.col('state_code'), F.col('state_name'))

In [None]:
save_dataframe_to_s3_as_json(dim_states, DATALAKE_APPLICATION_PATH+'/d_us_state')

### Load all tables into Redshift
The script that creates the Redshift tables is in the create_tables.py file.

In [None]:
IAM_ROLE = config.get('AWS', 'IAM_ROLE')
DWH_ENDPOINT = config.get('CLUSTER', 'DWH_ENDPOINT')
DWH_USER = config.get('CLUSTER', 'DWH_USER')
DWH_PASSWORD = config.get('CLUSTER', 'DWH_PASSWORD')
DWH_PORT = config.get('CLUSTER', 'DWH_PORT')
DWH_DB = config.get('CLUSTER', 'DWH_NAME')

In [None]:
table_names = ['d_airport', 'd_demographics', 'd_us_state', 'f_immigration']

In [None]:
copy_statement = """
                    COPY {} FROM '{}' 
                    IAM_ROLE '{}'
                    format as json 'auto';
                """

In [None]:
%load_ext sql

In [None]:
conn_string = "postgresql://{}:{}@{}:{}/{}".format(DWH_USER, DWH_PASSWORD, DWH_ENDPOINT, DWH_PORT, DWH_DB)
# Don't print conn_string: it contains the database password
%sql $conn_string

In [None]:
for table in table_names:
    try:
        copy_data = copy_statement.format(table, \
                               f"{DATALAKE_APPLICATION_PATH.replace('s3a', 's3')}/{table}/part-", \
                               IAM_ROLE)
        %sql $copy_data
    except Exception as ex:
        print(f'Error while loading data for table {table}: {str(ex)}')
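The loop above turns the s3a:// path into s3:// with str.replace('s3a', 's3'). That call would also rewrite any "s3a" that happens to appear inside a bucket name or prefix. The hypothetical helper below rewrites only the leading scheme and builds the same COPY statement. The prefix ending in "part-" makes COPY load every object whose key starts with it. The resulting string can be run with %sql as above, or with a psycopg2 cursor followed by conn.commit().

```python
def build_copy_statement(table, application_path, iam_role):
    """Build a Redshift COPY command for one table's JSON files in the application layer."""
    # Redshift expects s3://, Spark/Hadoop uses s3a://; only rewrite the scheme itself
    if application_path.startswith("s3a://"):
        application_path = "s3://" + application_path[len("s3a://"):]
    source = f"{application_path.rstrip('/')}/{table}/part-"
    return (f"COPY {table} FROM '{source}' "
            f"IAM_ROLE '{iam_role}' "
            f"FORMAT AS JSON 'auto';")
```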

#### 4.2 Data Quality Checks
The data quality checks that will be performed:
- Row counts in the source and the destination must match. This check also confirms that the destination tables are not empty, so no separate check for that is needed.

There is also no need to check that NOT NULL columns contain no nulls, because the table definitions enforce this (see the script that creates the tables).

In [None]:
conn = psycopg2.connect(dbname=DWH_DB, host=DWH_ENDPOINT, port=DWH_PORT, user=DWH_USER, password=DWH_PASSWORD)

In [None]:
def validate_counts(conn, table_name, expected_count):
    """
    Checks that the number of records in table_name equals expected_count.
    Returns the actual count if the check passes; otherwise prints the error and returns None.
    """
    try:
        # The context manager closes the cursor even if the query fails
        with conn.cursor() as cur:
            cur.execute(f"SELECT COUNT(*) FROM {table_name};")
            row = cur.fetchone()
        record_count = int(row[0]) if row else 0
        if record_count == expected_count:
            return record_count
        raise ValueError(f'Data quality check (row number) for table {table_name} failed. '
                         f'Expected rows: {expected_count} Actual: {record_count}')
    except Exception as ex:
        print(f'Exception while checking number of rows in table {table_name}: {str(ex)}')
        # Reset the transaction so that later checks can still run after a failed query
        conn.rollback()

Get the number of rows in each source DataFrame that was loaded into Redshift.

In [None]:
tables_with_counts = {'d_airport': dim_airports.count(), \
          'd_demographics': dim_demographics.count(),\
          'd_us_state': dim_states.count(), \
          'f_immigration': f_immigration.count()}
tables_with_counts

Verify that the number of rows in each Redshift table matches the source data.

In [None]:
counts = [validate_counts(conn, tbl, cnt) for tbl, cnt in tables_with_counts.items()]
counts

In [None]:
conn.close()
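The row-count check can also be written to fail loudly rather than print. Collect the expected counts (tables_with_counts) and the actual Redshift counts into two dicts, then report every table whose counts differ. The sketch below is a hypothetical pure helper that does this comparison. A pipeline step could run assert not find_count_mismatches(expected, actual) so that a mismatch stops the run.

```python
def find_count_mismatches(expected_counts, actual_counts):
    """Return {table: (expected, actual)} for every table whose counts differ.

    A table missing from actual_counts is reported with actual = None.
    """
    return {
        table: (expected, actual_counts.get(table))
        for table, expected in expected_counts.items()
        if actual_counts.get(table) != expected
    }
```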

#### 4.3 Data dictionary 
See Data dictionary in file _data_dictionary.txt_.
Data model is in _US-Immigration-ER-Diagram.png_ file.

### Step 5: Complete Project Write Up

Since the files in the main data set are large, Spark is used to load and process the data. S3 was chosen for data storage as an easy-to-use and inexpensive solution. Redshift was chosen as the tool for data analysis because it integrates seamlessly with S3 (data is loaded from S3 with the COPY command), and it is also reliable and scalable.

Frequency of data updates: immigration data changes every day, but trends don't change that often, so updating data once a month should be enough for the goal of this project.

If the requirements change, here's how the project would be implemented differently:
 * The data was increased by 100x.
   S3 can cope with this amount of data, so data storage would not change. The Spark ETL, however, currently runs locally (master 'local[4]') and would need to run on a cluster, for example Amazon EMR.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
   The pipeline would run in Airflow, scheduled at 6am so that it finishes by 7am, assuming a run takes less than an hour.
 * The database needed to be accessed by 100+ people.
   Redshift can support 100+ connections, so this would not be a problem.