# Self-Improving RAG System

This notebook demonstrates the self-improvement loop of the `autorag-live` system, where the RAG pipeline automatically optimizes itself through disagreement analysis and iterative refinement.

## Overview

The self-improvement loop consists of:
1. **Evaluation**: Run evaluation suites to measure current performance
2. **Disagreement Analysis**: Compare BM25, dense, and hybrid results, and mine synonyms from the places where they disagree
3. **Optimization**: Optimize hybrid weights and other parameters
4. **Acceptance Policy**: Decide whether to accept or reject changes
5. **Iteration**: Repeat the process with improved configuration
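
The steps above can be sketched as a plain evaluate, propose, accept loop. This is a minimal illustration and not the `autorag-live` implementation: `improvement_loop`, `toy_evaluate`, `toy_propose`, and the `min_gain` parameter are hypothetical names introduced here, and the toy score simply peaks at a BM25 weight of 0.3.

```python
from typing import Callable, Dict, List, Tuple

Config = Dict[str, float]


def improvement_loop(
    config: Config,
    evaluate: Callable[[Config], float],
    propose: Callable[[Config], Config],
    min_gain: float = 0.01,
    max_iterations: int = 5,
) -> Tuple[Config, List[dict]]:
    """Evaluate, propose a change, and keep it only if the score gain is large enough."""
    history = []
    best_score = evaluate(config)
    for iteration in range(1, max_iterations + 1):
        candidate = propose(config)
        candidate_score = evaluate(candidate)
        # Acceptance rule (an assumption for this sketch): require a minimum gain.
        accepted = candidate_score - best_score >= min_gain
        if accepted:
            config, best_score = candidate, candidate_score
        history.append(
            {"iteration": iteration, "score": candidate_score, "accepted": accepted}
        )
    return config, history


def toy_evaluate(cfg: Config) -> float:
    # Hypothetical objective: best when bm25_weight is 0.3.
    return 1.0 - abs(cfg["bm25_weight"] - 0.3)


def toy_propose(cfg: Config) -> Config:
    # Hypothetical proposal: shift 0.1 of the weight from BM25 to dense.
    w = max(0.0, cfg["bm25_weight"] - 0.1)
    return {"bm25_weight": w, "dense_weight": 1.0 - w}


final_config, history = improvement_loop(
    {"bm25_weight": 0.5, "dense_weight": 0.5}, toy_evaluate, toy_propose
)
print(final_config)
print([h["accepted"] for h in history])
```

In this toy run the first two proposals move toward the optimum and are kept; later proposals overshoot and are rejected, so the configuration stays put.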

## Setup

In [None]:
# Install required packages
# !pip install autorag-live sentence-transformers scipy pandas matplotlib seaborn

import sys
import os
import time
import json
from datetime import datetime
sys.path.append('..')

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from typing import List, Dict, Any, Optional

# Import autorag-live components
from autorag_live.retrievers import bm25, dense, hybrid
from autorag_live.disagreement import metrics
from autorag_live.evals.small import run_small_suite
from autorag_live.pipeline.hybrid_optimizer import (
    grid_search_hybrid_weights, 
    save_hybrid_config,
    load_hybrid_config
)
from autorag_live.pipeline.acceptance_policy import AcceptancePolicy
from autorag_live.augment.synonym_miner import (
    mine_synonyms_from_disagreements,
    update_terms_from_mining
)

# Set up plotting
plt.style.use('default')
import seaborn as sns
sns.set_palette("husl")

print("Setup complete!")

## Sample Data and Initial Setup

Let's set up our test data and initial system configuration.

In [None]:
# Sample corpus
CORPUS = [
    "The sky is blue and beautiful during the day.",
    "The sun rises in the east and sets in the west.",
    "The sun is bright and provides light to Earth.",
    "The sun in the sky is very bright during daytime.",
    "We can see the shining sun, the bright sun in the sky.",
    "The quick brown fox jumps over the lazy dog.",
    "A lazy fox is usually sleeping in its den.",
    "The fox is a mammal that belongs to the canine family.",
    "Machine learning is a subset of artificial intelligence.",
    "Deep learning uses neural networks with multiple layers.",
    "Natural language processing helps computers understand text.",
    "Computer vision enables machines to interpret visual information.",
    "Data science combines statistics, programming, and domain expertise.",
    "Python is a popular programming language for data science.",
    "Jupyter notebooks provide an interactive environment for coding."
]

# Evaluation queries (different from training queries)
EVAL_QUERIES = [
    "bright sun in the sky",
    "fox jumping over dog", 
    "machine learning and AI",
    "programming with Python",
    "data science techniques",
    "natural language processing",
    "computer vision applications"
]

# Training queries for optimization
TRAIN_QUERIES = [
    "sun bright sky",
    "fox dog jump",
    "AI machine learning",
    "Python programming"
]

print(f"Corpus: {len(CORPUS)} documents")
print(f"Training queries: {len(TRAIN_QUERIES)}")
print(f"Evaluation queries: {len(EVAL_QUERIES)}")

# Ensure necessary directories exist
os.makedirs('runs', exist_ok=True)
os.makedirs('reports', exist_ok=True)

## Self-Improvement Loop Implementation

Let's implement the core self-improvement loop that iteratively optimizes the RAG system.

In [None]:
class SelfImprovingRAG:
    """Self-improving RAG system with automatic optimization."""
    
    def __init__(self, corpus: List[str], max_iterations: int = 5):
        self.corpus = corpus
        self.max_iterations = max_iterations
        self.history = []
        self.acceptance_policy = AcceptancePolicy(threshold=0.01)
        
        # Load the saved configuration, falling back to equal weights
        # if none can be loaded
        try:
            self.current_config = load_hybrid_config()
        except Exception:
            # Default configuration
            self.current_config = type('Config', (), {
                'bm25_weight': 0.5,
                'dense_weight': 0.5
            })()
    
    def evaluate_current_performance(self, queries: List[str]) -> Dict[str, float]:
        """Evaluate current system performance."""
        print("📊 Evaluating current performance...")
        
        # Run evaluation suite
        summary = run_small_suite(judge_type="deterministic")
        
        # Calculate the mean pairwise Jaccard overlap between the retrievers
        diversity_scores = []
        for query in queries[:3]:  # Use subset for speed
            bm25_results = bm25.bm25_retrieve(query, self.corpus, 5)
            dense_results = dense.dense_retrieve(query, self.corpus, 5)
            hybrid_results = hybrid.hybrid_retrieve(query, self.corpus, 5)
            
            # Jaccard measures overlap, so higher values mean the
            # retrievers agree more on this query
            jaccard_bd = metrics.jaccard_at_k(bm25_results, dense_results)
            jaccard_bh = metrics.jaccard_at_k(bm25_results, hybrid_results)
            jaccard_dh = metrics.jaccard_at_k(dense_results, hybrid_results)
            
            avg_diversity = (jaccard_bd + jaccard_bh + jaccard_dh) / 3
            diversity_scores.append(avg_diversity)
        
        avg_diversity = np.mean(diversity_scores)
        
        metrics_dict = {
            'em': summary['metrics']['em'],
            'f1': summary['metrics']['f1'],
            'relevance': summary['metrics']['relevance'],
            'faithfulness': summary['metrics']['faithfulness'],
            'diversity': avg_diversity,
            'run_id': summary['run_id']
        }
        
        print(f"   EM: {metrics_dict['em']:.3f}, F1: {metrics_dict['f1']:.3f}")
        print(f"   Diversity: {metrics_dict['diversity']:.3f}")
        
        return metrics_dict
    
    def optimize_hybrid_weights(self, train_queries: List[str]) -> Dict[str, Any]:
        """Optimize hybrid retriever weights."""
        print("🔧 Optimizing hybrid weights...")
        
        # Perform grid search
        optimal_weights, best_score = grid_search_hybrid_weights(
            train_queries, 
            self.corpus, 
            k=5, 
            grid_size=4
        )
        
        print(f"   New weights - BM25: {optimal_weights.bm25_weight:.3f}, "
              f"Dense: {optimal_weights.dense_weight:.3f}")
        print(f"   Diversity score: {best_score:.3f}")
        
        return {
            'weights': optimal_weights,
            'score': best_score
        }
    
    def mine_synonyms(self, queries: List[str]) -> int:
        """Mine synonyms from retriever disagreements."""
        print("📚 Mining synonyms from disagreements...")
        
        mined_synonyms = []
        for query in queries[:2]:  # Use subset for speed
            bm25_results = bm25.bm25_retrieve(query, self.corpus, 5)
            dense_results = dense.dense_retrieve(query, self.corpus, 5)
            hybrid_results = hybrid.hybrid_retrieve(query, self.corpus, 5)
            
            synonyms = mine_synonyms_from_disagreements(
                bm25_results, dense_results, hybrid_results
            )
            mined_synonyms.extend(synonyms)
        
        if mined_synonyms:
            update_terms_from_mining(mined_synonyms)
            print(f"   Added {len(mined_synonyms)} synonym groups")
        else:
            print("   No new synonyms found")
        
        return len(mined_synonyms)
    
    def apply_optimization(self, optimization_result: Dict[str, Any]) -> bool:
        """Apply optimization results with acceptance policy."""
        print("✅ Applying optimization...")
        
        def update_func():
            save_hybrid_config(optimization_result['weights'])
            self.current_config = optimization_result['weights']
        
        # Use acceptance policy to decide whether to apply changes
        accepted = self.acceptance_policy.safe_update(
            update_func, 
            ["hybrid_config.json"],
            # 0.5 is a hard-coded baseline diversity score used for this demo
            expected_improvement=optimization_result['score'] - 0.5
        )
        
        if accepted:
            print("   ✅ Optimization accepted")
        else:
            print("   ❌ Optimization rejected (reverted)")
        
        return accepted
    
    def run_improvement_loop(self, train_queries: List[str], eval_queries: List[str]):
        """Run the complete self-improvement loop."""
        print("🚀 Starting Self-Improvement Loop")
        print("=" * 50)
        
        for iteration in range(self.max_iterations):
            print(f"\n🔄 Iteration {iteration + 1}/{self.max_iterations}")
            print("-" * 30)
            
            # Step 1: Evaluate current performance
            current_metrics = self.evaluate_current_performance(eval_queries)
            
            # Step 2: Mine synonyms from disagreements
            synonyms_added = self.mine_synonyms(train_queries)
            
            # Step 3: Optimize hybrid weights
            optimization_result = self.optimize_hybrid_weights(train_queries)
            
            # Step 4: Apply optimization with acceptance policy
            accepted = self.apply_optimization(optimization_result)
            
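            # Record iteration results. The weights are stored as plain floats
            # so that save_progress() can serialize the history to JSON.
            weights = optimization_result['weights']
            iteration_result = {
                'iteration': iteration + 1,
                'timestamp': datetime.now().isoformat(),
                'metrics': current_metrics,
                'optimization': {
                    'bm25_weight': float(weights.bm25_weight),
                    'dense_weight': float(weights.dense_weight),
                    'score': float(optimization_result['score'])
                },
                'synonyms_added': synonyms_added,
                'accepted': accepted
            }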
            
            self.history.append(iteration_result)
            
            # Save progress
            self.save_progress()
            
            print(f"   📈 Iteration {iteration + 1} complete")
            
            # Small delay between iterations
            time.sleep(1)
        
        print("\n🎉 Self-improvement loop completed!")
        return self.history
    
    def save_progress(self):
        """Save improvement loop progress."""
        progress_file = f"runs/improvement_loop_{int(time.time())}.json"
        with open(progress_file, 'w') as f:
            json.dump({
                'history': self.history,
                'final_config': {
                    'bm25_weight': self.current_config.bm25_weight,
                    'dense_weight': self.current_config.dense_weight
                }
            }, f, indent=2)
        
        print(f"   💾 Progress saved to {progress_file}")
    
    def get_improvement_summary(self) -> Dict[str, Any]:
        """Get summary of improvement over iterations."""
        if not self.history:
            return {}
        
        # Extract metrics over time
        iterations = [h['iteration'] for h in self.history]
        em_scores = [h['metrics']['em'] for h in self.history]
        f1_scores = [h['metrics']['f1'] for h in self.history]
        diversity_scores = [h['metrics']['diversity'] for h in self.history]
        
        return {
            'iterations': iterations,
            'em_scores': em_scores,
            'f1_scores': f1_scores,
            'diversity_scores': diversity_scores,
            'total_synonyms': sum(h['synonyms_added'] for h in self.history),
            'accepted_optimizations': sum(h['accepted'] for h in self.history)
        }

print("SelfImprovingRAG class defined!")
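
The weight optimization step in the class above delegates to `grid_search_hybrid_weights`, whose internals are not shown in this notebook. As a rough picture of what a grid search over two weights involves, here is a self-contained sketch. It assumes the two weights sum to 1 and that a higher score is better; `grid_search_weights` and `toy_score` are hypothetical names, and the library's actual search may differ.

```python
from typing import Callable, Tuple


def grid_search_weights(
    score_fn: Callable[[float, float], float],
    grid_size: int = 4,
) -> Tuple[Tuple[float, float], float]:
    """Try evenly spaced BM25 weights in [0, 1]; the dense weight is the complement."""
    if grid_size < 2:
        raise ValueError("grid_size must be at least 2")
    best_weights, best_score = (0.0, 1.0), float("-inf")
    for step in range(grid_size):
        bm25_w = step / (grid_size - 1)
        dense_w = 1.0 - bm25_w  # assumption: weights sum to 1
        score = score_fn(bm25_w, dense_w)
        if score > best_score:
            best_weights, best_score = (bm25_w, dense_w), score
    return best_weights, best_score


def toy_score(bm25_w: float, dense_w: float) -> float:
    # Hypothetical objective that peaks at bm25_w = 0.4.
    return -((bm25_w - 0.4) ** 2)


weights, score = grid_search_weights(toy_score, grid_size=4)
print(weights, score)
```

With `grid_size=4` the candidates are 0, 1/3, 2/3, and 1, so the search can only land on the grid point nearest the true optimum. A finer grid trades run time for resolution.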

## Running the Self-Improvement Loop

Now let's run the self-improvement loop and observe how the system optimizes itself.

In [None]:
# Initialize the self-improving RAG system
rag_system = SelfImprovingRAG(CORPUS, max_iterations=3)  # Use fewer iterations for demo

# Run the improvement loop
history = rag_system.run_improvement_loop(TRAIN_QUERIES, EVAL_QUERIES)

print("\n📊 Improvement Summary:")
summary = rag_system.get_improvement_summary()
print(f"Total iterations: {len(summary.get('iterations', []))}")
print(f"Synonyms added: {summary.get('total_synonyms', 0)}")
print(f"Optimizations accepted: {summary.get('accepted_optimizations', 0)}")

if summary.get('em_scores'):
    print(f"EM improvement: {summary['em_scores'][0]:.3f} → {summary['em_scores'][-1]:.3f}")
    print(f"F1 improvement: {summary['f1_scores'][0]:.3f} → {summary['f1_scores'][-1]:.3f}")
    print(f"Diversity improvement: {summary['diversity_scores'][0]:.3f} → {summary['diversity_scores'][-1]:.3f}")

## Visualizing Improvement Over Time

Let's create visualizations to show how the system improved over iterations.

In [None]:
# Create improvement visualization
if summary:
    fig, axes = plt.subplots(2, 2, figsize=(15, 10))
    fig.suptitle('Self-Improvement Loop Results', fontsize=16)
    
    iterations = summary.get('iterations', [])
    
    # EM and F1 scores over time
    if summary.get('em_scores'):
        axes[0,0].plot(iterations, summary['em_scores'], 'o-', label='EM Score', linewidth=2)
        axes[0,0].plot(iterations, summary['f1_scores'], 's-', label='F1 Score', linewidth=2)
        axes[0,0].set_title('Retrieval Performance Over Time')
        axes[0,0].set_xlabel('Iteration')
        axes[0,0].set_ylabel('Score')
        axes[0,0].legend()
        axes[0,0].grid(True, alpha=0.3)
    
    # Diversity over time
    if summary.get('diversity_scores'):
        axes[0,1].plot(iterations, summary['diversity_scores'], '^-', 
                       color='orange', linewidth=2)
        axes[0,1].set_title('Retriever Diversity Over Time')
        axes[0,1].set_xlabel('Iteration')
        axes[0,1].set_ylabel('Average Jaccard Similarity')
        axes[0,1].grid(True, alpha=0.3)
    
    # Optimization acceptance (booleans cast to int so they plot as 0/1 bars)
    accepted = [int(h['accepted']) for h in history]
    axes[1,0].bar(iterations, accepted, color=['green' if a else 'red' for a in accepted])
    axes[1,0].set_title('Optimization Acceptance')
    axes[1,0].set_xlabel('Iteration')
    axes[1,0].set_ylabel('Accepted (1) / Rejected (0)')
    axes[1,0].set_yticks([0, 1])
    
    # Synonyms added per iteration
    synonyms = [h['synonyms_added'] for h in history]
    axes[1,1].bar(iterations, synonyms, color='purple', alpha=0.7)
    axes[1,1].set_title('Synonyms Added per Iteration')
    axes[1,1].set_xlabel('Iteration')
    axes[1,1].set_ylabel('Number of Synonyms')
    
    plt.tight_layout()
    plt.show()
    
    # Print detailed results
    print("\n📋 Detailed Results:")
    print("=" * 50)
    for result in history:
        print(f"\nIteration {result['iteration']}:")
        print(f"  EM: {result['metrics']['em']:.3f}")
        print(f"  F1: {result['metrics']['f1']:.3f}")
        print(f"  Diversity: {result['metrics']['diversity']:.3f}")
        print(f"  Synonyms added: {result['synonyms_added']}")
        print(f"  Optimization accepted: {result['accepted']}")
        print(f"  Run ID: {result['metrics']['run_id']}")
else:
    print("No improvement data available to visualize.")
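
The diversity panel plots the mean pairwise Jaccard similarity between the three retrievers' result lists, so higher values mean more agreement. The sketch below shows that computation on toy data. The helpers `jaccard` and `mean_pairwise_jaccard` are written for illustration and are not the library's `metrics.jaccard_at_k`; treating two empty lists as fully overlapping is an assumption.

```python
from itertools import combinations
from typing import Dict, List


def jaccard(a: List[str], b: List[str]) -> float:
    """Size of the intersection divided by size of the union."""
    set_a, set_b = set(a), set(b)
    union = set_a | set_b
    # Assumption: two empty result lists count as identical.
    return len(set_a & set_b) / len(union) if union else 1.0


def mean_pairwise_jaccard(results: Dict[str, List[str]]) -> float:
    """Average Jaccard similarity over every pair of retrievers."""
    pairs = list(combinations(results.values(), 2))
    if not pairs:
        raise ValueError("need at least two result lists")
    return sum(jaccard(a, b) for a, b in pairs) / len(pairs)


results = {
    "bm25": ["d1", "d2", "d3"],
    "dense": ["d2", "d3", "d4"],
    "hybrid": ["d1", "d2", "d3"],
}
overlap = mean_pairwise_jaccard(results)
print(f"{overlap:.3f}")
```

Here BM25 and hybrid return the same documents (similarity 1.0) while each shares two of four distinct documents with dense (similarity 0.5), giving a mean of 2/3.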

## Testing the Improved System

Let's run the final configuration on a few evaluation queries and inspect the top results.

In [None]:
# Test the optimized system
print("🧪 Testing Optimized System")
print("=" * 40)

# Load the final optimized configuration
try:
    final_config = load_hybrid_config()
    print(f"Final configuration - BM25: {final_config.bm25_weight:.3f}, "
          f"Dense: {final_config.dense_weight:.3f}")
except Exception:
    print("Using default configuration")
    final_config = None

# Test on evaluation queries
test_results = []
for query in EVAL_QUERIES[:3]:  # Test on first 3 queries
    print(f"\nQuery: '{query}'")
    print("-" * 30)
    
    # Get results from optimized hybrid retriever
    results = hybrid.hybrid_retrieve(query, CORPUS, 3)
    
    print("Top 3 results:")
    for i, doc in enumerate(results):
        print(f"  {i+1}. {doc}")
    
    test_results.append({
        'query': query,
        'results': results
    })

print("\n✅ Testing complete!")
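
To see why the BM25 and dense weights matter for the results printed above, it helps to picture one common way of combining two retrievers: normalize each score set, then take a weighted sum. The sketch below illustrates that idea on toy scores. It is an assumption about how a hybrid retriever might work, not a description of `hybrid.hybrid_retrieve`; `min_max_normalize` and `fuse` are hypothetical helpers.

```python
from typing import Dict, List


def min_max_normalize(scores: Dict[str, float]) -> Dict[str, float]:
    """Rescale scores to [0, 1] so the two retrievers are comparable."""
    if not scores:
        return {}
    lo, hi = min(scores.values()), max(scores.values())
    if hi == lo:
        return {doc: 1.0 for doc in scores}
    return {doc: (s - lo) / (hi - lo) for doc, s in scores.items()}


def fuse(
    bm25_scores: Dict[str, float],
    dense_scores: Dict[str, float],
    bm25_weight: float,
    dense_weight: float,
    k: int,
) -> List[str]:
    """Rank documents by a weighted sum of normalized scores."""
    bm25_n = min_max_normalize(bm25_scores)
    dense_n = min_max_normalize(dense_scores)
    combined = {
        doc: bm25_weight * bm25_n.get(doc, 0.0) + dense_weight * dense_n.get(doc, 0.0)
        for doc in set(bm25_n) | set(dense_n)
    }
    # Sort by descending score, breaking ties by document id for determinism.
    return sorted(combined, key=lambda d: (-combined[d], d))[:k]


bm25_scores = {"doc_a": 2.0, "doc_b": 1.0, "doc_c": 0.0}
dense_scores = {"doc_a": 0.1, "doc_b": 0.9, "doc_c": 0.5}

lexical_heavy = fuse(bm25_scores, dense_scores, 0.8, 0.2, k=2)
semantic_heavy = fuse(bm25_scores, dense_scores, 0.2, 0.8, k=2)
print(lexical_heavy, semantic_heavy)
```

Shifting weight from BM25 to dense changes both the order and the membership of the top results, which is the effect the optimizer is searching over.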

## Key Insights from Self-Improvement

From running the self-improvement loop, we can observe:

1. **Automatic Optimization**: Each iteration grid-searches the hybrid weights on the training queries
2. **Acceptance Policy**: A change is kept only if its expected improvement clears the policy threshold (0.01 here); otherwise it is reverted
3. **Synonym Mining**: Retriever disagreements are mined for synonym groups that feed back into retrieval
4. **Iterative Refinement**: Metrics are re-measured every iteration, so gains and regressions both show up in the history; on a corpus this small, improvement is not guaranteed
5. **Robustness**: A rejected optimization leaves the previous configuration in place
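
The accept-or-revert behavior described in points 2 and 5 can be sketched with a file snapshot: back up the affected files, apply the update, re-measure, and restore the backup if the gain is too small. This is an illustration under stated assumptions, not the internals of `AcceptancePolicy.safe_update`; `guarded_update` and the toy scoring function are hypothetical.

```python
import json
import shutil
import tempfile
from pathlib import Path
from typing import Callable, List


def guarded_update(
    update: Callable[[], None],
    evaluate: Callable[[], float],
    paths: List[Path],
    threshold: float = 0.01,
) -> bool:
    """Apply update(); restore the files in paths if the score gain is below threshold."""
    baseline = evaluate()
    with tempfile.TemporaryDirectory() as backup_dir:
        backups = {}
        for index, path in enumerate(paths):
            if path.exists():
                backup = Path(backup_dir) / f"{index}_{path.name}"
                shutil.copy2(path, backup)
                backups[path] = backup
        update()
        if evaluate() - baseline >= threshold:
            return True
        # Revert: restore backed-up files and delete files the update created.
        for path in paths:
            if path in backups:
                shutil.copy2(backups[path], path)
            elif path.exists():
                path.unlink()
        return False


with tempfile.TemporaryDirectory() as workdir:
    config_path = Path(workdir) / "hybrid_config.json"
    config_path.write_text(json.dumps({"bm25_weight": 0.5}))

    def read_weight() -> float:
        return json.loads(config_path.read_text())["bm25_weight"]

    def toy_score() -> float:
        # Hypothetical metric that peaks at bm25_weight = 0.3.
        return 1.0 - abs(read_weight() - 0.3)

    rejected = guarded_update(
        lambda: config_path.write_text(json.dumps({"bm25_weight": 0.9})),
        toy_score,
        [config_path],
    )
    weight_after_reject = read_weight()

    accepted = guarded_update(
        lambda: config_path.write_text(json.dumps({"bm25_weight": 0.3})),
        toy_score,
        [config_path],
    )
    weight_after_accept = read_weight()

print(rejected, weight_after_reject, accepted, weight_after_accept)
```

The first update lowers the toy score, so the file is restored to its previous contents; the second raises it and is kept.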

## Real-World Applications

This self-improvement approach can be applied to:
- **Production RAG Systems**: Continuous optimization in live environments
- **Research**: Automated hyperparameter tuning and architecture search
- **Quality Assurance**: Ensuring retrieval quality meets standards
- **A/B Testing**: Comparing different retrieval strategies automatically

## Next Steps

- Scale to larger datasets and more complex queries
- Add more optimization dimensions (beyond just weights)
- Implement more sophisticated acceptance policies
- Add human-in-the-loop validation
- Deploy in production with monitoring and alerting

This notebook demonstrates the core self-improvement capabilities of the `autorag-live` system, showing how RAG pipelines can automatically optimize themselves through iterative refinement and disagreement analysis.