# Project Title
### Data Engineering Capstone Project

#### Project Summary
The purpose of this project is to build a data warehouse that combines I94 immigration data and U.S. city demographic data into a single source of truth for analysis.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [None]:
# Do all imports and installs
import pandas as pd
import os
import datetime
import configparser

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import DateType, IntegerType, StringType
from pyspark.sql.window import Window
from pyspark.sql.functions import isnan, when, count, col, udf, dayofmonth, dayofweek, month, year, weekofyear, upper
from pyspark.sql.functions import monotonically_increasing_id

In [None]:
# Create Spark session
spark = SparkSession.builder.\
            config("spark.jars.repositories", "https://repos.spark-packages.org/").\
            config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\
            enableHiveSupport().getOrCreate()

### Step 1: Scope the Project and Gather Data

#### 1.1. Scope 
- Data:
    - I94 Immigration Data
    - U.S. City Demographic Data
- End solution description:
    - This project builds an **ETL pipeline** for a **data lake**: it loads data from S3, processes it into analytics tables using **Spark**, and writes the tables back to S3.
- Tools:
    - Amazon S3
    - Apache Spark

#### 1.2. Describe and Gather Data 
- **I94 Immigration Data:** 
    - [Source](https://www.trade.gov/national-travel-and-tourism-office): This data comes from the US National Travel and Tourism Office.
    - Description: This dataset contains international visitor arrival statistics by world regions and select countries (including top 20), type of visa, mode of transportation, age groups, states visited (first intended address only), and the top ports of entry (for select countries).
- **U.S. City Demographic Data:**
    - [Source](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/): This data comes from OpenDataSoft.
    - Description: This dataset contains information about the demographics of all US cities and census-designated places with a population greater than or equal to 65,000.

##### **I94 Immigration Data**
- In this project, I chose to work with the April 2016 data (`i94_apr16_sub.sas7bdat`), but any month of the year can be used.

In [None]:
os.listdir("../../data/18-83510-I94-Data-2016/")

In [None]:
i94_immigration_spark = spark.read.format("com.github.saurfang.sas.spark").load("../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat")
print("Number of partitions: ", i94_immigration_spark.rdd.getNumPartitions())
print("Number of records: ", i94_immigration_spark.rdd.count())
i94_immigration_spark.printSchema()
i94_immigration_spark.limit(5).toPandas()

##### **U.S. City Demographic Data**

In [None]:
uscities_demographic_spark = spark.read.option("header", True).option(
    "delimiter", ";").option("inferSchema", True).csv("raw_data/us-cities-demographics.csv")
print("Number of partitions: ", uscities_demographic_spark.rdd.getNumPartitions())
print("Number of records: ", uscities_demographic_spark.rdd.count())
uscities_demographic_spark.printSchema()
uscities_demographic_spark.limit(5).toPandas()

### Step 2: Explore and Assess the Data

#### 2.1. Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

##### **I94 Immigration Data**
- Before exploring the I94 immigration data, I parse the SAS description file "I94_SAS_Labels_Descriptions.SAS", which maps the coded values (country, port, mode, address, and visa) to readable labels.
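The parser below assumes each lookup in the description file is a `value <name>` block of `code = 'label'` lines terminated by `;`. As a minimal sketch of what that parsing yields, here it is applied to a small in-memory excerpt; the excerpt is hypothetical (modeled on the mode codes used later in this notebook), and `parse_value_block` is an illustrative helper, not part of the pipeline.

```python
# Hypothetical excerpt in the layout the parser expects:
# a "value <name>" header, "code = 'label'" lines, and a closing ";".
SAMPLE = """value i94model
    1 = 'Air'
    2 = 'Sea'
    3 = 'Land'
    9 = 'Not reported' ;
"""


def parse_value_block(text, value):
    """Return (code, label) pairs from one SAS value block in `text`."""
    block = text[text.index(value):]
    block = block[:block.index(";")]
    pairs = []
    for line in block.split("\n")[1:]:  # skip the "value <name>" header line
        if "=" not in line:
            continue
        code, label = line.split("=", 1)  # split on the first "=" only
        # Trim whitespace, then the surrounding quotes, then padding inside them
        code = code.strip().strip("'").strip()
        label = label.strip().strip("'").strip()
        pairs.append((code, label))
    return pairs


pairs = parse_value_block(SAMPLE, "i94model")
print(pairs)  # [('1', 'Air'), ('2', 'Sea'), ('3', 'Land'), ('9', 'Not reported')]
```

Splitting on the first `=` only keeps labels intact if they ever contain an `=`, and stripping inside the quotes removes the trailing padding that some labels carry.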

In [None]:
def process_sas_des_file(sas_des_file, value, columns):
    """
    Process SAS description file to return value as pandas dataframe
    Arguments:
        sas_des_file (str): SAS description file source.
        value (str): sas value to extract.
        columns (list): list of 2 containing column names.
    Return:
        pandas dataframe
    """
    with open(sas_des_file) as f:
        file = f.read()
    
    file = file[file.index(value):]
    file = file[:file.index(";")]
    
    lines = file.split("\n")[1:]
    codes = []
    values = []
    
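    for line in lines:
        if "=" in line:
            # Split on the first "=" only, in case a label itself contains "="
            code, val = line.split("=", 1)
            # Strip whitespace, surrounding quotes, and padding inside the quotes
            # (this also avoids an IndexError on empty codes or values)
            code = code.strip().strip("'").strip()
            val = val.strip().strip("'").strip()

            codes.append(code)
            values.append(val)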
        
    return pd.DataFrame(list(zip(codes, values)), columns=columns)

In [None]:
i94cit_res = process_sas_des_file("raw_data/I94_SAS_Labels_Descriptions.SAS", "i94cntyl", ["code", "country"])
i94cit_res.head()

In [None]:
i94cit_res.to_csv("clean_data/i94cit_res.csv")

In [None]:
i94port = process_sas_des_file("raw_data/I94_SAS_Labels_Descriptions.SAS", "i94prtl", ["code", "port"])
i94port.head()

In [None]:
i94port.to_csv("clean_data/i94port.csv")

In [None]:
i94mode = process_sas_des_file("raw_data/I94_SAS_Labels_Descriptions.SAS", "i94model", ["code", "mode"])
i94mode.head()

In [None]:
i94addr = process_sas_des_file("raw_data/I94_SAS_Labels_Descriptions.SAS", "i94addrl", ["code", "addr"])
i94addr.head()

In [None]:
i94addr.to_csv("clean_data/i94addr.csv")

In [None]:
i94visa = process_sas_des_file("raw_data/I94_SAS_Labels_Descriptions.SAS", "I94VISA", ["code", "type"])
i94visa.head()

In [None]:
i94_immigration_spark.select(*[
    (
        F.count(F.when((F.isnan(c) | F.col(c).isNull()), c)) if t not in ("timestamp", "date")
        else F.count(F.when(F.col(c).isNull(), c))
    ).alias(c)
    for c, t in i94_immigration_spark.dtypes if c in i94_immigration_spark.columns
]).show(vertical=True)

##### **U.S. City Demographic Data**

In [None]:
uscities_demographic_spark.select(*[
    (
        F.count(F.when((F.isnan(c) | F.col(c).isNull()), c)) if t not in ("timestamp", "date")
        else F.count(F.when(F.col(c).isNull(), c))
    ).alias(c)
    for c, t in uscities_demographic_spark.dtypes if c in uscities_demographic_spark.columns
]).show(vertical=True)

#### 2.2. Cleaning Steps
Document steps necessary to clean the data

##### **I94 Immigration Data**
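- Drop the columns "occup", "entdepu", and "insnum", which are almost entirely null
- Drop records in which all columns are null
- Convert SAS dates and coded fields (mode, visa) to readable values, cast types, and keep only the columns needed for the data model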

In [None]:
def convert_sas_date(days):
    """
    Convert a SAS date stored as days since 01/01/1960 to a date
    params: days since 01/01/1960 (float or None)
    return: datetime.date, or None if days is None
    """
    if days is None:
        return None
    return datetime.date(1960, 1, 1) + datetime.timedelta(days=days)

def convert_sas_day_month(days):
    """
    Convert a SAS date stored as days since 01/01/1960 to the day of the month
    params: days since 01/01/1960 (float or None)
    return: day of the month (int), or None if days is None
    """
    if days is None:
        return None
    return (datetime.date(1960, 1, 1) + datetime.timedelta(days=days)).day


def convert_i94mode(mode):
    """
    Convert i94mode to i94mode description in SAS file
    params: i94mode
    return: i94mode description
    """
    if mode == 1:
        return "Air"
    elif mode == 2:
        return "Sea"
    elif mode == 3:
        return "Land"
    else:
        return "Not Reported"


def convert_i94visa(visa):
    """
    Convert i94visa to i94visa description in SAS file
    params: i94visa
    return: i94visa description
    """
    if visa == 1:
        return "Business"
    elif visa == 2:
        return "Pleasure"
    elif visa == 3:
        return "Student"
    else:
        return "Not Reported"
    
convert_sas_date_udf = F.udf(convert_sas_date, DateType())
convert_sas_day_month_udf = F.udf(convert_sas_day_month, IntegerType())
convert_i94mode_udf = F.udf(convert_i94mode, StringType())
convert_i94visa_udf = F.udf(convert_i94visa, StringType())
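The date UDFs rely on SAS storing dates as the number of days since 1960-01-01. A quick stdlib check of that arithmetic, using a hypothetical helper `sas_to_date` that mirrors `convert_sas_date`, shows that the value 20545 corresponds to the first day of the April 2016 file:

```python
import datetime

SAS_EPOCH = datetime.date(1960, 1, 1)  # day 0 for SAS date values


def sas_to_date(days):
    """Convert a SAS date (days since 1960-01-01, possibly a float) to a date."""
    if days is None:
        return None
    return SAS_EPOCH + datetime.timedelta(days=int(days))


print(sas_to_date(20545.0))  # 2016-04-01
print(sas_to_date(20574.0))  # 2016-04-30
```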

In [None]:
i94_immigration_spark = i94_immigration_spark.drop(
    *["occup", "entdepu", "insnum"])
i94_immigration_spark = i94_immigration_spark.dropna(how='all')

i94_immigration_spark = i94_immigration_spark.withColumn("arrival_date", convert_sas_date_udf(i94_immigration_spark["arrdate"])) \
    .withColumn("departure_date", convert_sas_date_udf(i94_immigration_spark["depdate"])) \
    .withColumn('arrival_day', convert_sas_day_month_udf(i94_immigration_spark['arrdate'])) \
    .withColumn("year", i94_immigration_spark["i94yr"].cast(IntegerType())) \
    .withColumn("month", i94_immigration_spark["i94mon"].cast(IntegerType())) \
    .withColumn("age", i94_immigration_spark["i94bir"].cast(IntegerType())) \
    .withColumn("birth_year", i94_immigration_spark["biryear"].cast(IntegerType())) \
    .withColumn("country_code", i94_immigration_spark["i94cit"].cast(IntegerType())) \
    .withColumn("city_code", i94_immigration_spark["i94addr"].cast(StringType())) \
    .withColumn("port_code", i94_immigration_spark["i94port"].cast(StringType())) \
    .withColumn("mode", convert_i94mode_udf(i94_immigration_spark["i94mode"])) \
    .withColumn("visa_category", convert_i94visa_udf(i94_immigration_spark["i94visa"]))

i94_immigration_spark = i94_immigration_spark.select(["arrival_date", "departure_date", "arrival_day",
                                                      "year", "month", "age", "birth_year", "country_code",
                                                      "city_code", "port_code", "mode", "visa_category"])

In [None]:
i94_immigration_spark.write.mode("append").partitionBy("year", "month", "arrival_day") \
    .parquet("clean_data/i94_immigration")
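`partitionBy("year", "month", "arrival_day")` lays out the Parquet output as Hive-style `column=value` directories, so queries that filter on those columns can skip whole directories. The sketch below uses a hypothetical helper, `partition_path`, to show which directory a given combination of partition values maps to:

```python
def partition_path(base, year, month, arrival_day):
    """Hypothetical helper: the Hive-style directory that
    partitionBy("year", "month", "arrival_day") uses for one combination
    of partition values (integer values are written without zero padding)."""
    return f"{base}/year={year}/month={month}/arrival_day={arrival_day}"


print(partition_path("clean_data/i94_immigration", 2016, 4, 1))
# clean_data/i94_immigration/year=2016/month=4/arrival_day=1
```

Because the write uses `mode("append")`, rerunning the cell adds new files to these directories instead of replacing the existing ones.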

In [None]:
# Number of records (before cleaning): 3096313
print("Number of partitions: ", i94_immigration_spark.rdd.getNumPartitions())
print("Number of records (after cleaning): ", i94_immigration_spark.rdd.count())
i94_immigration_spark.printSchema()
i94_immigration_spark.limit(5).toPandas()

##### **U.S. City Demographic Data**
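- Drop rows that have null values in "Male Population", "Female Population", "Number of Veterans", "Foreign-born", or "Average Household Size"
- Drop duplicate rows based on "City", "State", "State Code", "Race"
- Rename columns to snake_case and add an upper-cased `city_updated` column used for joining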

In [None]:
subset_cols = [
    "Male Population",
    "Female Population",
    "Number of Veterans",
    "Foreign-born",
    "Average Household Size"
]
uscities_demographic_spark = uscities_demographic_spark.dropna(subset=subset_cols)
uscities_demographic_spark = uscities_demographic_spark.drop_duplicates(
    ["City", "State", "State Code", "Race"])

uscities_demographic_spark = uscities_demographic_spark.withColumnRenamed("City", "city") \
    .withColumnRenamed("State", "state") \
    .withColumnRenamed("State Code", "state_code") \
    .withColumnRenamed("Median Age", "median_age") \
    .withColumnRenamed("Male Population", "male_population") \
    .withColumnRenamed("Female Population", "female_population") \
    .withColumnRenamed("Total Population", "total_population") \
    .withColumnRenamed("Number of Veterans", "number_of_veterans") \
    .withColumnRenamed("Foreign-born", "foreign_born") \
    .withColumnRenamed("Average Household Size", "average_household_size") \
    .withColumnRenamed("Race", "race") \
    .withColumnRenamed("Count", "count")

uscities_demographic_spark = uscities_demographic_spark.withColumn("city_updated", upper(col("city")))

In [None]:
uscities_demographic_spark.write.mode("overwrite").parquet(
    f"clean_data/uscities_demographic")

In [None]:
# Number of records (before cleaning): 2891
print("Number of partitions: ", uscities_demographic_spark.rdd.getNumPartitions())
print("Number of records (after cleaning): ", uscities_demographic_spark.rdd.count())
uscities_demographic_spark.printSchema()
uscities_demographic_spark.limit(5).toPandas()

### Step 3: Define the Data Model

#### 3.1. Conceptual Data Model
- We use a snowflake schema.
- The snowflake schema is similar to the star schema. However, in a snowflake schema, dimensions are normalized into multiple related tables, whereas in a star schema each dimension is denormalized into a single table. In this model, the us cities demographics dimension is not joined to the fact table directly; it is linked to the city dimension through `city_id`.
![](data_model.jpg)
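To illustrate how the snowflake layout is queried, here is a toy sketch using Python's built-in `sqlite3`. The table and column names follow the data dictionary below, trimmed to a few columns; every row is made up. Demographics are reached from the fact table through the city dimension:

```python
import sqlite3

# Toy, hypothetical rows; table and column names follow the data dictionary.
conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE i94_immigration (immigration_id INTEGER PRIMARY KEY,
                              city_id INTEGER, visa_category TEXT);
CREATE TABLE city (city_id INTEGER PRIMARY KEY, city_code TEXT, city_name TEXT);
CREATE TABLE uscities_demographic (demographic_id INTEGER PRIMARY KEY,
                                   city_id INTEGER, race TEXT, "count" INTEGER);
INSERT INTO city VALUES (1, 'AA', 'CITY A'), (2, 'BB', 'CITY B');
INSERT INTO i94_immigration VALUES (10, 1, 'Pleasure'), (11, 1, 'Business'),
                                   (12, 2, 'Student');
INSERT INTO uscities_demographic VALUES (100, 1, 'Race X', 500),
                                        (101, 2, 'Race X', 300);
""")

# Snowflake navigation: fact -> city dimension -> demographics dimension.
rows = conn.execute("""
SELECT c.city_name, COUNT(f.immigration_id) AS arrivals, d."count" AS race_count
FROM i94_immigration AS f
JOIN city AS c ON f.city_id = c.city_id
JOIN uscities_demographic AS d ON d.city_id = c.city_id
WHERE d.race = 'Race X'
GROUP BY c.city_name, d."count"
ORDER BY c.city_name
""").fetchall()
print(rows)  # [('CITY A', 2, 500), ('CITY B', 1, 300)]
conn.close()
```

The extra hop through `city` is the cost of normalization. In return, the demographic attributes are stored once per city rather than being repeated on every arrival.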

#### 3.2. Mapping Out Data Pipelines

0. Assume all data sets are stored in S3 buckets as below:
    * `[Source_S3_Bucket]/immigration/18-83510-I94-Data-2016/*.sas7bdat`
    * `[Source_S3_Bucket]/raw_data/I94_SAS_Labels_Descriptions.SAS`
    * `[Source_S3_Bucket]/raw_data/us-cities-demographics.csv`
1. Step 1: load the datasets
2. Step 2:
    * Parse the label description file to build the auxiliary lookup tables
    * Explore and clean the datasets
3. Step 4:
    * Transform the i94 immigration data and us cities demographics into 1 fact table and 5 dimension tables
    * Write these tables back to the target S3 bucket
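The first cell imports `configparser` but never uses it. One way to use it is to keep the source and target buckets from the layout above in a single config. In the sketch below, the section and key names, the bucket names, and the `s3a://` scheme (the Hadoop S3 connector Spark commonly uses) are all placeholders:

```python
import configparser

# Hypothetical config; section/key names and bucket values are placeholders.
CONFIG_TEXT = """
[S3]
SOURCE_BUCKET = s3a://my-source-bucket
TARGET_BUCKET = s3a://my-target-bucket
"""

config = configparser.ConfigParser()
config.read_string(CONFIG_TEXT)
source = config["S3"]["SOURCE_BUCKET"]
target = config["S3"]["TARGET_BUCKET"]

# Input locations from the layout listed above.
inputs = {
    "immigration": f"{source}/immigration/18-83510-I94-Data-2016/*.sas7bdat",
    "sas_labels": f"{source}/raw_data/I94_SAS_Labels_Descriptions.SAS",
    "demographics": f"{source}/raw_data/us-cities-demographics.csv",
}

# One output prefix per fact/dimension table.
tables = ["i94_immigration", "uscities_demographic", "country",
          "port", "city", "arrival_date"]
outputs = {table: f"{target}/{table}" for table in tables}

print(inputs["demographics"])  # s3a://my-source-bucket/raw_data/us-cities-demographics.csv
print(outputs["city"])         # s3a://my-target-bucket/city
```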

### Step 4: Run Pipelines to Model the Data 

#### 4.1. Create the data model
Build the data pipelines to create the data model.

##### **Create arrival date dimension table**

In [None]:
arrival_date_spark = i94_immigration_spark.select(["arrival_date"]).distinct().withColumnRenamed("arrival_date", "date")
arrival_date_spark = arrival_date_spark.withColumn("year", year("date"))
arrival_date_spark = arrival_date_spark.withColumn("month", month("date"))
arrival_date_spark = arrival_date_spark.withColumn("day", dayofmonth("date"))
arrival_date_spark = arrival_date_spark.withColumn("week", weekofyear("date"))
arrival_date_spark = arrival_date_spark.withColumn("day_of_week", dayofweek("date"))
arrival_date_spark = arrival_date_spark.withColumn("date_id", monotonically_increasing_id())
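The PySpark documentation describes `monotonically_increasing_id()` as guaranteed to be monotonically increasing and unique, but not consecutive. The current implementation puts the partition ID in the upper 31 bits and the record number within each partition in the lower 33 bits. A stdlib sketch of that layout (the helper and the partition sizes are illustrative):

```python
def spark_style_id(partition_index, row_in_partition):
    """Mimic the documented layout of monotonically_increasing_id():
    partition index in the upper 31 bits, row number in the lower 33 bits."""
    return (partition_index << 33) + row_in_partition


# Two partitions holding 3 and 2 rows respectively (illustrative sizes).
ids = [spark_style_id(p, r) for p, n in enumerate([3, 2]) for r in range(n)]
print(ids)  # [0, 1, 2, 8589934592, 8589934593]
```

This is why `date_id`, `country_id`, and the other generated IDs should be treated as opaque surrogate keys. They are unique within a run, but their values depend on how the data happens to be partitioned.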

In [None]:
arrival_date_spark.write.mode("overwrite").parquet(
    f"clean_data/arrival_date")

In [None]:
print("Number of partitions: ", arrival_date_spark.rdd.getNumPartitions())
print("Number of records: ", arrival_date_spark.rdd.count())
arrival_date_spark.printSchema()
arrival_date_spark.limit(5).toPandas()

##### **Create country dimension table**

In [None]:
i94cit_res["code"] = i94cit_res["code"].apply(lambda x: int(x))
dict_country = dict(sorted(i94cit_res.values.tolist()))

@udf()
def get_country_name(country_code):
    if dict_country.get(country_code) is not None:
        return dict_country.get(country_code)
    return None

country_spark = i94_immigration_spark.select(["country_code"]).distinct()
country_spark = country_spark.withColumn("country_name", get_country_name(country_spark["country_code"]))
country_spark = country_spark.withColumn("country_id", monotonically_increasing_id())

In [None]:
country_spark.write.mode("overwrite").parquet(
    f"clean_data/country")

In [None]:
print("Number of partitions: ", country_spark.rdd.getNumPartitions())
print("Number of records: ", country_spark.rdd.count())
country_spark.printSchema()
country_spark.limit(5).toPandas()

##### **Create port dimension table**

In [None]:
dict_port = dict(sorted(i94port.values.tolist()))

@udf()
def get_port_name(port_code):
    if dict_port.get(port_code) is not None:
        return dict_port.get(port_code)
    return None

port_spark = i94_immigration_spark.select(["port_code"]).distinct()
port_spark = port_spark.withColumn("port_name", get_port_name(port_spark["port_code"]))
port_spark = port_spark.withColumn("port_id", monotonically_increasing_id())

In [None]:
port_spark.write.mode("overwrite").parquet(
    f"clean_data/port")

In [None]:
print("Number of partitions: ", port_spark.rdd.getNumPartitions())
print("Number of records: ", port_spark.rdd.count())
port_spark.printSchema()
port_spark.limit(5).toPandas()

##### **Create city dimension table**

In [None]:
dict_city = dict(sorted(i94addr.values.tolist()))

@udf()
def get_city_name(city_code):
    if dict_city.get(city_code) is not None:
        return dict_city.get(city_code)
    return None

city_spark = i94_immigration_spark.select(["city_code"]).distinct()
city_spark = city_spark.withColumn("city_name", get_city_name(city_spark["city_code"])) \
    .withColumn("city_id", monotonically_increasing_id())

city_spark = city_spark.join(uscities_demographic_spark, (city_spark["city_name"] == uscities_demographic_spark["city_updated"]), "left") \
    .select("city_id", "city_code", "city_name",
           uscities_demographic_spark["state"].alias("city_state"),
           uscities_demographic_spark["state_code"].alias("city_state_code")) \
    .distinct()

In [None]:
city_spark.write.mode("overwrite").parquet(
    f"clean_data/city")

In [None]:
print("Number of partitions: ", city_spark.rdd.getNumPartitions())
print("Number of records: ", city_spark.rdd.count())
city_spark.printSchema()
city_spark.limit(5).toPandas()

##### **Create us cities demographics dimension table**

In [None]:
uscities_demographic_spark = uscities_demographic_spark.join(city_spark, (uscities_demographic_spark["city_updated"] == city_spark["city_name"]), "left") \
    .distinct() \
    .select("city_id", "median_age", "male_population",
        "female_population", "total_population", "number_of_veterans",
        "foreign_born", "average_household_size", "race", "count") \
    .withColumn("demographic_id", monotonically_increasing_id())

In [None]:
uscities_demographic_spark.write.mode("overwrite").parquet(
    f"clean_data/uscities_demographic")

In [None]:
print("Number of partitions: ", uscities_demographic_spark.rdd.getNumPartitions())
print("Number of records: ", uscities_demographic_spark.rdd.count())
uscities_demographic_spark.printSchema()
uscities_demographic_spark.limit(5).toPandas()

##### **Create i94 immigration fact table**

In [None]:
i94_immigration_spark = i94_immigration_spark.withColumn("immigration_id", monotonically_increasing_id())
i94_immigration_spark = i94_immigration_spark.join(country_spark, (i94_immigration_spark["country_code"] == country_spark["country_code"]), "left").distinct() \
    .join(port_spark, (i94_immigration_spark["port_code"] == port_spark["port_code"]), "left").distinct() \
    .join(city_spark, (i94_immigration_spark["city_code"] == city_spark["city_code"]), "left").distinct() \
    .select("immigration_id", "arrival_date", "departure_date", "year", 
            "month", "age", "birth_year", "country_id", "city_id", 
            "port_id", "mode", "visa_category") \
    .distinct()

In [None]:
i94_immigration_spark.write.mode("overwrite").parquet(
    f"clean_data/i94_immigration")

In [None]:
print("Number of partitions: ", i94_immigration_spark.rdd.getNumPartitions())
print("Number of records: ", i94_immigration_spark.rdd.count())
i94_immigration_spark.printSchema()
i94_immigration_spark.limit(5).toPandas()

#### 4.2. Data Quality Checks
The following data quality checks help ensure that the pipeline ran as expected:
 * Integrity constraints on the relational database (e.g., unique keys, data types)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/count checks to ensure completeness

The check below verifies that every fact and dimension table contains at least one record.
Run Quality Checks

In [None]:
def quality_check(df, df_name):
    """
    Check that a fact or dimension table is not empty
    params:
        - df: dataframe to check
        - df_name: name of the table, used in the printed message
    return: True if the check passed, False otherwise
    """
    num_records = df.count()
    if num_records == 0:
        print(f"Data quality check failed for {df_name} table with zero records")
        return False
    print(f"Data quality check passed for {df_name} table with {num_records} records")
    return True

In [None]:
df_dict = {
    "i94 immigration fact": i94_immigration_spark,
    "us cities demographics dimension": uscities_demographic_spark,
    "country dimension": country_spark,
    "port dimension": port_spark,
    "city dimension": city_spark,
    "arrival date dimension": arrival_date_spark
}
for df_name, df in df_dict.items():
    quality_check(df, df_name)
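The loop above only runs the count check. The unique-key integrity constraint can be sketched the same way. In Spark, it maps to `df.filter(F.col(key).isNull()).count() == 0` and to comparing `df.count()` with `df.select(key).distinct().count()`. The stdlib version below works on plain Python rows. The `check_table` helper and the sample rows are hypothetical:

```python
def check_table(rows, key, name):
    """Return a list of failure messages for one table (empty list = passed).

    Checks: at least one row (completeness), no null keys, no duplicate keys.
    """
    failures = []
    if not rows:
        failures.append(f"{name}: table is empty")
    keys = [row.get(key) for row in rows]
    if any(k is None for k in keys):
        failures.append(f"{name}: null values in {key}")
    non_null = [k for k in keys if k is not None]
    if len(non_null) != len(set(non_null)):
        failures.append(f"{name}: duplicate values in {key}")
    return failures


port_rows = [{"port_id": 0, "port_code": "NYC"}, {"port_id": 1, "port_code": "LOS"}]
bad_rows = [{"country_id": 0}, {"country_id": 0}, {"country_id": None}]
print(check_table(port_rows, "port_id", "port dimension"))  # []
print(check_table(bad_rows, "country_id", "country dimension"))
```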

#### 4.3. Data dictionary 
For each field, the data dictionary below gives its type, its nullability, and a brief description.

##### **i94 immigration fact table**
- immigration_id: long (nullable = false): Immigration ID
- city_id: long (nullable = true): City ID
- country_id: long (nullable = true): Country ID
- port_id: long (nullable = true): Port ID
- year: integer (nullable = true): Four-digit year
- month: integer (nullable = true): Numeric month
- age: integer (nullable = true): Age of respondent in years
- birth_year: integer (nullable = true): Four-digit year of birth
- mode: string (nullable = true): Mode of transportation
- visa_category: string (nullable = true): Visa codes collapsed into three categories
- arrival_date: date (nullable = true): Arrival date in the USA
- departure_date: date (nullable = true): Departure date from the USA

##### **us cities demographics dimension**
- demographic_id: long (nullable = false): Demographic ID
- city_id: long (nullable = true): City ID
- median_age: double (nullable = true): Median age of the population
- male_population: integer (nullable = true): Count of male population
- female_population: integer (nullable = true): Count of female population
- total_population: integer (nullable = true): Count of total population
- number_of_veterans: integer (nullable = true): Number of veterans
- foreign_born: integer (nullable = true): Number of foreign-born residents
- average_household_size: double (nullable = true): Average city household size
- race: string (nullable = true): Race group
- count: integer (nullable = true): Number of city residents in this race group

##### **city dimension table**
- city_id: long (nullable = false): City ID
- city_code: string (nullable = true): City code
- city_name: string (nullable = true): City name
- city_state: string (nullable = true): US state where city is located
- city_state_code: string (nullable = true): US state code where city is located

##### **port dimension table**
- port_id: long (nullable = false): Port of admission ID
- port_code: string (nullable = true): Port code
- port_name: string (nullable = true): Port name

##### **country dimension table**
- country_id: long (nullable = false): Country ID
- country_code: integer (nullable = true): Country code
- country_name: string (nullable = true): Country name

##### **arrival date dimension table**
- date_id: long (nullable = false): Date ID
- date: date (nullable = true): Arrival date in the USA
- year: integer (nullable = true): Arrival year
- month: integer (nullable = true): Arrival month
- day: integer (nullable = true): Arrival day of month
- week: integer (nullable = true): Arrival week of the year (ISO 8601 week number)
- day_of_week: integer (nullable = true): Arrival day of the week (1 = Sunday through 7 = Saturday)
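The `week` and `day_of_week` columns come from Spark's `weekofyear` and `dayofweek`. Their conventions differ from Python's `date.weekday()`. A stdlib sketch that reproduces both conventions for checking values by hand (the helper names are hypothetical):

```python
import datetime


def spark_dayofweek(d):
    """Match Spark's dayofweek(): 1 = Sunday, 2 = Monday, ..., 7 = Saturday.
    Python's isoweekday() uses 1 = Monday ... 7 = Sunday, hence the shift."""
    return d.isoweekday() % 7 + 1


def spark_weekofyear(d):
    """Match Spark's weekofyear(): the ISO 8601 week number."""
    return d.isocalendar()[1]


d = datetime.date(2016, 4, 1)  # a Friday
print(spark_dayofweek(d), spark_weekofyear(d))  # 6 13
```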

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
    * Apache Spark was chosen because it can process large amounts of data in various file formats (including SAS files, via the `spark-sas7bdat` package), offers a fast, unified analytics engine for big data, and has user-friendly APIs for working with large datasets.
* Propose how often the data should be updated and why.
    1. The tables created from the immigration data set should be updated monthly, as the raw data is collected monthly.
    2. The tables created from the demography data set can be updated annually, as demographic data takes time to collect, and frequent updates may be costly and could lead to incorrect conclusions.
    3. All tables should be updated in an append-only manner.
* Write a description of how you would approach the problem differently under the following scenarios:
    * The data was increased by 100x.
        * If a single-machine Spark setup cannot handle a data set 100 times larger, the processing could move to a distributed cluster in the cloud, such as Amazon EMR (Elastic MapReduce).
    * The data populates a dashboard that must be updated on a daily basis by 7am every day.
        * Apache Airflow can be used to schedule the ETL (extract, transform, load) pipeline so that it runs daily, refreshes the data, and generates the report before 7am. It integrates well with Python and AWS and can be combined with other tools for more advanced task automation.
    * The database needed to be accessed by 100+ people.
        * Amazon Redshift can support up to 500 connections. If the database will be accessed by more than 100 people, moving it to Redshift would help it handle the workload. A cost-benefit analysis should be conducted before adopting this cloud solution.