In [None]:
import os
from typing import Optional

import numpy as np
import pandas as pd
import polars as pl

import kaggle_evaluation.jane_street_inference_server


The evaluation API requires that you set up a server which will respond to inference requests. We have already defined the server; you just need to write the predict function. When we evaluate your submission on the hidden test set, the client defined in `jane_street_gateway` will run in a different container with direct access to the hidden test set and hand off the data timestep by timestep.



Your code will always have access to the published copies of the files.

## Custom Classes/Methods/Functions

In [None]:
class Preprocessor:
    def __init__(
        self,
        symbol_id: Optional[int] = None,
        responder: int = 6,
        partition_ids: Optional[list[int]] = None,
        feature_set: Optional[list] = None,
        sample_frequency: int = 15,
        exclude_set: list = [
            "feature_00",
            "feature_01",
            "feature_02",
            "feature_03",
            "feature_04",
            "feature_21",
            "feature_26",
            "feature_27",
            "feature_31",
        ],
    ):
        self.symbol_id = symbol_id
        self.responder = responder
        self.partition_ids = partition_ids
        self.feature_set = feature_set
        self.sample_frequency = sample_frequency
        self.exclude_set = exclude_set

    def filter_symbol(self, df: pd.DataFrame, symbol_id: int):
        return df[df["symbol_id"] == symbol_id]

    def resample(self, df: pd.DataFrame, sample_frequency: int):
        return (
            df.set_index("time_index")
            .resample(f"{sample_frequency}min")
            .first()
            .reset_index()
        )

    def create_time_index(self, df: pd.DataFrame) -> pd.DataFrame:

        # Convert to numpy.int32 to prevent overflow
        df["date_id"] = df["date_id"].astype("int32")
        df["time_id"] = df["time_id"].astype("int32")

        return df

    def read_partition(self, read_all=False) -> pd.DataFrame:
        if self.partition_ids:
            if read_all:
                df = pd.concat([pd.read_parquet(parquet) for parquet in train_parquets])
            else:
                dfs = []
                for partition_id in self.partition_ids:
                    dfs.append(pd.read_parquet(train_parquets[partition_id]))
                df = pd.concat(dfs, ignore_index=True)
        else:
            df = pd.read_parquet(test_parquet)

        df = self.create_time_index(df)
        if self.symbol_id:
            df = self.filter_symbol(df, self.symbol_id)
        df = self.resample(df, self.sample_frequency)

        if self.exclude_set:
            df.drop(columns=self.exclude_set, inplace=True)

        return df

In [None]:
def get_time_weights(n, halflife=0.35):
    """Return ``n`` exponentially decaying weights that sum to one.

    Weight ``k`` is ``d ** k`` with ``d`` chosen so the weights halve every
    ``halflife * n`` steps; i.e. ``halflife`` is a fraction of ``n``. The first
    weight is the largest.
    """
    per_step = 0.5 ** (1 / (halflife * n))
    raw = per_step ** np.arange(n)
    total = raw.sum()
    return raw / total


def time_weighted_mean(vals, n, halflife=0.35):
    """Weighted mean of ``vals`` (length ``n``) favouring the last entries.

    The weights from ``get_time_weights`` are reversed so the final element of
    ``vals`` receives the largest weight.
    """
    newest_first = np.flip(get_time_weights(n, halflife))
    return np.dot(vals, newest_first)

In [None]:
class ExpWeightedMeanCalculator:
    """Exponentially time-weighted mean of per-date averages of one column.

    For a target date ``tdate`` the column is averaged per ``date_id`` over the
    ``lookback`` dates strictly before ``tdate``; the daily means are then
    combined by ``time_weighted_mean`` so that later dates weigh more.
    """

    def __init__(self, halflife=0.35, lookback=15, max_nans=5, replace=True):
        """
        Args:
            halflife: decay half-life as a fraction of ``lookback`` (passed to
                ``time_weighted_mean``).
            lookback: number of dates before ``tdate`` to average over.
            max_nans: if more daily means than this are missing, ``calculate``
                returns 0.0 instead of a weighted mean.
            replace: replace missing daily means with 0 before weighting.
        """
        self.halflife = halflife
        self.lookback = lookback
        self.max_nans = max_nans
        self.replace = replace

    def calculate(self, df: pd.DataFrame, tdate: int, feature_column: str) -> float:
        """Return the weighted mean of ``feature_column`` before ``tdate``.

        Args:
            df: frame with a ``date_id`` column and ``feature_column``.
            tdate: first date excluded from the window; the window is
                ``[tdate - lookback, tdate)``.
            feature_column: column to average.

        Returns:
            The time-weighted mean of the daily means, or 0.0 when more than
            ``max_nans`` dates have no (non-missing) data.
        """
        first_date = tdate - self.lookback
        lookback_dates = range(first_date, tdate)

        # Restrict to the window once so each per-date filter scans only the
        # rows that can match, instead of the whole (possibly large) frame.
        window = df[(df["date_id"] >= first_date) & (df["date_id"] < tdate)]
        mean_values = [
            window.loc[window["date_id"] == date, feature_column].mean()
            for date in lookback_dates
        ]

        nan_count = sum(pd.isna(mean_values))
        if nan_count > self.max_nans:
            # Float, to honour the declared return type.
            return 0.0

        if self.replace:
            mean_values = [0 if pd.isna(m) else m for m in mean_values]

        return time_weighted_mean(mean_values, self.lookback, self.halflife)

## Copy-Pasted Definitions (until I figure out how to import scripts)

In [None]:
lags_: pl.DataFrame | None = None
# History of the lag frames received so far, trimmed to the EWMA lookback
# window. Past responder values come from the lag frames (which carry
# `responder_6_lag_1`), not from the test feature batches.
accumulated_data: pl.DataFrame = pl.DataFrame()
ewma_calculator = ExpWeightedMeanCalculator(halflife=0.35, lookback=15)
# Constant prediction for the current date. It only changes when a new lag
# frame arrives, so it is computed once per lag frame rather than per batch.
prediction_: float = 0.0


def _update_lag_history(
    history: pl.DataFrame, lags: pl.DataFrame, lookback: int
) -> pl.DataFrame:
    """Append *lags* to *history* and keep only the `lookback` latest dates."""
    # A column-less empty frame cannot be concatenated reliably; start fresh.
    combined = lags if history.width == 0 else pl.concat([history, lags])
    latest = combined["date_id"].max()
    return combined.filter(pl.col("date_id") > latest - lookback)


def _prediction_from_history(history: pl.DataFrame, latest_lags: pl.DataFrame) -> float:
    """Return the constant responder_6 prediction implied by the lag history."""
    lookback = ewma_calculator.lookback
    if history["date_id"].n_unique() < lookback:
        # Warm-up: not enough dates yet, use the latest lag frame's mean.
        value = latest_lags["responder_6_lag_1"].mean()
    else:
        # `calculate` averages dates in [tdate - lookback, tdate); passing
        # latest + 1 makes that window exactly the retained lag dates,
        # including the most recent one.
        window_end = int(history["date_id"].max()) + 1
        value = ewma_calculator.calculate(
            history.to_pandas(), window_end, "responder_6_lag_1"
        )
    # A null/NaN mean would yield a null prediction column; fall back to 0.
    return 0.0 if pd.isna(value) else float(value)


# You can return either a Pandas or Polars dataframe, though Polars is recommended.
# Each batch of predictions (except the very first) must be returned within 1 minute of the batch features being provided.
def predict(test: pl.DataFrame, lags: pl.DataFrame | None) -> pl.DataFrame | pd.DataFrame:
    """Predict `responder_6` for every row of *test*.

    All rows receive the same value. It is recomputed whenever a non-empty
    `lags` frame arrives: the mean of `responder_6_lag_1` until the lag
    history spans `ewma_calculator.lookback` dates, then the exponentially
    weighted mean of daily lag means. Before any lags arrive it is 0.0.

    Args:
        test: the current batch; must contain `row_id`.
        lags: lag frame for the new date, or None for later batches.

    Returns:
        A frame with columns `row_id` and `responder_6`, one row per test row.
    """
    global lags_, accumulated_data, prediction_
    if lags is not None:
        lags_ = lags
        if lags.height > 0:
            accumulated_data = _update_lag_history(
                accumulated_data, lags, ewma_calculator.lookback
            )
            prediction_ = _prediction_from_history(accumulated_data, lags)

    predictions = test.select(
        'row_id',
        pl.lit(prediction_).alias('responder_6'),
    )

    if isinstance(predictions, pl.DataFrame):
        assert predictions.columns == ['row_id', 'responder_6']
    elif isinstance(predictions, pd.DataFrame):
        assert (predictions.columns == ['row_id', 'responder_6']).all()
    else:
        raise TypeError('The predict function must return a DataFrame')
    # Confirm has as many rows as the test data.
    assert len(predictions) == len(test)

    return predictions

When your notebook is run on the hidden test set, inference_server.serve must be called within 15 minutes of the notebook starting or the gateway will throw an error. If you need more than 15 minutes to load your model you can do so during the very first `predict` call, which does not have the usual 1 minute response deadline.

In [None]:
# Wrap `predict` in the competition-provided inference server.
inference_server = kaggle_evaluation.jane_street_inference_server.JSInferenceServer(predict)

_is_competition_rerun = os.getenv('KAGGLE_IS_COMPETITION_RERUN')
if not _is_competition_rerun:
    # Interactive session: replay the published test and lag files through a
    # local gateway.
    _local_inputs = (
        '/kaggle/input/jane-street-real-time-market-data-forecasting/test.parquet',
        '/kaggle/input/jane-street-real-time-market-data-forecasting/lags.parquet',
    )
    inference_server.run_local_gateway(_local_inputs)
else:
    # Scored rerun: the gateway runs elsewhere and calls `predict` per batch.
    inference_server.serve()