# Causal Forecasting with Databricks

This notebook demonstrates Double Machine Learning (DML) forecasting with LightGBM on Databricks.

## Features

- Databricks connection and authentication
- Double Machine Learning for causal forecasting
- LightGBM-based nuisance and effect models
- Synthetic data generation for testing
- On-policy and off-policy predictions

In [5]:
"""
Double Machine Learning Forecaster with LightGBM
================================================

Implements a simplified version of the DML forecaster from
"Causal Forecasting for Pricing" (Schultz et al., 2024),
but using LightGBM instead of transformer models.

Key ideas (matching the paper):
- Nuisance models:
    * Outcome model:   q_t  ≈ E[q_t | Z_t]
    * Treatment model: d_t  ≈ E[d_t | Z_t]
  where Z_t are covariates / confounders (no current d_t).
- Effect model:
    * Learns E[q_t - q_hat_t | d_t - d_hat_t, Z_t]
- Final prediction for a desired discount d*:
    q*(d*) = q_hat(Z) + effect_model(d* - d_hat(Z), Z)

We treat this as a one-step-ahead forecasting problem.
"""

import numpy as np
import pandas as pd

from lightgbm import LGBMRegressor
from sklearn.metrics import mean_absolute_error, mean_squared_error


# -------------------------------------------------------------------
# 1. Synthetic dataset generator (pricing + demand + confounding)
# -------------------------------------------------------------------

def generate_synthetic_pricing_data(
    n_items: int = 200,
    n_weeks: int = 60,
    seed: int = 0,
) -> pd.DataFrame:
    """
    Generate synthetic panel data {i, t} with:
      - Time-varying base demand (trend + seasonality)
      - Constant item-level price elasticity
      - Confounded pricing policy: discounts depend on latent demand

    Columns returned:
      item_id, week, demand, discount, base_price, category, season_type,
      price, lag_demand, lag_discount, week_sin, week_cos, elasticity_true
    """
    rng = np.random.default_rng(seed)
    rows = []

    for i in range(n_items):
        item_id = i
        base_price = rng.uniform(20, 100)        # "black price"
        category = rng.integers(0, 5)            # 5 categories
        season_type = rng.integers(0, 4)         # 4 season types
        gamma = rng.uniform(-0.05, 0.05)         # trend slope
        amp = rng.uniform(0.5, 1.5)              # seasonal amplitude
        phase = rng.uniform(0, 2 * np.pi)        # seasonal phase
        elasticity = rng.uniform(-3.0, -0.5)     # constant, negative

        base_level = rng.uniform(20, 80)         # base scale

        last_q = base_level                      # for lag feature
        last_d = 0.0                             # for lag feature

        for t in range(n_weeks):
            # deterministic trend + seasonality for base demand
            season = amp * np.sin(2 * np.pi * (t / 30.0) + phase)
            trend = gamma * t
            base_demand = np.exp(np.log(base_level) + trend) * (1 + 0.3 * season)

            # Pricing policy (confounding):
            # - when base demand is low, discount more.
            norm_bd = (base_demand - base_level) / (0.5 * base_level + 1e-6)
            disc_mean = np.clip(0.3 - 0.15 * norm_bd, 0.0, 0.6)
            discount = np.clip(disc_mean + rng.normal(0, 0.05), 0.0, 0.6)

            price = base_price * (1.0 - discount)

            # Constant elasticity demand:
            # q_t = base_demand_t * (p_t / base_price)^{elasticity}
            expected_q = base_demand * ((price / base_price) ** elasticity)
            expected_q = max(expected_q, 1e-3)

            # Poisson noise around expected demand
            demand = rng.poisson(expected_q)

            rows.append(
                {
                    "item_id": item_id,
                    "week": t,
                    "demand": float(demand),
                    "discount": float(discount),
                    "base_price": base_price,
                    "category": category,
                    "season_type": season_type,
                    "elasticity_true": elasticity,
                    "base_demand": base_demand,
                    "price": price,
                    "lag_demand": last_q,
                    "lag_discount": last_d,
                }
            )

            last_q = demand
            last_d = discount

    df = pd.DataFrame(rows)

    # Add periodic time features (like in the paper’s time embeddings)
    df["week_sin"] = np.sin(2 * np.pi * df["week"] / 30.0)
    df["week_cos"] = np.cos(2 * np.pi * df["week"] / 30.0)

    return df


# -------------------------------------------------------------------
# 2. DML Forecaster using LightGBM
# -------------------------------------------------------------------

class DMLForecasterLGBM:
    """
    Double Machine Learning forecaster with LightGBM:

    Stage 1 (nuisance models, trained via cross-fitting):
        outcome_model:   y  ~ Z    (predict demand without using current discount)
        treatment_model: d  ~ Z    (predict discount from Z)

    Stage 2 (effect model):
        Residuals:   y_resid = y - y_hat
                     d_resid = d - d_hat
        effect_model: y_resid ~ [d_resid, Z]

    Prediction for arbitrary discount scenario d*:
        1. Compute y_hat(Z), d_hat(Z) using nuisance models
        2. d_resid* = d* - d_hat(Z)
        3. delta_y  = effect_model.predict([d_resid*, Z])
        4. y_pred   = y_hat(Z) + delta_y

    This mimics Eq. (3) and the orthogonalization idea in the paper,
    but uses flexible LightGBM models instead of transformers.
    """

    def __init__(
        self,
        outcome_params=None,
        treatment_params=None,
        effect_params=None,
        n_folds: int = 2,
        random_state: int = 0,
    ):
        self.outcome_params = outcome_params or {
            "n_estimators": 200,
            "learning_rate": 0.05,
            "max_depth": -1,
        }
        self.treatment_params = treatment_params or {
            "n_estimators": 200,
            "learning_rate": 0.05,
            "max_depth": -1,
        }
        self.effect_params = effect_params or {
            "n_estimators": 200,
            "learning_rate": 0.05,
            "max_depth": -1,
        }
        self.n_folds = n_folds
        self.random_state = random_state

        self.outcome_models_ = []
        self.treatment_models_ = []
        self.effect_model_ = None

        self.feature_cols_ = None
        self.treatment_col_ = None
        self.outcome_col_ = None
        self.item_col_ = None

    # ------------ helpers ------------

    def _build_folds(self, df: pd.DataFrame):
        """
        Build folds for cross-fitting. If an item_id column is given,
        we split by items (as in the paper: even/odd articles).
        """
        rng = np.random.default_rng(self.random_state)

        if self.item_col_ is not None:
            items = df[self.item_col_].unique()
            rng.shuffle(items)
            folds_items = np.array_split(items, self.n_folds)
            index_folds = []
            for items_fold in folds_items:
                mask = df[self.item_col_].isin(items_fold).values
                index_folds.append(np.where(mask)[0])
            return index_folds
        else:
            n = len(df)
            indices = np.arange(n)
            rng.shuffle(indices)
            return np.array_split(indices, self.n_folds)

    # ------------ API ------------

    def fit(
        self,
        df: pd.DataFrame,
        feature_cols,
        treatment_col: str,
        outcome_col: str,
        item_col: str | None = None,
    ):
        """
        Fit the DML forecaster.

        Parameters
        ----------
        df : pd.DataFrame
            Training data.
        feature_cols : list of str
            Names of covariate columns Z (no current treatment).
        treatment_col : str
            Name of treatment column d (discount).
        outcome_col : str
            Name of outcome column q (demand).
        item_col : str, optional
            Column identifying items; used for panel-wise cross-fitting.
        """
        self.feature_cols_ = list(feature_cols)
        self.treatment_col_ = treatment_col
        self.outcome_col_ = outcome_col
        self.item_col_ = item_col

        X = df[self.feature_cols_].values
        d = df[treatment_col].values.astype(float)
        y = df[outcome_col].values.astype(float)
        n = len(df)

        # Out-of-fold nuisance predictions
        y_hat = np.zeros(n)
        d_hat = np.zeros(n)

        folds = self._build_folds(df)

        for val_idx in folds:
            train_idx = np.setdiff1d(np.arange(n), val_idx)

            X_train, X_val = X[train_idx], X[val_idx]
            y_train, y_val = y[train_idx], y[val_idx]
            d_train, d_val = d[train_idx], d[val_idx]

            outcome_model = LGBMRegressor(
                random_state=self.random_state, **self.outcome_params
            )
            treatment_model = LGBMRegressor(
                random_state=self.random_state, **self.treatment_params
            )

            # Outcome nuisance: E[q | Z]
            outcome_model.fit(X_train, y_train)
            # Treatment nuisance: E[d | Z]
            treatment_model.fit(X_train, d_train)

            y_hat[val_idx] = outcome_model.predict(X_val)
            d_hat[val_idx] = treatment_model.predict(X_val)

            self.outcome_models_.append(outcome_model)
            self.treatment_models_.append(treatment_model)

        # Residuals for the effect model
        y_resid = y - y_hat
        d_resid = d - d_hat
        effect_features = np.column_stack([d_resid.reshape(-1, 1), X])

        self.effect_model_ = LGBMRegressor(
            random_state=self.random_state, **self.effect_params
        )
        self.effect_model_.fit(effect_features, y_resid)

        return self

    def _predict_nuisances(self, df: pd.DataFrame):
        """Average over nuisance models for inference (as in an ensemble)."""
        X = df[self.feature_cols_].values
        y_hats = []
        d_hats = []
        for outcome_model, treatment_model in zip(
            self.outcome_models_, self.treatment_models_
        ):
            y_hats.append(outcome_model.predict(X))
            d_hats.append(treatment_model.predict(X))
        y_hat = np.mean(np.stack(y_hats, axis=0), axis=0)
        d_hat = np.mean(np.stack(d_hats, axis=0), axis=0)
        return y_hat, d_hat

    def predict(self, df: pd.DataFrame, discount_scenario=None) -> np.ndarray:
        """
        Predict demand under a given discount scenario.

        Parameters
        ----------
        df : pd.DataFrame
            Data containing covariates Z (and possibly current discount).
        discount_scenario :
            - None: use df[treatment_col_] (on-policy predictions)
            - float: use this fixed discount for all rows
            - array-like (len(df)): per-row discounts

        Returns
        -------
        np.ndarray
            Predicted demand.
        """
        if self.effect_model_ is None:
            raise RuntimeError("Call fit() before predict().")

        # Nuisance predictions
        y_hat, d_hat = self._predict_nuisances(df)

        # Decide which discount to use
        if discount_scenario is None:
            d_new = df[self.treatment_col_].values.astype(float)
        else:
            if np.isscalar(discount_scenario):
                d_new = np.full(len(df), float(discount_scenario), dtype=float)
            else:
                d_new = np.asarray(discount_scenario, dtype=float)
                if d_new.shape[0] != len(df):
                    raise ValueError("discount_scenario has wrong length.")

        d_resid_new = d_new - d_hat
        X = df[self.feature_cols_].values
        effect_features_new = np.column_stack([d_resid_new.reshape(-1, 1), X])

        delta_y = self.effect_model_.predict(effect_features_new)
        y_pred = y_hat + delta_y
        return y_pred


# -------------------------------------------------------------------
# 3. Example usage
# -------------------------------------------------------------------

if __name__ == "__main__":
    # 3.1 Generate synthetic data
    df = generate_synthetic_pricing_data(
        n_items=200,
        n_weeks=60,
        seed=42,
    )

    # Train / test split along time (like in the paper)
    train_df = df[df["week"] < 40].copy()
    test_df = df[df["week"] >= 40].copy()

    # Covariate set Z_t (no current discount; we *do* include lag_discount)
    feature_cols = [
        "week",
        "base_price",
        "category",
        "season_type",
        "lag_demand",
        "lag_discount",
        "week_sin",
        "week_cos",
    ]

    # 3.2 Instantiate and train the DML forecaster
    forecaster = DMLForecasterLGBM(
        outcome_params={"n_estimators": 200, "learning_rate": 0.05, "max_depth": -1},
        treatment_params={"n_estimators": 200, "learning_rate": 0.05, "max_depth": -1},
        effect_params={"n_estimators": 200, "learning_rate": 0.05, "max_depth": -1},
        n_folds=2,
        random_state=0,
    )

    forecaster.fit(
        train_df,
        feature_cols=feature_cols,
        treatment_col="discount",
        outcome_col="demand",
        item_col="item_id",
    )

    # 3.3 On-policy prediction: use actual discounts in the test set
    y_true = test_df["demand"].values
    y_pred_on = forecaster.predict(test_df)

    mae_on = mean_absolute_error(y_true, y_pred_on)
    mse_on = mean_squared_error(y_true, y_pred_on)

    print("On-policy performance (using logged discounts):")
    print(f"  MAE = {mae_on:.3f}")
    print(f"  MSE = {mse_on:.3f}")

    # 3.4 Off-policy prediction: e.g. set all discounts to 30% or 10%
    for disc in [0.1, 0.3, 0.5]:
        y_pred_off = forecaster.predict(test_df, discount_scenario=disc)
        mae_off = mean_absolute_error(y_true, y_pred_off)
        mse_off = mean_squared_error(y_true, y_pred_off)
        print(f"\nOff-policy performance (counterfactual discount = {disc:.1f}):")
        print(f"  MAE vs actual realized demand = {mae_off:.3f}")
        print(f"  MSE vs actual realized demand = {mse_off:.3f}")

    # You can also inspect how average predicted demand changes with discount:
    for disc in [0.0, 0.2, 0.4, 0.6]:
        y_pred = forecaster.predict(test_df, discount_scenario=disc)
        print(
            f"Average predicted demand at discount {disc:.1f}: "
            f"{y_pred.mean():.2f}"
        )


ImportError: dlopen(/Users/ryuta.yoshimatsu/miniconda3/envs/databricks-ml/lib/python3.12/site-packages/pandas/_libs/interval.cpython-312-darwin.so, 0x0002): tried: '/Users/ryuta.yoshimatsu/miniconda3/envs/databricks-ml/lib/python3.12/site-packages/pandas/_libs/interval.cpython-312-darwin.so' (mach-o file, but is an incompatible architecture (have 'x86_64', need 'arm64e' or 'arm64')), '/System/Volumes/Preboot/Cryptexes/OS/Users/ryuta.yoshimatsu/miniconda3/envs/databricks-ml/lib/python3.12/site-packages/pandas/_libs/interval.cpython-312-darwin.so' (no such file), '/Users/ryuta.yoshimatsu/miniconda3/envs/databricks-ml/lib/python3.12/site-packages/pandas/_libs/interval.cpython-312-darwin.so' (mach-o file, but is an incompatible architecture (have 'x86_64', need 'arm64e' or 'arm64'))