In [1]:
import glob
import joblib
import lightgbm as lgb
import numpy as np
import os
import pandas as pd
from abc import ABCMeta, abstractmethod
from scipy.stats import median_absolute_deviation
from sklearn.model_selection import KFold
from tqdm import tqdm
from typing import Callable, List, Tuple, Union, Optional, Iterable

In [2]:
DATA_DIR = '../input/optiver-realized-volatility-prediction/'

def read_book(train_test: str, stock_id: int):
    if stock_id is None:
        return pd.concat(read_book(train_test, i) for i in range(127))

    book = pd.read_parquet(DATA_DIR + f"book_{train_test}.parquet/stock_id={stock_id}")
    book["stock_id"] = stock_id
    return book


def read_trade(train_test: str, stock_id: int):
    if stock_id is None:
        return pd.concat(read_trade(train_test, i) for i in range(127))

    trade = pd.read_parquet(DATA_DIR + f"trade_{train_test}.parquet/stock_id={stock_id}")
    trade["stock_id"] = stock_id
    return trade

In [3]:
# Functions to compute features from book


def wap(book: pd.DataFrame) -> pd.Series:
    return (
        book["bid_price1"] * book["ask_size1"] + book["ask_price1"] * book["bid_size1"]
    ) / (book["bid_size1"] + book["ask_size1"])


def wap2(book: pd.DataFrame) -> pd.Series:
    return (
        book["bid_price2"] * book["ask_size2"] + book["ask_price2"] * book["bid_size2"]
    ) / (book["bid_size2"] + book["ask_size2"])


def price_spread1(book: pd.DataFrame) -> pd.Series:
    return book["ask_price1"] - book["bid_price1"]


def price_spread2(book: pd.DataFrame) -> pd.Series:
    return book["ask_price2"] - book["bid_price2"]


def price_spread_bid(book: pd.DataFrame) -> pd.Series:
    return book["bid_price1"] - book["bid_price2"]


def price_spread_ask(book: pd.DataFrame) -> pd.Series:
    return book["ask_price1"] - book["ask_price2"]


def size_total_bid(book: pd.DataFrame) -> pd.Series:
    return book['bid_size1'] + book['bid_size2']


def size_total_ask(book: pd.DataFrame) -> pd.Series:
    return book['ask_size1'] + book['ask_size2']


def size_total1(book: pd.DataFrame) -> pd.Series:
    return book['bid_size1'] + book['ask_size1']


def size_total2(book: pd.DataFrame) -> pd.Series:
    return book['bid_size2'] + book['ask_size2']


def size_total3(book: pd.DataFrame) -> pd.Series:
    return book['bid_size1'] + book['bid_size2'] + book['ask_size1'] + book['ask_size2']


def size_spread1(book: pd.DataFrame) -> pd.Series:
    return book['bid_size1'] - book['ask_size1']


def size_spread2(book: pd.DataFrame) -> pd.Series:
    return book['bid_size2'] - book['ask_size2']


def size_spread3(book: pd.DataFrame) -> pd.Series:
    return book['bid_size1'] + book['bid_size2'] - book['ask_size1'] - book['ask_size2']


def log_return(book: pd.DataFrame) -> pd.Series:
    return wap(book).groupby(book.time_id).apply(np.log).diff()


def log_return2(book: pd.DataFrame) -> pd.Series:
    return wap2(book).groupby(book.time_id).apply(np.log).diff()


def instvar(book: pd.DataFrame):
    """Compute instantaneous variance."""
    return log_return(book) ** 2


def instvar2(book: pd.DataFrame):
    """Compute instantaneous variance."""
    return log_return2(book) ** 2


def updown(book: pd.DataFrame) -> pd.Series:
    n_samples = 10
    wap_groupby = wap(book).groupby(book.time_id)
    head = wap_groupby.head(n_samples).groupby(book.time_id).mean()
    tail = wap_groupby.tail(n_samples).groupby(book.time_id).mean()
    return (tail - head).apply(np.sign)


def stock_id(book: pd.DataFrame) -> pd.Series:
    return book.stock_id.groupby(book.time_id).first()


def time_id(book: pd.DataFrame) -> pd.Series:
    return book.time_id.groupby(book.time_id).first()

In [4]:
# Functions to compute features from trade

def trade__size(trade: pd.DataFrame) -> pd.Series:
    return trade['size']

def trade__price(trade: pd.DataFrame) -> pd.Series:
    return trade['price']

def trade__order_count(trade: pd.DataFrame) -> pd.Series:
    return trade['order_count']

def trade__true_range(trade: pd.DataFrame) -> pd.Series:
    return trade['price'].max() - trade['price'].min()

In [5]:
from functools import partial


def derive(fn: Callable, name=None, **kwargs) -> Callable:
    """Derive a function,named name,which when called will behave like
    fn called with the keyword arguments kwargs.
    """
    function = partial(fn, **kwargs)
    if name is not None:
        function.__name__ = name
    return function

In [6]:
# Aggregation Functions


def ewm(series: pd.Series, halflife: float = 10.0) -> pd.Series:
    return series.ewm(halflife=halflife).mean().iat[-1]


ewm10 = derive(ewm, "ewm10", halflife=10.0)
ewm20 = derive(ewm, "ewm20", halflife=20.0)

In [7]:
from collections import OrderedDict


class CreateFeatures:
    """Class to compute features.

    Args:
        features: list[dict[str, (Callable, Callable)]]
            Each key is the name of a feature.
            Each value is a tuple: ``(fn, agg)`` where fn is a function applied to the book 
            to make series and agg is an aggregation function

            If agg is not None:
                feature (pd.Series) will be computed by 1) applying fn to a book,
                2) aggregating the output by `time_id`, and then 
                3) aggregate the groupby object with agg.
                fn receives pd.DataFrame of book and returns pd.Series of feature time-series.
                agg is any aggregation function that can be passed to pd.SeriesGroupby.agg.

            If agg is None:
                feature will be computed by fn(book).
    """

    def __init__(self, features: List[Tuple[Callable, Callable]]) -> None:
        # attribute _features is:
        # each key: name of feature "{fn name}__{aggregation name}"
        # each value: (fn, agg) 
        self._features = OrderedDict()
        for fn, agg in features:
            name = self._feature_name(fn, agg)
            self.register_feature(name, fn=fn, agg=agg)

    def compute_df_features(self, book, trade) -> pd.DataFrame:
        return pd.merge(
                        pd.DataFrame(
                        {
                            name: self._compute_feature(book, fn, agg)
                            for name, fn, agg in self.features()
                            if not 'trade__' in name
                        }
                        ).reset_index(drop=True),
                        pd.DataFrame(
                        {
                            name: self._compute_feature(trade, fn, agg)
                            for name, fn, agg in self.features()
                            if "trade__" in name or "stock_id" in name or "time_id" in name
                        }
                        ).reset_index(drop=True),
                        on=["stock_id", "time_id"],
                        how="outer"
        )

    def register_feature(self, name, fn, agg) -> None:
        """Adds a feature to the class."""
        self._features[name] = (fn, agg)
        
    def features(self) -> Iterable:
        """Returns an iterator over (name, fn, agg) of features."""
        for name, (fn, agg) in self._features.items():
            yield (name, fn, agg)

    @property
    def feature_names(self) -> List[str]:
        return list(self._features.keys())

    def _feature_name(self, fn, agg) -> str:
        name = fn.__name__
        if agg is not None:
            name += "__" + (agg if isinstance(agg, str) else agg.__name__)
        return name

    def _compute_feature(self, book_trade, fn, agg) -> pd.Series:
        output = fn(book_trade)
        if agg is not None:
            output = output.groupby(book_trade.time_id).agg(agg)
        return output

In [8]:
from itertools import product

features = [
    # book
    (stock_id, None),
    (time_id, None),
    (instvar, np.mean),  # volatility
    (instvar2, np.mean),  # volatility
    (updown, None),
    # trade
    (trade__true_range, None),
]


fns = [
    # book
    price_spread1, price_spread2, price_spread_bid, price_spread_ask, size_total_bid, size_total_ask,
    size_total1, size_total2, size_total3, size_spread1, size_spread2, size_spread3,
    # trade
    trade__size, trade__price, trade__order_count,
    ]
aggs = [np.mean, np.max, np.min, median_absolute_deviation, np.std, ewm10, ewm20]


for fn, agg in product(fns, aggs):
    features.append((fn, agg))

In [9]:
cf = CreateFeatures(features)

cf.feature_names

['stock_id',
 'time_id',
 'instvar__mean',
 'instvar2__mean',
 'updown',
 'trade__true_range',
 'price_spread1__mean',
 'price_spread1__amax',
 'price_spread1__amin',
 'price_spread1__median_absolute_deviation',
 'price_spread1__std',
 'price_spread1__ewm10',
 'price_spread1__ewm20',
 'price_spread2__mean',
 'price_spread2__amax',
 'price_spread2__amin',
 'price_spread2__median_absolute_deviation',
 'price_spread2__std',
 'price_spread2__ewm10',
 'price_spread2__ewm20',
 'price_spread_bid__mean',
 'price_spread_bid__amax',
 'price_spread_bid__amin',
 'price_spread_bid__median_absolute_deviation',
 'price_spread_bid__std',
 'price_spread_bid__ewm10',
 'price_spread_bid__ewm20',
 'price_spread_ask__mean',
 'price_spread_ask__amax',
 'price_spread_ask__amin',
 'price_spread_ask__median_absolute_deviation',
 'price_spread_ask__std',
 'price_spread_ask__ewm10',
 'price_spread_ask__ewm20',
 'size_total_bid__mean',
 'size_total_bid__amax',
 'size_total_bid__amin',
 'size_total_bid__median_abs

In [10]:
def create_df_features(cf: CreateFeatures, train_test: str = None) -> pd.DataFrame:
    paths = glob.glob(DATA_DIR + f"book_{train_test}.parquet/*")
    dfs = []
    for path in tqdm(paths):
        book = pd.read_parquet(path)
        trade = pd.read_parquet(path.replace('book', 'trade'))
        book["stock_id"] = trade["stock_id"] = int(path.split("=")[-1])
        dfs.append(cf.compute_df_features(book, trade))
    df_features = pd.concat(dfs).reset_index(drop=True)
    return df_features

In [11]:
class Model(metaclass=ABCMeta):

    def __init__(self, run_fold_name: str, feature_names: List[str],
                 params: dict) -> None:
        ''' Constructor

        :param feature_names: list of feature names to specify columns of feature dataframe
        :param params: hyper parameters
        '''
        self.run_fold_name = run_fold_name
        self.feature_names = feature_names
        self.params = params
        self.model = None

    @abstractmethod
    def train(self, X_train: pd.DataFrame, y_train: pd.Series,
              X_valid: Optional[pd.DataFrame] = None,
              y_valid: Optional[pd.Series] = None
              ) -> None:
        ''' trains a model

        :param X_train: features of training data
        :param y_train: targets of training data
        :param X_valid: features of validation data
        :param y_valid: targets of validation data
        '''
        pass

    @abstractmethod
    def predict(self, X: pd.DataFrame) -> np.array:
        ''' returns prediction output from a learned model

        :param X: features of test data or validation data
        :return: predicted value
        '''
        pass

    @abstractmethod
    def save_model(self) -> None:
        ''' saves a model '''
        pass

    @abstractmethod
    def load_model(self) -> None:
        ''' loads a model '''
        pass

In [12]:
class ModelLGB(Model):


    def train(self, X_train, y_train, X_valid=None, y_valid=None):

        params = self.params
        if X_valid is not None:
            # weights to change feval from RMSE to RMSPE
            # idea from: https://www.kaggle.com/c/optiver-realized-volatility-prediction/discussion/250324
            w_train = 1/np.square(y_train)
            w_valid = 1/np.square(y_valid)
            lgb_train = lgb.Dataset(X_train, y_train, weight=w_train)
            lgb_eval = lgb.Dataset(X_valid, y_valid, weight=w_valid)
            self.model = lgb.train(params,
                                   lgb_train,
                                   num_boost_round=100,
                                   valid_sets=lgb_eval,
                                   feval=rmspe,
                                   verbose_eval=50,
                                   early_stopping_rounds=100,
                                   categorical_feature=['stock_id']
                                  )
        else:
            w_train = 1/np.square(y_train)
            lgb_train = lgb.Dataset(X_train, y_train, weight=w_train)
            self.model = lgb.train(params,
                                   lgb_train,
                                   num_boost_round=100,
                                   feval=rmspe,
                                   categorical_feature=['stock_id']
                           )

    def predict(self, X_test):
        return self.model.predict(X_test)

    def save_model(self):
        self.model.save_model('model.txt')

    def load_model(self):
        self.model = lgb.Booster(model_file='model.txt')

In [13]:
# delf-defiend evaluation metric function
def rmspe(y_true: np.array, y_pred: np.array):
    ''' self-defined eval metric
        Root Mean Squared Percentage Error

    :return: name: str, eval_result: float, is_higher_better: bool
    '''
    if type(y_pred) == lgb.basic.Dataset:
        y_pred = y_pred.get_label()
    rmspe = (np.sqrt(np.mean(np.square((y_true - y_pred) / y_true))))
    return 'RMSPE', rmspe, False

In [14]:
class Util:
    @classmethod
    def dump(cls, value, path):
        os.makedirs(os.path.dirname(path), exist_ok=True)
        joblib.dump(value, path, compress=True)

    @classmethod
    def load(cls, path):
        return joblib.load(path)

    @classmethod
    def submission(cls, df_pred: pd.DataFrame) -> None:
        row_id = df_pred['stock_id'].apply(str) + '-' + df_pred['time_id'].apply(str)
        df_submission = pd.DataFrame({'row_id': row_id, 'target':df_pred['target']})
        df_submission.to_csv('submission.csv', index=False)

In [15]:
class Runner:

    def __init__(self, run_name: str, model_cls: Callable[[str, dict], Model],
                 feature_names: List[str], df_features_train: pd.DataFrame,
                 df_features_test: pd.DataFrame, params: dict):
        ''' Constructor

        :param run_name: name of run
        :param model_cls: class of model
        :param feature_names: list of feature names
        :param params: hyper parameters
        '''
        self.run_name = run_name
        self.model_cls = model_cls
        self.feature_names = feature_names
        self.X_train_all = df_features_train
        self.X_test = df_features_test
        self.params = params
        self.n_fold = 4

    def _train_fold(self, i_fold: Union[int, str]) -> Tuple[
                    Model, Optional[np.array],
                    Optional[np.array], Optional[float]]:
        ''' specifies number of fold for cv then learns & evaluates

        :param i_fold: number of fold ('all' for all)
        :return: a tuple of instance of model, index of record,
                 predicted value, and evaluation score
                 (returns only model if i_fold=='all')
        '''
        # load train data
        y_train_all = self._load_y_train()
        self.X_train_all['stock_id'] = self.X_train_all['stock_id'].astype('int64')
        self.X_train_all['time_id'] = self.X_train_all['time_id'].astype('int64')
        Xy_train_all = pd.merge(self.X_train_all, y_train_all,
                                left_on=['stock_id','time_id'], right_on=['stock_id','time_id'],
                                how='inner')
        X_train_all = Xy_train_all[self.feature_names]
        y_train_all = Xy_train_all['target']

        validation = i_fold != 'all'
        if validation:
            # split data into training and validation
            idx_train, idx_valid = self._load_index_fold(i_fold)
            X_train = X_train_all.iloc[idx_train]
            y_train = y_train_all.iloc[idx_train]
            X_valid = X_train_all.iloc[idx_valid]
            y_valid = y_train_all.iloc[idx_valid]

            # execute learning
            model = self._build_model(i_fold)
            model.train(X_train, y_train, X_valid, y_valid)

            # prediction and evaluation with validation data
            pred_valid = model.predict(X_valid)
            _, score, _ = rmspe(y_true=y_valid, y_pred=pred_valid)

            # return model, index, prediction, and score
            return model, idx_valid, pred_valid, score
        else:
            # learining with all data
            model = self._build_model(i_fold)
            model.train(X_train_all, y_train_all)

            # return model
            return model, None, None, None

    def run_train_cv(self) -> None:
        ''' learns and evaluates by CV

        learns, evaluates, and saves models and scores of each fold
        '''
        scores = []
        idxes_valid = []
        preds = []

        # learning for each fold
        for i_fold in range(self.n_fold):
            model, idx_valid, pred_valid, score = self._train_fold(i_fold)

            # hold result
            idxes_valid.append(idx_valid)
            scores.append(score)
            preds.append(pred_valid)

            # save model
            model.save_model()
        print(f'Mean score of the folds: {np.mean(scores)}')

    def run_predict_cv(self) -> pd.DataFrame:
        ''' predicts for test data with the mean of
            each fold's model learned through CV
            
            :return: predicted target as the mean of folds
        '''
        preds = []
        # prediction for each fold's model
        for i_fold in range(self.n_fold):
            model = self._build_model(i_fold)
            model.load_model()
            pred = model.predict(self.X_test[self.feature_names])
            preds.append(pred)

        # mean of the prediction values
        pred_mean = np.mean(preds, axis=0)
        df_pred = self.X_test[self.X_test.columns[:2]]
        df_pred.loc[:, 'target'] = pred_mean
        return df_pred

    def run_train_all(self) -> None:
        ''' learns with all the training data and save the model '''
        # learning
        i_fold = 'all'
        model, _, _, _ = self._train_fold(i_fold)
        model.save_model()

    def run_predict_all(self) -> pd.DataFrame:
        ''' predicts for test data with the model learned with all the training data

        :return: predicted target
        '''
        
        # predict with the mdoel learned with all the learning data
        i_fold = 'all'
        model = self._build_model(i_fold)
        model.load_model()
        pred = model.predict(self.X_test[self.feature_names])
        df_pred = self.X_test[self.X_test.columns[:2]]
        df_pred['target'] = pred
        return df_pred

    def _build_model(self, i_fold: Union[int, str]) -> Model:
        ''' builds a model with a specified fold for cv

        :param i_fold: number of fold
        :return: instance of model
        '''
        # build a model with run name, fold, and class of model
        run_fold_name = f'{self.run_name}-{i_fold}'
        return self.model_cls(run_fold_name, self.feature_names, self.params)

    def _load_y_train(self) -> pd.DataFrame:
        ''' loads target of train data; ['stock_id', 'time_id', 'target']

        :return: target dataframe of train data
        '''
        return pd.read_csv(DATA_DIR + 'train.csv')

    def _load_index_fold(self, i_fold: int) -> np.array:
        ''' returns the record index in response to the fold specified for cv

        :param i_fold: number of the fold
        :return: record index for the fold
        '''
        # return index to split data for learning and validation
        y_train = self._load_y_train()
        x_dummy = np.zeros(len(y_train))
        skf = KFold(n_splits=self.n_fold, shuffle=True, random_state=31)
        return list(skf.split(x_dummy, y_train))[i_fold]

## main

In [16]:
params_lgb = {
            'boosting_type': 'gbdt',
            'num_leaves': 100,
            'learning_rate': 0.05,
            'n_estimators': 1000,
            'metric': 'rmse',
            }

# rerun=False if just using saved features works
# cf = CreateFeatures(rerun=True)
df_features_train = create_df_features(cf, "train")
df_features_test = create_df_features(cf, "test")
print(df_features_train.head())
feature_names = cf.feature_names

# learning and prediction by LightGBM and create submission file
run_name = 'lgb'
runner = Runner(run_name=run_name,
                model_cls=ModelLGB,
                feature_names=feature_names,
                df_features_train=df_features_train,
                df_features_test=df_features_test,
                params=params_lgb)
runner.run_train_all()
pred = runner.run_predict_all()
Util.submission(pred)

 18%|█▊        | 20/112 [16:45<1:18:18, 51.07s/it]