# Real-world Applications

Welcome to the final topic! In this notebook, we'll explore practical implementations of transformer models across various domains. We'll build production-ready applications including chatbots, translation systems, code generators, and more.

## Learning Objectives

By the end of this notebook, you will:
- Build production-ready transformer applications
- Implement conversational AI systems
- Create machine translation pipelines
- Develop code generation assistants
- Deploy content creation systems
- Understand enterprise integration patterns

## Setup and Imports

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from typing import Dict, List, Optional, Tuple, Any
import asyncio
import time
from collections import defaultdict
import json

# Set random seeds
torch.manual_seed(42)
np.random.seed(42)

# Device configuration
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")

## 1. Application Architecture Overview

Let's start by understanding the common architecture patterns for transformer applications.

In [None]:
# Visualize typical application architecture
plt.figure(figsize=(12, 8))

# Components
components = {
    'User Interface': (2, 8, 'lightblue'),
    'API Gateway': (4, 8, 'lightgreen'),
    'Load Balancer': (6, 8, 'lightyellow'),
    'Model Servers': (8, 8, 'lightcoral'),
    'Cache': (10, 8, 'lightgray'),
    'Auth Service': (4, 6, 'lightblue'),
    'Rate Limiter': (4, 4, 'lightgreen'),
    'Vector DB': (8, 6, 'lightyellow'),
    'Monitoring': (6, 2, 'lightcoral')
}

# Draw components
for comp, (x, y, color) in components.items():
    plt.scatter(x, y, s=2000, c=color, edgecolors='black', linewidth=2)
    plt.text(x, y, comp, ha='center', va='center', fontsize=10, weight='bold')

# Draw connections
connections = [
    ((2, 8), (4, 8)),
    ((4, 8), (6, 8)),
    ((6, 8), (8, 8)),
    ((8, 8), (10, 8)),
    ((4, 8), (4, 6)),
    ((4, 8), (4, 4)),
    ((8, 8), (8, 6)),
    ((6, 8), (6, 2)),
    ((8, 8), (6, 2))
]

for (x1, y1), (x2, y2) in connections:
    plt.arrow(x1, y1, x2-x1, y2-y1, head_width=0.2, head_length=0.1, 
              fc='gray', ec='gray', alpha=0.5)

plt.xlim(0, 12)
plt.ylim(0, 10)
plt.axis('off')
plt.title('Transformer Application Architecture', fontsize=16, weight='bold')
plt.show()

print("Key architectural components:")
print("1. API Gateway: Routes requests and handles authentication")
print("2. Load Balancer: Distributes traffic across model servers")
print("3. Model Servers: Run transformer inference")
print("4. Cache: Stores frequent responses")
print("5. Vector DB: Stores embeddings for retrieval")
print("6. Monitoring: Tracks performance and usage")

## 2. Building a Conversational AI System

Let's implement a production-ready chatbot with advanced features.

In [None]:
# Simple transformer model for demonstration
class SimpleTransformer(nn.Module):
    def __init__(self, vocab_size=30000, d_model=512, nhead=8, num_layers=6):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, d_model)
        self.pos_encoding = nn.Parameter(torch.randn(1, 1000, d_model))
        
        encoder_layer = nn.TransformerEncoderLayer(d_model, nhead, dim_feedforward=2048)
        self.transformer = nn.TransformerEncoder(encoder_layer, num_layers)
        
        self.output_proj = nn.Linear(d_model, vocab_size)
        
    def forward(self, x):
        seq_len = x.size(1)
        x = self.embedding(x)
        x = x + self.pos_encoding[:, :seq_len, :]
        x = self.transformer(x)
        return self.output_proj(x)
    
    def generate(self, prompt_ids, max_length=50, temperature=0.7):
        """Simple generation method."""
        self.eval()
        generated = prompt_ids.clone()
        
        with torch.no_grad():
            for _ in range(max_length):
                outputs = self(generated)
                next_token_logits = outputs[:, -1, :] / temperature
                probs = F.softmax(next_token_logits, dim=-1)
                next_token = torch.multinomial(probs, 1)
                generated = torch.cat([generated, next_token], dim=1)
                
                if next_token.item() == 2:  # EOS token
                    break
                    
        return generated

# Create a simple model
model = SimpleTransformer().to(device)
print(f"Model created with {sum(p.numel() for p in model.parameters())/1e6:.1f}M parameters")

In [None]:
# Implement conversation manager
class ConversationManager:
    """Manage conversation history and context."""
    
    def __init__(self, max_history=10, context_window=2048):
        self.conversations = {}
        self.max_history = max_history
        self.context_window = context_window
        
    def get_history(self, conversation_id: str) -> List[Dict]:
        """Get conversation history."""
        return self.conversations.get(conversation_id, [])
    
    def add_turn(self, conversation_id: str, user_msg: str, assistant_msg: str):
        """Add a conversation turn."""
        if conversation_id not in self.conversations:
            self.conversations[conversation_id] = []
            
        self.conversations[conversation_id].append({
            'user': user_msg,
            'assistant': assistant_msg,
            'timestamp': time.time()
        })
        
        # Trim history if too long
        if len(self.conversations[conversation_id]) > self.max_history:
            self.conversations[conversation_id] = \
                self.conversations[conversation_id][-self.max_history:]
    
    def build_context(self, conversation_id: str, current_msg: str, 
                     persona: str = "helpful assistant") -> str:
        """Build context from history."""
        history = self.get_history(conversation_id)
        
        # Start with persona
        context_parts = [f"You are a {persona}.\n"]
        
        # Add recent history
        for turn in history[-5:]:  # Last 5 turns
            context_parts.append(f"User: {turn['user']}")
            context_parts.append(f"Assistant: {turn['assistant']}")
            
        # Add current message
        context_parts.append(f"User: {current_msg}")
        context_parts.append("Assistant:")
        
        return "\n".join(context_parts)

# Test conversation manager
conv_manager = ConversationManager()

# Simulate conversation
conv_id = "test_123"
conv_manager.add_turn(conv_id, "Hello!", "Hi there! How can I help you today?")
conv_manager.add_turn(conv_id, "What's the weather?", "I'd be happy to help with weather information.")

context = conv_manager.build_context(conv_id, "Thanks!")
print("Built context:")
print(context)

In [None]:
# Implement safety filter
class SafetyFilter:
    """Filter unsafe content."""
    
    def __init__(self):
        self.unsafe_patterns = [
            'violence', 'hate', 'harassment', 'self-harm',
            'sexual', 'illegal', 'deception'
        ]
        
    def is_safe(self, text: str) -> Tuple[bool, Optional[str]]:
        """Check if text is safe."""
        text_lower = text.lower()
        
        # Check for unsafe patterns
        for pattern in self.unsafe_patterns:
            if pattern in text_lower:
                return False, f"Content may contain {pattern}"
                
        return True, None
    
    def filter_response(self, response: str) -> str:
        """Filter or modify unsafe response."""
        is_safe, reason = self.is_safe(response)
        
        if not is_safe:
            return "I'm sorry, but I can't provide that type of response."
            
        return response

# Implement response cache
class ResponseCache:
    """Cache frequent responses."""
    
    def __init__(self, max_size=1000, ttl=3600):
        self.cache = {}
        self.access_times = {}
        self.max_size = max_size
        self.ttl = ttl
        
    def get(self, key: str) -> Optional[str]:
        """Get cached response."""
        if key in self.cache:
            # Check if expired
            if time.time() - self.access_times[key] < self.ttl:
                self.access_times[key] = time.time()  # Update access time
                return self.cache[key]
            else:
                # Remove expired entry
                del self.cache[key]
                del self.access_times[key]
                
        return None
    
    def set(self, key: str, value: str):
        """Cache response."""
        # Evict oldest if at capacity
        if len(self.cache) >= self.max_size:
            oldest_key = min(self.access_times, key=self.access_times.get)
            del self.cache[oldest_key]
            del self.access_times[oldest_key]
            
        self.cache[key] = value
        self.access_times[key] = time.time()

# Test safety and caching
safety_filter = SafetyFilter()
cache = ResponseCache()

# Test safety filter
test_messages = [
    "How do I bake a cake?",
    "Tell me about violence",  # Unsafe
    "What's the capital of France?"
]

for msg in test_messages:
    is_safe, reason = safety_filter.is_safe(msg)
    print(f"Message: '{msg}' - Safe: {is_safe}")
    if not is_safe:
        print(f"  Reason: {reason}")

In [None]:
# Complete conversational AI system
class ConversationalAI:
    """Production-ready conversational AI."""
    
    def __init__(self, model, config=None):
        self.model = model
        self.config = config or {}
        
        # Components
        self.conv_manager = ConversationManager()
        self.safety_filter = SafetyFilter()
        self.cache = ResponseCache()
        
        # Metrics
        self.metrics = {
            'total_requests': 0,
            'cache_hits': 0,
            'safety_blocks': 0,
            'avg_latency': 0
        }
        
    def chat(self, message: str, conversation_id: str = "default") -> Dict[str, Any]:
        """Process chat message."""
        start_time = time.time()
        self.metrics['total_requests'] += 1
        
        # Safety check on input
        is_safe, reason = self.safety_filter.is_safe(message)
        if not is_safe:
            self.metrics['safety_blocks'] += 1
            return {
                'response': "I'm sorry, but I can't process that type of message.",
                'filtered': True,
                'reason': reason
            }
        
        # Check cache
        cache_key = f"{conversation_id}:{message}"
        cached_response = self.cache.get(cache_key)
        if cached_response:
            self.metrics['cache_hits'] += 1
            return {
                'response': cached_response,
                'cached': True,
                'latency': time.time() - start_time
            }
        
        # Build context
        context = self.conv_manager.build_context(
            conversation_id, message, 
            self.config.get('persona', 'helpful AI assistant')
        )
        
        # Generate response (simplified)
        response = self._generate_response(context)
        
        # Safety filter on output
        response = self.safety_filter.filter_response(response)
        
        # Update conversation history
        self.conv_manager.add_turn(conversation_id, message, response)
        
        # Cache response
        self.cache.set(cache_key, response)
        
        # Update metrics
        latency = time.time() - start_time
        self.metrics['avg_latency'] = (
            self.metrics['avg_latency'] * 0.9 + latency * 0.1
        )
        
        return {
            'response': response,
            'latency': latency,
            'conversation_id': conversation_id
        }
    
    def _generate_response(self, context: str) -> str:
        """Generate response from model."""
        # Simplified tokenization
        tokens = [hash(word) % 30000 for word in context.split()]
        input_ids = torch.tensor([tokens]).to(device)
        
        # Generate
        with torch.no_grad():
            output_ids = self.model.generate(input_ids, max_length=50)
            
        # Simplified decoding
        response = f"Generated response to: {context.split('User:')[-1].split('Assistant:')[0].strip()}"
        return response
    
    def get_metrics(self) -> Dict[str, Any]:
        """Get system metrics."""
        metrics = self.metrics.copy()
        if metrics['total_requests'] > 0:
            metrics['cache_hit_rate'] = metrics['cache_hits'] / metrics['total_requests']
            metrics['safety_block_rate'] = metrics['safety_blocks'] / metrics['total_requests']
        return metrics

# Create and test chatbot
chatbot = ConversationalAI(model, config={'persona': 'friendly AI assistant'})

# Simulate conversation
test_messages = [
    "Hello! How are you?",
    "What can you help me with?",
    "Tell me a joke",
    "Hello! How are you?"  # Duplicate to test caching
]

for msg in test_messages:
    result = chatbot.chat(msg, "demo_session")
    print(f"\nUser: {msg}")
    print(f"Assistant: {result['response']}")
    print(f"Latency: {result.get('latency', 0):.3f}s")
    if result.get('cached'):
        print("(Retrieved from cache)")

# Show metrics
print("\nChatbot Metrics:")
for key, value in chatbot.get_metrics().items():
    print(f"  {key}: {value}")

## 3. Machine Translation System

Let's build a production-ready translation system with quality estimation.

In [None]:
# Language detection (simplified)
class LanguageDetector:
    """Detect language of text."""
    
    def __init__(self):
        # Simple word-based detection
        self.language_words = {
            'en': ['the', 'is', 'are', 'and', 'to', 'of'],
            'es': ['el', 'la', 'es', 'son', 'de', 'que'],
            'fr': ['le', 'la', 'est', 'sont', 'de', 'que'],
            'de': ['der', 'die', 'das', 'ist', 'sind', 'und']
        }
        
    def detect(self, text: str) -> str:
        """Detect language of text."""
        text_lower = text.lower().split()
        scores = {}
        
        for lang, words in self.language_words.items():
            score = sum(1 for word in text_lower if word in words)
            scores[lang] = score
            
        return max(scores, key=scores.get) if scores else 'en'

# Quality estimator
class TranslationQualityEstimator:
    """Estimate translation quality."""
    
    def estimate(self, source: str, translation: str, 
                source_lang: str, target_lang: str) -> float:
        """Estimate quality score (0-1)."""
        # Simple heuristics
        score = 1.0
        
        # Length ratio check
        length_ratio = len(translation.split()) / max(len(source.split()), 1)
        if length_ratio < 0.5 or length_ratio > 2.0:
            score *= 0.8
            
        # Check for repeated words (indication of errors)
        words = translation.split()
        if len(words) > 0:
            unique_ratio = len(set(words)) / len(words)
            if unique_ratio < 0.7:
                score *= 0.9
                
        # Language pair difficulty
        difficult_pairs = [('zh', 'en'), ('ar', 'en'), ('ja', 'en')]
        if (source_lang, target_lang) in difficult_pairs:
            score *= 0.95
            
        return max(0.0, min(1.0, score))

# Test language detection
detector = LanguageDetector()
test_texts = [
    "Hello, how are you today?",
    "Hola, ¿cómo estás?",
    "Bonjour, comment allez-vous?",
    "Guten Tag, wie geht es Ihnen?"
]

for text in test_texts:
    lang = detector.detect(text)
    print(f"Text: '{text}' -> Language: {lang}")

In [None]:
# Translation system
class TranslationSystem:
    """Production translation system."""
    
    def __init__(self, models: Dict[str, nn.Module]):
        self.models = models  # {"en-es": model, "es-en": model, ...}
        self.detector = LanguageDetector()
        self.quality_estimator = TranslationQualityEstimator()
        self.cache = ResponseCache(max_size=5000)
        
        # Translation memories
        self.translation_memory = self._load_translation_memory()
        
    def _load_translation_memory(self) -> Dict[str, Dict[str, str]]:
        """Load translation memory database."""
        # Simplified TM
        return {
            'en-es': {
                'hello': 'hola',
                'thank you': 'gracias',
                'goodbye': 'adiós'
            },
            'es-en': {
                'hola': 'hello',
                'gracias': 'thank you',
                'adiós': 'goodbye'
            }
        }
        
    def translate(self, text: str, target_lang: str, 
                 source_lang: Optional[str] = None) -> Dict[str, Any]:
        """Translate text with quality estimation."""
        # Detect source language if not provided
        if not source_lang:
            source_lang = self.detector.detect(text)
            
        # Check if translation needed
        if source_lang == target_lang:
            return {
                'translation': text,
                'source_language': source_lang,
                'target_language': target_lang,
                'quality_score': 1.0,
                'method': 'no_translation_needed'
            }
            
        # Check cache
        cache_key = f"{source_lang}-{target_lang}:{text}"
        cached = self.cache.get(cache_key)
        if cached:
            return json.loads(cached)
            
        # Check translation memory
        lang_pair = f"{source_lang}-{target_lang}"
        if lang_pair in self.translation_memory:
            tm = self.translation_memory[lang_pair]
            if text.lower() in tm:
                translation = tm[text.lower()]
                result = {
                    'translation': translation,
                    'source_language': source_lang,
                    'target_language': target_lang,
                    'quality_score': 1.0,
                    'method': 'translation_memory'
                }
                self.cache.set(cache_key, json.dumps(result))
                return result
                
        # Use model
        if lang_pair in self.models:
            translation = self._translate_with_model(text, self.models[lang_pair])
        else:
            # Try pivot translation through English
            translation = self._pivot_translate(text, source_lang, target_lang)
            
        # Estimate quality
        quality_score = self.quality_estimator.estimate(
            text, translation, source_lang, target_lang
        )
        
        result = {
            'translation': translation,
            'source_language': source_lang,
            'target_language': target_lang,
            'quality_score': quality_score,
            'method': 'neural_translation'
        }
        
        # Cache result
        self.cache.set(cache_key, json.dumps(result))
        
        return result
        
    def _translate_with_model(self, text: str, model: nn.Module) -> str:
        """Translate using neural model."""
        # Simplified - in practice, use proper tokenization
        return f"[Translated] {text}"
        
    def _pivot_translate(self, text: str, source_lang: str, target_lang: str) -> str:
        """Translate through pivot language (English)."""
        # First translate to English
        if source_lang != 'en':
            intermediate = self._translate_with_model(
                text, self.models.get(f"{source_lang}-en", None)
            )
        else:
            intermediate = text
            
        # Then translate to target
        if target_lang != 'en':
            final = self._translate_with_model(
                intermediate, self.models.get(f"en-{target_lang}", None)
            )
        else:
            final = intermediate
            
        return final
    
    def batch_translate(self, texts: List[str], target_lang: str,
                       source_lang: Optional[str] = None) -> List[Dict[str, Any]]:
        """Translate multiple texts efficiently."""
        results = []
        
        # Group by detected language if not provided
        if not source_lang:
            lang_groups = defaultdict(list)
            for text in texts:
                lang = self.detector.detect(text)
                lang_groups[lang].append(text)
        else:
            lang_groups = {source_lang: texts}
            
        # Translate each group
        for lang, group_texts in lang_groups.items():
            for text in group_texts:
                result = self.translate(text, target_lang, lang)
                results.append(result)
                
        return results

# Create translation system with dummy models
translation_models = {
    'en-es': model,  # Reusing our simple model
    'es-en': model,
    'en-fr': model,
    'fr-en': model
}

translator = TranslationSystem(translation_models)

# Test translations
test_cases = [
    ("Hello", "es"),
    ("Thank you", "es"),
    ("How are you today?", "fr"),
    ("Hola", "en", "es")
]

for test in test_cases:
    if len(test) == 2:
        text, target = test
        result = translator.translate(text, target)
    else:
        text, target, source = test
        result = translator.translate(text, target, source)
        
    print(f"\nSource: {result['source_language']} -> Target: {result['target_language']}")
    print(f"Original: {text}")
    print(f"Translation: {result['translation']}")
    print(f"Quality Score: {result['quality_score']:.2f}")
    print(f"Method: {result['method']}")

## 4. Code Generation Assistant

Let's build an AI-powered code generation and assistance system.

In [None]:
# Code analyzer
class CodeAnalyzer:
    """Analyze code for various properties."""
    
    def __init__(self):
        self.language_patterns = {
            'python': {'def ', 'import ', 'class ', ':', 'print('},
            'javascript': {'function ', 'const ', 'let ', '=>', 'console.log'},
            'java': {'public ', 'class ', 'void ', 'static ', 'System.out'}
        }
        
    def detect_language(self, code: str) -> str:
        """Detect programming language."""
        scores = {}
        
        for lang, patterns in self.language_patterns.items():
            score = sum(1 for pattern in patterns if pattern in code)
            scores[lang] = score
            
        return max(scores, key=scores.get) if scores else 'unknown'
    
    def analyze_complexity(self, code: str) -> Dict[str, int]:
        """Analyze code complexity."""
        lines = code.split('\n')
        
        return {
            'lines_of_code': len(lines),
            'non_empty_lines': len([l for l in lines if l.strip()]),
            'functions': code.count('def ') + code.count('function '),
            'classes': code.count('class '),
            'loops': code.count('for ') + code.count('while '),
            'conditions': code.count('if ') + code.count('else')
        }
    
    def check_syntax_patterns(self, code: str, language: str) -> List[str]:
        """Check for common syntax issues."""
        issues = []
        
        if language == 'python':
            # Check indentation (simplified)
            lines = code.split('\n')
            for i, line in enumerate(lines):
                if line.strip() and line[0] not in ' \t' and i > 0:
                    prev_line = lines[i-1].strip()
                    if prev_line.endswith(':'):
                        issues.append(f"Line {i+1}: Expected indentation after ':'")
                        
        return issues

# Security scanner
class SecurityScanner:
    """Scan code for security issues."""
    
    def __init__(self):
        self.vulnerability_patterns = {
            'eval_usage': (r'\beval\s*\(', 'Use of eval() is dangerous'),
            'sql_injection': (r'".*SELECT.*FROM.*"\s*\+', 'Potential SQL injection'),
            'hardcoded_secrets': (r'(password|api_key|secret)\s*=\s*["\']\w+["\']', 
                                'Hardcoded secrets detected'),
            'unsafe_random': (r'\brandom\.random\(\)', 
                            'Use of non-cryptographic random for security')
        }
        
    def scan(self, code: str) -> List[Dict[str, str]]:
        """Scan code for security issues."""
        issues = []
        
        for vuln_type, (pattern, message) in self.vulnerability_patterns.items():
            import re
            if re.search(pattern, code, re.IGNORECASE):
                issues.append({
                    'type': vuln_type,
                    'severity': 'high' if 'eval' in vuln_type else 'medium',
                    'message': message
                })
                
        return issues

# Test code analyzer
analyzer = CodeAnalyzer()
scanner = SecurityScanner()

test_code = """def calculate_sum(numbers):
    total = 0
    for num in numbers:
        total += num
    return total

class Calculator:
    def __init__(self):
        self.result = 0
    
    def add(self, x, y):
        return x + y
"""

print("Code Analysis:")
print(f"Detected language: {analyzer.detect_language(test_code)}")
print(f"Complexity metrics: {analyzer.analyze_complexity(test_code)}")

# Test security scanner
unsafe_code = 'result = eval(user_input)'
security_issues = scanner.scan(unsafe_code)
print(f"\nSecurity issues found: {len(security_issues)}")
for issue in security_issues:
    print(f"  - {issue['type']}: {issue['message']} (Severity: {issue['severity']})")

In [None]:
# Code generation assistant
class CodeAssistant:
    """AI-powered code generation and assistance."""
    
    def __init__(self, model):
        self.model = model
        self.analyzer = CodeAnalyzer()
        self.scanner = SecurityScanner()
        self.code_templates = self._load_templates()
        
    def _load_templates(self) -> Dict[str, str]:
        """Load code templates."""
        return {
            'python_function': '''def {name}({params}):
    """{docstring}"""
    {body}
''',
            'python_class': '''class {name}:
    """{docstring}"""
    
    def __init__(self{params}):
        {init_body}
''',
            'javascript_function': '''function {name}({params}) {{
    // {description}
    {body}
}}
'''
        }
        
    def generate_code(self, request: Dict[str, Any]) -> Dict[str, Any]:
        """Generate code from request."""
        task_type = request.get('type', 'function')
        language = request.get('language', 'python')
        description = request.get('description', '')
        
        # Use template if available
        template_key = f"{language}_{task_type}"
        if template_key in self.code_templates:
            code = self._generate_from_template(request, template_key)
        else:
            code = self._generate_freeform(description, language)
            
        # Analyze generated code
        complexity = self.analyzer.analyze_complexity(code)
        security_issues = self.scanner.scan(code)
        
        return {
            'code': code,
            'language': language,
            'complexity': complexity,
            'security_issues': security_issues,
            'needs_review': len(security_issues) > 0
        }
        
    def _generate_from_template(self, request: Dict, template_key: str) -> str:
        """Generate code using template."""
        template = self.code_templates[template_key]
        
        # Fill template (simplified)
        code = template.format(
            name=request.get('name', 'function_name'),
            params=request.get('params', ''),
            docstring=request.get('description', 'Description here'),
            body='pass  # TODO: Implement',
            init_body='pass  # TODO: Initialize',
            description=request.get('description', '')
        )
        
        return code
        
    def _generate_freeform(self, description: str, language: str) -> str:
        """Generate code without template."""
        # Simplified - would use model in practice
        return f"# Generated {language} code for: {description}\n# TODO: Implement\npass"
        
    def complete_code(self, partial_code: str, context: Dict = None) -> Dict[str, Any]:
        """Complete partial code."""
        language = self.analyzer.detect_language(partial_code)
        
        # Find insertion point
        lines = partial_code.split('\n')
        last_line = lines[-1] if lines else ''
        
        # Generate completion (simplified)
        if 'def' in last_line and language == 'python':
            completion = ':\n    """Function description."""\n    pass'
        elif 'class' in last_line and language == 'python':
            completion = ':\n    """Class description."""\n    \n    def __init__(self):\n        pass'
        else:
            completion = '  # TODO: Complete implementation'
            
        completed_code = partial_code + completion
        
        return {
            'completed_code': completed_code,
            'completion': completion,
            'language': language,
            'confidence': 0.85
        }
        
    def explain_code(self, code: str) -> Dict[str, Any]:
        """Generate explanation for code."""
        language = self.analyzer.detect_language(code)
        complexity = self.analyzer.analyze_complexity(code)
        
        # Generate explanation (simplified)
        explanation = {
            'summary': f"This {language} code contains {complexity['functions']} functions and {complexity['classes']} classes.",
            'complexity': complexity,
            'key_concepts': self._extract_concepts(code, language),
            'suggestions': self._generate_suggestions(code, complexity)
        }
        
        return explanation
        
    def _extract_concepts(self, code: str, language: str) -> List[str]:
        """Extract key programming concepts."""
        concepts = []
        
        if 'class' in code:
            concepts.append('Object-Oriented Programming')
        if 'for' in code or 'while' in code:
            concepts.append('Loops')
        if 'if' in code:
            concepts.append('Conditional Logic')
        if 'def' in code or 'function' in code:
            concepts.append('Functions')
            
        return concepts
        
    def _generate_suggestions(self, code: str, complexity: Dict) -> List[str]:
        """Generate improvement suggestions."""
        suggestions = []
        
        if complexity['lines_of_code'] > 50:
            suggestions.append("Consider breaking down into smaller functions")
        if complexity['functions'] == 0 and complexity['lines_of_code'] > 10:
            suggestions.append("Consider organizing code into functions")
        if 'TODO' in code:
            suggestions.append("Complete TODO items")
            
        return suggestions

# Create code assistant
code_assistant = CodeAssistant(model)

# Test code generation
print("=== Code Generation ===")
generation_request = {
    'type': 'function',
    'language': 'python',
    'name': 'calculate_fibonacci',
    'params': 'n: int',
    'description': 'Calculate the nth Fibonacci number'
}

result = code_assistant.generate_code(generation_request)
print(f"Generated {result['language']} code:")
print(result['code'])
print(f"Security issues: {len(result['security_issues'])}")

# Test code completion
print("\n=== Code Completion ===")
partial = "def process_data(data"
completion_result = code_assistant.complete_code(partial)
print(f"Partial code: {partial}")
print(f"Completion: {completion_result['completion']}")
print(f"Confidence: {completion_result['confidence']}")

# Test code explanation
print("\n=== Code Explanation ===")
explanation = code_assistant.explain_code(test_code)
print(f"Summary: {explanation['summary']}")
print(f"Key concepts: {', '.join(explanation['key_concepts'])}")
print(f"Suggestions: {explanation['suggestions']}")

## 5. Content Generation System

Let's build a system for generating various types of content.

In [None]:
# SEO optimizer
class SEOOptimizer:
    """Optimize content for search engines."""
    
    def optimize(self, content: str, keywords: List[str]) -> str:
        """Optimize content for keywords."""
        optimized = content
        
        # Ensure keywords appear in content (simplified)
        for keyword in keywords:
            if keyword.lower() not in optimized.lower():
                # Add keyword naturally (simplified)
                optimized += f"\n\nThis content is relevant to {keyword}."
                
        return optimized
        
    def analyze_seo(self, content: str, keywords: List[str]) -> Dict[str, Any]:
        """Analyze SEO metrics."""
        content_lower = content.lower()
        word_count = len(content.split())
        
        # Keyword density
        keyword_counts = {}
        for keyword in keywords:
            count = content_lower.count(keyword.lower())
            density = (count / word_count) * 100 if word_count > 0 else 0
            keyword_counts[keyword] = {
                'count': count,
                'density': density
            }
            
        # Calculate SEO score
        total_keyword_presence = sum(1 for k in keywords if k.lower() in content_lower)
        keyword_coverage = total_keyword_presence / len(keywords) if keywords else 0
        
        # Check for good SEO practices
        has_headings = any(marker in content for marker in ['#', '<h1>', '<h2>'])
        good_length = 300 <= word_count <= 2000
        
        seo_score = (
            keyword_coverage * 0.4 +
            (1.0 if has_headings else 0.0) * 0.3 +
            (1.0 if good_length else 0.5) * 0.3
        )
        
        return {
            'seo_score': seo_score,
            'word_count': word_count,
            'keyword_analysis': keyword_counts,
            'has_headings': has_headings,
            'recommendations': self._get_seo_recommendations(seo_score, keyword_counts)
        }
        
    def _get_seo_recommendations(self, score: float, keyword_analysis: Dict) -> List[str]:
        """Generate SEO recommendations."""
        recommendations = []
        
        if score < 0.7:
            recommendations.append("Improve keyword usage and content structure")
            
        for keyword, data in keyword_analysis.items():
            if data['density'] < 0.5:
                recommendations.append(f"Increase usage of keyword '{keyword}'")
            elif data['density'] > 3.0:
                recommendations.append(f"Reduce keyword stuffing for '{keyword}'")
                
        return recommendations

# Test SEO optimizer
seo_optimizer = SEOOptimizer()

test_content = """# Introduction to Machine Learning

Machine learning is a subset of artificial intelligence that enables computers to learn from data.
In this guide, we'll explore the fundamentals of ML and its applications.
"""

keywords = ['machine learning', 'artificial intelligence', 'data science']

seo_analysis = seo_optimizer.analyze_seo(test_content, keywords)
print("SEO Analysis:")
print(f"SEO Score: {seo_analysis['seo_score']:.2f}")
print(f"Word Count: {seo_analysis['word_count']}")
print("\nKeyword Analysis:")
for keyword, data in seo_analysis['keyword_analysis'].items():
    print(f"  {keyword}: {data['count']} occurrences ({data['density']:.1f}% density)")
print(f"\nRecommendations: {seo_analysis['recommendations']}")

In [None]:
# Content generator
class ContentGenerator:
    """Multi-purpose content generation system."""
    
    def __init__(self, model):
        self.model = model
        self.seo_optimizer = SEOOptimizer()
        self.templates = self._load_content_templates()
        
    def _load_content_templates(self) -> Dict[str, str]:
        """Load content templates."""
        return {
            'blog_intro': """{hook}

In this article, we'll explore {topic} and discover {benefits}.

## What is {topic}?

{definition}

## Why {topic} Matters

{importance}
""",
            'product_description': """{product_name} - {tagline}

**Key Features:**
{features}

**Benefits:**
{benefits}

**Perfect for:** {target_audience}
""",
            'social_media': """{emoji} {hook}

{main_point}

{call_to_action} {hashtags}
"""
        }
        
    def generate_article(self, topic: str, style: str = 'informative',
                        keywords: List[str] = None, word_count: int = 500) -> Dict[str, Any]:
        """Generate complete article."""
        # Generate outline
        outline = self._generate_outline(topic, word_count)
        
        # Generate content for each section
        sections = []
        current_words = 0
        
        for section in outline:
            section_content = self._generate_section(
                section['title'], 
                section['points'],
                style,
                target_words=(word_count - current_words) // (len(outline) - len(sections))
            )
            sections.append({
                'title': section['title'],
                'content': section_content
            })
            current_words += len(section_content.split())
            
        # Combine sections
        article = self._combine_sections(sections)
        
        # SEO optimization
        if keywords:
            article = self.seo_optimizer.optimize(article, keywords)
            
        # Analyze final article
        analysis = {
            'word_count': len(article.split()),
            'sections': len(sections),
            'readability_score': self._calculate_readability(article)
        }
        
        if keywords:
            analysis['seo'] = self.seo_optimizer.analyze_seo(article, keywords)
            
        return {
            'content': article,
            'outline': outline,
            'analysis': analysis
        }
        
    def _generate_outline(self, topic: str, target_words: int) -> List[Dict]:
        """Generate article outline."""
        # Simplified outline generation
        sections_count = max(3, min(7, target_words // 150))
        
        outline = [
            {'title': 'Introduction', 'points': ['Hook', 'Overview', 'Thesis']},
            {'title': f'Understanding {topic}', 'points': ['Definition', 'History', 'Importance']},
        ]
        
        # Add middle sections
        for i in range(sections_count - 3):
            outline.append({
                'title': f'Key Aspect {i+1}',
                'points': ['Main idea', 'Examples', 'Analysis']
            })
            
        outline.append({'title': 'Conclusion', 'points': ['Summary', 'Key takeaways', 'Call to action']})
        
        return outline
        
    def _generate_section(self, title: str, points: List[str], 
                         style: str, target_words: int) -> str:
        """Generate section content."""
        # Simplified content generation
        content = f"## {title}\n\n"
        
        words_per_point = target_words // len(points)
        
        for point in points:
            # Generate paragraph for each point
            paragraph = f"{point}: " + " ".join([
                f"This is content about {point.lower()} in {style} style."
                for _ in range(words_per_point // 10)
            ])
            content += paragraph + "\n\n"
            
        return content
        
    def _combine_sections(self, sections: List[Dict]) -> str:
        """Combine sections into article."""
        article_parts = []
        
        for section in sections:
            article_parts.append(section['content'])
            
        return "\n".join(article_parts)
        
    def _calculate_readability(self, text: str) -> float:
        """Calculate readability score."""
        sentences = text.split('.')
        words = text.split()
        
        if not sentences or not words:
            return 0.0
            
        avg_sentence_length = len(words) / len(sentences)
        
        # Simple readability score (inverse of complexity)
        if avg_sentence_length < 15:
            return 0.9  # Very readable
        elif avg_sentence_length < 20:
            return 0.7  # Readable
        elif avg_sentence_length < 25:
            return 0.5  # Moderate
        else:
            return 0.3  # Complex
            
    def generate_marketing_copy(self, product_info: Dict[str, Any],
                              platform: str = 'website') -> Dict[str, Any]:
        """Generate marketing copy."""
        if platform == 'website' and 'product_description' in self.templates:
            template = self.templates['product_description']
            
            # Format features and benefits
            features_text = '\n'.join([f"- {f}" for f in product_info.get('features', [])])
            benefits_text = '\n'.join([f"- {b}" for b in product_info.get('benefits', [])])
            
            copy = template.format(
                product_name=product_info.get('name', 'Product'),
                tagline=product_info.get('tagline', 'Your perfect solution'),
                features=features_text,
                benefits=benefits_text,
                target_audience=product_info.get('target_audience', 'everyone')
            )
        else:
            # Generate without template
            copy = f"{product_info.get('name', 'Product')} - {product_info.get('tagline', 'Great product')}\n\n"
            copy += "Discover the perfect solution for your needs."
            
        return {
            'copy': copy,
            'platform': platform,
            'word_count': len(copy.split()),
            'optimized_for': 'conversion'
        }

# Create content generator
content_gen = ContentGenerator(model)

# Test article generation
print("=== Article Generation ===")
article_result = content_gen.generate_article(
    topic="Artificial Intelligence",
    style="educational",
    keywords=["AI", "machine learning", "future"],
    word_count=300
)

print(f"Generated article outline:")
for section in article_result['outline']:
    print(f"  - {section['title']}")
print(f"\nWord count: {article_result['analysis']['word_count']}")
print(f"Readability score: {article_result['analysis']['readability_score']:.2f}")

# Test marketing copy
print("\n=== Marketing Copy Generation ===")
product_info = {
    'name': 'SmartAssist Pro',
    'tagline': 'Your AI-powered productivity companion',
    'features': [
        'Real-time suggestions',
        'Multi-language support',
        'Cloud synchronization'
    ],
    'benefits': [
        'Save 2 hours daily',
        'Reduce errors by 90%',
        'Work from anywhere'
    ],
    'target_audience': 'busy professionals'
}

marketing_result = content_gen.generate_marketing_copy(product_info)
print("Generated marketing copy:")
print(marketing_result['copy'])

## 6. Production Deployment Patterns

Let's explore best practices for deploying transformer applications at scale.

In [None]:
# Model server with batching
class ModelServer:
    """Production model server with batching and optimization."""
    
    def __init__(self, model, config=None):
        self.model = model
        self.config = config or {}
        
        # Batching configuration
        self.batch_size = self.config.get('batch_size', 32)
        self.batch_timeout = self.config.get('batch_timeout', 0.1)  # seconds
        
        # Request queue
        self.request_queue = []
        self.results = {}
        
        # Performance monitoring
        self.metrics = {
            'total_requests': 0,
            'total_batches': 0,
            'avg_batch_size': 0,
            'avg_latency': 0
        }
        
    def process_request(self, request_id: str, input_data: Any) -> str:
        """Process single request."""
        self.metrics['total_requests'] += 1
        
        # Add to queue
        self.request_queue.append({
            'id': request_id,
            'data': input_data,
            'timestamp': time.time()
        })
        
        # Check if batch should be processed
        if len(self.request_queue) >= self.batch_size or \
           (self.request_queue and 
            time.time() - self.request_queue[0]['timestamp'] > self.batch_timeout):
            self._process_batch()
            
        # Wait for result (simplified - use async in practice)
        while request_id not in self.results:
            time.sleep(0.01)
            
        return self.results.pop(request_id)
        
    def _process_batch(self):
        """Process batch of requests."""
        if not self.request_queue:
            return
            
        batch = self.request_queue[:self.batch_size]
        self.request_queue = self.request_queue[self.batch_size:]
        
        self.metrics['total_batches'] += 1
        self.metrics['avg_batch_size'] = (
            self.metrics['avg_batch_size'] * 0.9 + len(batch) * 0.1
        )
        
        start_time = time.time()
        
        # Batch process (simplified)
        batch_results = self._run_model_batch([r['data'] for r in batch])
        
        # Store results
        for request, result in zip(batch, batch_results):
            self.results[request['id']] = result
            
        # Update latency
        latency = time.time() - start_time
        self.metrics['avg_latency'] = (
            self.metrics['avg_latency'] * 0.9 + latency * 0.1
        )
        
    def _run_model_batch(self, batch_data: List[Any]) -> List[str]:
        """Run model on batch."""
        # Simplified batch processing
        return [f"Processed: {data}" for data in batch_data]
        
    def get_metrics(self) -> Dict[str, Any]:
        """Get server metrics."""
        metrics = self.metrics.copy()
        if metrics['total_batches'] > 0:
            metrics['requests_per_batch'] = (
                metrics['total_requests'] / metrics['total_batches']
            )
        return metrics

# Load balancer
class LoadBalancer:
    """Distribute requests across multiple model servers."""
    
    def __init__(self, servers: List[ModelServer]):
        self.servers = servers
        self.current_server = 0
        self.server_loads = [0] * len(servers)
        
    def route_request(self, request_id: str, input_data: Any) -> str:
        """Route request to least loaded server."""
        # Find least loaded server
        min_load_idx = self.server_loads.index(min(self.server_loads))
        
        # Update load
        self.server_loads[min_load_idx] += 1
        
        # Process request
        result = self.servers[min_load_idx].process_request(request_id, input_data)
        
        # Update load
        self.server_loads[min_load_idx] -= 1
        
        return result
        
    def get_server_metrics(self) -> List[Dict[str, Any]]:
        """Get metrics from all servers."""
        return [server.get_metrics() for server in self.servers]

# Test deployment patterns
print("=== Model Server Testing ===")

# Create model servers
servers = [ModelServer(model, {'batch_size': 4}) for _ in range(3)]
load_balancer = LoadBalancer(servers)

# Simulate requests
import uuid

print("Simulating 20 requests...")
results = []
for i in range(20):
    request_id = str(uuid.uuid4())
    result = load_balancer.route_request(request_id, f"Request {i}")
    results.append(result)
    
print(f"\nProcessed {len(results)} requests")

# Show metrics
print("\nServer Metrics:")
for i, metrics in enumerate(load_balancer.get_server_metrics()):
    print(f"\nServer {i+1}:")
    print(f"  Total requests: {metrics['total_requests']}")
    print(f"  Total batches: {metrics['total_batches']}")
    print(f"  Avg batch size: {metrics['avg_batch_size']:.1f}")
    print(f"  Avg latency: {metrics['avg_latency']:.3f}s")

In [None]:
# Production monitoring
class ApplicationMonitor:
    """Monitor transformer applications in production."""
    
    def __init__(self):
        self.metrics = defaultdict(list)
        self.alerts = []
        self.thresholds = {
            'error_rate': 0.05,
            'latency_p99': 1.0,
            'memory_usage': 0.9
        }
        
    def record_metric(self, metric_name: str, value: float, timestamp: float = None):
        """Record metric value."""
        if timestamp is None:
            timestamp = time.time()
            
        self.metrics[metric_name].append({
            'value': value,
            'timestamp': timestamp
        })
        
        # Check thresholds
        self._check_alerts(metric_name, value)
        
    def _check_alerts(self, metric_name: str, value: float):
        """Check if metric exceeds threshold."""
        if metric_name in self.thresholds:
            if value > self.thresholds[metric_name]:
                self.alerts.append({
                    'metric': metric_name,
                    'value': value,
                    'threshold': self.thresholds[metric_name],
                    'timestamp': time.time(),
                    'severity': 'high' if value > self.thresholds[metric_name] * 1.5 else 'medium'
                })
                
    def get_metrics_summary(self, window_minutes: int = 60) -> Dict[str, Dict[str, float]]:
        """Get metrics summary for time window."""
        current_time = time.time()
        window_start = current_time - (window_minutes * 60)
        
        summary = {}
        
        for metric_name, values in self.metrics.items():
            # Filter values in window
            window_values = [
                v['value'] for v in values 
                if v['timestamp'] >= window_start
            ]
            
            if window_values:
                summary[metric_name] = {
                    'count': len(window_values),
                    'mean': np.mean(window_values),
                    'p50': np.percentile(window_values, 50),
                    'p95': np.percentile(window_values, 95),
                    'p99': np.percentile(window_values, 99),
                    'max': max(window_values)
                }
                
        return summary
        
    def get_recent_alerts(self, limit: int = 10) -> List[Dict]:
        """Get recent alerts."""
        return sorted(self.alerts, key=lambda x: x['timestamp'], reverse=True)[:limit]
        
    def visualize_metrics(self, metric_name: str, window_minutes: int = 60):
        """Visualize metric over time."""
        current_time = time.time()
        window_start = current_time - (window_minutes * 60)
        
        # Get data points
        data_points = [
            v for v in self.metrics.get(metric_name, [])
            if v['timestamp'] >= window_start
        ]
        
        if not data_points:
            print(f"No data for metric: {metric_name}")
            return
            
        # Extract values and timestamps
        timestamps = [(p['timestamp'] - window_start) / 60 for p in data_points]
        values = [p['value'] for p in data_points]
        
        # Plot
        plt.figure(figsize=(10, 6))
        plt.plot(timestamps, values, 'b-', alpha=0.7)
        plt.scatter(timestamps, values, c='blue', alpha=0.5)
        
        # Add threshold line if exists
        if metric_name in self.thresholds:
            plt.axhline(y=self.thresholds[metric_name], color='r', 
                       linestyle='--', label=f'Threshold: {self.thresholds[metric_name]}')
            
        plt.xlabel('Time (minutes ago)')
        plt.ylabel(metric_name)
        plt.title(f'{metric_name} over last {window_minutes} minutes')
        plt.grid(True, alpha=0.3)
        plt.legend()
        plt.show()

# Test monitoring
monitor = ApplicationMonitor()

# Simulate metrics over time
print("Simulating application metrics...")
base_time = time.time() - 3600  # Start 1 hour ago

for i in range(100):
    timestamp = base_time + i * 36  # Every 36 seconds
    
    # Simulate metrics
    latency = 0.2 + np.random.normal(0, 0.05) + (0.5 if i > 70 else 0)
    error_rate = 0.01 + np.random.normal(0, 0.005) + (0.05 if i > 80 else 0)
    memory_usage = 0.6 + np.random.normal(0, 0.1) + (0.3 if i > 90 else 0)
    
    monitor.record_metric('latency_p99', latency, timestamp)
    monitor.record_metric('error_rate', error_rate, timestamp)
    monitor.record_metric('memory_usage', memory_usage, timestamp)

# Show metrics summary
print("\nMetrics Summary (last hour):")
summary = monitor.get_metrics_summary(60)
for metric, stats in summary.items():
    print(f"\n{metric}:")
    print(f"  Mean: {stats['mean']:.3f}")
    print(f"  P99: {stats['p99']:.3f}")
    print(f"  Max: {stats['max']:.3f}")

# Show alerts
alerts = monitor.get_recent_alerts()
if alerts:
    print(f"\nRecent Alerts ({len(alerts)} total):")
    for alert in alerts[:5]:
        print(f"  [{alert['severity'].upper()}] {alert['metric']}: {alert['value']:.3f} > {alert['threshold']}")

# Visualize metrics
monitor.visualize_metrics('latency_p99', 60)

## Summary and Best Practices

Let's summarize the key learnings and best practices for real-world transformer applications.

In [None]:
# Best practices summary
best_practices = {
    "Architecture": [
        "Use microservices architecture for scalability",
        "Implement proper API gateway for routing",
        "Use load balancing for high availability",
        "Cache frequently requested responses",
        "Implement circuit breakers for fault tolerance"
    ],
    "Performance": [
        "Batch requests for efficient GPU utilization",
        "Use model quantization for faster inference",
        "Implement request prioritization",
        "Monitor and optimize memory usage",
        "Use asynchronous processing where possible"
    ],
    "Safety": [
        "Implement comprehensive input/output filtering",
        "Regular safety audits and red teaming",
        "Monitor for harmful content",
        "Implement rate limiting",
        "Have incident response procedures"
    ],
    "Monitoring": [
        "Track key performance metrics",
        "Set up alerting for anomalies",
        "Log all requests for debugging",
        "Monitor model drift",
        "Regular A/B testing"
    ],
    "User Experience": [
        "Provide clear error messages",
        "Implement graceful degradation",
        "Show progress for long operations",
        "Collect user feedback",
        "Personalize responses when appropriate"
    ]
}

# Visualize best practices
fig, ax = plt.subplots(figsize=(12, 8))

# Create hierarchical visualization
y_pos = 0
colors = plt.cm.Set3(np.linspace(0, 1, len(best_practices)))

for i, (category, practices) in enumerate(best_practices.items()):
    # Category header
    ax.text(0, y_pos, category, fontsize=14, weight='bold', color=colors[i])
    y_pos -= 0.5
    
    # Practices
    for practice in practices:
        ax.text(0.5, y_pos, f"• {practice}", fontsize=10)
        y_pos -= 0.3
        
    y_pos -= 0.3

ax.set_xlim(-0.1, 10)
ax.set_ylim(y_pos, 1)
ax.axis('off')
ax.set_title('Best Practices for Real-world Transformer Applications', 
             fontsize=16, weight='bold', pad=20)

plt.tight_layout()
plt.show()

print("\n🎉 Congratulations!")
print("\nYou've completed the comprehensive transformer tutorial series!")
print("\nYou now have the knowledge to:")
print("✓ Build transformer models from scratch")
print("✓ Train and fine-tune at scale")
print("✓ Deploy production applications")
print("✓ Ensure safety and fairness")
print("✓ Monitor and optimize performance")
print("\nContinue learning and building amazing applications with transformers!")