# RAG Chunking Strategy Evaluator

A comprehensive system to evaluate and visualize different chunking strategies for RAG systems.
Supports custom chunking strategies through inheritance and includes RAGAS evaluation metrics.

In [1]:
import os

from kaggle_secrets import UserSecretsClient
# Read the OpenAI key from the Kaggle secrets client and publish it through
# the process environment; demo_usage() later reads it back from os.environ.
user_secrets = UserSecretsClient()
os.environ['OPENAI_API_KEY'] = user_secrets.get_secret("OPENAI_API_KEY")


In [2]:
!pip install -q langchain-openai langchain-chroma ragas==0.1.9 plotly scikit-learn

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m67.3/67.3 kB[0m [31m1.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m86.1/86.1 kB[0m [31m4.8 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m63.4/63.4 kB[0m [31m2.9 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m19.0/19.0 MB[0m [31m62.8 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m94.9/94.9 kB[0m [31m4.3 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m438.3/438.3 kB[0m [31m14.2 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m65.5/65.5 kB[0m [31m2.6 MB/s[0m eta

In [3]:
import os
import json
import time
from abc import ABC, abstractmethod
from typing import List, Dict, Any, Optional, Tuple
from dataclasses import dataclass, asdict
import hashlib

import pandas as pd
import numpy as np
from tqdm import tqdm

# Core RAG components
import chromadb
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.schema import Document
from langchain_openai import OpenAIEmbeddings, ChatOpenAI
from langchain_community.vectorstores import Chroma

# RAGAS evaluation
from ragas import evaluate
from ragas.metrics import (
    answer_relevancy,
    faithfulness,
    context_precision,
    context_recall,
    answer_correctness
)
from datasets import Dataset

# Visualization
import plotly.graph_objects as go
import plotly.express as px
from plotly.subplots import make_subplots
import plotly.offline as pyo

# Suppress warnings
import warnings
warnings.filterwarnings('ignore')


For example, replace imports like: `from langchain_core.pydantic_v1 import BaseModel`
with: `from pydantic import BaseModel`
or the v1 compatibility namespace if you are working in a code base that has not been fully upgraded to pydantic 2 yet. 	from pydantic.v1 import BaseModel

  from ragas.metrics._answer_correctness import AnswerCorrectness, answer_correctness

For example, replace imports like: `from langchain.pydantic_v1 import BaseModel`
with: `from pydantic import BaseModel`
or the v1 compatibility namespace if you are working in a code base that has not been fully upgraded to pydantic 2 yet. 	from pydantic.v1 import BaseModel

  from ragas.metrics._context_entities_recall import (


In [4]:
@dataclass
class ChunkingConfig:
    """Configuration for chunking strategies.

    A strategy receives one instance in its constructor and reads its
    parameters from it. ``name`` is also what ``ChunkingStrategy.name``
    returns, so it is the key under which a strategy gets registered.
    """
    # Label identifying the strategy in the registry and in reports.
    name: str
    # Upper bound on chunk length; FixedSizeChunking and SemanticChunking
    # compare it against character counts.
    chunk_size: int = 1000
    # Amount of text repeated between consecutive chunks. FixedSizeChunking
    # treats it as characters; SemanticChunking derives a word count from it.
    chunk_overlap: int = 200
    # Optional separator list; RecursiveCharacterChunking substitutes its own
    # default list when this is None or empty.
    separators: Optional[List[str]] = None
    # Free-form extras for custom strategies; no code visible in this file reads it.
    custom_params: Optional[Dict[str, Any]] = None


@dataclass
class EvaluationResult:
    """Results from RAG evaluation of a single chunking strategy.

    One instance is produced per strategy by
    ``RAGChunkingEvaluator.evaluate_strategy`` and later flattened with
    ``dataclasses.asdict`` for the dashboard and the JSON export.
    """
    # Name of the strategy these numbers belong to.
    strategy_name: str
    # Number of chunks the strategy produced for the evaluated documents.
    chunk_count: int
    # Mean chunk length in characters (len of page_content).
    avg_chunk_size: float
    # Mean wall-clock seconds per question spent in the similarity search.
    retrieval_time: float
    # Mean wall-clock seconds per question spent generating the answer.
    generation_time: float
    # The five RAGAS metric scores as returned by ragas.evaluate.
    answer_relevancy: float
    faithfulness: float
    context_precision: float
    context_recall: float
    answer_correctness: float
    # Weighted average of the five metrics (0.25/0.25/0.2/0.2/0.1 in the order above).
    overall_score: float


class ChunkingStrategy(ABC):
    """Interface every chunking strategy must implement.

    Subclasses override :meth:`chunk_documents`; the configuration passed to
    the constructor is kept on ``self.config`` for them to read.
    """

    def __init__(self, config: ChunkingConfig):
        self.config = config

    @property
    def name(self) -> str:
        """Identifier of this strategy, taken from its configuration."""
        return self.config.name

    @abstractmethod
    def chunk_documents(self, documents: List[Document]) -> List[Document]:
        """Split ``documents`` into chunks and return the chunks."""

In [5]:
class RecursiveCharacterChunking(ChunkingStrategy):
    """Chunking delegated to LangChain's ``RecursiveCharacterTextSplitter``."""

    def chunk_documents(self, documents: List[Document]) -> List[Document]:
        """Split ``documents`` with a splitter built from this strategy's config.

        When the config supplies no separators, the list
        ``["\\n\\n", "\\n", " ", ""]`` is used instead.
        """
        cfg = self.config
        separators = cfg.separators if cfg.separators else ["\n\n", "\n", " ", ""]
        return RecursiveCharacterTextSplitter(
            chunk_size=cfg.chunk_size,
            chunk_overlap=cfg.chunk_overlap,
            separators=separators,
        ).split_documents(documents)


class FixedSizeChunking(ChunkingStrategy):
    """Simple fixed-size chunking with overlap.

    Each document is cut into windows of ``chunk_size`` characters; the start
    of every window is ``chunk_size - chunk_overlap`` characters after the
    previous one. The last window of a document may be shorter.
    """

    def chunk_documents(self, documents: List[Document]) -> List[Document]:
        """Split every document into fixed-size character windows.

        Args:
            documents: Documents whose ``page_content`` is to be windowed.

        Returns:
            One ``Document`` per window. Its metadata is a copy of the source
            document's metadata extended with ``chunk_start`` and
            ``chunk_end`` (character offsets, end exclusive).

        Raises:
            ValueError: If ``chunk_size`` is not positive, or if
                ``chunk_overlap`` is not smaller than ``chunk_size``. Either
                condition would otherwise keep the window from advancing and
                loop forever.
        """
        size = self.config.chunk_size
        overlap = self.config.chunk_overlap
        if size <= 0:
            raise ValueError(f"chunk_size must be positive, got {size}")
        if overlap >= size:
            raise ValueError(
                f"chunk_overlap ({overlap}) must be smaller than chunk_size ({size})"
            )

        # Distance between the starts of two consecutive windows.
        step = size - overlap

        chunks = []
        for doc in documents:
            text = doc.page_content
            text_len = len(text)
            for start in range(0, text_len, step):
                end = min(start + size, text_len)
                chunks.append(
                    Document(
                        page_content=text[start:end],
                        metadata={**doc.metadata, "chunk_start": start, "chunk_end": end},
                    )
                )
                # The window reached the end of the text; later starts would
                # only yield fragments already contained in this chunk.
                if end >= text_len:
                    break

        return chunks


class SemanticChunking(ChunkingStrategy):
    """Sentence-based chunking that respects semantic boundaries.

    Sentences are accumulated until adding the next one would push the chunk
    past ``chunk_size`` characters; the chunk is then emitted and the next one
    starts with the trailing words of the previous chunk as overlap.
    """

    def chunk_documents(self, documents: List[Document]) -> List[Document]:
        """Group the sentences of each document into size-bounded chunks.

        Args:
            documents: Documents whose ``page_content`` is to be chunked.

        Returns:
            The chunks of all documents, in order. Each chunk carries its own
            copy of the source document's metadata.
        """
        size = self.config.chunk_size
        overlap = self.config.chunk_overlap
        # One overlap word per started block of 10 overlap characters. This is
        # the count the previous ``[-overlap//10:]`` slice produced for
        # positive overlaps; for an overlap of 0 that slice became ``[0:]``
        # and carried the *entire* previous chunk over, so the count is
        # computed explicitly and 0 really means "no overlap".
        overlap_word_count = -(-overlap // 10) if overlap > 0 else 0

        chunks = []
        for doc in documents:
            current_chunk = ""

            for sentence in self._split_sentences(doc.page_content):
                # Check if adding sentence exceeds limit
                if current_chunk and len(current_chunk) + len(sentence) > size:
                    chunks.append(
                        Document(
                            page_content=current_chunk.strip(),
                            # Copy so chunks do not share one mutable dict.
                            metadata=dict(doc.metadata),
                        )
                    )

                    # Start new chunk with overlap
                    if overlap_word_count:
                        overlap_words = current_chunk.split()[-overlap_word_count:]
                    else:
                        overlap_words = []
                    current_chunk = " ".join(overlap_words) + " " + sentence
                else:
                    current_chunk += " " + sentence

            # Add final chunk
            if current_chunk.strip():
                chunks.append(
                    Document(
                        page_content=current_chunk.strip(),
                        metadata=dict(doc.metadata),
                    )
                )

        return chunks

    def _split_sentences(self, text: str) -> List[str]:
        """Split ``text`` into sentences at runs of ``.``, ``!`` or ``?``.

        The split points are the same as a plain ``re.split(r'[.!?]+', text)``,
        but each sentence keeps its terminating punctuation so the chunk text
        stays faithful to the source document. Fragments that are empty after
        stripping whitespace are dropped.
        """
        import re
        sentences = re.findall(r'[^.!?]+[.!?]*', text)
        return [s.strip() for s in sentences if s.strip()]

In [6]:
class RAGChunkingEvaluator:
    """Main evaluation system for RAG chunking strategies.

    Typical flow: register one or more ``ChunkingStrategy`` objects, load
    documents, call :meth:`run_evaluation`, then inspect the results with
    :meth:`create_dashboard`, :meth:`save_results` or
    :meth:`get_best_strategy`.
    """

    # RAGAS metric names, in the order used for reporting.
    _METRIC_NAMES = (
        'answer_relevancy',
        'faithfulness',
        'context_precision',
        'context_recall',
        'answer_correctness',
    )

    # Weights that fold the five metrics into ``overall_score``.
    _METRIC_WEIGHTS = {
        'answer_relevancy': 0.25,
        'faithfulness': 0.25,
        'context_precision': 0.2,
        'context_recall': 0.2,
        'answer_correctness': 0.1,
    }

    def __init__(self, openai_api_key: str, persist_directory: str = "./chroma_db"):
        """Create the embedding model, the chat model and the Chroma client.

        Args:
            openai_api_key: Key passed to both OpenAI clients.
            persist_directory: Directory handed to ``chromadb.PersistentClient``.
        """
        self.openai_api_key = openai_api_key
        self.persist_directory = persist_directory
        self.embeddings = OpenAIEmbeddings(api_key=openai_api_key)
        self.llm = ChatOpenAI(model="gpt-4o-mini", api_key=openai_api_key)
        self.strategies: Dict[str, ChunkingStrategy] = {}
        self.results: List[EvaluationResult] = []

        # Initialize Chroma client
        self.chroma_client = chromadb.PersistentClient(path=persist_directory)

    def register_strategy(self, strategy: ChunkingStrategy):
        """Register a chunking strategy for evaluation.

        A strategy registered under an already-used name replaces the
        earlier one.
        """
        self.strategies[strategy.name] = strategy
        print(f"Registered strategy: {strategy.name}")

    def load_documents(self, file_paths: List[str]) -> List[Document]:
        """Load UTF-8 text files into documents.

        Args:
            file_paths: Paths of the files to read.

        Returns:
            One ``Document`` per path, with the path stored under the
            ``"source"`` metadata key.
        """
        documents = []
        for path in file_paths:
            with open(path, 'r', encoding='utf-8') as f:
                content = f.read()
            documents.append(
                Document(page_content=content, metadata={"source": path})
            )
        return documents

    def create_test_questions(self, documents: List[Document], num_questions: int = 10) -> List[Dict[str, str]]:
        """Generate test questions from documents using the chat model.

        Only the first 500 whitespace-separated words of the first three
        documents are sent to the model.

        Args:
            documents: Source documents.
            num_questions: Number of questions requested; at most this many
                are returned (fewer if the model answers with fewer lines).

        Returns:
            Dicts with the keys ``"question"`` and ``"question_id"``
            (``q_1``, ``q_2``, ...).
        """
        # Sample text for question generation
        sample_texts = [
            " ".join(doc.page_content.split()[:500])
            for doc in documents[:3]
        ]
        combined_text = "\n\n".join(sample_texts)

        prompt = f"""Based on the following text, generate {num_questions} diverse questions that can be answered using the information provided. 
        
        Text:
        {combined_text}
        
        Return only the questions, one per line, without numbering or additional text."""

        response = self.llm.invoke(prompt)
        question_lines = [q.strip() for q in response.content.split('\n') if q.strip()]

        return [
            {"question": question, "question_id": f"q_{i+1}"}
            for i, question in enumerate(question_lines[:num_questions])
        ]

    def _collection_name(self, strategy_name: str) -> str:
        """Build a unique collection name for one evaluation run.

        Every character of the strategy name that is not a letter, digit or
        underscore is replaced by ``_`` and the name part is capped at 40
        characters; a 12-hex-digit digest of the name plus the current time
        is appended to make the name unique per run.
        """
        import re
        safe_name = re.sub(r'[^A-Za-z0-9_]', '_', strategy_name)[:40]
        digest = hashlib.sha256(
            f"{strategy_name}:{time.time()}".encode("utf-8")
        ).hexdigest()[:12]
        return f"eval_{safe_name}_{digest}"

    def _answer_questions(self, vectorstore, test_questions: List[Dict[str, str]], top_k: int) -> List[Dict[str, Any]]:
        """Retrieve context and generate an answer for every test question.

        Returns one record per question holding the fields RAGAS consumes
        plus the measured retrieval and generation times in seconds.
        """
        evaluation_data = []

        for q_data in tqdm(test_questions, desc="Processing questions"):
            question = q_data["question"]

            # Retrieve context
            start_time = time.time()
            retrieved_docs = vectorstore.similarity_search(question, k=top_k)
            retrieval_time = time.time() - start_time

            contexts = [doc.page_content for doc in retrieved_docs]

            # Generate answer
            start_time = time.time()
            context_text = "\n\n".join(contexts)
            prompt = f"Context:\n{context_text}\n\nQuestion: {question}\n\nAnswer:"

            response = self.llm.invoke(prompt)
            answer = response.content
            generation_time = time.time() - start_time

            # Prepare for RAGAS
            evaluation_data.append({
                "question": question,
                "answer": answer,
                "contexts": contexts,
                # A caller-supplied reference answer is preferred. Without one
                # the generated answer stands in as a proxy, which makes the
                # ground-truth based metrics (context_recall,
                # answer_correctness) compare the answer with itself.
                "ground_truth": q_data.get("ground_truth", answer),
                "retrieval_time": retrieval_time,
                "generation_time": generation_time
            })

        return evaluation_data

    def evaluate_strategy(self,
                         strategy_name: str,
                         documents: List[Document],
                         test_questions: List[Dict[str, str]],
                         top_k: int = 5) -> EvaluationResult:
        """Evaluate a single chunking strategy.

        The documents are chunked, the chunks are indexed in a temporary
        Chroma collection, every question is answered from the ``top_k``
        retrieved chunks, and the answers are scored with RAGAS.

        Args:
            strategy_name: Name of a previously registered strategy.
            documents: Documents to chunk and index.
            test_questions: Dicts with a ``"question"`` key and optionally a
                ``"ground_truth"`` reference answer.
            top_k: Number of chunks retrieved per question.

        Returns:
            The aggregated metrics for the strategy.

        Raises:
            ValueError: If the strategy is not registered, if no test
                questions are given, or if the strategy produces no chunks.
        """
        if strategy_name not in self.strategies:
            raise ValueError(f"Strategy {strategy_name} not registered")
        if not test_questions:
            raise ValueError("No test questions provided")

        strategy = self.strategies[strategy_name]
        print(f"\nEvaluating strategy: {strategy_name}")

        # 1. Chunk documents
        chunks = strategy.chunk_documents(documents)
        print(f"Created {len(chunks)} chunks")
        if not chunks:
            # Nothing to index; the statistics below would all be NaN.
            raise ValueError(f"Strategy {strategy_name} produced no chunks")

        # 2. Create vector store and 3. answer the test questions
        collection_name = self._collection_name(strategy_name)
        try:
            vectorstore = Chroma(
                collection_name=collection_name,
                embedding_function=self.embeddings,
                client=self.chroma_client
            )
            vectorstore.add_documents(chunks)
            evaluation_data = self._answer_questions(vectorstore, test_questions, top_k)
        finally:
            # Best-effort cleanup that also runs when indexing or answering
            # fails, so failed runs do not leave collections behind.
            try:
                self.chroma_client.delete_collection(collection_name)
            except Exception as e:
                print(f"Warning: could not delete collection {collection_name}: {e}")

        # 4. Run RAGAS evaluation
        dataset = Dataset.from_list(evaluation_data)

        metrics = [
            answer_relevancy,
            faithfulness,
            context_precision,
            context_recall,
            answer_correctness
        ]

        ragas_results = evaluate(dataset, metrics=metrics)
        scores = {name: float(ragas_results[name]) for name in self._METRIC_NAMES}

        # 5. Calculate statistics
        avg_chunk_size = float(np.mean([len(chunk.page_content) for chunk in chunks]))
        avg_retrieval_time = float(np.mean([d["retrieval_time"] for d in evaluation_data]))
        avg_generation_time = float(np.mean([d["generation_time"] for d in evaluation_data]))

        # Calculate overall score (weighted average)
        overall_score = sum(
            scores[name] * weight for name, weight in self._METRIC_WEIGHTS.items()
        )

        return EvaluationResult(
            strategy_name=strategy_name,
            chunk_count=len(chunks),
            avg_chunk_size=avg_chunk_size,
            retrieval_time=avg_retrieval_time,
            generation_time=avg_generation_time,
            answer_relevancy=scores["answer_relevancy"],
            faithfulness=scores["faithfulness"],
            context_precision=scores["context_precision"],
            context_recall=scores["context_recall"],
            answer_correctness=scores["answer_correctness"],
            overall_score=overall_score
        )

    def run_evaluation(self,
                      documents: List[Document],
                      test_questions: Optional[List[Dict[str, str]]] = None,
                      num_questions: int = 10) -> List[EvaluationResult]:
        """Run evaluation on all registered strategies.

        Args:
            documents: Documents to evaluate on.
            test_questions: Questions to use; when missing or empty,
                ``num_questions`` questions are generated from the documents.
            num_questions: Number of questions to generate if needed.

        Returns:
            The results of all strategies that completed. A strategy that
            raises is reported on stdout and skipped, so one failure does not
            abort the whole run.
        """
        if not test_questions:
            print("Generating test questions...")
            test_questions = self.create_test_questions(documents, num_questions)

        self.results = []

        for strategy_name in self.strategies:
            try:
                result = self.evaluate_strategy(strategy_name, documents, test_questions)
                self.results.append(result)
                print(f"✓ Completed {strategy_name}")
            except Exception as e:
                print(f"✗ Failed {strategy_name}: {str(e)}")

        return self.results

    def create_dashboard(self, save_path: str = "rag_evaluation_dashboard.html"):
        """Create interactive Plotly dashboard.

        Builds a 3x2 grid (overall score, chunk statistics, per-metric bars,
        score vs. chunk size, timings, ranking table) from ``self.results``
        and writes it to ``save_path`` as HTML.

        Returns:
            The Plotly figure, or ``None`` when there are no results yet.
        """
        if not self.results:
            print("No results to visualize. Run evaluation first.")
            return

        df = pd.DataFrame([asdict(result) for result in self.results])

        # Create subplots
        fig = make_subplots(
            rows=3, cols=2,
            subplot_titles=[
                "Overall Performance Comparison",
                "Chunk Statistics",
                "RAGAS Metrics Breakdown",
                "Performance vs Chunk Size",
                "Retrieval & Generation Time",
                "Strategy Rankings"
            ],
            specs=[
                [{"type": "bar"}, {"type": "scatter"}],
                [{"type": "bar"}, {"type": "scatter"}],
                [{"type": "bar"}, {"type": "table"}]
            ]
        )

        # 1. Overall Performance
        fig.add_trace(
            go.Bar(
                x=df['strategy_name'],
                y=df['overall_score'],
                name="Overall Score",
                marker_color='lightblue'
            ),
            row=1, col=1
        )

        # 2. Chunk Statistics
        fig.add_trace(
            go.Scatter(
                x=df['chunk_count'],
                y=df['avg_chunk_size'],
                mode='markers+text',
                text=df['strategy_name'],
                textposition="top center",
                name="Chunk Stats",
                marker=dict(size=10, color='orange')
            ),
            row=1, col=2
        )

        # 3. RAGAS Metrics
        for i, metric in enumerate(self._METRIC_NAMES):
            fig.add_trace(
                go.Bar(
                    x=df['strategy_name'],
                    y=df[metric],
                    name=metric.replace('_', ' ').title(),
                    offsetgroup=i
                ),
                row=2, col=1
            )

        # 4. Performance vs Chunk Size
        fig.add_trace(
            go.Scatter(
                x=df['avg_chunk_size'],
                y=df['overall_score'],
                mode='markers+text',
                text=df['strategy_name'],
                textposition="top center",
                name="Performance vs Size",
                marker=dict(size=12, color='green')
            ),
            row=2, col=2
        )

        # 5. Time Analysis
        fig.add_trace(
            go.Bar(
                x=df['strategy_name'],
                y=df['retrieval_time'],
                name="Retrieval Time",
                marker_color='red',
                offsetgroup=0
            ),
            row=3, col=1
        )

        fig.add_trace(
            go.Bar(
                x=df['strategy_name'],
                y=df['generation_time'],
                name="Generation Time",
                marker_color='blue',
                offsetgroup=1
            ),
            row=3, col=1
        )

        # 6. Rankings Table
        df_sorted = df.sort_values('overall_score', ascending=False)
        df_sorted['rank'] = range(1, len(df_sorted) + 1)

        fig.add_trace(
            go.Table(
                header=dict(
                    values=['Rank', 'Strategy', 'Overall Score', 'Best Metric'],
                    fill_color='lightgray'
                ),
                cells=dict(
                    values=[
                        df_sorted['rank'],
                        df_sorted['strategy_name'],
                        [f"{score:.3f}" for score in df_sorted['overall_score']],
                        [self._best_metric(row) for _, row in df_sorted.iterrows()]
                    ]
                )
            ),
            row=3, col=2
        )

        # Update layout
        fig.update_layout(
            height=1200,
            title="RAG Chunking Strategy Performance Dashboard",
            showlegend=True
        )

        # Save dashboard
        pyo.plot(fig, filename=save_path, auto_open=False)
        print(f"Dashboard saved to: {save_path}")
        return fig

    def _best_metric(self, row) -> str:
        """Return the title-cased name of the highest-scoring metric in ``row``."""
        best_metric = max(self._METRIC_NAMES, key=lambda m: row[m])
        return best_metric.replace('_', ' ').title()

    def save_results(self, filename: str = "rag_evaluation_results.json"):
        """Save evaluation results to ``filename`` as a JSON list of objects."""
        results_dict = [asdict(result) for result in self.results]
        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(results_dict, f, indent=2)
        print(f"Results saved to: {filename}")

    def get_best_strategy(self) -> Optional[EvaluationResult]:
        """Return the result with the highest overall score, or ``None`` if there are no results."""
        if not self.results:
            return None
        return max(self.results, key=lambda r: r.overall_score)

In [7]:
def demo_usage():
    """Walk through a full evaluation run with four example strategies.

    Expects ``OPENAI_API_KEY`` in the environment and the files
    ``document1.txt`` / ``document2.txt`` in the working directory. Writes the
    dashboard to ``chunking_evaluation.html`` and the raw results to
    ``evaluation_results.json``.
    """
    evaluator = RAGChunkingEvaluator(
        openai_api_key=os.environ['OPENAI_API_KEY'],
        persist_directory="./demo_chroma"
    )

    # (strategy class, name, chunk_size, chunk_overlap), registered in this order.
    strategy_specs = [
        (RecursiveCharacterChunking, "Recursive-1000", 1000, 200),
        (RecursiveCharacterChunking, "Recursive-500", 500, 100),
        (FixedSizeChunking, "Fixed-1000", 1000, 200),
        (SemanticChunking, "Semantic-1000", 1000, 50),
    ]
    for strategy_cls, label, size, overlap in strategy_specs:
        config = ChunkingConfig(name=label, chunk_size=size, chunk_overlap=overlap)
        evaluator.register_strategy(strategy_cls(config))

    # Replace these with your own document paths.
    document_paths = ["document1.txt", "document2.txt"]
    documents = evaluator.load_documents(document_paths)

    evaluator.run_evaluation(documents, num_questions=15)
    evaluator.create_dashboard("chunking_evaluation.html")
    evaluator.save_results("evaluation_results.json")

    best = evaluator.get_best_strategy()
    if not best:
        return
    print(f"\nBest Strategy: {best.strategy_name}")
    print(f"Overall Score: {best.overall_score:.3f}")

In [8]:
if __name__ == "__main__":
    # Usage banner shown when the file is executed directly; one entry per
    # printed line.
    banner_lines = (
        "RAG Chunking Strategy Evaluator",
        "=" * 50,
        "\nTo use this system:",
        "1. Set your OpenAI API key",
        "2. Register chunking strategies",
        "3. Load your documents",
        "4. Run evaluation",
        "5. View dashboard",
        "\nSee demo_usage() function for example.",
    )
    for banner_line in banner_lines:
        print(banner_line)

RAG Chunking Strategy Evaluator

To use this system:
1. Set your OpenAI API key
2. Register chunking strategies
3. Load your documents
4. Run evaluation
5. View dashboard

See demo_usage() function for example.


In [9]:
!curl https://www.gutenberg.org/files/46/46-0.txt > document1.txt
!curl https://www.gutenberg.org/files/1342/1342-0.txt > document2.txt

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  177k  100  177k    0     0   224k      0 --:--:-- --:--:-- --:--:--  224k
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  734k  100  734k    0     0   832k      0 --:--:-- --:--:-- --:--:--  833k


In [10]:
# demo_usage()

In [11]:
import pandas as pd
import plotly.graph_objects as go
import plotly.express as px
from dataclasses import asdict
import json
import os
import numpy as np
from scipy import stats
from scipy.stats import ttest_ind, mannwhitneyu
import warnings
warnings.filterwarnings('ignore')

def create_individual_charts(results, output_dir="rag_charts", cost_per_token=None, format='html'):
    """
    Create individual RAG evaluation charts
    
    Args:
        results: List of evaluation results OR path to JSON file
        output_dir: Directory to save chart files
        format: 'html' (recommended) or 'png'
        cost_per_token: Dict with 'input' and 'output' token costs for cost analysis
                       e.g., {'input': 0.00001, 'output': 0.00003}
    """
    
    # Load data if results is a file path
    if isinstance(results, str):
        with open(results, 'r') as f:
            data = json.load(f)
    else:
        data = results
        
    if not data:
        print("No results provided")
        return None
    
    # Convert to DataFrame
    def convert_to_dict(result):
        """Turn one result record into a dict.

        Tried in order: the record is already a dict; it has an instance
        ``__dict__``; it offers ``_asdict()``; it is accepted by
        ``dataclasses.asdict``. Raises ``TypeError`` when none applies.
        """
        if isinstance(result, dict):
            return result
        if hasattr(result, '__dict__'):
            return vars(result)
        if hasattr(result, '_asdict'):
            return result._asdict()
        try:
            return asdict(result)
        except TypeError:
            raise TypeError(f"Cannot convert result type {type(result)} to dict")
    
    df = pd.DataFrame([convert_to_dict(result) for result in data])
    
    # Add cost analysis if token costs provided
    if cost_per_token and 'input_tokens' in df.columns and 'output_tokens' in df.columns:
        df['cost_per_query'] = (df['input_tokens'] * cost_per_token['input'] + 
                               df['output_tokens'] * cost_per_token['output'])
        df['cost_per_performance'] = df['cost_per_query'] / df['overall_score']
    
    # Create output directory
    os.makedirs(output_dir, exist_ok=True)
    
    # Color palette
    colors = {
        'primary': '#1f77b4',
        'secondary': '#ff7f0e', 
        'success': '#2ca02c',
        'danger': '#d62728',
        'warning': '#ff9800',
        'info': '#17a2b8',
        'purple': '#9467bd',
        'gray': '#7f7f7f'
    }
    
    charts_created = []
    file_ext = 'html' if format == 'html' else 'png'
    
    # Helper function to save charts
    def save_chart(fig, filename_base):
        """Write ``fig`` below ``output_dir`` and return the path that was written.

        HTML output is written directly. For image output three exporters are
        tried in turn (kaleido engine, orca engine, ``plotly.io.write_image``);
        if all of them fail the chart is saved as HTML instead and the last
        error is printed.
        """
        filename = f"{output_dir}/{filename_base}.{file_ext}"
        if format == 'html':
            fig.write_html(filename)
            return filename

        def _via_plotly_io():
            import plotly.io as pio
            pio.write_image(fig, filename, width=1000, height=700)

        # Try different export methods for PNG, in order of preference.
        exporters = (
            lambda: fig.write_image(filename, width=1000, height=700, engine="kaleido"),
            lambda: fig.write_image(filename, width=1000, height=700, engine="orca"),
            _via_plotly_io,
        )
        last_error = None
        for export in exporters:
            try:
                export()
            except Exception as exc:
                last_error = exc
            else:
                return filename

        print(f"PNG export failed. Saving as HTML instead: {last_error}")
        filename = f"{output_dir}/{filename_base}.html"
        fig.write_html(filename)
        return filename
    
    # Statistical significance testing
    def perform_significance_tests():
        """Test if performance differences are statistically significant.

        Every unordered pair of strategies in ``df`` is compared on
        ``overall_score`` with a two-sided Mann-Whitney U test.

        Returns:
            Dict keyed by ``"<strategy1> vs <strategy2>"`` with the entries
            ``p_value``, ``significant`` (p < 0.05) and ``effect_size``
            (absolute mean difference divided by the pooled standard
            deviation). Empty when fewer than two strategies are present.
            Pairs the test cannot handle are left out.
        """
        from itertools import combinations

        # Named ``comparisons`` so the enclosing ``results`` argument is not shadowed.
        comparisons = {}
        strategies = df['strategy_name'].unique()

        if len(strategies) < 2:
            return comparisons

        scores_by_strategy = {
            name: df.loc[df['strategy_name'] == name, 'overall_score'].values
            for name in strategies
        }

        for strategy1, strategy2 in combinations(strategies, 2):
            score1 = scores_by_strategy[strategy1]
            score2 = scores_by_strategy[strategy2]

            # Use Mann-Whitney U test (non-parametric) since we likely have small samples
            try:
                _, p_value = mannwhitneyu(score1, score2, alternative='two-sided')
            except (ValueError, TypeError):
                # The test rejected this input (e.g. degenerate or
                # non-numeric samples); skip the pair, keep the others.
                continue

            # With one score per strategy the pooled deviation is 0 and the
            # quotient is inf/nan; that outcome is kept, only the numpy
            # warning is silenced explicitly.
            with np.errstate(divide='ignore', invalid='ignore'):
                pooled_std = np.sqrt((np.var(score1) + np.var(score2)) / 2)
                effect_size = abs(np.mean(score1) - np.mean(score2)) / pooled_std

            comparisons[f"{strategy1} vs {strategy2}"] = {
                'p_value': p_value,
                'significant': bool(p_value < 0.05),
                'effect_size': effect_size
            }

        return comparisons
    
    sig_tests = perform_significance_tests()
    
    # 1. Performance Ranking with Statistical Significance
    def create_performance_chart():
        """Render the horizontal ranking bar chart and return the saved file path.

        Bars are sorted by overall score (best at the top). Error bars show
        the standard deviation of a strategy's scores when it has more than
        one row in ``df``. If any significant comparison involves the top
        strategy, a single "p<0.05*" annotation is attached to its bar.
        """
        ranked = df.sort_values('overall_score', ascending=True)

        def _score_spread(strategy):
            # Std dev over all rows of this strategy; 0 for a single row.
            scores = df[df['strategy_name'] == strategy]['overall_score']
            return scores.std() if len(scores) > 1 else 0

        error_bars = [_score_spread(strategy) for strategy in ranked['strategy_name']]
        score_labels = [f"{score:.3f}" for score in ranked['overall_score']]

        bars = go.Bar(
            x=ranked['overall_score'],
            y=ranked['strategy_name'],
            orientation='h',
            marker_color=colors['primary'],
            text=score_labels,
            textposition='outside',
            error_x=dict(type='data', array=error_bars, visible=True),
            hovertemplate='<b>%{y}</b><br>Score: %{x:.3f}<br>Std Dev: %{error_x:.3f}<extra></extra>'
        )
        fig = go.Figure(bars)

        # Add significance annotations
        annotations = []
        if sig_tests:
            leader = ranked.iloc[-1]
            leader_name = leader['strategy_name']
            leader_wins_significantly = any(
                leader_name in comparison and outcome['significant']
                for comparison, outcome in sig_tests.items()
            )
            if leader_wins_significantly:
                annotations.append(
                    dict(x=leader['overall_score'], y=len(ranked) - 1,
                         text="p<0.05*", showarrow=True, arrowhead=2,
                         arrowcolor='red', arrowsize=1, arrowwidth=2)
                )

        chart_title = {
            'text': 'RAG Strategy Performance Ranking<br><sub>Error bars show standard deviation • * indicates statistical significance</sub>',
            'x': 0.5,
            'font': {'size': 20}
        }
        fig.update_layout(
            title=chart_title,
            xaxis_title='Overall Performance Score',
            yaxis_title='Chunking Strategy',
            height=max(500, len(df) * 60),
            width=1000,
            margin=dict(l=200, r=150, t=120, b=80),
            plot_bgcolor='white',
            font=dict(size=14),
            annotations=annotations
        )

        fig.update_xaxes(showgrid=True, gridcolor='lightgray')
        return save_chart(fig, "01_performance_ranking")
    
    # 2. Chunk Distribution with Performance Correlation
    def create_chunk_analysis():
        """Render the chunk-count vs. chunk-size bubble chart and return the saved file path.

        Each strategy is one bubble whose size and colour encode its overall
        score. The subtitle reports the correlation (``Series.corr``) of
        chunk count and of average chunk size with the overall score.
        """
        # Calculate correlation
        chunk_perf_corr = df['chunk_count'].corr(df['overall_score'])
        size_perf_corr = df['avg_chunk_size'].corr(df['overall_score'])

        fig = go.Figure(go.Scatter(
            x=df['chunk_count'],
            y=df['avg_chunk_size'],
            mode='markers+text',
            text=df['strategy_name'],
            textposition="top center",
            marker=dict(
                size=df['overall_score'] * 40,
                color=df['overall_score'],
                colorscale='Viridis',
                showscale=True,
                colorbar=dict(
                    # ``title.side`` replaces the deprecated ``titleside``
                    # attribute, which newer Plotly releases reject.
                    title=dict(text="Performance Score", side="right"),
                    len=0.7
                ),
                line=dict(width=2, color='white'),
                opacity=0.8
            ),
            hovertemplate='<b>%{text}</b><br>' +
                         'Chunks: %{x}<br>' +
                         'Avg Size: %{y:.0f} chars<br>' +
                         'Score: %{marker.color:.3f}<extra></extra>'
        ))

        fig.update_layout(
            title={
                'text': f'Chunk Distribution vs Performance<br><sub>Bubble size = performance • Chunk-Perf correlation: {chunk_perf_corr:.3f} • Size-Perf correlation: {size_perf_corr:.3f}</sub>',
                'x': 0.5,
                'font': {'size': 20}
            },
            xaxis_title='Number of Chunks',
            yaxis_title='Average Chunk Size (characters)',
            height=700,
            width=1000,
            plot_bgcolor='white',
            font=dict(size=14)
        )

        fig.update_xaxes(showgrid=True, gridcolor='lightgray')
        fig.update_yaxes(showgrid=True, gridcolor='lightgray')

        return save_chart(fig, "02_chunk_distribution")
    
    # 3. RAGAS Metrics Radar Chart
    def create_metrics_radar():
        """Render one radar trace per row of ``df`` over the five RAGAS metrics.

        Returns the path of the saved chart. Trace colours cycle through a
        fixed six-colour list.
        """
        metrics = ['answer_relevancy', 'faithfulness', 'context_precision', 'context_recall', 'answer_correctness']
        axis_labels = [name.replace('_', ' ').title() for name in metrics]
        # Repeat the first axis at the end so each polygon is closed.
        closed_labels = axis_labels + [axis_labels[0]]

        palette = [colors[key] for key in ('primary', 'secondary', 'success', 'danger', 'warning', 'purple')]

        fig = go.Figure()

        for position, (_, row) in enumerate(df.iterrows()):
            radial = [row[name] for name in metrics]
            radial.append(radial[0])  # Close the radar chart

            trace = go.Scatterpolar(
                r=radial,
                theta=closed_labels,
                fill='toself',
                name=row['strategy_name'],
                line_color=palette[position % len(palette)],
                opacity=0.6
            )
            fig.add_trace(trace)

        radial_axis = dict(visible=True, range=[0, 1], tickformat='.2f')
        legend_layout = dict(
            orientation="v",
            yanchor="middle",
            y=0.5,
            xanchor="left",
            x=1.05
        )
        fig.update_layout(
            polar=dict(radialaxis=radial_axis),
            title={
                'text': 'RAGAS Metrics Comparison<br><sub>All metrics normalized to 0-1 scale</sub>',
                'x': 0.5,
                'font': {'size': 20}
            },
            height=700,
            width=1000,
            font=dict(size=14),
            legend=legend_layout
        )

        return save_chart(fig, "03_metrics_radar")
    
    # 4. Performance vs Chunk Size with Trend Analysis
    def create_size_performance():
        """Plot overall score against average chunk size, with a linear trend.

        Every row of ``df`` becomes one labelled marker. When ``df`` has more
        than two rows, a least-squares line is fitted to the finite
        (size, score) pairs and overlaid, with its R² shown in the legend.
        The trend is skipped when fewer than three finite pairs remain or when
        all chunk sizes are identical, because a slope cannot be estimated.

        Returns:
            Whatever ``save_chart`` returns for the "04_size_vs_performance"
            figure.
        """
        fig = go.Figure(go.Scatter(
            x=df['avg_chunk_size'],
            y=df['overall_score'],
            mode='markers+text',
            text=df['strategy_name'],
            textposition="top center",
            marker=dict(
                size=20,
                color=colors['secondary'],
                line=dict(width=2, color='white')
            ),
            hovertemplate='<b>%{text}</b><br>' +
                         'Chunk Size: %{x:.0f} chars<br>' +
                         'Performance: %{y:.3f}<extra></extra>'
        ))

        # Add a linear trendline (no confidence band is drawn)
        if len(df) > 2:
            sizes = df['avg_chunk_size'].to_numpy(dtype=float)
            scores = df['overall_score'].to_numpy(dtype=float)

            # Missing values would break the least-squares fit, so fit only on
            # pairs where both the size and the score are finite.
            usable = np.isfinite(sizes) & np.isfinite(scores)
            sizes, scores = sizes[usable], scores[usable]

            # Identical sizes give no spread along x, hence no slope to fit.
            if len(sizes) > 2 and sizes.min() < sizes.max():
                trend = np.poly1d(np.polyfit(sizes, scores, 1))

                x_trend = np.linspace(sizes.min(), sizes.max(), 100)
                y_trend = trend(x_trend)

                # R² = 1 - SS_res / SS_tot; undefined when every score is the
                # same (SS_tot == 0), so avoid the 0/0 division in that case.
                ss_tot = np.sum((scores - scores.mean()) ** 2)
                ss_res = np.sum((scores - trend(sizes)) ** 2)
                if ss_tot > 0:
                    trend_label = f'Trend (R²={1 - (ss_res / ss_tot):.3f})'
                else:
                    trend_label = 'Trend (R²=n/a)'

                fig.add_trace(go.Scatter(
                    x=x_trend,
                    y=y_trend,
                    mode='lines',
                    name=trend_label,
                    line=dict(color=colors['danger'], dash='dash', width=3),
                    hovertemplate='Trendline<extra></extra>'
                ))

        fig.update_layout(
            title={
                'text': 'Chunk Size vs Performance<br><sub>Identifying optimal chunk size ranges</sub>',
                'x': 0.5,
                'font': {'size': 20}
            },
            xaxis_title='Average Chunk Size (characters)',
            yaxis_title='Overall Performance Score',
            height=700,
            width=1000,
            plot_bgcolor='white',
            font=dict(size=14),
            showlegend=True
        )

        fig.update_xaxes(showgrid=True, gridcolor='lightgray')
        fig.update_yaxes(showgrid=True, gridcolor='lightgray')

        return save_chart(fig, "04_size_vs_performance")
    
    # 5. Processing Time Analysis
    def create_time_analysis():
        """Draw grouped horizontal bars of retrieval and generation time.

        Strategies are ordered by ascending total time (retrieval plus
        generation). The total is used only for ordering; the bars themselves
        show the two components side by side.

        Returns:
            Whatever ``save_chart`` returns for the "05_processing_time"
            figure.
        """
        # Order by the combined time so the ranking reflects the whole
        # pipeline rather than the retrieval step alone. ``assign`` returns a
        # new frame, so the shared ``df`` is left unmodified.
        df_sorted = df.assign(
            total_time=df['retrieval_time'] + df['generation_time']
        ).sort_values('total_time', ascending=True)

        fig = go.Figure()

        # (column, legend name, bar colour, label used in the hover text)
        series = (
            ('retrieval_time', 'Retrieval Time', colors['info'], 'Retrieval'),
            ('generation_time', 'Generation Time', colors['warning'], 'Generation'),
        )
        for column, legend_name, bar_color, hover_label in series:
            fig.add_trace(go.Bar(
                y=df_sorted['strategy_name'],
                x=df_sorted[column],
                orientation='h',
                name=legend_name,
                marker_color=bar_color,
                text=[f"{t:.2f}s" for t in df_sorted[column]],
                textposition='inside',
                # Plain concatenation: the %{...} placeholders belong to
                # Plotly and must not be interpreted by an f-string.
                hovertemplate='<b>%{y}</b><br>' + hover_label + ': %{x:.3f}s<extra></extra>'
            ))

        fig.update_layout(
            title={
                'text': 'Processing Time Breakdown<br><sub>Lower times indicate faster processing</sub>',
                'x': 0.5,
                'font': {'size': 20}
            },
            xaxis_title='Time (seconds)',
            yaxis_title='Chunking Strategy',
            # Grow the figure with the number of strategies, never below 500px.
            height=max(500, len(df) * 70),
            width=1000,
            margin=dict(l=200, r=150, t=120, b=80),
            plot_bgcolor='white',
            font=dict(size=14),
            barmode='group',
            legend=dict(
                orientation="h",
                yanchor="bottom",
                y=1.02,
                xanchor="right",
                x=1
            )
        )

        fig.update_xaxes(showgrid=True, gridcolor='lightgray')

        return save_chart(fig, "05_processing_time")
    
    # 6. Cost Analysis (if cost data available)
    def create_cost_analysis():
        """Plot cost per query against overall score with a Pareto frontier.

        Every row of ``df`` is drawn as a labelled marker. The frontier links
        the strategies that no cheaper-or-equal strategy matches or beats on
        score; it is drawn only when it contains more than one point.

        Returns:
            ``None`` when ``df`` has no ``cost_per_query`` column, otherwise
            whatever ``save_chart`` returns for the "06_cost_analysis" figure.
        """
        if 'cost_per_query' not in df.columns:
            return None

        fig = go.Figure()

        # Cost per query
        fig.add_trace(go.Scatter(
            x=df['cost_per_query'],
            y=df['overall_score'],
            mode='markers+text',
            text=df['strategy_name'],
            textposition="top center",
            name='Cost vs Performance',
            marker=dict(
                size=20,
                color=colors['success'],
                line=dict(width=2, color='white')
            ),
            hovertemplate='<b>%{text}</b><br>' +
                         'Cost per Query: $%{x:.4f}<br>' +
                         'Performance: %{y:.3f}<extra></extra>'
        ))

        # Add Pareto frontier (efficiency frontier).
        # Rows lacking a cost or a score cannot be placed on the frontier.
        # Within equal cost the best score comes first, so a dominated
        # strategy at the same price is never picked up.
        ranked = df.dropna(subset=['cost_per_query', 'overall_score']).sort_values(
            ['cost_per_query', 'overall_score'], ascending=[True, False]
        )

        frontier_positions = []
        # Seed below any real score so the cheapest strategy always qualifies,
        # even if its score is 0.
        best_score = float('-inf')
        for position, score in enumerate(ranked['overall_score'].to_numpy()):
            if score > best_score:
                frontier_positions.append(position)
                best_score = score

        if len(frontier_positions) > 1:
            pareto_df = ranked.iloc[frontier_positions]
            fig.add_trace(go.Scatter(
                x=pareto_df['cost_per_query'],
                y=pareto_df['overall_score'],
                mode='lines',
                name='Efficiency Frontier',
                line=dict(color=colors['danger'], dash='dot', width=3),
                hovertemplate='Pareto Frontier<extra></extra>'
            ))

        fig.update_layout(
            title={
                'text': 'Cost vs Performance Analysis<br><sub>Pareto frontier shows most efficient strategies</sub>',
                'x': 0.5,
                'font': {'size': 20}
            },
            xaxis_title='Cost per Query ($)',
            yaxis_title='Overall Performance Score',
            height=700,
            width=1000,
            plot_bgcolor='white',
            font=dict(size=14),
            showlegend=True
        )

        fig.update_xaxes(showgrid=True, gridcolor='lightgray')
        fig.update_yaxes(showgrid=True, gridcolor='lightgray')

        return save_chart(fig, "06_cost_analysis")
    
    # 7. Statistical Significance Summary
    def create_significance_chart():
        """Render a strategy-by-strategy heatmap of significance test outcomes.

        Cell values: 1 when the pair's entry in ``sig_tests`` has a truthy
        ``'significant'`` field, 0.5 when the entry exists but is not
        significant, and 0 when no entry exists (including the diagonal).
        Entries are looked up under "A vs B" first, then "B vs A".

        Returns:
            ``None`` when ``sig_tests`` is empty or missing, otherwise whatever
            ``save_chart`` returns for the "07_significance_matrix" figure.
        """
        if not sig_tests:
            return None

        # Create significance matrix
        names = df['strategy_name'].unique()
        size = len(names)
        matrix = np.zeros((size, size))

        for row_idx, row_name in enumerate(names):
            for col_idx, col_name in enumerate(names):
                if row_idx == col_idx:
                    continue
                # Try the pair in both orientations; the forward key wins.
                for key in (f"{row_name} vs {col_name}", f"{col_name} vs {row_name}"):
                    if key in sig_tests:
                        matrix[row_idx, col_idx] = 1 if sig_tests[key]['significant'] else 0.5
                        break

        legend_bar = {
            'title': "Significance",
            'tickvals': [0, 0.5, 1],
            'ticktext': ['No Test', 'Not Significant', 'Significant (p<0.05)'],
        }
        heatmap = go.Heatmap(
            z=matrix,
            x=names,
            y=names,
            colorscale=[[0, 'white'], [0.5, 'yellow'], [1, 'red']],
            showscale=True,
            colorbar=legend_bar,
            hovertemplate='<b>%{y} vs %{x}</b><br>Significance: %{z}<extra></extra>',
        )
        fig = go.Figure(data=heatmap)

        chart_title = {
            'text': 'Statistical Significance Matrix<br><sub>Red indicates significant performance differences (p<0.05)</sub>',
            'x': 0.5,
            'font': {'size': 20},
        }
        fig.update_layout(
            title=chart_title,
            xaxis_title='Strategy',
            yaxis_title='Strategy',
            height=600,
            width=1000,
            font={'size': 14},
        )

        return save_chart(fig, "07_significance_matrix")
    
    # Create all charts
    try:
        charts_created.append(create_performance_chart())
        charts_created.append(create_chunk_analysis())
        charts_created.append(create_metrics_radar())
        charts_created.append(create_size_performance())
        charts_created.append(create_time_analysis())
        
        cost_chart = create_cost_analysis()
        if cost_chart:
            charts_created.append(cost_chart)
            
        sig_chart = create_significance_chart()
        if sig_chart:
            charts_created.append(sig_chart)
        
        # Filter out None values
        charts_created = [chart for chart in charts_created if chart is not None]
        
        print(f"Created {len(charts_created)} {format.upper()} charts in '{output_dir}/':")
        for chart in charts_created:
            print(f"  - {os.path.basename(chart)}")
            
        # Print statistical summary
        if sig_tests:
            print(f"\nStatistical Significance Summary:")
            significant_pairs = sum(1 for test in sig_tests.values() if test['significant'])
            print(f"  - {significant_pairs}/{len(sig_tests)} comparisons show significant differences")
            
        return charts_created
        
    except Exception as e:
        print(f"Error creating charts: {e}")
        return None

# Usage examples:
# HTML format (recommended):
# charts = create_individual_charts("/path/to/results.json", format='html')

# PNG format (requires the `kaleido` package for static image export):
# charts = create_individual_charts("/path/to/results.json", format='png')

# With cost analysis:
# cost_config = {'input': 0.00001, 'output': 0.00003}
# charts = create_individual_charts("/path/to/results.json", 
#                                 cost_per_token=cost_config, format='html')

In [12]:
 charts = create_individual_charts("/kaggle/input/rag-evaluation-restults-json/evaluation_results.json")

Created 6 HTML charts in 'rag_charts/':
  - 01_performance_ranking.html
  - 02_chunk_distribution.html
  - 03_metrics_radar.html
  - 04_size_vs_performance.html
  - 05_processing_time.html
  - 07_significance_matrix.html

Statistical Significance Summary:
  - 0/6 comparisons show significant differences
