In [1]:
# import and installs
import pandas as pd

from pyspark.sql import SparkSession
spark = SparkSession.builder.\
config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
.getOrCreate()

from pyspark.sql.functions import first
from pyspark.sql.functions import upper, col
from pyspark.sql.types import StructField, StructType, StringType, LongType, IntegerType
from pyspark.sql.functions import udf, date_format
import datetime as dt

In [2]:
# Explore and assess the data

## Explore data
# - Identify data quality issues (missing values, duplicate data etc.)


In [3]:
# Read us-cities-demographics.csv
us_spark = spark.read.csv("./datasets/us-cities-demographics.csv", sep=";", header=True)

In [4]:
# Check columns of the dataset
us_spark.columns

['City',
 'State',
 'Median Age',
 'Male Population',
 'Female Population',
 'Total Population',
 'Number of Veterans',
 'Foreign-born',
 'Average Household Size',
 'State Code',
 'Race',
 'Count']

In [5]:
# Check us_spark dataset for repeated rows, and which columns cause the duplicates
us_spark.select("City", "State", "Median Age", "Male Population", "Female Population", "Total Population", \
                "Foreign-born", "Average Household Size").orderBy("City").show()

+-------+----------+----------+---------------+-----------------+----------------+------------+----------------------+
|   City|     State|Median Age|Male Population|Female Population|Total Population|Foreign-born|Average Household Size|
+-------+----------+----------+---------------+-----------------+----------------+------------+----------------------+
|Abilene|     Texas|      31.3|          65212|            60664|          125876|        8129|                  2.64|
|Abilene|     Texas|      31.3|          65212|            60664|          125876|        8129|                  2.64|
|Abilene|     Texas|      31.3|          65212|            60664|          125876|        8129|                  2.64|
|Abilene|     Texas|      31.3|          65212|            60664|          125876|        8129|                  2.64|
|Abilene|     Texas|      31.3|          65212|            60664|          125876|        8129|                  2.64|
|  Akron|      Ohio|      38.1|          96886| 

In [6]:
# Check subset of `US` dataset that maybe causing dupliate rows
us_spark.select("City","State Code","Race","Count").orderBy("City").show()

+-------+----------+--------------------+------+
|   City|State Code|                Race| Count|
+-------+----------+--------------------+------+
|Abilene|        TX|American Indian a...|  1813|
|Abilene|        TX|  Hispanic or Latino| 33222|
|Abilene|        TX|               White| 95487|
|Abilene|        TX|               Asian|  2929|
|Abilene|        TX|Black or African-...| 14449|
|  Akron|        OH|               White|129192|
|  Akron|        OH|  Hispanic or Latino|  3684|
|  Akron|        OH|Black or African-...| 66551|
|  Akron|        OH|               Asian|  9033|
|  Akron|        OH|American Indian a...|  1845|
|Alafaya|        FL|  Hispanic or Latino| 34897|
|Alafaya|        FL|               Asian| 10336|
|Alafaya|        FL|               White| 63666|
|Alafaya|        FL|Black or African-...|  6577|
|Alameda|        CA|               White| 44232|
|Alameda|        CA|American Indian a...|  1329|
|Alameda|        CA|Black or African-...|  7364|
|Alameda|        CA|

In [7]:
## Printing all columns to check again
us_spark.select("City", "State", "Median Age", "Male Population", "Female Population", "Total Population", \
                "Number of Veterans", "Foreign-born", "Average Household Size", "State Code", \
                "Race", "Count").orderBy("City").show()

#  ['City',
#  'State',
#  'Median Age',
#  'Male Population',
#  'Female Population',
#  'Total Population',
#  'Number of Veterans',
#  'Foreign-born',
#  'Average Household Size',
#  'State Code',
#  'Race',
#  'Count']

+-------+----------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+------+
|   City|     State|Median Age|Male Population|Female Population|Total Population|Number of Veterans|Foreign-born|Average Household Size|State Code|                Race| Count|
+-------+----------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+------+
|Abilene|     Texas|      31.3|          65212|            60664|          125876|              9367|        8129|                  2.64|        TX|American Indian a...|  1813|
|Abilene|     Texas|      31.3|          65212|            60664|          125876|              9367|        8129|                  2.64|        TX|  Hispanic or Latino| 33222|
|Abilene|     Texas|      31.3|          65212|            60664|          125876|              9367|        8129| 

In [8]:
# Cleaning the dataset --- Approach:
## - 'Race' and 'Count' columns are the cause of duplicate rows. Hence, the approach is to separate them into their own dataset, and include "City" and "State Code" columns for reference
## - Cleaned-up US dataset can be joined back into us_race_cnt dataset to, eventually, contain unique rows
us_race_cnt = (us_spark.select("City", "State Code", "Race", "Count")
            .groupBy(us_spark.City, "State Code")
            .pivot("Race")
            .agg(first("Count")))

In [9]:
## Check us_race_cnt dataset
us_race_cnt.orderBy("City").show()

+------------+----------+---------------------------------+-----+-------------------------+------------------+------+
|        City|State Code|American Indian and Alaska Native|Asian|Black or African-American|Hispanic or Latino| White|
+------------+----------+---------------------------------+-----+-------------------------+------------------+------+
|     Abilene|        TX|                             1813| 2929|                    14449|             33222| 95487|
|       Akron|        OH|                             1845| 9033|                    66551|              3684|129192|
|     Alafaya|        FL|                             null|10336|                     6577|             34897| 63666|
|     Alameda|        CA|                             1329|27984|                     7364|              8265| 44232|
|      Albany|        GA|                              445|  650|                    53440|              1783| 17160|
|      Albany|        NY|                             16

In [10]:
# Compare datasets - before and after dropping duplicate rows
(us_race_cnt.count(), us_race_cnt.dropDuplicates().count())

(596, 596)

In [11]:
uscols=["Number of Veterans", "Race", "Count"]

In [12]:
us=us_spark.drop(*uscols).dropDuplicates()

In [13]:
# Compare row count between original and new dataset with dropped duplicate rows
# the cleaned 'us' dataset matches number of rows with 'us_race_cnt' dataset, which will be combined into one 'us' dataset
(us_spark.count(), us.count())

(2891, 596)

In [14]:
# Check number of records after combining both data sets
# Total rows should match both datasets
us.join(us_race_cnt, ["City", "State Code"]).count()

596

In [15]:
# check the data sample
us.join(us_race_cnt, ["City", "State Code"]).orderBy("city").show()

+------------+----------+------------+----------+---------------+-----------------+----------------+------------+----------------------+---------------------------------+-----+-------------------------+------------------+------+
|        City|State Code|       State|Median Age|Male Population|Female Population|Total Population|Foreign-born|Average Household Size|American Indian and Alaska Native|Asian|Black or African-American|Hispanic or Latino| White|
+------------+----------+------------+----------+---------------+-----------------+----------------+------------+----------------------+---------------------------------+-----+-------------------------+------------------+------+
|     Abilene|        TX|       Texas|      31.3|          65212|            60664|          125876|        8129|                  2.64|                             1813| 2929|                    14449|             33222| 95487|
|       Akron|        OH|        Ohio|      38.1|          96886|           100667| 

In [16]:
# After running the above check, we commit to the combination of the 'us' dataset
us=us.join(us_race_cnt, ["City", "State Code"])

In [17]:
#Secondary check
(us.count(), us.show(5))

+---------------+----------+-----------+----------+---------------+-----------------+----------------+------------+----------------------+---------------------------------+-----+-------------------------+------------------+------+
|           City|State Code|      State|Median Age|Male Population|Female Population|Total Population|Foreign-born|Average Household Size|American Indian and Alaska Native|Asian|Black or African-American|Hispanic or Latino| White|
+---------------+----------+-----------+----------+---------------+-----------------+----------------+------------+----------------------+---------------------------------+-----+-------------------------+------------------+------+
|Highlands Ranch|        CO|   Colorado|      39.6|          49186|            53281|          102467|        8827|                  2.72|                             1480| 5650|                     1779|              8393| 94499|
|           Kent|        WA| Washington|      33.4|          61825|         

(596, None)

In [18]:
# Change `State Code` column name to `state_code` and other similar problems to avoid parquet complications (remove whitespaces)
us=us.select('City', col('State Code').alias('State_Code'), 'State', col('Median Age').alias('Median_age'),
     col('Male Population').alias('Male_Pop'), col('Female Population').alias('Fem_Pop'), 
        col('Total Population').alias('Ttl_Pop'), 'Foreign-born', 
          col('Average Household Size').alias('Avg_Household_Size'),
             col('American Indian and Alaska Native').alias('Native_Pop'), 
                 col('Asian').alias('Asian_Pop'), 
                    col('Black or African-American').alias('Black_Pop'), 
                      col('Hispanic or Latino').alias('Latino_Pop'), 
                        col('White').alias('White_Pop'))

In [19]:
# Check the new column names
us.show(2)

+---------------+----------+----------+----------+--------+-------+-------+------------+------------------+----------+---------+---------+----------+---------+
|           City|State_Code|     State|Median_age|Male_Pop|Fem_Pop|Ttl_Pop|Foreign-born|Avg_Household_Size|Native_Pop|Asian_Pop|Black_Pop|Latino_Pop|White_Pop|
+---------------+----------+----------+----------+--------+-------+-------+------------+------------------+----------+---------+---------+----------+---------+
|Highlands Ranch|        CO|  Colorado|      39.6|   49186|  53281| 102467|        8827|              2.72|      1480|     5650|     1779|      8393|    94499|
|           Kent|        WA|Washington|      33.4|   61825|  65137| 126962|       38175|              3.06|      3651|    26168|    20450|     21928|    67918|
+---------------+----------+----------+----------+--------+-------+-------+------------+------------------+----------+---------+---------+----------+---------+
only showing top 2 rows



In [20]:
# Drop 'state' column
us=us.drop("State")
us.show(2)

+---------------+----------+----------+--------+-------+-------+------------+------------------+----------+---------+---------+----------+---------+
|           City|State_Code|Median_age|Male_Pop|Fem_Pop|Ttl_Pop|Foreign-born|Avg_Household_Size|Native_Pop|Asian_Pop|Black_Pop|Latino_Pop|White_Pop|
+---------------+----------+----------+--------+-------+-------+------------+------------------+----------+---------+---------+----------+---------+
|Highlands Ranch|        CO|      39.6|   49186|  53281| 102467|        8827|              2.72|      1480|     5650|     1779|      8393|    94499|
|           Kent|        WA|      33.4|   61825|  65137| 126962|       38175|              3.06|      3651|    26168|    20450|     21928|    67918|
+---------------+----------+----------+--------+-------+-------+------------+------------------+----------+---------+---------+----------+---------+
only showing top 2 rows



In [21]:
# write transformed 'us' dataset onto parquet file
us.write.mode('overwrite').parquet("./datasets/us_cities_demographics.parquet")

In [22]:
# ----------------------------------------------------------
## I94 Non-Immigration Dataset (Cleaning and Transformation)

In [23]:
# Read i94 non-immigration dataset
i94_spark = spark.read.parquet("./datasets/sas_data")

In [24]:
i94_spark.columns

['cicid',
 'i94yr',
 'i94mon',
 'i94cit',
 'i94res',
 'i94port',
 'arrdate',
 'i94mode',
 'i94addr',
 'depdate',
 'i94bir',
 'i94visa',
 'count',
 'dtadfile',
 'visapost',
 'occup',
 'entdepa',
 'entdepd',
 'entdepu',
 'matflag',
 'biryear',
 'dtaddto',
 'gender',
 'insnum',
 'airline',
 'admnum',
 'fltno',
 'visatype']

In [25]:
# show a sample of `sas_data`
i94_spark.select("i94res","i94port","arrdate","i94mode","depdate","i94bir","i94visa","count" \
                  ,"gender",col("admnum").cast(LongType())).show(3)

+------+-------+-------+-------+-------+------+-------+-----+------+-----------+
|i94res|i94port|arrdate|i94mode|depdate|i94bir|i94visa|count|gender|     admnum|
+------+-------+-------+-------+-------+------+-------+-----+------+-----------+
| 438.0|    LOS|20574.0|    1.0|20582.0|  40.0|    1.0|  1.0|     F|94953870030|
| 438.0|    LOS|20574.0|    1.0|20591.0|  32.0|    1.0|  1.0|     F|94955622830|
| 438.0|    LOS|20574.0|    1.0|20582.0|  29.0|    1.0|  1.0|     M|94956406530|
+------+-------+-------+-------+-------+------+-------+-----+------+-----------+
only showing top 3 rows



In [26]:
# Type Conversion
i94_spark = i94_spark.select(col("i94res").cast(IntegerType()), 
                             col("i94port"),
                             col("arrdate").cast(IntegerType()),
                             col("i94mode").cast(IntegerType()),
                             col("depdate").cast(IntegerType()),
                             col("i94bir").cast(IntegerType()),
                             col("i94visa").cast(IntegerType()), 
                             col("count").cast(IntegerType()), \
                             "gender",
                             col("admnum").cast(LongType()))

# Show top 3 rows
i94_spark.show(3)


+------+-------+-------+-------+-------+------+-------+-----+------+-----------+
|i94res|i94port|arrdate|i94mode|depdate|i94bir|i94visa|count|gender|     admnum|
+------+-------+-------+-------+-------+------+-------+-----+------+-----------+
|   438|    LOS|  20574|      1|  20582|    40|      1|    1|     F|94953870030|
|   438|    LOS|  20574|      1|  20591|    32|      1|    1|     F|94955622830|
|   438|    LOS|  20574|      1|  20582|    29|      1|    1|     M|94956406530|
+------+-------+-------+-------+-------+------+-------+-----+------+-----------+
only showing top 3 rows



In [27]:
## Check for duplicate rows on each dataset by comparing original total rows
i94_spark.count(), i94_spark.dropDuplicates().count()

(3096313, 3096302)

In [28]:
i94_spark.dropDuplicates(['admnum']).count()

3075579

In [29]:
## From the above 2 commands, 'admnum' may not be unique as assumed. 
## We should drop duplicate rows instead of dropping duplicates only by admnum so we will have unique rows
i94_spark=i94_spark.dropDuplicates()

In [30]:
i94_spark.show(3)

+------+-------+-------+-------+-------+------+-------+-----+------+-----------+
|i94res|i94port|arrdate|i94mode|depdate|i94bir|i94visa|count|gender|     admnum|
+------+-------+-------+-------+-------+------+-------+-----+------+-----------+
|   209|    AGA|  20552|      1|   null|  null|      2|    1|     M|47842155333|
|   209|    ATL|  20571|      1|   null|  null|      2|    1|     M|44537883633|
|   696|    FTL|  20574|      1|  20597|     0|      2|    1|     M|95022828130|
+------+-------+-------+-------+-------+------+-------+-----+------+-----------+
only showing top 3 rows



In [31]:
# We'll save i94_spark dataframe to parquet file after joining i94port and us dataframes later.

In [32]:
# Cleaning of i94 code lists and creating dimension tables
## I94_SAS_Labels_Descriptions.SAS file was used to create master i94 code lists by creating text files and then cleaning whitespaces and quotations; converting them into python lists, then saving them as parquet files. These files are used as master record attribute files as they do not change often.

In [33]:
# Begin processing I94_SAS_Labels_Descriptions.SAS to create master i94 code dimensions
'''
/* I94MODE - missing values as well as not reported*/
	1 = 'Air'
	2 = 'Sea'
	3 = 'Land'
	9 = 'Not reported' ;
'''
i94mode_data = [[1,'Air'],[2,'Sea'],[3,'Land'],[9,'Not reported']]

# Convert to spark dataframe
i94mode = spark.createDataFrame(i94mode_data)

# Create i94mode parquest file
i94mode.write.mode("overwrite").parquet('./data/i94mode.parquet')

In [34]:
# Create i94port.txt file by copying from I94_SAS_Labels_Descriptions.SAS
# Read i94port text file
i94port_df = pd.read_csv('./data/i94port.txt',sep='=',names=['id','port'])

# Remove whitespaces and single quotes
i94port_df['id']=i94port_df['id'].str.strip().str.replace("'",'')

# Create two columns from i94port string: port_city and port_addr
# also remove whitespaces and single quotes
i94port_df['port_city'], i94port_df['port_state']=i94port_df['port'].str.strip().str.replace("'",'').str.strip().str.split(',',1).str

# Remove more whitespace from port_addr
i94port_df['port_state']=i94port_df['port_state'].str.strip()

# Drop port column and keep the two new columns: port_city and port_addr
i94port_df.drop(columns =['port'], inplace = True)

# Convert pandas dataframe to list (objects which had single quotes removed automatically become string again with single quotes)
i94port_data=i94port_df.values.tolist()

  i94port_df['port_city'], i94port_df['port_state']=i94port_df['port'].str.strip().str.replace("'",'').str.strip().str.split(',',1).str


In [35]:
## print i94port_data list to check
print(i94port_data[0])
print(i94port_data[1])
print(i94port_data[2])

['ALC', 'ALCAN', 'AK']
['ANC', 'ANCHORAGE', 'AK']
['BAR', 'BAKER AAF - BAKER ISLAND', 'AK']


In [36]:
## Take a look at the i94port_df created
display(i94port_df)

Unnamed: 0,id,port_city,port_state
0,ALC,ALCAN,AK
1,ANC,ANCHORAGE,AK
2,BAR,BAKER AAF - BAKER ISLAND,AK
3,DAC,DALTONS CACHE,AK
4,PIZ,DEW STATION PT LAY DEW,AK
...,...,...,...
655,ADU,No PORT Code (ADU),
656,AKT,No PORT Code (AKT),
657,LIT,No PORT Code (LIT),
658,A2A,No PORT Code (A2A),


In [37]:
# Now convert list to spark dataframe
# Create a schema for the dataframe
i94port_schema = StructType([
    StructField('id', StringType(), True),
    StructField('port_city', StringType(), True),
    StructField('port_state', StringType(), True)
])
i94port=spark.createDataFrame(i94port_data, i94port_schema)

In [38]:
# Display dataframe
display(i94port)

DataFrame[id: string, port_city: string, port_state: string]

In [39]:
# create parquet file
i94port.write.mode('overwrite').parquet('./data/i94port.parquet')

In [40]:
# Read i94res text file
i94res_df = pd.read_csv('./data/i94res_cit.txt',sep='=',names=['id','country'])
# Remove whitespaces and single quotes
i94res_df['country']=i94res_df['country'].str.replace("'",'').str.strip()
# Convert pandas dataframe to list (objects which had single quotes removed automatically become string again with single quotes)
i94res_data=i94res_df.values.tolist()

In [41]:
## print i94res_data list to check
print(i94res_data[0])
print(i94res_data[1])
print(i94res_data[2])

[582, 'MEXICO Air Sea, and Not Reported (I-94, no land arrivals)']
[236, 'AFGHANISTAN']
[101, 'ALBANIA']


In [42]:
# Now convert list to spark dataframe
# Create a schema for the dataframe
i94res_schema = StructType([
    StructField('id', StringType(), True),
    StructField('country', StringType(), True)
])
i94res=spark.createDataFrame(i94res_data, i94res_schema)

In [43]:
# Display dataframe
display(i94res)

DataFrame[id: string, country: string]

In [44]:
# Create parquet file
i94res.write.mode('overwrite').parquet('./data/i94res.parquet')

In [45]:
'''
/* I94VISA - Visa codes collapsed into three categories:
   1 = Business
   2 = Pleasure
   3 = Student
*/
'''
i94visa_data = [[1, 'Business'], [2, 'Pleasure'], [3, 'Student']]

## Convert to spark dataframe
i94visa=spark.createDataFrame(i94visa_data)

# Create parquet file
i94visa.write.mode('overwrite').parquet('./data/i94visa.parquet')

In [46]:
us.show(3)

+---------------+----------+----------+--------+-------+-------+------------+------------------+----------+---------+---------+----------+---------+
|           City|State_Code|Median_age|Male_Pop|Fem_Pop|Ttl_Pop|Foreign-born|Avg_Household_Size|Native_Pop|Asian_Pop|Black_Pop|Latino_Pop|White_Pop|
+---------------+----------+----------+--------+-------+-------+------------+------------------+----------+---------+---------+----------+---------+
|Highlands Ranch|        CO|      39.6|   49186|  53281| 102467|        8827|              2.72|      1480|     5650|     1779|      8393|    94499|
|           Kent|        WA|      33.4|   61825|  65137| 126962|       38175|              3.06|      3651|    26168|    20450|     21928|    67918|
|        Madison|        WI|      30.7|  122596| 126360| 248956|       30090|              2.23|      2296|    23937|    20424|     19697|   204302|
+---------------+----------+----------+--------+-------+-------+------------+------------------+----------

In [47]:
i94_spark.columns

['i94res',
 'i94port',
 'arrdate',
 'i94mode',
 'depdate',
 'i94bir',
 'i94visa',
 'count',
 'gender',
 'admnum']

In [48]:
# Add i94port city and state columns to i94 dataframe
i94_spark.join(i94port, i94_spark.i94port==i94port.id, how='left').count()

3096302

In [49]:
# count above matches original i94 cleaned dataset `i94_spark`
#   Commit i94_spark
i94_spark=i94_spark.join(i94port, i94_spark.i94port==i94port.id, how='left')

In [50]:
i94_spark.columns

['i94res',
 'i94port',
 'arrdate',
 'i94mode',
 'depdate',
 'i94bir',
 'i94visa',
 'count',
 'gender',
 'admnum',
 'id',
 'port_city',
 'port_state']

In [51]:
i94_spark.show()

+------+-------+-------+-------+-------+------+-------+-----+------+-----------+---+---------+----------+
|i94res|i94port|arrdate|i94mode|depdate|i94bir|i94visa|count|gender|     admnum| id|port_city|port_state|
+------+-------+-------+-------+-------+------+-------+-----+------+-----------+---+---------+----------+
|   110|    BGM|  20550|      1|  20556|    36|      1|    1|     F|92847893530|BGM|   BANGOR|        ME|
|   108|    BGM|  20569|      1|  20571|    43|      1|    1|     M|59234302533|BGM|   BANGOR|        ME|
|   129|    BGM|  20559|      1|  20584|    67|      1|    1|     M|93553405030|BGM|   BANGOR|        ME|
|   261|    BGM|  20568|      1|   null|     9|      1|    1|     M|94455571030|BGM|   BANGOR|        ME|
|   111|    BGM|  20563|      1|  20567|    56|      1|    1|     M|93951299930|BGM|   BANGOR|        ME|
|   135|    BGM|  20564|      1|  20569|    30|      2|    1|     M|94033156330|BGM|   BANGOR|        ME|
|   582|    BGM|  20563|      1|  20575|    57

In [52]:
# Drop `id` column
i94_spark=i94_spark.drop("id")

In [53]:
i94_spark.columns

['i94res',
 'i94port',
 'arrdate',
 'i94mode',
 'depdate',
 'i94bir',
 'i94visa',
 'count',
 'gender',
 'admnum',
 'port_city',
 'port_state']

In [54]:
us.columns

['City',
 'State_Code',
 'Median_age',
 'Male_Pop',
 'Fem_Pop',
 'Ttl_Pop',
 'Foreign-born',
 'Avg_Household_Size',
 'Native_Pop',
 'Asian_Pop',
 'Black_Pop',
 'Latino_Pop',
 'White_Pop']

In [55]:
## We will join `us` with `i94_spark` to get fact table 'i94non_immigrant_port_entry`
# NOTE: We use left join againt city records which may cause null values because
# we may not currently have demographic stats on all U.S. ports of entry

In [56]:
i94non_immigrant_port_entry=i94_spark.join(us, (upper(i94_spark.port_city)==upper(us.City)) & \
                                           (upper(i94_spark.port_state)==upper(us.State_Code)), how='left')

In [57]:
i94non_immigrant_port_entry.count()

i94non_immigrant_port_entry.columns

['i94res',
 'i94port',
 'arrdate',
 'i94mode',
 'depdate',
 'i94bir',
 'i94visa',
 'count',
 'gender',
 'admnum',
 'port_city',
 'port_state',
 'City',
 'State_Code',
 'Median_age',
 'Male_Pop',
 'Fem_Pop',
 'Ttl_Pop',
 'Foreign-born',
 'Avg_Household_Size',
 'Native_Pop',
 'Asian_Pop',
 'Black_Pop',
 'Latino_Pop',
 'White_Pop']

In [58]:
# Drop City and State_Code
i94non_immigrant_port_entry=i94non_immigrant_port_entry.drop("City","State_Code")

In [59]:
# Show i94non_immigrant_port_entry
i94non_immigrant_port_entry.show()

+------+-------+-------+-------+-------+------+-------+-----+------+-----------+---------+----------+----------+--------+-------+-------+------------+------------------+----------+---------+---------+----------+---------+
|i94res|i94port|arrdate|i94mode|depdate|i94bir|i94visa|count|gender|     admnum|port_city|port_state|Median_age|Male_Pop|Fem_Pop|Ttl_Pop|Foreign-born|Avg_Household_Size|Native_Pop|Asian_Pop|Black_Pop|Latino_Pop|White_Pop|
+------+-------+-------+-------+-------+------+-------+-----+------+-----------+---------+----------+----------+--------+-------+-------+------------+------------------+----------+---------+---------+----------+---------+
|   103|    NEC|  20556|      3|  20557|    51|      2|    1|     F|  788711085|    NECHE|        ND|      null|    null|   null|   null|        null|              null|      null|     null|     null|      null|     null|
|   112|    NEC|  20573|      3|  20575|    32|      2|    1|     F|59477349333|    NECHE|        ND|      null|

In [60]:
## Create the i94date dimension from the i94non_immigrant_port_entry dataframe:
# Convert SAS arrival date to datetime format - add it as last column
get_date = udf(lambda x: (dt.datetime(1960, 1, 1).date() + dt.timedelta(x)).isoformat() if x else None)
i94non_immigrant_port_entry = i94non_immigrant_port_entry.withColumn("arrival_date", get_date(i94non_immigrant_port_entry.arrdate))

In [61]:
i94non_immigrant_port_entry.show(3)

+------+-------+-------+-------+-------+------+-------+-----+------+-----------+---------+----------+----------+--------+-------+-------+------------+------------------+----------+---------+---------+----------+---------+------------+
|i94res|i94port|arrdate|i94mode|depdate|i94bir|i94visa|count|gender|     admnum|port_city|port_state|Median_age|Male_Pop|Fem_Pop|Ttl_Pop|Foreign-born|Avg_Household_Size|Native_Pop|Asian_Pop|Black_Pop|Latino_Pop|White_Pop|arrival_date|
+------+-------+-------+-------+-------+------+-------+-----+------+-----------+---------+----------+----------+--------+-------+-------+------------+------------------+----------+---------+---------+----------+---------+------------+
|   103|    NEC|  20556|      3|  20557|    51|      2|    1|     F|  788711085|    NECHE|        ND|      null|    null|   null|   null|        null|              null|      null|     null|     null|      null|     null|  2016-04-12|
|   112|    NEC|  20573|      3|  20575|    32|      2|    1

In [62]:
i94date=i94non_immigrant_port_entry.select(col('arrdate').alias('arrival_sasdate'),
                                           col('arrival_date').alias('arrival_iso_date'),
                                           date_format('arrival_date','M').alias('arrival_month'),
                                           date_format('arrival_date','E').alias('arrival_dayofweek'), 
                                           date_format('arrival_date', 'y').alias('arrival_year'), 
                                           date_format('arrival_date', 'd').alias('arrival_day'),
                                           date_format('arrival_date','w').alias('arrival_weekofyear')).dropDuplicates()

In [63]:
# Drop arrival_date column from the i94non_immigrant_port_entry dataframe and finally save it to parquet file 
i94non_immigrant_port_entry.drop('arrival_date').write.mode("overwrite").parquet('./data/i94non_immigrant_port_entry.parquet')

In [64]:
i94date.count()
i94date.show(5)

Py4JJavaError: An error occurred while calling o362.count.
: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#2390]
+- *(15) HashAggregate(keys=[], functions=[partial_count(1)], output=[count#1969L])
   +- *(15) HashAggregate(keys=[arrival_day#1869, arrival_iso_date#1865, arrival_month#1866, arrival_dayofweek#1867, arrival_weekofyear#1870, arrival_year#1868, arrival_sasdate#1864], functions=[], output=[])
      +- Exchange hashpartitioning(arrival_day#1869, arrival_iso_date#1865, arrival_month#1866, arrival_dayofweek#1867, arrival_weekofyear#1870, arrival_year#1868, arrival_sasdate#1864, 200), ENSURE_REQUIREMENTS, [id=#2385]
         +- *(14) HashAggregate(keys=[arrival_day#1869, arrival_iso_date#1865, arrival_month#1866, arrival_dayofweek#1867, arrival_weekofyear#1870, arrival_year#1868, arrival_sasdate#1864], functions=[], output=[arrival_day#1869, arrival_iso_date#1865, arrival_month#1866, arrival_dayofweek#1867, arrival_weekofyear#1870, arrival_year#1868, arrival_sasdate#1864])
            +- *(14) Project [arrdate#1057 AS arrival_sasdate#1864, pythonUDF0#1967 AS arrival_iso_date#1865, date_format(cast(pythonUDF0#1967 as timestamp), M, Some(Asia/Singapore)) AS arrival_month#1866, date_format(cast(pythonUDF0#1967 as timestamp), E, Some(Asia/Singapore)) AS arrival_dayofweek#1867, date_format(cast(pythonUDF0#1967 as timestamp), y, Some(Asia/Singapore)) AS arrival_year#1868, date_format(cast(pythonUDF0#1967 as timestamp), d, Some(Asia/Singapore)) AS arrival_day#1869, date_format(cast(pythonUDF0#1967 as timestamp), w, Some(Asia/Singapore)) AS arrival_weekofyear#1870]
               +- BatchEvalPython [<lambda>(arrdate#1057)], [pythonUDF0#1967]
                  +- *(13) Project [arrdate#1057]
                     +- SortMergeJoin [upper(port_city#1226), upper(port_state#1227)], [upper(City#16), upper(State_Code#688)], LeftOuter
                        :- *(7) Sort [upper(port_city#1226) ASC NULLS FIRST, upper(port_state#1227) ASC NULLS FIRST], false, 0
                        :  +- Exchange hashpartitioning(upper(port_city#1226), upper(port_state#1227), 200), ENSURE_REQUIREMENTS, [id=#2348]
                        :     +- *(6) Project [arrdate#1057, port_city#1226, port_state#1227]
                        :        +- SortMergeJoin [i94port#953], [id#1225], LeftOuter
                        :           :- *(3) Sort [i94port#953 ASC NULLS FIRST], false, 0
                        :           :  +- Exchange hashpartitioning(i94port#953, 200), ENSURE_REQUIREMENTS, [id=#2335]
                        :           :     +- *(2) HashAggregate(keys=[count#1062, i94mode#1058, i94bir#1060, i94port#953, arrdate#1057, admnum#1063L, depdate#1059, i94visa#1061, i94res#1056, gender#970], functions=[], output=[i94port#953, arrdate#1057])
                        :           :        +- Exchange hashpartitioning(count#1062, i94mode#1058, i94bir#1060, i94port#953, arrdate#1057, admnum#1063L, depdate#1059, i94visa#1061, i94res#1056, gender#970, 200), ENSURE_REQUIREMENTS, [id=#2331]
                        :           :           +- *(1) HashAggregate(keys=[count#1062, i94mode#1058, i94bir#1060, i94port#953, arrdate#1057, admnum#1063L, depdate#1059, i94visa#1061, i94res#1056, gender#970], functions=[], output=[count#1062, i94mode#1058, i94bir#1060, i94port#953, arrdate#1057, admnum#1063L, depdate#1059, i94visa#1061, i94res#1056, gender#970])
                        :           :              +- *(1) Project [cast(i94res#952 as int) AS i94res#1056, i94port#953, cast(arrdate#954 as int) AS arrdate#1057, cast(i94mode#955 as int) AS i94mode#1058, cast(depdate#957 as int) AS depdate#1059, cast(i94bir#958 as int) AS i94bir#1060, cast(i94visa#959 as int) AS i94visa#1061, cast(count#960 as int) AS count#1062, gender#970, cast(admnum#973 as bigint) AS admnum#1063L]
                        :           :                 +- *(1) ColumnarToRow
                        :           :                    +- FileScan parquet [i94res#952,i94port#953,arrdate#954,i94mode#955,depdate#957,i94bir#958,i94visa#959,count#960,gender#970,admnum#973] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/home/terry/Desktop/DataEngineering/Capstone/datasets/sas_data], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<i94res:double,i94port:string,arrdate:double,i94mode:double,depdate:double,i94bir:double,i9...
                        :           +- *(5) Sort [id#1225 ASC NULLS FIRST], false, 0
                        :              +- Exchange hashpartitioning(id#1225, 200), ENSURE_REQUIREMENTS, [id=#2340]
                        :                 +- *(4) Filter isnotnull(id#1225)
                        :                    +- *(4) Scan ExistingRDD[id#1225,port_city#1226,port_state#1227]
                        +- *(12) Sort [upper(City#16) ASC NULLS FIRST, upper(State_Code#688) ASC NULLS FIRST], false, 0
                           +- Exchange hashpartitioning(upper(City#16), upper(State_Code#688), 200), ENSURE_REQUIREMENTS, [id=#2372]
                              +- *(11) Project [City#16, State Code#25 AS State_Code#688]
                                 +- *(11) BroadcastHashJoin [City#16, State Code#25], [City#542, State Code#551], Inner, BuildRight, false
                                    :- *(11) HashAggregate(keys=[Total Population#21, Female Population#20, City#16, Median Age#18, State Code#25, Foreign-born#23, Male Population#19, State#17, Average Household Size#24], functions=[], output=[City#16, State Code#25])
                                    :  +- Exchange hashpartitioning(Total Population#21, Female Population#20, City#16, Median Age#18, State Code#25, Foreign-born#23, Male Population#19, State#17, Average Household Size#24, 200), ENSURE_REQUIREMENTS, [id=#2356]
                                    :     +- *(8) HashAggregate(keys=[Total Population#21, Female Population#20, City#16, Median Age#18, State Code#25, Foreign-born#23, Male Population#19, State#17, Average Household Size#24], functions=[], output=[Total Population#21, Female Population#20, City#16, Median Age#18, State Code#25, Foreign-born#23, Male Population#19, State#17, Average Household Size#24])
                                    :        +- *(8) Filter (isnotnull(City#16) AND isnotnull(State Code#25))
                                    :           +- FileScan csv [City#16,State#17,Median Age#18,Male Population#19,Female Population#20,Total Population#21,Foreign-born#23,Average Household Size#24,State Code#25] Batched: false, DataFilters: [isnotnull(City#16), isnotnull(State Code#25)], Format: CSV, Location: InMemoryFileIndex[file:/home/terry/Desktop/DataEngineering/Capstone/datasets/us-cities-demographi..., PartitionFilters: [], PushedFilters: [IsNotNull(City), IsNotNull(State Code)], ReadSchema: struct<City:string,State:string,Median Age:string,Male Population:string,Female Population:string...
                                    +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true], input[1, string, true]),false), [id=#2367]
                                       +- *(10) HashAggregate(keys=[City#542, State Code#551], functions=[], output=[City#542, State Code#551])
                                          +- Exchange hashpartitioning(City#542, State Code#551, 200), ENSURE_REQUIREMENTS, [id=#2363]
                                             +- *(9) HashAggregate(keys=[City#542, State Code#551], functions=[], output=[City#542, State Code#551])
                                                +- *(9) Filter (isnotnull(City#542) AND isnotnull(State Code#551))
                                                   +- FileScan csv [City#542,State Code#551] Batched: false, DataFilters: [isnotnull(City#542), isnotnull(State Code#551)], Format: CSV, Location: InMemoryFileIndex[file:/home/terry/Desktop/DataEngineering/Capstone/datasets/us-cities-demographi..., PartitionFilters: [], PushedFilters: [IsNotNull(City), IsNotNull(State Code)], ReadSchema: struct<City:string,State Code:string>

	at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:163)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
	at org.apache.spark.sql.execution.InputAdapter.inputRDD(WholeStageCodegenExec.scala:525)
	at org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs(WholeStageCodegenExec.scala:453)
	at org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs$(WholeStageCodegenExec.scala:452)
	at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:496)
	at org.apache.spark.sql.execution.aggregate.HashAggregateExec.inputRDDs(HashAggregateExec.scala:141)
	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:746)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
	at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:321)
	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:387)
	at org.apache.spark.sql.Dataset.$anonfun$count$1(Dataset.scala:3006)
	at org.apache.spark.sql.Dataset.$anonfun$count$1$adapted(Dataset.scala:3005)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3687)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3685)
	at org.apache.spark.sql.Dataset.count(Dataset.scala:3005)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
Exchange hashpartitioning(arrival_day#1869, arrival_iso_date#1865, arrival_month#1866, arrival_dayofweek#1867, arrival_weekofyear#1870, arrival_year#1868, arrival_sasdate#1864, 200), ENSURE_REQUIREMENTS, [id=#2385]
+- *(14) HashAggregate(keys=[arrival_day#1869, arrival_iso_date#1865, arrival_month#1866, arrival_dayofweek#1867, arrival_weekofyear#1870, arrival_year#1868, arrival_sasdate#1864], functions=[], output=[arrival_day#1869, arrival_iso_date#1865, arrival_month#1866, arrival_dayofweek#1867, arrival_weekofyear#1870, arrival_year#1868, arrival_sasdate#1864])
   +- *(14) Project [arrdate#1057 AS arrival_sasdate#1864, pythonUDF0#1967 AS arrival_iso_date#1865, date_format(cast(pythonUDF0#1967 as timestamp), M, Some(Asia/Singapore)) AS arrival_month#1866, date_format(cast(pythonUDF0#1967 as timestamp), E, Some(Asia/Singapore)) AS arrival_dayofweek#1867, date_format(cast(pythonUDF0#1967 as timestamp), y, Some(Asia/Singapore)) AS arrival_year#1868, date_format(cast(pythonUDF0#1967 as timestamp), d, Some(Asia/Singapore)) AS arrival_day#1869, date_format(cast(pythonUDF0#1967 as timestamp), w, Some(Asia/Singapore)) AS arrival_weekofyear#1870]
      +- BatchEvalPython [<lambda>(arrdate#1057)], [pythonUDF0#1967]
         +- *(13) Project [arrdate#1057]
            +- SortMergeJoin [upper(port_city#1226), upper(port_state#1227)], [upper(City#16), upper(State_Code#688)], LeftOuter
               :- *(7) Sort [upper(port_city#1226) ASC NULLS FIRST, upper(port_state#1227) ASC NULLS FIRST], false, 0
               :  +- Exchange hashpartitioning(upper(port_city#1226), upper(port_state#1227), 200), ENSURE_REQUIREMENTS, [id=#2348]
               :     +- *(6) Project [arrdate#1057, port_city#1226, port_state#1227]
               :        +- SortMergeJoin [i94port#953], [id#1225], LeftOuter
               :           :- *(3) Sort [i94port#953 ASC NULLS FIRST], false, 0
               :           :  +- Exchange hashpartitioning(i94port#953, 200), ENSURE_REQUIREMENTS, [id=#2335]
               :           :     +- *(2) HashAggregate(keys=[count#1062, i94mode#1058, i94bir#1060, i94port#953, arrdate#1057, admnum#1063L, depdate#1059, i94visa#1061, i94res#1056, gender#970], functions=[], output=[i94port#953, arrdate#1057])
               :           :        +- Exchange hashpartitioning(count#1062, i94mode#1058, i94bir#1060, i94port#953, arrdate#1057, admnum#1063L, depdate#1059, i94visa#1061, i94res#1056, gender#970, 200), ENSURE_REQUIREMENTS, [id=#2331]
               :           :           +- *(1) HashAggregate(keys=[count#1062, i94mode#1058, i94bir#1060, i94port#953, arrdate#1057, admnum#1063L, depdate#1059, i94visa#1061, i94res#1056, gender#970], functions=[], output=[count#1062, i94mode#1058, i94bir#1060, i94port#953, arrdate#1057, admnum#1063L, depdate#1059, i94visa#1061, i94res#1056, gender#970])
               :           :              +- *(1) Project [cast(i94res#952 as int) AS i94res#1056, i94port#953, cast(arrdate#954 as int) AS arrdate#1057, cast(i94mode#955 as int) AS i94mode#1058, cast(depdate#957 as int) AS depdate#1059, cast(i94bir#958 as int) AS i94bir#1060, cast(i94visa#959 as int) AS i94visa#1061, cast(count#960 as int) AS count#1062, gender#970, cast(admnum#973 as bigint) AS admnum#1063L]
               :           :                 +- *(1) ColumnarToRow
               :           :                    +- FileScan parquet [i94res#952,i94port#953,arrdate#954,i94mode#955,depdate#957,i94bir#958,i94visa#959,count#960,gender#970,admnum#973] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/home/terry/Desktop/DataEngineering/Capstone/datasets/sas_data], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<i94res:double,i94port:string,arrdate:double,i94mode:double,depdate:double,i94bir:double,i9...
               :           +- *(5) Sort [id#1225 ASC NULLS FIRST], false, 0
               :              +- Exchange hashpartitioning(id#1225, 200), ENSURE_REQUIREMENTS, [id=#2340]
               :                 +- *(4) Filter isnotnull(id#1225)
               :                    +- *(4) Scan ExistingRDD[id#1225,port_city#1226,port_state#1227]
               +- *(12) Sort [upper(City#16) ASC NULLS FIRST, upper(State_Code#688) ASC NULLS FIRST], false, 0
                  +- Exchange hashpartitioning(upper(City#16), upper(State_Code#688), 200), ENSURE_REQUIREMENTS, [id=#2372]
                     +- *(11) Project [City#16, State Code#25 AS State_Code#688]
                        +- *(11) BroadcastHashJoin [City#16, State Code#25], [City#542, State Code#551], Inner, BuildRight, false
                           :- *(11) HashAggregate(keys=[Total Population#21, Female Population#20, City#16, Median Age#18, State Code#25, Foreign-born#23, Male Population#19, State#17, Average Household Size#24], functions=[], output=[City#16, State Code#25])
                           :  +- Exchange hashpartitioning(Total Population#21, Female Population#20, City#16, Median Age#18, State Code#25, Foreign-born#23, Male Population#19, State#17, Average Household Size#24, 200), ENSURE_REQUIREMENTS, [id=#2356]
                           :     +- *(8) HashAggregate(keys=[Total Population#21, Female Population#20, City#16, Median Age#18, State Code#25, Foreign-born#23, Male Population#19, State#17, Average Household Size#24], functions=[], output=[Total Population#21, Female Population#20, City#16, Median Age#18, State Code#25, Foreign-born#23, Male Population#19, State#17, Average Household Size#24])
                           :        +- *(8) Filter (isnotnull(City#16) AND isnotnull(State Code#25))
                           :           +- FileScan csv [City#16,State#17,Median Age#18,Male Population#19,Female Population#20,Total Population#21,Foreign-born#23,Average Household Size#24,State Code#25] Batched: false, DataFilters: [isnotnull(City#16), isnotnull(State Code#25)], Format: CSV, Location: InMemoryFileIndex[file:/home/terry/Desktop/DataEngineering/Capstone/datasets/us-cities-demographi..., PartitionFilters: [], PushedFilters: [IsNotNull(City), IsNotNull(State Code)], ReadSchema: struct<City:string,State:string,Median Age:string,Male Population:string,Female Population:string...
                           +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true], input[1, string, true]),false), [id=#2367]
                              +- *(10) HashAggregate(keys=[City#542, State Code#551], functions=[], output=[City#542, State Code#551])
                                 +- Exchange hashpartitioning(City#542, State Code#551, 200), ENSURE_REQUIREMENTS, [id=#2363]
                                    +- *(9) HashAggregate(keys=[City#542, State Code#551], functions=[], output=[City#542, State Code#551])
                                       +- *(9) Filter (isnotnull(City#542) AND isnotnull(State Code#551))
                                          +- FileScan csv [City#542,State Code#551] Batched: false, DataFilters: [isnotnull(City#542), isnotnull(State Code#551)], Format: CSV, Location: InMemoryFileIndex[file:/home/terry/Desktop/DataEngineering/Capstone/datasets/us-cities-demographi..., PartitionFilters: [], PushedFilters: [IsNotNull(City), IsNotNull(State Code)], ReadSchema: struct<City:string,State Code:string>

	at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:163)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
	at org.apache.spark.sql.execution.InputAdapter.inputRDD(WholeStageCodegenExec.scala:525)
	at org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs(WholeStageCodegenExec.scala:453)
	at org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs$(WholeStageCodegenExec.scala:452)
	at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:496)
	at org.apache.spark.sql.execution.aggregate.HashAggregateExec.inputRDDs(HashAggregateExec.scala:141)
	at org.apache.spark.sql.execution.aggregate.HashAggregateExec.inputRDDs(HashAggregateExec.scala:141)
	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:746)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.inputRDD$lzycompute(ShuffleExchangeExec.scala:118)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.inputRDD(ShuffleExchangeExec.scala:118)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.shuffleDependency$lzycompute(ShuffleExchangeExec.scala:151)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.shuffleDependency(ShuffleExchangeExec.scala:149)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.$anonfun$doExecute$1(ShuffleExchangeExec.scala:166)
	at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
	... 40 more
Caused by: org.apache.spark.SparkUpgradeException: You may get a different result due to the upgrading of Spark 3.0: Fail to recognize 'w' pattern in the DateTimeFormatter. 1) You can set spark.sql.legacy.timeParserPolicy to LEGACY to restore the behavior before Spark 3.0. 2) You can form a valid datetime pattern with the guide from https://spark.apache.org/docs/latest/sql-ref-datetime-pattern.html
	at org.apache.spark.sql.catalyst.util.DateTimeFormatterHelper$$anonfun$checkLegacyFormatter$1.applyOrElse(DateTimeFormatterHelper.scala:196)
	at org.apache.spark.sql.catalyst.util.DateTimeFormatterHelper$$anonfun$checkLegacyFormatter$1.applyOrElse(DateTimeFormatterHelper.scala:185)
	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
	at org.apache.spark.sql.catalyst.util.Iso8601TimestampFormatter.validatePatternString(TimestampFormatter.scala:109)
	at org.apache.spark.sql.catalyst.util.TimestampFormatter$.getFormatter(TimestampFormatter.scala:300)
	at org.apache.spark.sql.catalyst.util.TimestampFormatter$.apply(TimestampFormatter.scala:333)
	at org.apache.spark.sql.catalyst.expressions.TimestampFormatterHelper.getFormatter(datetimeExpressions.scala:72)
	at org.apache.spark.sql.catalyst.expressions.TimestampFormatterHelper.getFormatter$(datetimeExpressions.scala:67)
	at org.apache.spark.sql.catalyst.expressions.DateFormatClass.getFormatter(datetimeExpressions.scala:771)
	at org.apache.spark.sql.catalyst.expressions.TimestampFormatterHelper.$anonfun$formatterOption$1(datetimeExpressions.scala:64)
	at scala.Option.map(Option.scala:230)
	at org.apache.spark.sql.catalyst.expressions.TimestampFormatterHelper.formatterOption(datetimeExpressions.scala:64)
	at org.apache.spark.sql.catalyst.expressions.TimestampFormatterHelper.formatterOption$(datetimeExpressions.scala:62)
	at org.apache.spark.sql.catalyst.expressions.DateFormatClass.formatterOption$lzycompute(datetimeExpressions.scala:771)
	at org.apache.spark.sql.catalyst.expressions.DateFormatClass.formatterOption(datetimeExpressions.scala:771)
	at org.apache.spark.sql.catalyst.expressions.DateFormatClass.doGenCode(datetimeExpressions.scala:790)
	at org.apache.spark.sql.catalyst.expressions.Expression.$anonfun$genCode$3(Expression.scala:146)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:141)
	at org.apache.spark.sql.catalyst.expressions.Alias.genCode(namedExpressions.scala:163)
	at org.apache.spark.sql.execution.ProjectExec.$anonfun$doConsume$2(basicPhysicalOperators.scala:73)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at scala.collection.TraversableLike.map(TraversableLike.scala:238)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
	at scala.collection.AbstractTraversable.map(Traversable.scala:108)
	at org.apache.spark.sql.execution.ProjectExec.$anonfun$doConsume$1(basicPhysicalOperators.scala:73)
	at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext.withSubExprEliminationExprs(CodeGenerator.scala:1026)
	at org.apache.spark.sql.execution.ProjectExec.doConsume(basicPhysicalOperators.scala:73)
	at org.apache.spark.sql.execution.CodegenSupport.consume(WholeStageCodegenExec.scala:194)
	at org.apache.spark.sql.execution.CodegenSupport.consume$(WholeStageCodegenExec.scala:149)
	at org.apache.spark.sql.execution.InputAdapter.consume(WholeStageCodegenExec.scala:496)
	at org.apache.spark.sql.execution.InputRDDCodegen.doProduce(WholeStageCodegenExec.scala:483)
	at org.apache.spark.sql.execution.InputRDDCodegen.doProduce$(WholeStageCodegenExec.scala:456)
	at org.apache.spark.sql.execution.InputAdapter.doProduce(WholeStageCodegenExec.scala:496)
	at org.apache.spark.sql.execution.CodegenSupport.$anonfun$produce$1(WholeStageCodegenExec.scala:95)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
	at org.apache.spark.sql.execution.CodegenSupport.produce(WholeStageCodegenExec.scala:90)
	at org.apache.spark.sql.execution.CodegenSupport.produce$(WholeStageCodegenExec.scala:90)
	at org.apache.spark.sql.execution.InputAdapter.produce(WholeStageCodegenExec.scala:496)
	at org.apache.spark.sql.execution.ProjectExec.doProduce(basicPhysicalOperators.scala:54)
	at org.apache.spark.sql.execution.CodegenSupport.$anonfun$produce$1(WholeStageCodegenExec.scala:95)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
	at org.apache.spark.sql.execution.CodegenSupport.produce(WholeStageCodegenExec.scala:90)
	at org.apache.spark.sql.execution.CodegenSupport.produce$(WholeStageCodegenExec.scala:90)
	at org.apache.spark.sql.execution.ProjectExec.produce(basicPhysicalOperators.scala:41)
	at org.apache.spark.sql.execution.aggregate.HashAggregateExec.doProduceWithKeys(HashAggregateExec.scala:733)
	at org.apache.spark.sql.execution.aggregate.HashAggregateExec.doProduce(HashAggregateExec.scala:148)
	at org.apache.spark.sql.execution.CodegenSupport.$anonfun$produce$1(WholeStageCodegenExec.scala:95)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
	at org.apache.spark.sql.execution.CodegenSupport.produce(WholeStageCodegenExec.scala:90)
	at org.apache.spark.sql.execution.CodegenSupport.produce$(WholeStageCodegenExec.scala:90)
	at org.apache.spark.sql.execution.aggregate.HashAggregateExec.produce(HashAggregateExec.scala:47)
	at org.apache.spark.sql.execution.WholeStageCodegenExec.doCodeGen(WholeStageCodegenExec.scala:655)
	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:718)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.inputRDD$lzycompute(ShuffleExchangeExec.scala:118)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.inputRDD(ShuffleExchangeExec.scala:118)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.shuffleDependency$lzycompute(ShuffleExchangeExec.scala:151)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.shuffleDependency(ShuffleExchangeExec.scala:149)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.$anonfun$doExecute$1(ShuffleExchangeExec.scala:166)
	at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
	... 64 more
Caused by: java.lang.IllegalArgumentException: All week-based patterns are unsupported since Spark 3.0, detected: w, Please use the SQL function EXTRACT instead
	at org.apache.spark.sql.catalyst.util.DateTimeFormatterHelper$.$anonfun$convertIncompatiblePattern$4(DateTimeFormatterHelper.scala:323)
	at org.apache.spark.sql.catalyst.util.DateTimeFormatterHelper$.$anonfun$convertIncompatiblePattern$4$adapted(DateTimeFormatterHelper.scala:321)
	at scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:877)
	at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
	at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
	at scala.collection.immutable.StringOps.foreach(StringOps.scala:33)
	at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:876)
	at org.apache.spark.sql.catalyst.util.DateTimeFormatterHelper$.$anonfun$convertIncompatiblePattern$2(DateTimeFormatterHelper.scala:321)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
	at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
	at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
	at scala.collection.TraversableLike.map(TraversableLike.scala:238)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
	at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:198)
	at org.apache.spark.sql.catalyst.util.DateTimeFormatterHelper$.convertIncompatiblePattern(DateTimeFormatterHelper.scala:318)
	at org.apache.spark.sql.catalyst.util.DateTimeFormatterHelper.getOrCreateFormatter(DateTimeFormatterHelper.scala:121)
	at org.apache.spark.sql.catalyst.util.DateTimeFormatterHelper.getOrCreateFormatter$(DateTimeFormatterHelper.scala:117)
	at org.apache.spark.sql.catalyst.util.Iso8601TimestampFormatter.getOrCreateFormatter(TimestampFormatter.scala:59)
	at org.apache.spark.sql.catalyst.util.Iso8601TimestampFormatter.formatter$lzycompute(TimestampFormatter.scala:68)
	at org.apache.spark.sql.catalyst.util.Iso8601TimestampFormatter.formatter(TimestampFormatter.scala:67)
	at org.apache.spark.sql.catalyst.util.Iso8601TimestampFormatter.validatePatternString(TimestampFormatter.scala:108)
	... 134 more


In [95]:
# Create temporary sql table
i94date.createOrReplaceTempView("i94date_table")

In [96]:
# Add seasons to i94 date dimension table
i94date_season=spark.sql('''select arrival_sasdate,
                         arrival_iso_date,
                         arrival_month,
                         arrival_dayofweek,
                         arrival_year,
                         arrival_day,
                         arrival_weekofyear,
                         CASE WHEN arrival_month IN (12, 1, 2) THEN 'winter' 
                                WHEN arrival_month IN (3, 4, 5) THEN 'spring' 
                                WHEN arrival_month IN (6, 7, 8) THEN 'summer' 
                                ELSE 'autumn' 
                         END AS date_season from i94date_table''')

In [97]:
i94date_season.show(3)

Py4JJavaError: An error occurred while calling o1241.showString.
: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
Exchange hashpartitioning(arrival_day#3551, arrival_iso_date#3547, arrival_month#3548, arrival_dayofweek#3549, arrival_weekofyear#3552, arrival_year#3550, arrival_sasdate#3546, 200), ENSURE_REQUIREMENTS, [id=#6016]
+- *(14) HashAggregate(keys=[arrival_day#3551, arrival_iso_date#3547, arrival_month#3548, arrival_dayofweek#3549, arrival_weekofyear#3552, arrival_year#3550, arrival_sasdate#3546], functions=[], output=[arrival_day#3551, arrival_iso_date#3547, arrival_month#3548, arrival_dayofweek#3549, arrival_weekofyear#3552, arrival_year#3550, arrival_sasdate#3546])
   +- *(14) Project [arrdate#2325 AS arrival_sasdate#3546, pythonUDF0#3719 AS arrival_iso_date#3547, date_format(cast(pythonUDF0#3719 as timestamp), M, Some(Asia/Singapore)) AS arrival_month#3548, date_format(cast(pythonUDF0#3719 as timestamp), E, Some(Asia/Singapore)) AS arrival_dayofweek#3549, date_format(cast(pythonUDF0#3719 as timestamp), y, Some(Asia/Singapore)) AS arrival_year#3550, date_format(cast(pythonUDF0#3719 as timestamp), d, Some(Asia/Singapore)) AS arrival_day#3551, date_format(cast(pythonUDF0#3719 as timestamp), w, Some(Asia/Singapore)) AS arrival_weekofyear#3552]
      +- BatchEvalPython [<lambda>(arrdate#2325)], [pythonUDF0#3719]
         +- *(13) Project [arrdate#2325]
            +- SortMergeJoin [upper(port_city#2529), upper(port_state#2530)], [upper(City#16), upper(State_Code#804)], LeftOuter
               :- *(7) Sort [upper(port_city#2529) ASC NULLS FIRST, upper(port_state#2530) ASC NULLS FIRST], false, 0
               :  +- Exchange hashpartitioning(upper(port_city#2529), upper(port_state#2530), 200), ENSURE_REQUIREMENTS, [id=#5979]
               :     +- *(6) Project [arrdate#2325, port_city#2529, port_state#2530]
               :        +- SortMergeJoin [i94port#2137], [id#2528], LeftOuter
               :           :- *(3) Sort [i94port#2137 ASC NULLS FIRST], false, 0
               :           :  +- Exchange hashpartitioning(i94port#2137, 200), ENSURE_REQUIREMENTS, [id=#5966]
               :           :     +- *(2) HashAggregate(keys=[count#2330, i94mode#2326, i94bir#2328, i94port#2137, arrdate#2325, admnum#2331L, depdate#2327, i94visa#2329, i94res#2324, gender#2154], functions=[], output=[i94port#2137, arrdate#2325])
               :           :        +- Exchange hashpartitioning(count#2330, i94mode#2326, i94bir#2328, i94port#2137, arrdate#2325, admnum#2331L, depdate#2327, i94visa#2329, i94res#2324, gender#2154, 200), ENSURE_REQUIREMENTS, [id=#5962]
               :           :           +- *(1) HashAggregate(keys=[count#2330, i94mode#2326, i94bir#2328, i94port#2137, arrdate#2325, admnum#2331L, depdate#2327, i94visa#2329, i94res#2324, gender#2154], functions=[], output=[count#2330, i94mode#2326, i94bir#2328, i94port#2137, arrdate#2325, admnum#2331L, depdate#2327, i94visa#2329, i94res#2324, gender#2154])
               :           :              +- *(1) Project [cast(i94res#2136 as int) AS i94res#2324, i94port#2137, cast(arrdate#2138 as int) AS arrdate#2325, cast(i94mode#2139 as int) AS i94mode#2326, cast(depdate#2141 as int) AS depdate#2327, cast(i94bir#2142 as int) AS i94bir#2328, cast(i94visa#2143 as int) AS i94visa#2329, cast(count#2144 as int) AS count#2330, gender#2154, cast(admnum#2157 as bigint) AS admnum#2331L]
               :           :                 +- *(1) ColumnarToRow
               :           :                    +- FileScan parquet [i94res#2136,i94port#2137,arrdate#2138,i94mode#2139,depdate#2141,i94bir#2142,i94visa#2143,count#2144,gender#2154,admnum#2157] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/home/terry/Desktop/DataEngineering/Capstone/datasets/sas_data], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<i94res:double,i94port:string,arrdate:double,i94mode:double,depdate:double,i94bir:double,i9...
               :           +- *(5) Sort [id#2528 ASC NULLS FIRST], false, 0
               :              +- Exchange hashpartitioning(id#2528, 200), ENSURE_REQUIREMENTS, [id=#5971]
               :                 +- *(4) Filter isnotnull(id#2528)
               :                    +- *(4) Scan ExistingRDD[id#2528,port_city#2529,port_state#2530]
               +- *(12) Sort [upper(City#16) ASC NULLS FIRST, upper(State_Code#804) ASC NULLS FIRST], false, 0
                  +- Exchange hashpartitioning(upper(City#16), upper(State_Code#804), 200), ENSURE_REQUIREMENTS, [id=#6003]
                     +- *(11) Project [City#16, State Code#25 AS State_Code#804]
                        +- *(11) BroadcastHashJoin [City#16, State Code#25], [City#611, State Code#620], Inner, BuildRight, false
                           :- *(11) HashAggregate(keys=[Total Population#21, Female Population#20, City#16, Median Age#18, State Code#25, Foreign-born#23, Male Population#19, State#17, Average Household Size#24], functions=[], output=[City#16, State Code#25])
                           :  +- Exchange hashpartitioning(Total Population#21, Female Population#20, City#16, Median Age#18, State Code#25, Foreign-born#23, Male Population#19, State#17, Average Household Size#24, 200), ENSURE_REQUIREMENTS, [id=#5987]
                           :     +- *(8) HashAggregate(keys=[Total Population#21, Female Population#20, City#16, Median Age#18, State Code#25, Foreign-born#23, Male Population#19, State#17, Average Household Size#24], functions=[], output=[Total Population#21, Female Population#20, City#16, Median Age#18, State Code#25, Foreign-born#23, Male Population#19, State#17, Average Household Size#24])
                           :        +- *(8) Filter (isnotnull(City#16) AND isnotnull(State Code#25))
                           :           +- FileScan csv [City#16,State#17,Median Age#18,Male Population#19,Female Population#20,Total Population#21,Foreign-born#23,Average Household Size#24,State Code#25] Batched: false, DataFilters: [isnotnull(City#16), isnotnull(State Code#25)], Format: CSV, Location: InMemoryFileIndex[file:/home/terry/Desktop/DataEngineering/Capstone/datasets/us-cities-demographi..., PartitionFilters: [], PushedFilters: [IsNotNull(City), IsNotNull(State Code)], ReadSchema: struct<City:string,State:string,Median Age:string,Male Population:string,Female Population:string...
                           +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true], input[1, string, true]),false), [id=#5998]
                              +- *(10) HashAggregate(keys=[City#611, State Code#620], functions=[], output=[City#611, State Code#620])
                                 +- Exchange hashpartitioning(City#611, State Code#620, 200), ENSURE_REQUIREMENTS, [id=#5994]
                                    +- *(9) HashAggregate(keys=[City#611, State Code#620], functions=[], output=[City#611, State Code#620])
                                       +- *(9) Filter (isnotnull(City#611) AND isnotnull(State Code#620))
                                          +- FileScan csv [City#611,State Code#620] Batched: false, DataFilters: [isnotnull(City#611), isnotnull(State Code#620)], Format: CSV, Location: InMemoryFileIndex[file:/home/terry/Desktop/DataEngineering/Capstone/datasets/us-cities-demographi..., PartitionFilters: [], PushedFilters: [IsNotNull(City), IsNotNull(State Code)], ReadSchema: struct<City:string,State Code:string>

	at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:163)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
	at org.apache.spark.sql.execution.InputAdapter.inputRDD(WholeStageCodegenExec.scala:525)
	at org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs(WholeStageCodegenExec.scala:453)
	at org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs$(WholeStageCodegenExec.scala:452)
	at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:496)
	at org.apache.spark.sql.execution.aggregate.HashAggregateExec.inputRDDs(HashAggregateExec.scala:141)
	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:746)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
	at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:321)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:439)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:425)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:47)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3696)
	at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2722)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3687)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3685)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2722)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2929)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:301)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:338)
	at sun.reflect.GeneratedMethodAccessor201.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkUpgradeException: You may get a different result due to the upgrading of Spark 3.0: Fail to recognize 'w' pattern in the DateTimeFormatter. 1) You can set spark.sql.legacy.timeParserPolicy to LEGACY to restore the behavior before Spark 3.0. 2) You can form a valid datetime pattern with the guide from https://spark.apache.org/docs/latest/sql-ref-datetime-pattern.html
	at org.apache.spark.sql.catalyst.util.DateTimeFormatterHelper$$anonfun$checkLegacyFormatter$1.applyOrElse(DateTimeFormatterHelper.scala:196)
	at org.apache.spark.sql.catalyst.util.DateTimeFormatterHelper$$anonfun$checkLegacyFormatter$1.applyOrElse(DateTimeFormatterHelper.scala:185)
	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
	at org.apache.spark.sql.catalyst.util.Iso8601TimestampFormatter.validatePatternString(TimestampFormatter.scala:109)
	at org.apache.spark.sql.catalyst.util.TimestampFormatter$.getFormatter(TimestampFormatter.scala:300)
	at org.apache.spark.sql.catalyst.util.TimestampFormatter$.apply(TimestampFormatter.scala:333)
	at org.apache.spark.sql.catalyst.expressions.TimestampFormatterHelper.getFormatter(datetimeExpressions.scala:72)
	at org.apache.spark.sql.catalyst.expressions.TimestampFormatterHelper.getFormatter$(datetimeExpressions.scala:67)
	at org.apache.spark.sql.catalyst.expressions.DateFormatClass.getFormatter(datetimeExpressions.scala:771)
	at org.apache.spark.sql.catalyst.expressions.TimestampFormatterHelper.$anonfun$formatterOption$1(datetimeExpressions.scala:64)
	at scala.Option.map(Option.scala:230)
	at org.apache.spark.sql.catalyst.expressions.TimestampFormatterHelper.formatterOption(datetimeExpressions.scala:64)
	at org.apache.spark.sql.catalyst.expressions.TimestampFormatterHelper.formatterOption$(datetimeExpressions.scala:62)
	at org.apache.spark.sql.catalyst.expressions.DateFormatClass.formatterOption$lzycompute(datetimeExpressions.scala:771)
	at org.apache.spark.sql.catalyst.expressions.DateFormatClass.formatterOption(datetimeExpressions.scala:771)
	at org.apache.spark.sql.catalyst.expressions.DateFormatClass.doGenCode(datetimeExpressions.scala:790)
	at org.apache.spark.sql.catalyst.expressions.Expression.$anonfun$genCode$3(Expression.scala:146)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:141)
	at org.apache.spark.sql.catalyst.expressions.Alias.genCode(namedExpressions.scala:163)
	at org.apache.spark.sql.execution.ProjectExec.$anonfun$doConsume$2(basicPhysicalOperators.scala:73)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at scala.collection.TraversableLike.map(TraversableLike.scala:238)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
	at scala.collection.AbstractTraversable.map(Traversable.scala:108)
	at org.apache.spark.sql.execution.ProjectExec.$anonfun$doConsume$1(basicPhysicalOperators.scala:73)
	at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext.withSubExprEliminationExprs(CodeGenerator.scala:1026)
	at org.apache.spark.sql.execution.ProjectExec.doConsume(basicPhysicalOperators.scala:73)
	at org.apache.spark.sql.execution.CodegenSupport.consume(WholeStageCodegenExec.scala:194)
	at org.apache.spark.sql.execution.CodegenSupport.consume$(WholeStageCodegenExec.scala:149)
	at org.apache.spark.sql.execution.InputAdapter.consume(WholeStageCodegenExec.scala:496)
	at org.apache.spark.sql.execution.InputRDDCodegen.doProduce(WholeStageCodegenExec.scala:483)
	at org.apache.spark.sql.execution.InputRDDCodegen.doProduce$(WholeStageCodegenExec.scala:456)
	at org.apache.spark.sql.execution.InputAdapter.doProduce(WholeStageCodegenExec.scala:496)
	at org.apache.spark.sql.execution.CodegenSupport.$anonfun$produce$1(WholeStageCodegenExec.scala:95)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
	at org.apache.spark.sql.execution.CodegenSupport.produce(WholeStageCodegenExec.scala:90)
	at org.apache.spark.sql.execution.CodegenSupport.produce$(WholeStageCodegenExec.scala:90)
	at org.apache.spark.sql.execution.InputAdapter.produce(WholeStageCodegenExec.scala:496)
	at org.apache.spark.sql.execution.ProjectExec.doProduce(basicPhysicalOperators.scala:54)
	at org.apache.spark.sql.execution.CodegenSupport.$anonfun$produce$1(WholeStageCodegenExec.scala:95)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
	at org.apache.spark.sql.execution.CodegenSupport.produce(WholeStageCodegenExec.scala:90)
	at org.apache.spark.sql.execution.CodegenSupport.produce$(WholeStageCodegenExec.scala:90)
	at org.apache.spark.sql.execution.ProjectExec.produce(basicPhysicalOperators.scala:41)
	at org.apache.spark.sql.execution.aggregate.HashAggregateExec.doProduceWithKeys(HashAggregateExec.scala:733)
	at org.apache.spark.sql.execution.aggregate.HashAggregateExec.doProduce(HashAggregateExec.scala:148)
	at org.apache.spark.sql.execution.CodegenSupport.$anonfun$produce$1(WholeStageCodegenExec.scala:95)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
	at org.apache.spark.sql.execution.CodegenSupport.produce(WholeStageCodegenExec.scala:90)
	at org.apache.spark.sql.execution.CodegenSupport.produce$(WholeStageCodegenExec.scala:90)
	at org.apache.spark.sql.execution.aggregate.HashAggregateExec.produce(HashAggregateExec.scala:47)
	at org.apache.spark.sql.execution.WholeStageCodegenExec.doCodeGen(WholeStageCodegenExec.scala:655)
	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:718)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.inputRDD$lzycompute(ShuffleExchangeExec.scala:118)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.inputRDD(ShuffleExchangeExec.scala:118)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.shuffleDependency$lzycompute(ShuffleExchangeExec.scala:151)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.shuffleDependency(ShuffleExchangeExec.scala:149)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.$anonfun$doExecute$1(ShuffleExchangeExec.scala:166)
	at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
	... 44 more
Caused by: java.lang.IllegalArgumentException: All week-based patterns are unsupported since Spark 3.0, detected: w, Please use the SQL function EXTRACT instead
	at org.apache.spark.sql.catalyst.util.DateTimeFormatterHelper$.$anonfun$convertIncompatiblePattern$4(DateTimeFormatterHelper.scala:323)
	at org.apache.spark.sql.catalyst.util.DateTimeFormatterHelper$.$anonfun$convertIncompatiblePattern$4$adapted(DateTimeFormatterHelper.scala:321)
	at scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:877)
	at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
	at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
	at scala.collection.immutable.StringOps.foreach(StringOps.scala:33)
	at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:876)
	at org.apache.spark.sql.catalyst.util.DateTimeFormatterHelper$.$anonfun$convertIncompatiblePattern$2(DateTimeFormatterHelper.scala:321)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
	at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
	at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
	at scala.collection.TraversableLike.map(TraversableLike.scala:238)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
	at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:198)
	at org.apache.spark.sql.catalyst.util.DateTimeFormatterHelper$.convertIncompatiblePattern(DateTimeFormatterHelper.scala:318)
	at org.apache.spark.sql.catalyst.util.DateTimeFormatterHelper.getOrCreateFormatter(DateTimeFormatterHelper.scala:121)
	at org.apache.spark.sql.catalyst.util.DateTimeFormatterHelper.getOrCreateFormatter$(DateTimeFormatterHelper.scala:117)
	at org.apache.spark.sql.catalyst.util.Iso8601TimestampFormatter.getOrCreateFormatter(TimestampFormatter.scala:59)
	at org.apache.spark.sql.catalyst.util.Iso8601TimestampFormatter.formatter$lzycompute(TimestampFormatter.scala:68)
	at org.apache.spark.sql.catalyst.util.Iso8601TimestampFormatter.formatter(TimestampFormatter.scala:67)
	at org.apache.spark.sql.catalyst.util.Iso8601TimestampFormatter.validatePatternString(TimestampFormatter.scala:108)
	... 114 more


In [98]:
# Save i94date dimension to parquet file partitioned by year and month:
i94date_season.write.mode("overwrite").partitionBy("arrival_year", "arrival_month").parquet('./data/i94date.parquet')

Py4JJavaError: An error occurred while calling o1254.parquet.
: org.apache.spark.SparkException: Job aborted.
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:231)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:188)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:108)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:106)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:131)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:132)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:131)
	at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:293)
	at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:874)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
Exchange hashpartitioning(arrival_day#3551, arrival_iso_date#3547, arrival_month#3548, arrival_dayofweek#3549, arrival_weekofyear#3552, arrival_year#3550, arrival_sasdate#3546, 200), ENSURE_REQUIREMENTS, [id=#6252]
+- *(14) HashAggregate(keys=[arrival_day#3551, arrival_iso_date#3547, arrival_month#3548, arrival_dayofweek#3549, arrival_weekofyear#3552, arrival_year#3550, arrival_sasdate#3546], functions=[], output=[arrival_day#3551, arrival_iso_date#3547, arrival_month#3548, arrival_dayofweek#3549, arrival_weekofyear#3552, arrival_year#3550, arrival_sasdate#3546])
   +- *(14) Project [arrdate#2325 AS arrival_sasdate#3546, pythonUDF0#3739 AS arrival_iso_date#3547, date_format(cast(pythonUDF0#3739 as timestamp), M, Some(Asia/Singapore)) AS arrival_month#3548, date_format(cast(pythonUDF0#3739 as timestamp), E, Some(Asia/Singapore)) AS arrival_dayofweek#3549, date_format(cast(pythonUDF0#3739 as timestamp), y, Some(Asia/Singapore)) AS arrival_year#3550, date_format(cast(pythonUDF0#3739 as timestamp), d, Some(Asia/Singapore)) AS arrival_day#3551, date_format(cast(pythonUDF0#3739 as timestamp), w, Some(Asia/Singapore)) AS arrival_weekofyear#3552]
      +- BatchEvalPython [<lambda>(arrdate#2325)], [pythonUDF0#3739]
         +- *(13) Project [arrdate#2325]
            +- SortMergeJoin [upper(port_city#2529), upper(port_state#2530)], [upper(City#16), upper(State_Code#804)], LeftOuter
               :- *(7) Sort [upper(port_city#2529) ASC NULLS FIRST, upper(port_state#2530) ASC NULLS FIRST], false, 0
               :  +- Exchange hashpartitioning(upper(port_city#2529), upper(port_state#2530), 200), ENSURE_REQUIREMENTS, [id=#6215]
               :     +- *(6) Project [arrdate#2325, port_city#2529, port_state#2530]
               :        +- SortMergeJoin [i94port#2137], [id#2528], LeftOuter
               :           :- *(3) Sort [i94port#2137 ASC NULLS FIRST], false, 0
               :           :  +- Exchange hashpartitioning(i94port#2137, 200), ENSURE_REQUIREMENTS, [id=#6202]
               :           :     +- *(2) HashAggregate(keys=[count#2330, i94mode#2326, i94bir#2328, i94port#2137, arrdate#2325, admnum#2331L, depdate#2327, i94visa#2329, i94res#2324, gender#2154], functions=[], output=[i94port#2137, arrdate#2325])
               :           :        +- Exchange hashpartitioning(count#2330, i94mode#2326, i94bir#2328, i94port#2137, arrdate#2325, admnum#2331L, depdate#2327, i94visa#2329, i94res#2324, gender#2154, 200), ENSURE_REQUIREMENTS, [id=#6198]
               :           :           +- *(1) HashAggregate(keys=[count#2330, i94mode#2326, i94bir#2328, i94port#2137, arrdate#2325, admnum#2331L, depdate#2327, i94visa#2329, i94res#2324, gender#2154], functions=[], output=[count#2330, i94mode#2326, i94bir#2328, i94port#2137, arrdate#2325, admnum#2331L, depdate#2327, i94visa#2329, i94res#2324, gender#2154])
               :           :              +- *(1) Project [cast(i94res#2136 as int) AS i94res#2324, i94port#2137, cast(arrdate#2138 as int) AS arrdate#2325, cast(i94mode#2139 as int) AS i94mode#2326, cast(depdate#2141 as int) AS depdate#2327, cast(i94bir#2142 as int) AS i94bir#2328, cast(i94visa#2143 as int) AS i94visa#2329, cast(count#2144 as int) AS count#2330, gender#2154, cast(admnum#2157 as bigint) AS admnum#2331L]
               :           :                 +- *(1) ColumnarToRow
               :           :                    +- FileScan parquet [i94res#2136,i94port#2137,arrdate#2138,i94mode#2139,depdate#2141,i94bir#2142,i94visa#2143,count#2144,gender#2154,admnum#2157] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/home/terry/Desktop/DataEngineering/Capstone/datasets/sas_data], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<i94res:double,i94port:string,arrdate:double,i94mode:double,depdate:double,i94bir:double,i9...
               :           +- *(5) Sort [id#2528 ASC NULLS FIRST], false, 0
               :              +- Exchange hashpartitioning(id#2528, 200), ENSURE_REQUIREMENTS, [id=#6207]
               :                 +- *(4) Filter isnotnull(id#2528)
               :                    +- *(4) Scan ExistingRDD[id#2528,port_city#2529,port_state#2530]
               +- *(12) Sort [upper(City#16) ASC NULLS FIRST, upper(State_Code#804) ASC NULLS FIRST], false, 0
                  +- Exchange hashpartitioning(upper(City#16), upper(State_Code#804), 200), ENSURE_REQUIREMENTS, [id=#6239]
                     +- *(11) Project [City#16, State Code#25 AS State_Code#804]
                        +- *(11) BroadcastHashJoin [City#16, State Code#25], [City#611, State Code#620], Inner, BuildRight, false
                           :- *(11) HashAggregate(keys=[Total Population#21, Female Population#20, City#16, Median Age#18, State Code#25, Foreign-born#23, Male Population#19, State#17, Average Household Size#24], functions=[], output=[City#16, State Code#25])
                           :  +- Exchange hashpartitioning(Total Population#21, Female Population#20, City#16, Median Age#18, State Code#25, Foreign-born#23, Male Population#19, State#17, Average Household Size#24, 200), ENSURE_REQUIREMENTS, [id=#6223]
                           :     +- *(8) HashAggregate(keys=[Total Population#21, Female Population#20, City#16, Median Age#18, State Code#25, Foreign-born#23, Male Population#19, State#17, Average Household Size#24], functions=[], output=[Total Population#21, Female Population#20, City#16, Median Age#18, State Code#25, Foreign-born#23, Male Population#19, State#17, Average Household Size#24])
                           :        +- *(8) Filter (isnotnull(City#16) AND isnotnull(State Code#25))
                           :           +- FileScan csv [City#16,State#17,Median Age#18,Male Population#19,Female Population#20,Total Population#21,Foreign-born#23,Average Household Size#24,State Code#25] Batched: false, DataFilters: [isnotnull(City#16), isnotnull(State Code#25)], Format: CSV, Location: InMemoryFileIndex[file:/home/terry/Desktop/DataEngineering/Capstone/datasets/us-cities-demographi..., PartitionFilters: [], PushedFilters: [IsNotNull(City), IsNotNull(State Code)], ReadSchema: struct<City:string,State:string,Median Age:string,Male Population:string,Female Population:string...
                           +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true], input[1, string, true]),false), [id=#6234]
                              +- *(10) HashAggregate(keys=[City#611, State Code#620], functions=[], output=[City#611, State Code#620])
                                 +- Exchange hashpartitioning(City#611, State Code#620, 200), ENSURE_REQUIREMENTS, [id=#6230]
                                    +- *(9) HashAggregate(keys=[City#611, State Code#620], functions=[], output=[City#611, State Code#620])
                                       +- *(9) Filter (isnotnull(City#611) AND isnotnull(State Code#620))
                                          +- FileScan csv [City#611,State Code#620] Batched: false, DataFilters: [isnotnull(City#611), isnotnull(State Code#620)], Format: CSV, Location: InMemoryFileIndex[file:/home/terry/Desktop/DataEngineering/Capstone/datasets/us-cities-demographi..., PartitionFilters: [], PushedFilters: [IsNotNull(City), IsNotNull(State Code)], ReadSchema: struct<City:string,State Code:string>

	at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:163)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
	at org.apache.spark.sql.execution.InputAdapter.inputRDD(WholeStageCodegenExec.scala:525)
	at org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs(WholeStageCodegenExec.scala:453)
	at org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs$(WholeStageCodegenExec.scala:452)
	at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:496)
	at org.apache.spark.sql.execution.aggregate.HashAggregateExec.inputRDDs(HashAggregateExec.scala:141)
	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:746)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
	at org.apache.spark.sql.execution.ProjectExec.doExecute(basicPhysicalOperators.scala:92)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
	at org.apache.spark.sql.execution.SortExec.doExecute(SortExec.scala:112)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:184)
	... 33 more
Caused by: org.apache.spark.SparkUpgradeException: You may get a different result due to the upgrading of Spark 3.0: Fail to recognize 'w' pattern in the DateTimeFormatter. 1) You can set spark.sql.legacy.timeParserPolicy to LEGACY to restore the behavior before Spark 3.0. 2) You can form a valid datetime pattern with the guide from https://spark.apache.org/docs/latest/sql-ref-datetime-pattern.html
	at org.apache.spark.sql.catalyst.util.DateTimeFormatterHelper$$anonfun$checkLegacyFormatter$1.applyOrElse(DateTimeFormatterHelper.scala:196)
	at org.apache.spark.sql.catalyst.util.DateTimeFormatterHelper$$anonfun$checkLegacyFormatter$1.applyOrElse(DateTimeFormatterHelper.scala:185)
	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
	at org.apache.spark.sql.catalyst.util.Iso8601TimestampFormatter.validatePatternString(TimestampFormatter.scala:109)
	at org.apache.spark.sql.catalyst.util.TimestampFormatter$.getFormatter(TimestampFormatter.scala:300)
	at org.apache.spark.sql.catalyst.util.TimestampFormatter$.apply(TimestampFormatter.scala:333)
	at org.apache.spark.sql.catalyst.expressions.TimestampFormatterHelper.getFormatter(datetimeExpressions.scala:72)
	at org.apache.spark.sql.catalyst.expressions.TimestampFormatterHelper.getFormatter$(datetimeExpressions.scala:67)
	at org.apache.spark.sql.catalyst.expressions.DateFormatClass.getFormatter(datetimeExpressions.scala:771)
	at org.apache.spark.sql.catalyst.expressions.TimestampFormatterHelper.$anonfun$formatterOption$1(datetimeExpressions.scala:64)
	at scala.Option.map(Option.scala:230)
	at org.apache.spark.sql.catalyst.expressions.TimestampFormatterHelper.formatterOption(datetimeExpressions.scala:64)
	at org.apache.spark.sql.catalyst.expressions.TimestampFormatterHelper.formatterOption$(datetimeExpressions.scala:62)
	at org.apache.spark.sql.catalyst.expressions.DateFormatClass.formatterOption$lzycompute(datetimeExpressions.scala:771)
	at org.apache.spark.sql.catalyst.expressions.DateFormatClass.formatterOption(datetimeExpressions.scala:771)
	at org.apache.spark.sql.catalyst.expressions.DateFormatClass.doGenCode(datetimeExpressions.scala:790)
	at org.apache.spark.sql.catalyst.expressions.Expression.$anonfun$genCode$3(Expression.scala:146)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:141)
	at org.apache.spark.sql.catalyst.expressions.Alias.genCode(namedExpressions.scala:163)
	at org.apache.spark.sql.execution.ProjectExec.$anonfun$doConsume$2(basicPhysicalOperators.scala:73)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at scala.collection.TraversableLike.map(TraversableLike.scala:238)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
	at scala.collection.AbstractTraversable.map(Traversable.scala:108)
	at org.apache.spark.sql.execution.ProjectExec.$anonfun$doConsume$1(basicPhysicalOperators.scala:73)
	at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext.withSubExprEliminationExprs(CodeGenerator.scala:1026)
	at org.apache.spark.sql.execution.ProjectExec.doConsume(basicPhysicalOperators.scala:73)
	at org.apache.spark.sql.execution.CodegenSupport.consume(WholeStageCodegenExec.scala:194)
	at org.apache.spark.sql.execution.CodegenSupport.consume$(WholeStageCodegenExec.scala:149)
	at org.apache.spark.sql.execution.InputAdapter.consume(WholeStageCodegenExec.scala:496)
	at org.apache.spark.sql.execution.InputRDDCodegen.doProduce(WholeStageCodegenExec.scala:483)
	at org.apache.spark.sql.execution.InputRDDCodegen.doProduce$(WholeStageCodegenExec.scala:456)
	at org.apache.spark.sql.execution.InputAdapter.doProduce(WholeStageCodegenExec.scala:496)
	at org.apache.spark.sql.execution.CodegenSupport.$anonfun$produce$1(WholeStageCodegenExec.scala:95)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
	at org.apache.spark.sql.execution.CodegenSupport.produce(WholeStageCodegenExec.scala:90)
	at org.apache.spark.sql.execution.CodegenSupport.produce$(WholeStageCodegenExec.scala:90)
	at org.apache.spark.sql.execution.InputAdapter.produce(WholeStageCodegenExec.scala:496)
	at org.apache.spark.sql.execution.ProjectExec.doProduce(basicPhysicalOperators.scala:54)
	at org.apache.spark.sql.execution.CodegenSupport.$anonfun$produce$1(WholeStageCodegenExec.scala:95)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
	at org.apache.spark.sql.execution.CodegenSupport.produce(WholeStageCodegenExec.scala:90)
	at org.apache.spark.sql.execution.CodegenSupport.produce$(WholeStageCodegenExec.scala:90)
	at org.apache.spark.sql.execution.ProjectExec.produce(basicPhysicalOperators.scala:41)
	at org.apache.spark.sql.execution.aggregate.HashAggregateExec.doProduceWithKeys(HashAggregateExec.scala:733)
	at org.apache.spark.sql.execution.aggregate.HashAggregateExec.doProduce(HashAggregateExec.scala:148)
	at org.apache.spark.sql.execution.CodegenSupport.$anonfun$produce$1(WholeStageCodegenExec.scala:95)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
	at org.apache.spark.sql.execution.CodegenSupport.produce(WholeStageCodegenExec.scala:90)
	at org.apache.spark.sql.execution.CodegenSupport.produce$(WholeStageCodegenExec.scala:90)
	at org.apache.spark.sql.execution.aggregate.HashAggregateExec.produce(HashAggregateExec.scala:47)
	at org.apache.spark.sql.execution.WholeStageCodegenExec.doCodeGen(WholeStageCodegenExec.scala:655)
	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:718)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.inputRDD$lzycompute(ShuffleExchangeExec.scala:118)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.inputRDD(ShuffleExchangeExec.scala:118)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.shuffleDependency$lzycompute(ShuffleExchangeExec.scala:151)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.shuffleDependency(ShuffleExchangeExec.scala:149)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.$anonfun$doExecute$1(ShuffleExchangeExec.scala:166)
	at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
	... 63 more
Caused by: java.lang.IllegalArgumentException: All week-based patterns are unsupported since Spark 3.0, detected: w, Please use the SQL function EXTRACT instead
	at org.apache.spark.sql.catalyst.util.DateTimeFormatterHelper$.$anonfun$convertIncompatiblePattern$4(DateTimeFormatterHelper.scala:323)
	at org.apache.spark.sql.catalyst.util.DateTimeFormatterHelper$.$anonfun$convertIncompatiblePattern$4$adapted(DateTimeFormatterHelper.scala:321)
	at scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:877)
	at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
	at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
	at scala.collection.immutable.StringOps.foreach(StringOps.scala:33)
	at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:876)
	at org.apache.spark.sql.catalyst.util.DateTimeFormatterHelper$.$anonfun$convertIncompatiblePattern$2(DateTimeFormatterHelper.scala:321)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
	at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
	at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
	at scala.collection.TraversableLike.map(TraversableLike.scala:238)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
	at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:198)
	at org.apache.spark.sql.catalyst.util.DateTimeFormatterHelper$.convertIncompatiblePattern(DateTimeFormatterHelper.scala:318)
	at org.apache.spark.sql.catalyst.util.DateTimeFormatterHelper.getOrCreateFormatter(DateTimeFormatterHelper.scala:121)
	at org.apache.spark.sql.catalyst.util.DateTimeFormatterHelper.getOrCreateFormatter$(DateTimeFormatterHelper.scala:117)
	at org.apache.spark.sql.catalyst.util.Iso8601TimestampFormatter.getOrCreateFormatter(TimestampFormatter.scala:59)
	at org.apache.spark.sql.catalyst.util.Iso8601TimestampFormatter.formatter$lzycompute(TimestampFormatter.scala:68)
	at org.apache.spark.sql.catalyst.util.Iso8601TimestampFormatter.formatter(TimestampFormatter.scala:67)
	at org.apache.spark.sql.catalyst.util.Iso8601TimestampFormatter.validatePatternString(TimestampFormatter.scala:108)
	... 133 more


In [None]:
# Step 3: Define the Data Model

## 3.1 Conceptual Data Model
## Using the dimensional tables saved as parquet files we can implement them on any columnar database in Star Schema model. 
## Star Schema model was chosen because it will be easier for Data Analysts and Data Scientists to understand and apply queries with best performance outcomes and flexibility. (Refer to databasediagram1.PNG)



In [None]:
## 3.2 Mapping out data pipelines

a) Packages needed
pandas
datetime
pyspark.sql -> SparkSession -> "org.apache.hadoop:hadoop-aws:2.7.0"
pyspark.sql.functions -> first, upper, col, udf, date_format, expr
pyspark.sql.types -> StructField, StructType, StringType, LongType, IntegerType

b) Create the dimensions (i94port, i94visa, i94res, i94mode) from i94_SAS_Labels_Descriptions.SAS file. 
*NOTE: Once they're created it does not have to be included in future Data Pipeline schedules because these are essentially master records which do not frequently get added or changed onto the dimension tables.

Read US Cities Demo dataset (us-cities-demographics.csv) to form us_spark dataframe

Create 'us_race_cnt' dataset from us_spark
Drop columns we don't need and drop duplicate rows from us_spark

Join us_spark with us_race_cnt to form US data set
Change state code column name to state_code and other similar problems to avoid parquet complications. Remove whitespaces!

Remove the state column
Write transformed US dataset onto parquet file

Read i94 non-immigration dataset to form i94_spark dataframe
Execute type conversion:  numbers to longtype and integertype
Drop duplicate rows
Read i94port dimension parquet file so we can use it to join with i94_spark. This will add i94port city and state columns to i94_spark dataframe

Drop id column from i94_spark dataframe
Join US with i94_spark to get fact table i94non_immigrant_port_entry
Add "iso date format" column arrival_date inside the i94non_immigrant_port_entry dataframe by using custom function.

Create time dimension from i94non_immigrant_port_entry and save to parquet file.
Drop arrival_date column from i94non_immigrant_port_entry and save it to parquet file.

Add seasons to i94date_seasons dataframe.
Save i94date_seasons to parquet file partitioned by year and month.



In [None]:
Step 4: Run Pipelines to Model the Data
4.1 Create the data model
Build the data pipelines to create the data model.

Data pipeline is built inside `etl.py`



In [99]:
### --- Data quality checks here --- ###

def check_consistent_data():
    if us.count() == us_race_cnt.count():
        return True
    else:
        return False

def check_usSpark_contents():
    if us_spark.count() > 0:
        return True
    else:
        return False

def check_i94Spark_contents():
    if i94_spark.count() > 0:
        return True
    else:
        return False

def check_i94_data_count():
    if i94_spark.count() == i94non_immigrant_port_entry.count():
        return True
    else:
        return False



test1 = check_consistent_data()
if test1 == True:
    print('us.count EQUALS us_race_cnt.count. PASS')
elif test1 == False:
    print('us.count & us_race_cnt have inconsistent data between them. FAIL')

test2 = check_usSpark_contents()
if test2 == True:
    print('us_spark read PASS.')
elif test2 == False:
    print('Nothing inside us_spark. FAIL')

test3 = check_i94Spark_contents()
if test3 == True:
    print('i94_spark read PASS')
elif test3 == False:
    print('Nothing inside i94_spark. FAIL')
    
test4 = check_i94_data_count()
if test4 == True:
    print('i94_spark.count EQUALS i94non_immigrant_port_entry.count. PASS')
elif test4 == False:
    print('i94_spark.count & i94non_immigrant_port_entry.count are not equal. FAIL')


us.count EQUALS us_race_cnt.count. PASS
us_spark read PASS.
i94_spark read PASS
i94_spark.count EQUALS i94non_immigrant_port_entry.count. PASS


In [None]:
4.3 Data Dictionary
Data dictionary is created in file: `4.3_data_dictionary.txt`.
Please refer to it there.

In [None]:
5 Project Write Up
Project write-up will be done in `5_project_writeup.txt`.
Please refer to it there.

In [74]:
# - Sample Query - 

import pyarrow.parquet as pq
i94visa_pq_file = "/home/terry/Desktop/DataEngineering/Capstone/data/i94visa.parquet"
df_i94visa = pq.read_table(source=i94visa_pq_file).to_pandas()

i94port_pq_file = "/home/terry/Desktop/DataEngineering/Capstone/data/i94port.parquet"
df_i94port = pq.read_table(source=i94port_pq_file).to_pandas()

In [75]:
print(df_i94port.to_string())

ISBURG                         PA
362  HSB                   HARRISONBURG                         PA
363  PHI                   PHILADELPHIA                         PA
364  PIT                      PITTSBURG                         PA
365  AGU                      AGUADILLA                         PR
366  BQN          BORINQUEN - AGUADILLO                         PR
367  JCP      CULEBRA - BENJAMIN RIVERA                         PR
368  ENS                       ENSENADA                         PR
369  FAJ                        FAJARDO                         PR
370  HUM                        HUMACAO                         PR
371  JOB                          JOBOS                         PR
372  MAY                       MAYAGUEZ                         PR
373  PON                          PONCE                         PR
374  PSE                PONCE-MERCEDITA                         PR
375  SAJ                       SAN JUAN                         PR
376  VQS                   V