In [130]:
%cd /Users/ymxu/Workspace/MuDocU/DTAgent

/Users/ymxu/Workspace/MuDocU/DTAgent


# Test Planner (Strict)

In [10]:
import os, sys, pathlib
def add_repo_root(marker='src'):
    """Put the nearest ancestor directory that contains ``marker`` first on ``sys.path``.

    The search starts at the current working directory and walks upward,
    including the filesystem root itself. The first directory that has an
    entry named ``marker`` is moved to the front of ``sys.path``; any earlier
    occurrence of the same path is dropped, so re-running the cell does not
    accumulate duplicate entries.

    Args:
        marker: Name of a file or directory that identifies the repo root.

    Returns:
        True if a directory containing ``marker`` was found, otherwise False.
    """
    start = pathlib.Path.cwd()
    # (start, *start.parents) ends with the filesystem root, which a
    # `while p != p.parent` loop would stop short of.
    for candidate in (start, *start.parents):
        if (candidate/marker).exists():
            root = str(candidate)
            # Rebuild in place so the list object other code holds stays valid.
            sys.path[:] = [root] + [entry for entry in sys.path if entry != root]
            print('Added repo root to sys.path:', candidate)
            return True
    return False
ok = add_repo_root('src')
assert ok, '未找到包含 src/ 的仓库根目录，请手工 %cd 到仓库根'


Added repo root to sys.path: /Users/ymxu/Workspace/MuDocU/DTAgent


In [11]:
import numpy as np
from src.agents.retriever_impl import JsonlRetriever
from src.agents.planner import PlannerLLMStrict
from src.index.embed_models import get_embedder
from src.utils.llm_clients import gpt_llm_call, qwen_llm_call
print('Imports OK')


Imports OK


In [12]:
# Strict configuration (edit these values for each experiment)
INDEX_DIR = './../../../data/users/yiming/dtagent/MinerU_25_MMLB/honor_watch_gs_pro/indexes'  # relative path; resolved against the current working directory
EMBED_MODEL = 'BAAI/bge-small-en-v1.5'  # name handed to get_embedder in a later cell
LLM_BACKEND = 'gpt'  # 'gpt' triggers the OPENAI_API_KEY check below; any other value triggers the DASHSCOPE_API_KEY check
LLM_MODEL = 'gpt-4o'  # model name forwarded to the LLM call and the planner in later cells

# Fail fast when the index directory is missing, then show its absolute location.
assert os.path.isdir(INDEX_DIR), f'indexes 目录不存在: {INDEX_DIR}'
print('INDEX_DIR =', os.path.abspath(INDEX_DIR))

# Require the API key that matches the selected backend to be set in the environment.
if LLM_BACKEND=='gpt':
    assert os.environ.get('OPENAI_API_KEY'), '缺少 OPENAI_API_KEY'
else:
    assert os.environ.get('DASHSCOPE_API_KEY'), '缺少 DASHSCOPE_API_KEY'


INDEX_DIR = /Users/ymxu/data/users/yiming/dtagent/MinerU_25_MMLB/honor_watch_gs_pro/indexes


In [13]:

emb = get_embedder(EMBED_MODEL)
def encode_fn(texts):
    """Embed ``texts`` with the notebook-level ``emb`` object and return a float32 NumPy array."""
    return np.array(emb.encode(texts), dtype=np.float32)
R = JsonlRetriever(INDEX_DIR, encode_fn)
print('Retriever ready with', EMBED_MODEL)


Retriever ready with BAAI/bge-small-en-v1.5


In [14]:

def llm_call(messages, json_mode=True, max_tokens=256, **kw):
    """Send ``messages`` to the backend chosen by ``LLM_BACKEND`` using ``LLM_MODEL``.

    ``gpt_llm_call`` is used when ``LLM_BACKEND`` equals 'gpt', otherwise
    ``qwen_llm_call``. Only ``messages``, the model name and ``json_mode`` are
    passed on; ``max_tokens`` and any extra keyword arguments are accepted
    but not forwarded to the backend.
    """
    backend = gpt_llm_call if LLM_BACKEND=='gpt' else qwen_llm_call
    return backend(messages, model=LLM_MODEL, json_mode=json_mode)
planner = PlannerLLMStrict(R, llm_call=llm_call, model_name=LLM_MODEL, retry=2, topk_candidates=8, min_pool_size=10)
print('Planner ready:', LLM_BACKEND, LLM_MODEL)


Planner ready: gpt gpt-4o


In [15]:

# Run the planner on one question and print the plan it returns.
question = "If \"--\" is displayed as the resting heart rate reading, what kind of environment should the user stay in?"
plan = planner.plan(question)
# Candidates: at most the first 10 are listed, each with up to 3 of its `why` entries.
print('Candidates:', len(plan.candidates))
for i,c in enumerate(plan.candidates[:10], 1):
    print(f'  #{i}', c.node_id, c.role, f'score={c.score:.4f}', '| why=', ','.join(c.why[:3]))
# Steps: every planned step in order, numbered from 1.
print('Steps:', len(plan.steps))
for i,s in enumerate(plan.steps, 1):
    print(f'  - [{i}]', s.op, 'node=', s.node, 'expect=', s.expect)


Candidates: 10
  #1 sec_526 section score=0.2242 | why= dense@37,dense@37,bm25@12
  #2 sec_474 section score=0.2135 | why= dense@14,dense@14,unit_hit
  #3 sec_580 section score=0.2116 | why= dense@26,dense@26,unit_hit
  #4 sec_586 section score=0.2112 | why= dense@29,dense@29,unit_hit
  #5 sec_230 section score=0.2096 | why= dense@44,dense@44,unit_hit
  #6 sec_148 section score=0.2085 | why= dense@57,dense@57,unit_hit
  #7 sec_271 section score=0.2085 | why= dense@58,dense@58,unit_hit
  #8 sec_152 section score=0.2076 | why= dense@71,dense@71,unit_hit
  #9 sec_181 section score=0.2075 | why= dense@73,dense@73,unit_hit
  #10 sec_83 section score=0.2075 | why= dense@74,dense@74,unit_hit
Steps: 1
  - [1] SCAN_PARAS node= sec_526 expect= Find information about the environment when resting heart rate is "--".


In [16]:

# Run the planner on a figure-related question and print the plan it returns.
# NOTE: the output recorded for this cell is "PlannerError: LLM intent_v2 failed",
# i.e. planner.plan raised before anything below was printed.
question = "How many reasoning steps are involved in the figure 1 in the paper?"
plan = planner.plan(question)
# Candidates: at most the first 10 are listed, each with up to 3 of its `why` entries.
print('Candidates:', len(plan.candidates))
for i,c in enumerate(plan.candidates[:10], 1):
    print(f'  #{i}', c.node_id, c.role, f'score={c.score:.4f}', '| why=', ','.join(c.why[:3]))
# Steps: every planned step in order, numbered from 1.
print('Steps:', len(plan.steps))
for i,s in enumerate(plan.steps, 1):
    print(f'  - [{i}]', s.op, 'node=', s.node, 'expect=', s.expect)


PlannerError: LLM intent_v2 failed

In [None]:

# Run the planner on a counting question about a figure and print the plan it returns.
question = "How many green and grey rectangles in the first figure of the paper? Please list the numbers in the list format by descending order. e.g., [\"2\",\"1\"]"
plan = planner.plan(question)
# Candidates: at most the first 10 are listed, each with up to 3 of its `why` entries.
print('Candidates:', len(plan.candidates))
for i,c in enumerate(plan.candidates[:10], 1):
    print(f'  #{i}', c.node_id, c.role, f'score={c.score:.4f}', '| why=', ','.join(c.why[:3]))
# Steps: every planned step in order, numbered from 1.
print('Steps:', len(plan.steps))
for i,s in enumerate(plan.steps, 1):
    print(f'  - [{i}]', s.op, 'node=', s.node, 'expect=', s.expect)


Candidates: 10
  #1 img_45 image score=0.4313 | why= dense@2,dense@2,bm25@6
  #2 img_13 image score=0.2313 | why= dense@6,dense@6,bm25@2
  #3 img_75 image score=0.2313 | why= dense@3,dense@3,bm25@5
  #4 img_14 image score=0.2300 | why= dense@11,dense@11,bm25@3
  #5 sec_106 section score=0.2288 | why= dense@7,dense@7,bm25@12
  #6 img_25 image score=0.2284 | why= dense@18,dense@18,bm25@4
  #7 sec_125 section score=0.2261 | why= dense@23,dense@23,bm25@11
  #8 sec_116 section score=0.2250 | why= dense@26,dense@26,bm25@15
  #9 sec_133 section score=0.2249 | why= dense@36,dense@36,bm25@9
  #10 sec_36 section score=0.2244 | why= dense@28,dense@28,bm25@17
Steps: 3
  - [1] FIND_NODES node= img_13 expect= nodes containing green and grey rectangles
  - [2] COUNT node= img_13 expect= count of green and grey rectangles
  - [3] COUNT node= img_25 expect= count of green and grey rectangles


In [None]:

# Run the planner on a table-comparison question and print the plan it returns.
question = "How many more claims does the Wiki Table datasets have comparing to scientific articles datasets? Please writeh the answer in int format."
plan = planner.plan(question)
# Candidates: at most the first 10 are listed, each with up to 3 of its `why` entries.
print('Candidates:', len(plan.candidates))
for i,c in enumerate(plan.candidates[:10], 1):
    print(f'  #{i}', c.node_id, c.role, f'score={c.score:.4f}', '| why=', ','.join(c.why[:3]))
# Steps: every planned step in order, numbered from 1.
print('Steps:', len(plan.steps))
for i,s in enumerate(plan.steps, 1):
    print(f'  - [{i}]', s.op, 'node=', s.node, 'expect=', s.expect)


Candidates: 10
  #1 tab_314 table score=0.6283 | why= dense@4,dense@4,bm25@19
  #2 tab_307 table score=0.6282 | why= dense@3,dense@3,bm25@21
  #3 tab_33 table score=0.4328 | why= dense@1,dense@1,bm25@1
  #4 tab_333 table score=0.4315 | why= dense@2,dense@2,bm25@5
  #5 tab_352 table score=0.4313 | why= dense@5,dense@5,bm25@3
  #6 tab_44 table score=0.4302 | why= dense@11,dense@11,bm25@2
  #7 tab_76 table score=0.4301 | why= dense@9,dense@9,bm25@4
  #8 tab_123 table score=0.4292 | why= dense@7,dense@7,bm25@10
  #9 tab_327 table score=0.4285 | why= dense@6,dense@6,bm25@15
  #10 tab_53 table score=0.4284 | why= dense@12,dense@12,bm25@9
Steps: 3
  - [1] FIND_NODES node= tab_314 expect= Node found for Wiki Table dataset
  - [2] COUNT node= tab_314 expect= Count of claims for Wiki Table dataset
  - [3] COUNT node= tab_307 expect= Count of claims for scientific articles dataset


In [None]:

# Run the planner on a cost-computation question and print the plan it returns.
question = "How many USD dollars will the author spend if the annotators finish annotating all of claims of the authors' proposed dataset? Please write the answer in float format."
plan = planner.plan(question)
# Candidates: at most the first 10 are listed, each with up to 3 of its `why` entries.
print('Candidates:', len(plan.candidates))
for i,c in enumerate(plan.candidates[:10], 1):
    print(f'  #{i}', c.node_id, c.role, f'score={c.score:.4f}', '| why=', ','.join(c.why[:3]))
# Steps: every planned step in order, numbered from 1.
print('Steps:', len(plan.steps))
for i,s in enumerate(plan.steps, 1):
    print(f'  - [{i}]', s.op, 'node=', s.node, 'expect=', s.expect)

Candidates: 10
  #1 sec_109 section score=0.2306 | why= dense@8,dense@8,bm25@3
  #2 sec_47 section score=0.2267 | why= dense@18,dense@18,bm25@12
  #3 sec_36 section score=0.2258 | why= dense@25,dense@25,bm25@11
  #4 sec_106 section score=0.2161 | why= dense@2,dense@2,unit_hit
  #5 sec_38 section score=0.2154 | why= dense@5,dense@5,unit_hit
  #6 sec_0 section score=0.2152 | why= dense@6,dense@6,unit_hit
  #7 sec_112 section score=0.2149 | why= dense@7,dense@7,unit_hit
  #8 sec_114 section score=0.2141 | why= dense@11,dense@11,unit_hit
  #9 sec_125 section score=0.2139 | why= dense@12,dense@12,unit_hit
  #10 sec_116 section score=0.2123 | why= dense@21,dense@21,unit_hit
Steps: 3
  - [1] FIND_NODES node= sec_109 expect= nodes containing cost information in USD
  - [2] FIND_COL node= sec_109 expect= column index for cost
  - [3] COUNT node= sec_109 expect= total cost in USD


In [133]:
doc_path = "./../../../../data/users/yiming/dtagent/MinerU_25_MMLB/f8d3a162ab9507e021d83dd109118b60/doctree.mm.json"

In [134]:
import json, os
from typing import Any, Dict, List, Tuple, Optional
from collections import defaultdict, Counter

def load_doctree(path: str) -> Dict[str, Any]:
    """Read a DocTree JSON file and return its top-level node.

    Args:
        path: Location of the UTF-8 encoded JSON file.

    Returns:
        The parsed top-level object.

    Raises:
        ValueError: If the top-level value is not a dict whose "type" is "document".
    """
    with open(path, "r", encoding="utf-8") as fh:
        doc = json.load(fh)
    looks_like_root = isinstance(doc, dict) and doc.get("type") == "document"
    if looks_like_root:
        return doc
    raise ValueError("不是 DocTree 根节点（type=document）")

In [135]:
root

{'type': 'document',
 'doc_id': 'ACTIVISIONBLIZZARD_2019_10K',
 'meta': {'pages': 198, 'mode': 'doc', 'columns': 1},
 'toc': [{'title': 'PART I', 'level': 1, 'page': None, 'target': None},
  {'title': 'PART II', 'level': 1, 'page': None, 'target': None},
  {'title': 'PART III', 'level': 1, 'page': None, 'target': None},
  {'title': 'PART IV', 'level': 1, 'page': None, 'target': None},
  {'title': 'Exhibit Index', 'level': 1, 'page': 65, 'target': None},
  {'title': 'SIGNATURES', 'level': 1, 'page': 65, 'target': 'sec_798'}],
 'views': [{'type': 'page', 'node_id': 'P0', 'page_idx': 0},
  {'type': 'page', 'node_id': 'P1', 'page_idx': 1},
  {'type': 'page', 'node_id': 'P2', 'page_idx': 2},
  {'type': 'page', 'node_id': 'P3', 'page_idx': 3},
  {'type': 'page', 'node_id': 'P4', 'page_idx': 4},
  {'type': 'page', 'node_id': 'P5', 'page_idx': 5},
  {'type': 'page', 'node_id': 'P6', 'page_idx': 6},
  {'type': 'page', 'node_id': 'P7', 'page_idx': 7},
  {'type': 'page', 'node_id': 'P8', 'page_id

In [136]:
def show_layer(stats: List[Dict[str, Any]], depth: int, type_filter: Optional[str] = None):
    """Print the summary line and the stored sample nodes for one depth.

    Args:
        stats: Per-depth summaries; each entry is read through its "depth",
            "total", "leaf", "leaf_ratio", "avg_children" and "samples" keys.
        depth: Depth to display; the first entry with this depth is used.
        type_filter: When truthy, only samples stored under this type key are listed.
    """
    layer = None
    for entry in stats:
        if entry["depth"] == depth:
            layer = entry
            break
    if not layer:
        print(f"没有 depth={depth} 的层")
        return
    print(f"[Layer {depth}] total={layer['total']} leaf={layer['leaf']} leaf_ratio={layer['leaf_ratio']} avg_children={layer['avg_children']}")
    # Print the samples, one group per type key, in sorted key order.
    for kind in sorted(layer["samples"].keys()):
        if type_filter and kind != type_filter:
            continue
        rows = layer["samples"][kind]
        print(f"  - {kind} (samples={len(rows)}):")
        for row in rows:
            print(f"      {row.get('node_id', '')} | {row.get('preview', '')}")

In [137]:
def children_of(node: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Return the node's "children" value when it is a list, otherwise a new empty list."""
    kids = node.get("children")
    if isinstance(kids, list):
        return kids
    return []

def is_leaf(node: Dict[str, Any]) -> bool:
    """Return True when ``children_of(node)`` yields no children."""
    return not children_of(node)
def type_key(node: Dict[str, Any]) -> str:
    """Return the node's type as a string, with text nodes refined by role.

    A text node with a non-empty "role" becomes "text:<role>" (for example
    paragraph or caption), which makes it easier to see what kind of content
    each layer holds. A missing or falsy "type" yields "".
    """
    base = str(node.get("type") or "")
    if base != "text":
        return base
    role = str(node.get("role") or "")
    return f"text:{role}" if role else base

In [138]:
def preview_text(node: Dict[str, Any], max_len: int = 80) -> str:
    """Return a short one-string preview of a node, chosen by its "type".

    * "section": the title (not truncated).
    * "image": the description cut to ``max_len``; without a description,
      "[image] " followed by the file name taken from "image_path" or, when
      that is missing, from "img_path".
    * "table": ``str()`` of "data" with whitespace runs collapsed to single
      spaces, cut to ``max_len``.
    * "list": the first item (its "text" when the item is a dict) cut to
      ``max_len``, or "[list]" when there are no items.
    * "equation" / "text": the "text" field cut to ``max_len``.
    * anything else: the type wrapped in square brackets.

    Args:
        node: Node dictionary to describe.
        max_len: Maximum number of characters kept for truncated previews.

    Returns:
        The preview string.
    """
    t = node.get("type")
    if t == "section":
        return str(node.get("title") or "")
    if t == "image":
        # Prefer the description, then fall back to the file name.
        desc = node.get("description") or ""
        if desc:
            # str() keeps the return type a string even for non-string descriptions.
            return str(desc)[:max_len]
        # Other helpers in this notebook read the path from "img_path", so
        # accept both spellings instead of printing an empty file name.
        p = node.get("image_path") or node.get("img_path") or ""
        return f"[image] {os.path.basename(str(p))}"
    if t == "table":
        data = node.get("data") or ""
        return " ".join(str(data).split())[:max_len]
    if t == "list":
        items = node.get("items") or []
        if items and isinstance(items, list):
            first = items[0]
            if isinstance(first, dict):
                return str(first.get("text") or "")[:max_len]
            return str(first)[:max_len]
        return "[list]"
    if t in ("equation", "text"):
        return str(node.get("text") or "")[:max_len]
    return f"[{t}]"

In [139]:
def build_levels(root: Dict[str, Any], max_depth: Optional[int] = None) -> Dict[int, List[Dict[str, Any]]]:
    """Group every node reachable from ``root`` by its depth (root is depth 0).

    The tree is walked in pre-order with an explicit stack rather than
    recursion, so very deep trees do not hit Python's recursion limit. Within
    each depth, nodes appear in left-to-right order.

    Args:
        root: Top node of the tree; children are obtained via ``children_of``.
        max_depth: When given, nodes at this depth are still recorded but
            their children are not visited.

    Returns:
        A plain dict mapping depth to the list of nodes found at that depth.
    """
    levels: Dict[int, List[Dict[str, Any]]] = defaultdict(list)
    stack = [(root, 0)]
    while stack:
        node, depth = stack.pop()
        levels[depth].append(node)
        if max_depth is not None and depth >= max_depth:
            continue
        # Push children reversed so the leftmost child is popped (visited) first.
        stack.extend((child, depth + 1) for child in reversed(children_of(node)))
    return dict(levels)
def summarize_levels(levels: Dict[int, List[Dict[str, Any]]], samples_per_type: int = 3) -> List[Dict[str, Any]]:
    """Build one summary dict per non-empty depth, from 0 up to the largest key.

    Each summary holds the depth, the node count, the leaf count, the leaf
    ratio and the average number of children (both rounded to 3 decimals),
    the count of nodes per type key, and up to ``samples_per_type`` sample
    records (node id, type key, 100-character preview) per type key.

    Args:
        levels: Mapping from depth to the nodes at that depth.
        samples_per_type: Upper bound on samples kept for each type key.

    Returns:
        Summaries ordered by increasing depth; depths with no nodes are skipped.
    """
    summaries: List[Dict[str, Any]] = []
    deepest = max(levels.keys()) if levels else -1
    for depth in range(deepest + 1):
        layer = levels.get(depth, [])
        if not layer:
            continue
        # Statistics
        count = len(layer)
        n_leaves = len([node for node in layer if is_leaf(node)])
        fanout = sum(len(children_of(node)) for node in layer)
        kinds = [type_key(node) for node in layer]
        # Samples: the first few nodes seen for each type key
        picked: Dict[str, List[Dict[str, str]]] = {}
        for node, kind in zip(layer, kinds):
            bucket = picked.setdefault(kind, [])
            if len(bucket) >= samples_per_type:
                continue
            bucket.append({
                "node_id": str(node.get("node_id") or ""),
                "type": kind,
                "preview": preview_text(node, 100),
            })
        summaries.append({
            "depth": depth,
            "total": count,
            "leaf": n_leaves,
            "leaf_ratio": round(n_leaves / count, 3),
            "avg_children": round(fanout / count, 3),
            "type_counts": dict(Counter(kinds)),
            "samples": picked,
        })
    return summaries

In [140]:
def peek_level(lv, depth, *, limit=10, type_filter=None, show_preview=True):
    """Print a header and the first ``limit`` nodes stored at ``depth``.

    Args:
        lv: Mapping from depth to the list of nodes at that depth.
        depth: Depth to inspect; a missing depth is treated as empty.
        limit: Maximum number of nodes to list.
        type_filter: When truthy, keep only nodes whose ``type_key`` equals
            this value; otherwise a per-type count line is printed as well.
        show_preview: Whether to append ``preview_text`` to each listed node.
    """
    from collections import Counter

    selected = lv.get(depth, [])
    if type_filter:
        selected = [node for node in selected if type_key(node) == type_filter]
    print(f"[depth {depth}] total={len(selected)} type={type_filter or 'ALL'}")
    if not type_filter:
        print("type_counts:", Counter(map(type_key, selected)))
    for node in selected[:limit]:
        line = f"  - {node.get('node_id')} | {type_key(node)}"
        if show_preview:
            line += f" | {preview_text(node)}"
        print(line)

In [144]:

# Load one DocTree file, group its nodes by depth and build per-depth summaries.
doc_path = "./../../../data/users/yiming/dtagent/MinerU_25_MMLB/ACTIVISIONBLIZZARD_2019_10K/doctree.mm.json"
root = load_doctree(doc_path)
lv = build_levels(root, max_depth=None)  # max_depth=None: no depth cut-off
stats = summarize_levels(lv, samples_per_type=3)
# Alternative view of a single layer: show_layer(stats, depth=1)
# List the nodes found at depth 2 (peek_level prints at most 10 by default).
peek_level(lv,2)

[depth 2] total=63 type=ALL
type_counts: Counter({'text:paragraph': 35, 'section': 23, 'image': 4, 'table': 1})
  - t_3 | text:paragraph | ANNUAL REPORT PURSUANT TO SECTION 13 OR 15(d) OF THE SECURITIES EXCHANGE ACT OF 
  - t_4 | text:paragraph | For the Fiscal Year Ended December 31, 2019
  - t_5 | text:paragraph | OR
  - t_6 | text:paragraph | □TRANSITION REPORT PURSUANT TO SECTION 13 OR 15(d) OF THE SECURITIES EXCHANGE AC
  - t_7 | text:paragraph | For the transition period from to
  - t_8 | text:paragraph | Commission File Number 1-15839
  - t_11 | text:paragraph | (Exact name of registrant as specified in its charter)
  - t_13 | text:paragraph | (State or other jurisdiction of incorporation or organization)
  - t_14 | text:paragraph | 3100 Ocean Park Boulevard
  - t_15 | text:paragraph | Santa Monica,


In [145]:
show_layer(stats, depth=3)

[Layer 3] total=319 leaf=135 leaf_ratio=0.423 avg_children=2.661
  - list (samples=1):
      lst_801 | (a) 1 Financial Statements See Item 8.—Consolidated Financial Statements and Supplementary Data for 
  - section (samples=3):
      sec_58 | PART I
      sec_67 | Overview
      sec_70 | Our Strategy and Vision
  - table (samples=1):
      tab_360 | <table><tr><td rowspan="2"></td><td colspan="5">For the Years Ended December 31,</td></tr><tr><td>20
  - text:caption (samples=1):
      cap_54 | Table of Contents
  - text:paragraph (samples=3):
      t_60 | This Annual Report on Form 10-K contains, or incorporates by reference, certain forward-looking stat
      t_61 | The company cautions that a number of important factors could cause Activision Blizzard, Inc.'s actu
      t_62 | Activision Blizzard Inc.'s names, abbreviations thereof, logos, and product and service designators 


In [146]:
def show_headings_at_level(lv, depth, *, unique=False, show_id=False, show_level=True, limit=None, pattern=None):
    """Print the section headings stored at ``depth``.

    Args:
        lv: Mapping from depth to the list of nodes at that depth.
        depth: Depth to inspect; only nodes whose "type" is "section" are used.
        unique: Drop repeated headings, keeping the first of each.
        show_id: Prefix each heading with its node id.
        show_level: Prefix each heading with "H<level>" when a level is present.
        limit: Maximum number of headings to print (applied after filtering).
        pattern: Case-insensitive regular expression the title must match.
    """
    import re

    headings = [node for node in lv.get(depth, []) if node.get("type") == "section"]
    if pattern:
        matcher = re.compile(pattern, re.I)
        headings = [h for h in headings if matcher.search(str(h.get("title") or ""))]
    if unique:
        # De-duplicate: key on title_norm when present, else on the stripped,
        # lower-cased title; the first heading seen for a key is kept.
        first_seen = {}
        for h in headings:
            dedup_key = h.get("title_norm") or str(h.get("title") or "").strip().lower()
            first_seen.setdefault(dedup_key, h)
        headings = list(first_seen.values())
    if limit is not None:
        headings = headings[:limit]
    print(f"[depth {depth}] headings: {len(headings)}")
    for h in headings:
        prefix = []
        if show_id:
            prefix.append(str(h.get("node_id") or ""))
        level = h.get("level")
        if show_level and level is not None:
            prefix.append(f"H{level}")
        page = h.get("page_idx")
        if page is not None:
            prefix.append(f"p{page}")
        lead = (" | ".join(prefix) + " — ") if prefix else ""
        heading = str(h.get("title") or "")
        print(f"  - {lead}{heading}")

def show_captions_at_level(lv, depth, *, limit=None, pattern=None):
    """Print the caption nodes stored at ``depth``.

    Args:
        lv: Mapping from depth to the list of nodes at that depth.
        depth: Depth to inspect; only nodes with type "text" and role "caption" are used.
        limit: Maximum number of captions to print (applied after filtering).
        pattern: Case-insensitive regular expression the caption text must match.
    """
    import re

    captions = [
        node for node in lv.get(depth, [])
        if node.get("type") == "text" and node.get("role") == "caption"
    ]
    if pattern:
        matcher = re.compile(pattern, re.I)
        captions = [c for c in captions if matcher.search(str(c.get("text") or ""))]
    if limit is not None:
        captions = captions[:limit]
    print(f"[depth {depth}] captions: {len(captions)}")
    for c in captions:
        body = str(c.get("text") or "")[:120]
        node_id = str(c.get("node_id") or "")
        page = c.get("page_idx")
        page_tag = "" if page is None else f"p{page}"
        print(f"  - {node_id} {page_tag} — {body}")

In [147]:
show_headings_at_level(lv, 3)

[depth 3] headings: 234
  - H3 | p2 — PART I
  - H3 | p3 — Overview
  - H3 | p3 — Our Strategy and Vision
  - H3 | p3 — Reportable Segments
  - H3 | p4 — Other
  - H3 | p4 — Products
  - H3 | p5 — Product Development and Support
  - H3 | p5 — Marketing, Sales, and Distribution
  - H3 | p6 — Manufacturing
  - H3 | p6 — Significant Customers and Top Franchises
  - H3 | p6 — Competition
  - H3 | p7 — Intellectual Property
  - H3 | p8 — Employees
  - H3 | p8 — Additional Financial Information
  - H3 | p8 — Available Information
  - H3 | p14 — We may be subject to intellectual property claims.
  - H3 | p15 — Changes in tax rates or exposure to additional tax liabilities could negatively impact our business.
  - H3 | p16 — Our debt could adversely affect our business.
  - H3 | p19 — The insolvency or business failure of any of our business partners could negatively impact us.
  - H3 | p19 — We are a global company and are subject to the risks and uncertainties of conducting business outside 

In [148]:
from tree.utils_2 import print_heading_outline

In [150]:
tree = print_heading_outline(root)

In [152]:
def _first_caption(n: dict, key: str) -> str:
    """Return the first entry of the list stored under ``key`` if it is a non-empty string, else ""."""
    caps = n.get(key)
    if isinstance(caps, list) and caps:
        first = caps[0]
        if first and isinstance(first, str):
            return first
    return ""


def _media_title(n: dict) -> str:
    """Derive a short label for an image/table node using caption/description/path.

    Images use, in order: the first "image_caption" entry, a string
    "description", the file name of "img_path", and finally "image".
    Tables use the first "table_caption" entry or "table".
    Any other node yields ``str()`` of its "text" value.
    """
    import os  # local import so this cell does not rely on an earlier import cell

    t = n.get("type")
    if t == "image":
        cap = _first_caption(n, "image_caption")
        if cap:
            return cap
        desc = n.get("description")
        if isinstance(desc, str) and desc:
            return desc
        p = n.get("img_path")
        if not isinstance(p, str):
            p = "image"
        return os.path.basename(p) or "image"
    if t == "table":
        return _first_caption(n, "table_caption") or "table"
    return str(n.get("text", ""))


def _shorten_text(text: str, max_len: int = 80) -> str:
    """Strip ``text``, turn newlines into spaces and cut it to ``max_len`` characters.

    When the text is too long it is cut to ``max_len - 1`` characters,
    right-stripped, and terminated with an ellipsis character.
    """
    s = (text or "").strip().replace("\n", " ")
    if len(s) <= max_len:
        return s
    return s[: max(0, max_len - 1)].rstrip() + "…"


def format_heading_outline(
    flat_root: dict,
    *,
    by_logical_page: bool = False,
    include_meta: bool = True,
    max_len: int = 80,
    include_media: bool = True,
) -> list:
    """
    Build a list of one-line strings representing the heading hierarchy for quick inspection.

    Only direct children of ``flat_root`` whose type is "text", "image" or
    "table" and whose "node_level" converts to an integer >= 1 are listed.
    - Indentation reflects node_level (two spaces for each level above 1).
    - Each line shows the level, node_idx and page_idx (or logical_page),
      followed by the shortened label and optional flags in square brackets.

    Args:
        flat_root: Dict whose "children" list holds the nodes to list. A
            non-dict root, a missing/None/non-sequence "children" value and
            non-dict children are tolerated and simply produce no lines.
        by_logical_page: Show "lp=<logical_page>" instead of "p=<page_idx>".
        include_meta: Add flags taken from the node's "heading_meta" dict
            (via, frozen, inserted, corrected_cross_level).
        max_len: Maximum label length before it is shortened.
        include_media: Whether image and table nodes are listed.

    Returns:
        The outline lines in the order the children appear.
    """
    lines: list = []
    raw_children = flat_root.get("children") if isinstance(flat_root, dict) else None
    # "children" may be absent or None; treat anything but a list/tuple as empty.
    items = list(raw_children) if isinstance(raw_children, (list, tuple)) else []
    for n in items:
        if not isinstance(n, dict):
            continue
        t = n.get("type")
        if t not in ("text", "image", "table"):
            continue
        if t in ("image", "table") and not include_media:
            continue
        try:
            lvl = int(n.get("node_level", -1))
        except (TypeError, ValueError, OverflowError):
            # Level that cannot be read as an integer: not a heading line.
            continue
        if lvl < 1:
            continue
        indent = "  " * max(0, lvl - 1)
        idx = n.get("node_idx")
        lp = n.get("logical_page")
        p = n.get("page_idx")
        # Choose the label: media nodes get a caption-like title, text nodes their text.
        label = _media_title(n) if t in ("image", "table") else str(n.get("text", ""))
        text = _shorten_text(label, max_len=max_len)
        meta = n.get("heading_meta") if include_meta and isinstance(n.get("heading_meta"), dict) else None
        flags: list = []
        if meta:
            if meta.get("via"):
                flags.append(str(meta.get("via")))
            if meta.get("frozen"):
                flags.append("frozen")
            if meta.get("inserted"):
                flags.append("inserted")
            if meta.get("corrected_cross_level"):
                flags.append("corrected")
        # Media marker
        if t in ("image", "table"):
            flags.append(t)
        where = f"lp={lp}" if by_logical_page else f"p={p}"
        flag_str = (" [" + ",".join(flags) + "]") if flags else ""
        lines.append(f"{indent}- L{lvl} idx={idx} {where}: {text}{flag_str}")
    return lines

In [153]:
def print_heading_outline(
    flat_root: dict,
    *,
    by_logical_page: bool = True,
    include_meta: bool = True,
    max_len: int = 80,
    include_media: bool = True,
) -> None:
    """Print the outline built by ``format_heading_outline``, one line per entry.

    All keyword options are passed through unchanged. Any exception raised
    while building or printing the outline is caught and reported as a single
    "[print_heading_outline] failed: ..." line instead of propagating.
    """
    options = {
        "by_logical_page": by_logical_page,
        "include_meta": include_meta,
        "max_len": max_len,
        "include_media": include_media,
    }
    try:
        outline = format_heading_outline(flat_root, **options)
        for entry in outline:
            print(entry)
    except Exception as err:
        print(f"[print_heading_outline] failed: {err}")


In [None]:
# print_heading_outline requires the root node as its first positional
# argument; calling it with no arguments raises TypeError. `root` is the
# DocTree loaded earlier in this notebook -- TODO confirm this is the tree
# intended here.
print_heading_outline(root)