In [0]:

# MAGIC %md
# MAGIC # 06 - Automated Model Retraining Pipeline
# MAGIC 
# MAGIC **End-to-end automated pipeline for model retraining and deployment**
# MAGIC 
# MAGIC ## Objectives:
# MAGIC - Check if retraining is needed based on monitoring metrics
# MAGIC - Retrain model with latest data
# MAGIC - Compare new model with production model
# MAGIC - Auto-promote if performance improves
# MAGIC - Generate a retraining report and print a pipeline summary (notifying stakeholders remains a manual next step)


In [0]:

# MAGIC %md
# MAGIC ## 1. Setup & Imports


In [0]:
# Restart the Python process before anything else runs so that the imports
# below are loaded fresh (no stale modules or variables from earlier runs).
# Keep this cell first: everything defined before a restart is discarded.
%restart_python


In [0]:

# Standard imports
import glob
import json
import os
import sys
import traceback
import warnings
from datetime import datetime
from pathlib import Path

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

warnings.filterwarnings('ignore')

# MLflow
import mlflow
import mlflow.sklearn
from mlflow.tracking import MlflowClient

# Sklearn
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.linear_model import LinearRegression, Ridge, Lasso
from sklearn.ensemble import RandomForestRegressor, GradientBoostingRegressor
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score

# Set plotting style: apply the matplotlib style sheet, then the seaborn
# colour palette.
# NOTE(review): presumably the palette is set after the style sheet so the
# style does not override it -- keep this order unless confirmed otherwise.
plt.style.use('seaborn-v0_8-darkgrid')
sns.set_palette("husl")

# Marker in the cell output that the import/setup cell finished.
print("‚úÖ Imports complete")


In [0]:

# MAGIC %md
# MAGIC ## 2. Project Setup


In [0]:

# Section banner (three lines: rule, title, rule).
print("=" * 60, "AUTOMATED RETRAINING PIPELINE", "=" * 60, sep="\n")

# Workspace folder that contains the project's `src` package and `config` dir.
project_root = "/Workspace/COMM - Commercial Analytics (CMAN)/MMM Quattro 2025/Satish/MLFLOW_sample"

# Put the project root at the front of the import path (once) so that the
# `src.*` imports below resolve against this project.
if sys.path.count(project_root) == 0:
    sys.path.insert(0, project_root)

print("\nüìÇ Project root: " + project_root)
print("‚úÖ Added to sys.path")

# Project-local modules; these must come after the sys.path change above.
from src.utils import ConfigLoader, DataLoader, MLflowLogger, safe_display
from src.data_processing import DataProcessor
from src.feature_engineering import FeatureEngineer
from src.model import ModelTrainer

print("‚úÖ Custom modules imported")
print("=" * 60)


In [0]:

# MAGIC %md
# MAGIC ## 3. Load Configuration


In [0]:

# Section banner.
print("=" * 60, "LOADING CONFIGURATION", "=" * 60, sep="\n")

# The YAML config lives under <project_root>/config and is parsed by the
# project's ConfigLoader into the `config` mapping used by every later cell.
config_path = project_root + '/config/config.yaml'
config = ConfigLoader.load_config(config_path)

# Echo the two settings that identify what this run operates on.
print("\n‚úÖ Configuration loaded")
print("  ‚Ä¢ Project: {}".format(config['project']['name']))
print("  ‚Ä¢ Model Registry: {}".format(config['mlflow']['model_registry_name']))
print("=" * 60)


In [0]:

# MAGIC %md
# MAGIC ## 4. Setup MLflow


In [0]:

# Section banner.
print("=" * 60, "SETTING UP MLFLOW", "=" * 60, sep="\n")

# Select the experiment named in the config as the active experiment.
experiment_name = config['mlflow']['experiment_name']
mlflow.set_experiment(experiment_name)

# Client used by later cells for registry queries and stage transitions.
client = MlflowClient()

print("\n‚úÖ MLflow experiment set: {}".format(experiment_name))

# Look the experiment up again to report its ID.
experiment = mlflow.get_experiment_by_name(experiment_name)
print("  ‚Ä¢ Experiment ID: {}".format(experiment.experiment_id))
print("=" * 60)


In [0]:

# MAGIC %md
# MAGIC ## 5. Check Retraining Criteria


In [0]:

# Decide whether the model should be retrained.
#
# Inputs : `config` (paths, registry name), `client` (MlflowClient), and the
#          newest `monitoring_report_*.json` found under the processed path.
# Outputs: `retraining_needed` (bool) and `retraining_reasons` (list of str),
#          which gate every later cell; also `processed_path` and `model_name`.
print("="*60)
print("CHECKING RETRAINING CRITERIA")
print("="*60)

# Retraining thresholds. The *_increase / *_decrease values are percentages
# compared against the *_change_pct fields of the monitoring report.
RETRAINING_THRESHOLDS = {
    'max_rmse_increase': 10,      # Retrain if RMSE increases by >10%
    'max_mae_increase': 10,       # Retrain if MAE increases by >10%
    'min_r2_decrease': 5,         # Retrain if R^2 decreases by >5%
    'max_drift_features': 2,      # Retrain if >2 features show drift
    'max_days_since_training': 30 # Retrain if model is >30 days old
}

print(f"\nüìã Retraining Thresholds:")
for key, value in RETRAINING_THRESHOLDS.items():
    print(f"  ‚Ä¢ {key}: {value}")

# Locate monitoring reports. The prefix is concatenated exactly as before
# (no path join) so the pattern keeps matching files written with the same
# `processed_path` + filename convention.
processed_path = config['data']['processed_path']
monitoring_files = glob.glob(f"{processed_path}monitoring_report_*.json")

retraining_needed = False
retraining_reasons = []

if monitoring_files:
    # Pick the most recently *written* report. getmtime is used rather than
    # getctime because on POSIX ctime is the last metadata change (chmod,
    # rename, ...), not the time the content was produced.
    latest_report = max(monitoring_files, key=os.path.getmtime)

    with open(latest_report, 'r') as f:
        monitoring_data = json.load(f)

    print(f"\nüìä Latest Monitoring Report: {os.path.basename(latest_report)}")

    # Performance drift. `or` fallbacks treat a missing key and a JSON null
    # the same way (no drift) instead of failing on a None comparison.
    drift_metrics = monitoring_data.get('drift_metrics') or {}
    rmse_change_pct = drift_metrics.get('rmse_change_pct') or 0
    mae_change_pct = drift_metrics.get('mae_change_pct') or 0
    r2_change_pct = drift_metrics.get('r2_change_pct') or 0

    if rmse_change_pct > RETRAINING_THRESHOLDS['max_rmse_increase']:
        retraining_needed = True
        retraining_reasons.append(f"RMSE increased by {rmse_change_pct:.2f}%")

    if mae_change_pct > RETRAINING_THRESHOLDS['max_mae_increase']:
        retraining_needed = True
        retraining_reasons.append(f"MAE increased by {mae_change_pct:.2f}%")

    # A drop in R^2 shows up as a negative change, hence the sign flip.
    if r2_change_pct < -RETRAINING_THRESHOLDS['min_r2_decrease']:
        retraining_needed = True
        retraining_reasons.append(f"R¬≤ decreased by {abs(r2_change_pct):.2f}%")

    # Feature drift: number of drifted features reported by monitoring.
    feature_drift_summary = monitoring_data.get('feature_drift_summary') or {}
    drifted_features = feature_drift_summary.get('drifted_features') or 0

    if drifted_features > RETRAINING_THRESHOLDS['max_drift_features']:
        retraining_needed = True
        retraining_reasons.append(f"{drifted_features} features showing drift")

    # Any monitoring alert at all triggers retraining.
    alert_count = monitoring_data.get('alert_count') or 0
    if alert_count > 0:
        retraining_needed = True
        retraining_reasons.append(f"{alert_count} monitoring alerts triggered")

else:
    # Without monitoring data the model's health is unknown: retrain.
    print(f"\n‚ö†Ô∏è No monitoring report found")
    retraining_needed = True
    retraining_reasons.append("No monitoring data available")

# Model age, measured from the start time of the run behind the newest
# registered version.
model_name = config['mlflow']['model_registry_name']
try:
    model_versions = client.search_model_versions(f"name='{model_name}'")
    if model_versions:
        # Do not rely on the order of the search results; select the highest
        # version number explicitly (ModelVersion.version is a string).
        latest_version = max(model_versions, key=lambda mv: int(mv.version))
        run = client.get_run(latest_version.run_id)

        # run.info.start_time is in milliseconds since the epoch.
        run_timestamp = datetime.fromtimestamp(run.info.start_time / 1000)
        days_since_training = (datetime.now() - run_timestamp).days

        print(f"\nüìÖ Model Age: {days_since_training} days")

        if days_since_training > RETRAINING_THRESHOLDS['max_days_since_training']:
            retraining_needed = True
            retraining_reasons.append(f"Model is {days_since_training} days old")
except Exception as e:
    # The age check stays best-effort (it must not abort the pipeline), but
    # the failure is reported instead of being silently discarded.
    print(f"\nWARNING: could not determine model age, skipping age check: {e}")

# Decision
print(f"\n{'='*60}")
if retraining_needed:
    print(f"üîÑ RETRAINING NEEDED")
    print(f"\nüìã Reasons:")
    for reason in retraining_reasons:
        print(f"  ‚Ä¢ {reason}")
else:
    print(f"‚úÖ RETRAINING NOT NEEDED - Model performance is stable")

print(f"{'='*60}")


In [0]:

# MAGIC %md
# MAGIC ## 6. Load and Prepare Data


In [0]:

# Only proceed if retraining is needed
# Build the train/test matrices for retraining. Nothing is loaded when the
# criteria cell decided that retraining is not needed.
if not retraining_needed:
    print("\n‚è≠Ô∏è Skipping data preparation - retraining not needed")
else:
    print("=" * 60, "LOADING AND PREPARING DATA", "=" * 60, sep="\n")

    # Raw input is a CSV whose location comes from the config.
    raw_data_path = config['data']['raw_path']
    df = pd.read_csv(raw_data_path)

    print("\n‚úÖ Data loaded: {}".format(df.shape))

    # Project helpers that own validation, encoding, scaling and feature
    # creation; both are configured from the same `config`.
    data_processor = DataProcessor(config)
    feature_engineer = FeatureEngineer(config)

    # Validation is advisory: problems are printed but do not stop the run.
    is_valid, validation_report = data_processor.validate_data(df)

    if is_valid:
        print("\n‚úÖ Data validation passed")
    else:
        print("\n‚ö†Ô∏è Data validation issues found:")
        print(validation_report)

    # Feature creation, then categorical encoding.
    print("\nüîß Creating features...")
    df_featured = feature_engineer.create_features(df)

    print("üîß Encoding categorical features...")
    df_encoded = data_processor.encode_categorical(df_featured)

    # Separate predictors from the configured target column.
    target_col = config['preprocessing']['target']
    X = df_encoded.drop(columns=[target_col])
    y = df_encoded[target_col]

    # Hold-out split driven by the configured size and seed.
    test_size = config['preprocessing']['test_size']
    random_state = config['preprocessing']['random_state']

    X_train, X_test, y_train, y_test = train_test_split(
        X,
        y,
        test_size=test_size,
        random_state=random_state,
    )

    # Scaling is delegated to the project's DataProcessor.
    print("üîß Scaling features...")
    X_train_scaled, X_test_scaled = data_processor.scale_features(X_train, X_test)

    print("\n‚úÖ Data preparation complete")
    print("  ‚Ä¢ Training set: {}".format(X_train_scaled.shape))
    print("  ‚Ä¢ Test set: {}".format(X_test_scaled.shape))

    print("=" * 60)


In [0]:

# MAGIC %md
# MAGIC ## 7. Train New Model


In [0]:

# Train every model enabled in the config and keep the one with the lowest
# test RMSE.
#
# Outputs (only when retraining is needed): `results` (per-model result
# dicts), `best_model_name`, `best_result`, and `new_model`.
if retraining_needed:
    print("="*60)
    print("TRAINING NEW MODEL")
    print("="*60)

    # Initialize model trainer
    model_trainer = ModelTrainer(config)

    # Models whose config entry has `enabled: true`.
    enabled_models = {
        name: params
        for name, params in config['models'].items()
        if params.get('enabled', False)
    }

    # Fail with an actionable message instead of the opaque
    # "min() arg is an empty sequence" raised further down.
    if not enabled_models:
        raise ValueError(
            "No models are enabled in config['models']; "
            "enable at least one model before running the retraining pipeline"
        )

    print(f"\nü§ñ Training {len(enabled_models)} models...")

    # Train all models
    results = {}

    # The loop variable is deliberately NOT called `model_name`: that global
    # holds the registry name set by the retraining-criteria cell and must
    # not be overwritten here.
    for candidate_name in enabled_models:
        print(f"\n{'='*60}")
        print(f"Training: {candidate_name.replace('_', ' ').title()}")
        print(f"{'='*60}")

        result = model_trainer.train_model(
            model_name=candidate_name,
            X_train=X_train_scaled,
            y_train=y_train,
            X_test=X_test_scaled,
            y_test=y_test
        )

        results[candidate_name] = result

        print(f"\n‚úÖ {candidate_name} complete")
        print(f"  ‚Ä¢ Test RMSE: ${result['test_metrics']['rmse']:,.2f}")
        print(f"  ‚Ä¢ Test R¬≤: {result['test_metrics']['r2']:.4f}")

    # Best model = lowest RMSE on the held-out test set.
    best_model_name = min(results.keys(), key=lambda k: results[k]['test_metrics']['rmse'])
    best_result = results[best_model_name]
    new_model = best_result['model']

    print(f"\n{'='*60}")
    print(f"üèÜ BEST NEW MODEL: {best_model_name.replace('_', ' ').title()}")
    print(f"{'='*60}")
    print(f"  ‚Ä¢ Test RMSE: ${best_result['test_metrics']['rmse']:,.2f}")
    print(f"  ‚Ä¢ Test MAE:  ${best_result['test_metrics']['mae']:,.2f}")
    print(f"  ‚Ä¢ Test R¬≤:   {best_result['test_metrics']['r2']:.4f}")
    print(f"{'='*60}")

else:
    print("\n‚è≠Ô∏è Skipping model training - retraining not needed")


In [0]:

# MAGIC %md
# MAGIC ## 8. Compare with Production Model


In [0]:

def _relative_improvement(baseline, candidate, higher_is_better=False):
    """Return how much `candidate` improves on `baseline`, in percent.

    Positive means the candidate is better. For error metrics (lower is
    better) the gain is ``baseline - candidate``; for score metrics such as
    R^2 pass ``higher_is_better=True`` so the gain is ``candidate - baseline``.

    The gain is divided by ``abs(baseline)`` so a negative baseline (possible
    for R^2) does not flip the sign of the result. A zero baseline yields
    0.0 for no change, otherwise +/-infinity, instead of dividing by zero.
    """
    gain = (candidate - baseline) if higher_is_better else (baseline - candidate)
    if baseline == 0:
        if gain == 0:
            return 0.0
        return float('inf') if gain > 0 else float('-inf')
    return (gain / abs(baseline)) * 100


# Compare the freshly trained model with the model currently in the
# "Production" stage, on the same held-out test set.
#
# Outputs (only when retraining is needed): `new_metrics`, `production_metrics`
# (None when no production model could be loaded), `should_promote`, and --
# only when a production model was evaluated -- `improvements`.
if retraining_needed:
    print("="*60)
    print("COMPARING WITH PRODUCTION MODEL")
    print("="*60)

    model_registry_name = config['mlflow']['model_registry_name']

    # Evaluate the new model first, outside any error handling, so that
    # `new_metrics` always exists for the report/summary cells -- including
    # the first-model case where there is nothing to compare against.
    y_pred_new = new_model.predict(X_test_scaled)
    new_metrics = {
        'rmse': np.sqrt(mean_squared_error(y_test, y_pred_new)),
        'mae': mean_absolute_error(y_test, y_pred_new),
        'r2': r2_score(y_test, y_pred_new)
    }

    # Only the *load* is guarded. Errors while scoring an existing production
    # model must surface rather than be mistaken for "no production model",
    # which would auto-promote the new model.
    # NOTE(review): a transient registry/network error during load still
    # takes the first-model path below -- consider checking the registry for
    # a Production version explicitly before assuming none exists.
    production_model_uri = f"models:/{model_registry_name}/Production"
    try:
        production_model = mlflow.sklearn.load_model(production_model_uri)
    except Exception as e:
        production_model = None
        print(f"\n‚ö†Ô∏è Could not load production model: {e}")
        print(f"  ‚Ä¢ Assuming this is the first model - will promote")

    if production_model is None:
        should_promote = True
        production_metrics = None
    else:
        print(f"\n‚úÖ Production model loaded")

        # Score the production model on the same test data.
        y_pred_production = production_model.predict(X_test_scaled)
        production_metrics = {
            'rmse': np.sqrt(mean_squared_error(y_test, y_pred_production)),
            'mae': mean_absolute_error(y_test, y_pred_production),
            'r2': r2_score(y_test, y_pred_production)
        }

        # Percentage improvements; positive always means "new model better".
        improvements = {
            'rmse_improvement': _relative_improvement(production_metrics['rmse'], new_metrics['rmse']),
            'mae_improvement': _relative_improvement(production_metrics['mae'], new_metrics['mae']),
            'r2_improvement': _relative_improvement(production_metrics['r2'], new_metrics['r2'], higher_is_better=True)
        }

        print(f"\nüìä Model Comparison:")
        print(f"\n  Production Model:")
        print(f"    ‚Ä¢ RMSE: ${production_metrics['rmse']:,.2f}")
        print(f"    ‚Ä¢ MAE:  ${production_metrics['mae']:,.2f}")
        print(f"    ‚Ä¢ R¬≤:   {production_metrics['r2']:.4f}")

        print(f"\n  New Model:")
        print(f"    ‚Ä¢ RMSE: ${new_metrics['rmse']:,.2f}")
        print(f"    ‚Ä¢ MAE:  ${new_metrics['mae']:,.2f}")
        print(f"    ‚Ä¢ R¬≤:   {new_metrics['r2']:.4f}")

        print(f"\n  Improvements:")
        print(f"    ‚Ä¢ RMSE: {improvements['rmse_improvement']:+.2f}%")
        print(f"    ‚Ä¢ MAE:  {improvements['mae_improvement']:+.2f}%")
        print(f"    ‚Ä¢ R¬≤:   {improvements['r2_improvement']:+.2f}%")

        # Promote when either RMSE or R^2 improves by more than the threshold.
        PROMOTION_THRESHOLD = 2  # New model must be at least 2% better

        should_promote = (
            improvements['rmse_improvement'] > PROMOTION_THRESHOLD or
            improvements['r2_improvement'] > PROMOTION_THRESHOLD
        )

        if should_promote:
            print(f"\n‚úÖ New model shows significant improvement - will be promoted")
        else:
            print(f"\n‚ö†Ô∏è New model does not show significant improvement - will not be promoted")

    print("="*60)

else:
    print("\n‚è≠Ô∏è Skipping model comparison - retraining not needed")


In [0]:

# MAGIC %md
# MAGIC ## 9. Register and Promote New Model


In [0]:

# Log the winning model to MLflow, register it under the configured registry
# name, and move the new version to the "Production" stage (archiving the
# versions that were there before).
if retraining_needed and should_promote:
    print("="*60)
    print("REGISTERING AND PROMOTING NEW MODEL")
    print("="*60)

    try:
        # Register new model
        with mlflow.start_run(
            run_name=f"retrained_{best_model_name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
        ) as active_run:
            run_id = active_run.info.run_id

            # Log metadata
            mlflow.log_param("model_type", best_model_name)
            mlflow.log_param("retraining_trigger", ", ".join(retraining_reasons))
            mlflow.log_param("auto_retrained", True)

            # Log parameters
            if best_result['params']:
                MLflowLogger.log_params_from_dict(best_result['params'])

            # Log metrics
            MLflowLogger.log_metrics_from_dict({
                f"train_{k}": v for k, v in best_result['train_metrics'].items()
            })
            MLflowLogger.log_metrics_from_dict({
                f"test_{k}": v for k, v in best_result['test_metrics'].items()
            })

            # Log improvements if production model exists
            if production_metrics:
                MLflowLogger.log_metrics_from_dict({
                    f"improvement_{k}": v for k, v in improvements.items()
                })

            # Logging with `registered_model_name` also creates the new
            # registry version for this run.
            mlflow.sklearn.log_model(
                sk_model=new_model,
                artifact_path="model",
                registered_model_name=model_registry_name
            )

        print(f"\n‚úÖ New model registered")
        print(f"  ‚Ä¢ Run ID: {run_id}")

        # Identify the version created by THIS run. Matching on run_id (and
        # taking the highest version number) avoids depending on the order of
        # the search results, which could otherwise promote a different
        # version. Filtering is done client-side so it works regardless of
        # which filter expressions the registry backend supports.
        model_versions = client.search_model_versions(f"name='{model_registry_name}'")
        versions_from_this_run = [mv for mv in model_versions if mv.run_id == run_id]
        if not versions_from_this_run:
            raise RuntimeError(
                f"No registered version of '{model_registry_name}' found for run {run_id}"
            )
        new_version = max(versions_from_this_run, key=lambda mv: int(mv.version))

        print(f"  ‚Ä¢ Version: {new_version.version}")

        # Transition to Production
        client.transition_model_version_stage(
            name=model_registry_name,
            version=new_version.version,
            stage="Production",
            archive_existing_versions=True
        )

        # Update description
        client.update_model_version(
            name=model_registry_name,
            version=new_version.version,
            description=f"Auto-retrained model. Reasons: {', '.join(retraining_reasons)}. Deployed: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
        )

        print(f"\n‚úÖ Model promoted to Production")
        print(f"  ‚Ä¢ Previous versions archived")

    except Exception as e:
        print(f"\n‚ùå Error during registration/promotion: {e}")
        traceback.print_exc()
        # Re-raise: if the failure were swallowed, the report and summary
        # cells would still state that the model was promoted, and a
        # scheduled job would finish as "succeeded".
        raise

    print("="*60)

elif retraining_needed and not should_promote:
    print("\n‚è≠Ô∏è Skipping promotion - new model not significantly better")

else:
    print("\n‚è≠Ô∏è Skipping registration - retraining not needed")

In [0]:

# MAGIC %md
# MAGIC ## 10. Generate Retraining Report

In [0]:

# Write a JSON report describing this retraining run next to the processed
# data (`<processed_path>retraining_report_<timestamp>.json`).
if retraining_needed:
    print("="*60)
    print("GENERATING RETRAINING REPORT")
    print("="*60)

    # Results of earlier cells are looked up by name so that a cell which did
    # not run (or took its first-model path) yields a None/False entry in the
    # report instead of a NameError.
    _state = globals()
    _production_metrics = _state.get('production_metrics')

    if 'new_model' in _state:
        # `new_metrics` only exists once the comparison cell has produced it;
        # otherwise fall back to the trainer's own test metrics.
        _new_metrics = _state.get('new_metrics') or best_result['test_metrics']
        _new_model_section = {
            'name': best_model_name,
            'type': type(new_model).__name__,
            'metrics': {
                'rmse': float(_new_metrics['rmse']),
                'mae': float(_new_metrics['mae']),
                'r2': float(_new_metrics['r2'])
            }
        }
    else:
        _new_model_section = None

    if _production_metrics:
        _production_section = {
            'metrics': {
                'rmse': float(_production_metrics['rmse']),
                'mae': float(_production_metrics['mae']),
                'r2': float(_production_metrics['r2'])
            }
        }
    else:
        _production_section = None

    # Improvements are only meaningful relative to a production model; tying
    # them to `_production_metrics` keeps leftover values out of the report.
    _improvements = _state.get('improvements') if _production_metrics else None
    _improvements_section = (
        {k: float(v) for k, v in _improvements.items()} if _improvements else None
    )

    if 'X_train_scaled' in _state:
        _training_section = {
            'train_size': len(X_train_scaled),
            'test_size': len(X_test_scaled),
            # Use the scaled data's columns when it has them; otherwise fall
            # back to the unscaled training frame's column names.
            'features': list(getattr(X_train_scaled, 'columns', X_train.columns))
        }
    else:
        _training_section = {'train_size': 0, 'test_size': 0, 'features': []}

    # Create retraining report
    retraining_report = {
        'timestamp': datetime.now().isoformat(),
        'retraining_needed': retraining_needed,
        'retraining_reasons': retraining_reasons,
        'model_promoted': _state.get('should_promote', False),
        'new_model': _new_model_section,
        'production_model': _production_section,
        'improvements': _improvements_section,
        'training_data': _training_section
    }

    # Save report; `default=str` stringifies anything json cannot encode.
    report_path = f"{processed_path}retraining_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
    with open(report_path, 'w') as f:
        json.dump(retraining_report, f, indent=2, default=str)

    print(f"\n‚úÖ Retraining report saved: {report_path}")

    print("="*60)

else:
    print("\n‚è≠Ô∏è Skipping report generation - retraining not needed")

In [0]:
# MAGIC %md
# MAGIC ## 11. Pipeline Summary


In [0]:

# Print a human-readable recap of the run: the retraining decision, the new
# model's metrics, the comparison with production, and suggested next steps.
print("="*60)
print("AUTOMATED RETRAINING PIPELINE SUMMARY")
print("="*60)

print(f"\nüìÖ Execution Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")

print(f"\nüîç Retraining Decision:")
print(f"  ‚Ä¢ Needed: {'Yes' if retraining_needed else 'No'}")

# Results of earlier cells are looked up by name so a cell that did not run
# (or took its first-model path) is skipped here instead of raising NameError.
_state = globals()
_promoted = _state.get('should_promote', False)

if retraining_needed:
    print(f"\nüìã Reasons:")
    for reason in retraining_reasons:
        print(f"  ‚Ä¢ {reason}")

    if 'new_model' in _state:
        # `new_metrics` only exists once the comparison cell has produced it;
        # otherwise fall back to the trainer's own test metrics.
        _summary_metrics = _state.get('new_metrics') or best_result['test_metrics']
        print(f"\nü§ñ New Model:")
        print(f"  ‚Ä¢ Type: {best_model_name.replace('_', ' ').title()}")
        print(f"  ‚Ä¢ RMSE: ${_summary_metrics['rmse']:,.2f}")
        print(f"  ‚Ä¢ MAE:  ${_summary_metrics['mae']:,.2f}")
        print(f"  ‚Ä¢ R¬≤:   {_summary_metrics['r2']:.4f}")

    # Improvements are only shown when a production model was evaluated, so
    # leftover values from an earlier comparison are never printed.
    _summary_improvements = _state.get('improvements') if _state.get('production_metrics') else None
    if _summary_improvements:
        print(f"\nüìà Improvements:")
        print(f"  ‚Ä¢ RMSE: {_summary_improvements['rmse_improvement']:+.2f}%")
        print(f"  ‚Ä¢ MAE:  {_summary_improvements['mae_improvement']:+.2f}%")
        print(f"  ‚Ä¢ R¬≤:   {_summary_improvements['r2_improvement']:+.2f}%")

    if 'should_promote' in _state:
        print(f"\nüöÄ Promotion:")
        print(f"  ‚Ä¢ Promoted: {'Yes' if _promoted else 'No'}")
        if _promoted:
            print(f"  ‚Ä¢ Status: New model is now in Production")
        else:
            print(f"  ‚Ä¢ Reason: Improvement below threshold")
else:
    print(f"\n‚úÖ Current model performance is stable")
    print(f"  ‚Ä¢ No action taken")

print(f"\nüìÅ Generated Files:")
if retraining_needed:
    print(f"  ‚Ä¢ retraining_report_*.json")
else:
    print(f"  ‚Ä¢ None")

print(f"\nüí° Next Steps:")
if retraining_needed and _promoted:
    print(f"  ‚Ä¢ Monitor new production model")
    print(f"  ‚Ä¢ Update documentation")
    print(f"  ‚Ä¢ Notify stakeholders")
elif retraining_needed and not _promoted:
    print(f"  ‚Ä¢ Investigate why new model didn't improve")
    print(f"  ‚Ä¢ Consider different algorithms or features")
    print(f"  ‚Ä¢ Review data quality")
else:
    print(f"  ‚Ä¢ Continue monitoring")
    print(f"  ‚Ä¢ Schedule next check")

print(f"\n‚úÖ Pipeline execution complete!")
print("="*60)