In [None]:
import os
import glob
from joblib import Parallel, delayed
import pandas as pd
import numpy as np
import scipy as sc
from sklearn.model_selection import KFold
import lightgbm as lgb
import warnings
import random
from tqdm import tqdm
# Silence library warnings (pandas / LightGBM deprecation noise) to keep the log readable.
# NOTE(review): this also hides FutureWarnings that announce API breakage; consider narrowing it.
warnings.filterwarnings('ignore')
# Use the fully-qualified option key: the bare 'max_columns' pattern also matches
# 'styler.render.max_columns' in pandas >= 1.4 and then raises OptionError.
pd.set_option('display.max_columns', 300)

In [None]:
def seed_everything(seed: int):
    """Make runs reproducible by seeding Python's and NumPy's global RNGs.

    Also exports PYTHONHASHSEED. Changing it at runtime only affects processes
    started afterwards (e.g. worker processes); the hash seed of the running
    interpreter is fixed at startup. Other libraries are not seeded here.
    """
    os.environ["PYTHONHASHSEED"] = f"{seed}"
    for seeder in (random.seed, np.random.seed):
        seeder(seed)


seed_everything(707)

In [None]:
# Tiny floor used to clip predictions and to keep denominators away from zero.
SMALL_F = 1e-8
# The model is trained on target ** 2 multiplied by this factor (see read_train_test).
TARGET_SCALE = 1_000
# Competition data root (holds the book_*/trade_* parquet folders).
data_dir = '../input/optiver-realized-volatility-prediction/'

def calc_wap1(df):
    """Level-1 weighted average price.

    Each side's price is weighted by the opposite side's size:
    (bid_price1 * ask_size1 + ask_price1 * bid_size1) / (bid_size1 + ask_size1).
    Returns a NumPy array aligned with the rows of ``df``.
    """
    bid_px, ask_px = df['bid_price1'].values, df['ask_price1'].values
    bid_qty, ask_qty = df['bid_size1'].values, df['ask_size1'].values
    numerator = bid_px * ask_qty + ask_px * bid_qty
    return numerator / (bid_qty + ask_qty)

def calc_wap2(df):
    """Level-2 weighted average price.

    (bid_price2 * ask_size2 + ask_price2 * bid_size2) / (bid_size2 + ask_size2),
    returned as a NumPy array aligned with the rows of ``df``.
    """
    bid_px, ask_px = df['bid_price2'].values, df['ask_price2'].values
    bid_qty, ask_qty = df['bid_size2'].values, df['ask_size2'].values
    numerator = bid_px * ask_qty + ask_px * bid_qty
    return numerator / (bid_qty + ask_qty)

def spread_ratio(series):
    """Mean of the 3 largest values divided by the mean of the 3 smallest, minus 1.

    Returns 0 for series with at most 3 values. If the 3 smallest values average
    to 0, the division yields inf/nan.
    """
    if len(series) <= 3:
        return 0
    top_mean = series.nlargest(3).mean()
    bottom_mean = series.nsmallest(3).mean()
    return top_mean / bottom_mean - 1

def largest_mean(series):
    """Mean of the 5 largest values; the plain max when there are at most 5 values."""
    return series.nlargest(5).mean() if len(series) > 5 else series.max()
    
def smallest_mean(series):
    """Mean of the 5 smallest values; the plain min when there are at most 5 values."""
    if len(series) <= 5:
        return series.min()
    return series.nsmallest(5).mean()

def log_return(series):
    """One-step difference of the log of ``series``; the first element is NaN."""
    log_values = np.log(series)
    return log_values.diff(periods=1)

def log_return_half(series):
    """Log return over a lag of ``len(series) // 2`` rows.

    The first ``len(series) // 2`` entries are NaN. A length-1 series uses lag 0
    and therefore yields 0.
    """
    half_window = len(series) // 2
    return np.log(series).diff(half_window)

def realized_volatility(series):
    """Sum of squared values (NaN-skipping for pandas input).

    NOTE: despite the name this is a realized *variance* (no square root). That is
    consistent with training on ``target ** 2`` in ``read_train_test``.
    """
    return np.square(series).sum()

def count_unique(series):
    """Number of distinct values, as determined by ``np.unique``."""
    distinct = np.unique(series)
    return distinct.size

def rmspe(y_true, y_pred):
    """Root mean squared percentage error.

    Computes sqrt(mean(((y_true - y_pred) / y_true) ** 2)). It is undefined
    (inf/nan) when any y_true is 0.
    """
    pct_error = (y_true - y_pred) / y_true
    mean_squared = np.mean(np.square(pct_error))
    return np.sqrt(mean_squared)

def feval_rmspe(y_pred, lgb_train):
    """LightGBM custom evaluation metric: RMSPE on the square-rooted scale.

    The model is trained on ``target ** 2 * TARGET_SCALE`` (see ``read_train_test``).
    Labels and predictions are therefore square-rooted before computing RMSPE.
    The constant scale factor cancels out in a percentage error.

    Returns ``(name, value, is_higher_better)`` as LightGBM expects from ``feval``.
    """
    y_true = lgb_train.get_label()
    # Floor predictions at SMALL_F so the square root is defined. Work on a copy:
    # ``y_pred`` is passed in by LightGBM and should not be modified in place.
    y_pred = np.maximum(y_pred, SMALL_F)
    return 'RMSPE', rmspe(np.sqrt(y_true), np.sqrt(y_pred)), False

In [None]:
# Function to read our base train and test set
def read_train_test():
    """Read the base train and test tables.

    The train target is replaced by ``target ** 2 * TARGET_SCALE``, the quantity
    the model is trained on. Known outlier (stock_id, time_id) rows are removed.
    Then 3 random stocks are dropped from each of 3 random time_ids, to prepare
    the model for delisted / halted stocks. This uses the global ``random`` state.

    Returns
    -------
    (train, test) : tuple of pandas.DataFrame
    """
    # Read from ``data_dir``, like the parquet loaders, instead of a second hard-coded copy of the path.
    train = pd.read_csv(data_dir + 'train.csv')
    train['target'] = train['target'].values ** 2
    train['target'] = train['target'].values * TARGET_SCALE

    # Remove outliers + account for delisting / trading suspension.
    # https://www.kaggle.com/c/optiver-realized-volatility-prediction/discussion/271920
    outlier_rows = [(31, 25504), (31, 27174), (81, 28319), (31, 1544), (27, 20551)]
    for stock_id, time_id in outlier_rows:
        train = train.loc[~((train['stock_id'] == stock_id) & (train['time_id'] == time_id))]

    # Prepare for stock delisting or trading halts: randomly drop (stock_id, time_id) rows.
    drop_time = random.sample(list(train['time_id'].unique()), 3)
    for t in drop_time:
        drop_stock = random.sample(list(train.loc[train['time_id'] == t]['stock_id'].unique()), 3)
        for s in drop_stock:
            train = train.loc[~((train['stock_id'] == s) & (train['time_id'] == t))]
    test = pd.read_csv(data_dir + 'test.csv')

    print(f'Our training set has {train.shape[0]} rows')
    return train, test

In [None]:
def book_preprocessor(file_path):
    """Build per-``time_id`` order-book features for a single stock.

    Parameters
    ----------
    file_path : str
        Path of one ``book_<split>.parquet/stock_id=<id>`` partition. The stock id
        is parsed from the text after ``=``.

    Returns
    -------
    pandas.DataFrame
        One row per ``time_id`` plus a ``stock_id`` column. It holds:
        full-bucket aggregates; window aggregates with a ``_<start>_<end>`` suffix;
        the time of the extreme level-1 returns; and ask/bid ratio features.
    """
    df = pd.read_parquet(file_path)
    # Missing book levels: price -> 1, size -> 0. Assign the result back, because a
    # chained ``df[c].fillna(..., inplace=True)`` does not update ``df`` under copy-on-write.
    for c in ['bid_price1', 'ask_price1', 'bid_price2', 'ask_price2']:
        df[c] = df[c].fillna(1)
    for c in ['bid_size1', 'ask_size1', 'bid_size2', 'ask_size2']:
        df[c] = df[c].fillna(0)

    df = df.sort_values(['time_id', 'seconds_in_bucket'])
    df = _book_row_features(df)

    # seconds_in_bucket of the most negative / most positive level-1 return per bucket.
    # ``idxmin``/``idxmax`` return index *labels*, so select them with ``.loc``.
    # ``.iloc`` picks the wrong rows whenever labels differ from positions.
    df_min_seconds = df.loc[df.groupby(['time_id'])['log_return1'].idxmin(), ['time_id', 'seconds_in_bucket']]
    df_min_seconds = df_min_seconds.rename(columns={'seconds_in_bucket': 'return_min_seconds_in_bucket'})
    df_max_seconds = df.loc[df.groupby(['time_id'])['log_return1'].idxmax(), ['time_id', 'seconds_in_bucket']]
    df_max_seconds = df_max_seconds.rename(columns={'seconds_in_bucket': 'return_max_seconds_in_bucket'})

    # Drop the first snapshot of every bucket. Presumably intentional: the ungrouped
    # ``diff``/``pct_change`` features compare that row with the previous bucket's last row.
    if len(df) > 2:
        df = df.drop(index=df.groupby(['time_id'])['seconds_in_bucket'].idxmin())

    # Aggregations over the whole (remaining) bucket.
    create_feature_dict = {
        'wap1': [spread_ratio],
        'wap2': [spread_ratio],
        'log_return1': [realized_volatility, np.std],
        'log_return2': [realized_volatility, np.std],
        'log_return1_half': [realized_volatility],
        'log_return2_half': [realized_volatility],
        'diff_log_return1': [np.sum],
        'diff_log_return2': [np.sum],
        'ask_log_return1': [realized_volatility],
        'bid_log_return1': [realized_volatility],
        'ask_log_return2': [realized_volatility],
        'bid_log_return2': [realized_volatility],
        'diff_ask_log_return1': [realized_volatility],
        'diff_bid_log_return1': [realized_volatility],
        'diff_ask_log_return2': [realized_volatility],
        'diff_bid_log_return2': [realized_volatility],
        'log_return1_minus': [np.sum, np.std, smallest_mean],
        'log_return1_plus': [np.sum, np.std, largest_mean],
        'log_return2_minus': [np.sum, np.std, smallest_mean],
        'log_return2_plus': [np.sum, np.std, largest_mean],
        'chg_count': [np.sum],
        'seconds_in_bucket': [count_unique],
        'ask_price1_chg_count': [np.sum],
        'ask_price2_chg_count': [np.sum],
        'bid_price1_chg_count': [np.sum],
        'bid_price2_chg_count': [np.sum],
        'ask_size1_chg_count': [np.sum],
        'ask_size2_chg_count': [np.sum],
        'bid_size1_chg_count': [np.sum],
        'bid_size2_chg_count': [np.sum],
        'log_return_balance': [np.mean, largest_mean, spread_ratio],
        'price1_spread': [np.mean, largest_mean, spread_ratio],
        'price2_spread': [np.mean, largest_mean, spread_ratio],
        'price1_spread_log_return': [np.std],
        'price2_spread_log_return': [np.std],
        'power1_spread': [np.mean, largest_mean, np.std, spread_ratio],
        'power2_spread': [np.mean, largest_mean, np.std, spread_ratio],
        'bid_spread': [np.mean, largest_mean, spread_ratio],
        'bid_spread_log_return': [np.std],
        'ask_spread': [np.mean, largest_mean, spread_ratio],
        'ask_spread_log_return': [np.std],
        'total_volume': [np.sum, spread_ratio],
        'volume_imbalance': [np.mean, largest_mean, np.std, spread_ratio],
        'pct_change_bid_size1': [realized_volatility, np.std],
        'pct_change_ask_size1': [realized_volatility, np.std],
        'tick_size': [np.min],
        'bid_ask_size1_mean': [np.sum],
    }
    # Aggregations repeated over the [150, 600], [300, 600] and [450, 600] second windows.
    create_feature_time_dict = {
        'log_return1': [realized_volatility, np.std],
        'log_return2': [realized_volatility, np.std],
        'chg_count': [np.sum],
        'ask_price1_chg_count': [np.sum],
        'ask_price2_chg_count': [np.sum],
        'bid_price1_chg_count': [np.sum],
        'bid_price2_chg_count': [np.sum],
        'ask_size1_chg_count': [np.sum],
        'ask_size2_chg_count': [np.sum],
        'bid_size1_chg_count': [np.sum],
        'bid_size2_chg_count': [np.sum],
        'volume_imbalance': [np.mean],
        'total_volume': [np.sum],
    }
    # Spread means over the [0, 300] and [300, 600] second windows.
    create_features_time_dict_2 = {
        'price1_spread': [np.mean],
        'price2_spread': [np.mean],
        'bid_spread': [np.mean],
        'ask_spread': [np.mean],
    }

    df_feature = _book_window_stats(df, create_feature_dict)
    for interval_sec in [[150, 600], [300, 600], [450, 600]]:
        tmp_df = _book_window_stats(df, create_feature_time_dict, interval_sec)
        df_feature = df_feature.merge(tmp_df, how='left', on='time_id')
    for interval_sec in [[0, 300], [300, 600]]:
        tmp_df = _book_window_stats(df, create_features_time_dict_2, interval_sec)
        df_feature = df_feature.merge(tmp_df, how='left', on='time_id')

    df_feature = df_feature.merge(df_min_seconds, how='left', on='time_id')
    df_feature = df_feature.merge(df_max_seconds, how='left', on='time_id')
    df_feature['stock_id'] = int(file_path.split('=')[1])
    return _book_ratio_features(df_feature)


def _book_row_features(df):
    """Add snapshot-level book features to ``df``, which must be sorted by time_id and seconds_in_bucket; returns ``df``."""
    df['wap1'] = calc_wap1(df)
    df['wap2'] = calc_wap2(df)

    for n in [1, 2]:
        # ``transform`` returns results on df's own index. Under pandas >= 2.0,
        # ``groupby.apply`` adds a ``time_id`` index level and can no longer be assigned back.
        df[f'log_return{n}'] = df.groupby(['time_id'])[f'wap{n}'].transform(log_return).fillna(0)
        df[f'log_return{n}_half'] = df.groupby(['time_id'])[f'wap{n}'].transform(log_return_half).fillna(0)
        df[f'diff_log_return{n}'] = df[f'log_return{n}'].diff().fillna(0)
        # Negative / positive part of the return (0 elsewhere).
        df[f'log_return{n}_minus'] = df[f'log_return{n}'].clip(upper=0)
        df[f'log_return{n}_plus'] = df[f'log_return{n}'].clip(lower=0)

    for n in [1, 2]:
        df[f'ask_volume{n}'] = df[f'ask_price{n}'].values * df[f'ask_size{n}'].values
        df[f'bid_volume{n}'] = df[f'bid_price{n}'].values * df[f'bid_size{n}'].values
    for n in [1, 2]:
        for side in ['ask', 'bid']:
            # Level-n returns come from the level-n volume (bid level 2 used to read bid_volume1).
            df[f'{side}_log_return{n}'] = df.groupby(['time_id'])[f'{side}_volume{n}'].transform(log_return).fillna(0)
            df[f'diff_{side}_log_return{n}'] = df[f'{side}_log_return{n}'].diff().fillna(0)

    # Change flags: 1 where the value differs from the previous row. ``np.int`` was
    # removed in NumPy 1.24; it was an alias of the builtin ``int`` used here.
    df['chg_count'] = (df['log_return1'] != 0).astype(int)
    for col in ['ask_price1', 'ask_price2', 'bid_price1', 'bid_price2',
                'ask_size1', 'ask_size2', 'bid_size1', 'bid_size2']:
        # Each flag diffs its own column (bid_size2 used to diff bid_size1).
        df[f'{col}_chg_count'] = (df[col].diff() != 0).astype(int)

    df['tick_size'] = np.abs(df['ask_price1'].diff().fillna(0))
    # No ask move -> sentinel 999, so the ``min`` aggregation picks the smallest real tick.
    df.loc[df['tick_size'] == 0, 'tick_size'] = 999

    df['pct_change_bid_size1'] = np.log1p(df['bid_size1'].pct_change())
    df['pct_change_ask_size1'] = np.log1p(df['ask_size1'].pct_change())

    # Wap balance
    df['log_return_balance'] = np.log1p(df['log_return1'].values) / (df['log_return2'].values + 1)
    # Spreads.
    # NOTE(review): for prices near 1, ``ask / (bid + 1) - 1`` is negative, so
    # ``np.log`` in price{n}_spread_log_return gives NaN (then filled with 0). The
    # other spread ratios use ``+ SMALL_F``; confirm that ``+ 1`` is intended.
    df['price1_spread'] = df['ask_price1'].values / (df['bid_price1'].values + 1) - 1
    df['price2_spread'] = df['ask_price2'].values / (df['bid_price2'].values + 1) - 1

    df['power1_spread'] = np.log1p(df['ask_price1'].values * df['ask_size1'].values) / np.log1p(df['bid_price1'].values * df['bid_size1'].values) - 1
    df['power2_spread'] = np.log1p(df['ask_price2'].values * df['ask_size2'].values) / np.log1p(df['bid_price2'].values * df['bid_size2'].values) - 1

    df['bid_spread'] = df['bid_price1'].values / (df['bid_price2'].values + SMALL_F)
    df['ask_spread'] = df['ask_price1'].values / (df['ask_price2'].values + SMALL_F)
    # Log returns use the raw level-1/level-2 ratios, before the "- 1" shift below.
    df['bid_spread_log_return'] = df.groupby(['time_id'])['bid_spread'].transform(log_return).fillna(0)
    df['ask_spread_log_return'] = df.groupby(['time_id'])['ask_spread'].transform(log_return).fillna(0)
    df['bid_spread'] -= 1
    df['ask_spread'] -= 1

    df['price1_spread_log_return'] = df.groupby(['time_id'])['price1_spread'].transform(log_return).fillna(0)
    df['price2_spread_log_return'] = df.groupby(['time_id'])['price2_spread'].transform(log_return).fillna(0)

    df['total_volume'] = (df['ask_size1'].values + df['ask_size2'].values) + (df['bid_size1'].values + df['bid_size2'].values)
    df['volume_imbalance'] = (df['ask_size1'].values + df['ask_size2'].values) / (df['total_volume'].values + SMALL_F) - 1

    df['bid_ask_size1_mean'] = (df['bid_size1'].values + df['ask_size1'].values) / 2
    return df


def _book_window_stats(df, feature_dict, interval_sec=None):
    """Aggregate ``df`` per time_id with ``feature_dict``, naming columns ``<column>_<func>``.

    When ``interval_sec`` is ``[start, end]``, only rows with
    start <= seconds_in_bucket <= end are used and a ``_<start>_<end>`` suffix is added.
    """
    if interval_sec is not None:
        in_window = (df['seconds_in_bucket'] >= interval_sec[0]) & (df['seconds_in_bucket'] <= interval_sec[1])
        df = df[in_window]
    df_feature = df.groupby(['time_id']).agg(feature_dict)
    df_feature.columns = ['_'.join(col) for col in df_feature.columns]
    if interval_sec is not None:
        df_feature = df_feature.add_suffix(f'_{interval_sec[0]}_{interval_sec[1]}')
    return df_feature.reset_index()


def _replace_with_ratio(df_feature, ask_col, bid_col, out_col):
    """Set ``out_col = ask_col / (bid_col + SMALL_F) - 1`` and drop both source columns."""
    df_feature[out_col] = df_feature[ask_col] / (df_feature[bid_col] + SMALL_F) - 1
    del df_feature[ask_col], df_feature[bid_col]


def _book_ratio_features(df_feature):
    """Replace paired ask/bid aggregates by their ratio minus 1 (in place); returns ``df_feature``."""
    windows = [[150, 600], [300, 600], [450, 600]]
    for i in [1, 2]:
        _replace_with_ratio(df_feature,
                            f'ask_log_return{i}_realized_volatility',
                            f'bid_log_return{i}_realized_volatility',
                            f'ask_bid_log_return{i}_realized_volatility')
        _replace_with_ratio(df_feature,
                            f'diff_ask_log_return{i}_realized_volatility',
                            f'diff_bid_log_return{i}_realized_volatility',
                            f'diff_ask_bid_log_return{i}_realized_volatility')
        # Full bucket first, then each window.
        for suffix in [''] + [f'_{start}_{end}' for start, end in windows]:
            _replace_with_ratio(df_feature,
                                f'ask_price{i}_chg_count_sum{suffix}',
                                f'bid_price{i}_chg_count_sum{suffix}',
                                f'bid_ask_price{i}_chg_per{suffix}')
            _replace_with_ratio(df_feature,
                                f'ask_size{i}_chg_count_sum{suffix}',
                                f'bid_size{i}_chg_count_sum{suffix}',
                                f'bid_ask_size{i}_chg_per{suffix}')
    return df_feature

In [None]:
def trade_preprocessor(file_path):
    """Build per-``time_id`` trade features for a single stock.

    Parameters
    ----------
    file_path : str
        Path of one ``trade_<split>.parquet/stock_id=<id>`` partition. The stock id
        is parsed from the text after ``=``.

    Returns
    -------
    pandas.DataFrame
        One row per ``time_id``. Feature columns are prefixed with ``trade_``;
        ``time_id`` and ``stock_id`` are not prefixed.
    """
    df = pd.read_parquet(file_path)
    df = df.sort_values(['time_id', 'seconds_in_bucket'])
    # Assign back: chained ``fillna(inplace=True)`` does not update ``df`` under copy-on-write.
    df['price'] = df['price'].fillna(1)
    for c in ['size', 'order_count']:
        df[c] = df[c].fillna(0)

    # ``transform`` keeps df's index (``groupby.apply`` adds a time_id level in pandas >= 2.0).
    df['log_return'] = df.groupby('time_id')['price'].transform(log_return).fillna(0)
    df['log_return_half'] = df.groupby('time_id')['price'].transform(log_return_half).fillna(0)
    df['amount'] = df['price'].values * df['size'].values
    # Builtin ``int`` replaces ``np.int`` (removed in NumPy 1.24; it aliased ``int``).
    price_changed = (df['price'].diff() != 0).astype(int)
    size_changed = (df['size'].diff() != 0).astype(int)
    df['price_size_chg_balance'] = price_changed / (size_changed + 1) - 1

    # Drop the first trade of every bucket: its ungrouped ``diff`` is taken against the previous bucket.
    if len(df) > 2:
        df = df.drop(index=df.groupby(['time_id'])['seconds_in_bucket'].idxmin())

    # Aggregations over the whole (remaining) bucket.
    create_feature_dict = {
        'log_return': [realized_volatility],
        'log_return_half': [realized_volatility],
        'price': [spread_ratio],
        'seconds_in_bucket': [count_unique],
        'size': [np.sum],
        'order_count': [np.sum],
        'amount': [np.sum],
        'price_size_chg_balance': [np.mean],
    }
    # Aggregations repeated over the [150, 600], [300, 600] and [450, 600] second windows.
    create_feature_time_dict = {
        'log_return': [realized_volatility],
        'seconds_in_bucket': [count_unique],
        'amount': [np.sum],
        'order_count': [np.sum],
        'price_size_chg_balance': [np.mean],
    }

    df_feature = _trade_window_stats(df, create_feature_dict)
    for interval_sec in [[150, 600], [300, 600], [450, 600]]:
        tmp_df = _trade_window_stats(df, create_feature_time_dict, interval_sec)
        df_feature = df_feature.merge(tmp_df, how='left', on='time_id')

    df_feature = df_feature.add_prefix('trade_')
    df_feature = df_feature.rename(columns={'trade_time_id': 'time_id'})
    df_feature['stock_id'] = int(file_path.split('=')[1])
    return df_feature


def _trade_window_stats(df, feature_dict, interval_sec=None):
    """Aggregate ``df`` per time_id with ``feature_dict``, naming columns ``<column>_<func>``.

    When ``interval_sec`` is ``[start, end]``, only rows with
    start <= seconds_in_bucket <= end are used and a ``_<start>_<end>`` suffix is added.
    """
    if interval_sec is not None:
        in_window = (df['seconds_in_bucket'] >= interval_sec[0]) & (df['seconds_in_bucket'] <= interval_sec[1])
        df = df[in_window]
    df_feature = df.groupby(['time_id']).agg(feature_dict)
    df_feature.columns = ['_'.join(col) for col in df_feature.columns]
    if interval_sec is not None:
        df_feature = df_feature.add_suffix(f'_{interval_sec[0]}_{interval_sec[1]}')
    return df_feature.reset_index()

In [None]:
def preprocessor(list_stock_ids, is_train=True):
    """Build merged book + trade features for every stock id, one joblib task per stock.

    The split name ('train' or 'test') selects the ``book_<split>.parquet`` and
    ``trade_<split>.parquet`` partitions under ``data_dir``. Trade features are
    left-joined onto book features on stock_id/time_id, and all stocks are
    concatenated with a fresh index.
    """
    split = 'train' if is_train else 'test'

    def process_stock(stock_id):
        # Book features define the rows; trade features are attached to them.
        book_path = data_dir + f"book_{split}.parquet/stock_id={stock_id}"
        trade_path = data_dir + f"trade_{split}.parquet/stock_id={stock_id}"
        return pd.merge(book_preprocessor(book_path), trade_preprocessor(trade_path),
                        on=['stock_id', 'time_id'], how='left')

    tasks = (delayed(process_stock)(stock_id) for stock_id in list_stock_ids)
    per_stock_frames = Parallel(n_jobs=-1, verbose=1)(tasks)
    return pd.concat(per_stock_frames, ignore_index=True)

In [None]:
def get_time_col(col, origin_include=True):
    """Return the window column names ``{col}_{start}_{end}`` for the 150/300/450-600s windows.

    When ``origin_include`` is true, ``col`` itself comes first. A new list is returned on every call.
    """
    windows = ((150, 600), (300, 600), (450, 600))
    windowed = [f'{col}_{start}_{end}' for start, end in windows]
    return [col] + windowed if origin_include else windowed

def fillna_feature(df):
    """Fill missing core volatility / trade-count features.

    Each listed full-bucket column is filled with 0. Each of its window columns is
    then filled from the (already filled) full-bucket value. ``df`` is modified
    and returned.
    """
    for c in ['trade_log_return_realized_volatility', 'log_return1_realized_volatility',
              'log_return2_realized_volatility', 'trade_seconds_in_bucket_count_unique']:
        # Assign back instead of chained ``df[c].fillna(..., inplace=True)``: under
        # pandas copy-on-write the chained form only updates a temporary copy.
        df[c] = df[c].fillna(0)
        for window_col in get_time_col(c, False):
            df[window_col] = df[window_col].fillna(df[c])
    return df

def feat_scaling(df):
    """Express windowed features as a fraction of their full-bucket counterpart.

    Each listed base column is cast to float32. Every ``{base}_{start}_{end}``
    window column present in ``df`` is cast to float32 and divided by the base
    column. ``df`` is modified in place and returned.
    """
    base_columns = [
        'log_return1_realized_volatility', 'log_return2_realized_volatility',
        'trade_log_return_realized_volatility', 'trade_seconds_in_bucket_count_unique',
        'trade_size_sum', 'trade_order_count_sum', 'chg_count_sum', 'trade_amount_sum',
        'total_volume_sum', 'seconds_in_bucket_count_unique',
    ]
    for base in base_columns:
        df[base] = df[base].astype('float32')
        for window_col in (w for w in get_time_col(base, False) if w in df.columns):
            df[window_col] = df[window_col].astype('float32') / df[base]
    return df

def book_seconds_scaling(df):
    """Divide the book change count and the trade tick count by the number of distinct book seconds.

    Modifies ``df`` in place and returns it.
    """
    denominator = df['seconds_in_bucket_count_unique']
    for col in ('chg_count_sum', 'trade_seconds_in_bucket_count_unique'):
        df[col] = df[col].astype('float32') / denominator
    return df
def time_fe(df):
    """Add cross-sectional aggregates: statistics over all stocks that share a ``time_id``.

    Each aggregate is merged back onto every row as ``time_id_<agg>_<column>``,
    where ``<agg>`` is the string name or the function's ``__name__``.
    Returns the new frame.
    """
    cols = (get_time_col('log_return1_realized_volatility')
            + get_time_col('log_return2_realized_volatility')
            + ['trade_log_return_realized_volatility'])
    df = _merge_time_id_aggregates(df, cols, ["mean", largest_mean, "std", smallest_mean])

    cols = get_time_col('chg_count_sum') + get_time_col('trade_seconds_in_bucket_count_unique')
    df = _merge_time_id_aggregates(df, cols, ["mean", "std"])

    cols = (['log_return1_minus_smallest_mean', 'power1_spread_mean', 'power2_spread_mean',
             'bid_ask_price1_chg_per', 'bid_ask_price2_chg_per',
             'bid_ask_size1_chg_per', 'bid_ask_size2_chg_per']
            + get_time_col('trade_amount_sum') + get_time_col('trade_order_count_sum')
            + get_time_col('total_volume_sum') + get_time_col('volume_imbalance_mean'))
    df = _merge_time_id_aggregates(df, cols, ["mean"])

    df = _merge_time_id_aggregates(df, ['chg_count_sum', 'trade_order_count_sum'], ["sum"])
    df = _merge_time_id_aggregates(df, ['stock_id'], [count_unique])
    return df


def _merge_time_id_aggregates(df, cols, agg_funcs, agg_col="time_id"):
    """For each function in ``agg_funcs``, left-merge ``df.groupby(agg_col)[cols].agg(func)`` onto ``df``."""
    for agg_func in agg_funcs:
        # Name callables by ``__name__``. Formatting the function object itself put
        # a memory address ("<function largest_mean at 0x...>") into the column name.
        agg_name = agg_func if isinstance(agg_func, str) else agg_func.__name__
        agg_df = df.groupby(agg_col)[cols].agg(agg_func)
        agg_df.columns = [f"{agg_col}_{agg_name}_{col}" for col in agg_df.columns]
        df = df.merge(agg_df.reset_index(), on=agg_col, how="left")
    return df

In [None]:
# Load the base train/test tables (train target already transformed by read_train_test)
(train, test) = read_train_test()

In [None]:
def build_features(base_df, stock_ids, is_train):
    """Attach book/trade features to ``base_df`` and derive the cross-stock features.

    The steps are:
    1. Per-stock features for ``stock_ids``, left-merged on time_id/stock_id.
    2. Fill missing values, window scaling and per-second scaling.
    3. Per-time_id aggregates and ratio features.
    4. Drop raw volume/count columns and clip outlying window price-change ratios.

    Returns the new frame.
    """
    stock_features = preprocessor(stock_ids, is_train)
    df = base_df.merge(stock_features, on=['time_id', 'stock_id'], how='left')
    del stock_features
    df = fillna_feature(df)
    df = feat_scaling(df)
    df = book_seconds_scaling(df)
    df = time_fe(df)

    # Per-stock counts relative to the time_id totals (the total columns are overwritten).
    df['time_id_sum_chg_count_sum'] = df['chg_count_sum'] / (df['time_id_sum_chg_count_sum'] + 1) - 1
    df['time_id_sum_trade_order_count_sum'] = df['trade_order_count_sum'] / (df['time_id_sum_trade_order_count_sum'] + 1) - 1
    df['real_volume_per'] = df['trade_size_sum'] / (df['bid_ask_size1_mean_sum'] + 1) - 1
    df['log_return_half_spread'] = (df['log_return1_half_realized_volatility'] / (df['log_return2_half_realized_volatility'] + SMALL_F)) - 1

    df = df.drop(columns=['time_id_mean_total_volume_sum', 'time_id_mean_trade_amount_sum',
                          'time_id_mean_trade_order_count_sum', 'total_volume_sum', 'trade_size_sum',
                          'trade_order_count_sum', 'bid_ask_size1_mean_sum'])

    # Window price-change ratios above 10 are replaced by the full-bucket ratio.
    for c in ['bid_ask_price1_chg_per', 'bid_ask_price2_chg_per']:
        for window_col in get_time_col(c, False):
            df.loc[df[window_col] > 10, window_col] = df[c]
    return df


# Same pipeline for both splits (test first, as before).
test = build_features(test, test['stock_id'].unique(), is_train=False)
train = build_features(train, train['stock_id'].unique(), is_train=True)

In [None]:
# List every train column that still has missing values, with its NaN count.
missing_per_column = train.isnull().sum()
for col, n_missing in missing_per_column[missing_per_column > 0].items():
    print(col, n_missing)

In [None]:
def train_and_evaluate(train, test, ex_col, submission_file, model_num):
    """Train a LightGBM model with K folds over unique time_ids and write OOF and test predictions.

    Parameters
    ----------
    train : pandas.DataFrame
        Training rows with ``target`` (raw target ** 2 * TARGET_SCALE, see
        ``read_train_test``) and ``time_id``. Mutated: columns containing
        ``'pred'`` are removed and ``pred_lgb{model_num}`` is added.
    test : pandas.DataFrame
        Test rows with ``row_id`` and ``time_id``. A ``target`` column
        (square-rooted, unscaled predictions) is added.
    ex_col : list of str
        Extra columns to exclude from the features.
    submission_file : str
        CSV path that receives the ``row_id, target`` test predictions.
    model_num : int
        Seed for LightGBM and KFold, and part of the model name ``lgb{model_num}``.
        The OOF frame is written to a CSV file literally named ``lgb{model_num}``.

    Returns
    -------
    (test_predictions, oof_predictions) : tuple of numpy.ndarray
        Test predictions after undoing TARGET_SCALE and the square. OOF predictions
        on the training-target scale.
    """
    model_name = 'lgb' + str(model_num)
    pred_name = 'pred_{}'.format(model_name)

    # Remove prediction columns left over from an earlier run on the same frame.
    for c in [col for col in train.columns if 'pred' in col]:
        del train[c]
    # Float from the start: OOF values are accumulated into it with ``+=`` below.
    train[pred_name] = 0.0
    # Exclude ``ex_col`` here too. It is dropped from ``x`` below, so keeping it in
    # ``feat`` made ``x_train[feat]`` raise KeyError whenever ``ex_col`` was non-empty.
    excluded = {'time_id', 'target', pred_name, *ex_col}
    feat = [col for col in train.columns if col not in excluded]

    params = {
        'objective': 'rmse',
        'boosting_type': 'gbdt',
        'max_depth': 6,
        'max_bin': len(train['stock_id'].unique()),
        'min_data_in_leaf': 300,
        'learning_rate': 0.05,
        'subsample': 0.8,
        'subsample_freq': 4,
        'feature_fraction': 0.4,
        'lambda_l1': 0.4,
        'lambda_l2': 0.4,
        'seed': model_num,
        'feature_fraction_seed': model_num,
        'bagging_seed': model_num,
        'drop_seed': model_num,
        'data_random_seed': model_num,
        "tree_learner": 'voting',
        'verbose': -1,
        'metric': 'None', }

    # Split features and target
    x = train.drop(['target', 'time_id'] + list(ex_col), axis=1)
    y = train['target'].values
    x_test = test.drop(['row_id', 'time_id'] + list(ex_col), axis=1)

    oof_predictions = np.zeros(x.shape[0])
    test_predictions = np.zeros(x_test.shape[0])
    n_folds = 7
    # Folds are drawn over unique time_ids, so all stocks of one time_id share a fold.
    kfold = KFold(n_splits=n_folds, random_state=model_num, shuffle=True)
    publisher_train = train['time_id']
    unique_publisher = train['time_id'].unique()
    for fold, (tr_group_idx, va_group_idx) in enumerate(kfold.split(unique_publisher)):
        tr_groups, va_groups = unique_publisher[tr_group_idx], unique_publisher[va_group_idx]
        is_tr = publisher_train.isin(tr_groups).to_numpy()
        is_va = publisher_train.isin(va_groups).to_numpy()

        x_train, x_val = x[is_tr], x[is_va]
        y_train, y_val = y[is_tr], y[is_va]

        print(f'Training fold {fold + 1}, train={len(x_train)} valid={len(x_val)}')
        # Weighting by 1 / y**2 turns the squared-error objective into a squared percentage error.
        train_weights = 1 / np.square(y_train)
        val_weights = 1 / np.square(y_val)
        train_dataset = lgb.Dataset(x_train[feat], y_train, weight=train_weights, categorical_feature=['stock_id'])
        val_dataset = lgb.Dataset(x_val[feat], y_val, weight=val_weights, categorical_feature=['stock_id'])
        model = lgb.train(params=params,
                          train_set=train_dataset,
                          valid_sets=[train_dataset, val_dataset],
                          num_boost_round=10000,
                          feval=feval_rmspe,
                          # LightGBM 4.0 removed the ``early_stopping_rounds`` / ``verbose_eval``
                          # arguments; these callbacks (LightGBM >= 3.3) do the same job.
                          callbacks=[lgb.early_stopping(stopping_rounds=80),
                                     lgb.log_evaluation(period=200)])
        oof_predictions[is_va] = model.predict(x_val[feat])
        oof_predictions[oof_predictions <= SMALL_F] = SMALL_F
        train.loc[is_va, pred_name] += oof_predictions[is_va]
        if len(x_test) > 0:
            # Undo TARGET_SCALE and the square of the training target.
            test_pred = model.predict(x_test[feat]) / TARGET_SCALE
            test_pred[test_pred < SMALL_F] = SMALL_F
            test_predictions += np.sqrt(test_pred) / n_folds
    train.to_csv(model_name, index=False)
    # sqrt(y / TARGET_SCALE) is the square-rooted, unscaled quantity. The old
    # sqrt(y) / TARGET_SCALE mis-scaled it; RMSPE itself is scale-invariant.
    rmspe_score = rmspe(np.sqrt(y / TARGET_SCALE), np.sqrt(oof_predictions / TARGET_SCALE))
    print(f'Our out of folds RMSPE is {rmspe_score}')
    test['target'] = test_predictions
    test[['row_id', 'target']].to_csv(submission_file, index=False)
    # Importance plot of the model from the last fold.
    lgb.plot_importance(model, max_num_features=50, figsize=(10, 10))
    return test_predictions, oof_predictions

In [None]:
# Feature inventory: the column count, then one column name per line.
print(len(train.columns))
print(*train.columns, sep='\n')

In [None]:
# Split time_ids by market regime. Feature weights change with the market trend,
# and within a regime the target distribution should be closer to normal.
# Regime score = sum of `col` over the stocks of a time_id. There are two
# overlapping groups, with thresholds taken from the train scores:
#   0: score <= 80th percentile
#   1: score >= 20th percentile
time_quantile_dict = {}
train_time_group_dict = {}
col = 'log_return1_realized_volatility'
# Aggregate only `col`. A full-frame ``.sum()`` also summed every other column
# (and may concatenate string columns such as test's row_id).
df_feature = train.groupby('time_id')[[col]].sum().reset_index()
for p in [0.2, 0.5, 0.8]:
    time_quantile_dict[p] = df_feature[col].quantile(p)

train_time_group_dict[0] = df_feature.loc[df_feature[col] <= time_quantile_dict[0.8], 'time_id'].unique()
print('0.8 under=', len(train_time_group_dict[0]))
train_time_group_dict[1] = df_feature.loc[df_feature[col] >= time_quantile_dict[0.2], 'time_id'].unique()
print('0.2 over=', len(train_time_group_dict[1]))
print(len(set(np.concatenate([train_time_group_dict[0], train_time_group_dict[1]]))))

# NOTE(review): the thresholds are sums over every stock of a *train* time_id. A
# test time_id with fewer stocks gets a smaller sum and leans towards group 0.
# A per-stock mean would not depend on the stock count; confirm which is intended.
test_time_group_dict = {}
df_feature = test.groupby('time_id')[[col]].sum().reset_index()
test_time_group_dict[0] = df_feature.loc[df_feature[col] <= time_quantile_dict[0.8], 'time_id'].unique()
print('0.8 under=', len(test_time_group_dict[0]))
test_time_group_dict[1] = df_feature.loc[df_feature[col] >= time_quantile_dict[0.2], 'time_id'].unique()
print('0.2 over=', len(test_time_group_dict[1]))
print(len(set(np.concatenate([test_time_group_dict[0], test_time_group_dict[1]]))))

In [None]:
# First run: one model per regime group, seeded with the group index; writes group{g}.csv.
for g in range(len(train_time_group_dict)):
    group_train = train.loc[train['time_id'].isin(train_time_group_dict[g])].reset_index(drop=True)
    group_test = test[test['time_id'].isin(test_time_group_dict[g])].reset_index(drop=True)
    test_predictions, valid_predictions = train_and_evaluate(group_train, group_test, [], f"group{g}.csv", g)
    del group_train, group_test

In [None]:
# Combine the first run's out-of-fold predictions from the regime models (files lgb0, lgb1, ...).
# The groups overlap, so a training row has one or two predictions; average whichever exist.
# The outer join keeps rows that belong only to a later group. A left join on
# group 0 silently dropped them from the score.
result = {}
for g in range(len(train_time_group_dict)):
    result[g] = pd.read_csv(f'lgb{g}')[['stock_id', 'time_id', f'pred_lgb{g}', 'target']]

combined = result[0]
for g in range(1, len(train_time_group_dict)):
    combined = combined.merge(result[g], on=['stock_id', 'time_id'], how='outer', suffixes=('', f'_{g}'))
    # The same training row has the same target in every file; keep whichever side exists.
    other_target = combined.pop(f'target_{g}')
    combined['target'] = combined['target'].fillna(other_target)

combined['pred'] = combined[[f'pred_lgb{g}' for g in range(len(train_time_group_dict))]].mean(axis=1)
result[0] = combined
print(rmspe(np.sqrt(result[0]['target'].values), np.sqrt(result[0]['pred'].values)))

In [None]:
# Per-stock mean target vs. mean OOF prediction (first run), on the training-target scale.
per_stock_means = result[0][['target', 'pred', 'stock_id']].groupby('stock_id').mean()
ax = per_stock_means.plot(title='Run 1 OOF: per-stock mean target vs. prediction')
ax.set_xlabel('stock_id')
ax.set_ylabel('raw target ** 2 * TARGET_SCALE');

In [None]:
# Average the regime models' test predictions per row_id. A row_id covered by a
# single group keeps that group's prediction. This works for any number of groups,
# and avoids chained ``fillna(inplace=True)``, which copy-on-write turns into a no-op.
group_preds = [
    pd.read_csv(f'group{g}.csv', index_col='row_id')['target'].rename(f'target_{g}')
    for g in range(len(test_time_group_dict))
]
sub = (pd.concat(group_preds, axis=1)  # outer-joins the groups on row_id
       .mean(axis=1)
       .rename('target')
       .rename_axis('row_id')
       .reset_index())
# Write in the original test row order.
test[['row_id']].merge(sub[['row_id', 'target']], on='row_id', how='left').to_csv('submission.csv', index=False)
pd.read_csv('submission.csv', index_col='row_id')

In [None]:
# Second run with different seeds (model_num = g + 10). This overwrites group{g}.csv
# and writes the OOF files lgb10, lgb11, ...
for g in range(len(train_time_group_dict)):
    group_train = train.loc[train['time_id'].isin(train_time_group_dict[g])].reset_index(drop=True)
    group_test = test[test['time_id'].isin(test_time_group_dict[g])].reset_index(drop=True)
    test_predictions, valid_predictions = train_and_evaluate(group_train, group_test, [], f"group{g}.csv", g + 10)
    del group_train, group_test

In [None]:
# Combine the second run's out-of-fold predictions (model_num = g + 10, files lgb10, lgb11, ...).
# Bug fix: group 0 of this run is 'lgb10'; 'lgb0' (the first run) was read before.
# Rows covered by several overlapping groups get the average of their predictions.
# The outer join keeps rows that belong only to a later group.
run_offset = 10
result_2 = {}
for g in range(len(train_time_group_dict)):
    model_id = g + run_offset
    result_2[g] = pd.read_csv(f'lgb{model_id}')[['stock_id', 'time_id', f'pred_lgb{model_id}', 'target']]

combined_2 = result_2[0]
for g in range(1, len(train_time_group_dict)):
    combined_2 = combined_2.merge(result_2[g], on=['stock_id', 'time_id'], how='outer', suffixes=('', f'_{g}'))
    # The same training row has the same target in every file; keep whichever side exists.
    other_target = combined_2.pop(f'target_{g}')
    combined_2['target'] = combined_2['target'].fillna(other_target)

combined_2['pred'] = combined_2[[f'pred_lgb{g + run_offset}' for g in range(len(train_time_group_dict))]].mean(axis=1)
result_2[0] = combined_2
print(rmspe(np.sqrt(result_2[0]['target'].values), np.sqrt(result_2[0]['pred'].values)))

In [None]:
# Per-stock mean target vs. mean OOF prediction (second run), on the training-target scale.
per_stock_means_2 = result_2[0][['target', 'pred', 'stock_id']].groupby('stock_id').mean()
ax = per_stock_means_2.plot(title='Run 2 OOF: per-stock mean target vs. prediction')
ax.set_xlabel('stock_id')
ax.set_ylabel('raw target ** 2 * TARGET_SCALE');

In [None]:
# Average the second run's regime-model test predictions per row_id (group{g}.csv
# now holds the second run). A row_id covered by a single group keeps that
# group's prediction. This works for any number of groups and avoids chained
# ``fillna(inplace=True)``, which copy-on-write turns into a no-op.
group_preds = [
    pd.read_csv(f'group{g}.csv', index_col='row_id')['target'].rename(f'target_{g}')
    for g in range(len(test_time_group_dict))
]
sub = (pd.concat(group_preds, axis=1)  # outer-joins the groups on row_id
       .mean(axis=1)
       .rename('target')
       .rename_axis('row_id')
       .reset_index())
# Write in the original test row order.
test[['row_id']].merge(sub[['row_id', 'target']], on='row_id', how='left').to_csv('submission_2.csv', index=False)
pd.read_csv('submission_2.csv', index_col='row_id')

In [None]:
# Final blend: simple per-row_id average of the two training runs' submissions.
sub, sub2 = (pd.read_csv(path, index_col='row_id') for path in ('submission.csv', 'submission_2.csv'))
sub['target'] = sub['target'].add(sub2['target']).div(2)
sub.to_csv('submission.csv')
pd.read_csv('submission.csv', index_col='row_id')