## Streaming Example

In [1]:
import os
import asyncio
from dotenv import load_dotenv

# Load environment variables from .env file
load_dotenv()

# Import Maxim components; the instrumentation call itself happens in a later cell
from maxim import Maxim
from maxim.logger.pydantic_ai import instrument_pydantic_ai

### Setup Maxim Logger

In [2]:
# Set up Maxim logger
# Read both settings up front so a missing value fails early with a clear message.
api_key = os.getenv("MAXIM_API_KEY")
repo_id = os.getenv("MAXIM_LOG_REPO_ID")

if not api_key:
    print("Please set MAXIM_API_KEY environment variable")
    # Raise SystemExit directly instead of calling the interactive-only exit()
    # helper: same exit status (1) in a script, no dependence on the helper.
    raise SystemExit(1)

if not repo_id:
    print("Please set MAXIM_LOG_REPO_ID environment variable")
    raise SystemExit(1)

# NOTE(review): api_key / repo_id are only validated here and not passed on;
# presumably Maxim() and maxim.logger() read them from the environment — confirm.
maxim = Maxim()
maxim_logger = maxim.logger()


[MaximSDK] Initializing Maxim AI(v3.10.8)


### Instrumentation by Maxim for Pydantic AI

In [3]:
# Instrument Pydantic AI once at the top
print("Initializing Maxim instrumentation for Pydantic AI...")
instrument_pydantic_ai(maxim_logger, debug=True)
print("Instrumentation complete!")

# Import Pydantic AI components
# NOTE(review): pydantic_ai is imported after the instrumentation call; whether
# that ordering is required is not visible from this notebook — confirm.
try:
    from pydantic_ai import Agent, RunContext
    PYDANTIC_AI_AVAILABLE = True
except ImportError:
    print("Pydantic AI not available. Please install it first:")
    print("pip install pydantic-ai")
    # Raise SystemExit directly instead of calling the interactive-only exit()
    # helper: same exit status (1) in a script, no dependence on the helper.
    raise SystemExit(1)


Initializing Maxim instrumentation for Pydantic AI...
Instrumentation complete!


### Create Agents

In [4]:
def create_streaming_agent():
    """Build the demo Pydantic AI agent and register its three tools.

    Returns:
        The configured ``Agent`` with ``add_numbers``, ``multiply_numbers`` and
        ``explain_concept`` attached as tools.
    """
    system_instructions = (
        "You are a helpful assistant that can provide detailed explanations "
        "and perform calculations."
    )
    streaming_agent = Agent(
        model="openai:gpt-4o-mini",
        name="Streaming Agent",
        instructions=system_instructions,
    )

    # Each tool prints a trace line so tool invocations are visible in the cell output.
    @streaming_agent.tool
    def add_numbers(ctx: RunContext, a: float, b: float) -> float:
        """Add two numbers together."""
        print(f"[Tool] Adding {a} + {b}")
        total = a + b
        return total

    @streaming_agent.tool
    def multiply_numbers(ctx: RunContext, a: float, b: float) -> float:
        """Multiply two numbers together."""
        print(f"[Tool] Multiplying {a} * {b}")
        product = a * b
        return product

    @streaming_agent.tool
    def explain_concept(ctx: RunContext, topic: str) -> str:
        """Provide a brief explanation of a concept."""
        print(f"[Tool] Explaining concept: {topic}")
        explanation = f"Here's a brief explanation of {topic}: It's an important concept in its field."
        return explanation

    return streaming_agent



### Run the Agents

In [11]:

async def _print_streamed_text(stream):
    """Echo the text of an open ``run_stream`` result chunk by chunk as it arrives."""
    async for text_delta in stream.stream_text(delta=True):
        print(text_delta, end="", flush=True)
    print()  # finish the line after the last chunk


async def run_streaming_example():
    """Run the streaming agent example.

    Creates a fresh agent, streams two prompts while printing the text as it
    arrives, then performs one awaited, non-streamed run for comparison.
    """
    print("=== Streaming Agent Example ===")

    # Create the agent
    agent = create_streaming_agent()

    # Use streaming mode for detailed explanations
    print("Running streaming explanation...")
    async with agent.run_stream("Explain what is 2 + 2 in detail and then calculate 5 * 6") as stream:
        print("Streaming in progress...")
        # Read the stream inside the context manager so the streamed text is shown.
        await _print_streamed_text(stream)
        print("Streaming completed")

    # Run another streaming example
    print("Running streaming concept explanation...")
    async with agent.run_stream("Explain the concept of machine learning and then add 10 + 15") as stream:
        print("Streaming concept explanation in progress...")
        await _print_streamed_text(stream)
        print("Streaming concept explanation completed")

    # Run a simple non-streamed example for comparison.
    # agent.run() is a coroutine: without `await` the agent never runs and the
    # print below would show a coroutine object instead of a result.
    print("Running synchronous calculation for comparison...")
    result = await agent.run("What is 7 * 8?")
    print(f"Synchronous Result: {result}")

    print("Streaming agent example completed!")


In [12]:
def run_sync_example():
    """Run two blocking agent calls for comparison with the streaming example.

    Uses ``Agent.run_sync`` because this is a plain (non-async) function:
    calling the coroutine method ``Agent.run`` here without awaiting it would
    never execute the agent and would print a coroutine object.

    NOTE(review): ``run_sync`` drives an event loop itself. If this function is
    called while a loop is already running (for example from ``await main()``
    in a notebook cell) it may raise ``RuntimeError`` — confirm against the
    installed pydantic-ai version.
    """
    print("=== Synchronous Agent Example (for comparison) ===")

    agent = create_streaming_agent()

    # Run multiple synchronous operations
    print("Running first calculation...")
    result = agent.run_sync("What is 12 + 18?")
    print(f"Result: {result}")

    print("Running second calculation...")
    result = agent.run_sync("Calculate 6 * 9")
    print(f"Result: {result}")

    print("Synchronous example completed!")



In [None]:
async def main():
    """Entry point: run the streaming demo, then the synchronous comparison demo."""
    # The streaming demo is a coroutine function, so it is awaited.
    await run_streaming_example()

    # The comparison demo is a plain function and is called without await.
    run_sync_example()


# Jupyter already runs an event loop, so the coroutine is awaited directly at
# the top level of the cell rather than being passed to asyncio.run().
await main()