# End-to-End ML Workflow in Snowflake

Welcome to the ML workflow notebook! In this hands-on session, you will learn how to build, train, deploy, and monitor a machine learning model entirely within Snowflake.

---

## What We Will Build

We will create a complete ML pipeline to **predict daily first-time player conversions** for marketing campaigns. The workflow covers:

| Stage | Snowflake Capability | What You Will Learn |
|-------|---------------------|---------------------|
| 1. Data Exploration | Snowpark DataFrames | Load and explore data with Python |
| 2. Feature Engineering | Feature Store | Create reusable, governed features |
| 3. Preprocessing | ML Transformers | Encode, scale, and prepare data |
| 4. Experiment Tracking | MLflow Integration | Track experiments and compare runs |
| 5. Model Training | Snowpark ML | Train XGBoost with hyperparameter tuning |
| 6. Model Management | Model Registry | Version and govern your models |
| 7. Deployment | Batch Inference | Score new data at scale |
| 8. Orchestration | ML Jobs | Schedule automated pipelines |
| 9. Monitoring | Prediction Quality | Track model performance over time |

---

## Prerequisites

Before running this notebook, ensure you have completed:
- `01_setup_and_data_generation.ipynb` (creates database, tables, and sample data)

---

## Snowflake ML Capabilities Overview

Snowflake provides a comprehensive set of ML tools that work together:

**Data Manipulation:**
- Snowpark DataFrames (pandas-like syntax)
- SQL integration
- Pandas API on Snowpark

**Feature Engineering:**
- Feature Store for governed, reusable features
- Point-in-time lookups to prevent data leakage
- Feature pipelines for complex transformations

**Preprocessing (snowflake.ml.modeling.preprocessing):**
- OneHotEncoder, OrdinalEncoder - Categorical encoding
- StandardScaler, MinMaxScaler, MaxAbsScaler - Numeric scaling
- LabelEncoder - Encoding of target labels
- SimpleImputer (in `snowflake.ml.modeling.impute`) - Missing value handling
- Pipeline (in `snowflake.ml.modeling.pipeline`) - Chain multiple transformers

**Model Training (snowflake.ml.modeling):**
- XGBoost, LightGBM - Gradient boosting
- RandomForest, GradientBoosting - Ensemble methods
- LinearRegression, LogisticRegression - Linear models
- GridSearchCV, RandomizedSearchCV - Hyperparameter tuning

**Experiment Tracking:**
- MLflow integration for logging parameters and metrics
- Compare runs across experiments
- Visualize training progress

**Model Management:**
- Model Registry for versioning
- Lineage tracking
- Deployment-ready models


---

## Section 1: Environment Setup and Imports

### What We Are Doing
Setting up the Snowpark session and importing the necessary libraries for our ML workflow.

### Snowflake Notebook Tip
In Snowflake Notebooks, the session is automatically available. You can switch between SQL and Python cells seamlessly.


In [None]:
# Import core Snowpark libraries
from snowflake.snowpark import Session
from snowflake.snowpark.context import get_active_session
import snowflake.snowpark.functions as F
from snowflake.snowpark.types import *

# Import ML libraries
from snowflake.ml.modeling.preprocessing import (
    OneHotEncoder, 
    OrdinalEncoder, 
    StandardScaler, 
    MinMaxScaler,
    LabelEncoder
)
from snowflake.ml.modeling.pipeline import Pipeline
from snowflake.ml.modeling.xgboost import XGBRegressor
from snowflake.ml.modeling.metrics import mean_absolute_error, mean_squared_error, r2_score

# Import Feature Store
from snowflake.ml.feature_store import (
    FeatureStore, 
    FeatureView, 
    Entity,
    CreationMode
)

# Import Model Registry
from snowflake.ml.registry import Registry

# Import for experiment tracking
import snowflake.ml.mlflow as mlflow

# Standard Python libraries
import numpy as np
import pandas as pd
from datetime import datetime, timedelta

# Get the active session (automatically available in Snowflake Notebooks)
session = get_active_session()

# Set the context
session.use_database("ML_WORKFLOW_DEMO")
session.use_schema("WORKSHOP")
session.use_warehouse("ML_COMPUTE_WH")

print("Session established successfully!")
print(f"Current database: {session.get_current_database()}")
print(f"Current schema: {session.get_current_schema()}")
print(f"Current warehouse: {session.get_current_warehouse()}")


---

## Section 2: Data Exploration with Snowpark

### What We Are Doing
Loading our tables into Snowpark DataFrames and exploring the data using Python syntax.

### Why This Matters
Snowpark allows you to work with data using familiar pandas-like operations while keeping all computation in Snowflake. This means:
- No data movement to your local machine
- Scalable processing on Snowflake's compute
- Seamless integration with SQL when needed

### Snowflake Capabilities: Snowpark DataFrames
- **Lazy Evaluation**: Operations are not executed until you call `.collect()`, `.show()`, or `.to_pandas()`
- **Pushdown Optimization**: All operations are translated to SQL and run on Snowflake
- **Pandas API**: Use `.to_pandas()` for local analysis or use Snowpark pandas API for distributed pandas operations
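
To make the lazy-evaluation idea concrete, here is a toy model in plain Python. `LazyFrame` is a hypothetical class written only for this illustration; it is not part of Snowpark and does not reflect how Snowpark is implemented. It records operations and only produces a single SQL statement when an "action" (`to_sql()` here) is called.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class LazyFrame:
    """Toy stand-in for a lazily evaluated DataFrame (hypothetical, not Snowpark)."""
    table: str
    filters: tuple = ()
    columns: tuple = ("*",)

    def filter(self, condition: str) -> "LazyFrame":
        # Record the predicate; nothing is executed here.
        return LazyFrame(self.table, self.filters + (condition,), self.columns)

    def select(self, *cols: str) -> "LazyFrame":
        # Record the projection; nothing is executed here either.
        return LazyFrame(self.table, self.filters, cols)

    def to_sql(self) -> str:
        # The "action": turn the recorded plan into one SQL statement.
        sql = f"SELECT {', '.join(self.columns)} FROM {self.table}"
        if self.filters:
            sql += " WHERE " + " AND ".join(f"({c})" for c in self.filters)
        return sql


plan = LazyFrame("FTMP").filter("FTMP > 0").select("FTMP_DATE", "FTMP")
generated_sql = plan.to_sql()
print(generated_sql)
```

Chaining `filter` and `select` only builds up a plan, which is why a long chain of DataFrame operations costs nothing until a result is requested.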


In [None]:
# Load tables into Snowpark DataFrames
# This does NOT load data into memory - it creates a reference to the table

jackpot_df = session.table("JACKPOT")
ftmp_df = session.table("FTMP")
sales_df = session.table("SALES")
marketing_df = session.table("MARKETING")
upcoming_df = session.table("UPCOMING_CAMPAIGNS")

# Check row counts
print("Table Row Counts:")
print(f"  JACKPOT: {jackpot_df.count():,} rows")
print(f"  FTMP: {ftmp_df.count():,} rows")
print(f"  SALES: {sales_df.count():,} rows")
print(f"  MARKETING: {marketing_df.count():,} rows")
print(f"  UPCOMING_CAMPAIGNS: {upcoming_df.count():,} rows")


In [None]:
# Inspect the schema of the JACKPOT table (repeat for the other tables as needed)
print("JACKPOT Schema:")
jackpot_df.schema


In [None]:
# Preview the FTMP table (our target variable)
# .show(n) fetches and prints only the first n rows, not the whole table
ftmp_df.show(10)


In [None]:
# Analyze the target variable distribution
# Using Snowpark aggregation functions

ftmp_stats = ftmp_df.select(
    F.count("FTMP").alias("count"),
    F.avg("FTMP").alias("mean"),
    F.stddev("FTMP").alias("std"),
    F.min("FTMP").alias("min"),
    F.max("FTMP").alias("max"),
    F.median("FTMP").alias("median")
)

print("FTMP (Target Variable) Statistics:")
ftmp_stats.show()


In [None]:
# Create the aggregated training dataset
# We predict TOTAL daily FTMP (aggregated across all sites/channels)

# Step 1: Aggregate FTMP by date
daily_ftmp = ftmp_df.group_by("FTMP_DATE").agg(
    F.sum("FTMP").alias("TOTAL_FTMP")
).with_column_renamed("FTMP_DATE", "DATE")

print("Daily FTMP (aggregated):")
daily_ftmp.show(10)


In [None]:
# Step 2: Aggregate marketing spend by date
# First, replace NULL CHANNEL_CODE values with 'ORGANIC' and cast COSTS to FLOAT
marketing_clean = marketing_df.with_column(
    "CHANNEL_CODE",
    F.coalesce(F.col("CHANNEL_CODE"), F.lit("ORGANIC"))
).with_column(
    "COSTS_NUM",
    F.col("COSTS").cast("FLOAT")
)

# Aggregate total daily marketing spend
daily_marketing = marketing_clean.group_by("DAT").agg(
    F.sum("COSTS_NUM").alias("TOTAL_MARKETING_SPEND")
).with_column_renamed("DAT", "DATE")

print("Daily Marketing Spend:")
daily_marketing.show(10)


In [None]:
# Step 3: Aggregate sales by date
daily_sales = sales_df.group_by("DAT").agg(
    F.sum("CUSTOMER_COUNTS").alias("TOTAL_CUSTOMERS"),
    F.sum("SUM").alias("TOTAL_REVENUE")
).with_column_renamed("DAT", "DATE")

print("Daily Sales:")
daily_sales.show(10)


In [None]:
# Step 4: Prepare jackpot data (already at daily level)
jackpot_clean = jackpot_df.select(
    F.col("DS").alias("DATE"),
    F.col("EJ_JP"),
    F.col("LOTTO_JP"),
    F.col("IS_EJ_DRAW_DATE"),
    F.col("IS_LOTTO_DRAW_DATE"),
    F.col("IS_EJ_MAX_JP")
)

print("Jackpot Data:")
jackpot_clean.show(10)


In [None]:
# Step 5: Join all data sources
# This creates our base training dataset

training_base = daily_ftmp.join(
    jackpot_clean,
    daily_ftmp["DATE"] == jackpot_clean["DATE"],
    "inner"
).drop(jackpot_clean["DATE"])

training_base = training_base.join(
    daily_marketing,
    training_base["DATE"] == daily_marketing["DATE"],
    "left"
).drop(daily_marketing["DATE"])

training_base = training_base.join(
    daily_sales,
    training_base["DATE"] == daily_sales["DATE"],
    "left"
).drop(daily_sales["DATE"])

# Fill nulls in aggregated columns with 0
training_base = training_base.na.fill({
    "TOTAL_MARKETING_SPEND": 0,
    "TOTAL_CUSTOMERS": 0,
    "TOTAL_REVENUE": 0
})

print(f"Training dataset: {training_base.count()} rows")
training_base.show(10)


---

## Section 3: Feature Engineering with Feature Store

### What We Are Doing
Creating reusable, governed features using Snowflake's Feature Store.

### Why This Matters
The Feature Store provides:
- **Reusability**: Define features once, use them across multiple models
- **Governance**: Track feature lineage and ownership
- **Consistency**: Ensure training and inference use the same feature definitions
- **Point-in-Time Correctness**: Prevent data leakage with proper temporal joins
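
Point-in-time correctness can be pictured as an "as-of" lookup: for each spine date, take the most recent feature value that was already known on that date and never a later one. The sketch below shows the idea with the standard library only. The data and the helper `point_in_time_value` are made up for illustration and are not the Feature Store's implementation.

```python
from bisect import bisect_right
from datetime import date

# Hypothetical feature history: (effective_date, value), sorted by date.
feature_history = [
    (date(2024, 1, 1), 10.0),
    (date(2024, 1, 5), 12.5),
    (date(2024, 1, 9), 20.0),
]


def point_in_time_value(history, as_of):
    """Return the latest value whose effective date is <= as_of, else None."""
    dates = [d for d, _ in history]
    idx = bisect_right(dates, as_of)  # number of entries with date <= as_of
    return history[idx - 1][1] if idx > 0 else None


# Spine: the dates we want to enrich with features.
spine_dates = [date(2023, 12, 31), date(2024, 1, 5), date(2024, 1, 8)]
enriched = [(d, point_in_time_value(feature_history, d)) for d in spine_dates]
for d, value in enriched:
    print(d, value)
```

The spine date `2024-01-08` receives the value from `2024-01-05`, not the one from `2024-01-09`, which is what keeps future information out of the training rows.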

### Snowflake Feature Store Capabilities

| Component | Description |
|-----------|-------------|
| **Entity** | The primary key for features (e.g., DATE, USER_ID) |
| **FeatureView** | A collection of related features tied to an entity |
| **Spine DataFrame** | The dataset you want to enrich with features |
| **retrieve_feature_values()** | Get feature values for training or inference |

### UI Guide: Viewing the Feature Store

After running this section, you can explore your features in Snowsight:
1. Navigate to **Data** in the left sidebar
2. Click on **Feature Store** (under the AI/ML section)
3. You will see your registered entities and feature views
4. Click on a feature view to see:
   - Feature definitions
   - Data lineage
   - Usage statistics



In [None]:
# Initialize the Feature Store
fs = FeatureStore(
    session=session,
    database="ML_WORKFLOW_DEMO",
    name="WORKSHOP",
    default_warehouse="ML_COMPUTE_WH",
    creation_mode=CreationMode.CREATE_IF_NOT_EXIST
)

print("Feature Store initialized successfully!")


In [None]:
# Create an Entity for our features
# The Entity defines the primary key (join key) for features

date_entity = Entity(
    name="DAILY_DATE",
    join_keys=["DATE"],
    desc="Daily date entity for time-series features"
)

# Register the entity
fs.register_entity(date_entity)
print("Entity 'DAILY_DATE' registered successfully!")


In [None]:
# Save the training base as a table for the Feature Store
training_base.write.save_as_table("TRAINING_BASE", mode="overwrite")

# Create feature engineering transformations
# Add day of week, month features, and lagged values

feature_sql = """
SELECT 
    DATE,
    -- Target variable
    TOTAL_FTMP,
    
    -- Log-transform the target (handles skewness)
    LN(TOTAL_FTMP + 1) as LOG_FTMP,
    
    -- Jackpot features (already numeric)
    EJ_JP,
    LOTTO_JP,
    EJ_JP / 1000000.0 as EJ_JP_MILLIONS,
    LOTTO_JP / 1000000.0 as LOTTO_JP_MILLIONS,
    
    -- Boolean features
    IS_EJ_DRAW_DATE,
    IS_LOTTO_DRAW_DATE,
    IS_EJ_MAX_JP,
    
    -- Marketing and sales features
    TOTAL_MARKETING_SPEND,
    TOTAL_CUSTOMERS,
    TOTAL_REVENUE,
    
    -- Temporal features
    DAYOFWEEK(TO_DATE(DATE)) as DAY_OF_WEEK,
    MONTH(TO_DATE(DATE)) as MONTH,
    DAYOFMONTH(TO_DATE(DATE)) as DAY_OF_MONTH,
    WEEKOFYEAR(TO_DATE(DATE)) as WEEK_OF_YEAR,
    
    -- Weekend indicator
    CASE WHEN DAYOFWEEK(TO_DATE(DATE)) IN (0, 6) THEN 1 ELSE 0 END as IS_WEEKEND,
    
    -- Lagged features (previous day values)
    LAG(TOTAL_MARKETING_SPEND, 1) OVER (ORDER BY DATE) as MARKETING_SPEND_LAG1,
    LAG(TOTAL_MARKETING_SPEND, 7) OVER (ORDER BY DATE) as MARKETING_SPEND_LAG7,
    LAG(EJ_JP, 1) OVER (ORDER BY DATE) as EJ_JP_LAG1,
    LAG(TOTAL_FTMP, 1) OVER (ORDER BY DATE) as FTMP_LAG1,
    LAG(TOTAL_FTMP, 7) OVER (ORDER BY DATE) as FTMP_LAG7,
    
    -- Rolling averages
    AVG(TOTAL_MARKETING_SPEND) OVER (ORDER BY DATE ROWS BETWEEN 7 PRECEDING AND 1 PRECEDING) as MARKETING_SPEND_ROLLING_7D,
    AVG(TOTAL_FTMP) OVER (ORDER BY DATE ROWS BETWEEN 7 PRECEDING AND 1 PRECEDING) as FTMP_ROLLING_7D
    
FROM TRAINING_BASE
ORDER BY DATE
"""

# Create the feature dataset
feature_df = session.sql(feature_sql)

# Remove rows with NULL lagged features (first few rows)
feature_df = feature_df.na.drop()

print(f"Feature dataset: {feature_df.count()} rows")
feature_df.show(10)
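
The lag and rolling-average columns in the SQL above deliberately look only at earlier rows (`ROWS BETWEEN 7 PRECEDING AND 1 PRECEDING`), so the current day's value never leaks into its own features. A minimal plain-Python sketch of the same window logic, using made-up numbers and a window of 3 instead of 7 for brevity (the helper names are hypothetical):

```python
def lag(values, n):
    """Shift values down by n rows; rows without enough history become None."""
    return ([None] * n + list(values))[:len(values)]


def rolling_mean_excluding_current(values, window):
    """Mean of up to `window` rows before the current row (current row excluded)."""
    out = []
    for i in range(len(values)):
        prior = values[max(0, i - window):i]
        out.append(sum(prior) / len(prior) if prior else None)
    return out


daily_ftmp_values = [10, 20, 30, 40, 50]  # hypothetical daily totals, ordered by date

lag1 = lag(daily_ftmp_values, 1)
rolling3 = rolling_mean_excluding_current(daily_ftmp_values, 3)

print(lag1)
print(rolling3)
```

The leading `None` entries correspond to the NULL rows that `feature_df.na.drop()` removes.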


In [None]:
# Save the feature data as a table
feature_df.write.save_as_table("DAILY_FEATURES", mode="overwrite")

# Create a Feature View from the feature table
daily_features_fv = FeatureView(
    name="DAILY_CONVERSION_FEATURES",
    entities=[date_entity],
    feature_df=session.table("DAILY_FEATURES"),
    desc="Daily features for conversion prediction including jackpots, marketing, and temporal features"
)

# Register the Feature View
daily_features_fv = fs.register_feature_view(
    feature_view=daily_features_fv,
    version="V1",
    block=True  # Wait for registration to complete
)

print("Feature View 'DAILY_CONVERSION_FEATURES' registered successfully!")
print(f"Version: {daily_features_fv.version}")


In [None]:
# List all registered Feature Views
print("Registered Feature Views:")
for fv in fs.list_feature_views().collect():
    print(f"  - {fv['NAME']} (Version: {fv['VERSION']})")


---

## Section 4: Data Preprocessing with Snowflake ML

### What We Are Doing
Applying preprocessing transformations to prepare our features for model training.

### Why This Matters
Proper preprocessing ensures:
- Categorical variables are encoded for ML algorithms
- Numeric features are scaled for optimal model performance
- The same transformations are applied consistently during training and inference

### Snowflake ML Preprocessing Capabilities

| Transformer | Description | Use Case |
|-------------|-------------|----------|
| **OrdinalEncoder** | Maps categories to integers | Ordinal categories (low/medium/high) |
| **OneHotEncoder** | Creates binary columns for each category | Nominal categories (colors, countries) |
| **StandardScaler** | Scales to zero mean, unit variance | Most ML algorithms |
| **MinMaxScaler** | Scales to [0, 1] range | Neural networks, distance-based algorithms |
| **MaxAbsScaler** | Scales by maximum absolute value | Sparse data |
| **LabelEncoder** | Encodes target labels | Classification targets |
| **SimpleImputer** | Fills missing values | Handling NULLs |
| **Pipeline** | Chains multiple transformers | Complex preprocessing workflows |
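
The difference between the two encoders in the table is easiest to see on a tiny example. The sketch below uses plain Python and made-up categories; it illustrates the output shape of each encoding and does not call the Snowflake ML classes.

```python
categories = ["low", "medium", "high"]  # hypothetical ordered categories
samples = ["medium", "low", "high", "medium"]

# Ordinal encoding: one integer per category, preserving the given order.
ordinal_map = {cat: idx for idx, cat in enumerate(categories)}
ordinal_encoded = [ordinal_map[s] for s in samples]

# One-hot encoding: one binary column per category.
one_hot_encoded = [[1 if s == cat else 0 for cat in categories] for s in samples]

print(ordinal_encoded)
print(one_hot_encoded)
```

Ordinal encoding keeps a single column and implies an order, which tree-based models such as XGBoost usually handle well; one-hot encoding avoids implying an order at the cost of more columns.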

### Key Concept: Fit vs Transform

- **fit()**: Learn parameters from training data (e.g., mean/std for scaling)
- **transform()**: Apply learned parameters to new data
- **fit_transform()**: Do both in one step (training only)
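
A minimal sketch of the fit/transform split, written in plain Python so it runs anywhere. `SimpleStandardScaler` is a hypothetical stand-in, not the Snowflake ML `StandardScaler`, and its use of the population standard deviation is an assumption of this sketch. The point it illustrates is that the parameters come from the training data only and are then reused unchanged on the test data.

```python
from statistics import fmean, pstdev


class SimpleStandardScaler:
    """Toy scaler: learns mean/std in fit(), applies them in transform()."""

    def fit(self, values):
        self.mean_ = fmean(values)
        self.std_ = pstdev(values) or 1.0  # guard against a constant column
        return self

    def transform(self, values):
        return [(v - self.mean_) / self.std_ for v in values]


train = [2.0, 4.0, 6.0, 8.0]  # hypothetical training values
test = [5.0, 10.0]            # hypothetical unseen values

scaler = SimpleStandardScaler().fit(train)  # parameters learned from train only
train_scaled = scaler.transform(train)
test_scaled = scaler.transform(test)        # same parameters, no refitting

print(scaler.mean_, scaler.std_)
print(test_scaled)
```

Refitting on the test set would shift its mean to zero as well and hide any drift between the two periods, which is why only `transform()` is applied to test and inference data.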


In [None]:
# Load the feature data for preprocessing
features_df = session.table("DAILY_FEATURES")

# Define feature columns and target
TARGET_COL = "LOG_FTMP"  # We use log-transformed target
NUMERIC_FEATURES = [
    "EJ_JP_MILLIONS", "LOTTO_JP_MILLIONS", 
    "TOTAL_MARKETING_SPEND", "TOTAL_CUSTOMERS", "TOTAL_REVENUE",
    "MARKETING_SPEND_LAG1", "MARKETING_SPEND_LAG7", "EJ_JP_LAG1",
    "FTMP_LAG1", "FTMP_LAG7", "MARKETING_SPEND_ROLLING_7D", "FTMP_ROLLING_7D"
]
CATEGORICAL_FEATURES = ["DAY_OF_WEEK", "MONTH"]
BOOLEAN_FEATURES = ["IS_EJ_DRAW_DATE", "IS_LOTTO_DRAW_DATE", "IS_EJ_MAX_JP", "IS_WEEKEND"]

print(f"Target: {TARGET_COL}")
print(f"Numeric features ({len(NUMERIC_FEATURES)}): {NUMERIC_FEATURES}")
print(f"Categorical features ({len(CATEGORICAL_FEATURES)}): {CATEGORICAL_FEATURES}")
print(f"Boolean features ({len(BOOLEAN_FEATURES)}): {BOOLEAN_FEATURES}")


In [None]:
# Create preprocessing transformers

# StandardScaler for numeric features
# Scales features to have mean=0 and std=1
numeric_scaler = StandardScaler(
    input_cols=NUMERIC_FEATURES,
    output_cols=[f"{col}_SCALED" for col in NUMERIC_FEATURES]
)

# OrdinalEncoder for categorical features
# Converts categories to integers
categorical_encoder = OrdinalEncoder(
    input_cols=CATEGORICAL_FEATURES,
    output_cols=[f"{col}_ENCODED" for col in CATEGORICAL_FEATURES]
)

print("Preprocessors created:")
print("  - StandardScaler for numeric features")
print("  - OrdinalEncoder for categorical features")


In [None]:
# Split data into training and test sets (80/20 split by date)
# Time-series split: use earlier dates for training, later dates for testing

# Get the date range
date_stats = features_df.select(
    F.min("DATE").alias("min_date"),
    F.max("DATE").alias("max_date"),
    F.count("DATE").alias("total_rows")
).collect()[0]

print(f"Date range: {date_stats['MIN_DATE']} to {date_stats['MAX_DATE']}")
print(f"Total rows: {date_stats['TOTAL_ROWS']}")

# Get all distinct dates in sorted order
all_dates = features_df.select("DATE").distinct().sort("DATE").collect()

# Calculate split date (first 80% of distinct dates for training)
# Indexing by the number of distinct dates stays valid even if DATE is not unique per row
split_index = int(len(all_dates) * 0.8)
split_date = all_dates[split_index]['DATE']

print(f"Split date: {split_date}")
print(f"Training: dates before {split_date}")
print(f"Testing: dates on or after {split_date}")


In [None]:
# Create train/test split
train_df = features_df.filter(F.col("DATE") < split_date)
test_df = features_df.filter(F.col("DATE") >= split_date)

print(f"Training set: {train_df.count()} rows")
print(f"Test set: {test_df.count()} rows")


In [None]:
# Fit the transformers on the training data, then transform it
# fit() learns the parameters (e.g., mean/std); transform() applies them

train_scaled = numeric_scaler.fit(train_df).transform(train_df)
train_encoded = categorical_encoder.fit(train_scaled).transform(train_scaled)

# Transform the test data using the same fitted transformers
# This ensures consistent preprocessing between train and test
test_scaled = numeric_scaler.transform(test_df)
test_encoded = categorical_encoder.transform(test_scaled)

print("Preprocessing complete!")
print(f"Training columns: {len(train_encoded.columns)}")
train_encoded.show(5)


---

## Section 5: Experiment Tracking with MLflow

### What We Are Doing
Setting up experiment tracking to log parameters, metrics, and models for each training run.

### Why This Matters
Experiment tracking enables:
- **Reproducibility**: Record exactly what was done for each experiment
- **Comparison**: Compare different model versions and hyperparameters
- **Collaboration**: Share experiment results with team members
- **Governance**: Track model lineage from data to deployment

### Snowflake MLflow Integration

Snowflake provides native MLflow integration that:
- Stores experiment data in Snowflake tables (no external server needed)
- Integrates with the Model Registry
- Provides a UI for comparing runs

### UI Guide: Viewing Experiments in Snowsight

After running experiments, you can view them in Snowsight:
1. Navigate to **AI / ML** in the left sidebar
2. Click on **Experiments**
3. You will see a list of all experiments
4. Click on an experiment to see:
   - Individual runs
   - Parameters and metrics for each run
   - Comparison charts
   - Model artifacts


In [None]:
# Set up MLflow experiment
# The experiment name groups all related runs together

EXPERIMENT_NAME = "FTMP_Conversion_Prediction"

# Create or get the experiment
mlflow.set_experiment(EXPERIMENT_NAME)

print(f"Experiment '{EXPERIMENT_NAME}' is ready for tracking!")


---

## Section 6: Model Training

### What We Are Doing
Training XGBoost regression models with different hyperparameters and comparing their performance.

### Why This Matters
- XGBoost is a powerful gradient boosting algorithm that works well with tabular data
- Hyperparameter tuning can significantly improve model performance
- Tracking experiments allows us to find the best model configuration
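
One practical consequence of training on `LOG_FTMP` (defined in Section 3 as `LN(TOTAL_FTMP + 1)`) is that predictions, and any MAE or RMSE computed from them, are in log units. The sketch below shows one way to map predictions back to counts with `expm1`, the inverse of `LN(x + 1)`. All numbers are made up for illustration.

```python
import math

actual_counts = [120.0, 80.0, 200.0]  # hypothetical daily TOTAL_FTMP values
log_predictions = [4.8, 4.4, 5.2]     # hypothetical model outputs in log space

# Invert LN(x + 1): expm1(y) = exp(y) - 1.
predicted_counts = [math.expm1(p) for p in log_predictions]

# MAE expressed in the original units (conversions per day).
mae_counts = sum(
    abs(a - p) for a, p in zip(actual_counts, predicted_counts)
) / len(actual_counts)

# Sanity check: log1p followed by expm1 recovers the original counts.
round_trip = [math.expm1(math.log1p(a)) for a in actual_counts]

print([round(p, 1) for p in predicted_counts])
print(round(mae_counts, 2))
```

Reporting the error in original units alongside the log-space metrics makes results easier to interpret for stakeholders who think in daily conversions.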

### Snowflake ML Training Capabilities

**Available Algorithms (snowflake.ml.modeling):**

| Category | Algorithms |
|----------|------------|
| **Gradient Boosting** | XGBRegressor, XGBClassifier, LGBMRegressor, LGBMClassifier |
| **Ensemble** | RandomForestRegressor, RandomForestClassifier, GradientBoostingRegressor |
| **Linear** | LinearRegression, LogisticRegression, Ridge, Lasso, ElasticNet |
| **Tuning** | GridSearchCV, RandomizedSearchCV |

### Key Hyperparameters for XGBoost

| Parameter | Description | Typical Range |
|-----------|-------------|---------------|
| **n_estimators** | Number of trees | 100-1000 |
| **max_depth** | Maximum tree depth | 3-10 |
| **learning_rate** | Step size for updates | 0.01-0.3 |
| **subsample** | Fraction of samples per tree | 0.6-1.0 |
| **colsample_bytree** | Fraction of features per tree | 0.6-1.0 |
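
When more than a handful of combinations are worth trying, the list of configurations can be generated rather than typed by hand. A minimal sketch using `itertools.product`; the search space values and the `config_<i>` naming are assumptions chosen for illustration, and the resulting dictionaries use the same keys as the hand-written configurations in this section.

```python
from itertools import product

# Hypothetical search space; values picked from the typical ranges above.
search_space = {
    "n_estimators": [100, 200],
    "max_depth": [3, 6],
    "learning_rate": [0.05, 0.1],
    "subsample": [0.8],
}

keys = list(search_space)
configs = [
    {"name": f"config_{i}", **dict(zip(keys, values))}
    for i, values in enumerate(product(*(search_space[k] for k in keys)))
]

print(f"{len(configs)} configurations")
print(configs[0])
```

The number of combinations grows multiplicatively with each added parameter, so it is worth keeping the grid small or sampling from it when each training run is expensive.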


In [None]:
# Prepare feature columns for training
SCALED_FEATURES = [f"{col}_SCALED" for col in NUMERIC_FEATURES]
ENCODED_FEATURES = [f"{col}_ENCODED" for col in CATEGORICAL_FEATURES]
ALL_FEATURES = SCALED_FEATURES + ENCODED_FEATURES + BOOLEAN_FEATURES

print(f"Total features for training: {len(ALL_FEATURES)}")
print(f"Features: {ALL_FEATURES}")


In [None]:
# Define hyperparameter configurations to try
hyperparameter_configs = [
    {
        "name": "baseline",
        "n_estimators": 100,
        "max_depth": 3,
        "learning_rate": 0.1,
        "subsample": 0.8
    },
    {
        "name": "deeper_trees",
        "n_estimators": 100,
        "max_depth": 6,
        "learning_rate": 0.1,
        "subsample": 0.8
    },
    {
        "name": "more_trees_slower_lr",
        "n_estimators": 200,
        "max_depth": 4,
        "learning_rate": 0.05,
        "subsample": 0.8
    }
]

print(f"Will train {len(hyperparameter_configs)} model configurations")


In [None]:
# Train models and track experiments
results = []

for config in hyperparameter_configs:
    print(f"\nTraining model: {config['name']}")
    
    # Start an MLflow run
    with mlflow.start_run(run_name=config['name']) as run:
        # Log hyperparameters
        mlflow.log_param("n_estimators", config['n_estimators'])
        mlflow.log_param("max_depth", config['max_depth'])
        mlflow.log_param("learning_rate", config['learning_rate'])
        mlflow.log_param("subsample", config['subsample'])
        mlflow.log_param("model_type", "XGBRegressor")
        
        # Create and train the model
        model = XGBRegressor(
            input_cols=ALL_FEATURES,
            label_cols=[TARGET_COL],
            output_cols=["PREDICTION"],
            n_estimators=config['n_estimators'],
            max_depth=config['max_depth'],
            learning_rate=config['learning_rate'],
            subsample=config['subsample'],
            random_state=42
        )
        
        # Fit the model
        model.fit(train_encoded)
        
        # Make predictions on test set
        test_predictions = model.predict(test_encoded)
        
        # Convert to pandas for metrics calculation
        test_results_pd = test_predictions.select(
            TARGET_COL, "PREDICTION"
        ).to_pandas()
        
        # Calculate metrics
        mae = np.mean(np.abs(test_results_pd[TARGET_COL] - test_results_pd["PREDICTION"]))
        rmse = np.sqrt(np.mean((test_results_pd[TARGET_COL] - test_results_pd["PREDICTION"])**2))
        
        # Calculate R2
        ss_res = np.sum((test_results_pd[TARGET_COL] - test_results_pd["PREDICTION"])**2)
        ss_tot = np.sum((test_results_pd[TARGET_COL] - test_results_pd[TARGET_COL].mean())**2)
        r2 = 1 - (ss_res / ss_tot)
        
        # Log metrics
        mlflow.log_metric("mae", mae)
        mlflow.log_metric("rmse", rmse)
        mlflow.log_metric("r2", r2)
        
        # Store results
        results.append({
            "name": config['name'],
            "run_id": run.info.run_id,
            "mae": mae,
            "rmse": rmse,
            "r2": r2,
            "model": model
        })
        
        print(f"  MAE: {mae:.4f}")
        print(f"  RMSE: {rmse:.4f}")
        print(f"  R2: {r2:.4f}")

print("\n" + "="*50)
print("All training runs completed!")


In [None]:
# Compare results and select the best model
results_df = pd.DataFrame([
    {"Model": r["name"], "MAE": r["mae"], "RMSE": r["rmse"], "R2": r["r2"]}
    for r in results
])

print("Model Comparison:")
print(results_df.to_string(index=False))

# Select the best model (lowest RMSE)
best_result = min(results, key=lambda x: x["rmse"])
best_model = best_result["model"]

print(f"\nBest Model: {best_result['name']}")
print(f"  RMSE: {best_result['rmse']:.4f}")
print(f"  R2: {best_result['r2']:.4f}")


---

## Section 7: Model Registry

### What We Are Doing
Registering the best model to the Snowflake Model Registry for versioning, governance, and deployment.

### Why This Matters
The Model Registry provides:
- **Version Control**: Track different versions of your models
- **Metadata**: Store model descriptions, metrics, and tags
- **Lineage**: See what data and features were used to train the model
- **Deployment**: Deploy models directly from the registry

### Snowflake Model Registry Capabilities

| Feature | Description |
|---------|-------------|
| **log_model()** | Register a new model or version |
| **get_model()** | Retrieve a registered model |
| **show_models()** | View all registered models |
| **run()** | Execute batch inference with a model version |
| **delete_model()** | Remove a model from the registry |

### UI Guide: Viewing the Model Registry

After registering a model, explore it in Snowsight:
1. Navigate to **AI / ML** in the left sidebar
2. Click on **Models** (or Model Registry)
3. You will see a list of all registered models
4. Click on a model to see:
   - All versions
   - Metadata and tags
   - Lineage information
   - Usage history



In [None]:
# Initialize the Model Registry
registry = Registry(session=session)

# Define model name and version
MODEL_NAME = "FTMP_CONVERSION_PREDICTOR"
MODEL_VERSION = "V1"

# Register the best model
registered_model = registry.log_model(
    model=best_model,
    model_name=MODEL_NAME,
    version_name=MODEL_VERSION,
    comment=f"XGBoost model for daily FTMP prediction. Best config: {best_result['name']}. RMSE: {best_result['rmse']:.4f}",
    metrics={
        "mae": best_result["mae"],
        "rmse": best_result["rmse"],
        "r2": best_result["r2"]
    }
)

print("Model registered successfully!")
print(f"  Name: {MODEL_NAME}")
print(f"  Version: {MODEL_VERSION}")


In [None]:
# List all registered models
print("Registered Models:")
models_df = registry.show_models()
models_df


In [None]:
# Retrieve the model from the registry
# This demonstrates how to load a model for inference
retrieved_model = registry.get_model(MODEL_NAME).version(MODEL_VERSION)

print(f"Retrieved model: {MODEL_NAME} version {MODEL_VERSION}")
print(f"Model type: {type(retrieved_model)}")


---

## Section 8: Batch Scoring and Deployment

### What We Are Doing
Using the registered model and Feature Store together to score new data (upcoming campaigns).

### Why This Matters
This demonstrates the production inference workflow:
1. **Feature Store**: Retrieve or generate features for new data
2. **Model Registry**: Load the deployed model
3. **Inference**: Apply the model to generate predictions

### Production Inference Flow

```
New Data (UPCOMING_CAMPAIGNS)
         |
         v
Feature Engineering (same transformations)
         |
         v
Preprocessing (fitted transformers)
         |
         v
Model Registry (get_model)
         |
         v
Predictions
```

### UI Guide: Viewing Model Lineage

After running inference, you can see the connection between models and features:
1. Navigate to **AI / ML** > **Models**
2. Click on your model
3. Look for the **Lineage** tab
4. This shows:
   - Source tables
   - Feature views used
   - Training data lineage


In [None]:
# Load the upcoming campaigns data
upcoming_campaigns = session.table("UPCOMING_CAMPAIGNS")
print(f"Upcoming campaigns to score: {upcoming_campaigns.count()} rows")
upcoming_campaigns.show(10)


In [None]:
# Step 1: Get the last known feature values for lagged features
# For future dates, we need to use historical data for lag features

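# Restrict to rows with a known target so that re-running this cell after future
# dates have been appended to DAILY_FEATURES still picks the last historical date
last_training_date = features_df.filter(
    F.col("TOTAL_FTMP").is_not_null()
).select(F.max("DATE")).collect()[0][0]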
last_known_features = features_df.filter(F.col("DATE") == last_training_date).collect()[0]

print(f"Using features from last known date: {last_training_date}")
print(f"Last FTMP: {last_known_features['TOTAL_FTMP']}")
print(f"Last Marketing Spend: {last_known_features['TOTAL_MARKETING_SPEND']}")
print("\nThese values will be used for lagged features in future date predictions.")


In [None]:
# Step 2: Generate features for upcoming campaigns using the SAME logic as training
# This ensures consistency - we're applying identical feature engineering

# Aggregate upcoming campaigns by date (same as we did for training data)
# Cast columns to match DAILY_FEATURES table types
upcoming_aggregated = upcoming_campaigns.group_by("CAMPAIGN_DATE").agg(
    F.sum("PLANNED_COSTS").cast("FLOAT").alias("TOTAL_MARKETING_SPEND"),
    F.max("EXPECTED_EJ_JP").cast("FLOAT").alias("EJ_JP"),
    F.max("EXPECTED_LOTTO_JP").cast("FLOAT").alias("LOTTO_JP"),
    F.max(F.col("IS_EJ_DRAW_DATE").cast("BOOLEAN")).alias("IS_EJ_DRAW_DATE"),
    F.max(F.col("IS_LOTTO_DRAW_DATE").cast("BOOLEAN")).alias("IS_LOTTO_DRAW_DATE")
).with_column_renamed("CAMPAIGN_DATE", "DATE")

# Apply the SAME feature transformations as in Section 3
# This is the key to Feature Store consistency!
new_features = upcoming_aggregated.with_column(
    "TOTAL_FTMP", F.lit(None).cast("FLOAT")  # NULL for future dates (we're predicting this!)
).with_column(
    "LOG_FTMP", F.lit(None).cast("FLOAT")    # NULL for future dates
).with_column(
    "EJ_JP_MILLIONS", (F.col("EJ_JP") / 1000000.0).cast("FLOAT")
).with_column(
    "LOTTO_JP_MILLIONS", (F.col("LOTTO_JP") / 1000000.0).cast("FLOAT")
).with_column(
    "IS_EJ_MAX_JP", (F.col("EJ_JP") >= 90000000).cast("BOOLEAN")
).with_column(
    "DAY_OF_WEEK", F.dayofweek(F.to_date(F.col("DATE"))).cast("INTEGER")
).with_column(
    "MONTH", F.month(F.to_date(F.col("DATE"))).cast("INTEGER")
).with_column(
    "DAY_OF_MONTH", F.dayofmonth(F.to_date(F.col("DATE"))).cast("INTEGER")
).with_column(
    "WEEK_OF_YEAR", F.weekofyear(F.to_date(F.col("DATE"))).cast("INTEGER")
).with_column(
    "IS_WEEKEND", F.when(F.dayofweek(F.to_date(F.col("DATE"))).isin([0, 6]), 1).otherwise(0).cast("INTEGER")
).with_column(
    # Lagged features use last known historical values
    "MARKETING_SPEND_LAG1", F.lit(float(last_known_features['TOTAL_MARKETING_SPEND']))
).with_column(
    "MARKETING_SPEND_LAG7", F.lit(float(last_known_features['MARKETING_SPEND_LAG7'] or 0))
).with_column(
    "EJ_JP_LAG1", F.lit(float(last_known_features['EJ_JP']))
).with_column(
    "FTMP_LAG1", F.lit(float(last_known_features['TOTAL_FTMP']))
).with_column(
    "FTMP_LAG7", F.lit(float(last_known_features['FTMP_LAG7'] or 0))
).with_column(
    "MARKETING_SPEND_ROLLING_7D", F.lit(float(last_known_features['MARKETING_SPEND_ROLLING_7D'] or 0))
).with_column(
    "FTMP_ROLLING_7D", F.lit(float(last_known_features['FTMP_ROLLING_7D'] or 0))
).with_column(
    "TOTAL_CUSTOMERS", F.lit(float(last_known_features['TOTAL_CUSTOMERS']))
).with_column(
    "TOTAL_REVENUE", F.lit(float(last_known_features['TOTAL_REVENUE']))
)

print(f"Generated features for {new_features.count()} future dates")
print("Feature columns match DAILY_FEATURES table structure")
new_features.show(5)


In [None]:
# Step 3: Insert new features into the Feature Store table (DAILY_FEATURES)
# This adds the future dates to the same table used for training

# Get the schema of DAILY_FEATURES to ensure column order matches
daily_features_schema = session.table("DAILY_FEATURES").schema
column_order = [field.name for field in daily_features_schema.fields]
print(f"DAILY_FEATURES columns: {column_order}")

# Select columns from new_features in the same order as DAILY_FEATURES
# This ensures the insert will work correctly
new_features_ordered = new_features.select(column_order)

# Check if these dates already exist (avoid duplicates)
existing_dates = session.table("DAILY_FEATURES").select("DATE")
new_dates_only = new_features_ordered.join(existing_dates, on="DATE", how="left_anti")

row_count = new_dates_only.count()
if row_count > 0:
    # Append new feature rows to DAILY_FEATURES
    new_dates_only.write.mode("append").save_as_table("DAILY_FEATURES")
    print(f"Inserted {row_count} new feature rows into DAILY_FEATURES")
else:
    print("Feature rows for these dates already exist in DAILY_FEATURES")

# Verify the insertion
total_rows = session.table("DAILY_FEATURES").count()
print(f"DAILY_FEATURES now has {total_rows} total rows")
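
The `left_anti` join in the cell above keeps only the rows whose `DATE` is not already in the table, which is what makes the insert safe to re-run. The sketch below shows the same semantics on plain Python data. The function name `left_anti` and the sample rows are hypothetical and used only for illustration.

```python
def left_anti(new_rows, existing_keys, key="DATE"):
    """Keep only rows whose key is absent from existing_keys (left anti join semantics)."""
    seen = set(existing_keys)
    return [row for row in new_rows if row[key] not in seen]


# Hypothetical sample data, for illustration only
existing_dates = ["2024-06-01", "2024-06-02"]
candidate_rows = [
    {"DATE": "2024-06-02", "TOTAL_MARKETING_SPEND": 1200.0},
    {"DATE": "2024-06-03", "TOTAL_MARKETING_SPEND": 900.0},
]

to_insert = left_anti(candidate_rows, existing_dates)
print(to_insert)

# After the insert, the new date exists, so a second run finds nothing to add
second_run = left_anti(candidate_rows, existing_dates + [r["DATE"] for r in to_insert])
print(second_run)
```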

In [None]:
# Step 4: Retrieve features from the Feature Store using retrieve_feature_values()
# This is the KEY step that ensures consistency with training!

# Create a spine DataFrame with the dates we want to predict
spine_df = upcoming_campaigns.select(
    F.col("CAMPAIGN_DATE").alias("DATE")
).distinct()

print(f"Retrieving features for {spine_df.count()} dates from Feature Store...")

# Retrieve features using the Feature Store
# This uses the SAME Feature View we used for training
inference_features = fs.retrieve_feature_values(
    spine_df=spine_df,
    features=[daily_features_fv],
    exclude_columns=["TOTAL_FTMP", "LOG_FTMP"]  # Exclude target columns (we're predicting these!)
)

print("Features retrieved from Feature Store!")
print(f"Columns: {inference_features.columns}")
inference_features.show(5)
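
Conceptually, retrieving features for a spine resembles a left join from the spine dates onto the feature rows, followed by dropping the excluded columns. The sketch below illustrates that idea on plain Python data. It is not how the Feature Store is implemented, and the function `retrieve_for_spine` and the sample rows are hypothetical.

```python
def retrieve_for_spine(spine_dates, feature_rows, exclude_columns=()):
    """Left-join feature rows onto spine dates, dropping excluded columns."""
    by_date = {row["DATE"]: row for row in feature_rows}
    columns = [c for c in feature_rows[0] if c != "DATE" and c not in exclude_columns]
    result = []
    for d in spine_dates:
        source = by_date.get(d, {})
        # Dates with no feature row keep None values, like NULLs in a left join
        result.append({"DATE": d, **{c: source.get(c) for c in columns}})
    return result


# Hypothetical sample data, for illustration only
features = [
    {"DATE": "2024-06-01", "TOTAL_MARKETING_SPEND": 1200.0, "TOTAL_FTMP": 310},
    {"DATE": "2024-06-02", "TOTAL_MARKETING_SPEND": 900.0, "TOTAL_FTMP": 275},
]
rows = retrieve_for_spine(
    ["2024-06-02", "2024-06-03"], features, exclude_columns=("TOTAL_FTMP",)
)
print(rows)
```

The second spine date has no matching feature row, which is why Step 3 inserts the future dates into `DAILY_FEATURES` before retrieval.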

In [None]:
# Step 5: Apply the SAME preprocessing transformers used during training
# The numeric_scaler was fitted on training data - we apply the same transformation
inference_processed = numeric_scaler.transform(inference_features)

print("Preprocessing applied (same StandardScaler from training)")
print(f"Ready for inference with {len(inference_processed.columns)} columns")
inference_processed.show(5)
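
Reusing the fitted scaler matters because the mean and standard deviation must come from the training data, not from the inference batch. The toy class below is a stand-in written for illustration only and is not the Snowflake ML `StandardScaler`. It uses the population standard deviation, as scikit-learn's `StandardScaler` does, and the sample values are hypothetical.

```python
from statistics import mean, pstdev


class SimpleStandardScaler:
    """Toy stand-in for a fitted standard scaler (illustration only)."""

    def fit(self, values):
        self.mean_ = mean(values)
        # Population standard deviation; fall back to 1.0 for constant columns
        self.scale_ = pstdev(values) or 1.0
        return self

    def transform(self, values):
        return [(v - self.mean_) / self.scale_ for v in values]


# Statistics are learned once, from training data only
train_spend = [100.0, 200.0, 300.0]
scaler = SimpleStandardScaler().fit(train_spend)

# Inference data is transformed with the stored training statistics
inference_spend = [200.0, 400.0]
scaled = scaler.transform(inference_spend)
print(scaled)
```

Refitting on the inference batch would instead centre those two values around their own mean, so the model would see inputs on a different scale from the one it was trained on.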


In [None]:
# Step 6: Make predictions using the model from the Model Registry
# This completes the Feature Store + Model Registry workflow:
# - Features came from Feature Store (consistent with training)
# - Model came from Model Registry (versioned, governed)

predictions = retrieved_model.run(inference_processed, function_name="predict")

# The prediction is in log scale, so we need to convert back
predictions_with_ftmp = predictions.with_column(
    "PREDICTED_FTMP",
    F.exp(F.col("PREDICTION")) - 1  # Reverse the log transformation
)

# Select final columns
final_predictions = predictions_with_ftmp.select(
    "DATE",
    "TOTAL_MARKETING_SPEND",
    "EJ_JP_MILLIONS",
    "IS_EJ_DRAW_DATE",
    "IS_LOTTO_DRAW_DATE",
    F.round(F.col("PREDICTED_FTMP"), 0).alias("PREDICTED_FTMP")
).sort("DATE")

print("Predictions for Upcoming Campaigns:")
final_predictions.show()

# Save predictions to table for monitoring and analysis
final_predictions.write.mode("overwrite").save_as_table("FTMP_PREDICTIONS")
print("\nPredictions saved to FTMP_PREDICTIONS table")
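
The `exp(...) - 1` step above only inverts the target transform if `LOG_FTMP` was computed as `log(1 + TOTAL_FTMP)`. The `- 1` suggests this, but this part of the notebook does not show it, so treat it as an assumption. Under that assumption, the round trip looks like the following sketch (the function names are illustrative):

```python
import math


def to_log_scale(ftmp: float) -> float:
    """Forward transform assumed for the training target: log(1 + x)."""
    return math.log1p(ftmp)


def from_log_scale(prediction: float) -> float:
    """Inverse transform applied to model output: exp(x) - 1."""
    return math.expm1(prediction)


original = 250.0
round_trip = from_log_scale(to_log_scale(original))
print(round(round_trip, 6))
```

If the target had been created with a plain `log(x)`, the inverse would be `exp(x)` without the subtraction, so it is worth confirming against the feature engineering step.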


---

## Section 9: Orchestration with Tasks

### What We Are Doing
Creating a scheduled task to automate the scoring pipeline.

### Why This Matters
Automation ensures:
- Predictions are generated on a regular schedule
- No manual intervention required
- Consistent, reproducible results

### Snowflake Orchestration Capabilities

| Feature | Description |
|---------|-------------|
| **Tasks** | Scheduled SQL or Python execution |
| **DAGs** | Chain tasks with dependencies |
| **Streams** | Track row-level changes so tasks can run when new data arrives |
| **Dynamic Tables** | Tables defined by a query that Snowflake refreshes automatically |

### UI Guide: Monitoring Tasks

After creating a task, monitor it in Snowsight:
1. Navigate to **Data** > **Databases** > **ML_WORKFLOW_DEMO**
2. Click on **Tasks** under the WORKSHOP schema
3. You will see the task with its schedule and status
4. Click on the task to see:
   - Execution history
   - Run duration
   - Error logs (if any)



In [None]:
-- Create a stored procedure to run the scoring pipeline
-- This procedure can be scheduled as a task

CREATE OR REPLACE PROCEDURE RUN_FTMP_SCORING()
RETURNS STRING
LANGUAGE SQL
AS
$$
BEGIN
    -- In production, this would:
    -- 1. Load new campaign data
    -- 2. Generate features
    -- 3. Apply the model
    -- 4. Save predictions
    
    -- For demo purposes, we just record the run time, status, and prediction count in SCORING_LOG
    CREATE OR REPLACE TABLE SCORING_LOG AS
    SELECT 
        CURRENT_TIMESTAMP() AS last_run,
        'SUCCESS' AS status,
        (SELECT COUNT(*) FROM FTMP_PREDICTIONS) AS predictions_count;
    
    RETURN 'Scoring pipeline completed successfully';
END;
$$;


In [None]:
-- Create a scheduled task to run the scoring pipeline daily
-- Snowflake creates new tasks in a SUSPENDED state by default, so nothing runs until you resume it

CREATE OR REPLACE TASK DAILY_FTMP_SCORING
    WAREHOUSE = ML_COMPUTE_WH
    SCHEDULE = 'USING CRON 0 6 * * * UTC'  -- Run at 6 AM UTC daily
    COMMENT = 'Daily scoring of FTMP predictions using registered model'
AS
    CALL RUN_FTMP_SCORING();

-- Note: To activate the task, run: ALTER TASK DAILY_FTMP_SCORING RESUME;
-- For the workshop, we'll leave it suspended

SELECT 'Task DAILY_FTMP_SCORING created (suspended)' AS status;
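
The cron expression `0 6 * * *` means minute 0 of hour 6 on every day. As a rough illustration of when such a schedule fires, the sketch below computes the next run time for a daily schedule in UTC. It handles only this simple daily case, is not a general cron parser, and the function name `next_daily_run` is hypothetical.

```python
from datetime import datetime, timedelta, timezone


def next_daily_run(now: datetime, hour: int = 6, minute: int = 0) -> datetime:
    """Next firing time strictly after `now` for a daily 'minute hour * * *' schedule in UTC."""
    now = now.astimezone(timezone.utc)
    candidate = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if candidate <= now:
        # Today's slot has already passed, so the next run is tomorrow
        candidate += timedelta(days=1)
    return candidate


after_slot = next_daily_run(datetime(2024, 6, 1, 7, 30, tzinfo=timezone.utc))
before_slot = next_daily_run(datetime(2024, 6, 1, 5, 0, tzinfo=timezone.utc))
print(after_slot)
print(before_slot)
```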


In [None]:
-- View the task details
SHOW TASKS LIKE 'DAILY_FTMP_SCORING';


### UI Guide: Monitoring Tasks

Now that you have created the task, explore it in Snowsight:

**Step 1: Navigate to Tasks**
- Open a new Snowsight tab (Catalog menu is hidden in notebook mode)
- Navigate to **Catalog** > **Databases** > **ML_WORKFLOW_DEMO**
- Expand the **WORKSHOP** schema
- Click on **Tasks**

**Step 2: Find Your Task**
- Look for `DAILY_FTMP_SCORING` in the tasks list
- Click on it to open the task details

**Step 3: View Task Configuration**
- See the schedule (6 AM UTC daily)
- View the SQL command that will be executed
- Check the current state (SUSPENDED)

**Step 4: Task Management**
- To activate: `ALTER TASK DAILY_FTMP_SCORING RESUME;`
- To pause: `ALTER TASK DAILY_FTMP_SCORING SUSPEND;`
- View execution history after the task runs

**Step 5: Monitor Execution**
- Once activated, the task runs on schedule
- Check the **Run History** tab for execution logs
- View duration, status, and any errors

> **Note:** For this workshop, we leave the task suspended. In production, you would activate it to run the scoring pipeline automatically.

---

## Section 10: ML Observability with Model Monitor

### What We Are Doing
Creating a Model Monitor to automatically track prediction quality, detect drift, and monitor model behavior over time.

### Why This Matters
According to the [Snowflake ML Observability documentation](https://docs.snowflake.com/en/developer-guide/snowflake-ml/model-registry/model-observability), a deployed model can lose quality over time for several reasons:
- **Data Drift**: Input data distributions can change over time
- **Concept Drift**: The relationship between features and target can shift
- **Model Degradation**: Prediction accuracy may decrease

### How Model Monitors Work

| Component | Description |
|-----------|-------------|
| **Source Table** | Table containing predictions with ID, timestamp, features |
| **Baseline Table** | Optional reference data for drift comparison |
| **Aggregation Window** | Time granularity for metrics (minimum 1 day) |
| **Refresh Interval** | How often the monitor updates |

### Prerequisites
- Model must be in the Snowflake Model Registry
- Source table with: ID column, TIMESTAMP_NTZ column, prediction column (NUMBER), feature columns
- Supports regression and binary classification models



In [None]:
-- First, prepare the predictions table with required columns for Model Monitor
-- Requirements: ID (unique), TIMESTAMP_NTZ, prediction as NUMBER, features

CREATE OR REPLACE TABLE FTMP_PREDICTIONS_FOR_MONITORING AS
SELECT 
    ROW_NUMBER() OVER (ORDER BY DATE) AS PREDICTION_ID,
    DATE::TIMESTAMP_NTZ AS PREDICTION_TIMESTAMP,
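    -- A bare ::NUMBER means NUMBER(38, 0) and would round away decimals, so set a scale explicitly
    TOTAL_MARKETING_SPEND::NUMBER(38, 2) AS TOTAL_MARKETING_SPEND,
    EJ_JP_MILLIONS::NUMBER(38, 2) AS EJ_JP_MILLIONS,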
    CASE WHEN IS_EJ_DRAW_DATE THEN 1 ELSE 0 END AS IS_EJ_DRAW_DATE,
    CASE WHEN IS_LOTTO_DRAW_DATE THEN 1 ELSE 0 END AS IS_LOTTO_DRAW_DATE,
    PREDICTED_FTMP::NUMBER AS PREDICTED_FTMP
FROM FTMP_PREDICTIONS;

SELECT * FROM FTMP_PREDICTIONS_FOR_MONITORING;


In [None]:
-- Create the Model Monitor
-- Note: Model Monitor requires the model to be in the Model Registry
-- The monitor automatically tracks drift, performance metrics, and data quality

CREATE OR REPLACE MODEL MONITOR FTMP_PREDICTION_MONITOR
  WITH
    MODEL = FTMP_CONVERSION_PREDICTOR
    VERSION = V1
    FUNCTION = PREDICT
    SOURCE = FTMP_PREDICTIONS_FOR_MONITORING
    ID_COLUMNS = (PREDICTION_ID)
    TIMESTAMP_COLUMN = PREDICTION_TIMESTAMP
    PREDICTION_SCORE_COLUMNS = (PREDICTED_FTMP)
    WAREHOUSE = ML_COMPUTE_WH
    AGGREGATION_WINDOW = '1 day'
    REFRESH_INTERVAL = '1 day';

-- Note: If you get an error about privileges, ensure you have CREATE MODEL MONITOR on the schema
-- Also ensure your model is properly registered in the Model Registry


In [None]:
-- View all model monitors in the schema
SHOW MODEL MONITORS;

-- Get details about the specific monitor
-- DESCRIBE MODEL MONITOR FTMP_PREDICTION_MONITOR;


### UI Guide: Viewing the Model Monitor Dashboard

Now that you have created a Model Monitor, explore it in Snowsight:

**Step 1: Navigate to Models**
- In the left sidebar, click on **AI / ML**
- Click on **Models**

**Step 2: Find Your Model**
- Look for `FTMP_CONVERSION_PREDICTOR` in the models list
- Click on it to open the model details page

**Step 3: View the Monitors Tab**
- On the model details page, find the **Monitors** section
- Click on `FTMP_PREDICTION_MONITOR` to view the dashboard

**Step 4: Explore the Dashboard**
The Model Monitor dashboard shows:
- **Prediction Volume**: Number of predictions over time
- **Drift Metrics**: Distribution changes in features and predictions
- **Statistical Metrics**: Counts, null values, and data quality
- **Time Range Selector**: Adjust the date range for analysis

**Step 5: Configure Alerts (Optional)**
- Click **Settings** to customize which metrics are displayed
- Use **Compare model** to compare different model versions
- Set up alerts using Snowflake's notification system

### Querying Monitor Metrics Programmatically

You can also query monitor metrics using SQL:

```sql
-- Get drift metrics
SELECT * FROM TABLE(MODEL_MONITOR_DRIFT_METRIC(
    'FTMP_PREDICTION_MONITOR',
    'PSI',  -- Population Stability Index
    'TOTAL_MARKETING_SPEND',
    'DAY',
    '2024-01-01'::TIMESTAMP_NTZ,
    CURRENT_TIMESTAMP()
));

-- Get statistical metrics for one column (here, the prediction column)
SELECT * FROM TABLE(MODEL_MONITOR_STAT_METRIC(
    'FTMP_PREDICTION_MONITOR',
    'COUNT',
    'PREDICTED_FTMP',
    'DAY',
    '2024-01-01'::TIMESTAMP_NTZ,
    CURRENT_TIMESTAMP()
));
```
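
To build intuition for what a Population Stability Index value means, the sketch below computes PSI between a baseline sample and a current sample over shared bin edges, using the standard formula: the sum over bins of `(current - baseline) * ln(current / baseline)` on bin proportions. The binning, the `eps` floor for empty bins, and the sample data are assumptions made for illustration. Snowflake's own binning may differ, so values will not necessarily match the monitor.

```python
import math


def psi(baseline, current, edges, eps=1e-6):
    """Population Stability Index between two samples over shared bin edges."""

    def proportions(values):
        counts = [0] * (len(edges) - 1)
        for v in values:
            for i in range(len(edges) - 1):
                is_last_bin = i == len(edges) - 2
                # Bins are half-open [lo, hi), except the last, which includes its upper edge
                if edges[i] <= v < edges[i + 1] or (is_last_bin and v == edges[-1]):
                    counts[i] += 1
                    break
        total = sum(counts)
        if total == 0:
            raise ValueError("no values fall inside the bin edges")
        # Floor empty bins at eps so the logarithm stays defined
        return [max(c / total, eps) for c in counts]

    p, q = proportions(baseline), proportions(current)
    return sum((qi - pi) * math.log(qi / pi) for pi, qi in zip(p, q))


# Hypothetical feature values, for illustration only
baseline = [1, 2, 3, 4, 5, 6, 7, 8]
shifted = [3, 4, 5, 6, 7, 8, 7, 8]
edges = [0, 4.5, 9]

unchanged_psi = psi(baseline, baseline, edges)
shifted_psi = psi(baseline, shifted, edges)
print(unchanged_psi, round(shifted_psi, 4))
```

A commonly cited rule of thumb treats PSI below 0.1 as stable and above 0.25 as a significant shift, but suitable thresholds depend on the feature and the use case.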



---

## Summary

Congratulations! You have completed the end-to-end ML workflow in Snowflake.

### What You Learned

| Section | Capability | Key Takeaway |
|---------|-----------|--------------|
| 1. Setup | Snowpark Session | Python runs natively in Snowflake |
| 2. Data Exploration | Snowpark DataFrames | Pandas-like syntax with Snowflake scale |
| 3. Feature Engineering | Feature Store | Reusable, governed features |
| 4. Preprocessing | ML Transformers | StandardScaler, OrdinalEncoder, Pipeline |
| 5. Experiment Tracking | MLflow Integration | Track experiments without external tools |
| 6. Model Training | Snowpark ML | XGBoost, LightGBM, and more |
| 7. Model Registry | Version Control | Governed model deployment |
| 8. Batch Scoring | Model Inference | Feature Store + Registry for production |
| 9. Orchestration | Tasks | Automated, scheduled pipelines |
| 10. Monitoring | Prediction Quality | Detect drift and degradation |

### Objects Created in This Workshop

| Object | Type | Purpose |
|--------|------|---------|
| `ML_WORKFLOW_DEMO.WORKSHOP` | Schema | Workshop namespace |
| `TRAINING_BASE` | Table | Joined training data |
| `DAILY_FEATURES` | Table | Engineered features |
| `FTMP_PREDICTIONS` | Table | Model predictions |
| `DAILY_DATE` | Entity | Feature Store entity |
| `DAILY_CONVERSION_FEATURES` | Feature View | Registered features |
| `FTMP_CONVERSION_PREDICTOR` | Model | Registered XGBoost model |
| `DAILY_FTMP_SCORING` | Task | Scheduled scoring job |
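| `FTMP_PREDICTIONS_FOR_MONITORING` | Table | Predictions prepared for the Model Monitor |
| `FTMP_PREDICTION_MONITOR` | Model Monitor | Drift and prediction tracking |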

### Next Steps

1. **Explore the UI**: Navigate to AI/ML in Snowsight to see your experiments, models, and features
2. **Try Different Models**: Experiment with LightGBM, RandomForest, or other algorithms
3. **Add More Features**: Enhance the feature engineering with additional transformations
4. **Production Deployment**: Activate the task and set up alerts for monitoring
5. **Advanced Topics**: Explore CI/CD integration, A/B testing, and model explainability

### Additional Resources

- [Snowflake ML Documentation](https://docs.snowflake.com/en/guides-overview-ml)
- [Snowpark Python API Reference](https://docs.snowflake.com/en/developer-guide/snowpark/python/index)
- [Feature Store Guide](https://docs.snowflake.com/en/developer-guide/snowflake-ml/feature-store)
- [Model Registry Guide](https://docs.snowflake.com/en/developer-guide/snowflake-ml/model-registry)

---

**Thank you for participating in this workshop!**


