# MLOps & Production Deployment for Deep Learning
## From Notebook to Production: The Complete Guide

**Skill Level:** Advanced | Architect

This notebook covers everything needed to take a deep learning model from experimental notebook to production deployment, including experiment tracking, model optimization, serving infrastructure, monitoring, and CI/CD pipelines.

### Table of Contents
1. [Introduction to MLOps](#1)
2. [Experiment Configuration & Reproducibility](#2)
3. [Production Data Pipelines](#3)
4. [Model Versioning & Registry](#4)
5. [Model Optimization for Deployment](#5)
6. [Serving Architecture](#6)
7. [Monitoring & Drift Detection](#7)
8. [Automated Model Testing](#8)
9. [CI/CD for ML](#9)
10. [Distributed Training & Scaling](#10)
11. [End-to-End Production Pipeline](#11)
12. [Architect-Level Exercises](#12)

<a name="1"></a>
## 1. Introduction to MLOps

### What is MLOps?

MLOps (Machine Learning Operations) is the practice of applying DevOps principles to ML systems. It bridges the gap between ML development and production operations.

### The ML Lifecycle

```
┌──────────┐     ┌──────────┐     ┌──────────┐     ┌────────┐     ┌─────────┐
│  Data    │────→│  Train   │────→│ Evaluate │────→│ Deploy │────→│ Monitor │
│ Pipeline │     │  Model   │     │ & Test   │     │& Serve │     │& Alert  │
└──────────┘     └──────────┘     └──────────┘     └────────┘     └─────────┘
     ↑                                                                 │
     └──────────────────────── Retrain Trigger ────────────────────────┘
```

### MLOps Maturity Levels (Google)

| Level | Description | Characteristics |
|-------|-------------|-----------------|
| **0** | Manual process | No automation, notebook-driven, no monitoring |
| **1** | ML pipeline automation | Automated training pipeline (continuous training), basic CI/CD, manual pipeline deployment |
| **2** | CI/CD pipeline automation | Automated training & deployment, continuous monitoring, auto-retraining |

### Key Principles
- **Reproducibility**: Same data + code + config + seed should yield the same model (some GPU kernels can still introduce small run-to-run differences)
- **Automation**: Minimize manual steps in the ML lifecycle
- **Monitoring**: Track model performance in production continuously
- **Versioning**: Version data, code, models, and configs together
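
To make "version data, code, models, and configs together" concrete, one option is to derive a single run ID from all three inputs. The sketch below is illustrative only: the `fingerprint` helper, the 12-character ID length, and the sample values are assumptions, not part of any library.

```python
import hashlib
import json

def fingerprint(config: dict, data_bytes: bytes, code_version: str) -> str:
    """Return a short, deterministic ID for a (config, data, code) triple."""
    h = hashlib.sha256()
    # sort_keys makes the hash independent of dict insertion order
    h.update(json.dumps(config, sort_keys=True).encode("utf-8"))
    # Hash the data separately so large files can be digested in advance
    h.update(hashlib.sha256(data_bytes).digest())
    h.update(code_version.encode("utf-8"))
    return h.hexdigest()[:12]

# Hypothetical inputs: a config dict, raw dataset bytes, and a git commit hash
run_a = fingerprint({"lr": 1e-3, "seed": 42}, b"train-data-v1", "abc123")
run_b = fingerprint({"seed": 42, "lr": 1e-3}, b"train-data-v1", "abc123")
run_c = fingerprint({"lr": 3e-4, "seed": 42}, b"train-data-v1", "abc123")

print(run_a == run_b)  # True: same inputs, different key order
print(run_a == run_c)  # False: the learning rate changed
```

Storing this ID next to the saved model gives a quick way to check whether two artifacts came from identical inputs.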

In [None]:
import tensorflow as tf
from tensorflow import keras
import numpy as np
import json, os, time, tempfile, shutil
from dataclasses import dataclass, asdict, field
from typing import List, Dict, Optional
from datetime import datetime

print(f"TensorFlow version: {tf.__version__}")
print(f"GPU available: {len(tf.config.list_physical_devices('GPU')) > 0}")

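# Set seeds for reproducibility
def set_seeds(seed=42):
    """Seed the Python, NumPy, and TensorFlow random number generators."""
    import random
    random.seed(seed)
    np.random.seed(seed)
    tf.random.set_seed(seed)
    # Only affects subprocesses started after this point; to fix hashing in
    # this process, PYTHONHASHSEED must be set before the interpreter starts.
    os.environ['PYTHONHASHSEED'] = str(seed)

set_seeds(42)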

<a name="2"></a>
## 2. Experiment Configuration & Reproducibility

### Why Configuration Management Matters
- Reproduce any experiment exactly
- Compare experiments systematically
- Track what changed between versions
- Enable automated hyperparameter search

In [None]:
@dataclass
class ExperimentConfig:
    """Complete experiment configuration for reproducibility."""

    # Model
    model_name: str = "efficientnet_b0"
    num_classes: int = 2
    image_size: int = 224
    dropout_rate: float = 0.2

    # Training
    learning_rate: float = 1e-3
    batch_size: int = 32
    epochs: int = 50
    optimizer: str = "adamw"
    weight_decay: float = 1e-4

    # Data
    augmentation: bool = True
    validation_split: float = 0.2
    shuffle_buffer: int = 1000

    # Infrastructure
    seed: int = 42
    mixed_precision: bool = False
    xla_compile: bool = False

    # Metadata
    experiment_name: str = "default"
    timestamp: str = field(default_factory=lambda: datetime.now().isoformat())

    def to_dict(self):
        return asdict(self)

    def save(self, path):
        with open(path, 'w') as f:
            json.dump(self.to_dict(), f, indent=2)
        print(f"Config saved to {path}")

    @classmethod
    def load(cls, path):
        with open(path) as f:
            data = json.load(f)
        return cls(**{k: v for k, v in data.items() if k in cls.__dataclass_fields__})

    def diff(self, other):
        """Show differences between two configs."""
        diffs = {}
        for key in self.__dataclass_fields__:
            v1, v2 = getattr(self, key), getattr(other, key)
            if v1 != v2:
                diffs[key] = {'current': v1, 'other': v2}
        return diffs

# Demo
config_v1 = ExperimentConfig(experiment_name="baseline")
config_v2 = ExperimentConfig(
    experiment_name="improved",
    learning_rate=3e-4,
    batch_size=64,
    augmentation=True,
    dropout_rate=0.3,
    optimizer="adamw"
)

print("Config V1:", json.dumps(config_v1.to_dict(), indent=2)[:500])
print("\n" + "="*50)
print("\nDifferences (V1 vs V2):")
for k, v in config_v1.diff(config_v2).items():
    print(f"  {k}: {v['current']} → {v['other']}")

# Save & reload test
tmp_path = os.path.join(tempfile.gettempdir(), 'test_config.json')  # portable temp location
config_v1.save(tmp_path)
config_loaded = ExperimentConfig.load(tmp_path)
print(f"\nConfig reload test: {'PASS' if config_loaded.to_dict() == config_v1.to_dict() else 'FAIL'}")

<a name="3"></a>
## 3. Production Data Pipelines

### Design Principles
1. **Deterministic**: Same seed → same data order (for reproducibility)
2. **Efficient**: Prefetch, cache, parallelize
3. **Robust**: Handle corrupted images, missing data
4. **Monitored**: Track data statistics for drift detection
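
As a rough illustration of the "Robust" principle, corrupted or mislabeled files can be screened out before they reach the training pipeline. The sketch below checks only the file signature (the JPEG and PNG magic bytes), so it catches empty or non-image files but does not prove that a file decodes cleanly. The helper names and sample data are hypothetical.

```python
JPEG_MAGIC = b"\xff\xd8\xff"
PNG_MAGIC = b"\x89PNG\r\n\x1a\n"

def looks_like_image(data: bytes) -> bool:
    """Cheap header check; it does not prove the file decodes cleanly."""
    return data.startswith(JPEG_MAGIC) or data.startswith(PNG_MAGIC)

def filter_valid(samples):
    """Split (name, bytes, label) samples into kept and rejected lists."""
    kept, rejected = [], []
    for name, data, label in samples:
        (kept if looks_like_image(data) else rejected).append((name, label))
    return kept, rejected

# Hypothetical in-memory samples standing in for files read from disk
samples = [
    ("cat.jpg", JPEG_MAGIC + b"\x00" * 16, 0),
    ("dog.png", PNG_MAGIC + b"\x00" * 16, 1),
    ("broken.jpg", b"", 0),              # empty file
    ("notes.jpg", b"not an image", 1),   # wrong content
]
kept, rejected = filter_valid(samples)
print(f"kept={len(kept)} rejected={[name for name, _ in rejected]}")
```

Logging the rejected names also feeds the "Monitored" principle: a sudden rise in rejections is itself a data-quality signal.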

In [None]:
class ProductionDataPipeline:
    """Production-grade tf.data pipeline with monitoring."""

    def __init__(self, config: ExperimentConfig):
        self.config = config
        self.data_stats = {}
        self.AUTOTUNE = tf.data.AUTOTUNE

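    def _parse_image(self, file_path, label):
        """Read a JPEG file, resize it, and scale pixels to [0, 1].

        Decoding errors propagate to the caller; to skip corrupted files,
        apply `dataset.ignore_errors()` (recent TF versions) after this map.
        """
        img = tf.io.read_file(file_path)
        img = tf.image.decode_jpeg(img, channels=3)
        img = tf.image.resize(img, [self.config.image_size, self.config.image_size])
        img = tf.cast(img, tf.float32) / 255.0
        return img, label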

    def _augment(self, image, label):
        """Data augmentation pipeline."""
        image = tf.image.random_flip_left_right(image)
        image = tf.image.random_brightness(image, 0.2)
        image = tf.image.random_contrast(image, 0.8, 1.2)
        image = tf.image.random_saturation(image, 0.8, 1.2)
        image = tf.clip_by_value(image, 0.0, 1.0)
        return image, label

    def build(self, images, labels, is_training=True):
        """Build optimized tf.data pipeline."""
        dataset = tf.data.Dataset.from_tensor_slices((images, labels))

        if is_training:
            dataset = dataset.shuffle(
                buffer_size=self.config.shuffle_buffer,
                seed=self.config.seed,
                reshuffle_each_iteration=True
            )

        dataset = dataset.batch(self.config.batch_size)

        if is_training and self.config.augmentation:
            dataset = dataset.map(
                lambda x, y: (tf.map_fn(lambda img: self._augment(img, tf.constant(0))[0], x), y),
                num_parallel_calls=self.AUTOTUNE
            )

        dataset = dataset.prefetch(self.AUTOTUNE)
        return dataset

    def compute_stats(self, dataset, name="dataset"):
        """Compute and store dataset statistics for drift detection."""
        all_means, all_stds = [], []
        for batch, _ in dataset.take(10):
            all_means.append(tf.reduce_mean(batch, axis=[0, 1, 2]).numpy())
            all_stds.append(tf.math.reduce_std(batch, axis=[0, 1, 2]).numpy())

        stats = {
            'mean': np.mean(all_means, axis=0).tolist(),
            'std': np.mean(all_stds, axis=0).tolist(),
            'num_batches_sampled': len(all_means)
        }
        self.data_stats[name] = stats
        return stats

# Demo with synthetic data
config = ExperimentConfig()
pipeline = ProductionDataPipeline(config)

# Create dummy data
dummy_images = np.random.rand(100, 224, 224, 3).astype(np.float32)
dummy_labels = np.random.randint(0, 2, 100)

train_ds = pipeline.build(dummy_images[:80], dummy_labels[:80], is_training=True)
val_ds = pipeline.build(dummy_images[80:], dummy_labels[80:], is_training=False)

stats = pipeline.compute_stats(train_ds, "training")
print(f"Training data statistics:")
print(f"  Channel means: [{stats['mean'][0]:.4f}, {stats['mean'][1]:.4f}, {stats['mean'][2]:.4f}]")
print(f"  Channel stds:  [{stats['std'][0]:.4f}, {stats['std'][1]:.4f}, {stats['std'][2]:.4f}]")

for batch_x, batch_y in train_ds.take(1):
    print(f"\nBatch shape: {batch_x.shape}")
    print(f"Label shape: {batch_y.shape}")

<a name="4"></a>
## 4. Model Versioning & Registry

### Why Version Models?
- Track which model is in production
- Roll back to previous versions if new model degrades
- Compare model versions objectively
- Audit trail for compliance
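
Tracking the production version and rolling back can be as simple as a small pointer file that records the live version and its predecessors. The sketch below is one possible design, not a standard tool; the `ProductionPointer` class and the `production.json` file name are assumptions for illustration.

```python
import json
import os
import tempfile

class ProductionPointer:
    """Tracks the live model version plus its history in a small JSON file."""

    def __init__(self, path):
        self.path = path
        if not os.path.exists(path):
            self._write({"current": None, "history": []})

    def _read(self):
        with open(self.path) as f:
            return json.load(f)

    def _write(self, state):
        with open(self.path, "w") as f:
            json.dump(state, f, indent=2)

    def promote(self, version):
        """Make `version` live and remember the previous one."""
        state = self._read()
        if state["current"] is not None:
            state["history"].append(state["current"])
        state["current"] = version
        self._write(state)

    def rollback(self):
        """Restore the most recent previous version."""
        state = self._read()
        if not state["history"]:
            raise RuntimeError("No previous version to roll back to")
        state["current"] = state["history"].pop()
        self._write(state)
        return state["current"]

    @property
    def current(self):
        return self._read()["current"]

pointer = ProductionPointer(os.path.join(tempfile.mkdtemp(), "production.json"))
pointer.promote("v1")
pointer.promote("v2")
pointer.rollback()
print(pointer.current)  # v1
```

Because the history lives in the file, the same record doubles as a minimal audit trail of what was promoted and in which order.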

In [None]:
class ModelRegistry:
    """Simple model registry for versioning and managing models."""

    def __init__(self, base_dir='model_registry'):
        self.base_dir = base_dir
        os.makedirs(base_dir, exist_ok=True)

    def save_model(self, model, config, metrics, version=None):
        """Save model with config and metrics."""
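        if version is None:
            # Use the highest existing number + 1 so gaps (e.g. a deleted v2)
            # never produce a name that overwrites an existing version
            nums = [int(d[1:]) for d in os.listdir(self.base_dir)
                    if d.startswith('v') and d[1:].isdigit()]
            version = f"v{max(nums, default=0) + 1}"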

        version_dir = os.path.join(self.base_dir, version)
        os.makedirs(version_dir, exist_ok=True)

        # Save model weights (faster than full SavedModel for registry).
        # Note: Keras 3 only accepts weight filenames ending in '.weights.h5'.
        model.save_weights(os.path.join(version_dir, 'model_weights.h5'))

        # Save model architecture as JSON
        model_json = model.to_json()
        with open(os.path.join(version_dir, 'architecture.json'), 'w') as f:
            f.write(model_json)

        # Save config
        config.save(os.path.join(version_dir, 'config.json'))

        # Save metrics
        with open(os.path.join(version_dir, 'metrics.json'), 'w') as f:
            json.dump(metrics, f, indent=2)

        # Save model summary
        summary_lines = []
        model.summary(print_fn=lambda x: summary_lines.append(x))
        with open(os.path.join(version_dir, 'summary.txt'), 'w') as f:
            f.write('\n'.join(summary_lines))

        print(f"Model saved: {version_dir}")
        return version

    def load_model(self, version):
        """Load a specific model version."""
        version_dir = os.path.join(self.base_dir, version)

        with open(os.path.join(version_dir, 'architecture.json')) as f:
            model = keras.models.model_from_json(f.read())
        model.load_weights(os.path.join(version_dir, 'model_weights.h5'))

        with open(os.path.join(version_dir, 'metrics.json')) as f:
            metrics = json.load(f)

        config = ExperimentConfig.load(os.path.join(version_dir, 'config.json'))
        return model, config, metrics

    def list_versions(self):
        """List all model versions with their metrics."""
        versions = []
        for v in sorted(os.listdir(self.base_dir)):
            metrics_path = os.path.join(self.base_dir, v, 'metrics.json')
            if os.path.exists(metrics_path):
                with open(metrics_path) as f:
                    metrics = json.load(f)
                versions.append({'version': v, **metrics})
        return versions

    def get_best_model(self, metric='val_accuracy', higher_is_better=True):
        """Get the best model version by a specific metric."""
        # Skip versions that never logged this metric; defaulting them to 0
        # would make them "win" whenever lower is better (e.g. val_loss)
        versions = [v for v in self.list_versions() if metric in v]
        if not versions:
            return None
        pick = max if higher_is_better else min
        return pick(versions, key=lambda v: v[metric])['version']

# Demo
registry = ModelRegistry('/tmp/model_registry')

# Create and save a simple model
model = keras.Sequential([
    keras.layers.Dense(64, activation='relu', input_shape=(224*224*3,)),
    keras.layers.Dropout(0.2),
    keras.layers.Dense(2, activation='softmax')
])
model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])

config = ExperimentConfig(experiment_name="baseline_v1", learning_rate=1e-3)
metrics_v1 = {'val_accuracy': 0.85, 'val_loss': 0.42, 'train_accuracy': 0.92}
registry.save_model(model, config, metrics_v1, version='v1')

config_v2 = ExperimentConfig(experiment_name="improved_v2", learning_rate=3e-4, dropout_rate=0.3)
metrics_v2 = {'val_accuracy': 0.91, 'val_loss': 0.28, 'train_accuracy': 0.94}
registry.save_model(model, config_v2, metrics_v2, version='v2')

# List and compare
print("\nAll model versions:")
for v in registry.list_versions():
    print(f"  {v['version']}: val_acc={v['val_accuracy']:.2f}, val_loss={v['val_loss']:.2f}")

best = registry.get_best_model('val_accuracy')
print(f"\nBest model: {best}")

<a name="5"></a>
## 5. Model Optimization for Deployment

### Why Optimize?
- **Reduce latency**: Faster inference for real-time applications
- **Reduce model size**: Smaller models for mobile/edge deployment
- **Reduce cost**: Less compute = lower cloud bills

### Optimization Techniques

| Technique | Size Reduction | Speed Improvement | Accuracy Impact |
|-----------|---------------|-------------------|-----------------|
| **Dynamic Quantization** | 2-4x | 2-3x | Minimal (<1%) |
| **Float16 Quantization** | 2x | 1.5-2x | Minimal |
| **Int8 Quantization** | 4x | 2-4x | Small (1-3%) |
| **Pruning** | 2-10x | Variable | Small-Medium |
| **Knowledge Distillation** | Variable | Variable | Small |
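
The 4x size reduction for int8 follows from storing each weight in 1 byte instead of 4. The arithmetic behind it can be sketched in plain Python using affine quantization (a scale and a zero point). This is a simplified illustration under assumed per-tensor min/max calibration, not the exact scheme the TF Lite converter applies.

```python
def quant_params(x_min, x_max, q_min=-128, q_max=127):
    """Scale and zero point mapping [x_min, x_max] onto [q_min, q_max]."""
    scale = (x_max - x_min) / (q_max - q_min)
    zero_point = round(q_min - x_min / scale)
    return scale, zero_point

def quantize(x, scale, zero_point, q_min=-128, q_max=127):
    """Map a float to an int8 code, clamping to the representable range."""
    q = round(x / scale) + zero_point
    return max(q_min, min(q_max, q))

def dequantize(q, scale, zero_point):
    """Map an int8 code back to an approximate float."""
    return (q - zero_point) * scale

# Hypothetical weights; real calibration uses the tensor's observed range
weights = [-1.0, -0.25, 0.0, 0.4, 1.5]
scale, zero_point = quant_params(min(weights), max(weights))

for w in weights:
    q = quantize(w, scale, zero_point)
    w_hat = dequantize(q, scale, zero_point)
    print(f"{w:+.4f} -> q={q:+4d} -> {w_hat:+.4f} (error {abs(w - w_hat):.5f})")

print(f"float32: {4 * len(weights)} bytes, int8: {len(weights)} bytes")
```

For values inside the calibrated range, the round-trip error stays within half a quantization step (`scale / 2`), which is why the accuracy impact is usually small.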

In [None]:
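def convert_to_tflite(model, optimization='none', representative_data=None):
    """Convert a Keras model to TF Lite with an optional quantization mode.

    optimization: 'none', 'dynamic', 'float16', or 'int8'.
    representative_data: generator function yielding sample inputs ('int8' only).
    """
    if optimization not in ('none', 'dynamic', 'float16', 'int8'):
        raise ValueError(f"Unknown optimization: {optimization!r}")
    if optimization == 'int8' and representative_data is None:
        # Previously this case silently fell back to an unoptimized model
        raise ValueError("int8 quantization requires representative_data")

    # Save model to a temp dir first; the converter reads it from disk.
    # Note: under Keras 3, model.save() needs a .keras/.h5 suffix, so use
    # model.export(path) there to write a SavedModel.
    tmp_dir = tempfile.mkdtemp()
    try:
        saved_model_dir = os.path.join(tmp_dir, 'saved_model')
        model.save(saved_model_dir)
        converter = tf.lite.TFLiteConverter.from_saved_model(saved_model_dir)

        if optimization != 'none':
            converter.optimizations = [tf.lite.Optimize.DEFAULT]
        if optimization == 'float16':
            converter.target_spec.supported_types = [tf.float16]
        elif optimization == 'int8':
            converter.representative_dataset = representative_data

        return converter.convert()
    finally:
        # Always remove the temp dir, even if conversion fails
        shutil.rmtree(tmp_dir, ignore_errors=True)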


def benchmark_model(model_or_tflite, test_input, num_runs=50, is_tflite=False):
    """Benchmark single-sample inference latency and report model size."""

    if is_tflite:
        interpreter = tf.lite.Interpreter(model_content=model_or_tflite)
        interpreter.allocate_tensors()
        input_details = interpreter.get_input_details()

        # Fix the input to a single sample and re-allocate buffers
        sample = test_input[:1].astype(np.float32)
        interpreter.resize_tensor_input(input_details[0]['index'], sample.shape)
        interpreter.allocate_tensors()

        # Warmup run (excluded from timing)
        interpreter.set_tensor(input_details[0]['index'], sample)
        interpreter.invoke()

        times = []
        for _ in range(num_runs):
            interpreter.set_tensor(input_details[0]['index'], sample)
            start = time.perf_counter()  # monotonic, high-resolution timer
            interpreter.invoke()
            times.append((time.perf_counter() - start) * 1000)

        size_mb = len(model_or_tflite) / (1024 * 1024)
    else:
        # Keras model
        sample = test_input[:1]
        _ = model_or_tflite(sample, training=False)  # Warmup

        times = []
        for _ in range(num_runs):
            start = time.perf_counter()
            _ = model_or_tflite(sample, training=False)
            times.append((time.perf_counter() - start) * 1000)

        # Estimate size, assuming 4 bytes (float32) per parameter
        size_mb = model_or_tflite.count_params() * 4 / (1024 * 1024)

    return {
        'mean_latency_ms': np.mean(times),
        'p50_latency_ms': np.percentile(times, 50),
        'p95_latency_ms': np.percentile(times, 95),
        'p99_latency_ms': np.percentile(times, 99),
        'model_size_mb': size_mb
    }

# Build a proper model for benchmarking
model = keras.Sequential([
    keras.layers.Flatten(input_shape=(32, 32, 3)),
    keras.layers.Dense(256, activation='relu'),
    keras.layers.Dense(128, activation='relu'),
    keras.layers.Dense(10, activation='softmax')
], name='benchmark_model')
model.compile(optimizer='adam', loss='sparse_categorical_crossentropy')

test_input = np.random.rand(10, 32, 32, 3).astype(np.float32)

# Benchmark original
print("Benchmarking original Keras model...")
original_results = benchmark_model(model, test_input, is_tflite=False)

# Convert and benchmark TFLite variants
results = {'Original (float32)': original_results}

for opt_name in ['none', 'dynamic', 'float16']:
    print(f"Converting to TFLite ({opt_name})...")
    try:
        tflite_model = convert_to_tflite(model, optimization=opt_name)
        bench = benchmark_model(tflite_model, test_input.reshape(10, 32, 32, 3), is_tflite=True)
        results[f'TFLite ({opt_name})'] = bench
    except Exception as e:
        print(f"  Error: {e}")

# Display results
print(f"\n{'Model':<25} {'Size (MB)':<12} {'Mean (ms)':<12} {'P95 (ms)':<12} {'P99 (ms)':<12}")
print("-" * 75)
for name, r in results.items():
    print(f"{name:<25} {r['model_size_mb']:<12.2f} {r['mean_latency_ms']:<12.2f} {r['p95_latency_ms']:<12.2f} {r['p99_latency_ms']:<12.2f}")

<a name="6"></a>
## 6. Serving Architecture

### Serving Options

| Method | Latency | Throughput | Complexity | Best For |
|--------|---------|-----------|------------|----------|
| **TF Serving** | Low | High | Medium | Production REST/gRPC |
| **TF Lite** | Very Low | Medium | Low | Mobile/Edge |
| **ONNX Runtime** | Low | High | Medium | Cross-framework |
| **Triton** | Low | Very High | High | Multi-model, multi-GPU |
| **Flask/FastAPI** | Medium | Low | Low | Prototyping |

### TF Serving Architecture
```
Client → Load Balancer → TF Serving Container(s) → Model Registry
                              ↓
                     ┌────────────────┐
                     │  REST (8501)   │
                     │  gRPC (8500)   │
                     │                │
                     │  Model Manager │
                     │  ├── v1 (warm) │
                     │  ├── v2 (warm) │
                     │  └── v3 (live) │
                     └────────────────┘
```
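
With several versions loaded, a client or gateway can pin a request to one version through the `/versions/<n>` segment of the TF Serving REST path. The sketch below shows one way to split traffic deterministically by hashing a request ID; the `pick_version` helper, the 10% candidate share, and the `user-<n>` IDs are assumptions for illustration.

```python
import hashlib

def pick_version(request_id: str, candidate=3, stable=2, candidate_share=0.1):
    """Deterministically route a fixed share of request IDs to the candidate."""
    digest = hashlib.sha256(request_id.encode("utf-8")).digest()
    bucket = int.from_bytes(digest[:4], "big") / 2**32  # roughly uniform in [0, 1)
    return candidate if bucket < candidate_share else stable

def predict_url(server_url, model_name, version):
    """TF Serving REST path that pins a specific model version."""
    return f"{server_url}/v1/models/{model_name}/versions/{version}:predict"

# Simulate routing for 10,000 hypothetical users
counts = {2: 0, 3: 0}
for i in range(10_000):
    counts[pick_version(f"user-{i}")] += 1
print(counts)

version = pick_version("user-42")
print(predict_url("http://localhost:8501", "pet_classifier", version))
```

Hashing the ID rather than drawing a random number keeps each user on the same version across requests, which makes per-version metrics easier to compare.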

In [None]:
# TF Serving deployment code examples

# 1. Export model for TF Serving
def export_for_serving(model, export_path, version=1):
    """Export model in SavedModel format for TF Serving."""
    versioned_path = os.path.join(export_path, str(version))
    model.save(versioned_path)
    print(f"Model exported to: {versioned_path}")
    return versioned_path

# 2. Docker deployment command
# Raw string: keeps the trailing backslashes (shell line continuations) intact
tf_serving_docker = r'''
# Pull TF Serving image
docker pull tensorflow/serving:latest

# Run TF Serving
docker run -d --name tf_serving \
  -p 8501:8501 \
  -p 8500:8500 \
  --mount type=bind,source=/path/to/models/pet_classifier,target=/models/pet_classifier \
  -e MODEL_NAME=pet_classifier \
  tensorflow/serving:latest
'''

# 3. Client code for REST API
def predict_rest_api(images, model_name="pet_classifier", server_url="http://localhost:8501"):
    """Send prediction request to TF Serving REST API."""
    import requests

    url = f"{server_url}/v1/models/{model_name}:predict"
    payload = {
        "signature_name": "serving_default",
        "instances": images.tolist()
    }

    response = requests.post(url, json=payload, timeout=10)
    response.raise_for_status()
    return response.json()['predictions']

# 4. Client code for gRPC API
grpc_client_code = '''
import grpc
import tensorflow as tf
from tensorflow_serving.apis import predict_pb2, prediction_service_pb2_grpc

channel = grpc.insecure_channel('localhost:8500')
stub = prediction_service_pb2_grpc.PredictionServiceStub(channel)

request = predict_pb2.PredictRequest()
request.model_spec.name = 'pet_classifier'
request.model_spec.signature_name = 'serving_default'
request.inputs['input_1'].CopyFrom(
    tf.make_tensor_proto(images, shape=images.shape)
)

response = stub.Predict(request, timeout=10.0)
predictions = tf.make_ndarray(response.outputs['dense_1'])
'''

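# 5. A/B Testing configuration: serve versions 2 and 3 side by side.
# TF Serving's model config file uses protobuf text format, not JSON.
# TF Serving does not split traffic itself; route requests to
# /versions/2 or /versions/3 from the client or load balancer.
ab_config = '''
model_config_list {
  config {
    name: "pet_classifier"
    base_path: "/models/pet_classifier"
    model_platform: "tensorflow"
    model_version_policy {
      specific {
        versions: 2
        versions: 3
      }
    }
  }
}
'''

print("=== TF Serving Deployment Guide ===")
print("\n1. Docker command:")
print(tf_serving_docker)
print("\n2. A/B Testing Config (model.config):")
print(ab_config)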

print("\n3. Health check: curl http://localhost:8501/v1/models/pet_classifier")
print("4. Predict:      curl -d '{\"instances\": [...]}' http://localhost:8501/v1/models/pet_classifier:predict")

<a name="7"></a>
## 7. Monitoring & Drift Detection

### What to Monitor

| Metric | Type | Alert Threshold | Action |
|--------|------|-----------------|--------|
| Prediction latency | Performance | P95 > 100ms | Scale up / optimize |
| Error rate | Reliability | > 1% | Investigate / rollback |
| Prediction distribution | Data drift | KL divergence > 0.1 | Investigate data |
| Feature statistics | Data drift | > 3σ from baseline | Retrain trigger |
| Confidence score | Model health | Mean < 0.7 | Retrain / check data |

### Types of Drift
- **Data Drift**: Input distribution changes (e.g., new camera, different lighting)
- **Concept Drift**: Relationship between features and labels changes
- **Prediction Drift**: Output distribution shifts
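
The table above lists KL divergence as the alert signal for prediction distribution drift. A minimal sketch of that check, assuming discrete class distributions and a small smoothing constant `eps` to avoid division by zero (both the helper and the sample distributions are illustrative):

```python
import math

def kl_divergence(p, q, eps=1e-9):
    """KL(p || q) in nats for two discrete distributions over the same classes."""
    # Smooth and renormalize so empty classes do not produce log(0)
    p = [x + eps for x in p]
    q = [x + eps for x in q]
    p_sum, q_sum = sum(p), sum(q)
    return sum(
        (a / p_sum) * math.log((a / p_sum) / (b / q_sum))
        for a, b in zip(p, q)
    )

baseline = [0.5, 0.5]       # class shares seen at deployment time
mild_shift = [0.3, 0.7]     # hypothetical recent window
strong_shift = [0.1, 0.9]   # hypothetical recent window

for name, recent in [("mild", mild_shift), ("strong", strong_shift)]:
    kl = kl_divergence(recent, baseline)
    print(f"{name}: KL = {kl:.4f}, alert = {kl > 0.1}")
```

Note that KL divergence is asymmetric: `kl_divergence(recent, baseline)` and `kl_divergence(baseline, recent)` generally differ, so the direction should be fixed once and used consistently.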

In [None]:
class ModelMonitor:
    """Production model monitoring with drift detection."""

    def __init__(self, reference_stats=None):
        self.predictions_log = []
        self.reference_stats = reference_stats or {}
        self.alerts = []

    def log_prediction(self, prediction, confidence, latency_ms, metadata=None):
        """Log a single prediction event."""
        self.predictions_log.append({
            'timestamp': time.time(),
            'prediction': int(prediction),
            'confidence': float(confidence),
            'latency_ms': float(latency_ms),
            'metadata': metadata or {}
        })

    def detect_data_drift(self, current_batch_stats):
        """Detect data drift using statistical comparison."""
        if not self.reference_stats:
            return {'drift_detected': False, 'message': 'No reference stats set'}

        ref_mean = np.array(self.reference_stats['mean'])
        ref_std = np.array(self.reference_stats['std'])
        cur_mean = np.array(current_batch_stats['mean'])

        # Z-score based drift detection
        z_scores = np.abs(cur_mean - ref_mean) / (ref_std + 1e-7)
        drift_detected = bool(np.any(z_scores > 3.0))

        result = {
            'drift_detected': drift_detected,
            'z_scores': z_scores.tolist(),
            'max_z_score': float(np.max(z_scores)),
            'drifted_channels': [i for i, z in enumerate(z_scores) if z > 3.0]
        }

        if drift_detected:
            self.alerts.append({
                'type': 'DATA_DRIFT',
                'severity': 'HIGH' if np.max(z_scores) > 5.0 else 'MEDIUM',
                'timestamp': time.time(),
                'details': result
            })

        return result

    def detect_prediction_drift(self, window_size=100):
        """Detect drift in prediction distribution."""
        if len(self.predictions_log) < window_size * 2:
            return {'drift_detected': False, 'message': 'Not enough data'}

        recent = [p['confidence'] for p in self.predictions_log[-window_size:]]
        baseline = [p['confidence'] for p in self.predictions_log[-window_size*2:-window_size]]

        recent_mean = np.mean(recent)
        baseline_mean = np.mean(baseline)
        baseline_std = np.std(baseline)

        drift_score = abs(recent_mean - baseline_mean) / max(baseline_std, 1e-7)

        recent_class_dist = np.bincount([p['prediction'] for p in self.predictions_log[-window_size:]], minlength=2) / window_size
        baseline_class_dist = np.bincount([p['prediction'] for p in self.predictions_log[-window_size*2:-window_size]], minlength=2) / window_size

        return {
            'drift_detected': bool(drift_score > 2.0),  # plain bool so the result is JSON-serializable
            'drift_score': float(drift_score),
            'recent_confidence': float(recent_mean),
            'baseline_confidence': float(baseline_mean),
            'recent_class_distribution': recent_class_dist.tolist(),
            'baseline_class_distribution': baseline_class_dist.tolist()
        }

    def get_performance_summary(self, last_n=None):
        """Get performance metrics summary."""
        logs = self.predictions_log[-last_n:] if last_n else self.predictions_log
        if not logs:
            return {}

        latencies = [l['latency_ms'] for l in logs]
        confidences = [l['confidence'] for l in logs]

        return {
            'total_predictions': len(logs),
            'latency_mean_ms': float(np.mean(latencies)),
            'latency_p50_ms': float(np.percentile(latencies, 50)),
            'latency_p95_ms': float(np.percentile(latencies, 95)),
            'latency_p99_ms': float(np.percentile(latencies, 99)),
            'confidence_mean': float(np.mean(confidences)),
            'confidence_min': float(np.min(confidences)),
            'low_confidence_ratio': float(np.mean(np.array(confidences) < 0.7)),
            'num_alerts': len(self.alerts)
        }

# Demo: Simulate production monitoring
monitor = ModelMonitor(reference_stats={'mean': [0.5, 0.5, 0.5], 'std': [0.2, 0.2, 0.2]})

# Simulate normal predictions
np.random.seed(42)
for i in range(200):
    pred = np.random.choice([0, 1])
    conf = np.random.uniform(0.7, 0.99)
    latency = np.random.exponential(20) + 10
    monitor.log_prediction(pred, conf, latency)

# Simulate drift (lower confidence in recent predictions)
for i in range(100):
    pred = np.random.choice([0, 1], p=[0.3, 0.7])  # Shifted distribution
    conf = np.random.uniform(0.5, 0.85)  # Lower confidence
    latency = np.random.exponential(30) + 15  # Higher latency
    monitor.log_prediction(pred, conf, latency)

# Check for drift
pred_drift = monitor.detect_prediction_drift(window_size=100)
data_drift = monitor.detect_data_drift({'mean': [0.65, 0.48, 0.52]})

print("=== Performance Summary ===")
summary = monitor.get_performance_summary()
for k, v in summary.items():
    print(f"  {k}: {v:.4f}" if isinstance(v, float) else f"  {k}: {v}")

print(f"\n=== Prediction Drift ===")
print(f"  Drift detected: {pred_drift['drift_detected']}")
print(f"  Drift score: {pred_drift['drift_score']:.3f}")
print(f"  Recent confidence: {pred_drift['recent_confidence']:.3f}")
print(f"  Baseline confidence: {pred_drift['baseline_confidence']:.3f}")

print(f"\n=== Data Drift ===")
print(f"  Drift detected: {data_drift['drift_detected']}")
print(f"  Max Z-score: {data_drift['max_z_score']:.3f}")

<a name="8"></a>
## 8. Automated Model Testing

### Test Categories

| Category | What It Tests | When to Run |
|----------|-------------|-------------|
| **Unit Tests** | Individual components (data loading, preprocessing) | Every commit |
| **Model Tests** | Accuracy, latency, size thresholds | Before promotion |
| **Integration Tests** | End-to-end pipeline (data → prediction) | Before deployment |
| **Regression Tests** | Performance vs. previous version | Before replacing model |
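
A regression test can be reduced to a gate that compares a candidate's metrics with those of the current production model, allowing a small tolerance per metric. The sketch below is one way to express that; the `regression_gate` helper, metric names, and tolerance values are assumptions chosen for illustration.

```python
def regression_gate(candidate, production, tolerances):
    """Return (passed, failures) comparing candidate metrics to production.

    tolerances maps metric name -> (higher_is_better, allowed_regression).
    """
    failures = []
    for metric, (higher_is_better, allowed) in tolerances.items():
        delta = candidate[metric] - production[metric]
        # A regression is a drop for "higher is better" metrics, a rise otherwise
        regression = -delta if higher_is_better else delta
        if regression > allowed:
            failures.append(f"{metric}: {production[metric]} -> {candidate[metric]}")
    return not failures, failures

production = {"val_accuracy": 0.91, "p95_latency_ms": 40.0}
candidate = {"val_accuracy": 0.905, "p95_latency_ms": 55.0}
tolerances = {
    "val_accuracy": (True, 0.01),      # may drop by at most 0.01
    "p95_latency_ms": (False, 10.0),   # may rise by at most 10 ms
}

passed, failures = regression_gate(candidate, production, tolerances)
print(passed, failures)
```

Here the accuracy drop is within tolerance, but the latency increase of 15 ms exceeds the allowed 10 ms, so the gate fails and names the offending metric.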

In [None]:
class ModelTestSuite:
    """Automated model testing for CI/CD pipelines."""

    def __init__(self, model, config):
        self.model = model
        self.config = config
        self.results = []

    def _record(self, test_name, passed, details=""):
        self.results.append({'test': test_name, 'passed': passed, 'details': details})
        status = "PASS" if passed else "FAIL"
        print(f"  [{status}] {test_name}: {details}")

    def test_model_output_shape(self, input_shape):
        """Test that model produces expected output shape."""
        dummy = np.random.rand(1, *input_shape).astype(np.float32)
        output = self.model(dummy, training=False)
        expected = (1, self.config.num_classes)
        passed = tuple(output.shape) == expected
        self._record("Output Shape", passed, f"Expected {expected}, got {tuple(output.shape)}")

    def test_output_probabilities(self, input_shape):
        """Test that outputs are valid probabilities (sum to 1)."""
        dummy = np.random.rand(2, *input_shape).astype(np.float32)
        output = self.model(dummy, training=False).numpy()
        sums = output.sum(axis=1)
        passed = np.allclose(sums, 1.0, atol=1e-5)
        self._record("Output Probabilities", passed, f"Sums: {sums}")

    def test_inference_latency(self, input_shape, max_latency_ms=100):
        """Test inference latency meets requirements."""
        dummy = np.random.rand(1, *input_shape).astype(np.float32)
        _ = self.model(dummy, training=False)  # Warmup

        times = []
        for _ in range(30):
            start = time.perf_counter()  # monotonic, high-resolution timer
            _ = self.model(dummy, training=False)
            times.append((time.perf_counter() - start) * 1000)

        p95 = np.percentile(times, 95)
        passed = p95 <= max_latency_ms
        self._record("Inference Latency", passed, f"P95={p95:.1f}ms (max={max_latency_ms}ms)")

    def test_model_size(self, max_size_mb=500):
        """Test model size meets requirements."""
        size_mb = self.model.count_params() * 4 / (1024 * 1024)
        passed = size_mb <= max_size_mb
        self._record("Model Size", passed, f"{size_mb:.1f}MB (max={max_size_mb}MB)")

    def test_prediction_consistency(self, input_shape, num_runs=5):
        """Test that predictions are deterministic."""
        dummy = np.random.rand(1, *input_shape).astype(np.float32)
        predictions = [self.model(dummy, training=False).numpy() for _ in range(num_runs)]
        consistent = all(np.allclose(predictions[0], p, atol=1e-5) for p in predictions[1:])
        self._record("Prediction Consistency", consistent, f"Checked {num_runs} runs")

    def test_handles_batch_sizes(self, input_shape, batch_sizes=(1, 4, 16, 32)):
        """Test model handles various batch sizes."""
        failure = None
        for bs in batch_sizes:
            try:
                dummy = np.random.rand(bs, *input_shape).astype(np.float32)
                output = self.model(dummy, training=False)
                if output.shape[0] != bs:
                    failure = f"batch {bs}: got output batch {output.shape[0]}"
            except Exception as e:
                # Keep the error so the report shows why the batch failed
                failure = f"batch {bs}: {e}"
            if failure:
                break
        details = failure or f"Tested batches: {list(batch_sizes)}"
        self._record("Batch Size Handling", failure is None, details)

    def run_all(self, input_shape):
        """Run all tests."""
        print("\n" + "="*60)
        print("MODEL TEST SUITE")
        print("="*60)

        self.test_model_output_shape(input_shape)
        self.test_output_probabilities(input_shape)
        self.test_inference_latency(input_shape)
        self.test_model_size()
        self.test_prediction_consistency(input_shape)
        self.test_handles_batch_sizes(input_shape)

        passed = sum(1 for r in self.results if r['passed'])
        total = len(self.results)
        print(f"\n{'='*60}")
        print(f"Results: {passed}/{total} tests passed")
        print(f"{'='*60}")

        return all(r['passed'] for r in self.results)

# Run tests
model = keras.Sequential([
    keras.layers.Flatten(input_shape=(32, 32, 3)),
    keras.layers.Dense(128, activation='relu'),
    keras.layers.Dense(2, activation='softmax')
])
model.compile(optimizer='adam', loss='sparse_categorical_crossentropy')

config = ExperimentConfig(num_classes=2)
test_suite = ModelTestSuite(model, config)
all_passed = test_suite.run_all(input_shape=(32, 32, 3))

<a name="9"></a>
## 9. CI/CD for ML

### Pipeline Architecture

```
┌──────────┐    ┌──────────────┐    ┌──────────┐    ┌───────────┐    ┌──────────┐
│  Code    │───→│  Train &     │───→│  Test    │───→│  Stage    │───→│Production│
│  Commit  │    │  Evaluate    │    │  Suite   │    │  Deploy   │    │  Deploy  │
└──────────┘    └──────────────┘    └──────────┘    └───────────┘    └──────────┘
                       │                  │                │                │
                  TensorBoard        Pass/Fail      Canary Test     Full Traffic
                  Experiment ID      Gate            A/B Testing     Monitoring
```

### GitHub Actions Example

In [None]:
# GitHub Actions workflow for ML CI/CD
github_actions_yaml = '''
name: ML Pipeline

on:
  push:
    branches: [main]
    paths: ['src/**', 'configs/**', 'data/**']

jobs:
  train-and-test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - name: Setup Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.10'

      - name: Install dependencies
        run: pip install -r requirements.txt

      - name: Train model
        run: python src/train.py --config configs/production.json
        env:
          WANDB_API_KEY: ${{ secrets.WANDB_API_KEY }}

      - name: Run model tests
        run: python -m pytest tests/model_tests.py -v

      - name: Run inference benchmarks
        run: python src/benchmark.py --model outputs/model

      - name: Upload model artifact
        uses: actions/upload-artifact@v4
        with:
          name: trained-model
          path: outputs/model/

  deploy-staging:
    needs: train-and-test
    runs-on: ubuntu-latest
    if: github.ref == 'refs/heads/main'
    steps:
      - name: Download model
        uses: actions/download-artifact@v4
        with:
          name: trained-model

      - name: Deploy to staging
        run: |
          # Push to model registry
          # Deploy canary (10% traffic)
          echo "Deploying to staging..."

  promote-production:
    needs: deploy-staging
    runs-on: ubuntu-latest
    environment: production
    steps:
      - name: Promote to production
        run: |
          # Run smoke tests against staging
          # Gradually shift traffic (10% -> 50% -> 100%)
          echo "Promoting to production..."
'''

# Model promotion gates
promotion_gates = {
    "staging": {
        "accuracy_threshold": 0.90,
        "latency_p95_ms": 100,
        "model_size_mb": 200,
        "tests_pass": True,
        "no_data_drift": True
    },
    "production": {
        "accuracy_threshold": 0.92,
        "canary_error_rate": 0.01,
        "canary_latency_p99_ms": 200,
        "canary_duration_hours": 24,
        "rollback_on_regression": True
    }
}

print("=== GitHub Actions ML Pipeline ===")
print(github_actions_yaml[:500] + "\n...")

print("\n=== Promotion Gates ===")
print(json.dumps(promotion_gates, indent=2))
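
A gate dictionary only helps if something evaluates it. The sketch below shows one possible way to check a candidate's metrics against the staging gates. The `check_gates` helper, the metric key names, and the candidate numbers are assumptions made for illustration. The production gates are left out because `canary_duration_hours` is a minimum and `rollback_on_regression` is a policy flag, so they do not fit the simple "maximum" convention used here.

```python
def check_gates(metrics, gates):
    """Return a list of human-readable gate violations (empty list = promote).

    Convention assumed in this sketch:
      - 'accuracy_threshold' is a minimum for metrics['accuracy']
      - boolean gates set to True must also be True in metrics
      - every other numeric gate is a maximum for the metric with the same key
    """
    violations = []
    for key, limit in gates.items():
        if key == "accuracy_threshold":
            value = metrics.get("accuracy")
            if value is None or value < limit:
                violations.append(f"accuracy {value} < required {limit}")
        elif isinstance(limit, bool):  # checked before numbers: bool is an int subclass
            if limit and not metrics.get(key, False):
                violations.append(f"{key} is not satisfied")
        else:
            value = metrics.get(key)
            if value is None or value > limit:
                violations.append(f"{key} {value} exceeds limit {limit}")
    return violations

# Same values as the "staging" gates above
staging_gates = {
    "accuracy_threshold": 0.90,
    "latency_p95_ms": 100,
    "model_size_mb": 200,
    "tests_pass": True,
    "no_data_drift": True,
}

# Hypothetical candidate metrics, not measured results
candidate = {
    "accuracy": 0.93,
    "latency_p95_ms": 120,
    "model_size_mb": 45,
    "tests_pass": True,
    "no_data_drift": True,
}

violations = check_gates(candidate, staging_gates)
print("PROMOTE" if not violations else "BLOCK: " + "; ".join(violations))
```

Returning the list of violations rather than a bare boolean keeps the reason for a blocked promotion visible in the pipeline logs.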

<a name="10"></a>
## 10. Distributed Training & Scaling

### Distribution Strategies

| Strategy | Use Case | Communication |
|----------|----------|---------------|
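| **MirroredStrategy** | Single machine, multi-GPU | Synchronous all-reduce |
| **MultiWorkerMirroredStrategy** | Multiple machines, each with one or more GPUs | Synchronous all-reduce across workers |
| **TPUStrategy** | Google TPUs and TPU pods | Synchronous all-reduce over the TPU interconnect |
| **ParameterServerStrategy** | Large-scale, heterogeneous clusters | Asynchronous updates via parameter servers |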
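
To make the "all-reduce" column concrete, here is a toy simulation in plain Python. It only illustrates the idea that every replica ends up holding the same reduced gradient. The `all_reduce_mean` function is a hypothetical name, the reduction is a mean in this sketch, and real frameworks delegate this step to collective-communication libraries and may differ in details such as summing versus averaging.

```python
def all_reduce_mean(per_replica_grads):
    """Simulate an all-reduce (mean): every replica receives the same
    element-wise average of all replicas' gradients."""
    num_replicas = len(per_replica_grads)
    averaged = [sum(vals) / num_replicas for vals in zip(*per_replica_grads)]
    # Each replica gets its own copy of the reduced result
    return [list(averaged) for _ in range(num_replicas)]

# Illustrative gradients for a two-parameter model on two replicas
grads = [
    [0.25, -0.5],  # replica 0
    [0.75, 0.5],   # replica 1
]
synced = all_reduce_mean(grads)
print(synced)
```

Because all replicas apply the identical averaged gradient, their copies of the model weights stay in sync after each step.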

In [None]:
# Distributed training setup
print("=== Distributed Training Strategies ===\n")

# 1. MirroredStrategy (most common - single machine, multi-GPU)
strategy = tf.distribute.MirroredStrategy()
print(f"MirroredStrategy: {strategy.num_replicas_in_sync} device(s)")

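# 2. Build and compile a model inside the strategy scope
def train_with_strategy(strategy, config):
    """Build and compile a model under the given distribution strategy.

    Despite the name, this function does not call fit(); training is left
    to the caller.
    """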
    with strategy.scope():
        model = keras.Sequential([
            keras.layers.Flatten(input_shape=(32, 32, 3)),
            keras.layers.Dense(256, activation='relu'),
            keras.layers.BatchNormalization(),
            keras.layers.Dropout(config.dropout_rate),
            keras.layers.Dense(128, activation='relu'),
            keras.layers.BatchNormalization(),
            keras.layers.Dense(config.num_classes, activation='softmax')
        ])

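        # Scale learning rate by number of replicas (linear scaling rule).
        # This is a heuristic that originated with SGD; with Adam it may be too
        # aggressive, so validate it and consider a warmup period.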
        scaled_lr = config.learning_rate * strategy.num_replicas_in_sync
        optimizer = keras.optimizers.Adam(learning_rate=scaled_lr)

        model.compile(
            optimizer=optimizer,
            loss='sparse_categorical_crossentropy',
            metrics=['accuracy']
        )

    return model

config = ExperimentConfig(num_classes=10)
model = train_with_strategy(strategy, config)
print(f"\nModel built with {strategy.num_replicas_in_sync} replicas")
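# Keras treats the batch size given to fit() or tf.data as the GLOBAL batch and
# splits it across replicas, so config.batch_size is read here as a per-replica size.
print(f"Global batch size: {config.batch_size} per replica × {strategy.num_replicas_in_sync} = {config.batch_size * strategy.num_replicas_in_sync}")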
print(f"Scaled learning rate: {config.learning_rate} × {strategy.num_replicas_in_sync} = {config.learning_rate * strategy.num_replicas_in_sync}")

# 3. Mixed precision training
mixed_precision_code = '''
# Enable mixed precision for faster training on modern GPUs
tf.keras.mixed_precision.set_global_policy('mixed_float16')

# Your model will automatically use float16 for compute
# and float32 for variable storage
model = build_model(config)
model.compile(
    optimizer=keras.optimizers.Adam(1e-3),
    loss='sparse_categorical_crossentropy',
    metrics=['accuracy']
)

# Note: The output layer should still use float32
# Add a cast layer if needed:
# outputs = layers.Activation('softmax', dtype='float32')(x)
'''

print("\n=== Mixed Precision Training ===")
print(mixed_precision_code)

# 4. Cost optimization tips
print("\n=== Cost Optimization Tips ===")
tips = [
    "1. Use spot/preemptible instances (often 60-90% cheaper; checkpoint regularly to survive preemption)",
    "2. Start with smaller models and scale up only if needed",
    "3. Use mixed precision training (up to ~2x faster on GPUs with Tensor Cores)",
    "4. Cache preprocessed data (avoid recomputing deterministic preprocessing every epoch)",
    "5. Use gradient accumulation to reach large effective batch sizes without larger GPUs",
    "6. Profile before optimizing (use TF Profiler)",
    "7. Implement early stopping to avoid wasted training",
    "8. Use transfer learning instead of training from scratch",
]
for tip in tips:
    print(f"  {tip}")
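
The mixed precision snippet above keeps the output layer in float32 for numerical reasons, and float16 has a second pitfall: very small gradients underflow to zero. The sketch below demonstrates the effect and the usual remedy, loss scaling, using only the standard library (`struct` format `"e"` is IEEE 754 half precision). The fixed `loss_scale` of 1024 is an illustrative assumption; as far as I know, Keras applies dynamic loss scaling automatically when `model.fit` is used under the `mixed_float16` policy.

```python
import struct

def to_float16(x):
    """Round a Python float to IEEE 754 half precision and back."""
    return struct.unpack("e", struct.pack("e", x))[0]

tiny_grad = 1e-8      # below the smallest value float16 can represent
loss_scale = 1024.0   # illustrative fixed scale, not a recommended setting

unscaled = to_float16(tiny_grad)             # gradient stored directly in float16
scaled = to_float16(tiny_grad * loss_scale)  # gradient of the scaled loss
recovered = scaled / loss_scale              # unscaling happens in higher precision

print("float16 without scaling:", unscaled)
print("recovered after scaling:", recovered)
```

Without scaling the gradient is lost entirely, while the scaled path recovers it to within float16 rounding error.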

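Gradient accumulation, mentioned in tip 5, works because the gradient of a mean loss over a large batch equals the average of the gradients over equally sized micro-batches. The toy example below checks this for a one-parameter model `y = w * x` with mean squared error. The function names and data are hypothetical and chosen only to keep the arithmetic easy to follow.

```python
def mse_grad(w, xs, ys):
    """Gradient of mean squared error with respect to w for the model y = w * x."""
    return sum(2 * (w * x - y) * x for x, y in zip(xs, ys)) / len(xs)

def accumulated_grad(w, xs, ys, micro_batch_size):
    """Average the micro-batch gradients.

    Assumes len(xs) is a multiple of micro_batch_size, so every
    micro-batch carries equal weight.
    """
    total, num_micro = 0.0, 0
    for start in range(0, len(xs), micro_batch_size):
        end = start + micro_batch_size
        total += mse_grad(w, xs[start:end], ys[start:end])
        num_micro += 1
    return total / num_micro

xs = [1.0, 2.0, 3.0, 4.0]
ys = [2.0, 4.0, 6.0, 8.0]
w = 1.5

full = mse_grad(w, xs, ys)                               # one batch of 4
accum = accumulated_grad(w, xs, ys, micro_batch_size=2)  # two micro-batches of 2
print(full, accum)
```

In a real training loop the optimizer step would be applied once per accumulated gradient, so only one micro-batch needs to fit in GPU memory at a time.
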
<a name="11"></a>
## 11. End-to-End Production Pipeline

### Complete System Architecture

```
                    ┌─────────────────────────────────────────────────────────┐
                    │                   ML Platform                           │
                    │                                                         │
  ┌──────────┐     │  ┌──────────┐   ┌──────────┐   ┌──────────────────┐   │
  │  Raw     │────→│  │  Data    │──→│  Feature │──→│    Training       │   │
  │  Data    │     │  │  Pipeline│   │  Store   │   │    Pipeline       │   │
  └──────────┘     │  └──────────┘   └──────────┘   └────────┬─────────┘   │
                    │                                          │             │
                    │                                 ┌────────▼─────────┐   │
                    │                                 │  Model Registry  │   │
                    │                                 │  ├── v1          │   │
                    │                                 │  ├── v2          │   │
                    │                                 │  └── v3 (latest) │   │
                    │                                 └────────┬─────────┘   │
                    │                                          │             │
                    │  ┌──────────┐   ┌──────────┐   ┌────────▼─────────┐   │
  ┌──────────┐     │  │  API     │←──│  Model   │←──│  Automated       │   │
  │  Client  │←───→│  │  Gateway │   │  Server  │   │  Tests           │   │
  └──────────┘     │  └──────────┘   └──────────┘   └──────────────────┘   │
                    │        │                                              │
                    │  ┌─────▼────────────────────────────────────────┐    │
                    │  │            Monitoring & Alerting             │    │
                    │  │  • Latency metrics  • Data drift detection   │    │
                    │  │  • Error rates      • Prediction drift       │    │
                    │  │  • Throughput        • Retrain triggers       │    │
                    │  └─────────────────────────────────────────────┘    │
                    └─────────────────────────────────────────────────────┘
```

<a name="12"></a>
## 12. Architect-Level Exercises

### Exercise 1: Design for Scale
Design a system that handles **10,000 predictions per second** for image classification:
- What serving infrastructure would you use?
- How would you handle cold starts?
- What's your caching strategy?
- How do you handle model updates without downtime?
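
As a starting point for sizing the serving tier, a back-of-the-envelope estimate can be sketched as below. It assumes each replica processes one batch at a time, so its throughput is batch size divided by batch latency. The `replicas_needed` helper, the 70% utilization target, and the batch size and latency figures are hypothetical placeholders, not benchmarks.

```python
import math

def replicas_needed(target_qps, batch_size, batch_latency_s, target_utilization=0.7):
    """Estimate the replica count from per-replica throughput.

    Per-replica throughput = batch_size / batch_latency_s (one batch at a time).
    target_utilization leaves headroom for traffic spikes and rolling updates.
    """
    per_replica_qps = batch_size / batch_latency_s
    return math.ceil(target_qps / (per_replica_qps * target_utilization))

# Hypothetical numbers: batches of 32 images served in 20 ms
n = replicas_needed(target_qps=10_000, batch_size=32, batch_latency_s=0.020)
print(f"Estimated replicas: {n}")
```

Measured latency under realistic load should replace the placeholder values before any such estimate is trusted.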

### Exercise 2: Automated Retraining Pipeline
Create a complete retraining pipeline that:
- Monitors for data drift continuously
- Triggers retraining when drift exceeds threshold
- Trains new model with latest data
- Runs automated test suite
- Deploys via canary (10% → 50% → 100%)
- Rolls back automatically if metrics degrade
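
One possible skeleton for the canary and rollback steps is sketched below. The `run_canary_rollout` function and the `get_error_rate` monitoring hook are hypothetical, the error rates are simulated, and the 1% limit mirrors the `canary_error_rate` gate from Section 9. A full solution would also wait at each stage and compare latency.

```python
def run_canary_rollout(get_error_rate, stages=(10, 50, 100), max_error_rate=0.01):
    """Step traffic through `stages` (percent) and roll back on the first bad stage.

    get_error_rate: callable taking the traffic percentage and returning the
    observed canary error rate at that stage (hypothetical monitoring hook).
    Returns (final_traffic_percent, history).
    """
    history = []
    for percent in stages:
        error_rate = get_error_rate(percent)
        healthy = error_rate <= max_error_rate
        history.append((percent, error_rate, healthy))
        if not healthy:
            return 0, history  # roll back: all traffic returns to the old model
    return stages[-1], history

# Simulated monitoring data: the new model degrades under higher load
observed = {10: 0.004, 50: 0.030, 100: 0.030}
final, history = run_canary_rollout(lambda pct: observed[pct])
print("final traffic %:", final)
print(history)
```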

### Exercise 3: Cost Optimization
Given a startup with \$1000/month compute budget:
- Design the most cost-effective training pipeline
- Choose architecture that maximizes accuracy per dollar
- Implement efficient serving for 100 req/sec
- Plan for scaling to 10x traffic

### Exercise 4: Multi-Model System
Design a system serving 5 different ML models:
- Image classifier (200ms SLA)
- Text sentiment (50ms SLA)
- Recommendation engine (100ms SLA)
- Fraud detection (10ms SLA)
- Content moderation (500ms SLA)

How would you architect this? Consider:
- Shared vs. dedicated infrastructure
- Resource allocation and priority
- Monitoring across models
- Unified deployment pipeline

---

**Congratulations!** You've covered the complete MLOps lifecycle from experiment configuration to production deployment at scale.

### Recommended Tools for Production
| Category | Tools |
|----------|-------|
| **Experiment Tracking** | MLflow, Weights & Biases, TensorBoard |
| **Model Registry** | MLflow, Vertex AI, SageMaker |
| **Serving** | TF Serving, Triton, Seldon, BentoML |
| **Monitoring** | Prometheus + Grafana, WhyLabs, Evidently |
| **Orchestration** | Kubeflow, Airflow, Prefect |
| **CI/CD** | GitHub Actions, GitLab CI, Jenkins |