In [1]:
import os
import glob
from joblib import Parallel, delayed
import pandas as pd
import numpy as np
import scipy as sc
from sklearn.model_selection import KFold
import lightgbm as lgb
import warnings
warnings.filterwarnings('ignore')
from tqdm import tqdm
import joblib
from catboost import CatBoostRegressor
import catboost
from scipy.stats import kurtosis
from scipy.stats import skew





# LightGBM和CatBoost

In [2]:
# 数据集目录
data_dir = '../input/optiver-realized-volatility-prediction/'

# 对数回报率
def log_return(series):
    return np.log(series).diff()

# 基于最有竞争力价格和第二竞争力价格的加权平均价格
def calc_wap1(df):
    wap = (df['bid_price1'] * df['ask_size1'] + df['ask_price1'] * df['bid_size1']) / (df['bid_size1'] + df['ask_size1'])
    return wap
def calc_wap2(df):
    wap = (df['bid_price2'] * df['ask_size2'] + df['ask_price2'] * df['bid_size2']) / (df['bid_size2'] + df['ask_size2'])
    return wap

# 已实现波动率
def realized_volatility(series):
    return np.sqrt(np.sum(series**2))

# 返回数据基数
def count_unique(series):
    return len(np.unique(series))

# 订单深度
def calc_depth(df):
    depth = df['bid_price1'] * df['bid_size1'] + df['ask_price1'] * df['ask_size1'] + df['bid_price2'] * df[
               'bid_size2'] + df['ask_price2'] * df['ask_size2']
    return depth

# 订单斜率
def calc_slope(df):
    v0 = (df['bid_size1']+df['ask_size1'])/2
    p0 = (df['bid_price1']+df['ask_price1'])/2
    slope_bid = ((df['bid_size1']/v0)-1)/abs((df['bid_price1']/p0)-1)+(
                (df['bid_size2']/df['bid_size1'])-1)/abs((df['bid_price2']/df['bid_price1'])-1)
    slope_ask = ((df['ask_size1']/v0)-1)/abs((df['ask_price1']/p0)-1)+(
                (df['ask_size2']/df['ask_size1'])-1)/abs((df['ask_price2']/df['ask_price1'])-1)
    return (slope_bid+slope_ask)/2, abs(slope_bid-slope_ask)

# 读取数据
def read():
    train = pd.read_csv('../input/optiver-realized-volatility-prediction/train.csv')
    test = pd.read_csv('../input/optiver-realized-volatility-prediction/test.csv')
    # 创建合并索引
    train['row_id'] = train['stock_id'].astype(str) + '-' + train['time_id'].astype(str)
    test['row_id'] = test['stock_id'].astype(str) + '-' + test['time_id'].astype(str)
    print(f'训练集行数{train.shape[0]}')
    print(f'测试集行数{test.shape[0]}')
    return train, test

# 处理订单本数据

def book_preprocessor(file_path):
    df = pd.read_parquet(file_path)
    # 特征衍生
    df['wap1'] = calc_wap1(df)
    df['wap2'] = calc_wap2(df)
    df['log_return1'] = df.groupby(['time_id'], group_keys=False)['wap1'].apply(log_return)
    df['log_return2'] = df.groupby(['time_id'], group_keys=False)['wap2'].apply(log_return)
    df['wap_balance'] = abs(df['wap1'] - df['wap2'])
    df['price_spread_1'] = (df['ask_price1'] - df['bid_price1'])/df['ask_price1']
    df['price_spread_2'] = (df['ask_price2'] - df['bid_price2'])/df['ask_price2']
    df['log_ask1'] = np.log(df['ask_size1'])
    df['log_ask2'] = np.log(df['ask_size2']) 
    df['log_bid1'] = np.log(df['bid_size1']) 
    df['log_bid2'] = np.log(df['bid_size2']) 
    df['log_ask_total'] = np.log(df['ask_size1']+df['ask_size2']) 
    df['log_bid_total'] = np.log(df['bid_size1']+df['bid_size2']) 
    df['log_askplusbid'] =  df['log_ask_total']+df['log_bid_total']
    df['log_askdbid'] =  df['log_ask_total']/df['log_bid_total']
    df['price_spread'] = (df['ask_price1'] - df['bid_price1']) / ((df['ask_price1'] + df['bid_price1']) / 2)
    df['price_spread2'] = (df['ask_price2'] - df['bid_price2']) / ((df['ask_price2'] + df['bid_price2']) / 2)
    df['bid_spread'] = df['bid_price1'] - df['bid_price2']
    df['ask_spread'] = df['ask_price1'] - df['ask_price2']
    df['total_volume'] = (df['ask_size1'] + df['ask_size2']) + (df['bid_size1'] + df['bid_size2'])
    df['apply_require'] = (df['ask_size1'] + df['ask_size2']) - (df['bid_size1'] + df['bid_size2'])
    df['volume_imbalance'] = abs((df['ask_size1'] + df['ask_size2']) - (df['bid_size1'] + df['bid_size2']))
    df['bid_vol1'] = np.prod([df['bid_price1'], df['bid_size1']])
    df['bid_vol2'] = np.prod([df['bid_price2'], df['bid_size2']])
    df['ask_vol1'] = np.prod([df['ask_price1'], df['ask_size1']])
    df['ask_vol2'] = np.prod([df['ask_price2'], df['ask_size2']])
    df['price1_abs'] = df['bid_spread'].abs()
    df['price2_abs'] = df['ask_spread'].abs()
    df['price_spread1'] = (df['ask_price1'] - df['bid_price1']) / (df['ask_price1'] + df['bid_price1'])
    df['depth']=calc_depth(df)
    df['slope1'],df['slope2']=calc_slope(df)
    # 窗口聚合字典
    create_feature_dict = {
        'wap1': [np.mean, np.std],
        'wap2': [np.mean, np.std],
        'log_return1': [realized_volatility, np.std, kurtosis, skew,np.max,np.min],
        'log_return2': [realized_volatility, np.std, kurtosis, skew,np.max,np.min],
        'log_ask1': [np.mean, np.std,np.max,np.min],
        'log_ask2': [np.mean, np.std],
        'log_bid1': [np.mean, np.std,np.max,np.min],
        'log_bid2': [np.mean, np.std],
        'log_ask_total': [np.mean, np.std,np.max,np.min],
        'log_bid_total': [np.mean, np.std,np.max,np.min],
        'log_askplusbid': [np.mean, np.std,np.max,np.min],
        'log_askdbid': [np.mean, np.std,np.max,np.min],
        'wap_balance': [np.mean, np.std,np.max,np.min],
        'price_spread_1':[np.mean, np.std,np.max,np.min],
        'price_spread_2':[np.mean, np.std,np.max,np.min],
        'price_spread':[ np.mean, np.std,np.max,np.min],
        'price_spread1':[ np.mean, np.std,np.max,np.min],
        'price_spread2':[ np.mean, np.std,np.max,np.min],
        'bid_spread':[ np.mean, np.std,np.max,np.min],
        'ask_spread':[np.mean, np.std,np.max,np.min],
        'total_volume':[np.mean, np.std,np.max,np.min],
        'volume_imbalance':[np.mean, np.std,np.max,np.min],
        'bid_vol1':[np.mean, np.std,np.max,np.min],
        'bid_vol2':[np.mean, np.std],
        'ask_vol1':[np.mean, np.std,np.max,np.min],
        'ask_vol2':[np.mean, np.std],
        'depth':[np.mean, np.std,np.max,np.min],
        'slope1':[np.mean, np.std,np.max,np.min],
        'slope2':[np.mean, np.std,np.max,np.min],
        'apply_require':[ np.mean, np.std,np.max,np.min]
    }
    
    #分组窗口聚合
    def get_stats_window(seconds_in_bucket, add_suffix = False):

        df_feature = df[df['seconds_in_bucket'] >= seconds_in_bucket].groupby(['time_id']).agg(create_feature_dict).reset_index()
        df_feature.columns = ['_'.join(col) for col in df_feature.columns]
        if add_suffix:
            df_feature = df_feature.add_suffix('_' + str(seconds_in_bucket))
        return df_feature
    
    
    
    df_feature = get_stats_window(seconds_in_bucket = 0, add_suffix = False)
    df_feature_300 = get_stats_window(seconds_in_bucket = 300, add_suffix = True)
    df_feature_450 = get_stats_window(seconds_in_bucket = 450, add_suffix = True)
    df_feature_550 = get_stats_window(seconds_in_bucket = 550, add_suffix = True)
    df_feature = df_feature.merge(df_feature_300, how = 'left', left_on = 'time_id_', right_on = 'time_id__300')
    df_feature = df_feature.merge(df_feature_450, how = 'left', left_on = 'time_id_', right_on = 'time_id__450')
    df_feature = df_feature.merge(df_feature_550, how = 'left', left_on = 'time_id_', right_on = 'time_id__550')
    df_feature.drop(['time_id__300', 'time_id__450', 'time_id__550'], axis = 1, inplace = True)

    
    # 创建合并索引
    stock_id = file_path.split('=')[1]
    df_feature['row_id'] = df_feature['time_id_'].apply(lambda x: f'{stock_id}-{x}')
    df_feature.drop(['time_id_'], axis = 1, inplace = True)
    return df_feature

# 处理交易数据
def trade_preprocessor(file_path):
    df = pd.read_parquet(file_path)
    df['log_return'] = df.groupby('time_id', group_keys=False)['price'].apply(log_return)
    df['amount']=df['price']*df['size']
    df['vol'] = np.prod([df['price'], df['size']])
# 窗口聚合字典
    create_feature_dict = {
        'log_return':[realized_volatility, np.mean, np.std, kurtosis, skew],
        'seconds_in_bucket':[count_unique],
        'size':[np.sum, np.mean, np.std, np.max, np.min, kurtosis, skew],
        'order_count':[np.sum, np.mean, np.std, np.max, np.min, kurtosis, skew],
        'amount':[np.sum, np.mean, np.std, np.max, np.min, kurtosis, skew]
    }
    
     #分组窗口聚合
    def get_stats_window(seconds_in_bucket, add_suffix = False):
        # 以时间窗口分组统计
        df_feature = df[df['seconds_in_bucket'] >= seconds_in_bucket].groupby(['time_id']).agg(create_feature_dict).reset_index()
        # 重命名
        df_feature.columns = ['_'.join(col) for col in df_feature.columns]
        if add_suffix:
            df_feature = df_feature.add_suffix('_' + str(seconds_in_bucket))
        return df_feature
    
    
    df_feature = get_stats_window(seconds_in_bucket = 0, add_suffix = False)
    df_feature_300 = get_stats_window(seconds_in_bucket = 300, add_suffix = True)
    df_feature_450 = get_stats_window(seconds_in_bucket = 450, add_suffix = True)
    df_feature_550 = get_stats_window(seconds_in_bucket = 550, add_suffix = True)
    df_feature = df_feature.merge(df_feature_300, how = 'left', left_on = 'time_id_', right_on = 'time_id__300')
    df_feature = df_feature.merge(df_feature_450, how = 'left', left_on = 'time_id_', right_on = 'time_id__450')
    df_feature = df_feature.merge(df_feature_550, how = 'left', left_on = 'time_id_', right_on = 'time_id__550')
    df_feature.drop(['time_id__300', 'time_id__450', 'time_id__550'], axis = 1, inplace = True)

    
    df_feature = df_feature.add_prefix('trade_')
    stock_id = file_path.split('=')[1]
    df_feature['row_id'] = df_feature['trade_time_id_'].apply(lambda x:f'{stock_id}-{x}')
    df_feature.drop(['trade_time_id_'], axis = 1, inplace = True)
    return df_feature


def get_time_stock(df):
    #需要计算统计特征的列名
    vol_cols = ['log_return1_realized_volatility', 'log_return2_realized_volatility', 'log_return1_realized_volatility_300', 'log_return2_realized_volatility_300', 
                'log_return1_realized_volatility_450', 'log_return2_realized_volatility_450', 'log_return1_realized_volatility_550', 'log_return2_realized_volatility_550',
                'trade_log_return_realized_volatility', 'trade_log_return_realized_volatility_300', 'trade_log_return_realized_volatility_450', 'trade_log_return_realized_volatility_550'
               ,'trade_seconds_in_bucket_count_unique','trade_seconds_in_bucket_count_unique_450','trade_size_sum','trade_size_sum_300','trade_size_sum_450']

    #stock_id聚合
    df_stock_id = df.groupby(['stock_id'])[vol_cols].agg(['mean', 'std', 'max', 'min',lambda x: np.percentile(x, q=25), lambda x: np.percentile(x, q=75) ]).reset_index()
    df_stock_id.columns = ['_'.join(col) for col in df_stock_id.columns]
    df_stock_id = df_stock_id.add_suffix('_' + 'stock')
    #time_id聚合 
    df_time_id = df.groupby(['time_id'])[vol_cols].agg(['mean', 'std', 'max', 'min',lambda x: np.percentile(x, q=25), lambda x: np.percentile(x, q=75) ]).reset_index()
    df_time_id.columns = ['_'.join(col) for col in df_time_id.columns]
    df_time_id = df_time_id.add_suffix('_' + 'time')
    

    df = df.merge(df_stock_id, how = 'left', left_on = ['stock_id'], right_on = ['stock_id__stock'])
    df = df.merge(df_time_id, how = 'left', left_on = ['time_id'], right_on = ['time_id__time'])
    df.drop(['stock_id__stock', 'time_id__time'], axis = 1, inplace = True)
    return df
    
# 处理函数集成
def preprocessor(list_stock_ids, is_train = True):
    
    def for_joblib(stock_id):
        # 训练集
        if is_train:
            file_path_book = data_dir + "book_train.parquet/stock_id=" + str(stock_id)
            file_path_trade = data_dir + "trade_train.parquet/stock_id=" + str(stock_id)
        # 测试集
        else:
            file_path_book = data_dir + "book_test.parquet/stock_id=" + str(stock_id)
            file_path_trade = data_dir + "trade_test.parquet/stock_id=" + str(stock_id)
    
        # 合并订单本数据和交易数据
        df_tmp = pd.merge(book_preprocessor(file_path_book), trade_preprocessor(file_path_trade), on = 'row_id', how = 'left')
        
        return df_tmp
    
      # 并行循环处理
    df = Parallel(n_jobs = -1, verbose = 1)(delayed(for_joblib)(stock_id) for stock_id in tqdm(list_stock_ids))
    df = pd.concat(df, ignore_index = True)
    return df



def rmspe(y_true, y_pred):
    return np.sqrt(np.mean(np.square((y_true - y_pred) / y_true)))

def feval_rmspe(y_pred, lgb_train):
    y_true = lgb_train.get_label()
    return 'RMSPE', rmspe(y_true, y_pred), False

def train_and_evaluate_LGB(train, test):
    # 超参数
    params = {
      'objective': 'rmse',  
      'boosting_type': 'gbdt',
      'num_leaves': 100,
      'n_jobs': -1,
      'learning_rate': 0.1,
      'feature_fraction': 0.8,
      'bagging_fraction': 0.8,
      'verbose': -1
    }
    
    # 数据集分割
    x = train.drop(['row_id', 'target', 'time_id'], axis = 1)
    y = train['target']
    x_test = test.drop(['row_id', 'time_id'], axis = 1)
    x['stock_id'] = x['stock_id'].astype(int)
    x_test['stock_id'] = x_test['stock_id'].astype(int)
    
    # 每一折训练后得到的验证集预测结果的集合
    oof_predictions = np.zeros(x.shape[0])
    # 每一折训练后对测试集的结果进行的预测的集合
    test_predictions = np.zeros(x_test.shape[0])
    # 5折交叉验证
    kfold = KFold(n_splits = 5, random_state = 66, shuffle = True)
    for fold, (trn_ind, val_ind) in enumerate(kfold.split(x)):
#         print(f'Training fold {fold + 1}')
#         x_train, x_val = x.iloc[trn_ind], x.iloc[val_ind]
#         y_train, y_val = y.iloc[trn_ind], y.iloc[val_ind]

#         train_weights = 1 / np.square(y_train)
#         val_weights = 1 / np.square(y_val)
#         train_dataset = lgb.Dataset(x_train, y_train, weight = train_weights, categorical_feature = ['stock_id'])
#         val_dataset = lgb.Dataset(x_val, y_val, weight = val_weights, categorical_feature = ['stock_id'])
        
#         model = lgb.train(params = params, 
#                         train_set = train_dataset, 
#                         valid_sets = [train_dataset, val_dataset], 
#                         num_boost_round = 10000, 
#                         early_stopping_rounds = 50, 
#                         verbose_eval = 50,
#                         feval = feval_rmspe)
#         joblib.dump(model, f"LGB_{fold}.pkl")
        
     model = joblib.load(f'../input/optiver-lgb-cat/LGB_{fold}(1).pkl')
#oof_predictions[val_ind] = model.predict(x_val)
  
     test_predictions += np.abs(model.predict(x_test)) / 5
        
#     rmspe_score = rmspe(y, oof_predictions)
#     print(f'每一折的RMSPE为 {rmspe_score}')

    return test_predictions

def train_and_evaluate_CAT(train, test):
    # 超参数
    params = {
      'objective': 'RMSE',  
      'verbose': True,
      'iterations': 3000, 
      'early_stopping_rounds': 30,
      'eval_metric': 'RMSE',
      'subsample': 0.8,
      'random_seed': 2023,
    }

    # 数据集分割
    x = train.drop(['row_id', 'target', 'time_id'], axis=1)
    y = train['target']
    x_test = test.drop(['row_id', 'time_id'], axis=1)
    x['stock_id'] = x['stock_id'].astype(int)
    x_test['stock_id'] = x_test['stock_id'].astype(int)

    # 每一折训练后得到的验证集预测结果的集合
    oof_predictions = np.zeros(x.shape[0])
    # 每一折训练后对测试集的结果进行的预测的集合
    test_predictions = np.zeros(x_test.shape[0])
    # 5折交叉验证
    kfold = KFold(n_splits=5, random_state=11, shuffle=True)

    for fold, (trn_ind, val_ind) in enumerate(kfold.split(x)):
#         print(f'Training fold {fold + 1}')
#         x_train, x_val = x.iloc[trn_ind], x.iloc[val_ind]
#         y_train, y_val = y.iloc[trn_ind], y.iloc[val_ind]

#         train_weights = 1 / np.square(y_train)
#         val_weights = 1 / np.square(y_val)

#         train_pool = catboost.Pool(x_train, y_train, weight=train_weights, cat_features=['stock_id'])
#         val_pool = catboost.Pool(x_val, y_val, weight=val_weights, cat_features=['stock_id'])

#         model = CatBoostRegressor(**params)
#         model.fit(train_pool, eval_set=val_pool, verbose_eval=50)
#         joblib.dump(model, f"CAT_{fold}.pkl")
        model = joblib.load(f'../input/optiver-lgb-cat/CAT_{fold}(1).pkl')

#         oof_predictions[val_ind] = model.predict(x_val)

        test_predictions += np.abs(model.predict(x_test)) / 5

#     rmspe_score = rmspe(y, oof_predictions)
#     print(f'每一折的RMSPE为 {rmspe_score}')

    return test_predictions



In [3]:
train, test = read()

训练集行数428932
测试集行数3


In [4]:
# train_stock_ids = train['stock_id'].unique()
# train_ = preprocessor(train_stock_ids, is_train = True)
# train = train.merge(train_, on = ['row_id'], how = 'left')
# train = get_time_stock(train)

In [5]:
# train.to_parquet('train-lgb-cat.parquet')
train = pd.read_parquet('../input/optiver1/train-lgb-cat.parquet')

In [6]:
test_stock_ids = test['stock_id'].unique()
test_ = preprocessor(test_stock_ids, is_train = False)
test = test.merge(test_, on = ['row_id'], how = 'left')
test = get_time_stock(test)

  0%|          | 0/1 [00:00<?, ?it/s][Parallel(n_jobs=-1)]: Using backend LokyBackend with 4 concurrent workers.
100%|██████████| 1/1 [00:00<00:00, 10.87it/s]
[Parallel(n_jobs=-1)]: Done   1 out of   1 | elapsed:    2.3s finished


In [7]:
test_predictions_l = train_and_evaluate_LGB(train, test)
test_predictions_c = train_and_evaluate_CAT(train, test)

In [8]:
import gc
gc.collect()

0

# NN

In [9]:
from tensorflow import keras
from numpy.random import seed
import tensorflow as tf
from sklearn.pipeline import Pipeline
from collections import Counter, defaultdict
from keras.layers import Activation
from tensorflow import keras
import numpy as np
from keras import backend as K
from keras.backend import sigmoid
import random
from sklearn import preprocessing, model_selection
from sklearn.preprocessing import MinMaxScaler,StandardScaler,LabelEncoder
from sklearn.preprocessing import QuantileTransformer
from sklearn.metrics import r2_score
from sklearn.impute import SimpleImputer
from sklearn.compose import ColumnTransformer
def swish(x, beta = 1):
    return (x * sigmoid(beta * x))
custom_objects = keras.utils.get_custom_objects()
keras.utils.get_custom_objects().update({'swish': keras.layers.Activation(swish)})

def root_mean_squared_per_error(y_true, y_pred):
         return K.sqrt(K.mean(K.square( (y_true - y_pred)/ y_true )))


In [10]:
def get_time_agg(df):
    #需要计算统计特征的列名
    gcols=['book_log_return1_realized_volatility']
    gcols+=['book_log_return1_realized_volatility_150win_150']
    gcols+=['book_log_return2_realized_volatility_150win_300']
    gcols+=['book_log_return1_realized_volatility_150win_450']+['trade_log_return_realized_volatility_150win_450']
    gcols+=['book_price_spread_sum_150win_150']
    gcols+=['trade_size_tau_150win_150']
    gcols+=['book_depth_sum_150win_150']
    gcols+=['book_dispersion_sum_150win_150']

     #time_id聚合
    df_time_id = df.groupby('time_id')[gcols].agg(['mean', 'std', 'max', 'min']).reset_index()

    df_time_id.columns = ['_'.join(col) for col in df_time_id.columns]
    df_time_id = df_time_id.add_suffix('_' + 'time')
    

    df_time_id = df_time_id.rename(columns={'time_id__time':'time_id'})
    return df_time_id, [col for col in df_time_id if col not in ['time_id']]

In [11]:
#分层K折交叉验证，分层指的是训练集和测试集中的样本尽可能地包含来自每个组的不同类别的样本
def stratified_group_k_fold(X, y, groups, k, seed=None):
    """ https://www.kaggle.com/jakubwasikowski/stratified-group-k-fold-cross-validation """
    #统计 y 中类别的数量和每个组中每个类别出现的次数
    labels_num = np.max(y) + 1
    y_counts_per_group = defaultdict(lambda: np.zeros(labels_num))
    y_distr = Counter()
    for label, g in zip(y, groups):
        y_counts_per_group[g][label] += 1
        y_distr[label] += 1
    
    # 初始化 y_counts_per_fold 和 groups_per_fold 变量，用于存储每个折的每个类别在每个组中的出现次数和每个折包含的组的集合
    y_counts_per_fold = defaultdict(lambda: np.zeros(labels_num))
    groups_per_fold = defaultdict(set)
    
    #定义辅助函数 eval_y_counts_per_fold，用于计算每一折中每个类别出现次数的标准差
    def eval_y_counts_per_fold(y_counts, fold):
        y_counts_per_fold[fold] += y_counts
        std_per_label = []
        for label in range(labels_num):
            label_std = np.std([y_counts_per_fold[i][label] / y_distr[label] for i in range(k)])
            std_per_label.append(label_std)
        y_counts_per_fold[fold] -= y_counts
        return np.mean(std_per_label)
    
    #对 groups 根据 y 的出现次数排序，并随机打乱顺序，得到 groups_and_y_counts
    groups_and_y_counts = list(y_counts_per_group.items())
    random.Random(seed).shuffle(groups_and_y_counts)

    for g, y_counts in tqdm(sorted(groups_and_y_counts, key=lambda x: -np.std(x[1])), total=len(groups_and_y_counts)):
        best_fold = None
        min_eval = None
        for i in range(k):
            fold_eval = eval_y_counts_per_fold(y_counts, i)
            if min_eval is None or fold_eval < min_eval:
                min_eval = fold_eval
                best_fold = i
        y_counts_per_fold[best_fold] += y_counts
        groups_per_fold[best_fold].add(g)

    all_groups = set(groups)
    for i in range(k):
        train_groups = all_groups - groups_per_fold[i]
        test_groups = groups_per_fold[i]

        train_indices = [i for i, g in enumerate(groups) if g in train_groups]
        test_indices = [i for i, g in enumerate(groups) if g in test_groups]

        yield train_indices, test_indices

In [12]:
def base_model(numfeats, cat_data=train['stock_id']):
    #定义了神经网络的隐藏层，其中包含了多个全连接层，每层都有不同数量的神经元
    hidden_units = (64, 32, 32, 16, 16, 16, 16, 8, 8)
    #指定股票 ID 的嵌入维度
    stock_embedding_size = 24
    
    # 分别表示输入的股票 ID（通过 Embedding 层嵌入）和数值特征
    stock_id_input = keras.Input(shape=(1,), name='stock_id')
    num_input = keras.Input(shape=(numfeats,), name='num_data')


    #将每个股票 ID 嵌入到一个矢量空间中，转化为固定长度的特征向量
    stock_embedded = keras.layers.Embedding(max(cat_data)+1, stock_embedding_size, 
                                           input_length=1, name='stock_embedding')(stock_id_input)
    
    #将 Embedding 层的输出展平，与数值特征拼接在一起
    stock_flattened = keras.layers.Flatten()(stock_embedded)
    out = keras.layers.Concatenate()([stock_flattened, num_input])
    
    # 通过循环添加多个全连接层，并使用 Swish 作为激活函数。最后为输出层设计单个神经元，其激活函数为线性函数
    for n_hidden in hidden_units:
        out = keras.layers.Dense(n_hidden, activation='swish')(out)

    #out = keras.layers.Concatenate()([out, num_input])

    out = keras.layers.Dense(1, activation='linear', name='prediction')(out)
    
    model = keras.Model(
    inputs = [stock_id_input, num_input],
    outputs = out,
    )
    
    return model




In [13]:
def read_train_test():
    
    train = pd.read_csv('../input/optiver-realized-volatility-prediction/train.csv')
    test = pd.read_csv('../input/optiver-realized-volatility-prediction/test.csv')


    train['row_id'] = train['stock_id'].astype(str) + '-' + train['time_id'].astype(str)
    test['row_id'] = test['stock_id'].astype(str) + '-' + test['time_id'].astype(str)
    print(f'Our training set has {train.shape[0]} rows')
    print(f'Our test set has {test.shape[0]} rows')
    print(f'Our training set has {train.isna().sum().sum()} missing values')
    print(f'Our test set has {test.isna().sum().sum()} missing values')
    
    return train, test

train, test = read_train_test()

Our training set has 428932 rows
Our test set has 3 rows
Our training set has 0 missing values
Our test set has 0 missing values


In [14]:
data_dir = '../input/optiver-realized-volatility-prediction/'


def calc_wap1(df):
    wap = (df['bid_price1'] * df['ask_size1'] + df['ask_price1'] * df['bid_size1']) / (df['bid_size1'] + df['ask_size1'])
    return wap

def calc_wap2(df):
    wap = (df['bid_price2'] * df['ask_size2'] + df['ask_price2'] * df['bid_size2']) / (df['bid_size2'] + df['ask_size2'])
    return wap

def calc_wap12(df):
    var1 = df['bid_price1'] * df['ask_size1'] + df['ask_price1'] * df['bid_size1']
    var2 = df['bid_price2'] * df['ask_size2'] + df['ask_price2'] * df['bid_size2']
    den = df['bid_size1'] + df['ask_size1'] + df['bid_size2'] + df['ask_size2']
    return (var1+var2) / den

def calc_wap3(df):
    wap = (df['bid_price1'] * df['bid_size1'] + df['ask_price1'] * df['ask_size1']) / (df['bid_size1'] + df['ask_size1'])
    return wap

def calc_wap34(df):
    var1 = df['bid_price1'] * df['bid_size1'] + df['ask_price1'] * df['ask_size1']
    var2 = df['bid_price2'] * df['bid_size2'] + df['ask_price2'] * df['ask_size2']
    den = df['bid_size1'] + df['ask_size1'] + df['bid_size2'] + df['ask_size2']
    return (var1+var2) / den

def calc_swap12(df):
    return df['wap12'] - df['wap34']

def calc_tswap1(df):
    return -df['swap1'].diff()

def calc_tswap12(df):
    return -df['swap12'].diff()

def calc_wss12(df):
    ask = (df['ask_price1'] * df['ask_size1'] + df['ask_price2'] * df['ask_size2'])/(df['ask_size1']+df['ask_size2'])
    bid = (df['bid_price1'] * df['bid_size1'] + df['bid_price2'] * df['bid_size2'])/(df['bid_size1']+df['bid_size2'])
    return (ask - bid) / df['midprice']


def calc_depth(df):
    depth = df['bid_price1'] * df['bid_size1'] + df['ask_price1'] * df['ask_size1'] + df['bid_price2'] * df[
               'bid_size2'] + df['ask_price2'] * df['ask_size2']
    return depth


def calc_slope(df):
    v0 = (df['bid_size1']+df['ask_size1'])/2
    p0 = (df['bid_price1']+df['ask_price1'])/2
    slope_bid = ((df['bid_size1']/v0)-1)/abs((df['bid_price1']/p0)-1)+(
                (df['bid_size2']/df['bid_size1'])-1)/abs((df['bid_price2']/df['bid_price1'])-1)
    slope_ask = ((df['ask_size1']/v0)-1)/abs((df['ask_price1']/p0)-1)+(
                (df['ask_size2']/df['ask_size1'])-1)/abs((df['ask_price2']/df['ask_price1'])-1)
    return (slope_bid+slope_ask)/2, abs(slope_bid-slope_ask)


def calc_dispersion(df):
    bspread = df['bid_price1'] - df['bid_price2']
    aspread = df['ask_price2'] - df['ask_price1']
    bmid = (df['bid_price1'] + df['ask_price1'])/2  - df['bid_price1']
    bmid2 = (df['bid_price1'] + df['ask_price1'])/2  - df['bid_price2']
    amid = df['ask_price1'] - (df['bid_price1'] + df['ask_price1'])/2
    amid2 = df['ask_price2'] - (df['bid_price1'] + df['ask_price1'])/2
    bdisp = (df['bid_size1']*bmid + df['bid_size2']*bspread)/(df['bid_size1']+df['bid_size2'])
    bdisp2 = (df['bid_size1']*bmid + df['bid_size2']*bmid2)/(df['bid_size1']+df['bid_size2'])
    adisp = (df['ask_size1']*amid + df['ask_size2']*aspread)/(df['ask_size1']+df['ask_size2'])      
    adisp2 = (df['ask_size1']*amid + df['ask_size2']*amid2)/(df['ask_size1']+df['ask_size2'])
    return (bdisp + adisp)/2, (bdisp2 + adisp2)/2

def calc_price_impact(df):
    ask = (df['ask_price1'] * df['ask_size1'] + df['ask_price2'] * df['ask_size2'])/(df['ask_size1']+df['ask_size2'])
    bid = (df['bid_price1'] * df['bid_size1'] + df['bid_price2'] * df['bid_size2'])/(df['bid_size1']+df['bid_size2'])
    return (df['ask_price1'] - ask)/df['ask_price1'], (df['bid_price1'] - bid)/df['bid_price1']


def calc_ofi(df):
    a = df['bid_size1']*np.where(df['bid_price1'].diff()>=0,1,0)
    b = df['bid_size1'].shift()*np.where(df['bid_price1'].diff()<=0,1,0)
    c = df['ask_size1']*np.where(df['ask_price1'].diff()<=0,1,0)
    d = df['ask_size1'].shift()*np.where(df['ask_price1'].diff()>=0,1,0)
    return a - b - c + d


def calc_tt1(df):
    p1 = df['ask_price1'] * df['ask_size1'] + df['bid_price1'] * df['bid_size1']
    p2 = df['ask_price2'] * df['ask_size2'] + df['bid_price2'] * df['bid_size2']      
    return p2 - p1 


def log_return(series):
    return np.log(series).diff()

def log_return_out(series):
    ret = np.log(series).diff()
    return remove_outliers(ret)

def remove_outliers(series):
    cu = 6
    ser_mean, ser_std = np.mean(series), np.std(series)
    series = series.where(series<=(ser_mean + cu*ser_std), ser_mean)
    series = series.where(series>=(ser_mean - cu*ser_std), ser_mean)
    return series

def realized_volatility(series):
    return np.sqrt(np.sum(series**2))
    
def realized_volatility_downside(series):
    return np.sqrt(np.sum(series[series<0]**2))

def realized_volatility_upside(series):
    return np.sqrt(np.sum(series[series>0]**2))


def realized_bipowvar(series):
    cnt = series.count()
    if cnt<3:
        return np.nan
    else:
        cons = (np.pi/2)*(cnt/(cnt-2))
        return cons*np.nansum(np.abs(series)*np.abs(series.shift()))
    

def realized_quarticity(series):
    return (series.count()/3)*np.sum(series**4)


def realized_medianvar(series):
    cnt = series.count()
    if cnt<3:
        return np.nan
    else:
        cons = (np.pi/(6-4*np.sqrt(3)+np.pi))*(cnt/(cnt-2))
        return cons*np.nansum(np.median([np.abs(series),np.abs(series.shift()),np.abs(series.shift(2))], axis=0)**2)
    

def realized_absvar(series):
    return np.sqrt(np.pi/(2*series.count()))*np.sum(np.abs(series))


def realized_vol_weighted(series):
    return np.sqrt(np.sum(series**2)/series.count())


def realized_skew(series):
    return np.sqrt(series.count())*np.sum(series**3)/(realized_volatility(series)**3)


def realized_kurtosis(series):
    return series.count()*np.sum(series**4)/(realized_volatility(series)**4)


def get_stats_bins(df, feat_dict, bins, quantile=False):
    # Group by the window
    if bins==0:
        df_feature = df.groupby('time_id').agg(feat_dict).reset_index()
        # Rename columns joining suffix
        df_feature.columns = ['_'.join(col) for col in df_feature.columns]
        df_feature = df_feature.rename(columns={'time_id_': 'time_id'})
    else:
        if quantile:
            df['sbins'] = pd.qcut(df.seconds_in_bucket, bins, labels=False)
            q = 'q'
        else:
            df['sbins'] = pd.cut(df.seconds_in_bucket, bins, labels=False)
            q = ''
        df_feature = None
        for i in range(bins):
            df_feat = df.loc[df.sbins==i].groupby('time_id').agg(feat_dict).reset_index()
            # Rename columns joining suffix
            df_feat.columns = ['_'.join(col) for col in df_feat.columns]
            # Add a suffix to differentiate bins
            df_feat = df_feat.rename(columns={col: col+'_'+q+str(bins)+'bins_'+str(i) for col in df_feat.columns})
            df_feat = df_feat.rename(columns={'time_id_'+'_'+q+str(bins)+'bins_'+str(i): 'time_id'})
            if isinstance(df_feature, pd.DataFrame):
                df_feature = pd.merge(df_feature, df_feat, how='left', on='time_id')
            else:
                df_feature = df_feat.copy()
        df = df.drop('sbins', axis=1)
    return df_feature

# Function to get group stats for different windows (seconds in bucket)
def get_stats_window(df, feat_dict, window):
    # Group by the window
    if window==0:
        df_feature = df.groupby('time_id').agg(feat_dict).reset_index()
        # Rename columns joining suffix
        df_feature.columns = ['_'.join(col) for col in df_feature.columns]
        df_feature = df_feature.rename(columns={'time_id_': 'time_id'})
    else:
        df_feature = None
        for w in range(window, 600, window):
            df_feat = df.loc[df.seconds_in_bucket>=w].groupby('time_id').agg(feat_dict).reset_index()
            # Rename columns joining suffix
            df_feat.columns = ['_'.join(col) for col in df_feat.columns]
            # Add a suffix to differentiate bins
            df_feat = df_feat.rename(columns={col: col+'_'+str(window)+'win_'+str(w) for col in df_feat.columns})
            df_feat = df_feat.rename(columns={'time_id_'+'_'+str(window)+'win_'+str(w): 'time_id'})
            if isinstance(df_feature, pd.DataFrame):
                df_feature = pd.merge(df_feature, df_feat, how='left', on='time_id')
            else:
                df_feature = df_feat.copy()
    return df_feature

# Function to sample from the window and average
def get_stats_sampled(df, feat_dict, numsamp, fraction, boot):
    df_feature = None
    for i in range(numsamp):
        df_feat = df.groupby('time_id').sample(frac=fraction, random_state=i, replace=boot).reset_index(drop=True)
        df_feat = df_feat.groupby('time_id').agg(feat_dict).reset_index()
        df_feat.columns = ['_'.join(col) for col in df_feat.columns]
        if isinstance(df_feature, pd.DataFrame):
            df_feature += df_feat.values/numsamp   
        else:
            df_feature = df_feat.copy()/numsamp
    df_feature = df_feature.rename(columns={col: col+'_sample_'+str(fraction)+'_'+str(numsamp) for col in df_feature.columns})
    df_feature = df_feature.rename(columns={'time_id_'+'_sample_'+str(fraction)+'_'+str(numsamp): 'time_id'})
    return df_feature

In [15]:
def book_preprocessor(file_path):
    df = pd.read_parquet(file_path)
    
    #### Data Cleansing (if any needed)
    
    df = df.drop(df.loc[df.ask_price1 <= 0].index)
    df = df.drop(df.loc[df.bid_price1 <= 0].index)
    df = df.drop(df.loc[(df['ask_price1'] - df['bid_price1']) < 0].index)
    df = df.groupby(['time_id','seconds_in_bucket']).mean().reset_index()
    
    ####
    
    # Calculate prices
    df['wap1'] = calc_wap1(df)
    df['wap2'] = calc_wap2(df)
    
    df['depth'] = calc_depth(df)
    df['slope'], _ = calc_slope(df)

    # Calculate log returns
    df['log_return1'] = df.groupby('time_id')['wap1'].apply(log_return)
    df['log_return2'] = df.groupby('time_id')['wap2'].apply(log_return)
    
    # Calculate spreads
    df['price_spread'] = (df['ask_price1'] - df['bid_price1']) / ((df['ask_price1'] + df['bid_price1']) / 2)
    df['dispersion'], _ = calc_dispersion(df)

    # Dict for aggregations
    create_feature_dict = {
        'depth': [np.sum],
        'slope': [np.sum],
        'log_return1': [realized_volatility, realized_absvar],
        'log_return2': [realized_volatility, realized_absvar],
        'price_spread': [np.sum, np.nanmedian],
    }

    create_feature_dict_bins = {
        'depth': [np.sum],
        'slope': [np.sum],
        'dispersion': [np.sum],
        'log_return1': [realized_volatility, realized_absvar],
        'log_return2': [realized_volatility, realized_absvar],
        'price_spread': [np.sum],
    }


    # Get the stats for different windows
    df_feature_0 = get_stats_bins(df, create_feature_dict, 0)
    df_feature_w4 = get_stats_window(df, create_feature_dict_bins, 150)

    # Merge all
    df_feature_0 = df_feature_0.merge(df_feature_w4, how = 'left', on = 'time_id')   

    df_feature_0 = df_feature_0.add_prefix('book_')
    stock_id = file_path.split('=')[1]
    df_feature_0['row_id'] = df_feature_0['book_time_id'].apply(lambda x:f'{stock_id}-{x}')
    df_feature_0.drop(['book_time_id'], axis = 1, inplace = True)
    return df_feature_0

# Function to preprocess trade data (for each stock id)
def trade_preprocessor(file_path):
    df = pd.read_parquet(file_path)

    #### Data Cleansing
    
    df = df.drop(df.loc[df.price <= 0].index)
    df = df.groupby(['time_id','seconds_in_bucket']).mean().reset_index()
    
    ####
    
    df['log_return'] = df.groupby('time_id')['price'].apply(log_return)
    
    # Dict for aggregations
    create_feature_dict = {
        'log_return': [realized_volatility, realized_absvar],
        'order_count':[np.sum, np.max],
    }
    
    create_feature_dict_bins = {
        'log_return': [realized_volatility, realized_absvar],
        'order_count': [np.sum],
    }
    
    # Get the stats for different windows
    df_feature_0 = get_stats_bins(df, create_feature_dict, 0)
    df_feature_w4 = get_stats_window(df, create_feature_dict_bins, 150)

    # Merge all
    df_feature_0 = df_feature_0.merge(df_feature_w4, how = 'left', on = 'time_id')   

    df_feature_0 = df_feature_0.add_prefix('trade_')
    stock_id = file_path.split('=')[1]
    df_feature_0['row_id'] = df_feature_0['trade_time_id'].apply(lambda x:f'{stock_id}-{x}')
    df_feature_0.drop(['trade_time_id'], axis = 1, inplace = True)
    return df_feature_0
    
# Funtion to make preprocessing function in parallel (for each stock id)
def preprocessor(list_stock_ids, is_train = True):
    
    # Parrallel for loop
    def for_joblib(stock_id):
        # Train
        if is_train:
            file_path_book = data_dir + "book_train.parquet/stock_id=" + str(stock_id)
            file_path_trade = data_dir + "trade_train.parquet/stock_id=" + str(stock_id)
        # Test
        else:
            file_path_book = data_dir + "book_test.parquet/stock_id=" + str(stock_id)
            file_path_trade = data_dir + "trade_test.parquet/stock_id=" + str(stock_id)
    
        # Preprocess book and trade data and merge them
        df_tmp = pd.merge(book_preprocessor(file_path_book), trade_preprocessor(file_path_trade), on = 'row_id', how = 'left')      
        # Return the merge dataframe
        return df_tmp
    
    # Use parallel api to call paralle for loop
    df = Parallel(n_jobs = -1, verbose = 1)(delayed(for_joblib)(stock_id) for stock_id in list_stock_ids)
    # Concatenate all the dataframes that return from Parallel
    df = pd.concat(df, ignore_index = True)
    return df

# Get unique stock ids 
train_stock_ids = train['stock_id'].unique()
# Preprocess them using Parallel and our single stock id functions
train_ = preprocessor(train_stock_ids, is_train = True)
train = train.merge(train_, on = ['row_id'], how = 'left')

# Get unique stock ids 
test_stock_ids = test['stock_id'].unique()
# Preprocess them using Parallel and our single stock id functions
test_ = preprocessor(test_stock_ids, is_train = False)
test = test.merge(test_, on = ['row_id'], how = 'left')
gc.collect()

[Parallel(n_jobs=-1)]: Using backend SequentialBackend with 1 concurrent workers.
[Parallel(n_jobs=-1)]: Done 112 out of 112 | elapsed: 55.2min finished
[Parallel(n_jobs=-1)]: Using backend SequentialBackend with 1 concurrent workers.
[Parallel(n_jobs=-1)]: Done   1 out of   1 | elapsed:    0.1s finished


52

In [16]:
train['trade_size_tau'] = np.sqrt(1/train['trade_order_count_sum'])
    
for w in range(150, 600, 150):
    train['trade_size_tau_150win_'+str(w)] = np.sqrt(1/train['trade_order_count_sum_150win_'+str(w)])
    
test['trade_size_tau'] = np.sqrt(1/test['trade_order_count_sum'])
    
for w in range(150, 600, 150):
    test['trade_size_tau_150win_'+str(w)] = np.sqrt(1/test['trade_order_count_sum_150win_'+str(w)])

In [17]:
cols = ['row_id','time_id','stock_id','target']
cols+=['book_log_return1_realized_volatility']
cols+=['book_log_return1_realized_volatility_150win_150']+['book_log_return2_realized_volatility_150win_150']+[
            'trade_log_return_realized_volatility_150win_150']
cols+=['book_log_return1_realized_absvar_150win_150']+['book_log_return2_realized_absvar_150win_150']+[
            'trade_log_return_realized_absvar_150win_150']
cols+=['book_log_return2_realized_volatility_150win_300']
cols+=['book_log_return1_realized_volatility_150win_450']+['trade_log_return_realized_volatility_150win_450']
cols+=['book_price_spread_sum_150win_150']
cols+=['trade_size_tau_150win_150']
cols+=['book_depth_sum_150win_150']
cols+=['book_dispersion_sum_150win_150']

train = train[[col for col in train.columns if col in cols]]
test = test[[col for col in test.columns if col in cols]]

In [18]:
tf.random.set_seed(42)
seed(42)

es = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=50, verbose=0,
                                      mode='min', restore_best_weights=True)

plateau = tf.keras.callbacks.ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=15, 
                                               verbose=0, mode='min')

scores_folds = {}
model_name = 'NN'

scores_folds[model_name] = []
counter = 1

predictions_nn_1 = np.zeros(test.shape[0])
y = train['target']
# Create a KFold object
kf = 5
# Iterate through each fold
features = [col for col in train.columns if col not in {"time_id", "target", "row_id"}]
feats_nostock = [col for col in train.columns if col not in {"time_id", "target", "row_id", "stock_id"}] 
skf = stratified_group_k_fold(X=train[feats_nostock], y=train['stock_id'].astype('category').cat.codes.values, 
                              groups=np.array(train['time_id'].astype('category').cat.codes.values), k=kf, seed=2021)

In [19]:
for fold, (trn_ind, val_ind) in enumerate(skf):
    print(f'Training fold {fold + 1}')
    
    x_train = train.iloc[trn_ind].copy()
    x_val = train.iloc[val_ind].copy()
    print(x_train.shape)
    print(x_val.shape)
    y_train, y_val = y.iloc[trn_ind], y.iloc[val_ind]
    tt = test.copy()
    
    #############################################################################################
    # NN
    #############################################################################################
    
    x_train.loc[:, feats_nostock] = x_train.loc[:, feats_nostock].fillna(x_train.groupby('stock_id')[feats_nostock].transform('median')).values
    for i in x_val.stock_id.unique():
        x_val.loc[x_val.stock_id==i, feats_nostock] = x_val.loc[x_val.stock_id==i, feats_nostock].fillna(x_train.loc[x_train.stock_id==i, feats_nostock].median()).values
    for i in tt.stock_id.unique():
        tt.loc[tt.stock_id==i, feats_nostock] = tt.loc[tt.stock_id==i, feats_nostock].fillna(x_train.loc[x_train.stock_id==i, feats_nostock].median()).values
    
    x_train_agg_time, agg_feats_time = get_time_agg(x_train)
    x_train = x_train.merge(x_train_agg_time, how='left', on='time_id')
    x_val_agg_time, _ = get_time_agg(x_val)
    x_val = x_val.merge(x_val_agg_time, how='left', on='time_id')
    test_agg_time, _ = get_time_agg(tt)
    tt = tt.merge(test_agg_time, how='left', on='time_id')
    del x_train_agg_time,  x_val_agg_time, test_agg_time
    gc.collect()

    traincols = feats_nostock+agg_feats_time

    for i in tt.stock_id.unique():
        tt.loc[tt.stock_id==i, traincols] = tt.loc[tt.stock_id==i, traincols].fillna(x_train.loc[x_train.stock_id==i, traincols].median()).values

    num_trans = Pipeline([('qt', QuantileTransformer(n_quantiles=2000, output_distribution='normal')),
                         ('numscaler', MinMaxScaler())])
    agg_trans = Pipeline([('aggscaler', MinMaxScaler())])
    preprocessor = ColumnTransformer(transformers=[('num', num_trans, feats_nostock),
                                                    ('agg', agg_trans, agg_feats_time)])
    pipe = Pipeline([('pp', preprocessor)])
    pipe.fit(x_train[traincols])
    
    model = base_model(len(traincols))
    model.compile(
        keras.optimizers.Adam(learning_rate=0.001),
        loss=root_mean_squared_per_error
    )
    
    model.fit([x_train['stock_id'], pipe.transform(x_train[traincols])], 
              y_train,               
              batch_size=2048,
              epochs=1000,
              validation_data=([x_val['stock_id'], pipe.transform(x_val[traincols])], y_val),
              callbacks=[es, plateau],
              validation_batch_size=len(y_val),
              shuffle=True,
              verbose = 1)

    preds = model.predict([x_val['stock_id'], pipe.transform(x_val[traincols])]).reshape(1,-1)[0]
    
    score = round(rmspe(y_true = y_val, y_pred = preds), 5)
    print('Fold {} {}: {}'.format(counter, model_name, score))
    scores_folds[model_name].append(score)
    
    predictions_nn_1 += np.abs(model.predict([tt['stock_id'], pipe.transform(tt[traincols])]).reshape(1,-1)[0]) / kf
    

    counter += 1
    
print('RMSPE {}: Folds: {}'.format(model_name, scores_folds[model_name]))

100%|██████████| 3830/3830 [01:59<00:00, 32.07it/s]


Training fold 1
(343146, 18)
(85786, 18)
Epoch 1/1000
Epoch 2/1000
Epoch 3/1000
Epoch 4/1000
Epoch 5/1000
Epoch 6/1000
Epoch 7/1000
Epoch 8/1000
Epoch 9/1000
Epoch 10/1000
Epoch 11/1000
Epoch 12/1000
Epoch 13/1000
Epoch 14/1000
Epoch 15/1000
Epoch 16/1000
Epoch 17/1000
Epoch 18/1000
Epoch 19/1000
Epoch 20/1000
Epoch 21/1000
Epoch 22/1000
Epoch 23/1000
Epoch 24/1000
Epoch 25/1000
Epoch 26/1000
Epoch 27/1000
Epoch 28/1000
Epoch 29/1000
Epoch 30/1000
Epoch 31/1000
Epoch 32/1000
Epoch 33/1000
Epoch 34/1000
Epoch 35/1000
Epoch 36/1000
Epoch 37/1000
Epoch 38/1000
Epoch 39/1000
Epoch 40/1000
Epoch 41/1000
Epoch 42/1000
Epoch 43/1000
Epoch 44/1000
Epoch 45/1000
Epoch 46/1000
Epoch 47/1000
Epoch 48/1000
Epoch 49/1000
Epoch 50/1000
Epoch 51/1000
Epoch 52/1000
Epoch 53/1000
Epoch 54/1000
Epoch 55/1000
Epoch 56/1000
Epoch 57/1000
Epoch 58/1000
Epoch 59/1000
Epoch 60/1000
Epoch 61/1000
Epoch 62/1000
Epoch 63/1000
Epoch 64/1000
Epoch 65/1000
Epoch 66/1000
Epoch 67/1000
Epoch 68/1000
Epoch 69/1000
Ep

In [20]:
gc.collect()

6191

# Tabnet

In [21]:
!pip install ../input/optiver-pytorch-tabnet/torch-1.12.1-cp310-cp310-manylinux1_x86_64.whl
!pip install ../input/optiver-pytorch-tabnet/pytorch_tabnet-4.0-py3-none-any.whl

Processing /kaggle/input/optiver-pytorch-tabnet/torch-1.12.1-cp310-cp310-manylinux1_x86_64.whl
Installing collected packages: torch
  Attempting uninstall: torch
    Found existing installation: torch 2.0.0+cpu
    Uninstalling torch-2.0.0+cpu:
      Successfully uninstalled torch-2.0.0+cpu
[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
torchvision 0.15.1+cpu requires torch==2.0.0, but you have torch 1.12.1 which is incompatible.
torchtext 0.15.1+cpu requires torch==2.0.0, but you have torch 1.12.1 which is incompatible.
torchdata 0.6.0 requires torch==2.0.0, but you have torch 1.12.1 which is incompatible.
torchaudio 2.0.1+cpu requires torch==2.0.0, but you have torch 1.12.1 which is incompatible.[0m[31m
[0mSuccessfully installed torch-1.12.1
[0mProcessing /kaggle/input/optiver-pytorch-tabnet/pytorch_tabnet-4.0-py3-none-any.whl
Installin

In [22]:
import numpy as np
import pandas as pd
import torch
from joblib import Parallel, delayed
from pytorch_tabnet.metrics import Metric
from pytorch_tabnet.tab_model import TabNetRegressor
from sklearn.cluster import KMeans
from sklearn.model_selection import KFold
from sklearn.preprocessing import LabelEncoder, StandardScaler
from torch.optim import Adam
from torch.optim.lr_scheduler import CosineAnnealingWarmRestarts

import warnings
warnings.filterwarnings("ignore")

def calc_wap1(df):
    # 一价
    wap = (df['bid_price1'] * df['ask_size1'] + df['ask_price1'] * df['bid_size1']) / (
                df['bid_size1'] + df['ask_size1'])
    return wap


def calc_wap2(df):
    # 二价
    wap = (df['bid_price2'] * df['ask_size2'] + df['ask_price2'] * df['bid_size2']) / (
                df['bid_size2'] + df['ask_size2'])
    return wap


def log_return(series):
    # 对数回报
    return np.log(series).diff()


def realized_volatility(series):
    return np.sqrt(np.sum(series ** 2))


def count_unique(series):
    # 计数
    return len(np.unique(series))


def book_preprocessor(file_path):
    # 订单处理
    df = pd.read_parquet(file_path)

    df['wap1'] = calc_wap1(df)
    df['wap2'] = calc_wap2(df)

    df['log_return1'] = df.groupby(['time_id'],group_keys=False)['wap1'].apply(log_return)
    df['log_return2'] = df.groupby(['time_id'],group_keys=False)['wap2'].apply(log_return)

    df['wap_balance'] = abs(df['wap1'] - df['wap2'])

    df['price_spread'] = (df['ask_price1'] - df['bid_price1']) / ((df['ask_price1'] + df['bid_price1']) / 2)
    df['price_spread2'] = (df['ask_price2'] - df['bid_price2']) / ((df['ask_price2'] + df['bid_price2']) / 2)
    df['bid_spread'] = df['bid_price1'] - df['bid_price2']
    df['ask_spread'] = df['ask_price1'] - df['ask_price2']
    df["bid_ask_spread"] = abs(df['bid_spread'] - df['ask_spread'])
    df['total_volume'] = (df['ask_size1'] + df['ask_size2']) + (df['bid_size1'] + df['bid_size2'])
    df['volume_imbalance'] = abs((df['ask_size1'] + df['ask_size2']) - (df['bid_size1'] + df['bid_size2']))

    create_feature_dict = {
        'wap1': [np.sum, np.mean, np.std],
        'wap2': [np.sum, np.mean, np.std],
        'log_return1': [np.sum, realized_volatility, np.mean, np.std],
        'log_return2': [np.sum, realized_volatility, np.mean, np.std],
        'wap_balance': [np.sum, np.mean, np.std],
        'price_spread': [np.sum, np.mean, np.std],
        'price_spread2': [np.sum, np.mean, np.std],
        'bid_spread': [np.sum, np.mean, np.std],
        'ask_spread': [np.sum, np.mean, np.std],
        'total_volume': [np.sum, np.mean, np.std],
        'volume_imbalance': [np.sum, np.mean, np.std],
        "bid_ask_spread": [np.sum, np.mean, np.std],
    }

    def get_stats_window(seconds_in_bucket, add_suffix=False):
        df_feature = df[df['seconds_in_bucket'] >= seconds_in_bucket].groupby(['time_id']).agg(
            create_feature_dict).reset_index()
        df_feature.columns = ['_'.join(col) for col in df_feature.columns]
        if add_suffix:
            df_feature = df_feature.add_suffix('_' + str(seconds_in_bucket))
        return df_feature

    df_feature = get_stats_window(seconds_in_bucket=0, add_suffix=False)
    df_feature_400 = get_stats_window(seconds_in_bucket=400, add_suffix=True)
    df_feature_300 = get_stats_window(seconds_in_bucket=300, add_suffix=True)
    df_feature_200 = get_stats_window(seconds_in_bucket=200, add_suffix=True)

    df_feature = df_feature.merge(df_feature_400, how='left', left_on='time_id_', right_on='time_id__400')
    df_feature = df_feature.merge(df_feature_300, how='left', left_on='time_id_', right_on='time_id__300')
    df_feature = df_feature.merge(df_feature_200, how='left', left_on='time_id_', right_on='time_id__200')

    df_feature.drop(['time_id__400', 'time_id__300', 'time_id__200'], axis=1, inplace=True)

    stock_id = file_path.split('=')[1]
    df_feature['row_id'] = df_feature['time_id_'].apply(lambda x: f'{stock_id}-{x}')
    df_feature.drop(['time_id_'], axis=1, inplace=True)

    return df_feature


def trade_preprocessor(file_path):
    df = pd.read_parquet(file_path)
    df['log_return'] = df.groupby('time_id',group_keys=False)['price'].apply(log_return)

    create_feature_dict = {
        'log_return': [realized_volatility],
        'seconds_in_bucket': [count_unique],
        'size': [np.sum, realized_volatility, np.mean, np.std, np.max, np.min],
        'order_count': [np.mean, np.sum, np.max],
    }

    def get_stats_window(seconds_in_bucket, add_suffix=False):
        df_feature = df[df['seconds_in_bucket'] >= seconds_in_bucket].groupby(['time_id']).agg(
            create_feature_dict).reset_index()
        df_feature.columns = ['_'.join(col) for col in df_feature.columns]
        if add_suffix:
            df_feature = df_feature.add_suffix('_' + str(seconds_in_bucket))
        return df_feature

    df_feature = get_stats_window(seconds_in_bucket=0, add_suffix=False)
    df_feature_400 = get_stats_window(seconds_in_bucket=400, add_suffix=True)
    df_feature_300 = get_stats_window(seconds_in_bucket=300, add_suffix=True)
    df_feature_200 = get_stats_window(seconds_in_bucket=200, add_suffix=True)

    def tendency(price, vol):
        df_diff = np.diff(price)
        val = (df_diff / price[1:]) * 100
        power = np.sum(val * vol[1:])
        return (power)

    lis = []
    for n_time_id in df['time_id'].unique():
        df_id = df[df['time_id'] == n_time_id]
        tendencyV = tendency(df_id['price'].values, df_id['size'].values)
        f_max = np.sum(df_id['price'].values > np.mean(df_id['price'].values))
        f_min = np.sum(df_id['price'].values < np.mean(df_id['price'].values))
        df_max = np.sum(np.diff(df_id['price'].values) > 0)
        df_min = np.sum(np.diff(df_id['price'].values) < 0)
        abs_diff = np.median(np.abs(df_id['price'].values - np.mean(df_id['price'].values)))
        energy = np.mean(df_id['price'].values ** 2)
        iqr_p = np.percentile(df_id['price'].values, 75) - np.percentile(df_id['price'].values, 25)
        abs_diff_v = np.median(np.abs(df_id['size'].values - np.mean(df_id['size'].values)))
        energy_v = np.sum(df_id['size'].values ** 2)
        iqr_p_v = np.percentile(df_id['size'].values, 75) - np.percentile(df_id['size'].values, 25)

        lis.append({'time_id': n_time_id, 'tendency': tendencyV, 'f_max': f_max, 'f_min': f_min, 'df_max': df_max,
                    'df_min': df_min,
                    'abs_diff': abs_diff, 'energy': energy, 'iqr_p': iqr_p, 'abs_diff_v': abs_diff_v,
                    'energy_v': energy_v, 'iqr_p_v': iqr_p_v})

    df_lr = pd.DataFrame(lis)

    df_feature = df_feature.merge(df_lr, how='left', left_on='time_id_', right_on='time_id')

    # Merge all
    df_feature = df_feature.merge(df_feature_400, how='left', left_on='time_id_', right_on='time_id__400')
    df_feature = df_feature.merge(df_feature_300, how='left', left_on='time_id_', right_on='time_id__300')
    df_feature = df_feature.merge(df_feature_200, how='left', left_on='time_id_', right_on='time_id__200')

    # Drop unnecesary time_ids
    df_feature.drop(['time_id__400', 'time_id__300', 'time_id__200', 'time_id'], axis=1, inplace=True)
    df_feature = df_feature.add_prefix('trade_')
    stock_id = file_path.split('=')[1]
    df_feature['row_id'] = df_feature['trade_time_id_'].apply(lambda x: f'{stock_id}-{x}')
    df_feature.drop(['trade_time_id_'], axis=1, inplace=True)

    def order_sum(df, sec: str):
        new_col = 'size_tau' + sec
        bucket_col = 'trade_seconds_in_bucket_count_unique' + sec
        df[new_col] = np.sqrt(1 / df[bucket_col])

        new_col2 = 'size_tau2' + sec
        order_col = 'trade_order_count_sum' + sec
        df[new_col2] = np.sqrt(1 / df[order_col])

        if sec == '400_':
            df['size_tau2_d'] = df['size_tau2_400'] - df['size_tau2']

    for sec in ['', '_200', '_300', '_400']:
        order_sum(df_feature, sec)

    df_feature['size_tau2_d'] = df_feature['size_tau2_400'] - df_feature['size_tau2']

    return df_feature


def get_time_stock(df):
    vol_cols = ['log_return1_realized_volatility', 'log_return2_realized_volatility',
                'log_return1_realized_volatility_400', 'log_return2_realized_volatility_400',
                'log_return1_realized_volatility_300', 'log_return2_realized_volatility_300',
                'log_return1_realized_volatility_200', 'log_return2_realized_volatility_200',
                'trade_log_return_realized_volatility', 'trade_log_return_realized_volatility_400',
                'trade_log_return_realized_volatility_300', 'trade_log_return_realized_volatility_200']

    df_stock_id = df.groupby(['stock_id'])[vol_cols].agg(['mean', 'std', 'max', 'min', ]).reset_index()
    df_stock_id.columns = ['_'.join(col) for col in df_stock_id.columns]
    df_stock_id = df_stock_id.add_suffix('_' + 'stock')

    df_time_id = df.groupby(['time_id'])[vol_cols].agg(['mean', 'std', 'max', 'min', ]).reset_index()

    df_time_id.columns = ['_'.join(col) for col in df_time_id.columns]
    df_time_id = df_time_id.add_suffix('_' + 'time')

    df = df.merge(df_stock_id, how='left', left_on=['stock_id'], right_on=['stock_id__stock'])
    df = df.merge(df_time_id, how='left', left_on=['time_id'], right_on=['time_id__time'])
    df.drop(['stock_id__stock', 'time_id__time'], axis=1, inplace=True)

    return df


def create_agg_features(train, test):
    train_p = pd.read_csv('../input/optiver-realized-volatility-prediction/train.csv')
    train_p = train_p.pivot(index='time_id', columns='stock_id', values='target')
    corr = train_p.corr()
    ids = corr.index
    kmeans = KMeans(n_clusters=7, random_state=0).fit(corr.values)
    l = []
    for n in range(7):
        l.append([(x - 1) for x in ((ids + 1) * (kmeans.labels_ == n)) if x > 0])

    mat = []
    mat_test = []
    n = 0
    for ind in l:
        new_df = train.loc[train['stock_id'].isin(ind)]
        new_df = new_df.groupby(['time_id']).agg(np.nanmean)
        new_df.loc[:, 'stock_id'] = str(n) + 'c1'
        mat.append(new_df)
        new_df = test.loc[test['stock_id'].isin(ind)]
        new_df = new_df.groupby(['time_id']).agg(np.nanmean)
        new_df.loc[:, 'stock_id'] = str(n) + 'c1'
        mat_test.append(new_df)
        n += 1

    mat1 = pd.concat(mat).reset_index()
    mat1.drop(columns=['target'], inplace=True)
    mat2 = pd.concat(mat_test).reset_index()

    mat2 = pd.concat([mat2, mat1.loc[mat1.time_id == 5]])

    mat1 = mat1.pivot(index='time_id', columns='stock_id')
    mat1.columns = ["_".join(x) for x in mat1.columns.ravel()]
    mat1.reset_index(inplace=True)

    mat2 = mat2.pivot(index='time_id', columns='stock_id')
    mat2.columns = ["_".join(x) for x in mat2.columns.ravel()]
    mat2.reset_index(inplace=True)

    prefix = ['log_return1_realized_volatility', 'total_volume_mean', 'trade_size_mean', 'trade_order_count_mean',
              'price_spread_mean', 'bid_spread_mean', 'ask_spread_mean',
              'volume_imbalance_mean', 'bid_ask_spread_mean', 'size_tau2']
    selected_cols = mat1.filter(regex='|'.join(f'^{x}.(0|1|3|4|6)c1' for x in prefix)).columns.tolist()
    selected_cols.append('time_id')

    train_m = pd.merge(train, mat1[selected_cols], how='left', on='time_id')
    test_m = pd.merge(test, mat2[selected_cols], how='left', on='time_id')

    features = [col for col in train_m.columns.tolist() if col not in ['time_id', 'target', 'row_id']]
    train_m[features] = train_m[features].fillna(train_m[features].mean())
    test_m[features] = test_m[features].fillna(train_m[features].mean())

    return train_m, test_m


def preprocessor(list_stock_ids, is_train=True):
    def for_joblib(stock_id):
        if is_train:
            file_path_book = data_dir + "book_train.parquet/stock_id=" + str(stock_id)
            file_path_trade = data_dir + "trade_train.parquet/stock_id=" + str(stock_id)
        else:
            file_path_book = data_dir + "book_test.parquet/stock_id=" + str(stock_id)
            file_path_trade = data_dir + "trade_test.parquet/stock_id=" + str(stock_id)

        df_tmp = pd.merge(book_preprocessor(file_path_book), trade_preprocessor(file_path_trade), on='row_id',
                          how='left')

        return df_tmp

    df = Parallel(n_jobs=-1, verbose=1)(delayed(for_joblib)(stock_id) for stock_id in list_stock_ids)
    df = pd.concat(df, ignore_index=True)
    return df


train = pd.read_csv('../input/optiver-realized-volatility-prediction/train.csv')
test = pd.read_csv('../input/optiver-realized-volatility-prediction/test.csv')

train['row_id'] = train['stock_id'].astype(str) + '-' + train['time_id'].astype(str)
test['row_id'] = test['stock_id'].astype(str) + '-' + test['time_id'].astype(str)

# data directory
data_dir = '../input/optiver-realized-volatility-prediction/'

train_stock_ids = train['stock_id'].unique()
train_ = preprocessor(train_stock_ids, is_train=True)
train = train.merge(train_, on=['row_id'], how='left')
train.to_csv('train.csv', index=False)

test_stock_ids = test['stock_id'].unique()
test_ = preprocessor(test_stock_ids, is_train=False)
test = test.merge(test_, on=['row_id'], how='left')

train = get_time_stock(train)
test = get_time_stock(test)

train.replace([np.inf, -np.inf], np.nan, inplace=True)
test.replace([np.inf, -np.inf], np.nan, inplace=True)

train.to_csv('train_tab.csv', index=False)
# train = pd.read_csv(f'../input/optiver-tabnet-kmeans-model-csv/train.csv')


train, test = create_agg_features(train, test)

X = train.drop(['row_id', 'target', 'time_id'], axis=1)
y = train['target']
X_test = test.copy()
X_test.drop(['time_id', 'row_id'], axis=1, inplace=True)


def rmspe(y_true, y_pred):
    return np.sqrt(np.mean(np.square((y_true - y_pred) / y_true)))


class RmspeClz(Metric):
    def __init__(self):
        self._name = "rmspe"
        self._maximize = False

    def __call__(self, y_true, y_score):
        return np.sqrt(np.mean(np.square((y_true - y_score) / y_true)))


def RmspeLoss(y_pred, y_true):
    return torch.sqrt(torch.mean(((y_true - y_pred) / y_true) ** 2)).clone()


nunique = X.nunique()
types = X.dtypes

categorical_columns = []
categorical_dims = {}

for col in X.columns:
    if col == 'stock_id':
        l_enc = LabelEncoder()
        X[col] = l_enc.fit_transform(X[col].values)
        X_test[col] = l_enc.transform(X_test[col].values)
        categorical_columns.append(col)
        categorical_dims[col] = len(l_enc.classes_)
    else:
        scaler = StandardScaler()
        X[col] = scaler.fit_transform(X[col].values.reshape(-1, 1))
        X_test[col] = scaler.transform(X_test[col].values.reshape(-1, 1))

cat_idxs = [i for i, f in enumerate(X.columns.tolist()) if f in categorical_columns]
cat_dims = [categorical_dims[f] for i, f in enumerate(X.columns.tolist()) if f in categorical_columns]

tabnet_params = dict(
    cat_idxs=cat_idxs,
    cat_dims=cat_dims,
    cat_emb_dim=1,
    n_d=16,
    n_a=16,
    n_steps=2,
    gamma=2,
    n_independent=2,
    n_shared=2,
    lambda_sparse=0,
    optimizer_fn=Adam,
    optimizer_params=dict(lr=(2e-2)),
    mask_type="entmax",
    scheduler_params=dict(T_0=200, T_mult=1, eta_min=1e-4, last_epoch=-1, verbose=False),
    scheduler_fn=CosineAnnealingWarmRestarts,
    seed=42,
    verbose=10
)

from sklearn.model_selection import train_test_split
import gc
gc.collect()
# 划分训练集和验证集
X_train, X_val, y_train, y_val = train_test_split(X.values, y.values.reshape(-1, 1), test_size=0.3, random_state=2023)

clf = TabNetRegressor(**tabnet_params)
clf.fit(
  X_train, y_train,
 eval_set=[(X_val, y_val)],
  max_epochs=200,
  patience=50,
  batch_size=1024 * 20,
  virtual_batch_size=128 * 20,
  num_workers=4,
  drop_last=False,
  eval_metric=[RmspeClz],
  loss_fn=RmspeLoss
)

saving_path_name = "./tabnet"
saved_filepath = clf.save_model(saving_path_name)

#     clf = TabNetRegressor()
#     clf.load_model(f'../input/optiver-tabnet-kmeans-model-csv/fold{fold}')

test_predictions = clf.predict(X_test.values).flatten()
test_predictions = np.abs(test_predictions)

# kfold = KFold(n_splits=5, random_state=42, shuffle=True)
# oof_predictions = np.zeros((X.shape[0], 1))
# test_predictions = np.zeros(X_test.shape[0])

# for fold, (trn_ind, val_ind) in enumerate(kfold.split(X)):
#     print(f'Training fold {fold + 1}')
#     X_train, X_val = X.iloc[trn_ind].values, X.iloc[val_ind].values
#     y_train, y_val = y.iloc[trn_ind].values.reshape(-1, 1), y.iloc[val_ind].values.reshape(-1, 1)

#     clf = TabNetRegressor(**tabnet_params)
#     clf.fit(
#       X_train, y_train,
#      eval_set=[(X_val, y_val)],
#       max_epochs=200,
#       patience=50,
#       batch_size=1024 * 20,
#       virtual_batch_size=128 * 20,
#       num_workers=4,
#       drop_last=False,
#       eval_metric=[RmspeClz],
#       loss_fn=RmspeLoss
#     )

#     saving_path_name = f"./fold{fold}"
#     saved_filepath = clf.save_model(saving_path_name)

# #     clf = TabNetRegressor()
# #     clf.load_model(f'../input/optiver-tabnet-kmeans-model-csv/fold{fold}')

#     oof_predictions[val_ind] = clf.predict(X_val)
#     test_predictions += clf.predict(X_test.values).flatten() / 5

# print(rmspe(y, oof_predictions.flatten()))

test_predictions_tabnet = test_predictions

[Parallel(n_jobs=-1)]: Using backend SequentialBackend with 1 concurrent workers.
[Parallel(n_jobs=-1)]: Done 112 out of 112 | elapsed: 57.2min finished
[Parallel(n_jobs=-1)]: Using backend SequentialBackend with 1 concurrent workers.
[Parallel(n_jobs=-1)]: Done   1 out of   1 | elapsed:    0.2s finished


epoch 0  | loss: 253.02449| val_0_rmspe: 124.59943|  0:00:58s
epoch 10 | loss: 0.65357 | val_0_rmspe: 0.54652 |  0:10:51s
epoch 20 | loss: 0.37874 | val_0_rmspe: 0.54944 |  0:20:58s
epoch 30 | loss: 0.32257 | val_0_rmspe: 0.28653 |  0:31:15s
epoch 40 | loss: 0.26039 | val_0_rmspe: 0.24408 |  0:42:01s
epoch 50 | loss: 0.23874 | val_0_rmspe: 0.23217 |  0:52:35s
epoch 60 | loss: 0.22602 | val_0_rmspe: 0.22507 |  1:03:21s
epoch 70 | loss: 0.22255 | val_0_rmspe: 0.24207 |  1:14:22s
epoch 80 | loss: 0.22114 | val_0_rmspe: 0.22504 |  1:25:03s
epoch 90 | loss: 0.21669 | val_0_rmspe: 0.21559 |  1:35:22s
epoch 100| loss: 0.21508 | val_0_rmspe: 0.21646 |  1:45:40s
epoch 110| loss: 0.21177 | val_0_rmspe: 0.21233 |  1:56:00s
epoch 120| loss: 0.20999 | val_0_rmspe: 0.21123 |  2:06:15s
epoch 130| loss: 0.20886 | val_0_rmspe: 0.21032 |  2:16:08s
epoch 140| loss: 0.20837 | val_0_rmspe: 0.20996 |  2:26:09s
epoch 150| loss: 0.20749 | val_0_rmspe: 0.2096  |  2:36:14s
epoch 160| loss: 0.20693 | val_0_rmspe

In [23]:
import gc
gc.collect()

0

In [24]:
test['target'] = 0.25*test_predictions_l+0.25*test_predictions_c + 0.25*predictions_nn_1+0.25*test_predictions_tabnet
test[['row_id', 'target']].to_csv('submission.csv',index = False)