## Loading Data from Google Cloud Storage (GCS) and Schema Alignment

In [1]:
from pyspark.sql import SparkSession, functions as F, types as T
import unicodedata
from functools import reduce
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, Imputer, VectorAssembler
from pyspark.sql.types import DoubleType

In [2]:
spark = (
    SparkSession.builder
    .appName("Read-CSV-Subset-GCS")
    .getOrCreate()
)
spark.sparkContext.setLogLevel("WARN")

GCS_PATH = "gs://dsa5208-mllib-proj/2024/*.csv" 
REQUIRED_COLS = [
    'DATE','LATITUDE','LONGITUDE','ELEVATION','WND','CIG','VIS','DEW','SLP','TMP'
]

def canon(name: str) -> str:
    s = unicodedata.normalize("NFKC", name or "")
    s = s.replace("\ufeff","").strip()
    s = " ".join(s.split())
    return s.replace(" ", "_").lower()

df_all = (
    spark.read.format("csv")
    .option("header", True)
    .option("inferSchema", False)
    .option("multiLine", False)
    .option("recursiveFileLookup", True)
    .option("mode", "PERMISSIVE")
    .option("columnNameOfCorruptRecord", "_corrupt")
    .load(GCS_PATH) 
)

REQUIRED_COLS = [canon(c) for c in REQUIRED_COLS]

print("DF columns:", df_all.columns[:20])  # first 20
print("Required :", REQUIRED_COLS)
present = [c for c in REQUIRED_COLS if c in df_all.columns]
missing = [c for c in REQUIRED_COLS if c not in df_all.columns]
print("Present   :", present)
print("Missing   :", missing)

canon_cols = [canon(c) for c in df_all.columns]
df_all = df_all.toDF(*canon_cols)

required_set = set(REQUIRED_COLS)
present = [c for c in REQUIRED_COLS if c in df_all.columns]
missing = [c for c in REQUIRED_COLS if c not in df_all.columns]

for c in missing:
    df_all = df_all.withColumn(c, F.lit(None).cast(T.StringType()))

df_subset = df_all.select(*REQUIRED_COLS)

print(f"Files read from: {GCS_PATH}")
print(f"Total rows: {df_subset.count()}")
print("Columns kept:", df_subset.columns)
if missing:
    print("Missing columns (added as NULL):", missing)


25/11/04 05:24:00 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
                                                                                

DF columns: ['STATION', 'DATE', 'SOURCE', 'LATITUDE', 'LONGITUDE', 'ELEVATION', 'NAME', 'REPORT_TYPE', 'CALL_SIGN', 'QUALITY_CONTROL', 'WND', 'CIG', 'VIS', 'TMP', 'DEW', 'SLP', 'AA1', 'AB1', 'AE1', 'AO1']
Required : ['date', 'latitude', 'longitude', 'elevation', 'wnd', 'cig', 'vis', 'dew', 'slp', 'tmp']
Present   : []
Missing   : ['date', 'latitude', 'longitude', 'elevation', 'wnd', 'cig', 'vis', 'dew', 'slp', 'tmp']
Files read from: gs://dsa5208-mllib-proj/2024/*.csv




Total rows: 130222106
Columns kept: ['date', 'latitude', 'longitude', 'elevation', 'wnd', 'cig', 'vis', 'dew', 'slp', 'tmp']


                                                                                

In [3]:
df_f = df_subset
df_f.show(3, truncate=False)

+-------------------+--------+---------+---------+--------------+-----------+------------+-------+-------+-------+
|date               |latitude|longitude|elevation|wnd           |cig        |vis         |dew    |slp    |tmp    |
+-------------------+--------+---------+---------+--------------+-----------+------------+-------+-------+-------+
|2024-01-01T00:00:00|34.7728 |-87.6399 |161.5    |211,1,H,0018,1|99999,9,9,N|999999,9,9,9|+9999,9|99999,9|+0105,1|
|2024-01-01T00:05:00|34.7728 |-87.6399 |161.5    |214,1,H,0017,1|99999,9,9,N|999999,9,9,9|+9999,9|99999,9|+0100,1|
|2024-01-01T00:10:00|34.7728 |-87.6399 |161.5    |212,1,H,0017,1|99999,9,9,N|999999,9,9,9|+9999,9|99999,9|+0088,1|
+-------------------+--------+---------+---------+--------------+-----------+------------+-------+-------+-------+
only showing top 3 rows



## Parsing and Splitting Multi-Value Observation Columns

In [4]:
# Generic splitter for unsigned comma-separated fields (e.g., WND, CIG, VIS, SLP)
def split_column(df, col, new_names, casts=None):
    arr = F.split(F.col(col).cast("string"), ",")
    for i, name in enumerate(new_names):
        v = F.trim(F.when(F.size(arr) > i, arr.getItem(i)))
        if casts and i < len(casts) and casts[i]:
            v = v.cast(casts[i])
        df = df.withColumn(f"{col}_{name}", v)
    return df.drop(col)

# For TMP/DEW: signed integer in tenths of Â°C + QC
def split_signed_tenths(df, col, value_name="C", qc_name="q"):
    # build expressions that reference `col`
    arr  = F.split(F.col(col).cast("string"), ",")
    raw  = F.trim(F.when(F.size(arr) > 0, arr.getItem(0)))
    qc   = F.trim(F.when(F.size(arr) > 1, arr.getItem(1))).cast("int")

    tenths = F.regexp_extract(raw, r'^[+-]?\d+', 0).cast("int")
    tenths = F.when(tenths.isNull() | (F.abs(tenths) == 9999), None).otherwise(tenths)
    degC   = (tenths / 10.0).cast("double")

    # add columns while `col` still exists, then drop it
    df = df.withColumn(f"{col}_{value_name}", degC) \
           .withColumn(f"{col}_{qc_name}", qc)
    return df.drop(col)

# Unsigned multi-part fields
df_f = split_column(df_f, "wnd", ["dir","q1","type","speed","q2"], ["int","int","string","int","int"])
df_f = split_column(df_f, "cig", ["height","q"], ["int","int"])
df_f = split_column(df_f, "vis", ["dist","q1"], ["int","int"])
df_f = split_column(df_f, "slp", ["val","q"], ["int","int"])

# Signed & scaled fields
df_f = split_signed_tenths(df_f, "tmp", value_name="cel", qc_name="q")   # -> TMP_cel, TMP_q
df_f = split_signed_tenths(df_f, "dew", value_name="cel", qc_name="q")   # -> DEW_cel, DEW_q

# Signed single-value numeric columns
df_f = (df_f
  .withColumn("latitude",  F.col("latitude").cast("double"))
  .withColumn("longitude", F.col("longitude").cast("double"))
  .withColumn("elevation", F.col("elevation").cast("double")))


In [5]:
df_f.show(3, truncate=False)

+-------------------+--------+---------+---------+-------+------+--------+---------+------+----------+-----+--------+------+-------+-----+-------+-----+-------+-----+
|date               |latitude|longitude|elevation|wnd_dir|wnd_q1|wnd_type|wnd_speed|wnd_q2|cig_height|cig_q|vis_dist|vis_q1|slp_val|slp_q|tmp_cel|tmp_q|dew_cel|dew_q|
+-------------------+--------+---------+---------+-------+------+--------+---------+------+----------+-----+--------+------+-------+-----+-------+-----+-------+-----+
|2024-01-01T00:00:00|34.7728 |-87.6399 |161.5    |211    |1     |H       |18       |1     |99999     |9    |999999  |9     |99999  |9    |10.5   |1    |NULL   |9    |
|2024-01-01T00:05:00|34.7728 |-87.6399 |161.5    |214    |1     |H       |17       |1     |99999     |9    |999999  |9     |99999  |9    |10.0   |1    |NULL   |9    |
|2024-01-01T00:10:00|34.7728 |-87.6399 |161.5    |212    |1     |H       |17       |1     |99999     |9    |999999  |9     |99999  |9    |8.8    |1    |NULL   |9    

In [6]:
#Check if the negative values remain

df_f.where(F.col("tmp_cel") < 0) \
    .select("date","latitude","longitude","tmp_cel","tmp_q") \
    .show(5, False)

df_f.where(F.col("dew_cel") < 0) \
    .select("date","latitude","longitude","dew_cel","dew_q") \
    .show(5, False)

+-------------------+--------+---------+-------+-----+
|date               |latitude|longitude|tmp_cel|tmp_q|
+-------------------+--------+---------+-------+-----+
|2024-01-02T12:35:00|34.7728 |-87.6399 |-0.7   |1    |
|2024-01-02T12:40:00|34.7728 |-87.6399 |-0.5   |1    |
|2024-01-02T12:45:00|34.7728 |-87.6399 |-1.3   |1    |
|2024-01-02T12:50:00|34.7728 |-87.6399 |-1.3   |1    |
|2024-01-02T12:55:00|34.7728 |-87.6399 |-0.8   |1    |
+-------------------+--------+---------+-------+-----+
only showing top 5 rows





+-------------------+---------+---------+-------+-----+
|date               |latitude |longitude|dew_cel|dew_q|
+-------------------+---------+---------+-------+-----+
|2024-01-06T21:00:00|52.923353|4.780625 |-0.4   |1    |
|2024-01-06T21:06:00|52.923353|4.780625 |-1.0   |1    |
|2024-01-06T21:14:00|52.923353|4.780625 |-1.0   |1    |
|2024-01-06T21:25:00|52.923353|4.780625 |-1.0   |1    |
|2024-01-06T21:27:00|52.923353|4.780625 |-1.0   |1    |
+-------------------+---------+---------+-------+-----+
only showing top 5 rows



                                                                                

## Removal of Sentinel Values 

In [7]:
# (1) Define sentinel values based on data documentation from the Federal Climate Complex \n
# Data Documentation for Integrated Surface Data (ISD)

removal_values = [
    '3', '7', '9', '99', '999', '9999', '99999', '999999',
    '+3', '+7', '+9', '+99', '+999', '+9999', '+99999', '+999999'
]

# (2) Filtering: Keep rows where *no* column contains any of the sentinel values.
# We cast data to string because some columns are numeric.

valid_conditions = [
    ~F.col(c).cast("string").isin(removal_values)
    for c in df_f.columns
]

combined_condition = reduce(lambda a, b: a & b, valid_conditions)

# (3) Apply the filter
df_f = df_f.filter(combined_condition)

# (4) Count rows after filtering (single pass)
row_count = df_f.count()
print(f"Rows remaining after sentinel removal: {row_count:,}")

df_f.show(5, truncate=False)

                                                                                

Rows remaining after sentinel removal: 17,953,096




+-------------------+---------+---------+---------+-------+------+--------+---------+------+----------+-----+--------+------+-------+-----+-------+-----+-------+-----+
|date               |latitude |longitude|elevation|wnd_dir|wnd_q1|wnd_type|wnd_speed|wnd_q2|cig_height|cig_q|vis_dist|vis_q1|slp_val|slp_q|tmp_cel|tmp_q|dew_cel|dew_q|
+-------------------+---------+---------+---------+-------+------+--------+---------+------+----------+-----+--------+------+-------+-----+-------+-----+-------+-----+
|2024-01-01T00:00:00|52.923353|4.780625 |0.91     |220    |1     |N       |100      |1     |900       |1    |9000    |1     |9928   |1    |7.5    |1    |5.1    |1    |
|2024-01-01T01:00:00|52.923353|4.780625 |0.91     |220    |1     |N       |90       |1     |750       |1    |5000    |1     |9930   |1    |6.7    |1    |5.1    |1    |
|2024-01-01T02:00:00|52.923353|4.780625 |0.91     |230    |1     |N       |70       |1     |1200      |1    |16000   |1     |9928   |1    |8.7    |1    |6.1    

                                                                                

### Mid-point Row Count Check

Number of Rows Left: Final rows: 17,953,096 (removed 112,269,010 rows with sentinel values)


## Feature Engineering

In [8]:
# Convert DATE column to proper timestamp format
# Spark automatically parses standard formats like "2024-03-25T14:00:00"
df_f = df_f.withColumn("date", F.to_timestamp(F.col("date")))
df_f = df_f.withColumn("date_numeric", F.datediff(F.col("date"), F.lit("1970-01-01")))
df_f = df_f.drop("date")

In [9]:
# Creating column WND_SIN, WND_COS only is WND_DIR exists, otherwise fallback to raw data
if "wnd_dir" in df_f.columns and "wnd_dir_sin" not in df_f.columns and "wnd_dir_cos" not in df_f.columns:
    df_f = df_f.withColumn("wnd_dir", F.col("wnd_dir").cast(DoubleType()))
    rad = F.radians(F.col("wnd_dir"))
    df_f = df_f.withColumn("wnd_dir_sin", F.sin(rad)).withColumn("wnd_dir_cos", F.cos(rad))

# Converting to Standard metric hPa
if "slp_val" in df_f.columns and "slp_hpa" not in df_f.columns:
    df_f = df_f.withColumn("slp_hpa", (F.col("slp_val") / 10.0).cast(DoubleType()))

In [10]:
df_f.show(3, truncate=False)



+---------+---------+---------+-------+------+--------+---------+------+----------+-----+--------+------+-------+-----+-------+-----+-------+-----+------------+-------------------+-------------------+-------+
|latitude |longitude|elevation|wnd_dir|wnd_q1|wnd_type|wnd_speed|wnd_q2|cig_height|cig_q|vis_dist|vis_q1|slp_val|slp_q|tmp_cel|tmp_q|dew_cel|dew_q|date_numeric|wnd_dir_sin        |wnd_dir_cos        |slp_hpa|
+---------+---------+---------+-------+------+--------+---------+------+----------+-----+--------+------+-------+-----+-------+-----+-------+-----+------------+-------------------+-------------------+-------+
|52.923353|4.780625 |0.91     |220.0  |1     |N       |100      |1     |900       |1    |9000    |1     |9928   |1    |7.5    |1    |5.1    |1    |19723       |-0.6427876096865393|-0.766044443118978 |992.8  |
|52.923353|4.780625 |0.91     |220.0  |1     |N       |90       |1     |750       |1    |5000    |1     |9930   |1    |6.7    |1    |5.1    |1    |19723       |-0.6

                                                                                

In [11]:
#Optional Code for categorical columns

#Only use categorical columns that exist and have non-null values
#CATEGORICAL_COLS = []
#for col in ["wnd_type"]:  # add more potential categorical cols here
#    if col in df_f.columns:
#        non_null_count = df_f.filter(F.col(col).isNotNull()).count()
#        if non_null_count > 0:
#            CATEGORICAL_COLS.append(col)
#            print(f"Using categorical column: {col} ({non_null_count} non-null rows)")
#        else:
#            print(f"Skipping {col}: all values are null")
#    else:
#        print(f"Column {col} does not exist")

#print(f"\nFinal CATEGORICAL_COLS: {CATEGORICAL_COLS}")

In [12]:
TARGET_COL = "tmp_cel"
FEATURE_COLS = [
    "latitude","longitude","elevation",
    "dew_cel","slp_hpa","wnd_speed","vis_dist","cig_height",
    "date_numeric"]

ALL = [TARGET_COL] + FEATURE_COLS

# # Finalised Dataset for Modeling
df_final = df_f.select([F.col(c).cast("double").alias(c) for c in ALL])

## Saving the Dataset in Parquet Format for Subsequent ML Modeling 

In [13]:
# Cleaned Dataset
df_final.write.mode("overwrite").parquet("gs://dsa5208-mllib-proj/processed/df_cleaned.parquet")

                                                                                