In [19]:
import operator
from collections.abc import Sequence
from typing import Annotated, TypedDict, Literal, Any, Callable
from langchain_core.messages import (
    AIMessage,
    BaseMessage,
    ChatMessage,
    FunctionMessage,
    HumanMessage,
    ToolMessage
)
from langchain_core.runnables import RunnableLambda
import re
from langchain_openai import ChatOpenAI
import datetime
from pathlib import Path
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder, SystemMessagePromptTemplate, HumanMessagePromptTemplate
from langgraph.prebuilt import ToolNode
from langgraph.graph import END, START, StateGraph
from langgraph.graph.state import CompiledStateGraph
from langchain_community.tools.tavily_search import TavilySearchResults
from omegaconf import OmegaConf


class AgentState(TypedDict):
    messages: Annotated[Sequence[BaseMessage], operator.add]
    sender: str

https://langchain-ai.github.io/langgraph/concepts/agentic_concepts/?h=router#router

In [4]:
def review_router(state: AgentState) -> Literal["Writer", "continue", "call_tool", "__end__"]:
    messages = state["messages"]
    last_message = messages[-1]

    if last_message.tool_calls:
        return "call_tool"

    if isinstance(last_message, AIMessage):
        if last_message.name == "Reviewer":
            content = last_message.content.lower()
            score_match = re.search(r"総合評価:\s*(\d+)", content, re.IGNORECASE)
            if score_match:
                try:
                    score = int(score_match.group(1))
                    if score >= 8:
                        return "__end__"
                except ValueError:
                    print(f"警告: スコアの解析に失敗しました。内容: {content}")
            else:
                print(f"警告: スコアが見つかりませんでした。内容: {content}")
            return "continue"

    return "continue"

Callable

In [10]:



class AgentNode:
    def __init__(
        self, name: str, llm: ChatOpenAI, tools: list[Callable], system_message: str
    ) -> None:
        self.name = name
        self.agent = self._create_agent(llm, tools, system_message)
        self.tool_used = False  # ツール使用フラグを追加
        self.llm = llm
        self.system_message = system_message

    def __call__(self, state: AgentState) -> Any:
        if self.tool_used:
            result = self._create_agent(self.llm, [], self.system_message).invoke(state)
        else:
            result = self.agent.invoke(state)
        if not isinstance(result, ToolMessage):
            result = AIMessage(**result.dict(exclude={"type", "name"}), name=self.name)
            self.tool_used = True

        self.save_output(result.content)

        return {
            "messages": [result],
            "sender": self.name,
        }

    def save_output(self, content: str) -> None:
        timemin = datetime.now().strftime("%Y%m%d_%H%M")
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = Path("outputs") / timemin / f"{self.name}_{timestamp}.txt"
        filename.parent.mkdir(parents=True, exist_ok=True)

        if len(content) > 100:
            with open(filename, "w", encoding="utf-8") as f:
                f.write(content)
            print(f"Output saved to {filename}")

    def _create_agent(
        self, llm: ChatOpenAI, tools: list[Callable], system_message: str
    ) -> RunnableLambda[Any, Any]:
        prompt = ChatPromptTemplate.from_messages(
            [
                (
                    "system",
                    "あなたは他の助手と協力する役立つAIアシスタントです。"
                    "提供されたツールを使用して、質問に答えるための進展を図ってください。"
                    "完全に回答できない場合でも問題ありません。別のツールを持つ他の助手が、あなたが中断したところから引き継いで支援します。"
                    "進展を図るために、できることを実行してください。"
                    "あなたは以下のツールにアクセスできます：{tool_names}。\n{system_message}",
                ),
                MessagesPlaceholder(variable_name="messages"),
            ]
        )
        prompt = prompt.partial(system_message=system_message)

        if tools:
            prompt = prompt.partial(tool_names=", ".join([tool.name for tool in tools]))
            return prompt | llm.bind_tools(tools)

        prompt = prompt.partial(tool_names="なし")
        return prompt | llm

In [13]:
def create_write_blog_workflow(
    agent_state: type[Any],
    router: Callable,
    write_node: AgentNode,
    review_node: AgentNode,
    tool_node: ToolNode,
) -> CompiledStateGraph:
    workflow = StateGraph(agent_state)
    tool_name: str = tool_node.name  # type: ignore
    workflow.add_node(write_node.name, write_node)
    workflow.add_node(review_node.name, review_node)
    workflow.add_node(tool_name, tool_node)

    workflow.add_conditional_edges(
        write_node.name,
        router,
        {"continue": review_node.name, tool_name: tool_name, END: END},
    )
    workflow.add_conditional_edges(
        review_node.name,
        router,
        {"continue": write_node.name, END: END},
    )

    workflow.add_conditional_edges(
        "call_tool",
        lambda x: x["sender"],
        {
            write_node.name: write_node.name,
            review_node.name: review_node.name,
        },
    )
    workflow.add_edge(START, write_node.name)
    graph = workflow.compile()
    return graph

In [18]:
writer_config = OmegaConf.load('writer.yaml')
reviewer_config = OmegaConf.load('reviewer.yaml')
writer_config = OmegaConf.to_container(writer_config, resolve=True)
reviewer_config = OmegaConf.to_container(reviewer_config, resolve=True)

In [16]:
llm = ChatOpenAI(temperature=0.7, model="gpt-4o-mini")
search = TavilySearchResults(max_results=2)

In [20]:
writer_messages = writer_config['messages']

writer_prompt_messages = []

for message in writer_messages:
    role, content = message
    if role == 'system':
        writer_prompt_messages.append(SystemMessagePromptTemplate.from_template(content))
    elif role == 'human':
        writer_prompt_messages.append(HumanMessagePromptTemplate.from_template(content))

In [21]:
reviewer_messages = reviewer_config['messages']

reviewer_prompt_messages = []

for message in reviewer_messages:
    role, content = message
    if role == 'system':
        reviewer_prompt_messages.append(SystemMessagePromptTemplate.from_template(content))
    elif role == 'human':
        reviewer_prompt_messages.append(HumanMessagePromptTemplate.from_template(content))

In [23]:
writer_agent = AgentNode("Writer", llm, [search], writer_prompt_messages)
reviewer_agent = AgentNode("Writer", llm, [search], reviewer_prompt_messages)

In [None]:
create_write_blog_workflow(AgentState, review_router(writer_config), writer_agent, reviewer_agent, search)

https://langchain-ai.github.io/langgraph/tutorials/introduction/