# 1、Env

In [None]:
import os

In [None]:
from dotenv import load_dotenv, find_dotenv
load_dotenv(find_dotenv())

In [None]:
model = os.getenv("MODEL")
base_url = os.getenv("BASE_URL")
api_key = os.getenv("API_KEY")
model_type = os.getenv("MODEL_TYPE")

# 2、LLM Model

In [None]:
from langchain_openai import ChatOpenAI
llm = ChatOpenAI(openai_api_base=base_url, model=model, openai_api_key=api_key)

# 3、Prompt

In [None]:
from langchain_core.prompts import ChatPromptTemplate
prompt = "你是一个操作系统专家，可以执行终端命令，现在是一台windows操作系统，你可以执行命令并获取系统中的信息。请根据用户的需求执行命令并返回结果。"
primary_assistant_prompt = ChatPromptTemplate.from_messages(
        [
            (
                "system", prompt,
            ),
            ("placeholder", "{messages}"),
        ]
    )

# 4、RAG

# 5、TOOLS

In [None]:
import subprocess
from langchain_core.tools import tool
from langchain_core.messages import AnyMessage


@tool
def exec_cmd(cmd: AnyMessage):
    """
    执行终端命令

    Args:
        cmd (str): 需要执行的命令

    Returns:
        dict: A dictionary containing:
            - output: The standard output of the command.
            - error: The standard error of the command.
    """
    import subprocess
    import shlex

    process = subprocess.Popen(shlex.split(cmd), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    output, error = process.communicate()
    return {
        "output": output.decode(),
        "error": error.decode()
    }


@tool
def execute_python_code(code: str) -> str:
    """执行python代码

    Args:
        code (str): 需要执行的python代码

    Returns:
        str: 执行完成后，终端打印的内容
    """
    try:
        process = subprocess.run(['python', '-c', code], capture_output=True, text=True, timeout=10)
        if process.returncode == 0:
            return process.stdout.strip()
        else:
            return f"Error: {process.stderr.strip()}"
    except subprocess.TimeoutExpired:
        return "Error: Python code execution timed out."
    except FileNotFoundError:
        return "Error: Python interpreter not found."
    except Exception as e:
        return f"Error: {e}"


tools = [exec_cmd, execute_python_code]
tools

In [None]:
tools[0].invoke("dir D:")

In [None]:
from langchain_core.messages import ToolMessage
from langchain_core.runnables import RunnableLambda
from langgraph.prebuilt import ToolNode

def handle_tool_error(state) -> dict:
    """
    Function to handle errors that occur during tool execution.
    
    Args:
        state (dict): The current state of the AI agent, which includes messages and tool call details.
    
    Returns:
        dict: A dictionary containing error messages for each tool that encountered an issue.
    """
    # Retrieve the error from the current state
    error = state.get("error")
    
    # Access the tool calls from the last message in the state's message history
    tool_calls = state["messages"][-1].tool_calls
    
    # Return a list of ToolMessages with error details, linked to each tool call ID
    return {
        "messages": [
            ToolMessage(
                content=f"Error: {repr(error)}\n please fix your mistakes.",  # Format the error message for the user
                tool_call_id=tc["id"],  # Associate the error message with the corresponding tool call ID
            )
            for tc in tool_calls  # Iterate over each tool call to produce individual error messages
        ]
    }

def create_tool_node_with_fallback(tools: list) -> dict:
    """
    Function to create a tool node with fallback error handling.
    
    Args:
        tools (list): A list of tools to be included in the node.
    
    Returns:
        dict: A tool node that uses fallback behavior in case of errors.
    """
    # Create a ToolNode with the provided tools and attach a fallback mechanism
    # If an error occurs, it will invoke the handle_tool_error function to manage the error
    return ToolNode(tools).with_fallbacks(
        [RunnableLambda(handle_tool_error)],  # Use a lambda function to wrap the error handler
        exception_key="error"  # Specify that this fallback is for handling errors
    )


In [None]:
toolnode = create_tool_node_with_fallback(tools)

In [None]:
toolnode

In [None]:
tools[0].args_schema

In [None]:
model_with_tool = llm.bind_tools(tools)

In [None]:
model_with_tool.kwargs

In [None]:
response = model_with_tool.invoke("我想查看D盘的文件夹，请帮我罗列出来")
response

In [None]:
response.tool_calls

In [None]:
response.tool_calls[0]['args']['cmd']

In [None]:
import json
tools[0].invoke(response.tool_calls[0]['args']['cmd'])

# Memory

In [None]:
from langgraph.checkpoint.memory import MemorySaver
memory = MemorySaver()

# Graph

In [None]:
from typing import Annotated
from typing_extensions import TypedDict

from langchain_core.messages import AnyMessage, SystemMessage, ToolMessage
from langgraph.graph.message import add_messages
from langgraph.graph import StateGraph, START, END


class SubGraphState(TypedDict):
    message: Annotated[list[AnyMessage], add_messages]


class CustomGraph:
    """
    创建一个图形化的AI代理，使用LLM和工具进行交互。
    """
    def __init__(self, llm, prompt="", tools=[], checkpointer=None):
        self.prompt = prompt
        self.llm = llm.bind_tools(tools)
        self.tools = [{t.name: t} for t in tools]
        self.checkpointer = checkpointer
        
        graph = StateGraph(SubGraphState)
        graph.add_node("llm", self.call_llm)
        graph.add_node("action", self.call_action)
        graph.add_conditional_edges("llm", self.exists_action, {True: "action", False: END})
        graph.add_edge("action", "llm")
        graph.set_entry_point("llm")
        
        self.graph = graph.compile(checkpointer=self.checkpointer, interrupt_before=["action"])
    
    def call_llm(self, state: SubGraphState) -> SubGraphState:
        query_message = state["message"]
        if self.prompt:
            query_message = [SystemMessage(content=self.prompt)] + query_message
        message= self.llm.invoke(query_message)
        return {"message": [message]}

    def call_action(self, state: SubGraphState):
        # Safely retrieve tool_calls from the last message
        last_message = state['message'][-1]
        tool_calls = getattr(last_message, 'tool_calls', [])
        results = []
        for t in tool_calls:
            print(f"Calling: {t}")
            if t['name'] in self.tools:
                result = self.tools[t['name']].invoke(t['args'])
                results.append(ToolMessage(tool_call_id=t['id'], name=t['name'], content=str(result)))
            else:
                print(f"Tool {t['name']} not found.")
        print("Back to the model!")
        return {'message': results}
    
    def exists_action(self, state: SubGraphState):
        # Safely check if tool_calls exist in the last message
        last_message = state['message'][-1]
        tool_calls = getattr(last_message, 'tool_calls', [])
        return len(tool_calls) > 0


In [None]:
agent = CustomGraph(llm, prompt, tools, memory)
agent

In [None]:
from langchain_core.messages import HumanMessage

In [None]:
thread = {"configurable": {"thread_id": "1"}}

message = "我想查看D盘的文件夹，请帮我罗列出来"
message = HumanMessage(content=message)
agent.graph.invoke({"messages": message}, thread)  # 这里是一个示例调用，可以根据需要修改

In [None]:
def stream_graph_updates(user_input: str):
    # 初始化一个变量来累积输出
    accumulated_output = []

    for event in agent.graph.stream({"messages": [("user", user_input)]}, thread):
        for value in event.values():
            # 将模型回复的内容添加到累积输出中
            accumulated_output.append(value["messages"][-1].content)

    # 返回累积的输出
    return accumulated_output

# while True:
#     try:
#         user_input = input("用户提问: ")
#         if user_input.lower() in ["退出", "quit"]:
#             print("下次再见！")
#             break

#         # 获取累积的输出
#         updates = stream_graph_updates(user_input)

#         # 打印最后一个输出
#         if updates:
#             print("模型回复:")
#             print(updates[-1])
#     except:
#         break

In [None]:
stream_graph_updates("你好")