# MLflow Structure

In [1]:
import os, json, math, tempfile
from dataclasses import dataclass
from typing import Dict, List, Iterator, Tuple, Any

import numpy as np
import pandas as pd

from sklearn.model_selection import train_test_split
from sklearn.metrics import r2_score, mean_squared_error, mean_absolute_error
from sklearn.linear_model import LinearRegression, Ridge, ElasticNet

import mlflow

import statsmodels.api as sm



In [2]:
import os
# import mlflow

TRACK_DIR = os.path.abspath("./mlruns_rga_test")
mlflow.set_tracking_uri(f"file:///{TRACK_DIR.replace(os.sep, '/')}") # for local testing. not needed for Databricks


In [44]:
import pandas as pd
import numpy as np

def make_tiny_df():
    weeks = pd.date_range("2023-01-01", "2025-06-30", freq="W-SUN")
    n_weeks = len(weeks)
    store_ids = list(range(1, 11))  # 10 stores: 1, 2, ..., 10
    n_stores = len(store_ids)

    rng = np.random.default_rng(42)
    n = n_weeks * n_stores

    # Each store has the same week_start: cross product of store_id x week_start
    df = pd.DataFrame({
        "store_id": np.repeat(store_ids, n_weeks),
        "week_start": np.tile(weeks, n_stores),
        # target: raw GC (no log)
        "GC": rng.uniform(100, 10000, n),
    })

    # digital_promo features: within 0-1
    for c in ['digital_promo_1', 'digital_promo_2', 'digital_promo_3', 'digital_promo_4', 'digital_promo_5']:
        df[c] = rng.uniform(0, 1, n)

    # media features: within 100-100000
    for c in ['media_1', 'media_2', 'media_3', 'media_4', 'media_5']:
        df[c] = rng.uniform(100, 100_000, n)

    return df

df = make_tiny_df()


Log strategy

In [39]:
import numpy as np
import pandas as pd
from typing import List, Dict, Tuple, Optional


def _compute_log_offset(
    s: pd.Series,
    strategy: str = "one",
    eps: float = 1e-6,
) -> float:
    """
    Determine offset for log transform.
    
    strategy:
      - "one": offset = 1.0
      - "median": offset = eps * median(positive values)
    """
    if strategy == "one":
        return 1.0

    if strategy == "median":
        pos = s[s > 0]
        if len(pos) == 0:
            return 1.0
        return eps * float(pos.median())

    raise ValueError(f"Unknown offset strategy: {strategy}")


In [40]:
def add_log_features(
    df: pd.DataFrame,
    features: List[str],
    offset_strategy: str = "one",
    eps: float = 1e-6,
    drop_negative: bool = False,
) -> Tuple[pd.DataFrame, Dict[str, Dict]]:
    """
    Create log-transformed features safely.

    Parameters
    ----------
    df : pandas DataFrame
    features : list of column names to transform
    offset_strategy : "one" or "median"
    eps : used only if offset_strategy="median"
    drop_negative : if True, negative values are set to NaN

    Returns
    -------
    df_out : DataFrame with new log_{feature} columns
    log_metadata : dict keyed by feature with transformation stats
    """
    df_out = df.copy()
    log_metadata = {}

    for f in features:
        if f not in df_out.columns:
            raise ValueError(f"Feature '{f}' not found in DataFrame")

        s = df_out[f].astype(float)

        meta = {
            "raw_feature": f,
            "n_rows": len(s),
            "n_missing": int(s.isna().sum()),
            "n_zero": int((s == 0).sum()),
            "n_negative": int((s < 0).sum()),
        }

        # handle negatives
        if drop_negative:
            s = s.mask(s < 0)

        # compute offset
        offset = _compute_log_offset(s, strategy=offset_strategy, eps=eps)
        meta["log_offset"] = offset
        meta["offset_strategy"] = offset_strategy

        # safe log
        log_col = f"log_{f}"
        df_out[log_col] = np.log(s + offset)

        # infinities → NaN
        df_out[log_col] = df_out[log_col].replace([np.inf, -np.inf], np.nan)

        meta["n_log_missing"] = int(df_out[log_col].isna().sum())

        log_metadata[f] = meta

    return df_out, log_metadata


In [47]:
log_features = ['GC',
              'media_1','media_2','media_3','media_4','media_5'
]

df, log_meta = add_log_features(
    df=df,
    features=log_features,
    offset_strategy="one",   # stable, interpretable
    drop_negative=True,
)


In [48]:
df

Unnamed: 0,store_id,week_start,GC,digital_promo_1,digital_promo_2,digital_promo_3,digital_promo_4,digital_promo_5,media_1,media_2,media_3,media_4,media_5,log_GC,log_media_1,log_media_2,log_media_3,log_media_4,log_media_5
0,1,2023-01-01,7762.164881,0.392485,0.877477,0.531428,0.138708,0.863492,61326.061549,17201.343184,97417.410477,71648.944633,3607.031881,8.957145,11.023976,9.752801,11.486770,11.179548,8.190918
1,1,2023-01-08,4444.896554,0.923900,0.959596,0.586109,0.293273,0.240285,3988.161026,67742.162031,5269.544085,62586.983836,16846.864943,8.399737,8.291336,11.123479,8.569889,11.044329,9.731979
2,1,2023-01-15,8600.119407,0.578978,0.228336,0.510166,0.305943,0.839874,12813.036067,2574.355308,79595.063348,65263.859444,41218.643405,9.059648,9.458296,7.853743,11.284720,11.086209,10.626670
3,1,2023-01-22,7003.943488,0.004619,0.931824,0.036025,0.261298,0.078545,50045.750273,16101.731418,64607.084138,17677.173940,71819.519391,8.854371,10.820713,9.686744,11.076095,9.780086,11.181925
4,1,2023-01-29,1032.355744,0.038542,0.571213,0.497091,0.460761,0.159928,74659.258456,90270.550139,26217.482191,66950.482733,36412.155677,6.940567,11.220703,11.410578,10.174220,11.111723,10.502685
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
1305,10,2025-06-01,9282.298110,0.494260,0.560057,0.626922,0.845786,0.792066,39787.334767,6773.695983,913.135423,53610.359616,88195.150862,9.135972,10.591329,8.820950,6.817979,10.889516,11.387319
1306,10,2025-06-08,6014.205838,0.197482,0.536615,0.184140,0.035530,0.196871,61258.024739,48449.102347,63222.581574,5875.828796,31326.684255,8.702046,11.022866,10.788290,11.054433,8.678773,10.352257
1307,10,2025-06-15,6722.581824,0.860886,0.828648,0.107689,0.559509,0.242228,12548.255828,43434.769555,32306.555176,17573.507161,63274.505705,8.813376,9.437417,10.679039,10.383056,9.774205,11.055254
1308,10,2025-06-22,620.502304,0.823068,0.138851,0.437612,0.721622,0.841589,16337.065503,19778.196930,77309.609239,78198.384217,37931.698244,6.432140,9.701253,9.892386,11.255586,11.267017,10.543569


1) Configuration objects (easy to extend)

In [4]:
@dataclass(frozen=True)
class TimeSplit:
    time_split_id: str
    train_start: str
    train_end: str
    test_start: str
    test_end: str

@dataclass(frozen=True)
class RunConfig:
    # "fixed" knobs you said exist
    target_node: str              # "GC" or "AC"
    panel_control: str            # "FE" / "Mundlak" / "Bayesian" (placeholder for now)
    algorithm: str                # "OLS" / "Ridge" / "ElasticNet"
    alpha: float = 0.0
    l1_ratio: float = 0.0

    # experiment dimensions
    feature_block_set_id: str = ""
    features: Tuple[str, ...] = tuple()
    time_split: TimeSplit = None
    seed: int = 42

2) MLflow setup (experiment + common tags)

Databricks tip: use a workspace path like "/Users/<you>/RGA/Regressions".

In [5]:
# def setup_mlflow(experiment_name: str, common_tags: Dict[str, str]) -> None:
#     mlflow.set_experiment(experiment_name)
#     mlflow.set_tags(common_tags)

def setup_mlflow(experiment_name: str) -> None:
    mlflow.set_experiment(experiment_name)


3) Search strategy component (swap later)

This yields a stream of RunConfig objects. Keep it dumb/simple now.

In [6]:
# =========================
# 3) Feature subset sampler (K-per-block)
# =========================
from typing import Dict, List, Tuple
import numpy as np

def sample_k_per_block(
    feature_blocks: Dict[str, List[str]],
    k_per_block: Dict[str, int],
    n_samples: int,
    seed: int = 42,
    allow_all_if_small: bool = True,
) -> List[Tuple[str, Tuple[str, ...]]]:
    """
    Returns list of (feature_set_id, features_tuple).
    Each sample picks K features from EACH block and unions them.

    Example k_per_block: {"promo":3, "media":3, "ops":2}
    """
    rng = np.random.default_rng(seed)

    blocks = list(feature_blocks.keys())
    if set(k_per_block.keys()) != set(blocks):
        missing = set(blocks) - set(k_per_block.keys())
        extra = set(k_per_block.keys()) - set(blocks)
        raise ValueError(f"k_per_block keys must match feature_blocks keys. missing={missing}, extra={extra}")

    results = []
    seen = set()

    for i in range(n_samples):
        chosen = []
        for b in blocks:
            feats = feature_blocks[b]
            k = k_per_block[b]

            if len(feats) < k:
                if allow_all_if_small:
                    pick = list(feats)
                else:
                    raise ValueError(f"Block '{b}' has only {len(feats)} features but k={k}.")
            else:
                pick = rng.choice(feats, size=k, replace=False).tolist()

            chosen.extend(pick)

        chosen_sorted = tuple(sorted(set(chosen)))
        if chosen_sorted in seen:
            continue
        seen.add(chosen_sorted)

        fs_id = f"kperblock__{i:04d}"
        results.append((fs_id, chosen_sorted))

    return results


In [None]:
# def generate_feature_sets(feature_blocks: Dict[str, List[str]]) -> Dict[str, Tuple[str, ...]]:
#     """
#     Example: feature_blocks = {"promo":[...], "media":[...], "ops":[...]}
#     Return a dict of feature_block_set_id -> tuple(features)
#     """
#     # Minimal example: each single block + all blocks
#     out = {}
#     for block, feats in feature_blocks.items():
#         out[f"block__{block}"] = tuple(feats)
#     all_feats = tuple(sorted({f for feats in feature_blocks.values() for f in feats}))
#     out["block__ALL"] = all_feats
#     return out

# def search_space(
#     target_node: str,
#     panel_controls: List[str],
#     algorithms: List[Dict[str, Any]],
#     time_splits: List[TimeSplit],
#     feature_sets: Dict[str, Tuple[str, ...]],
#     seed: int = 42,
# ) -> Iterator[RunConfig]:
#     for pc in panel_controls:
#         for algo in algorithms:
#             for ts in time_splits:
#                 for fs_id, feats in feature_sets.items():
#                     yield RunConfig(
#                         target_node=target_node,
#                         panel_control=pc,
#                         algorithm=algo["name"],
#                         alpha=float(algo.get("alpha", 0.0)),
#                         l1_ratio=float(algo.get("l1_ratio", 0.0)),
#                         feature_block_set_id=fs_id,
#                         features=feats,
#                         time_split=ts,
#                         seed=seed,
#                     )


In [7]:
from typing import Iterator, Dict, Any, List, Tuple

def search_space_with_feature_sets(
    target_node: str,
    panel_controls: List[str],
    algorithms: List[Dict[str, Any]],
    time_splits: List[TimeSplit],
    feature_sets: List[Tuple[str, Tuple[str, ...]]],
    seed: int = 42,
) -> Iterator[RunConfig]:
    for pc in panel_controls:
        for algo in algorithms:
            for ts in time_splits:
                for fs_id, feats in feature_sets:
                    yield RunConfig(
                        target_node=target_node,
                        panel_control=pc,
                        algorithm=algo["name"],
                        alpha=float(algo.get("alpha", 0.0)),
                        l1_ratio=float(algo.get("l1_ratio", 0.0)),
                        feature_block_set_id=fs_id,
                        features=feats,
                        time_split=ts,
                        seed=seed,
                    )


4) Model factory component (swap later)

This is where FE/Mundlak/Bayesian wrappers will go. For now it’s vanilla sklearn.

In [8]:
from dataclasses import dataclass
from typing import Callable
import pandas as pd
import numpy as np

@dataclass
class FitResult:
    model: object
    coef_df: pd.DataFrame
    predict_fn: Callable[[pd.DataFrame], np.ndarray]  # predict on any df

def standardize_coef_df(feature_index, coef, t_stat=None, p_value=None) -> pd.DataFrame:
    df = pd.DataFrame({
        "feature": list(feature_index),
        "coef": np.asarray(coef, dtype=float),
        "t_stat": np.asarray(t_stat, dtype=float) if t_stat is not None else np.nan,
        "p_value": np.asarray(p_value, dtype=float) if p_value is not None else np.nan,
    })
    df["abs_coef"] = df["coef"].abs()
    df["sign"] = np.sign(df["coef"]).astype(int)
    df = df.sort_values("abs_coef", ascending=False).reset_index(drop=True)
    df["rank_abscoef"] = np.arange(1, len(df) + 1)
    df["is_significant_05"] = df["p_value"].apply(lambda x: (x < 0.05) if pd.notna(x) else False)
    return df



In [31]:
from linearmodels.panel import PanelOLS
import pandas as pd

def fit_fe_linearmodels(
    df_train: pd.DataFrame,
    y_col: str,
    x_cols: List[str],
    entity_col: str = "store_id",
    time_col: str = "week_start",
    time_effects: bool = False,
) -> FitResult:
    df_tr = df_train.copy()
    df_tr[time_col] = pd.to_datetime(df_tr[time_col])
    df_tr = df_tr.set_index([entity_col, time_col]).sort_index()

    y = df_tr[y_col].astype(float)
    X = df_tr[x_cols].astype(float)

    mod = PanelOLS(y, X, entity_effects=True, time_effects=time_effects)
    res = mod.fit(cov_type="clustered", cluster_entity=True)

    coef_df = standardize_coef_df(
        res.params.index,
        res.params.values,
        t_stat=res.tstats.reindex(res.params.index).values,
        p_value=res.pvalues.reindex(res.params.index).values,
    )

    def predict_fn(df_any: pd.DataFrame) -> np.ndarray:
        df_te = df_any.copy()
        df_te[time_col] = pd.to_datetime(df_te[time_col])
        df_te = df_te.set_index([entity_col, time_col]).sort_index()
        X_te = df_te[x_cols].astype(float)
        pred = res.predict(exog=X_te)
        # res.predict returns a DataFrame/Series aligned to index
        return np.asarray(pred).ravel()

    return FitResult(model=res, coef_df=coef_df, predict_fn=predict_fn)


In [None]:
from dataclasses import dataclass
from typing import List, Callable, Optional, Dict, Any
import numpy as np
import pandas as pd
import statsmodels.api as sm
from statsmodels.regression.mixed_linear_model import MixedLM


@dataclass
class HLMFitResult:
    model: object                       # fitted statsmodels result
    coef_df: pd.DataFrame               # fixed-effect coefficients + inference
    predict_fn: Callable[[pd.DataFrame], np.ndarray]   # prediction function


def fit_hlm_mixedlm(
    df_train: pd.DataFrame,
    y_col: str,
    x_cols: List[str],
    group_col: str = "store_id",
    add_intercept: bool = True,
    random_slopes: Optional[List[str]] = None,
    reml: bool = True,
    maxiter: int = 200,
    method: str = "lbfgs",
) -> HLMFitResult:
    """
    Hierarchical Linear Model (Mixed Effects) via statsmodels MixedLM.

    Default: random intercept by store_id
      y ~ X (fixed effects) + (1 | store)

    Optionally: random slopes for selected features (small list recommended)
      y ~ X + (1 + z1 + z2 | store)

    Returns:
      - fitted model result
      - coef_df: fixed-effect table with std err, z, pvalue
      - predict_fn: predicts using fixed effects + estimated random effects when available
    """
    if random_slopes is None:
        random_slopes = []

    # Basic column checks
    needed = [y_col, group_col] + x_cols + list(random_slopes)
    missing = [c for c in needed if c not in df_train.columns]
    if missing:
        raise ValueError(f"Missing columns for MixedLM: {missing}")

    df = df_train.copy()

    # Build fixed effects design matrix
    X = df[x_cols].astype(float)
    if add_intercept:
        X = sm.add_constant(X, has_constant="add")

    y = df[y_col].astype(float)
    groups = df[group_col]

    # Build random effects design matrix
    # - If empty: random intercept only
    # - If slopes: include intercept + selected slope columns
    if len(random_slopes) == 0:
        # random intercept only -> a single column of ones
        Z = np.ones((len(df), 1), dtype=float)
        re_names = ["re_intercept"]
    else:
        Z = df[random_slopes].astype(float).copy()
        # include random intercept as well
        Z.insert(0, "re_intercept", 1.0)
        re_names = list(Z.columns)
        Z = Z.to_numpy()

    # Fit MixedLM
    model = MixedLM(endog=y, exog=X, groups=groups, exog_re=Z)
    res = model.fit(reml=reml, method=method, maxiter=maxiter, disp=False)

    # Fixed-effect coefficient table
    # res.fe_params / res.bse_fe are fixed effects only
    fe_params = res.fe_params
    bse_fe = res.bse_fe

    zvals = fe_params / bse_fe
    # two-sided p-values (normal approx)
    pvals = 2 * (1 - sm.stats.norm.cdf(np.abs(zvals)))

    coef_df = pd.DataFrame({
        "feature": fe_params.index,
        "coef": fe_params.values,
        "std_err": bse_fe.values,
        "z_stat": zvals.values,
        "p_value": pvals.values,
    })

    # Add sign / rank
    coef_df["abs_coef"] = coef_df["coef"].abs()
    coef_df["sign"] = np.sign(coef_df["coef"]).astype(int)
    coef_df = coef_df.sort_values("abs_coef", ascending=False).reset_index(drop=True)
    coef_df["rank_abscoef"] = np.arange(1, len(coef_df) + 1)
    coef_df["is_significant_05"] = coef_df["p_value"] < 0.05

    # Prediction function
    # Uses fixed effects always; uses random effects for known groups if available
    def predict_fn(df_any: pd.DataFrame) -> np.ndarray:
        dfp = df_any.copy()
        Xp = dfp[x_cols].astype(float)
        if add_intercept:
            Xp = sm.add_constant(Xp, has_constant="add")

        # Fixed part
        yhat = np.asarray(Xp @ res.fe_params, dtype=float)

        # Random part (if group seen in training)
        # res.random_effects: dict(group -> array of RE coefficients)
        if hasattr(res, "random_effects") and res.random_effects is not None:
            g = dfp[group_col].values
            for i in range(len(dfp)):
                gi = g[i]
                if gi in res.random_effects:
                    b = np.asarray(res.random_effects[gi]).ravel()  # (q,)
                    if len(random_slopes) == 0:
                        # random intercept only
                        yhat[i] += b[0]
                    else:
                        # intercept + slopes
                        # build row vector [1, z1, z2, ...]
                        zrow = [1.0] + [float(dfp.iloc[i][c]) for c in random_slopes]
                        yhat[i] += float(np.dot(b, np.asarray(zrow, dtype=float)))

        return yhat

    return HLMFitResult(model=res, coef_df=coef_df, predict_fn=predict_fn)


In [None]:
def fit_hlm_as_fitresult(
    df_train: pd.DataFrame,
    y_col: str,
    x_cols: List[str],
    group_col: str = "store_id",
    add_intercept: bool = True,
    random_slopes: Optional[List[str]] = None,
    reml: bool = True,
    maxiter: int = 200,
    method: str = "lbfgs",
) -> Tuple[FitResult, str]:
    """
    Wrap your existing fit_hlm_mixedlm() so the rest of the pipeline
    can treat it like any other backend (FitResult).
    Returns:
      - FitResult (model, coef_df, predict_fn)
      - summary_text (for artifact logging)
    """
    hlm_res = fit_hlm_mixedlm(
        df_train=df_train,
        y_col=y_col,
        x_cols=x_cols,
        group_col=group_col,
        add_intercept=add_intercept,
        random_slopes=random_slopes,
        reml=reml,
        maxiter=maxiter,
        method=method,
    )
    summary_text = hlm_res.model.summary().as_text()
    fit_res = FitResult(model=hlm_res.model, coef_df=hlm_res.coef_df, predict_fn=hlm_res.predict_fn)
    return fit_res, summary_text


In [None]:
# Random intercept only (recommended default)
fit_res = fit_hlm_mixedlm(
    df_train=train_df,
    y_col="log_GC",
    x_cols=list(cfg.features),
    group_col="store_id",
    random_slopes=None
)

cdf = fit_res.coef_df
pred_test = fit_res.predict_fn(test_df)

# Only pick 1–3 “key drivers” for random slopes, otherwise it can get slow/unstable
fit_res = fit_hlm_mixedlm(
    df_train=train_df,
    y_col="log_GC",
    x_cols=list(cfg.features),
    group_col="store_id",
    random_slopes=["avg_price", "promo_depth"]  # small list
)


In [32]:
from linearmodels.panel import RandomEffects
import statsmodels.api as sm

def fit_mundlak_linearmodels(
    df_train: pd.DataFrame,
    y_col: str,
    x_cols: List[str],
    entity_col: str = "store_id",
    time_col: str = "week_start",
    include_intercept: bool = True,
) -> FitResult:
    df_tr = df_train.copy()
    df_tr[time_col] = pd.to_datetime(df_tr[time_col])

    # Compute entity means on TRAIN ONLY (no leakage)
    means_by_entity = df_tr.groupby(entity_col)[x_cols].mean()

    def make_X_aug(df_any: pd.DataFrame) -> pd.DataFrame:
        df_any = df_any.copy()
        # join train means to any df (unseen entities -> NaN; you can choose how to handle later)
        m = df_any[[entity_col]].merge(
            means_by_entity.reset_index(),
            on=entity_col,
            how="left",
        )
        mean_cols = {c: f"{c}__mean_by_entity" for c in x_cols}
        for c in x_cols:
            df_any[mean_cols[c]] = m[c].values
        X = df_any[x_cols].astype(float)
        X_means = df_any[[mean_cols[c] for c in x_cols]].astype(float)
        X_aug = pd.concat([X, X_means], axis=1)
        if include_intercept:
            X_aug = sm.add_constant(X_aug, has_constant="add")
        return X_aug

    # Panel index
    df_tr = df_tr.set_index([entity_col, time_col]).sort_index()
    y = df_tr[y_col].astype(float)

    # Need X_aug aligned with same index
    df_tr_reset = df_tr.reset_index()
    X_aug = make_X_aug(df_tr_reset)
    X_aug.index = df_tr.index  # align to panel index

    mod = RandomEffects(y, X_aug)
    res = mod.fit(cov_type="clustered", cluster_entity=True)

    coef_df = standardize_coef_df(
        res.params.index,
        res.params.values,
        t_stat=res.tstats.reindex(res.params.index).values,
        p_value=res.pvalues.reindex(res.params.index).values,
    )

    def predict_fn(df_any: pd.DataFrame) -> np.ndarray:
        df_te = df_any.copy()
        df_te[time_col] = pd.to_datetime(df_te[time_col])
        df_te = df_te.set_index([entity_col, time_col]).sort_index()

        df_te_reset = df_te.reset_index()
        X_te_aug = make_X_aug(df_te_reset)
        X_te_aug.index = df_te.index

        pred = res.predict(exog=X_te_aug)
        return np.asarray(pred).ravel()

    return FitResult(model=res, coef_df=coef_df, predict_fn=predict_fn)


In [None]:
from dataclasses import dataclass
from typing import List, Dict, Callable, Optional
import numpy as np
import pandas as pd

def fit_bayesian_hierarchical_pymc(
    df_train: pd.DataFrame,
    y_col: str,
    x_cols: List[str],
    group_col: str = "store_id",
    standardize_X: bool = True,
    inference: str = "advi",          # "advi" (fast) or "nuts" (slower, more accurate)
    advi_steps: int = 20000,
    nuts_draws: int = 1000,
    nuts_tune: int = 1000,
    random_seed: int = 42,
):
    """
    Bayesian hierarchical linear model with random intercept by group (store).
    Returns:
      model, idata, coef_df, predict_fn
    """
    import pymc as pm
    import arviz as az

    df = df_train.copy()

    # --- Encode group as consecutive integers ---
    groups = pd.Categorical(df[group_col])
    g_idx = groups.codes
    n_groups = len(groups.categories)

    # --- Build design matrix ---
    X = df[x_cols].astype(float).to_numpy()
    y = df[y_col].astype(float).to_numpy()

    x_means = X.mean(axis=0)
    x_stds = X.std(axis=0, ddof=0)
    x_stds = np.where(x_stds == 0, 1.0, x_stds)

    if standardize_X:
        Xs = (X - x_means) / x_stds
    else:
        Xs = X

    with pm.Model() as model:
        # Hyperpriors for random intercepts
        sigma_a = pm.HalfNormal("sigma_a", sigma=1.0)
        a = pm.Normal("a", mu=0.0, sigma=sigma_a, shape=n_groups)  # group intercepts

        # Fixed effects priors
        beta = pm.Normal("beta", mu=0.0, sigma=1.0, shape=Xs.shape[1])

        # Noise
        sigma = pm.HalfNormal("sigma", sigma=1.0)

        mu = a[g_idx] + pm.math.dot(Xs, beta)
        pm.Normal("y_obs", mu=mu, sigma=sigma, observed=y)

        if inference.lower() == "advi":
            approx = pm.fit(
                n=advi_steps,
                method="advi",
                random_seed=random_seed,
            )
            idata = approx.sample(1000)
        elif inference.lower() == "nuts":
            idata = pm.sample(
                draws=nuts_draws,
                tune=nuts_tune,
                chains=2,
                target_accept=0.9,
                random_seed=random_seed,
                progressbar=True,
            )
        else:
            raise ValueError("inference must be 'advi' or 'nuts'")

    # --- Coefficient summary ---
    # beta is on standardized scale if standardize_X=True
    beta_samples = idata.posterior["beta"].stack(sample=("chain", "draw")).values  # [p, samples]
    beta_mean = beta_samples.mean(axis=1)
    beta_sd = beta_samples.std(axis=1, ddof=0)

    hdi = az.hdi(idata.posterior["beta"], hdi_prob=0.94).to_array().values  # shape [2, p]
    hdi_low, hdi_high = hdi[0, :], hdi[1, :]

    p_gt_0 = (beta_samples > 0).mean(axis=1)
    p_lt_0 = (beta_samples < 0).mean(axis=1)

    coef_df = pd.DataFrame({
        "feature": x_cols,
        "coef_post_mean": beta_mean,
        "coef_post_sd": beta_sd,
        "hdi94_low": hdi_low,
        "hdi94_high": hdi_high,
        "p_gt_0": p_gt_0,
        "p_lt_0": p_lt_0,
        "sign_post": np.sign(beta_mean).astype(int),
    })

    # Optional: convert back to original scale if you standardized X
    if standardize_X:
        coef_df["coef_post_mean_unscaled"] = coef_df["coef_post_mean"] / x_stds
        coef_df["coef_post_sd_unscaled"] = coef_df["coef_post_sd"] / x_stds
    else:
        coef_df["coef_post_mean_unscaled"] = coef_df["coef_post_mean"]
        coef_df["coef_post_sd_unscaled"] = coef_df["coef_post_sd"]

    coef_df["abs_coef"] = coef_df["coef_post_mean_unscaled"].abs()
    coef_df = coef_df.sort_values("abs_coef", ascending=False).reset_index(drop=True)
    coef_df["rank_abscoef"] = np.arange(1, len(coef_df) + 1)

    # --- Predict function ---
    def predict_fn(df_any: pd.DataFrame) -> np.ndarray:
        dfp = df_any.copy()
        # map groups
        gcat = pd.Categorical(dfp[group_col], categories=groups.categories)
        g = gcat.codes
        Xp = dfp[x_cols].astype(float).to_numpy()
        Xps = (Xp - x_means) / x_stds if standardize_X else Xp

        # posterior mean prediction
        a_mean = idata.posterior["a"].mean(dim=("chain", "draw")).values
        beta_mean_local = idata.posterior["beta"].mean(dim=("chain", "draw")).values
        mu_hat = a_mean[g] + Xps @ beta_mean_local
        return np.asarray(mu_hat, dtype=float)

    return model, idata, coef_df, predict_fn


In [10]:
from sklearn.linear_model import LinearRegression, Ridge, ElasticNet

def fit_sklearn_pooled(
    df_train: pd.DataFrame,
    y_col: str,
    x_cols: List[str],
    algorithm: str,
    alpha: float,
    l1_ratio: float,
    seed: int = 42,
) -> FitResult:
    X = df_train[x_cols].astype(float)
    y = df_train[y_col].astype(float)

    if algorithm == "OLS":
        model = LinearRegression()
    elif algorithm == "Ridge":
        model = Ridge(alpha=alpha, random_state=seed)
    elif algorithm == "ElasticNet":
        model = ElasticNet(alpha=alpha, l1_ratio=l1_ratio, random_state=seed, max_iter=10000)
    else:
        raise ValueError(f"Unsupported algorithm for sklearn pooled: {algorithm}")

    model.fit(X, y)
    coef_df = standardize_coef_df(x_cols, model.coef_, t_stat=None, p_value=None)

    def predict_fn(df_any: pd.DataFrame) -> np.ndarray:
        return model.predict(df_any[x_cols].astype(float))

    return FitResult(model=model, coef_df=coef_df, predict_fn=predict_fn)


In [None]:
def fit_with_backend(
    df_train: pd.DataFrame,
    y_col: str,
    x_cols: List[str],
    cfg: RunConfig,
) -> FitResult:
    pc = cfg.panel_control.lower().strip()

    # If you asked for FE/Mundlak, we apply panel methods only for OLS-like estimation
    if pc == "fe" and cfg.algorithm == "OLS":
        return fit_fe_linearmodels(df_train, y_col, x_cols, time_effects=False)

    if pc == "mundlak" and cfg.algorithm == "OLS":
        return fit_mundlak_linearmodels(df_train, y_col, x_cols, include_intercept=True)

    if pc == "hlm" and cfg.algorithm == "OLS":
        fit_res, summary_text = fit_hlm_as_fitresult(
            df_train=df_train,
            y_col=y_col,
            x_cols=x_cols,
            group_col="store_id",
            random_slopes=None,   # start simple; you can parameterize later
            reml=True,
            maxiter=200,
            method="lbfgs",
        )
        return fit_res, summary_text
    
    # Otherwise fallback pooled sklearn (Ridge / ElasticNet / pooled OLS)
    return fit_sklearn_pooled(
        df_train=df_train,
        y_col=y_col,
        x_cols=x_cols,
        algorithm=cfg.algorithm,
        alpha=cfg.alpha,
        l1_ratio=cfg.l1_ratio,
        seed=cfg.seed,
    )


In [None]:
def get_model_summary_text(fit_model: object) -> str:
    """
    Return a readable text summary for different fitted model types.
    """
    # statsmodels (OLS / MixedLM results)
    if hasattr(fit_model, "summary"):
        try:
            s = fit_model.summary()
            # statsmodels summary object
            if hasattr(s, "as_text"):
                return s.as_text()
            return str(s)
        except Exception:
            pass

    # linearmodels (PanelOLS / RandomEffects results)
    # linearmodels results usually have .summary as a Summary instance printable to string
    if hasattr(fit_model, "summary"):
        try:
            return str(fit_model.summary)
        except Exception:
            pass

    # sklearn models (Ridge / ElasticNet / LinearRegression)
    # no built-in summary; log a simple snapshot
    params = {}
    for attr in ["coef_", "intercept_", "alpha", "l1_ratio"]:
        if hasattr(fit_model, attr):
            val = getattr(fit_model, attr)
            if isinstance(val, np.ndarray):
                params[attr] = f"ndarray shape={val.shape}"
            else:
                params[attr] = val

    return "Model summary not available for this model type.\n" + str(params)


In [None]:
# # -------------------------
# # 4D) Simple factory
# # -------------------------
# def make_fit_backend(panel_control: str) -> FitBackend:
#     panel_control = panel_control.strip().lower()
#     if panel_control == "fe":
#         # set time_effects=True if you want week FE as well (often costly but sometimes useful)
#         return FixedEffectsBackend(entity_effects=True, time_effects=False)
#     if panel_control == "mundlak":
#         return MundlakBackend(include_entity_means=True, include_intercept=True)
#     # fallback pooled (or raise)
#     return PooledOLSBackend()

In [None]:
# def make_model(cfg: RunConfig):
#     if cfg.algorithm == "OLS":
#         return LinearRegression()
#     if cfg.algorithm == "Ridge":
#         return Ridge(alpha=cfg.alpha, random_state=cfg.seed)
#     if cfg.algorithm == "ElasticNet":
#         return ElasticNet(alpha=cfg.alpha, l1_ratio=cfg.l1_ratio, random_state=cfg.seed, max_iter=10000)
#     raise ValueError(f"Unknown algorithm: {cfg.algorithm}")


In [None]:
# # =========================
# # 6) Minimal usage demo (fit only; MLflow logging stays in your run_one_experiment)
# # =========================
# def fit_one_trial_no_mlflow(df_pd: pd.DataFrame, cfg: RunConfig) -> FitResult:
#     """
#     Convenience function for testing the FE/Mundlak backends without MLflow.
#     In your pipeline, you'd call make_fit_backend(...) inside run_one_experiment().
#     """
#     train_df, _ = slice_by_time(df_pd, cfg.time_split)
#     y_col = get_target_col(cfg.target_node)
#     x_cols = list(cfg.features)

#     backend = make_fit_backend(cfg.panel_control)
#     return backend.fit(train_df, y_col=y_col, x_cols=x_cols, entity_col="store_id", time_col="week_start")

In [None]:
# def fit_and_get_coef_table(cfg: RunConfig, X_train: pd.DataFrame, y_train: pd.Series):
#     """
#     Returns: model_object, coef_df
#     coef_df columns: feature, coef, sign, abs_coef, rank_abscoef, t_stat, p_value
#     """
#     feats = list(X_train.columns)

#     if cfg.algorithm == "OLS":
#         # statsmodels gives p-values
#         X_sm = sm.add_constant(X_train, has_constant="add")
#         model = sm.OLS(y_train, X_sm).fit()

#         coef = model.params.drop("const", errors="ignore")
#         tstat = model.tvalues.drop("const", errors="ignore")
#         pval = model.pvalues.drop("const", errors="ignore")

#         df = pd.DataFrame({
#             "feature": coef.index,
#             "coef": coef.values,
#             "t_stat": tstat.reindex(coef.index).values,
#             "p_value": pval.reindex(coef.index).values,
#         })

#     else:
#         # sklearn (no p-values)
#         model = make_model(cfg)
#         model.fit(X_train, y_train)

#         coefs = np.asarray(model.coef_).ravel()
#         df = pd.DataFrame({
#             "feature": feats,
#             "coef": coefs,
#             "t_stat": np.nan,
#             "p_value": np.nan,
#         })

#     df["abs_coef"] = df["coef"].abs()
#     df["sign"] = np.sign(df["coef"]).astype(int)
#     df = df.sort_values("abs_coef", ascending=False).reset_index(drop=True)
#     df["rank_abscoef"] = np.arange(1, len(df) + 1)

#     # simple significance flag (customize threshold later)
#     df["is_significant_05"] = (df["p_value"] < 0.05)

#     return model, df


5) Data prep component (you will replace pieces later)

Assumes:

df_pd includes store_id, week_start, plus feature columns

target columns exist, e.g. log_GC, log_AC (or you can compute)

In [12]:
def slice_by_time(df: pd.DataFrame, ts: TimeSplit) -> Tuple[pd.DataFrame, pd.DataFrame]:
    df = df.copy()
    df["week_start"] = pd.to_datetime(df["week_start"])
    train = df[(df["week_start"] >= ts.train_start) & (df["week_start"] <= ts.train_end)]
    test  = df[(df["week_start"] >= ts.test_start)  & (df["week_start"] <= ts.test_end)]
    return train, test

def get_target_col(target_node: str) -> str:
    # you can change this mapping anytime
    if target_node == "GC":
        return "log_GC"
    if target_node == "AC":
        return "log_AC"
    raise ValueError("target_node must be 'GC' or 'AC'")


6) Logging policy component (what to log)

Keep parameters as params; use tags for “indexing / grouping” fields you’ll filter on later.

In [13]:
def log_run_inputs(cfg: RunConfig) -> None:
    # Params (queryable, shown in UI)
    mlflow.log_params({
        "target_node": cfg.target_node,
        "panel_control": cfg.panel_control,
        "algorithm": cfg.algorithm,
        "alpha": cfg.alpha,
        "l1_ratio": cfg.l1_ratio,
        "feature_block_set_id": cfg.feature_block_set_id,
        "n_features": len(cfg.features),
        "time_split_id": cfg.time_split.time_split_id,
        "train_start": cfg.time_split.train_start,
        "train_end": cfg.time_split.train_end,
        "test_start": cfg.time_split.test_start,
        "test_end": cfg.time_split.test_end,
        "seed": cfg.seed,
    })

def log_metrics(prefix: str, y_true, y_pred) -> None:
    rmse = math.sqrt(mean_squared_error(y_true, y_pred))
    mae = mean_absolute_error(y_true, y_pred)
    r2 = r2_score(y_true, y_pred)
    mlflow.log_metrics({
        f"{prefix}_rmse": rmse,
        f"{prefix}_mae": mae,
        f"{prefix}_r2": r2,
    })


7) Fit + log artifacts (coeff table, rank table, config snapshot)

In [14]:
def coef_table(model, feature_names: List[str]) -> pd.DataFrame:
    # Works for linear models that have coef_
    coefs = np.asarray(model.coef_).ravel()
    df = pd.DataFrame({"feature": feature_names, "coef": coefs})
    df["abs_coef"] = df["coef"].abs()
    df["sign"] = np.sign(df["coef"]).astype(int)
    df = df.sort_values("abs_coef", ascending=False).reset_index(drop=True)
    df["rank_abscoef"] = np.arange(1, len(df) + 1)
    return df

def log_dataframe_as_csv(df: pd.DataFrame, artifact_path: str, filename: str) -> None:
    with tempfile.TemporaryDirectory() as tmpdir:
        fpath = os.path.join(tmpdir, filename)
        df.to_csv(fpath, index=False)
        mlflow.log_artifact(fpath, artifact_path=artifact_path)

def log_config_snapshot(cfg: RunConfig) -> None:
    payload = {
        "target_node": cfg.target_node,
        "panel_control": cfg.panel_control,
        "algorithm": cfg.algorithm,
        "alpha": cfg.alpha,
        "l1_ratio": cfg.l1_ratio,
        "feature_block_set_id": cfg.feature_block_set_id,
        "features": list(cfg.features),
        "time_split": cfg.time_split.__dict__,
        "seed": cfg.seed,
    }
    mlflow.log_dict(payload, artifact_file="run_config.json")


8) Single run execution (with failure handling)

In [None]:
# def run_one_experiment(df_pd: pd.DataFrame, cfg: RunConfig) -> None:
#     run_name = f"{cfg.target_node}__{cfg.panel_control}__{cfg.algorithm}__{cfg.feature_block_set_id}__{cfg.time_split.time_split_id}"

#     with mlflow.start_run(run_name=run_name, nested=True):
#         try:
#             log_run_inputs(cfg)
#             log_config_snapshot(cfg)

#             # Data slicing
#             train_df, test_df = slice_by_time(df_pd, cfg.time_split)
#             ycol = get_target_col(cfg.target_node)

#             X_train = train_df.loc[:, list(cfg.features)]
#             y_train = train_df[ycol]
#             X_test  = test_df.loc[:, list(cfg.features)]
#             y_test  = test_df[ycol]

#             # Fit
#             model = make_model(cfg)
#             model.fit(X_train, y_train)

#             # Predict + metrics
#             pred_train = model.predict(X_train)
#             pred_test  = model.predict(X_test)
#             log_metrics("train", y_train, pred_train)
#             log_metrics("test", y_test, pred_test)

#             # Artifacts: coefficients + ranks
#             cdf = coef_table(model, list(cfg.features))
#             log_dataframe_as_csv(cdf, artifact_path="artifacts", filename="coefficients.csv")

#             # Optional: log model (safe to keep optional if volume is huge)
#             # mlflow.sklearn.log_model(model, artifact_path="model")

#         except Exception as e:
#             mlflow.set_tag("run_status", "failed")
#             mlflow.log_text(str(e), "error.txt")
#             raise
#         else:
#             mlflow.set_tag("run_status", "ok")


In [None]:
# print("Active run before:", mlflow.active_run())
# print("Active run entering child:", mlflow.active_run())

Active run before: <ActiveRun: >
Active run entering child: <ActiveRun: >


In [None]:
# def run_one_experiment(df_pd: pd.DataFrame, cfg: RunConfig) -> None:
#     """
#     Execute ONE fully-specified trial and log everything needed for later stability aggregation.

#     Logs (per run):
#       - params: target_node, panel_control, algorithm, hyperparams, feature_set_id, n_features, time split, etc.
#       - metrics: train/test r2/rmse/mae
#       - artifacts: coefficients.csv (coef/sign/rank + p-values for OLS), run_config.json
#       - tags: run_status
#     """
#     run_name = (
#         f"{cfg.target_node}__{cfg.panel_control}__{cfg.algorithm}"
#         f"__{cfg.feature_block_set_id}__{cfg.time_split.time_split_id}"
#     )

#     with mlflow.start_run(run_name=run_name, nested=True):
#         mlflow.set_tag("run_type", "trial")
#         try:
#             # --- 1) Log inputs (params + config snapshot) ---
#             log_run_inputs(cfg)
#             log_config_snapshot(cfg)

#             # --- 2) Slice data ---
#             train_df, test_df = slice_by_time(df_pd, cfg.time_split)
#             ycol = get_target_col(cfg.target_node)

#             # Basic guards (kept simple)
#             if len(cfg.features) == 0:
#                 raise ValueError("cfg.features is empty.")
#             missing_cols = [c for c in cfg.features + (ycol,) if c not in train_df.columns]
#             if missing_cols:
#                 raise ValueError(f"Missing columns in df_pd: {missing_cols}")

#             X_train = train_df.loc[:, list(cfg.features)]
#             y_train = train_df[ycol]
#             X_test = test_df.loc[:, list(cfg.features)]
#             y_test = test_df[ycol]

#             # --- 3) Fit + build coefficients table (with p-values only for OLS) ---
#             model, cdf = fit_and_get_coef_table(cfg, X_train, y_train)

#             # Add run context directly into the coefficient table (helps aggregation later)
#             cdf["target_node"] = cfg.target_node
#             cdf["panel_control"] = cfg.panel_control
#             cdf["algorithm"] = cfg.algorithm
#             cdf["feature_block_set_id"] = cfg.feature_block_set_id
#             cdf["time_split_id"] = cfg.time_split.time_split_id

#             # --- 4) Predict (handle statsmodels vs sklearn) ---
#             if cfg.algorithm == "OLS":
#                 pred_train = model.predict(sm.add_constant(X_train, has_constant="add"))
#                 pred_test = model.predict(sm.add_constant(X_test, has_constant="add"))
#             else:
#                 pred_train = model.predict(X_train)
#                 pred_test = model.predict(X_test)

#             # --- 5) Log metrics ---
#             log_metrics("train", y_train, pred_train)
#             log_metrics("test", y_test, pred_test)

#             # Optional: log a couple simple summary metrics from coefficient table
#             # (e.g. number significant; works for OLS only)
#             if cdf["p_value"].notna().any():
#                 mlflow.log_metric("n_significant_05", float((cdf["p_value"] < 0.05).sum()))
#                 mlflow.log_metric("pct_significant_05", float((cdf["p_value"] < 0.05).mean()))
#             else:
#                 mlflow.log_metric("n_significant_05", np.nan)
#                 mlflow.log_metric("pct_significant_05", np.nan)

#             # For ElasticNet, you may care about selection rate (non-zero)
#             if cfg.algorithm == "ElasticNet":
#                 mlflow.log_metric("n_nonzero_coef", float((cdf["coef"].abs() > 1e-12).sum()))
#                 mlflow.log_metric("pct_nonzero_coef", float((cdf["coef"].abs() > 1e-12).mean()))

#             # --- 6) Log artifacts (system of record for per-feature consolidation) ---
#             log_dataframe_as_csv(cdf, artifact_path="artifacts", filename="coefficients.csv")

#         except Exception as e:
#             mlflow.set_tag("run_status", "failed")
#             mlflow.log_text(str(e), "error.txt")
#             raise
#         else:
#             mlflow.set_tag("run_status", "ok")

In [None]:
import mlflow
import numpy as np

def run_one_experiment(df_pd: pd.DataFrame, cfg: RunConfig) -> None:
    run_name = (
        f"{cfg.target_node}__{cfg.panel_control}__{cfg.algorithm}"
        f"__{cfg.feature_block_set_id}__{cfg.time_split.time_split_id}"
    )

    with mlflow.start_run(run_name=run_name, nested=True):
        mlflow.set_tag("run_type", "trial")
        try:
            # 1) log inputs
            log_run_inputs(cfg)
            log_config_snapshot(cfg)

            # 2) slice data
            train_df, test_df = slice_by_time(df_pd, cfg.time_split)
            ycol = get_target_col(cfg.target_node)

            if len(cfg.features) == 0:
                raise ValueError("cfg.features is empty.")

            needed = list(cfg.features) + [ycol, "store_id", "week_start"]
            missing = [c for c in needed if c not in train_df.columns]
            if missing:
                raise ValueError(f"Missing columns: {missing}")

            x_cols = list(cfg.features)

            # 3) fit using backend selector (FE/Mundlak/pooled)
            fit_res = fit_with_backend(train_df, ycol, x_cols, cfg)

            # 4) predictions + metrics
            pred_train = fit_res.predict_fn(train_df)
            pred_test  = fit_res.predict_fn(test_df)

            y_train = train_df[ycol].astype(float).to_numpy()
            y_test  = test_df[ycol].astype(float).to_numpy()

            log_metrics("train", y_train, pred_train)
            log_metrics("test", y_test, pred_test)

            # 5) add run context to coef table + log artifact
            cdf = fit_res.coef_df.copy()
            cdf["target_node"] = cfg.target_node
            cdf["panel_control"] = cfg.panel_control
            cdf["algorithm"] = cfg.algorithm
            cdf["feature_block_set_id"] = cfg.feature_block_set_id
            cdf["time_split_id"] = cfg.time_split.time_split_id

            # optional summary metrics
            if cdf["p_value"].notna().any():
                mlflow.log_metric("n_significant_05", float((cdf["p_value"] < 0.05).sum()))
                mlflow.log_metric("pct_significant_05", float((cdf["p_value"] < 0.05).mean()))
            else:
                mlflow.log_metric("n_significant_05", np.nan)
                mlflow.log_metric("pct_significant_05", np.nan)

            if cfg.algorithm == "ElasticNet":
                mlflow.log_metric("n_nonzero_coef", float((cdf["coef"].abs() > 1e-12).sum()))
                mlflow.log_metric("pct_nonzero_coef", float((cdf["coef"].abs() > 1e-12).mean()))

            log_dataframe_as_csv(cdf, artifact_path="artifacts", filename="coefficients.csv")
            # NEW: log summary for all models
            summary_text = get_model_summary_text(fit_res.model)
            mlflow.log_text(summary_text, "artifacts/model_summary.txt")

        except Exception as e:
            mlflow.set_tag("run_status", "failed")
            mlflow.log_text(str(e), "error.txt")
            raise
        else:
            mlflow.set_tag("run_status", "ok")


Orchestrator (parent run + many child runs)

This matches the MLflow tutorial pattern (parent run contains the “study”, child runs contain each trial).

In [None]:
# def run_study(
#     df_pd: pd.DataFrame,
#     experiment_name: str,
#     study_name: str,
#     common_tags: Dict[str, str],
#     configs: Iterator[RunConfig],
#     max_runs: int = None,
# ) -> str:
#     setup_mlflow(experiment_name, common_tags)

#     with mlflow.start_run(run_name=study_name) as parent:
#         mlflow.set_tag("run_type", "study")
#         mlflow.log_param("study_name", study_name)

#         n = 0
#         for cfg in configs:
#             if max_runs is not None and n >= max_runs:
#                 break
#             run_one_experiment(df_pd, cfg)
#             n += 1

#         mlflow.log_param("n_child_runs", n)
#         return parent.info.run_id


In [None]:
# def run_study(
#     df_pd: pd.DataFrame,
#     experiment_name: str,
#     study_name: str,
#     common_tags: Dict[str, str],
#     configs: Iterator[RunConfig],
#     max_runs: int = None,
# ) -> str:
#     # extra safety for notebooks: close anything dangling
#     while mlflow.active_run() is not None:
#         mlflow.end_run()

#     setup_mlflow(experiment_name)

#     # start parent run FIRST, then set tags
#     with mlflow.start_run(run_name=study_name) as parent:
#         mlflow.set_tags(common_tags)          # ✅ now safe
#         mlflow.set_tag("run_type", "study")
#         mlflow.log_param("study_name", study_name)

#         n = 0
#         for cfg in configs:
#             if max_runs is not None and n >= max_runs:
#                 break
#             run_one_experiment(df_pd, cfg)     # child run MUST be nested=True
#             n += 1

#         mlflow.log_param("n_child_runs", n)
#         return parent.info.run_id


In [21]:
import json, hashlib
import mlflow
import pandas as pd
from typing import Dict, Iterator, Optional

def _hash_dict(d: Dict) -> str:
    s = json.dumps(d, sort_keys=True)
    return hashlib.sha256(s.encode("utf-8")).hexdigest()[:12]

def run_study(
    df_pd: pd.DataFrame,
    experiment_name: str,
    study_name: str,
    common_tags: Dict[str, str],
    configs: Iterator[RunConfig],
    fe_meta: Optional[Dict] = None,
    max_runs: int = None,
) -> str:
    while mlflow.active_run() is not None:
        mlflow.end_run()

    setup_mlflow(experiment_name)

    with mlflow.start_run(run_name=study_name) as parent:
        mlflow.set_tags(common_tags)
        mlflow.set_tag("run_type", "study")
        mlflow.log_param("study_name", study_name)

        fe_meta_id = None
        if fe_meta is not None:
            fe_meta_id = _hash_dict(fe_meta)
            mlflow.log_param("fe_meta_id", fe_meta_id)
            mlflow.log_dict(fe_meta, "feature_engineering/log_transforms.json")

        n = 0
        for cfg in configs:
            if max_runs is not None and n >= max_runs:
                break
            # # pass the id into the child if you want (or store it globally)
            # run_one_experiment(df_pd, cfg, fe_meta_id=fe_meta_id)
            n += 1

        mlflow.log_param("n_child_runs", n)
        return parent.info.run_id


10) Retrieve results + feature stability summary (simple version)

This is the “later I’ll decide which features to keep” part — implemented minimally using MLflow search + the logged coefficient artifacts.

In [17]:
def load_coefficients_for_run(run_id: str) -> pd.DataFrame:
    # Download the artifact and read it
    local_dir = mlflow.artifacts.download_artifacts(run_id=run_id, artifact_path="artifacts/coefficients.csv")
    return pd.read_csv(local_dir)

def aggregate_feature_stability(
    experiment_name: str,
    filter_query: str = "tags.run_status = 'ok'",
) -> pd.DataFrame:
    exp = mlflow.get_experiment_by_name(experiment_name)
    runs = mlflow.search_runs(experiment_ids=[exp.experiment_id], filter_string=filter_query)

    rows = []
    for _, r in runs.iterrows():
        run_id = r["run_id"]
        try:
            cdf = load_coefficients_for_run(run_id)
            cdf["run_id"] = run_id
            rows.append(cdf[["run_id", "feature", "coef", "sign", "abs_coef", "rank_abscoef"]])
        except Exception:
            # If some run didn't log artifacts, skip (or handle stricter)
            continue

    if not rows:
        return pd.DataFrame()

    allc = pd.concat(rows, ignore_index=True)

    # Stability stats you described (simple baseline)
    g = allc.groupby("feature")
    out = pd.DataFrame({
        "n_runs_appeared": g["run_id"].nunique(),
        "mean_coef": g["coef"].mean(),
        "median_coef": g["coef"].median(),
        "std_coef": g["coef"].std(ddof=1),
        "mean_abscoef": g["abs_coef"].mean(),
        "mean_rank": g["rank_abscoef"].mean(),
        "pct_positive": g["sign"].apply(lambda s: (s > 0).mean()),
        "pct_negative": g["sign"].apply(lambda s: (s < 0).mean()),
    }).reset_index()

    out["coef_cv"] = out["std_coef"] / out["mean_coef"].replace(0, np.nan)
    out = out.sort_values(["n_runs_appeared", "mean_abscoef"], ascending=[False, False]).reset_index(drop=True)
    return out


11) Example usage (plug in your df_pd)

In [16]:
df

Unnamed: 0,week_start,log_GC,digital_promo_1,digital_promo_2,digital_promo_3,digital_promo_4,digital_promo_5,media_1,media_2,media_3,media_4,media_5
0,2023-01-01,0.304717,-1.376686,0.232170,0.459386,1.403821,0.319400,0.044212,0.529413,-0.955625,0.417472,-1.180001
1,2023-01-08,-1.039984,0.635151,-0.555327,0.701954,-0.442536,-0.869047,-0.202914,1.363429,0.437512,-1.320489,0.804570
2,2023-01-15,0.750451,-0.222223,0.471539,0.138241,1.455046,0.177396,-1.082427,-1.880798,-1.241756,0.854686,-0.675114
3,2023-01-22,0.940565,-1.470806,1.012716,0.760133,0.131486,1.212519,-0.151052,-0.317907,-0.204069,-0.800212,0.403954
4,2023-01-29,-1.951035,-1.015579,0.155429,0.229211,0.258229,-0.323792,-0.746098,-0.867005,0.109648,0.632858,0.565460
...,...,...,...,...,...,...,...,...,...,...,...,...
126,2025-06-01,1.463303,-0.376156,0.276274,1.628937,1.847825,-0.079730,0.555582,0.101926,-0.084851,-2.480709,-0.439988
127,2025-06-08,-1.188763,-0.133823,-1.412766,-0.970150,-0.174173,1.797561,-0.622168,-0.762323,-1.600206,-0.996419,-2.955619
128,2025-06-15,-0.639752,-1.374896,-2.310103,-0.887696,1.667888,0.894213,0.987405,-0.859206,-0.761974,1.232902,-1.247317
129,2025-06-22,-0.926576,-0.238174,0.054354,1.335784,-1.103741,0.011445,1.157508,-0.537663,0.148627,-2.777994,1.120841


In [40]:
# import mlflow
# while mlflow.active_run() is not None:
#     mlflow.end_run()
# print("Active run now:", mlflow.active_run())


local testing

In [18]:
feature_blocks = {
    "promo": ['digital_promo_1','digital_promo_2','digital_promo_3','digital_promo_4','digital_promo_5'],
    "media": ['media_1','media_2','media_3','media_4','media_5'],
}

k_per_block = {"promo": 3, "media": 3}

time_splits = [
    TimeSplit("ts1", "2023-01-01", "2024-06-30", "2024-07-01", "2024-12-31"),
]

algorithms = [
    {"name": "OLS"},   # simplest to start (and gives p-values if you used statsmodels in OLS)
]

# feature_sets = generate_feature_sets(feature_blocks)

feature_sets = sample_k_per_block(
    feature_blocks=feature_blocks,
    k_per_block=k_per_block,
    n_samples=200,     # you control how many combinations
    seed=42,
)

# configs_gc = search_space(
#     target_node="GC",
#     panel_controls=["FE"],     # keep one for smoke test
#     algorithms=algorithms,
#     time_splits=time_splits,
#     feature_sets={"block__promo": feature_sets["block__promo"]},  # one feature set only
#     seed=42
# )

configs_gc = search_space_with_feature_sets(
    target_node="GC",
    panel_controls=["FE", "Mundlak"],
    algorithms=[{"name":"OLS"}, {"name":"Ridge","alpha":1.0}, {"name":"ElasticNet","alpha":0.1,"l1_ratio":0.5}],
    time_splits=time_splits,
    feature_sets=feature_sets,
    seed=42,
)


In [33]:
print("Active run before:", mlflow.active_run())

parent_run_id = run_study(
    df_pd=df,
    experiment_name="RGA_Regression_Local",
    study_name="GC_smoketest_v001",
    common_tags={
        "project": "RevenueGrowthAnalytics",
        "layer": "2",
        "framework": "regression_shell",
        "env": "local",
    },
    configs=configs_gc,
    # fe_meta = None,
    max_runs=2,   # <-- smallest smoke test
)

stability = aggregate_feature_stability(
    experiment_name="RGA_Regression_Local",
    filter_query="tags.run_status = 'ok' and params.target_node = 'GC'"
)

print(stability.head(30))


Active run before: None
           feature  n_runs_appeared  mean_coef  median_coef  std_coef  \
0  digital_promo_1                2  -0.184221    -0.184221       0.0   
1  digital_promo_5                2  -0.107128    -0.107128       0.0   
2  digital_promo_4                2  -0.052290    -0.052290       0.0   
3  digital_promo_3                2  -0.023930    -0.023930       0.0   
4  digital_promo_2                2   0.011568     0.011568       0.0   

   mean_abscoef  mean_rank  pct_positive  pct_negative  coef_cv  
0      0.184221        1.0           0.0           1.0     -0.0  
1      0.107128        2.0           0.0           1.0     -0.0  
2      0.052290        3.0           0.0           1.0     -0.0  
3      0.023930        4.0           0.0           1.0     -0.0  
4      0.011568        5.0           1.0           0.0      0.0  


In [None]:
# aggregate_feature_stability() slice_by_time(), fit_and_get_coef_table(), and generate_feature_sets()

complex version

In [None]:
# Example component inputs (you will replace these)
# feature_blocks = {
#     "promo": ["promo_depth", "lto_flag", "discount_idx", "bundle_idx", "coupon_rate"],
#     "media": ["tv_grps", "digital_imps", "search_spend", "social_spend", "ooh_spend"],
# }

feature_blocks = {
    "promo": ['digital_promo_1', 'digital_promo_2', 'digital_promo_3',
       'digital_promo_4', 'digital_promo_5'],
    "media": ['media_1', 'media_2', 'media_3', 'media_4',
       'media_5'],
}

time_splits = [
    TimeSplit("ts1", "2023-01-01", "2024-06-30", "2024-07-01", "2024-12-31"),
    TimeSplit("ts2", "2023-07-01", "2024-12-31", "2025-01-01", "2025-06-30"),
]

algorithms = [
    {"name": "OLS"},
    {"name": "Ridge", "alpha": 1.0},
    {"name": "ElasticNet", "alpha": 0.1, "l1_ratio": 0.5},
]

feature_sets = generate_feature_sets(feature_blocks)

configs_gc = search_space(
    target_node="GC",
    panel_controls=["FE", "Mundlak"],      # Bayesian later
    algorithms=algorithms,
    time_splits=time_splits,
    feature_sets=feature_sets,
    seed=42
)

parent_run_id = run_study(
    df_pd=df,
    # experiment_name="/Users/your.name@company.com/RGA_Regression",
    experiment_name="RGA_Regression_Local",
    study_name="GC_study_v001",
    common_tags={
        "project": "RevenueGrowthAnalytics",
        "layer": "2",
        "framework": "regression_shell",
    },
    configs=configs_gc,
    max_runs=50,   # remove later
)

# Aggregate stability
stability = aggregate_feature_stability(
    experiment_name="/Users/your.name@company.com/RGA_Regression",
    filter_query="tags.run_status = 'ok' and params.target_node = 'GC'"
)

display(stability.head(30))


Quick fast testing run

In [52]:
from typing import Dict, Any, List, Optional
import numpy as np
import pandas as pd
from sklearn.metrics import r2_score, mean_squared_error, mean_absolute_error

def evaluate_predictions(y_true, y_pred) -> Dict[str, float]:
    y_true = np.asarray(y_true, dtype=float)
    y_pred = np.asarray(y_pred, dtype=float)
    return {
        "r2": float(r2_score(y_true, y_pred)),
        "rmse": float(np.sqrt(mean_squared_error(y_true, y_pred))),
        "mae": float(mean_absolute_error(y_true, y_pred)),
    }

def run_trial_quick(
    df_pd: pd.DataFrame,
    cfg: RunConfig,
    features: List[str],
    time_split: Optional[TimeSplit] = None,
) -> Dict[str, Any]:
    """
    No MLflow. Uses the same backends and returns outputs for interpretation.
    """
    ts = time_split or cfg.time_split
    train_df, test_df = slice_by_time(df_pd, ts)
    ycol = get_target_col(cfg.target_node)

    fit_res = fit_with_backend(train_df, ycol, features, cfg)

    y_train = train_df[ycol].astype(float).to_numpy()
    y_test  = test_df[ycol].astype(float).to_numpy()
    pred_train = fit_res.predict_fn(train_df)
    pred_test  = fit_res.predict_fn(test_df)

    out = {
        "run_name": f"{cfg.target_node}__{cfg.panel_control}__{cfg.algorithm}",
        "cfg": cfg,
        "features": features,
        "metrics_train": evaluate_predictions(y_train, pred_train),
        "metrics_test": evaluate_predictions(y_test, pred_test),
        "coef_df": fit_res.coef_df.copy(),
    }
    return out


In [59]:
result = run_trial_quick(
    df_pd=df,
    cfg=RunConfig(target_node="GC", panel_control="Mundlak", algorithm="OLS", time_split=time_splits[0]),
    features=["log_media_1", "log_media_2", "digital_promo_1", "digital_promo_2"],
)

# result["metrics_test"]
display(result["metrics_test"])
display(result["coef_df"].head(20))


{'r2': -0.0023760852386276454,
 'rmse': 0.8521974064595924,
 'mae': 0.6560787768008266}

Unnamed: 0,feature,coef,t_stat,p_value,abs_coef,sign,rank_abscoef,is_significant_05
0,const,6.496353,1.44003,0.15026,6.496353,1,1,False
1,digital_promo_1__mean_by_entity,-1.370871,-0.673746,0.500672,1.370871,-1,2,False
2,digital_promo_2__mean_by_entity,1.19504,1.975489,0.048564,1.19504,1,3,True
3,log_media_1__mean_by_entity,0.106466,0.680856,0.496165,0.106466,1,4,False
4,digital_promo_2,-0.081768,-0.640369,0.522121,0.081768,-1,5,False
5,log_media_2__mean_by_entity,0.078861,0.207154,0.835943,0.078861,1,6,False
6,log_media_2,-0.039498,-1.161537,0.245779,0.039498,-1,7,False
7,log_media_1,0.031741,0.716252,0.47405,0.031741,1,8,False
8,digital_promo_1,-0.011046,-0.108142,0.913911,0.011046,-1,9,False


In [50]:
def run_many_quick(
    df_pd: pd.DataFrame,
    base_cfg: RunConfig,
    feature_sets: Dict[str, List[str]],
) -> pd.DataFrame:
    rows = []
    coef_tables = {}

    for name, feats in feature_sets.items():
        res = run_trial_quick(df_pd, base_cfg, feats)
        rows.append({
            "spec_name": name,
            "n_features": len(feats),
            **{f"train_{k}": v for k, v in res["metrics_train"].items()},
            **{f"test_{k}": v for k, v in res["metrics_test"].items()},
        })
        coef_tables[name] = res["coef_df"]

    summary = pd.DataFrame(rows).sort_values("test_rmse").reset_index(drop=True)
    return summary, coef_tables


In [51]:
base_cfg = RunConfig(target_node="GC", panel_control="Mundlak", algorithm="OLS", time_split=time_splits[0])

feature_sets = {
    "promo_only": ["digital_promo_1", "digital_promo_2", "digital_promo_3"],
    "media_only": ["log_media_1", "log_media_2", "log_media_3"],
    "promo_plus_media": ["digital_promo_1", "digital_promo_2", "log_media_1", "log_media_2"],
}

summary, coef_tables = run_many_quick(df, base_cfg, feature_sets)
display(summary)
display(coef_tables["promo_plus_media"].head(15))


Unnamed: 0,spec_name,n_features,train_r2,train_rmse,train_mae,test_r2,test_rmse,test_mae
0,media_only,3,0.004644,0.871425,0.689732,-0.002294,0.852162,0.656405
1,promo_plus_media,4,0.005929,0.870863,0.688108,-0.002376,0.852197,0.656079
2,promo_only,3,0.002187,0.8725,0.69048,-0.004426,0.853068,0.656416


Unnamed: 0,feature,coef,t_stat,p_value,abs_coef,sign,rank_abscoef,is_significant_05
0,const,6.496353,1.44003,0.15026,6.496353,1,1,False
1,digital_promo_1__mean_by_entity,-1.370871,-0.673746,0.500672,1.370871,-1,2,False
2,digital_promo_2__mean_by_entity,1.19504,1.975489,0.048564,1.19504,1,3,True
3,log_media_1__mean_by_entity,0.106466,0.680856,0.496165,0.106466,1,4,False
4,digital_promo_2,-0.081768,-0.640369,0.522121,0.081768,-1,5,False
5,log_media_2__mean_by_entity,0.078861,0.207154,0.835943,0.078861,1,6,False
6,log_media_2,-0.039498,-1.161537,0.245779,0.039498,-1,7,False
7,log_media_1,0.031741,0.716252,0.47405,0.031741,1,8,False
8,digital_promo_1,-0.011046,-0.108142,0.913911,0.011046,-1,9,False


In [26]:
# import mlflow
# from mlflow.tracking import MlflowClient

# print("Tracking URI:", mlflow.get_tracking_uri())
# print("Active run:", mlflow.active_run())

# client = MlflowClient()
# exps = client.search_experiments()
# print("Experiments found:", [e.name for e in exps][:10])

# print(TRACK_DIR)

# import os, mlflow
# TRACK_DIR = os.path.abspath("./mlruns_rga_test")
# mlflow.set_tracking_uri("file:///" + TRACK_DIR.replace("\\", "/"))
# print("Tracking URI set to:", mlflow.get_tracking_uri())

In [27]:
# from mlflow.tracking import MlflowClient
# client = MlflowClient()

# exp = client.get_experiment_by_name("RGA_Regression_Local")
# print("Experiment:", exp)

# runs = client.search_runs([exp.experiment_id], max_results=5)
# print("Found runs:", len(runs))
# print([r.data.tags.get("mlflow.runName") for r in runs])


In [28]:
# print(mlflow.get_tracking_uri())

In [29]:
from mlflow.tracking import MlflowClient
client = MlflowClient()

run = client.get_run("15a6649b2b244ec4b624b0d7bfc0afc2")
print(run.data.metrics)


{}


In [30]:
X_train.shape[0] == 0
X_test.shape[0] == 0


NameError: name 'X_train' is not defined

Final model

In [None]:
from dataclasses import dataclass
from typing import List, Dict, Any
import mlflow
import statsmodels.api as sm

@dataclass(frozen=True)
class FinalSpec:
    name: str                  # e.g., "GC_final_v001"
    target_node: str           # "GC" / "AC"
    panel_control: str         # "FE" / "Mundlak" / "Bayesian"
    algorithm: str             # "OLS" / "Ridge" / "ElasticNet"
    alpha: float = 0.0
    l1_ratio: float = 0.0
    features: List[str] = None
    train_start: str = None
    train_end: str = None

def train_final_and_log_model(df_pd, spec: FinalSpec, experiment_name: str, tags: Dict[str,str]):
    mlflow.set_experiment(experiment_name)

    run_name = spec.name
    with mlflow.start_run(run_name=run_name):
        # Tag this as a promoted/final run
        mlflow.set_tags({**tags, "run_stage": "final", "is_champion": "true"})

        # Params (full snapshot)
        mlflow.log_params({
            "target_node": spec.target_node,
            "panel_control": spec.panel_control,
            "algorithm": spec.algorithm,
            "alpha": spec.alpha,
            "l1_ratio": spec.l1_ratio,
            "n_features": len(spec.features),
            "train_start": spec.train_start,
            "train_end": spec.train_end,
        })

        # Train slice
        df = df_pd.copy()
        df["week_start"] = pd.to_datetime(df["week_start"])
        train_df = df[(df["week_start"] >= spec.train_start) & (df["week_start"] <= spec.train_end)]

        ycol = "log_GC" if spec.target_node == "GC" else "log_AC"
        X_train = train_df[spec.features]
        y_train = train_df[ycol]

        # Fit + coeff artifact (reuse your function)
        cfg = RunConfig(
            target_node=spec.target_node,
            panel_control=spec.panel_control,
            algorithm=spec.algorithm,
            alpha=spec.alpha,
            l1_ratio=spec.l1_ratio,
            feature_block_set_id="FINAL",
            features=tuple(spec.features),
            time_split=TimeSplit("FINAL", spec.train_start, spec.train_end, spec.train_start, spec.train_end),
            seed=42,
        )

        model, cdf = fit_and_get_coef_table(cfg, X_train, y_train)
        log_dataframe_as_csv(cdf, artifact_path="artifacts", filename="coefficients.csv")

        # Log model artifact (ONLY for final)
        if spec.algorithm == "OLS":
            # statsmodels model
            mlflow.statsmodels.log_model(model, artifact_path="model")
        else:
            # sklearn model
            import mlflow.sklearn
            mlflow.sklearn.log_model(model, artifact_path="model")

        # Optional: store the final feature list
        mlflow.log_text("\n".join(spec.features), "final_features.txt")

        return mlflow.active_run().info.run_id


In [None]:
final_gc = FinalSpec(
    name="GC_final_v001",
    target_node="GC",
    panel_control="Mundlak",
    algorithm="Ridge",
    alpha=1.0,
    features=[...],                 # your chosen stable features
    train_start="2023-01-01",
    train_end="2025-06-30",
)

run_id = train_final_and_log_model(
    df_pd=df,
    spec=final_gc,
    experiment_name="RGA_Regression_Local",
    tags={"project":"RevenueGrowthAnalytics", "layer":"2"}
)
print("Final model run_id:", run_id)


In [32]:
runs = client.search_runs(
    experiment_ids=[exp.experiment_id],
    filter_string="tags.run_status = 'ok'",
    order_by=["attributes.start_time DESC"],
    max_results=5
)

for r in runs:
    print(r.info.run_id, r.data.tags.get("mlflow.runName"))


c1195de73a404f6a9e5befacc0862eb4 GC__FE__OLS__block__promo__ts1


In [48]:
run = client.get_run("41894288e1974559b68c3337fb586335")
print(run.data.metrics)


{}


In [46]:
from mlflow.tracking import MlflowClient
client = MlflowClient()

exp = client.get_experiment_by_name("RGA_Regression_Local")

runs = client.search_runs(
    experiment_ids=[exp.experiment_id],
    filter_string="tags.run_status = 'ok' and tags.run_type != 'study'",
    order_by=["attributes.start_time DESC"],
    max_results=20,
)

for r in runs:
    print(r.info.run_id, r.data.tags.get("mlflow.runName"), r.data.metrics.get("test_rmse"))


8510b0a2bd9b431291963245548f5017 GC__FE__OLS__block__promo__ts1 0.7691974572947565
