In [22]:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
功能：解析包含多个结构块的文本文件，每块包含 SET、CONTENT、QUESTIONS、GRAPH 区域。
将其转换为 JSON 格式结构，保存到 result.json。

文本结构示例：
SET48
CONTENT
...
QUESTIONS
Q
...
A
...
GRAPH
{ ... }
"""

import json

def parse_multiple_blocks(lines):
    """
    Parse a flat list of text lines into a list of block dictionaries.

    A block starts at any line that begins with ``SET`` and may contain three
    sections, each introduced by a marker line:

    * ``CONTENT``    - free text; its lines are joined with newlines.
    * ``QUESTION*``  - any line starting with ``QUESTION``; the section holds
      ``Q`` / ``A`` marker lines, each followed by a (possibly multi-line) body.
    * ``GRAPH``      - JSON text; it is wrapped in ``{}`` when it does not
      already start with ``{`` and is then decoded with ``json.loads``.

    Every line is stripped before it is inspected. Lines that appear before
    the first ``SET`` line are ignored.

    A Q or A body ends at the next ``Q`` / ``A`` / ``GRAPH`` / ``CONTENT``
    marker and also at the next ``SET...`` or ``QUESTION...`` line, i.e. at
    every line the main loop itself treats as structural. Without the last two
    a block that has no GRAPH section would absorb the following block's
    header into its final answer.

    Args:
        lines: Sequence of strings, one per input line.

    Returns:
        list[dict]: One dict per block with the keys ``"SET"`` (the header
        line), ``"CONTENT"`` (str), ``"QUESTIONS"`` (list of
        ``{"Q": str, "A": str}``) and ``"GRAPH"`` (decoded JSON, ``{}`` when
        the block has no GRAPH text).

    Raises:
        ValueError: If the GRAPH text of a block is not valid JSON.
    """
    body_markers = {"Q", "A", "GRAPH", "CONTENT"}

    blocks = []
    current_block = None
    section = None            # None, "CONTENT", "QUESTIONS" or "GRAPH"
    content_buffer = []
    graph_buffer = []
    pending_question = None   # Q body waiting for its A

    def ends_body(text):
        """Return True when `text` is a structural line that ends a Q/A body."""
        return (
            text in body_markers
            or text.startswith("SET")
            or text.startswith("QUESTION")
        )

    def collect_body(start):
        """Join the stripped lines from `start` up to the next structural line.

        Returns the joined text and the index of the line that stopped the scan
        (``len(lines)`` when the input ran out).
        """
        collected = []
        pos = start
        while pos < len(lines):
            candidate = lines[pos].strip()
            if ends_body(candidate):
                break
            collected.append(candidate)
            pos += 1
        return "\n".join(collected), pos

    def flush_current_block():
        """Finalize the block being built, append it to `blocks`, reset state."""
        nonlocal current_block, section, pending_question

        if current_block is None:
            return

        current_block["CONTENT"] = "\n".join(content_buffer)

        if graph_buffer:
            graph_str = "\n".join(graph_buffer)
            # The GRAPH text may be written without its outer braces.
            if not graph_str.strip().startswith("{"):
                graph_str = "{" + graph_str + "}"
            try:
                current_block["GRAPH"] = json.loads(graph_str)
            except json.JSONDecodeError as e:
                raise ValueError(f"GRAPH 区块 JSON 解析失败：{e}\n内容如下：\n{graph_str}") from e
        else:
            current_block["GRAPH"] = {}

        blocks.append(current_block)

        current_block = None
        content_buffer.clear()
        graph_buffer.clear()
        section = None
        # A question without an answer must not leak into the next block.
        pending_question = None

    i = 0
    while i < len(lines):
        line = lines[i].strip()

        # A SET line closes the previous block (if any) and opens a new one.
        if line.startswith("SET"):
            flush_current_block()
            current_block = {
                "SET": line,
                "CONTENT": "",
                "QUESTIONS": [],
                "GRAPH": {}
            }
            i += 1
            continue

        # Nothing is recorded until the first SET line has been seen.
        if current_block is None:
            i += 1
            continue

        # Section switches.
        if line == "CONTENT":
            section = "CONTENT"
            i += 1
            continue
        if line.startswith("QUESTION"):
            section = "QUESTIONS"
            i += 1
            continue
        if line == "GRAPH":
            section = "GRAPH"
            i += 1
            continue

        if section == "CONTENT":
            content_buffer.append(line)
        elif section == "GRAPH":
            graph_buffer.append(line)
        elif section == "QUESTIONS":
            if line == "Q":
                # collect_body leaves `i` on the line that ended the body so
                # the main loop processes that structural line itself.
                pending_question, i = collect_body(i + 1)
                continue
            if line == "A":
                answer, i = collect_body(i + 1)
                # Only complete, non-empty pairs are stored.
                if pending_question and answer:
                    current_block["QUESTIONS"].append({
                        "Q": pending_question,
                        "A": answer
                    })
                    pending_question = None
                continue
            # Any other line inside QUESTIONS is skipped.

        i += 1

    # Emit the block that was still open when the input ended.
    flush_current_block()

    return blocks


def _write_json(path, payload):
    """Write `payload` to `path` as UTF-8 JSON, keeping non-ASCII text, indent 4."""
    with open(path, "w", encoding="utf-8") as f:
        json.dump(payload, f, ensure_ascii=False, indent=4)


def main(
    input_path="./raw_text.txt",
    output_path="result.json",
    qa_output_path="qa.json",
    qa_part_paths=("qa1.json", "qa2.json"),
    qa_split_index=350,
):
    """
    Parse the raw text file and write the structured results as JSON files.

    Steps:
      1. Read `input_path`, strip every line and drop the empty ones.
      2. Parse the lines with `parse_multiple_blocks` and dump all blocks to
         `output_path`.
      3. Flatten every Q/A pair of every block into one list and dump it to
         `qa_output_path`.
      4. Dump the flattened list again as two parts, split at
         `qa_split_index`, to the two paths in `qa_part_paths`.

    All defaults reproduce the values that used to be hard-coded, so calling
    `main()` without arguments behaves as before.

    Args:
        input_path: Text file to parse.
        output_path: Destination for the list of parsed blocks.
        qa_output_path: Destination for the flattened Q/A list.
        qa_part_paths: Two destinations for the first and second part of the
            flattened Q/A list.
        qa_split_index: Number of Q/A pairs written to the first part.
    """
    # Read and pre-process: strip whitespace, drop blank lines.
    with open(input_path, "r", encoding="utf-8") as f:
        raw_lines = [line.strip() for line in f if line.strip()]

    parsed_blocks = parse_multiple_blocks(raw_lines)

    _write_json(output_path, parsed_blocks)
    print(f"✅ 解析完成，已保存到：{output_path}")

    # Flatten the per-block question lists into one list of Q/A pairs.
    all_qa_pairs = [
        {"Q": qa["Q"], "A": qa["A"]}
        for block in parsed_blocks
        for qa in block.get("QUESTIONS", [])
        if "Q" in qa and "A" in qa
    ]
    _write_json(qa_output_path, all_qa_pairs)
    print(f"✅ 所有 Q/A 对已保存到：{qa_output_path}",len(all_qa_pairs))

    first_part_path, second_part_path = qa_part_paths
    _write_json(first_part_path, all_qa_pairs[:qa_split_index])
    _write_json(second_part_path, all_qa_pairs[qa_split_index:])


if __name__ == "__main__":
    main()


✅ 解析完成，已保存到：result.json
✅ 所有 Q/A 对已保存到：qa.json 692


In [16]:
import re
import json

# --------------------------------------------
# Script: parse_disease_history.py
# Purpose: 解析 disease_history.txt 文件中每个病例元素，提取题干、题目、答案和评分要点，输出为 JSON 格式。
# --------------------------------------------


def parse_element(text):
    """
    Parse the text of a single case element into a dictionary.

    The text is expected to contain, in this order, the section keywords
    ``BACKGROUND``, ``QUESTION``, ``ANSWER`` and ``STANDARD``. Inside the last
    three sections the individual items are introduced by ``QUE``, ``ANS`` and
    ``STA`` at the start of a line.

    Args:
        text: Raw text of one case, normally starting with its ``SET<n>`` id.

    Returns:
        dict with the keys
          - ``"id"``: first ``SET<digits>`` found at a line start, else None
          - ``"病史"``: the BACKGROUND text ("" when the section is missing)
          - ``"问题"``: list of question strings
          - ``"答案"``: list of answer strings
          - ``"评分细则"``: list of scoring-rubric strings
        The three lists always have the same length; shorter ones are padded
        with empty strings.
    """
    id_match = re.search(r"^(SET\d+)", text, re.MULTILINE)
    case_id = id_match.group(1) if id_match else None

    # Case history: everything between BACKGROUND and QUESTION.
    stem_match = re.search(r"BACKGROUND\s*(.*?)\s*QUESTION", text, re.S)
    stem = stem_match.group(1).strip() if stem_match else ""

    # Questions: the QUESTION..ANSWER span, split on line-initial "QUE".
    questions = []
    ques_block_match = re.search(r"QUESTION\s*(.*?)\s*ANSWER", text, re.S)
    if ques_block_match:
        parts = re.split(r"^QUE", ques_block_match.group(1), flags=re.M)
        questions = [part.strip() for part in parts if part.strip()]

    # Answers: the ANSWER..STANDARD span, split on line-initial "ANS".
    answers = []
    ans_block_match = re.search(r"ANSWER\s*(.*?)\s*STANDARD", text, re.S)
    if ans_block_match:
        parts = re.split(r"^ANS", ans_block_match.group(1), flags=re.M)
        answers = [part.strip() for part in parts if part.strip()]

    # Scoring rubrics: everything after STANDARD.
    rubrics = []
    rubric_match = re.search(r"STANDARD\s*(.*)", text, re.S)
    if rubric_match:
        raw = rubric_match.group(1)

        # Split with a lookahead so each piece keeps its leading "STA"; the
        # marker is removed from every piece afterwards.
        parts = re.split(r"(?=^\s*STA\s*)", raw, flags=re.M)

        for part in parts:
            clean = re.sub(r"^\s*STA\s*", "", part.strip())
            if clean:
                rubrics.append(clean)

    # Keep the three lists index-aligned. Questions are padded as well, so an
    # element with more answers or rubrics than questions still yields lists
    # of equal length.
    target_len = max(len(questions), len(answers), len(rubrics))
    for items in (questions, answers, rubrics):
        items.extend([""] * (target_len - len(items)))

    return {
        "id": case_id,
        "病史": stem,
        "问题": questions,
        "答案": answers,
        "评分细则": rubrics
    }


def parse_file(file_path):
    """
    Read a whole file and parse every case element it contains.

    The file content is cut in front of each line that starts with
    ``SET<digits>``; every non-empty piece is handed to `parse_element`.

    Args:
        file_path: Path of the UTF-8 text file to read.

    Returns:
        list: The `parse_element` result for each non-empty piece, in file
        order.
    """
    with open(file_path, 'r', encoding='utf-8') as handle:
        whole_text = handle.read()

    # Zero-width split: each piece keeps its own SET header.
    pieces = re.split(r"(?=^SET\d+)", whole_text, flags=re.M)
    trimmed = (piece.strip() for piece in pieces)
    return [parse_element(piece) for piece in trimmed if piece]


if __name__ == '__main__':
    # Example run: parse disease_history.txt and save the result as JSON.
    # Both file names are relative, so they resolve against the current
    # working directory.
    cases = parse_file('disease_history.txt')
    # ensure_ascii=False writes the Chinese text as-is instead of \u escapes.
    with open('parsed_cases.json', 'w', encoding='utf-8') as out:
        json.dump(cases, out, ensure_ascii=False, indent=2)
    print(f"已解析 {len(cases)} 个病例，结果保存在 parsed_cases.json。")

# --------------------------------------------
# 说明：
# 1. 使用正则表达式对文本进行分段和提取。
# 2. re.S (DOTALL) 使 "." 匹配换行，re.M (MULTILINE) 使 ^ 和 $ 匹配每行。
# 3. 对题目部分使用 re.split 分割 "QUE" 标记，并清洗空白。
# 4. 输出 JSON 时保持中文字符。
# --------------------------------------------


已解析 81 个病例，结果保存在 parsed_cases.json。


In [5]:
import json
import math
import os
import threading
from http import HTTPStatus
from queue import Empty
from queue import Queue
from typing import List, Dict

from tenacity import retry, stop_after_attempt, wait_exponential
from dashscope import Generation

# Qwen configuration
# SECURITY: never commit an API key to the notebook. The key is read from the
# DASHSCOPE_API_KEY environment variable and is None when that variable is not
# set. The key that used to be hard-coded here has been exposed and should be
# revoked.
QWEN_API_KEY = os.environ.get("DASHSCOPE_API_KEY")
QWEN_CHAT_MODEL = "qwen-plus"
NUM_THREADS = 8

# =====================
# 调用 Qwen 评估模型
# =====================
@retry(stop=stop_after_attempt(3), wait=wait_exponential(min=2, max=10))
def llm_model_func(prompt, system_prompt=None, history_messages=None, **kwargs) -> str:
    """
    Send one chat request to the Qwen model and return the reply text.

    The call is wrapped by tenacity's `retry`: any exception raised here
    triggers another attempt, up to three attempts in total with exponential
    back-off.

    Args:
        prompt: User message for this turn.
        system_prompt: Optional system message placed first.
        history_messages: Optional list of earlier ``{"role", "content"}``
            messages inserted between the system and the user message.
            Defaults to no history.
        **kwargs: Accepted for call-site compatibility; not used.

    Returns:
        The stripped content of the first choice in the response.

    Raises:
        RuntimeError: If the service answers with a non-OK status code. Once
            the retries are exhausted, tenacity raises its own error to the
            caller.
    """
    import dashscope

    # Only override the SDK-level key when one is configured here.
    # NOTE(review): with no key set, the SDK presumably falls back to its own
    # credential lookup - confirm against the DashScope documentation.
    if QWEN_API_KEY:
        dashscope.api_key = QWEN_API_KEY

    messages = []
    if system_prompt:
        messages.append({"role": "system", "content": system_prompt})
    messages.extend(history_messages or [])
    messages.append({"role": "user", "content": prompt})

    response = Generation.call(
        model=QWEN_CHAT_MODEL,
        messages=messages,
        result_format='message'
    )
    # Fail with the service's own error details instead of an opaque
    # TypeError/KeyError from indexing a response that carries no output.
    if response.status_code != HTTPStatus.OK:
        raise RuntimeError(
            f"Qwen request failed: status={response.status_code}, "
            f"code={response.code}, message={response.message}"
        )
    return response["output"]["choices"][0]["message"]["content"].strip()

def build_evaluation_prompt(q: str, a: str) -> str:
    """
    Build the screening prompt for one question/answer pair.

    The prompt is written in Chinese. It asks the model four yes/no questions
    about the pair and requires a JSON object with four fixed keys in reply.

    Args:
        q: Question text, inserted verbatim.
        a: Answer text, inserted verbatim.

    Returns:
        The prompt text; it begins and ends with a newline.
    """
    prompt_lines = [
        "",
        "你是一名口腔医学考试专家，请对以下问答进行筛选：",
        "",
        f"问题：{q}",
        f"回答：{a}",
        "",
        "请判断：",
        "1. 是否是口腔医学领域特有的问题？（是/否）",
        "2. 是否涉及具体的操作技能？（是/否）",
        "3. 答案是否专业、准确、完整，体现较高的知识深度？（是/否）",
        "4. 综合判断该问答是否适合作为口腔领域技能评测题目？（是/否）",
        "",
        "请严格按照如下 JSON 返回：",
        "{",
        '  "领域特异性": "是/否",',
        '  "技能导向": "是/否",',
        '  "答案质量": "是/否",',
        '  "是否推荐": "是/否"',
        "}",
        "",
    ]
    return "\n".join(prompt_lines)

import re

def gpt_evaluate(q: str, a: str) -> bool:
    """
    Ask the model whether a question/answer pair is recommended.

    The first ``{...}`` span of the model reply is decoded as JSON, and the
    pair counts as recommended only when its ``"是否推荐"`` value equals
    ``"是"``.

    Args:
        q: Question text.
        a: Answer text.

    Returns:
        True when the model recommends the pair. False otherwise, including
        every failure of the model call or of the reply parsing; failures are
        printed, not raised.
    """
    prompt = build_evaluation_prompt(q, a)
    try:
        reply = llm_model_func(prompt)
        # Non-greedy match: take the first brace-delimited span of the reply.
        found = re.search(r'\{.*?\}', reply, re.DOTALL)
        if found is None:
            raise ValueError("No JSON object found in response.")
        verdict = json.loads(found.group(0))
        return verdict.get("是否推荐") == "是"
    except Exception as e:
        print(f"[❌ 模型评估失败] {e}")
        return False


# =====================
# Worker 多线程处理 Q&A
# =====================
def qa_worker(queue: Queue, result_dict: Dict[str, List[Dict]]):
    """
    Worker loop: drain `queue` and collect the recommended Q/A pairs.

    Each queue item is a ``(block_id, qa)`` tuple where `qa` is a dict with
    the keys ``"Q"`` and ``"A"``. Items with a missing or empty question or
    answer are skipped. The remaining items are passed to `gpt_evaluate`, and
    the recommended ones are appended to ``result_dict[block_id]``.

    Items are taken with `get_nowait`, so a worker that finds the queue empty
    returns at once. Checking `empty()` and then calling a blocking `get()`
    lets another worker take the last item in between, which leaves this
    worker blocked forever.

    Args:
        queue: Queue of ``(block_id, qa)`` tuples, filled before the workers
            start.
        result_dict: Shared mapping from block id to the list of recommended
            Q/A dicts; it is updated in place.
    """
    while True:
        try:
            block_id, qa = queue.get_nowait()
        except Empty:
            return
        try:
            q, a = qa.get("Q"), qa.get("A")
            if q and a and gpt_evaluate(q, a):
                result_dict.setdefault(block_id, []).append(qa)
        finally:
            # Always acknowledge the item so that queue.join() cannot hang.
            queue.task_done()

# =====================
# 主函数：读取、按比例筛选、保存
# =====================
def threaded_blockwise_filter(input_path: str, output_path: str, total_target: int = 200):
    """
    Screen all Q/A pairs with the model and keep a per-block share of them.

    Steps:
      1. Load the list of blocks from the JSON file `input_path`.
      2. Queue every Q/A pair and let `NUM_THREADS` worker threads run
         `qa_worker` until the queue is drained.
      3. For each block keep at most
         ``max(1, round(block_count * total_target / total_questions))``
         of its recommended pairs.
      4. Write the kept pairs as one flat JSON list to `output_path`.

    The recommended pairs of a block are stored in the order the worker
    threads finish them, so which pairs survive the cut can differ between
    runs.

    Args:
        input_path: JSON file holding a list of blocks; each block is read
            through its ``"SET"`` and ``"QUESTIONS"`` keys.
        output_path: Destination JSON file for the kept Q/A pairs.
        total_target: Overall number of pairs the per-block shares are scaled
            to. It is not a hard limit on the output size.
    """
    with open(input_path, 'r', encoding='utf-8') as f:
        all_blocks = json.load(f)

    # Use .get() here as in the loop below, so a block without a "QUESTIONS"
    # key counts as empty instead of raising KeyError.
    total_questions = sum(len(b.get("QUESTIONS", [])) for b in all_blocks)
    print(f"📘 总共 {len(all_blocks)} 个 block，{total_questions} 个问题，目标保留 {total_target} 条")

    # Task queue, one entry per Q/A pair.
    qa_queue = Queue()
    block_map = {}  # block_id -> original number of Q/A pairs

    for block in all_blocks:
        block_id = block.get("SET", "UNKNOWN")
        questions = block.get("QUESTIONS", [])
        # Accumulate rather than overwrite: blocks that share an id (for
        # example several "UNKNOWN" ones) also share one result list.
        block_map[block_id] = block_map.get(block_id, 0) + len(questions)
        for qa in questions:
            qa_queue.put((block_id, qa))

    # Recommended Q/A pairs per block, filled by the worker threads.
    result_dict = {}
    threads = []

    for _ in range(NUM_THREADS):
        t = threading.Thread(target=qa_worker, args=(qa_queue, result_dict))
        t.start()
        threads.append(t)

    qa_queue.join()
    for t in threads:
        t.join()

    # Take each block's proportional share of the recommended pairs.
    final_output = []
    for block_id, original_count in block_map.items():
        # With no questions at all there is nothing to scale; avoid dividing
        # by zero.
        share = original_count * total_target / total_questions if total_questions else 0
        keep_count = max(1, round(share))
        selected = result_dict.get(block_id, [])[:keep_count]
        final_output.extend(selected)
        print(f"✅ {block_id}: 原始 {original_count} → 保留 {len(selected)}")

    print(f"\n✅ 最终总计保留 {len(final_output)} 条，保存到 {output_path}")
    with open(output_path, 'w', encoding='utf-8') as f:
        json.dump(final_output, f, ensure_ascii=False, indent=2)

# =====================
# 入口函数
# =====================
if __name__ == '__main__':
    # Run the screening on the raw block file and write the kept Q/A pairs.
    # NOTE(review): both paths are absolute and machine-specific; they must be
    # adjusted before this cell can run anywhere else.
    threaded_blockwise_filter(
        input_path="/home/lym/GraphRAG4OralHealth/Benchmark/Skill Objectives/raw_data.json",
        output_path="/home/lym/GraphRAG4OralHealth/Benchmark/Skill Objectives/basic_techniques.json",
        total_target=200
    )


📘 总共 53 个 block，692 个问题，目标保留 200 条
✅ SET1: 原始 2 → 保留 0
✅ SET2: 原始 1 → 保留 1
✅ SET3: 原始 3 → 保留 1
✅ SET4: 原始 10 → 保留 3
✅ SET5: 原始 10 → 保留 3
✅ SET6: 原始 10 → 保留 3
✅ SET7: 原始 9 → 保留 3
✅ SET8: 原始 9 → 保留 3
✅ SET9: 原始 10 → 保留 3
✅ SET10: 原始 8 → 保留 2
✅ SET11: 原始 10 → 保留 3
✅ SET12: 原始 10 → 保留 3
✅ SET13: 原始 9 → 保留 3
✅ SET14: 原始 30 → 保留 9
✅ SET15: 原始 32 → 保留 9
✅ SET16: 原始 18 → 保留 5
✅ SET17: 原始 19 → 保留 5
✅ SET18: 原始 7 → 保留 2
✅ SET19: 原始 19 → 保留 5
✅ SET20: 原始 20 → 保留 6
✅ SET21: 原始 20 → 保留 6
✅ SET22: 原始 10 → 保留 3
✅ SET23: 原始 10 → 保留 3
✅ SET24: 原始 10 → 保留 3
✅ SET25: 原始 7 → 保留 0
✅ SET26: 原始 3 → 保留 0
✅ SET27: 原始 2 → 保留 0
✅ SET28: 原始 21 → 保留 4
✅ SET29: 原始 10 → 保留 3
✅ SET30: 原始 21 → 保留 6
✅ SET31: 原始 20 → 保留 6
✅ SET32: 原始 8 → 保留 2
✅ SET33: 原始 10 → 保留 1
✅ SET34: 原始 10 → 保留 2
✅ SET35: 原始 10 → 保留 3
✅ SET36: 原始 9 → 保留 2
✅ SET37: 原始 10 → 保留 3
✅ SET38: 原始 20 → 保留 1
✅ SET39: 原始 21 → 保留 2
✅ SET40: 原始 15 → 保留 1
✅ SET41: 原始 10 → 保留 3
✅ SET42: 原始 20 → 保留 6
✅ SET43: 原始 8 → 保留 2
✅ SET44: 原始 10 → 保留 3
✅ SET45: 原始 16 → 保留 