## 前処理


In [1]:
import pandas as pd
import polars as pl
import numpy as np
import json

# Directory that holds the competition CSV files, relative to the working directory.
data_dir = "./optiver-trading-at-the-close"
# Read the training data, overriding the dtype of the columns listed below with
# compact integer / 32-bit float types.
train_df = pl.read_csv(
    f"{data_dir}/train.csv",
    dtypes={
        "stock_id": pl.UInt8,
        "date_id": pl.UInt16,
        "seconds_in_bucket": pl.UInt16,
        "reference_price": pl.Float32,
        "far_price": pl.Float32,
        "near_price": pl.Float32,
        "bid_price": pl.Float32,
        "ask_price": pl.Float32,
        "wap": pl.Float32,
        "time_id": pl.UInt16,
        "target": pl.Float32,
        "imbalance_buy_sell_flag": pl.Int8,
    },
)

# Index weights (currently disabled): would read weight.json and pair stock ids
# 0..199 with its entries in a (stock_id, weight) frame.
# weight = json.load(open(f"./weight.json", "r"))
# weight_df = pl.DataFrame(
#     zip(range(200), weight), schema=[("stock_id", pl.UInt8), ("weight", pl.Float32)]
# )

In [2]:
# Replace NaN values in `target` with the median of the `target` column.
# NOTE(review): `fill_nan` addresses floating-point NaN; if missing targets are
# loaded from the CSV as nulls they would need `fill_null` instead — confirm
# which representation the data actually contains.
train_df = train_df.with_columns(pl.col("target").fill_nan(pl.col("target").median()))

In [3]:
# Display the number of nulls in every column of the training frame.
train_df.null_count()

stock_id,date_id,seconds_in_bucket,imbalance_size,imbalance_buy_sell_flag,reference_price,matched_size,far_price,near_price,bid_price,bid_size,ask_price,ask_size,wap,target,time_id,row_id
u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32
0,0,0,220,0,220,220,2894342,2857180,220,0,220,0,220,0,0,0


In [4]:
from typing import List
from itertools import combinations
from typing import Tuple

# @title Feature Engineering Functions


def lag_function(
    df: pl.DataFrame, columns_to_lag: List[str], num_days_to_lag: List[int]
) -> pl.DataFrame:
    """Append shifted copies of the given columns.

    For every lag ``i`` the selected columns are shifted by ``i`` rows inside
    each ``(stock_id, seconds_in_bucket)`` group and stored under the name
    ``lag{i}_<column>``.

    Args:
        df: Input frame containing ``stock_id``, ``seconds_in_bucket`` and the
            columns to lag.
        columns_to_lag: Names of the columns to shift.
        num_days_to_lag: Shift distances, one set of new columns per entry.

    Returns:
        ``df`` with the lagged columns added.

    Note:
        The shift follows the current row order of each group; it only
        corresponds to "previous days" if rows are ordered by date
        (assumption — verify against the caller's data).
    """
    group_keys = ["stock_id", "seconds_in_bucket"]
    lagged_exprs = []
    for shift_by in num_days_to_lag:
        shifted = pl.col(columns_to_lag).shift(shift_by).over(group_keys)
        lagged_exprs.append(shifted.name.prefix(f"lag{shift_by}_"))
    return df.with_columns(*lagged_exprs)


def create_diff_lagged_features_within_date(
    df: pl.DataFrame, columns_to_lag: List[str], lags: List[int]
) -> pl.DataFrame:
    """Append differences between each column and its earlier value on the same day.

    For every ``lag`` the result column ``<column>_lag_{lag}`` holds the current
    value minus the value ``lag`` rows earlier within the same
    ``(stock_id, date_id)`` group.

    Args:
        df: Input frame containing ``stock_id``, ``date_id`` and the columns.
        columns_to_lag: Names of the columns to difference.
        lags: Row distances to difference over.

    Returns:
        ``df`` with the difference columns added.
    """
    per_day = ["stock_id", "date_id"]
    diff_exprs = []
    for lag in lags:
        current = pl.col(columns_to_lag)
        earlier = pl.col(columns_to_lag).shift(lag).over(per_day)
        diff_exprs.append((current - earlier).name.suffix(f"_lag_{lag}"))
    return df.with_columns(*diff_exprs)


def create_features_from_start(df: pl.DataFrame, columns: List[str]) -> pl.DataFrame:
    """Append each column's change relative to the first row of its day.

    The new column ``<column>_from_start`` is the value minus the first value
    of the same ``(stock_id, date_id)`` group.

    Args:
        df: Input frame containing ``stock_id``, ``date_id`` and the columns.
        columns: Names of the columns to transform.

    Returns:
        ``df`` with the ``_from_start`` columns added.
    """
    values = pl.col(columns)
    opening = pl.col(columns).first().over(["stock_id", "date_id"])
    change = values - opening
    return df.with_columns(change.name.suffix("_from_start"))


def compute_imbalances(df: pl.DataFrame, columns: List[str], prefix="") -> pl.DataFrame:
    """Append the pairwise imbalance ``(a - b) / (a + b)`` for every column pair.

    Each pair is ordered alphabetically by name before the ratio is formed.
    Infinite ratios are replaced by NaN. The result column is named
    ``{a}_{b}_imbalance_{prefix}`` — note that ``prefix`` ends up at the END of
    the name despite what the parameter name suggests.

    Args:
        df: Input frame containing all ``columns``.
        columns: Column names to combine pairwise.
        prefix: Tag appended to every generated column name.

    Returns:
        ``df`` with one imbalance column per pair added.
    """
    nan = float("nan")
    imbalance_exprs = []
    for pair in combinations(columns, 2):
        first, second = sorted(pair)
        numerator = pl.col(first) - pl.col(second)
        denominator = pl.col(first) + pl.col(second)
        ratio = numerator / denominator
        imbalance_exprs.append(
            pl.when(ratio.is_infinite())
            .then(nan)
            .otherwise(ratio)
            .alias(f"{first}_{second}_imbalance_{prefix}")
        )
    return df.with_columns(*imbalance_exprs)


def compute_percentage_difference(
    df: pl.DataFrame, columns: Tuple[str, str], prefix=""
) -> pl.DataFrame:
    """Append the relative difference ``(a - b) / b`` for every column pair.

    Each pair is ordered alphabetically by name first. The value is a plain
    ratio (it is not multiplied by 100). The result column is named
    ``{a}_{b}_percentage_difference_{prefix}``, with ``prefix`` placed at the
    end of the name.

    Args:
        df: Input frame containing all ``columns``.
        columns: Column names to combine pairwise.
        prefix: Tag appended to every generated column name.

    Returns:
        ``df`` with one relative-difference column per pair added.
    """
    ordered_pairs = (sorted(pair) for pair in combinations(columns, 2))
    pct_exprs = [
        ((pl.col(first) - pl.col(second)) / pl.col(second)).alias(
            f"{first}_{second}_percentage_difference_{prefix}"
        )
        for first, second in ordered_pairs
    ]
    return df.with_columns(*pct_exprs)


def create_cumsum_features(df: pl.DataFrame, columns: List[str]) -> pl.DataFrame:
    """Append the running sum of each column within a ``(stock_id, date_id)`` group.

    Args:
        df: Input frame containing ``stock_id``, ``date_id`` and the columns.
        columns: Names of the columns to accumulate.

    Returns:
        ``df`` with ``<column>_cumsum`` columns added.
    """
    running_total = pl.col(columns).cum_sum().over(["stock_id", "date_id"])
    return df.with_columns(running_total.name.suffix("_cumsum"))


def create_deviation_within_seconds(
    df: pl.DataFrame,
    columns: List[str],
) -> pl.DataFrame:
    """Append each column's deviation from the median of its time bucket.

    The median is taken over all rows sharing the same
    ``(date_id, seconds_in_bucket)``; the new column ``<column>_deviation`` is
    the value minus that median.

    Args:
        df: Input frame containing ``date_id``, ``seconds_in_bucket`` and the
            columns.
        columns: Names of the columns to transform.

    Returns:
        ``df`` with the ``_deviation`` columns added.
    """
    values = pl.col(columns)
    bucket_median = pl.col(columns).median().over(["date_id", "seconds_in_bucket"])
    deviation = (values - bucket_median).name.suffix("_deviation")
    return df.with_columns(deviation)


def feature_engineering(df: pl.DataFrame) -> pl.DataFrame:
    """Append hand-crafted spread, volume and ratio features.

    Only the columns whose name contains ``_eng`` are picked up by the
    name-based selections used elsewhere in this notebook;
    ``imbalance_ratio``, ``price_spread_near_far`` and ``bid_ask_ratio`` do not
    carry that marker.

    Args:
        df: Input frame with the raw price, size and flag columns.

    Returns:
        ``df`` with the engineered columns added.
    """
    bid_price, ask_price = pl.col("bid_price"), pl.col("ask_price")
    bid_size, ask_size = pl.col("bid_size"), pl.col("ask_size")
    imbalance_size = pl.col("imbalance_size")
    matched_size = pl.col("matched_size")

    engineered = {
        "spread_eng": ask_price - bid_price,
        "volume_eng": ask_size + bid_size,
        # The misspelled name ("volumne") is kept: it is the emitted column name.
        "volumne_imbalance_eng": bid_size - ask_size,
        "imbalance_ratio": imbalance_size / matched_size,
        "price_spread_near_far": pl.col("near_price") - pl.col("far_price"),
        "price_wap_difference_eng": pl.col("reference_price") - pl.col("wap"),
        # Imbalance size multiplied by the buy/sell flag column.
        "weighted_imbalance_eng": imbalance_size * pl.col("imbalance_buy_sell_flag"),
        "bid_ask_ratio": bid_size / ask_size,
        "imbalance_to_bid_ratio_eng": imbalance_size / bid_size,
        "imbalance_to_ask_ratio_eng": imbalance_size / ask_size,
        "matched_size_to_total_size_ratio_eng": matched_size / (bid_size + ask_size),
    }
    return df.with_columns(**engineered)

In [5]:
def to_describe(columns) -> List:
    """Build summary-statistic aggregation expressions for the given columns.

    For every column, in order, seven expressions are produced and aliased as
    ``<column>_mean``, ``_std``, ``_min``, ``_q25``, ``_median``, ``_q75`` and
    ``_max``.

    Args:
        columns: Iterable of column names.

    Returns:
        Flat list of polars expressions, grouped by column.
    """
    summaries = (
        ("mean", lambda expr: expr.mean()),
        ("std", lambda expr: expr.std()),
        ("min", lambda expr: expr.min()),
        ("q25", lambda expr: expr.quantile(0.25)),
        ("median", lambda expr: expr.median()),
        ("q75", lambda expr: expr.quantile(0.75)),
        ("max", lambda expr: expr.max()),
    )
    return [
        stat(pl.col(name)).alias(f"{name}_{label}")
        for name in columns
        for label, stat in summaries
    ]


def global_features(df: pl.DataFrame) -> pl.DataFrame:
    """Attach per-stock summary statistics of bid/ask sizes and prices.

    Statistics are computed over all rows of ``df`` for each ``stock_id`` (see
    ``to_describe``), combined into six derived features, and left-joined back
    onto ``df`` by ``stock_id``.

    Args:
        df: Input frame containing ``stock_id``, ``bid_size``, ``ask_size``,
            ``bid_price`` and ``ask_price``.

    Returns:
        ``df`` with the per-stock statistic columns and the derived
        ``median_*``, ``std_*`` and ``ptp_*`` columns added.

    NOTE(review): both ``ptp_*`` features subtract the ask-side minimum from
    the bid-side maximum — confirm this cross-side range is intended.
    """
    described = ["bid_size", "ask_size", "bid_price", "ask_price"]
    per_stock = df.group_by(["stock_id"]).agg(to_describe(described))
    per_stock = per_stock.sort("stock_id")

    derived = {
        "median_size": pl.col("bid_size_median") + pl.col("ask_size_median"),
        "std_size": pl.col("bid_size_std") + pl.col("ask_size_std"),
        "ptp_size": pl.col("bid_size_max") - pl.col("ask_size_min"),
        "median_price": pl.col("bid_price_median") + pl.col("ask_price_median"),
        "std_price": pl.col("bid_price_std") + pl.col("ask_price_std"),
        "ptp_price": pl.col("bid_price_max") - pl.col("ask_price_min"),
    }
    per_stock = per_stock.with_columns(**derived)
    return df.join(per_stock, on="stock_id", how="left")

In [6]:
def calculate_stat_lag(df: pl.DataFrame, num_lags: int) -> pl.DataFrame:
    """Append row-wise statistics over the lagged target columns.

    Reads the columns ``lag1_target`` .. ``lag{num_lags}_target`` and adds
    ``target_mean``, ``target_range`` (max minus min), ``target_std``,
    ``target_variance`` and ``target_median`` computed across them per row.

    Args:
        df: Input frame that already contains the lagged target columns.
        num_lags: Number of lag columns to include, starting from lag 1.

    Returns:
        ``df`` with the five statistic columns added.
    """
    lag_names = [f"lag{k}_target" for k in range(1, num_lags + 1)]
    lag_values = pl.concat_list(lag_names)

    def _row_stat(reducer):
        # Reduce each row's list of lag values to one number and unwrap it.
        return lag_values.list.eval(reducer).list.first()

    highest = pl.max_horizontal(*lag_names)
    lowest = pl.min_horizontal(*lag_names)
    return df.with_columns(
        target_mean=pl.mean_horizontal(*lag_names),
        target_range=highest - lowest,
        target_std=_row_stat(pl.element().std()),
        target_variance=_row_stat(pl.element().var()),
        target_median=_row_stat(pl.element().median()),
    )

In [7]:
def split_by_date(df: pl.DataFrame, dates: Tuple[int, int]) -> pl.DataFrame:
    """Keep the rows whose ``date_id`` lies in an inclusive range.

    Args:
        df: Input frame containing ``date_id``.
        dates: ``(first, last)`` date ids; both ends are included.

    Returns:
        The filtered frame.
    """
    first_day = dates[0]
    last_day = dates[1]
    date_id = pl.col("date_id")
    in_range = (date_id >= first_day) & (date_id <= last_day)
    return df.filter(in_range)

In [8]:
from polars import DataFrame


# Raw input columns; the pipelines below feed these into the diff-lag and
# deviation feature builders.
raw_cols = [
    "imbalance_size",
    "matched_size",
    "bid_size",
    "ask_size",
    "reference_price",
    "far_price",
    "near_price",
    "bid_price",
    "ask_price",
    "wap",
    "imbalance_buy_sell_flag",
]
# All six price columns; combined pairwise into "_pr_" imbalance features.
columns_prices = [
    "reference_price",
    "far_price",
    "near_price",
    "bid_price",
    "ask_price",
    "wap",
]
# Price columns without far_price / near_price (not referenced by the pipelines in this cell).
columns_4prices = ["reference_price", "bid_price", "ask_price", "wap"]

# Size columns; combined pairwise into "_sz_" imbalance features and accumulated.
columns_sizes = ["imbalance_size", "matched_size", "bid_size", "ask_size"]
# Flag column (not referenced by the pipelines in this cell).
columns_flag = ["imbalance_buy_sell_flag"]


# Row distances used for the within-day difference features.
diff_lags = [1, 2, 3, 6, 12, 18, 24]
# Additional distances (not referenced by the pipelines in this cell).
diff_lags_extra = [30, 36, 42, 48]

# Number of lagged target columns built by feature_pipeline: lags 1..num_of_target_lags.
num_of_target_lags = 12
target_lags = list(range(1, num_of_target_lags + 1))


def feature_pipeline(df: pl.DataFrame) -> pl.DataFrame:
    """Run the full feature-engineering pipeline on a raw frame.

    Steps, in order: engineered features, pairwise size/price imbalances,
    within-day difference features, within-day cumulative sums, deviations
    from the per-bucket median, lagged targets, per-stock global statistics,
    and row-wise statistics over the lagged targets. Finally every infinite
    value in a Float32/Float64 column is replaced by NaN.

    Reads the module-level configuration ``raw_cols``, ``columns_sizes``,
    ``columns_prices``, ``diff_lags``, ``target_lags`` and
    ``num_of_target_lags``. Prints a short progress line before each of the
    diff-lag, cumsum, deviation and target-lag steps.

    Args:
        df: Raw frame with the price, size, flag, id and ``target`` columns.

    Returns:
        A new frame with all feature columns added.
    """
    df = feature_engineering(df)
    df = compute_imbalances(df, columns_sizes, prefix="_sz_")
    df = compute_imbalances(df, columns_prices, prefix="_pr_")

    # Generated columns are selected by the markers embedded in their names.
    # (The former "_imb_" selection could never match — imbalance columns are
    # named "..._imbalance_..." — and the "_pr_" selection was unused, so both
    # were dropped.)
    eng_features = [c for c in df.schema.keys() if "_eng" in c]
    imb_features_size = [c for c in df.schema.keys() if "_sz_" in c]

    diff_lag_cols = raw_cols + eng_features
    print(f"diff lagging {len(diff_lag_cols)} columns for {len(diff_lags)} lags.")
    df = create_diff_lagged_features_within_date(df, diff_lag_cols, diff_lags)

    cumsum_columns = columns_sizes + imb_features_size + eng_features
    print(f"cumsum for {len(cumsum_columns)} cols.")
    df = create_cumsum_features(df, cumsum_columns)

    # Price imbalance columns (names containing "_pr_") are not part of the
    # deviation step.
    deviation_cols = raw_cols + eng_features + imb_features_size
    print(f"deviation {len(deviation_cols)} columns within seconds.")
    df = create_deviation_within_seconds(df, deviation_cols)

    print(f"lagging target column for {len(target_lags)} lags.")
    df = lag_function(df, ["target"], target_lags)

    df = global_features(df)
    df = calculate_stat_lag(df, num_lags=num_of_target_lags)

    # Ratios and differences above can produce +/-inf; normalise them to NaN.
    return df.with_columns(
        pl.when(pl.col(col).is_infinite())
        .then(np.nan)
        .otherwise(pl.col(col))
        .alias(col)
        for col, value in df.schema.items()
        if value in [pl.Float32, pl.Float64]
    )


def feature_pipeline_rnn(df: pl.DataFrame) -> pl.DataFrame:
    """Run the reduced feature pipeline used for the RNN data set.

    Steps, in order: pairwise size/price imbalances, three lagged target
    columns, deviations from the per-bucket median, and per-stock global
    statistics. Reads the module-level ``columns_sizes``, ``columns_prices``
    and ``raw_cols`` and prints one progress line for the target lags.

    Args:
        df: Raw frame with the price, size, flag, id and ``target`` columns.

    Returns:
        A new frame with the feature columns added.

    NOTE(review): ``feature_engineering`` is not called here, so the ``_eng``
    selection below is empty unless the caller already supplies such columns —
    confirm that this is intended.
    """
    df = compute_imbalances(df, columns_sizes, prefix="_sz_")
    df = compute_imbalances(df, columns_prices, prefix="_pr_")

    # Local lag configuration; deliberately independent of the module-level one.
    rnn_lag_count = 3
    rnn_target_lags = list(range(1, rnn_lag_count + 1))
    print(f"lagging target column for {len(rnn_target_lags)} lags.")
    df = lag_function(df, ["target"], rnn_target_lags)

    schema_names = list(df.schema.keys())
    eng_features = [name for name in schema_names if "_eng" in name]
    imb_features_size = [name for name in schema_names if "_sz_" in name]
    df = create_deviation_within_seconds(
        df, raw_cols + eng_features + imb_features_size
    )

    return global_features(df)

In [9]:
# Build the full feature set from the training frame and write it to parquet.
# NOTE(review): open() does not create directories, so `data/` must already exist.
feature_df = feature_pipeline(train_df)
with open(f"data/train_eng.parquet", "wb") as f:
    feature_df.write_parquet(f)

diff lagging 19 columns for 7 lags.
cumsum for 18 cols.
deviation 25 columns within seconds.
lagging target column for 12 lags.


In [10]:
# Build the reduced RNN feature set from the training frame and write it to parquet.
# NOTE(review): open() does not create directories, so `data/` must already exist.
rnn_feature_df = feature_pipeline_rnn(train_df)
with open(f"data/train_eng_rnn.parquet", "wb") as f:
    rnn_feature_df.write_parquet(f)

lagging target column for 3 lags.
