In [1]:
import sys
sys.path.append("../../libs")
from models import *


In [2]:
import asyncio
from dataclasses import dataclass
from enum import Enum
from typing import List

from autogen_core import (
    MessageContext,
    RoutedAgent,
    SingleThreadedAgentRuntime,
    TopicId,
    TypeSubscription,
    message_handler,
)
from autogen_core._default_subscription import DefaultSubscription
from autogen_core._default_topic import DefaultTopicId
from autogen_core.models import (
    SystemMessage,
)

In [3]:
class TaxSpecialty(str, Enum):
    PLANNING = "planning"
    DISPUTE_RESOLUTION = "dispute_resolution"
    COMPLIANCE = "compliance"
    PREPARATION = "preparation"


@dataclass
class ClientRequest:
    content: str


@dataclass
class RequestAssessment:
    content: str


class TaxSpecialist(RoutedAgent):
    def __init__(
        self,
        description: str,
        specialty: TaxSpecialty,
        system_messages: List[SystemMessage],
    ) -> None:
        super().__init__(description)
        self.specialty = specialty
        self._system_messages = system_messages
        self._memory: List[ClientRequest] = []

    @message_handler
    async def handle_message(self, message: ClientRequest, ctx: MessageContext) -> None:
        # Process the client request.
        print(f"\n{'='*50}\nTax specialist {self.id} with specialty {self.specialty}:\n{message.content}")
        # Send a response back to the manager
        if ctx.topic_id is None:
            raise ValueError("Topic ID is required for broadcasting")
        await self.publish_message(
            message=RequestAssessment(content=f"I can handle this request in {self.specialty}."),
            topic_id=ctx.topic_id,
        )

In [4]:
async def run_single_tenant_single_scope() -> None:
    # Create the runtime.
    runtime = SingleThreadedAgentRuntime()

    # Register TaxSpecialist agents for each specialty
    specialist_agent_type_1 = "TaxSpecialist_1"
    specialist_agent_type_2 = "TaxSpecialist_2"
    await TaxSpecialist.register(
        runtime=runtime,
        type=specialist_agent_type_1,
        factory=lambda: TaxSpecialist(
            description="A tax specialist 1",
            specialty=TaxSpecialty.PLANNING,
            system_messages=[SystemMessage(content="You are a tax specialist.")],
        ),
    )

    await TaxSpecialist.register(
        runtime=runtime,
        type=specialist_agent_type_2,
        factory=lambda: TaxSpecialist(
            description="A tax specialist 2",
            specialty=TaxSpecialty.DISPUTE_RESOLUTION,
            system_messages=[SystemMessage(content="You are a tax specialist.")],
        ),
    )

    # Add default subscriptions for each agent type
    await runtime.add_subscription(DefaultSubscription(agent_type=specialist_agent_type_1))
    await runtime.add_subscription(DefaultSubscription(agent_type=specialist_agent_type_2))

    # Start the runtime and send a message to agents on default topic
    runtime.start()
    await runtime.publish_message(ClientRequest("I need to have my tax for 2024 prepared."), topic_id=DefaultTopicId())
    await runtime.stop_when_idle()


await run_single_tenant_single_scope()


Tax specialist TaxSpecialist_1/default with specialty TaxSpecialty.PLANNING:
I need to have my tax for 2024 prepared.

Tax specialist TaxSpecialist_2/default with specialty TaxSpecialty.DISPUTE_RESOLUTION:
I need to have my tax for 2024 prepared.


In [5]:
async def run_multi_tenant_single_scope() -> None:
    # Create the runtime
    runtime = SingleThreadedAgentRuntime()

    # List of clients (tenants)
    tenants = ["ClientABC", "ClientXYZ"]

    # Initialize sessions and map the topic type to each TaxSpecialist agent type
    for specialty in TaxSpecialty:
        specialist_agent_type = f"TaxSpecialist_{specialty.value}"
        await TaxSpecialist.register(
            runtime=runtime,
            type=specialist_agent_type,
            factory=lambda specialty=specialty: TaxSpecialist(  # type: ignore
                description=f"A tax specialist in {specialty.value}.",
                specialty=specialty,
                system_messages=[SystemMessage(content=f"You are a tax specialist in {specialty.value}.")],
            ),
        )
        specialist_subscription = DefaultSubscription(agent_type=specialist_agent_type)
        await runtime.add_subscription(specialist_subscription)

    # Start the runtime
    runtime.start()

    # Publish client requests to their respective topics
    for tenant in tenants:
        topic_source = tenant  # The topic source is the client name
        topic_id = DefaultTopicId(source=topic_source)
        await runtime.publish_message(
            ClientRequest(f"{tenant} requires tax services."),
            topic_id=topic_id,
        )

    # Allow time for message processing
    await asyncio.sleep(1)

    # Stop the runtime when idle
    await runtime.stop_when_idle()


await run_multi_tenant_single_scope()


Tax specialist TaxSpecialist_planning/ClientABC with specialty TaxSpecialty.PLANNING:
ClientABC requires tax services.

Tax specialist TaxSpecialist_dispute_resolution/ClientABC with specialty TaxSpecialty.DISPUTE_RESOLUTION:
ClientABC requires tax services.

Tax specialist TaxSpecialist_compliance/ClientABC with specialty TaxSpecialty.COMPLIANCE:
ClientABC requires tax services.

Tax specialist TaxSpecialist_preparation/ClientABC with specialty TaxSpecialty.PREPARATION:
ClientABC requires tax services.

Tax specialist TaxSpecialist_planning/ClientXYZ with specialty TaxSpecialty.PLANNING:
ClientXYZ requires tax services.

Tax specialist TaxSpecialist_dispute_resolution/ClientXYZ with specialty TaxSpecialty.DISPUTE_RESOLUTION:
ClientXYZ requires tax services.

Tax specialist TaxSpecialist_compliance/ClientXYZ with specialty TaxSpecialty.COMPLIANCE:
ClientXYZ requires tax services.

Tax specialist TaxSpecialist_preparation/ClientXYZ with specialty TaxSpecialty.PREPARATION:
ClientXYZ req

In [6]:
async def run_single_tenant_multiple_scope() -> None:
    # Create the runtime
    runtime = SingleThreadedAgentRuntime()
    # Register TaxSpecialist agents for each specialty and add subscriptions
    for specialty in TaxSpecialty:
        specialist_agent_type = f"TaxSpecialist_{specialty.value}"
        await TaxSpecialist.register(
            runtime=runtime,
            type=specialist_agent_type,
            factory=lambda specialty=specialty: TaxSpecialist(  # type: ignore
                description=f"A tax specialist in {specialty.value}.",
                specialty=specialty,
                system_messages=[SystemMessage(content=f"You are a tax specialist in {specialty.value}.")],
            ),
        )
        specialist_subscription = TypeSubscription(topic_type=specialty.value, agent_type=specialist_agent_type)
        await runtime.add_subscription(specialist_subscription)

    # Start the runtime
    runtime.start()

    # Publish a ClientRequest to each specialist's topic
    for specialty in TaxSpecialty:
        topic_id = TopicId(type=specialty.value, source="default")
        await runtime.publish_message(
            ClientRequest(f"I need assistance with {specialty.value} taxes."),
            topic_id=topic_id,
        )

    # Allow time for message processing
    await asyncio.sleep(1)

    # Stop the runtime when idle
    await runtime.stop_when_idle()


await run_single_tenant_multiple_scope()


Tax specialist TaxSpecialist_planning/default with specialty TaxSpecialty.PLANNING:
I need assistance with planning taxes.

Tax specialist TaxSpecialist_dispute_resolution/default with specialty TaxSpecialty.DISPUTE_RESOLUTION:
I need assistance with dispute_resolution taxes.

Tax specialist TaxSpecialist_compliance/default with specialty TaxSpecialty.COMPLIANCE:
I need assistance with compliance taxes.

Tax specialist TaxSpecialist_preparation/default with specialty TaxSpecialty.PREPARATION:
I need assistance with preparation taxes.


In [7]:
async def run_multi_tenant_multiple_scope() -> None:
    # Create the runtime
    runtime = SingleThreadedAgentRuntime()

    # Define TypeSubscriptions for each specialty and tenant
    tenants = ["ClientABC", "ClientXYZ"]

    # Initialize agents for all specialties and add type subscriptions
    for specialty in TaxSpecialty:
        specialist_agent_type = f"TaxSpecialist_{specialty.value}"
        await TaxSpecialist.register(
            runtime=runtime,
            type=specialist_agent_type,
            factory=lambda specialty=specialty: TaxSpecialist(  # type: ignore
                description=f"A tax specialist in {specialty.value}.",
                specialty=specialty,
                system_messages=[SystemMessage(content=f"You are a tax specialist in {specialty.value}.")],
            ),
        )
        for tenant in tenants:
            specialist_subscription = TypeSubscription(
                topic_type=f"{tenant}_{specialty.value}", agent_type=specialist_agent_type
            )
            await runtime.add_subscription(specialist_subscription)

    # Start the runtime
    runtime.start()

    # Send messages for each tenant to each specialty
    for tenant in tenants:
        for specialty in TaxSpecialty:
            topic_id = TopicId(type=f"{tenant}_{specialty.value}", source=tenant)
            await runtime.publish_message(
                ClientRequest(f"{tenant} needs assistance with {specialty.value} taxes."),
                topic_id=topic_id,
            )

    # Allow time for message processing
    await asyncio.sleep(1)

    # Stop the runtime when idle
    await runtime.stop_when_idle()


await run_multi_tenant_multiple_scope()


Tax specialist TaxSpecialist_planning/ClientABC with specialty TaxSpecialty.PLANNING:
ClientABC needs assistance with planning taxes.

Tax specialist TaxSpecialist_dispute_resolution/ClientABC with specialty TaxSpecialty.DISPUTE_RESOLUTION:
ClientABC needs assistance with dispute_resolution taxes.

Tax specialist TaxSpecialist_compliance/ClientABC with specialty TaxSpecialty.COMPLIANCE:
ClientABC needs assistance with compliance taxes.

Tax specialist TaxSpecialist_preparation/ClientABC with specialty TaxSpecialty.PREPARATION:
ClientABC needs assistance with preparation taxes.

Tax specialist TaxSpecialist_planning/ClientXYZ with specialty TaxSpecialty.PLANNING:
ClientXYZ needs assistance with planning taxes.

Tax specialist TaxSpecialist_dispute_resolution/ClientXYZ with specialty TaxSpecialty.DISPUTE_RESOLUTION:
ClientXYZ needs assistance with dispute_resolution taxes.

Tax specialist TaxSpecialist_compliance/ClientXYZ with specialty TaxSpecialty.COMPLIANCE:
ClientXYZ needs assistan