In [1]:
import io
import os
import zipfile
import requests
import frontmatter
import logging
import asyncio
from tqdm import tqdm
from minsearch import Index
from typing import List, Any, Dict, Tuple, Optional
from dotenv import load_dotenv
from openai import OpenAI
from pydantic_ai import Agent
import json
import secrets
from pathlib import Path
from datetime import datetime
import numpy as np
from collections import defaultdict, Counter
from sentence_transformers import SentenceTransformer
from pydantic_ai.messages import ModelMessagesTypeAdapter

# Load environment variables
load_dotenv()

True

In [2]:
def read_repo_data(repo_owner, repo_name, branch="main"):
    """
    Download and parse all markdown files from a GitHub repository.
    Yields one document (dict) at a time to avoid loading everything into memory.

    Args:
        repo_owner (str): GitHub username or organization
        repo_name (str): Repository name
        branch (str): Branch name (default: main)
    """
    url = f"https://codeload.github.com/{repo_owner}/{repo_name}/zip/refs/heads/{branch}"
    resp = requests.get(url)

    if resp.status_code == 404 and branch == "main":
        # Try fallback to master
        return read_repo_data(repo_owner, repo_name, branch="master")

    if resp.status_code != 200:
        raise Exception(f"Failed to download repository: HTTP {resp.status_code}")

    with zipfile.ZipFile(io.BytesIO(resp.content)) as zf:
        for file_info in zf.infolist():
            filename = file_info.filename
            if not filename.lower().endswith((".md", ".mdx")):
                continue
            try:
                with zf.open(file_info) as f_in:
                    content = f_in.read().decode("utf-8", errors="replace")
                    post = frontmatter.loads(content)
                    data = post.to_dict()
                    data.update({
                        "filename": filename,
                        "repo": repo_name,
                        "owner": repo_owner,
                        "branch": branch
                    })
                    yield data
            except Exception as e:
                logging.warning("Error processing %s: %s", filename, e)
                continue

In [3]:
def sliding_window(seq, size, step):
    """Yield overlapping chunks from a long string."""
    if size <= 0 or step <= 0:
        raise ValueError("size and step must be positive")
    n = len(seq)
    for i in range(0, n, step):
        yield {"start": i, "chunk": seq[i:i+size]}
        if i + size >= n:
            break

In [4]:
evidently_chunks = []

for doc in tqdm(read_repo_data("evidentlyai", "docs"), desc="Processing files"):
    doc_copy = doc.copy()
    content = doc_copy.pop("content", "")
    for chunk in sliding_window(content, size=2000, step=1000):
        chunk.update(doc_copy)
        evidently_chunks.append(chunk)

print(f"Collected {len(evidently_chunks)} chunks. Building index...")

# Build text search index
index = Index(
    text_fields=["chunk", "title", "description", "filename"],
    keyword_fields=[]
)
index.fit(evidently_chunks)

print("Text indexing complete!")

Processing files: 95it [00:00, 115.38it/s]

Collected 575 chunks. Building index...
Text indexing complete!





In [5]:
class VectorSearch:
    """
    Simple vector search implementation using cosine similarity.
    """
    def __init__(self):
        self.embeddings = None
        self.documents = None
    
    def fit(self, embeddings: np.ndarray, documents: List[Dict]):
        """
        Store embeddings and associated documents.
        """
        self.embeddings = embeddings / np.linalg.norm(embeddings, axis=1, keepdims=True)  # Normalize
        self.documents = documents
    
    def search(self, query_embedding: np.ndarray, num_results: int = 5) -> List[Dict]:
        """
        Search for most similar documents using cosine similarity.
        """
        if self.embeddings is None:
            return []
        
        # Normalize query embedding
        query_norm = query_embedding / np.linalg.norm(query_embedding)
        
        # Calculate cosine similarities
        similarities = np.dot(self.embeddings, query_norm)
        
        # Get top k indices
        top_indices = np.argsort(similarities)[-num_results:][::-1]
        
        # Return documents with similarity scores
        results = []
        for idx in top_indices:
            doc = self.documents[idx].copy()
            doc['similarity_score'] = float(similarities[idx])
            results.append(doc)
        
        return results

In [6]:
print("Creating embeddings for semantic search...")

# Initialize an empty list to store embeddings for each chunk
evidently_embeddings = []

# Load a pre-trained sentence transformer model for creating embeddings
print("Loading embedding model...")
embedding_model = SentenceTransformer('multi-qa-distilbert-cos-v1')

# Loop through each document chunk in evidently_chunks
print("Encoding document chunks...")
for d in tqdm(evidently_chunks, desc="Creating embeddings"):
    # Create a combined text for better context
    text_to_encode = d['chunk']
    if 'title' in d and d['title']:
        text_to_encode = f"{d['title']}. {text_to_encode}"
    if 'filename' in d:
        # Extract meaningful parts from filename
        filename_parts = d['filename'].replace('/', ' ').replace('_', ' ').replace('.mdx', '').replace('.md', '')
        text_to_encode = f"{filename_parts}. {text_to_encode}"
    
    # Encode the enhanced text into a vector (embedding)
    v = embedding_model.encode(text_to_encode, show_progress_bar=False)
    
    # Append the embedding to the list
    evidently_embeddings.append(v)

# Convert the list of embeddings into a NumPy array
evidently_embeddings = np.array(evidently_embeddings)

# Initialize and fit vector search index
evidently_vindex = VectorSearch()
evidently_vindex.fit(evidently_embeddings, evidently_chunks)

print("Vector indexing complete!")


Creating embeddings for semantic search...
Loading embedding model...
Encoding document chunks...


Creating embeddings: 100%|████████████████████████████████████████| 575/575 [05:16<00:00,  1.82it/s]

Vector indexing complete!





In [7]:
openai_client = OpenAI(
    base_url="https://openrouter.ai/api/v1",
    api_key=os.getenv("OPENROUTER_API_KEY")
)

# Test the connection
try:
    test_response = openai_client.chat.completions.create(
        model="deepseek/deepseek-r1:free",
        messages=[{"role": "user", "content": "Say 'Connection successful' if you can read this."}],
        max_tokens=50
    )
    print(f"OpenRouter Connection Test: {test_response.choices[0].message.content}")
except Exception as e:
    print(f"Connection Error: {e}")
    print("Make sure you have OPENROUTER_API_KEY in your .env file")


Connection Error: Error code: 429 - {'error': {'message': 'Provider returned error', 'code': 429, 'metadata': {'raw': 'deepseek/deepseek-r1:free is temporarily rate-limited upstream. Please retry shortly, or add your own key to accumulate your rate limits: https://openrouter.ai/settings/integrations', 'provider_name': 'Chutes'}}, 'user_id': 'user_33P07DTlgylC1rY1SOCrqh6snaU'}
Make sure you have OPENROUTER_API_KEY in your .env file


In [8]:
def text_search(query: str) -> List[Any]:
    """
    Perform a text-based search on the FAQ index.
    """
    results = index.search(query, num_results=5)
    print(f"Text search found {len(results)} results for query: '{query}'")
    return results

def vector_search(query: str) -> List[Any]:
    """
    Perform semantic vector-based search.
    """
    # Encode the query into a vector using the embedding model
    q = embedding_model.encode(query)
    # Search the vector index for the top 5 most similar chunks
    results = evidently_vindex.search(q, num_results=5)
    print(f"Vector search found {len(results)} results for query: '{query}'")
    return results

def hybrid_search(query: str, alpha: float = 0.5, num_results: int = 5) -> List[Any]:
    """
    Perform hybrid search combining text and vector search.
    
    Args:
        query: Search query
        alpha: Weight for text search (1-alpha for vector search)
        num_results: Number of results to return
    """
    # Get results from both search methods
    text_results = index.search(query, num_results=num_results*2)
    
    q_embedding = embedding_model.encode(query)
    vector_results = evidently_vindex.search(q_embedding, num_results=num_results*2)
    
    # Create a scoring dictionary
    doc_scores = {}
    
    # Add text search scores
    for i, doc in enumerate(text_results):
        doc_key = doc.get('filename', '') + str(doc.get('start', 0))
        # Use inverse rank as score
        text_score = 1.0 / (i + 1)
        doc_scores[doc_key] = {
            'doc': doc,
            'text_score': text_score * alpha,
            'vector_score': 0
        }
    
    # Add vector search scores
    for doc in vector_results:
        doc_key = doc.get('filename', '') + str(doc.get('start', 0))
        vector_score = doc.get('similarity_score', 0) * (1 - alpha)
        
        if doc_key in doc_scores:
            doc_scores[doc_key]['vector_score'] = vector_score
        else:
            doc_scores[doc_key] = {
                'doc': doc,
                'text_score': 0,
                'vector_score': vector_score
            }
    
    # Calculate combined scores
    for key in doc_scores:
        doc_scores[key]['combined_score'] = doc_scores[key]['text_score'] + doc_scores[key]['vector_score']
    
    # Sort by combined score and return top results
    sorted_docs = sorted(doc_scores.values(), key=lambda x: x['combined_score'], reverse=True)
    results = [item['doc'] for item in sorted_docs[:num_results]]
    
    print(f"Hybrid search found {len(results)} results for query: '{query}'")
    return results

# Test all search methods
test_query = "test dataset"
print("\n Testing search methods:")
text_results = text_search(test_query)
vector_results = vector_search(test_query)
hybrid_results = hybrid_search(test_query)



 Testing search methods:
Text search found 5 results for query: 'test dataset'
Vector search found 5 results for query: 'test dataset'
Hybrid search found 5 results for query: 'test dataset'


In [9]:
def answer_question_manual(question: str, search_method: str = 'hybrid') -> str:
    """
    Answer a question by searching and then using the LLM.
    
    Args:
        question: The question to answer
        search_method: 'text', 'vector', or 'hybrid'
    """
    # Select search method
    if search_method == 'text':
        search_results = text_search(question)
    elif search_method == 'vector':
        search_results = vector_search(question)
    else:  # hybrid
        search_results = hybrid_search(question)

    # Format the search results as context
    context = "\n\n---\n\n".join([
        f"Result {i+1} (from {result.get('filename', 'unknown')}):\n{result.get('chunk', '')}"
        for i, result in enumerate(search_results)
    ])

    # Create the prompt with the context
    prompt = f"""You are an expert assistant that answers questions about the Evidently project 
(https://github.com/evidentlyai/evidently) using ONLY the information provided in the context below.

Context from Evidently documentation:
{context}

User question: {question}

Instructions:
- Answer based ONLY on the provided context
- Be concise and clear
- If the answer is not in the context, say "I could not find this information in the Evidently documentation"
- Do not invent features or functionality

Answer:"""

    try:
        response = openai_client.chat.completions.create(
            model="deepseek/deepseek-r1:free",
            messages=[
                {"role": "system", "content": "You are a helpful assistant that answers questions about Evidently."},
                {"role": "user", "content": prompt}
            ],
            temperature=0.3,
            max_tokens=1000
        )
        return response.choices[0].message.content
    except Exception as e:
        return f"Error getting answer: {e}"

# Test question answering
question = "What components are required in a test dataset to evaluate AI?"
print(f"\n❓ Question: {question}")
print("Thinking (using hybrid search + LLM approach)...")
answer = answer_question_manual(question, search_method='hybrid')
print(f"\n Answer:\n{answer}")



❓ Question: What components are required in a test dataset to evaluate AI?
Thinking (using hybrid search + LLM approach)...
Hybrid search found 5 results for query: 'What components are required in a test dataset to evaluate AI?'

 Answer:
To evaluate AI systems like a RAG using Evidently, a test dataset should include:
- **User-like questions** generated from your knowledge base
- Corresponding **ground truth answers** extracted from the data source
- (Optional) **Context snippets** used to generate the answers

These components are automatically created when generating RAG test datasets via Evidently UI by uploading your knowledge base files. The system produces question-answer pairs that reflect what your AI *should* know, serving as validation benchmarks.


In [10]:
LOG_DIR = Path(os.getenv('LOGS_DIRECTORY', 'logs'))
LOG_DIR.mkdir(exist_ok=True)

def log_entry(agent, messages, source="user"):
    tools = []
    for ts in agent.toolsets:
        tools.extend(ts.tools.keys())
    dict_messages = ModelMessagesTypeAdapter.dump_python(messages)
    return {
        "agent_name": agent.name,
        "system_prompt": agent._instructions,
        "provider": agent.model.system,
        "model": agent.model.model_name,
        "tools": tools,
        "messages": dict_messages,
        "source": source
    }

def serializer(obj):
    if isinstance(obj, datetime):
        return obj.isoformat()
    raise TypeError(f"Type {type(obj)} not serializable")

def log_interaction_to_file(agent, messages, source='user'):
    entry = log_entry(agent, messages, source)
    ts = entry['messages'][-1]['timestamp']
    ts_str = ts.strftime("%Y%m%d_%H%M%S")
    rand_hex = secrets.token_hex(3)
    filename = f"{agent.name}_{ts_str}_{rand_hex}.json"
    filepath = LOG_DIR / filename
    with filepath.open("w", encoding="utf-8") as f_out:
        json.dump(entry, f_out, indent=2, default=serializer)
    return filepath


In [11]:
def evaluate_search_quality(
    search_function, 
    test_queries: List[Tuple[str, List[str]]], 
    num_results: int = 5,
    log_results: bool = True
) -> Dict[str, Any]:
    """
    Evaluate search quality using hit rate and MRR metrics.
    """
    results = []
    timestamp = datetime.now()

    print(f"Starting evaluation with {len(test_queries)} test queries...")

    for idx, (query, expected_docs) in enumerate(test_queries, 1):
        print(f"  Query {idx}/{len(test_queries)}: '{query[:50]}...'")

        try:
            # Execute search
            if hasattr(search_function, '__self__'):  # Method
                search_results = search_function(query, num_results=num_results)
            else:  # Function
                search_results = search_function(query)[:num_results]

            # Extract filenames from results
            retrieved_docs = [doc.get('filename', '') for doc in search_results]

            # Calculate hit rate (binary: found at least one relevant doc)
            relevant_found = any(doc in expected_docs for doc in retrieved_docs)

            # Calculate MRR (Mean Reciprocal Rank)
            mrr = 0
            first_relevant_rank = None
            for i, doc in enumerate(retrieved_docs):
                if doc in expected_docs:
                    mrr = 1 / (i + 1)
                    first_relevant_rank = i + 1
                    break

            # Calculate Precision@k
            relevant_in_results = sum(1 for doc in retrieved_docs if doc in expected_docs)
            precision_at_k = relevant_in_results / len(retrieved_docs) if retrieved_docs else 0

            # Store detailed result
            result = {
                'query': query,
                'expected_docs': expected_docs,
                'retrieved_docs': retrieved_docs,
                'hit': relevant_found,
                'mrr': mrr,
                'precision_at_k': precision_at_k,
                'first_relevant_rank': first_relevant_rank,
                'num_relevant_found': relevant_in_results
            }
            results.append(result)

        except Exception as e:
            print(f"Error processing query: {e}")
            results.append({
                'query': query,
                'expected_docs': expected_docs,
                'retrieved_docs': [],
                'hit': False,
                'mrr': 0,
                'precision_at_k': 0,
                'first_relevant_rank': None,
                'num_relevant_found': 0,
                'error': str(e)
            })

    # Calculate aggregate metrics
    hit_rate = sum(r['hit'] for r in results) / len(results) if results else 0
    avg_mrr = sum(r['mrr'] for r in results) / len(results) if results else 0
    avg_precision = sum(r['precision_at_k'] for r in results) / len(results) if results else 0

    evaluation_summary = {
        'timestamp': timestamp.isoformat(),
        'num_queries': len(test_queries),
        'num_results_per_query': num_results,
        'metrics': {
            'hit_rate': hit_rate,
            'mean_reciprocal_rank': avg_mrr,
            'mean_precision_at_k': avg_precision
        },
        'detailed_results': results
    }

    # Log results if requested
    if log_results:
        log_evaluation_results(evaluation_summary)

    return evaluation_summary

def log_evaluation_results(evaluation_data: Dict[str, Any]) -> Path:
    """
    Log evaluation results to a JSON file.
    """
    ts = datetime.fromisoformat(evaluation_data['timestamp'])
    ts_str = ts.strftime("%Y%m%d_%H%M%S")
    rand_hex = secrets.token_hex(3)
    filename = f"search_evaluation_{ts_str}_{rand_hex}.json"
    filepath = LOG_DIR / filename
    with filepath.open("w", encoding="utf-8") as f_out:
        json.dump(evaluation_data, f_out, indent=2, default=str)
    print(f"📝 Evaluation results saved to: {filepath}")
    return filepath

def print_evaluation_report(evaluation_summary: Dict[str, Any]):
    """
    Print a formatted evaluation report.
    """
    print("\n" + "="*60)
    print("📊 EVALUATION REPORT")
    print("="*60)
    print(f"Timestamp: {evaluation_summary['timestamp']}")
    print(f"Number of queries: {evaluation_summary['num_queries']}")
    print(f"Results per query: {evaluation_summary['num_results_per_query']}")

    print("\n📈 AGGREGATE METRICS:")
    metrics = evaluation_summary['metrics']
    print(f"  • Hit Rate: {metrics['hit_rate']:.2%}")
    print(f"  • Mean Reciprocal Rank (MRR): {metrics['mean_reciprocal_rank']:.3f}")
    print(f"  • Mean Precision@{evaluation_summary['num_results_per_query']}: {metrics['mean_precision_at_k']:.2%}")

    print("\n🔍 QUERY-LEVEL RESULTS:")
    for i, result in enumerate(evaluation_summary['detailed_results'][:5], 1):
        print(f"\n  Query {i}: \"{result['query'][:50]}...\"")
        print(f"    • Hit: {'✅' if result['hit'] else '❌'}")
        print(f"    • MRR: {result['mrr']:.3f}")
        print(f"    • Precision: {result['precision_at_k']:.2%}")
        if result['first_relevant_rank']:
            print(f"    • First relevant at rank: {result['first_relevant_rank']}")

    if len(evaluation_summary['detailed_results']) > 5:
        print(f"\n  ... and {len(evaluation_summary['detailed_results']) - 5} more queries")

    print("\n" + "="*60)

In [12]:
test_queries = [
    # Testing and evaluation
    ("What components are required in a test dataset to evaluate AI?", 
     ["docs-main/docs/library/tests.mdx", 
      "docs-main/examples/LLM_regression_testing.mdx"]),

    ("How to run evaluations in Evidently?", 
     ["docs-main/docs/library/evaluations_overview.mdx",
      "docs-main/docs/library/tests.mdx"]),

    # Data and metrics
    ("Understanding data definition and descriptors",
     ["docs-main/docs/library/data_definition.mdx",
      "docs-main/docs/library/descriptors.mdx"])
]

In [13]:
def compare_search_methods(test_queries):
    """
    Compare text, vector, and hybrid search performance.
    """
    print("\n" + "="*60)
    print(" COMPARING SEARCH METHODS")
    print("="*60)
    
    methods = {
        'text': lambda q: text_search(q),
        'vector': lambda q: vector_search(q),
        'hybrid': lambda q: hybrid_search(q, alpha=0.5)
    }
    
    comparison_results = {}
    
    for method_name, search_fn in methods.items():
        print(f"\n📊 Evaluating {method_name.upper()} search...")
        
        eval_results = evaluate_search_quality(
            search_function=search_fn,
            test_queries=test_queries,
            num_results=5,
            log_results=False  # Don't log intermediate results
        )
        
        metrics = eval_results['metrics']
        comparison_results[method_name] = {
            'hit_rate': metrics['hit_rate'],
            'mrr': metrics['mean_reciprocal_rank'],
            'precision': metrics['mean_precision_at_k'],
            'detailed_results': eval_results['detailed_results']
        }
        
        print(f"  • Hit Rate: {metrics['hit_rate']:.2%}")
        print(f"  • MRR: {metrics['mean_reciprocal_rank']:.3f}")
        print(f"  • Precision@5: {metrics['mean_precision_at_k']:.2%}")
    
    # Find best method for each metric
    print("\n BEST METHODS:")
    print(f"  • Best Hit Rate: {max(comparison_results.items(), key=lambda x: x[1]['hit_rate'])[0].upper()}")
    print(f"  • Best MRR: {max(comparison_results.items(), key=lambda x: x[1]['mrr'])[0].upper()}")
    print(f"  • Best Precision: {max(comparison_results.items(), key=lambda x: x[1]['precision'])[0].upper()}")
    
    # Show improvement from text to hybrid
    if 'text' in comparison_results and 'hybrid' in comparison_results:
        hit_rate_improvement = comparison_results['hybrid']['hit_rate'] - comparison_results['text']['hit_rate']
        mrr_improvement = comparison_results['hybrid']['mrr'] - comparison_results['text']['mrr']
        
        print(f"\n📈 Hybrid vs Text Search Improvement:")
        print(f"  • Hit Rate: {hit_rate_improvement:+.1%}")
        print(f"  • MRR: {mrr_improvement:+.3f}")
    
    return comparison_results

# Run comparison
comparison = compare_search_methods(test_queries)




 COMPARING SEARCH METHODS

📊 Evaluating TEXT search...
Starting evaluation with 3 test queries...
  Query 1/3: 'What components are required in a test dataset to ...'
Text search found 5 results for query: 'What components are required in a test dataset to evaluate AI?'
  Query 2/3: 'How to run evaluations in Evidently?...'
Text search found 5 results for query: 'How to run evaluations in Evidently?'
  Query 3/3: 'Understanding data definition and descriptors...'
Text search found 5 results for query: 'Understanding data definition and descriptors'
  • Hit Rate: 33.33%
  • MRR: 0.333
  • Precision@5: 33.33%

📊 Evaluating VECTOR search...
Starting evaluation with 3 test queries...
  Query 1/3: 'What components are required in a test dataset to ...'
Vector search found 5 results for query: 'What components are required in a test dataset to evaluate AI?'
  Query 2/3: 'How to run evaluations in Evidently?...'
Vector search found 5 results for query: 'How to run evaluations in Evidently?'


In [14]:

def optimize_hybrid_alpha(test_queries, alphas=[0.0, 0.25, 0.5, 0.75, 1.0]):
    """
    Find the optimal alpha value for hybrid search.
    """
    print("\n" + "="*60)
    print(" OPTIMIZING HYBRID SEARCH ALPHA")
    print("="*60)
    
    results = []
    
    for alpha in alphas:
        print(f"\n🔧 Testing alpha={alpha:.2f}")
        
        hybrid_fn = lambda q: hybrid_search(q, alpha=alpha)
        
        eval_results = evaluate_search_quality(
            search_function=hybrid_fn,
            test_queries=test_queries,
            num_results=5,
            log_results=False
        )
        
        metrics = eval_results['metrics']
        results.append({
            'alpha': alpha,
            'hit_rate': metrics['hit_rate'],
            'mrr': metrics['mean_reciprocal_rank'],
            'precision': metrics['mean_precision_at_k']
        })
        
        print(f"  • Hit Rate: {metrics['hit_rate']:.2%}")
        print(f"  • MRR: {metrics['mean_reciprocal_rank']:.3f}")
    
    # Find optimal alpha
    best_by_mrr = max(results, key=lambda x: x['mrr'])
    best_by_hit_rate = max(results, key=lambda x: x['hit_rate'])
    
    print(f"\n🎯 OPTIMAL ALPHA VALUES:")
    print(f"  • Best for MRR: α={best_by_mrr['alpha']:.2f} (MRR={best_by_mrr['mrr']:.3f})")
    print(f"  • Best for Hit Rate: α={best_by_hit_rate['alpha']:.2f} (Hit Rate={best_by_hit_rate['hit_rate']:.2%})")
    
    return results

# Optimize alpha
alpha_results = optimize_hybrid_alpha(test_queries)



 OPTIMIZING HYBRID SEARCH ALPHA

🔧 Testing alpha=0.00
Starting evaluation with 3 test queries...
  Query 1/3: 'What components are required in a test dataset to ...'
Hybrid search found 5 results for query: 'What components are required in a test dataset to evaluate AI?'
  Query 2/3: 'How to run evaluations in Evidently?...'
Hybrid search found 5 results for query: 'How to run evaluations in Evidently?'
  Query 3/3: 'Understanding data definition and descriptors...'
Hybrid search found 5 results for query: 'Understanding data definition and descriptors'
  • Hit Rate: 100.00%
  • MRR: 0.567

🔧 Testing alpha=0.25
Starting evaluation with 3 test queries...
  Query 1/3: 'What components are required in a test dataset to ...'
Hybrid search found 5 results for query: 'What components are required in a test dataset to evaluate AI?'
  Query 2/3: 'How to run evaluations in Evidently?...'
Hybrid search found 5 results for query: 'How to run evaluations in Evidently?'
  Query 3/3: 'Understanding

In [15]:
print("\n" + "="*60)
print("🚀 FINAL EVALUATION WITH OPTIMIZED HYBRID SEARCH")
print("="*60)

# Use the best alpha from optimization (you can adjust based on results)
best_alpha = 0.5  # Adjust based on optimization results

final_search = lambda q: hybrid_search(q, alpha=best_alpha)

final_evaluation = evaluate_search_quality(
    search_function=final_search,
    test_queries=test_queries,
    num_results=5,
    log_results=True
)

print_evaluation_report(final_evaluation)




🚀 FINAL EVALUATION WITH OPTIMIZED HYBRID SEARCH
Starting evaluation with 3 test queries...
  Query 1/3: 'What components are required in a test dataset to ...'
Hybrid search found 5 results for query: 'What components are required in a test dataset to evaluate AI?'
  Query 2/3: 'How to run evaluations in Evidently?...'
Hybrid search found 5 results for query: 'How to run evaluations in Evidently?'
  Query 3/3: 'Understanding data definition and descriptors...'
Hybrid search found 5 results for query: 'Understanding data definition and descriptors'
📝 Evaluation results saved to: logs/search_evaluation_20251003_051943_a2ff82.json

📊 EVALUATION REPORT
Timestamp: 2025-10-03T05:19:43.380922
Number of queries: 3
Results per query: 5

📈 AGGREGATE METRICS:
  • Hit Rate: 66.67%
  • Mean Reciprocal Rank (MRR): 0.500
  • Mean Precision@5: 33.33%

🔍 QUERY-LEVEL RESULTS:

  Query 1: "What components are required in a test dataset to ..."
    • Hit: ✅
    • MRR: 0.500
    • Precision: 40.00%
    • 

In [16]:

from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple, Any
import json
from datetime import datetime
import pandas as pd

@dataclass
class LLMEvaluationResult:
    """Store LLM-based evaluation results"""
    query: str
    response: str
    criteria: str
    score: float
    reasoning: str
    timestamp: str = None
    metadata: Dict[str, Any] = None
    
    def __post_init__(self):
        if self.timestamp is None:
            self.timestamp = datetime.now().isoformat()

class LLMEvaluator:
    """
    LLM-based evaluator for agent responses using OpenRouter/DeepSeek
    """
    
    def __init__(self, client=None, model="deepseek/deepseek-r1:free"):
        """
        Initialize with existing OpenRouter client
        
        Args:
            client: OpenAI client configured for OpenRouter
            model: Model to use for evaluation
        """
        self.client = client if client else openai_client  # Use existing client
        self.model = model
        self.evaluation_history = []
    
    def evaluate_response(self,
                         query: str,
                         response: str,
                         criteria: str = "overall_quality",
                         reference: Optional[str] = None,
                         context: Optional[str] = None,
                         max_retries: int = 3) -> LLMEvaluationResult:
        """
        Evaluate a single agent response using LLM with retry logic
        
        Args:
            query: The original user query
            response: The agent's response
            criteria: Evaluation criteria to use
            reference: Optional reference answer for comparison
            context: Optional context/documents used by agent
            max_retries: Maximum number of retry attempts for rate limits
        """
        
        prompt = self._build_evaluation_prompt(
            query, response, criteria, reference, context
        )
        
        import time
        
        for attempt in range(max_retries + 1):
            try:
                llm_response = self.client.chat.completions.create(
                    model=self.model,
                    messages=[
                        {"role": "system", "content": "You are an expert evaluator for AI agent responses. Provide scores from 1-5 with detailed reasoning."},
                        {"role": "user", "content": prompt}
                    ],
                    temperature=0.0,  # Deterministic for consistency
                    max_tokens=500
                )
                
                # Parse the response
                result_text = llm_response.choices[0].message.content
                parsed = self._parse_evaluation(result_text)
                
                eval_result = LLMEvaluationResult(
                    query=query,
                    response=response,
                    criteria=criteria,
                    score=parsed['score'],
                    reasoning=parsed['reasoning'],
                    metadata={'model_used': self.model, 'attempts': attempt + 1}
                )
                
                self.evaluation_history.append(eval_result)
                return eval_result
                
            except Exception as e:
                error_str = str(e)
                
                # Check if it's a rate limit error
                if '429' in error_str or 'rate limit' in error_str.lower():
                    if attempt < max_retries:
                        # Extract wait time if provided, otherwise use exponential backoff
                        wait_time = 2 ** (attempt + 1)  # 2, 4, 8 seconds
                        
                        # Try to extract reset time from error if available
                        if 'X-RateLimit-Reset' in error_str:
                            try:
                                import re
                                reset_match = re.search(r'X-RateLimit-Reset.*?(\d+)', error_str)
                                if reset_match:
                                    reset_time = int(reset_match.group(1)) / 1000  # Convert from ms
                                    current_time = time.time()
                                    wait_time = max(1, min(reset_time - current_time, 60))
                            except:
                                pass
                        
                        print(f"  ⏳ Rate limit hit. Waiting {wait_time:.1f}s before retry {attempt + 1}/{max_retries}...")
                        time.sleep(wait_time)
                        continue
                    else:
                        print(f"  ❌ Rate limit exceeded after {max_retries} retries")
                        return LLMEvaluationResult(
                            query=query,
                            response=response,
                            criteria=criteria,
                            score=3,  # Default neutral score
                            reasoning=f"Could not evaluate due to rate limits after {max_retries} retries"
                        )
                else:
                    # Other errors - don't retry
                    print(f"Evaluation error: {e}")
                    return LLMEvaluationResult(
                        query=query,
                        response=response,
                        criteria=criteria,
                        score=0,
                        reasoning=f"Error during evaluation: {str(e)}"
                    )
    
    def _build_evaluation_prompt(self, 
                                query: str, 
                                response: str,
                                criteria: str,
                                reference: Optional[str] = None,
                                context: Optional[str] = None) -> str:
        """Build evaluation prompt based on criteria"""
        
        base_prompt = f"""Evaluate the following AI agent response.

User Query: {query}

Agent Response: {response}

{f"Reference Answer: {reference}" if reference else ""}
{f"Context Used: {context[:500]}..." if context else ""}

Evaluation Criteria: {criteria}
"""
        
        criteria_instructions = {
            "overall_quality": """
Rate the overall quality considering:
1. Relevance - Does it address the query?
2. Accuracy - Is the information correct?
3. Completeness - Does it fully answer the question?
4. Clarity - Is it well-structured and clear?
5. Usefulness - Would this help the user?

Provide a score from 1-5 and explain your reasoning.
Format your response as:
SCORE: [1-5]
REASONING: [Your detailed explanation]""",
            
            "relevance": """
Rate how relevant the response is to the query (1-5).
1 = Completely off-topic
2 = Mostly irrelevant
3 = Partially relevant
4 = Mostly relevant
5 = Perfectly relevant

Format: SCORE: [1-5] REASONING: [explanation]""",
            
            "accuracy": """
Rate the factual accuracy (1-5).
1 = Contains major errors
2 = Several inaccuracies
3 = Mostly accurate
4 = Very accurate
5 = Completely accurate

Format: SCORE: [1-5] REASONING: [explanation]""",
            
            "coherence": """
Rate the coherence and clarity (1-5).
1 = Incoherent/confusing
2 = Poor structure
3 = Acceptable clarity
4 = Well-structured
5 = Excellent clarity

Format: SCORE: [1-5] REASONING: [explanation]""",
            
            "faithfulness": """
Rate how faithful the response is to the provided context (1-5).
1 = Contradicts context
2 = Mostly unsupported
3 = Partially supported
4 = Well-supported
5 = Fully grounded in context

Format: SCORE: [1-5] REASONING: [explanation]"""
        }
        
        return base_prompt + criteria_instructions.get(
            criteria, 
            criteria_instructions["overall_quality"]
        )
    
    def _parse_evaluation(self, response_text: str) -> Dict:
        """Parse LLM evaluation response"""
        
        try:
            lines = response_text.strip().split('\n')
            score = 3  # Default
            reasoning = ""
            
            for i, line in enumerate(lines):
                if 'SCORE:' in line.upper():
                    # Extract numeric score
                    import re
                    numbers = re.findall(r'\d', line)
                    if numbers:
                        score = int(numbers[0])
                        score = min(max(score, 1), 5)
                elif 'REASONING:' in line.upper():
                    # Get all text after REASONING:
                    reasoning_start = line.upper().index('REASONING:') + 10
                    reasoning = line[reasoning_start:].strip()
                    # Add any subsequent lines
                    if i < len(lines) - 1:
                        reasoning += ' ' + ' '.join(lines[i+1:])
                    break
            
            return {
                'score': score,
                'reasoning': reasoning if reasoning else response_text
            }
            
        except Exception as e:
            return {
                'score': 3,
                'reasoning': response_text[:200]
            }

# %%
class AgentResponseEvaluator:
    """
    Comprehensive evaluation suite for agent responses
    """
    
    def __init__(self, search_function, answer_function, llm_evaluator=None):
        """
        Initialize with search and answer functions
        
        Args:
            search_function: Function to search for context
            answer_function: Function to generate answers
            llm_evaluator: LLMEvaluator instance
        """
        self.search_function = search_function
        self.answer_function = answer_function
        self.llm_evaluator = llm_evaluator or LLMEvaluator()
        self.results = []
    
    def evaluate_end_to_end(self,
                           test_cases: List[Dict[str, str]],
                           criteria: List[str] = None,
                           batch_delay: float = 1.0) -> pd.DataFrame:
        """
        Evaluate complete RAG pipeline with rate limiting management
        
        Args:
            test_cases: List of dicts with 'query' and optionally 'reference'
            criteria: Evaluation criteria to use
            batch_delay: Delay between evaluations to avoid rate limits
        """
        
        if criteria is None:
            criteria = ["overall_quality", "relevance", "accuracy", "faithfulness"]
        
        results = []
        import time
        
        for i, test_case in enumerate(test_cases):
            print(f"\nEvaluating case {i+1}/{len(test_cases)}")
            query = test_case['query']
            reference = test_case.get('reference', None)
            
            # Get search results
            print(f"  Searching for: '{query[:50]}...'")
            search_results = self.search_function(query)
            
            # Generate answer
            print("  Generating answer...")
            answer = self.answer_function(query, 'hybrid')
            
            # Format context from search results
            context = "\n".join([
                f"{r.get('chunk', '')[:200]}..." 
                for r in search_results[:3]
            ])
            
            # Evaluate with each criteria
            case_result = {
                'case_id': i,
                'query': query[:100],
                'response': answer[:200],
                'has_reference': reference is not None
            }
            
            for j, criterion in enumerate(criteria):
                print(f"  Evaluating {criterion}...")
                
                # Add delay between API calls to avoid rate limits
                if j > 0:
                    time.sleep(batch_delay)
                
                eval_result = self.llm_evaluator.evaluate_response(
                    query=query,
                    response=answer,
                    criteria=criterion,
                    reference=reference,
                    context=context,
                    max_retries=3  # Use retry logic
                )
                
                case_result[f'{criterion}_score'] = eval_result.score
                case_result[f'{criterion}_reasoning'] = eval_result.reasoning[:100]
            
            # Calculate average score
            score_cols = [k for k in case_result.keys() if k.endswith('_score')]
            case_result['avg_score'] = np.mean([case_result[col] for col in score_cols])
            
            results.append(case_result)
            self.results = results
            
            # Add delay between test cases
            if i < len(test_cases) - 1:
                print(f"  ⏱️ Waiting {batch_delay}s before next case...")
                time.sleep(batch_delay)
        
        return pd.DataFrame(results)
    
    def print_evaluation_summary(self, df: pd.DataFrame):
        """Print a formatted summary of evaluation results"""
        
        print("\n" + "="*60)
        print("📊 LLM EVALUATION SUMMARY")
        print("="*60)
        
        # Overall statistics
        print(f"\nTotal test cases evaluated: {len(df)}")
        print(f"Average overall score: {df['avg_score'].mean():.2f}/5.0")
        print(f"Score standard deviation: {df['avg_score'].std():.2f}")
        
        # Score breakdown by criteria
        score_cols = [col for col in df.columns if col.endswith('_score') and col != 'avg_score']
        
        print("\n📈 SCORES BY CRITERIA:")
        for col in score_cols:
            criteria_name = col.replace('_score', '').replace('_', ' ').title()
            mean_score = df[col].mean()
            print(f"  • {criteria_name}: {mean_score:.2f}")
        
        # Best and worst performing cases
        if len(df) > 0:
            best_idx = df['avg_score'].idxmax()
            worst_idx = df['avg_score'].idxmin()
            
            print("\n🏆 BEST PERFORMING QUERY:")
            print(f"  Query: {df.loc[best_idx, 'query']}")
            print(f"  Score: {df.loc[best_idx, 'avg_score']:.2f}")
            
            print("\n⚠️ NEEDS IMPROVEMENT:")
            print(f"  Query: {df.loc[worst_idx, 'query']}")
            print(f"  Score: {df.loc[worst_idx, 'avg_score']:.2f}")
        
        # Distribution of scores
        print("\n📊 SCORE DISTRIBUTION:")
        score_ranges = {
            'Excellent (4.5-5.0)': ((df['avg_score'] >= 4.5) & (df['avg_score'] <= 5.0)).sum(),
            'Good (3.5-4.5)': ((df['avg_score'] >= 3.5) & (df['avg_score'] < 4.5)).sum(),
            'Fair (2.5-3.5)': ((df['avg_score'] >= 2.5) & (df['avg_score'] < 3.5)).sum(),
            'Poor (1.0-2.5)': ((df['avg_score'] >= 1.0) & (df['avg_score'] < 2.5)).sum()
        }
        
        for range_name, count in score_ranges.items():
            percentage = (count / len(df) * 100) if len(df) > 0 else 0
            print(f"  • {range_name}: {count} ({percentage:.1f}%)")
        
        print("\n" + "="*60)
    
    def save_evaluation_results(self, df: pd.DataFrame, filename_prefix: str = "llm_eval") -> Path:
        """Save LLM evaluation results to file"""
        
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        rand_hex = secrets.token_hex(3)
        filename = f"{filename_prefix}_{timestamp}_{rand_hex}.json"
        filepath = LOG_DIR / filename
        
        # Convert DataFrame to dict for JSON serialization
        results_dict = {
            'timestamp': datetime.now().isoformat(),
            'summary': {
                'total_cases': len(df),
                'avg_score': df['avg_score'].mean() if 'avg_score' in df.columns else 0,
                'criteria_scores': {}
            },
            'detailed_results': df.to_dict('records')
        }
        
        # Add criteria scores to summary
        score_cols = [col for col in df.columns if col.endswith('_score') and col != 'avg_score']
        for col in score_cols:
            criteria_name = col.replace('_score', '')
            results_dict['summary']['criteria_scores'][criteria_name] = df[col].mean()
        
        with filepath.open("w", encoding="utf-8") as f:
            json.dump(results_dict, f, indent=2, default=str)
        
        print(f"📁 Results saved to: {filepath}")
        return filepath

# %% [markdown]
# ## Rate Limit Management for Free API Tier

# %%
class RateLimitManager:
    """
    Manage rate limits for free API tier
    """
    
    def __init__(self, requests_per_minute: int = 16):
        """
        Initialize rate limit manager
        
        Args:
            requests_per_minute: API limit for free tier (default 16 for DeepSeek free)
        """
        self.requests_per_minute = requests_per_minute
        self.request_times = []
        import time
        self.time = time
    
    def wait_if_needed(self):
        """
        Wait if necessary to avoid rate limits
        """
        current_time = self.time.time()
        
        # Remove requests older than 1 minute
        self.request_times = [t for t in self.request_times if current_time - t < 60]
        
        # If we've hit the limit, wait
        if len(self.request_times) >= self.requests_per_minute:
            oldest_request = min(self.request_times)
            wait_time = 61 - (current_time - oldest_request)
            if wait_time > 0:
                print(f"  ⏳ Rate limit approaching. Waiting {wait_time:.1f}s...")
                self.time.sleep(wait_time)
                # Clean up old requests after waiting
                current_time = self.time.time()
                self.request_times = [t for t in self.request_times if current_time - t < 60]
    
    def record_request(self):
        """
        Record a request time
        """
        self.request_times.append(self.time.time())

# Initialize global rate limiter for the notebook
rate_limiter = RateLimitManager(requests_per_minute=16)

# Modified LLM evaluator with rate limiting
class LLMEvaluatorWithRateLimit(LLMEvaluator):
    """
    LLM Evaluator with built-in rate limiting for free tier
    """
    
    def __init__(self, client=None, model="deepseek/deepseek-r1:free", rate_limiter=None):
        super().__init__(client, model)
        self.rate_limiter = rate_limiter or RateLimitManager()
    
    def evaluate_response(self,
                         query: str,
                         response: str,
                         criteria: str = "overall_quality",
                         reference: Optional[str] = None,
                         context: Optional[str] = None,
                         max_retries: int = 3) -> LLMEvaluationResult:
        """
        Evaluate with automatic rate limiting
        """
        # Wait if necessary before making request
        self.rate_limiter.wait_if_needed()
        
        # Call parent method with retry logic
        result = super().evaluate_response(
            query, response, criteria, reference, context, max_retries
        )
        
        # Record successful request
        if result.score > 0:  # Only count successful requests
            self.rate_limiter.record_request()
        
        return result

# %% [markdown]
# ## Example Usage: Evaluating Your Agent Responses with LLM

# %%
# Initialize the LLM evaluator with rate limiting
llm_evaluator = LLMEvaluatorWithRateLimit(
    client=openai_client,
    rate_limiter=rate_limiter
)

# Test cases for LLM evaluation - start small to test rate limiting
llm_test_cases = [
    {
        'query': "What components are required in a test dataset to evaluate AI?",
        'reference': "A test dataset should include questions, expected answers, and optionally context."
    },
    {
        'query': "How to detect data drift in Evidently?",
        'reference': None  # No reference answer
    }
]

# Initialize the end-to-end evaluator with rate limiting
agent_evaluator = AgentResponseEvaluator(
    search_function=hybrid_search,
    answer_function=answer_question_manual,
    llm_evaluator=llm_evaluator
)

# Run evaluation with automatic rate limit handling
print("Starting LLM-based evaluation with rate limit management...")
print(f"Rate limit: {rate_limiter.requests_per_minute} requests per minute")

evaluation_df = agent_evaluator.evaluate_end_to_end(
    test_cases=llm_test_cases[:1],  # Start with just 2 cases to test
    criteria=["overall_quality", "relevance"],  # Fewer criteria to avoid hitting limits
    batch_delay=4.0  # 4 second delay between requests (safe for 16 req/min limit)
)

# Display results
agent_evaluator.print_evaluation_summary(evaluation_df)

# Save results
filepath = agent_evaluator.save_evaluation_results(evaluation_df)

# %% [markdown]
# ## Comparing Search Methods with LLM Evaluation

# %%
def compare_methods_with_llm(test_queries_subset):
    """
    Compare different search methods using LLM evaluation
    """
    print("\n" + "="*60)
    print("🔍 COMPARING SEARCH METHODS WITH LLM EVALUATION")
    print("="*60)
    
    methods = {
        'text': lambda q: answer_question_manual(q, 'text'),
        'vector': lambda q: answer_question_manual(q, 'vector'),
        'hybrid': lambda q: answer_question_manual(q, 'hybrid')
    }
    
    llm_eval = LLMEvaluator()
    comparison_results = []
    
    for query_text in test_queries_subset:
        print(f"\nQuery: '{query_text[:50]}...'")
        
        for method_name, answer_fn in methods.items():
            print(f"  Testing {method_name} search...")
            
            # Generate answer
            answer = answer_fn(query_text)
            
            # Evaluate with LLM
            eval_result = llm_eval.evaluate_response(
                query=query_text,
                response=answer,
                criteria="overall_quality"
            )
            
            comparison_results.append({
                'query': query_text[:50],
                'method': method_name,
                'score': eval_result.score,
                'reasoning': eval_result.reasoning[:100]
            })
    
    # Convert to DataFrame and show summary
    comparison_df = pd.DataFrame(comparison_results)
    
    print("\n📊 METHOD COMPARISON RESULTS:")
    method_avg = comparison_df.groupby('method')['score'].mean().sort_values(ascending=False)
    for method, avg_score in method_avg.items():
        print(f"  • {method.upper()}: {avg_score:.2f}")
    
    return comparison_df

# Example: Compare methods on a subset of queries
test_subset = [
    "What components are required in a test dataset to evaluate AI?",
    "How to run evaluations in Evidently?",
    "Understanding data definition and descriptors"
]

comparison_df = compare_methods_with_llm(test_subset)

# %% [markdown]
# ## Batch LLM Evaluation with Logging

# %%
def run_comprehensive_llm_evaluation():
    """
    Run comprehensive LLM evaluation on all test queries
    """
    print("\n" + "="*60)
    print("🚀 COMPREHENSIVE LLM EVALUATION")
    print("="*60)
    
    # Convert existing test_queries to LLM test format
    llm_test_cases = [
        {'query': q[0], 'reference': None} 
        for q in test_queries
    ]
    
    # Run evaluation
    evaluator = AgentResponseEvaluator(
        search_function=hybrid_search,
        answer_function=answer_question_manual,
        llm_evaluator=LLMEvaluator()
    )
    
    results_df = evaluator.evaluate_end_to_end(
        test_cases=llm_test_cases,
        criteria=["relevance", "accuracy", "coherence"]
    )
    
    # Print summary
    evaluator.print_evaluation_summary(results_df)
    
    # Save to file
    filepath = evaluator.save_evaluation_results(results_df, "comprehensive_llm_eval")
    
    return results_df

# Run comprehensive evaluation
# comprehensive_results = run_comprehensive_llm_evaluation()

Starting LLM-based evaluation with rate limit management...
Rate limit: 16 requests per minute

Evaluating case 1/1
  Searching for: 'What components are required in a test dataset to ...'
Hybrid search found 5 results for query: 'What components are required in a test dataset to evaluate AI?'
  Generating answer...
Hybrid search found 5 results for query: 'What components are required in a test dataset to evaluate AI?'
  Evaluating overall_quality...
  Evaluating relevance...
  ⏳ Rate limit hit. Waiting 2.0s before retry 1/3...

📊 LLM EVALUATION SUMMARY

Total test cases evaluated: 1
Average overall score: 3.00/5.0
Score standard deviation: nan

📈 SCORES BY CRITERIA:
  • Overall Quality: 3.00
  • Relevance: 3.00

🏆 BEST PERFORMING QUERY:
  Query: What components are required in a test dataset to evaluate AI?
  Score: 3.00

⚠️ NEEDS IMPROVEMENT:
  Query: What components are required in a test dataset to evaluate AI?
  Score: 3.00

📊 SCORE DISTRIBUTION:
  • Excellent (4.5-5.0): 0 (0.0%)