# Data cleaning and feature creation
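Reads the raw CSV files from `data/raw`, cleans and aggregates them into daily per-station features, and writes the resulting train/test/deploy sets to `data/processed`.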

In [1]:
import polars as pl
import os

# Relative path of the directory whose CSV files are loaded by this notebook
DATA_PATH = "../data/raw"

In [2]:
# Load data
# Match on the file suffix (a substring test would also pick up names such as
# "x.csv.bak") and sort the names, because os.listdir returns entries in
# arbitrary order and the concatenation order should be reproducible.
file_names = sorted(x for x in os.listdir(DATA_PATH) if x.endswith(".csv"))
if not file_names:
    # Fail with a clear message instead of an opaque error from pl.concat
    raise FileNotFoundError(f"No CSV files found in {DATA_PATH!r}")

# Scan each file lazily; nothing is read until collect() is called
df_list = [pl.scan_csv(os.path.join(DATA_PATH, file)) for file in file_names]

# Combine lazyframes into a single lazyframe, then materialise it
df_combined = pl.concat(df_list, how="vertical")
df_combined_col = df_combined.collect()


In [3]:
# TODO: clean station names and scope to daytime readings only (there is an unresolved issue with converting to datetime)

In [4]:
# Keep only the station IDs that report on at least 2500 distinct dates
dates_per_station = df_combined_col.group_by("station_id").agg(
    pl.col("date").n_unique().alias("date_unique_count")
)
well_covered_stations = dates_per_station.sort(
    "date_unique_count", descending=True
).filter(pl.col("date_unique_count") >= 2500)
selected_stations_list = well_covered_stations.get_column("station_id").to_list()

In [5]:
# Daily minimum and maximum reading for each selected station
selected_rows = df_combined_col.filter(
    pl.col("station_id").is_in(selected_stations_list)
)
df_agg = (
    selected_rows
    .group_by(["station_id", "date"])
    .agg(
        pl.col("reading_value").min().alias("min_rainfall"),
        pl.col("reading_value").max().alias("max_rainfall"),
    )
    .sort("station_id", "date")
)
df_agg.head()

station_id,date,min_rainfall,max_rainfall
str,str,f64,f64
"""S08""","""2017-01-01""",0.0,0.4
"""S08""","""2017-01-02""",0.0,0.6
"""S08""","""2017-01-03""",0.0,0.2
"""S08""","""2017-01-04""",0.0,0.4
"""S08""","""2017-01-05""",0.0,0.4


In [6]:
# Keep only stations whose first and last dates are 2017-01-01 and 2024-12-31
station_date_span = df_agg.group_by("station_id").agg(
    pl.col("date").min().alias("min_date"),
    pl.col("date").max().alias("max_date"),
    pl.col("date").count().alias("date_count"),
)

# Dates are still strings at this point, so compare against string literals
starts_on_first_day = pl.col("min_date") == "2017-01-01"
ends_on_last_day = pl.col("max_date") == "2024-12-31"
df_min_max_date = station_date_span.filter(starts_on_first_day & ends_on_last_day)

with pl.Config(tbl_rows=50):
    print(df_min_max_date)  # 30 left, some dates are missing!

shape: (30, 4)
┌────────────┬────────────┬────────────┬────────────┐
│ station_id ┆ min_date   ┆ max_date   ┆ date_count │
│ ---        ┆ ---        ┆ ---        ┆ ---        │
│ str        ┆ str        ┆ str        ┆ u32        │
╞════════════╪════════════╪════════════╪════════════╡
│ S08        ┆ 2017-01-01 ┆ 2024-12-31 ┆ 2803       │
│ S104       ┆ 2017-01-01 ┆ 2024-12-31 ┆ 2728       │
│ S107       ┆ 2017-01-01 ┆ 2024-12-31 ┆ 2842       │
│ S109       ┆ 2017-01-01 ┆ 2024-12-31 ┆ 2701       │
│ S112       ┆ 2017-01-01 ┆ 2024-12-31 ┆ 2796       │
│ S113       ┆ 2017-01-01 ┆ 2024-12-31 ┆ 2840       │
│ S115       ┆ 2017-01-01 ┆ 2024-12-31 ┆ 2791       │
│ S116       ┆ 2017-01-01 ┆ 2024-12-31 ┆ 2787       │
│ S119       ┆ 2017-01-01 ┆ 2024-12-31 ┆ 2854       │
│ S121       ┆ 2017-01-01 ┆ 2024-12-31 ┆ 2793       │
│ S123       ┆ 2017-01-01 ┆ 2024-12-31 ┆ 2767       │
│ S24        ┆ 2017-01-01 ┆ 2024-12-31 ┆ 2835       │
│ S33        ┆ 2017-01-01 ┆ 2024-12-31 ┆ 2794       │
│ S35        

In [7]:
# Restrict the daily aggregates to the stations that passed the min/max date check
full_span_station_ids = df_min_max_date.get_column("station_id").to_list()
df_agg = df_agg.filter(pl.col("station_id").is_in(full_span_station_ids))

In [8]:
# Fill in the missing dates for each station.
# The date strings are cast to pl.Date (strict=False turns unparsable values into
# nulls rather than raising), and the column is flagged as sorted for upsample.
df_agg = df_agg.with_columns(pl.col("date").cast(pl.Date, strict=False))
df_agg = df_agg.set_sorted("date")

# One row per day within each station; rows inserted by upsample start out null
df_agg_upsampled = df_agg.upsample(
    time_column="date",
    every="1d",
    group_by="station_id",
    maintain_order=True,
)

# Forward-fill every column so inserted rows take the preceding row's values
df_agg_filled = df_agg_upsampled.select(pl.all().forward_fill())


In [9]:
# Binary target y: 1 when the day's max rainfall is above 0.01, otherwise 0
exceeds_threshold = pl.col("max_rainfall") > 0.01
df_agg_filled = df_agg_filled.with_columns(
    y=pl.when(exceeds_threshold).then(1).otherwise(0)
)

In [10]:
# Build a station lookup (name and coordinates) and attach it to the daily rows
station_columns = ["station_id", "station_name", "location_longitude", "location_latitude"]
# Rows whose station_name matches the pattern S<digit> are dropped from the lookup
name_has_station_code = pl.col("station_name").str.contains("S\\d", literal=False)
df_stations = (
    df_combined_col
    .select(station_columns)
    .unique(subset=["station_id", "station_name"])
    .sort("station_id")
    .filter(name_has_station_code.not_())
)

df_agg_filled_final = df_agg_filled.join(df_stations, on="station_id", how="left")

In [11]:
# Preview the first rows of the joined frame
df_agg_filled_final.head()

date,station_id,min_rainfall,max_rainfall,y,station_name,location_longitude,location_latitude
date,str,f64,f64,i32,str,f64,f64
2017-01-01,"""S08""",0.0,0.4,1,"""Upper Thomson Road""",103.8271,1.3701
2017-01-02,"""S08""",0.0,0.6,1,"""Upper Thomson Road""",103.8271,1.3701
2017-01-03,"""S08""",0.0,0.2,1,"""Upper Thomson Road""",103.8271,1.3701
2017-01-04,"""S08""",0.0,0.4,1,"""Upper Thomson Road""",103.8271,1.3701
2017-01-05,"""S08""",0.0,0.4,1,"""Upper Thomson Road""",103.8271,1.3701


In [12]:
# Summary statistics (including null counts) for every column of the joined frame
df_agg_filled_final.describe()

statistic,date,station_id,min_rainfall,max_rainfall,y,station_name,location_longitude,location_latitude
str,str,str,f64,f64,f64,str,f64,f64
"""count""","""87660""","""87660""",87660.0,87660.0,87660.0,"""87660""",87660.0,87660.0
"""null_count""","""0""","""0""",0.0,0.0,0.0,"""0""",0.0,0.0
"""mean""","""2020-12-31 12:00:00""",,5e-06,1.298923,0.531736,,103.82084,1.347174
"""std""",,,0.000955,2.245187,0.498995,,0.088741,0.046951
"""min""","""2017-01-01""","""S08""",0.0,0.0,0.0,"""Alexandra Road""",103.61843,1.281
"""25%""","""2019-01-01""",,0.0,0.0,0.0,,103.7556,1.30703
"""50%""","""2021-01-01""",,0.0,0.2,1.0,,103.8271,1.3399
"""75%""","""2023-01-01""",,0.0,1.6,1.0,,103.8878,1.37288
"""max""","""2024-12-31""","""S94""",0.2,37.2,1.0,"""Woodlands Avenue 9""",103.9826,1.44387


In [22]:
# Add mr_lag_1 ... mr_lag_7: max rainfall shifted by 1 to 7 rows within each station
lag_columns = {}
for lag in range(1, 8):
    lag_columns[f"mr_lag_{lag}"] = pl.col("max_rainfall").shift(lag).over("station_id")

df_agg_filled_final = df_agg_filled_final.with_columns(**lag_columns)
df_agg_filled_final.group_by("station_id").head(3)

station_id,date,min_rainfall,max_rainfall,y,station_name,location_longitude,location_latitude,mr_lag_1,mr_lag_2,mr_lag_3,mr_lag_4,mr_lag_5,mr_lag_6,mr_lag_7
str,date,f64,f64,i32,str,f64,f64,f64,f64,f64,f64,f64,f64,f64
"""S109""",2017-01-01,0.0,1.0,1,"""Ang Mo Kio Avenue 5""",103.8492,1.3764,,,,,,,
"""S109""",2017-01-02,0.0,4.2,1,"""Ang Mo Kio Avenue 5""",103.8492,1.3764,1.0,,,,,,
"""S109""",2017-01-03,0.0,0.2,1,"""Ang Mo Kio Avenue 5""",103.8492,1.3764,4.2,1.0,,,,,
"""S77""",2017-01-01,0.0,4.8,1,"""Alexandra Road""",103.8125,1.2937,,,,,,,
"""S77""",2017-01-02,0.0,0.8,1,"""Alexandra Road""",103.8125,1.2937,4.8,,,,,,
…,…,…,…,…,…,…,…,…,…,…,…,…,…,…
"""S81""",2017-01-02,0.0,2.8,1,"""Punggol Central""",103.9092,1.4029,0.2,,,,,,
"""S81""",2017-01-03,0.0,0.0,0,"""Punggol Central""",103.9092,1.4029,2.8,0.2,,,,,
"""S35""",2017-01-01,0.0,2.6,1,"""Old Toh Tuck Road""",103.7556,1.3329,,,,,,,
"""S35""",2017-01-02,0.0,3.4,1,"""Old Toh Tuck Road""",103.7556,1.3329,2.6,,,,,,


In [23]:
# Keep only the columns we need: date, lag features, coordinates, station name, target
# Entries wrapped in ^...$ are regex patterns expanded by pl.col
output_columns = ["date", "^mr_lag_\\d$", "^location.*$", "station_name", "y"]
output_df = df_agg_filled_final.select([pl.col(name) for name in output_columns])

In [24]:
# Split into train (2017-2022), test (2023) and deploy (2024) sets
PROCESSED_PATH = "../data/processed"
# Create the output directory up front so the writes do not fail on a fresh checkout
os.makedirs(PROCESSED_PATH, exist_ok=True)

year = pl.col("date").dt.year()

# write_parquet returns None, so bind each split to its DataFrame first;
# otherwise train/test/deploy would all end up as None.
train = output_df.filter(year.is_between(2017, 2022))
test = output_df.filter(year == 2023)
deploy = output_df.filter(year == 2024)

for split_name, split_df in (("train", train), ("test", test), ("deploy", deploy)):
    split_df.write_parquet(os.path.join(PROCESSED_PATH, f"{split_name}.parquet"))