In [12]:
import openai
import threading
import time
import random
import json
from typing import List, Dict, Any, Callable
import queue
from enum import Enum, auto

# -------------------- Enums for Consistent Mappings --------------------

class AgentType(Enum):
    PLOT_GENERATOR = auto()
    PLOT_SUMMARISER = auto()
    NARRATION_GENERATOR = auto()
    CHARACTER_CREATOR = auto()
    PLOT_CRITIQUE = auto()
    NARRATION_CRITIQUE = auto()
    CHARACTER_CRITIQUE = auto()
    FLOW_CRITIQUE = auto()

class MessageType(Enum):
    TASK = auto()
    CRITIQUE = auto()
    UPDATE = auto()
    QUERY = auto()
    RESPONSE = auto()

class RoleEnum(Enum):
    STORYTELLER = auto()
    ARCHITECT = auto()
    NARRATOR = auto()
    CRITIQUE = auto()

# -------------------- Message Class --------------------

class Message:
    """
    Encapsulates messages exchanged between agents.
    """
    def __init__(self, sender_id: int, receiver_id: int, content: Dict[str, Any], msg_type: MessageType):
        self.sender_id = sender_id
        self.receiver_id = receiver_id
        self.content = content
        self.msg_type = msg_type

    def __str__(self):
        return f"Message(from={self.sender_id}, to={self.receiver_id}, type={self.msg_type.name}, content={self.content})"

# -------------------- Narration State --------------------

class NarrationState:
    """
    Maintains the global state of the narration.
    """
    def __init__(self):
        self.plot = ""
        self.characters = {}
        self.lock = threading.Lock()

    def update_plot(self, event: str):
        with self.lock:
            self.plot += f" {event}"
            print(f"\n[Plot Updated]: {self.plot}\n")

    def summarize_plot(self):
        with self.lock:
            return self.plot.strip()

    def add_character(self, name: str, description: str):
        with self.lock:
            if name not in self.characters:
                self.characters[name] = description
                print(f"[Character Added]: {name} - {description}")

    def get_characters(self) -> Dict[str, str]:
        with self.lock:
            return self.characters.copy()

    def get_state(self) -> Dict[str, Any]:
        with self.lock:
            return {
                "plot": self.plot.strip(),
                "characters": self.characters.copy()
            }

# -------------------- OpenAI API Wrapper --------------------

class OpenAIAPIWrapper:
    """
    Handles interactions with the OpenAI API.
    """
    def __init__(self, api_key: str, model: str = "gpt-4o-mini", temperature: float = 0.7):
        openai.api_key = api_key
        self.model = model
        self.temperature = temperature

    def generate_response(self, prompt: str, as_json: bool = False) -> str:
        """
        Generates a response from the OpenAI API based on the given prompt.
        If as_json is True, expects the LLM to return a JSON-formatted string.
        """
        try:
            response = openai.ChatCompletion.create(
                model=self.model,
                messages=[
                    {"role": "system", "content": "You are a creative storyteller."},
                    {"role": "user", "content": prompt},
                ],
                temperature=self.temperature,
            )
            content = response.choices[0].message.content.strip()
            if as_json:
                # Ensure the response is valid JSON
                if content.startswith("{") and content.endswith("}"):
                    return content
                else:
                    # Attempt to extract JSON from the response
                    json_start = content.find("{")
                    json_end = content.rfind("}") + 1
                    if json_start != -1 and json_end != -1:
                        return content[json_start:json_end]
                    else:
                        raise ValueError("No JSON found in the response.")
            else:
                return content
        except Exception as e:
            print(f"[OpenAIAPIWrapper] Error generating response: {e}")
            return ""

# -------------------- Base Agent Class --------------------

class Agent(threading.Thread):
    """
    Base class for all agents.
    """
    def __init__(self, 
                 agent_id: int,
                 agent_type: AgentType,
                 api_wrapper: OpenAIAPIWrapper,
                 narration_state: NarrationState,
                 swarm: 'Swarm'):
        super().__init__()
        self.agent_id = agent_id
        self.agent_type = agent_type
        self.api_wrapper = api_wrapper
        self.narration_state = narration_state
        self.swarm = swarm
        self.inbox = queue.Queue()
        self.memory_buffer: List[str] = []
        self.running = True

    def run(self):
        while self.running:
            try:
                message: Message = self.inbox.get(timeout=1)  # Wait for a message
                if message:
                    self.handle_message(message)
                    self.inbox.task_done()
            except queue.Empty:
                continue

    def handle_message(self, message: Message):
        """
        Handles incoming messages based on their type.
        """
        print(f"Agent {self.agent_id} ({self.agent_type.name}) received {message.msg_type.name} from Agent {message.sender_id}: {message.content}")
        self.memory_buffer.append(f"From Agent {message.sender_id}: {message.content}")

        if message.msg_type == MessageType.TASK:
            self.process_task(message.content)
        elif message.msg_type == MessageType.CRITIQUE:
            self.process_critique(message.content)
        elif message.msg_type == MessageType.UPDATE:
            self.process_update(message.content)
        elif message.msg_type == MessageType.QUERY:
            self.process_query(message.content)
        elif message.msg_type == MessageType.RESPONSE:
            self.process_response(message.content)

    def process_task(self, content: Dict[str, Any]):
        """
        Processes task messages. To be overridden by subclasses.
        """
        pass

    def process_critique(self, content: Dict[str, Any]):
        """
        Processes critique messages. To be overridden by subclasses.
        """
        pass

    def process_update(self, content: Dict[str, Any]):
        """
        Processes update messages. To be overridden by subclasses.
        """
        pass

    def process_query(self, content: Dict[str, Any]):
        """
        Processes query messages. To be overridden by subclasses.
        """
        pass

    def process_response(self, content: Dict[str, Any]):
        """
        Processes response messages. To be overridden by subclasses.
        """
        pass

    def send_message(self, receiver_id: int, content: Dict[str, Any], msg_type: MessageType):
        """
        Sends a message to another agent via the swarm.
        """
        message = Message(sender_id=self.agent_id, receiver_id=receiver_id, content=content, msg_type=msg_type)
        self.swarm.send_message(message)

    def stop_agent(self):
        self.running = False

# -------------------- Specialized Agent Classes --------------------

# Generators

class PlotGenerator(Agent):
    def __init__(self, agent_id: int, api_wrapper: OpenAIAPIWrapper, narration_state: NarrationState, swarm: 'Swarm'):
        super().__init__(agent_id, AgentType.PLOT_GENERATOR, api_wrapper, narration_state, swarm)

    def process_task(self, content: Dict[str, Any]):
        """
        Generates a new plot event based on instructions.
        Expects content to have 'instructions'.
        """
        instructions = content.get("instructions", "")
        current_plot = self.narration_state.summarize_plot()
        characters = self.narration_state.get_characters()

        prompt = (
            f"Current Plot: {current_plot}\n"
            f"Characters: {', '.join(characters.keys())}\n"
            f"Instructions: {instructions}\n"
            f"Generate the next plot event in JSON format with an 'event' field."
        )

        response = self.api_wrapper.generate_response(prompt, as_json=True)
        if response:
            try:
                response_data = json.loads(response)
                event = response_data.get("event")
                if event:
                    self.narration_state.update_plot(event)
                    # Broadcast the new event to critiquers
                    self.swarm.broadcast(Message(
                        sender_id=self.agent_id,
                        receiver_id=0,  # 0 signifies broadcast
                        content={"event": event},
                        msg_type=MessageType.UPDATE
                    ))
            except json.JSONDecodeError:
                print(f"[PlotGenerator {self.agent_id}] Failed to parse JSON response.")

# Similar specialized classes can be created for PlotSummariser, NarrationGenerator, CharacterCreator, and Critiquers.

class PlotSummariser(Agent):
    def __init__(self, agent_id: int, api_wrapper: OpenAIAPIWrapper, narration_state: NarrationState, swarm: 'Swarm'):
        super().__init__(agent_id, AgentType.PLOT_SUMMARISER, api_wrapper, narration_state, swarm)

    def process_task(self, content: Dict[str, Any]):
        """
        Summarizes the current plot.
        """
        current_plot = self.narration_state.summarize_plot()
        prompt = f"Summarize the following plot in a concise paragraph:\n\n{current_plot}"
        response = self.api_wrapper.generate_response(prompt)
        summary = response.strip()
        print(f"[PlotSummariser {self.agent_id}] Summary: {summary}")

class NarrationGenerator(Agent):
    def __init__(self, agent_id: int, api_wrapper: OpenAIAPIWrapper, narration_state: NarrationState, swarm: 'Swarm'):
        super().__init__(agent_id, AgentType.NARRATION_GENERATOR, api_wrapper, narration_state, swarm)

    def process_task(self, content: Dict[str, Any]):
        """
        Generates narration based on the current state.
        """
        current_state = self.narration_state.get_state()
        prompt = (
            f"Current Plot: {current_state['plot']}\n"
            f"Characters: {', '.join(current_state['characters'].keys())}\n"
            f"Generate a narration paragraph that seamlessly integrates the latest plot event."
        )
        response = self.api_wrapper.generate_response(prompt)
        narration = response.strip()
        if narration:
            self.narration_state.update_plot(narration)
            # Broadcast the new narration
            self.swarm.broadcast(Message(
                sender_id=self.agent_id,
                receiver_id=0,
                content={"narration": narration},
                msg_type=MessageType.UPDATE
            ))

class CharacterCreator(Agent):
    def __init__(self, agent_id: int, api_wrapper: OpenAIAPIWrapper, narration_state: NarrationState, swarm: 'Swarm'):
        super().__init__(agent_id, AgentType.CHARACTER_CREATOR, api_wrapper, narration_state, swarm)

    def process_task(self, content: Dict[str, Any]):
        """
        Creates a new character based on instructions.
        Expects content to have 'description'.
        """
        description = content.get("description", "")
        current_characters = self.narration_state.get_characters()

        prompt = (
            f"Existing Characters: {', '.join(current_characters.keys())}\n"
            f"Description: {description}\n"
            f"Create a new character in JSON format with 'name' and 'description' fields."
        )

        response = self.api_wrapper.generate_response(prompt, as_json=True)
        if response:
            try:
                response_data = json.loads(response)
                name = response_data.get("name")
                desc = response_data.get("description")
                if name and desc:
                    self.narration_state.add_character(name, desc)
                    # Broadcast the new character
                    self.swarm.broadcast(Message(
                        sender_id=self.agent_id,
                        receiver_id=0,
                        content={"name": name, "description": desc},
                        msg_type=MessageType.UPDATE
                    ))
            except json.JSONDecodeError:
                print(f"[CharacterCreator {self.agent_id}] Failed to parse JSON response.")

# Critiquers

class PlotCritique(Agent):
    def __init__(self, agent_id: int, api_wrapper: OpenAIAPIWrapper, narration_state: NarrationState, swarm: 'Swarm'):
        super().__init__(agent_id, AgentType.PLOT_CRITIQUE, api_wrapper, narration_state, swarm)

    def process_update(self, content: Dict[str, Any]):
        """
        Critiques the latest plot event.
        """
        event = content.get("event")
        if event:
            prompt = f"Provide a constructive critique for the following plot event:\n\n{event}"
            response = self.api_wrapper.generate_response(prompt)
            critique = response.strip()
            print(f"[PlotCritique {self.agent_id}] Critique: {critique}")

class NarrationCritique(Agent):
    def __init__(self, agent_id: int, api_wrapper: OpenAIAPIWrapper, narration_state: NarrationState, swarm: 'Swarm'):
        super().__init__(agent_id, AgentType.NARRATION_CRITIQUE, api_wrapper, narration_state, swarm)

    def process_update(self, content: Dict[str, Any]):
        """
        Critiques the latest narration.
        """
        narration = content.get("narration")
        if narration:
            prompt = f"Provide a constructive critique for the following narration:\n\n{narration}"
            response = self.api_wrapper.generate_response(prompt)
            critique = response.strip()
            print(f"[NarrationCritique {self.agent_id}] Critique: {critique}")

class CharacterCritique(Agent):
    def __init__(self, agent_id: int, api_wrapper: OpenAIAPIWrapper, narration_state: NarrationState, swarm: 'Swarm'):
        super().__init__(agent_id, AgentType.CHARACTER_CRITIQUE, api_wrapper, narration_state, swarm)

    def process_update(self, content: Dict[str, Any]):
        """
        Critiques the latest character addition.
        """
        name = content.get("name")
        description = content.get("description")
        if name and description:
            prompt = f"Provide a constructive critique for the following character:\n\nName: {name}\nDescription: {description}"
            response = self.api_wrapper.generate_response(prompt)
            critique = response.strip()
            print(f"[CharacterCritique {self.agent_id}] Critique: {critique}")

class FlowCritique(Agent):
    def __init__(self, agent_id: int, api_wrapper: OpenAIAPIWrapper, narration_state: NarrationState, swarm: 'Swarm'):
        super().__init__(agent_id, AgentType.FLOW_CRITIQUE, api_wrapper, narration_state, swarm)

    def process_update(self, content: Dict[str, Any]):
        """
        Critiques the overall flow of the narration.
        """
        current_state = self.narration_state.get_state()
        prompt = (
            f"Provide a critique on the flow and coherence of the current narration based on the following state:\n\n"
            f"Plot: {current_state['plot']}\n"
            f"Characters: {', '.join(current_state['characters'].keys())}"
        )
        response = self.api_wrapper.generate_response(prompt)
        critique = response.strip()
        print(f"[FlowCritique {self.agent_id}] Critique: {critique}")

# -------------------- Swarm Class --------------------

class Swarm:
    """
    Manages the collection of agents and facilitates their interactions.
    """
    def __init__(self, 
                 num_agents: Dict[AgentType, int],
                 api_key: str,
                 model: str = "gpt-4o-mini",
                 temperature: float = 0.7):
        """
        Initializes the swarm with specified numbers of each agent type.
        """
        self.api_wrapper = OpenAIAPIWrapper(api_key, model, temperature)
        self.narration_state = NarrationState()
        self.agents: Dict[int, Agent] = {}
        self.agent_lock = threading.Lock()
        self.next_agent_id = 1

        # Create agents based on the num_agents dictionary
        for agent_type, count in num_agents.items():
            for _ in range(count):
                agent = self.create_agent(agent_type)
                self.agents[agent.agent_id] = agent

    def create_agent(self, agent_type: AgentType) -> Agent:
        """
        Creates an agent based on the AgentType.
        """
        agent_id = self.next_agent_id
        self.next_agent_id += 1

        if agent_type == AgentType.PLOT_GENERATOR:
            return PlotGenerator(agent_id, self.api_wrapper, self.narration_state, self)
        elif agent_type == AgentType.PLOT_SUMMARISER:
            return PlotSummariser(agent_id, self.api_wrapper, self.narration_state, self)
        elif agent_type == AgentType.NARRATION_GENERATOR:
            return NarrationGenerator(agent_id, self.api_wrapper, self.narration_state, self)
        elif agent_type == AgentType.CHARACTER_CREATOR:
            return CharacterCreator(agent_id, self.api_wrapper, self.narration_state, self)
        elif agent_type == AgentType.PLOT_CRITIQUE:
            return PlotCritique(agent_id, self.api_wrapper, self.narration_state, self)
        elif agent_type == AgentType.NARRATION_CRITIQUE:
            return NarrationCritique(agent_id, self.api_wrapper, self.narration_state, self)
        elif agent_type == AgentType.CHARACTER_CRITIQUE:
            return CharacterCritique(agent_id, self.api_wrapper, self.narration_state, self)
        elif agent_type == AgentType.FLOW_CRITIQUE:
            return FlowCritique(agent_id, self.api_wrapper, self.narration_state, self)
        else:
            raise ValueError(f"Unknown AgentType: {agent_type}")

    def start_swarm(self):
        """
        Starts all agent threads.
        """
        for agent in self.agents.values():
            agent.start()
        print(f"[Swarm] Swarm started with {len(self.agents)} agents.")

    def stop_swarm(self):
        """
        Stops all agent threads gracefully.
        """
        for agent in self.agents.values():
            agent.stop_agent()
        for agent in self.agents.values():
            agent.join()
        print("[Swarm] Swarm stopped.")

    def send_message(self, message: Message):
        """
        Sends a message to a specific agent.
        If receiver_id is 0, it's a broadcast message to all agents.
        """
        if message.receiver_id == 0:
            self.broadcast(message)
        else:
            with self.agent_lock:
                receiver = self.agents.get(message.receiver_id)
                if receiver:
                    receiver.inbox.put(message)
                    print(f"[Swarm] {message}")
                else:
                    print(f"[Swarm] Agent {message.receiver_id} not found.")

    def broadcast(self, message: Message):
        """
        Broadcasts a message to all agents except the sender.
        """
        with self.agent_lock:
            for agent_id, agent in self.agents.items():
                if agent_id != message.sender_id:
                    agent.inbox.put(message)
        print(f"[Swarm] Broadcast from Agent {message.sender_id}: {message.content}")

    def add_initial_narration(self):
        """
        Adds initial narration to kickstart the narrative.
        """
        initial_event = "The adventure begins in a quiet village surrounded by dense forests."
        self.narration_state.update_plot(initial_event)
        # Broadcast the initial event to critiquers
        for agent in self.agents.values():
            if isinstance(agent, (PlotCritique, NarrationCritique, CharacterCritique, FlowCritique)):
                message = Message(
                    sender_id=0,  # 0 signifies system or initial message
                    receiver_id=agent.agent_id,
                    content={"event": initial_event},
                    msg_type=MessageType.UPDATE
                )
                agent.inbox.put(message)

    def add_character(self, name: str, description: str):
        """
        Adds a new character to the narration.
        """
        self.narration_state.add_character(name, description)
        # Broadcast the new character to critiquers
        for agent in self.agents.values():
            if isinstance(agent, (PlotCritique, NarrationCritique, CharacterCritique, FlowCritique)):
                message = Message(
                    sender_id=0,
                    receiver_id=agent.agent_id,
                    content={"name": name, "description": description},
                    msg_type=MessageType.UPDATE
                )
                agent.inbox.put(message)

# -------------------- Initialization and Usage --------------------

def main():
    # Define the number of each type of agent in the swarm
    num_agents = {
        AgentType.PLOT_GENERATOR: 1,
        AgentType.PLOT_SUMMARISER: 1,
        AgentType.NARRATION_GENERATOR: 1,
        AgentType.CHARACTER_CREATOR: 1,
        AgentType.PLOT_CRITIQUE: 1,
        AgentType.NARRATION_CRITIQUE: 1,
        AgentType.CHARACTER_CRITIQUE: 1,
        AgentType.FLOW_CRITIQUE: 1
    }

    # Initialize the swarm
    OPENAI_API_KEY = "your-openai-api-key"  # Replace with your actual OpenAI API key
    swarm = Swarm(num_agents=num_agents, api_key=OPENAI_API_KEY)

    # Start all agents
    swarm.start_swarm()

    # Add initial narration
    swarm.add_initial_narration()

    # Example: Add a new character after some time
    time.sleep(5)
    swarm.add_character("Aria", "A brave knight with a mysterious past.")

    # Let the swarm run for a certain duration
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        print("\n[Main] Stopping the swarm...")
        swarm.stop_swarm()

In [11]:
import os
from dotenv import load_dotenv

load_dotenv()

# -------------------- Example Usage --------------------

if __name__ == "__main__":
    # Replace 'your-api-key' with your actual OpenAI API key
    API_KEY = os.environ.get("OPENAI_API_KEY")

    # Initialize swarm with desired number of agents
    swarm = Swarm(num_agents=10, api_key=API_KEY)
    
    # Define and add tools to the ToolManager
    summarize_tool = Tool(
        name="summarize_plot",
        function=summarize_plot,
        description="Summarizes the current plot."
    )
    conflict_tool = Tool(
        name="add_conflict",
        function=add_conflict,
        description="Adds a conflict to a given event."
    )
    swarm.add_tool(summarize_tool)
    swarm.add_tool(conflict_tool)

    # Start the swarm
    swarm.start_swarm()

    # Assign initial tasks to kickstart the narrative
    swarm.add_initial_tasks()

    # Start pairwise interactions
    swarm.run_pairwise_interactions(interaction_interval=10.0)  # Every 10 seconds

    # Allow some time for agents to process tasks and interact
    try:
        # Run the simulation for a certain period
        simulation_time = 120  # seconds
        start_time = time.time()
        while time.time() - start_time < simulation_time:
            time.sleep(1)
    except KeyboardInterrupt:
        pass

    # Retrieve and print the final narration state
    narration = swarm.get_narration()
    print("\nFinal Narration State:")
    print(f"Plot: {narration['plot']}")
    print("Characters:")
    for name, desc in narration['characters'].items():
        print(f" - {name}: {desc}")

    # Stop the swarm
    swarm.stop_swarm()


[ToolManager] Tool added: summarize_plot
[ToolManager] Tool added: add_conflict
[Swarm] Swarm started with 10 agents.
[Swarm] Assigned task to Agent 2: {'type': <MessageType.TASK: 'task'>, 'content': 'Start the adventure with an intriguing event.'}
[Swarm] Assigned task to Agent 8: {'type': <MessageType.ADD_CHARACTER: 'add_character'>, 'content': 'Eve: A mysterious sorceress with unknown motives.'}
Agent 2 received message from Agent 0: Start the adventure with an intriguing event.
Agent 8 received message from Agent 0: Eve: A mysterious sorceress with unknown motives.
[Character Added]: Eve - A mysterious sorceress with unknown motives.
[Swarm] Agent 8 broadcasted message: Eve: A mysterious sorceress with unknown motives.
[Swarm] Agent 8 broadcasted message: Eve: A mysterious sorceress with unknown motives.
[Swarm] Agent 8 broadcasted message: Eve: A mysterious sorceress with unknown motives.
[Swarm] Agent 8 broadcasted message: Eve: A mysterious sorceress with unknown motives.
[Swarm