In [7]:
from kvpress import AdapPress

In [27]:
# Ratio handed to the press constructor; it is also embedded in the heatmap
# file name written by create_heatmap_visualization.
# NOTE(review): presumably the fraction of the KV cache that is pruned --
# confirm against the kvpress documentation.
compression_ratio = 0.3
# Module-level press object. run_needle_test reads this global and wraps
# generation in `press(model)`, so it must be defined before the tests run.
press = AdapPress(compression_ratio)

In [17]:
from kvpress import SnapKVPress
from kvpress import KnormPress
# Alternative configuration: same ratio, but using KnormPress as the global
# `press` consumed by run_needle_test and create_heatmap_visualization.
# NOTE(review): the cell execution counters suggest this cell ran before the
# AdapPress cell, so the benchmark output shown below would have used
# AdapPress -- confirm before comparing results.
compression_ratio = 0.3
press = KnormPress(compression_ratio)

In [23]:
# Show which press class is currently bound to the global `press`.
print(type(press).__name__)

AdapPress


In [28]:
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer
import matplotlib.pyplot as plt
import numpy as np
import random
import string
from tqdm import tqdm
import pandas as pd
import seaborn as sns
import time
import os
from datetime import datetime
from matplotlib.colors import LinearSegmentedColormap

# Seed the Python, NumPy and PyTorch random number generators with a fixed
# value. generate_haystack draws its filler text (and, when no depth is given,
# the needle position) from Python's `random`, so seeding it makes the
# generated haystacks repeatable from a fresh run.
random.seed(42)
np.random.seed(42)
torch.manual_seed(42)

def generate_haystack(length, needle_text, needle_position_ratio=None):
    """
    Generate a haystack of random characters with a needle inserted into it.

    The filler is drawn from ASCII letters, digits, spaces (weighted x5) and
    basic punctuation using Python's global ``random`` state, so results are
    repeatable after ``random.seed(...)``.

    Args:
        length: Target length of the returned text in characters.
        needle_text: The needle text to be inserted.
        needle_position_ratio: Position of the needle as a ratio (0.0-1.0) of
            the room left for filler. Values outside that range are clamped.
            If None, a random ratio in [0.2, 0.8] is chosen.

    Returns:
        haystack_text: The combined text. It is exactly ``length`` characters
            long whenever the needle fits; if the needle is longer than
            ``length`` the needle alone is returned.
        needle_position: The character index where the needle starts.
    """
    alphabet = string.ascii_letters + string.digits + ' ' * 5 + ',.!?'

    # Draw `length` characters (as before) so that seeded runs keep producing
    # the same haystacks; only the first `filler_length` of them are used.
    haystack_chars = ''.join(random.choices(alphabet, k=max(length, 0)))

    if needle_position_ratio is None:
        needle_position_ratio = random.uniform(0.2, 0.8)  # Avoid extreme ends
    else:
        # Out-of-range ratios would push the needle outside the filler and
        # make the text longer than requested.
        needle_position_ratio = min(max(needle_position_ratio, 0.0), 1.0)

    # Room left for filler once the needle is accounted for. Clamping at zero
    # prevents a negative insertion index when the needle is longer than
    # `length` (which used to place the needle a few characters before the end of the filler).
    filler_length = max(length - len(needle_text), 0)
    needle_position = int(needle_position_ratio * filler_length)

    haystack_text = (
        haystack_chars[:needle_position]
        + needle_text
        + haystack_chars[needle_position:filler_length]
    )

    return haystack_text, needle_position

def run_needle_test(model, tokenizer, haystack_text, question, needle_value, max_new_tokens=50):
    """
    Run a single needle-in-a-haystack query against a causal language model.

    Generation runs greedily (``do_sample=False``) inside the module-level
    ``press`` context (``press(model)``), so a global named ``press`` must be
    defined before this function is called.

    Args:
        model: The transformer model
        tokenizer: The corresponding tokenizer
        haystack_text: The haystack text containing the needle
        question: The question to ask to retrieve the needle
        needle_value: The expected answer
        max_new_tokens: Maximum tokens to generate

    Returns:
        dict with keys:
            response: Decoded continuation (prompt tokens stripped), or
                "ERROR: ..." if inference raised.
            success: True if ``needle_value`` occurs in the response
                (case-insensitive) or the similarity exceeds 0.8.
            similarity: 1.0 on a substring hit, otherwise the
                ``SequenceMatcher`` ratio between response and needle
                (0 on error).
            token_count: Number of prompt tokens.
            inference_time: Seconds spent in generation only (or until the
                failure, on error).
    """
    from difflib import SequenceMatcher

    prompt = f"{haystack_text}\n\nQuestion: {question}\nAnswer:"

    # Tokenize and get token count
    tokens = tokenizer(prompt, return_tensors="pt")
    token_count = tokens.input_ids.shape[1]

    # Place the inputs on the device the model actually lives on. Checking
    # only for CUDA availability breaks when the model was left on the CPU
    # of a CUDA-capable machine.
    device = getattr(model, "device", None)
    if device is None:
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    input_ids = tokens["input_ids"].to(device)
    prompt_length = input_ids.shape[1]

    start_time = time.time()

    try:
        with torch.no_grad(), press(model):
            outputs = model.generate(
                input_ids,
                max_new_tokens=max_new_tokens,
                num_return_sequences=1,
                do_sample=False
            )

        # Stop the clock here so decoding and scoring are not billed as
        # inference time.
        inference_time = time.time() - start_time

        # Keep only the newly generated tokens.
        response = tokenizer.decode(outputs[0][prompt_length:], skip_special_tokens=True)

        # Evaluate success: exact (case-insensitive) containment first,
        # then fall back to character-level similarity.
        answer = response.lower()
        expected = needle_value.lower()
        if expected in answer:
            success = True
            similarity = 1.0
        else:
            similarity = SequenceMatcher(None, answer, expected).ratio()
            success = similarity > 0.8  # Adjust threshold as needed

        return {
            "response": response,
            "success": success,
            "similarity": similarity,
            "token_count": token_count,
            "inference_time": inference_time
        }

    except Exception as e:
        # Deliberately broad: one failing configuration (e.g. out of memory
        # on a long context) is recorded as a failed row instead of aborting
        # the whole sweep.
        print(f"Error during inference: {str(e)}")
        return {
            "response": f"ERROR: {str(e)}",
            "success": False,
            "similarity": 0,
            "token_count": token_count,
            "inference_time": time.time() - start_time
        }

def test_model_across_depths_and_lengths(model_name, context_lengths, depth_positions, test_repeats=2):
    """
    Test a model across different context lengths and document depths.

    For every (length, depth, repeat) combination a haystack containing the
    fixed needle "The secret code is XYZ42." is generated and queried through
    run_needle_test. Results are written to CSV files in the current working
    directory: an intermediate file every 5 rows and a final file at the end.

    Args:
        model_name: Name of the model to test
        context_lengths: List of context lengths to test (in characters)
        depth_positions: List of depth positions to test (as ratios 0.0-1.0)
        test_repeats: Number of times to repeat each test for reliability

    Returns:
        results_df: DataFrame containing all test results, one row per test.
            Every row (including skipped ones) has the same columns. An empty
            DataFrame is returned if the model could not be loaded.
    """
    print(f"Testing model: {model_name}")

    # Load model and tokenizer
    try:
        tokenizer = AutoTokenizer.from_pretrained(model_name)
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        model = AutoModelForCausalLM.from_pretrained(model_name).to(device)
        print(f"Model loaded on {device}")
    except Exception as e:
        print(f"Error loading model {model_name}: {str(e)}")
        return pd.DataFrame()  # Return empty DataFrame on error

    results = []

    # Define a fixed needle value for consistent testing
    secret_value = "XYZ42"
    needle_text = f"The secret code is {secret_value}."
    question = "What is the secret code?"

    char_to_token_ratio = 4  # Approximate ratio of characters to tokens
    file_stem = model_name.replace('/', '_')
    intermediate_path = f"{file_stem}_intermediate_results.csv"

    # Loop through all test configurations
    for length in tqdm(context_lengths, desc=f"Testing {model_name}"):
        # Both values depend only on the context length, so compute them once
        # per length instead of once per test.
        token_length_estimate = length // char_to_token_ratio
        # Skip if estimated tokens exceed model's known limits
        skip = token_length_estimate > 8192 and "gpt2" in model_name  # Example limit for GPT-2

        for depth in depth_positions:
            # round() rather than int(): depth * 100 is a float product and
            # truncation turns e.g. 0.29 into 28 instead of 29.
            depth_percent = int(round(depth * 100))

            for repeat in range(test_repeats):
                # Start from the "skipped" row so that every row carries the
                # same set of columns (skipped rows used to lack "score" and
                # "context_length_tokens_actual").
                result = {
                    "model": model_name,
                    "context_length_chars": length,
                    "context_length_tokens_est": token_length_estimate,
                    "context_length_tokens_actual": None,
                    "depth_position": depth,
                    "depth_percent": depth_percent,
                    "repeat": repeat,
                    "success": False,
                    "similarity": 0,
                    "score": 0,
                    "response": "SKIPPED - Context too long for model",
                    "inference_time": 0
                }

                if not skip:
                    # Only build the haystack when it is actually going to be used.
                    haystack_text, _ = generate_haystack(length, needle_text, depth)

                    test_result = run_needle_test(
                        model=model,
                        tokenizer=tokenizer,
                        haystack_text=haystack_text,
                        question=question,
                        needle_value=secret_value
                    )

                    result.update({
                        "context_length_tokens_actual": test_result["token_count"],
                        "success": test_result["success"],
                        "similarity": test_result["similarity"],
                        # Similarity expressed as a whole-number percentage.
                        "score": int(round(test_result["similarity"] * 100)),
                        "response": test_result["response"],
                        "inference_time": test_result["inference_time"]
                    })

                results.append(result)

                # Save intermediate results to avoid losing progress
                if len(results) % 5 == 0:
                    pd.DataFrame(results).to_csv(intermediate_path, index=False)

    # Convert to DataFrame
    results_df = pd.DataFrame(results)

    # Save full results
    results_df.to_csv(f"{file_stem}_results.csv", index=False)

    # Free up GPU memory
    if torch.cuda.is_available():
        del model
        torch.cuda.empty_cache()

    return results_df

def create_heatmap_visualization(results_df, output_dir="results"):
    """
    Create one score heatmap per model (depth percentage vs. context length).

    Each cell shows the mean "score" for a (depth_percent,
    context_length_chars) combination; cells scoring below 80 are annotated
    with their value. The PNG file name includes the class name of the
    module-level ``press`` and the module-level ``compression_ratio``, so both
    globals must be defined.

    Args:
        results_df: DataFrame containing test results
        output_dir: Directory to save visualizations
    """
    # Create output directory if it doesn't exist
    os.makedirs(output_dir, exist_ok=True)

    # An empty frame (e.g. every model failed to load) has no "model" column;
    # bail out instead of raising KeyError.
    if results_df.empty or not {"model", "score"}.issubset(results_df.columns):
        print("No results to visualize; skipping heatmaps")
        return

    # Red -> yellow -> green colour scale for scores 0..100.
    cmap = LinearSegmentedColormap.from_list("custom_cmap", ["#F0496E", "#EBB839", "#0CD79F"])

    for model_name in results_df["model"].unique():
        model_df = results_df[results_df["model"] == model_name]

        # Mean score per (depth, context length); a single pivot_table call
        # both aggregates the repeats and reshapes into the heatmap grid.
        pivot_table = model_df.pivot_table(
            values="score",
            index="depth_percent",
            columns="context_length_chars",
            aggfunc="mean"
        )
        if pivot_table.empty:
            print(f"No scored results for {model_name}; skipping heatmap")
            continue

        # Create the heatmap with better aesthetics
        plt.figure(figsize=(17.5, 8))
        ax = sns.heatmap(
            pivot_table,
            cmap=cmap,
            vmin=0,
            vmax=100,
            cbar_kws={'label': 'Score'}
        )

        # Improve aesthetics
        clean_model_name = model_name.replace("/", "-")
        plt.title(f'Pressure Testing {clean_model_name}\nFact Retrieval Across Context Lengths ("Needle In A HayStack")', fontsize=16)
        # The columns are context lengths in characters, not tokens.
        plt.xlabel('Context Length (characters)')
        plt.ylabel('Depth Percentage')
        plt.xticks(rotation=45)
        plt.yticks(rotation=0)

        # Add annotations to cells that are below 80% (indicating potential issues)
        for row_idx, row_values in enumerate(pivot_table.to_numpy()):
            for col_idx, cell in enumerate(row_values):
                if not np.isnan(cell) and cell < 80:
                    # Heatmap cells are unit squares; +0.5 centres the text.
                    ax.text(col_idx + 0.5, row_idx + 0.5, f"{int(cell)}%",
                            ha='center', va='center', color='white', fontweight='bold')

        plt.tight_layout()

        # Save the figure
        file_name = f"{clean_model_name}_heatmap_{press.__class__.__name__}_compression{compression_ratio}.png"
        plt.savefig(os.path.join(output_dir, file_name), dpi=300, bbox_inches='tight')
        plt.close()

        print(f"Created heatmap for {model_name}")

def compare_models_visualization(results_df, output_dir="results"):
    """
    Create visualizations comparing models across different dimensions.

    Three PNG files are written to ``output_dir``:
        - model_comparison_by_length.png: mean score vs. estimated token length
        - model_comparison_by_depth.png: mean score vs. depth percentage
        - all_models_comparison.png: one annotated heatmap per model

    Args:
        results_df: DataFrame containing test results
        output_dir: Directory to save visualizations
    """
    # Create output directory if it doesn't exist
    os.makedirs(output_dir, exist_ok=True)

    # An empty frame (e.g. every model failed to load) has no "model" column.
    if results_df.empty or not {"model", "score"}.issubset(results_df.columns):
        print("No results to visualize; skipping model comparison")
        return

    # Get unique models
    models = results_df["model"].unique()

    def _plot_mean_score(group_column, title, xlabel, file_name):
        """Plot each model's mean score grouped by `group_column` and save it."""
        plt.figure(figsize=(12, 8))

        for model_name in models:
            model_df = results_df[results_df["model"] == model_name]
            mean_score = model_df.groupby(group_column)["score"].mean()
            plt.plot(mean_score.index, mean_score.values,
                     marker='o', linewidth=2, label=model_name)

        plt.title(title, fontsize=16)
        plt.xlabel(xlabel)
        plt.ylabel("Score (%)")
        plt.grid(True, linestyle='--', alpha=0.7)
        plt.legend()
        plt.tight_layout()

        plt.savefig(f"{output_dir}/{file_name}", dpi=300)
        plt.close()

    # 1. Mean score by context length for each model
    _plot_mean_score(
        "context_length_tokens_est",
        "Model Performance by Context Length",
        "Context Length (tokens)",
        "model_comparison_by_length.png",
    )

    # 2. Mean score by depth position for each model
    _plot_mean_score(
        "depth_percent",
        "Model Performance by Document Depth",
        "Document Depth (%)",
        "model_comparison_by_depth.png",
    )

    # 3. Create a combined heatmap comparison for all models
    plt.figure(figsize=(20, 10))

    # Define subplot grid: at most three heatmaps per row.
    n_models = len(models)
    cols = min(3, n_models)
    rows = (n_models + cols - 1) // cols

    # Create the custom color map
    cmap = LinearSegmentedColormap.from_list("custom_cmap", ["#F0496E", "#EBB839", "#0CD79F"])

    for i, model_name in enumerate(models):
        model_df = results_df[results_df["model"] == model_name]

        # Create pivot table
        pivot_table = model_df.pivot_table(
            values="score",
            index="depth_percent",
            columns="context_length_tokens_est",
            aggfunc="mean"
        )

        # Add subplot
        plt.subplot(rows, cols, i + 1)

        # Fixed 0-100 colour range so the subplots are comparable with each
        # other and with the per-model heatmaps.
        sns.heatmap(
            pivot_table,
            annot=True,
            fmt="g",
            cmap=cmap,
            vmin=0,
            vmax=100,
            cbar_kws={'label': 'Score (%)'}
        )

        plt.title(model_name)
        plt.xlabel('Token Length')
        plt.ylabel('Depth (%)')

    # Lay out and save BEFORE showing: once show() returns, the figure may
    # already have been released, and savefig would then write a blank image.
    plt.tight_layout()
    plt.savefig(f"{output_dir}/all_models_comparison.png", dpi=300)
    plt.show()
    plt.close()

    print("Created model comparison visualizations")

def main():
    """
    Run the needle-in-haystack tests and create visualizations.

    Results and figures are written to a fresh, timestamped
    ``needle_test_results_<YYYYmmdd_HHMMSS>`` directory.

    Returns:
        DataFrame with the combined results of all models; empty if no model
        produced any results.
    """

    # Define models to test
    models = [
        "Qwen/Qwen2.5-0.5B-Instruct",
    ]

    # Define test parameters with more depth positions as requested
    context_lengths = [1000, 2000, 5000, 10000, 20000]  # Characters
    # More depth positions (10%, 20%, 30%, 40%, 50%, 60%, 70%, 80%, 90%)
    depth_positions = [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9]
    test_repeats = 2  # Number of times to repeat each test

    # Create results directory
    output_dir = f"needle_test_results_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
    os.makedirs(output_dir, exist_ok=True)

    # Run tests for each model
    all_results = [
        test_model_across_depths_and_lengths(
            model_name,
            context_lengths,
            depth_positions,
            test_repeats
        )
        for model_name in models
    ]

    # A model that fails to load contributes an empty DataFrame. Drop those so
    # they neither pollute the concatenation nor crash the plotting step.
    usable_results = [df for df in all_results if not df.empty]
    if not usable_results:
        print("No results were produced; skipping visualizations")
        return pd.DataFrame()

    # Combine all results
    combined_results = pd.concat(usable_results, ignore_index=True)
    combined_results.to_csv(f"{output_dir}/all_models_results.csv", index=False)

    # Create visualizations
    create_heatmap_visualization(combined_results, output_dir)
    # Cross-model comparison is currently switched off:
    # compare_models_visualization(combined_results, output_dir)

    print(f"All tests and visualizations completed. Results saved in '{output_dir}'")

    # Return the results for further analysis if needed
    return combined_results

# Run the full benchmark only when this code is the entry point, not when it
# is imported from elsewhere.
if __name__ == "__main__":
    main()

Testing model: Qwen/Qwen2.5-0.5B-Instruct
Model loaded on cuda


Testing Qwen/Qwen2.5-0.5B-Instruct: 100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 5/5 [08:14<00:00, 98.83s/it]


Created heatmap for Qwen/Qwen2.5-0.5B-Instruct
All tests and visualizations completed. Results saved in 'needle_test_results_20250501_105108'
