<a href="https://colab.research.google.com/github/wjleece/rag-experimentation-framework/blob/main/RAG_Experimentation_Framework_final.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

If you use this code, please cite:

{
  title = {RAG Experimentation Framework},

  author = {Bill Leece},

  year = {2024}
}

# Setup

In [None]:
!pip install -U transformers --quiet
#!pip install -U optimum --quiet
!pip install -U accelerate  --quiet
#!pip install -U bitsandbytes  --quiet
!pip install -U torch --quiet
!pip install -U sentencepiece --quiet
!pip install -U llama-index --quiet
!pip install -U llama-index-llms-mistralai --quiet
!pip install -U llama-index-embeddings-mistralai --quiet
!pip install -U llama-index-llms-langchain --quiet
!pip install -U langchain --quiet
!pip install -U langchain-community --quiet
!pip install -U langchain-mistralai --quiet
!pip install -U langchain_huggingface --quiet
!pip install -U faiss-gpu --quiet

[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/1.6 MB[0m [31m?[0m eta [36m-:--:--[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.6/1.6 MB[0m [31m64.5 MB/s[0m eta [36m0:00:00[0m
[?25h[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/1.2 MB[0m [31m?[0m eta [36m-:--:--[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.2/1.2 MB[0m [31m69.8 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m189.0/189.0 kB[0m [31m19.9 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m298.0/298.0 kB[0m [31m29.2 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.2/1.2 MB[0m [31m62.7 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m49.5/49.5 kB[0m [31m5.1 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32

In [None]:
import os
import json
import numpy as np
import faiss
import transformers
import torch
import gc
from google.colab import drive, userdata
from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline
from langchain.prompts import PromptTemplate
from langchain_huggingface import HuggingFacePipeline
from langchain_core.output_parsers import StrOutputParser
from langchain_mistralai.chat_models import ChatMistralAI
from llama_index.embeddings.mistralai import MistralAIEmbedding
from llama_index.core import SimpleDirectoryReader, Settings
from llama_index.core.node_parser import SemanticSplitterNodeParser
import time
from typing import List, Dict, Tuple
from contextlib import contextmanager
from langchain.schema.runnable import RunnableSequence
from langchain.schema.output_parser import StrOutputParser
from langchain_text_splitters.markdown import MarkdownHeaderTextSplitter
from langchain.text_splitter import RecursiveCharacterTextSplitter

In [None]:
# Read credentials through google.colab's `userdata` instead of hardcoding them.
# The Hugging Face and Mistral keys are exported as environment variables.
os.environ["HF_TOKEN"] = userdata.get('HF_TOKEN')
os.environ["MISTRAL_API_KEY"] = userdata.get('MISTRAL_API_KEY')
# The OpenAI key is kept in a module-level variable rather than in the environment.
api_key = userdata.get('OPENAI_API_KEY')

In [None]:
device = 'cuda' if torch.cuda.is_available() else 'cpu'  # Use the GPU when CUDA is available, otherwise fall back to the CPU

# Experiment Configurations

In [None]:
# Setup configurations
# MODEL_CONFIGS["models"] lists the models to evaluate. Each entry has:
#   "name"      - the model identifier
#   "type"      - "mistral_api" or "huggingface_quantized"
#   "tokenizer" - tokenizer identifier for local models, None for API models
# Uncomment an entry below to include that model in the experiment run.
MODEL_CONFIGS = {
    "models": [
    #    {
    #        "name": "open-mixtral-8x7b",
    #        "type": "mistral_api",
    #        "tokenizer": None,  # Not needed for API models
    #    },
         {
            "name": "open-mistral-nemo",
            "type": "mistral_api",
            "tokenizer": None,  # Not needed for API models
         },
   #     {
   #         "name": "ministral-8b-latest",
   #         "type": "mistral_api",
   #         "tokenizer": None,  # Not needed for API models
   #     },
   #   {
   #         "name": "wjleece/quantized-mistral-7b",
   #         "type": "huggingface_quantized",
   #         "tokenizer": "mistralai/Mixtral-8x7B-v0.1",  # The same tokenizer that works on the base model will work on the quantized model - there is no 'quantized tokenizer'
   #          "quantization_config": {                    #Quantization config left here as a reference, but not used in the code (as we're using an already quantized model from HuggingFace)
   #             "load_in_4bit": True,
   #             "bnb_4bit_compute_dtype": "float16",
   #             "bnb_4bit_quant_type": "nf4",
   #             "bnb_4bit_use_double_quant": False
   #         }
   #     },
   #   {
   #           "name": "wjleece/quantized-mistral-nemo-12b",
   #           "type": "huggingface_quantized",
   #           "tokenizer": "mistralai/Mistral-Nemo-Instruct-2407",  # The same tokenizer that works on the base model will work on the quantized model - there is no 'quantized tokenizer'
   #           "quantization_config": {                    #Quantization config left here as a reference, but not used in the code (as we're using an already quantized model from HuggingFace)
   #               "load_in_4bit": True,
   #               "bnb_4bit_compute_dtype": "float16",
   #               "bnb_4bit_quant_type": "nf4",
   #               "bnb_4bit_use_double_quant": False
   #          }
   #       },
     #  {
     #         "name": "wjleece/quantized-mistral-8b",
     #         "type": "huggingface_quantized",
     #         "tokenizer": "mistralai/Ministral-8B-Instruct-2410",  # The same tokenizer that works on the base model will work on the quantized model - there is no 'quantized tokenizer'
     #         "quantization_config": {                    #Quantization config left here as a reference, but not used in the code (as we're using an already quantized model from HuggingFace)
     #             "load_in_4bit": True,
     #             "bnb_4bit_compute_dtype": "float16",
     #             "bnb_4bit_quant_type": "nf4",
     #             "bnb_4bit_use_double_quant": False
     #         }
     #     }
       ]
}


# Chunking options read by RAGPipeline.create_chunks and ExperimentRunner.
CHUNKING_CONFIGS = {
    "strategies": ["semantic", "paragraph", "header"],  # results are saved in this order; thresholds apply to the semantic strategy only (if it is included)
    "thresholds": [85, 95],  # semantic breakpoint thresholds, only applicable for semantic chunking
    "max_chunk_size": 2048,  # in characters; only used in paragraph and header chunking
    "chunk_overlap": 100,  # in characters; only used in paragraph and header chunking
    "min_chunk_size": 35  # chunks shorter than 35 characters (about 5 words) are discarded
}

# Questions asked of every model / chunking combination.
QUESTION_CONFIGS = {
    "questions": [
        "What were cloud revenues in Q2 2024?",
       # "What were the main drivers of revenue growth in Q2?",
       # "How much did YouTube ad revenues grow in Q2 in APAC?",
       # "Can you summarize recent key antitrust matters?",
       # "Compare the revenue growth across all geographic regions and explain the main factors for each region.",
       # "Summarize all mentioned risk factors related to international operations.",
       # "What were the major changes in operating expenses across all categories and their stated reasons?"
    ]  # These questions should relate to the RAG document --> these are your 'business use cases'
}

# Output location on the mounted Google Drive.
# NOTE(review): presumably where analysis results are written - the code using it is not visible here; confirm.
FILE_CONFIGS = {
    "save_directory": '/content/drive/My Drive/AI/Model_Analysis'
}

# Load RAG Document

In [None]:
# Mount Google Drive so the source PDF is reachable under /content/drive.
drive.mount('/content/drive')
# Load the RAG source document; `documents` is read as a module-level global by ExperimentRunner.run_experiments.
documents = SimpleDirectoryReader(input_files=["/content/drive/My Drive/AI/Datasets/Google-10-q/goog-10-q-q2-2024.pdf"]).load_data()

Mounted at /content/drive


# RAG Pipeline Class

In [None]:
# Global singleton instance (created lazily by RAGPipeline.get_instance)
_GLOBAL_RAG_PIPELINE = None

class RAGPipeline:
    """Chunk documents, embed the chunks, retrieve by cosine similarity and query an LLM.

    Attributes:
        chunk_cache: cache key -> {'texts', 'strategy', 'chunk_stats'}.
        embedding_cache: cache key -> {'embeddings', 'cosine_index'}.
        embedding_model: lazily created MistralAIEmbedding instance, or None.

    Both caches share the key produced by ``_cache_key`` so that chunks created by
    ``create_chunks`` are always found again by ``run_cosine_search``.
    """

    def __init__(self):
        self.chunk_cache = {}
        self.embedding_cache = {}
        self.embedding_model = None

    @classmethod
    def get_instance(cls):
        """Get or create singleton instance"""
        global _GLOBAL_RAG_PIPELINE
        if _GLOBAL_RAG_PIPELINE is None:
            _GLOBAL_RAG_PIPELINE = cls()
        return _GLOBAL_RAG_PIPELINE

    @staticmethod
    def _cache_key(chunk_strategy, threshold):
        """Build the key shared by chunk_cache and embedding_cache.

        Semantic chunking is keyed by its threshold; every other strategy is keyed by
        the configured maximum chunk size.
        """
        if chunk_strategy == "semantic":
            return f"{chunk_strategy}_{threshold}"
        return f"{chunk_strategy}_{CHUNKING_CONFIGS['max_chunk_size']}"

    @staticmethod
    def _unit_rows(matrix):
        """Scale each row to unit L2 norm; all-zero rows are left unchanged instead of becoming NaN."""
        norms = np.linalg.norm(matrix, axis=1, keepdims=True)
        norms[norms == 0] = 1.0
        return matrix / norms

    def initialize_embedding_model(self):
        """Initialize the embedding model if not already initialized"""
        if self.embedding_model is None:
            mistral_api_key = userdata.get('MISTRAL_API_KEY')
            self.embedding_model = MistralAIEmbedding(
                model_name="mistral-embed",
                api_key=mistral_api_key
            )
        return self.embedding_model

    def convert_to_markdown_headers(self, text):
        """Convert document section titles to markdown headers.

        Every line is stripped. Lines matching one of the title patterns
        ("ITEM 1. ..." / "Section 2: ...", "1.2 ...", "(a) ...") are prefixed with
        '# ', '## ' or '### ' respectively; all other lines are kept as stripped.
        """
        import re

        patterns = [
            (r'^(?:ITEM|Section)\s+\d+[.:]\s*(.+)$', '# '),
            (r'^\d+\.\d+\s+(.+)$', '## '),
            (r'^\([a-z]\)\s+(.+)$', '### ')
        ]

        markdown_lines = []
        for line in text.split('\n'):
            line = line.strip()
            # First matching pattern wins; unmatched lines get no prefix.
            prefix = next(
                (mark for pattern, mark in patterns if re.match(pattern, line, re.IGNORECASE)),
                ''
            )
            markdown_lines.append(f"{prefix}{line}")

        return '\n'.join(markdown_lines)

    def _semantic_chunks(self, documents, threshold):
        """Split documents with SemanticSplitterNodeParser and return the node texts."""
        if self.embedding_model is None:
            self.initialize_embedding_model()

        splitter = SemanticSplitterNodeParser(
            buffer_size=1,
            breakpoint_percentile_threshold=threshold,
            embed_model=self.embedding_model
        )
        nodes = splitter.get_nodes_from_documents(documents)
        return [node.text for node in nodes]

    def _paragraph_chunks(self, documents, max_chunk_size, chunk_overlap):
        """Split each document's text with RecursiveCharacterTextSplitter."""
        text_splitter = RecursiveCharacterTextSplitter(
            separators=["\n\n", "\n", ". ", " ", ""],
            chunk_size=max_chunk_size,
            chunk_overlap=chunk_overlap,
            length_function=len
        )
        texts = []
        for doc in documents:
            texts.extend(text_splitter.split_text(doc.text))
        return texts

    def _header_chunks(self, documents, max_chunk_size, chunk_overlap):
        """Split on markdown headers, then re-split any section longer than max_chunk_size."""
        header_splitter = MarkdownHeaderTextSplitter(
            headers_to_split_on=[
                ("#", "Header 1"),
                ("##", "Header 2"),
                ("###", "Header 3"),
            ]
        )
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=max_chunk_size,
            chunk_overlap=chunk_overlap,
            separators=["\n\n", "\n", ". ", " ", ""]
        )

        texts = []
        for doc in documents:
            md_text = self.convert_to_markdown_headers(doc.text)
            for split in header_splitter.split_text(md_text):
                if len(split.page_content) > max_chunk_size:
                    texts.extend(text_splitter.split_text(split.page_content))
                else:
                    texts.append(split.page_content)
        return texts

    def create_chunks(self, documents: List, threshold: int, chunk_strategy: str = "semantic") -> Dict:
        """Create or retrieve chunks based on specified strategy.

        Args:
            documents: objects exposing a ``.text`` attribute (used by the paragraph and
                header strategies) and accepted by SemanticSplitterNodeParser.
            threshold: breakpoint percentile, only used by the "semantic" strategy.
            chunk_strategy: "semantic", "paragraph" or "header".

        Returns:
            The cache entry ``{'texts', 'strategy', 'chunk_stats'}``. When no chunk survives
            the minimum-size filter the entry has an empty ``texts`` list and zeroed stats.

        Raises:
            ValueError: if ``chunk_strategy`` is not one of the supported strategies.
        """
        max_chunk_size = CHUNKING_CONFIGS['max_chunk_size']
        chunk_overlap = CHUNKING_CONFIGS['chunk_overlap']
        min_chunk_size = CHUNKING_CONFIGS['min_chunk_size']

        cache_key = self._cache_key(chunk_strategy, threshold)
        if cache_key in self.chunk_cache:
            return self.chunk_cache[cache_key]

        # Reject unknown strategies before touching the caches.
        if chunk_strategy not in ("semantic", "paragraph", "header"):
            raise ValueError(f"Unknown chunk strategy: {chunk_strategy}")

        print("\n=== CHUNK CREATION DEBUG ===")
        print(f"Strategy: {chunk_strategy}")
        print(f"Cache key: {cache_key}")
        if chunk_strategy == "semantic":
            print(f"Using semantic threshold: {threshold}")
        else:
            print(f"Using max chunk size: {max_chunk_size} characters with {chunk_overlap} character overlap")

        # Keep the caches small: drop the oldest entry. Dicts preserve insertion order,
        # so the first key is the one that was created first.
        if len(self.chunk_cache) > 2:
            oldest_key = next(iter(self.chunk_cache))
            del self.chunk_cache[oldest_key]
            self.embedding_cache.pop(oldest_key, None)
            gc.collect()

        if chunk_strategy == "semantic":
            texts = self._semantic_chunks(documents, threshold)
        elif chunk_strategy == "paragraph":
            texts = self._paragraph_chunks(documents, max_chunk_size, chunk_overlap)
        else:
            texts = self._header_chunks(documents, max_chunk_size, chunk_overlap)

        # Filter out chunks that are too small
        texts = [t for t in texts if len(t.strip()) >= min_chunk_size]

        chunk_lengths = [len(t) for t in texts]
        if chunk_lengths:
            chunk_stats = {
                'num_chunks': len(texts),
                'avg_chunk_size': sum(chunk_lengths) / len(texts),
                'min_chunk_size': min(chunk_lengths),
                'max_chunk_size': max(chunk_lengths)
            }
        else:
            # Still return a well-formed entry so callers reading 'chunk_stats' do not fail.
            print(f"Warning: No chunks produced for strategy {chunk_strategy}")
            chunk_stats = {
                'num_chunks': 0,
                'avg_chunk_size': 0,
                'min_chunk_size': 0,
                'max_chunk_size': 0
            }

        self.chunk_cache[cache_key] = {
            'texts': texts,
            'strategy': chunk_strategy,
            'chunk_stats': chunk_stats
        }
        return self.chunk_cache[cache_key]

    def run_cosine_search(self, query: str, threshold: int, chunk_strategy: str = "semantic", k: int = 5) -> List[Dict]:
        """Run cosine similarity search over the cached chunks of a strategy.

        Args:
            query: the question to embed and search for.
            threshold: semantic threshold; only part of the cache key for "semantic".
            chunk_strategy: strategy whose chunks (created by ``create_chunks``) are searched.
            k: maximum number of results; capped at the number of available chunks.

        Returns:
            Up to ``k`` dicts ``{'text', 'distance', 'strategy'}`` ordered as returned by the
            index. ``'distance'`` holds the inner product of the unit-normalised vectors
            (i.e. the cosine similarity). Returns ``[]`` when no chunks are cached for the
            strategy or ``k`` is not positive.
        """
        print("\n=== COSINE SEARCH DEBUG ===")
        print(f"Query: {query}")
        print(f"Strategy: {chunk_strategy}")
        print(f"Threshold: {threshold}")
        print(f"Requested k: {k}")

        cache_key = self._cache_key(chunk_strategy, threshold)
        print(f"Cache key: {cache_key}")

        cached_chunks = self.chunk_cache.get(cache_key)
        texts = cached_chunks['texts'] if cached_chunks else []
        if not texts:
            print(f"Warning: No chunks found for strategy {chunk_strategy}")
            return []

        # Asking the index for more neighbours than it holds yields -1 placeholders.
        top_k = min(k, len(texts))
        if top_k <= 0:
            return []

        if self.embedding_model is None:
            self.initialize_embedding_model()

        if cache_key not in self.embedding_cache:
            print(f"Creating embeddings for {len(texts)} chunks")

            batch_size = 32
            embeddings = []
            for i in range(0, len(texts), batch_size):
                embeddings.extend(
                    self.embedding_model.get_text_embedding(text)
                    for text in texts[i:i + batch_size]
                )
                # Periodically release temporaries while embedding large documents.
                if i % (batch_size * 4) == 0:
                    gc.collect()

            embeddings_array = np.array(embeddings).astype('float32')
            normalized_embeddings = self._unit_rows(embeddings_array)

            # Inner product on unit vectors equals cosine similarity.
            cosine_index = faiss.IndexFlatIP(embeddings_array.shape[1])
            cosine_index.add(normalized_embeddings)

            self.embedding_cache[cache_key] = {
                'embeddings': embeddings_array,
                'cosine_index': cosine_index
            }

        query_vector = np.array([self.embedding_model.get_text_embedding(query)]).astype('float32')
        query_normalized = self._unit_rows(query_vector)

        distances, indices = self.embedding_cache[cache_key]['cosine_index'].search(
            query_normalized.reshape(1, -1).astype('float32'), top_k
        )

        return [
            {
                'text': texts[idx],
                'distance': float(score),
                'strategy': chunk_strategy
            }
            for score, idx in zip(distances[0], indices[0])
            if idx >= 0  # skip placeholder results
        ]

    def generate_response(self, query: str, context_rag: list, model: Dict) -> dict:
        """Generate response using provided context.

        Args:
            query: the user question.
            context_rag: retrieval results, each a dict with 'text' and 'strategy'.
            model: dict whose 'llm' entry is a runnable that can be piped after the prompt.

        Returns:
            A dict with 'response_text', 'sources' and 'strategy'. If the LLM output contains
            a JSON object it is parsed; otherwise the raw text is returned with no sources.
            Any exception is reported through a generic error response rather than raised.
        """
        strategy = None
        try:
            strategy = context_rag[0]['strategy'] if context_rag else None

            context_texts = [doc['text'] for doc in context_rag]
            if not context_texts:
                return {"response_text": "No relevant context found.", "sources": [], "strategy": strategy}

            context = "\n\n".join(context_texts)

            prompt = PromptTemplate(template="""
            Instructions:

            You are a helpful assistant who answers questions from context that has been provided to you.
            Given the context information, provide a direct and concise answer to the question: {query}

            Focus only on information present in the context. If you don't know the answer, say "I don't know."
            You must format your response as a JSON string object, starting with the word "LLM_Response:"

            Your answer to {query} will be a JSON string object that starts with "LLM_Response:" as shown below:

            LLM_Response:
            {{
                "response_text": "Your detailed answer here",
                "sources": [
                    "Copy and paste here the exact text segments from the context that you used to generate your answer. Include all relevant segments, verbatim."
                ]
            }}

            Important: In your response, the "sources" field must contain the exact text passages from the provided context that you used to formulate your answer. Copy these passages word-for-word.

            Do not include a hypothetical example in your answer, only include your final answer after "LLM_Response:"

            The context information that you will use for your answer is below:

            ---------------
            {context}
            ---------------
            """)

            chain = prompt | model['llm'] | StrOutputParser()

            response = chain.invoke({
                "query": query,
                "context": context
            })

            # Keep only what follows the last "LLM_Response:" marker.
            response_text = response.split("LLM_Response:")[-1].strip()

            # Try to parse the outermost {...} span as JSON; fall back to the raw text.
            parsed_response = None
            if '{' in response_text and '}' in response_text:
                json_str = response_text[response_text.find('{'):response_text.rfind('}') + 1]
                try:
                    parsed_response = json.loads(json_str)
                except json.JSONDecodeError:
                    parsed_response = None

            if parsed_response is not None:
                return {
                    "response_text": parsed_response.get("response_text", response_text),
                    "sources": parsed_response.get("sources", []),
                    "strategy": strategy
                }

            return {
                "response_text": response_text,
                "sources": [],
                "strategy": strategy
            }

        except Exception as e:
            print(f"An error occurred: {str(e)}")
            # Same keys as the success path so downstream consumers can rely on 'strategy'.
            return {
                "response_text": "An error occurred while generating the response.",
                "sources": [],
                "strategy": strategy
            }

# ModelConfig Class

In [None]:
class ModelConfig:
    """Handles model configuration and management.

    Attributes:
        models: list of model config dicts (each with "name", "type" and, for local
            models, "tokenizer").
        temperature: sampling temperature passed to every model.
        current_model: dict describing the loaded model ('llm', 'type', and for local
            models 'model' and 'pipe'), or None when nothing is loaded.
        current_model_name: name of the loaded model, or None.
    """

    _SUPPORTED_TYPES = ("mistral_api", "huggingface_quantized")

    def __init__(self,
                 models: List[Dict],
                 temperature: float = 0.3):
        self.models = models
        self.temperature = temperature
        self.current_model = None
        self.current_model_name = None

    def _build_api_model(self, model_name):
        """Create the model entry for a Mistral API model."""
        mistral_api_key = userdata.get('MISTRAL_API_KEY')
        return {
            'llm': ChatMistralAI(
                model=model_name,
                temperature=self.temperature,
                api_key=mistral_api_key
            ),
            'type': 'mistral_api'
        }

    def _build_quantized_model(self, model_config):
        """Load a pre-quantized Hugging Face model and wrap it in a text-generation pipeline."""
        model_name = model_config["name"]
        print(f"Loading quantized model: {model_name}")

        # Empty CUDA cache before loading new model
        torch.cuda.empty_cache()
        gc.collect()

        tokenizer = AutoTokenizer.from_pretrained(
            pretrained_model_name_or_path=model_config["tokenizer"],
            trust_remote_code=True,
            use_fast=True,
            padding_side="left"
        )

        model = AutoModelForCausalLM.from_pretrained(
            pretrained_model_name_or_path=model_name,
            device_map="auto",
            trust_remote_code=True,
            torch_dtype=torch.float16,
            use_cache=True,
            low_cpu_mem_usage=True,
        )

        pipe = pipeline(
            "text-generation",
            model=model,
            tokenizer=tokenizer,
            max_new_tokens=512,
            temperature=self.temperature,
            top_p=0.95,
            top_k=50,
            do_sample=True,
            device_map="auto"
        )

        return {
            'llm': HuggingFacePipeline(pipeline=pipe),
            'type': 'huggingface_quantized',
            'model': model,  # Keep reference for cleanup
            'pipe': pipe     # Keep reference for cleanup
        }

    @contextmanager
    def load_model(self, model_config: Dict):
        """Context manager for lazy loading and proper cleanup of models.

        Args:
            model_config: dict with "name" and "type" ("mistral_api" or
                "huggingface_quantized"); local models also need "tokenizer".

        Yields:
            The model entry dict (see ``current_model``).

        Raises:
            ValueError: if the model type is not supported.

        The model is released when the ``with`` block exits, including on error.
        """
        model_name = model_config["name"]
        model_type = model_config["type"]

        # Fail fast instead of trying to load an unknown type as a Hugging Face model.
        if model_type not in self._SUPPORTED_TYPES:
            raise ValueError(f"Unknown model type: {model_type}")

        # Clear any existing model
        self.cleanup_current_model()

        try:
            if model_type == "mistral_api":
                self.current_model = self._build_api_model(model_name)
            else:
                self.current_model = self._build_quantized_model(model_config)

            self.current_model_name = model_name
            yield self.current_model
        finally:
            # Release the model even if loading or the caller's block raised.
            self.cleanup_current_model()

    def cleanup_current_model(self):
        """Clean up the current model and free memory. Safe to call when nothing is loaded."""
        if self.current_model is not None:
            if self.current_model.get('type') == 'huggingface_quantized':
                # Drop model components explicitly; tolerate partially populated entries.
                for component in ('llm', 'model', 'pipe'):
                    self.current_model.pop(component, None)

                # Clear CUDA cache
                torch.cuda.empty_cache()

                # Run garbage collection
                gc.collect()

            self.current_model = None
            self.current_model_name = None

# ExperimentRunner Class

In [None]:
class ExperimentRunner:
    """Runs every (chunk strategy, threshold, model, question) combination through the RAG pipeline."""

    def __init__(self,
                 model_config: ModelConfig,
                 thresholds: List[int],
                 questions: List[str],
                 chunk_strategies: List[str],
                 rag_pipeline: RAGPipeline = None):
        """Store the experiment grid and resolve which RAG pipeline to use.

        An explicitly passed pipeline wins; otherwise the module-level shared pipeline
        is reused, and created first if it does not exist yet.
        """
        global _GLOBAL_RAG_PIPELINE

        self.model_config = model_config
        self.thresholds = thresholds
        self.questions = questions
        self.chunk_strategies = chunk_strategies

        if not rag_pipeline:
            if not _GLOBAL_RAG_PIPELINE:
                print("Initializing new RAG pipeline")
                _GLOBAL_RAG_PIPELINE = RAGPipeline()
            rag_pipeline = _GLOBAL_RAG_PIPELINE
        self.rag_pipeline = rag_pipeline

    def run_experiments(self) -> Dict:
        """Execute the full experiment grid.

        Returns:
            {"metadata": {...}, "results": [...]} where each result row records the model,
            threshold (None for non-semantic strategies), chunk strategy, question, the
            generated response and the chunk statistics of the strategy.

        Reads the module-level ``documents`` as the corpus to chunk.
        """
        run_metadata = {
            "timestamp": time.strftime("%Y%m%d-%H%M%S"),
            "models_tested": [entry["name"] for entry in self.model_config.models],
            "thresholds_tested": self.thresholds,
            "chunk_strategies_tested": self.chunk_strategies,
            "temperature": self.model_config.temperature
        }
        collected = []

        for strategy in self.chunk_strategies:
            is_semantic = strategy == "semantic"
            # Thresholds only matter for semantic chunking; other strategies run once.
            sweep = self.thresholds if is_semantic else [None]

            for threshold in sweep:
                # Chunk (or fetch cached chunks) once per strategy/threshold pair.
                chunks_data = self.rag_pipeline.create_chunks(
                    documents,
                    threshold=threshold if is_semantic else 0,
                    chunk_strategy=strategy
                )
                stats = chunks_data["chunk_stats"]

                for model_entry in self.model_config.models:
                    model_name = model_entry["name"]
                    print(f"\nTesting model: {model_name}")

                    with self.model_config.load_model(model_entry) as model:
                        for question in self.questions:
                            print(f"Processing question: {question}")

                            retrieved = self.rag_pipeline.run_cosine_search(
                                query=question,
                                threshold=threshold,
                                chunk_strategy=strategy
                            )
                            answer = self.rag_pipeline.generate_response(
                                query=question,
                                context_rag=retrieved,
                                model=model
                            )

                            collected.append({
                                "model": model_name,
                                "threshold": threshold if is_semantic else None,
                                "chunk_strategy": strategy,
                                "question": question,
                                "response": answer,
                                "chunk_stats": stats
                            })

        return {"metadata": run_metadata, "results": collected}


# Evaluator Class

In [None]:
import openai
import json
import tiktoken
import textwrap
import time
from datetime import datetime
from typing import Dict, List, Any

class ExperimentEvaluator:
    """Grades RAG experiment responses using GPT-4o as the judge.

    Workflow (see ``evaluate_experiments``): GPT-4o first answers every question
    directly from the source document (the baseline); each experiment response is
    then scored against its baseline answer; finally the per-response scores are
    aggregated into a summary.
    """

    # Score dimensions the judge is asked to fill in by the evaluation prompt.
    SCORE_KEYS = ("accuracy", "conciseness", "source_attribution", "reasonableness")

    def __init__(self, api_key: str):
        """Create the OpenAI client and the GPT-4o tokenizer.

        Args:
            api_key: OpenAI API key used for every judge call.
        """
        self.client = openai.OpenAI(api_key=api_key)
        self.encoder = tiktoken.encoding_for_model("gpt-4o")

    # ------------------------------------------------------------------ helpers

    @staticmethod
    def _extract_json_object(content: Any) -> Any:
        """Parse the text between the first '{' and the last '}' of ``content``.

        Returns:
            The decoded dict, or ``None`` when ``content`` is empty, has no
            brace pair, or decodes to something that is not a dict.

        Raises:
            json.JSONDecodeError: if the brace-delimited span is not valid JSON
                (callers catch this and fall back to a default).
        """
        if not content or '{' not in content or '}' not in content:
            return None
        parsed = json.loads(content[content.find('{'):content.rfind('}') + 1])
        return parsed if isinstance(parsed, dict) else None

    @staticmethod
    def _effective_threshold(result: Dict) -> Any:
        """Return the result's threshold for semantic chunking, ``None`` otherwise."""
        if result.get("chunk_strategy") == "semantic":
            return result.get("threshold")
        return None

    @staticmethod
    def _as_number(value: Any, default: Any = None) -> Any:
        """Coerce a judge-supplied value to a number, or return ``default``."""
        if isinstance(value, bool):
            return default
        if isinstance(value, (int, float)):
            return value
        if isinstance(value, str):
            try:
                return float(value.strip())
            except ValueError:
                return default
        return default

    @staticmethod
    def _configured_order():
        """Return ``(model_order, chunking_order)`` from the notebook-level configs.

        The order is only used to sort the summary, so a missing
        ``MODEL_CONFIGS`` / ``CHUNKING_CONFIGS`` global yields an empty order
        instead of aborting the evaluation.
        """
        try:
            model_order = [model["name"] for model in MODEL_CONFIGS["models"]]
        except NameError:
            model_order = []

        chunking_order = []
        try:
            for strategy in CHUNKING_CONFIGS["strategies"]:
                if strategy == "semantic":
                    for threshold in sorted(CHUNKING_CONFIGS["thresholds"]):
                        chunking_order.append((strategy, threshold))
                else:
                    chunking_order.append((strategy, None))
        except NameError:
            chunking_order = []

        return model_order, chunking_order

    def _normalize_evaluation(self, evaluation: Dict, result: Dict) -> Dict:
        """Make a judge-produced evaluation safe for ``_generate_summary``.

        The identifying fields are copied from ``result`` (the judge only echoes
        them and may alter or omit them), known scores are coerced to numbers,
        and a missing / non-numeric composite score is replaced by the mean of
        the numeric known scores (0 when there are none).
        """
        evaluation["model"] = result["model"]
        evaluation["chunk_strategy"] = result["chunk_strategy"]
        evaluation["threshold"] = self._effective_threshold(result)
        evaluation["question"] = result["question"]

        raw_scores = evaluation.get("scores")
        scores = dict(raw_scores) if isinstance(raw_scores, dict) else {}
        numeric_scores = []
        for name in self.SCORE_KEYS:
            number = self._as_number(scores.get(name))
            if number is not None:
                scores[name] = number
                numeric_scores.append(number)
        evaluation["scores"] = scores

        composite = self._as_number(evaluation.get("composite_score"))
        if composite is None:
            composite = sum(numeric_scores) / len(numeric_scores) if numeric_scores else 0
        evaluation["composite_score"] = composite

        # Preserve chunk stats in evaluation
        if "chunk_stats" in result:
            evaluation["chunk_stats"] = result["chunk_stats"]

        return evaluation

    # --------------------------------------------------------------- judge calls

    def _get_baseline_answers(self, questions: List[str], source_doc: str) -> Dict[str, str]:
        """Get GPT-4o's own answers to the questions as baseline.

        Args:
            questions: Questions to answer.
            source_doc: Full text of the source document placed in the prompt.

        Returns:
            The JSON object decoded from the reply (the prompt asks for
            question -> answer pairs). On any failure a dict with an ``"error"``
            message and the original ``"questions"`` is returned instead.
        """
        baseline_prompt = f"""Source Document:
        {source_doc}

        Using only the information from the source document above, answer these questions.
        Format your response as a valid JSON object with questions as keys and answers as values.
        Keep answers concise and factual.

        Questions to answer:
        {json.dumps(questions, indent=2)}"""

        try:
            print("\n--- Getting Baseline Answers ---")
            response = self.client.chat.completions.create(
                model="gpt-4o",
                messages=[
                    {"role": "system", "content": "You are a helpful assistant that provides JSON-formatted answers based on source documents."},
                    {"role": "user", "content": baseline_prompt}
                ],
                temperature=0.1
            )

            answers = self._extract_json_object(response.choices[0].message.content)
            if answers is not None:
                return answers
            return {"error": "No JSON structure found", "questions": questions}

        except Exception as e:
            print(f"Warning: Error getting baseline answers: {str(e)}")
            return {"error": str(e), "questions": questions}

    def evaluate_experiments(self, experiment_results: Dict, source_doc: str) -> Dict:
        """Core evaluation logic.

        Args:
            experiment_results: Dict with a ``"results"`` list; each entry needs
                ``model``, ``chunk_strategy``, ``threshold``, ``question`` and
                ``response``.
            source_doc: Source document text used to build the baseline answers.

        Returns:
            Dict with ``metadata``, ``evaluations`` and ``summary``. If anything
            raises, the structure from ``_create_default_evaluation`` is
            returned instead (``evaluation_status`` is then ``"failed"``).
        """
        try:
            print("\n=== Starting Evaluation Process ===")
            results = experiment_results["results"]

            # dict.fromkeys de-duplicates while keeping first-seen order, so the
            # prompt and the evaluation order are reproducible between runs.
            questions = list(dict.fromkeys(result["question"] for result in results))

            # Bucket results by (model, strategy, threshold) in a single pass.
            grouped = {}
            for result in results:
                combination = (
                    result["model"],
                    result["chunk_strategy"],
                    self._effective_threshold(result),
                )
                grouped.setdefault(combination, []).append(result)

            baseline_answers = self._get_baseline_answers(questions, source_doc)
            all_evaluations = []

            for relevant_results in grouped.values():
                for result in relevant_results:
                    evaluation = self._evaluate_single_response(
                        result, baseline_answers.get(result["question"], "No baseline available")
                    )
                    all_evaluations.append(evaluation)

            return {
                "metadata": {
                    "timestamp": datetime.now().isoformat(),
                    "model_used": "gpt-4o",
                    "num_combinations_evaluated": len(grouped),
                    "num_questions_evaluated": len(questions),
                    "evaluation_status": "success"
                },
                "evaluations": all_evaluations,
                "summary": self._generate_summary(all_evaluations)
            }

        except Exception as e:
            print(f"\nCritical error in evaluation process: {str(e)}")
            return self._create_default_evaluation(experiment_results)

    def _evaluate_single_response(self, result: Dict, baseline: str) -> Dict:
        """Evaluate a single response with chunk stats preservation.

        Args:
            result: One experiment result entry.
            baseline: Baseline answer for the result's question.

        Returns:
            The normalized judge evaluation, or the zero-score default from
            ``_create_default_single_evaluation`` when the call fails or the
            reply contains no JSON object.
        """
        evaluation_prompt = self._construct_evaluation_prompt(result, baseline)

        try:
            response = self.client.chat.completions.create(
                model="gpt-4o",
                messages=[
                    {"role": "system", "content": "You are an expert at evaluating LLM responses for accuracy and quality."},
                    {"role": "user", "content": evaluation_prompt}
                ],
                temperature=0.7,
                max_tokens=1000
            )

            evaluation = self._extract_json_object(response.choices[0].message.content)
            if evaluation is None:
                return self._create_default_single_evaluation(result)

            return self._normalize_evaluation(evaluation, result)

        except Exception as e:
            print(f"Warning: Error evaluating response: {str(e)}")
            return self._create_default_single_evaluation(result)

    def _construct_evaluation_prompt(self, result: Dict, baseline: str) -> str:
        """Build the judge prompt for one result.

        The values echoed into the JSON template are JSON-encoded so that a
        question or model name containing quotes or backslashes does not turn
        the template itself into invalid JSON.
        """
        model_json = json.dumps(result["model"], ensure_ascii=False)
        strategy_json = json.dumps(result["chunk_strategy"], ensure_ascii=False)
        question_json = json.dumps(result["question"], ensure_ascii=False)
        threshold_json = json.dumps(
            result["threshold"] if result["chunk_strategy"] == "semantic" else None
        )
        # default=str keeps the prompt buildable when the response holds
        # values json cannot encode natively.
        response_json = json.dumps(result["response"], indent=2, default=str)

        return f"""You are evaluating a RAG (Retrieval Augmented Generation) system's response to a specific question. Your task is to:
              1. First analyze if the requested information exists in the source document
              2. Then evaluate how well the system retrieved and presented that information

              EVALUATION CONTEXT:
              Question: {result["question"]}
              Your Analysis of Source Document: {baseline}


              System Response: {response_json}

              SCORING CRITERIA:

              1. ACCURACY (0-100):
              - If information exists in source:
                * 90-100: Retrieved complete and correct information with proper context
                * 70-89: Retrieved correct information but with incomplete context
                * 0-30: Failed to find existing information
              - If information does not exist in source:
                * 90-100: Correctly stated information cannot be found
                * 0-30: Incorrectly claimed information is unavailable

              2. SOURCE ATTRIBUTION (0-100):
              - If information exists in source:
                * 90-100: Provided full context (e.g., complete table data)
                * 70-89: Provided partial but relevant context
                * 0-30: Failed to provide sources for available information
              - If information does not exist:
                * 90-100: Correctly indicated no relevant sources
                * 0-30: Failed to indicate lack of sources

              3. CONCISENESS (0-100):
              - 90-100: Direct answer with necessary context
              - 70-89: Includes some unnecessary details
              - 0-69: Verbose or missing important context

              4. REASONABLENESS (0-100):
              - 90-100: Confidence matches strength of evidence
              - 70-89: Slightly mismatched confidence level
              - 0-69: Inappropriate confidence given available evidence

              Evaluate evidence quality:
              - Table data > Direct quotes > Derived information > No sources

            Provide your evaluation in this exact JSON format:
            {{
                "model": {model_json},
                "chunk_strategy": {strategy_json},
                "threshold": {threshold_json},
                "question": {question_json},
                "scores": {{
                    "accuracy": <score>,
                    "conciseness": <score>,
                    "source_attribution": <score>,
                    "reasonableness": <score>
                }},
                "composite_score": <average of all scores>,
                "explanation": "Detailed explanation comparing the response to your baseline understanding and justifying each score"
            }}"""

    # ------------------------------------------------------------------ defaults

    def _create_default_evaluation(self, experiment_results: Dict) -> Dict:
        """Create a default evaluation structure when the evaluation fails.

        Runs inside the error path of ``evaluate_experiments``, so it only uses
        ``.get`` lookups and must not raise on a malformed ``experiment_results``.
        """
        print("\n--- Creating Default Evaluation Due to Failure ---")
        results = experiment_results.get("results", [])
        combinations = set(
            (r.get("model"), r.get("chunk_strategy"), self._effective_threshold(r))
            for r in results
        )
        default_eval = {
            "metadata": {
                "timestamp": datetime.now().isoformat(),
                "model_used": "gpt-4o",
                "num_permutations_evaluated": len(results),
                # Same key the success path writes, so consumers of the
                # metadata find it in both cases.
                "num_combinations_evaluated": len(combinations),
                "num_questions_evaluated": len(set(r.get("question") for r in results)),
                "evaluation_status": "failed"
            },
            "evaluations": [self._create_default_single_evaluation(r) for r in results],
            "summary": {
                "overall_performance": "Evaluation failed - using default structure",
                "optimal_permutation": "Not available",
                "performance_analysis": "Evaluation process encountered errors",
                "chunk_stats": {}
            }
        }

        print("Created default evaluation with", len(default_eval["evaluations"]), "empty evaluations")
        return default_eval

    def _create_default_single_evaluation(self, result: Dict) -> Dict:
        """Create a zero-score evaluation for a single response when evaluation fails.

        Chunk stats from ``result`` are carried over so the summary can still
        report them for a configuration whose judge calls all failed.
        """
        default_eval = {
            "model": result.get("model"),
            "chunk_strategy": result.get("chunk_strategy"),
            "threshold": self._effective_threshold(result),
            "question": result.get("question"),
            "scores": {name: 0 for name in self.SCORE_KEYS},
            "composite_score": 0,
            "explanation": "Evaluation failed - default scores assigned"
        }
        if "chunk_stats" in result:
            default_eval["chunk_stats"] = result["chunk_stats"]
        return default_eval

    # ------------------------------------------------------------------- summary

    def _generate_summary(self, evaluations: List[Dict]) -> Dict:
        """Generate summary statistics from evaluations with ordered results.

        Args:
            evaluations: Per-response evaluations; each needs ``model``,
                ``chunk_strategy`` and a numeric ``composite_score``.

        Returns:
            Dict with ``overall_performance``, ``optimal_permutation``,
            ``performance_analysis`` (config label -> mean composite score) and
            ``chunk_stats``. Entries follow the order of the notebook configs;
            configurations the configs do not list are appended afterwards
            rather than dropped.
        """
        if not evaluations:
            return {
                "overall_performance": "No evaluations available",
                "optimal_permutation": "Not available",
                "performance_analysis": "Evaluation process failed",
                "chunk_stats": {}
            }

        desired_model_order, desired_chunking_order = self._configured_order()

        # First pass: collect all scores and stats
        strategy_scores = {}
        chunk_stats_by_config = {}
        for evaluation in evaluations:
            strategy = evaluation["chunk_strategy"]
            is_semantic = strategy == "semantic"

            key = (evaluation["model"], strategy)
            if "threshold" in evaluation and is_semantic:
                key = (evaluation["model"], strategy, evaluation["threshold"])

            bucket = strategy_scores.setdefault(key, {"count": 0, "total_composite": 0})
            bucket["count"] += 1
            bucket["total_composite"] += evaluation["composite_score"]

            # Only the first stats seen for a chunking configuration are kept.
            config_key = (strategy, evaluation.get("threshold") if is_semantic else None)
            if config_key not in chunk_stats_by_config and "chunk_stats" in evaluation:
                chunk_stats_by_config[config_key] = evaluation["chunk_stats"]

        # Mean composite score and display label per configuration
        analysis = {}
        for key, bucket in strategy_scores.items():
            model, strategy = key[:2]
            threshold = key[2] if len(key) == 3 else None
            config_str = (f"{model} with {strategy} chunking" +
                          (f" (threshold: {threshold})" if threshold is not None else ""))
            analysis[key] = {
                'config_str': config_str,
                'score': bucket["total_composite"] / bucket["count"]
            }

        # max() keeps the first of tied entries and, unlike a "> 0" scan, still
        # names a configuration when every score is 0.
        best_key = max(analysis, key=lambda k: analysis[k]['score'])
        best_config = analysis[best_key]['config_str']
        best_score = analysis[best_key]['score']

        # Order performance analysis by model and then chunking strategy
        ordered_keys = []
        for model_name in desired_model_order:
            for strategy, threshold in desired_chunking_order:
                key = (model_name, strategy, threshold) if threshold is not None else (model_name, strategy)
                if key in analysis and key not in ordered_keys:
                    ordered_keys.append(key)
        ordered_keys.extend(key for key in analysis if key not in ordered_keys)

        ordered_strategy_analysis = {}
        for key in ordered_keys:
            ordered_strategy_analysis[analysis[key]['config_str']] = analysis[key]['score']

        # Format chunk statistics
        temp_stats = {}
        for (strategy, threshold), stats in chunk_stats_by_config.items():
            label = f"{strategy}" + (f"_{threshold}" if threshold is not None else "")
            # Fallback when no median was recorded: the midpoint of min and max
            # (an approximation, not a true median).
            midrange = int((stats["min_chunk_size"] + stats["max_chunk_size"]) / 2)
            # NOTE(review): the sibling keys are named "*_chunk_size", so the
            # producer may store the median as "median_chunk_size" - accept
            # either spelling; confirm against the code that builds chunk_stats.
            median = stats.get("median_size", stats.get("median_chunk_size", midrange))
            temp_stats[label] = {
                "num_chunks": stats["num_chunks"],
                "min_size": stats["min_chunk_size"],
                "max_size": stats["max_chunk_size"],
                "median_size": median,
                "avg_size": stats["avg_chunk_size"]
            }

        # Create ordered chunk stats (configured order first, then the rest)
        final_chunk_stats = {}
        for strategy, threshold in desired_chunking_order:
            label = f"{strategy}" + (f"_{threshold}" if threshold is not None else "")
            if label in temp_stats:
                final_chunk_stats[label] = temp_stats[label]
        for label, formatted in temp_stats.items():
            final_chunk_stats.setdefault(label, formatted)

        return {
            "overall_performance": f"Average composite score across all evaluations: {sum(e['composite_score'] for e in evaluations)/len(evaluations):.2f}/100",
            "optimal_permutation": f"Best performance: {best_config} (score: {best_score:.2f}/100)",
            "performance_analysis": ordered_strategy_analysis,
            "chunk_stats": final_chunk_stats
        }


# Results Manager Class

In [None]:
class ResultsManager:
    """Handles formatting, saving, and displaying evaluation results with simplified logic"""

    def __init__(self, save_directory: str):
        """Remember the output directory and create it if it does not exist.

        Args:
            save_directory: Directory the JSON result files are written to.
        """
        self.save_directory = save_directory
        os.makedirs(save_directory, exist_ok=True)

    def format_results(self, experiment_results: Dict, evaluation_results: Dict) -> Tuple[Dict, Dict]:
        """Format experiment and evaluation results into structured output.

        Args:
            experiment_results: Dict with a ``"results"`` list (and optional
                ``"metadata"``).
            evaluation_results: Dict with ``metadata``, ``evaluations`` and
                ``summary``.

        Returns:
            ``(formatted_experiment, formatted_evaluation)``. Experiment entries
            keep model / strategy / threshold / question plus the answer text
            and sources; ``chunk_stats`` is left out of them. The evaluation
            summary is stored under ``"overall_summary"``.
        """
        print("\n=== Starting Results Formatting ===")

        formatted_rows = []
        for result in experiment_results["results"]:
            response = result["response"]
            if isinstance(response, dict):
                answer = response.get("response_text", "")
                sources = response.get("sources", [])
            else:
                # A response that is not a dict is reported as plain answer
                # text instead of failing on the missing .get().
                answer = "" if response is None else str(response)
                sources = []
            formatted_rows.append({
                "model": result["model"],
                "chunk_strategy": result["chunk_strategy"],
                "threshold": result["threshold"],
                "question": result["question"],
                "response": {
                    "answer": answer,
                    "sources": sources
                }
            })

        # Format experiment results - removing chunk_stats
        formatted_experiment = {
            "metadata": experiment_results.get("metadata", {}),
            "results": formatted_rows
        }

        # Format evaluation results - chunk_stats remain here
        formatted_evaluation = {
            "metadata": evaluation_results.get("metadata", {}),
            "evaluations": evaluation_results.get("evaluations", []),
            "overall_summary": evaluation_results.get("summary", {})
        }

        return formatted_experiment, formatted_evaluation

    def save_results(self, formatted_experiment: Dict, formatted_evaluation: Dict) -> Tuple[str, str]:
        """Save formatted results to JSON files.

        Both files share one ``YYYYmmdd-HHMMSS`` timestamp in their names.

        Returns:
            ``(experiment_file, evaluation_file)`` paths inside the save directory.
        """
        timestamp = time.strftime("%Y%m%d-%H%M%S")

        experiment_file = os.path.join(self.save_directory, f"experiment_results_{timestamp}.json")
        evaluation_file = os.path.join(self.save_directory, f"evaluation_results_{timestamp}.json")

        for filepath, data in [
            (experiment_file, formatted_experiment),
            (evaluation_file, formatted_evaluation)
        ]:
            with open(filepath, 'w', encoding='utf-8') as f:
                json.dump(data, f, indent=2, ensure_ascii=False)

        return experiment_file, evaluation_file

    def display_results(self, evaluation_results: Dict):
        """Display evaluation results in a clear, formatted manner.

        Accepts either the raw evaluation dict (summary under ``"summary"``) or
        the dict produced by ``format_results`` (summary under
        ``"overall_summary"``).
        """
        print("\n" + "="*80)
        print("EVALUATION RESULTS")
        print("="*80)

        # Display metadata
        metadata = evaluation_results.get("metadata", {})
        print("\nMETADATA:")
        print("-"*80)
        print(f"Timestamp:           {metadata.get('timestamp', 'Not available')}")
        print(f"Model Used:          {metadata.get('model_used', 'Not available')}")
        print(f"Combinations:        {metadata.get('num_combinations_evaluated', 'Not available')}")
        print(f"Questions:           {metadata.get('num_questions_evaluated', 'Not available')}")
        print(f"Evaluation Status:   {metadata.get('evaluation_status', 'Not available')}")

        # Display summary. format_results renames "summary" to
        # "overall_summary", so look under both keys; reading only "summary"
        # silently skipped this whole section for formatted results.
        summary = evaluation_results.get("summary") or evaluation_results.get("overall_summary", {})
        if summary:
            print("\nOVERALL ANALYSIS:")
            print("-"*80)

            # Overall performance
            if "overall_performance" in summary:
                print("\nPerformance Summary:")
                print(textwrap.fill(summary["overall_performance"], width=80))

            # Optimal configuration
            if "optimal_permutation" in summary:
                print("\nOptimal Configuration:")
                print(textwrap.fill(summary["optimal_permutation"], width=80))

            # Performance analysis
            if "performance_analysis" in summary:
                print("\nPerformance Analysis:")
                analysis = summary["performance_analysis"]
                if isinstance(analysis, dict):
                    for config, score in analysis.items():
                        print(f"{config}: {score:.2f}")
                else:
                    print(textwrap.fill(str(analysis), width=80))

            # Chunk statistics
            if "chunk_stats" in summary and summary["chunk_stats"]:
                print("\nChunk Statistics:")
                for config, stats in summary["chunk_stats"].items():
                    print(f"\n{config}:")
                    print(f"  Number of chunks: {stats['num_chunks']}")
                    print(f"  Min chunk size: {stats['min_size']} chars")
                    print(f"  Max chunk size: {stats['max_size']} chars")
                    print(f"  Median chunk size: {stats['median_size']} chars")
                    print(f"  Average chunk size: {stats['avg_size']:.1f} chars")

# Main

In [None]:
def main():
    """Run the experiments, have GPT-4o evaluate them, then format, save and show the results.

    Reads its settings from the notebook-level ``MODEL_CONFIGS``,
    ``CHUNKING_CONFIGS``, ``QUESTION_CONFIGS`` and ``FILE_CONFIGS`` dicts, takes
    the source text from ``documents[0]`` and the OpenAI key from
    ``userdata.get('OPENAI_API_KEY')``.

    Returns:
        Tuple ``(formatted_experiment, formatted_evaluation)`` as produced by
        ``ResultsManager.format_results``.
    """
    # --- configuration -------------------------------------------------------
    model_config = ModelConfig(
        models=MODEL_CONFIGS["models"],
        temperature=0.3
    )

    thresholds = CHUNKING_CONFIGS["thresholds"]
    questions = QUESTION_CONFIGS["questions"]
    strategies = CHUNKING_CONFIGS["strategies"]

    experiment_runner = ExperimentRunner(
        model_config=model_config,
        thresholds=thresholds,
        questions=questions,
        chunk_strategies=strategies
    )

    model_names = [entry['name'] for entry in model_config.models]
    print("Starting experiment with configurations:")
    print("Models:", model_names)
    print("Thresholds:", CHUNKING_CONFIGS['thresholds'])
    print("Chunk strategies:", CHUNKING_CONFIGS['strategies'])
    print("Number of questions:", len(QUESTION_CONFIGS['questions']))

    # --- experiments ---------------------------------------------------------
    experiment_results = experiment_runner.run_experiments()

    # --- evaluation ----------------------------------------------------------
    # The first loaded document supplies the text the judge sees.
    source_doc = documents[0].text

    print("\nInitializing GPT-4o evaluation...")
    openai_key = userdata.get('OPENAI_API_KEY')
    evaluator = ExperimentEvaluator(api_key=openai_key)
    evaluation_results = evaluator.evaluate_experiments(
        experiment_results=experiment_results,
        source_doc=source_doc
    )

    # --- format, persist, display -------------------------------------------
    results_manager = ResultsManager(save_directory=FILE_CONFIGS['save_directory'])
    formatted = results_manager.format_results(
        experiment_results=experiment_results,
        evaluation_results=evaluation_results
    )
    formatted_experiment, formatted_evaluation = formatted

    saved_paths = results_manager.save_results(
        formatted_experiment=formatted_experiment,
        formatted_evaluation=formatted_evaluation
    )
    experiment_file, evaluation_file = saved_paths

    results_manager.display_results(evaluation_results=formatted_evaluation)

    print("\nExperiment complete!")
    print("Results saved to:")
    print("  Experiment results:", experiment_file)
    print("  Evaluation results:", evaluation_file)

    return formatted_experiment, formatted_evaluation


# Entry point: run the whole pipeline and bind the two values returned by
# main() (formatted experiment results, formatted evaluation) to the
# module-level names `results` and `evaluation`.
if __name__ == "__main__":
    results, evaluation = main()

Starting experiment with configurations:
Models: ['open-mistral-nemo']
Thresholds: [85, 95]
Chunk strategies: ['semantic', 'paragraph', 'header']
Number of questions: 1

Testing model: open-mistral-nemo
Processing question: What were cloud revenues in Q2 2024?

=== COSINE SEARCH DEBUG ===
Query: What were cloud revenues in Q2 2024?
Strategy: semantic
Threshold: 85
Requested k: 5
Cache key: semantic_85

Testing model: open-mistral-nemo
Processing question: What were cloud revenues in Q2 2024?

=== COSINE SEARCH DEBUG ===
Query: What were cloud revenues in Q2 2024?
Strategy: semantic
Threshold: 95
Requested k: 5
Cache key: semantic_95

=== CHUNK CREATION DEBUG ===
Strategy: paragraph
Cache key: paragraph_2048
Using max chunk size: 2048 characters with 100 character overlap

Testing model: open-mistral-nemo
Processing question: What were cloud revenues in Q2 2024?

=== COSINE SEARCH DEBUG ===
Query: What were cloud revenues in Q2 2024?
Strategy: paragraph
Threshold: None
Requested k: 5
Ca