# Test MMM Pipeline Model (with Preprocessing)

This notebook tests `MMM_CHANNEL_ROI_PIPELINE` which includes embedded:
- Adstock transformation (carryover effect)
- Hill Saturation (diminishing returns)
- StandardScaler normalization
- Ridge regression prediction

**Goal**: Verify the pipeline works end-to-end with raw spend data.

- **Input**: Raw weekly spend per channel + control variables (19 columns total)
- **Output**: Predicted weekly revenue

In [None]:
# Cell 1: Setup and imports
import pandas as pd
import numpy as np
from datetime import datetime

from snowflake.snowpark.context import get_active_session
from snowflake.ml.registry import Registry

# Reuse the session the notebook runtime already holds and report where it points.
session = get_active_session()
current_db = session.get_current_database()
current_schema = session.get_current_schema()
print(f"Connected to: {current_db}.{current_schema}")
print(f"Timestamp: {datetime.now()}")

In [None]:
# Cell 2: Load the pipeline model from Registry
banner = "=" * 60
print(banner)
print("LOADING MMM PIPELINE MODEL")
print(banner)

# The registry is addressed explicitly, independent of the session's current schema.
reg = Registry(
    session=session,
    database_name="GLOBAL_B2B_MMM",
    schema_name="MMM",
)
model_ref = reg.get_model("MMM_CHANNEL_ROI_PIPELINE")

default_version_name = model_ref.default.version_name
print(f"\nModel: {model_ref.fully_qualified_name}")
print(f"Default version: {default_version_name}")

# List every registered version so the one picked for inference can be cross-checked.
print("\nAll versions:")
for registered_version in model_ref.versions():
    print(f"  - {registered_version.version_name}")

In [None]:
# Cell 3: Load raw input data (pivoted by channel + control vars)
banner = "=" * 60
print(banner)
print("LOADING INPUT DATA")
print(banner)

# Pull the whole view into pandas, ordered by week so row order is chronological.
input_query = """
    SELECT * FROM GLOBAL_B2B_MMM.MMM_PIPELINE_TEST.V_PIPELINE_MODEL_INPUT
    ORDER BY WEEK_START
"""
input_df = session.sql(input_query).to_pandas()

week_values = input_df['WEEK_START']
column_names = list(input_df.columns)

print(f"\nInput shape: {input_df.shape}")
print(f"Weeks: {week_values.min()} to {week_values.max()}")
print(f"\nColumns ({len(column_names)}):")
for column_name in column_names:
    print(f"  - {column_name}")

In [None]:
# Cell 4: Prepare model input (columns must match model signature)
#
# Splits the loaded view into:
#   X          - the feature columns passed to the model, in the order listed below
#   y_actual   - observed revenue, used later for evaluation
#   week_start - the week key, carried through to the results table
print("="*60)
print("PREPARING MODEL INPUT")
print("="*60)

# Feature columns in the order the model is fed: 10 channel spend columns, then 9 controls.
model_columns = [
    'GOOGLE_ADS_GLOBAL_ALL',
    'LINKEDIN_GLOBAL_ALL',
    'META_FACEBOOK_GLOBAL_ALL',
    'META_INSTAGRAM_GLOBAL_ALL',
    'MICROSOFT_ADS_GLOBAL_ALL',
    'PROGRAMMATIC_GLOBAL_ALL',
    'TIKTOK_GLOBAL_ALL',
    'TRADE_PUBLICATIONS_GLOBAL_ALL',
    'X_COM_GLOBAL_ALL',
    'YOUTUBE_GLOBAL_ALL',
    'TREND',
    'SIN_1',
    'COS_1',
    'SIN_2',
    'COS_2',
    'Q1_FLAG',
    'Q3_FLAG',
    'PMI_INDEX',
    'COMPETITOR_SOV'
]

# Fail fast with a readable message instead of a bare pandas KeyError deep in indexing.
required_columns = model_columns + ['ACTUAL_REVENUE', 'WEEK_START']
missing_columns = [col for col in required_columns if col not in input_df.columns]
if missing_columns:
    raise KeyError(f"Input view is missing required columns: {missing_columns}")

# An empty view would otherwise surface as an opaque IndexError on .iloc[0] below.
if input_df.empty:
    raise ValueError("Input view returned no rows; nothing to score.")

X = input_df[model_columns].copy()
y_actual = input_df['ACTUAL_REVENUE'].copy()
week_start = input_df['WEEK_START'].copy()

print(f"\nModel input shape: {X.shape}")
print("\nSample input (first row):")
for col in model_columns[:5]:
    print(f"  {col}: {X[col].iloc[0]:,.2f}")
print("  ...")

In [None]:
# Cell 5: Run inference using the Pipeline model
banner = "=" * 60
print(banner)
print("RUNNING PIPELINE INFERENCE")
print(banner)

# Score with the version addressed as "LAST" rather than the registry default.
mv = model_ref.version("LAST")
print(f"\nUsing version: {mv.version_name}")

print("\nRunning predictions...")
predictions = mv.run(X, function_name="predict")

# Report a shape when the result has one, otherwise fall back to its length.
if hasattr(predictions, 'shape'):
    prediction_size = predictions.shape
else:
    prediction_size = len(predictions)

print(f"\nPrediction output type: {type(predictions)}")
print(f"Prediction shape: {prediction_size}")

In [None]:
# Cell 6: Evaluate predictions
#
# Compares model predictions against observed revenue on weeks where revenue is
# positive, and reports correlation, MAE and MAPE. Leaves `y_pred` (one float per
# input row) in scope for the cells that follow.
print("="*60)
print("PIPELINE MODEL EVALUATION")
print("="*60)

# Pull the prediction vector out of whatever container the model returned.
if isinstance(predictions, pd.DataFrame):
    # Prefer a column that looks like a model output; otherwise take the first column.
    pred_col = [c for c in predictions.columns if 'output' in c.lower() or 'pred' in c.lower()]
    raw_pred = predictions[pred_col[0]] if pred_col else predictions.iloc[:, 0]
else:
    raw_pred = predictions

# Coerce to plain floats so Decimal/object-dtype values do not break the arithmetic below.
y_pred = np.asarray(raw_pred, dtype=float).ravel()
actual_values = y_actual.to_numpy(dtype=float, na_value=np.nan)

# Predictions are matched to actuals by position, so the lengths must agree.
if len(y_pred) != len(actual_values):
    raise ValueError(
        f"Prediction count ({len(y_pred)}) does not match actual revenue rows ({len(actual_values)})"
    )

# Only weeks with positive observed revenue are scored (this also keeps MAPE finite);
# NaN actuals compare as False and are excluded.
mask = actual_values > 0
y_actual_valid = actual_values[mask]
y_pred_valid = y_pred[mask]

if len(y_pred_valid) == 0:
    raise ValueError("No rows with ACTUAL_REVENUE > 0; cannot evaluate predictions.")

# Correlation is undefined for fewer than two points; report NaN instead of a runtime warning.
if len(y_pred_valid) > 1:
    correlation = np.corrcoef(y_actual_valid, y_pred_valid)[0, 1]
else:
    correlation = float('nan')
mae = np.mean(np.abs(y_actual_valid - y_pred_valid))
mape = np.mean(np.abs((y_actual_valid - y_pred_valid) / y_actual_valid)) * 100

print(f"\n{'Metric':<25} {'Value':>15}")
print("-" * 42)
print(f"{'N predictions':<25} {len(y_pred_valid):>15,}")
print(f"{'Correlation':<25} {correlation:>15.4f}")
print(f"{'Mean Absolute Error':<25} ${mae:>14,.0f}")
print(f"{'MAPE':<25} {mape:>14.2f}%")
print(f"{'Avg Actual Revenue':<25} ${np.mean(y_actual_valid):>14,.0f}")
print(f"{'Avg Predicted Revenue':<25} ${np.mean(y_pred_valid):>14,.0f}")

In [None]:
# Cell 7: Save results to test schema
#
# Builds one row per input week (actual, predicted, error, total channel spend)
# and overwrites the PIPELINE_PREDICTIONS table in the test schema.
print("="*60)
print("SAVING RESULTS")
print("="*60)

# Total spend covers every channel column fed to the model. Channel spend columns
# share the `_GLOBAL_ALL` suffix; the control variables do not.
spend_columns = [col for col in X.columns if col.endswith('_GLOBAL_ALL')]

results_df = pd.DataFrame({
    'WEEK_START': week_start,
    'ACTUAL_REVENUE': y_actual,
    'PREDICTED_REVENUE': y_pred,
    'PREDICTION_ERROR': y_pred - y_actual.values,
    'TOTAL_SPEND': X[spend_columns].sum(axis=1)
})

# Fully-qualified target so the write lands where the log line says it does,
# whatever database the session currently points at.
target_table = ["GLOBAL_B2B_MMM", "MMM_PIPELINE_TEST", "PIPELINE_PREDICTIONS"]

# The session is still switched to the test schema, as before.
session.use_schema("MMM_PIPELINE_TEST")
snowpark_df = session.create_dataframe(results_df)
snowpark_df.write.save_as_table(target_table, mode="overwrite")

print(f"\n✓ Results saved to {'.'.join(target_table)}")
print(f"  Rows: {len(results_df):,}")

In [None]:
# Cell 8: Display sample predictions
divider = "=" * 60
print(divider)
print("SAMPLE PREDICTIONS")
print(divider)

# First 15 weeks with positive observed revenue, plus the signed percentage error.
has_revenue = results_df['ACTUAL_REVENUE'] > 0
sample = results_df[has_revenue].head(15).copy()
revenue_gap = sample['PREDICTED_REVENUE'] - sample['ACTUAL_REVENUE']
sample['ERROR_PCT'] = revenue_gap / sample['ACTUAL_REVENUE'] * 100

print(f"\n{'Week':<12} {'Actual':>15} {'Predicted':>15} {'Error %':>10}")
print("-" * 55)
table_rows = zip(
    sample['WEEK_START'],
    sample['ACTUAL_REVENUE'],
    sample['PREDICTED_REVENUE'],
    sample['ERROR_PCT'],
)
for week, actual, predicted, error_pct in table_rows:
    # Keep only the date portion of the week value.
    week_label = str(week)[:10]
    print(f"{week_label:<12} ${actual:>13,.0f} ${predicted:>13,.0f} {error_pct:>9.1f}%")

print("\n" + divider)
print("PIPELINE TEST COMPLETE")
print(divider)