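## Reflexion: The Obsessive Perfectionist

## What Is a Reflexion Agent?

A Reflexion agent is an agent architecture with the capacity for self-reflection. Unlike a conventional action-oriented agent, it does more than execute tasks or react to its environment: it **analyzes its own decisions during execution, identifies mistakes, and improves itself**. This lets the agent raise its performance step by step over a long-running task.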

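At its core is the **Reflexion loop**, which typically runs through the following steps:

1. **Action Execution**: The agent acts according to its current strategy or task goal.
2. **Outcome Evaluation**: The agent observes the result and compares the expected outcome with the actual one.
3. **Reflection and Reasoning**: The agent analyzes what caused the errors or inefficiency and produces a new strategy or improvement plan.
4. **Strategy Update**: Based on that reflection, the agent adjusts its decision model or action plan.

Through this loop, a Reflexion agent can "learn how to learn", much as a person does, and keep refining its own behavior.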
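
The four steps can be sketched in plain Python. This is a toy illustration only, not the notebook's implementation: `AgentMemory`, `act`, `evaluate`, `reflect`, `update`, and the topic names are all hypothetical, and the "task" is simply to cover a set of required topics.

```python
from dataclasses import dataclass, field


@dataclass
class AgentMemory:
    """Hypothetical container for what the agent knows and has learned so far."""
    known_topics: set[str]
    lessons: list[str] = field(default_factory=list)


def act(memory: AgentMemory) -> set[str]:
    # 1. Action execution: "answer" with every topic the agent currently knows.
    return set(memory.known_topics)


def evaluate(answer: set[str], required: set[str]) -> set[str]:
    # 2. Outcome evaluation: compare the expected coverage with the actual one.
    return required - answer


def reflect(missing: set[str]) -> list[str]:
    # 3. Reflection: turn each gap into an explicit lesson.
    return [f"cover '{topic}'" for topic in sorted(missing)]


def update(memory: AgentMemory, missing: set[str], lessons: list[str]) -> None:
    # 4. Strategy update: this toy agent learns one missing topic per round.
    memory.known_topics.add(sorted(missing)[0])
    memory.lessons.extend(lessons[:1])


def reflexion_loop(memory: AgentMemory, required: set[str], max_rounds: int = 5):
    for round_number in range(1, max_rounds + 1):
        answer = act(memory)
        missing = evaluate(answer, required)
        if not missing:
            return round_number, answer
        update(memory, missing, reflect(missing))
    return max_rounds, act(memory)


memory = AgentMemory(known_topics={"risk"})
rounds, final_answer = reflexion_loop(memory, required={"risk", "training", "logistics"})
print(rounds, sorted(final_answer), memory.lessons)
```

The loop stops as soon as the evaluation finds no gap, and `max_rounds` bounds the work when it never does.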

---

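## Reflexion Agents in LangGraph

In the **LangGraph** framework, a Reflexion agent typically combines a **memory module** with a **reasoning module** to support self-monitoring and strategy correction.

Its main features include:

- **Task Review**: LangGraph lets the agent record past task steps and their results, which serve as the basis for reflection.
- **Error Analysis**: The agent can generate a self-evaluation report that points out mistakes made during the task and directions for improvement.
- **Strategy Regeneration**: Through the reflection mechanism, the agent can update its action plan dynamically to raise its success rate.

As a result, a LangGraph Reflexion agent can not only complete a task but also **refine its own decision logic on its own**. This suits systems that need long-term learning and continuous improvement, such as code generation, knowledge reasoning, or multi-step problem solving.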
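
The task-review idea, where each step appends to a shared history that later steps can read, can be illustrated without any framework. The sketch below is a plain-Python stand-in and not LangGraph's own code: `HistoryState`, `append_messages`, `run_node`, `draft`, and `critique` are hypothetical names, and the message texts are placeholders.

```python
from typing import Callable

HistoryState = dict[str, list[str]]


def append_messages(existing: list[str], new: list[str]) -> list[str]:
    # Reducer: merge a node's partial update into the stored history by appending.
    return existing + new


def run_node(state: HistoryState, node: Callable[[HistoryState], HistoryState]) -> HistoryState:
    # Apply one node, then fold its update into the state with the reducer.
    update = node(state)
    return {"messages": append_messages(state["messages"], update["messages"])}


def draft(state: HistoryState) -> HistoryState:
    return {"messages": ["draft answer"]}


def critique(state: HistoryState) -> HistoryState:
    # Error analysis: the critique sees the whole history, including the draft.
    seen = len(state["messages"])
    return {"messages": [f"critique of {seen} earlier message(s): no sources cited"]}


state: HistoryState = {"messages": ["question"]}
for node in (draft, critique):
    state = run_node(state, node)

print(len(state["messages"]))
print(state["messages"][-1])
```

Each node returns only its own contribution, and the reducer decides how that contribution joins the history.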


In [None]:
import datetime
import os
from textwrap import dedent

os.chdir("../../../")

from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder, SystemMessagePromptTemplate, HumanMessagePromptTemplate, PromptTemplate
from langchain_core.messages.tool import ToolMessage
from langchain_core.messages.ai import AIMessage
from langchain_core.messages.human import HumanMessage
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_core.output_parsers import PydanticOutputParser
from pydantic import BaseModel, Field

from src.initialization import credential_init

credential_init()

model = ChatGoogleGenerativeAI(
    model="gemini-2.0-flash",
    temperature=0,
    max_tokens=None,
    timeout=None,
    max_retries=6,
    disable_streaming=False
    # other params...
)


class Reflection(BaseModel):
    missing: str = Field(description="Critique of what is missing.")
    superfluous: str = Field(description="Critique of what is superfluous")


class AnswerQuestion(BaseModel):
    """Answer the question. Provide an answer, reflection, and then follow up with search queries to improve the answer."""

    answer: str = Field(description="~250 word detailed answer to the question.")
    reflection: Reflection = Field(description="Your reflection on the current answer.")
    search_queries: list[str] = Field(
        description="1-3 search queries for researching improvements to address the critique of your current answer."
    )

output_parser = PydanticOutputParser(pydantic_object=AnswerQuestion)
format_instructions = output_parser.get_format_instructions()


"""
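This is a shortcut: the answer and the reflection are generated in a single call.
A better approach may be to obtain an answer first, written either by an LLM or by a person, and then let `respond` focus solely on reflection.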
"""

system_template =  dedent("""
                            You are an expert researcher.
                            Current time: {time}
                            
                            1. {first_instruction}
                            2. Reflect and critique your answer. Be severe to maximize improvement.
                            3. Recommend search queries to research information and improve your answer.

                            Please answer in Traditional Chinese (繁體中文).
                            """)

system_prompt_template = PromptTemplate(template=system_template,
                                        partial_variables={"time": datetime.datetime.now().isoformat(),
                                                           "first_instruction": "Provide a detailed ~250 word answer."})

human_template = dedent("""
                        <reminder>Reflect on the user's original question and the actions taken thus far.</reminder>
                        output format instruction: {format_instruction}
                        """
                        )

human_prompt_template = PromptTemplate(template=human_template,
                                       partial_variables={"format_instruction": format_instructions})

messages = [SystemMessagePromptTemplate(prompt=system_prompt_template),
            MessagesPlaceholder(variable_name="messages"),
            HumanMessagePromptTemplate(prompt=human_prompt_template)]

respond_chat_prompt_template = ChatPromptTemplate.from_messages(messages)

respond_pipeline = respond_chat_prompt_template|model|output_parser
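
To make the expected output shape concrete, here is a standard-library sketch that mirrors the `answer`, `reflection`, and `search_queries` fields with dataclasses and rebuilds them from JSON text. The notebook itself relies on Pydantic and `PydanticOutputParser`; `ReflectionSketch`, `AnswerSketch`, `parse_answer`, and the payload are hypothetical and exist only for illustration.

```python
import json
from dataclasses import dataclass


@dataclass
class ReflectionSketch:
    missing: str
    superfluous: str


@dataclass
class AnswerSketch:
    answer: str
    reflection: ReflectionSketch
    search_queries: list[str]


def parse_answer(raw_json: str) -> AnswerSketch:
    # Decode the JSON text and rebuild the nested structure by hand.
    data = json.loads(raw_json)
    return AnswerSketch(
        answer=data["answer"],
        reflection=ReflectionSketch(**data["reflection"]),
        search_queries=list(data["search_queries"]),
    )


# Made-up payload for illustration only, not real model output.
raw = json.dumps({
    "answer": "placeholder answer",
    "reflection": {"missing": "no sources", "superfluous": "repeated caveats"},
    "search_queries": ["query one", "query two"],
})

parsed = parse_answer(raw)
print(parsed.reflection.missing)
print(parsed.search_queries)
```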

## Respond

In [None]:
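# English gloss of the question: "Guangfu Township in Hualien County was buried by mudflows after the
# Mataian Creek barrier lake burst. From a safety standpoint, should untrained civilians go there to take part in disaster relief?"
example_question = "花蓮縣光復鄉因為馬太鞍溪堰塞湖潰堤，導致被泥石流淹過。就安全的考量，沒接受過專業訓練的平民是否應該去花蓮縣光復鄉參與救災。"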
messages = [HumanMessage(content=example_question)]

initial_response = respond_pipeline.invoke({"messages": messages})

search_queries = initial_response.search_queries

print(search_queries)

In [None]:
print(initial_response)

## Tool

In [None]:
from langchain_core.tools import tool

from langchain_community.tools.tavily_search import TavilySearchResults
from langchain_community.utilities.tavily_search import TavilySearchAPIWrapper

search = TavilySearchAPIWrapper()
tavily_tool = TavilySearchResults(api_wrapper=search, max_results=1)

@tool
def run_queries(search_queries: list[str]):
    """Run the generated queries."""
    return tavily_tool.batch([{"query": query} for query in search_queries])

In [None]:
model_search = model.bind_tools(tools=[run_queries], tool_choice='any')

### Basic Tool Usage

In [None]:
response = model_search.invoke([HumanMessage(content=search_queries)])

In [None]:
tool_calls = response.tool_calls
tool_call = tool_calls[0]

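# Look the tool up by name in an explicit registry instead of calling eval() on model output.
tools_by_name = {run_queries.name: run_queries}
search_results = tools_by_name[tool_call['name']].invoke(tool_call['args'])

# Pass the tool output as `content`, and set `tool_call_id` to the `tool_call['id']` given in the AIMessage.
tool_message = ToolMessage(content=[r[0]['content'] for r in search_results],
                           tool_call_id=tool_call['id'])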

In [None]:
message = model.invoke([HumanMessage(content=search_queries),
                        response,
                        tool_message])

In [None]:
message
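
Dispatching a tool call by name can be done with an explicit registry, so that only known tools can run. The sketch below uses a stub in place of the real search tool: `run_queries_stub`, `TOOL_REGISTRY`, `dispatch`, and `first_snippets` are hypothetical helpers, and the URLs and snippets are fabricated placeholders.

```python
from typing import Any, Callable


def run_queries_stub(search_queries: list[str]) -> list[list[dict[str, str]]]:
    # Stand-in for the real search tool: one fake hit per query, none for an empty query.
    return [
        [{"url": f"https://example.com/{i}", "content": f"snippet for {q}"}] if q else []
        for i, q in enumerate(search_queries)
    ]


TOOL_REGISTRY: dict[str, Callable[..., Any]] = {"run_queries": run_queries_stub}


def dispatch(tool_call: dict[str, Any]) -> Any:
    # Only names present in the registry can run; anything else is rejected.
    name = tool_call["name"]
    if name not in TOOL_REGISTRY:
        raise KeyError(f"unknown tool: {name}")
    return TOOL_REGISTRY[name](**tool_call["args"])


def first_snippets(batch_results: list[list[dict[str, str]]]) -> list[str]:
    # Take the first hit of each query, skipping queries that returned nothing.
    return [hits[0]["content"] for hits in batch_results if hits]


call = {"name": "run_queries", "args": {"search_queries": ["flood safety", ""]}, "id": "call-1"}
results = dispatch(call)
print(first_snippets(results))
```

Skipping empty result lists avoids an `IndexError` when a query returns no hits.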

## Revise

In [None]:
revise_instructions = dedent("""\
Using the reflection feedback and new information, revise your last answer to correct errors, clarify reasoning, and improve overall quality.
Output only the improved answer. 

- You MUST include numerical citations in your revised answer to ensure it can be verified.
- Add a "References" section to the bottom of your answer (which does not count towards the word limit), in the form of:
    - [1] https://example.com
    - [2] https://example.com
- Update the `reflection` and `search_queries` according to your current answer.
""")


In [None]:
# Extend the initial answer schema to include references.
# Forcing citation in the model encourages grounded responses


class ReviseAnswer(AnswerQuestion):
    """Revise your original answer to your question. Provide an answer and a reflection,
    cite your reflection with references, and finally
    add search queries to improve the answer."""

    references: list[str] = Field(
        description="Citations motivating your updated answer."
    )

output_parser = PydanticOutputParser(pydantic_object=ReviseAnswer)
format_instructions = output_parser.get_format_instructions()


system_template =  dedent("""
                            You are an expert researcher.
                            Current time: {time}
                            
                            1. {first_instruction}
                            2. Reflect and critique your answer. Be severe to maximize improvement.
                            3. Recommend search queries to research information and improve your answer.

                            Please answer in Traditional Chinese (繁體中文).
                            """)

system_prompt_template = PromptTemplate(template=system_template,
                                        partial_variables={"time": datetime.datetime.now().isoformat(),
                                                           "first_instruction": revise_instructions})

human_template = dedent("""
                        <reminder>Reflect on the user's original question and the actions taken thus far.</reminder>
                        output format instruction: {format_instruction}
                        """
                        )

human_prompt_template = PromptTemplate(template=human_template,
                                       partial_variables={"format_instruction": format_instructions})

messages = [SystemMessagePromptTemplate(prompt=system_prompt_template),
            MessagesPlaceholder(variable_name="messages"),
            HumanMessagePromptTemplate(prompt=human_prompt_template)]

revise_chat_prompt_template = ChatPromptTemplate.from_messages(messages)

revision_pipeline = revise_chat_prompt_template|model|output_parser

In [None]:
initial_response

In [None]:
revised_message = revision_pipeline.invoke({"messages": [HumanMessage(content=example_question),
                                                         AIMessage(content=initial_response.model_dump_json()),
                                                         HumanMessage(content=message.content)]})

In [None]:
revised_message

In [None]:
initial_response.reflection

In [None]:
revised_message.reflection

**From here on, the Tool -> Revise cycle simply repeats.**

## LangGraph Workflow

In [None]:
import json
from typing import Annotated
from typing_extensions import TypedDict

from langgraph.graph import END
from langgraph.graph.message import add_messages

MESSAGES = "messages"
MAX_ITERATION = 3

class State(TypedDict):
    messages: Annotated[list, add_messages]


async def respond(state: State):
    """Draft an initial answer together with its self-critique and search queries."""
    response = await respond_pipeline.ainvoke({"messages": state[MESSAGES]})

    return {MESSAGES: AIMessage(content=response.model_dump_json())}
    

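async def tool(state: State):
    """Run the search queries proposed in the latest answer and summarize the results."""
    last_message = state[MESSAGES][-1]

    # Read the queries from the latest answer in the state, not from the notebook-level variable.
    search_queries = json.loads(last_message.content)["search_queries"]

    response = await model_search.ainvoke([HumanMessage(content=search_queries)])

    tool_calls = response.tool_calls
    tool_call = tool_calls[0]

    # Look the tool up by name in an explicit registry instead of calling eval() on model output.
    tools_by_name = {run_queries.name: run_queries}
    search_results = tools_by_name[tool_call['name']].invoke(tool_call['args'])

    # Pass the tool output as `content`, and set `tool_call_id` to the `tool_call['id']` given in the AIMessage.
    tool_message = ToolMessage(content=[r[0]['content'] for r in search_results],
                               tool_call_id=tool_call['id'])

    message = await model.ainvoke([HumanMessage(content=search_queries),
                                  response,
                                  tool_message])

    return {MESSAGES: HumanMessage(content=message.content)}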


async def revise(state: State):

    revised = await revision_pipeline.ainvoke({"messages": state[MESSAGES]})
    
    return {MESSAGES: AIMessage(content=revised.model_dump_json())}


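def should_end(state: State):
    """Route back to `tool` until MAX_ITERATION AI messages (initial answer plus revisions) exist."""
    ai_message_count = sum(1 for message in state[MESSAGES] if message.type == 'ai')

    if ai_message_count < MAX_ITERATION:
        return "tool"
    else:
        return END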
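
Because the router counts AI messages, `MAX_ITERATION = 3` covers the initial answer plus two revisions. The sketch below replays that bookkeeping with stand-in objects: `Msg`, `route`, and `STOP` are hypothetical, and `STOP` merely plays the role of the graph's end marker.

```python
from dataclasses import dataclass

MAX_ITERATION = 3
STOP = "stop"  # hypothetical placeholder for the graph's end marker


@dataclass
class Msg:
    type: str  # "human" or "ai", mirroring the `type` attribute used by the router


def route(messages: list[Msg]) -> str:
    # Same rule as the router above: keep looping while fewer than MAX_ITERATION AI messages exist.
    ai_count = sum(1 for m in messages if m.type == "ai")
    return "tool" if ai_count < MAX_ITERATION else STOP


history = [Msg("human"), Msg("ai")]  # the question, then the initial answer from `respond`
search_rounds = 1                     # `respond` always hands off to `tool` once
while True:
    history += [Msg("human"), Msg("ai")]  # the `tool` summary, then the `revise` output
    if route(history) == STOP:
        break
    search_rounds += 1

print(search_rounds)
```

Under these assumptions the search step runs twice, once after the initial answer and once after the first revision.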
    

In [None]:
from langgraph.checkpoint.memory import InMemorySaver

from langgraph.graph import StateGraph, START, END

workflow = StateGraph(State)

# Draft the initial answer with its self-critique
workflow.add_node("respond", respond)

# Run the search queries
workflow.add_node("tool", tool)

# Revise the answer using the search results
workflow.add_node("revise", revise)

workflow.add_edge(START, "respond")

workflow.add_edge("respond", "tool")

# After searching, revise the answer
workflow.add_edge("tool", "revise")

workflow.add_conditional_edges(
    "revise",
    should_end,
    ["tool", END]
)

checkpointer = InMemorySaver()
app = workflow.compile(checkpointer=checkpointer)

In [None]:
from IPython.display import Image, display

display(Image(app.get_graph(xray=True).draw_mermaid_png()))

In [None]:
config = {"configurable": {"thread_id": "1"}}

In [None]:
async for event in app.astream(
    {
        "messages": [
            HumanMessage(
                content=example_question
            )
        ],
    },
    config,
):
    print(event)
    print("---")