# Observability and Tracing



In [None]:
import asyncio

from agent_framework import Executor, WorkflowBuilder, WorkflowContext, handler
from agent_framework.observability import setup_observability

In [None]:
class StartExecutor(Executor):
    """Entry node of the sample workflow: upper-cases the input and passes it on."""

    @handler  # type: ignore[misc]
    async def handle_input(self, message: str, ctx: WorkflowContext[str]) -> None:
        """Upper-case ``message`` and send the result downstream through ``ctx``."""
        shouted = message.upper()
        # Forwarding the transformed text is what yields the executor.process
        # and message.send spans for this node.
        await ctx.send_message(shouted)


class EndExecutor(Executor):
    """Terminal node of the sample workflow: prints whatever message it receives."""

    @handler  # type: ignore[misc]
    async def handle_final(self, message: str, ctx: WorkflowContext) -> None:
        """Print the final message; nothing is forwarded through ``ctx``."""
        # This node is a sink: once it has run, the workflow goes idle with no
        # pending work and therefore completes.
        final_line = f"Final result: {message}"
        print(final_line)


async def main() -> None:
    """Enable observability, build the two-node workflow, and run it once."""
    # Turns on tracing and creates the tracing, logging and metrics providers
    # according to environment variables.
    setup_observability()

    # Assemble the graph StartExecutor -> EndExecutor step by step.
    builder = WorkflowBuilder()
    source_node = StartExecutor(id="start")
    sink_node = EndExecutor(id="end")
    wired = builder.add_edge(source_node, sink_node)
    # set_start_executor accepts an executor id string or the instance; the id is used here.
    rooted = wired.set_start_executor("start")
    # The workflow.build span is emitted by this call.
    workflow = rooted.build()

    # One run with a simple payload: expect a workflow.run span together with
    # executor.process and message.send spans.
    payload = "We are demonstrating Tracing to DVC"
    await workflow.run(payload)

In [None]:
# Notebook cell entry point: top-level await relies on the notebook's running event loop.
# In a plain script this line would need to be wrapped, e.g. with asyncio.run(main()).
await main()