# Structured Output with Pydantic Models

This notebook demonstrates how to get structured, validated data from agent responses using Pydantic models instead of free-form text.

## Key Concepts
- **Pydantic Models**: Define expected data structure
- **Type Validation**: Automatic validation and error checking
- **Structured Responses**: Guaranteed data format for downstream processing
- **Database Integration**: Easy integration with databases and APIs

## Benefits
- **Guaranteed Format**: Consistent output regardless of model variations
- **Type Safety**: Validation prevents data errors
- **Easy Processing**: Direct use in applications without parsing
- **Documentation**: Self-documenting data structures

## Prerequisites

Make sure you have the required packages installed:

```bash
pip install langchain langchain-community langchain-core langgraph pydantic
ollama pull qwen3
ollama serve
```

In [None]:
# Import required modules
from pydantic import BaseModel, Field, validator
from typing import List, Optional, Union
from datetime import datetime
from langchain_ollama import ChatOllama
from langchain.agents import create_agent
import tools

## Basic Structured Output Example

Let's start with a simple contact information extraction example:

In [None]:
print("=== Basic Structured Output Example ===")

# Define the structure we want the agent to return
class ContactInfo(BaseModel):
    """Contact information structure."""
    name: str = Field(description="Full name of the person")
    email: str = Field(description="Email address")
    phone: str = Field(description="Phone number")
    company: str = Field(default="Unknown", description="Company name (optional)")
    
    @validator('email')
    def validate_email(cls, v):
        """Basic email validation."""
        if '@' not in v:
            raise ValueError('Invalid email format')
        return v

# Display the model structure
print("âœ“ ContactInfo model defined with fields:")
for field_name, field_info in ContactInfo.__fields__.items():
    required = "(required)" if field_info.is_required() else "(optional)"
    print(f"  - {field_name}: {field_info.type_} {required}")
    if field_info.field_info.description:
        print(f"    Description: {field_info.field_info.description}")

## Creating Agent with Structured Output

Note: In this demo, we'll show the concept. Full structured output integration depends on your LangChain version and model support.

In [None]:
# Create model and agent
model = ChatOllama(model="qwen3")

# Create agent with structured output requirement (conceptual)
# Note: Actual implementation may vary based on LangChain version
agent = create_agent(
    model,
    tools=[tools.extract_contact],
    # response_format=ContactInfo  # This would force structured output in some implementations
)

print("âœ“ Agent created for contact information extraction")
print("  The agent will extract contact information and format it consistently")

## Testing Contact Information Extraction

In [None]:
# Test with contact information
contact_text = "Extract contact info from: John Doe, john@example.com, (555) 123-4567, works at TechCorp"

print(f"Input text: {contact_text}")
print("\n=== Agent Processing ===")

try:
    result = agent.invoke({
        "messages": contact_text
    })
    
    print(f"âœ“ Agent response: {result['messages'][-1].content}")
    
    # In a real implementation, you would extract structured data here
    print("\n=== Simulated Structured Output ===")
    
    # Simulate extracting structured data
    structured_data = ContactInfo(
        name="John Doe",
        email="john@example.com",
        phone="(555) 123-4567",
        company="TechCorp"
    )
    
    print(f"Structured data: {structured_data}")
    print(f"\nAs JSON: {structured_data.json(indent=2)}")
    print(f"\nAs dict: {structured_data.dict()}")
    
except Exception as e:
    print(f"Error: {e}")

print("\nðŸ’¡ In production, you'd get a ContactInfo object with guaranteed fields")
print("   This enables direct database insertion, API calls, etc.")

## Advanced Structured Models

Let's create more complex models for different use cases:

In [None]:
print("=== Advanced Structured Models ===")

# Research Paper Analysis Model
class ResearchPaper(BaseModel):
    """Structure for research paper analysis."""
    title: str = Field(description="Paper title")
    authors: List[str] = Field(description="List of author names")
    abstract: str = Field(description="Paper abstract")
    keywords: List[str] = Field(description="Key topics and terms")
    methodology: Optional[str] = Field(None, description="Research methodology used")
    findings: List[str] = Field(description="Key findings and results")
    publication_year: Optional[int] = Field(None, description="Year of publication")
    
    @validator('publication_year')
    def validate_year(cls, v):
        if v is not None and (v < 1900 or v > datetime.now().year):
            raise ValueError('Invalid publication year')
        return v

# Business Analysis Model
class BusinessAnalysis(BaseModel):
    """Structure for business analysis results."""
    company_name: str = Field(description="Company name")
    industry: str = Field(description="Industry sector")
    strengths: List[str] = Field(description="Company strengths")
    weaknesses: List[str] = Field(description="Areas for improvement")
    opportunities: List[str] = Field(description="Market opportunities")
    threats: List[str] = Field(description="Potential threats")
    financial_metrics: Optional[dict] = Field(None, description="Key financial indicators")
    recommendation: str = Field(description="Overall recommendation")
    confidence_score: float = Field(ge=0.0, le=1.0, description="Analysis confidence (0-1)")

# Technical Documentation Model
class TechnicalDoc(BaseModel):
    """Structure for technical documentation."""
    title: str = Field(description="Documentation title")
    overview: str = Field(description="High-level overview")
    requirements: List[str] = Field(description="System requirements")
    installation_steps: List[str] = Field(description="Step-by-step installation")
    api_endpoints: Optional[List[dict]] = Field(None, description="API endpoint details")
    examples: List[dict] = Field(description="Code examples")
    troubleshooting: List[dict] = Field(description="Common issues and solutions")
    last_updated: datetime = Field(default_factory=datetime.now, description="Last update timestamp")

print("âœ“ Advanced models defined:")
print("  - ResearchPaper: For academic paper analysis")
print("  - BusinessAnalysis: For SWOT analysis and business insights")
print("  - TechnicalDoc: For software documentation generation")

## Demonstrating Model Usage

Let's show how these models work with sample data:

In [None]:
print("=== Model Usage Examples ===")

# Example 1: Research Paper
print("\n--- Research Paper Example ---")
try:
    paper = ResearchPaper(
        title="Deep Learning for Natural Language Processing",
        authors=["Alice Smith", "Bob Johnson", "Carol Wilson"],
        abstract="This paper explores the application of deep learning techniques...",
        keywords=["deep learning", "NLP", "neural networks", "transformers"],
        methodology="Experimental study using transformer architectures",
        findings=[
            "Transformer models outperform RNNs on most NLP tasks",
            "Attention mechanisms are crucial for long-range dependencies",
            "Pre-training on large corpora improves downstream performance"
        ],
        publication_year=2023
    )
    
    print(f"âœ“ Research paper created: {paper.title}")
    print(f"  Authors: {', '.join(paper.authors)}")
    print(f"  Keywords: {', '.join(paper.keywords)}")
    print(f"  Findings: {len(paper.findings)} key results")
    
except Exception as e:
    print(f"âœ— Error creating research paper: {e}")

# Example 2: Business Analysis
print("\n--- Business Analysis Example ---")
try:
    analysis = BusinessAnalysis(
        company_name="TechStart Inc.",
        industry="Software Technology",
        strengths=[
            "Strong technical team",
            "Innovative product",
            "Growing market demand"
        ],
        weaknesses=[
            "Limited marketing budget",
            "Small customer base"
        ],
        opportunities=[
            "Expanding to international markets",
            "Partnership with larger companies"
        ],
        threats=[
            "Competition from established players",
            "Economic downturn"
        ],
        financial_metrics={
            "revenue_growth": "150% YoY",
            "burn_rate": "$50k/month",
            "runway": "18 months"
        },
        recommendation="Focus on customer acquisition and seek Series A funding",
        confidence_score=0.85
    )
    
    print(f"âœ“ Business analysis created for: {analysis.company_name}")
    print(f"  Industry: {analysis.industry}")
    print(f"  Confidence: {analysis.confidence_score:.1%}")
    print(f"  Recommendation: {analysis.recommendation}")
    
except Exception as e:
    print(f"âœ— Error creating business analysis: {e}")

# Example 3: Technical Documentation
print("\n--- Technical Documentation Example ---")
try:
    tech_doc = TechnicalDoc(
        title="API Gateway Setup Guide",
        overview="This guide covers the setup and configuration of our API gateway service",
        requirements=[
            "Docker 20.10+",
            "Node.js 16+",
            "PostgreSQL 13+"
        ],
        installation_steps=[
            "Clone the repository",
            "Install dependencies with npm install",
            "Configure environment variables",
            "Run docker-compose up"
        ],
        api_endpoints=[
            {"method": "GET", "path": "/api/health", "description": "Health check endpoint"},
            {"method": "POST", "path": "/api/auth", "description": "Authentication endpoint"}
        ],
        examples=[
            {
                "language": "curl",
                "code": "curl -X GET http://localhost:3000/api/health",
                "description": "Check API health"
            }
        ],
        troubleshooting=[
            {
                "issue": "Port already in use",
                "solution": "Change port in docker-compose.yml or stop conflicting service"
            }
        ]
    )
    
    print(f"âœ“ Technical documentation created: {tech_doc.title}")
    print(f"  Requirements: {len(tech_doc.requirements)} items")
    print(f"  Installation steps: {len(tech_doc.installation_steps)} steps")
    print(f"  API endpoints: {len(tech_doc.api_endpoints or [])} endpoints")
    
except Exception as e:
    print(f"âœ— Error creating technical documentation: {e}")

## Data Validation and Error Handling

Let's demonstrate Pydantic's validation capabilities:

In [None]:
print("=== Data Validation Examples ===")

# Test validation with invalid data
print("\n--- Testing Email Validation ---")
try:
    # This should fail due to invalid email
    invalid_contact = ContactInfo(
        name="Jane Doe",
        email="invalid-email",  # No @ symbol
        phone="555-1234"
    )
    print("âœ— Validation should have failed!")
except Exception as e:
    print(f"âœ“ Validation correctly failed: {e}")

# Test with valid email
try:
    valid_contact = ContactInfo(
        name="Jane Doe",
        email="jane@example.com",
        phone="555-1234"
    )
    print(f"âœ“ Valid contact created: {valid_contact.name}")
except Exception as e:
    print(f"âœ— Unexpected error: {e}")

# Test confidence score validation
print("\n--- Testing Confidence Score Validation ---")
try:
    # This should fail due to confidence score > 1.0
    invalid_analysis = BusinessAnalysis(
        company_name="Test Corp",
        industry="Testing",
        strengths=["Good tests"],
        weaknesses=["Needs improvement"],
        opportunities=["Market expansion"],
        threats=["Competition"],
        recommendation="Test more",
        confidence_score=1.5  # Invalid: > 1.0
    )
    print("âœ— Validation should have failed!")
except Exception as e:
    print(f"âœ“ Validation correctly failed: {e}")

# Test publication year validation
print("\n--- Testing Publication Year Validation ---")
try:
    # This should fail due to invalid year
    invalid_paper = ResearchPaper(
        title="Time Travel Research",
        authors=["Future Scientist"],
        abstract="Research from the future",
        keywords=["time", "travel"],
        findings=["Time travel is possible"],
        publication_year=2050  # Invalid: future year
    )
    print("âœ— Validation should have failed!")
except Exception as e:
    print(f"âœ“ Validation correctly failed: {e}")

## Practical Integration Patterns

Let's show how to integrate structured output with real applications:

In [None]:
print("=== Practical Integration Patterns ===")

# Pattern 1: Database Integration
class DatabaseIntegration:
    """Simulate database operations with structured data."""
    
    def __init__(self):
        self.contacts = []  # Simulate database table
        self.papers = []
        self.analyses = []
    
    def save_contact(self, contact: ContactInfo) -> str:
        """Save contact to database."""
        contact_dict = contact.dict()
        contact_dict['id'] = len(self.contacts) + 1
        self.contacts.append(contact_dict)
        return f"Contact saved with ID: {contact_dict['id']}"
    
    def save_paper(self, paper: ResearchPaper) -> str:
        """Save research paper to database."""
        paper_dict = paper.dict()
        paper_dict['id'] = len(self.papers) + 1
        self.papers.append(paper_dict)
        return f"Paper saved with ID: {paper_dict['id']}"
    
    def get_statistics(self) -> dict:
        """Get database statistics."""
        return {
            "contacts": len(self.contacts),
            "papers": len(self.papers),
            "analyses": len(self.analyses)
        }

# Pattern 2: API Response Format
class APIResponse(BaseModel):
    """Standard API response format."""
    success: bool = Field(description="Operation success status")
    message: str = Field(description="Response message")
    data: Optional[Union[ContactInfo, ResearchPaper, BusinessAnalysis]] = Field(None, description="Response data")
    errors: Optional[List[str]] = Field(None, description="Error messages")
    timestamp: datetime = Field(default_factory=datetime.now, description="Response timestamp")

# Pattern 3: Batch Processing
class BatchProcessor:
    """Process multiple structured objects."""
    
    def process_contacts(self, contacts: List[ContactInfo]) -> dict:
        """Process a batch of contacts."""
        processed = 0
        errors = []
        
        for i, contact in enumerate(contacts):
            try:
                # Simulate processing
                if contact.email and '@' in contact.email:
                    processed += 1
                else:
                    errors.append(f"Contact {i+1}: Invalid email")
            except Exception as e:
                errors.append(f"Contact {i+1}: {str(e)}")
        
        return {
            "total": len(contacts),
            "processed": processed,
            "errors": errors
        }

print("âœ“ Integration patterns defined:")
print("  - DatabaseIntegration: Save structured data to database")
print("  - APIResponse: Standard API response format")
print("  - BatchProcessor: Handle multiple structured objects")

## Testing Integration Patterns

In [None]:
print("=== Testing Integration Patterns ===")

# Test database integration
print("\n--- Database Integration Test ---")
db = DatabaseIntegration()

# Create and save some contacts
contacts = [
    ContactInfo(name="Alice Smith", email="alice@example.com", phone="555-0001", company="TechCorp"),
    ContactInfo(name="Bob Jones", email="bob@example.com", phone="555-0002"),
    ContactInfo(name="Carol Wilson", email="carol@research.edu", phone="555-0003", company="University")
]

for contact in contacts:
    result = db.save_contact(contact)
    print(f"  {result}")

print(f"\nDatabase statistics: {db.get_statistics()}")

# Test API response format
print("\n--- API Response Format Test ---")
try:
    # Success response
    success_response = APIResponse(
        success=True,
        message="Contact retrieved successfully",
        data=contacts[0]
    )
    print(f"âœ“ Success response created")
    print(f"  Message: {success_response.message}")
    print(f"  Data type: {type(success_response.data).__name__}")
    
    # Error response
    error_response = APIResponse(
        success=False,
        message="Validation failed",
        errors=["Invalid email format", "Missing required field"]
    )
    print(f"\nâœ“ Error response created")
    print(f"  Errors: {len(error_response.errors)} items")
    
except Exception as e:
    print(f"âœ— Error creating API response: {e}")

# Test batch processing
print("\n--- Batch Processing Test ---")
processor = BatchProcessor()

# Mix of valid and invalid contacts
batch_contacts = [
    ContactInfo(name="Valid User", email="valid@example.com", phone="555-1111"),
    ContactInfo(name="Another Valid", email="another@example.com", phone="555-2222"),
]

batch_result = processor.process_contacts(batch_contacts)
print(f"âœ“ Batch processing completed:")
print(f"  Total: {batch_result['total']}")
print(f"  Processed: {batch_result['processed']}")
print(f"  Errors: {len(batch_result['errors'])}")

## Advanced Pydantic Features

Let's explore some advanced Pydantic features useful for agent outputs:

In [None]:
print("=== Advanced Pydantic Features ===")

from enum import Enum
from typing import Dict, Any
from pydantic import root_validator, constr, conint, conlist

# Enums for controlled values
class Priority(str, Enum):
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"

class Status(str, Enum):
    PENDING = "pending"
    IN_PROGRESS = "in_progress"
    COMPLETED = "completed"
    CANCELLED = "cancelled"

# Advanced model with constraints and validation
class TaskAnalysis(BaseModel):
    """Advanced task analysis with constraints."""
    
    # Constrained strings
    title: constr(min_length=5, max_length=100) = Field(description="Task title")
    description: constr(min_length=10) = Field(description="Detailed description")
    
    # Enums for controlled values
    priority: Priority = Field(description="Task priority level")
    status: Status = Field(default=Status.PENDING, description="Current status")
    
    # Constrained numbers
    estimated_hours: conint(ge=1, le=1000) = Field(description="Estimated hours (1-1000)")
    completion_percentage: conint(ge=0, le=100) = Field(default=0, description="Completion (0-100%)")
    
    # Constrained lists
    tags: conlist(str, min_items=1, max_items=10) = Field(description="1-10 tags")
    dependencies: Optional[List[str]] = Field(default=[], description="Task dependencies")
    
    # Complex nested data
    metadata: Optional[Dict[str, Any]] = Field(default={}, description="Additional metadata")
    
    # Custom validation
    @root_validator
    def validate_status_completion(cls, values):
        """Ensure completion percentage matches status."""
        status = values.get('status')
        completion = values.get('completion_percentage', 0)
        
        if status == Status.COMPLETED and completion < 100:
            raise ValueError('Completed tasks must have 100% completion')
        
        if status == Status.PENDING and completion > 0:
            raise ValueError('Pending tasks should have 0% completion')
        
        return values
    
    @validator('tags')
    def validate_tags(cls, v):
        """Ensure tags are lowercase and unique."""
        cleaned_tags = [tag.lower().strip() for tag in v]
        if len(cleaned_tags) != len(set(cleaned_tags)):
            raise ValueError('Tags must be unique')
        return cleaned_tags

print("âœ“ Advanced TaskAnalysis model defined with:")
print("  - Constrained string lengths")
print("  - Enum-based controlled values")
print("  - Numeric constraints")
print("  - List size constraints")
print("  - Custom cross-field validation")
print("  - Tag normalization and uniqueness")

## Testing Advanced Features

In [None]:
print("=== Testing Advanced Features ===")

# Test valid task
print("\n--- Valid Task Creation ---")
try:
    valid_task = TaskAnalysis(
        title="Implement user authentication",
        description="Create a secure authentication system with JWT tokens and password hashing",
        priority=Priority.HIGH,
        status=Status.IN_PROGRESS,
        estimated_hours=25,
        completion_percentage=60,
        tags=["authentication", "security", "backend", "jwt"],
        dependencies=["database-setup", "user-model"],
        metadata={
            "assigned_to": "senior-dev",
            "sprint": "2024-Q1",
            "complexity": "medium"
        }
    )
    
    print(f"âœ“ Valid task created: {valid_task.title}")
    print(f"  Priority: {valid_task.priority.value}")
    print(f"  Status: {valid_task.status.value}")
    print(f"  Progress: {valid_task.completion_percentage}%")
    print(f"  Tags: {', '.join(valid_task.tags)}")
    
except Exception as e:
    print(f"âœ— Unexpected error: {e}")

# Test validation errors
print("\n--- Testing Validation Errors ---")

# Test title too short
try:
    TaskAnalysis(
        title="Hi",  # Too short
        description="This should fail due to short title",
        priority=Priority.LOW,
        estimated_hours=5,
        tags=["test"]
    )
    print("âœ— Should have failed: title too short")
except Exception as e:
    print(f"âœ“ Correctly failed: {e}")

# Test status/completion mismatch
try:
    TaskAnalysis(
        title="Test completed task",
        description="This should fail due to completion mismatch",
        priority=Priority.LOW,
        status=Status.COMPLETED,
        completion_percentage=50,  # Should be 100 for completed
        estimated_hours=5,
        tags=["test"]
    )
    print("âœ— Should have failed: completion percentage mismatch")
except Exception as e:
    print(f"âœ“ Correctly failed: {e}")

# Test duplicate tags
try:
    TaskAnalysis(
        title="Test duplicate tags",
        description="This should fail due to duplicate tags",
        priority=Priority.LOW,
        estimated_hours=5,
        tags=["test", "TEST", "test"]  # Duplicates after normalization
    )
    print("âœ— Should have failed: duplicate tags")
except Exception as e:
    print(f"âœ“ Correctly failed: {e}")

# Test tag normalization (should work)
try:
    normalized_task = TaskAnalysis(
        title="Test tag normalization",
        description="This should work with tag normalization",
        priority=Priority.LOW,
        estimated_hours=5,
        tags=["  Frontend  ", "REACT", "Javascript"]  # Will be normalized
    )
    print(f"âœ“ Tag normalization works: {normalized_task.tags}")
except Exception as e:
    print(f"âœ— Unexpected error: {e}")

## Export and Serialization

Pydantic models can be easily exported to various formats:

In [None]:
print("=== Export and Serialization ===")

# Create a sample task for export
sample_task = TaskAnalysis(
    title="Export demonstration task",
    description="This task demonstrates various export formats available in Pydantic",
    priority=Priority.MEDIUM,
    status=Status.IN_PROGRESS,
    estimated_hours=8,
    completion_percentage=25,
    tags=["demo", "export", "pydantic"],
    dependencies=["setup-environment"],
    metadata={
        "created_by": "demo_system",
        "project": "structured_output_demo"
    }
)

print("\n--- JSON Export ---")
json_output = sample_task.json(indent=2)
print(json_output)

print("\n--- Dictionary Export ---")
dict_output = sample_task.dict()
print(f"Dictionary with {len(dict_output)} fields")
for key, value in dict_output.items():
    print(f"  {key}: {type(value).__name__} = {value}")

print("\n--- Selective Export ---")
# Export only specific fields
summary_dict = sample_task.dict(include={'title', 'priority', 'status', 'completion_percentage'})
print(f"Summary: {summary_dict}")

# Exclude sensitive data
public_dict = sample_task.dict(exclude={'metadata'})
print(f"\nPublic data (excluding metadata): {len(public_dict)} fields")

print("\n--- Schema Export ---")
# Get the JSON schema
schema = TaskAnalysis.schema()
print(f"Schema has {len(schema['properties'])} properties")
print("Required fields:", schema.get('required', []))

# Show just the properties
print("\nField definitions:")
for field_name, field_schema in schema['properties'].items():
    field_type = field_schema.get('type', 'object')
    description = field_schema.get('description', 'No description')
    print(f"  {field_name}: {field_type} - {description}")

## Best Practices Summary

### Model Design Principles

1. **Clear Field Names**: Use descriptive, unambiguous field names
2. **Comprehensive Descriptions**: Add Field descriptions for documentation
3. **Appropriate Defaults**: Provide sensible default values
4. **Type Constraints**: Use constrained types (constr, conint, etc.)
5. **Validation Logic**: Add custom validators for business rules

### Validation Best Practices

1. **Input Sanitization**: Clean and normalize input data
2. **Cross-Field Validation**: Use root_validator for complex rules
3. **Meaningful Errors**: Provide clear error messages
4. **Performance**: Keep validation logic efficient
5. **Security**: Validate against injection attacks

### Integration Patterns

1. **Database Models**: Map Pydantic models to database schemas
2. **API Responses**: Use structured responses for consistency
3. **Batch Processing**: Handle multiple objects efficiently
4. **Error Handling**: Graceful degradation with validation errors
5. **Documentation**: Auto-generate API docs from schemas

### Production Considerations

1. **Performance**: Profile validation overhead
2. **Memory Usage**: Be mindful of large object graphs
3. **Backward Compatibility**: Plan for schema evolution
4. **Testing**: Comprehensive validation testing
5. **Monitoring**: Track validation errors in production

## Use Cases for Structured Output

### Data Extraction
- **Contact Information**: Extract from emails, documents
- **Financial Data**: Parse financial statements, reports
- **Research Papers**: Analyze and structure academic content
- **Legal Documents**: Extract key terms, clauses

### Content Generation
- **Documentation**: Generate structured technical docs
- **Reports**: Create consistent business reports
- **Analysis**: Produce standardized analysis results
- **Summaries**: Generate structured content summaries

### System Integration
- **Database Population**: Direct insertion of structured data
- **API Responses**: Consistent API output formats
- **Message Queues**: Structured message passing
- **Configuration**: Generate system configuration files

## Conclusion

Structured output with Pydantic provides:
- **Data Quality**: Validation ensures clean, consistent data
- **Type Safety**: Catch errors early in development
- **Integration**: Easy connection to databases and APIs
- **Documentation**: Self-documenting data structures
- **Maintainability**: Clear contracts between system components

This approach transforms agent responses from unpredictable text into reliable, structured data that can power robust applications.