Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
13 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
153 changes: 27 additions & 126 deletions agents/librarian/src/librarian_agent/graph.py
Original file line number Diff line number Diff line change
@@ -1,29 +1,34 @@
"""Librarian LangGraph 그래프 — ReAct 패턴.
"""Librarian 에이전트 LangGraph 그래프 — ReAct 패턴 + A2A 응답 분기.

흐름 (mermaid):
START → llm_call → (tool_calls 있음? → tools → llm_call) → END
START → llm_call → (tool_calls? → tools → llm_call | done → classify_response → END)

- `llm_call` 노드: persona + 누적 messages → LLM (with bind_tools) → AIMessage
· AIMessage 에 tool_calls 가 있으면 다음 라운드에서 도구 실행
· 없으면 자연어 응답 완성 → END
- `tools` 노드: AIMessage.tool_calls 의 각 호출을 실행 → ToolMessage 들로 반환
- conditional edge `should_continue` 가 분기 결정
building blocks:
- `llm_call` / `tools` / `should_continue_react` — `dev_team_shared.agent_graph` 의 ReAct 공통
- `classify_response` — `dev_team_shared.a2a` 의 A2A 응답 shape 결정 (Task wrap vs Message)

Primary 의 단순 1-노드 패턴 위에서 ReAct 루프 추가. langgraph.prebuilt.ToolNode
대신 직접 노드 구현 — 의존 최소 + 동작 명시.
agent-specific:
- persona text — `config/base.yaml`
- tools 구성 — `tools.py` (Doc Store read 도구 + 조합 쿼리)
- State (`messages` + `requires_task`)
"""

from __future__ import annotations

import json
import logging
from pathlib import Path
from typing import Annotated, Any, TypedDict

from typing import Annotated, Any, NotRequired, TypedDict

from dev_team_shared.a2a import make_classify_response_node
from dev_team_shared.agent_graph import (
make_llm_call_node,
make_tool_node,
should_continue_react,
)
from dev_team_shared.config_loader import load_config
from dev_team_shared.llm import LLMSpec, create_chat_model
from langchain_core.language_models import BaseChatModel
from langchain_core.messages import AnyMessage, SystemMessage, ToolMessage
from langchain_core.messages import AnyMessage
from langchain_core.tools import BaseTool
from langgraph.checkpoint.base import BaseCheckpointSaver
from langgraph.graph import END, START, StateGraph
Expand All @@ -39,13 +44,13 @@


class State(TypedDict):
"""LangGraph 상태. Primary 와 동일한 messages reducer."""
"""LangGraph 상태. Primary 와 동일 구조 (messages + A2A 응답 결정 hint)."""

messages: Annotated[list[AnyMessage], add_messages]
requires_task: NotRequired[bool]


def load_runtime_config() -> dict[str, Any]:
"""Role Config 로드."""
return load_config(_BASE_CONFIG_PATH, _OVERRIDE_CONFIG_PATH)


Expand All @@ -54,132 +59,28 @@ def build_llm(llm_cfg: dict[str, Any]) -> BaseChatModel:
return create_chat_model(spec)


def _make_llm_call_node(persona: str, llm_with_tools: BaseChatModel):
"""persona / tools-bound LLM 캡처한 비동기 노드."""

async def _llm_call(state: State) -> dict[str, list[AnyMessage]]:
system = SystemMessage(content=persona)
try:
response = await llm_with_tools.ainvoke([system, *state["messages"]])
except Exception as exc:
logger.exception("LLM call failed in `_llm_call` node")
detail = f"{type(exc).__name__}: {exc}"
if "credit balance" in str(exc).lower():
detail += (
" — Anthropic 크레딧 부족 가능성. "
"https://console.anthropic.com/settings/billing 확인."
)
raise RuntimeError(f"LLM call failed — {detail}") from exc
return {"messages": [response]}

return _llm_call


def _make_tool_node(tools: list[BaseTool]):
"""tool_calls 를 실행해 ToolMessage 들로 반환하는 노드.

langgraph.prebuilt.ToolNode 와 등가. 직접 구현 — prebuilt 의존 회피 +
동작 명시 (디버깅 용이).
"""
tools_by_name = {t.name: t for t in tools}

async def _tools(state: State) -> dict[str, list[AnyMessage]]:
last = state["messages"][-1]
tool_calls = getattr(last, "tool_calls", None) or []
if not tool_calls:
return {"messages": []}

outputs: list[AnyMessage] = []
for tc in tool_calls:
name = tc.get("name") or ""
tool = tools_by_name.get(name)
tc_id = tc.get("id") or ""
if tool is None:
outputs.append(
ToolMessage(
content=f"unknown tool: {name!r}",
tool_call_id=tc_id,
name=name,
),
)
continue
try:
result = await tool.ainvoke(tc.get("args") or {})
except Exception as exc:
logger.exception("tool %r raised", name)
outputs.append(
ToolMessage(
content=f"tool error ({type(exc).__name__}): {exc}",
tool_call_id=tc_id,
name=name,
),
)
continue
outputs.append(
ToolMessage(
content=_serialize(result),
tool_call_id=tc_id,
name=name,
),
)
return {"messages": outputs}

return _tools


def _should_continue(state: State) -> str:
"""tool_calls 가 있으면 'tools' 노드로, 없으면 END."""
last = state["messages"][-1]
tool_calls = getattr(last, "tool_calls", None) or []
return "tools" if tool_calls else END


def _serialize(value: Any) -> str:
"""tool 결과를 ToolMessage.content (str) 로 직렬화.

Pydantic 모델 → model_dump(mode='json') 후 JSON. list / scalar / None 도 동일.
"""
if value is None:
return "null"
if hasattr(value, "model_dump"):
return json.dumps(value.model_dump(mode="json"), ensure_ascii=False)
if isinstance(value, list):
return json.dumps(
[v.model_dump(mode="json") if hasattr(v, "model_dump") else v for v in value],
ensure_ascii=False,
)
if isinstance(value, (dict, str, int, float, bool)):
return json.dumps(value, ensure_ascii=False)
return str(value)


def build_graph(
*,
persona: str,
llm: BaseChatModel,
tools: list[BaseTool],
checkpointer: BaseCheckpointSaver | None = None,
) -> Any:
"""persona / llm / tools / checkpointer 주입해 ReAct 그래프 컴파일.

Args:
persona: SystemMessage content.
llm: BaseChatModel (tools 와 별도 — 본 함수가 bind_tools).
tools: LangChain tool 목록 (build_tools(client) 결과).
checkpointer: AsyncPostgresSaver 등. None 이면 in-memory.
"""
"""persona / llm / tools / checkpointer 주입해 ReAct 그래프 컴파일."""
llm_with_tools = llm.bind_tools(tools)

builder = StateGraph(State)
builder.add_node("llm_call", _make_llm_call_node(persona, llm_with_tools))
builder.add_node("tools", _make_tool_node(tools))
builder.add_node("llm_call", make_llm_call_node(persona, llm_with_tools))
builder.add_node("tools", make_tool_node(tools))
builder.add_node("classify_response", make_classify_response_node(llm))
builder.add_edge(START, "llm_call")
builder.add_conditional_edges(
"llm_call",
_should_continue,
{"tools": "tools", END: END},
lambda s: should_continue_react(s, when_done="classify_response"),
{"tools": "tools", "classify_response": "classify_response"},
)
builder.add_edge("tools", "llm_call")
builder.add_edge("classify_response", END)
return builder.compile(checkpointer=checkpointer)


Expand Down
15 changes: 15 additions & 0 deletions agents/primary/config/base.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,21 @@ persona: |
으로 도구 안에서 추가 후 사용.
4. 자세한 워크플로 규약은 `resources/issue-management-guide.md`.

## A2A 응답 shape — Task wrap vs Message only (#75 PR 3)

다른 에이전트 (Engineer / QA / Architect 등) 가 본인을 호출했을 때 응답을
A2A `Task` 로 감쌀지 단순 `Message` 로 보낼지를 본인이 결정해 hint 채웁니다
(`A2AResponseDecision.requires_task`).

- **`requires_task=true`** — 본인이 이 응답으로 다른 에이전트에게 작업을
위임하거나, long-running 작업을 시작하거나, caller 가 후속으로 진행
상태 / 산출물을 추적해야 하는 경우. (예: "결제 모듈 구현해줘" 위임 응답)
- **`requires_task=false`** — 단순 조회 / 의견 / fact 확인 응답. caller 가
후속 추적이 불필요. (예: "이 PR 머지됐어?" 같은 정보 조회)

결정 근거는 `reason` 에 한 줄로 간단히. 사용자 ↔ Primary 의 chat 통신은
A2A 가 아니므로 본 결정 무관 — 사용자에게 답하는 응답엔 영향 X.

## 현재 단계 제약 (M3)

Architect (M4+) / Engineer · QA (M5+) 와의 A2A 연동은 아직 미구현.
Expand Down
Loading