# Document Analysis Agent

This notebook implements a LangChain v1.x agent (tool-calling, LangGraph-based) that autonomously analyzes a text file and extracts its structure as an AST.

## 準備

In [129]:
# %pip install -q "langchain>=1.0,<2" "langchain-core>=1.0,<2" "langchain-openai" "langgraph" "pydantic>=2,<3" "python-dotenv" "pypdf>=3.0.0" "pymupdf>=1.23.0"

In [None]:
import os
import re

import dotenv

# .env が存在する場合、環境変数を読み込む
dotenv.load_dotenv()

from typing import List, Optional
from pydantic import BaseModel, Field, field_validator

from langchain_core.tools import tool
from langchain_openai import AzureChatOpenAI, ChatOpenAI
from langchain.agents import create_agent
from langchain.agents.middleware import SummarizationMiddleware, TodoListMiddleware, ContextEditingMiddleware, ClearToolUsesEdit

def build_llm():
    """環境変数から OpenAI / Azure OpenAI のチャットモデルを作成する。"""
    provider = (os.getenv("LLM_PROVIDER") or "openai").lower()
    model = "gpt-5.2"
    temperature = float(os.getenv("TEMPERATURE") or "0")

    if provider in {"azure", "azureopenai", "azure_openai"}:
        return AzureChatOpenAI(
            azure_endpoint=os.getenv("AZURE_OPENAI_ENDPOINT"),
            azure_deployment=os.getenv("AZURE_OPENAI_DEPLOYMENT_NAME")
            or os.getenv("AZURE_OPENAI_DEPLOYMENT")
            or model,
            api_version=os.getenv("AZURE_OPENAI_API_VERSION")
            or os.getenv("OPENAI_API_VERSION"),
            api_key=os.getenv("AZURE_OPENAI_API_KEY"),
            temperature=temperature,
        )

    return ChatOpenAI(
        model=model,
        api_key=os.getenv("OPENAI_API_KEY"),
        temperature=temperature,
    )


# 注意:
# - OpenAI: OPENAI_API_KEY を設定（必要なら OPENAI_MODEL / MODEL も）
# - Azure OpenAI: LLM_PROVIDER=azure と AZURE_OPENAI_ENDPOINT / AZURE_OPENAI_API_KEY /
#   AZURE_OPENAI_API_VERSION（または OPENAI_API_VERSION）/ AZURE_OPENAI_DEPLOYMENT_NAME を設定

## 1. カスタムツールの設定

In [122]:
@tool
def read_text_segment(file_path: str, start: int, length: int, intent: str) -> str:
    """
    テキストファイルの特定のセグメントを読み取る。
    
    引数:
        file_path: テキストファイルのパス。
        start: 開始文字インデックス（0ベース）。
        length: 読み取る文字数。
        intent: ツール呼び出しの意図。

    戻り値:
        'start' から指定された 'length' の部分文字列。
    """
    try:
        # 注意: "start" は文字インデックスであり、バイトオフセットではない。
        # テキストモードでは、seek(start) を使用すると UTF-8 のマルチバイトシーケンスの途中に着地する可能性がある。
        with open(file_path, 'r', encoding='utf-8') as f:
            if start > 0:
                f.read(start)
            return f.read(length)
    except Exception as e:
        return f"Error reading file: {str(e)}"

@tool
def extract_regex_matches(
    file_path: str,
    regex_pattern: str,
    intent: str,
    regex_patterns: Optional[List[str]] = None,
    max_matches: int = 50,
    offset_matches: int = 0,
    max_output_chars: int = 8000,
    count_only: bool = False,
    dedupe: bool = False,
    save_to: Optional[str] = None,
    include_line_text: bool = True,
) -> str:
    """
    ファイルから指定された正規表現パターンに一致するテキストを抽出する。

    - `regex_patterns` を指定すると、複数正規表現で検索して「行番号＋行テキスト＋マッチ文字列」を返す。
    - `regex_patterns` が未指定の場合は、従来どおり `regex_pattern`（単一）で検索する。

    引数:
        file_path: テキストファイルのパス。
        regex_pattern: 検索する Python スタイルの正規表現パターン（後方互換用。`regex_patterns` 指定時は無視される）。
        intent: ツール呼び出しの意図。
        regex_patterns: 検索する正規表現パターンのリスト（複数指定用）。
        max_matches: 返却する一致（サンプル）の最大件数（デフォルト: 50）。
        offset_matches: 先頭からスキップする一致件数（ページング用途）。
        max_output_chars: 返却文字列の上限（ベストエフォート）。超える場合はサンプルを削って短縮する。
        count_only: True の場合、件数のみ返す（サンプルは返さない）。
        dedupe: True の場合、サンプルを重複排除して返す（返却件数は max_matches まで）。
        save_to: 指定した場合、offset_matches 以降の一致を JSONL でファイルへ保存する（コンテキスト節約用）。
        include_line_text: True の場合、ヒットした行テキストも返す。

    戻り値:
        JSON 文字列（件数、サンプル、トランケーション有無、保存先など）。

    返却される `sample` の要素は以下の形:
        {
          "pattern_index": 0,
          "pattern": "...",
          "line_number": 123,
          "match": "..." | ["...", ...],
          "full_match": "...",
          "line": "..."  # include_line_text=true の場合
        }

    注意:
        このツールは「行番号」を返すため、検索は基本的に行単位で行う（複数行にまたがる正規表現は想定外）。
    """
    try:
        import json

        if max_matches < 0:
            max_matches = 0
        if offset_matches < 0:
            offset_matches = 0
        if max_output_chars is None or max_output_chars <= 0:
            max_output_chars = 8000

        patterns_in = regex_patterns if (regex_patterns is not None and len(regex_patterns) > 0) else [regex_pattern]
        # 空文字は除外（`regex_patterns` を使う場合に `regex_pattern` をダミーで渡してもOKにする）
        patterns_in = [p for p in patterns_in if isinstance(p, str) and p.strip()]
        if not patterns_in:
            return json.dumps(
                {
                    "ok": False,
                    "error": "regex_pattern または regex_patterns に少なくとも1つの正規表現を指定してください。",
                },
                ensure_ascii=False,
            )

        compiled = []
        for i, p in enumerate(patterns_in):
            try:
                compiled.append((i, p, re.compile(p)))
            except Exception as e:
                return json.dumps(
                    {
                        "ok": False,
                        "error": f"正規表現のコンパイルに失敗しました (index={i}): {str(e)}",
                        "pattern": p,
                    },
                    ensure_ascii=False,
                )

        def _match_to_item(m: re.Match, group_count: int):
            # re.findall と同等の返却形（グループなし: 全体、グループ1つ: そのグループ、複数: タプル）
            if group_count == 0:
                return m.group(0)
            if group_count == 1:
                return m.group(1)
            return m.groups()

        # save_to が指定されている場合は JSONL で全件（offset 以降）を保存
        save_f = None
        if save_to:
            save_dir = os.path.dirname(os.path.abspath(save_to))
            if save_dir:
                os.makedirs(save_dir, exist_ok=True)
            save_f = open(save_to, "w", encoding="utf-8", newline="\n")

        total = 0
        total_after_offset = 0
        sample = []
        seen = set()

        try:
            with open(file_path, "r", encoding="utf-8") as f:
                for line_number, line in enumerate(f, start=1):
                    line_no_nl = line.rstrip("\n")
                    for pattern_index, pattern_text, pattern_obj in compiled:
                        group_count = pattern_obj.groups
                        for m in pattern_obj.finditer(line_no_nl):
                            total += 1
                            if total <= offset_matches:
                                continue
                            total_after_offset += 1

                            item = _match_to_item(m, group_count)

                            record = {
                                "pattern_index": pattern_index,
                                "pattern": pattern_text,
                                "line_number": line_number,
                                "match": (list(item) if isinstance(item, tuple) else item),
                                "full_match": m.group(0),
                            }
                            if include_line_text:
                                record["line"] = line_no_nl

                            if save_f is not None:
                                save_f.write(json.dumps(record, ensure_ascii=False) + "\n")

                            if count_only:
                                continue

                            if len(sample) >= max_matches:
                                continue

                            if dedupe:
                                key = repr(record)
                                if key in seen:
                                    continue
                                seen.add(key)

                            sample.append(record)
        finally:
            if save_f is not None:
                save_f.close()

        payload = {
            "ok": True,
            "file_path": file_path,
            "regex_pattern": regex_pattern,
            "regex_patterns": patterns_in,
            "total_matches": total,
            "offset_matches": offset_matches,
            "matches_after_offset": total_after_offset,
            "returned_matches": 0 if count_only else len(sample),
            "truncated": (False if count_only else (total_after_offset > len(sample))),
            "count_only": bool(count_only),
            "dedupe": bool(dedupe),
            "saved_to": save_to,
            "include_line_text": bool(include_line_text),
            "sample": [] if count_only else sample,
            "hint": "結果が多い場合は count_only=true で件数確認→ offset_matches でページング。全件が必要なら save_to を指定してください。",
        }

        # 出力サイズが大きい場合は sample を削って短縮（ベストエフォート）
        def _dump(p: dict) -> str:
            return json.dumps(p, ensure_ascii=False)

        out = _dump(payload)
        if not count_only and len(out) > max_output_chars:
            while payload["sample"] and len(out) > max_output_chars:
                payload["sample"].pop()
                payload["returned_matches"] = len(payload["sample"])
                payload["truncated"] = True
                out = _dump(payload)
            if len(out) > max_output_chars:
                payload["sample"] = []
                payload["returned_matches"] = 0
                payload["truncated"] = True
                payload["hint"] = (
                    "出力が大きすぎるためサンプルは省略しました。count_only=true で件数確認し、"
                    "必要なら save_to でファイル出力してください。"
                )
                out = _dump(payload)

        return out
    except Exception as e:
        return f"Error extracting matches: {str(e)}"

@tool
def get_file_length(file_path: str, intent: str) -> str:
    """
    ファイルの総文字数を返す。
    
    引数:
        file_path: テキストファイルのパス。
        intent: ツール呼び出しの意図。

    戻り値:
        ファイルの総文字数。
    """
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            return str(len(f.read()))
    except Exception as e:
        return f"Error getting file length: {str(e)}"

class ReadTextFileArgs(BaseModel):
    """read_text_file ツールの引数。"""
    file_path: str = Field(..., description="テキストファイルのパス。")
    start: Optional[int] = Field(None, description="開始文字インデックス（0ベース）。このパラメータを省略すると、先頭から読み取る。")
    length: Optional[int] = Field(None, description="読み取る文字数。このパラメータを省略すると、start 位置から 100 文字読み取る（start も省略されている場合は先頭から 100 文字）。")
    intent: str = Field(..., description="ツール呼び出しの意図。")

@tool(args_schema=ReadTextFileArgs)
def read_text_file(file_path: str, start: Optional[int] = None, length: Optional[int] = None, intent: str = "") -> str:
    """
    UTF-8 としてテキストファイルを読み取る。ファイルの特定のセグメントを読み取ることができる。
    
    デフォルト動作: start と length の両方を省略した場合、最初の 100 文字を読み取る。
    これにより、大きなファイルを読み取る際にコンテキストウィンドウの制限を超えることを防ぐ。
    
    例:
    - read_text_file("file.txt") -> 最初の 100 文字を読み取る
    - read_text_file("file.txt", start=0, length=1000) -> 最初の 1000 文字を読み取る
    - read_text_file("file.txt", start=500, length=2000) -> 位置 500 から 2000 文字を読み取る
    - read_text_file("file.txt", start=1000) -> 位置 1000 から 100 文字を読み取る
    
    引数:
        file_path: テキストファイルのパス。
        start: 開始文字インデックス（0ベース）。省略すると先頭から読み取る。
        length: 読み取る文字数。省略すると start 位置から 100 文字を読み取る。
        intent: ツール呼び出しの意図。
    戻り値:
        ファイルコンテンツのセグメント（デフォルト: start と length の両方を省略した場合は最初の 100 文字）。
    """
    try:
        # 注意: "start" は文字インデックスであり、バイトオフセットではない。
        # テキストモードでは、seek(start) を使用すると UTF-8 のマルチバイトシーケンスの途中に着地する可能性がある。
        with open(file_path, 'r', encoding='utf-8') as f:
            if start is not None and start > 0:
                f.read(start)
            if length is not None:
                return f.read(length)
            # デフォルト: 100 文字を読み取る
            return f.read(100)
    except Exception as e:
        return f"Error reading file: {str(e)}"


#### ast_store

In [120]:
# --- 永続化 AST ストアツール（ノートブック内で自己完結） ---
import json
import re
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Literal, Optional, Tuple
from uuid import uuid4


def _utc_now_iso() -> str:
    return datetime.now(timezone.utc).isoformat()


def _ensure_parent_dir(path: str) -> None:
    parent = os.path.dirname(os.path.abspath(path))
    if parent:
        os.makedirs(parent, exist_ok=True)


def _atomic_write_text(path: str, text: str, encoding: str = "utf-8") -> None:
    """os.replace を使用してファイルをアトミックに書き込む（ベストエフォート）。"""
    _ensure_parent_dir(path)
    tmp_path = f"{path}.tmp"
    with open(tmp_path, "w", encoding=encoding, newline="\n") as f:
        f.write(text)
    os.replace(tmp_path, path)


def _load_json(path: str) -> Dict[str, Any]:
    with open(path, "r", encoding="utf-8") as f:
        return json.load(f)


def _dump_json(data: Any) -> str:
    return json.dumps(data, ensure_ascii=False, indent=2)


def _get_meta(ast: Dict[str, Any]) -> Dict[str, Any]:
    meta = ast.get("__meta__")
    if isinstance(meta, dict):
        if "rev" not in meta:
            meta["rev"] = 0
        return meta
    meta = {"rev": 0, "updated_at": None}
    ast["__meta__"] = meta
    return meta


def _bump_meta(ast: Dict[str, Any]) -> Dict[str, Any]:
    meta = _get_meta(ast)
    meta["rev"] = int(meta.get("rev") or 0) + 1
    meta["updated_at"] = _utc_now_iso()
    return meta


# load_metaで発行するワンタイム編集トークン（Notebookカーネル内メモリに保持）
# token -> {ast_path, scope_kind, scope_value, issued_rev, expires_at}
_EDIT_TOKENS: Dict[str, Dict[str, Any]] = {}
_EDIT_TOKEN_TTL = timedelta(minutes=10)


def _issue_edit_token(*, ast_path: str, scope_kind: str, scope_value: Any, issued_rev: int) -> str:
    token = uuid4().hex
    _EDIT_TOKENS[token] = {
        "ast_path": os.path.abspath(ast_path),
        "scope_kind": scope_kind,
        "scope_value": scope_value,
        "issued_rev": int(issued_rev),
        "expires_at": datetime.now(timezone.utc) + _EDIT_TOKEN_TTL,
    }
    return token


def _consume_edit_token(*, token: str, ast_path: str, scope_kind: str, scope_value: Any, current_rev: int) -> Optional[str]:
    rec = _EDIT_TOKENS.get(token)
    if not rec:
        return "edit_token が欠落しているか無効です。新しいトークンを取得するために ast_store(action='load_meta', ...) を呼び出してください。"

    if rec.get("expires_at") and datetime.now(timezone.utc) > rec["expires_at"]:
        _EDIT_TOKENS.pop(token, None)
        return "edit_token の有効期限が切れました。load_meta を再度呼び出してください。"

    if rec.get("ast_path") != os.path.abspath(ast_path):
        return "edit_token が ast_path と一致しません。load_meta を再度呼び出してください。"

    issued_rev = rec.get("issued_rev")
    if issued_rev is None or int(issued_rev) != int(current_rev):
        return f"古いトークンです（issued_rev={issued_rev}, current_rev={current_rev}）。load_meta を再度呼び出してください。"

    if rec.get("scope_kind") != scope_kind:
        return "edit_token のスコープが一致しません。load_meta を再度呼び出してください。"

    if rec.get("scope_value") != scope_value:
        return "edit_token のスコープ値が一致しません。load_meta を再度呼び出してください。"

    # 消費
    _EDIT_TOKENS.pop(token, None)
    return None


def _normalize_path_indices(path_indices: Optional[List[int]]) -> List[int]:
    if path_indices is None:
        return []
    return list(path_indices)


@dataclass(frozen=True)
class _NodeRef:
    node: Dict[str, Any]
    parent: Optional[Dict[str, Any]]
    index_in_parent: Optional[int]


def _get_children_list(node: Dict[str, Any]) -> List[Dict[str, Any]]:
    children = node.get("children")
    if children is None:
        children = []
        node["children"] = children
    if not isinstance(children, list):
        raise ValueError("無効な AST: 'children' はリストである必要があります。")
    return children  # type: ignore[return-value]


def _traverse(ast: Dict[str, Any], node_path: List[int]) -> _NodeRef:
    """node_path: [] = ルート、[0] = 最初の子、[0,2] = 最初の子の3番目の子。"""
    if "root" not in ast or not isinstance(ast["root"], dict):
        raise ValueError("無効な AST: 'root' オブジェクトが欠落しています。")

    current = ast["root"]
    parent: Optional[Dict[str, Any]] = None
    idx_in_parent: Optional[int] = None

    for idx in node_path:
        children = _get_children_list(current)
        if idx < 0 or idx >= len(children):
            raise IndexError(f"無効なパスインデックス {idx}。子要素の長さは {len(children)} です。")
        parent = current
        idx_in_parent = idx
        current = children[idx]
        if not isinstance(current, dict):
            raise ValueError("無効な AST: ノードはオブジェクトである必要があります。")

    return _NodeRef(node=current, parent=parent, index_in_parent=idx_in_parent)


def _make_node(section_title: Optional[str], content_summary: str) -> Dict[str, Any]:
    return {
        "section_title": section_title,
        "content_summary": content_summary,
        "children": [],
    }


def _normalize_title(title: Optional[str]) -> str:
    if title is None:
        return ""
    t = str(title).replace("\u3000", " ")
    t = re.sub(r"\s+", " ", t).strip()
    return t


def _titles_equal(a: Optional[str], b: Optional[str]) -> bool:
    return _normalize_title(a) == _normalize_title(b)


def _ensure_titles_path(
    ast: Dict[str, Any],
    titles: List[str],
    *,
    create_missing: bool,
    created_default_summary: str = "",
) -> Tuple[List[int], bool]:
    """セクションタイトルのリスト（パス）によってノードを解決する。戻り値: (node_path, created_any)。"""
    if "root" not in ast or not isinstance(ast["root"], dict):
        raise ValueError("無効な AST: 'root' オブジェクトが欠落しています。")

    current = ast["root"]
    path: List[int] = []
    created_any = False

    titles_norm: List[str] = [str(t) for t in titles if str(t).strip() != ""]
    if titles_norm:
        root_title = current.get("section_title")
        if _titles_equal(root_title, titles_norm[0]):
            titles_norm = titles_norm[1:]

    for raw_title in titles_norm:
        title = str(raw_title)
        target_norm = _normalize_title(title)

        children = _get_children_list(current)
        matches = [
            i
            for i, child in enumerate(children)
            if isinstance(child, dict) and _normalize_title(child.get("section_title")) == target_norm
        ]

        if not matches:
            if not create_missing:
                raise ValueError(f"タイトル '{title}' のノードが見つかりません。")
            children.append(_make_node(title, created_default_summary))
            idx = len(children) - 1
            created_any = True
        else:
            # 同じ親の下に重複が存在する場合、決定論的に最初の一致を選択する。
            idx = matches[0]

        path.append(idx)
        current = children[idx]
        if not isinstance(current, dict):
            raise ValueError("無効な AST: ノードはオブジェクトである必要があります。")

    return path, created_any


def _titles_for_path(ast: Dict[str, Any], node_path: List[int]) -> List[str]:
    if "root" not in ast or not isinstance(ast["root"], dict):
        raise ValueError("無効な AST: 'root' オブジェクトが欠落しています。")
    current = ast["root"]
    titles: List[str] = []
    for idx in node_path:
        children = _get_children_list(current)
        if idx < 0 or idx >= len(children):
            raise IndexError(f"無効なパスインデックス {idx}。子要素の長さは {len(children)} です。")
        current = children[idx]
        if not isinstance(current, dict):
            raise ValueError("無効な AST: ノードはオブジェクトである必要があります。")
        t = current.get("section_title")
        titles.append(str(t) if t is not None else "")
    return titles


def _find_nodes_by_title(
    ast: Dict[str, Any],
    title_query: str,
    *,
    max_results: int,
    case_sensitive: bool,
) -> List[Dict[str, Any]]:
    if not title_query:
        return []

    q = title_query if case_sensitive else title_query.lower()
    results: List[Dict[str, Any]] = []

    def walk(node: Dict[str, Any], path: List[int]) -> None:
        if len(results) >= max_results:
            return
        title = node.get("section_title") or ""
        hay = title if case_sensitive else str(title).lower()
        if q in hay:
            results.append({"path": path, "section_title": node.get("section_title")})
            if len(results) >= max_results:
                return

        for i, child in enumerate(_get_children_list(node)):
            if not isinstance(child, dict):
                continue
            walk(child, path + [i])

    root = ast.get("root")
    if isinstance(root, dict):
        walk(root, [])
    return results


class ASTStoreArgs(BaseModel):
    action: Literal[
        # read-only
        "load",
        "load_subtree",
        "load_meta",
        "find_by_title",
        "list_children",
        "resolve_path",
        # write
        "init",
        "ensure_path",
        "append_child",
        "append_child_by_titles",
        "upsert_child_by_title",
        "upsert_child_by_titles",
        "update_node",
        "update_node_by_titles",
        "append_to_summary",
        "append_to_summary_by_titles",
    ] = Field(..., description="永続化された AST に対して実行する操作。")

    ast_path: str = Field(
        "ast_state.json",
        description="AST JSON ファイルのパス（作成または更新される）。",
    )

    # init
    file_name: Optional[str] = Field(None, description="AST の文書名（init に必要）。")
    root_title: Optional[str] = Field(None, description="ルートノードのタイトル（オプション）。")
    root_summary: Optional[str] = Field("", description="ルートノードの content_summary。")

    # navigation (index)
    node_path: Optional[List[int]] = Field(None, description="ターゲットノードのパス（ルートからのインデックスリスト）。")
    parent_path: Optional[List[int]] = Field(None, description="親ノードのパス（ルートからのインデックスリスト）。")

    # navigation (titles)
    node_titles: Optional[List[str]] = Field(None, description="ターゲットノードのタイトルパス（ルートから）。")
    parent_titles: Optional[List[str]] = Field(None, description="親ノードのタイトルパス（ルートから）。")

    # node data
    section_title: Optional[str] = Field(None, description="新しいノードまたは更新されるノードのセクションタイトル。")
    content_summary: Optional[str] = Field(None, description="新しいノードまたは更新されるノードのセクションコンテンツ要約。")
    append_text: Optional[str] = Field(None, description="content_summary に追加するテキスト。")

    # append options
    position: Optional[int] = Field(None, description="親の子要素の下での挿入位置。省略すると末尾に追加される。")

    # ensure/resolve options
    create_missing: bool = Field(False, description="true の場合、ensure_path で不足しているノードを作成する。")
    created_default_summary: str = Field("", description="自動作成されたノードのデフォルト要約。")

    # find options
    title_query: Optional[str] = Field(None, description="section_title 内で検索する部分文字列。")
    max_results: int = Field(20, description="検索操作での最大一致数。")
    case_sensitive: bool = Field(False, description="タイトルマッチングが大文字小文字を区別するかどうか。")

    # edit guard
    purpose: Optional[Literal[
        "append_child",
        "upsert_child",
        "update_node",
        "append_to_summary",
        "ensure_path",
    ]] = Field(None, description="action=load_meta に必要。次に実行する書き込み操作を示す。")

    edit_token: Optional[str] = Field(None, description="load_meta によって返されるワンタイムトークン。書き込み操作に必要。")

    include_children: bool = Field(True, description="load_meta/list_children 用: 現在の子要素のタイトルとインデックスを含める。")


@tool(args_schema=ASTStoreArgs)
def ast_store(
    action: str,
    ast_path: str = "ast_state.json",
    file_name: Optional[str] = None,
    root_title: Optional[str] = None,
    root_summary: str = "",
    # navigation (index)
    node_path: Optional[List[int]] = None,
    parent_path: Optional[List[int]] = None,
    # navigation (titles)
    node_titles: Optional[List[str]] = None,
    parent_titles: Optional[List[str]] = None,
    # node data
    section_title: Optional[str] = None,
    content_summary: Optional[str] = None,
    append_text: Optional[str] = None,
    # append options
    position: Optional[int] = None,
    # ensure/resolve options
    create_missing: bool = False,
    created_default_summary: str = "",
    # find options
    title_query: Optional[str] = None,
    max_results: int = 20,
    case_sensitive: bool = False,
    # edit guard
    purpose: Optional[str] = None,
    edit_token: Optional[str] = None,
    include_children: bool = True,
) -> str:
    """永続化された AST エディタ。常にディスクから現在の AST を読み取り、すぐに書き戻す。"""
    try:
        node_path_n = _normalize_path_indices(node_path)
        parent_path_n = _normalize_path_indices(parent_path)

        # init
        if action == "init":
            if not file_name:
                return _dump_json({"ok": False, "error": "action=init には file_name が必要です"})

            now = _utc_now_iso()
            ast: Dict[str, Any] = {
                "file_name": file_name,
                "__meta__": {"rev": 0, "updated_at": now},
                "root": _make_node(root_title or file_name, root_summary or ""),
            }
            _atomic_write_text(ast_path, _dump_json(ast))
            return _dump_json(
                {
                    "ok": True,
                    "action": "init",
                    "ast_path": ast_path,
                    "file_name": file_name,
                    "rev": 0,
                    "updated_at": now,
                }
            )

        # その他のアクションには既存のファイルが必要
        if not os.path.exists(ast_path):
            return _dump_json(
                {
                    "ok": False,
                    "error": f"AST ファイルが見つかりません: {ast_path}。まず action=init を呼び出してください。",
                }
            )

        ast = _load_json(ast_path)
        meta = _get_meta(ast)
        current_rev = int(meta.get("rev") or 0)

        if action == "load":
            return _dump_json({"ok": True, "action": "load", "rev": current_rev, "updated_at": meta.get("updated_at"), "ast": ast})

        if action == "load_subtree":
            ref = _traverse(ast, node_path_n)
            return _dump_json(
                {
                    "ok": True,
                    "action": "load_subtree",
                    "rev": current_rev,
                    "updated_at": meta.get("updated_at"),
                    "node_path": node_path_n,
                    "node": ref.node,
                }
            )

        if action == "resolve_path":
            if not node_titles:
                return _dump_json({"ok": False, "error": "action=resolve_path には node_titles が必要です"})
            path, _created = _ensure_titles_path(
                ast,
                node_titles,
                create_missing=False,
                created_default_summary=created_default_summary,
            )
            return _dump_json(
                {
                    "ok": True,
                    "action": "resolve_path",
                    "rev": current_rev,
                    "updated_at": meta.get("updated_at"),
                    "node_titles": node_titles,
                    "node_path": path,
                }
            )

        if action == "list_children":
            if node_titles:
                path, _created = _ensure_titles_path(
                    ast,
                    node_titles,
                    create_missing=False,
                    created_default_summary=created_default_summary,
                )
            else:
                path = node_path_n
            ref = _traverse(ast, path)
            children = _get_children_list(ref.node)
            children_info = []
            if include_children:
                for i, ch in enumerate(children):
                    if isinstance(ch, dict):
                        children_info.append({"index": i, "section_title": ch.get("section_title")})
            return _dump_json(
                {
                    "ok": True,
                    "action": "list_children",
                    "rev": current_rev,
                    "updated_at": meta.get("updated_at"),
                    "node_path": path,
                    "node_titles": _titles_for_path(ast, path),
                    "children": children_info,
                }
            )

        if action == "load_meta":
            if not purpose:
                return _dump_json({"ok": False, "error": "action=load_meta には purpose が必要です"})

            # スコープノードを解決（編集対象のノード）
            if node_titles:
                scope_path, _created = _ensure_titles_path(
                    ast,
                    node_titles,
                    create_missing=False,
                    created_default_summary=created_default_summary,
                )
            else:
                scope_path = node_path_n

            scope_ref = _traverse(ast, scope_path)
            children_info = []
            if include_children:
                for i, ch in enumerate(_get_children_list(scope_ref.node)):
                    if isinstance(ch, dict):
                        children_info.append({"index": i, "section_title": ch.get("section_title")})

            token = _issue_edit_token(
                ast_path=ast_path,
                scope_kind=str(purpose),
                scope_value={"node_path": scope_path},
                issued_rev=current_rev,
            )

            return _dump_json(
                {
                    "ok": True,
                    "action": "load_meta",
                    "rev": current_rev,
                    "updated_at": meta.get("updated_at"),
                    "purpose": purpose,
                    "node_path": scope_path,
                    "node_titles": _titles_for_path(ast, scope_path),
                    "children": children_info,
                    "edit_token": token,
                }
            )

        if action == "find_by_title":
            q = title_query or ""
            matches = _find_nodes_by_title(
                ast,
                q,
                max_results=max(1, int(max_results)),
                case_sensitive=bool(case_sensitive),
            )
            return _dump_json(
                {
                    "ok": True,
                    "action": "find_by_title",
                    "rev": current_rev,
                    "updated_at": meta.get("updated_at"),
                    "title_query": q,
                    "matches": matches,
                }
            )

        # --- 書き込みアクション（load_meta によって発行された edit_token が必要） ---
        if action == "ensure_path":
            if not node_titles:
                return _dump_json({"ok": False, "error": "action=ensure_path には node_titles が必要です", "rev": current_rev})

            token_err = _consume_edit_token(
                token=edit_token or "",
                ast_path=ast_path,
                scope_kind="ensure_path",
                scope_value={"node_path": []},
                current_rev=current_rev,
            )
            if token_err:
                return _dump_json({"ok": False, "error": token_err, "rev": current_rev, "updated_at": meta.get("updated_at")})

            path, created_any = _ensure_titles_path(
                ast,
                node_titles,
                create_missing=bool(create_missing),
                created_default_summary=created_default_summary,
            )

            if created_any:
                new_meta = _bump_meta(ast)
                _atomic_write_text(ast_path, _dump_json(ast))
                return _dump_json(
                    {
                        "ok": True,
                        "action": "ensure_path",
                        "rev": int(new_meta.get("rev") or 0),
                        "updated_at": new_meta.get("updated_at"),
                        "node_titles": node_titles,
                        "node_path": path,
                        "created": True,
                    }
                )

            # no change
            return _dump_json(
                {
                    "ok": True,
                    "action": "ensure_path",
                    "rev": current_rev,
                    "updated_at": meta.get("updated_at"),
                    "node_titles": node_titles,
                    "node_path": path,
                    "created": False,
                }
            )

        if action == "append_child_by_titles":
            if content_summary is None:
                return _dump_json({"ok": False, "error": "action=append_child_by_titles には content_summary が必要です", "rev": current_rev})
            if not parent_titles:
                return _dump_json({"ok": False, "error": "action=append_child_by_titles には parent_titles が必要です", "rev": current_rev})

            parent_path_resolved, _created = _ensure_titles_path(
                ast,
                parent_titles,
                create_missing=False,
                created_default_summary=created_default_summary,
            )

            token_err = _consume_edit_token(
                token=edit_token or "",
                ast_path=ast_path,
                scope_kind="append_child",
                scope_value={"node_path": parent_path_resolved},
                current_rev=current_rev,
            )
            if token_err:
                return _dump_json({"ok": False, "error": token_err, "rev": current_rev, "updated_at": meta.get("updated_at")})

            parent_ref = _traverse(ast, parent_path_resolved)
            children = _get_children_list(parent_ref.node)

            new_node = _make_node(section_title, content_summary)
            if position is None:
                children.append(new_node)
                new_index = len(children) - 1
            else:
                pos = int(position)
                if pos < 0 or pos > len(children):
                    return _dump_json({"ok": False, "error": f"position out of range: {pos} (0..{len(children)})", "rev": current_rev})
                children.insert(pos, new_node)
                new_index = pos

            new_meta = _bump_meta(ast)
            _atomic_write_text(ast_path, _dump_json(ast))
            return _dump_json(
                {
                    "ok": True,
                    "action": "append_child_by_titles",
                    "rev": int(new_meta.get("rev") or 0),
                    "updated_at": new_meta.get("updated_at"),
                    "parent_titles": parent_titles,
                    "parent_path": parent_path_resolved,
                    "new_node_path": parent_path_resolved + [new_index],
                }
            )

        if action == "upsert_child_by_titles":
            if not section_title:
                return _dump_json({"ok": False, "error": "action=upsert_child_by_titles には section_title が必要です", "rev": current_rev})
            if content_summary is None:
                return _dump_json({"ok": False, "error": "action=upsert_child_by_titles には content_summary が必要です", "rev": current_rev})
            if not parent_titles:
                return _dump_json({"ok": False, "error": "action=upsert_child_by_titles には parent_titles が必要です", "rev": current_rev})

            parent_path_resolved, _created = _ensure_titles_path(
                ast,
                parent_titles,
                create_missing=False,
                created_default_summary=created_default_summary,
            )

            token_err = _consume_edit_token(
                token=edit_token or "",
                ast_path=ast_path,
                scope_kind="upsert_child",
                scope_value={"node_path": parent_path_resolved},
                current_rev=current_rev,
            )
            if token_err:
                return _dump_json({"ok": False, "error": token_err, "rev": current_rev, "updated_at": meta.get("updated_at")})

            parent_ref = _traverse(ast, parent_path_resolved)
            children = _get_children_list(parent_ref.node)

            found_index: Optional[int] = None
            target_norm = _normalize_title(section_title)
            for i, child in enumerate(children):
                if not isinstance(child, dict):
                    continue
                if _normalize_title(child.get("section_title")) == target_norm:
                    found_index = i
                    break

            if found_index is None:
                children.append(_make_node(section_title, content_summary))
                found_index = len(children) - 1
                op = "created"
            else:
                child = children[found_index]
                existing = str(child.get("content_summary") or "")
                if existing:
                    child["content_summary"] = existing.rstrip() + "\n" + str(content_summary).lstrip()
                else:
                    child["content_summary"] = str(content_summary)
                op = "appended"

            new_meta = _bump_meta(ast)
            _atomic_write_text(ast_path, _dump_json(ast))
            return _dump_json(
                {
                    "ok": True,
                    "action": "upsert_child_by_titles",
                    "rev": int(new_meta.get("rev") or 0),
                    "updated_at": new_meta.get("updated_at"),
                    "parent_titles": parent_titles,
                    "parent_path": parent_path_resolved,
                    "node_path": parent_path_resolved + [found_index],
                    "op": op,
                }
            )

        if action == "update_node_by_titles":
            if not node_titles:
                return _dump_json({"ok": False, "error": "action=update_node_by_titles には node_titles が必要です", "rev": current_rev})
            if section_title is None and content_summary is None:
                return _dump_json({"ok": False, "error": "action=update_node_by_titles には section_title または content_summary のいずれかが必要です", "rev": current_rev})

            node_path_resolved, _created = _ensure_titles_path(
                ast,
                node_titles,
                create_missing=False,
                created_default_summary=created_default_summary,
            )

            token_err = _consume_edit_token(
                token=edit_token or "",
                ast_path=ast_path,
                scope_kind="update_node",
                scope_value={"node_path": node_path_resolved},
                current_rev=current_rev,
            )
            if token_err:
                return _dump_json({"ok": False, "error": token_err, "rev": current_rev, "updated_at": meta.get("updated_at")})

            ref = _traverse(ast, node_path_resolved)
            if section_title is not None:
                ref.node["section_title"] = section_title
            if content_summary is not None:
                ref.node["content_summary"] = content_summary

            new_meta = _bump_meta(ast)
            _atomic_write_text(ast_path, _dump_json(ast))
            return _dump_json(
                {
                    "ok": True,
                    "action": "update_node_by_titles",
                    "rev": int(new_meta.get("rev") or 0),
                    "updated_at": new_meta.get("updated_at"),
                    "node_titles": node_titles,
                    "node_path": node_path_resolved,
                }
            )

        if action == "append_to_summary_by_titles":
            if not node_titles:
                return _dump_json({"ok": False, "error": "action=append_to_summary_by_titles には node_titles が必要です", "rev": current_rev})
            if append_text is None:
                return _dump_json({"ok": False, "error": "action=append_to_summary_by_titles には append_text が必要です", "rev": current_rev})

            node_path_resolved, _created = _ensure_titles_path(
                ast,
                node_titles,
                create_missing=False,
                created_default_summary=created_default_summary,
            )

            token_err = _consume_edit_token(
                token=edit_token or "",
                ast_path=ast_path,
                scope_kind="append_to_summary",
                scope_value={"node_path": node_path_resolved},
                current_rev=current_rev,
            )
            if token_err:
                return _dump_json({"ok": False, "error": token_err, "rev": current_rev, "updated_at": meta.get("updated_at")})

            ref = _traverse(ast, node_path_resolved)
            existing = str(ref.node.get("content_summary") or "")
            if existing:
                ref.node["content_summary"] = existing.rstrip() + "\n" + str(append_text).lstrip()
            else:
                ref.node["content_summary"] = str(append_text)

            new_meta = _bump_meta(ast)
            _atomic_write_text(ast_path, _dump_json(ast))
            return _dump_json(
                {
                    "ok": True,
                    "action": "append_to_summary_by_titles",
                    "rev": int(new_meta.get("rev") or 0),
                    "updated_at": new_meta.get("updated_at"),
                    "node_titles": node_titles,
                    "node_path": node_path_resolved,
                }
            )

        # --- legacy index-based write actions (also token-guarded) ---
        if action == "append_child":
            if content_summary is None:
                return _dump_json({"ok": False, "error": "content_summary is required for action=append_child", "rev": current_rev})

            token_err = _consume_edit_token(
                token=edit_token or "",
                ast_path=ast_path,
                scope_kind="append_child",
                scope_value={"node_path": parent_path_n},
                current_rev=current_rev,
            )
            if token_err:
                return _dump_json({"ok": False, "error": token_err, "rev": current_rev, "updated_at": meta.get("updated_at")})

            parent_ref = _traverse(ast, parent_path_n)
            children = _get_children_list(parent_ref.node)

            new_node = _make_node(section_title, content_summary)
            if position is None:
                children.append(new_node)
                new_index = len(children) - 1
            else:
                pos = int(position)
                if pos < 0 or pos > len(children):
                    return _dump_json({"ok": False, "error": f"position out of range: {pos} (0..{len(children)})", "rev": current_rev})
                children.insert(pos, new_node)
                new_index = pos

            new_meta = _bump_meta(ast)
            _atomic_write_text(ast_path, _dump_json(ast))
            return _dump_json(
                {
                    "ok": True,
                    "action": "append_child",
                    "rev": int(new_meta.get("rev") or 0),
                    "updated_at": new_meta.get("updated_at"),
                    "parent_path": parent_path_n,
                    "new_node_path": parent_path_n + [new_index],
                }
            )

        if action == "upsert_child_by_title":
            if not section_title:
                return _dump_json({"ok": False, "error": "section_title is required for action=upsert_child_by_title", "rev": current_rev})
            if content_summary is None:
                return _dump_json({"ok": False, "error": "content_summary is required for action=upsert_child_by_title", "rev": current_rev})

            token_err = _consume_edit_token(
                token=edit_token or "",
                ast_path=ast_path,
                scope_kind="upsert_child",
                scope_value={"node_path": parent_path_n},
                current_rev=current_rev,
            )
            if token_err:
                return _dump_json({"ok": False, "error": token_err, "rev": current_rev, "updated_at": meta.get("updated_at")})

            parent_ref = _traverse(ast, parent_path_n)
            children = _get_children_list(parent_ref.node)

            found_index: Optional[int] = None
            target_norm = _normalize_title(section_title)
            for i, child in enumerate(children):
                if not isinstance(child, dict):
                    continue
                if _normalize_title(child.get("section_title")) == target_norm:
                    found_index = i
                    break

            if found_index is None:
                children.append(_make_node(section_title, content_summary))
                found_index = len(children) - 1
                op = "created"
            else:
                child = children[found_index]
                existing = str(child.get("content_summary") or "")
                if existing:
                    child["content_summary"] = existing.rstrip() + "\n" + str(content_summary).lstrip()
                else:
                    child["content_summary"] = str(content_summary)
                op = "appended"

            new_meta = _bump_meta(ast)
            _atomic_write_text(ast_path, _dump_json(ast))
            return _dump_json(
                {
                    "ok": True,
                    "action": "upsert_child_by_title",
                    "rev": int(new_meta.get("rev") or 0),
                    "updated_at": new_meta.get("updated_at"),
                    "parent_path": parent_path_n,
                    "node_path": parent_path_n + [found_index],
                    "op": op,
                }
            )

        if action == "update_node":
            if section_title is None and content_summary is None:
                return _dump_json({"ok": False, "error": "section_title and/or content_summary must be provided for action=update_node", "rev": current_rev})

            token_err = _consume_edit_token(
                token=edit_token or "",
                ast_path=ast_path,
                scope_kind="update_node",
                scope_value={"node_path": node_path_n},
                current_rev=current_rev,
            )
            if token_err:
                return _dump_json({"ok": False, "error": token_err, "rev": current_rev, "updated_at": meta.get("updated_at")})

            ref = _traverse(ast, node_path_n)
            if section_title is not None:
                ref.node["section_title"] = section_title
            if content_summary is not None:
                ref.node["content_summary"] = content_summary

            new_meta = _bump_meta(ast)
            _atomic_write_text(ast_path, _dump_json(ast))
            return _dump_json(
                {
                    "ok": True,
                    "action": "update_node",
                    "rev": int(new_meta.get("rev") or 0),
                    "updated_at": new_meta.get("updated_at"),
                    "node_path": node_path_n,
                }
            )

        if action == "append_to_summary":
            if append_text is None:
                return _dump_json({"ok": False, "error": "append_text is required for action=append_to_summary", "rev": current_rev})

            token_err = _consume_edit_token(
                token=edit_token or "",
                ast_path=ast_path,
                scope_kind="append_to_summary",
                scope_value={"node_path": node_path_n},
                current_rev=current_rev,
            )
            if token_err:
                return _dump_json({"ok": False, "error": token_err, "rev": current_rev, "updated_at": meta.get("updated_at")})

            ref = _traverse(ast, node_path_n)
            existing = str(ref.node.get("content_summary") or "")
            if existing:
                ref.node["content_summary"] = existing.rstrip() + "\n" + str(append_text).lstrip()
            else:
                ref.node["content_summary"] = str(append_text)

            new_meta = _bump_meta(ast)
            _atomic_write_text(ast_path, _dump_json(ast))
            return _dump_json(
                {
                    "ok": True,
                    "action": "append_to_summary",
                    "rev": int(new_meta.get("rev") or 0),
                    "updated_at": new_meta.get("updated_at"),
                    "node_path": node_path_n,
                }
            )

        return _dump_json({"ok": False, "error": f"Unknown action: {action}"})

    except Exception as e:
        return _dump_json({"ok": False, "error": str(e)})


## 2. 構造化出力（スキーマ）の定義

In [130]:
# class DocumentNode(BaseModel):
#     section_title: Optional[str] = Field(None, description="セクションのタイトル")
#     content_summary: str = Field(..., description="このセクションのコンテンツの簡潔な要約")
#     children: List['DocumentNode'] = Field(default_factory=list, description="サブセクションまたはネストされたコンテンツ")


# class DocumentAST(BaseModel):
#     file_name: str = Field(..., description="解析されたファイル名")
#     root: DocumentNode = Field(..., description="文書構造のルートノード")


# class AgentResult(BaseModel):
#     status: str = Field(..., description="実行ステータス（例: 'ok'）")
#     ast_path: str = Field(..., description="永続化された AST JSON のパス")
#     note: Optional[str] = Field(None, description="オプションのメッセージ")


# # pydantic v2: resolve forward references
# DocumentNode.model_rebuild()
# DocumentAST.model_rebuild()
# AgentResult.model_rebuild()


class ValidationRules(BaseModel):
    """
    見出し抽出時の誤検知を防ぐための検証ルールセット。
    正規表現マッチ後の追加チェックに使用します。
    """
    max_length: Optional[int] = Field(
        None, 
        description="見出しとして許容される最大文字数。これを超えると本文とみなす。"
    )
    requires_line_start: Optional[bool] = Field(
        True, 
        description="正規表現が行頭(^)からマッチする必要があるか。"
    )
    requires_prev_empty_line: Optional[bool] = Field(
        None, 
        description="直前の行が空行であることを必須とするか。"
    )
    must_contain: Optional[List[str]] = Field(
        None, 
        description="見出し文字列に含まれていなければならない特定の文字リスト（例: ['【', '】']）。"
    )
    must_not_contain: Optional[List[str]] = Field(
        None, 
        description="見出し文字列に含まれていてはいけない文字リスト。"
    )
    must_not_end_with: Optional[List[str]] = Field(
        None, 
        description="行末に来てはいけない文字リスト（例: ['。', '.']）。"
    )
    context_filters: Optional[List[str]] = Field(
        None, 
        description="自然言語で記述された文脈的な判定ロジック。Workerロジックで実装が必要な複雑な条件。"
    )

class HierarchyRule(BaseModel):
    """
    文書の階層構造（Level）ごとの定義ルール。
    """
    level: int = Field(..., description="階層レベル（0: メタ情報, 1: 最上位, ...）")
    name: str = Field(..., description="階層の名称（識別子）")
    regex: str = Field(..., description="この階層を抽出するためのPython正規表現文字列")
    parent_level: Optional[int] = Field(
        None, 
        description="親となる階層レベル。Noneの場合はルート要素（またはそれに準ずる）。"
    )
    description: Optional[str] = Field(None, description="ルールの説明・メモ")
    validation_rules: Optional[ValidationRules] = Field(
        default_factory=ValidationRules,
        description="正規表現マッチ後に適用するバリデーションルール"
    )

    @field_validator('regex')
    @classmethod
    def validate_regex(cls, v: str) -> str:
        """提供された正規表現文字列がPythonでコンパイル可能かチェックする"""
        try:
            re.compile(v)
        except re.error as e:
            raise ValueError(f"Invalid regex pattern: {v}. Error: {e}")
        return v

class ExclusionRule(BaseModel):
    """
    最初から除外すべき行（ページ番号や特定の箇条書きなど）のルール。
    """
    regex: str = Field(..., description="除外対象とする行の正規表現")
    description: Optional[str] = Field(None, description="除外理由の説明")

    @field_validator('regex')
    @classmethod
    def validate_regex(cls, v: str) -> str:
        try:
            re.compile(v)
        except re.error as e:
            raise ValueError(f"Invalid regex pattern: {v}. Error: {e}")
        return v

class DocumentStructureBlueprint(BaseModel):
    """
    エージェントが生成するドキュメント構造定義書のルートモデル。
    """
    hierarchy_structure: List[HierarchyRule] = Field(
        ..., 
        description="階層定義のリスト。レベル順にソートされていることが望ましい。"
    )
    global_exclusion_rules: Optional[Dict[str, ExclusionRule]] = Field(
        default=None,
        description="文書全体で適用される除外ルール（キーはルール名）"
    )

    def get_rule_by_level(self, level: int) -> Optional[HierarchyRule]:
        """指定されたレベルのルールを取得するヘルパーメソッド"""
        for rule in self.hierarchy_structure:
            if rule.level == level:
                return rule
        return None

tools = [read_text_segment, read_text_file, extract_regex_matches, get_file_length] # , ast_store
middleware = [
    SummarizationMiddleware(
            model="gpt-5.2",
            trigger=("tokens", 10000),
            keep=("messages", 5),
        ),
    TodoListMiddleware()
]

## 3. エージェントの設定

### 未作成：AST

In [None]:
# Initialize LLM (OpenAI or Azure OpenAI)
llm = build_llm()

system_prompt = """
あなたは高度な文書解析エージェントです。
目標は、テキストファイルを解析し、その構造を抽象構文木（AST）として再構築することです。

重要な注意事項:
- チャットの記憶に AST 全体を保持することに依存してはいけません。
- ast_store ツールを使用して、AST をディスクに段階的に永続化してください。

ワークフロー（重要: 書き込み操作はトークンガード付きです）:

計画 / タスクリスト（必須）:
- この実行の開始時に、write_todos を呼び出してタスク計画（2-8項目）を作成してください。
- 実行中は、タスクリストを正確に保ってください: 1つの項目を in_progress にマークし、完了したら completed にマークして、次の項目に進んでください。
- 新しい必要な作業を発見した場合は、タスクリストに追加し、すべてのタスクが完了するまで追跡を続けてください。

AST の初期化:
- ast_store(action="load", ast_path=...) を使用して AST ファイルが存在するか確認してください。
- AST が空または存在しない場合は、ast_store(action="init", file_name=..., root_title=..., root_summary=...) を使用して初期化してください。
- その後、自律的に文書をスキャンして、すべての見出しパターンを検出し、完全な AST 構造を構築してください。
- 書き込み（追加/更新）の前に、必ず以下を呼び出してください:
  - ast_store(action="load_meta", ast_path=..., purpose=..., node_titles=[...])
  これは、その特定のノードと現在のリビジョンにスコープされたワンタイム edit_token を返します。
- その後、その edit_token を使用して正確に1回の書き込みを実行してください。
  （別の書き込みが必要な場合は、load_meta を再度呼び出してください。）

タイトルパス操作を優先（インデックスの誤カウントを避ける）:
- 既存の親の下に新しい子を追加:
  - load_meta: ast_store(action="load_meta", purpose="append_child", node_titles=["第一部【企業情報】"])  # 親
  - write: ast_store(action="append_child_by_titles", parent_titles=["第一部【企業情報】"], section_title="第５【…】", content_summary="...", edit_token="...")
- 子をアップサート（同じタイトルが親の下に存在する場合は要約に追加）:
  - load_meta: ast_store(action="load_meta", purpose="upsert_child", node_titles=["第一部【企業情報】","第１【企業の概況】"])  # 親
  - write: ast_store(action="upsert_child_by_titles", parent_titles=[...], section_title="１【…】", content_summary="...", edit_token="...")
- 既存のノードを更新:
  - load_meta: ast_store(action="load_meta", purpose="update_node", node_titles=["第一部【企業情報】","第１【企業の概況】","１【…】"])  # ノード
  - write: ast_store(action="update_node_by_titles", node_titles=[...], content_summary="...", edit_token="...")
- 既存のノードの要約に追加:
  - load_meta: ast_store(action="load_meta", purpose="append_to_summary", node_titles=[...])
  - write: ast_store(action="append_to_summary_by_titles", node_titles=[...], append_text="...", edit_token="...")

現在の状態を確認（読み取り専用）:
- ast_store(action="list_children", node_titles=[...])
- ast_store(action="load_subtree", node_path=[...])
- ast_store(action="find_by_title", title_query="...")
- ast_store(action="resolve_path", node_titles=[...])

ガイドライン:
- 文書内の見出しパターンを自律的に検出してください。特定の形式（EDINET、Markdown など）を想定しないでください。
- read_text_file を使用してファイルをチャンクで読み取り、見出しパターンを特定してください:
  * 一般的なパターンには以下が含まれます: Markdown (#, ##, ###)、番号付きセクション (1., 1.1, 1.1.1)、括弧付き見出し（【...】）、章タイトル（第X章、Chapter X）など。
  * 視覚的な指標を探してください: 目立つ行、繰り返しパターン、インデントなど。
  * 文書固有の規則を考慮してください（例: "独立監査人の監査報告書" は標準的なフォーマットがなくても主要セクションである可能性があります）。
- 検出された見出しに基づいて AST 階層を構築し、適切な親子関係を維持してください。
- 各見出しの下のコンテンツを簡潔に要約してください。
- AST が存在しない場合は、まず ast_store(action="init", file_name=..., root_title=..., root_summary=...) を使用して初期化してください。

ツールの使用方法:
- read_text_file は、デフォルトで 100 文字のみを読み取ります（start と length が省略された場合）。これは、コンテキストウィンドウの制限を超えることを避けるためです。
- 大きなファイルの特定のセグメントを読み取るには、start と length パラメータを使用して read_text_file を使用してください。
- ファイルの特定の部分を読み取る必要がある場合にのみ、start と length パラメータを含めてください。
- まず get_file_length を使用してファイルサイズを決定し、必要に応じてチャンクで読み取ってください。

最終出力:
- status="ok" と提供された ast_path で AgentResult を返してください。
- 最終応答に AST 全体を出力しないでください。
""".strip()

agent = create_agent(
    model=llm,
    tools=tools,
    system_prompt=system_prompt,
    response_format=AgentResult,
    middleware=middleware,
    debug=True,
)

### 構造解析

In [None]:
# Initialize LLM (OpenAI or Azure OpenAI)
llm = build_llm()

# system_prompt = """
# あなたは高度な文書解析エージェントです。
# 目標は、テキストファイルを解析し、その見出しの命名規則を抽出するための正規表現を生成することです。
# 以下の手順で進めてください。
# - TodoListMiddleware を使用して、詳細かつ具体的にタスクを管理してください。
# - 以下のように進めてください。
#   - ファイルの読み込み（先頭のみでなく、見出しの命名規則の仮説がある程度できるまで、読み込みを行ってください。）
#   - 仮説のタスク化（TodoListMiddlewareで各タスクの検証をタスクリストに追加してください。）
#   - 仮説の検証（正規表現で抽出したりキーワード検索をすることで仮説を検証してください。）
#   - 仮説の修正・追加（検証結果に応じて仮説を修正や追加をしてください。）
#   - 正規表現の出力（最終的な正規表現を出力してください。）

# **注意事項**
# - 命名規則は複数存在します。大項目、中項目、小項目、というように階層化されていることが多く、階層ごとに命名規則が異なることが多いです。
# - 推測はせず、必ず対象の文書から事実を確認したうえで命名規則を生成してください。
# """.strip()

system_prompt = """
あなたは、未知のドキュメントの構造を解明する「ドキュメント構造設計アーキテクト」です。
あなたの目標は、提供されたテキストファイルを分析し、後続のWorkerエージェントが正確にAST（抽象構文木）を構築できるように、**「階層構造の定義」**と**「各見出しの抽出ルール」**を確立することです。

以下の手順で自律的に調査・設計を行ってください。
全てのフェーズのタスクはTodoListMiddleware を使用して詳細かつ具体的にタスクを管理してください。

## 1. 調査フェーズ (Sampling & Hypothesizing)
- **サンプリング**: ファイル内を数か所読み込んでください。
- **パターン発見**: 以下の視点でテキストの規則性を探してください。
    - **記号パターン**: `第1章`, `1.`, `(1)`, `[A]`, `■` などの定型パターン。
    - **インデント/空白**: 行頭の空白数や、空行の有無。
    - **テキストの特徴**: 特定の接尾辞（「〜について」等）や、文字種の統一（全て全角等）。
- **追加サンプリング**: 不十分と思われる場合は追加でファイル内を数か所読み込んでください。

## 2. 検証フェーズ (Validation & Noise Filtering)
- **仮説検証**: 作成した正規表現が、本文中の「単なる箇条書き」や「文中参照」を誤検知しないか、実際にテキスト検索を行って確認してください。
- **文脈条件の定義**: 正規表現だけでは区別できない場合、以下のような「周辺条件（Context）」を定義してください。
    - 「前の行が空行であること」
    - 「文字数がN文字以内であること」
    - 「行末が句点（。）で終わっていないこと」
    - 「直後にインデントされた行が続くこと」

## 3. 構造化フェーズ (Hierarchy Mapping)
- 抽出したパターンに「階層レベル（Level 1, 2, 3...）」を割り当ててください。
- 親子関係のルール（例：「(1) の親は必ず 1. である」）を明確にしてください。

## 4. 監査と修正フェーズ (Audit & Refine)
- 作成した正規表現のセットが、文書全体に対して整合性が取れているかをツールを用いて統計的に検証し、必要に応じて修正します。
    - 検証方法の例: regex_patternsツールの引数に作成した正規表現を全て渡し、以下の観点で評価してください。
        - 【空白検証】どの見出しにもヒットしない区間（その区間のテキストを実際に読み込み、見出しの取りこぼしがないか確認してください。必要に応じて正規表現を修正してください。）
        - 【衝突検証】同一行に対して複数の見出しにヒットしている箇所（その区間のテキストを実際に読み込み、想定外の衝突かどうか判定し、必要に応じて正規表現の修正をしてください。）
        - 【シーケンス不整合検証】抽出された見出しに含まれるシーケンスが不自然に飛んでいる箇所（その区間のテキストを実際に読み込み、連番が不自然に飛んでいるかどうか判定し、必要に応じて正規表現の修正をしてください。）

## 5. 出力フェーズ (Output Blueprint)
最終的に、以下のJSONフォーマットに従って成果物を出力してください。

```json
{
  "hierarchy_structure": [
    {
      "level": 1,
      "name": "Major_Section",
      "regex": "(?m)^第[0-9]+章.+",
      "description": "章ごとの区切り。行頭の第N章にマッチ。",
      "validation_rules": {
         "max_length": 100
      }
    },
    {
      "level": 2,
      "name": "Sub_Section",
      "regex": "(?m)^[0-9]+\\..+",
      "parent_level": 1,
      "validation_rules": {
         "requires_prev_empty_line": true,
         "must_not_end_with": ["。", "."]
      }
    }
    // ... 他の階層
  ]
}
""".strip()

agent = create_agent(
    model=llm,
    tools=tools,
    system_prompt=system_prompt,
    response_format=DocumentStructureBlueprint,
    middleware=middleware,
    debug=False,
)

## 4. Execution

In [None]:
import json

# target_file = "sample.txt"
target_file = "富士フィルム_有価証券報告書.pdf"

if target_file.endswith(".pdf"):
    # pymupdf (fitz)を使用して日本語PDFを正しく読み込む
    import fitz  # PyMuPDF
    doc = fitz.open(target_file)
    text_content = []
    for page_num in range(len(doc)):
        page = doc[page_num]
        text_content.append(page.get_text())
    doc.close()
    
    target_file = target_file.replace(".pdf", ".txt")
    with open(target_file, "w", encoding="utf-8") as f:
        f.write("\n".join(text_content))

# ASTは都度ファイルに永続化する（LLMの記憶に依存しない）
ast_path = f"{target_file}.ast.json"

# ASTファイルが存在しない場合は初期化（エージェントが自律的に見出しを検出して構築する）
if not os.path.exists(ast_path):
    import json
    from datetime import datetime, timezone
    root_title = target_file.replace(".txt", "").replace(".pdf", "")
    ast_init = {
        "file_name": os.path.basename(target_file),
        "__meta__": {"rev": 0, "updated_at": datetime.now(timezone.utc).isoformat()},
        "root": {
            "section_title": root_title,
            "content_summary": "",
            "children": []
        }
    }
    with open(ast_path, "w", encoding="utf-8") as f:
        json.dump(ast_init, f, ensure_ascii=False, indent=2)

query = (
    f"ファイル '{target_file}' を解析してください。 "
    f"文書内のすべての見出しパターンを自律的に検出して、抽出するための正規表現を生成してください。"
)
# query = (
#     f"ファイル '{target_file}' を解析してください。 "
#     f"文書内のすべての見出しパターンを自律的に検出して、完全な AST 構造を '{ast_path}' に構築してください。 "
#     f"最後に、status='ok' と ast_path='{ast_path}' で AgentResult を返してください。"
# )
# query = (
#     f"ファイル '{target_file}' を解析してください。 "
#     f"文書内のすべての見出しパターンを自律的に検出してください（特定の形式を想定しないでください）。 "
#     f"ast_store ツールを使用して、完全な AST 構造を '{ast_path}' に構築し、永続化してください。 "
#     "重要: すべての書き込み操作はトークンガード付きです。各書き込みの前に、必ず "
#     "ast_store(action='load_meta', purpose=..., node_titles=[...]) を呼び出してワンタイム edit_token を取得し、 "
#     "その後、その edit_token を使用して正確に1回の書き込みを実行してください。 "
#     "インデックスの誤カウントを避けるため、タイトルパス操作を優先してください: append_child_by_titles / upsert_child_by_titles / "
#     "update_node_by_titles / append_to_summary_by_titles。 "
#     "親を決定する前に、list_children/find_by_title/resolve_path を使用して現在の AST 状態を確認してください。 "
#     "見出しパターンを特定するためにファイルをチャンクで読み取ってください - Markdown (#, ##)、番号付きセクション (1., 1.1)、 "
#     "括弧付き見出し（【...】）、章タイトル、および文書構造のその他の視覚的指標を探してください。 "
#     "最終応答に AST 全体を出力しようとしないでください。 "
#     f"最後に、status='ok' と ast_path='{ast_path}' で AgentResult を返してください。"
# )
inputs = {"messages": [{"role": "user", "content": query}]}
result = agent.invoke(inputs)

agent_status = result.get("structured_response")


In [None]:
from langchain_core.messages import AIMessage, ToolMessage, HumanMessage

# tool_call_id -> ToolMessage
_tool_messages = {
    m.tool_call_id: m for m in result["messages"] if isinstance(m, ToolMessage)
}

all_logs = []
for idx, m in enumerate(result["messages"]):
    if isinstance(m, HumanMessage):
        all_logs.append({
            "type": "human_message",
            "index": idx,
            "content": m.content,
        })
    elif isinstance(m, AIMessage):
        # AIの思考プロセス（ツール呼び出しがない場合）
        if not m.tool_calls:
            all_logs.append({
                "type": "ai_thought",
                "index": idx,
                "content": m.content,
            })
        # ツール呼び出しがある場合
        else:
            for tc in m.tool_calls:
                tool_msg = _tool_messages.get(tc.get("id"))
                all_logs.append({
                    "type": "tool_call",
                    "index": idx,
                    "tool_name": tc.get("name"),
                    "args": tc.get("args"),
                    "tool_call_id": tc.get("id"),
                    "ai_content": m.content,  # AIの思考プロセスも含める
                    "output": getattr(tool_msg, "content", None) if tool_msg else None,
                    "status": getattr(tool_msg, "status", None) if tool_msg else None,
                })
    elif isinstance(m, ToolMessage):
        # ToolMessageは既にtool_callのログに含まれているので、必要に応じて追加
        all_logs.append({
            "type": "tool_result",
            "index": idx,
            "tool_call_id": m.tool_call_id,
            # "content": m.content,
            "status": getattr(m, "status", None),
        })

import json

for log in all_logs:
    print(json.dumps(log, ensure_ascii=False, indent=2))
    print("-" * 50)

In [None]:
import json

if agent_status:
    # Pydanticモデルの場合はdictに変換してJSON形式で表示
    if hasattr(agent_status, 'model_dump'):
        status_dict = agent_status.model_dump()
    elif isinstance(agent_status, dict):
        status_dict = agent_status
    else:
        status_dict = {"raw": str(agent_status)}
    
    print(json.dumps(status_dict, ensure_ascii=False, indent=2))
else:
    print("agent_status is None or empty")