# Lesson 5: Async Streaming, Executors, Multi-modal & MCP

Master async operations, streaming, tool execution strategies, multi-modal content, and external tool integration:

- ✅ Async tool definition with `async def` and `await`
- ✅ Streaming progress updates with `yield` in async tools
- ✅ Using `agent.stream_async()` for real-time responses
- ✅ ConcurrentToolExecutor for parallel tool execution (default)
- ✅ SequentialToolExecutor for ordered tool execution
- ✅ Multi-modal content (images, PDFs, documents)
- ✅ Model Context Protocol (MCP) for external tool integration
- ✅ Combining MCP tools with custom Python tools

**Estimated time:** 5-6 hours

**What you'll build:** Streaming tools, executor comparisons, multi-modal examples, and MCP integrations!

## Setup

Import necessary modules and configure the environment:

In [None]:
import asyncio
import time
from datetime import datetime

from strands import Agent, tool
from strands.tools.executors import ConcurrentToolExecutor, SequentialToolExecutor

from lesson_utils import (
    load_environment,
    create_working_model,
    check_api_keys,
    print_troubleshooting,
)

# Load environment and check API keys
load_environment()
check_api_keys()

print("🎯 Lesson 5: Async Streaming, Executors & Multi-modal")
print("=" * 60)

## Part 1: Async Tools with Streaming Progress

Async tools can `yield` intermediate results to provide real-time progress updates. Each yielded value becomes a streaming event that you can consume with `agent.stream_async()`.

**Reference:** [Python Tools - Tool Streaming](https://strandsagents.com/latest/documentation/docs/user-guide/concepts/tools/python-tools/#tool-streaming)
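
The mechanism behind streaming tools is an ordinary Python async generator. The sketch below uses plain `asyncio` only, with no Strands involved, to show how each `yield` hands one value to an `async for` consumer. The names `count_with_progress` and `collect` are made up for illustration.

```python
import asyncio
from typing import AsyncIterator


async def count_with_progress(total: int, every: int = 10) -> AsyncIterator[str]:
    """Yield a progress message every `every` items, then a final summary."""
    for i in range(1, total + 1):
        await asyncio.sleep(0)  # Hand control back to the event loop
        if i % every == 0:
            yield f"Processed {i}/{total}"
    yield f"Done: {total} items"


async def collect(total: int) -> list[str]:
    """Consume the generator the way a streaming client would."""
    updates = []
    async for message in count_with_progress(total):
        updates.append(message)  # A real client would display this immediately
    return updates


updates = asyncio.run(collect(30))
print(updates)
# ['Processed 10/30', 'Processed 20/30', 'Processed 30/30', 'Done: 30 items']
```

In a notebook cell, an event loop is already running, so use `await collect(30)` instead of `asyncio.run(collect(30))`.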

In [None]:
# Define async tools with yield for streaming progress

@tool
async def process_dataset(records: int) -> str:
    """Process records with progress updates."""
    start = datetime.now()

    for i in range(1, records + 1):
        await asyncio.sleep(0.1)  # Simulate processing time
        if i % 10 == 0:
            elapsed = (datetime.now() - start).total_seconds()
            yield f"Processed {i}/{records} records in {elapsed:.1f}s"

    total_time = (datetime.now() - start).total_seconds()
    yield f"✓ Completed {records} records in {total_time:.1f}s"


@tool
async def download_file(url: str, size_mb: int = 10) -> str:
    """Simulate downloading a file with progress updates."""
    chunks = 10
    chunk_size = size_mb / chunks

    yield f"Starting download: {url} ({size_mb}MB)"

    for i in range(1, chunks + 1):
        await asyncio.sleep(0.2)  # Simulate download time
        progress = (i / chunks) * 100
        downloaded = chunk_size * i
        yield f"Downloaded {downloaded:.1f}MB / {size_mb}MB ({progress:.0f}%)"

    yield f"✓ Download complete: {url}"


print("✅ Async streaming tools created!")

In [None]:
model = create_working_model()

if model:
    # Create agent with async streaming tools
    agent = Agent(
        model=model,
        tools=[process_dataset, download_file],
        system_prompt="You are a helpful assistant with data processing capabilities.",
    )

    print("Streaming progress from async tools...\n")

    async def demo_streaming():
        async for event in agent.stream_async("Process 30 records"):
            # Check for tool stream events (progress updates)
            if tool_stream := event.get("tool_stream_event"):
                if update := tool_stream.get("data"):
                    print(f"📊 Progress: {update}")

            # Print model text chunks as they arrive; text is streamed in
            # pieces, so a per-chunk prefix would be repeated mid-sentence
            if "data" in event and not event.get("tool_stream_event"):
                print(event["data"], end="", flush=True)

    await demo_streaming()
else:
    print_troubleshooting()

## Part 2: ConcurrentToolExecutor (Parallel Execution)

`ConcurrentToolExecutor` is the default executor. It executes multiple tools in parallel when the LLM requests multiple tools in a single response.

**Reference:** [Tool Executors](https://strandsagents.com/latest/documentation/docs/user-guide/concepts/tools/executors/)
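
To see why overlapping I/O-bound calls saves time, here is a minimal sketch in plain `asyncio`. It illustrates the idea of concurrent execution only and is not how Strands implements its executors. `fake_api_call` and the two runner functions are hypothetical helpers.

```python
import asyncio
import time


async def fake_api_call(name: str, delay: float) -> str:
    """Stand-in for a tool that waits on I/O."""
    await asyncio.sleep(delay)
    return f"{name} done"


async def run_concurrently(delay: float) -> tuple[list[str], float]:
    """Start all three calls at once and wait for every result."""
    start = time.perf_counter()
    results = await asyncio.gather(
        fake_api_call("weather", delay),
        fake_api_call("time", delay),
        fake_api_call("population", delay),
    )
    return list(results), time.perf_counter() - start


async def run_sequentially(delay: float) -> tuple[list[str], float]:
    """Await each call before starting the next one."""
    start = time.perf_counter()
    results = [
        await fake_api_call(name, delay)
        for name in ("weather", "time", "population")
    ]
    return results, time.perf_counter() - start


_, concurrent_seconds = asyncio.run(run_concurrently(0.2))
_, sequential_seconds = asyncio.run(run_sequentially(0.2))
print(f"concurrent: {concurrent_seconds:.2f}s, sequential: {sequential_seconds:.2f}s")
```

With three calls of equal length, the concurrent run should take about one delay and the sequential run about three. In a notebook, replace `asyncio.run(...)` with `await ...`.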

In [None]:
# Define tools that simulate API calls

@tool
async def get_weather(city: str) -> str:
    """Get weather forecast for a city."""
    await asyncio.sleep(1.0)  # Simulate API call
    return f"Weather in {city}: Sunny, 72°F"


@tool
async def get_time(city: str) -> str:
    """Get current time in a city."""
    await asyncio.sleep(1.0)  # Simulate API call
    return f"Time in {city}: 2:30 PM"


@tool
async def get_population(city: str) -> str:
    """Get population of a city."""
    await asyncio.sleep(1.0)  # Simulate database query
    return f"Population of {city}: ~1.5 million"


print("✅ API simulation tools created!")

In [None]:
model = create_working_model()

if model:
    # ConcurrentToolExecutor is the default
    agent = Agent(
        model=model,
        tool_executor=ConcurrentToolExecutor(),
        tools=[get_weather, get_time, get_population],
        system_prompt="Use tools to answer user questions about cities.",
    )

    print("Testing ConcurrentToolExecutor (tools run in parallel)...\n")

    start_time = time.time()
    response = await agent.invoke_async(
        "What's the weather, time, and population in Seattle?"
    )
    elapsed = time.time() - start_time

    print(f"\n🤖 Agent: {response}")
    print(f"\n⚡ Total time: {elapsed:.2f}s")
    print("💡 The 3 tool calls (1s each) overlap, so tool time is ~1s instead of ~3s.")
    print("   The total above also includes the model's own response time.")
else:
    print_troubleshooting()

## Part 3: SequentialToolExecutor (Ordered Execution)

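`SequentialToolExecutor` executes tools one at a time, in the order the LLM requested them. This is useful when tool calls requested in the same response must not overlap, for example when a later call relies on a side effect of an earlier one (such as a file that must exist on disk before it can be compressed or emailed).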
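
The difference in ordering can be shown with a small sketch in plain `asyncio`. It is an illustration of the concept and does not use the Strands executors. Three steps append to a shared log when they finish; the step names and delays are invented for the example.

```python
import asyncio


async def step(name: str, delay: float, log: list[str]) -> None:
    """Append to a shared log when the simulated work finishes."""
    await asyncio.sleep(delay)
    log.append(name)


# Requested order, with the slowest step first (delays are illustrative)
STEPS = [("screenshot", 0.15), ("compress", 0.10), ("email", 0.05)]


async def in_order() -> list[str]:
    log: list[str] = []
    for name, delay in STEPS:
        await step(name, delay, log)  # Next step starts only after this one ends
    return log


async def all_at_once() -> list[str]:
    log: list[str] = []
    await asyncio.gather(*(step(name, delay, log) for name, delay in STEPS))
    return log  # Completion order follows the delays, not the request order


print(asyncio.run(in_order()))     # ['screenshot', 'compress', 'email']
print(asyncio.run(all_at_once()))  # ['email', 'compress', 'screenshot']
```

When the steps run at once, the email step finishes before the screenshot exists, which is exactly the situation ordered execution avoids.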

In [None]:
# Define tools for dependent operations

@tool
async def take_screenshot(filename: str) -> str:
    """Take a screenshot and save to file."""
    await asyncio.sleep(0.5)  # Simulate screenshot capture
    return f"Screenshot saved to {filename}"


@tool
async def compress_file(filename: str) -> str:
    """Compress a file to save space."""
    await asyncio.sleep(0.5)  # Simulate compression
    compressed = filename.replace(".png", ".zip")
    return f"Compressed {filename} to {compressed}"


@tool
async def send_email(recipient: str, attachment: str) -> str:
    """Send an email with an attachment."""
    await asyncio.sleep(0.5)  # Simulate email sending
    return f"Email sent to {recipient} with attachment: {attachment}"


print("✅ Workflow tools created!")

In [None]:
model = create_working_model()

if model:
    # Use SequentialToolExecutor for dependent operations
    agent = Agent(
        model=model,
        tool_executor=SequentialToolExecutor(),
        tools=[take_screenshot, compress_file, send_email],
        system_prompt="Execute tasks in the correct order for dependent operations.",
    )

    print("Testing SequentialToolExecutor (tools run in order)...")
    print("Task: Take screenshot → Compress → Email\n")

    start_time = time.time()
    response = await agent.invoke_async(
        "Take a screenshot named report.png, compress it, "
        "then email the compressed file to boss@company.com"
    )
    elapsed = time.time() - start_time

    print(f"\n🤖 Agent: {response}")
    print(f"\n⏱️  Total time: {elapsed:.2f}s")
    print("💡 Operations executed in the correct order!")
else:
    print_troubleshooting()

## Part 4: Performance Comparison

Let's compare the performance difference between concurrent and sequential execution.
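
Before measuring, it helps to estimate what speedup to expect. Both executors spend the same time waiting on the model, so the gain from overlapping tool calls is smaller than the number of tools. The sketch below is a back-of-the-envelope model: it assumes the model requests all tool calls in a single response, and the model latencies are hypothetical values, not measurements.

```python
def estimated_speedup(model_seconds: float, tool_seconds: float, n_tools: int) -> float:
    """Ratio of sequential to concurrent wall time for one batch of tool calls.

    model_seconds: time spent waiting on the model (same for both executors).
    tool_seconds:  duration of each tool call.
    n_tools:       tool calls requested in a single model response.
    """
    sequential = model_seconds + n_tools * tool_seconds
    concurrent = model_seconds + tool_seconds  # Tool calls overlap
    return sequential / concurrent


# Three 1-second tools with increasing (hypothetical) model latency
for model_seconds in (0.0, 1.0, 3.0):
    print(model_seconds, round(estimated_speedup(model_seconds, 1.0, 3), 2))
# 0.0 3.0
# 1.0 2.0
# 3.0 1.5
```

The 3x figure is only reachable when model time is negligible. With a few seconds of model latency, a measured speedup well below 3x is expected rather than a sign of a problem.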

In [None]:
@tool
async def fetch_data(source: str) -> str:
    """Fetch data from a source (simulated)."""
    await asyncio.sleep(1.0)  # Simulate network delay
    return f"Data from {source}: [sample data]"


model = create_working_model()

if model:
    # Test with ConcurrentToolExecutor
    print("1. Testing ConcurrentToolExecutor:")
    concurrent_agent = Agent(
        model=model,
        tool_executor=ConcurrentToolExecutor(),
        tools=[fetch_data],
        system_prompt="Fetch data from multiple sources.",
    )

    start = time.time()
    await concurrent_agent.invoke_async("Fetch data from API-A, API-B, and API-C")
    concurrent_time = time.time() - start
    print(f"   ⚡ Concurrent time: {concurrent_time:.2f}s")

    # Test with SequentialToolExecutor
    print("\n2. Testing SequentialToolExecutor:")
    sequential_agent = Agent(
        model=model,
        tool_executor=SequentialToolExecutor(),
        tools=[fetch_data],
        system_prompt="Fetch data from multiple sources.",
    )

    start = time.time()
    await sequential_agent.invoke_async("Fetch data from API-A, API-B, and API-C")
    sequential_time = time.time() - start
    print(f"   ⏱️  Sequential time: {sequential_time:.2f}s")

    # Show comparison
    print("\n3. Performance Summary:")
    print(f"   Concurrent: {concurrent_time:.2f}s")
    print(f"   Sequential: {sequential_time:.2f}s")
    speedup = sequential_time / concurrent_time if concurrent_time > 0 else 1
    print(f"   Speedup: {speedup:.2f}x (sequential time / concurrent time)")
else:
    print_troubleshooting()

## Part 5: Multi-modal Content

Agents can process multi-modal content including images and PDFs. This enables visual understanding and document analysis.

**Reference:** [Multi-modal Example](https://strandsagents.com/latest/documentation/docs/examples/python/multimodal/)
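
The content blocks used in this part are plain dictionaries, so small helpers can build and validate them. The sketch below is one possible approach, not part of the Strands API: `image_block` and `document_block` are hypothetical helper names. It assumes the Bedrock Converse conventions that image formats are `png`, `jpeg`, `gif`, or `webp`, and that document names may contain only alphanumerics, single spaces, hyphens, parentheses, and square brackets.

```python
import re
from pathlib import Path

# Assumption: image formats accepted by the Bedrock Converse image block
IMAGE_FORMATS = {"png", "jpeg", "gif", "webp"}


def image_block(data: bytes, fmt: str) -> dict:
    """Build an image content block, normalizing 'jpg' to 'jpeg'."""
    fmt = "jpeg" if fmt.lower() == "jpg" else fmt.lower()
    if fmt not in IMAGE_FORMATS:
        raise ValueError(f"Unsupported image format: {fmt}")
    return {"image": {"format": fmt, "source": {"bytes": data}}}


def document_block(data: bytes, filename: str, fmt: str = "pdf") -> dict:
    """Build a document content block with a sanitized display name."""
    stem = Path(filename).stem  # Drop the extension: "sample_report.pdf" -> "sample_report"
    safe = re.sub(r"[^A-Za-z0-9 \-()\[\]]", "-", stem)  # Replace disallowed characters
    safe = re.sub(r" {2,}", " ", safe).strip()           # Collapse repeated spaces
    return {"document": {"format": fmt, "name": safe, "source": {"bytes": data}}}


message = [
    {"text": "Summarize this report:"},
    document_block(b"%PDF-1.4 placeholder", "sample_report.pdf"),
]
print(message[1]["document"]["name"])  # sample-report
```

A message built this way can be passed to the agent in the same way as the hand-written lists in the cells below.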

In [None]:
# Multi-modal works with any vision model (OpenAI, Anthropic, etc.)
# We pass images directly in messages using Bedrock Converse format

import os

# Create sample receipt
def create_sample_receipt():
    from PIL import Image, ImageDraw, ImageFont
    
    width, height = 400, 600
    img = Image.new('RGB', (width, height), color='white')
    draw = ImageDraw.Draw(img)
    
    try:
        font_large = ImageFont.truetype("Arial.ttf", 24)
        font_medium = ImageFont.truetype("Arial.ttf", 18)
        font_small = ImageFont.truetype("Arial.ttf", 14)
    except IOError:
        font_large = ImageFont.load_default()
        font_medium = ImageFont.load_default()
        font_small = ImageFont.load_default()
    
    # Draw receipt (simplified)
    y = 20
    draw.text((width//2-70, y), "ACME GROCERY", fill='black', font=font_large)
    y += 40
    draw.text((width//2-60, y), "123 Main St", fill='black', font=font_small)
    y += 40
    draw.line([(20, y), (width-20, y)], fill='black', width=2)
    y += 20
    draw.text((20, y), "Receipt #: 2025-001234", fill='black', font=font_small)
    y += 25
    draw.text((20, y), "Date: Oct 11, 2025", fill='black', font=font_small)
    y += 40
    
    items = [
        ("Apples (2 lbs)", "$5.99"),
        ("Bread", "$3.49"),
        ("Eggs (12)", "$4.99"),
        ("Milk", "$4.29"),
        ("Salmon", "$12.99"),
        ("Greens", "$3.49"),
    ]
    for item, price in items:
        draw.text((20, y), item, fill='black', font=font_small)
        draw.text((width-80, y), price, fill='black', font=font_small)
        y += 22
    
    y += 10
    draw.line([(20, y), (width-20, y)], fill='black', width=1)
    y += 20
    draw.text((20, y), "Subtotal:", fill='black', font=font_medium)
    draw.text((width-85, y), "$35.24", fill='black', font=font_medium)
    y += 25
    draw.text((20, y), "Tax:", fill='black', font=font_medium)
    draw.text((width-85, y), "$3.00", fill='black', font=font_medium)
    y += 25
    draw.line([(20, y), (width-20, y)], fill='black', width=2)
    y += 20
    draw.text((20, y), "TOTAL:", fill='black', font=font_large)
    draw.text((width-90, y), "$38.24", fill='black', font=font_large)
    
    filename = 'sample_receipt.png'
    img.save(filename)
    return filename

print("📸 Creating sample receipt...")
receipt_path = create_sample_receipt()
print(f"✓ Created: {receipt_path}\n")

# Read image as bytes
with open(receipt_path, 'rb') as f:
    image_bytes = f.read()

model = create_working_model()

if model:
    print("Using provider-agnostic multi-modal approach\n")
    
    # Demo 1: Document Analyzer (works with any vision model)
    print("1. Document Analyzer:")
    analyzer = Agent(
        model=model,
        system_prompt="Analyze images and extract all visible text and key information."
    )
    
    # Pass image directly in message (Bedrock Converse format)
    message = [
        {"text": "Analyze this receipt image and extract key information:"},
        {"image": {"format": "png", "source": {"bytes": image_bytes}}}
    ]
    
    response = await analyzer.invoke_async(message)
    print(f"📄 Analysis:\n{response}\n")
    
    # Demo 2: Receipt Extractor
    print("2. Receipt Extractor:")
    extractor = Agent(
        model=model,
        system_prompt="Extract financial data: store, date, line items, totals."
    )
    
    message = [
        {"text": "Extract all financial data from this receipt:"},
        {"image": {"format": "png", "source": {"bytes": image_bytes}}}
    ]
    
    response = await extractor.invoke_async(message)
    print(f"💰 Data:\n{response}\n")
    
    # Cleanup
    os.remove(receipt_path)
    
    print("💡 Key Takeaways:")
    print("✓ Images passed directly in messages (provider-agnostic)")
    print("✓ Works with OpenAI, Anthropic, and other vision models")
    print("✓ Bedrock Converse format: {'image': {'format': 'png', ...}}")
else:
    print_troubleshooting()

In [None]:
# Demo 3: PDF Document Processing
# Reuses `model` and `os` from the previous cell, so run that cell first
print("\n3. PDF Document Processing:")

try:
    from reportlab.lib.pagesizes import letter
    from reportlab.pdfgen import canvas
    from reportlab.lib.units import inch
    
    # Create sample PDF
    pdf_path = "sample_report.pdf"
    c = canvas.Canvas(pdf_path, pagesize=letter)
    width, height = letter
    
    # Add title
    c.setFont("Helvetica-Bold", 16)
    c.drawString(1 * inch, height - 1 * inch, "Quarterly Sales Report - Q4 2024")
    
    # Add content
    c.setFont("Helvetica", 12)
    y = height - 1.5 * inch
    lines = [
        "Executive Summary:",
        "",
        "• Total Revenue: $2,450,000",
        "• Growth Rate: +23% YoY",
        "• Top Product: Enterprise Platform (45% of sales)",
        "• Customer Acquisition: 150 new clients",
        "• Customer Retention: 94%",
        "",
        "Regional Performance:",
        "• North America: $1,200,000 (49%)",
        "• Europe: $800,000 (33%)",
        "• Asia Pacific: $450,000 (18%)",
        "",
        "Key Initiatives for Q1 2025:",
        "1. Launch mobile application",
        "2. Expand European operations",
        "3. Enhance customer support infrastructure",
    ]
    
    for line in lines:
        c.drawString(1 * inch, y, line)
        y -= 0.25 * inch
    
    c.save()
    print(f"✓ Created PDF: {pdf_path}\n")
    
    # Read PDF as bytes
    with open(pdf_path, 'rb') as f:
        pdf_bytes = f.read()
    
    # Analyze PDF with agent
    pdf_analyzer = Agent(
        model=model,
        system_prompt="You are a business document analyst. "
        "Analyze PDF reports and summarize key metrics and insights."
    )
    
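    # Pass PDF in message. Bedrock Converse restricts document names to
    # alphanumerics, spaces, hyphens, parentheses, and square brackets,
    # so use a plain name instead of the file path (which contains "_" and ".")
    message = [
        {"text": "Analyze this quarterly report and summarize the key findings:"},
        {"document": {"format": "pdf", "name": "sample-report", "source": {"bytes": pdf_bytes}}},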
    ]
    
    response = await pdf_analyzer.invoke_async(message)
    print(f"📊 PDF Analysis:\n{response}\n")
    
    # Cleanup
    os.remove(pdf_path)
    print("✓ PDF cleaned up")
    
except ImportError:
    print("⚠️  reportlab not installed - skipping PDF example")
    print("   Install with: uv sync --dev")
    print("   (reportlab is in optional dev dependencies)")
    
print("\n💡 Multi-modal Key Takeaways:")
print("✓ Images: {'image': {'format': 'png', 'source': {'bytes': ...}}}")
print("✓ PDFs: {'document': {'format': 'pdf', 'source': {'bytes': ...}}}")
print("✓ Works with OpenAI, Anthropic, and other vision models")
print("✓ Provider-agnostic Bedrock Converse format")
print("✓ Enables OCR, document analysis, visual QA")

## Part 6: Model Context Protocol (MCP) Integration

The Model Context Protocol (MCP) enables agents to use external tools provided by MCP servers. This extends agent capabilities beyond custom Python tools to include documentation servers, APIs, and other external services.

**Important:** All MCP operations must be inside a context manager (`with mcp_client:`).

**Reference:** [MCP Tools](https://strandsagents.com/latest/documentation/docs/user-guide/concepts/tools/mcp-tools/)
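
The context-manager requirement follows from the session lifecycle: the connection to the server exists only between entering and leaving the `with` block. The sketch below demonstrates that lifecycle with a hypothetical stand-in class. `DemoClient` and its tool names are invented for illustration and are not the Strands `MCPClient`, whose behavior outside the block may differ in detail.

```python
class DemoClient:
    """Hypothetical stand-in for an MCP client; not the Strands MCPClient."""

    def __init__(self, tools: list[str]) -> None:
        self._tools = tools
        self.connected = False

    def __enter__(self) -> "DemoClient":
        self.connected = True  # A real client would start the server session here
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        self.connected = False  # ...and shut it down here, even after an error

    def list_tools(self) -> list[str]:
        if not self.connected:
            raise RuntimeError("session is closed; call inside `with client:`")
        return list(self._tools)


client = DemoClient(["lookup", "search"])  # Tool names are placeholders

with client:
    tools = client.list_tools()  # Works: the session is open
print(tools)

try:
    client.list_tools()  # Fails: the session ended with the block
except RuntimeError as err:
    print(f"Outside the block: {err}")
```

The same reasoning applies to agent calls: an agent that uses MCP tools needs the session to be open while it runs, which is why the examples below create and invoke the agent inside the `with` block.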

In [None]:
# Example A: AWS Documentation MCP Server (Optional)
# MCP enables access to external documentation and APIs

try:
    from mcp import stdio_client, StdioServerParameters
    from strands.tools.mcp import MCPClient
    
    print("Example A: AWS Documentation MCP Server")
    print("-" * 70)
    
    # Create MCP client for AWS docs (stdio transport)
    aws_docs_client = MCPClient(
        lambda: stdio_client(
            StdioServerParameters(
                command="uvx",
                args=["awslabs.aws-documentation-mcp-server@latest"]
            )
        )
    )
    
    print("Connecting to AWS documentation server via stdio...")
    with aws_docs_client:
        # Discover available MCP tools
        mcp_tools = aws_docs_client.list_tools_sync()
        print(f"✓ Connected! Found {len(mcp_tools)} AWS documentation tools\n")
        
        # Create agent with MCP tools
        model = create_working_model()
        agent = Agent(
            model=model,
            tools=mcp_tools,
            system_prompt="You help with AWS services. "
            "Use documentation tools for accurate information.",
        )
        
        # Agent will automatically select and use MCP tools
        response = agent("What is AWS Lambda and when should I use it?")
        print(f"🤖 Agent: {response}\n")

except ImportError as e:
    print(f"⚠️  MCP packages not available: {e}")
    print("   Install with: uv sync")
except Exception as e:
    print(f"⚠️  AWS MCP server not available: {e}")
    print("   Note: This is optional - MCP servers require external packages")
    print("   Install with: uv tool install awslabs.aws-documentation-mcp-server")
    print("   Continuing without MCP example...")

In [None]:
# Example B: Combining MCP with Custom Async Tools
# Demonstrates hybrid agents with both MCP and Python tools
# Reuses MCPClient, stdio_client, and StdioServerParameters imported in Example A

try:
    print("\nExample B: Hybrid Agent (MCP + Custom Async Tools)")
    print("-" * 70)
    
    # Create MCP client
    aws_docs_client = MCPClient(
        lambda: stdio_client(
            StdioServerParameters(
                command="uvx",
                args=["awslabs.aws-documentation-mcp-server@latest"]
            )
        )
    )
    
    # Define custom async tool
    @tool
    async def process_records(count: int) -> str:
        """Process records (simulated; returns a single result without streaming)."""
        await asyncio.sleep(0.5)
        return f"Processed {count} records"
    
    print("Combining external MCP tools with custom async tools...")
    with aws_docs_client:
        mcp_tools = aws_docs_client.list_tools_sync()
        
        # Combine MCP tools + custom async tools
        all_tools = mcp_tools + [process_records]
        
        model = create_working_model()
        agent = Agent(
            model=model,
            tools=all_tools,
            system_prompt="You can both lookup AWS documentation and "
            "process records. Use the right tool for each task.",
        )
        
        print(f"✓ Agent has {len(all_tools)} tools "
              f"({len(mcp_tools)} MCP + 1 custom)\n")
        
        response = agent("Look up AWS DynamoDB, then process 50 records")
        print(f"🤖 Agent: {response}\n")
        
    print("💡 MCP Key Concepts:")
    print("   1. Context managers required: with mcp_client:")
    print("   2. Tools discovered via list_tools_sync()")
    print("   3. MCP tools work alongside custom Python tools")
    print("   4. stdio transport runs servers as subprocesses (uvx)")
    print("   5. Multiple servers can be combined together")
    print("   6. SSE and HTTP transports also available")

except Exception as e:
    print(f"⚠️  Combined tools demo skipped: {e}")
    print("   MCP is optional - lesson continues without it")

## Experiments

Now it's your turn! Try these experiments:

### Exercises:
1. **Large dataset processing** - Process 100 records and watch the streaming progress
2. **Variable delays** - Create tools with different delays (0.5s, 1s, 2s) and compare executors
3. **Data pipeline** - Build a multi-step pipeline with dependent operations
4. **Parallel API calls** - Simulate calling 5 different APIs concurrently
5. **Real images** - Use different receipt/invoice images with the multi-modal approach
6. **Multiple images** - Pass multiple images in one message for comparison
7. **Document classification** - Build a classifier that categorizes document types
8. **AWS Documentation MCP** (Optional) - Integrate AWS MCP server for AWS technical queries
9. **Hybrid Agent** - Build an agent combining MCP tools, async tools, and multi-modal content
10. **MCP Performance** - Compare MCP tools with equivalent custom Python implementations

### Challenge:
Build a document processing pipeline that:
1. Downloads multiple files concurrently
2. Processes them sequentially with progress updates
3. Optionally uses MCP tools to enrich data
4. Generates a summary report

Use the cell below for your experiments:

In [None]:
# Your experiments here!


## ✅ Success Criteria

You've completed Lesson 5 if:

- ✅ Async tools stream progress in real-time via `yield`
- ✅ ConcurrentToolExecutor runs tools in parallel
- ✅ SequentialToolExecutor runs tools in order
- ✅ Concurrent execution shows a measurable speedup over sequential execution
- ✅ Stream events are properly formatted and received
- ✅ Agent processes images (PNG, JPEG) correctly
- ✅ Agent processes PDF documents correctly
- ✅ Multi-modal agents invoked with the generated sample receipt image and PDF
- ✅ Document analyzer and receipt extractor demonstrated
- ✅ MCP concepts understood (optional: run MCP examples if available)

## 💡 Key Concepts Learned

- **Async Tools** - Use `async def` with `yield` for streaming progress
- **Streaming** - `agent.stream_async()` for real-time event consumption
- **ConcurrentToolExecutor** - Parallel execution for independent operations (roughly 1.4-1.5x speedup here; results vary with model latency)
- **SequentialToolExecutor** - Ordered execution for dependent operations
- **Multi-modal** - Process images and PDFs directly in messages with Bedrock Converse format
- **MCP Integration** - External tool integration via Model Context Protocol (optional)
- **Context Managers** - Required pattern for MCP operations (`with mcp_client:`)
- **Hybrid Agents** - Combining MCP tools with custom Python tools

## Next Steps

- **Lesson 6**: Hooks & Structured Output - Lifecycle hooks and Pydantic models
- **Lesson 7**: Advanced Tools & Context - Class-based tools, ToolContext, conversation management

Ready to continue? Open `lesson_06_hooks_structured.ipynb`!