In [1]:
from tqdm import tqdm
from copy import deepcopy
import pandas as pd
import polars as pl
import numpy as np
import pickle

In [2]:
train_df = pl.read_parquet(
    "data/raw/train.parquet",
    use_pyarrow=True,   
    low_memory=True
).lazy()

In [3]:
test_df = pl.read_parquet(
    "data/raw/test.parquet",
    use_pyarrow=True,   
    low_memory=True
).lazy()

In [4]:
with open("features/best_pc1_contribution.pkl", "rb") as f:
    temp1 = pickle.load(f)
with open("features/best_label_correlation.pkl", "rb") as f:
    temp2 = pickle.load(f)
with open("features/best_overall.pkl", "rb") as f:
    temp3 = pickle.load(f)
with open("features/best_label_correlation_same_cluster.pkl", "rb") as f:
    temp5 = pickle.load(f)

In [5]:
added_features = [
    'bid_ask_interaction','bid_buy_interaction', 'bid_sell_interaction', 
    'ask_buy_interaction', 'ask_sell_interaction', 'buy_sell_interaction',
    'spread_indicator',
    'volume_weighted_buy', 'volume_weighted_sell', 'volume_weighted_bid', 'volume_weighted_ask',
    'buy_sell_ratio', 'bid_ask_ratio',
    'order_flow_imbalance',
    'buying_pressure', 'selling_pressure',
    'total_liquidity', 'liquidity_imbalance', 'relative_spread',
    'trade_intensity', 'avg_trade_size', 'net_trade_flow',
    'depth_ratio', 'volume_participation', 'market_activity',
    'effective_spread_proxy', 'realized_volatility_proxy',
    'normalized_buy_volume', 'normalized_sell_volume',
    'liquidity_adjusted_imbalance', 'pressure_spread_interaction', 
    'trade_direction_ratio', 'net_buy_volume', 'bid_skew' , 'ask_skew'
]

def preprocessing(df):
    # Add new features based on: https://www.kaggle.com/code/yich723/drw-data-standardization
    df = df.with_columns(
        # interaction features
        (pl.col("bid_qty") * pl.col("ask_qty")).alias("bid_ask_interaction"),
        (pl.col("bid_qty") * pl.col("buy_qty")).alias("bid_buy_interaction"),
        (pl.col("bid_qty") * pl.col("sell_qty")).alias("bid_sell_interaction"),
        (pl.col("ask_qty") * pl.col("buy_qty")).alias("ask_buy_interaction"),
        (pl.col("ask_qty") * pl.col("sell_qty")).alias("ask_sell_interaction"),
        (pl.col("buy_qty") * pl.col("sell_qty")).alias("buy_sell_interaction"),

        # spread features
        ((pl.col("ask_qty") - pl.col("bid_qty")) / (pl.col("ask_qty") + pl.col("bid_qty")) + 1e-8).alias("spread_indicator"),

        # volume weighted features
        (pl.col("buy_qty") * pl.col("volume")).alias("volume_weighted_buy"),
        (pl.col("sell_qty") * pl.col("volume")).alias("volume_weighted_sell"),
        (pl.col("bid_qty") * pl.col("volume")).alias("volume_weighted_bid"),
        (pl.col("ask_qty") * pl.col("volume")).alias("volume_weighted_ask"),

        # buy/sell and bid/ask ratios
        (pl.col("buy_qty") / (pl.col("sell_qty") + 1e-8)).alias("buy_sell_ratio"),
        (pl.col("bid_qty") / (pl.col("ask_qty") + 1e-8)).alias("bid_ask_ratio"),

        # order flow imbalance
        ((pl.col("buy_qty") - pl.col("sell_qty")) / (pl.col("volume") + 1e-8)).alias("order_flow_imbalance"),

        # buying and selling pressure
        (pl.col("buy_qty") / (pl.col("volume") + 1e-8)).alias("buying_pressure"),
        (pl.col("sell_qty") / (pl.col("volume") + 1e-8)).alias("selling_pressure"),

        # liquidity features
        (pl.col("bid_qty") + pl.col("ask_qty")).alias("total_liquidity"),
        ((pl.col("bid_qty") - pl.col("ask_qty")) / (pl.col("bid_qty") + pl.col("ask_qty") + 1e-8)).alias("liquidity_imbalance"),
        ((pl.col("ask_qty") - pl.col("bid_qty")) / (pl.col("volume") + 1e-8)).alias("relative_spread"),

        # trade related features (size, intensity, flow)
        ((pl.col("buy_qty") + pl.col("sell_qty")) / (pl.col("volume") + 1e-8)).alias("trade_intensity"),
        (pl.col("volume") / (pl.col("buy_qty") + pl.col("sell_qty") + 1e-8)).alias("avg_trade_size"),
        ((pl.col("buy_qty") - pl.col("sell_qty")) / (pl.col("buy_qty") + pl.col("sell_qty") + 1e-8)).alias("net_trade_flow"),

        # volume features in market + how market is working wrt to volume 
        ((pl.col("bid_qty") + pl.col("ask_qty")) / (pl.col("volume") + 1e-8)).alias("depth_ratio"),
        ((pl.col("buy_qty") + pl.col("sell_qty")) / (pl.col("bid_qty") + pl.col("ask_qty") + 1e-8)).alias("volume_participation"),
        (pl.col("volume") * (pl.col("bid_qty") + pl.col("ask_qty"))).alias("market_activity"),

        # spread proxy and realized vol proxy
        ((pl.col("buy_qty") - pl.col("sell_qty")).abs() / (pl.col("volume") + 1e-8)).alias("effective_spread_proxy"),
        (((pl.col("buy_qty") - pl.col("sell_qty")) / (pl.col("volume") + 1e-8)) * pl.col("volume")).alias("realized_volatility_proxy"),

        # normalized buy/sell volume
        (pl.col("buy_qty") / (pl.col("bid_qty") + 1e-8)).alias("normalized_buy_volume"),
        (pl.col("sell_qty") / (pl.col("ask_qty") + 1e-8)).alias("normalized_sell_volume"),

        # imbalance and spread interaction
        (((pl.col("buy_qty") - pl.col("sell_qty")) / (pl.col("volume") + 1e-8)) * ((pl.col("bid_qty") + pl.col("ask_qty")) / (pl.col("volume") + 1e-8))).alias("liquidity_adjusted_imbalance"),
        ((pl.col("buy_qty") / (pl.col("volume") + 1e-8)) * ((pl.col("ask_qty") - pl.col("bid_qty")) / (pl.col("ask_qty") + pl.col("bid_qty")) + 1e-8)).alias("pressure_spread_interaction"),

        # trade direction ratio, net buying volume, bid/ask skew
        (pl.col("buy_qty") / (pl.col("buy_qty") + pl.col("sell_qty") + 1e-8)).alias("trade_direction_ratio"),
        (pl.col("buy_qty") - pl.col("sell_qty")).alias("net_buy_volume"),
        (pl.col("bid_qty") / (pl.col("bid_qty") + pl.col("ask_qty") + 1e-8)).alias("bid_skew"),
        (pl.col("ask_qty") / (pl.col("bid_qty") + pl.col("ask_qty") + 1e-8)).alias("ask_skew")
    )

    # # remove inf and fill nas with 0
    # df = df.with_columns([
    #     pl.when(pl.col(col).is_infinite())  # check if value is inf or -inf
    #     .then(np.nan)                     # replace with NaN
    #     .otherwise(pl.col(col))          # otherwise keep original
    #     .alias(col)
    #     for col in col_names
    # ])

    df = df.fill_nan(0).fill_null(0)

    new_selected_variable_1 = deepcopy(temp1 + added_features)
    new_selected_variable_2 = deepcopy(temp2 + added_features)
    new_selected_variable_3 = deepcopy(temp3 + added_features)
    new_selected_variable_5 = deepcopy(temp5 + added_features)
    if "timestamp" in df.collect_schema().names():
        new_selected_variable_1.append("timestamp")
        new_selected_variable_2.append("timestamp")
        new_selected_variable_3.append("timestamp")
        new_selected_variable_5.append("timestamp")
    if "label" in df.collect_schema().names():
        new_selected_variable_1.append("label")
        new_selected_variable_2.append("label")
        new_selected_variable_3.append("label")
        new_selected_variable_5.append("label")
    cleaned_df_1 = df.select(new_selected_variable_1)
    cleaned_df_2 = df.select(new_selected_variable_2)
    cleaned_df_3 = df.select(new_selected_variable_3)
    cleaned_df_5 = df.select(new_selected_variable_5)

    # Make an extra df for time features
    cleaned_df_4 = deepcopy(cleaned_df_2)
    if "timestamp" in cleaned_df_4.schema.keys():
        cleaned_df_4 = cleaned_df_4.with_columns(
            (pl.col("timestamp").dt.minute() / 60 * 2 * np.pi).sin().alias("sin_minute"),
            (pl.col("timestamp").dt.hour() / 24 * 2 * np.pi).sin().alias("sin_hour"),
            (pl.col("timestamp").dt.day() / 31 * 2 * np.pi).sin().alias("sin_day"),
            (pl.col("timestamp").dt.month() / 12 * 2 * np.pi).sin().alias("sin_month")
        )

    return cleaned_df_1, cleaned_df_2, cleaned_df_3, cleaned_df_4, cleaned_df_5

In [6]:
cleaned_train_df_1, cleaned_train_df_2, cleaned_train_df_3, cleaned_train_df_4, cleaned_train_df_5 = preprocessing(train_df)
#cleaned_train_df_1.sink_parquet("data/cleaned/cleaned_train_1.parquet")
cleaned_train_df_2.sink_parquet("data/cleaned/cleaned_train_2.parquet")
#cleaned_train_df_3.sink_parquet("data/cleaned/cleaned_train_3.parquet")
#cleaned_train_df_4.sink_parquet("data/cleaned/cleaned_train_4.parquet")
cleaned_train_df_5.sink_parquet("data/cleaned/cleaned_train_5.parquet")

  if "timestamp" in cleaned_df_4.schema.keys():


In [7]:
cleaned_test_df_1, cleaned_test_df_2, cleaned_test_df_3, cleaned_test_df_4, cleaned_test_df_5 = preprocessing(test_df)
#cleaned_test_df_1.sink_parquet("data/cleaned/cleaned_test_1.parquet")
cleaned_test_df_2.sink_parquet("data/cleaned/cleaned_test_2.parquet")
#cleaned_test_df_3.sink_parquet("data/cleaned/cleaned_test_3.parquet")
#cleaned_test_df_4.sink_parquet("data/cleaned/cleaned_test_4.parquet")
cleaned_test_df_5.sink_parquet("data/cleaned/cleaned_test_5.parquet")

  if "timestamp" in cleaned_df_4.schema.keys():


In [8]:
popular_features_train = train_df.select(["volume", "bid_qty", "ask_qty", "buy_qty", "sell_qty"])
popular_features_train.sink_parquet("data/cleaned/popular_features_train.parquet")

In [9]:
popular_features_test = test_df.select(["volume", "bid_qty", "ask_qty", "buy_qty", "sell_qty"])
popular_features_test.sink_parquet("data/cleaned/popular_features_test.parquet")