# Multi-Agents Orchestration Notebook

Ce notebook illustre un pipeline complet pour orchestrer la collaboration de plusieurs agents (Coder, Reviewer, Admin) autour d’un **notebook cible** à finaliser.

---

### Étapes principales

1. **Sélection du notebook à finaliser** (via un prompt ou dropdown `ipywidgets`).
2. **Objectif facultatif** : l’utilisateur peut préciser un objectif global qui sera injecté en contexte si présent.
3. **Chargement d’un état partagé** (`NotebookState`) permettant :
   - de conserver un cache d’exécution (pour éviter de relancer Papermill après chaque modification si le notebook n’a pas changé)  
   - de gérer les modifications successives du `Coder`  
4. **Définition des plugins** (Coder, Reviewer, Admin) :
   - Coder : `update_cell`  
   - Reviewer : `get_notebook_status`  
   - Admin : `submit_for_approval`
5. **Création d’un `AgentGroupChat`** multi-agents sous Semantic Kernel, plus un système de **function calling** centralisé pour manipuler le notebook.
6. **Boucle itérative** gérée par des stratégies de sélection et de terminaison (Semantic Kernel) pour arrêter la conversation une fois le notebook approuvé.

---


In [7]:
# Cellule Code : Installation
%pip install papermill nbformat ipywidgets semantic-kernel --quiet


Note: you may need to restart the kernel to use updated packages.


### Configuration du Logger

Ici, nous activons un logger verbeux, avec couleurs, pour afficher toutes les actions (modifications du notebook, appels de fonctions, messages entre agents, etc.).


In [8]:
# Cellule Code : Imports et configuration
import os
import json
import hashlib
import logging
import nbformat
import papermill as pm
from datetime import datetime

# Pour la conversation multi-agents
from semantic_kernel import Kernel
from semantic_kernel.agents import ChatCompletionAgent, AgentGroupChat
from semantic_kernel.connectors.ai.open_ai import OpenAIChatCompletion

# On importe la base pour nos stratégies
from semantic_kernel.agents.strategies.selection.selection_strategy import SelectionStrategy
from semantic_kernel.agents.strategies.termination.termination_strategy import TerminationStrategy
from semantic_kernel.contents import ChatMessageContent, AuthorRole

# Configuration d'un logger simple
logger = logging.getLogger("Orchestration")
logger.setLevel(logging.DEBUG)
if not logger.handlers:
    ch = logging.StreamHandler()
    ch.setLevel(logging.DEBUG)
    formatter = logging.Formatter("%(asctime)s [%(levelname)s] %(name)s - %(message)s")
    ch.setFormatter(formatter)
    logger.addHandler(ch)

logger.info("Logger ready. Imports done.")


2025-02-11 22:21:29,074 [INFO] Orchestration - Logger ready. Imports done.


## Sélection du notebook à finaliser et Objectif facultatif

### Définition de l'état du Notebook: la classe NotebookState


In [9]:
import os
import json
import nbformat
import papermill as pm
import hashlib
import logging


class NotebookState:
    """
    Version simplifiée de la gestion d'un notebook :
      - Charge un notebook depuis `notebook_path`
      - Met à jour directement sur disque lors de `update_cell()`
      - Exécute directement en place (papermill input_path = output_path)
    """

    def __init__(self, notebook_path: str):
        self.notebook_path = notebook_path
        self._cached_notebook = None
        self._is_approved = False
        self._load_notebook()

    def _load_notebook(self):
        """Lit le notebook sur disque et le stocke en mémoire."""
        if not os.path.exists(self.notebook_path):
            raise FileNotFoundError(f"Notebook introuvable: {self.notebook_path}")

        with open(self.notebook_path, "r", encoding="utf-8") as f:
            self._cached_notebook = nbformat.read(f, as_version=4)

        logger.debug(f"Notebook '{self.notebook_path}' chargé, "
                     f"nb de cellules={len(self._cached_notebook.cells)}.")

    def get_notebook_json(self) -> str:
        """
        Retourne le contenu du notebook en mémoire au format JSON.
        (Ne relance plus Papermill automatiquement).
        """
        raw_json = json.dumps(self._cached_notebook, indent=2, ensure_ascii=False)
        return raw_json

    # Dans la classe NotebookState, ajouter une méthode pour logger le JSON
    def log_notebook_state(self):
        """Log le contenu JSON du notebook (version simplifiée pour les logs)."""
        notebook_json = self.get_notebook_json()
        logger.debug(f"[NotebookState] Current notebook state (simplified):\n{notebook_json[:10000]}...")  # Truncate pour lisibilité


    def update_cell(self, cell_index: int, new_source: str):
        """Modifie immédiatement la source d'une cellule, puis sauvegarde sur disque."""
        old = self._cached_notebook.cells[cell_index].source
        self._cached_notebook.cells[cell_index].source = new_source

        logger.info(f"[NotebookState] Mise à jour de la cellule {cell_index}")
        logger.debug(f"Ancien contenu:\n{old}")
        logger.debug(f"Nouveau contenu:\n{new_source}")

        # Sauvegarde immédiate
        self.save_notebook()
        logger.debug(f"[update_cell] Modification et sauvegarde effectuées pour la cellule {cell_index}.")

    def execute_notebook(self):
        """
        Exécute le notebook "en place", i.e. input=output sur le même fichier.
        On sauvegarde d'abord pour être sûr que le disque est à jour.
        """
        logger.info(f"[NotebookState] Exécution Papermill sur {self.notebook_path} (in place).")
        self.save_notebook()  # S'assure que la version disque est à jour avant exécution

        try:
            pm.execute_notebook(
                self.notebook_path,
                self.notebook_path,      # <--- exécution en place
                kernel_name="python3",
                progress_bar=False,
                log_output=True
            )
        except Exception as e:
            logger.error(f"[execute_notebook] Erreur lors de l'exécution: {e}")
            raise

        # On recharge le contenu avec les outputs produits
        self._load_notebook()

        logger.info("[NotebookState] Exécution OK, notebook mis à jour (in place).")

    def save_notebook(self, path: str = ""):
        """
        Sauvegarde en JSON. 
        Si `path` est vide, on réécrit dans `self.notebook_path`.
        """
        if not path:
            path = self.notebook_path

        # On calcule un hash avant sauvegarde (optionnel)
        current_content = json.dumps(self._cached_notebook, sort_keys=True)
        current_hash = hashlib.md5(current_content.encode('utf-8')).hexdigest()

        try:
            with open(path, "w", encoding="utf-8") as f:
                nbformat.write(self._cached_notebook, f)
            logger.info(f"[NotebookState] Notebook sauvegardé sous {path}")
        except Exception as e:
            logger.error(f"[save_notebook] Erreur de sauvegarde: {e}")
            raise

        # Vérification après sauvegarde (optionnel)
        try:
            with open(path, "r", encoding="utf-8") as f:
                saved_notebook = nbformat.read(f, as_version=4)
            saved_content = json.dumps(saved_notebook, sort_keys=True)
            saved_hash = hashlib.md5(saved_content.encode('utf-8')).hexdigest()
        except Exception as e:
            logger.error(f"[save_notebook] Erreur lors de la relecture du notebook: {e}")
            raise

        if current_hash != saved_hash:
            logger.warning("[save_notebook] Le contenu relu ne correspond pas au contenu en mémoire!")
        else:
            logger.debug("[save_notebook] Vérification OK, contenu identique.")

        # On met à jour en mémoire si on a sauvegardé dans un autre path
        if path != self.notebook_path:
            self.notebook_path = path
            self._load_notebook()

    # Optionnel : gestion de l'approbation
    def is_approved(self) -> bool:
        return self._is_approved

    def set_approved(self, approved: bool = True):
        self._is_approved = approved
        logger.info(f"[NotebookState] set_approved({approved}).")


### Test du Notebook state

In [10]:
import shutil
import logging



# Exemple de texte de tâche DBpedia
DBPEDIA_TASK_DESCRIPTION = (
    "Créer un notebook Python permettant de requêter DBpedia (SPARQL) et d’afficher "
    "un graphique final avec rdflib ou SPARQLWrapper et plotly.\n\n"
    "**Résultat attendu** :\n"
    "1. Une requête complexe sur DBpedia (avec agrégats) correctement exécutée.\n"
    "2. Un graphique plotly **pertinent** reflétant les données issues de DBpedia.\n"
)

def apply_task_description(notebook_state: NotebookState, task_description: str) -> bool:
    """Injecte la description de tâche dans la cellule appropriée."""
    updated = False
    for idx, cell in enumerate(notebook_state._cached_notebook.cells):
        if cell.cell_type == "markdown" and "## 0. Objectif du Notebook" in cell.source:
            new_source = cell.source
            if "{{TASK_DESCRIPTION}}" in new_source:
                new_source = new_source.replace(
                    "{{TASK_DESCRIPTION}}",
                    f"```markdown\n{task_description}\n```"  # Bloc markdown pour la lisibilité
                )
                new_source = new_source.replace("TASK_INSTRUCTION_PLACEHOLDER", "")
                
                # La méthode update_cell() de NotebookState
                # sauvegarde immédiatement le fichier modifié.
                notebook_state.update_cell(idx, new_source)
                
                logger.debug(f"[apply_task_description] Cellule {idx} mise à jour.")
                updated = True
            # On quitte la boucle une fois la bonne cellule trouvée.
            break
    return updated


SOURCE_NOTEBOOK = "Notebook-Template.ipynb"
DEST_NOTEBOOK = "Notebook-TaskDBpedia.ipynb"

# 1) On copie d'abord le template pour ne pas le toucher.
shutil.copy2(SOURCE_NOTEBOOK, DEST_NOTEBOOK)
logger.info(f"Copie du template -> {DEST_NOTEBOOK}")

# 2) On charge le nouveau notebook dans NotebookState
notebook_state = NotebookState(DEST_NOTEBOOK)

# 3) On injecte la description DBpedia
changed = apply_task_description(notebook_state, DBPEDIA_TASK_DESCRIPTION)

if changed:
    logger.info("Le placeholder DBpedia a bien été injecté.")

    # 4) Exécution du notebook en place (optionnel)
    logger.info("Exécution du notebook en place...")
    notebook_state.execute_notebook()  
    logger.info("Notebook ré-exécuté après injection de la tâche DBpedia.")

else:
    logger.warning("Aucun placeholder n'a été trouvé. Vérifiez que {{TASK_DESCRIPTION}} est présent.")


2025-02-11 22:21:29,204 [INFO] Orchestration - Copie du template -> Notebook-TaskDBpedia.ipynb
2025-02-11 22:21:29,268 [DEBUG] Orchestration - Notebook 'Notebook-TaskDBpedia.ipynb' chargé, nb de cellules=11.
2025-02-11 22:21:29,270 [INFO] Orchestration - [NotebookState] Mise à jour de la cellule 1
2025-02-11 22:21:29,271 [DEBUG] Orchestration - Ancien contenu:
## 0. Objectif du Notebook

Dans cette section, nous décrivons l'objectif du notebook, sa fonction, les outils à utiliser et les buts à atteindre.

### Tâche originale

Voilà la tâche telle qu'elle a été initialement formulée :

{{TASK_DESCRIPTION}}

TASK_INSTRUCTION_PLACEHOLDER

### Interprétation et sous-objectifs

Décrire ici l'interprétation de la tâche et les étapes prévues pour la réaliser.
2025-02-11 22:21:29,272 [DEBUG] Orchestration - Nouveau contenu:
## 0. Objectif du Notebook

Dans cette section, nous décrivons l'objectif du notebook, sa fonction, les outils à utiliser et les buts à atteindre.

### Tâche originale

Voi

### Plugins


In [11]:
from semantic_kernel.functions import kernel_function

class BaseNotebookPlugin:
    """Classe de base pour manipuler NotebookState."""
    def __init__(self, state: NotebookState):
        self.state = state

    @kernel_function(
        name="get_notebook_content",
        description="Renvoie le notebook complet au format JSON."
    )
    def get_notebook_content(self) -> str:
        logger.debug("[Plugin] get_notebook_content appelé.")
        return self.state.get_notebook_json()


class CoderNotebookPlugin(BaseNotebookPlugin):
    @kernel_function(
        name="update_cell",
        description="Met à jour la source d'une cellule (par son index)."
    )
    def update_cell(self, cell_index_str: str, new_source: str) -> str:
        logger.info(f"[CoderNotebookPlugin] update_cell({cell_index_str}) appelé.")
        try:
            cell_index = int(cell_index_str)
            self.state.update_cell(cell_index, new_source)
            return f"Cellule {cell_index} mise à jour avec succès."
        except Exception as e:
            logger.error(f"[CoderNotebookPlugin] Erreur lors de update_cell: {str(e)}")
            return f"Erreur lors de la mise à jour: {str(e)}"


class ReviewerNotebookPlugin(BaseNotebookPlugin):
    @kernel_function(
        name="validate_notebook",
        description="Exécute le notebook via papermill, renvoie un résumé."
    )
    def validate_notebook(self) -> str:
        logger.info("[ReviewerNotebookPlugin] validate_notebook() appelé => exécution.")
        self.state.execute_notebook()
        return "Notebook exécuté et validé (potentiellement)."


class AdminNotebookPlugin(BaseNotebookPlugin):
    @kernel_function(
        name="approve_notebook",
        description="Marque le notebook comme approuvé."
    )
    def approve_notebook(self) -> str:
        logger.debug("[AdminNotebookPlugin] approve_notebook() appelé.")
        self.state.set_approved(True)
        logger.info("[AdminNotebookPlugin] Notebook APPROVED.")
        return "Notebook APPROVED."


# Cellule Markdown : Stratégies


In [12]:
from pydantic import PrivateAttr

class ApprovedBasedTerminationStrategy(TerminationStrategy):
    _state: NotebookState = PrivateAttr()
    _max_steps: int = PrivateAttr(default=10)  # Limite de sécurité

    def __init__(self, state: NotebookState, max_steps: int = 10):
        super().__init__()
        self._state = state
        self._max_steps = max_steps
        self._current_step = 0

    async def should_agent_terminate(self, agent, history):
        self._current_step += 1
        is_approved = self._state.is_approved()
        logger.debug(
            f"[TerminationStrategy] Step={self._current_step}/{self._max_steps}, "
            f"IsApproved={is_approved}"
        )

        if is_approved:
            logger.info("[TerminationStrategy] Notebook approuvé => on arrête.")
            return True

        if self._current_step >= self._max_steps:
            logger.warning(f"[TerminationStrategy] max_steps={self._max_steps} atteint => on arrête.")
            return True

        return False



class NotebookAwareSelectionStrategy(SelectionStrategy):
    _state: NotebookState = PrivateAttr()
    _index: int = PrivateAttr(default=-1)

    def __init__(self, state: NotebookState):
        super().__init__()
        self._state = state
        self._index = -1

    def reset(self) -> None:
        self._index = -1

    async def select_agent(self, agents, history):
        logger.debug(f"[SelectionStrategy] Nombre d'agents: {len(agents)}")
        
        if self._state.is_approved():
            logger.info("[SelectionStrategy] Le notebook est déjà approuvé.")
            return None
        
        if not agents:
            logger.warning("[SelectionStrategy] Auncun agent disponible.")
            return None
        
        self._index = (self._index + 1) % len(agents)
        selected = agents[self._index]
        logger.debug(f"[SelectionStrategy] L'agent sélectionné est: {selected.name}")
        
        return selected

## Cellule Markdown : Création des agents


In [13]:
from semantic_kernel.connectors.ai.function_choice_behavior import FunctionChoiceBehavior
from semantic_kernel.functions.kernel_arguments import KernelArguments

def create_kernel_for_agent(agent_id: str, plugin_instance) -> Kernel:
    k = Kernel()
    
    # 1) Ajout du service ChatCompletion
    k.add_service(OpenAIChatCompletion(service_id="default"))
    
    # 2) Ajout du plugin
    k.add_plugin(plugin_instance, plugin_name="notebook_plugin")
    return k

# Instanciation des plugins
coder_plugin = CoderNotebookPlugin(notebook_state)
reviewer_plugin = ReviewerNotebookPlugin(notebook_state)
admin_plugin = AdminNotebookPlugin(notebook_state)

# Kernels
coder_kernel = create_kernel_for_agent("coder_kernel", coder_plugin)
reviewer_kernel = create_kernel_for_agent("reviewer_kernel", reviewer_plugin)
admin_kernel = create_kernel_for_agent("admin_kernel", admin_plugin)

# Configuration auto-invocation des fonctions
coder_settings = coder_kernel.get_prompt_execution_settings_from_service_id("default")
coder_settings.function_choice_behavior = FunctionChoiceBehavior.Auto()

reviewer_settings = reviewer_kernel.get_prompt_execution_settings_from_service_id("default")
reviewer_settings.function_choice_behavior = FunctionChoiceBehavior.Auto()

admin_settings = admin_kernel.get_prompt_execution_settings_from_service_id("default")
admin_settings.function_choice_behavior = FunctionChoiceBehavior.Auto()

# Agents
coder_agent = ChatCompletionAgent(
    service_id="coder_kernel",
    kernel=coder_kernel,
    name="CoderAgent",
    instructions=(
        "Vous êtes le Coder. Procédez comme suit :\n"
        "1. Utilisez 'get_notebook_content' pour analyser le notebook\n"
        "2. Modifiez les cellules EXISTANTES avec 'update_cell(index, nouveau_code)'\n"
        "3. Les index commencent à 0\n"
        "4. Soyez précis dans les modifications!\n"
        "5. Confirmez toujours après modification"
    ),
    arguments=KernelArguments(settings=coder_settings)
    # Ne PAS mettre allow_function_calls=True si votre version ne le gère pas
)

reviewer_agent = ChatCompletionAgent(
    service_id="reviewer_kernel",
    kernel=reviewer_kernel,
    name="ReviewerAgent",
    instructions=(
      "Vous êtes le Reviewer. "
      "1) Exécutez le notebook avec 'validate_notebook'. "
      "2) S'il y a une erreur Python ou un code manquant compte tenu à l'objectif annoncé, signalez-le et demandez à Coder de corriger. "
      "3) Sinon, déclarez « OK pour moi » et laissez l'Admin décider de l'approbation."
    ),
    arguments=KernelArguments(settings=reviewer_settings)
)

admin_agent = ChatCompletionAgent(
    service_id="admin_kernel",
    kernel=admin_kernel,
    name="AdminAgent",
    instructions=(
        "Vous êtes l'Admin. Approuvez le notebook UNIQUEMENT si le Reviewer a validé.\n"
        "Pour approuver, appelez 'approve_notebook()'. Sinon, demandez des corrections."
    ),
    arguments=KernelArguments(settings=admin_settings)
)

# GroupChat avec stratégies
termination_strategy = ApprovedBasedTerminationStrategy(notebook_state)
selection_strategy = NotebookAwareSelectionStrategy(notebook_state)

group_chat = AgentGroupChat(
    agents=[coder_agent, reviewer_agent, admin_agent],
    selection_strategy=selection_strategy,
    termination_strategy=termination_strategy
)

logger.info("Agents créés, group_chat initialisé.")


2025-02-11 22:21:33,635 [INFO] Orchestration - Agents créés, group_chat initialisé.


## Boucle finale


In [14]:
import asyncio
from semantic_kernel.contents.function_call_content import FunctionCallContent
from semantic_kernel.contents.function_result_content import FunctionResultContent

async def run_conversation():
    try:
        logger.info("Version initiale du  notebook")
        notebook_state.log_notebook_state()
        initial_content = notebook_state.get_notebook_json()
        group_chat.history.add_system_message(f"NOTEBOOK CONTENT:\\n{initial_content}")
        group_chat.history.add_user_message(
            "Bonjour, j'aimerais finaliser ce notebook qui contient des instructions et des cellules à compléter..."
        )
        
        iteration = 0
        async for message in group_chat.invoke():
            iteration += 1
            logger.debug(f"--- Iteration #{iteration} ---")
            
            # Log des appels de fonction
            for item in message.items:
                if isinstance(item, FunctionCallContent):
                    logger.info(f"FUNCTION CALL: {item.name}({item.arguments})")
                if isinstance(item, FunctionResultContent):
                    logger.info(f"FUNCTION RESULT: {item.result}")

            role = message.role.name
            content = message.content
            logger.info(f"{role.upper()} >>> {content}")

            if notebook_state.is_approved():
                logger.info("Notebook approuvé - arrêt de la conversation")
                break
            
        
        logger.info("Version finale sauvegardée dans le notebook")
        notebook_state.log_notebook_state()

    except Exception as e:
        logger.error(f"Erreur inattendue: {str(e)}")
    finally:
        logger.info(f"Statut final - Approuvé: {notebook_state.is_approved()}")
        logger.info("Conversation terminée.")
        
        
await run_conversation()


2025-02-11 22:21:33,654 [INFO] Orchestration - Version initiale du  notebook
2025-02-11 22:21:33,656 [DEBUG] Orchestration - [NotebookState] Current notebook state (simplified):
{
  "cells": [
    {
      "cell_type": "markdown",
      "id": "516d2854",
      "metadata": {
        "papermill": {
          "duration": 0.005046,
          "end_time": "2025-02-11T21:21:31.732101",
          "exception": false,
          "start_time": "2025-02-11T21:21:31.727055",
          "status": "completed"
        },
        "tags": []
      },
      "source": "# Notebook de travail\n\nCe notebook est généré de façon incrémentale pour accomplir la tâche décrite ci-dessous."
    },
    {
      "cell_type": "markdown",
      "id": "2a226db6",
      "metadata": {
        "papermill": {
          "duration": 0.004966,
          "end_time": "2025-02-11T21:21:31.742175",
          "exception": false,
          "start_time": "2025-02-11T21:21:31.737209",
          "status": "completed"
        },
        "t