# Exercise 3: Durable Agent — Solution

Complete implementation combining **OpenAI Agents SDK** with **Temporal** for durability.

This solution demonstrates how Temporal automatically retries failed operations, making your AI agents resilient to network failures and other transient errors.

## Architecture

This solution follows a **4-component pattern** that mirrors production Temporal applications:

1. **`activities.py`** - Defines the weather activity
2. **`workflow.py`** - Defines the workflow orchestration
3. **`worker.py`** - Runs the worker that executes workflows/activities
4. **`starter.py`** - Executes the workflow

Each component is implemented in a separate Jupyter cell below for clarity.

## Setup

Before doing the exercise, you need to:

- Install necessary dependencies
- Create your `.env` file and supply your API key
- Load the environment variables
- Download and start a local Temporal Service
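
A quick preflight check can catch a missing key or a stopped Temporal Service before any workflow is started. The sketch below is a minimal illustration using only the standard library. The helper names `port_open` and `preflight` are hypothetical, and it assumes the API key is exposed as `OPENAI_API_KEY` and that the Temporal Service listens on `localhost:7233`, the address used by the cells in this notebook.

```python
import os
import socket


def port_open(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port succeeds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


def preflight(env: dict, host: str = "localhost", port: int = 7233) -> list:
    """Collect human-readable problems; an empty list means the setup looks ready."""
    problems = []
    if not env.get("OPENAI_API_KEY"):  # assumed variable name
        problems.append("OPENAI_API_KEY is not set")
    if not port_open(host, port):
        problems.append(f"nothing is listening on {host}:{port}")
    return problems


# Report anything that still needs attention
for problem in preflight(dict(os.environ)):
    print(f"⚠️  {problem}")
```

An empty result only shows that something accepts TCP connections on that port; it does not prove the listener is a Temporal Service.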

In [14]:
%pip install --quiet temporalio openai-agents httpx nest-asyncio pytz

# Import all required modules
import asyncio
import httpx
from datetime import timedelta, datetime
import pytz
from temporalio import activity, workflow
from temporalio.client import Client
from temporalio.worker import Worker
from temporalio.contrib import openai_agents
from temporalio.contrib.openai_agents import OpenAIAgentsPlugin, ModelActivityParameters
from temporalio.worker.workflow_sandbox import SandboxedWorkflowRunner, SandboxRestrictions
from agents import Agent, Runner

print("✅ All imports successful")

Note: you may need to restart the kernel to use updated packages.
✅ All imports successful


## Component 1: `activities.py`

Define the weather activity that fetches alerts from the National Weather Service API.

**Key points:**
- Returns a `dict` (not formatted string) so the LLM can interpret the data
- Activity name explicitly set with `@activity.defn(name="get_weather")`
- Simple error handling - just `raise_for_status()` and let Temporal retry
- No console logging or attempt tracking needed
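
Letting every HTTP error propagate keeps the activity short, but it also means a permanent error (for example, a 4xx response caused by an invalid state code) is retried the same way as a transient 5xx. The sketch below shows one possible way to tell the two apart. `is_transient` is a hypothetical helper, not part of this solution; inside an activity, the permanent case could be raised as `ApplicationError(..., non_retryable=True)` from `temporalio.exceptions` so that Temporal stops retrying.

```python
def is_transient(status_code: int) -> bool:
    """Hypothetical helper: True when a retry has a reasonable chance of succeeding."""
    if status_code in (408, 429):  # request timeout, rate limited
        return True
    return 500 <= status_code <= 599  # server-side errors


for code in (400, 404, 429, 503):
    print(code, "retry" if is_transient(code) else "do not retry")
```

Which status codes count as transient is a judgment call for each API; the set above is only an assumption for illustration.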

In [None]:
@activity.defn(name="get_weather")  # Register this function as a Temporal activity
async def get_weather(state: str) -> dict:  # Accept 2-letter state code, return structured data
    """Fetch active NWS alerts for a 2-letter US state code (e.g., 'CA')."""
    # Set User-Agent header required by National Weather Service API
    headers = {"User-Agent": "Temporal-Agents-Workshop/1.0 (educational@example.com)"}
    # Create async HTTP client with 10-second timeout to prevent hanging
    async with httpx.AsyncClient(timeout=10) as client:
        # Make GET request to NWS alerts endpoint for the specified state
        r = await client.get(
            f"https://api.weather.gov/alerts/active/area/{state}",
            headers=headers
        )
        # Raise exception if request fails (4xx or 5xx) - Temporal will auto-retry
        r.raise_for_status()
        # Parse JSON response into Python dictionary
        data = r.json()

    # Initialize empty list to collect alert information
    alerts = []
    # Loop through first 5 features (weather alerts) in the response
    for f in (data.get("features") or [])[:5]:
        # Extract properties object from each feature (contains alert details)
        p = f.get("properties", {})
        # Build structured alert dictionary with key information
        alerts.append({
            "event": p.get("event"),  # Alert type (e.g., "Flash Flood Warning")
            "headline": p.get("headline"),  # Human-readable alert headline
            "severity": p.get("severity"),  # Severity level (e.g., "Severe", "Moderate")
            "area": p.get("areaDesc"),  # Geographic area affected by alert
        })

    # Return structured response with state, count, and alerts for LLM to interpret
    return {"state": state.upper(), "count": len(alerts), "alerts": alerts}

print("✅ Activity 'get_weather' defined")
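
The response-shaping part of the activity can be tried offline, without calling the NWS API. The sketch below repeats that logic in a standalone function and feeds it a made-up payload. `summarize_alerts` and the sample data are hypothetical and exist only for illustration.

```python
def summarize_alerts(state: str, data: dict, limit: int = 5) -> dict:
    """Reduce a GeoJSON-style alerts payload to the fields the agent needs."""
    alerts = []
    for feature in (data.get("features") or [])[:limit]:
        props = feature.get("properties", {})
        alerts.append({
            "event": props.get("event"),
            "headline": props.get("headline"),
            "severity": props.get("severity"),
            "area": props.get("areaDesc"),
        })
    return {"state": state.upper(), "count": len(alerts), "alerts": alerts}


# Made-up payload in the same shape the activity reads
sample = {
    "features": [
        {
            "properties": {
                "event": "Flood Watch",
                "headline": "Flood Watch issued (sample data)",
                "severity": "Moderate",
                "areaDesc": "Sample County",
            }
        }
    ]
}

print(summarize_alerts("ca", sample))
```

Because the list is cut to the first five features before counting, `count` is the number of alerts returned, not the total number of active alerts.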

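## Component 2: `workflow.py`

Define the workflow that creates the agent and exposes the weather activity to it as a tool.

In [None]: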
# Define which task queue this workflow will use for communication
TASK_QUEUE = "agents-sdk-queue"

@workflow.defn(sandboxed=False)  # Disable sandbox for Jupyter compatibility
class WeatherAgentWorkflow:  # Define workflow class for orchestrating the agent
    @workflow.run  # Mark this method as the workflow entry point
    async def run(self, user_query: str) -> str:  # Accept user query, return agent response
        # Create the Agent inside the workflow with tools and instructions
        agent = Agent(
            name="Weather Assistant",  # Name shown in OpenAI traces
            # Define agent's role and behavior when handling user queries
            instructions=(
                "You are a helpful assistant that explains current weather alerts for U.S. states. "
                "When the user asks about alerts, call the get_weather tool, then summarize results."
            ),
            # Provide list of tools the agent can use
            tools=[
                # Convert Temporal activity to Agent tool for durable execution
                openai_agents.workflow.activity_as_tool(
                    get_weather,  # The activity function to wrap
                    start_to_close_timeout=timedelta(seconds=10),  # Max time for activity execution
                )
            ],
        )
        
        # Run the agent with user query - tool calls execute as Temporal activities
        result = await Runner().run(agent, user_query)
        
        # Safely extract final output from result object (handles different result types)
        return getattr(result, "final_output", str(result))

print(f"✅ Workflow 'WeatherAgentWorkflow' defined (task queue: {TASK_QUEUE})")

## Component 3: `worker.py`

Run the worker that polls for and executes workflow/activity tasks.

**Key points:**
- Connects to Temporal with `OpenAIAgentsPlugin` (required)
- Configures `ModelActivityParameters` for AI model timeout settings
- Configures no custom `SandboxedWorkflowRunner`, because the workflow is defined with `sandboxed=False`
- Runs in background as an async task in Jupyter

In [None]:
async def run_worker():  # Define async function to start and run the worker
    """Start a Temporal worker that listens for workflow and activity tasks."""
    # Connect to local Temporal server with OpenAI Agents SDK plugin
    client = await Client.connect(
        "localhost:7233",  # Temporal server address
        # Register plugins to extend Temporal's functionality
        plugins=[
            # Plugin that enables OpenAI Agents SDK integration with Temporal
            OpenAIAgentsPlugin(
                # Configure timeout settings for AI model inference activities
                model_params=ModelActivityParameters(
                    start_to_close_timeout=timedelta(seconds=30)  # Max time for LLM calls
                )
            )
        ],
    )

    # Create worker that polls the task queue for work
    worker = Worker(
        client,  # Use the connected Temporal client
        task_queue=TASK_QUEUE,  # Which queue to poll for tasks
        workflows=[WeatherAgentWorkflow],  # List of workflows this worker can execute
        activities=[get_weather],  # List of activities this worker can execute
    )
    
    print(f"✅ Worker started on task queue: {TASK_QUEUE}")
    print("   Listening for workflow and activity tasks...")
    # Start polling and executing tasks (blocks until stopped)
    await worker.run()

# Apply nest_asyncio to allow nested event loops in Jupyter
import nest_asyncio
nest_asyncio.apply()
# Create background task to run worker without blocking the notebook
worker_task = asyncio.create_task(run_worker())
print("🔄 Worker running in background")

## Component 4: `starter.py`

Execute the workflow and display results.

**Key points:**
- Uses `start_workflow` (not `execute_workflow`) to get a handle right away, so the workflow ID and UI link can be printed before waiting for the result
- Workflow ID uses EST timestamp pattern
- Single query parameter (no trace_id)
- Waits for result with `handle.result()`
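
The timestamp pattern used for the workflow ID formats the hour with a 12-hour clock (`%I`), leaves out AM/PM and the year, and always appends `est`. Two runs exactly twelve hours apart would therefore produce the same ID. The sketch below shows one possible variation using a 24-hour clock and the numeric UTC offset. `make_workflow_id` is a hypothetical helper, and the fixed datetime is only there to make the output reproducible.

```python
from datetime import datetime, timedelta, timezone


def make_workflow_id(prefix: str, now: datetime) -> str:
    """Build an ID from a timezone-aware datetime using a 24-hour clock."""
    return f"{prefix}-{now.strftime('%Y%m%d-%H%M%S%z')}"


# Fixed example time with a UTC-4 offset
fixed = datetime(2025, 10, 22, 8, 9, 57, tzinfo=timezone(timedelta(hours=-4)))
print(make_workflow_id("weather", fixed))  # weather-20251022-080957-0400
```

For live use, a call such as `make_workflow_id("weather", datetime.now(timezone.utc))` would work the same way.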

### 🎯 Demonstrating Durability

To see Temporal's automatic retry in action:

1. **Run the cell below** to start the workflow
2. **While it's executing, disconnect your network** (WiFi off or disable network adapter)
3. **Watch the logs** - You'll see the activity fail and Temporal retry
4. **Reconnect your network**
5. **Observe** Temporal automatically complete the workflow on the next retry!

In [None]:
async def run_solution():  # Define async function to execute the workflow
    """Execute the weather agent workflow."""
    # Define the user query to send to the agent
    query = "What weather alerts are active in CA?"
    
    # Generate workflow ID with EST timestamp for human-readable tracking
    est = pytz.timezone('US/Eastern')  # Create EST timezone object
    now = datetime.now(est)  # Get current time in EST
    # Format timestamp as readable string with day-month-date-time pattern
    workflow_id = f"weather-{now.strftime('%a-%b-%d-%I%M%S').lower()}est"
    
    # Connect to Temporal server with OpenAI Agents SDK plugin
    client = await Client.connect(
        "localhost:7233",  # Temporal server address
        plugins=[OpenAIAgentsPlugin()]  # Enable OpenAI Agents integration
    )
    
    print(f"🚀 Starting workflow: {workflow_id}")
    print(f"📝 Query: {query}\n")
    
    # Start the workflow (non-blocking) and get handle for tracking
    handle = await client.start_workflow(
        WeatherAgentWorkflow.run,  # Workflow method to execute
        query,  # User query parameter passed to workflow
        id=workflow_id,  # Unique workflow ID for tracking in Temporal UI
        task_queue=TASK_QUEUE  # Queue where worker will pick up this workflow
    )
    
    print(f"✅ Workflow started: {handle.id}")
    # Print Temporal UI link for observing workflow execution
    print(f"🔗 View in Temporal UI: http://localhost:8233/namespaces/default/workflows/{workflow_id}\n")
    print("⏳ Waiting for agent response...\n")
    
    # Wait for workflow to complete and get result (blocking)
    result = await handle.result()
    
    print("=" * 60)
    print("🤖 Agent Response:")
    print("=" * 60)
    print(result)
    print("=" * 60)

# Run the solution with Jupyter-specific async handling
try:
    # Try to get existing event loop (Jupyter has one running)
    loop = asyncio.get_running_loop()
    # Execute in existing loop
    await run_solution()
except RuntimeError:
    # If no loop exists, create new one and run
    asyncio.run(run_solution())

## Simulating Failure and Recovery

Let's experience failure and recovery firsthand. We'll add a new weather validation activity that intentionally fails to demonstrate:

* How Activities automatically retry on failure
* How Temporal preserves state across Worker restarts
* How you can fix bugs without losing progress

This mirrors real-world scenarios like API outages, network failures, or temporary service unavailability.

### Step 1: Create a New Activity with an Intentional Error

We'll create a `get_weather_err` activity that simulates a weather API validation check that's temporarily failing. This could represent:
- A validation service that's down
- A rate-limited API endpoint
- A network partition

Run the code below to add this Activity:

In [None]:
from temporalio import activity
from temporalio.exceptions import ApplicationError

@activity.defn(name="get_weather_err")
async def get_weather_err(state: str) -> dict:
    """Simulate a failing weather API validation check."""
    # Simulate a temporary service failure
    raise ApplicationError(
        "Simulated failure: Weather validation service temporarily unavailable"
    )

print("✅ Activity 'get_weather_err' created with intentional error!")

### Step 2: Update the Workflow to Call the Validation Activity

Now we'll modify our Workflow to:
1. **Validate the state input (new - this will fail!)**
2. Create the Agent with tools
3. Run the agent to get weather alerts

Run the code below:

In [None]:
from temporalio import workflow

@workflow.defn(sandboxed=False)
class ErrorWorkflow:
    @workflow.run
    async def run(self, user_query: str) -> str:
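        # Extract state code from query for validation (simple parsing)
        state = "NY"  # Fallback when no 2-letter code is found
        for word in user_query.split():
            token = word.strip(".,!?;:")  # Drop punctuation, e.g. "CA?" -> "CA"
            if len(token) == 2 and token.isupper():
                state = token
                break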
        
        # Step 1: Validate state input (NEW - will fail initially!)
        await workflow.execute_activity(
            get_weather_err,
            state,
            start_to_close_timeout=timedelta(seconds=10),
        )
        
        workflow.logger.info("State validation successful!")
        
        # Step 2: Create and run the agent
        agent = Agent(
            name="Weather Assistant",
            instructions=(
                "You are a helpful assistant that explains current weather alerts for U.S. states. "
                "When the user asks about alerts, call the get_weather tool, then summarize results."
            ),
            tools=[
                openai_agents.workflow.activity_as_tool(
                    get_weather,
                    start_to_close_timeout=timedelta(seconds=10),
                )
            ],
        )
        
        result = await Runner().run(agent, user_query)
        
        return getattr(result, "final_output", str(result))

print("✅ Workflow updated with validation step!")
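
A regular expression is one alternative way to pull the state code out of the query, since word boundaries ignore adjacent punctuation such as the `?` in `CA?`. The sketch below is a standalone illustration. `extract_state` is a hypothetical helper, and the `NY` default simply mirrors the fallback used in the workflow.

```python
import re


def extract_state(query: str, default: str = "NY") -> str:
    """Return the first standalone pair of uppercase letters, or the default."""
    match = re.search(r"\b[A-Z]{2}\b", query)
    return match.group(0) if match else default


print(extract_state("What weather alerts are active in CA?"))
print(extract_state("Any alerts today?"))
```

This is still simple parsing: any two-letter uppercase word (such as `US` or `OK`) would be treated as a state code, so it is not a substitute for real validation.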

### Step 3: Register the New Activity with the Worker

We need to tell the Worker about our new `get_weather_err` Activity:

In [None]:
from temporalio.client import Client
from temporalio.worker import Worker

async def run_worker():
    """Start a Temporal worker that listens for workflow and activity tasks."""
    client = await Client.connect(
        "localhost:7233",
        plugins=[
            OpenAIAgentsPlugin(
                model_params=ModelActivityParameters(
                    start_to_close_timeout=timedelta(seconds=30)
                )
            )
        ],
    )

    worker = Worker(
        client,
        task_queue=TASK_QUEUE,
        workflows=[ErrorWorkflow],
        activities=[get_weather, get_weather_err],  # Added get_weather_err
    )
    
    print(f"✅ Worker started on task queue: {TASK_QUEUE}")
    print("   Listening for workflow and activity tasks...")
    await worker.run()

print("✅ Worker function redefined with ErrorWorkflow and get_weather_err activity")

In [None]:
# Kill the old worker and start the new one
try:
    worker_task.cancel()
    try:
        await worker_task
    except asyncio.CancelledError:
        pass
except NameError:
    print("ℹ️  No existing worker to cancel")

worker_task = asyncio.create_task(run_worker())
print("🔄 New worker started with get_weather_err activity!")
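
The cancel-then-await pattern used to swap workers can be seen in isolation with plain `asyncio`. In the sketch below, `fake_worker` is a hypothetical stand-in for `worker.run()` that blocks until it is cancelled. Calling `cancel()` only requests cancellation, so the task is awaited afterwards to be sure its cleanup has finished before the replacement starts.

```python
import asyncio


async def fake_worker(name: str, log: list) -> None:
    """Stand-in for worker.run(): blocks until the task is cancelled."""
    log.append(f"{name} started")
    try:
        await asyncio.Event().wait()  # never set, so this waits indefinitely
    finally:
        log.append(f"{name} stopped")


async def stop_task(task: asyncio.Task) -> None:
    """Request cancellation, then wait until the task has actually finished."""
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass


async def demo() -> list:
    log = []
    task = asyncio.create_task(fake_worker("worker-v1", log))
    await asyncio.sleep(0)  # yield once so the task can start
    await stop_task(task)
    task = asyncio.create_task(fake_worker("worker-v2", log))
    await asyncio.sleep(0)
    await stop_task(task)
    return log


log = asyncio.run(demo())
print(log)
```

In a notebook cell, where an event loop is already running, `await demo()` would take the place of `asyncio.run(demo())`.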


### Step 4: Start a New Workflow Execution

Let's start a new Workflow that will call our failing Activity:

In [15]:
async def start_failing_workflow():
    """Start a workflow that will initially fail."""
    query = "What weather alerts are active in CA?"
    
    est = pytz.timezone('US/Eastern')
    now = datetime.now(est)
    workflow_id = f"weather-failing-{now.strftime('%a-%b-%d-%I%M%S').lower()}est"
    
    client = await Client.connect(
        "localhost:7233",
        plugins=[OpenAIAgentsPlugin()]
    )
    
    print(f"🚀 Starting workflow: {workflow_id}")
    print(f"📝 Query: {query}\n")
    
    handle = await client.start_workflow(
        ErrorWorkflow.run,
        query,
        id=workflow_id,
        task_queue=TASK_QUEUE
    )
    
    print(f"✅ Workflow started: {handle.id}")
    print(f"🔗 View in Temporal UI: http://localhost:8233/namespaces/default/workflows/{workflow_id}\n")
    print("⚠️  The workflow is now running and will retry the failing activity automatically!")
    print("   Go to the Temporal UI to watch the retries in action!")

# Run the workflow
try:
    loop = asyncio.get_running_loop()
    await start_failing_workflow()
except RuntimeError:
    asyncio.run(start_failing_workflow())

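**Note:** If this cell stops with `NameError: name 'TASK_QUEUE' is not defined`, the kernel no longer has the earlier definitions (for example, after a restart). Re-run the setup and component cells above, including the one that defines `TASK_QUEUE`, then run this cell again.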

### Step 5: Observe Automatic Retries in the Web UI

**Go to your Temporal Web UI now!**

You should see:
1. Your Workflow is **Running** (not Failed!)
2. The `get_weather_err` Activity shows a **Pending Activity** with retry attempts
3. Click on the Pending Activity to see:
   - The error message: "Simulated failure: Weather validation service temporarily unavailable"
   - The current retry attempt number
   - The countdown until the next retry

**Key insight:** Temporal is automatically retrying the failed activity. The workflow hasn't failed - it's patiently waiting and retrying with exponential backoff.
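
To get a feel for the countdown shown in the UI, the sketch below computes an exponential backoff schedule in plain Python. The parameter values are an assumption: they follow Temporal's documented default retry policy (1 second initial interval, backoff coefficient of 2.0, maximum interval of 100 seconds, unlimited attempts), which applies here because the workflow does not pass a retry policy of its own. `retry_delays` is a hypothetical helper, not an SDK function.

```python
def retry_delays(attempts: int, initial: float = 1.0,
                 coefficient: float = 2.0, maximum: float = 100.0) -> list:
    """Seconds to wait before each of the first `attempts` retries."""
    return [min(initial * coefficient ** n, maximum) for n in range(attempts)]


print(retry_delays(8))  # [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 64.0, 100.0]
```

Once the delay reaches the maximum interval it stays there, so a long outage results in a steady retry about every 100 seconds rather than ever-growing waits.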

### Step 6: Fix the Error

Now let's "fix" our simulated failure by creating a working version of the activity. In a real scenario, this could be:
- A service coming back online
- An API endpoint being fixed
- A network issue being resolved

Run the code below to create the fixed activity:

In [None]:
@activity.defn(name="get_weather_err")
async def get_weather_err(state: str) -> dict:
    """Weather API validation check - now working!"""
    # Error is now fixed! Service is back online.
    return {"validated": True, "state": state.upper()}

print("✅ Activity fixed! Error removed.")

### Step 7: Restart the Worker with Fixed Code

Now restart the Worker so it picks up the fixed Activity code:

In [None]:
# Kill the old worker and start the new one with fixed code
try:
    worker_task.cancel()
    try:
        await worker_task
    except asyncio.CancelledError:
        pass
except NameError:
    print("ℹ️  No existing worker to cancel")

worker_task = asyncio.create_task(run_worker())
print("🔄 Worker restarted with fixed activity!")

### Step 8: Observe Successful Completion

**Refresh your Web UI and observe:**

1. The `get_weather_err` Activity now completes successfully! ✅
2. The workflow continues and the agent runs with the `get_weather` tool
3. The entire Workflow shows **Completed** status

**What just happened?**
- Your Workflow **preserved all state** through the failure
- When you fixed the bug, Temporal **automatically continued** from where it left off
- No manual intervention needed - just fix the code and restart the Worker
- The agent execution happened seamlessly after the validation passed

**This is the power of durable execution!** In production, this means:
- API outages don't lose your progress
- You can deploy bug fixes without restarting workflows
- Your users never lose work
- Completed operations (like expensive LLM calls) are not re-executed after a failure or Worker restart; their recorded results are reused
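
The idea of reusing recorded results can be sketched with a toy model. This is not how Temporal is implemented; `ToyHistory` and `toy_workflow` are hypothetical names, and the model re-runs the whole function on each attempt purely to show that a step with a recorded result is skipped while a failed step is tried again.

```python
from collections import Counter


class ToyHistory:
    """Toy stand-in for a workflow's recorded step results."""

    def __init__(self) -> None:
        self.results = {}
        self.executions = Counter()

    def run_step(self, step_id: str, fn):
        if step_id in self.results:
            return self.results[step_id]  # reuse the recorded result
        self.executions[step_id] += 1
        self.results[step_id] = fn()  # nothing is recorded if fn raises
        return self.results[step_id]


def toy_workflow(history: ToyHistory, service_up: bool) -> str:
    def flaky_step() -> str:
        if not service_up:
            raise RuntimeError("service unavailable")
        return "validated"

    summary = history.run_step("expensive_call", lambda: "2 alerts active")
    status = history.run_step("flaky_step", flaky_step)
    return f"{status}: {summary}"


history = ToyHistory()
try:
    toy_workflow(history, service_up=False)
except RuntimeError as err:
    print(f"first run failed: {err}")

print(toy_workflow(history, service_up=True))
print(dict(history.executions))
```

After both runs, the expensive step has executed once and the flaky step twice, which is the behavior the bullet points above describe.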