In [1]:
import os
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from joblib import Parallel, delayed
from sklearn.cluster import KMeans
from tqdm.notebook import tqdm
%matplotlib inline


# load data

In [2]:
data_dir = '/home/lzhao/data/tmp/optiver'


In [3]:
train_df = pd.read_csv(os.path.join(data_dir, 'train.csv'))
test_df = pd.read_csv(os.path.join(data_dir, 'test.csv'))
#book_example = pd.read_parquet(os.path.join(data_dir, 'book_train.parquet/stock_id=0'))
#trade_example = pd.read_parquet(os.path.join(data_dir, 'trade_train.parquet/stock_id=0'))


In [4]:
train_df['row_id'] = train_df['stock_id'].astype(
    str) + '-' + train_df['time_id'].astype(str)
test_df['row_id'] = test_df['stock_id'].astype(
    str) + '-' + test_df['time_id'].astype(str)


In [5]:
train_df.head(5)


Unnamed: 0,stock_id,time_id,target,row_id
0,0,5,0.004136,0-5
1,0,11,0.001445,0-11
2,0,16,0.002168,0-16
3,0,31,0.002195,0-31
4,0,62,0.001747,0-62


In [6]:
len(train_df.stock_id.unique())


112

In [7]:
train_df.time_id.describe()


count    428932.000000
mean      16038.972721
std        9365.103706
min           5.000000
25%        7854.000000
50%       15853.000000
75%       23994.000000
max       32767.000000
Name: time_id, dtype: float64

In [9]:
def get_cluster_labels(train_p):
    train_p = train_p.pivot(
        index='time_id', columns='stock_id', values='target')
    corr = train_p.corr()
    ids = corr.index
    kmeans = KMeans(n_clusters=7, random_state=0).fit(corr.values)
    print(kmeans.labels_)
    l = []
    for n in range(7):
        l.append([(x-1) for x in ((ids+1)*(kmeans.labels_ == n)) if x > 0])
    return l


cluster_labels = get_cluster_labels(train_df)


[3 4 3 1 3 0 1 3 5 1 0 4 3 3 3 3 3 1 3 3 6 0 0 3 6 3 0 3 6 3 6 3 3 0 4 6 3
 6 3 3 3 0 3 3 0 4 3 3 3 4 0 6 6 6 1 4 1 3 0 3 3 0 3 0 0 6 4 0 6 4 5 2 6 4
 4 3 4 0 6 4 4 3 0 0 4 4 6 6 3 4 0 3 3 3 3 6 0 6 6 0 0 3 0 0 3 3 0 0 3 4 3
 4]


# feature engineer

In [8]:
# 计算对数收益率
def log_return(list_stock_prices):
    '''log(s2/s1) = log(s2) - log(s1)'''
    return np.log(list_stock_prices).diff()

# 计算已实现波动率


def realized_volatility(series_log_return):
    return np.sqrt(np.sum(series_log_return**2))

# 计算wap价格


def calc_wap1(df):
    wap = (df['bid_price1'] * df['ask_size1'] + df['ask_price1']
           * df['bid_size1']) / (df['bid_size1'] + df['ask_size1'])
    return wap


def calc_wap2(df):
    wap = (df['bid_price2'] * df['ask_size2'] + df['ask_price2']
           * df['bid_size2']) / (df['bid_size2'] + df['ask_size2'])
    return wap


def count_unique(series):
    return len(np.unique(series))


In [10]:
def preprocessor_book(file_path):
    df = pd.read_parquet(file_path)
    # Calculate Wap
    df['wap1'] = calc_wap1(df)
    df['wap2'] = calc_wap2(df)

    # Calculate log returns
    df['log_return1'] = df.groupby(['time_id'])['wap1'].apply(log_return)
    df['log_return2'] = df.groupby(['time_id'])['wap2'].apply(log_return)

    # Calculate wap balance
    df['wap_balance'] = abs(df['wap1'] - df['wap2'])

    # Calculate spread
    df['price_spread1'] = (df['ask_price1'] - df['bid_price1']) / \
        ((df['ask_price1'] + df['bid_price1']) / 2)
    df['price_spread2'] = (df['ask_price2'] - df['bid_price2']) / \
        ((df['ask_price2'] + df['bid_price2']) / 2)
    df['bid_spread'] = df['bid_price1'] - df['bid_price2']
    df['ask_spread'] = df['ask_price1'] - df['ask_price2']
    df["bid_ask_spread"] = abs(df['bid_spread'] - df['ask_spread'])
    df['total_volume'] = (df['ask_size1'] + df['ask_size2']) + \
        (df['bid_size1'] + df['bid_size2'])
    df['volume_imbalance'] = abs(
        (df['ask_size1'] + df['ask_size2']) - (df['bid_size1'] + df['bid_size2']))

    # Dict for aggregations
    create_feature_dict = {
        'wap1': [np.sum, np.mean, np.std],
        'wap2': [np.sum, np.mean, np.std],
        'log_return1': [np.sum, realized_volatility, np.mean, np.std],
        'log_return2': [np.sum, realized_volatility, np.mean, np.std],
        'wap_balance': [np.sum, np.mean, np.std],
        'price_spread1': [np.sum, np.mean, np.std],
        'price_spread2': [np.sum, np.mean, np.std],
        'bid_spread': [np.sum, np.mean, np.std],
        'ask_spread': [np.sum, np.mean, np.std],
        'total_volume': [np.sum, np.mean, np.std],
        'volume_imbalance': [np.sum, np.mean, np.std],
        "bid_ask_spread": [np.sum, np.mean, np.std],
    }

    def get_stats_window(seconds_in_bucket, add_suffix=False):
        # Function to get group stats for different windows (seconds in bucket)

        # Group by the window
        df_feature = df[df['seconds_in_bucket'] >= seconds_in_bucket].groupby(
            ['time_id']).agg(create_feature_dict).reset_index()

        # Rename columns joining suffix
        df_feature.columns = ['_'.join(col) for col in df_feature.columns]

        # Add a suffix to differentiate windows
        if add_suffix:
            df_feature = df_feature.add_suffix('_' + str(seconds_in_bucket))
        return df_feature

    # Get the stats for different windows
    df_feature = get_stats_window(seconds_in_bucket=0, add_suffix=False)
    df_feature_400 = get_stats_window(seconds_in_bucket=400, add_suffix=True)
    df_feature_300 = get_stats_window(seconds_in_bucket=300, add_suffix=True)
    df_feature_200 = get_stats_window(seconds_in_bucket=200, add_suffix=True)

    # Merge all
    df_feature = df_feature.merge(
        df_feature_400, how='left', left_on='time_id_', right_on='time_id__400')
    df_feature = df_feature.merge(
        df_feature_300, how='left', left_on='time_id_', right_on='time_id__300')
    df_feature = df_feature.merge(
        df_feature_200, how='left', left_on='time_id_', right_on='time_id__200')

    # Drop unnecesary time_ids
    df_feature.drop(['time_id__400', 'time_id__300',
                    'time_id__200'], axis=1, inplace=True)

    # Create row_id so we can merge
    stock_id = file_path.split('=')[1]
    df_feature['row_id'] = df_feature['time_id_'].apply(
        lambda x: f'{stock_id}-{x}')
    df_feature.drop(['time_id_'], axis=1, inplace=True)
    return df_feature


In [11]:
def preprocessor_trade(file_path):
    df = pd.read_parquet(file_path)
    df['log_return'] = df.groupby('time_id').price.apply(log_return)

    # Dict for aggregations
    create_feature_dict = {
        'log_return': [realized_volatility],
        'seconds_in_bucket': [count_unique],
        'size': [np.sum, realized_volatility, np.mean, np.std, np.max, np.min],
        'order_count': [np.mean, np.sum, np.max],
    }

    def get_stats_window(seconds_in_bucket, add_suffix=False):
        # Function to get group stats for different windows (seconds in bucket)

        # Group by the window
        df_feature = df[df['seconds_in_bucket'] >= seconds_in_bucket].groupby(
            ['time_id']).agg(create_feature_dict).reset_index()

        # Rename columns joining suffix
        df_feature.columns = ['_'.join(col) for col in df_feature.columns]

        # Add a suffix to differentiate windows
        if add_suffix:
            df_feature = df_feature.add_suffix('_' + str(seconds_in_bucket))
        return df_feature

    # Get the stats for different windows
    df_feature = get_stats_window(seconds_in_bucket=0, add_suffix=False)
    df_feature_400 = get_stats_window(seconds_in_bucket=400, add_suffix=True)
    df_feature_300 = get_stats_window(seconds_in_bucket=300, add_suffix=True)
    df_feature_200 = get_stats_window(seconds_in_bucket=200, add_suffix=True)

    def tendency(price, vol):
        df_diff = np.diff(price)
        val = (df_diff/price[1:])*100
        power = np.sum(val*vol[1:])
        return(power)

    lis = []
    for n_time_id in df['time_id'].unique():
        df_id = df[df['time_id'] == n_time_id]
        tendencyV = tendency(df_id['price'].values, df_id['size'].values)
        f_max = np.sum(df_id['price'].values > np.mean(df_id['price'].values))
        f_min = np.sum(df_id['price'].values < np.mean(df_id['price'].values))
        df_max = np.sum(np.diff(df_id['price'].values) > 0)
        df_min = np.sum(np.diff(df_id['price'].values) < 0)
        abs_diff = np.median(
            np.abs(df_id['price'].values - np.mean(df_id['price'].values)))
        energy = np.mean(df_id['price'].values**2)
        iqr_p = np.percentile(df_id['price'].values, 75) - \
            np.percentile(df_id['price'].values, 25)
        abs_diff_v = np.median(
            np.abs(df_id['size'].values - np.mean(df_id['size'].values)))
        energy_v = np.sum(df_id['size'].values**2)
        iqr_p_v = np.percentile(
            df_id['size'].values, 75) - np.percentile(df_id['size'].values, 25)

        lis.append({'time_id': n_time_id, 'tendency': tendencyV, 'f_max': f_max, 'f_min': f_min, 'df_max': df_max, 'df_min': df_min,
                   'abs_diff': abs_diff, 'energy': energy, 'iqr_p': iqr_p, 'abs_diff_v': abs_diff_v, 'energy_v': energy_v, 'iqr_p_v': iqr_p_v})

    df_lr = pd.DataFrame(lis)
    df_feature = df_feature.merge(
        df_lr, how='left', left_on='time_id_', right_on='time_id')

    # Merge all
    df_feature = df_feature.merge(
        df_feature_400, how='left', left_on='time_id_', right_on='time_id__400')
    df_feature = df_feature.merge(
        df_feature_300, how='left', left_on='time_id_', right_on='time_id__300')
    df_feature = df_feature.merge(
        df_feature_200, how='left', left_on='time_id_', right_on='time_id__200')

    # Drop unnecesary time_ids
    df_feature.drop(['time_id__400', 'time_id__300',
                    'time_id__200', 'time_id'], axis=1, inplace=True)
    df_feature = df_feature.add_prefix('trade_')
    stock_id = file_path.split('=')[1]
    df_feature['row_id'] = df_feature['trade_time_id_'].apply(
        lambda x: f'{stock_id}-{x}')
    df_feature.drop(['trade_time_id_'], axis=1, inplace=True)

    return df_feature


In [12]:
def get_time_stock(df):
    # Function to get group stats for the stock_id and time_id

    # Get realized volatility columns
    vol_cols = ['log_return1_realized_volatility', 'log_return2_realized_volatility', 'log_return1_realized_volatility_400', 'log_return2_realized_volatility_400',
                'log_return1_realized_volatility_300', 'log_return2_realized_volatility_300', 'log_return1_realized_volatility_200', 'log_return2_realized_volatility_200',
                'trade_log_return_realized_volatility', 'trade_log_return_realized_volatility_400', 'trade_log_return_realized_volatility_300', 'trade_log_return_realized_volatility_200']

    # Group by the stock id
    df_stock_id = df.groupby(['stock_id'])[vol_cols].agg(
        ['mean', 'std', 'max', 'min', ]).reset_index()

    # Rename columns joining suffix
    df_stock_id.columns = ['_'.join(col) for col in df_stock_id.columns]
    df_stock_id = df_stock_id.add_suffix('_' + 'stock')

    # Group by the stock id
    df_time_id = df.groupby(['time_id'])[vol_cols].agg(
        ['mean', 'std', 'max', 'min', ]).reset_index()

    # Rename columns joining suffix
    df_time_id.columns = ['_'.join(col) for col in df_time_id.columns]
    df_time_id = df_time_id.add_suffix('_' + 'time')

    # Merge with original dataframe
    df = df.merge(df_stock_id, how='left', left_on=[
                  'stock_id'], right_on=['stock_id__stock'])
    df = df.merge(df_time_id, how='left', left_on=[
                  'time_id'], right_on=['time_id__time'])
    df.drop(['stock_id__stock', 'time_id__time'], axis=1, inplace=True)

    return df


In [13]:
def preprocessor(list_stock_ids, is_train=True):
    # Funtion to make preprocessing function in parallel (for each stock id)

    # Parrallel for loop
    def for_joblib(stock_id):
        # Train
        if is_train:
            file_path_book = os.path.join(
                data_dir, "book_train.parquet/stock_id=" + str(stock_id))
            file_path_trade = os.path.join(
                data_dir, "trade_train.parquet/stock_id=" + str(stock_id))
        # Test
        else:
            file_path_book = os.path.join(
                data_dir, "book_test.parquet/stock_id=" + str(stock_id))
            file_path_trade = os.path.join(
                data_dir, "trade_test.parquet/stock_id=" + str(stock_id))

        # Preprocess book and trade data and merge them
        #df_tmp = preprocessor_book(file_path_book)
        df_tmp = pd.merge(preprocessor_book(file_path_book), preprocessor_trade(
            file_path_trade), on='row_id', how='left')

        # Return the merge dataframe
        return df_tmp

    # Use parallel api to call paralle for loop
    df = Parallel(n_jobs=-1, verbose=1)(delayed(for_joblib)(stock_id)
                                        for stock_id in list_stock_ids)

    # Concatenate all the dataframes that return from Parallel
    df = pd.concat(df, ignore_index=True)

    return df


In [14]:
train_stock_ids = train_df['stock_id'].unique()
train_ = preprocessor(train_stock_ids, is_train=True)


[Parallel(n_jobs=-1)]: Using backend LokyBackend with 40 concurrent workers.
[Parallel(n_jobs=-1)]: Done 112 out of 112 | elapsed:  2.4min finished


In [15]:
train = train_df.merge(train_, on=['row_id'], how='left')
train = get_time_stock(train)


In [17]:
num_col = train.columns.difference(
    ['time_id', 'target', 'row_id', 'stock_id']).tolist()
num_col = [col for col in num_col if '_stock' not in col]
cat_col = 'stock_id'
fea_col = [cat_col]+num_col
print(len(fea_col), len(num_col), cat_col)
print(np.nanmean(train[num_col], axis=0).round(5).tolist())


256 255 stock_id
[-0.0002, -0.0002, -0.0002, -0.0002, 0.00012, 0.00012, 0.00011, 0.00011, -0.0764, -0.04995, -0.03724, -0.02469, 0.0004, 0.00039, 0.00039, 0.00039, 0.00017, 0.00016, 0.00016, 0.00016, 0.15191, 0.09935, 0.07406, 0.04911, 0.0002, 0.0002, 0.0002, 0.00019, 0.00012, 0.00011, 0.00011, 0.00011, 0.07551, 0.0494, 0.03682, 0.02442, -0.0, 0.0, 0.0, 0.0, 0.00423, 0.00334, 0.01001, 0.00334, 0.00109, 0.00153, 0.00285, 0.0087, 0.00285, 0.00091, 0.00133, 0.0023, 0.0072, 0.0023, 0.0007, 0.0011, 0.01244, 0.00423, 0.00139, 0.0019, 0.00023, 0.00022, 0.00022, 0.00022, -1e-05, 0.0, 1e-05, 1e-05, -0.0, 0.0, 0.0, 0.0, 0.00581, 0.00458, 0.01428, 0.00458, 0.00123, 0.00226, 0.00392, 0.01241, 0.00392, 0.00102, 0.00196, 0.00315, 0.01023, 0.00315, 0.00077, 0.00162, 0.01766, 0.00581, 0.00158, 0.00281, 0.00032, 0.00031, 0.0003, 0.0003, -1e-05, 0.0, 1e-05, 1e-05, 0.00066, 0.00065, 0.00064, 0.00064, 0.00022, 0.0002, 0.0002, 0.00019, 0.22264, 0.14366, 0.10655, 0.07036, 0.00106, 0.00104, 0.00103, 0.00103,

In [18]:
def load_book_data_by_stock_id(stock_id, is_train=True):
    if is_train == True:
        df = pd.read_parquet(os.path.join(
            data_dir, "book_train.parquet/stock_id=" + str(stock_id)))
    else:
        df = pd.read_parquet(os.path.join(
            data_dir, "book_test.parquet/stock_id=" + str(stock_id)))
    return df


def calc_price(df):
    diff = abs(
        df[['bid_price1', 'ask_price1', 'bid_price2', 'ask_price2']].diff())
    min_diff = np.nanmin(diff.where(lambda x: x > 0))
    n_ticks = (diff / min_diff).round()
    scale = 0.01 / np.nanmean(diff / n_ticks)
    return scale


def calc_prices(stock_id, is_train):
    try:
        book = load_book_data_by_stock_id(stock_id, is_train)
    except:
        return pd.DataFrame()
    book['wap1'] = calc_wap1(book)
    df = book.groupby('time_id').apply(
        calc_price).to_frame('price').reset_index()
    df['stock_id'] = stock_id
    df['first_wap'] = df['price'] * \
        book.groupby('time_id')['wap1'].first().values
    df['last_wap'] = df['price'] * \
        book.groupby('time_id')['wap1'].last().values
    df['mean_wap'] = df['price'] * \
        book.groupby('time_id')['wap1'].mean().values
    return df


In [74]:
prices_df1 = pd.concat(Parallel(n_jobs=4, verbose=51)(
    delayed(calc_prices)(r, False) for r in train_stock_ids))
prices_df2 = pd.concat(Parallel(n_jobs=4, verbose=51)(
    delayed(calc_prices)(r, True) for r in train_stock_ids))


[Parallel(n_jobs=4)]: Using backend LokyBackend with 4 concurrent workers.
[Parallel(n_jobs=4)]: Done   1 tasks      | elapsed:    0.6s
[Parallel(n_jobs=4)]: Done   2 tasks      | elapsed:    0.6s
[Parallel(n_jobs=4)]: Done   3 tasks      | elapsed:    0.6s
[Parallel(n_jobs=4)]: Done   4 tasks      | elapsed:    0.6s
[Parallel(n_jobs=4)]: Done   5 tasks      | elapsed:    0.6s
[Parallel(n_jobs=4)]: Done   6 tasks      | elapsed:    0.6s
[Parallel(n_jobs=4)]: Done   7 tasks      | elapsed:    0.6s
[Parallel(n_jobs=4)]: Done   8 tasks      | elapsed:    0.6s
[Parallel(n_jobs=4)]: Done   9 tasks      | elapsed:    0.6s
[Parallel(n_jobs=4)]: Done  10 tasks      | elapsed:    0.6s
[Parallel(n_jobs=4)]: Done  11 tasks      | elapsed:    0.6s
[Parallel(n_jobs=4)]: Done  12 tasks      | elapsed:    0.6s
[Parallel(n_jobs=4)]: Batch computation too fast (0.1858s.) Setting batch_size=2.
[Parallel(n_jobs=4)]: Done  13 tasks      | elapsed:    0.6s
[Parallel(n_jobs=4)]: Done  14 tasks      | elapse

In [75]:
prices_df = pd.concat([prices_df1, prices_df2])


In [77]:
prices_df.shape


(428933, 6)

In [78]:
first_df = prices_df[['time_id', 'stock_id', 'first_wap']].pivot(
    'time_id', 'stock_id', 'first_wap')


In [79]:
last_df = prices_df[['time_id', 'stock_id', 'last_wap']].pivot(
    'time_id', 'stock_id', 'last_wap')


In [80]:
first_df = first_df.fillna(first_df.mean())
last_df = last_df.fillna(first_df.mean())


In [81]:
def get_nearby_time_id(query_df, template_df):
    def get_nearby_time_id_(time_id):
        template_df_ = template_df.drop(time_id, axis=1)
        query = query_df[time_id].values.repeat(
            len(template_df_.columns)).reshape(-1, len(template_df_.columns))
        diffs = np.square((template_df_ - query)/query).sum(axis=0)
        diffs = diffs.sort_values()[:2].reset_index().rename(
            columns={'time_id': 'tid', 0: 'loss'})
        diffs['time_id'] = time_id
        return diffs
    edge_df = pd.concat(Parallel(n_jobs=4, verbose=1)(
        delayed(get_nearby_time_id_)(time_id) for time_id in tqdm(query_df.columns)))
    edge_df = edge_df[['time_id', 'tid', 'loss']]
    edge_df.columns = ['a', 'b', 'w']
    return edge_df


In [82]:
last_edge_df = get_nearby_time_id(last_df.T, first_df.T)
first_edge_df = get_nearby_time_id(first_df.T, last_df.T)


  0%|          | 0/3831 [00:00<?, ?it/s]

[Parallel(n_jobs=4)]: Using backend LokyBackend with 4 concurrent workers.
[Parallel(n_jobs=4)]: Done 116 tasks      | elapsed:    1.3s
[Parallel(n_jobs=4)]: Done 1316 tasks      | elapsed:    7.6s
[Parallel(n_jobs=4)]: Done 3316 tasks      | elapsed:   17.6s
[Parallel(n_jobs=4)]: Done 3831 out of 3831 | elapsed:   20.2s finished


  0%|          | 0/3831 [00:00<?, ?it/s]

[Parallel(n_jobs=4)]: Using backend LokyBackend with 4 concurrent workers.
[Parallel(n_jobs=4)]: Done 200 tasks      | elapsed:    1.1s
[Parallel(n_jobs=4)]: Done 1400 tasks      | elapsed:    7.0s
[Parallel(n_jobs=4)]: Done 3400 tasks      | elapsed:   16.7s
[Parallel(n_jobs=4)]: Done 3831 out of 3831 | elapsed:   18.9s finished


In [83]:
edge_df = pd.concat([first_edge_df, last_edge_df])
edge_df = edge_df.groupby(['a', 'b'])['w'].min().to_frame('w').reset_index()


In [72]:
edge_df.head(5)


Unnamed: 0,a,b,w
0,5,1205,0.001942
1,5,26708,0.00129
2,5,30183,0.000688
3,11,2811,0.000387
4,11,29583,0.000537


In [84]:
edge_df.shape


(10119, 3)

In [85]:
THRD = 0.02
edge_df = edge_df.query('w < @THRD').reset_index(drop=True)


In [86]:
edge_df.shape


(9768, 3)

In [87]:
'''拓展到所有stock'''
edge_dfs = []
for stock_id in train_stock_ids:
    edge_df_ = edge_df.copy()
    edge_df_['a'] = str(stock_id)+'-' + edge_df.a.astype(str)
    edge_df_['b'] = str(stock_id)+'-' + edge_df.b.astype(str)
    edge_dfs.append(edge_df_)
edge_df = pd.concat(edge_dfs).reset_index(drop=True)
# edge_df


In [88]:
edge_df.shape


(1094016, 3)