In [None]:
#@title Install / imports / config
!pip -q install -U "numpy>=2.0.0,<2.3" scikit-learn==1.5.2 matplotlib==3.9.2 pandas==2.2.2

import numpy as np, pandas as pd, matplotlib.pyplot as plt, json, math, warnings, random, os
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.linear_model import ElasticNetCV, LinearRegression
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import TimeSeriesSplit
from sklearn.metrics import r2_score, mean_squared_error
from sklearn.ensemble import HistGradientBoostingRegressor

# Suppress every warning for the rest of the session (this also hides
# deprecation notices from the pinned libraries).
warnings.filterwarnings("ignore")

# Seed the Python and NumPy global RNGs so stochastic steps repeat across runs.
# NOTE(review): PYTHONHASHSEED is normally read at interpreter start-up, so
# setting it here presumably only affects child processes -- confirm.
os.environ["PYTHONHASHSEED"] = "0"
random.seed(42)
np.random.seed(42)

# Default look for every figure drawn below.
plt.rcParams.update({"figure.figsize": (10, 4), "axes.grid": True})


In [None]:
#@title Upload your CSV (pick "Assessment 2 - MMM Weekly.csv")
from google.colab import files

# Opens the Colab file picker; the result maps each uploaded filename to its bytes.
uploaded = files.upload()

# A cancelled/empty upload would otherwise surface as an opaque IndexError.
if not uploaded:
    raise RuntimeError("No file was uploaded; re-run this cell and choose the weekly MMM CSV.")

# Only the first uploaded file is used.
csv_path = next(iter(uploaded))
df = pd.read_csv(csv_path)
df.head(3)


In [None]:
class DateFeatures(BaseEstimator, TransformerMixin):
    """Stateless transformer that derives calendar and trend features from a date column.

    Parameters
    ----------
    date_col : str, default "week"
        Name of the column in ``X`` holding the dates (anything
        ``pd.to_datetime`` can parse).
    """

    def __init__(self, date_col="week"):
        self.date_col = date_col

    def fit(self, X, y=None):
        """Nothing is learned; returns ``self`` for pipeline compatibility."""
        return self

    def transform(self, X):
        """Return a frame with columns ``year``, ``weekofyear``, ``month`` and ``t``.

        ``weekofyear`` is the ISO calendar week and ``t`` is a 0-based row
        counter in the order of ``X`` (so ``X`` should already be sorted
        chronologically for ``t`` to act as a time trend). The result keeps
        ``X``'s index and ``X`` itself is not modified.

        Raises
        ------
        ValueError
            If any value in ``date_col`` is missing or cannot be parsed as a date.
        """
        dates = pd.to_datetime(X[self.date_col], errors="coerce")

        # Unparseable values become NaT; without this check they would fail
        # further down in ``astype(int)`` with a cryptic "cannot convert NA" error.
        n_bad = int(dates.isna().sum())
        if n_bad:
            raise ValueError(
                f"Column {self.date_col!r} has {n_bad} missing or unparseable date value(s)."
            )

        # Plain arrays + explicit index: no alignment step, safe with any index.
        return pd.DataFrame(
            {
                "year": dates.dt.year.to_numpy(),
                "weekofyear": dates.dt.isocalendar().week.astype(int).to_numpy(),
                "month": dates.dt.month.to_numpy(),
                "t": np.arange(len(dates)),
            },
            index=X.index,
        )

def adstock_geometric(x, lam=0.6):
    """Apply geometric adstock (carry-over) along the first axis of ``x``.

    Each output element is the current input plus ``lam`` times the previous
    output element; the first element has no carry-over. NaNs in the input
    are treated as 0.

    Parameters
    ----------
    x : array-like
        Input series, converted to a float array.
    lam : float, default 0.6
        Decay factor applied to the previous adstocked value.

    Returns
    -------
    numpy.ndarray
        Float array with the same shape as ``x``.
    """
    spend = np.nan_to_num(np.asarray(x, float), nan=0.0)
    carried = np.zeros_like(spend, dtype=float)
    previous = 0.0  # nothing is carried into the first period
    for idx in range(len(spend)):
        previous = spend[idx] + (lam * previous)
        carried[idx] = previous
    return carried

def mape(y_true, y_pred, eps=1.0):
    """Mean absolute percentage error, in percent, with a floored denominator.

    The absolute error ``|y_true - y_pred|`` is divided by
    ``max(|y_true|, eps)``. Only the denominator is floored: the error itself
    is always measured against the real ``y_true``, so a perfect prediction
    of a value smaller than ``eps`` (e.g. 0) scores 0 % rather than a
    spurious non-zero error.

    Parameters
    ----------
    y_true, y_pred : array-like
        Actual and predicted values (broadcast-compatible).
    eps : float, default 1.0
        Lower bound on the denominator, guarding against division by ~0.

    Returns
    -------
    float
        Mean percentage error over all elements.
    """
    yt = np.asarray(y_true, float)
    yp = np.asarray(y_pred, float)
    denom = np.maximum(np.abs(yt), eps)
    return np.mean(np.abs(yt - yp) / denom) * 100.0

def print_metrics(name, y_true, y_pred):
    """Print R2, RMSE and MAPE for a set of predictions and return them.

    Parameters
    ----------
    name : str
        Label printed in front of the JSON-formatted metrics.
    y_true, y_pred : array-like
        Actual and predicted values.

    Returns
    -------
    dict
        ``{"R2": ..., "RMSE": ..., "MAPE_%": ...}`` as plain floats.
    """
    # RMSE is taken as sqrt(MSE) rather than via ``squared=False``: that keyword
    # is deprecated in scikit-learn 1.4 and removed in 1.6.
    rmse = np.sqrt(mean_squared_error(y_true, y_pred))
    res = {"R2": float(r2_score(y_true, y_pred)),
           "RMSE": float(rmse),
           "MAPE_%": float(mape(y_true, y_pred))}
    print(name, json.dumps(res, indent=2))
    return res


In [None]:
# Parse the week column and put the rows in chronological order on a fresh
# 0..n-1 index (later cells split and align rows by position).
df = (
    df.assign(week=pd.to_datetime(df["week"], errors="coerce"))
      .sort_values("week")
      .reset_index(drop=True)
)

# Column roles used throughout the notebook.
target_col = "revenue"
mediator_col = "google_spend"
social_cols = ["facebook_spend", "tiktok_spend", "instagram_spend", "snapchat_spend"]
controls = ["social_followers", "emails_send", "sms_send"]
price_col = "average_price"
promo_col = "promotions"

# Calendar / trend features for every row, in the sorted order above.
X_dates = DateFeatures(date_col="week").transform(df)

# Chronological holdout: the last 20% of rows (at least one) form the test set.
n = len(df)
test_size = max(1, math.ceil(0.20 * n))
split_ix = n - test_size
print("Rows:", n, "Holdout:", test_size)


In [None]:
# Stage 1 (mediator model): predict log1p(google_spend) from adstocked social
# spend, date features and controls, choosing the adstock decay by
# forward-chaining cross-validation.
tscv = TimeSeriesSplit(n_splits=5)
lam_grid = [0.3, 0.5, 0.6, 0.7, 0.9]


def make_stage1_features(lam):
    """Build the stage-1 design matrix for all rows of ``df`` at adstock decay ``lam``."""
    X_ad = pd.DataFrame({
        f"{c}_adstock_log1p": np.log1p(adstock_geometric(df[c].fillna(0), lam))
        for c in social_cols
    })
    return pd.concat([X_ad, X_dates, df[controls].fillna(0)], axis=1)


def make_stage1_pipeline():
    """Return a fresh, unfitted scaler + ElasticNetCV pipeline for the mediator model."""
    return Pipeline([("scaler", StandardScaler()),
                     ("enet", ElasticNetCV(l1_ratio=[0.1, 0.5, 0.9], alphas=np.logspace(-4, 1, 40),
                                           cv=tscv, max_iter=20000, random_state=42))])


y1 = np.log1p(df[mediator_col].fillna(0))
best = (-np.inf, None, None)  # (OOF R2, lambda, fitted pipeline)

for lam in lam_grid:
    X1 = make_stage1_features(lam)

    # Select lambda and fit on the pre-holdout rows only, so the g_hat fed to
    # stage 2 for the test window comes from a model that never saw those rows.
    X1_fit, y1_fit = X1.iloc[:split_ix], y1.iloc[:split_ix]

    pipe = make_stage1_pipeline()
    oof = np.full(len(y1_fit), np.nan)
    for tr, te in tscv.split(X1_fit):
        pipe.fit(X1_fit.iloc[tr], y1_fit.iloc[tr])
        oof[te] = pipe.predict(X1_fit.iloc[te])

    # TimeSeriesSplit never places the first block of rows in a test fold;
    # score only rows that actually received an out-of-fold prediction
    # instead of counting placeholder values as predictions.
    scored = ~np.isnan(oof)
    r2o = r2_score(y1_fit.to_numpy()[scored], oof[scored])
    if r2o > best[0]:
        best = (r2o, lam, pipe.fit(X1_fit, y1_fit))

lam_star, model1 = best[1], best[2]
print("Best mediator λ:", lam_star, " | OOF R2:", best[0])

# Mediator predictions for every row (train and holdout) at the chosen lambda.
df["g_log1p_hat"] = model1.predict(make_stage1_features(lam_star))
df["g_hat"] = np.expm1(df["g_log1p_hat"])


In [None]:
def make_stage2_features(dd):
    """Build the stage-2 (revenue model) design matrix for the rows in ``dd``.

    Columns, in order: ``log1p_avg_price``, ``log1p_g_hat``, the promotion
    column, the control columns, then the date features. Missing values in
    the inputs taken from ``dd`` are filled with 0.

    Parameters
    ----------
    dd : pandas.DataFrame
        All or a subset of the rows of the prepared ``df``; must contain
        ``g_hat`` plus the price, promotion and control columns, and its
        index labels must exist in the module-level ``X_dates``.

    Returns
    -------
    pandas.DataFrame
        One row per row of ``dd``, indexed like ``dd``.
    """
    X = pd.DataFrame({"log1p_avg_price": np.log1p(dd[price_col].fillna(0)),
                      "log1p_g_hat":     np.log1p(dd["g_hat"].fillna(0))})
    # X_dates covers every row of the full data set. Select only the rows
    # present in ``dd``; otherwise the column-wise concat outer-joins on the
    # index and pads a subset of rows out to full length with NaN.
    dates = X_dates.loc[dd.index]
    return pd.concat([X, dd[[promo_col]+controls].fillna(0), dates], axis=1)

# Stage 2: model log1p(revenue) from price, predicted mediator, promotions,
# controls and date features.
X2 = make_stage2_features(df)
y2 = np.log1p(df[target_col].clip(lower=1e-6))

# Chronological split: rows before split_ix train, the remainder is the holdout.
X2_tr = X2.iloc[:split_ix]
X2_te = X2.iloc[split_ix:]
y2_tr = y2.iloc[:split_ix]
y2_te = y2.iloc[split_ix:]

enet = Pipeline([
    ("scaler", StandardScaler()),
    ("enet", ElasticNetCV(
        l1_ratio=[0.1, 0.5, 0.9],
        alphas=np.logspace(-4, 1, 40),
        cv=TimeSeriesSplit(n_splits=5),
        max_iter=20000,
        random_state=42,
    )),
])
enet.fit(X2_tr, y2_tr)

# Map predictions and targets back from the log1p scale before scoring.
pred_tr = np.expm1(enet.predict(X2_tr))
pred_te = np.expm1(enet.predict(X2_te))
act_tr = np.expm1(y2_tr)
act_te = np.expm1(y2_te)

print_metrics("ElasticNet — Train", act_tr, pred_tr)
print_metrics("ElasticNet — Test",  act_te, pred_te)

# Actual vs. predicted, one figure per window.
weeks_tr = df["week"].iloc[:split_ix]
plt.plot(weeks_tr, act_tr, label="Actual (train)")
plt.plot(weeks_tr, pred_tr, label="Pred (train)")
plt.legend()
plt.title("Train")
plt.show()

weeks_te = df["week"].iloc[split_ix:]
plt.plot(weeks_te, act_te, label="Actual (test)")
plt.plot(weeks_te, pred_te, label="Pred (test)")
plt.legend()
plt.title("Test")
plt.show()

# Coefficients of the fitted ElasticNet step (i.e. on the standardized
# features), largest first.
enet_step = enet.named_steps["enet"]
coef = pd.Series(enet_step.coef_, index=X2_tr.columns).sort_values(ascending=False)
coef.to_frame("coef")


In [None]:
# Adstocked social spend at the decay chosen in stage 1.
Xad = pd.DataFrame({f"ad_{c}": adstock_geometric(df[c].fillna(0), lam_star) for c in social_cols})

# For each channel, keep the part of its adstock that a linear fit on the
# predicted mediator (g_log1p_hat) does not explain.
gX = df["g_log1p_hat"].values.reshape(-1, 1)
resid = {}
for col, channel in Xad.items():
    lr = LinearRegression().fit(gX, channel.values)
    resid[col + "_resid"] = channel.values - lr.predict(gX)
X_resid = pd.DataFrame(resid)

# Stage-2 features plus the residual channel features, split at the same
# position as the ElasticNet model.
X2b = pd.concat([make_stage2_features(df), X_resid], axis=1)
X2b_tr = X2b.iloc[:split_ix]
X2b_te = X2b.iloc[split_ix:]

hgb = HistGradientBoostingRegressor(
    max_depth=6,
    learning_rate=0.05,
    max_iter=1000,
    min_samples_leaf=8,
    random_state=42,
)
hgb.fit(X2b_tr, y2_tr)

# Predictions come out on the log1p scale; convert back before scoring.
pred_tr_b = np.expm1(hgb.predict(X2b_tr))
pred_te_b = np.expm1(hgb.predict(X2b_te))
print_metrics("Boosting — Train", act_tr, pred_tr_b)
print_metrics("Boosting — Test",  act_te, pred_te_b)

weeks_te_b = df["week"].iloc[split_ix:]
plt.plot(weeks_te_b, act_te, label="Actual (test)")
plt.plot(weeks_te_b, pred_te_b, label="Pred (test)")
plt.legend()
plt.title("Test — Boosting")
plt.show()


In [None]:
# Price / promotion what-if on the most recent week: vary average price by
# +/-10% with promotions off and on, holding everything else (including the
# predicted mediator g_hat already stored on the row) fixed.
last = df.iloc[[-1]].copy()
base_price = last[price_col].item()
grid = np.linspace(0.9 * base_price, 1.1 * base_price, 9)

rows = []
for promo in [0, 1]:  # assumes the promotions column is a 0/1 flag -- TODO confirm
    for p in grid:
        tmp = last.copy()
        tmp[price_col] = p
        # Apply the scenario's promotion value; without this both curves
        # are computed from the same row and come out identical.
        tmp[promo_col] = promo
        # Keep only the scenario row: if the feature builder returns rows for
        # other index labels (padded with NaN), predict() would reject them
        # and position 0 would not be the scenario.
        Xtmp = make_stage2_features(tmp).loc[tmp.index]
        rows.append({"promotion": promo, "avg_price": p, "pred_revenue": float(np.expm1(enet.predict(Xtmp))[0])})
sens = pd.DataFrame(rows)

for promo in [0, 1]:
    sub = sens[sens["promotion"] == promo]
    plt.plot(sub["avg_price"], sub["pred_revenue"], label=f"Promo={promo}")
plt.title("Sensitivity to Average Price (±10%)")
plt.xlabel("Average Price")
plt.ylabel("Predicted Revenue")
plt.legend()
plt.show()

# Last expression of the cell, so the scenario table is displayed under the plot.
sens
