In [0]:
################################################################################
# Missing Values (GroupImputer)
################################################################################

class GroupImputer(BaseEstimator, TransformerMixin):
    """
    Hierarchical imputer for numeric + categorical features.

    Idea:
    ----
    All statistics (medians / modes) are learned in `fit()` and are then used to fill
    missing values in whatever data is passed to `transform()`. Fit on the training
    data only and reuse the fitted imputer for validation and test data.
    For each row with a missing value, fill it using statistics from "similar" rows
    first, and only fall back to global statistics if needed.

    Extra (age <-> mileage consistency):
    -------------------------------
    We found that `mileage` and `age` are strongly correlated.
    Therefore, before the group hierarchy, we add a lightweight bucket-based fallback:

    - If `mileage` is missing and `age` is available:
        fill with median mileage of the corresponding age bucket (learned in fit()).
    - If `age` is missing and `mileage` is available:
        fill with median age of the corresponding mileage bucket (learned in fit()).

    Hierarchy for numeric columns (num_cols):
        0) (special case) mileage from age-bucket median, age from mileage-bucket median
        1) median per (group_cols[0], group_cols[1])    > we use brand, model
        2) median per group_cols[0]                     > we use brand
        3) global median across all rows

    Hierarchy for categorical columns (cat_cols):
        1) mode per (group_cols[0], group_cols[1])      > we use brand, model
        2) mode per group_cols[0]                       > we use brand
        3) global mode across all rows (or the `fallback` token if the column
           had no non-null value at all during fit)

    Notes:
    -----
    - `group_cols` are used only to define groups; they themselves are not imputed
      when `num_cols` / `cat_cols` are inferred. Only the first two entries are used.
      A single column name may be passed as a plain string.
    - `num_cols` and `cat_cols` can be given explicitly (lists of column names).
      If None, they are inferred from the dtypes in `fit`.
    - After `transform()`, `report_` (dict of counters per hierarchy level) and
      `report_by_column_` (DataFrame with filled values per column) describe the
      most recent call.
    """

    def __init__(
        self,
        group_cols=("brand", "model"),
        num_cols=None,
        cat_cols=None,
        fallback="__MISSING__",
        verbose=False,
        verbose_top_n=10,
        # correlation-based bucket imputation (lean defaults)
        age_col="age",
        mileage_col="mileage",
        n_bins_age=8,
        n_bins_mileage=8,
        min_bucket_samples=20,
    ):
        self.group_cols = group_cols
        self.num_cols = num_cols
        self.cat_cols = cat_cols
        self.fallback = fallback
        self.verbose = verbose
        self.verbose_top_n = verbose_top_n

        self.age_col = age_col
        self.mileage_col = mileage_col
        self.n_bins_age = n_bins_age
        self.n_bins_mileage = n_bins_mileage
        self.min_bucket_samples = min_bucket_samples

    # ------------------------------------------------------------------ helpers
    def _mode(self, s: pd.Series):
        """
        Deterministic mode helper for the GLOBAL level.

        - Compute the most frequent non-null value.
        - If multiple values tie, Series.mode() returns them in order, we take .iloc[0].
        - If there is no valid mode (all NaN), return the fallback token.
        """
        m = s.mode(dropna=True)
        if not m.empty:
            return m.iloc[0]
        return self.fallback

    def _group_mode(self, s: pd.Series):
        """
        Deterministic mode helper for GROUP levels (pair / first-level).

        Same tie-breaking as `_mode`, but returns NaN instead of the fallback
        token when the group has no non-null value. A NaN statistic lets
        `transform()` fall through to the next hierarchy level; returning the
        token here would count as a valid fill and hide the broader modes.
        """
        m = s.mode(dropna=True)
        if not m.empty:
            return m.iloc[0]
        return np.nan

    def _get_group_series(self, df: pd.DataFrame, col_name: str) -> pd.Series:
        """
        Get the FIRST physical column with the given label from df.

        Reason
        ------
        - In some workflows, df.columns can contain duplicate labels
          (e.g. "brand" appearing twice after some operations).
        - df["brand"] would then return a DataFrame instead of a Series, which
          breaks grouping ("Grouper for 'brand' not 1-dimensional").
        - By using np.where(df.columns == col_name)[0] we get *positions* and
          explicitly pick the first one.

        Raises
        ------
        ValueError if no column with that name exists.
        """
        matches = np.where(df.columns == col_name)[0]
        if len(matches) == 0:
            raise ValueError(f"GroupImputer: grouping column '{col_name}' not found in data.")
        return df.iloc[:, matches[0]]

    def _make_bin_edges(self, s: pd.Series, n_bins: int):
        """
        Build quantile-based bin edges that also cover out-of-range values.

        Returns None when fewer than `min_bucket_samples` numeric values are
        available or when all quantiles collapse to a single value. Otherwise
        returns the inner quantile cut points wrapped in (-inf, +inf), so values
        outside the range seen here still fall into the first / last bin.
        """
        vals = pd.to_numeric(s, errors="coerce").dropna().values
        if vals.size < self.min_bucket_samples:
            return None
        qs = np.linspace(0.0, 1.0, int(n_bins) + 1)
        edges = np.unique(np.quantile(vals, qs))
        # need at least 2 distinct cut points (+/- inf wrapper makes >=3 total)
        if edges.size < 2:
            return None
        inner = edges[1:-1]  # can be empty -> a single (-inf, inf) bin
        return np.r_[-np.inf, inner, np.inf]

    def _fill_from_bucket(self, df, target_col, source_col, edges, bucket_medians,
                          report, report_key, per_col):
        """
        Fill missing `target_col` values in place from the median of the bucket
        that the row's `source_col` value falls into. No-op when either column is
        absent or no bucket statistics were learned in fit().
        """
        if (
            target_col not in df.columns
            or source_col not in df.columns
            or edges is None
            or not isinstance(bucket_medians, pd.Series)
            or bucket_medians.empty
        ):
            return

        # Coerce like fit() does, so a non-numeric dtype cannot break pd.cut.
        source = pd.to_numeric(df[source_col], errors="coerce")
        buckets = pd.cut(source, edges, include_lowest=True)
        candidate = buckets.map(bucket_medians)

        # Positional mask: independent of (possibly duplicated) index labels.
        mask = df[target_col].isna().to_numpy() & candidate.notna().to_numpy()
        n = int(mask.sum())
        if n > 0:
            report[report_key] += n
            per_col[target_col] += n
            df.loc[mask, target_col] = candidate.loc[mask].astype("float64").to_numpy()

    def _fill_from_group_stats(self, df, cols, stats, keys, report, report_key, per_col):
        """
        Fill missing values of `cols` in place from the group-level table `stats`.

        `keys` maps the temporary key column names ("_g0", "_g1") to the group
        values of each row of `df`, in row order. Rows whose group has no
        (non-null) statistic are left untouched for the next hierarchy level.
        """
        if stats.empty:
            return

        key_names = list(keys)
        # A left merge keeps one output row per input row, in input order (group
        # keys in `stats` are unique). The result carries a fresh RangeIndex, so
        # everything below is done by POSITION rather than by index label;
        # label-based alignment against df.index would mis-assign values
        # whenever df does not have a default 0..n-1 index.
        lookup = pd.DataFrame(keys).merge(stats.reset_index(), on=key_names, how="left")

        for col in cols:
            if col not in stats.columns:
                continue
            mask = df[col].isna().to_numpy() & lookup[col].notna().to_numpy()
            n = int(mask.sum())
            report[report_key] += n
            per_col[col] += n
            if n > 0:
                df.loc[mask, col] = lookup[col].to_numpy()[mask]

    # ---------------------------------------------------------------------- fit
    def fit(self, X, y=None):
        """
        Learn the group-level and global statistics from the training data.

        Steps
        -----
        1) Convert X to DataFrame and remember the original column order.
        2) Resolve which columns are numeric/categorical to impute.
        3) Build group keys (g0, g1) from group_cols (e.g. brand, model).
        4) (special case) learn bucket medians for mileage<->age fallback.
        5) For numeric columns:
            - compute global medians
            - medians per g0 (e.g. per brand)
            - medians per (g0, g1) (e.g. per brand+model)
        6) For categorical columns:
            - global modes
            - modes per g0
            - modes per (g0, g1)

        Returns
        -------
        self

        Raises
        ------
        ValueError if `group_cols` is empty or a group column is missing in X.
        """

        df = pd.DataFrame(X).copy()
        self.feature_names_in_ = df.columns.to_list()

        # group_cols must contain at least one column name
        if self.group_cols is None or len(self.group_cols) == 0:
            raise ValueError("GroupImputer: at least one group column must be specified.")

        # A bare string would otherwise be split into single characters by list().
        if isinstance(self.group_cols, str):
            self.group_cols_ = [self.group_cols]
        else:
            self.group_cols_ = list(self.group_cols)

        # Determine numeric columns to impute (internal num_cols_)
        if self.num_cols is None:
            # If not specified: all numeric columns except the group columns
            num_cols_all = df.select_dtypes(include="number").columns.tolist()
            self.num_cols_ = [c for c in num_cols_all if c not in self.group_cols_]
        else:
            # If specified: keep only those that exist in df
            self.num_cols_ = [c for c in self.num_cols if c in df.columns]

        # Determine categorical columns to impute (internal cat_cols_)
        if self.cat_cols is None:
            # If not specified: all non-group, non-numeric columns
            excluded = self.group_cols_ + self.num_cols_
            self.cat_cols_ = [c for c in df.columns if c not in excluded]
        else:
            # If specified: keep only those that exist in df
            self.cat_cols_ = [c for c in self.cat_cols if c in df.columns]

        # g0 = first grouping column (e.g. brand); g1 = optional second one (e.g. model)
        g0 = self._get_group_series(df, self.group_cols_[0])
        g1 = None
        if len(self.group_cols_) > 1:
            g1 = self._get_group_series(df, self.group_cols_[1])

        # mileage <-> age bucket medians
        self.age_bin_edges_ = None
        self.mileage_bin_edges_ = None
        self.mileage_by_agebin_ = None
        self.age_by_mileagebin_ = None

        if self.age_col in df.columns and self.mileage_col in df.columns:
            age_s = pd.to_numeric(df[self.age_col], errors="coerce")
            mil_s = pd.to_numeric(df[self.mileage_col], errors="coerce")

            # mileage from age buckets
            self.age_bin_edges_ = self._make_bin_edges(age_s, self.n_bins_age)
            if self.age_bin_edges_ is not None:
                age_bins = pd.cut(age_s, self.age_bin_edges_, include_lowest=True)
                # observed=True: explicit, version-independent behaviour; empty
                # buckets simply have no entry (maps to NaN in transform()).
                self.mileage_by_agebin_ = mil_s.groupby(age_bins, dropna=True, observed=True).median()

            # age from mileage buckets
            self.mileage_bin_edges_ = self._make_bin_edges(mil_s, self.n_bins_mileage)
            if self.mileage_bin_edges_ is not None:
                mil_bins = pd.cut(mil_s, self.mileage_bin_edges_, include_lowest=True)
                self.age_by_mileagebin_ = age_s.groupby(mil_bins, dropna=True, observed=True).median()

        # numeric statistics
        if self.num_cols_:
            num_df = df[self.num_cols_].copy()

            # 3) Global median per numeric column (last-resort fallback)
            self.num_global_ = num_df.median(numeric_only=True)

            # 2) Median per first-level group (g0, e.g. brand)
            num_first = num_df.copy()
            num_first["_g0"] = g0.values  # temporary group key column
            self.num_first_ = num_first.groupby("_g0", dropna=True, observed=True).median(numeric_only=True)

            # 1) Median per pair (g0, g1), e.g. (brand, model)
            if g1 is not None:
                num_pair = num_df.copy()
                num_pair["_g0"] = g0.values
                num_pair["_g1"] = g1.values
                self.num_pair_ = num_pair.groupby(["_g0", "_g1"], dropna=True, observed=True).median(
                    numeric_only=True
                )
            else:
                self.num_pair_ = pd.DataFrame()
        else:
            self.num_global_ = pd.Series(dtype="float64")
            self.num_first_ = pd.DataFrame()
            self.num_pair_ = pd.DataFrame()

        # categorical statistics
        if self.cat_cols_:
            cat_df = df[self.cat_cols_].copy()

            # 3) Global mode per categorical column (fallback token if column is all-NaN)
            self.cat_global_ = pd.Series({c: self._mode(cat_df[c]) for c in self.cat_cols_}, dtype="object")

            # 2) Mode per first-level group (g0); NaN where the group has no value
            cat_first = cat_df.copy()
            cat_first["_g0"] = g0.values
            self.cat_first_ = cat_first.groupby("_g0", dropna=True, observed=True).agg(self._group_mode)

            # 1) Mode per pair (g0, g1); NaN where the group has no value
            if g1 is not None:
                cat_pair = cat_df.copy()
                cat_pair["_g0"] = g0.values
                cat_pair["_g1"] = g1.values
                self.cat_pair_ = cat_pair.groupby(["_g0", "_g1"], dropna=True, observed=True).agg(
                    self._group_mode
                )
            else:
                self.cat_pair_ = pd.DataFrame()
        else:
            self.cat_global_ = pd.Series(dtype="object")
            self.cat_first_ = pd.DataFrame()
            self.cat_pair_ = pd.DataFrame()

        return self

    # ---------------------------------------------------------------- transform
    def transform(self, X):
        """
        Apply hierarchical imputation to new data.
            1) Convert input to DataFrame and align columns to what fit() saw.
            2) Rebuild group keys g0, g1 from the current data.
            3) (special case) mileage<->age bucket fallback where possible.
            4) For each numeric column with missing values:
                - try pair-level median (g0, g1)
                - then brand-level median (g0)
                - then global median
            5) Same for categorical columns with modes.

        Returns
        -------
        A new DataFrame with the columns seen in fit() (the input is not modified).
        The index of the input is preserved.

        Side effects
        ------------
        Sets `report_` and `report_by_column_`; prints a summary if `verbose`.

        Raises
        ------
        AttributeError if called before fit().
        ValueError if a group column is missing.
        """
        if not hasattr(self, "feature_names_in_"):
            raise AttributeError("GroupImputer: transform() called before fit().")

        df = pd.DataFrame(X).copy()
        df = df.reindex(columns=self.feature_names_in_)

        g0 = self._get_group_series(df, self.group_cols_[0])
        first_keys = {"_g0": g0.values}
        pair_keys = None
        if len(self.group_cols_) > 1:
            g1 = self._get_group_series(df, self.group_cols_[1])
            pair_keys = {"_g0": g0.values, "_g1": g1.values}

        # audit counters: how many values each hierarchy level filled
        report = {
            "num_age_bucket": 0,
            "num_mileage_bucket": 0,
            "num_pair": 0,
            "num_brand": 0,
            "num_global": 0,
            "cat_pair": 0,
            "cat_brand": 0,
            "cat_global": 0,
        }
        per_col = Counter()

        # numeric imputation
        if self.num_cols_:
            df[self.num_cols_] = df[self.num_cols_].astype("float64")

            # (0) special-case: mileage from age bucket, then age from mileage bucket
            self._fill_from_bucket(
                df, self.mileage_col, self.age_col,
                self.age_bin_edges_, self.mileage_by_agebin_,
                report, "num_age_bucket", per_col,
            )
            self._fill_from_bucket(
                df, self.age_col, self.mileage_col,
                self.mileage_bin_edges_, self.age_by_mileagebin_,
                report, "num_mileage_bucket", per_col,
            )

            to_impute_num = [c for c in self.num_cols_ if df[c].isna().any()]

            if to_impute_num:
                # 1) pair-level imputation: per (g0, g1)
                if pair_keys is not None:
                    self._fill_from_group_stats(
                        df, to_impute_num, self.num_pair_, pair_keys, report, "num_pair", per_col
                    )

                # 2) first-level imputation: per g0 only
                self._fill_from_group_stats(
                    df, to_impute_num, self.num_first_, first_keys, report, "num_brand", per_col
                )

                # 3) global median fallback
                for col in to_impute_num:
                    if col not in self.num_global_:
                        continue
                    fill_value = self.num_global_[col]
                    if pd.isna(fill_value):
                        # Column had no values during fit: nothing can be filled,
                        # so do not report these rows as imputed.
                        continue
                    n = int(df[col].isna().sum())
                    report["num_global"] += n
                    per_col[col] += n
                    df[col] = df[col].fillna(fill_value)

        # categorical imputation
        if self.cat_cols_:
            to_impute_cat = [c for c in self.cat_cols_ if df[c].isna().any()]

            if to_impute_cat:
                # 1) pair-level imputation: per (g0, g1)
                if pair_keys is not None:
                    self._fill_from_group_stats(
                        df, to_impute_cat, self.cat_pair_, pair_keys, report, "cat_pair", per_col
                    )

                # 2) first-level imputation: per g0 only
                self._fill_from_group_stats(
                    df, to_impute_cat, self.cat_first_, first_keys, report, "cat_brand", per_col
                )

                # 3) global mode fallback (or fallback token)
                for col in to_impute_cat:
                    n = int(df[col].isna().sum())
                    report["cat_global"] += n
                    per_col[col] += n
                    df[col] = df[col].fillna(self.cat_global_.get(col, self.fallback))

        # store report for later inspection
        self.report_ = report
        self.report_by_column_ = (
            pd.DataFrame(list(per_col.items()), columns=["column", "values_filled"])
            .sort_values("values_filled", ascending=False)
            .reset_index(drop=True)
        )

        if self.verbose:
            _print_section("GroupImputer report")
            print("Imputed Missing Values ( always try 'most similar cars' first):\n")
            print(
                f"- Numeric (Median):   age->mileage={report['num_age_bucket']}, mileage->age={report['num_mileage_bucket']}, "
                f"(brand+model)={report['num_pair']}, brand={report['num_brand']}, global={report['num_global']}"
            )
            print(
                f"- Categorical (Mode): (brand+model)={report['cat_pair']}, brand={report['cat_brand']}, global={report['cat_global']}"
            )
            print("\nTop columns affected:")
            _maybe_display(self.report_by_column_, max_rows=self.verbose_top_n)

        return df

    def get_feature_names_out(self, input_features=None):
        """
        Return the output feature names as an object array.

        - If called without arguments, return the original feature names seen in fit()
          (transform() keeps exactly these columns, in this order).
        - If `input_features` is given, it is returned unchanged as an object array.
        - This is mostly useful when GroupImputer is at the top of a Pipeline and
          later steps want to introspect feature names.
        """
        if input_features is None:
            input_features = getattr(self, "feature_names_in_", None)
        return np.asarray(input_features, dtype=object)
