# U.S. Visitors and Immigration Data Exploration 
## Data Engineering Capstone Project

### Project Summary
##### To facilitate analysis of visitors and immigrants arriving in the U.S., an ETL pipeline was created that extracts I94 immigration arrival data, U.S. city temperatures, and U.S. city demographics from several open-source datasets into staging tables, which are then transformed into dimension and fact tables. Current and retrospective visitor and immigration patterns and distributions can then be evaluated. Examples include:
* Statistics and distribution of visa types to the U.S. (business, pleasure, student) and transport mode (air, land, or sea).
* Median age and gender distribution of arrivals.
* Relationship between popularity of arrival cities and their temperature at arrival date.
* How the above distributions change over the seasons/months for each year and also over the years.

#### The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here.
import os
import pandas as pd
import configparser
import math
# Note: importing round and sum shadows Python's built-ins of the same name in this notebook.
from pyspark.sql.functions import count, isnull, isnan, col, year, month, round, udf, sum, dayofmonth, dayofweek, weekofyear
from datetime import datetime, timedelta
from pyspark.sql.types import StringType, StructType, StructField, IntegerType


# Environment variables for Java 8 and Spark 2.4.3 in the Udacity workspace, based on https://knowledge.udacity.com/questions/572487
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["PATH"] = "/opt/conda/bin:/opt/spark-2.4.3-bin-hadoop2.7/bin:/opt/conda/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/lib/jvm/java-8-openjdk-amd64/bin"
os.environ["SPARK_HOME"] = "/opt/spark-2.4.3-bin-hadoop2.7"
os.environ["HADOOP_HOME"] = "/opt/spark-2.4.3-bin-hadoop2.7"

# Config file containing AWS credentials.
config = configparser.ConfigParser()
config.read('dl.cfg')
os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['AWS_SECRET_ACCESS_KEY']

# output path for data model parquet files.
output_data = "./models"

In [2]:
def create_spark_session():
    """
    Creates a Spark session.
    """
    from pyspark.sql import SparkSession
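    # Alternative builder that loads the spark-sas7bdat package for reading .sas7bdat files directly
    # (not used here; see the note in the I94 reading cell):
#     spark = SparkSession.builder \
#     .config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11") \
#     .enableHiveSupport().getOrCreate()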
    spark = SparkSession.builder.getOrCreate()
    return spark

spark = create_spark_session()

In [3]:
def code_mapper(f_content, idx, isnumeric=True):
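    """
    Parse one value block of I94_SAS_Labels_Descriptions.SAS into a dict.
    idx is the block name, e.g. "i94cntyl" (country codes), "i94prtl" (port/city codes),
    "i94model" (transport modes) or "i94addrl" (state codes); keys are cast to int when isnumeric=True.
    Based on https://knowledge.udacity.com/questions/125439
    """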
    f_content2 = f_content[f_content.index(idx):]
    f_content2 = f_content2[:f_content2.index(';')].split('\n')
    f_content2 = [i.replace("'", "") for i in f_content2]
    dic = [i.split('=') for i in f_content2[1:]]
    if isnumeric:
        dic = dict([int(i[0].strip()), i[1].strip()] for i in dic if len(i) == 2)
    else:
        dic = dict([i[0].strip(), i[1].strip()] for i in dic if len(i) == 2)
    return dic

with open('./I94_SAS_Labels_Descriptions.SAS') as f:
    f_content = f.read()
    f_content = f_content.replace('\t', '')
        
    country_code_dict = code_mapper(f_content, "i94cntyl", isnumeric=True)
    city_code_dict = code_mapper(f_content, "i94prtl", isnumeric=False)
    transport_mode_dict = code_mapper(f_content, "i94model", isnumeric=True)
    state_code_dict = code_mapper(f_content, "i94addrl", isnumeric=False)
    visa_class_dict = {
        1: 'Business',
        2: 'Pleasure',
        3: 'Student'
    }
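
To show what code_mapper extracts, the following stdlib-only sketch runs the same parsing logic on a short, hand-written snippet. The snippet (SAMPLE) is an assumption: it is meant to mimic the "value <name> ... ;" layout of I94_SAS_Labels_Descriptions.SAS, not to reproduce the real file.

```python
def code_mapper(f_content, idx, isnumeric=True):
    """Same parsing logic as the notebook cell: slice from idx to the next ';'
    and turn each "key = 'value'" line into a dict entry."""
    block = f_content[f_content.index(idx):]
    lines = block[:block.index(';')].split('\n')
    lines = [line.replace("'", "") for line in lines]
    pairs = [line.split('=') for line in lines[1:]]  # skip the "value <idx>" header line
    if isnumeric:
        return {int(k.strip()): v.strip() for k, v in (p for p in pairs if len(p) == 2)}
    return {k.strip(): v.strip() for k, v in (p for p in pairs if len(p) == 2)}

# Hypothetical excerpt in the assumed label-file layout (tabs already stripped).
SAMPLE = """
value i94model
1 = 'Air'
2 = 'Sea'
3 = 'Land'
9 = 'Not reported' ;

value $i94prtl
'BOS'='BOSTON, MA'
'LOS'='LOS ANGELES, CA'
;
"""

transport_modes = code_mapper(SAMPLE, "i94model", isnumeric=True)
ports = code_mapper(SAMPLE, "i94prtl", isnumeric=False)
print(transport_modes)  # {1: 'Air', 2: 'Sea', 3: 'Land', 9: 'Not reported'}
print(ports)            # {'BOS': 'BOSTON, MA', 'LOS': 'LOS ANGELES, CA'}
```

Lines that do not split into exactly one key and one value (such as the empty line before the closing ';') are silently skipped by the len(p) == 2 filter.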

### Step 1: Scope the Project and Gather Data

#### Scope 
Steps include:
1. Load all datasets listed below into PySpark dataframes.
2. Exploratory analysis of each dataset: removing missing data where necessary, performing data transformations, selecting only the pertinent columns, and applying any other cleaning steps.
3. Design data models - staging, dimensional and fact tables for efficient SQL queries.
4. Create the dimensional and fact tables.
5. Design and perform quality checks on each of the tables.
6. Perform tests and checks to ensure functionality and data accuracy.

Apache Spark (PySpark) is used for all processing, and AWS credentials are loaded from dl.cfg.

#### Describe and Gather Data
1. I94 Immigration Data: Data comes from the U.S. National Travel and Tourism Office (https://www.trade.gov/national-travel-and-tourism-office) and contains arrival information for visitors and immigrants to the U.S.
2. World Temperature Data: Data comes from Kaggle (https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data) and contains average temperatures of cities around the world.
3. U.S. City Demographic Data: Data comes from OpenDataSoft (https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/) and contains demographics of cities within the U.S.

#### 1.1 I94 Immigration Data

In [4]:
# Read I94 immigration data.
# Based on https://stackoverflow.com/questions/51949414/read-sas-sas7bdat-data-with-spark.
# Note: Reading SAS files from Spark failed in the Udacity workspace with "Exception: Java gateway process exited before sending its port number",
# so the parquet copy of the data in ./sas_data is read instead.
# i94_filename = "../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat"
# i94_df = spark.read.format("com.github.saurfang.sas.spark").load(i94_filename)
i94_df = spark.read.load('./sas_data')
i94_df.show(5)

+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|    cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|        admnum|fltno|visatype|
+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|5748517.0|2016.0|   4.0| 245.0| 438.0|    LOS|20574.0|    1.0|     CA|20582.0|  40.0|    1.0|  1.0|20160430|     SYD| null|      G|      O|   null|      M| 1976.0|10292016|     F|  null|     QF|9.495387003E10|00011|      B1|
|5748518.0|2016.0|   4.0| 245.0| 438.0|    LOS|20574.0|    1.0|     NV|20591.0|  32.0|    1.0|  

##### Data dictionary of I94 immigration data according to I94_SAS_Labels_Descriptions.SAS.

**Feature**|**Description**
:-----:|:-----:
cicid|Unique record ID
i94yr|4 digit year
i94mon|Numeric month
i94cit|3 digit code for immigrant country of birth
i94res|3 digit code for immigrant country of residence
i94port|Port of admission
arrdate|Arrival date in the USA (SAS numeric date: days since 1960-01-01)
i94mode|Mode of transportation (1 = Air; 2 = Sea; 3 = Land; 9 = Not reported)
i94addr|USA State of arrival
depdate|Departure date from the USA (SAS numeric date: days since 1960-01-01)
i94bir|Age of Respondent in Years
i94visa|Visa codes collapsed into three categories
count|Field used for summary statistics
dtadfile|Character Date Field - Date added to I-94 Files
visapost|Department of State where Visa was issued
occup|Occupation that will be performed in U.S.
entdepa|Arrival Flag - admitted or paroled into the U.S.
entdepd|Departure Flag - Departed
entdepu|Update Flag - Either apprehended
matflag|Match flag - Match of arrival and departure records
biryear|4 digit year of birth
dtaddto|Character Date Field - Date to which admitted to U.S. (allowed to stay until)
gender|Non-immigrant sex
insnum|INS number
airline|Airline used to arrive in U.S.
admnum|Admission Number
fltno|Flight number of Airline used to arrive in U.S.
visatype|Code of admission legally admitting the non-immigrant to temporarily stay in U.S.

#### 1.2. World Temperature Data

In [5]:
# Read temperature data.
temperature_filename = "../../data2/GlobalLandTemperaturesByCity.csv"
temperature_df = spark.read.csv(temperature_filename, header=True, sep=',', inferSchema=True)

# Print out the top 5 rows
temperature_df.show(5)

# Print out schema and datatypes.
temperature_df.printSchema()

+-------------------+------------------+-----------------------------+-----+-------+--------+---------+
|                 dt|AverageTemperature|AverageTemperatureUncertainty| City|Country|Latitude|Longitude|
+-------------------+------------------+-----------------------------+-----+-------+--------+---------+
|1743-11-01 00:00:00|             6.068|           1.7369999999999999|Århus|Denmark|  57.05N|   10.33E|
|1743-12-01 00:00:00|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-01-01 00:00:00|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-02-01 00:00:00|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-03-01 00:00:00|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
+-------------------+------------------+-----------------------------+-----+-------+--------+---------+
only showing top 5 rows

root
 |-- dt: timestamp (nullable = true)

##### Data dictionary of world temperature data according to https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data.

**Feature**|**Description**
:-----:|:-----:
dt|Date
AverageTemperature|Average land temperature in degrees Celsius
AverageTemperatureUncertainty|The 95% confidence interval around the average
City|City Name
Country|Country Name
Latitude|City Latitude
Longitude|City Longitude

#### 1.3. U.S. Cities Demographics Data

In [6]:
# Read demographics data.
demographics_filename = "us-cities-demographics.csv"
demographics_df = spark.read.csv(demographics_filename, header=True, sep=';', inferSchema=True)

# Print out the top 5 rows.
demographics_df.show(5)

# Print out schema and datatypes.
demographics_df.printSchema()

+----------------+-------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+-----+
|            City|        State|Median Age|Male Population|Female Population|Total Population|Number of Veterans|Foreign-born|Average Household Size|State Code|                Race|Count|
+----------------+-------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+-----+
|   Silver Spring|     Maryland|      33.8|          40601|            41862|           82463|              1562|       30908|                   2.6|        MD|  Hispanic or Latino|25924|
|          Quincy|Massachusetts|      41.0|          44129|            49500|           93629|              4147|       32935|                  2.39|        MA|               White|58723|
|          Hoover|      Alabama|      38.5|          38040| 

##### Data dictionary of U.S. cities demographics data.

**Feature**|**Description**
:-----:|:-----:
City|City Name
State|State name
Median Age|Median age
Male Population|Total male population
Female Population|Total female population
Total Population|Total population
Number of Veterans|Total veterans
Foreign-born|Total foreign-born residents
Average Household Size|Average number of people in household
State Code|Two-letter state code
Race|Race
Count|Total population identifying with race

### Step 2: Explore and Assess the Data
#### Explore the Data and Perform Cleaning and Data Transformations

In [7]:
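def count_missing_values(spark_df):
    """
    Print the number of null values in each column.
    Counted in Spark instead of via toPandas(), which would pull the whole dataset onto the driver.
    """
    from pyspark.sql import functions as F
    null_counts = spark_df.select(
        [F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in spark_df.columns]
    ).collect()[0]
    for column in spark_df.columns:
        print(column, null_counts[column])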

In [8]:
@udf(StringType())
def to_PySpark_date(date):
    """
    udf to convert to PySpark date
    Based on https://stackoverflow.com/questions/26923564/convert-sas-numeric-to-python-datetime.
    """
    py_date = None
    if date:
        epoch = datetime(1960, 1, 1)
        py_date = (epoch + timedelta(date)).isoformat()
    return py_date
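
The epoch arithmetic behind to_PySpark_date can be checked in plain Python. The function name sas_to_iso is hypothetical; applying it to the first raw I94 row shown earlier (arrdate = 20574.0, depdate = 20582.0) gives 30 April 2016, which agrees with that row's dtadfile of 20160430.

```python
from datetime import datetime, timedelta

SAS_EPOCH = datetime(1960, 1, 1)  # SAS numeric dates count days from 1960-01-01

def sas_to_iso(sas_date):
    """Convert a SAS numeric date (possibly a float) to an ISO-8601 string; None stays None."""
    if sas_date is None:
        return None
    return (SAS_EPOCH + timedelta(days=sas_date)).isoformat()

print(sas_to_iso(20574.0))  # 2016-04-30T00:00:00
print(sas_to_iso(20582.0))  # 2016-05-08T00:00:00
```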

In [9]:
@udf(StringType())
def city_name_to_code(city):
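    """
    UDF that finds the I94 port (city) code for a city name.
    Compares case-insensitively against the text before ", " in each city_code_dict label;
    if several ports match, the last one wins.
    """
    if city is None:
        return None
    city_code = None
    for key, value in city_code_dict.items():
        # "BOSTON, MA" -> "boston"; labels without ", " are used whole.
        city_name = value.split(", ")[0].lower()
        if city_name == city.lower():
            city_code = key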
    return city_code
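
city_name_to_code scans every entry of city_code_dict for each row. One common alternative, sketched here in plain Python and not what the notebook does, is to invert the dictionary once into a lowercase city-name to code index and then do a single dictionary lookup per row. Iterating in dictionary order and overwriting keeps the UDF's "last match wins" behaviour. The names build_city_index, lookup_city_code and sample_ports are hypothetical; sample_ports stands in for city_code_dict.

```python
def build_city_index(code_dict):
    """Map lowercase city name (text before ', ') to port code; later entries overwrite earlier ones."""
    index = {}
    for code, label in code_dict.items():
        index[label.split(", ")[0].lower()] = code
    return index

def lookup_city_code(index, city):
    """Return the port code for a city name, or None if the city is missing or unknown."""
    return None if city is None else index.get(city.lower())

# Hypothetical subset of city_code_dict, for illustration only ("XYZ" is a made-up duplicate).
sample_ports = {"BOS": "BOSTON, MA", "LOS": "LOS ANGELES, CA", "XYZ": "BOSTON, MA"}
index = build_city_index(sample_ports)
print(lookup_city_code(index, "Boston"))   # XYZ (last match wins, as in the UDF)
print(lookup_city_code(index, "Modesto"))  # None
```

In PySpark the prebuilt index could be captured in the UDF's closure so that each call is one lookup instead of a full scan.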

In [10]:
@udf(StringType())
def country_name_to_code(country):
    """
    UDF that finds the numeric I94 country code for a country name (case-insensitive match).
    """
    if country is None:
        return None
    country_code = None
    for key, value in country_code_dict.items():
        if country.lower() == value.lower():
            country_code = key
    return country_code

In [11]:
@udf(StringType())
def code_to_country_name(code):
    """
    UDF that finds the country name for a numeric I94 country code.
    """
    if code is None:
        return None
    return country_code_dict.get(int(code))

In [12]:
@udf(StringType())
def code_to_city_name(code):
    """
    UDF that finds the city name for an I94 port code, dropping the state suffix ("BOSTON, MA" -> "BOSTON").
    """
    city_name = city_code_dict.get(code)
    if city_name:
        city_name = city_name.split(", ")[0]
    return city_name

#### Cleaning, Data Transformation and Selection Steps
##### 2.1. I94 Visitor and Immigrant Data

In [13]:
# Initial number of rows.
print(i94_df.count())

# Drop rows with nulls in the key columns.
staging_i94_df = i94_df.dropna(how="any", subset=["cicid", "i94addr", "arrdate", "i94cit", "i94res", "i94port"])

# Drop records whose state code is "99" (not a U.S. state).
staging_i94_df = staging_i94_df.filter(col("i94addr") != "99")

# Convert the SAS arrdate to an ISO date string, country codes to country names, and port codes to city names.
# Cast cicid, i94bir, i94cit, i94res, i94mode, i94visa from double to int
staging_i94_df = staging_i94_df.withColumn("arrival_date", to_PySpark_date(staging_i94_df['arrdate'])) \
    .withColumn("country_name_birth", code_to_country_name(staging_i94_df['i94cit'])) \
    .withColumn("country_name_residence", code_to_country_name(staging_i94_df['i94res'])) \
    .withColumn("city_name", code_to_city_name(staging_i94_df['i94port'])) \
    .withColumn("cicid", col("cicid").cast("int")) \
    .withColumn("i94bir", col("i94bir").cast("int")) \
    .withColumn("i94cit", col("i94cit").cast("int")) \
    .withColumn("i94res", col("i94res").cast("int")) \
    .withColumn("i94mode", col("i94mode").cast("int")) \
    .withColumn("i94visa", col("i94visa").cast("int"))

# Select specific columns.
staging_i94_df = staging_i94_df.select(col("cicid").alias("immigrant_id"), 
                                       col("arrival_date"),
                                       col("i94port").alias("city_code"),
                                       col("city_name"),
                                       col("i94addr").alias("state_code"),
                                       col("i94bir").alias("age"),
                                       col("gender"),
                                       col("i94cit").alias("country_code_birth"),
                                       col("country_name_birth"),
                                       col("i94res").alias("country_code_residence"),
                                       col("country_name_residence"),
                                       col("i94mode").alias("transport_mode"),
                                       col("i94visa").alias("visa_class"),
                                       col("visatype").alias("visa_code")
                                      ).drop_duplicates()

# Final number of rows and schema.
print(staging_i94_df.count())
staging_i94_df.show(5)
staging_i94_df.printSchema()

3096313
2943669
+------------+-------------------+---------+-------------+----------+---+------+------------------+------------------+----------------------+----------------------+--------------+----------+---------+
|immigrant_id|       arrival_date|city_code|    city_name|state_code|age|gender|country_code_birth|country_name_birth|country_code_residence|country_name_residence|transport_mode|visa_class|visa_code|
+------------+-------------------+---------+-------------+----------+---+------+------------------+------------------+----------------------+----------------------+--------------+----------+---------+
|          27|2016-04-01T00:00:00|      BOS|       BOSTON|        MA| 58|     M|               101|           ALBANIA|                   101|               ALBANIA|             1|         1|       B1|
|     1775608|2016-04-10T00:00:00|      LOS|  LOS ANGELES|        CA| 62|     F|               101|           ALBANIA|                   101|               ALBANIA|             1| 

##### 2.2. Cities Temperature Data

In [14]:
# Initial total count before cleaning.
print(temperature_df.count())

# Keep only U.S. cities and extract the year and month from dt.
staging_temperature_df = temperature_df.filter(temperature_df['Country'] == "United States") \
    .withColumn("year", year(temperature_df['dt'])) \
    .withColumn("month", month(temperature_df['dt']))

# Show the most recent records (show() prints and returns None, so it is not wrapped in print()).
staging_temperature_df.orderBy('year', ascending=False).show(5)

# Keep only 2013, the latest year in the data, and map city names to I94 city codes.
staging_temperature_df = staging_temperature_df.filter(staging_temperature_df['year'] == 2013) \
    .withColumn("city_code", city_name_to_code(staging_temperature_df['City']))

# Drop rows with nulls in the key columns.
staging_temperature_df = staging_temperature_df.dropna(how="any", subset=["dt", "City", "AverageTemperature"])

# Select specific columns.
staging_temperature_df = staging_temperature_df.select(col("year"), 
                                                       col("month"),
                                                       col("city_code"),
                                                       col("City").alias("city_name"),
                                                       round(col("AverageTemperature"), 1).alias("average_temperature")
                                                      ).drop_duplicates()

# Final number of rows and schema.
print(staging_temperature_df.count())
staging_temperature_df.show(5)
staging_temperature_df.printSchema()

8599212
+-------------------+------------------+-----------------------------+-------+-------------+--------+---------+----+-----+
|                 dt|AverageTemperature|AverageTemperatureUncertainty|   City|      Country|Latitude|Longitude|year|month|
+-------------------+------------------+-----------------------------+-------+-------------+--------+---------+----+-----+
|2013-01-01 00:00:00|              6.32|                        0.267|Abilene|United States|  32.95N|  100.53W|2013|    1|
|2013-06-01 00:00:00|            27.831|          0.24600000000000002|Abilene|United States|  32.95N|  100.53W|2013|    6|
|2013-02-01 00:00:00|             8.116|                        0.222|Abilene|United States|  32.95N|  100.53W|2013|    2|
|2013-03-01 00:00:00|            12.503|                        0.273|Abilene|United States|  32.95N|  100.53W|2013|    3|
|2013-04-01 00:00:00|15.752999999999998|                        0.342|Abilene|United States|  32.95N|  100.53W|2013|    4|
+-------

##### 2.3. Cities Demographics Data

In [15]:
# Initial total count before cleaning.
print(demographics_df.count())

# Drop duplicates.
demographics_df = demographics_df.drop_duplicates()

# Pivot the Race rows into one column per race, summing Count.
staging_demographics_df = demographics_df.groupBy("City", "State", "Median Age", "Male Population",
                                        "Female Population","Total Population", "Number of Veterans", 
                                        "Foreign-born", "Average Household Size", "State Code").pivot("Race").sum("Count")

# Express each count as a percentage of Total Population.
staging_demographics_df = staging_demographics_df.withColumn("percent_male", (staging_demographics_df['Male Population'] / staging_demographics_df['Total Population']) * 100) \
    .withColumn("percent_female", (staging_demographics_df['Female Population'] / staging_demographics_df['Total Population']) * 100) \
    .withColumn("percent_veterans", (staging_demographics_df['Number of Veterans'] / staging_demographics_df['Total Population']) * 100) \
    .withColumn("percent_foreign_born", (staging_demographics_df['Foreign-born'] / staging_demographics_df['Total Population']) * 100) \
    .withColumn("percent_native", (staging_demographics_df['American Indian and Alaska Native'] / staging_demographics_df['Total Population']) * 100) \
    .withColumn("percent_asian", (staging_demographics_df['Asian'] / staging_demographics_df['Total Population']) * 100) \
    .withColumn("percent_black", (staging_demographics_df['Black or African-American'] / staging_demographics_df['Total Population']) * 100) \
    .withColumn("percent_hispanic", (staging_demographics_df['Hispanic or Latino'] / staging_demographics_df['Total Population']) * 100) \
    .withColumn("percent_white", (staging_demographics_df['White'] / staging_demographics_df['Total Population']) * 100) \
    .withColumn("city_code", city_name_to_code(staging_demographics_df['City']))

# Select specific columns.
staging_demographics_df = staging_demographics_df.select(col("city_code"),
                                                         col("City").alias("city_name"),
                                                         col("State Code").alias("state_code"), 
                                                         col("Median Age").alias("median_age"),
                                                         round(col("percent_male"), 1).alias("percent_male"),
                                                         round(col("percent_female"), 1).alias("percent_female"),
                                                         round(col("percent_veterans"), 1).alias("percent_veterans"),
                                                         round(col("percent_foreign_born"), 1).alias("percent_foreign_born"),
                                                         round(col("percent_native"), 1).alias("percent_native"),
                                                         round(col("percent_asian"), 1).alias("percent_asian"),
                                                         round(col("percent_black"), 1).alias("percent_black"),
                                                         round(col("percent_hispanic"), 1).alias("percent_hispanic"),
                                                         round(col("percent_white"), 1).alias("percent_white"),
                                                         col("Total Population").alias("total_population")
                                                        )

# Drop rows with nulls in the key columns.
staging_demographics_df = staging_demographics_df.dropna(how="any", subset=["city_name", "state_code", "total_population"])

# Final number of rows and schema.
print(staging_demographics_df.count())
staging_demographics_df.show(5)
staging_demographics_df.printSchema()

2891
596
+---------+------------+----------+----------+------------+--------------+----------------+--------------------+--------------+-------------+-------------+----------------+-------------+----------------+
|city_code|   city_name|state_code|median_age|percent_male|percent_female|percent_veterans|percent_foreign_born|percent_native|percent_asian|percent_black|percent_hispanic|percent_white|total_population|
+---------+------------+----------+----------+------------+--------------+----------------+--------------------+--------------+-------------+-------------+----------------+-------------+----------------+
|     null|     Modesto|        CA|      35.2|        49.6|          50.4|             4.7|                18.8|           2.1|          9.2|          4.7|            40.3|         78.9|          211257|
|     null|  Pittsburgh|        PA|      32.9|        49.2|          50.8|             5.8|                 9.3|           0.9|          7.0|         27.0|             3.0|   
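
The pivot-then-normalize step can be illustrated in plain Python on two rows copied from the demographics output above. This is a simplified sketch, not the Spark code: each city here has only one race row, whereas the real CSV has one row per (city, race), and pivot_race_percentages is a hypothetical name. For Silver Spring, 25924 / 82463 × 100 rounds to 31.4% Hispanic or Latino.

```python
from collections import defaultdict

# Two rows copied from the demographics output above (one race row per city here).
rows = [
    {"City": "Silver Spring", "State Code": "MD", "Male Population": 40601,
     "Total Population": 82463, "Race": "Hispanic or Latino", "Count": 25924},
    {"City": "Quincy", "State Code": "MA", "Male Population": 44129,
     "Total Population": 93629, "Race": "White", "Count": 58723},
]

def pivot_race_percentages(rows):
    """Sum Count per (city, state, race), as groupBy(...).pivot("Race").sum("Count") does,
    then express each race total and the male population as a percentage of Total Population."""
    race_totals = defaultdict(lambda: defaultdict(int))
    city_info = {}
    for r in rows:
        key = (r["City"], r["State Code"])
        race_totals[key][r["Race"]] += r["Count"]
        city_info[key] = r  # city-level columns repeat on every race row
    result = {}
    for key, counts in race_totals.items():
        total = city_info[key]["Total Population"]
        percentages = {race: round(n / total * 100, 1) for race, n in counts.items()}
        percentages["percent_male"] = round(city_info[key]["Male Population"] / total * 100, 1)
        result[key] = percentages
    return result

print(pivot_race_percentages(rows))
```

Because Hispanic or Latino is reported as an ethnicity alongside the race categories, the race percentages for a city need not sum to 100.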

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
##### A star schema is selected: a central fact table of immigration arrivals surrounded by the dimension tables immigrant, visa_class, transport_mode, city_temperature, city_demographic and date. Each analysis query joins the fact table directly to only the dimensions it needs, which keeps immigration queries simple and efficient.

![Star schema data model](./data_model.png)

##### Dimensional tables
* immigrant - contains immigrant personal information, including age, gender, country of birth, etc.
* visa_class - contains the three visa classes: business, pleasure, and student.
* transport_mode - defined as air, sea, land, or not reported.
* city_temperature - contains the average temperature per city, month, and year.
* city_demographic - contains population demographics: percentage of each gender and race, veterans, and foreign-born residents.
* date - year, month, day, week of year, day of week.

##### Fact table
* immigration - contains information related to the arrival of a visitor or immigrant in the U.S.

##### staging_i94
- immigrant_id
- arrival_date
- city_code
- city_name
- state_code
- age
- gender
- country_code_birth
- country_name_birth
- country_code_residence
- country_name_residence
- transport_mode
- visa_class
- visa_code

##### staging_temperature
- year
- month
- city_code
- city_name
- average_temperature

##### staging_demographics
- city_code
- city_name
- state_code
- median_age
- percent_male
- percent_female
- percent_veterans
- percent_foreign_born
- percent_native
- percent_asian
- percent_black
- percent_hispanic
- percent_white
- total_population

##### immigrant_dim
- immigrant_id INT
- age INT
- gender VARCHAR
- country_code_birth INT
- country_name_birth VARCHAR
- country_code_residence INT
- country_name_residence VARCHAR
- visa_code VARCHAR

##### visa_class_dim
- visa_id INT
- visa_class VARCHAR

##### transport_mode_dim
- mode_id INT
- transport_mode VARCHAR

##### city_temperature_dim
- city_code VARCHAR
- city_name VARCHAR
- year INT
- month INT
- average_temperature FLOAT

##### city_demographic_dim
- city_code VARCHAR
- city_name VARCHAR
- state_code VARCHAR
- median_age FLOAT
- percent_male FLOAT
- percent_female FLOAT
- percent_veterans FLOAT
- percent_foreign_born FLOAT
- percent_native FLOAT
- percent_asian FLOAT
- percent_black FLOAT
- percent_hispanic FLOAT
- percent_white FLOAT
- total_population INT

##### date_dim
- arrival_date VARCHAR
- year INT
- month INT
- day INT
- week_of_year INT
- day_of_week INT
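
The cell that builds date_dim is not shown here, but the imported dayofmonth, dayofweek and weekofyear suggest how its columns are derived. As a plain-Python illustration of that assumption (date_dim_row is a hypothetical name), the sketch below follows Spark's conventions: weekofyear uses ISO 8601 weeks, and dayofweek runs from 1 for Sunday to 7 for Saturday.

```python
from datetime import datetime

def date_dim_row(arrival_date):
    """Derive date_dim attributes from an ISO arrival_date string such as '2016-04-30T00:00:00'."""
    d = datetime.fromisoformat(arrival_date)
    return {
        "arrival_date": arrival_date,
        "year": d.year,
        "month": d.month,
        "day": d.day,
        "week_of_year": d.isocalendar()[1],    # ISO 8601 week number, like Spark's weekofyear
        "day_of_week": d.isoweekday() % 7 + 1,  # Spark's dayofweek: 1 = Sunday ... 7 = Saturday
    }

print(date_dim_row("2016-04-30T00:00:00"))
```

Python's isoweekday() counts Monday as 1 and Sunday as 7, so the modulo shift is what maps it onto Spark's Sunday-first numbering.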

##### immigration_fact
- immigrant_id INT
- arrival_date VARCHAR
- city_code VARCHAR
- city_name VARCHAR
- state_code VARCHAR
- visa_class INT
- transport_mode INT
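
To illustrate how the star schema is queried, here is a minimal sketch using Python's built-in sqlite3 module. SQLite is only a stand-in (the project writes parquet files), the column names follow the notebook's DataFrames, the dimension rows mirror the visa_class_dim and transport_mode_dim outputs, and the three fact rows are hypothetical. Each dimension is reached with a single join from the fact table.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.executescript("""
CREATE TABLE visa_class_dim (visa_id INTEGER PRIMARY KEY, visa_class TEXT);
CREATE TABLE transport_mode_dim (mode_id INTEGER PRIMARY KEY, transport_mode TEXT);
CREATE TABLE immigration_fact (
    immigrant_id INTEGER, arrival_date TEXT, city_code TEXT, city_name TEXT,
    state_code TEXT, visa_class INTEGER, transport_mode INTEGER
);
""")
cur.executemany("INSERT INTO visa_class_dim VALUES (?, ?)",
                [(1, "Business"), (2, "Pleasure"), (3, "Student")])
cur.executemany("INSERT INTO transport_mode_dim VALUES (?, ?)",
                [(1, "Air"), (2, "Sea"), (3, "Land"), (9, "Not reported")])
# Hypothetical fact rows, for illustration only.
cur.executemany("INSERT INTO immigration_fact VALUES (?, ?, ?, ?, ?, ?, ?)", [
    (1, "2016-04-01T00:00:00", "BOS", "BOSTON", "MA", 1, 1),
    (2, "2016-04-10T00:00:00", "LOS", "LOS ANGELES", "CA", 2, 1),
    (3, "2016-04-12T00:00:00", "LOS", "LOS ANGELES", "CA", 2, 3),
])

# Arrivals by visa class and transport mode: one join per dimension.
rows = cur.execute("""
SELECT v.visa_class, t.transport_mode, COUNT(*) AS arrivals
FROM immigration_fact f
JOIN visa_class_dim v ON f.visa_class = v.visa_id
JOIN transport_mode_dim t ON f.transport_mode = t.mode_id
GROUP BY v.visa_class, t.transport_mode
ORDER BY v.visa_class, t.transport_mode
""").fetchall()
print(rows)  # [('Business', 'Air', 1), ('Pleasure', 'Air', 1), ('Pleasure', 'Land', 1)]
conn.close()
```

The join keys only line up cleanly when the dimension IDs and the fact columns share a type, which is why the integer visa_class and transport_mode columns in the fact table pair with integer visa_id and mode_id.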

#### 3.2 Mapping Out Data Pipelines
1. Load all datasets.
2. Remove missing data where necessary, perform data transformations, and select only pertinent data.
3. Load to staging tables - staging_i94, staging_temperature, staging_demographics.
4. Create the dimensional tables - immigrant_dim, visa_class_dim, transport_mode_dim, city_temperature_dim, city_demographic_dim, date_dim.
5. Create fact table - immigration_fact.
6. Run quality checks on each of the tables.
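
The quality checks in step 6 are not shown in this section. A minimal sketch of three typical checks (non-empty table, no nulls in a column, unique key) follows, written over plain lists of dicts so it runs without Spark. The function names are hypothetical. The temperature sample mirrors a row from the city_temperature_dim output, where city_code is null, so a not-null check on that column would flag it.

```python
def check_not_empty(table_name, rows):
    """Fail if a table has no rows."""
    if len(rows) == 0:
        raise ValueError(f"{table_name}: table is empty")

def check_no_nulls(table_name, rows, column):
    """Fail if any row has a null (None) value in the given column."""
    if any(r.get(column) is None for r in rows):
        raise ValueError(f"{table_name}: null values in {column}")

def check_unique(table_name, rows, column):
    """Fail if the given column contains duplicate values."""
    values = [r[column] for r in rows]
    if len(values) != len(set(values)):
        raise ValueError(f"{table_name}: duplicate values in {column}")

visa_class_dim = [{"visa_id": 1, "visa_class": "Business"},
                  {"visa_id": 2, "visa_class": "Pleasure"},
                  {"visa_id": 3, "visa_class": "Student"}]
check_not_empty("visa_class_dim", visa_class_dim)
check_no_nulls("visa_class_dim", visa_class_dim, "visa_id")
check_unique("visa_class_dim", visa_class_dim, "visa_id")
print("visa_class_dim passed")

# Mirrors a row shown in the city_temperature_dim output (city_code is null).
temperature_sample = [{"city_code": None, "city_name": "Clarksville", "year": 2013, "month": 1}]
try:
    check_no_nulls("city_temperature_dim", temperature_sample, "city_code")
except ValueError as err:
    print(err)  # city_temperature_dim: null values in city_code
```

On the Spark DataFrames, the same checks map onto df.count() > 0, df.filter(col(c).isNull()).count() == 0, and df.select(c).distinct().count() == df.count().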

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [16]:
# Create the dimensional table - immigrant_dim.
immigrant_dim = staging_i94_df.select("immigrant_id",
                                      "age",
                                      "gender",
                                      "country_code_birth",
                                      "country_name_birth",
                                      "country_code_residence",
                                      "country_name_residence",
                                      "visa_code"
                                     ).drop_duplicates()

# Save to parquet.
immigrant_dim.write.parquet(os.path.join(output_data, 'immigrant.parquet'), 'overwrite')
print(immigrant_dim.count())
immigrant_dim.show(5)

2943669
+------------+---+------+------------------+------------------+----------------------+----------------------+---------+
|immigrant_id|age|gender|country_code_birth|country_name_birth|country_code_residence|country_name_residence|visa_code|
+------------+---+------+------------------+------------------+----------------------+----------------------+---------+
|     3832338| 42|     M|               108|           DENMARK|                   108|               DENMARK|       WT|
|     1159817| 17|     F|               111|            FRANCE|                   111|                FRANCE|       WT|
|     1350305| 36|     M|               111|            FRANCE|                   111|                FRANCE|       WT|
|     2881970| 43|     F|               111|            FRANCE|                   111|                FRANCE|       WT|
|     4480315| 25|     M|               111|            FRANCE|                   111|                FRANCE|       WT|
+------------+---+------+-------

In [17]:
# Create the dimensional table - visa_class_dim.
# Based on https://stackoverflow.com/questions/61339594/how-to-convert-a-dictionary-to-dataframe-in-pyspark.
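# visa_id is an integer so that it matches the integer visa_class column of immigration_fact.
visa_class_dim = spark.createDataFrame(list(visa_class_dict.items()),
                                      schema=StructType(fields=[
                                          StructField("visa_id", IntegerType()),
                                          StructField("visa_class", StringType())
                                      ]))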

# Save to parquet.
visa_class_dim.write.parquet(os.path.join(output_data, 'visa_class.parquet'), 'overwrite')
print(visa_class_dim.count())
visa_class_dim.show(5)

3
+-------+----------+
|visa_id|visa_class|
+-------+----------+
|      1|  Business|
|      2|  Pleasure|
|      3|   Student|
+-------+----------+



In [18]:
# Create the dimensional table - transport_mode_dim.
# Based on https://stackoverflow.com/questions/61339594/how-to-convert-a-dictionary-to-dataframe-in-pyspark.
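# mode_id is an integer so that it matches the integer transport_mode column of immigration_fact.
transport_mode_dim = spark.createDataFrame(list(transport_mode_dict.items()),
                                          schema=StructType(fields=[
                                              StructField("mode_id", IntegerType()),
                                              StructField("transport_mode", StringType())
                                          ]))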

# Save to parquet.
transport_mode_dim.write.parquet(os.path.join(output_data, 'transport_mode.parquet'), 'overwrite')
print(transport_mode_dim.count())
transport_mode_dim.show(5)

4
+-------+--------------+
|mode_id|transport_mode|
+-------+--------------+
|      1|           Air|
|      2|           Sea|
|      3|          Land|
|      9|  Not reported|
+-------+--------------+



In [19]:
# Create the dimensional table - city_temperature_dim.
city_temperature_dim = staging_temperature_df.select("city_code", "city_name", "year", "month", "average_temperature").drop_duplicates()

# Save to parquet.
city_temperature_dim.write.parquet(os.path.join(output_data, 'city_temperature.parquet'), 'overwrite')
print(city_temperature_dim.count())
city_temperature_dim.show(5)

2312
+---------+-----------+----+-----+-------------------+
|city_code|  city_name|year|month|average_temperature|
+---------+-----------+----+-----+-------------------+
|     null|Clarksville|2013|    1|                4.8|
|     null|     Dayton|2013|    8|               22.4|
|     null|     Durham|2013|    3|                6.6|
|     null|     Edison|2013|    7|               24.7|
|     null| Fort Wayne|2013|    3|                0.6|
+---------+-----------+----+-----+-------------------+
only showing top 5 rows



In [20]:
# Create the dimensional table - city_demographic_dim.
city_demographic_dim = staging_demographics_df.select("city_code", 
                                                      "city_name", 
                                                      "state_code", 
                                                      "median_age", 
                                                      "percent_male", 
                                                      "percent_female", 
                                                      "percent_veterans",
                                                      "percent_foreign_born",
                                                      "percent_native",
                                                      "percent_asian",
                                                      "percent_black",
                                                      "percent_hispanic",
                                                      "percent_white",
                                                      "total_population"
                                                     ).drop_duplicates()

# Save to parquet.
city_demographic_dim.write.parquet(os.path.join(output_data, 'city_demographic.parquet'), 'overwrite')
print(city_demographic_dim.count())
city_demographic_dim.show(5)

596
+---------+-------------+----------+----------+------------+--------------+----------------+--------------------+--------------+-------------+-------------+----------------+-------------+----------------+
|city_code|    city_name|state_code|median_age|percent_male|percent_female|percent_veterans|percent_foreign_born|percent_native|percent_asian|percent_black|percent_hispanic|percent_white|total_population|
+---------+-------------+----------+----------+------------+--------------+----------------+--------------------+--------------+-------------+-------------+----------------+-------------+----------------+
|      RCM|     Richmond|        CA|      35.3|        48.0|          52.0|             3.3|                38.5|           2.1|         18.3|         25.0|            41.1|         32.9|          109715|
|     null|      Madison|        WI|      30.7|        49.2|          50.8|             3.9|                12.1|           0.9|          9.6|          8.2|             7.9|   
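
Because `percent_male` and `percent_female` describe the same population, they should add up to about 100 (48.0 + 52.0 for Richmond above). The sketch below is one possible row-level sanity check; the function name and the 0.5-point tolerance (allowing for one-decimal rounding) are assumptions, not values from the original project.

```python
def gender_split_consistent(row, tolerance=0.5):
    """Return True if percent_male + percent_female is within `tolerance` of 100.

    `row` is a dict of column values; rows missing either percentage fail.
    """
    male = row.get("percent_male")
    female = row.get("percent_female")
    if male is None or female is None:
        return False
    return abs((male + female) - 100.0) <= tolerance
```

The race percentages are deliberately not checked this way: Hispanic or Latino origin overlaps the race categories, so those columns are not expected to sum to 100.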

In [21]:
# Create the dimensional table - date_time_dim.
date_time_dim = staging_i94_df.select("arrival_date").drop_duplicates()
# Note: weekofyear() returns the ISO week of the year (1-53), so the
# "week_of_day" column holds the week number of the year, not a weekday.
date_time_dim = date_time_dim.withColumn("year", year("arrival_date")) \
    .withColumn("month", month("arrival_date")) \
    .withColumn("week_of_day", weekofyear("arrival_date")) \
    .withColumn("day", dayofmonth("arrival_date"))

# Save to parquet.
date_time_dim.write.parquet(os.path.join(output_data, 'date_time.parquet'), 'overwrite')
print(date_time_dim.count())
date_time_dim.show(5)

30
+-------------------+----+-----+-----------+---+
|       arrival_date|year|month|week_of_day|day|
+-------------------+----+-----+-----------+---+
|2016-04-16T00:00:00|2016|    4|         15| 16|
|2016-04-14T00:00:00|2016|    4|         15| 14|
|2016-04-18T00:00:00|2016|    4|         16| 18|
|2016-04-10T00:00:00|2016|    4|         14| 10|
|2016-04-26T00:00:00|2016|    4|         17| 26|
+-------------------+----+-----+-----------+---+
only showing top 5 rows
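
To make the derived columns concrete, here is a plain-Python sketch of the same split, assuming `arrival_date` is formatted as in the output above (`2016-04-16T00:00:00`). Spark documents `weekofyear` as ISO 8601 weeks that start on Monday, which is what `datetime.isocalendar()` returns, so the week numbers should agree. `date_parts` is a hypothetical helper.

```python
from datetime import datetime


def date_parts(arrival_date):
    """Split an ISO-style timestamp string into date_time_dim values.

    Returns (year, month, week_of_year, day). The week number comes from
    isocalendar(), i.e. ISO 8601 weeks starting on Monday.
    """
    ts = datetime.strptime(arrival_date, "%Y-%m-%dT%H:%M:%S")
    iso_week = ts.isocalendar()[1]
    return ts.year, ts.month, iso_week, ts.day
```

For example, `date_parts("2016-04-16T00:00:00")` returns `(2016, 4, 15, 16)`, matching the first row of the table above.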



In [22]:
# Create the fact table - immigration_fact.
immigration_fact = staging_i94_df.select("immigrant_id", "arrival_date", "city_code", "city_name", "state_code", "visa_class", "transport_mode").drop_duplicates()

# Save to parquet.
immigration_fact.write.parquet(os.path.join(output_data, 'immigration.parquet'), 'overwrite')
print(immigration_fact.count())
immigration_fact.show(5)

2943669
+------------+-------------------+---------+----------------+----------+----------+--------------+
|immigrant_id|       arrival_date|city_code|       city_name|state_code|visa_class|transport_mode|
+------------+-------------------+---------+----------------+----------+----------+--------------+
|     3302204|2016-04-18T00:00:00|      LOS|     LOS ANGELES|        CA|         2|             1|
|     4853697|2016-04-26T00:00:00|      NYC|        NEW YORK|        DC|         2|             1|
|        6954|2016-04-01T00:00:00|      NYC|        NEW YORK|        NY|         2|             1|
|      631588|2016-04-04T00:00:00|      NEW|NEWARK/TETERBORO|        NY|         2|             1|
|     1783463|2016-04-10T00:00:00|      NYC|        NEW YORK|        NY|         2|             1|
+------------+-------------------+---------+----------------+----------+----------+--------------+
only showing top 5 rows
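
The fact table stores codes (`visa_class`, `transport_mode`) that should resolve against the dimension tables. A referential-integrity check finds fact values with no matching dimension key; in Spark this can be written as a `left_anti` join, for example `immigration_fact.join(transport_mode_dim, immigration_fact.transport_mode == transport_mode_dim.mode_id, "left_anti")`. The plain-Python sketch below shows the logic, with `orphan_keys` as a hypothetical helper. It also illustrates a type pitfall: if the fact column holds integers while `mode_id` is a string, every value looks orphaned (whether that happens here depends on the staging schema, which is not shown).

```python
def orphan_keys(fact_rows, fact_column, dimension_keys):
    """Return fact values in `fact_column` that have no matching dimension key.

    `fact_rows` is an iterable of dicts; the result is sorted (by string
    form, so mixed types can be ordered) for stable output.
    """
    known = set(dimension_keys)
    orphans = {row[fact_column] for row in fact_rows
               if row[fact_column] not in known}
    return sorted(orphans, key=str)
```

An empty result means every fact row can be joined to its dimension.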



#### 4.2 Data Quality Checks
##### The data quality checks count the rows in each table, both for the in-memory DataFrames and for the saved parquet files after reading them back, to ensure that no table is empty.

In [23]:
# Data quality check to ensure each table contains at least 1 row.
dict_tables = {
    'immigrant': immigrant_dim, 
    'visa_class': visa_class_dim, 
    'transport_mode': transport_mode_dim, 
    'city_temperature': city_temperature_dim, 
    'city_demographic': city_demographic_dim, 
    'date_time': date_time_dim, 
    'immigration': immigration_fact
}

def number_rows_in_table(key, table):
    # Print a pass/fail message based on the table's row count.
    number_rows = table.count()
    if number_rows > 0:
        print("Data quality check passed, number of rows in " + key + " table is " + str(number_rows))
    else:
        print("Data quality check failed, " + str(number_rows) + " rows in " + key + " table.")

for key, table in dict_tables.items():
    number_rows_in_table(key, table)

Data quality check passed, number of rows in immigrant table is 2943669
Data quality check passed, number of rows in visa_class table is 3
Data quality check passed, number of rows in transport_mode table is 4
Data quality check passed, number of rows in city_temperature table is 2312
Data quality check passed, number of rows in city_demographic table is 596
Data quality check passed, number of rows in date_time table is 30
Data quality check passed, number of rows in immigration table is 2943669
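
The check above only prints its result, so an empty table would not stop a scheduled run. A stricter variant, sketched below under the assumption that failing loudly is preferred, raises an exception listing every empty table. `DataQualityError` and `check_non_empty` are hypothetical names; the function accepts any objects with a `count()` method, such as the Spark DataFrames in `dict_tables`.

```python
class DataQualityError(Exception):
    """Raised when one or more tables fail a data quality check."""


def check_non_empty(tables):
    """Raise DataQualityError naming every table whose count() is below 1.

    `tables` maps a table name to an object with a count() method.
    Returns the dict of row counts when all tables pass.
    """
    counts = {name: table.count() for name, table in tables.items()}
    empty = [name for name, rows in counts.items() if rows < 1]
    if empty:
        raise DataQualityError("Empty tables: " + ", ".join(sorted(empty)))
    return counts
```

Calling `check_non_empty(dict_tables)` would return the same counts printed above, or stop the notebook (or an orchestrated task) at the first failing run.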


In [24]:
# Check that the saved parquet files are readable and non-empty.
for key in dict_tables.keys():
    filename = os.path.join(output_data, key + '.parquet')
    df = spark.read.parquet(filename)
    number_rows_in_table(key, df)

Data quality check passed, number of rows in immigrant table is 2943669
Data quality check passed, number of rows in visa_class table is 3
Data quality check passed, number of rows in transport_mode table is 4
Data quality check passed, number of rows in city_temperature table is 2312
Data quality check passed, number of rows in city_demographic table is 596
Data quality check passed, number of rows in date_time table is 30
Data quality check passed, number of rows in immigration table is 2943669
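
The read-back check confirms that each parquet file is readable and non-empty, but not that it holds the same number of rows as the DataFrame that was written. A minimal sketch of that comparison follows, assuming two dictionaries of counts: one built from `dict_tables` before writing and one from `spark.read.parquet(...).count()` afterwards. `count_mismatches` is a hypothetical helper.

```python
def count_mismatches(expected, actual):
    """Return {table: (expected, actual)} for every table whose counts differ.

    A table missing from `actual` is reported with an actual count of None.
    """
    mismatches = {}
    for name, n_expected in expected.items():
        n_actual = actual.get(name)
        if n_actual != n_expected:
            mismatches[name] = (n_expected, n_actual)
    return mismatches
```

With the counts printed above, both passes report identical numbers, so the result would be an empty dictionary.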


#### 4.3 Data dictionary 

##### Immigration Fact Table
###### immigration_fact

**Feature**|**Description**
:-----:|:-----:
immigrant_id|Unique record ID
arrival_date|Arrival Date in the USA
city_code|Code for port of admission
city_name|Port of admission
state_code|USA State of arrival
visa_class|Visa codes collapsed into three categories
transport_mode|Mode of transportation (1 = Air; 2 = Sea; 3 = Land; 9 = Not reported)

##### Immigrant Dimension Table 
###### immigrant_dim

**Feature**|**Description**
:-----:|:-----:
immigrant_id|Unique record ID
age|Age of Respondent in Years
gender|Non-immigrant sex
country_code_birth|3-digit code for immigrant country of birth
country_name_birth|Immigrant country of birth
country_code_residence|3-digit code for immigrant country of residence
country_name_residence|Immigrant country of residence
visa_code|Code of admission legally admitting the non-immigrant to temporarily stay in U.S.

##### Visa Class Dimension Table 
###### visa_class_dim

**Feature**|**Description**
:-----:|:-----:
visa_id|Visa class code
visa_class|Class of admission legally admitting the non-immigrant to temporarily stay in U.S.

##### Transport Mode Dimension Table 
###### transport_mode_dim

**Feature**|**Description**
:-----:|:-----:
mode_id|Transport mode code
transport_mode|Mode of transportation (1 = Air; 2 = Sea; 3 = Land; 9 = Not reported)

##### City Temperature Dimension Table 
###### city_temperature_dim

**Feature**|**Description**
:-----:|:-----:
city_code|City code
city_name|City name
year|Year
month|Month
average_temperature|Average land temperature for the city and month, in degrees Celsius

##### City Demographic Dimension Table 
###### city_demographic_dim

**Feature**|**Description**
:-----:|:-----:
city_code|City code
city_name|City name
state_code|U.S. State
median_age|Median age
percent_male|Percentage male population
percent_female|Percentage female population
percent_veterans|Percentage veterans
percent_foreign_born|Percentage foreign born residents
percent_native|Percentage American Indian and Alaska Native population
percent_asian|Percentage Asian population
percent_black|Percentage Black population
percent_hispanic|Percentage Hispanic or Latino population
percent_white|Percentage White population
total_population|Total population

##### Date Time Dimension Table 
###### date_time_dim

**Feature**|**Description**
:-----:|:-----:
arrival_date|Arrival date
year|Year
month|Month
week_of_day|Week of the year (ISO week number, 1-53, from weekofyear)
day|Day of month

#### Step 5: Complete Project Write Up
* Spark was selected for this project because it:
    * can process very large amounts of data.
    * can read many different file formats, such as CSV, Parquet and SAS.


* How often the data should be updated, and why:
    * The I94 data is currently from April 2016; it can be updated monthly because of the high volume of visitors and immigrants.
    * The city temperature data are currently from 2013; they can be updated every 5 years, since climate change is now causing larger temperature changes.
    * The city demographics data are currently from 2015; they can be updated every 10 years, following analysis by the US Census Bureau.


* Scenarios:
    * What if the data was increased by 100x?
        * Spark was selected for its ability to handle large data and can handle the increased volume.
        * The AWS EMR cluster can be scaled out by adding more nodes, or scaled up with larger instance types.
    * What if the data populates a dashboard that must be updated daily by 7am?
        * Use Airflow to schedule the data pipeline so it completes before 7am, and monitor the DAG in the Airflow UI to spot any issues.
    * What if the database needed to be accessed by 100+ people?
        * Scale up Amazon Redshift in AWS to serve the concurrent users.