In [None]:
import warnings

# Suppress the feature-name mismatch UserWarning raised from
# sklearn.utils.validation. ``message`` is a regular expression that must match
# the start of the warning text.
warnings.filterwarnings(
    "ignore",
    message="X does not have valid feature names, but LGBMRegressor was fitted with feature names",
    category=UserWarning,
    module="sklearn.utils.validation",
)

# Suppress RuntimeWarnings raised from kaggle_evaluation.core.templates whose
# text contains "elapsed before server startup".
warnings.filterwarnings(
    "ignore",
    message=".*elapsed before server startup.*",
    category=RuntimeWarning,
    module="kaggle_evaluation.core.templates",
)

from __future__ import annotations

from pathlib import Path
import hashlib
import importlib
import subprocess
import sys

# Directory that is scanned for input datasets; fail fast when it is absent.
INPUT_ROOT = Path("/kaggle/input")
if not INPUT_ROOT.exists():
    raise FileNotFoundError("/kaggle/input is not mounted.")

# File names that must all exist inside a dataset directory for it to be
# accepted as the artifact root.
ARTIFACT_FILENAMES = (
    "inference_bundle.pkl",
    "model_meta.json",
    "feature_list.json",
)


def _discover_artifact_root() -> Path:
    """Locate the input dataset directory that holds every required artifact.

    The immediate children of ``INPUT_ROOT`` are visited in sorted order and a
    directory qualifies when it contains every name in ``ARTIFACT_FILENAMES``.

    Returns
    -------
    Path
        The first qualifying directory. A notice is printed when more than one
        directory qualifies.

    Raises
    ------
    FileNotFoundError
        If no child directory contains all required artifacts.
    """

    def _holds_all_artifacts(directory: Path) -> bool:
        for filename in ARTIFACT_FILENAMES:
            if not (directory / filename).exists():
                return False
        return True

    matching: list[Path] = [
        entry
        for entry in sorted(INPUT_ROOT.iterdir())
        if entry.is_dir() and _holds_all_artifacts(entry)
    ]
    if not matching:
        raise FileNotFoundError(
            "No input dataset contains the required two-head artifacts "
            "(inference_bundle.pkl, model_meta.json, feature_list.json)."
        )
    chosen = matching[0]
    if len(matching) > 1:
        print("[warn] multiple artifact datasets detected; using", chosen)
    return chosen


# Resolve the artifact directory once, when this code runs, and log it.
ARTIFACT_ROOT = _discover_artifact_root()
print("artifact root:", ARTIFACT_ROOT)


def _find_wheel(prefix: str) -> Path | None:
    """Locate a bundled wheel whose file name starts with ``prefix``.

    ``ARTIFACT_ROOT`` is searched first (non-recursively). Only when it holds
    no match is every dataset under ``INPUT_ROOT`` searched recursively.
    Within the first location that has matches, a wheel whose file name
    carries the running interpreter's CPython tag (for example ``-cp311-``)
    is preferred, so a dataset that ships wheels for several Python versions
    resolves to the usable one rather than whichever sorts first. When no
    file name carries that tag, the lexicographically first match is
    returned, exactly as before.

    Parameters
    ----------
    prefix:
        Distribution name as it appears at the start of the wheel file name
        (for example ``"scikit_learn"``).

    Returns
    -------
    Path | None
        Path to the selected wheel, or ``None`` when nothing matches.
    """
    pattern = f"{prefix}-*.whl"
    runtime_tag = f"-cp{sys.version_info.major}{sys.version_info.minor}-"
    search_plan = (
        (ARTIFACT_ROOT, pattern),
        (INPUT_ROOT, f"**/{pattern}"),
    )
    for root, glob_pattern in search_plan:
        matches = sorted(root.glob(glob_pattern))
        if not matches:
            continue
        for wheel in matches:
            if runtime_tag in wheel.name:
                return wheel
        # No wheel advertises the runtime tag: keep the historical choice.
        return matches[0]
    return None


# The scikit-learn wheel is mandatory; abort when none was found.
SKLEARN_WHEEL = _find_wheel("scikit_learn")
if SKLEARN_WHEEL is None:
    raise FileNotFoundError("Required scikit-learn wheel not found alongside SU1 artifacts.")

# Require the wheel file name to contain the running interpreter's CPython tag
# (for example "cp311").
py_tag = f"cp{sys.version_info.major}{sys.version_info.minor}"
if py_tag not in SKLEARN_WHEEL.name:
    raise RuntimeError(f"Wheel tag mismatch: expected runtime tag '{py_tag}' in {SKLEARN_WHEEL.name}")

# Best-effort removal of the listed packages before the wheel is installed:
# check=False ignores a non-zero pip exit status and pip's output is discarded.
subprocess.run(
    [
        sys.executable,
        "-m",
        "pip",
        "uninstall",
        "-y",
        "scikit-learn",
        "sklearn-compat",
        "category-encoders",
    ],
    check=False,
    stdout=subprocess.DEVNULL,
    stderr=subprocess.DEVNULL,
)

# numpy must already be importable; report its version, then patch its pickle
# registry (see the helper below).
try:
    import numpy as _np
except ImportError as exc:
    raise ImportError("numpy must be preinstalled in the Kaggle image.") from exc
else:
    print("numpy:", _np.__version__)

    def _ensure_numpy_bitgenerator_aliases() -> None:
        """Register extra lookup keys for ``MT19937`` in numpy's pickle registry.

        Adds the keys ``"<class 'numpy.random._mt19937.MT19937'>"`` and
        ``"numpy.random._mt19937.MT19937"`` to
        ``numpy.random._pickle.BitGenerators`` when they are not already
        present. Returns silently when the private module cannot be imported
        or ``numpy.random.MT19937`` is unavailable.

        NOTE(review): presumably this lets pickles that name the bit generator
        with these spellings load on this numpy version -- confirm against the
        artifacts being loaded.
        """
        try:
            import numpy.random._pickle as _np_random_pickle  # type: ignore[attr-defined]
        except Exception:
            return

        generator_cls = getattr(_np.random, "MT19937", None)
        if generator_cls is None:
            return

        aliases = (
            ("<class 'numpy.random._mt19937.MT19937'>", generator_cls),
            ("numpy.random._mt19937.MT19937", generator_cls),
        )
        # If the registry attribute is missing, the default dict is a throwaway
        # and the loop below has no lasting effect.
        bit_generators = getattr(_np_random_pickle, "BitGenerators", {})
        for key, cls in aliases:
            if key not in bit_generators:
                bit_generators[key] = cls

    _ensure_numpy_bitgenerator_aliases()

# Install the bundled scikit-learn wheel without contacting an index and
# without resolving dependencies; a pip failure raises CalledProcessError.
subprocess.check_call(
    [
        sys.executable,
        "-m",
        "pip",
        "install",
        "--no-index",
        "--no-deps",
        "--force-reinstall",
        "--upgrade",
        str(SKLEARN_WHEEL),
    ]
)

# Drop any already-imported sklearn modules and reset the import caches so the
# import below picks up the freshly installed files.
for key in list(sys.modules):
    if key == "sklearn" or key.startswith("sklearn."):
        del sys.modules[key]
importlib.invalidate_caches()

import sklearn
print("scikit-learn:", sklearn.__version__, "@", sklearn.__file__)
# Only the 1.7 release series is accepted.
if not sklearn.__version__.startswith("1.7."):
    raise RuntimeError("scikit-learn version mismatch: expected 1.7.x")

# statsmodels is optional: install the bundled wheel when one exists, and only
# warn (never abort) when installation or import fails.
STATSMODELS_WHEEL = _find_wheel("statsmodels")
if STATSMODELS_WHEEL is not None:
    try:
        subprocess.check_call(
            [
                sys.executable,
                "-m",
                "pip",
                "install",
                "--no-index",
                "--no-deps",
                "--force-reinstall",
                "--upgrade",
                str(STATSMODELS_WHEEL),
            ]
        )
    except subprocess.CalledProcessError as exc:
        print("[warn] failed to install statsmodels wheel:", STATSMODELS_WHEEL.name, exc)
    else:
        # Purge previously imported statsmodels modules so the import below
        # loads the newly installed version.
        for key in list(sys.modules):
            if key == "statsmodels" or key.startswith("statsmodels."):
                del sys.modules[key]
        importlib.invalidate_caches()
        try:
            import statsmodels as _sm
        except ImportError:
            print("[warn] statsmodels wheel installed but module not importable")
        else:
            print("statsmodels:", _sm.__version__)

# Import each remaining runtime dependency and print its version; a missing
# package raises ImportError here. "unknown" is printed when the module has no
# ``__version__`` attribute.
for pkg_name in ("joblib", "pandas", "polars", "pyarrow"):
    module = __import__(pkg_name)
    version = getattr(module, "__version__", "unknown")
    print(f"{pkg_name}: {version}")

import json
import os
import types
from dataclasses import dataclass
from typing import Any, Dict, Iterable, Mapping, Sequence, Tuple, cast

import joblib
import numpy as np
import pandas as pd
import polars as pl
import kaggle_evaluation.default_inference_server as kies

# === Module: preprocess.M_group.m_group ===
from collections import deque
from copy import deepcopy
from typing import Any, Deque, Dict, Hashable, Iterable, List, Mapping, Sequence, Tuple, cast
import numpy as np
import pandas as pd
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.decomposition import PCA
from sklearn.experimental import enable_iterative_imputer  # noqa: F401
from sklearn.impute import IterativeImputer, KNNImputer
from sklearn.linear_model import Ridge
from sklearn.ensemble import RandomForestRegressor

try:  # optional dependency
    from statsmodels.tsa.arima.model import ARIMA
    from statsmodels.tsa.statespace.structural import UnobservedComponents

except ImportError:  # pragma: no cover - statsmodels optional at runtime
    ARIMA = None  # type: ignore[assignment]
    UnobservedComponents = None  # type: ignore[assignment]

class MGroupImputer(TransformerMixin, BaseEstimator):
    """Estimator that imputes missing M-group features with time order in mind.

    Parameters
    ----------
    columns:
        Columns to impute. When omitted, columns whose names start with
        ``"M"`` are selected automatically at ``fit`` time.
    policy:
        Imputation policy to apply. See ``SUPPORTED_POLICIES``.
    rolling_window:
        Window width used by the rolling policies.
    ema_alpha:
        Smoothing coefficient used by the exponential-moving-average policy.
    calendar_column:
        Name of the date/time column consulted by seasonal policies such as
        day-of-week or monthly ones. When omitted, the column is searched for
        automatically.
    policy_params:
        Dictionary of policy-specific hyperparameters. Keys are strings and
        values are numbers or strings.
    random_state:
        Random seed for policies that rely on multivariate models.

    Notes
    -----
        - **Important:** ``ffill_bfill`` combines backward filling only while
          fitting; ``transform`` fills using forward-direction values only.
          The alias ``ffill_train_bfill_in_fit`` selects the same policy. This
          avoids look-ahead at inference time while still allowing a warm
          start during training.
        - The ``kalman_*`` and ``arima_auto`` policies use only statsmodels
          ``fittedvalues`` (one-step-ahead filter estimates) and never consult
          future smoothed values.
    """
    # Canonical policy names accepted by the estimator.
    _BASE_POLICIES: Tuple[str, ...] = (
        "ffill_bfill",
        "ffill_only",
        "rolling_median_k",
        "rolling_mean_k",
        "ema_alpha",
        "linear_interp",
        "spline_interp_deg",
        "time_interp",
        "backfill_robust",
        "winsorized_median_k",
        "quantile_fill",
        "dow_median",
        "dom_median",
        "month_median",
        "holiday_bridge",
        "knn_k",
        "pca_reconstruct_r",
        "mice",
        "missforest",
        "ridge_stack",
        "kalman_local_level",
        "arima_auto",
        "state_space_custom",
        "mask_plus_mean",
        "two_stage",
    )
    # Alternative spellings mapped to their canonical policy name.
    POLICY_ALIASES: Dict[str, str] = {
        "ffill_train_bfill_in_fit": "ffill_bfill",
    }
    # Every name a caller may pass as ``policy`` (canonical names plus aliases).
    SUPPORTED_POLICIES: Tuple[str, ...] = _BASE_POLICIES + tuple(POLICY_ALIASES.keys())

    def __init__(
        self,
        columns: Iterable[Hashable] | None = None,
        policy: str = "ffill_bfill",
        rolling_window: int = 5,
        ema_alpha: float = 0.3,
        calendar_column: str | None = None,
        policy_params: Mapping[str, Any] | None = None,
        random_state: int = 42,
    ) -> None:
        self.columns = columns
        resolved_policy = self.POLICY_ALIASES.get(policy, policy)
        self.policy_requested = policy
        self.policy = resolved_policy
        self.rolling_window = int(rolling_window)
        self.ema_alpha = float(ema_alpha)
        self.calendar_column = calendar_column
        self.policy_params = policy_params
        self._policy_params: Dict[str, Any] = dict(policy_params) if policy_params is not None else {}
        self.random_state = int(random_state)
    # ------------------------------------------------------------------

    def fit(self, X: pd.DataFrame, y=None):  # type: ignore[override]
        X_df = self._ensure_dataframe(X).copy()
        if self.columns is None:
            cols: List[str] = [c for c in X_df.columns if isinstance(c, str) and c.startswith("M")]
        else:
            cols = [c for c in self.columns if isinstance(c, str) and c in X_df.columns]
        self.columns_ = cols
        self.extra_columns_: List[str] = []
        if not self.columns_:
            self._train_index = X_df.index.copy()
            self._train_filled_ = pd.DataFrame(index=X_df.index.copy())
            self._state_ = {}
            self._calendar_fit_values_ = None
            self._output_columns_ = []
            self._medians_dict_ = {}
            return self
        if self.policy not in self.SUPPORTED_POLICIES:
            raise ValueError(f"Unsupported policy '{self.policy}'. Supported: {list(self.SUPPORTED_POLICIES)}")
        if self.rolling_window <= 0:
            raise ValueError("rolling_window must be positive")
        if not (0.0 < self.ema_alpha <= 1.0):
            raise ValueError("ema_alpha must be in (0, 1]")
        calendar_series = self._extract_calendar_series(X_df)
        self._calendar_column_name_ = calendar_series.name if calendar_series is not None else None
        self._calendar_fit_values_ = calendar_series.copy() if calendar_series is not None else None
        data = cast(pd.DataFrame, X_df.loc[:, self.columns_].copy())
        medians_series = data.median(numeric_only=True)
        if not isinstance(medians_series, pd.Series):  # pragma: no cover - defensive
            raise TypeError("Expected pandas Series from DataFrame.median().")
        medians_series = medians_series.reindex(self.columns_)
        medians_series = medians_series.fillna(0.0)
        medians_dict: Dict[Hashable, float] = {}
        for idx_label in medians_series.index:
            idx_position = cast(int, medians_series.index.get_loc(idx_label))
            medians_dict[idx_label] = float(cast(float, medians_series.iloc[idx_position]))
        self._medians_dict_ = medians_dict
        if self.policy == "ffill_bfill":
            filled, state = self._fit_ffill(data, medians_dict, use_bfill=True)
        elif self.policy == "ffill_only":
            filled, state = self._fit_ffill(data, medians_dict, use_bfill=False)
        elif self.policy in {"rolling_median_k", "rolling_mean_k"}:
            filled, state = self._fit_rolling(
                data,
                use_median=self.policy == "rolling_median_k",
                medians_lookup=medians_dict,
            )
            state["medians"] = medians_dict
        elif self.policy == "ema_alpha":
            filled, state = self._fit_ema(data, medians_lookup=medians_dict)
        elif self.policy == "linear_interp":
            filled, state = self._fit_linear_interp(data, medians_dict)
        elif self.policy == "spline_interp_deg":
            filled, state = self._fit_spline_interp(data, medians_dict)
        elif self.policy == "time_interp":
            filled, state = self._fit_time_interp(data, medians_dict, calendar_series)
        elif self.policy == "backfill_robust":
            filled, state = self._fit_backfill_robust(data, medians_dict)
        elif self.policy == "winsorized_median_k":
            filled, state = self._fit_winsorized_median(data, medians_dict)
        elif self.policy == "quantile_fill":
            filled, state = self._fit_quantile_fill(data, medians_dict)
        elif self.policy in {"dow_median", "dom_median", "month_median"}:
            filled, state = self._fit_seasonal_median(data, medians_dict, calendar_series)
        elif self.policy == "holiday_bridge":
            filled, state = self._fit_holiday_bridge(data, medians_dict, calendar_series)
        elif self.policy == "knn_k":
            filled, state = self._fit_knn(data)
        elif self.policy == "pca_reconstruct_r":
            filled, state = self._fit_pca_reconstruct(data, medians_dict)
        elif self.policy == "mice":
            filled, state = self._fit_mice(data)
        elif self.policy == "missforest":
            filled, state = self._fit_missforest(data)
        elif self.policy == "ridge_stack":
            filled, state = self._fit_ridge_stack(data, medians_dict)
        elif self.policy == "kalman_local_level":
            filled, state = self._fit_kalman(data, medians_dict, level_only=True)
        elif self.policy == "arima_auto":
            filled, state = self._fit_arima_auto(data, medians_dict)
        elif self.policy == "state_space_custom":
            filled, state = self._fit_kalman(data, medians_dict, level_only=False)
        elif self.policy == "mask_plus_mean":
            filled, state = self._fit_mask_plus_mean(data, medians_dict)
        elif self.policy == "two_stage":
            filled, state = self._fit_two_stage(data, medians_dict)
        else:  # pragma: no cover - safeguarded above
            raise ValueError(self.policy)
        state = self._prepare_state(state)
        self.extra_columns_ = [c for c in filled.columns if c not in self.columns_]
        self._train_index = X_df.index.copy()
        self._train_filled_ = filled
        self._state_ = state
        self._output_columns_ = list(filled.columns)
        return self
    # ------------------------------------------------------------------

    def transform(self, X: pd.DataFrame):  # type: ignore[override]
        """Impute the fitted columns of ``X`` using the state learned in ``fit``.

        Parameters
        ----------
        X:
            Frame to impute. Must be a pandas DataFrame.

        Returns
        -------
        pd.DataFrame
            ``X`` itself when no columns were selected at fit time; otherwise
            a copy of ``X`` with the fitted columns (and any extra columns
            produced by the policy) overwritten with imputed values.

        Raises
        ------
        AttributeError
            If the estimator has not been fitted.
        TypeError
            If ``X`` is not a DataFrame.
        KeyError
            If a calendar column was recorded at fit time but is missing here.
        """
        self._validate_fitted()
        X_df = self._ensure_dataframe(X)
        if not self.columns_:
            return X_df
        df = X_df.copy()
        # When the input index equals the index seen in fit, reuse the values
        # filled during fit instead of running the transform-time policy.
        if df.index.equals(self._train_index):
            for col in self._output_columns_:
                if col in df.columns:
                    df.loc[:, col] = self._train_filled_.loc[:, col].values
                else:
                    df[col] = self._train_filled_.loc[:, col].values
            return df
        subset = cast(pd.DataFrame, df.loc[:, self.columns_])
        # A calendar column found during fit must also be present here.
        expect_calendar = bool(getattr(self, "_calendar_column_name_", None))
        calendar_series = self._extract_calendar_series(df, expect_existing=expect_calendar)
        # Dispatch to the policy-specific transform.
        if self.policy == "ffill_bfill":
            filled = self._transform_ffill(subset, use_bfill=True)
        elif self.policy == "ffill_only":
            filled = self._transform_ffill(subset, use_bfill=False)
        elif self.policy in {"rolling_median_k", "rolling_mean_k"}:
            filled = self._transform_rolling(subset, use_median=self.policy == "rolling_median_k")
        elif self.policy == "ema_alpha":
            filled = self._transform_ema(subset)
        elif self.policy == "linear_interp":
            filled = self._transform_linear_interp(subset)
        elif self.policy == "spline_interp_deg":
            filled = self._transform_spline_interp(subset)
        elif self.policy == "time_interp":
            filled = self._transform_time_interp(subset, calendar_series)
        elif self.policy == "backfill_robust":
            filled = self._transform_backfill_robust(subset)
        elif self.policy == "winsorized_median_k":
            filled = self._transform_winsorized_median(subset)
        elif self.policy == "quantile_fill":
            filled = self._transform_quantile_fill(subset)
        elif self.policy in {"dow_median", "dom_median", "month_median"}:
            filled = self._transform_seasonal_median(subset, calendar_series)
        elif self.policy == "holiday_bridge":
            filled = self._transform_holiday_bridge(subset, calendar_series)
        elif self.policy == "knn_k":
            filled = self._transform_knn(subset)
        elif self.policy == "pca_reconstruct_r":
            filled = self._transform_pca_reconstruct(subset)
        elif self.policy == "mice":
            filled = self._transform_mice(subset)
        elif self.policy == "missforest":
            filled = self._transform_missforest(subset)
        elif self.policy == "ridge_stack":
            filled = self._transform_ridge_stack(subset)
        elif self.policy == "kalman_local_level":
            filled = self._transform_kalman(subset, calendar_series)
        elif self.policy == "arima_auto":
            filled = self._transform_arima_auto(subset)
        elif self.policy == "state_space_custom":
            filled = self._transform_kalman(subset, calendar_series)
        elif self.policy == "mask_plus_mean":
            filled = self._transform_mask_plus_mean(subset)
        elif self.policy == "two_stage":
            filled = self._transform_two_stage(subset)
        else:  # pragma: no cover - safeguarded in fit
            raise ValueError(self.policy)
        # Write results back positionally (``.values``) so index alignment
        # cannot reorder them.
        for col in self.columns_:
            df.loc[:, col] = filled.loc[:, col].values
        for col in self.extra_columns_:
            df.loc[:, col] = filled.loc[:, col].values
        return df
    # ------------------------------------------------------------------

    def get_feature_names_out(self, input_features=None):
        if input_features is not None:
            return np.array(input_features)
        if hasattr(self, "_output_columns_") and self._output_columns_:
            return np.array(self._output_columns_)
        return np.array(self.columns_)
    # ------------------------------------------------------------------

    def _ensure_dataframe(self, X):
        if isinstance(X, pd.DataFrame):
            return X
        raise TypeError("MGroupImputer expects a pandas DataFrame as input.")

    @staticmethod

    def _ensure_warning_list(state: Dict[str, Any]) -> List[str]:
        warnings = state.get("warnings")
        if isinstance(warnings, list):
            return warnings
        warnings_list: List[str] = []
        state["warnings"] = warnings_list
        return warnings_list

    def _prepare_state(self, state: Dict[str, Any]) -> Dict[str, Any]:
        self._ensure_warning_list(state)
        return state

    def _record_warning(self, message: str) -> None:
        if not hasattr(self, "_state_"):
            return
        warnings = self._state_.setdefault("warnings", [])
        if isinstance(warnings, list):
            warnings.append(message)
        else:  # pragma: no cover - defensive
            self._state_["warnings"] = [message]

    @staticmethod

    def _effective_history(frame: pd.DataFrame, requested_len: int) -> tuple[int, pd.DataFrame]:
        if requested_len <= 0 or frame.empty:
            return 0, frame.iloc[0:0].copy()
        effective = min(requested_len, len(frame))
        return effective, frame.tail(effective).copy()

    def _empty_tail_frame(self) -> pd.DataFrame:
        if not hasattr(self, "columns_") or not self.columns_:
            return pd.DataFrame()
        return pd.DataFrame({col: pd.Series(dtype=float) for col in self.columns_})

    def _state_tail_frame(self) -> pd.DataFrame:
        stored = self._state_.get("tail") if hasattr(self, "_state_") else None
        if isinstance(stored, pd.DataFrame):
            return stored
        return self._empty_tail_frame()

    @staticmethod

    def _deque_median(values: Sequence[float]) -> float:
        seq = list(values)
        if not seq:
            return float("nan")
        arr = np.asarray(seq, dtype=float)
        return float(np.median(arr))

    def _validate_fitted(self):
        if not hasattr(self, "_state_"):
            raise AttributeError("MGroupImputer is not fitted.")
    # Internal helpers for rolling-window imputation ------------------------

    def _fit_rolling(self, data: pd.DataFrame, *, use_median: bool, medians_lookup: dict[Hashable, float]):
        filled = data.copy()
        deques: dict[str, Deque[float]] = {col: deque(maxlen=self.rolling_window) for col in self.columns_}
        for index_label in filled.index:
            for col in self.columns_:
                original_val = data.at[index_label, col]
                if pd.isna(original_val):
                    dq = deques[col]
                    if dq:
                        if use_median:
                            fill_value = self._deque_median(dq)
                        else:
                            fill_value = float(np.mean(dq))
                            if np.isfinite(fill_value):
                                dq.append(fill_value)
                    else:
                        lookup = medians_lookup.get(col)
                        fill_value = float(lookup) if lookup is not None else float("nan")
                        if not use_median and np.isfinite(fill_value):
                            dq.append(fill_value)
                    filled.at[index_label, col] = fill_value
                else:
                    fill_value = float(cast(float, original_val))
                    filled.at[index_label, col] = fill_value
                    deques[col].append(fill_value)
        state = {
            "deques": {col: deque(deques[col], maxlen=self.rolling_window) for col in self.columns_},
            "use_median": use_median,
        }
        return filled, state

    def _transform_rolling(self, data: pd.DataFrame, *, use_median: bool):
        filled = data.copy()
        state = cast(dict[str, Deque[float]], deepcopy(self._state_["deques"]))
        medians_lookup = cast(dict[Hashable, float], self._state_["medians"])
        for index_label in filled.index:
            for col in self.columns_:
                original_val = data.at[index_label, col]
                dq = state[col]
                if pd.isna(original_val):
                    if dq:
                        fill_value = self._deque_median(dq) if use_median else float(np.mean(dq))
                    else:
                        lookup = medians_lookup.get(col)
                        fill_value = float(lookup) if lookup is not None else float("nan")
                    filled.at[index_label, col] = fill_value
                    if not use_median:
                        dq.append(fill_value)
                else:
                    fill_value = float(cast(float, original_val))
                    filled.at[index_label, col] = fill_value
                    dq.append(fill_value)
        return filled
    # Internal helpers for EMA imputation ----------------------------------

    def _fit_ema(self, data: pd.DataFrame, *, medians_lookup: dict[Hashable, float]):
        filled = data.copy()
        ema_state: dict[str, float | None] = {col: None for col in self.columns_}
        for index_label in filled.index:
            for col in self.columns_:
                val = filled.at[index_label, col]
                ema_val = ema_state[col]
                if pd.isna(val):
                    if ema_val is not None:
                        fill_value = ema_val
                    else:
                        lookup = medians_lookup.get(col)
                        fill_value = float(lookup) if lookup is not None else float("nan")
                    filled.at[index_label, col] = fill_value
                    ema_val = float(fill_value)
                else:
                    val_float = float(cast(float, val))
                    ema_val = val_float if ema_val is None else self.ema_alpha * val_float + (1 - self.ema_alpha) * ema_val
                    filled.at[index_label, col] = val_float
                ema_state[col] = ema_val
        state = {
            "ema": ema_state,
            "medians": medians_lookup.copy(),
        }
        return filled, state

    def _transform_ema(self, data: pd.DataFrame):
        filled = data.copy()
        ema_state = cast(dict[str, float | None], deepcopy(self._state_["ema"]))
        medians_lookup = cast(dict[Hashable, float], self._state_["medians"])
        for index_label in filled.index:
            for col in self.columns_:
                val = filled.at[index_label, col]
                ema_val = ema_state[col]
                if pd.isna(val):
                    if ema_val is not None:
                        fill_value = ema_val
                    else:
                        lookup = medians_lookup.get(col)
                        fill_value = float(lookup) if lookup is not None else float("nan")
                    filled.at[index_label, col] = fill_value
                    ema_val = float(fill_value)
                else:
                    val_float = float(cast(float, val))
                    ema_val = val_float if ema_val is None else self.ema_alpha * val_float + (1 - self.ema_alpha) * ema_val
                    filled.at[index_label, col] = val_float
                ema_state[col] = ema_val
        return filled
    # Shared utilities -----------------------------------------------------

    def _extract_calendar_series(self, X_df: pd.DataFrame, expect_existing: bool = False) -> pd.Series | None:
        """ポリシーが必要とする場合に日時列を抽出する。"""
        col_name = getattr(self, "_calendar_column_name_", None)
        if col_name is not None and col_name in X_df.columns:
            return cast(pd.Series, X_df[col_name])
        if col_name is not None and col_name not in X_df.columns:
            if expect_existing:
                raise KeyError(
                    f"Calendar column '{col_name}' required by policy '{self.policy}' is missing in transform input."
                )
            return None
        if self.calendar_column and self.calendar_column in X_df.columns:
            self._calendar_column_name_ = self.calendar_column
            return cast(pd.Series, X_df[self.calendar_column])
        if self.calendar_column and self.calendar_column not in X_df.columns:
            if expect_existing:
                raise KeyError(
                    f"Calendar column '{self.calendar_column}' not found in input DataFrame for policy '{self.policy}'."
                )
            return None
        for candidate in ("date", "date_id", "timestamp", "datetime", "evaluation_date"):
            if candidate in X_df.columns:
                self._calendar_column_name_ = candidate
                return cast(pd.Series, X_df[candidate])
        if expect_existing:
            raise KeyError(
                "Calendar column is required for the selected policy. Provide calendar_column explicitly or include a compatible column in the input."
            )
        return None

    def _get_policy_param(self, key: str, default: Any) -> Any:
        return self._policy_params.get(key, default)

    def _history_length(self) -> int:
        raw = int(self._get_policy_param("history_length", max(self.rolling_window, 2)))
        return max(2, raw)

    def _compute_scaler_stats(self, data: pd.DataFrame) -> tuple[Dict[str, float], Dict[str, float]]:
        means: Dict[str, float] = {}
        stds: Dict[str, float] = {}
        for col in self.columns_:
            series = data[col].astype(float)
            values = series.to_numpy()
            with np.errstate(all="ignore"):
                mean = float(np.nanmean(values))
                std = float(np.nanstd(values))
            if not np.isfinite(mean):
                mean = 0.0
            if not np.isfinite(std) or std < 1e-12:
                std = 1.0
            means[col] = mean
            stds[col] = std
        return means, stds

    def _standardize_with_stats(
        self,
        data: pd.DataFrame,
        means: Mapping[str, float],
        stds: Mapping[str, float],
    ) -> pd.DataFrame:
        scaled = data.astype(float).copy()
        for col in self.columns_:
            mean = float(means.get(col, 0.0))
            std = float(stds.get(col, 1.0))
            if not np.isfinite(std) or std < 1e-12:
                std = 1.0
            scaled[col] = (scaled[col] - mean) / std
        return scaled

    def _destandardize_with_stats(
        self,
        data: pd.DataFrame,
        means: Mapping[str, float],
        stds: Mapping[str, float],
    ) -> pd.DataFrame:
        restored = data.astype(float).copy()
        for col in self.columns_:
            mean = float(means.get(col, 0.0))
            std = float(stds.get(col, 1.0))
            restored[col] = restored[col] * std + mean
        return restored

    @staticmethod

    def _series_any(series: pd.Series) -> bool:
        """Return whether a boolean Series contains any True values."""
        return bool(np.any(series.to_numpy(dtype=bool, copy=False)))

    def _calendar_to_datetime(self, series: pd.Series | None) -> pd.Series | None:
        if series is None:
            return None
        if pd.api.types.is_datetime64_any_dtype(series):
            return series
        origin = self._get_policy_param("calendar_origin", None)
        unit = self._get_policy_param("calendar_unit", "D")
        try:
            if origin is not None:
                return pd.to_datetime(series, unit=unit, origin=origin)
            return pd.to_datetime(series)
        except Exception as exc:  # pragma: no cover - defensive
            raise ValueError("Failed to convert calendar column to datetime. Consider specifying calendar_origin and calendar_unit.") from exc
    # Forward/backward fill handling ---------------------------------------

    def _fit_ffill(self, data: pd.DataFrame, medians_lookup: Dict[Hashable, float], *, use_bfill: bool):
        filled = data.ffill()
        if use_bfill:
            filled = filled.bfill()
        filled = filled.fillna(pd.Series(medians_lookup))
        last_values: Dict[str, float] = {}
        for col in self.columns_:
            series = filled[col].dropna()
            if not series.empty:
                last_values[col] = float(series.iloc[-1])
        state = {
            "last": last_values,
            "medians": medians_lookup,
            "use_bfill": use_bfill,
        }
        return filled, state

    def _transform_ffill(self, data: pd.DataFrame, *, use_bfill: bool):
        """Apply forward-only fills during transform (``use_bfill`` kept for compatibility)."""
        filled = data.ffill()
        last_values = cast(Dict[str, float], self._state_.get("last", {}))
        if last_values:
            filled = filled.fillna(last_values)
        medians_lookup = cast(Dict[Hashable, float], self._state_.get("medians", {}))
        filled = filled.fillna(pd.Series(medians_lookup))
        return filled
    # Linear / spline / time interpolation ---------------------------------

    def _fit_linear_interp(self, data: pd.DataFrame, medians_lookup: Dict[Hashable, float]):
        requested_history = self._history_length()
        filled = data.interpolate(method="linear", limit_direction="forward")
        filled = filled.ffill()
        filled = filled.fillna(pd.Series(medians_lookup))
        history_len, tail = self._effective_history(filled, requested_history)
        state = {
            "history_len": history_len,
            "tail": tail,
            "medians": medians_lookup,
        }
        return filled, state

    def _transform_linear_interp(self, data: pd.DataFrame):
        """Impute ``data`` by linear interpolation seeded with the stored history tail.

        The stored tail is prepended to ``data`` so that interpolation and
        forward fill can start from previously seen rows. Afterwards the tail
        rows are dropped from the result and the stored tail is replaced with
        the last ``history_len`` rows of the combined, imputed frame.

        Args:
            data: Frame to impute; it is not modified.

        Returns:
            The imputed rows, carrying ``data``'s index.

        Raises:
            RuntimeError: If the stored tail length differs from ``history_len``.
        """
        expected_rows = cast(int, self._state_.get("history_len", self._history_length()))
        history = self._state_tail_frame()
        median_fallback = cast(Dict[Hashable, float], self._state_.get("medians", {}))
        if len(history) != expected_rows:
            raise RuntimeError(f"history_len mismatch in linear_interp: tail={len(history)} expected={expected_rows}")
        stacked = (
            pd.concat([history, data], axis=0)
            .interpolate(method="linear", limit_direction="forward")
            .ffill()
            .fillna(pd.Series(median_fallback))
        )
        fresh = stacked.iloc[expected_rows:].copy()
        fresh.index = data.index
        # Roll the history window forward so the next call continues from here.
        rolled_tail = stacked.tail(expected_rows).copy()
        self._state_["tail"] = rolled_tail
        self._state_["history_len"] = len(rolled_tail)
        return fresh

    def _fit_spline_interp(self, data: pd.DataFrame, medians_lookup: Dict[Hashable, float]):
        """Impute ``data`` by forward spline interpolation and record the fitted state.

        The spline order comes from the ``spline_degree`` policy parameter
        (default 3). After interpolation, remaining gaps are forward-filled and
        then filled from ``medians_lookup``.

        Args:
            data: Training frame to impute; it is not modified.
            medians_lookup: Per-column fallback values.

        Returns:
            ``(filled, state)``. ``state`` carries the history length and tail
            returned by ``self._effective_history``, the medians and the order.

        Raises:
            RuntimeError: If SciPy is not installed, or if the spline
                interpolation itself rejects the input.
        """
        order = int(self._get_policy_param("spline_degree", 3))
        requested_history = self._history_length()
        try:
            filled = data.interpolate(method="spline", order=order, limit_direction="forward")
        except ImportError as exc:  # pragma: no cover - SciPy not installed
            # pandas reports a missing optional dependency as ImportError.
            raise RuntimeError("spline_interp_deg policy requires scipy to be installed.") from exc
        except ValueError as exc:
            # A ValueError means the interpolation rejected the data or order,
            # so report the real cause instead of blaming a missing SciPy.
            raise RuntimeError(f"spline_interp_deg interpolation failed for order={order}: {exc}") from exc
        filled = filled.ffill()
        filled = filled.fillna(pd.Series(medians_lookup))
        history_len, tail = self._effective_history(filled, requested_history)
        state = {
            "history_len": history_len,
            "tail": tail,
            "medians": medians_lookup,
            "order": order,
        }
        return filled, state

    def _transform_spline_interp(self, data: pd.DataFrame):
        """Impute ``data`` by spline interpolation seeded with the stored history tail.

        The stored tail is prepended to ``data``, the combined frame is
        spline-interpolated with the stored ``order`` (default 3),
        forward-filled and finally filled from the stored medians. The tail
        rows are dropped from the result and the stored tail is replaced with
        the last ``history_len`` rows of the combined, imputed frame.

        Args:
            data: Frame to impute; it is not modified.

        Returns:
            The imputed rows, carrying ``data``'s index.

        Raises:
            RuntimeError: If the stored tail length differs from
                ``history_len``, if SciPy is not installed, or if the spline
                interpolation itself rejects the input.
        """
        history_len = cast(int, self._state_.get("history_len", self._history_length()))
        tail = self._state_tail_frame()
        medians_lookup = cast(Dict[Hashable, float], self._state_.get("medians", {}))
        order = int(self._state_.get("order", 3))
        if len(tail) != history_len:
            raise RuntimeError(f"history_len mismatch in spline_interp: tail={len(tail)} expected={history_len}")
        combined = pd.concat([tail, data], axis=0)
        try:
            combined = combined.interpolate(method="spline", order=order, limit_direction="forward")
        except ImportError as exc:  # pragma: no cover - SciPy not installed
            # pandas reports a missing optional dependency as ImportError.
            raise RuntimeError("spline_interp_deg policy requires scipy during transform as well.") from exc
        except ValueError as exc:
            # A ValueError means the interpolation rejected the data or order,
            # so report the real cause instead of blaming a missing SciPy.
            raise RuntimeError(
                f"spline_interp_deg interpolation failed during transform for order={order}: {exc}"
            ) from exc
        combined = combined.ffill()
        combined = combined.fillna(pd.Series(medians_lookup))
        result = combined.iloc[history_len:].copy()
        result.index = data.index
        # Roll the history window forward so the next call continues from here.
        new_tail = combined.tail(history_len).copy()
        self._state_["tail"] = new_tail
        self._state_["history_len"] = len(new_tail)
        return result

    def _fit_time_interp(
        self,
        data: pd.DataFrame,
        medians_lookup: Dict[Hashable, float],
        calendar_series: pd.Series | None,
    ):
        """Impute ``data`` by time-weighted interpolation and record the fitted state.

        The calendar is converted with ``self._calendar_to_datetime`` and used
        as a temporary ``DatetimeIndex`` for ``interpolate(method="time")``.
        Remaining gaps are forward-filled and then filled from
        ``medians_lookup``. The returned frame carries ``data``'s original
        index again.

        Args:
            data: Training frame to impute; it is not modified.
            medians_lookup: Per-column fallback values.
            calendar_series: Calendar column passed to the datetime conversion.

        Returns:
            ``(filled, state)``. ``state`` holds the history length, the frame
            tail, the matching calendar tail and the medians.

        Raises:
            ValueError: If the calendar conversion returns ``None``.
        """
        timestamps = self._calendar_to_datetime(calendar_series)
        if timestamps is None:
            raise ValueError("time_interp policy requires a datetime-like calendar column.")
        wanted_rows = self._history_length()
        # Time interpolation works off the index, so swap the calendar in on a copy.
        on_calendar = data.copy()
        on_calendar.index = pd.DatetimeIndex(timestamps)
        imputed = (
            on_calendar.interpolate(method="time", limit_direction="forward")
            .ffill()
            .fillna(pd.Series(medians_lookup))
        )
        kept_rows, history_tail = self._effective_history(imputed, wanted_rows)
        if kept_rows:
            calendar_tail = timestamps.tail(kept_rows).copy()
        else:
            calendar_tail = timestamps.iloc[0:0].copy()
        restored = imputed.copy()
        restored.index = data.index
        fitted_state = {
            "history_len": kept_rows,
            "tail": history_tail,
            "tail_calendar": calendar_tail,
            "medians": medians_lookup,
        }
        return restored, fitted_state

    def _transform_time_interp(self, data: pd.DataFrame, calendar_series: pd.Series | None):
        """Impute ``data`` by time-weighted interpolation seeded with the stored tail.

        The stored frame tail and calendar tail are prepended to ``data`` and
        its calendar. The combined frame is indexed by the combined calendar,
        interpolated with ``method="time"``, forward-filled and filled from the
        stored medians. Both stored tails are then rolled forward.

        Args:
            data: Frame to impute; it is not modified.
            calendar_series: Calendar column passed to the datetime conversion.

        Returns:
            The imputed rows, carrying ``data``'s index.

        Raises:
            ValueError: If the calendar conversion returns ``None``.
            RuntimeError: If either stored tail length differs from ``history_len``.
        """
        timestamps = self._calendar_to_datetime(calendar_series)
        if timestamps is None:
            raise ValueError("time_interp policy requires a datetime-like calendar column during transform.")
        expected_rows = cast(int, self._state_.get("history_len", self._history_length()))
        history = self._state_tail_frame()
        history_calendar = cast(
            pd.Series,
            self._state_.get("tail_calendar", pd.Series(dtype="datetime64[ns]")),
        )
        median_fallback = cast(Dict[Hashable, float], self._state_.get("medians", {}))
        if len(history) != expected_rows:
            raise RuntimeError(f"history_len mismatch in time_interp: tail={len(history)} expected={expected_rows}")
        if len(history_calendar) != expected_rows:
            raise RuntimeError(
                f"history_len mismatch in time_interp calendar: tail_calendar={len(history_calendar)} expected={expected_rows}"
            )
        stacked = pd.concat([history, data], axis=0)
        stacked_calendar = pd.concat([history_calendar, timestamps], axis=0)
        stacked.index = pd.DatetimeIndex(pd.to_datetime(stacked_calendar))
        stacked = (
            stacked.interpolate(method="time", limit_direction="forward")
            .ffill()
            .fillna(pd.Series(median_fallback))
        )
        fresh = stacked.iloc[expected_rows:].copy()
        fresh.index = data.index
        rolled_tail = stacked.tail(expected_rows).copy()
        if expected_rows:
            rolled_calendar = stacked_calendar.tail(expected_rows).copy()
        else:
            rolled_calendar = stacked_calendar.iloc[0:0].copy()
        self._state_["tail"] = rolled_tail
        self._state_["tail_calendar"] = rolled_calendar
        self._state_["history_len"] = len(rolled_tail)
        return fresh
    # Robust / quantile-based policies -----------------------------------

    def _fit_backfill_robust(self, data: pd.DataFrame, medians_lookup: Dict[Hashable, float]):
        """Fill gaps with a rolling statistic of the most recent observed values.

        Each column keeps a deque of at most ``self.rolling_window`` observed
        values. A missing cell takes ``self._deque_median`` of that deque, or
        the column's entry in ``medians_lookup`` (NaN when absent) while the
        deque is still empty. Imputed values are not pushed into the deque.

        Args:
            data: Training frame to impute; it is not modified.
            medians_lookup: Per-column fallback values.

        Returns:
            ``(filled, state)``. ``state`` holds the medians, a copy of each
            deque, the last observed value per column (``None`` if there was
            none) and the history length and tail from ``self._effective_history``.
        """
        recent: Dict[str, Deque[float]] = {col: deque(maxlen=self.rolling_window) for col in self.columns_}
        last_seen: Dict[str, float | None] = {col: None for col in self.columns_}
        filled = data.copy()
        # Columns are independent of one another, so each is processed top to bottom.
        for col in self.columns_:
            history = recent[col]
            for label in data.index:
                cell = data.at[label, col]
                if not pd.isna(cell):
                    observed = float(cast(float, cell))
                    filled.at[label, col] = observed
                    history.append(observed)
                    last_seen[col] = observed
                    continue
                if history:
                    substitute = self._deque_median(history)
                else:
                    substitute = float(medians_lookup.get(col, np.nan))
                filled.at[label, col] = substitute
        wanted_rows = self._history_length()
        kept_rows, history_tail = self._effective_history(filled, wanted_rows)
        fitted_state = {
            "medians": medians_lookup,
            "deques": {col: deque(recent[col], maxlen=self.rolling_window) for col in self.columns_},
            "last_valid": last_seen,
            "history_len": kept_rows,
            "tail": history_tail,
        }
        return filled, fitted_state

    def _transform_backfill_robust(self, data: pd.DataFrame):
        """Fill gaps in ``data`` from the stored rolling deques and update the state.

        Each column's deque is copied from the state and seeded with the stored
        ``last_valid`` value when that is not ``None``. Missing cells take
        ``self._deque_median`` of the deque, or the stored median (NaN when
        absent) while the deque is empty. Observed cells are appended to the
        deque. The state's ``deques``, ``last_valid``, ``tail`` and
        ``history_len`` entries are refreshed before returning.

        Args:
            data: Frame to impute; it is not modified.

        Returns:
            The imputed frame.
        """
        medians_lookup = cast(Dict[Hashable, float], self._state_.get("medians", {}))
        stored = cast(Dict[str, Deque[float]], self._state_.get("deques", {}))
        working = {col: deque(stored.get(col, deque()), maxlen=self.rolling_window) for col in self.columns_}
        last_valid = cast(Dict[str, float | None], self._state_.get("last_valid", {}))
        filled = data.copy()
        # NOTE(review): ``last_valid`` is refreshed below from the last *filled*
        # row, so it may hold an imputed value, and it is appended again at the
        # start of every call even though observed values were already pushed
        # into the deque. Confirm that this re-seeding is intended.
        for col in self.columns_:
            seed = last_valid.get(col)
            if seed is not None:
                working[col].append(seed)
        for col in self.columns_:
            history = working[col]
            for label in data.index:
                cell = data.at[label, col]
                if not pd.isna(cell):
                    observed = float(cast(float, cell))
                    filled.at[label, col] = observed
                    history.append(observed)
                    continue
                if history:
                    substitute = self._deque_median(history)
                else:
                    substitute = float(medians_lookup.get(col, np.nan))
                filled.at[label, col] = substitute
        for col in self.columns_:
            column_values = filled[col]
            if not column_values.empty:
                last_valid[col] = float(column_values.iloc[-1])
        self._state_["last_valid"] = last_valid
        self._state_["deques"] = {col: deque(working[col], maxlen=self.rolling_window) for col in self.columns_}
        kept_rows = cast(int, self._state_.get("history_len", self._history_length()))
        # The stored tail can never be longer than the batch just processed.
        kept_rows = min(kept_rows, len(filled)) if len(filled) else 0
        if kept_rows:
            refreshed_tail = filled.tail(kept_rows).copy()
        else:
            refreshed_tail = filled.iloc[0:0].copy()
        self._state_["tail"] = refreshed_tail
        self._state_["history_len"] = len(refreshed_tail)
        return filled

    def _winsorized_stat(self, values: Sequence[float], clip: float) -> float:
        arr = np.asarray(list(values), dtype=float)
        if arr.size == 0:
            return float("nan")
        lower = float(np.quantile(arr, clip))
        upper = float(np.quantile(arr, 1.0 - clip))
        clipped = np.clip(arr, lower, upper)
        return float(np.median(clipped))

    def _fit_winsorized_median(self, data: pd.DataFrame, medians_lookup: Dict[Hashable, float]):
        """Fill gaps with a winsorized median over a rolling window of prior values.

        The clip fraction comes from the ``winsor_clip`` policy parameter
        (default 0.1) and is clamped to ``[0.0, 0.49]``. Each column keeps a
        deque of at most ``self.rolling_window`` entries. Both observed and
        imputed values are appended to it, so earlier fills influence later
        ones.

        Args:
            data: Training frame to impute; it is not modified.
            medians_lookup: Per-column fallback used while a deque is empty
                (NaN when the column is absent).

        Returns:
            ``(filled, state)``. ``state`` holds the medians, a copy of each
            deque and the clip fraction.
        """
        raw_clip = float(self._get_policy_param("winsor_clip", 0.1))
        clip = min(max(0.0, raw_clip), 0.49)
        windows: Dict[str, Deque[float]] = {col: deque(maxlen=self.rolling_window) for col in self.columns_}
        filled = data.copy()
        # Columns are independent of one another, so each is processed top to bottom.
        for col in self.columns_:
            history = windows[col]
            for label in data.index:
                cell = data.at[label, col]
                if not pd.isna(cell):
                    entry = float(cast(float, cell))
                elif history:
                    entry = self._winsorized_stat(history, clip)
                else:
                    entry = float(medians_lookup.get(col, np.nan))
                filled.at[label, col] = entry
                history.append(entry)
        fitted_state = {
            "medians": medians_lookup,
            "deques": {col: deque(windows[col], maxlen=self.rolling_window) for col in self.columns_},
            "clip": clip,
        }
        return filled, fitted_state

    def _transform_winsorized_median(self, data: pd.DataFrame):
        medians_lookup = cast(Dict[Hashable, float], self._state_.get("medians", {}))
        clip = float(self._state_.get("clip", 0.1))
        stored_deques = cast(Dict[str, Deque[float]], self._state_.get("deques", {}))
        deques = {col: deque(stored_deques.get(col, deque()), maxlen=self.rolling_window) for col in self.columns_}
        filled = data.copy()
        for index_label in data.index:
            for col in self.columns_:
                original = data.at[index_label, col]
                if pd.isna(original):
                    dq = deques[col]
                    if dq:
                        fill_value = self._winsorized_stat(dq, clip)
                    else:
                        fill_value = float(medians_lookup.get(col, np.nan))
                    filled.at[index_label, col] = fill_value
                    deques[col].append(fill_value)
                else:
                    valf = float(cast(float, original))
                    filled.at[index_label, col] = valf
                    deques[col].append(valf)
        self._state_["deques"] = {col: deque(deques[col], maxlen=self.rolling_window) for col in self.columns_}
        return filled

    def _fit_seasonal_median(
        self,
        data: pd.DataFrame,
        medians_lookup: Dict[Hashable, float],
        calendar_series: pd.Series | None,
    ):
        """Fill gaps with the median of rows sharing the same calendar key.

        The key is day-of-week for ``dow_median``, day-of-month for
        ``dom_median`` and month for any other policy. A group median is used
        only when the group has at least two non-null values and the median is
        not NaN; otherwise the column's entry in ``medians_lookup`` is used.

        Args:
            data: Training frame to impute; it is not modified.
            medians_lookup: Per-column fallback values.
            calendar_series: Calendar column passed to the datetime conversion.

        Returns:
            ``(filled, state)``. ``state`` holds the medians and the per-column
            ``{key: fill value}`` maps under ``"seasonal_stats"``.

        Raises:
            ValueError: If the calendar conversion returns ``None``.
        """
        calendar_dt = self._calendar_to_datetime(calendar_series)
        if calendar_dt is None:
            raise ValueError(f"{self.policy} policy requires a datetime-like calendar column.")
        # Anything other than the two named policies is keyed by month.
        accessor_name = {"dow_median": "dayofweek", "dom_median": "day"}.get(self.policy, "month")
        keys = getattr(calendar_dt.dt, accessor_name)
        seasonal_stats: Dict[str, Dict[int, float]] = {}
        filled = data.copy()
        for col in self.columns_:
            per_key = (
                pd.DataFrame({"key": keys, "value": data[col]})
                .groupby("key")["value"]
                .agg(["median", "count"])
            )
            global_median = float(medians_lookup.get(col, np.nan))
            key_to_fill: Dict[int, float] = {}
            for key, summary in per_key.iterrows():
                raw_median = summary["median"]
                if isinstance(raw_median, (int, float, np.floating)):
                    candidate = float(raw_median)
                else:
                    candidate = global_median
                trusted = int(summary["count"]) >= 2 and not np.isnan(candidate)
                key_to_fill[int(cast(int, key))] = candidate if trusted else global_median
            seasonal_stats[col] = key_to_fill
            patched = data[col].copy().fillna(keys.map(key_to_fill))
            patched = patched.fillna(medians_lookup.get(col, np.nan))
            filled[col] = patched.astype(float)
        fitted_state = {
            "medians": medians_lookup,
            "seasonal_stats": seasonal_stats,
        }
        return filled, fitted_state

    def _transform_seasonal_median(self, data: pd.DataFrame, calendar_series: pd.Series | None):
        """Fill gaps in ``data`` from the stored per-key seasonal fill values.

        The key is day-of-week for ``dow_median``, day-of-month for
        ``dom_median`` and month for any other policy. Cells whose key has no
        stored value fall back to the stored column median (NaN when absent).

        Args:
            data: Frame to impute; it is not modified.
            calendar_series: Calendar column passed to the datetime conversion.

        Returns:
            The imputed frame, with each processed column cast to float.

        Raises:
            ValueError: If the calendar conversion returns ``None``.
        """
        calendar_dt = self._calendar_to_datetime(calendar_series)
        if calendar_dt is None:
            raise ValueError(f"{self.policy} policy requires a datetime-like calendar column during transform.")
        # Anything other than the two named policies is keyed by month.
        accessor_name = {"dow_median": "dayofweek", "dom_median": "day"}.get(self.policy, "month")
        keys = getattr(calendar_dt.dt, accessor_name)
        seasonal_stats = cast(Dict[str, Dict[int, float]], self._state_.get("seasonal_stats", {}))
        medians_lookup = cast(Dict[Hashable, float], self._state_.get("medians", {}))
        filled = data.copy()
        for col in self.columns_:
            key_to_fill = seasonal_stats.get(col, {})
            patched = data[col].copy().fillna(keys.map(key_to_fill))
            patched = patched.fillna(medians_lookup.get(col, np.nan))
            filled[col] = patched.astype(float)
        return filled

    def _apply_holiday_bridge(
        self,
        frame: pd.DataFrame,
        medians_lookup: Dict[Hashable, float],
        window: int,
        *,
        online_start: int | None = None,
    ):
        bridged = frame.copy()
        for col in self.columns_:
            series = bridged[col].copy()
            values = series.values
            n = len(values)
            i = 0
            while i < n:
                if not np.isnan(values[i]):
                    i += 1
                    continue
                start = i
                while i < n and np.isnan(values[i]):
                    i += 1
                end = i
                length = end - start
                prev_idx = start - 1
                next_idx = end
                prev_val = values[prev_idx] if prev_idx >= 0 and not np.isnan(values[prev_idx]) else None
                next_val = None
                if next_idx < n and not np.isnan(values[next_idx]):
                    if online_start is None or next_idx < online_start:
                        next_val = values[next_idx]
                if prev_val is not None and next_val is not None and length <= window:
                    fill_val = float((prev_val + next_val) / 2.0)
                    values[start:end] = fill_val
                else:
                    segment = series.iloc[max(0, start - self.rolling_window) : start].dropna()
                    if not segment.empty:
                        fill_val = float(segment.median())
                    elif prev_val is not None:
                        fill_val = float(prev_val)
                    elif next_val is not None:
                        fill_val = float(next_val)
                    else:
                        fill_val = float(medians_lookup.get(col, np.nan))
                    values[start:end] = fill_val
            bridged[col] = values
        return bridged

    def _fit_holiday_bridge(
        self,
        data: pd.DataFrame,
        medians_lookup: Dict[Hashable, float],
        calendar_series: pd.Series | None,
    ):
        """Bridge NaN runs in ``data`` and record the fitted state.

        The calendar is only checked for convertibility; the bridging itself
        is delegated to ``self._apply_holiday_bridge`` with the
        ``holiday_window`` policy parameter (default 2).

        Args:
            data: Training frame to impute.
            medians_lookup: Per-column fallback values.
            calendar_series: Calendar column passed to the datetime conversion.

        Returns:
            ``(bridged, state)``. ``state`` holds the medians, the window and
            the history length and tail from ``self._effective_history``.

        Raises:
            ValueError: If the calendar conversion returns ``None``.
        """
        calendar_ok = self._calendar_to_datetime(calendar_series) is not None
        if not calendar_ok:
            raise ValueError("holiday_bridge policy requires a datetime-like calendar column.")
        bridge_window = int(self._get_policy_param("holiday_window", 2))
        wanted_rows = self._history_length()
        bridged = self._apply_holiday_bridge(data, medians_lookup, bridge_window)
        kept_rows, history_tail = self._effective_history(bridged, wanted_rows)
        fitted_state = {
            "medians": medians_lookup,
            "window": bridge_window,
            "history_len": kept_rows,
            "tail": history_tail,
        }
        return bridged, fitted_state

    def _transform_holiday_bridge(self, data: pd.DataFrame, calendar_series: pd.Series | None):
        """Bridge NaN runs in ``data`` using the stored history tail as context.

        The stored tail is prepended to ``data`` and the combined frame goes
        through ``self._apply_holiday_bridge`` with ``online_start`` set to the
        tail length. The tail rows are then dropped from the result and the
        stored tail is rolled forward.

        Args:
            data: Frame to impute.
            calendar_series: Calendar column passed to the datetime conversion.

        Returns:
            The bridged rows, carrying ``data``'s index.

        Raises:
            ValueError: If the calendar conversion returns ``None``.
            RuntimeError: If the stored tail length differs from ``history_len``.
        """
        calendar_ok = self._calendar_to_datetime(calendar_series) is not None
        if not calendar_ok:
            raise ValueError("holiday_bridge policy requires a datetime-like calendar column during transform.")
        bridge_window = int(self._state_.get("window", 2))
        expected_rows = cast(int, self._state_.get("history_len", self._history_length()))
        median_fallback = cast(Dict[Hashable, float], self._state_.get("medians", {}))
        history = self._state_tail_frame()
        if len(history) != expected_rows:
            raise RuntimeError(f"history_len mismatch in holiday_bridge: tail={len(history)} expected={expected_rows}")
        stacked = pd.concat([history, data], axis=0)
        bridged = self._apply_holiday_bridge(
            stacked,
            median_fallback,
            bridge_window,
            online_start=len(history),
        )
        fresh = bridged.iloc[expected_rows:].copy()
        fresh.index = data.index
        rolled_tail = bridged.tail(expected_rows).copy()
        self._state_["tail"] = rolled_tail
        self._state_["history_len"] = len(rolled_tail)
        return fresh
    # Multivariate imputation --------------------------------------------

    def _fit_knn(self, data: pd.DataFrame):
        """Fit a distance-weighted ``KNNImputer`` on standardized data and impute ``data``.

        Only columns with at least one observed value are given to the
        imputer. Columns with none are filled from ``self._medians_dict_``
        (0.0 when absent). Observed cells are always restored from ``data``
        after de-standardizing. The neighbour count comes from the
        ``knn_neighbors`` policy parameter (default ``min(5, len(data))``)
        and is at least 1.

        Args:
            data: Training frame to impute; it is not modified.

        Returns:
            ``(filled, state)``. ``state`` holds the imputer (``None`` when no
            column has observed values), the scaler statistics, the two column
            lists and the medians.
        """
        requested_neighbors = int(self._get_policy_param("knn_neighbors", min(5, len(data))))
        neighbor_count = max(1, requested_neighbors)
        means, stds = self._compute_scaler_stats(data)
        scaled = self._standardize_with_stats(data, means, stds)
        available_cols: List[str] = [
            col for col in self.columns_ if self._series_any(cast(pd.Series, data[col].notna()))
        ]
        missing_cols = [col for col in self.columns_ if col not in available_cols]
        if not available_cols:
            # No informative columns => fall back to medians.
            defaults = pd.Series({col: self._medians_dict_.get(col, 0.0) for col in self.columns_})
            return data.fillna(defaults), {
                "imputer": None,
                "scaler_means": means,
                "scaler_stds": stds,
                "available_cols": available_cols,
                "missing_cols": missing_cols,
                "medians": self._medians_dict_,
            }
        imputer = KNNImputer(n_neighbors=neighbor_count, weights="distance")
        imputed_values = imputer.fit_transform(scaled.loc[:, available_cols])
        scaled_filled = scaled.copy()
        scaled_filled.loc[:, available_cols] = pd.DataFrame(
            imputed_values,
            columns=pd.Index(available_cols),
            index=data.index,
        )
        filled = self._destandardize_with_stats(scaled_filled, means, stds)
        medians_lookup = self._medians_dict_
        for col in missing_cols:
            median_val = float(medians_lookup.get(col, 0.0))
            still_nan = cast(pd.Series, filled[col].isna())
            if self._series_any(still_nan):
                filled.loc[still_nan, col] = median_val
        # Restore observed cells so only the gaps carry imputed values.
        for col in self.columns_:
            observed = ~cast(pd.Series, data[col].isna()).astype(bool)
            if self._series_any(observed):
                filled.loc[observed, col] = data.loc[observed, col].astype(float)
        fitted_state = {
            "imputer": imputer,
            "scaler_means": means,
            "scaler_stds": stds,
            "available_cols": available_cols,
            "missing_cols": missing_cols,
            "medians": medians_lookup,
        }
        return filled, fitted_state

    def _transform_knn(self, data: pd.DataFrame):
        """Impute ``data`` with the fitted KNN imputer and stored scaler statistics.

        The stored imputer (if any) fills the columns it was fitted on in
        standardized space. Columns recorded as fully missing at fit time are
        filled from the stored medians (0.0 when absent). Observed cells are
        restored from ``data`` after de-standardizing.

        Args:
            data: Frame to impute; it is not modified.

        Returns:
            The imputed frame.
        """
        state = self._state_
        imputer = cast(KNNImputer, state.get("imputer"))
        means = cast(Dict[str, float], state.get("scaler_means", {}))
        stds = cast(Dict[str, float], state.get("scaler_stds", {}))
        available_cols = cast(List[str], state.get("available_cols", []))
        missing_cols = cast(List[str], state.get("missing_cols", []))
        medians_lookup = cast(Dict[Hashable, float], state.get("medians", {}))
        scaled = self._standardize_with_stats(data, means, stds)
        if imputer is not None and available_cols:
            imputed_values = imputer.transform(scaled.loc[:, available_cols])
            scaled.loc[:, available_cols] = pd.DataFrame(
                imputed_values,
                columns=pd.Index(available_cols),
                index=data.index,
            )
        filled = self._destandardize_with_stats(scaled, means, stds)
        for col in missing_cols:
            median_val = float(medians_lookup.get(col, 0.0))
            still_nan = cast(pd.Series, filled[col].isna())
            if self._series_any(still_nan):
                filled.loc[still_nan, col] = median_val
        # Restore observed cells so only the gaps carry imputed values.
        for col in self.columns_:
            observed = ~cast(pd.Series, data[col].isna()).astype(bool)
            if self._series_any(observed):
                filled.loc[observed, col] = data.loc[observed, col].astype(float)
        return filled

    def _fit_pca_reconstruct(self, data: pd.DataFrame, medians_lookup: Dict[Hashable, float]):
        """Fit a PCA on median-filled data and impute gaps from its reconstruction.

        Missing cells are first filled from ``medians_lookup``. A PCA is fitted
        on that reference and each originally missing cell is replaced by the
        reconstructed value. Observed cells keep their original values.

        The component count comes from the ``pca_components`` policy parameter.
        The default is roughly half the feature count. It is clamped to at
        least 1 and at most the number of features and the number of rows,
        because PCA rejects more components than ``min(n_samples, n_features)``.

        Args:
            data: Training frame to impute; it is not modified.
            medians_lookup: Per-column values used to build the PCA input.

        Returns:
            ``(filled, state)``. ``state`` holds the fitted PCA and the medians.
        """
        n_features = max(1, len(self.columns_))
        default_components = max(1, min(n_features - 1, n_features // 2)) if n_features > 1 else 1
        components = int(self._get_policy_param("pca_components", default_components))
        medians_series = pd.Series(medians_lookup)
        filled_reference = data.fillna(medians_series)
        n_rows, n_cols = filled_reference.shape
        # Short training frames would otherwise make PCA.fit raise on n_components.
        components = max(1, min(components, n_features, n_cols, n_rows))
        rng = np.random.RandomState(self.random_state)
        pca = PCA(n_components=components, random_state=rng)
        pca.fit(filled_reference.values)
        reconstructed = pca.inverse_transform(pca.transform(filled_reference.values))
        recon_df = pd.DataFrame(reconstructed, columns=data.columns, index=data.index)
        filled = data.copy()
        for col in self.columns_:
            mask = data[col].isna()
            filled.loc[mask, col] = recon_df.loc[mask, col]
            filled.loc[~mask, col] = data.loc[~mask, col].astype(float)
        state = {
            "pca": pca,
            "medians": medians_lookup,
        }
        return filled, state

    def _transform_pca_reconstruct(self, data: pd.DataFrame):
        """Impute gaps in ``data`` from the fitted PCA's reconstruction.

        Missing cells are provisionally filled from the stored medians, the
        frame is projected through the stored PCA and back, and each originally
        missing cell takes the reconstructed value. Observed cells keep their
        original values.

        Args:
            data: Frame to impute; it is not modified.

        Returns:
            The imputed frame.

        Raises:
            RuntimeError: If no PCA model is stored in the state.
        """
        model = cast(PCA, self._state_.get("pca"))
        if model is None:
            raise RuntimeError("PCA model missing; ensure fit was called.")
        median_fallback = cast(Dict[Hashable, float], self._state_.get("medians", {}))
        reference = data.fillna(pd.Series(median_fallback))
        projected = model.transform(reference.values)
        rebuilt = pd.DataFrame(
            model.inverse_transform(projected),
            columns=data.columns,
            index=data.index,
        )
        filled = data.copy()
        for col in self.columns_:
            gaps = data[col].isna()
            filled.loc[gaps, col] = rebuilt.loc[gaps, col]
            filled.loc[~gaps, col] = data.loc[~gaps, col].astype(float)
        return filled

    def _fit_mice(self, data: pd.DataFrame):
        """Fit an ``IterativeImputer`` on standardized data and impute ``data``.

        Only columns with at least one observed value are given to the
        imputer. Columns with none are filled from ``self._medians_dict_``
        (0.0 when absent). Observed cells are always restored from ``data``
        after de-standardizing. The iteration cap comes from the
        ``mice_max_iter`` policy parameter (default 10).

        Args:
            data: Training frame to impute; it is not modified.

        Returns:
            ``(filled, state)``. ``state`` holds the imputer (``None`` when no
            column has observed values), the scaler statistics, the two column
            lists and the medians.
        """
        iteration_cap = int(self._get_policy_param("mice_max_iter", 10))
        means, stds = self._compute_scaler_stats(data)
        scaled = self._standardize_with_stats(data, means, stds)
        available_cols: List[str] = [
            col for col in self.columns_ if self._series_any(cast(pd.Series, data[col].notna()))
        ]
        missing_cols = [col for col in self.columns_ if col not in available_cols]
        if not available_cols:
            # Nothing to learn from: every column is entirely missing.
            defaults = pd.Series({col: self._medians_dict_.get(col, 0.0) for col in self.columns_})
            return data.fillna(defaults), {
                "imputer": None,
                "scaler_means": means,
                "scaler_stds": stds,
                "available_cols": available_cols,
                "missing_cols": missing_cols,
                "medians": self._medians_dict_,
            }
        learnable = scaled.loc[:, available_cols]
        rng = np.random.RandomState(self.random_state)
        imputer = IterativeImputer(random_state=rng, max_iter=iteration_cap, sample_posterior=False)
        imputed_values = imputer.fit_transform(learnable)
        scaled_filled = scaled.copy()
        scaled_filled.loc[:, available_cols] = pd.DataFrame(
            imputed_values,
            columns=pd.Index(available_cols),
            index=data.index,
        )
        filled = self._destandardize_with_stats(scaled_filled, means, stds)
        medians_lookup = self._medians_dict_
        for col in missing_cols:
            median_val = float(medians_lookup.get(col, 0.0))
            still_nan = cast(pd.Series, filled[col].isna())
            if self._series_any(still_nan):
                filled.loc[still_nan, col] = median_val
        # Restore observed cells so only the gaps carry imputed values.
        for col in self.columns_:
            observed = ~cast(pd.Series, data[col].isna()).astype(bool)
            if self._series_any(observed):
                filled.loc[observed, col] = data.loc[observed, col].astype(float)
        fitted_state = {
            "imputer": imputer,
            "scaler_means": means,
            "scaler_stds": stds,
            "available_cols": available_cols,
            "missing_cols": missing_cols,
            "medians": medians_lookup,
        }
        return filled, fitted_state

    def _transform_mice(self, data: pd.DataFrame):
        """Impute ``data`` with the fitted iterative imputer and stored scaler statistics.

        The stored imputer (if any) fills the columns it was fitted on in
        standardized space. Columns recorded as fully missing at fit time are
        filled from the stored medians (0.0 when absent). Observed cells are
        restored from ``data`` after de-standardizing.

        Args:
            data: Frame to impute; it is not modified.

        Returns:
            The imputed frame.
        """
        state = self._state_
        imputer = cast(IterativeImputer, state.get("imputer"))
        means = cast(Dict[str, float], state.get("scaler_means", {}))
        stds = cast(Dict[str, float], state.get("scaler_stds", {}))
        available_cols = cast(List[str], state.get("available_cols", []))
        missing_cols = cast(List[str], state.get("missing_cols", []))
        medians_lookup = cast(Dict[Hashable, float], state.get("medians", {}))
        scaled = self._standardize_with_stats(data, means, stds)
        if imputer is not None and available_cols:
            imputed_values = imputer.transform(scaled.loc[:, available_cols])
            scaled.loc[:, available_cols] = pd.DataFrame(
                imputed_values,
                columns=pd.Index(available_cols),
                index=data.index,
            )
        filled = self._destandardize_with_stats(scaled, means, stds)
        for col in missing_cols:
            median_val = float(medians_lookup.get(col, 0.0))
            still_nan = cast(pd.Series, filled[col].isna())
            if self._series_any(still_nan):
                filled.loc[still_nan, col] = median_val
        # Restore observed cells so only the gaps carry imputed values.
        for col in self.columns_:
            observed = ~cast(pd.Series, data[col].isna()).astype(bool)
            if self._series_any(observed):
                filled.loc[observed, col] = data.loc[observed, col].astype(float)
        return filled

    def _fit_missforest(self, data: pd.DataFrame):
        """Fit a random-forest-based ``IterativeImputer`` on standardized data and impute ``data``.

        Only columns with at least one observed value are given to the
        imputer. Columns with none are filled from ``self._medians_dict_``
        (0.0 when absent). Observed cells are always restored from ``data``
        after de-standardizing. Policy parameters read here:
        ``missforest_max_iter`` (default 5), ``missforest_estimators``
        (default 200) and ``missforest_max_depth`` (default ``None``).

        Args:
            data: Training frame to impute; it is not modified.

        Returns:
            ``(filled, state)``. ``state`` holds the imputer (``None`` when no
            column has observed values), the scaler statistics, the two column
            lists and the medians.
        """
        iteration_cap = int(self._get_policy_param("missforest_max_iter", 5))
        tree_count = int(self._get_policy_param("missforest_estimators", 200))
        means, stds = self._compute_scaler_stats(data)
        scaled = self._standardize_with_stats(data, means, stds)
        available_cols: List[str] = [
            col for col in self.columns_ if self._series_any(cast(pd.Series, data[col].notna()))
        ]
        missing_cols = [col for col in self.columns_ if col not in available_cols]
        if not available_cols:
            # Nothing to learn from: every column is entirely missing.
            defaults = pd.Series({col: self._medians_dict_.get(col, 0.0) for col in self.columns_})
            return data.fillna(defaults), {
                "imputer": None,
                "scaler_means": means,
                "scaler_stds": stds,
                "available_cols": available_cols,
                "missing_cols": missing_cols,
                "medians": self._medians_dict_,
            }
        learnable = scaled.loc[:, available_cols]
        # The forest and the imputer each get their own generator from the same seed.
        forest_rng = np.random.RandomState(self.random_state)
        forest = RandomForestRegressor(
            n_estimators=tree_count,
            random_state=forest_rng,
            n_jobs=-1,
            max_depth=self._get_policy_param("missforest_max_depth", None),
        )
        imputer_rng = np.random.RandomState(self.random_state)
        imputer = IterativeImputer(
            estimator=forest,
            random_state=imputer_rng,
            max_iter=iteration_cap,
            sample_posterior=False,
            initial_strategy="median",
        )
        imputed_values = imputer.fit_transform(learnable)
        scaled_filled = scaled.copy()
        scaled_filled.loc[:, available_cols] = pd.DataFrame(
            imputed_values,
            columns=pd.Index(available_cols),
            index=data.index,
        )
        filled = self._destandardize_with_stats(scaled_filled, means, stds)
        medians_lookup = self._medians_dict_
        for col in missing_cols:
            median_val = float(medians_lookup.get(col, 0.0))
            still_nan = cast(pd.Series, filled[col].isna())
            if self._series_any(still_nan):
                filled.loc[still_nan, col] = median_val
        # Restore observed cells so only the gaps carry imputed values.
        for col in self.columns_:
            observed = ~cast(pd.Series, data[col].isna()).astype(bool)
            if self._series_any(observed):
                filled.loc[observed, col] = data.loc[observed, col].astype(float)
        fitted_state = {
            "imputer": imputer,
            "scaler_means": means,
            "scaler_stds": stds,
            "available_cols": available_cols,
            "missing_cols": missing_cols,
            "medians": medians_lookup,
        }
        return filled, fitted_state

    def _transform_missforest(self, data: pd.DataFrame):
        """Impute ``data`` with the fitted iterative imputer kept in ``self._state_``.

        The frame is standardised with the stored means/stds, the stored imputer
        is applied to the stored ``available_cols``, and the result is
        destandardised. Cells that are still NaN in the stored ``missing_cols``
        receive the stored median (0.0 when no median is recorded). Finally every
        value that was observed in ``data`` is copied back as float, so only
        originally-missing cells differ from the input.

        Returns:
            The imputed frame produced by ``_destandardize_with_stats``.
        """
        state = self._state_
        fitted_imputer = cast(IterativeImputer, state.get("imputer"))
        col_means = cast(Dict[str, float], state.get("scaler_means", {}))
        col_stds = cast(Dict[str, float], state.get("scaler_stds", {}))
        usable_cols = cast(List[str], state.get("available_cols", []))
        leftover_cols = cast(List[str], state.get("missing_cols", []))
        fallback_medians = cast(Dict[Hashable, float], state.get("medians", {}))

        standardized = self._standardize_with_stats(data, col_means, col_stds)
        if fitted_imputer is not None and usable_cols:
            imputed_block = pd.DataFrame(
                fitted_imputer.transform(standardized.loc[:, usable_cols]),
                columns=pd.Index(usable_cols),
                index=data.index,
            )
            standardized.loc[:, usable_cols] = imputed_block
        restored = self._destandardize_with_stats(standardized, col_means, col_stds)

        # Columns the imputer did not cover fall back to their median.
        for name in leftover_cols:
            fallback = float(fallback_medians.get(name, 0.0))
            still_nan = cast(pd.Series, restored[name].isna())
            if self._series_any(still_nan):
                restored.loc[still_nan, name] = fallback

        # Observed inputs always win over anything the imputer produced.
        for name in self.columns_:
            observed = ~cast(pd.Series, data[name].isna()).astype(bool)
            if self._series_any(observed):
                restored.loc[observed, name] = data.loc[observed, name].astype(float)
        return restored

    def _fit_ridge_stack(self, data: pd.DataFrame, medians_lookup: Dict[Hashable, float]):
        """Fit one Ridge regressor per column and use it to fill that column's gaps.

        Each column with at least two observed values gets a ``Ridge`` model
        (``alpha`` from the ``ridge_alpha`` policy parameter, default 1.0) trained
        on the remaining columns of the median-filled frame. Gaps are then
        predicted column by column; because predictions are written into the
        working frame as they are produced, later columns see earlier
        predictions. Columns without a model fall back to their median.

        Returns:
            ``(filled, state)`` where ``state`` holds ``"models"`` and ``"medians"``.
        """
        ridge_alpha = float(self._get_policy_param("ridge_alpha", 1.0))
        baseline = data.fillna(pd.Series(medians_lookup))
        predictors_for = {
            col: [other for other in self.columns_ if other != col]
            for col in self.columns_
        }

        models: Dict[str, Ridge] = {}
        for col in self.columns_:
            observed = cast(pd.Series, data[col].notna())
            if int(observed.sum()) < 2:
                # Not enough observations to fit anything meaningful.
                continue
            y_train = data.loc[observed, col].astype(float)
            x_train = baseline.loc[observed, predictors_for[col]]
            regressor = Ridge(alpha=ridge_alpha, random_state=np.random.RandomState(self.random_state))
            regressor.fit(x_train, y_train)
            models[col] = regressor

        filled = baseline.copy()
        for col in self.columns_:
            gaps = cast(pd.Series, data[col].isna()).astype(bool)
            if not self._series_any(gaps):
                filled[col] = data[col].astype(float)
                continue
            regressor = models.get(col)
            if regressor is None:
                filled.loc[gaps, col] = medians_lookup.get(col, np.nan)
            else:
                filled.loc[gaps, col] = regressor.predict(filled.loc[gaps, predictors_for[col]])

        return filled, {"models": models, "medians": medians_lookup}

    def _transform_ridge_stack(self, data: pd.DataFrame):
        """Fill gaps in ``data`` with the per-column Ridge models stored at fit time.

        The frame is first median-filled from the stored medians. For each column
        with gaps, the stored model predicts the missing cells from the other
        columns of the working frame; columns without a stored model keep the
        median (NaN when no median is recorded). Columns without gaps are copied
        from ``data`` as float.
        """
        models = cast(Dict[str, Ridge], self._state_.get("models", {}))
        medians_lookup = cast(Dict[Hashable, float], self._state_.get("medians", {}))
        filled = data.fillna(pd.Series(medians_lookup))
        for col in self.columns_:
            gaps = cast(pd.Series, data[col].isna()).astype(bool)
            if not self._series_any(gaps):
                filled[col] = data[col].astype(float)
                continue
            regressor = models.get(col)
            if regressor is None:
                filled.loc[gaps, col] = medians_lookup.get(col, np.nan)
                continue
            predictors = [other for other in self.columns_ if other != col]
            filled.loc[gaps, col] = regressor.predict(filled.loc[gaps, predictors])
        return filled

    def _fit_quantile_fill(self, data: pd.DataFrame, medians_lookup: Dict[Hashable, float]):
        """Fill gaps with a rolling quantile of the most recent values per column.

        Rows are visited in index order. Each column keeps a window of at most
        ``self.rolling_window`` recent values (observed or filled). A missing cell
        takes the configured quantile of that window, or the column median when
        the window is still empty. The quantile comes from the ``quantile``
        policy parameter (default 0.5) and is clamped to ``[0, 1]``.

        Returns:
            ``(filled, state)`` where ``state`` holds ``"medians"``, a copy of the
            final windows under ``"deques"``, and the clamped ``"quantile"``.
        """
        q_level = float(self._get_policy_param("quantile", 0.5))
        q_level = min(max(0.0, q_level), 1.0)
        window = self.rolling_window
        recent_values: Dict[str, Deque[float]] = {name: deque(maxlen=window) for name in self.columns_}
        filled = data.copy()
        for row_label in data.index:
            for name in self.columns_:
                raw = data.at[row_label, name]
                history = recent_values[name]
                if not pd.isna(raw):
                    value = float(cast(float, raw))
                elif history:
                    value = float(np.quantile(list(history), q_level))
                else:
                    value = float(medians_lookup.get(name, np.nan))
                filled.at[row_label, name] = value
                # Filled values also enter the window, exactly like observed ones.
                history.append(value)
        snapshot = {name: deque(recent_values[name], maxlen=window) for name in self.columns_}
        return filled, {"medians": medians_lookup, "deques": snapshot, "quantile": q_level}

    def _transform_quantile_fill(self, data: pd.DataFrame):
        """Continue the rolling-quantile fill on new rows using the stored windows.

        The per-column windows saved in ``self._state_["deques"]`` are copied and
        extended row by row: a missing cell takes the stored quantile of the
        current window (or the stored median when the window is empty), and
        every value, observed or filled, is pushed onto the window. The updated
        windows are written back to ``self._state_["deques"]``.
        """
        medians_lookup = cast(Dict[Hashable, float], self._state_.get("medians", {}))
        q_level = float(self._state_.get("quantile", 0.5))
        saved_windows = cast(Dict[str, Deque[float]], self._state_.get("deques", {}))
        window = self.rolling_window
        recent_values = {
            name: deque(saved_windows.get(name, deque()), maxlen=window)
            for name in self.columns_
        }
        filled = data.copy()
        for row_label in data.index:
            for name in self.columns_:
                raw = data.at[row_label, name]
                history = recent_values[name]
                if not pd.isna(raw):
                    value = float(cast(float, raw))
                elif history:
                    value = float(np.quantile(list(history), q_level))
                else:
                    value = float(medians_lookup.get(name, np.nan))
                filled.at[row_label, name] = value
                history.append(value)
        # Persist the advanced windows so the next call picks up where this one ended.
        self._state_["deques"] = {
            name: deque(recent_values[name], maxlen=window) for name in self.columns_
        }
        return filled

    def _fit_kalman(
        self,
        data: pd.DataFrame,
        medians_lookup: Dict[Hashable, float],
        *,
        level_only: bool,
    ):
        """Fit UnobservedComponents models and backfill using filter (one-step-ahead) estimates.

        For every column with at least three observed values an
        ``UnobservedComponents`` model is fitted and its ``fittedvalues`` fill the
        missing cells; remaining NaNs fall back to the column median. Columns
        with fewer observations are median-filled directly. If fitting raises,
        the column is forward-filled instead and a warning string is recorded.

        Args:
            data: Frame holding the columns listed in ``self.columns_``.
            medians_lookup: Per-column fallback values.
            level_only: Passed to the model as ``trend=not level_only``.

        Returns:
            ``(filled, state)`` where ``state`` holds the fitted results under
            ``"models"``, plus ``"medians"``, ``"history_len"``, ``"tail"`` and
            the collected ``"warnings"``.

        Raises:
            RuntimeError: If ``UnobservedComponents`` is unavailable.
        """
        if UnobservedComponents is None:
            raise RuntimeError("kalman_* policies require the 'statsmodels' package.")
        models: Dict[str, Any] = {}
        filled = data.copy()
        requested_history = self._history_length()
        # Named ``fit_warnings`` so the file-level ``warnings`` module is not shadowed.
        fit_warnings: List[str] = []
        for col in self.columns_:
            series = data[col].astype(float)
            median_val = medians_lookup.get(col, np.nan)
            if series.notna().sum() < 3:
                filled[col] = series.fillna(median_val)
                continue
            # NOTE(review): statsmodels appears to reset ``trend`` when ``level`` is given
            # as a string specification, so ``trend=not level_only`` may have no effect.
            # Confirm against the UnobservedComponents documentation.
            model = UnobservedComponents(series, level="local level", trend=not level_only)  # type: ignore[arg-type]
            try:
                res = cast(Any, model.fit(disp=False))
                fitted_series = pd.Series(cast(Any, res.fittedvalues), index=series.index)
                if hasattr(res, "remove_data"):
                    # Drop cached training arrays to keep serialized artifacts small while
                    # retaining parameters required for forward filtering.
                    res.remove_data()
            except Exception as exc:  # pragma: no cover
                res = None
                # ``fillna(method="ffill")`` is deprecated in pandas >= 2.1; ``ffill()`` is equivalent.
                fitted_series = series.ffill()
                fit_warnings.append(f"kalman_fit_fallback[{col}]: {type(exc).__name__}")
            col_filled = series.copy()
            mask = series.isna()
            col_filled.loc[mask] = fitted_series.loc[mask]
            filled[col] = col_filled.fillna(median_val)
            if res is not None:
                models[col] = res
        history_len, tail = self._effective_history(filled, requested_history)
        state = {
            "models": models,
            "medians": medians_lookup,
            "history_len": history_len,
            "tail": tail,
            "warnings": fit_warnings,
        }
        return filled, state

    def _transform_kalman(self, data: pd.DataFrame, calendar_series: pd.Series | None):
        """Fill gaps in ``data`` using the state-space results stored at fit time.

        The stored tail frame is stacked on top of ``data``. For each column the
        stored result is re-applied to that stacked series with all incoming
        rows blanked out, and its ``forecasts`` fill the cells that are NaN in
        the stacked series. Observed values are never overwritten. Remaining
        NaNs fall back to the stored median. When no result is stored for a
        column, or applying it fails, the column is forward-filled and a
        warning is recorded.

        Args:
            data: New rows to impute.
            calendar_series: Accepted for signature compatibility; not used here.

        Returns:
            The imputed rows corresponding to ``data`` (the tail rows are dropped).

        Raises:
            RuntimeError: If the stored tail length differs from ``history_len``.
        """
        models = cast(Dict[str, Any], self._state_.get("models", {}))
        medians_lookup = cast(Dict[Hashable, float], self._state_.get("medians", {}))
        history_len = cast(int, self._state_.get("history_len", self._history_length()))
        tail = self._state_tail_frame()
        if len(tail) != history_len:
            raise RuntimeError(f"history_len mismatch in kalman transform: tail={len(tail)} expected={history_len}")
        combined = pd.concat([tail, data], axis=0)
        result = combined.copy()
        # Rows whose label belongs to the incoming batch; computed once instead of
        # looping over every label for every column.
        incoming_rows = combined.index.isin(data.index)
        for col in self.columns_:
            series = combined[col].astype(float)
            median_val = medians_lookup.get(col, np.nan)
            res = models.get(col)
            if res is None:
                # ``fillna(method="ffill")`` is deprecated in pandas >= 2.1; ``ffill()`` is equivalent.
                filled_col = series.ffill().fillna(median_val)
                self._record_warning(f"kalman_transform_fallback[{col}]: model_missing")
            else:
                try:
                    # Blank the incoming rows before filtering (presumably so forecasts for
                    # them do not depend on their own observations -- confirm intent).
                    series_for_filter = series.copy()
                    series_for_filter.loc[incoming_rows] = np.nan
                    applied = res.apply(series_for_filter)
                    forecasts_obj = getattr(applied, "forecasts", None)
                    if forecasts_obj is None:
                        filter_results = getattr(applied, "filter_results", None)
                        forecasts_obj = getattr(filter_results, "forecasts", None)
                    if forecasts_obj is None:
                        raise AttributeError("kalman_forecast_unavailable")
                    forecasts_arr = np.asarray(forecasts_obj)
                    if forecasts_arr.ndim == 2:
                        forecasts_arr = forecasts_arr.reshape(forecasts_arr.shape[0], -1)
                        if forecasts_arr.shape[0] == 1:
                            forecasts_arr = forecasts_arr[0]
                        else:
                            forecasts_arr = forecasts_arr.mean(axis=0)
                    fitted_values = forecasts_arr.reshape(-1)
                    if len(fitted_values) < len(series):
                        # Left-pad with the first forecast so lengths line up with the series.
                        fitted_values = np.pad(fitted_values, (len(series) - len(fitted_values), 0), mode="edge")
                except Exception as exc:  # pragma: no cover
                    fitted_values = series.ffill()
                    self._record_warning(f"kalman_transform_fallback[{col}]: {type(exc).__name__}")
                fitted_series = pd.Series(fitted_values, index=combined.index)
                filled_col = series.copy()
                mask = series.isna()
                filled_col.loc[mask] = fitted_series.loc[mask]
                filled_col = filled_col.fillna(median_val)
            result[col] = filled_col
        # Roll the stored history forward so the next call continues from these rows.
        tail_updated = result.tail(history_len).copy()
        self._state_["tail"] = tail_updated
        self._state_["history_len"] = len(tail_updated)
        return result.iloc[history_len:].copy()

    def _fit_arima_auto(self, data: pd.DataFrame, medians_lookup: Dict[Hashable, float]):
        """Auto-select ARIMA orders and use filter outputs to keep the transform forward-only.

        For every column with at least five observed values, all ``(p, d, q)``
        orders up to the ``arima_max_p`` / ``arima_max_d`` / ``arima_max_q`` policy
        parameters (defaults 2, 1, 2; the all-zero order is skipped) are fitted
        and the result with the lowest AIC is kept. Its ``fittedvalues`` fill the
        missing cells; remaining NaNs fall back to the column median. Orders
        whose fit raises are skipped. When no order can be fitted the column is
        forward-filled and a warning string is recorded.

        Returns:
            ``(filled, state)`` where ``state`` holds the selected results under
            ``"models"``, plus ``"medians"``, ``"history_len"``, ``"tail"`` and
            the collected ``"warnings"``.

        Raises:
            RuntimeError: If ``ARIMA`` is unavailable.
        """
        if ARIMA is None:
            raise RuntimeError("arima_auto policy requires the 'statsmodels' package.")
        max_p = int(self._get_policy_param("arima_max_p", 2))
        max_d = int(self._get_policy_param("arima_max_d", 1))
        max_q = int(self._get_policy_param("arima_max_q", 2))
        # The candidate grid does not depend on the column, so build it once.
        candidate_orders = [
            (p, d, q)
            for p in range(max_p + 1)
            for d in range(max_d + 1)
            for q in range(max_q + 1)
            if (p, d, q) != (0, 0, 0)
        ]
        models: Dict[str, Any] = {}
        filled = data.copy()
        requested_history = self._history_length()
        # Named ``fit_warnings`` so the file-level ``warnings`` module is not shadowed.
        fit_warnings: List[str] = []
        for col in self.columns_:
            series = data[col].astype(float)
            median_val = medians_lookup.get(col, np.nan)
            if series.notna().sum() < 5:
                filled[col] = series.fillna(median_val)
                continue
            best_res = None
            best_aic = np.inf
            for order in candidate_orders:
                try:
                    res = ARIMA(
                        series,
                        order=order,
                        enforce_stationarity=False,
                        enforce_invertibility=False,
                    ).fit(method_kwargs={"warn_convergence": False})
                    if res.aic < best_aic:
                        best_aic = float(res.aic)
                        best_res = res
                except Exception:
                    # Best-effort grid search: an order that cannot be fitted is skipped.
                    continue
            if best_res is None:
                # ``fillna(method="ffill")`` is deprecated in pandas >= 2.1; ``ffill()`` is equivalent.
                filled[col] = series.ffill().fillna(median_val)
                fit_warnings.append(f"arima_fit_fallback[{col}]: no_model")
            else:
                fitted_series = pd.Series(best_res.fittedvalues, index=series.index)  # filter (one-step) predictions
                col_filled = series.copy()
                mask = series.isna()
                col_filled.loc[mask] = fitted_series.loc[mask]
                filled[col] = col_filled.fillna(median_val)
                models[col] = best_res
        history_len, tail = self._effective_history(filled, requested_history)
        state = {
            "models": models,
            "medians": medians_lookup,
            "history_len": history_len,
            "tail": tail,
            "warnings": fit_warnings,
        }
        return filled, state

    def _transform_arima_auto(self, data: pd.DataFrame):
        """Fill gaps in ``data`` with forecasts from the ARIMA results stored at fit time.

        The stored tail frame is stacked on top of ``data``. For each column with
        a stored result, ``get_forecast(steps=len(data))`` supplies values for
        the missing cells of the new rows; anything still NaN falls back to the
        stored median. When no result is stored, or forecasting raises, the
        column is forward-filled (then median-filled) and a warning is recorded.

        Returns:
            The imputed rows corresponding to ``data`` (the tail rows are dropped).

        Raises:
            RuntimeError: If the stored tail length differs from ``history_len``.
        """
        models = cast(Dict[str, Any], self._state_.get("models", {}))
        medians_lookup = cast(Dict[Hashable, float], self._state_.get("medians", {}))
        history_len = cast(int, self._state_.get("history_len", self._history_length()))
        tail = self._state_tail_frame()
        if len(tail) != history_len:
            raise RuntimeError(f"history_len mismatch in arima transform: tail={len(tail)} expected={history_len}")
        combined = pd.concat([tail, data], axis=0)
        result = combined.copy()
        for col in self.columns_:
            series = combined[col].astype(float)
            median_val = medians_lookup.get(col, np.nan)
            res = models.get(col)
            if res is None:
                # ``fillna(method="ffill")`` is deprecated in pandas >= 2.1; ``ffill()`` is equivalent.
                filled_col = series.ffill().fillna(median_val)
                self._record_warning(f"arima_transform_fallback[{col}]: model_missing")
            else:
                try:
                    forecast_res = res.get_forecast(steps=len(data))
                    forecast_mean = np.asarray(forecast_res.predicted_mean)
                    forecast_series = pd.Series(forecast_mean, index=data.index)
                except Exception as exc:  # pragma: no cover
                    self._record_warning(f"arima_transform_fallback[{col}]: {type(exc).__name__}")
                    # Forward fill leaves observed cells untouched, so it can be applied
                    # to the whole series instead of copying masked cells one by one.
                    filled_col = series.ffill().fillna(median_val)
                else:
                    filled_col = series.copy()
                    mask_combined = series.isna()
                    # Only the incoming rows receive forecasts; tail rows keep their values.
                    mask_new = mask_combined.loc[data.index]
                    missing_indices = mask_new.index[mask_new]
                    if not missing_indices.empty:
                        filled_col.loc[missing_indices] = forecast_series.loc[missing_indices]
                    filled_col = filled_col.fillna(median_val)
            result[col] = filled_col
        # Roll the stored history forward so the next call continues from these rows.
        tail_updated = result.tail(history_len).copy()
        self._state_["tail"] = tail_updated
        self._state_["history_len"] = len(tail_updated)
        return result.iloc[history_len:].copy()
    # Mask-based and two-stage imputation --------------------------------------------------

    def _fit_mask_plus_mean(self, data: pd.DataFrame, medians_lookup: Dict[Hashable, float]):
        """Fill gaps from ``medians_lookup`` and append one missing-indicator column per feature.

        For each column in ``self.columns_`` a ``<col>_missing_flag`` column is
        added holding 1.0 where the input was NaN and 0.0 elsewhere.

        Returns:
            ``(filled, state)`` where ``state`` stores the lookup under ``"means"``
            and the column-to-flag-name mapping under ``"mask_map"``.
        """
        flag_names: Dict[str, str] = {name: f"{name}_missing_flag" for name in self.columns_}
        filled = data.fillna(pd.Series(medians_lookup))
        for name, flag in flag_names.items():
            filled[flag] = data[name].isna().astype(float)
        return filled, {"means": medians_lookup, "mask_map": flag_names}

    def _transform_mask_plus_mean(self, data: pd.DataFrame):
        """Fill gaps from the stored ``"means"`` and rebuild the stored indicator columns.

        Each entry of the stored ``"mask_map"`` produces a column holding 1.0
        where the source column of ``data`` was NaN and 0.0 elsewhere.
        """
        fill_values = cast(Dict[Hashable, float], self._state_.get("means", {}))
        flag_names = cast(Dict[str, str], self._state_.get("mask_map", {}))
        result = data.fillna(pd.Series(fill_values))
        for source_col, flag_col in flag_names.items():
            was_missing = data[source_col].isna()
            result[flag_col] = was_missing.astype(float)
        return result

    def _fit_two_stage(self, data: pd.DataFrame, medians_lookup: Dict[Hashable, float]):
        """Impute in two passes: interpolation first, then a rolling-median overwrite.

        Pass one interpolates linearly in the forward direction, forward-fills
        and finally median-fills. Pass two walks the rows in index order and,
        for every cell that was NaN in ``data``, replaces the pass-one value with
        ``_deque_median`` of the column's recent-value window (or the column
        median while the window is empty). Every value, observed or filled, is
        pushed onto the window.

        Returns:
            ``(second_pass, state)`` where ``state`` holds ``"medians"``,
            ``"history_len"``, ``"tail"`` and a copy of the windows as ``"deques"``.
        """
        requested_history = self._history_length()
        first_pass = (
            data.interpolate(method="linear", limit_direction="forward")
            .ffill()
            .fillna(pd.Series(medians_lookup))
        )
        window = self.rolling_window
        recent_values: Dict[str, Deque[float]] = {name: deque(maxlen=window) for name in self.columns_}
        second_pass = first_pass.copy()
        for row_label in first_pass.index:
            for name in self.columns_:
                current = second_pass.at[row_label, name]
                history = recent_values[name]
                if not pd.isna(data.at[row_label, name]):
                    value = float(cast(float, current))
                elif history:
                    value = self._deque_median(history)
                else:
                    value = float(medians_lookup.get(name, np.nan))
                second_pass.at[row_label, name] = value
                history.append(value)
        history_len, tail = self._effective_history(second_pass, requested_history)
        state = {
            "medians": medians_lookup,
            "history_len": history_len,
            "tail": tail,
            "deques": {name: deque(recent_values[name], maxlen=window) for name in self.columns_},
        }
        return second_pass, state

    def _transform_two_stage(self, data: pd.DataFrame):
        """Apply the two-pass imputation to new rows, continuing from the stored history.

        The stored tail (restricted to ``self.columns_``) is stacked on top of
        ``data`` and the same two passes used at fit time are run over the
        stacked frame, starting from the stored recent-value windows. The
        windows, tail and ``history_len`` in ``self._state_`` are then advanced.

        Returns:
            The imputed rows after the first ``history_len`` stacked rows,
            re-indexed with ``data.index``.

        Raises:
            RuntimeError: If the stored tail length differs from ``history_len``.
        """
        medians_lookup = cast(Dict[Hashable, float], self._state_.get("medians", {}))
        history_len = cast(int, self._state_.get("history_len", self._history_length()))
        tail = self._state_tail_frame()
        saved_windows = cast(Dict[str, Deque[float]], self._state_.get("deques", {}))
        if len(tail) != history_len:
            raise RuntimeError(f"history_len mismatch in two_stage: tail={len(tail)} expected={history_len}")

        tail_base = tail.reindex(columns=self.columns_, fill_value=np.nan)
        # The stacked frame is never mutated, so it also serves as the record of
        # which cells were originally missing.
        stacked = pd.concat([tail_base, data], axis=0)
        first_pass = (
            stacked.interpolate(method="linear", limit_direction="forward")
            .ffill()
            .fillna(pd.Series(medians_lookup))
        )
        window = self.rolling_window
        recent_values = {
            name: deque(saved_windows.get(name, deque()), maxlen=window)
            for name in self.columns_
        }
        second_pass = first_pass.copy()
        for row_label in second_pass.index:
            for name in self.columns_:
                history = recent_values[name]
                if not pd.isna(stacked.at[row_label, name]):
                    value = float(cast(float, second_pass.at[row_label, name]))
                elif history:
                    value = self._deque_median(history)
                else:
                    value = float(medians_lookup.get(name, np.nan))
                second_pass.at[row_label, name] = value
                history.append(value)

        self._state_["deques"] = {
            name: deque(recent_values[name], maxlen=window) for name in self.columns_
        }
        refreshed_tail = second_pass.tail(history_len).copy()
        self._state_["tail"] = refreshed_tail
        self._state_["history_len"] = len(refreshed_tail)
        imputed_rows = second_pass.iloc[history_len:].copy()
        imputed_rows.index = data.index
        return imputed_rows

# === Module: preprocess.E_group.e_group ===
import math
from typing import Any, Hashable, Iterable, List, Mapping, cast
import numpy as np
import pandas as pd

# Module-local alias for the M-group imputer, used as the base class of EGroupImputer.
_BaseImputer = MGroupImputer

class EGroupImputer(_BaseImputer):
    """Imputer tailored for E-group features leveraging the M-group policies.

    On top of the base imputer this class:

    - selects string-named columns starting with ``"E"`` when ``columns`` is None;
    - coerces the selected columns to numeric before delegating to the base ``fit``;
    - requires an explicit ``calendar_column`` for the policies listed in
      ``CALENDAR_REQUIRED_POLICIES``;
    - renames generated helper columns into the ``E__`` / ``Emask__`` namespace;
    - overwrites columns that were entirely NaN at fit time according to
      ``all_nan_strategy``.
    """

    CALENDAR_REQUIRED_POLICIES = {
        "dow_median",
        "dom_median",
        "month_median",
        "holiday_bridge",
        "time_interp",
        "kalman_local_level",
        "state_space_custom",
        "arima_auto",
    }

    def __init__(
        self,
        columns: Iterable[Hashable] | None = None,
        policy: str = "ffill_bfill",
        rolling_window: int = 5,
        ema_alpha: float = 0.3,
        calendar_column: str | None = None,
        policy_params: Mapping[str, Any] | None = None,
        random_state: int = 42,
        all_nan_strategy: str = "keep_nan",
        all_nan_fill: float = 0.0,
    ) -> None:
        """Configure the imputer.

        Args:
            columns: Columns to impute; ``None`` defers selection to ``fit``.
            policy, rolling_window, ema_alpha, calendar_column, policy_params,
                random_state: Forwarded unchanged to the base imputer.
            all_nan_strategy: How to treat columns that are entirely NaN at fit
                time: ``"keep_nan"``, ``"fill_zero"`` or ``"fill_constant"``.
            all_nan_fill: Constant used by the ``"fill_constant"`` strategy.

        Raises:
            ValueError: If ``all_nan_strategy`` is not one of the supported
                values, or if the policy requires a calendar column and none
                was given.
        """
        self._user_calendar_column = calendar_column
        self.all_nan_strategy = all_nan_strategy
        strategy_choices = {"keep_nan", "fill_zero", "fill_constant"}
        if self.all_nan_strategy not in strategy_choices:
            raise ValueError(f"all_nan_strategy must be one of {sorted(strategy_choices)}")
        self.all_nan_fill = float(all_nan_fill)
        self.all_nan_fill_value_ = float(all_nan_fill)
        self.all_nan_columns_: List[str] = []
        self._prefit_warnings: List[str] = []
        super().__init__(
            columns=columns,
            policy=policy,
            rolling_window=rolling_window,
            ema_alpha=ema_alpha,
            calendar_column=calendar_column,
            policy_params=policy_params,
            random_state=random_state,
        )
        if self.policy in self.CALENDAR_REQUIRED_POLICIES and self._user_calendar_column is None:
            raise ValueError(
                f"Policy '{self.policy_requested}' requires calendar_column to be provided explicitly."
            )

    def fit(self, X: pd.DataFrame, y: Any = None):  # type: ignore[override]
        """Fit the base imputer on the E-group columns of ``X``.

        Returns:
            Whatever the base class ``fit`` returns.

        Raises:
            ValueError: If the policy needs a calendar column that is missing
                from the configuration or from ``X``.
        """
        frame = self._ensure_dataframe(X).copy()
        if self.columns is None:
            self.columns = [c for c in frame.columns if isinstance(c, str) and c.startswith("E")]
        numeric_cols = []
        for col in list(self.columns):
            if col not in frame.columns:
                continue
            # Plain column assignment replaces the column, so the coerced numeric
            # dtype sticks. ``frame.loc[:, col] = ...`` writes in place on pandas 2.x
            # and would leave an object column as object dtype.
            frame[col] = pd.to_numeric(frame[col], errors="coerce")
            numeric_cols.append(col)
        self.columns = numeric_cols
        self.all_nan_columns_ = [
            str(col) for col in self.columns if cast(pd.Series, frame[col]).isna().all()
        ]
        self._prefit_warnings = []
        if self.policy in self.CALENDAR_REQUIRED_POLICIES:
            calendar_col = self._user_calendar_column or self.calendar_column
            if calendar_col is None:
                raise ValueError(
                    f"Policy '{self.policy_requested}' requires calendar_column but none was supplied."
                )
            if calendar_col not in frame.columns:
                raise ValueError(f"Calendar column '{calendar_col}' not found in training frame.")
            calendar_series = pd.to_datetime(frame[calendar_col], errors="coerce")
            if calendar_series.isna().any():
                self._prefit_warnings.append("calendar_column_contains_non_parseable_values")
            if calendar_series.duplicated().any():
                self._prefit_warnings.append("calendar_column_contains_duplicates")
        fitted = super().fit(frame, y)
        # Recorded after the base fit, as in the original flow.
        for msg in self._prefit_warnings:
            self._record_warning(msg)
        extra_columns = getattr(self, "extra_columns_", None)
        if isinstance(extra_columns, list) and extra_columns:
            # Move generated helper columns into the E namespace everywhere they
            # are referenced: training output, output column list and stored state.
            rename_map: dict[str, str] = {
                col: self._rename_generated_column(col) for col in extra_columns
            }
            self._train_filled_ = self._train_filled_.rename(columns=rename_map)
            self.extra_columns_ = [str(rename_map.get(col, col)) for col in extra_columns]
            output_cols = list(getattr(self, "_output_columns_", []))
            self._output_columns_ = [str(rename_map.get(col, col)) for col in output_cols]
            state = getattr(self, "_state_", {})
            if isinstance(state, dict):
                for key, value in list(state.items()):
                    if isinstance(value, pd.DataFrame):
                        state[key] = value.rename(columns=rename_map)
                mask_map = state.get("mask_map")
                if isinstance(mask_map, dict):
                    state["mask_map"] = {k: rename_map.get(v, v) for k, v in mask_map.items()}
        if self.all_nan_columns_:
            self._overwrite_all_nan_columns(self._train_filled_)
            state = getattr(self, "_state_", {})
            if isinstance(state, dict):
                state["all_nan_columns"] = list(self.all_nan_columns_)
                state["all_nan_strategy"] = self.all_nan_strategy
                state["all_nan_fill_value"] = float(self.all_nan_fill_value_)
        return fitted

    def transform(self, X: pd.DataFrame):  # type: ignore[override]
        """Run the base transform, then re-apply the all-NaN column strategy."""
        transformed = super().transform(X)
        if self.all_nan_columns_:
            self._overwrite_all_nan_columns(transformed)
        return transformed

    def _resolve_all_nan_fill(self) -> float:
        """Return the value written into all-NaN columns for the configured strategy."""
        if self.all_nan_strategy == "keep_nan":
            return float("nan")
        if self.all_nan_strategy == "fill_zero":
            return 0.0
        return float(self.all_nan_fill_value_)  # fill_constant

    def _overwrite_all_nan_columns(self, frame: pd.DataFrame) -> None:
        """Overwrite, in place, every recorded all-NaN column present in ``frame``."""
        fill_value = self._resolve_all_nan_fill()
        for col in self.all_nan_columns_:
            if col in frame.columns:
                frame.loc[:, col] = fill_value

    def _rename_generated_column(self, name: str) -> str:
        """Map a generated column name into the ``E__`` / ``Emask__`` namespace.

        ``<base>_missing_flag`` becomes ``Emask__<base>``; names already carrying
        an ``E__`` or ``Emask__`` prefix are returned unchanged; anything else is
        prefixed with ``E__``.
        """
        if name.endswith("_missing_flag"):
            base = name[: -len("_missing_flag")]
            return f"Emask__{base}"
        if name.startswith(("E__", "Emask__")):
            return name
        return f"E__{name}"

# === Module: preprocess.I_group.i_group ===
from typing import Any, Dict, Iterable, Mapping
import numpy as np
import pandas as pd

# Module-local alias for the M-group imputer, used as the base class of IGroupImputer.
_BaseMGroupImputer = MGroupImputer

class IGroupImputer(_BaseMGroupImputer):
    """Specialised imputer for I-group (inventory) features.

    Extends :class:`MGroupImputer` with I-specific defaults:

    - Automatically scopes to columns starting with ``"I"`` when no column list is provided.
    - Renames generated helper columns so they remain I-namespaced (e.g. ``Imask__``).
    - Optionally applies quantile clipping after imputation to cap extreme values.

    Parameters
    ----------
    columns, policy, rolling_window, ema_alpha, calendar_column, policy_params, random_state
        Forwarded unchanged to the base imputer's constructor.
    clip_quantile_low, clip_quantile_high
        Quantiles of the fitted ``_train_filled_`` frame used as per-column clip
        bounds. No bounds are produced unless ``0 <= low < high <= 1``.
    enable_quantile_clip
        Switches the post-imputation clipping step on or off.
    """
    CALENDAR_REQUIRED_POLICIES = {
        "dow_median",
        "dom_median",
        "month_median",
        "holiday_bridge",
        "time_interp",
    }

    def __init__(
        self,
        columns: Iterable[str] | None = None,
        policy: str = "ffill_bfill",
        rolling_window: int = 5,
        ema_alpha: float = 0.3,
        calendar_column: str | None = None,
        policy_params: Mapping[str, Any] | None = None,
        random_state: int = 42,
        *,
        clip_quantile_low: float = 0.001,
        clip_quantile_high: float = 0.999,
        enable_quantile_clip: bool = True,
    ) -> None:
        # Keep the caller's calendar column separately; _resolve_calendar_column
        # prefers it over whatever the base class stores.
        self._user_calendar_column = calendar_column
        self.clip_quantile_low = float(clip_quantile_low)
        self.clip_quantile_high = float(clip_quantile_high)
        self.enable_quantile_clip = bool(enable_quantile_clip)
        self._clip_bounds_: Dict[str, tuple[float, float]] = {}
        self._prefit_warnings: list[str] = []
        super().__init__(
            columns=columns,
            policy=policy,
            rolling_window=rolling_window,
            ema_alpha=ema_alpha,
            calendar_column=calendar_column,
            policy_params=policy_params,
            random_state=random_state,
        )
    # ------------------------------------------------------------------

    def fit(self, X: pd.DataFrame, y=None):  # type: ignore[override]
        """Coerce the target columns to numeric, fit the base imputer, derive clip bounds.

        Returns whatever the base ``fit`` returns. Clip settings and any
        calendar warnings are recorded in ``self._state_`` when that attribute
        is a dict.
        """
        # Reset per-fit diagnostics so refitting does not replay (and duplicate)
        # warnings collected during an earlier fit.
        self._prefit_warnings = []
        frame = self._ensure_dataframe(X).copy()
        numeric_columns: list[str] = []
        for col in self._resolve_columns(frame):
            if col not in frame.columns:
                continue
            # Replace the column instead of writing through ``.loc[:, col]``:
            # since pandas 2.0 the latter sets values in place, which leaves an
            # object-dtype column as object even after numeric coercion.
            frame[col] = pd.to_numeric(frame[col], errors="coerce")
            numeric_columns.append(col)
        # NOTE(review): this overwrites the constructor argument with the
        # resolved list; presumably the base ``fit`` reads ``self.columns``.
        self.columns = numeric_columns
        calendar_column = self._resolve_calendar_column(frame)
        if calendar_column is not None and calendar_column in frame.columns:
            calendar_series = pd.to_datetime(frame[calendar_column], errors="coerce")
            if calendar_series.isna().any():
                self._prefit_warnings.append("calendar_column_contains_non_parseable_values")
            if calendar_series.duplicated().any():
                self._prefit_warnings.append("calendar_column_contains_duplicates")
        fitted = super().fit(frame, y)
        self._relabel_generated_columns()
        self._clip_bounds_ = self._compute_clip_bounds()
        state = getattr(self, "_state_", None)
        if isinstance(state, dict):
            if self._prefit_warnings:
                # Named ``state_warnings`` to avoid shadowing the ``warnings`` module.
                state_warnings = state.setdefault("warnings", [])
                if isinstance(state_warnings, list):
                    state_warnings.extend(self._prefit_warnings)
            state["clip_bounds"] = dict(self._clip_bounds_)
            state["clip_quantile_low"] = self.clip_quantile_low
            state["clip_quantile_high"] = self.clip_quantile_high
            state["enable_quantile_clip"] = self.enable_quantile_clip
        return fitted
    # ------------------------------------------------------------------

    def transform(self, X: pd.DataFrame):  # type: ignore[override]
        """Coerce fitted columns to numeric, run the base transform, then clip.

        Clipping uses the bounds computed during ``fit`` and is applied only
        when ``enable_quantile_clip`` is true and bounds exist.
        """
        frame = self._ensure_dataframe(X).copy()
        for col in getattr(self, "columns_", []):
            if col in frame.columns:
                # Column replacement so the numeric dtype actually takes effect
                # (in-place ``.loc`` assignment would keep an object dtype).
                frame[col] = pd.to_numeric(frame[col], errors="coerce")
        transformed = super().transform(frame)
        if self.enable_quantile_clip and self._clip_bounds_:
            for col, (low, high) in self._clip_bounds_.items():
                if col in transformed.columns:
                    transformed.loc[:, col] = transformed[col].clip(lower=low, upper=high)
        return transformed
    # ------------------------------------------------------------------

    def _resolve_columns(self, frame: pd.DataFrame) -> list[str]:
        """Return the configured string columns, or every ``"I"``-prefixed column of *frame*."""
        if self.columns is None:
            return [c for c in frame.columns if isinstance(c, str) and c.startswith("I")]
        return [c for c in self.columns if isinstance(c, str)]

    def _resolve_calendar_column(self, frame: pd.DataFrame) -> str | None:
        """Return the calendar column to use, validating it against *frame*.

        Raises
        ------
        ValueError
            If the policy is in ``CALENDAR_REQUIRED_POLICIES`` and no calendar
            column is configured.
        KeyError
            If a calendar column is configured but absent from *frame*.
        """
        calendar_column = self._user_calendar_column or self.calendar_column
        if self.policy in self.CALENDAR_REQUIRED_POLICIES and calendar_column is None:
            raise ValueError(
                f"Policy '{self.policy}' requires a calendar column but none was provided."
            )
        if calendar_column is not None and calendar_column not in frame.columns:
            raise KeyError(
                f"Calendar column '{calendar_column}' not found in input DataFrame."
            )
        return calendar_column

    def _relabel_generated_columns(self) -> None:
        """Rename helper columns recorded in ``extra_columns_`` into the I namespace.

        The same rename is applied to ``_train_filled_``, ``extra_columns_``,
        ``_output_columns_``, every DataFrame stored in ``_state_`` and the
        values of ``_state_["mask_map"]`` when those attributes exist.
        """
        extra_columns = getattr(self, "extra_columns_", [])
        if not extra_columns:
            return
        rename_map: Dict[str, str] = {
            col: self._rename_generated_column(col) for col in extra_columns
        }
        train_filled = getattr(self, "_train_filled_", None)
        if isinstance(train_filled, pd.DataFrame):
            self._train_filled_ = train_filled.rename(columns=rename_map)
        self.extra_columns_ = [str(rename_map.get(col, col)) for col in extra_columns]
        if hasattr(self, "_output_columns_"):
            self._output_columns_ = [
                str(rename_map.get(col, col)) for col in getattr(self, "_output_columns_", [])
            ]
        state = getattr(self, "_state_", None)
        if isinstance(state, dict):
            # Iterate over a snapshot because entries are reassigned in the loop.
            for key, value in list(state.items()):
                if isinstance(value, pd.DataFrame):
                    state[key] = value.rename(columns=rename_map)
            mask_map = state.get("mask_map")
            if isinstance(mask_map, dict):
                state["mask_map"] = {k: rename_map.get(v, v) for k, v in mask_map.items()}

    def _rename_generated_column(self, name: str) -> str:
        """Map ``<base>_missing_flag`` to ``Imask__<base>``; prefix non-``I`` names with ``Iextra__``."""
        if name.endswith("_missing_flag"):
            base_name = name[: -len("_missing_flag")]
            return f"Imask__{base_name}"
        if name.startswith("I"):
            return name
        return f"Iextra__{name}"

    def _compute_clip_bounds(self) -> Dict[str, tuple[float, float]]:
        """Compute per-column ``(low, high)`` quantile bounds from ``_train_filled_``.

        Returns an empty dict when clipping is disabled, the quantile pair is
        not a valid ``0 <= low < high <= 1`` range (NaN included), or no filled
        training frame is available. Columns with no numeric values, non-finite
        quantiles or a degenerate range (``high <= low``) are left out.
        """
        if not self.enable_quantile_clip:
            return {}
        q_low = float(self.clip_quantile_low)
        q_high = float(self.clip_quantile_high)
        # Chained comparison also rejects NaN, which would otherwise reach
        # ``Series.quantile`` and raise.
        if not (0.0 <= q_low < q_high <= 1.0):
            return {}
        filled = getattr(self, "_train_filled_", None)
        if not isinstance(filled, pd.DataFrame):
            return {}
        bounds: Dict[str, tuple[float, float]] = {}
        for col in getattr(self, "columns_", []):
            if col not in filled.columns:
                continue
            numeric_values = pd.to_numeric(filled[col], errors="coerce")
            series = pd.Series(numeric_values).dropna()
            if series.empty:
                continue
            low_value = float(series.quantile(q_low))
            high_value = float(series.quantile(q_high))
            if not np.isfinite(low_value) or not np.isfinite(high_value):
                continue
            if high_value <= low_value:
                continue
            bounds[col] = (low_value, high_value)
        return bounds

# === Module: preprocess.P_group.p_group ===
from typing import Any, Dict, Iterable, Mapping, cast
import numpy as np
import pandas as pd

_BaseMGroupImputer = MGroupImputer

class PGroupImputer(_BaseMGroupImputer):
    """Imputer tailored for valuation (P-group) features.

    The implementation mirrors :class:`MGroupImputer` but adds P-specific behavior:

    - Default column discovery targets columns beginning with ``"P"``.
    - Helper columns generated by policies such as ``mask_plus_mean`` are renamed to
      remain P-namespaced (``Pmask__`` / ``Pextra__``).
    - After fitting, values are clipped by a robust median±MAD envelope (configurable)
      to damp extreme valuation swings. A quantile fallback is used when MAD is zero
      or insufficient samples are available.

    Parameters
    ----------
    columns, policy, rolling_window, ema_alpha, calendar_column, policy_params, random_state
        Forwarded unchanged to the base imputer's constructor.
    mad_clip_scale
        Half-width of the clip envelope, expressed as a multiple of the MAD.
    mad_clip_min_samples
        Minimum number of non-NaN values required to use the MAD envelope.
    enable_mad_clip
        Switches the post-imputation clipping step on or off.
    fallback_quantile_low, fallback_quantile_high
        Quantile pair used when the MAD envelope is unavailable; ignored unless
        ``0 <= low < high <= 1``.
    """
    CALENDAR_REQUIRED_POLICIES = {
        "dow_median",
        "dom_median",
        "month_median",
        "holiday_bridge",
        "time_interp",
    }

    def __init__(
        self,
        columns: Iterable[str] | None = None,
        policy: str = "ffill_bfill",
        rolling_window: int = 5,
        ema_alpha: float = 0.3,
        calendar_column: str | None = None,
        policy_params: Mapping[str, Any] | None = None,
        random_state: int = 42,
        *,
        mad_clip_scale: float = 4.0,
        mad_clip_min_samples: int = 25,
        enable_mad_clip: bool = True,
        fallback_quantile_low: float = 0.005,
        fallback_quantile_high: float = 0.995,
    ) -> None:
        # Keep the caller's calendar column separately; _resolve_calendar_column
        # prefers it over whatever the base class stores.
        self._user_calendar_column = calendar_column
        self.mad_clip_scale = float(mad_clip_scale)
        self.mad_clip_min_samples = int(mad_clip_min_samples)
        self.enable_mad_clip = bool(enable_mad_clip)
        self.fallback_quantile_low = float(fallback_quantile_low)
        self.fallback_quantile_high = float(fallback_quantile_high)
        self._clip_bounds_: Dict[str, tuple[float, float]] = {}
        self._prefit_warnings: list[str] = []
        super().__init__(
            columns=columns,
            policy=policy,
            rolling_window=rolling_window,
            ema_alpha=ema_alpha,
            calendar_column=calendar_column,
            policy_params=policy_params,
            random_state=random_state,
        )
    # ------------------------------------------------------------------

    def fit(self, X: pd.DataFrame, y: Any = None):  # type: ignore[override]
        """Coerce the target columns to numeric, fit the base imputer, derive clip bounds.

        Returns whatever the base ``fit`` returns. Clip settings and any
        calendar warnings are recorded in ``self._state_`` when that attribute
        is a dict.
        """
        # Reset per-fit diagnostics so refitting does not replay (and duplicate)
        # warnings collected during an earlier fit.
        self._prefit_warnings = []
        frame = self._ensure_dataframe(X).copy()
        numeric_columns: list[str] = []
        for col in self._resolve_columns(frame):
            if col not in frame.columns:
                continue
            # Replace the column instead of writing through ``.loc[:, col]``:
            # since pandas 2.0 the latter sets values in place, which leaves an
            # object-dtype column as object even after numeric coercion.
            frame[col] = pd.to_numeric(frame[col], errors="coerce")
            numeric_columns.append(col)
        # NOTE(review): this overwrites the constructor argument with the
        # resolved list; presumably the base ``fit`` reads ``self.columns``.
        self.columns = numeric_columns
        calendar_column = self._resolve_calendar_column(frame)
        if calendar_column is not None and calendar_column in frame.columns:
            calendar_series = pd.to_datetime(frame[calendar_column], errors="coerce")
            if calendar_series.isna().any():
                self._prefit_warnings.append("calendar_column_contains_non_parseable_values")
            if calendar_series.duplicated().any():
                self._prefit_warnings.append("calendar_column_contains_duplicates")
        fitted = super().fit(frame, y)
        self._relabel_generated_columns()
        self._clip_bounds_ = self._compute_clip_bounds()
        state = getattr(self, "_state_", None)
        if isinstance(state, dict):
            if self._prefit_warnings:
                # Named ``state_warnings`` to avoid shadowing the ``warnings`` module.
                state_warnings = state.setdefault("warnings", [])
                if isinstance(state_warnings, list):
                    state_warnings.extend(self._prefit_warnings)
            state["mad_clip_bounds"] = dict(self._clip_bounds_)
            state["mad_clip_scale"] = self.mad_clip_scale
            state["mad_clip_min_samples"] = self.mad_clip_min_samples
            state["enable_mad_clip"] = self.enable_mad_clip
            state["fallback_quantile_low"] = self.fallback_quantile_low
            state["fallback_quantile_high"] = self.fallback_quantile_high
        return fitted
    # ------------------------------------------------------------------

    def transform(self, X: pd.DataFrame):  # type: ignore[override]
        """Coerce fitted columns to numeric, run the base transform, then clip.

        Clipping uses the bounds computed during ``fit`` and is applied only
        when ``enable_mad_clip`` is true and bounds exist.
        """
        frame = self._ensure_dataframe(X).copy()
        for col in getattr(self, "columns_", []):
            if col in frame.columns:
                # Column replacement so the numeric dtype actually takes effect
                # (in-place ``.loc`` assignment would keep an object dtype).
                frame[col] = pd.to_numeric(frame[col], errors="coerce")
        transformed = super().transform(frame)
        if self.enable_mad_clip and self._clip_bounds_:
            for col, (low, high) in self._clip_bounds_.items():
                if col in transformed.columns:
                    transformed.loc[:, col] = transformed[col].clip(lower=low, upper=high)
        return transformed
    # ------------------------------------------------------------------

    def _resolve_columns(self, frame: pd.DataFrame) -> list[str]:
        """Return the configured string columns, or every ``"P"``-prefixed column of *frame*."""
        if self.columns is None:
            return [c for c in frame.columns if isinstance(c, str) and c.startswith("P")]
        return [c for c in self.columns if isinstance(c, str)]

    def _resolve_calendar_column(self, frame: pd.DataFrame) -> str | None:
        """Return the calendar column to use, validating it against *frame*.

        Raises
        ------
        ValueError
            If the policy is in ``CALENDAR_REQUIRED_POLICIES`` and no calendar
            column is configured.
        KeyError
            If a calendar column is configured but absent from *frame*.
        """
        calendar_column = self._user_calendar_column or self.calendar_column
        if self.policy in self.CALENDAR_REQUIRED_POLICIES and calendar_column is None:
            raise ValueError(
                f"Policy '{self.policy}' requires a calendar column but none was provided."
            )
        if calendar_column is not None and calendar_column not in frame.columns:
            raise KeyError(
                f"Calendar column '{calendar_column}' not found in input DataFrame."
            )
        return calendar_column

    def _relabel_generated_columns(self) -> None:
        """Rename helper columns recorded in ``extra_columns_`` into the P namespace.

        The same rename is applied to ``_train_filled_``, ``extra_columns_``,
        ``_output_columns_``, every DataFrame stored in ``_state_`` and the
        values of ``_state_["mask_map"]`` when those attributes exist.
        """
        extra_columns = getattr(self, "extra_columns_", [])
        if not extra_columns:
            return
        rename_map: Dict[str, str] = {
            col: self._rename_generated_column(col) for col in extra_columns
        }
        train_filled = getattr(self, "_train_filled_", None)
        if isinstance(train_filled, pd.DataFrame):
            self._train_filled_ = train_filled.rename(columns=rename_map)
        self.extra_columns_ = [str(rename_map.get(col, col)) for col in extra_columns]
        if hasattr(self, "_output_columns_"):
            self._output_columns_ = [
                str(rename_map.get(col, col)) for col in getattr(self, "_output_columns_", [])
            ]
        state = getattr(self, "_state_", None)
        if isinstance(state, dict):
            # Iterate over a snapshot because entries are reassigned in the loop.
            for key, value in list(state.items()):
                if isinstance(value, pd.DataFrame):
                    state[key] = value.rename(columns=rename_map)
            mask_map = state.get("mask_map")
            if isinstance(mask_map, dict):
                state["mask_map"] = {k: rename_map.get(v, v) for k, v in mask_map.items()}

    def _rename_generated_column(self, name: str) -> str:
        """Map ``<base>_missing_flag`` to ``Pmask__<base>``; prefix non-``P`` names with ``Pextra__``."""
        if name.endswith("_missing_flag"):
            base_name = name[: -len("_missing_flag")]
            return f"Pmask__{base_name}"
        if name.startswith("P"):
            return name
        return f"Pextra__{name}"

    @staticmethod
    def _bounds_usable(low: float | None, high: float | None) -> bool:
        """Return True when both bounds are set, finite and form a non-empty range."""
        if low is None or high is None:
            return False
        return bool(np.isfinite(low) and np.isfinite(high) and high > low)

    def _compute_clip_bounds(self) -> Dict[str, tuple[float, float]]:
        """Compute per-column ``(low, high)`` clip bounds from ``_train_filled_``.

        For each fitted column the median ± ``mad_clip_scale`` × MAD envelope is
        used when at least ``mad_clip_min_samples`` non-NaN values exist and the
        MAD is positive and finite. Otherwise, if the fallback quantile pair is
        a valid range, the corresponding quantiles are used. Columns for which
        neither yields a usable range are left out.
        """
        if not self.enable_mad_clip:
            return {}
        filled = getattr(self, "_train_filled_", None)
        if not isinstance(filled, pd.DataFrame):
            return {}
        bounds: Dict[str, tuple[float, float]] = {}
        q_low = float(self.fallback_quantile_low)
        q_high = float(self.fallback_quantile_high)
        use_quantile = 0.0 <= q_low < q_high <= 1.0
        min_samples = max(1, self.mad_clip_min_samples)
        for col in getattr(self, "columns_", []):
            if col not in filled.columns:
                continue
            numeric_series = cast(pd.Series, pd.to_numeric(filled[col], errors="coerce"))
            numeric_series = numeric_series.dropna()
            if numeric_series.empty:
                continue
            low = high = None
            values = numeric_series.to_numpy(dtype=float, copy=False)
            if values.size >= min_samples:
                median = float(np.median(values))
                mad = float(np.median(np.abs(values - median)))
                if np.isfinite(mad) and mad > 0.0 and np.isfinite(median):
                    spread = float(self.mad_clip_scale) * mad
                    low = median - spread
                    high = median + spread
            if use_quantile and not self._bounds_usable(low, high):
                # MAD envelope unavailable or degenerate: fall back to quantiles.
                low_q = float(numeric_series.quantile(q_low))
                high_q = float(numeric_series.quantile(q_high))
                if np.isfinite(low_q) and np.isfinite(high_q) and high_q > low_q:
                    low, high = low_q, high_q
            if not self._bounds_usable(low, high):
                continue
            bounds[col] = (float(low), float(high))
        return bounds

# === Module: preprocess.S_group.s_group ===
from typing import Any, Dict, Iterable, Mapping, cast
import numpy as np
import pandas as pd

_BaseMGroupImputer = MGroupImputer

class SGroupImputer(_BaseMGroupImputer):
    """Imputer tailored for sentiment (S-group) features.

    The implementation mirrors :class:`MGroupImputer` but adds S-specific behavior:

    - Default column discovery targets columns beginning with ``"S"``.
    - Helper columns generated by policies such as ``mask_plus_mean`` are renamed to
      remain S-namespaced (``Smask__`` / ``Sextra__``).
    - After fitting, values are clipped by a robust median±MAD envelope (configurable)
      to damp extreme sentiment swings. A quantile fallback is used when MAD is zero
      or insufficient samples are available.

    Parameters
    ----------
    columns, policy, rolling_window, ema_alpha, calendar_column, policy_params, random_state
        Forwarded unchanged to the base imputer's constructor.
    mad_clip_scale
        Half-width of the clip envelope, expressed as a multiple of the MAD.
    mad_clip_min_samples
        Minimum number of non-NaN values required to use the MAD envelope.
    enable_mad_clip
        Switches the post-imputation clipping step on or off.
    fallback_quantile_low, fallback_quantile_high
        Quantile pair used when the MAD envelope is unavailable; ignored unless
        ``0 <= low < high <= 1``.
    """
    CALENDAR_REQUIRED_POLICIES = {
        "dow_median",
        "dom_median",
        "month_median",
        "holiday_bridge",
        "time_interp",
    }

    def __init__(
        self,
        columns: Iterable[str] | None = None,
        policy: str = "ffill_bfill",
        rolling_window: int = 5,
        ema_alpha: float = 0.3,
        calendar_column: str | None = None,
        policy_params: Mapping[str, Any] | None = None,
        random_state: int = 42,
        *,
        mad_clip_scale: float = 4.0,
        mad_clip_min_samples: int = 25,
        enable_mad_clip: bool = True,
        fallback_quantile_low: float = 0.005,
        fallback_quantile_high: float = 0.995,
    ) -> None:
        # Keep the caller's calendar column separately; _resolve_calendar_column
        # prefers it over whatever the base class stores.
        self._user_calendar_column = calendar_column
        self.mad_clip_scale = float(mad_clip_scale)
        self.mad_clip_min_samples = int(mad_clip_min_samples)
        self.enable_mad_clip = bool(enable_mad_clip)
        self.fallback_quantile_low = float(fallback_quantile_low)
        self.fallback_quantile_high = float(fallback_quantile_high)
        self._clip_bounds_: Dict[str, tuple[float, float]] = {}
        self._prefit_warnings: list[str] = []
        super().__init__(
            columns=columns,
            policy=policy,
            rolling_window=rolling_window,
            ema_alpha=ema_alpha,
            calendar_column=calendar_column,
            policy_params=policy_params,
            random_state=random_state,
        )
    # ------------------------------------------------------------------

    def fit(self, X: pd.DataFrame, y: Any = None):  # type: ignore[override]
        """Coerce the target columns to numeric, fit the base imputer, derive clip bounds.

        Returns whatever the base ``fit`` returns. Clip settings and any
        calendar warnings are recorded in ``self._state_`` when that attribute
        is a dict.
        """
        # Reset per-fit diagnostics so refitting does not replay (and duplicate)
        # warnings collected during an earlier fit.
        self._prefit_warnings = []
        frame = self._ensure_dataframe(X).copy()
        numeric_columns: list[str] = []
        for col in self._resolve_columns(frame):
            if col not in frame.columns:
                continue
            # Replace the column instead of writing through ``.loc[:, col]``:
            # since pandas 2.0 the latter sets values in place, which leaves an
            # object-dtype column as object even after numeric coercion.
            frame[col] = pd.to_numeric(frame[col], errors="coerce")
            numeric_columns.append(col)
        # NOTE(review): this overwrites the constructor argument with the
        # resolved list; presumably the base ``fit`` reads ``self.columns``.
        self.columns = numeric_columns
        calendar_column = self._resolve_calendar_column(frame)
        if calendar_column is not None and calendar_column in frame.columns:
            calendar_series = pd.to_datetime(frame[calendar_column], errors="coerce")
            if calendar_series.isna().any():
                self._prefit_warnings.append("calendar_column_contains_non_parseable_values")
            if calendar_series.duplicated().any():
                self._prefit_warnings.append("calendar_column_contains_duplicates")
        fitted = super().fit(frame, y)
        self._relabel_generated_columns()
        self._clip_bounds_ = self._compute_clip_bounds()
        state = getattr(self, "_state_", None)
        if isinstance(state, dict):
            if self._prefit_warnings:
                # Named ``state_warnings`` to avoid shadowing the ``warnings`` module.
                state_warnings = state.setdefault("warnings", [])
                if isinstance(state_warnings, list):
                    state_warnings.extend(self._prefit_warnings)
            state["mad_clip_bounds"] = dict(self._clip_bounds_)
            state["mad_clip_scale"] = self.mad_clip_scale
            state["mad_clip_min_samples"] = self.mad_clip_min_samples
            state["enable_mad_clip"] = self.enable_mad_clip
            state["fallback_quantile_low"] = self.fallback_quantile_low
            state["fallback_quantile_high"] = self.fallback_quantile_high
        return fitted
    # ------------------------------------------------------------------

    def transform(self, X: pd.DataFrame):  # type: ignore[override]
        """Coerce fitted columns to numeric, run the base transform, then clip.

        Clipping uses the bounds computed during ``fit`` and is applied only
        when ``enable_mad_clip`` is true and bounds exist.
        """
        frame = self._ensure_dataframe(X).copy()
        for col in getattr(self, "columns_", []):
            if col in frame.columns:
                # Column replacement so the numeric dtype actually takes effect
                # (in-place ``.loc`` assignment would keep an object dtype).
                frame[col] = pd.to_numeric(frame[col], errors="coerce")
        transformed = super().transform(frame)
        if self.enable_mad_clip and self._clip_bounds_:
            for col, (low, high) in self._clip_bounds_.items():
                if col in transformed.columns:
                    transformed.loc[:, col] = transformed[col].clip(lower=low, upper=high)
        return transformed
    # ------------------------------------------------------------------

    def _resolve_columns(self, frame: pd.DataFrame) -> list[str]:
        """Return the configured string columns, or every ``"S"``-prefixed column of *frame*."""
        if self.columns is None:
            return [c for c in frame.columns if isinstance(c, str) and c.startswith("S")]
        return [c for c in self.columns if isinstance(c, str)]

    def _resolve_calendar_column(self, frame: pd.DataFrame) -> str | None:
        """Return the calendar column to use, validating it against *frame*.

        Raises
        ------
        ValueError
            If the policy is in ``CALENDAR_REQUIRED_POLICIES`` and no calendar
            column is configured.
        KeyError
            If a calendar column is configured but absent from *frame*.
        """
        calendar_column = self._user_calendar_column or self.calendar_column
        if self.policy in self.CALENDAR_REQUIRED_POLICIES and calendar_column is None:
            raise ValueError(
                f"Policy '{self.policy}' requires a calendar column but none was provided."
            )
        if calendar_column is not None and calendar_column not in frame.columns:
            raise KeyError(
                f"Calendar column '{calendar_column}' not found in input DataFrame."
            )
        return calendar_column

    def _relabel_generated_columns(self) -> None:
        """Rename helper columns recorded in ``extra_columns_`` into the S namespace.

        The same rename is applied to ``_train_filled_``, ``extra_columns_``,
        ``_output_columns_``, every DataFrame stored in ``_state_`` and the
        values of ``_state_["mask_map"]`` when those attributes exist.
        """
        extra_columns = getattr(self, "extra_columns_", [])
        if not extra_columns:
            return
        rename_map: Dict[str, str] = {
            col: self._rename_generated_column(col) for col in extra_columns
        }
        train_filled = getattr(self, "_train_filled_", None)
        if isinstance(train_filled, pd.DataFrame):
            self._train_filled_ = train_filled.rename(columns=rename_map)
        self.extra_columns_ = [str(rename_map.get(col, col)) for col in extra_columns]
        if hasattr(self, "_output_columns_"):
            self._output_columns_ = [
                str(rename_map.get(col, col)) for col in getattr(self, "_output_columns_", [])
            ]
        state = getattr(self, "_state_", None)
        if isinstance(state, dict):
            # Iterate over a snapshot because entries are reassigned in the loop.
            for key, value in list(state.items()):
                if isinstance(value, pd.DataFrame):
                    state[key] = value.rename(columns=rename_map)
            mask_map = state.get("mask_map")
            if isinstance(mask_map, dict):
                state["mask_map"] = {k: rename_map.get(v, v) for k, v in mask_map.items()}

    def _rename_generated_column(self, name: str) -> str:
        """Map ``<base>_missing_flag`` to ``Smask__<base>``; prefix non-``S`` names with ``Sextra__``."""
        if name.endswith("_missing_flag"):
            base_name = name[: -len("_missing_flag")]
            return f"Smask__{base_name}"
        if name.startswith("S"):
            return name
        return f"Sextra__{name}"

    @staticmethod
    def _bounds_usable(low: float | None, high: float | None) -> bool:
        """Return True when both bounds are set, finite and form a non-empty range."""
        if low is None or high is None:
            return False
        return bool(np.isfinite(low) and np.isfinite(high) and high > low)

    def _compute_clip_bounds(self) -> Dict[str, tuple[float, float]]:
        """Compute per-column ``(low, high)`` clip bounds from ``_train_filled_``.

        For each fitted column the median ± ``mad_clip_scale`` × MAD envelope is
        used when at least ``mad_clip_min_samples`` non-NaN values exist and the
        MAD is positive and finite. Otherwise, if the fallback quantile pair is
        a valid range, the corresponding quantiles are used. Columns for which
        neither yields a usable range are left out.
        """
        if not self.enable_mad_clip:
            return {}
        filled = getattr(self, "_train_filled_", None)
        if not isinstance(filled, pd.DataFrame):
            return {}
        bounds: Dict[str, tuple[float, float]] = {}
        q_low = float(self.fallback_quantile_low)
        q_high = float(self.fallback_quantile_high)
        use_quantile = 0.0 <= q_low < q_high <= 1.0
        min_samples = max(1, self.mad_clip_min_samples)
        for col in getattr(self, "columns_", []):
            if col not in filled.columns:
                continue
            numeric_series = cast(pd.Series, pd.to_numeric(filled[col], errors="coerce"))
            numeric_series = numeric_series.dropna()
            if numeric_series.empty:
                continue
            low = high = None
            values = numeric_series.to_numpy(dtype=float, copy=False)
            if values.size >= min_samples:
                median = float(np.median(values))
                mad = float(np.median(np.abs(values - median)))
                if np.isfinite(mad) and mad > 0.0 and np.isfinite(median):
                    spread = float(self.mad_clip_scale) * mad
                    low = median - spread
                    high = median + spread
            if use_quantile and not self._bounds_usable(low, high):
                # MAD envelope unavailable or degenerate: fall back to quantiles.
                low_q = float(numeric_series.quantile(q_low))
                high_q = float(numeric_series.quantile(q_high))
                if np.isfinite(low_q) and np.isfinite(high_q) and high_q > low_q:
                    low, high = low_q, high_q
            if not self._bounds_usable(low, high):
                continue
            bounds[col] = (float(low), float(high))
        return bounds

# === Module: src.feature_generation.su1.feature_su1 ===
"""SU1（欠損構造コア特徴量）の生成ロジック。
本モジュールは ``docs/feature_generation/SU1.md`` に記載された方針を実装し、
scikit-learn 互換のトランスフォーマー ``SU1FeatureGenerator`` と、設定 YAML や
生データを読み込むためのヘルパー関数を提供する。
"""
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, Iterable, Literal, Mapping, MutableMapping, Sequence
import numpy as np
import pandas as pd
import yaml
from sklearn.base import BaseEstimator, TransformerMixin

def _infer_group(column_name: str) -> str | None:
    """列名の接頭辞から特徴グループを推定する。"""
    prefix_chars: list[str] = []
    for char in column_name:
        if char.isalpha() and char.isupper():
            prefix_chars.append(char)
            continue
        if char.isdigit():
            break
        # 英数字以外が出現した場合は規約外列とみなす。
        return None
    if not prefix_chars:
        return None
    # 複数文字の接頭辞もそのままグループ識別子とする。
    return "".join(prefix_chars)

def _coerce_dtype(dtype_like: str) -> np.dtype:
    """dtype 指定文字列を ``numpy.dtype`` に変換する。"""
    try:
        dtype = np.dtype(dtype_like)
    except TypeError as exc:  # pragma: no cover - 防御的分岐
        raise ValueError(f"Invalid dtype specification: {dtype_like!r}") from exc
    return dtype

def _path_from_config(base_dir: Path, value: str) -> Path:
    """YAML 設定内で指定されたパスを解決する。"""
    raw_path = Path(value)
    return raw_path if raw_path.is_absolute() else (base_dir / raw_path).resolve()

@dataclass(frozen=True)
class SU1Config:
    """Immutable settings required for SU1 feature generation.

    Instances are normally built from the ``su1`` section of the YAML
    configuration via :meth:`from_mapping`.
    """
    # Configuration key ``id_column`` (default ``"date_id"``).
    id_column: str
    # Configuration key ``exclude_columns``.
    exclude_columns: tuple[str, ...]
    # Sorted ``groups.include`` minus ``groups.exclude``; never empty.
    target_groups: tuple[str, ...]
    # Configuration key ``gap_clip`` (default 60).
    gap_clip: int
    # Configuration key ``run_clip`` (defaults to ``gap_clip``).
    run_clip: int
    # Configuration keys ``dtype.flag`` / ``dtype.run`` (defaults uint8 / int16).
    flag_dtype: np.dtype
    run_dtype: np.dtype
    # Configuration keys under ``include_group_means``.
    include_avg_gap: bool
    include_avg_run: bool
    exclude_all_nan_for_means: bool
    # Directory and file names of the raw CSV data.
    raw_dir: Path
    train_filename: str
    test_filename: str

    @classmethod
    def from_mapping(cls, mapping: Mapping[str, Any], *, base_dir: Path) -> "SU1Config":
        """Build a configuration from a parsed YAML mapping.

        Sections and lists that are present but null (an empty ``data:``,
        ``groups:``, ``exclude_columns:`` ... entry parses to ``None``) are
        treated the same as missing ones instead of raising.

        Parameters
        ----------
        mapping
            The ``su1`` section of the configuration.
        base_dir
            Directory against which a relative ``data.raw_dir`` is resolved.

        Raises
        ------
        ValueError
            If no target group remains after applying ``groups.exclude``.
        """
        data_section = mapping.get("data") or {}
        exclude_columns = tuple(mapping.get("exclude_columns") or ())
        groups_mapping = mapping.get("groups") or {}
        include_groups = set(groups_mapping.get("include") or ())
        exclude_groups = set(groups_mapping.get("exclude") or ())
        target_groups = tuple(sorted(include_groups.difference(exclude_groups)))
        if not target_groups:
            raise ValueError("SU1 configuration must specify at least one target group.")
        id_column = mapping.get("id_column", "date_id")
        gap_clip = int(mapping.get("gap_clip", 60))
        run_clip = int(mapping.get("run_clip", gap_clip))
        dtype_section = mapping.get("dtype") or {}
        flag_dtype = _coerce_dtype(dtype_section.get("flag", "uint8"))
        run_dtype = _coerce_dtype(dtype_section.get("run", "int16"))
        include_group_means = mapping.get("include_group_means") or {}
        include_avg_gap = bool(include_group_means.get("gap_ffill", True))
        include_avg_run = bool(include_group_means.get("run_na", True))
        exclude_all_nan_for_means = bool(include_group_means.get("exclude_all_nan", False))
        raw_dir = _path_from_config(base_dir, data_section.get("raw_dir", "data/raw"))
        train_filename = data_section.get("train_filename", "train.csv")
        test_filename = data_section.get("test_filename", "test.csv")
        return cls(
            id_column=id_column,
            exclude_columns=exclude_columns,
            target_groups=target_groups,
            gap_clip=gap_clip,
            run_clip=run_clip,
            flag_dtype=flag_dtype,
            run_dtype=run_dtype,
            include_avg_gap=include_avg_gap,
            include_avg_run=include_avg_run,
            exclude_all_nan_for_means=exclude_all_nan_for_means,
            raw_dir=raw_dir,
            train_filename=train_filename,
            test_filename=test_filename,
        )

    @property
    def train_path(self) -> Path:
        """Return the absolute path to the training CSV."""
        return (self.raw_dir / self.train_filename).resolve()

    @property
    def test_path(self) -> Path:
        """Return the absolute path to the test CSV."""
        return (self.raw_dir / self.test_filename).resolve()

def load_su1_config(config_path: str | Path) -> SU1Config:
    """Read the SU1 settings YAML and build a :class:`SU1Config` from it.

    The file must contain a top-level ``su1`` section; relative paths inside
    it are resolved against the directory holding the YAML file.

    Raises ``KeyError`` when the ``su1`` section is missing.
    """
    resolved_path = Path(config_path).resolve()
    with resolved_path.open("r", encoding="utf-8") as handle:
        # An empty document parses to None; treat it as an empty mapping.
        full_cfg: Mapping[str, Any] = yaml.safe_load(handle) or {}
    try:
        section = full_cfg["su1"]
    except KeyError as exc:  # pragma: no cover - defensive branch
        raise KeyError("'su1' section is required in feature_generation.yaml") from exc
    else:
        return SU1Config.from_mapping(section, base_dir=resolved_path.parent)

def load_raw_dataset(config: SU1Config, *, dataset: Literal["train", "test"] = "train") -> pd.DataFrame:
    """Load the raw train or test CSV used for SU1 feature generation.

    When the configured id column exists in the file it becomes the index of
    the returned frame.

    Raises ``ValueError`` for an unknown *dataset* and ``FileNotFoundError``
    when the selected CSV does not exist.
    """
    if dataset not in {"train", "test"}:  # pragma: no cover - defensive branch
        raise ValueError("dataset must be 'train' or 'test'")
    if dataset == "train":
        source = config.train_path
    else:
        source = config.test_path
    if not source.exists():
        raise FileNotFoundError(f"Raw data file not found: {source}")
    frame = pd.read_csv(source)
    if config.id_column not in frame.columns:
        return frame
    return frame.set_index(config.id_column)

def _clip_array(values: np.ndarray, clip_value: int) -> np.ndarray:
    """配列を上限値でクリップし、その参照を返す。"""
    np.clip(values, None, clip_value, out=values)
    return values

def _distance_from_last_observation(mask: np.ndarray, clip: int, dtype: np.dtype) -> np.ndarray:
    """NaN マスクから直近観測までの距離を算出する。"""
    out = np.zeros(mask.shape[0], dtype=dtype)
    last_obs_index = -1
    seen_obs = False
    for idx, is_missing in enumerate(mask):
        if is_missing:
            if not seen_obs:
                out[idx] = clip
            else:
                distance = idx - last_obs_index
                out[idx] = distance if distance <= clip else clip
        else:
            out[idx] = 0
            last_obs_index = idx
            seen_obs = True
    if not seen_obs:
        out.fill(0)
    return out

def _run_length(mask: np.ndarray, clip: int, dtype: np.dtype, *, target_missing: bool) -> np.ndarray:
    """Compute the running length of consecutive missing or observed entries.

    Args:
        mask: One-dimensional missing mask; a truthy entry means "missing".
        clip: Ceiling applied to the running count.
        dtype: dtype of the returned array.
        target_missing: Count runs of missing entries when true, runs of
            observed entries otherwise.

    Returns:
        An array holding the clipped length of the current run at positions
        that belong to the targeted kind, and ``0`` elsewhere.
    """
    lengths = np.zeros(mask.shape[0], dtype=dtype)
    want_missing = bool(target_missing)
    streak = 0
    for pos, missing in enumerate(mask):
        if bool(missing) != want_missing:
            # The run is broken; restart the count.
            streak = 0
            lengths[pos] = 0
            continue
        if streak < clip:
            streak = streak + 1
        else:
            streak = clip
        lengths[pos] = streak
    return lengths

class SU1FeatureGenerator(BaseEstimator, TransformerMixin):
    """Transformer that builds SU1 missing-structure features from raw data.

    ``fit`` selects the raw columns whose inferred group is listed in
    ``config.target_groups`` (skipping ``config.exclude_columns``).
    ``transform`` then emits, for every selected column:

    * ``m/<col>``: missing flag,
    * ``gap_ffill/<col>``: clipped distance to the last observation,
    * ``run_na/<col>`` / ``run_obs/<col>``: clipped run lengths of missing
      and observed values,

    followed by row-wise aggregates over all selected columns
    (``m_any_day``, ``m_rate_day``, ``m_cnt/ALL``, ``m_rate/ALL``) and per
    group (``m_cnt/<group>``, ``m_rate/<group>`` and, when enabled in the
    config, ``avg_gapff/<group>`` and ``avg_run_na/<group>``).
    """

    def __init__(self, config: SU1Config):
        self.config = config
        # Populated by fit(); None means "not fitted yet".
        self.feature_columns_: list[str] | None = None
        self.group_columns_: Dict[str, list[str]] | None = None

    def fit(self, X: pd.DataFrame, y: Any = None) -> "SU1FeatureGenerator":
        """Record the eligible columns of ``X`` and their grouping.

        ``y`` is accepted for API compatibility and is not used.

        Raises:
            TypeError: If ``X`` is not a DataFrame.
            ValueError: If no column of ``X`` matches the configuration.
        """
        df = self._ensure_dataframe(X)
        self.feature_columns_ = self._select_feature_columns(df.columns)
        self.group_columns_ = self._build_group_columns(self.feature_columns_)
        return self

    def transform(self, X: pd.DataFrame) -> pd.DataFrame:
        """Generate the SU1 feature frame for ``X``.

        Returns:
            A frame containing only the generated features, indexed like
            ``X``.

        Raises:
            RuntimeError: If called before ``fit``.
            TypeError: If ``X`` is not a DataFrame.
            KeyError: If ``X`` lacks a column selected during ``fit``.
        """
        if self.feature_columns_ is None or self.group_columns_ is None:
            raise RuntimeError("The transformer must be fitted before calling transform().")
        df = self._ensure_dataframe(X)
        missing_columns = [col for col in self.feature_columns_ if col not in df.columns]
        if missing_columns:
            raise KeyError(f"Input dataframe is missing columns required for SU1: {missing_columns}")
        feature_df = self._generate_features(df)
        feature_df.index = df.index
        return feature_df

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _ensure_dataframe(self, X: pd.DataFrame) -> pd.DataFrame:
        """Validate that ``X`` is a DataFrame and return a copy of it."""
        if not isinstance(X, pd.DataFrame):  # pragma: no cover - defensive branch
            raise TypeError("SU1FeatureGenerator expects a pandas.DataFrame input")
        return X.copy()

    def _select_feature_columns(self, columns: Iterable[str]) -> list[str]:
        """Return the columns that are not excluded and belong to a target group."""
        selected: list[str] = []
        for column in columns:
            if column in self.config.exclude_columns:
                continue
            group = _infer_group(column)
            if group and group in self.config.target_groups:
                selected.append(column)
        if not selected:
            raise ValueError("No columns matched the SU1 configuration criteria.")
        return selected

    def _build_group_columns(self, columns: Sequence[str]) -> Dict[str, list[str]]:
        """Map every target group to the selected columns that belong to it.

        Groups without any matching column are kept with an empty list.
        """
        group_map: Dict[str, list[str]] = {group: [] for group in self.config.target_groups}
        for column in columns:
            group = _infer_group(column)
            if group in group_map:
                group_map[group].append(column)
        return group_map

    def _masked_row_mean(
        self,
        frame: pd.DataFrame,
        source_columns: Sequence[str],
        derived_columns: Sequence[str],
        all_nan_lookup: Mapping[str, bool],
    ) -> pd.Series:
        """Row-wise float32 mean of ``derived_columns``, optionally ignoring all-NaN sources."""
        values = frame[list(derived_columns)].copy()
        if self.config.exclude_all_nan_for_means:
            for orig_col, derived_col in zip(source_columns, derived_columns):
                if all_nan_lookup.get(orig_col, False):
                    # NaN entries are skipped by DataFrame.mean, so this
                    # removes the column from the average.
                    values[derived_col] = np.nan
        return values.mean(axis=1).astype(np.float32)

    def _generate_features(self, df: pd.DataFrame) -> pd.DataFrame:
        """Build the flag, gap, run-length and aggregate features for ``df``."""
        feature_columns = self.feature_columns_
        group_columns = self.group_columns_
        if feature_columns is None or group_columns is None:
            raise RuntimeError("The transformer must be fitted before generating features.")
        config = self.config
        data = df[feature_columns]
        mask = data.isna()
        # True for columns that are missing on every row of this frame.
        all_nan_lookup = mask.all(axis=0).to_dict()
        flag_df = mask.astype(config.flag_dtype)
        flag_df.columns = [f"m/{col}" for col in feature_columns]

        gap_data: MutableMapping[str, np.ndarray] = {}
        run_na_data: MutableMapping[str, np.ndarray] = {}
        run_obs_data: MutableMapping[str, np.ndarray] = {}
        for column in feature_columns:
            column_mask = mask[column].to_numpy(dtype=bool)
            gap_values = _distance_from_last_observation(column_mask, config.gap_clip, config.run_dtype)
            run_na_values = _run_length(column_mask, config.run_clip, config.run_dtype, target_missing=True)
            run_obs_values = _run_length(column_mask, config.run_clip, config.run_dtype, target_missing=False)
            gap_data[f"gap_ffill/{column}"] = _clip_array(gap_values, config.gap_clip)
            run_na_data[f"run_na/{column}"] = _clip_array(run_na_values, config.run_clip)
            run_obs_data[f"run_obs/{column}"] = _clip_array(run_obs_values, config.run_clip)
        gap_df = pd.DataFrame(gap_data, index=data.index)
        run_na_df = pd.DataFrame(run_na_data, index=data.index)
        run_obs_df = pd.DataFrame(run_obs_data, index=data.index)

        # Row-wise count and share of missing values over all selected columns.
        m_any_day = flag_df.sum(axis="columns").astype(config.run_dtype)
        m_rate_day = (m_any_day / len(feature_columns)).astype(np.float32)
        group_features: Dict[str, pd.Series] = {
            "m_any_day": m_any_day,
            "m_rate_day": m_rate_day,
            "m_cnt/ALL": m_any_day,
            "m_rate/ALL": m_rate_day,
        }
        for group, columns in group_columns.items():
            if not columns:
                continue
            flag_cols = [f"m/{col}" for col in columns]
            group_count = flag_df[flag_cols].sum(axis="columns").astype(config.run_dtype)
            group_rate = (group_count / len(columns)).astype(np.float32)
            group_features[f"m_cnt/{group}"] = group_count
            group_features[f"m_rate/{group}"] = group_rate
            if config.include_avg_gap:
                gap_cols = [f"gap_ffill/{col}" for col in columns]
                group_features[f"avg_gapff/{group}"] = self._masked_row_mean(
                    gap_df, columns, gap_cols, all_nan_lookup
                )
            if config.include_avg_run:
                run_cols = [f"run_na/{col}" for col in columns]
                group_features[f"avg_run_na/{group}"] = self._masked_row_mean(
                    run_na_df, columns, run_cols, all_nan_lookup
                )
        aggregated_df = pd.DataFrame(group_features, index=data.index)
        output_frames = [flag_df, gap_df, run_na_df, run_obs_df, aggregated_df]
        return pd.concat(output_frames, axis=1)

# === Module: src.feature_generation.su5.feature_su5 ===

def _infer_group_su5(column_name: str) -> str | None:
    """Infer the feature group from the upper-case prefix of a column name.

    The name is scanned from the left: upper-case letters extend the prefix,
    the first digit ends the scan, and any other character makes the name
    invalid. Returns the prefix, or ``None`` when the name is invalid or has
    no upper-case prefix.
    """
    prefix_len = 0
    for ch in column_name:
        if ch.isalpha() and ch.isupper():
            prefix_len += 1
        elif ch.isdigit():
            break
        else:
            return None
    # The accepted characters are exactly the leading ``prefix_len`` ones.
    return column_name[:prefix_len] if prefix_len else None


@dataclass(frozen=True)
class SU5Config:
    """Immutable settings for SU5 (co-missingness structure) feature generation."""
    # Name of the identifier column.
    # NOTE(review): not read by the SU5 generator code in this section - confirm where it is used.
    id_column: str
    # NOTE(review): presumably a prefix for generated column names, but the generator
    # in this section never reads it - confirm.
    output_prefix: str
    # Number of highest-scoring column pairs kept when no per-group limit is set.
    top_k_pairs: int
    # When not None, pairs are instead selected within each group, up to this many per group.
    top_k_pairs_per_group: int | None
    # Window lengths (in rows) used for the rolling co-missing rate features.
    windows: Tuple[int, ...]
    # When true and fold indices are supplied to transform(), rolling windows restart per fold.
    reset_each_fold: bool
    # dtype of the ``co_miss_now/...`` flag features.
    dtype_flag: np.dtype
    # dtype of the ``co_miss_deg/...`` count features.
    dtype_int: np.dtype
    # dtype of the ``co_miss_rollrate_<window>/...`` features.
    dtype_float: np.dtype


class SU5FeatureGenerator(BaseEstimator, TransformerMixin):
    """Generate SU5 co-missingness features from SU1 ``m/<column>`` flags.

    ``fit`` scores every pair of ``m/`` columns by the Jaccard overlap of
    their missing flags and keeps the best pairs. ``transform`` then emits:

    * ``co_miss_now/<a>__<b>``: both columns missing on the row,
    * ``co_miss_rollrate_<w>/<a>__<b>``: trailing mean of the above over
      ``w`` rows (NaN until a full window is available),
    * ``co_miss_deg/<col>``: number of selected pairs the column is part of.
    """

    def __init__(self, config: SU5Config):
        self.config = config
        # Populated by fit(); None means "not fitted yet".
        self.m_columns_: list[str] | None = None
        self.groups_: dict[str, list[str]] | None = None
        self.top_pairs_: list[tuple[str, str]] | None = None
        self.feature_names_: list[str] | None = None

    def fit(self, X: pd.DataFrame, y: Any = None):
        """Select the column pairs to track from the ``m/`` flags in ``X``.

        ``y`` is accepted for API compatibility and is not used.

        Raises:
            TypeError: If ``X`` is not a DataFrame.
            ValueError: If ``X`` has no column starting with ``m/``.
        """
        if not isinstance(X, pd.DataFrame):
            raise TypeError("SU5FeatureGenerator expects a pandas.DataFrame input")
        self.m_columns_ = sorted(c for c in X.columns if c.startswith("m/"))
        if not self.m_columns_:
            raise ValueError("No 'm/' columns found in input. SU5 requires SU1 features as input.")
        self.groups_ = self._extract_groups()
        self.top_pairs_ = self._select_top_k_pairs(X)
        self.feature_names_ = self._build_feature_names()
        return self

    def transform(self, X: pd.DataFrame, fold_indices: np.ndarray | None = None):
        """Compute the SU5 features for ``X``.

        Args:
            X: Frame containing the ``m/`` flag columns seen during ``fit``.
            fold_indices: Optional per-row fold ids. They only take effect
                when ``config.reset_each_fold`` is true, in which case the
                rolling windows restart at each fold.

        Returns:
            A frame of SU5 features indexed like ``X``.

        Raises:
            RuntimeError: If called before ``fit``.
        """
        if self.m_columns_ is None or self.top_pairs_ is None:
            raise RuntimeError("The transformer must be fitted before calling transform().")
        n = len(X)
        features: dict[str, np.ndarray] = {}
        fold_boundaries = self._compute_fold_boundaries(n, fold_indices)
        features.update(self._compute_co_miss_now(X, fold_boundaries))
        # The rolling rates are derived from the co_miss_now arrays above.
        features.update(self._compute_co_miss_rollrate(features, fold_boundaries))
        features.update(self._compute_co_miss_degree(X))
        return pd.DataFrame(features, index=X.index)

    def _extract_groups(self):
        """Group the base column names (``m/`` prefix removed) by inferred group."""
        groups: dict[str, list[str]] = {}
        if self.m_columns_ is None:
            return groups
        for col in self.m_columns_:
            base_col = col[2:]
            group = _infer_group_su5(base_col)
            if group:
                groups.setdefault(group, []).append(base_col)
        return groups

    def _select_top_k_pairs(self, X: pd.DataFrame):
        """Rank all column pairs by co-missing overlap and keep the best ones.

        The score of a pair is ``|both missing| / |either missing|`` (0.0
        when neither column is ever missing). Ties are broken by the
        descending order of the base column names because the
        ``(score, a, b)`` tuples are sorted in reverse.
        """
        if self.m_columns_ is None:
            return []
        # Build each column's boolean "is missing" mask once instead of
        # re-deriving both masks for every pair inside the quadratic loop.
        missing = {col: np.asarray(X[col].values) == 1 for col in self.m_columns_}
        pair_scores: list[tuple[float, str, str]] = []
        for i, col_a in enumerate(self.m_columns_):
            miss_a = missing[col_a]
            for col_b in self.m_columns_[i + 1 :]:
                miss_b = missing[col_b]
                both_na = int(np.sum(miss_a & miss_b))
                either_na = int(np.sum(miss_a | miss_b))
                if either_na > 0:
                    score = float(both_na) / float(either_na)
                else:
                    score = 0.0
                pair_scores.append((score, col_a[2:], col_b[2:]))
        pair_scores.sort(reverse=True)
        if self.config.top_k_pairs_per_group is not None:
            return self._select_top_k_per_group(pair_scores)
        return [(a, b) for _, a, b in pair_scores[: self.config.top_k_pairs]]

    def _select_top_k_per_group(self, pair_scores):
        """Keep up to ``top_k_pairs_per_group`` same-group pairs per group.

        ``pair_scores`` must already be sorted best-first. Pairs whose
        columns belong to different groups are ignored.
        """
        if self.groups_ is None or self.config.top_k_pairs_per_group is None:
            return []
        limit = self.config.top_k_pairs_per_group
        selected: list[tuple[str, str]] = []
        group_counts: dict[str, int] = {grp: 0 for grp in self.groups_.keys()}
        for _score, col_a, col_b in pair_scores:
            group_a = _infer_group_su5(col_a)
            group_b = _infer_group_su5(col_b)
            if group_a == group_b and group_a is not None:
                if group_counts[group_a] < limit:
                    selected.append((col_a, col_b))
                    group_counts[group_a] += 1
        return selected

    def _build_feature_names(self):
        """List the output column names in the order ``transform`` emits them."""
        names: list[str] = []
        if self.top_pairs_ is None:
            return names
        for col_a, col_b in self.top_pairs_:
            names.append(f"co_miss_now/{col_a}__{col_b}")
        for window in self.config.windows:
            for col_a, col_b in self.top_pairs_:
                names.append(f"co_miss_rollrate_{window}/{col_a}__{col_b}")
        if self.m_columns_ is not None:
            for col in self.m_columns_:
                base_col = col[2:]
                names.append(f"co_miss_deg/{base_col}")
        return names

    def _compute_fold_boundaries(self, n_rows: int, fold_indices: np.ndarray | None):
        """Return ``(start, end)`` row spans within which rolling windows may look back.

        Without fold ids, or when ``reset_each_fold`` is off, the whole frame
        is a single span. Otherwise each fold id contributes the span from
        its first to its last row, so the rows of a fold are expected to be
        contiguous; a scattered fold would yield a span that also covers
        rows of other folds.
        """
        if fold_indices is None or not self.config.reset_each_fold:
            return [(0, n_rows)]
        boundaries = []
        unique_folds = np.unique(fold_indices)
        for fold_id in unique_folds:
            fold_mask = fold_indices == fold_id
            indices = np.where(fold_mask)[0]
            if len(indices) > 0:
                boundaries.append((int(indices[0]), int(indices[-1]) + 1))
        return boundaries if boundaries else [(0, n_rows)]

    def _compute_co_miss_now(self, X: pd.DataFrame, fold_boundaries):
        """Flag the rows where both columns of each selected pair are missing.

        ``fold_boundaries`` is accepted for signature symmetry and is not used.
        """
        features: dict[str, np.ndarray] = {}
        if self.top_pairs_ is None:
            return features
        for col_a, col_b in self.top_pairs_:
            m_a = np.asarray(X[f"m/{col_a}"].values)
            m_b = np.asarray(X[f"m/{col_b}"].values)
            co_miss = ((m_a == 1) & (m_b == 1)).astype(self.config.dtype_flag)
            features[f"co_miss_now/{col_a}__{col_b}"] = co_miss
        return features

    def _compute_co_miss_rollrate(self, features: dict[str, np.ndarray], fold_boundaries):
        """Trailing mean of each ``co_miss_now`` flag for every configured window.

        Rows that do not yet have a full window inside their fold span stay
        NaN, as do rows outside every span.
        """
        rollrate_features: dict[str, np.ndarray] = {}
        if self.top_pairs_ is None:
            return rollrate_features
        for window in self.config.windows:
            for col_a, col_b in self.top_pairs_:
                co_miss_now = features[f"co_miss_now/{col_a}__{col_b}"]
                n = len(co_miss_now)
                rollrate = np.full(n, np.nan, dtype=self.config.dtype_float)
                for start_idx, end_idx in fold_boundaries:
                    for i in range(start_idx, end_idx):
                        # Never look back past the start of the current span.
                        window_start = max(start_idx, i - window + 1)
                        window_end = i + 1
                        if window_end - window_start >= window:
                            window_values = co_miss_now[window_start:window_end]
                            rollrate[i] = np.mean(window_values)
                rollrate_features[f"co_miss_rollrate_{window}/{col_a}__{col_b}"] = rollrate
        return rollrate_features

    def _compute_co_miss_degree(self, X: pd.DataFrame):
        """Emit, per ``m/`` column, a constant column with its selected-pair count.

        The value depends only on the pairs chosen in ``fit``; ``X`` is used
        for its length alone.
        """
        degree_features: dict[str, np.ndarray] = {}
        if self.m_columns_ is None or self.top_pairs_ is None:
            return degree_features
        degree_counts: dict[str, int] = {col[2:]: 0 for col in self.m_columns_}
        for col_a, col_b in self.top_pairs_:
            degree_counts[col_a] += 1
            degree_counts[col_b] += 1
        n = len(X)
        for col in self.m_columns_:
            base_col = col[2:]
            degree_features[f"co_miss_deg/{base_col}"] = np.full(
                n, degree_counts[base_col], dtype=self.config.dtype_int
            )
        return degree_features


class SU5FeatureAugmenter(BaseEstimator, TransformerMixin):
    """Transformer that appends SU1 and SU5 features to the input frame.

    SU1 features are generated from the raw frame, SU5 features are derived
    from those SU1 features, and ``transform`` returns the original columns
    followed by the SU1 and then the SU5 columns, in the order recorded
    during ``fit``.
    """

    def __init__(self, su1_config: SU1Config, su5_config: SU5Config, fill_value: float | None = 0.0) -> None:
        self.su1_config = su1_config
        self.su5_config = su5_config
        # Replacement for NaN in generated features; None disables filling.
        self.fill_value = fill_value

    def fit(self, X: pd.DataFrame, y: Any = None):
        """Fit the SU1 and SU5 generators and record their output columns.

        ``y`` is accepted for API compatibility and is not used.

        Raises:
            TypeError: If ``X`` is not a DataFrame.
        """
        frame = self._ensure_dataframe(X)
        # SU1 is fitted on the raw frame.
        self.su1_generator_ = SU1FeatureGenerator(self.su1_config)
        self.su1_generator_.fit(frame)
        su1_features = self.su1_generator_.transform(frame)
        # SU5 is fitted on the SU1 output.
        self.su5_generator_ = SU5FeatureGenerator(self.su5_config)
        self.su5_generator_.fit(su1_features)
        su5_features = self.su5_generator_.transform(su1_features)
        # Remember column names so transform() can reproduce the layout.
        self.su1_feature_names_ = list(su1_features.columns)
        self.su5_feature_names_ = list(su5_features.columns)
        self.input_columns_ = list(frame.columns)
        return self

    def transform(self, X: pd.DataFrame, fold_indices: np.ndarray | None = None):
        """Return ``X`` augmented with SU1 and SU5 feature columns.

        Args:
            X: Raw input frame.
            fold_indices: Optional per-row fold ids forwarded to the SU5
                generator.

        Raises:
            RuntimeError: If called before ``fit``.
            TypeError: If ``X`` is not a DataFrame.
        """
        if not hasattr(self, "su1_generator_"):
            raise RuntimeError("SU5FeatureAugmenter must be fitted before transform().")
        frame = self._ensure_dataframe(X)
        # SU1 features, aligned to the column order seen during fit.
        su1_features = self.su1_generator_.transform(frame)
        su1_features = su1_features.reindex(columns=self.su1_feature_names_, copy=True)
        if self.fill_value is not None:
            su1_features = su1_features.fillna(self.fill_value)
        # SU5 features are computed from the (filled) SU1 features.
        su5_features = self.su5_generator_.transform(su1_features, fold_indices=fold_indices)
        su5_features = su5_features.reindex(columns=self.su5_feature_names_, copy=True)
        if self.fill_value is not None:
            su5_features = su5_features.fillna(self.fill_value)
        # Final layout: original columns + SU1 + SU5.
        augmented = pd.concat([frame, su1_features, su5_features], axis=1)
        augmented.index = frame.index
        return augmented

    @staticmethod
    def _ensure_dataframe(X: pd.DataFrame) -> pd.DataFrame:
        """Validate that ``X`` is a DataFrame and return a copy of it."""
        if not isinstance(X, pd.DataFrame):
            raise TypeError("SU5FeatureAugmenter expects a pandas.DataFrame input")
        return X.copy()

def generate_su1_features(
    config_path: str | Path,
    *,
    dataset: Literal["train", "test"] = "train",
) -> pd.DataFrame:
    """Produce SU1 features for one raw dataset in a single call.

    Loads the SU1 configuration from ``config_path``, reads the requested
    raw split, then fits an :class:`SU1FeatureGenerator` on that split and
    returns its transformed output.
    """
    su1_config = load_su1_config(config_path)
    frame = load_raw_dataset(su1_config, dataset=dataset)
    # fit() returns the generator itself, so the calls can be chained.
    return SU1FeatureGenerator(su1_config).fit(frame).transform(frame)
# Public names of the SU1 feature-generation section. The SU5 classes defined
# above are not listed here.
__all__ = [
    "SU1Config",
    "SU1FeatureGenerator",
    "generate_su1_features",
    "load_raw_dataset",
    "load_su1_config",
]

# === Module: src.feature_generation.su1.train_su1 (subset) ===

class SU1FeatureAugmenter(BaseEstimator, TransformerMixin):
    """Transformer that appends SU1 features to the input frame before downstream preprocessing.

    It wraps :class:`SU1FeatureGenerator` without altering its behaviour and
    adds column-order control: ``fit`` records the raw input columns and the
    order of the generated columns so that ``transform`` reproduces the same
    layout deterministically.
    """

    def __init__(self, config: SU1Config, fill_value: float | None = 0.0) -> None:
        self.config = config
        self.fill_value = fill_value

    def fit(self, X: pd.DataFrame, y: Any = None) -> "SU1FeatureAugmenter":
        """Fit the wrapped generator and remember the resulting column layout."""
        source = self._ensure_dataframe(X)
        fitted = SU1FeatureGenerator(self.config)
        fitted.fit(source)
        generated = fitted.transform(source)
        if self.fill_value is not None:
            generated = generated.fillna(self.fill_value)
        # State reused by transform().
        self.generator_ = fitted
        self.su1_feature_names_ = list(generated.columns)
        self.input_columns_ = list(source.columns)
        return self

    def transform(self, X: pd.DataFrame) -> pd.DataFrame:
        """Return ``X`` with the SU1 feature columns appended on the right."""
        if not hasattr(self, "generator_"):
            raise RuntimeError("SU1FeatureAugmenter must be fitted before transform().")
        source = self._ensure_dataframe(X)
        # Align to the fit-time order so the layout is stable across calls.
        generated = self.generator_.transform(source).reindex(
            columns=self.su1_feature_names_, copy=True
        )
        if self.fill_value is not None:
            generated = generated.fillna(self.fill_value)
        combined = pd.concat([source, generated], axis=1)
        combined.index = source.index
        return combined

    @staticmethod
    def _ensure_dataframe(X: pd.DataFrame) -> pd.DataFrame:
        """Validate that ``X`` is a DataFrame and return a copy of it."""
        if isinstance(X, pd.DataFrame):
            return X.copy()
        raise TypeError("SU1FeatureAugmenter expects a pandas.DataFrame input")  # pragma: no cover - defensive branch

def _ensure_package(name: str) -> types.ModuleType:
    """Return the module registered as *name*, creating an empty package if needed.

    A newly created module gets an empty ``__path__`` and is stored in
    ``sys.modules``. Parent packages of a dotted name are ensured recursively,
    and the new module is attached to its parent as an attribute.

    Args:
        name: Dotted module name, e.g. ``"pkg.sub"``.

    Returns:
        The existing ``sys.modules`` entry, or the newly created module.
    """
    try:
        return sys.modules[name]
    except KeyError:
        pass
    package = types.ModuleType(name)
    package.__path__ = []
    # Register before touching the parent, so the entry exists while recursing.
    sys.modules[name] = package
    parent_name, _sep, leaf_name = name.rpartition(".")
    if parent_name:
        setattr(_ensure_package(parent_name), leaf_name, package)
    return package
# Register in-memory stand-ins for the project's modules under their dotted names in
# sys.modules, each exposing the classes defined earlier in this file.
# NOTE(review): presumably this lets joblib.load resolve the classes referenced by the
# pickled bundle without the original source tree — confirm against the training code.

# --- preprocess.<X>_group.<x>_group: one imputer class per group ---
preprocess_pkg = _ensure_package("preprocess")
M_pkg = _ensure_package("preprocess.M_group")
E_pkg = _ensure_package("preprocess.E_group")
I_pkg = _ensure_package("preprocess.I_group")
P_pkg = _ensure_package("preprocess.P_group")
S_pkg = _ensure_package("preprocess.S_group")
m_module = types.ModuleType("preprocess.M_group.m_group")
m_module.MGroupImputer = MGroupImputer
sys.modules["preprocess.M_group.m_group"] = m_module
M_pkg.m_group = m_module
e_module = types.ModuleType("preprocess.E_group.e_group")
e_module.EGroupImputer = EGroupImputer
sys.modules["preprocess.E_group.e_group"] = e_module
E_pkg.e_group = e_module
i_module = types.ModuleType("preprocess.I_group.i_group")
i_module.IGroupImputer = IGroupImputer
sys.modules["preprocess.I_group.i_group"] = i_module
I_pkg.i_group = i_module
p_module = types.ModuleType("preprocess.P_group.p_group")
p_module.PGroupImputer = PGroupImputer
sys.modules["preprocess.P_group.p_group"] = p_module
P_pkg.p_group = p_module
s_module = types.ModuleType("preprocess.S_group.s_group")
s_module.SGroupImputer = SGroupImputer
sys.modules["preprocess.S_group.s_group"] = s_module
S_pkg.s_group = s_module
preprocess_pkg.M_group = M_pkg
preprocess_pkg.E_group = E_pkg
preprocess_pkg.I_group = I_pkg
preprocess_pkg.P_group = P_pkg
preprocess_pkg.S_group = S_pkg
# --- htmpre.<x>_group: the same imputer classes under a second package name ---
htmpre_pkg = _ensure_package("htmpre")
htmpre_m = types.ModuleType("htmpre.m_group")
htmpre_m.MGroupImputer = MGroupImputer
sys.modules["htmpre.m_group"] = htmpre_m
htmpre_pkg.m_group = htmpre_m
htmpre_e = types.ModuleType("htmpre.e_group")
htmpre_e.EGroupImputer = EGroupImputer
sys.modules["htmpre.e_group"] = htmpre_e
htmpre_pkg.e_group = htmpre_e
htmpre_i = types.ModuleType("htmpre.i_group")
htmpre_i.IGroupImputer = IGroupImputer
sys.modules["htmpre.i_group"] = htmpre_i
htmpre_pkg.i_group = htmpre_i
htmpre_p = types.ModuleType("htmpre.p_group")
htmpre_p.PGroupImputer = PGroupImputer
sys.modules["htmpre.p_group"] = htmpre_p
htmpre_pkg.p_group = htmpre_p
htmpre_s = types.ModuleType("htmpre.s_group")
htmpre_s.SGroupImputer = SGroupImputer
sys.modules["htmpre.s_group"] = htmpre_s
htmpre_pkg.s_group = htmpre_s
# --- src.feature_generation.su1: feature_su1 and train_su1 modules ---
src_pkg = _ensure_package("src")
feature_pkg = _ensure_package("src.feature_generation")
su1_pkg = _ensure_package("src.feature_generation.su1")
su1_feature_module = types.ModuleType("src.feature_generation.su1.feature_su1")
su1_feature_module.SU1Config = SU1Config
su1_feature_module.SU1FeatureGenerator = SU1FeatureGenerator
su1_feature_module.load_su1_config = load_su1_config
su1_feature_module.load_raw_dataset = load_raw_dataset
sys.modules["src.feature_generation.su1.feature_su1"] = su1_feature_module
su1_pkg.feature_su1 = su1_feature_module
su1_train_module = types.ModuleType("src.feature_generation.su1.train_su1")
su1_train_module.SU1FeatureAugmenter = SU1FeatureAugmenter
sys.modules["src.feature_generation.su1.train_su1"] = su1_train_module

# Register SU5 module
su5_pkg = _ensure_package("src.feature_generation.su5")
su5_feature_module = types.ModuleType("src.feature_generation.su5.feature_su5")
su5_feature_module.SU5Config = SU5Config
su5_feature_module.SU5FeatureGenerator = SU5FeatureGenerator
su5_feature_module.SU5FeatureAugmenter = SU5FeatureAugmenter
su5_feature_module._infer_group_su5 = _infer_group_su5
sys.modules["src.feature_generation.su5.feature_su5"] = su5_feature_module
# The next two assignments repeat what _ensure_package already did for the su5 package.
sys.modules["src.feature_generation.su5"] = su5_pkg
su5_pkg.feature_su5 = su5_feature_module
feature_pkg.su5 = su5_pkg

# Register SU5 train_su5 module (for evaluate_baseline.py bundles)
su5_train_module = types.ModuleType("src.feature_generation.su5.train_su5")
su5_train_module.SU5FeatureAugmenter = SU5FeatureAugmenter
su5_train_module.SU1FeatureGenerator = SU1FeatureGenerator
su5_train_module.SU5FeatureGenerator = SU5FeatureGenerator
su5_train_module.SU1Config = SU1Config
su5_train_module.SU5Config = SU5Config
sys.modules["src.feature_generation.su5.train_su5"] = su5_train_module
su5_pkg.train_su5 = su5_train_module

# Attach the train_su1 module (created above) to its parent package.
su1_pkg.train_su1 = su1_train_module
# Locations of the three artifacts inside the discovered artifact dataset.
BUNDLE_PATH = ARTIFACT_ROOT / "inference_bundle.pkl"
META_PATH = ARTIFACT_ROOT / "model_meta.json"
FEATURE_LIST_PATH = ARTIFACT_ROOT / "feature_list.json"
# Fail early with a clear message if any artifact is missing.
for required_path in (BUNDLE_PATH, META_PATH, FEATURE_LIST_PATH):
    if not required_path.exists():
        raise FileNotFoundError(f"Required artifact not found: {required_path}")
# Parsed model metadata; read below for position config, column names and compatibility checks.
with META_PATH.open("r", encoding="utf-8") as fp:
    META = json.load(fp)


@dataclass(frozen=True)
class TwoHeadPositionConfig:
    """Immutable parameters for the two-head position mapping.

    The fields are passed straight through to ``map_positions_from_forward_rf``.
    """
    # Target value ``x`` in the numerator ``(x - rf_pred)`` of the mapping formula.
    x: float
    # Lower bound applied to the mapped position.
    clip_min: float = 0.0
    # Upper bound applied to the mapped position.
    clip_max: float = 2.0
    # Smallest allowed magnitude of the denominator ``forward_pred - rf_pred``.
    epsilon: float = 1e-8


def map_positions_from_forward_rf(
    forward_pred: np.ndarray,
    rf_pred: np.ndarray,
    x: float,
    clip_min: float = 0.0,
    clip_max: float = 2.0,
    epsilon: float = 1e-8,
) -> np.ndarray:
    """Convert the two head predictions into clipped positions.

    Computes ``clip((x - rf_pred) / (forward_pred - rf_pred), clip_min, clip_max)``
    element-wise. A denominator whose magnitude is below ``epsilon`` is replaced
    by ``epsilon`` with the denominator's sign; an exactly zero denominator
    becomes ``+epsilon``.

    Args:
        forward_pred: Predictions of the forward head (array-like, cast to float).
        rf_pred: Predictions of the rf head (array-like, cast to float).
        x: Target value used in the numerator.
        clip_min: Lower bound of the returned positions.
        clip_max: Upper bound of the returned positions.
        epsilon: Minimum magnitude enforced on the denominator.

    Returns:
        Array of positions limited to ``[clip_min, clip_max]``.
    """
    forward_values = np.asarray(forward_pred, dtype=float)
    rf_values = np.asarray(rf_pred, dtype=float)

    spread = forward_values - rf_values
    too_small = np.abs(spread) < epsilon
    # Keep the sign of tiny spreads while lifting their magnitude to epsilon.
    guarded = np.where(too_small, np.sign(spread) * epsilon, spread)
    # sign(0) is 0, so exact zeros survive the step above; make them +epsilon.
    guarded = np.where(guarded == 0, epsilon, guarded)

    unclipped = (x - rf_values) / guarded
    return np.clip(unclipped, clip_min, clip_max)


def _resolve_position_config(meta: Mapping[str, Any]) -> TwoHeadPositionConfig:
    """Build the position config from model metadata.

    Only ``best_x`` is read from *meta* (0.0 when the key is absent); the clip
    bounds and epsilon are fixed to 0.0, 2.0 and 1e-8.
    """
    best_x = meta.get("best_x", 0.0)
    return TwoHeadPositionConfig(
        x=float(best_x),
        clip_min=0.0,
        clip_max=2.0,
        epsilon=1e-8,
    )


def _hash_file(path: Path) -> str:
    """Return the hexadecimal SHA-256 digest of the file at *path*.

    The file is read in 8192-byte pieces, so it is never held in memory whole.
    """
    digest = hashlib.sha256()
    with path.open("rb") as stream:
        while True:
            piece = stream.read(8192)
            if not piece:
                break
            digest.update(piece)
    return digest.hexdigest()


def _check_bundle_compat(meta: Mapping[str, Any]) -> None:
    """Print warnings where the runtime differs from what the metadata recorded.

    Three best-effort checks are made; a mismatch is only printed, never raised:

    * ``library_versions``: each named library is imported and its
      ``__version__`` is compared, as a string, with the recorded version.
    * ``config_hash`` / ``config_digest``: compared with the SHA-256 of the
      file named by ``config_path``.
    * ``preprocess_config_hash``: compared with the SHA-256 of the file named
      by ``preprocess_config_path``.

    Args:
        meta: Parsed model metadata.
    """
    recorded_versions = meta.get("library_versions") if isinstance(meta, Mapping) else None
    version_pairs = recorded_versions.items() if isinstance(recorded_versions, Mapping) else ()
    for lib_name, expected_version in version_pairs:
        if not isinstance(lib_name, str):
            continue
        try:
            actual_version = getattr(importlib.import_module(lib_name), "__version__", None)
        except Exception:
            # Best-effort: a library that cannot be imported is reported as unverifiable.
            actual_version = None
        if expected_version is None or actual_version is None:
            print(
                f"[warn] unable to verify version for {lib_name}: expected={expected_version} actual={actual_version}"
            )
        elif str(actual_version) != str(expected_version):
            print(
                f"[warn] library version mismatch for {lib_name}: expected {expected_version}, running {actual_version}"
            )

    for hash_key in ("config_hash", "config_digest"):
        expected_hash = meta.get(hash_key)
        if not isinstance(expected_hash, str):
            continue
        config_path_str = meta.get("config_path")
        if not isinstance(config_path_str, str):
            continue
        config_path = Path(config_path_str)
        if config_path.exists():
            actual_hash = _hash_file(config_path)
            if actual_hash != expected_hash:
                print(
                    f"[warn] config hash mismatch: expected {expected_hash}, current {actual_hash} (path={config_path})"
                )
        else:
            print(f"[warn] unable to verify config hash; file not found: {config_path}")

    preprocess_hash = meta.get("preprocess_config_hash")
    preprocess_path_str = meta.get("preprocess_config_path")
    if not (isinstance(preprocess_hash, str) and isinstance(preprocess_path_str, str)):
        return
    preprocess_path = Path(preprocess_path_str)
    if not preprocess_path.exists():
        print(
            f"[warn] unable to verify preprocess config hash; file not found: {preprocess_path}"
        )
        return
    actual_pp_hash = _hash_file(preprocess_path)
    if actual_pp_hash != preprocess_hash:
        print(
            f"[warn] preprocess config hash mismatch: expected {preprocess_hash}, current {actual_pp_hash}"
        )


# Position-mapping parameters derived from model_meta.json; the bundle may override them below.
POSITION_CONFIG = _resolve_position_config(META)
print("Two-head position config:", POSITION_CONFIG)
print("Hull Sharpe (CV):", META.get("hull_sharpe"))
print("Best x:", META.get("best_x"))

# feature_list.json supplies the ordered list of columns fed into the pipeline.
with FEATURE_LIST_PATH.open("r", encoding="utf-8") as fp:
    FEATURE_MANIFEST: Dict[str, Any] = json.load(fp)
PIPELINE_INPUT_COLUMNS = list(FEATURE_MANIFEST.get("pipeline_input_columns") or [])
if not PIPELINE_INPUT_COLUMNS:
    raise ValueError("pipeline_input_columns missing in feature_list.json")
SU1_GENERATED_COLUMNS = FEATURE_MANIFEST.get("su1_generated_columns", [])
# Column names come from the metadata, falling back to the defaults given here.
ID_COL = str(META.get("id_col", "date_id"))
TARGET_COL = str(META.get("target_col", "market_forward_excess_returns"))
# Columns removed before feature alignment; the ID column is never part of this set.
DROP_NON_FEATURES = {col for col in ("row_id", TARGET_COL, "is_scored") if col and col != ID_COL}

def _ensure_columns(frame: pd.DataFrame, columns: Sequence[str]) -> pd.DataFrame:
    """Return *frame* restricted to *columns*, in that order, with absent ones as NaN.

    When columns are missing, an ``[info]`` line is printed with the count and a
    preview of up to five names. The caller's frame is left untouched: the
    missing columns are built as one float64 NaN block and concatenated once,
    rather than inserted one at a time into the input frame.

    Args:
        frame: Source data.
        columns: Desired column labels, in output order.

    Returns:
        A new DataFrame whose columns are exactly ``list(columns)``.
    """
    missing = [col for col in columns if col not in frame.columns]
    if missing:
        preview = ", ".join(str(col) for col in missing[:5])
        print(f"[info] adding {len(missing)} missing columns (preview: {preview})")
        # De-duplicate while keeping order so each absent label is created once.
        filler = pd.DataFrame(
            np.nan, index=frame.index, columns=list(dict.fromkeys(missing))
        )
        # Rebinding the local name keeps the caller's frame unmodified.
        frame = pd.concat([frame, filler], axis=1)
    return frame.reindex(columns=list(columns))

def _coerce_numeric_like_columns(frame: pd.DataFrame) -> None:
    """Convert object-dtype columns of *frame* to numeric, in place, where sensible.

    A column is replaced by its ``pd.to_numeric(..., errors="coerce")`` result
    when at least one value parses as a number, or when the column holds no
    non-null values at all. Other object columns are left as they are.
    """
    for name in frame.select_dtypes(include="object").columns:
        current = frame[name]
        numeric = pd.to_numeric(current, errors="coerce")
        has_parsed_value = numeric.notna().any()
        if has_parsed_value or current.notna().sum() == 0:
            frame[name] = numeric

def _extract_required_calendar_columns(meta: Mapping[str, Any]) -> set[str]:
    """Collect the calendar column names listed under ``imputer_metadata``.

    Each mapping-valued entry of ``meta["imputer_metadata"]`` may carry a
    ``calendar_column`` string; non-blank values are stripped and gathered.

    Returns:
        The set of stripped names; empty when the metadata has none.
    """
    imputer_meta = meta.get("imputer_metadata")
    if not isinstance(imputer_meta, Mapping):
        return set()
    declared = (
        entry.get("calendar_column")
        for entry in imputer_meta.values()
        if isinstance(entry, Mapping)
    )
    return {
        value.strip()
        for value in declared
        if isinstance(value, str) and value.strip()
    }
# Calendar columns the test data must provide, taken from the imputer metadata.
REQUIRED_CALENDAR_COLUMNS = _extract_required_calendar_columns(META)
# Best-effort environment check; it only prints warnings.
_check_bundle_compat(META)
# NOTE(review): joblib.load unpickles arbitrary objects — only load bundles from a trusted dataset.
BUNDLE = joblib.load(BUNDLE_PATH)
# Two-head bundle format: dict with forward_model, rf_model, augmenter, excluded_features, position_config
if not isinstance(BUNDLE, dict):
    raise ValueError("Two-head bundle must be a dict with forward_model and rf_model")

FORWARD_MODEL = BUNDLE.get("forward_model")
RF_MODEL = BUNDLE.get("rf_model")
# The augmenter is optional; run_pipeline skips augmentation when it is None.
AUGMENTER = BUNDLE.get("augmenter")
EXCLUDED_FEATURES: list[str] = BUNDLE.get("excluded_features", [])
# NOTE(review): read with .get() below, so this is assumed to be a dict-like — confirm against the training code.
BUNDLE_POSITION_CONFIG = BUNDLE.get("position_config", {})

if FORWARD_MODEL is None or RF_MODEL is None:
    raise ValueError("Two-head bundle must contain both forward_model and rf_model")

# Update position config from bundle if available
# Any field missing from the bundle keeps the metadata-derived value.
if BUNDLE_POSITION_CONFIG:
    POSITION_CONFIG = TwoHeadPositionConfig(
        x=float(BUNDLE_POSITION_CONFIG.get("x", POSITION_CONFIG.x)),
        clip_min=float(BUNDLE_POSITION_CONFIG.get("clip_min", POSITION_CONFIG.clip_min)),
        clip_max=float(BUNDLE_POSITION_CONFIG.get("clip_max", POSITION_CONFIG.clip_max)),
        epsilon=float(BUNDLE_POSITION_CONFIG.get("epsilon", POSITION_CONFIG.epsilon)),
    )
    print("Updated position config from bundle:", POSITION_CONFIG)

# Summary of what was loaded, for the notebook log.
print("Loaded two-head bundle:", BUNDLE_PATH)
print(f"Forward model: {type(FORWARD_MODEL).__name__}")
print(f"RF model: {type(RF_MODEL).__name__}")
print(f"Augmenter: {type(AUGMENTER).__name__ if AUGMENTER else 'None'}")
print(f"Excluded features: {len(EXCLUDED_FEATURES)}")
print("Pipeline input columns:", len(PIPELINE_INPUT_COLUMNS))
print("Total feature count:", FEATURE_MANIFEST.get("total_feature_count", len(PIPELINE_INPUT_COLUMNS)))
# Competition data directory; predict_bulk reads the test file from here.
DATA_DIR = Path("/kaggle/input/hull-tactical-market-prediction")
if not DATA_DIR.exists():
    raise FileNotFoundError(
        "Competition dataset not mounted at /kaggle/input/hull-tactical-market-prediction"
    )

def align_features(df: pd.DataFrame, *, sort_by_id: bool) -> tuple[pd.DataFrame, pd.DataFrame]:
    """Prepare *df* for the pipeline and return ``(ordered_frame, feature_frame)``.

    The non-feature columns in ``DROP_NON_FEATURES`` are removed and an
    ``__original_order__`` column is added so callers can undo the sorting.
    The rows are sorted by ``ID_COL`` only when ``sort_by_id`` is true and
    that column is present.

    Args:
        df: Raw input rows.
        sort_by_id: Whether to sort the rows by ``ID_COL``.

    Returns:
        ``ordered_frame`` still carries ``__original_order__``. ``feature_frame``
        has exactly ``PIPELINE_INPUT_COLUMNS``, with numeric-looking object
        columns coerced to numbers.
    """
    working = df.reset_index(drop=True).copy()
    removable = [name for name in DROP_NON_FEATURES if name in working.columns]
    if removable:
        working = working.drop(columns=removable)
    working["__original_order__"] = np.arange(len(working))

    should_sort = sort_by_id and ID_COL in working.columns
    ordered = working.sort_values(ID_COL).reset_index(drop=True) if should_sort else working

    features = _ensure_columns(
        ordered.drop(columns=["__original_order__"]),
        PIPELINE_INPUT_COLUMNS,
    )
    _coerce_numeric_like_columns(features)
    return ordered, features

def run_pipeline(X: pd.DataFrame) -> np.ndarray:
    """Run the two-head pipeline on *X* and return one position per row.

    Steps:
        1. If the bundle has an augmenter, transform ``X`` with it and drop any
           resulting column listed in ``EXCLUDED_FEATURES``.
        2. Predict with the forward model and the rf model on the same frame.
        3. Map both predictions to positions using ``POSITION_CONFIG``.
    """
    model_input = X
    if AUGMENTER is not None:
        model_input = AUGMENTER.transform(X)
        # Feature exclusion is applied only to augmented frames.
        if EXCLUDED_FEATURES:
            excluded_present = [
                name for name in model_input.columns if name in EXCLUDED_FEATURES
            ]
            model_input = model_input.drop(columns=excluded_present, errors="ignore")

    # Both heads see the same feature frame; flatten each output to 1-D floats.
    forward_values = np.asarray(FORWARD_MODEL.predict(model_input), dtype=float).ravel()
    rf_values = np.asarray(RF_MODEL.predict(model_input), dtype=float).ravel()

    return map_positions_from_forward_rf(
        forward_pred=forward_values,
        rf_pred=rf_values,
        x=POSITION_CONFIG.x,
        clip_min=POSITION_CONFIG.clip_min,
        clip_max=POSITION_CONFIG.clip_max,
        epsilon=POSITION_CONFIG.epsilon,
    )

def predict_bulk() -> tuple[pd.DataFrame, pd.DataFrame]:
    """Score the whole test file and write the submission files.

    Reads ``test.parquet`` from ``DATA_DIR`` when it exists, otherwise
    ``test.csv``. Predictions are computed on rows sorted by ``ID_COL``, put
    back into the original row order, and filtered to rows where ``is_scored``
    is true. The submission is validated and written to
    ``/kaggle/working/submission.csv`` and ``submission.parquet``.

    Returns:
        ``(test_df, submission)``: the raw test frame and the submission frame.

    Raises:
        KeyError: a required calendar column or ``is_scored`` is missing.
        RuntimeError: scored ids and predictions differ in length.
        ValueError: predictions are non-finite, ids are not unique, or the
            submission length differs from the number of scored rows.
    """
    parquet_source = DATA_DIR / "test.parquet"
    if parquet_source.exists():
        test_df = pd.read_parquet(parquet_source)
    else:
        test_df = pd.read_csv(DATA_DIR / "test.csv")

    absent_calendar = [
        name for name in REQUIRED_CALENDAR_COLUMNS if name not in test_df.columns
    ]
    if absent_calendar:
        raise KeyError(
            "Missing calendar columns required by preprocessing: "
            + ", ".join(sorted(map(str, absent_calendar)))
        )

    ordered, features = align_features(test_df, sort_by_id=True)
    # The two-head mapping has already clipped these to the configured range.
    positions = run_pipeline(features)
    ordered = ordered.assign(prediction=positions.astype(np.float32, copy=False))
    # Undo the ID sort so predictions line up with the rows of test_df.
    ordered = ordered.sort_values("__original_order__").reset_index(drop=True)
    predictions = ordered["prediction"].to_numpy()

    if "is_scored" not in test_df.columns:
        raise KeyError("Expected 'is_scored' column in test data for submission filtering.")
    scored_mask = test_df["is_scored"].to_numpy(dtype=bool)
    scored_ids = test_df.loc[scored_mask, ID_COL].to_numpy()
    scored_predictions = predictions[scored_mask]
    if scored_ids.size != scored_predictions.size:
        raise RuntimeError("Mismatch between scored ids and predictions lengths.")

    submission = pd.DataFrame(
        {
            ID_COL: scored_ids.astype(np.int64, copy=False),
            "prediction": scored_predictions.astype(np.float32, copy=False),
        }
    )
    submission = submission.sort_values(ID_COL).reset_index(drop=True)
    output_columns = [ID_COL, "prediction"]
    if list(submission.columns) != output_columns:
        submission = submission[[ID_COL, "prediction"]]

    # Sanity checks before anything is written.
    if not np.isfinite(submission["prediction"]).all():
        raise ValueError("Submission contains non-finite predictions.")
    if not submission[ID_COL].is_unique:
        raise ValueError("Submission date_id values must be unique.")
    expected_count = int(scored_mask.sum())
    if len(submission) != expected_count:
        raise ValueError(
            f"submission length {len(submission)} differs from scored rows {expected_count}"
        )

    submission[ID_COL] = submission[ID_COL].astype("int64", copy=False)
    submission["prediction"] = submission["prediction"].astype("float32", copy=False)

    submission_path = Path("/kaggle/working/submission.csv")
    submission_parquet_path = submission_path.with_suffix(".parquet")
    submission.to_parquet(submission_parquet_path, index=False)
    submission.to_csv(submission_path, index=False)
    print("Saved submission.csv:", submission_path)
    print("Saved submission.parquet:", submission_parquet_path)
    return test_df, submission


def predict(test: pl.DataFrame) -> float:
    """Return the position for the last row of *test* (inference-server callback).

    The polars frame is converted to pandas, aligned without re-sorting, and
    run through the pipeline. The last position is cast to float32 and
    returned as a Python float.
    """
    pandas_frame = test.to_pandas()
    _ordered, features = align_features(pandas_frame, sort_by_id=False)
    # The two-head mapping has already clipped these to the configured range.
    positions = run_pipeline(features)
    as_float32 = np.asarray(positions, dtype=np.float32)
    return float(as_float32[-1])
# Score the full test file once and write the submission files.
TEST_DF, SUBMISSION_DF = predict_bulk()
print("submission preview:")
print(SUBMISSION_DF.head())
# Hand the predict() callback to the Kaggle evaluation server.
inference_server = kies.DefaultInferenceServer(predict)
# Serve when KAGGLE_IS_COMPETITION_RERUN is set; otherwise run the local gateway
# against the competition data directory.
if os.getenv("KAGGLE_IS_COMPETITION_RERUN"):
    inference_server.serve()
else:
    inference_server.run_local_gateway(("/kaggle/input/hull-tactical-market-prediction/",))