# Data Modeling for US Immigration Department

## Introduction: US Immigration Department and Its Analytical Goals

The US Immigration Department has grown its immigration record database and wants to move its processes and analytical data onto the cloud.

This project builds an ETL pipeline that extracts US Immigration Department data and other supporting data from various sources, transforms it with Spark into a set of dimension and fact tables, and loads it into S3 so the analytics team can explore it and find insights about US immigrants.

A few key objectives of generating these insights are:
- make the immigration process more efficient and immigrant-friendly
- identify process gaps and security gaps
- forecast the resource capacity required by the Immigration Department
- prepare important reports for various federal bodies

The project follows these steps:
* Step 1: Scope the Project: see the README section
* Step 2: Extract: Gathering & Reading Raw Data from various sources
* Step 3: Explore and Assess the Data
* Step 4: Transform: Building the Data Model, creating Dimension & Fact Tables for the Schema
* Step 5: Data Quality Checks
* Step 6: Load Data into Destination
* Step 7: Project Write Up

In [17]:
# Do all imports and installs here
import datetime

import numpy as np
import pandas as pd
%matplotlib inline
import matplotlib.pyplot as plt

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import udf, desc, asc, sum as Fsum
from pyspark.sql.functions import monotonically_increasing_id, row_number
from pyspark.sql.functions import col, isnan, when, count
from pyspark.sql.types import StringType, IntegerType
from pyspark.sql.window import Window

In [5]:
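from pyspark.sql.types import StructField, StructType

def set_df_columns_nullable(spark, df, column_list, nullable=True):
    """Return a copy of df whose columns in column_list have the given nullable flag."""
    # Build a new schema instead of mutating df.schema in place
    new_schema = StructType([
        StructField(f.name, f.dataType,
                    nullable if f.name in column_list else f.nullable, f.metadata)
        for f in df.schema
    ])
    return spark.createDataFrame(df.rdd, new_schema)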

### Step 1: Scope of Project: Details in *README Section*

### Step 2: Extract: Gathering & Reading Raw Data from various sources

In [3]:
# Reading immigration data - Apr 2016 data from source within Workspace
#fname = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
#df_img = pd.read_sas(fname, 'sas7bdat', encoding="ISO-8859-1")

In [5]:
#instantiating Spark Session
#spark = SparkSession.builder.\
#config("spark.jars.repositories", "https://repos.spark-packages.org/").\
#config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\
#enableHiveSupport().getOrCreate()

In [6]:
spark = SparkSession.builder\
        .config("spark.jars.repositories", "https://repos.spark-packages.org/")\
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0,saurfang:spark-sas7bdat:2.0.0-s_2.11")\
        .enableHiveSupport().getOrCreate()

In [7]:
# Reading immigration data - Apr 2016 data from source within Spark 
# no need to run as data is already present
#processing the entire Immigration file for Apr 2016
#df_spark = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')
#df_spark.printSchema()
# write to parquet 
# df_spark.write.parquet("sas_data")

In [7]:
# Reading immigration data - Apr 2016 data from source in Spark 
I94_df=spark.read.parquet("input_data/sas_data") 
I94_df.printSchema()
I94_df.show(1)

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

In [10]:
# UDF to convert a SAS date (days since 1960-01-01) to a date
from datetime import datetime, timedelta
from pyspark.sql import types as T

def convert_datetime(x):
    """Return the date x days after 1960-01-01, or None for missing/invalid input."""
    try:
        return (datetime(1960, 1, 1) + timedelta(days=int(x))).date()
    except (TypeError, ValueError, OverflowError):
        return None

udf_datetime_from_sas = udf(convert_datetime, T.DateType())
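If the immigration file is loaded with pandas instead of Spark (the commented-out `pd.read_sas` route in Step 2), the same SAS-date conversion can be vectorized rather than applied row by row. A minimal sketch, assuming the date columns arrive as float day counts as in the schema above; the two values are the `arrdate` and `depdate` of the sample record printed further down, and `sas_to_date` is a hypothetical helper:

```python
import numpy as np
import pandas as pd

# SAS stores dates as a number of days counted from 1960-01-01
SAS_EPOCH = pd.Timestamp("1960-01-01")

def sas_to_date(days: pd.Series) -> pd.Series:
    """Convert SAS day counts to datetime64 values; NaN becomes NaT."""
    return pd.to_datetime(days, unit="D", origin=SAS_EPOCH)

# arrdate/depdate of the sample record, plus one missing value
raw = pd.Series([20574.0, 20582.0, np.nan])
converted = sas_to_date(raw)
print(converted)
```

On the Spark side, the built-in `date_add` SQL function could likely replace the Python UDF and avoid its serialization overhead.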

In [14]:
I94_df = I94_df.withColumn("arr_date", udf_datetime_from_sas(I94_df.arrdate))
I94_df = I94_df.withColumn("dep_date", udf_datetime_from_sas(I94_df.depdate))

I94_df.show(1)

+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+----------+----------+
|    cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|        admnum|fltno|visatype|  arr_date|  dep_date|
+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+----------+----------+
|5748517.0|2016.0|   4.0| 245.0| 438.0|    LOS|20574.0|    1.0|     CA|20582.0|  40.0|    1.0|  1.0|20160430|     SYD| null|      G|      O|   null|      M| 1976.0|10292016|     F|  null|     QF|9.495387003E10|00011|      B1|2016-04-30|2016-05-08|
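+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+----------+----------+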

In [24]:
# Reading US Demographics data
fname = 'input_data/us-cities-demographics.csv'
df_usdemog_t = pd.read_csv(fname, delimiter =";")
df_usdemog_t.sample(3)
#len(df_usdemog.index) #2891
#df_usdemog.dtypes

| | City | State | Median Age | Male Population | Female Population | Total Population | Number of Veterans | Foreign-born | Average Household Size | State Code | Race | Count |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 1683 | Long Beach | California | 34.6 | 238159.0 | 236013.0 | 474172 | 17463.0 | 127764.0 | 2.78 | CA | Black or African-American | 64948 |
| 221 | Elizabeth | New Jersey | 34.2 | 65896.0 | 63116.0 | 129012 | 1131.0 | 63413.0 | 3.18 | NJ | Black or African-American | 29822 |
| 1245 | Huntsville | Alabama | 38.1 | 91764.0 | 97350.0 | 189114 | 16637.0 | 12691.0 | 2.18 | AL | Asian | 6566 |


In [25]:
# Fill missing values with 0 and cast to int.
# Note: this also truncates 'Average Household Size' (e.g. 2.78 -> 2).
float_to_int = ["Male Population", "Female Population", "Number of Veterans", "Foreign-born", "Average Household Size"]
df_usdemog_t[float_to_int] = df_usdemog_t[float_to_int].fillna(0).astype(int)

# Rename columns to the snake_case names used in the data model
rename_col = {'Median Age': 'median_age', 'Male Population': 'male_pop', 'Female Population': 'female_pop',
              'Total Population': 'tot_pop', 'Number of Veterans': 'veteran_pop', 'Foreign-born': 'foreign_born',
              'Average Household Size': 'avg_hose_size', 'State Code': 'state_code'}
df_usdemog = df_usdemog_t.rename(columns=rename_col)
df_usdemog.dtypes

City              object
State             object
median_age       float64
male_pop           int64
female_pop         int64
tot_pop            int64
veteran_pop        int64
foreign_born       int64
avg_hose_size      int64
state_code        object
Race              object
Count              int64
dtype: object

In [31]:
# Reading Temperature Data
fname = '../../data2/GlobalLandTemperaturesByCity.csv'
df_temp = pd.read_csv(fname)
df_temp['Country'] = df_temp['Country'].str.upper()
df_temp['dt'] = pd.to_datetime(df_temp['dt'])

In [32]:
#df_temp.dtypes
#len(df_temp.index) #8599212
df_temp.sample(5)

| | dt | AverageTemperature | AverageTemperatureUncertainty | City | Country | Latitude | Longitude |
|---|---|---|---|---|---|---|---|
| 2582706 | 2004-11-01 | 20.032 | 0.194 | Gizeh | EGYPT | 29.74N | 31.38E |
| 5853245 | 1943-11-01 | 25.752 | 0.19 | Phnum Pénh | CAMBODIA | 12.05N | 105.21E |
| 907795 | 1812-08-01 | | | Bhiwandi | INDIA | 20.09N | 73.36E |
| 6140490 | 1974-06-01 | 19.336 | 0.525 | Qingdao | CHINA | 36.17N | 121.33E |
| 3366331 | 1846-05-01 | 20.444 | 2.074 | Jammu | INDIA | 32.95N | 75.64E |


In [None]:
## Mapping tables

# Reading country codes data 
fname = 'input_data/country_mapping.csv'
dim_countrymapping = pd.read_csv(fname)

# Reading US state codes data 
fname = 'input_data/state_code_dim.csv'
dim_us_state = pd.read_csv(fname)

#Reading port codes
fname = 'input_data/port_code.csv'
dim_port = pd.read_csv(fname)

### Step 3: Explore and Assess the Data
Identify data quality issues, like missing values, duplicate data, etc.

##### *Exploring Immigration data*

- 3096313 records
- Several columns have null values: i94mode, i94addr, depdate, i94bir, visapost, occup, entdepa, entdepd, entdepu, matflag, biryear, dtaddto, gender, insnum, airline, fltno, dep_date
- The youngest immigrant's birth year is 2019, which is impossible for an April 2016 dataset, so `biryear` contains invalid values
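The birth-year anomaly can be flagged by comparing `biryear` with the record's own `i94yr`. A minimal pandas sketch on a few hypothetical rows (column names follow the I94 schema above); on the full Spark data the equivalent filter would be `I94_df.filter(col("biryear") > col("i94yr"))`:

```python
import pandas as pd

# Hypothetical I94 rows (column names as in the schema above)
sample = pd.DataFrame({
    "cicid": [1.0, 2.0, 3.0, 4.0],
    "i94yr": [2016.0, 2016.0, 2016.0, 2016.0],
    "biryear": [1976.0, 2019.0, None, 2016.0],
})

# A birth year later than the record year is impossible; nulls compare as False
invalid = sample[sample["biryear"] > sample["i94yr"]]
print(invalid)
```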

In [59]:
# Count null entries per column in raw immigration data
I94_df_null = I94_df.select([count(when(col(c).isNull(), c)).alias(c) for c in I94_df.columns])

In [56]:
# Convert the one-row Spark result to pandas so it can be saved as CSV
I94_df_null.toPandas().to_csv('I94_df_null_f.csv', index=False)

In [49]:
# Summary of columns in raw immigration data; statistics: count, mean, stddev, min, 25%, 50%, 75%, max
I94_df_summary = I94_df.summary()

In [62]:
# Convert to pandas so the summary can be saved as CSV
I94_df_summary.toPandas().to_csv('I94_df_summary.csv', index=False)

##### *Exploring US demographic data*
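- 2891 records
- No nulls are reported, but the null check below runs after `fillna(0)` has been applied to the numeric population and household columns, so missing values there appear as zeros (e.g. the minimum Male Population of 0) rather than as nulls
- There are no duplicates
- The mean average household size is about 2.2, which looks low; this is at least partly an artifact of casting the column to int (2.78 becomes 2)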
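Because `fillna(0)` and the integer cast run before these checks, nulls in the numeric columns show up as zeros and fractional household sizes are truncated. A minimal sketch of the alternative ordering, on hypothetical rows with the raw column names: count nulls on the raw frame first, then impute and cast only the true count columns, leaving `Average Household Size` as a float:

```python
import pandas as pd

# Hypothetical rows with the raw column names
raw = pd.DataFrame({
    "City": ["CityA", "CityB", "CityC"],
    "Male Population": [100.0, None, 250.0],
    "Average Household Size": [2.78, 3.18, None],
})

# 1) Assess missing values before any imputation
null_counts = raw.isnull().sum()

# 2) Impute and cast only true count columns; household size stays a float
count_cols = ["Male Population"]
clean = raw.copy()
clean[count_cols] = clean[count_cols].fillna(0).astype(int)

print(null_counts.to_dict())
print(clean)
```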

In [68]:
# Find null entries in raw US demographic data
print("Null entries :")
df_usdemog_t.isnull().sum()

Null entries :


City                      0
State                     0
Median Age                0
Male Population           0
Female Population         0
Total Population          0
Number of Veterans        0
Foreign-born              0
Average Household Size    0
State Code                0
Race                      0
Count                     0
dtype: int64

In [69]:
#Finding duplicate records in raw US demographic data 
duplicate_usdemog = df_usdemog_t[df_usdemog_t.duplicated()]
print("Duplicate Rows :")
duplicate_usdemog

Duplicate Rows :


| | City | State | Median Age | Male Population | Female Population | Total Population | Number of Veterans | Foreign-born | Average Household Size | State Code | Race | Count |
|---|---|---|---|---|---|---|---|---|---|---|---|---|


In [70]:
#description of raw US demographic data 
print("Description :")
df_usdemog_t.describe()

Description :


| | Median Age | Male Population | Female Population | Total Population | Number of Veterans | Foreign-born | Average Household Size | Count |
|---|---|---|---|---|---|---|---|---|
| count | 2891.0 | 2891.0 | 2891.0 | 2891.0 | 2891.0 | 2891.0 | 2891.0 | 2891.0 |
| mean | 35.494881 | 97227.43 | 101664.0 | 198966.8 | 9325.708059 | 40470.79 | 2.223798 | 48963.77 |
| std | 4.401617 | 216210.4 | 231467.6 | 447555.9 | 13196.370589 | 155422.2 | 0.485144 | 144385.6 |
| min | 22.9 | 0.0 | 0.0 | 63215.0 | 0.0 | 0.0 | 0.0 | 98.0 |
| 25% | 32.8 | 39289.0 | 41212.5 | 80429.0 | 3728.5 | 9084.0 | 2.0 | 3435.0 |
| 50% | 35.3 | 52336.0 | 53809.0 | 106782.0 | 5394.0 | 18666.0 | 2.0 | 13780.0 |
| 75% | 38.0 | 86596.0 | 89589.0 | 175232.0 | 9367.5 | 33878.0 | 2.0 | 54447.0 |
| max | 70.5 | 4081698.0 | 4468707.0 | 8550405.0 | 156961.0 | 3212500.0 | 4.0 | 3835726.0 |


##### *Exploring Temperature Data*
- 8599212 rows
- 364130 rows have null AverageTemperature and AverageTemperatureUncertainty, leaving 8235082 rows with a temperature
- All other fields have a 100% fill rate
- There are no duplicate rows
- The mean average temperature is about 16.7 °C (median 18.8 °C), with a maximum close to 40 °C (39.651)
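The gap between the total row count and the `count` row of `describe()` is exactly the missing temperatures, because `describe()` counts non-null values only (8235082 + 364130 = 8599212). A small sketch on hypothetical rows that reports both numbers and the per-column fill rate:

```python
import pandas as pd

# Hypothetical rows: two of the three temperatures are missing
temps = pd.DataFrame({
    "dt": pd.to_datetime(["2013-07-01", "2013-08-01", "1812-08-01"]),
    "AverageTemperature": [25.8, None, None],
    "City": ["CityA", "CityA", "CityB"],
})

total_rows = len(temps)                          # all rows, nulls included
non_null = temps["AverageTemperature"].count()   # the "count" that describe() reports
fill_rate = temps.notna().mean()                 # share of non-null values per column

print(total_rows, non_null)
print(fill_rate.to_dict())
```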


In [42]:
# Find null entries in raw temperature data
print("Null entries :")
df_temp.isnull().sum()

Null entries :


dt                                    0
AverageTemperature               364130
AverageTemperatureUncertainty    364130
City                                  0
Country                               0
Latitude                              0
Longitude                             0
dtype: int64

In [43]:
# Find duplicate records in raw temperature data
duplicate_df_temp = df_temp[df_temp.duplicated()]
print("Duplicate Rows :")
duplicate_df_temp

Duplicate Rows :


| | dt | AverageTemperature | AverageTemperatureUncertainty | City | Country | Latitude | Longitude |
|---|---|---|---|---|---|---|---|


In [45]:
#description of raw Temperature data 
df_temp.describe()

| | AverageTemperature | AverageTemperatureUncertainty |
|---|---|---|
| count | 8235082.0 | 8235082.0 |
| mean | 16.72743 | 1.028575 |
| std | 10.35344 | 1.129733 |
| min | -42.704 | 0.034 |
| 25% | 10.299 | 0.337 |
| 50% | 18.831 | 0.591 |
| 75% | 25.21 | 1.349 |
| max | 39.651 | 15.396 |


### Step 4: Transform: Building the Data Model, creating Dimension & Fact Tables

##### Dimension Table: US_Demographics

In [55]:
# Dimension table: US_Demographics

dim_usdemog = df_usdemog.drop(columns=['State'])
dim_usdemog.insert(loc=0, column='demog_id', value=(dim_usdemog.index+1))
#dim_usdemog.sample(3)
#dim_usdemog.dtypes
dim_usdemog=spark.createDataFrame(dim_usdemog) 

In [56]:
dim_usdemog.printSchema()

root
 |-- demog_id: long (nullable = true)
 |-- City: string (nullable = true)
 |-- median_age: double (nullable = true)
 |-- male_pop: long (nullable = true)
 |-- female_pop: long (nullable = true)
 |-- tot_pop: long (nullable = true)
 |-- veteran_pop: long (nullable = true)
 |-- foreign_born: long (nullable = true)
 |-- avg_hose_size: long (nullable = true)
 |-- state_code: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Count: long (nullable = true)



##### Dimension Table: Calendar

In [25]:
# extract columns to create time table
dim_calendar = I94_df.selectExpr(
    "arr_date as arr_date",
    "dayofmonth(arr_date) as day",
    "weekofyear(arr_date) as week",
    "month(arr_date) as month",
    "year(arr_date) as year",
    "dayofweek(arr_date) as weekday"
    )
    
dim_calendar= dim_calendar.dropDuplicates()
dim_calendar = set_df_columns_nullable(spark,dim_calendar,['arr_date'], False)

dim_calendar.printSchema()

root
 |-- arr_date: date (nullable = false)
 |-- day: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- weekday: integer (nullable = true)



In [26]:
dim_calendar.count()
#dim_calendar.show()

30
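Spark's `dayofweek` numbers days from 1 (Sunday) to 7 (Saturday), and `weekofyear` returns the ISO 8601 week number. The sketch below reproduces the `dim_calendar` columns in plain Python for a single date, which can help when spot-checking rows; `calendar_row` is a hypothetical helper, not part of the pipeline:

```python
from datetime import date

def calendar_row(d: date) -> dict:
    """Mirror the dim_calendar columns for one date (Spark conventions)."""
    return {
        "arr_date": d,
        "day": d.day,
        "week": d.isocalendar()[1],          # ISO week, like Spark weekofyear
        "month": d.month,
        "year": d.year,
        "weekday": d.isoweekday() % 7 + 1,   # Sunday=1 ... Saturday=7, like Spark dayofweek
    }

print(calendar_row(date(2016, 4, 30)))
```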

##### Dimension Table: Weather

In [27]:
## Dimension table: Weather
dim_weather_t1 = df_temp.dropna(subset=['dt', 'AverageTemperature','City', 'Country'])
dim_weather_t1 = dim_weather_t1.sort_values(['City', 'dt']).drop_duplicates('City', keep='last')
#len(dim_weather_t1.index)
dim_weather_t1.shape[0]
dim_weather_t1.sample(5)

| | dt | AverageTemperature | AverageTemperatureUncertainty | City | Country | Latitude | Longitude |
|---|---|---|---|---|---|---|---|
| 3005913 | 2013-08-01 | 25.783 | 0.431 | Hoshangabad | INDIA | 23.31N | 77.77E |
| 250141 | 2013-08-01 | 18.652 | 0.595 | Americana | BRAZIL | 23.31S | 48.06W |
| 6600467 | 2013-08-01 | 18.346 | 0.35 | Salzgitter | GERMANY | 52.24N | 10.51E |
| 1449387 | 2013-08-01 | 26.98 | 0.369 | Chandrapur | INDIA | 20.09N | 78.48E |
| 6038754 | 2013-08-01 | 15.384 | 0.814 | Pretoria | SOUTH AFRICA | 24.92S | 28.37E |


In [28]:
# Including country code which connects with Fact table
dim_weather = pd.merge(dim_countrymapping, dim_weather_t1, on='Country', suffixes=('_map','_wthr'),how='right')
len(dim_weather.index)
dim_weather.insert(loc=0, column='weather_id', value=(dim_weather.index+1))
dim_weather.rename(columns={'dt': 'measure_date'}, inplace=True)
#dim_weather[['country_code']] = dim_weather[['country_code']].astype(int)
dim_weather[['country_code']] = dim_weather[['country_code']].fillna(0).astype(int)
dim_weather.sample(5)
#dim_weather['Country'].unique()
dim_weather.dtypes
#len(dim_weather.index) #3448

weather_id                                int64
country_code                              int64
Country                                  object
measure_date                     datetime64[ns]
AverageTemperature                      float64
AverageTemperatureUncertainty           float64
City                                     object
Latitude                                 object
Longitude                                object
dtype: object
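Deduplicating on `City` alone keeps a single row for cities that share a name across countries. A minimal pandas sketch, on hypothetical rows and a hypothetical country code, of keeping the latest non-null reading per (`City`, `Country`) pair and then attaching a country code with a right join, filling unmatched codes with 0 as the cell above does:

```python
import pandas as pd

# Hypothetical readings: the same city name exists in two countries
temps = pd.DataFrame({
    "dt": pd.to_datetime(["2013-07-01", "2013-08-01", "2013-07-01", "2013-08-01"]),
    "AverageTemperature": [24.1, 25.0, 18.3, None],
    "City": ["CityX"] * 4,
    "Country": ["COUNTRYA", "COUNTRYA", "COUNTRYB", "COUNTRYB"],
})
# Hypothetical mapping that only knows COUNTRYA
mapping = pd.DataFrame({"country_code": [101], "Country": ["COUNTRYA"]})

# Latest non-null reading for each (City, Country) pair
latest = (
    temps.dropna(subset=["AverageTemperature"])
         .sort_values(["City", "Country", "dt"])
         .drop_duplicates(["City", "Country"], keep="last")
)

# Right join keeps every city; countries missing from the mapping get code 0
weather = pd.merge(mapping, latest, on="Country", how="right")
weather["country_code"] = weather["country_code"].fillna(0).astype(int)
print(weather[["City", "Country", "dt", "AverageTemperature", "country_code"]])
```

On the same rows, `drop_duplicates('City', keep='last')` would keep only one of the two cities.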

In [29]:
dim_weather=spark.createDataFrame(dim_weather) 

In [30]:
#dim_weather[dim_weather['country_code'].isna()]['Country'].unique()
dim_weather.printSchema()

root
 |-- weather_id: long (nullable = true)
 |-- country_code: long (nullable = true)
 |-- Country: string (nullable = true)
 |-- measure_date: timestamp (nullable = true)
 |-- AverageTemperature: double (nullable = true)
 |-- AverageTemperatureUncertainty: double (nullable = true)
 |-- City: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)



##### Dimension Table: Immigrants

In [31]:
## Dimension table: Immigrants
dim_immgrant_t1 = I94_df.selectExpr(
    "int(cicid) as immig_id",
    "int(I94CIT) as country_origin",
    "int(BIRYEAR) as birth_year",
    "GENDER as gender",
    "INSNUM as insurance_num",
    "CASE WHEN int(I94VISA) == 1 THEN  'Business' WHEN int(I94VISA) == 2 THEN  'Pleasure' WHEN int(I94VISA) == 3 THEN 'Student' ELSE 'other' END AS visa_category"
    )  
dim_immgrant= dim_immgrant_t1.dropDuplicates(['immig_id'])
dim_immgrant = set_df_columns_nullable(spark,dim_immgrant,['visa_category'])
dim_immgrant = set_df_columns_nullable(spark,dim_immgrant,['immig_id'], False)
#dim_immgrant = pd.merge(dim_countrymapping, dim_immgrant, on='Country', suffixes=('_map','_wthr'),how='right')


#print("dim_immgrant_t1: ", dim_immgrant_t1.count())
#print("dim_immgrant: ", dim_immgrant.count())
dim_immgrant.printSchema()

root
 |-- immig_id: integer (nullable = false)
 |-- country_origin: integer (nullable = true)
 |-- birth_year: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- insurance_num: string (nullable = true)
 |-- visa_category: string (nullable = true)



##### Fact Table: Immigration

In [32]:
#Fact table: Immigration
fact_immigrant_t1 = I94_df.selectExpr(
    "int(cicid) as cicid",
    "int(I94CIT) as I94CIT",
    "I94PORT",
    "arr_date",
    "year(arr_date) as year",
    "int(I94MODE) as I94MODE",
    "I94ADDR",
    "dep_date",
    "CASE WHEN int(I94VISA) == 1 THEN  'Business' WHEN int(I94VISA) == 2 THEN  'Pleasure' WHEN int(I94VISA) == 3 THEN 'Student' ELSE 'other' END AS I94VISA",
    "MATFLAG",
    "AIRLINE",
    "ADMNUM",
    "FLTNO",
    "VISATYPE")  
fact_immigrant = fact_immigrant_t1.dropDuplicates()
fact_immigrant = set_df_columns_nullable(spark,fact_immigrant,['cicid'], False)
fact_immigrant = set_df_columns_nullable(spark,fact_immigrant,['I94VISA'])
#fact_immigrant = fact_immigrant.withColumn("immig_id",row_number().over(Window.orderBy(monotonically_increasing_id())))
fact_immigrant = fact_immigrant.withColumn('immig_id', monotonically_increasing_id()+1)
fact_immigrant.show(3)

+-----+------+-------+----------+----+-------+-------+----------+--------+-------+-------+---------------+-----+--------+--------+
|cicid|I94CIT|I94PORT|  arr_date|year|I94MODE|I94ADDR|  dep_date| I94VISA|MATFLAG|AIRLINE|         ADMNUM|FLTNO|VISATYPE|immig_id|
+-----+------+-------+----------+----+-------+-------+----------+--------+-------+-------+---------------+-----+--------+--------+
| 8029|   111|    LOS|2016-04-01|2016|      1|   null|2016-04-02|Pleasure|      M|     AF|5.5461196133E10|00077|      WT|       1|
|12272|   114|    MIA|2016-04-01|2016|      2|   null|2016-04-05|Pleasure|      M|    VES|5.5411217733E10|91285|      WT|       2|
|13442|   116|    PBB|2016-04-01|2016|      3|   null|2016-04-02|Pleasure|      M|   null| 8.223882253E10| LAND|      B2|       3|
+-----+------+-------+----------+----+-------+-------+----------+--------+-------+-------+---------------+-----+--------+--------+
only showing top 3 rows
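`monotonically_increasing_id()` guarantees unique, increasing IDs but not consecutive ones: per the Spark documentation, the partition ID sits in the upper 31 bits and the record number within the partition in the lower 33 bits, so the values 1, 2, 3 above most likely come from the start of the first partition. A plain-Python sketch of that layout (`spark_style_id` is a hypothetical helper, not a Spark API):

```python
def spark_style_id(partition_index: int, row_in_partition: int) -> int:
    """Reproduce the bit layout used by monotonically_increasing_id()."""
    return (partition_index << 33) + row_in_partition

# The first rows of partition 0 are consecutive ...
print([spark_style_id(0, i) for i in range(3)])
# ... but the first row of partition 1 jumps to 2**33
print(spark_style_id(1, 0))
```

If dense keys are needed, the commented-out `row_number().over(Window.orderBy(...))` variant produces them, at the cost of moving all rows into a single partition because the window has no `partitionBy`.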



In [33]:
fact_immigrant.printSchema()

root
 |-- cicid: integer (nullable = false)
 |-- I94CIT: integer (nullable = true)
 |-- I94PORT: string (nullable = true)
 |-- arr_date: date (nullable = true)
 |-- year: integer (nullable = true)
 |-- I94MODE: integer (nullable = true)
 |-- I94ADDR: string (nullable = true)
 |-- dep_date: date (nullable = true)
 |-- I94VISA: string (nullable = true)
 |-- MATFLAG: string (nullable = true)
 |-- AIRLINE: string (nullable = true)
 |-- ADMNUM: double (nullable = true)
 |-- FLTNO: string (nullable = true)
 |-- VISATYPE: string (nullable = true)
 |-- immig_id: long (nullable = false)
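With these tables, an analytical question becomes a join from the fact table to one or more dimensions on their keys. A self-contained sketch using Python's built-in `sqlite3` and a few hypothetical rows, counting arrivals per weekday, gender, and visa category. Column names follow the schemas above; joining `fact_immigrant.cicid` to `dim_immgrant.immig_id` is an assumption based on both being derived from the I94 `cicid` (the fact table's own `immig_id` is a separate surrogate key):

```python
import sqlite3

con = sqlite3.connect(":memory:")
cur = con.cursor()
cur.executescript("""
CREATE TABLE dim_calendar (arr_date TEXT PRIMARY KEY, weekday INTEGER);
CREATE TABLE dim_immgrant (immig_id INTEGER PRIMARY KEY, gender TEXT);
CREATE TABLE fact_immigrant (cicid INTEGER, arr_date TEXT, I94VISA TEXT);

-- Hypothetical rows; weekday follows Spark dayofweek (2016-04-01 is a Friday)
INSERT INTO dim_calendar VALUES ('2016-04-01', 6), ('2016-04-02', 7);
INSERT INTO dim_immgrant VALUES (1, 'F'), (2, 'M'), (3, 'F');
INSERT INTO fact_immigrant VALUES
  (1, '2016-04-01', 'Pleasure'),
  (2, '2016-04-01', 'Business'),
  (3, '2016-04-02', 'Pleasure');
""")

# Star-schema query: the fact table joined directly to two dimensions
rows = cur.execute("""
SELECT c.weekday, i.gender, f.I94VISA, COUNT(*) AS arrivals
FROM fact_immigrant f
JOIN dim_calendar c ON f.arr_date = c.arr_date
JOIN dim_immgrant i ON f.cicid = i.immig_id
GROUP BY c.weekday, i.gender, f.I94VISA
ORDER BY c.weekday, i.gender, f.I94VISA
""").fetchall()
print(rows)
```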



### Step 5: Data Quality Checks
Two quality checks are performed:
 * Count check to ensure completeness: verifies that no output table is empty
 * Integrity constraint: the primary key must not be null in any table

In [47]:
# Perform quality checks: empty table
def empty_table(tables, tablename):
    """
    Check whether any of the schema tables (dimension and fact) is empty.
    Parameters:
        tables: list of Spark DataFrames
        tablename: list of table names, in the same order as tables
    Output:
        fail: flag (1 if any table is empty, else 0)
        fail_table: list of names of the empty tables
    """
    fail = 0
    fail_table = []
    for table, name in zip(tables, tablename):
        count = table.count()
        if count < 1:
            fail = 1
            fail_table.append(name)
            print(f"Data quality check failed for {name}: 0 records")
        else:
            print(f"Data quality check passed for {name}: {count} records")
    return fail, fail_table

In [48]:
tables = [dim_usdemog, dim_weather]
tablename = ['dim_usdemog', 'dim_weather']
(fail_flag, failed_table) = empty_table(tables, tablename)
print(fail_flag,failed_table )

Data quality check passed for dim_usdemog: 2891 records
Data quality check passed for dim_weather: 3448 records
0 []


In [58]:
# Perform quality checks: null primary key table
def null_key(tables, tablename, primary_key):
    """
    Check whether the primary key of any schema table (dimension and fact) contains nulls.
    Parameters:
        tables: list of Spark DataFrames
        tablename: list of table names, in the same order as tables
        primary_key: primary key column of each table, in the same order
    Output:
        fail: flag (1 if any primary key has a null value, else 0)
        fail_table: list of names of the failing tables
    """
    fail = 0
    fail_table = []
    for table, name, key in zip(tables, tablename, primary_key):
        count = table.where(F.col(key).isNull()).count()
        if count > 0:
            fail = 1
            fail_table.append(name)
            print(f"Data quality check failed for {name}: {count} records")
        else:
            print(f"Data quality check passed for {name}: {count} records")
    return fail, fail_table

In [62]:
tables = [dim_usdemog, dim_weather]
tablename = ['dim_usdemog', 'dim_weather']
primary_key = ['demog_id', 'weather_id']
(fail_flag, failed_table) = null_key(tables, tablename, primary_key)
print(fail_flag,failed_table )

Data quality check passed for dim_usdemog: 0 records
Data quality check passed for dim_weather: 0 records
0 []


### Step 6: Load Data into Destination

In [None]:
# Write the tables to parquet files.
# output_data is a local path here; pointing it at an s3a:// bucket URI would write to S3
# (the hadoop-aws package is loaded in the Spark session; S3 credentials must also be configured).
output_data = "output_data/"
dim_calendar.write.mode('overwrite').partitionBy("year").parquet(output_data + "dim_calendar")
dim_weather.write.mode('overwrite').parquet(output_data + "dim_weather")
dim_usdemog.write.mode('overwrite').parquet(output_data + "dim_usdemog")
fact_immigrant.write.mode('overwrite').partitionBy("year").parquet(output_data + "fact_immigration")
dim_immgrant.write.mode('overwrite').parquet(output_data + "dim_immigrant")

### Step 7: Project Write Up

#### 7.1 Data Dictionary
The data dictionary can be found [here](Data_dictionary.csv).

#### 7.2 Data Model: Star Schema

##### Why Use Star Schema
A star schema needs only simple joins between the fact table and each dimension table, with no chains of joins across normalized tables. This makes it easy to use for business and BI teams, and queries tend to run faster because there are fewer and simpler joins.

It is also easy to understand once built, so modifications are simple as well.


#### 7.3 Tools and Technologies Used
Spark (PySpark and Spark SQL), Python, and AWS S3 are used to build this ETL pipeline.
* Since the ETL runs on big data, Spark provides the computing power through its distributed processing framework.
* The objective of this initiative is to host the analytical (OLAP) data on the cloud to improve accessibility, security, and computing power in the downstream analytics workstream. AWS S3 enables this objective.
* Python is easy to use, highly compatible with other frameworks such as Spark and AWS, and has many powerful libraries and built-in data structures, which makes it a language of choice for data engineering.

#### 7.4 Approach to the Problem Under Different Scenarios
**The data was increased by 100x**:
If the data increases by 100x, more compute and processing power will be needed. This can be provided by AWS EMR clusters, which simplify running big data frameworks such as Apache Spark to process and analyze vast amounts of data.

**The data populates a dashboard that must be updated on a daily basis by 7am every day**:
Airflow can be used to orchestrate the data pipeline automatically. An Airflow DAG scheduled to run daily, early enough to finish before 7am, will enable this.

**The database needed to be accessed by 100+ people**:
We can shift to AWS Redshift for querying, as it supports many concurrent users and can add capacity for concurrent queries through concurrency scaling. This can also be cost-efficient, as provisioned Redshift clusters are billed by node-hour rather than per query.