In [2]:
import pandas as pd

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T

import datetime
import os, sys

In [3]:
spark = SparkSession.builder.\
config("spark.jars.repositories", "https://repos.spark-packages.org/").\
config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\
enableHiveSupport().getOrCreate()

## US CITY DEMOGRAPHICS DIMENSION

In [13]:
city_demographics = spark.read.csv("cleaned_data/city_demographics_cleaned.csv", header=True)
city_demographics.show(3)

+-------------+-------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+
|         City|        State|Median Age|Male Population|Female Population|Total Population|Number of Veterans|Foreign-born|Average Household Size|State Code|
+-------------+-------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+
|Silver Spring|     Maryland|      33.8|        40601.0|          41862.0|           82463|            1562.0|     30908.0|                   2.6|        MD|
|       Quincy|Massachusetts|      41.0|        44129.0|          49500.0|           93629|            4147.0|     32935.0|                  2.39|        MA|
|       Hoover|      Alabama|      38.5|        38040.0|          46799.0|           84839|            4819.0|      8229.0|                  2.58|        AL|
+-------------+-------------+----------+------------

In [14]:
city_demographics.printSchema()

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Median Age: string (nullable = true)
 |-- Male Population: string (nullable = true)
 |-- Female Population: string (nullable = true)
 |-- Total Population: string (nullable = true)
 |-- Number of Veterans: string (nullable = true)
 |-- Foreign-born: string (nullable = true)
 |-- Average Household Size: string (nullable = true)
 |-- State Code: string (nullable = true)



In [17]:
# We notice that most columns are numerical fields in string format.
# We cast them to appropriate formats


city_demographics = city_demographics.withColumn("Median Age", F.col("Median Age").cast(T.DoubleType()))\
                                    .withColumn("Male Population", F.col("Male Population").cast(T.IntegerType()))\
                                    .withColumn("Female Population", F.col("Female Population").cast(T.IntegerType()))\
                                    .withColumn("Total Population", F.col("Total Population").cast(T.IntegerType()))\
                                    .withColumn("Number of Veterans", F.col("Number of Veterans").cast(T.IntegerType()))\
                                    .withColumn("Foreign-born", F.col("Foreign-born").cast(T.IntegerType()))\
                                    .withColumn("Average Household Size", F.col("Average Household Size")\
                                                 .cast(T.DoubleType()))

In [18]:
city_demographics.printSchema()

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Median Age: string (nullable = true)
 |-- Male Population: integer (nullable = true)
 |-- Female Population: integer (nullable = true)
 |-- Total Population: integer (nullable = true)
 |-- Number of Veterans: integer (nullable = true)
 |-- Foreign-born: integer (nullable = true)
 |-- Average Household Size: double (nullable = true)
 |-- State Code: string (nullable = true)



In [20]:
# Clean spaces from the names of the columns for easier handling later

city_demographics = city_demographics.withColumnRenamed("Median Age", "Median_Age")\
                 .withColumnRenamed("Male Population", "Male_Population")\
                 .withColumnRenamed("Female Population", "Female_Population")\
                 .withColumnRenamed("Total Population", "Total_Population")\
                 .withColumnRenamed("Number of Veterans", "Number_of_Veterans")\
                 .withColumnRenamed("Average Household Size", "Average_Household_Size")\
                 .withColumnRenamed("State Code", "State_Code")

In [22]:
# From prior exploration, the following columns had null values in them 
# [Male pop, Female pop, Total pop, Num of veterans, Foreign born, Average Hosehold size]
# While values of 0 are plausible, they are highly unlikely
# Nans might also make computations difficult.
# So we fill up the Nans with 0s


city_demographics = city_demographics.na.fill(0, ["Male_Population", "Female_Population", "Total_Population", 
                                                  "Number_of_Veterans", "Average_Household_Size"])

In [23]:
city_demographics.show(5)

+----------------+-------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+
|            City|        State|Median_Age|Male_Population|Female_Population|Total_Population|Number_of_Veterans|Foreign-born|Average_Household_Size|State_Code|
+----------------+-------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+
|   Silver Spring|     Maryland|      33.8|          40601|            41862|           82463|              1562|       30908|                   2.6|        MD|
|          Quincy|Massachusetts|      41.0|          44129|            49500|           93629|              4147|       32935|                  2.39|        MA|
|          Hoover|      Alabama|      38.5|          38040|            46799|           84839|              4819|        8229|                  2.58|        AL|
|Rancho Cucamonga|   California|  

In [24]:
# Save this final dimension table as parquet (for s3 bucket)

# city_demographics.write.mode("overwrite").parquet("fact_and_dim_tables/city_demographics_dim.parquet")

## RACE COUNTS DIMENSION

In [25]:
race_counts = spark.read.csv("cleaned_data/race_counts.csv", header=True)
race_counts.show(3)

+-------------+-------------+------------------+-----+
|         City|        State|              Race|Count|
+-------------+-------------+------------------+-----+
|Silver Spring|     Maryland|Hispanic or Latino|25924|
|       Quincy|Massachusetts|             White|58723|
|       Hoover|      Alabama|             Asian| 4759|
+-------------+-------------+------------------+-----+
only showing top 3 rows



In [26]:
race_counts.printSchema()

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Count: string (nullable = true)



In [27]:
race_counts = race_counts.withColumn("Count", F.col("Count").cast(T.IntegerType()))

In [28]:
# Rename to avoid clash with any built in count functions

race_counts = race_counts.withColumnRenamed("Count", "Race_Population")

In [30]:
race_counts.show(3)

+-------------+-------------+------------------+---------------+
|         City|        State|              Race|Race_Population|
+-------------+-------------+------------------+---------------+
|Silver Spring|     Maryland|Hispanic or Latino|          25924|
|       Quincy|Massachusetts|             White|          58723|
|       Hoover|      Alabama|             Asian|           4759|
+-------------+-------------+------------------+---------------+
only showing top 3 rows



In [31]:
# Save dim table (for city demographics. Snowflake schema)

# race_counts.write.mode("overwrite").parquet("fact_and_dim_tables/race_counts_dim.parquet")

## TEMPERATURE DATA DIMENSION

In [59]:
temp_data = spark.read.csv("cleaned_data/GlobalTemperature_clean.csv", header=True)
temp_data.show(3)

+----------+-------------------+-----------------------------+-----+-------+--------+---------+
|        dt| AverageTemperature|AverageTemperatureUncertainty| City|Country|Latitude|Longitude|
+----------+-------------------+-----------------------------+-----+-------+--------+---------+
|1900-01-01|-0.9890000000000001|                        0.588|Århus|denmark|  57.05N|   10.33E|
|1900-02-01|             -2.799|                        0.882|Århus|denmark|  57.05N|   10.33E|
|1900-03-01| 0.5919999999999999|                        0.429|Århus|denmark|  57.05N|   10.33E|
+----------+-------------------+-----------------------------+-----+-------+--------+---------+
only showing top 3 rows



In [60]:
temp_data.printSchema()

root
 |-- dt: string (nullable = true)
 |-- AverageTemperature: string (nullable = true)
 |-- AverageTemperatureUncertainty: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)



In [61]:
# Align data types

temp_data = temp_data.withColumn("dt", F.col("dt").cast(T.DateType()))\
                  .withColumn("AverageTemperature", F.col("AverageTemperature").cast(T.DoubleType()))\
                  .withColumn("AverageTemperatureUncertainty", F.col("AverageTemperatureUncertainty")\
                              .cast(T.DoubleType()))

In [62]:
"Getting month and year columns for easier aggregations later"

temp_data = temp_data.withColumn("temp_year", F.year(F.col('dt')))\
            .withColumn("temp_month", F.month(F.col('dt')))

temp_data.show(3)

+----------+-------------------+-----------------------------+-----+-------+--------+---------+---------+----------+
|        dt| AverageTemperature|AverageTemperatureUncertainty| City|Country|Latitude|Longitude|temp_year|temp_month|
+----------+-------------------+-----------------------------+-----+-------+--------+---------+---------+----------+
|1900-01-01|-0.9890000000000001|                        0.588|Århus|denmark|  57.05N|   10.33E|     1900|         1|
|1900-02-01|             -2.799|                        0.882|Århus|denmark|  57.05N|   10.33E|     1900|         2|
|1900-03-01| 0.5919999999999999|                        0.429|Århus|denmark|  57.05N|   10.33E|     1900|         3|
+----------+-------------------+-----------------------------+-----+-------+--------+---------+---------+----------+
only showing top 3 rows



In [42]:
# Read in the I94 COUNTRY CODES to join to this table.
# This allows for link between this Temperature dimension and the fact table

i94_cc = spark.read.csv("I94COUNTRY_output.csv", header=True, inferSchema=True)
i94_cc.show(3)

+----+--------------------+
|Code|             Country|
+----+--------------------+
| 582| MEXICO Air Sea, ...|
| 236|         AFGHANISTAN|
| 101|             ALBANIA|
+----+--------------------+
only showing top 3 rows



In [43]:
i94_cc.printSchema()

root
 |-- Code: integer (nullable = true)
 |-- Country: string (nullable = true)



In [45]:
i94_cc = i94_cc.withColumn("Country_for_join", F.lower(F.col('Country')))

In [63]:
i94_cc = i94_cc.withColumn("Country_for_join", F.trim(F.col('Country_for_join')))

In [48]:
# These countries that did not match were gotten from the initial data exploration

country_renaming = {'china, prc':'china',
              'ivory coast': "côte d'ivoire",
              'mexico air sea, and not reported (i-94, no land arrivals)':'mexico',
              'bosnia-herzegovina': 'bosnia and herzegovina',
              'guinea-bissau': 'guinea bissau',
               'congo': 'congo (democratic republic of the)'}

In [65]:
for old_country, new_country in country_renaming.items():
    i94_cc = i94_cc.withColumn("Country_for_join", F.when(F.col('Country_for_join')== old_country, new_country)\
                  .otherwise(F.col('Country_for_join')))

In [66]:
i94_cc_for_join = i94_cc.select("Code", "Country_for_join")

In [67]:
i94_cc_for_join.show(5)

+----+----------------+
|Code|Country_for_join|
+----+----------------+
| 582|          mexico|
| 236|     afghanistan|
| 101|         albania|
| 316|         algeria|
| 102|         andorra|
+----+----------------+
only showing top 5 rows



In [68]:
# Now join to the temp data 

temp_code_data = temp_data.join(i94_cc_for_join, temp_data.Country==i94_cc_for_join.Country_for_join, "left")\
                     .drop("Country_for_join")

In [69]:
temp_code_data.show(5)

+----------+-------------------+-----------------------------+-----+-------+--------+---------+---------+----------+----+
|        dt| AverageTemperature|AverageTemperatureUncertainty| City|Country|Latitude|Longitude|temp_year|temp_month|Code|
+----------+-------------------+-----------------------------+-----+-------+--------+---------+---------+----------+----+
|1900-01-01|-0.9890000000000001|                        0.588|Århus|denmark|  57.05N|   10.33E|     1900|         1| 108|
|1900-02-01|             -2.799|                        0.882|Århus|denmark|  57.05N|   10.33E|     1900|         2| 108|
|1900-03-01| 0.5919999999999999|                        0.429|Århus|denmark|  57.05N|   10.33E|     1900|         3| 108|
|1900-04-01|               4.63|                        0.417|Århus|denmark|  57.05N|   10.33E|     1900|         4| 108|
|1900-05-01|              9.576|                        0.521|Århus|denmark|  57.05N|   10.33E|     1900|         5| 108|
+----------+------------

In [72]:
# Some countries do not have codes.

temp_code_data.filter(F.isnull(F.col('Code'))).select('Code').count()

391752

In [73]:
temp_code_data = temp_code_data.withColumn("Country", F.initcap(F.col('Country')))
temp_code_data.show(3)

+----------+-------------------+-----------------------------+-----+-------+--------+---------+---------+----------+----+
|        dt| AverageTemperature|AverageTemperatureUncertainty| City|Country|Latitude|Longitude|temp_year|temp_month|Code|
+----------+-------------------+-----------------------------+-----+-------+--------+---------+---------+----------+----+
|1900-01-01|-0.9890000000000001|                        0.588|Århus|Denmark|  57.05N|   10.33E|     1900|         1| 108|
|1900-02-01|             -2.799|                        0.882|Århus|Denmark|  57.05N|   10.33E|     1900|         2| 108|
|1900-03-01| 0.5919999999999999|                        0.429|Århus|Denmark|  57.05N|   10.33E|     1900|         3| 108|
+----------+-------------------+-----------------------------+-----+-------+--------+---------+---------+----------+----+
only showing top 3 rows



In [74]:
print(temp_code_data.count())

temp_code_data = temp_code_data.dropDuplicates()

print(temp_code_data.count())

4788080
4788080


In [75]:
# Save this temperature dimension

temp_code_data.write.mode("overwrite").parquet("fact_and_dim_tables/temp_dim.parquet")

## VISA AND MODE (JUNK) DIMENSION

In [85]:
visa_data = spark.read.csv("I94VISA.csv", inferSchema=True, header=True)
visa_data.show()

+----+---------+
|Code|Visa_type|
+----+---------+
|   1| Business|
|   2| Pleasure|
|   3|  Student|
+----+---------+



In [86]:
mode_data = spark.read.csv("I94MODE_output.csv", inferSchema=True, header=True)
mode_data.show()

+----+-----------+
|Code|       Mode|
+----+-----------+
|   1|        Air|
|   2|        Sea|
|   3|       Land|
|   9|Notreported|
+----+-----------+



In [87]:
visa_data = visa_data.withColumnRenamed("Code", "Visa_Code")
mode_data = mode_data.withColumnRenamed("Code", "Mode_Code")

In [90]:
visa_and_mode_dim = visa_data.crossJoin(mode_data)
visa_and_mode_dim.show()

+---------+---------+---------+-----------+
|Visa_Code|Visa_type|Mode_Code|       Mode|
+---------+---------+---------+-----------+
|        1| Business|        1|        Air|
|        1| Business|        2|        Sea|
|        1| Business|        3|       Land|
|        1| Business|        9|Notreported|
|        2| Pleasure|        1|        Air|
|        2| Pleasure|        2|        Sea|
|        2| Pleasure|        3|       Land|
|        2| Pleasure|        9|Notreported|
|        3|  Student|        1|        Air|
|        3|  Student|        2|        Sea|
|        3|  Student|        3|       Land|
|        3|  Student|        9|Notreported|
+---------+---------+---------+-----------+



In [91]:
visa_and_mode_dim.write.mode("overwrite").parquet("fact_and_dim_tables/visa_and_mode_dim.parquet")

## IMMIGRATION TABLE

This table would provide the time dimension and the fact table

In [4]:
immig_data = spark.read.parquet("sas_data")
immig_data.show(3)

+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|    cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|        admnum|fltno|visatype|
+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|5748517.0|2016.0|   4.0| 245.0| 438.0|    LOS|20574.0|    1.0|     CA|20582.0|  40.0|    1.0|  1.0|20160430|     SYD| null|      G|      O|   null|      M| 1976.0|10292016|     F|  null|     QF|9.495387003E10|00011|      B1|
|5748518.0|2016.0|   4.0| 245.0| 438.0|    LOS|20574.0|    1.0|     NV|20591.0|  32.0|    1.0|  

In [5]:
immig_data.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

In [26]:
# By inspection, we can see that most of the columns with double type should be integers
# We convert them

for col, coltype in immig_data.dtypes:
    if coltype == 'double':
        immig_data = immig_data.withColumn(col, F.col(col).cast(T.IntegerType()))

In [28]:
immig_data.show(3)

+-------+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+----------+-----+--------+
|  cicid|i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|    admnum|fltno|visatype|
+-------+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+----------+-----+--------+
|5748517| 2016|     4|   245|   438|    LOS|  20574|      1|     CA|  20582|    40|      1|    1|20160430|     SYD| null|      G|      O|   null|      M|   1976|10292016|     F|  null|     QF|2147483647|00011|      B1|
|5748518| 2016|     4|   245|   438|    LOS|  20574|      1|     NV|  20591|    32|      1|    1|20160430|     SYD| null|   

In [47]:
# Convert depdate and arrdate to normal date formats
# SAS formats begin 1960-01-01 unlike the normal unix format of 1970

immig_data = immig_data.withColumn("arrival_date", F.expr("date_add('1960-01-01', arrdate)"))\
          .withColumn("departure_date", F.expr("date_add('1960-01-01', depdate)"))
          
immig_data.select("arrdate", "arrival_date", "depdate", "departure_date").show(5)

+-------+------------+-------+--------------+
|arrdate|arrival_date|depdate|departure_date|
+-------+------------+-------+--------------+
|  20574|  2016-04-30|  20582|    2016-05-08|
|  20574|  2016-04-30|  20591|    2016-05-17|
|  20574|  2016-04-30|  20582|    2016-05-08|
|  20574|  2016-04-30|  20588|    2016-05-14|
|  20574|  2016-04-30|  20588|    2016-05-14|
+-------+------------+-------+--------------+
only showing top 5 rows



In [73]:
# Get the time columns

df_arr_dates = immig_data.select("arrival_date")
df_dep_dates = immig_data.select("departure_date")

df_dates = df_arr_dates.union(df_dep_dates)

df_dates = df_dates.dropDuplicates()
df_dates.count()

236

In [75]:
df_dates = df_dates.withColumnRenamed("arrival_date", "migration_date")

In [76]:
# Now we get other useful date columns for our Date dimension table

df_dates = df_dates.withColumn("Year", F.year("migration_date"))\
                   .withColumn("Month", F.month("migration_date"))\
                   .withColumn("DayOfMonth", F.dayofmonth("migration_date"))\
                   .withColumn("DayOfWeek", F.dayofweek("migration_date"))\
                   .withColumn("DayOfYear", F.dayofyear("migration_date"))\
                   .withColumn("WeekofYear", F.weekofyear("migration_date"))\
                   .withColumn("Quarter", F.quarter("migration_date"))\

df_dates.show(5)

+--------------+----+-----+----------+---------+---------+----------+-------+
|migration_date|Year|Month|DayOfMonth|DayOfWeek|DayOfYear|WeekofYear|Quarter|
+--------------+----+-----+----------+---------+---------+----------+-------+
|    2016-03-01|2016|    3|         1|        3|       61|         9|      1|
|    2016-04-25|2016|    4|        25|        2|      116|        17|      2|
|    2016-05-03|2016|    5|         3|        3|      124|        18|      2|
|    2016-08-15|2016|    8|        15|        2|      228|        33|      3|
|    2016-08-31|2016|    8|        31|        4|      244|        35|      3|
+--------------+----+-----+----------+---------+---------+----------+-------+
only showing top 5 rows



In [78]:
# Save the Date dimension

# df_dates.write.mode("overwrite").parquet("fact_and_dim_tables/Date_dim.parquet")

In [82]:
# Get the useful columns from th eimmigration data to form the fact table

print(immig_data.count())
immig_data.dropDuplicates().count()

3096313


3096313

In [83]:
# immig_data.write.mode('overwrite').parquet('fact_and_dim_tables/migration_fact.parquet')