In [1]:
import os
os.environ["AWS_PROFILE"]="isengard"

In [2]:
import uuid
import asyncio
from typing import Optional, List, Dict, Any
import json
import sys


from agent_squad.orchestrator import AgentSquad, AgentSquadConfig
from agent_squad.agents import (BedrockLLMAgent,
                        BedrockLLMAgentOptions,
                        AgentResponse,
                        AnthropicAgent, AnthropicAgentOptions,
                        AgentCallbacks)
from agent_squad.types import ConversationMessage, ParticipantRole
from agent_squad.classifiers import BedrockClassifier, BedrockClassifierOptions


class LLMAgentCallbacks(AgentCallbacks):
    def on_llm_new_token(self, token: str, **kwargs) -> None:
        # handle response streaming here
        print("token printing\n")
        print(token, end='', flush=True)

In [3]:
# Initialize the orchestrator with some options
orchestrator = AgentSquad(
    options=AgentSquadConfig(
        LOG_AGENT_CHAT=True,
        LOG_CLASSIFIER_CHAT=True,
        LOG_CLASSIFIER_RAW_OUTPUT=True,
        LOG_CLASSIFIER_OUTPUT=True,
        LOG_EXECUTION_TIMES=True,
        MAX_RETRIES=3,
        USE_DEFAULT_AGENT_IF_NONE_IDENTIFIED=True,
        MAX_MESSAGE_PAIRS_PER_AGENT=10,
    )
)

# Add some agents
tech_agent = BedrockLLMAgent(
    BedrockLLMAgentOptions(
        name="Tech Agent",
        streaming=True,
        description="Specializes in technology areas including software development, hardware, AI, \
        cybersecurity, blockchain, cloud computing, emerging tech innovations, and pricing/costs \
        related to technology products and services.",
        model_id="us.anthropic.claude-3-7-sonnet-20250219-v1:0",
        callbacks=LLMAgentCallbacks(),
        inference_config={"maxTokens": 2500, "temperature": 1},
        reasoning_config={"thinking": {"type": "enabled", "budget_tokens": 2000}},
    )
)
orchestrator.add_agent(tech_agent)


In [4]:
import asyncio


async def handle_request(_orchestrator: AgentSquad, _user_input: str, _user_id: str, _session_id: str):
    response: AgentResponse = await _orchestrator.route_request(_user_input, _user_id, _session_id)

    # Print metadata
    print("\nMetadata:")
    print(f"Selected Agent: {response.metadata.agent_name}")
    if isinstance(response, AgentResponse) and response.streaming is False:
        # Handle regular response
        if isinstance(response.output, str):
            print(response.output)
        elif isinstance(response.output, ConversationMessage):
            print(response.output.content[0].get("text"))

In [5]:
import uuid
import asyncio
import nest_asyncio

USER_ID = "user123"
SESSION_ID = str(uuid.uuid4())

nest_asyncio.apply()

user_input = "what is blockchain"


# Run the async function
loop = asyncio.get_event_loop()
response = loop.run_until_complete(handle_request(orchestrator, user_input, USER_ID, SESSION_ID))
print(f"\nResponse:\n{response}")


INFO:agent_squad.utils.logger:
** CLASSIFIED INTENT **
INFO:agent_squad.utils.logger:> Text: what is blockchain
INFO:agent_squad.utils.logger:> Selected Agent: Tech Agent
INFO:agent_squad.utils.logger:> Confidence: 0.95
INFO:agent_squad.utils.logger:
INFO:agent_squad.utils.logger:
** AGENT TECH-AGENT CHAT HISTORY **
INFO:agent_squad.utils.logger:> - None -
INFO:agent_squad.utils.logger:


token printing

#

ERROR:agent_squad.utils.logger:Error getting stream from Bedrock model: object NoneType can't be used in 'await' expression
ERROR:agent_squad.utils.logger:Error during agent processing: object NoneType can't be used in 'await' expression
INFO:agent_squad.utils.logger:
** EXECUTION TIMES **
INFO:agent_squad.utils.logger:> Classifying user intent: 2.3845510482788086s
INFO:agent_squad.utils.logger:> Agent Tech Agent | Processing request: 5.412101745605469e-05s
INFO:agent_squad.utils.logger:



Metadata:
Selected Agent: No Agent
object NoneType can't be used in 'await' expression

Response:
None
