# Mister Car Wash - ML Model Training

This notebook trains three machine learning models for the Mister Car Wash Intelligence Agent:
1. **CHURN_RISK_PREDICTOR**: Predicts if a member will cancel.
2. **EQUIPMENT_FAILURE_PREDICTOR**: Predicts if equipment needs maintenance.
3. **UPSELL_PROPENSITY_SCORER**: Predicts if a member is likely to upgrade.

**Single Source of Truth**: Each model trains on its own `V_*_FEATURES` view in the `ANALYTICS` schema (`V_CHURN_RISK_FEATURES`, `V_MAINTENANCE_RISK_FEATURES`, `V_UPSELL_FEATURES`).

In [None]:
# Import required libraries
# Snowpark session, Snowflake ML estimators/preprocessing/pipeline, the model
# registry and the accuracy metric used by the cells that follow.
# NOTE(review): OneHotEncoder and pandas are not referenced in the visible
# cells — confirm whether they are still needed.
from snowflake.snowpark import Session
from snowflake.ml.modeling.ensemble import RandomForestClassifier
from snowflake.ml.modeling.linear_model import LogisticRegression
from snowflake.ml.modeling.preprocessing import OneHotEncoder, StandardScaler
from snowflake.ml.modeling.pipeline import Pipeline
from snowflake.ml.registry import Registry
from snowflake.ml.modeling.metrics import accuracy_score
import pandas as pd
import warnings

# NOTE(review): this suppresses every Python warning for the rest of the
# session, including deprecation notices from the libraries imported above.
warnings.filterwarnings('ignore')

print("✅ Libraries imported successfully")

In [None]:
# Obtain a Snowpark session (reuses an existing one, otherwise creates it)
session = Session.builder.getOrCreate()

# Point the session at the project's database, schema and warehouse
session.use_database("MISTER_CAR_WASH_INTELLIGENCE")
session.use_schema("ANALYTICS")
session.use_warehouse("MISTER_CAR_WASH_WH")

# Read the context back from the session so the printout reflects what is in effect
current_database = session.get_current_database()
current_schema = session.get_current_schema()
current_warehouse = session.get_current_warehouse()

print("✅ Session configured")
print(f"Database: {current_database}")
print(f"Schema: {current_schema}")
print(f"Warehouse: {current_warehouse}")

In [None]:
# Initialize Model Registry
# Location (database + schema) the registry is bound to
registry_location = {
    "database_name": "MISTER_CAR_WASH_INTELLIGENCE",
    "schema_name": "ANALYTICS",
}
registry = Registry(session=session, **registry_location)

print("✅ Model Registry initialized")

## Model 1: Churn Risk Predictor

**Objective**: Predict likelihood of member churn (cancellation).
**Features**: LTV_SCORE, TENURE_DAYS, DAYS_SINCE_LAST_WASH, TOTAL_WASHES

In [None]:
# Load churn feature data from the fully qualified feature view
churn_df = session.table("MISTER_CAR_WASH_INTELLIGENCE.ANALYTICS.V_CHURN_RISK_FEATURES")

# Report the row count, then preview the first 5 rows
churn_row_count = churn_df.count()
print(f"✅ Loaded {churn_row_count} records for churn prediction")
churn_df.show(5)

In [None]:
# Split data 80/20 with a fixed seed
churn_partitions = churn_df.random_split([0.8, 0.2], seed=42)

# Remove MEMBER_ID and STATUS from both partitions before training
train_churn, test_churn = [
    partition.drop("MEMBER_ID", "STATUS") for partition in churn_partitions
]

churn_train_rows = train_churn.count()
print(f"Training set: {churn_train_rows} records")
churn_test_rows = test_churn.count()
print(f"Test set: {churn_test_rows} records")

In [None]:
# Create pipeline for Churn Prediction
# Step 1 standardizes the four numeric features into new *_SCALED columns.
# Step 2 fits a logistic regression on IS_CHURNED and writes PREDICTED_CHURN.
churn_scaled_cols = ["LTV_SCALED", "TENURE_SCALED", "RECENCY_SCALED", "FREQ_SCALED"]

churn_pipeline = Pipeline([
    ("Scaler", StandardScaler(
        input_cols=["LTV_SCORE", "TENURE_DAYS", "DAYS_SINCE_LAST_WASH", "TOTAL_WASHES"],
        output_cols=churn_scaled_cols
    )),
    ("Classifier", LogisticRegression(
        # Train on the scaled columns only. When input_cols is omitted the
        # estimator treats every non-label column as a feature, so it would
        # receive each feature twice (raw and scaled).
        input_cols=churn_scaled_cols,
        label_cols=["IS_CHURNED"],
        output_cols=["PREDICTED_CHURN"],
        max_iter=100
    ))
])

print("✅ Churn prediction pipeline created")

In [None]:
# Train model
# Fits every step of churn_pipeline on the training split; the progress
# messages bracket the fit call.
print("Training churn prediction model...")
churn_pipeline.fit(train_churn)
print("✅ Churn prediction model trained")

In [None]:
# Evaluate model
# Run the fitted pipeline on the held-out split; the result carries both the
# IS_CHURNED label and the PREDICTED_CHURN output column.
test_predictions = churn_pipeline.predict(test_churn)

# snowflake.ml.modeling.metrics.accuracy_score is keyword-only and takes a
# DataFrame plus column names; passing two positional pandas Series raises
# TypeError. Scoring on the DataFrame also avoids pulling rows to the client.
accuracy = accuracy_score(
    df=test_predictions,
    y_true_col_names="IS_CHURNED",
    y_pred_col_names="PREDICTED_CHURN",
)
print(f"Test Accuracy: {accuracy:.3f}")

In [None]:
# Register model
# Up to 100 training rows with the label removed, passed as sample input
sample_data = train_churn.drop("IS_CHURNED").limit(100)

# Collect the registration arguments, then log the fitted pipeline
churn_registration = dict(
    model=churn_pipeline,
    model_name="CHURN_RISK_PREDICTOR",
    target_platforms=['WAREHOUSE'],
    sample_input_data=sample_data,
    comment="Predicts member churn risk",
)
registry.log_model(**churn_registration)

print("✅ CHURN_RISK_PREDICTOR registered in Model Registry")

## Model 2: Equipment Failure Predictor

**Objective**: Predict equipment failure risk.
**Features**: DAYS_SINCE_LAST_SERVICE, LAST_SERVICE_COST, SEVERITY_SCORE

In [None]:
# Load equipment feature data from the fully qualified feature view
equip_df = session.table("MISTER_CAR_WASH_INTELLIGENCE.ANALYTICS.V_MAINTENANCE_RISK_FEATURES")

# Report the row count, then preview the first 5 rows
equip_row_count = equip_df.count()
print(f"✅ Loaded {equip_row_count} records for equipment prediction")
equip_df.show(5)

In [None]:
# Split data 80/20 with a fixed seed
equip_partitions = equip_df.random_split([0.8, 0.2], seed=42)

# Remove MAINTENANCE_ID and EQUIPMENT_TYPE from both partitions before training
train_equip, test_equip = [
    partition.drop("MAINTENANCE_ID", "EQUIPMENT_TYPE") for partition in equip_partitions
]

equip_train_rows = train_equip.count()
print(f"Training set: {equip_train_rows} records")
equip_test_rows = test_equip.count()
print(f"Test set: {equip_test_rows} records")

In [None]:
# Create pipeline for Equipment Failure
# Step 1 standardizes the three numeric features into new *_SCALED columns.
# Step 2 fits a random forest on FAILURE_RISK_LABEL and writes PREDICTED_FAILURE.
equip_scaled_cols = ["DAYS_SCALED", "COST_SCALED", "SEVERITY_SCALED"]

equip_pipeline = Pipeline([
    ("Scaler", StandardScaler(
        input_cols=["DAYS_SINCE_LAST_SERVICE", "LAST_SERVICE_COST", "SEVERITY_SCORE"],
        output_cols=equip_scaled_cols
    )),
    ("Classifier", RandomForestClassifier(
        # Train on the scaled columns only. When input_cols is omitted the
        # estimator treats every non-label column as a feature, so it would
        # receive each feature twice (raw and scaled).
        input_cols=equip_scaled_cols,
        label_cols=["FAILURE_RISK_LABEL"],
        output_cols=["PREDICTED_FAILURE"],
        n_estimators=10,
        max_depth=5,
        random_state=42
    ))
])

print("✅ Equipment failure pipeline created")

In [None]:
# Train model
# Fits every step of equip_pipeline on the training split; the progress
# messages bracket the fit call.
print("Training equipment failure model...")
equip_pipeline.fit(train_equip)
print("✅ Equipment failure model trained")

In [None]:
# Evaluate model
# Run the fitted pipeline on the held-out split; the result carries both the
# FAILURE_RISK_LABEL label and the PREDICTED_FAILURE output column.
test_predictions = equip_pipeline.predict(test_equip)

# snowflake.ml.modeling.metrics.accuracy_score is keyword-only and takes a
# DataFrame plus column names; passing two positional pandas Series raises
# TypeError. Scoring on the DataFrame also avoids pulling rows to the client.
accuracy = accuracy_score(
    df=test_predictions,
    y_true_col_names="FAILURE_RISK_LABEL",
    y_pred_col_names="PREDICTED_FAILURE",
)
print(f"Test Accuracy: {accuracy:.3f}")

In [None]:
# Register model
# Up to 100 training rows with the label removed, passed as sample input
sample_data = train_equip.drop("FAILURE_RISK_LABEL").limit(100)

# Collect the registration arguments, then log the fitted pipeline
equip_registration = dict(
    model=equip_pipeline,
    model_name="EQUIPMENT_FAILURE_PREDICTOR",
    target_platforms=['WAREHOUSE'],
    sample_input_data=sample_data,
    comment="Predicts equipment failure risk",
)
registry.log_model(**equip_registration)

print("✅ EQUIPMENT_FAILURE_PREDICTOR registered in Model Registry")

## Model 3: Upsell Propensity Scorer

**Objective**: Predict likelihood of membership upgrade.
**Features**: LTV_SCORE, VISIT_COUNT, AVG_RATING

In [None]:
# Load upsell feature data from the fully qualified feature view
upsell_df = session.table("MISTER_CAR_WASH_INTELLIGENCE.ANALYTICS.V_UPSELL_FEATURES")

# Report the row count, then preview the first 5 rows
upsell_row_count = upsell_df.count()
print(f"✅ Loaded {upsell_row_count} records for upsell prediction")
upsell_df.show(5)

In [None]:
# Split data 80/20 with a fixed seed
upsell_partitions = upsell_df.random_split([0.8, 0.2], seed=42)

# Remove MEMBER_ID and MEMBERSHIP_TIER from both partitions before training
train_upsell, test_upsell = [
    partition.drop("MEMBER_ID", "MEMBERSHIP_TIER") for partition in upsell_partitions
]

upsell_train_rows = train_upsell.count()
print(f"Training set: {upsell_train_rows} records")
upsell_test_rows = test_upsell.count()
print(f"Test set: {upsell_test_rows} records")

In [None]:
# Create pipeline for Upsell Propensity
# Step 1 standardizes the three numeric features into new *_SCALED columns.
# Step 2 fits a logistic regression on UPSELL_LABEL and writes PREDICTED_UPSELL.
upsell_scaled_cols = ["LTV_SCALED", "VISITS_SCALED", "RATING_SCALED"]

upsell_pipeline = Pipeline([
    ("Scaler", StandardScaler(
        input_cols=["LTV_SCORE", "VISIT_COUNT", "AVG_RATING"],
        output_cols=upsell_scaled_cols
    )),
    ("Classifier", LogisticRegression(
        # Train on the scaled columns only. When input_cols is omitted the
        # estimator treats every non-label column as a feature, so it would
        # receive each feature twice (raw and scaled).
        input_cols=upsell_scaled_cols,
        label_cols=["UPSELL_LABEL"],
        output_cols=["PREDICTED_UPSELL"],
        max_iter=100
    ))
])

print("✅ Upsell pipeline created")

In [None]:
# Train model
# Fits every step of upsell_pipeline on the training split; the progress
# messages bracket the fit call.
print("Training upsell model...")
upsell_pipeline.fit(train_upsell)
print("✅ Upsell model trained")

In [None]:
# Evaluate model
# Run the fitted pipeline on the held-out split; the result carries both the
# UPSELL_LABEL label and the PREDICTED_UPSELL output column.
test_predictions = upsell_pipeline.predict(test_upsell)

# snowflake.ml.modeling.metrics.accuracy_score is keyword-only and takes a
# DataFrame plus column names; passing two positional pandas Series raises
# TypeError. Scoring on the DataFrame also avoids pulling rows to the client.
accuracy = accuracy_score(
    df=test_predictions,
    y_true_col_names="UPSELL_LABEL",
    y_pred_col_names="PREDICTED_UPSELL",
)
print(f"Test Accuracy: {accuracy:.3f}")

In [None]:
# Register model
# Up to 100 training rows with the label removed, passed as sample input
sample_data = train_upsell.drop("UPSELL_LABEL").limit(100)

# Collect the registration arguments, then log the fitted pipeline
upsell_registration = dict(
    model=upsell_pipeline,
    model_name="UPSELL_PROPENSITY_SCORER",
    target_platforms=['WAREHOUSE'],
    sample_input_data=sample_data,
    comment="Predicts upsell propensity",
)
registry.log_model(**upsell_registration)

print("✅ UPSELL_PROPENSITY_SCORER registered in Model Registry")

In [None]:
# List all registered models
# Query the ANALYTICS schema for its models and fetch the rows
show_models_query = "SHOW MODELS IN SCHEMA ANALYTICS"
models = session.sql(show_models_query).collect()

# Banner: blank line, rule, title, rule
banner_rule = "="*80
print("\n" + banner_rule)
print("REGISTERED MODELS")
print(banner_rule)

# One line per model, using the name column of each returned row
for model in models:
    print(f"✅ {model['name']}")