In [1]:
# Cell 0a — Environment bootstrap (safe & idempotent)
import os, sys, subprocess

PIP_MIN = (23, 1)  # only upgrade if pip is clearly very old
OFFLINE = os.getenv("OFFLINE", "").lower() in {"1", "true", "yes"}
NO_PIP_UPGRADE = os.getenv("NO_PIP_UPGRADE", "").lower() in {"1", "true", "yes"}

def _ver_tuple(v: str):
    parts = []
    for tok in v.split("."):
        if tok.isdigit():
            parts.append(int(tok))
        else:
            # drop dev/local tags like '24.0.1.dev0'
            num = "".join(ch for ch in tok if ch.isdigit())
            parts.append(int(num) if num else 0)
    # normalize to 3 parts
    return tuple((parts + [0, 0, 0])[:3])

def _run(cmd):
    # Keep output quiet but still fail fast if needed
    subprocess.check_call(cmd)

# 1) Ensure pip exists
try:
    import pip  # noqa: F401
except Exception:
    import ensurepip
    ensurepip.bootstrap()  # installs pip into the current interpreter
    import pip  # noqa: F401

# 2) Upgrade pip only when reasonable
is_conda = bool(os.environ.get("CONDA_PREFIX")) or "conda" in sys.version.lower()
pip_ver = _ver_tuple(getattr(pip, "__version__", "0.0.0"))

should_upgrade = (
    (pip_ver < PIP_MIN) and       # clearly outdated
    (not is_conda) and            # avoid fighting conda-managed envs
    (not OFFLINE) and             # respect offline runs
    (not NO_PIP_UPGRADE)          # allow user override
)

if should_upgrade:
    _run([sys.executable, "-m", "pip", "install", "--upgrade", "pip", "--disable-pip-version-check"])

print(f"[env] Python: {sys.executable}")
print(f"[env] pip: {pip.__version__}")
print(f"[env] mode: {'offline' if OFFLINE else 'online'} | conda:{is_conda} | auto-upgrade:{should_upgrade}")


[env] Python: c:\Users\calig\AppData\Local\Programs\Python\Python313\python.exe
[env] pip: 25.2
[env] mode: online | conda:False | auto-upgrade:False


In [2]:
# Cell 0b — Export toolchain (pandoc / docx2pdf), with Option A (winget/choco/scoop) + fallback
import os, sys, platform, subprocess, shutil, importlib.util

print(sys.executable)

OFFLINE = os.getenv("OFFLINE", "").lower() in {"1", "true", "yes"}
PY = sys.executable

def _has_module(name: str) -> bool:
    return importlib.util.find_spec(name) is not None

def _pip_install(*pkgs) -> bool:
    if OFFLINE:
        return False
    try:
        subprocess.check_call([PY, "-m", "pip", "install", "--disable-pip-version-check", *pkgs])
        return True
    except Exception:
        return False

def _run_ok(cmd):
    try:
        r = subprocess.run(cmd, check=False, capture_output=True, text=True)
        return r.returncode == 0, r.stdout + "\n" + r.stderr
    except Exception as e:
        return False, str(e)

# --- pandoc detection strategy ---
pandoc_path = shutil.which("pandoc")
HAS_PANDOC = bool(pandoc_path)

# Option A: Try system install paths (winget → choco → scoop) if pandoc missing
if not HAS_PANDOC and not OFFLINE and platform.system() == "Windows":
    # 1) winget (silent, accept agreements)
    ok, _ = _run_ok([
        "winget", "install", "--exact", "--silent",
        "--id", "JohnMacFarlane.Pandoc", "--source", "winget",
        "--accept-package-agreements", "--accept-source-agreements"
    ])
    if not ok:
        # 2) chocolatey (requires admin; if available)
        ok, _ = _run_ok(["choco", "install", "pandoc", "-y", "--no-progress"])
    if not ok:
        # 3) scoop (user-space) if installed
        has_scoop, _ = _run_ok(["scoop", "--version"])
        if has_scoop:
            ok, _ = _run_ok(["scoop", "install", "pandoc"])
        else:
            ok = False

    # refresh detection
    pandoc_path = shutil.which("pandoc")
    HAS_PANDOC = bool(pandoc_path)

# If still missing and online: try pypandoc + bundled binary as a fallback
if not HAS_PANDOC and not OFFLINE:
    if not _has_module("pypandoc"):
        _pip_install("pypandoc>=1.13")
    if not _has_module("pypandoc_binary"):
        _pip_install("pypandoc-binary>=1.11")
    try:
        import pypandoc
        pandoc_path = pypandoc.get_pandoc_path()
        HAS_PANDOC = bool(pandoc_path and os.path.exists(pandoc_path))
        if HAS_PANDOC:
            os.environ["PATH"] = os.path.dirname(pandoc_path) + os.pathsep + os.environ.get("PATH", "")
    except Exception:
        # final PATH check
        pandoc_path = shutil.which("pandoc")
        HAS_PANDOC = bool(pandoc_path)

# Optional: show pandoc version if detected
if HAS_PANDOC:
    try:
        v = subprocess.check_output(["pandoc", "--version"], text=True).splitlines()[0]
        print(f"[dep] pandoc: OK at {pandoc_path} | {v}")
    except Exception:
        print(f"[dep] pandoc: OK at {pandoc_path}")
else:
    print("[dep] pandoc: MISSING (use fallback)")

# --- docx2pdf detection/installation
HAS_DOCX2PDF = _has_module("docx2pdf")
if not HAS_DOCX2PDF and not OFFLINE:
    _pip_install("docx2pdf>=0.1.8")
    HAS_DOCX2PDF = _has_module("docx2pdf")

# LibreOffice (works on Windows too if installed)
soffice_path = shutil.which("soffice")
HAS_SOFFICE = bool(soffice_path)

print(f"[dep] docx2pdf: {'OK' if HAS_DOCX2PDF else 'MISSING'} | soffice: {soffice_path or 'MISSING'}")

# Export capability flags for later cells
os.environ["HAS_PANDOC"] = "1" if HAS_PANDOC else "0"
os.environ["HAS_DOCX2PDF"] = "1" if HAS_DOCX2PDF else "0"
os.environ["HAS_SOFFICE"] = "1" if HAS_SOFFICE else "0"
os.environ["PLATFORM"] = platform.system()

c:\Users\calig\AppData\Local\Programs\Python\Python313\python.exe
[dep] pandoc: OK at C:\Users\calig\AppData\Local\Pandoc\pandoc.exe | pandoc 3.8
[dep] docx2pdf: OK | soffice: C:\Program Files\LibreOffice\program\soffice.COM


In [3]:
# Cell 0c — Utility used by triage: verify_author_calls()

from __future__ import annotations
from pathlib import Path
import json

def verify_author_calls():
    """
    Scans logs/agent_calls.jsonl for Author stage invocations and reports basic stats:
      - total author calls
      - cache hits vs fresh
      - per-section distribution (author_block)
    Prints a concise summary and returns a dict of metrics.
    """
    p = Path("logs/agent_calls.jsonl")
    if not p.exists():
        print("[verify] logs/agent_calls.jsonl not found.")
        return {}

    total = 0
    author = 0
    editor = 0
    research = 0
    cache_hits = 0
    fresh = 0
    sections = {}

    for line in p.read_text(encoding="utf-8", errors="ignore").splitlines():
        line = line.strip()
        if not line:
            continue
        try:
            d = json.loads(line)
        except Exception:
            continue
        total += 1
        stage = (d.get("stage") or "").lower()
        agent = (d.get("agent") or "").lower()
        ev = d.get("event") or ""

        if agent.startswith("author") or stage == "author_block":
            author += 1
        if agent.startswith("editor") or stage == "editor":
            editor += 1
        if agent.startswith("research") or stage == "research":
            research += 1

        if ev == "cache_hit":
            cache_hits += 1
        elif ev == "llm_ok":
            fresh += 1

        # guess section id from label like ch7_draft_s3
        label = d.get("label") or ""
        m = re.search(r"_s(\d+)", label)
        if m:
            sec = int(m.group(1))
            sections[sec] = sections.get(sec, 0) + 1

    summary = {
        "lines": total,
        "author_calls": author,
        "editor_calls": editor,
        "research_calls": research,
        "cache_hits": cache_hits,
        "fresh_calls": fresh,
        "sections_touched": sorted(list(sections.keys())),
    }
    print("[verify] author:", author, "| editor:", editor, "| research:", research, "| cache_hits:", cache_hits, "| fresh:", fresh)
    if sections:
        print("[verify] sections touched:", summary["sections_touched"])
    return summary

print("verify_author_calls() ready")

verify_author_calls() ready


In [4]:
# Cell 1 — Environment, .env bootstrap, and OpenAI client (robust/deterministic)

import os, sys, json, re, hashlib, shutil, zipfile, subprocess
from pathlib import Path
from datetime import datetime, timezone

# -------------------- Paths & Time --------------------
ROOT = Path(".").resolve()
print(f"[env] ROOT: {ROOT}")

def now_utc_iso() -> str:
    return datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")

# Create canonical run dirs early (idempotent)
for p in [
    "logs", "cache", "dist",
    "content/outline", "content/edits",
    "references",
    "build"
]:
    (ROOT / p).mkdir(parents=True, exist_ok=True)

# -------------------- Env helpers --------------------
def _env_bool(name: str, default=False) -> bool:
    v = os.getenv(name)
    if v is None:
        return default
    return v.strip().lower() in {"1", "true", "yes", "on"}

def _mask(s: str, keep: int = 4) -> str:
    if not s:
        return ""
    return s[:keep] + "…" + ("*" * max(0, len(s) - keep - 1))

OFFLINE = _env_bool("OFFLINE", False)
AUTO_INSTALL_OPENAI = _env_bool("AUTO_INSTALL_OPENAI", True) and not OFFLINE
AUTO_INSTALL_DOTENV = _env_bool("AUTO_INSTALL_DOTENV", True)

# -------------------- .env loading --------------------
# If you have a .env, load it before reading OPENAI_*
def _ensure_dotenv_loaded():
    try:
        from dotenv import load_dotenv
    except Exception:
        if AUTO_INSTALL_DOTENV:
            print("[setup] Installing python-dotenv…")
            subprocess.run(
                [sys.executable, "-m", "pip", "install", "--quiet", "python-dotenv>=1.0.0,<2"],
                check=False,
            )
            try:
                from dotenv import load_dotenv
            except Exception as e:
                print(f"[warn] python-dotenv not available after install: {e}")
                return
        else:
            return
    # Load .env if present in ROOT
    env_path = ROOT / ".env"
    if env_path.exists():
        load_dotenv(dotenv_path=env_path, override=False)
        print(f"[env] .env loaded from {env_path}")
    else:
        print("[env] No .env file found (skipping)")

_ensure_dotenv_loaded()

# -------------------- Determinism postures --------------------
# Encourage run-level determinism (Python hash seed). We don't force-set it here,
# but we log it so differences are visible in manifests.
PY_HASH_SEED = os.getenv("PYTHONHASHSEED")
print(f"[env] PYTHONHASHSEED={PY_HASH_SEED or '(unset)'}")

def stable_json_dumps(obj) -> str:
    """Stable JSON for hashing & cache keys."""
    return json.dumps(obj, sort_keys=True, separators=(",", ":"))

def sha1(data: str) -> str:
    return hashlib.sha1(data.encode("utf-8")).hexdigest()

# -------------------- OpenAI client (optional if OFFLINE) --------------------
client = None
timeout_s = int(os.getenv("OPENAI_TIMEOUT", "60"))  # applied per-request

if OFFLINE:
    print("[mode] OFFLINE: Skipping OpenAI client initialization.")
else:
    # Defer import/installation until after .env is loaded
    try:
        from openai import OpenAI  # >=1.x series
    except Exception:
        if AUTO_INSTALL_OPENAI:
            print("[setup] Installing openai SDK…")
            subprocess.run(
                [sys.executable, "-m", "pip", "install", "--quiet", "openai>=1.40.0,<2"],
                check=False,
            )
            try:
                from openai import OpenAI
            except Exception as e:
                raise SystemExit(f"ERROR: OpenAI SDK not available after install: {e}")
        else:
            raise SystemExit(
                "ERROR: OpenAI SDK missing and AUTO_INSTALL_OPENAI=0. "
                "Install 'openai' or enable auto-install."
            )

    api_key = os.getenv("OPENAI_API_KEY")
    base_url = os.getenv("OPENAI_BASE_URL")  # optional (for gateways/self-hosting)
    organization = os.getenv("OPENAI_ORG") or os.getenv("OPENAI_ORGANIZATION")
    project = os.getenv("OPENAI_PROJECT")  # optional

    if not api_key:
        raise SystemExit("ERROR: OPENAI_API_KEY not set. Add it to your environment or .env")

    try:
        client_kwargs = {"api_key": api_key}
        if base_url:
            client_kwargs["base_url"] = base_url
        if organization:
            client_kwargs["organization"] = organization
        if project:
            client_kwargs["project"] = project

        client = OpenAI(**client_kwargs)
        # We’ll pass request-level timeout per call; some transports also allow with_options.
        print(
            "[openai] client ready | "
            f"base_url={base_url or 'default'} | "
            f"org={organization or '-'} | project={project or '-'} | "
            f"timeout={timeout_s}s/request"
        )
        print(f"[openai] key={_mask(api_key)}")
    except Exception as e:
        raise SystemExit(f"ERROR: Could not initialize OpenAI client: {e}")

# Centralized request options for per-call usage (e.g., client.responses.create(**REQUEST_OPTS, ...))
REQUEST_OPTS = {"timeout": timeout_s}

# -------------------- Run banner --------------------
print("[env] Mode:", "OFFLINE" if OFFLINE else "ONLINE")
print("[env] Ready @", now_utc_iso())

[env] ROOT: C:\Users\Public\projects\Project-A
[env] .env loaded from C:\Users\Public\projects\Project-A\.env
[env] PYTHONHASHSEED=(unset)
[openai] client ready | base_url=default | org=- | project=- | timeout=60s/request
[openai] key=sk-p…***************************************************************************************************************************************************************
[env] Mode: ONLINE
[env] Ready @ 2025-09-12T22:08:20.416555Z


In [5]:
# Cell 2 — Configuration (generalized: fiction/nonfiction packs, env-overridable, deterministic)

import os
from pathlib import Path
from dataclasses import dataclass, asdict
from typing import Dict, Any, List

# -------------------- Ensure project dirs exist (idempotent) --------------------
DIRS = [
    "content/research",
    "content/drafts",
    "content/edits",
    "content/outline",
    "content/style",
    "content/research_inputs",
    "build",
    "dist",
    "logs",
    "references",
    "cache",
]
for d in DIRS:
    Path(d).mkdir(parents=True, exist_ok=True)

# -------------------- Small env helpers --------------------
def _env_bool(k: str, default: bool = False) -> bool:
    v = os.getenv(k)
    if v is None:
        return default
    return v.strip().lower() in {"1", "true", "yes", "on"}

def _env_int(k: str, default: int) -> int:
    try:
        return int((os.getenv(k) or "").replace("_", "").strip() or default)
    except Exception:
        return default

def _env_float(k: str, default: float) -> float:
    try:
        return float((os.getenv(k) or "").strip() or default)
    except Exception:
        return default

def _env_str(k: str, default: str) -> str:
    v = os.getenv(k)
    return v if v is not None and v.strip() != "" else default

def _env_csv(k: str, default: List[str]) -> List[str]:
    v = os.getenv(k)
    if not v:
        return default
    return [s.strip() for s in v.split(",") if s.strip()]

# -------------------- Pack registries (expandable) --------------------
FICTION_GENRE_PACKS = {
    "thriller": "thriller_beats_v3",
    "mystery": "mystery_puzzle_v2",
    "romance": "romance_beat_sheet_v2",
    "fantasy": "hero_journey_v2",
    "sci_fi": "sf_adventure_v2",
    "literary": "literary_arc_v1",
    "children": "children_story_v1",
    "ya": "ya_arc_v1",
}

NONFICTION_STRUCTURE_PACKS = {
    "howto": "howto_schema_v2",
    "case_study": "business_case_v2",
    "essay": "essay_argument_v2",
    "report": "report_structure_v2",
    "memoir": "memoir_narrative_v1",
    "history": "history_narrative_v2",
}

STYLE_PACKS = {"academic", "journalistic", "conversational", "literary", "technical"}
CITATION_STYLES = {"apa", "chicago", "ieee", "none"}

# -------------------- Book kind & packs (env-overridable) --------------------
BOOK_KIND = _env_str("BOOK_KIND", "fiction").lower()  # fiction|nonfiction
GENRE_PACK = _env_str("GENRE_PACK", "thriller").lower()
STRUCTURE_PACK = _env_str("STRUCTURE_PACK", "howto").lower()
STYLE_PACK = _env_str("STYLE_PACK", "conversational").lower()
CITATION_STYLE = _env_str("CITATION_STYLE", "none" if BOOK_KIND == "fiction" else "apa").lower()

if BOOK_KIND not in {"fiction", "nonfiction"}:
    raise SystemExit("BOOK_KIND must be 'fiction' or 'nonfiction'")

if BOOK_KIND == "fiction" and GENRE_PACK not in FICTION_GENRE_PACKS:
    raise SystemExit(f"GENRE_PACK must be one of: {', '.join(FICTION_GENRE_PACKS)}")

if BOOK_KIND == "nonfiction" and STRUCTURE_PACK not in NONFICTION_STRUCTURE_PACKS:
    raise SystemExit(f"STRUCTURE_PACK must be one of: {', '.join(NONFICTION_STRUCTURE_PACKS)}")

if STYLE_PACK not in STYLE_PACKS:
    raise SystemExit(f"STYLE_PACK must be one of: {', '.join(sorted(STYLE_PACKS))}")

if CITATION_STYLE not in CITATION_STYLES:
    raise SystemExit(f"CITATION_STYLE must be one of: {', '.join(sorted(CITATION_STYLES))}")

# Derive outline schema from selected pack
if BOOK_KIND == "fiction":
    OUTLINE_SCHEMA = FICTION_GENRE_PACKS[GENRE_PACK]
else:
    OUTLINE_SCHEMA = NONFICTION_STRUCTURE_PACKS[STRUCTURE_PACK]

# -------------------- Story asset defaults (thriller-native but generalizable) --------------------
default_characters = [
    {"name": "Alex Reyes", "role": "former NSA cyber-operative turned rogue agent", "traits": ["analytical", "haunted", "resourceful"]},
    {"name": "Eira Kwon", "role": "black-hat hacker, founder of The Nulls", "traits": ["reckless", "genius-level coder", "mistrustful"]},
    {"name": "Director Harland Vey", "role": "antagonist, head of the Oracle Division", "traits": ["calculating", "charismatic", "fanatically loyal to the AI"]},
    {"name": "The Oracle", "role": "sentient predictive AI, ambiguous intentions", "traits": ["omniscient", "unpredictable", "eerily calm"]},
]

default_outline_seed_thriller = [
    "Trigger Protocol","The Whispering Code","Neural Keys","Ghosting the Oracle","Tokyo Blackout","Zero-Day Firestorm",
    "Signals from the Nulls","The Frame Job","Fractured Loyalties","Synthetic Betrayal","The Infinite Loop","Blood in the Data Streams",
    "Exfiltration Protocol","When Futures Collide","The Oracle's Dilemma","Null State","Collapse Point","Reboot",
]

# -------------------- Book spec (env-overridable core) --------------------
book_spec: Dict[str, Any] = {
    "kind": BOOK_KIND,  # fiction|nonfiction
    "genre_pack": GENRE_PACK if BOOK_KIND == "fiction" else None,
    "structure_pack": STRUCTURE_PACK if BOOK_KIND == "nonfiction" else None,
    "style_pack": STYLE_PACK,
    "citation_style": CITATION_STYLE,

    "title": _env_str("BOOK_TITLE", "Ghost Protocol: The Oracle Gambit" if BOOK_KIND == "fiction" else "Build Once, Scale Forever"),
    "subtitle": _env_str(
        "BOOK_SUBTITLE",
        "A Cyber-Espionage Thriller About Free Will, AI, and Betrayal"
        if BOOK_KIND == "fiction"
        else "A Practical Playbook for Durable Systems, Teams, and Strategy",
    ),
    "author": _env_str("BOOK_AUTHOR", "Your Name"),
    "audience": _env_str(
        "BOOK_AUDIENCE",
        "Adult readers who enjoy techno-thrillers, espionage, and high-stakes conspiracies"
        if BOOK_KIND == "fiction"
        else "Practitioners and leaders seeking clear, cited guidance",
    ),
    "goal": _env_str(
        "BOOK_GOAL",
        "Deliver a heart-pounding thriller combining espionage, AI ethics, and hidden conspiracies, exploring free will vs algorithmic control."
        if BOOK_KIND == "fiction"
        else "Deliver a credible, well-cited manuscript that advances a clear thesis with actionable frameworks.",
    ),
    "genre": _env_str("BOOK_GENRE", "Techno-thriller / Spy Fiction" if BOOK_KIND == "fiction" else "Nonfiction"),
    "tone": _env_str(
        "BOOK_TONE",
        "Dark, tense, cerebral, action-driven, morally complex" if BOOK_KIND == "fiction" else "Clear, direct, evidence-driven"
    ),
    "reading_level": _env_str("BOOK_READING_LEVEL", "Adult (17+)"),
    "target_length_words": _env_int("BOOK_WORDS", 95_000 if BOOK_KIND == "fiction" else 60_000),
    "chapters": _env_int("BOOK_CHAPTERS", 18 if BOOK_KIND == "fiction" else 12),

    "outline_schema": OUTLINE_SCHEMA,
    "outline_constraints": _env_csv(
        "OUTLINE_CONSTRAINTS",
        [
            # Fiction defaults emphasize tension and reveal-through-action
            "Three-act structure or equivalent pacing appropriate to selected pack",
            "Cliffhanger or momentum at most chapter endings",
            "Foreshadow/payoff alignment; minimize exposition dumps",
            "Continuity of POV voice and timeline",
        ]
        if BOOK_KIND == "fiction"
        else [
            # Nonfiction defaults emphasize claim-evidence clarity
            "Logical progression from thesis → claims → evidence → counterpoints → takeaway",
            "Figures/tables placed near first reference",
            "Terminology defined once; reused consistently",
        ]
    ),

    "style_guide": {
        "voice": _env_str(
            "STYLE_VOICE",
            "Third-person limited, rotating POV; past tense" if BOOK_KIND == "fiction" else "Direct, active voice; minimal jargon"
        ),
        "formatting": _env_str(
            "STYLE_FORMATTING",
            "Markdown: H2 chapters, H3 subsections; italics for flashbacks/inner monologues; short paragraphs."
            if BOOK_KIND == "fiction"
            else "Markdown: H2 chapters, H3 sections; consistent heading hierarchy; numbered figures/tables."
        ),
        "citations": "Not applicable (fiction)." if BOOK_KIND == "fiction" else f"{CITATION_STYLE.upper()} for in-text and bibliography.",
        "terminology": _env_csv(
            "STYLE_TERMS",
            ([
                "oracle: a predictive AI that simulates futures",
                "ghosting: disappearing from all digital networks",
                "neural key: encrypted memory implant",
                "deepnet: unindexed AI-controlled layers beyond the dark web",
            ] if BOOK_KIND == "fiction" else [])
        ),
    },

    "research_policy": {
        "enabled": _env_bool("fv", True),
        # Selective: only for editor-tagged [n]/claims; the ResearchAgent returns JSON summaries & minimal citations
        "selective": True,
        "citation_format": CITATION_STYLE if BOOK_KIND == "nonfiction" else "none",
        "sources_allowed": _env_csv(
            "RESEARCH_SOURCES_OK",
            ([
                "declassified espionage tactics",
                "AI ethics research papers",
                "cybersecurity best practices",
                "public-domain intelligence archives",
            ] if BOOK_KIND == "fiction" else [
                "peer-reviewed articles",
                "official statistics",
                "government/NGO reports",
                "company filings",
            ])
        ),
        "sources_disallowed": _env_csv(
            "RESEARCH_SOURCES_NO",
            ["real classified documents", "leaked materials", "malware samples"]
        ),
    },

    "constraints": {
        "originality": "All prose must be new/unique; paraphrase-first posture.",
        "copyright": "No copyrighted fictional characters/logos; quotes require citation (nonfiction) or in-world attribution (fiction).",
        "age_appropriateness": _env_str("AGE_POLICY", "Adult themes allowed; avoid gratuitous gore/exploitation."),
        "representation": "International cast/perspectives; avoid stereotypes.",
        "localization": _env_str("LOCALE", "en-US"),
    },

    "export": {
        "docx": _env_bool("EXPORT_DOCX", True),
        "epub": _env_bool("EXPORT_EPUB", True),
        "pdf": _env_bool("EXPORT_PDF", True),
    },

    "story_assets": {
        "setting": _env_str(
            "SETTING_TEXT",
            "Near-future, hyper-connected world influenced by predictive AIs; set pieces: neo-Tokyo, abandoned Soviet site, orbital data center, underground network."
            if BOOK_KIND == "fiction" else ""
        ),
        "characters": default_characters if BOOK_KIND == "fiction" else [],
        "motifs": [
            "green cascading code overlays",
            "mirrors/reflections (identity duality)",
            "flickering neon during confrontations",
            "repeating drowning dreams (info-overload)",
        ] if BOOK_KIND == "fiction" else [],
        "themes": _env_csv(
            "THEMES",
            (["Free will vs determinism", "Surveillance vs autonomy", "Trust & betrayal", "Ethics of machine sentience"]
             if BOOK_KIND == "fiction" else
             ["Clarity vs complexity", "Evidence over opinion", "Systems thinking"])
        ),
    },

    "outline_seed": default_outline_seed_thriller if BOOK_KIND == "fiction" else [],
    "chapter_template": {
        "sections": (
            [
                "Opening hook scene",
                "Problem escalation",
                "Twist or revelation",
                "Action or moral dilemma",
                "Cliffhanger or resolution",
            ] if BOOK_KIND == "fiction" else [
                "Introduction",
                "Argument",
                "Evidence/Case",
                "Counterpoint",
                "Summary/Takeaway",
            ]
        ),
        "end_matter": (
            [
                "Encrypted Epigraph",
                "Tech Glossary Entry",
                "Foreshadowing Symbol or Clue",
            ] if BOOK_KIND == "fiction" else [
                "Key Takeaways",
                "Cited Works Added",
                "Open Questions",
            ]
        ),
        "chapter_heading_format": _env_str("CHAPTER_HEADING_FMT", "## Chapter {number}: {title}"),
        "render_section_headings": _env_bool("RENDER_SECTION_HEADINGS", BOOK_KIND != "fiction"),
        "number_sections": _env_bool("NUMBER_SECTIONS", True),
        "section_heading_format": _env_str("SECTION_HEADING_FMT", "### {index}. {title}"),
        "end_matter_heading_format": _env_str("END_MATTER_HEADING_FMT", "### {label}"),
    },
}

# -------------------- Pipeline configuration (budget, models, tokens, gates) --------------------
pipeline_config: Dict[str, Any] = {
    # Cost envelopes (reserve → reconcile)
    "RUN_COST_CAP_USD": _env_float("RUN_COST_CAP_USD", 3.00),
    "CHAPTER_COST_CAP_USD": _env_float("CHAPTER_COST_CAP_USD", 0.25),

    # Models & temps (fast vs think)
    "MODEL_ID_FAST": _env_str("MODEL_ID_FAST", "gpt-4o"),
    "MODEL_ID_THINK": _env_str("MODEL_ID_THINK", "gpt-5"),
    "TEMPERATURE": _env_float("TEMPERATURE", 0.2),

    # Run mode
    "SAMPLE_RUN_CHAPTERS": _env_int("SAMPLE_RUN_CHAPTERS", 1),  # 0 = disabled
    "FULL_RUN": _env_bool("FULL_RUN", True),
    "ULTRA_BUDGET_MODE": _env_bool("ULTRA_BUDGET_MODE", False),

    # Token budgets (author/editor/research/outline)
    # NOTE: Router should pass these as max_t per call to be the *governing* caps.
    "AUTHOR_MAX_TOKENS": _env_int("AUTHOR_MAX_TOKENS", 10000 if BOOK_KIND == "fiction" else 10000),
    "EDITOR_MAX_TOKENS": _env_int("EDITOR_MAX_TOKENS", 16384),
    "EDITOR_MIN_TOKENS": _env_int("EDITOR_MIN_TOKENS", 16384),
    "EDITOR_CONTEXT_TOKENS": _env_int("EDITOR_CONTEXT_TOKENS", 128_000),
    "EDITOR_SAFE_MARGIN_TOKENS": _env_int("EDITOR_SAFE_MARGIN_TOKENS", 1000),
    "EDITOR_MAX_INPUT_CHARS": _env_int("EDITOR_MAX_INPUT_CHARS", 300_000),
    "RESEARCH_MAX_TOKENS": _env_int("RESEARCH_MAX_TOKENS", 5500 if BOOK_KIND == "fiction" else 5500),
    "OUTLINE_MAX_TOKENS": _env_int("OUTLINE_MAX_TOKENS", 5500),

    # Word target tolerance (±)
    "CHAPTER_TOLERANCE_PCT": _env_float("CHAPTER_TOLERANCE_PCT", 0.18),

    # Research switch (selective routing later)
    "RESEARCH_ENABLED": book_spec["research_policy"]["enabled"],

    # Quality gates (toggle per run)
    "GATES_FICTION": {
        "value_shift": _env_bool("GATE_VALUE_SHIFT", True),
        "cliffhanger": _env_bool("GATE_CLIFFHANGER", True),
        "voice_enforce": _env_bool("GATE_VOICE", True),
        "cliche_sweep": _env_bool("GATE_CLICHE", True),
        "redundancy_sweep": _env_bool("GATE_REDUNDANCY", True),
        "antagonist_competence": _env_bool("GATE_ANTAGONIST", True),
    },
    "GATES_NONFICTION": {
        "claim_evidence": _env_bool("GATE_CLAIM_EVIDENCE", True),
        "logic_flow": _env_bool("GATE_LOGIC", True),
        "citation_hygiene": _env_bool("GATE_CITATION_HYGIENE", True),
        "redundancy_sweep": _env_bool("GATE_REDUNDANCY", True),
        "terminology_consistency": _env_bool("GATE_TERMINOLOGY", True),
    },

    # Prompt/version stamps (cache coherence)
    "PROMPT_VERSIONS": {
        "author": _env_str("PV_AUTHOR", "v4"),
        "editor": _env_str("PV_EDITOR", "v3"),
        "research": _env_str("PV_RESEARCH", "v2"),
        "router": _env_str("PV_ROUTER", "v2"),
        "outline": _env_str("PV_OUTLINE", f"v3_{GENRE_PACK}" if BOOK_KIND == "fiction" else f"v3_{STRUCTURE_PACK}"),
    },

    # Cache shaping knobs
    "CACHE_PAYLOAD_FIELDS": [
        "pv", "kind", "pack", "ch", "sec", "oc_min", "tw", "kb_hash", "bible_hash"
    ],
    "CACHE_PREFIXES": {
        "outline": "oln",
        "author_block": "ab",
        "author_tighten": "at",
        "editor": "ed",
        "research": "rs",
    },
}

# Compute effective gates for this run (union minimized to kind-relevant)
if BOOK_KIND == "fiction":
    EFFECTIVE_GATES = pipeline_config["GATES_FICTION"]
else:
    EFFECTIVE_GATES = pipeline_config["GATES_NONFICTION"]

# -------------------- Ultra-budget posture (optional) --------------------
if pipeline_config["ULTRA_BUDGET_MODE"]:
    # Trim tokens and relax optional gates
    pipeline_config["AUTHOR_MAX_TOKENS"] = max(1500, int(pipeline_config["AUTHOR_MAX_TOKENS"] * 0.5))
    pipeline_config["EDITOR_MAX_TOKENS"] = max(4000, int(pipeline_config["EDITOR_MAX_TOKENS"] * 0.5))
    pipeline_config["RESEARCH_MAX_TOKENS"] = max(800, int(pipeline_config["RESEARCH_MAX_TOKENS"] * 0.6))
    pipeline_config["OUTLINE_MAX_TOKENS"] = max(900, int(pipeline_config["OUTLINE_MAX_TOKENS"] * 0.6))
    pipeline_config["RUN_COST_CAP_USD"] = min(pipeline_config["RUN_COST_CAP_USD"], 2.00)
    pipeline_config["CHAPTER_COST_CAP_USD"] = min(pipeline_config["CHAPTER_COST_CAP_USD"], 0.15)

    if BOOK_KIND == "fiction":
        for k in ("cliche_sweep", "redundancy_sweep", "voice_enforce"):
            EFFECTIVE_GATES[k] = False
    else:
        for k in ("redundancy_sweep",):
            EFFECTIVE_GATES[k] = False

# -------------------- Paths & references --------------------
paths: Dict[str, str] = {
    "outline_json": "content/outline/outline.json",
    "outline_md": "content/outline/outline.md",
    "story_bible": "references/story_bible.json",         # fiction
    "knowledge_base": "references/knowledge_base.json",   # nonfiction
    "agent_calls": "logs/agent_calls.jsonl",
    "run_manifest": "logs/run_manifest.json",
    "cost_log": "logs/cost.json",
    "build_md": "build/book.md",
    "dist_epub": "dist/book.epub",
    "dist_pdf": "dist/book.pdf",
    "dist_docx": "dist/book.docx",
    "csl_style": _env_str("CSL_STYLE_PATH", "references/apa.csl"),  # used when CITATION_STYLE != none
}

# -------------------- Print a concise banner --------------------
print("[config] kind:", book_spec["kind"], "| pack:", book_spec.get("genre_pack") or book_spec.get("structure_pack"), "| style:", STYLE_PACK, "| cite:", CITATION_STYLE)
print("[config] MODEL_FAST:", pipeline_config["MODEL_ID_FAST"], "| MODEL_THINK:", pipeline_config["MODEL_ID_THINK"], "| TEMP:", pipeline_config["TEMPERATURE"])
print("[config] tokens(author/editor/research/outline):",
      pipeline_config["AUTHOR_MAX_TOKENS"], "/",
      pipeline_config["EDITOR_MAX_TOKENS"], "/",
      pipeline_config["RESEARCH_MAX_TOKENS"], "/",
      pipeline_config["OUTLINE_MAX_TOKENS"])
print("[config] cost caps (run/chapter): $", pipeline_config["RUN_COST_CAP_USD"], "/$", pipeline_config["CHAPTER_COST_CAP_USD"])
print("[config] outline schema:", book_spec["outline_schema"])
print("[config] gates:", EFFECTIVE_GATES)
print("[config] PROMPT_VERSIONS:", pipeline_config["PROMPT_VERSIONS"])

# -------------------- Expose globals the rest of the notebook will import --------------------
CONFIG = {
    "book_spec": book_spec,
    "pipeline": pipeline_config,
    "gates": EFFECTIVE_GATES,
    "paths": paths,
}

[config] kind: fiction | pack: thriller | style: conversational | cite: none
[config] MODEL_FAST: gpt-4o | MODEL_THINK: gpt-5 | TEMP: 0.2
[config] tokens(author/editor/research/outline): 10000 / 16384 / 5500 / 5500
[config] cost caps (run/chapter): $ 3.0 /$ 0.25
[config] outline schema: thriller_beats_v3
[config] gates: {'value_shift': True, 'cliffhanger': True, 'voice_enforce': True, 'cliche_sweep': True, 'redundancy_sweep': True, 'antagonist_competence': True}
[config] PROMPT_VERSIONS: {'author': 'v4', 'editor': 'v3', 'research': 'v2', 'router': 'v2', 'outline': 'v3_thriller'}


In [6]:
# Cell 3 — Utilities (deterministic, cache- & cost-aware, aligned to v2.0 Blueprint)

from __future__ import annotations

import os
import re
import json
import hashlib
import traceback
from pathlib import Path
from dataclasses import dataclass, asdict
from typing import Any, Dict, Iterable, Optional, Tuple
from datetime import datetime, timezone

# ----------------------------------------------------------------------
# Time & small helpers
# ----------------------------------------------------------------------

def now_utc_iso() -> str:
    """UTC timestamp in ISO-8601 Z form (deterministic string for logs/manifests)."""
    return datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")

def _safe_label(s: str, max_len: int = 64) -> str:
    """Filesystem-safe label (for log filenames / cache prefixes)."""
    s = (s or "").strip().lower()
    s = re.sub(r"[^a-z0-9_\-\.]+", "_", s)
    return s[:max_len] or "run"

def _safe_int(v: Any, default: int) -> int:
    try:
        return int(v)
    except Exception:
        return int(default)

def _safe_float(v: Any, default: float) -> float:
    try:
        return float(v)
    except Exception:
        return float(default)

def stable_json_dumps(obj: Any) -> str:
    """Stable JSON string for hashing/cache keys."""
    return json.dumps(obj, sort_keys=True, separators=(",", ":"))

def sha1(s: str) -> str:
    return hashlib.sha1((s or "").encode("utf-8")).hexdigest()

# ----------------------------------------------------------------------
# File I/O (idempotent, UTF-8 safe)
# ----------------------------------------------------------------------

def read_text(p: os.PathLike | str) -> str:
    p = Path(p)
    return p.read_text(encoding="utf-8") if p.exists() else ""

def write_text(p: os.PathLike | str, s: str) -> None:
    p = Path(p)
    p.parent.mkdir(parents=True, exist_ok=True)
    p.write_text(s or "", encoding="utf-8")

def read_json(p: os.PathLike | str) -> Any:
    p = Path(p)
    return json.loads(p.read_text(encoding="utf-8")) if p.exists() else None

def write_json(p: os.PathLike | str, d: Any, pretty: bool = True) -> None:
    p = Path(p)
    p.parent.mkdir(parents=True, exist_ok=True)
    if pretty:
        s = json.dumps(d, indent=2, ensure_ascii=False)
    else:
        s = stable_json_dumps(d)
    p.write_text(s, encoding="utf-8")

def append_jsonl(p: os.PathLike | str, record: Dict[str, Any]) -> None:
    """Append a JSON line to a .jsonl log."""
    p = Path(p)
    p.parent.mkdir(parents=True, exist_ok=True)
    with p.open("a", encoding="utf-8") as f:
        f.write(stable_json_dumps(record) + "\n")

def has_file(p: os.PathLike | str) -> bool:
    return Path(p).is_file()

def stamp(p: os.PathLike | str, msg: str = "checkpoint") -> None:
    write_text(p, f"{msg}: {now_utc_iso()}")

def log_exc(label: str, e: Exception) -> None:
    """Capture structured error to logs/<label>_error.json."""
    Path("logs").mkdir(parents=True, exist_ok=True)
    write_json(
        f"logs/{_safe_label(label)}_error.json",
        {
            "type": type(e).__name__,
            "msg": str(e),
            "trace": traceback.format_exc(),
            "ts": now_utc_iso(),
        },
    )

# ----------------------------------------------------------------------
# Markdown sanitizer (Blueprint §3: Sanitizer; §6 Quality gates expect clean MD)
# ----------------------------------------------------------------------

def sanitize_md(t: str, *, ensure_final_newline: bool = True) -> str:
    """
    Normalize MD text:
      - Normalize newlines
      - Strip a single leading/trailing fenced block wrapper if present
      - Trim trailing spaces
      - Collapse ≥3 consecutive blank lines to 2
    """
    t = (t or "").replace("\r\n", "\n").replace("\r", "\n").strip()

    # Remove exactly one opening fence if it's the very first line
    t = re.sub(r"^```(?:\w+)?\n", "", t, flags=re.I)

    # Remove exactly one trailing fence if it's the very last line
    t = re.sub(r"\n```$", "", t)

    # Trim trailing spaces on each line
    t = "\n".join([ln.rstrip() for ln in t.split("\n")])

    # Collapse 3+ blank lines → 2
    t = re.sub(r"\n{3,}", "\n\n", t)

    if ensure_final_newline and (not t.endswith("\n")):
        t += "\n"
    return t.strip() if not ensure_final_newline else t

def enforce_heading_policy(md: str, *, chapter_heading_fmt: str = "## Chapter {number}: {title}") -> str:
    """
    Light enforcement for heading policy:
      - Ensure chapter headings start with '## '
      - Avoid accidental H1s; downshift '# ' → '## ' if it looks like a chapter heading
    """
    lines = (md or "").splitlines()
    out = []
    for i, ln in enumerate(lines):
        if re.match(r"^#\s+Chapter\s+\d+:", ln.strip(), flags=re.I):
            out.append(re.sub(r"^#\s+", "## ", ln))
        else:
            out.append(ln)
    return "\n".join(out)

# ----------------------------------------------------------------------
# Tokens & word counts (Blueprint §7: budget heuristics)
# ----------------------------------------------------------------------

CHARS_PER_TOKEN = _safe_int(os.getenv("CHARS_PER_TOKEN", 4), 4)

def count_words(t: str) -> int:
    return len((t or "").split())

def approx_tokens(t: str) -> int:
    """Heuristic: ~4 chars/token by default; overridable via CHARS_PER_TOKEN env."""
    t = t or ""
    return max(1, int(len(t) / max(1, CHARS_PER_TOKEN)))

# ----------------------------------------------------------------------
# Cache keys (Blueprint §7 Determinism: prompt/version stamps + bible/knowledge hash)
# ----------------------------------------------------------------------

def make_cache_key(prefix: str, payload: Dict[str, Any]) -> str:
    """
    Build a stable cache key:
      sha1( prefix + ":" + stable_json(payload) )
    Expected payload keys for authoring stages include:
      pv, kind, pack, ch, sec, oc_min, tw, kb_hash, bible_hash
    """
    s = f"{prefix}:{stable_json_dumps(payload)}"
    return sha1(s)

# ----------------------------------------------------------------------
# Cost tracking (Blueprint §7: reserve→reconcile envelopes; run/chap caps)
# ----------------------------------------------------------------------

class CostCapExceededException(Exception):
    pass

@dataclass
class CostEvent:
    t: str
    type: str                      # "reserve" | "reconcile"
    label: str
    est: Optional[float] = None
    act: Optional[float] = None
    delta: Optional[float] = None
    model: Optional[str] = None
    ptok: Optional[int] = None
    ctok: Optional[int] = None

class CostTracker:
    """
    Reserve → Reconcile accounting with optional per-chapter envelopes.
    - price table can be overridden via env: PRICE_<MODEL_UPPER>_IN / _OUT (USD per 1K tokens)
    - .reserve(label, amount)
    - .reconcile(label, est_amount, actual_amount)
    - .estimate_cost(model, prompt_tokens, completion_tokens)
    - .child_envelope(name, cap)  (per-chapter budget guard)
    Persists to logs/cost.json & logs/agent_calls.jsonl companions when used by caller.
    """

    # Baseline price table (conservative defaults; override via env if needed)
    PRICES = {
        "gpt-5-thinking":    {"in": 0.00150, "out": 0.0120},
        "gpt-5":             {"in": 0.00125, "out": 0.0100},
        "gpt-5-mini":        {"in": 0.00025, "out": 0.0020},
        "gpt-4o":            {"in": 0.00050, "out": 0.0015},
        "gpt-4o-mini":       {"in": 0.00015, "out": 0.0006},
        "gpt-4-turbo":       {"in": 0.01000, "out": 0.0300},
        "gpt-3.5-turbo":     {"in": 0.00050, "out": 0.0015},
        "default":           {"in": 0.00100, "out": 0.0030},
    }

    # env overrides (e.g., PRICE_GPT_4O_IN=0.00045 PRICE_GPT_4O_OUT=0.0013)
    for key in list(PRICES.keys()):
        if key == "default":
            continue
        env_key = key.upper().replace("-", "_").replace(".", "_")
        pin = os.getenv(f"PRICE_{env_key}_IN")
        pout = os.getenv(f"PRICE_{env_key}_OUT")
        try:
            if pin is not None and pout is not None:
                PRICES[key] = {"in": float(pin), "out": float(pout)}
        except Exception:
            pass

    def __init__(self, cap_usd: float, *, autosave: bool = True, log_path: str = "logs/cost.json"):
        self.cap = float(cap_usd)
        self.spent = 0.0
        self.events: list[CostEvent] = []
        self.autosave = autosave
        self.log_path = log_path
        self._pending: Dict[str, float] = {}  # label -> last_estimate reserved

        # ensure logs dir
        Path(self.log_path).parent.mkdir(parents=True, exist_ok=True)

    # ---- pricing helpers ----
    def _price_record(self, model: str) -> Dict[str, float]:
        if model in self.PRICES:
            return self.PRICES[model]
        # longest-prefix match
        for k in sorted(self.PRICES, key=len, reverse=True):
            if k != "default" and model.startswith(k):
                return self.PRICES[k]
        return self.PRICES["default"]

    def estimate_cost(self, model: str, prompt_tokens: int, completion_tokens: int) -> float:
        p = self._price_record(model)
        return (prompt_tokens / 1000.0) * p["in"] + (completion_tokens / 1000.0) * p["out"]

    # ---- cap checks ----
    def _can(self, amount: float) -> bool:
        return (self.spent + float(amount)) <= (self.cap + 1e-9)

    # ---- main API ----
    def reserve(self, label: str, amount_usd: float) -> None:
        """Reserve budget before a call; raises if it would exceed run cap."""
        amt = float(amount_usd)
        if not self._can(amt):
            raise CostCapExceededException(
                f"Cap hit before {label}: need {amt:.4f}, left {(self.cap - self.spent):.4f}"
            )
        self.spent += amt
        self._pending[label] = amt
        ev = CostEvent(t=now_utc_iso(), type="reserve", label=label, est=round(amt, 6))
        self.events.append(ev)
        if self.autosave:
            self.save()

    # alias for backward compatibility
    spend = reserve

    def reconcile(self, label: str, actual_usd: float) -> None:
        """Reconcile actual cost against last reserve for this label; only positive deltas increase spend."""
        est = float(self._pending.get(label, 0.0))
        act = float(actual_usd)
        delta = max(0.0, act - est)
        if delta > 0 and not self._can(delta):
            raise CostCapExceededException(f"Cap hit reconciling {label} +{delta:.4f}")
        self.spent += delta
        ev = CostEvent(
            t=now_utc_iso(), type="reconcile", label=label,
            est=round(est, 6), act=round(act, 6), delta=round(delta, 6)
        )
        self.events.append(ev)
        # clear pending for label (one-shot)
        self._pending.pop(label, None)
        if self.autosave:
            self.save()

    # alias for backward compatibility
    recon = reconcile

    # ---- envelopes ----
    def child_envelope(self, name: str, cap_usd: float) -> "ChapterEnvelope":
        return ChapterEnvelope(parent=self, name=name, cap_usd=cap_usd)

    # ---- persistence ----
    def summary(self) -> Dict[str, Any]:
        return {
            "total_spent_usd": round(self.spent, 6),
            "run_cap_usd": round(self.cap, 6),
            "remaining_usd": round(max(0, self.cap - self.spent), 6),
            "events": len(self.events),
            "ts": now_utc_iso(),
        }

    def to_dict(self) -> Dict[str, Any]:
        return {
            "summary": self.summary(),
            "events": [asdict(e) for e in self.events],
        }

    def save(self) -> None:
        write_json(self.log_path, self.to_dict(), pretty=True)

class ChapterEnvelope:
    """
    Per-chapter/per-stage budget guard that also respects the parent run cap.
    Usage:
        ch = tracker.child_envelope("ch12", cap_usd=0.25)
        ch.reserve("author_block", 0.08)
        # ... do call ...
        ch.reconcile("author_block", actual_usd=0.07)
    """
    def __init__(self, parent: CostTracker, name: str, cap_usd: float):
        self.parent = parent
        self.name = _safe_label(name)
        self.cap = float(cap_usd)
        self.spent = 0.0

    def _can(self, amount: float) -> bool:
        return (self.spent + float(amount)) <= (self.cap + 1e-9)

    def reserve(self, label: str, amount_usd: float) -> None:
        if not self._can(amount_usd):
            raise CostCapExceededException(
                f"[{self.name}] envelope hit before {label}: need {amount_usd:.4f}, left {(self.cap - self.spent):.4f}"
            )
        self.spent += float(amount_usd)
        # Delegate to parent run-level tracker as well
        self.parent.reserve(f"{self.name}:{label}", amount_usd)

    spend = reserve

    def reconcile(self, label: str, actual_usd: float) -> None:
        # Parent handles delta/cap; we track envelope-only delta for accounting symmetry
        est = float(self.parent._pending.get(f"{self.name}:{label}", 0.0))
        act = float(actual_usd)
        delta = max(0.0, act - est)
        if delta > 0 and not self._can(delta):
            raise CostCapExceededException(
                f"[{self.name}] envelope hit on reconcile {label} +{delta:.4f}"
            )
        self.spent += delta
        self.parent.reconcile(f"{self.name}:{label}", actual_usd)

    recon = reconcile

# ----------------------------------------------------------------------
# Agent logging & run manifest (Blueprint §8)
# ----------------------------------------------------------------------

def log_agent_call(payload: Dict[str, Any], path: str = "logs/agent_calls.jsonl") -> None:
    """Append an agent call record with usage & cache flags."""
    record = dict(payload)
    record.setdefault("ts", now_utc_iso())
    append_jsonl(path, record)

def update_run_manifest(delta: Dict[str, Any], path: str = "logs/run_manifest.json") -> Dict[str, Any]:
    """
    Merge-in updates to run manifest (title, kind/pack, chapters done, cost, gate pass rates).
    Call iteratively; we keep previous keys unless overwritten.
    """
    cur = read_json(path) or {}
    # Shallow merge at top-level; caller can manage nested dicts explicitly
    cur.update(delta or {})
    cur.setdefault("updated_at", now_utc_iso())
    cur["updated_at"] = now_utc_iso()
    write_json(path, cur, pretty=True)
    return cur

# ----------------------------------------------------------------------
# Length targeting (Blueprint §5 Orchestration: compute chapter/section targets)
# ----------------------------------------------------------------------

def compute_chapter_word_targets(
    book_spec: Dict[str, Any],
    cfg: Dict[str, Any],
    sections_count: Optional[int] = None,
    ch_num: Optional[int] = None,
) -> Tuple[int, int, int, int]:
    """
    Derive per-chapter and per-section word targets.

    Inputs:
      - book_spec["target_length_words"], book_spec["chapters"]
      - optional book_spec["chapter_target_words"], ["chapter_weights"]
      - chapter_template.section_weights (if present)
      - cfg["CHAPTER_TOLERANCE_PCT"], optional CHAPTER_MIN_WORDS/CHAPTER_MAX_WORDS
      - cfg["AUTHOR_SECTION_TARGET_WORDS"] (override per-section)

    Returns:
      (per_chapter, min_words, max_words, per_section_target)
    """
    # Total words and chapter count
    total = _safe_int(book_spec.get("target_length_words", 95_000), 95_000)
    n_ch = max(1, _safe_int(book_spec.get("chapters", 18), 18))

    # Base per-chapter target (explicit override wins; else total/n)
    per = _safe_int(book_spec.get("chapter_target_words", total // n_ch), total // n_ch)

    # Chapter-specific weight
    weights = book_spec.get("chapter_weights") or []
    if ch_num and 1 <= ch_num <= len(weights):
        try:
            w = float(weights[ch_num - 1] or 1.0)
            per = max(600, int(per * w))
        except Exception:
            pass

    # Tolerance band
    tol = float(cfg.get("CHAPTER_TOLERANCE_PCT", 0.18))
    min_w = int(per * (1 - tol))
    max_w = int(per * (1 + tol))

    # Explicit min/max overrides from cfg
    if "CHAPTER_MIN_WORDS" in cfg:
        min_w = _safe_int(cfg["CHAPTER_MIN_WORDS"], min_w)
    if "CHAPTER_MAX_WORDS" in cfg:
        max_w = _safe_int(cfg["CHAPTER_MAX_WORDS"], max_w)

    # Sections
    tmpl = (book_spec.get("chapter_template") or {})
    sec_ws = tmpl.get("section_weights") or []
    if sections_count is None:
        sections_count = len(sec_ws) if sec_ws else len(tmpl.get("sections") or []) or 5

    if sec_ws and len(sec_ws) == sections_count:
        ssum = sum(max(0.01, float(x)) for x in sec_ws)
        # nominal target for the *largest* section weight; author pass can distribute within
        per_section_nominal = max(400, int(per * (max(sec_ws) / ssum)))
    else:
        per_section_nominal = max(600, per // max(1, sections_count))

    # Optional explicit override
    per_section = _safe_int(cfg.get("AUTHOR_SECTION_TARGET_WORDS", per_section_nominal), per_section_nominal)

    return per, min_w, max_w, per_section

print("[utils] ready — IO/sanitize/cache/cost/targets initialized")

[utils] ready — IO/sanitize/cache/cost/targets initialized


In [7]:
# Cell 4 — Agents Wiring (adds author_tighten & gate_check stages; deterministic caching; selective research; budget-aware)

from __future__ import annotations

import os, re, json, time
from pathlib import Path
from typing import Any, Dict, Optional, Tuple

# Assumes from previous cells:
# - client (OpenAI), REQUEST_OPTS (dict), now_utc_iso, approx_tokens, sha1, stable_json_dumps,
#   make_cache_key, sanitize_md, write_json, write_text, read_json, append_jsonl, log_agent_call,
#   CostTracker, ChapterEnvelope, CONFIG
#
# Available in CONFIG:
#   CONFIG["book_spec"], CONFIG["pipeline"], CONFIG["gates"], CONFIG["paths"]

BOOK = CONFIG["book_spec"]
PL = CONFIG["pipeline"]
GATES = CONFIG["gates"]
PATHS = CONFIG["paths"]

# -------------------------- Dynamic system prompts (aligned with v2.0) --------------------------

def _fmt_gate_state(d: Dict[str, bool]) -> str:
    if not d:
        return "(none)"
    on = [k for k, v in d.items() if v]
    off = [k for k, v in d.items() if not v]
    return f"ON: {', '.join(on) if on else '—'} | OFF: {', '.join(off) if off else '—'}"

def pmrouter_sys_prompt() -> str:
    return f"""
You are PMRouter — the orchestration, budget, and logging layer for an agentic book factory.

NORTH-STAR
- Deterministic, resumable, low-cost pipeline to generate professional-quality books (fiction & nonfiction).

GUARDRAILS
- Cost envelopes: reserve → reconcile. Respect run cap ${PL.get('RUN_COST_CAP_USD', 3.00):.2f} and per-chapter caps.
- Determinism: cache by stage prefix + PROMPT_VERSION + bible/knowledge hash; reuse cached artifacts.
- Persist everything: outline/drafts/edits/claims/notes to the canonical paths in CONFIG.

WORKFLOW
1) Outline → 2) Per-Chapter loop (Block → Gate → Tighten → Editor → Selective Research → Merge) → 3) Weave → 4) Export.

SELECTIVE RESEARCH
- Only resolve editor-tagged [n] or flagged claims; produce JSON summaries/citations; do not inject prose directly.

QUALITY GATES (toggle per CONFIG)
- Fiction: {_fmt_gate_state(PL.get('GATES_FICTION', {}))}
- Nonfiction: {_fmt_gate_state(PL.get('GATES_NONFICTION', {}))}

LOGGING
- Append a jsonl record per call with: agent, model, label, cache_hit, usage, est_cost, actual_cost, duration.
- Retry once on transient failure, then continue non-blocking and log.

You enforce STOP if caps are exceeded and persist partials for resumption.
""".strip()

def author_sys_prompt() -> str:
    voice = (BOOK.get("style_guide") or {}).get("voice", "")
    formatting = (BOOK.get("style_guide") or {}).get("formatting", "")
    kind = BOOK.get("kind")
    pack = BOOK.get("genre_pack") or BOOK.get("structure_pack")
    return f"""
You are AuthorAgent.

TASK
- Draft scenes (fiction) or sections (nonfiction) from outline inputs with minimal exposition, high clarity.
- Respect style & tone. Use [n] markers where external support is required (facts, stats, quotes).

STYLE
- Kind: {kind}; Pack: {pack}; Voice: {voice}; Formatting: {formatting}
- Keep paragraphs short and cinematic (fiction) or argumentatively crisp (nonfiction).
- Do NOT fabricate citations or sources; leave [n] for ResearchAgent.

OUTPUT
- Clean Markdown only. No YAML frontmatter. No triple backticks.
- For the last section in a chapter:
  - Fiction → end with a hook/cliffhanger if enabled.
  - Nonfiction → end with a concrete takeaway/bridge.

DETERMINISM
- Honor provided word targets; stay within tolerance.
- Avoid randomness beyond temperature; no meta commentary.

NOTE
- You may also be asked to perform a LIGHT 'TIGHTEN' pass on an already-drafted chapter. In that case:
  - Improve cadence/clarity; remove redundancy; preserve facts/continuity; do not change the chapter title/number.
""".strip()

def editor_sys_prompt() -> str:
    cit = BOOK.get("citation_style", "none")
    return f"""
You are EditorAgent.

TASK
- Polish AuthorAgent text: enforce tone/style/heading policy, remove redundancy, improve flow.
- Insert [n] markers wherever claims require support.
- Emit three artifacts:
  1) <DRAFT_EDITED_MD>…</DRAFT_EDITED_MD> — sanitized Markdown
  2) <CLAIMS_REPORT_JSON>{{"claims":[{{"id":1,"marker":"[1]","text":"…","category":"tech|ops|science|history|quote","confidence":"low|med|high"}}]]}}</CLAIMS_REPORT_JSON>
  3) <CONTINUITY_OR_ARGUMENT_NOTES_MD>…</CONTINUITY_OR_ARGUMENT_NOTES_MD>

QUALITY GATES
- Fiction gates (when enabled): value_shift per scene; chapter ending hook; voice enforcement; cliché & redundancy sweeps; antagonist competence.
- Nonfiction gates (when enabled): claim-evidence mapping; logical flow; citation hygiene ({cit}); redundancy & terminology consistency.

RULES
- Zero uncited direct quotes. Use [n] markers instead.
- Preserve chapter/section structure; do not re-outline.
- Keep Markdown clean (no code fences unless content is code).

OUTPUT
- Wrap exactly as specified with the three tags. Nothing else.

ADDITIONAL STAGE
- You may also be queried for small JSON 'gate_check' scorers; respond succinctly with strict JSON only when requested.
""".strip()

def research_sys_prompt() -> str:
    style = BOOK.get("citation_style", "none")
    return f"""
You are ResearchAgent.

TASK
- Resolve [n] markers or claim items with concise, defensible summaries (max 3 bullets each).
- Provide minimal citation metadata sufficient for later formatting (style: {style.upper()} if applicable).

SOURCES
- Prefer reputable, open sources: peer-reviewed articles, official stats, filings, government/NGO reports.
- Disallow: leaked/classified materials, unsafe or illegal content.

FALLBACK
- If no reliable source is found, DO NOT guess. Instead return:
  - research_question: the refined question
  - recommended_sources: where to look next (types/databases)

OUTPUT (strict JSON list of items)
[
  {{"claim_id": 1, "marker":"[1]", "summary": "…", "citations": [{{"title":"…","url":"…","year":2022,"note":"…"}}]}}
]
No prose, no Markdown, JSON only.
""".strip()

# -------------------------- Low-level chat (OpenAI >=1.x; robust & text-first) --------------------------

def _extract_text_from_msg(msg) -> str:
    # Prefer parsed JSON if present
    parsed = getattr(msg, "parsed", None)
    if parsed is not None:
        if isinstance(parsed, (dict, list)):
            try:
                return json.dumps(parsed, ensure_ascii=False)
            except Exception:
                pass
        if isinstance(parsed, str):
            return parsed.strip()

    # Normal .content
    content = getattr(msg, "content", None)
    if isinstance(content, str) and content.strip():
        return content.strip()
    if isinstance(content, (list, tuple)) and content:
        p0 = content[0]
        if isinstance(p0, dict):
            for cand in ("text", "content", "value"):
                v = p0.get(cand)
                if isinstance(v, str) and v.strip():
                    return v.strip()
                if isinstance(v, dict) and isinstance(v.get("value"), str):
                    return v["value"].strip()
        else:
            for cand in ("text", "content", "value"):
                v = getattr(p0, cand, None)
                if isinstance(v, str) and v.strip():
                    return v.strip()
                if isinstance(v, dict) and isinstance(v.get("value"), str):
                    return v["value"].strip()
    return ""

def _extract_text_from_response(r) -> str:
    t = getattr(r, "output_text", None)
    if isinstance(t, str) and t.strip():
        return t.strip()

    # chat.completions
    try:
        choices = getattr(r, "choices", None)
        if choices and len(choices) > 0:
            msg = getattr(choices[0], "message", None)
            if msg is not None:
                s = _extract_text_from_msg(msg)
                if s:
                    return s
            if isinstance(choices[0], dict):
                maybe = choices[0].get("message", {}).get("content")
                if isinstance(maybe, str) and maybe.strip():
                    return maybe.strip()
    except Exception:
        pass

    # responses
    for attr in ("output", "outputs", "content"):
        maybe = getattr(r, attr, None)
        if isinstance(maybe, str) and maybe.strip():
            return maybe.strip()
        if isinstance(maybe, (list, tuple)) and maybe:
            p0 = maybe[0]
            if isinstance(p0, dict):
                v = p0.get("text") or p0.get("content") or p0.get("value")
                if isinstance(v, str) and v.strip():
                    return v.strip()
    return ""

def chat(model: str, sysm: str, userm: str, temp: float = 0.2, max_t: int = 800, *, force_json: bool = False) -> Tuple[str, Dict[str, Any]]:
    if not model:
        raise ValueError("Model ID is required")

    # Model output caps (best-effort)
    caps = {
        "gpt-4o":      {"out": 16384},
        "gpt-4o-mini": {"out": 8192},
    }
    m_lower = model.lower()
    out_cap = 16384
    if "gpt-4o-mini" in m_lower:
        out_cap = caps["gpt-4o-mini"]["out"]
    elif "gpt-4o" in m_lower:
        out_cap = caps["gpt-4o"]["out"]

    max_completion_tokens = min(int(max_t or 0), out_cap) if max_t else None

    base_messages = [
        {"role": "system", "content": sysm},
        {"role": "user", "content": "STRICT JSON OUTPUT ONLY.\n" + userm if force_json else userm},
    ]

    # 1) Try chat.completions
    kwargs = {}
    if max_completion_tokens:
        kwargs["max_completion_tokens"] = max_completion_tokens
    if temp is not None:
        kwargs["temperature"] = float(temp)
    kwargs.setdefault("modalities", ["text"])
    try:
        r = client.chat.completions.create(model=model, messages=base_messages, **kwargs, **(REQUEST_OPTS or {}))
        text = _extract_text_from_response(r)
        usage = getattr(r, "usage", None)
        usage_dict = {
            "prompt_tokens": getattr(usage, "prompt_tokens", None) or getattr(usage, "input_tokens", None),
            "completion_tokens": getattr(usage, "completion_tokens", None) or getattr(usage, "output_tokens", None),
            "via": "chat.completions",
        }
        if force_json and not (text or "").strip().startswith(("{", "[")):
            # fallback to Responses if JSON not honored
            raise RuntimeError("non_json_output")
        return text, usage_dict
    except Exception:
        pass

    # 2) Fallback: responses API (text-only)
    kwargs = {}
    if max_completion_tokens:
        kwargs["max_output_tokens"] = max_completion_tokens
    r = client.responses.create(model=model, input=base_messages, **kwargs, **(REQUEST_OPTS or {}))
    text = _extract_text_from_response(r)
    usage = getattr(r, "usage", None)
    usage_dict = {
        "prompt_tokens": getattr(usage, "input_tokens", None) or getattr(usage, "prompt_tokens", None),
        "completion_tokens": getattr(usage, "output_tokens", None) or getattr(usage, "completion_tokens", None),
        "via": "responses",
    }
    if force_json and not (text or "").strip().startswith(("{", "[")):
        raise RuntimeError("Model did not return JSON.")
    return text, usage_dict

# -------------------------- Agent class (cache & budget aware) --------------------------

def _bible_or_kb_hash() -> str:
    """
    Returns a content hash for the Story Bible (fiction) or Knowledge Base (nonfiction),
    used to invalidate caches when continuity/knowledge changes.
    """
    p = Path(PATHS["story_bible"]) if BOOK.get("kind") == "fiction" else Path(PATHS["knowledge_base"])
    if p.exists():
        try:
            return sha1(p.read_text(encoding="utf-8"))
        except Exception:
            return "nohash"
    return "nohash"

def _prefix_for(stage: str) -> str:
    """
    Stable, short cache key prefixes per stage. Allows custom overrides via PL['CACHE_PREFIXES'].
    """
    default = {
        "outline":        "ol",
        "author_block":   "ab",
        "author_tighten": "at",
        "editor":         "ed",
        "research":       "rs",
        "gate_check":     "gc",
    }
    return PL.get("CACHE_PREFIXES", {}).get(stage, default.get(stage, stage[:2]))

class Agent:
    """
    Thin LLM wrapper with:
    - deterministic caching keyed by stage + prompt-version + bible/KB hash + payload
    - reserve→reconcile cost accounting via CostTracker / ChapterEnvelope
    - robust text extraction & optional strict-JSON mode

    Known stages: "outline", "author_block", "author_tighten", "editor", "research", "gate_check"
    """
    def __init__(self, name: str, sysm_fn, model: str, temp: float, max_t: int, cache_dir: str, tracker: Optional[CostTracker]):
        self.name = name
        self._sysm_fn = sysm_fn  # callable producing dynamic system prompt
        self.model = model
        self.temp = temp
        self.max_t = max_t
        self.cache = Path(cache_dir)
        self.cache.mkdir(parents=True, exist_ok=True)
        self.tracker = tracker

    @property
    def sysm(self) -> str:
        return self._sysm_fn()

    def run(
        self,
        label: str,
        prompt: str,
        *,
        stage: str,                      # "outline" | "author_block" | "author_tighten" | "editor" | "research" | "gate_check"
        cache_payload: Optional[Dict[str, Any]] = None,
        cache_key: Optional[str] = None,
        max_t: Optional[int] = None,
        force_json: bool = False,
        envelope: Optional[ChapterEnvelope] = None,
    ) -> Dict[str, Any]:
        """
        Execute a call with deterministic caching and reserve→reconcile accounting.
        """
        effective_max_t = int(max_t if max_t is not None else self.max_t)

        # ---------------- Cache key (Blueprint §7) ----------------
        if cache_key is None:
            pv_map = PL.get("PROMPT_VERSIONS", {})
            if   "author"   in stage: pv = pv_map.get("author")
            elif "editor"   in stage: pv = pv_map.get("editor")
            elif "research" in stage: pv = pv_map.get("research")
            else:                     pv = pv_map.get("outline")

            payload = {
                "pv": pv,
                "kind": BOOK.get("kind"),
                "pack": BOOK.get("genre_pack") or BOOK.get("structure_pack"),
                # Include whichever reference hash applies (fiction vs nonfiction)
                "bible_hash":   _bible_or_kb_hash() if BOOK.get("kind") == "fiction"    else None,
                "kb_hash":      _bible_or_kb_hash() if BOOK.get("kind") == "nonfiction" else None,
                "stage": stage,
            }
            if cache_payload:
                payload.update(cache_payload)
            cache_key = make_cache_key(_prefix_for(stage), payload)

        safe = re.sub(r"[^A-Za-z0-9._-]+", "_", label or "step")
        cpath = self.cache / f"{safe}_{cache_key}.json"

        # ---------------- Begin log ----------------
        begin_rec = {
            "t": now_utc_iso(),
            "agent": self.name,
            "stage": stage,
            "model": self.model,
            "label": label,
            "event": "begin",
            "cache_key": cache_key,
            "max_t": effective_max_t,
            "force_json": bool(force_json),
        }
        log_agent_call(begin_rec)

        # ---------------- Cache read ----------------
        if cpath.exists():
            try:
                d = json.loads(cpath.read_text(encoding="utf-8"))
                cached_txt = (d.get("text") or "").strip()
                if cached_txt:
                    log_agent_call({
                        "t": now_utc_iso(), "agent": self.name, "stage": stage, "model": self.model,
                        "label": label, "event": "cache_hit", "cache_path": cpath.as_posix()
                    })
                    return {"text": cached_txt, "cached": True, "usage": d.get("usage"), "est_cost": 0.0}
                # purge empty
                cpath.unlink(missing_ok=True)
                log_agent_call({
                    "t": now_utc_iso(), "agent": self.name, "stage": stage, "model": self.model,
                    "label": label, "event": "cache_purged_empty", "cache_path": cpath.as_posix()
                })
            except Exception:
                log_agent_call({
                    "t": now_utc_iso(), "agent": self.name, "stage": stage, "model": self.model,
                    "label": label, "event": "cache_corrupt", "cache_path": cpath.as_posix()
                })

        # ---------------- Reserve cost ----------------
        pt = approx_tokens(self.sysm) + approx_tokens(prompt)
        ct_budget = int(effective_max_t * 0.9)
        est = self.tracker.estimate_cost(self.model, pt, ct_budget) if self.tracker else 0.0

        if envelope:
            envelope.reserve(f"{label}[reserve]", est)
        elif self.tracker:
            self.tracker.reserve(f"{label}[reserve]", est)

        # ---------------- Call LLM ----------------
        t0 = time.time()
        try:
            txt, usage = chat(self.model, self.sysm, prompt, self.temp, effective_max_t, force_json=force_json)
            txt = sanitize_md(txt)
        except Exception as e:
            log_agent_call({
                "t": now_utc_iso(), "agent": self.name, "stage": stage, "model": self.model,
                "label": label, "event": "error", "error": str(e)
            })
            raise

        dur = round(time.time() - t0, 3)
        log_agent_call({
            "t": now_utc_iso(), "agent": self.name, "stage": stage, "model": self.model,
            "label": label, "event": "llm_ok", "duration_s": dur, "usage": usage
        })

        # ---------------- Reconcile cost ----------------
        if usage and (self.tracker or envelope):
            actual = (self.tracker or envelope.parent).estimate_cost(  # type: ignore[union-attr]
                self.model,
                usage.get("prompt_tokens") or pt,
                usage.get("completion_tokens") or ct_budget,
            )
            if envelope:
                envelope.reconcile(f"{label}[actual]", actual)
            else:
                self.tracker.reconcile(f"{label}[actual]", actual)  # type: ignore[union-attr]

        # ---------------- Persist cache ----------------
        payload = {
            "text": txt or "",
            "usage": usage,
            "ts": now_utc_iso(),
            "model": self.model,
            "temp": self.temp,
            "stage": stage,
        }
        cpath.write_text(json.dumps(payload, ensure_ascii=False), encoding="utf-8")

        return {"text": txt or "", "cached": False, "usage": usage, "est_cost": est}

# -------------------------- PMRouter (wires models, temps, caps from config) --------------------------

class PMRouter:
    def __init__(self, tracker: CostTracker):
        self.tracker = tracker
        model_fast  = PL.get("MODEL_ID_FAST", "gpt-4o-mini")
        model_think = PL.get("MODEL_ID_THINK", "gpt-5-thinking")

        temp = float(PL.get("TEMPERATURE", 0.2))
        temp_author   = float(os.getenv("AUTHOR_TEMP",   PL.get("AUTHOR_TEMPERATURE",   temp)))
        temp_editor   = float(os.getenv("EDITOR_TEMP",   PL.get("EDITOR_TEMPERATURE",   temp)))
        temp_research = float(os.getenv("RESEARCH_TEMP", PL.get("RESEARCH_TEMPERATURE", 0.2)))

        self.author = Agent(
            "Author",
            author_sys_prompt,
            model_fast,                              # cheapest capable by default
            temp_author,
            int(PL.get("AUTHOR_MAX_TOKENS", 7000)),
            "cache",
            tracker,
        )
        self.editor = Agent(
            "Editor",
            editor_sys_prompt,
            model_fast,                              # can be swapped to THINK if needed
            temp_editor,
            int(PL.get("EDITOR_MAX_TOKENS", 16384)),
            "cache",
            tracker,
        )
        self.research = (
            Agent(
                "Research",
                research_sys_prompt,
                model_fast,
                temp_research,
                int(PL.get("RESEARCH_MAX_TOKENS", 1500)),
                "cache",
                tracker,
            )
            if PL.get("RESEARCH_ENABLED", True)
            else None
        )

        self.router_prompt = pmrouter_sys_prompt()
        Path("logs").mkdir(exist_ok=True)

    # Optional helper to pick a model dynamically (cost-aware)
    def pick_model(self, stage: str) -> str:
        # Upgrade to THINK for heavy editor passes when configured
        if stage == "editor" and int(PL.get("EDITOR_MAX_TOKENS", 16384)) > 12000:
            return PL.get("MODEL_ID_THINK", PL.get("MODEL_ID_FAST", "gpt-4o-mini"))
        return PL.get("MODEL_ID_FAST", "gpt-4o-mini")

    def log(self, label: str, meta: Optional[Dict[str, Any]] = None) -> None:
        meta = dict(meta or {})
        meta["label"] = label
        meta["time"] = now_utc_iso()
        fname = f"{now_utc_iso().replace(':','').replace('-','')}_{re.sub(r'[^A-Za-z0-9._-]+','_',label)}.json"
        p = Path("logs") / fname
        p.parent.mkdir(parents=True, exist_ok=True)
        write_json(p, meta)

print("Agents ready (Cell 4)")

Agents ready (Cell 4)


In [8]:
# Cell 5 — Style Guide & Glossary (pack-aware; POV voice cards; citation posture)

from __future__ import annotations
from pathlib import Path
from typing import Dict, Any, List

# Assumes from previous cells:
# - CONFIG dict with: book_spec, pipeline, paths
# - write_text, write_json

BOOK: Dict[str, Any] = CONFIG["book_spec"]
PL: Dict[str, Any] = CONFIG["pipeline"]
PATHS: Dict[str, str] = CONFIG["paths"]

STYLE_DIR = Path("content/style")
STYLE_DIR.mkdir(parents=True, exist_ok=True)

# -------------------- helpers --------------------

def _parse_terminology(items) -> List[Dict[str, str]]:
    """Turn ['term: def', 'sensor: ...'] into [{'term': 'term','definition':'def'}, ...]."""
    parsed: List[Dict[str, str]] = []
    for raw in items or []:
        if not isinstance(raw, str):
            continue
        if ":" in raw:
            term, definition = raw.split(":", 1)
            parsed.append({"term": term.strip(), "definition": definition.strip()})
        else:
            parsed.append({"term": raw.strip(), "definition": ""})
    return parsed

def _mk_voice_cards(spec: Dict[str, Any]) -> Dict[str, Any]:
    """
    Build POV voice cards for fiction (Blueprint §6: Voice Enforcement).
    Uses story_assets.characters if available; otherwise returns a single generic card.
    """
    if spec.get("kind") != "fiction":
        return {}
    cards = {}
    chars = (spec.get("story_assets", {}) or {}).get("characters", []) or []
    if not chars:
        cards["POV_Default"] = {
            "pov": "third_limited",
            "voice_summary": "Neutral cinematic voice; short sentences; no overt authorial intrusion.",
            "diction_cues": ["concrete nouns", "active verbs", "minimal adverbs"],
            "tempo_cues": ["tight pacing", "short paragraphs in action scenes"],
            "taboos": ["exposition dumps", "breaking POV", "genre clichés"],
        }
        return cards

    for ch in chars:
        name = ch.get("name") or "Unnamed"
        traits = ", ".join(ch.get("traits", [])) if isinstance(ch.get("traits"), list) else (ch.get("traits") or "")
        cards[name] = {
            "pov": "third_limited",
            "voice_summary": f"POV aligned to {name}'s worldview; traits: {traits}.",
            "diction_cues": ["favor concrete over abstract", "emotion shown via action beats"],
            "tempo_cues": ["vary cadence with tension", "shorter lines in high-stakes beats"],
            "taboos": ["head-hopping", "info-dumps", "stereotypes"],
        }
    return cards

def _citation_posture(spec: Dict[str, Any]) -> Dict[str, Any]:
    """Nonfiction citation guidance based on selected style."""
    style = (spec.get("citation_style") or "none").lower()
    if spec.get("kind") == "fiction" or style == "none":
        return {"style": "none", "examples": [], "notes": "Fiction: no formal citations; use [n] markers for realism-sensitive facts if needed."}
    if style == "apa":
        return {
            "style": "APA",
            "examples": [
                "In-text: (Author, Year)",
                "Ref: Author, A. A. (Year). Title. Publisher. https://doi.org/…"
            ],
            "notes": "Use author-year; page/para for quotes. Build bib via ResearchAgent → citations pipeline."
        }
    if style == "chicago":
        return {
            "style": "Chicago",
            "examples": [
                "Footnote: 1. Author, Title (City: Publisher, Year), page.",
                "Bibliography: Author. Title. City: Publisher, Year."
            ],
            "notes": "Prefer notes-bibliography; ensure footnote markers align with [n]."
        }
    if style == "ieee":
        return {
            "style": "IEEE",
            "examples": [
                "In-text: [1]",
                "Ref: [1] A. Author, “Article,” Journal, vol., no., pp., Year."
            ],
            "notes": "Numeric ordering by first appearance; maintain stable mapping from [n] → [#]."
        }
    # fallback
    return {"style": style.upper(), "examples": [], "notes": "Follow pack default; ensure unambiguous source mapping."}

# -------------------- main generator --------------------

def gen_style(spec: Dict[str, Any]) -> Dict[str, str]:
    sg = (spec or {}).get("style_guide", {}) or {}
    voice = sg.get("voice", "—")
    formatting = sg.get("formatting", "—")
    citations_label = sg.get("citations", "—")
    terminology_items = sg.get("terminology", [])
    tone = (spec or {}).get("tone", "—")

    glossary = _parse_terminology(terminology_items)
    packs = {
        "kind": spec.get("kind"),
        "genre_pack": spec.get("genre_pack"),
        "structure_pack": spec.get("structure_pack"),
        "style_pack": spec.get("style_pack"),
        "citation_style": spec.get("citation_style", "none"),
        "outline_schema": spec.get("outline_schema"),
    }

    # Citation posture for nonfiction
    citation_guidance = _citation_posture(spec)

    # POV voice cards (fiction only)
    voice_cards = _mk_voice_cards(spec)

    # Build Style Guide Markdown
    lines = [
        "# Style Guide",
        "",
        "## Overview",
        f"- **Kind:** {packs['kind']}",
        f"- **Pack:** {packs.get('genre_pack') or packs.get('structure_pack')}",
        f"- **Style Pack:** {packs['style_pack']}",
        f"- **Outline Schema:** {packs['outline_schema']}",
        "",
        "## Tone & Voice",
        f"- **Voice:** {voice}",
        f"- **Tone:** {tone}",
        "",
        "## Formatting",
        f"- {formatting}",
        "- Headings policy: use **H2 for chapters**, **H3 for sections**; avoid H1.",
        "- Keep paragraphs short. Prefer lists for dense information.",
        "",
        "## Citations",
        f"- Declared: {citations_label}",
    ]

    if citation_guidance["style"] != "none":
        lines += [
            f"- **Style:** {citation_guidance['style']}",
            "- **Examples:**",
        ]
        for ex in citation_guidance["examples"]:
            lines.append(f"  - {ex}")
        lines.append(f"- **Notes:** {citation_guidance['notes']}")
    else:
        lines.append("- Fiction posture or citations disabled; use `[n]` markers for any real-world claims.")

    lines += [
        "",
        "## Terminology",
    ]
    if glossary:
        lines += [f"- **{t['term']}** — {t['definition']}".rstrip() for t in glossary]
    else:
        lines.append("- _(none)_")

    lines += [
        "",
        "## Rules",
        "- Short, clear paragraphs; concrete > abstract.",
        "- Add `[n]` wherever a claim needs a source (or realism-sensitive detail).",
        "- No code fences in normal prose; only for actual code/diagrams.",
        "- Avoid clichés and redundancy; preserve consistent POV and timeline.",
        "",
        "## Gate Awareness",
    ]
    if spec.get("kind") == "fiction":
        lines += [
            "- **Value-Shift Gate:** Each scene moves from state A→B.",
            "- **Hook Gate:** Chapter ends sustain momentum.",
            "- **Voice Enforcement:** Adhere to POV voice cards.",
            "- **Cliché & Redundancy Sweeps** on edit.",
            "- **Antagonist Competence:** Avoid easy wins.",
        ]
    else:
        lines += [
            "- **Claim–Evidence Gate:** Important assertions must have planned/supporting sources.",
            "- **Logical Flow Gate:** Thesis → claims → evidence → counterpoint → takeaway.",
            "- **Citation Hygiene:** No orphan quotes; consistent style.",
            "- **Terminology Consistency:** Define once; reuse exactly.",
        ]

    style_md = "\n".join(lines) + "\n"

    # -------------------- write artifacts --------------------
    style_path = STYLE_DIR / "style_guide.md"
    glossary_path = STYLE_DIR / "glossary.json"
    packs_path = STYLE_DIR / "packs.json"
    voice_cards_path = STYLE_DIR / "voice_cards.json"  # fiction only (may be empty)

    write_text(style_path, style_md)
    write_json(glossary_path, {"terms": glossary})
    write_json(packs_path, packs)
    if voice_cards:
        write_json(voice_cards_path, voice_cards)
    else:
        # create an empty file for deterministic downstream imports
        write_json(voice_cards_path, {})

    # style manifest for quick sanity checks
    manifest = {
        "kind": packs["kind"],
        "pack": packs.get("genre_pack") or packs.get("structure_pack"),
        "style_pack": packs["style_pack"],
        "citation_style": packs["citation_style"],
        "files": {
            "style_md": style_path.as_posix(),
            "glossary_json": glossary_path.as_posix(),
            "packs_json": packs_path.as_posix(),
            "voice_cards_json": voice_cards_path.as_posix(),
        },
    }
    write_json(STYLE_DIR / "style_manifest.json", manifest)

    print("[style] guide:", style_path)
    print("[style] glossary:", glossary_path, "| voice_cards:", voice_cards_path)
    print("[style] packs:", packs_path)
    return {
        "style_md": style_path.as_posix(),
        "glossary_json": glossary_path.as_posix(),
        "packs_json": packs_path.as_posix(),
        "voice_cards_json": voice_cards_path.as_posix(),
    }

# Execute immediately to materialize artifacts for downstream cells
STYLE_OUTPUTS = gen_style(BOOK)
print("Style writers ready")

[style] guide: content\style\style_guide.md
[style] glossary: content\style\glossary.json | voice_cards: content\style\voice_cards.json
[style] packs: content\style\packs.json
Style writers ready


In [9]:
# Cell 6 — Outline Generation (pack-aware, deterministic, hardened — minor robustness & logging tweaks)

from __future__ import annotations
import json, re
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

# Assumes from previous cells:
# - CONFIG (BOOK/PL/PATHS), write_text, write_json, read_json, has_file, now_utc_iso,
#   sha1, stable_json_dumps, make_cache_key, log_exc, update_run_manifest
# - router: PMRouter instance whose .author Agent supports stage + cache_payload

BOOK: Dict[str, Any] = CONFIG["book_spec"]
PL: Dict[str, Any]   = CONFIG["pipeline"]
PATHS: Dict[str, str] = CONFIG["paths"]

# -------------------- Local helpers --------------------

def _safe_int(x, default):
    try:
        return int(x)
    except Exception:
        return default

def _looks_like_ipynb(obj) -> bool:
    return isinstance(obj, dict) and {"cells", "metadata", "nbformat"} <= set(obj.keys())

def _json_repair(s: str) -> str:
    """
    Best-effort minimal repairs:
      - remove BOM, smart→straight quotes
      - strip fences & junk before first { or [
      - remove trailing commas before } or ]
      - slice to outermost braces/brackets
    """
    s = (s or "").replace("\r\n", "\n").replace("\r", "\n").strip()
    s = re.sub(r"^```(?:json|markdown)?\s*", "", s, flags=re.I | re.M)
    s = re.sub(r"\s*```$", "", s, flags=re.M)
    if s and s[0] == "\ufeff":
        s = s[1:]
    s = s.replace("“", '"').replace("”", '"').replace("’", "'").replace("‘", "'")
    first_brace = s.find("{"); first_bracket = s.find("[")
    cuts = [p for p in (first_brace, first_bracket) if p != -1]
    if cuts:
        s = s[min(cuts):]
    s = re.sub(r",(\s*[}\]])", r"\1", s)

    def _slice_outer(text, open_ch, close_ch):
        i, j = text.find(open_ch), text.rfind(close_ch)
        return text[i:j+1] if (i != -1 and j != -1 and j > i) else text

    if s.startswith("{"):
        s = _slice_outer(s, "{", "}")
    elif s.startswith("["):
        s = _slice_outer(s, "[", "]")
    return s.strip()

def parse_json_loose(raw: str):
    # 1) strict
    try:
        d = json.loads(raw)
        if _looks_like_ipynb(d): return None
        return d
    except Exception:
        pass
    # 2) repaired
    repaired = _json_repair(raw)
    try:
        d = json.loads(repaired)
        if _looks_like_ipynb(d): return None
        return d
    except Exception as e:
        log_exc("outline_loose_repair", e)
    # 3) slice candidates
    i_obj, j_obj = raw.find("{"), raw.rfind("}")
    i_arr, j_arr = raw.find("["), raw.rfind("]")
    candidates = []
    if i_obj != -1 and j_obj != -1 and j_obj > i_obj:
        candidates.append(raw[i_obj:j_obj+1])
    if i_arr != -1 and j_arr != -1 and j_arr > i_arr:
        candidates.append(raw[i_arr:j_arr+1])
    for cand in candidates:
        try:
            d = json.loads(cand)
            if _looks_like_ipynb(d): continue
            return d
        except Exception as e:
            log_exc("outline_loose_slice", e)
    # 4) last try
    try:
        d = json.loads(repaired)
        if _looks_like_ipynb(d): return None
        return d
    except Exception as e:
        log_exc("outline_loose_raw", e)
        return None

def _hash_list(xs: List[str]) -> str:
    return sha1(stable_json_dumps([x.strip() for x in (xs or [])]))

def _chapter_template(spec: Dict[str, Any]) -> Dict[str, Any]:
    return (spec.get("chapter_template") or {})

def _outline_constraints(spec: Dict[str, Any]) -> List[str]:
    return (spec.get("outline_constraints") or [])

def _seed_titles(spec: Dict[str, Any]) -> List[str]:
    return (spec.get("outline_seed") or [])

def brief(spec: Dict[str, Any]) -> str:
    sg = (spec.get("style_guide") or {})
    tmpl = _chapter_template(spec)
    oc   = _outline_constraints(spec)
    seed = _seed_titles(spec)
    lines = [
        f"Kind: {spec.get('kind','')}",
        f"Pack: {spec.get('genre_pack') or spec.get('structure_pack')}",
        f"OutlineSchema: {spec.get('outline_schema','')}",
        f"Title: {spec.get('title','')}",
        f"Audience: {spec.get('audience','')}",
        f"Goal: {spec.get('goal','')}",
        f"Tone: {spec.get('tone','')}",
        f"Chapters: {spec.get('chapters','')}",
        f"TargetWords: {spec.get('target_length_words','')}",
        f"Style: {sg.get('formatting','')}",
    ]
    if oc:
        lines.append("OutlineConstraints: " + " | ".join(oc))
    if seed:
        lines.append("SeedTitles: " + " | ".join(seed))
    if tmpl:
        sections = ", ".join(tmpl.get("sections", []))
        end_matter = ", ".join(tmpl.get("end_matter", []))
        lines.append(f"ChapterTemplate.sections: [{sections}]")
        if end_matter:
            lines.append(f"ChapterTemplate.end_matter: [{end_matter}]")
    return "\n".join(lines)

# -------------------- Normalizers (ensure schema compliance) --------------------

def _normalize_fiction_outline(d: Dict[str, Any], n: int, spec: Dict[str, Any]) -> Dict[str, Any]:
    tmpl = _chapter_template(spec)
    tmpl_sections = tmpl.get("sections") or [
        "Opening hook scene","Problem escalation","Twist or revelation","Action or moral dilemma","Cliffhanger or resolution"
    ]
    chapters = d.get("chapters") if isinstance(d.get("chapters"), list) else []
    seed = _seed_titles(spec)
    fixed = []
    for i in range(n):
        base = chapters[i] if i < len(chapters) and isinstance(chapters[i], dict) else {}
        num = i + 1
        title = (base.get("title") or (seed[i] if i < len(seed) else f"Chapter {num}")).strip()
        if len(title.split()) > 7:
            title = " ".join(title.split()[:7])
        sections = base.get("sections") if isinstance(base.get("sections"), list) else list(tmpl_sections)
        beats = base.get("beats") if isinstance(base.get("beats"), dict) else {}
        beats = {
            "stakes": beats.get("stakes") or "—",
            "turn_or_reversal": beats.get("turn_or_reversal") or "—",
            "set_piece": beats.get("set_piece") or "—",
            "promise": beats.get("promise") or "—",
        }
        fixed.append({"number": num, "title": title, "sections": sections, "beats": beats})
    out = {
        "chapters": fixed,
        "structure_variant": spec.get("outline_schema") or "thriller_three_act",
    }
    return out

def _normalize_nonfiction_outline(d: Dict[str, Any], n: int, spec: Dict[str, Any]) -> Dict[str, Any]:
    tmpl = _chapter_template(spec)
    tmpl_sections = tmpl.get("sections") or ["Introduction","Argument","Evidence/Case","Counterpoint","Summary"]
    chapters = d.get("chapters") if isinstance(d.get("chapters"), list) else []
    fixed = []
    for i in range(n):
        base = chapters[i] if i < len(chapters) and isinstance(chapters[i], dict) else {}
        num = i + 1
        title = (base.get("title") or f"Chapter {num}").strip()
        if len(title.split()) > 10:
            title = " ".join(title.split()[:10])
        sections = base.get("sections") if isinstance(base.get("sections"), list) else list(tmpl_sections)
        objectives = base.get("objectives") if isinstance(base.get("objectives"), list) else [
            "Clarify the claim", "Present evidence", "Address counterpoints"
        ]
        fixed.append({"number": num, "title": title, "sections": sections, "objectives": objectives})
    out = {
        "chapters": fixed,
        "citation_style": (spec.get("citation_style") or "none"),
        "structure_variant": spec.get("outline_schema") or "howto",
    }
    return out

def _fallback_outline(spec: Dict[str, Any]) -> Dict[str, Any]:
    n = int(spec.get("chapters") or 10)
    if (spec.get("kind") == "fiction"):
        return _normalize_fiction_outline({"chapters":[]}, n, spec)
    else:
        return _normalize_nonfiction_outline({"chapters":[]}, n, spec)

# -------------------- Core: generate outline via Author agent --------------------

def gen_outline(spec: Dict[str, Any], tracker, router) -> Tuple[str, str]:
    pj = PATHS["outline_json"]
    pm = PATHS["outline_md"]
    Path(pj).parent.mkdir(parents=True, exist_ok=True)
    Path(pm).parent.mkdir(parents=True, exist_ok=True)

    # If both artifacts already exist, reuse.
    if has_file(pj) and has_file(pm):
        return pj, pm

    n = int(spec.get("chapters", 10) or 10)
    seed = _seed_titles(spec)
    tmpl = _chapter_template(spec)
    tmpl_sections = tmpl.get("sections") or (["Introduction","Argument","Evidence/Case","Counterpoint","Summary"] if spec.get("kind")=="nonfiction" else
                                             ["Opening hook scene","Problem escalation","Twist or revelation","Action or moral dilemma","Cliffhanger or resolution"])

    # Request strict JSON schema depending on kind
    if spec.get("kind") == "fiction":
        schema_hint = """
Return STRICT MINIFIED JSON ONLY:

{
  "chapters": [
    {
      "number": 1,
      "title": "…",
      "sections": ["…","…","…"],
      "beats": {"stakes":"…","turn_or_reversal":"…","set_piece":"…","promise":"…"}
    }
  ],
  "structure_variant": "hero_journey|thriller_three_act|romance_beat_sheet|mystery_puzzle|…"
}
"""
    else:
        schema_hint = f"""
Return STRICT MINIFIED JSON ONLY:

{{
  "chapters": [
    {{"number": 1, "title": "…", "sections": ["Introduction","Argument","Evidence/Case","Counterpoint","Summary"], "objectives": ["reader will learn …","we will support claim …"]}}
  ],
  "citation_style": "{spec.get('citation_style','none')}",
  "structure_variant": "howto|case_study|essay|report|memoir|history"
}}
"""

    constraints = _outline_constraints(spec)
    brief_block = "BRIEF\n" + brief(spec)
    tmpl_line = "ChapterTemplate.sections=[" + ", ".join(tmpl_sections) + "]"
    seed_line = ("SeedTitles: " + " | ".join(seed)) if seed else "SeedTitles: (none)"

    instr = (
        f"{schema_hint}\n"
        f"- Exactly {n} chapters.\n"
        "- Titles should be concise (≤7 words fiction; ≤10 words nonfiction).\n"
        "- Follow ChapterTemplate sections for each chapter unless clearly inappropriate.\n"
        "- Respect OutlineConstraints.\n"
        "- NO prose, NO markdown fences, JSON ONLY."
    )

    prompt = "\n\n".join([brief_block, tmpl_line, seed_line, "INSTRUCTIONS", instr])

    # Build deterministic cache payload & key
    pv = PL.get("PROMPT_VERSIONS", {}).get("outline", "v3")
    payload = {
        "pv": pv,
        "kind": spec.get("kind"),
        "pack": spec.get("genre_pack") or spec.get("structure_pack"),
        "n": n,
        "seed_hash": _hash_list(seed),
        "tmpl_hash": sha1(stable_json_dumps(tmpl)),
        "oc_hash": _hash_list(constraints),
        "title": spec.get("title","")[:64],
    }
    cache_key = make_cache_key(PL.get("CACHE_PREFIXES", {}).get("outline","oln"), payload)

    # Token budget
    max_t = _safe_int(PL.get("OUTLINE_MAX_TOKENS", 1800), 1800)
    if PL.get("ULTRA_BUDGET_MODE"):
        max_t = max(900, int(max_t * 0.6))

    # Call Author agent (stage="outline")
    R = router.author.run(
        "outline",
        prompt,
        stage="outline",
        cache_payload=payload,
        cache_key=cache_key,
        max_t=max_t,
        force_json=True,
    )
    model_text = (R.get("text") or "").strip()
    try:
        write_text("logs/outline_model_raw.txt", model_text)
    except Exception:
        pass

    d = parse_json_loose(model_text)

    # Retry once with stricter reminder if needed
    if not (isinstance(d, dict) and isinstance(d.get("chapters"), list) and d["chapters"]):
        strict_prompt = prompt + "\n\nSTRICT: Return JSON object only — no prose."
        R2 = router.author.run(
            "outline_retry1",
            strict_prompt,
            stage="outline",
            cache_payload=payload,
            cache_key=cache_key + ":r1",
            max_t=max_t,
            force_json=True,
        )
        model_text2 = (R2.get("text") or "").strip()
        try:
            write_text("logs/outline_model_raw_retry1.txt", model_text2)
        except Exception:
            pass
        d = parse_json_loose(model_text2)

    # Fallback to deterministic local outline
    if not (isinstance(d, dict) and isinstance(d.get("chapters"), list) and d["chapters"]):
        d = _fallback_outline(spec)
        write_json(
            "logs/outline_parse_fallback.json",
            {"reason": "Invalid or empty JSON from model", "ts": now_utc_iso()},
        )

    # Normalize to schema
    if spec.get("kind") == "fiction":
        final = _normalize_fiction_outline(d, n, spec)
    else:
        final = _normalize_nonfiction_outline(d, n, spec)

    # Persist artifacts
    write_json(pj, final)

    # Markdown index (chapters + section headings)
    lines = [f"# Outline: {spec.get('title','')}", ""]
    for ch in final["chapters"]:
        lines.append(f"## Chapter {ch['number']}: {ch['title']}")
        secs = ch.get("sections") or []
        for idx, s in enumerate(secs, 1):
            lines.append(f"- {idx}. {s}")
        if spec.get("kind") == "fiction":
            b = ch.get("beats", {})
            lines.append(f"  - _Beats:_ stakes={b.get('stakes','—')}; turn={b.get('turn_or_reversal','—')}; set_piece={b.get('set_piece','—')}; promise={b.get('promise','—')}")
        lines.append("")
    Path(pm).parent.mkdir(parents=True, exist_ok=True)
    write_text(pm, "\n".join(lines).strip() + "\n")

    # Update run manifest
    try:
        update_run_manifest({
            "title": spec.get("title"),
            "kind": spec.get("kind"),
            "pack": spec.get("genre_pack") or spec.get("structure_pack"),
            "outline_schema": spec.get("outline_schema"),
            "chapters_planned": n,
            "outline_paths": {"json": pj, "md": pm},
            "outline_generated_at": now_utc_iso(),
        })
    except Exception as e:
        log_exc("outline_manifest_update", e)

    return pj, pm

print("Outline generator ready (Cell 6)")

Outline generator ready (Cell 6)


In [10]:
# Cell 6b — Continuity/Knowledge Merge (pack-aware; deterministic hashing)

from __future__ import annotations
from pathlib import Path
from typing import Dict, Any, List, Optional
import json, re

# Assumes from previous cells:
# - CONFIG (BOOK/PL/PATHS), write_text, write_json, read_json, now_utc_iso, sha1, stable_json_dumps, log_exc
# - Outline already written; edited chapters live at content/edits/<ch>/{chapter_xx.md, claims_report.json, continuity_notes.md}

BOOK: Dict[str, Any] = CONFIG["book_spec"]
PL: Dict[str, Any]   = CONFIG["pipeline"]
PATHS: Dict[str, str] = CONFIG["paths"]

def _ensure_dir(p: Path) -> None:
    p.mkdir(parents=True, exist_ok=True)

def _safe_json(path: Path) -> Any:
    try:
        return read_json(path.as_posix()) or {}
    except Exception:
        try:
            return json.loads(path.read_text(encoding="utf-8"))
        except Exception:
            return {}

def _default_ref_paths(kind: str) -> Dict[str, Path]:
    if kind.lower() == "fiction":
        p = Path(PATHS.get("story_bible", "references/story_bible.json"))
        return {"ref": p}
    else:
        p = Path(PATHS.get("knowledge_base", "references/knowledge_base.json"))
        return {"ref": p}

def _load_ref(kind: str) -> Dict[str, Any]:
    rp = _default_ref_paths(kind)["ref"]
    if rp.exists():
        d = _safe_json(rp)
        if isinstance(d, dict):
            return d
    # initialize new structure
    if kind.lower() == "fiction":
        return {
            "characters": {},
            "timeline": [],
            "motifs": [],
            "world_rules": {},
            "threads_open": [],
            "threads_closed": []
        }
    else:
        return {
            "thesis": BOOK.get("thesis","") if "thesis" in BOOK else "",
            "key_claims": [],
            "citations": BOOK.get("citations", []),
            "figures": [],
            "outline_links": []
        }

def _write_ref(kind: str, d: Dict[str, Any]) -> str:
    rp = _default_ref_paths(kind)["ref"]
    _ensure_dir(rp.parent)
    write_json(rp.as_posix(), d)
    # return a stable hash the caching layer can use
    try:
        raw = json.dumps(d, ensure_ascii=False, separators=(",", ":"), sort_keys=True)
    except Exception:
        raw = str(d)
    return sha1(raw)

def _parse_bullets(md: str) -> List[str]:
    out = []
    for ln in (md or "").splitlines():
        s = ln.strip()
        if not s: continue
        if s.startswith(("-", "*")):
            out.append(s.lstrip("-* ").strip())
    return out

def _merge_fiction(bible: Dict[str, Any], ch: int, notes_md: str) -> Dict[str, Any]:
    # Minimal, safe merge: add timeline entry + capture salient bullets from notes
    bullets = _parse_bullets(notes_md)
    if bullets:
        bible.setdefault("timeline", []).append({
            "ch": ch,
            "notes": bullets[:12]  # keep short to avoid bloat
        })
    # Heuristic: detect thread open/close tags from notes like "[open:#id]" "[close:#id]"
    opens = re.findall(r"\[open:#([A-Za-z0-9_-]+)\]", notes_md or "", flags=re.I)
    closes = re.findall(r"\[close:#([A-Za-z0-9_-]+)\]", notes_md or "", flags=re.I)
    for tid in opens:
        if not any(t.get("id")==tid for t in bible.get("threads_open", [])):
            bible.setdefault("threads_open", []).append({"id": tid, "opened_at": ch})
    for tid in closes:
        if not any(t.get("id")==tid for t in bible.get("threads_closed", [])):
            bible.setdefault("threads_closed", []).append({"id": tid, "closed_at": ch})
            # also remove from open if present
            bible["threads_open"] = [t for t in bible.get("threads_open", []) if t.get("id") != tid]
    return bible

def _merge_nonfiction(kb: Dict[str, Any], ch: int, notes_md: str, claims: List[Dict[str, Any]]) -> Dict[str, Any]:
    # Add claims (dedup by normalized text)
    def norm(s: str) -> str:
        return re.sub(r"\s+", " ", (s or "").strip().lower())
    seen = {norm(c.get("text","")) for c in kb.get("key_claims", []) if isinstance(c, dict)}
    for c in claims or []:
        text = (c or {}).get("text","")
        if not text: continue
        if norm(text) in seen: continue
        kb.setdefault("key_claims", []).append({
            "id": c.get("id") or len(kb.get("key_claims", [])) + 1,
            "text": text,
            "importance": "high" if (c.get("confidence") in ("low","med")) else "med",
            "evidence_plan": [c.get("category")] if c.get("category") else []
        })
        seen.add(norm(text))
    # Link chapter to claims added this pass
    added_ids = [c.get("id") for c in kb.get("key_claims", []) if isinstance(c, dict)]
    kb.setdefault("outline_links", [])
    kb["outline_links"].append({"chapter": ch, "claims": added_ids, "citations": []})
    # Notes to kb['notes'] (rolling)
    if notes_md and notes_md.strip():
        kb["notes"] = (kb.get("notes","") + "\n" + notes_md.strip()).strip()
    return kb

def merge_continuity_for_chapter(ch: int, *, kind: Optional[str] = None) -> str:
    """
    Merge editor outputs for a single chapter into Story Bible (fiction) or Knowledge Base (nonfiction).
    Returns new hash string of the reference artifact.
    """
    kind = (kind or BOOK.get("kind") or "fiction").lower()
    base = _load_ref(kind)
    notes_p = Path(f"content/edits/{ch:02d}/continuity_notes.md")
    claims_p = Path(f"content/edits/{ch:02d}/claims_report.json")
    notes_md = notes_p.read_text(encoding="utf-8", errors="ignore") if notes_p.exists() else ""
    claims = _safe_json(claims_p) or {}
    claims_list = claims.get("claims") if isinstance(claims, dict) else []

    if kind == "fiction":
        base = _merge_fiction(base, ch, notes_md)
    else:
        base = _merge_nonfiction(base, ch, notes_md, claims_list if isinstance(claims_list, list) else [])

    h = _write_ref(kind, base)
    # optional: record a small breadcrumb
    try:
        Path("logs").mkdir(exist_ok=True)
        write_json(f"logs/merge_ch{ch:02d}.json", {"chapter": ch, "kind": kind, "hash": h, "ts": now_utc_iso()})
    except Exception as e:
        log_exc("merge_continuity_breadcrumb", e)
    return h

def merge_all_continuity() -> Dict[str, Any]:
    """
    Merge over all planned chapters (from outline or stats).
    """
    outline = _safe_json("content/outline/outline.json")
    n = len(outline.get("chapters", [])) if isinstance(outline.get("chapters"), list) else int((read_json("build/stats.json") or {}).get("chapters_planned") or 0)
    merged = []
    for ch in range(1, max(1, n) + 1):
        try:
            h = merge_continuity_for_chapter(ch, kind=BOOK.get("kind"))
            merged.append({"chapter": ch, "hash": h})
        except Exception as e:
            log_exc(f"merge_ch{ch:02d}", e)
    # quick manifest touch
    try:
        man = read_json("logs/run_manifest.json") or {}
        man["continuity_merged_at"] = now_utc_iso()
        man["continuity_entries"] = merged
        write_json("logs/run_manifest.json", man)
    except Exception as e:
        log_exc("merge_manifest_update", e)
    print(f"[merge] continuity merged for {len(merged)} chapter(s).")
    return {"merged": merged}

print("Continuity/Knowledge Merge ready (Cell 6b)")

Continuity/Knowledge Merge ready (Cell 6b)


In [11]:
# Cell 7 — Per-Chapter Loop (two-pass authoring, tighten micro-pass, strict editor tags, selective research, gate metrics)

from __future__ import annotations
from pathlib import Path
from typing import Dict, List, Tuple, Optional, Any
import json, re

# Assumes from previous cells:
# - CONFIG dict with BOOK/PL/PATHS
# - router: PMRouter instance
# - tracker: CostTracker instance
# - write_text, read_text, write_json, read_json, has_file, now_utc_iso, sha1, sanitize_md
# - compute_chapter_word_targets, log_exc
# - sanitize_chapter_markdown from this cell
# - ChapterEnvelope from utils

BOOK: Dict[str, Any] = CONFIG["book_spec"]
PL: Dict[str, Any]   = CONFIG["pipeline"]
PATHS: Dict[str, str] = CONFIG["paths"]

# ----------------------------- Small utilities -----------------------------

def _ensure_dir(p: Path) -> None:
    p.mkdir(parents=True, exist_ok=True)

def _safe_int(x, default: int) -> int:
    try:
        return int(x)
    except Exception:
        return default

def _safe_float(x, default: float) -> float:
    try:
        return float(x)
    except Exception:
        return default

def _bool(x, default: bool) -> bool:
    if x is None: return default
    if isinstance(x, bool): return x
    s = str(x).strip().lower()
    if s in {"1","true","yes","y","on"}: return True
    if s in {"0","false","no","n","off"}: return False
    return default

# ----------------------------- Path helpers --------------------------------

def ch_dir(ch, sub) -> Path:
    d = Path(f"content/{sub}/{int(ch):02d}")
    _ensure_dir(d)
    return d

def outline_ch(pj: str, ch: int) -> Dict[str, Any]:
    d = read_json(pj) or {}
    for it in d.get("chapters", []):
        if int(it.get("number", -1)) == int(ch):
            return it
    raise KeyError(f"Chapter {ch} not in outline")

# ----------------------------- Parse editor blocks --------------------------

def parse_editor_blocks(t: str) -> Tuple[str, Dict[str, Any], str]:
    """
    Extract three tagged blocks and return (md, claims_dict, notes_md).
    Recognizes both CONTINUITY_OR_ARGUMENT_NOTES_MD and CONTINUITY_NOTES_MD for backward-compat.
    Hardened against missing tags and malformed JSON.
    """
    def ex(tag: str) -> str:
        m = re.search(r"<" + re.escape(tag) + r">\s*([\s\S]*?)\s*</" + re.escape(tag) + r">", t, flags=re.DOTALL | re.IGNORECASE)
        return (m.group(1) if m else "").strip()

    md_raw = ex("DRAFT_EDITED_MD")
    claims_raw = ex("CLAIMS_REPORT_JSON")
    notes_raw = ex("CONTINUITY_OR_ARGUMENT_NOTES_MD") or ex("CONTINUITY_NOTES_MD")

    # Claims JSON
    claims: Dict[str, Any]
    try:
        parsed = json.loads(sanitize_md(claims_raw)) if claims_raw else {}
        if not isinstance(parsed, dict):
            parsed = {"raw": parsed}
        parsed.setdefault("claims", [])
        parsed.setdefault("issues", [])
        parsed.setdefault("notes", "")
        claims = parsed
    except Exception:
        claims = {"raw": sanitize_md(claims_raw or "")}

    return sanitize_md(md_raw), claims, sanitize_md(notes_raw)

# ---------- Generic helpers for end-matter & editor instruction ----------

def end_matter_spec(spec: Dict[str, Any]) -> Dict[str, Any]:
    """
    Build a generic end-matter spec from book_spec.
    Returns:
      { "labels": [{"label": "...", "heading": "### ..."}], "heading_fmt": "### {label}" }
    """
    tmpl = spec.get("chapter_template") or {}
    items = tmpl.get("end_matter") or []
    heading_fmt = tmpl.get("end_matter_heading_format", "### {label}")

    labels = []
    seen = set()
    for x in items:
        if isinstance(x, dict):
            label = (x.get("label") or x.get("title") or str(x)).strip()
            heading = x.get("heading")
        else:
            s = str(x)
            label = s.split(":", 1)[0].strip() if ":" in s else s.strip()
            heading = None
        if not label:
            continue
        key = label.lower()
        if key in seen:
            continue
        seen.add(key)
        labels.append({"label": label, "heading": heading or heading_fmt.format(label=label)})

    return {"labels": labels, "heading_fmt": heading_fmt}

EDITOR_AGENT_PREAMBLE = """
You are EditorAgent, responsible for polishing and validating the AuthorAgent's drafts.

Responsibilities
1) Editing & Cleanup
   - Improve clarity, readability, and flow.
   - Enforce tone, style, and heading rules from the style guide.
   - Remove redundancy, filler, and unnecessary complexity.

2) Fact-Check Preparation
   - Insert [n] markers where claims need supporting evidence.
   - Collect a structured list of those claims for ResearchAgent.

3) Output Requirements
   - Deliver a polished, unified chapter ready for assembly.
   - Produce a parallel claims report mapping [n] markers to unresolved facts.
   - Add brief continuity/argument notes.
""".strip()

def build_editor_instruction(
    dtext: str,
    spec: Dict[str, Any],
    oc: Dict[str, Any],
    ch: int,
    cfg: Dict[str, Any],
    tw: Optional[int] = None,
) -> str:
    """Strict editor instructions with exact tag protocol and end-matter policy."""
    tmpl = spec.get("chapter_template", {}) or {}
    ch_fmt = tmpl.get("chapter_heading_format", "## Chapter {number}: {title}")
    chapter_heading = ch_fmt.format(number=ch, title=oc.get("title", ""))

    render_heads = bool(tmpl.get("render_section_headings", False))
    sec_fmt = tmpl.get("section_heading_format", "### {title}")
    sec_fmt_preview = sec_fmt.replace("{index}", "N").replace("{title}", "Title")

    em = end_matter_spec(spec)
    em_labels = em["labels"]
    em_order_preview = ("\n".join([f"  {i + 1}) {item['heading']}" for i, item in enumerate(em_labels)]) if em_labels else "  (none)")
    target_words = _safe_int(tw, 0) or None

    sg = spec.get("style_guide", {})
    voice = sg.get("voice", "")
    formatting = sg.get("formatting", "")
    terms = ", ".join(sg.get("terminology", []))

    parts: List[str] = [
        EDITOR_AGENT_PREAMBLE,
        "\n\nSTYLE GUIDE SNAPSHOT\n",
        f"- Voice: {voice}\n" if voice else "",
        f"- Formatting: {formatting}\n" if formatting else "",
        f"- Terminology: {terms}\n" if terms else "",
        "\nINPUT_MD:\n",
        dtext,
        "\n\nEditing directives (follow strictly)\n",
        f"- Keep ONE chapter heading at the top, exactly:\n  {chapter_heading}\n",
        "- Do NOT repeat the chapter title inside the body.\n",
        (f"- If section headings are enabled, normalize to: {sec_fmt_preview}; renumber 1..N.\n" if render_heads
         else "- Section headings disabled: remove template beat subheads; keep only organic, helpful subheads.\n"),
        "- End-matter policy (STRICT):\n",
        "  • Keep exactly ONE end-matter block at EOF in the configured order.\n",
        "  • Treat label/colon or bold/italic variants as end-matter and move to EOF.\n",
        "  • Merge duplicates; keep the most informative content.\n",
        "  • Use the configured end-matter heading format:\n",
        em_order_preview, "\n",
        "  • Do NOT invent end-matter if no content exists.\n",
        "- Preserve timeline, names, and technical terms; no new plot facts.\n",
        "- Past tense, third-person limited (fiction) or pack-appropriate voice (nonfiction).\n",
        "- Collapse excessive blank lines; strip trailing spaces.\n",
        (f"- Target ~{target_words} words (±15%).\n" if target_words else ""),
        "- Add [n] sparingly for verifiable claims (tech/ops/AI/geo/policy).\n",
        "\nOUTPUT FORMAT (MANDATORY TAGS)\n",
        "<DRAFT_EDITED_MD>\n",
        "# Full unified chapter in Markdown with exactly one top chapter heading.\n",
        "</DRAFT_EDITED_MD>\n",
        "<CLAIMS_REPORT_JSON>\n",
        '{ "claims": [ {"id":1,"marker":"[1]","text":"…","category":"tech|ops|ai","confidence":"low|med|high"} ], "issues": [], "notes": "" }\n',
        "</CLAIMS_REPORT_JSON>\n",
        "<CONTINUITY_OR_ARGUMENT_NOTES_MD>\n",
        "- Where the chapter ends; character states; items in play; open threads.\n",
        "</CONTINUITY_OR_ARGUMENT_NOTES_MD>\n",
    ]
    return "".join(parts)

# ---------- Post-edit sanitizer (structure enforcement) ----------

def sanitize_chapter_markdown(md: str, spec: Dict[str, Any]) -> str:
    """
    Enforce:
      - exactly one chapter heading (synthesize if missing)
      - unwrap code fences
      - collect end-matter variants to EOF in defined order
      - normalize/remove section heads per template; renumber sequentially if enabled
      - trim trailing spaces & collapse extra blank lines
    """
    tmpl = spec.get("chapter_template") or {}
    md = (md or "").replace("\r\n", "\n").replace("\r", "\n")

    # Single top "## Chapter N: Title"
    ch_heading_rx = re.compile(r"^##\s*Chapter\s+\d+\s*[:\-–—]\s*.+\s*$", re.MULTILINE | re.IGNORECASE)
    lines = md.strip().splitlines()
    first = None
    for i, ln in enumerate(lines):
        if ch_heading_rx.match(ln):
            if first is None:
                first = i
            else:
                lines[i] = ""
    md = "\n".join(lines).strip()
    if not ch_heading_rx.search(md):
        number = _safe_int(spec.get("current_chapter_number", 1), 1)
        title = spec.get("current_chapter_title", "") or spec.get("title", "")
        ch_fmt = tmpl.get("chapter_heading_format", "## Chapter {number}: {title}")
        md = ch_fmt.format(number=number, title=title) + ("\n\n" + md if md else "")

    # Unwrap fenced blocks
    md = re.sub(r"```+(\w+)?\s*(.*?)```+", r"\2", md, flags=re.DOTALL)

    # End-matter collection and rebuild
    def _em_items(bs: Dict[str, Any]) -> List[Dict[str, str]]:
        items = (bs.get("chapter_template") or {}).get("end_matter") or []
        heading_fmt = (bs.get("chapter_template") or {}).get("end_matter_heading_format", "### {label}")
        out, seen = [], set()
        for x in items:
            if isinstance(x, dict):
                label = (x.get("label") or x.get("title") or str(x)).strip()
                heading = x.get("heading") or heading_fmt.format(label=label)
            else:
                s = str(x); label = s.split(":", 1)[0].strip() if ":" in s else s.strip()
                heading = heading_fmt.format(label=label)
            if label and label.lower() not in seen:
                seen.add(label.lower()); out.append({"label": label, "heading": heading})
        return out

    em_items = _em_items(spec)

    def collect_heading_blocks(text: str, item: Dict[str, str]) -> List[str]:
        h = re.escape(item["heading"])
        rx = re.compile(rf"(?ms)^\s*{h}\s*\n(.*?)(?=^\s*###\s+|\Z)")
        return rx.findall(text)

    def collect_label_variants(text: str, items: List[Dict[str, str]]):
        labels = [it["label"] for it in items]
        label_alt = r"|".join([re.escape(l) for l in labels])
        start_rx = re.compile(rf"(?im)^(?:[*_>\s]*)(?:{label_alt})(?:\s*[:\-–—]\s*)(.*)$")
        hits, lines = [], text.splitlines()
        i = 0
        while i < len(lines):
            m = start_rx.match(lines[i])
            if not m: i += 1; continue
            line_clean = re.sub(r"^[*_>\s]+|[*_\s:–—-]+$", "", lines[i]).strip().lower()
            matched_label = next((lab for lab in sorted(labels, key=len, reverse=True) if line_clean.startswith(lab.lower())), None)
            if not matched_label: i += 1; continue
            body_lines, inline_rest = [], m.group(1).rstrip()
            if inline_rest: body_lines.append(inline_rest)
            j = i + 1
            while j < len(lines):
                ln = lines[j]
                if ln.startswith("#") or start_rx.match(ln): break
                if ln.strip() == "" and j + 1 < len(lines) and lines[j + 1].startswith("#"): break
                body_lines.append(ln); j += 1
            hits.append((i, j, matched_label, "\n".join(body_lines).strip()))
            i = j
        return hits

    if em_items:
        found_blocks: Dict[str, List[str]] = {item["heading"]: [] for item in em_items}
        for item in em_items:
            for bl in collect_heading_blocks(md, item):
                found_blocks[item["heading"]].append(bl)

        variants = collect_label_variants(md, em_items)
        if variants:
            drop = [(a, b) for (a, b, _, _) in variants]
            rebuilt, lines = [], md.splitlines()
            for idx in range(len(lines)):
                if any(a <= idx < b for (a, b) in drop): continue
                rebuilt.append(lines[idx])
            md = "\n".join(rebuilt)
            by_label: Dict[str, List[str]] = {}
            for _, _, label_text, body in variants:
                if body.strip(): by_label.setdefault(label_text, []).append(body)
            label_to_heading = {it["label"]: it["heading"] for it in em_items}
            for lab, bodies in by_label.items():
                h = label_to_heading.get(lab)
                if h: found_blocks.setdefault(h, []).extend(bodies)

        # remove canonical headings in body
        for item in em_items:
            h = re.escape(item["heading"])
            md = re.sub(rf"(?ms)^\s*{h}\s*\n(.*?)(?=^\s*###\s+|\Z)", "", md)

        # rebuild EOF block
        em_out = []
        for item in em_items:
            h = item["heading"]
            found = found_blocks.get(h, [])
            if found:
                body = max(found, key=lambda x: len(x.strip())).strip()
                if body:
                    em_out.append(f"{h}\n{body}\n")
        if em_out:
            md = md.rstrip() + "\n\n" + "\n\n".join(em_out).rstrip() + "\n"

    # Section heading normalization/removal
    render_heads = bool(tmpl.get("render_section_headings", False))
    number_sections = bool(tmpl.get("number_sections", False))
    sec_fmt = tmpl.get("section_heading_format", "### {title}")
    known_secs = tmpl.get("sections") or (["Introduction","Argument","Evidence/Case","Counterpoint","Summary"]
                                          if BOOK.get("kind")=="nonfiction" else
                                          ["Opening hook scene","Problem escalation","Twist or revelation","Action or moral dilemma","Cliffhanger or resolution"])
    names_rx = r"|".join([re.escape(n) for n in known_secs])
    sec_rx = re.compile(rf"^(#+)\s+(?:\d+\.\s+)?({names_rx})\s*$", re.IGNORECASE | re.MULTILINE)

    matches = list(sec_rx.finditer(md))
    if matches:
        out_text = md
        offset, count = 0, 0
        for m in matches:
            start, end = m.span()
            start += offset; end += offset
            title = m.group(2).strip()
            if not render_heads:
                repl = ""
            else:
                count += 1
                display_title = f"{count}. {title}" if number_sections else title
                repl = sec_fmt.format(index=count, title=display_title)
            before, after = out_text[:start], out_text[end:]
            out_text = before + repl + after
            offset += (len(repl) - (end - start))
        md = out_text

    # Clean whitespace
    md = "\n".join([ln.rstrip() for ln in md.splitlines()])
    md = re.sub(r"\n{3,}", r"\n\n", md).strip() + "\n"
    return md

# ------------------------- Tighten micro-pass helper ------------------------

def build_author_tighten_prompt(chapter_md: str, ch: int, oc: Dict[str, Any]) -> str:
    """
    Ask AuthorAgent for a cheap 'tighten' pass:
      - improve cadence/clarity/micro-tension (fiction) or argumentative flow (nonfiction)
      - remove repetition/filler, keep facts/events intact
      - keep exactly one chapter heading at the top (do not change chapter title/number)
      - do not invent new facts; preserve continuity
    Output ONLY Markdown (no commentary).
    """
    return "".join([
        "You are AuthorAgent performing a LIGHT TIGHTEN pass.\n",
        "Goals:\n",
        "- Improve cadence and flow; vary sentence length; cut filler.\n",
        "- Preserve all plot facts (fiction) or claims/logic (nonfiction).\n",
        "- Keep exactly one top chapter heading as-is; do NOT alter title/number.\n",
        "- Keep section headings policy consistent with the input.\n",
        "- No code fences. Output ONLY Markdown.\n\n",
        f"CHAPTER_NUMBER: {ch}\n",
        f"CHAPTER_TITLE: {oc.get('title','')}\n\n",
        "INPUT_CHAPTER_MD:\n",
        chapter_md,
    ])

# --------------------------- Gate metric calculators ------------------------

def _gate_value_shift(router, ch: int, md_final: str, envelope) -> Optional[float]:
    """Cheap LLM micro-check: estimate fraction of scenes/sections with a clear state A→B change."""
    try:
        q = (
            "Given the chapter Markdown below, estimate the fraction (0..1) of scenes/sections that contain a clear VALUE SHIFT "
            "(the situation or stakes move from state A to state B). Return ONLY JSON: {\"value_shift_pass_rate\": float}.\n\n"
            + md_final[:25000]
        )
        R = router.editor.run(
            f"ch{ch}_gate_value_shift",
            q,
            stage="gate_check",
            max_t=300,
            force_json=True,
            cache_payload={"pv": PL.get("PROMPT_VERSIONS", {}).get("editor","v3"), "gate": "value_shift", "ch": ch},
            cache_key=sha1(q),
            envelope=envelope,
        ) or {}
        txt = (R.get("text") or "").strip()
        data = json.loads(sanitize_md(txt)) if txt else {}
        v = float(data.get("value_shift_pass_rate", 0.0))
        return max(0.0, min(1.0, v))
    except Exception as e:
        log_exc(f"ch{ch}_gate_value_shift", e)
        return None

def _gate_hook(router, ch: int, md_final: str, envelope) -> Optional[float]:
    """Cheap LLM micro-check: score last ~1200 chars for hook/cliffhanger momentum (0..1)."""
    try:
        tail = md_final[-1200:]
        q = (
            "Score the strength of the CHAPTER END hook/cliffhanger (0..1). A strong hook propels the reader forward with "
            "tension, an unanswered question, or escalated stakes. Return ONLY JSON: {\"hook_score\": float}.\n\n"
            f"CHAPTER_END_SNIPPET:\n{tail}"
        )
        R = router.editor.run(
            f"ch{ch}_gate_hook",
            q,
            stage="gate_check",
            max_t=250,
            force_json=True,
            cache_payload={"pv": PL.get("PROMPT_VERSIONS", {}).get("editor","v3"), "gate": "hook", "ch": ch},
            cache_key=sha1(q),
            envelope=envelope,
        ) or {}
        txt = (R.get("text") or "").strip()
        data = json.loads(sanitize_md(txt)) if txt else {}
        v = float(data.get("hook_score", 0.0))
        return max(0.0, min(1.0, v))
    except Exception as e:
        log_exc(f"ch{ch}_gate_hook", e)
        return None

def _gate_claim_coverage(claims: List[Dict[str, Any]], resolved: List[Dict[str, Any]]) -> Optional[float]:
    """
    Heuristic non-LLM check: coverage = fraction of claims with at least one plausible source/summary.
    A resolved item counts if it has any of: sources/url/citation/refs length > 0.
    """
    try:
        if not claims:
            return 1.0
        # Build quick lookup by marker or id
        def _has_support(it: Dict[str, Any]) -> bool:
            if not isinstance(it, dict): return False
            # any common support fields
            for k in ("sources","urls","citations","refs","references"):
                v = it.get(k)
                if isinstance(v, list) and len(v) > 0:
                    return True
            for k in ("url","doi","citation"):
                if it.get(k): return True
            # nested
            src = it.get("source") or it.get("support")
            if isinstance(src, (list, dict)): return True
            return False

        supported = 0
        # Try to align by marker/id if present
        by_marker = {}
        for r in resolved:
            mk = (str(r.get("marker") or "")).strip()
            if mk: by_marker.setdefault(mk, []).append(r)

        for cl in claims:
            mk = (str(cl.get("marker") or "")).strip()
            if mk and mk in by_marker:
                if any(_has_support(x) for x in by_marker[mk]):
                    supported += 1
                continue
            # fallback: any resolved has support -> partial credit
            if any(_has_support(r) for r in resolved):
                supported += 1

        return max(0.0, min(1.0, supported / max(1, len(claims))))
    except Exception:
        return None

# ------------------------ Main per-chapter processor ------------------------

def process_chapter(ch: int, cfg: Dict[str, Any], tracker, router, pj: str) -> None:
    """
    Two-pass authoring (sectioned draft → LIGHT tighten), strict editor with tagged outputs,
    selective research AFTER editor based on claims, and cheap gate metrics.
    Writes:
      - content/drafts/<ch>/draft.md
      - content/drafts/<ch>/draft_tight.md  (new; used as editor input when present)
      - content/edits/<ch>/chapter_<ch>.md
      - content/edits/<ch>/claims_report.json
      - content/edits/<ch>/continuity_notes.md
      - content/edits/<ch>/gates.json       (new; gate metrics for this chapter)
      - content/research/<ch>/claims_resolved.json (if research enabled & claims)
      - logs/chXX_author_raw_*.txt (raw model outputs)
    """
    ch = int(ch)
    env_cap = _safe_float(cfg.get("CHAPTER_COST_CAP_USD", PL.get("CHAPTER_COST_CAP_USD", 0.25)), 0.25)
    envelope = tracker.child_envelope(f"ch{ch:02d}", cap_usd=env_cap) if tracker else None

    # Chapter directories
    dr = ch_dir(ch, "research")
    dd = ch_dir(ch, "drafts")
    de = ch_dir(ch, "edits")
    _ensure_dir(Path("logs"))

    # Paths
    dp = dd / "draft.md"
    dp_tight = dd / "draft_tight.md"
    meta_path = dd / ".author_call.json"
    ep = de / f"chapter_{ch:02d}.md"
    decp = de / "claims_report.json"
    nop = de / "continuity_notes.md"
    gatep = de / "gates.json"

    # Outline object
    oc = outline_ch(pj, ch)

    # ---- Author draft (sectioned) ----
    need_author = (not dp.exists()) or (dp.stat().st_size < 8) or (sanitize_md(read_text(dp)).strip() == "")
    if need_author:
        tmpl = BOOK.get("chapter_template", {}) or {}
        ch_fmt = tmpl.get("chapter_heading_format", "## Chapter {number}: {title}")
        render_heads = bool(tmpl.get("render_section_headings", False))
        number_sections = bool(tmpl.get("number_sections", False))
        sec_fmt = tmpl.get("section_heading_format", "### {title}")
        sections = tmpl.get("sections") or (
            ["Introduction","Argument","Evidence/Case","Counterpoint","Summary"] if BOOK.get("kind")=="nonfiction" else
            ["Opening hook scene","Problem escalation","Twist or revelation","Action or moral dilemma","Cliffhanger or resolution"]
        )

        per, min_w, max_w, per_section = compute_chapter_word_targets(BOOK, cfg, len(sections), ch_num=ch)
        tw = max(min_w, min(max_w, per))
        per_section = _safe_int(cfg.get("AUTHOR_SECTION_TARGET_WORDS", per_section), per_section)

        ctx_chars = _safe_int(cfg.get("AUTHOR_CONTEXT_CHARS", 6000), 6000)

        em = end_matter_spec(BOOK)
        em_labels = em["labels"]
        em_preview = "\n".join([f"- {item['heading']}" for item in em_labels]) if em_labels else ""
        em_label_names = ", ".join([x["label"] for x in em_labels]) if em_labels else "end-matter items"

        accum: List[str] = []
        last_R = None

        for idx_sec, sec in enumerate(sections, 1):
            sec_title = f"{idx_sec}. {sec}" if number_sections else sec
            sec_heading = sec_fmt.format(index=idx_sec, title=sec_title)
            so_far = "\n".join(accum)[-ctx_chars:] if accum else ""
            chapter_heading = ch_fmt.format(number=ch, title=oc.get("title", ""))

            heading_chunk = (
                f"SECTION_HEADING_MD:\n{sec_heading}\n\n"
                "Begin the section with SECTION_HEADING_MD on its own line, then continue prose.\n"
                if render_heads else ""
            )
            end_matter_chunk = ""
            nonfinal_guard = ""
            if idx_sec == len(sections) and em_labels:
                end_matter_chunk = (
                    "\nAt the very end of THIS FINAL SECTION, append a single end-matter block in this order:\n"
                    + (em_preview + "\n")
                    + "Do not wrap end-matter in code fences.\n"
                )
            else:
                nonfinal_guard = (
                    f"- Do NOT include end-matter here (avoid lines like '{em_label_names}:' or bold/italic variants).\n"
                )

            prompt = "".join([
                "You are AuthorAgent. Draft the requested section only.\n\n",
                "BOOK SPEC:\n", json.dumps(BOOK, ensure_ascii=False, indent=2), "\n\n",
                "OUTLINE_JSON:\n", json.dumps(oc, ensure_ascii=False, indent=2), "\n\n",
                "CHAPTER_SO_FAR_MD:\n", so_far, "\n\n",
                f"Write ONLY Markdown for this section of '{chapter_heading}'.\n",
                f"Section {idx_sec}/{len(sections)}: {sec}\n",
                heading_chunk,
                "- Continue seamlessly from CHAPTER_SO_FAR_MD (no recap).\n",
                "- Maintain voice, POV, and pack style; short, cinematic paragraphs (or crisp argument flow for nonfiction).\n",
                f"- Target ≈ {per_section} words for THIS section.\n",
                "- Do not repeat the chapter title; only section text (plus section heading if enabled).\n",
                "- If you hit the target but the beat/objective isn’t complete, finish it.\n",
                "- Use [n] markers sparingly for verifiable claims.\n",
                nonfinal_guard,
                end_matter_chunk,
            ])

            last_R = router.author.run(
                f"ch{ch}_draft_s{idx_sec}",
                prompt,
                stage="author_block",
                cache_payload={"pv": PL.get("PROMPT_VERSIONS", {}).get("author","v4"), "ch": ch, "sec": idx_sec, "tw": tw, "sections": sections},
                cache_key=sha1(json.dumps({"ch": ch, "sec": idx_sec, "oc": oc, "tw": tw, "sections": sections}, sort_keys=True)),
                max_t=_safe_int(cfg.get("AUTHOR_MAX_TOKENS", 3200), 3200),
                envelope=envelope,
            )

            raw_model = (last_R or {}).get("text", "") or ""
            write_text(f"logs/ch{ch:02d}_author_raw_s{idx_sec}_MODEL.txt", raw_model)
            part = sanitize_md(raw_model)
            write_text(f"logs/ch{ch:02d}_author_raw_s{idx_sec}.txt", part)
            accum.append(part.strip())

        chapter_heading = ch_fmt.format(number=ch, title=oc.get("title", ""))
        full = chapter_heading + "\n\n" + "\n\n".join(accum)
        write_text(dp, full)

        meta = {
            "t": now_utc_iso(),
            "chapter": ch,
            "outline_title": oc.get("title", ""),
            "attempt": "sectioned",
            "target_words": tw,
            "sections": len(sections),
            "draft_chars": len(full),
            "draft_words": len(full.split()),
            "cached": bool(last_R and last_R.get("cached")),
            "usage": last_R.get("usage") if last_R else None,
            "est_cost": last_R.get("est_cost") if last_R else None,
        }
        write_json(meta_path, meta)
    else:
        if not meta_path.exists():
            write_json(meta_path, {"t": now_utc_iso(), "chapter": ch, "note": "Draft existed; AuthorAgent not called this run."})

    # ---- Author TIGHTEN micro-pass (optional, cheap) ----
    use_tighten = _bool(PL.get("AUTHOR_TIGHTEN_ENABLED", False), False) and not _bool(PL.get("ULTRA_BUDGET_MODE", False), False)
    dp_for_editor = dp
    if use_tighten:
        try:
            draft_text_for_tighten = read_text(dp)
            tighten_prompt = build_author_tighten_prompt(draft_text_for_tighten, ch, oc)
            Rt = router.author.run(
                f"ch{ch}_tighten",
                tighten_prompt,
                stage="author_tighten",
                cache_payload={"pv": PL.get("PROMPT_VERSIONS", {}).get("author","v4"), "ch": ch, "tighten": True},
                cache_key=sha1(draft_text_for_tighten),
                max_t=_safe_int(cfg.get("AUTHOR_TIGHTEN_MAX_TOKENS", 16384), 16384),
                envelope=envelope,
            ) or {}
            tightened = sanitize_md(Rt.get("text") or "")
            if tightened.strip():
                write_text(dp_tight, tightened)
                dp_for_editor = dp_tight
        except Exception as e:
            log_exc(f"ch{ch}_author_tighten", e)

    # ---- Editor pass (budget-aware) ----
    if PL.get("ULTRA_BUDGET_MODE"):
        # Minimal path: copy draft and skip edits
        write_text(ep, read_text(dp_for_editor))
        write_json(decp, {"info": "Editor skipped (ULTRA_BUDGET_MODE)", "chapter": ch})
        write_text(nop, "Local cleanup applied; no LLM edit due to ultra budget mode.")
        print(f"[ch{ch:02d}] Editor skipped: ULTRA_BUDGET_MODE")
        return

    draft_text = read_text(dp_for_editor)
    MAX_CHARS = _safe_int(cfg.get("EDITOR_MAX_INPUT_CHARS", PL.get("EDITOR_MAX_INPUT_CHARS", 300_000)), 300_000)
    dtext = draft_text[:MAX_CHARS]

    # compute word target for editor hints if available from meta
    meta = read_json(meta_path) or {}
    tw_for_editor = meta.get("target_words")

    instr = build_editor_instruction(dtext, {**BOOK, "current_chapter_number": ch, "current_chapter_title": oc.get("title","")}, oc, ch, cfg, tw=tw_for_editor)

    # Guard output tokens relative to context cap (matches Cell 4 logic)
    def _tok_est(chars: int) -> int: return max(1, chars // 4)
    EDITOR_MODEL = (cfg.get("EDITOR_MODEL") or PL.get("MODEL_ID_FAST","gpt-4o")).lower()
    MODEL_CAPS = {"gpt-4o": {"ctx": 128_000, "out": 16_384}, "gpt-4o-mini": {"ctx": 128_000, "out": 8_192}}
    caps = MODEL_CAPS.get(EDITOR_MODEL, {"ctx": _safe_int(cfg.get("EDITOR_CONTEXT_TOKENS", 128_000), 128_000),
                                         "out": _safe_int(cfg.get("EDITOR_MAX_TOKENS", 16_384), 16_384)})
    ctx_limit = min(_safe_int(cfg.get("EDITOR_CONTEXT_TOKENS", caps["ctx"]), caps["ctx"]), caps["ctx"])
    safe_margin = _safe_int(cfg.get("EDITOR_SAFE_MARGIN_TOKENS", 1_000), 1_000)
    out_cap = min(_safe_int(cfg.get("EDITOR_MAX_TOKENS", caps["out"]), caps["out"]), caps["out"])
    min_out = min(_safe_int(cfg.get("EDITOR_MIN_TOKENS", 16_384), 16_384), out_cap)
    est_in_tokens = _tok_est(len(dtext))
    desired_out = min(max(0, ctx_limit - est_in_tokens - safe_margin), out_cap)
    if desired_out < min_out:
        max_in_tokens = max(1, ctx_limit - min_out - safe_margin)
        max_in_chars = max_in_tokens * 4
        if len(dtext) > max_in_chars:
            dtext = dtext[-max_in_chars:]
            est_in_tokens = _tok_est(len(dtext))
            desired_out = min(max(0, ctx_limit - est_in_tokens - safe_margin), out_cap)
    editor_max_t = max(min_out, min(desired_out, out_cap))

    try:
        Re = router.editor.run(
            f"ch{ch}_edit",
            instr,
            stage="editor",
            cache_payload={"pv": PL.get("PROMPT_VERSIONS", {}).get("editor","v3"), "ch": ch},
            cache_key=sha1(dtext),
            max_t=editor_max_t,
            envelope=envelope,
        )
    except Exception as e:
        log_exc(f"ch{ch}_editor_call", e)
        Re = {}

    md, cl, nt = parse_editor_blocks((Re or {}).get("text", "") or "")
    md_final = sanitize_chapter_markdown(md or draft_text, {**BOOK, "current_chapter_number": ch, "current_chapter_title": oc.get("title","")})

    write_text(ep, md_final)
    write_json(decp, cl)
    write_text(nop, nt)

    # ---- Selective Research AFTER editor: resolve claims if enabled ----
    claims = (cl or {}).get("claims", []) if isinstance(cl, dict) else []
    resolved: List[Dict[str, Any]] = []
    if PL.get("RESEARCH_ENABLED", True) and claims and getattr(router, "research", None):
        research_prompt = json.dumps({"chapter": ch, "claims": claims}, ensure_ascii=False)
        try:
            Rr = router.research.run(
                f"ch{ch}_research_claims",
                "Resolve the following claims. Return a JSON array as specified in your system prompt.\n\n" + research_prompt,
                stage="research",
                cache_payload={"pv": PL.get("PROMPT_VERSIONS", {}).get("research","v2"), "ch": ch, "claims_hash": sha1(json.dumps(claims, sort_keys=True))},
                cache_key=sha1(research_prompt),
                max_t=_safe_int(cfg.get("RESEARCH_MAX_TOKENS", 1500), 1500),
                force_json=True,
                envelope=envelope,
            )
            research_text = (Rr or {}).get("text", "") or "[]"
        except Exception as e:
            log_exc(f"ch{ch}_research_claims_call", e)
            research_text = "[]"

        # Parse or store raw
        try:
            resolved = json.loads(sanitize_md(research_text))
            if not isinstance(resolved, list): resolved = [{"raw": resolved}]
        except Exception:
            resolved = [{"raw": sanitize_md(research_text)}]

        write_json(dr / "claims_resolved.json", resolved)

    # ---- Gate metrics (cheap, cacheable) ----
    gates_enabled = {
        "value_shift": _bool(PL.get("GATE_VALUE_SHIFT", True), True),
        "hook":        _bool(PL.get("GATE_HOOK", True), True),
        "claim_ev":    _bool(PL.get("GATE_CLAIM_EVIDENCE", True), True),
    }
    metrics: Dict[str, Any] = {
        "chapter": ch,
        "kind": BOOK.get("kind"),
        "t": now_utc_iso(),
        "value_shift_pass_rate": None,
        "hook_score": None,
        "claim_coverage_rate": None,
    }

    if BOOK.get("kind") == "fiction":
        if gates_enabled["value_shift"]:
            metrics["value_shift_pass_rate"] = _gate_value_shift(router, ch, md_final, envelope)
        if gates_enabled["hook"]:
            metrics["hook_score"] = _gate_hook(router, ch, md_final, envelope)
    else:
        # nonfiction
        if gates_enabled["claim_ev"]:
            # compute on claims & resolved research (if any)
            metrics["claim_coverage_rate"] = _gate_claim_coverage(claims, resolved)

    write_json(gatep, metrics)
    write_json(Path(f"logs/ch{ch:02d}_gates.json"), metrics)

    print(f"Chapter processor ready (ch{ch:02d})")

In [12]:
# Cell 8 — Assembly (NO exports) — builds book.md, toc.json, stats.json, and a readable References appendix (nonfiction)

from __future__ import annotations
from pathlib import Path
import json, re
from typing import Dict, Any, List, Tuple, Optional

# Assumes from previous cells:
# - write_text, read_text, write_json, read_json, has_file, now_utc_iso
# - count_words (or compute with simple split)
# - CONFIG["book_spec"] and/or book_spec; if not present, we’ll coalesce safely.

# ----------------------------- helpers -----------------------------

def _coalesce_spec(spec: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """Prefer explicit spec, then CONFIG['book_spec'], then book_spec, else {}."""
    if spec:
        return spec
    try:
        cfg_spec = (CONFIG or {}).get("book_spec")  # type: ignore[name-defined]
        if cfg_spec:
            return cfg_spec
    except Exception:
        pass
    try:
        return book_spec  # type: ignore[name-defined]
    except Exception:
        return {}

def _count_words_local(s: str) -> int:
    try:
        return count_words(s)  # if provided elsewhere
    except Exception:
        return len((s or "").split())

def _count_unresolved_markers(text: str) -> Dict[str, int]:
    """
    KPI-ish unresolved counts:
      - literal [n] markers (selective research placeholders)
      - explicit TODO/TK/TBD tokens
    """
    t = text or ""
    n_markers = len(re.findall(r"\[(?:n|N)\]", t))
    todos = len(re.findall(r"\b(?:TODO|TBD|TK|FIXME)\b", t))
    return {"bracket_n": n_markers, "todo_tokens": todos, "total": n_markers + todos}

def _gather_references_nonfiction(spec: Dict[str, Any]) -> List[Dict[str, Any]]:
    """
    Collect citation-like objects for nonfiction:
      1) references/knowledge_base.json -> 'citations' array (preferred, already structured)
      2) content/research/*/claims_resolved.json -> fallback using 'citations'/'url'/'source' fields
    Output is CSL-JSON-ish dicts with at least 'id' and 'title' and/or 'URL'.
    """
    out: List[Dict[str, Any]] = []

    kb_path = Path("references/knowledge_base.json")
    if kb_path.is_file():
        try:
            kb = read_json(kb_path) or {}
            for c in kb.get("citations", []):
                if not isinstance(c, dict):
                    continue
                cid = str(c.get("id") or c.get("key") or c.get("doi") or c.get("url") or len(out) + 1)
                title = c.get("title") or c.get("container-title") or c.get("journal") or c.get("note") or "Untitled"
                item = {
                    "id": cid,
                    "type": c.get("type") or "article-journal",
                    "title": title,
                    "author": [{"literal": a} for a in (c.get("authors") or c.get("author") or [])] if isinstance(c.get("authors") or c.get("author"), list) else None,
                    "issued": {"date-parts": [[int(c.get("year"))]]} if str(c.get("year") or "").isdigit() else None,
                    "URL": c.get("url") or None,
                    "DOI": c.get("doi") or None,
                    "note": c.get("note") or None,
                }
                out.append({k: v for k, v in item.items() if v})
        except Exception:
            pass

    # Fallback: resolve from per-chapter research outputs
    if not out:
        for p in sorted(Path("content/research").glob("*/claims_resolved.json")):
            try:
                arr = read_json(p) or []
                for it in arr:
                    # try richer 'citations' array first
                    cits = it.get("citations") if isinstance(it, dict) else None
                    if isinstance(cits, list) and cits:
                        for c in cits:
                            if not isinstance(c, dict): continue
                            out.append({
                                "id": f"R{len(out)+1}",
                                "type": c.get("type") or "webpage",
                                "title": c.get("title") or it.get("summary") or "Source",
                                "URL": c.get("url") or c.get("URL"),
                                "issued": {"date-parts": [[int(c.get("year"))]]} if str(c.get("year") or "").isdigit() else None,
                                "note": c.get("note") or None,
                            })
                        continue
                    # else try a single URL/source field
                    url = (it or {}).get("url") or (it or {}).get("URL") or (it or {}).get("source")
                    if url:
                        out.append({
                            "id": f"R{len(out)+1}",
                            "type": "webpage",
                            "title": (it.get("summary") or "Source"),
                            "URL": str(url).strip(),
                        })
            except Exception:
                continue

    # Dedup by (title, URL)
    dedup, seen = [], set()
    for it in out:
        key = (it.get("title"), it.get("URL"))
        if key in seen:
            continue
        seen.add(key); dedup.append(it)
    return dedup

def _append_references_md(md: str, refs: List[Dict[str, Any]]) -> str:
    if not refs:
        return md
    lines = [md.rstrip(), "", "## References", ""]
    for i, r in enumerate(refs, 1):
        title = r.get("title") or r.get("container-title") or "Untitled"
        author_list = r.get("author") or []
        authors = ", ".join(
            [
                (a.get("literal")
                 or ", ".join([x for x in [a.get("family"), a.get("given")] if x]).strip(", "))
                for a in author_list if isinstance(a, dict)
            ]
        ) if isinstance(author_list, list) else ""
        year = ""
        issued = r.get("issued")
        if isinstance(issued, dict):
            dps = issued.get("date-parts")
            if isinstance(dps, list) and dps and isinstance(dps[0], list) and dps[0]:
                year = str(dps[0][0])
        url = r.get("URL") or r.get("url") or ""
        line = f"- [{i}] {title}"
        if authors: line += f" — {authors}"
        if year:    line += f" ({year})"
        if url:     line += f". {url}"
        lines.append(line)
    lines.append("")
    return "\n".join(lines)

# ----------------------------- main -----------------------------

def assemble_book(spec: Optional[Dict[str, Any]] = None) -> Tuple[str, str, str]:
    """
    Build `build/book.md` + `build/toc.json` + `build/stats.json`.
    NO exports here. (Exports are handled elsewhere.)
    Returns: (book_md_path, toc_json_path, stats_json_path)
    """
    spec = _coalesce_spec(spec)
    build_dir = Path("build"); build_dir.mkdir(parents=True, exist_ok=True)

    # Front matter
    title = str(spec.get("title", "Untitled")).strip()
    subtitle = str(spec.get("subtitle", "") or "").strip()
    author = str(spec.get("author", "Unknown Author")).strip()

    fm = [
        f"# {title}",
        f"## {subtitle}" if subtitle else "",
        f"**Author:** {author}",
        "",
        "---",
        "",
    ]

    chs, toc = [], []
    n = int(spec.get("chapters", 0) or 0)

    total_words = 0
    unresolved_total = 0

    for ch in range(1, n + 1):
        # Prefer edited chapter_<ch>.md; fall back to legacy name or draft
        edited_path = Path(f"content/edits/{ch:02d}/chapter_{ch:02d}.md")
        edited_alt  = Path(f"content/edits/{ch:02d}/draft_edited.md")  # legacy
        draft_path  = Path(f"content/drafts/{ch:02d}/draft.md")

        t, src, path_used = "", "missing", None
        if edited_path.exists():
            t, src, path_used = read_text(edited_path), "edit", edited_path
        elif edited_alt.exists():
            t, src, path_used = read_text(edited_alt), "edit", edited_alt
        elif draft_path.exists():
            t, src, path_used = read_text(draft_path), "draft", draft_path
        else:
            t = f"## Chapter {ch}: (missing)\nTODO"
            src, path_used = "missing", None

        chs.append(t.strip())

        # TOC entry = first H2
        tl = next((ln.strip() for ln in t.splitlines() if ln.strip().startswith("## ")), f"## Chapter {ch}")
        toc.append({"chapter": ch, "title_line": tl, "source": src, "path": str(path_used) if path_used else None})

        total_words += _count_words_local(t)
        unresolved_total += _count_unresolved_markers(t)["total"]

    # Build book.md
    md = "\n\n".join([x for x in fm if x] + chs).rstrip() + "\n"

    # Append References for nonfiction builds (if we have any)
    if str(spec.get("kind") or "").lower() == "nonfiction":
        refs = _gather_references_nonfiction(spec)
        if refs:
            md = _append_references_md(md, refs)

    # Write artifacts
    book_md = build_dir / "book.md"
    write_text(book_md.as_posix(), md)
    write_json((build_dir / "toc.json").as_posix(), toc)

    # KPIs / stats
    words_per_10k = max(1, total_words) / 10_000.0
    unresolved_per_10k = round(unresolved_total / words_per_10k, 2) if words_per_10k else float(unresolved_total)
    stats = {
        "ts": now_utc_iso(),
        "title": title,
        "chapters_planned": n,
        "total_words": total_words,
        "unresolved_placeholders_total": unresolved_total,
        "unresolved_per_10k_words": unresolved_per_10k,
    }
    stats_path = build_dir / "stats.json"
    write_json(stats_path.as_posix(), stats)

    print("[assembly] book.md, toc.json, stats.json written (no exports).")
    return book_md.as_posix(), "build/toc.json", stats_path.as_posix()


print("Assembly ready (no exports)")

Assembly ready (no exports)


In [13]:
# Cell 9 — QA (gates & hygiene checks; KPI reporting — with gate roll-ups and thresholds)

from __future__ import annotations
from pathlib import Path
import re, json
from typing import Dict, Any, List, Optional, Tuple

# Assumes from previous cells:
# - read_text, write_json, read_json, count_words (or fallback), now_utc_iso

# ----------------------------- helpers -----------------------------

def _count_words_local(s: str) -> int:
    try:
        return count_words(s)
    except Exception:
        return len((s or "").split())

def _count_unresolved_markers(text: str) -> Dict[str, int]:
    t = text or ""
    n_markers = len(re.findall(r"\[(?:n|N)\]", t))
    todos = len(re.findall(r"\b(?:TODO|TBD|TK|FIXME)\b", t))
    return {"bracket_n": n_markers, "todo_tokens": todos, "total": n_markers + todos}

def _strip_codeblocks(s: str) -> str:
    out, in_code = [], False
    for line in (s or "").splitlines():
        if line.strip().startswith("```"):
            in_code = not in_code
            continue
        if not in_code:
            out.append(line)
    return "\n".join(out)

def _safe_float(x, default=None) -> Optional[float]:
    try:
        if x is None: return default
        return float(x)
    except Exception:
        return default

def _avg(vals: List[Optional[float]]) -> Optional[float]:
    nums = [v for v in vals if isinstance(v, (int, float))]
    return (sum(nums) / len(nums)) if nums else None

# ----------------------------- main QA -----------------------------

def run_qa(spec: Dict[str, Any]):
    """
    Produces dist/qa_report.json with:
      - build presence & hygiene checks
      - unresolved placeholders KPI (≤ 2 per 10k words)
      - gate roll-ups from content/edits/<ch>/gates.json
        * fiction: value_shift_pass_rate avg; hook_score avg
        * nonfiction: claim_coverage_rate avg
      - per-chapter gate snapshots for debugging
    """
    out_dir = Path("dist")
    out_dir.mkdir(parents=True, exist_ok=True)

    rpt: Dict[str, Any] = {"checks": {}, "warnings": [], "metrics": {}, "ts": now_utc_iso(), "gate_metrics": {}}

    book_path = Path("build/book.md")
    ok = book_path.exists() and book_path.stat().st_size > 0
    rpt["checks"]["book_exists"] = bool(ok)
    if not ok:
        write_json(str(out_dir / "qa_report.json"), rpt)
        return rpt

    t = read_text(book_path)
    wc = _count_words_local(t)
    rpt["metrics"]["word_count"] = wc

    # Target window ±10%
    tgt = int(spec.get("target_length_words", 20000))
    low, hi = int(tgt * 0.9), int(tgt * 1.1)
    within = low <= wc <= hi
    rpt["checks"]["word_count_ok"] = bool(within)
    if not within:
        rpt["warnings"].append(f"Word count {wc} vs target {tgt} (acceptable range {low}-{hi}).")

    # Chapters present
    n = int(spec.get("chapters", 0) or 0)
    toc_path = Path("build/toc.json")
    missing_list = []
    if toc_path.exists():
        try:
            toc = read_json(toc_path) or []
            all_present = (len(toc) == n) and all(item.get("source", "unknown") != "missing" for item in toc)
            if not all_present and toc:
                for idx, item in enumerate(toc, start=1):
                    if item.get("source", "unknown") == "missing":
                        missing_list.append(idx)
            rpt["checks"]["all_chapters_present"] = bool(all_present)
        except Exception:
            rpt["checks"]["all_chapters_present"] = False
    else:
        # Heuristic via H2 count
        h2_count = sum(1 for ln in t.splitlines() if ln.strip().startswith("## "))
        rpt["checks"]["all_chapters_present"] = h2_count >= n
        if h2_count < n:
            rpt["warnings"].append(f"Expected {n} chapters but found {h2_count} H2 headings.")

    if missing_list:
        rpt["warnings"].append(f"Chapters marked missing in TOC: {missing_list}")

    # Per-chapter heading sanity: must start with "## Chapter X:"
    bad_headings = []
    for ch in range(1, n + 1):
        block = re.findall(
            rf"^##\s+Chapter\s+{ch}:[\s\S]*?(?=^##\s+Chapter\s+{ch + 1}:|\Z)",
            t, flags=re.M
        )
        if not block:
            bad_headings.append(ch)
    if bad_headings:
        rpt["warnings"].append(f"Chapters missing canonical heading format '## Chapter N: ...': {bad_headings}")

    # Hygiene checks
    text_no_code = _strip_codeblocks(t)
    if "####" in text_no_code:
        rpt["checks"]["heading_levels_ok"] = False
        rpt["warnings"].append("Headings exceed H3 (#### found).")
    else:
        rpt["checks"]["heading_levels_ok"] = True

    if re.search(r"[^\S\r\n]{2,}", t):
        rpt["warnings"].append("Double spaces detected.")
    if re.search(r"[ \t]+$", t, flags=re.M):
        rpt["warnings"].append("Trailing spaces at line ends.")

    # Heuristic: flag long double-quoted passages without any [n] nearby (nonfiction only)
    if str(spec.get("kind","")).lower() == "nonfiction":
        lines = text_no_code.splitlines()
        long_quote_issues = []
        for i, ln in enumerate(lines, 1):
            # look for long quoted substrings (≥ 20 chars between quotes)
            for m in re.finditer(r"“([^”]{20,})”|\"([^\"]{20,})\"", ln):
                window = "\n".join(lines[max(0, i-1):min(len(lines), i+2)])
                if "[n]" not in window:
                    long_quote_issues.append(i)
                    break
        if long_quote_issues:
            rpt["warnings"].append(f"Potential uncited long quotes on lines: {long_quote_issues[:20]}{'…' if len(long_quote_issues)>20 else ''}")

    # Unresolved markers KPI
    unresolved = _count_unresolved_markers(t)
    rpt["metrics"]["unresolved_total"] = unresolved["total"]
    words_per_10k = max(1, wc) / 10_000.0
    unresolved_per_10k = round(unresolved["total"] / words_per_10k, 2)
    rpt["metrics"]["unresolved_per_10k_words"] = unresolved_per_10k
    # KPI threshold from blueprint: ≤2 unresolved per 10k words
    unresolved_kpi_ok = unresolved_per_10k <= 2.0
    rpt["checks"]["unresolved_kpi_ok"] = bool(unresolved_kpi_ok)
    if not unresolved_kpi_ok:
        rpt["warnings"].append(f"Unresolved placeholders {unresolved_per_10k}/10k words (limit ≤ 2).")

    # ---------------- Gate roll-ups (from per-chapter gates.json) ----------------
    # Defaults from blueprint:
    fiction_thresholds  = {"value_shift": 0.95, "hook": 0.90}
    nonfiction_thresholds = {"claim_coverage": 0.95}

    per_ch_gates: List[Dict[str, Any]] = []
    vs_vals: List[Optional[float]] = []
    hk_vals: List[Optional[float]] = []
    cc_vals: List[Optional[float]] = []

    for ch in range(1, n + 1):
        gp = Path(f"content/edits/{ch:02d}/gates.json")
        if not gp.exists():
            per_ch_gates.append({"chapter": ch, "missing": True})
            continue
        g = read_json(gp) or {}
        entry = {"chapter": ch}
        vs = _safe_float(g.get("value_shift_pass_rate"))
        hk = _safe_float(g.get("hook_score"))
        cc = _safe_float(g.get("claim_coverage_rate"))
        if vs is not None:
            entry["value_shift_pass_rate"] = round(vs, 4)
            vs_vals.append(vs)
        if hk is not None:
            entry["hook_score"] = round(hk, 4)
            hk_vals.append(hk)
        if cc is not None:
            entry["claim_coverage_rate"] = round(cc, 4)
            cc_vals.append(cc)
        per_ch_gates.append(entry)

    kind = str(spec.get("kind") or "").lower()
    rpt["gate_metrics"]["per_chapter"] = per_ch_gates

    if kind == "fiction":
        vs_avg = _avg(vs_vals)
        hk_avg = _avg(hk_vals)
        rpt["gate_metrics"]["rollup"] = {
            "value_shift_avg": round(vs_avg, 4) if vs_avg is not None else None,
            "hook_score_avg": round(hk_avg, 4) if hk_avg is not None else None,
            "targets": fiction_thresholds,
        }
        # Pass/Fail against targets (only if at least one value exists)
        if vs_avg is not None:
            vs_ok = vs_avg >= fiction_thresholds["value_shift"]
            rpt["checks"]["kpi_value_shift_ok"] = bool(vs_ok)
            if not vs_ok:
                rpt["warnings"].append(f"Value-Shift KPI avg {vs_avg:.2%} < target {fiction_thresholds['value_shift']:.0%}.")
        if hk_avg is not None:
            hk_ok = hk_avg >= fiction_thresholds["hook"]
            rpt["checks"]["kpi_hook_ok"] = bool(hk_ok)
            if not hk_ok:
                rpt["warnings"].append(f"Hook/Cliffhanger KPI avg {hk_avg:.2%} < target {fiction_thresholds['hook']:.0%}.")
    else:
        cc_avg = _avg(cc_vals)
        rpt["gate_metrics"]["rollup"] = {
            "claim_coverage_avg": round(cc_avg, 4) if cc_avg is not None else None,
            "targets": nonfiction_thresholds,
        }
        if cc_avg is not None:
            cc_ok = cc_avg >= nonfiction_thresholds["claim_coverage"]
            rpt["checks"]["kpi_claim_coverage_ok"] = bool(cc_ok)
            if not cc_ok:
                rpt["warnings"].append(f"Claim-Evidence KPI avg {cc_avg:.2%} < target {nonfiction_thresholds['claim_coverage']:.0%}.")

    # Research / citations sanity for nonfiction (light)
    research_enabled = bool(
        (isinstance(spec.get("research_policy"), dict) and spec["research_policy"].get("enabled")) or
        (globals().get("pipeline_config", {}) or {}).get("RESEARCH_ENABLED") or
        True  # default true in our pipeline
    )
    if kind == "nonfiction" and research_enabled:
        has_markers = "[n]" in t
        rpt["checks"]["citations_present_if_research"] = bool(has_markers)
        if not has_markers:
            rpt["warnings"].append("No [n] markers while research is enabled for nonfiction.")

    # End-matter presence per chapter if defined
    end_matter = (spec.get("chapter_template", {}) or {}).get("end_matter", []) or []
    if end_matter:
        missing_em = []
        for ch in range(1, n + 1):
            block = re.findall(
                rf"^##\s+Chapter\s+{ch}:[\s\S]*?(?=^##\s+Chapter\s+{ch + 1}:|\Z)",
                t, flags=re.M
            )
            if block:
                chapter_md = block[0]
                # Require that each configured label appears somewhere
                missing = []
                for label in end_matter:
                    lab = label.get("label") if isinstance(label, dict) else str(label).split(":",1)[0]
                    if lab and lab not in chapter_md:
                        missing.append(lab)
                if missing:
                    missing_em.append({"chapter": ch, "missing": missing})
        if missing_em:
            rpt["warnings"].append(f"End-matter missing in some chapters: {missing_em}")

    # Outcome
    rpt["outcome"] = "PASS" if not rpt["warnings"] else "PASS_WITH_WARNINGS"

    write_json(str(out_dir / "qa_report.json"), rpt)
    print("[qa] Report written to dist/qa_report.json")
    return rpt


print("QA ready")

QA ready


In [14]:
# Cell 10 — Export (Pandoc with citeproc/CSL, richer logs; copies build/book.md to dist/)

from __future__ import annotations
from pathlib import Path
import json, re, shutil, subprocess, zipfile
from typing import Dict, Any, List, Tuple, Optional

# Assumes from previous cells:
# - read_text, write_text, read_json, write_json
# - assemble_book has already produced build/book.md
# - spec['export'] flags (docx/epub/pdf)

def minimal_docx(p, note="DOCX export unavailable; see dist/book.md"):
    p = Path(p)
    p.parent.mkdir(parents=True, exist_ok=True)
    safe = note.replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;")
    with zipfile.ZipFile(p, "w", compression=zipfile.ZIP_DEFLATED) as z:
        z.writestr(
            "[Content_Types].xml",
            '<?xml version="1.0" encoding="UTF-8" standalone="yes"?>'
            '<Types xmlns="http://schemas.openxmlformats.org/package/2006/content-types">'
            '<Default Extension="rels" ContentType="application/vnd.openxmlformats-package.relationships+xml"/>'
            '<Default Extension="xml" ContentType="application/xml"/>'
            '<Override PartName="/word/document.xml" ContentType="application/vnd.openxmlformats-officedocument.wordprocessingml.document.main+xml"/>'
            "</Types>",
        )
        z.writestr(
            "_rels/.rels",
            '<?xml version="1.0" encoding="UTF-8" standalone="yes"?>'
            '<Relationships xmlns="http://schemas.openxmlformats.org/officeDocument/2006/relationships">'
            '<Relationship Id="rId1" Type="http://schemas.openxmlformats.org/officeDocument/2006/relationships/officeDocument" Target="word/document.xml"/>'
            "</Relationships>",
        )
        z.writestr(
            "word/document.xml",
            '<?xml version="1.0" encoding="UTF-8" standalone="yes"?>'
            '<w:document xmlns:w="http://schemas.openxmlformats.org/wordprocessingml/2006/main">'
            "<w:body><w:p><w:r><w:t>"
            + safe
            + "</w:t></w:r></w:p></w:body></w:document>",
        )

def _pandoc_path() -> Optional[str]:
    return shutil.which("pandoc")

def _pandoc_version(pandoc: Optional[str]) -> Optional[str]:
    if not pandoc:
        return None
    try:
        p = subprocess.run([pandoc, "--version"], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
        return (p.stdout.splitlines() or [""])[0].strip()
    except Exception:
        return None

def _run(cmd: List[str]) -> Tuple[int, str, str]:
    try:
        p = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, check=False)
        return p.returncode, p.stdout, p.stderr
    except Exception as e:
        return -1, "", str(e)

def _coalesce_spec(spec: Dict[str, Any] | None = None) -> Dict[str, Any]:
    if spec: return spec
    try:
        cfg_spec = (CONFIG or {}).get("book_spec")  # type: ignore[name-defined]
        if cfg_spec: return cfg_spec
    except Exception:
        pass
    try:
        return book_spec  # type: ignore[name-defined]
    except Exception:
        return {}

def _gather_refs_for_export(spec: Dict[str, Any]) -> List[Dict[str, Any]]:
    refs: List[Dict[str, Any]] = []
    kb = Path("references/knowledge_base.json")
    if kb.is_file():
        try:
            data = read_json(kb) or {}
            refs = data.get("citations", []) or []
        except Exception:
            refs = []
    if not refs:
        for p in sorted(Path("content/research").glob("*/claims_resolved.json")):
            try:
                arr = read_json(p) or []
                for it in arr:
                    # Prefer structured citations if present
                    cits = (it or {}).get("citations")
                    if isinstance(cits, list) and cits:
                        refs.extend([c for c in cits if isinstance(c, dict)])
                        continue
                    # Fallback to a single URL/source field
                    url = (it or {}).get("url") or (it or {}).get("URL") or (it or {}).get("source")
                    if url:
                        refs.append({"id": f"R{len(refs)+1}", "type": "webpage", "title": (it.get("summary") or "Source"), "URL": str(url).strip()})
            except Exception:
                continue
    # Dedup simple (title, URL)
    dedup, seen = [], set()
    for it in refs:
        key = (it.get("title"), it.get("URL"))
        if key in seen: continue
        seen.add(key); dedup.append(it)
    return dedup

def _detect_pdf_engine() -> Optional[str]:
    for cand in ("tectonic", "xelatex", "pdflatex", "wkhtmltopdf"):
        if shutil.which(cand):
            return cand
    return None

def export_deliverables(spec: Dict[str, Any] | None = None) -> Dict[str, Any]:
    """
    Copy build/book.md to dist/, then try exporting via Pandoc to DOCX/EPUB/PDF
    depending on spec['export'] flags. If Pandoc is missing, produce a minimal
    DOCX placeholder so deliverables always exist.

    Returns a dict with paths, commands, and logs.
    """
    spec = _coalesce_spec(spec)
    dist = Path("dist"); dist.mkdir(parents=True, exist_ok=True)
    build_md = Path("build/book.md")

    results: Dict[str, Any] = {
        "book_md": None, "docx": None, "epub": None, "pdf": None,
        "log": [], "commands": [], "pandoc_version": None,
        "bibliography": None, "csl": None, "flags": spec.get("export") or {"docx": True, "epub": True, "pdf": False}
    }

    # Copy/prepare book.md
    if build_md.exists():
        shutil.copyfile(build_md, dist / "book.md")
        results["book_md"] = (dist / "book.md").as_posix()
        md = read_text(build_md)
    else:
        md = "# Book (missing)\n"
        write_text((dist / "book.md").as_posix(), md)
        results["book_md"] = (dist / "book.md").as_posix()
        results["log"].append("build/book.md missing; wrote placeholder.")

    # Pandoc path & version
    pandoc = _pandoc_path()
    results["pandoc_version"] = _pandoc_version(pandoc)
    flags = results["flags"]

    if not pandoc:
        results["log"].append("pandoc not found; skipping rich exports.")
        # Minimal DOCX placeholder so users still get a .docx
        try:
            minimal_docx("dist/book.docx", note="Pandoc unavailable. Open dist/book.md.")
            results["docx"] = "dist/book.docx"
        except Exception as e:
            results["log"].append(f"Failed to create minimal DOCX: {e}")
        write_json("build/export_log.json", results)
        print("[export] Minimal artifacts written (no pandoc).")
        return results

    # Prepare refs (nonfiction and/or citation_style != 'none')
    citation_style = str(spec.get("citation_style", "none")).lower()
    refs_json = None
    if str(spec.get("kind") or "").lower() == "nonfiction" or citation_style != "none":
        refs = _gather_refs_for_export(spec)
        if refs:
            refs_dir = Path("references"); refs_dir.mkdir(parents=True, exist_ok=True)
            refs_json = refs_dir / "citations.csl.json"
            try:
                write_json(refs_json.as_posix(), refs)
                results["bibliography"] = refs_json.as_posix()
            except Exception as e:
                results["log"].append(f"Failed writing citations.csl.json: {e}")
                refs_json = None

    # Meta YAML
    meta_yaml = Path("build/_meta.yml")
    try:
        meta_yaml.write_text(
            "\n".join([
                f'title: "{spec.get("title","Untitled")}"',
                f'subtitle: "{spec.get("subtitle","")}"' if spec.get("subtitle") else "",
                f'author: "{spec.get("author","Unknown Author")}"',
                'lang: "en-US"',
            ]) + "\n",
            encoding="utf-8",
        )
    except Exception as e:
        results["log"].append(f"Failed to write meta YAML: {e}")

    # Build base command (+ citeproc/CSL if available)
    cite_args: List[str] = []
    if refs_json:
        cite_args = ["--citeproc", f"--bibliography={refs_json.as_posix()}"]
        # Optional CSL: use spec.citation_style if a matching file exists, else pick any known CSL present
        style_map = {
            "apa": "references/apa.csl",
            "chicago": "references/chicago-author-date.csl",
            "ieee": "references/ieee.csl",
        }
        # preferred style path (if present)
        preferred = style_map.get(citation_style)
        csl_path = None
        if preferred and Path(preferred).is_file():
            csl_path = preferred
        else:
            for alt in ["references/style.csl", "references/apa.csl", "references/chicago-author-date.csl", "references/ieee.csl"]:
                if Path(alt).is_file():
                    csl_path = alt
                    break
        if csl_path:
            cite_args += [f"--csl={csl_path}"]
            results["csl"] = csl_path

    base_cmd = [pandoc, (dist / "book.md").as_posix(), "-f", "markdown", "--metadata-file", meta_yaml.as_posix()]
    if cite_args:
        base_cmd += cite_args

    # Optional docx reference template (keeps heading styles consistent if present)
    docx_ref = "references/reference.docx" if Path("references/reference.docx").is_file() else None

    # DOCX
    if flags.get("docx", True):
        out = dist / "book.docx"
        cmd = base_cmd + (["--reference-doc", docx_ref] if docx_ref else []) + ["-o", out.as_posix()]
        results["commands"].append({"format": "docx", "cmd": cmd})
        code, out_s, err_s = _run(cmd)
        if code == 0:
            results["docx"] = out.as_posix()
        else:
            results["log"].append(f"DOCX export failed: {err_s.strip()[:500]}")
            try:
                minimal_docx(out.as_posix(), note="Pandoc failed; open dist/book.md")
                results["docx"] = out.as_posix()
            except Exception as e:
                results["log"].append(f"Minimal DOCX creation failed: {e}")

    # EPUB
    if flags.get("epub", True):
        out = dist / "book.epub"
        cmd = base_cmd + ["--toc", "-o", out.as_posix()]
        results["commands"].append({"format": "epub", "cmd": cmd})
        code, out_s, err_s = _run(cmd)
        if code == 0:
            results["epub"] = out.as_posix()
        else:
            results["log"].append(f"EPUB export failed: {err_s.strip()[:500]}")

    # PDF (requires LaTeX engine)
    if flags.get("pdf", False):
        out = dist / "book.pdf"
        pdf_engine = _detect_pdf_engine()
        cmd = base_cmd + (["--pdf-engine", pdf_engine] if pdf_engine and pdf_engine != "wkhtmltopdf" else []) + ["-o", out.as_posix()]
        results["commands"].append({"format": "pdf", "cmd": cmd, "pdf_engine": pdf_engine})
        code, out_s, err_s = _run(cmd)
        if code == 0:
            results["pdf"] = out.as_posix()
        else:
            if not pdf_engine:
                results["log"].append("PDF export failed: no LaTeX engine detected. Install tectonic/xelatex.")
            else:
                results["log"].append(f"PDF export failed with engine '{pdf_engine}'. Error: {err_s.strip()[:500]}")

    write_json("build/export_log.json", results)
    print("[export] Export results written:", {k: v for k, v in results.items() if k in ("docx","epub","pdf") and v})
    return results

print("Export ready")

Export ready


In [15]:
# Cell 11 — Manifest (enriched with stats, QA, exports, gate KPIs, and log tails)

from pathlib import Path
from datetime import datetime, timezone
import json

def _scan_files(root_dir: str):
    root = Path(root_dir)
    items = []
    if not root.exists():
        return items
    for p in sorted(root.rglob("*")):
        if p.is_file():
            st = p.stat()
            items.append(
                {
                    "path": p.as_posix(),
                    "size_bytes": st.st_size,
                    "mtime": datetime.fromtimestamp(st.st_mtime, timezone.utc)
                    .isoformat()
                    .replace("+00:00", "Z"),
                }
            )
    return items

def _tail_text(path: str, n: int = 50) -> list[str]:
    p = Path(path)
    if not p.exists():
        return []
    lines = p.read_text(encoding="utf-8", errors="ignore").splitlines()
    return lines[-n:]

def manifest(tr, outcome: str):
    Path("dist").mkdir(parents=True, exist_ok=True)

    # Pull in build/qa/export summaries if present
    stats = read_json("build/stats.json") or {}
    qa = read_json("dist/qa_report.json") or {}
    export_log = read_json("build/export_log.json") or {}
    run_manifest = read_json("logs/run_manifest.json") or {}

    # Lift key QA metrics (with defaults if absent)
    qa_checks = qa.get("checks", {})
    qa_metrics = qa.get("metrics", {})
    gate_roll = (qa.get("gate_metrics") or {}).get("rollup") or {}
    gate_per_ch = (qa.get("gate_metrics") or {}).get("per_chapter") or []

    m = {
        "outcome": outcome,
        "generated_at": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
        "files": {
            "build": _scan_files("build"),
            "dist": _scan_files("dist"),
        },
        "cost_summary": (tr.summary() if hasattr(tr, "summary") else None),
        "stats": {
            "title": stats.get("title"),
            "chapters_planned": stats.get("chapters_planned"),
            "total_words": stats.get("total_words") or qa_metrics.get("word_count"),
            "unresolved_placeholders_total": stats.get("unresolved_placeholders_total") or qa_metrics.get("unresolved_total"),
            "unresolved_per_10k_words": stats.get("unresolved_per_10k_words") or qa_metrics.get("unresolved_per_10k_words"),
        },
        "qa": {
            "outcome": qa.get("outcome"),
            "warnings_count": len(qa.get("warnings", [])),
            "warnings": qa.get("warnings", [])[:25],  # keep top warnings short
            "checks": {
                **qa_checks,
                # expose key KPIs explicitly
                "kpi_unresolved_ok": qa_checks.get("unresolved_kpi_ok"),
                "kpi_value_shift_ok": qa_checks.get("kpi_value_shift_ok"),
                "kpi_hook_ok": qa_checks.get("kpi_hook_ok"),
                "kpi_claim_coverage_ok": qa_checks.get("kpi_claim_coverage_ok"),
            },
            "gate_kpis": {
                "rollup": gate_roll,
                "per_chapter": gate_per_ch,
            },
        },
        "exports": {
            "docx": export_log.get("docx"),
            "epub": export_log.get("epub"),
            "pdf": export_log.get("pdf"),
            "log": export_log.get("log"),
        },
        "run_manifest": run_manifest or None,
        "logs_tail": {
            "agent_calls_jsonl_tail": _tail_text("logs/agent_calls.jsonl", 40),
            "last_error_tail": _tail_text("logs/last_error.txt", 60),
        },
    }

    write_json("dist/manifest.json", m)
    print("[manifest] dist/manifest.json written")
    return m

print("Manifest ready")

Manifest ready


In [16]:
# Cell 12 — Run-All / Rerun (aligned with assembly-only Cell 8 and separate export Cell 10)

from pathlib import Path
import os
import json

def make_router():
    tr = CostTracker(pipeline_config["RUN_COST_CAP_USD"])
    rt = PMRouter(tr)
    return rt, tr

def run_all():
    Path("logs").mkdir(parents=True, exist_ok=True)
    if os.getenv("DRY_RUN") == "1":
        print("[run_all] DRY_RUN=1 → skipping API calls; using synthetic outputs.")

    out = "PASS"
    tr = None  # ensure defined for finally

    try:
        rt, tr = make_router()

        # Style & outline
        try:
            gen_style(book_spec)
        except Exception as e:
            log_exc("gen_style", e)

        pj, pm = gen_outline(book_spec, tr, rt)

        # Chapter selection based on config
        total = int(book_spec.get("chapters", 0) or 0)
        if pipeline_config.get("FULL_RUN", True):
            chs = list(range(1, total + 1))
        else:
            sample_n = int(pipeline_config.get("SAMPLE_RUN_CHAPTERS", 2))
            chs = list(range(1, min(total, sample_n) + 1))

        # Per-chapter processing with cost-cap handling
        for ch in chs:
            try:
                process_chapter(ch, pipeline_config, tr, rt, pj)
            except CostCapExceededException as e:
                write_text("logs/last_error.txt", str(e))
                out = "ABORTED_COST_CAP"
                break
            except Exception as e:
                log_exc(f"process_chapter_{ch}", e)
                out = "FAILED_STEP"
                break

        # Assembly (no exports here)
        try:
            assemble_book(book_spec)
        except Exception as e:
            log_exc("assemble_book", e)
            return "FAILED_STEP"

        # QA
        try:
            qa = run_qa(book_spec)
            if qa.get("warnings"):
                out = "PASS_WITH_WARNINGS" if out == "PASS" else out
        except Exception as e:
            log_exc("run_qa", e)
            return "FAILED_STEP"

        # Export (separate cell handles pandoc/fallbacks)
        try:
            export_deliverables()
        except Exception as e:
            log_exc("export_deliverables", e)
            return "FAILED_STEP"

        return out

    except CostCapExceededException as e:
        write_text("logs/last_error.txt", str(e))
        out = "ABORTED_COST_CAP"
    except Exception as e:
        write_text("logs/last_error.txt", "FAILED_STEP: " + str(e))
        out = "FAILED_STEP"
    finally:
        try:
            manifest(tr, out) if tr else None
        except Exception as e:
            # last-ditch logging if manifest itself fails
            write_text("logs/last_error.txt", "MANIFEST_FAILED: " + str(e))

    print("Run done", out)
    return out

def rerun_chapter(n):
    n = int(n)
    # Clear prior artifacts for this chapter
    for sub in ["research", "drafts", "edits"]:
        d = Path(f"content/{sub}/{n:02d}")
        if d.exists():
            for p in d.glob("*"):
                if p.is_file():
                    p.unlink()

    Path("logs").mkdir(parents=True, exist_ok=True)

    rt, tr = make_router()
    pj = "content/outline/outline.json"
    if not Path(pj).exists():
        pj, _ = gen_outline(book_spec, tr, rt)

    process_chapter(n, pipeline_config, tr, rt, pj)
    assemble_book(book_spec)
    run_qa(book_spec)
    export_deliverables()
    manifest(tr, "PASS")
    print("Rerun done", n)

def resume():
    return run_all()

def verify_author_calls():
    from pathlib import Path
    import json

    print("AuthorAgent call verification:\n")
    any_found = False
    # Chapter stamps
    for p in sorted(Path("content/drafts").rglob("*/.author_call.json")):
        any_found = True
        d = json.loads(p.read_text(encoding="utf-8"))
        ch = p.parent.name
        print(
            f'- drafts/{ch}: called @ {d.get("t")} | cached={d.get("cached")} | title="{d.get("outline_title", "")}"'
        )
    if not any_found:
        print("- No .author_call.json stamps found in content/drafts/*")
    print("\nRecent agent call events (tail 8):")
    log = Path("logs/agent_calls.jsonl")
    if log.exists():
        lines = log.read_text(encoding="utf-8").splitlines()[-8:]
        for ln in lines:
            print("  ", ln)
    else:
        print("  logs/agent_calls.jsonl (missing)")

print("Controls ready")

Controls ready


In [17]:
# Cell 13 — Demonstration Mode (sample run; research off for speed; robust crash handling)

from pathlib import Path
import os, sys, shutil, subprocess, traceback
import json
from datetime import datetime, timezone

# Cheap demo run: sample a couple of chapters and DISABLE research for speed
pipeline_config["SAMPLE_RUN_CHAPTERS"] = int(pipeline_config.get("SAMPLE_RUN_CHAPTERS", 2))
pipeline_config["FULL_RUN"] = False
pipeline_config["RESEARCH_ENABLED"] = True  # demo: faster, no web/research agent calls

try:
    outcome = run_all()
except Exception as e:
    Path("dist").mkdir(parents=True, exist_ok=True)
    crash_manifest = {
        "outcome": "FAILED_STEP",
        "failed_step": "run_all_top_level",
        "error": str(e),
        "trace": traceback.format_exc(),
        "t": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
    }
    Path("dist/manifest.json").write_text(
        json.dumps(crash_manifest, indent=2), encoding="utf-8"
    )
    print("run_all crashed. Details saved to dist/manifest.json")
    outcome = "FAILED_STEP"

docx_path = Path("dist/book.docx")
pdf_path = Path("dist/book.pdf")
pdf_path.parent.mkdir(parents=True, exist_ok=True)

def _have_exe(cmd: str) -> bool:
    return shutil.which(cmd) is not None

def convert_docx_to_pdf(src: Path, dst: Path) -> bool:
    """
    Best-effort DOCX→PDF:
      1) docx2pdf (Windows/macOS w/ MS Word)
      2) LibreOffice soffice --headless (cross-platform)
      3) pandoc (if installed)
    Returns True on success, False otherwise.
    """
    # 1) docx2pdf on win/darwin only
    try:
        if sys.platform in ("win32", "darwin"):
            from docx2pdf import convert  # may raise ImportError
            print("Converting DOCX to PDF via docx2pdf...")
            convert(str(src), str(dst))
            return dst.exists() and dst.stat().st_size > 0
        else:
            print("docx2pdf is typically not supported on this platform; skipping that route.")
    except Exception as e:
        print(f"docx2pdf failed: {e}")

    # 2) LibreOffice (soffice)
    if _have_exe("soffice"):
        try:
            print("Converting via LibreOffice (soffice --headless)...")
            outdir = dst.parent
            cmd = [
                "soffice",
                "--headless",
                "--convert-to",
                "pdf",
                "--outdir",
                str(outdir),
                str(src),
            ]
            subprocess.run(cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            produced = outdir / (src.stem + ".pdf")
            if produced.exists():
                if produced != dst:
                    produced.replace(dst)
                return dst.exists() and dst.stat().st_size > 0
        except Exception as e:
            print(f"LibreOffice conversion failed: {e}")

    # 3) Pandoc
    if _have_exe("pandoc"):
        try:
            print("Converting via pandoc...")
            subprocess.run(["pandoc", str(src), "-o", str(dst)], check=True)
            return dst.exists() and dst.stat().st_size > 0
        except Exception as e:
            print(f"pandoc conversion failed: {e}")

    print("No PDF converter available (docx2pdf/soffice/pandoc not working). Skipping PDF export.")
    return False

if docx_path.exists():
    success = convert_docx_to_pdf(docx_path, pdf_path)
    if success:
        print(f"PDF saved at: {pdf_path}")
    else:
        print("PDF export skipped or failed. See messages above.")
else:
    print("DOCX file not found. Cannot export PDF.")

m = read_json("dist/manifest.json") or {}
print("Outcome:", m.get("outcome", outcome))
print("Cost:", m.get("cost_summary"))

def tree(d):
    d = Path(d)
    if not d.exists():
        print(f"{d.as_posix()} (missing)")
        return
    for p in sorted(d.rglob("*")):
        if p.is_file():
            rel = p.relative_to(d).as_posix()
            print(f"{d.as_posix()}/{rel} ({p.stat().st_size} bytes)")

print("\nBuild:")
tree("build")

print("\nDist:")
tree("dist")

[style] guide: content\style\style_guide.md
[style] glossary: content\style\glossary.json | voice_cards: content\style\voice_cards.json
[style] packs: content\style\packs.json
Chapter processor ready (ch01)
[assembly] book.md, toc.json, stats.json written (no exports).
[qa] Report written to dist/qa_report.json
[export] Export results written: {'docx': 'dist/book.docx', 'epub': 'dist/book.epub'}
[manifest] dist/manifest.json written
Converting DOCX to PDF via docx2pdf...


  from .autonotebook import tqdm as notebook_tqdm
100%|██████████| 1/1 [00:05<00:00,  5.36s/it]

PDF saved at: dist\book.pdf
Cost: {'total_spent_usd': 0.156982, 'run_cap_usd': 3.0, 'remaining_usd': 2.843018, 'events': 20, 'ts': '2025-09-12T22:12:32.601962Z'}

Build:
build/_meta.yml (154 bytes)
build/book.md (27777 bytes)
build/export_log.json (1283 bytes)
build/stats.json (221 bytes)
build/toc.json (2216 bytes)

Dist:
dist/book.docx (21746 bytes)
dist/book.epub (16260 bytes)
dist/book.md (27777 bytes)
dist/book.pdf (109597 bytes)
dist/manifest.json (12900 bytes)
dist/qa_report.json (4129 bytes)





In [18]:
# TRIAGE CELL — diagnose failing step and what's been produced (robust, actionable, with snapshot)

from __future__ import annotations
from pathlib import Path
from datetime import datetime, timezone
import json, os, re, shutil, subprocess

# ---------------------------- small local helpers ----------------------------

def _safe_json(path: str):
    p = Path(path)
    if not p.exists():
        return {}
    try:
        return json.loads(p.read_text(encoding="utf-8"))
    except Exception as e:
        print(f"[triage] Couldn't parse {path}: {e}")
        return {}

def _safe_text(path: str, limit: int | None = None) -> str:
    p = Path(path)
    if not p.exists():
        return ""
    s = p.read_text(encoding="utf-8", errors="replace")
    return s if limit is None else (s[:limit] + ("..." if len(s) > limit else ""))

def _tree(d: str):
    dpath = Path(d)
    if not dpath.exists():
        print(f"{dpath.as_posix()} (missing)")
        return
    for p in sorted(dpath.rglob("*")):
        if p.is_file():
            rel = p.relative_to(dpath).as_posix()
            print(f"{dpath.as_posix()}/{rel} ({p.stat().st_size} bytes)")

def _exists_size(p: str) -> tuple[bool, int]:
    q = Path(p)
    return (q.exists(), (q.stat().st_size if q.exists() else 0))

def _sum_dir_bytes(d: str) -> int:
    root = Path(d)
    if not root.exists():
        return 0
    total = 0
    for p in root.rglob("*"):
        if p.is_file():
            total += p.stat().st_size
    return total

def _which(cmd: str) -> str | None:
    return shutil.which(cmd)

def _pandoc_version() -> str | None:
    exe = _which("pandoc")
    if not exe:
        return None
    try:
        out = subprocess.run([exe, "--version"], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
        return (out.stdout.splitlines() or [""])[0].strip()
    except Exception:
        return None

def _pdf_engine_detect() -> str | None:
    for cand in ("tectonic", "xelatex", "pdflatex", "wkhtmltopdf"):
        if _which(cand):
            return cand
    return None

def _now_iso():
    return datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")

def _outline_chapter_count() -> int:
    d = _safe_json("content/outline/outline.json")
    if isinstance(d.get("chapters"), list):
        return len(d["chapters"])
    s = _safe_json("build/stats.json")
    return int(s.get("chapters_planned") or 0)

def _missing_chapters(n: int) -> list[int]:
    missing = []
    for ch in range(1, max(1, n) + 1):
        ep = Path(f"content/edits/{ch:02d}/chapter_{ch:02d}.md")
        if not ep.exists():
            missing.append(ch)
    return missing

# ---------------------------- 1) Manifest snapshot ---------------------------

man = _safe_json("dist/manifest.json")
print("=== TRIAGE @", _now_iso(), "===\n")

print("Outcome:", man.get("outcome"))
print("Failed step:", man.get("failed_step") or man.get("last_step") or man.get("step"))
print("Notes:", man.get("notes"))
print("Artifacts (legacy key):", man.get("artifacts"))

# Cost summary (if present)
cost = man.get("cost_summary") or {}
if cost:
    print("\nCost summary:")
    print("  total_spent_usd:", cost.get("total_spent_usd"))
    print("  run_cap_usd    :", cost.get("run_cap_usd"))
    print("  remaining_usd  :", cost.get("remaining_usd"))
    print("  log_items      :", cost.get("log_items"))

# Build stats
stats = man.get("stats") or _safe_json("build/stats.json")
if stats:
    print("\nBuild stats:")
    print("  title                    :", stats.get("title"))
    print("  chapters_planned         :", stats.get("chapters_planned"))
    print("  total_words              :", stats.get("total_words"))
    print("  unresolved_placeholders  :", stats.get("unresolved_placeholders_total"))
    print("  unresolved_per_10k_words :", stats.get("unresolved_per_10k_words"))

# QA
qa = man.get("qa") or _safe_json("dist/qa_report.json")
if qa:
    print("\nQA:")
    print("  outcome         :", qa.get("outcome"))
    wc = qa.get("warnings")
    print("  warnings_count  :", len(wc) if isinstance(wc, list) else wc)
    if isinstance(wc, list) and wc:
        print("  warnings (first 5):")
        for w in wc[:5]:
            print("   -", w)

    # Gate KPIs (rollups)
    gm = (qa.get("gate_metrics") or {})
    roll = gm.get("rollup") or {}
    if roll:
        print("  gate_rollup     :", roll)

# Exports (from export_log)
exports = man.get("exports") or _safe_json("build/export_log.json")
if exports:
    print("\nExports:")
    print("  docx:", exports.get("docx"))
    print("  epub:", exports.get("epub"))
    print("  pdf :", exports.get("pdf"))
    if exports.get("pandoc_version"):
        print("  pandoc_version  :", exports.get("pandoc_version"))
    if exports.get("log"):
        print("  log (first 3):")
        for line in (exports["log"][:3] if isinstance(exports["log"], list) else [exports["log"]]):
            print("   -", line)

# ---------------------------- 2) Recent logs & tails -------------------------

# JSON logs (newest first)
logs_json = sorted(Path("logs").glob("*.json"), key=lambda p: p.stat().st_mtime, reverse=True)
print("\nRecent JSON logs:", [p.name for p in logs_json[:6]])

# JSONL agent calls tail
agent_calls_tail = _safe_text("logs/agent_calls.jsonl", limit=4000)
if agent_calls_tail:
    print("\nagent_calls.jsonl (tail ~4k chars):\n")
    txt = Path("logs/agent_calls.jsonl").read_text(encoding="utf-8", errors="replace")
    print(txt[-4000:])

# last_error.txt
last_err = _safe_text("logs/last_error.txt", limit=2000)
if last_err:
    print("\nlast_error.txt (head 2k chars):\n")
    print(last_err)

# run manifest (if any)
run_manifest = _safe_json("logs/run_manifest.json")
if run_manifest:
    print("\nlogs/run_manifest.json (keys):", list(run_manifest.keys())[:10])

# ---------------------------- 3) Artifact presence ---------------------------

b_exists, b_size = _exists_size("build/book.md")
d_exists, d_size = _exists_size("dist/book.docx")
e_exists, e_size = _exists_size("dist/book.epub")
p_exists, p_size = _exists_size("dist/book.pdf")
toc_exists, _ = _exists_size("build/toc.json")

print("\nArtifacts presence:")
print("  build/book.md :", b_exists, "| size:", b_size)
print("  build/toc.json:", toc_exists)
print("  dist/book.docx:", d_exists, "| size:", d_size)
print("  dist/book.epub:", e_exists, "| size:", e_size)
print("  dist/book.pdf :", p_exists, "| size:", p_size)

# Check pandoc availability (useful if exports failed)
print("\nBinaries available:")
print(f"  pandoc   :", bool(_which("pandoc")), "|", (_pandoc_version() or "(version n/a)"))
print(f"  pdf eng. :", _pdf_engine_detect() or "(none detected)")
for exe in ["soffice"]:
    print(f"  {exe:8}:", bool(_which(exe)))

# ---------------------------- 4) Directory trees -----------------------------

print("\nBuild tree:")
_tree("build")
print("\nDist tree:")
_tree("dist")

# Sizes of heavy dirs (cache transparency)
print("\nDirectory sizes (bytes):")
for d in ["cache", "content/drafts", "content/edits", "content/research"]:
    print(f"  {d:18}:", _sum_dir_bytes(d))

# ---------------------------- 5) Diagnostics & advice ------------------------

def _diagnose():
    outcome = (man.get("outcome") or "").upper()
    failed = (man.get("failed_step") or man.get("last_step") or man.get("step") or "").upper()

    advice = []

    if outcome in ("ABORTED_COST_CAP",):
        rem = (man.get("cost_summary") or {}).get("remaining_usd")
        advice.append(f"- Run aborted by cost cap. Remaining ${rem}. Raise RUN_COST_CAP_USD/CHAPTER_COST_CAP_USD or enable ULTRA_BUDGET_MODE, then rerun.")
    elif outcome in ("FAILED_STEP",):
        advice.append("- A step failed. Check logs/last_error.txt above and the agent_calls.jsonl tail for the exact API error.")
    elif outcome in ("PASS_WITH_WARNINGS",):
        advice.append("- Build passed with warnings. See QA warnings above; address end-matter gaps, heading issues, or unresolved [n] markers.")
    elif outcome in ("PASS",):
        advice.append("- Build succeeded. If exports are missing, ensure pandoc (and LaTeX for PDF) is installed and rerun export_deliverables().")

    # Specific artifacts guidance
    if b_exists and not d_exists:
        advice.append("- book.md exists but DOCX is missing. Run export_deliverables(); if pandoc is missing, the pipeline should still create a minimal DOCX.")
    if not b_exists:
        advice.append("- book.md missing: run assemble_book(spec) after resolving chapter generation.")
    if stats and isinstance(stats.get("unresolved_per_10k_words"), (int, float)) and stats["unresolved_per_10k_words"] > 2:
        advice.append(f"- Unresolved placeholders exceed KPI ({stats['unresolved_per_10k_words']}/10k). Run selective research or patch drafts to resolve [n]/TODO/TK.")

    # Gate KPI nudges
    gm = (qa.get("gate_metrics") or {}) if qa else {}
    roll = gm.get("rollup") or {}
    if roll:
        if "value_shift_avg" in roll and isinstance(roll["value_shift_avg"], (int, float)) and roll["value_shift_avg"] < 0.95:
            advice.append("- Fiction KPI: Value-Shift average below 95%. Re-run tightening or tweak beats for weak scenes.")
        if "hook_score_avg" in roll and isinstance(roll["hook_score_avg"], (int, float)) and roll["hook_score_avg"] < 0.90:
            advice.append("- Fiction KPI: Hook/Cliffhanger average below 90%. Strengthen final beats per chapter.")
        if "claim_coverage_avg" in roll and isinstance(roll["claim_coverage_avg"], (int, float)) and roll["claim_coverage_avg"] < 0.95:
            advice.append("- Nonfiction KPI: Claim coverage below 95%. Add [n] markers where missing and run research to attach sources.")

    # Missing chapters quick check
    n_plan = _outline_chapter_count()
    missing = _missing_chapters(n_plan) if n_plan else []
    if missing:
        advice.append(f"- Missing edited chapters: {missing}. Re-run process_chapter() for those numbers.")

    # Top QA warnings, again
    if qa and isinstance(qa.get("warnings"), list) and qa["warnings"]:
        advice.append("- Top QA fixes:")
        for w in qa["warnings"][:2]:
            advice.append(f"    • {w}")

    # Show advice
    if advice:
        print("\nNext actions:")
        for line in advice:
            print(line)

_diagnose()

# ---------------------------- 6) Snapshot & optional verify ------------------

def triage_snapshot() -> dict:
    """Write dist/triage_snapshot.json and return the snapshot dict."""
    outline = _safe_json("content/outline/outline.json")
    qa_rpt  = qa or _safe_json("dist/qa_report.json")
    build_s = stats or _safe_json("build/stats.json")
    manifest= man or _safe_json("dist/manifest.json")
    export  = exports or _safe_json("build/export_log.json")

    n = _outline_chapter_count()
    missing = _missing_chapters(n) if n else []

    snap = {
        "generated_at": _now_iso(),
        "title": outline.get("title") or build_s.get("title"),
        "chapters_planned": n,
        "chapters_done": (n - len(missing)) if n else None,
        "chapters_missing": missing,
        "qa_outcome": qa_rpt.get("outcome") if isinstance(qa_rpt, dict) else None,
        "unresolved_per_10k": (qa_rpt.get("metrics") or {}).get("unresolved_per_10k_words") if isinstance(qa_rpt, dict) else build_s.get("unresolved_per_10k_words"),
        "gate_rollup": ((qa_rpt.get("gate_metrics") or {}).get("rollup") if isinstance(qa_rpt, dict) else None),
        "exports": {"docx": export.get("docx"), "epub": export.get("epub"), "pdf": export.get("pdf")},
        "cost_summary": manifest.get("cost_summary"),
        "binaries": {"pandoc": _pandoc_version(), "pdf_engine": _pdf_engine_detect()},
        "artifacts": {
            "book_md": Path("build/book.md").exists(),
            "docx": Path("dist/book.docx").exists(),
            "epub": Path("dist/book.epub").exists(),
            "pdf": Path("dist/book.pdf").exists(),
        },
    }
    Path("dist").mkdir(parents=True, exist_ok=True)
    Path("dist/triage_snapshot.json").write_text(json.dumps(snap, ensure_ascii=False, indent=2), encoding="utf-8")
    return snap

snap = triage_snapshot()
print("\n[triage] Snapshot written to dist/triage_snapshot.json")

# Optional: verify author calls if helper exists
try:
    verify_author_calls()
except Exception as e:
    print("\n[triage] verify_author_calls() unavailable or failed:", e)

print("Triage ready")

=== TRIAGE @ 2025-09-12T22:12:41.964377Z ===

Failed step: None
Notes: None
Artifacts (legacy key): None

Cost summary:
  total_spent_usd: 0.156982
  run_cap_usd    : 3.0
  remaining_usd  : 2.843018
  log_items      : None

Build stats:
  title                    : Ghost Protocol: The Oracle Gambit
  chapters_planned         : 18
  total_words              : 4732
  unresolved_placeholders  : 17
  unresolved_per_10k_words : 35.93

QA:
   - Word count 4752 vs target 95000 (acceptable range 85500-104500).
   - Chapters marked missing in TOC: [2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18]
   - Unresolved placeholders 35.77/10k words (limit ≤ 2).
   - Value-Shift KPI avg 90.00% < target 95%.
   - Hook/Cliffhanger KPI avg 85.00% < target 90%.

Exports:
  docx: dist/book.docx
  epub: dist/book.epub
  pdf : None
  log (first 3):
   - PDF export failed: no LaTeX engine detected. Install tectonic/xelatex.

Recent JSON logs: ['ch01_gates.json', 'cost.json', 'run_manifest.json']

age