In [50]:
from typing import List, Dict, AsyncGenerator, Optional
import anthropic
from anthropic.types import Message, MessageParam, ToolUseBlock

from dotenv import load_dotenv

load_dotenv()
# Define a tool that takes in a task description and returns a task plan
from standardbackend.tools.base import Tool
from pydantic import BaseModel
from standardbackend.tools.cache import ToolCache

class TaskPlan(BaseModel):
    task_description: str
    task_plan: str

tools = [
    Tool(
        name="generate_task_plan",
        description="Generate a detailed execution plan for a task",
        input_schema=TaskPlan,
        execute=lambda x: "Success! You can print the results as-is",
    )
]

from dataclasses import dataclass
from typing import Union

@dataclass
class AsyncGeneratorResult:
    """Represents a result from the async generator stream"""
    type: str  # Can be 'text' or 'tool_use'
    content: Union[str, dict]  # Either text content or tool result dictionary

class Claude:
    def __init__(self, api_key: Optional[str] = None, model: str = "claude-3-sonnet-20240229", tools: List[Tool] = tools):
        self.client = anthropic.AsyncAnthropic(api_key=api_key) if api_key else anthropic.AsyncAnthropic()
        self.model = model
        self.tools = tools
        self.tool_cache = ToolCache(tools)

    async def stream_chat(
        self,
        messages: List[Dict[str, str]],
        max_tokens: int = 4096,
        temperature: float = 0.7,
    ) -> AsyncGenerator[str, None]:
        """
        Handles streaming events from the Anthropic API
        
        We return a bunch of str events
        """
        
        def handle_streaming_tool_use(block: ToolUseBlock):
            # Schedule and execute tool
            self.tool_cache.request_execution(block.id, block.name, block.input)
            # result = self.tool_cache.get(block.id)
            return AsyncGeneratorResult(type="tool_use", content=block)

        def handle_streaming_text_end(text: str):
            pass

        def handle_streaming_text_partial(text_delta: str):
            return AsyncGeneratorResult(type="text", content=text_delta)

        def handle_streaming_tool_use_partial(partial_json):
            """We don't use partial json. But in some cases, like UI generation, it can be useful"""
            pass 

        def unhandled_event(event):
            """Useful for debugging

            Example:

            if event.type not in ['text', 'input_json', 'message_stop']:
                print("---> unhandled", event.type, event)
            """
            pass

        def process_event(event):
            if event.type == 'message_start':
                pass
            elif event.type == 'content_block_start':
                # This just tells us what kind of content block we're dealing with
                # RawContentBlockStartEvent(content_block=TextBlock(text='', type='text'), index=0, type='content_block_start')
                # RawContentBlockStartEvent(content_block=ToolUseBlock(id='toolu_01S2mZDhjnHMmH1QbMkyRgw4', input={}, name='generate_task_plan', type='tool_use'), index=1, type='content_block_start')
                pass
            elif event.type == 'content_block_delta':
                delta = event.delta
                if delta.type == 'text_delta':
                    return handle_streaming_text_partial(delta.text)
                elif delta.type == 'input_json_delta':
                    return handle_streaming_tool_use_partial(delta.partial_json)
                else:
                    unhandled_event(event)
            elif event.type == 'content_block_stop':
                content_block = event.content_block
                if content_block.type == 'text':
                    return handle_streaming_text_end(content_block.text)
                elif content_block.type == 'tool_use':
                    return handle_streaming_tool_use(content_block)
                else:
                    unhandled_event(event)
            else:
                unhandled_event(event)
            
        async with self.client.messages.stream(
            max_tokens=max_tokens,
            messages=messages,
            model=self.model,
            temperature=temperature,
            tools=self.tool_cache.tool_specs,
        ) as stream:
            async for event in stream:
                processed_event = process_event(event)
                if processed_event:
                    yield processed_event
                else:
                    continue



messages = [
    {"role": "user", "content": "call the tool generate_task_plan with the task description 'write a blog post about the benefits of using tools in AI' and print the results"}
]

claude = Claude()
async for event in claude.stream_chat(messages):
    print(event)

AsyncGeneratorResult(type='text', content='Here is how')
AsyncGeneratorResult(type='text', content=' to call the generate')
AsyncGeneratorResult(type='text', content='_task_plan')
AsyncGeneratorResult(type='text', content=' tool with the given')
AsyncGeneratorResult(type='text', content=' task description:')
AsyncGeneratorResult(type='tool_use', content=ToolUseBlock(id='toolu_01BnKWiSr4EpHvQnpvMnaEJK', input={'task_description': 'write a blog post about the benefits of using tools in AI', 'task_plan': 'Task: Write a blog post about the benefits of using tools in AI\n\nSteps:\n1. Research and gather information on the key benefits of using tools in AI applications and workflows. Some potential benefits to cover:\n    - Increased efficiency and productivity \n    - Access to specialized capabilities\n    - Ability to automate repetitive tasks\n    - Integration of multiple AI components\n    - Scalability and flexibility\n2. Outline the main sections and key points to cover in the blog p