# Model Deployment and Serving Guide

## Introduction

This tutorial walks through deploying and serving the Mini Transformer and Advanced Transformer models in production environments. We'll cover containerization with Docker, API development with FastAPI, Kubernetes deployment, performance optimization, load testing, and monitoring.

### What You'll Learn
- Containerizing models with Docker for consistent deployment
- Building REST APIs with FastAPI for model serving
- Performance optimization techniques for production
- Load testing and benchmarking
- Monitoring and logging strategies
- Scaling considerations for high-traffic applications

In [None]:
# Import required libraries
import torch
import torch.nn as nn
import sys
from pathlib import Path
import time
import numpy as np
import json
import os

# Add project root to path (this notebook lives one level below the root;
# Path('.').parent would just return '.' again)
sys.path.append(str(Path('..').resolve()))

# Import our model implementations
from src.model.mini_transformer import MiniTransformer, MiniTransformerConfig
from src.model.advanced_transformer import AdvancedTransformer, AdvancedTransformerConfig

# Set random seed for reproducibility
torch.manual_seed(42)
np.random.seed(42)

print(f"PyTorch version: {torch.__version__}")
print(f"CUDA available: {torch.cuda.is_available()}")

## 1. Docker Containerization Overview

Containerization with Docker ensures consistent deployment across different environments. Let's examine the Docker configuration files in our project.

In [None]:
# Examine Docker configuration files
docker_dir = "../environment/docker"
print(f"Docker configuration directory: {docker_dir}")

# List Docker files
docker_files = []
if os.path.exists(docker_dir):
    docker_files = [f for f in os.listdir(docker_dir) if f.startswith("Dockerfile")]
    print(f"\nDocker files found:")
    for docker_file in docker_files:
        print(f"  - {docker_file}")
else:
    print(f"\nDocker directory not found: {docker_dir}")

# Show content of training Dockerfile as an example
train_dockerfile = os.path.join(docker_dir, "Dockerfile.train")
if os.path.exists(train_dockerfile):
    print(f"\nContent of Dockerfile.train (first 20 lines):")
    with open(train_dockerfile, 'r') as f:
        lines = f.readlines()
        for i, line in enumerate(lines[:20]):
            print(f"  {i+1:2d}: {line.rstrip()}")
        if len(lines) > 20:
            print(f"  ... ({len(lines) - 20} more lines)")

## 2. FastAPI Application Development

Let's create a FastAPI application for serving our models. This will provide a REST API interface for model inference.

In [None]:
# Create a simple FastAPI application for model serving
fastapi_code = '''
"""
FastAPI Application for Transformer Model Serving
===============================================

This module provides a REST API for serving transformer models.
"""

import torch
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import List, Optional
import time
import os
import sys
from pathlib import Path

# Add project root to path (src/api/main.py -> project root is two levels above this file's directory)
sys.path.append(str(Path(__file__).resolve().parents[2]))

from src.model.mini_transformer import MiniTransformer, MiniTransformerConfig
from src.model.advanced_transformer import AdvancedTransformer, AdvancedTransformerConfig

# Initialize FastAPI app
app = FastAPI(
    title="Transformer Model API",
    description="API for serving Mini Transformer and Advanced Transformer models",
    version="1.0.0"
)

# Global model variables (populated by the startup handler)
mini_model = None
advanced_model = None
device = None
tokenizer = None

# Request/Response Models
class TextInput(BaseModel):
    text: str
    max_length: Optional[int] = 50
    temperature: Optional[float] = 1.0

class TextOutput(BaseModel):
    generated_text: str
    inference_time_ms: float
    input_tokens: int
    output_tokens: int

class ModelInfo(BaseModel):
    model_name: str
    parameters: int
    device: str
    status: str

# Simple tokenizer for demonstration
class SimpleTokenizer:
    def __init__(self, vocab_size=1000):
        self.vocab_size = vocab_size
        self.pad_token_id = 0
        self.bos_token_id = 1
        self.eos_token_id = 2
        
        # Simple vocabulary mapping
        self.vocab = {
            \'<PAD>\': self.pad_token_id,
            \'<BOS>\': self.bos_token_id,
            \'<EOS>\': self.eos_token_id,
        }
        
        # Add some sample words
        words = [
            \'the\', \'of\', \'and\', \'a\', \'to\', \'in\', \'is\', \'you\', \'that\', \'it\',
            \'he\', \'was\', \'for\', \'on\', \'are\', \'as\', \'with\', \'his\', \'they\', \'i\',
            \'at\', \'be\', \'this\', \'have\', \'from\', \'or\', \'one\', \'had\', \'by\', \'word\',
            \'but\', \'not\', \'what\', \'all\', \'were\', \'we\', \'when\', \'your\', \'can\', \'said\'
        ]
        
        for i, word in enumerate(words):
            if i + 3 < self.vocab_size:
                self.vocab[word] = i + 3
        
        self.id_to_token = {v: k for k, v in self.vocab.items()}
    
    def encode(self, text: str, max_length: Optional[int] = None) -> List[int]:
        tokens = [self.bos_token_id]
        words = text.lower().split()
        
        for word in words:
            word = word.strip(\'.,!?;:\')
            if word in self.vocab:
                tokens.append(self.vocab[word])
            else:
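                # Unknown word: id 3 is taken by the first sample word, so use the
                # last vocabulary id, which the sample words never reach; it decodes as <UNK>
                tokens.append(self.vocab_size - 1)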
        
        tokens.append(self.eos_token_id)
        
        if max_length and len(tokens) > max_length:
            tokens = tokens[:max_length]
        elif max_length and len(tokens) < max_length:
            tokens.extend([self.pad_token_id] * (max_length - len(tokens)))
        
        return tokens
    
    def decode(self, token_ids: List[int]) -> str:
        words = []
        for token_id in token_ids:
            if token_id == self.eos_token_id:
                break
            if token_id != self.bos_token_id and token_id != self.pad_token_id:
                word = self.id_to_token.get(token_id, \'<UNK>\')
                words.append(word)
        return \' \'.join(words)

# Initialize models (recent FastAPI releases deprecate on_event in favor of lifespan handlers)
@app.on_event("startup")
async def load_models():
    """Load models on startup"""
    global mini_model, advanced_model, device, tokenizer
    
    # Set device
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"Using device: {device}")
    
    # Initialize tokenizer
    tokenizer = SimpleTokenizer()
    
    # Load Mini Transformer
    try:
        mini_config = MiniTransformerConfig(
            vocab_size=1000,
            hidden_size=128,
            num_attention_heads=4,
            num_hidden_layers=4,
            intermediate_size=256,
            max_position_embeddings=64,
            dropout_prob=0.0,
            use_cuda=torch.cuda.is_available(),
            use_cudnn=True
        )
        mini_model = MiniTransformer(mini_config)
        mini_model.to(device)
        mini_model.eval()
        print("Mini Transformer loaded successfully")
    except Exception as e:
        print(f"Failed to load Mini Transformer: {e}")
        mini_model = None
    
    # Load Advanced Transformer
    try:
        advanced_config = AdvancedTransformerConfig(
            hidden_size=512,
            num_attention_heads=8,
            num_hidden_layers=6,
            intermediate_size=2048,
            max_position_embeddings=512,
            num_modalities=4,
            gpu_acceleration_units=16,
            spiking_neurons=False,  # Simplify for demo
            continuous_learning=False,
            use_cuda=torch.cuda.is_available(),
            use_cudnn=True
        )
        advanced_model = AdvancedTransformer(advanced_config)
        advanced_model.to(device)
        advanced_model.eval()
        print("Advanced Transformer loaded successfully")
    except Exception as e:
        print(f"Failed to load Advanced Transformer: {e}")
        advanced_model = None

# Health check endpoint
@app.get("/health", response_model=ModelInfo)
async def health_check():
    """Health check endpoint"""
    if mini_model is not None:
        params = sum(p.numel() for p in mini_model.parameters())
        return ModelInfo(
            model_name="MiniTransformer",
            parameters=params,
            device=str(device),
            status="loaded"
        )
    else:
        return ModelInfo(
            model_name="MiniTransformer",
            parameters=0,
            device=str(device),
            status="not loaded"
        )

# Model information endpoint
@app.get("/models", response_model=List[ModelInfo])
async def list_models():
    """List available models"""
    models = []
    
    if mini_model is not None:
        mini_params = sum(p.numel() for p in mini_model.parameters())
        models.append(ModelInfo(
            model_name="MiniTransformer",
            parameters=mini_params,
            device=str(device),
            status="loaded"
        ))
    
    if advanced_model is not None:
        advanced_params = sum(p.numel() for p in advanced_model.parameters())
        models.append(ModelInfo(
            model_name="AdvancedTransformer",
            parameters=advanced_params,
            device=str(device),
            status="loaded"
        ))
    
    return models

# Text generation endpoint for Mini Transformer
@app.post("/generate/mini", response_model=TextOutput)
async def generate_text_mini(input: TextInput):
    """Generate text using Mini Transformer"""
    if mini_model is None:
        raise HTTPException(status_code=503, detail="Mini Transformer not loaded")
    
    try:
        # Encode input text
        input_tokens = tokenizer.encode(input.text, max_length=32)
        input_ids = torch.tensor([input_tokens], dtype=torch.long).to(device)
        
        # Generate text
        start_time = time.time()
        with torch.no_grad():
            generated_ids = mini_model.generate(
                input_ids,
                max_length=input.max_length,
                temperature=input.temperature
            )
        end_time = time.time()
        
        # Decode output
        generated_text = tokenizer.decode(generated_ids[0].cpu().tolist())
        
        return TextOutput(
            generated_text=generated_text,
            inference_time_ms=(end_time - start_time) * 1000,
            input_tokens=len(input_tokens),
            output_tokens=generated_ids.shape[1]
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Generation failed: {str(e)}")

# Text generation endpoint for Advanced Transformer
@app.post("/generate/advanced", response_model=TextOutput)
async def generate_text_advanced(input: TextInput):
    """Generate text using Advanced Transformer"""
    if advanced_model is None:
        raise HTTPException(status_code=503, detail="Advanced Transformer not loaded")
    
    try:
        # Encode input text
        input_tokens = tokenizer.encode(input.text, max_length=32)
        input_ids = torch.tensor([input_tokens], dtype=torch.long).to(device)
        
        # Generate text
        start_time = time.time()
        with torch.no_grad():
            generated_ids = advanced_model.generate(
                input_ids,
                max_length=input.max_length,
                temperature=input.temperature
            )
        end_time = time.time()
        
        # Decode output
        generated_text = tokenizer.decode(generated_ids[0].cpu().tolist())
        
        return TextOutput(
            generated_text=generated_text,
            inference_time_ms=(end_time - start_time) * 1000,
            input_tokens=len(input_tokens),
            output_tokens=generated_ids.shape[1]
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Generation failed: {str(e)}")

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)
'''

# Save the FastAPI application
api_dir = "../src/api"
os.makedirs(api_dir, exist_ok=True)
api_file = os.path.join(api_dir, "main.py")

with open(api_file, 'w') as f:
    f.write(fastapi_code)

print(f"FastAPI application created at: {api_file}")
print(f"\nTo run the API server (from the project root, so that `src` is importable):")
print(f"  uvicorn src.api.main:app --reload")
print(f"\nAPI will be available at: http://localhost:8000")
print(f"API documentation: http://localhost:8000/docs")
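
Once the server is running, the generation endpoints can be exercised from a small client. The sketch below uses only the standard library and assumes the server listens on `http://localhost:8000`, as printed above. `build_generate_request` and `generate` are hypothetical helper names introduced for illustration; the payload fields mirror the `TextInput` model.

```python
import json
import urllib.request

API_URL = "http://localhost:8000"  # assumption: default host and port from the cell above


def build_generate_request(text, max_length=50, temperature=1.0, model="mini"):
    """Build a POST request whose JSON body matches the TextInput schema."""
    payload = {"text": text, "max_length": max_length, "temperature": temperature}
    return urllib.request.Request(
        url=f"{API_URL}/generate/{model}",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def generate(text, **kwargs):
    """Send the request and return the TextOutput fields as a dict."""
    request = build_generate_request(text, **kwargs)
    with urllib.request.urlopen(request, timeout=30) as response:
        return json.loads(response.read().decode("utf-8"))
```

With the server up, `generate("the future of", max_length=30, temperature=0.8)` should return a dict with the `generated_text`, `inference_time_ms`, `input_tokens`, and `output_tokens` fields defined by `TextOutput`.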

## 3. Creating a Serving Dockerfile

Let's create a Dockerfile specifically for serving our models via the FastAPI application.

In [None]:
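# Create a Dockerfile for model serving
# (raw string, so the trailing backslashes are written out as Dockerfile line continuations
# instead of being consumed by Python)
serving_dockerfile_content = r'''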
# Serving Dockerfile with CUDA support
FROM nvcr.io/nvidia/pytorch:23.08-py3

# Set working directory
WORKDIR /app

# Set environment variables
ENV PYTHONDONTWRITEBYTECODE=1
ENV PYTHONUNBUFFERED=1
ENV NVIDIA_VISIBLE_DEVICES=all
ENV NVIDIA_DRIVER_CAPABILITIES=compute,utility

# Install additional system dependencies
RUN apt-get update && apt-get install -y \
    build-essential \
    curl \
    git \
    wget \
    ca-certificates \
    && rm -rf /var/lib/apt/lists/*

# Upgrade pip
RUN pip install --no-cache-dir --upgrade pip

# Copy requirements file
COPY requirements.txt .

# Install Python dependencies
RUN pip install --no-cache-dir -r requirements.txt

# Install additional tools for serving
RUN pip install --no-cache-dir \
    fastapi \
    uvicorn \
    pydantic \
    python-multipart

# Create directories for models and data
RUN mkdir -p models data logs

# Copy project files
COPY . .

# Set proper permissions
RUN chmod -R 755 /app

# Create a non-root user that owns the application directory
# (no sudo group membership: the server process should not be able to escalate privileges)
RUN useradd --create-home --shell /bin/bash --uid 1000 server && \
    chown -R server:server /app

# Switch to non-root user
USER server

# Expose port for API
EXPOSE 8000

# Set entrypoint for serving
ENTRYPOINT ["uvicorn", "src.api.main:app", "--host", "0.0.0.0", "--port", "8000"]
'''

# Save the serving Dockerfile (create the directory first in case it does not exist yet)
os.makedirs(docker_dir, exist_ok=True)
serving_dockerfile = os.path.join(docker_dir, "Dockerfile.serve")

with open(serving_dockerfile, 'w') as f:
    f.write(serving_dockerfile_content)

print(f"Serving Dockerfile created at: {serving_dockerfile}")
print(f"\nTo build the serving image:")
print(f"  docker build -f environment/docker/Dockerfile.serve -t transformer-serve .")
print(f"\nTo run the serving container:")
print(f"  docker run -p 8000:8000 --gpus all transformer-serve")

## 4. Kubernetes Deployment Configuration

For production deployments, Kubernetes provides orchestration capabilities. Let's create Kubernetes deployment configurations.

In [None]:
# Create Kubernetes deployment configuration
k8s_dir = "../environment/k8s"
os.makedirs(k8s_dir, exist_ok=True)

# Deployment YAML
deployment_yaml = '''
apiVersion: apps/v1
kind: Deployment
metadata:
  name: transformer-api
  labels:
    app: transformer-api
spec:
  replicas: 3
  selector:
    matchLabels:
      app: transformer-api
  template:
    metadata:
      labels:
        app: transformer-api
    spec:
      containers:
      - name: transformer-api
        image: transformer-serve:latest
        ports:
        - containerPort: 8000
        resources:
          requests:
            memory: "2Gi"
            cpu: "1"
            nvidia.com/gpu: 1
          limits:
            memory: "4Gi"
            cpu: "2"
            nvidia.com/gpu: 1
        env:
        - name: NVIDIA_VISIBLE_DEVICES
          value: "all"
        - name: NVIDIA_DRIVER_CAPABILITIES
          value: "compute,utility"
      tolerations:
      - key: "nvidia.com/gpu"
        operator: "Exists"
        effect: "NoSchedule"
'''

# Service YAML
service_yaml = '''
apiVersion: v1
kind: Service
metadata:
  name: transformer-api-service
spec:
  selector:
    app: transformer-api
  ports:
    - protocol: TCP
      port: 80
      targetPort: 8000
  type: LoadBalancer
'''

# Save Kubernetes configurations
deployment_file = os.path.join(k8s_dir, "deployment.yaml")
service_file = os.path.join(k8s_dir, "service.yaml")

with open(deployment_file, 'w') as f:
    f.write(deployment_yaml)

with open(service_file, 'w') as f:
    f.write(service_yaml)

print(f"Kubernetes configurations created:")
print(f"  Deployment: {deployment_file}")
print(f"  Service: {service_file}")
print(f"\nTo deploy to Kubernetes:")
print(f"  kubectl apply -f environment/k8s/")

## 5. Performance Optimization Techniques

Let's explore various performance optimization techniques for model serving.
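
As one concrete instance of these techniques, repeated prompts can be served from a small in-process cache. The sketch below is a minimal LRU cache built on `collections.OrderedDict`. `LRUCache` and `cache_key` are hypothetical names, and the approach assumes deterministic decoding: with sampling at a non-zero temperature, returning a cached response changes behavior.

```python
from collections import OrderedDict


class LRUCache:
    """Small in-process cache that evicts the least recently used entry."""

    def __init__(self, capacity=1024):
        self.capacity = capacity
        self._entries = OrderedDict()

    def get(self, key):
        if key not in self._entries:
            return None
        self._entries.move_to_end(key)  # mark as most recently used
        return self._entries[key]

    def put(self, key, value):
        self._entries[key] = value
        self._entries.move_to_end(key)
        if len(self._entries) > self.capacity:
            self._entries.popitem(last=False)  # drop the least recently used entry


def cache_key(text, max_length, temperature):
    """Two requests are interchangeable only if every generation parameter matches."""
    # The demo tokenizer lowercases its input, so the key is lowercased as well.
    return (text.strip().lower(), max_length, temperature)
```

A handler would call `cache.get(cache_key(...))` before running the model and `cache.put(...)` afterwards. Each server process holds its own cache, so a shared store such as Redis is the usual choice once several replicas are running.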

In [None]:
print("Performance Optimization Techniques for Model Serving:")
print("=" * 50)

# 1. Model Quantization
print("\n1. Model Quantization:")
print("   - Convert FP32 models to INT8 for reduced memory usage")
print("   - Use PyTorch's quantization tools:")
print("     import torch.quantization")
print("     model_quantized = torch.quantization.quantize_dynamic(model, {torch.nn.Linear}, dtype=torch.qint8)")

# 2. Model Compilation
print("\n2. Model Compilation:")
print("   - Use TorchScript for optimization:")
print("     scripted_model = torch.jit.script(model)")
print("   - Use ONNX for cross-platform deployment")

# 3. Batching
print("\n3. Batching Strategies:")
print("   - Process multiple requests together")
print("   - Dynamic batching based on request arrival")
print("   - Static batching for predictable workloads")

# 4. Caching
print("\n4. Caching Mechanisms:")
print("   - Cache frequent requests and responses")
print("   - Use Redis or similar for distributed caching")
print("   - Implement LRU cache for recent results")

# 5. Asynchronous Processing
print("\n5. Asynchronous Processing:")
print("   - Use async/await in FastAPI")
print("   - Implement request queues for heavy processing")
print("   - Use background tasks for non-critical operations")

# 6. Load Balancing
print("\n6. Load Balancing:")
print("   - Use NGINX or similar for request distribution")
print("   - Implement health checks for automatic failover")
print("   - Use Kubernetes services for built-in load balancing")

# 7. Monitoring and Metrics
print("\n7. Monitoring and Metrics:")
print("   - Track inference latency and throughput")
print("   - Monitor GPU and memory utilization")
print("   - Use Prometheus and Grafana for visualization")
print("   - Implement logging for debugging and audit")
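
The dynamic batching and asynchronous processing items above can be combined in one small component. The sketch below queues incoming requests, groups whatever arrives within a short window, and runs the group through a single batched call in a worker thread so the event loop stays responsive. `MicroBatcher`, `batch_fn`, and `fake_model` are hypothetical names; `batch_fn` stands in for a batched model call, which the model classes in this project may or may not provide.

```python
import asyncio


class MicroBatcher:
    """Collect concurrent requests and run them through batch_fn together."""

    def __init__(self, batch_fn, max_batch_size=8, max_wait_s=0.01):
        self.batch_fn = batch_fn            # sync callable: list of inputs -> list of outputs
        self.max_batch_size = max_batch_size
        self.max_wait_s = max_wait_s        # how long to wait for more requests
        self.queue = asyncio.Queue()

    async def submit(self, item):
        """Called by a request handler; resolves when the item's batch finishes."""
        future = asyncio.get_running_loop().create_future()
        await self.queue.put((item, future))
        return await future

    async def run(self):
        """Background task: drain the queue in batches."""
        loop = asyncio.get_running_loop()
        while True:
            batch = [await self.queue.get()]
            deadline = loop.time() + self.max_wait_s
            while len(batch) < self.max_batch_size:
                timeout = deadline - loop.time()
                if timeout <= 0:
                    break
                try:
                    batch.append(await asyncio.wait_for(self.queue.get(), timeout))
                except asyncio.TimeoutError:
                    break
            inputs = [item for item, _ in batch]
            try:
                # Run the blocking call in a worker thread, not on the event loop
                outputs = await loop.run_in_executor(None, self.batch_fn, inputs)
                for (_, future), output in zip(batch, outputs):
                    future.set_result(output)
            except Exception as exc:
                for _, future in batch:
                    if not future.done():
                        future.set_exception(exc)


async def demo():
    batch_sizes = []

    def fake_model(prompts):
        # Stand-in for a batched forward pass
        batch_sizes.append(len(prompts))
        return [p.upper() for p in prompts]

    batcher = MicroBatcher(fake_model, max_batch_size=4, max_wait_s=0.05)
    worker = asyncio.create_task(batcher.run())
    results = await asyncio.gather(*(batcher.submit(f"prompt {i}") for i in range(6)))
    worker.cancel()
    return results, batch_sizes
```

`max_wait_s` trades latency for throughput: a longer window fills larger batches but delays the first request in each batch by up to that amount.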

## 6. Load Testing and Benchmarking

Let's create a simple load testing script to benchmark our API.

In [None]:
# Create a load testing script
load_test_code = '''
"""
Load Testing Script for Transformer API
=====================================

This script performs load testing on the transformer API.
"""

import asyncio
import aiohttp
import time
import statistics
from typing import List, Optional

API_URL = "http://localhost:8000"

async def send_request(session, prompt: str) -> Optional[float]:
    """Send a single request; return the response time in seconds, or None on failure"""
    start_time = time.perf_counter()
    try:
        async with session.post(
            f"{API_URL}/generate/mini",
            json={
                "text": prompt,
                "max_length": 30,
                "temperature": 0.8
            }
        ) as response:
            # Count HTTP error statuses (e.g. 500) as failures rather than successes
            response.raise_for_status()
            await response.json()
    except Exception as e:
        print(f"Request failed: {e}")
        return None
    
    return time.perf_counter() - start_time

async def load_test(concurrent_requests: int = 10, total_requests: int = 100):
    """Perform load testing with specified concurrency"""
    # Sample prompts for testing
    prompts = [
        "The future of artificial intelligence",
        "Machine learning is transforming",
        "Natural language processing enables",
        "Deep learning models can",
        "Ethical AI development requires"
    ]
    
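    # Cap the number of in-flight requests at `concurrent_requests`;
    # without this, every request is fired at once and the parameter has no effect
    semaphore = asyncio.Semaphore(concurrent_requests)
    
    async def bounded_request(session, prompt: str):
        async with semaphore:
            return await send_request(session, prompt)
    
    async with aiohttp.ClientSession() as session:
        # Create tasks for all requests
        tasks = []
        for i in range(total_requests):
            prompt = prompts[i % len(prompts)]
            tasks.append(bounded_request(session, prompt))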
        
        # Execute requests
        start_time = time.time()
        results = await asyncio.gather(*tasks, return_exceptions=True)
        total_time = time.time() - start_time
        
        # Filter out failed requests
        response_times = [r for r in results if isinstance(r, float)]
        
        # Calculate statistics
        if response_times:
            avg_response_time = statistics.mean(response_times)
            median_response_time = statistics.median(response_times)
            min_response_time = min(response_times)
            max_response_time = max(response_times)
            
            print(f"Load Test Results:")
            print(f"  Total requests: {total_requests}")
            print(f"  Concurrent requests: {concurrent_requests}")
            print(f"  Successful requests: {len(response_times)}")
            print(f"  Failed requests: {total_requests - len(response_times)}")
            print(f"  Total test time: {total_time:.2f} seconds")
            print(f"  Requests per second: {total_requests / total_time:.2f}")
            print(f"  Average response time: {avg_response_time*1000:.2f} ms")
            print(f"  Median response time: {median_response_time*1000:.2f} ms")
            print(f"  Min response time: {min_response_time*1000:.2f} ms")
            print(f"  Max response time: {max_response_time*1000:.2f} ms")
        else:
            print("All requests failed!")

if __name__ == "__main__":
    print("Starting load test...")
    asyncio.run(load_test(concurrent_requests=5, total_requests=20))
'''

# Save the load testing script
test_dir = "../tests"
os.makedirs(test_dir, exist_ok=True)
load_test_file = os.path.join(test_dir, "load_test.py")

with open(load_test_file, 'w') as f:
    f.write(load_test_code)

print(f"Load testing script created at: {load_test_file}")
print(f"\nTo run the load test (after starting the API server):")
print(f"  python tests/load_test.py")
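
Mean and median hide tail latency, which is usually what a latency SLA is written against. The sketch below adds p95 and p99 using `statistics.quantiles` from the standard library (Python 3.8+). `latency_summary` is a hypothetical helper that takes the same list of per-request times, in seconds, that the load test collects.

```python
import statistics


def latency_summary(response_times_s):
    """Summarize response times (seconds) in milliseconds, including tail percentiles."""
    if len(response_times_s) < 2:
        raise ValueError("need at least two samples to estimate percentiles")
    times_ms = sorted(t * 1000 for t in response_times_s)
    # quantiles(n=100) returns the 99 cut points p1..p99
    cuts = statistics.quantiles(times_ms, n=100, method="inclusive")
    return {
        "mean_ms": statistics.mean(times_ms),
        "p50_ms": cuts[49],
        "p95_ms": cuts[94],
        "p99_ms": cuts[98],
        "max_ms": times_ms[-1],
    }
```

With only 20 requests, as in the default run above, the p99 estimate is dominated by the single slowest sample, so larger runs are needed before the tail numbers mean much.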

## 7. Monitoring and Logging

Let's enhance our FastAPI application with monitoring and logging capabilities.

In [None]:
# Enhanced FastAPI code with monitoring and logging
enhanced_fastapi_code = '''
"""
Enhanced FastAPI Application with Monitoring and Logging
=======================================================

This module provides a REST API for serving transformer models with enhanced monitoring.
"""

import torch
import logging
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from typing import List, Optional
import time
import os
import sys
from pathlib import Path
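import psutil  # third-party dependency: pip install psutil

# Add project root to path (src/api/main.py -> project root is two levels above this file's directory)
sys.path.append(str(Path(__file__).resolve().parents[2]))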

from src.model.mini_transformer import MiniTransformer, MiniTransformerConfig

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Initialize FastAPI app
app = FastAPI(
    title="Transformer Model API",
    description="API for serving transformer models with monitoring",
    version="1.0.0"
)

# Global variables
mini_model = None
device = None
request_count = 0
total_inference_time = 0.0

# Request/Response Models
class TextInput(BaseModel):
    text: str
    max_length: Optional[int] = 50
    temperature: Optional[float] = 1.0

class TextOutput(BaseModel):
    generated_text: str
    inference_time_ms: float
    input_tokens: int
    output_tokens: int

class HealthInfo(BaseModel):
    status: str
    model_loaded: bool
    device: str
    request_count: int
    average_response_time_ms: Optional[float]
    cpu_percent: float
    memory_percent: float

# Simple tokenizer
class SimpleTokenizer:
    def __init__(self, vocab_size=1000):
        self.vocab_size = vocab_size
        self.pad_token_id = 0
        self.bos_token_id = 1
        self.eos_token_id = 2
        
        self.vocab = {\'<PAD>\': self.pad_token_id, \'<BOS>\': self.bos_token_id, \'<EOS>\': self.eos_token_id}
        words = [\'the\', \'of\', \'and\', \'a\', \'to\', \'in\', \'is\', \'you\', \'that\', \'it\']
        for i, word in enumerate(words):
            if i + 3 < self.vocab_size:
                self.vocab[word] = i + 3
        self.id_to_token = {v: k for k, v in self.vocab.items()}
    
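    def encode(self, text: str, max_length: Optional[int] = None) -> List[int]:
        # Unknown words map to the last vocabulary id, which the sample words never reach
        # (id 3 is already taken by the first sample word)
        unk_id = self.vocab_size - 1
        words = text.lower().split()
        if max_length:
            words = words[:max(max_length - 2, 0)]  # leave room for BOS and EOS
        tokens = [self.bos_token_id] + [self.vocab.get(w, unk_id) for w in words] + [self.eos_token_id]
        if max_length and len(tokens) < max_length:
            tokens.extend([self.pad_token_id] * (max_length - len(tokens)))
        return tokens[:max_length] if max_length else tokens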
    
    def decode(self, token_ids: List[int]) -> str:
        return \' \'.join([self.id_to_token.get(t, \'<UNK>\') for t in token_ids if t not in [self.bos_token_id, self.eos_token_id, self.pad_token_id]])

# Middleware for logging requests
@app.middleware("http")
async def log_requests(request: Request, call_next):
    start_time = time.time()
    response = await call_next(request)
    process_time = time.time() - start_time
    logger.info(f"{request.method} {request.url.path} - Status: {response.status_code} - Time: {process_time:.4f}s")
    return response

# Initialize models
@app.on_event("startup")
async def load_models():
    """Load models on startup"""
    global mini_model, device, tokenizer
    
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    logger.info(f"Using device: {device}")
    
    tokenizer = SimpleTokenizer()
    
    try:
        config = MiniTransformerConfig(vocab_size=1000, hidden_size=128, num_attention_heads=4, num_hidden_layers=4)
        mini_model = MiniTransformer(config)
        mini_model.to(device)
        mini_model.eval()
        logger.info("Mini Transformer loaded successfully")
    except Exception as e:
        logger.error(f"Failed to load Mini Transformer: {e}")
        mini_model = None

# Health check endpoint
@app.get("/health", response_model=HealthInfo)
async def health_check():
    """Enhanced health check with system metrics"""
    cpu_percent = psutil.cpu_percent()
    memory_percent = psutil.virtual_memory().percent
    
    # total_inference_time is accumulated in seconds; the response field is in milliseconds
    avg_response_time = (total_inference_time / request_count) * 1000 if request_count > 0 else None
    
    return HealthInfo(
        status="healthy" if mini_model is not None else "unhealthy",
        model_loaded=mini_model is not None,
        device=str(device),
        request_count=request_count,
        average_response_time_ms=avg_response_time,
        cpu_percent=cpu_percent,
        memory_percent=memory_percent
    )

# Text generation endpoint
@app.post("/generate", response_model=TextOutput)
async def generate_text(input: TextInput):
    """Generate text with monitoring"""
    global request_count, total_inference_time
    
    if mini_model is None:
        raise HTTPException(status_code=503, detail="Model not loaded")
    
    request_count += 1
    logger.info(f"Processing request #{request_count}: {input.text[:50]}...")
    
    try:
        input_tokens = tokenizer.encode(input.text, max_length=32)
        input_ids = torch.tensor([input_tokens], dtype=torch.long).to(device)
        
        start_time = time.time()
        with torch.no_grad():
            generated_ids = mini_model.generate(input_ids, max_length=input.max_length, temperature=input.temperature)
        inference_time = time.time() - start_time
        
        total_inference_time += inference_time
        
        generated_text = tokenizer.decode(generated_ids[0].cpu().tolist())
        
        logger.info(f"Request #{request_count} completed in {inference_time*1000:.2f}ms")
        
        return TextOutput(
            generated_text=generated_text,
            inference_time_ms=inference_time * 1000,
            input_tokens=len(input_tokens),
            output_tokens=generated_ids.shape[1]
        )
    except Exception as e:
        logger.error(f"Generation failed for request #{request_count}: {e}")
        raise HTTPException(status_code=500, detail=f"Generation failed: {str(e)}")

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)
'''

print("Enhanced FastAPI application with monitoring and logging:")
print("=" * 50)
print("Key enhancements include:")
print("  1. Comprehensive logging with timestamps and request details")
print("  2. System metrics monitoring (CPU, memory usage)")
print("  3. Request counting and performance tracking")
print("  4. Middleware for request logging")
print("  5. Enhanced health check endpoint with metrics")
print("  6. Error handling with detailed logging")
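print("\nTo implement this enhanced version, replace the content of src/api/main.py")
print("Note: it requires psutil and serves POST /generate instead of /generate/mini and /generate/advanced")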
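
The module-level `request_count` and `total_inference_time` counters average over the whole lifetime of the process, so a recent slowdown is diluted by older samples. A bounded window is one alternative. The sketch below is a minimal tracker under that assumption; `LatencyTracker` is a hypothetical name, and the lock is there in case handlers run in worker threads.

```python
import threading
from collections import deque


class LatencyTracker:
    """Keep the most recent inference latencies and report simple aggregates."""

    def __init__(self, window=1000):
        self._samples = deque(maxlen=window)  # oldest samples are dropped automatically
        self._lock = threading.Lock()
        self.total_requests = 0
        self.failed_requests = 0

    def record(self, seconds, ok=True):
        """Record one request; failed requests are counted but not averaged."""
        with self._lock:
            self.total_requests += 1
            if ok:
                self._samples.append(seconds)
            else:
                self.failed_requests += 1

    def snapshot(self):
        """Return the fields a health endpoint could expose."""
        with self._lock:
            samples = list(self._samples)
        avg_ms = sum(samples) / len(samples) * 1000 if samples else None
        return {
            "request_count": self.total_requests,
            "failed_requests": self.failed_requests,
            "average_response_time_ms": avg_ms,
        }
```

Like the globals it replaces, this state is per process: with several uvicorn workers or Kubernetes replicas, each instance reports its own numbers, which is one reason to export metrics to an external system such as Prometheus.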

## 8. Scaling Considerations

Let's discuss important scaling considerations for production deployments.

In [None]:
print("Scaling Considerations for Production Deployments:")
print("=" * 50)

# Horizontal Scaling
print("\n1. Horizontal Scaling:")
print("   - Use load balancers to distribute requests")
print("   - Implement stateless services for easy scaling")
print("   - Use Kubernetes for automatic scaling based on metrics")
print("   - Consider blue-green deployments for zero-downtime updates")

# Vertical Scaling
print("\n2. Vertical Scaling:")
print("   - Use larger GPU instances for better performance")
print("   - Optimize memory usage to fit larger models")
print("   - Consider model parallelism for very large models")
print("   - Use mixed precision (FP16/BF16) inference to reduce memory")

# Auto-scaling
print("\n3. Auto-scaling Strategies:")
print("   - Scale based on CPU/GPU utilization")
print("   - Scale based on request queue length")
print("   - Scale based on response time SLAs")
print("   - Implement predictive scaling based on traffic patterns")

# Geographic Distribution
print("\n4. Geographic Distribution:")
print("   - Deploy to multiple regions for low latency")
print("   - Use CDN for static content")
print("   - Implement data replication for consistency")
print("   - Consider edge computing for real-time applications")

# Cost Optimization
print("\n5. Cost Optimization:")
print("   - Use spot instances for non-critical workloads")
print("   - Implement request batching to improve utilization")
print("   - Use model compression techniques")
print("   - Monitor and optimize resource allocation")

# Fault Tolerance
print("\n6. Fault Tolerance:")
print("   - Implement circuit breakers for downstream services")
print("   - Use health checks for automatic failover")
print("   - Implement retry logic with exponential backoff")
print("   - Use distributed tracing for debugging")
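
The retry item above can be sketched in a few lines. The helper below retries a callable on connection-style errors, doubling the delay cap after each failure and sleeping a random fraction of it (often called full jitter) so that many clients do not retry in lockstep. `retry_with_backoff` and its parameters are hypothetical names chosen for illustration.

```python
import random
import time


def retry_with_backoff(func, max_attempts=5, base_delay=0.5, max_delay=8.0,
                       retry_on=(ConnectionError, TimeoutError), sleep=time.sleep):
    """Call func(); on a retryable error, wait up to base_delay * 2**attempt and try again."""
    for attempt in range(max_attempts):
        try:
            return func()
        except retry_on:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the last error
            delay = min(max_delay, base_delay * (2 ** attempt))
            # Full jitter: sleep a random amount between 0 and the computed delay
            sleep(random.uniform(0, delay))
```

Retries are only safe for requests that can be repeated without side effects. The `sleep` parameter exists so that tests can run without actually waiting.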

## Conclusion

This tutorial has covered the essential aspects of deploying and serving transformer models in production environments:

1. **Containerization with Docker**: Ensuring consistent deployment across environments
2. **API Development with FastAPI**: Creating RESTful interfaces for model serving
3. **Kubernetes Deployment**: Orchestrating containers for scalability
4. **Performance Optimization**: Techniques for maximizing throughput and minimizing latency
5. **Load Testing**: Validating system performance under various loads
6. **Monitoring and Logging**: Tracking system health and performance metrics
7. **Scaling Considerations**: Strategies for handling increased traffic and demand

### Key Takeaways:

- **Start Simple**: Begin with basic containerization and API development
- **Monitor Everything**: Implement comprehensive logging and monitoring from the start
- **Plan for Growth**: Design systems with scalability in mind
- **Optimize Continuously**: Regularly benchmark and optimize performance
- **Ensure Reliability**: Implement fault tolerance and graceful degradation

### Best Practices:

1. Use infrastructure as code (Kubernetes YAML, Dockerfiles) for reproducible deployments
2. Implement comprehensive monitoring and alerting
3. Design stateless services for easy scaling
4. Use load testing to validate performance under expected loads
5. Implement proper error handling and logging
6. Plan for disaster recovery and backup strategies
7. Regularly update and patch container images
8. Use secrets management for sensitive configuration

These deployment strategies and techniques form the foundation of a robust, scalable, and maintainable model serving infrastructure.