![](https://cdn.sa.net/2025/03/04/PZ2iwmofMgNzjeG.png)

基本原理很简单: 针对用户的问题, 利用 R1 这类推理模型强大的规划和识别能力, 在每一次迭代中持续找出有用信息、识别缺失信息和无效信息, 直到得到一个满意的回答(终止条件: 找不出缺失信息)

Inspired by https://build.nvidia.com/langchain/structured-report-generation

In [1]:
from dotenv import load_dotenv, find_dotenv
import os

# Load variables from the nearest .env file so the os.environ lookups below
# can see them; the return value is not needed.
_ = load_dotenv(find_dotenv())

# model instance
from langchain_openai import ChatOpenAI

# Shared "deepseek-r1" chat model used by every chain in this notebook.
# Endpoint and credentials come from QWEN_BASE_URL / QWEN_API_KEY
# (presumably an OpenAI-compatible gateway that serves the model -- verify
# against your provider).
r1_model = ChatOpenAI(
    base_url=os.environ.get("QWEN_BASE_URL"),
    api_key=os.environ.get("QWEN_API_KEY"),
    temperature=0.6,
    model="deepseek-r1",
)

In [2]:
# Agent Prompts
from langchain_core.prompts import PromptTemplate

class Prompts:
    """Prompt templates used by the QA agent.

    VALIDATE_RETRIEVAL asks the model to judge whether the retrieved context
    answers the question and to reply with a JSON object holding ``status``
    ('COMPLETE' / 'INCOMPLETE'), ``useful_information`` and
    ``missing_information``.

    ANSWER_QUESTION asks the model for a direct, concise answer based on the
    retrieved context.

    Both templates take the variables ``retrieved_context`` and ``question``.
    """

    # Literal braces of the JSON examples are doubled ({{ }}) so that the
    # template formatter does not treat them as input variables.
    VALIDATE_RETRIEVAL = PromptTemplate(
        input_variables=["retrieved_context", "question"],
        template="""
        You are a retrieval validator.
        You will be provided with a question and chunks of text that may or may not contain the answer to the question.
        Your role is to carefully look through the chunks of text provide a JSON response with three fields:
        1. status: whether the retrieved chunks contain the answer to the question.
        - 'COMPLETE' if the retrieved chunks contain the answer to the question, 'INCOMPLETE' otherwise. Nothing else.
        
        2. useful_information: the useful information from the retrieved chunks. Be concise and direct.
        - if there is no useful information, set this to an empty string.
        
        3. missing_information: the missing information that is needed to answer the question in full. Be concise and direct.
        - if there is no missing information, set this to an empty string.
        
        Please provide your response as dictionary in the following format.

        {{"status": "<status>",
        "useful_information": "<useful_information>",
        "missing_information": "<missing_information>"}}
        
        Here is an example of the response format:
        
        {{"status": "INCOMPLETE",
        "useful_information": "The capital city of Canada is Ottawa.",
        "missing_information": "The capital city of Mexico"}}
    
        Do not include any other text.
        
        Context: {retrieved_context}
        
        The Question: {question}
        Response:
        """
    )
    # NOTE: the example above uses "INCOMPLETE" because it lists missing
    # information; pairing "COMPLETE" with a non-empty missing_information
    # would contradict the rules stated in the same prompt.

    ANSWER_QUESTION = PromptTemplate(
        input_variables=["retrieved_context", "question"],
        template="""
        You are a question answering agent.
        You will be provided with a question and chunks of text that contain the answer to the question.
        Your role is to carefully look through the chunks of text and answer the question.
        Provide a direct and concise answer based on the information provided.
        Do not include any additional information or commentary.
        
        The Question: {question}
        Context: {retrieved_context}
        Answer:
        """
    )

In [5]:
# Evaluator-Optimizer Workflow
from langgraph.graph import END, StateGraph
from typing_extensions import TypedDict
from tavily import TavilyClient
import json

class GraphState(TypedDict):
    """State dictionary passed between the nodes of the QA workflow graph."""

    # Input supplied by the caller.
    question: str

    # Search results (joined snippets) that the validator and answerer read.
    retrieved_context: str

    # Fields written by the validation step.
    router_decision: str
    useful_information: str
    missing_information: str
    reasoning: str

    # Final answer written by the answering step.
    answer_to_question: str

class QAAgent:
    """Search-validate-refine question answering agent built on LangGraph.

    The compiled workflow runs these nodes:

    1. ``retrieve`` searches Tavily for the user's question.
    2. ``validate_retrieval`` asks the LLM whether the retrieved context
       answers the question and what is still missing.
    3. ``find_missing_information`` (only when the verdict is INCOMPLETE)
       searches for the missing piece and loops back to step 2.
    4. ``answer`` produces the final answer from the retrieved context.

    Args:
        max_search_rounds: Upper bound on follow-up searches per ``run`` call.
            Once reached, the agent answers with the context gathered so far
            instead of looping until the graph's recursion limit raises.
    """

    def __init__(self, max_search_rounds: int = 10):
        self.max_search_rounds = max_search_rounds
        # Follow-up searches performed during the current run() call.
        self._search_rounds = 0
        self.tavily_client = TavilyClient(api_key=os.getenv("TAVILY_API_KEY"))
        self.workflow = self.create_workflow()

    @staticmethod
    def _split_reasoning(llm_output: str) -> tuple:
        """Split raw model text into ``(reasoning, response)``.

        The reasoning is whatever precedes ``</think>`` (with an optional
        leading ``<think>`` removed); the response is everything after it.
        Output without a closing tag is returned whole as the response.
        """
        head, closing_tag, tail = llm_output.partition("</think>")
        if not closing_tag:
            return "", llm_output.strip()
        # Tolerate output where the opening tag is absent and only the
        # closing tag appears.
        reasoning = head.split("<think>", 1)[-1].strip()
        return reasoning, tail.strip()

    @staticmethod
    def _parse_validation(response: str) -> dict:
        """Parse the validator's JSON reply into a normalised verdict dict.

        Text around the outermost ``{...}`` (markdown code fences, a short
        preamble) is ignored. ``status`` is upper-cased and anything other
        than ``COMPLETE`` is treated as ``INCOMPLETE`` so the router always
        receives one of its two known keys. Missing or null text fields
        become empty strings.

        Raises:
            json.JSONDecodeError: if no valid JSON object can be decoded.
            TypeError: if the decoded JSON is not an object.
        """
        start, end = response.find("{"), response.rfind("}")
        candidate = response[start:end + 1] if 0 <= start < end else response
        payload = json.loads(candidate)
        if not isinstance(payload, dict):
            raise TypeError(
                f"validator response must be a JSON object, got {type(payload).__name__}"
            )

        def as_text(value) -> str:
            return "" if value is None else str(value).strip()

        status = as_text(payload.get("status")).upper()
        return {
            "status": "COMPLETE" if status == "COMPLETE" else "INCOMPLETE",
            "useful_information": as_text(payload.get("useful_information")),
            "missing_information": as_text(payload.get("missing_information")),
        }

    def _search(self, query: str) -> str:
        """Run a Tavily search (3 results) and join the snippets, one per line."""
        result = self.tavily_client.search(query, max_results=3)
        return "\n".join(item["content"] for item in result["results"])

    def retrieve(self, state: "GraphState") -> dict:
        """Graph node: search for the original question.

        Returns:
            ``{"retrieved_context": <joined search snippets>}``
        """
        print("\n=== STEP 1: RETRIEVAL ===")
        question = state["question"]
        print("Searching for:", question)
        retrieved_context = self._search(question)
        print("Retrieved Context: \n", retrieved_context)
        return {"retrieved_context": retrieved_context}

    def validate_retrieval(self, state: "GraphState") -> dict:
        """Graph node: ask the LLM whether the context answers the question.

        Returns:
            A state update with ``router_decision`` ('COMPLETE' or
            'INCOMPLETE'), ``useful_information``, ``missing_information``,
            ``reasoning`` and the unchanged ``retrieved_context``.

        Raises:
            json.JSONDecodeError: if the model's reply contains no valid JSON.
        """
        print("\n=== STEP 2: VALIDATION ===")
        question = state["question"]
        retrieved_context = state["retrieved_context"]

        validation_chain = Prompts.VALIDATE_RETRIEVAL | r1_model
        llm_output = validation_chain.invoke(
            {"retrieved_context": retrieved_context, "question": question}
        ).content

        reasoning, response = self._split_reasoning(llm_output)
        verdict = self._parse_validation(response)

        router_decision = verdict["status"]
        missing_information = verdict["missing_information"]
        useful_information = verdict["useful_information"]
        print("router decision:", router_decision)
        print("missing information:", missing_information)
        print("useful information:", useful_information)

        if router_decision == "INCOMPLETE":
            print("Missing Information:", missing_information)
            # Stop refining once the search budget is spent and answer with
            # what has been gathered so far.
            if self._search_rounds >= self.max_search_rounds:
                print("Search budget exhausted; answering with the available context.")
                router_decision = "COMPLETE"

        return {
            "router_decision": router_decision,
            "retrieved_context": retrieved_context,
            "useful_information": useful_information,
            "missing_information": missing_information,
            "reasoning": reasoning,
        }

    def answer(self, state: "GraphState") -> dict:
        """Graph node: answer the question from the retrieved context.

        Returns:
            ``{"answer_to_question": <model answer without the reasoning part>}``
        """
        print("\n=== STEP 3: ANSWERING ===")
        question = state["question"]
        context = state["retrieved_context"]

        answer_chain = Prompts.ANSWER_QUESTION | r1_model
        llm_output = answer_chain.invoke(
            {"retrieved_context": context, "question": question}
        ).content

        # Only the final answer is kept; the reasoning part is discarded.
        _, answer = self._split_reasoning(llm_output)
        return {"answer_to_question": answer}

    def find_missing_information(self, state: "GraphState") -> dict:
        """Graph node: search for the information the validator found missing.

        The new context is the previously extracted useful information
        followed by the fresh search snippets.

        Returns:
            ``{"retrieved_context": <combined context>}``
        """
        print("\n=== STEP 2b: FINDING MISSING INFORMATION ===")
        # Fall back to the original question when the validator reported
        # INCOMPLETE without naming what is missing, rather than sending an
        # empty query to the search API.
        query = state["missing_information"] or state["question"]
        print("Searching for:", query)

        self._search_rounds += 1
        previously_retrieved_useful_information = state["useful_information"]
        newly_retrieved_context = self._search(query)
        combined_context = f"{previously_retrieved_useful_information}\n{newly_retrieved_context}"
        print("newly retrieved context:", newly_retrieved_context)
        return {"retrieved_context": combined_context}

    @staticmethod
    def decide_route(state: "GraphState") -> str:
        """Return the routing key ('COMPLETE' / 'INCOMPLETE') set by validation."""
        return state["router_decision"]

    # Reference only: a simpler two-node variant (retrieve -> answer) without
    # the validation loop.
    #
    # def create_workflow(self):
    #     workflow = StateGraph(GraphState)
    #
    #     workflow.add_node("retrieve context", self.retrieve)
    #     workflow.add_node("answer", self.answer)
    #
    #     workflow.set_entry_point("retrieve context")
    #     workflow.add_edge("retrieve context","answer")
    #     workflow.add_edge("answer", END)
    #     compiled_graph = workflow.compile()
    #     compiled_graph.get_graph(xray=1).draw_mermaid_png(output_file_path="agent-architecture.png")
    #     return compiled_graph

    def create_workflow(self):
        """Build and compile the retrieve / validate / refine / answer graph.

        Also tries to write a diagram of the graph to
        ``agent-architecture.png``; a rendering failure is reported but does
        not prevent the agent from being created.

        Returns:
            The compiled LangGraph workflow.
        """
        workflow = StateGraph(GraphState)

        workflow.add_node("retrieve context", self.retrieve)
        workflow.add_node("is retrieved context complete?", self.validate_retrieval)
        workflow.add_node("answer", self.answer)
        workflow.add_node("find missing information", self.find_missing_information)

        workflow.set_entry_point("retrieve context")
        workflow.add_edge("retrieve context", "is retrieved context complete?")

        workflow.add_conditional_edges(
            "is retrieved context complete?",
            self.decide_route,
            {
                "COMPLETE": "answer",
                "INCOMPLETE": "find missing information",
            },
        )
        # After each follow-up search the combined context is validated again.
        workflow.add_edge("find missing information", "is retrieved context complete?")

        workflow.add_edge("answer", END)
        compiled_graph = workflow.compile()

        # The diagram is a convenience artifact only, so a rendering problem
        # must not make the agent unusable.
        try:
            compiled_graph.get_graph(xray=1).draw_mermaid_png(
                output_file_path="agent-architecture.png"
            )
        except Exception as exc:
            print("Could not render agent-architecture.png:", exc)
        return compiled_graph

    def run(self, question: str) -> str:
        """Run the workflow for *question* and return the final answer text."""
        self._search_rounds = 0  # fresh search budget for every question
        result = self.workflow.invoke({"question": question})
        return result["answer_to_question"]
    

In [6]:
# Demo: one multi-part question that needs more than one search to answer.
demo_question = "告诉我什么NVIDIA 4090已经在中国被禁售了么? 香港可以买到4090么? 4090和A100在模型推理上性能相差很大么?"

agent = QAAgent()
answer = agent.run(demo_question)
print(answer)


=== STEP 1: RETRIEVAL ===
Searching for: 告诉我什么NVIDIA 4090已经在中国被禁售了么? 香港可以买到4090么? 4090和A100在模型推理上性能相差很大么?
Retrieved Context: 
 据业内人士称，只要品牌所属公司的主体在中国内地、港澳，就不能继续在国内生产和销售rtx 4090。 在未来一段时间内，中国玩家仍然能买到RTX 4090，但都是库存产品，尤其是在禁售前各厂商想尽办法囤积的， 而在这一批卖完之后，就没有了。
据业内人士称，只要品牌所属公司的主体在中国内地、港澳，就不能继续在国内生产和销售rtx 4090。 理论上，nvidia和台系品牌可以向美国商务部申请
快科技11月17日消息，据英伟达中国官网显示，rtx 4090信息已经被移除，不过除了这个型号，其余rtx 40系正常购买。截至笔者发稿前，英伟达中国官网

=== STEP 2: VALIDATION ===
router decision: INCOMPLETE
missing information: 4090与A100在模型推理上的性能差异
useful information: RTX 4090已被禁止在中国内地、港澳地区生产和销售，但库存产品仍可购买直至售完。英伟达中国官网已移除RTX 4090信息。
Missing Information: 4090与A100在模型推理上的性能差异

=== STEP 2b: FINDING MISSING INFORMATION ===
Searching for: 4090与A100在模型推理上的性能差异
newly retrieved context: 对比A100和4090：两者的区别以及适用点 自2022年年末英伟达发布4090芯片以来，这款产品凭借着其优异的性能迅速在科技界占据了一席之地。现如今，不论是在游戏体验、内容创作能力方面还是模型精度提升方面，4090都是一个绕不过去的名字。而A100作为早些发布的产品，其优异的能力和适配性已经为它打下了良好的口碑。RTX 4090芯片和A100芯片虽然都是高性能的GPU，但它们在设计理念、目标市场和性能特点上有着明显的区别，而本篇文章将简单概述两者的区别同时介绍一下二者的特性。 GPU 训练性能和成本对比 虽然A100被称为深度