In [1]:
import gc
import os
import time
import warnings
from itertools import combinations
from warnings import simplefilter

import joblib
import lightgbm as lgb
import numpy as np
import pandas as pd
from sklearn.metrics import mean_absolute_error
from sklearn.model_selection import KFold, TimeSeriesSplit
import polars as pl
from sklearn.preprocessing import MinMaxScaler
from scipy.cluster.hierarchy import dendrogram, linkage, fcluster
warnings.filterwarnings("ignore")
simplefilter(action="ignore", category=pd.errors.PerformanceWarning)

is_offline = True
is_train = True
is_infer = True
max_lookback = np.nan
split_day = 435

ModuleNotFoundError: No module named 'polars'

In [66]:
df = pd.read_csv("/kaggle/input/optiver-trading-at-the-close/optiver-trading-at-the-close/train.csv")
df = df.dropna(subset=["target"]) 
df.reset_index(drop=True, inplace=True)
df.shape

(5237892, 17)

In [68]:
def reduce_mem_usage(df, verbose=0):
    """
    Iterate through all numeric columns of a dataframe and modify the data type
    to reduce memory usage.
    """

    start_mem = df.memory_usage().sum() / 1024**2

    for col in df.columns:
        col_type = df[col].dtype

        if col_type != object:
            c_min = df[col].min()
            c_max = df[col].max()
            if str(col_type)[:3] == "int":
                if c_min > np.iinfo(np.int8).min and c_max < np.iinfo(np.int8).max:
                    df[col] = df[col].astype(np.int8)
                elif c_min > np.iinfo(np.int16).min and c_max < np.iinfo(np.int16).max:
                    df[col] = df[col].astype(np.int16)
                elif c_min > np.iinfo(np.int32).min and c_max < np.iinfo(np.int32).max:
                    df[col] = df[col].astype(np.int32)
                elif c_min > np.iinfo(np.int64).min and c_max < np.iinfo(np.int64).max:
                    df[col] = df[col].astype(np.int64)
            else:
                if c_min > np.finfo(np.float16).min and c_max < np.finfo(np.float16).max:
                    df[col] = df[col].astype(np.float32)
                elif c_min > np.finfo(np.float32).min and c_max < np.finfo(np.float32).max:
                    df[col] = df[col].astype(np.float32)
                else:
                    df[col] = df[col].astype(np.float32)

    if verbose:
        logger.info(f"Memory usage of dataframe is {start_mem:.2f} MB")
        end_mem = df.memory_usage().sum() / 1024**2
        logger.info(f"Memory usage after optimization is: {end_mem:.2f} MB")
        decrease = 100 * (start_mem - end_mem) / start_mem
        logger.info(f"Decreased by {decrease:.2f}%")

    return df


In [69]:
# Import Numba for just-in-time (JIT) compilation and parallel processing
from numba import njit, prange

# Function to compute triplet imbalance in parallel using Numba
@njit(parallel=True)
def compute_triplet_imbalance(df_values, comb_indices):
    num_rows = df_values.shape[0]
    num_combinations = len(comb_indices)
    imbalance_features = np.empty((num_rows, num_combinations))

    # Loop through all combinations of triplets
    for i in prange(num_combinations):
        a, b, c = comb_indices[i]
        
        # Loop through rows of the DataFrame
        for j in range(num_rows):
            max_val = max(df_values[j, a], df_values[j, b], df_values[j, c])
            min_val = min(df_values[j, a], df_values[j, b], df_values[j, c])
            mid_val = df_values[j, a] + df_values[j, b] + df_values[j, c] - min_val - max_val
            
            # Prevent division by zero
            if mid_val == min_val:
                imbalance_features[j, i] = np.nan
            else:
                imbalance_features[j, i] = (max_val - mid_val) / (mid_val - min_val)

    return imbalance_features

# Function to calculate triplet imbalance for given price data and a DataFrame
def calculate_triplet_imbalance_numba(price, df):
    # Convert DataFrame to numpy array for Numba compatibility
    df_values = df[price].values
    comb_indices = [(price.index(a), price.index(b), price.index(c)) for a, b, c in combinations(price, 3)]

    # Calculate the triplet imbalance using the Numba-optimized function
    features_array = compute_triplet_imbalance(df_values, comb_indices)

    # Create a DataFrame from the results
    columns = [f"{a}_{b}_{c}_imb2" for a, b, c in combinations(price, 3)]
    features = pd.DataFrame(features_array, columns=columns)

    return features



In [70]:
# generate imbalance features
def imbalance_features(df):
    prices = ["reference_price", "far_price", "near_price", "ask_price", "bid_price", "wap"]
    sizes = ["matched_size", "bid_size", "ask_size", "imbalance_size"]

    # V1
    df["volume"] = df.eval("ask_size + bid_size")
    df["mid_price"] = df.eval("(ask_price + bid_price) / 2")
    df["liquidity_imbalance"] = df.eval("(bid_size-ask_size)/(bid_size+ask_size)")
    df["matched_imbalance"] = df.eval("(imbalance_size-matched_size)/(matched_size+imbalance_size)")
    df["size_imbalance"] = df.eval("bid_size / ask_size")
    # Create features for pairwise price imbalances
    for c in combinations(prices, 2):
        df[f"{c[0]}_{c[1]}_imb"] = df.eval(f"({c[0]} - {c[1]})/({c[0]} + {c[1]})")
    # Calculate triplet imbalance features using the Numba-optimized function
    for c in [['ask_price', 'bid_price', 'wap', 'reference_price'], sizes]:
        triplet_feature = calculate_triplet_imbalance_numba(c, df)
        df[triplet_feature.columns] = triplet_feature.values
        
        
    # V2
    df["stock_weights"] = df["stock_id"].map(weights)
    df["weighted_wap"] = df["stock_weights"] * df["wap"]
    df['wap_momentum'] = df.groupby('stock_id')['weighted_wap'].pct_change(periods=6)
    df["imbalance_momentum"] = df.groupby(['stock_id'])['imbalance_size'].diff(periods=1) / df['matched_size']
    df["price_spread"] = df["ask_price"] - df["bid_price"]
    df["spread_intensity"] = df.groupby(['stock_id'])['price_spread'].diff()
    df['price_pressure'] = df['imbalance_size'] * (df['ask_price'] - df['bid_price'])
    df['market_urgency'] = df['price_spread'] * df['liquidity_imbalance']
    df['depth_pressure'] = (df['ask_size'] - df['bid_size']) * (df['far_price'] - df['near_price'])
    df['spread_depth_ratio'] = (df['ask_price'] - df['bid_price']) / (df['bid_size'] + df['ask_size'])
    df['mid_price_movement'] = df['mid_price'].diff(periods=5).apply(lambda x: 1 if x > 0 else (-1 if x < 0 else 0))
    df['micro_price'] = ((df['bid_price'] * df['ask_size']) + (df['ask_price'] * df['bid_size'])) / (df['bid_size'] + df['ask_size'])
    df['relative_spread'] = (df['ask_price'] - df['bid_price']) / df['wap']
    
    #价量横截面统计特征（均值，标准差，偏度，峰度）
    for func in ["mean", "std", "skew", "kurt"]:
        df[f"all_prices_{func}"] = df[prices].agg(func, axis=1)
        df[f"all_sizes_{func}"] = df[sizes].agg(func, axis=1)
    
    
    # V3
    # Calculate shifted and return features for specific columns
    for col in ['matched_size', 'imbalance_size', 'reference_price', 'imbalance_buy_sell_flag']:
        for window in [1, 2, 3, 5, 10]:
            df[f"{col}_shift_{window}"] = df.groupby('stock_id')[col].shift(window)
            df[f"{col}_ret_{window}"] = df.groupby('stock_id')[col].pct_change(window)
                    
            
    # Calculate diff features for specific columns      
    for col in ['ask_price', 'bid_price', 'ask_size', 'bid_size',
                'wap', 'near_price', 'far_price']:#'weighted_wap','price_spread'
        for window in [1, 2, 3, 5, 10]:
            df[f"{col}_diff_{window}"] = df.groupby("stock_id")[col].diff(window)
    
    #V4
    for window in [3, 5, 10]:
        df[f'price_change_diff_{window}'] = df[f'bid_price_diff_{window}'] - df[f'ask_price_diff_{window}']
        df[f'size_change_diff_{window}'] = df[f'bid_size_diff_{window}'] - df[f'ask_size_diff_{window}']
    
    #V5
    pl_df = pl.from_pandas(df)

    windows = [3, 5, 10]
    columns = ['ask_price', 'bid_price', 'ask_size', 'bid_size']

    group = ["stock_id"]
    expressions = []
    
    for window in windows:
        for col in columns:
            rolling_mean_expr = (
                pl.col(f"{col}_diff_{window}")
                .rolling_mean(window)
                .over(group)
                .alias(f'rolling_diff_{col}_{window}')
            )

            rolling_std_expr = (
                pl.col(f"{col}_diff_{window}")
                .rolling_std(window)
                .over(group)
                .alias(f'rolling_std_diff_{col}_{window}')
            )

            expressions.append(rolling_mean_expr)
            expressions.append(rolling_std_expr)

    lazy_df = pl_df.lazy().with_columns(expressions)

    pl_df = lazy_df.collect()

    df = pl_df.to_pandas()
    gc.collect()
    
    df['mid_price*volume'] = df['mid_price_movement'] * df['volume']
    df['harmonic_imbalance'] = df.eval('2 / ((1 / bid_size) + (1 / ask_size))')
    
    for col in df.columns:
        df[col] = df[col].replace([np.inf, -np.inf], 0)

    return df

# generate time & stock features
def other_features(df):
    df["dow"] = df["date_id"] % 5
    df["dom"] = df["date_id"] % 20
    df["seconds"] = df["seconds_in_bucket"] % 60
    df["minute"] = df["seconds_in_bucket"] // 60

    for key, value in global_stock_id_feats.items():
        df[f"global_{key}"] = df["stock_id"].map(value.to_dict())

    return df


In [71]:
weights = [
    0.004, 0.001, 0.002, 0.006, 0.004, 0.004, 0.002, 0.006, 0.006, 0.002, 0.002, 0.008,
    0.006, 0.002, 0.008, 0.006, 0.002, 0.006, 0.004, 0.002, 0.004, 0.001, 0.006, 0.004,
    0.002, 0.002, 0.004, 0.002, 0.004, 0.004, 0.001, 0.001, 0.002, 0.002, 0.006, 0.004,
    0.004, 0.004, 0.006, 0.002, 0.002, 0.04 , 0.002, 0.002, 0.004, 0.04 , 0.002, 0.001,
    0.006, 0.004, 0.004, 0.006, 0.001, 0.004, 0.004, 0.002, 0.006, 0.004, 0.006, 0.004,
    0.006, 0.004, 0.002, 0.001, 0.002, 0.004, 0.002, 0.008, 0.004, 0.004, 0.002, 0.004,
    0.006, 0.002, 0.004, 0.004, 0.002, 0.004, 0.004, 0.004, 0.001, 0.002, 0.002, 0.008,
    0.02 , 0.004, 0.006, 0.002, 0.02 , 0.002, 0.002, 0.006, 0.004, 0.002, 0.001, 0.02,
    0.006, 0.001, 0.002, 0.004, 0.001, 0.002, 0.006, 0.006, 0.004, 0.006, 0.001, 0.002,
    0.004, 0.006, 0.006, 0.001, 0.04 , 0.006, 0.002, 0.004, 0.002, 0.002, 0.006, 0.002,
    0.002, 0.004, 0.006, 0.006, 0.002, 0.002, 0.008, 0.006, 0.004, 0.002, 0.006, 0.002,
    0.004, 0.006, 0.002, 0.004, 0.001, 0.004, 0.002, 0.004, 0.008, 0.006, 0.008, 0.002,
    0.004, 0.002, 0.001, 0.004, 0.004, 0.004, 0.006, 0.008, 0.004, 0.001, 0.001, 0.002,
    0.006, 0.004, 0.001, 0.002, 0.006, 0.004, 0.006, 0.008, 0.002, 0.002, 0.004, 0.002,
    0.04 , 0.002, 0.002, 0.004, 0.002, 0.002, 0.006, 0.02 , 0.004, 0.002, 0.006, 0.02,
    0.001, 0.002, 0.006, 0.004, 0.006, 0.004, 0.004, 0.004, 0.004, 0.002, 0.004, 0.04,
    0.002, 0.008, 0.002, 0.004, 0.001, 0.004, 0.006, 0.004,
]

weights = {int(k):v for k,v in enumerate(weights)}

In [72]:
#Correlation Matrix of WAP
def calculate_daily_returns(stock_data):
    stock_data['return'] = stock_data['wap'].pct_change()
    return stock_data[['return', 'seconds_in_bucket']].dropna()  # Keep 'return' and 'seconds_in_bucket' columns

returns = df.groupby(['stock_id', 'date_id']).apply(calculate_daily_returns).reset_index()

# 2. Align the data for each stock by filling in the gaps (if any) and then concatenate the returns to form a matrix
# For this step, we will pivot the data so each stock has its own column, and each row represents a timestamp.
pivot_returns = returns.pivot_table(index=['date_id', 'seconds_in_bucket'], 
                                    columns='stock_id', 
                                    values='return')

# handle missing values by filling the average of all available 
pivot_returns = pivot_returns.apply(lambda row: row.fillna(row.mean()), axis=1)

# 3. Compute the correlation matrix for all stocks
correlation_matrix = pivot_returns.corr()
#print(correlation_matrix)

In [73]:
# Hierarchical clustering
Z = linkage(correlation_matrix, 'ward')
clusters = fcluster(Z, 10, criterion = 'maxclust')

#Assign clusters to stocks
stock_clusters = pd.DataFrame({'stock_id': correlation_matrix.index, 'cluster': clusters})
print(stock_clusters)

     stock_id  cluster
0           0        4
1           1        7
2           2        7
3           3        4
4           4        4
..        ...      ...
195       195        5
196       196        9
197       197        3
198       198        5
199       199        9

[200 rows x 2 columns]


In [74]:
#Normalized stock cluster feature

def cluster_feature(df):
    #normalize
    scaler = MinMaxScaler(feature_range=(-1, 1))
    normalized_clusters = scaler.fit_transform(clusters.reshape(-1, 1))

    # Assign clusters to stocks
    
    # Assign clusters to stocks
    # 1. Extract cluster labels from hierarchical clustering
    df_clusters = pd.DataFrame({'stock_id': pivot_returns.columns, 'cluster_label': normalized_clusters.flatten()})

    # 2. Map cluster labels to each stock ID
    stock_id_to_cluster = dict(zip(df_clusters['stock_id'], df_clusters['cluster_label']))

    # 3. Add cluster labels to your original DataFrame
    df['cluster'] = df['stock_id'].map(stock_id_to_cluster)
    return df

In [75]:
from sklearn.decomposition import PCA

pca = PCA()
principal_components = pca.fit_transform(correlation_matrix)

In [76]:
#PCA-weighed average price features
def pca_wap_feature(df):
    
    #Focus on the first 4 components and save as DataFrame
    
    #Create a pivot table for wap
    price_pivot = df.pivot_table(index=['date_id', 'seconds_in_bucket'], columns='stock_id', values='wap')
    
    #Generate principal DataFrame
    principal_df = pd.DataFrame(data=principal_components, 
                                index=correlation_matrix.index,  # use stock_ids as the index
                                columns=['PC'+str(i) for i in range(1, principal_components.shape[1] + 1)])
    
    #Ensure the ordering of stock_id in price_pivot and principal_pca is consistent
    ordered_columns = price_pivot.columns
    principal_df = principal_df.loc[ordered_columns].reset_index()
    
    #Handle NaN values and replace with 0
    price_pivot.fillna(0, inplace=True)
    principal_df.fillna(0, inplace=True)
    
    #Initialize a dataframe to hold the PCA_WAP values
    pca_wap_df = pd.DataFrame(index=price_pivot.index)
    
    #Compute 4 WAPs using PCA
    for i in range(1,5):
        pca_wap_df[f'PCA_WAP_{i}'] = (price_pivot.values * principal_df.set_index('stock_id')[f'PC{i}'].values).sum(axis=1)
    
    #Resetting index for merging purposes
    pca_wap_df = pca_wap_df.reset_index()
    
    #Merging the PCA_WAP columns with the initial dataset df
    df = df.merge(pca_wap_df, on=['date_id', 'seconds_in_bucket'], how='left')
    
    return df

In [77]:
# generate all features
def generate_all_features(df):
    cols = [c for c in df.columns if c not in ["row_id", "time_id", "target"]]
    df = df[cols]
    df = imbalance_features(df)
    df = other_features(df)
    df = cluster_feature(df)
    df = pca_wap_feature(df)
    
    gc.collect()
    
    feature_name = [i for i in df.columns if i not in ["row_id", "target", "time_id", "date_id"]]
    
    return df[feature_name]

In [78]:
# Check if the code is running in offline or online mode
if is_offline:
    # In offline mode, split the data into training and validation sets based on the split_day
    df_train = df[df["date_id"] <= split_day]
    df_valid = df[df["date_id"] > split_day]
    print("Offline mode")
    print(f"train : {df_train.shape}, valid : {df_valid.shape}")
else:
    # In online mode, use the entire dataset for training
    df_train = df
    print("Online mode")
    
del df
gc.collect()

Offline mode
train : (4742893, 17), valid : (494999, 17)


0

In [None]:
%%time
if is_train:
    global_stock_id_feats = {
        "median_size": df_train.groupby("stock_id")["bid_size"].median() + df_train.groupby("stock_id")["ask_size"].median(),
        "std_size": df_train.groupby("stock_id")["bid_size"].std() + df_train.groupby("stock_id")["ask_size"].std(),
        "ptp_size": df_train.groupby("stock_id")["bid_size"].max() - df_train.groupby("stock_id")["bid_size"].min(),
        "median_price": df_train.groupby("stock_id")["bid_price"].median() + df_train.groupby("stock_id")["ask_price"].median(),
        "std_price": df_train.groupby("stock_id")["bid_price"].std() + df_train.groupby("stock_id")["ask_price"].std(),
        "ptp_price": df_train.groupby("stock_id")["bid_price"].max() - df_train.groupby("stock_id")["ask_price"].min(),
    }
    if is_offline:
        df_train_feats = generate_all_features(df_train)
        print("Build Train Feats Finished.")
        df_valid_feats = generate_all_features(df_valid)
        print("Build Valid Feats Finished.")
        df_valid_feats = reduce_mem_usage(df_valid_feats)
    else:
        df_train_feats = generate_all_features(df_train)
        print("Build Online Train Feats Finished.")

    df_train_feats = reduce_mem_usage(df_train_feats)
    
feature_name = list(df_train_feats.columns)
print(f"Feature length = {len(feature_name)}")

In [None]:
%%time
# Train procedure
if is_train:
    offline_split = df_train['date_id']>(split_day - 45)
    df_offline_train = df_train_feats[~offline_split]
    df_offline_valid = df_train_feats[offline_split]
    df_offline_train_target = df_train['target'][~offline_split]
    df_offline_valid_target = df_train['target'][offline_split]
    df_train_target = df_train["target"]
    del df_train
    gc.collect()
    
    ctb_params = dict(iterations=1200,
                      learning_rate=1.0,
                      depth=8,
                      l2_leaf_reg=30,
                      bootstrap_type='Bernoulli',
                      subsample=0.66,
                      loss_function='MAE',
                      eval_metric = 'MAE',
                      metric_period=100,
                      od_type='Iter',
                      od_wait=30,
                      task_type='GPU',
                      allow_writing_files=False,
                      )
    
    print("Feature Elimination Performing.")
    ctb_model = CatBoostRegressor(**ctb_params)
    summary = ctb_model.select_features(
        df_offline_train[feature_name], df_offline_train_target,
        eval_set=[(df_offline_valid[feature_name], df_offline_valid_target)],
        features_for_select=feature_name,
        num_features_to_select=len(feature_name)-24,    # Dropping from 124 to 100
        steps=3,
        algorithm=EFeaturesSelectionAlgorithm.RecursiveByShapValues,
        shap_calc_type=EShapCalcType.Regular,
        train_final_model=False,
        plot=True,
    )
    
    print("Valid Model Training on Selected Features Subset.")
    ctb_model = CatBoostRegressor(**ctb_params)
    ctb_model.fit(
        df_offline_train[summary['selected_features_names']], df_offline_train_target,
        eval_set=[(df_offline_valid[summary['selected_features_names']], df_offline_valid_target)],
        use_best_model=True,
    )
    
    del df_offline_train, df_offline_valid, df_offline_train_target, df_offline_valid_target
    gc.collect()
    
    print("Infer Model Training on Selected Features Subset.")
    infer_params = ctb_params.copy()
    # CatBoost train best with Valid number of iterations
    infer_params["iterations"] = ctb_model.best_iteration_
    infer_ctb_model = CatBoostRegressor(**infer_params)
    infer_ctb_model.fit(df_train_feats[summary['selected_features_names']], df_train_target)
    print("Infer Model Training on Selected Features Subset Complete.")
    
    if is_offline:   
        # Offline predictions
        df_valid_target = df_valid["target"]
        offline_predictions = infer_ctb_model.predict(df_valid_feats[summary['selected_features_names']])
        offline_score = mean_absolute_error(offline_predictions, df_valid_target)
        print(f"Offline Score {np.round(offline_score, 4)}")
        del df_valid, df_valid_feats
        gc.collect()
    
    del df_train_feats
    gc.collect()


In [None]:
summary['eliminated_features_names']

In [None]:
feat_importances = infer_ctb_model.get_feature_importance(prettified=True)

plt.figure(figsize=(12, 20))
sns.barplot(x="Importances", y="Feature Id", data=feat_importances)
plt.title('CatBoost features importance:')
plt.tight_layout()

In [None]:
feat_importances = infer_ctb_model.get_feature_importance(prettified=True)

plt.figure(figsize=(12, 20))
sns.barplot(x="Importances", y="Feature Id", data=feat_importances)
plt.title('CatBoost features importance:')
plt.tight_layout()

In [None]:
from catboost import EFstrType
feat_interactions = infer_ctb_model.get_feature_importance(type=EFstrType.Interaction, prettified=True)
top_interactions = feat_interactions[:10]
top_interactions

In [None]:
top_interactions['First Feature Index'] = top_interactions['First Feature Index'].apply(lambda x: summary['selected_features_names'][x])
top_interactions['Second Feature Index'] = top_interactions['Second Feature Index'].apply(lambda x: summary['selected_features_names'][x])
top_interactions.columns = ['First Feature', 'Second Feature', 'Interaction']
top_interactions