# RAG for French Competitive Exam Reports (Rapports de Jury)

## Libraries

In [None]:
import os
import psycopg
from langchain_ollama.llms import OllamaLLM
from urllib.parse import quote
from langchain_ollama import OllamaEmbeddings
from langchain_postgres import PGVector
from langchain_postgres.vectorstores import PGVector
from dotenv import load_dotenv
from langchain_core.output_parsers import StrOutputParser, PydanticOutputParser
from langchain_core.prompts import ChatPromptTemplate
from pydantic import BaseModel, Field
from typing import Optional, Literal
from time import sleep
from itertools import chain
from langchain_openai import ChatOpenAI

load_dotenv()

## Environment Variables

In [None]:
username = os.getenv("USER_DB")
password = quote(os.getenv("PASSWORD_DB"))
host = os.getenv("HOST_DB")
database = os.getenv("DATABASE_DB")
port = os.getenv("PORT_DB")

## LLM Setup (change model as needed)

In [None]:
llm = OllamaLLM(model="llama3.1")  # ChatOpenAI()  # do not forget to replace the embeddings and to recreate the db



## Test Connection to PostgreSQL (PGVector)

In [None]:
try:
    connection = f'postgresql://{username}:{password}@{host}:{port}/{database}'
    with psycopg.connect(conninfo=connection) as conn:
        print("Connection successful!")
except psycopg.OperationalError as e:
    print(f"OperationalError: {e}")
except Exception as e:
    print(f"Connection failed: {e}")

## Connect to Vector Store

In [None]:
connection = f'postgresql://{username}:{password}@{host}:{port}/{database}'
collection_name = "rapports_jurys"
embeddings = OllamaEmbeddings(model="llama3.1") 

vector_store = PGVector(
    embeddings=embeddings,
    collection_name=collection_name,
    connection=connection,
    use_jsonb=True,
)

## Helper functions for RAG

### Define Structured Metadata Schema for Questions

In [None]:
class Question(BaseModel):
    exam: Optional[Literal["MATHEMATIQUES A", "MATHEMATIQUES B", "MATHEMATIQUES C","MATHEMATIQUES D", "MATHEMATIQUES", "MATHEMATIQUES 1", "MATHEMATIQUES 2", "INFORMATIQUE", "INFORMATIQUE A", "INFORMATIQUE B", "INFORMATIQUE C"]] = Field(
        None, description="Le nom de l'épreuve (par exemple, MATHEMATIQUES, MATHEMATIQUES A, INFORMATIQUE C, INFORMATIQUE, etc.) en majuscule. Les valeurs possibles sont MATHEMATIQUES A, MATHEMATIQUES B, MATHEMATIQUES C, MATHEMATIQUES D, MATHEMATIQUES, MATHEMATIQUES 1, MATHEMATIQUES 2, INFORMATIQUE, INFORMATIQUE A, INFORMATIQUE B, INFORMATIQUE C, ou None."
    )
    type: Optional[Literal["écrit", "oral"]] = Field(
        None, description="Le type de l'examen. Les valeurs possibles sont 'écrit', 'oral', ou None."
    )
    contest: Optional[Literal["X-ENS", "Concours Centrale-Supélec"]] = Field(
        None, description="""Le concours (par exemple, X-ENS). Les valeurs possibles q sont 'X-ENS', 'Concours Centrale-Supélec', ou None
        X-ENS a plusieurs synonymes comme Polytechnique, X, ENS etc.
        Concours Centrale-Supélec a également plusieurs synonymes comme centrale, supelec, etc.
        Choisis le plus probable entre les valeurs possibles qui sont 'X-ENS', 'Concours Centrale-Supélec', ou None. 
        """
    )
    year: Optional[str] = Field(
        None, description="L'année de la question (par exemple, 2022)."
    )
    levels: Optional[Literal["MP", "PC", "MPI", "PSI"]] = Field(
        None, description="Les niveaux ou préfixes de la question (par exemple, MP, PC). Les valeurs possibles sont 'MP', 'PC', 'MPI', 'PSI' ou None."
    )
    subjects: Optional[Literal["Mathématiques", "Informatique"]] = Field(
        None, description="Le sujet lié à la question. Les valeurs possibles sont 'Mathématiques', 'Informatique' ou None."
    )
    questions_prefix: Optional[str] = Field(
        None, description="Le préfixe de la question (par exemple, 10, 1a., etc.). Autorise None."
    )
    
output_parser = PydanticOutputParser(pydantic_object=Question)



### Parse Question into Structured Metadata

In [None]:
def StructuredQuestion(llm, question: str, output_parser=output_parser, max_retries=10): 

    prompt = ChatPromptTemplate.from_messages(
        [
            (
                "system",
                """
                Tu es un agent qui structure les données sous format JSON. Structure moi la question donnée si possible avec ces champs :
                - exam : Le nom de l'épreuve (par exemple, MATHEMATIQUES A) sans accent.
                - type : Le type de l'examen (écrit ou oral).
                - contest : Le concours (par exemple, X-ENS).
                - year : L'année de la question (par exemple, 2022).
                - levels : Les niveaux ou préfixes de la question (par exemple, MP, PC).
                - subjects : Le sujet lié à la question (par exemple, Mathématiques). 
                - questions_prefix : Le préfixe de la question (par exemple, 1a, 2a.).

                Contrainte:
                - Pour chaque champ, retourne une chaine de charactères, pas de liste !
                
                Encadre le résultat avec des balises `json`.\n{format_instructions}
                """
            ),
            (
                "human",
                "Voici la question: {question}"
            ),
        ]
    ).partial(format_instructions=output_parser.get_format_instructions())
    
    chain = prompt | llm | output_parser
    
    for attempt in range(max_retries):
        try:
            res = chain.invoke({"question": question})

            if res and any(value is not None for value in res.model_dump().values()):
                if res.type is None:
                    res.exam = None
                    res.questions_prefix = []
                return res 

            print(f"Attempt {attempt + 1}/{max_retries} failed: All fields are None.")
            if attempt < max_retries - 1:
                sleep(1)  # small delay between retries

        except Exception as e:
            print(f"Attempt {attempt + 1}/{max_retries} failed: {e}")
            if attempt < max_retries - 1:
                sleep(1) 
                
    print("All retry attempts failed.")
    
    return None


##### Testing the Question Parser

In [None]:
# question = "Peux-tu me dire ce que dit le rapport de jury pour l'épreuve écrite de maths A pour la question 12 en filière MP pour le concours Polytechnique en 2022 ?"

# question_metadata = StructuredQuestion(llm, question, output_parser)

# question_metadata

### Sub-question Generation to Improve Retrieval

In [None]:
class SubQuestions(BaseModel):
    question_one: str = Field(..., description="La première sous-question générée à partir de la question principale.")
    question_two: str = Field(..., description="La deuxième sous-question générée à partir de la question principale.")
    question_three: str = Field(..., description="La troisième sous-question générée à partir de la question principale.")    
    
output_parser_sub_questions = PydanticOutputParser(pydantic_object=SubQuestions)

def SubQuestionsGeneration(llm, question: str, question_metadata: Question, output_parser_sub_questions=output_parser_sub_questions): 
    metadata = (
        f"Examen : {question_metadata.exam or 'Non spécifié'}\n"
        f"Type : {question_metadata.type or 'Non spécifié'}\n"
        f"Concours : {question_metadata.contest or 'Non spécifié'}\n"
        f"Année : {question_metadata.year or 'Non spécifiée'}\n"
        f"Niveau : {question_metadata.levels or 'Non spécifié'}\n"
        f"Sujet : {question_metadata.subjects or 'Non spécifié'}\n"
        f"Préfixe de la question : {question_metadata.questions_prefix or 'Non spécifié'}"
    )
    
    prompt = ChatPromptTemplate.from_messages(
        [
            (
                "system",
                """
                Ta tâche est de générer trois variantes de la question posée par l'utilisateur et des métadonnées suivantes:
                {metadata}
                
                Ces reformulations doivent permettre de récupérer efficacement des documents pertinents dans une base de données vectorielle tout en compensant les limites de la recherche par similarité.

                Le but est de générer des questions pertinentes en lien avec les rapports de jury 
                des sujets de concours des classes préparatoires scientifiques.
                
                Retourne le résultat sous forme de JSON avec les champs suivants :
                - question_one : La première sous-question.
                - question_two : La deuxième sous-question.
                - question_three : La troisième sous-question.
                
                Encadre la réponse avec des balises `json`.\n{format_instructions}"""
            ),
            (
                "human",
                "Voici la question: {question}"
            ),
        ]
    ).partial(format_instructions=output_parser_sub_questions.get_format_instructions())
    
    chain = prompt | llm | output_parser_sub_questions
                
    res = chain.invoke({"question": question, "metadata": metadata})
        
    return res



##### Testing Sub-question Parser

In [None]:
# subquestions = SubQuestionsGeneration(llm, question, question_metadata)

# subquestions

### 🔙 Step-Back Question Generation

In [None]:
class StepBackQuestion(BaseModel):
    step_back_question: str = Field(..., description="Une question plus générale générée à partir de la question principale.")
    
output_parser_step_back = PydanticOutputParser(pydantic_object=StepBackQuestion)


In [None]:

def StepBackQuestionGeneration(llm, question: str, output_parser_step_back=output_parser_step_back):     
    prompt = ChatPromptTemplate.from_messages(
        [
            (
                "system",
                """
                Ta tâche est de générer une question plus générale par rapport à la question donnée par l'utilisateur.
                
                La question générée doit fournir un contexte plus large à la question initiale tout en restant pertinente pour les rapports de jury des sujets de concours des classes préparatoires scientifiques.
                
                Encadre la réponse avec des balises `json`.\n{format_instructions}"""
            ),
            (
                "human",
                "Voici la question: {question}"
            ),
        ]
    ).partial(format_instructions=output_parser_step_back.get_format_instructions())
    
    chain = prompt | llm | output_parser_step_back
                
    res = chain.invoke({"question": question})
        
    return res



##### Testing the step back question parser

In [None]:
# step_back = StepBackQuestionGeneration(llm, question, question_metadata)

# step_back

### Reciprocal Rank Fusion for Multi-query Retrieval

Link to the method : https://medium.com/@devalshah1619/mathematical-intuition-behind-reciprocal-rank-fusion-rrf-explained-in-2-mins-002df0cc5e2a

In [None]:
from langchain.load import dumps, loads

def reciprocal_rank_fusion(results: list[list], k=60):
    """ Reciprocal_rank_fusion that takes multiple lists of ranked documents 
        and an optional parameter k used in the RRF formula """
    
    fused_scores = {}

    for docs in results:
        for rank, doc in enumerate(docs):            
            page_content = doc[0].page_content
            if page_content not in fused_scores:
                fused_scores[page_content] = 0
            fused_scores[page_content] += 1 / (rank + k)

    reranked_results = [
        (page_content, score)
        for page_content, score in sorted(fused_scores.items(), key=lambda x: x[1], reverse=True)
    ]
    return reranked_results



## RAG pipeline

### Filter Generator for Vector Search

In [None]:
def filterRAG(question_metadata):
    filter = {}

    for key, value in question_metadata.model_dump().items():
        if value is None:
            continue

        # if key == "questions_prefix":
        #     filter[key] = {"$in": [value]}
        if key == "levels" or key == "questions_prefix":
            filter[key] = {"$ilike": f"%{value}%"}
        else:
            filter[key] = {"$eq": value}
    return filter

### RAG Pipeline Class

In [None]:
class RapportsJurysRAG:
    def __init__(self, vector_store, question):
        self.llm = OllamaLLM(model="llama3.1")
        self.vector_store = vector_store
        self.question = question
        self.multiQuerySubQuestions = False # add variants questions
        self.QueryStepBack = False # add an overview context
        self.multiQueryRAGFusion = False # use rrf ranking
        self.nbDocs = 5 # nb docs in the context
        self.similarity_score_threshold = 0.3 # 0: no similarity, 1: full similarity
        
    def retriever(self):
        
        try:
            question_metadata = StructuredQuestion(self.llm, self.question)
                                    
            if (self.multiQuerySubQuestions):
                
                sub_questions_generation = SubQuestionsGeneration(self.llm, self.question, question_metadata)
                sub_questions = dict(sub_questions_generation).values()
                results = []
                                
                for sub_question in sub_questions:
                    
                    sub_question_metadata = StructuredQuestion(self.llm, sub_question)
                    filter = filterRAG(sub_question_metadata)
                    results.append(self.vector_store.similarity_search_with_relevance_scores(sub_question, k=self.nbDocs, filter=filter))
                                    
                if (self.multiQueryRAGFusion):
                
                    retrieval_results = reciprocal_rank_fusion(results)
                    contexte = "\n".join([page_content for page_content, _ in retrieval_results[:self.nbDocs]])  
                    return contexte, question_metadata
                
                else:
                    
                    flat_results = list(chain.from_iterable(results))
                    unique_results = {doc[0].metadata['id']: doc for doc in flat_results}
                    sorted_results = sorted(unique_results.values(), key=lambda x: x[1], reverse=True)
                    contexte = "\n".join([doc[0].page_content for doc in sorted_results[:self.nbDocs]])                      
                    return contexte, question_metadata
                
            elif (self.QueryStepBack):
                
                step_back_question_output = StepBackQuestionGeneration(self.llm, self.question)   
                step_back_question = step_back_question_output.step_back_question            
                step_back_question_metadata = StructuredQuestion(self.llm, step_back_question)                   
                filter_question = filterRAG(question_metadata)
                filter_step_back = filterRAG(step_back_question_metadata)
                results_question = self.vector_store.similarity_search_with_relevance_scores(self.question, k=5, filter=filter_question)
                results_step_back_question = self.vector_store.similarity_search_with_relevance_scores(step_back_question, k=5, filter=filter_step_back)                
                contexte = "\n".join([result[0].page_content for result in results_question[:self.nbDocs] + results_step_back_question[:self.nbDocs]])
                return contexte, question_metadata
                
            else:
                
                filter = filterRAG(question_metadata)
                results = self.vector_store.similarity_search_with_relevance_scores(self.question, k=self.nbDocs, filter=filter) #, score_threshold=self.similarity_score_threshold
                contexte = "\n".join([result[0].page_content for result in results])  
                return contexte, question_metadata
            
        except Exception as e:
            print(f"Error occurred: {e}")
            return "", {}
        
        
    def create_chain(self):
        template = """
        Tu es un agent IA sur les rapports de jury concernant les sujets de concours en classe préparatoire scientifique. 
        Réponds à la question de l'utilisateur en utilisant le contexte. La metadata est censé t'aider à discerner les parties pertinentes du contexte pour générer la réponse. 
        Si tu ne connais pas la réponse, dis simplement que tu ne sais pas.
        Question: {question} 
        Metadata: {metadata}
        Contexte: {contexte} 
        """ 

        prompt = ChatPromptTemplate.from_template(template)
            
        chain = (
             prompt
            | self.llm
            | StrOutputParser()
        )

        return chain

    def answer(self):
        contexte, question_metadata = self.retriever()
        
        input_data = {
            "question": self.question,
            "metadata": question_metadata,
            "contexte": contexte
        }
        
        chain = self.create_chain()

        res = chain.invoke(input=input_data)

        return res

### Run Example

In [None]:
question = "Peux-tu me dire ce que dit le rapport de jury pour l'épreuve écrite de maths A pour la question 12 en filière MP pour le concours Polytechnique en 2022 ?"

rapports_jurys = RapportsJurysRAG(vector_store, question)

answer = rapports_jurys.answer()

print("Réponse: ", answer)