In [None]:
from typing import Sequence, Tuple
import pandas as pd

In [None]:
class MockApi:
    def __init__(self, input_paths: Sequence[str], gid_col: str, export_gid_col: bool):
        '''
        YOU MUST UPDATE THE FIRST THREE LINES of this method.
        They've been intentionally left in an invalid state.

        Variables to set:
            input_paths: a list of two or more paths to the csv files to be served
            group_id_column: the column that identifies which groups of rows the API should serve.
                A call to iter_test serves all rows of all dataframes with the current group ID value.
            export_group_id_column: if true, the dataframes iter_test serves will include the group_id_column values.
        '''
        self.input_paths: Sequence[str] = input_paths
        self.group_id_column: str = gid_col
        self.export_group_id_column: bool = export_gid_col
        # iter_test is only designed to support at least two dataframes, such as test and sample_submission
        assert len(self.input_paths) >= 2

        self._status = 'initialized'
        self.predictions = []

    def iter_test(self) -> Tuple[pd.DataFrame]:
        '''
        Loads all of the dataframes specified in self.input_paths,
        then yields all rows in those dataframes that equal the current self.group_id_column value.
        '''
        if self._status != 'initialized':
            raise Exception('WARNING: the real API can only iterate over `iter_test()` once.')

        dataframes = []
        for pth in self.input_paths:
            dataframes.append(pd.read_csv(pth, low_memory=False))
        group_order = dataframes[0][self.group_id_column].drop_duplicates().tolist()
        dataframes = [df.set_index(self.group_id_column) for df in dataframes]

        for group_id in group_order:
            self._status = 'prediction_needed'
            current_data = []
            for df in dataframes:
                cur_df = df.loc[group_id].copy()
                # returning single line dataframes from df.loc requires special handling
                if not isinstance(cur_df, pd.DataFrame):
                    cur_df = pd.DataFrame({a: b for a, b in zip(cur_df.index.values, cur_df.values)}, index=[group_id])
                    cur_df.index.name = self.group_id_column
                cur_df = cur_df.reset_index(drop=not(self.export_group_id_column))
                current_data.append(cur_df)
            yield tuple(current_data)

            while self._status != 'prediction_received':
                print('You must call `predict()` successfully before you can continue with `iter_test()`', flush=True)
                yield None

        with open('submission.csv', 'w') as f_open:
            pd.concat(self.predictions).to_csv(f_open, index=False)
        self._status = 'finished'

    def predict(self, user_predictions: pd.DataFrame):
        '''
        Accepts and stores the user's predictions and unlocks iter_test once that is done
        '''
        if self._status == 'finished':
            raise Exception('You have already made predictions for the full test set.')
        if self._status != 'prediction_needed':
            raise Exception('You must get the next test sample from `iter_test()` first.')
        if not isinstance(user_predictions, pd.DataFrame):
            raise Exception('You must provide a DataFrame.')

        self.predictions.append(user_predictions)
        self._status = 'prediction_received'
        
    def reset_status(self):
        self._status = 'initialized'


def make_env(input_paths: Sequence[str], gid_col: str, export_gid_col: bool):
    return MockApi(input_paths, gid_col, export_gid_col)

In [None]:
def gen_features_online(test_df, cache, feature_dicts, max_ts_len, ):
    
    prices = feature_dicts['prices']
    sizes = feature_dicts['sizes']
    categories = feature_dicts['categories']
    
    df_v1, v1_features, v1_feature_category = gen_v1_features(test_df, prices)
    feature_dicts['v1_features'] = v1_features
    feature_dicts['v1_feature_category'] = v1_feature_category
    
    df_v2, v2_features = gen_v2_features(test_df, sizes)
    
    
    
    

In [None]:
input_files = ['test.csv', 'revealed_targets.csv', 'sample_submission.csv']
input_paths = [f'/home/lishi/projects/Competition/kaggle_2023/data/example_test_files/{x}' for x in input_files]

env = make_env(input_paths, gid_col='time_id', export_gid_col=True)

for (test_df, revealed_targets, sample_prediction_df) in env.iter_test():
    print(test_df)
    print(sample_prediction_df)
    env.predict(sample_prediction_df)
    env.reset_status()
    break

In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import talib as ta
from itertools import combinations
import seaborn as sns
import os, sys, warnings
from time import time 

warnings.filterwarnings('ignore')
pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', 200)

import sys 
# add path of optiver2023 package to pythonpath 
sys.path.append(os.path.abspath('../data'))

import optiver2023 

In [2]:
def reduce_mem_usage(df, verbose=0):
    """
    Iterate through all numeric columns of a dataframe and modify the data type
    to reduce memory usage.
    """
    # Calculate the initial memory usage of the DataFrame
    start_mem = df.memory_usage().sum() / 1024**2

    for col in df.columns:
        col_type = df[col].dtype

        if col_type != object:
            c_min = df[col].min()
            c_max = df[col].max()
            
            if str(col_type)[:3] == "int":
                if c_min > np.iinfo(np.int8).min and c_max < np.iinfo(np.int8).max:
                    df[col] = df[col].astype(np.int8)
                elif c_min > np.iinfo(np.int16).min and c_max < np.iinfo(np.int16).max:
                    df[col] = df[col].astype(np.int16)
                elif c_min > np.iinfo(np.int32).min and c_max < np.iinfo(np.int32).max:
                    df[col] = df[col].astype(np.int32)
                elif c_min > np.iinfo(np.int64).min and c_max < np.iinfo(np.int64).max:
                    df[col] = df[col].astype(np.int64)
            else:
                # Check if the column's data type is a float
                if c_min > np.finfo(np.float16).min and c_max < np.finfo(np.float16).max:
                    df[col] = df[col].astype(np.float32)
                elif c_min > np.finfo(np.float32).min and c_max < np.finfo(np.float32).max:
                    df[col] = df[col].astype(np.float32)
                else:
                    df[col] = df[col].astype(np.float32)
                    
    if verbose:
        print(f"Memory usage of dataframe is {start_mem:.2f} MB")
        end_mem = df.memory_usage().sum() / 1024**2
        print(f"Memory usage after optimization is: {end_mem:.2f} MB")
        decrease = 100 * (start_mem - end_mem) / start_mem
        print(f"Decreased by {decrease:.2f}%")

    return df

In [3]:
def gen_v1_features(df, prices):

    # V1 features: directly apply formula to a single row
    v1_features = {
        "volume": "ask_size + bid_size",
        "mid_price": "(ask_price + bid_price)/2",
        "liquidity_imbalance": "(bid_size-ask_size)/(bid_size+ask_size)",
        "matched_imbalance": "(imbalance_size - matched_size)/(matched_size+imbalance_size)",
        "size_imbalance": "bid_size / ask_size",
        "imbalance_intensity": "imbalance_size / volume",
        "matched_intensity": "matched_size / volume",
        "price_spread": "ask_price - bid_price",
        'market_urgency': 'price_spread * liquidity_imbalance',
        'depth_pressure': '(ask_size - bid_size) * (far_price - near_price)',
        'price_pressure': 'imbalance_size * (ask_price - bid_price)',
        'imbalance_with_flag': 'imbalance_size * imbalance_buy_sell_flag',
    }

    # include pair-wise price imbalances
    for c in combinations(prices, 2):
        v1_features[f"{c[0]}_{c[1]}_imbalance"] = f"({c[0]} - {c[1]}) / ({c[0]} + {c[1]})"
    
    for k, v in v1_features.items():
        df[k] = df.eval(v)
        
    v1_feature_category = {
        'minute': 'seconds_in_bucket // 60',
        'imb_buy_side': "(imbalance_buy_sell_flag == 1)",
        'imb_sell_side': "(imbalance_buy_sell_flag == -1)",
        'first_half_session': '(seconds_in_bucket <= 240)',
        'second_half_session': '(seconds_in_bucket > 240)'
    }
    
    for k, v in v1_feature_category.items():
        df[k] = df.eval(v).astype(np.int8)
        
    df = reduce_mem_usage(df, verbose=0)
        
    return df, list(v1_features.keys()), list(v1_feature_category.keys())

In [4]:
def gen_v2_features(df, v2_feat_cols):
    
    # V2 features: cross-section features
    # V2 features are generated on the groupby(['date_id', 'seconds_in_bucket'])
    # These features includes:
    # 1. statistics of V1 features (non-categorical)
    # 2. rank of V1 features for each stocks (non-categorical)
    group = df.groupby(['date_id', 'seconds_in_bucket'])

    v2_features_stats = ['mean', 'median', 'std', 'min', 'max']

    # calculate statistics of V1 features for each stock
    df_v2 = group[v2_feat_cols].agg(v2_features_stats).reset_index()
    df_v2.columns = ['date_id', 'seconds_in_bucket'] + [f"{c[1]}_{c[0]}" for c in df_v2.columns[2:]]
    df = df.merge(df_v2, on=['date_id', 'seconds_in_bucket'], how='left')
    
    # calculate rank of V1 features for each stock
    df_v2 = group[v2_feat_cols].rank(pct=True).add_prefix('rank_')
    df = df.merge(df_v2, left_index=True, right_index=True, how='left')
    
    df = reduce_mem_usage(df, verbose=0)
    
    v2_features =\
        [f"{s}_{c}" for c in v2_feat_cols for s in v2_features_stats] + \
        [f"rank_{c}" for c in v2_feat_cols]
        
    return df, v2_features

In [5]:
# !!! Requrires at least 11 timesteps to calculate all rolling statistics
def gen_v3_features(df, prices, sizes, v1_features):
    # V3 features: rolling statistics of V1 features (non-categorical)
    # V3 features are generated on the groupby(['date_id', 'stock_id'])
    # here we introduce ta-lib functions to calculate TA indicators

    # V3.1 relative change of V1 features by shift(1)
    # for prices, we calculate the change in basis points (*1e4)
    # for other features, we calculate the change in percentage (*1e2)
    group_by_stock = df.groupby(['date_id', 'stock_id'])
    
    relative_price = group_by_stock[prices].pct_change(1).add_prefix('pct_')*1e4
    relative_others = group_by_stock[sizes+v1_features].pct_change(1).add_prefix('pct_')*1e2

    df = pd.concat([df, relative_price, relative_others], axis=1)
    v3_features = list(relative_price.columns) + list(relative_others.columns)
    
    # V3.2 Simple TA indicators
    # Those are simple TA indicators that use only one feature
    df_v3 = group_by_stock[prices + sizes + v1_features].rolling(5).agg(['mean', 'std', 'max', 'min']).reset_index()
    stats_cols = [f"{c[1]}_{c[0]}_5" for c in df_v3.columns[2:]]
    df_v3.columns = ['date_id', 'stock_id'] + stats_cols
    df_v3.set_index('_level_2_5', inplace=True)
    df_v3.drop(columns=['date_id', 'stock_id'], inplace=True)
    
    df = df.merge(df_v3, left_index=True, right_index=True, how='left')
    v3_features += df_v3.columns.tolist()
        
    # # V3.3 TA indicators that use multiple features
    def composite_ta(df):

        ad_osc = ta.ADOSC(df['ask_price'], df['bid_price'], df['wap'], df['volume'], fastperiod=3, slowperiod=5)
        macd, macdsignal, macdhist = ta.MACD(df['wap'], fastperiod=5, slowperiod=11, signalperiod=3)
        
        return pd.DataFrame({
            'ema': ta.EMA(df['wap'], timeperiod=5),
            'rsi': ta.RSI(df['wap'], timeperiod=5),
            'cci': ta.CCI(df['ask_price'], df['bid_price'], df['wap'], timeperiod=5),
            'mfi': ta.MFI(df['ask_price'], df['bid_price'], df['wap'], df['volume'], timeperiod=5),
            'ad_osc': ad_osc,
            'macd': macd,
            'macdsignal': macdsignal,
            'macdhist': macdhist
        })
    
    df_v3 = group_by_stock.apply(composite_ta) 
    v3_features += df_v3.columns.tolist()
    
    df_v3.reset_index(inplace=True)
    df_v3.set_index('level_2', inplace=True)
    df_v3.drop(columns=['date_id', 'stock_id'], inplace=True)
    
    df = pd.concat([df, df_v3], axis=1)
    
    return df, v3_features

In [6]:
env = optiver2023.make_env()
iter_test = env.iter_test()
counter = 0 
max_ts_len = 12 # max length of ts to keep in cache

n_reveals = 0

feature_dicts = {
    'prices': ["reference_price", "far_price", "near_price", "ask_price", "bid_price", "wap"],
    'sizes':  ["matched_size", "bid_size", "ask_size", "imbalance_size"],
    "category": ["stock_id", "seconds_in_bucket", 'imbalance_buy_sell_flag']
}

cache = pd.DataFrame()
df_records = pd.DataFrame()

day_begin = time()

for (test, revealed_targets, sample_prediction) in iter_test:
    
    now_time = time()
    
    current_sec = test['seconds_in_bucket'].unique()

    test = test.fillna(0)
    test = reduce_mem_usage(test, verbose=0)
    
    df_v1, v1_feat, v1_feat_cat = gen_v1_features(test, feature_dicts['prices'])
    feature_dicts['v1_features'] = v1_feat
    feature_dicts['v1_feature_category'] = v1_feat_cat
    
    v2_feat_cols = feature_dicts['prices'] + feature_dicts['sizes'] + feature_dicts['v1_features']
    df_v2, v2_features = gen_v2_features(df_v1, v2_feat_cols)
    feature_dicts['v2_features'] = v2_features
    
    cache = pd.concat([cache, df_v2])
    cache.reset_index(drop=True, inplace=True)
    
    # In cache, we keep only the past max_ts_len seconds of data
    if counter > max_ts_len:
        sec_in_buk_list = cache['seconds_in_bucket'].unique()
        sec_in_buk_list.sort()
        sec_to_keep = sec_in_buk_list[-max_ts_len:]
        cache = cache[cache['seconds_in_bucket'].isin(sec_to_keep)]
        cache.reset_index(drop=True, inplace=True)
        
    df_v3, v3_features = gen_v3_features(
        cache, 
        feature_dicts['prices'],
        feature_dicts['sizes'],
        feature_dicts['v1_features']
        )
    
    feature_dicts['v3_features'] = v3_features
    
    df_v3.fillna(0, inplace=True)
    df_v3.replace([np.inf, -np.inf], 0, inplace=True)
    df_v3 = reduce_mem_usage(df_v3, verbose=0)
    
    df_test = df_v3[df_v3['seconds_in_bucket'].isin(current_sec)]
    df_records = pd.concat([df_records, df_test])
    
    sample_prediction['target'] = 0.5
    
    env.predict(sample_prediction)

    # after 54 timesteps, a new day starts
    if counter >= 54:
        print(f"New Day! Time used: {time() - day_begin:2f}s.")
        counter = 0
        day_begin = time()
    else:
        counter += 1
        
    if counter == 1:
        n_reveals += 1        
        print('Targets revealed for day', revealed_targets['revealed_date_id'].unique().tolist())
        if n_reveals > 1:
            df_records.merge(
                revealed_targets, 
                left_on=['date_id', 'stock_id', 'seconds_in_bucket'],
                right_on=['revealed_date_id', 'stock_id', 'seconds_in_bucket'], 
                how='left')
            print(df_records.shape)

This version of the API is not optimized and should not be used to estimate the runtime of your code on the hidden test set.
Targets revealed for day [477]
New Day! Time used: 41.040879s.
Targets revealed for day [478]
(11200, 463)
New Day! Time used: 52.635002s.
Targets revealed for day [479]
(18600, 463)
New Day! Time used: 79.226819s.


In [7]:
df_records.shape

(28200, 463)

In [None]:
df = pd.read_csv("../data/train.csv", nrows=10000)
df = df[~df['target'].isnull()] 

df['far_price'] = df['far_price'].fillna(0)
df['near_price'] = df['near_price'].fillna(0)

df = reduce_mem_usage(df, verbose=1)

df.head()

In [None]:
group = df.groupby(['date_id', 'seconds_in_bucket'])

In [None]:
v2_feat_cols = ["reference_price", "far_price", "near_price", "ask_price", "bid_price", "wap"]

v2_features_stats = ['mean', 'median', 'std', 'min', 'max']

# calculate statistics of V1 features for each stock
df_v2 = group[v2_feat_cols].agg(v2_features_stats).reset_index()
df_v2.columns = ['date_id', 'seconds_in_bucket'] + [f"{c[1]}_{c[0]}" for c in df_v2.columns[2:]]
# df = df.merge(df_v2, on=['date_id', 'seconds_in_bucket'], how='left')
df_v2 

In [None]:
sub_df = df[(df['date_id'] == 0) & (df['seconds_in_bucket'] == 0)]

sub_df.groupby(['date_id', 'seconds_in_bucket'])[v2_feat_cols].agg(v2_features_stats).reset_index()