In [None]:
# Enable IPython's autoreload extension in mode 2 so imported modules are
# reloaded before each cell executes; edits to the project's source files are
# then picked up without restarting the kernel.
%load_ext autoreload
%autoreload 2

In [None]:
import pandas as pd
import pyarrow.dataset as ds
import pyarrow as pa

import duckdb

# Show every column when a DataFrame is displayed (None removes the column limit).
pd.set_option('display.max_columns', None)

# Description

The general process for updating a partitioned dataset is:

0) Read configs for each dataset
1) Read in the data already collected and stored in partitions (use duckdb for reads)
    - Get the data for the `n_latest_dates` most recent dates
    - `df_current`: the resulting DataFrame, used later to filter out data that is already stored
2) Load all parquet files containing new data and concatenate them.
3) Perform all of the requisite preprocessing steps for that type of data
4) Filter out duplicates already in `df_current`
5) Save using `DataFrame.to_parquet()` with `partition_cols`

# 0 - Configuration

In [None]:
from src.galton.data_collection.partition_configs import config

In [None]:
# --- Run parameters ---------------------------------------------------------
# Key of the `config` entry that this run updates.
select_dataset = "openmeteo_forecasts"

# How many of the most recent distinct dates of stored data to load
# (used as the LIMIT when reading the existing partitions).
n_latest_dates = 5

# Directory scanned for newly collected parquet files.
# NOTE(review): machine-specific absolute path — adjust when running elsewhere.
data_path = "/Users/luketownsend/Desktop/projects/tetlock/data/"

# --- Settings for the selected dataset --------------------------------------
dataset_config = config[select_dataset]

# Folder under data/local_data/ that holds the partitioned dataset.
partition_folder_name = dataset_config["partition_folder_name"]
# Column used to pick the latest stored dates and the first new-data date.
start_date_field = dataset_config["start_date_field"]
# File-name prefixes that identify new-data files for this dataset.
new_data_file_prefixes = dataset_config["new_data_file_prefixes"]
# Columns that are set as the index when comparing new rows to stored rows.
record_index_fields = dataset_config["record_index_fields"]
# Columns kept in the frame that is written out.
output_columns = dataset_config["output_columns"]
# Columns passed as `partition_cols` when saving.
partition_columns = dataset_config["partition_columns"]

# 1 - Read Existing Data

In [None]:
# Default DuckDB connection; used here only to query the stored parquet files.
con = duckdb.connect()

# Expose every parquet file under the dataset's partition folder as one view.
create_view_sql = f"""
    CREATE OR REPLACE VIEW {partition_folder_name} AS
    SELECT *
    FROM read_parquet('data/local_data/{partition_folder_name}/**/*.parquet');
"""
con.execute(create_view_sql)

# Select all rows whose `start_date_field` value is among the
# `n_latest_dates` largest distinct values present in the view.
query = f"""
WITH latest_dates AS (
    SELECT DISTINCT {start_date_field}
    FROM {partition_folder_name}
    ORDER BY {start_date_field} DESC
    LIMIT {n_latest_dates}
)
SELECT f.*
FROM {partition_folder_name} f
JOIN latest_dates d
USING ({start_date_field})
"""

# Run the query and materialize the result as a pandas DataFrame.
df_current = con.execute(query).fetch_df()

In [None]:
df_current[start_date_field].value_counts()

In [None]:
df_current.shape

# 2 - Load New Data

In [None]:
from datetime import datetime, timedelta

#TODO: Modify to accept a list of prefixes
#TODO: Write function to convert date to start_date format
def generate_prefixed_dates(prefix: str, start_date: "str | date | datetime") -> list[str]:
    """
    Build one 'prefix + YYYY-MM-DD' string for every day from start_date through today.

    Example: prefix 'multi_model_forecast - ' and start date '2025-11-21' give
    ['multi_model_forecast - 2025-11-21', ..., 'multi_model_forecast - <today>'].

    Args:
        prefix: Text placed in front of each ISO-formatted date.
        start_date: First date to include. May be a 'YYYY-MM-DD' string, a
            ``datetime.date``, or a ``datetime.datetime`` (this includes
            ``pandas.Timestamp``, which subclasses it). Any time-of-day part
            is discarded.

    Returns:
        The prefixed dates in ascending order; an empty list when start_date
        is later than today.

    Raises:
        ValueError: If a string start_date does not match 'YYYY-MM-DD'.
        TypeError: If start_date is not a string, date, or datetime.
    """
    # Local import: the surrounding cell only imports `datetime` and `timedelta`.
    from datetime import date

    if isinstance(start_date, str):
        start = datetime.strptime(start_date, "%Y-%m-%d").date()
    elif isinstance(start_date, datetime):
        # Checked before `date` because datetime is a subclass of date.
        start = start_date.date()
    elif isinstance(start_date, date):
        start = start_date
    else:
        raise TypeError(
            "start_date must be a 'YYYY-MM-DD' string, date, or datetime, "
            f"not {type(start_date).__name__}"
        )

    today = date.today()

    # Inclusive of both endpoints; zero or negative when start is in the future,
    # in which case range() is empty and so is the result.
    num_days = (today - start).days + 1
    return [f"{prefix}{start + timedelta(days=offset)}" for offset in range(num_days)]



# Earliest date among the rows loaded from storage; new-data files are looked
# up from this date forward. It is normalized to 'YYYY-MM-DD' text because
# generate_prefixed_dates parses a string start date with that format, while
# the stored column may come back as a date/timestamp rather than a string.
start_date = pd.to_datetime(df_current[start_date_field]).min().strftime("%Y-%m-%d")

# Expected "<prefix><date>" names for every configured prefix, not only the
# first one, so that no configured source of new data is silently skipped.
files = [
    name
    for prefix in new_data_file_prefixes
    for name in generate_prefixed_dates(prefix, start_date)
]
print(files[-5:])  # show last few

In [None]:
# Discover the parquet files under `data_path` and list their paths.
dataset = ds.dataset(data_path, format="parquet")
filenames = dataset.files

# Keep only the files whose path contains one of the expected "<prefix><date>" names.
filtered = [f for f in filenames if any(p in f for p in files)]
print(len(filtered))

# Without this guard an empty match list surfaces later as pandas' generic
# "No objects to concatenate" error, which hides the real cause.
if not filtered:
    raise ValueError(
        f"No parquet files under {data_path!r} match the expected new-data names "
        f"(latest expected: {files[-1:]}); nothing to load."
    )

# Read every matching file and stack them into a single frame.
frames = [pd.read_parquet(path) for path in filtered]
df = pd.concat(frames)

print(df.shape)

# 3 - Preprocess New Data

In [None]:
# TODO: Refactor & move into dedicated module or add to data_collection.openmeteo?

from src.galton.data_collection.utilities import (
    normalize_field_names,
    convert_datetime_to_utc,
)
from src.galton.feature_engineering.dates import add_date_fields
from src.galton.feature_engineering.forecasts import (
    add_forecast_fields,
    filter_redundant_forecasts,
    filter_unused_forecast_data,
)

# Stage 1 — project helpers applied in order: field-name normalization, then
# the date-field and forecast-field additions.
df = (
    df
    .pipe(normalize_field_names)
    .pipe(add_date_fields)
    .pipe(add_forecast_fields)
)
print(df.shape)

# Stage 2 — the two project filters, followed by the UTC conversion helper.
df = (
    df
    .pipe(filter_unused_forecast_data)
    .pipe(filter_redundant_forecasts)
    .pipe(convert_datetime_to_utc)
)

# Store the forecast horizon as an integer column.
df["forecast_horizon"] = df["forecast_horizon"].astype(int)
print(df.shape)

# Express the timestamp columns in US Central time.
# NOTE(review): this follows the UTC conversion above — confirm that Central
# is the timezone the stored partitions are expected to use.
datetime_cols = ["datetime", "model_timestamp", "current_timestamp"]
for col in datetime_cols:
    df[col] = df[col].dt.tz_convert("America/Chicago")

# 4 - Filter Existing Records from New Data 

In [None]:
# Index built from the key fields of the rows that are already stored.
current_index = df_current.set_index(record_index_fields).index

# Index the new rows by the same key fields so membership can be tested directly.
df = df.set_index(record_index_fields)

# True for each new row whose key is already present in storage.
already_stored = df.index.isin(current_index)

# Keep only the unseen rows and turn the key fields back into columns.
df = df.loc[~already_stored].reset_index()

print(df.shape)

# Restrict the frame to the configured output columns, in that order.
df = df[output_columns]

# 5 - Save New Data to Partition

In [None]:
# Write the remaining new rows under the dataset's partition folder, split
# into sub-directories by the values of `partition_columns`.
df.to_parquet(
    f"data/local_data/{partition_folder_name}",
    engine="pyarrow",
    partition_cols=partition_columns,
)