### Multi-Agent RAG system
This Jupyter Notebook serves as a proof of concept for a multi-agent solution for MedTech regulations. The system is intended to provide clear answers to questions about WHO and FDA documentation on medical devices, using a three- or four-agent system composed as follows:

- an LLM orchestrator that receives the question and coordinates agents
- a RAG agent capable of retrieving documents related to the question
- an LLM response agent to put together the answer based on the documents retrieved and the question
- a possible fourth agent to be decided (chunking agent, source-verifier agent, compare agent, prompt agent to improve prompts, etc.)

In [1]:
"""

pseudo code for multi agent system

class agent

instantiate orchestrator and other agents

define orchestrator prompt and set response to False

take input

while not response:

    orchestrator call

    designated agent call

return response

"""

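The pseudocode above can be turned into a runnable loop. This is a minimal sketch under stated assumptions: `StubRAGAgent`, `StubResponseAgent` and the rule-based `decide` function are hypothetical stand-ins for the Gemini orchestrator and the Chroma-backed agents, and memory is a bounded `deque` instead of the list slicing used later in the notebook.

```python
from collections import deque


class StubRAGAgent:
    """Hypothetical stand-in for the RAG agent: returns a fixed 'document'."""
    name = "ragagent"

    def act(self, query, relevant_info, memory):
        return f"[doc about: {query}]"


class StubResponseAgent:
    """Hypothetical stand-in for the response agent: wraps the documents it receives."""
    name = "response_agent"

    def act(self, query, relevant_info, memory):
        return f"Answer to '{query}' based on {relevant_info}"


def decide(memory):
    """Rule-based stand-in for the LLM orchestrator: RAG first, then response, then stop."""
    if not any(m.startswith("ragagent:") for m in memory):
        return "ragagent"
    if not any(m.startswith("response_agent:") for m in memory):
        return "response_agent"
    return "No action needed"


def run_once(user_input, agents, memory_limit=15):
    # Oldest entries drop off automatically once memory_limit is reached
    memory = deque([f"User: {user_input}"], maxlen=memory_limit)
    agents_by_name = {agent.name: agent for agent in agents}
    documents, response = "", None

    while True:  # "while not response" from the pseudocode
        choice = decide(memory)
        if choice == "No action needed":
            return response
        result = agents_by_name[choice].act(user_input, documents, list(memory))
        if choice == "ragagent":
            documents = result  # hand the retrieved documents to the next agent
        else:
            response = result
        memory.append(f"{choice}: {result}")


answer = run_once("What is design validation?", [StubRAGAgent(), StubResponseAgent()])
print(answer)
```

Keeping the decision logic in a single function makes it easy to swap the rules for an LLM call later without touching the loop itself.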

In [36]:
# Import libraries

import chromadb
import uuid
import google.generativeai as genai
import os
from dotenv import load_dotenv
import pymupdf4llm
import json
from chromadb.utils import embedding_functions

In [2]:
# Load environmental variables and AI models

load_dotenv() # Load GOOGLE_API_KEY from a .env file into the environment
try:
    genai.configure(api_key=os.environ['GOOGLE_API_KEY'])
except Exception as e:  # A KeyError here means GOOGLE_API_KEY is not set
    print(f"Error configuring Google AI. Please ensure your API key is set and correct. Error: {e}")

# Configure paths to data
current_dir = os.getcwd() 
FILE_PATH_FDA_DESIGN = os.path.join(current_dir, '..', 'documents', 'FDA_Design_Control_Guidance.pdf')
FILE_PATH_WHO = os.path.join(current_dir, '..', 'documents', 'WHO_Medical_Device_Regulations.pdf')
FILE_PATH_FDA_POLICY = os.path.join(current_dir, '..', 'documents', 'FDA_Policy_Device_Software_Functions.pdf')
COLLECTION_NAME = "multi_agent_rag"
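The paths above are resolved relative to the notebook's working directory, and a missing `GOOGLE_API_KEY` only surfaces as a generic error message. A small pre-flight check can fail fast before the slow PDF conversion. This is a sketch using `pathlib`; `check_inputs` is a hypothetical helper, not part of the original notebook.

```python
import os
import tempfile
from pathlib import Path


def check_inputs(paths, env_var="GOOGLE_API_KEY"):
    """Return a list of human-readable problems; an empty list means the inputs look usable."""
    problems = []
    if not os.environ.get(env_var):  # unset or empty
        problems.append(f"Environment variable {env_var} is not set")
    for path in paths:
        resolved = Path(path).resolve()  # collapses the '..' segments for a clearer message
        if not resolved.is_file():
            problems.append(f"Missing file: {resolved}")
    return problems


# Self-contained demo: one file that exists, one that does not,
# and a deliberately unset (hypothetical) variable name.
with tempfile.TemporaryDirectory() as tmp:
    present = Path(tmp) / "present.pdf"
    present.write_bytes(b"%PDF-1.4")
    issues = check_inputs([present, Path(tmp) / "absent.pdf"], env_var="CHECK_INPUTS_DEMO_UNSET")

for issue in issues:
    print(issue)
```

In the notebook it would be called as `check_inputs([FILE_PATH_FDA_DESIGN, FILE_PATH_WHO, FILE_PATH_FDA_POLICY])` before running `pymupdf4llm`.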

In [37]:
# Split logic (very simple, splitting paragraphs and skipping short ones)

def split_into_chunks(dict_list):
    text_chunks = []
    metadatas = []
    
    for document in dict_list:
        try:
            paragraphs = list(document.values())[0].split("\n\n")
    
            for paragraph in paragraphs: # Skip very short paragraphs (10 characters or fewer)
                if len(paragraph) > 10:
                    text_chunks.append(paragraph)
                    metadatas.append({"source": list(document.keys())[0]})
        except Exception as e:
            print(f"An error occurred while processing {list(document.keys())[0]}: {e}")
            
    return text_chunks, metadatas

# Use pymupdf4llm to convert each PDF into Markdown-formatted text. Each dict maps the file name to its text, so the source can be added to the metadata while chunking
fda_design_dict = {'FDA_Design_Control_Guidance.pdf': pymupdf4llm.to_markdown(FILE_PATH_FDA_DESIGN)}
who_dict = {'WHO_Medical_Device_Regulations.pdf': pymupdf4llm.to_markdown(FILE_PATH_WHO)}
fda_policy_dict = {'FDA_Policy_Device_Software_Functions.pdf': pymupdf4llm.to_markdown(FILE_PATH_FDA_POLICY)}

text_chunks, metadatas = split_into_chunks([fda_design_dict, who_dict, fda_policy_dict])
print(len(text_chunks), len(metadatas))

1657 1657
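Splitting on blank lines produces many small chunks (1657 from three documents), and very short fragments often carry little context for retrieval. One common alternative, sketched here as an assumption rather than what this notebook does, is to greedily merge consecutive paragraphs up to a character budget while keeping the source in the metadata. `merge_paragraphs` and `max_chars` are hypothetical names.

```python
def merge_paragraphs(text, source, max_chars=800, min_chars=10):
    """Greedily pack consecutive paragraphs into chunks of at most ~max_chars.

    A single paragraph longer than max_chars is kept whole as its own chunk.
    Returns (chunks, metadatas) in the same shape as split_into_chunks.
    """
    chunks = []
    current = ""
    for paragraph in text.split("\n\n"):
        paragraph = paragraph.strip()
        if len(paragraph) <= min_chars:  # skip stray lines such as lone page numbers
            continue
        candidate = f"{current}\n\n{paragraph}" if current else paragraph
        if len(candidate) <= max_chars:
            current = candidate  # still fits: keep accumulating
        else:
            if current:
                chunks.append(current)
            current = paragraph  # start a new chunk with the overflowing paragraph
    if current:
        chunks.append(current)
    metadatas = [{"source": source} for _ in chunks]
    return chunks, metadatas


sample = "Intro paragraph here.\n\nSecond paragraph text.\n\n7\n\nA third, longer paragraph."
demo_chunks, demo_metas = merge_paragraphs(sample, "demo.pdf", max_chars=50)
print(len(demo_chunks), demo_chunks)
```

The budget trades precision for context: smaller values behave like the paragraph split above, larger values give the response agent more surrounding text per hit.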


In [114]:
# Second chunking approach: use plain .txt versions of the documents and see whether the RAG handles them better.
# Result: it performed worse than the PDF-to-Markdown approach.

# Configure paths to data
current_dir = os.getcwd() 
FILE_PATH_FDA_DESIGN = os.path.join(current_dir, '..', 'documents', 'FDA_Design_Control_Guidance.txt')
FILE_PATH_WHO = os.path.join(current_dir, '..', 'documents', 'WHO_Medical_Device_Regulations.txt')
FILE_PATH_FDA_POLICY = os.path.join(current_dir, '..', 'documents', 'FDA_Policy_Device_Software_Functions.txt')
COLLECTION_NAME = "multi_agent_rag"

def process_txt_file(file_path, source_name):
    all_text_chunks = []
    all_metadatas = []
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            full_text = f.read()

        # Chunking strategy: split by paragraph
        paragraphs = full_text.split('\n\n')

        for para in paragraphs:
            stripped_para = para.strip()
            if len(stripped_para) > 25:  # Filter out very short paragraphs or empty lines
                all_text_chunks.append(stripped_para)
                # Metadata is simpler for a txt file, just the source
                all_metadatas.append({'source': source_name})

    except FileNotFoundError:
        print(f"Error: File not found at {file_path}")
    except Exception as e:
        print(f"An error occurred while processing {file_path}: {e}")

    return all_text_chunks, all_metadatas

chunks_fdad, metas_fdad = process_txt_file(FILE_PATH_FDA_DESIGN, 'FDA_Design_Control_Guidance')
print(f"Processed FDA_Design: {len(chunks_fdad)} chunks.")

chunks_who, metas_who = process_txt_file(FILE_PATH_WHO, 'WHO_Medical_Device_Regulations')
print(f"Processed WHO: {len(chunks_who)} chunks.")

chunks_fdap, metas_fdap = process_txt_file(FILE_PATH_FDA_POLICY, 'FDA_Policy_Device_Software_Functions')
print(f"Processed FDA Policy: {len(chunks_fdap)} chunks.")

text_chunks = chunks_fdad + chunks_who + chunks_fdap
metadatas = metas_fdad + metas_who + metas_fdap

Processed FDA_Design: 67 chunks.
Processed WHO: 120 chunks.
Processed FDA Policy: 100 chunks.
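The two strategies differ sharply in granularity: 1657 chunks from the PDF-to-Markdown split versus 67 + 120 + 100 = 287 from the .txt files, which also use a 25-character instead of a 10-character threshold. Before comparing retrieval quality, it can help to compare the chunk-length distributions. A small standard-library sketch; `chunk_length_stats` is a hypothetical helper.

```python
import statistics


def chunk_length_stats(chunks):
    """Summarize chunk lengths (in characters) to compare chunking strategies."""
    lengths = [len(chunk) for chunk in chunks]
    if not lengths:
        return {"count": 0}
    return {
        "count": len(lengths),
        "min": min(lengths),
        "median": statistics.median(lengths),
        "mean": round(statistics.mean(lengths), 1),
        "max": max(lengths),
    }


example = ["short one", "a somewhat longer chunk of text", "mid-sized chunk"]
stats = chunk_length_stats(example)
print(stats)
```

In the notebook, calling it on the PDF-based `text_chunks` and on `chunks_fdad + chunks_who + chunks_fdap` would show the two distributions side by side.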


In [34]:
# Define agent classes. Every agent will have a memory variable with a memory limit, and this will be the context that gets passed along.
# It would be cleaner to build a 'state' that every agent can access and update (similar to the logic of LangGraph), but this works fine for a POC.

class OrchestratorAgent:

    def __init__(self, agents):
        self.agents = agents
        self.memory = []
        self.memory_limit = 15

        self.model = genai.GenerativeModel('gemini-1.5-flash')
        #self.model = genai.GenerativeModel('gemini-2.0-flash-thinking-exp-01-21')
        self.generation_config = {
            "temperature": 0.2,
            "top_k": 10,
            "top_p": 0.85,
            "max_output_tokens": 8192
        }

    def run(self): # Main loop: keeps processing the input and calling agents until the user exits.
        print("Hi! I am Camille, your MedTech regulatory companion. How can I help you?")
        user_input = input("You: ")
        response = None  # Avoids an UnboundLocalError if the user exits before any agent responds

        while True:
            self.memory = self.memory[-self.memory_limit:]
            if user_input.lower() in ["exit", "bye", "close"]:
                print("I hope I could be of use to you, have a great day!")
                break
            orch_response = self.orchestrate(user_input)
            if not isinstance(orch_response, dict):  # The model did not return parseable JSON
                print("Sorry, I could not process that. Could you rephrase?")
                user_input = input("You: ")
                continue
            if orch_response.get("agent_to_call") == "No action needed":
                print("Is there anything else I can help you with?")
                user_input = input("You: ")
                continue
            for agent in self.agents:
                if agent.name == orch_response.get("agent_to_call"):
                    print(f"Found agent I was looking for: {agent.name}\n")
                    response = agent.act(orch_response.get("output", ""), orch_response.get("relevant_info", ""), self.memory)
                    self.memory.append(f"Agent {agent.name} responded {response}")
        return response

    def orchestrate(self, user_input):
        self.memory.append(f"User: {user_input}")
        self.memory = self.memory[-self.memory_limit:]
        context = "\n".join(self.memory)
        response_format = {"agent_to_call":"", "output": "", "relevant_info":""}
        response = self.model.generate_content(self.get_prompt(context, response_format), generation_config = self.generation_config)
        self.memory.append(f"Orchestrator: {response.text}")
        response_cleaned = clean_response(response)
        return response_cleaned

    def get_prompt(self, context, response_format):
        prompt = f"""
        Act as an orchestrator agent for an intelligent RAG system for MedTech companies. Your task is to coordinate agents in order to extract relevant documents from a RAG system and package a coherent and precise answer to the query received.
        First call the ragagent to retrieve documents, then call the response_agent with those documents to craft a response. If the history already contains a ragagent call, make sure to call the response_agent next.
        Your AI agents and their descriptions are {", ".join([f"- {agent.name}: {agent.description}" for agent in self.agents])}

        Use the context, which includes the current user input and the memory of previous inputs and outputs, to plan next steps.
        Context : {context}

        Guidelines:
        At every step, choose only one agent and give instructions to that agent only. If the request needs multiple agents, call them one at a time over successive steps.
        Read the context, take your time to understand the task, and check if you have executed it correctly.
        If there are no actions needed, default the "agent_to_call" parameter to "No action needed" in the response.
        Return only the agent name in the "agent_to_call" parameter.
        You will return instructions as valid JSON in the form {json.dumps(response_format)}. All values must be strings: "output" holds the query and "relevant_info" holds the documents from the RAG for the response agent.
        """
        return prompt

class RAGAgent:

    def __init__(self):
        self.name = "ragagent"
        self.description = """I am a RAG agent that can search for relevant documents in a vector database in order to answer a query.
                            I expect a user query as input and will return the relevant chunks as formatted text, together with their sources and relevance scores.
                            """
        self.memory = []
        self.memory_limit = 15
        self.sentence_transformer_ef = embedding_functions.SentenceTransformerEmbeddingFunction(model_name="all-MiniLM-L6-v2")
        self.collection_name = COLLECTION_NAME
        self.collection = None  # Created lazily on the first call to act()

    def act(self, query, relevant_info, memory): # Builds the vector DB on first use, then queries it.
        if self.collection is None:
            self.collection = self.initialize_db()
            self.memory.append("RAG initialized")
        self.memory.extend(memory)  # Keep a copy of the orchestrator's context entries
        self.memory = self.memory[-self.memory_limit:]
        documents = self.query_db(query)
        return documents

    def initialize_db(self):
        print("Initializing RAG system... This may take a minute.")
        self.client = chromadb.Client()

        # list_collections() returns collection names in some Chroma versions and Collection objects in others
        existing = [c if isinstance(c, str) else c.name for c in self.client.list_collections()]
        if self.collection_name in existing:
            self.client.delete_collection(name = self.collection_name)
            #print(f"Deleted existing collection: {self.collection_name}")

        collection = self.client.get_or_create_collection(
            name = self.collection_name,
            embedding_function = self.sentence_transformer_ef
        )
        self.load_documents(collection, text_chunks, metadatas)

        return collection

    def load_documents(self, collection, document_chunks, metadatas):
        collection.add(
            ids = [str(uuid.uuid4()) for _ in document_chunks],
            documents = document_chunks,
            metadatas = metadatas)
    
    def query_db(self, question):
        results = self.collection.query(query_texts=[question], include = ["documents", "metadatas", "distances"], n_results=5)

        sources_markdown = "### Sources Used for Analysis\n\n"
        retrieved_documents = results['documents'][0]
        retrieved_metadatas = results['metadatas'][0]
        retrieved_distances = results['distances'][0]
    
        for i, (doc, meta, dist) in enumerate(zip(retrieved_documents, retrieved_metadatas, retrieved_distances)):
            # Chroma's default metric is squared L2, so 1 - distance is only a relative score
            # (higher means closer), not a cosine similarity bounded to [0, 1]
            relevance_score = 1 - dist
            source_info = f"**Source {i+1}:** {meta.get('source', 'N/A')}, Page {meta.get('page', 'N/A')}\n"
            relevance_info = f"**Relevance Score:** {relevance_score:.2f}\n\n"
            content_info = f"```\n{doc}\n```\n\n---\n\n"
            sources_markdown += source_info + relevance_info + content_info
        print(f"Sources, documents and relevance score: {sources_markdown}")
            
        return sources_markdown

class ResponseAgent:
    def __init__(self):
        self.name = "response_agent"
        self.description = """I am a response agent that expects as input a user query and relevant documents and info from a RAG search.
                            My task is to craft a precise response for the user based on the provided documents. I will return a response text.
                            """

        self.memory = []
        self.memory_limit = 15
        self.model = genai.GenerativeModel('gemini-1.5-flash')
        #self.model = genai.GenerativeModel('gemini-2.0-flash-thinking-exp-01-21')
        self.generation_config = {
            "temperature": 0.2,
            "top_k": 10,
            "top_p": 0.85,
            "max_output_tokens": 8192
        }
    
    def act(self, query, relevant_info, memory):
        prompt = self.get_prompt(query, relevant_info, memory)
        response = self.model.generate_content(prompt, generation_config = self.generation_config)
        print(response.text)
        return response.text

    def get_prompt(self, query, relevant_info, memory):
        self.memory = self.memory[-self.memory_limit:]
        prompt = f"""
        Act as Camille, an AI companion acting as a Senior Consultant for medical devices. You receive a query from your client and answer it based on the relevant information you receive from the RAG system as document chunks.
        Be precise, confident and do not make things up. If the context is not enough to provide a clear answer, state it.
        Cite the documents and sources you receive as part of your input and provide strategic recommendations. Your answer will be structured as follows:
        - Salutation telling who you are.
        - Precise response to the query based on the documents received from the RAG.

        As additional resources:
        User input: {query}
        Relevant documents with sources and relevance scores: {relevant_info}
        Memory of previous inputs and info: {memory}
        """
        return prompt
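`query_db` reports `1 - distance` as a relevance score. Chroma collections use squared L2 distance unless configured otherwise (for example via the `hnsw:space` collection metadata in Chroma versions that support it), so this score is not a cosine similarity. Assuming unit-length embeddings, the two are related by ‖a − b‖² = 2 − 2·cos(a, b). The sketch below checks that identity in plain Python; the helper names are illustrative, not Chroma APIs.

```python
import math


def normalize(v):
    """Scale a vector to unit length."""
    norm = math.sqrt(sum(x * x for x in v))
    return [x / norm for x in v]


def squared_l2(a, b):
    """Squared Euclidean distance, the metric behind Chroma's default 'l2' space."""
    return sum((x - y) ** 2 for x, y in zip(a, b))


def cosine(a, b):
    dot = sum(x * y for x, y in zip(a, b))
    return dot / (math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b)))


def cosine_from_squared_l2(distance):
    """Valid only for unit-length vectors, where ||a - b||^2 = 2 - 2 * cos(a, b)."""
    return 1 - distance / 2


a = normalize([1.0, 2.0, 3.0])
b = normalize([2.0, 1.0, 0.5])
d = squared_l2(a, b)
print(f"squared L2 distance:      {d:.4f}")
print(f"notebook score (1 - d):   {1 - d:.4f}")
print(f"cosine recovered from d:  {cosine_from_squared_l2(d):.4f}")
print(f"cosine computed directly: {cosine(a, b):.4f}")
```

Under the same assumptions, the notebook score s maps to cosine similarity (1 + s) / 2, so the scores of 0.45 and 0.43 in the run below would correspond to cosine similarities of about 0.725 and 0.715.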

In [35]:
# Helper to parse the orchestrator's JSON output, then instantiate the agent objects

def clean_response(response):
    # Extract the outermost {...} block from the model output and parse it as JSON.
    # Returns a dict on success, or the raw text if no valid JSON object is found.
    response_text = response.text
    start_index = response_text.find("{")
    end_index = response_text.rfind("}")
    if start_index != -1 and end_index > start_index:
        try:
            return json.loads(response_text[start_index : end_index + 1])
        except json.JSONDecodeError:
            pass  # Fall through and return the raw text
    return response_text

ragagent = RAGAgent()
response_agent = ResponseAgent()
orchestrator = OrchestratorAgent([ragagent, response_agent])

final_answer = None
history = [] 
# Example queries to type at the prompt (not used programmatically)
query1 = "Is our AI-powered MRI analysis tool considered medical device software?"
query2 = "What are the design control requirements for verification and validation?"
query3 = "Compare FDA and WHO approaches to risk management for medical devices"
query4 = "What documentation is needed for a mobile app that monitors heart rate?"
count_rag = 0
count_final = 0

orchestrator.run()

Hi! I am Camille, your MedTech regulatory companion. How can I help you?


You:  What are the design control requirements for verification and validation?


Found agent I was looking for: ragagent

Initializing RAG system... This may take a minute.
Sources, documents and relevance score: ### Sources Used for Analysis

**Source 1:** FDA_Design_Control_Guidance.pdf, Page N/A
**Relevance Score:** 0.45

```

be performed by device manufacturers. Rather, the manufacturer should select and apply
appropriate verification techniques based on the generally accepted practices for the
technologies employed in their products. Many of these practices are an integral part of
the development process, and are routinely performed by developers. The objective of
design controls is to ensure adequate oversight by making verification activities explicit
and measuring the thoroughness of their execution. Following are a few examples of
verification methods and activities.
```

---

**Source 2:** FDA_Design_Control_Guidance.pdf, Page N/A
**Relevance Score:** 0.43

```

  - Each manufacturer shall establish and maintain procedures for validating the device
desig