Largely built on [@iqmansingh's](https://www.kaggle.com/iqmansingh) notebook, [4-Fold Time-Series Split Ensemble](https://www.kaggle.com/code/iqmansingh/optiver-4-fold-time-series-split-ensemble), although this borrows the `reduce_mem_usage` and `imbalance_features` snippets as well. The core idea is still to build a voting ensemble on time series splits, but with score tracking so it can reject models that degrade performance.

I eventually learned that scikit-learn has a built-in [`VotingRegressor`](https://scikit-learn.org/stable/modules/generated/sklearn.ensemble.VotingRegressor.html) class, (*shout-out to [@chinzorigtganbat's](https://www.kaggle.com/chinzorigtganbat) [VotingRegressor + Boosters](https://www.kaggle.com/code/chinzorigtganbat/votingregressor-boosters)*), but it's different enough from what I'm going for here that I decided not to use it.

In [1]:
import os
import gc
import time
# import typing
import joblib
import warnings
import itertools
warnings.simplefilter('ignore') # ignore FutureWarnings; must precede pandas import
import pandas as pd
import numpy as np
import numba as nb
import xgboost as xgb
import lightgbm as lgb
import sklearn.svm as svm
import sklearn.impute as imp
import sklearn.metrics as met
import sklearn.model_selection as sel
import matplotlib.pyplot as plt
import typing_extensions as ext # used over vanilla typing since it backports 3.11+ features
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3' # ignore bugged CUDA errors; must precede tf import
import tensorflow as tf
tf.keras.utils.disable_interactive_logging() # ensemble will provide its own condensed version
print(('GPU available.' if len(tf.config.list_physical_devices('GPU')) > 0 else 'No GPU detected.'))

No GPU detected.


In [2]:
@nb.njit(parallel=True)
def compute_triplet_imbalance(values:np.ndarray, combo_indices:list[tuple[int, int, int]]) -> np.ndarray:
    num_rows = values.shape[0]
    num_combinations = len(combo_indices)
    imbalance_features = np.empty((num_rows, num_combinations))
    for i in nb.prange(num_combinations): # enumerate() works but prange() lets us run in parallel
        a, b, c = combo_indices[i]
        for j in nb.prange(num_rows):
            _a, _b, _c = values[j, a], values[j, b], values[j, c]
            max_val = max(_a, _b, _c)
            min_val = min(_a, _b, _c)
            mid_val = sum([_a, _b, _c])-max_val-min_val
            imbalance_features[j, i] = np.nan if mid_val == min_val else (max_val-mid_val)/(mid_val-min_val)
    return imbalance_features   

def calculate_triplet_imbalance_numba(cols:list[str], data:pd.DataFrame) -> pd.DataFrame:
    values = data[cols].values
    combo_indices = []
    columns = []
    for a, b, c in itertools.combinations(cols, 3):
        combo_indices.append(tuple([cols.index(col) for col in [a, b, c]]))
        columns.append(f'{a}_{b}_{c}_imbalance')
    features_array = compute_triplet_imbalance(values, combo_indices)
    return pd.DataFrame(features_array, columns=columns)

# notably, rolling_average and compute_rolling_averages, while typically included, are never actually used

def imbalance_features(data:pd.DataFrame) -> pd.DataFrame:
    prices = [*[col for col in data.columns if 'price' in col], 'wap']
    sizes = [col for col in data.columns if 'size' in col]
    data['volume'] = data.eval('ask_size+bid_size')
    data['mid_price'] = data.eval('(ask_price+bid_price)/2')
    data['liquidity_imbalance'] = data.eval('(bid_size-ask_size)/volume')
    data['matched_imbalance'] = data.eval('(imbalance_size-matched_size)/(imbalance_size+matched_size)')
    data['size_imbalance'] = data.eval('bid_size/ask_size')
    data['imbalance_momentum'] = data.groupby('stock_id').imbalance_size.diff(periods=1) / data.matched_size
    data['price_spread'] = data.eval('ask_price-bid_price')
    data['spread_intensity'] = data.groupby('stock_id').price_spread.diff()
    data['price_pressure'] = data.eval('imbalance_size*price_spread')
    data['market_urgency'] = data.eval('price_spread*liquidity_imbalance')
    data['depth_pressure'] = data.eval('(ask_size-bid_size)*(far_price-near_price)')
    for cols in itertools.combinations(prices, 2):
        data[f'{cols[0]}_{cols[1]}_imbalance'] = data.eval(f'({cols[0]}-{cols[1]})/({cols[0]}+{cols[1]})')
    for cols in [['ask_price', 'bid_price', 'wap', 'reference_price'], sizes]:
        triplet_feature = calculate_triplet_imbalance_numba(cols, data)
        data[triplet_feature.columns] = triplet_feature.values
    for func in ['mean', 'std', 'skew', 'kurt']:
        data[f'all_prices_{func}'] = data[prices].agg(func, axis=1)
        data[f'all_sizes_{func}'] = data[sizes].agg(func, axis=1)
    for win in [1, 2, 3, 10]:
        for col in ['matched_size', 'imbalance_size', 'reference_price', 'imbalance_buy_sell_flag']:
            data[f'{col}_shift_{win}'] = data.groupby('stock_id')[col].shift(win)
            data[f'{col}_pct_{win}'] = data.groupby('stock_id')[col].pct_change(win)
        for col in ['ask_price', 'bid_price', 'ask_size', 'bid_size', 'market_urgency', 'imbalance_momentum', 'size_imbalance']:
            data[f'{col}_diff_{win}'] = data.groupby('stock_id')[col].diff(win)
    data = data.replace([np.inf, -np.inf], 0)
    return data

def reduce_mem_usage(data:pd.DataFrame, verbose:bool=False) -> pd.DataFrame:
    if verbose: mem_start = data.memory_usage().sum()
    for col in data.columns:
        match data[col].dtype:
            case 'object': continue
            case 'int32' | 'int64':
                for int_size in [np.int8, np.int16, np.int32]:
                    if data[col].min() > np.iinfo(int_size).min and data[col].max() < np.iinfo(int_size).max:
                        data[col] = data[col].astype(int_size)
            case 'float32' | 'float64':
                for float_size in [np.float16, np.float32]:
                    if data[col].min() > np.finfo(float_size).min and data[col].max() < np.finfo(float_size).max:
                        data[col] = data[col].astype(float_size)
            case _: raise Exception(data[col].dtype)
    if verbose:
        mem_end = data.memory_usage().sum()
        print(f'DataFrame memory reduced from {mem_start} to {mem_end}.')
    return data

In [3]:
LOCAL_DATA_TRAIN = '.data/train.csv'
LOCAL_DATA_TEST_X = '.data/test.csv'
LOCAL_DATA_TEST_Y = '.data/revealed_targets.csv'

KAGGLE_DATA_TRAIN = '/kaggle/input/optiver-trading-at-the-close/train.csv'
KAGGLE_DATA_TEST_X = '/kaggle/input/optiver-trading-at-the-close/example_test_files/test.csv'
KAGGLE_DATA_TEST_Y = '/kaggle/input/optiver-trading-at-the-close/example_test_files/revealed_targets.csv'

DROPS = ['index', 'time_id', 'currently_scored', 'time_id_x', 'time_id_y', 'revealed_date_id', 'revealed_time_id', 'row_id']
SORTS = ['date_id', 'stock_id', 'seconds_in_bucket'] # order matters here
SKIPS = ['imbalance_buy_sell_flag', 'target']

def preprocess(data:pd.DataFrame) -> pd.DataFrame: # separate from load_data() for submission compat
    data = data.set_index(SORTS).sort_index()      # pushing these into a multi-index makes life easier down the road
    data = imbalance_features(data)                # add features early so they can be standardized; note SKIPS must be present here
    skip = data[[col for col in SKIPS if col in data.columns]]
    data = data.drop([col for col in [*DROPS, *SKIPS] if col in data.columns], axis=1)
    data = data.groupby(level='stock_id').ffill()  # impute with last observation; groupby() ensures ffill() is per-stock, per-day
    data = (data - data.mean()) / data.std(ddof=0) # normalize (z-score)
    data = data.fillna(0)                          # clean columns that didn't ffill or with a stdev of 0 (i.e., only 1 unique value)
    data = pd.concat([skip, data], axis=1, join='inner') # re-join with skipped columns
    temp = data.index.to_frame().seconds_in_bucket       # encode seconds as sin/cos waves
    data['seconds_in_bucket_sin'] = np.sin((temp * 2 * np.pi / 540))
    data['seconds_in_bucket_cos'] = np.cos((temp * 2 * np.pi / 540))
    return reduce_mem_usage(data)

def load_vars(test:bool=False) -> tuple[pd.DataFrame, pd.Series]: # returns training (or test) data for either local or kaggle setup
    def read_data(train, test_x, test_y): # wrap call to read_csv() since test X and y values are stored separately and must be merged
        if test: return pd.merge(*[pd.read_csv(path) for path in [test_x, test_y]], on=SORTS).rename(columns={'revealed_target':'target'})
        else: return pd.read_csv(train)
    try: data = read_data(LOCAL_DATA_TRAIN, LOCAL_DATA_TEST_X, LOCAL_DATA_TEST_Y)
    except FileNotFoundError: data = read_data(KAGGLE_DATA_TRAIN, KAGGLE_DATA_TEST_X, KAGGLE_DATA_TEST_Y)
    data = data.dropna(subset=['target']) # some rows have null targets
    data = preprocess(data)
    return data.drop('target', axis=1), data.target

In [4]:
class PredictionError(Exception): pass # specific error type so we can fail gracefully later on during training

class IModel(ext.Protocol): # interface for any sklearn-API model https://scikit-learn.org/stable/developers/develop.html
    def fit(self, X, y, **kwargs) -> ext.Self: ...
    def predict(self, X, **kwargs) -> np.ndarray: ...
    def get_params(self, deep=True) -> dict[str, ext.Any]: ...

class SelectiveEnsemble:
    def __init__(self, limit:int=None) -> None:
        self.limit = limit # once len(models) >= limit, only add new models with scores below the mean
        self.models = dict[str, IModel]()
        self.scores = dict[str, float]()
        self.kwargs = dict[str, dict]()
        self.test_x, self.test_y = load_vars(test=True)
    
    @property
    def mean_score(self) -> float:
        return sum(self.scores[m] for m in self.models) / len(self) if len(self) > 0 else None
    
    @property
    def best_score(self) -> float:
        return min(self.scores[m] for m in self.models) if len(self) > 0 else None
    
    @property
    def best_model(self) -> IModel:
        return [self.models[m] for m in self.models if self.scores[m] == self.best_score][0]
    
    def add(self, model:IModel, name:str, kwargs:dict) -> tuple[bool, float]: # raises PredictionError
        if name in self.models: name = f'{name}(1)'
        pred = model.predict(self.test_x, **kwargs)
        if len(np.unique(pred)) == 1: raise PredictionError('Model is guessing a constant value.')
        if np.isnan(pred).any(): raise PredictionError('Model is guessing NaN.')
        score = met.mean_absolute_error(self.test_y, pred)
        if self.limit and len(self) >= self.limit and self.mean_score < score: return False, score
        self.models[name] = model
        self.scores[name] = score
        self.kwargs[name] = kwargs
        return True, score

    def prune(self, limit:int=None) -> ext.Self: # removes models with scores above the mean; recurses if limit is set
        pruned = SelectiveEnsemble(limit=(limit or self.limit))
        pruned.models = {m:self.models[m] for m in self.models if self.scores[m] <= self.mean_score}
        pruned.scores = {m:self.scores[m] for m in pruned.models}
        pruned.kwargs = {m:self.kwargs[m] for m in pruned.models}
        if pruned.limit and len(pruned) > pruned.limit > 1: return pruned.prune()
        return pruned
    
    def predict(self, X:pd.DataFrame, **kwargs) -> np.ndarray: # predict() wrapper for soft voting
        y = np.zeros(len(X))
        for m in self.models:
            pred = self.models[m].predict(X, **self.kwargs[m])
            y += pred.reshape(-1) # reshape needed for tensorflow output; doesn't impact other model types
        y = y / len(self)
        return y

    def __len__(self) -> int:
        return len(self.models)
    
    def __repr__(self) -> str:
        return f'<Ensemble ({len(self)} model(s); mean: {self.mean_score:.6f}; best: {self.best_score:.6f}; limit: {self.limit})>'

In [5]:
def build_ensemble(models:list[IModel], folds:int=5, limit:int=None, skip_prediction_errors:bool=True, ignore_errors:bool=True) -> SelectiveEnsemble:
    print(f'Pre-training setup...', end='\r')
    ensemble = SelectiveEnsemble(limit=limit)
    cv = sel.TimeSeriesSplit(folds)
    X, y = load_vars()
    for j, model in enumerate(models): # customize fit() and predict() kwargs for each model's type and params
        fit_kw = dict()
        predict_kw = dict()
        model_class = type(model).__name__ # used over isinstance() to use match/case
        match model_class:
            case 'Sequential':
                model.compile(optimizer='adam', loss='mae')
                keras_kw = dict(batch_size=256, verbose=0)
                fit_kw.update(keras_kw) # in testing, multiple epochs per fold did not help
                predict_kw.update(keras_kw)
                early_stop = True
            case 'LGBMRegressor':
                fit_kw.update(dict(verbose=False)) # verbose=0 throws an error
                early_stop = 'early_stopping_round' in model.get_params()
            case 'XGBRegressor':
                fit_kw.update(dict(verbose=0))
                early_stop = 'early_stopping_rounds' in model.get_params()
            # I tried CatBoostRegressor but it consistently underperformed and caused too many issues
        fails = 0
        for i, (i_train, i_valid) in enumerate(cv.split(X)):
            name = f'{j}_{model_class}_{i}'
            msg = f'Model {j+1}/{len(models)}: Fold {i+1}/{folds}: {name}'
            try: # fail gracefully instead of giving up on the whole ensemble
                print(f'{msg} - Training...', end='\r')
                X_valid, y_valid = X.iloc[i_valid, :], y.iloc[i_valid]
                early_stop_kw = {}
                if early_stop:
                    if model_class == 'Sequential': early_stop_kw['validation_data'] = (X_valid, y_valid)
                    else: # LGB + XGB
                        early_stop_kw['eval_set'] = [(X_valid, y_valid)]
                        if model_class == 'LGBMRegressor': early_stop_kw['eval_metric'] = 'l1'
                fit_kw.update(early_stop_kw)
                try: model.fit(X.iloc[i_train, :], y.iloc[i_train], **fit_kw) # some kw fail on kaggle
                except: model.fit(X.iloc[i_train, :], y.iloc[i_train], **early_stop_kw) # regardless, always want the early stop
                del X_valid, y_valid
                print(f'{msg} - Submitting to ensemble...', end='\r')
                if model_class == 'Sequential':
                    clone = tf.keras.models.clone_model(model)
                    clone.set_weights(model.get_weights())
                else: clone = None
                res, score = ensemble.add((clone or model), name, predict_kw)
                print(f'{msg} - {("Accepted" if res else "Rejected")} with score: {score}\n\t{ensemble}')
                fails = 0 # reset fail counter to pick up consecutive fails only
            except PredictionError as e: # these tend not to improve, so skip remaining folds and move to next model
                print(f'{msg} - Stopped: {e}')
                if skip_prediction_errors: break
            except Exception as e: # these are mostly out of memory errors, which can generally be ignored
                if not ignore_errors: raise e # ...generally
                print(f'{msg} - Error: {type(e).__name__}: {e}')
                fails += 1
                if fails > 1: break # consecutive failures are usually a misconfig, skip these models as well
            finally: gc.collect()
    return ensemble

In [6]:
N_FEATURES = len(load_vars(test=True)[0].columns) # 
ACTIVATION_1 = 'tanh' # inputs are standardized so keep negative range
ACTIVATION_2 = 'relu' # performed better than tanh, sigmoid
DROPOUT = 0.5         # performed better than 0.3, 0.4
RANDOM_STATE = 25     # funnier than 24

layers = tf.keras.layers
Sequential = tf.keras.Sequential
regularizer = tf.keras.regularizers.l1(0.001)
tf.keras.utils.set_random_seed(RANDOM_STATE)
gb_params = dict(random_state=RANDOM_STATE, n_jobs=16, learning_rate=0.2, max_depth=3, colsample_bytree=0.85, subsample=0.8, reg_alpha=500) # lgb and xgb have some overlap

models = [
    Sequential([
        layers.Dense(N_FEATURES, kernel_regularizer=regularizer, activation=ACTIVATION_1, input_shape=[N_FEATURES]),
        layers.Dropout(DROPOUT),
        layers.BatchNormalization(),
        layers.Dense(N_FEATURES//8, kernel_regularizer=regularizer, activation=ACTIVATION_2),
        layers.Dropout(DROPOUT),
        layers.BatchNormalization(),
        layers.Dense(1)
    ]),
    Sequential([
        layers.Dense(N_FEATURES, kernel_regularizer=regularizer, activation=ACTIVATION_1, input_shape=[N_FEATURES]),
        layers.Dropout(DROPOUT),
        layers.BatchNormalization(),
        layers.Dense(N_FEATURES//4, kernel_regularizer=regularizer, activation=ACTIVATION_2),
        layers.Dropout(DROPOUT),
        layers.BatchNormalization(),
        layers.Dense(1)
    ]),
    Sequential([
        layers.Dense(N_FEATURES, kernel_regularizer=regularizer, activation=ACTIVATION_1, input_shape=[N_FEATURES]),
        layers.Dropout(DROPOUT),
        layers.BatchNormalization(),
        layers.Dense(N_FEATURES//2, kernel_regularizer=regularizer, activation=ACTIVATION_2),
        layers.Dropout(DROPOUT),
        layers.BatchNormalization(),
        layers.Dense(1)
    ]),
    Sequential([
        layers.Dense(N_FEATURES, kernel_regularizer=regularizer, activation=ACTIVATION_1, input_shape=[N_FEATURES]),
        layers.Dropout(DROPOUT),
        layers.BatchNormalization(),
        layers.Dense(N_FEATURES, kernel_regularizer=regularizer, activation=ACTIVATION_2),
        layers.Dropout(DROPOUT),
        layers.BatchNormalization(),
        layers.Dense(1)
    ]),
    # including xgb and lgb just in case but these tend to get rejected/pruned
    xgb.XGBRegressor(**gb_params, early_stopping_rounds=5, eval_metric='mae', tree_method='hist', gamma=0.2),
    lgb.LGBMRegressor(**gb_params, early_stopping_round=5, metric='l1', num_leaves=8, min_child_samples=2000, min_split_gain=0.001, verbosity=-1),
]

ensemble = build_ensemble(models)

Model 1/6: Fold 1/5: 0_Sequential_0 - Accepted with score: 5.470284938812256
	<Ensemble (1 model(s); mean: 5.470285; best: 5.470285; limit: None)>
Model 1/6: Fold 2/5: 0_Sequential_1 - Accepted with score: 5.480103492736816
	<Ensemble (2 model(s); mean: 5.475194; best: 5.470285; limit: None)>
Model 1/6: Fold 3/5: 0_Sequential_2 - Accepted with score: 5.4581475257873535
	<Ensemble (3 model(s); mean: 5.469512; best: 5.458148; limit: None)>
Model 1/6: Fold 4/5: 0_Sequential_3 - Accepted with score: 5.460043907165527
	<Ensemble (4 model(s); mean: 5.467145; best: 5.458148; limit: None)>
Model 1/6: Fold 5/5: 0_Sequential_4 - Accepted with score: 5.4551615715026855
	<Ensemble (5 model(s); mean: 5.464748; best: 5.455162; limit: None)>
Model 2/6: Fold 1/5: 1_Sequential_0 - Accepted with score: 5.490260601043701
	<Ensemble (6 model(s); mean: 5.469000; best: 5.455162; limit: None)>
Model 2/6: Fold 2/5: 1_Sequential_1 - Accepted with score: 5.490682125091553
	<Ensemble (7 model(s); mean: 5.472098;

In [10]:
pruned = ensemble.prune()
top = ensemble.prune(len(models))
print(f'Ensemble: {ensemble.mean_score:.6f}, {len(ensemble)} models (<all>)')
print(f'Pruned  : {pruned.mean_score:.6f}, {len(pruned)} models ({", ".join([m for m in pruned.models])})')
print(f'Top N   : {top.mean_score:.6f},  {len(top)} models ({", ".join([m for m in top.models])})')

Ensemble: 5.557648, 30 models (<all>)
Pruned  : 5.473001, 21 models (0_Sequential_0, 0_Sequential_1, 0_Sequential_2, 0_Sequential_3, 0_Sequential_4, 1_Sequential_0, 1_Sequential_1, 1_Sequential_2, 1_Sequential_3, 1_Sequential_4, 2_Sequential_0, 2_Sequential_1, 2_Sequential_2, 2_Sequential_3, 2_Sequential_4, 3_Sequential_0, 3_Sequential_1, 3_Sequential_2, 3_Sequential_3, 3_Sequential_4, 5_LGBMRegressor_4)
Top N   : 5.454747,  6 models (0_Sequential_2, 0_Sequential_4, 1_Sequential_3, 2_Sequential_2, 2_Sequential_3, 3_Sequential_3)


In [11]:
# raise Exception # stop for manual eval
model = pruned

In [12]:
import optiver2023
env = optiver2023.make_env()
iter_test = env.iter_test()

for (test, _, _) in iter_test:
    X_test = preprocess(test)
    y_pred = model.predict(X_test, verbose=0)
    submission = test[['row_id']].set_index('row_id') # needed to match rows
    submission['target'] = y_pred
    submission = submission.reset_index() # convert back for final CSV write
    env.predict(submission)

try:
    res = pd.read_csv('/kaggle/working/submission.csv') # sanity check
except FileNotFoundError:
    res = pd.read_csv('./.data/submission.csv')
res

Unnamed: 0,row_id,target
0,478_0_0,-0.272907
1,478_0_1,0.528827
2,478_0_2,0.823432
3,478_0_3,-0.351504
4,478_0_4,-0.393337
...,...,...
32995,480_540_195,-0.144231
32996,480_540_196,-0.962665
32997,480_540_197,-0.018800
32998,480_540_198,0.140045
