# Kaggle - G-Research Crypto Forecasting | Model Fitting

This is the final notebook for submitting predictions. Use the previous modelling setup but with two changes:
1) Simplify the feature construction by not "market neutralising"
2) Remove the Lasso models

These changes are to help us stay within the 9 hour time limit. We note below the OOS score on the test set isn't too affected by these changes.

We also move all functions here to keep the notebook self-contained

In [5]:
import sys
import warnings
import os
import logging
import pickle
import time
from functools import reduce

import pandas as pd
import numpy as np
from sklearn.base import BaseEstimator, clone, RegressorMixin
from sklearn.multioutput import MultiOutputRegressor
from sklearn.ensemble import VotingRegressor
from sklearn.linear_model import Lasso
import lightgbm

from pandas.core.common import SettingWithCopyWarning

warnings.simplefilter(action="ignore", category=SettingWithCopyWarning)

logger = logging.getLogger(__name__)

START_TIME = time.time()

USE_GPU = False # for lgbm model training

device_type = "gpu" if USE_GPU else "cpu"


# sys.path.append("/home/gresearch_crypto")
sys.path.append("gresearch_crypto")


import gresearch_crypto
env = gresearch_crypto.make_env()

iter_test = env.iter_test()

In [23]:
####################################################################################################
### Functions for data preparatino + prediction loop
####################################################################################################

TIME_LIMIT = 8.5 # Time limit in hours for fitting + predictions. After this submit the given sample predictions.

def time_elapsed():
    """Get the elapsed time from when this notebook was started"""
    return time.time() - START_TIME


def in_time_limit():
    """Returns True if the elapsed time from the notebook starting is < 8.5 hours.
    Else returns False.
    """
    elapsed_hours = time_elapsed() / 3600
    if elapsed_hours < TIME_LIMIT:
        return True
    return False


def chunk_list(l, chunk_size):
    """Iterate over a list in chunks"""
    for x in range(0, len(l), chunk_size):
        yield l[x : x + chunk_size]


def bar_feats_minimal(df):
    """Augment the given dataframe in place with some features for each bar."""
    midpoint = (df["Open"] + df["Close"]) / 2
    feats = {
        "rel_avg": df["VWAP"] / midpoint,
        "avg_t_size": (df["Volume"] / df["Count"])
        ** (1 / 10),  # average # of units per transaction
        "dollar_vol": np.log(df["Volume"] * df["VWAP"]),  # dollar volume traded
        "rel_dev": ((df["High"] - df["Low"]) / midpoint) ** (1 / 3),
        "shadow_diff": (df["High"] + df["Low"]) / (2 * midpoint) - 1,
    }
    for name, feat in feats.items():
        df.loc[:, name] = feat



def ts_feats_minimal(df, window, price_mom_windows, include_target=True):
    """Add rolling z-score features including price momentum features and the target + target scale.
    Assumes index is timestamps + Asset_IDs

    Warning: changes input df in-place to save memory
    """
    to_z_score = [
        "rel_avg",
        "avg_t_size",
        "shadow_diff",
        "dollar_vol",
        "rel_dev",
    ]
    df_subset = df[to_z_score]
    
    target = df["Target"] if include_target else None
    log_close = np.log(df[["Close"]])

    log_close_grp = log_close.groupby(level="Asset_ID", as_index=False)

    for mom_window in price_mom_windows:
        feat_name = f"price_mom_{mom_window}"
        df_subset.loc[:, feat_name] = log_close_grp.diff(mom_window)["Close"]

    min_periods = max(1, window // 10)
    df_grp = (
        df_subset
        .groupby(level="Asset_ID", as_index=False)
        .rolling(window, min_periods=min_periods)
    )    

    norm_feats = df_subset - df_grp.mean().drop(columns="Asset_ID").fillna(0).reindex(index=df.index)
    roll_std = (
        norm_feats.groupby(level="Asset_ID", as_index=False)
        .rolling(window, min_periods=min_periods)
        .std()
        .drop(columns="Asset_ID")
        .ffill()
        .fillna(1)
        .reindex(index=df.index)
    )
    norm_feats = norm_feats / roll_std

    norm_feats = norm_feats.rename(
        mapper=lambda x: "roll_" + x, axis="columns"
    )

    norm_feats.loc[:, "target_scale"] = roll_std["price_mom_15"]

    if include_target:  # FIXME: potentially confusing target naming convention
        norm_feats.loc[:, "scaled_target"] = target / norm_feats["target_scale"]
        norm_feats.loc[:, "target"] = target

    return norm_feats



def all_feats_minimal(df, include_target=True):
    """Minimal version of all_feats"""
    price_mom_windows = (1, 5, 15, 80)
    window = 600

    bar_feats_minimal(df)  # augment in-place with bar features
    df.drop(
        columns=["Count", "Open", "High", "Low", "Volume", "VWAP"], inplace=True
    )  # drop unused columns

    df.set_index(["timestamp", "Asset_ID"], inplace=True)
    
    df = df.loc[df.index.duplicated(keep='last')]

    return ts_feats_minimal(df, window, price_mom_windows, include_target)



class ResultCacher:
    """Helper class to pickle and load a series of results to a given directory.
    
    Note: saves as pickles rather than eg parquet for low RAM requirements when reading files
    and for low file sizes (compared to csv).
    """
    def __init__(self, save_path):
        self.save_path = save_path # directory to cache intermediate results
        self.result_paths = [] # to keep track of cached files
        
        self.create_save_folder()
        
    def create_save_folder(self):
        os.makedirs(self.save_path)
        
    def get_save_path(self, filename):
        return os.path.join(self.save_path, filename)
        
    def cache_result(self, result, filename):
        """Pickles result to given filename in save_path"""
        file_save_path = self.get_save_path(filename)
        logger.info(f"Saving result to {file_save_path}")
        with open(file_save_path, "wb") as f:
            pickle.dump(result, f)
            
        self.result_paths.append(file_save_path)
            
    def load_all_results(self):
        """Loads all cached results and returns as a list"""
        results = []
        for path in self.result_paths:
            with open(path, "rb") as f:
                results.append(pickle.load(f))
        return results


def chunk_ts_feats(full_df, n_splits):
    """Run all_feats_minimal but chunk over the original df, save intermediate results to disk,
    and concatenate all results after finished to avoid memory issues for the full data preparation.
    
    Note: chunks over timestamps, not array indices to reduce potential ordering bugs.
    
    Warning: deletes all data in full_df to help avoid memory issues
    """
    include_target = True
    all_timestamps = np.sort(full_df["timestamp"].unique())
    chunk_size = len(all_timestamps) // n_splits + 1000 # avoid rounding issue for last chunk
    
    result_cacher = ResultCacher("/tmp/feat_cache")
    
    for i, time_chunk in enumerate(chunk_list(all_timestamps, chunk_size)):   
        df_chunk = full_df.loc[full_df.timestamp.isin(time_chunk)]
        feat_chunk = all_feats_minimal(df_chunk, include_target=True).dropna()
        result_cacher.cache_result(feat_chunk, f"feat_chunk_{i}.pkl")
        
    full_df.drop(full_df.index, inplace=True)  # reduce memory before reloading cache
    all_feats = result_cacher.load_all_results() # FIXME: does not release 
    return pd.concat(all_feats)
        
        

def last_n_ts_df(df, lookback, buffer):
    """Returns the last rows of df where the timestamp is in the last n of all
    timestamps. This is to concatenate with new data provided by the API so that
    rolling calculations can be performed.

    Warning: assumes df is ordered by timestamps, and could return more data than
    requested.
    """
    n_assets = 14
    return df.iloc[-(n_assets * lookback + buffer) :]


def drop_dup_ts_id(df):
    """Drops duplicated rows based on timestamp and asset_id columns, keeping the last rows found"""
    return df.loc[~df.duplicated(subset=["timestamp", "Asset_ID"], keep="first")]


def concat_old_new(old_data, new_data, index_check=False):
    """Concatenate old and new dfs for feature construction. Ensures
    any overlapping timestamps + assetids in the old df are discarded.
    """
    concat = pd.concat([old_data, new_data.drop(columns="row_id")], ignore_index=True)
    
    if index_check:
        return drop_dup_ts_id(concat)
    
    return concat


def subset_test_index(data, orig_data):
    """Subset the prepred data df on the original test timestamps + assetids"""
    orig_index = pd.MultiIndex.from_frame(orig_data[["timestamp", "Asset_ID"]])
    return data.loc[orig_index]


def join_rowids(preds, orig_test):
    """Join our predictions df with the rowids in the supplied test data df"""
    orig_join_on = orig_test[["timestamp", "Asset_ID", "row_id"]].set_index(
        ["timestamp", "Asset_ID"]
    )
    return preds.join(orig_join_on).reset_index(drop=True)


def _predict_loop(model, prev_data, new_data, sample_pred_df, n_to_keep, concat_check):
    """Function for looping over in env.iter_test():
    - Concatenate previous + new data
    - Cache last n rows of this df
    - Calculate new features
    - Drop rows to match the original training timestamps + asset ids
    - Calculate predictions on this subset
    - Join with the given row ids in the sample predictions df

    Returns: last n rows from prev + new data, predictions df
    """
    concat_data = concat_old_new(prev_data, new_data, concat_check)
    last_n = last_n_ts_df(concat_data, n_to_keep, buffer=15)
    feats = all_feats_minimal(concat_data, include_target=False)
    feats = subset_test_index(feats, new_data).replace([np.inf, -np.inf], np.nan).fillna(0)
    preds = model.predict(feats).rename("Target").to_frame()
    return last_n, join_rowids(preds, new_data)


def predict_loop(model, prev_data, new_data, sample_pred_df, n_to_keep, concat_check):
    """Wrap around _predict_loop to catch errors. Defaults to sample predictions if we encounter
    an error.
    """
    if not in_time_limit():
        return prev_data, sample_pred_df
    
    try:
        return _predict_loop(model, prev_data, new_data, sample_pred_df, n_to_keep, concat_check)
    except:
        return prev_data, sample_pred_df


####################################################################################################
### Functions for fitting
####################################################################################################


def feature_names(data_cols):
    """Take single/multiindex columns from a pandas df of features + targets
    and return only the feature names.
    """
    if isinstance(data_cols, pd.MultiIndex):
        data_cols = data_cols.get_level_values(0).unique()

    return [k for k in data_cols if "target" not in k]


def get_xy_arrays(data_df):
    """Returns a tuple of numpy arrays: features, scaled targets"""
    features = feature_names(data_df.columns)
    try:
        target = data_df["scaled_target"].values
    except:
        target = None
    return data_df[features].values, target


def scale_predictions(model, X, y_scale):
    """Undo the vol normalisation for the targets to get predictions
    for actual returns.
    """
    return model.predict(X) * y_scale


def weighted_correlation(a, b, weights):
    """Evaluation metric copied from the discussion page
    https://www.kaggle.com/c/g-research-crypto-forecasting/discussion/291845

    Excpects columns of actual targets, predictions and asset weights

    Args:
    - a, b: the actual and predicted weights
    - weights: the associated asset weights
    """
    w = np.ravel(weights)
    a = np.ravel(a)
    b = np.ravel(b)

    sum_w = np.sum(w)
    mean_a = np.sum(a * w) / sum_w
    mean_b = np.sum(b * w) / sum_w
    var_a = np.sum(w * np.square(a - mean_a)) / sum_w
    var_b = np.sum(w * np.square(b - mean_b)) / sum_w

    cov = np.sum((a * b * w)) / np.sum(w) - mean_a * mean_b
    corr = cov / np.sqrt(var_a * var_b)

    return corr



def score_from_df(pred_df, X):
    """Ensure indices are aligned before calculating the weighted correlation
    on the given predictions df.

    Note: the score will be nan if one df contains index values not found in the other.
    This likely indicates a bug in generating predictions since the predictions should
    have been calculated using X, so the indices should be the same in some order.
    """
    pred_df_reindexed = pred_df.reindex(index=X.index)
    return weighted_correlation(
        X.target.values,
        pred_df_reindexed.values,
        X.target_weight.values,
    )



def score_pool_model(model, X):
    """Convenience function for generating predictions and passing to score_from_df"""
    preds = model.predict(X)
    return score_from_df(preds, X)


class PoolRegressor(BaseEstimator, RegressorMixin):
    """Helper class for fitting pool models.

    Notes:
    - This depends on the input X being a pandas df. sklearn (deliberately) tends not
    to work well with pandas, but we don't use sklearn functionality extensively here
    and what we do use will be okay (for indexing sklearn seems to take care of things,
    see: https://github.com/scikit-learn/scikit-learn/blob/0d378913be6d7e485b792ea36e9268be31ed52d0/sklearn/utils/__init__.py#L307)
    """

    def __init__(self, base_model, clusters: dict):
        self.base_model = base_model
        self.clusters = clusters
        self.asset_ids_ = reduce(lambda x, y: [*x, *y], clusters.values())
        super().__init__()

    def fit(self, X, y=None, **fit_kwargs):
        """Expects a long df using the "targets" column as the targets, and
        any column without "target" in the name is used as a feature. Fit one
        model for each given cluster.

        Note: the case where each cluster has size 1 is the single asset model.
        """
        self.models_ = {}
        for cluster, asset_ids in self.clusters.items():
            X_subset, y_subset = get_xy_arrays(X.loc[(slice(None), list(asset_ids)), :])
            model_clone = clone(self.base_model)
            model_clone.fit(
                X_subset, y_subset, **fit_kwargs
            )  # fit separately for compatibility with Keras
            self.models_[cluster] = model_clone

        return self

    def predict(self, X) -> pd.DataFrame:
        """Take a long df of features and return a wide df of predictions
        with asset_ids as columns.
        """
        preds = []
        for cluster, asset_ids in self.clusters.items():
            X_subset = X.loc[(slice(None), asset_ids), :]
            cluster_preds = scale_predictions(
                self.models_[cluster], get_xy_arrays(X_subset)[0], X_subset.target_scale
            )
            preds.append(pd.Series(cluster_preds, index=X_subset.index))
            # asset_preds = self.models_[asset_id].predict(get_xy_arrays(X_subset)[0])
            # asset_preds = pd.Series(asset_preds, index=X_subset.index)
            # preds[asset_id] = asset_preds * X_subset.target_scale # scale back to returns predictions

        return pd.concat(preds).reindex(index=X.index)  # same order as input df

    def score(self, X, y=None):
        """Return the weighted correlation between all predictions"""
        return score_pool_model(self, X)


class PoolVotingRegressor(RegressorMixin):
    """Wrapper around VotingRegressor intended for use with PoolRegressors.
    In particular:
    - change the default scoring function to weighted regression
    - keep the original pandas indices to predictions
    """

    def __init__(self, estimators, weights=None):
        self.estimators = estimators
        self.weights = weights

    def fit(self, X, y=None):
        empty_y = np.empty_like(X.iloc[:, 0])
        self.voting_regressor_ = VotingRegressor(
            estimators=self.estimators, weights=self.weights
        )
        self.voting_regressor_.fit(X, empty_y)
        return self

    def predict(self, X):
        """Adds the original pandas index of X to the output of the wrapped
        VotingRegressor.
        """
        preds = self.voting_regressor_.predict(X)
        return pd.Series(preds, index=X.index)

    def score(self, X, y=None):  # FIXME: duplication from PoolRegressor
        """Return the weighted correlation between all predictions"""
        return score_pool_model(self, X)


####################################################################################################
### Hardcoded model config
####################################################################################################

clusters = {
    0: (4, 8, 10, 11),
    2: (0, 3, 12, 7), # move 7 in this cluster
    3: (2, 5, 13),
    4: (1, 6, 9),
} # arbitrary cluster labels



# final_allocations = {
#     'pool_lasso': 0.13803354577335214,
#     'pool_LGBM': 0.10673472669853526,
#     'single_lasso': 0.16562975805884267,
#     'single_LGBM': 0.14721038027645952,
#     'pool_all_lasso': 0.21147431517754645,
#     'pool_all_LGBM': 0.23091727401526385,
# }

final_allocations = {
    'pool_LGBM': 0.20,
    'single_LGBM': 0.30,
    'pool_all_LGBM': 0.50,
}


all_assetids = list(range(14))

pool_params = {
    # "lasso": {
    #     "model": Lasso(),
    #     "params": {"alpha": 0.0022222223000000004, "fit_intercept": False},
    # },
    "LGBM": {
        "model": lightgbm.LGBMRegressor(device_type=device_type),
        "params": {"learning_rate": 0.01, "lambda_l1": 0.0, "n_estimators": 400, "alpha": 3}
    },
}

single_params = {
    # "lasso": {
    #     "model": Lasso(),
    #     "params": {"alpha": 0.011111111188888889, "fit_intercept": False},
    # },
    "LGBM": {
        "model": lightgbm.LGBMRegressor(device_type=device_type),
        "params": {"learning_rate": 0.01, "lambda_l1": 0.03, "n_estimators": 100, "alpha": 3}
    },
}

all_params = {
    # "lasso": {
    #     "model": Lasso(),
    #     "params": {"alpha": 0.016733333333333333, "fit_intercept": False},
    # },
    "LGBM": {
        "model": lightgbm.LGBMRegressor(device_type=device_type),
        "params": {"learning_rate": 0.02, "lambda_l1": 0.01, "n_estimators": 200}
    },
}

param_dict = {
    "pool": {
        "params": pool_params,
        "clusters": clusters,
    },
    "single": {
        "params": single_params,
        "clusters": {k: [k] for k in all_assetids},
    },
    "pool_all": {
        "params": all_params,
        "clusters": {-1: all_assetids},
    },
}

all_models = {
    f"{setup}_{model_type}": PoolRegressor(model["model"].set_params(**model["params"]), clusters=model_dict["clusters"])
    for setup, model_dict in param_dict.items()
    for model_type, model in model_dict["params"].items()
}
models_list = [(k, model) for k, model in all_models.items()]
model_weight_list = [final_allocations[k] for k in all_models] # ensure in same order

voting_model = PoolVotingRegressor(estimators=models_list, weights=model_weight_list)

#### Rerun OOS predictions on test
(commented out to avoid running during submission)

Since we've changed the modelling setup we should check expectations on some OOS test set.

Running the below gives roughly 2.4% correlation - this is similar to our previous score, so perhaps the extra "market neutralisation" step and averaging with lasso models isn't adding any value to our predictions. We didn't test this, but we ideally should.

In [3]:
# voting_model_ = PoolVotingRegressor(estimators=models_list, weights=model_weight_list)

# timestamps = np.sort(train.index.get_level_values(0).unique())
# split_point = int(0.8 * len(timestamps))
# train_index = timestamps[:split_point]
# test_index = timestamps[split_point:]

# voting_model_.fit(train.loc[train_index])

# ad = pd.read_csv("asset_details.csv").set_index("Asset_ID")
# weights = ad["Weight"]
# weights /= weights.sum()

# target_weights = ad[["Weight"]].rename(columns={"Weight": "target_weight"})
# voting_model_.score(train.loc[test_index].join(target_weights, on="Asset_ID"))
# roughly 2.4%

In [7]:
train_data = concat_old_new(pd.read_csv("train.csv"), pd.read_csv("supplemental_train.csv"), index_check=True)

N_TO_KEEP = 1000
last_n = last_n_ts_df(train_data, N_TO_KEEP, 15)

In [5]:
! rm -rf /tmp/feat_cache

In [6]:
train = chunk_ts_feats(train_data, 3).replace([np.inf, -np.inf], np.nan).dropna() # FIXME: np.inf introduced, probably from std dev normalisation

  result = getattr(ufunc, method)(*inputs, **kwargs)


In [7]:
voting_model.fit(train)



<__main__.PoolVotingRegressor at 0x7f5715b59ed0>

In [9]:
check_index = True

for (test_df, sample_prediction_df) in iter_test:
    last_n, preds = predict_loop(voting_model, last_n, test_df, sample_prediction_df, N_TO_KEEP, check_index)
    env.predict(preds)

This version of the API is not optimized and should not be used to estimate the runtime of your code on the hidden test set.
CPU times: user 1.84 s, sys: 12.7 ms, total: 1.85 s
Wall time: 445 ms


In [4]:
with open("/tmp/model.pkl", "rb") as f:
    import pickle
    a, b = pickle.load(f)

AttributeError: Can't get attribute 'PoolVotingRegressor' on <module '__main__'>

In [10]:
all_feats_minimal??

[0;31mSignature:[0m [0mall_feats_minimal[0m[0;34m([0m[0mdf[0m[0;34m,[0m [0minclude_target[0m[0;34m=[0m[0;32mTrue[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0;31mSource:[0m   
[0;32mdef[0m [0mall_feats_minimal[0m[0;34m([0m[0mdf[0m[0;34m,[0m [0minclude_target[0m[0;34m=[0m[0;32mTrue[0m[0;34m)[0m[0;34m:[0m[0;34m[0m
[0;34m[0m    [0;34m"""Minimal version of all_feats"""[0m[0;34m[0m
[0;34m[0m    [0mprice_mom_windows[0m [0;34m=[0m [0;34m([0m[0;36m1[0m[0;34m,[0m [0;36m5[0m[0;34m,[0m [0;36m15[0m[0;34m,[0m [0;36m80[0m[0;34m)[0m[0;34m[0m
[0;34m[0m    [0mwindow[0m [0;34m=[0m [0;36m600[0m[0;34m[0m
[0;34m[0m[0;34m[0m
[0;34m[0m    [0mbar_feats_minimal[0m[0;34m([0m[0mdf[0m[0;34m)[0m  [0;31m# augment in-place with bar features[0m[0;34m[0m
[0;34m[0m    [0mdf[0m[0;34m.[0m[0mdrop[0m[0;34m([0m[0;34m[0m
[0;34m[0m        [0mcolumns[0m[0;34m=[0m[0;34m[[0m[0;34m"Count"[0m[0;34m,[0m [0;34m"

In [11]:
train_data_ = train_data.set_index(["timestamp", "Asset_ID"])

In [20]:
a = train_data.duplicated(subset=["timestamp", "Asset_ID"], keep="first")

In [18]:
train_data

Unnamed: 0,timestamp,Asset_ID,Count,Open,High,Low,Close,Volume,VWAP,Target
0,1514764860,2,40.0,2376.580000,2399.500000,2357.140000,2374.590000,1.923301e+01,2373.116392,-0.004218
1,1514764860,0,5.0,8.530000,8.530000,8.530000,8.530000,7.838000e+01,8.530000,-0.014399
2,1514764860,1,229.0,13835.194000,14013.800000,13666.110000,13850.176000,3.155006e+01,13827.062093,-0.014643
3,1514764860,5,32.0,7.659600,7.659600,7.656700,7.657600,6.626713e+03,7.657713,-0.013922
4,1514764860,7,5.0,25.920000,25.920000,25.874000,25.877000,1.210873e+02,25.891363,-0.008264
...,...,...,...,...,...,...,...,...,...,...
2015107,1632182400,9,775.0,157.181571,157.250000,156.700000,156.943857,4.663725e+03,156.994319,
2015108,1632182400,10,34.0,2437.065067,2438.000000,2430.226900,2432.907467,3.975460e+00,2434.818747,
2015109,1632182400,13,380.0,0.091390,0.091527,0.091260,0.091349,2.193732e+06,0.091388,
2015110,1632182400,12,177.0,0.282168,0.282438,0.281842,0.282051,1.828508e+05,0.282134,
