# US IMMIGRATION DATA MODEL
### Data Engineering Capstone Project

#### Project Summary
This is the Udacity Data Engineering Capstone Project. It presents a use case in which big data engineering concepts and technologies are used to draw useful insights out of large and diverse data sources. In this project, a data model is built to enable analysts and business users to answer different types of questions about immigration trends to the US.

Questions like *is there a peak season for immigration?* *Do immigrants from warm countries prefer certain States versus those who are coming from cold countries?* *What is the percentage of immigrants with business visas versus those with tourist visas?* All these questions and many more can be easily answered once our data model is built and filled with data.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
import pandas as pd
import os
import psycopg2
import datetime as dt
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from IPython.display import display
from pyspark.sql.functions import udf, month, year, dayofweek, dayofmonth, weekofyear, \
                                    when, col, split, trim, upper, row_number, \
                                    monotonically_increasing_id             
from datetime import timedelta, datetime
from pyspark.sql.types import StringType, IntegerType, DoubleType, LongType, FloatType, DecimalType, DateType
from pyspark.sql.window import Window


In [2]:

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["PATH"] = "/opt/conda/bin:/opt/spark-2.4.3-bin-hadoop2.7/bin:/opt/conda/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/lib/jvm/java-8-openjdk-amd64/bin"
os.environ["SPARK_HOME"] = "/opt/spark-2.4.3-bin-hadoop2.7"
os.environ["HADOOP_HOME"] = "/opt/spark-2.4.3-bin-hadoop2.7"

#Increase the driver memory to 15 GB to avoid running out of memory
spark = SparkSession.builder.config("spark.driver.memory", "15g")\
                            .config("spark.sql.broadcastTimeout", "36000")\
                            .getOrCreate()


In [3]:
pd.options.display.max_columns = None
pd.set_option('max_colwidth', -1)

### Step 1: Scope the Project and Gather Data

#### Scope 
This project aims at building a star schema data model that allows for a better understanding of immigration trends to the US.

PySpark is used to process our data. For this project, the Spark installation local to the Udacity workspace is used; a cluster service such as Amazon EMR would likely offer much better performance.

The generated data model is saved in Parquet format.


#### Describe and Gather Data 
Our data comes from different sources and in different formats described as follows:

* **I94 Immigration Data:** This data comes from the US National Travel and Tourism Office. The data comes in .sas format, and it has information about entries made to the US in 2016. The data also comes with a label descriptions file, which provides additional information about the main dataset. More about this dataset can be found [here](trade.gov/national-travel-and-tourism-office).
* **World Temperature Data:** This dataset came from Kaggle, and it keeps track of the global weather information. The data is provided as a .csv file. More about this dataset can be found [here](kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data).
* **U.S. City Demographic Data:** This data is offered by OpenDataSoft, and it provides basic information about different city demographics. The data is also provided in .csv format. You can read more about it [here](public.opendatasoft.com/explore/dataset/us-cities-demographics/export/).
* **Airport Code Table:** This is a simple table of airport codes and corresponding cities. The data is also provided in .csv format. It comes from [here](datahub.io/core/airport-codes#data).



### Step 2: Explore and Assess Data

#### Explore


In [4]:
#Reading the parquet data files from the sas_data folder using Spark which is installed on the workspace.
i94_df = spark.read.load('./sas_data')
i94_df.show(5)

+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|    cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|        admnum|fltno|visatype|
+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|5748517.0|2016.0|   4.0| 245.0| 438.0|    LOS|20574.0|    1.0|     CA|20582.0|  40.0|    1.0|  1.0|20160430|     SYD| null|      G|      O|   null|      M| 1976.0|10292016|     F|  null|     QF|9.495387003E10|00011|      B1|
|5748518.0|2016.0|   4.0| 245.0| 438.0|    LOS|20574.0|    1.0|     NV|20591.0|  32.0|    1.0|  

In [5]:
temperatures_df = spark.read.option("header", "true").csv('../../data2/GlobalLandTemperaturesByCity.csv')
temperatures_df.show(5)

+----------+------------------+-----------------------------+-----+-------+--------+---------+
|        dt|AverageTemperature|AverageTemperatureUncertainty| City|Country|Latitude|Longitude|
+----------+------------------+-----------------------------+-----+-------+--------+---------+
|1743-11-01|             6.068|           1.7369999999999999|Århus|Denmark|  57.05N|   10.33E|
|1743-12-01|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-01-01|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-02-01|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-03-01|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
+----------+------------------+-----------------------------+-----+-------+--------+---------+
only showing top 5 rows



In [6]:
demos_df = spark.read.option("header", "true").option("delimiter", ";").csv("us-cities-demographics.csv")
demos_df.show(5)

+----------------+-------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+-----+
|            City|        State|Median Age|Male Population|Female Population|Total Population|Number of Veterans|Foreign-born|Average Household Size|State Code|                Race|Count|
+----------------+-------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+-----+
|   Silver Spring|     Maryland|      33.8|          40601|            41862|           82463|              1562|       30908|                   2.6|        MD|  Hispanic or Latino|25924|
|          Quincy|Massachusetts|      41.0|          44129|            49500|           93629|              4147|       32935|                  2.39|        MA|               White|58723|
|          Hoover|      Alabama|      38.5|          38040| 

In [7]:
airport_codes_df = spark.read.option("header", "true").csv("airport-codes_csv.csv")
airport_codes_df.show(5)

+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
|ident|         type|                name|elevation_ft|continent|iso_country|iso_region|municipality|gps_code|iata_code|local_code|         coordinates|
+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
|  00A|     heliport|   Total Rf Heliport|          11|       NA|         US|     US-PA|    Bensalem|     00A|     null|       00A|-74.9336013793945...|
| 00AA|small_airport|Aero B Ranch Airport|        3435|       NA|         US|     US-KS|       Leoti|    00AA|     null|      00AA|-101.473911, 38.7...|
| 00AK|small_airport|        Lowell Field|         450|       NA|         US|     US-AK|Anchor Point|    00AK|     null|      00AK|-151.695999146, 5...|
| 00AL|small_airport|        Epps Airpark|         820|       NA|         US|     

#### Assess Data

Gather information about our datasets and identify issues such as nulls and fields that are not useful.


In [8]:
i94_df.summary("count").show()

+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+--------+--------+-----+-------+-------+-------+-------+-------+-------+-------+------+-------+-------+-------+--------+
|summary|  cicid|  i94yr| i94mon| i94cit| i94res|i94port|arrdate|i94mode|i94addr|depdate| i94bir|i94visa|  count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear|dtaddto| gender|insnum|airline| admnum|  fltno|visatype|
+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+--------+--------+-----+-------+-------+-------+-------+-------+-------+-------+------+-------+-------+-------+--------+
|  count|3096313|3096313|3096313|3096313|3096313|3096313|3096313|3096074|2943721|2953856|3095511|3096313|3096313| 3096312| 1215063| 8126|3096075|2957884|    392|2957884|3095511|3095836|2682044|113708|3012686|3096313|3076764| 3096313|
+-------+-------+-------+-------+-------+-------+-------+-------

In [9]:
temperatures_df.summary("count").show()

+-------+-------+------------------+-----------------------------+-------+-------+--------+---------+
|summary|     dt|AverageTemperature|AverageTemperatureUncertainty|   City|Country|Latitude|Longitude|
+-------+-------+------------------+-----------------------------+-------+-------+--------+---------+
|  count|8599212|           8235082|                      8235082|8599212|8599212| 8599212|  8599212|
+-------+-------+------------------+-----------------------------+-------+-------+--------+---------+



In [10]:
demos_df.summary("count").show()

+-------+----+-----+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+----+-----+
|summary|City|State|Median Age|Male Population|Female Population|Total Population|Number of Veterans|Foreign-born|Average Household Size|State Code|Race|Count|
+-------+----+-----+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+----+-----+
|  count|2891| 2891|      2891|           2888|             2888|            2891|              2878|        2878|                  2875|      2891|2891| 2891|
+-------+----+-----+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+----+-----+



In [11]:
airport_codes_df.summary("count").show()

+-------+-----+-----+-----+------------+---------+-----------+----------+------------+--------+---------+----------+-----------+
|summary|ident| type| name|elevation_ft|continent|iso_country|iso_region|municipality|gps_code|iata_code|local_code|coordinates|
+-------+-----+-----+-----+------------+---------+-----------+----------+------------+--------+---------+----------+-----------+
|  count|55075|55075|55075|       48069|    55075|      55075|     55075|       49399|   41030|     9189|     28686|      55075|
+-------+-----+-----+-----+------------+---------+-----------+----------+------------+--------+---------+----------+-----------+



### Step 3: Define the Data Model

Our data model is a star schema composed of one fact table and six dimension tables, described as follows.


| Table Name        | Columns                                        | Description                      | Type                         |
| ----------------- | ---------------------------------------------- | -------------------------------- | ---------------------------- |
| fact_I94          | cicid, i94yr, i94mon, i94cit, i94res, i94port, arrdate, i94mode, i94addr, depdate, i94bir, i94visa, dtadfile, gender, airline, fltno, visatype, stay | Stores immigration records | Fact table |
| dim_I94visa       | id, type                                       | Stores visa types                | Dimension table              |
| dim_I94mode       | id, mode                                       | Stores travel modes              | Dimension table              |
| dim_I94port       | code, port, state_code                         | Stores ports                     | Dimension table              |
| dim_Country       | code, country, average_temperature, latitude, longitude | Stores country information as well as temperatures | Dimension table |
| dim_Demographics  | id, state, state_code, city, foreign_born, male_population, average_household_size, total_population, female_population, maiden_age, number_of_veterans, american_indian_and_alaska_native, asian, black_or_african_American, hispanic_or_latino, white | Stores city demographics | Dimension table |
| dim_Date          | sasdate, isodate, year, month, week, day, dayofweek, isweekend, season | Stores date information | Dimension table |



*The airport dataset was not used in this model, as we could not find any useful use cases for it.*

More details about these tables and the steps followed to process them are discussed in the following sections.
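To illustrate how a star schema like this one answers a question such as the share of business versus tourist visas, here is a minimal sketch. It uses the standard-library `sqlite3` module purely as a self-contained stand-in for the Parquet tables, keeps only two columns of `fact_I94`, and fills the fact table with four made-up rows; none of this is part of the project's actual pipeline.

```python
import sqlite3

# In-memory database used only as a stand-in for the Parquet-backed tables
conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE dim_I94visa (
    id   INTEGER PRIMARY KEY,
    type TEXT NOT NULL UNIQUE
);
CREATE TABLE fact_I94 (
    cicid   INTEGER PRIMARY KEY,
    i94visa INTEGER NOT NULL REFERENCES dim_I94visa(id)
);
INSERT INTO dim_I94visa VALUES (1, 'Business'), (2, 'Pleasure'), (3, 'Student');
-- Made-up fact rows, for illustration only
INSERT INTO fact_I94 VALUES (1, 1), (2, 2), (3, 2), (4, 2);
""")

# Join the fact table to its dimension and compute the share of each visa type
visa_share = conn.execute("""
SELECT v.type,
       COUNT(*) * 100.0 / (SELECT COUNT(*) FROM fact_I94) AS pct
FROM fact_I94 AS f
JOIN dim_I94visa AS v ON f.i94visa = v.id
GROUP BY v.type
ORDER BY v.type
""").fetchall()

for visa_type, pct in visa_share:
    print(visa_type, pct)
```

The same join-then-aggregate pattern would apply to the other dimensions, for example joining on `i94res` to `dim_Country` to compare arrivals from warm and cold countries.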



##### i94 Immigration Data

From the statistics above, we can see that the values of the "visapost", "occup", "entdepu", "insnum" columns are mostly null, which means that they can be dropped without affecting our analyses.

We also believe that the columns "count", "entdepd", "entdepa", "matflag", "dtaddto", "biryear", and "admnum" are not useful for our foreseeable analyses, and therefore they will be dropped as well.

The rest of the columns are fine, except for a few that contain some nulls. We keep "airline", "gender", and "i94addr" but drop the rows where any of them is null.
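The "mostly null" judgment can be made explicit by turning the non-null counts reported by `summary("count")` into null fractions. The sketch below is plain Python using the counts printed earlier; the 0.5 cut-off is an assumption chosen for illustration, not a threshold stated in the project.

```python
# Total number of rows, taken from the cicid count in the summary output
TOTAL_ROWS = 3096313

# Non-null counts copied from the summary output for a few columns
non_null_counts = {
    "visapost": 1215063,
    "occup": 8126,
    "entdepu": 392,
    "insnum": 113708,
    "gender": 2682044,
    "airline": 3012686,
    "i94addr": 2943721,
}

def null_fraction(non_null, total):
    """Return the fraction of rows in which a column is null."""
    return 1 - non_null / total

# Assumed cut-off: a column counts as "mostly null" above 50% nulls
THRESHOLD = 0.5
mostly_null = [
    name for name, count in non_null_counts.items()
    if null_fraction(count, TOTAL_ROWS) > THRESHOLD
]
print(mostly_null)
```

Under this cut-off, only the four columns named above are flagged, while "gender", "airline", and "i94addr" have few enough nulls that dropping the affected rows is a reasonable alternative.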

In [12]:
i94_df = i94_df.drop("visapost", "occup", "entdepu", "insnum")\
               .drop("count", "entdepd", "entdepa", "entdepu", "matflag", "dtaddto", "biryear", "admnum")
i94_df = i94_df.na.drop(subset=["airline", "gender", "i94addr"])
i94_df = i94_df.dropna(how="all")


Calculating the length of stay can be useful for our analyses.

In [13]:
"""
Converts SAS dates (number of days since 1960-01-01) to ISO-formatted date strings
"""
convert_date = udf(lambda x : x if x is None else (datetime(1960, 1, 1).date() + timedelta(x)).isoformat())


In [14]:
#Calculate stay length (in days)
stay = F.datediff(F.to_date(convert_date(i94_df.depdate)), F.to_date(convert_date(i94_df.arrdate)))
i94_df = i94_df.withColumn("stay", stay)
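The date conversion and stay calculation can be checked outside Spark. The following plain-Python sketch mirrors the logic of `convert_date` and `datediff`; the helper names `sas_to_date` and `stay_in_days` are introduced here for illustration and do not exist in the notebook.

```python
from datetime import date, timedelta

# SAS stores dates as the number of days elapsed since 1960-01-01
SAS_EPOCH = date(1960, 1, 1)

def sas_to_date(sas_days):
    """Convert a SAS day count (int or float) to a date, passing None through."""
    if sas_days is None:
        return None
    return SAS_EPOCH + timedelta(days=int(sas_days))

def stay_in_days(arrdate, depdate):
    """Length of stay in days, or None when either date is missing."""
    if arrdate is None or depdate is None:
        return None
    return (sas_to_date(depdate) - sas_to_date(arrdate)).days

# Values from the first row of the immigration data shown earlier
print(sas_to_date(20574.0).isoformat())
print(stay_in_days(20574.0, 20582.0))
```

Because both columns are day counts from the same epoch, the stay could also be obtained by subtracting `arrdate` from `depdate` directly; going through real dates, as the notebook does, yields the same number.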


Fixing data types of our final schema.

In [15]:

i94_df = i94_df.withColumn("cicid", i94_df.cicid.cast(IntegerType()))\
                .withColumn("i94yr", i94_df.i94yr.cast(IntegerType()))\
                .withColumn("i94mon", i94_df.i94mon.cast(IntegerType()))\
                .withColumn("i94cit", i94_df.i94cit.cast(IntegerType()))\
                .withColumn("i94res", i94_df.i94res.cast(IntegerType()))\
                .withColumn("i94mode", i94_df.i94mode.cast(IntegerType()))\
                .withColumn("i94bir", i94_df.i94bir.cast(IntegerType()))\
                .withColumn("i94visa", i94_df.i94visa.cast(IntegerType()))\
                .withColumn("arrdate", i94_df.arrdate.cast(IntegerType()))\
                .withColumn("depdate", i94_df.depdate.cast(IntegerType()))

In [16]:
i94_df.show(5)

+-------+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+--------+------+-------+-----+--------+----+
|  cicid|i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|dtadfile|gender|airline|fltno|visatype|stay|
+-------+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+--------+------+-------+-----+--------+----+
|5748517| 2016|     4|   245|   438|    LOS|  20574|      1|     CA|  20582|    40|      1|20160430|     F|     QF|00011|      B1|   8|
|5748518| 2016|     4|   245|   438|    LOS|  20574|      1|     NV|  20591|    32|      1|20160430|     F|     VA|00007|      B1|  17|
|5748519| 2016|     4|   245|   438|    LOS|  20574|      1|     WA|  20582|    29|      1|20160430|     M|     DL|00040|      B1|   8|
|5748520| 2016|     4|   245|   438|    LOS|  20574|      1|     WA|  20588|    29|      1|20160430|     F|     DL|00040|      B1|  14|
|5748521| 2016|     4|   245|   438|    LOS|  20


##### fact_I94 Data Dictionary


| Field Name  | Data Type   | Constraints                                      | Description                          |
| ----------- | ------------| ------------------------------------------------ | ------------------------------------ |
| cicid       | integer     | PRIMARY KEY                                      | Unique identifier                    |
| i94yr       | integer     | NOT NULL                                         | 4-digit year                         |
| i94mon      | integer     | NOT NULL                                         | Month                                |
| i94cit      | integer     | NOT NULL, FK (REFERENCES dim_Country.code)       | Country of birth - 3-digit code      |
| i94res      | integer     | NOT NULL, FK (REFERENCES dim_Country.code)       | Country of residency - 3-digit code  |
| i94port     | string      | NOT NULL, FK (REFERENCES dim_I94port.code)       | Entry port                           |
| i94mode     | integer     | NOT NULL, FK (REFERENCES dim_I94mode.id)         | Transportation mode                  |
| i94addr     | string      | NOT NULL, FK (REFERENCES dim_Demographics.state_code) | State of arrival                |
| i94visa     | integer     | NOT NULL, FK (REFERENCES dim_I94visa.id)         | Visa type                            |
| i94bir      | integer     | NOT NULL                                         | Immigrant's age in years             |
| gender      | string      | NOT NULL                                         | Immigrant's gender                   |
| dtadfile    | string      | NOT NULL                                         | Date added to I-94 files (yyyymmdd)  |
| airline     | string      | NOT NULL                                         | Airline company                      |
| fltno       | string      | NOT NULL                                         | Flight number                        |
| visatype    | string      | NOT NULL                                         | Visa class                           |
| arrdate     | integer     | NOT NULL, FK (REFERENCES dim_date.sasdate)       | Arrival date                         |
| depdate     | integer     | NULL, FK (REFERENCES dim_date.sasdate)           | Departure date                       |
| stay        | integer     | NULL                                             | Length of stay - in days             |



##### i94 Immigration Labels Descriptions Data



As mentioned earlier, the immigration dataset comes with an additional dataset that provides information about the lookup values in the main dataset. We will use this dataset to build some lookup tables that are needed to complete our data model.


In [17]:
#Reading the labels .sas file
with open('./I94_SAS_Labels_Descriptions.SAS') as f:
    f_content = f.read()
    f_content = f_content.replace('\t', '')
    
def code_mapper(file, idx):
    #Keep only the block that starts at the label name and ends at the first semicolon
    block = file[file.index(idx):]
    block = block[:block.index(';')].split('\n')
    block = [i.replace("'", "") for i in block]
    #Every remaining line has the form "code = description"
    pairs = [i.split('=') for i in block[1:]]
    return dict([i[0].strip(), i[1].strip()] for i in pairs if len(i) == 2)

#Extracting values
i94cit_res = code_mapper(f_content, "i94cntyl")
i94port = code_mapper(f_content, "i94prtl")
i94mode = code_mapper(f_content, "i94model")
i94addr = code_mapper(f_content, "i94addrl")
i94visa = {'1':'Business', '2': 'Pleasure', '3' : 'Student'}
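To make the parsing step easier to follow, here is a small standalone sketch of the same idea applied to an inline sample. The `SAMPLE` text is an assumed approximation of how one block of the labels file is laid out (a label name, `code = 'description'` lines, and a closing semicolon), and `parse_label_block` is a hypothetical name used only in this example.

```python
# Assumed layout of one block in the labels file (illustrative sample)
SAMPLE = """
value i94model
    1 = 'Air'
    2 = 'Sea'
    3 = 'Land'
    9 = 'Not reported' ;
"""

def parse_label_block(text, marker):
    """Return a {code: description} dict for the block that starts at marker."""
    # Cut from the marker up to (not including) the terminating semicolon
    block = text[text.index(marker):]
    block = block[:block.index(';')]
    mapping = {}
    # Skip the first line, which holds the marker itself
    for line in block.split('\n')[1:]:
        parts = line.replace("'", "").split('=')
        if len(parts) == 2:
            mapping[parts[0].strip()] = parts[1].strip()
    return mapping

modes = parse_label_block(SAMPLE, "i94model")
print(modes)
```

Note that the keys come back as strings, which is why the notebook casts the `id` and `code` columns to integers after building the dataframes.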



Now that we have extracted our lookup dictionaries from the file, we can build our dataframes.


In [18]:
#Build a dataframe for the country codes

i94cit_res_list = list(i94cit_res.items())
i94cit_res_df = spark.createDataFrame(i94cit_res_list)

#Fix column names and types
i94cit_res_df = i94cit_res_df.withColumnRenamed("_1", "code")
i94cit_res_df = i94cit_res_df.withColumnRenamed("_2", "country")
i94cit_res_df = i94cit_res_df.withColumn("code", i94cit_res_df.code.cast(IntegerType()))

#Removing bad data
i94cit_res_df = i94cit_res_df.dropna().drop_duplicates()
i94cit_res_df = i94cit_res_df.filter((~F.lower(i94cit_res_df.country).contains('country')) & \
                     (~F.lower(i94cit_res_df.country).contains('invalid')) & \
                     (~F.lower(i94cit_res_df.country).contains('not show')))\
                        .orderBy("country")

#Fixing the name of Mexico so that it can be linked with the corresponding values in other tables
#(the code column was already cast to integer above, so it is compared with an integer literal)
i94cit_res_df = i94cit_res_df.withColumn("country", 
                                         F.when(F.col("code") == 582, "MEXICO").otherwise(F.col("country")))


In [19]:
i94cit_res_df.show(5)

+----+-----------+
|code|    country|
+----+-----------+
| 236|AFGHANISTAN|
| 101|    ALBANIA|
| 316|    ALGERIA|
| 102|    ANDORRA|
| 324|     ANGOLA|
+----+-----------+
only showing top 5 rows



In [20]:
#Build a dataframe for the ports

i94port_list = list(i94port.items())
i94port_df = spark.createDataFrame(i94port_list)

i94port_df = i94port_df.withColumnRenamed("_1", "code")
i94port_df = i94port_df.withColumn("code", trim(col("code")))
i94port_df = i94port_df.withColumn("port", trim(split(col("_2"), ", ").getItem(0)))\
            .withColumn("state_code", trim(split(col("_2"), ", ").getItem(1)))\
            .drop("_2")\
            .dropDuplicates()\
            .dropna()\
            .orderBy("state_code", "port")
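The port labels combine a port name and a state code in one string, which the cell above separates on `", "`. A plain-Python sketch of that split follows; `split_port` is a hypothetical helper, and the label without a comma is a made-up example of an entry that would end up with no state code and therefore be removed by `dropna()`.

```python
def split_port(label):
    """Split a 'PORT NAME, ST' label into (port, state_code)."""
    parts = label.split(", ")
    port = parts[0].strip()
    # Labels without a ', ' separator have no state code
    state_code = parts[1].strip() if len(parts) > 1 else None
    return port, state_code

print(split_port("ANCHORAGE, AK"))
# Made-up label without a state part, for illustration only
print(split_port("UNKNOWN PORT"))
```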


In [21]:
i94port_df.show(5)

+----+--------------------+----------+
|code|                port|state_code|
+----+--------------------+----------+
| ALC|               ALCAN|        AK|
| ANC|           ANCHORAGE|        AK|
| BAR|BAKER AAF - BAKER...|        AK|
| DAC|       DALTONS CACHE|        AK|
| PIZ|DEW STATION PT LA...|        AK|
+----+--------------------+----------+
only showing top 5 rows




##### dim_I94port Data Dictionary

| Field Name   | Data Type   | Constraints         | Description        |
| ------------ | ----------- | ------------------- | ------------------ |
| code         | string      | PRIMARY KEY         | Port code          |
| port         | string      | NOT NULL, UNIQUE    | Port name          |
| state_code   | string      | NOT NULL            | State code         |



In [22]:
#Build a dataframe for the modes

i94mode_list = list(i94mode.items())
i94mode_df = spark.createDataFrame(i94mode_list)

#Fix column names and types
i94mode_df = i94mode_df.withColumnRenamed("_1", "id")
i94mode_df = i94mode_df.withColumnRenamed("_2", "mode")
i94mode_df = i94mode_df.withColumn("id", i94mode_df.id.cast(IntegerType()))


In [23]:
i94mode_df.show()

+---+------------+
| id|        mode|
+---+------------+
|  1|         Air|
|  2|         Sea|
|  3|        Land|
|  9|Not reported|
+---+------------+




##### dim_I94mode Data Dictionary

| Field Name   | Data Type    | Constraints      | Description         |
| ------------ | -------------| ---------------- | ------------------- |
| id           | integer      | PRIMARY KEY      | Unique identifier   |
| mode         | string       | NOT NULL, UNIQUE | Transportation mode |
|              |              |                  |                     |


As discussed later, we will not use the i94addr dataset as is. Instead, we are going to merge it with the demographics dataset to build our demographics table, which will also contain the data extracted from i94addr.

In [24]:
#Build a dataframe for the state codes

i94addr_list = list(i94addr.items())
i94addr_df = spark.createDataFrame(i94addr_list)

#Fix column names and clean data
i94addr_df = i94addr_df.withColumnRenamed("_1", "code")\
                        .withColumnRenamed("_2", "state")  
i94addr_df = i94addr_df.dropna()\
                        .drop_duplicates()


In [25]:
i94addr_df.show()

+----+-----------------+
|code|            state|
+----+-----------------+
|  WY|          WYOMING|
|  TN|        TENNESSEE|
|  KS|           KANSAS|
|  IA|             IOWA|
|  NV|           NEVADA|
|  IN|          INDIANA|
|  PR|      PUERTO RICO|
|  NY|         NEW YORK|
|  RI|     RHODE ISLAND|
|  MN|        MINNESOTA|
|  DC|DIST. OF COLUMBIA|
|  SD|        S. DAKOTA|
|  MI|         MICHIGAN|
|  ME|            MAINE|
|  AL|          ALABAMA|
|  FL|          FLORIDA|
|  PA|     PENNSYLVANIA|
|  CT|      CONNECTICUT|
|  ND|        N. DAKOTA|
|  MA|    MASSACHUSETTS|
+----+-----------------+
only showing top 20 rows



In [26]:
#Build a dataframe for the visa types

i94visa_list = list(i94visa.items())
i94visa_df = spark.createDataFrame(i94visa_list)
i94visa_df = i94visa_df.withColumnRenamed("_1", "id")\
                        .withColumnRenamed("_2", "type")
i94visa_df = i94visa_df.withColumn("id", i94visa_df.id.cast(IntegerType()))


In [27]:
i94visa_df.show()

+---+--------+
| id|    type|
+---+--------+
|  1|Business|
|  2|Pleasure|
|  3| Student|
+---+--------+




##### dim_I94visa Data Dictionary

| Field Name | Data Type  | Constraints       | Description               |
| -----------| -----------| ----------------- | ------------------------- |
| id         | integer    | PRIMARY KEY       | Unique identifier         |
| type       | string     | NOT NULL, UNIQUE  | Visa type                 |
|            |            |                   |                           |




##### Temperature Data

The temperature table holds information only up to 2013-09-01, which means that we cannot get year-by-year information by linking it to the i94 immigration dataset, which covers 2016. However, we can average the temperatures by country and link the result with the i94cit dataset to compose a more detailed country dataset.
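The country-level aggregation described above can be sketched in plain Python as follows. The readings are made-up values used only to show the mechanics (skip missing temperatures, upper-case the country name so it matches the i94 labels, then average); the sketch does not reproduce the duplicate removal that the notebook also performs.

```python
from collections import defaultdict
from statistics import mean

# Made-up (country, temperature) readings, for illustration only
readings = [
    ("Denmark", 6.0),
    ("Denmark", 8.0),
    ("denmark", 10.0),
    ("Albania", None),   # missing reading, skipped below
    ("Albania", 15.0),
]

def average_by_country(rows):
    """Average the non-missing temperatures per upper-cased country name."""
    grouped = defaultdict(list)
    for country, temperature in rows:
        if temperature is None:
            continue
        grouped[country.upper()].append(temperature)
    return {country: mean(temps) for country, temps in sorted(grouped.items())}

averages = average_by_country(readings)
print(averages)
```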


In [28]:

#Drop nulls, duplicates and unneeded columns
temperatures_df = temperatures_df.drop("dt", "AverageTemperatureUncertainty", "City")
temperatures_df = temperatures_df.dropna().drop_duplicates()

#Capitalize the country field so that we could link it with i94cit dataset
temperatures_df = temperatures_df.withColumn("Country", upper(col("Country")))

#Group by country
temperatures_df = temperatures_df.groupby("Country")\
                                .agg({"AverageTemperature": "mean", "Latitude": "first", "Longitude": "first"})\
                                .orderBy("Country")

#Rename columns
temperatures_df = temperatures_df.withColumnRenamed("Country", "country")\
                                    .withColumnRenamed("avg(AverageTemperature)", "average_temperature")\
                                    .withColumnRenamed("first(Latitude)", "latitude")\
                                    .withColumnRenamed("first(Longitude)", "longitude")

#Fix data types
temperatures_df = temperatures_df.withColumn("average_temperature", temperatures_df.average_temperature.cast(DoubleType()))


In [29]:
temperatures_df.show(5)

+-----------+-------------------+--------+---------+
|    country|average_temperature|latitude|longitude|
+-----------+-------------------+--------+---------+
|AFGHANISTAN|  14.15373113443278|  36.17N|   69.61E|
|    ALBANIA| 15.525674512441158|  40.99N|   19.17E|
|    ALGERIA| 17.787410937285568|  36.17N|    3.98E|
|     ANGOLA| 21.446687647212784|  12.05S|   13.15E|
|  ARGENTINA|     16.83114138155|  39.38S|   62.43W|
+-----------+-------------------+--------+---------+
only showing top 5 rows




Now that temperatures_df is ready, we can build a unified, more detailed country dataframe.


In [30]:
#Join i94cit_res_df and temperatures_df and drop the redundant column
countries_df = i94cit_res_df.join(temperatures_df,i94cit_res_df.country ==  temperatures_df.country,"Left")\
                            .drop(temperatures_df.country) #To avoid adding the common column twice
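A left join keeps every country from the i94 lookup even when the temperature data has no matching name. The sketch below shows that behaviour with plain Python structures, using codes and averages that appear in the outputs above; ANDORRA is used as the unmatched case because it does not appear in the ordered temperature output. The `left_join` helper is hypothetical and only imitates the semantics of the Spark join.

```python
# (code, country) pairs taken from the country lookup output
countries = [(236, "AFGHANISTAN"), (101, "ALBANIA"), (102, "ANDORRA")]

# Average temperatures taken from the temperature output
temperatures = {
    "AFGHANISTAN": 14.15373113443278,
    "ALBANIA": 15.525674512441158,
}

def left_join(country_rows, temp_by_country):
    """Attach a temperature to every country, or None when there is no match."""
    return [
        (code, name, temp_by_country.get(name))
        for code, name in country_rows
    ]

joined = left_join(countries, temperatures)
for row in joined:
    print(row)
```

This is why the resulting country table can contain nulls in the temperature and coordinate columns.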


In [31]:
countries_df.show(5)

+----+--------------------+-------------------+--------+---------+
|code|             country|average_temperature|latitude|longitude|
+----+--------------------+-------------------+--------+---------+
| 151|             ARMENIA|  8.363008838891123|  40.99N|   44.73E|
| 512|             BAHAMAS|  24.70784696016771|  24.92N|   78.03W|
| 373|        SOUTH AFRICA|  16.80486927079032|  26.52S|   28.66E|
| 243|               BURMA| 25.652096833353696|  20.09N|   92.13E|
| 726|HEARD AND MCDONAL...|               null|    null|     null|
+----+--------------------+-------------------+--------+---------+
only showing top 5 rows



##### dim_Country Data Dictionary

| Field Name           | Data Type   | Constraints      | Description          |
| -------------------- | ----------- | ---------------- | -------------------- |
| code                 | integer     | PRIMARY KEY      | Country code         |
| country              | string      | NOT NULL, UNIQUE | Country name         |
| average_temperature  | double      | NULL             | Average temperature; null when the country has no match in the temperature data |
| latitude             | string      | NULL             | Latitude             |
| longitude            | string      | NULL             | Longitude            |


##### Demographics Data

This data provides useful information about city demographics. However, it is hard to use in our model in its current form, because each city appears in one row per race. Therefore, we are going to pivot the per-race counts into columns as a step in our transformation of this dataset.

The data will be grouped by state_code, state, and city.
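The pivot can be pictured with a few rows in plain Python. In this sketch, two of the rows come from the demographics output shown earlier and one is made up so that a city has more than one race row; `pivot_by_race` is a hypothetical helper that fills missing combinations with 0 directly, whereas Spark's `pivot` would produce nulls that then need to be filled.

```python
from collections import defaultdict

rows = [
    ("Maryland", "Silver Spring", "Hispanic or Latino", 25924),  # from the output above
    ("Maryland", "Silver Spring", "White", 1000),                # made-up count
    ("Massachusetts", "Quincy", "White", 58723),                 # from the output above
]

def pivot_by_race(records):
    """Turn (state, city, race, count) rows into one dict of race counts per city."""
    races = sorted({race for _, _, race, _ in records})
    # Every city starts with a zero count for every race seen in the data
    table = defaultdict(lambda: dict.fromkeys(races, 0))
    for state, city, race, count in records:
        table[(state, city)][race] += count
    return dict(table)

pivoted = pivot_by_race(rows)
for key, counts in pivoted.items():
    print(key, counts)
```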


In [32]:
#Rename the columns into a format that's easier to deal with while coding
demos_df = demos_df.withColumnRenamed("Median Age", "median_age")\
                    .withColumnRenamed("Male Population", "male_population")\
                    .withColumnRenamed("Female Population", "female_population")\
                    .withColumnRenamed("Total Population", "total_population")\
                    .withColumnRenamed("City", "city")\
                    .withColumnRenamed("State", "state")\
                    .withColumnRenamed("State Code", "state_code")\
                    .withColumnRenamed("Number of Veterans", "number_of_veterans")\
                    .withColumnRenamed("Foreign-born", "foreign_born")\
                    .withColumnRenamed("Average Household Size", "average_household_size")\
                    .withColumnRenamed("Race", "race")\
                    .withColumnRenamed("Count", "total")

#Changing the data types of the numeric columns from string to the corresponding data types. 
#This step is needed so that we could use aggregate functions with this data
demos_df = demos_df.withColumn("median_age",demos_df.median_age.cast(DoubleType()))\
                    .withColumn("average_household_size",demos_df.average_household_size.cast(DoubleType()))\
                    .withColumn("male_population",demos_df.male_population.cast(IntegerType()))\
                    .withColumn("female_population",demos_df.female_population.cast(IntegerType()))\
                    .withColumn("total_population",demos_df.total_population.cast(IntegerType()))\
                    .withColumn("number_of_veterans",demos_df.number_of_veterans.cast(IntegerType()))\
                    .withColumn("foreign_born",demos_df.foreign_born.cast(IntegerType()))\
                    .withColumn("count",demos_df.total.cast(IntegerType()))\
                    .drop("total")

In [33]:
#Pivoting data

#Group by state and city; the other columns repeat for every race row, so we keep the first value of each
fixed_df = demos_df.groupby(["state", "city"])\
                    .agg({"state_code": "first", "median_age": "first", "male_population": "first",\
                          "female_population": "first", "total_population": "first",\
                          "number_of_veterans": "first", "foreign_born": "first",\
                          "average_household_size": "first"})

#Pivot our data by race
pivot_df = demos_df.groupby(["state", "city"]).pivot("race").sum("count")

#Join both dataframes and do the necessary transformations (i.e., rename columns and fill null numeric values with 0)
demos_df = fixed_df.join(other=pivot_df, on=["state", "city"], how="inner")\
                    .withColumnRenamed("American Indian and Alaska Native", "american_indian_and_alaska_native")\
                    .withColumnRenamed("Asian", "asian")\
                    .withColumnRenamed("Black or African-American", "black_or_african_American")\
                    .withColumnRenamed("Hispanic or Latino", "hispanic_or_latino")\
                    .withColumnRenamed("White", "white")\
                    .withColumnRenamed("first(foreign_born)", "foreign_born")\
                    .withColumnRenamed("first(male_population)", "male_population")\
                    .withColumnRenamed("first(average_household_size)", "average_household_size")\
                    .withColumnRenamed("first(total_population)", "total_population")\
                    .withColumnRenamed("first(median_age)", "maiden_age") \
                    .withColumnRenamed("first(number_of_veterans)", "number_of_veterans")\
                    .withColumnRenamed("first(female_population)", "female_population")\
                    .withColumnRenamed("first(state_code)", "state_code")\
                    .na.fill(0)

In [34]:
demos_df.show(5)

+-----------+-----------+------------+---------------+----------------------+----------------+-----------------+----------+----------+------------------+---------------------------------+-----+-------------------------+------------------+------+
|      state|       city|foreign_born|male_population|average_household_size|total_population|female_population|state_code|maiden_age|number_of_veterans|american_indian_and_alaska_native|asian|black_or_african_American|hispanic_or_latino| white|
+-----------+-----------+------------+---------------+----------------------+----------------+-----------------+----------+----------+------------------+---------------------------------+-----+-------------------------+------------------+------+
| California|     Orange|       34550|          67337|                  3.14|          140995|            73658|        CA|      35.0|              3993|                              658|22457|                     2853|             46255|100083|
| California|Yor


Both the i94addr and the demographics datasets store State information, so we can join them into a single dataset for States. The i94addr dataset has 55 States versus 49 in the demographics dataset, so we use a left join from i94addr to demographics to make sure all States are covered.


In [35]:
addr_count = i94addr_df.select("code").distinct().count()
demos_count = demos_df.select("state_code").distinct().count()

print(f"{addr_count} States found in i94addr dataset vs. {demos_count} States found in demographics dataset.")

55 States found in i94addr dataset vs. 49 States found in demographics dataset.


In [36]:
#Redundant columns are deleted after the join is performed
demos_df = i94addr_df.join(demos_df, i94addr_df.code == demos_df.state_code, "Left")\
                        .drop(i94addr_df.code)\
                        .drop(demos_df.state)

#Create an id column to be used as a primary key
#In a standard relational model, state_code and city could be used as composite primary key
demos_df = demos_df.withColumn("id", monotonically_increasing_id())


In [37]:
demos_df.show(5)

+-------+----------+------------+---------------+----------------------+----------------+-----------------+----------+----------+------------------+---------------------------------+-----+-------------------------+------------------+-------+-----------+
|  state|      city|foreign_born|male_population|average_household_size|total_population|female_population|state_code|maiden_age|number_of_veterans|american_indian_and_alaska_native|asian|black_or_african_American|hispanic_or_latino|  white|         id|
+-------+----------+------------+---------------+----------------------+----------------+-----------------+----------+----------+------------------+---------------------------------+-----+-------------------------+------------------+-------+-----------+
|ARIZONA|    Tucson|       82220|         264893|                  2.45|          531674|           266781|        AZ|      33.6|             38182|                            24409|24689|                    33900|            231025| 4043


##### dim_Demographics Data Dictionary


| Field Name                        | Data Type  | Constraints   | Description                          |
| --------------------------------- | ---------- | ------------- | ------------------------------------ |
| id                                | long       | PRIMARY KEY   | Unique identifier                    |
| state_code                        | string     | NOT NULL      | State code                           |
| state                             | string     | NOT NULL      | State name                           |
| city                              | string     | NOT NULL      | City name                            |
| average_household_size            | double     | NOT NULL      | Average household size               |
| foreign_born                      | integer    | NOT NULL      | Number of foreign-born residents     |
| maiden_age                        | double     | NOT NULL      | Median age                           |
| number_of_veterans                | integer    | NOT NULL      | Number of veteran residents          |
| male_population                   | integer    | NOT NULL      | Number of male residents             |
| female_population                 | integer    | NOT NULL      | Number of female residents           |
| total_population                  | integer    | NOT NULL      | Total number of residents            |
| american_indian_and_alaska_native | long       | NOT NULL      | Number of Native American residents  |
| asian                             | long       | NOT NULL      | Number of Asian residents            |
| black_or_african_american         | long       | NOT NULL      | Number of African American residents |
| hispanic_or_latino                | long       | NOT NULL      | Number of hispanic residents         |
| white                             | long       | NOT NULL      | Number of white residents            |
|                                   |            |               |                                      |



Although the id field is used here as a primary key, what actually identifies each row is the combination of the State and City fields. That is, no two cities under the same State share a name, so the two fields together act like a composite key.

##### Date Data

We need to have this table to complete our model. This dataframe will provide detailed information about both arrival and departure dates.
First, we create a function to calculate the weather seasons of our dates, then we construct our dataframe.


In [38]:
@udf(StringType())
def get_season(x):
    """
    Returns the weather season (Northern Hemisphere) of the passed month number,
    or None if the month is missing or not in the range 1-12
    """
    if x in (12, 1, 2):
        return "Winter"
    elif x in (3, 4, 5):
        return "Spring"
    elif x in (6, 7, 8):
        return "Summer"
    elif x in (9, 10, 11):
        return "Autumn"
    return None

In [39]:
#Create a dataframe for dates

#Get a unique list of both arrival and departure dates from i94_df, and then combine them together
arrdates_df = i94_df.select("arrdate").distinct()
depdates_df = i94_df.select("depdate").distinct()
date_df = arrdates_df.union(depdates_df).dropDuplicates()
date_df = date_df.withColumnRenamed("arrdate", "sasdate")

iso_date = convert_date(date_df.sasdate)

#The column variables carry a _col suffix so that they do not shadow
#the imported datetime alias (dt) and the imported year/month functions
date_col = F.to_date(iso_date)
year_col = F.year(date_col)
month_col = F.month(date_col)
day_col = F.dayofmonth(date_col)
week_col = F.weekofyear(date_col)
day_of_week_col = F.dayofweek(date_col)
#dayofweek returns 1 for Sunday and 7 for Saturday
is_weekend_col = day_of_week_col.isin([1, 7]).cast("int")

#Compose the dataframe using the values extracted from the sasdate field
date_df = date_df.withColumn("isodate", date_col.cast(DateType()))\
                    .withColumn("year", year_col.cast(IntegerType()))\
                    .withColumn("month", month_col.cast(IntegerType()))\
                    .withColumn("week", week_col.cast(IntegerType()))\
                    .withColumn("day", day_col.cast(IntegerType()))\
                    .withColumn("dayofweek", day_of_week_col.cast(IntegerType()))\
                    .withColumn("isweekend", is_weekend_col)\
                    .withColumn("season", get_season(month_col))\
                    .dropna()\
                    .orderBy("isodate")


In [40]:
date_df.show(5)

+-------+----------+----+-----+----+---+---------+---------+------+
|sasdate|   isodate|year|month|week|day|dayofweek|isweekend|season|
+-------+----------+----+-----+----+---+---------+---------+------+
|  20226|2015-05-18|2015|    5|  21| 18|        2|        0|Spring|
|  20383|2015-10-22|2015|   10|  43| 22|        5|        0|Autumn|
|  20479|2016-01-26|2016|    1|   4| 26|        3|        0|Winter|
|  20516|2016-03-03|2016|    3|   9|  3|        5|        0|Spring|
|  20518|2016-03-05|2016|    3|   9|  5|        7|        1|Spring|
+-------+----------+----+-----+----+---+---------+---------+------+
only showing top 5 rows




##### dim_Date Data Dictionary

| Field Name           | Data Type  | Constraints      | Description                               |
| -------------------- | -----------| ---------------- | ----------------------------------------- |
| sasdate              | integer    | PRIMARY KEY      | Date in SAS format                        |
| isodate              | date       | NOT NULL, UNIQUE | Date in ISO format                        |
| year                 | integer    | NOT NULL         | Date year                                 |
| month                | integer    | NOT NULL         | Date month                                |
| week                 | integer    | NOT NULL         | Week of the year                          |
| day                  | integer    | NOT NULL         | Day of the month                          |
| dayofweek            | integer    | NOT NULL         | Day of the week (1 = Sunday, 7 = Saturday) |
| isweekend            | integer    | NOT NULL         | 1 if the date falls on a weekend, else 0  |
| season               | string     | NOT NULL         | Weather season                            |
|                      |            |                  |                                           |
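
The fields in this dictionary can be cross-checked in plain Python. The sketch below is not the notebook's `convert_date` function, which is defined in an earlier cell. It assumes the usual SAS convention that a date value counts days since 1960-01-01, which reproduces the sample rows shown above. The helper name `describe_sas_date` is hypothetical.

```python
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)  # assumption: SAS date values count days from this day

def describe_sas_date(sasdate):
    """Derive dim_Date-style fields from a SAS date number without Spark."""
    d = SAS_EPOCH + timedelta(days=sasdate)
    # Map ISO weekdays (Mon=1 .. Sun=7) to the 1 = Sunday .. 7 = Saturday numbering
    day_of_week = d.isoweekday() % 7 + 1
    if d.month in (12, 1, 2):
        season = "Winter"
    elif d.month in (3, 4, 5):
        season = "Spring"
    elif d.month in (6, 7, 8):
        season = "Summer"
    else:
        season = "Autumn"
    return {
        "sasdate": sasdate,
        "isodate": d.isoformat(),
        "year": d.year,
        "month": d.month,
        "week": d.isocalendar()[1],  # ISO 8601 week of the year
        "day": d.day,
        "dayofweek": day_of_week,
        "isweekend": int(day_of_week in (1, 7)),
        "season": season,
    }

row = describe_sas_date(20518)
print(row["isodate"], row["dayofweek"], row["isweekend"], row["season"])
```

For sasdate 20518 this yields 2016-03-05, a Saturday (dayofweek 7, isweekend 1) in Spring, which agrees with the corresponding row of the sample output.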



##### Airports Data

This table shares no fields with the other tables that would be useful to our analysis, so it is not used in our data model. However, it was transformed and saved like the rest of the tables.

In [41]:
#From the statistics shown earlier, we think the iata_code field is useless as it's mostly null
airport_codes_df = airport_codes_df.drop("iata_code")

#Extract the country and state from iso_region
airport_codes_df = airport_codes_df.withColumn("country", split(col("iso_region"), "-").getItem(0))\
                                    .withColumn("state", split(col("iso_region"), "-").getItem(1))\
                                    .drop("iso_country")\
                                    .drop("iso_region")

#Split coordinates field into x and y fields
airport_codes_df = airport_codes_df.withColumn("x_coordinate", split(col("coordinates"), ", ").getItem(0))\
                                    .withColumn("y_coordinate", split(col("coordinates"), ", ").getItem(1))\
                                    .drop("coordinates")

#Fix data types
airport_codes_df = airport_codes_df.withColumn("elevation_ft", airport_codes_df.elevation_ft.cast(IntegerType()))\
                                    .withColumn("x_coordinate", airport_codes_df.x_coordinate.cast(DoubleType()))\
                                    .withColumn("y_coordinate", airport_codes_df.y_coordinate.cast(DoubleType()))


In [42]:
airport_codes_df.show(5)

+-----+-------------+--------------------+------------+---------+------------+--------+----------+-------+-----+------------------+-----------------+
|ident|         type|                name|elevation_ft|continent|municipality|gps_code|local_code|country|state|      x_coordinate|     y_coordinate|
+-----+-------------+--------------------+------------+---------+------------+--------+----------+-------+-----+------------------+-----------------+
|  00A|     heliport|   Total Rf Heliport|          11|       NA|    Bensalem|     00A|       00A|     US|   PA|-74.93360137939453|   40.07080078125|
| 00AA|small_airport|Aero B Ranch Airport|        3435|       NA|       Leoti|    00AA|      00AA|     US|   KS|       -101.473911|        38.704022|
| 00AK|small_airport|        Lowell Field|         450|       NA|Anchor Point|    00AK|      00AK|     US|   AK|    -151.695999146|      59.94919968|
| 00AL|small_airport|        Epps Airpark|         820|       NA|     Harvest|    00AL|      00AL|  

In [43]:
airport_codes_df.printSchema()

root
 |-- ident: string (nullable = true)
 |-- type: string (nullable = true)
 |-- name: string (nullable = true)
 |-- elevation_ft: integer (nullable = true)
 |-- continent: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- gps_code: string (nullable = true)
 |-- local_code: string (nullable = true)
 |-- country: string (nullable = true)
 |-- state: string (nullable = true)
 |-- x_coordinate: double (nullable = true)
 |-- y_coordinate: double (nullable = true)



##### dim_Airport Data Dictionary

| Field Name     | Data Type  | Constraints    | Description                       |
| ---------------|------------| -------------- | --------------------------------- |
| ident          | string     | PRIMARY KEY    | Row unique identifier             |
| type           | string     | NOT NULL       | Type of airport                   |
| name           | string     | NOT NULL       | Name of airport                   |
| elevation_ft   | integer    | NOT NULL       | Elevation - in feet               |
| continent      | string     | NOT NULL       | Continent where the airport is    |
| municipality   | string     | NOT NULL       | City where the airport is         |
| gps_code       | string     | NOT NULL       | Airport's GPS code                |
| local_code     | string     | NOT NULL       | Airport's local code              |
| country        | string     | NOT NULL       | Country where the airport is      |
| state          | string     | NOT NULL       | State where the airport is        |
| x_coordinate   | double     | NOT NULL       | X coordinate (longitude)          |
| y_coordinate   | double     | NOT NULL       | Y coordinate (latitude)           |

#### Data Quality Checks

Here, we perform some data quality checks to see if our model is working and that our data is complete.

In [44]:

print("Validating row count...")

i94_count = i94_df.count()
if i94_count > 0:
    print(f"fact_I94 table created successfully. {i94_count} records transferred.")
else:
    print("No data found in fact_I94 table")

demos_count = demos_df.count()
if demos_count > 0:
    print(f"dim_Demographics table created successfully. {demos_count} records transferred.")
else:
    print("No data found in dim_Demographics table")

countries_count = countries_df.count()
if countries_count > 0:
    print(f"dim_Country table migrated successfully. {countries_count} records transferred successfully.")
else:
    print("No data found in dim_Country table")

airports_count = airport_codes_df.count()
if airports_count > 0:
    print(f"dim_Airport table migrated successfully. {airports_count} records transferred successfully.")
else:
    print("No data found in dim_Airport table")

visas_count = i94visa_df.count()
if visas_count > 0:
    print(f"dim_Visa table migrated successfully. {visas_count} records transferred successfully.")
else:
    print("No data found in dim_Visa table")

modes_count = i94mode_df.count()
if modes_count > 0:
    print(f"dim_Mode table migrated successfully. {modes_count} records transferred.")
else:
    print("No data found in dim_Mode table")

ports_count = i94port_df.count()
if ports_count > 0:
    print(f"dim_Port table migrated successfully. {ports_count} records transferred successfully.")
else:
    print("No data found in dim_Port table")

dates_count = date_df.count()
if dates_count > 0:
    print(f"dim_Date table migrated successfully. {dates_count} records transferred.")
else:
    print("No data found in dim_Date table")


Validating row count...
fact_I94 table created successfully. 2493086 records transferred.
dim_Demographics table created successfully. 602 records transferred.
dim_Country table migrated successfully. 236 records transferred successfully.
dim_Airport table migrated successfully. 55075 records transferred successfully.
dim_Visa table migrated successfully. 3 records transferred successfully.
dim_Mode table migrated successfully. 4 records transferred.
dim_Port table migrated successfully. 582 records transferred successfully.
dim_Date table migrated successfully. 187 records transferred.


In [45]:
#Count the unique key values in each dataframe and compare them to the number of rows in that dataframe. Both numbers should match.

print("Validating unique keys...")

country_dis_count = countries_df.select("code").distinct().count()
print("checking dim_Country. Pass?", country_dis_count == countries_count)

port_dis_count = i94port_df.select("code").distinct().count()
print("checking dim_I94port. Pass?",  port_dis_count == ports_count)

mode_dis_count = i94mode_df.select("id").distinct().count()
print("checking dim_I94mode. Pass?", mode_dis_count == modes_count)

visa_dis_count = i94visa_df.select("id").distinct().count()
print("checking dim_I94visa. Pass?", visa_dis_count == visas_count)

dates_dis_count = date_df.select("sasdate").distinct().count()
print("checking dim_Date. Pass?", dates_dis_count == dates_count)

demo_dis_count = demos_df.select("state", "city").distinct().count()
print("checking dim_Demographics. Pass?", demo_dis_count == demos_count)


Validating unique keys...
checking dim_Country. Pass? True
checking dim_I94port. Pass? True
checking dim_I94mode. Pass? True
checking dim_I94visa. Pass? True
checking dim_Date. Pass? True
checking dim_Demographics. Pass? True


In [47]:
#Make sure that the changes made to i94cit_res_df during the cleansing process are persisted. Here we test just one of those changes
print("Validating data...")

invalid_df = countries_df[F.lower(countries_df["country"]).contains("country") |\
                           F.lower(countries_df["country"]).contains("invalid") |\
                           F.lower(countries_df["country"]).contains("not show")]
if invalid_df.count() == 0:
    print("Data validation passed. Changes made during data transformation have been persisted.")
else:
    print("Data validation failed. Some of the changes made during data transformation have not been persisted.")


Validating data...
Data validation passed. Changes made during data transformation have been persisted.



#### Save Data Model

The extracted data is saved as .parquet files in the output directory of this workspace as follows.

In [48]:
print("Saving data...")

#Each entry holds: table name, dataframe, partition columns, output path
tables = [
    ("fact_I94", i94_df, [], "output/fact_i94.parquet"),
    ("dim_Demographics", demos_df, ["state_code", "city"], "output/dim_demographics.parquet"),
    ("dim_Country", countries_df, ["code"], "output/dim_country.parquet"),
    ("dim_Date", date_df, ["sasdate"], "output/dim_date.parquet"),
    ("dim_I94addr", i94addr_df, ["code"], "output/dim_i94addr.parquet"),
    ("dim_I94mode", i94mode_df, ["id"], "output/dim_i94mode.parquet"),
    ("dim_I94visa", i94visa_df, ["id"], "output/dim_i94visa.parquet"),
    ("dim_Airport", airport_codes_df, ["ident"], "output/dim_airports.parquet"),
]

save_count = 0
for name, df, partition_cols, path in tables:
    try:
        writer = df.write.mode("overwrite")
        #Partition the output only when partition columns are given
        if partition_cols:
            writer = writer.partitionBy(*partition_cols)
        writer.parquet(path)
        save_count += 1
        print(f"{name} table saved successfully.")
    except Exception as ex:
        #Report the failure and continue with the remaining tables
        print(f"Error occurred while saving {name} table. {ex}")

print(f"{save_count} tables saved.")

Saving data...
fact_I94 table saved successfully.
dim_Demographics table saved successfully.
dim_Country table saved successfully.
dim_Date table saved successfully.
dim_I94addr table saved successfully.
dim_I94mode table saved successfully.
dim_I94visa table saved successfully.
dim_Airport table saved successfully.
8 tables saved.


### Step 4: Run ETL to Model Data

The ETL code is organized in a logical order where each code block represents a task or a number of related tasks. Necessary imports and configurations are done at the beginning of the code. No further imports or installs are needed.

To execute this ETL, you can run "python3 etl.py" in Terminal.

Some data quality checks are performed after processing and saving the data. Processed data is saved in .parquet format in a separate folder in this project, namely *output*. Some partitioning is applied to the saved data to achieve better performance during data read operations.

The ETL has been tested and completed successfully before submission.

### Step 5: Complete Project Write Up


This project presents a use case where big data engineering concepts and technologies such as Apache Spark are used to solve big data problems.

A star schema was chosen for this model because it is simpler than a fully normalized model. A star schema generally offers faster reads, which is very important for OLAP applications, because it requires fewer and less complex joins between entities to bring the data back together. Compared to the snowflake schema, in which the dimension tables are normalized by breaking them into smaller tables, the star schema is also much simpler and generally faster, as it is less normalized.

The frequency at which data should be refreshed depends on two factors: 1) The frequency at which the data changes. 2) The reporting cycle. We can start with a monthly update and go from there.

The code written here scales with the size of our data as long as the cluster used is powerful enough to handle the assigned workload. However, there are some important considerations when scaling this project to accommodate more demanding scenarios, such as:

* **If the data was increased by 100x.** 
In this project, the Spark instance installed locally on the workspace is used. However, local/stand-alone Spark may not offer optimum performance for larger datasets. Instead, Amazon EMR can be used in such cases, as it offers a virtually unlimited amount of computing resources. Users can easily scale the size of their cluster up or down depending on the complexity and the amount of the data being processed.
* **If the pipelines were run on a daily basis by 7am.**
Apache Airflow would make an excellent choice for dealing with automated workflows. SLAs can be set in Apache Airflow to meet certain deadlines.
* **If the database needed to be accessed by 100+ people.**
Again, Amazon EMR would be a good fit for such scenarios thanks to the flexibility and elasticity of cloud computing.


#### Validating Data Model


In [49]:
#Join our dataframes and replace the foreign keys with the actual values
query_df = i94_df.join(date_df, i94_df.arrdate == date_df.sasdate)\
                    .join(countries_df, i94_df.i94cit == countries_df.code, "left")\
                    .join(i94visa_df, i94_df.i94visa == i94visa_df.id, "left")\
                    .join(i94mode_df, i94_df.i94mode == i94mode_df.id, "left")\
                    .join(i94port_df, i94_df.i94port == i94port_df.code, "left")

query_df = query_df.select(i94_df.cicid, i94_df.i94bir, i94_df.i94yr, i94_df.i94addr, \
                           i94_df.gender, i94_df.fltno, i94_df.dtadfile, i94_df.fltno, \
                           countries_df.country, date_df.isodate, date_df.season, \
                           i94visa_df.type, i94mode_df.mode, i94port_df.port, i94_df.stay)\
                    .withColumnRenamed("i94addr", "state")\
                    .withColumnRenamed("country", "birthcountry")\
                    .withColumnRenamed("isodate", "arrdate")\
                    .withColumnRenamed("type", "visa")
                    
query_df.show()

+-------+------+-----+-----+------+-----+--------+-----+------------+----------+------+--------+----+------+----+
|  cicid|i94bir|i94yr|state|gender|fltno|dtadfile|fltno|birthcountry|   arrdate|season|    visa|mode|  port|stay|
+-------+------+-----+-----+------+-----+--------+-----+------------+----------+------+--------+----+------+----+
|  51159|    49| 2016|   NE|     M|CSDFD|20160401|CSDFD|        null|2016-04-01|Spring|Business| Air|BANGOR|   1|
|  51160|    44| 2016|   NE|     M|CSDFD|20160401|CSDFD|        null|2016-04-01|Spring|Business| Air|BANGOR|   1|
|4670699|    43| 2016|   TN|     M|08458|20160425|08458|     DENMARK|2016-04-25|Spring|Business| Air|BANGOR|   2|
|4671459|    58| 2016|   TN|     F|08458|20160425|08458|     DENMARK|2016-04-25|Spring|Business| Air|BANGOR|   2|
|4671460|    40| 2016|   TN|     M|8458C|20160425|8458C|     DENMARK|2016-04-25|Spring|Business| Air|BANGOR|   2|
|4671461|    40| 2016|   TN|     M|8458C|20160425|8458C|     DENMARK|2016-04-25|Spring|B

In [50]:
print("Checking data completeness...")
query_count = query_df.count()
print(f"Number of records returned from the test query: {query_count} records.")
print(f"Number of records found in i94_df: {i94_count} records.")
print("Data complete?", query_count == i94_count)

Checking data completeness...
Number of records returned from the test query: 2493086 records.
Number of records found in i94_df: 2493086 records.
Data complete? True


From these numbers, we can see that our star schema allowed us to bring back our data together with a simple query.

Now, let's take one of the records returned from this query and analyze it.
We can see that the record with cicid = 51160 has a null value in the birthcountry field, so let's check whether its country code has a match in the original i94cit/i94res list extracted from the labels description SAS file, to validate that our dim_Country table holds the correct information.

In [51]:
i94_df.select("*").where(i94_df.cicid == "51160").show()

+-----+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+--------+------+-------+-----+--------+----+
|cicid|i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|dtadfile|gender|airline|fltno|visatype|stay|
+-----+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+--------+------+-------+-----+--------+----+
|51160| 2016|     4|   148|   112|    BGM|  20545|      1|     NE|  20546|    44|      1|20160401|     M|    *GA|CSDFD|      B1|   1|
+-----+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+--------+------+-------+-----+--------+----+



In [52]:
countries_df.select("*").where(countries_df.code == "148").show()

+----+-------+-------------------+--------+---------+
|code|country|average_temperature|latitude|longitude|
+----+-------+-------------------+--------+---------+
+----+-------+-------------------+--------+---------+



In [53]:
i94cit_res_df.select("*").where(i94cit_res_df.code == "148").show()

+----+-------+
|code|country|
+----+-------+
+----+-------+



As we can see, country code 148 has no match in either the extracted list or the dimension table we created. The null value therefore comes from the source lookup data and not from a defect in our model, which means our data is complete and our model is working.

To avoid such null values, we can remove from the fact table (i94_df) all the records that have no matching values in the dimension tables. Alternatively, we can update the dimension tables instead: extract the unique lookup values from the fact table and push the missing ones to the corresponding dimension tables.
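
The two options can be sketched in plain Python on a miniature example. Everything here is hypothetical and for illustration only: the sample rows, the code 100 for Denmark, the helper names, and the "UNKNOWN" placeholder are assumptions, and only the unmatched code 148 comes from the record examined above.

```python
# Hypothetical miniature fact rows and country dimension
fact_rows = [
    {"cicid": 1, "i94cit": 148},  # 148 has no matching country
    {"cicid": 2, "i94cit": 100},
]
dim_country = {100: "DENMARK"}  # code 100 is a made-up key

def drop_orphans(fact_rows, dim, key):
    """Option 1: keep only the fact rows whose key exists in the dimension."""
    return [row for row in fact_rows if row[key] in dim]

def backfill_dimension(fact_rows, dim, key, placeholder="UNKNOWN"):
    """Option 2: add a placeholder dimension row for every unmatched key."""
    filled = dict(dim)  # copy, so the original dimension is left untouched
    for row in fact_rows:
        filled.setdefault(row[key], placeholder)
    return filled

clean_facts = drop_orphans(fact_rows, dim_country, "i94cit")
full_dim = backfill_dimension(fact_rows, dim_country, "i94cit")
print(clean_facts)  # [{'cicid': 2, 'i94cit': 100}]
print(full_dim)     # {100: 'DENMARK', 148: 'UNKNOWN'}
```

In Spark, the first option roughly corresponds to a `left_semi` join of the fact table against the dimension table. The second option keeps every fact record, at the cost of dimension rows whose description is unknown.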

We can also run some analytical queries to get more insights out of this data. For example, we can get the number of immigrants per season to see if there is more traffic in particular seasons.

In [54]:
group_df = i94_df.join(date_df, i94_df.arrdate == date_df.sasdate)
group_df = group_df.groupBy("season").count()
group_df.show()

+------+-------+
|season|  count|
+------+-------+
|Spring|2493086|
+------+-------+



In [55]:
group_df = i94_df.join(date_df, i94_df.depdate == date_df.sasdate)
group_df = group_df.groupBy("season").count()
group_df.show()

+------+-------+
|season|  count|
+------+-------+
|Spring|2223705|
|Summer| 147698|
|Autumn|  13531|
|Winter|      1|
+------+-------+



We can also get the number of arrivals (or departures) per date.

In [56]:
group_df = i94_df.join(date_df, i94_df.arrdate == date_df.sasdate)
group_df = group_df.groupBy("isodate").count()
group_df.show()

+----------+-----+
|   isodate|count|
+----------+-----+
|2016-04-01|87915|
|2016-04-02|86499|
|2016-04-03|80258|
|2016-04-04|78716|
|2016-04-05|71279|
|2016-04-06|72368|
|2016-04-07|81558|
|2016-04-08|85140|
|2016-04-09|86005|
|2016-04-10|83150|
|2016-04-11|77843|
|2016-04-12|68102|
|2016-04-13|72679|
|2016-04-14|86135|
|2016-04-15|88673|
|2016-04-16|95072|
|2016-04-17|83690|
|2016-04-18|78796|
|2016-04-19|68745|
|2016-04-20|75635|
+----------+-----+
only showing top 20 rows



These are just simple test queries to validate the correctness of our data model. More complex queries can be written to get more insights out of our data, given a more powerful environment such as Amazon EMR.