In [22]:
import operator
from typing import Annotated, List, Tuple, TypedDict
import os
from dotenv import load_dotenv
from typing import Literal
from langchain_core.messages import HumanMessage, AIMessage
from langchain_core.tools import tool
from langchain_community.chat_models import ChatZhipuAI
# pip install langgraph
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import END, START, StateGraph, MessagesState
from langgraph.prebuilt import ToolNode
import requests
import json
from tools import search
import json

In [14]:
# 加载 .env 文件中的环境变量
load_dotenv(dotenv_path='D:/02File/05练习项目/04llm_learning/KEYs.env', override=True)
# 获取 API 密钥
llm_api_key = os.environ["ZHIPUAI_API_KEY"] 
tavily_api_key = os.environ["TAVILY_API_KEY"]

llm = ChatZhipuAI(
    model="GLM-4-Air",
    api_key=llm_api_key,
    temperature=0
)


# 定义一个TypedDict类PlanExecute，用于存储输入、计划、过去的步骤和响应
class PlanExecute(TypedDict):
    input: str
    plan: List[str]
    past_steps: Annotated[List[Tuple], operator.add]
    response: str

from pydantic import BaseModel, Field

In [124]:
# 定义工具函数，用于代理调用外部工具
from langchain_community.tools.tavily_search import TavilySearchResults
tavily_api_key = os.environ["TAVILY_API_KEY"]
# 创建TavilySearchResults工具，设置最大结果数为1
tools = [TavilySearchResults(max_results=1, tavily_api_key=tavily_api_key)]

In [125]:
from langgraph.prebuilt import create_react_agent

# 从LangChain的Hub中获取prompt模板，可以进行修改
# prompt = hub.pull("wfh/react-agent-executor")
# prompt.pretty_print()

# 创建一个REACT代理执行器，使用指定的LLM和工具，并应用从Hub中获取的prompt
agent_executor = create_react_agent(llm, tools)

In [129]:
agent_executor.invoke({"messages": [("user", "查找2024年巴黎奥运会100米自由泳决赛冠军")]})

{'messages': [HumanMessage(content='查找2024年巴黎奥运会100米自由泳决赛冠军', additional_kwargs={}, response_metadata={}, id='d574a847-ff50-409b-b017-3d7e288fe24b'),
  AIMessage(content='', additional_kwargs={'tool_calls': [{'function': {'arguments': '{"query": "2024年巴黎奥运会100米自由泳决赛冠军"}', 'name': 'tavily_search_results_json'}, 'id': 'call_-8751680482070084797', 'index': 0, 'type': 'function'}]}, response_metadata={'token_usage': {'completion_tokens': 24, 'prompt_tokens': 200, 'total_tokens': 224}, 'model_name': 'GLM-4-Air', 'finish_reason': 'tool_calls'}, id='run-384049e5-7fbb-4429-9a43-ccbe81c1c404-0', tool_calls=[{'name': 'tavily_search_results_json', 'args': {'query': '2024年巴黎奥运会100米自由泳决赛冠军'}, 'id': 'call_-8751680482070084797', 'type': 'tool_call'}]),
  ToolMessage(content='[{"title": "男子100米决赛| 游泳| 2024年巴黎奥运会 - Olympics.com", "url": "https://www.olympics.com/zh/video/men-s-100m-freestyle-final-swimming-olympic-games-paris-2024", "content": "巴黎奥运会游泳男子100米自由泳决赛于2024年7月31日在巴黎拉德芳斯体育馆举行。潘展乐（中国）以46.40秒的新

In [112]:
import re
def extract_all_matches_or_return_full_text(text):
    pattern = r"'''json\s*([\s\S]*?)\s*'''"
    matches = re.findall(pattern, text)
    
    if matches:
        # 如果有匹配项，返回所有匹配结果组成的字符串
        return "\n".join(match.strip() for match in matches)
    else:
        # 如果没有匹配项，返回整个文本
        return text


In [17]:
# 定义一个Plan模型类，用于描述未来要执行的计划
class Plan(BaseModel):
    """未来要执行的计划"""

    steps: List[str] = Field(
        description="需要执行的不同步骤，应该按顺序排列"
    )

In [118]:
from langchain_core.prompts import ChatPromptTemplate

# 创建一个计划生成的提示模板
planner_prompt = ChatPromptTemplate.from_template("""
    你是一位高效的助手，擅长将复杂的任务分解为一系列简单、具体的步骤，以确保任务可以被准确无误地完成。

    要求：
    - 你的回答应该直接针对上述目标，提出一个简明的逐步计划。
    - 每个步骤都应该是独立且必需的，如果按顺序正确执行这些步骤，将会得到正确的最终答案。
    - 避免添加任何不必要的步骤或额外解释。
    - 确保每一步骤都包含了所有必要的信息，不跳过任何关键细节。
    - 最后一个步骤应当能够直接获得最终的答案。
    - 输出结果的前后不要出现"json"或单引号等无关字符串。

    根据以上要求，以下是基于示例目标的生成逐步计划：
    示例：
    “计算从2023年1月1日至2025年5月4日之间的总天数。”

    结果：
    '''
    {{
    "plan":[
    "确认起始日期为2023年1月1日，结束日期为2025年5月4日",
    "计算2023年的剩余天数（从1月1日起）",
    "加上2024年的全年天数（注意检查是否为闰年）",
    "加上2025年直到5月4日为止的天数",
    "将上述步骤中得到的所有天数加起来，得出总天数作为最终答案"
    ]
    }}
    '''

    现在，请根据上述格式和要求，为以下目标制定一个逐步计划：
    {messages}
    """)

In [113]:
def parse_plan(res: AIMessage) -> List[str]:
    """
    解析计划字符串，将其转换为列表。
    """
    plan =  extract_all_matches_or_return_full_text(res.content)
    # 使用json.loads将字符串转换为Python列表
    try:
        return json.loads(plan)
    except json.JSONDecodeError:
        # 如果解析失败，返回一个空列表
        return []

In [119]:
# 使用指定的提示模板创建一个计划生成器，使用OpenAI的ChatGPT-4o模型
# planner = planner_prompt | llm.with_structured_output(Plan, include_raw=True)
planner = planner_prompt | llm

# 调用计划生成器，询问“当前澳大利亚公开赛冠军的家乡是哪里？”
res = planner.invoke({"messages":"2024年巴黎奥运会100米自由泳决赛冠军的家乡是哪里?请用中文答复"})

# 转换为 Python 列表
res_list = json.loads(res.content)

print(res_list)

{'plan': ['确定2024年巴黎奥运会100米自由泳决赛的冠军', '查找该冠军的个人信息', '在个人信息公开信息中找到其家乡']}


In [120]:
res

AIMessage(content='{\n"plan":[\n"确定2024年巴黎奥运会100米自由泳决赛的冠军",\n"查找该冠军的个人信息",\n"在个人信息公开信息中找到其家乡"\n]\n}', additional_kwargs={}, response_metadata={'token_usage': {'completion_tokens': 41, 'prompt_tokens': 338, 'total_tokens': 379}, 'model_name': 'GLM-4-Air', 'finish_reason': 'stop'}, id='run-03bcdaae-14b2-4993-a9bb-5e9457dce91c-0')

In [121]:
planner = planner_prompt | llm | parse_plan

In [122]:
planner.invoke({"messages":"2024年巴黎奥运会100米自由泳决赛冠军的家乡是哪里?请用中文答复"})

{'plan': ['确定2024年巴黎奥运会100米自由泳决赛的冠军', '查找该冠军的个人信息', '在个人信息公开信息中找到其家乡']}

In [20]:
res

AIMessage(content='[\n    "查找现任澳网冠军的信息",\n    "确认澳网冠军的姓名",\n    "搜索该冠军的个人信息或传记资料",\n    "在资料中找到其家乡信息",\n    "记录下澳网冠军的家乡作为最终答案"\n]', additional_kwargs={}, response_metadata={'token_usage': {'completion_tokens': 57, 'prompt_tokens': 330, 'total_tokens': 387}, 'model_name': 'GLM-4-Air', 'finish_reason': 'stop'}, id='run-0bdd912a-689b-4cfe-8efb-5ede75781e26-0')

In [62]:
from typing import Union


# 定义一个响应模型类，用于描述用户的响应
class Response(BaseModel):
    """用户响应"""

    response: str


# 定义一个行为模型类，用于描述要执行的行为
class Act(BaseModel):
    """要执行的行为"""

    action: Union[Response, Plan] = Field(
        description="要执行的行为。如果要回应用户，使用Response。如果需要进一步使用工具获取答案，使用Plan。"
    )


# 创建一个重新计划的提示模板
replanner_prompt = ChatPromptTemplate.from_template(
    """
    你是一个擅长任务拆解和计划更新的助手。你的任务是根据给定的目标、原始计划和已完成的步骤，动态更新当前的执行计划。
    
    规则：
    - 每个步骤必须独立且完整，确保可以被正确执行。
    - 不要添加任何多余或重复的步骤。
    - 只保留尚未完成的步骤，已完成的不要出现在新计划中。
    - 最后一步应直接产出最终答案。
    - 如果所有步骤已完成，请使用 **Response** 返回结果。
    - 如果还需要继续执行，请使用 **Plan** 列出剩余步骤。
    - 确保每一步都包含所有必要的信息，不跳过关键细节。
    - 输出结果必须为JSON格式。
    - 结果中不要出现"json"等无关字符串。
    
    你的目标是：
    {input}
    
    原始计划为：
    {plan}
    
    已完成的步骤为：
    {past_steps}
    
    请相应地更新计划，并按照以下格式输出：

    {{
      "plan":[
        "第一步说明",
        "第二步说明" 
      ]
    }}
    
    或
    
    {{
      "Response":"最终答案"
    }}
    """
    )

# 使用指定的提示模板创建一个重新计划生成器，使用OpenAI的ChatGPT-4o模型
replanner = replanner_prompt | llm | parse_plan
# replanner = replanner_prompt | llm 

In [63]:
# p = PlanExecute(
#     input = '2024年巴黎奥运会100米自由泳决赛冠军的家乡是哪里',
#     plan = ['查找2024年巴黎奥运会100米自由泳决赛冠军的名字', '查找该冠军的家乡'],
#     past_steps = [('查找2024年巴黎奥运会100米自由泳决赛冠军的名字', '2024年巴黎奥运会男子100米自由泳决赛的冠军是中国选手潘展乐（Zhanle Pan）。')]
# )
p = PlanExecute(
    input = '2024年巴黎奥运会100米自由泳决赛冠军的家乡是哪里',
    plan = ['查找潘展乐的家乡'],
    past_steps = [('查找2024年巴黎奥运会100米自由泳决赛冠军的名字', '2024年巴黎奥运会男子100米自由泳决赛的冠军是中国选手潘展乐（Zhanle Pan）。'), ('查找潘展乐的家乡', '潘展乐的家乡是浙江温州。')]
)
replanner.invoke(p)

{'Response': '潘展乐的家乡是浙江温州。'}

In [130]:
from typing import Literal


# 定义一个异步主函数

# 定义一个异步函数，用于执行步骤
def execute_step(state: PlanExecute):
    plan = state["plan"]
    plan_str = "\n".join(f"{i + 1}. {step}" for i, step in enumerate(plan))
    task = plan[0]
    task_formatted = f"""
    对于以下计划：
    {plan_str}
    你的任务是执行第{1}步，
    {task}。
    """
    agent_response = agent_executor.invoke({"messages": task_formatted})
    print(f"智能体查询结果（看有没有function call）agent_res: {agent_response}")
    return {
        "past_steps": state["past_steps"] + [(task, agent_response["messages"][-1].content)],
    }

# 定义一个异步函数，用于生成计划步骤
def plan_step(state: PlanExecute):
    plan = planner.invoke({"messages": state["input"]})
    print(plan)
    return plan

# 定义一个异步函数，用于重新计划步骤
def replan_step(state: PlanExecute):
    output = replanner.invoke(state)
    return output

# 定义一个函数，用于判断是否结束
# 改一下这里**********
def should_end(state: PlanExecute) -> Literal["agent", "__end__"]:
    if "response" in state and state["response"]:
        return "__end__"
    else:
        return "agent"

In [85]:
from langgraph.graph import StateGraph, START

# 创建一个状态图，初始化PlanExecute
workflow = StateGraph(PlanExecute)

# 添加计划节点
workflow.add_node("planner", plan_step)

# 添加执行步骤节点
workflow.add_node("agent", execute_step)

# 添加重新计划节点
workflow.add_node("replan", replan_step)

# 设置从开始到计划节点的边
workflow.add_edge(START, "planner")

# 设置从计划到代理节点的边
workflow.add_edge("planner", "agent")

# 设置从代理到重新计划节点的边
workflow.add_edge("agent", "replan")

# 添加条件边，用于判断下一步操作
workflow.add_conditional_edges(
    "replan",
    # 传入判断函数，确定下一个节点
    should_end,
)

# 编译状态图，生成LangChain可运行对象
app = workflow.compile()

In [131]:
# 设置配置，递归限制为50
config = {"recursion_limit": 50}
# 输入数据
inputs = {"input": "2024年巴黎奥运会100米自由泳决赛冠军的家乡是哪里?请用中文答复"}
# 异步执行状态图，输出结果
for event in app.stream(inputs, config=config):
    # print(event)
    for k, v in event.items():
        if k != "__end__":
            print(v)

{'plan': ['确定2024年巴黎奥运会100米自由泳决赛的冠军', '查找该冠军的个人信息', '在个人信息公开信息中找到其家乡']}
{'plan': ['确定2024年巴黎奥运会100米自由泳决赛的冠军', '查找该冠军的个人信息', '在个人信息公开信息中找到其家乡']}
{'past_steps': [('确定2024年巴黎奥运会100米自由泳决赛的冠军', '很抱歉，但作为一个人工智能，我无法预测未来的事件，包括体育比赛的结果。因此，我无法确定2024年巴黎奥运会100米自由泳决赛的冠军是谁。这个结果需要在比赛结束后才能得知。如果您需要关于奥运会或其他体育赛事的信息，我可以提供历史数据或者相关的背景信息。')]}
{'plan': ['查找2024年巴黎奥运会100米自由泳决赛冠军的个人信息', '在个人信息公开信息中找到其家乡']}
{'past_steps': [('确定2024年巴黎奥运会100米自由泳决赛的冠军', '很抱歉，但作为一个人工智能，我无法预测未来的事件，包括体育比赛的结果。因此，我无法确定2024年巴黎奥运会100米自由泳决赛的冠军是谁。这个结果需要在比赛结束后才能得知。如果您需要关于奥运会或其他体育赛事的信息，我可以提供历史数据或者相关的背景信息。'), ('查找2024年巴黎奥运会100米自由泳决赛冠军的个人信息', '2024年巴黎奥运会100米自由泳决赛的冠军是中国的潘展乐，他以46.40秒的新世界纪录成绩夺得了金牌。目前，我找到了关于冠军的信息，但是没有直接提供个人信息的详细内容。要获取潘展乐的个人信息，可能需要进一步查找相关资料或访问特定网站。接下来，我将尝试找到他的个人信息公开信息，以确定他的家乡。')]}
{'plan': ['在个人信息公开信息中找到潘展乐的家乡']}
{'past_steps': [('确定2024年巴黎奥运会100米自由泳决赛的冠军', '很抱歉，但作为一个人工智能，我无法预测未来的事件，包括体育比赛的结果。因此，我无法确定2024年巴黎奥运会100米自由泳决赛的冠军是谁。这个结果需要在比赛结束后才能得知。如果您需要关于奥运会或其他体育赛事的信息，我可以提供历史数据或者相关的背景信息。'), ('确定2024年巴黎奥运会100米自由泳决赛的冠军', '很抱歉，但作为一个人工智能，我无法预测

InvalidUpdateError: Expected dict, got []
For troubleshooting, visit: https://python.langchain.com/docs/troubleshooting/errors/INVALID_GRAPH_NODE_RETURN_VALUE

In [94]:
p = PlanExecute(
    input = '2024年巴黎奥运会100米自由泳决赛冠军的家乡是哪里?请用中文答复'
)
plan_step(p)

[]


[]

In [95]:
planner.invoke({"messages": '2024年巴黎奥运会100米自由泳决赛冠军的家乡是哪里?请用中文答复'})

[]