# Fail@k Evaluation Framework

This notebook implements a framework for evaluating language models using both standard accuracy metrics and the fail@k metric.

## What is fail@k?

Unlike pass@k (which counts a question as correct if any of k attempts succeeds), fail@k only counts a question as correct if ALL k attempts are correct. This is a much stricter test of model reliability.

- pass@k tests the upper bound of model potential
- fail@k tests the lower bound of model reliability

As noted in the planning document, if a model is 95% reliable, you still have to check every output. But if it's 99.5% reliable, you might fully trust the model without manual verification.
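
To make the two definitions concrete, here is a minimal sketch that scores a handful of questions both ways. The helper names (`pass_at_k`, `fail_at_k`) and the sample outcomes are illustrative assumptions, not functions or data from this notebook.

```python
def pass_at_k(attempts: list[list[bool]]) -> float:
    """Fraction of questions where at least one of the k attempts is correct."""
    return sum(any(a) for a in attempts) / len(attempts)


def fail_at_k(attempts: list[list[bool]]) -> float:
    """Fraction of questions where every one of the k attempts is correct."""
    return sum(all(a) for a in attempts) / len(attempts)


# Hypothetical outcomes: 4 questions, k = 3 attempts each.
sample = [
    [True, True, True],     # always right: counts for both metrics
    [True, False, True],    # sometimes right: counts for pass@k only
    [False, False, True],   # right once: counts for pass@k only
    [False, False, False],  # never right: counts for neither
]

print(pass_at_k(sample))  # 0.75
print(fail_at_k(sample))  # 0.25
```

The same attempts give very different scores, which is why the two metrics bracket a model's behavior from above and below.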

## Features

- **Organized directory structure**: All results are stored in a structured way by dataset and evaluation type
- **Standardized metrics**: Both regular accuracy and fail@k are implemented consistently
- **Metadata tracking**: Each run is tracked with timestamps and detailed results
- **Visualization tools**: Built-in functions to visualize and compare results
- **Async option**: Both synchronous and asynchronous evaluation modes
- **Multi-model comparison**: Easily compare multiple models on the same dataset
- **Full experiment automation**: Run comprehensive experiments with a single function call

## Getting Started

To run an experiment, first ensure your environment variables are set up correctly (particularly `OPENROUTER_API_KEY`), then use:
1. The individual test functions (`run_accuracy_test`, `run_fail_at_k_test`) for specific experiments
2. Visualization functions (`visualize_results`, `compare_models`) to analyze results
3. The `run_full_experiment` function to run a complete battery of tests
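
Before making any API calls, it can help to fail fast when the key is missing. The sketch below shows one way to do that; `require_env` is a hypothetical helper, not part of this notebook.

```python
import os


def require_env(name: str) -> str:
    """Return the value of an environment variable, or raise if it is unset or empty."""
    value = os.getenv(name)
    if not value:
        raise RuntimeError(f"Environment variable {name} is not set")
    return value


# Usage (commented out so the sketch runs without a key present):
# api_key = require_env("OPENROUTER_API_KEY")
```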

In [1]:
import huggingface
from datasets import load_dataset
import requests
import json
import os
import time
import pandas as pd
import matplotlib.pyplot as plt
from pathlib import Path
from dotenv import load_dotenv
import asyncio
import aiohttp
import aiofiles

# Load environment variables
load_dotenv()

# Create output directories structure
def create_output_dirs():
    """Create organized directory structure for experiment outputs"""
    # Base output directory
    base_dir = Path("results")
    base_dir.mkdir(exist_ok=True)
    
    # Dataset directories
    datasets = ["gsm8k", "math500", "gpqa"]
    for dataset in datasets:
        # Create dataset directory
        dataset_dir = base_dir / dataset
        dataset_dir.mkdir(exist_ok=True)
        
        # Create subdirectories for different evaluation types
        (dataset_dir / "regular").mkdir(exist_ok=True)
        (dataset_dir / "fail_at_k").mkdir(exist_ok=True)
        
        # Create metadata dir for summary results
        (dataset_dir / "metadata").mkdir(exist_ok=True)

# Create directories
create_output_dirs()

## Loading Datasets

In [2]:
gsm8k = load_dataset("gsm8k", "main")
gsm8k_answers = []
for i in range(len(gsm8k["train"])):
    sample = gsm8k["train"][i]
    for line in sample['answer'].split("\n"):
        if line.strip().startswith("####"):
            gsm8k_answers.append(int(line.replace("####", "").strip().replace(",","")))

gsm8k = gsm8k["train"]['question']

gsm8k = gsm8k[:500]
gsm8k_answers = gsm8k_answers[:500]

print("length of gsm8k: ", len(gsm8k))

length of gsm8k:  500
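
The answer extraction in the cell above relies on the GSM8K convention that each solution ends with a `#### <number>` line. A minimal sketch of that parsing step in isolation follows; `parse_gsm8k_answer` is a hypothetical helper and the example text is made up rather than taken from the dataset. Unlike the loop above, which silently skips a sample with no `####` line (and would then leave questions and answers misaligned), this version raises.

```python
def parse_gsm8k_answer(solution: str) -> int:
    """Extract the integer after the '####' marker in a GSM8K-style solution string."""
    for line in solution.split("\n"):
        line = line.strip()
        if line.startswith("####"):
            # Drop the marker and any thousands separators before converting.
            return int(line.replace("####", "").strip().replace(",", ""))
    raise ValueError("no '####' answer line found")


# Hypothetical solution text in the GSM8K style.
example = "She sells 1,000 + 234 = 1,234 eggs.\n#### 1,234"
print(parse_gsm8k_answer(example))  # 1234
```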


In [22]:
math500 = load_dataset("HuggingFaceH4/MATH-500")
math500_answers = math500['test']['answer']
math500 = math500['test']['problem']
print("length of math500: ", len(math500))

length of math500:  500


In [3]:
gpqa = load_dataset("Idavidrein/gpqa",'gpqa_main')
gpqa_answers = gpqa['train']['Correct Answer']
gpqa = gpqa['train']['Question']
print("length of gpqa: ", len(gpqa))

length of gpqa:  448


In [4]:
lcb_codegen = load_dataset("livecodebench/code_generation_lite", version_tag="release_v5")

## Functions

In [5]:
def get_model_response(
    prompt: str, 
    answer: str, 
    dataset: str, 
    model: str,
    run_id: str = None
) -> str:
    """
    Makes a request to the model API to get an answer for a math problem.
    
    Args:
        prompt: The math problem to solve
        answer: The correct answer (for logging purposes)
        dataset: Name of the dataset (gsm8k, math500, etc.)
        model: Model identifier (e.g., "anthropic/claude-3.5-sonnet")
        run_id: Optional identifier for the specific experiment run
    
    Returns:
        The model's extracted answer as a string
    """
    # Create paths based on the dataset and run info
    base_dir = Path("results") / dataset
    run_id = run_id or f"{int(time.time())}"
    
    # Create output file paths
    output_dir = base_dir / "regular"
    output_file = output_dir / f"{model.split('/')[-1]}_{run_id}.txt"
    
    try:
        response = requests.post(
            url="https://openrouter.ai/api/v1/chat/completions",
            headers={
                "Authorization": f"Bearer {os.getenv('OPENROUTER_API_KEY')}",
            },
            data=json.dumps({
                "model": model,
                "messages": [
                    {
                        "role": "user",
                        "content": (
                            """
                            Solve the math problem provided below. 
                            At the very end of your message, provide the answer to the problem in <ANSWER> </ANSWER> tags:
                            In the answer tags, ONLY provide the answer to the problem. 
                            No other text or symbols such as $.
                            Provide your answer in the simplest form. Examples: 5 instead of 5.00 and 1/2 instead of 2/4.
                            """
                            + prompt
                        )
                    }
                ]
            })
        )
        
        if response.status_code == 200:
            data = response.json()
            model_response = data['choices'][0]['message']['content']
            
            try:
                model_answer = model_response.split("<ANSWER>")[1].split("</ANSWER>")[0].strip()
            except IndexError:
                model_answer = "MISSING_ANSWER_TAGS"
            
            # Ensure the output directory exists
            output_dir.mkdir(exist_ok=True, parents=True)
            
            # Write the results to the output file
            with open(output_file, 'a') as f:
                f.write("QUESTION: " + prompt + "\n")
                f.write("MODEL RESPONSE: " + model_response + "\n\n")
                f.write("MODEL ANSWER: " + model_answer + "\n\n")
                f.write("CORRECT ANSWER: " + answer + "\n\n")
                f.write("-" * 80 + "\n\n")

            return model_answer
        else:
            print(f"Error: {response.status_code}")
            return "ERROR_RESPONSE"
    except Exception as e:
        print(f"Error: {e}")
        return "ERROR_RESPONSE"
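
The tag-parsing step inside `get_model_response` can be tested on its own, without calling the API. The sketch below is a regex-based variation, not the notebook's implementation: `extract_answer` is a hypothetical helper, and it differs from the `split`-based code in two ways. It returns the last `<ANSWER>` pair rather than the first (the prompt asks for the answer at the very end), and it requires a closing tag, whereas the `split` version returns everything after an unclosed `<ANSWER>`.

```python
import re

ANSWER_RE = re.compile(r"<ANSWER>(.*?)</ANSWER>", re.DOTALL)


def extract_answer(model_response: str) -> str:
    """Return the text inside the last <ANSWER>...</ANSWER> pair, or a sentinel."""
    matches = ANSWER_RE.findall(model_response)
    if not matches:
        return "MISSING_ANSWER_TAGS"
    return matches[-1].strip()


print(extract_answer("Adding up gives 42. <ANSWER> 42 </ANSWER>"))  # 42
print(extract_answer("I am not sure."))  # MISSING_ANSWER_TAGS
```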

In [6]:
def checking_answer(
    model_answer: str, 
    correct_answer: str, 
    dataset: str,
    model: str = "anthropic/claude-3.5-sonnet",
    run_id: str = None
) -> bool:
    """
    Checks if the model's answer is equivalent to the correct answer using another LLM.
    
    Args:
        model_answer: The answer provided by the model
        correct_answer: The correct answer to compare against
        dataset: Name of the dataset (gsm8k, math500, etc.)
        model: Model identifier (e.g., "anthropic/claude-3.5-sonnet")
        run_id: Optional identifier for the specific experiment run
    
    Returns:
        Boolean indicating whether the answers are equivalent
    """
    # Create paths based on the dataset and run info
    base_dir = Path("results") / dataset
    run_id = run_id or f"{int(time.time())}"
    
    # Create output file paths
    output_dir = base_dir / "regular"
    check_file = output_dir / f"checking_{model.split('/')[-1]}_{run_id}.txt"
    
    try:
        response = requests.post(
            url="https://openrouter.ai/api/v1/chat/completions",
            headers={
                "Authorization": f"Bearer {os.getenv('OPENROUTER_API_KEY')}",
            },
            data=json.dumps({
                "model": model,
                "messages": [
                    {
                        "role": "user",
                        "content": (
                            """
                            Check whether or not these two answers are equivalent.
                            At the very end of your message, provide the answer to the problem in <ANSWER> </ANSWER> tags:
                            If the answers are equivalent, there should only be "YES" in the answer tags.
                            If they aren't equivalent, there should only be "NO" in the answer tags.
                            Just a quick check is fine.
                            No need for elaborate reasoning.
                            First answer: """ + model_answer + "\n" + "Second answer: " + correct_answer
                        )
                    }
                ]
            })
        )
        
        if response.status_code == 200:
            data = response.json()
            model_response = data['choices'][0]['message']['content'] # raw response
            
            try:
                responses_equivalent = model_response.split("<ANSWER>")[1].split("</ANSWER>")[0].strip() # YES or NO
            except IndexError:
                responses_equivalent = "NO"
                
            final_answer = responses_equivalent == "YES"  # True only for an exact "YES" verdict
            
            # Ensure the output directory exists
            output_dir.mkdir(exist_ok=True, parents=True)
            
            # Write the check results
            with open(check_file, 'a') as f:
                f.write("CORRECT ANSWER: " + correct_answer + '\n')
                f.write("MODEL OUTPUT: " + model_answer + '\n')
                f.write("COMPARISON ANSWER: ")
                f.write("TRUE\n" if final_answer else "FALSE\n")
                f.write(model_response + '\n')
                f.write("-" * 80 + "\n\n")

            return final_answer
        else:
            print(f"Error: {response.status_code}")
            return False
    except Exception as e:
        print(f"Error: {e}")
        return False
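
Every call to `checking_answer` costs one extra API request, even when the two answers are character-for-character identical. One optional shortcut, sketched below, is a cheap string comparison that runs first and falls back to the LLM judge only when it is inconclusive. This is an assumption about how the check could be arranged, not something the notebook does; `normalize` and `quick_equivalent` are hypothetical helpers.

```python
from typing import Optional


def normalize(answer: str) -> str:
    """Drop all whitespace so 'p - q' and 'p-q' compare equal."""
    return "".join(answer.split())


def quick_equivalent(model_answer: str, correct_answer: str) -> Optional[bool]:
    """Return True on a whitespace-insensitive exact match, or None when a judge is still needed."""
    if normalize(model_answer) == normalize(correct_answer):
        return True
    # Inconclusive: e.g. '14/3' vs '\frac{14}{3}' still needs the LLM check.
    return None


print(quick_equivalent("p - q", "p-q"))            # True
print(quick_equivalent("14/3", r"\frac{14}{3}"))   # None
```

The shortcut never returns `False`, so it cannot turn a correct answer into a wrong one; it only skips the judge for trivially equal strings.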

In [7]:
def run_accuracy_test(
    questions: list[str], 
    answers: list[str], 
    dataset: str, 
    model: str,
    max_samples: int = None,
    return_details: bool = True
) -> dict:
    """
    Runs an accuracy test on a dataset with a specific model.
    
    Args:
        questions: List of questions to evaluate
        answers: List of correct answers
        dataset: Name of the dataset (gsm8k, math500, etc.)
        model: Model identifier (e.g., "anthropic/claude-3.5-sonnet")
        max_samples: Maximum number of samples to evaluate (useful for testing)
        return_details: Whether to return detailed results or just the accuracy score
        
    Returns:
        Dictionary containing the test results, including:
        - accuracy: The overall accuracy score
        - run_id: The unique identifier for this test run
        - model: The model used
        - dataset: The dataset used
        - timestamp: When the test was run
        - results: List of individual question results (if return_details=True)
    """
    # Create a unique run ID
    run_id = f"{int(time.time())}"
    
    # Limit the number of samples if specified
    if max_samples is not None:
        questions = questions[:max_samples]
        answers = answers[:max_samples]
    
    # Results to track
    num_correct = 0
    detailed_results = []
    wrong_indices = []
    
    print(f"RUNNING ACCURACY TEST: {model} on {dataset}")
    print(f"Number of questions: {len(questions)}")
    
    # Process each question
    for i in range(len(questions)):
        print(f"Processing question {i+1}/{len(questions)}...", end='\r')
        
        # Get the model's answer
        model_answer = get_model_response(
            questions[i], 
            str(answers[i]), 
            dataset, 
            model, 
            run_id
        )
        
        # Check if the answer is correct
        correct = checking_answer(
            model_answer, 
            str(answers[i]), 
            dataset, 
            model=model, 
            run_id=run_id
        )
        
        # Track the results
        if correct:
            num_correct += 1
        else:
            wrong_indices.append(i)
            print(f"WRONG {i}")
        
        # Store detailed result
        if return_details:
            detailed_results.append({
                "index": i,
                "question": questions[i],
                "correct_answer": str(answers[i]),
                "model_answer": model_answer,
                "is_correct": correct
            })
    
    # Calculate accuracy
    accuracy = num_correct / len(questions)
    
    # Create a metadata file to store the summary results
    metadata_dir = Path("results") / dataset / "metadata"
    metadata_dir.mkdir(exist_ok=True, parents=True)
    
    # Create a summary of the test run
    summary = {
        "accuracy": accuracy,
        "run_id": run_id,
        "model": model,
        "dataset": dataset,
        "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
        "num_questions": len(questions),
        "num_correct": num_correct,
        "wrong_indices": wrong_indices
    }
    
    # Add detailed results if requested
    if return_details:
        summary["results"] = detailed_results
    
    # Save the summary as JSON
    with open(metadata_dir / f"accuracy_{model.split('/')[-1]}_{run_id}.json", 'w') as f:
        json.dump(summary, f, indent=2)
    
    print(f"ACCURACY for {model} on {dataset}: {accuracy:.4f}")
    
    return summary
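
Each run writes its summary to `results/<dataset>/metadata/accuracy_<model>_<run_id>.json`, so past runs can be reloaded without repeating any API calls. A minimal sketch of that follows; `load_accuracy_summaries` is a hypothetical helper that assumes the file naming and `timestamp` format used by `run_accuracy_test` above.

```python
import json
from pathlib import Path


def load_accuracy_summaries(metadata_dir: Path) -> list[dict]:
    """Load every accuracy_*.json summary in a metadata directory, oldest first."""
    summaries = []
    for path in metadata_dir.glob("accuracy_*.json"):
        with open(path, "r") as f:
            summaries.append(json.load(f))
    # "%Y-%m-%d %H:%M:%S" timestamps sort chronologically as plain strings.
    return sorted(summaries, key=lambda s: s["timestamp"])


# Usage (requires at least one earlier run on disk):
# for s in load_accuracy_summaries(Path("results") / "gsm8k" / "metadata"):
#     print(s["model"], s["accuracy"], s["wrong_indices"])
```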

In [8]:
def run_fail_at_k_test(
    questions: list[str], 
    answers: list[str], 
    dataset: str, 
    model: str,
    k: int = 4,
    max_samples: int = None,
    return_details: bool = True
) -> dict:
    """
    Runs a fail@k test on a dataset with a specific model.
    In fail@k, a question is only considered correct if the model gets it right
    on ALL k attempts.
    
    Args:
        questions: List of questions to evaluate
        answers: List of correct answers
        dataset: Name of the dataset (gsm8k, math500, etc.)
        model: Model identifier (e.g., "anthropic/claude-3.5-sonnet")
        k: Number of attempts per question
        max_samples: Maximum number of samples to evaluate (useful for testing)
        return_details: Whether to return detailed results or just the accuracy score
        
    Returns:
        Dictionary containing the test results, including:
        - fail_at_k: The overall fail@k score
        - run_id: The unique identifier for this test run
        - model: The model used
        - dataset: The dataset used
        - k: The number of attempts per question
        - timestamp: When the test was run
        - results: List of individual question results (if return_details=True)
    """
    # Create a unique run ID
    run_id = f"fail{k}_{int(time.time())}"
    
    # Limit the number of samples if specified
    if max_samples is not None:
        questions = questions[:max_samples]
        answers = answers[:max_samples]
    
    # Results to track
    num_passed = 0
    detailed_results = []
    
    print(f"RUNNING FAIL@{k} TEST: {model} on {dataset}")
    print(f"Number of questions: {len(questions)}")
    
    # Create output directories
    base_dir = Path("results") / dataset
    output_dir = base_dir / "fail_at_k"
    output_dir.mkdir(exist_ok=True, parents=True)
    
    # Process each question
    for i in range(len(questions)):
        print(f"Running question {i+1}/{len(questions)}")
        
        # Initialize results for this question
        question_passed = True
        question_attempts = []
        
        # Make k attempts for this question
        for j in range(k):
            print(f"  Attempt {j+1}/{k}...", end='\r')
            
            # get_model_response and checking_answer build their own log file names
            # from the run_id passed below, so no per-attempt paths are needed here.
            
            # Get the model's answer
            model_answer = get_model_response(
                questions[i], 
                str(answers[i]), 
                dataset, 
                model, 
                f"{run_id}_q{i}_a{j}"
            )
            
            # Check if the answer is correct
            correct = checking_answer(
                model_answer, 
                str(answers[i]), 
                dataset, 
                model=model, 
                run_id=f"{run_id}_q{i}_a{j}"
            )
            
            # Track the result of this attempt
            question_attempts.append({
                "attempt": j,
                "model_answer": model_answer,
                "is_correct": correct
            })
            
            # In fail@k, if any attempt fails, the whole question fails
            if not correct:
                question_passed = False
                print(f"  Failed at attempt {j+1}")
                break
        
        # If all attempts passed, increment the counter
        if question_passed:
            num_passed += 1
            print(f"  Question {i+1} PASSED (all {k} attempts correct)")
        
        # Store detailed result for this question
        if return_details:
            detailed_results.append({
                "index": i,
                "question": questions[i],
                "correct_answer": str(answers[i]),
                "passed": question_passed,
                "attempts": question_attempts
            })
    
    # Calculate fail@k score
    fail_at_k_score = num_passed / len(questions)
    
    # Create a metadata file to store the summary results
    metadata_dir = Path("results") / dataset / "metadata"
    metadata_dir.mkdir(exist_ok=True, parents=True)
    
    # Create a summary of the test run
    summary = {
        "fail_at_k_score": fail_at_k_score,
        "run_id": run_id,
        "model": model,
        "dataset": dataset,
        "k": k,
        "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
        "num_questions": len(questions),
        "num_passed": num_passed
    }
    
    # Add detailed results if requested
    if return_details:
        summary["results"] = detailed_results
    
    # Save the summary as JSON
    with open(metadata_dir / f"fail{k}_{model.split('/')[-1]}_{run_id}.json", 'w') as f:
        json.dump(summary, f, indent=2)
    
    print(f"FAIL@{k} score for {model} on {dataset}: {fail_at_k_score:.4f}")
    
    return summary
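
Because the inner loop stops at the first failed attempt, a failed question records fewer than `k` attempts. The sketch below recomputes the score from the per-question `results` entries and counts how many attempts were actually made. Both helpers and the sample data are hypothetical; they assume the `attempts` / `is_correct` layout that `run_fail_at_k_test` stores.

```python
def fail_at_k_from_results(results: list[dict]) -> float:
    """Recompute the fail@k score from the per-question 'results' entries."""
    passed = sum(1 for r in results if all(a["is_correct"] for a in r["attempts"]))
    return passed / len(results)


def attempts_used(results: list[dict]) -> int:
    """Total attempts actually made; early exit means at most k per question."""
    return sum(len(r["attempts"]) for r in results)


# Hypothetical results for k = 2: one question passes, one fails on its first try.
sample_results = [
    {"passed": True, "attempts": [{"is_correct": True}, {"is_correct": True}]},
    {"passed": False, "attempts": [{"is_correct": False}]},
]

print(fail_at_k_from_results(sample_results))  # 0.5
print(attempts_used(sample_results))           # 3
```

The early exit saves API calls, at the cost of not knowing how many of the remaining attempts a failed question would have passed.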

In [9]:
def visualize_results(dataset: str = None, run_id: str = None):
    """
    Visualizes results from accuracy and fail@k tests.
    
    Args:
        dataset: Name of the dataset to visualize (or None for all datasets)
        run_id: Specific run ID to visualize (or None for all runs)
    """
    # Initialize data for visualization
    accuracy_data = []
    fail_at_k_data = []
    
    # Get all metadata directories
    base_dir = Path("results")
    if dataset:
        datasets = [dataset]
    else:
        datasets = [d.name for d in base_dir.iterdir() if d.is_dir()]
    
    for ds in datasets:
        metadata_dir = base_dir / ds / "metadata"
        if not metadata_dir.exists():
            continue
        
        # Process all JSON files in the metadata directory
        for json_file in metadata_dir.glob("*.json"):
            # Skip if a specific run_id was requested and this isn't it
            if run_id and run_id not in json_file.name:
                continue
                
            with open(json_file, 'r') as f:
                data = json.load(f)
                
            # Add to appropriate dataset based on the file name
            if "accuracy" in json_file.name:
                accuracy_data.append({
                    "dataset": data["dataset"],
                    "model": data["model"],
                    "accuracy": data["accuracy"],
                    "timestamp": data["timestamp"],
                    "run_id": data["run_id"]
                })
            elif "fail" in json_file.name:
                fail_at_k_data.append({
                    "dataset": data["dataset"],
                    "model": data["model"],
                    "fail_at_k_score": data["fail_at_k_score"],
                    "k": data["k"],
                    "timestamp": data["timestamp"],
                    "run_id": data["run_id"]
                })
    
    # Create DataFrames for visualization
    if accuracy_data:
        accuracy_df = pd.DataFrame(accuracy_data)
        print("== ACCURACY RESULTS ==")
        print(accuracy_df[["dataset", "model", "accuracy", "timestamp"]])
        
        # Plot accuracy results
        plt.figure(figsize=(12, 6))
        for dataset in accuracy_df["dataset"].unique():
            dataset_df = accuracy_df[accuracy_df["dataset"] == dataset]
            for model in dataset_df["model"].unique():
                model_df = dataset_df[dataset_df["model"] == model]
                plt.bar(f"{dataset} - {model.split('/')[-1]}", model_df["accuracy"].values[0])
        
        plt.title("Accuracy Results by Dataset and Model")
        plt.ylabel("Accuracy")
        plt.xticks(rotation=45, ha="right")
        plt.tight_layout()
        plt.show()
    
    if fail_at_k_data:
        fail_at_k_df = pd.DataFrame(fail_at_k_data)
        print("\n== FAIL@K RESULTS ==")
        print(fail_at_k_df[["dataset", "model", "k", "fail_at_k_score", "timestamp"]])
        
        # Plot fail@k results
        plt.figure(figsize=(12, 6))
        for dataset in fail_at_k_df["dataset"].unique():
            dataset_df = fail_at_k_df[fail_at_k_df["dataset"] == dataset]
            for model in dataset_df["model"].unique():
                model_df = dataset_df[dataset_df["model"] == model]
                for k in model_df["k"].unique():
                    k_df = model_df[model_df["k"] == k]
                    plt.bar(f"{dataset} - {model.split('/')[-1]} (k={k})", k_df["fail_at_k_score"].values[0])
        
        plt.title("Fail@k Results by Dataset, Model, and k")
        plt.ylabel("Fail@k Score")
        plt.xticks(rotation=45, ha="right")
        plt.tight_layout()
        plt.show()
    
    if not accuracy_data and not fail_at_k_data:
        print("No results found for the specified criteria.")

def compare_models(dataset: str, models: list, max_samples: int = 20):
    """
    Runs an accuracy test for multiple models on the same dataset and visualizes the results.
    
    Args:
        dataset: Name of the dataset to test on
        models: List of model identifiers to compare
        max_samples: Maximum number of samples to evaluate
    """
    # Load the dataset
    if dataset == "gsm8k":
        questions = gsm8k[:max_samples]
        answers = gsm8k_answers[:max_samples]
    elif dataset == "math500":
        questions = math500[:max_samples]
        answers = math500_answers[:max_samples]
    elif dataset == "gpqa":
        questions = gpqa[:max_samples]
        answers = gpqa_answers[:max_samples]
    else:
        print(f"Unknown dataset: {dataset}")
        return
    
    # Create a common run ID for this comparison
    run_id = f"compare_{int(time.time())}"
    
    # Run tests for each model
    results = []
    for model in models:
        print(f"\nTesting model: {model}")
        result = run_accuracy_test(
            questions, 
            answers, 
            dataset, 
            model, 
            max_samples=max_samples,
            return_details=False
        )
        results.append(result)
    
    # Create a DataFrame for comparison
    comparison_df = pd.DataFrame([
        {
            "model": r["model"].split('/')[-1],
            "accuracy": r["accuracy"],
            "num_correct": r["num_correct"],
            "num_questions": r["num_questions"]
        }
        for r in results
    ])
    
    # Print the comparison table
    print("\n== MODEL COMPARISON ==")
    print(comparison_df)
    
    # Visualize the comparison
    plt.figure(figsize=(10, 6))
    plt.bar(comparison_df["model"], comparison_df["accuracy"])
    plt.title(f"Model Accuracy Comparison on {dataset}")
    plt.ylabel("Accuracy")
    plt.ylim(0, 1)
    plt.xticks(rotation=45, ha="right")
    plt.tight_layout()
    plt.show()
    
    return comparison_df
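
`visualize_results` plots accuracy and fail@k separately. To see how much reliability a model loses between the two, the summaries can be paired by dataset and model. The sketch below uses only plain dictionaries; `reliability_gap` is a hypothetical helper, and it assumes the summary keys written by `run_accuracy_test` and `run_fail_at_k_test`. If several accuracy runs exist for the same dataset and model, the last one in the list is used.

```python
def reliability_gap(accuracy_runs: list[dict], fail_runs: list[dict]) -> list[dict]:
    """Pair accuracy and fail@k summaries by (dataset, model) and report the difference."""
    accuracy_by_key = {(r["dataset"], r["model"]): r["accuracy"] for r in accuracy_runs}
    rows = []
    for r in fail_runs:
        key = (r["dataset"], r["model"])
        if key in accuracy_by_key:
            rows.append({
                "dataset": r["dataset"],
                "model": r["model"],
                "k": r["k"],
                "accuracy": accuracy_by_key[key],
                "fail_at_k_score": r["fail_at_k_score"],
                "gap": accuracy_by_key[key] - r["fail_at_k_score"],
            })
    return rows


# Hypothetical summaries, not real results.
acc = [{"dataset": "demo", "model": "m1", "accuracy": 0.75}]
fail = [{"dataset": "demo", "model": "m1", "k": 4, "fail_at_k_score": 0.5}]
print(reliability_gap(acc, fail)[0]["gap"])  # 0.25
```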

In [11]:
# Example: Running a regular accuracy test
gsm8k_test_result = run_accuracy_test(
    gsm8k,  # Questions
    gsm8k_answers,  # Answers
    "gsm8k",  # Dataset name
    "anthropic/claude-3.5-sonnet",  # Model to use
    max_samples=10  # Limit to 10 samples for a quick test
)

RUNNING ACCURACY TEST: anthropic/claude-3.5-sonnet on gsm8k
Number of questions: 10
ACCURACY for anthropic/claude-3.5-sonnet on gsm8k: 1.0000


In [12]:
# Example: Running a fail@k test
fail_at_k_result = run_fail_at_k_test(
    gsm8k,  # Questions
    gsm8k_answers,  # Answers
    "gsm8k",  # Dataset name
    "anthropic/claude-3.5-sonnet",  # Model to use
    k=2,  # Number of attempts per question
    max_samples=5  # Limit to 5 samples for a quick test
)

RUNNING FAIL@2 TEST: anthropic/claude-3.5-sonnet on gsm8k
Number of questions: 5
Running question 1/5
  Question 1 PASSED (all 2 attempts correct)
Running question 2/5
  Question 2 PASSED (all 2 attempts correct)
Running question 3/5
  Question 3 PASSED (all 2 attempts correct)
Running question 4/5
  Question 4 PASSED (all 2 attempts correct)
Running question 5/5
  Question 5 PASSED (all 2 attempts correct)
FAIL@2 score for anthropic/claude-3.5-sonnet on gsm8k: 1.0000


In [None]:
# Example: Visualizing results for a specific dataset
visualize_results(dataset="gsm8k")

In [None]:
# Example: Comparing multiple models on a dataset
comparison = compare_models(
    dataset="gsm8k",
    models=[
        "anthropic/claude-3.5-sonnet",
        "google/gemini-1.5-pro",
        "openai/gpt-4o"
    ],
    max_samples=5  # Using just 5 samples for a quick test
)

In [None]:
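print("RUNNING FAIL@K ACCURACY TESTS:")

# Assumption: same model ID as the example cells above.
model = "anthropic/claude-3.5-sonnet"
# Shared run ID so every attempt is appended to the same pair of log files.
run_id = f"fail4_manual_{int(time.time())}"

num_passed = 0  # questions where all 4 attempts were correct
for i in range(len(gsm8k)):
    passing = True
    print(f"Running the {i}th sample")
    for j in range(4):
        print(f"{j}th try")
        answer = get_model_response(gsm8k[i], str(gsm8k_answers[i]), "gsm8k", model, run_id)
        # Keep the per-attempt verdict separate from the running count.
        correct = checking_answer(answer, str(gsm8k_answers[i]), "gsm8k", model=model, run_id=run_id)
        if not correct:
            passing = False
            print(f"Failed at the {j}th example")
            break
    if passing:
        num_passed += 1

print("ACCURACY FOR FAIL@K: ", num_passed / len(gsm8k_answers))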

RUNNING FAIL@K ACCURACY TESTS:
Running the 0th sample
0th try
CORRECT ANSWER:  15
MODEL ANSWER:  15
1th try
CORRECT ANSWER:  15
MODEL ANSWER:  15
2th try
CORRECT ANSWER:  15
MODEL ANSWER:  15
3th try
CORRECT ANSWER:  15
MODEL ANSWER:  15
Running the 1th sample
0th try
CORRECT ANSWER:  36
MODEL ANSWER:  36
1th try
CORRECT ANSWER:  36
MODEL ANSWER:  36
2th try
CORRECT ANSWER:  36
MODEL ANSWER:  36
3th try
CORRECT ANSWER:  36
MODEL ANSWER:  36
Running the 2th sample
0th try
CORRECT ANSWER:  25
MODEL ANSWER:  25
1th try
CORRECT ANSWER:  25
MODEL ANSWER:  25
2th try
CORRECT ANSWER:  25
MODEL ANSWER:  25
3th try
CORRECT ANSWER:  25
MODEL ANSWER:  25
Running the 3th sample
0th try
CORRECT ANSWER:  258
MODEL ANSWER:  258
1th try
CORRECT ANSWER:  258
MODEL ANSWER:  258
2th try
CORRECT ANSWER:  258
MODEL ANSWER:  258
3th try
CORRECT ANSWER:  258
MODEL ANSWER:  258
Running the 4th sample
0th try
CORRECT ANSWER:  96
MODEL ANSWER:  96
1th try
CORRECT ANSWER:  96
MODEL ANSWER:  96
2th try
CORRECT AN

The fail@4 score for the run above is 0.94 (three failed examples).

In [None]:
print("INDICES OF PROBLEMS THE MODEL GETS WRONG:")
wrong_indices = [59,63,73, 108, 118, 167]
print(wrong_indices)

### MATH500

In [102]:
length = 30
math500_subset_questions = math500[:length]
math500_subset_answers = math500_answers[:length]

# Assumption: same Claude 3.5 Sonnet model ID as the example cells above.
model = "anthropic/claude-3.5-sonnet"
# Shared run ID so every question is appended to the same pair of log files
# under results/math500/regular/.
run_id = f"manual_{int(time.time())}"

print("RUNNING ACCURACY TESTS:")

correct_count = 0
for i in range(length):
    answer = get_model_response(math500_subset_questions[i], math500_subset_answers[i], "math500", model, run_id)

    correct = checking_answer(answer, math500_subset_answers[i], "math500", model=model, run_id=run_id)
    if correct:
        correct_count += 1
        print("CORRECT")
    else:
        print("WRONG")


print("ACCURACY for regular: ", correct_count / len(math500_subset_answers))

RUNNING ACCURACY TESTS:
CORRECT ANSWER:  \left( 3, \frac{\pi}{2} \right)
MODEL ANSWER:  (3,π/2)
CORRECT
CORRECT ANSWER:  p - q
MODEL ANSWER:  p - q
CORRECT
CORRECT ANSWER:  \frac{14}{3}
MODEL ANSWER:  14/3
CORRECT
CORRECT ANSWER:  9
MODEL ANSWER:  9
CORRECT
CORRECT ANSWER:  \text{Evelyn}
MODEL ANSWER:  Evelyn
CORRECT
CORRECT ANSWER:  42
MODEL ANSWER:  42
CORRECT
CORRECT ANSWER:  27
MODEL ANSWER:  27
CORRECT
CORRECT ANSWER:  90^\circ
MODEL ANSWER:  72
WRONG
CORRECT ANSWER:  3\sqrt{13}
MODEL ANSWER:  3√13
CORRECT
CORRECT ANSWER:  4
MODEL ANSWER:  2
WRONG
CORRECT ANSWER:  2220
MODEL ANSWER:  2220
CORRECT
CORRECT ANSWER:  \frac{3}{56}
MODEL ANSWER:  8/63
WRONG
CORRECT ANSWER:  284
MODEL ANSWER:  284
CORRECT
CORRECT ANSWER:  5
MODEL ANSWER:  5
CORRECT
CORRECT ANSWER:  \sqrt{51}
MODEL ANSWER:  √51
CORRECT
CORRECT ANSWER:  6 - 5i
MODEL ANSWER:  6 - 5i
CORRECT
CORRECT ANSWER:  -50
MODEL ANSWER:  -50
CORRECT
CORRECT ANSWER:  \pi
MODEL ANSWER:  π
CORRECT
CORRECT ANSWER:  28
MODEL ANSWER:  56
WRO

In [None]:
length = 30
math500_subset_questions = math500[:length]
math500_subset_answers = math500_answers[:length]

# Assumption: same Claude 3.5 Sonnet model ID as the example cells above.
model = "anthropic/claude-3.5-sonnet"
# Shared run ID so every attempt is appended to the same pair of log files
# under results/math500/regular/.
run_id = f"fail4_manual_{int(time.time())}"

print("RUNNING TESTS FOR FAIL@4:")

correct_count = 0
for i in range(length):
    passing = True
    print(f"Running the {i}th sample")
    for j in range(4):
        print(f"{j}th try")
        answer = get_model_response(math500_subset_questions[i], math500_subset_answers[i], "math500", model, run_id)
        correct = checking_answer(answer, math500_subset_answers[i], "math500", model=model, run_id=run_id)

        if correct:
            print("correct")
        else:
            passing = False
            print(f"Failed at the {j}th try")
            break
    if passing:
        print(f"model passed the {i}th sample")
        correct_count += 1


print("ACCURACY for FAIL@4: ", correct_count / len(math500_subset_answers))

RUNNING TESTS FOR FAIL@4:
Running the 0th sample
0th try
CORRECT ANSWER:  \left( 3, \frac{\pi}{2} \right)
MODEL ANSWER:  (3, π/2)
correct
1th try
CORRECT ANSWER:  \left( 3, \frac{\pi}{2} \right)
MODEL ANSWER:  (3,π/2)
correct
2th try
CORRECT ANSWER:  \left( 3, \frac{\pi}{2} \right)
MODEL ANSWER:  (3,π/2)
correct
3th try
CORRECT ANSWER:  \left( 3, \frac{\pi}{2} \right)
MODEL ANSWER:  (3,1.5708)
correct
model passed the 0th sample
Running the 1th sample
0th try
CORRECT ANSWER:  p - q
MODEL ANSWER:  p - q
correct
1th try
CORRECT ANSWER:  p - q
MODEL ANSWER:  p - q
correct
2th try
CORRECT ANSWER:  p - q
MODEL ANSWER:  p-q
correct
3th try
CORRECT ANSWER:  p - q
MODEL ANSWER:  p-q
correct
model passed the 1th sample
Running the 2th sample
0th try
CORRECT ANSWER:  \frac{14}{3}
MODEL ANSWER:  14/3
correct
1th try
CORRECT ANSWER:  \frac{14}{3}
MODEL ANSWER:  14/3
correct
2th try
CORRECT ANSWER:  \frac{14}{3}
MODEL ANSWER:  14/3
correct
3th try
CORRECT ANSWER:  \frac{14}{3}
MODEL ANSWER:  14/3
c

The fail@4 score above is 57%.

On this 30-question subset of MATH500, Claude 3.5 Sonnet reaches 73% regular accuracy but only a 57% fail@4 score.
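
One way to read these two numbers is to compare the fail@4 score with what it would be if every attempt were an independent draw at the measured accuracy. The sketch below computes that baseline; `independent_fail_at_k` is a hypothetical helper, and the independence assumption is a simplification for comparison, not a claim about the model.

```python
def independent_fail_at_k(accuracy: float, k: int) -> float:
    """Expected fail@k if every attempt were an independent draw with the same accuracy."""
    return accuracy ** k


baseline = independent_fail_at_k(0.73, 4)
print(f"{baseline:.3f}")  # 0.284
```

The observed 57% is well above this baseline of roughly 28%. That would be consistent with correctness being correlated across attempts: some questions are solved on nearly every try and others are missed repeatedly. With only 30 questions, the comparison should be treated as indicative.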

### GPQA

In [12]:
run_accuracy_test(gpqa, gpqa_answers, "gpqa", "anthropic/claude-3.7-sonnet")

In [None]:
run_fail_at_k_test(gpqa, gpqa_answers, "gpqa", "anthropic/claude-3.7-sonnet", k=4)

## Async

In [13]:
async def get_model_response_async(
    session: aiohttp.ClientSession,
    prompt: str,
    answer: str,
    dataset: str,
    model: str = "openai/gpt-4",
    run_id: str = None
) -> str:
    """
    Asynchronously calls the model endpoint and writes the response to a file.
    Returns the model's extracted answer string.
    """
    # Create paths based on the dataset and run info
    base_dir = Path("results") / dataset
    run_id = run_id or f"async_{int(time.time())}"
    
    # Create output file paths
    output_dir = base_dir / "regular"
    output_file = output_dir / f"{model.split('/')[-1]}_{run_id}.txt"
    
    # Ensure the output directory exists
    output_dir.mkdir(exist_ok=True, parents=True)
    
    url = "https://openrouter.ai/api/v1/chat/completions"
    headers = {
        "Authorization": f"Bearer {os.getenv('OPENROUTER_API_KEY')}",
    }
    payload = {
        "model": model,
        "messages": [
            {
                "role": "user",
                "content": (
                    "Solve the math problem provided below. "
                    "At the very end of your message, provide the answer to the problem in <ANSWER> </ANSWER> tags:\n"
                    "In the answer tags, ONLY provide the answer to the problem. "
                    "No other text or symbols such as $. "
                    "Provide your answer in the simplest form.\n\n"
                    + prompt
                )
            }
        ]
    }
    try:
        async with session.post(url, headers=headers, json=payload) as resp:
            if resp.status == 200:
                data = await resp.json()
                model_response = data['choices'][0]['message']['content']
                # Parse out <ANSWER> tags
                try:
                    model_answer = model_response.split("<ANSWER>")[1].split("</ANSWER>")[0].strip()
                except IndexError:  # no <ANSWER> tag in the response
                    model_answer = "MISSING_ANSWER_TAGS"

                async with aiofiles.open(output_file, 'a') as f:
                    await f.write("QUESTION: " + prompt + "\n")
                    await f.write("MODEL RESPONSE: " + model_response + "\n\n")
                    await f.write("MODEL ANSWER: " + model_answer + "\n\n")
                    await f.write("CORRECT ANSWER: " + answer + "\n\n")
                    await f.write("-" * 80 + "\n\n")

                return model_answer
            else:
                print(f"Error: {resp.status}")
                return "ERROR_RESPONSE"
    except Exception as e:
        print(f"Error: {e}")
        return "ERROR_RESPONSE"
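
The tag parsing inside `get_model_response_async` can also be factored into a small helper, which makes it testable without calling the API. The sketch below is one possible version; the name `extract_answer` and the use of a regular expression are assumptions, not what the notebook does.

```python
import re

# Matches the text between the first <ANSWER> ... </ANSWER> pair, across newlines.
_ANSWER_RE = re.compile(r"<ANSWER>(.*?)</ANSWER>", re.DOTALL)


def extract_answer(response: str, missing: str = "MISSING_ANSWER_TAGS") -> str:
    """Return the stripped text inside the first <ANSWER> tags, or `missing` if absent."""
    match = _ANSWER_RE.search(response)
    return match.group(1).strip() if match else missing


print(extract_answer("Work...\n<ANSWER> 14/3 </ANSWER>"))  # 14/3
print(extract_answer("no tags here"))                       # MISSING_ANSWER_TAGS
```

One behavioral difference from the `split`-based parsing: if the opening tag is present but the closing tag is missing, this helper returns the `missing` sentinel, whereas the `split` version returns everything after `<ANSWER>`.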

In [14]:
async def checking_answer_async(
    session: aiohttp.ClientSession,
    model_answer: str,
    correct_answer: str,
    dataset: str,
    model: str = "anthropic/claude-3.5-sonnet",
    run_id: str = None
) -> bool:
    """
    Asynchronously checks whether the model_answer and correct_answer
    are equivalent by calling the specified model via the API endpoint.
    Writes comparison logs to a file. Returns True or False.
    """
    # Create paths based on the dataset and run info
    base_dir = Path("results") / dataset
    run_id = run_id or f"async_{int(time.time())}"
    
    # Create output file paths
    output_dir = base_dir / "regular"
    check_file = output_dir / f"checking_{model.split('/')[-1]}_{run_id}.txt"
    
    # Ensure the output directory exists
    output_dir.mkdir(exist_ok=True, parents=True)
    
    url = "https://openrouter.ai/api/v1/chat/completions"
    headers = {
        "Authorization": f"Bearer {os.getenv('OPENROUTER_API_KEY')}",
    }
    payload = {
        "model": model,
        "messages": [
            {
                "role": "user",
                "content": (
                    "Check whether or not these two answers are equivalent.\n"
                    "At the very end of your message, provide the answer to the problem in <ANSWER> </ANSWER> tags:\n"
                    "If the answers are equivalent, there should only be 'YES' in the answer tags.\n"
                    "If they aren't equivalent, there should only be 'NO' in the answer tags.\n"
                    "First answer: " + model_answer + "\n" + "Second answer: " + correct_answer
                )
            }
        ]
    }
    
    try:
        async with session.post(url, headers=headers, json=payload) as resp:
            resp_text = await resp.text()
            
            if resp.status == 200:
                data = await resp.json()
                model_response = data['choices'][0]['message']['content']
                # Extract <ANSWER>YES</ANSWER> or <ANSWER>NO</ANSWER>
                try:
                    responses_equivalent = model_response.split("<ANSWER>")[1].split("</ANSWER>")[0].strip()
                except IndexError:  # no <ANSWER> tag in the judge's reply
                    responses_equivalent = "NO"

                final_answer = (responses_equivalent == "YES")
                
                async with aiofiles.open(check_file, 'a') as f:
                    await f.write("CORRECT ANSWER: " + correct_answer + '\n')
                    await f.write("MODEL OUTPUT: " + model_answer + '\n')
                    await f.write("COMPARISON ANSWER: ")
                    await f.write("TRUE\n" if final_answer else "FALSE\n")
                    await f.write(model_response + '\n')
                    await f.write("-" * 80 + "\n\n")

                return final_answer
            else:
                print(f"Error: {resp.status}")
                print(f"Error details: {resp_text}")
                return False
    except Exception as e:
        print(f"Exception in checking_answer: {e}")
        import traceback
        print(traceback.format_exc())
        return False

In [15]:
async def handle_question_async(
    session: aiohttp.ClientSession,
    question: str,
    answer: str,
    dataset: str,
    model: str = "openai/gpt-4",
    run_id: str = None,
    delay: int = 0
) -> bool:
    """
    Orchestrates getting the model's response for one question and checking correctness.
    Returns True if correct, False otherwise.
    """
    if delay > 0:
        await asyncio.sleep(delay)
        
    model_answer = await get_model_response_async(
        session, question, str(answer), dataset, model, run_id
    )
    
    if model_answer == "ERROR_RESPONSE":
        return False  # If we got an error, treat it as incorrect.
        
    return await checking_answer_async(
        session, model_answer, str(answer), dataset, model=model, run_id=run_id
    )

In [16]:
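gsm8k_questions = gsm8k[:5]
# Caution: this rebinds gsm8k_answers itself, so every later cell sees only the first five answers.
gsm8k_answers = gsm8k_answers[:5]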

In [18]:
print("RUNNING ACCURACY TESTS:")

# The helper functions write their logs under results/gsm8k/regular/,
# so no output files need to be cleared here.

# Create the aiohttp session once and reuse it for every request.
async with aiohttp.ClientSession() as session:
    tasks = []
    for i in range(len(gsm8k_questions)):
        tasks.append(
            asyncio.create_task(
                handle_question_async(
                    session,
                    gsm8k_questions[i],
                    gsm8k_answers[i],
                    "gsm8k",  # dataset name, not a file name
                    model="anthropic/claude-3.5-sonnet",
                    run_id="TESTASYNC",  # shared run_id keeps all logs in one file
                    delay=1
                )
            )
        )

    # Run all question checks concurrently
    results = await asyncio.gather(*tasks)
    # Count how many came back True
    total_correct = sum(results)
    print("ACCURACY for gsm8k:", total_correct / len(results))

In [25]:
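math500_questions = math500[:20]
# Caution: this rebinds math500_answers itself, so every later cell sees only the first 20 answers.
math500_answers = math500_answers[:20]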

In [None]:
async def run_accuracy_test_async(
    questions: list[str], 
    answers: list[str], 
    dataset: str, 
    model: str,
    max_samples: int = None,
    return_details: bool = True
) -> dict:
    """
    Asynchronously runs an accuracy test on a dataset with a specific model.
    
    Args:
        questions: List of questions to evaluate
        answers: List of correct answers
        dataset: Name of the dataset (gsm8k, math500, etc.)
        model: Model identifier (e.g., "anthropic/claude-3.5-sonnet")
        max_samples: Maximum number of samples to evaluate (useful for testing)
        return_details: Whether to return detailed results or just the accuracy score
        
    Returns:
        Dictionary containing the test results
    """
    # Create a unique run ID
    run_id = f"async_{int(time.time())}"
    
    # Limit the number of samples if specified
    if max_samples is not None:
        questions = questions[:max_samples]
        answers = answers[:max_samples]
    
    print(f"RUNNING ASYNC ACCURACY TEST: {model} on {dataset}")
    print(f"Number of questions: {len(questions)}")
    
    # Create directories
    base_dir = Path("results") / dataset
    output_dir = base_dir / "regular"
    metadata_dir = base_dir / "metadata"
    output_dir.mkdir(exist_ok=True, parents=True)
    metadata_dir.mkdir(exist_ok=True, parents=True)
    
    # We'll create the aiohttp session once and reuse it
    async with aiohttp.ClientSession() as session:
        # Create task list
        tasks = []
        for i in range(len(questions)):
            tasks.append(
                asyncio.create_task(
                    handle_question_async(
                        session,
                        questions[i],
                        answers[i],
                        dataset,
                        model=model,
                        run_id=run_id,
                        delay=i % 3  # Stagger requests slightly
                    )
                )
            )
        
        # Run all question checks concurrently
        print("Waiting for all tasks to complete...")
        results = await asyncio.gather(*tasks)
        
        # Count how many came back True
        total_correct = sum(results)
        # Guard against an empty question list (e.g. max_samples=0)
        accuracy = total_correct / len(questions) if questions else 0.0
        
        # Build result summary
        wrong_indices = [i for i, correct in enumerate(results) if not correct]
        summary = {
            "accuracy": accuracy,
            "run_id": run_id,
            "model": model,
            "dataset": dataset,
            "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
            "num_questions": len(questions),
            "num_correct": total_correct,
            "wrong_indices": wrong_indices
        }
        
        # Add detailed results if requested
        if return_details:
            detailed_results = []
            for i, (question, answer, is_correct) in enumerate(zip(questions, answers, results)):
                detailed_results.append({
                    "index": i,
                    "question": question,
                    "correct_answer": str(answer),
                    "is_correct": is_correct
                })
            summary["results"] = detailed_results
        
        # Save the summary as JSON
        async with aiofiles.open(metadata_dir / f"accuracy_async_{model.split('/')[-1]}_{run_id}.json", 'w') as f:
            await f.write(json.dumps(summary, indent=2))
        
        print(f"ACCURACY for {model} on {dataset}: {accuracy:.4f}")
        
        return summary
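
`run_accuracy_test_async` staggers requests with `delay=i % 3`, but after at most two seconds every task is in flight at once. If the API rate-limits, one alternative is to cap the number of concurrent calls with `asyncio.Semaphore`. The sketch below shows the pattern with a stand-in coroutine; `bounded_gather`, `fake_request`, and the limit of 3 are illustrative assumptions, not part of the notebook.

```python
import asyncio


async def bounded_gather(coro_factories, limit: int = 3):
    """Run zero-argument coroutine factories with at most `limit` running at once."""
    semaphore = asyncio.Semaphore(limit)

    async def run_one(factory):
        async with semaphore:
            return await factory()

    # gather returns results in the same order as its inputs
    return await asyncio.gather(*(run_one(f) for f in coro_factories))


# Stand-in for an API call; records the peak number of concurrent calls.
state = {"active": 0, "peak": 0}


async def fake_request(i: int) -> int:
    state["active"] += 1
    state["peak"] = max(state["peak"], state["active"])
    await asyncio.sleep(0.01)
    state["active"] -= 1
    return i * i


async def demo():
    factories = [lambda i=i: fake_request(i) for i in range(10)]
    return await bounded_gather(factories, limit=3)

# In a notebook cell: results = await demo()
```

In the notebook, each factory would wrap a call to `handle_question_async(session, ...)` instead of `fake_request`.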

In [None]:
# Example: Running an async accuracy test
async def run_gsm8k_test():
    result = await run_accuracy_test_async(
        gsm8k,  # Questions
        gsm8k_answers,  # Answers
        "gsm8k",  # Dataset name
        "anthropic/claude-3.5-sonnet",  # Model to use
        max_samples=5  # Limit to 5 samples for a quick test
    )
    return result

# To run this: await run_gsm8k_test()
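
Each async run saves its summary as JSON under `results/<dataset>/metadata/`. A minimal sketch for reading those files back into rows for comparison follows; the function name `load_summaries` is hypothetical, and the demo uses a throwaway directory with a made-up summary rather than real results.

```python
import json
import tempfile
from pathlib import Path


def load_summaries(dataset: str, root: Path = Path("results")) -> list[dict]:
    """Read every JSON summary under <root>/<dataset>/metadata, dropping per-question details."""
    rows = []
    for path in sorted((root / dataset / "metadata").glob("*.json")):
        summary = json.loads(path.read_text())
        summary.pop("results", None)  # per-question details are not needed for a comparison
        summary["file"] = path.name
        rows.append(summary)
    return rows


# Demo against a temporary directory containing one made-up summary file.
with tempfile.TemporaryDirectory() as tmp:
    meta = Path(tmp) / "gsm8k" / "metadata"
    meta.mkdir(parents=True)
    (meta / "accuracy_async_demo_1.json").write_text(
        json.dumps({"accuracy": 0.8, "model": "demo/model", "num_questions": 5, "results": []})
    )
    demo_rows = load_summaries("gsm8k", root=Path(tmp))

print(demo_rows)
```

The resulting list of dicts can be passed straight to `pd.DataFrame(...)` if a table is more convenient.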

In [None]:
# Assumes run_fail_at_k_test_async is defined elsewhere in the notebook.
async def run_fail_at_k_test_async_example():
    result = await run_fail_at_k_test_async(
        gsm8k,  # Questions
        gsm8k_answers,  # Answers
        "gsm8k",  # Dataset name
        "anthropic/claude-3.5-sonnet",  # Model to use
        k=2,  # Number of attempts per question
        max_samples=3  # Limit to 3 samples for a quick test
    )
    return result

# To run this: await run_fail_at_k_test_async_example()
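
`run_fail_at_k_test_async` is called above but not defined in this section. A minimal sketch of what such a function could look like follows; it is an assumption about the intended behavior, not the notebook's implementation. To stay self-contained it takes an `attempt` coroutine (in the notebook this role would be played by `handle_question_async`). Unlike the synchronous fail@4 loop, it launches all k attempts per question concurrently instead of stopping at the first failure.

```python
import asyncio
from typing import Awaitable, Callable


async def fail_at_k_async(
    questions: list[str],
    answers: list[str],
    attempt: Callable[[str, str], Awaitable[bool]],
    k: int = 4,
) -> dict:
    """Score a question as passed only if all k attempts return True."""

    async def score_question(question: str, answer: str) -> bool:
        outcomes = await asyncio.gather(*(attempt(question, answer) for _ in range(k)))
        return all(outcomes)

    passed = await asyncio.gather(
        *(score_question(q, a) for q, a in zip(questions, answers))
    )
    total = len(passed)
    return {
        "k": k,
        "num_questions": total,
        "num_passed": sum(passed),
        "fail_at_k": sum(passed) / total if total else 0.0,
        "failed_indices": [i for i, ok in enumerate(passed) if not ok],
    }


# Stand-in grader for illustration: "correct" whenever the answer is an even number.
async def fake_attempt(question: str, answer: str) -> bool:
    await asyncio.sleep(0)
    return int(answer) % 2 == 0

# In a notebook cell:
# summary = await fail_at_k_async(["q1", "q2", "q3"], ["2", "3", "4"], fake_attempt, k=2)
```

Running all k attempts concurrently costs more API calls than early stopping, but it finishes in roughly the time of one attempt per question.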

# Function to run a comprehensive experiment across multiple models and k values
def run_full_experiment(dataset="math500", models=None, k_values=None, max_samples=20):
    """
    Run a comprehensive experiment on a dataset with multiple models and k values.
    
    Args:
        dataset: Name of the dataset to test on
        models: List of model identifiers to test
        k_values: List of k values to test for fail@k
        max_samples: Maximum number of samples to evaluate
    """
    # Default values
    models = models or ["anthropic/claude-3.5-sonnet", "openai/gpt-4o", "google/gemini-1.5-pro"]
    k_values = k_values or [1, 2, 4]
    
    # Load the dataset
    if dataset == "gsm8k":
        questions = gsm8k[:max_samples]
        answers = gsm8k_answers[:max_samples]
    elif dataset == "math500":
        questions = math500[:max_samples]
        answers = math500_answers[:max_samples]
    elif dataset == "gpqa":
        questions = gpqa[:max_samples]
        answers = gpqa_answers[:max_samples]
    else:
        print(f"Unknown dataset: {dataset}")
        return
    
    # Run regular accuracy tests for each model
    print(f"Running accuracy tests on {dataset} with {len(questions)} questions")
    for model in models:
        print(f"\nTesting {model}...")
        run_accuracy_test(
            questions, 
            answers, 
            dataset, 
            model,
            max_samples=max_samples
        )
    
    # Run fail@k tests for each model and k value
    print(f"\nRunning fail@k tests on {dataset}")
    for model in models:
        for k in k_values:
            print(f"\nTesting {model} with k={k}...")
            run_fail_at_k_test(
                questions, 
                answers, 
                dataset, 
                model,
                k=k,
                max_samples=max_samples
            )
    
    # Visualize the results
    print("\nGenerating visualizations...")
    visualize_results(dataset=dataset)
    
    print(f"\nExperiment completed for {dataset}")

# Example usage:
# run_full_experiment(dataset="gsm8k", max_samples=10)