In [2]:
import pandas as pd
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import DoubleType
from pyspark.sql.window import Window

spark = (
    SparkSession.builder.appName("MAST30034 Project 1 Data Preprocessing")
    .config("spark.sql.repl.eagerEval.enabled", True)
    .config("spark.sql.parquet.cacheMetadata", "true")
    .config("spark.sql.session.timeZone", "Etc/UTC")
    .config('spark.driver.memory', '4g')
    .config('spark.executor.memory', '2g')
    .getOrCreate()
)

spark.sparkContext.setLogLevel("ERROR")

24/08/26 01:08:54 WARN Utils: Your hostname, DESKTOP-H6V94HM resolves to a loopback address: 127.0.1.1; using 192.168.0.100 instead (on interface eth0)
24/08/26 01:08:54 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/08/26 01:08:54 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


# Supplementary Data Preprocessing


## Weather

We will shadow the Tan's approach that was mentioned in the MAST30034 2022 Sample Solution to process the feature as well as the final subset of feature that will be used for analysis and modelling.

In [4]:
def basic_cleaning(df):
    """
        This function perform basic cleaning and extraction of the columns of a dataframe
    """
    selected_columns = ["DATE","WND","TMP","DEW","SLP"]
    df = df.select(*selected_columns)
    
    for col in df.columns:
        if col == "DATE":
            df = df.filter(
                (("2023-03-01" <= F.to_date(F.col(col))) & (F.to_date(F.col(col)) <= "2023-12-31")) |
                (("2024-01-01" <= F.to_date(F.col(col))) & (F.to_date(F.col(col)) <= "2024-05-31")))
            continue
        df = df.withColumn(col, F.split(F.col(col), ","))

    # Extracting date and assigning season
    df = df.withColumns({"year": F.year(F.col("DATE")),
                         "month": F.month(F.col("DATE")),
                         "day": F.day(F.col("DATE")),
                         "time": F.date_format(F.col("DATE"), "HH:mm:ss"),
                         "season": season_assign(F.col("DATE"))})
    df = df.drop(F.col("DATE"))

    # Selecting the desired item for each column
    df = df.withColumn("WND", F.col("WND").getItem(3).cast(DoubleType()))
    df = df.withColumn("TMP", F.col("TMP").getItem(0).cast(DoubleType()))
    df = df.withColumn("DEW", F.col("DEW").getItem(0).cast(DoubleType()))
    df = df.withColumn("SLP", F.col("SLP").getItem(0).cast(DoubleType()))


    df = df.withColumnsRenamed({"WND": "wind_speed", "TMP": "temp",
                                "DEW": "dew_point", "SLP": "atm_pressure"})
    ordered_cols = ["season", "year", "month", "day", "time", "wind_speed", "temp",
                    "dew_point", "atm_pressure"]
    
    df = df.select(*ordered_cols)
    return df

def season_assign(col):
    """
        Assign season to entries according to date
    """
    col = F.month(col)
    return (F.when((col == 3) | (col == 4) | (col == 5), 1)\
        .when((col == 6) | (col == 7) | (col == 8), 2)\
        .when((col == 9) | (col == 10) | (col == 11), 3)\
        .when((col == 12) | (col == 1) | (col == 2),4))

def compute_missing_values(df, col):
    """
        This function replace missing value of a feature by the value of the instance right before it.
        This function was inspired by Ming Hui Tan - MAST30034 2022 Sample Solution
    """
    df = df.withColumn("index", F.row_number().over(Window.orderBy(F.lit(1))))
    window_spec = Window.orderBy("index")

    # Creating lag value i.e. value of the preceding row for a specified column
    df = df.withColumn("prev_value", F.lag(col).over(window_spec))
    df = df.withColumn(col, F.when((F.col(col) == 9999) | (F.col(col) == 99999), F.col("prev_value")).otherwise(F.col(col))).drop("prev_value", "index")
    
    return df

def rescale_column(df):
    """
        Rescale the values of the field accordingly to the data dictionary
    """
    SCALE_FACTOR = {"wind_speed": 10, "temp": 10, "dew_point": 10, "atm_pressure": 10}

    for col, factor in SCALE_FACTOR.items():
        df = df.withColumn(col, F.round(F.col(col)/factor,2))
    
    return df

In [10]:
# Load 2023 weather
weather_23 = spark.read.parquet("../data/raw/2023-weather")
weather_23 = basic_cleaning(weather_23)

# Load 2024 weather
weather_24 = spark.read.parquet("../data/raw/2024-weather")
weather_24 = basic_cleaning(weather_24)

Here, we begin to split the weather data into train and test which will later on be matched with the corresponding train and test data. Train data holds records from March 2023 to Febuary 2024, and test data holds records from March 2024 to May 2024

In [11]:
df_train = weather_23.union(weather_24.filter(F.col('month') <= 2))
df_test = weather_24.filter(F.col('month') >= 3)

In [12]:
# Compute missing value by it using it previous records. Some are next to each other, thus iterate the process 
for i in range(1,6):
    for col in ["wind_speed", "temp", "dew_point", "atm_pressure"]:
        df_train = compute_missing_values(df_train, col)
        df_test = compute_missing_values(df_test, col)

Rescaling the column accordingly to the data dictionary and drop any duplicates

In [13]:
df_train = rescale_column(df_train)
df_train = df_train.dropDuplicates()

In [14]:
df_test = rescale_column(df_test)
df_test = df_test.dropDuplicates()

Aggregate the data into a conformable structure that can be used for merging later on.

In [15]:
agg_df_train = df_train.groupBy("month")\
    .agg(
        F.round(F.mean("temp"), 2).alias("avg_temp"),
        F.round(F.mean("wind_speed"),2).alias("avg_windspeed"),
        F.round(F.mean("dew_point"),2).alias("avg_dew"),
        F.round(F.mean("atm_pressure"),2).alias("avg_pressure")
    )

In [17]:
agg_df_test = df_test.groupBy("month")\
    .agg(
        F.round(F.mean("temp"), 2).alias("avg_temp"),
        F.round(F.mean("wind_speed"),2).alias("avg_windspeed"),
        F.round(F.mean("dew_point"),2).alias("avg_dew"),
        F.round(F.mean("atm_pressure"),2).alias("avg_pressure")
    )

Save the clean data into the curated layer

In [18]:
import os
output_dir = "../data/curated"

if not os.path.exists(output_dir):
    os.mkdir(output_dir)

In [None]:
agg_df_train.write.parquet("../data/curated/weather_train")
agg_df_test.write.parquet("../data/curated/weather_test")

## Unemployment rate

This data set is simple and there no need for data cleaning. Some transformation are apply to the `Label` feature so that it contain the month only, which we will use later on for merging. We also filter the data based and split the data that matches our research range

In [20]:
emp_df = pd.read_csv("../data/landing/unemployment_rate.csv")
emp_df = emp_df[['Label', 'Value']]

# Filters enties outside of desired range
emp_df = emp_df.loc[(emp_df['Label'] != '2023 Jan') & 
           (emp_df['Label'] != '2023 Feb') &
           (emp_df['Label'] != '2024 Jun') &
           (emp_df['Label'] != '2024 Jul')]

In [21]:
# Convert format: YYYY M to categorical numerical
emp_df_train = emp_df.iloc[ :12, :]
month_mapping = {
    '2023 Mar': 3, '2023 Apr': 4, '2023 May': 5,
    '2023 Jun': 6, '2023 Jul': 7, '2023 Aug': 8,
    '2023 Sep': 9, '2023 Oct': 10, '2023 Nov': 11,
    '2023 Dec': 12, '2024 Jan': 1, '2024 Feb': 2
}
emp_df_train['Label'] = emp_df_train['Label'].replace(month_mapping)

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  emp_df_train['Label'] = emp_df_train['Label'].replace(month_mapping)


In [22]:
emp_df_test = emp_df.iloc[ 12: , :]
month_mapping = {
    '2024 Mar': 3, '2024 Apr': 4, '2024 May': 5
}
emp_df_test['Label'] = emp_df_test['Label'].replace(month_mapping)

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  emp_df_test['Label'] = emp_df_test['Label'].replace(month_mapping)


Save the `.csv` file as `.parquet`

In [23]:
emp_df_train.to_parquet("../data/curated/unemp_train.parquet", index=False)
emp_df_test.to_parquet("../data/curated/unemp_test.parquet", index=False)