In [1]:
# Do all imports and installs here
import pandas as pd
import os
from pyspark.sql import SparkSession
from pyspark.sql import types as T
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType
from datetime import datetime, timedelta
import json
from pyspark.sql.functions import desc, monotonically_increasing_id, udf, to_date, from_unixtime, trim, col
from custom_udf import *

In [2]:
# Create (or reuse) the notebook's Spark session. The hadoop-aws package is
# requested at startup -- presumably so S3 paths can be read; confirm if the
# data only ever lives on local disk.
spark = (
    SparkSession.builder
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0")
    .getOrCreate()
)


21/12/07 16:42:36 WARN Utils: Your hostname, Yugeshs-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.0.13 instead (on interface en0)
21/12/07 16:42:36 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/Users/yugesh/opt/anaconda3/envs/airflow/lib/python3.8/site-packages/pyspark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /Users/yugesh/.ivy2/cache
The jars for the packages stored in: /Users/yugesh/.ivy2/jars
org.apache.hadoop#hadoop-aws added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-0b204692-fda0-4408-9c86-4fa0bd42efcd;1.0
	confs: [default]
	found org.apache.hadoop#hadoop-aws;2.7.0 in central
	found org.apache.hadoop#hadoop-common;2.7.0 in central
	found org.apache.hadoop#hadoop-annotations;2.7.0 in central
	found com.google.guava#guava;11.0.2 in central
	found com.google.code.findbugs#jsr305;3.0.0 in central
	found commons-cli#commons-cli;1.2 in central
	found org.apache.commons#commons-math3;3.1.1 in central
	found xmlenc#xmlenc;0.52 in central
	found commons-httpclient#commons-httpclient;3.1 in central
	found commons-logging#commons-logging;1.1.3 in central
	found commons-codec#commons-codec;1.4 in central
	found commons-io#commons-io;2.4 in central
	found commons-net#commons-net;3.1 in central
	found commons-collections#commons-colle

## Immigration Data ETL

In [3]:
def process_immigration_data(spark, input_data, output_data):
    """Load one raw I94 immigration parquet part and return it cleaned and decoded.

    Processing steps, in order:

    1. Read a single, hard-coded part file from ``<input_data>/sas_data``.
    2. Discard the source ``cicid`` and cast the numeric code/date columns to
       integers.
    3. Remove duplicate rows (compared on every remaining source column) and
       assign a fresh ``cicid`` with ``monotonically_increasing_id``.
    4. Replace nulls in the integer columns with 0.
    5. Decode the coded columns through the lookup UDFs, convert the arrival
       and departure dates with ``udf_datetime_from_sas``, rename the columns
       and keep only the output columns.

    Nothing is written by this function; the caller decides where to persist
    the returned DataFrame.

    Args:
        spark: Active ``SparkSession`` used to read the parquet file.
        input_data (str): Base directory (or URI) that contains the
            ``sas_data`` folder.
        output_data (str): Currently unused. Kept so existing callers that
            pass it keep working.

    Returns:
        pyspark.sql.DataFrame: One row per distinct immigration record, with
        the columns ``cicid``, ``origin_city``, ``traveled_from``,
        ``arrived_city``, ``us_address``, ``arrival_date``,
        ``departure_date``, ``transportation_mode``, ``age``, ``gender``,
        ``visa_status``, ``occupation`` and ``airline`` (in that order).
    """

    # Only this one part file is read, not the whole ``sas_data`` directory.
    source_path = os.path.join(
        input_data,
        "sas_data/part-00000-b9542815-7a8d-45fc-9c67-c9c5007ad0d4-c000.snappy.parquet",
    )
    raw_df = spark.read.parquet(source_path)

    # Columns that must be integers before de-duplication and decoding.
    int_cols = [
        "i94cit",
        "i94res",
        "arrdate",
        "i94mode",
        "depdate",
        "i94bir",
        "i94visa",
    ]

    # The source cicid is excluded so that rows differing only by their id
    # count as duplicates; a replacement id is generated after de-duplication.
    raw_df = raw_df.drop("cicid")

    # Cast every integer column in one projection instead of one withColumn
    # call per column (each withColumn adds another projection to the plan).
    # Column order and names are left exactly as they were read.
    typed_df = raw_df.select(
        [
            raw_df[name].cast(IntegerType()).alias(name)
            if name in int_cols
            else raw_df[name]
            for name in raw_df.columns
        ]
    )

    # De-duplicate BEFORE filling nulls so a null and a literal 0 are still
    # treated as different values when rows are compared.
    deduped_df = typed_df.dropDuplicates().withColumn(
        "cicid", monotonically_increasing_id()
    )

    # Assign 0 to null values in the integer columns.
    # NOTE(review): this also turns a missing arrdate/depdate into day 0 before
    # it reaches udf_datetime_from_sas; how that UDF treats 0 is not visible
    # here -- confirm missing dates do not end up as a sentinel date.
    filled_df = deduped_df.fillna(0, int_cols)

    # Decode, rename and order the output columns in a single projection.
    # Source columns that are not listed here are dropped.
    immigration_df = filled_df.select(
        filled_df["cicid"],
        get_origin_udf(filled_df["i94cit"]).alias("origin_city"),
        get_origin_udf(filled_df["i94res"]).alias("traveled_from"),
        get_city_udf(filled_df["i94port"]).alias("arrived_city"),
        get_state_udf(filled_df["i94addr"]).alias("us_address"),
        udf_datetime_from_sas(filled_df["arrdate"]).alias("arrival_date"),
        udf_datetime_from_sas(filled_df["depdate"]).alias("departure_date"),
        get_mode_udf(filled_df["i94mode"]).alias("transportation_mode"),
        filled_df["i94bir"].alias("age"),
        filled_df["gender"],
        get_visa_udf(filled_df["i94visa"]).alias("visa_status"),
        filled_df["occup"].alias("occupation"),
        filled_df["airline"],
    )

    return immigration_df

In [4]:
# Local directory that holds the raw ``sas_data`` parquet files.
input_data = "./data"

# output_data is not used by process_immigration_data at present; it is given
# as a path relative to the working directory (not the filesystem root) so it
# names the same location the notebook's write cell targets.
df = process_immigration_data(
    spark=spark,
    input_data=input_data,
    output_data="./data/processed_data/",
)



In [5]:
# Preview the first five transformed rows as a sanity check.
df.show(n=5)

21/12/07 16:42:58 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+-----+-----------+-------------+-------------+-----------------+------------+--------------+-------------------+---+------+-----------+----------+-------+
|cicid|origin_city|traveled_from| arrived_city|       us_address|arrival_date|departure_date|transportation_mode|age|gender|visa_status|occupation|airline|
+-----+-----------+-------------+-------------+-----------------+------------+--------------+-------------------+---+------+-----------+----------+-------+
|    0|    AUSTRIA|      AUSTRIA|      CHICAGO|         MISSOURI|  2016-04-01|    2016-04-15|                Air| 59|     M|   Pleasure|      null|     AA|
|    1|    AUSTRIA|      AUSTRIA|      TORONTO|             null|  2016-04-01|    2016-04-03|                Air| 48|     M|   Business|      null|     AA|
|    2|    AUSTRIA|    AUSTRALIA|  LOS ANGELES|       CALIFORNIA|  2016-04-01|    2016-04-23|                Air| 39|     M|   Pleasure|      null|     AA|
|    3|    BELGIUM|      BELGIUM|WASHINGTON DC|DIST. OF COLUMBIA



In [7]:
# Persist the transformed table as parquet, replacing any output already there.
df.write.parquet("./data/processed_data", mode="overwrite")

21/12/07 16:43:57 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 96.54% for 7 writers
21/12/07 16:43:57 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 84.47% for 8 writers
21/12/07 16:44:16 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 96.54% for 7 writers
