<a href="https://colab.research.google.com/github/sargonxg/TACITUS-Knowledge-Pipeline/blob/main/TACITUS_PIPELINE_Workbench_v8.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# 🏛️ TACITUS Knowledge Pipeline Workbench v8

**Purpose:** Production-grade, Colab-first knowledge pipeline workbench for TACITUS conflict & intelligence analysis.

# 🔐 Inputs I Need
Provide these in Colab secrets or environment variables (never hard-code secrets in the notebook):

**Google Cloud / Vertex AI**
- `GCP_PROJECT_ID`
- `GCP_REGION`
- `VERTEX_AUTH_METHOD` (e.g., `colab` or `service_account_json`)
- `GEMINI_MODEL`
- `EMBEDDING_MODEL`

**FalkorDB**
- `FALKORDB_HOST` (optional; if omitted, FalkorDBLite is used)
- `FALKORDB_PORT` (optional)
- `FALKORDB_USERNAME`
- `FALKORDB_PASSWORD`

**Neo4j**
- `NEO4J_URI`
- `NEO4J_USERNAME`
- `NEO4J_PASSWORD`
- `NEO4J_DATABASE`

**Test Data**
- 1–3 representative case files (txt/pdf/docx) OR permission to use included synthetic cases
- Optional: any “gold” expected outputs or evaluation questions
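A quick way to see which of the settings above are still unset is a small check over the environment. The sketch below is a hypothetical helper (`missing_settings` and `REQUIRED_SETTINGS` are not part of the notebook). It treats `FALKORDB_HOST`/`FALKORDB_PORT` as optional, as the list says, and every other listed name as required. The Configuration cell supplies defaults for a few of these (for example `GEMINI_MODEL`), so trim the list to taste.

```python
import os
from typing import Dict, List, Mapping, Optional

# Setting names grouped by service, as listed above (hypothetical grouping).
# FALKORDB_HOST / FALKORDB_PORT are left out because they are optional.
REQUIRED_SETTINGS: Dict[str, List[str]] = {
    "vertex": ["GCP_PROJECT_ID", "GCP_REGION", "VERTEX_AUTH_METHOD", "GEMINI_MODEL", "EMBEDDING_MODEL"],
    "falkordb": ["FALKORDB_USERNAME", "FALKORDB_PASSWORD"],
    "neo4j": ["NEO4J_URI", "NEO4J_USERNAME", "NEO4J_PASSWORD", "NEO4J_DATABASE"],
}


def missing_settings(env: Optional[Mapping[str, str]] = None) -> Dict[str, List[str]]:
    """Return, per service, the setting names that are unset or empty in `env`."""
    env = os.environ if env is None else env
    report = {}
    for service, names in REQUIRED_SETTINGS.items():
        missing = [name for name in names if not env.get(name)]
        if missing:
            report[service] = missing
    return report


print(missing_settings({"GCP_PROJECT_ID": "demo", "NEO4J_URI": "bolt://localhost:7687"}))
```

Passing an explicit mapping keeps the check testable; calling `missing_settings()` with no argument inspects the real environment.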

# 0) Setup & Imports
**Purpose:** Install dependencies and load libraries.

**Inputs:** None

**Outputs:** Installed libraries; importable modules.

**Failure modes:** Missing network access or pip issues.

**Cost notes:** None.

**What to run next:** Run the cell once in a fresh runtime.

In [None]:
import os
import sys
import json
import time
import math
import uuid
import hashlib
import importlib
import subprocess
import textwrap
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Tuple


def _ensure(pip_name: str, import_name: Optional[str] = None):
    """Import a module, pip-installing it first if it is missing (Colab helper)."""
    import_name = import_name or pip_name
    try:
        return importlib.import_module(import_name)
    except ImportError:
        subprocess.check_call([sys.executable, "-m", "pip", "install", "-q", pip_name])
        importlib.invalidate_caches()  # let the import system see the new package
        return importlib.import_module(import_name)


pydantic = _ensure("pydantic")
rapidfuzz = _ensure("rapidfuzz")
widgets = _ensure("ipywidgets")  # bound here so the UI cell works even after a fresh install
pd = _ensure("pandas")

# Neo4j is optional: without the driver package the pipeline stays on the in-memory fallback.
try:
    from neo4j import GraphDatabase
except ImportError:
    GraphDatabase = None

from IPython.display import display
from pydantic import BaseModel, Field, validator
from rapidfuzz import fuzz

# 1) Configuration
**Purpose:** Centralized configuration for the pipeline.

**Inputs:** Env vars / Colab secrets

**Outputs:** Config dictionary

**Failure modes:** Missing required credentials for external services.

**Cost notes:** None.

**What to run next:** Run this cell before ingestion/extraction.

In [None]:
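def _setting(name: str, default: Optional[str] = None) -> Optional[str]:
    """Read a setting from Colab secrets first, then from environment variables."""
    try:
        from google.colab import userdata  # only importable inside Colab
        value = userdata.get(name)
        if value:
            return value
    except Exception:
        pass  # not running in Colab, secret not defined, or notebook access not granted
    return os.getenv(name, default)


CONFIG = {
    "gcp_project_id": _setting("GCP_PROJECT_ID"),
    "gcp_region": _setting("GCP_REGION"),
    "vertex_auth_method": _setting("VERTEX_AUTH_METHOD", "colab"),
    "gemini_model": _setting("GEMINI_MODEL", "gemini-1.5-pro"),
    "embedding_model": _setting("EMBEDDING_MODEL", "text-embedding-004"),
    "falkordb_host": _setting("FALKORDB_HOST"),
    "falkordb_port": _setting("FALKORDB_PORT"),
    "falkordb_username": _setting("FALKORDB_USERNAME"),
    "falkordb_password": _setting("FALKORDB_PASSWORD"),
    "neo4j_uri": _setting("NEO4J_URI"),
    "neo4j_username": _setting("NEO4J_USERNAME"),
    "neo4j_password": _setting("NEO4J_PASSWORD"),
    "neo4j_database": _setting("NEO4J_DATABASE", "neo4j"),
}

# Display the config with passwords masked so secrets never end up in saved notebook output.
{key: ("***" if "password" in key and value else value) for key, value in CONFIG.items()}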
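The inputs list says FalkorDBLite is used when `FALKORDB_HOST` is omitted. One way to make that decision explicit is a small resolver over the config dictionary. The sketch below is hypothetical (`resolve_falkordb_target` and `DEFAULT_FALKORDB_PORT` are not in the notebook); it assumes FalkorDB's usual Redis-protocol port 6379 when no port is given, and it only decides and normalizes parameters without opening a connection.

```python
from typing import Any, Dict, Tuple

DEFAULT_FALKORDB_PORT = 6379  # assumption: FalkorDB's usual Redis-protocol default port


def resolve_falkordb_target(config: Dict[str, Any]) -> Tuple[str, Dict[str, Any]]:
    """Decide between a remote FalkorDB server and the embedded FalkorDBLite fallback."""
    host = config.get("falkordb_host")
    if not host:
        return "lite", {}  # no host configured: fall back to FalkorDBLite
    port_raw = config.get("falkordb_port")
    port = int(port_raw) if port_raw else DEFAULT_FALKORDB_PORT  # env values arrive as strings
    return "remote", {
        "host": host,
        "port": port,
        "username": config.get("falkordb_username"),
        "password": config.get("falkordb_password"),
    }
```

Calling `resolve_falkordb_target(CONFIG)` returns either `("lite", {})` or `("remote", {...})`, which a later cell could hand to whichever client library is installed.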

# 2) Deterministic IDs, Cleaning, and Chunking
**Purpose:** Stable IDs, text cleaning, and chunking.

**Inputs:** Raw text.

**Outputs:** Cleaned text, chunks, audit info.

**Failure modes:** `chunk_text` requires `0 <= overlap < chunk_size`.

**Cost notes:** None.

**What to run next:** Run this cell before ingestion.

In [None]:
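def mk_id(*parts: str) -> str:
    """Deterministic 24-hex-char ID: the same parts always produce the same ID."""
    seed = "::".join(parts)
    return hashlib.sha256(seed.encode("utf-8")).hexdigest()[:24]


def clean_text(text: str) -> Tuple[str, Dict[str, int]]:
    """Drop non-printable characters (keeping newlines and tabs) and report what was removed."""
    audit = {"removed_nulls": text.count("\x00"), "removed_control": 0}
    # Note: isprintable() also rejects "\r" and Unicode separators such as U+00A0.
    cleaned = "".join(ch for ch in text if ch.isprintable() or ch in "\n\t")
    # Counts every dropped character, including the NULs tallied above.
    audit["removed_control"] = len(text) - len(cleaned)
    return cleaned, audit


def chunk_text(text: str, chunk_size: int = 1200, overlap: int = 150) -> List[str]:
    """Split text into fixed-size character windows that overlap by `overlap` characters."""
    if chunk_size <= 0 or not 0 <= overlap < chunk_size:
        raise ValueError("require chunk_size > 0 and 0 <= overlap < chunk_size")
    chunks = []
    start = 0
    while start < len(text):
        end = min(len(text), start + chunk_size)
        chunks.append(text[start:end])
        if end == len(text):
            break  # last window reached; stepping back by `overlap` here would loop forever
        start = end - overlap
    return chunks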
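`chunk_text` returns only the chunk strings, while the evidence spans computed during ingestion are offsets within a chunk. A sketch of recovering document-level offsets, assuming the same stepping rule as `chunk_text` (`overlap < chunk_size`); `chunk_offsets` and `to_document_span` are hypothetical helpers, not part of the notebook:

```python
from typing import List, Tuple


def chunk_offsets(text_length: int, chunk_size: int = 1200, overlap: int = 150) -> List[Tuple[int, int]]:
    """Return the (start, end) character offsets of each overlapping chunk window."""
    if chunk_size <= 0 or not 0 <= overlap < chunk_size:
        raise ValueError("require chunk_size > 0 and 0 <= overlap < chunk_size")
    offsets = []
    start = 0
    while start < text_length:
        end = min(text_length, start + chunk_size)
        offsets.append((start, end))
        if end == text_length:
            break  # final window reached
        start = end - overlap  # step back so consecutive chunks share `overlap` characters
    return offsets


def to_document_span(chunk_start: int, span_start: int, span_end: int) -> Tuple[int, int]:
    """Convert a chunk-relative span into document offsets."""
    return chunk_start + span_start, chunk_start + span_end


print(chunk_offsets(3000))  # [(0, 1200), (1050, 2250), (2100, 3000)]
```

With the defaults, each chunk after the first starts 1050 characters after the previous one (1200 minus the 150-character overlap).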

# 3) Evidence + Extraction Schemas
**Purpose:** Schema-constrained extraction with validation.

**Inputs:** Model responses.

**Outputs:** Validated objects.

**Failure modes:** Validation errors.

**Cost notes:** None (schemas only; the LLM calls happen in section 6).

**What to run next:** Run this cell before extraction.

In [None]:
class EvidenceCandidate(BaseModel):
    quote: str = Field(..., min_length=1)

class ExtractedClaim(BaseModel):
    claim_id: str
    claim_type: str
    actor: str
    target: Optional[str] = None
    statement: str
    confidence: float = Field(..., ge=0.0, le=1.0)
    evidence: List[EvidenceCandidate]

    @validator("claim_type")
    def claim_type_not_empty(cls, v):
        if not v.strip():
            raise ValueError("claim_type must not be empty")
        return v

class ExtractionResult(BaseModel):
    claims: List[ExtractedClaim] = []
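To see how schema constraints reject bad model output, the sketch below validates a raw JSON response against trimmed stand-ins for `EvidenceCandidate` and `ExtractedClaim`. The class names `Evidence`/`Claim` and the function `parse_claims` are hypothetical; only `BaseModel`, `Field`, and `ValidationError` are used, which exist in both pydantic 1.x and 2.x.

```python
import json
from typing import List, Optional, Tuple

from pydantic import BaseModel, Field, ValidationError


# Trimmed stand-ins for EvidenceCandidate / ExtractedClaim above (hypothetical names).
class Evidence(BaseModel):
    quote: str = Field(..., min_length=1)


class Claim(BaseModel):
    claim_id: str
    actor: str
    statement: str
    confidence: float = Field(..., ge=0.0, le=1.0)
    evidence: List[Evidence]


def parse_claims(raw: str) -> Tuple[List[Claim], Optional[str]]:
    """Parse a JSON model response; return (claims, None) on success or ([], error message)."""
    try:
        payload = json.loads(raw)
    except json.JSONDecodeError as err:
        return [], f"invalid JSON: {err}"
    if not isinstance(payload, dict):
        return [], "expected a JSON object with a 'claims' list"
    try:
        return [Claim(**item) for item in payload.get("claims", [])], None
    except (ValidationError, TypeError) as err:  # TypeError: a claim entry was not an object
        return [], str(err)


good = '{"claims": [{"claim_id": "c1", "actor": "A", "statement": "s", "confidence": 0.8, "evidence": [{"quote": "q"}]}]}'
bad = '{"claims": [{"claim_id": "c2", "actor": "A", "statement": "s", "confidence": 1.7, "evidence": []}]}'
print(parse_claims(good)[0][0].confidence)
print(parse_claims(bad)[1] is not None)
```

The `bad` payload fails because `confidence` exceeds the `le=1.0` bound; the error text is what a retry or repair step would feed back to the model.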

# 4) In-Memory Graph Store (Fallback)
**Purpose:** Provide a runnable fallback when Neo4j/FalkorDB are unavailable.

**Inputs:** Node/edge data.

**Outputs:** In-memory graph.

**Failure modes:** None.

**Cost notes:** None.

**What to run next:** Run this cell before ingestion.

In [None]:
@dataclass
class InMemoryGraph:
    nodes: Dict[str, Dict[str, Any]] = field(default_factory=dict)
    edges: List[Dict[str, Any]] = field(default_factory=list)

    def upsert_node(self, node_id: str, labels: List[str], properties: Dict[str, Any]):
        existing = self.nodes.get(node_id, {"labels": set(), "properties": {}})
        existing["labels"].update(labels)
        existing["properties"].update(properties)
        self.nodes[node_id] = existing

    def add_edge(self, src: str, rel: str, dst: str, properties: Optional[Dict[str, Any]] = None):
        self.edges.append({"src": src, "rel": rel, "dst": dst, "properties": properties or {}})

    def counts_by_label(self):
        counts = {}
        for node in self.nodes.values():
            for label in node["labels"]:
                counts[label] = counts.get(label, 0) + 1
        return counts
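Once a case is ingested, a common inspection task is pulling the neighborhood around a case or claim node. A minimal breadth-first sketch over the same `{"src", "rel", "dst"}` edge shape that `InMemoryGraph.edges` uses; `k_hop_nodes` is a hypothetical helper, and it ignores edge direction by design:

```python
from collections import deque
from typing import Any, Dict, List, Set


def k_hop_nodes(edges: List[Dict[str, Any]], seed: str, k: int = 2) -> Set[str]:
    """Collect node IDs within `k` hops of `seed`, ignoring edge direction (BFS)."""
    adjacency: Dict[str, Set[str]] = {}
    for edge in edges:
        adjacency.setdefault(edge["src"], set()).add(edge["dst"])
        adjacency.setdefault(edge["dst"], set()).add(edge["src"])
    seen = {seed}
    frontier = deque([(seed, 0)])
    while frontier:
        node, depth = frontier.popleft()
        if depth == k:
            continue  # do not expand past the hop limit
        for neighbor in adjacency.get(node, ()):
            if neighbor not in seen:
                seen.add(neighbor)
                frontier.append((neighbor, depth + 1))
    return seen


edges = [
    {"src": "case", "rel": "HAS_CHUNK", "dst": "chunk0"},
    {"src": "case", "rel": "HAS_CLAIM", "dst": "claim0"},
    {"src": "claim0", "rel": "SUPPORTED_BY", "dst": "span0"},
]
print(sorted(k_hop_nodes(edges, "case", k=1)))  # ['case', 'chunk0', 'claim0']
```

For a real run, `k_hop_nodes(state["graph"].edges, state["case_id"], k=2)` would reach claims and their supporting spans.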

# 5) Neo4j Connection & Vector Index Helpers
**Purpose:** Connect to Neo4j and prepare vector indexes.

**Inputs:** Neo4j credentials.

**Outputs:** Driver object and helper functions.

**Failure modes:** Missing creds or network access.

**Cost notes:** None.

**What to run next:** Run this cell before Neo4j ingestion.

In [None]:
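def get_neo4j_driver(config: Dict[str, Any]):
    """Return a Neo4j driver, or None when the URI or the neo4j package is missing."""
    if not config.get("neo4j_uri") or not GraphDatabase:
        return None
    return GraphDatabase.driver(
        config["neo4j_uri"],
        auth=(config.get("neo4j_username"), config.get("neo4j_password")),
    )


def ensure_vector_index(driver, index_name: str, label: str, property_name: str,
                        dims: int = 768, database: Optional[str] = None) -> bool:
    """Create a cosine vector index (Neo4j 5.x syntax) if it does not already exist."""
    if not driver:
        return False
    # Identifiers are interpolated rather than parameterized: pass only trusted names here.
    cypher = f"""
    CREATE VECTOR INDEX {index_name} IF NOT EXISTS
    FOR (n:{label})
    ON (n.{property_name})
    OPTIONS {{ indexConfig: {{`vector.dimensions`: {dims}, `vector.similarity_function`: 'cosine'}} }}
    """
    try:
        with driver.session(database=database or CONFIG.get("neo4j_database")) as session:
            # consume() forces execution so server-side errors surface inside this try block.
            session.run(cypher).consume()
        return True
    except Exception as e:
        print("Vector index creation failed, falling back:", e)
        return False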
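Moving the in-memory graph into Neo4j usually means batched `UNWIND ... MERGE` writes rather than one query per node. Cypher cannot take labels as query parameters, so the sketch below groups nodes by label set and validates each label before interpolating it. `merge_batches` is a hypothetical helper; it assumes property values are already Neo4j-compatible (nested values such as `cleaning_audit` or a claim's `evidence` list would need flattening or JSON-encoding first).

```python
import re
from collections import defaultdict
from typing import Any, Dict, List, Tuple

_IDENTIFIER = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")


def merge_batches(nodes: Dict[str, Dict[str, Any]]) -> List[Tuple[str, List[Dict[str, Any]]]]:
    """Group in-memory nodes by label set into (cypher, rows) pairs for UNWIND-based MERGE."""
    groups: Dict[Tuple[str, ...], List[Dict[str, Any]]] = defaultdict(list)
    for node_id, node in nodes.items():
        labels = tuple(sorted(node["labels"]))
        groups[labels].append({"id": node_id, "props": node["properties"]})
    batches = []
    for labels, rows in groups.items():
        # Labels are interpolated into the query text, so reject anything that is not a plain identifier.
        if not labels or not all(_IDENTIFIER.match(label) for label in labels):
            raise ValueError(f"unsafe or missing labels: {labels}")
        label_expr = ":".join(labels)
        cypher = f"UNWIND $rows AS row MERGE (n:{label_expr} {{id: row.id}}) SET n += row.props"
        batches.append((cypher, rows))
    return batches
```

With a driver from `get_neo4j_driver`, each batch could then be sent as `session.run(cypher, rows=rows).consume()` inside a `driver.session(...)` block.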

# 6) LLM Extraction (Structured Output + Retry + Repair)
**Purpose:** Robust extraction with retries and schema validation.

**Inputs:** Chunk text.

**Outputs:** ExtractionResult.

**Failure modes:** API errors, validation errors.

**Cost notes:** One Vertex/Gemini call per chunk once a real extractor replaces the mock; the mock itself is free.

**What to run next:** Run this cell before extraction.

In [None]:
def _mock_extract(chunk: str) -> ExtractionResult:
    # Minimal mock extractor so the notebook runs without external LLMs.
    claim = ExtractedClaim(
        claim_id=mk_id("claim", chunk),  # hash the whole chunk so chunks sharing a prefix don't collide
        claim_type="mock",
        actor="Unknown",
        target=None,
        statement=chunk[:200].strip(),
        confidence=0.1,
        evidence=[EvidenceCandidate(quote=chunk[:80].strip())],
    )
    return ExtractionResult(claims=[claim])


def extract_with_retries(chunk: str, max_retries: int = 3) -> ExtractionResult:
    """Extract claims from one chunk, retrying with exponential backoff (1 s, 2 s, ...)."""
    # Placeholder: swap _mock_extract for a Vertex structured-output call when credentials are present.
    for attempt in range(max_retries):
        try:
            return _mock_extract(chunk)
        except Exception as e:
            print(f"Extraction attempt {attempt + 1}/{max_retries} failed: {e}")
            if attempt < max_retries - 1:
                time.sleep(2 ** attempt)  # no pointless sleep after the final attempt
    return ExtractionResult(claims=[])
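The section title mentions repair, but the cell above only retries. One way to add a repair step is to clean up common LLM JSON glitches (Markdown fences, chatter around the object, trailing commas) before validating against `ExtractionResult`. `repair_json` is a hypothetical, best-effort helper: the trailing-comma rule is a regex and could also touch a `", }"` sequence inside a string value.

```python
import json
import re
from typing import Any, Dict

_FENCE = re.compile(r"^```(?:json)?\s*|\s*```$", re.IGNORECASE)
_TRAILING_COMMA = re.compile(r",\s*([}\]])")


def repair_json(raw: str) -> Dict[str, Any]:
    """Best-effort cleanup of common LLM JSON glitches before schema validation."""
    text = _FENCE.sub("", raw.strip())  # drop ```json ... ``` wrappers
    start, end = text.find("{"), text.rfind("}")
    if start == -1 or end < start:
        raise ValueError("no JSON object found in model output")
    text = text[start:end + 1]  # ignore chatter before and after the object
    text = _TRAILING_COMMA.sub(r"\1", text)  # {"a": 1,} -> {"a": 1}
    return json.loads(text)  # still raises json.JSONDecodeError if the text is beyond repair


print(repair_json('```json\n{"claims": [{"a": 1},],}\n```'))  # {'claims': [{'a': 1}]}
```

In a real extractor the repaired dictionary would be passed to `ExtractionResult(**repaired)`, and a remaining validation error would count as a failed attempt for the retry loop.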

# 7) Evidence Matching
**Purpose:** Link extracted evidence quotes to text spans.

**Inputs:** Extracted objects and source text.

**Outputs:** Span matches with scores.

**Failure modes:** None.

**Cost notes:** None.

**What to run next:** Run this cell before writing evidence links.

In [None]:
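def find_best_span(text: str, quote: str) -> Tuple[int, int, float]:
    """Return (start, end, score) of the window in `text` that best matches `quote` (score 0-100)."""
    if not quote or not text:
        return 0, 0, 0.0
    exact = text.find(quote)
    if exact != -1:
        return exact, exact + len(quote), 100.0  # verbatim quote: no fuzzy search needed
    best_score, best_span = 0.0, (0, 0)
    step = max(1, len(quote) // 5)
    # The +1 makes the range include the window that ends exactly at len(text).
    for i in range(0, max(len(text) - len(quote), 0) + 1, step):
        window = text[i:i + len(quote)]
        score = fuzz.partial_ratio(quote, window)
        if score > best_score:
            best_score, best_span = score, (i, i + len(window))
    return best_span[0], best_span[1], best_score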
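If `rapidfuzz` is unavailable, the standard library's `difflib` offers a simpler, swapped-in technique: find the longest block that the quote and the text share exactly. This trades fuzzy scoring for exact-block matching, so paraphrased quotes score lower. `longest_common_span` is a hypothetical helper, not the notebook's matcher.

```python
from difflib import SequenceMatcher
from typing import Tuple


def longest_common_span(text: str, quote: str) -> Tuple[int, int, float]:
    """Locate the longest block shared by `text` and `quote`.

    Returns (start, end, coverage): start/end index into `text`, and coverage is the
    fraction of `quote` covered by that block (1.0 means the quote appears verbatim).
    """
    if not text or not quote:
        return 0, 0, 0.0
    matcher = SequenceMatcher(None, text, quote, autojunk=False)
    match = matcher.find_longest_match(0, len(text), 0, len(quote))
    return match.a, match.a + match.size, match.size / len(quote)
```

`autojunk=False` stops `SequenceMatcher` from treating frequent characters as junk, which matters for chunks of 200 characters or more.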

# 8) Unified Ingestion Pipeline
**Purpose:** Single canonical entrypoint for ingestion.

**Inputs:** Files, case name, options.

**Outputs:** Case graph + evidence.

**Failure modes:** Missing files, DB connection failures.

**Cost notes:** LLM calls + embedding calls.

**What to run next:** Run this cell before using the Workbench UI.

In [None]:
def ingest_case(
    filename: str,
    file_bytes: bytes,
    case_name: str,
    mode: str = "dry_run",
    with_evidence: bool = True,
    chunk_size: int = 1200,
    overlap: int = 150,
):
    # Only the in-memory path is implemented so far; `mode` is accepted but not yet acted on.
    text = file_bytes.decode("utf-8", errors="ignore")
    cleaned, audit = clean_text(text)
    chunks = chunk_text(cleaned, chunk_size=chunk_size, overlap=overlap)

    run_id = mk_id("run", case_name, str(time.time()))
    case_id = mk_id("case", case_name)

    mem_graph = InMemoryGraph()
    mem_graph.upsert_node(case_id, ["Case", "CTX"], {"name": case_name})
    doc_id = mk_id("doc", case_id, filename)
    mem_graph.upsert_node(doc_id, ["SourceDoc", "EVD"], {
        "filename": filename,
        "case_id": case_id,
        "cleaning_audit": audit,
    })
    mem_graph.add_edge(case_id, "HAS_DOC", doc_id)  # link the document so it is not an orphan node

    for idx, chunk in enumerate(chunks):
        # Scope chunk IDs to the document so two files in one case don't collide.
        chunk_id = mk_id("chunk", doc_id, str(idx))
        mem_graph.upsert_node(chunk_id, ["Chunk", "EVD"], {"index": idx, "text": chunk})
        mem_graph.add_edge(case_id, "HAS_CHUNK", chunk_id)

        extraction = extract_with_retries(chunk)
        for claim in extraction.claims:
            claim_node_id = claim.claim_id
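            # model_dump() is the pydantic v2 name; .dict() is deprecated there but still works on v1.
            props = claim.model_dump() if hasattr(claim, "model_dump") else claim.dict()
            mem_graph.upsert_node(claim_node_id, ["Claim", "CTX"], props)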
            mem_graph.add_edge(case_id, "HAS_CLAIM", claim_node_id)
            if with_evidence:
                for ev in claim.evidence:
                    start, end, score = find_best_span(chunk, ev.quote)
                    span_id = mk_id("span", chunk_id, str(start), str(end))
                    mem_graph.upsert_node(span_id, ["Span", "EVD"], {
                        "chunk_id": chunk_id,  # start/end are offsets within this chunk, not the document
                        "start": start,
                        "end": end,
                        "score": score,
                        "quote": ev.quote,
                    })
                    mem_graph.add_edge(claim_node_id, "SUPPORTED_BY", span_id)

    return {
        "case_id": case_id,
        "run_id": run_id,
        "graph": mem_graph,
        "chunks": len(chunks),
    }
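`ingest_case` derives `run_id` from `time.time()`, so every run gets a fresh ID even when the inputs are identical. If re-runs should be recognizable, one alternative in the spirit of the deterministic IDs in section 2 is a content-derived key. `deterministic_run_key` is a hypothetical helper; it hashes the case name, the file bytes, and the pipeline parameters.

```python
import hashlib
import json


def deterministic_run_key(case_name: str, file_bytes: bytes, **params) -> str:
    """Hash the case name, file content, and pipeline parameters into a stable run key."""
    digest = hashlib.sha256()
    digest.update(case_name.encode("utf-8"))
    digest.update(b"\x00")  # separator between the name and the fixed-size file hash
    digest.update(hashlib.sha256(file_bytes).digest())
    digest.update(json.dumps(params, sort_keys=True).encode("utf-8"))  # order-independent params
    return digest.hexdigest()[:24]  # same 24-char width as mk_id
```

For example, `deterministic_run_key(case_name, file_bytes, chunk_size=chunk_size, overlap=overlap, with_evidence=with_evidence)` changes only when the document or the settings change.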

# 9) Workbench UI
**Purpose:** Interactive controls for ingestion, replication, QA, and benchmarks.

**Inputs:** None.

**Outputs:** Widgets and live stats.

**Failure modes:** Widgets not available.

**Cost notes:** None.

**What to run next:** Run this cell to launch the workbench.

In [None]:
case_name_input = widgets.Text(description="Case Name", value="Demo Case")
file_upload = widgets.FileUpload(accept=".txt", multiple=False)
mode_select = widgets.Dropdown(options=["dry_run", "neo4j_only", "neo4j_then_falkor"], value="dry_run", description="Mode")
with_evidence_checkbox = widgets.Checkbox(value=True, description="With evidence")

run_button = widgets.Button(description="Ingest new case")
output = widgets.Output()

state = {}


def on_run_clicked(_):
    with output:
        output.clear_output()
        value = file_upload.value
        if not value:
            print("Upload a .txt file to ingest.")
            return
        # FileUpload.value is a dict in ipywidgets 7.x and a tuple of dicts in 8.x.
        if isinstance(value, dict):
            item = next(iter(value.values()))
            filename, content = item["metadata"]["name"], bytes(item["content"])
        else:
            item = value[0]
            filename, content = item["name"], bytes(item["content"])  # content is a memoryview in 8.x
        result = ingest_case(
            filename=filename,
            file_bytes=content,
            case_name=case_name_input.value,
            mode=mode_select.value,
            with_evidence=with_evidence_checkbox.value,
        )
        state.update(result)
        print("Ingested case:", result["case_id"], "chunks:", result["chunks"])
        print("Node counts:", result["graph"].counts_by_label())

run_button.on_click(on_run_clicked)

widgets.VBox([
    case_name_input,
    file_upload,
    mode_select,
    with_evidence_checkbox,
    run_button,
    output,
])

# 10) Testing & Evaluation
**Purpose:** Evidence coverage and quality checks.

**Inputs:** In-memory graph results.

**Outputs:** Metrics.

**Failure modes:** None.

**Cost notes:** None.

**What to run next:** Run after an ingestion run.

In [None]:
def compute_evidence_coverage(graph: InMemoryGraph) -> float:
    claim_ids = [node_id for node_id, node in graph.nodes.items() if "Claim" in node["labels"]]
    supported = 0
    for claim_id in claim_ids:
        if any(edge for edge in graph.edges if edge["src"] == claim_id and edge["rel"] == "SUPPORTED_BY"):
            supported += 1
    return supported / max(len(claim_ids), 1)

if state.get("graph"):
    coverage = compute_evidence_coverage(state["graph"])
    print("Evidence coverage:", round(coverage * 100, 2), "%")
else:
    print("Run ingestion first.")
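`compute_evidence_coverage` reports a single ratio and rescans every edge for each claim. A sketch that makes one pass over the edges and also reports which claims lack support and how many have at least one strong span match; `evidence_report` is hypothetical, and the `min_score=80.0` threshold (on `find_best_span`'s 0-100 scale) is an arbitrary assumption.

```python
from typing import Any, Dict, List


def evidence_report(nodes: Dict[str, Dict[str, Any]], edges: List[Dict[str, Any]],
                    min_score: float = 80.0) -> Dict[str, Any]:
    """Summarize evidence support per claim using a single pass over the edges."""
    claim_ids = {nid for nid, node in nodes.items() if "Claim" in node["labels"]}
    spans_by_claim: Dict[str, List[str]] = {cid: [] for cid in claim_ids}
    for edge in edges:
        if edge["rel"] == "SUPPORTED_BY" and edge["src"] in spans_by_claim:
            spans_by_claim[edge["src"]].append(edge["dst"])
    # "Strongly supported": at least one linked span scored >= min_score.
    strong = [
        cid for cid, spans in spans_by_claim.items()
        if any(nodes[s]["properties"].get("score", 0) >= min_score for s in spans if s in nodes)
    ]
    unsupported = sorted(cid for cid, spans in spans_by_claim.items() if not spans)
    total = max(len(claim_ids), 1)
    return {
        "claims": len(claim_ids),
        "coverage": (len(claim_ids) - len(unsupported)) / total,
        "strong_coverage": len(strong) / total,
        "unsupported": unsupported,
    }
```

After an ingestion run, `evidence_report(state["graph"].nodes, state["graph"].edges)` gives the same `coverage` as `compute_evidence_coverage` plus the extra fields.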

# 11) Benchmarking (Mock)
**Purpose:** Compare Neo4j vs FalkorDB (mocked in this fallback path).

**Inputs:** Ingestion stats.

**Outputs:** Pandas comparison table.

**Failure modes:** None.

**Cost notes:** None.

**What to run next:** Run after ingestion.

In [None]:
if state.get("graph"):
    # Placeholder: both columns mirror the in-memory graph until real Neo4j/FalkorDB writes exist.
    graph = state["graph"]
    coverage = compute_evidence_coverage(graph)
    data = {
        "metric": ["nodes", "edges", "evidence_coverage"],
        "neo4j": [len(graph.nodes), len(graph.edges), coverage],
        "falkordb": [len(graph.nodes), len(graph.edges), coverage],
    }
    df = pd.DataFrame(data)
    display(df)
else:
    print("Run ingestion first.")
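The table above copies the same in-memory numbers into both columns, so it compares counts rather than performance. A real comparison would time the same operation against each backend. `time_call` is a hypothetical helper that reports wall-clock timings in milliseconds; the median is less sensitive to a single slow run than the mean.

```python
import statistics
import time
from typing import Any, Callable, Dict


def time_call(fn: Callable[[], Any], repeats: int = 5) -> Dict[str, float]:
    """Run `fn` several times and report wall-clock timings in milliseconds."""
    if repeats < 1:
        raise ValueError("repeats must be >= 1")
    samples = []
    for _ in range(repeats):
        t0 = time.perf_counter()
        fn()
        samples.append((time.perf_counter() - t0) * 1000.0)
    return {"median_ms": statistics.median(samples), "min_ms": min(samples), "max_ms": max(samples)}
```

Wrapping, for example, `lambda: session.run(query).consume()` for each backend would fill the `neo4j` and `falkordb` columns with comparable numbers.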

# 12) Run Summary
**Purpose:** Summarize run status.

**Inputs:** Current state.

**Outputs:** Summary block.

**Failure modes:** None.

**Cost notes:** None.

**What to run next:** End of notebook.

In [None]:
if state.get("graph"):
    print("case_id:", state["case_id"])
    print("run_id:", state["run_id"])
    print("nodes:", len(state["graph"].nodes))
    print("edges:", len(state["graph"].edges))
    print("evidence_coverage:", round(compute_evidence_coverage(state["graph"]) * 100, 2), "%")
else:
    print("No run yet. Use the Workbench UI to ingest a case.")
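To keep a run's results after the runtime resets, the graph can be written out as JSON. `InMemoryGraph` stores labels as Python sets, which `json.dumps` cannot encode directly, so the sketch converts them to sorted lists. `graph_to_json` is a hypothetical helper; `default=str` is a catch-all for any other non-JSON values.

```python
import json
from typing import Any, Dict, List


def graph_to_json(nodes: Dict[str, Dict[str, Any]], edges: List[Dict[str, Any]]) -> str:
    """Serialize an in-memory graph; label sets become sorted lists so json can encode them."""
    payload = {
        "nodes": [
            {"id": nid, "labels": sorted(node["labels"]), "properties": node["properties"]}
            for nid, node in nodes.items()
        ],
        "edges": edges,
    }
    return json.dumps(payload, indent=2, sort_keys=True, default=str)  # str() any other odd types
```

For example, writing `graph_to_json(state["graph"].nodes, state["graph"].edges)` to a file named after `state["run_id"]` keeps one snapshot per run.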