In [2]:
# | default_exp data

In [3]:
# | exporti

import abc
from dataclasses import dataclass
from datetime import timedelta
from itertools import product

import numpy as np
import polars as pl

In [4]:
# | export


class DataLoader:
    """Load a price time series from CSV and compute gross returns.

    The CSV must provide ``Date`` and ``Price`` columns. They are renamed to
    ``date`` and ``price``, rows are sorted by date, and a ``ret`` column with
    the gross return ``price_t / price_{t-1}`` is added (null on the first row
    of the full file).

    Attributes:
        max_records: Maximum number of records to keep, taken from the end of
            the date-sorted dataset. ``None`` keeps every record.
    """

    def __init__(self, max_records: int | None = None):
        self.max_records = max_records

    def load_data(self, path: str) -> pl.DataFrame:
        """Load the CSV at ``path`` and add the gross-return column.

        Args:
            path: Path to the CSV file containing the data.

        Returns:
            DataFrame with columns ``date``, ``price`` and ``ret`` (plus any other
            columns present in the CSV), sorted by date and truncated to the last
            ``max_records`` rows when ``max_records`` is set.
        """
        df = (
            pl.read_csv(path, try_parse_dates=True, infer_schema_length=None)
            .rename({"Date": "date", "Price": "price"})
            .sort("date")
            # Gross (ratio) return, not a simple or log return.
            .with_columns(ret=pl.col("price") / pl.col("price").shift(1))
        )

        # Truncate *after* computing returns so the first kept row still has a
        # return relative to the preceding (discarded) price. ``tail`` is used
        # instead of ``df[-n:]`` because ``-0`` would keep the whole frame.
        if self.max_records is not None:
            df = df.tail(self.max_records)

        return df

In [5]:
# | exec: false
# Create a data loader that keeps only the most recent 9000 records, then load
# the CSV (adds the gross-return column `ret`).
data_loader = DataLoader(max_records=9000)
source_df = data_loader.load_data("./data/ng_daily.csv")
# Preview the first rows (rendered as the cell output below).
source_df.head()

date,price,ret
date,f64,f64
1997-01-07,3.82,
1997-01-08,3.8,0.994764
1997-01-09,3.61,0.95
1997-01-10,3.92,1.085873
1997-01-13,4.0,1.020408


In [None]:
# | export


@dataclass
class DFFeature(abc.ABC):
    """Abstract base for a single derived column computed from a DataFrame.

    Concrete subclasses implement :meth:`extract`, which is expected to read
    ``source_field`` and return the frame with a column named ``feature_name``
    added.
    """

    source_field: str  # column the feature is computed from
    feature_name: str  # name of the column the feature is written to

    @abc.abstractmethod
    def extract(self, df: pl.DataFrame) -> pl.DataFrame:
        """Return ``df`` with the feature column added."""
        ...


@dataclass
class Variance(DFFeature):
    """Rolling sample variance of ``source_field``, floored at ``min_variance``.

    Attributes:
        rolling_variance_window: Number of rows in each rolling window.
        min_variance: Lower bound applied to the rolling variance (default
            ``1e-4``); presumably keeps downstream models away from zero
            variance.
    """

    rolling_variance_window: int = 3
    min_variance: float = 1e-4

    def extract(self, df: pl.DataFrame) -> pl.DataFrame:
        """Return ``df`` with the floored rolling variance stored as ``feature_name``.

        Rows without a full window are null (polars' default minimum sample
        count equals the window size); ``clip`` leaves those nulls untouched.
        """
        return df.with_columns(
            pl.col(self.source_field)
            .rolling_var(self.rolling_variance_window)
            .clip(lower_bound=self.min_variance)
            .alias(self.feature_name)
        )


@dataclass
class Square(DFFeature):
    """Element-wise square of ``source_field``, stored as ``feature_name``."""

    def extract(self, df: pl.DataFrame) -> pl.DataFrame:
        """Return ``df`` with the squared column added."""
        squared = pl.col(self.source_field).pow(2)
        return df.with_columns(squared.alias(self.feature_name))


@dataclass
class LogReturn(DFFeature):
    """Natural logarithm of ``source_field``, stored as ``feature_name``.

    Applied to a gross-return (price ratio) column this yields the log return.
    """

    def extract(self, df: pl.DataFrame) -> pl.DataFrame:
        """Return ``df`` with the log-transformed column added."""
        log_expr = pl.col(self.source_field).log()
        return df.with_columns(log_expr.alias(self.feature_name))


@dataclass
class FeatureEngineer:
    """Apply feature transforms and create lagged (shifted) copies of their outputs.

    The lagged columns can be used for GARCH-like models and other time series
    forecasting tasks. For every feature column ``col`` and every ``shift`` in
    ``1..n_shifts`` a column named ``prev_{col}_{shift}`` is created.

    Attributes:
        transforms: Feature transforms applied, in order, by ``create_features``.
        n_shifts: Number of lag periods to create per feature (default: 3).
        drop_nulls: Whether ``create_features`` drops every row that contains a
            null, e.g. the warm-up rows left by rolling windows and shifts.
        columns: Names of the feature columns to lag, one per transform. Derived
            from ``transforms`` in ``__post_init__``; not a constructor argument.
    """

    transforms: list[DFFeature]
    n_shifts: int = 3
    drop_nulls: bool = True

    def __post_init__(self):
        # Plain attribute rather than a dataclass field: it is derived from
        # ``transforms``, so it stays out of __init__, __repr__ and __eq__.
        self.columns = [t.feature_name for t in self.transforms]

    def create_features(self, df: pl.DataFrame) -> pl.DataFrame:
        """Apply the transforms to ``df`` and append the lagged feature columns.

        Args:
            df: Input DataFrame containing the columns the transforms read.

        Returns:
            A new DataFrame with the original columns, one column per transform
            and ``n_shifts`` lag columns per feature. When ``drop_nulls`` is set,
            rows with a null in *any* column are removed.
        """
        # Polars frames are immutable: every step returns a new frame, so the
        # caller's ``df`` is never modified and no defensive copy is needed.
        for transform in self.transforms:
            df = transform.extract(df)

        lag_exprs = []
        # dict.fromkeys de-duplicates while preserving order, so two transforms
        # sharing a feature_name cannot produce duplicate output columns.
        for col in dict.fromkeys(self.columns):
            if col not in df.columns:
                print(f"Warning: Column '{col}' not found in dataframe. Skipping.")
                continue
            lag_exprs.extend(
                pl.col(col).shift(shift).alias(self.get_shift_pattern(col, shift))
                for shift in range(1, self.n_shifts + 1)
            )

        # Add all lag columns in one pass instead of one with_columns per lag.
        result_df = df.with_columns(lag_exprs) if lag_exprs else df
        if self.drop_nulls:
            result_df = result_df.drop_nulls()
        return result_df

    def to_numpy_dict(
        self, df: pl.DataFrame, drop: set[str] | None = None
    ) -> dict[str, np.ndarray]:
        """Convert the feature and lag columns of ``df`` to NumPy arrays.

        Feature columns come first (in ``self.columns`` order), followed by their
        lag columns. Columns absent from ``df`` are skipped silently. The result
        is intended to be passed directly to NumPyro models.

        Args:
            df: DataFrame with lagged features created by ``create_features``.
            drop: Feature names to exclude, together with all of their lags.

        Returns:
            Dictionary mapping column names to NumPy arrays.
        """
        excluded = set() if drop is None else drop
        kept = [col for col in self.columns if col not in excluded]
        lag_names = [
            self.get_shift_pattern(col, shift)
            for col in kept
            for shift in range(1, self.n_shifts + 1)
        ]
        available = set(df.columns)
        return {
            name: df[name].to_numpy()
            for name in kept + lag_names
            if name in available
        }

    def get_iterator(self, site: str | None = None):
        """Yield ``(feature_name, shift)`` pairs for every lag column.

        Args:
            site: Restrict the pairs to this single feature name; ``None`` (the
                default) iterates over all of ``self.columns``.

        Yields:
            Tuples ``(feature_name, shift)`` with ``shift`` in ``1..n_shifts``.
        """
        sites = self.columns if site is None else [site]
        yield from product(sites, range(1, self.n_shifts + 1))

    def get_shift_pattern(self, site: str, shift: int, prefix: str = "") -> str:
        """Return the lag column name ``{prefix}prev_{site}_{shift}``."""
        return f"{prefix}prev_{site}_{shift}"

In [7]:
# | exec: false
# Derive log returns (from the gross return `ret`) and a rolling variance,
# then add 3 lagged copies of each feature and drop the warm-up rows.
# NOTE(review): the rolling variance is computed over `price` levels rather
# than over returns — confirm this is what the downstream model expects.
feature_engineer = FeatureEngineer(
    transforms=[
        LogReturn(source_field="ret", feature_name="log_ret"),
        Variance(source_field="price", feature_name="var"),
    ],
    n_shifts=3,
)
df_with_features = feature_engineer.create_features(source_df)
# Preview the first rows (rendered as the cell output below).
df_with_features.head()

date,price,ret,log_ret,var,prev_log_ret_1,prev_log_ret_2,prev_log_ret_3,prev_var_1,prev_var_2,prev_var_3
date,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64
1997-01-14,4.01,1.0025,0.002497,0.002433,0.020203,0.082384,-0.051293,0.042433,0.024433,0.013433
1997-01-15,4.34,1.082294,0.079083,0.037433,0.002497,0.020203,0.082384,0.002433,0.042433,0.024433
1997-01-16,4.71,1.085253,0.081814,0.122633,0.079083,0.002497,0.020203,0.037433,0.002433,0.042433
1997-01-17,3.91,0.830149,-0.186151,0.1603,0.081814,0.079083,0.002497,0.122633,0.037433,0.002433
1997-01-20,3.26,0.83376,-0.18181,0.5275,-0.186151,0.081814,0.079083,0.1603,0.122633,0.037433


In [8]:
# | export
def append_from_log_ret(
    df: pl.DataFrame,
    new_log_ret: float,
    step: timedelta = timedelta(days=1),
) -> pl.DataFrame:
    """Append one observation derived from a log return.

    The new row is dated ``step`` after the latest date in ``df``. Its price is
    ``last_price * exp(new_log_ret)``, where ``last_price`` is the price on the
    final row of ``df``. ``df`` is therefore assumed to be sorted by date.

    Args:
        df: Frame with ``date``, ``price`` and ``ret`` columns. ``pl.concat``
            stacks vertically by default, so ``df`` must have exactly these
            columns with compatible dtypes.
        new_log_ret: Log return of the new observation.
        step: Offset added to the latest date to obtain the new date. Defaults
            to one calendar day; note that daily market data may skip weekends.

    Returns:
        A new DataFrame with the extra row appended; ``df`` itself is unchanged.

    Raises:
        ValueError: If ``df`` is empty (there is no previous price to grow from).
    """
    if df.is_empty():
        raise ValueError("cannot append to an empty DataFrame: no previous price")

    new_date = df["date"].max() + step

    # Convert the log return back to a gross (ratio) return.
    new_ret = np.exp(new_log_ret)
    new_price = df["price"].tail(1).item() * new_ret

    new_row = pl.DataFrame(
        {
            "date": [new_date],
            "price": [new_price],
            "ret": [new_ret],
        }
    )
    return pl.concat([df, new_row])