## This notebook requires GPU

This lab must be run in Google Colab in order to use GPU acceleration for model inference. Click the button below to open this notebook in Colab, then set your runtime to GPU:

**Runtime > Change Runtime Type > T4 GPU**

[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/scott2b/coursera-msds-public/blob/main/notebooks/6_production_deployment_scaling.ipynb)

# 🚀 Production Deployment and Scaling

This notebook walks through deploying LLM classification systems in production: a basic FastAPI service, monitoring, auto-scaling, containerization, cloud deployment options, and inference optimization.

## 🎯 Learning Objectives

By the end of this notebook, you will:
1. Master production deployment strategies for LLMs
2. Implement scalable serving architectures
3. Optimize models for production workloads
4. Handle high-throughput inference scenarios
5. Implement monitoring and observability
6. Manage model versioning and updates
7. Handle production failures and rollbacks
8. Implement cost-effective scaling strategies

## 🔧 Prerequisites

- Completed Notebooks 1-3 (Fundamentals, vLLM, Fine-tuning)
- Understanding of REST APIs and web services
- Basic knowledge of containerization (Docker)
- Familiarity with cloud platforms (AWS, GCP, Azure)

In [None]:
# Install packages not pre-installed in Colab
!pip install fastapi uvicorn
!pip install transformers accelerate
!pip install psutil

In [None]:
import torch
import asyncio
from fastapi import FastAPI, HTTPException, BackgroundTasks
from fastapi.responses import JSONResponse
from pydantic import BaseModel
import uvicorn
import time
import psutil
import threading
from typing import List, Dict, Any, Optional
import logging
import json
from datetime import datetime
import warnings
warnings.filterwarnings('ignore')

# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

import fastapi  # imported here only to report the installed version

print("🚀 Production Deployment Environment Ready!")
print(f"FastAPI version: {fastapi.__version__}")
print(f"CUDA available: {torch.cuda.is_available()}")

## 🏗️ Production Architecture Patterns

Understanding different deployment architectures for LLMs.

In [None]:
# Production architecture patterns
architectures = {
    "Single Model Service": {
        "description": "Simple FastAPI service with one model",
        "scale": "Low (1-10 req/s)",
        "complexity": "Low",
        "cost": "Low",
        "use_case": "Prototyping, small applications",
        "components": ["FastAPI", "Single GPU", "Load Balancer"]
    },
    "Model Ensemble": {
        "description": "Multiple specialized models for different tasks",
        "scale": "Medium (10-100 req/s)",
        "complexity": "Medium",
        "cost": "Medium",
        "use_case": "Multi-task applications",
        "components": ["Router", "Multiple Models", "Task Classifier"]
    },
    "Distributed Serving": {
        "description": "vLLM with Ray for distributed inference",
        "scale": "High (100-1000+ req/s)",
        "complexity": "High",
        "cost": "High",
        "use_case": "Large-scale production",
        "components": ["vLLM", "Ray", "Model Sharding", "Load Balancer"]
    },
    "Serverless": {
        "description": "On-demand model serving with auto-scaling",
        "scale": "Variable (pay per use)",
        "complexity": "Medium",
        "cost": "Variable",
        "use_case": "Variable load, cost optimization",
        "components": ["Lambda", "API Gateway", "Cloud Storage"]
    },
    "Edge Deployment": {
        "description": "Models deployed on edge devices",
        "scale": "Local (device-dependent)",
        "complexity": "High",
        "cost": "Low",
        "use_case": "Offline, privacy-sensitive applications",
        "components": ["ONNX", "Quantized Models", "Local Hardware"]
    }
}

# Display architecture comparison
import pandas as pd
df_arch = pd.DataFrame.from_dict(architectures, orient='index')
df_arch.index.name = 'Architecture'
print("🏗️  Production Architecture Patterns:")
print(df_arch[['description', 'scale', 'use_case']].to_string())
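
The `scale` column above can be read as a rough decision rule. The sketch below encodes it as a lookup; the function name `suggest_architecture` and the `bursty` and `offline` flags are hypothetical, and the cut-offs are simply the ranges from the table, not measured limits.

```python
# Upper bounds in requests/second, taken from the "scale" column above.
# These are rough guides from the table, not hard limits.
SCALE_TIERS = [
    (10, "Single Model Service"),
    (100, "Model Ensemble"),
]


def suggest_architecture(expected_rps: float, bursty: bool = False, offline: bool = False) -> str:
    """Pick an architecture pattern name from expected load (hypothetical helper)."""
    if expected_rps <= 0:
        raise ValueError("expected_rps must be positive")
    if offline:
        # Offline or privacy-sensitive workloads run on the device itself
        return "Edge Deployment"
    if bursty:
        # Highly variable load favors pay-per-use serving
        return "Serverless"
    for upper_bound, name in SCALE_TIERS:
        if expected_rps <= upper_bound:
            return name
    return "Distributed Serving"


print(suggest_architecture(5))
print(suggest_architecture(50))
print(suggest_architecture(500))
```

In practice the choice also depends on model size, latency targets, and budget, so treat the result as a starting point only.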

## ⚡ FastAPI Service Implementation

Building a production-ready LLM classification service.

In [None]:
# Production service implementation
class ProductionLLMService:
    """Production-ready LLM service with comprehensive features"""

    def __init__(self, model_name: str = "microsoft/DialoGPT-small"):
        self.model_name = model_name
        self.model = None
        self.tokenizer = None
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

        # Metrics
        self.request_count = 0
        self.total_latency = 0.0
        self.error_count = 0
        self.start_time = time.time()

        # Load model
        self._load_model()

        logger.info(f"✅ Production service initialized with {model_name}")

    def _load_model(self):
        """Load and optimize the model"""
        try:
            from transformers import AutoTokenizer, AutoModelForCausalLM

            logger.info(f"Loading model: {self.model_name}")

            self.tokenizer = AutoTokenizer.from_pretrained(self.model_name)
            if self.tokenizer.pad_token is None:
                self.tokenizer.pad_token = self.tokenizer.eos_token

            # Load with optimizations
            self.model = AutoModelForCausalLM.from_pretrained(
                self.model_name,
                torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32,
                device_map="auto" if torch.cuda.is_available() else None,
                low_cpu_mem_usage=True,
            )

            self.model.eval()  # Set to evaluation mode

            logger.info("✅ Model loaded successfully")

        except Exception as e:
            logger.error(f"❌ Failed to load model: {e}")
            raise

    async def classify_text(self, text: str, labels: Optional[List[str]] = None) -> Dict[str, Any]:
        """Async classification with comprehensive error handling"""

        start_time = time.time()
        self.request_count += 1

        try:
            if not text or not isinstance(text, str):
                raise ValueError("Text must be a non-empty string")

            if labels is None:
                labels = ["positive", "negative", "neutral"]

            # Format prompt
            prompt = f"""Classify this text into one of: {', '.join(labels)}

Text: {text}

Classification:"""

            # Tokenize
            inputs = self.tokenizer(
                prompt,
                return_tensors="pt",
                truncation=True,
                max_length=512
            )

            # Move input tensors to the same device as the model
            inputs = {k: v.to(self.model.device) for k, v in inputs.items()}

            # Generate
            with torch.no_grad():
                outputs = self.model.generate(
                    **inputs,
                    max_new_tokens=20,
                    do_sample=False,  # greedy decoding; temperature only applies when sampling
                    pad_token_id=self.tokenizer.eos_token_id,
                    num_return_sequences=1
                )

            # Decode response
            generated_text = self.tokenizer.decode(
                outputs[0][len(inputs["input_ids"][0]):],
                skip_special_tokens=True
            ).strip()

            # Extract classification
            prediction = self._extract_prediction(generated_text, labels)

            # Calculate latency
            latency = time.time() - start_time
            self.total_latency += latency

            return {
                "prediction": prediction,
                "confidence": 0.85,  # Fixed placeholder, not computed from model outputs
                "latency": latency,
                "model": self.model_name,
                "timestamp": datetime.now().isoformat()
            }

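        except ValueError as e:
            # Invalid input is a client error, not a server failure
            self.error_count += 1
            raise HTTPException(status_code=400, detail=str(e))
        except Exception as e:
            self.error_count += 1
            logger.error(f"Classification error: {e}")
            raise HTTPException(status_code=500, detail=f"Classification failed: {str(e)}")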

    def _extract_prediction(self, generated_text: str, labels: List[str]) -> str:
        """Extract prediction from generated text"""
        generated_lower = generated_text.lower()

        for label in labels:
            if label.lower() in generated_lower:
                return label

        return labels[0]  # Default to first label

    def get_metrics(self) -> Dict[str, Any]:
        """Get comprehensive service metrics"""
        uptime = time.time() - self.start_time
        avg_latency = self.total_latency / max(self.request_count, 1)
        error_rate = self.error_count / max(self.request_count, 1)

        return {
            "uptime_seconds": uptime,
            "total_requests": self.request_count,
            "average_latency": avg_latency,
            "error_rate": error_rate,
            "requests_per_second": self.request_count / max(uptime, 1),
            "model_name": self.model_name,
            "device": str(self.device)
        }

# Initialize service
service = ProductionLLMService()
print("✅ Production LLM Service initialized!")
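
The service returns a fixed `confidence` value. One possible way to derive a real one, sketched below, is a softmax over per-label scores. This assumes you can obtain a score for each label, for example the log-probability the model assigns to that label as a continuation of the prompt; the scoring step is not shown, and the numbers used here are made up for illustration.

```python
import math
from typing import Dict, Tuple


def confidence_from_scores(label_scores: Dict[str, float]) -> Tuple[str, float]:
    """Softmax over per-label scores; returns (best_label, probability)."""
    if not label_scores:
        raise ValueError("label_scores must not be empty")
    # Subtract the maximum before exponentiating for numerical stability
    max_score = max(label_scores.values())
    exps = {label: math.exp(score - max_score) for label, score in label_scores.items()}
    total = sum(exps.values())
    best = max(exps, key=exps.get)
    return best, exps[best] / total


# Hypothetical log-probabilities, one per candidate label
scores = {"positive": -0.4, "negative": -2.3, "neutral": -1.9}
label, confidence = confidence_from_scores(scores)
print(label, round(confidence, 3))
```

A softmax over label scores is only calibrated relative to the candidate labels, so it should not be read as an absolute probability of correctness.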

In [None]:
# FastAPI application
app = FastAPI(
    title="LLM Classification API",
    description="Production-ready LLM classification service",
    version="1.0.0",
    docs_url="/docs",
    redoc_url="/redoc"
)

# Request/Response models
class ClassificationRequest(BaseModel):
    text: str
    labels: Optional[List[str]] = None
    temperature: Optional[float] = 0.1  # accepted but not yet passed to the service

class ClassificationResponse(BaseModel):
    prediction: str
    confidence: float
    latency: float
    model: str
    timestamp: str

class MetricsResponse(BaseModel):
    uptime_seconds: float
    total_requests: int
    average_latency: float
    error_rate: float
    requests_per_second: float
    model_name: str
    device: str

@app.post("/classify", response_model=ClassificationResponse)
async def classify_endpoint(request: ClassificationRequest):
    """Classify text using the LLM service"""
    result = await service.classify_text(request.text, request.labels)
    return ClassificationResponse(**result)

@app.get("/health")
async def health_check():
    """Health check endpoint"""
    return {
        "status": "healthy",
        "model_loaded": service.model is not None,
        "timestamp": datetime.now().isoformat()
    }

@app.get("/metrics", response_model=MetricsResponse)
async def get_metrics():
    """Get service performance metrics"""
    return MetricsResponse(**service.get_metrics())

@app.get("/models")
async def list_models():
    """List available models and configurations"""
    return {
        "current_model": service.model_name,
        "available_models": [
            "microsoft/DialoGPT-small",
            "microsoft/DialoGPT-medium",
            "microsoft/DialoGPT-large"
        ],
        "supported_tasks": ["sentiment", "classification", "analysis"]
    }

print("🚀 FastAPI application configured!")
print("📋 Available endpoints:")
print("   POST /classify - Classify text")
print("   GET  /health   - Health check")
print("   GET  /metrics  - Performance metrics")
print("   GET  /models   - Available models")
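
The endpoints are declared `async`, but `model.generate()` is a blocking call, so one slow request can stall every other request on the same event loop. A minimal sketch of one common workaround follows: run the blocking call in a worker thread with `asyncio.to_thread` (Python 3.9+). `blocking_inference` is a stand-in for the real model call, and the heartbeat task exists only to show that the loop keeps running.

```python
import asyncio
import time


def blocking_inference(text: str) -> str:
    """Stand-in for model.generate(): blocks the calling thread."""
    time.sleep(0.2)
    return f"label-for:{text}"


async def classify_offloaded(text: str) -> str:
    # Run the blocking call in a worker thread so the event loop stays responsive
    return await asyncio.to_thread(blocking_inference, text)


async def heartbeat(stop: asyncio.Event) -> int:
    """Count how often the event loop gets to run while inference is in progress."""
    ticks = 0
    while not stop.is_set():
        ticks += 1
        await asyncio.sleep(0.01)
    return ticks


async def demo():
    stop = asyncio.Event()
    beat = asyncio.create_task(heartbeat(stop))
    result = await classify_offloaded("great product")
    stop.set()
    ticks = await beat
    return result, ticks

# In a notebook cell: result, ticks = await demo()
# In a script:        result, ticks = asyncio.run(demo())
```

With a single GPU model you would likely also guard the call with a lock or semaphore so that concurrent threads do not contend for the same device.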

## 📊 Monitoring and Observability

Implementing comprehensive monitoring for production LLM services.

In [None]:
# Monitoring and metrics implementation
class LLMMonitor:
    """Comprehensive monitoring for LLM services"""

    def __init__(self):
        self.metrics = {
            "requests_total": 0,
            "requests_by_endpoint": {},
            "latency_histogram": [],
            "errors_total": 0,
            "errors_by_type": {},
            "model_metrics": {
                "load_time": 0,
                "inference_count": 0,
                "average_tokens": 0
            }
        }
        self.start_time = time.time()

    def record_request(self, endpoint: str, latency: float, success: bool = True):
        """Record a request with its metrics"""
        self.metrics["requests_total"] += 1

        if endpoint not in self.metrics["requests_by_endpoint"]:
            self.metrics["requests_by_endpoint"][endpoint] = 0
        self.metrics["requests_by_endpoint"][endpoint] += 1

        self.metrics["latency_histogram"].append(latency)

        if not success:
            self.metrics["errors_total"] += 1

    def record_error(self, error_type: str):
        """Record an error by type"""
        if error_type not in self.metrics["errors_by_type"]:
            self.metrics["errors_by_type"][error_type] = 0
        self.metrics["errors_by_type"][error_type] += 1

    def get_summary(self) -> Dict[str, Any]:
        """Get monitoring summary"""
        uptime = time.time() - self.start_time
        latencies = self.metrics["latency_histogram"]

        summary = {
            "uptime_seconds": uptime,
            "total_requests": self.metrics["requests_total"],
            "total_errors": self.metrics["errors_total"],
            "error_rate": self.metrics["errors_total"] / max(self.metrics["requests_total"], 1),
            "requests_per_second": self.metrics["requests_total"] / max(uptime, 1),
            "average_latency": sum(latencies) / max(len(latencies), 1),
            "p95_latency": sorted(latencies)[int(len(latencies) * 0.95)] if latencies else 0,
            "p99_latency": sorted(latencies)[int(len(latencies) * 0.99)] if latencies else 0,
            "endpoint_breakdown": self.metrics["requests_by_endpoint"],
            "error_breakdown": self.metrics["errors_by_type"]
        }

        return summary

    def get_health_status(self) -> Dict[str, Any]:
        """Get system health status"""
        summary = self.get_summary()

        # Define health thresholds
        health_status = "healthy"
        issues = []

        if summary["error_rate"] > 0.05:  # 5% error rate
            health_status = "degraded"
            issues.append("High error rate")

        if summary["p95_latency"] > 5.0:  # 5 second p95 latency
            health_status = "degraded"
            issues.append("High latency")

        return {
            "status": health_status,
            "issues": issues,
            "last_check": datetime.now().isoformat(),
            "metrics": summary
        }

# Initialize monitor
monitor = LLMMonitor()
print("📊 Monitoring system initialized!")

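# Test monitoring
monitor.record_request("/classify", 0.5, True)
monitor.record_request("/classify", 0.3, True)
# record_error() only tracks the error type, so log the failed request as well
monitor.record_request("/classify", 5.0, False)
monitor.record_error("timeout")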

print("📈 Test metrics:")
print(json.dumps(monitor.get_summary(), indent=2))
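
`latency_histogram` above grows without bound, which becomes a memory problem in a long-running service. A minimal sketch of one alternative is a fixed-size window with nearest-rank percentiles. The class name `RollingLatency` and the default window of 1000 samples are assumptions for illustration.

```python
import math
from collections import deque
from typing import Deque


class RollingLatency:
    """Keeps only the most recent `window` latency samples."""

    def __init__(self, window: int = 1000):
        self.samples: Deque[float] = deque(maxlen=window)

    def record(self, latency: float) -> None:
        # Once the window is full, the oldest sample is dropped automatically
        self.samples.append(latency)

    def percentile(self, p: float) -> float:
        """Nearest-rank percentile for 0 < p <= 100; returns 0.0 when empty."""
        if not 0 < p <= 100:
            raise ValueError("p must be in (0, 100]")
        if not self.samples:
            return 0.0
        ordered = sorted(self.samples)
        rank = math.ceil(p * len(ordered) / 100)
        return ordered[rank - 1]


tracker = RollingLatency(window=100)
for ms in range(1, 101):
    tracker.record(float(ms))
print(tracker.percentile(95), tracker.percentile(99))
```

Production systems usually export histogram buckets to a metrics backend rather than sorting raw samples, but a rolling window is enough for an in-process health check.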

## 🚀 Scaling Strategies

Implementing different scaling approaches for production workloads.

In [None]:
# Scaling implementation
class AutoScaler:
    """Intelligent auto-scaling for LLM services"""

    def __init__(self, min_instances: int = 1, max_instances: int = 10):
        self.min_instances = min_instances
        self.max_instances = max_instances
        self.current_instances = min_instances
        self.target_instances = min_instances

        # Scaling thresholds
        self.scale_up_threshold = 0.8  # 80% utilization
        self.scale_down_threshold = 0.3  # 30% utilization
        self.cooldown_period = 300  # 5 minutes
        self.last_scale_time = 0

    def evaluate_scaling(self, current_metrics: Dict[str, Any]) -> Dict[str, Any]:
        """Evaluate if scaling is needed based on current metrics"""

        current_time = time.time()

        # Check cooldown period
        if current_time - self.last_scale_time < self.cooldown_period:
            return {"action": "cooldown", "target_instances": self.current_instances}

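        # Calculate utilization metrics
        # (cpu_percent needs an interval; the first call without one returns a meaningless 0.0)
        cpu_utilization = psutil.cpu_percent(interval=0.1) / 100.0
        memory_utilization = psutil.virtual_memory().percent / 100.0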

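        # GPU utilization, if the optional GPUtil package is installed
        gpu_utilization = None
        try:
            import GPUtil
            gpus = GPUtil.getGPUs()
            if gpus:
                gpu_utilization = gpus[0].load
        except Exception:
            pass

        # Average only the metrics that were actually measured
        readings = [cpu_utilization, memory_utilization]
        if gpu_utilization is not None:
            readings.append(gpu_utilization)
        avg_utilization = sum(readings) / len(readings)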

        # Scaling logic
        if avg_utilization > self.scale_up_threshold:
            self.target_instances = min(self.current_instances + 1, self.max_instances)
            action = "scale_up"
        elif avg_utilization < self.scale_down_threshold:
            self.target_instances = max(self.current_instances - 1, self.min_instances)
            action = "scale_down"
        else:
            action = "maintain"
            self.target_instances = self.current_instances

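        # At the min/max instance limit the target equals the current count, so do nothing
        if self.target_instances == self.current_instances:
            action = "maintain"

        # Execute scaling if needed
        if action in ["scale_up", "scale_down"]:
            self._execute_scaling(action)
            self.last_scale_time = current_time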

        return {
            "action": action,
            "current_instances": self.current_instances,
            "target_instances": self.target_instances,
            "cpu_utilization": cpu_utilization,
            "memory_utilization": memory_utilization,
            "gpu_utilization": gpu_utilization,
            "avg_utilization": avg_utilization
        }

    def _execute_scaling(self, action: str):
        """Execute the scaling action"""
        if action == "scale_up":
            self.current_instances += 1
            print(f"⬆️  Scaled up to {self.current_instances} instances")
        elif action == "scale_down":
            self.current_instances -= 1
            print(f"⬇️  Scaled down to {self.current_instances} instances")

        # In a real implementation, this would:
        # 1. Launch/terminate EC2 instances
        # 2. Update load balancer
        # 3. Reconfigure Kubernetes deployment

# Test auto-scaling
scaler = AutoScaler()

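# Evaluate scaling against this machine's current CPU/memory/GPU load
# (the metrics argument is not used by this simplified implementation)
print("🧪 Testing auto-scaling against current system load...")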
scaling_decision = scaler.evaluate_scaling({})
print(f"Scaling decision: {scaling_decision}")
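
The `AutoScaler` above moves one instance at a time. A proportional rule, similar in spirit to the one used by the Kubernetes Horizontal Pod Autoscaler, instead jumps straight to the instance count that would bring a per-instance metric back to its target. The sketch below assumes the metric is requests per second per instance; the function name and default limits are hypothetical.

```python
import math


def desired_replicas(
    current: int,
    metric_per_instance: float,
    target_per_instance: float,
    min_instances: int = 1,
    max_instances: int = 10,
    tolerance: float = 0.1,
) -> int:
    """Proportional scaling: desired = ceil(current * metric / target), clamped."""
    if current < 1 or target_per_instance <= 0:
        raise ValueError("current must be >= 1 and target_per_instance must be > 0")
    ratio = metric_per_instance / target_per_instance
    # Ignore small deviations from the target to avoid flapping
    if abs(ratio - 1.0) <= tolerance:
        return current
    desired = math.ceil(current * ratio)
    return max(min_instances, min(desired, max_instances))


print(desired_replicas(4, 150.0, 100.0))  # overloaded: scale out
print(desired_replicas(4, 105.0, 100.0))  # within tolerance: no change
print(desired_replicas(4, 20.0, 100.0))   # mostly idle: scale in
```

A cooldown period like the one in `AutoScaler` still applies on top of this rule, since the metric takes time to reflect newly added instances.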

## 🐳 Containerization and Deployment

Creating production-ready Docker containers for LLM services.

In [None]:
# Docker configuration
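# Raw string so the backslash line continuations are written to the file unchanged
dockerfile_content = r'''
# Multi-stage build for optimized LLM service
FROM python:3.10-slim AS base

# Install system dependencies (curl is needed by the HEALTHCHECK below)
RUN apt-get update && apt-get install -y \
    git \
    build-essential \
    curl \
    && rm -rf /var/lib/apt/lists/*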

# Create app directory
WORKDIR /app

# Install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Production stage
FROM base as production

# Copy application code
COPY . .

# Create non-root user
RUN useradd --create-home --shell /bin/bash app \
    && chown -R app:app /app
USER app

# Health check (the start period leaves time for the model to load)
HEALTHCHECK --interval=30s --timeout=30s --start-period=60s --retries=3 \
    CMD curl -f http://localhost:8000/health || exit 1

# Expose port
EXPOSE 8000

# Run application (assumes the FastAPI instance `app` is defined in app.py)
CMD ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "8000"]
'''

# Save Dockerfile
with open('Dockerfile', 'w') as f:
    f.write(dockerfile_content)

print("🐳 Dockerfile created!")
print("\nTo build and run:")
print("docker build -t llm-classification .")
print("docker run -p 8000:8000 llm-classification")

# Docker Compose for multi-service deployment
docker_compose_content = '''
version: '3.8'

services:
  llm-service:
    build: .
    ports:
      - "8000:8000"
    environment:
      - MODEL_NAME=microsoft/DialoGPT-medium
      - CUDA_VISIBLE_DEVICES=0
    deploy:
      resources:
        reservations:
          devices:
            - driver: nvidia
              count: 1
              capabilities: [gpu]
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
      interval: 30s
      timeout: 10s
      retries: 3

  load-balancer:
    image: nginx:alpine
    ports:
      - "80:80"
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf
    depends_on:
      - llm-service

  monitoring:
    image: prom/prometheus
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
'''

# Save Docker Compose
with open('docker-compose.yml', 'w') as f:
    f.write(docker_compose_content)

print("\n🐳 Docker Compose configuration created!")
print("\nTo deploy with orchestration:")
print("docker-compose up -d")
print("docker-compose logs -f")
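
The Compose file sets `MODEL_NAME`, but the service class earlier in this notebook hard-codes its default model. A minimal sketch of reading the setting from the environment is shown below. `ServiceSettings` and `load_settings` are hypothetical names, and the `PORT` variable is an assumption that does not appear in the Compose file.

```python
import os
from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class ServiceSettings:
    model_name: str
    port: int


def load_settings(env: Mapping[str, str] = os.environ) -> ServiceSettings:
    """Read configuration from environment variables, with notebook defaults."""
    return ServiceSettings(
        model_name=env.get("MODEL_NAME", "microsoft/DialoGPT-small"),
        port=int(env.get("PORT", "8000")),  # PORT is an assumed variable name
    )


settings = load_settings({"MODEL_NAME": "microsoft/DialoGPT-medium"})
print(settings)
```

The service could then be created with `ProductionLLMService(load_settings().model_name)` so the same image runs different models without a rebuild.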

## ☁️ Cloud Deployment Strategies

Strategies for deploying LLMs on major cloud platforms.

In [None]:
# Cloud deployment strategies
cloud_strategies = {
    "AWS SageMaker": {
        "services": ["SageMaker Endpoints", "Lambda", "EC2"],
        "scaling": "Auto-scaling groups",
        "cost_optimization": "Spot instances, reserved instances",
        "monitoring": "CloudWatch, X-Ray",
        "best_for": "Enterprise deployments, ML workflows"
    },
    "Google Cloud AI": {
        "services": ["Vertex AI", "Cloud Run", "GKE"],
        "scaling": "Cloud Autoscaler",
        "cost_optimization": "Committed use discounts, preemptible VMs",
        "monitoring": "Cloud Monitoring, Cloud Logging",
        "best_for": "Large-scale AI applications, global distribution"
    },
    "Azure ML": {
        "services": ["Azure ML", "Container Instances", "AKS"],
        "scaling": "Azure Autoscale",
        "cost_optimization": "Reserved instances, spot VMs",
        "monitoring": "Azure Monitor, Application Insights",
        "best_for": "Enterprise integration, hybrid cloud"
    },
    "Serverless": {
        "services": ["Lambda", "Cloud Functions", "Cloud Run"],
        "scaling": "Automatic",
        "cost_optimization": "Pay per request",
        "monitoring": "CloudWatch, Cloud Monitoring",
        "best_for": "Variable load, cost-sensitive applications"
    }
}

# Display cloud comparison
import pandas as pd
df_cloud = pd.DataFrame.from_dict(cloud_strategies, orient='index')
df_cloud.index.name = 'Platform'
print("☁️  Cloud Deployment Strategies:")
print(df_cloud[['scaling', 'cost_optimization', 'best_for']].to_string())

# Cost estimation function
def estimate_cloud_costs(platform: str, instances: int, hours_per_month: int) -> Dict[str, float]:
    """Estimate monthly costs for different cloud platforms"""

    # Illustrative placeholder prices in USD, not current list prices
    base_costs = {
        "AWS": {"gpu_hour": 3.50, "storage_gb_month": 0.023},
        "GCP": {"gpu_hour": 2.50, "storage_gb_month": 0.020},
        "Azure": {"gpu_hour": 3.00, "storage_gb_month": 0.022}
    }

    if platform not in base_costs:
        raise ValueError(f"Platform not supported: {platform}")

    costs = base_costs[platform]
    compute_cost = instances * hours_per_month * costs["gpu_hour"]
    storage_cost = 100 * costs["storage_gb_month"]  # Assume 100GB storage
    total_cost = compute_cost + storage_cost

    return {
        "compute_cost": compute_cost,
        "storage_cost": storage_cost,
        "total_monthly": total_cost,
        # Assumes 0.1s per request with every instance busy 100% of the time
        "cost_per_request": total_cost / (instances * hours_per_month * 3600 / 0.1)
    }
    }

# Test cost estimation
aws_costs = estimate_cloud_costs("AWS", 2, 730)  # 2 instances, ~1 month
print(f"\n💰 AWS Cost Estimate: ${aws_costs['total_monthly']:.2f}/month")
print(f"   Compute: ${aws_costs['compute_cost']:.2f}")
print(f"   Storage: ${aws_costs['storage_cost']:.2f}")
print(f"   Per request: ${aws_costs['cost_per_request']:.6f}")
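
The per-request figure above assumes every instance is busy all the time. Real services idle part of the day, so the effective cost per request is higher. The sketch below makes that explicit with a `utilization` factor; the function name and the example price are placeholders, not quoted cloud prices.

```python
def cost_per_request(gpu_hour_usd: float, requests_per_second: float, utilization: float = 1.0) -> float:
    """Compute cost per request for one instance at a given average utilization."""
    if requests_per_second <= 0:
        raise ValueError("requests_per_second must be positive")
    if not 0 < utilization <= 1:
        raise ValueError("utilization must be in (0, 1]")
    # Requests actually served in one hour of paid instance time
    served_per_hour = requests_per_second * 3600 * utilization
    return gpu_hour_usd / served_per_hour


full = cost_per_request(3.50, 10)
quarter = cost_per_request(3.50, 10, utilization=0.25)
print(f"100% busy: ${full:.6f}  25% busy: ${quarter:.6f}")
```

At 25% utilization each request costs four times as much as at full load, which is the main argument for scaling in during quiet periods.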

## 🎯 Production Optimization Techniques

Advanced techniques for optimizing production LLM deployments.

In [None]:
# Advanced optimization techniques
optimization_techniques = {
    "Model Optimization": {
        "quantization": "Reduce precision (FP16, INT8, INT4)",
        "pruning": "Remove unnecessary weights",
        "distillation": "Train smaller model to mimic larger one",
        "sparsification": "Make weight matrix sparse"
    },
    "Inference Optimization": {
        "continuous_batching": "Admit new requests into the running batch as earlier ones finish",
        "kv_caching": "Reuse attention keys/values from earlier tokens instead of recomputing them",
        "parallel_decoding": "Decode multiple sequences in parallel",
        "speculative_decoding": "Small draft model proposes tokens that the large model verifies"
    },
    "System Optimization": {
        "memory_pooling": "Pre-allocate memory buffers",
        "cpu_offloading": "Move weights or cache to CPU memory when GPU memory is tight",
        "model_sharding": "Split model across multiple GPUs",
        "pipeline_parallelism": "Place consecutive groups of layers on different GPUs"
    },
    "Request Optimization": {
        "request_batching": "Group multiple requests together",
        "priority_queues": "Process urgent requests first",
        "caching": "Cache frequent queries and responses",
        "compression": "Compress request/response data"
    }
}

print("⚡ Production Optimization Techniques:")
print("=" * 50)

for category, techniques in optimization_techniques.items():
    print(f"\n🔧 {category}:")
    for technique, description in techniques.items():
        print(f"   • {technique}: {description}")

# Performance benchmarking function
def benchmark_optimization(technique: str, baseline_metrics: Dict[str, float]) -> Dict[str, float]:
    """Benchmark the impact of an optimization technique"""

    # Illustrative multipliers, not measured results (values above 1.0 increase the metric)
    improvements = {
        "quantization_fp16": {"latency": 0.9, "memory": 0.5, "throughput": 1.1},
        "continuous_batching": {"latency": 1.2, "memory": 1.0, "throughput": 3.0},
        "kv_caching": {"latency": 0.8, "memory": 1.1, "throughput": 1.5},
        "model_pruning": {"latency": 0.95, "memory": 0.7, "throughput": 1.05}
    }

    if technique not in improvements:
        return baseline_metrics

    optimized = {}
    for metric, baseline_value in baseline_metrics.items():
        if metric in improvements[technique]:
            optimized[metric] = baseline_value * improvements[technique][metric]
        else:
            optimized[metric] = baseline_value

    return optimized

# Test optimization benchmarking
baseline = {"latency": 1.0, "memory": 100, "throughput": 10}
optimized = benchmark_optimization("continuous_batching", baseline)

print(f"\n📊 Optimization Impact Example:")
print(f"Baseline: {baseline}")
print(f"With continuous batching: {optimized}")
print(f"Improvement: {optimized['throughput'] / baseline['throughput']:.1f}x throughput")
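
The throughput multiplier above is a made-up constant. To see where the gain from continuous batching comes from, the sketch below counts decode steps in a toy model: each request needs a given number of output tokens, and each step produces one token for every request in the batch. This is a simplified simulation under those assumptions, not how vLLM or any specific server schedules work.

```python
from collections import deque
from typing import List


def static_batching_steps(lengths: List[int], batch_size: int) -> int:
    """Each batch runs until its longest request finishes."""
    total = 0
    for i in range(0, len(lengths), batch_size):
        total += max(lengths[i:i + batch_size])
    return total


def continuous_batching_steps(lengths: List[int], batch_size: int) -> int:
    """A freed slot is refilled from the queue before the next decode step."""
    waiting = deque(lengths)
    active: List[int] = []  # remaining tokens for each running request
    steps = 0
    while waiting or active:
        # Admit waiting requests into any free slots
        while waiting and len(active) < batch_size:
            active.append(waiting.popleft())
        steps += 1
        # Every active request emits one token; finished requests leave the batch
        active = [remaining - 1 for remaining in active if remaining - 1 > 0]
    return steps


lengths = [2, 8, 2, 8, 2, 2]  # output tokens needed per request
print("static:", static_batching_steps(lengths, batch_size=2))
print("continuous:", continuous_batching_steps(lengths, batch_size=2))
```

With these lengths, static batching needs 18 steps and continuous batching needs 12, because short requests no longer wait for the longest request in their batch.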

## 📚 Key Takeaways

1. **Architecture Selection**: Choose the right deployment architecture based on scale and requirements
2. **FastAPI Implementation**: Build robust, async APIs with proper error handling
3. **Monitoring**: Implement comprehensive observability for production systems
4. **Auto-scaling**: Use intelligent scaling based on utilization metrics
5. **Containerization**: Docker and Docker Compose for consistent deployments
6. **Cloud Optimization**: Leverage cloud-specific features for cost and performance
7. **Optimization**: Multiple techniques for maximizing throughput and minimizing latency

## 🚀 Next Steps

Now that you understand production deployment, proceed to:
- **Notebook 5**: Evaluation, Benchmarking, and Ethics

## 🔗 Additional Resources

- [FastAPI Documentation](https://fastapi.tiangolo.com/)
- [Kubernetes for ML](https://kubernetes.io/docs/concepts/workloads/)
- [AWS SageMaker](https://docs.aws.amazon.com/sagemaker/)
- [vLLM Production Guide](https://vllm.readthedocs.io/en/latest/serving/index.html)

## 🎯 Hands-on Exercises

1. **API Development**: Build a complete FastAPI service for LLM classification
2. **Docker Deployment**: Containerize your LLM service and deploy it
3. **Monitoring Setup**: Implement comprehensive monitoring and alerting
4. **Auto-scaling**: Configure auto-scaling based on custom metrics
5. **Cloud Deployment**: Deploy to AWS/GCP/Azure with proper security
6. **Performance Optimization**: Apply multiple optimization techniques and measure impact
7. **Load Testing**: Use tools like Locust to test your deployment under load
8. **A/B Testing**: Implement model versioning and gradual rollouts

## 🎉 Conclusion

You've now mastered production deployment and scaling for LLM systems! Key achievements:
- ✅ Understanding production architecture patterns
- ✅ Building robust FastAPI services
- ✅ Implementing comprehensive monitoring
- ✅ Configuring auto-scaling strategies
- ✅ Containerizing applications with Docker
- ✅ Optimizing for cloud deployments
- ✅ Applying advanced optimization techniques

Ready to move on to evaluation and ethics! 🚀