# Train Cold Start CLV Model

This notebook trains an XGBoost model to predict 12-month customer lifetime value for new customers.

**Model Purpose**: Predict CLV for customers in their first 30 days, enabling early segmentation and personalized onboarding.

**Steps**:
1. Load and preprocess data
2. Engineer features with business rationale
3. Train XGBoost with hyperparameter tuning
4. Evaluate model performance
5. Deploy to Snowflake Model Registry
6. Create Dynamic Table for continuous inference

In [None]:
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
import xgboost as xgb
from snowflake.snowpark.context import get_active_session
from snowflake.ml.registry import Registry
from snowflake.ml.model import task
from snowflake.ml.model.target_platform import TargetPlatform
from snowflake.ml.modeling import tune
from snowflake.ml.modeling.tune.search import BayesOpt
from snowflake.ml.data.data_connector import DataConnector
import matplotlib.pyplot as plt
import seaborn as sns

import warnings
warnings.filterwarnings('ignore')

# Get active Snowflake session
session = get_active_session()



## Configuration

Set your database and schema here:

In [None]:
# Database and schema configuration
DATABASE = 'ML_DEMO'
SCHEMA = 'PUBLIC'

# Set context
session.use_database(DATABASE)
session.use_schema(SCHEMA)

print(f"Using database: {DATABASE}")
print(f"Using schema: {SCHEMA}")
print(f"Current warehouse: {session.get_current_warehouse()}")
print(f"Current role: {session.get_current_role()}")

## Load Data from Snowflake

In [None]:
# Read data from Snowflake table
table_name = 'COLDSTART_CUSTOMERS'
df = session.table(table_name).to_pandas()

# Convert date columns
df['SIGNUP_DATE'] = pd.to_datetime(df['SIGNUP_DATE'])
df['FIRST_PURCHASE_DATE'] = pd.to_datetime(df['FIRST_PURCHASE_DATE'])

print(f"Loaded {len(df)} customer records from {DATABASE}.{SCHEMA}.{table_name}")
print(f"Features: {df.columns.tolist()}")
df.head()

## Feature Engineering

### Binary Indicator: Made First Purchase
**Rationale**: Often the strongest signal for cold-start customers. A purchase within the first 30 days indicates commitment and intent.

In [None]:
df['MADE_FIRST_PURCHASE'] = (df['FIRST_PURCHASE_AMOUNT'] > 0).astype(int)

print(f"Conversion rate: {df['MADE_FIRST_PURCHASE'].mean():.2%}")

### Acquisition Quality Score
**Rationale**: Acquisition channels differ in the quality of customers they bring. Organic and referral channels typically outperform paid channels because of higher intent. The scores below are hand-assigned ordinal ranks.

In [None]:
channel_quality_map = {
    'referral': 5,
    'direct': 4,
    'organic_search': 4,
    'email': 3,
    'affiliate': 2,
    'paid_search': 2,
    'social_media': 1
}

df['CHANNEL_QUALITY_SCORE'] = df['ACQUISITION_CHANNEL'].map(channel_quality_map)
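
`Series.map` returns `NaN` for any value missing from the dictionary, so a channel absent from `channel_quality_map` receives no score (it is later filled with 0 along with the other numerical features). A minimal sketch for spotting such channels follows; the data is a toy example and `'podcast'` is a hypothetical channel name.

```python
import pandas as pd

# Subset of the mapping above; 'podcast' is a hypothetical channel not in the map
channel_quality_map = {'referral': 5, 'direct': 4, 'social_media': 1}
channels = pd.Series(['referral', 'podcast', 'social_media', 'podcast'])

# Unmapped values become NaN
scores = channels.map(channel_quality_map)

# List the channels that did not receive a score
unmapped = sorted(channels[scores.isna()].unique())
print(f"Unmapped channels: {unmapped}")
```

Running a check like this before training makes it clear whether a score of 0 means "lowest quality" or simply "channel not in the map".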

### Early Engagement Composite Score
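**Rationale**: Aggregating multiple engagement signals (site visits, email opens and clicks, item views, cart adds) into a single weighted metric captures overall interest level. The weights are heuristic: higher-intent actions such as email clicks and cart adds count more than passive ones.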

In [None]:
df['ENGAGEMENT_SCORE'] = (
    df['WEBSITE_VISITS_30D'] * 1.0 +
    df['EMAIL_OPENS_30D'] * 2.0 +
    df['EMAIL_CLICKS_30D'] * 5.0 +
    df['ITEMS_VIEWED_30D'] * 1.5 +
    df['CART_ADDS_30D'] * 3.0
)

print(f"Average engagement score: {df['ENGAGEMENT_SCORE'].mean():.2f}")
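
To make the weighting concrete, here is a small worked example that applies the same weights to one customer. The customer values are made up for illustration, and `engagement_score` is a helper name introduced here, not part of the notebook.

```python
# Weights taken from the ENGAGEMENT_SCORE formula above
WEIGHTS = {
    'WEBSITE_VISITS_30D': 1.0,
    'EMAIL_OPENS_30D': 2.0,
    'EMAIL_CLICKS_30D': 5.0,
    'ITEMS_VIEWED_30D': 1.5,
    'CART_ADDS_30D': 3.0,
}

def engagement_score(customer: dict) -> float:
    """Weighted sum of the five engagement counts."""
    return sum(weight * customer[col] for col, weight in WEIGHTS.items())

# Hypothetical customer activity in the first 30 days
customer = {
    'WEBSITE_VISITS_30D': 4,
    'EMAIL_OPENS_30D': 2,
    'EMAIL_CLICKS_30D': 1,
    'ITEMS_VIEWED_30D': 6,
    'CART_ADDS_30D': 1,
}

score = engagement_score(customer)
print(score)  # 4*1.0 + 2*2.0 + 1*5.0 + 6*1.5 + 1*3.0 = 25.0
```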

### Email Engagement Rate
**Rationale**: The click-to-open rate (clicks divided by opens) separates genuine interest from passive opens. Customers with no opens get a rate of 0.

In [None]:
# Assign the result back; in-place fillna on a column selection is deprecated in pandas 2.x
df['EMAIL_ENGAGEMENT_RATE'] = (
    df['EMAIL_CLICKS_30D'] / df['EMAIL_OPENS_30D'].replace(0, np.nan)
).fillna(0)

### Cart-to-Purchase Conversion
**Rationale**: High cart adds with no purchase may indicate friction or price sensitivity.

In [None]:
# Assign the result back; in-place fillna on a column selection is deprecated in pandas 2.x
df['CART_CONVERSION_RATE'] = (
    df['MADE_FIRST_PURCHASE'] / df['CART_ADDS_30D'].replace(0, np.nan)
).fillna(0)
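
Both rate features follow the same pattern: replace zero denominators with `NaN`, divide, then fill the undefined results with 0. A sketch of that pattern as a reusable helper, on toy data (`safe_ratio` is a name introduced here for illustration):

```python
import numpy as np
import pandas as pd

def safe_ratio(numerator: pd.Series, denominator: pd.Series) -> pd.Series:
    """Element-wise division that returns 0 wherever the result is undefined."""
    return (numerator / denominator.replace(0, np.nan)).fillna(0)

# Toy data: the second customer opened no emails
toy = pd.DataFrame({
    'EMAIL_CLICKS_30D': [2, 0, 3],
    'EMAIL_OPENS_30D': [4, 0, 10],
})

rate = safe_ratio(toy['EMAIL_CLICKS_30D'], toy['EMAIL_OPENS_30D'])
print(rate.tolist())  # [0.5, 0.0, 0.3]
```

Filling with 0 treats "no opens" the same as "opens but no clicks"; whether that is acceptable depends on how the model should read missing engagement.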

## Prepare Features and Target

In [None]:
categorical_features = [
    'ACQUISITION_CHANNEL',
    'ACQUISITION_SOURCE', 
    'DEVICE_TYPE',
    'AGE_GROUP',
    'REGION',
    'AREA_TYPE',
    'FIRST_PURCHASE_CATEGORY'
]

numerical_features = [
    'CHANNEL_QUALITY_SCORE',
    'DAYS_TO_FIRST_PURCHASE',
    'FIRST_PURCHASE_AMOUNT',
    'WEBSITE_VISITS_30D',
    'EMAIL_OPENS_30D',
    'EMAIL_CLICKS_30D',
    'ITEMS_VIEWED_30D',
    'CART_ADDS_30D',
    'MADE_FIRST_PURCHASE',
    'ENGAGEMENT_SCORE',
    'EMAIL_ENGAGEMENT_RATE',
    'CART_CONVERSION_RATE'
]

df[categorical_features] = df[categorical_features].fillna('unknown')
df[numerical_features] = df[numerical_features].fillna(0)

X = df[categorical_features + numerical_features]
y = df['ACTUAL_12M_LTV']

print(f"Feature matrix shape: {X.shape}")
print(f"Target variable shape: {y.shape}")

## Train-Validation-Test Split

**Temporal split**: Rows are ordered by signup date and split 70/15/15. This simulates deployment, where the model predicts for customers who sign up after the training period.

In [None]:
df_sorted = df.sort_values('SIGNUP_DATE').reset_index(drop=True)

train_size = int(0.7 * len(df_sorted))
val_size = int(0.15 * len(df_sorted))

train_df = df_sorted.iloc[:train_size]
val_df = df_sorted.iloc[train_size:train_size + val_size]
test_df = df_sorted.iloc[train_size + val_size:]

X_train = train_df[categorical_features + numerical_features]
y_train = train_df['ACTUAL_12M_LTV']

X_val = val_df[categorical_features + numerical_features]
y_val = val_df['ACTUAL_12M_LTV']

X_test = test_df[categorical_features + numerical_features]
y_test = test_df['ACTUAL_12M_LTV']

print(f"Train set: {len(X_train)} samples")
print(f"Validation set: {len(X_val)} samples")
print(f"Test set: {len(X_test)} samples")
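
The split logic can be wrapped in a function and checked on toy data. This is a sketch under the same 70/15/15 assumption; `temporal_split` is a helper name introduced here, and the dates are synthetic.

```python
import pandas as pd

def temporal_split(df, date_col, train_frac=0.7, val_frac=0.15):
    """Split rows chronologically into train, validation, and test frames."""
    ordered = df.sort_values(date_col).reset_index(drop=True)
    train_end = int(train_frac * len(ordered))
    val_end = train_end + int(val_frac * len(ordered))
    return ordered.iloc[:train_end], ordered.iloc[train_end:val_end], ordered.iloc[val_end:]

# Synthetic signups, shuffled so the function has to do the ordering
toy = pd.DataFrame(
    {'SIGNUP_DATE': pd.date_range('2024-01-01', periods=100, freq='D')}
).sample(frac=1, random_state=0)

train, val, test = temporal_split(toy, 'SIGNUP_DATE')
print(len(train), len(val), len(test))  # 70 15 15

# No later signup should appear in an earlier split
no_leakage = (
    train['SIGNUP_DATE'].max() < val['SIGNUP_DATE'].min()
    and val['SIGNUP_DATE'].max() < test['SIGNUP_DATE'].min()
)
print(no_leakage)
```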

## Build Preprocessing Pipeline

**Why preprocessing matters**:
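- StandardScaler: Standardizes numerical features to zero mean and unit variance. Tree-based models such as XGBoost do not depend on feature scale, so this step is optional here but does no harm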
- OneHotEncoder: Converts categorical variables to numerical format XGBoost can process

In [None]:
preprocessor = ColumnTransformer(
    transformers=[
        ('num', StandardScaler(), numerical_features),
        ('cat', OneHotEncoder(handle_unknown='ignore', sparse_output=False), categorical_features)
    ]
)

X_train_processed = preprocessor.fit_transform(X_train)
X_val_processed = preprocessor.transform(X_val)
X_test_processed = preprocessor.transform(X_test)

print(f"Processed feature dimensionality: {X_train_processed.shape[1]}")
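
The cell above fits the preprocessor on `X_train` only and reuses the fitted statistics for the validation and test sets, which keeps information from later periods out of training. The same idea for the scaling step, sketched in plain NumPy on toy values (this is an illustration, not a replacement for `StandardScaler`):

```python
import numpy as np

# Toy single-feature data
train = np.array([[10.0], [20.0], [30.0]])
val = np.array([[40.0]])

# Statistics come from the training split only
mean = train.mean(axis=0)
std = train.std(axis=0)  # population std (ddof=0), which StandardScaler also uses

train_scaled = (train - mean) / std
val_scaled = (val - mean) / std  # reuses the training mean and std

print(train_scaled.ravel())
print(val_scaled.ravel())
```

The validation value lands well outside the training range after scaling, which is expected: it is measured against the training distribution, not its own.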

## Hyperparameter Tuning with Snowflake ML HPO

**Overfitting prevention strategies**:
- `max_depth`: Limits tree complexity
- `min_child_weight`: Requires a minimum sum of instance weights (Hessian) in each child; with squared-error loss this equals a minimum number of samples
- `subsample`: Row sampling per tree
- `colsample_bytree`: Column sampling per tree
- `reg_alpha`, `reg_lambda`: L1 and L2 regularization
- Temporal validation: Uses separate validation set to evaluate hyperparameters

In [None]:
# Prepare data for HPO using DataConnector
train_connector = DataConnector.from_dataframe(
    session.create_dataframe(
        pd.concat([X_train, y_train.rename('ACTUAL_12M_LTV')], axis=1)
    )
)

val_connector = DataConnector.from_dataframe(
    session.create_dataframe(
        pd.concat([X_val, y_val.rename('ACTUAL_12M_LTV')], axis=1)
    )
)

# Define search space with Snowflake ML tune functions
search_space = {
    "n_estimators": tune.uniform(100, 300),
    "max_depth": tune.uniform(3, 8),
    "learning_rate": tune.loguniform(0.01, 0.3),
    "min_child_weight": tune.uniform(3, 7),
    "subsample": tune.uniform(0.7, 0.9),
    "colsample_bytree": tune.uniform(0.7, 0.9),
    "reg_alpha": tune.uniform(0, 0.5),
    "reg_lambda": tune.uniform(1, 2)
}

# Store preprocessor and feature names globally for training function
global_preprocessor = preprocessor
global_categorical_features = categorical_features
global_numerical_features = numerical_features

# Define training function for HPO
def train_func():
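    # Import everything the function uses so it is self-contained on remote workers
    from snowflake.ml.modeling.tune import get_tuner_context
    import numpy as np
    import xgboost as xgb
    from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score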
    
    # Get HPO context
    context = get_tuner_context()
    params = context.get_hyper_params()
    dataset_map = context.get_dataset_map()
    
    # Load training and validation data
    train_df = dataset_map['train'].to_pandas()
    val_df = dataset_map['val'].to_pandas()
    
    # Separate features and target
    X_train_hpo = train_df[global_categorical_features + global_numerical_features]
    y_train_hpo = train_df['ACTUAL_12M_LTV']
    
    X_val_hpo = val_df[global_categorical_features + global_numerical_features]
    y_val_hpo = val_df['ACTUAL_12M_LTV']
    
    # Preprocess features
    X_train_processed = global_preprocessor.transform(X_train_hpo)
    X_val_processed = global_preprocessor.transform(X_val_hpo)
    
    # Train XGBoost with current hyperparameters
    model = xgb.XGBRegressor(
        objective='reg:squarederror',
        random_state=42,
        n_jobs=-1,
        n_estimators=int(params["n_estimators"]),
        max_depth=int(params["max_depth"]),
        learning_rate=params["learning_rate"],
        min_child_weight=int(params["min_child_weight"]),
        subsample=params["subsample"],
        colsample_bytree=params["colsample_bytree"],
        reg_alpha=params["reg_alpha"],
        reg_lambda=params["reg_lambda"]
    )
    
    model.fit(X_train_processed, y_train_hpo)
    
    # Evaluate on validation set
    val_pred = model.predict(X_val_processed)
    rmse = np.sqrt(mean_squared_error(y_val_hpo, val_pred))
    mae = mean_absolute_error(y_val_hpo, val_pred)
    r2 = r2_score(y_val_hpo, val_pred)
    
    # Report metrics back to HPO
    context.report(
        metrics={"rmse": rmse, "mae": mae, "r2": r2},
        model=model
    )

# Configure HPO with Bayesian optimization
tuner_config = tune.TunerConfig(
    metric="rmse",
    mode="min",
    search_alg=BayesOpt(
        utility_kwargs={"kind": "ucb", "kappa": 2.5, "xi": 0.0}
    ),
    num_trials=20,
    max_concurrent_trials=4  # Parallel trials
)

# Create dataset map for HPO
dataset_map = {
    "train": train_connector,
    "val": val_connector
}

print("Starting distributed hyperparameter optimization...")
print(f"  Search space: {len(search_space)} hyperparameters")
print(f"  Total trials: {tuner_config.num_trials}")
print(f"  Concurrent trials: {tuner_config.max_concurrent_trials}")
print(f"  Search algorithm: Bayesian Optimization")

# Run HPO
tuner = tune.Tuner(train_func, search_space, tuner_config)
tuner_results = tuner.run(dataset_map=dataset_map)

print("\n✓ Hyperparameter optimization completed!")

# Extract best hyperparameters - they are stored with "config/" prefix
best_result = tuner_results.best_result
print(f"\nBest hyperparameters:")
for col in best_result.index:
    if col.startswith('config/'):
        param_name = col.replace('config/', '')
        value = best_result[col]
        print(f"  {param_name}: {value:.4f}" if isinstance(value, float) else f"  {param_name}: {value}")

print(f"\nBest validation metrics:")
print(f"  RMSE: ${best_result['rmse']:.2f}")
print(f"  MAE: ${best_result['mae']:.2f}")
print(f"  R²: {best_result['r2']:.4f}")

# Train final model with best hyperparameters
best_model = xgb.XGBRegressor(
    objective='reg:squarederror',
    random_state=42,
    n_jobs=-1,
    n_estimators=int(best_result['config/n_estimators']),
    max_depth=int(best_result['config/max_depth']),
    learning_rate=best_result['config/learning_rate'],
    min_child_weight=int(best_result['config/min_child_weight']),
    subsample=best_result['config/subsample'],
    colsample_bytree=best_result['config/colsample_bytree'],
    reg_alpha=best_result['config/reg_alpha'],
    reg_lambda=best_result['config/reg_lambda']
)

best_model.fit(X_train_processed, y_train)
print("\n✓ Final model trained with best hyperparameters")
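
The search space draws every parameter from a continuous range, and integer parameters are recovered with `int(...)`, which truncates toward zero. One consequence is that a range such as `uniform(3, 8)` yields depths 3 through 7, and 8 is effectively never selected. The sketch below imitates this with Python's `random.uniform` as a stand-in for the tuner's sampler; that substitution is an assumption, and the tuner's actual proposal logic is not reproduced here.

```python
import random
from collections import Counter

rng = random.Random(42)

# Stand-in for sampling max_depth from a continuous range [3, 8]
samples = [rng.uniform(3, 8) for _ in range(10_000)]

# int() truncates, so every draw in [7, 8) maps to 7
truncated = Counter(int(s) for s in samples)
print(sorted(truncated))  # [3, 4, 5, 6, 7]
```

If the upper bound should be reachable, widening the range by one before truncating is one simple option.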

## Model Evaluation

In [None]:
y_train_pred = best_model.predict(X_train_processed)
y_val_pred = best_model.predict(X_val_processed)
y_test_pred = best_model.predict(X_test_processed)

def evaluate_model(y_true, y_pred, dataset_name):
    rmse = np.sqrt(mean_squared_error(y_true, y_pred))
    mae = mean_absolute_error(y_true, y_pred)
    r2 = r2_score(y_true, y_pred)
    
    print(f"\n{dataset_name} Metrics:")
    print(f"  RMSE: ${rmse:.2f}")
    print(f"  MAE: ${mae:.2f}")
    print(f"  R²: {r2:.4f}")
    
    return {'rmse': rmse, 'mae': mae, 'r2': r2}

train_metrics = evaluate_model(y_train, y_train_pred, "Train")
val_metrics = evaluate_model(y_val, y_val_pred, "Validation")
test_metrics = evaluate_model(y_test, y_test_pred, "Test")

if train_metrics['r2'] - test_metrics['r2'] > 0.1:
    print("\n⚠️ Warning: Significant gap between train and test R² suggests potential overfitting")
else:
    print("\n✓ Model shows good generalization")
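
For reference, the three metrics reported above can be computed by hand. This small NumPy example uses made-up LTV values to show what each number measures.

```python
import numpy as np

# Toy actual and predicted 12-month LTV values, in dollars
y_true = np.array([100.0, 200.0, 300.0])
y_pred = np.array([110.0, 190.0, 330.0])

errors = y_true - y_pred

# RMSE penalizes large errors more heavily than MAE
rmse = np.sqrt(np.mean(errors ** 2))
mae = np.mean(np.abs(errors))

# R² compares the model's squared error with that of always predicting the mean
r2 = 1 - np.sum(errors ** 2) / np.sum((y_true - y_true.mean()) ** 2)

print(f"RMSE: {rmse:.2f}, MAE: {mae:.2f}, R2: {r2:.3f}")
```

RMSE and MAE are in the target's units (dollars), while R² is unitless, which is why the train/test gap check above uses R².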

## Feature Importance Analysis

In [None]:
feature_names = (
    numerical_features + 
    list(preprocessor.named_transformers_['cat'].get_feature_names_out(categorical_features))
)

feature_importance_df = pd.DataFrame({
    'feature': feature_names,
    'importance': best_model.feature_importances_
}).sort_values('importance', ascending=False)

print("\nTop 15 Most Important Features:")
print(feature_importance_df.head(15))

plt.figure(figsize=(10, 6))
sns.barplot(data=feature_importance_df.head(15), x='importance', y='feature')
plt.title('Top 15 Feature Importances - Cold Start CLV Model')
plt.xlabel('Importance')
plt.tight_layout()
plt.savefig('coldstart_feature_importance.png')
plt.show()
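
One-hot encoding spreads each categorical feature's importance across several columns, which can make categorical features look weaker than they are in a per-column ranking. A sketch of summing importances back to the source feature, using hypothetical importance values and only two categorical features for brevity:

```python
import pandas as pd

categorical_features = ['REGION', 'DEVICE_TYPE']

# Hypothetical per-column importances; one-hot columns are named FEATURE_category
importance = pd.Series({
    'ENGAGEMENT_SCORE': 0.40,
    'REGION_north': 0.10,
    'REGION_south': 0.15,
    'DEVICE_TYPE_mobile': 0.20,
    'DEVICE_TYPE_desktop': 0.15,
})

def source_feature(column: str) -> str:
    """Map a one-hot column back to its categorical feature; leave others unchanged."""
    # Check longer names first in case one feature name is a prefix of another
    for feature in sorted(categorical_features, key=len, reverse=True):
        if column.startswith(feature + '_'):
            return feature
    return column

# groupby with a function applies it to each index label
grouped = importance.groupby(source_feature).sum().sort_values(ascending=False)
print(grouped)
```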

## Create Full Pipeline for Deployment

In [None]:
full_pipeline = Pipeline([
    ('preprocessor', preprocessor),
    ('model', best_model)
])

full_pipeline.fit(X_train, y_train)

pipeline_test_pred = full_pipeline.predict(X_test)
pipeline_test_rmse = np.sqrt(mean_squared_error(y_test, pipeline_test_pred))
print(f"Pipeline test RMSE: ${pipeline_test_rmse:.2f}")

## Deploy to Snowflake Model Registry

**Deployment Strategy**:
- Use Snowflake Model Registry for versioning and management
- Deploy to both WAREHOUSE (SQL inference) and SPCS (Python inference)
- Register with sample input for schema inference
- Include metrics for tracking

**Target Platforms**:
- **WAREHOUSE**: Enables SQL-based inference (e.g., `SELECT COLDSTART_CLV_MODEL!PREDICT(...)`)
- **SNOWPARK_CONTAINER_SERVICES**: Enables Python inference in containers



In [None]:
# Initialize Model Registry with active session
registry = Registry(session=session)

# Prepare sample input for schema inference
sample_input = X_train.head(100)

# Log model to registry with both target platforms
model_version = registry.log_model(
    model=full_pipeline,
    model_name="COLDSTART_CLV_MODEL",
    version_name="V1",
    comment="Cold start CLV model with HPO - supports both Warehouse and SPCS inference",
    metrics={
        "test_rmse": float(test_metrics['rmse']),
        "test_mae": float(test_metrics['mae']),
        "test_r2": float(test_metrics['r2']),
        "train_r2": float(train_metrics['r2']),
        "hpo_best_rmse": float(tuner_results.best_result['rmse'])
    },
    sample_input_data=sample_input,
    task=task.Task.TABULAR_REGRESSION,
    target_platforms=[
        TargetPlatform.WAREHOUSE,                    # SQL inference
        TargetPlatform.SNOWPARK_CONTAINER_SERVICES   # Python inference in containers
    ]
)

print(f"\n✓ Model registered successfully!")
print(f"  Database: {DATABASE}")
print(f"  Schema: {SCHEMA}")
print(f"  Model: COLDSTART_CLV_MODEL")
print(f"  Version: V1")
print(f"  Target Platforms:")
print(f"    - WAREHOUSE (SQL inference)")
print(f"    - SNOWPARK_CONTAINER_SERVICES (Python inference)")
print(f"  HPO: Bayesian optimization with {tuner_config.num_trials} trials")

## Create Staging Table and Dynamic Table for Continuous Inference

**Purpose**: Enable near-real-time CLV predictions for new customer signups (the dynamic table below uses a 5-minute target lag)

In [None]:
create_staging_table = """
CREATE OR REPLACE TABLE COLDSTART_CUSTOMERS_STAGING (
    customer_id INT,
    signup_date TIMESTAMP,
    acquisition_channel VARCHAR,
    acquisition_source VARCHAR,
    device_type VARCHAR,
    age_group VARCHAR,
    region VARCHAR,
    area_type VARCHAR,
    days_to_first_purchase FLOAT,
    first_purchase_amount FLOAT,
    first_purchase_category VARCHAR,
    website_visits_30d INT,
    email_opens_30d INT,
    email_clicks_30d INT,
    items_viewed_30d INT,
    cart_adds_30d INT
)
"""

session.sql(create_staging_table).collect()
print("✓ Staging table created: COLDSTART_CUSTOMERS_STAGING")

In [None]:
create_dynamic_table = """
CREATE OR REPLACE DYNAMIC TABLE COLDSTART_CLV_PREDICTIONS
    TARGET_LAG = '5 minutes'
    WAREHOUSE = COMPUTE_WH
    REFRESH_MODE = AUTO
AS
SELECT 
    customer_id,
    signup_date,
    acquisition_channel,
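    -- The pipeline expects all 19 inputs in the order of sample_input_data:
    -- 7 categorical columns, then 12 numerical columns including the engineered features
    COLDSTART_CLV_MODEL!PREDICT(
        COALESCE(acquisition_channel, 'unknown'),
        COALESCE(acquisition_source, 'unknown'),
        COALESCE(device_type, 'unknown'),
        COALESCE(age_group, 'unknown'),
        COALESCE(region, 'unknown'),
        COALESCE(area_type, 'unknown'),
        COALESCE(first_purchase_category, 'unknown'),
        DECODE(acquisition_channel, 'referral', 5, 'direct', 4, 'organic_search', 4,
               'email', 3, 'affiliate', 2, 'paid_search', 2, 'social_media', 1, 0),
        COALESCE(days_to_first_purchase, 0),
        COALESCE(first_purchase_amount, 0),
        COALESCE(website_visits_30d, 0),
        COALESCE(email_opens_30d, 0),
        COALESCE(email_clicks_30d, 0),
        COALESCE(items_viewed_30d, 0),
        COALESCE(cart_adds_30d, 0),
        IFF(first_purchase_amount > 0, 1, 0),
        COALESCE(website_visits_30d * 1.0 + email_opens_30d * 2.0 + email_clicks_30d * 5.0
                 + items_viewed_30d * 1.5 + cart_adds_30d * 3.0, 0),
        COALESCE(DIV0(email_clicks_30d, email_opens_30d), 0),
        COALESCE(DIV0(IFF(first_purchase_amount > 0, 1, 0), cart_adds_30d), 0)
    ) AS predicted_12m_ltv,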
    CURRENT_TIMESTAMP() AS prediction_timestamp
FROM COLDSTART_CUSTOMERS_STAGING
"""

try:
    session.sql(create_dynamic_table).collect()
    print("✓ Dynamic table created: COLDSTART_CLV_PREDICTIONS")
    print("  - Target lag: 5 minutes (predictions trail the staging table by about 5 minutes at most)")
    print("  - Automatically scores new customers as they sign up")
except Exception as e:
    print(f"Note: Dynamic table creation may require adjusting warehouse name or schema permissions")
    print(f"Error: {str(e)}")

## Test Inference with Sample Data

In [None]:
test_sample = X_test.head(5)

test_predictions = full_pipeline.predict(test_sample)

print("\nSample Predictions:")
for i, pred in enumerate(test_predictions):
    actual = y_test.iloc[i]
    print(f"  Customer {i+1}: Predicted ${pred:.2f}, Actual ${actual:.2f}, Diff ${abs(pred-actual):.2f}")

## Summary

This notebook accomplished:

1. ✓ **Feature Engineering**: Created meaningful features from cold start signals
2. ✓ **Model Training**: XGBoost tuned with Bayesian hyperparameter optimization (20 trials)
3. ✓ **Overfitting Prevention**: Regularization, row and column subsampling, and a temporal validation split
4. ✓ **Model Evaluation**: RMSE, MAE, R² metrics across train/val/test
5. ✓ **Deployment**: Registered to Snowflake Model Registry
6. ✓ **Continuous Inference**: Dynamic table for near-real-time predictions (5-minute target lag)

**Next Steps**:
- Monitor model performance on production data
- Set up retraining pipeline for model drift
- A/B test predictions in marketing campaigns