# Evaluation of VLM2-Bench

## GC

Both `mat` and `trk` use the same evaluation script.

In [1]:
import os
import json
import re
from collections import defaultdict

# Folder whose .jsonl files are evaluated for the GC task.
input_folder = "code/gc/test/test_res/test_mat" # mat or trk result folder

In [2]:
def load_jsonl(file_path):
    """Load a JSONL file into a list of parsed records.

    Args:
        file_path: Path to a UTF-8 encoded file holding one JSON value per line.

    Returns:
        A list with one parsed object per non-blank line, in file order.

    Raises:
        json.JSONDecodeError: If a non-blank line is not valid JSON.
    """
    with open(file_path, 'r', encoding='utf-8') as f:
        # Blank lines (for example a trailing empty line) are skipped instead
        # of being handed to json.loads, which raises on an empty string.
        return [json.loads(line) for line in map(str.strip, f) if line]

def save_jsonl(data, file_path):
    """Write every dict of ``data`` to ``file_path``, one JSON object per line."""
    with open(file_path, 'w', encoding='utf-8') as out:
        # Non-ASCII characters are written as-is rather than \u-escaped
        out.writelines(
            json.dumps(record, ensure_ascii=False) + '\n' for record in data
        )

def analyze_correct_answers(input_folder, output_folder):
    """
    Score the T/F answers stored in every .jsonl file of input_folder:
    1. Load each file and bucket its records by q_id
    2. A bucket is positive-correct when a record with gt_answer "T" was
       answered "T", and negative-correct when a record with gt_answer "F"
       was answered "F"
    3. Buckets where both hold are copied to <output_folder>/correct, all
       other buckets to <output_folder>/wrong
    4. Positive, negative and pair accuracy are printed per file, each as a
       share of the number of distinct q_id values
    """
    os.makedirs(output_folder, exist_ok=True)

    # Destination folders for fully-correct and not-fully-correct groups
    target_dirs = {}
    for label in ('correct', 'wrong'):
        target_dirs[label] = os.path.join(output_folder, label)
        os.makedirs(target_dirs[label], exist_ok=True)

    for file_name in os.listdir(input_folder):
        if not file_name.endswith('.jsonl'):
            continue

        records = load_jsonl(os.path.join(input_folder, file_name))

        # Bucket the records of this file by question id
        by_question = defaultdict(list)
        for record in records:
            by_question[record["q_id"]].append(record)

        total_pairs = len(by_question)
        correct_p = 0      # groups whose positive question was answered correctly
        correct_n = 0      # groups whose negative question was answered correctly
        correct_pairs = 0  # groups where both were answered correctly

        # The file name without its extension doubles as the model name
        model_name = os.path.splitext(file_name)[0]
        correct_path = os.path.join(target_dirs['correct'], f'{model_name}_correct.jsonl')
        wrong_path = os.path.join(target_dirs['wrong'], f'{model_name}_wrong.jsonl')

        with open(correct_path, 'w', encoding='utf-8') as correct_f, \
             open(wrong_path, 'w', encoding='utf-8') as wrong_f:

            for group in by_question.values():
                # Ground-truth labels ("T" / "F") that the model reproduced
                solved = set()
                for record in group:
                    truth = record["gt_answer"]
                    if truth in ("T", "F") and record["model_answer"] == truth:
                        solved.add(truth)

                positive_ok = "T" in solved
                negative_ok = "F" in solved
                if positive_ok:
                    correct_p += 1
                if negative_ok:
                    correct_n += 1

                both_ok = positive_ok and negative_ok
                if both_ok:
                    correct_pairs += 1

                # The whole group goes to exactly one of the two output files
                sink = correct_f if both_ok else wrong_f
                for record in group:
                    sink.write(json.dumps(record, ensure_ascii=False) + '\n')

        # All three metrics share the same denominator; 0 when no groups exist
        if total_pairs > 0:
            positive_acc = correct_p / total_pairs
            negative_acc = correct_n / total_pairs
            total_acc = correct_pairs / total_pairs
        else:
            positive_acc = negative_acc = total_acc = 0

        # Two-line report per model
        print(f"Model: {model_name}")
        print(f"Positive Accuracy: {positive_acc:.2%} ({correct_p}/{total_pairs}) | Negative Accuracy: {negative_acc:.2%} ({correct_n}/{total_pairs}) | Total Accuracy: {total_acc:.2%} ({correct_pairs}/{total_pairs})")

# Results are written to a "real_correct" subfolder inside the input folder.
output_folder = os.path.join(input_folder, "real_correct")
analyze_correct_answers(input_folder, output_folder)
print("Done.")


Model: Qwen2.5-VL-7B-Instruct_std_answers
Positive Accuracy: 55.98% (145/259) | Negative Accuracy: 75.68% (196/259) | Total Accuracy: 35.91% (93/259)
Done.


## OC

### cpr

In [3]:
# Folder whose .jsonl files are evaluated for the OC cpr task.
oc_cpr_input_folder = "code/oc/test/test_res/test_cpr" # result folder

In [4]:
def parse_tf_answer(model_answer):
    """
    Extract 'T' or 'F' from the tf type model_answer.

    Recognises the stand-alone tokens 't', 'f', 'true' and 'false' in any
    letter case, also when they appear inside a longer sentence. The "t" of a
    contraction such as "don't" or "isn't" is not treated as an answer.

    Args:
        model_answer: Raw answer text produced by the model.

    Returns:
        A tuple (answer, error):
        - ('T', None) or ('F', None) when exactly one token is found
        - (None, 'multiple_answers_found') when more than one token is found
        - (None, 'no_answer_found') when no token is found
    """
    # The look-behind rejects a token that directly follows "<letter>'" so
    # that "can't" / "doesn't" do not yield a spurious 'T'. A quoted answer
    # such as 'T' still matches because its quote is not preceded by a
    # word character.
    pattern = re.compile(r"(?<!\w['\u2019])\b(t|f|true|false)\b", re.IGNORECASE)
    matches = pattern.findall(model_answer)

    # Reduce every match to its upper-case initial: 'true' -> 'T', 'false' -> 'F'
    extracted = [match.upper()[0] for match in matches]

    if len(extracted) == 1:
        return extracted[0], None
    if extracted:
        return None, 'multiple_answers_found'
    return None, 'no_answer_found'

def load_model_answers(model_answer_file):
    """
    Load the model answer file, grouping answers by main id.

    A record's q_id has the form '<main id>_p' or '<main id>_n'. The suffix is
    taken after the LAST underscore, so main ids may themselves contain
    underscores. Blank lines and records whose q_id is missing, not a string,
    has no underscore or has another suffix are skipped.

    Args:
        model_answer_file: Path to a UTF-8 JSONL file of model answers.

    Returns:
        A dictionary keyed by main id. Each value is a sub-dictionary with the
        keys 'p' and/or 'n', each mapping to
        {'model_answer': <stripped text>, 'gt_answer': <stripped, upper-cased text>}.

    Raises:
        json.JSONDecodeError: If a non-blank line is not valid JSON.
    """
    model_answers_dict = defaultdict(dict)
    with open(model_answer_file, 'r', encoding='utf-8') as f:
        for line in f:
            if not line.strip():
                continue  # tolerate empty lines such as a trailing newline
            data = json.loads(line)
            q_id = data.get('q_id')
            if not isinstance(q_id, str) or '_' not in q_id:
                continue
            # rsplit keeps underscores that belong to the main id itself
            main_id, suffix = q_id.rsplit('_', 1)
            if suffix not in ('p', 'n'):
                continue
            model_answers_dict[main_id][suffix] = {
                'model_answer': data.get('model_answer', '').strip(),
                'gt_answer': data.get('gt_answer', '').strip().upper()
            }
    return model_answers_dict

def evaluate_pair_correctness(model_answers_dict):
    """
    Tally how many question pairs the model answered correctly.

    Only main ids that carry both a 'p' and an 'n' entry are counted. Each
    entry's model answer is parsed with parse_tf_answer and compared with its
    ground truth; an answer that cannot be parsed counts as wrong.

    Returns the tuple (correct_p, correct_n, correct_pairs, total_pairs).
    """
    p_hits = n_hits = both_hits = pairs_seen = 0

    for pair in model_answers_dict.values():
        # Incomplete pairs are ignored entirely
        if not ('p' in pair and 'n' in pair):
            continue
        pairs_seen += 1

        pos_entry, neg_entry = pair['p'], pair['n']
        pos_text, pos_truth = pos_entry['model_answer'], pos_entry['gt_answer']
        neg_text, neg_truth = neg_entry['model_answer'], neg_entry['gt_answer']

        # The error component of the parse result is not needed here
        pos_verdict, _ = parse_tf_answer(pos_text)
        neg_verdict, _ = parse_tf_answer(neg_text)

        pos_ok = bool(pos_verdict) and pos_verdict == pos_truth
        neg_ok = bool(neg_verdict) and neg_verdict == neg_truth

        if pos_ok:
            p_hits += 1
        if neg_ok:
            n_hits += 1
        if pos_ok and neg_ok:
            both_hits += 1

    return p_hits, n_hits, both_hits, pairs_seen

def process_model_file(model_file_path):
    """
    Evaluate one model answer file and print its three accuracy figures.

    The model name shown in the report is the file name without extension.
    """
    file_part = os.path.basename(model_file_path)
    model_name = os.path.splitext(file_part)[0]

    # Parse the file, then score the complete p/n pairs it contains
    answers_by_id = load_model_answers(model_file_path)
    correct_p, correct_n, correct_pairs, total = evaluate_pair_correctness(answers_by_id)

    # Each accuracy is a share of the complete pairs; 0 when there are none
    if total > 0:
        positive_acc = correct_p / total
        negative_acc = correct_n / total
        total_acc = correct_pairs / total
    else:
        positive_acc = negative_acc = total_acc = 0

    print(f"Model: {model_name}")
    print(f"Positive Accuracy: {positive_acc:.2%} ({correct_p}/{total}) | Negative Accuracy: {negative_acc:.2%} ({correct_n}/{total}) | Total Accuracy: {total_acc:.2%} ({correct_pairs}/{total})")

def process_result_folder(result_folder):
    """
    Run process_model_file on every .jsonl file found directly in result_folder.
    """
    for entry in os.listdir(result_folder):
        # Anything that is not a .jsonl file is ignored
        if entry.endswith(".jsonl"):
            process_model_file(os.path.join(result_folder, entry))

# Example usage: evaluate every .jsonl result file in the OC cpr result folder
process_result_folder(oc_cpr_input_folder)


Model: Qwen2.5-VL-7B-Instruct_20250206_214957_answers
Positive Accuracy: 76.11% (274/360) | Negative Accuracy: 85.00% (306/360) | Total Accuracy: 70.83% (255/360)


### cnt

In [5]:
# Folder whose .jsonl files are evaluated for the OC counting task.
oc_cnt_input_folder = "code/oc/test/test_res/test_cnt" # result folder

In [6]:
# Mapping from English number words to integers
NUM_WORDS = {
    "zero": 0, "one": 1, "two": 2, "three": 3, "four": 4,
    "five": 5, "six": 6, "seven": 7, "eight": 8, "nine": 9,
    "ten": 10, "eleven": 11, "twelve": 12, "thirteen": 13, "fourteen": 14,
    "fifteen": 15, "sixteen": 16, "seventeen": 17, "eighteen": 18, "nineteen": 19,
    "twenty": 20, "thirty": 30, "forty": 40, "fifty": 50,
    "sixty": 60, "seventy": 70, "eighty": 80, "ninety": 90,
    "hundred": 100, "thousand": 1000,
}

# Define penalty factor and max length
PENALTY_FACTOR = 10
L_MAX = 4

def words_to_num(s):
    """Convert English number words to an integer.

    Supports phrases such as 'twenty one', 'twenty-one', 'one hundred and
    five' and 'two hundred thousand'.

    Args:
        s: Phrase made of number words, separated by spaces or hyphens. The
            word 'and' is ignored.

    Returns:
        The integer value, or None when the phrase contains a word that is not
        a number word, when it is empty, or when its value is 0 (so 'zero'
        also yields None).
    """
    # 'and' is dropped as a whole word only; removing it as a substring would
    # corrupt 'thousand'.
    tokens = [tok for tok in s.lower().replace('-', ' ').split() if tok != 'and']
    total = 0
    current = 0
    for token in tokens:
        value = NUM_WORDS.get(token)
        if value is None:
            return None
        if value == 100:
            # 'hundred' scales the group in progress ('two hundred' -> 200)
            current = (current or 1) * 100
        elif value == 1000:
            # 'thousand' closes the group in progress and starts a new one
            total += (current or 1) * 1000
            current = 0
        else:
            current += value
    total += current
    return total if total != 0 else None

def extract_numbers(text):
    """Collect the integers mentioned in text.

    Every run of digits becomes one value. All English number words found in
    the text are joined into a single phrase and converted together through
    words_to_num, so they contribute at most one value.
    Returns the digit-based values followed by the word-based value, if any.
    """
    lowered = text.lower()
    from_digits = list(map(int, re.findall(r'\d+', lowered)))

    word_regex = re.compile(
        r'\b(zero|one|two|three|four|five|six|seven|eight|nine|ten|'
        r'eleven|twelve|thirteen|fourteen|fifteen|sixteen|'
        r'seventeen|eighteen|nineteen|twenty|thirty|forty|fifty|'
        r'sixty|seventy|eighty|ninety|hundred|thousand)\b',
        re.IGNORECASE)
    found_words = word_regex.findall(lowered)

    from_words = []
    if found_words:
        # The matched words are converted as one phrase, in order of appearance
        value = words_to_num(' '.join(found_words))
        if value is not None:
            from_words.append(value)
    return from_digits + from_words

def parse_model_answer(model_answer):
    """Return the number in the answer when exactly one is found, else None."""
    found = extract_numbers(model_answer)
    return found[0] if len(found) == 1 else None

def load_curated_questions(curated_file):
    """Map each q_id of the question file to its image sequence length.

    The length comes from "image_seq_len" when present, otherwise from the
    size of the "image_seq" list, otherwise it defaults to 2. Lines that are
    not valid JSON and records without a q_id are ignored.
    """
    lengths = {}
    with open(curated_file, 'r', encoding='utf-8') as f:
        for raw in f:
            try:
                record = json.loads(raw)
            except json.JSONDecodeError:
                continue
            q_id = record.get('q_id')
            if q_id is None:
                continue
            if "image_seq_len" in record:
                seq_len = record["image_seq_len"]
            elif isinstance(record.get("image_seq"), list):
                seq_len = len(record["image_seq"])
            else:
                seq_len = 2
            lengths[q_id] = seq_len
    return lengths

def evaluate_model_response(jsonl_file, curated_questions):
    """Score one counting-result file and split its records into correct/wrong.

    Each line of jsonl_file is a JSON record. Records are written to
    correct/<model>_correct.jsonl or wrong/<model>_wrong.jsonl next to the
    input file, annotated with raw_diff, normalized_score and, for wrong
    ones, error_reason. Accuracy (over all records) and the mean normalized
    score (over records whose parsed answer lies within 1..image_len) are
    printed at the end.

    curated_questions maps q_id to the image sequence length; a q_id that is
    not in it falls back to a length of 2 and the record gets a warning field.
    """
    parent_dir, file_part = os.path.split(jsonl_file)
    model_name = os.path.splitext(file_part)[0]

    correct_dir = os.path.join(parent_dir, 'correct')
    wrong_dir = os.path.join(parent_dir, 'wrong')
    for out_dir in (correct_dir, wrong_dir):
        os.makedirs(out_dir, exist_ok=True)

    correct_file = os.path.join(correct_dir, f'{model_name}_correct.jsonl')
    wrong_file = os.path.join(wrong_dir, f'{model_name}_wrong.jsonl')

    def write_record(handle, record):
        handle.write(json.dumps(record, ensure_ascii=False) + '\n')

    def normalized(raw_diff, truth, image_len):
        # 100 for an exact answer, otherwise 100 * (1 - penalty), floored at 0
        if raw_diff == 0:
            return 100.0
        max_error = max(truth - 1, image_len - truth)
        relative_error = raw_diff / max_error if max_error > 0 else 0
        weight = L_MAX / image_len
        penalty = weight * (relative_error ** (1.0 / PENALTY_FACTOR))
        if penalty < 1:
            return 100 * (1 - penalty)
        return 0.0

    total_count = 0       # every record read
    correct_count = 0     # records whose parsed answer equals the ground truth
    scored = 0            # records that received a normalized score
    score_sum = 0

    with open(correct_file, 'w', encoding='utf-8') as correct_f, \
         open(wrong_file, 'w', encoding='utf-8') as wrong_f, \
         open(jsonl_file, 'r', encoding='utf-8') as source:

        for line in source:
            record = json.loads(line)
            total_count += 1

            answer_text = record.get('model_answer', '').strip()
            truth = record.get('gt_answer', None)
            if truth is None:
                record['error_reason'] = 'missing_gt_answer'
                write_record(wrong_f, record)
                continue

            q_id = record.get('q_id', 'unknown_q_id')
            if q_id not in curated_questions:
                image_len = 2
                record['warning'] = 'q_id not found in curated questions, defaulting image_len=2'
            else:
                image_len = curated_questions[q_id]

            guess = parse_model_answer(answer_text)
            if guess is None:
                record['raw_diff'] = None
                record['normalized_score'] = 0.0
                record['error_reason'] = 'invalid_answer_format'
                write_record(wrong_f, record)
                continue

            in_range = 1 <= guess <= image_len
            raw_diff = abs(guess - truth)
            if not in_range:
                # Out-of-range answers are wrong and excluded from the score mean
                record['raw_diff'] = raw_diff
                record['normalized_score'] = 0.0
                record['error_reason'] = 'answer_out_of_expected_range'
                write_record(wrong_f, record)
                continue

            norm_score = normalized(raw_diff, truth, image_len)
            record['raw_diff'] = raw_diff
            record['normalized_score'] = norm_score

            score_sum += norm_score
            scored += 1

            if guess == truth:
                correct_count += 1
                write_record(correct_f, record)
            else:
                record['error_reason'] = 'incorrect_answer'
                write_record(wrong_f, record)

    accuracy = (correct_count / total_count * 100) if total_count > 0 else 0
    avg_norm_score = (score_sum / scored) if scored > 0 else 0

    print(f"Model: {model_name} Accuracy: {accuracy:.2f}% ({correct_count}/{total_count})")
    print(f"Model: {model_name} Average Normalized Score: {avg_norm_score:.2f}")

def process_folder(input_folder, curated_questions):
    """Evaluate every .jsonl file located directly in input_folder."""
    jsonl_names = [name for name in os.listdir(input_folder) if name.endswith(".jsonl")]
    for name in jsonl_names:
        evaluate_model_response(os.path.join(input_folder, name), curated_questions)

# Load curated questions and process results
# The question file supplies the image sequence length used for each q_id.
curated_questions_file = 'jsonl/oc/vanilla/oc_cnt.jsonl'
curated_questions = load_curated_questions(curated_questions_file)

print("oc counting answer analysis:")
process_folder(oc_cnt_input_folder, curated_questions)


oc counting answer analysis:
Model: Qwen2.5-VL-7B-Instruct_20250206_215816_answers Accuracy: 39.72% (143/360)
Model: Qwen2.5-VL-7B-Instruct_20250206_215816_answers Average Normalized Score: 41.51


### mcq


In [9]:
# Folder whose .jsonl files are evaluated for the OC mcq task.
# NOTE(review): the section is titled "mcq" but the folder is named "test_grp" — confirm this is the intended folder.
oc_mcq_input_folder = "code/oc/test/test_res/test_grp" # result folder

In [10]:
def evaluate_model_response(jsonl_file):
    """Score one multiple-choice result file and split its records by outcome.

    Every non-blank line of jsonl_file is a JSON record with the keys
    'model_answer' and 'gt_answer'. Records are written next to the input
    file, to correct/<model>_correct.jsonl or wrong/<model>_wrong.jsonl;
    wrong records get an 'error_reason' of 'multi-choice' (more than one
    '(X)' option in the answer) or 'incorrect_answer'. The accuracy over all
    records is printed at the end.

    Args:
        jsonl_file: Path of the UTF-8 JSONL file with the model answers.

    Raises:
        KeyError: If a record lacks 'model_answer' or 'gt_answer'.
        json.JSONDecodeError: If a non-blank line is not valid JSON.
    """
    base_dir = os.path.dirname(jsonl_file)
    model_name = os.path.splitext(os.path.basename(jsonl_file))[0]  # Parse model name

    # Create the two output folders if they don't exist
    correct_dir = os.path.join(base_dir, 'correct')
    wrong_dir = os.path.join(base_dir, 'wrong')
    os.makedirs(correct_dir, exist_ok=True)
    os.makedirs(wrong_dir, exist_ok=True)

    # Output file paths
    correct_file = os.path.join(correct_dir, f'{model_name}_correct.jsonl')
    wrong_file = os.path.join(wrong_dir, f'{model_name}_wrong.jsonl')

    def clean_answer(answer):
        """Reduce '(A) text', 'A) text' or 'A' to the bare option letter."""
        # Keep what precedes the first ')', then drop the optional opening
        # '(' so that '(A)' and a bare 'A' compare equal.
        return answer.split(')')[0].strip().lstrip('(').strip()

    def count_options(answer):
        """Count the '(X)' style options in the answer."""
        return len(re.findall(r'\([A-Z]\)', answer))

    correct_count = 0
    total_count = 0

    # The context manager closes all three files even when a record raises.
    # Output files are opened in write mode and overwrite previous content.
    with open(correct_file, 'w', encoding='utf-8') as correct_f, \
         open(wrong_file, 'w', encoding='utf-8') as wrong_f, \
         open(jsonl_file, 'r', encoding='utf-8') as file:
        for line in file:
            if not line.strip():
                continue  # blank lines are not records
            data = json.loads(line)
            model_answer = data['model_answer']
            gt_answer = data['gt_answer']

            total_count += 1

            # An answer naming several options is classified as wrong
            if count_options(model_answer) > 1:
                data['error_reason'] = 'multi-choice'
                wrong_f.write(json.dumps(data, ensure_ascii=False) + '\n')
                continue

            if clean_answer(model_answer) == clean_answer(gt_answer):
                correct_f.write(json.dumps(data, ensure_ascii=False) + '\n')
                correct_count += 1
            else:
                data['error_reason'] = 'incorrect_answer'
                wrong_f.write(json.dumps(data, ensure_ascii=False) + '\n')

    # Print model accuracy
    accuracy = correct_count / total_count if total_count > 0 else 0
    print(f"Model: {model_name} Accuracy: {accuracy:.2%} ({correct_count}/{total_count})")

def process_folder(input_folder):
    """Run evaluate_model_response on each .jsonl file directly inside input_folder."""
    jsonl_names = [name for name in os.listdir(input_folder) if name.endswith(".jsonl")]
    for name in jsonl_names:
        evaluate_model_response(os.path.join(input_folder, name))

# Evaluate every .jsonl result file in the OC mcq result folder.
print("oc mcq answer analysis:")
process_folder(oc_mcq_input_folder)


oc mcq answer analysis:
Model: Qwen2.5-VL-7B-Instruct_20250206_220231_answers Accuracy: 47.00% (94/200)


## PC

### cpr

In [11]:
# Folder whose .jsonl files are evaluated for the PC cpr task.
pc_cpr_input_folder = "code/pc/image/test/test_res/test_cpr" # result folder

In [12]:
def parse_tf_answer(model_answer):
    """
    Extract 'T' or 'F' from the tf type model_answer.

    Recognises the stand-alone tokens 't', 'f', 'true' and 'false' in any
    letter case, also when they appear inside a longer sentence. The "t" of a
    contraction such as "don't" or "isn't" is not treated as an answer.

    Args:
        model_answer: Raw answer text produced by the model.

    Returns:
        A tuple (answer, error):
        - ('T', None) or ('F', None) when exactly one token is found
        - (None, 'multiple_answers_found') when more than one token is found
        - (None, 'no_answer_found') when no token is found
    """
    # The look-behind rejects a token that directly follows "<letter>'" so
    # that "can't" / "doesn't" do not yield a spurious 'T'. A quoted answer
    # such as 'T' still matches because its quote is not preceded by a
    # word character.
    pattern = re.compile(r"(?<!\w['\u2019])\b(t|f|true|false)\b", re.IGNORECASE)
    matches = pattern.findall(model_answer)

    # Reduce every match to its upper-case initial: 'true' -> 'T', 'false' -> 'F'
    extracted = [match.upper()[0] for match in matches]

    if len(extracted) == 1:
        return extracted[0], None
    if extracted:
        return None, 'multiple_answers_found'
    return None, 'no_answer_found'

def load_model_answers(model_answer_file):
    """
    Load the model answer file, grouping answers by main id.

    A record's q_id has the form '<main id>_p' or '<main id>_n', optionally
    prefixed with 'tf_' (the prefix is removed). The suffix is taken after the
    LAST underscore, so main ids may themselves contain underscores. Blank
    lines and records whose q_id is missing, not a string, has no underscore
    left after prefix removal, or has another suffix are skipped.

    Args:
        model_answer_file: Path to a UTF-8 JSONL file of model answers.

    Returns:
        A dictionary keyed by main id. Each value is a sub-dictionary with the
        keys 'p' and/or 'n', each mapping to
        {'model_answer': <stripped text>, 'gt_answer': <stripped, upper-cased text>}.

    Raises:
        json.JSONDecodeError: If a non-blank line is not valid JSON.
    """
    model_answers_dict = defaultdict(dict)
    with open(model_answer_file, 'r', encoding='utf-8') as f:
        for line in f:
            if not line.strip():
                continue  # tolerate empty lines such as a trailing newline
            data = json.loads(line)
            q_id = data.get('q_id')
            if not isinstance(q_id, str):
                continue

            # If q_id starts with 'tf_', remove this prefix
            if q_id.startswith('tf_'):
                q_id = q_id[3:]

            # Checked after the prefix removal: 'tf_5' leaves no separator
            # and must be skipped rather than fail on unpacking below.
            if '_' not in q_id:
                continue

            # rsplit keeps underscores that belong to the main id itself
            main_id, suffix = q_id.rsplit('_', 1)
            if suffix not in ('p', 'n'):
                continue
            model_answers_dict[main_id][suffix] = {
                'model_answer': data.get('model_answer', '').strip(),
                'gt_answer': data.get('gt_answer', '').strip().upper()
            }
    return model_answers_dict

def evaluate_pair_correctness(model_answers_dict):
    """
    Tally how many question pairs the model answered correctly.

    Only main ids that carry both a 'p' and an 'n' entry are counted. Each
    entry's model answer is parsed with parse_tf_answer and compared with its
    ground truth; an answer that cannot be parsed counts as wrong.

    Returns the tuple (correct_p, correct_n, correct_pairs, total_pairs).
    """
    p_hits = n_hits = both_hits = pairs_seen = 0

    for pair in model_answers_dict.values():
        # Incomplete pairs are ignored entirely
        if not ('p' in pair and 'n' in pair):
            continue
        pairs_seen += 1

        pos_entry, neg_entry = pair['p'], pair['n']
        pos_text, pos_truth = pos_entry['model_answer'], pos_entry['gt_answer']
        neg_text, neg_truth = neg_entry['model_answer'], neg_entry['gt_answer']

        # The error component of the parse result is not needed here
        pos_verdict, _ = parse_tf_answer(pos_text)
        neg_verdict, _ = parse_tf_answer(neg_text)

        pos_ok = bool(pos_verdict) and pos_verdict == pos_truth
        neg_ok = bool(neg_verdict) and neg_verdict == neg_truth

        if pos_ok:
            p_hits += 1
        if neg_ok:
            n_hits += 1
        if pos_ok and neg_ok:
            both_hits += 1

    return p_hits, n_hits, both_hits, pairs_seen

def process_model_file(model_file_path):
    """
    Evaluate one model answer file and print its three accuracy figures.

    The model name shown in the report is the file name without extension.
    """
    file_part = os.path.basename(model_file_path)
    model_name = os.path.splitext(file_part)[0]

    # Parse the file, then score the complete p/n pairs it contains
    answers_by_id = load_model_answers(model_file_path)
    correct_p, correct_n, correct_pairs, total = evaluate_pair_correctness(answers_by_id)

    # Each accuracy is a share of the complete pairs; 0 when there are none
    if total > 0:
        positive_acc = correct_p / total
        negative_acc = correct_n / total
        total_acc = correct_pairs / total
    else:
        positive_acc = negative_acc = total_acc = 0

    print(f"Model: {model_name}")
    print(f"Positive Accuracy: {positive_acc:.2%} ({correct_p}/{total}) | Negative Accuracy: {negative_acc:.2%} ({correct_n}/{total}) | Total Accuracy: {total_acc:.2%} ({correct_pairs}/{total})")

def process_result_folder(result_folder):
    """
    Run process_model_file on every .jsonl file found directly in result_folder.
    """
    for entry in os.listdir(result_folder):
        # Anything that is not a .jsonl file is ignored
        if entry.endswith(".jsonl"):
            process_model_file(os.path.join(result_folder, entry))

# Evaluate every .jsonl result file in the PC cpr result folder.
print("pc cpr answer analysis:")
process_result_folder(pc_cpr_input_folder)


pc cpr answer analysis:
Model: Qwen2.5-VL-7B-Instruct_20250206_225927_answers
Positive Accuracy: 86.00% (172/200) | Negative Accuracy: 84.00% (168/200) | Total Accuracy: 80.00% (160/200)


### cnt

In [13]:
# Folder whose .jsonl files are evaluated for the PC counting task.
pc_cnt_input_folder = "code/pc/image/test/test_res/test_cnt" # result folder

In [14]:
# Mapping from English number words to integers
NUM_WORDS = {
    "zero": 0, "one": 1, "two": 2, "three": 3, "four": 4, "five": 5, "six": 6, "seven": 7, "eight": 8, "nine": 9, "ten": 10,
    "eleven": 11, "twelve": 12, "thirteen": 13, "fourteen": 14, "fifteen": 15, "sixteen": 16, "seventeen": 17, "eighteen": 18, "nineteen": 19,
    "twenty": 20, "thirty": 30, "forty": 40, "fifty": 50, "sixty": 60, "seventy": 70, "eighty": 80, "ninety": 90,
    "hundred": 100, "thousand": 1000,
}

# Define penalty factor for inverse power exponent calculation (recommended value 1-3; higher value means more significant penalty for errors)
# NOTE(review): the value below (10) lies outside the recommended 1-3 range stated above — confirm which is intended.
PENALTY_FACTOR = 10
# Maximum image sequence length (for difficulty weighting), fixed at 4 for this task
L_MAX = 4

def words_to_num(s):
    """
    Convert English number words to an integer.

    Supports phrases such as 'twenty one', 'twenty-one', 'one hundred and
    five' and 'two hundred thousand'.

    Args:
        s: Phrase made of number words, separated by spaces or hyphens. The
            word 'and' is ignored.

    Returns:
        The integer value, or None when the phrase contains a word that is not
        a number word, when it is empty, or when its value is 0 (so 'zero'
        also yields None).
    """
    # 'and' is dropped as a whole word only; removing it as a substring would
    # corrupt 'thousand'.
    tokens = [tok for tok in s.lower().replace('-', ' ').split() if tok != 'and']
    total = 0
    current = 0
    for token in tokens:
        value = NUM_WORDS.get(token)
        if value is None:
            return None
        if value == 100:
            # 'hundred' scales the group in progress ('two hundred' -> 200)
            current = (current or 1) * 100
        elif value == 1000:
            # 'thousand' closes the group in progress and starts a new one
            total += (current or 1) * 1000
            current = 0
        else:
            current += value
    total += current
    return total if total != 0 else None

def extract_numbers(text):
    """
    Collect the integers mentioned in text, written as Arabic numerals or as
    English number words.

    Every run of digits becomes one value. All English number words found in
    the text are joined into a single phrase and converted together through
    words_to_num, so they contribute at most one value.
    Returns a list of integers: digit-based values first, then the word-based
    value, if any.
    """
    lowered = text.lower()

    # Arabic numerals
    from_digits = list(map(int, re.findall(r'\d+', lowered)))

    # English number words
    word_regex = re.compile(
        r'\b(zero|one|two|three|four|five|six|seven|eight|nine|ten|'
        r'eleven|twelve|thirteen|fourteen|fifteen|sixteen|'
        r'seventeen|eighteen|nineteen|twenty|thirty|forty|fifty|'
        r'sixty|seventy|eighty|ninety|hundred|thousand)\b',
        re.IGNORECASE)
    found_words = word_regex.findall(lowered)

    from_words = []
    if found_words:
        # The matched words are converted as one phrase, in order of appearance
        value = words_to_num(' '.join(found_words))
        if value is not None:
            from_words.append(value)
    return from_digits + from_words

def parse_model_answer(model_answer):
    """
    Parse the numbers contained in model_answer via extract_numbers.
    Returns the number when exactly one is found, otherwise None.
    """
    found = extract_numbers(model_answer)
    return found[0] if len(found) == 1 else None

def load_curated_questions(curated_file):
    """
    Build a q_id -> image sequence length mapping from the question file.

    For each record the length is taken from:
      - the "image_seq_len" field when present
      - else the size of the "image_seq" list
      - else the default of 2
    Lines that are not valid JSON and records without a q_id are ignored.
    """
    lengths = {}
    with open(curated_file, 'r', encoding='utf-8') as f:
        for raw in f:
            try:
                record = json.loads(raw)
            except json.JSONDecodeError:
                continue
            q_id = record.get('q_id')
            if q_id is None:
                continue
            if "image_seq_len" in record:
                seq_len = record["image_seq_len"]
            elif isinstance(record.get("image_seq"), list):
                seq_len = len(record["image_seq"])
            else:
                seq_len = 2  # Default value
            lengths[q_id] = seq_len
    return lengths

def evaluate_model_response(jsonl_file, curated_questions):
    """
    Score one counting-result jsonl file and split its records into
    correct/<model>_correct.jsonl and wrong/<model>_wrong.jsonl next to it.

    For each record:
      - image_len (image sequence length) is looked up in curated_questions
        by q_id, defaulting to 2 with a warning field when the q_id is unknown
      - a record without gt_answer, with an unparsable answer, or with an
        answer outside 1..image_len is written to the wrong file with an
        error_reason and does not enter the score average
      - raw_diff = |parsed answer - gt_answer|
      - the normalized score is 100 when raw_diff is 0, otherwise:
          1. max_error = max(gt_answer - 1, image_len - gt_answer)
          2. relative_error = raw_diff / max_error
          3. weight = L_MAX / image_len
          4. penalty = weight * (relative_error ** (1.0 / PENALTY_FACTOR))
          5. score = 100 * (1 - penalty), or 0 when penalty >= 1

    Prints the accuracy over all records and the average normalized score
    over the records that were scored.
    """
    parent_dir, file_part = os.path.split(jsonl_file)
    model_name = os.path.splitext(file_part)[0]

    correct_dir = os.path.join(parent_dir, 'correct')
    wrong_dir = os.path.join(parent_dir, 'wrong')
    for out_dir in (correct_dir, wrong_dir):
        os.makedirs(out_dir, exist_ok=True)

    correct_file = os.path.join(correct_dir, f'{model_name}_correct.jsonl')
    wrong_file = os.path.join(wrong_dir, f'{model_name}_wrong.jsonl')

    def write_record(handle, record):
        handle.write(json.dumps(record, ensure_ascii=False) + '\n')

    def normalized(raw_diff, truth, image_len):
        if raw_diff == 0:
            return 100.0
        # Largest possible error given where gt_answer sits in 1..image_len
        max_error = max(truth - 1, image_len - truth)
        relative_error = raw_diff / max_error if max_error > 0 else 0
        # Difficulty weight: fewer images means higher weight
        weight = L_MAX / image_len
        penalty = weight * (relative_error ** (1.0 / PENALTY_FACTOR))
        if penalty < 1:
            return 100 * (1 - penalty)
        return 0.0

    total_count = 0       # every record read
    correct_count = 0     # records whose parsed answer equals the ground truth
    scored = 0            # records that received a normalized score
    score_sum = 0

    with open(correct_file, 'w', encoding='utf-8') as correct_f, \
         open(wrong_file, 'w', encoding='utf-8') as wrong_f, \
         open(jsonl_file, 'r', encoding='utf-8') as source:

        for line in source:
            record = json.loads(line)
            total_count += 1

            answer_text = record.get('model_answer', '').strip()
            truth = record.get('gt_answer', None)
            if truth is None:
                record['error_reason'] = 'missing_gt_answer'
                write_record(wrong_f, record)
                continue

            # Get image_len based on q_id
            q_id = record.get('q_id', 'unknown_q_id')
            if q_id not in curated_questions:
                image_len = 2
                record['warning'] = 'q_id not found in curated questions, defaulting image_len=2'
            else:
                image_len = curated_questions[q_id]

            guess = parse_model_answer(answer_text)
            if guess is None:
                record['raw_diff'] = None
                record['normalized_score'] = 0.0
                record['error_reason'] = 'invalid_answer_format'
                write_record(wrong_f, record)
                continue

            in_range = 1 <= guess <= image_len
            raw_diff = abs(guess - truth)
            if not in_range:
                record['raw_diff'] = raw_diff
                record['normalized_score'] = 0.0
                record['error_reason'] = 'answer_out_of_expected_range'
                write_record(wrong_f, record)
                continue

            norm_score = normalized(raw_diff, truth, image_len)
            record['raw_diff'] = raw_diff
            record['normalized_score'] = norm_score

            score_sum += norm_score
            scored += 1

            if guess == truth:
                correct_count += 1
                write_record(correct_f, record)
            else:
                record['error_reason'] = 'incorrect_answer'
                write_record(wrong_f, record)

    accuracy = (correct_count / total_count * 100) if total_count > 0 else 0
    avg_norm_score = (score_sum / scored) if scored > 0 else 0

    print(f"Model: {model_name} Accuracy: {accuracy:.2f}% ({correct_count}/{total_count})")
    print(f"Model: {model_name} Average Normalized Score: {avg_norm_score:.2f}")

def process_folder(input_folder, curated_questions):
    """Run the counting evaluation on every ``.jsonl`` entry of ``input_folder``.

    Entries are visited in ``os.listdir`` order; names that do not end with
    ``.jsonl`` are ignored. ``curated_questions`` is handed unchanged to
    ``evaluate_model_response`` for each file.
    """
    for entry in os.listdir(input_folder):
        if not entry.endswith(".jsonl"):
            continue
        evaluate_model_response(os.path.join(input_folder, entry), curated_questions)

print("pc cnt answer analysis:")
# The curated counting questions are loaded once and reused for every result file
curated_questions_file = 'jsonl/pc/vanilla/pc_cnt.jsonl' # path to pc cnt questions
curated_questions = load_curated_questions(curated_questions_file)

# Evaluate only when the result folder is actually present
if os.path.isdir(pc_cnt_input_folder):
    process_folder(pc_cnt_input_folder, curated_questions)
else:
    print(f"Result folder does not exist: {pc_cnt_input_folder}")


pc cnt answer analysis:
Model: Qwen2.5-VL-7B-Instruct_20250206_230834_answers Accuracy: 56.67% (68/120)
Model: Qwen2.5-VL-7B-Instruct_20250206_230834_answers Average Normalized Score: 57.98


### mcq


In [15]:
# Folder whose .jsonl model-result files are scored by the pc mcq evaluation cell
pc_mcq_input_folder = "code/pc/image/test/test_res/test_grp" # result folder

In [16]:
print("pc mcq answer analysis:")
def evaluate_model_response(jsonl_file):
    """Score one model's multiple-choice answers and split them into correct/wrong files.

    Every non-blank line of ``jsonl_file`` must be a JSON object holding
    ``model_answer`` and ``gt_answer`` strings. An entry is judged as follows:

    * more than one ``(X)`` option pattern in ``model_answer`` -> wrong,
      ``error_reason = 'multi-choice'``;
    * text before the first ``)`` identical for both answers -> correct;
    * anything else -> wrong, ``error_reason = 'incorrect_answer'``.

    Entries are written to ``correct/<model>_correct.jsonl`` and
    ``wrong/<model>_wrong.jsonl`` next to the input file (both are overwritten
    on every run) and the accuracy is printed.

    Args:
        jsonl_file: Path of the result file; its name without extension is
            used as the model name.

    Raises:
        KeyError: If an entry lacks ``model_answer`` or ``gt_answer``.
        json.JSONDecodeError: If a non-blank line is not valid JSON.
    """
    base_dir = os.path.dirname(jsonl_file)
    model_name = os.path.splitext(os.path.basename(jsonl_file))[0]  # Parse model name

    # Create the two output folders if they don't exist
    correct_dir = os.path.join(base_dir, 'correct')
    wrong_dir = os.path.join(base_dir, 'wrong')
    os.makedirs(correct_dir, exist_ok=True)
    os.makedirs(wrong_dir, exist_ok=True)

    # Output file paths
    correct_file = os.path.join(correct_dir, f'{model_name}_correct.jsonl')
    wrong_file = os.path.join(wrong_dir, f'{model_name}_wrong.jsonl')

    def clean_answer(answer):
        """Return the text before the first ')' with surrounding whitespace removed."""
        # NOTE(review): "(A) text" becomes "(A" while a bare "A" stays "A", so an
        # answer given without parentheses never matches a "(A)" ground truth.
        # Kept as is so that scores stay comparable - confirm this is intended.
        return answer.split(')')[0].strip()

    def count_options(answer):
        """Count the number of ``(X)`` option patterns in the answer."""
        return len(re.findall(r'\([A-Z]\)', answer))

    def dump(handle, entry):
        """Write one entry as a JSON line."""
        handle.write(json.dumps(entry, ensure_ascii=False) + '\n')

    correct_count = 0
    total_count = 0

    # Context managers guarantee that all three files are closed even when a
    # malformed line raises; outputs are opened in write mode to overwrite
    # previous content. UTF-8 is explicit so results do not depend on the
    # platform's default encoding.
    with open(correct_file, 'w', encoding='utf-8') as correct_f, \
            open(wrong_file, 'w', encoding='utf-8') as wrong_f, \
            open(jsonl_file, 'r', encoding='utf-8') as file:
        for line in file:
            if not line.strip():
                continue  # tolerate blank lines (e.g. a trailing newline)

            data = json.loads(line)
            model_answer = data['model_answer']
            gt_answer = data['gt_answer']

            total_count += 1  # Count total entries

            # Answers that name several options are classified as wrong
            if count_options(model_answer) > 1:
                data['error_reason'] = 'multi-choice'
                dump(wrong_f, data)
                continue

            # Compare the cleaned answers
            if clean_answer(model_answer) == clean_answer(gt_answer):
                dump(correct_f, data)
                correct_count += 1  # Count correct answers
            else:
                data['error_reason'] = 'incorrect_answer'
                dump(wrong_f, data)

    # Print model accuracy
    accuracy = correct_count / total_count if total_count > 0 else 0
    print(f"Model: {model_name} Accuracy: {accuracy:.2%} ({correct_count}/{total_count})")

def process_folder(input_folder):
    """Evaluate each ``.jsonl`` entry of ``input_folder`` in ``os.listdir`` order.

    Names without the ``.jsonl`` suffix are ignored; every remaining path is
    passed to ``evaluate_model_response``.
    """
    for entry in os.listdir(input_folder):
        if not entry.endswith(".jsonl"):
            continue
        evaluate_model_response(os.path.join(input_folder, entry))

# Evaluate only when the result folder is actually present
if os.path.isdir(pc_mcq_input_folder):
    process_folder(pc_mcq_input_folder)
else:
    print(f"Result folder does not exist: {pc_mcq_input_folder}")


pc mcq answer analysis:
Model: Qwen2.5-VL-7B-Instruct_20250206_231136_answers Accuracy: 69.00% (69/100)


### open-ended (video)

In [None]:
import json
import argparse
import re
import os
from openai import OpenAI

# Initialize API client.
# The key is taken from the OPENAI_API_KEY environment variable so that it does
# not have to be pasted into (and accidentally committed with) this notebook.
# When the variable is unset the key is an empty string, exactly as before.
client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY", ""))

# Set input directory for model answers
pc_v_open_ended_input_folder = "code/pc/video/test/test_res/test_pc_v_open-ended"

In [None]:
# Judge prompts sent to the GPT scorer. The `{description}` placeholder is
# filled with the evaluated model's answer via `str.format` in `process_file`.

# Prompt for videos scored as type "AB" (person A, then person B).
# The judge is asked to reply with "Score: 0" or "Score: 1".
prompt_ab = """#Task
You are evaluating a model's ability to accurately distinguish between two different individuals, A and B, who appear sequentially in a video (first A, then B). Given a description, your task is to determine if the model explicitly identifies that the first person (A) and the second person (B) are different individuals.
#Return Format
You only need return a number after "Score:". If you think the model correctly identifies that the two appearances belong to different individuals, return "Score: 1". If you think the model fails to explicitly state that there are two different individuals, return "Score: 0".
#Description
{description}
"""

# Prompt for videos whose id contains "ABA" (A, then B, then A again).
# The judge is asked to reply with "Score: 0", "Score: 1" or "Score: 2".
prompt_aba = """#Task
You are evaluating a model's ability to accurately distinguish between two different individuals, A and B, who appear sequentially in a video following an ABA pattern (first A, then B, then A again). Given a description, your task is to determine whether the model explicitly identifies that: (1) A and B are different individuals, and (2) The person in the final scene is the same as the first (A).
#Return Format
You only need return a number after "Score:". (1) If the model correctly describes that the video follows an ABA sequence, explicitly recognizing that the first and last appearances belong to the same person (A), while the middle appearance is a different person (B), return "Score: 2".
(2) If the model correctly identifies that there are two different people in the video (A and B) but does not explicitly mention that the last scene returns to A, return "Score: 1".
(3) If the model fails to recognize that two different individuals appear (e.g., treats all appearances as the same person or does not distinguish between A and B), return "Score: 0".
#Description
{description}
"""

def get_model_name(filename):
    """Extract the ``vid_<model>`` prefix from a result file name.

    For names such as ``vid_InternVL2.5-26B_20250206_035327.jsonl`` the
    trailing ``_YYYYMMDD_HHMMSS`` run timestamp is dropped, so model names that
    themselves contain underscores (``vid_Qwen2_VL-7B_...``) are kept whole
    instead of being cut at their first underscore, which would make distinct
    models share one output file. Names without such a timestamp fall back to
    the text from ``vid_`` up to the next underscore.

    Args:
        filename: Base name of a result file.

    Returns:
        The extracted name, or ``None`` when ``filename`` does not start with
        ``vid_`` followed by a non-underscore character.
    """
    # Preferred: everything before the first "_<8 digits>_<6 digits>" timestamp
    stamped = re.match(r'(vid_[^_].*?)_\d{8}_\d{6}(?!\d)', filename)
    if stamped:
        return stamped.group(1)

    # Fallback for names without a timestamp
    match = re.match(r'(vid_[^_]+)', filename)
    return match.group(1) if match else None

def compute_average_scores(output_path):
    """Aggregate the judge scores of one scored JSONL file into percentages.

    Each non-blank line is a JSON object with a ``type`` (``"AB"`` or
    ``"ABA"``) and a ``score`` string holding the judge's reply. The first
    ``Score: <n>`` occurrence in the reply is used; entries whose reply is
    missing, not a string, or contains no such pattern are ignored, as are
    entries of any other type.

    Args:
        output_path: Path of the scored JSONL file.

    Returns:
        A dict with ``model`` (base name of ``output_path``), ``ab_score``
        (mean AB score mapped from 0-1 to 0-100), ``aba_score`` (mean ABA
        score mapped from 0-2 to 0-100) and ``avg_score`` (mean of the two
        percentages). A type without any valid score contributes 0.
    """
    ab_scores = []
    aba_scores = []

    # Read the output JSONL and extract scores
    with open(output_path, 'r', encoding='utf-8') as outfile:
        for line in outfile:
            if not line.strip():
                continue  # tolerate blank lines

            data = json.loads(line)
            raw_score = data.get("score")
            # A missing or null reply cannot be searched with a regex; skip it
            # the same way as a reply without a "Score: <n>" pattern.
            if not isinstance(raw_score, str):
                continue
            match = re.search(r"Score:\s*(\d+)", raw_score)
            if match is None:
                continue

            score = int(match.group(1))
            type_ = data.get("type")
            if type_ == "AB":
                ab_scores.append(score)
            elif type_ == "ABA":
                aba_scores.append(score)

    # Compute average scores
    avg_ab = sum(ab_scores) / len(ab_scores) if ab_scores else 0
    avg_aba = sum(aba_scores) / len(aba_scores) if aba_scores else 0

    # Map both scales to percentages: AB has a maximum of 1, ABA a maximum of 2
    mapped_avg_ab = (avg_ab / 1) * 100
    mapped_avg_aba = (avg_aba / 2) * 100
    avg = (mapped_avg_ab + mapped_avg_aba) / 2

    return {
        "model": os.path.basename(output_path),
        "ab_score": mapped_avg_ab,
        "aba_score": mapped_avg_aba,
        "avg_score": avg
    }

def process_file(input_file, scored_dir):
    """Score every answer of ``input_file`` with the GPT judge.

    The model name is derived from the file's base name via
    ``get_model_name``; when that fails a message is printed and ``None`` is
    returned. Otherwise each input line (a JSON object with ``vid`` and
    ``model_answer``) is sent to ``gpt-4o`` with the ABA prompt when ``"ABA"``
    occurs in ``vid`` and the AB prompt otherwise, and the raw reply is written
    to ``<scored_dir>/<model_name>_scored.jsonl``.

    Returns:
        The path of the scored file, or ``None`` if no model name was found.
    """
    model_name = get_model_name(os.path.basename(input_file))
    if not model_name:
        print(f"Could not extract model name from {input_file}")
        return None

    output_file = os.path.join(scored_dir, f"{model_name}_scored.jsonl")

    with open(input_file, 'r') as infile, open(output_file, 'w') as outfile:
        for raw_line in infile:
            record = json.loads(raw_line)
            vid, desc = record["vid"], record["model_answer"]

            # ABA videos use the three-level prompt, everything else the AB one
            type_ = "ABA" if "ABA" in vid else "AB"
            template = prompt_aba if type_ == "ABA" else prompt_ab

            # Ask the judge model for a score
            messages = [{"role": "user", "content": template.format(description=desc)}]
            response = client.chat.completions.create(
                model="gpt-4o",
                messages=messages,
                max_tokens=300,
            )
            score = response.choices[0].message.content
            print(f"Processed {vid}: {score}")

            # Persist the raw judge reply together with the evaluated answer
            scored = {"vid": vid, "type": type_, "score": score, "model_answer": desc}
            outfile.write(json.dumps(scored) + "\n")

    return output_file

# Create scored_files directory if it doesn't exist
scored_dir = os.path.join(pc_v_open_ended_input_folder, "scored_files")
os.makedirs(scored_dir, exist_ok=True)

# Process all files in input directory.
# os.listdir returns entries in arbitrary order; sorting keeps the order of
# the result rows reproducible between runs.
results = []
for filename in sorted(os.listdir(pc_v_open_ended_input_folder)):
    if filename.endswith('.jsonl'):
        input_file = os.path.join(pc_v_open_ended_input_folder, filename)
        print(f"\nProcessing {filename}...")

        output_file = process_file(input_file, scored_dir)
        if output_file:
            results.append(compute_average_scores(output_file))

# Print batch results
print("\nBatch Evaluation Results:")
print("-" * 80)
print(f"{'Model':<30} {'AB Score':<15} {'ABA Score':<15} {'Average':<15}")
print("-" * 80)
for result in results:
    # Format each percentage first, then pad it to the header's column width,
    # so rows line up with the header whatever the number of digits.
    ab_cell = f"{result['ab_score']:.2f}%"
    aba_cell = f"{result['aba_score']:.2f}%"
    avg_cell = f"{result['avg_score']:.2f}%"
    print(f"{result['model']:<30} {ab_cell:<15} {aba_cell:<15} {avg_cell:<15}")
