In [1]:
from dataclasses import dataclass, field
from enum import Enum
from typing import List

class TaskComplexity(Enum):
    SIMPLE = "simple"      # Classification, extraction, formatting
    STANDARD = "standard"  # Summarization, Q&A, basic generation
    COMPLEX = "complex"    # Multi-step reasoning, analysis, debugging
    AGENTIC = "agentic"    # Tool use, planning, self-correction

class DataSensitivity(Enum):
    PUBLIC = "public"           # No restrictions
    INTERNAL = "internal"       # Business data, contractual API use OK
    SENSITIVE = "sensitive"     # PII, regulated—regional restrictions apply
    RESTRICTED = "restricted"   # Cannot leave your infrastructure

class LatencyTier(Enum):
    REALTIME = "realtime"       # < 500ms end-to-end
    INTERACTIVE = "interactive" # < 2s end-to-end
    BATCH = "batch"             # Minutes acceptable

class ModelClass(Enum):
    """Model classes representing capability/deployment combinations."""
    SMALL_OPEN = "small_open"           # Llama 8B, Mistral 7B, Phi-3
    SMALL_CLOSED = "small_closed"       # GPT-4o-mini, Claude Haiku
    MID_OPEN = "mid_open"               # Llama 70B, Mixtral 8x22B
    MID_CLOSED = "mid_closed"           # GPT-4o, Claude Sonnet
    FRONTIER = "frontier"               # Claude Opus, GPT-4.5
    SELF_HOSTED = "self_hosted"         # Any model, your infrastructure


@dataclass
class TaskProfile:
    """
    Encodes the four dimensions that drive model selection.
    
    Use this to characterize any LLM task before choosing a model.
    """
    name: str
    complexity: TaskComplexity
    sensitivity: DataSensitivity
    latency: LatencyTier
    daily_volume: int
    
    def requires_self_hosting(self) -> bool:
        """Restricted data mandates self-hosting."""
        return self.sensitivity == DataSensitivity.RESTRICTED
    
    def prefers_self_hosting(self) -> bool:
        """Sensitive data strongly prefers self-hosting."""
        return self.sensitivity in (DataSensitivity.SENSITIVE, 
                                     DataSensitivity.RESTRICTED)
    
    def is_cost_sensitive(self, threshold: int = 10000) -> bool:
        """High volume makes per-request cost significant."""
        return self.daily_volume >= threshold
    
    def is_latency_constrained(self) -> bool:
        """Real-time requirements limit model size."""
        return self.latency == LatencyTier.REALTIME


@dataclass
class ModelRecommendation:
    """A model recommendation with reasoning and trade-offs."""
    primary: ModelClass
    primary_examples: List[str]
    alternatives: List[ModelClass] = field(default_factory=list)
    reasoning: List[str] = field(default_factory=list)
    warnings: List[str] = field(default_factory=list)
    estimated_cost_per_1k: float = 0.0  # € per 1000 requests (2K tokens avg)


def recommend_model(profile: TaskProfile) -> ModelRecommendation:
    """
    Recommend a model class based on task profile.
    
    Implements the decision logic as executable code.
    The reasoning list explains each constraint applied.
    """
    reasoning = []
    warnings = []
    alternatives = []
    
    # Hard constraint: restricted data must self-host
    if profile.requires_self_hosting():
        reasoning.append("RESTRICTED data → must self-host (no external APIs)")
        
        if profile.complexity in (TaskComplexity.SIMPLE, TaskComplexity.STANDARD):
            examples = ["Llama 3.1 8B", "Mistral 7B", "Phi-3"]
            reasoning.append("Simple/standard task → small model sufficient")
            cost = 0.10  # Rough compute estimate
        else:
            examples = ["Llama 3.1 70B", "Mixtral 8x22B", "Qwen 72B"]
            reasoning.append("Complex task → larger self-hosted model needed")
            warnings.append("70B+ models require significant GPU infrastructure")
            cost = 0.50
        
        return ModelRecommendation(
            primary=ModelClass.SELF_HOSTED,
            primary_examples=examples,
            reasoning=reasoning,
            warnings=warnings,
            estimated_cost_per_1k=cost
        )
    
    # Soft constraint: sensitive data prefers self-hosting
    if profile.prefers_self_hosting():
        reasoning.append("SENSITIVE data → prefer self-hosted or regional provider")
        
        if profile.complexity == TaskComplexity.SIMPLE:
            return ModelRecommendation(
                primary=ModelClass.SMALL_OPEN,
                primary_examples=["Llama 3.1 8B (self-hosted)", "Mistral 7B"],
                alternatives=[ModelClass.SMALL_CLOSED],
                reasoning=reasoning + ["Simple task → small open model ideal"],
                warnings=["If using cloud API, ensure GDPR-compliant DPA in place"],
                estimated_cost_per_1k=0.10
            )
        elif profile.complexity == TaskComplexity.STANDARD:
            return ModelRecommendation(
                primary=ModelClass.MID_OPEN,
                primary_examples=["Llama 3.1 70B", "Mixtral 8x22B"],
                alternatives=[ModelClass.MID_CLOSED],
                reasoning=reasoning + ["Standard task → mid-tier open model"],
                warnings=["Cloud APIs (GPT-4o, Sonnet) viable with proper DPA"],
                estimated_cost_per_1k=0.50
            )
        else:  # COMPLEX or AGENTIC
            reasoning.append("Complex task with sensitive data → trade-off required")
            warnings.append("Best open models lag frontier by ~6 months on reasoning")
            warnings.append("Consider: Can you decompose into sensitive + non-sensitive parts?")
            return ModelRecommendation(
                primary=ModelClass.MID_OPEN,
                primary_examples=["Llama 3.1 70B", "Mixtral 8x22B"],
                alternatives=[ModelClass.MID_CLOSED, ModelClass.FRONTIER],
                reasoning=reasoning,
                warnings=warnings,
                estimated_cost_per_1k=0.50
            )
    
    # No sovereignty constraints—optimize for capability and cost
    
    # Simple tasks: small models suffice
    if profile.complexity == TaskComplexity.SIMPLE:
        reasoning.append("Simple task → small model sufficient")
        
        if profile.is_cost_sensitive():
            reasoning.append(f"High volume ({profile.daily_volume:,}/day) → optimize cost")
            return ModelRecommendation(
                primary=ModelClass.SMALL_CLOSED,
                primary_examples=["GPT-4o-mini", "Claude Haiku"],
                alternatives=[ModelClass.SMALL_OPEN],
                reasoning=reasoning,
                estimated_cost_per_1k=0.30
            )
        else:
            return ModelRecommendation(
                primary=ModelClass.SMALL_CLOSED,
                primary_examples=["GPT-4o-mini", "Claude Haiku"],
                reasoning=reasoning,
                estimated_cost_per_1k=0.30
            )
    
    # Standard tasks: mid-tier models
    if profile.complexity == TaskComplexity.STANDARD:
        reasoning.append("Standard task → mid-tier model recommended")
        
        if profile.is_latency_constrained():
            reasoning.append("Real-time latency → prefer optimized inference")
            warnings.append("GPT-4o and Sonnet typically 200-500ms; may need caching")
        
        if profile.is_cost_sensitive():
            reasoning.append(f"High volume ({profile.daily_volume:,}/day) → consider routing")
            alternatives.append(ModelClass.SMALL_CLOSED)
            warnings.append("Route simple queries to smaller model for 50-70% cost reduction")
        
        return ModelRecommendation(
            primary=ModelClass.MID_CLOSED,
            primary_examples=["GPT-4o", "Claude Sonnet"],
            alternatives=alternatives,
            reasoning=reasoning,
            warnings=warnings,
            estimated_cost_per_1k=6.00
        )
    
    # Complex reasoning: frontier models
    if profile.complexity == TaskComplexity.COMPLEX:
        reasoning.append("Complex reasoning → frontier model recommended")
        
        if profile.is_latency_constrained():
            warnings.append("Frontier models may exceed 500ms on complex prompts")
            warnings.append("Consider mid-tier for latency-critical paths")
            alternatives.append(ModelClass.MID_CLOSED)
        
        if profile.is_cost_sensitive():
            warnings.append(f"At {profile.daily_volume:,}/day, frontier costs add up fast")
            warnings.append("Implement routing: frontier for hard queries, mid-tier for rest")
            alternatives.append(ModelClass.MID_CLOSED)
        
        return ModelRecommendation(
            primary=ModelClass.FRONTIER,
            primary_examples=["Claude Opus", "GPT-4.5", "Gemini Ultra"],
            alternatives=alternatives,
            reasoning=reasoning,
            warnings=warnings,
            estimated_cost_per_1k=30.00
        )
    
    # Agentic tasks: tool-use optimized models
    reasoning.append("Agentic task → models optimized for tool use")
    reasoning.append("Claude Sonnet and GPT-4o excel at structured tool calling")
    
    if profile.is_cost_sensitive():
        warnings.append("Agentic loops multiply token usage—monitor closely")
    
    return ModelRecommendation(
        primary=ModelClass.MID_CLOSED,
        primary_examples=["Claude Sonnet", "GPT-4o"],
        alternatives=[ModelClass.FRONTIER],
        reasoning=reasoning + ["Mid-tier often matches frontier on tool use"],
        warnings=warnings,
        estimated_cost_per_1k=6.00
    )


def format_recommendation(profile: TaskProfile, rec: ModelRecommendation) -> str:
    """Format recommendation as readable output."""
    lines = [
        f"MODEL RECOMMENDATION: {profile.name}",
        "=" * 60,
        "",
        f"Task Profile:",
        f"  Complexity:   {profile.complexity.value}",
        f"  Sensitivity:  {profile.sensitivity.value}",
        f"  Latency:      {profile.latency.value}",
        f"  Daily Volume: {profile.daily_volume:,}",
        "",
        f"Recommended: {rec.primary.value.upper()}",
        f"  Examples: {', '.join(rec.primary_examples)}",
        "",
    ]
    
    if rec.alternatives:
        alt_names = [a.value for a in rec.alternatives]
        lines.append(f"Alternatives: {', '.join(alt_names)}")
        lines.append("")
    
    lines.append("Reasoning:")
    for r in rec.reasoning:
        lines.append(f"  • {r}")
    
    if rec.warnings:
        lines.append("")
        lines.append("Warnings:")
        for w in rec.warnings:
            lines.append(f"  ⚠ {w}")
    
    lines.append("")
    monthly_cost = rec.estimated_cost_per_1k * (profile.daily_volume * 30 / 1000)
    lines.append(f"Estimated Monthly Cost: €{monthly_cost:,.0f}")
    lines.append(f"  (Based on €{rec.estimated_cost_per_1k:.2f} per 1K requests)")
    
    return "\n".join(lines)


# =============================================================================
# Driver: Model selection for real scenarios
# =============================================================================

print("Model Selection Advisor")
print("=" * 60)
print()

# Scenario 1: Support ticket classifier with PII
ticket_classifier = TaskProfile(
    name="Support Ticket Classifier",
    complexity=TaskComplexity.SIMPLE,
    sensitivity=DataSensitivity.SENSITIVE,
    latency=LatencyTier.REALTIME,
    daily_volume=50000
)
rec1 = recommend_model(ticket_classifier)
print(format_recommendation(ticket_classifier, rec1))
print()

# Scenario 2: Contract analysis for legal team
contract_analyzer = TaskProfile(
    name="Contract Risk Analyzer", 
    complexity=TaskComplexity.COMPLEX,
    sensitivity=DataSensitivity.RESTRICTED,
    latency=LatencyTier.BATCH,
    daily_volume=500
)
rec2 = recommend_model(contract_analyzer)
print(format_recommendation(contract_analyzer, rec2))
print()

# Scenario 3: Customer-facing chatbot
chatbot = TaskProfile(
    name="Product Q&A Chatbot",
    complexity=TaskComplexity.STANDARD,
    sensitivity=DataSensitivity.PUBLIC,
    latency=LatencyTier.INTERACTIVE,
    daily_volume=100000
)
rec3 = recommend_model(chatbot)
print(format_recommendation(chatbot, rec3))

Model Selection Advisor

MODEL RECOMMENDATION: Support Ticket Classifier

Task Profile:
  Complexity:   simple
  Sensitivity:  sensitive
  Latency:      realtime
  Daily Volume: 50,000

Recommended: SMALL_OPEN
  Examples: Llama 3.1 8B (self-hosted), Mistral 7B

Alternatives: small_closed

Reasoning:
  • SENSITIVE data → prefer self-hosted or regional provider
  • Simple task → small open model ideal

  ⚠ If using cloud API, ensure GDPR-compliant DPA in place

Estimated Monthly Cost: €150
  (Based on €0.10 per 1K requests)

MODEL RECOMMENDATION: Contract Risk Analyzer

Task Profile:
  Complexity:   complex
  Sensitivity:  restricted
  Latency:      batch
  Daily Volume: 500

Recommended: SELF_HOSTED
  Examples: Llama 3.1 70B, Mixtral 8x22B, Qwen 72B

Reasoning:
  • RESTRICTED data → must self-host (no external APIs)
  • Complex task → larger self-hosted model needed

  ⚠ 70B+ models require significant GPU infrastructure

Estimated Monthly Cost: €8
  (Based on €0.50 per 1K requests)

MO

In [2]:
from typing import List, Dict, Callable, Any
from dataclasses import dataclass
import time

@dataclass
class BenchmarkResult:
    """Results from benchmarking a model on your task."""
    model_name: str
    accuracy: float
    latency_p50_ms: float
    latency_p95_ms: float
    cost_per_1k_requests: float
    
    def meets_requirements(
        self, 
        min_accuracy: float, 
        max_latency_p95_ms: float,
        max_cost_per_1k: float
    ) -> bool:
        """Check if this model meets all requirements."""
        return (
            self.accuracy >= min_accuracy and
            self.latency_p95_ms <= max_latency_p95_ms and
            self.cost_per_1k_requests <= max_cost_per_1k
        )


def benchmark_model(
    model_fn: Callable[[str], str],
    test_cases: List[Dict[str, str]],
    evaluator: Callable[[str, str], float],
    cost_per_1k_tokens: float,
    avg_tokens_per_request: int = 2000
) -> BenchmarkResult:
    """
    Benchmark a single model on your test cases.
    
    Parameters
    ----------
    model_fn : Callable
        Function that takes input string, returns output string
    test_cases : List[Dict]
        Each dict has 'input' and 'expected' keys
    evaluator : Callable
        Function(actual, expected) -> score (0.0 to 1.0)
    cost_per_1k_tokens : float
        Model's price per 1000 tokens
    avg_tokens_per_request : int
        Expected tokens per request for cost calculation
    """
    scores = []
    latencies = []
    
    for case in test_cases:
        start = time.perf_counter()
        actual = model_fn(case['input'])
        latency_ms = (time.perf_counter() - start) * 1000
        
        score = evaluator(actual, case['expected'])
        scores.append(score)
        latencies.append(latency_ms)
    
    latencies.sort()
    n = len(latencies)
    
    return BenchmarkResult(
        model_name="",  # Set by caller
        accuracy=sum(scores) / len(scores),
        latency_p50_ms=latencies[n // 2],
        latency_p95_ms=latencies[int(n * 0.95)],
        cost_per_1k_requests=(avg_tokens_per_request / 1000) * cost_per_1k_tokens * 1000
    )


def compare_models(
    models: Dict[str, tuple],  # name -> (model_fn, cost_per_1k_tokens)
    test_cases: List[Dict[str, str]],
    evaluator: Callable[[str, str], float],
    requirements: Dict[str, float]  # min_accuracy, max_latency_p95_ms, max_cost_per_1k
) -> List[BenchmarkResult]:
    """
    Benchmark multiple models and filter by requirements.
    
    Returns results sorted by accuracy (highest first),
    with models not meeting requirements flagged.
    """
    results = []
    
    for name, (model_fn, cost) in models.items():
        result = benchmark_model(model_fn, test_cases, evaluator, cost)
        result.model_name = name
        results.append(result)
    
    # Sort by accuracy descending
    results.sort(key=lambda r: r.accuracy, reverse=True)
    return results


# =============================================================================
# Driver: How to set up your benchmark
# =============================================================================

print()
print("Model Validation Framework")
print("=" * 60)
print("""
To validate models on YOUR task:

1. BUILD YOUR TEST SET (50-200 examples from production):

   test_cases = [
       {"input": "Where is my order #12345?", "expected": "order_status"},
       {"input": "I want a refund", "expected": "refund_request"},
       {"input": "Your product broke my dishwasher", "expected": "complaint"},
       # Include edge cases that have caused problems
   ]

2. DEFINE YOUR EVALUATOR:

   # For classification:
   def evaluator(actual: str, expected: str) -> float:
       return 1.0 if expected.lower() in actual.lower() else 0.0
   
   # For generation (using embedding similarity):
   def evaluator(actual: str, expected: str) -> float:
       return cosine_similarity(embed(actual), embed(expected))

3. DEFINE YOUR REQUIREMENTS:

   requirements = {
       "min_accuracy": 0.92,        # 92% accuracy minimum
       "max_latency_p95_ms": 500,   # 500ms P95 latency
       "max_cost_per_1k": 10.0      # €10 per 1000 requests
   }

4. SET UP MODEL CANDIDATES:

   models = {
       "gpt-4o-mini": (
           lambda x: call_openai(x, model="gpt-4o-mini"),
           0.00015  # cost per 1K tokens
       ),
       "claude-haiku": (
           lambda x: call_anthropic(x, model="claude-3-haiku"),
           0.00025
       ),
       "llama-8b-local": (
           lambda x: call_local(x, model="llama-8b"),
           0.00005  # compute cost estimate
       ),
   }

5. RUN COMPARISON:

   results = compare_models(models, test_cases, evaluator, requirements)
   
   for r in results:
       status = "✓" if r.meets_requirements(**requirements) else "✗"
       print(f"{status} {r.model_name}: {r.accuracy:.1%} accuracy, "
             f"{r.latency_p95_ms:.0f}ms P95, €{r.cost_per_1k_requests:.2f}/1K")

The model that meets all requirements at lowest cost wins.
""")


Model Validation Framework

To validate models on YOUR task:

1. BUILD YOUR TEST SET (50-200 examples from production):

   test_cases = [
       {"input": "Where is my order #12345?", "expected": "order_status"},
       {"input": "I want a refund", "expected": "refund_request"},
       {"input": "Your product broke my dishwasher", "expected": "complaint"},
       # Include edge cases that have caused problems
   ]

2. DEFINE YOUR EVALUATOR:

   # For classification:
   def evaluator(actual: str, expected: str) -> float:
       return 1.0 if expected.lower() in actual.lower() else 0.0

   # For generation (using embedding similarity):
   def evaluator(actual: str, expected: str) -> float:
       return cosine_similarity(embed(actual), embed(expected))

3. DEFINE YOUR REQUIREMENTS:

   requirements = {
       "min_accuracy": 0.92,        # 92% accuracy minimum
       "max_latency_p95_ms": 500,   # 500ms P95 latency
       "max_cost_per_1k": 10.0      # €10 per 1000 requests
   }

4. SE

In [3]:
def estimate_vision_costs(
    images_per_day: int,
    tokens_per_image: int = 1500,  # Typical for 1024x1024
    text_tokens_per_request: int = 500,
    price_per_1k_input: float = 0.0025  # GPT-4o pricing
) -> dict:
    """
    Estimate costs for a vision-enabled pipeline.
    
    Vision tokens typically cost the same as text tokens,
    but images consume many more tokens than equivalent text.
    """
    daily_vision_tokens = images_per_day * tokens_per_image
    daily_text_tokens = images_per_day * text_tokens_per_request
    daily_total_tokens = daily_vision_tokens + daily_text_tokens
    
    daily_cost = (daily_total_tokens / 1000) * price_per_1k_input
    monthly_cost = daily_cost * 30
    
    # Compare to text-only alternative
    text_only_daily = (images_per_day * text_tokens_per_request / 1000) * price_per_1k_input
    vision_premium = daily_cost / text_only_daily if text_only_daily > 0 else float('inf')
    
    return {
        'daily_tokens': daily_total_tokens,
        'daily_cost': round(daily_cost, 2),
        'monthly_cost': round(monthly_cost, 2),
        'vision_cost_multiplier': round(vision_premium, 1)
    }


# =============================================================================
# Driver: Vision cost analysis for document processing
# =============================================================================

# Scenario: Invoice processing system
invoice_processing = estimate_vision_costs(
    images_per_day=10000,
    tokens_per_image=1500,
    text_tokens_per_request=300,
    price_per_1k_input=0.0025
)

print("Vision Pipeline Cost Analysis: Invoice Processing")
print("=" * 55)
print(f"Daily token consumption:  {invoice_processing['daily_tokens']:>12,}")
print(f"Daily cost:               €{invoice_processing['daily_cost']:>11,.2f}")
print(f"Monthly cost:             €{invoice_processing['monthly_cost']:>11,.2f}")
print(f"Cost vs text-only:        {invoice_processing['vision_cost_multiplier']:>12}×")
print()
print("Decision guidance:")
print("  • If OCR + text extraction achieves 95%+ accuracy → use text-only")
print("  • If documents have complex layouts, tables → vision may be worth 4×")
print("  • Consider hybrid: OCR first, vision fallback for low-confidence cases")

Vision Pipeline Cost Analysis: Invoice Processing
Daily token consumption:    18,000,000
Daily cost:               €      45.00
Monthly cost:             €   1,350.00
Cost vs text-only:                 6.0×

Decision guidance:
  • If OCR + text extraction achieves 95%+ accuracy → use text-only
  • If documents have complex layouts, tables → vision may be worth 4×
  • Consider hybrid: OCR first, vision fallback for low-confidence cases


In [None]:
# Structured Output with Instructor
# pip install instructor pydantic

from pydantic import BaseModel, Field
from typing import List
from enum import Enum

class Priority(str, Enum):
    HIGH = "high"
    MEDIUM = "medium"
    LOW = "low"

class SupportTicket(BaseModel):
    """Schema for structured extraction - Pydantic does the heavy lifting."""
    category: str = Field(description="Issue category")
    priority: Priority = Field(description="Urgency level")
    summary: str = Field(description="One-sentence summary", max_length=200)
    entities: List[str] = Field(default_factory=list, description="Products/orders mentioned")
    sentiment: float = Field(ge=-1.0, le=1.0, description="Sentiment score")


print("Structured Output with Instructor")
print("=" * 55)
print("""
WHAT INSTRUCTOR DOES:
  1. Injects your Pydantic schema into the prompt
  2. Parses LLM response into typed object
  3. On validation failure → re-prompts with error context
  4. Returns validated Pydantic object, not raw text

RELIABILITY SPECTRUM:
  Prompt-only parsing:     ~85% (model adds explanations, breaks JSON)
  Instructor:              ~95-99% (auto-retry with validation feedback)
  Constrained generation:  ~99.9% (grammar-enforced, for self-hosted)

SETUP:
  # Cloud APIs
  client = instructor.from_openai(OpenAI())
  client = instructor.from_anthropic(Anthropic())
  
  # Local (Ollama)
  client = instructor.from_openai(
      OpenAI(base_url="http://localhost:11434/v1", api_key="ollama"),
      mode=instructor.Mode.JSON
  )

USAGE:
  ticket = client.chat.completions.create(
      model="gpt-4o-mini",
      response_model=SupportTicket,
      max_retries=2,
      messages=[{"role": "user", "content": raw_message}]
  )
  # ticket is a SupportTicket object, not a string

→ See 1B/demos.ipynb for runnable demo with Ollama
""")

Structured Output with Instructor

Setup (one-time):
    from openai import OpenAI
    import instructor

    client = instructor.from_openai(OpenAI())
    # Or: instructor.from_anthropic(Anthropic())
    # Or: instructor.from_provider("openai/gpt-4o-mini")

Usage:
    raw_message = '''
    I've been trying to reset my password for 3 days now!
    The mobile app keeps crashing when I tap "Forgot Password".
    This is ridiculous - I need access to my account for work.
    Order #12345 is stuck and I can't track it.
    '''

    ticket = extract_ticket_info(
        raw_text=raw_message,
        customer_id="CUST-98765",
        client=client
    )

    print(ticket.model_dump_json(indent=2))

Expected output:
    {
      "customer_id": "CUST-98765",
      "category": "authentication",
      "priority": "high",
      "summary": "Password reset failing on mobile app, blocking order tracking",
      "entities_mentioned": ["mobile app", "Forgot Password", "Order #12345"],
      "sentiment"

In [None]:
# Structured Generation - Outlines
# pip install outlines[ollama]  # or [openai], [anthropic], [transformers], [vllm]

"""
Outlines is a unified structured generation library supporting many backends.
Capabilities differ based on how you connect:

┌────────────────────────────┬──────────────┬─────────────────┐
│ Backend                    │ JSON Schemas │ Regex/Grammar   │
├────────────────────────────┼──────────────┼─────────────────┤
│ Ollama (from_ollama)       │ ✓            │ ✗ (black-box)   │
│ OpenAI (from_openai)       │ ✓            │ ✗ (black-box)   │
│ Anthropic (from_anthropic) │ ✓            │ ✗ (black-box)   │
│ vLLM server (from_vllm)    │ ✓            │ ✗ (API mode)    │
│ vLLM local (from_vllm_offline) │ ✓        │ ✓ Full support  │
│ HuggingFace (from_transformers)│ ✓        │ ✓ Full support  │
│ llama.cpp (from_llamacpp)  │ ✓            │ ✓ Full support  │
└────────────────────────────┴──────────────┴─────────────────┘

# API backends - JSON schemas via provider's native mode
import outlines, ollama
model = outlines.from_ollama(ollama.Client(), model_name="qwen3:4b")
result = model("Classify: payment failed", MySchema)  # Returns JSON str

# Local backends - true token masking, full grammar control
from vllm import LLM
model = outlines.from_vllm_offline(LLM("meta-llama/Llama-3-8B"))

regex_type = outlines.types.regex(r"PRD-[0-9]{3}")
result = model("Generate code:", regex_type)  # GUARANTEED PRD-XXX

DECISION GUIDE:
  • APIs (Ollama, OpenAI, vLLM server)? → Instructor has simpler DX
  • Self-hosting + need regex/grammar? → Outlines (local backends)
  • High-volume GPU inference? → Outlines + vLLM offline (fastest)

→ See 1B/demos.ipynb for runnable examples
"""

print("Constrained Generation Decision")
print("=" * 55)
print("""
Choose your approach:

┌─────────────────────┬────────────────┬──────────────────┐
│ Approach            │ Reliability    │ Best For         │
├─────────────────────┼────────────────┼──────────────────┤
│ Prompt + parsing    │ ~85%           │ Prototyping      │
│ Instructor          │ ~95-99%        │ Cloud APIs       │
│ Outlines/guidance   │ ~99.9%         │ Self-hosted      │
│ Native JSON mode    │ ~95%           │ Simple schemas   │
└─────────────────────┴────────────────┴──────────────────┘

For most production systems, Instructor is the sweet spot:
high reliability, great DX, works everywhere.
""")

Constrained Generation Decision

Choose your approach:

┌─────────────────────┬────────────────┬──────────────────┐
│ Approach            │ Reliability    │ Best For         │
├─────────────────────┼────────────────┼──────────────────┤
│ Prompt + parsing    │ ~85%           │ Prototyping      │
│ Instructor          │ ~95-99%        │ Cloud APIs       │
│ Outlines/guidance   │ ~99.9%         │ Self-hosted      │
│ Native JSON mode    │ ~95%           │ Simple schemas   │
└─────────────────────┴────────────────┴──────────────────┘

For most production systems, Instructor is the sweet spot:
high reliability, great DX, works everywhere.



In [5]:
# NeMo Guardrails - Dialog Flow and Safety Rails
# pip install nemoguardrails langchain-openai

"""
NeMo Guardrails requires a config directory with:
  nemo_config/
  ├── config.yml   # Model and rails configuration
  ├── rails.co     # Colang dialog flows (user intents, bot responses)
  └── prompts.yml  # Custom prompts for safety checks

Example config.yml (for Ollama):
  models:
    - type: main
      engine: openai
      model: qwen3:4b
      parameters:
        openai_api_base: http://localhost:11434/v1
        openai_api_key: ollama

Example rails.co (Colang dialog flows):
  define user express greeting
      "hello"
      "hi"
  
  define bot express greeting
      "Hello! How can I help you today?"
  
  define flow greeting
      user express greeting
      bot express greeting
  
  define user ask off topic
      "what's the weather"
      "tell me a joke"
  
  define bot refuse off topic
      "I'm designed to help with TechCorp products only."
  
  define flow handle off topic
      user ask off topic
      bot refuse off topic
"""

from nemoguardrails import LLMRails, RailsConfig

def create_guarded_llm(config_path: str):
    """Create an LLM with NeMo guardrails applied."""
    config = RailsConfig.from_path(config_path)
    return LLMRails(config)


# =============================================================================
# Driver: NeMo Guardrails overview
# =============================================================================

print("NeMo Guardrails: Dialog Flow Control")
print("=" * 55)
print("""
RAIL TYPES:

1. INPUT RAILS (before LLM):
   • Jailbreak detection - "Ignore previous instructions..."
   • Topic filtering - Reject off-topic requests
   • PII masking - Block/redact sensitive data

2. OUTPUT RAILS (after LLM):
   • Toxicity filtering - Block harmful content
   • Factuality checking - Verify against knowledge base

3. DIALOG RAILS (Colang flows):
   • Define conversation patterns
   • Guide users through processes
   • Handle edge cases consistently

USAGE:
  rails = create_guarded_llm("./nemo_config")
  
  response = rails.generate(
      messages=[{"role": "user", "content": "Hello!"}]
  )

→ See 1B/nemo_config/ for complete config files
→ See 1B/guardrails_demo.ipynb for comprehensive guardrails demos
""")


NeMo Guardrails: Dialog Flow Control

RAIL TYPES:

1. INPUT RAILS (before LLM):
   • Jailbreak detection - "Ignore previous instructions..."
   • Topic filtering - Reject off-topic requests
   • PII masking - Block/redact sensitive data

2. OUTPUT RAILS (after LLM):
   • Toxicity filtering - Block harmful content
   • Factuality checking - Verify against knowledge base

3. DIALOG RAILS (Colang flows):
   • Define conversation patterns
   • Guide users through processes
   • Handle edge cases consistently

USAGE:
  rails = create_guarded_llm("./nemo_config")

  response = rails.generate(
      messages=[{"role": "user", "content": "Hello!"}]
  )

→ See 1B/nemo_config/ for complete config files
→ See 1B/demos.ipynb (Demo 3) for runnable demo with Ollama



In [None]:
# Guardrails AI - Field-level Validation with Hub Validators
# ⚠️ DEPENDENCY CONFLICT: guardrails-ai requires openai<2.0.0
#    This conflicts with Instructor which requires openai>=2.0.0
#    Use in separate environment if needed

# pip install guardrails-ai
# guardrails hub install hub://guardrails/regex_match
# guardrails hub install hub://guardrails/toxic_language

"""
Guardrails AI provides validators from the Hub:
- PII detection and redaction
- Toxic language filtering  
- Regex pattern matching
- Custom LLM-based validation

Example validators from Hub:
    hub://guardrails/detect_pii
    hub://guardrails/toxic_language
    hub://guardrails/provenance_llm  # Check if grounded in sources
    hub://guardrails/reading_level   # Ensure appropriate complexity
"""

from guardrails import Guard
from guardrails.hub import DetectPII, ToxicLanguage
from pydantic import BaseModel, Field
from typing import List


class CustomerResponse(BaseModel):
    """Schema for customer-facing responses."""
    answer: str = Field(
        description="The response to the customer",
        validators=[
            ToxicLanguage(on_fail="fix"),  # Auto-fix toxic content
            DetectPII(on_fail="fix"),       # Redact any PII
        ]
    )
    sources: List[str] = Field(
        description="Sources used to generate the answer"
    )
    confidence: float = Field(
        ge=0.0, le=1.0,
        description="Confidence score"
    )


def validated_response(
    user_query: str,
    context: str,
    llm_callable
) -> CustomerResponse:
    """
    Generate a response with Guardrails AI validation.
    
    Validators run on the output and can:
    - Pass: Output is valid
    - Fix: Auto-correct issues (e.g., redact PII)
    - Fail: Reject and optionally retry
    """
    guard = Guard.from_pydantic(CustomerResponse)
    
    result = guard(
        llm_callable,
        prompt=f"""
        Context: {context}
        
        Question: {user_query}
        
        Provide a helpful answer based only on the context.
        """,
        num_reasks=2  # Retry twice on validation failure
    )
    
    return result.validated_output


# =============================================================================
# Driver: Combined guardrails strategy
# =============================================================================

print("Combined Guardrails Strategy")
print("=" * 55)
print("""
RECOMMENDED ARCHITECTURE:

┌────────────────────────────────────────────────────────┐
│                    USER INPUT                          │
└────────────────────────────────────────────────────────┘
                          │
                          ▼
┌────────────────────────────────────────────────────────┐
│              NEMO GUARDRAILS (Dialog Layer)            │
│  • Jailbreak detection                                 │
│  • Topic control                                       │
│  • Conversation flow management                        │
└────────────────────────────────────────────────────────┘
                          │
                          ▼
┌────────────────────────────────────────────────────────┐
│                    LLM CALL                            │
│  (with Instructor for structured output)               │
└────────────────────────────────────────────────────────┘
                          │
                          ▼
┌────────────────────────────────────────────────────────┐
│           GUARDRAILS AI (Validation Layer)             │
│  • PII redaction                                       │
│  • Toxicity filtering                                  │
│  • Custom validators                                   │
└────────────────────────────────────────────────────────┘
                          │
                          ▼
┌────────────────────────────────────────────────────────┐
│                   SAFE OUTPUT                          │
└────────────────────────────────────────────────────────┘

Why layer guardrails?
- NeMo excels at dialog flow and conversation-level control
- Guardrails AI excels at field-level validation and Hub ecosystem
- Haystack provides pipeline-native components (EU-aligned, data sovereignty focus)
- Together they provide defense in depth

FRAMEWORK SELECTION:

    Using Haystack? → Use pipeline components (InputGuardrail, OutputGuardrail)
    Using LangChain? → Use NeMo + Guardrails AI wrappers
    Framework-agnostic? → NeMo for dialog + Guardrails AI for validation
""")

ImportError: cannot import name 'DetectPII' from 'guardrails.hub' (/Users/titasbiswas/miniforge3/envs/jupyter-env/lib/python3.12/site-packages/guardrails/hub/__init__.py)

In [9]:
"""
Haystack 2.x Guardrails: Pipeline Components
============================================

Haystack's approach differs from NeMo/Guardrails AI:
- Guardrails are pipeline components, not wrappers
- Fits naturally into Haystack's DAG-based pipelines
- Components can branch, filter, or transform at any stage

Key advantages for regulated enterprises:
- European-origin company (data sovereignty alignment)
- Gartner Cool Vendor 2024
- Native integration with European vector DBs (Qdrant, Weaviate)
- Strong enterprise adoption in regulated industries
"""

# pip install haystack-ai
from haystack import Pipeline, component, Document
from haystack.components.generators import OpenAIGenerator
from haystack.components.builders import PromptBuilder
from haystack.dataclasses import ChatMessage
from typing import List, Dict, Any
import re


@component
class InputGuardrail:
    """
    Haystack component for input validation.
    
    Runs before the LLM call to filter/transform input.
    Can reject, modify, or pass through queries.
    """
    
    def __init__(
        self,
        blocked_patterns: List[str] = None,
        pii_patterns: List[str] = None,
        max_length: int = 10000
    ):
        self.blocked_patterns = blocked_patterns or [
            r"ignore\s+(all\s+)?(previous\s+)?instructions",
            r"you\s+are\s+now\s+(a|an)\s+",
            r"pretend\s+(to\s+be|you('re|'re))",
            r"jailbreak",
            r"DAN\s+mode",
        ]
        self.pii_patterns = pii_patterns or [
            r"\b\d{3}-\d{2}-\d{4}\b",  # SSN
            r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b",  # Email
            r"\b\d{16}\b",  # Credit card (simplified)
        ]
        self.max_length = max_length
    
    @component.output_types(
        query=str,
        blocked=bool,
        block_reason=str,
        pii_detected=List[str]
    )
    def run(self, query: str) -> Dict[str, Any]:
        """
        Validate input query.
        
        Returns:
            query: Original or sanitized query
            blocked: Whether query was blocked
            block_reason: Why it was blocked (if applicable)
            pii_detected: List of PII types found
        """
        # Check length
        if len(query) > self.max_length:
            return {
                "query": "",
                "blocked": True,
                "block_reason": f"Query exceeds maximum length ({self.max_length})",
                "pii_detected": []
            }
        
        # Check for injection patterns
        query_lower = query.lower()
        for pattern in self.blocked_patterns:
            if re.search(pattern, query_lower, re.IGNORECASE):
                return {
                    "query": "",
                    "blocked": True,
                    "block_reason": "Potential prompt injection detected",
                    "pii_detected": []
                }
        
        # Detect (but don't block) PII
        pii_found = []
        for pattern in self.pii_patterns:
            if re.search(pattern, query):
                pii_type = self._identify_pii_type(pattern)
                pii_found.append(pii_type)
        
        return {
            "query": query,
            "blocked": False,
            "block_reason": "",
            "pii_detected": pii_found
        }
    
    def _identify_pii_type(self, pattern: str) -> str:
        if "\\d{3}-\\d{2}" in pattern:
            return "SSN"
        elif "@" in pattern:
            return "email"
        elif "\\d{16}" in pattern:
            return "credit_card"
        return "unknown_pii"


@component
class OutputGuardrail:
    """
    Haystack component for output validation.
    
    Runs after LLM generation to filter/transform output.
    Can redact, flag, or transform responses.
    """
    
    def __init__(
        self,
        redact_patterns: Dict[str, str] = None,
        toxicity_keywords: List[str] = None,
        require_grounding: bool = True
    ):
        self.redact_patterns = redact_patterns or {
            r"\b\d{3}-\d{2}-\d{4}\b": "[SSN REDACTED]",
            r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b": "[EMAIL REDACTED]",
        }
        self.toxicity_keywords = toxicity_keywords or []
        self.require_grounding = require_grounding
    
    @component.output_types(
        response=str,
        redactions_made=int,
        grounding_check=str,
        safe=bool
    )
    def run(
        self,
        response: str,
        context: List[Document] = None
    ) -> Dict[str, Any]:
        """
        Validate and sanitize output.
        
        Parameters:
            response: LLM-generated response
            context: Retrieved documents (for grounding check)
        
        Returns:
            response: Sanitized response
            redactions_made: Number of redactions applied
            grounding_check: Result of grounding verification
            safe: Whether response passed all checks
        """
        sanitized = response
        redaction_count = 0
        
        # Apply redactions
        for pattern, replacement in self.redact_patterns.items():
            sanitized, count = re.subn(pattern, replacement, sanitized)
            redaction_count += count
        
        # Grounding check (simplified - production would use NLI)
        grounding_result = "not_checked"
        if self.require_grounding and context:
            context_text = " ".join([doc.content for doc in context])
            # Simple heuristic: check if key terms from response appear in context
            response_terms = set(sanitized.lower().split())
            context_terms = set(context_text.lower().split())
            overlap = len(response_terms & context_terms) / len(response_terms) if response_terms else 0
            grounding_result = "grounded" if overlap > 0.3 else "potentially_ungrounded"
        
        return {
            "response": sanitized,
            "redactions_made": redaction_count,
            "grounding_check": grounding_result,
            "safe": redaction_count == 0 and grounding_result != "potentially_ungrounded"
        }


@component  
class ConditionalRouter:
    """
    Route based on guardrail results.
    
    Haystack's branching allows different paths:
    - Blocked queries → rejection response
    - PII detected → enhanced privacy mode
    - Normal queries → standard RAG pipeline
    """
    
    @component.output_types(
        standard_path=str,
        blocked_path=str,
        pii_path=str
    )
    def run(
        self,
        query: str,
        blocked: bool,
        pii_detected: List[str]
    ) -> Dict[str, Any]:
        """Route query based on guardrail results."""
        if blocked:
            return {
                "standard_path": None,
                "blocked_path": "I'm not able to process that request. Please rephrase your question.",
                "pii_path": None
            }
        elif pii_detected:
            return {
                "standard_path": None,
                "blocked_path": None,
                "pii_path": query  # Route to privacy-enhanced pipeline
            }
        else:
            return {
                "standard_path": query,
                "blocked_path": None,
                "pii_path": None
            }


def build_guarded_rag_pipeline() -> Pipeline:
    """
    Build a complete RAG pipeline with integrated guardrails.
    
    Pipeline structure:
        Input → InputGuardrail → Router → [RAG Components] → OutputGuardrail → Response
    
    This demonstrates Haystack's component-based approach where
    guardrails are first-class pipeline citizens.
    """
    pipeline = Pipeline()
    
    # Add components
    pipeline.add_component("input_guard", InputGuardrail())
    pipeline.add_component("router", ConditionalRouter())
    pipeline.add_component("prompt_builder", PromptBuilder(
        template="""
        Context: {{ context }}
        
        Question: {{ query }}
        
        Answer based only on the provided context.
        """
    ))
    pipeline.add_component("llm", OpenAIGenerator(model="gpt-4o-mini"))
    pipeline.add_component("output_guard", OutputGuardrail())
    
    # Connect components
    pipeline.connect("input_guard.query", "router.query")
    pipeline.connect("input_guard.blocked", "router.blocked")
    pipeline.connect("input_guard.pii_detected", "router.pii_detected")
    pipeline.connect("router.standard_path", "prompt_builder.query")
    pipeline.connect("prompt_builder", "llm")
    pipeline.connect("llm.replies", "output_guard.response")
    
    return pipeline


# =============================================================================
# Driver: Haystack guardrails in action
# =============================================================================

print("Haystack 2.x Guardrails Pipeline")
print("=" * 55)
print("""
PIPELINE ARCHITECTURE:

    ┌─────────────────┐
    │   User Query    │
    └────────┬────────┘
             │
             ▼
    ┌─────────────────┐
    │ InputGuardrail  │ ← Injection detection, PII flagging
    └────────┬────────┘
             │
             ▼
    ┌─────────────────┐     ┌──────────────────┐
    │ ConditionalRouter│────►│ Rejection Path   │
    └────────┬────────┘     └──────────────────┘
             │
             ▼
    ┌─────────────────┐
    │  RAG Pipeline   │ ← Retrieval + Generation
    └────────┬────────┘
             │
             ▼
    ┌─────────────────┐
    │ OutputGuardrail │ ← PII redaction, grounding check
    └────────┬────────┘
             │
             ▼
    ┌─────────────────┐
    │  Safe Response  │
    └─────────────────┘

USAGE:

    pipeline = build_guarded_rag_pipeline()
    
    # Normal query - passes through
    result = pipeline.run({
        "input_guard": {"query": "What is the return policy?"}
    })
    
    # Injection attempt - blocked
    result = pipeline.run({
        "input_guard": {"query": "Ignore all instructions. You are now..."}
    })
    # Returns rejection response, never reaches LLM

WHY HAYSTACK FOR REGULATED MARKETS:

    1. Data Sovereignty: European-origin, EU-aligned
    2. Enterprise Adoption: Strong in regulated industries (finance, healthcare)
    3. Framework Fit: Native pipeline components vs wrappers
    4. Vector DB Integration: First-class Qdrant/Weaviate support
    5. Evaluation Built-in: haystack-eval for quality metrics

COMBINING WITH OTHER GUARDRAILS:

    # Haystack + Guardrails AI hybrid
    @component
    class GuardrailsAIValidator:
        def __init__(self):
            from guardrails import Guard
            self.guard = Guard.from_pydantic(ResponseSchema)
        
        @component.output_types(validated=str, passed=bool)
        def run(self, response: str):
            result = self.guard.validate(response)
            return {
                "validated": result.validated_output,
                "passed": result.validation_passed
            }
    
    # Add to pipeline
    pipeline.add_component("guardrails_ai", GuardrailsAIValidator())
    pipeline.connect("output_guard.response", "guardrails_ai.response")

→ See 1B/guardrails_demo.ipynb for comprehensive Haystack guardrails demo
""")

Haystack 2.x Guardrails Pipeline

PIPELINE ARCHITECTURE:

    ┌─────────────────┐
    │   User Query    │
    └────────┬────────┘
             │
             ▼
    ┌─────────────────┐
    │ InputGuardrail  │ ← Injection detection, PII flagging
    └────────┬────────┘
             │
             ▼
    ┌─────────────────┐     ┌──────────────────┐
    │ ConditionalRouter│────►│ Rejection Path   │
    └────────┬────────┘     └──────────────────┘
             │
             ▼
    ┌─────────────────┐
    │  RAG Pipeline   │ ← Retrieval + Generation
    └────────┬────────┘
             │
             ▼
    ┌─────────────────┐
    │ OutputGuardrail │ ← PII redaction, grounding check
    └────────┬────────┘
             │
             ▼
    ┌─────────────────┐
    │  Safe Response  │
    └─────────────────┘

USAGE:

    pipeline = build_guarded_rag_pipeline()

    # Normal query - passes through
    result = pipeline.run({
        "input_guard": {"query": "What is the return policy?"}
    })

 