# Strands Agents Fundamentals - Part 2

### 1.f Realtime Streaming and handling of Agent Responses - Async Iterators

Strands Agents SDK provides support for **[asynchronous iterators](https://strandsagents.com/latest/user-guide/concepts/streaming/async-iterators/)** through the `stream_async` method, enabling real-time streaming of agent responses in asynchronous environments like web servers, APIs, and other async applications.

**Async Iterator event types -** 
- Text generation from the model
- Tool selection and execution
- Reasoning process
- Errors and completions

In [None]:
from strands import Agent
import asyncio
from strands_tools import calculator

async_iter_agent = Agent(
    tools=[calculator],
    callback_handler=None  # Disable default callback handler
)

# Async function that iterates over streamed agent events
async def process_streaming_response():
    query = "What is 25 * 48 and explain the calculation"

    # Get an async iterator for the agent's response stream
    agent_stream = async_iter_agent.stream_async(query)

    # Process events as they arrive
    async for event in agent_stream:
        # print(event)  # OPTIONAL:: uncomment this print statement to display detailed event trace
        if "data" in event:
            # Print text chunks as they're generated
            print(event["data"], end="", flush=True)
        elif "current_tool_use" in event and event["current_tool_use"].get("name"):
            # Print tool usage information
            print(f"\n[Tool use delta for: {event['current_tool_use']['name']}]")

## Run the agent with the async event processing ##
# Use the statement below to invoke the agent from a Jupyter notebook.
# If you are not running this in a Jupyter notebook, comment out the await statement and uncomment asyncio.run() instead.
await process_streaming_response()

# Uncomment the statement below when running from a Python file. asyncio.run() fails in a Jupyter notebook because an event loop is already running.
# asyncio.run(process_streaming_response()) 
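
The loop above prints events as they arrive. If you also need the complete text afterwards, you can accumulate the chunks while iterating. The sketch below shows that pattern with a hypothetical stand-in generator (`fake_event_stream`) instead of a real agent, so it runs without model access. The event keys `data` and `current_tool_use` mirror the ones used above; the helper names are illustrative assumptions, not part of the SDK.

```python
import asyncio
from typing import Any, AsyncIterator, Dict, List


async def fake_event_stream() -> AsyncIterator[Dict[str, Any]]:
    """Hypothetical stand-in for agent.stream_async(); yields events shaped like those above."""
    events = [
        {"current_tool_use": {"name": "calculator"}},
        {"current_tool_use": {"name": "calculator"}},  # repeated delta for the same tool
        {"data": "25 * 48 "},
        {"data": "= 1200"},
    ]
    for event in events:
        await asyncio.sleep(0)  # hand control back to the event loop between events
        yield event


async def collect_stream(stream: AsyncIterator[Dict[str, Any]]) -> Dict[str, Any]:
    """Gather text chunks and distinct tool names from an event stream."""
    chunks: List[str] = []
    tools: List[str] = []
    async for event in stream:
        if "data" in event:
            chunks.append(event["data"])
        elif "current_tool_use" in event and event["current_tool_use"].get("name"):
            name = event["current_tool_use"]["name"]
            if name not in tools:
                tools.append(name)
    return {"text": "".join(chunks), "tools": tools}


if __name__ == "__main__":
    # In a Jupyter notebook, use: result = await collect_stream(fake_event_stream())
    result = asyncio.run(collect_stream(fake_event_stream()))
    print(result)
```

With a real agent, you would pass `async_iter_agent.stream_async(query)` to `collect_stream` in place of the stand-in generator.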

### 1.g Realtime Streaming and handling of Agent Responses - Callback Handlers

**[Callback handlers](https://strandsagents.com/latest/user-guide/concepts/streaming/callback-handlers/)** are a powerful feature of the Strands Agents SDK that allow you to intercept and process events as they happen during agent execution. This enables real-time monitoring, custom output formatting, and integration with external systems.
Callback handlers are an alternative to async iterators.

**Callback handlers receive events in real-time as they occur during an agent's lifecycle:**
- Text generation from the model
- Tool selection and execution
- Reasoning process
- Errors and completions

---

#### An Example for Event Loop Lifecycle Tracking

This callback handler illustrates the event loop lifecycle events and how they relate to each other. It's useful for understanding the flow of execution in the Strands agent: 

**The output will show the sequence of events:**
- First the event loop initializes (`init_event_loop`)
- Then the cycle begins (`start_event_loop`)
- New cycles may start multiple times during execution (`start`)
- Text generation and tool usage events occur during the cycle
- Finally, the cycle completes (`complete`) or may be force-stopped


In [None]:
from strands import Agent
from strands_tools import calculator

def event_loop_tracker(**kwargs):
    # Track event loop lifecycle
    if kwargs.get("init_event_loop", False):
        print("🔄 Event loop initialized")
    elif kwargs.get("start_event_loop", False):
        print("▶️ Event loop cycle starting")
    elif kwargs.get("start", False):
        print("📝 New cycle started")
    elif "message" in kwargs:
        print(f"📬 New message created: {kwargs['message']['role']}")
    elif kwargs.get("complete", False):
        print("✅ Cycle completed")
    elif kwargs.get("force_stop", False):
        print(f"🛑 Event loop force-stopped: {kwargs.get('force_stop_reason', 'unknown reason')}")

    # Track tool usage
    if "current_tool_use" in kwargs and kwargs["current_tool_use"].get("name"):
        tool_name = kwargs["current_tool_use"]["name"]
        print(f"🔧 Using tool: {tool_name}")

    # Show only a snippet of text to keep output clean
    if "data" in kwargs:
        # Only show first 20 chars of each chunk for demo purposes
        data_snippet = kwargs["data"][:20] + ("..." if len(kwargs["data"]) > 20 else "")
        print(f"📟 Text: {data_snippet}")

# Create agent with event loop tracker
agent = Agent(
    tools=[calculator],
    callback_handler=event_loop_tracker
)

# This will show the full event lifecycle in the console
callback_handler_agent_response = agent("What is the capital of France and what is 42+7?")
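
A callback handler does not have to print. Because the handler is simply called with keyword arguments, any callable works, including an object that keeps state. The sketch below is a minimal example of that idea: a hypothetical `CollectingHandler` class that records text chunks and tool names. It is exercised here with simulated events so it runs without a model; the class name and attributes are assumptions made for illustration.

```python
from typing import Any, List


class CollectingHandler:
    """Callable that records streamed text and tool names instead of printing them."""

    def __init__(self) -> None:
        self.text_chunks: List[str] = []
        self.tools_used: List[str] = []

    def __call__(self, **kwargs: Any) -> None:
        # Text chunks arrive under the "data" key
        if "data" in kwargs:
            self.text_chunks.append(kwargs["data"])
        # Tool events arrive under "current_tool_use"; record each tool name once
        tool_use = kwargs.get("current_tool_use") or {}
        name = tool_use.get("name")
        if name and name not in self.tools_used:
            self.tools_used.append(name)

    @property
    def text(self) -> str:
        return "".join(self.text_chunks)


handler = CollectingHandler()

# Simulated events; with the SDK you would pass callback_handler=handler to Agent(...)
handler(data="Paris is the capital. ")
handler(current_tool_use={"name": "calculator"})
handler(current_tool_use={"name": "calculator"})  # repeated delta for the same tool
handler(data="42 + 7 = 49")

print(handler.text)
print(handler.tools_used)
```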

### **1.h Observability**

In the Strands Agents SDK, [observability](https://strandsagents.com/latest/user-guide/observability-evaluation/observability/) is the ability to measure system behavior and performance. It combines instrumentation, data collection, and analysis techniques to provide insight into an agent's behavior and performance.
Building observable agents starts with monitoring the right telemetry. While we leverage the same fundamental building blocks as traditional software — [traces](https://strandsagents.com/latest/user-guide/observability-evaluation/traces/), [metrics](https://strandsagents.com/latest/user-guide/observability-evaluation/metrics/), and [logs](https://strandsagents.com/latest/user-guide/observability-evaluation/logs/) — their application to agents requires special consideration. We need to capture not only standard application telemetry but also AI-specific signals like model interactions, reasoning steps, and tool usage.

### **[Metrics](https://strandsagents.com/latest/user-guide/observability-evaluation/metrics/)**

#### **Metrics for the agent invocation using non-default model (Metrics for Step 1.c)**

In [None]:
%store -r agent_response_with_nova_premier
agent_response_with_nova_premier.metrics.get_summary()

#### **Metrics for the agent invocation using callback handler (Metrics for Step 1.g)**

In [None]:
callback_handler_agent_response.metrics.get_summary()
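
The summary is a nested dictionary, which can be hard to scan in raw form. The sketch below flattens any nested dictionary into dotted keys for easier printing or logging. It does not depend on the SDK, and the sample dictionary is hypothetical: the real `get_summary()` output may use different keys and contain more fields.

```python
from typing import Any, Dict


def flatten_summary(summary: Dict[str, Any], prefix: str = "") -> Dict[str, Any]:
    """Flatten nested dictionaries into a single level with dotted key paths."""
    flat: Dict[str, Any] = {}
    for key, value in summary.items():
        path = f"{prefix}.{key}" if prefix else str(key)
        if isinstance(value, dict):
            flat.update(flatten_summary(value, path))
        else:
            flat[path] = value
    return flat


# Hypothetical sample data for illustration only
sample_summary = {
    "total_cycles": 2,
    "accumulated_usage": {"inputTokens": 120, "outputTokens": 45},
}

for name, value in flatten_summary(sample_summary).items():
    print(f"{name}: {value}")
```

In the notebook, you could call `flatten_summary(callback_handler_agent_response.metrics.get_summary())` instead of using the sample data.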

### **[Traces](https://strandsagents.com/latest/user-guide/observability-evaluation/traces/)**

Tracing is a fundamental component of the Strands SDK's observability framework, providing detailed insights into your agent's execution. Using the OpenTelemetry standard, Strands traces capture the complete journey of a request through your agent, including LLM interactions, retrievers, tool usage, and event loop processing.

Strands natively integrates with OpenTelemetry, an industry standard for distributed tracing.

----

#### **Trace Structure**
Strands creates a hierarchical trace structure that mirrors the execution of your agent: 
- **Agent Span:** The top-level span representing the entire agent invocation
  - Contains overall metrics like total token usage and cycle count
  - Captures the user prompt and final response
- **Cycle Spans:** Child spans for each event loop cycle
  - Tracks the progression of thought and reasoning
  - Shows the transformation from prompt to response
- **LLM Spans:** Model invocation spans
  - Contains prompt, completion, and token usage
  - Includes model-specific parameters
- **Tool Spans:** Tool execution spans
  - Captures tool name, parameters, and results
  - Measures tool execution time
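
To make the hierarchy concrete, the sketch below models the parent/child relationship with plain Python dataclasses and prints it as an indented tree. This is only an illustration of the nesting described above; the `Span` class and the span names are assumptions and do not reflect the actual span names or attributes that Strands emits.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class Span:
    """Illustrative span node: a name plus child spans."""
    name: str
    children: List["Span"] = field(default_factory=list)

    def add(self, child: "Span") -> "Span":
        self.children.append(child)
        return child


def render(span: Span, depth: int = 0) -> List[str]:
    """Return the span tree as indented lines, two spaces per level."""
    lines = ["  " * depth + span.name]
    for child in span.children:
        lines.extend(render(child, depth + 1))
    return lines


# One agent invocation with two event loop cycles
agent_span = Span("Agent")
cycle_1 = agent_span.add(Span("Cycle 1"))
cycle_1.add(Span("LLM"))                # model decides to call a tool
cycle_1.add(Span("Tool: calculator"))   # tool runs inside the same cycle
cycle_2 = agent_span.add(Span("Cycle 2"))
cycle_2.add(Span("LLM"))                # model produces the final answer

print("\n".join(render(agent_span)))
```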


The example below shows the detailed traces that can be sent to any OTEL-compatible tool for visualization and analysis.

In [None]:
# Use StrandsTelemetry to handle the OpenTelemetry setup
# (no tracer provider is passed in here, so StrandsTelemetry creates its own)
from strands.telemetry import StrandsTelemetry

strands_telemetry = StrandsTelemetry()
strands_telemetry.setup_otlp_exporter().setup_console_exporter()  # Chaining supported

# Create agent (tracing will be enabled automatically)
agent = Agent(
    model="us.anthropic.claude-3-7-sonnet-20250219-v1:0",
    system_prompt="You are a helpful AI assistant"
)

# Use agent normally
response = agent("What can you help me with?") 

In [None]:
## Enable this section if you have a Langfuse endpoint to receive the traces
#import os
#import base64

#os.environ["OTEL_EXPORTER_OTLP_ENDPOINT"] = ""
 
# Get keys for your project from the project settings page: https://cloud.langfuse.com
#os.environ["LANGFUSE_PUBLIC_KEY"] = "pk-lf-....."
#os.environ["LANGFUSE_SECRET_KEY"] = "sk-lf-....." 
#os.environ["LANGFUSE_HOST"] = "https://cloud.langfuse.com" # 🇪🇺 EU region (default)
# os.environ["LANGFUSE_HOST"] = "https://us.cloud.langfuse.com" # 🇺🇸 US region
 
# Build Basic Auth header.
#LANGFUSE_AUTH = base64.b64encode(
#    f"{os.environ.get('LANGFUSE_PUBLIC_KEY')}:{os.environ.get('LANGFUSE_SECRET_KEY')}".encode()
#).decode()
 
# Configure OpenTelemetry endpoint & headers
#os.environ["OTEL_EXPORTER_OTLP_ENDPOINT"] = os.environ.get("LANGFUSE_HOST") + "/api/public/otel/v1/traces"
#os.environ["OTEL_EXPORTER_OTLP_HEADERS"] = f"Authorization=Basic {LANGFUSE_AUTH}"

In [None]:
from strands.telemetry import StrandsTelemetry


strands_telemetry = StrandsTelemetry()
strands_telemetry.setup_console_exporter()   # Print traces to console

# Uncomment this statement if the OTLP endpoint is configured in the previous cell
#strands_telemetry.setup_otlp_exporter()      # Send traces to OTLP endpoint

# Create agent
agent = Agent(
    model="us.anthropic.claude-3-7-sonnet-20250219-v1:0",
    system_prompt="You are a helpful AI assistant",
    trace_attributes={
        "session.id": "test-session-1234", # Example session ID
        "user.id": "rs-ws@demo.com", # Example user ID
        "langfuse.tags": [
            "Agent-SDK-Example",
            "Strands-Project-Demo",
            "Observability-Tutorial"
        ]
    }
)

# Execute a series of interactions that will be traced
response = agent("Find me information about Mars. What is its atmosphere like?")
print(response)

# Ask a follow-up question (this agent has no tools configured, so the model answers directly)
response = agent("Calculate how long it would take to travel from Earth to Mars at 100,000 km/h")
print(response)

# Each interaction creates a complete trace that can be visualized in your tracing tool

### **1.i Agent State Management**

Strands [Agents state](https://strandsagents.com/latest/user-guide/concepts/agents/state-sessions/) is maintained in several forms:
- **Conversation History:** The sequence of messages between the user and the agent.
- **Agent State:** Stateful information outside of conversation context, maintained across multiple requests.
- **Request State:** Contextual information maintained within a single request.


#### **Conversation History**

Conversation history is the primary form of context in a Strands agent, directly accessible through the `agent.messages` property. 

**The example below shows how the conversation history is updated after each agent invocation.**

In [None]:
# Create an agent with initial messages
agent = Agent(messages=[
    {"role": "user", "content": [{"text": "Hello, my name is Strands!"}]},
    {"role": "assistant", "content": [{"text": "Hi there! How can I help you today?"}]}
])
# Access the conversation history
print ("Initial Conversation :: ", agent.messages)
print ("\n")

agent("Explain Agentic AI under 100 words?")
print ("\n")
print ("Next Iteration :: ", agent.messages)
print ("\n")

agent("Describe Amazon Bedrock under 100 words?")
print ("\n")
print("Final Iteration :: ", agent.messages)  # Shows all messages exchanged so far
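
Printing `agent.messages` directly produces a long list of dictionaries. The sketch below is a small, SDK-independent helper that renders a message list as one line per message. It assumes the message shape shown above (a `role` plus a list of content blocks, where text blocks carry a `text` key); the helper name `format_history` and the `width` parameter are illustrative.

```python
from typing import Any, Dict, List


def format_history(messages: List[Dict[str, Any]], width: int = 60) -> List[str]:
    """Render each message as 'N. role: text', truncating long text to `width` characters."""
    lines = []
    for index, message in enumerate(messages, start=1):
        # Keep only text blocks; other block types (such as tool calls) have no "text" key
        texts = [block["text"] for block in message.get("content", []) if "text" in block]
        text = " ".join(texts) if texts else "<non-text content>"
        if len(text) > width:
            text = text[:width] + "..."
        lines.append(f"{index}. {message['role']}: {text}")
    return lines


history = [
    {"role": "user", "content": [{"text": "Hello, my name is Strands!"}]},
    {"role": "assistant", "content": [{"text": "Hi there! How can I help you today?"}]},
]

print("\n".join(format_history(history)))
```

In the notebook, `format_history(agent.messages)` could replace the raw `print` calls to make each iteration easier to compare.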

### **Conversation Manager**

Strands uses a **[conversation manager](https://strandsagents.com/latest/user-guide/concepts/agents/context-management/)** to handle conversation history effectively. The default, used by the `Agent` class when no other manager is specified, is the [SlidingWindowConversationManager](https://strandsagents.com/latest/api-reference/agent/#strands.agent.conversation_manager.sliding_window_conversation_manager.SlidingWindowConversationManager), which keeps recent messages and removes older ones when needed.

Strands also supports [NullConversationManager](https://strandsagents.com/latest/api-reference/agent/#strands.agent.conversation_manager.null_conversation_manager.NullConversationManager), which is a simple implementation that does not modify the conversation history.

The third is [SummarizingConversationManager](https://strandsagents.com/latest/api-reference/agent/#strands.agent.conversation_manager.summarizing_conversation_manager.SummarizingConversationManager), which implements intelligent conversation context management by summarizing older messages instead of simply discarding them. This approach preserves important information while staying within context limits. By default, the `SummarizingConversationManager` uses the same model and configuration as the main agent to perform summarization.

Refer to the [context-management documentation](https://strandsagents.com/latest/user-guide/concepts/agents/context-management/) for more details on these three options. We will explore an example of `SummarizingConversationManager` in the next cell.


##### **Configuration parameters:**
- **`summary_ratio` (float, default: 0.3):** Fraction of messages to summarize when reducing context (clamped between 0.1 and 0.8)
- **`preserve_recent_messages` (int, default: 10):** Minimum number of recent messages to always keep
- **`summarization_agent` (Agent, optional):** Custom agent for generating summaries. If not provided, uses the main agent instance. Cannot be used together with `summarization_system_prompt`.
- **`summarization_system_prompt` (str, optional):** Custom system prompt for summarization. If not provided, uses a default prompt that creates structured bullet-point summaries focusing on key topics, tools used, and technical information in third-person format. Cannot be used together with `summarization_agent`.
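
To see how `summary_ratio` and `preserve_recent_messages` interact, the sketch below works through the arithmetic implied by the descriptions above. It is a rough approximation under stated assumptions, not the SDK's implementation: the function name is hypothetical, and the real manager may round differently or shift the cut-off point (for example, to keep related messages together).

```python
def messages_to_summarize(total: int, summary_ratio: float = 0.3, preserve_recent: int = 10) -> int:
    """Estimate how many of the oldest messages would be summarized."""
    # Clamp the ratio to the documented range of 0.1 to 0.8
    ratio = max(0.1, min(0.8, summary_ratio))
    # Number of messages the ratio asks for, rounded down
    candidate = int(total * ratio)
    # Messages that may be summarized without touching the most recent ones
    available = max(0, total - preserve_recent)
    return min(candidate, available)


# Settings similar to the next cell: summary_ratio=0.6, preserve_recent_messages=1
print(messages_to_summarize(8, summary_ratio=0.6, preserve_recent=1))

# With the defaults, a short conversation leaves nothing to summarize
print(messages_to_summarize(5))
```

The second call illustrates why the example in the next cell lowers `preserve_recent_messages`: with the default of 10, a conversation of only a few turns has no messages eligible for summarization under this model.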


<div style="background-color: #f44336; color: white; padding: 15px; border-radius: 5px; margin: 10px 0;width: 85%;">
  <h2><strong> Work In Progress </strong><br></h2>
</div>

In [None]:
from strands import Agent
from strands.models import BedrockModel
from strands.agent.conversation_manager import SummarizingConversationManager

# Create a cheaper, faster model for summarization tasks
summarization_model = BedrockModel(
    model_id="anthropic.claude-3-5-haiku-20241022-v1:0",  # More cost-effective for summarization
    max_tokens=100,
    temperature=0.1  # Low temperature for consistent summaries
)

# Alternative: pass this agent as summarization_agent=... instead of a system prompt.
# It is not used below because summarization_agent and summarization_system_prompt
# cannot be combined.
custom_summarization_agent = Agent(model=summarization_model)

# Custom system prompt for summarization
custom_system_prompt = """
You are summarizing a conversation. Create a concise summary that is under 200 words."""

conversation_manager = SummarizingConversationManager(
    summary_ratio=0.6,
    preserve_recent_messages=1,
    summarization_system_prompt=custom_system_prompt
)

agent = Agent(conversation_manager=conversation_manager)

In [None]:
agent("Tell me about Generative AI and responsible AI in detail")
print ("\n")
# Access the conversation history
print ("Initial Iteration :: ", agent.messages)
print ("\n")

agent("Tell me about Agentic AI in detail")
print ("\n")
print ("Second Iteration :: ", agent.messages)
print ("\n")

agent("Tell me about Amazon Bedrock in detail")
print ("\n")
print ("Next Iteration :: ", agent.messages)
print ("\n")

agent("what can you help me with ?")
print ("\n")
print("Final Iteration :: ", agent.messages)  # Shows all messages exchanged so far
print ("\n")

### **1.j Model Context Protocol (MCP) Tools**

The **[Model Context Protocol (MCP)](https://modelcontextprotocol.io/)** is an open protocol that standardizes how applications provide context to Large Language Models (LLMs). Strands Agents integrates with MCP to extend agent capabilities through external tools and services.

MCP enables communication between agents and MCP servers that provide additional tools. Strands includes built-in support for connecting to MCP servers and using their tools.

When working with MCP tools in Strands, all agent operations must be performed within the MCP client's context manager (using a `with` statement). This requirement ensures that the MCP session remains active and connected while the agent is using the tools. If you attempt to use an agent or its MCP tools outside of this context, you'll encounter errors because the MCP session will have closed.
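
The sketch below illustrates why the `with` block matters, using a hypothetical stand-in class rather than the real MCP client so that it runs without a server. `FakeSessionClient` and its `call_tool` method are assumptions made for this illustration: the session opens on entering the block, closes on leaving it, and any call made afterwards fails.

```python
class FakeSessionClient:
    """Hypothetical stand-in for a session-based client such as an MCP client."""

    def __init__(self) -> None:
        self._open = False

    def __enter__(self) -> "FakeSessionClient":
        self._open = True   # session starts when the with-block is entered
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        self._open = False  # session closes when the with-block exits

    def call_tool(self, name: str) -> str:
        if not self._open:
            raise RuntimeError("session is closed; call tools inside the with-block")
        return f"result from {name}"


client = FakeSessionClient()

with client:
    inside = client.call_tool("search")  # works: the session is active
    print(inside)

try:
    client.call_tool("search")           # fails: the session has already closed
    outside_error = None
except RuntimeError as err:
    outside_error = str(err)
    print(f"Error: {outside_error}")
```

The same reasoning applies to an agent built from MCP tools: create it and invoke it inside the `with` block, not after the block has ended.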

#### **Streamable HTTP**

For HTTP-based MCP servers that use the Streamable HTTP transport:

In [None]:
from mcp.client.streamable_http import streamablehttp_client
from strands import Agent
from strands.tools.mcp.mcp_client import MCPClient

streamable_http_mcp_client = MCPClient(lambda: streamablehttp_client("http://localhost:8000/mcp"))

# Create an agent with MCP tools
with streamable_http_mcp_client:
    # Get the tools from the MCP server
    tools = streamable_http_mcp_client.list_tools_sync()

    # Create an agent with these tools
    agent = Agent(tools=tools)

**Upcoming Sections:**

- Simple Swarm agent
- A2A
- Agent as a tool
- Agent Evaluations
- Deploy to Lambda




## Next Step
You have successfully completed Lab 0. Now let's move on to the next lab. Open `lab 1a\faq-agent` to continue.