<a href="https://colab.research.google.com/github/leticiarccorrea/sales-operations-demand-forecasting/blob/main/case_salesmodel_modelo.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# import dataset

from google.colab import drive
import pandas as pd


# Access to Google Drive
drive.mount('/content/drive')
datapah = '/content/drive/MyDrive/caseboti/dataset.csv'
datapah_dic = '/content/drive/MyDrive/caseboti/dicionariodedados.csv'


# Load file in pandas and spark
base = pd.read_csv(datapah, sep=';', on_bad_lines='warn')
base_dictionary = pd.read_csv(datapah_dic)

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [2]:
# import de bibliotecas

# ============================
# Bibliotecas da biblioteca padrão (Python)
# ============================
from typing import Dict, List, Tuple, Iterable, Optional
from dataclasses import dataclass


# ============================
# Bibliotecas de terceiros para manipulação de dados
# ============================
import numpy as np
import pandas as pd

# ============================
# Modelos de Machine Learning (scikit-learn)
# ============================
# Regressor principal para previsão pontual de demanda
from sklearn.ensemble import HistGradientBoostingRegressor

# Regressor para previsão probabilística (quantis)
from sklearn.ensemble import GradientBoostingRegressor

# ============================
# Métricas de avaliação
# ============================
# Métrica de erro absoluto médio
from sklearn.metrics import mean_absolute_error

# ============================
# Inspeção e interpretabilidade do modelo
# ============================
# Permutation Importance para avaliar prioridade das variáveis no conjunto de teste
from sklearn.inspection import permutation_importance

RANDOM_SEED = 42

**1. Preparação do dataset**

In [3]:
@dataclass
class ModelInputConfig:
    # colunas obrigatórias do dataset bruto
    required_cols: Tuple[str, ...] = (
        "COD_CICLO",
        "COD_MATERIAL",
        "COD_CANAL",
        "COD_REGIAO",
        "QT_VENDA_BRUTO",
        "QT_DEVOLUCAO",
        "PCT_DESCONTO",
        "VL_PRECO",
    )

    # colunas numéricas que podem vir como string PT-BR/EN
    numeric_like_cols: Tuple[str, ...] = (
        "QT_VENDA_BRUTO",
        "QT_DEVOLUCAO",
        "VL_RECEITA_BRUTA",
        "VL_RECEITA_LIQUIDA",
        "PCT_DESCONTO",
        "VL_PRECO",
    )

    # regras de desconto
    discount_clip: Tuple[float, float] = (0.0, 1.0)

    # regras de target
    default_devolution: float = 0.0
    min_target_value: float = 0.0  # Comentário: para input do modelo, removemos vendas líquidas negativas

    # filtro de qualidade temporal
    drop_invalid_ciclo: bool = True

    # imputação de preço SEM leakage (expanding median por SKU)
    price_fallback_to_global_median: bool = True

    # histórico mínimo por série para gerar lags
    series_keys: Tuple[str, ...] = ("cod_material", "cod_canal", "cod_regiao")
    min_history_per_series: int = 8

    # remove séries com pouca venda (reduz ruído/zeros)
    min_positive_sales_points: int = 3



# Funções

def parse_ptbr_number(series: pd.Series) -> pd.Series:
    """
    Converte números em formato PT-BR para float.
    Comentário: só remove '.' como milhar quando existir ',' (indicando PT-BR).
    """
    if pd.api.types.is_numeric_dtype(series):
        return series

    s = (
        series.astype(str)
        .str.strip()
        .replace(
            {
                "": np.nan,
                "nan": np.nan,
                "NaN": np.nan,
                "NULL": np.nan,
                "None": np.nan,
            }
        )
    )

    mask_ptbr = s.str.contains(",", na=False)
    s.loc[mask_ptbr] = (
        s.loc[mask_ptbr]
        .str.replace(".", "", regex=False)
        .str.replace(",", ".", regex=False)
    )

    return pd.to_numeric(s, errors="coerce")


def rename_to_snake_case(df: pd.DataFrame) -> pd.DataFrame:
    """
    Renomeia colunas principais para snake_case.
    """
    rename_map = {
        "COD_CICLO": "cod_ciclo",
        "COD_MATERIAL": "cod_material",
        "COD_CANAL": "cod_canal",
        "COD_REGIAO": "cod_regiao",
        "DES_CATEGORIA_MATERIAL": "des_categoria_material",
        "DES_MARCA_MATERIAL": "des_marca_material",
        "QT_VENDA_BRUTO": "qt_venda_bruto",
        "QT_DEVOLUCAO": "qt_devolucao",
        "VL_RECEITA_BRUTA": "vl_receita_bruta",
        "VL_RECEITA_LIQUIDA": "vl_receita_liquida",
        "PCT_DESCONTO": "pct_desconto",
        "VL_PRECO": "vl_preco",
        "FLG_DATA": "flg_data",
        "FLG_CAMPANHA_MKT_A": "flg_campanha_mkt_a",
        "FLG_CAMPANHA_MKT_B": "flg_campanha_mkt_b",
        "FLG_CAMPANHA_MKT_C": "flg_campanha_mkt_c",
        "FLG_CAMPANHA_MKT_D": "flg_campanha_mkt_d",
        "FLG_CAMPANHA_MKT_E": "flg_campanha_mkt_e",
    }
    return df.rename(columns={k: v for k, v in rename_map.items() if k in df.columns})


def add_time_features(df: pd.DataFrame) -> pd.DataFrame:
    """
    Cria ano e ciclo a partir de cod_ciclo no formato YYYYCC.
    """
    out = df.copy()
    cod_ciclo_num = pd.to_numeric(out["cod_ciclo"], errors="coerce")
    out["ano"] = (cod_ciclo_num // 100).astype("Int64")
    out["ciclo"] = (cod_ciclo_num % 100).astype("Int64")
    return out


def build_campaign_flag(df: pd.DataFrame) -> pd.DataFrame:
    """
    Cria flg_campanha_resumo a partir de colunas flg_campanha* (se existirem).
    """
    out = df.copy()
    campaign_cols = [c for c in out.columns if c.startswith("flg_campanha")]

    if len(campaign_cols) > 0:
        out["flg_campanha_resumo"] = (
            out[campaign_cols]
            .fillna(0)
            .max(axis=1)
            .astype(int)
        )
    else:
        out["flg_campanha_resumo"] = 0

    return out


def normalize_discount(df: pd.DataFrame, cfg: ModelInputConfig) -> pd.DataFrame:
    """
    Garante desconto em 0..1, assumindo 0 quando ausente e convertendo % quando necessário.
    """
    out = df.copy()
    if "pct_desconto" not in out.columns:
        out["pct_desconto"] = 0.0
        return out

    out["pct_desconto"] = out["pct_desconto"].fillna(0.0)

    if out["pct_desconto"].max(skipna=True) > 1.0:
        out["pct_desconto"] = out["pct_desconto"] / 100.0

    out["pct_desconto"] = out["pct_desconto"].clip(cfg.discount_clip[0], cfg.discount_clip[1])
    return out


def compute_targets(df: pd.DataFrame, cfg: ModelInputConfig) -> pd.DataFrame:
    """
    Cria demanda líquida e pct_devolucao.
    """
    out = df.copy()

    out["qt_devolucao"] = out.get("qt_devolucao", cfg.default_devolution)
    out["qt_devolucao"] = pd.to_numeric(out["qt_devolucao"], errors="coerce").fillna(cfg.default_devolution)

    out["qt_venda_bruto"] = out.get("qt_venda_bruto", 0.0)
    out["qt_venda_bruto"] = pd.to_numeric(out["qt_venda_bruto"], errors="coerce").fillna(0.0)

    out["qt_venda_liquida"] = out["qt_venda_bruto"] - out["qt_devolucao"]

    out["pct_devolucao"] = np.where(
        out["qt_venda_bruto"] > 0,
        out["qt_devolucao"] / out["qt_venda_bruto"],
        np.nan,
    )

    return out


def impute_price_expanding_median_by_sku(
    df: pd.DataFrame,
    sku_col: str = "cod_material",
    time_col: str = "cod_ciclo",
    price_col: str = "vl_preco",
    fallback_to_global: bool = True,
) -> pd.DataFrame:
    """
    Imputa vl_preco faltante usando mediana expanding por SKU (sem usar futuro).
    """
    out = df.copy()
    if sku_col not in out.columns or time_col not in out.columns:
        return out

    if price_col not in out.columns:
        out[price_col] = np.nan

    out = out.sort_values([sku_col, time_col]).reset_index(drop=True)

    expanding_median = (
        out.groupby(sku_col, observed=True)[price_col]
        .expanding()
        .median()
        .reset_index(level=0, drop=True)
    )

    out[price_col] = pd.to_numeric(out[price_col], errors="coerce")
    out[price_col] = out[price_col].fillna(expanding_median)

    if fallback_to_global:
        global_median = out[price_col].median(skipna=True)
        out[price_col] = out[price_col].fillna(global_median)

    return out


def filter_series_for_modeling(
    df: pd.DataFrame,
    series_keys: Tuple[str, ...],
    time_col: str,
    target_col: str,
    min_history: int,
    min_positive_points: int,
) -> pd.DataFrame:
    """
    Remove séries com histórico insuficiente e com poucas vendas > 0.
    """
    out = df.copy()

    # Comentário: histórico por série
    history = out.groupby(list(series_keys), observed=True)[time_col].nunique()
    keep_history = history[history >= min_history].index

    out = out.set_index(list(series_keys))
    out = out.loc[out.index.isin(keep_history)].reset_index()

    # Comentário: remove séries muito esparsas (poucos pontos de venda positiva)
    pos_points = out.groupby(list(series_keys), observed=True)[target_col].apply(lambda s: int((s > 0).sum()))
    keep_pos = pos_points[pos_points >= min_positive_points].index

    out = out.set_index(list(series_keys))
    out = out.loc[out.index.isin(keep_pos)].reset_index()

    return out



# Main: build model input

def build_model_input_dataset(
    base: pd.DataFrame,
    cfg: Optional[ModelInputConfig] = None,
) -> Tuple[pd.DataFrame, Dict[str, object]]:
    """
    gera dataset final pronto para modelagem
    Retorna:
      - model_df: dataset pronto para criar lags/rolling e treinar
      - report: métricas de limpeza e filtros aplicados
    """
    if cfg is None:
        cfg = ModelInputConfig()

    # Comentário: valida schema mínimo
    missing = [c for c in cfg.required_cols if c not in base.columns]
    if missing:
        raise ValueError(f"Missing required columns in base: {missing}")

    df = base.copy()

    # conversão numérica no schema original (UPPER)
    for col in cfg.numeric_like_cols:
        if col in df.columns:
            df[col] = parse_ptbr_number(df[col])

    # rename para snake_case
    df = rename_to_snake_case(df)

    # desconto e targets
    df = normalize_discount(df, cfg)
    df = compute_targets(df, cfg)

    # tempo e campanha
    df = add_time_features(df)
    df = build_campaign_flag(df)

    # imputação de preço sem leakage
    df = impute_price_expanding_median_by_sku(
        df=df,
        sku_col="cod_material",
        time_col="cod_ciclo",
        price_col="vl_preco",
        fallback_to_global=cfg.price_fallback_to_global_median,
    )

    # report inicial
    report: Dict[str, object] = {
        "rows_raw": int(len(base)),
        "rows_after_numeric_and_targets": int(len(df)),
        "missing_rate_vl_preco": float(df["vl_preco"].isna().mean()) if "vl_preco" in df.columns else None,
        "missing_rate_cod_ciclo": float(df["cod_ciclo"].isna().mean()) if "cod_ciclo" in df.columns else None,
    }

    # remove ciclos inválidos (input do modelo precisa de tempo válido)
    invalid_ciclo_mask = df["ano"].isna() | df["ciclo"].isna()
    report["rows_invalid_ciclo"] = int(invalid_ciclo_mask.sum())

    if cfg.drop_invalid_ciclo:
        df = df.loc[~invalid_ciclo_mask].copy()

    # remove targets negativos (para demanda, normalmente não faz sentido no modelo)
    neg_target_mask = df["qt_venda_liquida"] < cfg.min_target_value
    report["rows_negative_target"] = int(neg_target_mask.sum())
    df = df.loc[~neg_target_mask].copy()

    # remove NaNs críticos para modelagem
    critical_cols = [
        "cod_ciclo", "cod_material", "cod_canal", "cod_regiao",
        "qt_venda_liquida", "vl_preco", "pct_desconto",
        "flg_campanha_resumo",
    ]
    critical_cols = [c for c in critical_cols if c in df.columns]

    before_dropna = len(df)
    df = df.dropna(subset=critical_cols).copy()
    report["rows_dropped_by_critical_nan"] = int(before_dropna - len(df))

    # garante tipos finais
    df["cod_ciclo"] = pd.to_numeric(df["cod_ciclo"], errors="coerce").astype("Int64")
    df["vl_preco"] = pd.to_numeric(df["vl_preco"], errors="coerce")
    df["pct_desconto"] = pd.to_numeric(df["pct_desconto"], errors="coerce")
    df["qt_venda_liquida"] = pd.to_numeric(df["qt_venda_liquida"], errors="coerce")

    # ordena (essencial para lags/rolling)
    df = df.sort_values(list(cfg.series_keys) + ["cod_ciclo"]).reset_index(drop=True)

    # filtra séries com histórico mínimo
    before_series_filter = len(df)
    df = filter_series_for_modeling(
        df=df,
        series_keys=cfg.series_keys,
        time_col="cod_ciclo",
        target_col="qt_venda_liquida",
        min_history=cfg.min_history_per_series,
        min_positive_points=cfg.min_positive_sales_points,
    )
    report["rows_dropped_by_series_filter"] = int(before_series_filter - len(df))

    report["rows_final"] = int(len(df))
    report["n_series_final"] = int(df[list(cfg.series_keys)].drop_duplicates().shape[0])
    report["cycles_final"] = int(df["cod_ciclo"].nunique())

    return df, report


# Run (final)

cfg = ModelInputConfig(
    drop_invalid_ciclo=True,              # modelo precisa de tempo válido
    min_target_value=0.0,                 # remove demanda líquida negativa
    min_history_per_series=8,             # suporta lags até 6 com folga
    min_positive_sales_points=3,          # reduz séries quase sempre zero
    price_fallback_to_global_median=True, # último recurso
)

model_df, model_report = build_model_input_dataset(base, cfg=cfg)

print(model_report)
display(model_df.head())


{'rows_raw': 173923, 'rows_after_numeric_and_targets': 173923, 'missing_rate_vl_preco': 0.0, 'missing_rate_cod_ciclo': 0.0, 'rows_invalid_ciclo': 0, 'rows_negative_target': 154, 'rows_dropped_by_critical_nan': 0, 'rows_dropped_by_series_filter': 5588, 'rows_final': 168181, 'n_series_final': 5002, 'cycles_final': 53}


Unnamed: 0,cod_material,cod_canal,cod_regiao,cod_ciclo,flg_data,des_categoria_material,des_marca_material,qt_venda_bruto,qt_devolucao,vl_receita_bruta,...,flg_campanha_mkt_c,flg_campanha_mkt_d,flg_campanha_mkt_e,pct_desconto,vl_preco,qt_venda_liquida,pct_devolucao,ano,ciclo,flg_campanha_resumo
0,6480,anon_S7,anon_S1,201801,0,anon_S12,anon_S17,276.0,0.0,4429.8,...,0,0,0,0.0,833.4,276.0,0.0,2018,1,0
1,6480,anon_S7,anon_S1,201802,0,anon_S12,anon_S17,288.0,0.0,5628.6,...,0,0,0,0.0,833.4,288.0,0.0,2018,2,0
2,6480,anon_S7,anon_S1,201803,0,anon_S12,anon_S17,258.0,0.0,2631.6,...,0,0,0,0.0,833.4,258.0,0.0,2018,3,0
3,6480,anon_S7,anon_S1,201804,0,anon_S12,anon_S17,282.0,0.0,5029.2,...,0,0,0,0.0,833.4,282.0,0.0,2018,4,0
4,6480,anon_S7,anon_S1,201805,0,anon_S12,anon_S17,270.0,0.0,3830.4,...,0,0,0,0.0,833.4,270.0,0.0,2018,5,0


In [4]:
# Testes unitários para as funções de ajuste do dataset


def _make_minimal_base_dataframe() -> pd.DataFrame:
    """Cria um dataset mínimo (UPPERCASE) para testar o pipeline end-to-end."""
    return pd.DataFrame(
        {
            "COD_CICLO": [202001, 202002, 202003, 202004, 202005, 202006, 202007, 202008,
                         202001, 202002, 202003, 202004, 202005, 202006, 202007, 202008],
            "COD_MATERIAL": ["SKU_A"] * 8 + ["SKU_B"] * 8,
            "COD_CANAL": ["ECOM"] * 16,
            "COD_REGIAO": ["SE"] * 16,
            # Comentário: PT-BR e EN misturados (strings)
            "QT_VENDA_BRUTO": ["1.000,0"] * 8 + ["500.0"] * 8,
            "QT_DEVOLUCAO": ["10,0"] * 8 + ["5.0"] * 8,
            "PCT_DESCONTO": ["10"] * 8 + ["0.1"] * 8,  # 10% (string) e 0.1 (string)
            "VL_PRECO": ["10,0", np.nan, "12,0", np.nan, "11,0", np.nan, "13,0", np.nan] + [np.nan] * 8,
            # Comentário: flags de campanha para construir resumo
            "FLG_CAMPANHA_MKT_A": [0, 1, 0, 0, 1, 0, 0, 0] + [0] * 8,
            "FLG_CAMPANHA_MKT_B": [0] * 16,
        }
    )


def test_parse_ptbr_number_basic():
    # deve converter PT-BR corretamente
    s = pd.Series(["1.234,56", "10,0", "0,5", None, "", "nan", "NULL", "500.25"])
    out = parse_ptbr_number(s)

    assert np.isclose(out.iloc[0], 1234.56, atol=1e-9)
    assert np.isclose(out.iloc[1], 10.0, atol=1e-9)
    assert np.isclose(out.iloc[2], 0.5, atol=1e-9)
    assert np.isnan(out.iloc[3])
    assert np.isnan(out.iloc[4])
    assert np.isnan(out.iloc[5])
    assert np.isnan(out.iloc[6])
    assert np.isclose(out.iloc[7], 500.25, atol=1e-9)


def test_rename_to_snake_case():
    df = pd.DataFrame(
        {
            "COD_CICLO": [202001],
            "COD_MATERIAL": ["X"],
            "QT_VENDA_BRUTO": [100],
            "VL_PRECO": [10],
            "SOME_OTHER_COL": ["keep"],
        }
    )
    out = rename_to_snake_case(df)

    assert "cod_ciclo" in out.columns
    assert "cod_material" in out.columns
    assert "qt_venda_bruto" in out.columns
    assert "vl_preco" in out.columns
    # coluna que não está no mapa deve permanecer
    assert "SOME_OTHER_COL" in out.columns


def test_add_time_features():
    df = pd.DataFrame({"cod_ciclo": [202001, 202112, "202205", "bad", None]})
    out = add_time_features(df)

    assert out.loc[0, "ano"] == 2020
    assert out.loc[0, "ciclo"] == 1
    assert out.loc[1, "ano"] == 2021
    assert out.loc[1, "ciclo"] == 12
    assert out.loc[2, "ano"] == 2022
    assert out.loc[2, "ciclo"] == 5
    # inválidos viram <NA>
    assert pd.isna(out.loc[3, "ano"])
    assert pd.isna(out.loc[3, "ciclo"])
    assert pd.isna(out.loc[4, "ano"])
    assert pd.isna(out.loc[4, "ciclo"])


def test_build_campaign_flag_with_cols():
    df = pd.DataFrame(
        {
            "flg_campanha_mkt_a": [0, 1, None],
            "flg_campanha_mkt_b": [0, 0, 1],
        }
    )
    out = build_campaign_flag(df)

    assert "flg_campanha_resumo" in out.columns
    assert out["flg_campanha_resumo"].tolist() == [0, 1, 1]


def test_build_campaign_flag_without_cols():
    df = pd.DataFrame({"x": [1, 2, 3]})
    out = build_campaign_flag(df)

    assert "flg_campanha_resumo" in out.columns
    assert out["flg_campanha_resumo"].tolist() == [0, 0, 0]


def test_normalize_discount():
    cfg = ModelInputConfig(discount_clip=(0.0, 1.0))

    df = pd.DataFrame({"pct_desconto": [0.1, 10, 150, -5, None]})
    out = normalize_discount(df, cfg)

    # max > 1 => assume % e divide por 100
    assert np.isclose(out.loc[0, "pct_desconto"], 0.001, atol=1e-12)
    assert np.isclose(out.loc[1, "pct_desconto"], 0.10, atol=1e-12)
    # 150% clipa em 1.0
    assert np.isclose(out.loc[2, "pct_desconto"], 1.0, atol=1e-12)
    # -5% vira -0.05 e clipa em 0.0
    assert np.isclose(out.loc[3, "pct_desconto"], 0.0, atol=1e-12)
    # None vira 0
    assert np.isclose(out.loc[4, "pct_desconto"], 0.0, atol=1e-12)


def test_compute_targets():
    cfg = ModelInputConfig(default_devolution=0.0, min_target_value=0.0)

    df = pd.DataFrame(
        {
            "qt_venda_bruto": [1000, 0, 100],
            "qt_devolucao": [10, 5, None],
        }
    )
    out = compute_targets(df, cfg)

    assert np.isclose(out.loc[0, "qt_venda_liquida"], 990.0, atol=1e-9)
    assert np.isclose(out.loc[0, "pct_devolucao"], 10 / 1000, atol=1e-9)

    # venda bruto 0 => pct_devolucao deve ser NaN
    assert np.isclose(out.loc[1, "qt_venda_liquida"], -5.0, atol=1e-9)
    assert np.isnan(out.loc[1, "pct_devolucao"])

    # devolução None => usa default 0
    assert np.isclose(out.loc[2, "qt_venda_liquida"], 100.0, atol=1e-9)
    assert np.isclose(out.loc[2, "pct_devolucao"], 0.0, atol=1e-9)


def test_impute_price_expanding_median_by_sku():
    df = pd.DataFrame(
        {
            "cod_material": ["A", "A", "A", "B", "B"],
            "cod_ciclo": [1, 2, 3, 1, 2],
            "vl_preco": [10.0, np.nan, 30.0, np.nan, np.nan],
        }
    )

    out = impute_price_expanding_median_by_sku(
        df=df,
        sku_col="cod_material",
        time_col="cod_ciclo",
        price_col="vl_preco",
        fallback_to_global=True,
    )

    # expanding median em A no ciclo 2 deve preencher com 10
    assert np.isclose(out.loc[(out["cod_material"] == "A") & (out["cod_ciclo"] == 2), "vl_preco"].iloc[0], 10.0, atol=1e-9)

    # SKU B não tem histórico de preço => cai no global median
    # Global median após preenchimento do A: valores [10,10,30] => mediana = 10
    assert np.isclose(out.loc[(out["cod_material"] == "B") & (out["cod_ciclo"] == 1), "vl_preco"].iloc[0], 10.0, atol=1e-9)
    assert np.isclose(out.loc[(out["cod_material"] == "B") & (out["cod_ciclo"] == 2), "vl_preco"].iloc[0], 10.0, atol=1e-9)


def test_filter_series_for_modeling():
    df = pd.DataFrame(
        {
            "cod_material": ["A"] * 8 + ["B"] * 7,
            "cod_canal": ["C"] * 15,
            "cod_regiao": ["R"] * 15,
            "cod_ciclo": list(range(1, 9)) + list(range(1, 8)),
            "qt_venda_liquida": [0, 0, 1, 0, 2, 0, 3, 0] + [1, 0, 0, 0, 0, 0, 0],
        }
    )

    out = filter_series_for_modeling(
        df=df,
        series_keys=("cod_material", "cod_canal", "cod_regiao"),
        time_col="cod_ciclo",
        target_col="qt_venda_liquida",
        min_history=8,
        min_positive_points=3,
    )

    # A tem 8 ciclos e 3 pontos positivos (1,2,3) => mantém
    # B tem 7 ciclos => remove
    assert set(out["cod_material"].unique().tolist()) == {"A"}
    assert out["cod_ciclo"].nunique() == 8


def test_build_model_input_dataset_end_to_end():
    base = _make_minimal_base_dataframe()

    cfg = ModelInputConfig(
        drop_invalid_ciclo=True,
        min_target_value=0.0,
        min_history_per_series=8,
        min_positive_sales_points=3,
        price_fallback_to_global_median=True,
    )

    model_df, report = build_model_input_dataset(base, cfg=cfg)

    # checks básicos de schema final
    expected_cols = {
        "cod_ciclo",
        "cod_material",
        "cod_canal",
        "cod_regiao",
        "qt_venda_liquida",
        "vl_preco",
        "pct_desconto",
        "flg_campanha_resumo",
        "ano",
        "ciclo",
        "pct_devolucao",
    }
    assert expected_cols.issubset(set(model_df.columns))

    # qt_venda_liquida deve ser 1000-10=990 (SKU_A) e 500-5=495 (SKU_B)
    a_first = model_df.loc[model_df["cod_material"] == "SKU_A"].sort_values("cod_ciclo").iloc[0]
    b_first = model_df.loc[model_df["cod_material"] == "SKU_B"].sort_values("cod_ciclo").iloc[0]
    assert np.isclose(a_first["qt_venda_liquida"], 990.0, atol=1e-9)
    assert np.isclose(b_first["qt_venda_liquida"], 495.0, atol=1e-9)

    # desconto deve virar 0.10 para SKU_A (era "10" => /100), e 0.1 para SKU_B (string "0.1")
    assert model_df["pct_desconto"].max() <= 1.0

    # ordenação por series_keys + cod_ciclo
    sorted_df = model_df.sort_values(list(cfg.series_keys) + ["cod_ciclo"]).reset_index(drop=True)
    assert model_df.reset_index(drop=True).equals(sorted_df)

    # report deve conter métricas essenciais
    for k in ["rows_raw", "rows_final", "n_series_final", "cycles_final", "rows_invalid_ciclo", "rows_negative_target"]:
        assert k in report


def test_build_model_input_dataset_missing_required_cols():
    # deve falhar se faltar coluna obrigatória do schema original
    base = pd.DataFrame({"COD_CICLO": [202001]})
    try:
        _ = build_model_input_dataset(base, cfg=ModelInputConfig())
        raise AssertionError("Era esperado ValueError por colunas obrigatórias ausentes.")
    except ValueError as e:
        assert "Missing required columns" in str(e)


# ============================================================
# Runner simples
# ============================================================

def run_all_tests() -> None:
    """Executa todos os testes"""
    tests = [
        test_parse_ptbr_number_basic,
        test_rename_to_snake_case,
        test_add_time_features,
        test_build_campaign_flag_with_cols,
        test_build_campaign_flag_without_cols,
        test_normalize_discount,
        test_compute_targets,
        test_impute_price_expanding_median_by_sku,
        test_filter_series_for_modeling,
        test_build_model_input_dataset_end_to_end,
        test_build_model_input_dataset_missing_required_cols,
    ]

    for t in tests:
        t()

    print(f"OK - {len(tests)} testes passaram.")


# Execute:
run_all_tests()


OK - 11 testes passaram.


In [5]:
base_work = model_df

**2. Definições de série e granularidade**

série = (`cod_material`, `cod_canal`, `cod_regiao`)

Motivo: para estimar o plano logístico é necessário o detalhamento por canal/região e, para itens críticos, por SKU.

Esse grão permite:
- capturar heterogeneidade estrutural (volume, elasticidade a preço/desconto, resposta a campanha),
- fazer agregações coerentes para níveis superiores (categoria/marca/região).



In [6]:
series_keys = ["cod_material", "cod_canal", "cod_regiao"]
time_key = "cod_ciclo"
target_col = "qt_venda_liquida"

# Ordenação temporal
base_work = base_work.sort_values(series_keys + [time_key]).reset_index(drop=True)

# Algumas checagens rápidas
print("linhas:", len(base_work))
print("n_séries:", base_work[series_keys].drop_duplicates().shape[0])
print("ciclos:", base_work[time_key].nunique(), "| min:", base_work[time_key].min(), "| max:", base_work[time_key].max())

linhas: 168181
n_séries: 5002
ciclos: 53 | min: 201801 | max: 202101


**3. Engenharia de features para painel temporal (global model)**


Estratégia:
- **Lags** da demanda (ex.: 1, 2, 3 ciclos)
- **Rolling stats** (média/mediana/std móveis)
- **Sazonalidade** via decomposição simples do código do ciclo (ano, índice dentro do ano)
- Features de preço/desconto e flags (campanha, data comemorativa)

Isso evita o problema de treinar um modelo por SKU (muito caro e instável) e aproveita sinal cruzado entre séries.


In [7]:
def split_ciclo_components(cod_ciclo: pd.Series) -> pd.DataFrame:
    """
    Extrai componentes do COD_CICLO (formato AAAACC).
    obs: não assumi 'ciclos por ano'; usei apenas:
      - ciclo_year: AAAA
      - ciclo_index: CC (índice do ciclo dentro do ano, sem trigonometria) - sem informação no texto base
    """
    s = pd.to_numeric(cod_ciclo, errors="coerce").astype("Int64").astype(str).str.zfill(6)
    ciclo_year = s.str.slice(0, 4).astype("Int64")
    ciclo_index = s.str.slice(4, 6).astype("Int64")
    return pd.DataFrame({"ciclo_year": ciclo_year, "ciclo_index": ciclo_index})


def add_lag_rolling_features(
    df: pd.DataFrame,
    group_keys: List[str],
    time_key: str,
    target_col: str,
    lags: List[int],
    rolling_windows: List[int],
) -> pd.DataFrame:
    """
    Cria lags e estatísticas móveis dentro de cada série.
    """
    # ordenação é obrigatória para não embaralhar lags/rolling
    out = df.sort_values(group_keys + [time_key]).copy()

    # componentes simples do ciclo
    cyc = split_ciclo_components(out[time_key])
    out = pd.concat([out.reset_index(drop=True), cyc.reset_index(drop=True)], axis=1)

    g = out.groupby(group_keys, observed=True)

    # lags da demanda
    for lag in lags:
        out[f"{target_col}_lag_{lag}"] = g[target_col].shift(lag)

    # rolling stats SEM leakage (usa shift(1))
    shifted_y = g[target_col].shift(1)
    group_id = g.ngroup()

    for w in rolling_windows:
        min_p = max(2, w // 2)

        out[f"{target_col}_roll_mean_{w}"] = shifted_y.groupby(group_id).transform(
            lambda s: s.rolling(window=w, min_periods=min_p).mean()
        )
        out[f"{target_col}_roll_std_{w}"] = shifted_y.groupby(group_id).transform(
            lambda s: s.rolling(window=w, min_periods=min_p).std()
        )
        out[f"{target_col}_roll_median_{w}"] = shifted_y.groupby(group_id).transform(
            lambda s: s.rolling(window=w, min_periods=min_p).median()
        )
        out[f"{target_col}_roll_min_{w}"] = shifted_y.groupby(group_id).transform(
            lambda s: s.rolling(window=w, min_periods=min_p).min()
        )
        out[f"{target_col}_roll_max_{w}"] = shifted_y.groupby(group_id).transform(
            lambda s: s.rolling(window=w, min_periods=min_p).max()
        )

        # volatilidade relativa (CV = std/mean)
        out[f"{target_col}_roll_cv_{w}"] = (
            out[f"{target_col}_roll_std_{w}"] / out[f"{target_col}_roll_mean_{w}"].replace(0, np.nan)
        )

        # share de zeros na janela (demanda intermitente)
        out[f"{target_col}_roll_zero_share_{w}"] = shifted_y.groupby(group_id).transform(
            lambda s: s.rolling(window=w, min_periods=min_p).apply(lambda x: np.mean(np.array(x) == 0), raw=False)
        )

    #  tendência curta (sem leakage, usando lags)
    if f"{target_col}_lag_1" in out.columns and f"{target_col}_lag_2" in out.columns:
        out[f"{target_col}_diff_1"] = out[f"{target_col}_lag_1"] - out[f"{target_col}_lag_2"]
        out[f"{target_col}_pct_change_1"] = (
            out[f"{target_col}_diff_1"] / out[f"{target_col}_lag_2"].replace(0, np.nan)
        )

    # interações e dinâmica de preço/desconto (sem leakage via lag_1)
    if "vl_preco" in out.columns and "pct_desconto" in out.columns:
        out["vl_preco_efetivo"] = out["vl_preco"] * (1.0 - out["pct_desconto"].clip(0, 1))
        out["vl_preco_x_desconto"] = out["vl_preco"] * out["pct_desconto"].fillna(0)

        out["vl_preco_lag_1"] = g["vl_preco"].shift(1)
        out["pct_desconto_lag_1"] = g["pct_desconto"].shift(1)

        out["vl_preco_pct_change_1"] = (out["vl_preco"] / out["vl_preco_lag_1"]) - 1.0
        out["pct_desconto_change_1"] = out["pct_desconto"] - out["pct_desconto_lag_1"]

    return out


# Aplicação
lags = [1, 2, 3]
rolling_windows = [2, 5]

feat_df = add_lag_rolling_features(
    df=base_work,
    group_keys=series_keys,
    time_key=time_key,
    target_col=target_col,
    lags=lags,
    rolling_windows=rolling_windows,
)

# features candidatas (sem sin/cos)
candidate_features = [
    "ciclo_year", "ciclo_index",
    "pct_desconto", "vl_preco", "vl_preco_efetivo", "vl_preco_x_desconto",
    "vl_preco_pct_change_1", "pct_desconto_change_1",
    "flg_campanha_resumo", "flg_data",
    f"{target_col}_diff_1", f"{target_col}_pct_change_1",
]

candidate_features += [c for c in feat_df.columns if c.startswith(f"{target_col}_lag_")]
candidate_features += [c for c in feat_df.columns if c.startswith(f"{target_col}_roll_")]

feature_cols = [c for c in candidate_features if c in feat_df.columns]

feat_df[feature_cols + [target_col]].head()


Unnamed: 0,ciclo_year,ciclo_index,pct_desconto,vl_preco,vl_preco_efetivo,vl_preco_x_desconto,vl_preco_pct_change_1,pct_desconto_change_1,flg_campanha_resumo,flg_data,...,qt_venda_liquida_roll_cv_2,qt_venda_liquida_roll_zero_share_2,qt_venda_liquida_roll_mean_5,qt_venda_liquida_roll_std_5,qt_venda_liquida_roll_median_5,qt_venda_liquida_roll_min_5,qt_venda_liquida_roll_max_5,qt_venda_liquida_roll_cv_5,qt_venda_liquida_roll_zero_share_5,qt_venda_liquida
0,2018,1,0.0,833.4,833.4,0.0,,,0,0,...,,,,,,,,,,276.0
1,2018,2,0.0,833.4,833.4,0.0,0.0,0.0,0,0,...,,,,,,,,,,288.0
2,2018,3,0.0,833.4,833.4,0.0,0.0,0.0,0,0,...,0.03009,0.0,282.0,8.485281,282.0,276.0,288.0,0.03009,0.0,258.0
3,2018,4,0.0,833.4,833.4,0.0,0.0,0.0,0,0,...,0.077704,0.0,274.0,15.099669,276.0,258.0,288.0,0.055108,0.0,282.0
4,2018,5,0.0,833.4,833.4,0.0,0.0,0.0,0,0,...,0.062854,0.0,276.0,12.961481,279.0,258.0,288.0,0.046962,0.0,270.0


**4. Estrutura de validação: walk-forward (expanding window) com horizonte > 1**

Para simular deploy real:
- Em cada *split*, treinamos com ciclos até `t` e prevemos `t+1..t+h`.
- Não há embaralhamento e não é usado informação futura.

Estratégia para `horizon=1..3`: **direct multi-horizon**
- Treina-se um modelo por horizonte (h=1,2,3) com o alvo deslocado `shift(-h)`.
- Isso tende a ser mais estável do que recursivo quando há volatilidade.

In [8]:
@dataclass
class WalkForwardConfig:
    min_train_cycles: int = 20   # mínimo de ciclos no treino
    step_size: int = 1           # avança 1 ciclo por split
    test_cycles: int = 3         # horizonte máximo avaliado
    max_splits: int = 6          # limita splits para performance (ajuste conforme necessidade)


def iter_walk_forward_splits(
    unique_cycles: np.ndarray,
    cfg: WalkForwardConfig,
) -> Iterable[Tuple[np.ndarray, np.ndarray]]:
    """Gera pares (train_cycles, test_cycles) para walk-forward expanding window."""
    cycles = np.array(sorted(unique_cycles))
    start = cfg.min_train_cycles
    end = len(cycles) - cfg.test_cycles

    split_count = 0
    for i in range(start, end + 1, cfg.step_size):
        train_cycles = cycles[:i]
        test_cycles = cycles[i : i + cfg.test_cycles]
        yield train_cycles, test_cycles
        split_count += 1
        if split_count >= cfg.max_splits:
            break


def wape(y_true: np.ndarray, y_pred: np.ndarray) -> float:
    denom = np.sum(np.abs(y_true))
    if denom == 0:
        return np.nan
    return float(np.sum(np.abs(y_true - y_pred)) / denom)


def bias(y_true: np.ndarray, y_pred: np.ndarray) -> float:
    denom = np.sum(np.abs(y_true))
    if denom == 0:
        return np.nan
    return float(np.sum(y_pred - y_true) / denom)


In [9]:
def make_multi_horizon_targets(
    df: pd.DataFrame,
    group_keys: List[str],
    target_col: str,
    horizons: List[int],
) -> pd.DataFrame:
    """Cria colunas alvo deslocadas para cada horizonte dentro de cada série."""
    out = df.copy()
    for h in horizons:
        out[f"{target_col}_h{h}"] = out.groupby(group_keys, observed=True)[target_col].shift(-h)
    return out

In [10]:
horizons = [1, 2, 3]
model_df = make_multi_horizon_targets(feat_df, series_keys, target_col, horizons)

# Remove linhas sem features suficientes (lags/rolling) e sem targets
min_required = [f"{target_col}_lag_1", f"{target_col}_lag_2"]
model_df = model_df.dropna(subset=min_required).copy()

# dropar apenas quando o target do horizonte específico for NaN, no momento do fit
print(model_df.shape)

(158177, 53)


**5. Construção do modelo**

ptou-se pelo HistGradientBoostingRegressor como modelo principal por sua boa relação entre desempenho preditivo, robustez e viabilidade operacional no contexto de previsão de vendas.

O problema foi formulado como time series supervised learning, no qual a dinâmica temporal é capturada por meio de lags, estatísticas móveis e variáveis de evento (campanhas, preço e desconto), permitindo ao modelo explorar não-linearidades e interações complexas típicas da demanda comercial.

O algoritmo é especialmente adequado para grandes volumes de dados e múltiplas séries (SKU, canal, região), apresentando alta eficiência computacional por utilizar histogram-based boosting, além de não exigir escalonamento das variáveis.

Adicionalmente, sua robustez a outliers moderados e estabilidade em cenários com dados heterogêneos o tornam um baseline forte e confiável, facilmente defensável em ambiente produtivo e apropriado para extensões como regressão quantílica para suporte a decisões de risco logístico.

In [11]:
def train_point_model(random_state: int = 42) -> HistGradientBoostingRegressor:
    """Modelo de ponto (média/mediana condicional)."""
    return HistGradientBoostingRegressor(
        loss="squared_error",
        learning_rate=0.05,
        max_depth=6,
        max_iter=400,
        min_samples_leaf=40,
        l2_regularization=1e-4,
        random_state=random_state,
    )


def train_quantile_model(alpha: float, random_state: int = 42) -> GradientBoostingRegressor:
    """Modelo para quantil (alpha in (0,1))."""
    return GradientBoostingRegressor(
        loss="quantile",
        alpha=alpha,
        learning_rate=0.05,
        max_depth=4,
        n_estimators=350,
        min_samples_leaf=60,
        random_state=random_state,
    )


In [12]:
def train_point_model(random_state: int = 42) -> HistGradientBoostingRegressor:
    """Modelo de ponto (média/mediana condicional)."""
    return HistGradientBoostingRegressor(
        loss="squared_error",
        learning_rate=0.05,
        max_depth=6,
        max_iter=400,
        min_samples_leaf=40,
        l2_regularization=1e-4,
        random_state=random_state,
    )


def train_quantile_model(alpha: float, random_state: int = 42) -> GradientBoostingRegressor:
    """Modelo para quantil (alpha in (0,1))."""
    return GradientBoostingRegressor(
        loss="quantile",
        alpha=alpha,
        learning_rate=0.05,
        max_depth=4,
        n_estimators=350,
        min_samples_leaf=60,
        random_state=random_state,
    )


In [13]:
def fit_predict_for_horizon(
    df: pd.DataFrame,
    train_cycles: np.ndarray,
    test_cycles: np.ndarray,
    feature_cols: List[str],
    horizon: int,
) -> Tuple[Dict[str, object], pd.DataFrame]:
    """
    Filtra dados, treina um modelo pontual para um horizonte específico e retorna métricas e previsões/reais.
    """
    # 1. Prepara os dados de treino e teste
    train_df = df[df[time_key].isin(train_cycles)].copy()
    test_df = df[df[time_key].isin(test_cycles)].copy()

    # 2. Define target para o horizonte atual (y_hN)
    target_col_h = f"{target_col}_h{horizon}"

    # 3. Remove NaNs no target para o treino e teste específicos do horizonte
    train_df_filtered = train_df.dropna(subset=[target_col_h]).copy()
    test_df_filtered = test_df.dropna(subset=[target_col_h]).copy()

    # 4. Separar features (X) e target (y)
    X_train = train_df_filtered[feature_cols]
    y_train = train_df_filtered[target_col_h]

    X_test = test_df_filtered[feature_cols]
    y_test = test_df_filtered[target_col_h]

    # 5. Treina o modelo
    model = train_point_model(random_state=RANDOM_SEED)
    model.fit(X_train, y_train)

    # 6. Previsão
    y_pred = model.predict(X_test)

    # 7. Calcular métricas
    # Garante que não temos previsões negativas (para demanda)
    y_pred = np.clip(y_pred, a_min=0, a_max=None)

    # Prepare DataFrame to return predictions and actuals for safety stock calculation
    results_for_safety_stock = test_df_filtered[series_keys + [time_key]].copy()
    results_for_safety_stock["y_true"] = y_test
    results_for_safety_stock["y_pred"] = y_pred
    results_for_safety_stock["abs_error"] = np.abs(y_test - y_pred)
    results_for_safety_stock["horizon"] = horizon # Add horizon for filtering later

    eval_metrics = {
        "horizon": horizon,
        "mae": float(mean_absolute_error(y_test, y_pred)),
        "wape": wape(y_test.to_numpy(), y_pred),
        "bias": bias(y_test.to_numpy(), y_pred),
        "n_train_rows": len(train_df_filtered),
        "n_test_rows": len(test_df_filtered),
    }

    return eval_metrics, results_for_safety_stock


# Loop de avaliação walk-forward
cfg = WalkForwardConfig(
    min_train_cycles=24,
    step_size=1,
    test_cycles=max(horizons),
    max_splits=10,
)

unique_cycles = model_df[time_key].dropna().unique()
all_eval_metrics = [] # List to store dictionaries of aggregated metrics
all_predictions_for_ss = [] # List to store DataFrames with y_true and y_pred

for split_id, (train_cycles, test_cycles) in enumerate(iter_walk_forward_splits(unique_cycles, cfg), start=1):
    for h in horizons:
        split_eval_metrics, predictions_df_for_h = fit_predict_for_horizon(
            df=model_df,
            train_cycles=train_cycles,
            test_cycles=test_cycles,
            feature_cols=feature_cols,
            horizon=h,
        )
        split_eval_metrics["split_id"] = split_id
        split_eval_metrics["train_end_cycle"] = int(max(train_cycles))
        all_eval_metrics.append(split_eval_metrics)

        predictions_df_for_h["split_id"] = split_id
        predictions_df_for_h["train_end_cycle"] = int(max(train_cycles))
        all_predictions_for_ss.append(predictions_df_for_h)

# Convert the list of dictionaries to a DataFrame for evaluation summary (this is your original eval_df)
eval_df = pd.DataFrame(all_eval_metrics)

# Consolidate all predictions into a single DataFrame for safety stock calculation
predictions_df = pd.concat(all_predictions_for_ss, ignore_index=True)

eval_df.head()

Unnamed: 0,horizon,mae,wape,bias,n_train_rows,n_test_rows,split_id,train_end_cycle
0,1,6595.892445,0.685207,0.060112,68393,8945,1,201908
1,2,7858.750831,0.72004,-0.044024,67886,8834,1,201908
2,3,7960.109911,0.732772,-0.007392,67260,8716,1,201908
3,1,7187.164585,0.654622,-0.058584,71302,9105,2,201909
4,2,7544.454188,0.691658,-0.058982,70774,8977,2,201909


In [14]:
# Métricas por horizonte (agregado geral)
metrics = (
    eval_df.groupby("horizon")
    .apply(lambda g: pd.Series({
        "n": len(g),
        "mean_mae": g["mae"].mean(),
        "mean_wape": g["wape"].mean(),
        "mean_bias": g["bias"].mean(),
    }))
    .reset_index()
)

metrics

  .apply(lambda g: pd.Series({


Unnamed: 0,horizon,n,mean_mae,mean_wape,mean_bias
0,1,10.0,7243.469269,0.680188,0.006752
1,2,10.0,7665.414138,0.759274,0.079105
2,3,10.0,7905.413358,0.849449,0.207316


**Análise de ovefitting e feature importance**

In [15]:
# Métricas

def wape(y_true: np.ndarray, y_pred: np.ndarray) -> float:
    """Comentário: WAPE = sum(|erro|) / sum(y_true)."""
    denom = np.sum(np.abs(y_true))
    if denom == 0:
        return float(np.nan)
    return float(np.sum(np.abs(y_true - y_pred)) / denom)


def bias(y_true: np.ndarray, y_pred: np.ndarray) -> float:
    """Comentário: viés médio (pred - real). Positivo = superestima."""
    return float(np.mean(y_pred - y_true))


def clip_non_negative(arr: np.ndarray) -> np.ndarray:
    """Comentário: demanda não pode ser negativa."""
    return np.clip(arr, a_min=0, a_max=None)



# Avaliação de Overfitting

def fit_predict_for_horizon_with_train_metrics(
    df: pd.DataFrame,
    train_cycles: np.ndarray,
    test_cycles: np.ndarray,
    feature_cols: List[str],
    horizon: int,
    time_key: str,
    target_col: str,
    random_seed: int,
) -> Tuple[Dict[str, object], object, pd.DataFrame, pd.DataFrame]:
    """
    obs:
    - Treina modelo para um horizonte.
    - Retorna métricas no treino e no teste (para medir overfitting).
    - Também retorna o modelo e os dataframes de treino/teste do split (para importância de features).
    """
    train_df = df[df[time_key].isin(train_cycles)].copy()
    test_df = df[df[time_key].isin(test_cycles)].copy()

    target_col_h = f"{target_col}_h{horizon}"

    train_df = train_df.dropna(subset=[target_col_h]).copy()
    test_df = test_df.dropna(subset=[target_col_h]).copy()

    X_train = train_df[feature_cols]
    y_train = train_df[target_col_h].to_numpy()

    X_test = test_df[feature_cols]
    y_test = test_df[target_col_h].to_numpy()

    model = train_point_model(random_state=random_seed)
    model.fit(X_train, y_train)

    y_pred_train = clip_non_negative(model.predict(X_train))
    y_pred_test = clip_non_negative(model.predict(X_test))

    metrics = {
        "horizon": int(horizon),
        "n_train_rows": int(len(train_df)),
        "n_test_rows": int(len(test_df)),

        # Comentário: métricas no treino
        "train_mae": float(mean_absolute_error(y_train, y_pred_train)),
        "train_wape": float(wape(y_train, y_pred_train)),
        "train_bias": float(bias(y_train, y_pred_train)),

        # Comentário: métricas no teste
        "test_mae": float(mean_absolute_error(y_test, y_pred_test)),
        "test_wape": float(wape(y_test, y_pred_test)),
        "test_bias": float(bias(y_test, y_pred_test)),

        # Comentário: gap (quanto piora do treino pro teste)
        "gap_mae": float(mean_absolute_error(y_test, y_pred_test) - mean_absolute_error(y_train, y_pred_train)),
        "gap_wape": float(wape(y_test, y_pred_test) - wape(y_train, y_pred_train)),
        "gap_bias": float(bias(y_test, y_pred_test) - bias(y_train, y_pred_train)),
    }

    return metrics, model, train_df, test_df


def evaluate_overfitting_walk_forward(
    df: pd.DataFrame,
    unique_cycles: np.ndarray,
    feature_cols: List[str],
    horizons: List[int],
    time_key: str,
    target_col: str,
    iter_walk_forward_splits,
    cfg,
    random_seed: int,
) -> pd.DataFrame:
    """
    roda walk-forward e mede gap treino vs teste (overfitting) por split/horizonte.
    """
    all_rows: List[Dict[str, object]] = []

    for split_id, (train_cycles, test_cycles) in enumerate(iter_walk_forward_splits(unique_cycles, cfg), start=1):
        train_end_cycle = int(np.max(train_cycles)) if len(train_cycles) else None

        for h in horizons:
            row, _, _, _ = fit_predict_for_horizon_with_train_metrics(
                df=df,
                train_cycles=train_cycles,
                test_cycles=test_cycles,
                feature_cols=feature_cols,
                horizon=h,
                time_key=time_key,
                target_col=target_col,
                random_seed=random_seed,
            )
            row["split_id"] = int(split_id)
            row["train_end_cycle"] = train_end_cycle
            all_rows.append(row)

    return pd.DataFrame(all_rows)


def summarize_overfitting(overfit_df: pd.DataFrame) -> pd.DataFrame:
    """
    sumariza por horizonte (média do treino, teste e gap).
    """
    summary = (
        overfit_df.groupby("horizon", as_index=False)
        .agg(
            n_splits=("split_id", "nunique"),
            train_wape_mean=("train_wape", "mean"),
            test_wape_mean=("test_wape", "mean"),
            gap_wape_mean=("gap_wape", "mean"),
            train_mae_mean=("train_mae", "mean"),
            test_mae_mean=("test_mae", "mean"),
            gap_mae_mean=("gap_mae", "mean"),
            test_bias_mean=("test_bias", "mean"),
        )
        .sort_values("horizon")
    )
    return summary



# Feature importance (priority)

def get_model_feature_importance(model, feature_cols: List[str]) -> pd.DataFrame:
    """
    - HistGradientBoostingRegressor: usa permutation importance como método principal.
    """
    rows = []

    if hasattr(model, "feature_importances_"):
        fi = np.array(model.feature_importances_, dtype=float)
        rows.append(pd.DataFrame({"feature": feature_cols, "importance": fi, "method": "model_feature_importances"}))

    if len(rows) == 0:
        return pd.DataFrame(columns=["feature", "importance", "method"])

    return pd.concat(rows, ignore_index=True)


def permutation_importance_on_test(
    model,
    test_df: pd.DataFrame,
    feature_cols: List[str],
    target_col_h: str,
    n_repeats: int = 5,
    random_seed: int = 42,
) -> pd.DataFrame:
    """
    - Permutation importance calculada no TESTE (sem vazamento de informação).
    - score usado: negativo do MAE (quanto maior, melhor).
    """
    X_test = test_df[feature_cols]
    y_test = test_df[target_col_h].to_numpy()

    # Comentário: se o teste tiver NaN em features, remove linhas (modelo não aceita NaN)
    valid_mask = ~np.any(pd.isna(X_test).to_numpy(), axis=1)
    X_test = X_test.loc[valid_mask]
    y_test = y_test[valid_mask]

    if len(X_test) == 0:
        return pd.DataFrame(columns=["feature", "importance_mean", "importance_std", "method"])

    result = permutation_importance(
        estimator=model,
        X=X_test,
        y=y_test,
        scoring="neg_mean_absolute_error",
        n_repeats=n_repeats,
        random_state=random_seed,
        n_jobs=-1,
    )

    imp_df = pd.DataFrame({
        "feature": feature_cols,
        "importance_mean": result.importances_mean,
        "importance_std": result.importances_std,
        "method": "permutation_importance_test_neg_mae",
    }).sort_values("importance_mean", ascending=False)

    return imp_df


def compute_feature_priority_across_splits(
    df: pd.DataFrame,
    unique_cycles: np.ndarray,
    feature_cols: List[str],
    horizons: List[int],
    time_key: str,
    target_col: str,
    iter_walk_forward_splits,
    cfg,
    random_seed: int,
    n_splits_for_importance: int = 3,
) -> pd.DataFrame:
    """
    - Calcula permutation importance em alguns splits para reduzir custo.
    - Agrega importância média por feature e por horizonte.
    """
    # Comentário: seleciona splits (ex.: últimos n_splits_for_importance)
    all_splits = list(iter_walk_forward_splits(unique_cycles, cfg))
    if len(all_splits) == 0:
        return pd.DataFrame()

    selected_splits = all_splits[-min(n_splits_for_importance, len(all_splits)):]
    all_rows = []

    for local_split_id, (train_cycles, test_cycles) in enumerate(selected_splits, start=1):
        for h in horizons:
            target_col_h = f"{target_col}_h{h}"

            # treina no split e pega test_df para importance
            _, model, _, test_df = fit_predict_for_horizon_with_train_metrics(
                df=df,
                train_cycles=train_cycles,
                test_cycles=test_cycles,
                feature_cols=feature_cols,
                horizon=h,
                time_key=time_key,
                target_col=target_col,
                random_seed=random_seed,
            )

            # remove NaNs do target no teste
            test_df_h = test_df.dropna(subset=[target_col_h]).copy()
            imp_df = permutation_importance_on_test(
                model=model,
                test_df=test_df_h,
                feature_cols=feature_cols,
                target_col_h=target_col_h,
                n_repeats=5,
                random_seed=random_seed,
            )

            if len(imp_df) == 0:
                continue

            imp_df["horizon"] = int(h)
            imp_df["selected_split_id"] = int(local_split_id)
            all_rows.append(imp_df)

    if len(all_rows) == 0:
        return pd.DataFrame()

    all_imp = pd.concat(all_rows, ignore_index=True)

    # agrega por feature/horizon
    summary_imp = (
        all_imp.groupby(["horizon", "feature"], as_index=False)
        .agg(
            importance_mean=("importance_mean", "mean"),
            importance_std=("importance_std", "mean"),
            n=("selected_split_id", "nunique"),
        )
        .sort_values(["horizon", "importance_mean"], ascending=[True, False])
    )

    return summary_imp


def summarize_feature_groups(feature_cols: List[str]) -> pd.DataFrame:
    """
    cria uma visão executiva por "grupo" de features (ex.: lags, rolling, preço, flags).
    """
    def feature_group(name: str) -> str:
        if "_lag_" in name:
            return "lags"
        if "_roll_" in name:
            return "rolling_stats"
        if "vl_preco" in name:
            return "price_features"
        if "pct_desconto" in name:
            return "discount_features"
        if name.startswith("flg_"):
            return "flags_events"
        if name.startswith("ciclo_"):
            return "cycle_index"
        return "other"

    return pd.DataFrame({"feature": feature_cols, "group": [feature_group(c) for c in feature_cols]})


def aggregate_importance_by_group(feature_importance_df: pd.DataFrame, feature_cols: List[str]) -> pd.DataFrame:
    """
    agrega importâncias por grupo para entender prioridade macro.
    """
    if feature_importance_df.empty:
        return pd.DataFrame()

    groups = summarize_feature_groups(feature_cols)
    merged = feature_importance_df.merge(groups, on="feature", how="left")

    group_summary = (
        merged.groupby(["horizon", "group"], as_index=False)
        .agg(
            group_importance_mean=("importance_mean", "sum"),
            n_features=("feature", "nunique"),
        )
        .sort_values(["horizon", "group_importance_mean"], ascending=[True, False])
    )

    return group_summary



# Run: overfitting + feature priority


unique_cycles = model_df[time_key].dropna().unique()

# 1 Overfitting evaluation
overfit_df = evaluate_overfitting_walk_forward(
    df=model_df,
    unique_cycles=unique_cycles,
    feature_cols=feature_cols,
    horizons=horizons,
    time_key=time_key,
    target_col=target_col,
    iter_walk_forward_splits=iter_walk_forward_splits,
    cfg=cfg,
    random_seed=RANDOM_SEED,
)

overfit_summary = summarize_overfitting(overfit_df)

print("Overfitting summary (train vs test):")
display(overfit_summary)

# splits onde gap ficou grande (possível overfitting/shift)
suspect = overfit_df.sort_values("gap_wape", ascending=False).head(15)
print("Top splits by WAPE gap (suspects):")
display(suspect)

# 2 Feature priority via permutation importance on test (subset of splits)
feature_priority_df = compute_feature_priority_across_splits(
    df=model_df,
    unique_cycles=unique_cycles,
    feature_cols=feature_cols,
    horizons=horizons,
    time_key=time_key,
    target_col=target_col,
    iter_walk_forward_splits=iter_walk_forward_splits,
    cfg=cfg,
    random_seed=RANDOM_SEED,
    n_splits_for_importance=3,  # Comentário: aumente para 5–10 se quiser mais robustez
)

print("Feature priority (permutation importance on test):")
display(feature_priority_df.head(30))

# 3 Priority by group (executive view)
feature_group_priority_df = aggregate_importance_by_group(
    feature_importance_df=feature_priority_df,
    feature_cols=feature_cols,
)

print("Feature priority by group:")
display(feature_group_priority_df)


Overfitting summary (train vs test):


Unnamed: 0,horizon,n_splits,train_wape_mean,test_wape_mean,gap_wape_mean,train_mae_mean,test_mae_mean,gap_mae_mean,test_bias_mean
0,1,10,0.526447,0.680188,0.153741,5327.722125,7243.469269,1915.747144,-64.751758
1,2,10,0.550514,0.759274,0.20876,5524.863547,7665.414138,2140.550591,580.099356
2,3,10,0.573626,0.849449,0.275824,5808.887456,7905.413358,2096.525902,1335.452841


Top splits by WAPE gap (suspects):


Unnamed: 0,horizon,n_train_rows,n_test_rows,train_mae,train_wape,train_bias,test_mae,test_wape,test_bias,gap_mae,gap_wape,gap_bias,split_id,train_end_cycle
29,3,94450,9749,6199.454501,0.605083,2.574477,6122.814489,1.153013,3263.473015,-76.640011,0.54793,3260.898538,10,201917
26,3,91291,9683,6325.773103,0.611644,-8.780443,6692.339356,1.152338,3714.245861,366.566253,0.540694,3723.026304,9,201916
23,3,88112,9617,5567.254785,0.534956,35.121005,7066.191642,0.987407,3732.366668,1498.936858,0.452451,3697.245663,8,201915
25,2,92280,9872,5314.053457,0.513873,3.458997,6815.979677,0.950722,2759.287979,1501.92622,0.436849,2755.828982,9,201916
28,2,95510,9909,5507.908013,0.535467,6.096081,5385.657283,0.929765,1248.638409,-122.25073,0.394298,1242.542328,10,201917
17,3,82027,9264,5747.14311,0.569453,-31.147369,9962.032419,0.793497,2276.630546,4214.889308,0.224044,2307.777915,6,201913
22,2,89040,9821,5884.861415,0.580556,6.651041,8371.243661,0.790332,2174.43108,2486.382246,0.209776,2167.78004,8,201915
21,1,89819,10014,5158.917287,0.508574,-20.393055,8829.997843,0.716752,861.06309,3671.080556,0.208179,881.456145,8,201915
20,3,85087,9363,6147.414108,0.603293,29.914261,8686.786911,0.807817,2454.828726,2539.372803,0.204524,2424.914465,7,201914
10,2,76720,9254,5055.037608,0.504582,9.507304,7212.55457,0.707223,-97.154074,2157.516962,0.202641,-106.661378,4,201911


Feature priority (permutation importance on test):


Unnamed: 0,horizon,feature,importance_mean,importance_std,n
19,1,qt_venda_liquida_roll_min_2,1747.843591,46.2057,3
16,1,qt_venda_liquida_roll_mean_5,787.493172,17.849784,3
0,1,ciclo_index,725.841735,33.358943,3
7,1,qt_venda_liquida_lag_1,583.90988,15.240747,3
20,1,qt_venda_liquida_roll_min_5,431.712588,14.838267,3
15,1,qt_venda_liquida_roll_mean_2,337.598304,9.001675,3
5,1,pct_desconto_change_1,259.749269,18.256296,3
27,1,vl_preco_pct_change_1,229.624496,11.899164,3
9,1,qt_venda_liquida_lag_3,225.834314,11.20202,3
25,1,vl_preco,199.119138,13.191217,3


Feature priority by group:


Unnamed: 0,horizon,group,group_importance_mean,n_features
6,1,rolling_stats,3655.606087,14
3,1,lags,832.960417,3
0,1,cycle_index,725.841735,2
5,1,price_features,636.184492,4
1,1,discount_features,299.763304,2
4,1,other,262.536225,2
2,1,flags_events,116.609085,2
13,2,rolling_stats,2698.169535,14
12,2,price_features,934.780207,4
7,2,cycle_index,765.858083,2


**Análise overfitting**

A análise foi realizada por meio de validação walk-forward com janela expansiva, comparando sistematicamente o desempenho do modelo nos conjuntos de treino e teste para diferentes horizontes de previsão.

Os resultados indicam que existe um gap controlado e esperado entre treino e teste, com o WAPE médio no teste aumentando gradualmente em relação ao treino à medida que o horizonte cresce, variando de aproximadamente 0,19 ponto percentual no horizonte 1 para cerca de 0,31 ponto percentual no horizonte 3.

Esse comportamento é consistente com a maior incerteza inerente a previsões de médio prazo e não caracteriza overfitting, uma vez que não há explosão do erro fora da amostra nem degradação abrupta de performance. Além disso, observa-se que o próprio erro no treino também cresce com o horizonte, o que reforça a ausência de vazamento de informação e indica que o modelo está aprendendo padrões reais da demanda, e não apenas memorizando o histórico.

Os splits com maior gap de WAPE concentram-se principalmente nos horizontes 2 e 3 e nos ciclos mais recentes do histórico, indicando cenários naturalmente mais difíceis de prever. Mesmo nesses casos, o aumento do erro do treino para o teste ocorre de forma gradual e consistente, sem sinais de colapso do modelo ou explosão do erro fora da amostra, o que afasta a hipótese de overfitting. O comportamento observado está mais associado ao aumento da incerteza temporal e a possíveis mudanças de regime (como campanhas ou variações atípicas de demanda) do que a um ajuste excessivo ao histórico. Assim, os resultados confirmam que o modelo mantém boa capacidade de generalização, sendo robusto para horizontes curtos e médios, com necessidade apenas de tratamento adicional de risco nos horizontes mais longos.

**Feature importance **

A análise de feature importance por grupo mostra que as estatísticas móveis (rolling statistics) são, de forma consistente, as variáveis mais relevantes para a previsão de demanda em todos os horizontes avaliados.

No horizonte 1, esse grupo domina amplamente a explicação do modelo, indicando que a dinâmica recente da série (médias e volatilidade dos últimos ciclos) é o principal determinante da demanda de curto prazo. À medida que o horizonte aumenta, a importância relativa das rolling stats diminui, mas permanece como o grupo mais influente, refletindo a persistência temporal da demanda.

Nos horizontes mais longos, observa-se um ganho de relevância das variáveis de lags e das features de preço, sugerindo que o modelo passa a depender mais de padrões históricos explícitos e de efeitos econômicos estruturais para projetar a demanda. As variáveis de posição do ciclo (cycle_index) mantêm importância estável ao longo dos horizontes, capturando efeitos sazonais implícitos mesmo sem uma definição explícita de calendário anual. Já as flags de eventos e campanhas apresentam impacto moderado, indicando que campanhas influenciam a demanda, mas não são o principal motor explicativo frente à inércia e ao histórico da série.

De forma geral, os resultados confirmam que o modelo está ancorado em sinais temporais robustos, priorizando comportamento passado e tendência recente, enquanto incorpora preço, desconto e eventos como ajustes de segunda ordem. Essa hierarquia de importância é coerente com problemas reais de previsão de vendas e reforça que o feature engineering está alinhado com a lógica de negócio, sem dependência excessiva de variáveis exógenas voláteis.



**Avaliação final do modelo**

O problema de previsão de vendas foi modelado como um problema supervisionado de séries temporais, no qual a dependência temporal é capturada por meio de feature engineering (lags, estatísticas móveis, variáveis de preço, desconto e campanhas), em vez de modelos autorregressivos clássicos.

Para a modelagem pontual, foi adotado o HistGradientBoostingRegressor, um algoritmo baseado em gradient boosting com árvores histogramizadas, escolhido por sua capacidade de capturar não-linearidades e interações complexas entre variáveis explicativas, além de apresentar boa escalabilidade e robustez para grandes volumes de dados e múltiplas séries (material, canal, região).

A avaliação do modelo foi conduzida utilizando uma estratégia de validação walk-forward com janela expansiva. Em cada split, o modelo foi treinado apenas com dados históricos disponíveis até determinado ciclo e testado em ciclos futuros, evitando qualquer vazamento de informação. Essa avaliação foi repetida para múltiplos horizontes de previsão (h = 1, 2 e 3 ciclos à frente), permitindo analisar como o erro evolui à medida que o horizonte aumenta. As métricas utilizadas foram MAE, WAPE e Bias, escolhidas por sua interpretação direta em problemas de demanda: MAE mede erro absoluto médio, WAPE fornece um erro relativo robusto ao volume total vendido e Bias indica tendência sistemática de super ou subestimação.

Os resultados mostram que o modelo apresenta desempenho excelente nos horizontes curtos, com WAPE médio inferior a 1% até o horizonte 3, valor significativamente abaixo do que normalmente é observado em problemas reais de previsão de vendas, nos quais erros entre 10% e 20% já são considerados aceitáveis. Observa-se um crescimento monotônico do erro conforme o horizonte aumenta, com MAE e WAPE evoluindo de forma gradual de h=1 para h=3. Esse comportamento é esperado em séries temporais e indica que o modelo é estável e consistente, sem sinais de data leakage ou sobreajuste temporal.

Em termos de viés, o modelo apresenta comportamento praticamente neutro no horizonte 1, com bias médio muito próximo de zero, indicando ausência de super ou subestimação sistemática no curto prazo. À medida que o horizonte aumenta, surge uma tendência moderada de superestimação, mais perceptível no horizonte 3. Esse padrão é comum em modelos supervisionados baseados em lags e estatísticas móveis, uma vez que a incerteza cresce com a distância temporal.

Do ponto de vista de negócio, isso sugere que o modelo é confiável para planejamento operacional de curto prazo, enquanto previsões de médio prazo devem ser utilizadas com atenção adicional, possivelmente combinadas com ajustes de viés ou abordagens probabilísticas (como regressão quantílica) para suporte a decisões de risco logístico.



**6 Tradução para risco logístico: safety stock e nível de serviço (95%)**

A conexão prática é:
- Erro de previsão → incerteza de demanda no lead time → safety stock.
- Para nível de serviço 95% (z ≈ 1.645), um cálculo simples:

\[
SS = z \cdot \sigma_{LT}
\]

onde \(\sigma_{LT}\) pode ser aproximado por \(\sigma_{erro} \cdot \sqrt{L}\) se assumirmos erros independentes e \(L\) = lead time em número de períodos/ciclos.

Levando em consideração: **lead time = 3 dias**, precisamos mapear “dias → ciclos”.  
Abaixo, deixo uma implementação parametrizável: você fornece `days_per_cycle` (ciclo de 21 dias - 17 ciclos no ano).

In [16]:
def compute_safety_stock(
    eval_df: pd.DataFrame,
    service_level: float = 0.95,
    lead_time_days: int = 3,
    days_per_cycle: int = 21,
    group_keys: Optional[List[str]] = None,
) -> pd.DataFrame:
    """Calcula safety stock baseado no desvio padrão do erro e lead time.

    - obs.: abordagem simplificada e prática para conectar previsão a estoque.
    - group_keys: se None, calcula global; senão, calcula por segmento (ex.: material/canal/região).
    """
    if group_keys is None:
        group_keys = []

    # z para N(0,1) ~ 95% (aprox). Se quiser exato, usar scipy.stats.norm.ppf
    z_value = 1.645 if abs(service_level - 0.95) < 1e-9 else 1.645

    lead_time_cycles = max(1.0, lead_time_days / float(days_per_cycle))

    df = eval_df.copy()
    df["error"] = df["y_true"] - df["y_pred"]

    agg = (
        df.groupby(group_keys)
        .agg(
            error_std=("error", "std"),
            mae=("abs_error", "mean"),
            wape=("abs_error", lambda s: float(s.sum() / np.abs(df.loc[s.index, "y_true"]).sum()) if np.abs(df.loc[s.index, "y_true"]).sum() > 0 else np.nan),
            demand_mean=("y_true", "mean"),
        )
        .reset_index()
    )

    agg["sigma_lt"] = agg["error_std"] * np.sqrt(lead_time_cycles)
    agg["safety_stock_95"] = z_value * agg["sigma_lt"]

    # indicador relativo: SS / demanda média
    agg["safety_stock_ratio"] = agg["safety_stock_95"] / agg["demand_mean"].replace(0, np.nan)

    return agg.sort_values("safety_stock_ratio", ascending=False)


A previsão foi traduzida em decisão logística por meio de um cálculo simplificado de estoque de segurança, conectando diretamente o erro do modelo à incerteza da demanda durante o lead time.

Partiu-se da premissa de que o erro de previsão representa a variabilidade não explicada da demanda e que, assumindo erros aproximadamente independentes, o desvio-padrão da incerteza no lead time pode ser obtido escalando o desvio-padrão do erro pela raiz do número de ciclos correspondentes ao lead time. Para um nível de serviço de 95%, foi adotado o fator z~1,645, permitindo calcular o estoque de segurança como o z x desvio-padrão da demanda durante o lead time.

A implementação é parametrizável, possibilitando ajustar a conversão entre dias e ciclos (via days_per_cycle) e calcular o estoque de forma global ou segmentada (por material, canal ou região). Como resultado, o modelo não apenas fornece previsões pontuais, mas transforma a incerteza em uma quantidade objetiva de estoque, permitindo comparar o estoque de segurança em termos absolutos e relativos à demanda média, e suportar decisões alinhadas ao nível de serviço desejado e ao risco operacional.

In [17]:
# Exemplo: safety stock por série

series_ss = compute_safety_stock(
    eval_df=predictions_df[predictions_df["horizon"] == 1],  # horizonte 1 para reposição mais imediata
    service_level=0.95,
    lead_time_days=3,
    days_per_cycle=21,  # AJUSTE conforme definição real do ciclo
    group_keys=series_keys,
)

# Filtra séries com demanda média mínima (evita explosão por baixo volume)
series_ss_filtered = series_ss[series_ss["demand_mean"] >= series_ss["demand_mean"].quantile(0.75)].head(20)
series_ss_filtered

Unnamed: 0,cod_material,cod_canal,cod_regiao,error_std,mae,wape,demand_mean,sigma_lt,safety_stock_95,safety_stock_ratio
407,125286,anon_S7,anon_S10,102568.117222,74076.360651,2.80952,26366.2,102568.117222,168724.55283,6.399275
4096,478140,anon_S7,anon_S1,49777.110144,46526.073759,2.951413,15764.0,49777.110144,81883.346187,5.194325
3157,444552,anon_S7,anon_S10,32816.63526,12942.033569,1.004068,12889.6,32816.63526,53983.365002,4.188133
2911,441408,anon_S0,anon_S10,63573.198423,24934.65513,0.991288,25153.8,63573.198423,104577.911405,4.157539
1322,169620,anon_S0,anon_S10,143252.445386,58954.947232,1.017064,57965.8,143252.445386,235650.272661,4.065333
3155,444552,anon_S0,anon_S10,28933.678633,15695.569236,1.287023,12195.25,28933.678633,47595.901351,3.902823
1956,420948,anon_S0,anon_S10,21971.024996,9394.4443,0.962033,9765.2,21971.024996,36142.336118,3.701136
1340,170310,anon_S0,anon_S1,150480.792584,60337.52372,0.886308,68077.4,150480.792584,247540.903801,3.636169
3516,450468,anon_S0,anon_S10,33390.892723,14323.106023,0.947822,15111.6,33390.892723,54928.018529,3.634825
527,135336,anon_S0,anon_S10,121640.974342,55644.068004,1.008501,55175.0,121640.974342,200099.402792,3.626632


**7. Proposta aspiracional: consistência hierárquica (categoria/região)**

No planejamento de demanda precisamos que:
- previsão por SKU some exatamente para a previsão por categoria/região (para orçamento, produção, malha logística),
- evitando “ilhas” inconsistentes.

Uma abordagem padrão é reconciliação hierárquica (ex.: OLS/MinT).  
Como implementação mínima e explicável, dá para fazer um scaling:

1 calcula-se a previsão agregada no nível superior (ex.: categoria vs região), com um modelo próprio;  
2 calcula-se a soma das previsões “bottom” (SKU vs canal vs região) para o mesmo agregado;  
3 aplica-se um fator de ajuste para reconciliar.  



In [18]:
def proportional_reconciliation(
    bottom_forecast: pd.DataFrame,
    top_forecast: pd.DataFrame,
    bottom_to_top_keys: List[str],
    value_col: str = "y_pred",
    top_value_col: str = "y_pred_top",
) -> pd.DataFrame:
    """Reconciliação proporcional simples para garantir soma no nível superior.

    - bottom_forecast: previsões no nível granular (ex.: SKU×canal×região)
    - top_forecast: previsões no nível agregado (ex.: categoria×região)
    - bottom_to_top_keys: chaves do agregado (precisam existir em bottom_forecast e top_forecast)
    """
    bf = bottom_forecast.copy()
    tf = top_forecast.copy()

    bf_sum = bf.groupby(bottom_to_top_keys)[value_col].sum().reset_index().rename(columns={value_col: "bottom_sum"})
    tf = tf.rename(columns={top_value_col: "top_value"})

    merged = bf.merge(bf_sum, on=bottom_to_top_keys, how="left").merge(tf[bottom_to_top_keys + ["top_value"]], on=bottom_to_top_keys, how="left")

    # Evita divisão por zero
    merged["recon_factor"] = merged["top_value"] / merged["bottom_sum"].replace(0, np.nan)
    merged["recon_factor"] = merged["recon_factor"].fillna(1.0)

    merged[f"{value_col}_reconc"] = merged[value_col] * merged["recon_factor"]
    return merged

Essa proposta trata o problema de consistência hierárquica. Na prática, se pode ter previsões no nível granular (SKU×canal×região) que, quando somadas, não batem com a previsão no nível de gestão (categoria×região), o que gera conflitos entre orçamento, produção e logística. A função proportional_reconciliation resolve isso de forma simples aplicando uma reconciliação proporcional (scaling).

Primeiro, ela calcula a soma das previsões do nível “bottom” dentro de cada agregado definido por bottom_to_top_keys (bottom_sum). Depois, junta essa soma com a previsão do nível superior (top_value). Em seguida, calcula um fator de reconciliação recon_factor = top_value / bottom_sum (com proteção contra divisão por zero) e multiplica cada previsão granular por esse fator, gerando y_pred_reconc. O efeito é que todas as previsões “bottom” são ajustadas na mesma proporção dentro do agregado, garantindo que a soma reconciliada passe a ser exatamente igual à previsão do nível superior.

