# ETL Development

- Base template for `etl.py`
- Documents cleaning steps
- Each cleaning step is tested on the i94 dataset

In [1]:
from datetime import datetime, timedelta

import configparser
import os
from pprint import pprint

import pandas as pd, numpy as np
import matplotlib.pyplot as plt

import findspark
findspark.init()
print(findspark.find())
# Show which Spark / Java / Hadoop installations the kernel picked up.
# os.environ.get prints None for an unset variable instead of raising KeyError,
# so this purely diagnostic output no longer aborts the whole cell.
for env_var in ('SPARK_HOME', 'JAVA_HOME', 'HADOOP_HOME'):
    print(os.environ.get(env_var))

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
# NOTE(review): importing `min` and `max` here shadows Python's built-in min/max
# in every later cell; they are not used in the cells below, so prefer
# `from pyspark.sql import functions as F` if they are ever needed.
from pyspark.sql.functions import col, asc, desc, min, max, coalesce, lit
from pyspark.sql.types import *  # StructType, StructField, IntegerType, StringType, DoubleType, DateType, ...

pd.set_option('display.max_rows', 50)

c:\spark
c:\spark
C:\Program Files\Zulu\zulu-8-jre\
c:\Hadoop


## Configs

In [3]:
#def create_spark_session(local=True):
#    """
#    Creates and returns spark session.
#    """
#    spark = SparkSession \
#        .builder \
#        .config("spark.jars.packages", "saurfang:spark-sas7bdat:3.0.0-s_2.12,org.apache.hadoop:hadoop-aws:2.7.6") \
#        .config("fs.s3a.access.key", KEY) \
#        .config("fs.s3a.secret.key", SECRET) \
#        .config("fs.s3a.endpoint", f"s3-{REGION}.amazonaws.com") \
#        .enableHiveSupport() \
#        .getOrCreate()
#    
#    #spark.conf.set("mapreduce.fileoutputcommitter.algorithm.version", "2")
#    return spark
#
## create spark session
#spark = create_spark_session()

In [4]:
def create_spark_session(local=True):
    """
    Build (or reuse) the SparkSession used throughout this notebook.

    The session loads the saurfang spark-sas7bdat package, which provides the
    'com.github.saurfang.sas.spark' reader used for the i94 .sas7bdat file,
    and enables Hive support.

    Args:
        local: currently unused; kept so the signature matches the
            commented-out S3 variant above.

    Returns:
        The SparkSession returned by ``getOrCreate()``.
    """
    builder = SparkSession.builder
    builder = builder.config("spark.jars.packages", "saurfang:spark-sas7bdat:3.0.0-s_2.12")
    builder = builder.enableHiveSupport()
    session = builder.getOrCreate()

    # session.conf.set("mapreduce.fileoutputcommitter.algorithm.version", "2")
    return session

# create spark session
spark = create_spark_session()

## Read I94

In [5]:
# define schema
# https://knowledge.udacity.com/questions/316417 - I94cit and I94 res clarification
# Every field is nullable; fields are listed in the order they appear in the file.

i94_schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in [
        ("cicid", IntegerType()),      # id
        ("i94yr", IntegerType()),      # year
        ("i94mon", IntegerType()),     # month
        ("i94cit", IntegerType()),     # country code - I94CIT is the country of citizenship
        ("i94res", IntegerType()),     # country code - I94RES is the country of residence
        ("i94port", StringType()),     # port code, e.g. 'DTH' = 'DUTCH HARBOR, AK'
        ("arrdate", IntegerType()),    # ARRDATE - arrival date in the USA (SAS date numeric field)
        ("i94mode", IntegerType()),    # 1 = Air, 2 = Sea, 3 = Land, 9 = Not reported
        ("i94addr", StringType()),     # state, e.g. FL
        ("depdate", IntegerType()),    # departure date (SAS date numeric field)
        ("i94bir", IntegerType()),     # age of respondent in years
        ("i94visa", IntegerType()),    # Business, Pleasure, Student
        ("count", IntegerType()),      # COUNT - used for summary statistics
        ("dtadfile", StringType()),    # DTADFILE - character date field - date added to I-94 files - CIC does not use
        ("visapost", StringType()),    # VISAPOST - Department of State where the visa was issued - CIC does not use
        ("occup", StringType()),       # OCCUP - occupation that will be performed in U.S. - CIC does not use
        ("entdepa", StringType()),     # ENTDEPA - arrival flag - admitted or paroled into the U.S. - CIC does not use
        ("entdepd", StringType()),     # ENTDEPD - departure flag - departed, lost I-94 or is deceased - CIC does not use
        ("entdepu", StringType()),     # ENTDEPU - update flag - apprehended, overstayed, adjusted to perm residence - CIC does not use
        ("matflag", StringType()),     # MATFLAG - match flag - match of arrival and departure records
        ("biryear", IntegerType()),    # BIRYEAR - 4 digit year of birth
        ("dtaddto", StringType()),     # DTADDTO - character date field - date admitted until (allowed to stay until) - CIC does not use
        ("gender", StringType()),      # GENDER - non-immigrant sex
        ("insnum", StringType()),      # INSNUM - INS number
        ("airline", StringType()),     # AIRLINE - airline used to arrive in U.S.
        ("admnum", DoubleType()),      # ADMNUM - admission number
        ("fltno", StringType()),       # FLTNO - flight number of airline used to arrive in U.S.
        ("visatype", StringType()),    # VISATYPE - class of admission legally admitting the non-immigrant to temporarily stay in U.S.
    ]
])

In [6]:
# read the April 2016 i94 extract, applying the explicit i94_schema
input_data = '../../staging/i94/i94_apr16_sub.sas7bdat'

df_spark = (
    spark.read
    .format('com.github.saurfang.sas.spark')
    .schema(i94_schema)
    .load(input_data)
)

print(df_spark.count())

3096313


In [7]:
df_spark.show(n=5)  # peek at the raw i94 rows

+-----+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|cicid|i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|        admnum|fltno|visatype|
+-----+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|    6| 2016|     4|   692|   692|    XXX|  20573|   null|   null|   null|    37|      2|    1|    null|    null| null|      T|   null|      U|   null|   1979|10282016|  null|  null|   null| 1.897628485E9| null|      B2|
|    7| 2016|     4|   254|   276|    ATL|  20551|      1|     AL|   null|    25|      3|    1|20130811|     SEO| nu

## Datetime conversions

In [12]:
# https://knowledge.udacity.com/questions/66798

# convert SAS date to date
def convert_sas_date(x):
    """
    Convert a SAS date number (days since 1960-01-01) to a ``datetime``.

    Args:
        x: days since 1960-01-01; anything ``int()`` accepts
           (int, float, numeric string).

    Returns:
        ``datetime`` at midnight of that day, or ``None`` when ``x`` is
        ``None``/non-numeric or the resulting date is out of range.
    """
    try:
        return datetime(1960, 1, 1) + timedelta(days=int(x))
    except (TypeError, ValueError, OverflowError):
        # Missing or malformed values become None. Only data errors are caught,
        # so KeyboardInterrupt/SystemExit are no longer swallowed.
        return None
    
# register udf: SAS date number -> Spark DateType
# (bind the function directly; the lambda wrapper added nothing and hid the UDF name)
udf_date_from_sas = udf(convert_sas_date, DateType())

In [9]:
# convert string format to date

def convert_str_to_date(x):
    """
    Parse a ``YYYYMMDD`` string (e.g. ``"20130811"``) into a ``datetime``.

    Returns:
        The parsed ``datetime``, or ``None`` when ``x`` is ``None``, not a
        string, or not a valid ``YYYYMMDD`` date.
    """
    try:
        return datetime.strptime(x, "%Y%m%d")
    except (TypeError, ValueError):
        # TypeError: None / non-str input; ValueError: wrong format or invalid date
        return None

# register udf: 'YYYYMMDD' string -> Spark DateType
# (bind the function directly; the lambda wrapper added nothing and hid the UDF name)
udf_date_from_str = udf(convert_str_to_date, DateType())

In [10]:
# add date columns: arrdate/depdate are SAS day numbers, dtadfile is a YYYYMMDD string
df_spark = (
    df_spark
    .withColumn("arrival_date", udf_date_from_sas("arrdate"))
    .withColumn("departure_date", udf_date_from_sas("depdate"))
    .withColumn("dtadfile_date", udf_date_from_str("dtadfile"))
)

In [11]:
df_spark.show(n=5)  # verify the three new date columns

+-----+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+------------+--------------+-------------+
|cicid|i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|        admnum|fltno|visatype|arrival_date|departure_date|dtadfile_date|
+-----+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+------------+--------------+-------------+
|    6| 2016|     4|   692|   692|    XXX|  20573|   null|   null|   null|    37|      2|    1|    null|    null| null|      T|   null|      U|   null|   1979|10282016|  null|  null|   null| 1.897628485E9| null|

## i94cit/res number to iso-code mapping
- Read the country mapping table
- Left-join it on i94cit and i94res to look up ISO codes (unmatched codes become 99 = unknown)
- Check the mapped columns

In [13]:
# read country mapping table: i94 numeric country code -> description -> ISO code
country_map_schema = StructType([
    StructField("i94_code", IntegerType(), nullable=False),
    StructField("i94_desc", StringType(), nullable=False),
    StructField("iso_code", StringType(), nullable=False),
])

df_con = (
    spark.read
    .option("sep", ";")
    .option("header", True)
    .schema(country_map_schema)
    .csv("../staging/countries_mapping.csv")
)

In [14]:
df_con.show(n=5)  # sanity-check the mapping table

+--------+-----------+--------+
|i94_code|   i94_desc|iso_code|
+--------+-----------+--------+
|     236|AFGHANISTAN|      AF|
|     101|    ALBANIA|      AL|
|     316|    ALGERIA|      DZ|
|     102|    ANDORRA|      AD|
|     324|     ANGOLA|      AO|
+--------+-----------+--------+
only showing top 5 rows



In [15]:
# Look up the ISO code for the country of citizenship (i94cit) -> i94cit_id.
# Codes without a match in the mapping table get the dummy value 99 = unknown.
joinExpr = [df_spark.i94cit == df_con.i94_code]
df_spark_con = (
    df_spark
    .join(df_con.select("i94_code", "iso_code"), joinExpr, "left_outer")
    .withColumn("i94cit_id", coalesce(col("iso_code"), lit(99)))
    .drop("i94_code", "iso_code")
)

In [16]:
df_spark_con.select(col("i94cit"), col("i94cit_id")).show(n=5)

+------+---------+
|i94cit|i94cit_id|
+------+---------+
|   692|       EC|
|   254|       99|
|   101|       AL|
|   101|       AL|
|   101|       AL|
+------+---------+
only showing top 5 rows



In [17]:
# Look up the ISO code for the country of residence (i94res) -> i94res_id.
# Codes without a match in the mapping table get the dummy value 99 = unknown.
# The condition references df_spark_con, the actual left side of this join.
joinExpr = [df_spark_con.i94res == df_con.i94_code]
df_spark_con = (
    df_spark_con
    .join(df_con.select("i94_code", "iso_code"), joinExpr, "left_outer")
    .withColumn("i94res_id", coalesce(col("iso_code"), lit(99)))
    .drop("i94_code", "iso_code")
)

In [18]:
df_spark_con.select(col("i94res"), col("i94res_id")).show(n=5)

+------+---------+
|i94res|i94res_id|
+------+---------+
|   692|       EC|
|   276|       KR|
|   101|       AL|
|   101|       AL|
|   101|       AL|
+------+---------+
only showing top 5 rows



In [19]:
df_spark_con.select("i94cit", "i94cit_id", "i94res", "i94res_id").show(n=50)  # both mappings side by side

+------+---------+------+---------+
|i94cit|i94cit_id|i94res|i94res_id|
+------+---------+------+---------+
|   692|       EC|   692|       EC|
|   254|       99|   276|       KR|
|   101|       AL|   101|       AL|
|   101|       AL|   101|       AL|
|   101|       AL|   101|       AL|
|   101|       AL|   101|       AL|
|   101|       AL|   101|       AL|
|   101|       AL|   101|       AL|
|   101|       AL|   101|       AL|
|   101|       AL|   101|       AL|
|   101|       AL|   101|       AL|
|   101|       AL|   101|       AL|
|   101|       AL|   101|       AL|
|   101|       AL|   101|       AL|
|   101|       AL|   101|       AL|
|   101|       AL|   101|       AL|
|   101|       AL|   101|       AL|
|   101|       AL|   101|       AL|
|   101|       AL|   101|       AL|
|   101|       AL|   101|       AL|
|   101|       AL|   101|       AL|
|   101|       AL|   101|       AL|
|   101|       AL|   101|       AL|
|   101|       AL|   101|       AL|
|   101|       AL|   101|   

- transformation looks fine: known codes map to ISO codes, and unmatched codes (e.g. i94cit 254) fall back to 99

##### Alternative mapping approaches (kept for reference): they do not work on AWS EMR because pandas is not installed there

In [20]:
## select relevant mapping columns and convert to list
#i94_country_code_to_iso = df_con.select("i94_code", "iso_code").toPandas().values.tolist()
#
## convert to dictionary and create broadcast variable
#country_map = spark.sparkContext.broadcast(dict(i94_country_code_to_iso))
#
## mapping function
#get_country_isocode = lambda x: country_map.value.get(x, 99)
#
## register udf
#country_number_to_code = udf(get_country_isocode, StringType())

In [21]:
## read country mapping table
#df_con = pd.read_csv("./data/countries_mapping.csv", sep=";", header=0, index_col=0)
#
## select relevant mapping columns and convert to list
#i94_country_code_to_iso = df_con[['i94_code','iso_code']].values.tolist()
#
## convert to dictionary and create broadcast variable
#country_map = spark.sparkContext.broadcast(dict(i94_country_code_to_iso))
#
## mapping function
#get_country_isocode = lambda x: country_map.value.get(x, 99)
#
## register udf
#country_number_to_code = udf(get_country_isocode, StringType())

In [22]:
# perform mapping and add columns

#df_spark =df_spark\
#            .withColumn("94cit_id", country_number_to_code("i94cit"))\
#            .withColumn("94res_id", country_number_to_code("i94res"))\
#
#df_spark.select("i94cit","94cit_id","i94res","94res_id").show(25)

## Visatype to visa_id mapping
- Read the visa_categories dimension / mapping table
- Left-join it on visatype to look up visa_id (unmatched visa types become 1 = Unknown)
- Check the mapped column

In [23]:
# carry the frame with i94cit_id / i94res_id forward as the working frame
df_spark = df_spark_con

In [24]:
# read visa categories
# NOTE(review): the df_visa.show() output below lists visa_code / visa_code_map
# columns, so it was produced by an older version of this schema; re-run to refresh.
visa_cat_schema = StructType([
    StructField("visa_category", StringType(), nullable=False),
    StructField("visa_group", StringType(), nullable=False),
    StructField("visa_desc", StringType(), nullable=False),
    StructField("visa", StringType(), nullable=True),  # presumably the code matched against i94 `visatype` - confirm
    StructField("visa_id", IntegerType(), nullable=False),
])

df_visa = (
    spark.read
    .option("sep", ";")
    .option("header", True)
    .schema(visa_cat_schema)
    .csv("../staging/visa_categories.csv")
)

In [25]:
df_visa.show(n=25)

+--------------------+--------------------+--------------------+---------+-------------+-------+
|       visa_category|          visa_group|           visa_desc|visa_code|visa_code_map|visa_id|
+--------------------+--------------------+--------------------+---------+-------------+-------+
|Unknown Visa Cate...|        Unknown Visa|             Unknown|     null|         null|      1|
|Immigrant Visa Ca...|Immediate Relativ...|Spouse of a U.S. ...|      IR1|          IR1|      2|
|Immigrant Visa Ca...|Immediate Relativ...|Spouse of a U.S. ...|      CR1|          CR1|      3|
|Immigrant Visa Ca...|Immediate Relativ...|Spouse of a U.S. ...|    K-3 *|           K3|      4|
|Immigrant Visa Ca...|Immediate Relativ...|Fiancé(e) to marr...|    K-1 *|           K1|      5|
|Immigrant Visa Ca...|Immediate Relativ...|Intercountry Adop...|      IR3|          IR3|      6|
|Immigrant Visa Ca...|Immediate Relativ...|Intercountry Adop...|      IH3|          IH3|      7|
|Immigrant Visa Ca...|Immediat

In [26]:
# join visatype and the visa code to get visa_id
# if no value found set to dummy value 1 = Unknown
# visa_cat_schema defines no `visa_code_map` column and only `visa`/`visa_id` are
# selected, so the join key is `visa` (which is also the column dropped afterwards).
# dropna() removes lookup rows with a null visa or visa_id so they can never match.
joinExpr = [df_spark.visatype == df_visa.visa]
df_spark_visa = (
    df_spark
    .join(df_visa.select("visa", "visa_id").dropna(), joinExpr, "left_outer")
    .withColumn("visa_id", coalesce(col("visa_id"), lit(1)))
    .drop("visa")
)

In [27]:
df_spark_visa.select(col("visatype"), col("visa_id")).show(n=25)

+--------+-------+
|visatype|visa_id|
+--------+-------+
|      B2|     57|
|      F1|     13|
|      B2|     57|
|      B2|     57|
|      B2|     57|
|      B1|     33|
|      B2|     57|
|      B2|     57|
|      B2|     57|
|      B1|     33|
|      B2|     57|
|      B2|     57|
|      B1|     33|
|      B1|     33|
|      B2|     57|
|      B2|     57|
|      B2|     57|
|      B2|     57|
|      B2|     57|
|      B2|     57|
|      B2|     57|
|      B2|     57|
|      B2|     57|
|      B2|     57|
|      B1|     33|
+--------+-------+
only showing top 25 rows



- transformation looks fine: e.g. B2 → 57, B1 → 33, F1 → 13

In [28]:
#df_spark_visa.select("visatype","visa_id").groupBy("visatype","visa_id").count().show()

##### Alternative mapping approaches (kept for reference): they do not work on AWS EMR because pandas is not installed there

In [29]:
## select relevant mapping columns and convert to list
#i94_visa_code_to_id = df_visa.select("visa_code_map", "visa_id").toPandas().values.tolist()
#
## convert to dictionary and create broadcast variable
#visa_map = spark.sparkContext.broadcast(dict(i94_visa_code_to_id))
#
## mapping function
#get_visa_id = lambda x: int(visa_map.value.get(x, 1))
#
### register udf
#visa_code_to_id = udf(get_visa_id, IntegerType())

In [30]:
#i94_visa_code_to_id

In [31]:
## read visa categories
#
#df_visa = pd.read_csv("./data/visa_categories.csv", sep=";", header=0)
#
## select relevant mapping columns and convert to list
#i94_visa_code_to_id = df_visa[['visa_code_map','visa_id']].values.tolist()
#
## convert to dictionary and create broadcast variable
#visa_map = spark.sparkContext.broadcast(dict(i94_visa_code_to_id))
#
## mapping function
#get_visa_id = lambda x: visa_map.value.get(x, 1)
#
## register udf
#visa_code_to_id = udf(get_visa_id, IntegerType())

In [32]:
#df_spark = df_spark.withColumn("visa_id", visa_code_to_id("visatype"))
#
#df_spark.select("visatype","visa_id").show(5)

## Clean i94mode

- fill nulls with 9 = 'Not Reported'

In [33]:
# value i94mode
#	1 = 'Air'
#	2 = 'Sea'
#	3 = 'Land'
#	9 = 'Not reported' ;

In [34]:
# Carry the visa-mapped frame forward, mirroring `df_spark = df_spark_con` at the
# start of the visa section. Without this, visa_id never reaches the cleaning
# steps below or the parquet output.
df_spark = df_spark_visa

# distribution of i94mode before cleaning (nulls still present)
df_spark.groupBy("i94mode").count().show()

+-------+-------+
|i94mode|  count|
+-------+-------+
|   null|    239|
|      1|2994505|
|      3|  66660|
|      9|   8560|
|      2|  26349|
+-------+-------+



In [35]:
# null i94mode -> 9 = 'Not reported', as the data dictionary suggests
df_spark = df_spark.na.fill({"i94mode": 9})

In [36]:
df_spark.groupBy("i94mode").count().show()  # there should be no null bucket any more

+-------+-------+
|i94mode|  count|
+-------+-------+
|      1|2994505|
|      3|  66660|
|      9|   8799|
|      2|  26349|
+-------+-------+



- all i94mode values are now filled: the 239 nulls moved into 9 (8560 + 239 = 8799)

## Clean Age
- negative ages in i94bir are replaced with null in a new `age` column

In [37]:
df_spark.filter(col("i94bir").isNull()).count()  # rows with missing age

802

In [38]:
df_spark.select("i94bir").describe().show()  # summary stats for the raw age column

+-------+------------------+
|summary|            i94bir|
+-------+------------------+
|  count|           3095511|
|   mean|41.767614458485205|
| stddev|17.420260534588213|
|    min|                -3|
|    max|               114|
+-------+------------------+



In [39]:
def clean_negative_age(x):
    """
    Return the age unchanged, or ``None`` when it is missing or negative.

    Args:
        x: age in years (i94bir), or ``None``.

    Returns:
        ``x`` if it is a non-negative value, otherwise ``None``.
    """
    # `is None` is the idiomatic identity check; `== None` relies on __eq__
    if x is None or x < 0:
        return None
    return x
    
# register udf: negative / missing age -> null
# (bind the function directly; the lambda wrapper added nothing and hid the UDF name)
clean_age = udf(clean_negative_age, IntegerType())

In [40]:
# perform cleaning: negative i94bir values become null in the new `age` column
df_spark = df_spark.withColumn("age", clean_age(col("i94bir")))

df_spark.describe("age").show()  # min should no longer be negative

+-------+------------------+
|summary|               age|
+-------+------------------+
|  count|           3095510|
|   mean|  41.7676289205979|
| stddev|17.420244765569194|
|    min|                 0|
|    max|               114|
+-------+------------------+



- The negative age was removed: min is now 0 and the non-null count dropped by one (3095511 → 3095510)

## Write to parquet (test only)

In [41]:
# check schema: column names, types and nullability of the final frame before writing
df_spark.printSchema()

root
 |-- cicid: integer (nullable = true)
 |-- i94yr: integer (nullable = true)
 |-- i94mon: integer (nullable = true)
 |-- i94cit: integer (nullable = true)
 |-- i94res: integer (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: integer (nullable = true)
 |-- i94mode: integer (nullable = false)
 |-- i94addr: string (nullable = true)
 |-- depdate: integer (nullable = true)
 |-- i94bir: integer (nullable = true)
 |-- i94visa: integer (nullable = true)
 |-- count: integer (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: integer (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double

In [8]:
# write to parquet (test only: first 250 rows, one file per partition)
# Partition by year and month, as the notes below describe: each new monthly i94
# extract lands in its own i94yr=/i94mon= folder (production appends instead of
# overwriting), and queries on recent months can prune the other partitions.
output_data = "../model/"
(
    df_spark
    .limit(250)
    .repartition(1)
    .write
    .partitionBy("i94yr", "i94mon")
    .parquet(output_data + "i94.parquet", mode="overwrite")
)

- i94 data is provided as a monthly dataset
- this way each additional month can be added consistently (in production the write mode is append)
- queries will most likely target the most recent months, so this also helps performance