In [2]:
# 使用 python-dotenv 加载 .env 文件
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langchain.messages import HumanMessage, SystemMessage

import os

# 加载当前目录下的 .env 文件
load_dotenv(override=True)

# 获取环境变量
api_key = os.getenv("API_KEY")
base_url = os.getenv("BASE_URL")
model_name = os.getenv("MODEL_NAME")


# 初始化模型
model = ChatOpenAI(
    base_url=base_url,
    model=model_name,
    api_key=api_key,
    temperature=1,
)

In [3]:
from typing import List, TypedDict, Annotated, Tuple, Literal, Optional, Any, Dict
from langchain.tools import tool
from langchain.messages import HumanMessage, SystemMessage
from prompt import CODE_GENERATOR_SYSTEM_PROMPT, CODE_INTERPRETER_AGENT_PROMPT
from utils import extract_python_code
from langchain.agents import create_agent
import json


# 构建sub agent 作为bi数据分析 然后会集成到主agent的tool中
class BIAgentState(TypedDict):
    goal: str  # 目标
    data_context: Dict[str, Any]  # 数据上下文（文件路径、schema等）
    generated_code: str  # 当前生成的代码
    execution_result: str  # 执行结果
    error_message: str  # 错误信息
    analysis: str  # 分析结论
    iteration_count: int  # 重试次数
    max_iterations: int  # 最大重试次数


@tool
def generate_code(
    task_description: str, data_context: dict, previous_error: str = None
) -> str:
    """
    生成Python数据分析代码。

    这个工具会调用LLM生成高质量的、可执行的Python代码。

    Args:
        task_description: 要完成的任务描述，例如："分析data.csv的销售趋势"
        data_context: 数据上下文信息（JSON字符串），包含文件路径、已知schema等。
                     例如：'{"file_path": "./data.csv", "schema": {"columns": ["date", "sales"]}}'
        previous_error: 可选，上一次代码执行的错误信息。如果提供，将生成修复后的代码。

    Returns:
        生成的Python代码字符串（不含markdown代码块标记）

    Examples:
        >>> # 首次生成代码
        >>> code = generate_python_code(
        ...     task_description="统计data.csv的基本信息",
        ...     data_context='{"file_path": "./data.csv"}'
        ... )

        >>> # 基于错误修复代码
        >>> code = generate_python_code(
        ...     task_description="统计data.csv的基本信息",
        ...     data_context='{"file_path": "./data.csv"}',
        ...     previous_error="FileNotFoundError: [Errno 2] No such file or directory: 'data.csv'"
        ... )
    """

    print(task_description, data_context, previous_error)

    err_prompt = ""
    data_context_prompt = ""

    if data_context:
        try:
            context_dict = (
                json.loads(data_context)
                if isinstance(data_context, str)
                else data_context
            )
            context_str = json.dumps(context_dict, ensure_ascii=False, indent=2)
            data_context_prompt = f"**数据上下文:**\n```json\n{context_str}\n```\n"
        except:
            data_context_prompt = f"**数据上下文:**\n{data_context}\n"

    if previous_error:
        err_prompt = f"""
**⚠️ 之前的代码执行失败了！错误信息如下:**
```
{previous_error}
```

**请仔细分析错误原因，生成修复后的代码。**
常见修复方法:
- FileNotFoundError: 检查文件路径，使用绝对路径或确认相对路径正确
- KeyError: 先打印df.columns查看实际列名，不要假设列名
- ValueError: 检查数据类型，必要时进行类型转换
- ImportError: 确保导入了所有必要的库
"""

    # 构造messages
    messages = [
        SystemMessage(content=CODE_GENERATOR_SYSTEM_PROMPT),
        HumanMessage(
            content=f"""
        **任务描述:**
        {task_description}
        {data_context_prompt}
        {err_prompt}
        **输出要求:**
        1. 只输出Python代码，不要有任何解释文字
        2. 代码必须放在 ```python 代码块内
        3. 代码要完整、可直接执行
        4. 使用print()输出关键信息
    """
        ),
    ]

    # 调用llm
    model = ChatOpenAI(
        base_url=base_url,
        model=model_name,
        api_key=api_key,
        temperature=0.2,  # 代码生成使用较低温度，保证稳定性
    )

    result = model.invoke(messages)

    raw_content = result.content
    # 提取python代码（去除markdown标志）
    code = extract_python_code(raw_content)
    if not code:
        # 如果没有提取到代码块，可能LLM直接返回了代码
        code = raw_content.strip()

    return code


@tool
def execute_code(code: str):
    """运行python脚本

    Args:
        code (str): 要运行的python脚本

    Returns:
        _type_: _description_
    """
    import subprocess

    result = subprocess.run(
        ["python", "-c", code], capture_output=True, text=True, timeout=10
    )
    return result.stdout if result.returncode == 0 else result.stderr


bi_agent = create_agent(
    model=model, tools=[generate_code], system_prompt=CODE_INTERPRETER_AGENT_PROMPT
)


response = bi_agent.stream(
    {
        "messages": [
            {
                "role": "user",
                "content": f"""请完成以下数据分析任务：

## 用户查询
分析一下 ./data.csv中的数据

## 数据上下文
```json
无
```

请按照标准流程开始执行任务。
""",
            }
        ]
    },
    stream_mode=["updates", "messages"],
)

for chunk in response:
    print(chunk)

('messages', (AIMessageChunk(content='我', additional_kwargs={}, response_metadata={'model_provider': 'openai'}, id='lc_run--956f5c78-aec8-4012-a1f5-6bf8cfde348a'), {'langgraph_step': 1, 'langgraph_node': 'model', 'langgraph_triggers': ('branch:to:model',), 'langgraph_path': ('__pregel_pull', 'model'), 'langgraph_checkpoint_ns': 'model:06045b12-f09b-e0ba-fdbf-a62ed2f8fd78', 'checkpoint_ns': 'model:06045b12-f09b-e0ba-fdbf-a62ed2f8fd78', 'ls_provider': 'openai', 'ls_model_name': 'qwen3-max', 'ls_model_type': 'chat', 'ls_temperature': 1.0}))
('messages', (AIMessageChunk(content='需要', additional_kwargs={}, response_metadata={'model_provider': 'openai'}, id='lc_run--956f5c78-aec8-4012-a1f5-6bf8cfde348a'), {'langgraph_step': 1, 'langgraph_node': 'model', 'langgraph_triggers': ('branch:to:model',), 'langgraph_path': ('__pregel_pull', 'model'), 'langgraph_checkpoint_ns': 'model:06045b12-f09b-e0ba-fdbf-a62ed2f8fd78', 'checkpoint_ns': 'model:06045b12-f09b-e0ba-fdbf-a62ed2f8fd78', 'ls_provider': '

In [None]:
# 基于plan and execute 作为主agent
from pydantic import BaseModel, Field
from typing import List, TypedDict, Annotated, Tuple, Literal, Optional
import operator
from prompt import PLANNER_SYSTEM_PROMPT, EXECUTOR_SYSTEM_PROMPT, REPLAN_SYSTEM_PROMPT
from langchain.agents import create_agent


class AgentState(TypedDict):
    user_query: str  # 用户问题
    steps: List[str]  # 将要执行的计划
    past_steps: Annotated[
        List[Tuple], operator.add
    ]  # 已经执行了的步骤 [step,result]元组
    response: str


class Plan(BaseModel):
    """执行计划"""

    steps: List[str] = Field(description="要遵循的不同步骤，应按顺序排列")


def plan_node(state: AgentState):

    # 构造messages
    messages = [
        SystemMessage(content=PLANNER_SYSTEM_PROMPT),
        HumanMessage(
            f"""
**用户问题：** "{state['steps']}"

**文件信息：**
- **File Path**: `./data.csv`
- **Known Schema**: 无 
(如果 Known Schema 为空，请先制定计划去获取它。)
"""
        ),
    ]

    planer = model.with_structured_output(Plan)

    result = planer.invoke(messages)

    return {steps: result.steps}


def execute_node(state: AgentState):
    # 拿到当前要执行的步骤
    current_plan = state["steps"][0]
    # 拿到之前完成了的步骤和结果
    history_steps = "暂无历史记录(这是第一步)"

    if state["past_steps"]:
        history_steps = "\n".join(
            f"步骤:{step}\n结果:{result}\n" for step, result in state["past_steps"]
        )
    # 构造messages
    messages = [
        SystemMessage(content=EXECUTOR_SYSTEM_PROMPT),
        HumanMessage(
            content=f"""请利用工具执行以下具体任务：

### 1. 项目背景 (Context)
**总体目标**: {state['steps']}

### 2. 参考资料 (Execution History)
*以下是前序步骤的执行结果，请从中提取你需要的参数（如ID、链接、关键数据），不要重复做这些工作：*
----------------------------------------
{history_steps}
----------------------------------------

### 3. 当前任务 (Current Mission)
**请立即执行此步骤**: {current_plan}
"""
        ),
    ]

    executor = create_agent(
        tools=[],
        model=model,
    )

    # 调用执行当前节点
    response = executor.invoke(input={"messages": messages})

    return {"past_steps": [(current_plan, response["messages"][-1].content)]}


class Replan(BaseModel):
    """重规划者的决策结果"""

    status: Literal["done", "continue"] = Field(
        description="如果是 'done'，表示任务已完成。如果是 'continue'，表示需要执行新计划。"
    )
    new_plan: Optional[List[str]] = Field(
        default=None,
        description="如果 status 是 'continue'，这里必须包含剩余的、更新后的步骤列表。",
    )
    final_response: Optional[str] = Field(
        default=None,
        description="如果 status 是 'done'，这里必须包含回答用户问题的最终完整回复。",
    )


def replan_node(state: AgentState):

    # 拿到所有还没执行的步骤
    current_plan_list = "无"

    if state["steps"]:
        current_plan_list = "".join(f"{step}," for step in state["steps"])

    # 拿到之前执行完了的步骤和结果
    history_steps = "暂无历史记录"

    if state["past_steps"]:
        history_steps = "\n".join(
            f"步骤:{step}\n结果:{result}\n" for step, result in state["past_steps"]
        )

    # 构造messages
    messages = [
        SystemMessage(content=REPLAN_SYSTEM_PROMPT),
        HumanMessage(
            content=f"""
        请基于以下最新的项目状态进行决策：

# 1. 原始目标 (Goal)
{state['user_query']}

# 2. 当前剩余计划 (Current Plan)
{current_plan_list}

# 3. 执行历史档案 (Execution History)
----------------------------------------
{history_steps}
----------------------------------------
"""
        ),
    ]

    replaner = model.with_structured_output(Replan)

    result = replaner.invoke(messages)

    print(result, "replaner")

    if result.status == "done":
        return {"steps": [], "response": result.final_response}
    else:
        return {"steps": result.new_plan}


# langgraph图表创建
from langgraph.graph import StateGraph, START, END

workflow = StateGraph(AgentState)

workflow.add_node("planner", plan_node)

workflow.add_node("executor", execute_node)

workflow.add_node("replaner", replan_node)

workflow.add_edge(START, "planner")

workflow.add_edge("planner", "executor")

workflow.add_edge("executor", "replaner")


# 判断是否执行步骤还是结束
def should_end(state: AgentState):
    if "response" in state and state["response"]:
        return END
    else:
        return "executor"


workflow.add_conditional_edges("replaner", should_end)

app = workflow.compile()