# US immigration analysis
### Data Engineering Capstone Project

#### Project Summary
Starting from a few different data sources I want to create a database where you can answers question about US immigration statistics.
I will use I94 immigration dataset, a tataset containing temperature data from Kaggle and a US city demographic dataset from OpenSoft.
I will import all data into Spark, clean it and prepare it for the destination database where you as a Data Analyst could answer your questions.

The project follows the follow steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
!pip install -q pyjanitor

In [2]:
# Do all imports and installs here
import pandas as pd
import os
from pyspark.sql import SparkSession, GroupedData
from pyspark.sql.functions import udf, col, year, month, dayofmonth
import pyspark.sql.functions as F
from pyspark.sql.types import StringType, IntegerType, DateType, DoubleType
import datetime
import janitor.spark

import i94_labels

In [3]:
# Build spark session
spark = SparkSession.builder \
.config("spark.jars.packages","saurfang:spark-sas7bdat:2.1.0-s_2.11,org.postgresql:postgresql:9.4.1209") \
.enableHiveSupport() \
.getOrCreate()

In [4]:
# Prepare destination database params
# For testing purposes I will use a local postgress database

pg_mode = "overwrite"
pg_url = "jdbc:postgresql://postgres:5432/airflow"
pg_properties = {"user": "airflow","password": "airflow","driver": "org.postgresql.Driver"}

### Step 1: Scope the Project and Gather Data

#### Scope 
As a data engineer I need to prepare a database for our companies data analysts.
They have a series of questions related to US immigration statistics.
I will start by exploring the datasources, identify the columns, clean the data.
Then I will structure the staging tables into our Fact and Dimmensions tables so the data is more accessible to business users questions.
I will use Spark for this process and store the data into a Postgress database.

#### Describe and Gather Data 
- **I94 Immigration Data**: This data comes from the US National Tourism and Trade Office. A data dictionary is included in the workspace. This is where the data comes from. There's a sample file so you can take a look at the data in csv format before reading it all in. You do not have to use the entire dataset, just use what you need to accomplish the goal you set at the beginning of the project.
- **World Temperature Data**: This dataset came from Kaggle. You can read more about it here.
- **U.S. City Demographic Data**: This data comes from OpenSoft. You can read more about it here.

In [111]:
data_folder = 'data'
demographics_path = os.path.join(data_folder, "us-cities-demographics.csv")
airport_codes_path = os.path.join(data_folder, "airport-codes_csv.csv")
world_temperature_path = os.path.join(data_folder, "GlobalLandTemperaturesByState.csv")

immigration_path = 'sas_data'
valid_ports_codes = [x.upper() for x in list(i94_labels.i94prtl.keys())]
valid_ports = i94_labels.i94prtl
valid_states = i94_labels.i94addrl

Read data from sources

In [6]:
demographics = spark.read.format("csv") \
    .option("header", "true") \
    .option("delimiter", ";") \
    .load(demographics_path)

In [7]:
world_temperatures = spark.read.format("csv") \
    .option("header", "true") \
    .load(world_temperature_path)

In [8]:
# get data from raw source or parsed parquet format
if os.path.exists(immigration_path):
    immigration_data = spark.read.parquet(immigration_path)
else:
    immigration_data = spark.read.format('com.github.saurfang.sas.spark') \
        .load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')
    immigration_data.write.parquet("sas_data")

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

In [9]:
# Performing cleaning tasks here
print('Demographic count: %s' % demographics.count())
print('World Temperatures count: %s' % world_temperatures.count())
print('Immigration Data count: %s' % immigration_data.count())

Demographic count: 2891
World Temperatures count: 645675
Immigration Data count: 2421760


In [10]:
#Build a valid state df
valid_state_codes = list(demographics.select("State Code").distinct().toPandas()['State Code'])
print('There are %s valid states.' % len(valid_state_codes))

There are 49 valid states.


In [11]:
print('There are %s ports.' % len(valid_ports_codes))

There are 660 ports.


### Issues
- must drop rows without: port, state_code and gender
- states codes must be validated
- drop rows from other countries than us
- dates are in SAS format

#### Cleaning Steps
Document steps necessary to clean the data

In [12]:
# Convert SAS date to isoformat
# SAS time starts from January 1st 1960
def sas_to_isoformat(sas_time):
    if sas_time:
        return (datetime.datetime(1960, 1, 1).date() + datetime.timedelta(int(sas_time))).isoformat()
    return None

sas_to_isoformat_udf = udf(sas_to_isoformat, StringType())

In [13]:
# Convert mdy date to isoformat
def mdy_to_isoformat(mdy_date):
    if mdy_date and len(mdy_date) == 8:
        return (datetime.datetime.strptime(mdy_date, "%m%d%Y").date()).isoformat()
    return None

mdy_to_isoformat_udf = udf(mdy_to_isoformat, StringType())

In [14]:
immigration_data.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

#### Clean immigration data

In [15]:
# drop 
clean_immigration_data = immigration_data.dropna(how="any", subset=["i94port", "i94addr", "gender"])

# check states
clean_immigration_data = clean_immigration_data.filter(~clean_immigration_data.i94addr.isin(valid_state_codes))

# convert sas dates
clean_immigration_data = clean_immigration_data.withColumn("arrdate", sas_to_isoformat_udf(clean_immigration_data.arrdate).cast(DateType()))
clean_immigration_data = clean_immigration_data.withColumn("depdate", sas_to_isoformat_udf(clean_immigration_data.depdate).cast(DateType()))
clean_immigration_data = clean_immigration_data.withColumn("dtaddto", mdy_to_isoformat_udf(clean_immigration_data.dtaddto).cast(DateType()))

# remote other states
clean_immigration_data = clean_immigration_data.filter(clean_immigration_data.i94addr != 'other')

# rename fields
staging_immigration_data = clean_immigration_data.select(
        col("cicid").alias("id").cast(IntegerType()), 
        col("i94yr").alias("year").cast(IntegerType()),
        col("i94mon").alias("month").cast(IntegerType()),
        col("arrdate").alias("arrival_date"),
        col("depdate").alias("depart_date"),
        col("dtaddto").alias("allowed_date"),
        col("i94port").alias("city_code"),
        col("i94addr").alias("state_code"),
        col("i94bir").alias("age").cast(IntegerType()),
        col("gender").alias("gender"),
        col("i94visa").alias("visa_type").cast(IntegerType())
    ).drop_duplicates()

Checkout immigration sample interval

In [16]:
staging_immigration_data.agg(F.min("arrival_date"), F.max("arrival_date")).toPandas()

Unnamed: 0,min(arrival_date),max(arrival_date)
0,2016-04-03,2016-04-28


#### Clean demographics data

In [17]:
demographics.limit(5).toPandas()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601,41862,82463,1562,30908,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129,49500,93629,4147,32935,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040,46799,84839,4819,8229,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127,87105,175232,5821,33878,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040,143873,281913,5829,86253,2.73,NJ,White,76402


In [112]:
# Convert column names to lower & underscore
clean_demographics = demographics.clean_names()

In [113]:
def city_to_port_code(city):
    for key, city_name in valid_ports.items():
        if city.upper() in city_name.upper():
            return key

city_to_port_code_udf = udf(city_to_port_code, StringType())

In [114]:
clean_demographics = clean_demographics.withColumn("city_code", city_to_port_code_udf(clean_demographics.city))

In [115]:
clean_demographics = clean_demographics.dropna(how='any', subset=["city_code", "state", "race"])

In [116]:
clean_demographics = clean_demographics \
    .withColumn("median_age",col("median_age").cast(DoubleType())) \
    .withColumn("male_population",col("male_population").cast(IntegerType())) \
    .withColumn("female_population",col("female_population").cast(IntegerType())) \
    .withColumn("total_population",col("total_population").cast(IntegerType())) \
    .withColumn("number_of_veterans",col("number_of_veterans").cast(IntegerType())) \
    .withColumn("foreign_born",col("foreign_born").cast(IntegerType())) \
    .withColumn("average_household_size",col("average_household_size").cast(DoubleType())) \
    .withColumn("count",col("count").cast(IntegerType()))

In [118]:
clean_demographics.limit(10).toPandas()

Unnamed: 0,city,state,median_age,male_population,female_population,total_population,number_of_veterans,foreign_born,average_household_size,state_code,race,count,city_code
0,Newark,New Jersey,34.6,138040,143873,281913,5829,86253,2.73,NJ,White,76402,NEW
1,Peoria,Illinois,33.1,56229,62432,118661,6634,7517,2.4,IL,American Indian and Alaska Native,1343,PIA
2,Philadelphia,Pennsylvania,34.1,741270,826172,1567442,61995,205339,2.61,PA,Asian,122721,PHI
3,Fort Myers,Florida,37.3,36850,37165,74015,4312,15365,2.45,FL,White,50169,FMY
4,Laredo,Texas,28.8,124305,131484,255789,4921,68427,3.66,TX,American Indian and Alaska Native,1253,LCB
5,Allen,Pennsylvania,33.5,60626,59581,120207,5691,19652,2.67,PA,Black or African-American,22304,MCA
6,New Haven,Connecticut,29.9,63765,66545,130310,2567,25871,2.48,CT,American Indian and Alaska Native,2205,NWH
7,Salt Lake City,Utah,32.1,98364,94296,192660,6829,32166,2.38,UT,Asian,13153,SLC
8,Suffolk,Virginia,38.2,43048,45113,88161,10114,2829,2.72,VA,Black or African-American,39107,FOK
9,Los Angeles,California,35.0,1958998,2012898,3971896,85417,1485425,2.86,CA,White,2177650,LOS


In [119]:
clean_demographics.count()

883

#### Clean temperature data

- Temperatures are not per day are actually averages per month.
- We don't have Immigran's origin country so we will keep only temperatures from the US
- We will remove any row without state or average_temperature
- We will keep only more recent data because we don't have older immigration data

In [24]:
world_temperatures.limit(5).toPandas()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,State,Country
0,1855-05-01,25.544,1.171,Acre,Brazil
1,1855-06-01,24.228,1.103,Acre,Brazil
2,1855-07-01,24.371,1.044,Acre,Brazil
3,1855-08-01,25.427,1.073,Acre,Brazil
4,1855-09-01,25.675,1.014,Acre,Brazil


In [25]:
def get_state_code(state):
    for key, s in valid_states.items():
        print(state)
        if state.upper() == s.upper():
            return key
    return None

get_state_code_udf = udf(get_state_code, StringType())

In [26]:
clean_world_temperatures = world_temperatures.clean_names(case_type='snake')

Check world temperatures sample interval. 
My data sample doesn't have overlaping intervals

In [27]:
clean_world_temperatures.agg(F.min("dt"), F.max("dt")).toPandas()

Unnamed: 0,min(dt),max(dt)
0,1743-11-01,2013-09-01


In [28]:
clean_temperatures = clean_world_temperatures.filter(
        (clean_world_temperatures.country == 'United States') &
        (clean_world_temperatures.dt > '2000-01-01')
    )

In [29]:
clean_temperatures = clean_temperatures \
    .withColumn('year', year(clean_world_temperatures.dt).cast(IntegerType())) \
    .withColumn('month', month(clean_world_temperatures.dt).cast(IntegerType()))

In [30]:
clean_temperatures = clean_temperatures.withColumn("state_code", get_state_code_udf(clean_world_temperatures["state"]))

In [31]:
clean_temperatures = clean_temperatures.dropna(how='any', subset=["state_code", "average_temperature"]).drop_duplicates()

In [32]:
clean_temperatures = clean_temperatures.select(
        col("state"), 
        col("state_code"),
        col("year"), 
        col("month"),
        col("average_temperature").cast(DoubleType()),
        col("average_temperature_uncertainty").cast(DoubleType()),
    )

In [33]:
clean_temperatures.count()

7050

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model

#### Staging tables

https://dbdiagram.io/d/5ffafa2580d742080a35b116

![Staging Tables](media/staging.png "Staging Tables")

#### Dimension and Facts tables

https://dbdiagram.io/d/5ffae4d980d742080a35af8d

![Star Schema](media/star_schema.png "Star Schema.")

#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model
- Made a cleanup for each Staging tables
- Will select all fields needed for Dimensions tables
- Will select all fields needed for the Facts tables
- Drop duplicates

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [126]:
immigrants_df = staging_immigration_data.select("id", "gender", "age").drop_duplicates()

In [127]:
immigrants_df.count()

31226

In [120]:
cities_df = clean_demographics.select(
        "city_code", "state_code", "city", "state", "median_age",
        "male_population", "female_population", "total_population", 
        "number_of_veterans", "foreign_born", "average_household_size", "race"
    ).drop_duplicates()

In [121]:
cities_df.count()

883

In [38]:
visits_df = staging_immigration_data.select("id", "arrival_date", "depart_date", "allowed_date", "state_code", "city_code", "visa_type").drop_duplicates()

In [39]:
visits_df.count()

31226

In [40]:
temperatures_df = clean_temperatures

In [41]:
temperatures_df.count()

7050

Write all data to a Postgress database

In [42]:
immigrants_df.write.mode("overwrite").jdbc(url=pg_url, table="immigrants", mode=pg_mode, properties=pg_properties)

In [43]:
cities_df.write.mode("overwrite").jdbc(url=pg_url, table="cities", mode=pg_mode, properties=pg_properties)

In [44]:
visits_df.write.mode("overwrite").jdbc(url=pg_url, table="visits", mode=pg_mode, properties=pg_properties)

In [45]:
temperatures_df.write.mode("overwrite").jdbc(url=pg_url, table="temperatures", mode=pg_mode, properties=pg_properties)

DataFrame[id: int, gender: string, age: int]

#### 4.2 Data Quality Checks
I will check the saved integrity
- Dataframe counts should be the same as what is saved in the database
- Some of the fields should be unique

Run Quality Checks

In [54]:
# Perform quality checks here
tables = ['immigrants', 'cities', 'visits', 'temperatures']

for table in tables:
    c = spark.read.jdbc(url=pg_url, table="temperatures", properties=pg_properties).count()
    if c == 0:
        print('Table %s is empty' % table)

In [76]:
# Check same counts between dataframes and database
assert( immigrants_df.count() == spark.read.jdbc(url=pg_url, table="immigrants", properties=pg_properties).count() )
assert( cities_df.count() == spark.read.jdbc(url=pg_url, table="cities", properties=pg_properties).count() )
assert( visits_df.count() == spark.read.jdbc(url=pg_url, table="visits", properties=pg_properties).count() )
assert( temperatures_df.count() == spark.read.jdbc(url=pg_url, table="temperatures", properties=pg_properties).count() )

In [96]:
assert( immigrants_df.select(F.countDistinct("id").alias('distinct_ids')).toPandas().distinct_ids[0] == immigrants_df.count() )

#### 4.3 Data dictionary 
- Immigrant
    - id
    - age
    - gender
    
- Cities
    - city_code
    - state_code
    - city_name
    - median_age
    - male_population
    - female_population
    - total_pop
    - number_of_veterans
    - foreign_born
    - average_household_size
    - race

- Temperatures
    - state
    - state_code
    - year
    - month
    - avg_temperature
    - average_temperature_uncertainty

- Visits - Facts
    - id
    - year
    - month
    - origin
    - state_code
    - city_code
    - date

#### Step 5: Complete Project Write Up
* For this demo I choose Spark because we can use the same interface to read formats, write to many different formats or databases. Also Spark can handle large amounts of data so when our datasets will grow we can use the same tool to do the job.
* The current example is made to be ran once and will overwrite the existing database.

* Other usecases:
 * The data was increased by 100x.
     - Spark will be able to handle this data but we will have to connect to a proper cluster, now it is used in local mode.
     - The output database (Postgress) will probably not work, but we can use a Redshift cluster for the same purpose and they are compatible
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
     - In order to accomodate daily jobs we will need to make few changes so we can batch import and we will probably some orchestration tool like Airflow.
 * The database needed to be accessed by 100+ people.
     - For the data to be accessible to multiple users we will probably need a bigger Redshift cluster and we will need to see the database usecases in order to optimize queries.