# Immigration Data Processing

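Staging script for the immigration data processing Spark jobs: a preprocessing job, followed by a job that builds the fact and dimension tables, followed by a few sanity checks on the output.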

In [33]:
# import libraries
import datetime
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import DateType, StringType

## Preprocessing Spark Job
---

In [3]:
def initialize_spark():
    """
    Gets (or creates) the Spark session used by the preprocessing job

    The session is configured to resolve the saurfang spark-sas7bdat package
    from the spark-packages repository, which provides the
    'com.github.saurfang.sas.spark' data source used to read the SAS files.
    """
    builder = SparkSession.builder.appName("immigration-data-preprocessing")
    # extra repository + package needed to read .sas7bdat files
    builder = builder.config("spark.jars.repositories", "https://repos.spark-packages.org/")
    builder = builder.config("spark.jars.packages", "saurfang:spark-sas7bdat:3.0.0-s_2.12")
    return builder.getOrCreate()

In [4]:
def import_immigration_and_add_labels(spark, imm_fp, mode_labels_fp, port_labels_fp,
                                      visa_labels_fp, cit_labels_fp, cols_required):
    """
    Loads the raw immigration extract, trims it to the requested columns and
    attaches the human-readable labels for the coded fields

    Each label CSV is read with a header row and broadcast-left-joined onto the
    immigration data using the CSV's FIRST column as the join key. That first
    column therefore has to share its name with one of ``cols_required``.

    Params
    ------
    spark: spark instance
        A spark instance that's been initialized
    imm_fp: str
        The location of the immigration data files
    mode_labels_fp: str
        The location of the labels file for i94mode
    port_labels_fp: str
        The location of the labels file for i94port
    visa_labels_fp: str
        The location of the labels file for i94visa
    cit_labels_fp: str
        The location of the labels file for i94cit
    cols_required: list
        The list of columns we require from the immigration data file

    Returns
    -------
    spark dataframe
        The trimmed immigration data with every label file's columns appended
    """
    # raw SAS extract, reduced to only the columns downstream steps use
    raw_sas = spark.read.format('com.github.saurfang.sas.spark').load(imm_fp)
    enriched = raw_sas.select(cols_required)

    # label lookups, in join order: travel mode, port of entry, visa type, citizenship
    lookup_paths = [mode_labels_fp, port_labels_fp, visa_labels_fp, cit_labels_fp]
    lookups = [spark.read.csv(path, header=True) for path in lookup_paths]

    for lookup in lookups:
        join_key = lookup.columns[0]
        # lookups are small, so broadcast them to avoid shuffling the main data
        enriched = enriched.join(F.broadcast(lookup), on=join_key, how="left")

    return enriched

In [5]:
def preprocess_immigration_data(imm, preprocessed_output_fp):
    """
    Preprocess the immigration data by:
        1. Removing records at irrelevant ports or with invalid citizenship countries
        2. Removing records without an arrival date
        3. Converting the SAS arrival date (days since 1960-01-01) into a date
        4. Adjusting the gender column so everything apart from M and F is O for Other
        5. Bucketing the ages into distinct categories
        6. Building a traveller profile id from gender, continent and age category
    and appending the result as parquet partitioned by month and year.

    Every transformation uses built-in Spark column expressions instead of
    Python UDFs, so rows are never serialized out to Python workers.

    Params
    ------
    imm: spark dataframe
        The immigration dataset
    preprocessed_output_fp: str
        The location to save the preprocessed file
    """
    # SAS stores dates as a number of days after this date
    SAS_START_DATE = "1960-01-01"

    # remove the irrelevant ports and invalid citizenship countries from the dataset
    # (rows where either column is null are dropped as well: the comparison is null)
    imm = imm.where(
        (imm["i94port_relevant"] == "RELEVANT") &
        (imm["i94cit_continent"] != "INVALID")
    )

    # clean up arrival date which is the number of days after January 1, 1960.
    # FLOOR keeps the previous behaviour for fractional values (the date part of
    # start + timedelta(days=x)) before date_add, which needs an integer.
    imm = imm.where(imm["arrdate"].isNotNull())
    imm = imm.withColumn(
        "arrdateclean",
        F.expr(f"date_add(to_date('{SAS_START_DATE}'), CAST(FLOOR(arrdate) AS INT))")
    )

    # clean up the gender column: anything other than M/F (including null) becomes O
    imm = imm.withColumn(
        "genderclean",
        F.when(imm["gender"].isin("M", "F"), imm["gender"]).otherwise("O")
    )

    # bucket the age groups into distinct categories.
    # Null, negative or > 120 ages fall through to "Unknown": null comparisons are
    # treated as false by F.when, and Spark orders NaN above every number so
    # NaN <= 120 is false as well.
    age = imm["i94bir"]
    age_category = (
        F.when((age >= 0) & (age < 18), "Below 18")
        .when((age >= 18) & (age < 30), "18 to 29")
        .when((age >= 30) & (age < 40), "30 to 39")
        .when((age >= 40) & (age < 50), "40 to 49")
        .when((age >= 50) & (age < 60), "50 to 59")
        .when((age >= 60) & (age <= 120), "60 and Above")
        .otherwise("Unknown")
    )
    imm = imm.withColumn("agecategory", age_category)

    # create a unique identifier for traveller profiles, e.g. "meur40"
    imm = imm.withColumn(
        "profile_id",
        F.lower(F.concat(
            imm["genderclean"],
            F.substring(imm["i94cit_continent"], 1, 3),
            F.substring(imm["agecategory"], 1, 2)
            )
        )
    )

    # add month and year for partitioning
    imm = imm.withColumn("month", F.month(imm["arrdateclean"]))
    imm = imm.withColumn("year", F.year(imm["arrdateclean"]))

    imm.write.partitionBy("month", "year").parquet(preprocessed_output_fp + "immigration_data", "append")

In [6]:
def preprocessing_main():
    """
    The main function that executes the Spark job that preprocesses the immigration data

    The Spark session is always stopped, even if the job raises.
    """
    # hardcoded variables
    imm_fp = "../data/i94_feb16_sub.sas7bdat"
    mode_labels_fp = "../data/i94mode_labels.csv"
    port_labels_fp = "../data/i94port_labels.csv"
    visa_labels_fp = "../data/i94visa_labels.csv"
    cit_labels_fp = "../data/i94cit_labels.csv"
    cols_required = [
        "i94cit", "i94port", "arrdate",
        "i94mode", "i94visa", "i94bir", "gender"
    ]
    preprocessed_output_fp = "../data/preprocessed_files/"

    # run the preprocessing spark job
    spark = initialize_spark()
    try:
        imm = import_immigration_and_add_labels(
            spark, imm_fp, mode_labels_fp, port_labels_fp,
            visa_labels_fp, cit_labels_fp, cols_required
        )

        # writes its output to disk; there is no return value to keep
        preprocess_immigration_data(imm, preprocessed_output_fp)
    finally:
        # release the session (and its resources) even when the job fails
        spark.stop()

In [7]:
# run the preprocessing job defined above (appends to ../data/preprocessed_files/)
preprocessing_main()

22/04/07 00:35:17 WARN Utils: Your hostname, pop-os resolves to a loopback address: 127.0.1.1; using 192.168.1.9 instead (on interface wlo1)
22/04/07 00:35:18 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
https://repos.spark-packages.org/ added as a remote repository with the name: repo-1
Ivy Default Cache set to: /home/stefanjaro/.ivy2/cache
The jars for the packages stored in: /home/stefanjaro/.ivy2/jars


:: loading settings :: url = jar:file:/home/stefanjaro/miniconda3/envs/analytics/lib/python3.7/site-packages/pyspark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


saurfang#spark-sas7bdat added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-e3764784-e066-4025-8148-e5e2ee335425;1.0
	confs: [default]
	found saurfang#spark-sas7bdat;3.0.0-s_2.12 in spark-packages
	found com.epam#parso;2.0.11 in central
	found org.slf4j#slf4j-api;1.7.5 in central
	found org.apache.logging.log4j#log4j-api-scala_2.12;12.0 in central
	found org.scala-lang#scala-reflect;2.12.10 in central
	found org.apache.logging.log4j#log4j-api;2.13.2 in central
:: resolution report :: resolve 188ms :: artifacts dl 11ms
	:: modules in use:
	com.epam#parso;2.0.11 from central in [default]
	org.apache.logging.log4j#log4j-api;2.13.2 from central in [default]
	org.apache.logging.log4j#log4j-api-scala_2.12;12.0 from central in [default]
	org.scala-lang#scala-reflect;2.12.10 from central in [default]
	org.slf4j#slf4j-api;1.7.5 from central in [default]
	saurfang#spark-sas7bdat;3.0.0-s_2.12 from spark-packages in [default]
	-----------------------------------

# Fact and Dimension Creation Spark Job
---

In [20]:
def initialize_spark():
    """
    Gets (or creates) the Spark session used by the fact and dimension job

    Unlike the preprocessing session, no extra packages are configured because
    this job only reads and writes parquet.
    """
    app_name = "immigration-fact-and-dimension-creation"
    return SparkSession.builder.appName(app_name).getOrCreate()

In [21]:
def create_immigration_fact(spark, preprocessed_file_path, output_file_path):
    """
    Creates the immigration fact table

    One row per (state, traveller profile, arrival date) with the total number
    of travellers and breakdowns by transportation mode and purpose of visit.
    Everything is computed in a single grouped aggregation. The breakdown
    columns come from an explicit list of label values, so the output schema is
    the same on every run regardless of which labels occur in the data. Travellers
    whose mode or visa label is not listed (e.g. "Not Reported") still count
    towards all_travellers.

    Params
    ------
    spark: spark instance
        A spark instance that's been initialized
    preprocessed_file_path: str
        The location of the preprocessed immigration data
    output_file_path: str
        The location to store the final fact/dimension data
    """
    # read in the immigration data
    imm = spark.read.parquet(preprocessed_file_path)

    # index columns (the grain of the fact table)
    index_cols = ["i94port_state", "profile_id", "arrdateclean"]

    # label value -> output column name for each breakdown
    mode_columns = {
        "Air": "air_travellers",
        "Land": "land_travellers",
        "Sea": "sea_travellers",
    }
    visa_columns = {
        "Business": "business_travellers",
        "Pleasure": "pleasure_travellers",
        "Student": "student_travellers",
    }

    def count_matching(label_col, value):
        # F.when yields null when the condition fails and count() skips nulls,
        # so this counts only the rows carrying the given label (0 if none)
        return F.count(F.when(imm[label_col] == value, 1))

    aggregations = [F.count(F.lit(1)).alias("all_travellers")]
    aggregations += [
        count_matching("i94mode_label", label).alias(name)
        for label, name in mode_columns.items()
    ]
    aggregations += [
        count_matching("i94visa_label", label).alias(name)
        for label, name in visa_columns.items()
    ]

    # single pass: replaces three group-bys, two pivots and three joins. Grouping
    # also keeps rows whose keys are null, whereas a join on null keys never
    # matches (those rows previously ended up with all_travellers = 0)
    fact_imm = (
        imm.groupBy(index_cols)
        .agg(*aggregations)
        .withColumnRenamed("i94port_state", "state_id")
        .withColumnRenamed("arrdateclean", "arrival_date")
    )

    # add a record id column.
    # NOTE(review): monotonically_increasing_id is unique within this write only;
    # ids restart on every run, so appended runs may reuse ids - confirm this is OK
    fact_imm = fact_imm.withColumn("record_id", F.monotonically_increasing_id())

    # add a month and year for partitioning
    fact_imm = fact_imm.withColumn("month", F.month(fact_imm["arrival_date"]))
    fact_imm = fact_imm.withColumn("year", F.year(fact_imm["arrival_date"]))

    # write to parquet files
    fact_imm.write.partitionBy("month", "year").parquet(output_file_path + "fact_immigration/", "append")

In [22]:
def create_traveller_profile_dimension(spark, preprocessed_file_path, output_file_path):
    """
    Builds the traveller profile dimension table and appends it as parquet

    Every distinct combination of profile id, gender, age category, citizenship
    continent and citizenship global region becomes one row, with the columns
    renamed to their output names.

    NOTE(review): profile_id is built from gender, continent and age category
    only, while this table also carries the global region, and the same continent
    can appear with more than one global region. So one profile_id may occur on
    several rows here. Confirm whether profile_id is meant to be a unique key.

    Params
    ------
    spark: spark instance
        A spark instance that's been initialized
    preprocessed_file_path: str
        The location of the preprocessed immigration data
    output_file_path: str
        The location to store the final fact/dimension data
    """
    # (source column, output column) in output order
    column_map = [
        ("profile_id", "profile_id"),
        ("genderclean", "gender"),
        ("agecategory", "age_category"),
        ("i94cit_continent", "citizen_region"),
        ("i94cit_global_region", "citizen_global_region"),
    ]

    preprocessed = spark.read.parquet(preprocessed_file_path)
    profiles = preprocessed.select(
        [preprocessed[source].alias(target) for source, target in column_map]
    ).dropDuplicates()

    profiles.write.parquet(output_file_path + "dim_traveller_profile/", "append")

In [23]:
def create_time_dimension(spark, preprocessed_file_path, output_file_path):
    """
    Builds the time dimension table from the distinct arrival dates and
    appends it as parquet partitioned by month and year

    Output columns: timestamp (the arrival date), day (day of month), month,
    year, day_of_week (Spark dayofweek: 1 = Sunday ... 7 = Saturday) and
    month_year (e.g. "2-2016").

    Params
    ------
    spark: spark instance
        A spark instance that's been initialized
    preprocessed_file_path: str
        The location of the preprocessed immigration data
    output_file_path: str
        The location to store the final fact/dimension data
    """
    arrivals = spark.read.parquet(preprocessed_file_path)
    arrival_date = F.col("arrdateclean")

    calendar = (
        arrivals.select("arrdateclean")
        .dropDuplicates()
        .select(
            arrival_date.alias("timestamp"),
            F.dayofmonth(arrival_date).alias("day"),
            F.month(arrival_date).alias("month"),
            F.year(arrival_date).alias("year"),
            F.dayofweek(arrival_date).alias("day_of_week"),
            F.concat(F.month(arrival_date), F.lit("-"), F.year(arrival_date)).alias("month_year"),
        )
    )

    calendar.write.partitionBy("month", "year").parquet(output_file_path + "dim_time/", "append")

In [24]:
def fact_and_dimension_creation_main():
    """
    The main function that runs the Spark job
    to create the fact and dimension tables off the immigration data

    The Spark session is always stopped, even if one of the table builds raises.
    """
    # hardcoded variables
    preprocessed_file_path = "../data/preprocessed_files/immigration_data/"
    output_file_path = "../data/output_files/"

    # run the spark job
    spark = initialize_spark()
    try:
        create_traveller_profile_dimension(spark, preprocessed_file_path, output_file_path)
        create_immigration_fact(spark, preprocessed_file_path, output_file_path)
        create_time_dimension(spark, preprocessed_file_path, output_file_path)
    finally:
        # release the session (and its resources) even when a build fails
        spark.stop()

In [25]:
# run the fact and dimension job defined above (appends to ../data/output_files/)
fact_and_dimension_creation_main()

                                                                                

# Testing
---

In [26]:
# get (or create) a session for inspecting the outputs; the jobs above stop theirs
spark = SparkSession.builder.appName("immigration-fact-and-dimension-creation").getOrCreate()

In [27]:
# load the preprocessed staging data plus every output table, in that order
imm_prepoc, fact_imm, dim_time, dim_traveller_profile = (
    spark.read.parquet(location)
    for location in (
        "../data/preprocessed_files/immigration_data/",
        "../data/output_files/fact_immigration",
        "../data/output_files/dim_time",
        "../data/output_files/dim_traveller_profile",
    )
)

In [28]:
# total rows in the preprocessed data (imm_prepoc); the fact table check below should match it
imm_prepoc.count()

2158058

In [29]:
# preview the first 5 rows of the fact table as a pandas DataFrame
fact_imm.limit(5).toPandas()

Unnamed: 0,state_id,profile_id,arrival_date,all_travellers,air_travellers,land_travellers,sea_travellers,business_travellers,pleasure_travellers,student_travellers,record_id,month,year
0,IL,meur18,2016-02-20,121,121,0,0,38,83,0,0,2,2016
1,NJ,meur40,2016-02-13,375,375,0,0,59,315,1,1,2,2016
2,IL,meur18,2016-02-19,83,83,0,0,15,68,0,2,2,2016
3,GA,feur50,2016-02-19,51,51,0,0,4,47,0,3,2,2016
4,FL,feur50,2016-02-11,390,386,0,4,13,377,0,4,2,2016


In [30]:
# total number of travellers (should match the preprocessed row count above,
# since every preprocessed record is one traveller)
fact_imm.agg({"all_travellers": "sum"}).collect()[0]

Row(sum(all_travellers)=2158058)

In [31]:
# preview the first 10 rows of the time dimension as a pandas DataFrame
dim_time.limit(10).toPandas()

Unnamed: 0,timestamp,day,day_of_week,month_year,month,year
0,2016-02-04,4,5,2-2016,2,2016
1,2016-02-08,8,2,2-2016,2,2016
2,2016-02-03,3,4,2-2016,2,2016
3,2016-02-22,22,2,2-2016,2,2016
4,2016-02-11,11,5,2-2016,2,2016
5,2016-02-09,9,3,2-2016,2,2016
6,2016-02-15,15,2,2-2016,2,2016
7,2016-02-18,18,5,2-2016,2,2016
8,2016-02-12,12,6,2-2016,2,2016
9,2016-02-19,19,6,2-2016,2,2016


In [32]:
# preview the first 10 rows of the traveller profile dimension as a pandas DataFrame
dim_traveller_profile.limit(10).toPandas()

Unnamed: 0,profile_id,gender,age_category,citizen_region,citizen_global_region
0,mnorbe,M,Below 18,NORTH AMERICA,SOUTH
1,fsou60,F,60 and Above,SOUTH AMERICA,SOUTH
2,oasibe,O,Below 18,ASIA,SOUTH
3,meur40,M,40 to 49,EUROPE,NORTH
4,fafr40,F,40 to 49,AFRICA,SOUTH
5,oasi60,O,60 and Above,ASIA,NORTH
6,foce50,F,50 to 59,OCEANIA,NORTH
7,foce60,F,60 and Above,OCEANIA,NORTH
8,feur50,F,50 to 59,EUROPE,NORTH
9,oasi30,O,30 to 39,ASIA,SOUTH
