# City Immigration, Demographics and Temperatures
### Data Engineering Capstone Project

#### Project Summary
This project aims to answer questions about US immigration, such as: which cities are the most popular destinations, what is the gender distribution of immigrants, what is the visa type distribution, what is the average age of immigrants, and what is the average monthly temperature in each city. We extract data from three sources: the 2016 I94 immigration dataset, city temperature data from Kaggle, and US city demographic data from OpenDataSoft. We design four dimension tables (cities, immigrants, monthly average city temperature, and time) and one fact table (immigration). We use Spark for the ETL jobs and store the results in Parquet for downstream analysis.

In [None]:
# Do all imports and installs here
import pandas as pd
import os
import glob
import re
from datetime import datetime, timedelta
from pprint import pprint
from pyspark.sql import SparkSession
from pyspark.sql.functions import count, col, udf, year, month, avg, round, dayofweek, weekofyear, isnull
from pyspark.sql.types import StringType, IntegerType

In [None]:
# Create Spark session
spark = SparkSession.builder.\
config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11")\
.enableHiveSupport().getOrCreate()

### Step 1: Scope the Project and Gather Data

#### Scope 
The goal of this project is to pull data from three sources and create fact and dimension tables that support analysis of US immigration by city monthly average temperature, city demographics, and seasonality.

#### Describe and Gather Data 
1. **I94 Immigration Data**: comes from the U.S. National Tourism and Trade Office and contains various statistics on international visitor arrivals in the USA. The dataset covers 2016. [link](https://travel.trade.gov/research/reports/i94/historical/2016.html)
2. **World Temperature Data**: comes from Kaggle and contains average weather temperatures by city. [link](https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data)
3. **U.S. City Demographic Data**: comes from OpenDataSoft and contains demographic information about US cities, such as median age and male and female population. [link](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/)

In [None]:
# Read immigration data
# Assumption: only i94_apr16_sub.sas7bdat is used in this project.
# To process all of the available files, load i94_files instead.
i94_files = glob.glob("../../data/18-83510-I94-Data-2016/*.sas7bdat")
i94_fname = "../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat"
i94_df = spark.read.format("com.github.saurfang.sas.spark").load(i94_fname)

In [None]:
# Read temperature data
temperature_fname = "../../data2/GlobalLandTemperaturesByCity.csv"
temperature_df = spark.read.format("csv").option("delimiter", ",").option("header", "true").load(temperature_fname)

In [None]:
# Read demographics data
demo_fname = "us-cities-demographics.csv"
demo_df = spark.read.format("csv").option("delimiter", ";").option("header", "true").load(demo_fname)

### Step 2: Explore and Assess the Data
##### Data Exploration & Cleaning

#### Immigration data

In [None]:
# Explore immigration data
i94_df.count()

In [None]:
i94_df.limit(5).toPandas()

In [None]:
# Create list of valid ports
i94_sas_label_descriptions_fname = "I94_SAS_Labels_Descriptions.SAS"
with open(i94_sas_label_descriptions_fname) as f:
    lines = f.readlines()

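# Each label line looks like 'CODE' = 'description'; skip lines that do not match
re_compiled = re.compile(r"\'(.*)\'.*\'(.*)\'")
valid_ports = {}
for line in lines[302:961]:
    results = re_compiled.search(line)
    if results:
        # Strip the padding whitespace inside the quoted description
        valid_ports[results.group(1)] = results.group(2).strip()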
print(len(valid_ports))
#pprint(valid_ports)
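
The port parsing above can be tried in isolation without the label file. The following is a minimal sketch, assuming each label line has the form `'CODE' = 'description'` with padding inside the quotes; the sample lines and the helper name `parse_ports` are hypothetical and not part of the original notebook.

```python
import re

# Hypothetical sample lines in the assumed 'CODE' = 'description' layout
sample_lines = [
    "   'ALC'\t=\t'ALCAN, AK             '\n",
    "   'ANC'\t=\t'ANCHORAGE, AK         '\n",
    "   ;\n",  # a line without a code/description pair
]

# One quoted code, an equals sign, one quoted description
PORT_LINE = re.compile(r"'([^']*)'\s*=\s*'([^']*)'")


def parse_ports(lines):
    """Return {port_code: description}, skipping lines that do not match."""
    ports = {}
    for line in lines:
        match = PORT_LINE.search(line)
        if match:
            ports[match.group(1).strip()] = match.group(2).strip()
    return ports


ports = parse_ports(sample_lines)
print(ports)
```

Skipping non-matching lines means the parser does not depend on an exact line range in the label file, at the cost of silently ignoring malformed entries.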

In [None]:
# Create list of valid states
valid_states = demo_df.toPandas()["State Code"].unique()
print(len(valid_states))
print(valid_states)

In [None]:
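# Create udf to convert a SAS date (days since 1960-01-01) to an ISO date string
@udf(StringType())
def convert_datetime(x):
    # Check against None so that a day count of 0 (the epoch itself) is still converted
    if x is not None:
        return (datetime(1960, 1, 1).date() + timedelta(days=x)).isoformat()
    return None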
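
SAS stores dates as the number of days since 1960-01-01, which is what the conversion above relies on. As a quick sanity check outside Spark, the same arithmetic can be run in plain Python; the helper name `sas_to_iso` is hypothetical and used only for illustration.

```python
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)


def sas_to_iso(days):
    """Convert a SAS day count to an ISO date string; None stays None."""
    if days is None:
        return None
    return (SAS_EPOCH + timedelta(days=days)).isoformat()


print(sas_to_iso(20545.0))  # 2016-04-01
print(sas_to_iso(0))        # 1960-01-01
print(sas_to_iso(None))     # None
```

A day count of 0 is a valid date (the epoch itself), so the sketch tests for `None` rather than truthiness.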

In [None]:
# Create udf to validate state
@udf(StringType())
def validate_state(x):  
    if x in valid_states:
        return x
    return 'other'

In [None]:
# Clean immigration data

# Drop rows with a missing port, address state or gender
cleaned_i94_df = i94_df.dropna(how="any", subset=["i94port", "i94addr", "gender"])

# Extract valid states 
cleaned_i94_df = cleaned_i94_df.withColumn("i94addr", validate_state(cleaned_i94_df.i94addr))

# Convert arrival_date (SAS format) to PySpark format
cleaned_i94_df = cleaned_i94_df.withColumn("arrdate", convert_datetime(cleaned_i94_df.arrdate))

# Only keep records whose address state is a valid US state
cleaned_i94_df = cleaned_i94_df.filter(cleaned_i94_df.i94addr != 'other')

staging_i94_df = cleaned_i94_df.select(col("cicid").alias("id"), 
                                       col("arrdate").alias("date"),
                                       col("i94port").alias("city_code"),
                                       col("i94addr").alias("state_code"),
                                       col("i94bir").alias("age"),
                                       col("gender").alias("gender"),
                                       col("i94visa").alias("visa_type"),
                                       "count").drop_duplicates()

staging_i94_df.limit(5).toPandas()

In [None]:
staging_i94_df.printSchema()

#### Temperature data

In [None]:
temperature_df.count()

In [None]:
temperature_df.limit(3).toPandas()

In [None]:
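# Create udf to map city full name to city port abbreviation
# Returns None when the city is missing or no port description contains it

@udf(StringType())
def city_to_port(city):
    if city is None:
        return None
    for key in valid_ports:
        if city.lower() in valid_ports[key].lower():
            return key
    return None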
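
Substring matching returns the first port whose description contains the city name, so a short name can match an unrelated longer one. The sketch below compares that behaviour with a stricter variant that matches only the city part before the comma. The port codes `AAA` and `BBB` and both function names are hypothetical, and the stricter rule is an alternative to consider, not what the notebook uses.

```python
def city_to_port_substring(city, ports):
    """Mirror of the notebook's rule: first description containing the city."""
    if not city:
        return None
    for code, description in ports.items():
        if city.lower() in description.lower():
            return code
    return None


def city_to_port_exact(city, ports):
    """Stricter alternative: compare against the text before the first comma."""
    if not city:
        return None
    wanted = city.strip().lower()
    for code, description in ports.items():
        port_city = description.split(",")[0].strip().lower()
        if port_city == wanted:
            return code
    return None


# Hypothetical port codes, for illustration only
sample_ports = {"AAA": "NEW YORK, NY", "BBB": "YORK, PA"}

print(city_to_port_substring("York", sample_ports))  # AAA (matches "NEW YORK")
print(city_to_port_exact("York", sample_ports))      # BBB
```

The exact variant trades recall for precision: descriptions that combine several place names in one entry would no longer match.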

In [None]:
# Clean temperature data

# Only use temperatures from United States
# Map full name to city port abbreviation
# Remove invalid ports
cleaned_temp_df = temperature_df.filter(temperature_df["Country"] == "United States") \
    .withColumn("year", year(temperature_df['dt'])) \
    .withColumn("month", month(temperature_df["dt"])) \
    .withColumn("i94port", city_to_port(temperature_df["City"])) \
    .withColumn("AverageTemperature", col("AverageTemperature").cast("float")) \
    .dropna(how='any', subset=["i94port"])

# Only use temperatures from 2013 (the latest year in the dataset)
cleaned_temp_df = cleaned_temp_df.filter(cleaned_temp_df["year"] == 2013)

staging_temp_df = cleaned_temp_df.select(col("year"), col("month"), col("i94port").alias("city_code"),
                                         round(col("AverageTemperature"), 1).alias("avg_temperature"),
                                         col("Latitude").alias("lat"), col("Longitude").alias("long")).drop_duplicates()

print(staging_temp_df.count())
staging_temp_df.limit(5).toPandas()

In [None]:
staging_temp_df.printSchema()

#### Demographics Data

In [None]:
demo_df.count()

In [None]:
demo_df.limit(5).toPandas()

In [None]:
# Clean demographics data

# Calculate percentages of numeric columns and create new ones
cleaned_demo_df = demo_df.withColumn("median_age", demo_df['Median Age']) \
    .withColumn("pct_male_pop", (demo_df['Male Population'] / demo_df['Total Population']) * 100) \
    .withColumn("pct_female_pop", (demo_df['Female Population'] / demo_df['Total Population']) * 100) \
    .withColumn("pct_veterans", (demo_df['Number of Veterans'] / demo_df['Total Population']) * 100) \
    .withColumn("pct_foreign_born", (demo_df['Foreign-born'] / demo_df['Total Population']) * 100) \
    .withColumn("pct_race", (demo_df['Count'] / demo_df['Total Population']) * 100) \
    .withColumn("city_code", city_to_port(demo_df["City"])) \
    .dropna(how='any', subset=["city_code"])

cleaned_demo_df = cleaned_demo_df.select(col("City").alias("city_name"), col("State Code").alias("state_code"), 
                         "median_age", "pct_male_pop", "pct_female_pop","pct_veterans", 
                         "pct_foreign_born", col("Total Population").alias("total_pop"), 
                         col("Race").alias("race"), "pct_race").drop_duplicates()

cleaned_demo_df.count()

In [None]:
# Pivot the race column
pivot_demo_df = cleaned_demo_df.groupBy("city_name", "state_code", "median_age", "pct_male_pop",
                                        "pct_female_pop","pct_veterans", "pct_foreign_born", "total_pop").pivot("race").avg("pct_race")

pivot_demo_df = pivot_demo_df.withColumn("city_code", city_to_port(pivot_demo_df["city_name"])) \
    .dropna(how='any', subset=["city_code"])

staging_demo_df = pivot_demo_df.select("city_code", "state_code", "city_name", "median_age",
                                    round(col("pct_male_pop"), 1).alias("pct_male_pop"),
                                    round(col("pct_female_pop"), 1).alias("pct_female_pop"),
                                    round(col("pct_veterans"), 1).alias("pct_veterans"),
                                    round(col("pct_foreign_born"), 1).alias("pct_foreign_born"),
                                    round(col("American Indian and Alaska Native"), 1).alias("pct_native_american"),
                                    round(col("Asian"), 1).alias("pct_asian"),
                                    round(col("Black or African-American"), 1).alias("pct_black"),
                                    round(col("Hispanic or Latino"), 1).alias("pct_hispanic_or_latino"),
                                    round(col("White"), 1).alias("pct_white"), "total_pop")
print(staging_demo_df.count())
staging_demo_df.limit(10).toPandas()

In [None]:
staging_demo_df.printSchema()

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
The star schema is chosen as the data model because it is simple yet effective. Users can write simple queries that join the fact and dimension tables to analyze the data.

Here are the tables of the schema:

##### Staging Tables
```
staging_i94_df
    id
    date
    city_code
    state_code
    age
    gender
    visa_type
    count
    
staging_temp_df
    year
    month
    city_code
    avg_temperature
    lat
    long
    
staging_demo_df
    city_code
    state_code
    city_name
    median_age
    pct_male_pop
    pct_female_pop
    pct_veterans
    pct_foreign_born
    pct_native_american
    pct_asian
    pct_black
    pct_hispanic_or_latino
    pct_white
    total_pop
```

##### Dimension Tables
```
immigrant_df
    id
    gender
    age
    visa_type
    
city_df
    city_code
    state_code
    city_name
    median_age
    pct_male_pop
    pct_female_pop
    pct_veterans
    pct_foreign_born
    pct_native_american
    pct_asian
    pct_black
    pct_hispanic_or_latino
    pct_white
    total_pop
    lat
    long
    
monthly_city_temp_df
    city_code
    year
    month
    avg_temperature
    
time_df
    date
    dayofweek
    weekofyear
    month
```

##### Fact Table
```
immigration_df
    id
    state_code
    city_code
    date
    count
```
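
With the fact and dimension tables above, a typical analysis joins the fact table to one or more dimensions and aggregates. The sketch below illustrates the query shape using Python's built-in `sqlite3` as a stand-in for the Parquet tables. The rows, city codes and city names are made up for illustration, and only a subset of the columns is included.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE immigration (id INTEGER, state_code TEXT, city_code TEXT, date TEXT, "count" REAL);
CREATE TABLE immigrant (id INTEGER, gender TEXT, age REAL, visa_type REAL);
CREATE TABLE city (city_code TEXT, city_name TEXT);
""")

# Made-up rows, for illustration only
conn.executemany("INSERT INTO immigration VALUES (?, ?, ?, ?, ?)", [
    (1, "NY", "AAA", "2016-04-01", 1.0),
    (2, "NY", "AAA", "2016-04-02", 1.0),
    (3, "CA", "BBB", "2016-04-01", 1.0),
])
conn.executemany("INSERT INTO immigrant VALUES (?, ?, ?, ?)", [
    (1, "F", 30.0, 2.0),
    (2, "M", 40.0, 1.0),
    (3, "F", 50.0, 2.0),
])
conn.executemany("INSERT INTO city VALUES (?, ?)", [
    ("AAA", "City A"),
    ("BBB", "City B"),
])

# Arrivals and average immigrant age per city: one fact table, two dimensions
query = """
SELECT c.city_name, SUM(f."count") AS arrivals, AVG(i.age) AS avg_age
FROM immigration f
JOIN immigrant i ON i.id = f.id
JOIN city c ON c.city_code = f.city_code
GROUP BY c.city_name
ORDER BY arrivals DESC
"""
rows = conn.execute(query).fetchall()
print(rows)
```

The same query shape should carry over to Spark SQL once the Parquet outputs are registered as tables or temporary views.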

#### 3.2 Mapping Out Data Pipelines
Steps necessary to pipeline the data into the chosen data model:

1. Clean the data: handle nulls, fix data types, remove duplicates, etc.
2. Load the staging tables staging_i94_df, staging_temp_df and staging_demo_df
3. Create the dimension tables immigrant_df, city_df, monthly_city_temp_df and time_df
4. Create the fact table immigration_df with the immigration count, linking id to immigrant_df, city_code to city_df and monthly_city_temp_df, and date to time_df to ensure referential integrity
5. Save the processed dimension and fact tables in Parquet for downstream queries

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [None]:
# Create dimension table for immigrant

immigrant_df = staging_i94_df.select("id", "gender", "age", "visa_type").drop_duplicates()

In [None]:
immigrant_df.count()

In [None]:
immigrant_df.limit(5).toPandas()

In [None]:
# Create dimension table for city

city_df = staging_demo_df.join(staging_temp_df, "city_code") \
    .select("city_code", "state_code", "city_name", "median_age", "pct_male_pop", "pct_female_pop", "pct_veterans",
           "pct_foreign_born", "pct_native_american", "pct_asian", "pct_black",
           "pct_hispanic_or_latino", "pct_white", "total_pop", "lat", "long").drop_duplicates()

In [None]:
city_df.count()

In [None]:
city_df.limit(5).toPandas()

In [None]:
# Create dimension table for monthly city temperature

monthly_city_temp_df = staging_temp_df.select("city_code", "year", "month", "avg_temperature").drop_duplicates()

In [None]:
monthly_city_temp_df.count()

In [None]:
monthly_city_temp_df.limit(5).toPandas()

In [None]:
# Create dimension table for time

time_df = staging_i94_df.withColumn("dayofweek", dayofweek("date"))\
                .withColumn("weekofyear", weekofyear("date"))\
                .withColumn("month", month("date"))
                        
time_df = time_df.select("date", "dayofweek", "weekofyear", "month").drop_duplicates()

In [None]:
time_df.count()

In [None]:
time_df.limit(5).toPandas()

In [None]:
# Create fact table for immigration

immigration_df = staging_i94_df.select("id", "state_code", "city_code", "date", "count").drop_duplicates()

In [None]:
immigration_df.count()

In [None]:
immigration_df.limit(5).toPandas()

In [None]:
# Write dimension tables to parquet
immigrant_df.write.mode("overwrite").partitionBy("gender", "age").parquet("immigrants")
city_df.write.mode("overwrite").partitionBy("state_code").parquet("cities")
monthly_city_temp_df.write.mode("overwrite").parquet("monthly_city_temperatures")
time_df.write.mode("overwrite").parquet("time")

# Write fact table to parquet
immigration_df.write.mode("overwrite").partitionBy("state_code", "city_code").parquet("immigration")

#### 4.2 Data Quality Checks
 
Run Quality Checks

In [None]:
# Perform quality checks here

def table_exists(df):
    return df is not None

tables = [immigrant_df, city_df, monthly_city_temp_df, time_df, immigration_df]
if all(table_exists(df) for df in tables):
    print("data quality check passed")
    print("dimension tables and fact table exist")
    print()
else:
    print("data quality check failed")
    print("table missing...")

In [None]:
def table_not_empty(df):
    return df.count() != 0

tables = [immigrant_df, city_df, monthly_city_temp_df, time_df, immigration_df]
if all(table_not_empty(df) for df in tables):
    print("data quality check passed!")
    print("dimension tables and fact table contain records")
    print()
else:
    print("data quality check failed!")
    print("at least one table is empty...")
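
The checks above confirm that the tables exist and contain rows. A further check worth considering is referential integrity: every key in the fact table should also appear in the matching dimension table. The sketch below shows the idea on plain Python lists; the helper name `find_orphans` and the sample codes are hypothetical. In Spark, the equivalent check would be a `left_anti` join of `immigration_df` against a dimension table on the shared key.

```python
def find_orphans(fact_keys, dim_keys):
    """Return the fact-table keys that have no match in the dimension table."""
    return sorted(set(fact_keys) - set(dim_keys))


# Hypothetical city codes, for illustration only
fact_city_codes = ["AAA", "BBB", "AAA", "CCC"]
dim_city_codes = ["AAA", "BBB"]

orphans = find_orphans(fact_city_codes, dim_city_codes)
print(orphans)  # ['CCC']

if orphans:
    print("data quality check failed: fact keys missing from dimension")
else:
    print("data quality check passed: all fact keys found in dimension")
```

An empty result means every fact row can be joined to its dimension row. A non-empty result lists the keys that an inner join would silently drop.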

#### 4.3 Data dictionary 
Brief description of what the data is and where it came from.

##### Dimension Tables
```
immigrant_df
    id: id of immigrant
    gender: gender of immigrant
    age: age of immigrant
    visa_type: immigrant's visa type
    
city_df
    city_code: city port code
    state_code: state code of the city
    city_name: name of the city
    median_age: median age of the city
    pct_male_pop: city's male population in percentage
    pct_female_pop: city's female population in percentage
    pct_veterans: city's veteran population in percentage
    pct_foreign_born: city's foreign born population in percentage
    pct_native_american: city's native american population in percentage
    pct_asian: city's asian population in percentage
    pct_black: city's black population in percentage
    pct_hispanic_or_latino: city's hispanic or latino population in percentage
    pct_white: city's white population in percentage
    total_pop: city's total population
    lat: latitude of the city
    long: longitude of the city
    
monthly_city_temp_df
    city_code: city port code
    year: year
    month: month 
    avg_temperature: average temperature in city for given month
    
time_df
    date: date
    dayofweek: day of the week
    weekofyear: week of year
    month: month
```

##### Fact Table
```
immigration_df
    id: id
    state_code: state code of arrival city
    city_code: city port code of arrival city
    date: date of arrival
    count: count of immigrant's entries into the US
```

### Step 5: Complete Project Write Up
Spark is chosen for this project because it processes large amounts of data quickly (with in-memory compute), scales easily with additional worker nodes, ingests different data formats (e.g. SAS, Parquet, CSV), and integrates well with cloud storage such as S3 and warehouses such as Redshift.

The data update cycle is typically chosen based on two criteria: the reporting cycle, and the availability of new data to feed into the system. For example, if a new batch of average temperatures becomes available every month, a monthly refresh cycle is a reasonable choice.

The following scenarios would require scaling the existing solution.
* **If the data was increased by 100x:**
We can spin up larger EC2 instances to host Spark and/or add Spark worker nodes. The added capacity from vertical or horizontal scaling should reduce processing time.

* **If the data populates a dashboard that must be updated on a daily basis by 7am every day:**
We can use Airflow to schedule and automate the data pipeline jobs. Its built-in retry and monitoring mechanisms help us meet this requirement.

* **If the database needed to be accessed by 100+ people:**
We can host the solution in a production-scale cloud data warehouse, which has the capacity to serve more users and workload management to share resources fairly among them.