# Pydantic AI Tutorial: Building Type-Safe Agents

**Recommended for: Insurance teams new to agent frameworks**

This tutorial will teach you how to build AI agents using Pydantic AI, a framework that prioritizes type safety and structured outputs. By the end, you'll understand:

1. What Pydantic AI is and why it suits insurance workloads
2. How to define structured outputs with Pydantic models
3. How to create tools that agents can call
4. How to chain multiple agents together
5. How to integrate with DSPy for prompt optimization
6. How to track experiments with MLflow

---

## Why Pydantic AI for Insurance?

Insurance data is structured: policy IDs, claim amounts, dates, status codes. When an LLM processes this data, you need guarantees that:

- Policy IDs match your format (e.g., `POL-123456`)
- Amounts are valid numbers
- Dates are parseable
- Status values are from your allowed list

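Pydantic AI validates the model's response against these constraints automatically. If the LLM returns invalid data, the framework can ask the model to retry, and if the retries are exhausted you get an error instead of silent corruption.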

## 1. Installation & Setup

In [None]:
# Install required packages
# Uncomment and run if not already installed

# !pip install pydantic-ai httpx python-dotenv beautifulsoup4

In [None]:
# Import required libraries
import os
import json
from dataclasses import dataclass
from datetime import date
from enum import Enum
from typing import Optional

import httpx
from pydantic import BaseModel, Field
from dotenv import load_dotenv

# Load environment variables from .env file
load_dotenv()

print("Libraries imported successfully!")

In [None]:
# Verify API key is set
api_key = os.getenv("OPENAI_API_KEY")
if api_key:
    print("API key found.")  # Avoid echoing any part of the key into notebook output
else:
    print("WARNING: No API key found. Create a .env file with OPENAI_API_KEY=your-key")

## 2. Core Concepts: Structured Outputs with Pydantic

Before we build agents, let's understand Pydantic models. These define the **shape** of data the LLM must return.

In [None]:
# Example 1: Simple Pydantic model for weather data

class WeatherObservation(BaseModel):
    """Weather observation from BOM."""
    thunderstorms: str = Field(description="'Observed' or 'No reports or observations'")
    strong_wind: str = Field(description="'Observed' or 'No reports or observations'")

# This model enforces that any weather data MUST have these two fields
# Let's test it:

# Valid data - works fine
valid_weather = WeatherObservation(
    thunderstorms="Observed",
    strong_wind="No reports or observations"
)
print(f"Valid weather: {valid_weather}")

# Invalid data - missing field - raises ValidationError
from pydantic import ValidationError

try:
    invalid_weather = WeatherObservation(thunderstorms="Observed")  # Missing strong_wind!
except ValidationError as e:
    print(f"\nValidation error (expected): {type(e).__name__}")

In [None]:
# Example 2: Insurance-specific models with validation

class CATStatus(str, Enum):
    """Catastrophic event status."""
    CONFIRMED = "CONFIRMED"
    POSSIBLE = "POSSIBLE"
    NOT_CAT = "NOT_CAT"

class Decision(str, Enum):
    """Eligibility decision."""
    APPROVED = "APPROVED"
    REVIEW = "REVIEW"
    DENIED = "DENIED"

class ClaimEligibilityResult(BaseModel):
    """Structured output for CAT eligibility decisions."""
    
    # Pattern validation: must match POL-XXXXXX format
    policy_id: str = Field(pattern=r"^POL-\d{6}$", description="Policy ID in POL-XXXXXX format")
    
    # Enum validation: must be one of the allowed values
    cat_status: CATStatus = Field(description="Catastrophic event status")
    decision: Decision = Field(description="Eligibility decision")
    
    # Range validation: confidence must be 0.0 to 1.0
    confidence: float = Field(ge=0.0, le=1.0, description="Confidence score")
    
    reasoning: str = Field(description="Explanation of the decision")

# Test valid data
valid_result = ClaimEligibilityResult(
    policy_id="POL-123456",
    cat_status=CATStatus.CONFIRMED,
    decision=Decision.APPROVED,
    confidence=0.95,
    reasoning="Both thunderstorms and strong wind were observed."
)
print(f"Valid result: {valid_result.model_dump_json(indent=2)}")

In [None]:
# Test invalid data - see how Pydantic catches errors

test_cases = [
    {"name": "Invalid policy ID", "data": {"policy_id": "12345", "cat_status": "CONFIRMED", "decision": "APPROVED", "confidence": 0.9, "reasoning": "test"}},
    {"name": "Invalid confidence (>1)", "data": {"policy_id": "POL-123456", "cat_status": "CONFIRMED", "decision": "APPROVED", "confidence": 1.5, "reasoning": "test"}},
    {"name": "Invalid decision value", "data": {"policy_id": "POL-123456", "cat_status": "CONFIRMED", "decision": "MAYBE", "confidence": 0.9, "reasoning": "test"}},
]

for test in test_cases:
    try:
        ClaimEligibilityResult(**test["data"])
        print(f"{test['name']}: PASSED (unexpected)")
    except Exception as e:
        print(f"{test['name']}: Caught error (expected) - {str(e)[:80]}...")
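
The truncated `str(e)` output above does not make it obvious which field failed. As a minimal sketch, assuming Pydantic v2, `ValidationError.errors()` returns one entry per failing field, which is easier to log or audit. The `ClaimRecord` model, its `loss_date` field, and the `failing_fields` helper are hypothetical and exist only for this illustration; the `date` type also shows how the "dates are parseable" guarantee can be declared.

```python
from datetime import date

from pydantic import BaseModel, Field, ValidationError


class ClaimRecord(BaseModel):
    """Hypothetical record used only to illustrate error inspection."""
    policy_id: str = Field(pattern=r"^POL-\d{6}$")
    loss_date: date  # ISO strings such as "2025-03-07" are parsed into date objects
    confidence: float = Field(ge=0.0, le=1.0)


def failing_fields(data: dict) -> list[str]:
    """Return the sorted names of fields that fail validation (empty if valid)."""
    try:
        ClaimRecord(**data)
    except ValidationError as exc:
        # Each error dict carries 'loc' (field path), 'type', and 'msg'.
        return sorted(str(err["loc"][0]) for err in exc.errors())
    return []


bad = {"policy_id": "12345", "loss_date": "07/03/2025", "confidence": 1.5}
good = {"policy_id": "POL-123456", "loss_date": "2025-03-07", "confidence": 0.9}

print(failing_fields(bad))
print(failing_fields(good))
```

Pydantic reports all failing fields in one pass rather than stopping at the first, so a single rejected record can be logged with its complete list of problems.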

## 3. Building the Weather Verification Agent

Now let's build a real agent that:
1. Takes a location and date
2. Geocodes the location to coordinates
3. Fetches weather data from BOM
4. Returns a structured result

In [None]:
from pydantic_ai import Agent, RunContext

# Define the structured output for weather verification
class WeatherVerificationResult(BaseModel):
    """Structured weather verification output."""
    location: str = Field(description="Verified address")
    latitude: float = Field(ge=-90, le=90, description="Latitude")
    longitude: float = Field(ge=-180, le=180, description="Longitude")
    date: str = Field(description="Date verified (YYYY-MM-DD)")
    thunderstorms: str = Field(description="Thunderstorm observation")
    strong_wind: str = Field(description="Strong wind observation")
    severe_weather_confirmed: bool = Field(description="True if BOTH weather types observed")

# Define dependencies - things the agent needs access to
@dataclass
class WeatherAgentDeps:
    """Dependencies for the weather agent."""
    http_client: httpx.AsyncClient

# Create the agent
weather_agent = Agent(
    'openai:gpt-4o-mini',
    deps_type=WeatherAgentDeps,
    output_type=WeatherVerificationResult,
    instructions="""You are a Weather Verification Agent for an insurance company.
    
Your job is to verify severe weather events for insurance claims.

STEPS:
1. Use the geocode tool to convert the address to coordinates
2. Use the fetch_weather tool to get BOM observations
3. Return a WeatherVerificationResult

IMPORTANT:
- Always use your tools - never make up data
- severe_weather_confirmed should be True ONLY if BOTH thunderstorms AND strong wind are "Observed"
"""
)

print("Weather agent created!")

In [None]:
# Define tools for the weather agent
from bs4 import BeautifulSoup

@weather_agent.tool
async def geocode(ctx: RunContext[WeatherAgentDeps], city: str, state: str, postcode: str) -> dict:
    """Convert an Australian address to latitude/longitude coordinates.
    
    Args:
        city: City name (e.g., "Brisbane")
        state: Australian state code (e.g., "QLD")
        postcode: Postcode (e.g., "4000")
    
    Returns:
        Dictionary with latitude, longitude, and display_name
    """
    print(f"  Tool: geocode({city}, {state}, {postcode})")
    
    query = f"{city}, {state}, {postcode}, Australia"
    response = await ctx.deps.http_client.get(
        "https://nominatim.openstreetmap.org/search",
        params={"q": query, "format": "json", "countrycodes": "au"},
        headers={"User-Agent": "InsuranceWeatherAgent/1.0"}
    )
    response.raise_for_status()  # Fail loudly on HTTP errors instead of parsing an error page
    data = response.json()
    
    if data:
        result = {
            "latitude": float(data[0]["lat"]),
            "longitude": float(data[0]["lon"]),
            "display_name": data[0].get("display_name", "")
        }
    else:
        result = {"error": f"Location not found: {query}"}
    
    print(f"    Result: {result}")
    return result


@weather_agent.tool
async def fetch_weather(ctx: RunContext[WeatherAgentDeps], lat: float, lon: float, date: str, state: str) -> dict:
    """Fetch weather observations from Australian Bureau of Meteorology.
    
    Args:
        lat: Latitude
        lon: Longitude  
        date: Date in YYYY-MM-DD format
        state: Australian state code
    
    Returns:
        Dictionary with thunderstorms and strong_wind observations
    """
    print(f"  Tool: fetch_weather({lat}, {lon}, {date}, {state})")
    
    response = await ctx.deps.http_client.get(
        "https://reg.bom.gov.au/cgi-bin/climate/storms/get_storms.py",
        params={
            "lat": round(lat, 1),
            "lon": round(lon, 1),
            "date": date,
            "state": state,
            "unique_id": "pydantic_ai_tutorial"
        }
    )
    
    response.raise_for_status()  # Fail loudly on HTTP errors instead of parsing an error page

    # Parse HTML response
    soup = BeautifulSoup(response.text, 'html.parser')
    thunderstorms = "No reports or observations"
    strong_wind = "No reports or observations"
    
    for row in soup.find_all('tr'):
        cells = row.find_all(['td', 'th'])
        if len(cells) >= 2:
            weather_type = cells[0].get_text(strip=True).lower()
            status = cells[1].get_text(strip=True)
            if 'thunderstorm' in weather_type:
                thunderstorms = status or "No reports or observations"
            elif 'wind' in weather_type:
                strong_wind = status or "No reports or observations"
    
    result = {"thunderstorms": thunderstorms, "strong_wind": strong_wind}
    print(f"    Result: {result}")
    return result

print("Tools defined!")

In [None]:
# Run the weather agent
import asyncio

async def verify_weather(city: str, state: str, postcode: str, date: str):
    """Run weather verification for a location and date."""
    async with httpx.AsyncClient(timeout=30.0) as client:
        deps = WeatherAgentDeps(http_client=client)
        
        result = await weather_agent.run(
            f"Verify weather for {city}, {state}, {postcode} on {date}",
            deps=deps
        )
        
        return result.output  # This is a WeatherVerificationResult object!

# Test with Brisbane (top-level await works in Jupyter; in a script, use asyncio.run(...))
weather_result = await verify_weather("Brisbane", "QLD", "4000", "2025-03-07")

print("\n" + "="*60)
print("WEATHER VERIFICATION RESULT")
print("="*60)
print(f"Location: {weather_result.location}")
print(f"Coordinates: ({weather_result.latitude}, {weather_result.longitude})")
print(f"Date: {weather_result.date}")
print(f"Thunderstorms: {weather_result.thunderstorms}")
print(f"Strong Wind: {weather_result.strong_wind}")
print(f"Severe Weather Confirmed: {weather_result.severe_weather_confirmed}")

## 4. Building the Eligibility Agent

Now let's build a second agent that takes the weather verification and determines CAT eligibility.

In [None]:
# Define the eligibility output model
class EligibilityDecision(BaseModel):
    """Structured eligibility decision."""
    cat_event_status: str = Field(pattern="^(CONFIRMED|POSSIBLE|NOT_CAT)$", description="CAT event status")
    eligibility_decision: str = Field(pattern="^(APPROVED|REVIEW|DENIED)$", description="Final decision")
    confidence: float = Field(ge=0.0, le=1.0, description="Confidence score")
    reasoning: str = Field(description="Explanation of the decision")

# Create the eligibility agent (no tools - pure reasoning)
eligibility_agent = Agent(
    'openai:gpt-4o-mini',
    output_type=EligibilityDecision,
    instructions="""You are a Claims Eligibility Agent for an insurance company.

You evaluate weather verification reports and determine CAT (catastrophic event) eligibility.

BUSINESS RULES:
- BOTH thunderstorms AND strong wind "Observed" = CONFIRMED CAT → APPROVED
- Only ONE weather type "Observed" = POSSIBLE CAT → REVIEW  
- Neither "Observed" = NOT_CAT → DENIED

VALIDATION:
- Coordinates must be in Australia (-44 to -10 lat, 112 to 154 lon)
- If coordinates are outside Australia, set decision to DENIED with reasoning

Be precise and follow the rules exactly."""
)

print("Eligibility agent created!")

In [None]:
# Run the eligibility agent with the weather result

async def check_eligibility(weather: WeatherVerificationResult):
    """Check CAT eligibility based on weather verification."""
    
    # Format the weather result as input for the eligibility agent
    weather_report = f"""
Weather Verification Report:
- Location: {weather.location}
- Coordinates: ({weather.latitude}, {weather.longitude})
- Date: {weather.date}
- Thunderstorms: {weather.thunderstorms}
- Strong Wind: {weather.strong_wind}
- Severe Weather Confirmed: {weather.severe_weather_confirmed}
"""
    
    result = await eligibility_agent.run(
        f"Evaluate CAT eligibility for this claim:\n{weather_report}"
    )
    
    return result.output  # This is an EligibilityDecision object!

# Run eligibility check
eligibility_result = await check_eligibility(weather_result)

print("\n" + "="*60)
print("ELIGIBILITY DECISION")
print("="*60)
print(f"CAT Status: {eligibility_result.cat_event_status}")
print(f"Decision: {eligibility_result.eligibility_decision}")
print(f"Confidence: {eligibility_result.confidence}")
print(f"Reasoning: {eligibility_result.reasoning}")
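
Because the business rules in the agent's instructions are deterministic, the LLM's decision can be cross-checked in plain Python. The sketch below is one way to do that, assuming a positive observation is exactly the string `"Observed"`. The helper names `in_australia`, `expected_cat_status`, and `expected_decision` are hypothetical, and the bounding box is copied from the instructions above.

```python
OBSERVED = "Observed"


def in_australia(lat: float, lon: float) -> bool:
    """Bounding-box check using the ranges quoted in the agent instructions."""
    return -44.0 <= lat <= -10.0 and 112.0 <= lon <= 154.0


def expected_cat_status(thunderstorms: str, strong_wind: str) -> str:
    """Map the two observations to CONFIRMED, POSSIBLE, or NOT_CAT."""
    observed_count = sum(value == OBSERVED for value in (thunderstorms, strong_wind))
    if observed_count == 2:
        return "CONFIRMED"
    if observed_count == 1:
        return "POSSIBLE"
    return "NOT_CAT"


def expected_decision(thunderstorms: str, strong_wind: str,
                      lat: float, lon: float) -> str:
    """Return the decision the stated business rules imply."""
    if not in_australia(lat, lon):
        # The instructions specify DENIED for coordinates outside Australia.
        return "DENIED"
    status = expected_cat_status(thunderstorms, strong_wind)
    return {"CONFIRMED": "APPROVED", "POSSIBLE": "REVIEW", "NOT_CAT": "DENIED"}[status]
```

In the notebook, one could compare `expected_decision(weather_result.thunderstorms, weather_result.strong_wind, weather_result.latitude, weather_result.longitude)` with `eligibility_result.eligibility_decision` and route any mismatch to manual review.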

## 5. Complete Pipeline: Weather → Eligibility

Let's put it all together into a single pipeline function.

In [None]:
async def process_cat_claim(city: str, state: str, postcode: str, date: str):
    """Complete CAT claim processing pipeline."""
    
    print(f"Processing CAT claim for {city}, {state}, {postcode} on {date}")
    print("="*60)
    
    # Step 1: Weather Verification
    print("\nStep 1: Weather Verification")
    print("-"*40)
    weather = await verify_weather(city, state, postcode, date)
    print("\nWeather Result:")
    print(f"  Thunderstorms: {weather.thunderstorms}")
    print(f"  Strong Wind: {weather.strong_wind}")
    
    # Step 2: Eligibility Check
    print("\nStep 2: Eligibility Check")
    print("-"*40)
    eligibility = await check_eligibility(weather)
    print("\nEligibility Result:")
    print(f"  Status: {eligibility.cat_event_status}")
    print(f"  Decision: {eligibility.eligibility_decision}")
    
    # Return combined result
    return {
        "weather": weather.model_dump(),
        "eligibility": eligibility.model_dump()
    }

# Test with a sample location
result = await process_cat_claim("Brisbane", "QLD", "4000", "2025-03-07")

print("\n" + "="*60)
print("FINAL RESULT (JSON)")
print("="*60)
print(json.dumps(result, indent=2))
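
The pipeline above processes one claim at a time. When many claims need checking, the coroutines can be run concurrently. The sketch below shows the pattern with `asyncio.gather` and a semaphore that caps parallel requests. `process_many` is a hypothetical helper, `fake_process` is a stand-in for `process_cat_claim` so the block runs without network access, and the limit of 2 is an arbitrary assumption.

```python
import asyncio
from typing import Awaitable, Callable


async def process_many(
    claims: list[dict],
    process: Callable[..., Awaitable[dict]],
    max_concurrent: int = 2,
) -> list:
    """Run `process(**claim)` for each claim, at most `max_concurrent` at a time.

    Exceptions are returned in place of results, so one failing claim
    does not cancel the rest of the batch.
    """
    semaphore = asyncio.Semaphore(max_concurrent)

    async def guarded(claim: dict):
        async with semaphore:
            return await process(**claim)

    # gather preserves input order, so results line up with claims.
    return await asyncio.gather(
        *(guarded(claim) for claim in claims), return_exceptions=True
    )


async def fake_process(city: str, state: str, postcode: str, date: str) -> dict:
    """Stand-in for process_cat_claim; fails for one made-up city."""
    await asyncio.sleep(0)
    if city == "Nowhere":
        raise ValueError("Location not found")
    return {"city": city, "date": date}
```

In the notebook, this could be called as `results = await process_many(claims, process_cat_claim)`, where `claims` is a list of dicts with `city`, `state`, `postcode`, and `date` keys.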

## 6. Integrating with DSPy for Prompt Optimization

DSPy can automatically improve your prompts based on evaluation data. Here's how to use DSPy-optimized prompts with Pydantic AI.

In [None]:
# Install DSPy if needed
# !pip install dspy

In [None]:
import dspy

# Configure DSPy with the same model
lm = dspy.LM(model="openai/gpt-4o-mini", max_tokens=1000)
dspy.configure(lm=lm)

print("DSPy configured!")

In [None]:
# Define a DSPy signature for eligibility classification
class EligibilityClassifier(dspy.Signature):
    """Classify insurance claims for CAT event eligibility.
    
    Business Rules:
    - BOTH thunderstorms AND strong wind Observed = APPROVED
    - Only ONE weather type Observed = REVIEW
    - Neither Observed = DENIED
    """
    thunderstorms: str = dspy.InputField(desc="Thunderstorm observation: 'Observed' or 'No reports or observations'")
    strong_wind: str = dspy.InputField(desc="Strong wind observation: 'Observed' or 'No reports or observations'")
    decision: str = dspy.OutputField(desc="APPROVED, REVIEW, or DENIED")

# Create a basic classifier
classifier = dspy.ChainOfThought(EligibilityClassifier)

# Test it
test_result = classifier(
    thunderstorms="Observed",
    strong_wind="Observed"
)
print(f"Test: Both observed → {test_result.decision}")

test_result2 = classifier(
    thunderstorms="Observed",
    strong_wind="No reports or observations"
)
print(f"Test: Only thunderstorms → {test_result2.decision}")

In [None]:
# Create training data for optimization
trainset = [
    dspy.Example(
        thunderstorms="Observed",
        strong_wind="Observed",
        decision="APPROVED"
    ).with_inputs("thunderstorms", "strong_wind"),
    dspy.Example(
        thunderstorms="Observed",
        strong_wind="No reports or observations",
        decision="REVIEW"
    ).with_inputs("thunderstorms", "strong_wind"),
    dspy.Example(
        thunderstorms="No reports or observations",
        strong_wind="Observed",
        decision="REVIEW"
    ).with_inputs("thunderstorms", "strong_wind"),
    dspy.Example(
        thunderstorms="No reports or observations",
        strong_wind="No reports or observations",
        decision="DENIED"
    ).with_inputs("thunderstorms", "strong_wind"),
]

# Evaluation metric
def accuracy_metric(example, prediction, trace=None):
    """Exact match on the decision label, ignoring case and surrounding whitespace."""
    return prediction.decision.strip().upper() == example.decision.strip().upper()

# Evaluate baseline accuracy using the metric defined above
correct = 0
for ex in trainset:
    pred = classifier(thunderstorms=ex.thunderstorms, strong_wind=ex.strong_wind)
    if accuracy_metric(ex, pred):
        correct += 1
        
print(f"Baseline accuracy: {correct}/{len(trainset)} = {100*correct/len(trainset):.0f}%")
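
The exact-match comparison above counts answers such as "Approved." or "Decision: REVIEW" as wrong, because LLM output is free text. A more tolerant metric can normalize the prediction first. This is a sketch, not part of DSPy: `normalize_decision` and `tolerant_metric` are hypothetical names, and `SimpleNamespace` objects stand in for DSPy examples and predictions so the block runs without an API key.

```python
import re
from types import SimpleNamespace
from typing import Optional

ALLOWED = {"APPROVED", "REVIEW", "DENIED"}


def normalize_decision(raw: str) -> Optional[str]:
    """Return the single allowed label mentioned in `raw`, or None if ambiguous or absent."""
    words = re.findall(r"[A-Za-z]+", raw.upper())
    found = {word for word in words if word in ALLOWED}
    return found.pop() if len(found) == 1 else None


def tolerant_metric(example, prediction, trace=None) -> bool:
    """Same signature as accuracy_metric, but tolerant of punctuation and extra words."""
    predicted = normalize_decision(prediction.decision)
    return predicted is not None and predicted == example.decision.strip().upper()


# Stand-ins for a dspy.Example and a prediction
gold = SimpleNamespace(decision="REVIEW")
pred = SimpleNamespace(decision="Decision: Review.")
print(tolerant_metric(gold, pred))
```

Answers that mention more than one label are treated as incorrect rather than guessed at, which keeps the metric conservative.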

In [None]:
# Use a DSPy-optimized prompt in Pydantic AI
# (In production, you would run GEPA optimization first and export the resulting
# instructions; the text below is a hand-written placeholder for that output.)

# The optimized instructions can be used in Pydantic AI:
optimized_instructions = """You are a Claims Eligibility Agent.

CLASSIFICATION RULES (from DSPy optimization):
1. If thunderstorms = "Observed" AND strong_wind = "Observed" → APPROVED
2. If thunderstorms = "Observed" XOR strong_wind = "Observed" → REVIEW
3. If neither is "Observed" → DENIED

Apply these rules precisely. Do not deviate."""

# Create an optimized Pydantic AI agent
optimized_eligibility_agent = Agent(
    'openai:gpt-4o-mini',
    output_type=EligibilityDecision,
    instructions=optimized_instructions  # Use DSPy-optimized prompt
)

print("Created optimized agent with DSPy-tuned instructions!")

## 7. Integrating with MLflow for Experiment Tracking

MLflow helps you track experiments, compare results, and manage model versions.

In [None]:
# Install MLflow if needed
# !pip install mlflow

In [None]:
import mlflow

# Set up MLflow experiment
mlflow.set_experiment("CAT-Claim-Processing")

print("MLflow experiment set!")

In [None]:
# Track a claim processing run
async def process_claim_with_tracking(city: str, state: str, postcode: str, date: str):
    """Process a claim with MLflow tracking."""
    
    with mlflow.start_run(run_name=f"{city}-{date}"):
        # Log parameters
        mlflow.log_param("city", city)
        mlflow.log_param("state", state)
        mlflow.log_param("postcode", postcode)
        mlflow.log_param("date", date)
        mlflow.log_param("model", "gpt-4o-mini")
        
        # Run weather verification
        weather = await verify_weather(city, state, postcode, date)
        
        # Log weather results
        mlflow.log_param("thunderstorms", weather.thunderstorms)
        mlflow.log_param("strong_wind", weather.strong_wind)
        mlflow.log_metric("latitude", weather.latitude)
        mlflow.log_metric("longitude", weather.longitude)
        
        # Run eligibility check
        eligibility = await check_eligibility(weather)
        
        # Log eligibility results
        mlflow.log_param("cat_status", eligibility.cat_event_status)
        mlflow.log_param("decision", eligibility.eligibility_decision)
        mlflow.log_metric("confidence", eligibility.confidence)
        
        # Log reasoning as a text artifact (no temporary file left in the working directory)
        mlflow.log_text(eligibility.reasoning, "reasoning.txt")
        
        print(f"Run logged to MLflow: {city} → {eligibility.eligibility_decision}")
        
        return {"weather": weather, "eligibility": eligibility}

# Process a claim with tracking
tracked_result = await process_claim_with_tracking("Brisbane", "QLD", "4000", "2025-03-07")

In [None]:
# Process multiple test cases
test_cases = [
    {"city": "Brisbane", "state": "QLD", "postcode": "4000", "date": "2025-03-07"},
    {"city": "Sydney", "state": "NSW", "postcode": "2000", "date": "2025-03-07"},
    {"city": "Perth", "state": "WA", "postcode": "6000", "date": "2025-01-15"},
]

print("Processing test cases with MLflow tracking...")
print("="*60)

for case in test_cases:
    try:
        result = await process_claim_with_tracking(**case)
    except Exception as e:
        print(f"Error processing {case['city']}: {e}")

print("\nAll runs logged to MLflow!")
print("Run 'mlflow ui' in terminal to view the experiment dashboard.")
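
After a batch of runs it can help to summarize the decisions before opening the dashboard. A minimal sketch using only the standard library follows. `summarize` is a hypothetical helper, and the sample decisions are made-up inputs for illustration, not results from the runs above.

```python
from collections import Counter


def summarize(decisions: list[str]) -> dict:
    """Count decisions and compute the share that were APPROVED."""
    counts = Counter(decisions)
    total = len(decisions)
    return {
        "total": total,
        "counts": dict(counts),
        # Guard against division by zero when every run failed.
        "approval_rate": counts["APPROVED"] / total if total else 0.0,
    }


# Made-up inputs for illustration only
sample = ["APPROVED", "DENIED", "REVIEW", "APPROVED"]
print(summarize(sample))
```

A value such as `approval_rate` could then be logged with `mlflow.log_metric` inside a run so that batches are comparable over time.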

## 8. Summary & Next Steps

In this tutorial, you learned:

1. **Pydantic Models**: Define structured outputs with validation
2. **Agents**: Create agents with `Agent()` and `output_type`
3. **Tools**: Add tools with the `@agent.tool` decorator
4. **Dependencies**: Inject dependencies with `deps_type` and `RunContext`
5. **DSPy Integration**: Use DSPy to optimize prompts, export to Pydantic AI
6. **MLflow Integration**: Track experiments, log metrics and artifacts

### Key Takeaways for Insurance Teams

- **Type safety matters**: Pydantic catches invalid data before it enters your system
- **Testability**: Dependency injection makes agents easy to unit test
- **Auditability**: Structured outputs are easy to log and audit
- **Optimization**: DSPy can improve prompts based on your data
- **Tracking**: MLflow provides visibility into agent behavior
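
To illustrate the testability point, the sketch below unit-tests geocoding logic without network access by injecting a fake client. It rests on assumptions: `FakeClient`, `FakeResponse`, and `geocode_with` are hypothetical names, and `geocode_with` follows the same lookup logic as the `geocode` tool rather than calling the registered tool itself.

```python
import asyncio


class FakeResponse:
    """Minimal stand-in for an HTTP response exposing .json()."""

    def __init__(self, payload):
        self._payload = payload

    def json(self):
        return self._payload


class FakeClient:
    """Stand-in for httpx.AsyncClient that records calls and returns canned data."""

    def __init__(self, payload):
        self.payload = payload
        self.calls = []

    async def get(self, url, params=None, headers=None):
        self.calls.append((url, params))
        return FakeResponse(self.payload)


async def geocode_with(client, city: str, state: str, postcode: str) -> dict:
    """Geocoding lookup with the client passed in explicitly."""
    query = f"{city}, {state}, {postcode}, Australia"
    response = await client.get(
        "https://nominatim.openstreetmap.org/search",
        params={"q": query, "format": "json", "countrycodes": "au"},
        headers={"User-Agent": "InsuranceWeatherAgent/1.0"},
    )
    data = response.json()
    if not data:
        return {"error": f"Location not found: {query}"}
    return {
        "latitude": float(data[0]["lat"]),
        "longitude": float(data[0]["lon"]),
        "display_name": data[0].get("display_name", ""),
    }
```

Because the client arrives as a dependency, a test can pass `FakeClient([...])` with a canned payload and assert on both the returned coordinates and the recorded query, with no HTTP traffic involved.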

### Next Steps

1. Try different test cases with your own data
2. Add more validation rules to your Pydantic models
3. Run DSPy GEPA optimization with more training examples
4. Set up an MLflow server for team-wide experiment tracking