# Phase 4.1 Experimental Lab: Retrieval Accuracy Stress Tests

**Goal:** Build retrieval system → corrupt embeddings with noise → measure recall degradation

**Break it:** Add adversarial perturbations → watch retrieval failures

**Visualize:** Precision-recall curves under corruption, error propagation heatmaps

**Optimize:** Robust embedding normalization, outlier detection filters

---

## Professor's Notes

St. Mark, this lab teaches you that retrieval systems are fragile. In production, embeddings get corrupted by:
- Numerical precision issues
- Memory corruption
- Adversarial attacks
- Data drift

Understanding how retrieval degrades under stress helps you build robust systems that don't fail silently.

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.datasets import make_blobs
import pandas as pd
from typing import List, Tuple, Dict
import warnings
warnings.filterwarnings('ignore')

# Set style for better plots
plt.style.use('seaborn-v0_8')
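sns.set_palette("husl")

# Seed NumPy's global RNG so the unseeded np.random calls below are reproducible
np.random.seed(42)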

print("🔬 Phase 4.1: Retrieval Accuracy Stress Tests")
print("=" * 50)

## 1. Build Clean Retrieval System

First, let's create a simple but effective retrieval system using embeddings.

In [None]:
class SimpleRetrievalSystem:
    """A simple embedding-based retrieval system"""
    
    def __init__(self, embedding_dim: int = 128):
        self.embedding_dim = embedding_dim
        self.documents = []
        self.embeddings = None
        
    def add_documents(self, docs: List[str], embeddings: np.ndarray):
        """Add documents with their embeddings"""
        self.documents = docs
        self.embeddings = embeddings.copy()
        print(f"✅ Added {len(docs)} documents with {embeddings.shape[1]}D embeddings")
        
    def retrieve(self, query_embedding: np.ndarray, k: int = 5) -> List[Tuple[int, float]]:
        """Retrieve top-k most similar documents"""
        if self.embeddings is None:
            return []
            
        # Compute similarities
        similarities = cosine_similarity(query_embedding.reshape(1, -1), self.embeddings)[0]
        
        # Get top-k indices and scores
        top_k_indices = np.argsort(similarities)[::-1][:k]
        top_k_scores = similarities[top_k_indices]
        
        return list(zip(top_k_indices, top_k_scores))
    
    def corrupt_embeddings(self, noise_level: float, corruption_type: str = 'gaussian') -> np.ndarray:
        """Corrupt embeddings with different types of noise"""
        corrupted = self.embeddings.copy()
        
        if corruption_type == 'gaussian':
            # Add Gaussian noise
            noise = np.random.normal(0, noise_level, corrupted.shape)
            corrupted += noise
            
        elif corruption_type == 'salt_pepper':
            # Salt and pepper noise (random spikes)
            mask = np.random.random(corrupted.shape) < noise_level
            corrupted[mask] = np.random.choice([-1, 1], size=mask.sum())
            
        elif corruption_type == 'adversarial':
            # Fixed-norm perturbation along a random unit direction per row
            # (a stand-in for adversarial noise; it is not targeted at any query)
            directions = np.random.normal(0, 1, corrupted.shape)
            directions = directions / np.linalg.norm(directions, axis=1, keepdims=True)
            corrupted += noise_level * directions
            
        # Renormalize to maintain unit length (important for cosine similarity)
        norms = np.linalg.norm(corrupted, axis=1, keepdims=True)
        corrupted = corrupted / norms
        
        return corrupted
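
Before comparing the three modes, it helps to notice that the same `noise_level` does not mean the same perturbation size. Gaussian noise with per-coordinate standard deviation σ in d dimensions has a norm of roughly σ√d, while the `'adversarial'` branch adds a vector whose norm is exactly `noise_level`. The sketch below checks this numerically; `perturbation_norms` is a hypothetical helper, not part of the lab code.

```python
import numpy as np

def perturbation_norms(noise_level: float, dim: int = 128, n: int = 1000, seed: int = 0):
    """Return the mean L2 norm of Gaussian vs. unit-direction perturbations."""
    rng = np.random.default_rng(seed)

    # Same construction as the 'gaussian' branch: i.i.d. noise per coordinate
    gaussian = rng.normal(0.0, noise_level, size=(n, dim))

    # Same construction as the 'adversarial' branch: random unit direction, then scale
    directions = rng.normal(0.0, 1.0, size=(n, dim))
    directions /= np.linalg.norm(directions, axis=1, keepdims=True)
    scaled = noise_level * directions

    gaussian_norm = float(np.linalg.norm(gaussian, axis=1).mean())
    scaled_norm = float(np.linalg.norm(scaled, axis=1).mean())
    return gaussian_norm, scaled_norm

g_norm, a_norm = perturbation_norms(0.1)
print(f"gaussian: {g_norm:.3f}, unit-direction: {a_norm:.3f}")
```

With 128 dimensions the Gaussian perturbation is about √128 ≈ 11 times larger at the same `noise_level`, so curves for the two modes are not on a common scale.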

In [None]:
# Create synthetic data for testing
def create_test_data(n_docs: int = 1000, n_queries: int = 100, embedding_dim: int = 128):
    """Create synthetic documents and queries with embeddings"""
    
    # Generate document embeddings (clustered)
    doc_centers = np.random.normal(0, 1, (10, embedding_dim))  # 10 clusters
    doc_embeddings, doc_labels = make_blobs(
        n_samples=n_docs, 
        centers=doc_centers,
        cluster_std=0.3,
        random_state=42
    )
    
    # Normalize embeddings
    doc_embeddings = doc_embeddings / np.linalg.norm(doc_embeddings, axis=1, keepdims=True)
    
    # Create document texts
    documents = [f"Document {i} in cluster {label}" for i, label in enumerate(doc_labels)]
    
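    # Generate query embeddings (some matching docs, some outliers).
    # Note: query labels 8 and 9 come from random centers, so they do not
    # correspond to document clusters 8 and 9 even though the labels coincide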
    query_centers = np.vstack([doc_centers[:8], np.random.normal(0, 2, (2, embedding_dim))])
    query_embeddings, query_labels = make_blobs(
        n_samples=n_queries,
        centers=query_centers,
        cluster_std=0.2,
        random_state=43
    )
    
    # Normalize query embeddings
    query_embeddings = query_embeddings / np.linalg.norm(query_embeddings, axis=1, keepdims=True)
    
    return documents, doc_embeddings, doc_labels, query_embeddings, query_labels

# Create test data
documents, doc_embeddings, doc_labels, query_embeddings, query_labels = create_test_data()

print(f"📄 Created {len(documents)} documents")
print(f"🔍 Created {len(query_embeddings)} queries")
print(f"📊 Document clusters: {len(np.unique(doc_labels))}")
print(f"🎯 Query clusters: {len(np.unique(query_labels))}")

In [None]:
# Initialize retrieval system
retrieval_system = SimpleRetrievalSystem()
retrieval_system.add_documents(documents, doc_embeddings)

# Test clean retrieval
test_query = query_embeddings[0:1]  # First query
results = retrieval_system.retrieve(test_query, k=5)

print("\n🔍 Clean Retrieval Results:")
print("Query cluster:", query_labels[0])
for rank, (doc_idx, score) in enumerate(results, 1):
    print(f"{rank}. Doc {doc_idx} (Cluster {doc_labels[doc_idx]}): {score:.4f}")

## 2. Break It: Add Corruption

Now let's systematically corrupt the embeddings and see how retrieval performance degrades.

In [None]:
def evaluate_retrieval_performance(
    system: SimpleRetrievalSystem, 
    queries: np.ndarray, 
    query_labels: np.ndarray,
    doc_labels: np.ndarray,
    k: int = 5
) -> Dict[str, float]:
    """Evaluate retrieval performance metrics"""
    
    precisions = []
    recalls = []
    
    for query_emb, query_label in zip(queries, query_labels):
        results = system.retrieve(query_emb.reshape(1, -1), k=k)
        retrieved_labels = [doc_labels[doc_idx] for doc_idx, _ in results]
        
        # Calculate precision@K (fraction of retrieved docs in same cluster)
        relevant_retrieved = sum(1 for label in retrieved_labels if label == query_label)
        precision = relevant_retrieved / k
        precisions.append(precision)
        
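        # Calculate recall@K (fraction of relevant docs retrieved)
        # Note: the denominator is capped at k, so whenever a cluster holds at
        # least k documents this value equals precision@K
        total_relevant = sum(1 for label in doc_labels if label == query_label)
        recall = relevant_retrieved / min(total_relevant, k) if total_relevant > 0 else 0.0
        recalls.append(recall)
    
    mean_precision = np.mean(precisions)
    mean_recall = np.mean(recalls)
    denominator = mean_precision + mean_recall
    
    return {
        'precision@k': mean_precision,
        'recall@k': mean_recall,
        # Guard against 0/0 when nothing relevant is retrieved
        'f1@k': 2 * mean_precision * mean_recall / denominator if denominator > 0 else 0.0
    }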

# Test clean performance
clean_performance = evaluate_retrieval_performance(
    retrieval_system, query_embeddings, query_labels, doc_labels, k=5
)

print("\n✨ Clean System Performance:")
for metric, value in clean_performance.items():
    print(f"{metric}: {value:.4f}")
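
The recall above divides by `min(total_relevant, k)`, so with 100 documents per cluster and k=5 it is numerically the same as precision. For comparison, here is a sketch of the conventional recall@k, which divides by all relevant documents. `precision_recall_at_k` and the toy labels are hypothetical and only illustrate the arithmetic.

```python
import numpy as np

def precision_recall_at_k(retrieved_labels, query_label, doc_labels, k):
    """Precision@k and conventional recall@k for a single query."""
    retrieved = np.asarray(retrieved_labels)[:k]
    hits = int(np.sum(retrieved == query_label))

    # Conventional recall uses every relevant document in the corpus
    total_relevant = int(np.sum(np.asarray(doc_labels) == query_label))

    precision = hits / k
    recall = hits / total_relevant if total_relevant > 0 else 0.0
    return precision, recall

# Toy corpus: 10 clusters with 100 documents each (assumed for illustration)
toy_doc_labels = np.repeat(np.arange(10), 100)
p, r = precision_recall_at_k([3, 3, 3, 7, 3], query_label=3,
                             doc_labels=toy_doc_labels, k=5)
print(f"precision@5={p:.2f}, recall@5={r:.2f}")
```

With large clusters, conventional recall@5 can never exceed 5/100, which is why it is usually reported at larger k or alongside a full precision-recall curve.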

In [None]:
# Test different corruption levels and types
corruption_levels = [0.0, 0.01, 0.05, 0.1, 0.2, 0.5]
corruption_types = ['gaussian', 'salt_pepper', 'adversarial']

corruption_results = []

for corruption_type in corruption_types:
    print(f"\n🧪 Testing {corruption_type.upper()} corruption:")
    
    for noise_level in corruption_levels:
        # Create corrupted system
        corrupted_embeddings = retrieval_system.corrupt_embeddings(noise_level, corruption_type)
        corrupted_system = SimpleRetrievalSystem()
        corrupted_system.add_documents(documents, corrupted_embeddings)
        
        # Evaluate performance
        performance = evaluate_retrieval_performance(
            corrupted_system, query_embeddings, query_labels, doc_labels, k=5
        )
        
        result = {
            'corruption_type': corruption_type,
            'noise_level': noise_level,
            **performance
        }
        corruption_results.append(result)
        
        print(f"  Noise {noise_level:.3f}: P@5={performance['precision@k']:.4f}, R@5={performance['recall@k']:.4f}")

# Convert to DataFrame for analysis
results_df = pd.DataFrame(corruption_results)
print("\n📊 Collected", len(results_df), "experimental results")
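
Each noise level above is evaluated on a single random draw, so small differences between runs may just be sampling noise. One option is to repeat every setting over several seeds and report the mean and standard deviation. The sketch below is self-contained and uses toy data; `precision_at_k` and `repeated_trials` are hypothetical helpers, not part of the lab classes.

```python
import numpy as np

def precision_at_k(doc_emb, doc_labels, query_emb, query_labels, k=5):
    """Mean precision@k under cosine similarity (rows are L2-normalized here)."""
    docs = doc_emb / np.linalg.norm(doc_emb, axis=1, keepdims=True)
    queries = query_emb / np.linalg.norm(query_emb, axis=1, keepdims=True)
    sims = queries @ docs.T                      # shape: (n_queries, n_docs)
    top_k = np.argsort(-sims, axis=1)[:, :k]     # indices of the k best docs per query
    hits = doc_labels[top_k] == query_labels[:, None]
    return float(hits.mean())

def repeated_trials(doc_emb, doc_labels, query_emb, query_labels,
                    noise_level, n_trials=5, k=5):
    """Mean and std of precision@k over independently seeded Gaussian corruptions."""
    scores = []
    for seed in range(n_trials):
        rng = np.random.default_rng(seed)
        noisy = doc_emb + rng.normal(0.0, noise_level, size=doc_emb.shape)
        scores.append(precision_at_k(noisy, doc_labels, query_emb, query_labels, k))
    return float(np.mean(scores)), float(np.std(scores))

# Toy data (assumed for illustration): 4 clusters, 50 documents each, 32 dimensions
rng = np.random.default_rng(0)
toy_centers = rng.normal(0.0, 1.0, size=(4, 32))
toy_doc_labels = np.repeat(np.arange(4), 50)
toy_docs = toy_centers[toy_doc_labels] + rng.normal(0.0, 0.3, size=(200, 32))
toy_query_labels = np.arange(4)
toy_queries = toy_centers.copy()

trial_summary = {
    level: repeated_trials(toy_docs, toy_doc_labels, toy_queries, toy_query_labels, level)
    for level in [0.0, 0.5, 2.0]
}
for level, (mean_p, std_p) in trial_summary.items():
    print(f"noise={level:.1f}: P@5 = {mean_p:.3f} +/- {std_p:.3f}")
```

The same loop could wrap `corrupt_embeddings` and `evaluate_retrieval_performance` in the lab, provided each trial reseeds the global RNG.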

## 3. Visualize: Precision-Recall Curves Under Corruption

Let's create comprehensive visualizations to understand how retrieval degrades.

In [None]:
# Create comprehensive visualization
fig, axes = plt.subplots(2, 2, figsize=(15, 12))
fig.suptitle('Phase 4.1: Retrieval Accuracy Stress Tests', fontsize=16, fontweight='bold')

# Plot 1: Precision vs Noise Level
ax1 = axes[0, 0]
for corruption_type in corruption_types:
    subset = results_df[results_df['corruption_type'] == corruption_type]
    ax1.plot(subset['noise_level'], subset['precision@k'], 
             marker='o', linewidth=2, label=corruption_type.title())

ax1.axhline(y=clean_performance['precision@k'], color='red', linestyle='--', alpha=0.7,
            label=f'Clean Baseline ({clean_performance["precision@k"]:.4f})')
ax1.set_xlabel('Noise Level')
ax1.set_ylabel('Precision@5')
ax1.set_title('Retrieval Precision Degradation')
ax1.legend()
ax1.grid(True, alpha=0.3)

# Plot 2: Recall vs Noise Level
ax2 = axes[0, 1]
for corruption_type in corruption_types:
    subset = results_df[results_df['corruption_type'] == corruption_type]
    ax2.plot(subset['noise_level'], subset['recall@k'], 
             marker='s', linewidth=2, label=corruption_type.title())

ax2.axhline(y=clean_performance['recall@k'], color='red', linestyle='--', alpha=0.7,
            label=f'Clean Baseline ({clean_performance["recall@k"]:.4f})')
ax2.set_xlabel('Noise Level')
ax2.set_ylabel('Recall@5')
ax2.set_title('Retrieval Recall Degradation')
ax2.legend()
ax2.grid(True, alpha=0.3)

# Plot 3: F1 Score Degradation
ax3 = axes[1, 0]
for corruption_type in corruption_types:
    subset = results_df[results_df['corruption_type'] == corruption_type]
    ax3.plot(subset['noise_level'], subset['f1@k'], 
             marker='^', linewidth=2, label=corruption_type.title())

ax3.axhline(y=clean_performance['f1@k'], color='red', linestyle='--', alpha=0.7,
            label=f'Clean Baseline ({clean_performance["f1@k"]:.4f})')
ax3.set_xlabel('Noise Level')
ax3.set_ylabel('F1@5')
ax3.set_title('Overall Retrieval Performance (F1)')
ax3.legend()
ax3.grid(True, alpha=0.3)

# Plot 4: Performance Drop Comparison
ax4 = axes[1, 1]
corruption_at_01 = results_df[results_df['noise_level'] == 0.1]
performance_drop = []

for _, row in corruption_at_01.iterrows():
    drop = clean_performance['f1@k'] - row['f1@k']
    performance_drop.append({
        'type': row['corruption_type'],
        'drop': drop,
        'percentage': (drop / clean_performance['f1@k']) * 100
    })

drop_df = pd.DataFrame(performance_drop)
bars = ax4.bar(drop_df['type'], drop_df['percentage'], 
               color=['skyblue', 'lightcoral', 'lightgreen'])

ax4.set_xlabel('Corruption Type')
ax4.set_ylabel('Performance Drop (%)')
ax4.set_title('F1 Score Drop at Noise Level 0.1')
ax4.grid(True, alpha=0.3, axis='y')

# Add value labels on bars
for bar, pct in zip(bars, drop_df['percentage']):
    height = bar.get_height()
    ax4.text(bar.get_x() + bar.get_width()/2., height + 0.5,
             f'{pct:.1f}%', ha='center', va='bottom')

plt.tight_layout()
plt.show()
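
The panels above plot each metric against noise level. To draw an actual precision-recall curve for one query, precision and recall can be computed at every cutoff k of a ranked list. A minimal sketch, assuming relevance is binary (same cluster or not); `pr_curve_points` is a hypothetical helper.

```python
import numpy as np

def pr_curve_points(ranked_relevance, total_relevant):
    """Precision and recall at every cutoff k = 1..n of a ranked result list.

    ranked_relevance: 1/0 flags in rank order (1 = relevant).
    total_relevant:   number of relevant documents in the whole corpus.
    """
    rel = np.asarray(ranked_relevance, dtype=float)
    hits = np.cumsum(rel)                    # relevant docs seen up to each rank
    ranks = np.arange(1, len(rel) + 1)
    precision = hits / ranks
    recall = hits / total_relevant
    return precision, recall

# Toy ranking: relevant, miss, relevant, relevant, miss; 4 relevant docs overall
toy_precision, toy_recall = pr_curve_points([1, 0, 1, 1, 0], total_relevant=4)
for k, (p_k, r_k) in enumerate(zip(toy_precision, toy_recall), start=1):
    print(f"k={k}: precision={p_k:.3f}, recall={r_k:.3f}")
```

Plotting `toy_recall` on the x-axis against `toy_precision` on the y-axis, once per corruption level, gives the curves named in the section title.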

In [None]:
# Create error propagation heatmap
def create_error_propagation_heatmap():
    """Visualize how errors propagate through the retrieval system"""
    
    # Test with a specific query and show how corruption affects ranking
    test_query_idx = 0
    test_query_emb = query_embeddings[test_query_idx:test_query_idx+1]
    true_cluster = query_labels[test_query_idx]
    
    # Get rankings for different corruption levels
    ranking_changes = []
    
    for noise_level in [0.0, 0.05, 0.1, 0.2]:
        corrupted_emb = retrieval_system.corrupt_embeddings(noise_level, 'gaussian')
        corrupted_system = SimpleRetrievalSystem()
        corrupted_system.add_documents(documents, corrupted_emb)
        
        results = corrupted_system.retrieve(test_query_emb, k=10)
        rankings = [(doc_idx, score, doc_labels[doc_idx] == true_cluster) for doc_idx, score in results]
        ranking_changes.append((noise_level, rankings))
    
    # Create heatmap data
    n_docs = 10  # Matches the k=10 requested from retrieve() above
    heatmap_data = np.zeros((len(ranking_changes), n_docs))
    
    for i, (noise_level, rankings) in enumerate(ranking_changes):
        for j, (doc_idx, score, is_relevant) in enumerate(rankings[:n_docs]):
            heatmap_data[i, j] = 1 if is_relevant else 0
    
    # Plot heatmap
    plt.figure(figsize=(12, 8))
    
    sns.heatmap(heatmap_data, 
                xticklabels=[f'Rank {i+1}' for i in range(n_docs)],
                yticklabels=[f'Noise {level:.3f}' for level, _ in ranking_changes],
                cmap='RdYlGn', cbar_kws={'label': 'Relevant to Query'})
    
    plt.title('Error Propagation: How Corruption Affects Document Ranking')
    plt.xlabel('Retrieved Document Position')
    plt.ylabel('Corruption Level')
    plt.tight_layout()
    plt.show()
    
    return ranking_changes

print("\n🔥 Error Propagation Analysis:")
ranking_analysis = create_error_propagation_heatmap()
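
The heatmap shows whether each position is still relevant, but not whether the same documents are being returned. One simple way to quantify that is the Jaccard overlap between the clean top-k set and each corrupted top-k set. The sketch below assumes the `(noise_level, rankings)` structure returned by `create_error_propagation_heatmap`, with the first entry treated as the clean reference; both helper names are hypothetical.

```python
def topk_jaccard(clean_ids, corrupted_ids):
    """Jaccard overlap between two sets of retrieved document indices."""
    a, b = set(clean_ids), set(corrupted_ids)
    union = a | b
    return len(a & b) / len(union) if union else 1.0

def overlap_vs_clean(ranking_changes):
    """Map each noise level to its top-k overlap with the first (clean) entry.

    ranking_changes: list of (noise_level, [(doc_idx, score, is_relevant), ...]).
    """
    clean_ids = [doc_idx for doc_idx, _, _ in ranking_changes[0][1]]
    return {
        noise_level: topk_jaccard(clean_ids, [doc_idx for doc_idx, _, _ in rankings])
        for noise_level, rankings in ranking_changes
    }

# Toy rankings (assumed for illustration)
toy_changes = [
    (0.0, [(1, 0.9, True), (2, 0.8, True), (3, 0.7, True)]),
    (0.1, [(2, 0.8, True), (3, 0.7, True), (4, 0.6, False)]),
]
overlaps = overlap_vs_clean(toy_changes)
print(overlaps)
```

In the lab this could be called as `overlap_vs_clean(ranking_analysis)`. An overlap well below 1.0 with unchanged precision would mean the system returns different documents from the same cluster.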

## 4. Optimize: Robust Retrieval Techniques

Now let's implement and test optimization techniques to make retrieval more robust.

In [None]:
class RobustRetrievalSystem(SimpleRetrievalSystem):
    """Enhanced retrieval system with robustness optimizations"""
    
    def __init__(self, embedding_dim: int = 128, use_normalization: bool = True, 
                 use_outlier_filter: bool = False, outlier_threshold: float = 2.0):
        super().__init__(embedding_dim)
        self.use_normalization = use_normalization
        self.use_outlier_filter = use_outlier_filter
        self.outlier_threshold = outlier_threshold
        
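    def add_documents(self, docs: List[str], embeddings: np.ndarray):
        """Add documents with robustness preprocessing"""
        super().add_documents(docs, embeddings)
        
        # Map each stored row back to its original document index so that
        # external arrays such as doc_labels stay aligned after filtering
        self.kept_indices = np.arange(len(docs))
        
        if self.use_normalization:
            # Apply robust normalization
            norms = np.linalg.norm(self.embeddings, axis=1, keepdims=True)
            # Avoid division by very small numbers
            norms = np.maximum(norms, 1e-8)
            self.embeddings = self.embeddings / norms
            
        if self.use_outlier_filter:
            # Remove outlier embeddings
            centroid = np.mean(self.embeddings, axis=0)
            distances = np.linalg.norm(self.embeddings - centroid, axis=1)
            threshold = np.mean(distances) + self.outlier_threshold * np.std(distances)
            keep_mask = distances <= threshold
            
            self.documents = [doc for doc, keep in zip(self.documents, keep_mask) if keep]
            self.embeddings = self.embeddings[keep_mask]
            self.kept_indices = self.kept_indices[keep_mask]
            
            print(f"🧹 Filtered {len(keep_mask) - keep_mask.sum()} outlier documents")
    
    def retrieve(self, query_embedding: np.ndarray, k: int = 5) -> List[Tuple[int, float]]:
        """Retrieve top-k documents, reported by original document index"""
        results = super().retrieve(query_embedding, k)
        # Without this mapping, filtered systems would return row positions that
        # no longer match doc_labels in evaluate_retrieval_performance
        return [(self.kept_indices[idx], score) for idx, score in results]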
    
    def retrieve_robust(self, query_embedding: np.ndarray, k: int = 5) -> List[Tuple[int, float]]:
        """Robust retrieval with confidence scoring"""
        if self.embeddings is None:
            return []
            
        # Get more candidates than needed
        candidates = self.retrieve(query_embedding, k=k*3)
        
        if len(candidates) == 0:
            return []
        
        # Apply confidence filtering
        scores = np.array([score for _, score in candidates])
        
        # Filter out low-confidence results
        mean_score = np.mean(scores)
        std_score = np.std(scores)
        confidence_threshold = mean_score - 0.5 * std_score
        
        filtered_candidates = [
            (idx, score) for (idx, score) in candidates 
            if score >= confidence_threshold
        ]
        
        return filtered_candidates[:k]
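
The outlier filter in `add_documents` uses the mean and standard deviation of distances, and both are pulled toward the outliers they are meant to detect. A common alternative is the modified z-score built from the median and the median absolute deviation (MAD). This is a sketch of that swapped-in technique, not the lab's method; `mad_outlier_mask` and the 3.5 cutoff are assumptions for illustration.

```python
import numpy as np

def mad_outlier_mask(embeddings, threshold=3.5):
    """Boolean keep-mask using a median/MAD modified z-score on centroid distance."""
    emb = np.asarray(embeddings, dtype=float)

    # Coordinate-wise median is less sensitive to extreme rows than the mean
    centroid = np.median(emb, axis=0)
    distances = np.linalg.norm(emb - centroid, axis=1)

    med = np.median(distances)
    mad = np.median(np.abs(distances - med))
    if mad == 0:
        # All distances identical: nothing can be flagged
        return np.ones(len(emb), dtype=bool)

    # 0.6745 rescales MAD so the score is comparable to a standard z-score
    # for normally distributed distances
    robust_z = 0.6745 * (distances - med) / mad
    return robust_z <= threshold

# Toy data (assumed): 200 ordinary rows plus 3 extreme rows
rng = np.random.default_rng(0)
toy_inliers = rng.normal(0.0, 1.0, size=(200, 16))
toy_outliers = np.full((3, 16), 50.0)
keep = mad_outlier_mask(np.vstack([toy_inliers, toy_outliers]))
print(f"kept {int(keep.sum())} of {len(keep)} rows")
```

Only rows unusually far from the centroid are dropped; rows unusually close to it are kept, matching the one-sided test in the original filter.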

In [None]:
# Test robust retrieval systems
robust_configs = [
    {'use_normalization': True, 'use_outlier_filter': False, 'name': 'Normalized Only'},
    {'use_normalization': True, 'use_outlier_filter': True, 'name': 'Normalized + Outlier Filter'},
    {'use_normalization': False, 'use_outlier_filter': False, 'name': 'Baseline'}
]

robustness_results = []

for config in robust_configs:
    print(f"\n🛡️ Testing {config['name']}:")
    
    # Test on corrupted data
    for noise_level in [0.0, 0.1, 0.2]:
        # Create corrupted embeddings
        corrupted_emb = retrieval_system.corrupt_embeddings(noise_level, 'gaussian')
        
        # Create robust system
        robust_system = RobustRetrievalSystem(
            use_normalization=config['use_normalization'],
            use_outlier_filter=config['use_outlier_filter']
        )
        robust_system.add_documents(documents, corrupted_emb)
        
        # Evaluate performance
        performance = evaluate_retrieval_performance(
            robust_system, query_embeddings, query_labels, doc_labels, k=5
        )
        
        result = {
            'config': config['name'],
            'noise_level': noise_level,
            **performance
        }
        robustness_results.append(result)
        
        print(f"  Noise {noise_level:.1f}: F1={performance['f1@k']:.4f}")

# Compare robustness improvements
robust_df = pd.DataFrame(robustness_results)

# Calculate improvement over baseline
baseline_results = robust_df[robust_df['config'] == 'Baseline'].set_index('noise_level')
improvement_data = []

for config_name in ['Normalized Only', 'Normalized + Outlier Filter']:
    config_results = robust_df[robust_df['config'] == config_name].set_index('noise_level')
    for noise_level in [0.1, 0.2]:
        baseline_f1 = baseline_results.loc[noise_level, 'f1@k']
        config_f1 = config_results.loc[noise_level, 'f1@k']
        improvement = config_f1 - baseline_f1
        improvement_pct = (improvement / baseline_f1) * 100
        
        improvement_data.append({
            'config': config_name,
            'noise_level': noise_level,
            'improvement': improvement,
            'improvement_pct': improvement_pct
        })

improvement_df = pd.DataFrame(improvement_data)
print("\n📈 Robustness Improvements:")
print(improvement_df.to_string(index=False))

In [None]:
# Final visualization: Robustness comparison
plt.figure(figsize=(12, 8))

# Plot F1 scores for different configurations
for config_name in robust_df['config'].unique():
    subset = robust_df[robust_df['config'] == config_name]
    plt.plot(subset['noise_level'], subset['f1@k'], 
             marker='o', linewidth=3, label=config_name, markersize=8)

plt.xlabel('Noise Level', fontsize=12)
plt.ylabel('F1@5 Score', fontsize=12)
plt.title('Retrieval Robustness: Configuration Comparison', fontsize=14, fontweight='bold')
plt.legend(fontsize=11)
plt.grid(True, alpha=0.3)

# Add annotations for key improvements
for _, row in improvement_df.iterrows():
    if row['noise_level'] == 0.2:
        plt.annotate(f"{row['improvement_pct']:+.1f}%", 
                    xy=(row['noise_level'], 
                        robust_df[(robust_df['config'] == row['config']) & 
                                 (robust_df['noise_level'] == row['noise_level'])]['f1@k'].values[0]),
                    xytext=(5, 5), textcoords='offset points', fontsize=10,
                    bbox=dict(boxstyle='round,pad=0.3', facecolor='yellow', alpha=0.8))

plt.tight_layout()
plt.show()

## Key Takeaways

### What We Learned:
1. **Retrieval systems are fragile** - Small amounts of noise can cause significant performance degradation
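2. **Different corruption types matter** - The three modes are not on a common scale: `noise_level` is a per-coordinate standard deviation for Gaussian noise, a fraction of overwritten entries for salt-and-pepper, and a total perturbation norm for the "adversarial" mode, which here uses random rather than targeted directions
3. **Robustness techniques need verification** - Cosine similarity already ignores vector length, so re-normalization alone should not change rankings in this lab; treat outlier filtering and confidence thresholds as helpful only where the measured F1 improves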
4. **Error propagation is real** - Corruption doesn't just reduce scores, it changes ranking order

### Production Implications:
- **Monitor embedding quality** - Implement checks for embedding corruption
- **Use robust similarity measures** - Consider alternatives to pure cosine similarity
- **Implement fallback strategies** - Have backup retrieval methods for when embeddings fail
- **Regular retraining** - Embeddings drift over time and need periodic refreshing
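
As a starting point for the "monitor embedding quality" item, here is a sketch of a basic health check that counts rows with NaN/Inf values, zero vectors, and rows whose norm has drifted away from 1. The function name, the report keys, and the tolerance are assumptions for illustration, and the norm check only makes sense if embeddings are expected to be unit length.

```python
import numpy as np

def embedding_health_report(embeddings, norm_tolerance=1e-3):
    """Count rows that are non-finite, all-zero, or not unit length."""
    arr = np.asarray(embeddings, dtype=float)
    finite_rows = np.isfinite(arr).all(axis=1)

    # Replace non-finite entries with 0 so norms can be computed for every row
    norms = np.linalg.norm(np.where(np.isfinite(arr), arr, 0.0), axis=1)

    zero_rows = finite_rows & (norms == 0)
    off_unit_rows = finite_rows & ~zero_rows & (np.abs(norms - 1.0) > norm_tolerance)

    return {
        'n_rows': int(arr.shape[0]),
        'non_finite_rows': int((~finite_rows).sum()),
        'zero_norm_rows': int(zero_rows.sum()),
        'off_unit_norm_rows': int(off_unit_rows.sum()),
    }

# Toy matrix (assumed): one healthy row, one NaN row, one zero row, one unnormalized row
toy_embeddings = np.array([
    [1.0, 0.0, 0.0],
    [np.nan, 0.0, 1.0],
    [0.0, 0.0, 0.0],
    [0.0, 2.0, 0.0],
])
report = embedding_health_report(toy_embeddings)
print(report)
```

A check like this catches hard failures only. Gradual drift would need a comparison against a reference distribution, which is outside this sketch.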

### Next Steps:
- Experiment with different embedding models (sentence transformers, etc.)
- Test on real datasets with actual semantic similarity
- Implement production monitoring for embedding health

---

**Professor's Challenge:** Can you implement a retrieval system that maintains 90% of its clean performance even with 20% noise? What techniques would you use?