In [None]:
# Initial Setup
import json
import os
import smtplib
import textwrap
from datetime import datetime
from email.mime.text import MIMEText

import mlflow
import numpy as np
import pandas as pd
from apscheduler.schedulers.background import BackgroundScheduler
from scipy import stats
from sklearn.metrics import mean_absolute_error, mean_squared_error

# Initialize MLflow. The MLFLOW_TRACKING_URI environment variable, when set, takes
# precedence over the local default so the notebook can target a shared server
# without code edits.
MLFLOW_TRACKING_URI = os.environ.get("MLFLOW_TRACKING_URI", "http://localhost:5000")
EXPERIMENT_NAME = "Sales_Forecast_Monitoring"

mlflow.set_tracking_uri(MLFLOW_TRACKING_URI)
mlflow.set_experiment(EXPERIMENT_NAME)

print("MLFlow version:", mlflow.__version__)
print("MLFlow tracking URI:", mlflow.get_tracking_uri())

In [None]:
# Registry URI of the production model to monitor; point it at your own model.
MODEL_URI = "models:/sales_forecast/production"

# Load the model once up front. On failure, report the error and re-raise so the
# notebook stops here rather than continuing without a model.
try:
    model = mlflow.pyfunc.load_model(MODEL_URI)
except Exception as load_error:
    print(f"Error loading model: {load_error}")
    raise
else:
    print(f"Successfully loaded model from {MODEL_URI}")

In [None]:
def establish_baseline(model, X_train, y_train, sample_size=1000, random_state=42):
    """
    Calculate and log baseline statistics for monitoring.

    Args:
        model: Trained model exposing ``predict(X)``.
        X_train: Training features DataFrame. Columns are expected to be numeric,
            since their mean/std are converted to float.
        y_train: Training target values.
        sample_size: Maximum number of non-null values stored per feature; these
            samples are the reference distribution for later KS drift tests.
        random_state: Seed for the per-feature sampling so the baseline is
            reproducible. Pass None for a fresh random sample on every call.

    Returns:
        Tuple ``(run_id, baseline_stats)``: the MLflow run ID where the baseline
        was logged, and the statistics dict logged as ``baseline_stats.json``.
    """
    train_pred = model.predict(X_train)

    baseline_stats = {
        'performance': {
            'mae': float(mean_absolute_error(y_train, train_pred)),
            # RMSE as sqrt(MSE): the `squared=False` keyword of mean_squared_error
            # was deprecated in scikit-learn 1.4 and removed in 1.6.
            'rmse': float(np.sqrt(mean_squared_error(y_train, train_pred))),
        },
        'features': {},
        'timestamp': datetime.now().isoformat(),
    }

    # Store feature distributions (sampling to bound the artifact size).
    for col in X_train.columns:
        # Drop NaNs before sampling: a NaN in the reference sample makes the
        # later KS test return a NaN p-value, which silently reads as "no drift".
        non_null = X_train[col].dropna()
        n_samples = min(sample_size, len(non_null))
        baseline_stats['features'][col] = {
            'mean': float(X_train[col].mean()),
            'std': float(X_train[col].std()),
            'sample_values': non_null.sample(n_samples, random_state=random_state).tolist(),
        }

    with mlflow.start_run(run_name="baseline_establishment") as run:
        mlflow.log_dict(baseline_stats, "baseline_stats.json")
        mlflow.log_metrics(baseline_stats['performance'])
        mlflow.log_param("baseline_data_shape", f"{X_train.shape[0]} rows, {X_train.shape[1]} features")

    print(f"Baseline established in run {run.info.run_id}")
    return run.info.run_id, baseline_stats

# Example usage (uncomment when ready):
# baseline_run_id, baseline_stats = establish_baseline(model, X_train, y_train)

In [None]:
class SalesForecastMonitor:
    """Compare new inference data and live performance against a stored baseline.

    ``baseline_stats`` is expected in the shape produced by ``establish_baseline``:
    ``{'performance': {'mae': ...}, 'features': {col: {'mean': ..., 'sample_values': [...]}}}``.
    """

    def __init__(self, model_uri, baseline_stats):
        self.model = mlflow.pyfunc.load_model(model_uri)
        self.baseline_stats = baseline_stats

    def check_feature_drift(self, new_data, alpha=0.05):
        """Detect feature drift using a two-sample Kolmogorov-Smirnov test.

        Args:
            new_data: DataFrame of new inference data.
            alpha: Significance level; a p-value below it flags drift.

        Returns:
            Dict keyed by baseline feature name. Each value is either
            ``{'p_value', 'drift_detected', 'baseline_mean', 'current_mean'}`` or
            ``{'error': ...}`` when the feature cannot be compared. All values are
            plain Python types so the report is JSON-serializable.
        """
        drift_report = {}
        for feature, baseline in self.baseline_stats['features'].items():
            if feature not in new_data.columns:
                drift_report[feature] = {'error': 'Feature missing in new data'}
                continue

            # NaNs make the KS statistic undefined, so drop them from both samples.
            reference = np.asarray(baseline['sample_values'], dtype=float)
            reference = reference[~np.isnan(reference)]
            current = new_data[feature].dropna().to_numpy(dtype=float)
            if reference.size == 0 or current.size == 0:
                # ks_2samp raises on empty input; report instead of aborting the run.
                drift_report[feature] = {'error': 'No non-null values available for comparison'}
                continue

            _, p_value = stats.ks_2samp(reference, current)
            drift_report[feature] = {
                'p_value': float(p_value),
                # bool(): a numpy.bool_ here makes json.dump (mlflow.log_dict) fail.
                'drift_detected': bool(p_value < alpha),
                'baseline_mean': baseline['mean'],
                'current_mean': float(new_data[feature].mean()),
            }
        return drift_report

    def check_performance_drift(self, y_true, y_pred, threshold=0.2):
        """Compare current MAE with the baseline MAE.

        Args:
            y_true: Observed target values.
            y_pred: Predictions aligned with ``y_true``.
            threshold: Relative change (fraction; 0.2 means 20%) that counts as
                drift. The check uses the absolute change, so large improvements
                are flagged as well as degradations.

        Returns:
            Dict with ``metric``, ``baseline_value``, ``current_value``,
            ``percent_change`` and ``threshold_exceeded`` (plain Python types).
        """
        baseline_mae = float(self.baseline_stats['performance']['mae'])
        current_mae = float(mean_absolute_error(y_true, y_pred))

        if baseline_mae == 0:
            # A perfect baseline makes the relative change undefined; treat any
            # non-zero error as an unbounded increase instead of dividing by zero.
            relative_change = 0.0 if current_mae == 0 else float('inf')
        else:
            relative_change = (current_mae - baseline_mae) / baseline_mae

        return {
            'metric': 'MAE',
            'baseline_value': baseline_mae,
            'current_value': current_mae,
            'percent_change': relative_change * 100,
            'threshold_exceeded': bool(abs(relative_change) > threshold),
        }

In [None]:
def run_monitoring_pipeline(model_uri, new_data, actuals=None, baseline_run_id=None):
    """
    Complete monitoring pipeline for a single execution.

    Opens an MLflow run named "monitoring_run" (nested under the active run, if
    any). It logs a feature-drift report and the drifting-feature count. When
    ``actuals`` are provided, it also logs current MAE metrics, plus an "alert" tag
    if the performance threshold is exceeded.

    Args:
        model_uri: URI of the model to monitor
        new_data: DataFrame with new inference data
        actuals: Optional array of true values (if available)
        baseline_run_id: Run ID containing baseline_stats.json. When None, the
            most recently started run named "baseline_establishment" in the
            active experiment is used.

    Returns:
        Tuple of (predictions, drift_report, performance_report);
        performance_report is None when actuals is None.

    Raises:
        IndexError: if baseline_run_id is None and no baseline run exists. The
            exception type matches the previous ``.iloc[0]`` failure for
            compatibility.
    """
    if baseline_run_id is None:
        baseline_runs = mlflow.search_runs(
            filter_string="tags.mlflow.runName='baseline_establishment'",
            order_by=["attributes.start_time DESC"],
            max_results=1,
        )
        if baseline_runs.empty:
            raise IndexError(
                "No 'baseline_establishment' run found in the active experiment; "
                "run establish_baseline() first or pass baseline_run_id."
            )
        baseline_run_id = baseline_runs.iloc[0].run_id

    with mlflow.start_run(run_name="monitoring_run", nested=True):
        baseline_stats = mlflow.artifacts.load_dict(f"runs:/{baseline_run_id}/baseline_stats.json")
        monitor = SalesForecastMonitor(model_uri, baseline_stats)

        mlflow.log_param("monitoring_timestamp", datetime.now().isoformat())
        mlflow.log_param("data_shape", f"{new_data.shape[0]} rows, {new_data.shape[1]} features")

        drift_report = monitor.check_feature_drift(new_data)
        mlflow.log_dict(drift_report, "feature_drift_report.json")

        # Error entries (e.g. missing features) have no 'drift_detected' key and
        # therefore do not count as drifting.
        drifting_features = sum(
            1 for entry in drift_report.values()
            if isinstance(entry, dict) and entry.get('drift_detected', False)
        )
        mlflow.log_metric("drifting_features_count", drifting_features)

        predictions = monitor.model.predict(new_data)

        performance_report = None
        if actuals is not None:
            performance_report = monitor.check_performance_drift(actuals, predictions)
            metric_name = performance_report['metric']
            mlflow.log_metrics({
                f"current_{metric_name}": performance_report['current_value'],
                f"{metric_name}_percent_change": performance_report['percent_change'],
            })

            if performance_report['threshold_exceeded']:
                mlflow.set_tag("alert", "performance_drift_detected")

        return predictions, drift_report, performance_report

In [None]:
def send_performance_alert(run_id, recipients, smtp_host=None, smtp_port=None, sender=None):
    """Generate and send a performance alert e-mail for an MLflow monitoring run.

    The message summarises the run's logged params/metrics (monitoring timestamp,
    current MAE, MAE percent change, drifting feature count, data shape). It also
    links to the run in the MLflow UI. Sending is best-effort: SMTP failures are
    printed, not raised.

    SMTP settings are resolved in this order:
      1. the function arguments;
      2. the environment variables SMTP_HOST, SMTP_PORT and ALERT_SENDER;
      3. placeholder defaults.
    Credentials are read only from SMTP_USERNAME / SMTP_PASSWORD. Login is skipped
    when SMTP_USERNAME is unset.

    Args:
        run_id: ID of the MLflow run to report on.
        recipients: List of e-mail addresses (a single address string is also accepted).
        smtp_host: SMTP server host name.
        smtp_port: SMTP server port (STARTTLS is always used).
        sender: "From" address.
    """
    # A bare string would otherwise be joined character by character below.
    if isinstance(recipients, str):
        recipients = [recipients]
    recipients = list(recipients)
    if not recipients:
        print("No alert recipients given; alert not sent")
        return

    client = mlflow.tracking.MlflowClient()
    run = client.get_run(run_id)
    metrics = run.data.metrics
    params = run.data.params

    # dedent() strips the source-code indentation from the e-mail body.
    alert_msg = textwrap.dedent(f"""
        Sales Forecast Model Alert
        -------------------------
        Run ID: {run_id}
        Time: {params.get('monitoring_timestamp', 'N/A')}

        Performance Metrics:
        - Current MAE: {metrics.get('current_MAE', 'N/A')}
        - MAE Change: {metrics.get('MAE_percent_change', 'N/A')}%

        Feature Drift:
        - Drifting Features: {metrics.get('drifting_features_count', 0)}

        Model Info:
        - Data Processed: {params.get('data_shape', 'N/A')}

        View in MLFlow: {mlflow.get_tracking_uri()}/#/experiments/{run.info.experiment_id}/runs/{run_id}
    """).strip()

    msg = MIMEText(alert_msg)
    msg['Subject'] = "ALERT: Sales Forecast Model Drift Detected"
    msg['From'] = sender or os.environ.get("ALERT_SENDER", "ml-monitoring@yourcompany.com")
    msg['To'] = ", ".join(recipients)

    host = smtp_host or os.environ.get("SMTP_HOST", "your.smtp.server")
    username = os.environ.get("SMTP_USERNAME")
    password = os.environ.get("SMTP_PASSWORD", "")

    try:
        port = int(smtp_port or os.environ.get("SMTP_PORT", 587))
        # Timeout so an unreachable server cannot block the caller indefinitely.
        with smtplib.SMTP(host, port, timeout=30) as server:
            server.starttls()
            if username:
                server.login(username, password)
            server.send_message(msg)
        print("Alert sent successfully")
    except Exception as e:
        print(f"Failed to send alert: {str(e)}")

In [None]:
def setup_scheduled_monitoring(model_uri, data_loader_func, recipients):
    """Set up a daily monitoring job (cron trigger, hour=2) on a background scheduler.

    Args:
        model_uri: URI of the model to monitor.
        data_loader_func: Zero-argument callable returning ``(new_data, actuals)``;
            ``actuals`` may be None.
        recipients: E-mail addresses passed to ``send_performance_alert``.

    Returns:
        The started ``BackgroundScheduler``.
    """

    def latest_child_run_id(parent_run):
        """Return the newest run nested under ``parent_run``, or its own ID if none."""
        children = mlflow.search_runs(
            experiment_ids=[parent_run.info.experiment_id],
            filter_string=f"tags.mlflow.parentRunId = '{parent_run.info.run_id}'",
            order_by=["attributes.start_time DESC"],
            max_results=1,
        )
        return parent_run.info.run_id if children.empty else children.iloc[0].run_id

    def monitoring_job():
        print(f"Running scheduled monitoring at {datetime.now()}")
        try:
            new_data, actuals = data_loader_func()
            with mlflow.start_run(run_name="scheduled_monitoring") as run:
                _, _, performance_report = run_monitoring_pipeline(
                    model_uri=model_uri,
                    new_data=new_data,
                    actuals=actuals
                )

                if performance_report and performance_report['threshold_exceeded']:
                    # The pipeline logs its metrics/params to a nested child run,
                    # not to `run`; alert on the child so the e-mail has real values.
                    send_performance_alert(latest_child_run_id(run), recipients)

        except Exception as e:
            # Report and swallow so a failed run does not raise out of the job.
            print(f"Monitoring job failed: {str(e)}")

    # Set up scheduler (runs daily at 2AM)
    scheduler = BackgroundScheduler()
    scheduler.add_job(monitoring_job, 'cron', hour=2)
    scheduler.start()
    print("Scheduled monitoring initialized - will run daily at 2AM")

    return scheduler

# Example usage:
# def load_monitoring_data():
#     new_data = pd.read_csv("new_sales_data.csv")
#     actuals = pd.read_csv("actuals.csv")['sales'] if os.path.exists("actuals.csv") else None
#     return new_data, actuals
# 
# scheduler = setup_scheduled_monitoring(
#     model_uri=MODEL_URI,
#     data_loader_func=load_monitoring_data,
#     recipients=["data-team@yourcompany.com"]
# )

In [None]:
# Example end-to-end usage (adapt to your specific case)

# 1. Load your training data to establish baseline
# X_train, y_train = load_training_data()
# baseline_run_id, baseline_stats = establish_baseline(model, X_train, y_train)

# 2. Load new data for monitoring
# new_data = pd.read_csv("new_sales_data.csv")
# actuals = pd.read_csv("actuals.csv")['sales']  # Optional if available

# 3. Run monitoring
# predictions, drift_report, perf_report = run_monitoring_pipeline(
#     model_uri=MODEL_URI,
#     new_data=new_data,
#     actuals=actuals,
#     baseline_run_id=baseline_run_id
# )

# 4. View results
# print("Drift Report:", json.dumps(drift_report, indent=2))
# print("Performance Report:", json.dumps(perf_report, indent=2))