In [1]:
import os
import polars as pl

def generate_test_data(
    base_path: str,
    id_col: str, 
    date_id: str,  # 例: ("date_id", int)
    time_id: str,  # 例: ("time_id", int)
    pred_cols: list, # 例: [("responder_0", float)]
    input_cols: list,# 例: [("feature_00", float)]
    num_partitions=4,
    num_id=2,
    date_per_partition=2
):
    """
    id_col, date_id, time_id, pred_cols, input_colsをもとにテストデータを作成し、
    partition_id=0～(num_partitions-1)までのParquetファイルに出力する。

    各パーティションは、以下の条件でデータを生成：
      - id_col: 0～(num_id-1)までのIDを用意
      - date_id: partition i は [i*date_per_partition, (i+1)*date_per_partition-1]
      - time_id: 0～24 (25個)
    よって1パーティションあたり: date_per_partition * num_id * 25行。

    pred_cols, input_colsはタプルのリスト [(col_name, type), ...]で指定し、
    値は日付・ID・time_idに基づき決定的に割り当てる。
    """
    
    os.makedirs(base_path, exist_ok=True)

    # カラム名抽出
    id_col_name = id_col[0]
    date_col_name = date_id[0]
    time_col_name = time_id[0]

    # 1パーティションあたりの行数
    time_per_day = 25
    rows_per_partition = date_per_partition * num_id * time_per_day

    # 全ての列名
    all_col_names = [id_col_name, date_col_name, time_col_name] + [c[0] for c in pred_cols] + [c[0] for c in input_cols]

    for i in range(num_partitions):
        date_start = i * date_per_partition
        date_end = date_start + date_per_partition

        # データ生成
        # 二重三重ループで決定的に割り当て
        date_ids_list = []
        id_list = []
        time_ids_list = []

        for d in range(date_start, date_end):
            for sid in range(num_id):
                for t in range(time_per_day):
                    date_ids_list.append(d)
                    id_list.append(sid)
                    time_ids_list.append(t)

        # ベースとなる数値計算用
        # responder_0やfeature_00に割り当てるための基礎値
        # 例：val = d*10000 + sid*100 + t
        base_vals = []
        for d_id, s_id, t_id in zip(date_ids_list, id_list, time_ids_list):
            val = d_id * 10000 + s_id * 100 + t_id
            base_vals.append(val)

        # DataFrame用辞書
        data = {
            id_col_name: id_list,
            date_col_name: date_ids_list,
            time_col_name: time_ids_list,
        }

        # pred_cols割り当て（例：responder_0 = base_val + index_of_col）
        for idx, (c_name, c_type) in enumerate(pred_cols):
            # 例: responder_0 = base_val + idx
            c_values = [v + idx for v in base_vals]
            data[c_name] = c_values

        # input_cols割り当て（例：feature_00 = base_val/1000）
        for idx, (c_name, c_type) in enumerate(input_cols):
            # 例: feature_00 = base_val / 1000.0
            c_values = [v / 1000.0 + idx for v in base_vals]
            data[c_name] = c_values

        df = pl.DataFrame(data)
        # ソート（id_col_name, date_col_name, time_col_name順）
        df = df.sort([id_col_name, date_col_name, time_col_name])

        partition_dir = os.path.join(base_path, f"partition_id={i}")
        os.makedirs(partition_dir, exist_ok=True)
        df.write_parquet(os.path.join(partition_dir, "part-0.parquet"))

    print("Test data generated without rows_per_partition, using given columns.")

In [2]:
# 入力変数選択
id_col = ("symbol_id", int)
date_id = ("date_id", int)
time_id = ("time_id", int)

#preds_cols = [f"responder_{i}" for i in range(9)] # 目的変数
pred_cols = [("responder_0", float)]
input_cols = [("feature_00", float)] # 説明変数

target_cols = [id_col, date_id, time_id] + pred_cols + input_cols
target_cols

[('symbol_id', int),
 ('date_id', int),
 ('time_id', int),
 ('responder_0', float),
 ('feature_00', float)]

In [3]:
generate_test_data("test", id_col, date_id, time_id, pred_cols, input_cols, num_partitions=2, num_id=2, date_per_partition=3)

Test data generated without rows_per_partition, using given columns.


In [4]:
df = (
    pl.scan_parquet("/kaggle/working/test/partition_id=*/part-*.parquet", glob=True)
      .filter(pl.col("symbol_id") == 0)
      .collect()
)
df

symbol_id,date_id,time_id,responder_0,feature_00
i64,i64,i64,i64,f64
0,0,0,0,0.0
0,0,1,1,0.001
0,0,2,2,0.002
0,0,3,3,0.003
0,0,4,4,0.004
…,…,…,…,…
0,5,20,50020,50.02
0,5,21,50021,50.021
0,5,22,50022,50.022
0,5,23,50023,50.023


In [5]:
%%writefile make_walk_forward_feature.py
import os
import glob
import gc
import time
import polars as pl
from typing import List, Tuple, Union

# =============================================================================
# メタ情報を作る関数: 指定のファイル一覧から min/max(date_id) を取得
# =============================================================================
def create_metadata_from_files(
    files: List[str],
    date_id: str = "date_id",
):
    """
    files の一覧を走査し、各ファイルから date_id の min/max を取得。
    返り値: List[ (index, min_date, max_date) ]
    """
    meta = []
    for i, fpath in enumerate(sorted(files)):
        # Parquet読み込み（形式固定例）
        df_all = pl.read_parquet(fpath, columns=[date_id])
        min_id = df_all.select(pl.col(date_id).min()).item()
        max_id = df_all.select(pl.col(date_id).max()).item()
        meta.append((i, min_id, max_id))

    print("meta: partition_index, min_date, max_date =", meta)
    return meta

def partitions_for_range(
    meta: List[tuple],
    start_id: int,
    end_id: int
):
    """
    meta: [(index, min_date, max_date), ...]
    指定した日付範囲 [start_id, end_id] と重なるファイルの index を返す
    """
    needed = []
    for (idx, mini, maxi) in meta:
        if maxi >= start_id and mini <= end_id:
            needed.append(idx)
    return needed

# =============================================================================
# シンプルにParquetだけ読み書きするヘルパー
# =============================================================================
def _read_parquet_file(
    fpath: str,
    cols_to_read: List[str],
) -> pl.DataFrame:
    """
    Parquetを指定列だけ読み込む
    """
    return pl.read_parquet(fpath, columns=cols_to_read)

def _write_parquet_file(
    df: pl.DataFrame,
    out_path: str,
    compression: str = "zstd",
    compression_level: int = 9
):
    """
    Parquet書き込み
    """
    df.write_parquet(
        out_path,
        compression=compression,
        compression_level=compression_level
    )

def make_feature_walk_forward(
    files: List[str],
    output_base: str,
    id_col: str,
    date_id: str,
    time_id: str,
    pred_cols: List[str],
    input_cols: List[str],
    train_length: int,
    retroactive_size: int,
    add_feature_func
):
    original_cols = [id_col, date_id, time_id] + pred_cols + input_cols
    print("original_cols:", original_cols)

    # メタ情報作成
    meta = create_metadata_from_files(files, date_id=date_id)
    max_date_id_global = max(m[2] for m in meta)

    # 【変更点】metaから全パーティションの最小date_idを取り、そこからスタート
    min_date_id_global = min(m[1] for m in meta)
    # start_of_train = 0  # 従来
    start_of_train = min_date_id_global  # 修正後

    sprint_num = 1

    gen_feat = None
    while True:
        if start_of_train > max_date_id_global:
            print("break because start_of_train > max_date_id_global")
            break

        train_start = start_of_train
        train_end = start_of_train + train_length
        if train_end > max_date_id_global + 1:
            train_end = max_date_id_global + 1

        extended_start = max(train_start - retroactive_size, 0)
        required_start = extended_start
        required_end   = train_end - 1

        needed_partitions = partitions_for_range(meta, required_start, required_end)
        if len(needed_partitions) == 0:
            print("break because len(needed_partitions) == 0")
            break

        df_current = None
        for idx in needed_partitions:
            fpath = files[idx]
            print("read", fpath)
            df_part = _read_parquet_file(fpath, cols_to_read=[
                c for c in original_cols if c in pl.read_parquet(fpath, n_rows=1).columns
            ])
            if df_current is None:
                df_current = df_part
            else:
                df_current = pl.concat([df_current, df_part], how="vertical")

        if df_current is None or len(df_current) == 0:
            print("break because df_current is None or len(df_current) == 0")
            break

        # フィルタ
        df_current = df_current.filter(
            (pl.col(date_id) >= extended_start) & (pl.col(date_id) < train_end)
        )

        print(f"Sprint {sprint_num}: extended_start={extended_start}, train_end={train_end}, rows={len(df_current)}")

        # 特徴量作成
        df_out, gen_feat, gen_pred = add_feature_func(
            df_current,
            pred_cols=pred_cols,
            input_cols=input_cols,
            original_cols=original_cols
        )

        # train_df
        train_df = df_out.filter(
            (pl.col(date_id) >= train_start) & (pl.col(date_id) < train_end)
        )

        # 出力先
        sprint_dir = os.path.join(output_base, f"sprint{sprint_num}")
        os.makedirs(sprint_dir, exist_ok=True)
        train_path = os.path.join(sprint_dir, "train.parquet")

        train_df = train_df.collect()
        _write_parquet_file(train_df, train_path)
        print(f"  -> wrote {train_path}, rows={len(train_df)}")

        del df_current, df_out, train_df
        gc.collect()

        # 次のスプリントへ
        start_of_train = train_end
        sprint_num += 1

    print("Done (make_feature_walk_forward).")
    gc.collect()
    return gen_feat


# =============================================================================
# 2) train+valid分割 + train_shiftありのウォークフォワード
# =============================================================================
def make_feature_falk_forward_train_valid(
    # 入力
    files: List[str],
    output_base: str,
    # カラム指定
    id_col: str,
    date_id: str,
    time_id: str,
    pred_cols: List[str],
    input_cols: List[str],
    # パラメータ
    train_length: int,
    valid_length: int,
    train_shift: int,
    retroactive_size: int,
    # 特徴量作成関数
    add_feature_func
):
    """
    従来の train+valid + train_shift があるウォークフォワード版。
    """
    original_cols = [id_col, date_id, time_id] + pred_cols + input_cols
    print("original_cols:", original_cols)

    meta = create_metadata_from_files(files, date_id=date_id)
    max_date_id_global = max(m[2] for m in meta)

    start_of_train = 0
    sprint_num = 1

    prev_gen_feature_str = None
    prev_gen_preds_str   = None

    while True:
        if start_of_train > max_date_id_global:
            break

        train_start = start_of_train
        train_end   = start_of_train + train_length
        valid_start = train_end
        valid_end   = train_end + valid_length

        if valid_start > max_date_id_global:
            break
        if valid_end > max_date_id_global + 1:
            valid_end = max_date_id_global + 1

        extended_start = max(train_start - retroactive_size, 0)
        required_start = extended_start
        required_end   = valid_end - 1

        needed_partitions = partitions_for_range(meta, required_start, required_end)
        if len(needed_partitions) == 0:
            break

        df_current = None
        for idx in needed_partitions:
            fpath = files[idx]
            df_part = _read_parquet_file(fpath, cols_to_read=[
                c for c in original_cols if c in pl.read_parquet(fpath, n_rows=1).columns
            ])
            if df_current is None:
                df_current = df_part
            else:
                df_current = pl.concat([df_current, df_part], how="vertical")

        if df_current is None or len(df_current) == 0:
            break

        # filter
        df_current = df_current.filter(
            (pl.col(date_id) >= extended_start) & (pl.col(date_id) < valid_end)
        )
        print(f"Retroactive={extended_start}, Train=[{train_start},{train_end}), Valid=[{valid_start},{valid_end}), rows={len(df_current)}")

        gc.collect()

        df_out, generated_feature, generated_preds = add_feature_func(
            df_current,
            pred_cols=pred_cols,
            input_cols=input_cols,
            original_cols=original_cols
        )

        # 前回と比較
        current_gen_feature_str = repr(generated_feature)
        current_gen_preds_str   = repr(generated_preds)
        if prev_gen_feature_str is not None:
            if current_gen_feature_str != prev_gen_feature_str:
                raise Exception("generated_feature differs from previous sprint!")
        if prev_gen_preds_str is not None:
            if current_gen_preds_str != prev_gen_preds_str:
                raise Exception("generated_preds differs from previous sprint!")
        prev_gen_feature_str = current_gen_feature_str
        prev_gen_preds_str   = current_gen_preds_str

        train_df = df_out.filter((pl.col(date_id) >= train_start) & (pl.col(date_id) < train_end))
        valid_df = df_out.filter((pl.col(date_id) >= valid_start) & (pl.col(date_id) < valid_end))

        sprint_dir = os.path.join(output_base, f"sprint{sprint_num}")
        os.makedirs(sprint_dir, exist_ok=True)

        train_path = os.path.join(sprint_dir, "train.parquet")
        valid_path = os.path.join(sprint_dir, "valid.parquet")

        train_df = train_df.collect()
        valid_df = valid_df.collect()
        _write_parquet_file(train_df, train_path)
        _write_parquet_file(valid_df, valid_path)
        print(f"Sprint {sprint_num}: Train=[{train_start},{train_end}), Valid=[{valid_start},{valid_end})")
        print(f"  -> train={train_path}, rows={len(train_df)}")
        print(f"  -> valid={valid_path}, rows={len(valid_df)}")

        del df_current, df_out, train_df, valid_df
        gc.collect()

        start_of_train += train_shift
        sprint_num += 1

    print("Done (make_feature_falk_forward_train_valid).")
    gc.collect()

Writing make_walk_forward_feature.py


In [6]:
%%writefile time_serise_feature.py
import re
import polars as pl
from typing import List, Tuple

def create_stat_features_by(
    ldf: pl.LazyFrame,
    cols: List[str],
    key_by: List[str],
    agg_funcs: List[str],
) -> Tuple[pl.LazyFrame, List[str]]:
    """
    ldf を key_by で group_by し、cols に対して指定された agg_funcs を実行する汎用関数。
    例:
      agg_funcs に ["mean","std","min","max","median","sum","count","n_unique",
                    "last","first","skew","kurtosis","cv", "q0.25","q0.75" ...]
      のような文字列を指定できる。

    戻り値:
      (ldf_agg, generated_cols):
        ldf_agg : group_by(key_by).agg(...) の結果 (key_by + 各集計列 を持つ LazyFrame)
        generated_cols : 新しく生成された列名のリスト

    注意:
      - "q0.25" 等のquantile形式は "q0.x" のフォーマットで xをfloatとして解釈し expr.quantile(x)。
      - "cv" (coefficient of variation) = std/mean (mean=0に注意)
      - skew, kurtosis は Polars のバージョンによっては使えない場合あり
      - rolling_meanなどのウィンドウ関数はこのgroup_by集約とは別物

    例:
      aggregator_mapに含まれる文字列一覧を "agg_funcs" で指定
        -> "mean","std","cv","q0.25","skew"など
    """

    # Polarsでサポートする単一スカラー集約を文字列→lambda で定義
    def parse_quantile(alias: str, cexpr: pl.Expr) -> pl.Expr:
        # 例: alias == "q0.25" → 0.25
        #     alias == "q0.75" → 0.75
        m = re.match(r"q0\.(\d+)", alias)  # 例: "q0.25" -> group(1)=="25"
        if not m:
            # 不正形式ならそのまま col
            return cexpr
        float_str = "0." + m.group(1)
        q = float(float_str)
        return cexpr.quantile(q)  # quantile(0.25 / 0.75 etc.)

    aggregator_map = {
        "mean":      lambda cexpr: cexpr.mean(),
        "std":       lambda cexpr: cexpr.std(),
        "min":       lambda cexpr: cexpr.min(),
        "max":       lambda cexpr: cexpr.max(),
        "median":    lambda cexpr: cexpr.median(),
        "sum":       lambda cexpr: cexpr.sum(),
        "count":     lambda cexpr: cexpr.count(),
        "n_unique":  lambda cexpr: cexpr.n_unique(),
        "last":      lambda cexpr: cexpr.last(),
        "first":     lambda cexpr: cexpr.first(),
        "skew":      lambda cexpr: cexpr.skew(),
        "kurtosis":  lambda cexpr: cexpr.kurtosis(),
        # 変動係数 (coefficient of variation)
        "cv":        lambda cexpr: (cexpr.std() / cexpr.mean()),  # mean=0注意
    }

    agg_exprs = []
    generated_cols: List[str] = []

    for c in cols:
        for f in agg_funcs:
            alias_name = f"{c}_{'_'.join(key_by)}_{f}"

            # 1) quantile系:  "q0.25", "q0.50" etc.
            if f.startswith("q0."):
                expr = parse_quantile(f, pl.col(c)).alias(alias_name)
                agg_exprs.append(expr)
                generated_cols.append(alias_name)
                continue

            # 2) aggregator_map にあるか？
            aggregator = aggregator_map.get(f, None)
            if aggregator is not None:
                expr = aggregator(pl.col(c)).alias(alias_name)
                agg_exprs.append(expr)
                generated_cols.append(alias_name)
            else:
                raise Exception(f"create_stat_features_by: not defined {f}")

    # group_by and agg
    ldf_agg = ldf.group_by(key_by).agg(agg_exprs)

    return ldf_agg, generated_cols

import polars as pl
from typing import List, Tuple

def add_rolling_stats(
    ldf: pl.LazyFrame,
    cols: List[str],
    sort_keys: List[str],
    group_keys: List[str],
    n: int = 5,
) -> Tuple[pl.LazyFrame, List[str]]:
    """
    過去n期間の rolling_std(ボラティリティ) と rolling_mean を計算し、新しい列を追加する。

    Parameters
    ----------
    ldf : pl.LazyFrame
        処理対象のLazyFrame
    cols : List[str]
        計算対象の数値カラム (["feature_00","feature_01"]など)
    sort_keys : List[str]
        時系列順を保証するためにソートするカラムのリスト
        例: ["symbol_id","date_id","time_id"]
    group_keys : List[str]
        rolling計算を「どのカラム単位でパーティション切る」か
        例: ["symbol_id"]
    n : int, optional
        rollingのウィンドウサイズ (デフォルト=5)

    Returns
    -------
    (ldf_out, generated_cols):
      ldf_out       : 新しい列を追加したLazyFrame
      generated_cols: 作成された列の名前一覧
    """

    # 1) ソート (時系列順を正しく)
    if sort_keys:
        ldf = ldf.sort(by=sort_keys)

    # 2) group_keys があれば over() でパーティション分割
    def rolling_std_expr(col: str):
        if group_keys:
            return pl.col(col).rolling_std(window_size=n).over(group_keys)
        else:
            return pl.col(col).rolling_std(window_size=n)

    def rolling_mean_expr(col: str):
        if group_keys:
            return pl.col(col).rolling_mean(window_size=n).over(group_keys)
        else:
            return pl.col(col).rolling_mean(window_size=n)

    new_cols = []
    exprs = []

    for col_name in cols:
        # 出力列名
        std_col_name = f"{col_name}_rolling_std_{n}"
        mean_col_name = f"{col_name}_rolling_mean_{n}"
        new_cols.extend([std_col_name, mean_col_name])

        # rolling_std
        exprs.append(
            rolling_std_expr(col_name).alias(std_col_name)
        )
        # rolling_mean
        exprs.append(
            rolling_mean_expr(col_name).alias(mean_col_name)
        )

    ldf_out = ldf.with_columns(exprs)

    return ldf_out, new_cols


import polars as pl
from typing import List, Tuple

def add_rolling_stats(
    ldf: pl.LazyFrame,
    cols: List[str],
    sort_keys: List[str],
    group_keys: List[str],
    n: int = 5,
) -> Tuple[pl.LazyFrame, List[str]]:
    """
    過去n期間の rolling_std(ボラティリティ) と rolling_mean を計算し、新しい列を追加する。

    Parameters
    ----------
    ldf : pl.LazyFrame
        処理対象のLazyFrame
    cols : List[str]
        計算対象の数値カラム (["feature_00","feature_01"]など)
    sort_keys : List[str]
        時系列順を保証するためにソートするカラムのリスト
        例: ["symbol_id","date_id","time_id"]
    group_keys : List[str]
        rolling計算を「どのカラム単位でパーティション切る」か
        例: ["symbol_id"]
    n : int, optional
        rollingのウィンドウサイズ (デフォルト=5)

    Returns
    -------
    (ldf_out, generated_cols):
      ldf_out       : 新しい列を追加したLazyFrame
      generated_cols: 作成された列の名前一覧
    """

    # 1) ソート (時系列順を正しく)
    if sort_keys:
        ldf = ldf.sort(by=sort_keys)

    # 2) group_keys があれば over() でパーティション分割
    def rolling_std_expr(col: str):
        if group_keys:
            return pl.col(col).rolling_std(window_size=n).over(group_keys)
        else:
            return pl.col(col).rolling_std(window_size=n)

    def rolling_mean_expr(col: str):
        if group_keys:
            return pl.col(col).rolling_mean(window_size=n).over(group_keys)
        else:
            return pl.col(col).rolling_mean(window_size=n)

    new_cols = []
    exprs = []

    for col_name in cols:
        # 出力列名
        std_col_name = f"{col_name}_rolling_std_{n}"
        mean_col_name = f"{col_name}_rolling_mean_{n}"
        new_cols.extend([std_col_name, mean_col_name])

        # rolling_std
        exprs.append(
            rolling_std_expr(col_name).alias(std_col_name)
        )
        # rolling_mean
        exprs.append(
            rolling_mean_expr(col_name).alias(mean_col_name)
        )

    ldf_out = ldf.with_columns(exprs)

    return ldf_out, new_cols

import polars as pl
from typing import List, Tuple

def add_change_rate(
    ldf: pl.LazyFrame,
    cols: List[str],
    sort_keys: List[str],
    group_keys: List[str],
    n: int = 5,
    use_log_return: bool = False,
) -> Tuple[pl.LazyFrame, List[str]]:
    """
    t-nとの変化率 (log/ratio) * 100 を計算し、新しい列を追加する。

    例:
      - use_log_return=True  -> (log(current) - log(shift(n))) * 100
      - use_log_return=False -> ((current / shift(n)) - 1) * 100

    Parameters
    ----------
    ldf : pl.LazyFrame
        処理対象のLazyFrame
    cols : List[str]
        計算対象の数値カラム (["feature_00","feature_01"]など)
    sort_keys : List[str]
        時系列順を保証するためにソートするカラムのリスト
        例: ["symbol_id","date_id","time_id"]
    group_keys : List[str]
        shiftを「どのカラム単位でパーティション切る」か
        例: ["symbol_id"]
    n : int, optional
        t-n の期間差 (デフォルト=5)
    use_log_return : bool, optional
        Trueならlogリターンで計算 (デフォルト=False)

    Returns
    -------
    (ldf_out, generated_cols):
      ldf_out       : 新しい列を追加したLazyFrame
      generated_cols: 作成された列の名前一覧
    """

    # 1) ソート (時系列順を正しく)
    if sort_keys:
        ldf = ldf.sort(by=sort_keys)

    # 2) group_keys があれば over() でパーティション分割
    def shift_n_expr(col: str, periods: int):
        if group_keys:
            return pl.col(col).shift(periods).over(group_keys)
        else:
            return pl.col(col).shift(periods)

    # 3) カラムごとに t-n との変化率列を追加
    new_cols = []
    exprs = []
    for col_name in cols:
        # 出力列名
        rate_col_name = f"{col_name}_change_rate_{n}"
        new_cols.append(rate_col_name)

        # (log(current) - log(shift(n))) * 100
        #  or
        # ((current / shift(n)) - 1) * 100
        if use_log_return:
            exprs.append(
                (
                    (pl.col(col_name).log() - shift_n_expr(col_name, n).log()) * 100
                ).alias(rate_col_name)
            )
        else:
            exprs.append(
                (
                    ((pl.col(col_name) / shift_n_expr(col_name, n)) - 1) * 100
                ).alias(rate_col_name)
            )

    ldf_out = ldf.with_columns(exprs)

    return ldf_out, new_cols

Writing time_serise_feature.py


In [7]:
from time_serise_feature import *

def create_prev_features(df: pl.DataFrame, pred_cols: list[str]) -> pl.DataFrame:
    """
    """
    agg_expr = []
    for c in pred_cols:
        agg_expr.append(pl.col(c).first().alias(f"{c}_prev_first"))
        agg_expr.append(pl.col(c).last().alias(f"{c}_prev_last"))
        agg_expr.append(pl.col(c).max().alias(f"{c}_prev_max"))
        agg_expr.append(pl.col(c).min().alias(f"{c}_prev_min"))
        agg_expr.append(pl.col(c).std().alias(f"{c}_prev_std"))

    df = (
        df.group_by(["symbol_id", "date_id"])
        .agg(agg_expr)
        .with_columns((pl.col("date_id") + 1).alias("date_id_next"))
        .drop("date_id")
        .rename({"date_id_next": "date_id"})
    )
    cols = df.collect_schema().keys() - ["symbol_id", "date_id"]
    return df, list(cols)

window_size = 7
def add_feature(
    df: pl.DataFrame, 
    pred_cols, 
    input_cols,
    original_cols
) -> (pl.DataFrame, pl.DataFrame):
    # add_rolling_preds_funcを使用してprev_day_aggs相当を計算
    generated_features = []
    df = df.lazy()
    
    df_prev, generated_cols = create_prev_features(df, pred_cols)
    generated_features.extend(generated_cols)
    df = df.join(df_prev, on=["symbol_id", "date_id"], how="left")
    df, generated_cols = add_rolling_stats(df, pred_cols, n = 2, group_keys=["symbol_id"], sort_keys=["symbol_id", "date_id", "time_id"])
    generated_features.extend(generated_cols)
    df, generated_cols = add_change_rate(df, pred_cols, n = 2, group_keys=["symbol_id"], sort_keys=["symbol_id", "date_id", "time_id"])
    generated_features.extend(generated_cols)
    
    return df, generated_features, []

In [8]:
import os
import glob
import polars as pl
import matplotlib.pyplot as plt
import seaborn as sns
from make_walk_forward_feature import make_feature_walk_forward

data_path = "/kaggle/working/test"
parquet_files = glob.glob(os.path.join(data_path, "partition_id=*", "*.parquet"))

output_base = "test_make_feature_walk_forward"
os.makedirs(output_base, exist_ok=True)

test_train_length = 2
test_retroactive_size = 1

print("parquet_files", parquet_files)

gen_feat = make_feature_walk_forward(
    files=parquet_files,
    output_base=output_base,
    id_col=id_col[0],
    date_id=date_id[0], 
    time_id=time_id[0],
    pred_cols=[c[0] for c in pred_cols],
    input_cols=[c[0] for c in input_cols],
    train_length=test_train_length,
    retroactive_size=test_retroactive_size,
    add_feature_func=add_feature
)
gen_feat

parquet_files ['/kaggle/working/test/partition_id=0/part-0.parquet', '/kaggle/working/test/partition_id=1/part-0.parquet']
original_cols: ['symbol_id', 'date_id', 'time_id', 'responder_0', 'feature_00']
meta: partition_index, min_date, max_date = [(0, 0, 2), (1, 3, 5)]
read /kaggle/working/test/partition_id=0/part-0.parquet
Sprint 1: extended_start=0, train_end=2, rows=100
  -> wrote test_make_feature_walk_forward/sprint1/train.parquet, rows=100
read /kaggle/working/test/partition_id=0/part-0.parquet
read /kaggle/working/test/partition_id=1/part-0.parquet
Sprint 2: extended_start=1, train_end=4, rows=150
  -> wrote test_make_feature_walk_forward/sprint2/train.parquet, rows=100
read /kaggle/working/test/partition_id=1/part-0.parquet
Sprint 3: extended_start=3, train_end=6, rows=150
  -> wrote test_make_feature_walk_forward/sprint3/train.parquet, rows=100
break because start_of_train > max_date_id_global
Done (make_feature_walk_forward).


['responder_0_prev_last',
 'responder_0_prev_first',
 'responder_0_prev_max',
 'responder_0_prev_std',
 'responder_0_prev_min',
 'responder_0_rolling_std_2',
 'responder_0_rolling_mean_2',
 'responder_0_change_rate_2']

In [9]:
df = pl.read_parquet("/kaggle/working/test_make_feature_walk_forward/sprint1/train.parquet")
df.filter(pl.col("symbol_id") == 1)

symbol_id,date_id,time_id,responder_0,feature_00,responder_0_prev_first,responder_0_prev_last,responder_0_prev_max,responder_0_prev_min,responder_0_prev_std,responder_0_rolling_std_2,responder_0_rolling_mean_2,responder_0_change_rate_2
i64,i64,i64,i64,f64,i64,i64,i64,i64,f64,f64,f64,f64
1,0,0,100,0.1,,,,,,,,
1,0,1,101,0.101,,,,,,0.707107,100.5,
1,0,2,102,0.102,,,,,,0.707107,101.5,2.0
1,0,3,103,0.103,,,,,,0.707107,102.5,1.980198
1,0,4,104,0.104,,,,,,0.707107,103.5,1.960784
…,…,…,…,…,…,…,…,…,…,…,…,…
1,1,20,10120,10.12,100,124,124,100,7.359801,0.707107,10119.5,0.019767
1,1,21,10121,10.121,100,124,124,100,7.359801,0.707107,10120.5,0.019765
1,1,22,10122,10.122,100,124,124,100,7.359801,0.707107,10121.5,0.019763
1,1,23,10123,10.123,100,124,124,100,7.359801,0.707107,10122.5,0.019761


# Jane Street Real

In [10]:
%%writefile jane_street_real.py
import polars as pl
from typing import List, Tuple
from time_serise_feature import *

target_9_path = "/kaggle/input/jane-street-real-time-market-data-forecasting/train.parquet/partition_id=9"
max_time_id = pl.read_parquet(target_9_path).select(pl.col("time_id").max()).item()
segment_size = (max_time_id + 1) // 4  # 4等分のサイズを計算

# 4分割の閾値を計算
threshold1 = segment_size
threshold2 = segment_size * 2
threshold3 = segment_size * 3

print("threshold", threshold1, threshold2, threshold3)

def feature_today_rolling_func(
    ldf: pl.DataFrame,
    pred_cols: List[str],
    input_cols: List[str],
    original_cols: List[str] | None = None
) -> Tuple[pl.DataFrame, List[List[str]], List[str]]:
    """
    featureに関する特徴量を計算
    """
    generated_features: List[str] = []
    generated_preds: List[str] = []
    

    # 当日特徴量
    ## rolling特徴量
    ldf, generated_cols = add_avg_change_and_volatility(ldf, input_cols, n = 4, group_keys=["symbol_id"], sort_keys=["date_id", "time_id"])
    generated_features.extend(generated_cols)

    ldf, generated_cols = add_avg_change_and_volatility(ldf, input_cols, n = 28, group_keys=["symbol_id"], sort_keys=["date_id", "time_id"])
    generated_features.extend(generated_cols)

    return ldf, generated_features, generated_preds

def feature_today_stat_func(
    ldf: pl.DataFrame,
    pred_cols: List[str],
    input_cols: List[str],
    original_cols: List[str] | None = None
) -> Tuple[pl.DataFrame, List[List[str]], List[str]]:
    """
    featureに関する特徴量を計算
    """
    generated_features: List[str] = []
    generated_preds: List[str] = []
    
    # date_id, time_id単位
    df_datetime, datetime_feature_cols = create_stat_features_by(ldf=ldf, cols=input_cols, key_by=["date_id", "time_id"], agg_funcs=["mean", "std", "skew", "kurtosis", "cv", "last"])
    generated_features.extend(datetime_feature_cols)
    
    df_datetime, datetime_feature_cols = add_avg_change_and_volatility(df_datetime, datetime_feature_cols, n = 7, group_keys=["date_id"], sort_keys=["time_id"])
    generated_features.extend(datetime_feature_cols)
    
    return df_datetime, generated_features, generated_preds

def create_raw_responder_lag(
    df: pl.DataFrame,
    pred_cols: List[str]
) -> Tuple[pl.LazyFrame, List[str]]:
    """
    1) 'symbol_id', 'date_id', 'w' と pred_cols の列のみ取り出す
    2) w * responder_6 の列を新たに作る ("w_responder_6")
    3) w を削除
    4) date_id を (date_id - 1) シフト
    5) pred_cols + ["w_responder_6"] を "_lag_1" にリネーム
    戻り値:
      (lazy_frame, generated_cols):
        lazy_frame : 上記処理結果をLazyFrameに変換したもの
        generated_cols : リネーム後の列名(例: "responder_6_lag_1", "w_responder_6_lag_1"など)
    """

    # 1) 必要列のみ抽出 (Eager DataFrame)
    target_cols = ["symbol_id", "date_id", "time_id_group", "weight"] + pred_cols
    df_lags = df.select(target_cols)

    # 2) w * responder を作成
    w_responder_cols = []
    for c in pred_cols:
        w_c = f"w_{c}"
        w_responder_cols.append(w_c)
        df_lags = df_lags.with_columns(
            (pl.col("weight") * pl.col(c)).alias(w_c)
        )
    new_pred_cols = pred_cols + w_responder_cols
    
    # 3) wを削除
    df_lags = df_lags.drop(["weight"])

    # 4) date_id = date_id + 1
    df_lags = df_lags.with_columns(
        (pl.col("date_id") + 1).alias("date_id")
    )

    # 5) 上記 new_pred_cols を "colName_lag_1" にリネーム
    rename_map = {}
    for c in new_pred_cols:
        rename_map[c] = f"{c}_lag_1"

    df_lags = df_lags.rename(rename_map)

    # リネーム後の列名一覧
    generated_cols = list(rename_map.values())
    
    keys = ["symbol_id", "date_id", "time_id_group"]
    return df_lags, generated_cols, keys

def create_responder_lag_feature(ldf, cols):
    generated_features = []
    df_raw_lags, lags_cols, raw_lags_keys = create_raw_responder_lag(ldf, cols)

    # symbol_id, date_id, time_id_group 単位
    df_lags, gen_cols_symdate_group = create_stat_features_by(ldf=df_raw_lags, cols=lags_cols, key_by=["symbol_id", "date_id", "time_id_group"], agg_funcs=["mean", "std", "skew", "kurtosis", "cv", "last"])
    generated_features.extend(gen_cols_symdate_group)

    ## rolling特徴量
    ### 4 time_id_groupまで見る
    df_lags, generated_cols = add_avg_change_and_volatility(df_lags, gen_cols_symdate_group, n = 4, group_keys=["symbol_id"], sort_keys=["date_id", "time_id_group"])
    generated_features.extend(generated_cols)

    ### 4 * 7 time_id_groupまで見る
    df_lags, generated_cols = add_avg_change_and_volatility(df_lags, gen_cols_symdate_group, n = 28, group_keys=["symbol_id"], sort_keys=["date_id", "time_id_group"])
    generated_features.extend(generated_cols)

    # date_id単位でラグ列を集計
    df_lags_dateid, gen_cols_date = create_stat_features_by(ldf=df_raw_lags, cols=lags_cols, key_by=["date_id"], agg_funcs=["mean", "std", "skew", "kurtosis", "cv", "last"])
    generated_features.extend(gen_cols_date)

    df_lags_dateid, generated_cols = add_avg_change_and_volatility(df_lags_dateid, gen_cols_date, n = 7, group_keys=["date_id"], sort_keys=[])
    generated_features.extend(generated_cols)
    
    df_lags = df_lags.join(df_lags_dateid, on=["date_id"], how="left")

    # symbol_id, date_id単位
    df_lags_symdate, gen_cols_symdate = create_stat_features_by(ldf=df_raw_lags, cols=lags_cols,key_by=["symbol_id", "date_id"],agg_funcs=["mean", "std", "skew", "kurtosis", "cv", "last"])
    generated_features.extend(gen_cols_symdate)
    
    df_lags_symdate, generated_cols = add_avg_change_and_volatility(df_lags_symdate, gen_cols_symdate, n = 7, group_keys=["symbol_id"], sort_keys=["date_id"])
    generated_features.extend(generated_cols)

    df_lags = df_lags.join(df_lags_symdate, on=["symbol_id", "date_id"], how="left")

    # date_id, time_group_id単位
    df_lags_datetime, gen_cols_datetime = create_stat_features_by(ldf=df_raw_lags, cols=lags_cols, key_by=["date_id", "time_id_group"],agg_funcs=["mean", "std", "skew", "kurtosis", "cv", "last"])
    generated_features.extend(gen_cols_datetime)
   
    df_lags_datetime, generated_cols = add_avg_change_and_volatility(df_lags_datetime, gen_cols_datetime, n = 7, group_keys=["date_id"], sort_keys=["time_id_group"])
    generated_features.extend(generated_cols)
    
    df_lags = df_lags.join(df_lags_datetime, on=["date_id","time_id_group"], how="left")

    return df_lags, generated_features

def responder_lag_feature_func(
    ldf: pl.DataFrame,
    pred_cols: List[str],
    input_cols: List[str],
    original_cols: List[str] | None = None
) -> Tuple[pl.DataFrame, List[List[str]], List[str]]:
    ## [lags.parquetの扱い]
    ## 当日と前日のtiem_idが異なる可能性なので、単純なdate_id, time_idのlagは有用ではない
    ## また、前日の全てのlagは当日のtime_id == 0のタイミングで提供される
    ## すなわち、lagsが提供された場合で統計量の処理を行い、date_id - 1のresponderの統計量として扱う。
    
    generated_features: List[str] = []
    generated_preds: List[str] = []

    # responder-lag特徴量
    ldf, cols = create_responder_lag_feature(ldf, pred_cols)
    generated_features.extend(cols)

    return ldf, generated_features, generated_preds


def create_raw_feature_lag(
    df: pl.DataFrame,
    pred_cols: List[str]
) -> Tuple[pl.LazyFrame, List[str]]:
    target_cols = ["symbol_id", "date_id", "time_id_group"] + pred_cols
    df_lags = df.select(target_cols)

    # 4) date_id = date_id + 1
    df_lags = df_lags.with_columns(
        (pl.col("date_id") + 1).alias("date_id")
    )
    rename_map = {}
    for c in pred_cols:
        rename_map[c] = f"{c}_lag_1"

    df_lags = df_lags.rename(rename_map)

    # リネーム後の列名一覧
    generated_cols = list(rename_map.values())
    
    keys = ["symbol_id", "date_id", "time_id_group"]
    return df_lags, generated_cols, keys

def create_feature_lag(ldf, cols):
    generated_features = []
    df_raw_lags, lags_cols, raw_lags_keys = create_raw_feature_lag(ldf, cols)

    # symbol_id, date_id, time_id_group 単位
    df_lags, gen_cols_symdate_group = create_stat_features_by(ldf=df_raw_lags, cols=lags_cols, key_by=["symbol_id", "date_id", "time_id_group"], agg_funcs=["mean", "std", "skew", "kurtosis", "cv", "last"])
    generated_features.extend(gen_cols_symdate_group)

    ## rolling特徴量
    ### 4 time_id_groupまで見る
    df_lags, generated_cols = add_avg_change_and_volatility(df_lags, gen_cols_symdate_group, n = 4, group_keys=["symbol_id"], sort_keys=["date_id", "time_id_group"])
    generated_features.extend(generated_cols)

    ### 4 * 7 time_id_groupまで見る
    df_lags, generated_cols = add_avg_change_and_volatility(df_lags, gen_cols_symdate_group, n = 28, group_keys=["symbol_id"], sort_keys=["date_id", "time_id_group"])
    generated_features.extend(generated_cols)

    # date_id単位でラグ列を集計
    df_lags_dateid, gen_cols_date = create_stat_features_by(ldf=df_raw_lags, cols=lags_cols, key_by=["date_id"], agg_funcs=["mean", "std", "skew", "kurtosis", "cv", "last"])
    generated_features.extend(gen_cols_date)

    df_lags_dateid, generated_cols = add_avg_change_and_volatility(df_lags_dateid, gen_cols_date, n = 7, group_keys=["date_id"], sort_keys=[])
    generated_features.extend(generated_cols)
    
    df_lags = df_lags.join(df_lags_dateid, on=["date_id"], how="left")

    # symbol_id, date_id単位
    df_lags_symdate, gen_cols_symdate = create_stat_features_by(ldf=df_raw_lags, cols=lags_cols,key_by=["symbol_id", "date_id"],agg_funcs=["mean", "std", "skew", "kurtosis", "cv", "last"])
    generated_features.extend(gen_cols_symdate)
    
    df_lags_symdate, generated_cols = add_avg_change_and_volatility(df_lags_symdate, gen_cols_symdate, n = 7, group_keys=["symbol_id"], sort_keys=["date_id"])
    generated_features.extend(generated_cols)

    df_lags = df_lags.join(df_lags_symdate, on=["symbol_id", "date_id"], how="left")

    # date_id, time_group_id単位
    df_lags_datetime, gen_cols_datetime = create_stat_features_by(ldf=df_raw_lags, cols=lags_cols, key_by=["date_id", "time_id_group"],agg_funcs=["mean", "std", "skew", "kurtosis", "cv", "last"])
    generated_features.extend(gen_cols_datetime)
   
    df_lags_datetime, generated_cols = add_avg_change_and_volatility(df_lags_datetime, gen_cols_datetime, n = 7, group_keys=["date_id"], sort_keys=["time_id_group"])
    generated_features.extend(generated_cols)
    
    df_lags = df_lags.join(df_lags_datetime, on=["date_id","time_id_group"], how="left")

    return df_lags, generated_features

def feature_lag_func(
    ldf: pl.DataFrame,
    pred_cols: List[str],
    input_cols: List[str],
    original_cols: List[str] | None = None
) -> Tuple[pl.DataFrame, List[List[str]], List[str]]:
    """
    featureに関する特徴量を計算
    """
    generated_features: List[str] = []
    generated_preds: List[str] = []
    
    #前日特徴量(symbol_id, date_id -1, time_group_id)
    ldf, generated_cols = create_feature_lag(ldf, input_cols)
    generated_features.extend(generated_cols)

    return ldf, generated_features, generated_preds

def add_feature_func(
    df: pl.DataFrame,
    pred_cols: List[str],
    input_cols: List[str],
    original_cols: List[str] | None = None
) -> Tuple[pl.DataFrame, List[List[str]], List[str]]:

    generated_features: List[str] = []
    generated_preds: List[str] = []

    ldf = df.lazy()
    ldf = ldf.sort(["symbol_id", "date_id", "time_id"])

    # time_id_group (4分割)
    ldf = ldf.with_columns(
        pl.when(pl.col("time_id") < threshold1).then(0)
          .when(pl.col("time_id") < threshold2).then(1)
          .when(pl.col("time_id") < threshold3).then(2)
          .otherwise(3)
          .cast(pl.Int32)
          .alias("time_id_group")
    )
    generated_features.extend(["time_id_group"])

    # id, date_id, time(feature, preds付き) -> id, date_id, time(feature, preds付き)
    ldf_feature, f, p = feature_today_rolling_func(ldf, pred_cols, input_cols, original_cols)
    generated_features.extend(f)
    generated_preds.extend(p)

    # id, date_id, time(feature, preds付き) -> date_id, time_id
    ldf_feature_today_stat, f, p = feature_today_stat_func(ldf, pred_cols, input_cols, original_cols)
    generated_features.extend(f)
    generated_preds.extend(p)
    ldf_feature = ldf_feature.join(ldf_feature_today_stat, on=["date_id", "time_id"], how="left")

    # id, date_id, time(feature, preds付き) -> ["symbol_id", "date_id", "time_id_group"]
    ldf_responder_lag_feature, f, p = responder_lag_feature_func(ldf, pred_cols, input_cols, original_cols)
    generated_features.extend(f)
    generated_preds.extend(p)
    ldf_feature = ldf_feature.join(ldf_responder_lag_feature, on=["symbol_id", "date_id", "time_id_group"], how="left")
  
    # id, date_id, time(feature, preds付き) -> 
    ldf_feature_lag, f, p = feature_lag_func(ldf, pred_cols, input_cols, original_cols)
    generated_features.extend(f)
    generated_preds.extend(p)
    ldf_feature = ldf_feature.join(ldf_feature_lag, on=["symbol_id", "date_id", "time_id_group"], how="left")

    
    return ldf_feature, generated_features, generated_preds

Writing jane_street_real.py


In [11]:
"""
import os
import glob
import polars as pl
import numpy as np

import sys
sys.path.append('/kaggle/input/lib-time-series-libs/')

from jane_street_real import *

output_base = "datasets"
os.makedirs(output_base, exist_ok=True)

parquet_files = [
    "/kaggle/input/jane-street-real-time-market-data-forecasting/train.parquet/partition_id=1",
    "/kaggle/input/jane-street-real-time-market-data-forecasting/train.parquet/partition_id=2"
]

# 入力変数選択（事前定義）
id_col = "symbol_id"    # ID列
date_id = "date_id"     # 日付列
time_id = "time_id"     # 時間列
pred_cols = [f"responder_{i}" for i in range(9)]  # 目的変数

# 最初のファイルからカラム一覧とdtype取得
if not parquet_files:
    raise FileNotFoundError("No parquet files found in the specified data path.")

target_9_path = "/kaggle/input/jane-street-real-time-market-data-forecasting/train.parquet/partition_id=9"
df_test = pl.read_parquet(target_9_path, n_rows=1)
all_cols = df_test.columns
schema = df_test.schema  # {col_name: polars.DataType}

# 除外する列
exclude_cols = {id_col, date_id, time_id} | set(pred_cols)

# input_colsを自動的に決定
input_cols = []
for col in all_cols:
    if col not in exclude_cols:
        polars_dtype = schema[col]
        print(f"{col}, {polars_dtype}")
        input_cols.append(col)

# target_cols作成
target_cols = [id_col, date_id, time_id] + pred_cols + input_cols
print("target_cols:", target_cols)

train_length = 30
retroactive_size = 30 # rollingやwindowの最大lags数
print("parquet_files", parquet_files)
"""

'\nimport os\nimport glob\nimport polars as pl\nimport numpy as np\n\nimport sys\nsys.path.append(\'/kaggle/input/lib-time-series-libs/\')\n\nfrom jane_street_real import *\n\noutput_base = "datasets"\nos.makedirs(output_base, exist_ok=True)\n\nparquet_files = [\n    "/kaggle/input/jane-street-real-time-market-data-forecasting/train.parquet/partition_id=1",\n    "/kaggle/input/jane-street-real-time-market-data-forecasting/train.parquet/partition_id=2"\n]\n\n# 入力変数選択（事前定義）\nid_col = "symbol_id"    # ID列\ndate_id = "date_id"     # 日付列\ntime_id = "time_id"     # 時間列\npred_cols = [f"responder_{i}" for i in range(9)]  # 目的変数\n\n# 最初のファイルからカラム一覧とdtype取得\nif not parquet_files:\n    raise FileNotFoundError("No parquet files found in the specified data path.")\n\ntarget_9_path = "/kaggle/input/jane-street-real-time-market-data-forecasting/train.parquet/partition_id=9"\ndf_test = pl.read_parquet(target_9_path, n_rows=1)\nall_cols = df_test.columns\nschema = df_test.schema  # {col_name: polars.Da

In [12]:
"""
output_base = "feature_datasets"
os.makedirs(output_base, exist_ok=True)

make_feature_walk_forward(
    files=parquet_files,
    output_base=output_base,
    id_col=id_col,
    date_id=date_id,
    time_id=time_id,
    pred_cols=pred_cols,
    input_cols=input_cols,
    train_length=train_length,
    retroactive_size=retroactive_size,
    add_feature_func=add_feature_func
)
"""

'\noutput_base = "feature_datasets"\nos.makedirs(output_base, exist_ok=True)\n\nmake_feature_walk_forward(\n    files=parquet_files,\n    output_base=output_base,\n    id_col=id_col,\n    date_id=date_id,\n    time_id=time_id,\n    pred_cols=pred_cols,\n    input_cols=input_cols,\n    train_length=train_length,\n    retroactive_size=retroactive_size,\n    add_feature_func=add_feature_func\n)\n'