# Snowflake ML Workflow with AutoGluon

## End-to-End ML Pipeline for Classification

**Author:** Randal Scott King  
**Date:** December 2024

This notebook demonstrates a complete ML workflow in Snowflake including:
- Automated feature selection that drops highly correlated features (Pearson correlation)
- Feature Store integration with versioning
- AutoGluon AutoML training (LightGBM, XGBoost, CatBoost, RF, Neural Networks)
- Model evaluation and selection (F1, Recall, Precision)
- Model Registry for versioning and lineage
- Optional deployment to Snowpark Container Services

## 1. Setup and Imports

Import all necessary libraries for the ML workflow.

In [None]:
# Import Snowflake libraries
import snowflake.snowpark as snowpark
from snowflake.snowpark.functions import col
from snowflake.ml.feature_store import (
    FeatureStore,
    FeatureView,
    Entity,
    CreationMode
)
from snowflake.ml.registry import Registry
from snowflake.snowpark import Session

# Import ML libraries
import pandas as pd
import numpy as np
from autogluon.tabular import TabularPredictor
from sklearn.model_selection import train_test_split
from sklearn.metrics import f1_score, recall_score, precision_score, accuracy_score

# Import utilities
from typing import List, Dict, Tuple, Optional
import json
from datetime import datetime

print("✓ All libraries imported successfully")

## 2. Configuration

Set up your configuration parameters for the workflow.

In [None]:
# Workflow Configuration
CONFIG = {
    # Data Configuration
    'source_table': 'ML_DATA.PUBLIC.CUSTOMER_FEATURES',
    'target_column': 'CHURN_FLAG',
    'entity_column': 'CUSTOMER_ID',
    
    # Feature Engineering
    'correlation_threshold': 0.95,
    'feature_store_db': 'ML_FEATURES',
    'feature_view_name': 'customer_churn_features',
    
    # Model Training
    'model_registry_db': 'ML_MODELS',
    'model_name': 'customer_churn_classifier',
    'test_size': 0.2,
    'random_state': 42,
    
    # AutoGluon Settings
    'time_limit': 3600,  # 1 hour
    'presets': 'best_quality',
    'num_bag_folds': 5,
    'num_stack_levels': 1,
    'eval_metric': 'f1',
    
    # Container Services (optional)
    'deploy_to_container': False,
    'compute_pool': 'ML_COMPUTE_POOL',
    'service_name': 'churn_prediction_service'
}

print("Configuration:")
print(json.dumps(CONFIG, indent=2))
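A quick sanity check on `CONFIG` can catch typos before the hour-long AutoGluon run. The `validate_config` helper below is hypothetical (it is not part of Snowflake ML or AutoGluon) and only checks a few required keys and value ranges used later in this notebook.

```python
from typing import Any, Dict, List


def validate_config(config: Dict[str, Any]) -> List[str]:
    """Return human-readable problems found in a workflow config (empty list if none)."""
    problems: List[str] = []

    # Keys the workflow cannot run without
    for key in ("source_table", "target_column", "entity_column", "model_name"):
        if not config.get(key):
            problems.append(f"missing required key: {key}")

    # train_test_split needs a fraction strictly between 0 and 1
    test_size = config.get("test_size")
    if not isinstance(test_size, (int, float)) or not 0 < test_size < 1:
        problems.append("test_size must be strictly between 0 and 1")

    # |Pearson r| never exceeds 1, so a threshold above 1 would drop nothing
    threshold = config.get("correlation_threshold")
    if not isinstance(threshold, (int, float)) or not 0 < threshold <= 1:
        problems.append("correlation_threshold must be in (0, 1]")

    # time_limit is in seconds; None means no limit
    time_limit = config.get("time_limit")
    if time_limit is not None and time_limit <= 0:
        problems.append("time_limit must be positive (seconds) or None")

    return problems


# Usage in the notebook: fail fast before any Snowflake or AutoGluon work
# assert not validate_config(CONFIG), validate_config(CONFIG)
```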

## 3. Initialize Snowflake Session

Get the active Snowpark session. Snowflake Notebooks provide one through `get_active_session()`; when running locally, build a session with `Session.builder` instead.

In [None]:
# In Snowflake Notebooks, retrieve the active session
from snowflake.snowpark.context import get_active_session
session = get_active_session()

# If running locally, replace the two lines above with your own configuration:
# session = Session.builder.configs({
#     "account": "your_account",
#     "user": "your_user",
#     "password": "your_password",
#     "role": "ML_ENGINEER",
#     "warehouse": "ML_WAREHOUSE",
#     "database": "ML_DATA",
#     "schema": "PUBLIC"
# }).create()

# Confirm the session context
print("Connected to Snowflake")
print(f"Current role: {session.get_current_role()}")
print(f"Current warehouse: {session.get_current_warehouse()}")
print(f"Current database: {session.get_current_database()}")
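For local runs, one alternative to hard-coding credentials in the commented-out `Session.builder.configs({...})` block is to read them from environment variables. The variable names (`SNOWFLAKE_ACCOUNT`, `SNOWFLAKE_USER`, and so on) and the `connection_params_from_env` helper are hypothetical conventions for this sketch, not something Snowpark reads on its own.

```python
import os
from typing import Dict, Mapping, Optional

# Hypothetical environment-variable names; adapt them to your own conventions.
ENV_KEYS = {
    "account": "SNOWFLAKE_ACCOUNT",
    "user": "SNOWFLAKE_USER",
    "password": "SNOWFLAKE_PASSWORD",
    "role": "SNOWFLAKE_ROLE",
    "warehouse": "SNOWFLAKE_WAREHOUSE",
    "database": "SNOWFLAKE_DATABASE",
    "schema": "SNOWFLAKE_SCHEMA",
}


def connection_params_from_env(env: Optional[Mapping[str, str]] = None) -> Dict[str, str]:
    """Collect Snowpark connection parameters from environment variables."""
    env = os.environ if env is None else env
    # Only include settings that are actually set (non-empty)
    params = {key: env[var] for key, var in ENV_KEYS.items() if env.get(var)}
    missing = [key for key in ("account", "user") if key not in params]
    if missing:
        raise KeyError(f"missing connection settings: {missing}")
    return params


# Local usage (requires snowflake-snowpark-python):
# session = Session.builder.configs(connection_params_from_env()).create()
```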

## 4. Initialize Feature Store and Model Registry

Set up connections to Snowflake's Feature Store and Model Registry.

In [None]:
# Initialize Feature Store
feature_store = FeatureStore(
    session=session,
    database=CONFIG['feature_store_db'],
    name="ML_FEATURE_STORE",
    default_warehouse=session.get_current_warehouse(),
    creation_mode=CreationMode.CREATE_IF_NOT_EXIST
)

print("✓ Feature Store initialized")

# Initialize Model Registry
registry = Registry(
    session=session,
    database_name=CONFIG['model_registry_db'],
    schema_name="MODEL_REGISTRY"
)

print("✓ Model Registry initialized")

## 5. Load Data

Load the source data from the Snowflake table.

In [None]:
# Load data from source table
print(f"Loading data from {CONFIG['source_table']}...")
df = session.table(CONFIG['source_table'])

# Display data info
row_count = df.count()
columns = df.columns

print(f"✓ Loaded {row_count:,} rows")
print(f"✓ Columns: {len(columns)}")
print(f"\nColumn names: {', '.join(columns[:10])}{'...' if len(columns) > 10 else ''}")

# Show sample data
df.limit(5).show()

## 6. Feature Selection by Correlation

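Automatically select features using Pearson correlation: for each pair of numeric features whose absolute correlation exceeds `correlation_threshold`, keep the column that appears first and drop the later one. The analysis runs in pandas, so `to_pandas()` pulls the whole table into memory.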

In [None]:
def select_features_by_correlation(
    df: snowpark.DataFrame,
    target_column: str,
    threshold: float = 0.95
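) -> Tuple[List[str], Dict]:
    """
    Select features by dropping highly correlated numeric columns.

    For every pair of numeric features with |Pearson r| > threshold, the column
    that appears later in column order is dropped and the earlier one is kept.
    Returns the selected feature names and a report dictionary.
    """
    print("Calculating feature correlations...")
    
    # Convert to pandas for correlation analysis (loads the full table into memory)
    pdf = df.to_pandas()
    numeric_cols = pdf.select_dtypes(include=[np.number]).columns.tolist()
    
    # Exclude target column
    feature_cols = [c for c in numeric_cols if c != target_column]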
    
    # Calculate correlation matrix
    corr_matrix = pdf[feature_cols].corr().abs()
    
    # Find features to drop: the strict upper triangle inspects each pair once
    upper_triangle = np.triu(np.ones(corr_matrix.shape), k=1).astype(bool)
    upper_corr = corr_matrix.where(upper_triangle)
    
    to_drop = []
    dropped_pairs = []
    
    # A column is dropped if it exceeds the threshold with any earlier column.
    # Only the first such partner is recorded; that partner may itself be
    # dropped because of a different pair.
    for column in upper_corr.columns:
        correlated = upper_corr[column][upper_corr[column] > threshold]
        if not correlated.empty:
            for idx, corr_value in correlated.items():
                if column not in to_drop:
                    to_drop.append(column)
                    dropped_pairs.append({
                        'kept_feature': idx,
                        'dropped_feature': column,
                        'correlation': float(corr_value)
                    })
    
    selected_features = [c for c in feature_cols if c not in to_drop]
    
    report = {
        'total_numeric_features': len(feature_cols),
        'selected_features': len(selected_features),
        'dropped_features': len(to_drop),
        'correlation_threshold': threshold,
        'dropped_pairs': dropped_pairs,
        'selected_feature_list': selected_features
    }
    
    return selected_features, report

# Execute feature selection
selected_features, correlation_report = select_features_by_correlation(
    df=df.drop(CONFIG['entity_column']),  # keep the ID column out of the correlation analysis
    target_column=CONFIG['target_column'],
    threshold=CONFIG['correlation_threshold']
)

print(f"\n✓ Feature selection complete:")
print(f"  - Original features: {correlation_report['total_numeric_features']}")
print(f"  - Selected features: {correlation_report['selected_features']}")
print(f"  - Dropped features: {correlation_report['dropped_features']}")

# Show dropped pairs
if correlation_report['dropped_pairs']:
    print(f"\nDropped feature pairs (showing first 5):")
    for pair in correlation_report['dropped_pairs'][:5]:
        print(f"  - Kept: {pair['kept_feature']}, Dropped: {pair['dropped_feature']}, Corr: {pair['correlation']:.3f}")
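The drop rule can be checked on synthetic data. The function below is a compact, hypothetical restatement of the same strict-upper-triangle logic (not a replacement for `select_features_by_correlation`). It shows that when two columns are perfectly correlated, the one appearing later in column order is the one dropped.

```python
import numpy as np
import pandas as pd


def correlated_columns_to_drop(frame: pd.DataFrame, threshold: float) -> list:
    """Return columns whose |Pearson r| with an earlier column exceeds threshold."""
    corr = frame.corr().abs()
    # Keep only the strict upper triangle so each pair is inspected once
    mask = np.triu(np.ones(corr.shape), k=1).astype(bool)
    upper = corr.where(mask)
    # NaN (lower triangle and diagonal) compares as False, so only real pairs count
    return [c for c in upper.columns if (upper[c] > threshold).any()]


rng = np.random.default_rng(0)
x = rng.normal(size=200)
toy = pd.DataFrame({
    "A": x,
    "B": 2 * x + 1,             # perfectly correlated with A (r = 1.0)
    "C": rng.normal(size=200),  # independent noise
})
print(correlated_columns_to_drop(toy, threshold=0.95))  # ['B']
```

Because the earlier column always wins, reordering columns can change which member of a correlated pair survives.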

## 7. Create Feature View in Feature Store

Register the selected features in the Snowflake Feature Store for versioning and reuse.

In [None]:
def create_feature_view(
    df: snowpark.DataFrame,
    selected_features: List[str],
    entity_column: str,
    target_column: str,
    feature_view_name: str
) -> FeatureView:
    """
    Create and register a Feature View in the Feature Store.

    Registers the entity first, then the feature view, and returns the
    registered (versioned) FeatureView object.
    """
    print(f"Creating feature view '{feature_view_name}'...")
    
    # Define the entity (join key) and register it so the feature view can reference it
    entity = Entity(name=entity_column, join_keys=[entity_column])
    feature_store.register_entity(entity)
    
    # Select features, entity key, and label
    feature_cols = selected_features + [entity_column, target_column]
    feature_df = df.select(feature_cols)
    
    # Create feature view; refresh_freq makes it a managed view refreshed daily
    feature_view = FeatureView(
        name=feature_view_name,
        entities=[entity],
        feature_df=feature_df,
        refresh_freq="1 day",
        desc=f"Curated features for ML model training - Created {datetime.now()}"
    )
    
    # Register in the Feature Store; keep the returned (registered) object
    registered_view = feature_store.register_feature_view(
        feature_view=feature_view,
        version="1.0",
        block=True
    )
    
    return registered_view

# Create feature view
feature_view = create_feature_view(
    df=df,
    selected_features=selected_features,
    entity_column=CONFIG['entity_column'],
    target_column=CONFIG['target_column'],
    feature_view_name=CONFIG['feature_view_name']
)

print(f"✓ Feature view '{CONFIG['feature_view_name']}' registered successfully")

## 8. Load Features for Training

Read the registered feature view back from the Feature Store and convert it to pandas for model training.

In [None]:
def load_features_for_training(
    feature_view_name: str,
    version: str = "1.0"
) -> pd.DataFrame:
    """
    Load a registered feature view version from the Feature Store as pandas.
    """
    print(f"Loading features from Feature Store: {feature_view_name} v{version}")
    
    # Look up the registered feature view version and read its rows
    fv = feature_store.get_feature_view(name=feature_view_name, version=version)
    fv_df = feature_store.read_feature_view(fv)
    
    # Convert the Snowpark DataFrame to pandas for AutoGluon
    pdf = fv_df.to_pandas()
    print(f"✓ Loaded {len(pdf):,} rows with {len(pdf.columns)} columns")
    
    return pdf

# Load training data
training_data = load_features_for_training(
    feature_view_name=CONFIG['feature_view_name']
)

# Display data info
print(f"\nData shape: {training_data.shape}")
print(f"Target distribution:")
print(training_data[CONFIG['target_column']].value_counts())
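The target distribution also sets a floor for accuracy: a model that always predicts the majority class scores exactly that class's share. The hypothetical helper below makes this baseline explicit on synthetic 90/10 labels standing in for `CHURN_FLAG`; it is one reason to rank churn models by F1 rather than by accuracy.

```python
import pandas as pd


def class_balance_summary(labels: pd.Series) -> dict:
    """Class shares plus the accuracy a constant majority-class predictor would get."""
    shares = labels.value_counts(normalize=True)
    return {
        "majority_class": shares.idxmax(),
        "majority_baseline_accuracy": float(shares.max()),
        "minority_share": float(shares.min()),
    }


# Synthetic 90/10 label column (not real data)
labels = pd.Series([0] * 90 + [1] * 10)
print(class_balance_summary(labels))
```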

## 9. Split Data into Train and Test Sets

Create a stratified train/test split so that both sets keep the target's class proportions.

In [None]:
# Split data
train_df, test_df = train_test_split(
    training_data,
    test_size=CONFIG['test_size'],
    random_state=CONFIG['random_state'],
    stratify=training_data[CONFIG['target_column']]
)

print(f"✓ Data split complete:")
print(f"  - Training set: {len(train_df):,} rows ({len(train_df)/len(training_data)*100:.1f}%)")
print(f"  - Test set: {len(test_df):,} rows ({len(test_df)/len(training_data)*100:.1f}%)")

print(f"\nTraining set target distribution:")
print(train_df[CONFIG['target_column']].value_counts())

print(f"\nTest set target distribution:")
print(test_df[CONFIG['target_column']].value_counts())
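To see what stratification does without scikit-learn, here is a pandas-only sketch that samples the same fraction from each class. It is an alternative to the notebook's `train_test_split(..., stratify=...)`, not a description of how that function works internally, and `stratified_split` is a hypothetical helper name.

```python
import pandas as pd


def stratified_split(frame: pd.DataFrame, label: str, test_size: float, seed: int):
    """Draw test_size of the rows from each class for the test set; the rest is training."""
    # groupby(...).sample keeps the original index, so test rows can be dropped by index
    test = frame.groupby(label).sample(frac=test_size, random_state=seed)
    train = frame.drop(index=test.index)
    return train, test


# Synthetic data: 20 positives, 80 negatives
frame = pd.DataFrame({"x": range(100), "y": [1] * 20 + [0] * 80})
train, test = stratified_split(frame, "y", test_size=0.2, seed=42)
print(train["y"].value_counts(normalize=True))
print(test["y"].value_counts(normalize=True))
```

Both sets keep the 20/80 class ratio, which is what `stratify=` guarantees in the notebook's split.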

## 10. Train Models with AutoGluon

Train multiple classification models using AutoGluon AutoML.

In [None]:
def train_autogluon_models(
    train_data: pd.DataFrame,
    target_column: str,
    time_limit: int = 3600,
    presets: str = "best_quality",
    eval_metric: str = "f1",
    num_bag_folds: int = 5,
    num_stack_levels: int = 1
) -> TabularPredictor:
    """
    Train multiple classification models using AutoGluon.
    """
    print("Starting AutoGluon training...")
    print(f"  - Time limit: {time_limit}s ({time_limit/60:.1f} minutes)")
    print(f"  - Presets: {presets}")
    print(f"  - Bag folds: {num_bag_folds}")
    print(f"  - Stack levels: {num_stack_levels}")
    
    # Configure AutoGluon
    predictor = TabularPredictor(
        label=target_column,
        problem_type='binary',
        eval_metric=eval_metric,
        path='./autogluon_models'
    )
    
    # Train models
    predictor.fit(
        train_data=train_data,
        time_limit=time_limit,
        presets=presets,
        num_bag_folds=num_bag_folds,
        num_stack_levels=num_stack_levels,
        verbosity=2
    )
    
    print("\n✓ AutoGluon training complete!")
    return predictor

# Train models
predictor = train_autogluon_models(
    train_data=train_df.drop(columns=[CONFIG['entity_column']]),  # the ID is not a predictive feature
    target_column=CONFIG['target_column'],
    time_limit=CONFIG['time_limit'],
    presets=CONFIG['presets'],
    eval_metric=CONFIG['eval_metric'],
    num_bag_folds=CONFIG['num_bag_folds'],
    num_stack_levels=CONFIG['num_stack_levels']
)
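Roughly, `num_bag_folds=5` means AutoGluon trains each model type 5 times and holds out a different fifth of the training data each time. The held-out (out-of-fold) predictions are what the next stack level (`num_stack_levels=1`) learns from. The sketch below shows only the fold bookkeeping with contiguous, unshuffled folds. AutoGluon assigns its own (shuffled) folds, and `kfold_indices` is a hypothetical helper.

```python
from typing import List, Tuple


def kfold_indices(n_rows: int, k: int) -> List[Tuple[List[int], List[int]]]:
    """Return (train_idx, valid_idx) pairs for k contiguous, unshuffled folds."""
    # Spread the remainder over the first n_rows % k folds
    sizes = [n_rows // k + (1 if i < n_rows % k else 0) for i in range(k)]
    folds = []
    start = 0
    for size in sizes:
        valid = list(range(start, start + size))
        train = list(range(0, start)) + list(range(start + size, n_rows))
        folds.append((train, valid))
        start += size
    return folds


# Every row is held out exactly once, so each row gets exactly one out-of-fold prediction
for train_idx, valid_idx in kfold_indices(10, 5):
    print(valid_idx)
```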

## 11. Evaluate Models and Select Best

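Score every trained model on the held-out test set, then pick the best by F1, breaking ties by recall and then precision. Because the same test set is used both to choose the model and to report its metrics, the reported scores are somewhat optimistic; a separate validation split would avoid this.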

In [None]:
def evaluate_and_select_best_model(
    predictor: TabularPredictor,
    test_data: pd.DataFrame,
    target_column: str
) -> Dict:
    """
    Evaluate all trained models on the test set and rank them.

    Precision, recall, and F1 are computed for the positive class
    (average='binary'), consistent with AutoGluon's 'f1' eval_metric.
    """
    print("Evaluating models...")
    
    # Get leaderboard scored on the test data (AutoGluon 1.x does not print it by default)
    leaderboard = predictor.leaderboard(data=test_data)
    
    # True labels and the class AutoGluon treats as positive (e.g. 1 for 0/1 labels)
    y_true = test_data[target_column]
    pos_label = predictor.positive_class
    
    # Get detailed metrics for each model
    model_metrics = []
    
    for model_name in leaderboard['model']:
        try:
            # Get predictions from this specific model
            y_pred = predictor.predict(test_data, model=model_name)
            
            # Positive-class metrics; average='weighted' would make recall equal accuracy
            f1 = f1_score(y_true, y_pred, pos_label=pos_label, zero_division=0)
            recall = recall_score(y_true, y_pred, pos_label=pos_label, zero_division=0)
            precision = precision_score(y_true, y_pred, pos_label=pos_label, zero_division=0)
            accuracy = accuracy_score(y_true, y_pred)
            
            model_metrics.append({
                'model_name': model_name,
                'f1_score': f1,
                'recall': recall,
                'precision': precision,
                'accuracy': accuracy,
                'score_val': leaderboard[leaderboard['model'] == model_name]['score_val'].values[0]
            })
        except Exception as e:
            print(f"Warning: Could not evaluate {model_name}: {e}")
    
    # Convert to DataFrame
    metrics_df = pd.DataFrame(model_metrics)
    
    # Select best model based on F1, then Recall, then Precision
    metrics_df = metrics_df.sort_values(
        by=['f1_score', 'recall', 'precision'],
        ascending=False
    )
    
    best_model = metrics_df.iloc[0].to_dict()
    
    return {
        'best_model': best_model,
        'all_models': metrics_df.to_dict('records'),
        'leaderboard': leaderboard
    }

# Evaluate models
evaluation_results = evaluate_and_select_best_model(
    predictor=predictor,
    test_data=test_df,
    target_column=CONFIG['target_column']
)

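# Display results; make the selected model the predictor's default for predict()
best_model = evaluation_results['best_model']
predictor.set_model_best(best_model['model_name'], save_trainer=True)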
print(f"\n{'='*60}")
print(f"BEST MODEL SELECTED: {best_model['model_name']}")
print(f"{'='*60}")
print(f"F1 Score:  {best_model['f1_score']:.4f}")
print(f"Recall:    {best_model['recall']:.4f}")
print(f"Precision: {best_model['precision']:.4f}")
print(f"Accuracy:  {best_model['accuracy']:.4f}")
print(f"{'='*60}")

# Show all models
print(f"\nAll Models Performance:")
metrics_df = pd.DataFrame(evaluation_results['all_models'])
print(metrics_df[['model_name', 'f1_score', 'recall', 'precision', 'accuracy']].to_string(index=False))
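On imbalanced churn data, the averaging mode matters. With `average='weighted'`, recall is each class's recall weighted by its share of rows, which algebraically equals accuracy. The majority class also dominates weighted F1 and precision. This pure-Python sketch mirrors the definitions behind scikit-learn's `average='binary'` and `average='weighted'` (the helper names are hypothetical). It uses a synthetic model that finds only 2 of 10 churners.

```python
from typing import Dict, List


def binary_metrics(y_true: List[int], y_pred: List[int], positive: int = 1) -> Dict[str, float]:
    """Precision, recall, and F1 of the positive class (what average='binary' reports)."""
    pairs = list(zip(y_true, y_pred))
    tp = sum(t == positive and p == positive for t, p in pairs)
    fp = sum(t != positive and p == positive for t, p in pairs)
    fn = sum(t == positive and p != positive for t, p in pairs)
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
    return {"precision": precision, "recall": recall, "f1": f1}


def weighted_recall(y_true: List[int], y_pred: List[int]) -> float:
    """Per-class recall averaged with class-frequency weights (average='weighted')."""
    n = len(y_true)
    total = 0.0
    for c in set(y_true):
        support = sum(t == c for t in y_true)
        hits = sum(t == c and p == c for t, p in zip(y_true, y_pred))
        total += (support / n) * (hits / support)  # each term reduces to hits / n
    return total


# Synthetic churn labels: 90 stay, 10 churn; the model catches only 2 churners
y_true = [0] * 90 + [1] * 10
y_pred = [0] * 90 + [1] * 2 + [0] * 8
accuracy = sum(t == p for t, p in zip(y_true, y_pred)) / len(y_true)
print(binary_metrics(y_true, y_pred))             # recall on churners is only 0.2
print(weighted_recall(y_true, y_pred), accuracy)  # both about 0.92
```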

## 12. Register Model to Model Registry

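Register the trained predictor in the Snowflake Model Registry with its evaluation metrics. AutoGluon is not one of the registry's built-in model types, so the predictor is wrapped in a `CustomModel` that loads the saved predictor directory.

In [None]:
from snowflake.ml.model import custom_model

class AutoGluonModel(custom_model.CustomModel):
    """Registry wrapper that serves a saved AutoGluon TabularPredictor."""
    
    def __init__(self, context: custom_model.ModelContext) -> None:
        super().__init__(context)
        from autogluon.tabular import TabularPredictor
        # 'predictor_dir' is the artifact key supplied in ModelContext below
        self.predictor = TabularPredictor.load(self.context.path("predictor_dir"))
    
    @custom_model.inference_api
    def predict(self, X: pd.DataFrame) -> pd.DataFrame:
        # Return one PREDICTION value per input row
        return pd.DataFrame({"PREDICTION": self.predictor.predict(X).to_numpy()})

def register_model_to_warehouse(
    predictor: TabularPredictor,
    model_name: str,
    best_model_name: str,
    metrics: Dict,
    feature_list: List[str],
    target_column: str,
    sample_input: pd.DataFrame
) -> str:
    """
    Register the predictor in the Snowflake Model Registry; return the version name.
    """
    print(f"Registering model '{model_name}' to Snowflake Model Registry...")
    
    # Metrics and metadata stored with the model version (JSON-serializable values)
    model_metrics = {
        'model_type': 'AutoGluon TabularPredictor',
        'best_model': best_model_name,
        'f1_score': metrics['best_model']['f1_score'],
        'recall': metrics['best_model']['recall'],
        'precision': metrics['best_model']['precision'],
        'accuracy': metrics['best_model']['accuracy'],
        'features': feature_list,
        'target_column': target_column,
        'training_timestamp': datetime.now().isoformat()
    }
    
    # Ship the saved predictor directory as an artifact of the custom model
    wrapped_model = AutoGluonModel(
        custom_model.ModelContext(artifacts={'predictor_dir': predictor.path})
    )
    
    # Register model (log_model takes version_name and metrics, not model_version/metadata)
    model_version = registry.log_model(
        wrapped_model,
        model_name=model_name,
        version_name='v1',
        conda_dependencies=['autogluon.tabular'],
        metrics=model_metrics,
        sample_input_data=sample_input,  # used to infer the model signature
        comment=f"Best model: {best_model_name} with F1={metrics['best_model']['f1_score']:.4f}"
    )
    
    return model_version.version_name

# Register model
model_version = register_model_to_warehouse(
    predictor=predictor,
    model_name=CONFIG['model_name'],
    best_model_name=evaluation_results['best_model']['model_name'],
    metrics=evaluation_results,
    feature_list=selected_features,
    target_column=CONFIG['target_column'],
    sample_input=test_df.drop(columns=[CONFIG['target_column']]).head(10)
)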

print(f"✓ Model registered successfully: {model_version}")
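Registry metrics and the `json.dumps` summary below both need plain JSON types. `json.dumps` accepts `np.float64` (a subclass of Python `float`) but raises `TypeError` on `np.int64`, which pandas produces for counts. A hypothetical `to_jsonable` helper like this one converts NumPy values before they reach either place.

```python
import json

import numpy as np


def to_jsonable(value):
    """Recursively convert NumPy scalars and arrays to plain Python types."""
    if isinstance(value, dict):
        return {str(k): to_jsonable(v) for k, v in value.items()}
    if isinstance(value, (list, tuple)):
        return [to_jsonable(v) for v in value]
    if isinstance(value, np.ndarray):
        return to_jsonable(value.tolist())
    if isinstance(value, np.generic):
        return value.item()  # e.g. np.int64 -> int, np.float64 -> float
    return value


metadata = {"n_rows": np.int64(1000), "f1": np.float64(0.5), "features": np.array(["A", "B"])}
try:
    json.dumps(metadata)
except TypeError as err:
    print("raw metadata fails:", err)
print(json.dumps(to_jsonable(metadata)))
```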

## 13. Workflow Results Summary

Display complete workflow results and metrics.

In [None]:
# Compile final results
final_results = {
    'workflow_status': 'SUCCESS',
    'timestamp': datetime.now().isoformat(),
    'data_info': {
        'source_table': CONFIG['source_table'],
        'total_rows': len(training_data),
        'train_rows': len(train_df),
        'test_rows': len(test_df)
    },
    'feature_selection': {
        'total_features': correlation_report['total_numeric_features'],
        'selected_features': correlation_report['selected_features'],
        'dropped_features': correlation_report['dropped_features'],
        'correlation_threshold': correlation_report['correlation_threshold']
    },
    'feature_view': CONFIG['feature_view_name'],
    'model_info': {
        'model_name': CONFIG['model_name'],
        'model_version': model_version,
        'best_model': evaluation_results['best_model']['model_name']
    },
    'performance_metrics': {
        'f1_score': evaluation_results['best_model']['f1_score'],
        'recall': evaluation_results['best_model']['recall'],
        'precision': evaluation_results['best_model']['precision'],
        'accuracy': evaluation_results['best_model']['accuracy']
    },
    'models_evaluated': len(evaluation_results['all_models'])
}

# Display results
print("\n" + "="*80)
print("WORKFLOW COMPLETED SUCCESSFULLY")
print("="*80)
print(json.dumps(final_results, indent=2))
print("="*80)

## 14. Optional: Deploy to Container Services

Deploy the registered model version to Snowpark Container Services (SPCS) as an HTTP inference service. This uses `ModelVersion.create_service()` from snowflake-ml-python, which builds the inference image for you. It requires a recent snowflake-ml-python release, a compute pool, and an image repository.

In [None]:
if CONFIG['deploy_to_container']:
    # Placeholder image repository; replace with an existing repository you can write to
    IMAGE_REPO = "ML_MODELS.MODEL_REGISTRY.MODEL_IMAGES"
    
    def deploy_to_container_services(
        model_name: str,
        model_version: str,
        compute_pool: str,
        service_name: str,
        image_repo: str
    ) -> str:
        """
        Deploy a registered model version to Snowpark Container Services.
        """
        print(f"Deploying model to Container Services: {service_name}")
        
        # Build the inference image and start the service from the registry
        mv = registry.get_model(model_name).version(model_version)
        mv.create_service(
            service_name=service_name,
            service_compute_pool=compute_pool,
            image_repo=image_repo,
            ingress_enabled=True  # expose an HTTPS endpoint outside Snowflake
        )
        
        # SPCS assigns the ingress URL; look it up instead of constructing it
        # (it may read "provisioning in progress" for a few minutes)
        rows = session.sql(f"SHOW ENDPOINTS IN SERVICE {service_name}").collect()
        endpoint = rows[0]['ingress_url'] if rows else ""
        print("✓ Service deployed successfully!")
        print(f"✓ Endpoint: {endpoint}")
        
        return endpoint
    
    # Deploy model
    service_endpoint = deploy_to_container_services(
        model_name=CONFIG['model_name'],
        model_version=model_version,
        compute_pool=CONFIG['compute_pool'],
        service_name=CONFIG['service_name'],
        image_repo=IMAGE_REPO
    )
    
    final_results['service_endpoint'] = service_endpoint
else:
    print("Container Services deployment is disabled in configuration.")
    print("Set CONFIG['deploy_to_container'] = True to enable deployment.")
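Clients call a deployed endpoint over HTTPS with rows encoded as JSON. Snowflake's external-function and service-function protocol uses a body of the form `{"data": [[row_index, value1, value2, ...]]}`. We assume, without having verified it for every snowflake-ml-python version, that the model service's ingress endpoint accepts the same shape at a path named after the method (for example `/predict`). The helper below only builds the body; authenticating to the endpoint is not shown. `build_service_payload` and the feature names are hypothetical.

```python
import json
from typing import Any, Dict, List


def build_service_payload(rows: List[Dict[str, Any]], columns: List[str]) -> str:
    """Encode rows as {"data": [[row_index, col1, col2, ...], ...]}."""
    # Row index first, then the feature values in a fixed column order
    data = [[i] + [row[c] for c in columns] for i, row in enumerate(rows)]
    return json.dumps({"data": data})


# Hypothetical feature names for illustration
payload = build_service_payload(
    [{"TENURE": 12, "MONTHLY_SPEND": 54.5}, {"TENURE": 3, "MONTHLY_SPEND": 20.0}],
    columns=["TENURE", "MONTHLY_SPEND"],
)
print(payload)
```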

## 15. Make Predictions (Optional)

Use the trained predictor to score a small sample of the test set. The same calls work on new data that has the training feature columns.

In [None]:
# Make predictions on a sample of the test set (label column removed)
sample_data = test_df.head(10).drop(columns=[CONFIG['target_column']])

predictions = predictor.predict(sample_data)
prediction_probs = predictor.predict_proba(sample_data)  # one column per class label

# Display predictions with one probability column per class (Probability_0/_1 for 0/1 labels)
results_df = sample_data.copy()
results_df['Prediction'] = predictions
prob_cols = []
for label in prediction_probs.columns:
    results_df[f'Probability_{label}'] = prediction_probs[label]
    prob_cols.append(f'Probability_{label}')

print("Sample Predictions:")
print(results_df[[CONFIG['entity_column'], 'Prediction'] + prob_cols].to_string(index=False))
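Churn use cases often value recall, and the positive-class probability lets you move the decision threshold instead of relying on the one `predict()` applies (0.5 by default for binary problems, as far as we know). The hypothetical `apply_threshold` helper below is a sketch for 0/1 labels, where the positive-class column is `Probability_1`. Choose the threshold on validation data rather than on the test set you report.

```python
import pandas as pd


def apply_threshold(positive_proba: pd.Series, threshold: float = 0.5) -> pd.Series:
    """Return 1 where the positive-class probability is >= threshold, else 0."""
    return (positive_proba >= threshold).astype(int)


# Hypothetical positive-class probabilities (e.g. a Probability_1 column)
proba = pd.Series([0.15, 0.35, 0.55, 0.90])
print(apply_threshold(proba).tolist())       # [0, 0, 1, 1]
print(apply_threshold(proba, 0.3).tolist())  # [0, 1, 1, 1]
```

A lower threshold flags more customers as likely churners, raising recall at the cost of precision.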

## Conclusion

This notebook demonstrated a complete end-to-end ML workflow in Snowflake:

1. **Data Loading**: Loaded data from a Snowflake table
2. **Feature Selection**: Automated correlation-based feature selection
3. **Feature Store**: Registered features for versioning and reuse
4. **Model Training**: Trained multiple models with AutoGluon
5. **Model Evaluation**: Selected the best model by F1, recall, and precision
6. **Model Registry**: Registered the model with metadata for tracking
7. **Deployment**: Optional deployment to Snowpark Container Services

### Next Steps:
- Monitor model performance over time
- Retrain model with new data periodically
- Experiment with different feature engineering techniques
- Tune AutoGluon hyperparameters for better performance
- Set up automated retraining pipelines

### Resources:
- [Snowflake ML Documentation](https://docs.snowflake.com/en/developer-guide/snowpark-ml/index)
- [AutoGluon Documentation](https://auto.gluon.ai/)
- [GitHub Repository](https://github.com/randalscottking/snowflake-autogluon-classification)