# Exercise 3: Durable Agent

**Goal:** Build a durable weather agent using **Temporal + OpenAI Agents SDK** that automatically retries on failure.

**Timebox:** 15 minutes

## What You'll Learn

- How to structure a Temporal + OpenAI Agents application with 4 components
- How to convert Temporal activities into Agent tools using `activity_as_tool()`
- How to test durability by simulating network failures
- How Temporal makes agents production-ready **without changing agent code**


> **💡 Tip:** If you get stuck, check the solution notebook in the `solutions/` directory. Each exercise has a corresponding complete solution that you can reference.

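## Prerequisites

Before starting, ensure you have:

1. **Temporal Server Running:**
   ```bash
   make temporal-up
   # Or manually: temporal server start-dev
   ```
   Verify at: http://localhost:8233

2. **OpenAI API Key:** Add `OPENAI_API_KEY=sk-your-key-here` to the `.env` file in the project root.

### Flow Diagram 🔄

```
User Query 👤
    ↓
Temporal Workflow (orchestration layer) 🎭
    ↓
Activity: Call LLM with tools 🤖
    ↓
[Agent decides: need weather data]
    ↓
Activity: Execute get_weather tool 🔧
    ↓
External API Call (NWS) 🌐
    ↓
Activity: Get final LLM response 💬
    ↓
Return to user ✅
```

**Each activity can retry independently, and the entire flow is durable!** 💪

### Component Structure 📦

The exercise is organized into four components, each covered in its own section:

1. `activities.py` - Weather activity that fetches NWS alerts
2. `workflow.py` - Workflow that orchestrates the agent
3. `worker.py` - Worker that polls for and executes workflow and activity tasks
4. `starter.py` - Starts the workflow and displays the result
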
## Setup

Before doing the exercise, you need to:

- Install necessary dependencies
- Create your `.env` file and supply your API key
- Load the environment variables
- Download and start a local Temporal Service

## Installation

Install dependencies and import required libraries.

In [None]:
%pip install --quiet temporalio openai-agents httpx nest-asyncio pytz

# Import all required modules
import asyncio
import httpx
import nest_asyncio
from datetime import timedelta, datetime
import pytz
from temporalio import activity, workflow
from temporalio.client import Client
from temporalio.worker import Worker
from temporalio.contrib import openai_agents
from temporalio.contrib.openai_agents import OpenAIAgentsPlugin, ModelActivityParameters
from temporalio.worker.workflow_sandbox import SandboxedWorkflowRunner, SandboxRestrictions
from agents import Agent, Runner

print("✅ All imports successful")

## Key Concepts: Temporal Durability

### What is Temporal?

**Temporal** is a workflow orchestration platform that provides durable execution for your applications:

- **Automatic Retries:** Failed operations retry with exponential backoff
- **State Persistence:** Workflow state survives crashes and restarts  
- **Execution History:** Full audit trail visible in Temporal UI
- **Replay-Safe:** After a failure, a workflow resumes from its last recorded step by replaying its event history
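
To make the exponential backoff concrete, here is a minimal sketch that computes the wait before each retry. The default values assume what we understand to be Temporal's default retry policy (1-second initial interval, backoff coefficient of 2.0, maximum interval of 100 seconds); `retry_delays` is a hypothetical helper written for illustration and is not part of the Temporal SDK.

```python
from datetime import timedelta


def retry_delays(
    attempts: int,
    initial_interval: timedelta = timedelta(seconds=1),
    backoff_coefficient: float = 2.0,
    maximum_interval: timedelta = timedelta(seconds=100),
) -> list[timedelta]:
    """Return the wait before each of the first `attempts` retries."""
    delays = []
    for n in range(attempts):
        # Each retry waits backoff_coefficient times longer than the previous one
        delay = initial_interval * (backoff_coefficient ** n)
        # The wait is capped so it never grows without bound
        delays.append(min(delay, maximum_interval))
    return delays


print([d.total_seconds() for d in retry_delays(8)])
```

The waits double from 1 second until they reach the 100-second cap, so a short outage is retried quickly while a long one does not flood the failing service.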

### How Agents + Temporal Work Together

This exercise combines **OpenAI Agents SDK** (from Exercise 1) with **Temporal**:

1. **Agent defines behavior** - Same agent code from Exercise 1
2. **Workflow wraps execution** - The agent runs inside a Temporal workflow; its LLM calls and tool calls run as activities
3. **Temporal handles failures** - Automatic retries on network errors, API failures, etc.
4. **No agent code changes** - Durability added by wrapping, not modifying!

### The `activity_as_tool()` Pattern

The key integration point is `activity_as_tool()`:

```python
tools=[
    openai_agents.workflow.activity_as_tool(
        get_weather,  # Your Temporal activity
        start_to_close_timeout=timedelta(seconds=10)
    )
]
```

This converts a Temporal activity into an Agent tool, giving you:
- ✅ Automatic retries if the activity fails
- ✅ State persistence across crashes
- ✅ Full execution history in Temporal UI
- ✅ Same agent interface as Exercise 1

### Why This Matters for Production

Without Temporal (Exercise 1):
- ❌ Network failure = user sees error
- ❌ Process crash = lost work
- ❌ API timeout = no retry
- ❌ No audit trail

With Temporal (Exercise 3):
- ✅ Network failure = automatic retry
- ✅ Process crash = resumes from checkpoint
- ✅ API timeout = configurable retries
- ✅ Full execution history in UI

**The agent code is identical** - Temporal adds durability as a wrapper!

## Component 1: `activities.py`

Define the weather activity that fetches alerts from the National Weather Service API.

**TODO:** Implement the `get_weather` activity:
- Use `@activity.defn(name="get_weather")` decorator
- Accept `state: str` parameter and return `dict`
- Make async HTTP call to `https://api.weather.gov/alerts/active/area/{state}`
- Parse JSON and extract first 5 alerts
- Return dict with `state`, `count`, and `alerts` list
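
To see the shape the activity should return without touching the network, the parsing step can be sketched on its own. This is a minimal sketch, not the exercise solution: `summarize_alerts` is a hypothetical helper, and `sample` is a hand-written payload that only mimics the `features`/`properties` layout used in this exercise.

```python
def summarize_alerts(state: str, data: dict, limit: int = 5) -> dict:
    """Reduce an NWS-style response to the fields the agent needs."""
    alerts = []
    # `features` may be missing or None, so fall back to an empty list
    for feature in (data.get("features") or [])[:limit]:
        props = feature.get("properties", {})
        alerts.append({
            "event": props.get("event"),
            "headline": props.get("headline"),
            "severity": props.get("severity"),
            "area": props.get("areaDesc"),
        })
    return {"state": state.upper(), "count": len(alerts), "alerts": alerts}


# Hand-written sample payload (not real NWS data)
sample = {
    "features": [
        {"properties": {"event": "Wind Advisory",
                        "headline": "Wind Advisory in effect",
                        "severity": "Moderate",
                        "areaDesc": "Example County"}},
    ]
}
print(summarize_alerts("ca", sample))
```

Note that `count` reflects the alerts returned (at most `limit`), not the total number of active alerts.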

In [None]:
# TODO: Implement the get_weather activity
@activity.defn(name="get_weather")  # Register this function as a Temporal activity
async def get_weather(state: str) -> dict:  # Accept 2-letter state code, return structured data
    """Fetch active NWS alerts for a 2-letter US state code (e.g., 'CA')."""
    # TODO: Set User-Agent header required by National Weather Service API
    headers = {"User-Agent": ""}  # HINT: Use "Temporal-Agents-Workshop/1.0 (educational@example.com)"
    # TODO: Create async HTTP client with 10-second timeout to prevent hanging
    async with httpx.AsyncClient(timeout=10) as client:
        # TODO: Make GET request to NWS alerts endpoint for the specified state
        r = await client.get(
            f"https://api.weather.gov/alerts/active/area/{state}",
            headers=headers
        )
        # TODO: Raise exception if request fails (4xx or 5xx) - Temporal will auto-retry
        r.raise_for_status()
        # TODO: Parse JSON response into Python dictionary
        data = r.json()

    # TODO: Initialize empty list to collect alert information
    alerts = []
    # TODO: Loop through first 5 features (weather alerts) in the response
    for f in (data.get("features") or [])[:5]:
        # TODO: Extract properties object from each feature (contains alert details)
        p = f.get("properties", {})
        # TODO: Build structured alert dictionary with key information
        alerts.append({
            "event": p.get("event"),  # Alert type (e.g., "Flash Flood Warning")
            "headline": p.get("headline"),  # Human-readable alert headline
            "severity": p.get("severity"),  # Severity level (e.g., "Severe", "Moderate")
            "area": p.get("areaDesc"),  # Geographic area affected by alert
        })

    # TODO: Return structured response with state, count, and alerts for LLM to interpret
    return {"state": state.upper(), "count": len(alerts), "alerts": alerts}

print("✅ Activity 'get_weather' defined")

## Component 2: `workflow.py`

Define the workflow that orchestrates the agent execution.

**TODO:** Implement the workflow:
- Define `TASK_QUEUE = "agents-sdk-queue"`
- Create `WeatherAgentWorkflow` class with `@workflow.defn(sandboxed=False)`
- Single parameter: `user_query: str` (no trace_id)
- Create Agent inside workflow with `activity_as_tool()`
- Use `Runner().run()` (instantiate Runner)
- Return `getattr(result, "final_output", str(result))`
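
The `getattr` fallback in the last step can be tried in isolation. The sketch below assumes nothing about the real result type: `FakeRunResult` is a hypothetical stand-in for whatever the runner returns, and `extract_output` is an illustrative helper.

```python
from dataclasses import dataclass


@dataclass
class FakeRunResult:
    """Hypothetical stand-in for the object returned by the runner."""
    final_output: str


def extract_output(result: object) -> str:
    # Prefer the `final_output` attribute; otherwise fall back to str(result)
    return getattr(result, "final_output", str(result))


print(extract_output(FakeRunResult(final_output="No active alerts.")))
print(extract_output("plain string result"))
```

The fallback keeps the workflow's return type a plain string even if the result object does not expose `final_output`.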

In [None]:
# TODO: Define which task queue this workflow will use for communication
TASK_QUEUE = "agents-sdk-queue"

# TODO: Disable sandbox for Jupyter compatibility using @workflow.defn(sandboxed=False)
@workflow.defn(sandboxed=False)  # Disable sandbox for Jupyter compatibility
class WeatherAgentWorkflow:  # Define workflow class for orchestrating the agent
    @workflow.run  # Mark this method as the workflow entry point
    async def run(self, user_query: str) -> str:  # Accept user query, return agent response
        # TODO: Create the Agent inside the workflow with tools and instructions
        agent = Agent(
            name="Weather Assistant",  # Name shown in OpenAI traces
            # TODO: Define agent's role and behavior when handling user queries
            instructions=(
                ""  # HINT: Tell the agent to explain weather alerts and use the get_weather tool
            ),
            # TODO: Provide list of tools the agent can use
            tools=[
                # TODO: Convert Temporal activity to Agent tool for durable execution
                openai_agents.workflow.activity_as_tool(
                    get_weather,  # The activity function to wrap
                    start_to_close_timeout=timedelta(seconds=10),  # Max time for activity execution
                )
            ],
        )
        
        # TODO: Run the agent with user query - tool calls execute as Temporal activities
        result = await Runner().run(agent, user_query)
        
        # TODO: Safely extract final output from result object (handles different result types)
        return getattr(result, "final_output", str(result))

print(f"✅ Workflow 'WeatherAgentWorkflow' defined (task queue: {TASK_QUEUE})")

## Comparison with Exercise 1

This exercise uses the **same weather agent** from Exercise 1, but wraps it with Temporal for durability:

| Aspect | Exercise 1: Agent Hello World | Exercise 3: Durable Agent |
|--------|-------------------------------|---------------------------|
| **Agent Code** | ✅ Weather Agent | ✅ **SAME** Weather Agent |
| **Framework** | Agents SDK only | Agents SDK + Temporal |
| **Retries** | None | Automatic (exponential backoff) |
| **State** | Ephemeral (lost on crash) | Persisted (survives crashes) |
| **Network Failure** | ❌ Immediate failure | ✅ Auto-retry and recover |
| **Observability** | Console logs only | Full history in Temporal UI |
| **Production Ready** | No | Yes ✅ |

### The Key Insight

**The agent itself doesn't change!** 

In Exercise 1, you created an agent with a weather tool:
```python
agent = Agent(
    name="Weather Agent",
    tools=[get_weather_alerts]  # Function tool
)
```

In Exercise 3, you create the **exact same agent**, but the tool is now a Temporal activity:
```python
agent = Agent(
    name="Weather Agent",
    tools=[activity_as_tool(get_weather)]  # Temporal activity as tool
)
```

Temporal adds durability by **wrapping the agent execution**, not by modifying the agent code!

## Component 3: `worker.py`

Run the worker that polls for and executes workflow/activity tasks.

**TODO:** Implement the worker:
- Connect to Temporal with `OpenAIAgentsPlugin` and `ModelActivityParameters`
- Create Worker with workflows and activities lists
- No sandbox configuration needed (using `sandboxed=False` on workflow)
- Run worker in background using `asyncio.create_task()`
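
The background-task step relies on plain `asyncio` behaviour, which can be sketched without Temporal. Here `fake_worker` is a hypothetical stand-in for `worker.run()`; it simply waits until it is told to stop.

```python
import asyncio


async def fake_worker(stop: asyncio.Event, log: list[str]) -> None:
    """Hypothetical stand-in for `worker.run()`: runs until asked to stop."""
    log.append("worker started")
    await stop.wait()
    log.append("worker stopped")


async def main() -> list[str]:
    log: list[str] = []
    stop = asyncio.Event()
    # Schedule the worker without awaiting it, so this coroutine keeps going
    task = asyncio.create_task(fake_worker(stop, log))
    await asyncio.sleep(0)  # yield once so the worker gets a chance to start
    log.append("starter continues while worker runs")
    stop.set()
    await task  # wait for the worker to finish cleanly
    return log


print(asyncio.run(main()))
```

In a notebook, where an event loop is already running, use `await main()` instead of `asyncio.run(main())`. Keeping a reference to the task (as `worker_task` does in this exercise) prevents it from being garbage-collected while it runs.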

In [None]:
# TODO: Define async function to start and run the worker
async def run_worker():  # Define async function to start and run the worker
    """Start a Temporal worker that listens for workflow and activity tasks."""
    # TODO: Connect to local Temporal server with OpenAI Agents SDK plugin
    client = await Client.connect(
        "HELLO",  # Temporal server address
        # TODO: Register plugins to extend Temporal's functionality
        plugins=[
            # TODO: Plugin that enables OpenAI Agents SDK integration with Temporal
            OpenAIAgentsPlugin(
                # TODO: Configure timeout settings for AI model inference activities
                model_params=ModelActivityParameters(
                    MAYBE_SOME_TIMEOUT=timedelta(seconds=30)  # Max time for LLM calls
                )
            )
        ],
    )

    # TODO: Create worker that polls the task queue for work
    worker = Worker(
        client,  # Use the connected Temporal client
        task_queue=TASK_QUEUE,  # Which queue to poll for tasks
        workflows=[],  # List of workflows this worker can execute
        activities=[],  # List of activities this worker can execute
    )
    
    print(f"✅ Worker started on task queue: {TASK_QUEUE}")
    print("   Listening for workflow and activity tasks...")
    await worker.run()

# Allow async functions to be called from within other async contexts
# This is needed because Jupyter notebooks already run in an event loop
nest_asyncio.apply()
# Create a task that runs the worker concurrently with other notebook operations
worker_task = asyncio.create_task(run_worker())
print("🔄 Worker running in background")

## Component 4: `starter.py`

Execute the workflow and display results.

**TODO:** Implement the starter:
- Use `start_workflow` (not `execute_workflow`)
- Generate workflow ID with EST timestamp
- Single query parameter (no trace_id)
- Print workflow ID and Temporal UI link
- Wait for result with `await handle.result()`
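
The timestamp format used for the workflow ID is easy to check in isolation. This is a minimal sketch: `make_workflow_id` is a hypothetical helper that takes an already-localized `datetime`, so it needs no time zone library, and the fixed date is an arbitrary example.

```python
from datetime import datetime


def make_workflow_id(now: datetime, hour_format: str = "%I") -> str:
    """Build an ID in the style of this exercise from an already-localized time."""
    stamp = now.strftime(f"%a-%b-%d-{hour_format}%M%S").lower()
    return f"weather-{stamp}est"


fixed = datetime(2025, 10, 27, 14, 30, 45)  # arbitrary example time
print(make_workflow_id(fixed))        # 12-hour clock (%I)
print(make_workflow_id(fixed, "%H"))  # 24-hour clock (%H)
```

With `%I`, 2:30:45 AM and 2:30:45 PM produce the same ID, so `%H` may be the safer choice if IDs need to be unique within a day.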

### 🎯 Test Durability (Optional)

After running the cell below:
1. **While executing, disconnect your network** (WiFi off)
2. **Watch** the activity fail and Temporal retry
3. **Reconnect** your network
4. **Observe** automatic completion!

In [None]:
# TODO: Define async function to execute the workflow
async def run_exercise():  # Define async function to execute the workflow
    """Execute the weather agent workflow."""
    # TODO: Define the user query to send to the agent
    query = "What weather alerts are active in CA?"
    
    # TODO: Generate workflow ID with EST timestamp for human-readable tracking
    est = pytz.timezone('US/Eastern')  # US Eastern time zone (EST or EDT, depending on the date)
    now = datetime.now(est)  # Get current time in US Eastern
    # TODO: Format timestamp as readable string with day-month-date-time pattern
    workflow_id = f"weather-{now.strftime('%a-%b-%d-%I%M%S').lower()}est"
    
    # TODO: Connect to Temporal server with OpenAI Agents SDK plugin
    client = await Client.connect(
        "localhost:7233",  # Temporal server address
        plugins=[OpenAIAgentsPlugin()]  # Enable OpenAI Agents integration
    )
    
    print(f"🚀 Starting workflow: {workflow_id}")
    print(f"📝 Query: {query}\n")
    
    # TODO: Start the workflow (non-blocking) and get handle for tracking
    handle = await client.start_workflow(
        WeatherAgentWorkflow.run,  # Workflow method to execute
        query,  # User query parameter passed to workflow
        id=workflow_id,  # Unique workflow ID for tracking in Temporal UI
        task_queue=TASK_QUEUE  # Queue where worker will pick up this workflow
    )
    
    print(f"✅ Workflow started: {handle.id}")
    # TODO: Print Temporal UI link for observing workflow execution
    print(f"🔗 View in Temporal UI: http://localhost:8233/namespaces/default/workflows/{workflow_id}\n")
    print("⏳ Waiting for agent response...\n")
    
    # TODO: Wait for workflow to complete and get result (blocking)
    result = await handle.result()
    
    print("=" * 60)
    print("🤖 Agent Response:")
    print("=" * 60)
    print(result)
    print("=" * 60)

# Run the exercise. Jupyter already runs an event loop, so top-level `await` works here.
try:
    # Raises RuntimeError if no event loop is running
    loop = asyncio.get_running_loop()
    # Allow nested event-loop use, since Jupyter already has a loop
    nest_asyncio.apply()
    # Execute in the existing loop
    await run_exercise()
except RuntimeError:
    # Fallback when no loop is running.
    # Note: the top-level `await` above is only valid in Jupyter/IPython;
    # in a plain Python script, keep only the asyncio.run(...) call.
    asyncio.run(run_exercise())

## Summary

You've implemented a durable weather agent using the **4-component pattern**:

1. ✅ **activities.py** - Weather activity that fetches NWS alerts
2. ✅ **workflow.py** - Workflow orchestrating the agent
3. ✅ **worker.py** - Worker running in background
4. ✅ **starter.py** - Workflow execution

## Key Takeaways

- **Agent tool calls are durable** - Temporal automatically retries failed activities
- **Clean separation of concerns** - Each component has a specific responsibility
- **Production-ready pattern** - This structure scales to real applications
- **No agent code changes** - Durability added by wrapping, not modifying!

## What You've Learned

- ✅ How to run an agent inside a Temporal workflow, with its tools as activities, for durability
- ✅ How automatic retries work without changing agent code
- ✅ How to structure production Temporal applications (4-component pattern)
- ✅ How to test durability by simulating network failures
- ✅ How to observe execution history in Temporal UI

## Next Steps

**Continue your learning journey:**

- **[Exercise 4: Agent Routing](../04_agent_routing/)** - Build multi-agent systems with intelligent routing patterns

**Want to dive deeper?**

- Check the [solution notebook](../../solutions/03_durable_agent/solution.ipynb) for the complete implementation
- Explore the Temporal UI at http://localhost:8233 to see execution history
- Try the durability test: disconnect network during execution and watch automatic recovery!

## Expected Output

When you complete the TODOs and run all cells, you should see:

### Normal Execution:

```
🚀 Starting workflow: weather-mon-oct-27-023045est
📝 Query: What weather alerts are active in CA?

✅ Workflow started: weather-mon-oct-27-023045est
🔗 View in Temporal UI: http://localhost:8233/namespaces/default/workflows/weather-mon-oct-27-023045est

⏳ Waiting for agent response...

============================================================
🤖 Agent Response:
============================================================
Currently, California has 2 active weather alerts:
- Wind Advisory (Moderate) in effect until midnight
- High Surf Advisory (Minor) along the coast until 9 PM
============================================================
```

### Testing Durability (Optional):

If you disconnect your network **while the workflow is executing**:

1. **Activity fails** - You'll see an error in the worker logs
2. **Temporal retries** - Automatically tries again after a delay
3. **Reconnect network** - The next retry succeeds
4. **Workflow completes** - You get the result as if nothing happened!

**Check the Temporal UI** to see:
- Retry attempts in the activity history
- Timeline showing when retries occurred
- Final successful completion

This demonstrates the power of durable execution!
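
The fail, retry, succeed sequence above can be imitated in a few lines of plain Python. This is only an in-process illustration: Temporal performs the retries from persisted state and with backoff delays, whereas this sketch retries immediately. `call_with_retries` and `make_flaky` are hypothetical helpers.

```python
def call_with_retries(operation, max_attempts: int = 5):
    """Call `operation` until it succeeds or attempts run out (no sleeping, for brevity)."""
    errors = []
    for attempt in range(1, max_attempts + 1):
        try:
            return operation(), attempt, errors
        except ConnectionError as exc:
            # Record the failure and try again, as a retry policy would
            errors.append(f"attempt {attempt}: {exc}")
    raise RuntimeError(f"gave up after {max_attempts} attempts: {errors}")


def make_flaky(failures: int):
    """Return a function that fails `failures` times, then succeeds."""
    state = {"calls": 0}

    def fetch():
        state["calls"] += 1
        if state["calls"] <= failures:
            raise ConnectionError("network unreachable")
        return {"state": "CA", "count": 0, "alerts": []}

    return fetch


result, attempts, errors = call_with_retries(make_flaky(failures=2))
print(attempts, errors)
```

The caller only sees the final result; the two failed attempts are visible in `errors`, much as retry attempts are visible in the activity history.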

## Troubleshooting

### Error: `Failed to connect to Temporal server`

**Fix:** Ensure Temporal is running:
```bash
make temporal-up
# Or: temporal server start-dev
```

Check server at: http://localhost:8233

### Error: `OPENAI_API_KEY is not set`

**Fix:** Add key to `.env` file in project root:
```
OPENAI_API_KEY=sk-your-key-here
```

Then reload the environment variables with `load_dotenv()` (from the `python-dotenv` package).
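
A quick way to confirm the key is visible to the notebook process is to check the environment directly. This is a minimal sketch using only the standard library; `require_api_key` is a hypothetical helper, and it deliberately never prints the key itself.

```python
import os
from collections.abc import Mapping


def require_api_key(env: Mapping[str, str] = os.environ) -> str:
    """Return the key, or raise with a hint if it is missing or empty."""
    key = env.get("OPENAI_API_KEY", "").strip()
    if not key:
        raise RuntimeError("OPENAI_API_KEY is not set; add it to .env and reload")
    return key
```

Calling `require_api_key()` before starting the worker surfaces a missing key immediately instead of on the first LLM call.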

### Worker not picking up workflows

**Possible causes:**
- Worker isn't running (check cell output for "Worker started")
- Task queue mismatch (ensure `TASK_QUEUE` matches in workflow and worker)
- Worker crashed (check for error messages in worker cell)

**Fix:** Re-run the worker cell to restart it

### Network disconnect doesn't cause retry

**For durability testing:**
- Disconnect network **during** the API call, not before workflow starts
- Watch Temporal UI for retry attempts
- Look for activity errors in the Timeline view

### No active alerts returned

**This is normal!** Many states have no alerts.

**To test:**
- Try different states: CA, TX, FL
- Check [weather.gov](https://www.weather.gov) for states with active alerts
- Winter months typically have more alerts