# Input Guardrails in Pydantic AI with asyncio

In the previous lesson we implemented input guardrails using tools.

This approach is simple but has two drawbacks.

First, it runs sequentially and adds to the execution time.

Second, the agent must decide to invoke the guardrail tool, which is not guaranteed.

## Guardrails with asyncio

We can implement parallel execution of guardrails using asyncio. This is the same approach that OpenAI Agents SDK uses internally.

The guardrail runs in parallel with the agent. If the guardrail detects a problem, it cancels the agent immediately.

## Understanding Parallel Execution

Let's start by running two async functions in parallel.

In [None]:
import asyncio

async def agent():
    print('[agent] start')
    for i in range(10):
        await asyncio.sleep(1)
        print('[agent]', i)
    print('[agent] end')
    return 1


We use asyncio.sleep instead of time.sleep: it's non-blocking and yields control to other tasks. The time.sleep function would block the entire event loop.

## Creating the Guardrail Function

Define the output model for the guardrail result.

In [None]:
from dataclasses import dataclass

@dataclass
class GuardrailFunctionOutput:
    output_info: str
    tripwire_triggered: bool


Create a custom exception for guardrail failures.

In [None]:
class GuardrailException(Exception):
    def __init__(self, message: str, info: GuardrailFunctionOutput):
        super().__init__(message)
        self.info = info

Implement a guardrail function that passes validation.

In [None]:
async def guardrail():
    print('[guardrail] start')
    await asyncio.sleep(1.5)
    print('[guardrail] stop')

    info = GuardrailFunctionOutput(
        output_info='check passed',
        tripwire_triggered=False
    )
        
    return 0


Now implement a guardrail that detects a violation.

In [None]:
async def guardrail_fail():
    print('[guardrail] start')
    await asyncio.sleep(1.5)
    print('[guardrail] stop')

    info = GuardrailFunctionOutput(
        output_info='check failed',
        tripwire_triggered=True
    )

    raise GuardrailException('check failed', info)


## Running Agent and Guardrail in Parallel

Create tasks for both the agent and guardrail.

Use asyncio.gather to run them concurrently.

In [None]:
agent_task = asyncio.create_task(agent())
guardrail_task = asyncio.create_task(guardrail())

try:
    await asyncio.gather(agent_task, guardrail_task)
    result = await agent_task
    print(result)
except GuardrailException as e:
    print(e.info)
    agent_task.cancel()
    try:
        await agent_task
    except asyncio.CancelledError:
        print("[main] Agent cancelled")


When a guardrail raises an exception, we cancel the agent task.

We then await the cancelled task to handle the CancelledError properly. This ensures all resources are cleaned up correctly.

## Creating a Reusable Function

Wrap the logic in a reusable function that handles multiple guardrails.

In [None]:
async def run_with_guardrails(agent_coroutine, guardrails):
    """
    Run `agent_coroutine` while multiple guardrails monitor it.

    Parameters:
        agent_coroutine: an *awaitable*, e.g. agent()
        guardrails: an iterable of *awaitables*, e.g. [guard1(), guard2()]

    Returns:
        The result of the agent, if no guardrail triggers.

    Raises:
        GuardrailException from any guardrail.
    """

    agent_task = asyncio.create_task(agent_coroutine)
    guard_tasks = [asyncio.create_task(g) for g in guardrails]

    try:
        # If any guardrail raises GuardrailException,
        # gather will throw and we drop into except.
        await asyncio.gather(agent_task, *guard_tasks)

        # Agent finished successfully.
        return agent_task.result()

    except GuardrailException as e:
        # At least one guardrail fired.
        print("[guardrail fired]", e.info)

        # Cancel the agent.
        agent_task.cancel()
        try:
            await agent_task
        except asyncio.CancelledError:
            print("[run_with_guardrails] agent cancelled")

        # Cancel all guardrails (they may still be running).
        for t in guard_tasks:
            t.cancel()
        await asyncio.gather(*guard_tasks, return_exceptions=True)

        raise


## Testing the Implementation

Run the agent with a single guardrail.

In [None]:
result = await run_with_guardrails(
    agent(),
    [guardrail()]
)

## Using with PydanticAI Agent

Create a run function for the PydanticAI agent.

This function wraps the agent execution in an async coroutine.

In [None]:
async def run(agent, user_input: str):
    # Implementation that runs the agent with the user input
    ...

Import the necessary modules and create the agent.

In [None]:
import search_agent
import ver3

agent = search_agent.create_agent()


Run the agent with multiple guardrails.

In [None]:
result = await run_with_guardrails(
    ver3.run(agent, 'llm as a judge'),
    [guardrail(), guardrail_fail()]
)

The guardrails now run in parallel with the agent. If any guardrail detects a problem, the agent stops immediately.

This approach is more efficient than the tool-based guardrail, but more complex to implement.