# Creating Custom Events for Run Iteration

AG2's `run_iter()` lets you yield specific event types using `yield_on`. While AG2 provides many built-in events (`TextEvent`, `ToolCallEvent`, etc.), you can also create **custom events** for your specific use cases.

Custom events are useful for:
- **Workflow checkpoints** - Pause at specific stages for validation
- **Progress tracking** - Report status from within tools
- **Custom gating** - Implement domain-specific pause conditions

````{=mdx}
:::info Requirements
Install AG2:

```bash
pip install ag2[openai]
```

For more information, please refer to the [installation guide](https://docs.ag2.ai/latest/docs/user-guide/basic-concepts/installing-ag2).
:::
````

## Creating a Custom Event

Custom events require three things:

1. **Inherit from `BaseEvent`** - The base class for all AG2 events
2. **Use the `@wrap_event` decorator** - Wraps the event for serialization
3. **Class name must end with `Event`** - Enforced by the decorator

Let's create a custom event for tracking data pipeline stages:

In [None]:
from collections.abc import Callable
from typing import Any

from autogen.events.base_event import BaseEvent, resolve_print_callable, wrap_event


@wrap_event
class PipelineStageEvent(BaseEvent):
    """Custom event emitted when a data pipeline stage completes."""

    stage_name: str
    records_processed: int
    validation_passed: bool

    def print(self, f: Callable[..., Any] | None = None) -> None:
        """Optional: Define how the event prints to console."""
        f = resolve_print_callable(f)
        status = "PASSED" if self.validation_passed else "FAILED"
        f(f"[Pipeline] {self.stage_name}: {self.records_processed} records - {status}", flush=True)

## Emitting Custom Events

To emit a custom event from within a tool or custom code, use `IOStream.get_default().send()`:

In [None]:
from autogen.io.base import IOStream


def emit_pipeline_event(stage: str, records: int, passed: bool) -> None:
    """Helper to emit a pipeline stage event."""
    IOStream.get_default().send(
        PipelineStageEvent(
            stage_name=stage,
            records_processed=records,
            validation_passed=passed,
        )
    )

## Example: Data Pipeline with Validation Gates

Let's build a realistic example: an AI agent that processes data through multiple pipeline stages. We'll use custom events to yield at each stage for human validation.

First, let's set up our LLM configuration:

In [None]:
import os

from dotenv import load_dotenv

from autogen import ConversableAgent

load_dotenv("../.env.local")

llm_config = {
    "config_list": [
        {
            "model": "gpt-4o-mini",
            "api_key": os.environ.get("OPENAI_API_KEY"),
        }
    ]
}

Now let's create a tool that simulates a data pipeline. The tool emits our custom `PipelineStageEvent` at each stage:

In [None]:
import random

from autogen.tools import tool


@tool(description="Process data through a multi-stage pipeline: extract -> transform -> validate -> load")
def run_data_pipeline(source_name: str, record_count: int) -> str:
    """Simulates a data pipeline with multiple stages."""
    results = []

    # Stage 1: Extract
    extracted = record_count
    emit_pipeline_event("extract", extracted, True)
    results.append(f"Extracted {extracted} records from {source_name}")

    # Stage 2: Transform
    transformed = int(extracted * 0.95)  # Some records filtered
    emit_pipeline_event("transform", transformed, True)
    results.append(f"Transformed {transformed} records (filtered invalid)")

    # Stage 3: Validate
    validation_passed = random.random() > 0.3  # 70% chance of passing
    emit_pipeline_event("validate", transformed, validation_passed)
    if not validation_passed:
        results.append("VALIDATION FAILED - data quality issues detected")
        return "\n".join(results)
    results.append(f"Validated {transformed} records - all checks passed")

    # Stage 4: Load
    emit_pipeline_event("load", transformed, True)
    results.append(f"Loaded {transformed} records to destination")

    return "\n".join(results)

Now let's create an agent with this tool and run it with `run_iter()`, yielding on our custom event:

In [None]:
from autogen.events.agent_events import InputRequestEvent, TerminationEvent, TextEvent, ToolCallEvent

# Create the data engineer agent
data_engineer = ConversableAgent(
    "DataEngineer",
    system_message="""You are a data engineer. When asked to process data, use the run_data_pipeline tool.
After processing, report the results. Say DONE when finished.""",
    is_termination_msg=lambda x: "DONE" in x.get("content", ""),
    llm_config=llm_config,
    functions=[run_data_pipeline],
)

# Run iteration, yielding on our custom PipelineStageEvent
for event in data_engineer.run_iter(
    message="Process 1000 records from the 'sales_data' source",
    max_turns=3,
    yield_on=[PipelineStageEvent, TextEvent, ToolCallEvent, TerminationEvent],
):
    # Handle input requests
    if isinstance(event, InputRequestEvent):
        user_input = input("  Input requested: ")
        event.content.respond(user_input)
        continue

    # Handle our custom pipeline event
    if isinstance(event, PipelineStageEvent):
        stage = event.content.stage_name
        records = event.content.records_processed
        passed = event.content.validation_passed
        status = "PASSED" if passed else "FAILED"
        print(f"\n[PIPELINE STAGE] {stage.upper()}")
        print(f"  Records: {records}")
        print(f"  Status: {status}")

        # You could add human approval here:
        # if stage == "validate" and not passed:
        #     approval = input("  Continue despite validation failure? (y/n): ")
        #     if approval.lower() != 'y':
        #         break  # Abort the pipeline
        continue

    # Handle other events
    if isinstance(event, ToolCallEvent):
        for tool_call in event.content.tool_calls:
            print(f"\n[TOOL CALL] {tool_call.function.name}")
    elif isinstance(event, TextEvent):
        content = str(event.content.content)[:150]
        print(f"\n[TEXT] {content}")

print("\n--- Pipeline run completed! ---")

## How the Event Wrapper Works

The `@wrap_event` decorator transforms your event class:

1. **Adds a `type` field** - Converted from class name (e.g., `PipelineStageEvent` â†’ `"pipeline_stage"`)
2. **Wraps in a content structure** - Your fields are accessed via `event.content.<field>`
3. **Enables serialization** - Events can be serialized for logging or transmission

Example structure after wrapping:
```python
# Before wrapping (your class)
class PipelineStageEvent(BaseEvent):
    stage_name: str
    records_processed: int
    validation_passed: bool

# After wrapping (what you receive)
event.type           # "pipeline_stage"
event.content.stage_name
event.content.records_processed
event.content.validation_passed
```

## Summary

To create custom events for run iteration:

1. **Define the event class**:
   - Inherit from `BaseEvent`
   - Decorate with `@wrap_event`
   - Name must end with `Event`
   - Define fields using Pydantic-style type hints

2. **Emit the event**:
   - Use `IOStream.get_default().send(YourEvent(...))`
   - Emit from tools, hooks, or custom agent code

3. **Yield on the event**:
   - Use `run_iter()` with `yield_on=[YourEvent, ...]`
   - Check for it with `isinstance(event, YourEvent)`
   - Access fields via `event.content.<field>`

This pattern enables powerful workflows like:

- Validation gates in data pipelines
- Human approval at critical checkpoints
- Progress monitoring for long-running tasks
- Custom logging and analytics