# Immigration and Temperature Examination
### Data Engineering Capstone Project

#### Project Summary

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [None]:
# Do all imports and installs here
import pandas as pd
import configparser
from pyspark.sql.functions import *
config = configparser.ConfigParser()
# read the config inside a context manager so the file handle is closed
with open('dwh.cfg') as cfg_file:
    config.read_file(cfg_file)

## Step 1: Scope the Project and Gather Data

#### Scope 
Explain what you plan to do in the project in more detail. What data do you use? What does your end solution look like? What tools did you use? etc.

This project explores the relationship between temperature and immigration in a specific state, such as New York, or a specific city, such as New York City. It therefore uses only the portions of the US immigration data in which the arrival was in a city in New York. To supplement this data, US temperature data is also used. In addition, demographic data is added to explore any relationships between temperature and the frequency of immigration within a specific demographic.

#### Describe and Gather Data 
Describe the data sets you're using. Where did it come from? What type of information is included? 

#### Datasets:



### US Immigration data

In [None]:
# Read in immigration data	
# note: too large to read everything into memory at once
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.sql import SQLContext
from pyspark.sql.types import *


spark = SparkSession.builder.\
config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11")\
.enableHiveSupport().getOrCreate()

df_spark =spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')

In [None]:
# check schema of df 
print(df_spark.columns)

In [None]:
im_pd = pd.DataFrame(df_spark.take(5))
im_pd.columns = df_spark.columns
im_pd.head()

## Demographic Data

In [None]:
# read demographic data
dem_df = pd.read_csv("us-cities-demographics.csv", sep=";")
dem_df.head()

In [None]:
dem_df.columns

## World Temperature Data

In [None]:
# read in temperature data
temp_df = pd.read_csv('../../data2/GlobalLandTemperaturesByCity.csv')
temp_df.head()

In [None]:
temp_df.columns

## Airport Codes

In [None]:
# read airport codes
air_codes_df = pd.read_csv("airport-codes_csv.csv")
air_codes_df.head()

In [None]:
air_codes_df.columns

# Step 2: Explore and Assess the Data
* Identify data quality issues, like missing values, duplicate data, etc.
* Document steps necessary to clean the data

## US Immigration Data Exploration & Assessment

### Volume
Since the analysis focuses on the relationship between temperature and immigration in New York City, the exploration is limited to that subset of the data.

In [None]:
df_spark_nyc = df_spark.filter(df_spark.i94port == "NYC")
df_spark_nyc.count()

### Data Quality

First, let's take another look at the columns, this time for just the New York City data.

In [None]:
df_spark_nyc.toPandas().head()

## US Demographic Data Exploration & Assessment

### Volume

In [None]:
# get new york city data and check volume
# dem_df_nyc = dem_df[dem_df['City'] == "New York"]
# dem_df_nyc.count()
dem_df.columns = ['city', 'state', 'median_age', 'male_population','female_population',
                     'total_population', 'number_of_veterans','foreign_born', 'average_household_size',
                     'state_code','race','count']
dem_df_spark = spark.createDataFrame(dem_df)
dem_count = dem_df_spark.count()
print(dem_count)

### Data Quality Assessments

First, I will take a look at the demographic city data to explore what data is there and whether it has any issues.

In [None]:
dem_df_spark.toPandas().head()

For the purposes of this project, I scoped my analysis to a single city, so I only need the few records that pertain to New York, New York. I will still assess the data set as a whole for duplicates and missing fields:

In [None]:
dem_na_count = dem_df_spark.dropna(how="any").count()
dem_dup_count = dem_df_spark.dropDuplicates().count()

print(f"Number of records with missing data in demographics data set: {dem_count - dem_na_count}")
print(f"Number of records with duplicate data in demographics data set: {dem_count - dem_dup_count}")
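A per-column breakdown can make the totals above easier to act on. The following is a minimal sketch in pandas, assuming the data fits in memory; `quality_summary` and the toy frame are hypothetical names and values used only for illustration, not part of the pipeline.

```python
import pandas as pd


def quality_summary(df):
    """Return row-level missing/duplicate counts plus missing counts per column."""
    return {
        "rows": len(df),
        # rows with at least one missing value, mirroring dropna(how="any")
        "rows_with_missing": int(df.isna().any(axis=1).sum()),
        # rows that exactly repeat an earlier row, mirroring dropDuplicates()
        "duplicate_rows": int(df.duplicated().sum()),
        "missing_per_column": df.isna().sum().to_dict(),
    }


# Toy frame standing in for the demographics data (values are made up)
toy = pd.DataFrame({
    "city": ["New York", "New York", "Albany", "Albany"],
    "race": ["White", "White", "Asian", None],
    "count": [10.0, 10.0, 5.0, None],
})
summary = quality_summary(toy)
print(summary)
```

Knowing which column holds the gaps helps decide whether a record should be dropped or a default value filled in.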

## World Temperature Data Exploration & Assessment

### Volume

In [None]:
# get nyc sample
temp_df_nyc = spark.createDataFrame(temp_df[temp_df['City'] == "New York"])
temp_count = temp_df_nyc.count()

### Data Quality Assessments

In [None]:
# read temperature sample
temp_df_nyc.toPandas().head() 

Now I will assess how many records are duplicates or have missing data

In [None]:
temp_na_count = temp_df_nyc.dropna(how="any").count()
temp_dup_count = temp_df_nyc.dropDuplicates().count()

print(f"Number of records with missing data in temperature data set: {temp_count - temp_na_count}")
print(f"Number of records with duplicate data in temperature data set: {temp_count - temp_dup_count}")

## Airport Code Exploration & Assessment

Though airport codes are not needed for the scope of this project, they will be processed anyway in case future analysis requires them.

### Volume

Before the data can be used with Spark, the data types need to be turned into something Spark can parse, since currently everything was read in as object. For now, everything will be cast to a string and adjusted to its proper type in the cleaning step.

In [None]:
print(air_codes_df.columns)

In [None]:
air_codes_df = air_codes_df.astype(str)

In [None]:
air_codes_df.dtypes

In [None]:
# count air code data
air_df = spark.createDataFrame(air_codes_df)
air_count = air_df.count()

In [None]:
print(air_count)

### Data Quality Assessments

First, I will look at the data as a reminder of what is there.

In [None]:
air_df.toPandas().head()

# Step 3: Define the Data Model
## 3.1 Conceptual Data Model
(Map out the conceptual data model and explain why you chose that model)

The data model is a star schema. With so much data in the data warehouse, it is important to keep joins simple, because complex joins would significantly slow down queries.

The US immigration data serves as the fact table, since it records the primary event in question, i.e. people immigrating into the US. The demographic data is a dimension table that joins with the immigration data via the city name. The temperature data is another dimension table, which can join with the immigration data based on the date of arrival. The airport codes can be joined based on the city name.
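To illustrate why a star schema keeps queries simple, here is a small sketch that uses an in-memory SQLite database as a stand-in for Redshift. The tables are heavily trimmed, hypothetical versions of the fact and dimension tables, and every value is made up.

```python
import sqlite3

demo_conn = sqlite3.connect(":memory:")
demo_cur = demo_conn.cursor()

# Hypothetical, trimmed versions of one dimension table and the fact table
demo_cur.execute(
    "CREATE TABLE demographics (dem_id INTEGER PRIMARY KEY, city TEXT, total_population INTEGER)"
)
demo_cur.execute("""
    CREATE TABLE immigration (
        id INTEGER PRIMARY KEY,
        dem_id INTEGER REFERENCES demographics(dem_id),
        arrival_date INTEGER,
        gender TEXT)
""")

demo_cur.execute("INSERT INTO demographics VALUES (1, 'New York', 1000)")
demo_cur.executemany(
    "INSERT INTO immigration (dem_id, arrival_date, gender) VALUES (?, ?, ?)",
    [(1, 20545, "F"), (1, 20545, "M"), (1, 20546, "F")],
)

# A single join from the fact table to a dimension answers a typical question
rows = demo_cur.execute("""
    SELECT d.city, i.arrival_date, COUNT(*) AS arrivals
    FROM immigration i
    JOIN demographics d ON d.dem_id = i.dem_id
    GROUP BY d.city, i.arrival_date
    ORDER BY i.arrival_date
""").fetchall()
print(rows)
```

Each question about arrivals needs at most one join per dimension, which is the property the star schema is chosen for.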

## 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

Currently the data exists as flat files (CSV, plus SAS for the immigration data), and the end goal is a data warehouse that can be queried. AWS will be used for the pipeline, and the steps are as follows:

1. Extract the source data files using Spark, then examine and report on the data quality
2. Clean the data as per the cleaning requirements above
3. Set up AWS using IaC and create the needed S3 buckets for the data lake and the tables needed for Redshift
4. Load the cleaned Spark dataframes into S3 as CSV files
5. Use S3 to load the data into Redshift

## Step 4: Run Pipelines to Model the Data 
## 4.1 Create the data model
Build the data pipelines to create the data model.

# Infrastructure Setup

### Read the configuration data

In [None]:
KEY                    = config.get('AWS','KEY')
SECRET                 = config.get('AWS','SECRET')

DWH_CLUSTER_TYPE       = config.get("DWH","DWH_CLUSTER_TYPE")
DWH_NUM_NODES          = config.get("DWH","DWH_NUM_NODES")
DWH_NODE_TYPE          = config.get("DWH","DWH_NODE_TYPE")

DWH_CLUSTER_IDENTIFIER = config.get("DWH","DWH_CLUSTER_IDENTIFIER")
DWH_DB                 = config.get("DWH","DWH_DB")
DWH_DB_USER            = config.get("DWH","DWH_DB_USER")
DWH_DB_PASSWORD        = config.get("DWH","DWH_DB_PASSWORD")
DWH_PORT               = config.get("DWH","DWH_PORT")

DWH_IAM_ROLE_NAME      = config.get("DWH", "DWH_IAM_ROLE_NAME")

(DWH_DB_USER, DWH_DB_PASSWORD, DWH_DB)

pd.DataFrame({"Param":
                  ["DWH_CLUSTER_TYPE", "DWH_NUM_NODES", "DWH_NODE_TYPE", "DWH_CLUSTER_IDENTIFIER", "DWH_DB", "DWH_DB_USER", "DWH_DB_PASSWORD", "DWH_PORT", "DWH_IAM_ROLE_NAME"],
              "Value":
                  [DWH_CLUSTER_TYPE, DWH_NUM_NODES, DWH_NODE_TYPE, DWH_CLUSTER_IDENTIFIER, DWH_DB, DWH_DB_USER, DWH_DB_PASSWORD, DWH_PORT, DWH_IAM_ROLE_NAME]
             })
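For reference, the sketch below shows the layout that `dwh.cfg` is expected to have, based on the section and option names read in this notebook (the `S3` section is read later, when the COPY queries are built). All values are placeholders and assumptions, not the real settings, and the check itself is a hypothetical helper.

```python
import configparser

# Placeholder values only; the section and option names are the ones this notebook reads
example_cfg = """
[AWS]
KEY = <access-key-id>
SECRET = <secret-access-key>

[DWH]
DWH_CLUSTER_TYPE = multi-node
DWH_NUM_NODES = 4
DWH_NODE_TYPE = dc2.large
DWH_CLUSTER_IDENTIFIER = demo-cluster
DWH_DB = demo_db
DWH_DB_USER = demo_user
DWH_DB_PASSWORD = <password>
DWH_PORT = 5439
DWH_IAM_ROLE_NAME = demo-role

[S3]
IM_DATA = s3://deng-capstone/final_csv/valid_im_data
DEM_DATA = s3://deng-capstone/final_csv/valid_dem_data
TEMP_DATA = s3://deng-capstone/final_csv/valid_temp_data
AIR_DATA = s3://deng-capstone/final_csv/valid_air_data
"""

expected = {
    "AWS": ["KEY", "SECRET"],
    "DWH": ["DWH_CLUSTER_TYPE", "DWH_NUM_NODES", "DWH_NODE_TYPE",
            "DWH_CLUSTER_IDENTIFIER", "DWH_DB", "DWH_DB_USER",
            "DWH_DB_PASSWORD", "DWH_PORT", "DWH_IAM_ROLE_NAME"],
    "S3": ["IM_DATA", "DEM_DATA", "TEMP_DATA", "AIR_DATA"],
}

demo_config = configparser.ConfigParser()
demo_config.read_string(example_cfg)

# List every (section, option) pair the notebook needs but the file lacks
missing = [
    (section, option)
    for section, options in expected.items()
    for option in options
    if not demo_config.has_option(section, option)
]
print(missing)
```

Running a check like this before provisioning anything surfaces a misspelled option name early, rather than midway through cluster creation.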

### Set up clients for EC2, S3, IAM and Redshift

In [None]:
import boto3

ec2 = boto3.resource('ec2',
                       region_name="us-west-2",
                       aws_access_key_id=KEY,
                       aws_secret_access_key=SECRET
                    )

s3 = boto3.resource('s3',
                       region_name="us-west-2",
                       aws_access_key_id=KEY,
                       aws_secret_access_key=SECRET
                   )

iam = boto3.client('iam',aws_access_key_id=KEY,
                     aws_secret_access_key=SECRET,
                     region_name='us-west-2'
                  )

redshift = boto3.client('redshift',
                       region_name="us-west-2",
                       aws_access_key_id=KEY,
                       aws_secret_access_key=SECRET
                       )

### Use S3 client to create a bucket

In [None]:
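# Create bucket
# ClientError must be imported before it is referenced in the except clause
from botocore.exceptions import ClientError

try:
    location = {'LocationConstraint': "us-west-2"}
    s3.create_bucket(Bucket="deng-capstone",
                     CreateBucketConfiguration=location)
except ClientError as e:
    print(e)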

### List the bucket contents to ensure it was created

In [None]:
deng_bucket = s3.Bucket("deng-capstone")

# Output the bucket object names
for obj in deng_bucket.objects.filter():
    print(obj)

### Create IAM Role for redshift to S3 access 

In [None]:
from botocore.exceptions import ClientError
import json

#1.1 Create the role, 
try:
    print("1.1 Creating a new IAM Role") 
    dwhRole = iam.create_role(
        Path='/',
        RoleName=DWH_IAM_ROLE_NAME,
        Description = "Allows Redshift clusters to call AWS services on your behalf.",
        AssumeRolePolicyDocument=json.dumps(
            {'Statement': [{'Action': 'sts:AssumeRole',
               'Effect': 'Allow',
               'Principal': {'Service': 'redshift.amazonaws.com'}}],
             'Version': '2012-10-17'})
    )    
except Exception as e:
    print(e)

In [None]:
# Attach a policy
print("1.2 Attaching Policy")

iam.attach_role_policy(RoleName=DWH_IAM_ROLE_NAME,
                       PolicyArn="arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess"
                      )['ResponseMetadata']['HTTPStatusCode']

In [None]:
# TODO: Get and print the IAM role ARN to make sure it worked
print('1.3 Get the IAM role ARN')
roleArn = iam.get_role(RoleName=DWH_IAM_ROLE_NAME)['Role']['Arn']

print(roleArn)

### Create redshift cluster with provided configs

In [None]:
try:
    response = redshift.create_cluster(        
        #HW
        ClusterType=DWH_CLUSTER_TYPE,
        NodeType=DWH_NODE_TYPE,
        NumberOfNodes=int(DWH_NUM_NODES),

        #Identifiers & Credentials
        DBName=DWH_DB,
        ClusterIdentifier=DWH_CLUSTER_IDENTIFIER,
        MasterUsername=DWH_DB_USER,
        MasterUserPassword=DWH_DB_PASSWORD,
        
        #Roles (for s3 access)
        IamRoles=[roleArn]  
    )
except Exception as e:
    print(e)

### Display cluster info to see when it is available

In [None]:
def prettyRedshiftProps(props):
    pd.set_option('display.max_colwidth', -1)
    keysToShow = ["ClusterIdentifier", "NodeType", "ClusterStatus", "MasterUsername", "DBName", "Endpoint", "NumberOfNodes", 'VpcId']
    x = [(k, v) for k,v in props.items() if k in keysToShow]
    return pd.DataFrame(data=x, columns=["Key", "Value"])

# get 1st cluster from cluster list (only 1 in this case)
myClusterProps = redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)['Clusters'][0]
prettyRedshiftProps(myClusterProps)
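Rather than re-running the cell above by hand until the status changes, the check can be wrapped in a polling loop. This is a minimal sketch; `wait_for_cluster`, its parameters, and the polling interval are hypothetical choices. It relies only on the `describe_clusters` call and the `ClusterStatus` key already used above.

```python
import time


def wait_for_cluster(client, cluster_id, poll_seconds=30, max_polls=40):
    """Poll describe_clusters until ClusterStatus is 'available'.

    Returns the cluster properties dict, or raises TimeoutError.
    """
    for _ in range(max_polls):
        props = client.describe_clusters(ClusterIdentifier=cluster_id)["Clusters"][0]
        if props["ClusterStatus"] == "available":
            return props
        # not ready yet: wait before asking again
        time.sleep(poll_seconds)
    raise TimeoutError(f"{cluster_id} not available after {max_polls} polls")
```

In this notebook it would be called as `myClusterProps = wait_for_cluster(redshift, DWH_CLUSTER_IDENTIFIER)`. boto3 also provides a built-in waiter, `redshift.get_waiter("cluster_available")`, which may be preferable when custom handling is not needed.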

### When cluster is ready, get the info needed to connect to it

In [None]:
### Wait Until Redshift is available before running this
DWH_ENDPOINT = myClusterProps['Endpoint']['Address']
DWH_ROLE_ARN = myClusterProps['IamRoles'][0]['IamRoleArn']
print("DWH_ENDPOINT :: ", DWH_ENDPOINT)
print("DWH_ROLE_ARN :: ", DWH_ROLE_ARN)

### Open an incoming TCP port to access the cluster endpoint

In [None]:
try:
    vpc = ec2.Vpc(id=myClusterProps['VpcId'])
    defaultSg = list(vpc.security_groups.all())[0]
    print(defaultSg)
    
    defaultSg.authorize_ingress(
        GroupName=defaultSg.group_name,
        CidrIp='0.0.0.0/0',
        IpProtocol='TCP',
        FromPort=int(DWH_PORT),
        ToPort=int(DWH_PORT)
    )
except Exception as e:
    print(e)

### Check connection to redshift cluster

In [None]:
%load_ext sql

In [None]:
conn_string="postgresql://{}:{}@{}:{}/{}".format(DWH_DB_USER, DWH_DB_PASSWORD, DWH_ENDPOINT, DWH_PORT,DWH_DB)
print(conn_string)
%sql $conn_string

### When ready, delete cluster (Do not do this until project run through is complete)

In [None]:
#### CAREFUL!!
#-- Uncomment & run to delete the created resources
# redshift.delete_cluster( ClusterIdentifier=DWH_CLUSTER_IDENTIFIER,  SkipFinalClusterSnapshot=True)
#### CAREFUL!!

# Extraction
Since the data was already retrieved so that it could be explored and assessed in a previous step, the extraction step is already handled. All that is left is to transform and load the data.

# Transformation

### Immigration Data Cleaning

The following field is needed for analysis and thus should not be null:
* arrdate: needed to compare with the temperature for that date

In [None]:
# count amount of records after dropping missing arrival dates
nyc_valid = df_spark_nyc.dropna(how="any", subset=["arrdate"])
num_valid_dates = nyc_valid.count()
print(num_valid_dates)

As it turns out, there are no empty dates, which is good. Now I will drop any records that don't have data for the other fields I intend to use for analysis.

In [None]:
nyc_valid = nyc_valid.dropna(how="any", subset=["I94BIR", "I94CIT", "I94RES", "GENDER", "I94MODE"])
num_nonnulls = nyc_valid.count()
print("Number of records dropped due to missing data: " + str(num_valid_dates - num_nonnulls))
print("New Record count is: " + str(num_nonnulls))

Now that records with missing data have been removed, all that is left for the immigration data is to get rid of any duplicates. In this case, I will assume duplicates are multiple records with the same admission number.

In [None]:
nyc_valid = nyc_valid.dropDuplicates(["ADMNUM"])
num_nodups = nyc_valid.count()
print("Number of records dropped due to duplicate data: " + str(num_nonnulls - num_nodups))
print("New Record count is: " + str(num_nodups))

In addition, a full_entry_city column is added so that the entry city can match up with the other tables, which refer to "New York City" as just "New York".

In [None]:
nyc_valid = nyc_valid.select(df_spark.columns)\
            .withColumn("full_entry_city", lit("New York"))

In [None]:
nyc_valid.toPandas().head()

### Demographic Data Cleaning

Next, I will get rid of any records that don't have data in the following fields, as they are essential to my analysis:
* City
* State
* Race
* Count

In [None]:
dem_valid = dem_df_spark.dropna(how="any", subset=["City", "State", "Race", "Count"])
dem_nonull_count = dem_valid.count()

In [None]:
print("Number of records dropped due to missing data: " + str(dem_count - dem_nonull_count))
print("New Record count is: " + str(dem_nonull_count))

As it turns out, there is an additional parsing issue when loading into Redshift: NaN values in numeric fields cannot be parsed. Any NaN values will therefore be changed to a numeric value that clearly indicates an unknown field, such as -1. This keeps the field numeric while still allowing the record to be used.

Since this project is scoped to New York, the analysis is not affected, as those records have valid data.

In [None]:
dem_valid = dem_valid.na.fill(-1)
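As far as I understand the PySpark behavior, `na.fill` with a numeric value only touches numeric columns and leaves string columns alone. The pandas sketch below mimics that on a toy frame (made-up values, hypothetical variable names) to show what the sentinel does and does not replace.

```python
import numpy as np
import pandas as pd

# Toy frame standing in for the demographics data (values are made up)
toy = pd.DataFrame({
    "city": ["New York", "Albany"],
    "number_of_veterans": [100.0, np.nan],
    "state_code": ["NY", None],
})

# Fill only the numeric columns with the -1 sentinel
numeric_cols = toy.select_dtypes(include="number").columns
filled = toy.copy()
filled[numeric_cols] = filled[numeric_cols].fillna(-1)
print(filled)
```

Any later aggregation over these columns has to exclude the -1 rows, otherwise the sentinel skews sums and averages.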

In [None]:
dem_valid.createOrReplaceTempView("dem")

In [None]:
d = spark.sql("""
SELECT * FROM dem
where number_of_veterans = -1
""")
d.toPandas().head()

### World Temperature Data Cleaning

I need to get rid of any records with missing average temperature data, as that is the most important aspect of this data set.

In [None]:
temp_df_valid = temp_df_nyc.dropna(how="any", subset=["AverageTemperature"])
temp_valid_count = temp_df_valid.count()

In [None]:
print("Number of records dropped due to missing data: " + str(temp_df_nyc.count() - temp_valid_count))
print("New Record count is: " + str(temp_valid_count))

### Airport Codes Cleaning

This data set needs to be processed a bit differently. Because everything had to be parsed as a string, I can't look for the value NaN, but I can look for it as the string 'nan'. Duplicates are any records with the same identifier (ident).

In [None]:
# count records where any of these fields holds the string 'nan'
air_na_count = air_df.filter("""continent == 'nan' or iso_country == 'nan' 
or iso_region == 'nan' or municipality == 'nan'
""").count()
air_dup_count = air_df.dropDuplicates(["ident"]).count()

# the filter already selects the records with missing data, so no subtraction is needed
print(f"Number of records with missing data in air codes data set: {air_na_count}")
print(f"Number of records with duplicate data in the air codes data set: {air_count - air_dup_count}")

In [None]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def remove_quote(field):
    """Remove double quotes from every word in the field."""
    l = field.split()
    for index, word in enumerate(l):
        if '"' in word:
            l[index] = word.replace('"', "")
    return " ".join(l)

# register the function so it can be called as unQuote(...) in Spark SQL
spark.udf.register("unQuote", remove_quote, StringType())
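The quote-stripping logic can be checked on plain strings before it runs inside Spark. The sketch below is a hypothetical variant, `strip_double_quotes`, not the function registered above: it passes `None` through unchanged and leaves whitespace as it is instead of splitting and re-joining words.

```python
def strip_double_quotes(field):
    """Drop double quotes from a text field; pass None through unchanged."""
    if field is None:
        return None
    return field.replace('"', "")


# Made-up airport name containing double quotes
cleaned = strip_double_quotes('Lowell "Field" Airport')
print(cleaned)
```

Handling `None` explicitly matters if the column is ever loaded without the cast to string, since a null would then reach the function.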

In [None]:
# run udf to remove field usage of double quotes due to redshift conflict
air_df.createOrReplaceTempView("air_codes")
air_df = spark.sql("""
SELECT ident,
 type,unQuote(name) as Name, elevation_ft, continent, iso_country,
 iso_region, municipality, gps_code, iata_code,
 local_code, coordinates 
 FROM air_codes
""")
air_df.toPandas().head()

In [None]:
# nyc_valid.filter(nyc_valid.full_entry_city == "New York").toPandas().head()
# dem_valid.filter(dem_valid.city == "New York").toPandas().head()
# temp_df_valid.filter(temp_df_valid.City == "New York").toPandas().head()
# air_df.filter(air_df.municipality == "New York").toPandas().head()

As it turns out, there is a lot of missing data in the air codes, so the original data should be examined to find out why. For my purposes, I will just upload the full data set.

# Loading

## Upload to S3

In [None]:
# write valid data frames to csv
folderpath = "./final_csv"
nyc_valid.write.format("csv").option("header", "False").save(f"{folderpath}/valid_im_data")
dem_valid.write.format("csv").option("header", "False").save(f"{folderpath}/valid_dem_data")
temp_df_valid.write.format("csv").option("header", "False").save(f"{folderpath}/valid_temp_data")
air_df.write.format("csv").option("header", "False").save(f"{folderpath}/valid_air_data")

In [None]:
import os
# define upload func
# deng_bucket is the S3 bucket object created during infrastructure setup
def upload_files(path):
    for subdir, dirs, files in os.walk(path):
        for file in files:
            full_path = os.path.join(subdir, file)
            with open(full_path, 'rb') as data:
                deng_bucket.put_object(Key=full_path[2:], Body=data)

# upload csvs to S3
folder_paths = [f"{folderpath}/valid_im_data", f"{folderpath}/valid_dem_data", f"{folderpath}/valid_temp_data",\
               f"{folderpath}/valid_air_data"]
for data_path in folder_paths:
    upload_files(data_path)
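The upload above derives each S3 key by slicing the leading `./` off the local path. A slightly more defensive way to build the key is sketched below; `s3_key_for` and the file name `part-00000.csv` are hypothetical, used only for illustration.

```python
import os


def s3_key_for(local_path, root="."):
    """Build an S3 object key from a local path, relative to `root`."""
    rel = os.path.relpath(local_path, start=root)
    # S3 keys use forward slashes regardless of the local OS
    return rel.replace(os.sep, "/")


key = s3_key_for("./final_csv/valid_im_data/part-00000.csv")
print(key)
```

This keeps working if `folderpath` is later changed to a path that does not start with `./`.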

## S3 to Redshift

### Redshift Data Model Setup
Once a Redshift cluster has been provisioned, the star schema model and the staging tables must be set up on it.

In [None]:
import psycopg2

# setting up postgres connection to redshift cluster
conn = psycopg2.connect(f"host={DWH_ENDPOINT} dbname={DWH_DB} user={DWH_DB_USER} password={DWH_DB_PASSWORD} port={DWH_PORT}")
cur = conn.cursor()

In [None]:
# set up create queries
staging_img_drop = "DROP TABLE IF EXISTS staging_img;"
staging_dem_drop = "DROP TABLE IF EXISTS staging_dem;"
staging_temp_drop = "DROP TABLE IF EXISTS staging_temp;"
staging_air_drop = "DROP TABLE IF EXISTS staging_air;"

immigration_drop = "DROP TABLE IF EXISTS immigration;"
demographic_drop = "DROP TABLE IF EXISTS demographics;"
tempurature_drop = "DROP TABLE IF EXISTS tempuratures;"
airport_drop = "DROP TABLE IF EXISTS airports;"

drop_queries = [staging_img_drop, staging_dem_drop, staging_temp_drop, staging_air_drop, immigration_drop, demographic_drop, tempurature_drop, airport_drop]

staging_img_create = ("""
CREATE TABLE IF NOT EXISTS staging_img(
id bigint identity(0, 1), cicid numeric, year numeric, month numeric, city_code numeric,
res_code numeric, entry_city varchar, arrival_date numeric, arrival_mode numeric, state varchar, 
dep_date numeric, age numeric, visa_type numeric, count numeric, dtadfile varchar, visapost varchar, 
occupation varchar, entdepa varchar, entdepd varchar, entdepu varchar, matflag varchar, birth_year numeric, 
dtaddto varchar, gender varchar, insnum varchar, airline varchar, admission_num numeric, 
fltno varchar, visa_class varchar, full_entry_city varchar);""")

staging_dem_create = ("""
CREATE TABLE IF NOT EXISTS staging_dem(
city varchar, state varchar, median_age numeric, male_population numeric, female_population numeric,
total_population bigint, number_of_veterans numeric, foreign_born numeric,
average_household_size numeric, state_code varchar, race varchar, count bigint) 
""")

staging_temp_create = ("""
CREATE TABLE IF NOT EXISTS staging_temp(
datetime varchar, average_temperature numeric, average_temperature_uncertainty numeric, city varchar,
country varchar, latitude varchar, longitude varchar) 
""")

staging_air_create = ("""
CREATE TABLE IF NOT EXISTS staging_air(
airport_id varchar, type varchar, name varchar, elevation_ft varchar, continent varchar, iso_country varchar,
iso_region varchar, city varchar, gps_code varchar, iata_code varchar, local_code varchar,
coordinates varchar) 
""")

# create final tables ----------------
# join entry_city on immigration to city field in the other tables
demographic_create = ("""
CREATE TABLE IF NOT EXISTS demographics(
dem_id bigint identity(0, 1) PRIMARY KEY, city varchar, state varchar, median_age numeric, male_population numeric, female_population numeric,
total_population bigint, number_of_veterans numeric, foreign_born numeric,
average_household_size numeric, state_code varchar, race varchar, count bigint) 
""")

tempurature_create = ("""
CREATE TABLE IF NOT EXISTS tempuratures(
temp_id bigint identity(0, 1) PRIMARY KEY, datetime varchar, average_temperature numeric, average_temperature_uncertainty numeric, city varchar,
country varchar, latitude varchar, longitude varchar) 
""")

airport_codes_create = ("""
CREATE TABLE IF NOT EXISTS airports(
airport_id varchar PRIMARY KEY, type varchar, name varchar, elevation_ft varchar, continent varchar, iso_country varchar,
iso_region varchar, city varchar, gps_code varchar, iata_code varchar, local_code varchar,
coordinates varchar) 
""")

immigration_create = ("""
CREATE TABLE IF NOT EXISTS immigration(
id bigint identity(0, 1) PRIMARY KEY,  dem_id bigint, temp_id bigint, airport_id varchar, 
cicid numeric, year numeric, month numeric, city_code numeric,
res_code numeric, entry_city varchar, arrival_date numeric, arrival_mode numeric, state varchar, 
dep_date numeric, age numeric, visa_type numeric, count numeric, dtadfile varchar, visapost varchar, 
occupation varchar, entdepa varchar, entdepd varchar, entdepu varchar, matflag varchar, birth_year numeric, 
dtaddto varchar, gender varchar, insnum varchar, airline varchar, admission_num numeric, 
fltno varchar, visa_class varchar, full_entry_city varchar,
FOREIGN KEY(dem_id) REFERENCES demographics(dem_id),
FOREIGN KEY(temp_id) REFERENCES tempuratures(temp_id),
FOREIGN KEY(airport_id) REFERENCES airports(airport_id));""")

create_queries = [staging_img_create, staging_dem_create, staging_temp_create, staging_air_create, demographic_create, tempurature_create, airport_codes_create, immigration_create]

In [None]:
# create or reset database by executing drop and create queries
for query in drop_queries:
    print(query)
    cur.execute(query)
    conn.commit()

In [None]:
for query in create_queries:
    print(query)
    cur.execute(query)
    conn.commit()

### S3 to Redshift Loading
Now the data can be loaded from S3 into redshift

In [None]:
# setting up the redshift query to handle the loading
# Need to update data sources
staging_img_copy = ("""
COPY staging_img FROM '{}/part' 
CREDENTIALS 'aws_iam_role={}'
csv REGION 'us-west-2'
delimiter ',';
""").format(config.get("S3", "IM_DATA"), DWH_ROLE_ARN)

staging_dem_copy = ("""
COPY staging_dem FROM '{}/part' 
CREDENTIALS 'aws_iam_role={}'
csv REGION 'us-west-2'
delimiter ',';
""").format(config.get("S3", "DEM_DATA"), DWH_ROLE_ARN)

staging_temp_copy = ("""
COPY staging_temp FROM '{}/part' 
CREDENTIALS 'aws_iam_role={}'
csv REGION 'us-west-2'
delimiter ',';
""").format(config.get("S3", "TEMP_DATA"), DWH_ROLE_ARN)

staging_air_copy = ("""
COPY staging_air FROM '{}/part' 
CREDENTIALS 'aws_iam_role={}'
csv REGION 'us-west-2'
delimiter ',';
""").format(config.get("S3", "AIR_DATA"), DWH_ROLE_ARN)

staging_queries = [staging_img_copy, staging_dem_copy, staging_temp_copy, staging_air_copy]
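The four COPY statements differ only in the table name and the S3 prefix, so they could be generated by one helper. This is a sketch under that assumption; `build_copy_query` is a hypothetical name, and the prefix and role ARN below are placeholders.

```python
def build_copy_query(table, s3_prefix, role_arn, region="us-west-2"):
    """Return a Redshift COPY statement for CSV part files under s3_prefix."""
    return (
        f"COPY {table} FROM '{s3_prefix}/part'\n"
        f"CREDENTIALS 'aws_iam_role={role_arn}'\n"
        f"csv REGION '{region}'\n"
        "delimiter ',';"
    )


# Placeholder prefix and role ARN, for illustration only
demo_query = build_copy_query(
    "staging_dem",
    "s3://deng-capstone/final_csv/valid_dem_data",
    "arn:aws:iam::123456789012:role/demoRole",
)
print(demo_query)
```

With a helper like this, adding another staging table means adding one line rather than another near-identical query string.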

In [None]:
# execute the redshift query
from psycopg2 import Error
for query in staging_queries:
    try:
        print(query)
        cur.execute(query)
    except Error as e:
        print(e.diag.message_primary)
    conn.commit()

In [None]:
# set up dimension table loading queries
insert_dem = ("""
INSERT INTO demographics(city, state, median_age, male_population, female_population,
total_population, number_of_veterans, foreign_born,
average_household_size, state_code, race, count)
(
SELECT city, state, median_age, male_population, female_population,
total_population, number_of_veterans, foreign_born,
average_household_size, state_code, race, count
FROM staging_dem
)""")

insert_temp = ("""
INSERT INTO tempuratures(datetime, average_temperature , average_temperature_uncertainty, city,
country, latitude, longitude)
( SELECT * FROM staging_temp )""")

insert_air = ("""
INSERT INTO airports(airport_id, type, name, elevation_ft, continent, iso_country,
iso_region, city, gps_code, iata_code, local_code, coordinates)
(
SELECT airport_id, type, name, elevation_ft, continent, iso_country,
iso_region, city, gps_code, iata_code, local_code, coordinates
FROM staging_air
)""")

insert_im = ("""
INSERT INTO immigration(dem_id, temp_id, airport_id, 
cicid, year, month, city_code,
res_code, entry_city, arrival_date, arrival_mode, state, 
dep_date, age, visa_type, count, dtadfile, visapost, 
occupation, entdepa, entdepd, entdepu, matflag, birth_year, 
dtaddto, gender, insnum, airline, admission_num, fltno, visa_class, full_entry_city)
(
SELECT d.dem_id, t.temp_id, a.airport_id, i.cicid, i.year, i.month, i.city_code, i.res_code, i.entry_city , i.arrival_date, i.arrival_mode, i.state, 
i.dep_date, i.age, i.visa_type, i.count, i.dtadfile, i.visapost, i.occupation, i.entdepa, i.entdepd , i.entdepu , i.matflag, i.birth_year, 
i.dtaddto, i.gender, i.insnum, i.airline, i.admission_num, i.fltno, i.visa_class, i.full_entry_city
FROM staging_img i
LEFT JOIN demographics d ON i.entry_city = d.city
LEFT JOIN tempuratures t ON i.entry_city = t.city
LEFT JOIN airports a ON i.entry_city = a.city
)""")

# make sure all dim tables added
insert_queries = [insert_air, insert_dem, insert_temp]

In [None]:
# execute dimension table loading queries
for query in insert_queries:
    print(query)
    cur.execute(query)
    conn.commit()

In [None]:
# execute fact table loading query
query = insert_im
cur.execute(query)
conn.commit()

# 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness

## Completeness Checks

In [None]:
cur.execute("SELECT COUNT(*) FROM immigration")
im_db_count = cur.fetchone()
print(f"The count for the dataframe is {nyc_valid.count()} and the count for the db is {im_db_count[0]}")

In [None]:
cur.execute("SELECT COUNT(*) FROM tempuratures")
temp_db_count = cur.fetchone()
print(f"The count for the dataframe is {temp_df_valid.count()} and the count for the db is {temp_db_count[0]}")

In [None]:
cur.execute("SELECT COUNT(*) FROM airports")
air_db_count = cur.fetchone()
print(f"The count for the dataframe is {air_df.count()} and the count for the db is {air_db_count[0]}")

In [None]:
cur.execute("SELECT COUNT(*) FROM demographics")
dem_db_count = cur.fetchone()
print(f"The count for the dataframe is {dem_valid.count()} and the count for the db is {dem_db_count[0]}")
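The checks above print both counts and leave the comparison to the reader. A check that fails loudly could look like the sketch below. `check_row_count` is a hypothetical helper, and an in-memory SQLite database stands in for Redshift so the example runs on its own.

```python
import sqlite3


def check_row_count(cursor, table, expected):
    """Raise ValueError if `table` does not hold exactly `expected` rows."""
    # The table name is interpolated, so only pass trusted, hard-coded names
    cursor.execute(f"SELECT COUNT(*) FROM {table}")
    actual = cursor.fetchone()[0]
    if actual != expected:
        raise ValueError(f"{table}: expected {expected} rows, found {actual}")
    return actual


# Stand-in database so the sketch runs without a Redshift cluster
demo_conn = sqlite3.connect(":memory:")
demo_cur = demo_conn.cursor()
demo_cur.execute("CREATE TABLE airports (airport_id TEXT)")
demo_cur.executemany("INSERT INTO airports VALUES (?)", [("00A",), ("00B",)])
print(check_row_count(demo_cur, "airports", 2))
```

Against the real cluster, the same function could be called with `cur` and the dataframe counts, e.g. `check_row_count(cur, "airports", air_df.count())`.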

## Data Model Checks

In [None]:
%sql SELECT * FROM immigration LIMIT 5

In [None]:
%sql SELECT * FROM demographics LIMIT 5

In [None]:
%sql SELECT * FROM tempuratures LIMIT 5

In [None]:
%sql SELECT * FROM airports LIMIT 5

Note: dates were left as SAS numeric dates to allow flexibility in parsing depending on where the data is used.
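A SAS numeric date counts days since 1960-01-01, which is why the queries below add the value to that date. Outside the database, the same conversion can be sketched in plain Python; `sas_to_date` is a hypothetical helper name.

```python
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)


def sas_to_date(sas_days):
    """Convert a SAS numeric date (days since 1960-01-01) to a datetime.date."""
    return SAS_EPOCH + timedelta(days=int(sas_days))


print(sas_to_date(20545.0))  # 2016-04-01
```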

In [None]:
%sql SELECT to_date('19600101', 'YYYYMMDD') + cast(arrival_date as integer) AS "Date", COUNT(*) FROM immigration group by arrival_date LIMIT 5

In [None]:
cur.execute("""
SELECT to_date('19600101', 'YYYYMMDD') + cast(arrival_date as integer) AS "Date", COUNT(*) FROM immigration group by arrival_date order by "Date" asc
""")
res_df = pd.DataFrame(cur.fetchall())
res_df.columns = ["date", "count"]
res_df.head()

In [None]:
%matplotlib inline

In [None]:
from matplotlib import pyplot as plt
from matplotlib.pyplot import figure
figure(figsize=(12, 6))
plt.plot(res_df["date"], res_df["count"])

# 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

All dictionaries are in the data_dictionary folder

# Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.

The primary tools used in this project are Spark, Pandas, S3 for the data lake, and Redshift for the dimensional model. For data processing and exploration, Spark is the key tool: it provides the parallelized abstraction of the Spark data frame and operates on that data structure in memory, making the ETL process much faster than traditional data processing tools such as MapReduce. For displaying the data, Pandas is used because it integrates well with Jupyter notebooks, making tables look much cleaner than a simple df.show() on a Spark data frame. S3 is used as the data lake because it provides an easy-to-use interface for storing lots of files at a relatively low cost. On top of this, it integrates well with Redshift and allows the loading to be done in parallel, just as Spark allowed for the transformations. Redshift is used because, as a column-based database that can be queried through a PostgreSQL interface, it provides both the query speed that a data warehouse needs and the familiarity of PostgreSQL, supporting existing SQL-based workloads such as those an analyst might use.

How often the data should be updated depends on how this pipeline is used. This project prepared a data set for examining possible relationships between temperature and immigration. If this information is to be used for long-term urban planning projects, less frequent updates, such as daily throughout the project's life, would suffice. If, however, the data set will be used for operational decisions that need to be made faster, such as new immigration decisions, it must be updated much more frequently, such as hourly.

If the data were increased 100x, I would have to be much more careful about operating on it and tracking changes. In all likelihood I would load the data set into S3 first to reduce on-prem storage requirements. I would also increase the number of nodes in the Redshift cluster to handle the extra data loading.

Thankfully, I went through this project keeping in mind that someone will need to use it. This is why I chose Redshift. Since it can be interfaced with using PostgreSQL, pretty much any analyst or BI specialist, for whom SQL knowledge is a basic requirement, can interact with the data. In addition, because of the popularity of PostgreSQL, most popular dashboard tools such as Tableau and Power BI will likely have native, easy-to-use connectors to PostgreSQL databases. The requirement that the dashboard be updated at a specific time each day makes a scheduling tool like Airflow more necessary; it would handle running all of the code in this notebook before the dashboard needs to be examined each day.

If the data needed to be accessed by 100+ people, the Redshift configuration can be changed to make it more available, for example by having Redshift serve multiple availability zones. In addition, if there are common workloads or queries that many of these people need to run, they can be prepared in advance, for example as pre-aggregated OLAP cubes.