# Project Title
### Data Engineering Capstone Project

#### Project Summary
The goal of this project was to create an ETL pipeline for I94 immigration, US demographics, and global land temperatures for analytics database on
immigration events. To show male and female ratio, population and average temperature present on a specific city.

The project follows the follow steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import os
import re
import configparser
import datetime as dt

from pyspark.sql import SparkSession
from pyspark.sql.functions import avg
from pyspark.sql import SQLContext
from pyspark.sql.functions import isnan, when, count, col, udf, dayofmonth, dayofweek, month, year, weekofyear
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.types import *


### Step 1: Scope the Project and Gather Data

#### Scope 
In order to create our analytics database, we must use the following steps:
* Use spark to read and load the data into dataframes.
* Check for missing values of I94 immigration, demographics, and global land temperatures dataset.
* Perform data cleaning on them.
* Create dimension tables.
    * Create immigration table from I94 immigration dataset, this will be link to fact table through valid 194port.
    * Create temperature table from temperature dataset, this will be link to fact table through valid I94port also.
    * Create demographic table from demographic dataset, this will be link to fact table through city.
* Create fact table from I94 immigration dataset
* Create another fact table from demographic dataset.

We used <b>Spark</b> to process these data.
    

#### Describe and Gather Data 
The data comes from the US National Tourism and Trade Office. The project data is in a folder located at ../../data/18-83510-I94-Data-2016/. The immigration data
is an SAS binary database storage format <i>sas7bdat</i>. We use the month of August for this project. However, the program was designed to work with any month's worth data.

In [2]:
# Create Spark Session
spark = SparkSession.builder.\
config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11")\
.enableHiveSupport().getOrCreate()

In [3]:
# Read in the data here
fname = '../../data/18-83510-I94-Data-2016/i94_aug16_sub.sas7bdat'
df_immigration = spark.read.format('com.github.saurfang.sas.spark').load(fname)

In [4]:
df_immigration.limit(5).toPandas()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,22.0,2016.0,8.0,323.0,323.0,NYC,20667.0,1.0,FL,,...,,,1993.0,D/S,M,,EK,64510500000.0,201,F1
1,55.0,2016.0,8.0,209.0,209.0,AGA,20667.0,1.0,CA,,...,,,1975.0,09142016,M,3955.0,JL,57571870000.0,941,GMT
2,56.0,2016.0,8.0,209.0,209.0,AGA,20667.0,1.0,GU,,...,,,1992.0,09152016,F,3661.0,UA,57571890000.0,874,GMT
3,61.0,2016.0,8.0,213.0,213.0,CHI,20667.0,1.0,WA,20774.0,...,,M,1989.0,D/S,M,,UA,59059190000.0,906,F1
4,64.0,2016.0,8.0,111.0,111.0,BOS,20667.0,1.0,MS,20670.0,...,,M,1982.0,08242016,F,32572.0,QK,61043090000.0,8456,WT


<b><i>Data dictionary</i></b>

<table class="tg" align="left">
  <tr>
    <th class="tg-0pky">Feature</th>
    <th class="tg-0pky">Description</th>
  </tr>
 <tr><td class="tg-0pky">cicid</td><td class="tg-0pky">Unique record ID</td>
 <tr><td class="tg-0pky">i94yr</td><td class="tg-0pky">4 digit year</td>
 <tr><td class="tg-0pky">i94mon</td><td class="tg-0pky">Numeric month</td>
 <tr><td class="tg-0pky">i94cit</td><td class="tg-0pky">3 digit code for immigrant country of birth</td>
 <tr><td class="tg-0pky">i94res</td><td class="tg-0pky">3 digit code for immigrant country of residence </td>
 <tr><td class="tg-0pky">i94port</td><td class="tg-0pky">Port of admission</td>
 <tr><td class="tg-0pky">arrdate</td><td class="tg-0pky">Arrival Date in the USA</td>
 <tr><td class="tg-0pky">i94mode</td><td class="tg-0pky">Mode of transportation (1 = Air; 2 = Sea; 3 = Land; 9 = Not reported)</td>
 <tr><td class="tg-0pky">i94addr</td><td class="tg-0pky">USA State of arrival</td>
 <tr><td class="tg-0pky">depdate</td><td class="tg-0pky">Departure Date from the USA</td>
 <tr><td class="tg-0pky">i94bir</td><td class="tg-0pky">Age of Respondent in Years</td>
 <tr><td class="tg-0pky">i94visa</td><td class="tg-0pky">Visa codes collapsed into three categories</td>
 <tr><td class="tg-0pky">count</td><td class="tg-0pky">Field used for summary statistics</td>
 <tr><td class="tg-0pky">dtadfile</td><td class="tg-0pky">Character Date Field - Date added to I-94 Files</td>
 <tr><td class="tg-0pky">visapost</td><td class="tg-0pky">Department of State where where Visa was issued </td>
 <tr><td class="tg-0pky">occup</td><td class="tg-0pky">Occupation that will be performed in U.S</td>
 <tr><td class="tg-0pky">entdepa</td><td class="tg-0pky">Arrival Flag - admitted or paroled into the U.S.</td>
 <tr><td class="tg-0pky">entdepd</td><td class="tg-0pky">Departure Flag - Departed, lost I-94 or is deceased</td>
 <tr><td class="tg-0pky">entdepu</td><td class="tg-0pky">Update Flag - Either apprehended, overstayed, adjusted to perm residence</td>
 <tr><td class="tg-0pky">matflag</td><td class="tg-0pky">Match flag - Match of arrival and departure records</td>
 <tr><td class="tg-0pky">biryear</td><td class="tg-0pky">4 digit year of birth</td>
 <tr><td class="tg-0pky">dtaddto</td><td class="tg-0pky">Character Date Field - Date to which admitted to U.S. (allowed to stay until)</td>
 <tr><td class="tg-0pky">gender</td><td class="tg-0pky">Non-immigrant sex</td>
 <tr><td class="tg-0pky">insnum</td><td class="tg-0pky">INS number</td>
 <tr><td class="tg-0pky">airline</td><td class="tg-0pky">Airline used to arrive in U.S.</td>
 <tr><td class="tg-0pky">admnum</td><td class="tg-0pky">Admission Number</td>
 <tr><td class="tg-0pky">fltno</td><td class="tg-0pky">Flight number of Airline used to arrive in U.S.</td>
 <tr><td class="tg-0pky">visatype</td><td class="tg-0pky">Class of admission legally admitting the non-immigrant to temporarily stay in U.S.</td>
</table>

In [5]:
print('{:,}'.format(df_immigration.count()))

4,103,570


In [6]:
# Getting Temperature data 
temperature_data = '../../data2/GlobalLandTemperaturesByCity.csv'
df_temp = spark.read.csv(temperature_data, header=True, inferSchema=True)
df_temp.show(5)

+-------------------+------------------+-----------------------------+-----+-------+--------+---------+
|                 dt|AverageTemperature|AverageTemperatureUncertainty| City|Country|Latitude|Longitude|
+-------------------+------------------+-----------------------------+-----+-------+--------+---------+
|1743-11-01 00:00:00|             6.068|           1.7369999999999999|Århus|Denmark|  57.05N|   10.33E|
|1743-12-01 00:00:00|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-01-01 00:00:00|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-02-01 00:00:00|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-03-01 00:00:00|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
+-------------------+------------------+-----------------------------+-----+-------+--------+---------+
only showing top 5 rows



<b><i>Data dictionary</i></b>

<table class="tg" align="left">
  <tr>
    <th class="tg-0pky">Feature</th>
    <th class="tg-0pky">Description</th>
  </tr>
 <tr><td class="tg-0pky">dt</td><td class="tg-0pky">Date</td>
 <tr><td class="tg-0pky">AverageTemperature</td><td class="tg-0pky">Global average land temperature in celsius</td>
 <tr><td class="tg-0pky">AverageTemperatureUncertainty</td><td class="tg-0pky">95% confidence interval around the average</td>
 <tr><td class="tg-0pky">City</td><td class="tg-0pky">Name of City</td>
 <tr><td class="tg-0pky">Country</td><td class="tg-0pky">Name of Country</td>
 <tr><td class="tg-0pky">Latitude</td><td class="tg-0pky">City Latitude</td>
 <tr><td class="tg-0pky">Longitude</td><td class="tg-0pky">City Longitude</td>
</table>

In [7]:
print('{:,}'.format(df_temp.count()))

8,599,212


In [8]:
# Read demographic data 
path = 'us-cities-demographics.csv'
df_demographic = spark.read.csv(path, header=True, inferSchema=True, sep=';')
df_demographic.show(5)

+----------------+-------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+-----+
|            City|        State|Median Age|Male Population|Female Population|Total Population|Number of Veterans|Foreign-born|Average Household Size|State Code|                Race|Count|
+----------------+-------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+-----+
|   Silver Spring|     Maryland|      33.8|          40601|            41862|           82463|              1562|       30908|                   2.6|        MD|  Hispanic or Latino|25924|
|          Quincy|Massachusetts|      41.0|          44129|            49500|           93629|              4147|       32935|                  2.39|        MA|               White|58723|
|          Hoover|      Alabama|      38.5|          38040| 

<b><i>Data dictionary</i></b>

<table class="tg" align="left">
  <tr>
    <th class="tg-0pky">Feature</th>
    <th class="tg-0pky">Description</th>
  </tr>
 <tr><td class="tg-0pky">City</td><td class="tg-0pky">City Name</td>
 <tr><td class="tg-0pky">State</td><td class="tg-0pky">US State where city is located</td>
 <tr><td class="tg-0pky">Median Age</td><td class="tg-0pky">Median age of the population</td>
 <tr><td class="tg-0pky">Male Population</td><td class="tg-0pky">Count of male population</td>
 <tr><td class="tg-0pky">Female Population</td><td class="tg-0pky">Count of female population</td>
 <tr><td class="tg-0pky">Total Population</td><td class="tg-0pky">Count of total population</td>
 <tr><td class="tg-0pky">Number of Veterans</td><td class="tg-0pky">Count of total Veterans</td>
 <tr><td class="tg-0pky">Foreign born</td><td class="tg-0pky">Count of residents of the city that were not born in the city</td>
 <tr><td class="tg-0pky">Average Household Size</td><td class="tg-0pky">Average city household size</td>
 <tr><td class="tg-0pky">State Code</td><td class="tg-0pky">Code of the US state</td>
 <tr><td class="tg-0pky">Race</td><td class="tg-0pky">Respondent race</td>
 <tr><td class="tg-0pky">Count</td><td class="tg-0pky">Count of city's individual per race</td>
</table>

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps
Document steps necessary to clean the data

In [9]:
# Check all files in data repository
data_files = os.listdir('../../data/18-83510-I94-Data-2016/')
data_files

['i94_apr16_sub.sas7bdat',
 'i94_sep16_sub.sas7bdat',
 'i94_nov16_sub.sas7bdat',
 'i94_mar16_sub.sas7bdat',
 'i94_jun16_sub.sas7bdat',
 'i94_aug16_sub.sas7bdat',
 'i94_may16_sub.sas7bdat',
 'i94_jan16_sub.sas7bdat',
 'i94_oct16_sub.sas7bdat',
 'i94_jul16_sub.sas7bdat',
 'i94_feb16_sub.sas7bdat',
 'i94_dec16_sub.sas7bdat']

In [10]:
# Performing cleaning tasks here
def check_missing_values(df):
    """
    This function checks null values in a dataframe
    param: df or the dataframe
    return:
        nan_count_df - list of columns, values, and percentage of missing values
    """
     # create a dataframe with missing values count per column
    nan_count_df = df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns]).toPandas()
    
    # convert dataframe from wide format to long format
    nan_count_df = pd.melt(nan_count_df, var_name='cols', value_name='values')
    
    # count total records in df
    total = df.count()
    
    # now lets add % missing values column
    nan_count_df['% missing values'] = 100*nan_count_df['values']/total
    return nan_count_df

In [11]:
check_missing_values(df_immigration)

Unnamed: 0,cols,values,% missing values
0,cicid,0,0.0
1,i94yr,0,0.0
2,i94mon,0,0.0
3,i94cit,6142,0.149675
4,i94res,0,0.0
5,i94port,0,0.0
6,arrdate,0,0.0
7,i94mode,1884,0.045911
8,i94addr,184462,4.495159
9,depdate,651788,15.883438


In [12]:
df_immigration.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

In [13]:
df_temp = df_temp.withColumn("dt",col("dt").cast(StringType()))
check_missing_values(df_temp)

Unnamed: 0,cols,values,% missing values
0,dt,0,0.0
1,AverageTemperature,364130,4.234458
2,AverageTemperatureUncertainty,364130,4.234458
3,City,0,0.0
4,Country,0,0.0
5,Latitude,0,0.0
6,Longitude,0,0.0


In [14]:
df_temp.printSchema()

root
 |-- dt: string (nullable = true)
 |-- AverageTemperature: double (nullable = true)
 |-- AverageTemperatureUncertainty: double (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)



In [15]:
df_temp.count()

8599212

Us-Cities-demographics

In [16]:
# View columns with missing data
nulls_df = pd.DataFrame(data= df_demographic.toPandas().isnull().sum(), columns=['values'])
nulls_df = nulls_df.reset_index()
nulls_df.columns = ['cols', 'values']

# calculate % missing values
nulls_df['% missing values'] = 100*nulls_df['values']/df_demographic.count()
nulls_df[nulls_df['% missing values']>0]

Unnamed: 0,cols,values,% missing values
3,Male Population,3,0.10377
4,Female Population,3,0.10377
6,Number of Veterans,13,0.449671
7,Foreign-born,13,0.449671
8,Average Household Size,16,0.553442


In [18]:
df_demographic.printSchema()

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Median Age: double (nullable = true)
 |-- Male Population: integer (nullable = true)
 |-- Female Population: integer (nullable = true)
 |-- Total Population: integer (nullable = true)
 |-- Number of Veterans: integer (nullable = true)
 |-- Foreign-born: integer (nullable = true)
 |-- Average Household Size: double (nullable = true)
 |-- State Code: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Count: integer (nullable = true)



In [19]:
df_demographic.count()

2891

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model

#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [20]:
# Path to I94 immigration data 
# Here only april data is loaded but we can load whole millions of data using 
# path = '../../data/18-83510-I94-Data-2016/*.sas7bdat'
immigration_data = '../../data/18-83510-I94-Data-2016/i94_aug16_sub.sas7bdat'

In [21]:
# Creating dictionary of i94port which are of valid format for june month
re_compile_obj = re.compile(r'\'(.*)\'.*\'(.*)\'')
valid_i94port = {}
with open('valid_194port.txt') as f:
     for line in f:
            match = re_compile_obj.search(line)
            valid_i94port[match[1]]=[match[2]]

In [22]:
# # drop these columns
# columns with over 90% missing values
cols = ['entdepu','occup']

df_new_immigration = df_immigration.drop(*cols)
df_new_immigration.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = true)
 |-- fltno: string (nullable = true)
 |-- visatype: string (nullable =

In [23]:
# drop rows with missing values
df_new_immigration = df_new_immigration.dropDuplicates(['cicid'])

df_new_immigration = df_new_immigration.dropna(how='all', subset=['cicid'])

In [24]:
immigration_dim = df_new_immigration.filter(df_new_immigration.i94port.isin(list(valid_i94port)))

In [25]:
# selecting i94port column which contain only valid port
immigration_dim.select(["i94port"]).show()

+-------+
|i94port|
+-------+
|    MIA|
|    WAS|
|    AGA|
|    SAI|
|    SAI|
|    AGA|
|    DET|
|    SAI|
|    CHM|
|    JKM|
|    SAI|
|    SAI|
|    LOS|
|    LEW|
|    HOU|
|    MIA|
|    NYC|
|    SWE|
|    SAI|
|    NYC|
+-------+
only showing top 20 rows



In [26]:
# Create immigration dimentional table
immigration_dim.show()

+-------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-------+-------+-------+-------+--------+------+------+-------+---------------+-----+--------+
|  cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|entdepa|entdepd|matflag|biryear| dtaddto|gender|insnum|airline|         admnum|fltno|visatype|
+-------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-------+-------+-------+-------+--------+------+------+-------+---------------+-----+--------+
|  299.0|2016.0|   8.0| 117.0| 117.0|    MIA|20667.0|    1.0|   null|20668.0|  57.0|    2.0|  1.0|20160801|    null|      H|      O|      M| 1959.0|10122016|     M|  null|     YX|6.3985433533E10| 4366|      WT|
|  596.0|2016.0|   8.0| 135.0| 135.0|    WAS|20667.0|    1.0|     VA|20668.0|  43.0|    1.0|  1.0|20160802|    null|      G|      G|      M| 1973.0|    null

In [27]:
# writing data to separet folder in parquet formate and partition by i94port and i94month
immigration_dim.write.mode("append").partitionBy("i94port").parquet("/results/immigration.parquet")

In [28]:
@udf
def get_column(city):
    for key in valid_i94port:
        if key.lower() in valid_i94port[key][0].lower():
            return key

In [29]:
# Adding column i94port to temperature_dim table
temperature_dim = df_temp.withColumn("i94port", get_column(df_temp.City))
temperature_dim = temperature_dim.drop('AverageTemperatureUncertainty')
temperature_dim.show()

+-------------------+-------------------+-----+-------+--------+---------+-------+
|                 dt| AverageTemperature| City|Country|Latitude|Longitude|i94port|
+-------------------+-------------------+-----+-------+--------+---------+-------+
|1743-11-01 00:00:00|              6.068|Århus|Denmark|  57.05N|   10.33E|    ALC|
|1743-12-01 00:00:00|               null|Århus|Denmark|  57.05N|   10.33E|    ALC|
|1744-01-01 00:00:00|               null|Århus|Denmark|  57.05N|   10.33E|    ALC|
|1744-02-01 00:00:00|               null|Århus|Denmark|  57.05N|   10.33E|    ALC|
|1744-03-01 00:00:00|               null|Århus|Denmark|  57.05N|   10.33E|    ALC|
|1744-04-01 00:00:00| 5.7879999999999985|Århus|Denmark|  57.05N|   10.33E|    ALC|
|1744-05-01 00:00:00|             10.644|Århus|Denmark|  57.05N|   10.33E|    ALC|
|1744-06-01 00:00:00| 14.050999999999998|Århus|Denmark|  57.05N|   10.33E|    ALC|
|1744-07-01 00:00:00|             16.082|Århus|Denmark|  57.05N|   10.33E|    ALC|
|174

In [30]:
temperature_dim = temperature_dim.dropDuplicates(['City', 'Country'])
temperature_dim.count()

3490

In [31]:
temperature_dim = temperature_dim.dropna(how='all')
temperature_dim.show()

+-------------------+--------------------+------------+------------------+--------+---------+-------+
|                 dt|  AverageTemperature|        City|           Country|Latitude|Longitude|i94port|
+-------------------+--------------------+------------+------------------+--------+---------+-------+
|1743-11-01 00:00:00|               3.264|   Allentown|     United States|  40.99N|   74.56W|    ALC|
|1779-11-01 00:00:00|0.011999999999999985|      Atyrau|        Kazakhstan|  47.42N|   50.92E|    ALC|
|1825-01-01 00:00:00|  26.069000000000003|     Bintulu|          Malaysia|   2.41N|  113.30E|    ALC|
|1825-01-01 00:00:00|              26.517| Butterworth|          Malaysia|   5.63N|  100.09E|    ALC|
|1845-01-01 00:00:00|              24.995|      Cainta|       Philippines|  15.27N|  120.83E|    ALC|
|1825-01-01 00:00:00|              24.753|      Ciamis|         Indonesia|   7.23S|  107.84E|    ALC|
|1850-01-01 00:00:00|              22.121|      Dodoma|          Tanzania|   5.63S

In [32]:
temperature_dim = temperature_dim.withColumnRenamed("City","city")
temperature_dim.show()

+-------------------+--------------------+------------+------------------+--------+---------+-------+
|                 dt|  AverageTemperature|        city|           Country|Latitude|Longitude|i94port|
+-------------------+--------------------+------------+------------------+--------+---------+-------+
|1743-11-01 00:00:00|               3.264|   Allentown|     United States|  40.99N|   74.56W|    ALC|
|1779-11-01 00:00:00|0.011999999999999985|      Atyrau|        Kazakhstan|  47.42N|   50.92E|    ALC|
|1825-01-01 00:00:00|  26.069000000000003|     Bintulu|          Malaysia|   2.41N|  113.30E|    ALC|
|1825-01-01 00:00:00|              26.517| Butterworth|          Malaysia|   5.63N|  100.09E|    ALC|
|1845-01-01 00:00:00|              24.995|      Cainta|       Philippines|  15.27N|  120.83E|    ALC|
|1825-01-01 00:00:00|              24.753|      Ciamis|         Indonesia|   7.23S|  107.84E|    ALC|
|1850-01-01 00:00:00|              22.121|      Dodoma|          Tanzania|   5.63S

In [33]:
temperature_dim.count()

3490

In [34]:
# writing table to parquet formate
temperature_dim.write.mode("append").partitionBy("i94port").parquet("/results/temperature.parquet")

In [35]:
demographics_dim = df_demographic.select(col("City").alias("city"), col("State").alias("state"), col("Male Population").alias("male_population"), col("Female Population").alias("female_population"), col("Average Household Size").alias("average_household_size"))
demographics_dim.dropDuplicates(["City","State"])
demographics_dim.count()

2891

In [36]:
demographics_dim.show()

+----------------+--------------+---------------+-----------------+----------------------+
|            city|         state|male_population|female_population|average_household_size|
+----------------+--------------+---------------+-----------------+----------------------+
|   Silver Spring|      Maryland|          40601|            41862|                   2.6|
|          Quincy| Massachusetts|          44129|            49500|                  2.39|
|          Hoover|       Alabama|          38040|            46799|                  2.58|
|Rancho Cucamonga|    California|          88127|            87105|                  3.18|
|          Newark|    New Jersey|         138040|           143873|                  2.73|
|          Peoria|      Illinois|          56229|            62432|                   2.4|
|        Avondale|       Arizona|          38712|            41971|                  3.18|
|     West Covina|    California|          51629|            56860|                  3.56|

In [37]:
# writing data to results directory in parquet format
demographics_dim.write.mode("append").partitionBy("city").parquet("/results/demographics.parquet")

In [38]:
# Creating Fact table from Dimenstion tables which were created above

# Create views for each table 
demographics_dim.createOrReplaceTempView("demographics_view")
temperature_dim.createOrReplaceTempView("temperature_view")
immigration_dim.createOrReplaceTempView("immigration_view")

In [39]:
# Create immigration_temperature_table by joining immigration dimention table and temperature_table dimention table
immigration_temperature_table = immigration_dim.join(temperature_dim, on=["i94port"])

In [40]:
immigration_temperature_table.show()

+-------+--------+------+------+------+------+-------+-------+-------+-------+------+-------+-----+--------+--------+-------+-------+-------+-------+--------+------+------+-------+---------------+-----+--------+-------------------+--------------------+------------+------------------+--------+---------+
|i94port|   cicid| i94yr|i94mon|i94cit|i94res|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|entdepa|entdepd|matflag|biryear| dtaddto|gender|insnum|airline|         admnum|fltno|visatype|                 dt|  AverageTemperature|        city|           Country|Latitude|Longitude|
+-------+--------+------+------+------+------+-------+-------+-------+-------+------+-------+-----+--------+--------+-------+-------+-------+-------+--------+------+------+-------+---------------+-----+--------+-------------------+--------------------+------------+------------------+--------+---------+
|    ALC|508754.0|2016.0|   8.0| 148.0| 112.0|20669.0|    3.0|     AK|20677.0|  58.0|   

In [41]:
immigration_temperature_table.printSchema()

root
 |-- i94port: string (nullable = true)
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = true)
 |-- fltno: string (nullable = true)
 |-- visatype: string (nullable =

In [42]:
# selecting required column for fact table changing column name to readable format
fact_table = immigration_temperature_table.select(col("i94yr").alias("year"),
                                                col("i94mon").alias("month"),
                                                col("i94cit").alias("city_code"), 
                                                col("city").alias("city"),
                                                col("Country").alias("country"),
                                                col("i94port").alias("i94port"),
                                                col("arrdate").alias("arrival_date"),
                                                col("i94mode").alias("travel_mode"),
                                                col("i94addr").alias("i94addr"),
                                                col("depdate").alias("departure_date"),
                                                col("i94bir").alias("i94bir"),
                                                col("i94visa").alias("reason"),
                                                col("count").alias("count"),
                                                col("dtadfile").alias("dtadfile"),
                                                col("visapost").alias("visapost"),
                                                col("entdepa").alias("arrival_flag"),
                                                col("entdepd").alias("departure_flag"),
                                                col("matflag").alias("match_flag"),
                                                col("biryear").alias("birth_year"),
                                                col("dtaddto").alias("stay_until"),
                                                col("gender").alias("gender"),
                                                col("insnum").alias("ins_num"),
                                                col("airline").alias("airline"),
                                                col("admnum").alias("admission_number"),
                                                col("fltno").alias("flight_number"),
                                                col("visatype").alias("visatype"),
                                                col("AverageTemperature").alias("averagetemperature"),
                                                col("Latitude").alias("latitude"),
                                                col("Longitude").alias("longitude")
                                               )
fact_table.printSchema()

root
 |-- year: double (nullable = true)
 |-- month: double (nullable = true)
 |-- city_code: double (nullable = true)
 |-- city: string (nullable = true)
 |-- country: string (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrival_date: double (nullable = true)
 |-- travel_mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- departure_date: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- reason: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- arrival_flag: string (nullable = true)
 |-- departure_flag: string (nullable = true)
 |-- match_flag: string (nullable = true)
 |-- birth_year: double (nullable = true)
 |-- stay_until: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- ins_num: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admission_number: double (nullable = true)
 |-- flight_number: st

In [43]:
fact_table.show()

+------+-----+---------+------------+------------------+-------+------------+-----------+-------+--------------+------+------+-----+--------+--------+------------+--------------+----------+----------+----------+------+-------+-------+----------------+-------------+--------+--------------------+--------+---------+
|  year|month|city_code|        city|           country|i94port|arrival_date|travel_mode|i94addr|departure_date|i94bir|reason|count|dtadfile|visapost|arrival_flag|departure_flag|match_flag|birth_year|stay_until|gender|ins_num|airline|admission_number|flight_number|visatype|  averagetemperature|latitude|longitude|
+------+-----+---------+------------+------------------+-------+------------+-----------+-------+--------------+------+------+-----+--------+--------+------------+--------------+----------+----------+----------+------+-------+-------+----------------+-------------+--------+--------------------+--------+---------+
|2016.0|  8.0|    148.0|   Allentown|     United States

In [44]:
fact_table.count()

3556310

In [45]:
fact_table.write.mode("append").partitionBy(["i94port","city"]).parquet("/results/fact_table.parquet")

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

In [46]:
def quality_check(df, description):
    '''
    Input: Spark dataframe, description of Spark datafram
    Output: Print outcome of data quality check
    '''
    
    result = df.count()
    if result == 0:
        print("Data quality check failed for {} with zero records".format(description))
    else:
        print("Data quality check passed for {} with {} records".format(description, result))
    return 0

In [47]:
# Perform data quality check
quality_check(immigration_dim, "immigration table")
quality_check(temperature_dim, "temperature table")
quality_check(demographics_dim, "demographics table")
quality_check(fact_table, "fact table")

Data quality check passed for immigration table with 4094950 records
Data quality check passed for temperature table with 3490 records
Data quality check passed for demographics table with 2891 records
Data quality check passed for fact table with 3556310 records


0

#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

#### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.