# Industry-Standard Approaches: Hybrid Scoring vs RRF

This notebook implements the fusion approaches commonly used in production (plus an embedding-only baseline for comparison):

## 1. Weighted Score Fusion
```
final_score = α × embedding_score + β × reranker_score
```
Used by: Pinecone, many RAG systems

## 2. Reciprocal Rank Fusion (RRF)
```
RRF_score = 1/(k + rank_embedding) + 1/(k + rank_reranker)
```
Used by: Elasticsearch, Azure AI Search, Weaviate

## 3. Re-ranker Only (for comparison)
```
final_score = reranker_score
```
Used by: Cohere Rerank, when re-ranker is highly tuned

In [None]:
import yaml
import json
import os
import pickle
from typing import List, Dict, Optional, Literal
import numpy as np
import faiss
import boto3
import uuid
from datetime import datetime

# Load the notebook-wide settings (model endpoints, credentials, storage
# paths, retrieval weights) from the YAML config file.
# The encoding is pinned to UTF-8 so the file parses identically on every
# platform instead of depending on the locale's default encoding.
with open('config/config.yaml', 'r', encoding='utf-8') as f:
    config = yaml.safe_load(f)

print("✓ Config loaded")

In [None]:
# Name of the endpoint that serves the re-ranker model. It is passed to
# ProductionRetriever as `reranker_endpoint`. The value below is a
# placeholder and must be replaced before running the cells that follow.
RERANKER_ENDPOINT = 'your-qwen3-reranker-endpoint'  # <-- UPDATE THIS

In [None]:
class ProductionRetriever:
    """
    Production-grade retriever with multiple fusion strategies:

    1. 'weighted' - Weighted score fusion (α × embed + β × rerank)
    2. 'rrf' - Reciprocal Rank Fusion (industry standard)
    3. 'reranker_only' - Trust re-ranker completely
    4. 'embedding_only' - No re-ranking

    Retrieval is two-stage: ``hybrid_search`` produces candidates from a
    FAISS vector index combined with BM25, then (unless 'embedding_only')
    a re-ranker endpoint scores the candidates and one of the ``fuse_*``
    methods merges both signals into ``final_score``.
    """

    # Fusion strategies accepted by ``query``.
    FUSION_METHODS = ('weighted', 'rrf', 'reranker_only', 'embedding_only')

    def __init__(self, config: dict, reranker_endpoint: str):
        """
        Create the endpoint clients and load the on-disk indexes.

        Args:
            config: Parsed configuration. Reads ``models.embedding`` (endpoint
                name and credentials), ``storage`` (index directories) and
                ``retrieval.hybrid`` (vector/BM25 weights).
            reranker_endpoint: Name of the re-ranker endpoint to invoke.
        """
        self.config = config

        # Embedding client
        self.embedding_endpoint_name = config['models']['embedding']['endpoint_name']
        embedding_creds = config['models']['embedding']['credentials']
        self.embedding_client = self._make_runtime_client(embedding_creds)
        print("✓ Embedding client initialized")

        # Re-ranker client (uses the same credentials as the embedding client)
        self.reranker_endpoint = reranker_endpoint
        self.reranker_client = self._make_runtime_client(embedding_creds)
        print("✓ Re-ranker client initialized")

        # session_id -> {'user_id', 'entitlement', 'org_id', 'query_history'}
        self.sessions = {}
        self.load_indexes()

    @staticmethod
    def _make_runtime_client(creds: dict):
        """Build a 'sagemaker-runtime' boto3 client from a credentials mapping."""
        return boto3.client(
            'sagemaker-runtime',
            region_name=creds['region'],
            aws_access_key_id=creds['accessKeyId'],
            aws_secret_access_key=creds['secretAccessKey'],
            aws_session_token=creds['sessionToken']
        )

    def load_indexes(self):
        """Load the FAISS index, the pickled BM25 index and the chunk metadata."""
        faiss_path = os.path.join(self.config['storage']['faiss_index'], 'faiss.index')
        self.faiss_index = faiss.read_index(faiss_path)

        bm25_path = os.path.join(self.config['storage']['bm25_index'], 'bm25.pkl')
        with open(bm25_path, 'rb') as f:
            # SECURITY: pickle.load executes arbitrary code embedded in the
            # file. Only load BM25 indexes produced by a trusted pipeline.
            self.bm25_index = pickle.load(f)

        metadata_path = os.path.join(self.config['storage']['faiss_index'], 'chunk_metadata.json')
        with open(metadata_path, 'r', encoding='utf-8') as f:
            self.chunks = json.load(f)

        print(f"✓ Indexes loaded: {len(self.chunks)} chunks")

    def get_embedding(self, text: str) -> np.ndarray:
        """Embed ``text`` via the embedding endpoint and return a float32 vector."""
        params = {"inputs": [text], "encoding_format": "float"}
        response = self.embedding_client.invoke_endpoint(
            EndpointName=self.embedding_endpoint_name,
            ContentType='application/json',
            Body=json.dumps(params)
        )
        raw_bytes = response['Body'].read()
        output_data = json.loads(raw_bytes.decode())
        # One input was sent, so the first entry of the response is used.
        return np.array(output_data[0], dtype='float32')

    def get_rerank_scores(self, query: str, documents: List[str]) -> List[float]:
        """
        Score ``documents`` against ``query`` with the re-ranker endpoint.

        Returns:
            One relevance score per document, in the same order as
            ``documents``. An empty ``documents`` list yields ``[]`` without
            calling the endpoint.

        Raises:
            ValueError: If the response does not contain a score for every
                document, or references an index outside ``documents``.
        """
        if not documents:
            return []

        payload = {"query": query, "documents": documents}
        response = self.reranker_client.invoke_endpoint(
            EndpointName=self.reranker_endpoint,
            ContentType='application/json',
            Body=json.dumps(payload)
        )
        raw_bytes = response['Body'].read()
        output_data = json.loads(raw_bytes.decode('utf-8'))

        return self._align_rerank_scores(output_data['results'], len(documents))

    @staticmethod
    def _align_rerank_scores(results: List[Dict], expected: int) -> List[float]:
        """
        Place each result's 'relevance_score' at the position given by its 'index'.

        Placing by index (rather than sorting and stripping the index) keeps
        scores attached to the right document even if the endpoint returns
        results in a different order, and makes a truncated response an
        explicit error instead of a silent misalignment.
        """
        scores: List[Optional[float]] = [None] * expected
        for item in results:
            idx = item['index']
            if not 0 <= idx < expected:
                raise ValueError(
                    f"Re-ranker returned index {idx} for {expected} documents"
                )
            scores[idx] = float(item['relevance_score'])

        missing = [i for i, s in enumerate(scores) if s is None]
        if missing:
            raise ValueError(
                f"Re-ranker returned no score for document indices {missing}"
            )
        return scores

    def build_context_query(self, query: str, history: Optional[List[Dict]]) -> str:
        """
        Prefix ``query`` with up to two recent, lower-cased history queries.

        Only the last three history entries are inspected; entries without a
        non-empty 'query' are ignored. With no usable history the query is
        returned unchanged.
        """
        if not history:
            return query
        recent = history[-3:]
        context_queries = [h.get('query', '') for h in recent if h.get('query')]
        if not context_queries:
            return query
        context = " and ".join(context_queries[-2:]).lower()
        return f"Regarding {context}: {query}"

    @staticmethod
    def _min_max(scores: np.ndarray, flat_value: float) -> np.ndarray:
        """
        Min-max scale ``scores`` to [0, 1].

        When all scores are (numerically) equal there is no spread to scale,
        so every entry becomes ``flat_value``. Empty input is returned as-is.
        """
        if scores.size == 0:
            return scores
        min_s, max_s = scores.min(), scores.max()
        if max_s - min_s < 1e-10:
            return np.full_like(scores, flat_value)
        return (scores - min_s) / (max_s - min_s)

    def hybrid_search(self, query: str, entitlement: str, org_id: Optional[str] = None,
                      tags: Optional[List[str]] = None, top_k: int = 20) -> List[Dict]:
        """
        Embedding-based hybrid search (vector + BM25).

        Vector and BM25 scores are min-max normalised separately, combined
        with the configured ``vector_weight`` / ``bm25_weight``, and the
        best-scoring chunks that pass the access filters are returned.

        Args:
            query: Search text.
            entitlement: Caller's entitlement. A chunk is visible when its
                'entitlement' contains 'universal' or this value.
            org_id: If given, only chunks whose 'orgId' equals it are kept.
            tags: If given, a chunk must carry at least one of these tags.
            top_k: Maximum number of chunks to return.

        Returns:
            Copies of the matching chunks, best first, each with
            'hybrid_score' and 'chunk_idx' added.
        """
        if top_k <= 0 or not self.chunks:
            return []

        query_embedding = self.get_embedding(query)
        query_embedding = query_embedding.reshape(1, -1).astype('float32')
        faiss.normalize_L2(query_embedding)  # normalises in place

        # Over-fetch so enough candidates survive the access filters below.
        initial_top_k = min(top_k * 5, len(self.chunks))

        vector_scores, vector_indices = self.faiss_index.search(query_embedding, initial_top_k)
        vector_scores = vector_scores[0]
        vector_indices = vector_indices[0]

        # FAISS pads the result with label -1 when it finds fewer than k
        # neighbours. Drop those (and any label without metadata) before
        # normalising: their sentinel scores would distort the min/max and
        # -1 would silently address the last chunk.
        valid = (vector_indices >= 0) & (vector_indices < len(self.chunks))
        vector_scores = vector_scores[valid]
        vector_indices = vector_indices[valid]

        tokenized_query = query.lower().split()
        # NOTE(review): assumes the BM25 index holds one score per entry of
        # self.chunks, in the same order — confirm against the index builder.
        bm25_scores = np.asarray(self.bm25_index.get_scores(tokenized_query), dtype=float)

        vector_scores_norm = self._min_max(vector_scores, 0.0)
        bm25_scores_norm = self._min_max(bm25_scores, 0.0)

        vector_weight = self.config['retrieval']['hybrid']['vector_weight']
        bm25_weight = self.config['retrieval']['hybrid']['bm25_weight']

        # Plain-int keys keep numpy scalars out of the returned chunks.
        hybrid_scores = {}
        for idx, score in zip(vector_indices, vector_scores_norm):
            hybrid_scores[int(idx)] = score * vector_weight

        for idx, score in enumerate(bm25_scores_norm):
            hybrid_scores[idx] = hybrid_scores.get(idx, 0.0) + score * bm25_weight

        sorted_indices = sorted(hybrid_scores.items(), key=lambda x: x[1], reverse=True)

        results = []
        for idx, score in sorted_indices:
            chunk = self.chunks[idx]

            chunk_entitlements = chunk['entitlement']
            if isinstance(chunk_entitlements, str):
                chunk_entitlements = [chunk_entitlements]

            has_access = 'universal' in chunk_entitlements or entitlement in chunk_entitlements
            if not has_access:
                continue
            if org_id and chunk['orgId'] != org_id:
                continue
            if tags and not any(t in chunk['metadata']['tags'] for t in tags):
                continue

            # Copy only the chunks that are actually returned, so the loaded
            # metadata is never mutated by the fusion steps.
            chunk = chunk.copy()
            chunk['hybrid_score'] = float(score)
            chunk['chunk_idx'] = idx
            results.append(chunk)

            # Candidates are visited best-first, so stop once enough passed.
            if len(results) >= top_k:
                break

        return results

    # =========================================================================
    # FUSION STRATEGIES
    # =========================================================================

    @staticmethod
    def _require_same_length(candidates: List[Dict], rerank_scores: List[float]) -> None:
        """Raise ValueError unless there is exactly one re-rank score per candidate."""
        if len(candidates) != len(rerank_scores):
            raise ValueError(
                f"Expected one re-rank score per candidate, got "
                f"{len(rerank_scores)} scores for {len(candidates)} candidates"
            )

    def fuse_weighted(self, candidates: List[Dict], rerank_scores: List[float],
                      embedding_weight: float = 0.6, reranker_weight: float = 0.4) -> List[Dict]:
        """
        STRATEGY 1: Weighted Score Fusion

        final_score = α × normalized_embedding + β × normalized_rerank

        Used by: Pinecone, many production RAG systems

        Both score lists are min-max normalised first; a list with no spread
        normalises to 0.5 everywhere so it does not favour any candidate.
        ``candidates`` is annotated and sorted in place, and also returned.

        Raises:
            ValueError: If the number of scores differs from the number of
                candidates.
        """
        self._require_same_length(candidates, rerank_scores)
        if not candidates:
            return candidates

        hybrid_raw = np.array([c['hybrid_score'] for c in candidates], dtype=float)
        rerank_raw = np.array(rerank_scores, dtype=float)

        hybrid_norm = self._min_max(hybrid_raw, 0.5)
        rerank_norm = self._min_max(rerank_raw, 0.5)

        for chunk, raw, h, r in zip(candidates, rerank_raw, hybrid_norm, rerank_norm):
            chunk['rerank_score'] = float(raw)
            # Plain float keeps the result JSON-serialisable.
            chunk['final_score'] = float(embedding_weight * h + reranker_weight * r)
            chunk['fusion_method'] = 'weighted'

        candidates.sort(key=lambda x: x['final_score'], reverse=True)
        return candidates

    def fuse_rrf(self, candidates: List[Dict], rerank_scores: List[float],
                 k: int = 60) -> List[Dict]:
        """
        STRATEGY 2: Reciprocal Rank Fusion (RRF)

        RRF_score = 1/(k + rank_embedding) + 1/(k + rank_reranker)

        Used by: Elasticsearch, Azure AI Search, Weaviate
        Industry standard for combining multiple rankings.

        k=60 is the standard value (from original paper)

        Ranks are 1-based (the best item has rank 1), as in the RRF formula.
        The embedding rank is the candidate's position in ``candidates``,
        which is expected to arrive sorted by hybrid_score. Ties in the
        re-ranker scores keep their incoming order. ``candidates`` is
        annotated and sorted in place, and also returned.

        Raises:
            ValueError: If the number of scores differs from the number of
                candidates.
        """
        self._require_same_length(candidates, rerank_scores)

        # Re-ranker ranking: positions ordered by descending score (stable).
        by_rerank = sorted(range(len(rerank_scores)),
                           key=lambda i: rerank_scores[i], reverse=True)
        reranker_ranks = {pos: rank for rank, pos in enumerate(by_rerank, start=1)}

        for emb_rank, chunk in enumerate(candidates, start=1):
            pos = emb_rank - 1
            rerank_rank = reranker_ranks[pos]

            chunk['rerank_score'] = float(rerank_scores[pos])
            chunk['embedding_rank'] = emb_rank
            chunk['reranker_rank'] = rerank_rank
            chunk['final_score'] = (1 / (k + emb_rank)) + (1 / (k + rerank_rank))
            chunk['fusion_method'] = 'rrf'

        candidates.sort(key=lambda x: x['final_score'], reverse=True)
        return candidates

    def fuse_reranker_only(self, candidates: List[Dict], rerank_scores: List[float]) -> List[Dict]:
        """
        STRATEGY 3: Re-ranker Only

        Trust re-ranker completely for final ranking.

        Used by: Cohere Rerank, when re-ranker is highly tuned

        ``candidates`` is annotated and sorted in place, and also returned.

        Raises:
            ValueError: If the number of scores differs from the number of
                candidates.
        """
        self._require_same_length(candidates, rerank_scores)

        for chunk, score in zip(candidates, rerank_scores):
            chunk['rerank_score'] = float(score)
            chunk['final_score'] = float(score)
            chunk['fusion_method'] = 'reranker_only'

        candidates.sort(key=lambda x: x['final_score'], reverse=True)
        return candidates

    def fuse_embedding_only(self, candidates: List[Dict]) -> List[Dict]:
        """
        STRATEGY 4: Embedding Only (no re-ranking)

        Copies 'hybrid_score' into 'final_score'; the incoming order is kept.
        """
        for chunk in candidates:
            chunk['final_score'] = chunk['hybrid_score']
            chunk['fusion_method'] = 'embedding_only'

        # Already sorted by hybrid_score
        return candidates

    # =========================================================================
    # MAIN QUERY METHOD
    # =========================================================================

    def query(self, query: str, entitlement: str, org_id: Optional[str] = None,
              tags: Optional[List[str]] = None, top_k: int = 5,
              candidates_for_rerank: int = 20,
              conversation_history: Optional[List[Dict]] = None,
              fusion_method: Literal['weighted', 'rrf', 'reranker_only', 'embedding_only'] = 'rrf',
              embedding_weight: float = 0.6,
              reranker_weight: float = 0.4,
              rrf_k: int = 60,
              verbose: bool = False) -> Dict:
        """
        Query with configurable fusion strategy.

        Args:
            query: Search text.
            entitlement: Caller's entitlement, used to filter chunks.
            org_id: Optional organisation filter.
            tags: Optional tag filter (any match).
            top_k: Maximum number of distinct documents to return.
            candidates_for_rerank: Number of chunks retrieved in stage 1.
            conversation_history: Earlier turns; their 'query' values are
                folded into the text sent to the re-ranker.
            fusion_method: 'weighted', 'rrf', 'reranker_only', or 'embedding_only'
            embedding_weight: Weight for embeddings (only for 'weighted')
            reranker_weight: Weight for re-ranker (only for 'weighted')
            rrf_k: K parameter for RRF (default 60)
            verbose: Print the intermediate rankings.

        Returns:
            Dict with 'query', 'documents' (one entry per distinct doc_id,
            best chunk first), 'fusion_method' and 'history_used'. When
            nothing is found, 'documents' is empty and 'message' is set.

        Raises:
            ValueError: If ``fusion_method`` is not a supported strategy.
        """
        # Fail fast: otherwise an unknown method would still pay for the
        # retrieval and re-ranker calls and then fail on a missing
        # 'final_score' key.
        if fusion_method not in self.FUSION_METHODS:
            raise ValueError(
                f"Unknown fusion_method: {fusion_method!r}. "
                f"Expected one of {self.FUSION_METHODS}"
            )

        history_used = len(conversation_history) if conversation_history else 0

        # Stage 1: Embedding-based retrieval
        candidates = self.hybrid_search(query, entitlement, org_id, tags, candidates_for_rerank)

        if not candidates:
            return {
                'query': query,
                'documents': [],
                'message': 'No documents found',
                'fusion_method': fusion_method,
                'history_used': history_used,
            }

        if verbose:
            print(f"\n[Embedding Search] Top 5:")
            for i, c in enumerate(candidates[:5]):
                print(f"  {i+1}. {c['title']} (hybrid: {c['hybrid_score']:.4f})")

        # Stage 2: Apply fusion strategy
        if fusion_method == 'embedding_only':
            candidates = self.fuse_embedding_only(candidates)
        else:
            # Get re-ranker scores
            context_query = self.build_context_query(query, conversation_history)
            if verbose and conversation_history:
                print(f"\n[Context Query]: {context_query}")

            documents = [c['content'] for c in candidates]
            rerank_scores = self.get_rerank_scores(context_query, documents)

            if verbose:
                print(f"\n[Re-ranker Scores] Top 5:")
                temp_ranked = sorted(enumerate(rerank_scores), key=lambda x: x[1], reverse=True)
                for rank, (i, score) in enumerate(temp_ranked[:5]):
                    print(f"  {rank+1}. {candidates[i]['title']} (rerank: {score:.4f})")

            # Apply fusion
            if fusion_method == 'weighted':
                candidates = self.fuse_weighted(candidates, rerank_scores, embedding_weight, reranker_weight)
            elif fusion_method == 'rrf':
                candidates = self.fuse_rrf(candidates, rerank_scores, rrf_k)
            else:  # 'reranker_only' (validated above)
                candidates = self.fuse_reranker_only(candidates, rerank_scores)

        if verbose:
            print(f"\n[Final Ranking ({fusion_method})] Top 5:")
            for i, c in enumerate(candidates[:5]):
                print(f"  {i+1}. {c['title']} (final: {c['final_score']:.4f})")

        # Build response: keep only the best-ranked chunk of each document.
        seen_docs = set()
        result_documents = []

        for chunk in candidates:
            doc_id = chunk['doc_id']
            if doc_id in seen_docs:
                continue
            seen_docs.add(doc_id)

            doc_entry = {
                'document_name': chunk['title'],
                'doc_id': doc_id,
                'final_score': chunk['final_score'],
                'hybrid_score': chunk['hybrid_score'],
            }
            if 'rerank_score' in chunk:
                doc_entry['rerank_score'] = chunk['rerank_score']
            if 'embedding_rank' in chunk:
                doc_entry['embedding_rank'] = chunk['embedding_rank']
                doc_entry['reranker_rank'] = chunk['reranker_rank']

            result_documents.append(doc_entry)

            if len(result_documents) >= top_k:
                break

        return {
            'query': query,
            'documents': result_documents,
            'fusion_method': fusion_method,
            'history_used': history_used
        }

    # Session management
    def create_session(self, user_id: str, entitlement: str, org_id: Optional[str] = None) -> str:
        """Register a new in-memory session and return its generated id."""
        session_id = str(uuid.uuid4())
        self.sessions[session_id] = {
            'user_id': user_id, 'entitlement': entitlement, 'org_id': org_id, 'query_history': []
        }
        print(f"✓ Created session: {session_id}")
        return session_id

    def query_with_session(self, session_id: str, query: str,
                           fusion_method: str = 'rrf', **kwargs) -> Dict:
        """
        Run ``query`` using the session's entitlement, org and recent history.

        The last three recorded turns are passed as conversation history, and
        the new turn is appended to the session afterwards. Extra keyword
        arguments are forwarded to ``query``.

        Raises:
            ValueError: If ``session_id`` is unknown.
        """
        session = self.sessions.get(session_id)
        if not session:
            raise ValueError(f"Session not found: {session_id}")

        history = session['query_history'][-3:]

        result = self.query(
            query=query,
            entitlement=session['entitlement'],
            org_id=session['org_id'],
            conversation_history=history,
            fusion_method=fusion_method,
            **kwargs
        )

        session['query_history'].append({
            'query': query,
            'documents_found': [d['document_name'] for d in result['documents']]
        })

        return result

# Cell-level confirmation that the class definition above executed.
print("✓ ProductionRetriever class defined")

## Initialize Retriever

In [None]:
# Build the shared retriever instance used by every cell below. The
# constructor creates the endpoint clients and loads the indexes.
retriever = ProductionRetriever(config, RERANKER_ENDPOINT)

## Compare All Fusion Strategies

In [None]:
# Run one fixed query through every fusion strategy and print the top
# documents side by side so the rankings can be compared by eye.
banner = "=" * 70
print(banner)
print("COMPARISON: All Fusion Strategies")
print(banner)

test_query = "How do I process a cancellation?"

# (fusion_method value, human-readable heading)
strategies = [
    ('embedding_only', 'Embedding Only (no re-ranker)'),
    ('reranker_only', 'Re-ranker Only'),
    ('weighted', 'Weighted Fusion (60/40)'),
    ('rrf', 'Reciprocal Rank Fusion (RRF)'),
]

divider = '─' * 50
for method, label in strategies:
    print("\n" + divider)
    print(label)
    print(divider)

    result = retriever.query(
        test_query,
        'agent_support',
        org_id='org_123',
        top_k=5,
        fusion_method=method,
    )

    for i, doc in enumerate(result['documents'], start=1):
        # Optional diagnostics, present only for strategies that set them.
        parts = []
        if 'rerank_score' in doc:
            parts.append(f" | rerank={doc['rerank_score']:.3f}")
        if 'embedding_rank' in doc:
            parts.append(f" | emb_rank={doc['embedding_rank']}, rerank_rank={doc['reranker_rank']}")
        extras = "".join(parts)
        print(f"  {i}. {doc['document_name']} (final={doc['final_score']:.4f}{extras})")

## Test with Conversation History

In [None]:
# Multi-turn demo: each turn is answered through a session so that earlier
# queries are folded into the text sent to the re-ranker.
print("="*70)
print("TEST: Multi-Turn Conversation with RRF")
print("="*70)

session_id = retriever.create_session('agent_001', 'agent_support', 'org_123')

queries = [
    "How do I cancel a booking?",
    "What about the refund?",
    "What documents do I need?"  # Ambiguous - history helps
]

for i, q in enumerate(queries, 1):
    print(f"\n{'─'*50}")
    print(f"Turn {i}: {q}")
    print(f"{'─'*50}")

    result = retriever.query_with_session(
        session_id=session_id,
        query=q,
        fusion_method='rrf',
        verbose=True
    )

    # A turn can legitimately return no documents (nothing passes the
    # entitlement/org filters); report that instead of raising IndexError.
    if result['documents']:
        print(f"\nTop result: {result['documents'][0]['document_name']}")
    else:
        print("\nTop result: (no documents found)")

## Summary: Which Strategy to Use?

| Strategy | Best When | Industry Usage |
|----------|-----------|----------------|
| **RRF** | General purpose, robust | Elasticsearch, Azure AI Search |
| **Weighted** | You know relative signal quality | Pinecone, custom RAG |
| **Re-ranker Only** | Re-ranker is domain-tuned | Cohere Rerank |
| **Embedding Only** | Low latency needed, re-ranker unreliable | Simple RAG |

### Recommendation for Your Case:

**Use RRF** - It's the industry standard and handles your situation well:
- Doesn't require tuning weights
- Robust to close scores (0.93 vs 0.94 won't flip rankings arbitrarily)
- Combines rankings, not raw scores

```python
result = retriever.query(
    query="...",
    fusion_method='rrf',  # Industry standard
    ...
)
```

In [None]:
# Closing banner with the notebook's recommendation.
rule = "=" * 70
for line in (
    rule,
    "NOTEBOOK COMPLETE",
    rule,
    "\nRecommended: Use 'rrf' (Reciprocal Rank Fusion)",
    "It's the industry standard and handles close scores well.",
):
    print(line)