In [1]:
%%configure
{ 
    "name": "data-exploration",
    "conf": {
        "spark.driver.memory": "4G",
        "spark.jars.packages": "saurfang:spark-sas7bdat:2.1.0-s_2.11", 
        "spark.driver.extraJavaOptions" : "-Dlog4jspark.root.logger=DEBUG,console" 
  }
}

In [29]:
import datetime
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, DateType, BooleanType


@udf(DateType())
def get_date_from_int(x):
    """Turn a numeric day offset into a date.

    The offset is interpreted as a number of days added to 1960-01-01.
    ``None`` is passed through unchanged, and a negative offset yields the
    placeholder date 1900-01-01.
    """
    if x is None:
        return x

    invalid_marker = datetime.datetime(1900, 1, 1)  # default date for invalid dates
    base_day = datetime.datetime(1960, 1, 1)
    return invalid_marker if x < 0 else base_day + datetime.timedelta(x)


@udf(StringType())
def normalize_gender(x):
    """Collapse gender values to "M", "F" or "OTHER".

    ``None`` stays ``None``; "M" and "F" are returned as-is; every other
    value is reported as "OTHER".
    """
    if x is None:
        return None
    return x if x in {"M", "F"} else "OTHER"


@udf(BooleanType())
def normalize_match_flag(x):
    """Return True when a match flag value is present, False when it is null.

    The flag's actual content is not inspected: any non-null value counts
    as a match.
    """
    # Identity comparison is the correct way to test for None, and the
    # comparison already yields the boolean we want to return.
    return x is not None


@udf(StringType())
def normalize_mode(x):
    """Translate a numeric travel-mode code into a label.

    1 -> "Air", 2 -> "Sea", 3 -> "Land". ``None`` stays ``None`` and any
    other value becomes "Other".
    """
    known_modes = ((1, "Air"), (2, "Sea"), (3, "Land"))
    for code, label in known_modes:
        if x == code:
            return label
    return None if x is None else "Other"


@udf(StringType())
def normalize_visa_category(x):
    """Translate a numeric visa-category code into a label.

    1 -> "Business", 2 -> "Pleasure", 3 -> "Student". ``None`` stays
    ``None`` and any other value becomes "Other".
    """
    known_categories = ((1, "Business"), (2, "Pleasure"), (3, "Student"))
    for code, label in known_categories:
        if x == code:
            return label
    return None if x is None else "Other"


An error was encountered:
Invalid status code '404' from http://localhost:8998/sessions/1 with error payload: {"msg":"Session '1' not found."}


In [20]:
from urllib.parse import urlparse
import logging
import os
import boto3
from pyspark.sql import SparkSession

from pyspark.sql import functions as F
from pyspark.sql import Window
from pyspark.sql.types import IntegerType, FloatType, LongType


# Columns selected from every raw SAS file when it is read.
VALID_COLUMNS = [
    "cicid", "i94yr", "i94mon", "i94cit", "i94res", "i94port", "arrdate",
    "i94mode", "i94addr", "depdate", "i94bir", "i94visa", "count", "dtadfile",
    "visapost", "occup", "entdepa", "entdepd", "entdepu", "matflag", "biryear",
    "dtaddto", "gender", "insnum", "airline", "admnum", "fltno", "visatype",
]


# Columns kept in the final joined dataset, in output order.
FINAL_COLUMNS = [
    # partitioning keys
    "year", "month", "day",
    # dates
    "arrival_date", "departure_date", "permitted_date", "unrestricted_stay",
    # places
    "country_citizenship", "country_residence", "port_name",
    "destination_state",
    # traveller attributes
    "age", "gender", "num_previous_stays",
    # visa
    "visa_type", "visa_category",
    # ML label
    "is_overstay",
]

SAS_FORMAT_READER = "com.github.saurfang.sas.spark"


def read_sas_data(spark, path: str, columns_to_read=None):
    """Load SAS data from ``path`` into one Spark DataFrame.

    Args:
        spark: Object exposing ``spark.read`` (a SparkSession in practice).
        path: Location of the data. For an ``s3://bucket/prefix`` URL every
            object listed under the prefix is read and the per-file frames
            are unioned. Any other path is handed to the reader as-is.
        columns_to_read: Optional list of column names. When given, each
            loaded frame is narrowed to these columns (before the union in
            the S3 case).

    Returns:
        The loaded (and, for S3, unioned) DataFrame.

    Raises:
        FileNotFoundError: If the S3 listing for ``path`` contains no objects.
    """
    logging.info("Reading the immigration data")

    def _load(file_path):
        # Read a single location and apply the optional column projection.
        file_df = spark.read.format(SAS_FORMAT_READER).load(file_path)
        if columns_to_read:
            file_df = file_df.select(columns_to_read)
        return file_df

    url_components = urlparse(path)

    if url_components.scheme != 's3':
        # Not an S3 URL: there is nothing to list, load the path directly.
        return _load(path)

    s3_client = boto3.client('s3')
    # NOTE(review): list_objects returns a single page of results; switch to a
    # paginator if a prefix can ever hold more objects than one page.
    s3_objects = s3_client.list_objects(
        Bucket=url_components.netloc,
        Prefix=url_components.path.lstrip("/"),
    )

    df = None
    # 'Contents' is missing from the response when nothing matches the prefix.
    for key in s3_objects.get('Contents', []):
        file_key = key['Key']
        print("Reading File: {}".format(file_key))
        logging.warning("Reading File: {}".format(file_key))
        file_df = _load("s3://{}/{}".format(url_components.netloc, file_key))
        # Explicit None check: do not rely on the truthiness of a DataFrame.
        df = file_df if df is None else df.union(file_df)

    if df is None:
        raise FileNotFoundError("No objects found under {}".format(path))

    return df


def read_dimension_data(spark: SparkSession, root_path: str):
    """Read and tidy the dimension CSV files stored under ``root_path``.

    Args:
        spark: Session used to read the CSV files.
        root_path: Directory/prefix containing ``gdp.csv``,
            ``country_ids.csv`` and ``ports.csv``.

    Returns:
        A tuple ``(country_ids_df_cleaned, gdp_df_cleaned, ports_df)``:
        * country ids with an integer ``country_id``, a title-cased
          ``country_name`` and a lower-cased ``country_name_lower``;
        * GDP data with renamed columns, ``gdp_value`` cast to long, the same
          two name columns, and the ``Year`` column dropped;
        * the ports data exactly as read.
    """
    logging.info("Reading the dimension data")

    def _read_csv(file_name):
        # Every dimension file is a CSV with a header row.
        return spark.read.option("header", True).format("csv").load(f"{root_path}/{file_name}")

    gdp_df = _read_csv("gdp.csv")
    country_ids_df = _read_csv("country_ids.csv")
    ports_df = _read_csv("ports.csv")

    country_ids_df_cleaned = (
        country_ids_df
        .withColumn("country_id", F.col("country_id").cast(IntegerType()))
        .withColumn("country_name", F.initcap(F.col("country_name")))  # title case
        .withColumn("country_name_lower", F.lower(F.col("country_name")))
    )
    gdp_df_cleaned = (
        gdp_df
        .withColumnRenamed("Country Name", "country_name")
        .withColumnRenamed("Country Code", "country_code")
        .withColumnRenamed("Value", "gdp_value")
        # NOTE(review): a filter on Year == "2016" used to sit here but is
        # disabled; Year is dropped below without filtering, so rows for all
        # years present in the file are kept - confirm this is intended.
        # Cast through float first, then truncate to a long.
        .withColumn("gdp_value", F.col("gdp_value").cast(FloatType()).cast(LongType()))
        .withColumn("country_name", F.initcap(F.col("country_name")))  # title case
        .withColumn("country_name_lower", F.lower(F.col("country_name")))
        .drop("Year")
    )

    return country_ids_df_cleaned, gdp_df_cleaned, ports_df


def clean_immigration_data(raw_data):
    """Select, rename, cast and enrich the raw immigration records.

    Steps performed on ``raw_data``:
      1. keep only the columns needed downstream;
      2. rename the source columns to descriptive names;
      3. cast/normalise values (dates, codes, flags) and derive ``day``,
         ``unrestricted_stay`` and ``num_previous_stays``;
      4. add the boolean ``is_overstay`` label.

    Args:
        raw_data: DataFrame containing at least the columns listed in
            ``USEFUL_COLUMNS`` below.

    Returns:
        The cleaned DataFrame including the ``is_overstay`` column.
    """
    logging.warning("Cleaning the immigration data")

    USEFUL_COLUMNS = [
        "admnum", "i94yr", "i94mon",
        "arrdate", "depdate", "dtaddto",
        "i94cit", "i94res", "i94port", "i94addr",
        "matflag", "i94mode",
        "i94bir", "gender", "visatype", "i94visa",
        "entdepa", "entdepu", "entdepd"
    ]

    clean_df = raw_data.select(USEFUL_COLUMNS)

    clean_df = (
        clean_df
        .withColumnRenamed('i94yr', 'year')
        .withColumnRenamed('i94mon', 'month')
        .withColumnRenamed('i94res', 'country_residence_id')
        .withColumnRenamed('i94cit', 'country_citizenship_id')
        .withColumnRenamed('i94port', 'entry_port')
        .withColumnRenamed('i94addr', 'destination_state')
        .withColumnRenamed('i94mode', 'travel_mode')
        .withColumnRenamed('i94bir', 'age')
        .withColumnRenamed('i94visa', 'visa_category')
        .withColumnRenamed('visatype', 'visa_type')
        .withColumnRenamed('matflag', 'dates_match_flag')
        .withColumnRenamed('arrdate', 'arrival_date')
        .withColumnRenamed('depdate', 'departure_date')
        .withColumnRenamed('dtaddto', 'permitted_date')
    )

    # All rows of the same admission number that arrived strictly before the
    # current row (the frame ends one row before the current one).
    same_user_window = (
        Window
        .partitionBy("admnum")
        .orderBy("arrival_date")
        .rowsBetween(Window.unboundedPreceding, -1)
    )
    clean_df = (
        clean_df
        # admnum is the window partition key, so it must keep its full value.
        # A 32-bit IntegerType cannot represent numbers above 2,147,483,647;
        # LongType can.
        # NOTE(review): admission numbers are expected to exceed the 32-bit
        # range - confirm against the data.
        .withColumn('admnum', F.col("admnum").cast(LongType()))
        .withColumn('year', F.col("year").cast(IntegerType()))
        .withColumn('month', F.col("month").cast(IntegerType()))
        # Must be computed before permitted_date is parsed: 'D/S' is not a
        # date and does not survive the to_date conversion below.
        .withColumn('unrestricted_stay', F.when(F.col('permitted_date') == 'D/S', True).otherwise(False))
        .withColumn('arrival_date', get_date_from_int(F.col("arrival_date")))
        .withColumn('departure_date', get_date_from_int(F.col("departure_date")))
        .withColumn('permitted_date', F.to_date('permitted_date', 'MMddyyyy'))
        # Uses the already converted arrival_date.
        .withColumn('day', F.dayofmonth("arrival_date"))
        .withColumn('age', F.col("age").cast(IntegerType()))
        .withColumn('gender', normalize_gender(F.col("gender")))
        .withColumn('dates_match_flag', normalize_match_flag(F.col("dates_match_flag")))
        .withColumn('travel_mode', normalize_mode(F.col("travel_mode")))
        .withColumn('visa_category', normalize_visa_category(F.col("visa_category").cast(IntegerType())))
        .withColumn('country_citizenship_id', F.col("country_citizenship_id").cast(IntegerType()))
        .withColumn('country_residence_id', F.col("country_residence_id").cast(IntegerType()))
        .withColumn('num_previous_stays', F.count("*").over(same_user_window))
    )

    logging.info("Adding the ML Label")
    # add the label column for the ML problem, check the Data Exploration notebook for the explanation of why
    # these conditions were chosen
    clean_df = clean_df.withColumn(
        "is_overstay",
        F.when(
            (
                (clean_df.dates_match_flag == False) &
                (clean_df.unrestricted_stay == False) &
                ((F.isnull(clean_df.departure_date)) & ((F.isnull(clean_df.entdepd)))) &
                (F.isnull(clean_df.entdepu))
            ),
            True
        ).otherwise(False)
    )
    return clean_df


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [5]:
# Bucket holding the cleaned output of this notebook.
STAGING_BUCKET_URL = "s3://udacity-capstone-staging-data"
# Bucket holding the raw inputs.
RAW_BUCKET_URL = "s3://udacity-capstone-raw-data"

# Output is written at the root of the staging bucket.
OUTPUT_DATA_PATH = STAGING_BUCKET_URL
# Prefix of the raw bucket that contains the i94 files.
DATA_PATH = f"{RAW_BUCKET_URL}/i94-data"

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [24]:
# Inputs: dimension CSVs are read from the raw bucket root, the immigration
# files from the data prefix inside it.
dimension_root_path = RAW_BUCKET_URL
immigration_data_path = DATA_PATH

# Destination for everything this notebook writes.
output_path = OUTPUT_DATA_PATH

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [25]:
# Load the raw immigration files, restricted to the known-valid columns.
raw_data = read_sas_data(
    spark,
    path=immigration_data_path,
    columns_to_read=VALID_COLUMNS,
)

# Clean the fact data, then load the dimension tables used in the joins.
clean_df = clean_immigration_data(raw_data=raw_data)
country_df, gdp_df, port_df = read_dimension_data(spark, root_path=dimension_root_path)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Reading File: i94-data/i94_apr16_sub.sas7bdat
Reading File: i94-data/i94_aug16_sub.sas7bdat
Reading File: i94-data/i94_dec16_sub.sas7bdat
Reading File: i94-data/i94_feb16_sub.sas7bdat
Reading File: i94-data/i94_jan16_sub.sas7bdat
Reading File: i94-data/i94_jul16_sub.sas7bdat
Reading File: i94-data/i94_jun16_sub.sas7bdat
Reading File: i94-data/i94_mar16_sub.sas7bdat
Reading File: i94-data/i94_may16_sub.sas7bdat
Reading File: i94-data/i94_nov16_sub.sas7bdat
Reading File: i94-data/i94_oct16_sub.sas7bdat
Reading File: i94-data/i94_sep16_sub.sas7bdat

In [26]:
# Enrich each immigration record with the citizenship/residence country names
# and the port name (left joins keep records with no match), then keep only
# the final columns and spread the result over 8 partitions.
joined_df = (
    clean_df
    .alias("immigration")
    .join(
        country_df.alias("cit_df"),
        on=F.col("immigration.country_citizenship_id") == F.col("cit_df.country_id"),
        how="leftouter",
    )
    .join(
        country_df.alias("res_df"),
        on=F.col("immigration.country_residence_id") == F.col("res_df.country_id"),
        how="leftouter",
    )
    .join(
        port_df.alias("port_df"),
        on=F.col("immigration.entry_port") == F.col("port_df.port_id"),
        how="leftouter",
    )
    .selectExpr(
        "immigration.*",
        "cit_df.country_name as country_citizenship",
        "res_df.country_name as country_residence",
        "port_df.port_name",
    )
    .select(FINAL_COLUMNS)
    .repartition(8)
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [27]:
(
    joined_df
    .write
    .mode("overwrite")
    # With the default (static) partition-overwrite mode, "overwrite" replaces
    # the whole output path. Dynamic mode replaces only the partitions this
    # write actually produces, which is what a daily re-run needs.
    .option("partitionOverwriteMode", "dynamic")
    # we partition by day so that if we want to run this every day, we don't overwrite the month partition
    .partitionBy("year", "month", "day")
    .parquet(os.path.join(output_path, "immigration_data"))
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [28]:
# Persist the cleaned GDP dimension, replacing any previous copy.
gdp_df.write.parquet(
    os.path.join(output_path, "gdp_data"),
    mode="overwrite",
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…