**IMPORTS AND DEPENDENCIES**

In [None]:
import os
import json
import random
import requests
import pandas as pd
import re
import matplotlib.pyplot as plt
from tqdm import tqdm
import time
import numpy as np
import zipfile
import glob

**GLOBAL CONFIGURATION**

In [None]:
# SECURITY: the Groq API key is read from the environment and must never be
# committed to source control. Export GROQ_API_KEY before running this
# notebook. Any key that was previously hard-coded here should be treated as
# compromised and rotated.
GROQ_API_KEY = os.getenv("GROQ_API_KEY")
if not GROQ_API_KEY:
    print("Warning: GROQ_API_KEY is not set; Groq API requests will fail authentication.")
GROQ_ENDPOINT = "https://api.groq.com/openai/v1/chat/completions"

# Base URL of the raw dataset files; load_data() appends a file name to it.
BASE_URL = "https://raw.githubusercontent.com/chen700564/RGB/master/data/"

# Dataset file names, resolved relative to BASE_URL.
INFO_INT_FILE = "en_int.json"
FACT_FILE = "en_fact.json"
EN_FILE = "en_refine.json"
ZH_FILE = "zh_refine.json"

In [None]:
# Evaluation parameters
SAMPLE_SIZE = 100  # Adjustable per evaluation
MAX_DOCS = 5  # Number of documents per prompt; read directly by add_noise()
RANDOM_SEED = 42  # presumably passed to set_seeds() by the driver code — confirm against caller
TEMPERATURE = 0.1  # Easy to change - 0.0 (deterministic) to 1.0 (creative); copied into CONFIG["temperature"]

# RGB Noise Ratios - Following RGB paper methodology
# Each ratio is the fraction of the document slots filled with negative documents.
NOISE_RATIOS = [0.0, 0.2, 0.4]  # Information Integration noise ratios
NOISE_RATIOS_FULL = [0.0, 0.2, 0.4, 0.6, 0.8]  # Full noise testing; iterated by eval_noise_test()

In [None]:
# Model identifiers to benchmark. Presumably each entry is passed as the
# `model` argument of the evaluation functions and ends up in the "model"
# field of the request payload — the driver loop is not visible here.
MODELS_TO_EVALUATE = [
    "llama-3.1-8b-instant",
    "qwen/qwen3-32b",
    "llama-3.3-70b-versatile",
    "deepseek-r1-distill-llama-70b",
    "gemma2-9b-it"
]

In [None]:
# Request settings read by query_model().
CONFIG = {
    "min_delay": 0.5,  # minimum seconds between two API calls (client-side rate limiting)
    "max_retries": 3,  # number of attempts per request
    "timeout": 60,  # per-request timeout in seconds, passed to requests.post
    "max_context_length": 12000,  # character budget handed to truncate_documents()
    "temperature": TEMPERATURE,  # sampling temperature sent in the payload
    "max_tokens": 512  # completion length limit sent in the payload
}

# time.time() of the most recent API call; 0 means "never called", so the
# first request is not delayed.
LAST_API_CALL = 0

**UTILITY FUNCTIONS**

In [None]:
def set_seeds(seed_value=42):
    """Seed both Python's `random` module and NumPy's global RNG.

    Args:
        seed_value: Seed applied to `random.seed` and `np.random.seed`.

    Prints a confirmation line with the seed that was applied.
    """
    for apply_seed in (random.seed, np.random.seed):
        apply_seed(seed_value)
    print(f"Random seeds set to: {seed_value}")

def load_data(filename, timeout=60):
    """Download a dataset file from BASE_URL and parse it as JSON or JSON Lines.

    The body is first parsed as a single JSON document. If that fails, it is
    parsed line by line (JSON Lines); blank lines are ignored and malformed
    lines are skipped, with a warning reporting how many were dropped.

    Args:
        filename: File name appended to BASE_URL.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout `requests.get` can block forever on a stalled connection.

    Returns:
        The parsed JSON document, or a list with one parsed object per valid
        line when the file is in JSON Lines format.

    Raises:
        Exception: If the server responds with a status other than 200.
        requests.exceptions.RequestException: On connection errors/timeouts.
    """
    url = BASE_URL + filename
    response = requests.get(url, timeout=timeout)

    if response.status_code != 200:
        raise Exception(f"Failed to fetch {filename}. Status: {response.status_code}")

    text = response.text.strip()

    try:
        return json.loads(text)
    except json.JSONDecodeError:
        # Not a single JSON document: fall back to one JSON object per line.
        records = []
        skipped = 0
        for line in text.splitlines():
            line = line.strip()
            if not line:
                continue
            try:
                records.append(json.loads(line))
            except json.JSONDecodeError:
                skipped += 1
        if skipped:
            print(f"Warning: skipped {skipped} malformed line(s) in {filename}")
        return records

def truncate_documents(docs, max_length=3500):
    """Shorten documents so their combined text roughly fits `max_length` characters.

    If the documents joined with blank lines already fit, the input list is
    returned unchanged. Otherwise every document gets an equal share of the
    budget (`max_length // len(docs)`); a document over its share is cut at
    the last period inside the share when that period lies in the final 20%
    of it, and otherwise cut hard with "..." appended.

    Args:
        docs: List of document strings (may be empty or None).
        max_length: Total character budget.

    Returns:
        A list of (possibly shortened) document strings; [] for empty input.
    """
    if not docs:
        return []

    if len("\n\n".join(docs)) <= max_length:
        return docs

    share = max_length // len(docs)

    def _shorten(text):
        # Documents within their share are passed through untouched.
        if len(text) <= share:
            return text
        head = text[:share]
        sentence_end = head.rfind('.')
        # Prefer ending on a sentence boundary, but only if little text is lost.
        if sentence_end > share * 0.8:
            return text[:sentence_end + 1]
        return head + "..."

    return [_shorten(text) for text in docs]

def query_model(question, docs, model, language="en", system_prompt=None):
    """Ask a Groq-hosted chat model a question grounded in the given documents.

    Request failures are reported through the return value rather than raised:
    every failure path returns a string starting with "API_ERROR".

    Args:
        question: The question text placed after the documents in the user message.
        docs: List of document strings; an empty list short-circuits with an error string.
        model: Model identifier sent in the "model" field of the payload.
        language: Selects the default system prompt; "en" gives the English
            prompt, any other value gives the Chinese one.
        system_prompt: Optional system prompt overriding the language default.

    Returns:
        The stripped content of the first choice, or an "API_ERROR: ..." string.
    """
    global LAST_API_CALL

    if not docs:
        return "API_ERROR: No documents provided"

    # Rate limiting: wait until at least CONFIG["min_delay"] seconds have
    # passed since the previous call recorded in LAST_API_CALL.
    elapsed = time.time() - LAST_API_CALL
    if elapsed < CONFIG["min_delay"]:
        time.sleep(CONFIG["min_delay"] - elapsed)

    # Prepare documents: shorten to the character budget, then number them.
    truncated_docs = truncate_documents(docs, CONFIG["max_context_length"])
    docs_text = "\n\n".join([f"Document {i+1}:\n{doc}" for i, doc in enumerate(truncated_docs)])

    # Use provided system prompt or default
    if system_prompt is None:
        if language == "en":
            system_prompt = (
                "Answer the question based on the given documents. "
                "If not found, say: 'I can not answer the question because of the insufficient information in documents.'"
            )
        else:
            system_prompt = "请根据以下文档回答问题。如果无法回答，请回复：我无法回答这个问题，因为文档中没有足够的信息。"

    user_message = f"Document:\n{docs_text}\n\nQuestion:\n{question}"

    payload = {
        "model": model,
        "messages": [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_message}
        ],
        "temperature": CONFIG["temperature"],
        "max_tokens": CONFIG["max_tokens"]
    }

    headers = {
        "Authorization": f"Bearer {GROQ_API_KEY}",
        "Content-Type": "application/json"
    }

    for attempt in range(CONFIG["max_retries"]):
        try:
            response = requests.post(
                GROQ_ENDPOINT,
                headers=headers,
                json=payload,
                timeout=CONFIG["timeout"]
            )

            # Recorded as soon as the request returns (even for HTTP error
            # statuses); not updated when requests.post itself raises.
            LAST_API_CALL = time.time()
            response.raise_for_status()
            data = response.json()

            # A response without choices is retried immediately (no sleep).
            if "choices" not in data or len(data["choices"]) == 0:
                if attempt == CONFIG["max_retries"] - 1:
                    return "API_ERROR: Invalid response structure"
                continue

            content = data["choices"][0]["message"]["content"].strip()
            return content if content else "API_ERROR: Empty response"

        except requests.exceptions.Timeout:
            # Exponential backoff: 1s, 2s, 4s, ...
            if attempt == CONFIG["max_retries"] - 1:
                return "API_ERROR: Timeout"
            time.sleep(2 ** attempt)

        except requests.exceptions.HTTPError as e:
            if e.response.status_code == 429:
                # Rate limited by the server: linear backoff of 5s, 10s, ...
                if attempt == CONFIG["max_retries"] - 1:
                    return "API_ERROR: Rate limited"
                time.sleep(5 * (attempt + 1))
            elif e.response.status_code == 413:
                # Payload too large: retrying the same payload cannot help.
                return "API_ERROR: Context too long"
            else:
                # Every other HTTP error is returned without retrying.
                return f"API_ERROR: HTTP {e.response.status_code}"

        except Exception as e:
            # Anything else (connection errors, malformed JSON, unexpected
            # response shape) is retried after a short fixed pause.
            if attempt == CONFIG["max_retries"] - 1:
                return f"API_ERROR: {str(e)}"
            time.sleep(1)

    return "API_ERROR: Max retries exceeded"

def safe_sample(population, k):
    """Draw up to `k` items from `population` without ever raising.

    Args:
        population: Sequence to sample from.
        k: Requested number of items.

    Returns:
        [] when the population is empty or `k` is not positive; a copy of the
        whole population when it holds `k` items or fewer; otherwise a random
        sample of `k` distinct positions.
    """
    if not population or k <= 0:
        return []
    # Asking for everything (or more) just hands back a copy, in order.
    return population[:] if len(population) <= k else random.sample(population, k)

**INFORMATION INTEGRATION EVALUATION**

In [None]:
def get_information_integration_system_prompt():
    """Return the system instruction used for the Information Integration test.

    The prompt tells the model to answer from the documents, to emit a fixed
    refusal sentence when they lack the answer, and to flag factual errors.
    """
    return (
        "You are an accurate and reliable AI assistant that can answer questions with the help of external documents. "
        "Please note that external documents may contain noisy or factually incorrect information. "
        "If the information in the document contains the correct answer, you will give an accurate answer. "
        "If the information in the document does not contain the answer, you will generate "
        "'I can not answer the question because of the insufficient information in documents.' "
        "If there are inconsistencies with the facts in some of the documents, please generate the response "
        "'There are factual errors in the provided documents.' and provide the correct answer."
    )

def prepare_documents(positive_docs, negative_docs, noise_ratio=0.0, max_docs=5):
    """Build a shuffled document set with a given share of negative documents.

    `int(max_docs * noise_ratio)` slots are reserved for negative documents
    and the rest for positive ones. When one pool is too small to fill its
    quota, the free slots are backfilled from documents that have not been
    selected yet (positives first, then negatives), so a document never
    appears twice unless the input itself contains it twice.

    Args:
        positive_docs: Positive documents; each entry may be a string or a
            list of strings (nested groups are flattened).
        negative_docs: Iterable of negative (noise) documents.
        noise_ratio: Proportion of negative documents (0.0 = no noise).
        max_docs: Maximum number of documents to return.

    Returns:
        A shuffled list of at most `max_docs` documents.
    """
    if max_docs <= 0:
        return []

    all_positive = []
    for doc_group in positive_docs:
        if isinstance(doc_group, list):
            all_positive.extend(doc_group)
        else:
            all_positive.append(doc_group)

    all_negative = list(negative_docs)

    # Clamp the quota so out-of-range ratios cannot yield negative counts.
    num_negative = max(0, min(max_docs, int(max_docs * noise_ratio)))
    num_positive = max_docs - num_negative

    # Shuffling a copy and slicing is sampling without replacement, and it
    # leaves the unselected remainder available for backfilling.
    random.shuffle(all_positive)
    random.shuffle(all_negative)

    selected = all_positive[:num_positive] + all_negative[:num_negative]

    shortfall = max_docs - len(selected)
    if shortfall > 0:
        leftovers = all_positive[num_positive:] + all_negative[num_negative:]
        selected.extend(leftovers[:shortfall])

    # Randomise the final order so positives and negatives are interleaved.
    random.shuffle(selected)
    return selected[:max_docs]

def evaluate_information_integration(response, expected_answers):
    """Classify a model response for the Information Integration test.

    Each entry of `expected_answers` is one required answer component. An
    entry may be a list of acceptable variants or a single bare value; a bare
    string is matched as a whole (never character by character). Variants are
    compared case-insensitively as substrings of the response; non-string
    variants are converted with `str()` and blank variants are ignored.

    Args:
        response: Raw model output (or an "API_ERROR..." string).
        expected_answers: Sequence of components, e.g. [["a", "A."], "b"].

    Returns:
        One of "ERROR", "REJECTED", "FULL_SUCCESS", "PARTIAL_SUCCESS",
        "FACTUAL_ERROR_DETECTED" or "FAILURE". With no expected components
        the result is never a success.
    """
    if "API_ERROR" in response:
        return "ERROR"

    response_lower = response.lower()

    # A refusal takes precedence over any answer text in the response.
    rejection_phrases = (
        "insufficient information",
        "cannot answer",
        "can not answer",
        "not enough information",
        "unable to answer",
        "i can not answer the question because of the insufficient information in documents",
    )
    if any(phrase in response_lower for phrase in rejection_phrases):
        return "REJECTED"

    factual_errors_patterns = (
        "there are factual errors in the provided documents",
        "factual errors in the provided documents",
    )
    has_factual_errors_claim = any(
        pattern in response_lower for pattern in factual_errors_patterns
    )

    components_found = 0
    for answer_group in expected_answers:
        # Normalise a bare value to a one-element variant list; iterating a
        # string directly would test its individual characters instead.
        if isinstance(answer_group, (list, tuple)):
            variants = answer_group
        else:
            variants = [answer_group]

        for answer_variant in variants:
            needle = str(answer_variant).lower().strip()
            # An empty needle is a substring of everything; skip it.
            if needle and needle in response_lower:
                components_found += 1
                break

    total_components = len(expected_answers)

    if total_components > 0 and components_found == total_components:
        return "FULL_SUCCESS"
    if components_found > 0:
        return "PARTIAL_SUCCESS"
    if has_factual_errors_claim:
        return "FACTUAL_ERROR_DETECTED"
    return "FAILURE"

def evaluate_rgb_information_integration(dataset, model, sample_size, max_docs, noise_ratio):
    """Run the Information Integration evaluation for one model and one noise ratio.

    Samples are taken from the front of `dataset`; a sample qualifies when it
    has an answer and at least two positive and two negative documents. Each
    qualifying sample is turned into a document set, sent to the model and
    classified.

    Args:
        dataset: Iterable of sample dicts ("query", "answer", "positive", "negative").
        model: Model identifier forwarded to query_model().
        sample_size: Target number of samples to evaluate.
        max_docs: Number of documents per prompt.
        noise_ratio: Share of negative documents in each prompt.

    Returns:
        (stats, results): per-category counters including "total", and a list
        with one result dict per evaluated sample.
    """
    print(f"\nEvaluating {model} for RGB Information Integration (Noise: {noise_ratio})")

    # Collect qualifying samples, stopping once the target has been reached.
    eligible = []
    for candidate in dataset:
        positives = candidate.get("positive")
        negatives = candidate.get("negative")
        if (positives and negatives and candidate.get("answer")
                and len(positives) >= 2 and len(negatives) >= 2):
            eligible.append(candidate)
        if len(eligible) >= sample_size:
            break

    if len(eligible) < sample_size:
        print(f"Warning: Only found {len(eligible)} quality samples (target: {sample_size})")

    actual_sample_size = min(sample_size, len(eligible))
    batch = eligible[:actual_sample_size]

    results = []
    stats = dict.fromkeys(
        ("FULL_SUCCESS", "PARTIAL_SUCCESS", "FAILURE", "REJECTED",
         "FACTUAL_ERROR_DETECTED", "ERROR", "total"),
        0,
    )

    progress_bar = tqdm(total=actual_sample_size, desc=f"{model} (noise={noise_ratio})")

    for index, item in enumerate(batch):
        try:
            query = item["query"]
            expected_answers = item["answer"]

            docs = prepare_documents(
                item["positive"],
                item["negative"],
                noise_ratio=noise_ratio,
                max_docs=max_docs
            )
            if not docs:
                continue

            response = query_model(
                query, docs, model,
                system_prompt=get_information_integration_system_prompt(),
            )
            verdict = evaluate_information_integration(response, expected_answers)

            stats[verdict] += 1
            stats["total"] += 1

            results.append({
                "sample_id": item.get('id', index + 1),
                "model": model,
                "query": query,
                "response": response,
                "evaluation": verdict,
                "expected_answers": expected_answers,
                "components_expected": len(expected_answers),
                "noise_ratio": noise_ratio,
                "docs_used": len(docs),
                "temperature": CONFIG["temperature"],
                # Only the first two documents are kept, to limit result size.
                "documents": " ||| ".join(docs[:2])
            })
            progress_bar.update(1)

        except Exception as e:
            # Best effort: a failing sample is reported and skipped.
            print(f"\nError processing sample {index}: {e}")
            continue

    progress_bar.close()

    evaluated = stats["total"]
    if evaluated > 0:
        full_success_rate = stats["FULL_SUCCESS"] / evaluated * 100
        partial_success_rate = stats["PARTIAL_SUCCESS"] / evaluated * 100
        combined_rate = full_success_rate + partial_success_rate

        print(f"  Noise {noise_ratio}: Full: {full_success_rate:5.1f}%, "
              f"Partial: {partial_success_rate:5.1f}%, "
              f"Total: {combined_rate:5.1f}%")

    return stats, results

**FACTUAL ACCURACY EVALUATION**

In [None]:
def get_factual_system_prompt():
    """Return the system instruction used for the factual accuracy test.

    The prompt tells the model to answer from the documents, to emit a fixed
    refusal sentence when they lack the answer, and to flag factual errors.
    """
    return (
        "You are an accurate and reliable AI assistant that can answer questions with the help of external documents. "
        "Please note that external documents may contain noisy or factually incorrect information. "
        "If the information in the document contains the correct answer, you will give an accurate answer. "
        "If the information in the document does not contain the answer, you will generate "
        "'I can not answer the question because of the insufficient information in documents.'. "
        "If there are inconsistencies with the facts in some of the documents, please generate the response "
        "'There are factual errors in the provided documents.' and provide the correct answer."
    )

def normalize_answer(answer):
    """Coerce an answer field to a plain string.

    Args:
        answer: None, a list of answers, or any other value.

    Returns:
        "" for None or an empty list; the string form of the first element
        for a non-empty list; otherwise `str(answer)`.
    """
    if answer is None:
        return ""
    if isinstance(answer, list):
        # Only the first listed answer is used.
        return str(answer[0]) if answer else ""
    return str(answer)

def evaluate_answer(response, correct_answer, fake_answer):
    """Classify a model response for the factual accuracy test.

    Checks run in this order: API error, appeal to outside knowledge,
    explicit refusal, then presence of the correct and/or fake answer
    (case-insensitive substring match), taking into account whether the
    model claimed that the documents contain factual errors.

    Args:
        response: Raw model output (or an "API_ERROR..." string).
        correct_answer: Ground-truth answer (string, list or other).
        fake_answer: The planted wrong answer (string, list or other).

    Returns:
        One of "ERROR", "REJECTED", "CORRECT", "INCORRECT", "PARTIAL",
        "NO_ANSWER".
    """
    if "API_ERROR" in response:
        return "ERROR"

    answer_text = response.lower()
    truth = normalize_answer(correct_answer).lower()
    decoy = normalize_answer(fake_answer).lower()

    # Phrases signalling that the model relied on knowledge outside the documents.
    outside_knowledge_markers = (
        "based on external knowledge",
        "based on general knowledge",
        "according to external information",
        "based on external sources",
        "according to multiple reliable sources",
        "based on multiple sources",
        "multiple reliable sources",
        "however, the correct answer is",
        "but actually",
        "in reality",
        "the actual winner was",
        "according to multiple sources",
        "based on common knowledge",
        "from what i know",
        "as far as i know",
        "to my knowledge",
        "however, based on",
        "but the actual",
        "the real answer is",
        "according to external",
        "based on external",
        "external knowledge",
        "multiple sources",
        "reliable sources",
    )
    if any(marker in answer_text for marker in outside_knowledge_markers):
        return "REJECTED"

    refusal_markers = (
        "i can not answer the question because of the insufficient information in documents",
        "i cannot answer the question because of the insufficient information in documents",
        "i can not answer the question because of insufficient information in documents",
        "i cannot answer the question because of insufficient information in documents",
        "insufficient information in documents",
        "i can not answer",
        "i cannot answer",
    )
    if any(marker in answer_text for marker in refusal_markers):
        return "REJECTED"

    error_claim_markers = (
        "there are factual errors in the provided documents",
        "factual errors in the provided documents",
        "there are factual errors",
        "factual errors",
    )
    claims_errors = any(marker in answer_text for marker in error_claim_markers)

    mentions_truth = bool(truth) and truth in answer_text
    mentions_decoy = bool(decoy) and decoy in answer_text

    if mentions_truth and mentions_decoy:
        if claims_errors:
            # With an error claim, a closing sentence that states only the
            # correct answer counts as a clear final answer.
            closing_sentences = answer_text.split('.')[-3:]
            for sentence in reversed(closing_sentences):
                if truth in sentence and decoy not in sentence:
                    return "CORRECT"
        return "PARTIAL"

    if mentions_truth:
        return "CORRECT"
    if mentions_decoy:
        return "INCORRECT"

    # Neither answer present: an error claim without an answer is a rejection.
    return "REJECTED" if claims_errors else "NO_ANSWER"

def create_document_sets(sample, max_docs):
    """Build the per-scenario document sets for one sample.

    Scenarios (each only present when enough documents exist):
      * "all_positive" / "all_negative" / "all_wrong": up to `max_docs`
        documents from a single pool that holds at least 3 documents.
      * "mixed": `max_docs` documents drawn from all pools combined.
      * "pos_wrong_mix": positives and wrong documents shuffled together,
        requiring at least 2 of each and at least 4 documents in total.

    Args:
        sample: Dict with optional "positive", "negative", "positive_wrong" lists.
        max_docs: Maximum number of documents per set.

    Returns:
        Dict mapping scenario name to its list of documents.
    """
    positives = sample.get("positive", [])
    negatives = sample.get("negative", [])
    wrongs = sample.get("positive_wrong", [])

    scenarios = {}

    # Single-pool scenarios share the same rule: at least 3 docs available.
    single_pool_scenarios = (
        ("all_positive", positives),
        ("all_negative", negatives),
        ("all_wrong", wrongs),
    )
    for label, pool in single_pool_scenarios:
        if len(pool) >= 3:
            scenarios[label] = safe_sample(pool, min(max_docs, len(pool)))

    everything = positives + negatives + wrongs
    if len(everything) >= max_docs:
        scenarios["mixed"] = safe_sample(everything, max_docs)

    if len(positives) >= 2 and len(wrongs) >= 2:
        positive_quota = min(3, len(positives), max_docs // 2 + 1)
        wrong_quota = min(max_docs - positive_quota, len(wrongs))

        # Fewer than 4 documents is not considered a viable mix.
        if positive_quota + wrong_quota >= 4:
            blend = safe_sample(positives, positive_quota) + safe_sample(wrongs, wrong_quota)
            random.shuffle(blend)
            scenarios["pos_wrong_mix"] = blend

    return scenarios

def evaluate_rgb_factual_accuracy(dataset, model, sample_size, max_docs):
    """Run the factual accuracy evaluation for one model across all scenarios.

    A sample qualifies when it has an answer, a fake answer and at least two
    positive, negative and wrong documents. For each qualifying sample every
    scenario produced by create_document_sets() is sent to the model and the
    response is classified with evaluate_answer().

    Args:
        dataset: Iterable of sample dicts.
        model: Model identifier forwarded to query_model().
        sample_size: Target number of samples to evaluate.
        max_docs: Maximum number of documents per prompt.

    Returns:
        (scenario_stats, results): per-scenario counters (including "total")
        and a list with one result dict per evaluated scenario.
    """
    print(f"\nEvaluating {model} for RGB factual accuracy")
    print(f"Sample size: {sample_size}, Max docs per question: {max_docs}")

    required_fields = ("positive", "negative", "positive_wrong", "answer", "fakeanswer")
    document_fields = required_fields[:3]

    def _qualifies(entry):
        # All fields must be present and non-empty, with >= 2 docs per pool.
        if not all(entry.get(field) for field in required_fields):
            return False
        return all(len(entry.get(field, [])) >= 2 for field in document_fields)

    eligible = []
    for candidate in dataset:
        if _qualifies(candidate):
            eligible.append(candidate)
        if len(eligible) >= sample_size:
            break

    if len(eligible) < sample_size:
        print(f"Warning: Only found {len(eligible)} quality samples (target: {sample_size})")

    actual_sample_size = min(sample_size, len(eligible))
    batch = eligible[:actual_sample_size]

    results = []

    scenarios = ["all_positive", "all_negative", "all_wrong", "mixed", "pos_wrong_mix"]
    eval_types = ["CORRECT", "INCORRECT", "REJECTED", "PARTIAL", "NO_ANSWER", "ERROR"]
    scenario_stats = {
        name: {**{label: 0 for label in eval_types}, "total": 0}
        for name in scenarios
    }

    # Dry run over the batch to learn how many evaluations the bar should expect.
    available_scenarios = set()
    total_evaluations = 0
    for item in batch:
        preview = create_document_sets(item, max_docs)
        available_scenarios.update(preview.keys())
        total_evaluations += len(preview)

    print(f"Available scenarios: {sorted(available_scenarios)}")
    print(f"Total evaluations: {total_evaluations}")

    progress_bar = tqdm(total=total_evaluations, desc=f"{model}")

    for index, item in enumerate(batch):
        try:
            query = item["query"]

            for scenario_name, docs in create_document_sets(item, max_docs).items():
                if not docs:
                    continue

                response = query_model(
                    query, docs, model,
                    system_prompt=get_factual_system_prompt(),
                )
                verdict = evaluate_answer(response, item["answer"], item["fakeanswer"])

                counters = scenario_stats[scenario_name]
                counters[verdict] += 1
                counters["total"] += 1

                results.append({
                    "sample_id": index + 1,
                    "model": model,
                    "query": query,
                    "scenario": scenario_name,
                    "response": response,
                    "evaluation": verdict,
                    "correct_answer": normalize_answer(item["answer"]),
                    "fake_answer": normalize_answer(item["fakeanswer"]),
                    "docs_used": len(docs),
                    # Only the first two documents are kept, to limit result size.
                    "documents": " ||| ".join(docs[:2])
                })
                progress_bar.update(1)

        except Exception as e:
            # Best effort: a failing sample is reported and skipped.
            print(f"\nError processing sample {index}: {e}")
            continue

    progress_bar.close()

    print(f"\n{model} RGB Results:")
    print("-" * 60)

    for scenario in scenarios:
        counters = scenario_stats[scenario]
        total = counters["total"]

        if total <= 0:
            print(f"{scenario:15s}: No samples available")
            continue

        correct = counters["CORRECT"]
        accuracy = correct / total * 100
        error_rate = counters["INCORRECT"] / total * 100
        rejection_rate = counters["REJECTED"] / total * 100

        print(f"{scenario:15s}: Acc: {accuracy:5.1f}% ({correct:2d}/{total:2d}), "
              f"Err: {error_rate:5.1f}%, Rej: {rejection_rate:5.1f}%")

    return scenario_stats, results

**NOISE ROBUSTNESS EVALUATION**

In [None]:
def extract_answers(answer_structure, language):
    """Flatten an answer structure into a list of cleaned answer strings.

    Args:
        answer_structure: Iterable whose items are strings or lists of
            strings; items of any other type are ignored.
        language: "en" lower-cases the answers; any other value keeps case.

    Returns:
        Flat list of answers with surrounding whitespace removed.
    """
    flattened = []
    for entry in answer_structure:
        if isinstance(entry, list):
            flattened += entry
        elif isinstance(entry, str):
            flattened.append(entry)

    fold_case = language == "en"
    return [(text.lower() if fold_case else text).strip() for text in flattened]

def add_noise(positive_docs, negative_docs, noise_ratio, sample_id=0):
    """Select up to MAX_DOCS documents with a given share of negative ones.

    The global `random` and NumPy generators are re-seeded from `sample_id`
    before selection, and `random` is re-seeded again (offset by the noise
    ratio) before the final shuffle, so the result is a deterministic
    function of the arguments. Note this resets the global RNG state.

    Args:
        positive_docs: List of positive documents.
        negative_docs: List of negative (noise) documents.
        noise_ratio: Share of the MAX_DOCS slots reserved for negatives.
        sample_id: Index of the sample, used to derive the seeds.

    Returns:
        Shuffled list of at most MAX_DOCS documents.
    """
    sample_seed = 42 + sample_id
    random.seed(sample_seed)
    np.random.seed(sample_seed)

    if noise_ratio == 0:
        # The sample already holds min(MAX_DOCS, len(positive_docs)) items,
        # so there is nothing left to top up in this branch.
        chosen = random.sample(positive_docs, min(MAX_DOCS, len(positive_docs)))
    else:
        negative_quota = int(MAX_DOCS * noise_ratio)
        positive_quota = MAX_DOCS - negative_quota

        chosen = []
        if positive_quota > 0 and positive_docs:
            chosen += random.sample(positive_docs, min(positive_quota, len(positive_docs)))
        if negative_quota > 0 and negative_docs:
            chosen += random.sample(negative_docs, min(negative_quota, len(negative_docs)))

        # Top up with unused positives while slots remain and the selection
        # is still smaller than the positive pool.
        while len(chosen) < MAX_DOCS and len(chosen) < len(positive_docs):
            unused_positives = [doc for doc in positive_docs if doc not in chosen]
            if not unused_positives:
                break
            chosen.append(random.choice(unused_positives))

    chosen = chosen[:MAX_DOCS]

    # Shuffle with a seed that depends on both the sample and the noise ratio.
    random.seed(sample_seed + int(noise_ratio * 100))
    random.shuffle(chosen)

    return chosen

def check_correct(predicted, true_answers, language):
    """Decide whether a prediction contains any of the accepted answers.

    A prediction is wrong outright when it is blank, is an API error, or
    contains a refusal phrase. Otherwise each non-blank answer is tried with
    these strategies, in order, and the first hit wins:

      1. the answer is a substring of the prediction;
      2. every word token of the answer is a token of the prediction;
      3. the answer contains numbers and all of them occur in the prediction
         (lenient: the non-numeric part of the answer is not compared);
      4. for multi-word answers, every word longer than two characters is a
         substring of the prediction.

    Matching is case-insensitive only when `language` is "en".

    Args:
        predicted: Raw model output.
        true_answers: Iterable of accepted answer strings.
        language: "en" or "zh" ("zh" adds Chinese refusal phrases).

    Returns:
        True if the prediction is judged correct, else False.
    """
    if not predicted.strip() or "API_ERROR" in predicted:
        return False

    rejection_phrases = [
        "i can not answer", "insufficient information", "cannot answer",
        "don't know", "not enough information", "unable to answer",
        "not found", "unclear", "not available", "not provided"
    ]

    if language == "zh":
        rejection_phrases.extend([
            "无法回答", "信息不足", "不知道", "无法", "不能回答",
            "文档中没有", "不确定", "不清楚", "没有提供", "找不到"
        ])

    is_english = language == "en"
    processed_pred = predicted.lower().strip() if is_english else predicted.strip()

    if any(phrase in processed_pred for phrase in rejection_phrases):
        return False

    find_tokens = re.compile(r'\b\w+\b').findall
    find_numbers = re.compile(r'\d+(?:\.\d+)?').findall

    # The prediction does not change between answers, so analyse it once.
    pred_tokens = set(find_tokens(processed_pred))
    pred_numbers = set(find_numbers(processed_pred))

    for ans in true_answers:
        ans_clean = ans.lower().strip() if is_english else ans.strip()

        # A blank answer would be a substring of every prediction.
        if not ans_clean:
            continue

        # Strategy 1: direct substring match. This also covers whole-word
        # matches of short answers, so no separate word-boundary pass is needed.
        if ans_clean in processed_pred:
            return True

        # Strategy 2: every answer token appears as a prediction token.
        ans_tokens = find_tokens(ans_clean)
        if ans_tokens and all(token in pred_tokens for token in ans_tokens):
            return True

        # Strategy 3: every number of the answer appears in the prediction.
        ans_numbers = find_numbers(ans_clean)
        if ans_numbers and all(num in pred_numbers for num in ans_numbers):
            return True

        # Strategy 4: multi-word answers match when all significant words occur.
        words = ans_clean.split()
        if len(words) > 1:
            significant = [word for word in words if len(word) > 2]
            if significant and all(word in processed_pred for word in significant):
                return True

    return False

def eval_noise_test(dataset, model, language, sample_size, base_seed=42):
    """Measure answer accuracy of a model at every ratio in NOISE_RATIOS_FULL.

    The first `sample_size` samples of `dataset` are filtered to those with
    at least two positive and three negative documents. Every remaining
    sample is evaluated once per noise ratio; document selection is seeded
    per sample by add_noise().

    Args:
        dataset: Sequence of sample dicts ("query", "answer", "positive", "negative").
        model: Model identifier forwarded to query_model().
        language: "en" or "zh"; selects the prompt and matching rules.
        sample_size: Number of leading samples to consider before filtering.
        base_seed: Seed applied through set_seeds() before evaluation.

    Returns:
        (results, details): accuracy in percent per noise ratio, and a list
        with one detail dict per evaluated (sample, ratio) pair. When no
        sample passes the filter every accuracy is reported as 0.0.
    """
    results = {}
    details = []

    # Set initial seed for dataset filtering
    set_seeds(base_seed)

    filtered_dataset = [
        sample for sample in dataset[:sample_size]
        if len(sample.get('positive', [])) >= 2
        and len(sample.get('negative', [])) >= 3
    ]

    print(f"Filtered to {len(filtered_dataset)} quality samples from {min(sample_size, len(dataset))}")
    if not filtered_dataset:
        print("Warning: no samples passed the quality filter; accuracies are reported as 0.0")

    total = len(filtered_dataset)
    total_evaluations = len(NOISE_RATIOS_FULL) * total
    progress_bar = tqdm(total=total_evaluations, desc=f"{model} {language.upper()}")

    try:
        for ratio in NOISE_RATIOS_FULL:
            correct = 0
            api_errors = 0

            for i, sample in enumerate(filtered_dataset):
                question = sample["query"]
                true_answers = extract_answers(sample["answer"], language)

                # Use sample ID to ensure consistent document selection
                docs = add_noise(sample["positive"], sample["negative"], ratio, sample_id=i)
                predicted = query_model(question, docs, model, language)

                if "API_ERROR" in predicted:
                    # API failures count as wrong answers but are tallied separately.
                    api_errors += 1
                    is_correct = False
                else:
                    is_correct = check_correct(predicted, true_answers, language)

                if is_correct:
                    correct += 1

                pos_docs_used = sum(1 for doc in docs if doc in sample['positive'])
                neg_docs_used = sum(1 for doc in docs if doc in sample['negative'])
                actual_noise_ratio = neg_docs_used / len(docs) if docs else 0

                details.append({
                    "sample_id": i + 1,
                    "model": model,
                    "language": language,
                    "noise_ratio": ratio,
                    "actual_noise_ratio": actual_noise_ratio,
                    "question": question,
                    "predicted": predicted,
                    "true_answers": "; ".join([str(ans) for ans in sample["answer"]]),
                    "extracted_answers": "; ".join(true_answers),
                    "correct": is_correct,
                    "docs_used": len(docs),
                    "pos_docs_available": len(sample["positive"]),
                    "neg_docs_available": len(sample["negative"]),
                    "pos_docs_used": pos_docs_used,
                    "neg_docs_used": neg_docs_used
                })

                progress_bar.update(1)

            # Guard against an empty sample set instead of dividing by zero.
            accuracy = (correct / total) * 100 if total else 0.0
            results[ratio] = accuracy

            error_info = f" (API errors: {api_errors})" if api_errors > 0 else ""
            print(f"{model} {language.upper()} | Noise {int(ratio*100)}%: {accuracy:.1f}% ({correct}/{total}){error_info}")
    finally:
        # Always release the progress bar, even if an evaluation step raises.
        progress_bar.close()

    return results, details

**NEGATIVE REJECTION EVALUATION**

In [None]:
def get_rgb_system_prompt():
    """Return the system prompt sent with negative-rejection queries.

    The prompt tells the model to answer from the supplied documents and, when
    the answer is not found, to reply with a fixed refusal sentence. That
    sentence is the same phrase `check_exact_rejection` searches for.
    """
    instruction = "Answer the question based on the given documents. "
    refusal_rule = (
        "If not found, say: 'I can not answer the question because of the insufficient information in documents.'"
    )
    return instruction + refusal_rule

def check_exact_rejection(predicted):
    """Report whether a response contains the canonical refusal sentence (Rej).

    Blank responses and responses carrying the "API_ERROR" marker never count
    as rejections. Otherwise the check is a case-insensitive substring search,
    so the sentence may be surrounded by other text.
    """
    is_blank = not predicted.strip()
    if is_blank or "API_ERROR" in predicted:
        return False

    # Refusal sentence the system prompt asks the model to produce.
    target = "I can not answer the question because of the insufficient information in documents.".lower()
    haystack = predicted.lower()
    return target in haystack

# Phrasings treated as a refusal; each is searched in the lower-cased response.
# Compiled once at import time instead of on every call.
_REJECTION_PATTERNS = tuple(
    re.compile(pattern)
    for pattern in (
        # `\s*` also covers the one-word spelling "cannot".
        r"can\s*not\s+answer.*insufficient",
        r"unable\s+to\s+answer.*insufficient",
        # Accept "don't", "don’t" (typographic apostrophe) and "dont".
        r"don['\u2019]?t\s+have\s+enough.*information",
        r"not\s+enough\s+information",
        r"insufficient\s+information",
        r"information.*not.*sufficient",
        r"documents.*do\s+not\s+contain",
        r"no\s+relevant\s+information",
        r"cannot\s+find.*answer",
        r"unable\s+to\s+find.*answer",
        r"information.*missing",
        r"lack.*information",
    )
)


def check_semantic_rejection(predicted):
    """Heuristically detect a refusal phrased in the model's own words (Rej*).

    This is a regex-based stand-in for a judged semantic check: the response is
    lower-cased and counted as a rejection as soon as any pattern in
    `_REJECTION_PATTERNS` matches.

    Args:
        predicted: Raw model response text.

    Returns:
        True if a rejection pattern matches; False otherwise, including for
        blank responses and responses carrying the "API_ERROR" marker.
    """
    if not predicted.strip() or "API_ERROR" in predicted:
        return False

    predicted_lower = predicted.lower()

    # The previous "answer indicator" scan was removed: it returned False on a
    # hit and the function returned False anyway, so it never affected results.
    return any(pattern.search(predicted_lower) for pattern in _REJECTION_PATTERNS)

def should_be_rejected_rgb(question, docs):
    """Return the expected verdict for a negative-rejection sample.

    Both arguments are accepted but ignored: in the negative-rejection testbed
    every document given to the model is a negative (noisy) one, so every
    question is expected to be rejected.
    """
    expected_rejection = True  # constant for this testbed, independent of inputs
    return expected_rejection

def evaluate_negative_rejection_rgb(dataset, model, sample_size, max_docs):
    """Measure how often a model refuses to answer when given only noise.

    For up to `sample_size` samples that have at least `max_docs` negative
    documents, the model is queried with `max_docs` randomly drawn negative
    documents and its response is scored by `check_exact_rejection` (Rej) and
    `check_semantic_rejection` (Rej*).

    Args:
        dataset: Iterable of sample dicts; each is read for its "query" value
            and its optional "negative" list of documents.
        model: Model identifier forwarded to `query_model`.
        sample_size: Maximum number of samples to evaluate.
        max_docs: Number of negative documents per query, and the minimum a
            sample must have to be selected.

    Returns:
        A `(metrics, results)` tuple. `metrics` holds "exact_rate" and
        "semantic_rate" (percentages over responses without the "API_ERROR"
        marker) and "diversity" (number of distinct 100-character response
        prefixes). `results` is a list with one record dict per processed
        sample, including the ones that hit an API error.
    """
    print(f"\nEvaluating {model} for RGB-style negative rejection")
    print(f"Sample size: {sample_size}, Max docs per question: {max_docs}")

    # Keep the first `sample_size` samples with enough negative documents.
    filtered_samples = []
    for sample in dataset:
        if len(sample.get('negative', [])) >= max_docs:
            filtered_samples.append(sample)
        if len(filtered_samples) >= sample_size:
            break

    if len(filtered_samples) < sample_size:
        print(f"Warning: Only found {len(filtered_samples)} quality samples (target: {sample_size})")

    samples_to_process = filtered_samples[:min(sample_size, len(filtered_samples))]

    results = []
    exact_rejections = 0  # Rej metric
    semantic_rejections = 0  # Rej* metric
    api_errors = 0
    valid_samples = 0  # running count; avoids rescanning `results` every iteration
    response_diversity = set()  # distinct response prefixes
    system_prompt = get_rgb_system_prompt()  # identical for every sample

    # The context manager closes the bar even if a query raises.
    with tqdm(total=len(samples_to_process), desc=f"{model}") as progress_bar:
        for i, sample in enumerate(samples_to_process):
            question = sample["query"]
            negative_docs = sample.get("negative", [])

            # Only negative documents are shown to the model in this testbed.
            docs = random.sample(negative_docs, min(max_docs, len(negative_docs)))
            predicted = query_model(question, docs, model, system_prompt=system_prompt)

            if "API_ERROR" in predicted:
                api_errors += 1
                exact_rejection = False
                semantic_rejection = False
                should_reject = True
            else:
                valid_samples += 1
                exact_rejection = check_exact_rejection(predicted)
                semantic_rejection = check_semantic_rejection(predicted)
                should_reject = should_be_rejected_rgb(question, docs)

                if exact_rejection:
                    exact_rejections += 1
                if semantic_rejection:
                    semantic_rejections += 1

                response_diversity.add(predicted.strip()[:100])  # First 100 chars

            results.append({
                "sample_id": i + 1,
                "model": model,
                "question": question,
                "predicted": predicted,
                "exact_rejection": exact_rejection,
                "semantic_rejection": semantic_rejection,
                "should_reject": should_reject,
                "docs_used": len(docs),
                "documents": " ||| ".join(docs[:2])
            })

            if valid_samples > 0:
                progress_bar.set_postfix({
                    'Exact': f'{(exact_rejections / valid_samples) * 100:.1f}%',
                    'Semantic': f'{(semantic_rejections / valid_samples) * 100:.1f}%',
                    'Errors': api_errors
                })

            # Advance for every sample, API errors included, so the bar
            # always reaches its total.
            progress_bar.update(1)

    total_questions = valid_samples

    if total_questions == 0:
        print(f"No valid results for {model} due to API errors")
        return {"exact_rate": 0, "semantic_rate": 0, "diversity": 0}, results

    exact_rejection_rate = (exact_rejections / total_questions) * 100
    semantic_rejection_rate = (semantic_rejections / total_questions) * 100
    diversity_score = len(response_diversity)

    print(f"\n{model} Results (RGB-style):")
    print(f"  Total Questions: {total_questions}")
    print(f"  Exact Rejections (Rej): {exact_rejections} ({exact_rejection_rate:.1f}%)")
    print(f"  Semantic Rejections (Rej*): {semantic_rejections} ({semantic_rejection_rate:.1f}%)")
    print(f"  Response Diversity: {diversity_score} unique responses")
    if api_errors > 0:
        print(f"  API Errors: {api_errors}")

    return {
        "exact_rate": exact_rejection_rate,
        "semantic_rate": semantic_rejection_rate,
        "diversity": diversity_score
    }, results

**RESULT SAVING AND ANALYSIS FUNCTIONS**

In [None]:
def save_rgb_results_by_model_and_noise(summary_data, detail_data, output_prefix):
    """Write per-model and combined CSV files for an evaluation run.

    Args:
        summary_data: Non-empty list of summary row dicts, each with a "model" key.
        detail_data: Non-empty list of detail row dicts, each with a "model" key.
        output_prefix: Path prefix for every file written. Per-model files are
            named "<prefix>_<model>_summary.csv" / "_details.csv", with "/" and
            "-" in the model name replaced by "_". Combined files are named
            "<prefix>_all_models_summary.csv" / "_details.csv".

    Returns:
        `(None, None)` when either input is empty. Otherwise a tuple of
        (dict with "summary", "detail" and "analysis" lists of per-model files
        actually written, dict with the "summary" and "detail" combined paths).
        The "analysis" list is kept for compatibility and is always empty.
    """
    if not summary_data or not detail_data:
        print("No data to save")
        return None, None

    summary_df = pd.DataFrame(summary_data)
    detail_df = pd.DataFrame(detail_data)

    # Take models from both frames, in first-seen order, so a model that has
    # detail rows but no summary row still gets its per-model detail file.
    models = list(dict.fromkeys([*summary_df['model'], *detail_df['model']]))

    saved_files = {"summary": [], "detail": [], "analysis": []}

    for model in models:
        # Model ids such as "qwen/qwen3-32b" are not safe as file names.
        safe_model_name = model.replace("/", "_").replace("-", "_")

        model_summary = summary_df[summary_df['model'] == model]
        model_detail = detail_df[detail_df['model'] == model]

        summary_file = f"{output_prefix}_{safe_model_name}_summary.csv"
        detail_file = f"{output_prefix}_{safe_model_name}_details.csv"

        # Report a path only when the file was really written.
        print(f"Saved {model} results:")
        if len(model_summary) > 0:
            model_summary.to_csv(summary_file, index=False)
            saved_files["summary"].append(summary_file)
            print(f"  - Summary: {summary_file}")

        if len(model_detail) > 0:
            model_detail.to_csv(detail_file, index=False)
            saved_files["detail"].append(detail_file)
            print(f"  - Details: {detail_file}")

    combined_summary_file = f"{output_prefix}_all_models_summary.csv"
    combined_detail_file = f"{output_prefix}_all_models_details.csv"

    # Both frames are non-empty here because of the guard at the top.
    summary_df.to_csv(combined_summary_file, index=False)
    detail_df.to_csv(combined_detail_file, index=False)

    print(f"\nCombined files saved:")
    print(f"  - All Summary: {combined_summary_file}")
    print(f"  - All Details: {combined_detail_file}")

    return saved_files, {"summary": combined_summary_file, "detail": combined_detail_file}

def create_downloadable_package(output_prefix, extra_patterns=None):
    """Bundle result files into "<output_prefix>_complete_results.zip".

    The archive receives every CSV matching "<output_prefix>*.csv", every CSV
    matching one of `extra_patterns`, and, if it exists, the whole
    "<output_prefix>_results" directory tree. An existing zip of the same name
    is overwritten.

    Args:
        output_prefix: Path prefix used for the zip name, the default CSV glob
            and the results directory name.
        extra_patterns: Optional iterable of additional glob patterns, for
            result files written under a different prefix. Defaults to none,
            which keeps the original behaviour.

    Returns:
        The path of the zip file that was written.
    """
    zip_filename = f"{output_prefix}_complete_results.zip"

    patterns = [f"{output_prefix}*.csv"]
    if extra_patterns:
        patterns.extend(extra_patterns)

    with zipfile.ZipFile(zip_filename, 'w') as zipf:
        # CSVs are stored flat by base name; skip names already stored so
        # overlapping patterns cannot create duplicate archive members.
        stored_names = set()
        for pattern in patterns:
            for csv_file in sorted(glob.glob(pattern)):  # sorted for a stable member order
                arcname = os.path.basename(csv_file)
                if arcname in stored_names:
                    continue
                zipf.write(csv_file, arcname)
                stored_names.add(arcname)

        # Add results directory if it exists, keeping its own folder name as
        # the top-level entry inside the archive.
        results_dir = f"{output_prefix}_results"
        if os.path.exists(results_dir):
            parent_dir = os.path.dirname(results_dir)
            for root, _dirs, files in os.walk(results_dir):
                for file in files:
                    file_path = os.path.join(root, file)
                    zipf.write(file_path, os.path.relpath(file_path, parent_dir))

    print(f"\nDownloadable package created: {zip_filename}")
    return zip_filename

**MAIN EXECUTION FUNCTIONS**

In [None]:
def run_information_integration_evaluation():
    """Evaluate every configured model on the Information Integration data.

    Each model in MODELS_TO_EVALUATE is run once per ratio in NOISE_RATIOS via
    `evaluate_rgb_information_integration`. A summary row is recorded for each
    run that reports at least one sample, and all detail rows are collected.
    When any detail rows exist, the results are passed to
    `save_rgb_results_by_model_and_noise` under the
    "rgb_information_integration" prefix.

    Returns:
        `(summary_df, detail_df)` as pandas DataFrames, or `(None, None)` when
        no detail rows were produced.
    """
    banner = "="*80
    print(banner)
    print("INFORMATION INTEGRATION EVALUATION")
    print(banner)

    set_seeds(RANDOM_SEED)

    info_int_data = load_data(INFO_INT_FILE)
    print(f"Loaded {len(info_int_data)} Information Integration samples")

    summary_rows, detail_rows = [], []

    for model in MODELS_TO_EVALUATE:
        for noise_ratio in NOISE_RATIOS:
            # Re-seed per noise ratio so each ratio draws from its own stream.
            random.seed(RANDOM_SEED + int(noise_ratio * 10))

            stats, details = evaluate_rgb_information_integration(
                info_int_data, model, SAMPLE_SIZE, MAX_DOCS, noise_ratio
            )

            total = stats["total"]
            if total > 0:
                row = {
                    "model": model,
                    "noise_ratio": noise_ratio,
                    "total_samples": total,
                    "full_success": stats["FULL_SUCCESS"],
                    "partial_success": stats["PARTIAL_SUCCESS"],
                    "failure": stats["FAILURE"],
                    "rejected": stats["REJECTED"],
                    "factual_error_detected": stats["FACTUAL_ERROR_DETECTED"],
                    "error": stats["ERROR"],
                }
                # Rates are percentages of the samples evaluated in this run.
                row["full_success_rate"] = stats["FULL_SUCCESS"] / total * 100
                row["partial_success_rate"] = stats["PARTIAL_SUCCESS"] / total * 100
                row["overall_success_rate"] = (
                    (stats["FULL_SUCCESS"] + stats["PARTIAL_SUCCESS"]) / total * 100
                )
                row["failure_rate"] = stats["FAILURE"] / total * 100
                row["rejection_rate"] = stats["REJECTED"] / total * 100
                row["temperature"] = TEMPERATURE
                summary_rows.append(row)

            detail_rows.extend(details)

    if not detail_rows:
        return None, None

    save_rgb_results_by_model_and_noise(
        summary_rows, detail_rows, "rgb_information_integration"
    )
    return pd.DataFrame(summary_rows), pd.DataFrame(detail_rows)

def run_factual_accuracy_evaluation():
    """Evaluate every configured model on the factual accuracy data.

    Each model in MODELS_TO_EVALUATE is run through
    `evaluate_rgb_factual_accuracy`, which returns per-scenario counters plus
    detail rows. One summary row is recorded per scenario that reports at
    least one sample. When both summary and detail rows exist, they are passed
    to `save_rgb_results_by_model_and_noise` under the "rgb_factual_accuracy"
    prefix.

    Returns:
        `(summary_df, detail_df)` as pandas DataFrames, or `(None, None)` when
        either collection is empty.
    """
    banner = "="*80
    print(banner)
    print("FACTUAL ACCURACY EVALUATION")
    print(banner)

    set_seeds(RANDOM_SEED)

    fact_data = load_data(FACT_FILE)
    print(f"Loaded {len(fact_data)} factual accuracy samples")

    summary_rows, detail_rows = [], []

    for model in MODELS_TO_EVALUATE:
        scenario_stats, details = evaluate_rgb_factual_accuracy(
            fact_data, model, SAMPLE_SIZE, MAX_DOCS
        )

        for scenario, stats in scenario_stats.items():
            total = stats["total"]
            if not total > 0:
                continue  # scenario produced no samples for this model

            summary_rows.append({
                "model": model,
                "scenario": scenario,
                "total_samples": total,
                "correct": stats["CORRECT"],
                "incorrect": stats["INCORRECT"],
                "rejected": stats["REJECTED"],
                "partial": stats["PARTIAL"],
                "no_answer": stats["NO_ANSWER"],
                "error": stats["ERROR"],
                # Percentages of the samples evaluated in this scenario.
                "accuracy": (stats["CORRECT"] / total * 100),
                "error_rate": (stats["INCORRECT"] / total * 100),
                "rejection_rate": (stats["REJECTED"] / total * 100)
            })

        detail_rows.extend(details)

    if not (summary_rows and detail_rows):
        return None, None

    save_rgb_results_by_model_and_noise(
        summary_rows, detail_rows, "rgb_factual_accuracy"
    )
    return pd.DataFrame(summary_rows), pd.DataFrame(detail_rows)

def run_noise_robustness_evaluation():
    """Evaluate every configured model for noise robustness in English and Chinese.

    Each model in MODELS_TO_EVALUATE is run through `eval_noise_test` once per
    language. One summary row is recorded per model, language and ratio in
    NOISE_RATIOS_FULL; a ratio missing from the returned results is recorded
    with accuracy 0. Summary and detail rows are written to
    "noise_evaluation_summary.csv" and "noise_evaluation_details.csv".

    Returns:
        `(summary_df, detail_df)` as pandas DataFrames.
    """
    print("="*80)
    print("NOISE ROBUSTNESS EVALUATION")
    print("="*80)

    set_seeds(RANDOM_SEED)

    # Load data
    en_data = load_data(EN_FILE)
    zh_data = load_data(ZH_FILE)
    print(f"Loaded {len(en_data)} English samples, {len(zh_data)} Chinese samples")

    summary_data = []
    detail_data = []

    for model in MODELS_TO_EVALUATE:
        # Use the configured seed rather than a literal 42, so changing
        # RANDOM_SEED affects this evaluation like the others.
        en_results, en_details = eval_noise_test(
            en_data, model, "en", SAMPLE_SIZE, base_seed=RANDOM_SEED
        )
        zh_results, zh_details = eval_noise_test(
            zh_data, model, "zh", SAMPLE_SIZE, base_seed=RANDOM_SEED
        )

        # Row order is kept: for each ratio, the "en" row then the "zh" row.
        for ratio in NOISE_RATIOS_FULL:
            for language, lang_results in (("en", en_results), ("zh", zh_results)):
                summary_data.append({
                    "model": model,
                    "language": language,
                    "noise_ratio": ratio,
                    "accuracy": lang_results.get(ratio, 0)
                })

        detail_data.extend(en_details)
        detail_data.extend(zh_details)

    # Save results
    summary_df = pd.DataFrame(summary_data)
    detail_df = pd.DataFrame(detail_data)

    summary_df.to_csv("noise_evaluation_summary.csv", index=False)
    detail_df.to_csv("noise_evaluation_details.csv", index=False)

    return summary_df, detail_df

def run_negative_rejection_evaluation():
    """Evaluate every configured model on negative rejection (English data).

    Each model in MODELS_TO_EVALUATE is run through
    `evaluate_negative_rejection_rgb`. Models that return no detail records
    are left out of both outputs. When at least one model produced records,
    the results are written to "rgb_negative_rejection_summary.csv" and
    "rgb_negative_rejection_details.csv".

    Returns:
        `(summary_df, detail_df)` as pandas DataFrames, or `(None, None)` when
        no model produced any records.
    """
    banner = "="*80
    print(banner)
    print("NEGATIVE REJECTION EVALUATION")
    print(banner)

    set_seeds(RANDOM_SEED)

    en_data = load_data(EN_FILE)
    print(f"Loaded {len(en_data)} English samples")

    summary_rows, detail_rows = [], []

    for model in MODELS_TO_EVALUATE:
        metrics, details = evaluate_negative_rejection_rgb(
            en_data, model, SAMPLE_SIZE, MAX_DOCS
        )

        if not details:
            continue  # nothing was evaluated for this model

        row = {"model": model}
        row["exact_rejection_rate"] = metrics["exact_rate"]
        row["semantic_rejection_rate"] = metrics["semantic_rate"]
        row["response_diversity"] = metrics["diversity"]
        row["samples_evaluated"] = len(details)  # counts every record returned
        summary_rows.append(row)

        detail_rows.extend(details)

    if not summary_rows:
        return None, None

    summary_df = pd.DataFrame(summary_rows)
    detail_df = pd.DataFrame(detail_rows)

    summary_df.to_csv("rgb_negative_rejection_summary.csv", index=False)
    detail_df.to_csv("rgb_negative_rejection_details.csv", index=False)

    return summary_df, detail_df

def run_all_evaluations():
    """Run the four RGB evaluations in sequence and package their CSV output.

    Returns:
        Dict mapping 'info_int', 'fact_acc', 'noise_rob' and 'neg_rej' to the
        value returned by the corresponding run_* function.
    """
    print("RGB EVALUATION SUITE - COMPREHENSIVE TESTING")
    print("="*80)
    print(f"Models to evaluate: {MODELS_TO_EVALUATE}")
    print(f"Sample size per evaluation: {SAMPLE_SIZE}")
    print(f"Temperature: {TEMPERATURE}")
    print(f"Random seed: {RANDOM_SEED}")

    # (result key, display title, runner), in execution order.
    steps = (
        ('info_int', "Information Integration", run_information_integration_evaluation),
        ('fact_acc', "Factual Accuracy", run_factual_accuracy_evaluation),
        ('noise_rob', "Noise Robustness", run_noise_robustness_evaluation),
        ('neg_rej', "Negative Rejection", run_negative_rejection_evaluation),
    )

    results = {}
    for number, (key, title, runner) in enumerate(steps, start=1):
        print(f"\n{number}. Running {title} Evaluation...")
        results[key] = runner()

    # Create comprehensive package
    print("\nCreating comprehensive results package...")
    zip_path = create_downloadable_package("rgb_comprehensive_evaluation")

    # The packager only picks up "rgb_comprehensive_evaluation*.csv", but the
    # evaluations above write under their own prefixes. Append those files so
    # the archive really contains the results.
    result_patterns = (
        "rgb_information_integration_*.csv",
        "rgb_factual_accuracy_*.csv",
        "noise_evaluation_*.csv",
        "rgb_negative_rejection_*.csv",
    )
    with zipfile.ZipFile(zip_path, "a") as zipf:
        stored_names = set(zipf.namelist())
        for pattern in result_patterns:
            for csv_file in sorted(glob.glob(pattern)):
                arcname = os.path.basename(csv_file)
                if arcname not in stored_names:  # avoid duplicate archive members
                    zipf.write(csv_file, arcname)
                    stored_names.add(arcname)

    print("\n" + "="*80)
    print("ALL EVALUATIONS COMPLETED")
    print("="*80)
    print("Check the generated CSV files and zip package for detailed results.")

    return results

**NOTEBOOK EXECUTION**

In [None]:
if __name__ == "__main__":
    # Entry point: run the whole suite, or swap in individual evaluations below.

    # Option 1: Run all evaluations. `results` maps 'info_int', 'fact_acc',
    # 'noise_rob' and 'neg_rej' to whatever the matching run_* function returned.
    results = run_all_evaluations()

    # Option 2: Run individual evaluations (uncomment as needed).
    # Each call returns a (summary, details) pair.
    # info_int_summary, info_int_details = run_information_integration_evaluation()
    # fact_acc_summary, fact_acc_details = run_factual_accuracy_evaluation()
    # noise_rob_summary, noise_rob_details = run_noise_robustness_evaluation()
    # neg_rej_summary, neg_rej_details = run_negative_rejection_evaluation()