# Implementation, Testing and Evaluation for Optimal Retrieval Process in RAG

#### Notebook Outline
1. Imports and Configurations
2. Creation of Vector Databases
3. Retrieval Processes
4. Evaluations

This notebook reuses and adapts functions from the Baseline RAG notebook (.ipynb).

### 1. Imports and Configurations

Imports

In [None]:
# === Standard Library Imports ===
import json
import os
import sys

# === Third-Party Libraries ===
import numpy as np
from dotenv import load_dotenv
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
from tqdm import tqdm, trange

# === OpenAI Integration ===
import openai
from openai import OpenAI
from langchain_openai import ChatOpenAI

# === Project Root Configuration ===
project_root = os.path.abspath(os.path.join(os.getcwd(), '..', '..'))
if project_root not in sys.path:
    sys.path.append(project_root)

# === Local Project Modules ===
from ipynb_notebooks.baseline.rag_utils.baseline_rag import (
    load_vector_database,
    retrieve_documents,
    generate_answer,
    rag_pipeline
)

from ipynb_notebooks.evaluation_datasets.retrieval_eval.eval_vector_dataset_generator import generate_evalset
from ipynb_notebooks.evaluation_datasets.retrieval_eval.retrieval_metrics import run_retrieval_evaluation
from ipynb_notebooks.evaluation_datasets.generation_eval.generation_metrics import run_generation_evaluation


Configurations

In [None]:
# Load environment variables. Assumes that the project directory contains a .env file with API keys
load_dotenv()

# Set the OpenAI API key from the environment variables
# Make sure to update "OPENAI_API_KEY" to match the variable name in your .env file
openai.api_key = os.environ['OPENAI_API_KEY']

# Set client for chat completion 
client = OpenAI(api_key=openai.api_key)

# Define constants for paths
DATA_PATH = "../../data/laws_and_ordinances.json"  # JSON file containing the URLs of the law and ordinance documents
DATA_PATH_SHORT_VERSION = "../../data/laws_and_ordinances_short_version.json"  # JSON file containing a subset of the URLs for testing purposes
CHROMA_PATH = "chroma_dbs/chroma"  # Directory to save the Chroma vector store

### 2. Creation of Vector Databases

**Why Separate Chroma Databases per Retrieval Process Are Not Necessary**

In contrast to the chunking experiments, evaluating different retrieval strategies does not require generating separate Chroma vector databases. All strategies operate over the same underlying document corpus and embeddings. Retrieval processes such as the iterative, recursive, or adaptive approach differ only in how they search the embedded documents, not in how the documents are chunked or stored.

As long as the Chroma DB is generated using a consistent chunking strategy and embedding model, it provides a shared semantic space that is sufficient for fair comparison across retrieval methods. Creating separate vector stores per strategy would introduce unnecessary redundancy and would not improve the validity of the evaluation.

In [None]:
chroma_db_optimal_retrieval_process = "../chroma_dbs/chroma_chunksize1024_overlap128_c800ccc6_optimal_retrieval_process"

### 3. Retrieval Processes

| Retrieval Process   | Retrieval Method  | Explanation  |
|---|---|---|
| Iterative  | Dense  | Repeatedly alternates between retrieval and generation, using generated content to refine subsequent retrievals.  |
| Recursive  | Dense  | Breaks down complex queries into sub-questions and solves them step by step, often guided by chain-of-thought reasoning.  |
| Adaptive  | Dense  | Empowers the model to dynamically decide whether, when, and how much to retrieve, based on uncertainty, special tokens, or self-reflection.  |
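
To make the differences in the table concrete, the following minimal sketch contrasts the control flow of the three processes. The `retrieve` and `generate` functions are hypothetical stand-ins for a dense retriever and an LLM, not the helpers used in this notebook; only the order in which they are called matters here.

```python
# Hypothetical stand-ins for a retriever and a generator (assumptions for illustration).
CALLS = []  # records the order of retrieval and generation calls

def retrieve(query):
    CALLS.append(("retrieve", query))
    return [f"chunk for: {query}"]

def generate(query, chunks):
    CALLS.append(("generate", query))
    return f"answer({query}; {len(chunks)} chunks)"

def iterative(query, rounds=2):
    """Alternate retrieval and generation; each answer refines the next query."""
    current, answer = query, ""
    for _ in range(rounds):
        answer = generate(current, retrieve(current))
        current = f"{query} | hint: {answer}"
    return answer

def recursive(query, sub_questions):
    """Answer each sub-question on its own, then aggregate the partial answers."""
    partial_answers = [generate(sq, retrieve(sq)) for sq in sub_questions]
    return generate(query, partial_answers)

def adaptive(query, is_sufficient):
    """Answer without retrieval first; retrieve only if the judge is not satisfied."""
    draft = generate(query, [])
    if is_sufficient(draft):
        return draft
    return generate(query, retrieve(query))
```

In the adaptive case, a judge that accepts the first draft results in zero retrieval calls, which is why that pipeline can return empty context lists.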

#### 3.1 Iterative RAG Process

In [None]:
def rewrite_query(original_query, last_response):
    return f"{original_query}. Hinweis: Beachte bei der Beantwortung auch: {last_response}"

In [None]:
def compute_cosine_similarity_with_embeddings(text1, text2, model="text-embedding-3-small"):
    """
    Computes cosine similarity between OpenAI embeddings of two texts.
    """
    embeddings = openai.embeddings.create(
        model=model,
        input=[text1, text2]
    )
    
    vec1 = np.array(embeddings.data[0].embedding)
    vec2 = np.array(embeddings.data[1].embedding)
    
    return float(cosine_similarity([vec1], [vec2])[0][0])
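
For reference, cosine similarity is the dot product of two vectors divided by the product of their norms, so it depends only on direction and not on length. The dependency-free sketch below uses made-up three-dimensional vectors (real embeddings have far more dimensions) to show why a value close to 1.0 indicates nearly identical direction:

```python
import math

def cosine(a, b):
    """Cosine similarity of two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)

v1 = [1.0, 2.0, 3.0]
v2 = [2.0, 4.0, 6.0]   # same direction as v1, twice the length
v3 = [3.0, 0.0, -1.0]  # orthogonal to v1 (dot product is 0)

print(round(cosine(v1, v2), 6))  # 1.0
print(round(cosine(v1, v3), 6))  # 0.0
```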

In [None]:
def rag_pipeline_iterative(query, database, k, model_name, max_iterations=3, similarity_threshold=0.95):
    current_query = query
    retrieved_sources_accumulated = []
    retrieved_contexts_accumulated = []
    retrieved_ids_accumulated = []
    retrieved_indices_accumulated = []
    previous_response = ""
    retrieved_ids_set = set() 

    for iteration in trange(max_iterations, desc="Iterative RAG"):
        
        # Retriever
        results = retrieve_documents(query_text=current_query, db=database, k=k)        

        for doc, _ in results:
            chunk_id = doc.metadata.get("chunk_id")
            if chunk_id not in retrieved_ids_set:
                retrieved_contexts_accumulated.append(doc.page_content)
                retrieved_sources_accumulated.append(doc.metadata.get("source"))
                retrieved_ids_accumulated.append(chunk_id)
                retrieved_indices_accumulated.append(doc.metadata.get("chunk_index"))
                retrieved_ids_set.add(chunk_id)  

        # Generator
        response = generate_answer(results, current_query, model_name)

        # Convergence check: similarity of the previously generated response
        if previous_response:
            similarity = compute_cosine_similarity_with_embeddings(response.strip(), previous_response.strip())
            if similarity >= similarity_threshold:
                print(f"\nCancel at iteration {iteration+1}: Similarity = {similarity:.2f}")
                break

        previous_response = response

        # Update query with rewrite function
        current_query = rewrite_query(query, response)
    
    return response.strip(), retrieved_sources_accumulated, retrieved_contexts_accumulated, retrieved_ids_accumulated, retrieved_indices_accumulated
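
The stopping rule of this loop can be studied in isolation. The sketch below replays a scripted list of answers and stops as soon as two consecutive answers are nearly identical. As an assumption made purely to keep the example offline, `difflib.SequenceMatcher` stands in for the embedding-based similarity, and `run_until_converged` is a hypothetical helper name.

```python
from difflib import SequenceMatcher

def similarity(a, b):
    # Stand-in for the embedding-based cosine similarity (assumption for illustration).
    return SequenceMatcher(None, a, b).ratio()

def run_until_converged(answers, threshold=0.95):
    """Consume one answer per iteration and stop once two consecutive answers match.

    Returns a tuple (final_answer, iterations_used).
    """
    previous = ""
    for i, answer in enumerate(answers, start=1):
        if previous and similarity(answer, previous) >= threshold:
            return answer, i
        previous = answer
    return previous, len(answers)

scripted = [
    "Draft A",
    "A much longer and different draft",
    "A much longer and different draft.",  # nearly identical to the previous answer
    "never reached",
]
final, used = run_until_converged(scripted)
print(used)  # 3
```

As in the pipeline above, the first iteration can never trigger the check because there is no previous answer to compare against.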

#### 3.2 Recursive RAG Process 

In [None]:
def generate_sub_queries(query, max_depth=3, model_name="gpt-4o-mini"):
    """
    Uses a Chain-of-Thought prompt to generate sub-questions for recursive reasoning.
    """
    prompt = f"""
                Du bist ein hilfreicher, juristischer KI-Assistent für Gesetzestexte im deutschen Energie- und Versorgungsbereich. Zerlege die folgende komplexe Frage in {max_depth} einfachere, kurze Teilfragen, die Schritt für Schritt bei der Beantwortung helfen.

                Frage:
                {query}

                Liste jede Teilfrage in einer neuen Zeile auf:
            """
            
    model = ChatOpenAI(model_name=model_name)
    response = model.predict(prompt)
    return [q.strip() for q in response.strip().split("\n") if q.strip()]
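
Chat models often return such lists with numbering or bullet markers, and sometimes with more lines than requested. One possible post-processing step, shown here as a sketch that is not part of the original notebook (`clean_sub_queries` is a hypothetical name and the sample questions are made up), strips those markers and caps the list at `max_depth`:

```python
import re

# Matches leading list markers such as "1.", "2)", "-", "*" or "•".
_LIST_MARKER = re.compile(r"^\s*(?:\d+[.)]|[-*•])\s+")

def clean_sub_queries(raw_response, max_depth=3):
    """Split a model reply into sub-questions, dropping list markers and empty lines."""
    cleaned = []
    for line in raw_response.splitlines():
        text = _LIST_MARKER.sub("", line).strip()
        if text:
            cleaned.append(text)
    return cleaned[:max_depth]

raw = "1. What does the law regulate?\n2) Who is the grid operator?\n\n- Which deadlines apply?\n* Extra question"
print(clean_sub_queries(raw))
# ['What does the law regulate?', 'Who is the grid operator?', 'Which deadlines apply?']
```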

In [None]:
def aggregate_recursive_answers(original_query, sub_answers, model_name):
    """
    Aggregates all answers of the sub-queries to a consistent answer.
    """
    combined = "\n".join([f"Frage: {q}\nAntwort: {a}" for q, a in sub_answers])
    prompt = f"""
                Basierend auf der ursprünglichen Frage: "{original_query}"
                und den folgenden Teilantworten, generiere eine kurze, präzise, in sich schlüssige und vollständige Gesamtantwort:

                {combined}
                
                Die Antwort soll sehr kurz sein mit einer maximalen Tokenlänge von 200.
            """
            
    model = ChatOpenAI(model_name=model_name)
    response = model.predict(prompt)
    return response


In [None]:
def process_sub_query(sub_query, db, k, model_name):
    # Retrieval
    results = retrieve_documents(query_text=sub_query, db=db, k=k)

    # Generate the answer
    answer = generate_answer(results, sub_query, model_name)

    return sub_query, answer, results

In [None]:
from concurrent.futures import ThreadPoolExecutor
from functools import partial

def rag_pipeline_recursive(query: str, database, k: int, model_name: str, max_depth: int = 3, max_iterations: int = 3, convergence_threshold: float = 0.95):

    retrieved_sources_accumulated = []
    retrieved_contexts_accumulated = []
    retrieved_ids_accumulated = []
    retrieved_ids_set = set()
    retrieved_indices_accumulated = []
    
    previous_response = ""
    current_query = query

    for iteration in tqdm(range(max_iterations), desc="Recursive RAG"):
        
        sub_answers = []
        sub_queries = generate_sub_queries(current_query, model_name=model_name, max_depth=max_depth)


        # Build preconfigured function
        process_fn = partial(process_sub_query, db=database, k=k, model_name=model_name)

        # Parallel processing
        with ThreadPoolExecutor(max_workers=5) as executor:
            futures = [executor.submit(process_fn, sub_query) for sub_query in sub_queries]

            for future in futures:
                sub_query, answer, results = future.result()
                

                for doc, _ in results:
                    chunk_id = doc.metadata.get("chunk_id")
                    if chunk_id not in retrieved_ids_set:
                        retrieved_contexts_accumulated.append(doc.page_content)
                        retrieved_sources_accumulated.append(doc.metadata.get("source"))
                        retrieved_ids_accumulated.append(chunk_id)
                        retrieved_indices_accumulated.append(doc.metadata.get("chunk_index"))
                        retrieved_ids_set.add(chunk_id)

                sub_answers.append((sub_query, answer))
                        
        response = aggregate_recursive_answers(query, sub_answers, model_name)

        # Convergence check: similarity of the previously generated response
        if previous_response:
            similarity = compute_cosine_similarity_with_embeddings(response.strip(), previous_response.strip())
            if similarity >= convergence_threshold:
                print(f"\nCancel at iteration {iteration+1}: Similarity = {similarity:.2f}")
                break

        previous_response = response
        current_query = rewrite_query(query, response)

    return response.strip(), retrieved_sources_accumulated, retrieved_contexts_accumulated, retrieved_ids_accumulated, retrieved_indices_accumulated
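
The parallel step above collects results by iterating over the futures in submission order, so `sub_answers` stays aligned with `sub_queries` even when a later task finishes first. A self-contained sketch with a hypothetical `slow_answer` worker (the sleep times are arbitrary) illustrates this behavior:

```python
import time
from concurrent.futures import ThreadPoolExecutor

def slow_answer(sub_query, delay):
    """Hypothetical worker that simulates a retrieval plus generation call."""
    time.sleep(delay)
    return sub_query, f"answer to {sub_query}"

# q1 is the slowest task, but it was submitted first.
tasks = [("q1", 0.2), ("q2", 0.05), ("q3", 0.1)]

with ThreadPoolExecutor(max_workers=3) as executor:
    futures = [executor.submit(slow_answer, q, d) for q, d in tasks]
    # Iterating the list of futures blocks on each one in submission order.
    results = [f.result() for f in futures]

print([q for q, _ in results])  # ['q1', 'q2', 'q3']
```

Using `concurrent.futures.as_completed` instead would yield results in completion order, which changes the order in which chunks and sub-answers are accumulated.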

#### 3.3 Adaptive RAG Process

In [None]:
def reflection_judge(query: str, answer: str, model_name: str) -> bool:
    """
    The model reflects on whether the answer is good enough.
    Returns True if additional retrieval is needed, False if the answer is sufficient.
    """
    prompt = f"""
    Die folgende Antwort wurde auf die Frage gegeben: "{query}"

    Antwort:
    {answer}

    Beurteile, ob die Antwort vollständig, korrekt und nachvollziehbar ist.
    Wenn die Antwort unzureichend ist und eine zusätzliche Recherche notwendig wäre, antworte mit "RETRIEVE".
    Andernfalls antworte mit "OK".
    """
    model = ChatOpenAI(model_name=model_name)
    response = model.predict(prompt).strip().upper()
    
    print(f"Response: {response}")
    
    return response != "OK"
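
Note that the exact comparison with `"OK"` treats any deviation, for example `"OK."` with a trailing period, as a request for retrieval. If a more tolerant check is wanted, one option is to tokenize the verdict before comparing and to fall back to retrieval whenever the reply is ambiguous. This is a sketch only; `needs_retrieval` is a hypothetical helper and not part of the notebook.

```python
import re

def needs_retrieval(judge_reply):
    """Return True unless the judge clearly answered OK without asking to retrieve."""
    tokens = re.findall(r"[A-Z]+", judge_reply.upper())
    if "RETRIEVE" in tokens:
        return True
    # Anything that does not contain a standalone "OK" counts as ambiguous.
    return "OK" not in tokens

print(needs_retrieval("ok."))       # False
print(needs_retrieval("RETRIEVE"))  # True
print(needs_retrieval(""))          # True
```

Defaulting to retrieval on unclear replies errs on the side of grounding the answer in documents, at the cost of extra calls.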


In [None]:
def rag_pipeline_adaptive(query: str, database, k: int, model_name: str, max_depth: int = 3, max_iterations: int = 3, convergence_threshold: float = 0.95):
    """
    Adaptive RAG: generate a direct answer first, then trigger retrieval only if needed (Self-RAG light).
    """
    model = ChatOpenAI(model_name=model_name)

    # Step 1: direct answer without retrieval
    direct_prompt = f"Beantworte die folgende Frage so gut wie möglich: {query}"
    initial_answer = model.predict(direct_prompt).strip()

    # Step 2: reflect on whether the answer is sufficient
    needs_retrieval = reflection_judge(query, initial_answer, model_name)

    if not needs_retrieval:
        return initial_answer, [], [], [], []

    # Step 3: if needed, run retrieval (e.g. recursive RAG)
    retrieved_sources_accumulated = []
    retrieved_contexts_accumulated = []
    retrieved_ids_accumulated = []
    retrieved_ids_set = set()
    retrieved_indices_accumulated = []
    
    previous_response = ""
    current_query = query

    for iteration in tqdm(range(max_iterations), desc="Adaptive RAG"):
                
        sub_answers = []
        sub_queries = generate_sub_queries(current_query, model_name=model_name, max_depth=max_depth)


        # Build preconfigured function
        process_fn = partial(process_sub_query, db=database, k=k, model_name=model_name)

        # Parallel processing
        with ThreadPoolExecutor(max_workers=5) as executor:
            futures = [executor.submit(process_fn, sub_query) for sub_query in sub_queries]

            for future in futures:
                sub_query, answer, results = future.result()

                for doc, _ in results:
                    chunk_id = doc.metadata.get("chunk_id")
                    if chunk_id not in retrieved_ids_set:
                        retrieved_contexts_accumulated.append(doc.page_content)
                        retrieved_sources_accumulated.append(doc.metadata.get("source"))
                        retrieved_ids_accumulated.append(chunk_id)
                        retrieved_indices_accumulated.append(doc.metadata.get("chunk_index"))
                        retrieved_ids_set.add(chunk_id)

                sub_answers.append((sub_query, answer))
                        
        response = aggregate_recursive_answers(query, sub_answers, model_name)
        
        if not reflection_judge(query, response, model_name):
            print(f"✅ Iteration {iteration + 1}: Answer sufficient, cancel retrieval and generate final answer")
            break

        # Convergence check: similarity of the previously generated response
        if previous_response:
            similarity = compute_cosine_similarity_with_embeddings(response.strip(), previous_response.strip())
            if similarity >= convergence_threshold:
                print(f"\nCancel at iteration {iteration+1}: Similarity = {similarity:.2f}")
                break

        previous_response = response
        current_query = rewrite_query(query, response)

    return response.strip(), retrieved_sources_accumulated, retrieved_contexts_accumulated, retrieved_ids_accumulated, retrieved_indices_accumulated


### 4. Evaluations

#### 4.1 Preparing the Evaluation Dataset

Since no new Chroma DB had to be created, the evaluation data set from the RAG baseline can also be reused. The data set was copied and renamed to ensure completeness.

In [None]:
eval_dataset_optimal_retrieval_process = "eval_datasets/4_optimal_retrieval_process/artificial_evaluation_dataset_for_chroma_chunksize1024_overlap128_c800ccc6_optimal_retrieval_process.json"

#### 4.2 Enrich Evaluation Datasets with Responses

In [None]:
def enrich_eval_dataset_with_rag_responses_for_optimal_retrieval(
    eval_dataset, 
    chroma_path, 
    k, 
    model_name, 
    rag_mode="baseline",  # baseline | iterative | recursive | adaptive
    optimization="4_optimal_retrieval_process"
):
    db = load_vector_database(chroma_path)

    with open(eval_dataset, "r", encoding="utf-8") as f:
        eval_dataset_json = json.load(f)

    enriched_dataset = []

    for i, entry in enumerate(tqdm(eval_dataset_json, desc="Processing RAG responses")):
        query = entry["query"]

        # Select the pipeline dynamically based on rag_mode
        if rag_mode == "baseline":
            response, _, retrieved_chunk_contexts, retrieved_chunk_ids, retrieved_chunk_indices = rag_pipeline(
                query, db, model_name)
        elif rag_mode == "iterative":
            response, _, retrieved_chunk_contexts, retrieved_chunk_ids, retrieved_chunk_indices = rag_pipeline_iterative(
                query, db, k, model_name
            )
        elif rag_mode == "recursive":
            response, _, retrieved_chunk_contexts, retrieved_chunk_ids, retrieved_chunk_indices = rag_pipeline_recursive(
                query, db, k, model_name
            )
        elif rag_mode == "adaptive":
            response, _, retrieved_chunk_contexts, retrieved_chunk_ids, retrieved_chunk_indices = rag_pipeline_adaptive(
                query, db, k, model_name
            )
        else:
            raise ValueError(f"Unknown RAG mode: {rag_mode}")

        # Add the results to the entry
        entry["generated_response"] = response
        entry["retrieved_chunk_contexts"] = retrieved_chunk_contexts
        entry["retrieved_chunk_ids"] = retrieved_chunk_ids
        entry["retrieved_chunk_indices"] = retrieved_chunk_indices

        enriched_dataset.append(entry)

    output_path = f"eval_datasets/{optimization}/{eval_dataset.split('/')[-1].replace('.json', '')}_{rag_mode}_rag_enriched.json"

    with open(output_path, "w", encoding="utf-8") as f:
        json.dump(enriched_dataset, f, indent=2, ensure_ascii=False)

    return output_path
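
The output file name is derived from the input file name plus the RAG mode. The same derivation can be written with `pathlib`, which avoids manual string splitting. The sketch below assumes the target layout `eval_datasets/<optimization>/<input stem>_<rag_mode>_rag_enriched.json`; `build_output_path` is a hypothetical helper name and `my_evalset.json` is a placeholder file name.

```python
from pathlib import Path

def build_output_path(eval_dataset, rag_mode, optimization="4_optimal_retrieval_process"):
    """Build the path of the enriched dataset from the input path and the RAG mode."""
    stem = Path(eval_dataset).stem  # file name without directory and extension
    return Path("eval_datasets") / optimization / f"{stem}_{rag_mode}_rag_enriched.json"

path = build_output_path(
    "eval_datasets/4_optimal_retrieval_process/my_evalset.json", "iterative"
)
print(path.as_posix())
# eval_datasets/4_optimal_retrieval_process/my_evalset_iterative_rag_enriched.json
```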


In [None]:
rag_processes = ["iterative", "recursive", "adaptive"]

enriched_datasets = {}

for process in rag_processes:
    
    print(f"Enriching evaluation dataset for {process} RAG:")
    
    enriched = enrich_eval_dataset_with_rag_responses_for_optimal_retrieval(
        eval_dataset=eval_dataset_optimal_retrieval_process,
        chroma_path=chroma_db_optimal_retrieval_process,
        k=6,
        model_name="gpt-4o-mini",
        rag_mode=process
    )

    enriched_datasets[process] = enriched


In [None]:
rag_processes = ["iterative", "recursive"]

enriched_datasets = {
    'iterative': 'eval_datasets/4_optimal_retrieval_process/artificial_evaluation_dataset_for_chroma_chunksize1024_overlap128_c800ccc6_optimal_retrieval_process_iterative_rag_enriched.json', 
    'recursive': 'eval_datasets/4_optimal_retrieval_process/artificial_evaluation_dataset_for_chroma_chunksize1024_overlap128_c800ccc6_optimal_retrieval_process_recursive_rag_enriched.json', 
    'adaptive': 'eval_datasets/4_optimal_retrieval_process/artificial_evaluation_dataset_for_chroma_chunksize1024_overlap128_c800ccc6_optimal_retrieval_process_adaptive_rag_enriched.json'
    }

#### 4.3 Evaluate Retrieval & Generation

In [None]:
# Collect the results per retrieval process, keyed by model name
retrieval_results_optimal_retrieval_process = {}
generation_results_optimal_retrieval_process = {}

db_name = chroma_db_optimal_retrieval_process.split("/")[-1]

for index, process in enumerate(rag_processes):

    json_filename = f"4_optimal_retrieval_process/{enriched_datasets[process].split('/')[-1]}"
    model_name = f"optimal_retrieval_{index+1}_{process}_{db_name.replace('_optimal_retrieval_process', '')}"

    print(f"\nEvaluating {model_name} using dataset {json_filename}...")

    retrieval_result = run_retrieval_evaluation(
        json_filename=json_filename,
        model_name=model_name
    )

    generation_result = run_generation_evaluation(
        json_filename=json_filename,
        model_name=model_name
    )

    retrieval_results_optimal_retrieval_process[model_name] = retrieval_result
    generation_results_optimal_retrieval_process[model_name] = generation_result

In [None]:
process = "adaptive"
json_filename = f"4_optimal_retrieval_process/{enriched_datasets[process].split('/')[-1]}"
model_name = f"optimal_retrieval_{3}_{process}_{db_name.replace('_optimal_retrieval_process', '')}"


print(f"\nEvaluating {model_name} using dataset {json_filename}...")

generation_result = run_generation_evaluation(
    json_filename=json_filename,
    model_name=model_name
)

In [None]:
from pathlib import Path
import pandas as pd

# Define base folder and file patterns
folder_path = Path("eval_results") / "4_optimal_retrieval_process"
pattern_retrieval = "optimal_retrieval*retrieval_evaluation.csv"
pattern_generation = "optimal_retrieval*generation_evaluation.csv"

# Find matching CSV files
csv_retrieval_files = list(folder_path.glob(pattern_retrieval))
csv_generation_files = list(folder_path.glob(pattern_generation))

print(f"🔍 Found {len(csv_retrieval_files)} retrieval files.")
print(f"🔍 Found {len(csv_generation_files)} generation files.")

# Load and combine retrieval evaluation files
df_retrieval = [pd.read_csv(f) for f in csv_retrieval_files]
df_generation = [pd.read_csv(f) for f in csv_generation_files]

# Concatenate if there is at least one file
if df_retrieval:
    combined_df_retrieval = pd.concat(df_retrieval, ignore_index=True)
    output_path_retrieval = folder_path / "combined_optimal_retrieval_process_retrieval_evaluation.csv"
    combined_df_retrieval.to_csv(output_path_retrieval, index=False)
    print(f"✅ Retrieval results saved to: {output_path_retrieval}")
else:
    print("⚠️ No retrieval CSV files found.")

if df_generation:
    combined_df_generation = pd.concat(df_generation, ignore_index=True)
    output_path_generation = folder_path / "combined_optimal_retrieval_process_generation_evaluation.csv"
    combined_df_generation.to_csv(output_path_generation, index=False)
    print(f"✅ Generation results saved to: {output_path_generation}")
else:
    print("⚠️ No generation CSV files found.")
