In [1]:
from tqdm import tqdm  
import json
from collections import defaultdict  
import os
import pandas as pd
# from openpyxl import load_workbook
def jsonl_to_excel_sheet(jsonl_file, excel_file, sheet_name, prefix_path=""):
    jsonl_path = os.path.join('./file', jsonl_file)
    rows = []
    with open(jsonl_path, "r", encoding="utf-8") as f:
        for line in f:
            data = json.loads(line.strip())
            row = {
                "image_path": prefix_path + data.get("image_path", ""),
                "question": data.get("prompt", ""),
                "answer": data.get("answer", ""),
                "category": ", ".join(data.get("category", [])),
                "l2-category": data.get("subcategory", ""),
                "type": data.get("type", ""),
                "ID": data.get("ID", ""),
            }
            pred = data.get("predict", "")
            if isinstance(pred, str):
                pred_processed = pred.capitalize()
            else:
                pred_processed = str(pred)
            row["prediction"] = pred_processed
            rows.append(row)

    df = pd.DataFrame(rows)
    df.index.name = "index"

    # try:
    #     book = load_workbook(excel_file)
    #     if sheet_name in book.sheetnames:
    #         # 如果是唯一的sheet，就先创建一个备用空白sheet
    #         if len(book.sheetnames) == 1:
    #             # 添加备用sheet
    #             book.create_sheet("_temp_sheet_for_delete_backup_")
    #             book.save(excel_file)  # 保存后才能继续操作
    #         # 删除原目标sheet
    #         std = book[sheet_name]
    #         book.remove(std)
    #         book.save(excel_file)

    #     with pd.ExcelWriter(excel_file, engine='openpyxl', mode='a') as writer:
    #         writer.book = load_workbook(excel_file)  # 重新加载最新的book
    #         df.to_excel(writer, sheet_name=sheet_name, index=True)
    #         writer.save()

    # except FileNotFoundError:
    #     # 文件不存在，创建新文件
    #     with pd.ExcelWriter(excel_file, engine='openpyxl', mode='w') as writer:
    #         df.to_excel(writer, sheet_name=sheet_name, index=True)
def evaluate(eval_file):
        global COUNT
        COUNT = 0
        score_file = eval_file.replace('.jsonl', '_score.csv')
        score_file = os.path.join('./score', score_file)
        def load(file_path):
            # 读取jsonl文件
            data = []
            with open(file_path, 'r', encoding='utf-8') as f:
                for line in f:
                    data.append(json.loads(line))
            return data
        def is_number(s):
            try:
                float(s)
                return True
            except ValueError:
                pass
            return False
        def get_blank_metric():  
            eval_category_dict = {  
                "unary": {  
                    "Points-unary": ["Existence", "Quantity", "Size Property"],  
                    "Line Segment-unary": ["Existence", "Quantity", "Size Property"],  
                    "Angle-unary": ["Existence", "Quantity", "Size Property"],  
                    "Triangle-unary": ["Existence", "Quantity", "Size Property"],  
                    "Circle-unary": ["Existence", "Quantity", "Size Property"],  
                    "Polygon-unary": ["Existence", "Quantity", "Size Property"],  
                    "Arc-unary": ["Existence", "Quantity", "Size Property"],  
                    "Sector-unary": ["Existence", "Quantity", "Size Property"],  
                },  
                "binary": {  
                    "Points": ["Points", "LineSegment", "Angle", "Triangle", "Circle", "Polygon"],  
                    "LineSegment": ["LineSegment", "Angle", "Triangle", "Circle", "Polygon"],  
                    "Angle": ["Angle", "Triangle", "Circle", "Polygon"],  
                    "Triangle": ["Triangle", "Circle", "Polygon"],  
                    "Circle": ["Circle", "Polygon"],  
                    "Polygon": ["Polygon"],  
                },  
                "answer_type": ["choose", "free-form"],  
                "special_case": ["wrongprerequisite", "not given", "normal"],  
                "id": [i for i in range(1, 73)],  
                "image": ["synthetic", "mathverse"]  
            }   

            # 初始化空的指标字典  
            def initialize_metrics(category_dict):  
                metrics = {}  
                for key, value in category_dict.items():  
                    if isinstance(value, dict):  
                        # 如果是字典，则递归调用  
                        metrics[key] = initialize_metrics(value)  
                    elif isinstance(value, list):  
                        # 如果是列表，则为每个元素赋值0  
                        metrics[key] = {item: 0 for item in value}  
                    else:  
                        # 处理其他类型（尽可能保持通用）  
                        metrics[key] = 0  
                return metrics  
            
            return initialize_metrics(eval_category_dict) 
        def is_close_with_ratio(gt_ans, pred_ans, relative_tol=0.01, absolute_tol=1e-6, loose_flag=False):  
            if loose_flag and (gt_ans == "notgiven" and pred_ans in ("no","notsure")):
                return True

            if isinstance(gt_ans, (int, float)) and isinstance(pred_ans, (int, float)):
                # 数字的话看误差
                if gt_ans == 0:  
                    # 如果 gt_ans 为 0，仅比较绝对误差  
                    return abs(gt_ans - pred_ans) <= absolute_tol  
                else:  
                    # 计算相对误差  
                    relative_error = abs(gt_ans - pred_ans) / abs(gt_ans)  
                    return relative_error <= relative_tol  
            else:
                # 字符串严格匹配
                return int(gt_ans == pred_ans)
        def regularize_answer(ans):  
            ans = str(ans).lower().strip().replace(' ', '').replace('\n', '')
            # 去掉句尾的句号
            ans = ans[:-1] if ans.endswith('.') else ans
            ans = eval(ans) if is_number(ans) else ans
            try:
                assert ans in ["yes", "no", "a", "b", "c", "d", "e", "wrongprerequisite", "notgiven", "notsure"] or isinstance(ans, (int, float))
            except:
                global COUNT
                COUNT += 1
                # 返回最大的一个数
                return 999
            return ans

        def process_metric_to_excel(metric_all, excel_file, sheet_name):
            data = {
                "Metric": [],  
                "Category": [],  
                "Subcategory": [],  
                "Strict Accuracy": [],  
                "Loose Accuracy": [],  
                "Count": []  
            }
            for key in metric_all['count']:
                if key in ('unary', 'binary'):
                    for category, subcategories in metric_all['count'][key].items():
                        for subcategory, count in subcategories.items():
                            loose_acc = metric_all['loose_c'][key][category][subcategory] / count if count > 0 else '-'
                            strict_acc = metric_all['strict_c'][key][category][subcategory] / count if count > 0 else '-'
                            data['Metric'].append(key)
                            data['Category'].append(category)
                            data['Subcategory'].append(subcategory)
                            data['Strict Accuracy'].append(strict_acc)
                            data['Loose Accuracy'].append(loose_acc)
                            data['Count'].append(count)
                elif key in ('answer_type', 'special_case', 'id', 'image'):
                    for category, count in metric_all['count'][key].items():
                        loose_acc = metric_all['loose_c'][key][category] / count if count > 0 else '-'
                        strict_acc = metric_all['strict_c'][key][category] / count if count > 0 else '-'
                        data['Metric'].append(key)
                        data['Category'].append(category)
                        data['Subcategory'].append('-')
                        data['Strict Accuracy'].append(strict_acc)
                        data['Loose Accuracy'].append(loose_acc)
                        data['Count'].append(count)

            df = pd.DataFrame(data)
            df = df[["Metric", "Category", "Subcategory",  "Count", "Strict Accuracy", "Loose Accuracy"]]

            # 判断文件是否存在，若存在则追加sheet，不覆盖其他sheet
            # from openpyxl import load_workbook
            # try:
            #     book = load_workbook(excel_file)
            #     with pd.ExcelWriter(excel_file, engine='openpyxl', mode='a') as writer:
            #         writer.book = book
            #         df.to_excel(writer, sheet_name=sheet_name, index=False)
            # except FileNotFoundError:
            #     # 文件不存在时，创建新文件
            #     with pd.ExcelWriter(excel_file, engine='openpyxl', mode='w') as writer:
            #         df.to_excel(writer, sheet_name=sheet_name, index=False)

            return df
        def process_metric_to_dataframe(metric_all):
            # 用于存储最终数据的列表
            data = {
                "Metric": [],  
                "Category": [],  
                "Subcategory": [],  
                "Strict Accuracy": [],  
                "Loose Accuracy": [],  
                "Count": []  
            }
            for key in metric_all['count']:
                if key in ('unary', 'binary'):
                    for category, subcategories in metric_all['count'][key].items():
                        for subcategory, count in subcategories.items():
                            loose_acc = metric_all['loose_c'][key][category][subcategory] / count if count > 0 else '-'
                            strict_acc = metric_all['strict_c'][key][category][subcategory] / count if count > 0 else '-'
                            data['Metric'].append(key)
                            data['Category'].append(category)
                            data['Subcategory'].append(subcategory)
                            data['Strict Accuracy'].append(strict_acc)
                            data['Loose Accuracy'].append(loose_acc)
                            data['Count'].append(count)
                elif key in ('answer_type', 'special_case', 'id', 'image'):
                    for category, count in metric_all['count'][key].items():
                        loose_acc = metric_all['loose_c'][key][category] / count if count > 0 else '-'
                        strict_acc = metric_all['strict_c'][key][category] / count if count > 0 else '-'
                        data['Metric'].append(key)
                        data['Category'].append(category)
                        data['Subcategory'].append('-')
                        data['Strict Accuracy'].append(strict_acc)
                        data['Loose Accuracy'].append(loose_acc)
                        data['Count'].append(count)
            # 创建 DataFrame
            df = pd.DataFrame(data)
            # 将数据按 Metric 和 Category 排序  
            df = df[["Metric", "Category", "Subcategory",  "Count", "Strict Accuracy", "Loose Accuracy"]]  
            # 保存到score_file
            df.to_csv(score_file, index=False)
            return df

        doc = load(os.path.join('./file', eval_file))  # 读取文件
        preds = [x['predict'] for x in doc]
        answers = [x['answer'] for x in doc]
        metric_all={}
        for key in ['strict_c', 'loose_c', 'count']:
            metric_all[key] = get_blank_metric()
        # zip pred answer
        for i, pred, gt in zip(range(len(preds)), preds, answers):  
            pred = regularize_answer(pred)
            gt = regularize_answer(gt)
            

            if gt == "wrongprerequisite":
                # wrong prerequisite特殊处理
                metric_all['count']['special_case']['wrongprerequisite'] += 1
                metric_all['strict_c']['special_case']['wrongprerequisite'] += pred=='wrongprerequisite'
                metric_all['loose_c']['special_case']['wrongprerequisite'] += pred in ('wrongprerequisite', 'notgiven', 'no', 'notsure')
                doc[i]['strict'] = pred=='wrongprerequisite'
                doc[i]['loose'] = pred in ('wrongprerequisite', 'notgiven', 'no', 'notsure')
                continue
            category = doc[i]['category']
            subcategory = doc[i]['subcategory']
            answer_type = doc[i]['type']
            ID = doc[i]['ID']
            image = doc[i]['image_path']
            # 先处理special case

            strict_result = is_close_with_ratio(gt, pred)
            loose_result = is_close_with_ratio(gt, pred, loose_flag=True)
            doc[i]['strict'] = strict_result
            doc[i]['loose'] = loose_result

            if gt == 'notgiven':
                metric_all['count']['special_case']['not given'] += 1
                metric_all['strict_c']['special_case']['not given'] += strict_result
                metric_all['loose_c']['special_case']['not given'] += loose_result
            else:
                metric_all['count']['special_case']['normal'] += 1
                metric_all['strict_c']['special_case']['normal'] += strict_result
                metric_all['loose_c']['special_case']['normal'] += loose_result
            # 处理unary binary
            if len(category) == 2:
                # binary
                elem1 = category[0]
                elem2 = category[1]
                metric_all['count']['binary'][elem1][elem2] += 1
                metric_all['strict_c']['binary'][elem1][elem2] += strict_result
                metric_all['loose_c']['binary'][elem1][elem2] += loose_result
            else:
                # unary
                metric_all['count']['unary'][category[0]][subcategory] += 1
                metric_all['strict_c']['unary'][category[0]][subcategory] += strict_result
                metric_all['loose_c']['unary'][category[0]][subcategory] += loose_result
            # 处理answer type
            metric_all['count']['answer_type'][answer_type] += 1
            metric_all['strict_c']['answer_type'][answer_type] += strict_result
            metric_all['loose_c']['answer_type'][answer_type] += loose_result
            # 处理id
            metric_all['count']['id'][ID] += 1
            metric_all['strict_c']['id'][ID] += strict_result
            metric_all['loose_c']['id'][ID] += loose_result
            # 处理image
            if 'mathverse' in image:
                metric_all['count']['image']['mathverse'] += 1
                metric_all['strict_c']['image']['mathverse'] += strict_result
                metric_all['loose_c']['image']['mathverse'] += loose_result
            else:
                metric_all['count']['image']['synthetic'] += 1
                metric_all['strict_c']['image']['synthetic'] += strict_result
                metric_all['loose_c']['image']['synthetic'] += loose_result
        # 计算指标
        df = process_metric_to_dataframe(metric_all)
        # process_metric_to_excel(metric_all, './score/total_score.xlsx', eval_file.replace('.jsonl', ''))
        print(f"处理完成: {eval_file}，错误数{COUNT}")

        # 存储处理后的数据
        output_file = os.path.join('./file_score', eval_file.replace('.jsonl', '_processed.jsonl'))

        with open(output_file, 'w', encoding='utf-8') as f_out:
            for entry in doc:
                f_out.write(json.dumps(entry, ensure_ascii=False) + "\n")
        df = None
        return df

In [3]:
import os
eval_file_dir = './file/'
# 遍历文件夹
eval_file_list = os.listdir(eval_file_dir)
# 过滤出以jsonl结尾的文件
eval_file_list = [f for f in eval_file_list if f.endswith('.jsonl')]
# 遍历文件列表
for eval_file in eval_file_list:
    eval_file = "qwen2.5vl_answers.jsonl"
    df = evaluate(eval_file)
    print(df)
    break

处理完成: qwen2.5vl_answers.jsonl，错误数0
None
