In [1]:
import configparser
import os
import logging
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.types import DateType, LongType, IntegerType
from pyspark.sql.functions import udf, col, lit, year, month, upper, to_date
from pyspark.sql import Window
from pyspark.sql import functions as F

In [2]:
spark = SparkSession.builder.config("spark.jars.repositories", "https://repos.spark-packages.org/"
                                   ).config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11"
                                           ).enableHiveSupport().getOrCreate()

In [3]:
def rename_columns(table, new_columns):
    for original, new in zip(table.columns, new_columns):
        table = table.withColumnRenamed(original, new)
    return table

def SAS_to_date(date):
    if date is not None:
        return pd.to_timedelta(date, unit='D') + pd.Timestamp('1960-1-1')
    
sas_to_date_udf = udf(SAS_to_date, DateType())

In [None]:
# Write the resulting dim_demographics table to parquet files in the output data location
output_path = 'transformed_data'

In [4]:
logging.info("Start processing US Cities Demographies")
# Construct the full path to the demographic data file stored in the input data location
us_cities_demographics_data = 'us-cities-demographics.csv'

# Read the demographic data file using Spark's DataFrame API with CSV format
# The CSV file has a header row and uses a semicolon (';') as a delimiter
df = spark.read.format('csv').options(header=True, delimiter=';').load(us_cities_demographics_data)

# Group the data and sum the 'Count' for each combination of index columns and 'Race'
grouped_df = df.groupBy(
    'City', 'State', 'Median Age', 'Male Population', 'Female Population', 'Total Population',
    'Number of Veterans', 'Foreign-born', 'Average Household Size', 'State Code', 'Race'
).agg(F.sum('Count').alias('Count'))

# Pivot the DataFrame on the 'Race' column, creating a column for each unique 'Race' value
pivoted_df = grouped_df.groupBy(
    'City', 'State', 'Median Age', 'Male Population', 'Female Population', 'Total Population',
    'Number of Veterans', 'Foreign-born', 'Average Household Size', 'State Code'
).pivot('Race').agg(F.first('Count'))

# Replace any nulls with 0, since `fill_value=0` was used in Pandas
final_df = pivoted_df.fillna(0)

# Cast all relevant columns from float to int
final_df = final_df.withColumn("Median Age", F.col("Median Age").cast(IntegerType())) \
    .withColumn("Male Population", F.col("Male Population").cast(IntegerType())) \
    .withColumn("Female Population", F.col("Female Population").cast(IntegerType())) \
    .withColumn("Total Population", F.col("Total Population").cast(IntegerType())) \
    .withColumn("Number of Veterans", F.col("Number of Veterans").cast(IntegerType())) \
    .withColumn("Foreign-born", F.col("Foreign-born").cast(IntegerType())) \
    .withColumn("Average Household Size", F.col("Average Household Size").cast(IntegerType())) \
    .withColumn("American Indian and Alaska Native", F.col("American Indian and Alaska Native").cast(IntegerType())) \
    .withColumn("Asian", F.col("Asian").cast(IntegerType())) \
    .withColumn("Black or African-American", F.col("Black or African-American").cast(IntegerType())) \
    .withColumn("Hispanic or Latino", F.col("Hispanic or Latino").cast(IntegerType())) \
    .withColumn("White", F.col("White").cast(IntegerType()))

# Define a list of new column names for renaming
new_columns = ['city', 'state', 'median_age', 'male_population', 'female_population','total_population', 'num_vetarans', 'foreign_born',
               'avg_house_size', 'state_code', 'american_indian', 'asian', 'black_african_american', 'hispanic', 'white']

# Rename the columns of the DataFrame using a custom function 'rename_columns'
dim_demographics = rename_columns(final_df, new_columns)

# The mode 'overwrite' will replace any existing files with the same name
#dim_demographics.write.mode("overwrite").parquet(f'{output_path}/dim_demographics')

In [5]:
# Show the result (optional)
dim_demographics.show(5)

+-------------+--------------+----------+---------------+-----------------+----------------+------------+------------+--------------+----------+---------------+-----+----------------------+--------+------+
|         city|         state|median_age|male_population|female_population|total_population|num_vetarans|foreign_born|avg_house_size|state_code|american_indian|asian|black_african_american|hispanic| white|
+-------------+--------------+----------+---------------+-----------------+----------------+------------+------------+--------------+----------+---------------+-----+----------------------+--------+------+
|       Skokie|      Illinois|        43|          31382|            33437|           64819|        1066|       27424|             2|        IL|              0|20272|                  4937|    6590| 40642|
|    Charlotte|North Carolina|        34|         396646|           430475|          827121|       36046|      128897|             2|        NC|           8746|55399|          

In [6]:
logging.info("Start processing label descriptions")
label_file = 'I94_SAS_Labels_Descriptions.SAS'

with open(label_file) as f:
    contents = f.readlines()

dim_port_code = {}
for ports in contents[302:962]:
    pair = ports.split('=')
    port_code, port = pair[0].strip("\t").strip().strip("'"),pair[1].strip('\t').strip().strip("''")
    dim_port_code[port_code] = port

# Create a Spark DataFrame from the dictionary
df_port_code = spark.createDataFrame(dim_port_code.items(), ['port_code', 'port'])

# Save the DataFrame as Parquet (overwrite mode)
#df_port_code.write.mode("overwrite").parquet(f'{output_path}/dim_port_code')  # Save as Parquet file

In [7]:
df_port_code.show(5)  

+---------+--------------------+
|port_code|                port|
+---------+--------------------+
|      ALC|ALCAN, AK        ...|
|      ANC|ANCHORAGE, AK    ...|
|      BAR|BAKER AAF - BAKER...|
|      DAC|DALTONS CACHE, AK...|
|      PIZ|DEW STATION PT LA...|
+---------+--------------------+
only showing top 5 rows



In [8]:
dim_state_code = {}
for states in contents[982:1036]:
    pair = states.split('=')
    state_code, state = pair[0].strip('\t').strip("'"), pair[1].strip().strip("'")
    dim_state_code[state_code] = state

# Create a Spark DataFrame from the dictionary
df_state_code = spark.createDataFrame(dim_state_code.items(), ['state_code', 'state'])

# Save the DataFrame as Parquet (overwrite mode)
#df_state_code.write.mode("overwrite").parquet(f'{output_path}/dim_state_code')  # Save as Parquet file

In [9]:
df_state_code.show(5)

+----------+----------+
|state_code|     state|
+----------+----------+
|        AK|    ALASKA|
|        AZ|   ARIZONA|
|        AR|  ARKANSAS|
|        CA|CALIFORNIA|
|        CO|  COLORADO|
+----------+----------+
only showing top 5 rows



In [10]:
dim_visa_code = {}
for visas in contents[1046:1049]:
    pair = visas.split('=')
    visa_code, visa_category = pair[0].strip(), pair[1].strip().strip("'")
    dim_visa_code[visa_code] = visa_category

# Create a Spark DataFrame from the dictionary    
df_visa_code = spark.createDataFrame(dim_visa_code.items(), ['visa_code', 'visa_category'])

# Save the DataFrame as Parquet (overwrite mode)
#df_visa_code.write.mode("overwrite").parquet(f'{output_path}/dim_visa_code')

In [11]:
df_visa_code.show(5)

+---------+-------------+
|visa_code|visa_category|
+---------+-------------+
|        1|     Business|
|        2|     Pleasure|
|        3|      Student|
+---------+-------------+



In [12]:
dim_transport_mode_code = {}
for mode in contents[972:976]:
    pair = mode.split('=')
    transport_mode_code, transport_mode = pair[0].strip('\t').strip("'"), pair[1].strip().strip("'")
    dim_transport_mode_code[transport_mode_code] = transport_mode

df_transport_mode_code = spark.createDataFrame(dim_transport_mode_code.items(), ['transport_mode_code', 'transport_mode'])

#df_transport_mode_code.write.mode("overwrite").parquet(f'{output_path}/dim_transport_mode_code')

In [13]:
df_transport_mode_code.show(5)

+-------------------+---------------+
|transport_mode_code| transport_mode|
+-------------------+---------------+
|                 1 |            Air|
|                 2 |            Sea|
|                 3 |           Land|
|                 9 |Not reported' ;|
+-------------------+---------------+



In [14]:
dim_country_code = {}
for countries in contents[9:298]:
    pair = countries.split('=')
    country_code, country = pair[0].strip(), pair[1].strip().strip("'")
    dim_country_code[country_code] = country

df_country_code = spark.createDataFrame(dim_country_code.items(),['country_code', 'country'])

#df_country_code.write.mode("overwrite").parquet(f'{output_path}/dim_country_code')

In [15]:
df_country_code.show(5)

+------------+--------------------+
|country_code|             country|
+------------+--------------------+
|         582|MEXICO Air Sea, a...|
|         236|         AFGHANISTAN|
|         101|             ALBANIA|
|         316|             ALGERIA|
|         102|             ANDORRA|
+------------+--------------------+
only showing top 5 rows



In [16]:
# read immigration data file
immigration_data = ('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')
df = spark.read.format('com.github.saurfang.sas.spark').load(immigration_data)

In [17]:
logging.info("Start processing fact_immigration")
# extract columns to create fact_immigration table
fact_immigration = df.select('cicid', 'i94yr', 'i94mon', 'i94port', 'i94addr', 'arrdate', 'depdate', 'i94mode', 'i94visa').distinct()

# data wrangling to match data model
new_columns = ['cic_id', 'arrival_year', 'arrival_month', 'arrival_port_code', 'arrival_state_code', 'arrival_date','departure_date', 'transport_mode_code', 'visa_code']
fact_immigration = rename_columns(fact_immigration, new_columns)

fact_immigration = fact_immigration.withColumn('arrival_date', sas_to_date_udf(col('arrival_date')))
fact_immigration = fact_immigration.withColumn('departure_date', sas_to_date_udf(col('departure_date')))

# Convert float columns to integer columns
fact_immigration = fact_immigration \
    .withColumn('cic_id', col('cic_id').cast('int')) \
    .withColumn('arrival_year', col('arrival_year').cast('int')) \
    .withColumn('arrival_month', col('arrival_month').cast('int')) \
    .withColumn('transport_mode_code', col('transport_mode_code').cast('int')) \
    .withColumn('visa_code', col('visa_code').cast('int'))


In [18]:
# write fact_immigration table to parquet files partitioned by state and city
#fact_immigration.write.mode("overwrite").partitionBy('arrival_state_code').parquet(f'{output_path}/fact_immigration')

In [19]:
fact_immigration.show(5)

+------+------------+-------------+-----------------+------------------+------------+--------------+-------------------+---------+
|cic_id|arrival_year|arrival_month|arrival_port_code|arrival_state_code|arrival_date|departure_date|transport_mode_code|visa_code|
+------+------------+-------------+-----------------+------------------+------------+--------------+-------------------+---------+
|    27|        2016|            4|              BOS|                MA|  2016-04-01|    2016-04-05|                  1|        1|
|   233|        2016|            4|              NYC|                NY|  2016-04-01|    2016-04-07|                  1|        2|
|  1103|        2016|            4|              NEW|                NY|  2016-04-01|    2016-04-09|                  1|        2|
|  1123|        2016|            4|              NEW|                PA|  2016-04-01|    2016-04-08|                  1|        1|
|  1446|        2016|            4|              NYC|                NY|  2016-04-0

In [20]:
logging.info("Start processing dim_immigrants")

# Extract columns to create dim_immigrants table
dim_immigrants = df.select('cicid', 'i94cit', 'i94res', 'biryear', 'gender').distinct()

# Data wrangling to match data model
new_columns = ['cic_id', 'citizen_country', 'residence_country', 'birth_year', 'gender']
dim_immigrants = rename_columns(dim_immigrants, new_columns)

# Convert float columns to int
dim_immigrants = dim_immigrants.withColumn('cic_id', col('cic_id').cast('int')) \
                               .withColumn('citizen_country', col('citizen_country').cast('int')) \
                               .withColumn('residence_country', col('residence_country').cast('int')) \
                               .withColumn('birth_year', col('birth_year').cast('int'))

# Write dim_immigrants table to parquet files
#dim_immigrants.write.mode("overwrite").parquet(f'{output_path}/dim_immigrants')

In [21]:
dim_immigrants.show(5)

+------+---------------+-----------------+----------+------+
|cic_id|citizen_country|residence_country|birth_year|gender|
+------+---------------+-----------------+----------+------+
|    16|            101|              101|      1988|  null|
|    84|            103|              103|      1994|     M|
|   536|            103|              103|      1956|     M|
|   670|            103|              124|      1979|     M|
|   681|            103|              112|      1955|     F|
+------+---------------+-----------------+----------+------+
only showing top 5 rows



In [22]:
logging.info("Start processing dim_immi_airline")

# Extract columns to create dim_immi_airline table
dim_immi_airline = df.select('cicid', 'airline', 'admnum', 'fltno', 'visatype').distinct()

# Data wrangling to match data model
new_columns = ['cic_id', 'airline', 'admin_num', 'flight_number', 'visa_type']
dim_immi_airline = rename_columns(dim_immi_airline, new_columns)

# Convert float columns to int or long
dim_immi_airline = dim_immi_airline.withColumn('cic_id', col('cic_id').cast('int')) \
                                   .withColumn('admin_num', col('admin_num').cast(LongType()))

# Write dim_immi_airline table to parquet files
#dim_immi_airline.write.mode("overwrite").parquet(f'{output_path}/dim_immi_airline')

In [23]:
dim_immi_airline.show(5)

+------+-------+-----------+-------------+---------+
|cic_id|airline|  admin_num|flight_number|visa_type|
+------+-------+-----------+-------------+---------+
|   372|     OS|55428239233|        00097|       WT|
|   498|     OS|55428626033|        00065|       WT|
|   715|     AA|55453491633|        00237|       WT|
|   719|     AA|55439268633|        00717|       WT|
|   993|     UA|55413424033|        00056|       WT|
+------+-------+-----------+-------------+---------+
only showing top 5 rows



In [24]:
logging.info("Start processing dim_temperature")
# read temperature data file
temperature_data = '../../data2/GlobalLandTemperaturesByCity.csv'
df = spark.read.csv(temperature_data, header=True)

df = df.where(df['Country'] == 'United States')
df = df.withColumn('dt', to_date(col('dt')))
df = df.withColumn('year', year(df['dt']))
df = df.withColumn('month', month(df['dt']))

dim_temperature = df.select(['AverageTemperature', 'AverageTemperatureUncertainty', 'City', 'Country', 'year', 'month']).distinct()

new_columns = ['avg_temp', 'avg_temp_uncertnty', 'city', 'country', 'year', 'month']
dim_temperature = rename_columns(dim_temperature, new_columns)

# write dim_temperature table to parquet files
#dim_temperature.write.mode("overwrite").parquet(f'{output_path}/dim_temperature')

In [25]:
dim_temperature.show(5)

+------------------+-------------------+-------+-------------+----+-----+
|          avg_temp| avg_temp_uncertnty|   city|      country|year|month|
+------------------+-------------------+-------+-------------+----+-----+
|            27.517|              1.101|Abilene|United States|1872|    7|
|            27.275|              1.693|Abilene|United States|1883|    7|
|21.311999999999998|              0.226|Abilene|United States|1925|    5|
|            16.561|0.35700000000000004|Abilene|United States|1930|   10|
|             8.354|              0.239|Abilene|United States|1977|   12|
+------------------+-------------------+-------+-------------+----+-----+
only showing top 5 rows

