# Stage 06 — Build knowledge graph (parquet-first, QA gates, interactive viz)

This notebook consumes:
- `clusters_semantic.parquet` (Stage 04)
- `items_after_agent1.parquet` (Stage 04)
- `cluster_links_verified.parquet` (Stage 05; falls back to `cluster_links.parquet` when the verified file is absent)

and produces a **Neo4j/RDF-ready knowledge graph**:

- `exports/stage_06_kg/kg_nodes.parquet`
- `exports/stage_06_kg/kg_edges.parquet`
- `exports/stage_06_kg/kg_provenance.parquet`
- `exports/stage_06_kg/kg_summary.json`
- `exports/stage_06_kg/neo4j_csv/` and optional `kg.ttl`
- `exports/stage_06_kg/graph_pyvis.html` (interactive)

**New in this refactor**
- KG health gates + diagnostics (`kg_quality.py`)
- Interactive viz (`pyvis`)
- Cluster relationship edges from Agent 2 are added *with evidence attributes*.


In [None]:
# --- Colab-first setup ---
import os, sys, time
from pathlib import Path

# Notebook-level run switches.
# FORCE_REBUILD: when True, the KG build cell recomputes the tables even if the
# kg_*.parquet outputs already exist on disk.
FORCE_REBUILD = False
# NOTE(review): FAST_MODE, EDA_LEVEL, SHOW_PLOTS and SAVE_PLOTS are not read by
# any code visible in this notebook; presumably shared boilerplate consumed by
# other stages or helpers — confirm before relying on them.
FAST_MODE = True
EDA_LEVEL = "core"

SHOW_PLOTS = True
SAVE_PLOTS = True

# Directory searched recursively for pipeline_config.yaml when resolving the
# project root (only used if it exists, i.e. after Drive has been mounted).
DRIVE_SEARCH_BASE = "/content/drive/MyDrive"

def _is_colab() -> bool:
    return "google.colab" in sys.modules

# Mount Google Drive only when running inside Colab; DRIVE_SEARCH_BASE lives
# under this mount point. Outside Colab the import is never attempted.
if _is_colab():
    from google.colab import drive  # type: ignore
    drive.mount("/content/drive")

def _resolve_project_root() -> Path:
    ev = os.environ.get("HISTO_PROJECT_ROOT")
    if ev and Path(ev).exists():
        return Path(ev)

    base = Path(DRIVE_SEARCH_BASE)
    candidates = []
    if base.exists():
        for p in base.glob("**/pipeline_config.yaml"):
            parent = p.parent
            if (parent / "label_taxonomy.yaml").exists():
                candidates.append(parent)
    if candidates:
        candidates = sorted(candidates, key=lambda p: p.stat().st_mtime, reverse=True)
        return candidates[0]

    p = Path.cwd()
    for _ in range(10):
        if (p / "pipeline_config.yaml").exists():
            return p
        p = p.parent
    raise FileNotFoundError("Could not resolve PROJECT_ROOT. Set HISTO_PROJECT_ROOT env var.")

PROJECT_ROOT = _resolve_project_root()
# Put the project first on sys.path; presumably this is what makes the
# `histo_cartography` imports in later cells resolve.
sys.path.insert(0, str(PROJECT_ROOT))
print("PROJECT_ROOT:", PROJECT_ROOT)

# Install deps
import subprocess
subprocess.check_call([sys.executable, "-m", "pip", "-q", "install", "-r", str(PROJECT_ROOT / "requirements.txt")])

# Imported only after the pip install above, which may be what provides PyYAML.
import yaml
# safe_load() returns None for an empty document, so fall back to an empty
# mapping instead of crashing on the .get() calls below. The encoding is pinned
# so the result does not depend on the platform default.
cfg = yaml.safe_load((PROJECT_ROOT / "pipeline_config.yaml").read_text(encoding="utf-8")) or {}

# A section that is present but empty (`paths:` with no value) loads as None;
# treat it the same as a missing section.
_paths_cfg = cfg.get("paths") or {}
_project_cfg = cfg.get("project") or {}

EXPORTS_DIR = PROJECT_ROOT / str(_paths_cfg.get("exports_dir", "exports"))
EXPORTS_DIR.mkdir(parents=True, exist_ok=True)

SAFE_MODE = bool(_project_cfg.get("safe_mode", True))
SEED = int(_project_cfg.get("seed", 1337))

print("SAFE_MODE:", SAFE_MODE)


In [None]:
# --- Stage paths + registries ---
from pathlib import Path
import pandas as pd

from histo_cartography.viz import ensure_dir, save_and_display, register_plot
from histo_cartography.artifact_registry import register_artifact, append_stage_manifest
from histo_cartography.critic import run_critic, write_critic_report, critic_result_table, critic_issues_table

# Upstream inputs produced by Stage 04.
stage_in_clusters = EXPORTS_DIR / "stage_04_agent1_cleanup" / "clusters_semantic.parquet"
stage_in_items = EXPORTS_DIR / "stage_04_agent1_cleanup" / "items_after_agent1.parquet"

# Prefer verified links if available; otherwise fallback to raw cluster_links
stage_in_links_verified = EXPORTS_DIR / "stage_05_agent2_linking" / "cluster_links_verified.parquet"
stage_in_links_raw = EXPORTS_DIR / "stage_05_agent2_linking" / "cluster_links.parquet"

# Validate inputs with real exceptions: `assert` statements are removed when
# Python runs with -O, which would let a missing file surface later as an
# opaque parquet read error.
for _required_input in (stage_in_clusters, stage_in_items):
    if not _required_input.exists():
        raise FileNotFoundError(f"missing: {_required_input}")

links_path = stage_in_links_verified if stage_in_links_verified.exists() else stage_in_links_raw
if not links_path.exists():
    raise FileNotFoundError(f"missing links parquet: {links_path}")

# Output locations for this stage.
stage_dir = EXPORTS_DIR / "stage_06_kg"
plots_dir = ensure_dir(stage_dir / "plots")
qa_dir = ensure_dir(stage_dir / "qa")
neo4j_dir = ensure_dir(stage_dir / "neo4j_csv")

out_nodes_path = stage_dir / "kg_nodes.parquet"
out_edges_path = stage_dir / "kg_edges.parquet"
out_prov_path = stage_dir / "kg_provenance.parquet"
out_summary_path = stage_dir / "kg_summary.json"

# Accumulates plot registrations; written out as the viz index in the last cell.
viz_records = []

print("links_path:", links_path)
print("stage_dir:", stage_dir)


In [None]:
# --- Load upstream artifacts ---
import pandas as pd
from IPython.display import display

# Read the three upstream tables in the same order as before:
# semantic clusters, cleaned items, then the chosen cluster-links file.
clusters_semantic, items_after_agent1, cluster_links = (
    pd.read_parquet(parquet_path)
    for parquet_path in (stage_in_clusters, stage_in_items, links_path)
)

# Quick visual preview of the cluster and link tables.
for preview_frame in (clusters_semantic, cluster_links):
    display(preview_frame.head(5))

# Shape report for all three inputs.
for table_name, table in (
    ("clusters_semantic", clusters_semantic),
    ("items_after_agent1", items_after_agent1),
    ("cluster_links", cluster_links),
):
    print(f"{table_name}:", table.shape)


## PEEP — Preflight health gates

In [None]:
# PEEP — Critic checks (clusters + items + links)
from IPython.display import display

peep_stage = "stage_06_kg"
# Items gate is relaxed to 10 rows in SAFE_MODE, otherwise 100 rows are required.
items_min_rows = 10 if SAFE_MODE else 100

# Gate 1: semantic clusters.
crit1 = run_critic(
    df=clusters_semantic,
    stage=peep_stage,
    gate="peep_clusters_semantic",
    required_cols=["cluster_id", "cluster_name", "cluster_description"],
    id_col="cluster_id",
    min_rows=2,
    key_nonnull_cols=["cluster_id", "cluster_name"],
)

# Gate 2: items after Agent 1 cleanup.
crit2 = run_critic(
    df=items_after_agent1,
    stage=peep_stage,
    gate="peep_items_after_agent1",
    required_cols=["item_id", "cluster_id", "source", "label", "image_path"],
    id_col="item_id",
    min_rows=items_min_rows,
    key_nonnull_cols=["item_id", "cluster_id", "image_path"],
)

# Gate 3: cluster links. For links, accept either verified or raw schema;
# an empty table is allowed (min_rows=0) and there is no id column.
required_link_cols = ["src_cluster_id", "dst_cluster_id", "relationship"]
crit3 = run_critic(
    df=cluster_links,
    stage=peep_stage,
    gate="peep_cluster_links",
    required_cols=required_link_cols,
    id_col=None,
    min_rows=0,
    key_nonnull_cols=list(required_link_cols),
)

peep_reports = (
    (crit1, "critic_peep_clusters_semantic.json"),
    (crit2, "critic_peep_items_after_agent1.json"),
    (crit3, "critic_peep_cluster_links.json"),
)

# Persist every report first, then render the tables (same order as the results).
for critic_result, report_name in peep_reports:
    write_critic_report(critic_result, qa_dir / report_name)

for critic_result, _report_name in peep_reports:
    display(critic_result_table(critic_result))
    display(critic_issues_table(critic_result).head(20))


## Stage logic — Build KG tables (idempotent)
We build base nodes/edges from items+clusters, then add Agent2 cluster-cluster relationship edges (with evidence attributes).

In [None]:
# --- Build KG ---
import json
import pandas as pd
import numpy as np

from histo_cartography.exports import save_parquet
from histo_cartography.kg import build_kg_tables

import hashlib
import math


def _edge_id(src: str, dst: str, rel: str) -> str:
    """Return a deterministic 16-hex-char id derived from the (src, dst, rel) triple."""
    return hashlib.sha256((src + "|" + dst + "|" + rel).encode("utf-8")).hexdigest()[:16]


def _is_missing(value) -> bool:
    """Return True for None, pd.NA, pd.NaT and float NaN (Python or numpy floats)."""
    if value is None or value is pd.NA or value is pd.NaT:
        return True
    return isinstance(value, (float, np.floating)) and math.isnan(value)


def _finite_float(value, default):
    """Convert *value* to a finite float; return *default* for null, NaN, inf or unparseable input."""
    try:
        number = float(value)
    except (TypeError, ValueError):
        return default
    return number if math.isfinite(number) else default


def _json_safe(value):
    """Recursively convert a cell value into something json.dumps emits as strict JSON.

    numpy scalars/arrays become Python scalars/lists, null markers and
    non-finite floats become None, containers are converted element-wise and
    anything else falls back to its str() form.
    """
    if isinstance(value, np.ndarray):
        value = value.tolist()
    elif isinstance(value, np.generic):
        value = value.item()
    if _is_missing(value):
        return None
    if isinstance(value, (str, bool, int)):
        return value
    if isinstance(value, float):
        return value if math.isfinite(value) else None
    if isinstance(value, dict):
        return {str(key): _json_safe(item) for key, item in value.items()}
    if isinstance(value, (list, tuple, set)):
        return [_json_safe(item) for item in value]
    return str(value)


t0 = time.time()

if out_nodes_path.exists() and out_edges_path.exists() and out_prov_path.exists() and not FORCE_REBUILD:
    # Idempotent path: reuse what a previous run persisted.
    kg_nodes = pd.read_parquet(out_nodes_path)
    kg_edges = pd.read_parquet(out_edges_path)
    kg_prov = pd.read_parquet(out_prov_path)
    kg_summary = json.loads(out_summary_path.read_text()) if out_summary_path.exists() else {}
    print("✅ Loaded existing KG artifacts:", kg_nodes.shape, kg_edges.shape, kg_prov.shape)
else:
    # Base graph from items + semantic clusters
    kg_nodes, kg_edges, kg_prov, kg_summary = build_kg_tables(
        items=items_after_agent1,
        clusters=clusters_semantic,
        include_similarity_edges=False,  # keep KG small; Stage 02/03 already has embedding diagnostics
        similarity_top_k=20,
        random_state=SEED,
    )

    # -----------------------------------------------------------------
    # Add cluster-cluster relationship edges (Agent2)
    # -----------------------------------------------------------------
    # Map cluster_id -> node_id for clusters
    cluster_nodes = kg_nodes[kg_nodes["node_type"].astype(str) == "cluster"].copy()
    cluster_id_to_node = dict(zip(cluster_nodes["cluster_id"].astype(int), cluster_nodes["node_id"].astype(str)))

    # Loop invariants: column presence and the provenance payload are the same for every row.
    has_needs_more_evidence = "needs_more_evidence" in cluster_links.columns
    prov_details_json = json.dumps({"links_path": str(links_path)}, ensure_ascii=False)

    rel_edges = []
    rel_prov = []
    seen_edge_ids = set()
    skipped_bad_ids = 0
    skipped_unknown_cluster = 0
    skipped_duplicates = 0

    for _, r in cluster_links.iterrows():
        # Rows whose endpoints cannot be read as integers are skipped (best-effort),
        # but counted so the skip is visible in the cell output.
        try:
            src_c = int(r["src_cluster_id"])
            dst_c = int(r["dst_cluster_id"])
        except (KeyError, TypeError, ValueError, OverflowError):
            skipped_bad_ids += 1
            continue

        src = cluster_id_to_node.get(src_c)
        dst = cluster_id_to_node.get(dst_c)
        if not src or not dst:
            skipped_unknown_cluster += 1
            continue

        # A null relationship cell gets the same default as a missing column,
        # rather than the literal string "NAN".
        rel_raw = r.get("relationship", "related_to")
        rel = str("related_to" if _is_missing(rel_raw) else rel_raw).upper()  # KG convention

        # edge_id is a pure function of (src, dst, rel); a repeated triple would
        # yield duplicate edge/provenance ids, so only the first occurrence is kept.
        eid = _edge_id(src, dst, rel)
        if eid in seen_edge_ids:
            skipped_duplicates += 1
            continue
        seen_edge_ids.add(eid)

        # NaN/inf must not reach `weight` or the JSON payload (json.dumps would
        # emit the non-standard tokens NaN/Infinity).
        conf = _finite_float(r.get("confidence", 0.0), 0.0)
        sim = _finite_float(r.get("centroid_similarity"), None)

        needs_more = r.get("needs_more_evidence") if has_needs_more_evidence else None
        attrs = {
            "confidence": conf,
            "centroid_similarity": sim,
            # bool(NaN) is True, so a null cell is reported as None instead.
            "needs_more_evidence": None if _is_missing(needs_more) else bool(needs_more),
            # list/ndarray cells (e.g. parquet list columns) are not JSON serializable as-is.
            "flags": _json_safe(r.get("flags")),
            # Full source row, converted to JSON-safe values.
            "raw_row": {str(key): _json_safe(value) for key, value in r.to_dict().items()},
        }

        prov_id = "prov_" + eid

        rel_edges.append(
            {
                "edge_id": eid,
                "src": src,
                "dst": dst,
                "rel": rel,
                # A zero/unknown confidence falls back to a neutral weight of 1.0.
                "weight": conf if conf else 1.0,
                "attributes_json": json.dumps(attrs, ensure_ascii=False),
                "provenance_id": prov_id,
            }
        )
        rel_prov.append(
            {
                "provenance_id": prov_id,
                "source": "stage_05_agent2_linking",
                "method": "agent2_linking",
                "details_json": prov_details_json,
            }
        )

    if skipped_bad_ids or skipped_unknown_cluster or skipped_duplicates:
        print(
            "⚠️ skipped cluster links:",
            {
                "bad_cluster_ids": skipped_bad_ids,
                "unknown_cluster": skipped_unknown_cluster,
                "duplicate_edge": skipped_duplicates,
            },
        )

    rel_edges_df = pd.DataFrame(rel_edges)
    rel_prov_df = pd.DataFrame(rel_prov)

    kg_edges = pd.concat([kg_edges, rel_edges_df], ignore_index=True)
    kg_prov = pd.concat([kg_prov, rel_prov_df], ignore_index=True)

    # Persist
    save_parquet(kg_nodes, out_nodes_path)
    save_parquet(kg_edges, out_edges_path)
    save_parquet(kg_prov, out_prov_path)
    out_summary_path.write_text(json.dumps(kg_summary, indent=2))

runtime_sec = time.time() - t0
print("runtime_sec:", round(runtime_sec, 2))
print("kg_nodes:", kg_nodes.shape, "kg_edges:", kg_edges.shape, "kg_prov:", kg_prov.shape)


## CHECKPOINT — KG health gates + diagnostics

In [None]:
# KG health summary + plots
import json
from IPython.display import display

from histo_cartography.kg_quality import build_graph, kg_health_summary, plot_degree_distribution, plot_component_sizes

# Each helper receives its own copy of the node table (the same isolation the
# previous identity rename provided), so neither can alter `kg_nodes`.
G = build_graph(kg_nodes.copy(), kg_edges)

summary = kg_health_summary(kg_nodes.copy(), kg_edges)
health_json = json.dumps(summary, indent=2)
(qa_dir / "kg_health_summary.json").write_text(health_json)
display(summary)

# Render, save and register both diagnostic plots: degree distribution first,
# then weakly-connected component sizes.
for plot_fn, figure_title, plot_key, index_title in (
    (plot_degree_distribution, "KG degree distribution (in+out)", "kg_degree_distribution", "KG degree distribution"),
    (plot_component_sizes, "KG weakly-connected component sizes", "kg_component_sizes", "KG component sizes"),
):
    fig = plot_fn(G, title=figure_title)
    out_path = plots_dir / f"{plot_key}.png"
    save_and_display(fig, out_path)
    register_plot(
        viz_records,
        stage="stage_06_kg",
        plot_id=plot_key,
        title=index_title,
        path=out_path,
        tags=["core", "kg", "quality"],
        is_core=True,
    )


## Exports — Neo4j CSV + RDF Turtle (optional)

In [None]:
# Export Neo4j CSV and optional RDF
import pandas as pd

from histo_cartography.exports import export_neo4j_csv, export_rdf_turtle

# Convert nodes -> entities expected by export helpers
# Rename the node columns to the entity_* names the export helpers are called with.
entities = kg_nodes.rename(columns={"node_id":"entity_id","node_type":"entity_type"}).copy()
entities["entity_type"] = entities["entity_type"].astype(str).str.upper()

# DataFrame.get("description", "") returns the plain string "" when the column
# is absent, and "".astype(...) raises AttributeError, so branch explicitly.
# Null descriptions become "" rather than the literal text "nan".
if "description" in entities.columns:
    _desc = entities["description"].astype(object)
    entities["description"] = _desc.where(_desc.notna(), "").astype(str)
else:
    entities["description"] = ""

neo_paths = export_neo4j_csv(entities=entities, edges=kg_edges, out_dir=neo4j_dir)
print("✅ Neo4j export:", neo_paths)

ttl_path = stage_dir / "kg.ttl"
ttl = export_rdf_turtle(entities=entities, edges=kg_edges, out_path=ttl_path)
# A falsy return is reported as "rdflib not installed" — presumably the
# helper's convention for a skipped export; confirm against export_rdf_turtle.
if ttl:
    print("✅ RDF export:", ttl)
else:
    print("⚠️ rdflib not installed; RDF export skipped")


## Interactive graph view (PyVis)
This cell writes a lightweight HTML file (`graph_pyvis.html`) that you can open or download from Colab.

In [None]:
# PyVis interactive HTML (small subgraph to keep it fast)
import pandas as pd

import math

# Deliberately best-effort: any failure while importing pyvis disables the
# interactive export instead of stopping the notebook.
try:
    from pyvis.network import Network  # type: ignore
except Exception as e:
    print("⚠️ pyvis not available:", e)
    Network = None


def _viz_text(value, limit: int, fallback: str = "") -> str:
    """Return str(value) truncated to *limit* chars, or *fallback* when the cell is null."""
    is_null = value is None or value is pd.NA or (isinstance(value, float) and math.isnan(value))
    return (fallback if is_null else str(value))[:limit]


def _viz_weight(value) -> float:
    """Return a finite edge weight; null, NaN, inf or unparseable input falls back to 1.0."""
    try:
        weight = float(value)
    except (TypeError, ValueError):
        return 1.0
    return weight if math.isfinite(weight) else 1.0


# Build a small subgraph: clusters + their direct relationship edges only
cluster_nodes = kg_nodes[kg_nodes["node_type"].astype(str) == "cluster"].copy()
cluster_ids = set(cluster_nodes["node_id"].astype(str).tolist())

rel_edges = kg_edges[kg_edges["src"].astype(str).isin(cluster_ids) & kg_edges["dst"].astype(str).isin(cluster_ids)].copy()
print("cluster_nodes:", cluster_nodes.shape, "cluster_rel_edges:", rel_edges.shape)

if Network is not None:
    net = Network(height="700px", width="100%", bgcolor="#ffffff", font_color="black", directed=True)

    # Add nodes
    for _, r in cluster_nodes.iterrows():
        nid = str(r["node_id"])
        # A null name would otherwise be drawn as the text "nan"; show the node id instead.
        label = _viz_text(r.get("name", ""), 60, fallback=nid)
        title = _viz_text(r.get("description", ""), 300)
        net.add_node(nid, label=label, title=title)

    # Add edges
    for _, r in rel_edges.iterrows():
        # A NaN weight would be written into the HTML as the non-JSON token NaN.
        net.add_edge(
            str(r["src"]),
            str(r["dst"]),
            title=_viz_text(r.get("rel", ""), 300),
            value=_viz_weight(r.get("weight", 1.0)),
        )

    out_html = stage_dir / "graph_pyvis.html"
    net.write_html(str(out_html))
    print("✅ wrote:", out_html)
else:
    print("skipping pyvis export")


## Register artifacts + viz index

In [None]:
# Register stage artifacts + viz index
schema_version = str(cfg.get("project", {}).get("schema_version", "0.1.0"))
kg_stage = "stage_06_kg"

# Checkpoint critic on the final node and edge tables.
crit_nodes = run_critic(
    df=kg_nodes,
    stage=kg_stage,
    gate="checkpoint_kg_nodes",
    required_cols=["node_id", "node_type", "name"],
    id_col="node_id",
    min_rows=10,
    key_nonnull_cols=["node_id", "node_type"],
)
crit_edges = run_critic(
    df=kg_edges,
    stage=kg_stage,
    gate="checkpoint_kg_edges",
    required_cols=["edge_id", "src", "dst", "rel"],
    id_col="edge_id",
    min_rows=10,
    key_nonnull_cols=["edge_id", "src", "dst", "rel"],
)

for critic_result, report_name in (
    (crit_nodes, "critic_checkpoint_kg_nodes.json"),
    (crit_edges, "critic_checkpoint_kg_edges.json"),
):
    write_critic_report(critic_result, qa_dir / report_name)

# Every registry entry carries the same combined critic totals, inputs and runtime.
total_warnings = int(crit_nodes.warnings_count + crit_edges.warnings_count)
total_fails = int(crit_nodes.fails_count + crit_edges.fails_count)
stage_runtime = float(runtime_sec)
stage_inputs = [stage_in_clusters, stage_in_items, links_path]

# Register the three parquet outputs in the order nodes, edges, provenance.
for artifact_name, artifact_path, artifact_df, artifact_notes in (
    ("kg_nodes", out_nodes_path, kg_nodes, "KG nodes table"),
    ("kg_edges", out_edges_path, kg_edges, "KG edges table"),
    ("kg_provenance", out_prov_path, kg_prov, "KG provenance table"),
):
    register_artifact(
        project_root=PROJECT_ROOT,
        stage=kg_stage,
        artifact=artifact_name,
        path=artifact_path,
        schema_version=schema_version,
        inputs=list(stage_inputs),  # fresh list per call, as in the unrolled form
        df=artifact_df,
        warnings_count=total_warnings,
        fails_count=total_fails,
        runtime_sec=stage_runtime,
        notes=artifact_notes,
    )

append_stage_manifest(
    project_root=PROJECT_ROOT,
    stage=kg_stage,
    inputs=list(stage_inputs),
    outputs=[out_nodes_path, out_edges_path, out_prov_path],
    schema_version=schema_version,
    warnings_count=total_warnings,
    fails_count=total_fails,
    runtime_sec=stage_runtime,
    notes="stage 06 run summary",
)

# viz index
from histo_cartography.viz import write_viz_index, viz_records_to_df
from IPython.display import display

viz_index_path = stage_dir / "viz_index.parquet"
viz_index_csv = stage_dir / "viz_index.csv"
write_viz_index(viz_records, out_parquet=viz_index_path, out_csv=viz_index_csv)
display(viz_records_to_df(viz_records).head(200))
print("✅ wrote viz_index:", viz_index_path)


## Next actions
- Load `neo4j_csv/` into Neo4j or use `graph_pyvis.html` for a quick interactive check.
- If KG has many components/orphans: revisit Stage 05 relationships (add more links or lower MIN_SIM cautiously).