In [6]:
import os
from dotenv import load_dotenv

load_dotenv()

from langchain_deepseek import ChatDeepSeek
from langchain_core.messages import HumanMessage, SystemMessage, AIMessage
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder, PromptTemplate

from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import AnyMessage, add_messages
from langgraph.checkpoint.memory import MemorySaver

from pydantic import BaseModel, Field, ConfigDict, field_validator
from typing import Annotated, List, Optional, Dict, Any, Literal
import json

In [8]:
llm = ChatDeepSeek(model="deepseek-chat", openai_api_key=os.getenv("DEEPSEEK_API_KEY"))

## 智能体间的通讯状态

In [5]:
class State(BaseModel):
    model_config = ConfigDict(arbitrary_types_allowed=True)

    messages: Annotated[List[AnyMessage], add_messages] = Field(default_factory=list, title="对话列表")
    knowledge_results: list = Field(default=[], title="从存储数据结构知识图谱的向量数据库搜索到的相关知识点信息")
    current_knowledge_point: str = Field(default="", title="当前知识点")
    is_right: bool = Field(default=False, title="用户回复是否正确")
    next_node: str = Field(default="teacher_agent", title="下一个节点")
    success: bool = Field(default=False, title="节点执行是否成功")
    log: str = Field(default="", title="节点执行日志")

    @field_validator("messages", mode="before")
    def validate_messages(cls, v, info):
        if 'messages' in info.data:
            return add_messages(info.data['messages'], v)
        else:
            return v if isinstance(v, list) else [v]
    

## Router Agent

接收历史对话信息以及最新的用户回复进行路由选择

- 如果用户回复内容错误
    - 直接转向 Teacher Agent

- 如果用户回复内容正确，通过 rag 技术，在知识图谱中检索，进行知识点关联度判断
    - 关联度低，转向 Teacher Agent，进行总结
    - 关联度高，转向 Student Agent，进行追问
    - 关联度中，随机选择

In [6]:
class ReplyEvaluation(BaseModel):
    """
    用户回复评估结果
    - is_right: 用户回复是否正确
    - current_knowledge_point: 涉及的知识点
    """
    is_right: bool = Field(..., title="用户回复是否正确")
    current_knowledge_point: str = Field(..., title="涉及的主要知识点")


def router_agent(state: State) -> State:
    """根据当前状态进行路由"""
    try:
        conversation = ""
        for message in state.messages:
            if isinstance(message, HumanMessage):
                conversation += f"user: {message.content}\n"
            elif isinstance(message, AIMessage):
                conversation += f"assistant: {message.content}\n"

        # 首先进行用户回复正确性判断
        prompt = ChatPromptTemplate([
            ("system", "你是一名数据结构领域的专家，现在需要判断用户回复是否正确，并给出涉及的主要知识点"),
            ("human", "{conversation}"),
        ])
        reply_evaluation_chain = prompt | llm.with_structured_output(ReplyEvaluation)
        reply_evaluation = reply_evaluation_chain.invoke({"conversation": conversation})

        # 根据用户回复正确性判断结果进行路由
        if reply_evaluation.is_right:
            # 用户回复正确，进行知识图谱检索
            kg_result = retrieve_knowledge_for_routing(reply_evaluation.current_knowledge_point)
            if "error" in kg_result:
                return State(current_knowledge_point=reply_evaluation.current_knowledge_point, is_right=True, next_node="teacher_agent", success=False, log=f"知识图谱检索失败，转向TeacherAgent")
            
            # 根据关联度判断结果进行路由
            if kg_result["metadata"]["total_related_entities"] == 0:
                return State(current_knowledge_point=reply_evaluation.current_knowledge_point, is_right=True, next_node="teacher_agent", success=False, log=f"无相关知识点，转向TeacherAgent")
            
            if kg_result["metadata"]["high_relevance_count"] > 0:
                high_relevance_entities = []
                for re in kg_result["related_entities"]:
                    if re["relation"]["rs_score"] > 0.6:
                        high_relevance_entities.append(re)
                    return State(current_knowledge_point=reply_evaluation.current_knowledge_point, knowledge_results=high_relevance_entities, is_right=True, next_node="stutent_agent", success=False, log=f"关联度高，转向StudentAgent")

            if kg_result["metadata"]["medium_relevance_count"] > 0:
                medium_relevance_entities = []
                for re in kg_result["related_entities"]:
                    if re["relation"]["rs_score"] > 0.3:
                        medium_relevance_entities.append(re)
                return State(current_knowledge_point=reply_evaluation.current_knowledge_point, knowledge_results=medium_relevance_entities, is_right=True, next_node="random", success=False, log=f"关联度中,随机选择")
                
        else:
            return State(current_knowledge_point=reply_evaluation.current_knowledge_point, is_right=False, next_node="teacher_agent", success=True, log=f"用户回复错误，转向TeacherAgent")
            
            
    except Exception as e:
        return State(prompt="", success=False, log=str(e))

## Student Agent

根据从 Router Agent 转来的状态进行回复

In [None]:
def student_agent(state: State) -> State:
    """根据当前状态进行回复"""
    try:
        conversation = ""
        for message in state.messages:
            if isinstance(message, HumanMessage):
                conversation += f"user: {message.content}\n"
            elif isinstance(message, AIMessage):
                conversation += f"assistant: {message.content}\n"
        
        if state.knowledge_results:
            student_agent_prompt = PromptTemplate.from_template("""
            你是一名正在学习数据结构的学生， 针对当前知识点：{current_knowledge_point}，用户已经做了正确的讲解，请结合可能涉及的相关知识点：{knowledge_results}，进一步追问。
            """)
        else:
            student_agent_prompt = PromptTemplate.from_template("""
            你是一名正在学习数据结构的学生， 针对当前知识点：{current_knowledge_point}，用户已经做了正确的讲解，进一步追问。
            """)

        prompt = ChatPromptTemplate([
            ("system", student_agent_prompt),
            ("human", "{conversation}"),
        ])
        student_agent_chain = prompt | llm
        guidance = student_agent_chain.invoke({"conversation": conversation})
        return State(route="student_agent", success=True, log=f"StudentAgent生成成功", messages=[AIMessage(content=guidance)])
    
    except Exception as e:
        return State(route="student_agent", success=False, log=str(e))


## Teacher Agent

根据从路由智能体转来的状态进行回复

- 如果 is_right == False，即用户回答错误，teacher_agent 进行错误分类与讲解建议
- 如果 is_right == True，那根据 log 的情况进行回复
    - 知识图谱搜索失败，根据历史对话信息适当回复
    - 无关联知识点，总结评价
    - 中关联度，结合关联知识点，总结评价

In [9]:

def teacher_agent(state: State) -> State:
    """根据当前状态进行回复"""
    try:
        conversation = ""
        for message in state.messages:
            if isinstance(message, HumanMessage):
                conversation += f"user: {message.content}\n"
            elif isinstance(message, AIMessage):
                conversation += f"assistant: {message.content}\n"
        
        if state.is_right == False:
            teacher_agent_prompt = PromptTemplate.from_template("""
            你是一名数据结构领域的专家， 针对当前知识点：{current_knowledge_point}，用户回答错误，请给出讲解建议。
            """)    
        elif state.knowledge_results:
            teacher_agent_prompt = PromptTemplate.from_template("""
            你是一名数据结构领域的专家， 针对当前知识点：{current_knowledge_point}，用户回答正确，请结合可能涉及的相关知识点：{knowledge_results}，给出总结评价。
            """)
        else:
            teacher_agent_prompt = PromptTemplate.from_template("""
            你是一名数据结构领域的专家， 针对当前知识点：{current_knowledge_point}，用户回答正确，请给出总结评价。
            """)

        prompt = ChatPromptTemplate([
            ("system", teacher_agent_prompt),
            ("human", "{conversation}"),
        ])
        teacher_agent_chain = prompt | llm
        guidance = teacher_agent_chain.invoke({"conversation": conversation})
        return State(route="teacher_agent", success=True, log=f"TeacherAgent生成成功", messages=[AIMessage(content=guidance)])
    except Exception as e:
        return State(route="teacher_agent", success=False, log=str(e))


## Router

In [10]:
def router(state: State) -> State:
    """根据RouterAgent返回的状态进行路由"""
    if state.next_node == "teacher_agent":
        return "teacher_agent"
    elif state.next_node == "student_agent":
        return "student_agent"
    elif state.next_node == "random":
        # 从 stutent_agent 和 teacher_agent 中随机选择
        return random.choice(["student_agent", "teacher_agent"])
    else:
        return "__end__"


## MainGraph

In [11]:
workflow = StateGraph(State)

workflow.add_node("router", router)
workflow.add_node("student_agent", student_agent)
workflow.add_node("teacher_agent", teacher_agent)

workflow.add_conditional_edges(
    "router",
    router,
    {
        "student_agent": "student_agent",
        "teacher_agent": "teacher_agent",
        "__end__": END
    }
)

workflow.add_edge(START, "router")
workflow.add_edge("student_agent", "teacher_agent")
workflow.add_edge("teacher_agent", END)

memory = MemorySaver()

graph = workflow.compile(checkpointer=memory)


In [None]:
from IPython.display import Image, display

# Setting xray to 1 will show the internal structure of the nested graph
display(Image(graph.get_graph(xray=2).draw_mermaid_png()))