# Project 08 - Analysis of U.S. Immigration (I-94) Data
### Udacity Data Engineer - Capstone Project
> by Peter Wissel | 2021-05-05

## Project Overview
This project works with a data set for immigration to the United States. The supplementary datasets will include data on
airport codes, U.S. city demographics and temperature data.

The following process is divided into five sub-steps to illustrate how to answer the questions set by the business
analytics team.

This part of the project covers the following steps:
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data


##### 3.1.1. From which country do immigrants come to the U.S. and how many? [(Data pipeline)](#question1_data_pipeline) <a name="question1_description"></a>
1. Clean data and create staging table `st_i94_immigration` from files `i94_<month>16_sub.sas7bdat`
2. Clean data and create staging table `st_immigration_countries` from file
   [`I94_SAS_Labels_I94CIT_I94RES.txt`](../P8_capstone_resource_files/I94_sas_labels_descriptions_extracted_data/I94_SAS_Labels_I94CIT_I94RES.txt)
3. Creation of a fact table named `f_i94_immigration` based on staging table `st_i94_immigration`.
4. Creation of a dimension named `d_immigration_countries` based on staging table `st_immigration_countries`.
5. Mapping of dimension `d_immigration_countries` to fact table `f_i94_immigration` based on columns
   (`st_i94_immigration.st_i94_cit` --> `f_i94_immigration.d_ic_id`) == (`st_immigration_countries.st_ic_country_code`
   --> `d_immigration_countries.d_ic_id` )
6. Answer Project Question 1: From which country do immigrants come to the U.S. and how many?
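
Steps 5 and 6 of the list above can be illustrated in plain Python, independent of Spark. This is only a minimal sketch: the sample rows, the counts, the fallback label `UNKNOWN` and the names `fact_rows`, `country_dim` and `arrivals_by_country` are assumptions made for illustration and are not part of the pipeline.

```python
from collections import Counter

# Hypothetical sample of the fact table: (d_ic_id, f_i94_count)
fact_rows = [(101, 1), (111, 1), (111, 1), (777, 1)]

# Hypothetical excerpt of the country dimension: d_ic_id -> d_ic_country_name
country_dim = {101: "ALBANIA", 111: "FRANCE"}


def arrivals_by_country(rows, dim, default_name="UNKNOWN"):
    """Sum the arrival counts per country name; unmapped ids get default_name."""
    totals = Counter()
    for country_id, count in rows:
        totals[dim.get(country_id, default_name)] += count
    # Highest count first, ties broken alphabetically for a stable order
    return sorted(totals.items(), key=lambda item: (-item[1], item[0]))


result = arrivals_by_country(fact_rows, country_dim)
print(result)
```

The lookup with a default mirrors the idea of mapping fact rows without a matching dimension entry to one "invalid" bucket instead of dropping them.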

**NOTE:** The three columns `st_i94_port_iso`, `st_i94_port_state_code` and `st_i94_port_city` will be inserted after
creation of the staging table `st_immigration_airports` within the next step.


##### 4.1.1. From which country do immigrants come to the U.S. and how many? [(Description)](#question1_description) <a name="question1_data_pipeline"></a>

1. Clean data and create staging table `st_i94_immigration` from files `i94_<month>16_sub.sas7bdat`

##### Convert SAS data into spark parquet files as 1st staging step #####

In [None]:
###### Imports and Installs section
import shutil
import pandas as pd
import pyspark.sql.functions as F
# import spark as spark
from pyspark.sql.types import StructType, StructField, DoubleType, StringType, IntegerType, LongType, TimestampType, DateType
from datetime import datetime, timedelta
from pyspark.sql import SparkSession, DataFrameNaFunctions
from pyspark.sql.functions import when, count, col, to_date, datediff, date_format, month
import re
import json
from os import path


MAX_MEMORY = "5g"

spark = SparkSession\
    .builder\
    .appName("etl pipeline for project 8 - I94 data") \
    .config("spark.jars.packages","saurfang:spark-sas7bdat:3.0.0-s_2.12")\
    .config('spark.sql.repl.eagerEval.enabled', True) \
    .config("spark.executor.memory", MAX_MEMORY) \
    .config("spark.driver.memory", MAX_MEMORY) \
    .enableHiveSupport()\
    .getOrCreate()

# setting the current LOG-Level
spark.sparkContext.setLogLevel('ERROR')

In [2]:
# original path in Udacity workspace
#df_spark =spark.read.format('com.github.saurfang.sas.spark')
# .load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')

# The SAS files (e.g. i94_apr16_sub.sas7bdat) are partitioned by month. The for loop extracts each file and stores it
# partitioned by month in parquet format.

months_abbreviation = ["jan", "feb", "mar", "apr", "may", "jun", "jul", "aug", "sep", "oct", "nov", "dec"]

for current_month in months_abbreviation:
    month_abbreviation = current_month

    filepath_i94 = f"../P8_capstone_resource_files/immigration_data/18-83510-I94-Data-2016/" \
                   f"i94_{month_abbreviation}16_sub.sas7bdat"
    print(filepath_i94)

    # load current month
    df_spark_i94 = spark\
        .read\
        .format('com.github.saurfang.sas.spark')\
        .load(filepath_i94)

    """
    Note: optionally load conditions:
            .load(filepath_i94,
                  forceLowercaseNames=True,
                  inferLong=True)
    """

    # write data and append all months to the same parquet result set
    location_to_write = "../P8_capstone_resource_files/parquet_raw/i94_sas_data"

    # write data frame as parquet file (ca. 815 MB)
    df_spark_i94 \
        .repartition(int(1)) \
        .write \
        .mode(saveMode='append') \
        .partitionBy('i94mon') \
        .parquet(location_to_write, compression="gzip")


../P8_capstone_resource_files/immigration_data/18-83510-I94-Data-2016/i94_jan16_sub.sas7bdat
../P8_capstone_resource_files/immigration_data/18-83510-I94-Data-2016/i94_feb16_sub.sas7bdat
../P8_capstone_resource_files/immigration_data/18-83510-I94-Data-2016/i94_mar16_sub.sas7bdat
../P8_capstone_resource_files/immigration_data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat
../P8_capstone_resource_files/immigration_data/18-83510-I94-Data-2016/i94_may16_sub.sas7bdat
../P8_capstone_resource_files/immigration_data/18-83510-I94-Data-2016/i94_jun16_sub.sas7bdat
../P8_capstone_resource_files/immigration_data/18-83510-I94-Data-2016/i94_jul16_sub.sas7bdat
../P8_capstone_resource_files/immigration_data/18-83510-I94-Data-2016/i94_aug16_sub.sas7bdat
../P8_capstone_resource_files/immigration_data/18-83510-I94-Data-2016/i94_sep16_sub.sas7bdat
../P8_capstone_resource_files/immigration_data/18-83510-I94-Data-2016/i94_oct16_sub.sas7bdat
../P8_capstone_resource_files/immigration_data/18-83510-I94-Data-2016/
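
Because the conversion loop runs for a long time, it can help to verify up front that all twelve monthly files are present. The following is a minimal sketch in plain Python; the helper name `monthly_sas_paths` and the `year_suffix` parameter are hypothetical, and the base directory is simply the one used in the loop above.

```python
from pathlib import Path

MONTHS = ["jan", "feb", "mar", "apr", "may", "jun",
          "jul", "aug", "sep", "oct", "nov", "dec"]


def monthly_sas_paths(base_dir, year_suffix="16"):
    """Return the expected SAS file path for each month, in calendar order."""
    return [Path(base_dir) / f"i94_{m}{year_suffix}_sub.sas7bdat" for m in MONTHS]


paths = monthly_sas_paths(
    "../P8_capstone_resource_files/immigration_data/18-83510-I94-Data-2016"
)
# Collect the files that do not exist on disk before starting the Spark job
missing = [p for p in paths if not p.exists()]
print(f"{len(paths)} expected files, {len(missing)} not found")
```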

##### Optional write methods (.csv & .csv.gz)

In [3]:
location_to_write = "../P8_capstone_resource_files/parquet_raw/i94_data.csv"

# delete folder if it already exists
if path.exists(location_to_write):
    shutil.rmtree(location_to_write)

# write data frame as uncompressed CSV file (approx. 5.9 GB)
df_spark_i94\
    .coalesce(1)\
    .write\
    .option("header", "true")\
    .csv(location_to_write)

In [4]:
location_to_write = "../P8_capstone_resource_files/parquet_raw/i94_data.csv.gz"

# delete folder if it already exists
if path.exists(location_to_write):
    shutil.rmtree(location_to_write)

# write data frame as compressed CSV file (approx. 885 MB)
df_spark_i94\
    .coalesce(1)\
    .write\
    .option("header", "true")\
    .option("codec", "org.apache.hadoop.io.compress.GzipCodec")\
    .csv(location_to_write)

##### Check written data frame

In [5]:
# Read written data frame back into memory
df_spark_i94 = spark.read.parquet("../P8_capstone_resource_files/parquet_raw/i94_sas_data")

# read only three months of data
#df_spark_i94 = spark.read.parquet("../P8_capstone_resource_files/parquet/i94_sas_data/i94mon=12.0")
"""
df_spark_i94 = spark.read\
    .parquet("../P8_capstone_resource_files/parquet_raw/i94_sas_data/i94mon=4.0",
             "../P8_capstone_resource_files/parquet_raw/i94_sas_data/i94mon=5.0",
             "../P8_capstone_resource_files/parquet_raw/i94_sas_data/i94mon=6.0")


df_spark_i94 = spark.read\
    .parquet("../P8_capstone_resource_files/parquet_raw/i94_sas_data/i94mon=1.0",
             "../P8_capstone_resource_files/parquet_raw/i94_sas_data/i94mon=2.0",
             "../P8_capstone_resource_files/parquet_raw/i94_sas_data/i94mon=3.0")
"""

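
The partition directories in the commented-out reads follow the pattern `i94mon=<month>.0`, because the month column is stored as a double. A small helper can build such a list of paths instead of typing them by hand. This is a sketch only; the function name `partition_paths` and its parameters are hypothetical.

```python
def partition_paths(base_path, months, column="i94mon"):
    """Build partition directory paths; the month is stored as a double, hence the '.0'."""
    return [f"{base_path}/{column}={float(m)}" for m in months]


second_quarter = partition_paths(
    "../P8_capstone_resource_files/parquet_raw/i94_sas_data", [4, 5, 6]
)
print(second_quarter)
```

The resulting list could then be unpacked into the reader, for example `spark.read.parquet(*second_quarter)`.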

In [6]:
# Count the rows in the data frame
df_spark_i94.count()

40790529

In [7]:
# check current df Schema
df_spark_i94.printSchema()


root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = true)
 |-- fltno: string (nullable = t

In [8]:
# Show Summary statistics. Attention: This could take very long to compute!
df_spark_i94.describe()

summary,cicid,i94yr,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,dtadfile,visapost,occup,entdepa,entdepd,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype,i94mon
count,40790529.0,40790529.0,40761954.0,40790529.0,40790529,40790529.0,40716580.0,38762603,37482517.0,40781012.0,40790529.0,40790529.0,40659479.0,16758354,192955,40788125,37502620,13206,37570948,40781012.0,40688978,36710546,5112434,39482463,40790529.0,40456607,40790529,40790529.0
mean,3293392.166653808,2016.0,312.72151028382984,310.87080760830537,,20643.13001496009,1.0767158243644237,76.87271186440678,20659.627880833083,39.53764980133401,1.901888793842316,1.0,20160697.567812737,Infinity,886.3284132841328,,,,,1976.462350198666,6759393.900854463,,64956.88453638432,51.10082764263461,45973737501.22861,1562.5950372058192,,6.730874561592471
stddev,1943430.3813472984,0.0,209.28597607303516,207.86066101830448,,100.98698880844428,0.4675023948362171,36.626221057276055,116.65803017373672,17.810250881773467,0.4069698211916488,0.0,327.44246882105733,,181.51721193873058,,,,,17.810250881775286,3592018.915138121,,118503.14943524264,155.794790882693,31770407636.65807,7077.25392767434,,3.2988930890066643
min,1.0,2016.0,0.0,101.0,48Y,20454.0,0.0,**,-14388.0,-3.0,1.0,1.0,20081124.0,999,010,A,D,P,M,204.0,-00-0000,F,',*FF,0.0,-,B1,1.0
max,7667577.0,2016.0,999.0,760.0,YUM,20819.0,9.0,ZX,48342.0,1812.0,3.0,1.0,20170609.0,ZZZ,WTR,Z,Z,Y,M,2019.0,`1132017,X,`930,ZZ,99999998130.0,y,WT,12.0


In [None]:
# Check whether the conversion step worked as expected.
# Keep in mind: set the virtual environment path to ..../Project_8_Data_Engineering_Capstone_Project/venv/bin/python3,
# because the UDF below executes Python code.

In [9]:
# Preparation to get an enumeration of all elements within the data frame and a
# UDF to convert a SAS date (Integer Format) into a DateType() format.

from pyspark.sql.functions import row_number,lit
from pyspark.sql.window import Window
w = Window().orderBy(lit('A'))

# Function to convert a SAS date into a DateType
def get_date_from_sas_date(sas_date):
    """
    Convert a SAS date (number of days since 1960-01-01) into a date value.
    If sas_date <= 0 (e.g. a missing value that was filled with 0), return the default value 1900-01-01.
    """
    sas_date_int = int(sas_date)
    if sas_date_int > 0:
        return datetime(1960, 1, 1) + timedelta(days=sas_date_int)
    else:
        return datetime(1900, 1, 1)

# register UDF function to calculate a DateType from given SAS date format
getDateFomSASDate = F.udf(lambda y: get_date_from_sas_date(y), DateType())
spark.udf.register("getDateFomSASDate", getDateFomSASDate)
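
The conversion rule can be checked outside of Spark with a few known values. The sketch below is a standalone variant, not the registered UDF: the name `sas_date_to_date`, the two constants and the extra `None` handling are assumptions added for illustration. The value 20454 corresponds to 2016-01-01, which matches the arrival dates shown in the staging table.

```python
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)     # SAS counts days from this date
DEFAULT_DATE = date(1900, 1, 1)  # placeholder for missing or non-positive values


def sas_date_to_date(sas_date):
    """Convert a SAS day count to a date; None and values <= 0 map to DEFAULT_DATE."""
    if sas_date is None or int(sas_date) <= 0:
        return DEFAULT_DATE
    return SAS_EPOCH + timedelta(days=int(sas_date))


print(sas_date_to_date(20454.0))
print(sas_date_to_date(0.0))
```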

In [10]:
## Transformation of the originally stored data from files `i94_<month>16_sub.sas7bdat`
# read parquet file
# fill up null values
# convert data into new columns
# select only needed columns

df_st_i94_immigrations = spark\
    .read\
    .parquet("../P8_capstone_resource_files/parquet_raw/i94_sas_data")\
    .fillna(value=0.0 ,subset=['i94cit'])\
    .fillna(value='99', subset=['i94addr'])\
    .fillna(value=0.0, subset=['depdate'])\
    .fillna(value='99991231', subset=['dtadfile'])\
    .fillna(value='NA', subset=['matflag'])\
    .withColumn("st_i94_cit", F.round("i94cit", 0).cast(IntegerType()))\
    .withColumn("st_i94_port", col("i94port"))\
    .withColumn("st_i94_addr", col("i94addr"))\
    .withColumn("st_i94_arrdate", F.round("arrdate").cast(IntegerType()))\
    .withColumn("st_i94_arrdate_iso", getDateFomSASDate("arrdate"))\
    .withColumn("st_i94_depdate", F.round("depdate").cast(IntegerType()))\
    .withColumn("st_i94_depdate_iso", getDateFomSASDate("depdate"))\
    .withColumn('st_i94_dtadfile', to_date('dtadfile','yyyyMMdd')) \
    .withColumn("st_i94_matflag", col("matflag"))\
    .withColumn("st_i94_count", F.round("count", 0).cast(IntegerType()))\
    .withColumn("st_i94_year", col("i94yr").cast(IntegerType()))\
    .withColumn("st_i94_month", col("i94mon").cast(IntegerType())) \
    .select(
              "st_i94_cit",
              "st_i94_port",
              "st_i94_addr",
              "st_i94_arrdate",
              "st_i94_arrdate_iso",
              "st_i94_depdate",
              "st_i94_depdate_iso",
              "st_i94_dtadfile",
              "st_i94_matflag",
              "st_i94_count",
              "st_i94_year",
              "st_i94_month" )

In [11]:
# compare the counts of the full dataset
# Count of rows         : 40,790,529
# Count of distinct rows: 12,228,839
print('Count of rows: {0}'.format(df_st_i94_immigrations.count()))
print('Count of distinct rows: {0}'.format(df_st_i94_immigrations.distinct().count()))

Count of rows: 40790529
Count of distinct rows: 12228839


In [12]:
# Remove completely identical rows. Only do this if the two counts from the step above differ!
df_st_i94_immigrations = df_st_i94_immigrations.drop_duplicates()

In [13]:
# compare the counts of the full dataset again
# Count of rows         : 12,228,839
# Count of distinct rows: 12,228,839
print('Count of rows: {0}'.format(df_st_i94_immigrations.count()))
print('Count of distinct rows: {0}'.format(df_st_i94_immigrations.distinct().count()))

Count of rows: 12228839
Count of distinct rows: 12228839


In [14]:
# After dropping duplicates, create a unique ID for each row.
# F.row_number().over(w) gives each record a unique, consecutive ID starting at 1.
# F.monotonically_increasing_id() gives each record a unique, increasing (but not necessarily consecutive) ID starting at 0.
df_st_i94_immigrations = df_st_i94_immigrations\
    .sort("st_i94_year", "st_i94_month", "st_i94_cit") \
    .withColumn("st_i94_id",  F.row_number().over(w))
#    .withColumn('st_i94_id_new', F.monotonically_increasing_id())
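
The combination of dropping duplicates, sorting and numbering from 1 can be pictured with a few rows in plain Python. This is a minimal sketch of the idea only; the helper `dedupe_and_number` and the sample tuples are hypothetical and much smaller than the real data.

```python
def dedupe_and_number(rows, sort_key):
    """Drop exact duplicates, sort, and append a consecutive 1-based id to each row."""
    unique_rows = sorted(set(rows), key=sort_key)
    return [row + (row_id,) for row_id, row in enumerate(unique_rows, start=1)]


# Hypothetical rows: (st_i94_year, st_i94_month, st_i94_cit)
sample = [(2016, 1, 111), (2016, 1, 101), (2016, 1, 111), (2016, 2, 101)]
numbered = dedupe_and_number(sample, sort_key=lambda r: r)
print(numbered)
```

Note that in Spark a window without `partitionBy`, such as `w`, moves all rows into a single partition to number them, which is worth keeping in mind for larger data sets.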

In [15]:
df_st_i94_immigrations.show()

+----------+-----------+-----------+--------------+------------------+--------------+------------------+---------------+--------------+------------+-----------+------------+---------+
|st_i94_cit|st_i94_port|st_i94_addr|st_i94_arrdate|st_i94_arrdate_iso|st_i94_depdate|st_i94_depdate_iso|st_i94_dtadfile|st_i94_matflag|st_i94_count|st_i94_year|st_i94_month|st_i94_id|
+----------+-----------+-----------+--------------+------------------+--------------+------------------+---------------+--------------+------------+-----------+------------+---------+
|       101|        SYS|         99|         20464|        2016-01-11|         20465|        2016-01-12|     2016-01-11|             M|           1|       2016|           1|        1|
|       516|        MIA|         MD|         20814|        2016-12-26|         20832|        2017-01-13|     2016-12-26|             M|           1|       2016|          12|        2|
|       101|        WAS|         VA|         20470|        2016-01-17|         2

In [None]:
# compare the counts of the full dataset again
# Count of rows         : 12,228,839
# Count of distinct rows: 12,228,839
print('Count of rows: {0}'.format(df_st_i94_immigrations.count()))
print('Count of distinct rows: {0}'.format(df_st_i94_immigrations.distinct().count()))

In [16]:
# *** OPTIONAL 1 ***
# Let's check whether there are any duplicates in the data irrespective of `st_i94_id`.
# Only columns other than the `st_i94_id` column:

print('Count of ids: {0}'.format(df_st_i94_immigrations.count()))
print('Count of distinct ids: {0}'.format(
    df_st_i94_immigrations.select( [
        c for c in df_st_i94_immigrations.columns if c != 'st_i94_id'
    ])
        .distinct()
        .count())
    )

Count of ids: 12228839
Count of distinct ids: 12228839


In [17]:
# *** OPTIONAL 2 ***
# clean up if found duplicate rows irrespective of 'st_i94_id'
df_st_i94_immigrations = df_st_i94_immigrations.dropDuplicates(subset=[
c for c in df_st_i94_immigrations.columns if c != 'st_i94_id'
])

In [18]:
# Check for duplicates in ID column `st_i94_id`
df_st_i94_immigrations.agg(
    F.count('st_i94_id').alias('count'),
    F.countDistinct('st_i94_id').alias('distinct')
).show()

+--------+--------+
|   count|distinct|
+--------+--------+
|12228839|12228839|
+--------+--------+



In [19]:
# Check the fraction of missing observations in each column:
df_st_i94_immigrations.agg(*[
    (1 - (F.count(c) / F.count('*'))).alias(c + '_missing')
    for c in df_st_i94_immigrations.columns
]).show()

+------------------+-------------------+-------------------+----------------------+--------------------------+----------------------+--------------------------+-----------------------+----------------------+--------------------+-------------------+--------------------+-----------------+
|st_i94_cit_missing|st_i94_port_missing|st_i94_addr_missing|st_i94_arrdate_missing|st_i94_arrdate_iso_missing|st_i94_depdate_missing|st_i94_depdate_iso_missing|st_i94_dtadfile_missing|st_i94_matflag_missing|st_i94_count_missing|st_i94_year_missing|st_i94_month_missing|st_i94_id_missing|
+------------------+-------------------+-------------------+----------------------+--------------------------+----------------------+--------------------------+-----------------------+----------------------+--------------------+-------------------+--------------------+-----------------+
|               0.0|                0.0|                0.0|                   0.0|                       0.0|                   0.0|   
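
The expression `1 - count(c) / count('*')` works because `count(c)` skips nulls while `count('*')` counts every row. The same calculation on a handful of dictionaries looks like this; the function name `missing_fraction` and the sample rows are hypothetical and serve only as a sketch.

```python
def missing_fraction(rows, columns):
    """Return {column: fraction of rows in which the value is None}."""
    total = len(rows)
    if total == 0:
        return {c: 0.0 for c in columns}
    return {c: sum(1 for r in rows if r.get(c) is None) / total for c in columns}


# Hypothetical sample with one missing value per column
sample = [
    {"st_i94_cit": 101, "st_i94_addr": "CA"},
    {"st_i94_cit": None, "st_i94_addr": "VA"},
    {"st_i94_cit": 111, "st_i94_addr": None},
    {"st_i94_cit": 112, "st_i94_addr": "MD"},
]
fractions = missing_fraction(sample, ["st_i94_cit", "st_i94_addr"])
print(fractions)
```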

In [20]:
# check whether there are still null values in the result data frame
df_st_i94_immigrations\
    .select([count(when(col(c).isNull(), c)).alias(c)
             for c in df_st_i94_immigrations.columns])\
    .toPandas().T

Unnamed: 0,0
st_i94_cit,0
st_i94_port,0
st_i94_addr,0
st_i94_arrdate,0
st_i94_arrdate_iso,0
st_i94_depdate,0
st_i94_depdate_iso,0
st_i94_dtadfile,0
st_i94_matflag,0
st_i94_count,0


In [21]:
# get current Schema of staging table st_i94_immigration
df_st_i94_immigrations.printSchema()

root
 |-- st_i94_cit: integer (nullable = true)
 |-- st_i94_port: string (nullable = true)
 |-- st_i94_addr: string (nullable = false)
 |-- st_i94_arrdate: integer (nullable = true)
 |-- st_i94_arrdate_iso: date (nullable = true)
 |-- st_i94_depdate: integer (nullable = true)
 |-- st_i94_depdate_iso: date (nullable = true)
 |-- st_i94_dtadfile: date (nullable = true)
 |-- st_i94_matflag: string (nullable = false)
 |-- st_i94_count: integer (nullable = true)
 |-- st_i94_year: integer (nullable = true)
 |-- st_i94_month: integer (nullable = true)
 |-- st_i94_id: integer (nullable = true)



In [22]:
# check content of current staging table st_i94_immigration
df_st_i94_immigrations.limit(2)

st_i94_cit,st_i94_port,st_i94_addr,st_i94_arrdate,st_i94_arrdate_iso,st_i94_depdate,st_i94_depdate_iso,st_i94_dtadfile,st_i94_matflag,st_i94_count,st_i94_year,st_i94_month,st_i94_id
101,LOS,CA,20454,2016-01-01,20458,2016-01-05,2016-01-01,M,1,2016,1,523
101,SFR,CA,20454,2016-01-01,20461,2016-01-08,2016-01-01,M,1,2016,1,715


**NOTE:** The column `st_i94_port_state_code` will be inserted after creation of the staging
table `st_immigration_airports` within the step [**2 - Airport dimension**](#question2_data_pipeline)

In [23]:
# write staging table st_i94_immigrations as parquet files (an existing result is replaced)
location_to_write = "../P8_capstone_resource_files/parquet_stage/PQ1/st_i94_immigrations"

# delete folder if already exists
if path.exists(location_to_write):
    shutil.rmtree(location_to_write)

# write data frame as parquet file (40 million rows: ~601 MB (GZIP) or 855 MB (uncompressed); 12 million rows: 101 MB (GZIP))
# NOTE: One column is still missing: `st_i94_port_state_code`.
df_st_i94_immigrations \
    .repartition(int(1)) \
    .write \
    .format("parquet")\
    .mode(saveMode='overwrite') \
    .partitionBy('st_i94_year', 'st_i94_month') \
    .parquet(location_to_write, compression="gzip")

"""
df_st_i94_immigrations\
    .write\
    .format('parquet') \
    .mode("overwrite") \
    .partitionBy('st_i94_year', 'st_i94_month', 'st_i94_port') \
    .saveAsTable('st_i94_immigrations',
                 format='parquet',
                 mode='overwrite',
                 compression="gzip",
                 path=filepath_st_i94_immigrations
                 )
"""


2. Clean data and create staging table `st_immigration_countries` from file [`I94_SAS_Labels_I94CIT_I94RES.txt`](../P8_capstone_resource_files/I94_sas_labels_descriptions_extracted_data/I94_SAS_Labels_I94CIT_I94RES.txt)

In [24]:
# path of txt file
filepath_immigration_countries = "../P8_capstone_resource_files/I94_sas_labels_descriptions_extracted_data/I94_SAS_Labels_I94CIT_I94RES.txt"

# read txt file into data frame
df_txt_immigration_countries=spark.read.text(filepath_immigration_countries)

# create a new df with two columns (st_ic_country_code, st_ic_country_name) as staging table st_immigration_countries
df_st_immigration_countries = df_txt_immigration_countries\
    .select(F.regexp_extract('value', r'^\s*(\d*)\s*=  \'(\w*.*)\'', 1).alias('st_ic_country_code').cast(IntegerType()),
            F.regexp_extract('value', r'^\s*(\d*)\s*=  \'(\w*.*)\'', 2).alias('st_ic_country_name'))\
    .drop_duplicates()\
    .sort("st_ic_country_code")
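
The regular expression can be tried out with Python's `re` module before running it in Spark. The sketch below reuses the same pattern; the helper `parse_label_line` and the three sample lines are hypothetical and only formatted so that they fit the pattern (digits, `=`, two blanks, quoted name).

```python
import re

# Same pattern as in the Spark cell: optional blanks, digits, "=  '", name, closing quote
LABEL_PATTERN = re.compile(r"^\s*(\d*)\s*=  '(\w*.*)'")


def parse_label_line(line):
    """Return (country_code, country_name), or None if the line does not match."""
    match = LABEL_PATTERN.match(line)
    if match is None or match.group(1) == "":
        return None
    return int(match.group(1)), match.group(2)


# Hypothetical lines; the last one is not a label entry
lines = ["   101 =  'ALBANIA'", "     0 =  'INVALID: STATELESS'", "/* comment */"]
parsed = [parse_label_line(line) for line in lines]
print(parsed)
```

One difference to keep in mind: Spark's `regexp_extract` returns an empty string for lines that do not match, whereas this sketch returns `None`.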

In [25]:
# show prepared staging table st_immigration_countries
df_st_immigration_countries.sort("st_ic_country_code").show(50, False)

+------------------+-----------------------+
|st_ic_country_code|st_ic_country_name     |
+------------------+-----------------------+
|0                 |INVALID: STATELESS     |
|54                |No Country Code (54)   |
|100               |No Country Code (100)  |
|101               |ALBANIA                |
|102               |ANDORRA                |
|103               |AUSTRIA                |
|104               |BELGIUM                |
|105               |BULGARIA               |
|106               |INVALID: CZECHOSLOVAKIA|
|107               |POLAND                 |
|108               |DENMARK                |
|109               |ESTONIA                |
|110               |FINLAND                |
|111               |FRANCE                 |
|112               |GERMANY                |
|113               |GREECE                 |
|114               |HUNGARY                |
|115               |ICELAND                |
|116               |IRELAND                |
|117      

In [26]:
# compare the counts of the full dataset
# Count of rows         : 289
# Count of distinct rows: 289
print('Count of rows: {0}'.format(df_st_immigration_countries.count()))
print('Count of distinct rows: {0}'.format(df_st_immigration_countries.distinct().count()))

Count of rows: 289
Count of distinct rows: 289


In [27]:
# Remove completely identical rows. Only do this if the two counts from the step above differ!
df_st_immigration_countries = df_st_immigration_countries.drop_duplicates()

In [28]:
# show prepared staging table st_immigration_countries
df_st_immigration_countries.sort("st_ic_country_code").show(50, False)

+------------------+-----------------------+
|st_ic_country_code|st_ic_country_name     |
+------------------+-----------------------+
|0                 |INVALID: STATELESS     |
|54                |No Country Code (54)   |
|100               |No Country Code (100)  |
|101               |ALBANIA                |
|102               |ANDORRA                |
|103               |AUSTRIA                |
|104               |BELGIUM                |
|105               |BULGARIA               |
|106               |INVALID: CZECHOSLOVAKIA|
|107               |POLAND                 |
|108               |DENMARK                |
|109               |ESTONIA                |
|110               |FINLAND                |
|111               |FRANCE                 |
|112               |GERMANY                |
|113               |GREECE                 |
|114               |HUNGARY                |
|115               |ICELAND                |
|116               |IRELAND                |
|117      

In [29]:
# Select the default value for all invalid entries. All country_code entries in the fact table that are not included in the
# d_immigration_countries dimension get the country_code 999 as default value (invalid)
df_st_immigration_countries_default = df_st_immigration_countries\
    .select("st_ic_country_code", "st_ic_country_name") \
    .filter("st_ic_country_code ==  999")

In [30]:
# data cleansing: filter out invalid values from the staging table and append the default value again.
# Note: the result is only displayed here; it is not assigned back to df_st_immigration_countries.
df_st_immigration_countries \
    .select("st_ic_country_code",
            F.regexp_replace('st_ic_country_name', r"^No Country Code (.*)", "EntryToDelete").alias("st_ic_country_name")
            )\
    .select("st_ic_country_code",
            F.regexp_replace('st_ic_country_name', r"^.*\((should not show)\)", "EntryToDelete").alias("st_ic_country_name")
            )\
    .select("st_ic_country_code",
            F.regexp_replace('st_ic_country_name', r"^INVALID:.*", "EntryToDelete").alias("st_ic_country_name")
        )\
    .filter("st_ic_country_name != 'EntryToDelete'") \
    .union(df_st_immigration_countries_default)\
    .show(500, False)


+------------------+---------------------------------------------------------+
|st_ic_country_code|st_ic_country_name                                       |
+------------------+---------------------------------------------------------+
|759               |INDIAN OCEAN AREAS (FRENCH)                              |
|392               |MALI                                                     |
|213               |INDIA                                                    |
|335               |LESOTHO                                                  |
|266               |VIETNAM                                                  |
|691               |COLOMBIA                                                 |
|163               |UZBEKISTAN                                               |
|114               |HUNGARY                                                  |
|525               |BRITISH VIRGIN ISLANDS                                   |
|384               |CHAD                            
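
The three replacement patterns used in the cleansing step can be tested on a few names in plain Python. This is a sketch under assumptions: it filters directly instead of replacing with a marker value first, and the names `INVALID_PATTERNS`, `is_valid_country` and the sample dictionary are hypothetical.

```python
import re

# The three patterns that mark a country name as invalid in the cleansing step
INVALID_PATTERNS = [
    re.compile(r"^No Country Code (.*)"),
    re.compile(r"^.*\((should not show)\)"),
    re.compile(r"^INVALID:.*"),
]


def is_valid_country(name):
    """A name is valid if none of the invalid patterns matches it."""
    return not any(pattern.search(name) for pattern in INVALID_PATTERNS)


# Hypothetical excerpt: country code -> country name
countries = {
    0: "INVALID: STATELESS",
    54: "No Country Code (54)",
    101: "ALBANIA",
    112: "GERMANY",
}
cleaned = {code: name for code, name in countries.items() if is_valid_country(name)}
print(cleaned)
```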

In [31]:
# write staging table st_immigration_countries as parquet file
location_to_write = "../P8_capstone_resource_files/parquet_stage/PQ1/st_immigration_countries"

# delete folder if already exists
if path.exists(location_to_write):
    shutil.rmtree(location_to_write)

df_st_immigration_countries \
    .repartition(int(1)) \
    .write \
    .format("parquet")\
    .mode(saveMode='overwrite') \
    .parquet(location_to_write, compression="gzip")


3. Creation of a fact table named `f_i94_immigrations` based on staging table `st_i94_immigrations`.

In [32]:
# Read written data frame back into memory
df_st_i94_immigrations = spark.read.parquet("../P8_capstone_resource_files/parquet_stage/PQ1/st_i94_immigrations")

# show current Schema
df_st_i94_immigrations.printSchema()
df_st_i94_immigrations.count()

root
 |-- st_i94_cit: integer (nullable = true)
 |-- st_i94_port: string (nullable = true)
 |-- st_i94_addr: string (nullable = true)
 |-- st_i94_arrdate: integer (nullable = true)
 |-- st_i94_arrdate_iso: date (nullable = true)
 |-- st_i94_depdate: integer (nullable = true)
 |-- st_i94_depdate_iso: date (nullable = true)
 |-- st_i94_dtadfile: date (nullable = true)
 |-- st_i94_matflag: string (nullable = true)
 |-- st_i94_count: integer (nullable = true)
 |-- st_i94_id: integer (nullable = true)
 |-- st_i94_year: integer (nullable = true)
 |-- st_i94_month: integer (nullable = true)



12228839

In [33]:
# create fact table f_i94_immigration based on staging table st_i94_immigration
df_f_i94_immigrations = df_st_i94_immigrations\
    .withColumnRenamed("st_i94_id", "f_i94_id")\
    .withColumnRenamed("st_i94_cit", "f_i94_cit")\
    .withColumnRenamed("st_i94_addr", "f_i94_addr")\
    .withColumnRenamed("st_i94_arrdate", "f_i94_arrdate")\
    .withColumnRenamed("st_i94_arrdate_iso", "f_i94_arrdate_iso")\
    .withColumnRenamed("st_i94_depdate", "f_i94_depdate")\
    .withColumnRenamed("st_i94_depdate_iso", "f_i94_depdate_iso")\
    .withColumnRenamed("st_i94_dtadfile", "f_i94_dtadfile")\
    .withColumnRenamed("st_i94_matflag", "f_i94_matflag")\
    .withColumnRenamed("st_i94_count", "f_i94_count")\
    .withColumnRenamed("st_i94_year", "f_i94_year")\
    .withColumnRenamed("st_i94_month", "f_i94_month")\
    .withColumnRenamed("st_i94_port", "f_i94_port")\
    .withColumn("d_ic_id", col("f_i94_cit"))\
    .withColumn("d_ia_id", col("f_i94_port")) \
    .withColumn("d_da_id", col("f_i94_arrdate_iso")) \
    .withColumn("d_dd_id", col("f_i94_depdate_iso")) \
    .drop("f_i94_arrdate")\
    .drop("f_i94_depdate")

# show current fact table Schema
df_f_i94_immigrations.printSchema()

root
 |-- f_i94_cit: integer (nullable = true)
 |-- f_i94_port: string (nullable = true)
 |-- f_i94_addr: string (nullable = true)
 |-- f_i94_arrdate_iso: date (nullable = true)
 |-- f_i94_depdate_iso: date (nullable = true)
 |-- f_i94_dtadfile: date (nullable = true)
 |-- f_i94_matflag: string (nullable = true)
 |-- f_i94_count: integer (nullable = true)
 |-- f_i94_id: integer (nullable = true)
 |-- f_i94_year: integer (nullable = true)
 |-- f_i94_month: integer (nullable = true)
 |-- d_ic_id: integer (nullable = true)
 |-- d_ia_id: string (nullable = true)
 |-- d_da_id: date (nullable = true)
 |-- d_dd_id: date (nullable = true)
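
Since every staging column is renamed by swapping the prefix `st_` for `f_`, the rename pairs could also be generated instead of written out one by one. The following is only a sketch of that idea; `staging_to_fact_name` and the shortened column list are hypothetical.

```python
def staging_to_fact_name(column, old_prefix="st_", new_prefix="f_"):
    """Swap the staging prefix for the fact prefix; other names are returned unchanged."""
    if column.startswith(old_prefix):
        return new_prefix + column[len(old_prefix):]
    return column


# Hypothetical subset of the staging columns
staging_columns = ["st_i94_id", "st_i94_cit", "st_i94_port", "st_i94_arrdate_iso"]
rename_map = {c: staging_to_fact_name(c) for c in staging_columns}
print(rename_map)
```

In Spark, such a mapping could be applied in a loop that calls `withColumnRenamed(old, new)` for each pair.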



In [34]:
# take a look inside the fact table f_i94_immigration
df_f_i94_immigrations.show(5,False)

+---------+----------+----------+-----------------+-----------------+--------------+-------------+-----------+--------+----------+-----------+-------+-------+----------+----------+
|f_i94_cit|f_i94_port|f_i94_addr|f_i94_arrdate_iso|f_i94_depdate_iso|f_i94_dtadfile|f_i94_matflag|f_i94_count|f_i94_id|f_i94_year|f_i94_month|d_ic_id|d_ia_id|d_da_id   |d_dd_id   |
+---------+----------+----------+-----------------+-----------------+--------------+-------------+-----------+--------+----------+-----------+-------+-------+----------+----------+
|254      |DAL       |KY        |2016-07-26       |2016-07-29       |2016-07-29    |M            |1          |8643306 |2016      |7          |254    |DAL    |2016-07-26|2016-07-29|
|0        |HHW       |99        |2016-07-01       |2016-09-16       |2016-07-01    |M            |1          |7195726 |2016      |7          |0      |HHW    |2016-07-01|2016-09-16|
|254      |DAL       |KY        |2016-07-26       |2016-08-04       |2016-08-04    |M          

In [35]:
# write fact table f_i94_immigration based on staging table st_i94_immigration (~ 69 MB)
location_to_write = "../P8_capstone_resource_files/parquet_star/PQ1/f_i94_immigrations"

# delete folder if already exists
if path.exists(location_to_write):
    shutil.rmtree(location_to_write)

df_f_i94_immigrations \
    .repartition(1) \
    .write \
    .mode("overwrite") \
    .partitionBy("f_i94_year", "f_i94_month") \
    .parquet(location_to_write, compression="gzip")
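
`partitionBy` writes one sub-directory per distinct combination of partition values, named `column=value`. The partition columns are then taken from the directory names when the table is read back, which is why `f_i94_year` and `f_i94_month` appear at the end of the schema printed after reading the files back in this section. The following is a minimal pure-Python sketch of that directory layout, not Spark code; the helper `partition_path` and the sample row are hypothetical and serve only as illustration.

```python
from pathlib import PurePosixPath


def partition_path(base, row, partition_cols):
    """Return the Hive-style directory a row would be written to."""
    path = PurePosixPath(base)
    for col in partition_cols:
        # each partition column adds one "column=value" directory level
        path = path / f"{col}={row[col]}"
    return str(path)


# Hypothetical row, reduced to the columns that matter here
row = {"f_i94_id": 8643306, "f_i94_year": 2016, "f_i94_month": 7}

print(partition_path("f_i94_immigrations", row, ["f_i94_year", "f_i94_month"]))
# f_i94_immigrations/f_i94_year=2016/f_i94_month=7
```

Queries that filter on `f_i94_year` or `f_i94_month` can then skip whole directories instead of scanning every file.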


4. Creation of a dimension named `d_immigration_countries` based on staging table `st_immigration_countries`.

In [36]:
# Read written data frame back into memory
df_st_i94_immigration_countries = spark.read.parquet("../P8_capstone_resource_files/parquet_stage/PQ1/st_immigration_countries")

# show current Schema
df_st_i94_immigration_countries.printSchema()


# create dimension table d_i94_immigration_countries based on staging table st_i94_immigration_countries
df_d_i94_immigration_countries = df_st_i94_immigration_countries\
    .withColumn("d_ic_id", col("st_ic_country_code"))\
    .withColumnRenamed("st_ic_country_code", "d_ic_country_code")\
    .withColumnRenamed("st_ic_country_name", "d_ic_country_name")

# get current content of dimension table
df_d_i94_immigration_countries.printSchema()
df_d_i94_immigration_countries.sort("d_ic_id").show(5, False)


# write dimension table d_immigration_countries based on staging table st_immigration_countries
location_to_write = "../P8_capstone_resource_files/parquet_star/PQ1/d_immigration_countries"

# delete folder if already exists
if path.exists(location_to_write):
    shutil.rmtree(location_to_write)

df_d_i94_immigration_countries \
    .repartition(1) \
    .write \
    .mode("overwrite") \
    .parquet(location_to_write, compression="gzip")

root
 |-- st_ic_country_code: integer (nullable = true)
 |-- st_ic_country_name: string (nullable = true)

root
 |-- d_ic_country_code: integer (nullable = true)
 |-- d_ic_country_name: string (nullable = true)
 |-- d_ic_id: integer (nullable = true)

+-----------------+---------------------+-------+
|d_ic_country_code|d_ic_country_name    |d_ic_id|
+-----------------+---------------------+-------+
|0                |INVALID: STATELESS   |0      |
|54               |No Country Code (54) |54     |
|100              |No Country Code (100)|100    |
|101              |ALBANIA              |101    |
|102              |ANDORRA              |102    |
+-----------------+---------------------+-------+
only showing top 5 rows



5. Mapping of dimension `d_immigration_countries` to fact table `f_i94_immigration` based on columns
   (`st_i94_immigration.st_i94_cit` --> `f_i94_immigration.d_ic_id`) == (`st_immigration_countries.st_ic_country_code`
   --> `d_immigration_countries.d_ic_id` )

6. Answer Project Question 1: From which country do immigrants come to the U.S. and how many?

In [37]:
# Read written data frame back into memory
df_f_i94_immigrations = spark.read.parquet("../P8_capstone_resource_files/parquet_star/PQ1/f_i94_immigrations")
df_d_immigration_countries = spark.read.parquet("../P8_capstone_resource_files/parquet_star/PQ1/d_immigration_countries")

# check read data frames
df_f_i94_immigrations.printSchema()
df_d_immigration_countries.printSchema()

root
 |-- f_i94_cit: integer (nullable = true)
 |-- f_i94_port: string (nullable = true)
 |-- f_i94_addr: string (nullable = true)
 |-- f_i94_arrdate_iso: date (nullable = true)
 |-- f_i94_depdate_iso: date (nullable = true)
 |-- f_i94_dtadfile: date (nullable = true)
 |-- f_i94_matflag: string (nullable = true)
 |-- f_i94_count: integer (nullable = true)
 |-- f_i94_id: integer (nullable = true)
 |-- d_ic_id: integer (nullable = true)
 |-- d_ia_id: string (nullable = true)
 |-- d_da_id: date (nullable = true)
 |-- d_dd_id: date (nullable = true)
 |-- f_i94_year: integer (nullable = true)
 |-- f_i94_month: integer (nullable = true)

root
 |-- d_ic_country_code: integer (nullable = true)
 |-- d_ic_country_name: string (nullable = true)
 |-- d_ic_id: integer (nullable = true)
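
The join on `d_ic_id` used to answer question 1 is an inner join, so any fact row whose `d_ic_id` has no partner in the dimension would silently drop out of the result. A quick key check before querying can make such gaps visible. The sketch below shows the idea in plain Python on hypothetical sample rows; the helper `unmatched_fact_keys` is an assumption for illustration and is not part of the notebook. In Spark, a comparable check could be written as a join with join type `left_anti`.

```python
# Hypothetical sample rows; the real tables hold far more records
fact_rows = [
    {"f_i94_id": 1, "d_ic_id": 135},
    {"f_i94_id": 2, "d_ic_id": 245},
    {"f_i94_id": 3, "d_ic_id": 999},  # no such key in the dimension
]
dim_rows = [
    {"d_ic_id": 135, "d_ic_country_name": "UNITED KINGDOM"},
    {"d_ic_id": 245, "d_ic_country_name": "CHINA, PRC"},
]


def unmatched_fact_keys(facts, dims, key="d_ic_id"):
    """Return the sorted fact keys that have no partner in the dimension."""
    dim_keys = {row[key] for row in dims}
    fact_keys = {row[key] for row in facts}
    return sorted(fact_keys - dim_keys)


print(unmatched_fact_keys(fact_rows, dim_rows))  # [999]
```

An empty list means every fact row will survive the inner join.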



In [38]:
# Register data frames as Views
df_f_i94_immigrations.createOrReplaceTempView("f_i94_immigrations")
df_d_immigration_countries.createOrReplaceTempView("d_immigration_countries")


# SQL to answer project question 1 (From which country do immigrants come to the U.S. and how many?)
df_pq1 = spark.sql("select f_i94.f_i94_cit as country_id"
                   "     , d_ic.d_ic_country_name as country"
                   "     , count(f_i94.f_i94_count) as immigrants"
                   "     , RANK() OVER (ORDER BY count(f_i94.f_i94_count) desc) Immigrants_rank"
                   "  from f_i94_immigrations f_i94"
                   "  join d_immigration_countries d_ic on d_ic.d_ic_id = f_i94.d_ic_id"
                   " group by f_i94.f_i94_cit"
                   "         ,d_ic.d_ic_country_name"
                   "  order by Immigrants_rank"
                   "")

# Show the top 10 countries immigrants come from and how many
df_pq1.filter(df_pq1.Immigrants_rank < 11).show()

+----------+--------------------+----------+---------------+
|country_id|             country|immigrants|Immigrants_rank|
+----------+--------------------+----------+---------------+
|       135|      UNITED KINGDOM|    815721|              1|
|       245|          CHINA, PRC|    767992|              2|
|       582|MEXICO Air Sea, a...|    684295|              3|
|       213|               INDIA|    544067|              4|
|       209|               JAPAN|    441132|              5|
|       111|              FRANCE|    408438|              6|
|       689|              BRAZIL|    387223|              7|
|       438|           AUSTRALIA|    352852|              8|
|       117|               ITALY|    317084|              9|
|       129|               SPAIN|    281121|             10|
+----------+--------------------+----------+---------------+
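
The query combines a grouped count with `RANK()`. `RANK()` gives tied counts the same rank and leaves a gap afterwards, so a filter such as `Immigrants_rank < 11` can return more than ten rows when counts tie. The following pure-Python sketch reproduces that ranking rule on a tiny, hypothetical list of citizenship codes; the function `rank_by_count` is an illustration under these assumptions, not the notebook's implementation.

```python
from collections import Counter


def rank_by_count(country_ids):
    """Count rows per country and assign SQL RANK()-style ranks."""
    # most_common() sorts by count, descending
    counts = Counter(country_ids).most_common()
    ranked = []
    prev_count, prev_rank = None, 0
    for position, (country_id, count) in enumerate(counts, start=1):
        # ties keep the previous rank, otherwise the rank is the row position
        rank = prev_rank if count == prev_count else position
        ranked.append((country_id, count, rank))
        prev_count, prev_rank = count, rank
    return ranked


# Hypothetical citizenship codes, one per arrival record
sample = [135, 135, 135, 245, 245, 582, 582, 213]
for row in rank_by_count(sample):
    print(row)
# (135, 3, 1)
# (245, 2, 2)
# (582, 2, 2)
# (213, 1, 4)
```

In the real result above no two of the top ten counts tie, so ranks 1 to 10 map to exactly ten countries.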

