# Explore destinations of travellers entering the U.S. with Spark and Redshift
### Data Engineering Capstone Project

#### Project Summary
The objective of this project is to build a data model that gathers information about travellers entering the U.S. together with the natural and social environment of their destination state, which may help in understanding immigration and population-flow trends. For example, if many male travellers from country A immigrate to state B for work, and state B has good weather, a younger median age, or more foreign-born residents, further analysis can correlate these demographic conditions with the immigration figures.

The project gathers, assesses and cleans data from various sources, builds a data lake using Spark, and creates ETL pipelines that load the final model into Redshift.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [40]:
# all imports and installs here
import pandas as pd
from pyspark.sql import SparkSession

from pyspark.sql.functions import udf, col,when, isnan, year, month, dayofmonth, hour, weekofyear, date_format, to_date, min, max,desc
from pyspark.sql.types import StructType, StructField, DoubleType,FloatType, StringType, IntegerType, DateType

from datetime import date, datetime,timedelta

import configparser
import psycopg2
import os
from sql_queries import create_table_queries, drop_table_queries, copy_table_queries, insert_table_queries_dim, insert_table_queries_fact

In [41]:
# input
fname_immg = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
fname_immg_label = 'I94_SAS_Labels_Descriptions.SAS'

fname_tmpt = '../../data2/GlobalLandTemperaturesByCity.csv'

fname_dmgr = 'us-cities-demographics.csv'

In [57]:
# CONFIG
config = configparser.ConfigParser()
config.read('dwh.cfg')

AWS_ACCESS_KEY_ID=config['AWS']['AWS_ACCESS_KEY_ID']
AWS_SECRET_ACCESS_KEY=config['AWS']['AWS_SECRET_ACCESS_KEY']

"""
AWS_ACCESS_KEY_ID=config['AWS']['AWS_ACCESS_KEY_ID']
AWS_SECRET_ACCESS_KEY=config['AWS']['AWS_SECRET_ACCESS_KEY']

os.environ['AWS_ACCESS_KEY_ID']= config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']= config['AWS']['AWS_SECRET_ACCESS_KEY']
"""


In [58]:


spark = SparkSession.builder.\
    config("spark.jars.repositories", "https://repos.spark-packages.org/").\
    config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11,org.apache.hadoop:hadoop-aws:2.7.5").\
    config("spark.hadoop.fs.s3a.access.key", AWS_ACCESS_KEY_ID).\
    config("spark.hadoop.fs.s3a.secret.key", AWS_SECRET_ACCESS_KEY).\
    enableHiveSupport().getOrCreate()



In [None]:
# test upload to s3

df_dmgr = spark.read.options(delimiter=';',header='true').csv(fname_dmgr)
#df_dmgr.write.parquet("s3a://awsbucketmxy666/capstone/",mode="overwrite")
df_dmgr.write.parquet("s3a://aws-emr-resources-508919957385-us-east-2/cap/US_demographics",mode="overwrite")


### Step 1: Scope the Project and Gather Data

#### Scope 
The data sources comprise one SAS dataset and two CSV files. The project builds a data lake from these raw sources with Spark, a big-data processing framework, and stores the cleaned data in an S3 bucket on AWS. It then performs an ETL process that extracts the data from S3, transforms it, and loads the desired model into a Redshift data warehouse, where other analytic roles can perform further analysis.

#### Describe and Gather Data 
- I94 Immigration Data: This data comes from the US National Tourism and Trade Office and contains details of travellers who land in the U.S. This project uses the April 2016 data as a sample.
More information: https://travel.trade.gov/research/reports/i94/historical/2016.html
- World Temperature Data: This dataset comes from Kaggle and contains the monthly average temperature of major cities around the world. This project selects data ranging from 1993 to 2013.
More information: https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data
- U.S. City Demographic Data: This data comes from OpenDataSoft and contains demographic information for the U.S. at city level.
More information: https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/

- dataset 1: Immigration

In [6]:
df_i94 = spark.read.format('com.github.saurfang.sas.spark').load(fname_immg)

In [None]:
# Read in the data here --TOO SLOW
df = pd.read_sas(fname_immg, 'sas7bdat', encoding="ISO-8859-1")
df.head()

In [13]:
# preview the dataset
pd.set_option('display.max_columns', None)

print(df_i94.count())
df_i94.printSchema()
df_i94.limit(5).toPandas()

3096313
root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nul

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,dtadfile,visapost,occup,entdepa,entdepd,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,6.0,2016.0,4.0,692.0,692.0,XXX,20573.0,,,,37.0,2.0,1.0,,,,T,,U,,1979.0,10282016,,,,1897628000.0,,B2
1,7.0,2016.0,4.0,254.0,276.0,ATL,20551.0,1.0,AL,,25.0,3.0,1.0,20130811.0,SEO,,G,,Y,,1991.0,D/S,M,,,3736796000.0,296.0,F1
2,15.0,2016.0,4.0,101.0,101.0,WAS,20545.0,1.0,MI,20691.0,55.0,2.0,1.0,20160401.0,,,T,O,,M,1961.0,09302016,M,,OS,666643200.0,93.0,B2
3,16.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,28.0,2.0,1.0,20160401.0,,,O,O,,M,1988.0,09302016,,,AA,92468460000.0,199.0,B2
4,17.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,4.0,2.0,1.0,20160401.0,,,O,O,,M,2012.0,09302016,,,AA,92468460000.0,199.0,B2


- dataset 2: Temperature

In [7]:
df_tmpt = spark.read.options(header='True').csv(fname_tmpt)

In [7]:
# preview the dataset
print(df_tmpt.count())
df_tmpt.show(5)
df_tmpt.printSchema()

8599212
+----------+------------------+-----------------------------+-----+-------+--------+---------+
|        dt|AverageTemperature|AverageTemperatureUncertainty| City|Country|Latitude|Longitude|
+----------+------------------+-----------------------------+-----+-------+--------+---------+
|1743-11-01|             6.068|           1.7369999999999999|Århus|Denmark|  57.05N|   10.33E|
|1743-12-01|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-01-01|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-02-01|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-03-01|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
+----------+------------------+-----------------------------+-----+-------+--------+---------+
only showing top 5 rows

root
 |-- dt: string (nullable = true)
 |-- AverageTemperature: string (nullable = true)
 |-- AverageTemper

 - dataset 3: Demographics

In [8]:
df_dmgr = spark.read.options(delimiter=';',header='true').csv(fname_dmgr)

In [108]:
# preview the dataset
print(df_dmgr.count())
df_dmgr.show(5)
df_dmgr.printSchema()

2891
+----------------+-------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+-----+
|            City|        State|Median Age|Male Population|Female Population|Total Population|Number of Veterans|Foreign-born|Average Household Size|State Code|                Race|Count|
+----------------+-------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+-----+
|   Silver Spring|     Maryland|      33.8|          40601|            41862|           82463|              1562|       30908|                   2.6|        MD|  Hispanic or Latino|25924|
|          Quincy|Massachusetts|      41.0|          44129|            49500|           93629|              4147|       32935|                  2.39|        MA|               White|58723|
|          Hoover|      Alabama|      38.5|          38

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc, and explore the distribution of values in the dataset to have an overview of their content.

#### Cleaning Steps
Document steps necessary to clean the data

- dataset 1.1: label table: 
    - turn the ".sas" file into a dictionary

In [9]:
with open(fname_immg_label) as f:
    f_content = f.read()
    f_content = f_content.replace('\t', '')

# can't read the file using spark or pandas
'''
df_i94_label = spark.read.format('com.github.saurfang.sas.spark').load(fname_immg_label)
df_i94_label = pd.read_sas(fname_immg_label, 'sas7bdat', encoding="ISO-8859-1") 
df_i94_label.head()
'''


In [10]:
# map one section of the SAS label file into a {code: label} dictionary
def code_mapper(file, idx):
    # keep the text from the section name `idx` up to the terminating ';'
    f_content_clean = file[file.index(idx):]
    f_content_clean = f_content_clean[:f_content_clean.index(';')].split('\n')
    f_content_clean = [i.replace("'", "") for i in f_content_clean]
    
    # skip the first line (it holds the section name), then split "code = label" pairs
    dic = [i.split('=') for i in f_content_clean[1:]]
    dic = dict([i[0].strip(), i[1].strip()] for i in dic if len(i) == 2)
    
    return dic

# create dictionaries to map values into immigration
i94cit_res = code_mapper(f_content, "i94cntyl")
i94port = code_mapper(f_content, "i94prtl")
i94mode = code_mapper(f_content, "i94model")
i94addr = code_mapper(f_content, "i94addrl")
i94visa = code_mapper(f_content, "I94VISA")
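
To make the parsing step concrete, here is a minimal, self-contained sketch of the same slicing logic applied to a short excerpt. The excerpt is only an illustration of the label-file layout (a section name, `code = 'label'` lines, and a closing `;`), and `parse_label_section` is a hypothetical stand-in for `code_mapper`, not a replacement for it.

```python
# Illustrative excerpt only; the real label file is much longer.
SAMPLE_LABELS = (
    "value i94model\n"
    "\t1 = 'Air'\n"
    "\t2 = 'Sea'\n"
    "\t3 = 'Land'\n"
    "\t9 = 'Not reported' ;\n"
)


def parse_label_section(content, section):
    """Return {code: label} for the section whose name is `section`."""
    content = content.replace("\t", "")
    # Keep the text from the section name up to the terminating semicolon.
    body = content[content.index(section):]
    body = body[:body.index(";")]

    mapping = {}
    # The first line holds the section name itself, so skip it.
    for line in body.split("\n")[1:]:
        parts = line.replace("'", "").split("=")
        if len(parts) == 2:
            mapping[parts[0].strip()] = parts[1].strip()
    return mapping


i94mode_sample = parse_label_section(SAMPLE_LABELS, "i94model")
print(i94mode_sample)
```

Keys stay strings, which is why the UDFs later convert the numeric codes with `str(int(x))` before the lookup.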

In [11]:
# clean the dictionary to reduce invalid data
def clean_dic(dic,keywords):
    for key, value in dic.items():
        for keyword in keywords:
            if value.startswith(keyword):
                dic[key] = "others"
    return dic

keywords=["INVALID","No Country Code","Collapsed","No PORT Code"]
i94cit_res=clean_dic(i94cit_res,keywords)
i94port=clean_dic(i94port,keywords)

 - dataset 1.2: main table

In [12]:
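## Performing cleaning tasks
# map the coded values to their labels using the dictionaries above
get_i94cit_res = udf(lambda x: i94cit_res.get(str(int(x))) if x is not None else x )
get_i94port = udf(lambda x: i94port.get(x))
get_i94mode = udf(lambda x: i94mode.get(str(int(x))) if x is not None else x )
get_i94addr = udf(lambda x: i94addr.get(x))
get_i94visa = udf(lambda x: i94visa.get(str(int(x))) if x is not None else x )

# convert a SAS date (days since 1960-01-01) into a 'YYYY-MM-DD' string
sas2dt = udf(lambda x: (datetime(1960,1,1) + timedelta(days = x)).strftime('%Y-%m-%d') if x is not None else x )

# notes on the chain below:
# - i94bir is overwritten with the traveller's current age (this year minus birth year)
# - admnum can exceed the 32-bit integer range, so the IntegerType cast caps
#   large values at 2147483647 (visible in the preview)
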
df_i94_clean =df_i94.withColumn('i94cit', get_i94cit_res(df_i94.i94cit)) \
                .withColumn('i94port', get_i94port(df_i94.i94port)) \
                .withColumn('i94mode', get_i94mode(df_i94.i94mode)) \
                .withColumn('i94addr', get_i94addr(df_i94.i94addr)) \
                .withColumn('i94visa', get_i94visa(df_i94.i94visa)) \
                .drop("i94res","dtadfile", "count","visapost", "occup", "entdepa", "entdepd", "entdepu") \
                .withColumn("cicid",col("cicid").cast(IntegerType())) \
                .withColumn("i94yr",col("i94yr").cast(IntegerType())) \
                .withColumn("i94mon",col("i94mon").cast(IntegerType()))\
                .withColumn("biryear",col("biryear").cast(IntegerType()))\
                .withColumn("admnum",col("admnum").cast(IntegerType())) \
                .withColumn("i94bir",date.today().year-col("biryear")) \
                .withColumn("dtaddto",to_date(col("dtaddto"),"MMddyyyy")) \
                .withColumn("arrdate",to_date(sas2dt(df_i94.arrdate))) \
                .withColumn("depdate",to_date(sas2dt(df_i94.depdate))) \
                .withColumnRenamed("cicid","cic_id") \
                .withColumnRenamed("i94yr","i94_year") \
                .withColumnRenamed("i94mon","i94_month") \
                .withColumnRenamed("i94cit","i94_incoming_country") \
                .withColumnRenamed("i94port","i94_landing_port") \
                .withColumnRenamed("arrdate","arrival_date") \
                .withColumnRenamed("i94visa","i94_visa") \
                .withColumnRenamed("i94mode","i94_travel_mode") \
                .withColumnRenamed("i94addr","i94_address") \
                .withColumnRenamed("depdate","departure_date") \
                .withColumnRenamed("i94bir","i94_age") \
                .withColumnRenamed("matflag","match_flag") \
                .withColumnRenamed("biryear","birth_year") \
                .withColumnRenamed("dtaddto","admitted_date") \
                .withColumnRenamed("insnum","INS_no") \
                .withColumnRenamed("admnum","admission_no") \
                .withColumnRenamed("fltno","flight_no") \
                .withColumnRenamed("visatype","visa_type")

df_i94_clean.limit(5).toPandas() # .select('i94cit_res').drop_duplicates()

Unnamed: 0,cic_id,i94_year,i94_month,i94_incoming_country,i94_landing_port,arrival_date,i94_travel_mode,i94_address,departure_date,i94_age,i94_visa,match_flag,birth_year,admitted_date,gender,INS_no,airline,admission_no,flight_no,visa_type
0,6,2016,4,ECUADOR,NOT REPORTED/UNKNOWN,2016-04-29,,,,42,Pleasure,,1979,2016-10-28,,,,1897628485,,B2
1,7,2016,4,,"ATLANTA, GA",2016-04-07,Air,ALABAMA,,30,Student,,1991,,M,,,2147483647,296.0,F1
2,15,2016,4,ALBANIA,WASHINGTON DC,2016-04-01,Air,MICHIGAN,2016-08-25,60,Pleasure,M,1961,2016-09-30,M,,OS,666643185,93.0,B2
3,16,2016,4,ALBANIA,"NEW YORK, NY",2016-04-01,Air,MASSACHUSETTS,2016-04-23,33,Pleasure,M,1988,2016-09-30,,,AA,2147483647,199.0,B2
4,17,2016,4,ALBANIA,"NEW YORK, NY",2016-04-01,Air,MASSACHUSETTS,2016-04-23,9,Pleasure,M,2012,2016-09-30,,,AA,2147483647,199.0,B2
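
The date conversion performed by the `sas2dt` UDF can be checked without Spark. The sketch below is plain Python and uses raw `arrdate` and `depdate` values from the preview of the source data; `sas_days_to_iso` is a hypothetical helper name.

```python
from datetime import date, timedelta

# SAS stores dates as the number of days since 1960-01-01.
SAS_EPOCH = date(1960, 1, 1)


def sas_days_to_iso(days):
    """Convert a SAS day count to 'YYYY-MM-DD'; pass None through unchanged."""
    if days is None:
        return None
    return (SAS_EPOCH + timedelta(days=int(days))).isoformat()


for raw in (20573.0, 20551.0, 20545.0, 20691.0, None):
    print(raw, "->", sas_days_to_iso(raw))
```

The first three results match the `arrival_date` column in the cleaned preview (2016-04-29, 2016-04-07, 2016-04-01), and 20691 matches the `departure_date` 2016-08-25.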


In [None]:
## explore the data by column
# df_i94_clean.groupBy('i94addr').count().orderBy(desc('count')).show(10)
df_i94_clean.groupBy('i94_incoming_country').count().orderBy(desc('count')).show()
df_i94_clean.groupBy('match_flag').count().show()
df_i94_clean.groupBy('gender').count().show()

In [None]:
#write to parquet
df_i94_clean.write.parquet("i94_2016_apr") #.partitionBy('i94addr')

- dataset 2: Temperature

In [13]:
# the range of the dates: 1743-2013
min_date, max_date = df_tmpt.select(min("dt"), max("dt")).first()
# set 20 year before last recorded year as start date
start_year = datetime.strptime(max_date, '%Y-%m-%d').year - 20

min_date, max_date, start_year

('1743-11-01', '2013-09-01', 1993)

In [14]:
# Performing cleaning tasks here

df_tmpt_clean=df_tmpt.where(df_tmpt.AverageTemperature.isNotNull()) \
                    .withColumn("AverageTemperature",col("AverageTemperature").cast(FloatType())) \
                    .withColumn("AverageTemperatureUncertainty",col("AverageTemperatureUncertainty").cast(FloatType())) \
                    .withColumn("year", year(df_tmpt.dt)) \
                    .withColumn("month", month(df_tmpt.dt)) \
                    .withColumn("dt",col("dt").cast(DateType())) \
                    .withColumnRenamed("dt","date") \
                    .withColumnRenamed("AverageTemperature","average_temperature") \
                    .withColumnRenamed("AverageTemperatureUncertainty","average_temperature_uncertainty") \
                    .withColumnRenamed("City","city") \
                    .withColumnRenamed("Country","country") \
                    .withColumnRenamed("Latitude","latitude") \
                    .withColumnRenamed("Longitude","longitude")
df_tmpt_clean = df_tmpt_clean.where(df_tmpt_clean.year >= start_year).distinct()

df_tmpt_clean.printSchema()
print(df_tmpt_clean.count())
df_tmpt_clean.show(3)

root
 |-- date: date (nullable = true)
 |-- average_temperature: float (nullable = true)
 |-- average_temperature_uncertainty: float (nullable = true)
 |-- city: string (nullable = true)
 |-- country: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)

870920
+----------+-------------------+-------------------------------+-----+-------+--------+---------+----+-----+
|      date|average_temperature|average_temperature_uncertainty| city|country|latitude|longitude|year|month|
+----------+-------------------+-------------------------------+-----+-------+--------+---------+----+-----+
|2003-04-01|              9.286|                           0.36|Çorlu| Turkey|  40.99N|   27.69E|2003|    4|
|1995-08-01|             21.224|                          0.273|Çorum| Turkey|  40.99N|   34.08E|1995|    8|
|1998-02-01|              1.314|                          0.228|Ç

In [None]:
## explore the distribution of the data by time and by location

# df_tmpt_clean.describe(['Country']).show()
# df_tmpt_clean.select('Country').distinct().count()
df_tmpt_clean.groupBy('Country').count().orderBy(desc('count')).show(10)
df_tmpt_clean.groupBy('month').count().orderBy(desc('count')).show(12)

+-------------+-----+
|      Country|count|
+-------------+-----+
|        India|96968|
|        China|94240|
|United States|63992|
|       Brazil|54560|
|        Japan|43400|
|       Russia|38688|
|    Indonesia|35960|
|       Mexico|24153|
|      Nigeria|22568|
|      Germany|20088|
+-------------+-----+
only showing top 10 rows



In [None]:
#write to parquet
df_tmpt_clean.write.parquet("global_temperature")

 - dataset 3: Demographics

In [15]:
# Performing cleaning tasks

df_dmgr_clean =df_dmgr.withColumn("male_population",col("Male Population").cast(IntegerType())) \
                      .withColumn("female_population",col("Female Population").cast(IntegerType())) \
                      .withColumn("median_age",col("Median Age").cast(FloatType())) \
                      .withColumn("total_population",col("Total Population").cast(IntegerType()))\
                      .withColumn("number_of_veterans",col("Number of Veterans").cast(IntegerType()))\
                      .withColumn("foreign_born",col("Foreign-born").cast(IntegerType()))\
                      .withColumn("average_household_size",col("Average Household Size").cast(FloatType()))\
                      .withColumn("Count",col("Count").cast(IntegerType())) \
                      .withColumnRenamed("State Code","state_code") \
                      .withColumnRenamed("City","city") \
                      .withColumnRenamed("State","state") \
                      .withColumnRenamed("Count","count") \
                      .drop("Median Age","Male Population","Female Population","Total Population",
                            "Number of Veterans","Foreign-born","Average Household Size")

pd.set_option('display.max_columns', None)
df_dmgr_clean.printSchema()
print(df_dmgr_clean.count())
print(df_dmgr_clean.limit(5).toPandas())

root
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- state_code: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- count: integer (nullable = true)
 |-- male_population: integer (nullable = true)
 |-- female_population: integer (nullable = true)
 |-- median_age: float (nullable = true)
 |-- total_population: integer (nullable = true)
 |-- number_of_veterans: integer (nullable = true)
 |-- foreign_born: integer (nullable = true)
 |-- average_household_size: float (nullable = true)

2891
               city          state state_code                       Race  \
0     Silver Spring       Maryland         MD         Hispanic or Latino   
1            Quincy  Massachusetts         MA                      White   
2            Hoover        Alabama         AL                      Asian   
3  Rancho Cucamonga     California         CA  Black or African-American   
4            Newark     New Jersey         NJ                      White   

   cou

In [27]:
## explore the distribution of the data by state, and by state and race

print(df_dmgr_clean.select('State').distinct().count())
df_dmgr_clean.groupBy('State').count().orderBy(desc('count')).show(10)
df_dmgr_clean.groupBy('State','Race').count().orderBy(desc('count')).show(10)

49
+--------------+-----+
|         State|count|
+--------------+-----+
|    California|  676|
|         Texas|  273|
|       Florida|  222|
|      Illinois|   91|
|    Washington|   85|
|      Colorado|   80|
|       Arizona|   80|
|      Michigan|   79|
|North Carolina|   70|
|      Virginia|   70|
+--------------+-----+
only showing top 10 rows

+----------+--------------------+-----+
|     State|                Race|count|
+----------+--------------------+-----+
|California|  Hispanic or Latino|  137|
|California|               White|  137|
|California|               Asian|  136|
|California|Black or African-...|  136|
|California|American Indian a...|  130|
|     Texas|               White|   57|
|     Texas|  Hispanic or Latino|   57|
|     Texas|               Asian|   56|
|     Texas|Black or African-...|   54|
|     Texas|American Indian a...|   49|
+----------+--------------------+-----+
only showing top 10 rows



In [47]:
#write to parquet: US_demographics
df_dmgr_clean.write.parquet("US_demographics")

In [16]:
## create the U.S. temperature DataFrame by joining the demographics DataFrame (adds the state of each city)
df_city_state = df_dmgr_clean.select('city','state').distinct().withColumnRenamed("city","City_")

df_tmpt_clean_us=df_tmpt_clean.where(df_tmpt_clean.country == "United States")\
                            .join(df_city_state,df_tmpt_clean.city ==  df_city_state.City_,"left") \
                            .drop("country","City_")

In [75]:
#write to parquet: US_temperature table
df_tmpt_clean_us.write.parquet("US_temperature")

- upload parquets into S3
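
One way to script the upload of a parquet folder is sketched below. It is a minimal sketch under assumptions, not the method used in this notebook: `upload_directory` and `iter_upload_targets` are hypothetical helpers, the bucket and prefix in the usage comment are placeholders, and the client is passed in so that any object exposing boto3's `upload_file(Filename, Bucket, Key)` can be used. Note that boto3 expects a bare bucket name, without an `s3://` or `s3a://` scheme.

```python
import os


def iter_upload_targets(local_dir, key_prefix):
    """Yield (local_path, s3_key) for every file below `local_dir`."""
    prefix = key_prefix.rstrip("/")
    for root, _dirs, files in os.walk(local_dir):
        for name in sorted(files):
            local_path = os.path.join(root, name)
            # S3 keys always use forward slashes, whatever the local OS uses.
            rel = os.path.relpath(local_path, local_dir).replace(os.sep, "/")
            yield local_path, f"{prefix}/{rel}"


def upload_directory(s3_client, local_dir, bucket, key_prefix):
    """Upload a folder (e.g. a Spark parquet output) and return the keys written."""
    uploaded = []
    for local_path, key in iter_upload_targets(local_dir, key_prefix):
        s3_client.upload_file(local_path, bucket, key)
        uploaded.append(key)
    return uploaded


# Usage (placeholders; requires boto3 and valid AWS credentials):
#   import boto3
#   s3 = boto3.client("s3", region_name="us-east-2")
#   upload_directory(s3, "US_demographics", "my-bucket", "cap/US_demographics")
```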

In [None]:
# currently not able to directly store the data into S3, 
# thus the parquet files are uploaded manually

"""
import pathlib
import boto3
import configparser

# BASE_DIR = pathlib.Path(__file__).parent.resolve()

AWS_REGION = "us-east-2"
S3_BUCKET_NAME = "s3a://aws-emr-resources-508919957385-us-east-2/cap/"

s3_client = boto3.client("s3", region_name=AWS_REGION)

def upload_files(file_name, bucket, object_name=None, args=None):
    if object_name is None:
        object_name = file_name

    s3_client.upload_file(file_name, bucket, object_name, ExtraArgs=args)
    print(f"'{file_name}' has been uploaded to '{S3_BUCKET_NAME}'")

upload_files(parquet_dmgr, S3_BUCKET_NAME) # f"{BASE_DIR}/files/demo.txt"
"""
"""
df_dmgr_clean.write.parquet("s3a://aws-emr-resources-508919957385-us-east-2/cap/US_demographics",mode="overwrite")
df_dmgr_clean.write.parquet("s3://aws-emr-resources-508919957385-us-east-2/cap/US_demographics",mode="overwrite")
df_dmgr_clean.write.parquet("s3a://awsbucketmxy666/capstone/",mode="overwrite")
"""


### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model

Star schema with 3 dimension tables and 1 fact table: 
The final model needs a complex join of 3 sources, 2 of which must first be aggregated to the "state" level. The process is clearer and less vulnerable to errors when divided into 2 steps: 1. extract and transform the data into 3 dimension tables, 2 of which require aggregation; 2. join the transformed tables into the final fact table.

Fact table:
- Traveller destination table: information about travellers who enter the U.S. combined with the demographic and geographic (temperature) information of their destination state. It may help in understanding some characteristics of immigration and population.

Dimension tables:
- Traveller table with basic identity information of travellers
- Temperature table for the U.S. by state
- Demographics table for the U.S. by state, including population and its distribution by sex and by race


#### 3.2 Mapping Out Data Pipelines
- Create 3 staging tables from the 3 parquet datasets in S3.
- Extract information from the staging tables into 3 dimension tables: the traveller information table, the temperature table from 1993 to 2013 aggregated by state, and the demographics table aggregated by state.
- Join the 3 dimension tables to populate the final fact table.
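
One detail worth checking when aggregating the demographics source to state level: the source has one row per city and race, and city-level figures such as `Total Population` repeat on every one of those rows. Summing directly over the raw rows would therefore count each city several times. The queries in `sql_queries` are not shown here, so whether they already handle this is an open question; the sketch below only illustrates the effect with made-up rows and a hypothetical `state_population` helper.

```python
from collections import defaultdict

# Made-up rows shaped like the demographics source: one row per (city, race),
# with the city-level total repeated on each row.
rows = [
    {"city": "A-town", "state": "X", "race": "White", "total_population": 100},
    {"city": "A-town", "state": "X", "race": "Asian", "total_population": 100},
    {"city": "B-ville", "state": "X", "race": "White", "total_population": 50},
    {"city": "B-ville", "state": "X", "race": "Asian", "total_population": 50},
]


def state_population(rows):
    """Sum city totals per state, counting each (city, state) pair once."""
    seen = set()
    totals = defaultdict(int)
    for row in rows:
        key = (row["city"], row["state"])
        if key in seen:
            continue
        seen.add(key)
        totals[row["state"]] += row["total_population"]
    return dict(totals)


naive_total = sum(r["total_population"] for r in rows)
deduped_totals = state_population(rows)
print(naive_total, deduped_totals)
```

In SQL the same idea would be a `SELECT DISTINCT` over the city-level columns before the `GROUP BY` on state.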

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

- 4.10 Configuration

In [23]:
config = configparser.ConfigParser()
config.read('dwh.cfg')

conn = psycopg2.connect("host={} dbname={} user={} password={} port={}".format(*config['CLUSTER'].values()))
cur = conn.cursor()
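
The connection string above is filled from `config['CLUSTER'].values()`, so it relies on the keys in `dwh.cfg` being listed in exactly the order host, database, user, password, port. A more defensive variant reads each key by name. The sketch below assumes hypothetical key names (`HOST`, `DB_NAME`, `DB_USER`, `DB_PASSWORD`, `DB_PORT`) and placeholder values; the real `dwh.cfg` is not shown and may differ.

```python
import configparser

# Placeholder configuration; section and key names are assumptions.
SAMPLE_CFG = """
[CLUSTER]
DB_PORT = 5439
HOST = redshift.example.com
DB_NAME = dev
DB_USER = awsuser
DB_PASSWORD = change-me
"""


def build_dsn(cfg):
    """Build a libpq-style connection string from named keys."""
    cluster = cfg["CLUSTER"]
    return "host={} dbname={} user={} password={} port={}".format(
        cluster["HOST"],
        cluster["DB_NAME"],
        cluster["DB_USER"],
        cluster["DB_PASSWORD"],
        cluster["DB_PORT"],
    )


cfg = configparser.ConfigParser()
cfg.read_string(SAMPLE_CFG)
dsn = build_dsn(cfg)
print(dsn)
```

The resulting string can be passed to `psycopg2.connect` as before, and the key order in the file no longer matters.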

- 4.11 Create table

In [18]:
# drop tables
def drop_tables(cur, conn):
    for query in drop_table_queries:
        cur.execute(query)
        conn.commit()

# create desired tables
def create_tables(cur, conn):
    for query in create_table_queries:
        cur.execute(query)
        conn.commit()

In [19]:
drop_tables(cur, conn)
create_tables(cur, conn)
print("staging tables and query tables have been successfully created")

staging tables and query tables have been successfully created


- 4.12 Load staging tables

In [20]:
# copy data from s3 and insert data into staging tables
def load_staging_tables(cur, conn):
    for query in copy_table_queries:
        cur.execute(query)
        conn.commit()

In [24]:
load_staging_tables(cur, conn)
print("data sources have been successfully loaded into staging tables")

data sources have been successfully loaded into staging tables


- 4.13 Insert data

In [25]:
# insert data into dimension table from staging tables
def insert_tables_dim (cur, conn):
    for query in insert_table_queries_dim :
        cur.execute(query)
        conn.commit()

# insert data into fact table from dimension tables
def insert_tables_fact (cur, conn):
    for query in insert_table_queries_fact :
        cur.execute(query)
        conn.commit()

In [26]:
insert_tables_dim(cur, conn)
insert_tables_fact(cur, conn)
print("data in the staging tables have been successfully inserted into query tables")

data in the staging tables have been successfully inserted into query tables


In [9]:
# close the connection
conn.close()

#### 4.2 Data Quality Checks
Quality checks to ensure the pipeline ran as expected, which include:
- Integrity constraints on the relational database (e.g., unique key, correctness of the join operation, etc.)
- Source/Count checks to ensure completeness
 
Run Quality Checks

In [37]:
# Perform quality checks here
# check the uniqueness of the key column
def check_unique_key(dic,cur, conn):
    query_count="""
                SELECT COUNT (*) 
                FROM {};
                """
    query_count_unique_key="""
                SELECT COUNT ( DISTINCT {} ) 
                FROM {};
                """
    query_count_sample="""
                SELECT *
                FROM {}
                LIMIT 3
                """
    
    for table, key in dic.items():
        # check the completeness of the data
        cur.execute(query_count.format(table))
        res_count=cur.fetchone()[0]
        if res_count<1:
            raise Exception(f"Sorry, no record is loaded into TABLE {table}")

        
        # check the uniqueness of the key column
        cur.execute(query_count_unique_key.format(key,table))
        res_count_unique_key=cur.fetchone()[0]
        if res_count!=res_count_unique_key:
            raise Exception(f"Sorry, the {key} column is not unique in the TABLE {table}, please check!")
        
        print(f"congratulation! no error is found in uniqueness and completeness in the TABLE {table}")
        
        # print sample data
        cur.execute(query_count_sample.format(table))
        res_sample=cur.fetchmany(3)
        
        print("sample data:")
        for r_s in res_sample:
            print(r_s)
        print("\n")
    
    
    
# check if the join operation is correctly performed
def check_join(dic,cur, conn):
    query_count="""
                SELECT COUNT (*) 
                FROM {};
                """
    query_count_null="""
                SELECT COUNT (*) 
                FROM {}
                WHERE {} IS NULL
                """
    
    query_count_sample="""
                SELECT *
                FROM {}
                LIMIT 3
                """
    
    for table, columns in dic.items():
        # count the number of records; an empty table would make the ratio undefined
        cur.execute(query_count.format(table))
        res_count=cur.fetchone()[0]
        if res_count<1:
            raise Exception(f"Sorry, no record is loaded into TABLE {table}")
        
        for column in columns:
            # count null values in assigned column
            cur.execute(query_count_null.format(table, column))
            res_count_null=cur.fetchone()[0]
            
            # if at least half of the values in the column are NULL, raise error
            null_ratio = res_count_null/res_count
            if null_ratio >= 0.5:
                raise Exception(f"{res_count_null} out of {res_count} records in {column} column of TABLE {table} is NULL!"
                                "\n you may want to check the join operation")
        
        # report inside the loop so that every table in dic is covered
        print(f"congratulation! no error is found in join operation in the TABLE {table}")
        
        # print sample data
        cur.execute(query_count_sample.format(table))
        res_sample=cur.fetchmany(3)
        
        print("sample data:")
        for r_s in res_sample:
            print(r_s)
        print("\n")

In [38]:
dic_unique_count={"temperature_us_table" : "state",
               "demographics_us_table" : "state",
               "traveler_table" : "cic_id",
               "traveller_destination_table" : "cic_id"}

dic_join={"traveller_destination_table":["state_total_population",
                                            "state_median_age",
                                            "state_foreign_born",
                                            "state_male_population",
                                            "state_female_population"]}

check_unique_key(dic_unique_count,cur, conn)
check_join(dic_join,cur, conn)

congratulation! no error is found in uniqueness and completeness in the TABLE temperature_us_table
sample data:
('Maryland', 14.86, 0.26)
('Oregon', 10.41, 0.31)
('Connecticut', 12.04, 0.33)


congratulation! no error is found in uniqueness and completeness in the TABLE demographics_us_table
sample data:
('Alabama', 36.16, 5163306, 352896, 252541, 2.43, 2448200, 2715106)
('Minnesota', 35.58, 7044165, 321738, 1069888, 2.5, 3478803, 3565362)
('Illinois', 35.71, 22514390, 723049, 4632600, 2.73, 10943864, 11570526)


congratulation! no error is found in uniqueness and completeness in the TABLE traveler_table
sample data:
(459653, 'UNITED KINGDOM', 'ATLANTA, GA', datetime.date(2016, 4, 3), 'Air', 'FLORIDA', datetime.date(2016, 4, 13), 49, 'Pleasure', 1972, datetime.date(2016, 10, 2), 'M', None, 'B2')
(459669, 'UNITED KINGDOM', 'ATLANTA, GA', datetime.date(2016, 4, 3), 'Air', 'GEORGIA', datetime.date(2016, 4, 6), 38, 'Business', 1983, datetime.date(2016, 7, 1), 'F', None, 'WB')
(459685, 'UNI
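
The uniqueness check compares `COUNT(*)` with `COUNT(DISTINCT key)`. The sketch below reproduces that idea against an in-memory SQLite database so that it can be tried without a Redshift cluster; the table `demo_state` and the helper `key_is_unique` are made up for the illustration, and the two counts are fetched in a single query rather than two.

```python
import sqlite3


def key_is_unique(cur, table, key):
    """True when the table is non-empty and every row has a distinct, non-NULL key."""
    # Identifiers come from trusted code here; never format user input into SQL.
    cur.execute(f"SELECT COUNT(*), COUNT(DISTINCT {key}) FROM {table}")
    total, distinct = cur.fetchone()
    return total > 0 and total == distinct


conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.execute("CREATE TABLE demo_state (state TEXT, median_age REAL)")
cur.executemany(
    "INSERT INTO demo_state VALUES (?, ?)",
    [("Maryland", 36.1), ("Oregon", 37.0)],
)
unique_before = key_is_unique(cur, "demo_state", "state")

# Insert a duplicate key and check again.
cur.execute("INSERT INTO demo_state VALUES ('Oregon', 38.2)")
unique_after = key_is_unique(cur, "demo_state", "state")
conn.close()

print(unique_before, unique_after)
```

Because `COUNT(DISTINCT ...)` ignores NULLs, a NULL key also makes the two counts differ, so the same comparison catches missing keys as well as duplicates.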

#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

### Step 5: Complete Project Write Up
- Choice of technologies:
	- Spark for the data lake: Spark processes large datasets much faster than Pandas.
	- Parquet files on S3: Parquet saves storage space and is quick to read.
	- Data modelling with an ETL pipeline on Redshift: Redshift integrates well with S3 and is easy to operate from Python.
- Update frequency: once a month, because "I94 Immigration Data" and "World Temperature Data" are both updated or aggregated monthly, and "U.S. City Demographic Data" is unlikely to vary much from month to month.

Further development under different scenarios:
- The data was increased by 100x: to improve performance, I would join all 3 data sources into one file in the data lake by broadcast-joining "U.S. City Demographic Data" and "World Temperature Data", and repartition "I94 Immigration Data" by suitable columns. At the same time, I would add nodes to the Redshift cluster and upgrade their capacity, and carefully set sort keys and distribution keys when creating tables to take advantage of Redshift's distributed storage.
- The data populates a dashboard that must be updated on a daily basis by 7am every day: build the pipeline with Airflow, which can run it automatically on a daily schedule so that it completes by 7 am.
- The database needed to be accessed by 100+ people: create a visitor role on the Redshift cluster with view and query access, then share the credentials with authorized people so that they can access the database.
