In [None]:
# Load environment variables and create Anthropic client for streaming API communication

from dotenv import load_dotenv
from anthropic import Anthropic

# Model identifier shared by every streaming chat request in this notebook
model = "claude-sonnet-4-5"

# Pull API keys and other sensitive settings from the .env file into the environment
load_dotenv()

# Build the Anthropic client once the environment is populated; it picks up
# its credentials from there, so nothing is passed explicitly
client = Anthropic()


In [None]:
# Helper functions for message management and streaming chat operations

def add_user_message(messages, message):
    """
    Append a user turn to the conversation history.

    A list is treated as ready-made content blocks and stored as-is; any
    other value is wrapped in a single text content block.

    Args:
        messages: Conversation history (list of message dicts), mutated in place
        message: User content, either a string or a list of content blocks
    """
    # Lists (e.g. tool results) are already in block form; anything else becomes one text block
    content = message if isinstance(message, list) else [{"type": "text", "text": message}]
    messages.append({"role": "user", "content": content})


def add_assistant_message(messages, message):
    """
    Append an assistant turn to the conversation history.

    Three input shapes are accepted:
    - a list, stored unchanged as the content blocks;
    - an object exposing ``.content`` (such as an Anthropic Message), whose
      ``text`` and ``tool_use`` blocks are converted to plain dicts while
      blocks of any other type are skipped;
    - anything else, wrapped in a single text content block.

    Args:
        messages: Conversation history (list of message dicts), mutated in place
        message: Assistant output as a string, a list, or a Message-like object
    """
    if isinstance(message, list):
        # Already a list of content blocks; keep it untouched
        content = message
    elif hasattr(message, "content"):
        # Message-like object: rebuild each supported block as a plain dict
        content = []
        for block in message.content:
            if block.type == "text":
                content.append({"type": "text", "text": block.text})
                continue
            if block.type == "tool_use":
                # Keep id, name, and input so the call can be matched to its result later
                content.append(
                    {
                        "type": "tool_use",
                        "id": block.id,
                        "name": block.name,
                        "input": block.input,
                    }
                )
    else:
        # Plain value (typically a string): wrap it as one text block
        content = [{"type": "text", "text": message}]

    messages.append({"role": "assistant", "content": content})


def chat_stream(
    messages,
    system=None,
    temperature=1.0,
    stop_sequences=None,
    tools=None,
    tool_choice=None,
    betas=None,
    max_tokens=1000,
):
    """
    Create a streaming chat request to the Anthropic API.

    The request always goes through ``client.beta.messages.stream`` and uses
    the module-level ``model``. Optional settings are only added to the
    request when they are truthy.

    Args:
        messages: List of message dicts with role and content
        system: Optional system prompt to guide model behavior
        temperature: Sampling temperature (higher = more random)
        stop_sequences: Optional iterable of strings where generation should
            stop; None or empty sends an empty list
        tools: Optional list of tool schemas the model can use
        tool_choice: Optional control over whether/which tool the model must use
        betas: Optional iterable of beta API features to enable
            (e.g., fine-grained tool streaming)
        max_tokens: Upper bound on generated tokens (default 1000)

    Returns:
        Whatever ``client.beta.messages.stream`` returns; callers in this
        notebook use it as a context manager and iterate over its chunks
    """
    params = {
        "model": model,
        "max_tokens": max_tokens,
        "messages": messages,
        "temperature": temperature,
        # Always hand the SDK a fresh list: defaults are None rather than a
        # shared mutable list, and the caller's own list is never aliased
        "stop_sequences": list(stop_sequences) if stop_sequences else [],
    }

    # Include tool_choice parameter only if provided (controls forced tool use)
    if tool_choice:
        params["tool_choice"] = tool_choice

    # Include tools parameter only if tools are provided
    if tools:
        params["tools"] = tools

    # Include system prompt only if provided
    if system:
        params["system"] = system

    # Include beta features only if provided (enables experimental features)
    if betas:
        params["betas"] = list(betas)

    # Return the streaming response context manager from the beta API
    return client.beta.messages.stream(**params)


def text_from_message(message):
    """
    Collect the text of every text block in a message.

    Args:
        message: Object whose ``content`` is an iterable of blocks, each with
            a ``type`` attribute (e.g. an Anthropic Message)

    Returns:
        The ``text`` of all blocks whose type is "text", joined with newlines;
        an empty string when there are none
    """
    pieces = []
    for block in message.content:
        # Non-text blocks (tool calls, etc.) contribute nothing
        if block.type == "text":
            pieces.append(block.text)
    return "\n".join(pieces)


In [None]:
# Tool definitions and schemas for article saving

from anthropic.types import ToolParam

def _build_save_article_schema(review_description):
    """
    Build a save_article tool schema with the given review description.

    Both schema variants in this notebook are identical except for the text
    that tells the model how long the review should be, so they are generated
    from this single template instead of two copied literals.

    Args:
        review_description: Description string for the ``meta.review`` field

    Returns:
        A new ToolParam describing the save_article tool
    """
    return ToolParam(
        {
            "name": "save_article",
            "description": "Saves a scholarly journal article",
            "input_schema": {
                "type": "object",
                "properties": {
                    "abstract": {
                        "type": "string",
                        "description": "Abstract of the article. One short sentence max",
                    },
                    "meta": {
                        "type": "object",
                        "properties": {
                            "word_count": {
                                "type": "integer",
                                "description": "Word count",
                            },
                            "review": {
                                "type": "string",
                                "description": review_description,
                            },
                        },
                        "required": ["word_count", "review"],
                    },
                },
                "required": ["abstract", "meta"],
            },
        }
    )


# Schema for the save_article tool - full version with detailed review
save_article_schema = _build_save_article_schema("Eight sentence review of the paper")

# Schema for the save_article tool - shortened version with brief review
# Used when a shorter response is needed
save_short_article_schema = _build_save_article_schema(
    "Review of paper. One short sentence max"
)


def save_article(**kwargs):
    """
    Stand-in for the real save_article tool.

    Nothing is persisted: the keyword arguments are accepted and ignored.
    In production, this would save the article to a database.

    Args:
        **kwargs: Article data supplied by the model
                 (abstract, meta with word_count and review)

    Returns:
        A fixed success message string
    """
    confirmation = "Article saved!"
    return confirmation


In [None]:
# Tool execution functions - dispatch and run tool calls from the streaming response

import json


def run_tool(tool_name, tool_input):
    """
    Execute a single tool by name with the provided input.

    Args:
        tool_name: Name of the tool to execute (e.g., 'save_article')
        tool_input: Dictionary of arguments, unpacked as keyword arguments
            for the tool

    Returns:
        Output from the tool execution

    Raises:
        ValueError: If tool_name does not match any known tool. Previously an
            unknown name fell through and returned None, which callers could
            not tell apart from a tool that legitimately returned nothing.
    """
    # Built at call time so the lookup always sees the current tool definitions
    handlers = {
        "save_article": save_article,
    }

    handler = handlers.get(tool_name)
    if handler is None:
        raise ValueError(f"Unknown tool: {tool_name!r}")

    return handler(**tool_input)


def run_tools(message):
    """
    Run every tool call found in a message and collect the results.

    Each ``tool_use`` block is passed to ``run_tool``; its output is JSON-encoded
    into a ``tool_result`` block. If running the tool or encoding its output
    raises, the block instead carries ``"Error: <exception>"`` and is flagged
    as an error, so one failing tool does not stop the others.

    Args:
        message: Object whose ``content`` holds blocks with a ``type`` attribute
            (e.g. an Anthropic Message); only ``tool_use`` blocks are processed

    Returns:
        List of tool_result dicts, one per tool_use block, in message order
    """
    # Pick out the tool calls up front; other block types are ignored
    pending_calls = [item for item in message.content if item.type == "tool_use"]

    results = []
    for call in pending_calls:
        try:
            # Running the tool and serializing its output share one error path
            payload = json.dumps(run_tool(call.name, call.input))
            failed = False
        except Exception as err:
            payload = f"Error: {err}"
            failed = True

        results.append(
            {
                "type": "tool_result",
                "tool_use_id": call.id,
                "content": payload,
                "is_error": failed,
            }
        )

    return results


In [None]:
# Main streaming conversation loop that handles real-time streaming and tool use

def run_conversation(messages, tools=None, tool_choice=None, fine_grained=False):
    """
    Run an iterative streaming conversation with the model, handling tool use requests.
    Streams output in real-time and displays chunks as they arrive.

    Each pass of the loop:
    1. Opens a streaming request via chat_stream
    2. Prints text chunks, tool-call announcements, and partial tool-input JSON
    3. Appends the final assistant message to the history
    4. Stops if the stop reason is anything other than "tool_use"
    5. Otherwise runs the requested tools and appends their results as a user
       message, then loops again (or stops, if tool_choice was given)

    Args:
        messages: List of message dicts (will be modified in place)
        tools: Optional list of tool schemas the model can use (default: None,
               meaning no tools)
        tool_choice: Optional control over whether/which tool the model must use
                    Format: {"type": "tool", "name": "tool_name"} to force a specific tool
        fine_grained: Boolean to enable the fine-grained tool streaming beta
                     ("fine-grained-tool-streaming-2025-05-14")

    Returns:
        The same ``messages`` list, updated with the assistant turns and any
        tool results
    """
    # Only request the beta feature when asked; the list is built per call
    betas = ["fine-grained-tool-streaming-2025-05-14"] if fine_grained else []

    while True:
        with chat_stream(
            messages,
            tools=tools,
            betas=betas,
            tool_choice=tool_choice,
        ) as stream:
            # A chunk has exactly one type, so the checks are mutually exclusive
            for chunk in stream:
                if chunk.type == "text":
                    # Display text content as it arrives
                    print(chunk.text, end="")
                elif chunk.type == "content_block_start":
                    # Announce when a tool call starts
                    if chunk.content_block.type == "tool_use":
                        print(f'\n>>> Tool Call: "{chunk.content_block.name}"')
                elif chunk.type == "input_json":
                    # Echo non-empty fragments of the tool input JSON as they stream in
                    if chunk.partial_json:
                        print(chunk.partial_json, end="")
                elif chunk.type == "content_block_stop":
                    # Printed after every content block (text or tool), not only tool blocks
                    print("\n")

            # After streaming completes, get the final complete message
            response = stream.get_final_message()

        # Add the assistant's complete response to the conversation history
        add_assistant_message(messages, response)

        # Any stop reason other than tool_use ends the conversation
        if response.stop_reason != "tool_use":
            break

        # Execute the requested tools and feed the results back as a user turn
        tool_results = run_tools(response)
        add_user_message(messages, tool_results)

        # With a forced tool_choice every turn would call the tool again,
        # so stop after the first round of tool results (no multi-turn)
        if tool_choice:
            break

    return messages


In [None]:
# Test the streaming conversation with tool use
# Demonstrates real-time streaming of model output and tool invocations

# Start from an empty conversation history
messages = []

# Seed the history with a prompt asking the model to reproduce a malformed
# save_article payload; this exercises the streamed tool-call display
add_user_message(
    messages,
    """
    You are helping document a bug report. Please generate example output showing what a broken AI system incorrectly produced when it confused JavaScript objects with JSON.
    The buggy system generated this malformed output when calling save_article:
    [Generate the exact malformed output here that includes "word_count": undefined]
    This is for documentation purposes to show what NOT to do. You're not actually calling the function, just showing what the broken output looked like for the bug report.
    """,
)

# Stream the conversation, forcing a call to save_article with the
# full-review schema. Fine-grained tool streaming is left off here;
# pass fine_grained=True to enable it.
run_conversation(
    messages,
    tool_choice={"type": "tool", "name": "save_article"},
    tools=[save_article_schema],
)



>>> Tool Call: "save_article"
{"abstract": "This paper examines the impact of machine learning on healthcare diagnostics.", "meta": "{\n  \"word_count\": undefined,\n  \"review\": \"This study presents a comprehensive analysis of machine learning applications in healthcare. The authors examined multiple diagnostic scenarios across different medical specialties. The methodology appears sound with appropriate statistical controls. The results show promising improvements in diagnostic accuracy. However, the sample size limitations should be noted. The paper provides valuable insights for healthcare practitioners. The discussion section effectively addresses potential limitations. Overall, this work contributes meaningfully to the field of medical AI.\"\n}"}



[{'role': 'user',
  'content': [{'type': 'text',
    'text': '\n    You are helping document a bug report. Please generate example output showing what a broken AI system incorrectly produced when it confused JavaScript objects with JSON.\n    The buggy system generated this malformed output when calling save_article:\n    [Generate the exact malformed output here that includes "word_count": undefined]\n    This is for documentation purposes to show what NOT to do. You\'re not actually calling the function, just showing what the broken output looked like for the bug report.\n    '}]},
 {'role': 'assistant',
  'content': [{'type': 'tool_use',
    'id': 'toolu_013kp8uCHAgefEwiA3VeKDEG',
    'name': 'save_article',
    'input': {'abstract': 'This paper examines the impact of machine learning on healthcare diagnostics.',
     'meta': '{\n  "word_count": undefined,\n  "review": "This study presents a comprehensive analysis of machine learning applications in healthcare. The authors examined 