<a href="https://colab.research.google.com/github/Denis2054/Building-Business-Ready-Generative-AI-Systems/blob/main/Chapter10/GenAISYS_%26_MAS_No_Interface.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Orchestrating AI Agents: A Strategic Framework from Controlled Systems to Swarm MAS (Multi-Agent System) Intelligence

Copyright 2025, Denis Rothman

Educational Multi-Agent Task Processor
Original Concept by: Denis Rothman

This program demonstrates how the principles of a multi-agent system can be applied to process a batch of tasks asynchronously. It showcases a two-stage process:
1. A "swarm" of worker agents processes initial tasks in parallel.
2. A "summarizer" agent takes the collective output of the swarm to perform a synthesis task, demonstrating cross-agent communication.

*No new external libraries are needed. The agents rely on asyncio and aiohttp; nest_asyncio, pandas, and IPython handle event-loop setup, task loading, and display.*

# Introduction

This program demonstrates a **Multi-Agent System (MAS)**, a key paradigm in artificial intelligence where multiple autonomous computational entities, known as "agents," interact to solve problems that are beyond the scope of a single agent. The architecture is designed to be educational: it illustrates foundational concepts of agent-based computing using only asyncio and aiohttp for asynchronous operations.

The core of this system is the **worker_agent**. Each instance of this asynchronous function represents a simple, reactive agent. It has a single, well-defined capability: to accept a task (an NLP problem description), interact with an external environment (the OpenAI API), and produce a result. **These agents operate autonomously, without knowledge of one another.** **The concept of a "swarm" is realized when the swarm_orchestrator dynamically creates a collection of these worker agents, one for each task in the provided list.** The orchestrator then uses asyncio.gather to dispatch the entire swarm concurrently. Because each agent spends most of its time waiting on the network, a single event loop can keep all requests in flight at once, so the batch typically finishes far sooner than it would if the tasks ran one after another. This mirrors the idea behind swarm intelligence, where the simultaneous effort of many simple agents solves a large problem efficiently.
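
The concurrent dispatch described above can be sketched without any network access. This is a minimal illustration, not the notebook's implementation: `fake_worker_agent` and `run_swarm` are hypothetical names, and `asyncio.sleep` stands in for the API call. In a notebook, `asyncio.run` would additionally need `nest_asyncio.apply()`, as the setup code later does.

```python
import asyncio
import time

async def fake_worker_agent(task, delay=0.1):
    """Hypothetical stand-in for worker_agent: sleeps instead of calling an API."""
    await asyncio.sleep(delay)  # simulates waiting on the network
    return f"result for {task}"

async def run_swarm(tasks):
    # One coroutine per task, all awaited together on a single event loop.
    coroutines = [fake_worker_agent(task) for task in tasks]
    # gather returns results in the same order as the input coroutines.
    return await asyncio.gather(*coroutines)

tasks = [f"task {i}" for i in range(20)]
start = time.perf_counter()
results = asyncio.run(run_swarm(tasks))
elapsed = time.perf_counter() - start
print(f"{len(results)} results in {elapsed:.2f}s")
```

Run sequentially, twenty 0.1-second waits would take about two seconds; run through `asyncio.gather`, the waits overlap and the batch takes roughly as long as one wait.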

**This system is further elevated by demonstrating a more complex agent interaction pattern known as a Task-Dependency Workflow**. This is showcased through the introduction of the **summarizer_agent**, a different type of agent whose operation is conditionally dependent on the output of the initial worker swarm.

This introduces a second stage to the workflow, illustrating a basic form of **inter-agent communication** (or more accurately, *indirect communication through data sharing*). The swarm_orchestrator, acting as a central controller, gathers the results from the worker swarm and passes the collected outputs to the summarizer agent, which in this notebook reads only the first ten of them. The summarizer then performs a different kind of task, synthesis and analysis, transforming the raw outputs into a high-level summary. This two-stage process (concurrent execution followed by sequential synthesis) is a common pattern in multi-agent system design, where agents with different specializations collaborate to achieve a final, composite goal.
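
The two-stage pattern can be sketched as follows. This is a simplified illustration under stated assumptions: `stub_worker`, `stub_summarizer`, and `two_stage` are hypothetical names, and no API is called. It shows the second stage depending on the first, and failed workers being filtered out before their output reaches the summarizer.

```python
import asyncio

async def stub_worker(task):
    """Hypothetical worker: fails on one task to show error handling."""
    await asyncio.sleep(0.01)
    if "bad" in task:
        raise ValueError(f"cannot process {task!r}")
    return f"report on {task}"

async def stub_summarizer(reports):
    """Hypothetical summarizer: sees only what the orchestrator passes in."""
    await asyncio.sleep(0.01)
    return f"{len(reports)} reports summarized"

async def two_stage(tasks):
    # Stage 1: concurrent workers; exceptions are returned as values, not raised.
    outcomes = await asyncio.gather(
        *(stub_worker(t) for t in tasks), return_exceptions=True
    )
    reports = [o for o in outcomes if not isinstance(o, Exception)]
    # Stage 2: runs only after stage 1 completes, and only if there is input.
    summary = await stub_summarizer(reports) if reports else None
    return reports, summary

reports, summary = asyncio.run(two_stage(["tokenize", "bad task", "parse"]))
print(summary)
```

The workers never call the summarizer directly; the orchestrator is the only component that knows about both stages.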

*It is important to distinguish between the core agent engine shown here and a full, enterprise-grade AI platform.* The code in this program represents the fundamental computational engine: the logic that defines agent behavior and orchestrates the swarm. A complete platform built for a corporation would include many additional layers around this core. These cloud-native features would typically include robust user management with roles and permissions, secure and scalable databases for storing tasks and results, integrated monitoring and logging for performance and security, and billing systems. **While the features of a full enterprise-grade platform are essential for a production-ready application, the code displayed here is the heart of the system, demonstrating the essential multi-agent processing logic.**

# Setting up the Environment

In [None]:
# API key
# Store your key in a file and read it (you can type it directly in the notebook, but it will be visible to anyone next to you)
from google.colab import drive
drive.mount('/content/drive')
# Read the key from the first line of the file and strip the trailing
# newline, which would otherwise end up in the Authorization header.
with open("drive/MyDrive/files/api_key.txt", "r") as f:
    API_KEY = f.readline().strip()

Mounted at /content/drive
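
Outside Colab, where Google Drive is not available, the key can be read from an environment variable instead. The sketch below is an assumption-laden alternative, not part of the notebook: `load_api_key` is a hypothetical helper, and the variable name `OPENAI_API_KEY` and the default file path are illustrative choices.

```python
import os

def load_api_key(path="api_key.txt", env_var="OPENAI_API_KEY"):
    """Return the key from env_var if set, else the first line of the file at path.

    Hypothetical helper; both default names are assumptions.
    """
    key = os.environ.get(env_var, "").strip()
    if key:
        return key
    with open(path, "r") as f:
        # strip() removes the trailing newline left by readline()
        return f.readline().strip()
```

A call such as `API_KEY = load_api_key("drive/MyDrive/files/api_key.txt")` would then work in both settings.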


In [None]:
import asyncio
import aiohttp
import nest_asyncio
import os
import time
from IPython.display import display, HTML
import pandas as pd

In [None]:
# --- Setup Functions ---

def setup_environment():
    """
    Installs necessary libraries and sets up the event loop.
    """
    try:
        import nest_asyncio
        nest_asyncio.apply()
    except ImportError:
        print("Installing nest_asyncio...")
        import subprocess
        import sys
        subprocess.check_call([sys.executable, "-m", "pip", "install", "nest_asyncio"])
        import nest_asyncio
        nest_asyncio.apply()

def load_tasks_from_file(filename="tasks.txt"):
    """
    Loads the task descriptions from a local file.
    """
    tasks = []
    try:
        if not os.path.exists(filename):
            print(f"{filename} not found. Downloading...")
            from IPython import get_ipython
            ipython = get_ipython()
            if ipython: # Check if running in an IPython environment
                ipython.system(f'curl -L https://raw.githubusercontent.com/Denis2054/Transformers_3rd_Edition/master/Chapter15/tasks.txt --output "{filename}"')
            else:
                # Fallback for standard Python script
                import requests
                url = "https://raw.githubusercontent.com/Denis2054/Transformers_3rd_Edition/master/Chapter15/tasks.txt"
                r = requests.get(url)
                with open(filename, 'wb') as f:
                    f.write(r.content)

        df = pd.read_csv(filename, header=None, on_bad_lines='skip', names=['Tasks'])
        print(f"Successfully loaded {len(df)} tasks from {filename}")
        tasks = df['Tasks'].dropna().tolist()
    except Exception as e:
        print(f"Error loading tasks: {e}")
    return tasks
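
Note that `pd.read_csv` treats commas as field separators by default, so a task description containing commas is not guaranteed to survive as a single field. If `tasks.txt` holds one plain-text task per line (an assumption about the file format), a line-based reader avoids the issue. `load_tasks_plain` below is a hypothetical alternative, not the notebook's loader.

```python
from pathlib import Path

def load_tasks_plain(filename="tasks.txt"):
    """Hypothetical alternative loader: one task per non-empty line."""
    lines = Path(filename).read_text(encoding="utf-8").splitlines()
    # Keep commas intact; drop blank lines and surrounding whitespace.
    return [line.strip() for line in lines if line.strip()]
```

This trades the tolerance of `on_bad_lines='skip'` for predictability: every non-empty line becomes exactly one task.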



# Agent Definitions

In [None]:
# --- Agent and Orchestrator Definitions ---

# In a multi-agent system, we have individual "agents" that perform tasks
# and an "orchestrator" or "controller" that manages them.

# Let's define our "Worker Agent"
# This agent's job is to take a single task (a prompt), send it to the
# OpenAI API, and return the result. It's an independent specialist.

async def worker_agent(session, task, api_key, model_name="gpt-4o-mini"):
    """
    Represents a single, autonomous agent that performs a task.
    It takes a task description and returns the API response.
    """
    url = "https://api.openai.com/v1/chat/completions"
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json"
    }
    payload = {
        "model": model_name,
        "messages": [
            {"role": "system", "content": "You are an expert Natural Language Processing exercise expert."},
            {"role": "assistant", "content": "1. You can explain any NLP task. 2. Create an example. 3. Solve the example."},
            {"role": "user", "content": task}
        ],
        "temperature": 0.1
    }

    # print(f"Worker Agent dispatched for task: '{task[:50]}...'")
    try:
        async with session.post(url, json=payload, headers=headers) as response:
            response.raise_for_status() # Raise an exception for bad status codes (4xx or 5xx)
            # content_type ignores parameters such as "; charset=utf-8"
            if response.content_type == 'application/json':
                return await response.json()
            else:
                text = await response.text()
                print(f"Error: Unexpected response content type: {response.headers.get('Content-Type')}")
                return {"error": text}
    except aiohttp.ClientError as e:
        print(f"An HTTP error occurred: {e}")
        return {"error": str(e)}

# Define a "Summarizer Agent"
# This agent takes the results from all the worker agents and creates a summary.
# This demonstrates an agent that relies on the output of other agents.

async def summarizer_agent(session, completed_tasks, api_key, model_name="gpt-4o-mini"):
    """
    An agent that synthesizes results from other agents into a summary.
    """
    print("\n--- Summarizer Agent Activated ---")
    print("Task: To summarize the findings from the worker swarm.")

    summary_input = "\n\n---\n\n".join(completed_tasks[:10])

    prompt = f"""
    The following are the results from several NLP task evaluations.
    Please provide a brief, high-level summary of the topics covered.
    Do not analyze every single task, but give a general overview of the types of problems solved.

    REPORTS:
    ---
    {summary_input}
    ---
    END OF REPORTS.

    Your summary:
    """

    url = "https://api.openai.com/v1/chat/completions"
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json"
    }
    payload = {
        "model": model_name,
        "messages": [
            {"role": "system", "content": "You are an expert AI analyst tasked with summarizing agent outputs."},
            {"role": "user", "content": prompt}
        ],
        "temperature": 0.5
    }

    try:
        async with session.post(url, json=payload, headers=headers) as response:
            response.raise_for_status()
            # content_type ignores parameters such as "; charset=utf-8"
            if response.content_type == 'application/json':
                return await response.json()
            else:
                text = await response.text()
                print(f"Summarizer Error: Unexpected response content type: {response.headers.get('Content-Type')}")
                return {"error": text}
    except aiohttp.ClientError as e:
        print(f"An HTTP error occurred in summarizer: {e}")
        return {"error": str(e)}


# Orchestrator Definition

In [None]:
# The "Swarm Orchestrator" now manages a two-stage process.
async def swarm_orchestrator(tasks, api_key, model_name="gpt-4o-mini"):
    """
    Manages a two-stage process:
    1. A swarm of worker agents to process a list of tasks in parallel.
    2. A summarizer agent to process the collected results.
    """
    print(f"\n--- Orchestrator starting: Managing a swarm of {len(tasks)} agents for model {model_name} ---")
    all_results = []

    async with aiohttp.ClientSession() as session:
        # --- STAGE 1: Worker Agent Swarm ---
        print("\n--- STAGE 1: Dispatching Worker Swarm ---")
        worker_coroutines = [worker_agent(session, task, api_key, model_name) for task in tasks]
        worker_responses = await asyncio.gather(*worker_coroutines, return_exceptions=True)
        print("\n--- All worker agents have completed their tasks. Processing results. ---")

        for i, response in enumerate(worker_responses):
            task_num = i + 1
            input_text = tasks[i]
            if isinstance(response, Exception):
                print(f"Task {task_num} failed with an exception: {response}")
                continue

            if response and 'choices' in response and response['choices']:
                content = response['choices'][0]['message']['content']
                all_results.append(f"Task {task_num}: {content}") # Collect results for summarizer
                try:
                    parts = input_text.split('Solve it:')
                    bb_task = parts[1].strip()
                    display_response(task_num, input_text, content.replace('\n', '<br>'), bb_task)
                except IndexError:
                    display_response(task_num, input_text, content.replace('\n', '<br>'), "Task Description Unavailable")
            else:
                print(f"Error in response for task {task_num}: {input_text}, Response: {response.get('error', response)}")

        # --- STAGE 2: Summarizer Agent ---
        print("\n--- STAGE 2: Dispatching Summarizer Agent ---")
        if all_results:
            summary_response = await summarizer_agent(session, all_results, api_key, model_name)
            if summary_response and 'choices' in summary_response and summary_response['choices']:
                summary_content = summary_response['choices'][0]['message']['content']
                display_summary(summary_content)
            else:
                print(f"Could not generate summary. Response: {summary_response.get('error', summary_response)}")
        else:
            print("No results to summarize.")
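
The orchestrator above sends every request at once, which may run into API rate limits on large task lists. One common way to cap the number of requests in flight is an `asyncio.Semaphore`. The sketch below is an illustration under stated assumptions: `bounded_gather` and `demo_agent` are hypothetical names, and the limit of 5 is arbitrary.

```python
import asyncio

async def bounded_gather(coroutine_factories, limit=10):
    """Run zero-argument coroutine factories with at most `limit` in flight."""
    semaphore = asyncio.Semaphore(limit)

    async def run_one(factory):
        async with semaphore:  # waits here while `limit` agents are active
            return await factory()

    return await asyncio.gather(
        *(run_one(f) for f in coroutine_factories), return_exceptions=True
    )

# Demo with a hypothetical agent that records peak concurrency.
stats = {"active": 0, "peak": 0}

async def demo_agent(i, stats):
    stats["active"] += 1
    stats["peak"] = max(stats["peak"], stats["active"])
    await asyncio.sleep(0.01)  # simulates the API call
    stats["active"] -= 1
    return i * 2

factories = [lambda i=i, s=stats: demo_agent(i, s) for i in range(50)]
results = asyncio.run(bounded_gather(factories, limit=5))
print(f"peak concurrency: {stats['peak']}, first results: {results[:3]}")
```

In the notebook's setting, each factory would wrap a call such as `worker_agent(session, task, api_key, model_name)`; results still come back in task order.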


# Helper Functions

In [None]:
# --- Helper Functions ---

def display_response(task_num, input_text, formatted_task, bb_task):
    """
    A simple display function to present the results from a worker agent.
    """
    html_content = f"""
    <html>
      <head>
        <style>
            body {{ font-family: sans-serif; margin: 1em; }}
            h1 {{ font-size: 1.2em; color: #2c3e50; border-bottom: 2px solid #3498db; padding-bottom: 5px;}}
            p {{ line-height: 1.6; color: #34495e; font-size: 0.9em; }}
            .task-card {{ background-color: #f9f9f9; border: 1px solid #ddd; border-left: 5px solid #3498db; padding: 15px; margin-bottom: 20px; border-radius: 5px; }}
        </style>
      </head>
      <body>
        <div class="task-card">
          <h1>Worker Agent Result for Task {task_num}: {bb_task}</h1>
          <p>{formatted_task}</p>
        </div>
      </body>
    </html>
    """
    display(HTML(html_content))

def display_summary(summary_content):
    """
    Displays the summarizer agent's final summary.
    """
    formatted_summary = summary_content.replace('\n', '<br>')
    html_content = f"""
    <html>
      <head>
        <style>
            body {{ font-family: sans-serif; margin: 1em; }}
            h1 {{ font-size: 1.3em; color: #27ae60; border-bottom: 2px solid #2ecc71; padding-bottom: 5px;}}
            p {{ line-height: 1.6; color: #34495e; }}
            .summary-card {{ background-color: #e8f8f5; border: 1px solid #a3e4d7; border-left: 5px solid #2ecc71; padding: 15px; margin-top: 20px; border-radius: 5px; }}
        </style>
      </head>
      <body>
        <div class="summary-card">
          <h1>Orchestrator's Final Summary</h1>
          <p>{formatted_summary}</p>
        </div>
      </body>
    </html>
    """
    display(HTML(html_content))


# The main execution block

In [None]:
# --- Main Execution Block ---

def main():
    """
    Main function to run the multi-agent simulation.
    """
    print("Setting up the educational multi-agent environment...")
    setup_environment()


    # --- Load Tasks ---
    tasks = load_tasks_from_file()
    if not tasks:
        print("❌ No tasks to process. Exiting.")
        return

    # --- Select Model and Run ---
    selected_model = "gpt-4o-mini"
    print(f"\nSelected Model: {selected_model}")

    start_time = time.time()

    # asyncio.run() blocks until the orchestrator and all of its agents
    # have finished. It works in standard Python and, thanks to
    # nest_asyncio, inside a notebook's already-running event loop.
    asyncio.run(swarm_orchestrator(tasks, API_KEY, selected_model))

    end_time = time.time()
    total_time = end_time - start_time
    num_tasks = len(tasks)
    avg_time_per_task = total_time / num_tasks if num_tasks > 0 else 0

    # Print the timing report with plain print calls.
    print("\n" + "="*50)
    print("SWARM PROCESSING COMPLETE")
    print("="*50)
    print(f"Total Response Time: {total_time:.2f} seconds")
    print(f"Total Tasks Processed: {num_tasks}")
    print(f"Average time per task: {avg_time_per_task:.4f} seconds")
    print("="*50 + "\n")


# To run in a Colab/Jupyter notebook, you would place `main()` in a cell and run it.
# If running as a standard .py file, the following block executes.
if __name__ == "__main__":
    main()

Setting up the educational multi-agent environment...
tasks.txt not found. Downloading...
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 17838  100 17838    0     0  64705      0 --:--:-- --:--:-- --:--:-- 64865
Successfully loaded 144 tasks from tasks.txt

Selected Model: gpt-4o-mini

--- Orchestrator starting: Managing a swarm of 144 agents for model gpt-4o-mini ---

--- STAGE 1: Dispatching Worker Swarm ---

--- All worker agents have completed their tasks. Processing results. ---



--- STAGE 2: Dispatching Summarizer Agent ---

--- Summarizer Agent Activated ---
Task: To summarize the findings from the worker swarm.



SWARM PROCESSING COMPLETE
Total Response Time: 51.93 seconds
Total Tasks Processed: 144
Average time per task: 0.3606 seconds
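
The reported average is the total wall-clock time (both stages) divided by the number of tasks. Because the worker requests overlap, it is best read as an amortized cost per task rather than the latency of any single request, which is likely much higher. The arithmetic, using the figures from the run above:

```python
total_time = 51.93  # seconds, from the run above
num_tasks = 144

avg_time_per_task = total_time / num_tasks  # amortized, not per-request latency
throughput = num_tasks / total_time         # tasks completed per second

print(f"Amortized time per task: {avg_time_per_task:.4f} s")
print(f"Throughput: {throughput:.2f} tasks/s")
```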

