# In [10]:  (Jupyter notebook cell marker, kept as a comment so the file parses as Python)
# -*- coding: utf-8 -*-
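"""
Parse scheduling-problem models from txt files (for the expected format see
the example screenshot) and use DeepSeek to fill in supplementary information.

Changes:
- The DeepSeek API replaces the earlier Ollama calls;
- Every .txt file listed in FILES is processed and the resulting .json files
  are written to OUT_DIR;
- The top-level JSON gains an article_title field.
"""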

import os
import re
import json
from pathlib import Path
from typing import Any, Dict, List

# ==================== Path configuration (edit as needed) ====================
# Paths of the txt files to process. main() expands "~" and resolves each
# entry, so a relative entry such as "2.txt" is looked up relative to the
# current working directory.
FILES = [
    "2.txt",
]
# Directory that receives the generated .json files.
# NOTE(review): the right-hand operand is an absolute path, so on POSIX
# pathlib discards the Path.home() prefix and OUT_DIR is simply
# /Users/yoosi/Desktop/Capstone/json_file. Confirm this is intended, or use a
# path relative to the home directory instead.
OUT_DIR = Path.home() / "/Users/yoosi/Desktop/Capstone/json_file"  # output directory for the .json files
# The directory is created as a side effect of importing/running this module.
OUT_DIR.mkdir(parents=True, exist_ok=True)
# =====================================================================

# ===================== DeepSeek (OpenAI-compatible API) =====================
try:
    from openai import OpenAI
except ImportError as e:
    raise ImportError("需要安装 openai>=1.0.0 ：pip install --upgrade openai") from e

# The API key is read from the environment only. A real key must never be
# embedded in source code: anyone who can read the file (or its history) can
# use it. Any key that was previously committed here should be treated as
# compromised and revoked in the DeepSeek console.
DEEPSEEK_API_KEY = os.getenv("DEEPSEEK_API_KEY", "")
if not DEEPSEEK_API_KEY:
    # Fail fast with an actionable message instead of letting every later
    # request fail with an authentication error that callers swallow.
    raise RuntimeError("未设置环境变量 DEEPSEEK_API_KEY，请先设置后再运行")
DS_MODEL = "deepseek-chat"  # general-purpose chat model

# Shared client; DeepSeek exposes an OpenAI-compatible endpoint, so the
# official OpenAI SDK is pointed at the DeepSeek base URL.
ds_client = OpenAI(
    api_key=DEEPSEEK_API_KEY,
    base_url="https://api.deepseek.com",
)


def ds_chat(prompt: str, max_retries: int = 2) -> str:
    """Send ``prompt`` as a single user message to DeepSeek and return the reply text.

    Args:
        prompt: Text sent as the only ``user`` message.
        max_retries: Number of additional attempts after the first one fails.
            Negative values are treated as 0, so at least one request is made.

    Returns:
        The content of the first choice, stripped of surrounding whitespace
        (an empty string when the reply has no content).

    Raises:
        RuntimeError: If every attempt fails; the last underlying exception is
            attached as ``__cause__``.
    """
    import time  # local import: only needed for the back-off between retries

    attempts = max(max_retries, 0) + 1
    last_err = None
    for attempt in range(attempts):
        try:
            resp = ds_client.chat.completions.create(
                model=DS_MODEL,
                messages=[{"role": "user", "content": prompt}],
                temperature=0
            )
            return (resp.choices[0].message.content or "").strip()
        except Exception as e:  # boundary around the SDK call; re-raised below
            last_err = e
            if attempt + 1 < attempts:
                # Short exponential back-off (1s, 2s, 4s, capped at 8s) so that
                # transient failures are not retried immediately.
                time.sleep(min(2 ** attempt, 8))
    raise RuntimeError(f"DeepSeek 调用失败: {last_err}") from last_err
# =====================================================================


def _json_section_start(text: str, label_pattern: str, opener: str) -> int:
    """Return the index of ``opener`` directly after the first ``label_pattern`` match, or -1."""
    hit = re.search(label_pattern + r'\s*(?=' + re.escape(opener) + r')', text, flags=re.I)
    return hit.end() if hit else -1


def _parse_list_section(text: str, label_pattern: str, ok_template: str, err_label: str) -> List[Any]:
    """Decode the JSON array that follows ``label_pattern``; return [] when absent or invalid."""
    start = _json_section_start(text, label_pattern, "[")
    if start < 0:
        return []
    try:
        # raw_decode reads exactly one JSON value starting at ``start``, so
        # nested arrays and brackets inside strings end at the right place.
        items, _ = json.JSONDecoder().raw_decode(text, start)
    except json.JSONDecodeError as e:
        print(err_label, e)
        return []
    print(ok_template.format(len(items)))
    return items


def _parse_objective_section(text: str) -> Dict[str, Any]:
    """Decode the objective-function object, falling back to regex field extraction."""
    start = _json_section_start(text, r'4\.\s*objective\s+function:', "{")
    if start >= 0:
        try:
            obj, _ = json.JSONDecoder().raw_decode(text, start)
            return obj
        except json.JSONDecodeError:
            pass  # not strict JSON; fall through to the lenient extraction below

    # Lenient path for objects that are not valid JSON: grab the brace-delimited
    # text, then pull the two known fields out individually.
    obj_match = re.search(r'4\.\s*objective\s+function:\s*(\{.*?\})(?=\s*;|\s*$|\s*5\.)', text, re.DOTALL | re.I)
    if not obj_match:
        obj_match = re.search(r'4\.\s*objective\s+function:\s*(\{.*\})', text, re.DOTALL | re.I)
    if not obj_match:
        return {}
    obj_str = obj_match.group(1)
    try:
        return json.loads(obj_str)
    except json.JSONDecodeError:
        f = re.search(r'"function"\s*:\s*"([^"]*)"', obj_str)
        d = re.search(r'"description"\s*:\s*"([^"]*)"', obj_str)
        return {
            "function": f.group(1) if f else "",
            "description": d.group(1) if d else ""
        }


def parse_txt_file(file_path: Path) -> List[Dict[str, Any]]:
    """Parse every scheduling model contained in a txt file.

    A model starts at a ``0. article title: "..."`` line and runs until the
    next such line or the end of the file. For each model the numbered
    sections 1-5 (problem description, parameters, decision variables,
    objective function, constraints) are extracted.

    Args:
        file_path: Path of the txt file; read as UTF-8, undecodable bytes are
            ignored.

    Returns:
        One dict per model with the keys ``article_title``,
        ``problem_description``, ``parameters``, ``decision_variables``,
        ``objective_function`` and ``constraints``. Sections that are missing
        or cannot be decoded are returned as empty values. The list is empty
        when the file contains no model header.
    """
    content = file_path.read_text(encoding="utf-8", errors="ignore")
    print(f"\n=== 解析文件: {file_path.name} ===")
    print("原始开头片段:", content[:200].replace("\n", " ") + " ...")

    # Split into models: each one starts with a '0. article title: "..."' header.
    model_pattern = r'0\.\s*article\s+title:\s*"[^"]*".*?(?=\s*0\.\s*article\s+title:|\Z)'
    model_matches = re.findall(model_pattern, content, re.DOTALL | re.I)

    models = []

    for i, model_content in enumerate(model_matches):
        print(f"\n--- 解析第 {i+1} 个模型 ---")
        result: Dict[str, Any] = {}

        # 0) Article title
        title_match = re.search(r'0\.\s*article\s+title:\s*"([^"]+)"', model_content, flags=re.I)
        if title_match:
            result['article_title'] = title_match.group(1).strip()
            print("解析到 article_title:", result['article_title'])
        else:
            result['article_title'] = ""
            print("未解析到 article_title")

        # 1) Problem description (plain quoted text, not JSON)
        desc_match = re.search(r'1\.\s*problem\s+description:\s*"([^"]*)"', model_content, flags=re.I)
        if desc_match:
            result['problem_description'] = desc_match.group(1)
            print("解析到问题描述(截断):", result['problem_description'][:100], "...")
        else:
            result['problem_description'] = ""

        # 2) Parameters
        result['parameters'] = _parse_list_section(
            model_content, r'2\.\s*parameters:', "解析到 {} 个参数", "参数解析错误:")

        # 3) Decision variables
        result['decision_variables'] = _parse_list_section(
            model_content, r'3\.\s*decision\s+variables:', "解析到 {} 个决策变量", "决策变量解析错误:")

        # 4) Objective function
        result['objective_function'] = _parse_objective_section(model_content)

        # 5) Constraints
        result['constraints'] = _parse_list_section(
            model_content, r'5\.\s*constraints:', "解析到 {} 条约束", "约束解析错误:")

        models.append(result)

    return models

def _load_json_array(text: str):
    """Return the JSON array found in ``text`` (bare or wrapped in extra text), else None."""
    candidates = [text]
    # Models often wrap the array in a Markdown code fence or a sentence; try
    # the span from the first '[' to the last ']' as a second candidate.
    bracketed = re.search(r'\[.*\]', text, re.DOTALL)
    if bracketed:
        candidates.append(bracketed.group(0))
    for candidate in candidates:
        try:
            value = json.loads(candidate)
        except json.JSONDecodeError:
            continue
        if isinstance(value, list):
            return value
    return None


def extract_related_variables_with_llm(constraint_function: str,
                                       constraint_description: str,
                                       decision_variables: List[Dict[str, str]]) -> List[str]:
    """Ask DeepSeek which decision-variable symbols occur in a constraint.

    Args:
        constraint_function: The constraint expression(s) as text.
        constraint_description: Natural-language description of the constraint.
        decision_variables: Candidate variables; the ``symbol`` of each dict
            entry is used, non-dict entries and empty symbols are ignored.

    Returns:
        The symbols reported by the model that are also in the candidate list,
        without duplicates. An empty list when there are no candidates or the
        API call fails (the failure is printed, not raised).
    """
    var_symbols: List[str] = []
    for v in decision_variables:
        if not isinstance(v, dict):
            continue
        sym = v.get('symbol', '')
        if isinstance(sym, str) and sym and sym not in var_symbols:
            var_symbols.append(sym)
    if not var_symbols:
        # Nothing could ever be returned; skip the API round trip.
        return []

    prompt = (
        "给定约束：\n"
        f"表达式：{constraint_function}\n"
        f"描述：{constraint_description}\n\n"
        f"候选决策变量符号列表：{var_symbols}\n\n"
        "请返回该约束中实际出现的符号，**只返回 JSON 数组**，例如：[\"C_{ijq}\", \"u_{ijq}\"]。"
    )
    try:
        text = ds_chat(prompt)
    except Exception as e:
        print("提取相关变量失败：", e)
        return []

    # Preferred path: the reply is (or contains) a JSON array of symbols.
    arr = _load_json_array(text)
    if arr is not None:
        picked: List[str] = []
        for s in arr:
            if isinstance(s, str) and s in var_symbols and s not in picked:
                picked.append(s)
        return picked

    # Last resort when no JSON array can be decoded: substring containment.
    return [s for s in var_symbols if s in text]


def convert_to_standard_json_format(parsed: Dict[str, Any]) -> Dict[str, Any]:
    """Reshape one parsed model into the standard output JSON structure.

    DeepSeek is asked for a short problem title and a problem type, and, for
    every constraint, for the decision variables it involves. Related
    parameters are found by plain substring matching.

    Args:
        parsed: One model as produced by ``parse_txt_file``. Missing or
            ``None`` sections are treated as empty; list entries that are not
            dicts are skipped.

    Returns:
        A dict with the top-level keys ``article_title``, ``title``, ``type``,
        ``description``, ``Nomenclature`` and ``Formulation``. When title
        generation fails or returns nothing, the defaults
        "Scheduling Problem" / "Scheduling" are used.
    """
    problem_description = parsed.get('problem_description') or ''
    parameters = parsed.get('parameters') or []
    decision_variables = parsed.get('decision_variables') or []
    objective_function = parsed.get('objective_function') or {}
    constraints = parsed.get('constraints') or []
    article_title = (parsed.get('article_title') or '').strip()

    # Ask the model for a title and a problem type based on the description.
    title_prompt = (
        "Based on this problem description, generate a concise title for this scheduling problem and the type of scheduling problem (Job shop/flow shop/Hybrid flow shop/Flexible job shop/Distributed hybrid flow shop/Distributed permutation flow shop/Distributed job shop/Distributed flow shop/Distributed assembly job shop/Distributed flexible job shop, and so on) this problem belongs to.\n"
        f"Description: {problem_description[:300]}...\n"
        "Output format: Title|Type"
    )
    gen_title = "Scheduling Problem"
    problem_type = "Scheduling"
    try:
        title_type = ds_chat(title_prompt).split("|")
        # An empty reply (or empty field) must not overwrite the defaults.
        gen_title = title_type[0].strip() or gen_title
        if len(title_type) > 1:
            problem_type = title_type[1].strip() or problem_type
    except Exception as e:
        # Best effort: title generation is optional, so keep the defaults but
        # make the failure visible instead of hiding it.
        print("生成标题失败，使用默认值：", e)

    # Output skeleton (article_title lives at the top level).
    result: Dict[str, Any] = {
        "article_title": article_title or gen_title,  # prefer the title parsed from the file
        "title": gen_title,                           # generated problem title
        "type": problem_type,
        "description": problem_description,
        "Nomenclature": {
            "Parameters": [],
            "Decision Variables": [],
            "Domain terms": {
                "Makespan": "In production scheduling problems, the makespan is the maximum completion time of jobs"
            }
        },
        "Formulation": {
            "Objective Function": {
                "function": "",
                "gurobi_code": "",
                "description": ""
            },
            "Constraints": []
        }
    }

    # Parameters
    for p in parameters:
        if not isinstance(p, dict):
            continue
        result["Nomenclature"]["Parameters"].append({
            "symbol": p.get("symbol", ""),
            "definition": p.get("definition", "")
        })

    # Decision variables, with a keyword-based guess of the variable type.
    for v in decision_variables:
        if not isinstance(v, dict):
            continue
        defin = v.get("definition") or ""
        lowered = str(defin).lower()
        vtype = "Binary" if any(k in lowered for k in ["binary", "equals 1", "0 otherwise", "1 if"]) else "Continuous"
        result["Nomenclature"]["Decision Variables"].append({
            "symbol": v.get("symbol", ""),
            "definition": defin,
            "type": vtype
        })

    # Objective function: fill the skeleton in place so that its
    # "gurobi_code" placeholder is kept in the output.
    objective_out = result["Formulation"]["Objective Function"]
    if isinstance(objective_function, dict) and (objective_function.get("function") or objective_function.get("description")):
        objective_out["function"] = objective_function.get("function", "")
        objective_out["description"] = objective_function.get("description", "")
    else:
        # Default used when the source file provides no objective.
        objective_out["function"] = "minimize C_max and sum of T_i"
        objective_out["description"] = "Minimize makespan and total tardiness simultaneously"

    # Pre-compute each parameter symbol together with its "bare" form (braces,
    # underscores and whitespace removed); this does not depend on the constraint.
    param_forms = []
    for p in result["Nomenclature"]["Parameters"]:
        sym = p["symbol"]
        if isinstance(sym, str) and sym:
            param_forms.append((sym, re.sub(r'[{}_\s]', '', sym)))

    # Constraints: the model picks the related decision variables.
    for c in constraints:
        if not isinstance(c, dict):
            continue
        func = c.get("function", "")
        if not isinstance(func, list):
            func = [func]
        desc = c.get("description", "")
        text_for_vars = " ".join(str(part) for part in func)
        related_vars = extract_related_variables_with_llm(text_for_vars, desc, result["Nomenclature"]["Decision Variables"])

        # Related parameters: plain containment, on the raw text or on the
        # bare form of both sides.
        # NOTE(review): containment matching can over-report very short symbols
        # (e.g. a one-letter symbol matches any text containing that letter).
        simple_text = re.sub(r'[{}_\s]', '', text_for_vars)
        related_params: List[str] = [
            sym for sym, bare in param_forms
            if sym in text_for_vars or (bare and bare in simple_text)
        ]

        result["Formulation"]["Constraints"].append({
            "function": func,
            "description": desc,
            "related Parameters": related_params,
            "related Decision Variables": related_vars
        })

    return result


def process_one_txt(txt_path: Path, out_json_path: Path) -> None:
    """Parse one txt file and write one JSON file per model found in it.

    With exactly one model the output is written to ``out_json_path``. With
    several models each one goes to ``<stem>_model<k><suffix>`` next to
    ``out_json_path`` (k starting at 1). When no model is found nothing is
    written and a warning is printed.

    Args:
        txt_path: Source txt file.
        out_json_path: Target path (single model) or naming template
            (several models).
    """
    models = parse_txt_file(txt_path)
    if not models:
        # Previously this case produced neither a file nor any message.
        print(f"⚠️ 未解析到任何模型，未生成输出：{txt_path}")
        return

    if len(models) == 1:
        # A single model keeps the original file name.
        targets = [out_json_path]
    else:
        # Several models: one file each, with the model index in the name.
        targets = [
            out_json_path.parent / f"{out_json_path.stem}_model{k}{out_json_path.suffix}"
            for k in range(1, len(models) + 1)
        ]

    for model, target in zip(models, targets):
        result = convert_to_standard_json_format(model)
        target.parent.mkdir(parents=True, exist_ok=True)
        target.write_text(json.dumps(result, indent=2, ensure_ascii=False), encoding="utf-8")
        print(f"✅ 已保存：{target}")

def main():
    """Convert every file listed in FILES and write the results into OUT_DIR.

    The output directory is created if needed. Each entry of FILES is expanded
    and resolved; entries that do not exist are reported and skipped. The
    output file name is the input file name with a ``.json`` suffix.
    """
    target_dir = OUT_DIR.expanduser().resolve()
    target_dir.mkdir(parents=True, exist_ok=True)

    for entry in FILES:
        source = Path(entry).expanduser().resolve()
        if source.exists():
            json_name = source.with_suffix(".json").name
            process_one_txt(source, target_dir / json_name)
        else:
            print(f"❌ Not found: {source}")



if __name__ == "__main__":
    main()



# --- Captured notebook cell output (kept as comments so the file parses as Python) ---
# === 解析文件: 2.txt ===
# 原始开头片段: 0. article title: "Mathematical models for job-shop scheduling problems with routing and process plan flexibility"  1. problem description: "FJSP consists of a set of n independent jobs J = {j_i}_{i=1 ...
#
# --- 解析第 1 个模型 ---
# 解析到 article_title: Mathematical models for job-shop scheduling problems with routing and process plan flexibility
# 解析到问题描述(截断): FJSP consists of a set of n independent jobs J = {j_i}_{i=1}^n, each having its own processing order ...
# 解析到 11 个参数
# 解析到 6 个决策变量
# 解析到 8 条约束
#
# --- 解析第 2 个模型 ---
# 解析到 article_title: Mathematical models for job-shop scheduling problems with routing and process plan flexibility
# 解析到问题描述(截断): The FJSP-PPF considers multiple process plans for jobs by excluding the final assumption of FJSP. Th ...
# 解析到 11 个参数
# 解析到 6 个决策变量
# 解析到 8 条约束
# ✅ 已保存：/Users/yoosi/Desktop/Capstone/json_file/2_model1.json
# ✅ 已保存：/Users/yoosi/Desktop/Capstone/json_file/2_model2.json
