In [1]:
import re
from dataclasses import dataclass
from typing import Dict, List

from autogen_core import (
    DefaultTopicId,
    MessageContext,
    RoutedAgent,
    SingleThreadedAgentRuntime,
    TypeSubscription,
    default_subscription,
    message_handler,
)
from autogen_core.models import (
    ChatCompletionClient,
    LLMMessage,
    SystemMessage,
)
from autogen_ext.models.openai import OpenAIChatCompletionClient

model_client = chat_client = OpenAIChatCompletionClient(model="gpt-4o-mini")

In [2]:
@dataclass
class Question:
    content: str


@dataclass
class Answer:
    content: str


@dataclass
class SolverRequest:
    content: str
    question: str


@dataclass
class IntermediateSolverResponse:
    content: str
    question: str
    answer: str
    round: int


@dataclass
class FinalSolverResponse:
    answer: str

In [3]:
def financialReportRagTool(company: str) -> str:
    return f"Financial report for {company}"

def getAnnualisedReturnTool(company: str) -> float:
    '''Get the annualised return for a company'''
    return 0.1

def getAnnualisedVolatilityTool(company: str) -> float:
    '''Get the annualised volatility for a company'''
    return 0.2

def getVolumeTool(company: str) -> int:
    '''Get the volume for a company'''
    return 1000000

def getNewsBodyTool(company: str) -> str:
    '''Get the news body for a company'''
    return "we are happy with the company"


from autogen_core.tools import FunctionTool

documentTool = FunctionTool(
    financialReportRagTool,
    description="Useful to get the financial reports of a company"
)

annualisedReturnTool = FunctionTool(
    getAnnualisedReturnTool,
    description="Get the annualised return for a company"
)

annualisedVolatilityTool = FunctionTool(
    getAnnualisedVolatilityTool,
    description="Get the annualised volatility for a company"
)

volumeTool = FunctionTool(
    getVolumeTool,
    description="Get the trading volume for a company"
)

newsBodyTool = FunctionTool(
    getNewsBodyTool,
    description="Get the latest news body for a company"
)

In [4]:
from typing import Callable
from autogen_core.models import UserMessage, AssistantMessage

@default_subscription
class SpecialistAgent(RoutedAgent):
    def __init__(
        self, 
        model_client: ChatCompletionClient, 
        role: str, 
        goal: str, 
        tools: List[Callable], 
        topic_type: str, 
        num_neighbors: int, 
        max_round: int
    ) -> None:
        super().__init__(role)
        self._role = role
        self._goal = goal
        self._tools = tools
        self._topic_type = topic_type
        self._model_client = model_client
        self._num_neighbors = num_neighbors
        self._history: List[LLMMessage] = []
        self._buffer: Dict[int, List[IntermediateSolverResponse]] = {}
        self._system_messages = [
            SystemMessage(
                content=(
                    f"You are {self._role}. Your goal is: {self._goal}. "
                    "You have access to specific tools. "
                )
            )
        ]
        self._round = 0
        self._max_round = max_round
        
    @message_handler
    async def handle_request(self, message: SolverRequest, ctx: MessageContext) -> None:
        print(f"{self._role} received request:\n{message.content}")
        self._history.append(UserMessage(content=message.content, source="user"))
        model_result = await self._model_client.create(
            self._system_messages + self._history
        )
        self._history.append(AssistantMessage(content=model_result.content, source=self.metadata["type"]))

        # extract answer
        match = re.search(r"\{\{(\-?\d+(\.\d+)?)\}\}", model_result.content)
        answer = match.group(1) if match else "0"

        # publish intermediate response (first round)
        await self.publish_message(
            IntermediateSolverResponse(
                content=model_result.content,
                question=message.question,
                answer=answer,
                round=self._round,
            ),
            topic_id=DefaultTopicId(type=self._topic_type),
        )

    @message_handler
    async def handle_response(self, message: IntermediateSolverResponse, ctx: MessageContext) -> None:
        # collect neighbor responses
        self._buffer.setdefault(message.round, []).append(message)

        if len(self._buffer[message.round]) == self._num_neighbors:
            print(f"{self._role} received all peer responses for round {message.round}")
            # prepare new prompt with peers’ answers
            prompt = "Other agents’ answers so far:\n"
            for resp in self._buffer[message.round]:
                prompt += f"- {resp.content}\n"
            prompt += (
                f"Reconsider your answer to the original problem: {message.question}. "
                "Provide your updated reasoning and a final answer in the form {{answer}}."
            )

            self._round += 1
            if self._round >= self._max_round:
                # stop debating → final answer
                final_answer = message.answer
                await self.publish_message(
                    FinalSolverResponse(answer=final_answer, question=message.question),
                    topic_id=DefaultTopicId(),
                )
            else:
                # ask self again
                await self.send_message(
                    SolverRequest(content=prompt, question=message.question),
                    self.id
                )

            self._buffer.pop(message.round)



# valuationAgent = SpecialistAgent(
#     model_client=chat_client,
#     role="Valuation Expert",
#     goal="Estimate fair value using discounted cash flow & volatility.",
#     tools=[annualisedVolatilityTool, annualisedReturnTool, volumeTool],
#     topic_type="valuation",
#     num_neighbors=2,
#     max_round=3,
# )

# fundamentalAgent = SpecialistAgent(
#     model_client=chat_client,
#     role="Fundamentals Expert",
#     goal="Analyze company balance sheets, ratios, and earnings.",
#     tools=[documentTool],
#     topic_type="specialist",
#     num_neighbors=2,
#     max_round=3,
# )

# sentimentAgent = SpecialistAgent(
#     model_client=chat_client,
#     role="sentiment Analyst",
#     goal="Use moving averages, ATR, ADX, and chart patterns to find trends.",
#     tools=[newsBodyTool],
#     topic_type="technical",
#     num_neighbors=2,
#     max_round=3,
# )


In [5]:
@default_subscription
class FinanceConsensusAggregator(RoutedAgent):
    def __init__(self, num_agents: int):
        super().__init__("Finance Consensus Aggregator")
        self._num_agents = num_agents
        self._buffer: Dict[str, FinalSolverResponse] = {}
        self._round = 0
        self._max_round = 5
        # Define expected agent types
        self._expected_agents = {"valuation_expert", "fundamentals_expert", "sentiment_analyst"}

    @message_handler
    async def handle_question(self, message: Question, ctx: MessageContext) -> None:
        self._round = 0
        self._buffer.clear()
        print(f"{'-'*80}\nAggregator {self.id} received question:\n{message.content}")
        prompt = (
            f"Can you analyze the following financial problem?\n{message.content}\n"
            "Explain your reasoning from your perspective. "
            "Your final answer must be a single numerical number, "
            "in the form {{answer}}, at the end of your response."
        )
        print(f"{'-'*80}\nAggregator {self.id} publishes initial solver request.")
        await self.publish_message(SolverRequest(content=prompt, question=message.content), topic_id=DefaultTopicId())

    @message_handler
    async def handle_final_solver_response(self, message: FinalSolverResponse, ctx: MessageContext) -> None:
        agent_type = message.metadata.get("type") if message.metadata else "unknown"
        print(f"{'-'*80}\nAggregator {self.id} received final answer from {agent_type} agent: {message.answer}")

        # Save this agent's response
        self._buffer[agent_type] = message

        # Wait until all expected agents have responded
        if all(agent in self._buffer for agent in self._expected_agents):
            answers = [resp.answer for resp in self._buffer.values()]
            unique_answers = set(answers)

            if len(unique_answers) == 1:
                # Consensus reached ✅
                agreed_answer = unique_answers.pop()
                print(f"{'-'*80}\nAggregator {self.id}: Consensus reached on {agreed_answer}")
                await self.publish_message(Answer(content=agreed_answer), topic_id=DefaultTopicId())
                self._buffer.clear()
            else:
                # No consensus yet ❌ → send back for another round
                self._round += 1
                if self._round >= self._max_round:
                    print(f"{'-'*80}\nAggregator {self.id}: Max rounds reached, forcing final answer.")
                    # Fallback: majority vote
                    majority_answer = max(set(answers), key=answers.count)
                    await self.publish_message(Answer(content=majority_answer), topic_id=DefaultTopicId())
                    self._buffer.clear()
                else:
                    print(f"{'-'*80}\nAggregator {self.id}: No consensus yet, sending back for round {self._round}.")
                    debate_prompt = "The agents have provided different answers:\n"
                    for agent, resp in self._buffer.items():
                        debate_prompt += f"- {agent} agent: {resp.answer} ({resp.content})\n"
                    debate_prompt += (
                        "Please reconsider your reasoning in light of the other agents' answers "
                        "and try to reach a consensus. Your revised final answer should be a single "
                        "numerical number, in the form {{answer}}, at the end of your response."
                    )
                    # Ask all agents again
                    await self.publish_message(SolverRequest(content=debate_prompt, question=message.question), topic_id=DefaultTopicId())
                    self._buffer.clear()

In [6]:
runtime = SingleThreadedAgentRuntime()

model_client = OpenAIChatCompletionClient(model="gpt-4o-mini")

# Register fundamental analysis agent
await SpecialistAgent.register(
    runtime,
    "ValuationAgent",
    lambda: SpecialistAgent(
        model_client=chat_client,
        role="Valuation Expert",
        goal="Estimate fair value using discounted cash flow & volatility.",
        tools=[annualisedVolatilityTool, annualisedReturnTool, volumeTool],
        topic_type="valuation",
        num_neighbors=2,
        max_round=3,
    ),
)

await SpecialistAgent.register(
    runtime,
    "FundamentalAgent",
    lambda: SpecialistAgent(
        model_client=chat_client,
        role="Fundamentals Expert",
        goal="Analyze company balance sheets, ratios, and earnings.",
        tools=[documentTool],
        topic_type="fundamentals",
        num_neighbors=2,
        max_round=3,
    ),
)

await SpecialistAgent.register(
    runtime,
    "SentimentAgent",
    lambda: SpecialistAgent(
        model_client=chat_client,
        role="Sentiment Analyst",
        goal="Use moving averages, ATR, ADX, and chart patterns to find trends.",
        tools=[newsBodyTool],
        topic_type="technical",
        num_neighbors=2,
        max_round=3,
    ),
)

# Register aggregator that forces debate until consensus
await FinanceConsensusAggregator.register(
    runtime,
    "FinanceConsensusAggregator",
    lambda: FinanceConsensusAggregator(num_agents=3),  # fundamental + valuation + sentimental
)

AgentType(type='FinanceConsensusAggregator')

In [7]:
# Fundamental ↔ Valuation
await runtime.add_subscription(TypeSubscription("FundamentalAgent", "ValuationAgent"))
await runtime.add_subscription(TypeSubscription("ValuationAgent", "FundamentalAgent"))

# Valuation ↔ Sentimental
await runtime.add_subscription(TypeSubscription("ValuationAgent", "SentimentalAgent"))
await runtime.add_subscription(TypeSubscription("SentimentalAgent", "ValuationAgent"))

# Sentimental ↔ Fundamental
await runtime.add_subscription(TypeSubscription("SentimentalAgent", "FundamentalAgent"))
await runtime.add_subscription(TypeSubscription("FundamentalAgent", "SentimentalAgent"))

# All three agents and the aggregator subscribe to the default topic
await runtime.add_subscription(TypeSubscription("FundamentalAgent", DefaultTopicId()))
await runtime.add_subscription(TypeSubscription("ValuationAgent", DefaultTopicId()))
await runtime.add_subscription(TypeSubscription("SentimentalAgent", DefaultTopicId()))
await runtime.add_subscription(TypeSubscription("FinanceConsensusAggregator", DefaultTopicId()))

In [8]:
# Example finance question
question = "Should we recommend buying, holding, or selling Tesla stock based on fundamentals, valuation, and sentiment?"

# Start the runtime
runtime.start()

# Publish the question to the default topic (all agents + aggregator listen here)
await runtime.publish_message(Question(content=question), DefaultTopicId())

# Wait for the runtime to stop when idle (after debate + consensus)
await runtime.stop_when_idle()

# Close model client connection
await model_client.close()

--------------------------------------------------------------------------------
Aggregator FinanceConsensusAggregator/default received question:
Should we recommend buying, holding, or selling Tesla stock based on fundamentals, valuation, and sentiment?
--------------------------------------------------------------------------------
Aggregator FinanceConsensusAggregator/default publishes initial solver request.
Valuation Expert received request:
Can you analyze the following financial problem?
Should we recommend buying, holding, or selling Tesla stock based on fundamentals, valuation, and sentiment?
Explain your reasoning from your perspective. Your final answer must be a single numerical number, in the form {{answer}}, at the end of your response.
Fundamentals Expert received request:
Can you analyze the following financial problem?
Should we recommend buying, holding, or selling Tesla stock based on fundamentals, valuation, and sentiment?
Explain your reasoning from your perspectiv