In [None]:
import os
import re
import json
from loguru import logger
from tqdm import tqdm
from vllm import LLM, SamplingParams

MODEL_NAME = 'Qwen/Qwen2-7B-Instruct'
llm = LLM(model=MODEL_NAME, dtype='float16')

sampling_params = SamplingParams(
    max_tokens=1024
)


In [None]:
def call_LLM(query):
    response = llm.generate(query, sampling_params)
    return response[0].outputs[0].text

In [None]:
def get_prompt(problem, question, options):
    options = '\n'.join(f"{'ABCDEFG'[i]}. {o}" for i, o in enumerate(options))

    prompt = f"""你是一位逻辑推理专家。请分析并解答以下单选题,遵循闭世界假设(未明确陈述的事实均为假)。
                ### 题目:
                {problem}
                
                ### 问题:
                {question}
                {options}
                
                请按以下步骤进行:
                1. 列出关键信息和事实
                2. 分析每个选项的逻辑
                3. 解释你的推理过程
                4. 得出结论
                
                简短且用清晰的结构呈现你的分析。如果是数学题，直接给出答案。字数限定在150字以内且必须包括答案。
                
                最后,以此格式给出答案:
                答案是：#选项字母#
            """
    return prompt


In [None]:
# 这里使用extract抽取模获得抽取的结果

def extract(input_text):
    ans_pattern = re.compile(r"答案是：(.)", re.S)

    problems = ans_pattern.findall(input_text)
    if (problems == ''):
        return 'A'
    return problems[0]

In [None]:
def process_datas(datas):
    results = []

    future_data = {}
    lens = 0
    for data in tqdm(datas, desc="Submitting tasks", total=len(datas)):

        problem = data['problem']
        for id, question in enumerate(data['questions']):
            prompt = get_prompt(problem,
                                question['question'],
                                question['options'],
                                )
            future = call_LLM(prompt)
            future_data[future] = (data, id)
            lens += 1
    for future in tqdm(future_data, total=lens, desc="Processing tasks"):
        data = future_data[future][0]
        problem_id = future_data[future][1]
        try:
            res = future
            extract_response = extract(res)
            print(extract_response)
            data['questions'][problem_id]['answer'] = extract_response
            results.append(data)
        except Exception as e:
            logger.error(f"Failed to process text: {data}. Error: {e}")

    return results

In [None]:
def main(ifn, ofn):
    if os.path.exists(ofn):
        pass
    data = []
    # 按行读取数据
    with open(ifn) as reader:
        for line in reader:
            sample = json.loads(line)
            data.append(sample)
    datas = data
    return_list = process_datas(datas)
    print(len(return_list))
    print("All tasks finished!")
    return return_list

In [None]:
def evaluate(ofn):
    data = []
    with open(ofn) as reader:
        for line in reader:
            sample = json.loads(line)
            data.append(sample)

    pse = 0
    cnt = 0
    tot = 0
    for task in data:
        for question in task['questions']:

            if MODEL_NAME in question:
                tot += 1
                cnt += question[MODEL_NAME] == question['answer']
            else:
                pse += 1

    print(cnt, tot, cnt / tot, pse)

In [None]:
if __name__ == '__main__':
    return_list = main('round1_test_data.jsonl', 'upload.jsonl')


In [None]:
def has_complete_answer(questions):
    # 这里假设完整答案的判断逻辑是：每个question都有一个'answer'键
    for question in questions:
        if 'answer' not in question:
            return False
    return True


def filter_problems(data):
    result = []
    problem_set = set()

    for item in data:
        # print('处理的item' ,item)
        problem = item['problem']
        if problem in problem_set:
            # 找到已存在的字典
            for existing_item in result:
                if existing_item['problem'] == problem:
                    # 如果当前字典有完整答案，替换已存在的字典
                    if has_complete_answer(item['questions']):
                        existing_item['questions'] = item['questions']
                        existing_item['id'] = item['id']
                    break
        else:
            # 如果当前字典有完整答案，添加到结果列表
            if has_complete_answer(item['questions']):
                result.append(item)
                problem_set.add(problem)

    return result

In [None]:
return_list
return_list = filter_problems(return_list)
sorted_data = sorted(return_list, key=lambda x: int(str(x['id'])[-3:]))
print(sorted_data)

In [None]:
sorted_data

In [None]:
def find_missing_ids(dict_list):
    # 提取所有序号
    extracted_ids = {int(d['id'][-3:]) for d in dict_list}

    # 创建0-500的序号集合
    all_ids = set(range(500))

    # 找出缺失的序号
    missing_ids = all_ids - extracted_ids

    return sorted(missing_ids)


# 示例字典列表
dict_list = sorted_data

# 找出缺失的序号
missing_ids = find_missing_ids(dict_list)
print("缺失的序号:", missing_ids)

In [None]:
len(missing_ids)

In [None]:
data = []
with open('round1_test_data.jsonl') as reader:
    for id, line in enumerate(reader):
        if (id in missing_ids):
            sample = json.loads(line)
            for question in sample['questions']:
                question['answer'] = 'A'
            sorted_data.append(sample)
sorted_data = sorted(sorted_data, key=lambda x: int(str(x['id'])[-3:]))


In [None]:
with open('upload-pipeline.jsonl', 'w') as writer:
    for sample in sorted_data:
        writer.write(json.dumps(sample, ensure_ascii=False))
        writer.write('\n')