# 05 - Production Patterns

Best practices for using Hugging Face Pipelines in production:

1. Model Loading Strategies
2. Caching and Performance
3. API Design Patterns
4. Monitoring and Logging
5. FastAPI Integration Example
6. Security Best Practices

In [None]:
from transformers import pipeline, AutoTokenizer, AutoModelForSequenceClassification
import torch
import time
import logging
from functools import lru_cache
from typing import List, Dict, Any, Optional
from dataclasses import dataclass

---
## 1. Model Loading Strategies

### 1.1 Singleton Pattern for Model Loading

In [None]:
class ModelManager:
    """
    Singleton that loads each pipeline once and hands out the cached instance.

    Every ``ModelManager()`` call returns the same object, so all callers share one
    cache. Entries are keyed on task, model and any extra ``pipeline()`` keyword
    arguments, so e.g. the same model requested for two different devices gets two
    separate entries instead of silently reusing the first one.
    """
    _instance = None
    _pipelines: Dict[str, Any] = {}  # cache key -> loaded pipeline, shared by all users

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    @staticmethod
    def _make_key(task: str, model: Optional[str], kwargs: Dict[str, Any]) -> str:
        """Build the cache key for a (task, model, pipeline kwargs) combination."""
        key = f"{task}_{model or 'default'}"
        if kwargs:
            # Sort so keyword order doesn't matter; keys are unique, so values are never compared.
            key += "_" + repr(sorted(kwargs.items()))
        return key

    def get_pipeline(self, task: str, model: Optional[str] = None, **kwargs) -> Any:
        """
        Get or create a pipeline. Cached for reuse.

        Args:
            task: pipeline task name, e.g. ``"sentiment-analysis"``.
            model: optional model id; falsy means the task's default model.
            **kwargs: forwarded to ``pipeline()`` and included in the cache key.

        Returns:
            The cached (or newly created) pipeline object.
        """
        key = self._make_key(task, model, kwargs)

        if key not in self._pipelines:
            print(f"Loading pipeline: {key}")
            if model:
                self._pipelines[key] = pipeline(task, model=model, **kwargs)
            else:
                self._pipelines[key] = pipeline(task, **kwargs)

        return self._pipelines[key]

    def clear(self):
        """Drop every cached pipeline, then run GC and empty the CUDA cache if present."""
        self._pipelines.clear()
        import gc
        gc.collect()
        if torch.cuda.is_available():
            torch.cuda.empty_cache()

# Usage: ModelManager() always hands back the same object, so every caller shares one cache
manager = ModelManager()

pipe1 = manager.get_pipeline("sentiment-analysis")  # cache miss -> the model is loaded here
pipe2 = manager.get_pipeline("sentiment-analysis")  # cache hit -> the very same object

print(f"Same instance: {pipe1 is pipe2}")

### 1.2 Lazy Loading with Context Manager

In [None]:
class LazyPipeline:
    """
    Lazy-loaded pipeline that initializes on first use.

    Constructing the wrapper only records the arguments; the model is built the first
    time ``pipe`` is accessed (directly or via ``__call__``). Used as a context
    manager, the model is unloaded when the ``with`` block exits.

    Args:
        task: pipeline task name.
        model: optional model id; ``None`` lets ``pipeline()`` pick the task default.
        **kwargs: forwarded to ``pipeline()`` on first use.
    """
    def __init__(self, task: str, model: Optional[str] = None, **kwargs):
        self.task = task
        self.model = model
        self.kwargs = kwargs
        self._pipeline = None

    @property
    def pipe(self):
        """The underlying pipeline, created on first access."""
        if self._pipeline is None:
            print(f"Initializing {self.task} pipeline...")
            self._pipeline = pipeline(
                self.task,
                model=self.model,
                **self.kwargs
            )
        return self._pipeline

    def __call__(self, *args, **kwargs):
        return self.pipe(*args, **kwargs)

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.unload()

    def unload(self):
        """
        Explicitly unload the model. Safe to call repeatedly.

        A later call to ``pipe``/``__call__`` will load the model again.
        """
        if self._pipeline is None:
            return
        # Dropping our only reference lets GC reclaim the model.
        self._pipeline = None
        import gc
        gc.collect()
        # Also hand cached GPU blocks back to the driver, as ModelManager.clear() does.
        if torch.cuda.is_available():
            torch.cuda.empty_cache()

# Usage: building the wrapper is cheap because no weights are loaded yet
lazy_classifier = LazyPipeline("sentiment-analysis")
print("Pipeline created (not loaded yet)")

# The model is loaded by this first call and reused afterwards
result = lazy_classifier("This loads the model!")
print("Result: " + str(result))

---
## 2. Caching and Performance

### 2.1 Result Caching

In [None]:
import hashlib
from collections import OrderedDict

class CachedPipeline:
    """
    Pipeline with LRU result caching.

    Results are cached per input text in an ``OrderedDict`` kept in recency order.
    The least recently used entry is evicted once ``cache_size`` is exceeded.

    Args:
        task: pipeline task name passed to ``pipeline()``.
        model: optional model id; ``None`` uses the task default.
        cache_size: maximum number of cached results (negative values act as 0).
    """
    def __init__(self, task: str, model: str = None, cache_size: int = 1000):
        self.pipe = pipeline(task, model=model)
        self.cache = OrderedDict()
        # Clamp so the eviction loop can never pop from an already-empty cache.
        self.cache_size = max(0, cache_size)
        self.hits = 0
        self.misses = 0

    def _get_cache_key(self, text: str) -> str:
        """Return a fixed-size hex digest of *text* so long inputs don't bloat the keys."""
        # SHA-256 rather than MD5: MD5 is rejected on FIPS-enabled hosts.
        return hashlib.sha256(text.encode("utf-8")).hexdigest()

    def __call__(self, text: str, use_cache: bool = True):
        """
        Run the pipeline on *text*, serving repeated inputs from the cache.

        Args:
            text: input string.
            use_cache: when False, bypass the cache and its hit/miss counters entirely.

        Returns:
            The pipeline output. Cached results are returned as deep copies, so
            callers may mutate what they get without corrupting later cache hits.
        """
        if not use_cache:
            return self.pipe(text)

        import copy

        key = self._get_cache_key(text)
        if key in self.cache:
            self.hits += 1
            self.cache.move_to_end(key)  # mark as most recently used
            return copy.deepcopy(self.cache[key])

        self.misses += 1
        result = self.pipe(text)
        self.cache[key] = result

        # Evict least recently used entries until we are back within capacity.
        while len(self.cache) > self.cache_size:
            self.cache.popitem(last=False)

        return copy.deepcopy(result)

    @property
    def hit_rate(self) -> float:
        """Fraction of cached-path calls served from the cache (0.0 before any call)."""
        total = self.hits + self.misses
        return self.hits / total if total > 0 else 0.0

# Usage: watch the hit rate as a repeated input gets served from the cache
cached = CachedPipeline("sentiment-analysis", cache_size=100)

result1 = cached("I love this product!")  # unseen text -> miss
print("Call {} - Hit rate: {:.2%}".format(1, cached.hit_rate))

result2 = cached("I love this product!")  # identical text -> hit
print("Call {} - Hit rate: {:.2%}".format(2, cached.hit_rate))

result3 = cached("This is terrible!")  # unseen text -> miss
print("Call {} - Hit rate: {:.2%}".format(3, cached.hit_rate))

### 2.2 Request Batching Service

In [None]:
import threading
from queue import Queue
import uuid

class BatchingService:
    """
    Accumulate requests and process in batches.

    Callers submit single texts via ``predict()``. One background thread groups queued
    requests into batches of up to ``batch_size``. It waits at most ``timeout``
    seconds to fill a batch, then runs whatever it has through the pipeline.

    Args:
        task: pipeline task name passed to ``pipeline()``.
        batch_size: maximum requests per batch (values below 1 are treated as 1).
        timeout: maximum seconds spent collecting one batch.
    """
    def __init__(self, task: str, batch_size: int = 16, timeout: float = 0.1):
        self.pipe = pipeline(task)
        # batch_size < 1 would make the worker spin without ever collecting a request.
        self.batch_size = max(1, batch_size)
        self.timeout = timeout
        # Items are (Future, text); each caller blocks on its own Future.
        self.request_queue = Queue()
        self._stop = False

        # Start worker thread
        self.worker = threading.Thread(target=self._process_batches, daemon=True)
        self.worker.start()

    def _process_batches(self):
        """Worker loop: collect a batch, run it, repeat until stop() is called."""
        while not self._stop:
            batch = self._collect_batch()
            if batch:
                self._run_batch(batch)

    def _collect_batch(self):
        """Gather up to ``batch_size`` live (Future, text) requests within one timeout window."""
        from queue import Empty

        batch = []
        # One deadline for the whole batch bounds the added latency to `timeout`.
        # A per-get timeout would let a slow trickle stretch it to batch_size * timeout.
        deadline = time.monotonic() + self.timeout
        while len(batch) < self.batch_size:
            remaining = max(0.0, deadline - time.monotonic())
            try:
                future, text = self.request_queue.get(timeout=remaining)
            except Empty:
                break
            # Skip requests whose caller already timed out and cancelled them.
            if future.set_running_or_notify_cancel():
                batch.append((future, text))
        return batch

    def _run_batch(self, batch):
        """Run one batch through the pipeline and resolve each caller's Future."""
        futures = [future for future, _ in batch]
        texts = [text for _, text in batch]
        try:
            outputs = self.pipe(texts)
        except Exception as exc:
            # Report the failure to this batch's callers instead of letting the worker
            # thread die, which would make every later request time out.
            for future in futures:
                future.set_exception(exc)
            return
        for future, output in zip(futures, outputs):
            future.set_result(output)

    def predict(self, text: str, timeout: float = 5.0):
        """
        Submit request and wait for result.

        Args:
            text: input for the pipeline.
            timeout: seconds to wait for this request's result.

        Returns:
            This request's entry from the batched pipeline output.

        Raises:
            TimeoutError: no result within *timeout* seconds.
            Exception: whatever the pipeline raised while processing this request's batch.
        """
        from concurrent.futures import Future, TimeoutError as FutureTimeoutError

        future = Future()
        self.request_queue.put((future, text))
        try:
            return future.result(timeout=timeout)
        except FutureTimeoutError:
            # Cancel so the worker skips it if it hasn't been picked up yet; nothing
            # is left behind waiting to be collected.
            future.cancel()
            raise TimeoutError("Request timed out") from None

    def stop(self):
        """Ask the worker to exit after its current wait/batch. Does not block."""
        self._stop = True

# Usage
service = BatchingService("sentiment-analysis", batch_size=8)

# Submit the requests from several threads at once so the worker can actually group
# them into one batch; back-to-back predict() calls would each form their own batch.
from concurrent.futures import ThreadPoolExecutor

inputs = ["Great!", "Terrible!", "Okay."]
try:
    with ThreadPoolExecutor(max_workers=len(inputs)) as executor:
        results = list(executor.map(service.predict, inputs))
    for text, result in zip(inputs, results):
        # Each batched output is a single {'label', 'score'} dict, not a list.
        print(f"{text}: {result['label']}")
finally:
    # Always stop the background worker, even if a request failed.
    service.stop()

---
## 3. API Design Patterns

### 3.1 Service Class Pattern

In [None]:
@dataclass
class ClassificationResult:
    """Structured result for classification."""
    text: str  # the input text that was classified
    label: str  # label predicted by the pipeline
    confidence: float  # the pipeline's score for that label
    processing_time_ms: float  # elapsed inference time; per-item average for batch calls
    model_name: str  # model id the service was created with

class TextClassificationService:
    """
    Production-ready text classification service.

    Wraps a ``sentiment-analysis`` pipeline and returns typed ``ClassificationResult``
    objects instead of raw pipeline dicts.

    Args:
        model_name: Hugging Face model id to load.
    """
    def __init__(self, model_name: str = "distilbert-base-uncased-finetuned-sst-2-english"):
        self.model_name = model_name
        self.pipe = pipeline("sentiment-analysis", model=model_name)
        self.logger = logging.getLogger(__name__)

    def _to_result(self, text: str, output: Dict[str, Any], elapsed_ms: float) -> ClassificationResult:
        """Convert one pipeline output dict into a ClassificationResult."""
        return ClassificationResult(
            text=text,
            label=output['label'],
            confidence=output['score'],
            processing_time_ms=elapsed_ms,
            model_name=self.model_name
        )

    def classify(self, text: str) -> ClassificationResult:
        """
        Classify a single text.

        Raises:
            ValueError: if *text* is empty or whitespace-only.
        """
        if not text or not text.strip():
            raise ValueError("Input text cannot be empty")

        # perf_counter is monotonic; time.time() can jump with wall-clock adjustments.
        start = time.perf_counter()
        output = self.pipe(text)[0]
        elapsed_ms = (time.perf_counter() - start) * 1000

        return self._to_result(text, output, elapsed_ms)

    def classify_batch(self, texts: List[str]) -> List[ClassificationResult]:
        """
        Classify multiple texts efficiently in one pipeline call.

        Empty or whitespace-only entries are skipped, so the result may be shorter
        than *texts*; each result carries its own ``text`` for matching. Each
        result's ``processing_time_ms`` is the batch time divided by the batch size.
        """
        valid = [t for t in texts if t and t.strip()]
        skipped = len(texts) - len(valid)
        if skipped:
            self.logger.debug("Skipped %d empty input(s) in batch", skipped)
        if not valid:
            return []

        start = time.perf_counter()
        outputs = self.pipe(valid)
        per_item_ms = (time.perf_counter() - start) * 1000 / len(valid)

        return [self._to_result(text, output, per_item_ms) for text, output in zip(valid, outputs)]

# Usage
service = TextClassificationService()

# One text per call
result = service.classify("This product is amazing!")
print(
    f"Single: {result.label} ({result.confidence:.2%}) "
    f"in {result.processing_time_ms:.1f}ms"
)

# Several texts in a single pipeline call
batch_results = service.classify_batch(
    ["Great product!", "Terrible experience.", "It's okay."]
)
for item in batch_results:
    print(f"Batch: {item.text[:20]}... -> {item.label}")

---
## 4. Monitoring and Logging

### 4.1 Instrumented Pipeline

In [None]:
import logging
from datetime import datetime

logging.basicConfig(level=logging.INFO)

class InstrumentedPipeline:
    """
    Pipeline with comprehensive monitoring.

    Logs every request and keeps in-process metrics: request and error counts,
    cumulative latency of successful calls, and how often each predicted label
    was returned.

    Args:
        task: pipeline task name passed to ``pipeline()``; also names the logger.
        model: optional model id; ``None`` uses the task default.
    """
    def __init__(self, task: str, model: str = None):
        self.pipe = pipeline(task, model=model)
        self.logger = logging.getLogger(f"pipeline.{task}")

        # Metrics
        self.total_requests = 0
        self.total_latency_ms = 0  # summed over successful requests only
        self.errors = 0
        self.label_counts = {}

    def __call__(self, text: str, **kwargs):
        """Run the pipeline on *text*, logging the request and updating metrics; re-raises errors."""
        import uuid

        self.total_requests += 1
        # A random short id is unique per request; a seconds-resolution timestamp is not.
        request_id = uuid.uuid4().hex[:8]

        self.logger.info("[%s] Processing request, length=%d", request_id, len(text))
        start = time.perf_counter()  # monotonic, unlike time.time()

        try:
            result = self.pipe(text, **kwargs)
        except Exception as exc:
            self.errors += 1
            # logger.exception also records the traceback.
            self.logger.exception("[%s] Error: %s", request_id, exc)
            raise

        latency = (time.perf_counter() - start) * 1000
        self.total_latency_ms += latency

        # Track label distribution only when the output actually has a label, so tasks
        # without one (e.g. generation) aren't turned into spurious errors.
        top = result[0] if isinstance(result, list) and result else result
        label = top.get('label') if isinstance(top, dict) else None
        if label is not None:
            self.label_counts[label] = self.label_counts.get(label, 0) + 1
            self.logger.info(
                "[%s] Completed: label=%s, confidence=%.3f, latency=%.1fms",
                request_id, label, top.get('score', float('nan')), latency,
            )
        else:
            self.logger.info("[%s] Completed: latency=%.1fms", request_id, latency)

        return result

    def get_metrics(self) -> Dict[str, Any]:
        """Get a snapshot of the current metrics."""
        successes = self.total_requests - self.errors
        return {
            'total_requests': self.total_requests,
            'errors': self.errors,
            'error_rate': self.errors / max(self.total_requests, 1),
            # Only successful calls add latency, so average over successes.
            'avg_latency_ms': self.total_latency_ms / max(successes, 1),
            # Copy so callers can't mutate the live counters.
            'label_distribution': dict(self.label_counts),
        }

# Usage
monitored = InstrumentedPipeline("sentiment-analysis")

# Send a handful of requests through the wrapper; each one is logged
texts = ["Great!", "Terrible!", "Amazing!", "Awful!"]
for sample in texts:
    monitored(sample)

# Summarize what the wrapper recorded
metrics = monitored.get_metrics()
report_lines = (
    "\nMetrics:",
    f"  Total requests: {metrics['total_requests']}",
    f"  Avg latency: {metrics['avg_latency_ms']:.1f}ms",
    f"  Label distribution: {metrics['label_distribution']}",
)
for line in report_lines:
    print(line)

---
## 5. FastAPI Integration Example

In [None]:
# Example FastAPI application structure.
# The app source is kept in a string and only printed, so this notebook does not
# need FastAPI installed.

fastapi_code = '''
# app.py
from contextlib import asynccontextmanager
from typing import List
import time

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from transformers import pipeline

MAX_BATCH_SIZE = 64

classifier = None


@asynccontextmanager
async def lifespan(app: FastAPI):
    # Load the model once at startup (not per request)
    global classifier
    classifier = pipeline("sentiment-analysis")
    yield
    classifier = None


app = FastAPI(title="Text Classification API", lifespan=lifespan)


class TextInput(BaseModel):
    text: str


class BatchInput(BaseModel):
    texts: List[str]


class ClassificationResponse(BaseModel):
    label: str
    confidence: float
    processing_time_ms: float


# Plain `def` (not `async def`): inference is blocking CPU/GPU work, so FastAPI
# runs these handlers in its threadpool instead of stalling the event loop.
@app.post("/classify", response_model=ClassificationResponse)
def classify(payload: TextInput):
    if not payload.text.strip():
        raise HTTPException(400, "Text cannot be empty")

    start = time.perf_counter()
    # truncation=True keeps over-long inputs within the model's max sequence length
    result = classifier(payload.text, truncation=True)[0]
    latency = (time.perf_counter() - start) * 1000

    return ClassificationResponse(
        label=result["label"],
        confidence=result["score"],
        processing_time_ms=latency,
    )


@app.post("/classify/batch")
def classify_batch(payload: BatchInput):
    if len(payload.texts) > MAX_BATCH_SIZE:
        raise HTTPException(413, f"At most {MAX_BATCH_SIZE} texts per request")
    texts = [t for t in payload.texts if t.strip()]
    if not texts:
        return []

    results = classifier(texts, truncation=True)
    return [
        {"text": text, "label": r["label"], "confidence": r["score"]}
        for text, r in zip(texts, results)
    ]


@app.get("/health")
async def health():
    return {"status": "healthy", "model_loaded": classifier is not None}

# Run with: uvicorn app:app --host 0.0.0.0 --port 8000
'''

print("FastAPI Example:")
print(fastapi_code)

---
## 6. Security Best Practices

In [None]:
class SecurePipeline:
    """
    Pipeline with security considerations.

    Rejects non-string input and normalizes and length-limits text before it
    reaches the model.

    Args:
        task: pipeline task name passed to ``pipeline()``.
        max_length: maximum number of characters forwarded to the model.
    """
    def __init__(self, task: str, max_length: int = 1000):
        self.pipe = pipeline(task)
        self.max_length = max_length

    def sanitize_input(self, text: str) -> str:
        """
        Sanitize and validate input.

        Removes NUL characters, collapses every whitespace run to a single space
        (trimming both ends), then cuts the result to at most ``max_length`` characters.
        Whitespace is collapsed *before* truncating, so padding (e.g. a long run of
        leading spaces) cannot use up the length budget and push real content out.

        NOTE: the limit counts characters, not model tokens. Bound the raw request
        size upstream as well, since the whole string is scanned here.
        """
        # Remove null bytes
        text = text.replace('\x00', '')

        # Collapse whitespace first so the limit applies to meaningful content
        text = ' '.join(text.split())

        # Limit length (a negative limit is treated as 0 rather than slicing from the end)
        limit = max(self.max_length, 0)
        if len(text) > limit:
            # rstrip: the cut may land right after a word, leaving a trailing space
            text = text[:limit].rstrip()

        return text

    def __call__(self, text: str, **kwargs):
        """
        Sanitize *text* and run the pipeline on it.

        Raises:
            TypeError: if *text* is not a ``str``.
            ValueError: if nothing is left after sanitization.
        """
        if not isinstance(text, str):
            raise TypeError("Input must be a string")

        clean_text = self.sanitize_input(text)

        if not clean_text:
            raise ValueError("Input text is empty after sanitization")

        return self.pipe(clean_text, **kwargs)

# Usage: inputs longer than 500 characters are cut down before inference
secure = SecurePipeline("sentiment-analysis", max_length=500)

# A short, ordinary sentence goes through as-is
result = secure("This is a normal input.")
print(f"Normal: {result}")

# Roughly 5,000 characters: truncated rather than rejected
long_text = "word " * 1000
result = secure(long_text)
top_label = result[0]['label']
print(f"Long input handled: {top_label}")

---
## 🎯 Production Checklist

### Before Deployment
- [ ] Model loaded at startup (not per-request)
- [ ] Input validation implemented
- [ ] Error handling in place
- [ ] Logging configured
- [ ] Health check endpoint available
- [ ] Batch processing for throughput

### Performance
- [ ] GPU enabled if available
- [ ] FP16 or quantization considered
- [ ] Batch size optimized
- [ ] Result caching implemented
- [ ] Connection pooling configured

### Security
- [ ] Input sanitization
- [ ] Rate limiting
- [ ] Input length limits
- [ ] Authentication (if needed)

### Monitoring
- [ ] Latency metrics
- [ ] Error rates
- [ ] Model drift detection
- [ ] Resource utilization

## Congratulations!

You've completed the Hugging Face Pipelines learning guide. You now know:

1. **Basics**: What pipelines are and how to use them
2. **NLP**: All major NLP pipeline types
3. **Multimodal**: Vision, audio, and cross-modal pipelines
4. **Advanced**: Custom pipelines and optimization
5. **Production**: Best practices for deployment

Happy building! 🚀