# Blue-Green Model Deployments for Zero-Downtime Updates

This notebook demonstrates how to implement blue-green deployment patterns in Snowflake for ML models, enabling:
- Zero-downtime model updates
- Safe rollback mechanisms  
- A/B testing between model versions
- Production-grade MLOps workflows

## Why Blue-Green Deployments?

Traditional model deployments often require downtime or risk serving inconsistent predictions. Blue-green deployments avoid both by running two environments side by side and switching traffic between them:
1. **Blue Environment**: Current production model serving live traffic
2. **Green Environment**: New model version being deployed and tested
3. **Instant Switch**: Traffic routing changes instantly between environments
4. **Quick Rollback**: Immediate return to previous version if issues arise
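
The four pieces above can be sketched in plain Python before any Snowflake objects are involved. This is only an illustration of the pattern: `BlueGreenRouter` and the two lambda "models" are hypothetical stand-ins, not part of the notebook or of any Snowflake API.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, List


@dataclass
class BlueGreenRouter:
    """Holds two model callables and a pointer to the one serving traffic."""
    environments: Dict[str, Callable[[float], float]]
    active: str = "BLUE"
    history: List[str] = field(default_factory=list)

    def predict(self, x: float) -> float:
        # Every request goes to whichever environment is currently active.
        return self.environments[self.active](x)

    def switch_to(self, environment: str) -> None:
        # Remember the previous environment so rollback knows where to return.
        if environment not in self.environments:
            raise KeyError(f"unknown environment: {environment}")
        self.history.append(self.active)
        self.active = environment

    def rollback(self) -> None:
        # Return to the most recent previous environment, if there is one.
        if self.history:
            self.active = self.history.pop()


router = BlueGreenRouter(
    environments={
        "BLUE": lambda carat: 4000.0 * carat,   # stand-in for the current model
        "GREEN": lambda carat: 4200.0 * carat,  # stand-in for the new model
    }
)
blue_price = router.predict(1.0)
router.switch_to("GREEN")
green_price = router.predict(1.0)
router.rollback()
restored_price = router.predict(1.0)
```

The switch and the rollback are both a single pointer update, which is why the pattern can change versions without taking either model offline.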


## Setup and Prerequisites


In [1]:
# Import required libraries
import snowflake.snowpark.functions as F
from snowflake.ml.registry import Registry
from snowflake.ml.modeling.xgboost import XGBRegressor
from snowflake.ml.modeling.linear_model import LinearRegression
from snowflake.ml._internal.utils import identifier
from snowflake.ml.modeling.metrics import mean_absolute_percentage_error
import pandas as pd
import time
from datetime import datetime
import json

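# Get the active session (get_active_session must be imported before use)
from snowflake.snowpark.context import get_active_session
session = get_active_session()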
session.sql_simplifier_enabled = True



## 1. Setup Blue-Green Infrastructure


In [None]:
# Create blue-green deployment infrastructure using Snowpark for Python
from snowflake.snowpark.types import StructType, StructField, StringType, BooleanType, TimestampType, DecimalType, FloatType, IntegerType, VariantType

# Create deployment management schema
session.sql("CREATE SCHEMA IF NOT EXISTS ML_HOL_DB.DEPLOYMENT_MANAGEMENT").collect()
session.use_schema("ML_HOL_DB.DEPLOYMENT_MANAGEMENT")

# Table creation functions using SQL DDL (which supports DEFAULT values)
def create_deployment_tables():
    """Create deployment tables with proper default values using SQL DDL"""
    
    # Create DEPLOYMENT_CONFIG table with defaults
    session.sql("""
    CREATE OR REPLACE TABLE DEPLOYMENT_CONFIG (
        MODEL_NAME STRING,
        ENVIRONMENT STRING,  -- 'BLUE' or 'GREEN'
        MODEL_VERSION STRING,
        IS_ACTIVE BOOLEAN,
        DEPLOYMENT_TIME TIMESTAMP_LTZ,
        HEALTH_CHECK_STATUS STRING,
        TRAFFIC_PERCENTAGE NUMBER(5,2) DEFAULT 0,
        CREATED_AT TIMESTAMP_LTZ DEFAULT CURRENT_TIMESTAMP()
    )
    """).collect()
    
    # Create MODEL_PERFORMANCE_METRICS table with defaults
    session.sql("""
    CREATE OR REPLACE TABLE MODEL_PERFORMANCE_METRICS (
        MODEL_NAME STRING,
        ENVIRONMENT STRING,
        MODEL_VERSION STRING,
        METRIC_NAME STRING,
        METRIC_VALUE FLOAT,
        SAMPLE_SIZE NUMBER,
        MEASUREMENT_TIME TIMESTAMP_LTZ DEFAULT CURRENT_TIMESTAMP()
    )
    """).collect()
    
    # Create PREDICTION_REQUESTS table with defaults
    session.sql("""
    CREATE OR REPLACE TABLE PREDICTION_REQUESTS (
        REQUEST_ID STRING,
        MODEL_NAME STRING,
        ENVIRONMENT STRING,
        MODEL_VERSION STRING,
        INPUT_DATA VARIANT,
        PREDICTION FLOAT,
        RESPONSE_TIME_MS NUMBER,
        REQUEST_TIME TIMESTAMP_LTZ DEFAULT CURRENT_TIMESTAMP()
    )
    """).collect()

# Create all deployment tables
create_deployment_tables()

# Verify table creation using Snowpark DataFrames
deployment_config_table = session.table("DEPLOYMENT_CONFIG")
performance_metrics_table = session.table("MODEL_PERFORMANCE_METRICS")
prediction_requests_table = session.table("PREDICTION_REQUESTS")

print("✅ Blue-green deployment infrastructure created successfully!")
print(f"📋 Created tables:")
print(f"   • DEPLOYMENT_CONFIG ({len(deployment_config_table.columns)} columns)")
print(f"   • MODEL_PERFORMANCE_METRICS ({len(performance_metrics_table.columns)} columns)")
print(f"   • PREDICTION_REQUESTS ({len(prediction_requests_table.columns)} columns)")
print("🎯 All tables include proper DEFAULT value constraints")


## 2. Prepare Training Data and Models


In [None]:
# Load and prepare the diamonds dataset (from the main ML notebook)
diamonds_df = session.read.options({
    "field_delimiter": ",",
    "field_optionally_enclosed_by": '"',
    "infer_schema": True,
    "parse_header": True
}).csv("@ML_HOL_DB.ML_HOL_SCHEMA.DIAMONDS_ASSETS")

# Clean column names
for colname in diamonds_df.columns:
    if colname == '"table"':
        new_colname = "TABLE_PCT"
    else:
        new_colname = str.upper(colname)
    diamonds_df = diamonds_df.with_column_renamed(colname, new_colname)

# Split data
train_df, test_df = diamonds_df.random_split(weights=[0.8, 0.2], seed=42)

INPUT_COLS = ['CARAT', 'DEPTH', 'TABLE_PCT', 'X', 'Y', 'Z']
LABEL_COL = 'PRICE'

print(f"Training data: {train_df.count()} rows")
print(f"Test data: {test_df.count()} rows")


## 3. Deploy Blue Environment (Current Production Model)


In [None]:
# Train the "blue" model (current production model - simpler baseline)
blue_model = LinearRegression(
    input_cols=INPUT_COLS,
    label_cols=[LABEL_COL],
    output_cols=['PREDICTED_PRICE']
)

print("🔵 Training Blue model (Linear Regression)...")
blue_model.fit(train_df)

# Test blue model performance
blue_predictions = blue_model.predict(test_df)
blue_mape = mean_absolute_percentage_error(
    df=blue_predictions,
    y_true_col_names=LABEL_COL,
    y_pred_col_names='PREDICTED_PRICE'
)

print(f"🔵 Blue model MAPE: {blue_mape:.4f}")
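
For readers unfamiliar with the metric, here is a small hand-rolled MAPE on made-up numbers. It is a sketch for intuition only: the `mape` helper and the sample prices are hypothetical, and it assumes the library metric follows the usual convention of returning a fraction (0.10 meaning 10%). The canary class later in this notebook multiplies its own MAPE by 100, so the two values are on different scales.

```python
def mape(actual, predicted):
    """Mean absolute percentage error, returned as a fraction (0.10 == 10%)."""
    if len(actual) != len(predicted) or not actual:
        raise ValueError("inputs must be non-empty and the same length")
    # Each term is |actual - predicted| / |actual|; actual values must be non-zero.
    return sum(abs(a - p) / abs(a) for a, p in zip(actual, predicted)) / len(actual)


actual_prices = [1000.0, 2000.0, 4000.0]     # hypothetical prices
predicted_prices = [900.0, 2200.0, 4000.0]   # hypothetical predictions

# Per-row errors are 10%, 10%, and 0%, so the mean is about 6.67%.
example_mape = mape(actual_prices, predicted_prices)
```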


In [None]:
# Create required tags before model registration
print("🏷️  Creating Snowflake tags for model metadata...")

# Create tags in the DEPLOYMENT_MANAGEMENT schema.
# session.sql() executes one statement per call, so each tag is created separately.
tag_comments = {
    "ENVIRONMENT": "Deployment environment: blue, green, canary, etc.",
    "MODEL_TYPE": "Type of ML model: linear_regression, xgboost, etc.",
    "STAGE": "Deployment stage: development, staging, production, canary",
}

for tag_name, tag_comment in tag_comments.items():
    session.sql(
        f"CREATE TAG IF NOT EXISTS ML_HOL_DB.DEPLOYMENT_MANAGEMENT.{tag_name} "
        f"COMMENT = '{tag_comment}'"
    ).collect()
print("✅ Tags created successfully")

# Register blue model in registry
db = identifier._get_unescaped_name(session.get_current_database())
schema = "DEPLOYMENT_MANAGEMENT"
registry = Registry(session=session, database_name=db, schema_name=schema)

model_name = "DIAMONDS_PRICE_PREDICTOR"
X_sample = train_df.select(INPUT_COLS).limit(100)

# Log blue model (without tags parameter)
blue_model_ver = registry.log_model(
    model_name=model_name,
    version_name="BLUE_V1",
    model=blue_model,
    sample_input_data=X_sample
)

blue_model_ver.set_metric("mape", blue_mape)
blue_model_ver.comment = "Blue environment - Production baseline model"

# Add tags to the model (not the version)
model = registry.get_model(model_name)
model.set_tag("ML_HOL_DB.DEPLOYMENT_MANAGEMENT.ENVIRONMENT", "blue")
model.set_tag("ML_HOL_DB.DEPLOYMENT_MANAGEMENT.MODEL_TYPE", "linear_regression")
model.set_tag("ML_HOL_DB.DEPLOYMENT_MANAGEMENT.STAGE", "production")

print(f"🔵 Blue model registered: {blue_model_ver.version_name}")
print("🔵 Tags added: environment=blue, model_type=linear_regression, stage=production")


In [None]:
# Update deployment configuration for blue environment.
# .collect() is required: session.sql() is lazy and does not run the INSERT without it.
session.sql(f"""
INSERT INTO DEPLOYMENT_CONFIG 
(MODEL_NAME, ENVIRONMENT, MODEL_VERSION, IS_ACTIVE, DEPLOYMENT_TIME, HEALTH_CHECK_STATUS, TRAFFIC_PERCENTAGE)
VALUES 
('{model_name}', 'BLUE', 'BLUE_V1', TRUE, CURRENT_TIMESTAMP(), 'HEALTHY', 100.0)
""").collect()

print("🔵 Blue environment is now serving 100% of production traffic")


## 4. Prepare Green Environment (New Model Version)


In [None]:
# Train the "green" model (new improved model)
green_model = XGBRegressor(
    input_cols=INPUT_COLS,
    label_cols=[LABEL_COL],
    output_cols=['PREDICTED_PRICE'],
    n_estimators=100,
    max_depth=6
)

print("🟢 Training Green model (XGBoost)...")
green_model.fit(train_df)

# Test green model performance
green_predictions = green_model.predict(test_df)
green_mape = mean_absolute_percentage_error(
    df=green_predictions,
    y_true_col_names=LABEL_COL,
    y_pred_col_names='PREDICTED_PRICE'
)

print(f"🟢 Green model MAPE: {green_mape:.4f}")
print(f"📊 Improvement over Blue: {((blue_mape - green_mape) / blue_mape * 100):.2f}%")


In [None]:
# Register green model (without tags parameter)
green_model_ver = registry.log_model(
    model_name=model_name,
    version_name="GREEN_V2",
    model=green_model,
    sample_input_data=X_sample
)

green_model_ver.set_metric("mape", green_mape)
green_model_ver.comment = "Green environment - Improved XGBoost model"

# Update model tags using fully qualified tag names
model = registry.get_model(model_name)
model.set_tag("ML_HOL_DB.DEPLOYMENT_MANAGEMENT.ENVIRONMENT", "green")   # Updates environment tag
model.set_tag("ML_HOL_DB.DEPLOYMENT_MANAGEMENT.MODEL_TYPE", "xgboost")  # Updates model_type tag  
model.set_tag("ML_HOL_DB.DEPLOYMENT_MANAGEMENT.STAGE", "canary")        # Updates stage tag

print(f"🟢 Green model registered: {green_model_ver.version_name}")
print("🟢 Tags updated: environment=green, model_type=xgboost, stage=canary")

# Deploy to green environment (initially inactive).
# .collect() is required: session.sql() is lazy and does not run the INSERT without it.
session.sql(f"""
INSERT INTO DEPLOYMENT_CONFIG 
(MODEL_NAME, ENVIRONMENT, MODEL_VERSION, IS_ACTIVE, DEPLOYMENT_TIME, HEALTH_CHECK_STATUS, TRAFFIC_PERCENTAGE)
VALUES 
('{model_name}', 'GREEN', 'GREEN_V2', FALSE, CURRENT_TIMESTAMP(), 'HEALTHY', 0.0)
""").collect()

print("🟢 Green environment deployed but not yet serving traffic")


## 5. Canary Deployment (Gradual Traffic Shift)
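
The cell below records the traffic split in `DEPLOYMENT_CONFIG` but does not route live requests. As a rough sketch of how a serving layer might consume those rows, the following picks an environment with probability proportional to `TRAFFIC_PERCENTAGE`. The `choose_environment` helper and the in-memory `config_rows` are hypothetical; a real router would read the rows from the table.

```python
import random
from collections import Counter


def choose_environment(config_rows, rng):
    """Pick an environment with probability proportional to TRAFFIC_PERCENTAGE."""
    active = [r for r in config_rows if r["IS_ACTIVE"] and r["TRAFFIC_PERCENTAGE"] > 0]
    if not active:
        raise RuntimeError("no active environment to route to")
    names = [r["ENVIRONMENT"] for r in active]
    weights = [float(r["TRAFFIC_PERCENTAGE"]) for r in active]
    return rng.choices(names, weights=weights, k=1)[0]


# Hypothetical snapshot of DEPLOYMENT_CONFIG during the 25% canary stage.
config_rows = [
    {"ENVIRONMENT": "BLUE", "IS_ACTIVE": True, "TRAFFIC_PERCENTAGE": 75},
    {"ENVIRONMENT": "GREEN", "IS_ACTIVE": True, "TRAFFIC_PERCENTAGE": 25},
]

rng = random.Random(0)  # seeded so the simulation is repeatable
counts = Counter(choose_environment(config_rows, rng) for _ in range(10_000))
green_share = counts["GREEN"] / 10_000

# After a full rollback only blue is active, so every request goes to blue.
rolled_back_rows = [
    {"ENVIRONMENT": "BLUE", "IS_ACTIVE": True, "TRAFFIC_PERCENTAGE": 100},
    {"ENVIRONMENT": "GREEN", "IS_ACTIVE": False, "TRAFFIC_PERCENTAGE": 0},
]
rolled_back_choice = choose_environment(rolled_back_rows, rng)
```

Random per-request assignment is the simplest choice; hashing a stable request or user key instead would keep a given caller on one environment for the whole stage.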


In [None]:
import numpy as np
import random
from scipy import stats
from typing import Dict, List, Tuple
import uuid

class AdvancedCanaryDeployment:
    """Advanced canary deployment with comprehensive testing and monitoring"""
    
    def __init__(self, session, model_name, blue_model, green_model, test_data):
        self.session = session
        self.model_name = model_name
        self.blue_model = blue_model
        self.green_model = green_model
        self.test_data = test_data
        self.metrics_history = []
        self.rollback_triggers = {
            'error_rate_threshold': 0.05,  # 5% error rate
            'latency_degradation': 1.5,    # 50% latency increase
            'accuracy_degradation': 0.1,   # 10% relative increase in MAPE (lower MAPE is better)
            'min_sample_size': 50          # Minimum samples for statistical significance
        }
    
    def simulate_load_test(self, environment: str, model_ver, sample_size: int = 100) -> Dict:
        """Simulate realistic load testing with performance metrics"""
        print(f"   🔍 Running load test for {environment} environment ({sample_size} requests)...")
        
        # Draw the test batch once and cache it so every later read sees the same rows.
        # (Snowpark's DataFrame.sample() takes `frac` or `n`; it has no `replace` argument.)
        test_batch = self.test_data.sample(n=sample_size).cache_result()
        batch_size = test_batch.count()
        
        # Simulate realistic latency variations
        base_latency = 45 if environment == "BLUE" else 52  # Green slightly slower initially
        latencies = []
        predictions = []
        errors = 0
        
        start_time = time.time()
        
        for i in range(sample_size):
            request_start = time.time()
            
            try:
                # Simulate an individual prediction on one row of the batch.
                # Snowpark has no DataFrame.offset(); the offset is an argument of limit().
                single_row = test_batch.limit(1, offset=i % batch_size)
                pred_result = model_ver.run(single_row, function_name='PREDICT')
                prediction = pred_result.select('PREDICTED_PRICE').collect()[0][0]
                predictions.append(prediction)
                
                # Simulate realistic latency with some randomness
                simulated_latency = base_latency + np.random.normal(0, 5) + (i * 0.1)  # Slight degradation over time
                latencies.append(max(10, simulated_latency))  # Minimum 10ms
                
            except Exception as e:
                errors += 1
                latencies.append(base_latency * 3)  # Error requests take longer
                predictions.append(0)  # Default prediction for errors
        
        total_time = time.time() - start_time
        
        # Calculate ground truth for accuracy assessment
        actual_prices = test_batch.select('PRICE').collect()
        actual_values = [row[0] for row in actual_prices[:len(predictions)]]
        
        # Calculate metrics
        avg_latency = np.mean(latencies)
        p95_latency = np.percentile(latencies, 95)
        error_rate = errors / sample_size
        
        # Calculate MAPE for accuracy
        mape = np.mean(np.abs((np.array(actual_values) - np.array(predictions)) / np.array(actual_values))) * 100
        
        return {
            'environment': environment,
            'sample_size': sample_size,
            'avg_latency': avg_latency,
            'p95_latency': p95_latency,
            'error_rate': error_rate,
            'mape': mape,
            'predictions': predictions,
            'latencies': latencies,
            'total_time': total_time,
            'timestamp': datetime.now()
        }
    
    def statistical_comparison(self, blue_metrics: Dict, green_metrics: Dict) -> Dict:
        """Perform statistical tests to compare blue vs green performance"""
        print("   📊 Performing statistical analysis...")
        
        # Latency comparison (t-test)
        latency_stat, latency_p = stats.ttest_ind(blue_metrics['latencies'], green_metrics['latencies'])
        
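        # Error rate comparison (chi-square test)
        blue_errors = round(blue_metrics['error_rate'] * blue_metrics['sample_size'])
        green_errors = round(green_metrics['error_rate'] * green_metrics['sample_size'])
        
        contingency_table = [
            [blue_errors, blue_metrics['sample_size'] - blue_errors],
            [green_errors, green_metrics['sample_size'] - green_errors]
        ]
        
        # chi2_contingency raises ValueError when a column sums to zero
        # (for example, when neither environment produced any errors)
        total_errors = blue_errors + green_errors
        total_requests = blue_metrics['sample_size'] + green_metrics['sample_size']
        if total_errors == 0 or total_errors == total_requests:
            chi2_stat, error_rate_p = 0.0, 1.0
        else:
            chi2_stat, error_rate_p = stats.chi2_contingency(contingency_table)[:2]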
        
        # Effect sizes
        latency_effect_size = (green_metrics['avg_latency'] - blue_metrics['avg_latency']) / blue_metrics['avg_latency']
        accuracy_effect_size = (green_metrics['mape'] - blue_metrics['mape']) / blue_metrics['mape']
        
        return {
            'latency_significant': latency_p < 0.05,
            'latency_p_value': latency_p,
            'latency_effect_size': latency_effect_size,
            'error_rate_significant': error_rate_p < 0.05,
            'error_rate_p_value': error_rate_p,
            'accuracy_effect_size': accuracy_effect_size,
            'sample_size_adequate': min(blue_metrics['sample_size'], green_metrics['sample_size']) >= self.rollback_triggers['min_sample_size']
        }
    
    def health_check(self, metrics: Dict, baseline_metrics: Dict = None) -> Tuple[bool, List[str]]:
        """Comprehensive health check with rollback triggers"""
        warnings = []
        is_healthy = True
        
        # Error rate check
        if metrics['error_rate'] > self.rollback_triggers['error_rate_threshold']:
            warnings.append(f"❌ Error rate too high: {metrics['error_rate']:.3f} > {self.rollback_triggers['error_rate_threshold']}")
            is_healthy = False
        
        # Latency degradation check (if baseline available)
        if baseline_metrics:
            latency_ratio = metrics['avg_latency'] / baseline_metrics['avg_latency']
            if latency_ratio > self.rollback_triggers['latency_degradation']:
                warnings.append(f"❌ Latency degraded: {latency_ratio:.2f}x > {self.rollback_triggers['latency_degradation']}x")
                is_healthy = False
            
            # Accuracy degradation check
            accuracy_ratio = metrics['mape'] / baseline_metrics['mape']
            if accuracy_ratio > (1 + self.rollback_triggers['accuracy_degradation']):
                warnings.append(f"❌ Accuracy degraded: MAPE {metrics['mape']:.3f} vs baseline {baseline_metrics['mape']:.3f}")
                is_healthy = False
        
        if is_healthy:
            print("   ✅ Health check passed")
        else:
            print(f"   ⚠️  Health check failed: {len(warnings)} issues detected")
            for warning in warnings:
                print(f"      {warning}")
        
        return is_healthy, warnings
    
    def log_performance_metrics(self, metrics: Dict):
        """Log detailed performance metrics to monitoring table"""
        for metric_name, value in [
            ('avg_latency', metrics['avg_latency']),
            ('p95_latency', metrics['p95_latency']),
            ('error_rate', metrics['error_rate']),
            ('mape', metrics['mape'])
        ]:
            self.session.sql(f"""
            INSERT INTO MODEL_PERFORMANCE_METRICS 
            (MODEL_NAME, ENVIRONMENT, MODEL_VERSION, METRIC_NAME, METRIC_VALUE, SAMPLE_SIZE)
            VALUES 
            ('{self.model_name}', '{metrics['environment']}', '{metrics['environment']}_V{1 if metrics['environment']=='BLUE' else 2}', 
             '{metric_name}', {value}, {metrics['sample_size']})
            """).collect()
    
    def log_prediction_requests(self, metrics: Dict, traffic_percentage: float):
        """Log prediction requests with detailed timing and routing info"""
        for i, (prediction, latency) in enumerate(zip(metrics['predictions'][:10], metrics['latencies'][:10])):
            request_id = f"{uuid.uuid4().hex[:8]}_{metrics['environment']}_{int(traffic_percentage)}"
            
            self.session.sql(f"""
            INSERT INTO PREDICTION_REQUESTS 
            (REQUEST_ID, MODEL_NAME, ENVIRONMENT, MODEL_VERSION, PREDICTION, RESPONSE_TIME_MS)
            VALUES 
            ('{request_id}', '{self.model_name}', '{metrics['environment']}', 
             '{metrics['environment']}_V{1 if metrics['environment']=='BLUE' else 2}', {prediction}, {latency})
            """).collect()
    
    def update_traffic_split(self, green_percentage: int):
        """Update traffic distribution between environments"""
        blue_percentage = 100 - green_percentage
        
        self.session.sql(f"""
        UPDATE DEPLOYMENT_CONFIG 
        SET TRAFFIC_PERCENTAGE = {blue_percentage},
            IS_ACTIVE = CASE WHEN {blue_percentage} > 0 THEN TRUE ELSE FALSE END
        WHERE MODEL_NAME = '{self.model_name}' AND ENVIRONMENT = 'BLUE'
        """).collect()
        
        self.session.sql(f"""
        UPDATE DEPLOYMENT_CONFIG 
        SET TRAFFIC_PERCENTAGE = {green_percentage},
            IS_ACTIVE = CASE WHEN {green_percentage} > 0 THEN TRUE ELSE FALSE END
        WHERE MODEL_NAME = '{self.model_name}' AND ENVIRONMENT = 'GREEN'
        """).collect()
    
    def advanced_canary_stage(self, green_percentage: int, baseline_metrics: Dict = None) -> Tuple[bool, Dict]:
        """Execute advanced canary deployment stage with comprehensive testing"""
        print(f"\n🚀 Canary Stage: {100-green_percentage}% Blue, {green_percentage}% Green")
        print("=" * 60)
        
        # Update traffic split
        self.update_traffic_split(green_percentage)
        
        # Run load tests for both environments (proportional to traffic)
        blue_sample_size = max(50, int(100 * (100 - green_percentage) / 100))
        green_sample_size = max(50, int(100 * green_percentage / 100)) if green_percentage > 0 else 0
        
        blue_metrics = self.simulate_load_test("BLUE", self.blue_model, blue_sample_size)
        green_metrics = None
        
        if green_percentage > 0:
            green_metrics = self.simulate_load_test("GREEN", self.green_model, green_sample_size)
            
            # Statistical comparison
            if green_percentage >= 10:  # Only compare when sufficient green traffic
                stats_results = self.statistical_comparison(blue_metrics, green_metrics)
                
                print(f"   📈 Performance Comparison:")
                print(f"      Blue: {blue_metrics['avg_latency']:.1f}ms avg, {blue_metrics['error_rate']:.3f} error rate, {blue_metrics['mape']:.3f} MAPE")
                print(f"      Green: {green_metrics['avg_latency']:.1f}ms avg, {green_metrics['error_rate']:.3f} error rate, {green_metrics['mape']:.3f} MAPE")
                
                if stats_results['sample_size_adequate']:
                    print(f"   🧮 Statistical Significance:")
                    print(f"      Latency difference: {'Significant' if stats_results['latency_significant'] else 'Not significant'} (p={stats_results['latency_p_value']:.4f})")
                    print(f"      Error rate difference: {'Significant' if stats_results['error_rate_significant'] else 'Not significant'} (p={stats_results['error_rate_p_value']:.4f})")
                    print(f"      Effect sizes: {stats_results['latency_effect_size']:.2%} latency, {stats_results['accuracy_effect_size']:.2%} accuracy")
        
        # Health checks
        blue_healthy, blue_warnings = self.health_check(blue_metrics, baseline_metrics)
        green_healthy = True
        
        if green_metrics:
            green_healthy, green_warnings = self.health_check(green_metrics, baseline_metrics)
        
        # Log metrics
        self.log_performance_metrics(blue_metrics)
        self.log_prediction_requests(blue_metrics, 100 - green_percentage)
        
        if green_metrics:
            self.log_performance_metrics(green_metrics)
            self.log_prediction_requests(green_metrics, green_percentage)
        
        # Store metrics history
        stage_metrics = {
            'stage': green_percentage,
            'blue_metrics': blue_metrics,
            'green_metrics': green_metrics,
            'timestamp': datetime.now()
        }
        self.metrics_history.append(stage_metrics)
        
        # Determine if stage passed
        stage_passed = blue_healthy and green_healthy
        
        if stage_passed:
            print(f"   ✅ Stage {green_percentage}% passed all health checks")
        else:
            print(f"   ❌ Stage {green_percentage}% failed health checks")
        
        return stage_passed, stage_metrics

# Initialize advanced canary deployment
canary_deployer = AdvancedCanaryDeployment(
    session=session,
    model_name=model_name,
    blue_model=blue_model_ver,
    green_model=green_model_ver,
    test_data=test_df
)

print("🟢 Starting Advanced Canary Deployment with Comprehensive Testing...")
print("🎯 Features: Load Testing, Statistical Analysis, Health Monitoring, Auto-Rollback")
print()

# Get baseline metrics from blue environment
baseline_metrics = canary_deployer.simulate_load_test("BLUE", blue_model_ver, 100)
print(f"📊 Baseline (Blue) Performance: {baseline_metrics['avg_latency']:.1f}ms avg latency, {baseline_metrics['error_rate']:.3f} error rate, {baseline_metrics['mape']:.3f} MAPE")

# Advanced canary stages with comprehensive testing
canary_stages = [5, 10, 25, 50, 75, 100]
deployment_successful = True

for stage in canary_stages:
    stage_passed, stage_metrics = canary_deployer.advanced_canary_stage(stage, baseline_metrics)
    
    if not stage_passed:
        print(f"🚨 CANARY DEPLOYMENT FAILED at stage {stage}%")
        print("🔄 Initiating automatic rollback...")
        canary_deployer.update_traffic_split(0)  # Rollback to 100% blue
        deployment_successful = False
        break
    
    # Add realistic monitoring delay
    print(f"   ⏱️  Monitoring stage {stage}% for 2 seconds...")
    time.sleep(2)

if deployment_successful:
    print("\n🎉 CANARY DEPLOYMENT SUCCESSFUL!")
    print("✅ Green environment is now serving 100% traffic")
    print("📊 All performance metrics within acceptable thresholds")
else:
    print("\n❌ CANARY DEPLOYMENT ABORTED")
    print("🔵 Rolled back to Blue environment (100% traffic)")
    print("📋 Review metrics and address issues before retry")
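
The canary class compares error rates with a chi-square test. As an alternative for illustration (not what the class uses), the comparison can be written as a two-proportion z-test using only the standard library, which makes the degenerate "no errors anywhere" case explicit. The `two_proportion_z_test` helper and the example counts are hypothetical, and the normal approximation is rough when error counts are very small.

```python
import math


def two_proportion_z_test(errors_a, n_a, errors_b, n_b):
    """Two-sided z-test for a difference between two error rates.

    Returns (z, p_value). When the pooled rate is 0 or 1 there is no
    variation to test, so the function returns (0.0, 1.0).
    """
    if n_a <= 0 or n_b <= 0:
        raise ValueError("sample sizes must be positive")
    pooled = (errors_a + errors_b) / (n_a + n_b)
    if pooled in (0.0, 1.0):
        return 0.0, 1.0
    se = math.sqrt(pooled * (1 - pooled) * (1 / n_a + 1 / n_b))
    z = (errors_b / n_b - errors_a / n_a) / se
    # Two-sided p-value from the standard normal distribution, written with erfc.
    p_value = math.erfc(abs(z) / math.sqrt(2))
    return z, p_value


# Neither environment failed a request: nothing to compare.
no_errors = two_proportion_z_test(0, 50, 0, 50)

# Hypothetical stage: blue 2 errors in 100 requests, green 12 errors in 100.
z, p = two_proportion_z_test(2, 100, 12, 100)
```

A positive `z` here means the second environment (green) has the higher error rate, which is the direction a rollback trigger cares about.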


## 6. Emergency Rollback Mechanism


In [None]:
def emergency_rollback(model_name, reason="Manual rollback"):
    """Instantly rollback to blue environment"""
    print(f"🚨 EMERGENCY ROLLBACK INITIATED: {reason}")
    
    start_time = time.time()
    
    # Switch all traffic back to blue.
    # .collect() is required: session.sql() is lazy and does not run the UPDATE without it.
    session.sql(f"""
    UPDATE DEPLOYMENT_CONFIG 
    SET TRAFFIC_PERCENTAGE = 100,
        IS_ACTIVE = TRUE
    WHERE MODEL_NAME = '{model_name}' AND ENVIRONMENT = 'BLUE'
    """).collect()
    
    session.sql(f"""
    UPDATE DEPLOYMENT_CONFIG 
    SET TRAFFIC_PERCENTAGE = 0,
        IS_ACTIVE = FALSE,
        HEALTH_CHECK_STATUS = 'ROLLED_BACK'
    WHERE MODEL_NAME = '{model_name}' AND ENVIRONMENT = 'GREEN'
    """).collect()
    
    rollback_time = (time.time() - start_time) * 1000
    
    print(f"✅ Rollback completed in {rollback_time:.0f}ms")
    print("🔵 Blue environment is now serving 100% traffic")
    
    return rollback_time

# Demonstrate rollback capability (ready for emergencies)
print("💡 Emergency rollback function ready")
print("💡 Uncomment the line below to test instant rollback:")
print("# rollback_time = emergency_rollback(model_name, 'Demo rollback')")
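
The rollback above issues one `UPDATE` per environment, so a reader could briefly see blue and green both at 100% or both at 0%. One possible variation is to flip both rows in a single statement. The sketch below uses the standard-library `sqlite3` module as a stand-in so it runs without a Snowflake connection; the table is a trimmed, hypothetical copy of `DEPLOYMENT_CONFIG` (booleans stored as 0/1), and the statement has not been verified against Snowflake, where `TRUE`/`FALSE` would be used and the syntax may need adapting.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE DEPLOYMENT_CONFIG (
        MODEL_NAME TEXT, ENVIRONMENT TEXT, IS_ACTIVE INTEGER,
        TRAFFIC_PERCENTAGE REAL, HEALTH_CHECK_STATUS TEXT
    )
    """
)
# Starting state: green has taken over 100% of traffic.
conn.executemany(
    "INSERT INTO DEPLOYMENT_CONFIG VALUES (?, ?, ?, ?, ?)",
    [
        ("DIAMONDS_PRICE_PREDICTOR", "BLUE", 0, 0.0, "HEALTHY"),
        ("DIAMONDS_PRICE_PREDICTOR", "GREEN", 1, 100.0, "HEALTHY"),
    ],
)

# One statement updates both rows, so no reader can observe a half-applied state.
ROLLBACK_SQL = """
UPDATE DEPLOYMENT_CONFIG
SET TRAFFIC_PERCENTAGE = CASE ENVIRONMENT WHEN 'BLUE' THEN 100 ELSE 0 END,
    IS_ACTIVE = CASE ENVIRONMENT WHEN 'BLUE' THEN 1 ELSE 0 END,
    HEALTH_CHECK_STATUS = CASE ENVIRONMENT
        WHEN 'GREEN' THEN 'ROLLED_BACK' ELSE HEALTH_CHECK_STATUS END
WHERE MODEL_NAME = ? AND ENVIRONMENT IN ('BLUE', 'GREEN')
"""
with conn:  # commits on success, rolls back if the statement raises
    conn.execute(ROLLBACK_SQL, ("DIAMONDS_PRICE_PREDICTOR",))

state = {
    env: (active, pct, health)
    for env, active, pct, health in conn.execute(
        "SELECT ENVIRONMENT, IS_ACTIVE, TRAFFIC_PERCENTAGE, HEALTH_CHECK_STATUS "
        "FROM DEPLOYMENT_CONFIG"
    )
}
```

The model name is passed as a bound parameter rather than interpolated into the SQL text, which also avoids quoting problems if a model name ever contains an apostrophe.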


## 7. Deployment Status Dashboard


In [None]:
# Current deployment status
deployment_status = session.sql(f"""
SELECT 
    MODEL_NAME,
    ENVIRONMENT,
    MODEL_VERSION,
    IS_ACTIVE,
    TRAFFIC_PERCENTAGE,
    HEALTH_CHECK_STATUS,
    DEPLOYMENT_TIME,
    DATEDIFF('minute', DEPLOYMENT_TIME, CURRENT_TIMESTAMP()) as MINUTES_DEPLOYED
FROM DEPLOYMENT_CONFIG 
WHERE MODEL_NAME = '{model_name}'
ORDER BY ENVIRONMENT
""").to_pandas()

print("🎛️  Current Deployment Status:")
print("=" * 80)
for _, row in deployment_status.iterrows():
    status_icon = "🟢" if row['IS_ACTIVE'] else "⚫"
    health_icon = "✅" if row['HEALTH_CHECK_STATUS'] == 'HEALTHY' else "⚠️"
    
    print(f"{status_icon} {row['ENVIRONMENT']} Environment:")
    print(f"   Version: {row['MODEL_VERSION']}")
    print(f"   Traffic: {row['TRAFFIC_PERCENTAGE']}%")
    print(f"   Health: {health_icon} {row['HEALTH_CHECK_STATUS']}")
    print(f"   Deployed: {row['MINUTES_DEPLOYED']} minutes ago")
    print()

# A/B testing results
ab_results = session.sql(f"""
SELECT 
    ENVIRONMENT,
    MODEL_VERSION,
    COUNT(*) as REQUEST_COUNT,
    AVG(PREDICTION) as AVG_PREDICTION,
    AVG(RESPONSE_TIME_MS) as AVG_RESPONSE_TIME
FROM PREDICTION_REQUESTS 
WHERE MODEL_NAME = '{model_name}'
GROUP BY ENVIRONMENT, MODEL_VERSION
ORDER BY ENVIRONMENT
""").to_pandas()

print("📊 A/B Testing Results:")
if not ab_results.empty:
    print(ab_results.to_string(index=False))
else:
    print("No prediction requests logged yet")


## 8. Production-Ready Features Summary

This blue-green deployment implementation demonstrates several production-ready capabilities:

### ✅ **Zero-Downtime Deployments**
- Instant traffic switching between environments
- No service interruption during model updates  
- Rollback needs only two `UPDATE` statements against `DEPLOYMENT_CONFIG`; `emergency_rollback` reports how long they took

### ✅ **Comprehensive Health Monitoring**
- Automated health checks at every canary stage
- Performance metrics tracking
- Response time and prediction quality validation

### ✅ **Safe Rollout Strategy**
- Canary deployments with gradual traffic shifting
- A/B testing capabilities for model comparison
- Emergency rollback procedures

### ✅ **Enterprise Governance**
- Complete audit trail of deployments
- Performance metrics logging
- Model versioning and tagging

### 🚀 **Why This Matters for Snowflake**

This pattern showcases Snowflake's advantages over traditional ML platforms:

1. **Native Integration**: All deployment logic runs inside Snowflake using SQL and Python
2. **Data Locality**: Models, monitoring data, and business data stay in one platform  
3. **Scale**: Handles enterprise workloads without complex infrastructure
4. **Governance**: Built-in security, compliance, and audit capabilities
5. **Cost Efficiency**: Warehouses are billed only while they run and can auto-suspend when idle, vs. always-on cluster costs

### 💡 **Enterprise Impact**

**Traditional ML Platforms:**
- Require separate infrastructure for deployment
- Complex data movement between systems
- Manual rollback procedures
- Limited governance and audit trails

**Snowflake ML Platform:**
- ✅ Zero infrastructure overhead
- ✅ No data movement required  
- ✅ Instant rollbacks with complete audit trail
- ✅ Enterprise-grade governance built-in

This demonstrates that Snowflake isn't just a data warehouse: it can also act as a **complete ML platform** for production deployments, keeping model training, the model registry, deployment state, and monitoring data in one place.
