In [1]:
# Install required packages
# mamba install -c conda-forge langchain langchain-text-splitters langchain-openai langchain-community wikipedia pydantic nbformat nbconvert
%pip install -q langchain langchain-text-splitters langchain-openai langchain-community wikipedia pydantic nbformat

Note: you may need to restart the kernel to use updated packages.


In [2]:
import os
import re
import json
import hashlib
import time
import signal
from datetime import datetime
from dataclasses import dataclass, field, asdict
from typing import Optional
from urllib.parse import quote
from contextlib import contextmanager

import nbformat
from nbformat.v4 import new_notebook, new_markdown_cell, new_code_cell, new_raw_cell
from langchain_community.document_loaders import WikipediaLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate

# === Content ID (CID) Functions ===

def compute_cid(content: str) -> str:
    """Return the hex-encoded SHA-256 digest of ``content`` encoded as UTF-8."""
    hasher = hashlib.sha256()
    hasher.update(content.encode('utf-8'))
    return hasher.hexdigest()

def make_signature(cell_num: int, cell_type: str, cid: str, from_cid: str) -> dict:
    """Build the signature record stored next to a generated cell.

    The record carries the cell number, the cell type, the cell's own
    content ID and the content ID of the source it was derived from.
    """
    signature = dict(cell=cell_num, type=cell_type, cid=cid, from_cid=from_cid)
    return signature

def parse_signature(raw_content: str) -> Optional[dict]:
    """Parse a signature from raw cell content.

    Args:
        raw_content: Text of a raw notebook cell, expected to hold a JSON
            object with the keys ``cell``, ``type``, ``cid`` and ``from_cid``.

    Returns:
        The decoded dict when it is a JSON *object* containing all four
        signature keys; ``None`` for anything else (non-text input, invalid
        JSON, JSON scalars/lists, or objects missing a key).
    """
    if not isinstance(raw_content, (str, bytes, bytearray)):
        return None
    try:
        data = json.loads(raw_content.strip())
    except ValueError:
        # json.JSONDecodeError (and UnicodeDecodeError for bytes) derive from ValueError.
        return None
    # A JSON list or string can also "contain" the key names, so the membership
    # test alone is not enough: only a mapping is a valid signature.
    if not isinstance(data, dict):
        return None
    required_keys = ("cell", "type", "cid", "from_cid")
    if all(key in data for key in required_keys):
        return data
    return None

def extract_signatures(notebook) -> dict:
    """Collect every signature found in a notebook's raw cells.

    Returns a dict mapping each signature's ``cell`` value to the signature
    itself; a later signature with the same cell number replaces an earlier one.
    """
    found = {}
    raw_cells = (candidate for candidate in notebook.cells if candidate.cell_type == 'raw')
    for raw_cell in raw_cells:
        parsed = parse_signature(raw_cell.source)
        if not parsed:
            continue
        found[parsed["cell"]] = parsed
    return found

# === Timeout Handling ===

CELL_TIMEOUT_SECONDS = 300  # 5 minutes

class TimeoutException(Exception):
    """Raised by the SIGALRM handler installed in ``timeout_context`` when the time limit expires."""
    pass

@contextmanager
def timeout_context(seconds):
    """Abort the enclosed block with ``TimeoutException`` after ``seconds`` seconds.

    Installs a temporary SIGALRM handler and arms ``signal.alarm``; on exit
    (normal or exceptional) the alarm is cancelled and the previous handler
    is put back.
    """
    def _on_alarm(signum, frame):
        message = f"Operation timed out after {seconds} seconds"
        raise TimeoutException(message)

    previous_handler = signal.signal(signal.SIGALRM, _on_alarm)
    signal.alarm(seconds)
    try:
        yield
    finally:
        # Disarm first so a late alarm cannot fire while the handler is being swapped back.
        signal.alarm(0)
        signal.signal(signal.SIGALRM, previous_handler)

print("Dependencies loaded (with CID support)")

Dependencies loaded


## Configuration

In [3]:
# Pipeline configuration
# ARTICLE_TITLE is used as the Wikipedia query, to build the source URL, and to name output files.
ARTICLE_TITLE = "Albert Einstein"
# Directory that receives the generated *_chunks / *_facts / *_rdf notebooks.
OUTPUT_DIR = "data"
# Passed to RecursiveCharacterTextSplitter as chunk_size / chunk_overlap (characters).
CHUNK_SIZE = 2000
CHUNK_OVERLAP = 128

# LLM configuration (shared across stages)
# NOTE(review): in the visible cells only model, temperature and base_url reach the
# ChatOpenAI client; "provider" is only written into the generated notebooks' provenance.
LLM_CONFIG = {
    "provider": "lm_studio",  # or "openai"
    "model": "qwen/qwen3-coder-30b",
    "temperature": 1,  # NOTE(review): non-zero temperature presumably makes reruns non-reproducible — confirm intended
    "base_url": os.environ.get("LM_STUDIO_BASE_URL", "http://host.docker.internal:1234/v1"),  # env var overrides the default
}

# Create output directory
os.makedirs(OUTPUT_DIR, exist_ok=True)
print(f"Pipeline configured for: {ARTICLE_TITLE}")

Pipeline configured for: Albert Einstein


## Entity Registry

Manages entity identity across chunks with stable URIs derived from source URL.

In [4]:
@dataclass
class EntityRegistry:
    """Tracks entities with stable IDs derived from source URL.

    Attributes:
        source_url: Base URL; entity URIs are this URL plus a ``#fragment``.
        entities: Maps a normalized label key to the entity record (a dict with
            ``id``, ``uri``, ``label``, ``type``, ``descriptions``,
            ``source_chunks`` and ``aliases``).
        aliases: Maps a normalized alias key to the canonical entity key.
    """
    source_url: str
    entities: dict = field(default_factory=dict)  # normalized_key -> entity
    aliases: dict = field(default_factory=dict)   # alias -> canonical_key

    def normalize_key(self, label: str) -> str:
        """Create consistent key from entity label.

        Lower-cases and trims the label, then collapses every run of
        characters outside ``a-z0-9`` into a single underscore and strips
        leading/trailing underscores.
        """
        return re.sub(r'[^a-z0-9]+', '_', label.lower().strip()).strip('_')

    def generate_uri(self, entity_type: str, label: str) -> str:
        """Generate URI based on source URL with fragment identifier."""
        # Source URL is the base; the fragment is the same string as the local ID.
        return f"{self.source_url}#{self.generate_id(entity_type, label)}"

    def generate_id(self, entity_type: str, label: str) -> str:
        """Generate local ID (``<type>_<normalized label>``) for internal reference."""
        key = self.normalize_key(label)
        return f"{entity_type.lower()}_{key}"

    def register(self, label: str, entity_type: str, description: str = "",
                 aliases: Optional[list] = None, source_chunk: Optional[int] = None) -> str:
        """Register or update an entity, return canonical ID.

        Args:
            label: Display label; its normalized form is the registry key.
            entity_type: Type name used in the ID/URI (lower-cased there).
            description: Optional description; appended if not already recorded.
            aliases: Optional alternative labels that should resolve to this entity.
            source_chunk: Optional chunk index; appended if not already recorded.

        Returns:
            The local entity ID computed from ``entity_type`` and ``label``.
            Note that an existing record keeps the ID/type it was created with.
        """
        key = self.normalize_key(label)
        entity_id = self.generate_id(entity_type, label)
        entity_uri = self.generate_uri(entity_type, label)
        alias_list = list(aliases or [])

        if key not in self.entities:
            self.entities[key] = {
                "id": entity_id,
                "uri": entity_uri,
                "label": label,
                "type": entity_type,
                "descriptions": [description] if description else [],
                "source_chunks": [source_chunk] if source_chunk is not None else [],
                "aliases": alias_list,
            }
        else:
            existing = self.entities[key]
            if description and description not in existing["descriptions"]:
                existing["descriptions"].append(description)
            if source_chunk is not None and source_chunk not in existing["source_chunks"]:
                existing["source_chunks"].append(source_chunk)
            if alias_list:
                # Order-preserving de-duplication: a set union would reorder the
                # aliases from run to run and make to_json() output unstable.
                existing["aliases"] = list(dict.fromkeys(existing["aliases"] + alias_list))

        # Register aliases
        for alias in alias_list:
            alias_key = self.normalize_key(alias)
            if not alias_key:
                # An alias with no alphanumerics would be indexed under "" and
                # make every punctuation-only lookup resolve to this entity.
                continue
            self.aliases[alias_key] = key

        return entity_id

    def lookup(self, label: str) -> Optional[dict]:
        """Find entity by label or alias; returns the entity record or ``None``."""
        key = self.normalize_key(label)
        # An alias mapping, when present, takes precedence over a direct key match.
        canonical_key = self.aliases.get(key, key)
        return self.entities.get(canonical_key)

    def to_json(self) -> str:
        """Serialize registry (source URL, entities, aliases) to indented JSON."""
        return json.dumps({
            "source_url": self.source_url,
            "entities": self.entities,
            "aliases": self.aliases,
        }, indent=2)

    @classmethod
    def from_json(cls, json_str: str) -> 'EntityRegistry':
        """Deserialize registry from JSON produced by ``to_json``.

        Raises:
            KeyError: if ``source_url``, ``entities`` or ``aliases`` is missing.
        """
        data = json.loads(json_str)
        registry = cls(source_url=data["source_url"])
        registry.entities = data["entities"]
        registry.aliases = data["aliases"]
        return registry

print("EntityRegistry class defined")

EntityRegistry class defined


## Section Hierarchy Parser

Extracts Wikipedia section structure for breadcrumb context.

In [5]:
def extract_section_hierarchy(content: str) -> list[dict]:
    """Locate ``== Heading ==`` style headers and describe each with its ancestry.

    Returns one dict per header, in document order, holding the header
    ``level`` (number of ``=`` signs), its ``title``, a ``breadcrumb`` of the
    enclosing headers joined by `` > ``, and the match's ``start_pos`` /
    ``end_pos`` character offsets in ``content``.
    """
    heading_re = re.compile(r'^(={2,6})\s*(.+?)\s*\1\s*$', re.MULTILINE)

    ancestry = []  # currently open (level, title) pairs, outermost first
    found = []

    for hit in heading_re.finditer(content):
        markers, heading = hit.group(1), hit.group(2).strip()
        depth = len(markers)

        # Close every open header at the same or a deeper level.
        while ancestry and ancestry[-1][0] >= depth:
            ancestry.pop()
        ancestry.append((depth, heading))

        found.append({
            "level": depth,
            "title": heading,
            "breadcrumb": " > ".join(name for _, name in ancestry),
            "start_pos": hit.start(),
            "end_pos": hit.end(),
        })

    return found


def get_section_context(position: int, sections: list[dict], article_title: str) -> dict:
    """Return the section title and full breadcrumb that apply at ``position``.

    ``sections`` is scanned in order and the scan stops at the first entry
    whose ``start_pos`` lies beyond ``position``; text before any header is
    reported as "Introduction". The breadcrumb is prefixed with ``article_title``.
    """
    current = {"title": "Introduction", "breadcrumb": "Introduction", "level": 1}

    for candidate in sections:
        if candidate["start_pos"] > position:
            break
        current = candidate

    full_breadcrumb = f"{article_title} > {current['breadcrumb']}"
    return {
        "section_title": current["title"],
        "breadcrumb": full_breadcrumb,
    }

print("Section hierarchy parser defined")

Section hierarchy parser defined


## Fetch Wikipedia Content

In [6]:
# Fetch Wikipedia article
# ARTICLE_TITLE is passed as the loader's query and at most one document is requested.
# NOTE(review): doc_content_chars_max presumably caps the returned text at 100000 characters — confirm.
loader = WikipediaLoader(query=ARTICLE_TITLE, load_max_docs=1, doc_content_chars_max=100000)
docs = loader.load()

if not docs:
    raise ValueError(f"Could not fetch article: {ARTICLE_TITLE}")

raw_content = docs[0].page_content
# NOTE(review): `metadata` is not referenced again in the cells visible here.
metadata = docs[0].metadata

# Construct source URL and provenance
# The URL is derived from ARTICLE_TITLE (spaces -> underscores, then percent-encoded),
# not from the document that the loader actually returned.
source_url = f"https://en.wikipedia.org/wiki/{quote(ARTICLE_TITLE.replace(' ', '_'))}"

provenance = {
    "source_url": source_url,
    "article_title": ARTICLE_TITLE,
    "fetched_at": datetime.now().isoformat(),  # local time without a timezone offset
    "content_length": len(raw_content),
    # Wikipedia license - standard for all Wikipedia content
    "license": "CC BY-SA 4.0",
    "license_url": "https://creativecommons.org/licenses/by-sa/4.0/",
    "attribution": "Wikipedia contributors",
}

print(f"Fetched: {ARTICLE_TITLE}")
print(f"Source URL: {source_url}")
print(f"Content length: {len(raw_content)} characters")
print(f"License: {provenance['license']}")

Fetched: Albert Einstein
Source URL: https://en.wikipedia.org/wiki/Albert_Einstein
Content length: 87959 characters
License: CC BY-SA 4.0


## Create Contextual Chunks

In [7]:
# Parse section hierarchy
sections = extract_section_hierarchy(raw_content)
print(f"Found {len(sections)} sections")

# Split into chunks
splitter = RecursiveCharacterTextSplitter(
    chunk_size=CHUNK_SIZE,
    chunk_overlap=CHUNK_OVERLAP,
)
raw_chunks = splitter.split_text(raw_content)
print(f"Split into {len(raw_chunks)} chunks")

# Add context to each chunk
@dataclass
class ContextualChunk:
    """A chunk of article text together with its position and section context."""
    content: str         # chunk text as returned by the splitter
    chunk_index: int     # 0-based index in raw_chunks
    total_chunks: int    # len(raw_chunks)
    breadcrumb: str      # "<article title> > <section breadcrumb>"
    section_title: str   # title of the section active at char_start
    char_start: int      # offset of the chunk in raw_content (or the fallback position)
    char_end: int        # char_start + len(content)

contextual_chunks = []
# Lower bound for the next search; only ever moves forward.
current_pos = 0

for i, chunk_text in enumerate(raw_chunks):
    # Find position in original content
    chunk_start = raw_content.find(chunk_text, current_pos)
    if chunk_start == -1:
        # Chunk text not found verbatim at or after current_pos: reuse the search position.
        chunk_start = current_pos  # Fallback
    chunk_end = chunk_start + len(chunk_text)

    # Get section context
    # The section is chosen by where the chunk starts, even if it spans several sections.
    context = get_section_context(chunk_start, sections, ARTICLE_TITLE)

    contextual_chunks.append(ContextualChunk(
        content=chunk_text,
        chunk_index=i,
        total_chunks=len(raw_chunks),
        breadcrumb=context["breadcrumb"],
        section_title=context["section_title"],
        char_start=chunk_start,
        char_end=chunk_end,
    ))

    # Advance by one character only (not to chunk_end): chunks are split with
    # CHUNK_OVERLAP, so the next chunk may begin before this one ends.
    current_pos = chunk_start + 1

print(f"\nChunks with context:")
for chunk in contextual_chunks[:3]:
    print(f"  Chunk {chunk.chunk_index + 1}: {chunk.breadcrumb}")

Found 71 sections
Split into 63 chunks

Chunks with context:
  Chunk 1: Albert Einstein > Introduction
  Chunk 2: Albert Einstein > Introduction
  Chunk 3: Albert Einstein > Life and career


## Initialize Entity Registry

In [8]:
# Initialize entity registry with article subject
registry = EntityRegistry(source_url=source_url)

# Pre-seed with the article subject
# NOTE(review): the type is hard-coded to "Person" and the alias is the last
# whitespace-separated word of the title; both only suit person articles, and for a
# one-word title the alias equals the label itself.
registry.register(
    label=ARTICLE_TITLE,
    entity_type="Person",  # Adjust based on article type
    description=f"Subject of Wikipedia article: {ARTICLE_TITLE}",
    aliases=[ARTICLE_TITLE.split()[-1]],  # Last name as alias
)

print(f"Entity registry initialized with subject: {ARTICLE_TITLE}")
print(f"Subject URI: {registry.entities[registry.normalize_key(ARTICLE_TITLE)]['uri']}")

Entity registry initialized with subject: Albert Einstein
Subject URI: https://en.wikipedia.org/wiki/Albert_Einstein#person_albert_einstein


## Prompt Templates

These prompts are embedded in the generated notebooks for transparency and adjustability.

In [9]:
# Facts-extraction prompt text. Placeholders: {source_url}, {breadcrumb},
# {known_entities}, {chunk_text}. Passed to generate_facts_notebook_header, which
# embeds it in the facts notebook header.
FACTS_EXTRACTION_PROMPT = """You are an expert at extracting factual information from text.

Given text from a Wikipedia article, extract simple English statements that capture the consensus factual information. Each statement should:
- Be a single, clear sentence
- Contain one main fact or relationship
- Use the full name of entities on first mention
- Be verifiable from the source text
- Avoid opinions, interpretations, or hedged language

The text comes from: {source_url}
Section context: {breadcrumb}

Known entities (use consistent names):
{known_entities}

---
{chunk_text}
---

Extract factual statements as a bulleted list. Also identify any new entities (people, places, organizations, concepts, events, works) that should be added to the registry.
"""

# RDF-generation prompt text. Placeholders: {source_url}, {breadcrumb}, {prefixes},
# {entity_registry}, {facts}. Passed to generate_rdf_notebook_header, which embeds it
# in the RDF notebook header.
RDF_GENERATION_PROMPT = """You are an expert at converting factual statements to RDF triples in Turtle format.

Convert the following factual statements to RDF using schema.org vocabulary where possible.

Source: {source_url}
Section: {breadcrumb}

Use these prefixes:
{prefixes}

Entity registry (use these URIs):
{entity_registry}

Guidelines:
- Use schema.org properties (schema:birthDate, schema:birthPlace, schema:worksFor, etc.)
- For relationships not in schema.org, use wiki3: prefix
- Include rdfs:label for entities
- Use xsd datatypes for dates and numbers
- Entity URIs should use the source URL as base with fragment identifiers

---
{facts}
---

Generate Turtle RDF:
"""

# Turtle prefix block. Contains one placeholder, {source_url}, filled with str.format
# in generate_rdf_notebook_header.
RDF_PREFIXES = """@prefix schema: <https://schema.org/> .
@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> .
@prefix xsd: <http://www.w3.org/2001/XMLSchema#> .
@prefix wiki3: <https://wiki3.ai/vocab/> .
@base <{source_url}> .
"""

print("Prompt templates defined")

Prompt templates defined


## Generate Chunks Notebook

In [None]:
def generate_chunks_notebook(chunks: list, provenance: dict, registry: EntityRegistry, 
                             llm_config: dict, output_path: str):
    """Generate a notebook with chunked source text and context metadata.

    Layout of the written notebook: a provenance markdown cell, a raw cell with
    the registry JSON, then for every chunk a markdown content cell followed by
    a raw signature cell with its CID.

    Args:
        chunks: Objects exposing ``breadcrumb``, ``chunk_index``,
            ``total_chunks`` and ``content``.
        provenance: Dict with the keys interpolated into the header below.
        registry: Serialized into the second cell via ``to_json()``.
        llm_config: Not used by this function.
        output_path: File the notebook is written to (overwritten if present).

    Returns:
        ``output_path``.

    Also reads the notebook-level constants ``CHUNK_SIZE`` and ``CHUNK_OVERLAP``.
    """
    nb = new_notebook()

    # Cell 0: Provenance markdown
    provenance_yaml = f"""# Chunked Text: {provenance['article_title']}

## Provenance

```yaml
source_url: {provenance['source_url']}
article_title: {provenance['article_title']}
fetched_at: {provenance['fetched_at']}
content_length: {provenance['content_length']}
license: {provenance['license']}
license_url: {provenance['license_url']}
attribution: {provenance['attribution']}
chunk_size: {CHUNK_SIZE}
chunk_overlap: {CHUNK_OVERLAP}
total_chunks: {len(chunks)}
generated_by: wiki_to_kg_pipeline.ipynb
generated_at: {datetime.now().isoformat()}
```

## Processing Instructions

Each chunk below contains source text with contextual metadata. The context line (before the separator) provides:
- **Context**: Hierarchical breadcrumb showing article > section path
- **Chunk**: Position in sequence

The text below the `---` separator is the unchanged source content.
Each chunk is followed by a signature cell containing its Content ID (CID).
"""
    nb.cells.append(new_markdown_cell(provenance_yaml))

    # Cell 1: Entity registry (raw cell)
    nb.cells.append(new_raw_cell(registry.to_json()))

    # Chunk cells with signatures
    # from_cid shared by every chunk. It is the hash of the source URL concatenated
    # with the content length — not a hash of the article text itself.
    source_cid = compute_cid(provenance['source_url'] + str(provenance['content_length']))

    for chunk in chunks:
        # Content cell
        chunk_content = f"""**Context:** {chunk.breadcrumb}
**Chunk:** {chunk.chunk_index + 1} of {chunk.total_chunks}

---

{chunk.content}
"""
        nb.cells.append(new_markdown_cell(chunk_content))

        # Signature cell
        # The CID covers the whole cell text (context lines + separator + chunk), and
        # signature cell numbers are 1-based.
        chunk_cid = compute_cid(chunk_content)
        signature = make_signature(
            cell_num=chunk.chunk_index + 1,
            cell_type="chunk",
            cid=chunk_cid,
            from_cid=source_cid
        )
        nb.cells.append(new_raw_cell(json.dumps(signature)))

    # Write notebook
    with open(output_path, 'w', encoding='utf-8') as f:
        nbformat.write(nb, f)

    return output_path

# Generate chunks notebook
# article_slug (lower-cased title, spaces -> underscores) is also read by the
# facts/RDF header generators and used for their output file names.
article_slug = ARTICLE_TITLE.lower().replace(' ', '_')
chunks_path = os.path.join(OUTPUT_DIR, f"{article_slug}_chunks.ipynb")
# Unlike the facts/RDF notebooks, this file is rewritten on every run.
generate_chunks_notebook(contextual_chunks, provenance, registry, LLM_CONFIG, chunks_path)
print(f"Generated: {chunks_path}")

Generated: data/albert_einstein_chunks.ipynb


## Generate Facts Notebook (Structure Only)

Creates the facts notebook with its header cells only (provenance and entity registry), and only if the file does not already exist. Fact cells are appended in a later step.

In [None]:
def generate_facts_notebook_header(provenance: dict, registry: EntityRegistry,
                                    llm_config: dict, prompt_template: str) -> nbformat.NotebookNode:
    """Generate just the header cells for facts notebook.

    Args:
        provenance: Dict providing ``article_title``, ``source_url``,
            ``license`` and ``license_url``.
        registry: Serialized into the second cell via ``to_json()``.
        llm_config: Dict providing ``provider``, ``model`` and ``temperature``.
        prompt_template: Prompt text embedded verbatim in the header.

    Returns:
        A new notebook with two cells: the provenance/instructions markdown
        cell and a raw cell holding the registry JSON.
    """
    # Derive the slug from the provenance record (same rule as the chunks file name:
    # lower-case, spaces -> underscores) so the function does not depend on a
    # notebook-global variable defined in another cell.
    source_slug = provenance['article_title'].lower().replace(' ', '_')

    nb = new_notebook()

    provenance_md = f"""# Factual Statements: {provenance['article_title']}

## Provenance

```yaml
source_url: {provenance['source_url']}
article_title: {provenance['article_title']}
license: {provenance['license']}
license_url: {provenance['license_url']}
source_notebook: {source_slug}_chunks.ipynb
generated_by: wiki_to_kg_pipeline.ipynb
generated_at: {datetime.now().isoformat()}
llm_provider: {llm_config['provider']}
llm_model: {llm_config['model']}
llm_temperature: {llm_config['temperature']}
```

## Processing Instructions

This notebook contains simple English factual statements extracted from source text chunks.
Each content cell corresponds to one chunk from the source notebook.
Each content cell is followed by a signature cell with CID provenance.

To regenerate a specific cell: delete both the content cell and its signature, then re-run the pipeline.

## Prompt Template

```
{prompt_template}
```
"""
    nb.cells.append(new_markdown_cell(provenance_md))
    nb.cells.append(new_raw_cell(registry.to_json()))

    return nb

# Create initial facts notebook with just header (no placeholders)
facts_path = os.path.join(OUTPUT_DIR, f"{article_slug}_facts.ipynb")

# Only create if doesn't exist
# An existing file is left untouched, so its header (provenance, registry snapshot,
# embedded prompt) is not refreshed on later runs.
if not os.path.exists(facts_path):
    facts_nb = generate_facts_notebook_header(provenance, registry, LLM_CONFIG, FACTS_EXTRACTION_PROMPT)
    with open(facts_path, 'w', encoding='utf-8') as f:
        nbformat.write(facts_nb, f)
    print(f"Created facts notebook: {facts_path}")
else:
    print(f"Facts notebook exists: {facts_path}")

Generated: data/albert_einstein_facts.ipynb


## Generate RDF Notebook (Structure Only)

Creates the RDF notebook with its header cells only (provenance, prefixes, and entity registry), and only if the file does not already exist. RDF cells are appended after facts extraction.

In [None]:
def generate_rdf_notebook_header(provenance: dict, registry: EntityRegistry,
                                  llm_config: dict, prompt_template: str, prefixes: str) -> nbformat.NotebookNode:
    """Generate just the header cells for RDF notebook.

    Args:
        provenance: Dict providing ``article_title``, ``source_url``,
            ``license`` and ``license_url``.
        registry: Serialized into the second cell via ``to_json()``.
        llm_config: Dict providing ``provider``, ``model`` and ``temperature``.
        prompt_template: Prompt text embedded verbatim in the header.
        prefixes: Turtle prefix block with a ``{source_url}`` placeholder,
            filled via ``str.format``.

    Returns:
        A new notebook with two cells: the provenance/prefixes/instructions
        markdown cell and a raw cell holding the registry JSON.
    """
    # Derive the slug from the provenance record (same rule as the other output file
    # names: lower-case, spaces -> underscores) so the function does not depend on a
    # notebook-global variable defined in another cell.
    source_slug = provenance['article_title'].lower().replace(' ', '_')

    nb = new_notebook()

    formatted_prefixes = prefixes.format(source_url=provenance['source_url'])
    provenance_md = f"""# RDF Triples: {provenance['article_title']}

## Provenance

```yaml
source_url: {provenance['source_url']}
article_title: {provenance['article_title']}
license: {provenance['license']}
license_url: {provenance['license_url']}
source_notebook: {source_slug}_facts.ipynb
generated_by: wiki_to_kg_pipeline.ipynb
generated_at: {datetime.now().isoformat()}
llm_provider: {llm_config['provider']}
llm_model: {llm_config['model']}
llm_temperature: {llm_config['temperature']}
rdf_format: Turtle
```

## RDF Prefixes

```turtle
{formatted_prefixes}
```

## Processing Instructions

This notebook contains RDF triples in Turtle format, one cell per source facts cell.
Each content cell is followed by a signature cell with CID provenance.

To regenerate a specific cell: delete both the content cell and its signature, then re-run the pipeline.

## Prompt Template

```
{prompt_template}
```
"""
    nb.cells.append(new_markdown_cell(provenance_md))
    nb.cells.append(new_raw_cell(registry.to_json()))

    return nb

# Create initial RDF notebook with just header (no placeholders)
rdf_path = os.path.join(OUTPUT_DIR, f"{article_slug}_rdf.ipynb")

# Only create if doesn't exist
# An existing file is left untouched, so its header (provenance, prefixes, registry
# snapshot, embedded prompt) is not refreshed on later runs.
if not os.path.exists(rdf_path):
    rdf_nb = generate_rdf_notebook_header(provenance, registry, LLM_CONFIG, RDF_GENERATION_PROMPT, RDF_PREFIXES)
    with open(rdf_path, 'w', encoding='utf-8') as f:
        nbformat.write(rdf_nb, f)
    print(f"Created RDF notebook: {rdf_path}")
else:
    print(f"RDF notebook exists: {rdf_path}")

Generated: data/albert_einstein_rdf.ipynb


## Process Chunks → Extract Facts

Run the LLM on each chunk to extract factual statements and update the facts notebook.

In [None]:
# Initialize LLM for processing
# The client is built from LLM_CONFIG's model, temperature and base_url; the
# "provider" entry is not consulted here and the api_key is a fixed placeholder string.
llm = ChatOpenAI(
    model=LLM_CONFIG["model"],
    temperature=LLM_CONFIG["temperature"],
    base_url=LLM_CONFIG["base_url"],
    api_key="lm-studio",
)

# Create facts extraction prompt
# NOTE(review): this inline text is what is actually sent to the model. It differs
# from FACTS_EXTRACTION_PROMPT (which is what the facts notebook header embeds): the
# source line is worded differently and the request to identify new entities is absent.
# Confirm which wording is intended and use a single definition.
facts_prompt = ChatPromptTemplate.from_template("""You are an expert at extracting factual information from text.

Given text from a Wikipedia article, extract simple English statements that capture the consensus factual information. Each statement should:
- Be a single, clear sentence
- Contain one main fact or relationship
- Use the full name of entities on first mention
- Be verifiable from the source text
- Avoid opinions, interpretations, or hedged language

Source: {source_url}
Section context: {breadcrumb}

Known entities (use consistent names):
{known_entities}

---
{chunk_text}
---

Extract factual statements as a bulleted list:""")

# Pipe the rendered prompt into the chat model; invoke() takes the placeholder values.
facts_chain = facts_prompt | llm

def get_known_entities_text(registry: EntityRegistry) -> str:
    """Render the registry's entities as a bulleted ``- label (type)`` list.

    Returns the text "None yet" when the registry holds no entities.
    """
    bullet_lines = [
        f"- {entity['label']} ({entity['type']})"
        for entity in registry.entities.values()
    ]
    if not bullet_lines:
        return "None yet"
    return "\n".join(bullet_lines)

# Read chunks notebook to get source content and CIDs
chunks_nb = nbformat.read(chunks_path, as_version=4)
chunk_signatures = extract_signatures(chunks_nb)


def _upsert_signed_cell(nb, cell_num, content_cell, signature_cell, header_cells=2):
    """Insert or replace the (content, signature) cell pair for ``cell_num`` in ``nb``.

    Any existing pair (or orphaned signature) for ``cell_num`` after the first
    ``header_cells`` cells is removed. The new pair is placed in front of the
    first remaining pair whose signature has a larger cell number, or at the
    end when there is none, so cells stay in chunk order across regenerations.
    """
    cells = list(nb.cells)
    kept = cells[:header_cells]
    i = header_cells
    while i < len(cells):
        cell = cells[i]
        if cell.cell_type == 'raw':
            own_sig = parse_signature(cell.source)
            if own_sig and own_sig["cell"] == cell_num:
                i += 1  # stale signature without a preceding content cell
                continue
        elif i + 1 < len(cells) and cells[i + 1].cell_type == 'raw':
            # A content cell is identified by the signature that directly follows it.
            following_sig = parse_signature(cells[i + 1].source)
            if following_sig and following_sig["cell"] == cell_num:
                i += 2  # stale content cell + its signature
                continue
        kept.append(cell)
        i += 1

    insert_at = len(kept)
    for j in range(header_cells, len(kept)):
        if kept[j].cell_type != 'raw':
            continue
        other_sig = parse_signature(kept[j].source)
        if other_sig and isinstance(other_sig["cell"], int) and other_sig["cell"] > cell_num:
            has_content_cell = j - 1 >= header_cells and kept[j - 1].cell_type != 'raw'
            insert_at = j - 1 if has_content_cell else j
            break

    kept[insert_at:insert_at] = [content_cell, signature_cell]
    nb.cells = kept


# Build list of chunk content with CIDs
# Chunks notebook structure: [provenance, registry, chunk1, sig1, chunk2, sig2, ...]
chunk_data = []
cell_idx = 2  # Skip provenance and registry
while cell_idx < len(chunks_nb.cells):
    cell = chunks_nb.cells[cell_idx]
    if cell.cell_type != 'markdown':
        cell_idx += 1
        continue

    content = cell.source
    # Get corresponding signature (next cell), accepting it only if it is a raw cell
    sig = None
    if cell_idx + 1 < len(chunks_nb.cells):
        follower = chunks_nb.cells[cell_idx + 1]
        if follower.cell_type == 'raw':
            sig = parse_signature(follower.source)

    # Extract breadcrumb
    context_match = re.search(r'\*\*Context:\*\*\s*(.+)', content)
    breadcrumb = context_match.group(1) if context_match else "Unknown"

    # Extract chunk text (after ---)
    parts = content.split("---\n", 1)
    chunk_text = parts[1].strip() if len(parts) > 1 else content

    chunk_data.append({
        "cell_num": sig["cell"] if sig else len(chunk_data) + 1,
        "content": content,
        "chunk_text": chunk_text,
        "breadcrumb": breadcrumb,
        # Hash the cell as it is now rather than trusting the stored signature,
        # so a chunk edited after signing is detected as changed.
        "cid": compute_cid(content),
    })
    # Step over the signature only when one was actually found; otherwise the
    # next cell may be another chunk and must not be skipped.
    cell_idx += 2 if sig else 1

print(f"Found {len(chunk_data)} chunks with CIDs")

# Read existing facts notebook and extract signatures
facts_nb = nbformat.read(facts_path, as_version=4)
facts_signatures = extract_signatures(facts_nb)

print(f"Found {len(facts_signatures)} existing fact signatures")
print(f"Timeout per cell: {CELL_TIMEOUT_SECONDS}s ({CELL_TIMEOUT_SECONDS // 60} min)")

# Process each chunk
processed_count = 0
skipped_count = 0
error_count = 0

for chunk in chunk_data:
    cell_num = chunk["cell_num"]
    source_cid = chunk["cid"]

    # Check if we already have up-to-date facts for this chunk
    existing_sig = facts_signatures.get(cell_num)
    if existing_sig and existing_sig["from_cid"] == source_cid:
        print(f"  Chunk {cell_num}: ⊘ Up-to-date (CID match), skipping")
        skipped_count += 1
        continue

    # Need to generate (or regenerate) this cell
    if existing_sig:
        print(f"  Chunk {cell_num}: ↻ Source changed, regenerating...", end=" ", flush=True)
    else:
        print(f"  Chunk {cell_num}: + Generating...", end=" ", flush=True)

    start_time = time.time()

    # Call LLM to extract facts
    # A failed call is reported and counted but NOT written to the notebook: signing an
    # error message with this chunk's from_cid would mark it up-to-date, so it would
    # never be retried and the error text would later be treated as facts.
    try:
        with timeout_context(CELL_TIMEOUT_SECONDS):
            result = facts_chain.invoke({
                "source_url": provenance["source_url"],
                "breadcrumb": chunk["breadcrumb"],
                "known_entities": get_known_entities_text(registry),
                "chunk_text": chunk["chunk_text"],
            })
        facts_content = result.content
        elapsed = time.time() - start_time
        print(f"✓ ({len(facts_content)} chars, {elapsed:.1f}s)")
        processed_count += 1
    except TimeoutException:
        print(f"⏱ Timeout")
        error_count += 1
        continue
    except Exception as e:
        elapsed = time.time() - start_time
        print(f"✗ Error after {elapsed:.1f}s: {e}")
        error_count += 1
        continue

    # Build the facts cell content
    facts_cell_content = f"""**Context:** {chunk['breadcrumb']}
**Chunk:** {cell_num} of {len(chunk_data)}

---

{facts_content}
"""
    facts_cid = compute_cid(facts_cell_content)
    signature = make_signature(cell_num, "facts", facts_cid, source_cid)

    # Replace any previous pair for this chunk, keeping cells in chunk order
    _upsert_signed_cell(
        facts_nb,
        cell_num,
        new_markdown_cell(facts_cell_content),
        new_raw_cell(json.dumps(signature)),
    )

    # Update signatures dict
    facts_signatures[cell_num] = signature

    # Save notebook after each cell so an interrupted run keeps finished work
    with open(facts_path, 'w', encoding='utf-8') as f:
        nbformat.write(facts_nb, f)

print(f"\nFacts extraction complete:")
print(f"  - {processed_count} generated")
print(f"  - {skipped_count} skipped (up-to-date)")
print(f"  - {error_count} errors/timeouts")

Processing 63 chunks to extract facts...
Timeout per cell: 300 seconds (5 minutes)
  Chunk 1/63... ✓ (2354 chars, 7.2s)
  Chunk 2/63... ✓ (1781 chars, 4.6s)
  Chunk 3/63... ✓ (1146 chars, 3.5s)
  Chunk 4/63... ✓ (1151 chars, 3.3s)
  Chunk 5/63... ✓ (848 chars, 2.2s)
  Chunk 6/63... ✓ (1041 chars, 3.0s)
  Chunk 7/63... ✓ (1532 chars, 4.2s)
  Chunk 8/63... ✓ (1685 chars, 6.0s)
  Chunk 9/63... ✓ (1614 chars, 4.8s)
  Chunk 10/63... ✓ (1652 chars, 4.2s)
  Chunk 11/63... ✓ (1600 chars, 5.0s)
  Chunk 12/63... ✓ (1543 chars, 4.4s)
  Chunk 13/63... ✓ (1321 chars, 4.2s)
  Chunk 14/63... ✓ (399 chars, 1.1s)
  Chunk 15/63... ✓ (1218 chars, 3.8s)
  Chunk 16/63... ✓ (1267 chars, 3.5s)
  Chunk 17/63... ✓ (1199 chars, 3.3s)
  Chunk 18/63... ✓ (821 chars, 2.6s)
  Chunk 19/63... ✓ (1195 chars, 3.4s)
  Chunk 20/63... ✓ (1367 chars, 4.9s)
  Chunk 21/63... ✓ (1949 chars, 5.5s)
  Chunk 22/63... ✓ (1358 chars, 4.1s)
  Chunk 23/63... ✓ (1256 chars, 3.5s)
  Chunk 24/63... ✓ (1081 chars, 3.5s)
  Chunk 25/63... 

In [14]:
# Summary of facts extraction (notebook already updated incrementally)
# Informational only: prints the facts notebook path and two fixed status lines.
print(f"Facts notebook: {facts_path}")
print(f"  - Updated incrementally during processing")
print(f"  - Ready for review/editing before RDF generation")

Facts notebook: data/albert_einstein_facts.ipynb
  - Updated incrementally during processing
  - Ready for review/editing before RDF generation


## Process Facts → Generate RDF

Run the LLM on each facts cell to generate RDF triples and update the RDF notebook.

In [None]:
# Create RDF generation prompt
# Placeholders: {source_url}, {breadcrumb}, {entity_registry}, {facts}.
# NOTE(review): this inline text differs from RDF_GENERATION_PROMPT (which is what the
# RDF notebook header embeds) — it has no {prefixes} placeholder and asks for full
# URIs without prefixes. Confirm which wording is intended.
rdf_prompt = ChatPromptTemplate.from_template("""You are an expert at converting factual statements to RDF triples in Turtle format.

Convert the following factual statements to RDF triples. Use:
- schema.org vocabulary for common predicates (schema:name, schema:birthDate, etc.)
- wiki3:vocab for domain-specific predicates
- Entity URIs from the registry when available, otherwise create from source URL

Source: {source_url}
Section context: {breadcrumb}

Entity Registry (use these URIs):
{entity_registry}

Factual statements:
{facts}

Generate Turtle RDF triples (no prefixes, use full URIs):""")

# Reuses the `llm` client created for facts extraction.
rdf_chain = rdf_prompt | llm

def format_entity_registry_for_prompt(registry: EntityRegistry) -> str:
    """Render the registry as one ``<uri> # label (type)`` line per entity.

    Args:
        registry: Object whose ``entities`` mapping holds dicts with the keys
            ``uri``, ``label`` and ``type``.

    Returns:
        The newline-joined entity lines, or a placeholder comment line when
        the registry has no entities.
    """
    rendered = "\n".join(
        f"<{item['uri']}> # {item['label']} ({item['type']})"
        for item in registry.entities.values()
    )
    # Every rendered line is non-empty, so an empty string means "no entities".
    return rendered or "# No entities registered yet"

# Read facts notebook to get source content and CIDs
facts_nb = nbformat.read(facts_path, as_version=4)
facts_signatures = extract_signatures(facts_nb)

# Build list of facts content with CIDs.
# Layout after the two header cells (provenance, registry): each markdown
# facts cell is normally followed by a raw cell holding its JSON signature.
facts_data = []
cell_idx = 2  # Skip provenance and registry
while cell_idx < len(facts_nb.cells):
    cell = facts_nb.cells[cell_idx]
    if cell.cell_type != 'markdown':
        # Not a facts cell (e.g. an orphaned signature); nothing to collect.
        cell_idx += 1
        continue

    content = cell.source

    # A signature only counts when it sits in a *raw* cell directly after the
    # content, matching how extract_signatures() reads notebooks.
    sig = None
    if cell_idx + 1 < len(facts_nb.cells):
        candidate = facts_nb.cells[cell_idx + 1]
        if candidate.cell_type == 'raw':
            sig = parse_signature(candidate.source)

    # Extract breadcrumb
    context_match = re.search(r'\*\*Context:\*\*\s*(.+)', content)
    breadcrumb = context_match.group(1) if context_match else "Unknown"

    # Extract facts (after ---); fall back to the whole cell if no separator
    parts = content.split("---\n", 1)
    facts_text = parts[1].strip() if len(parts) > 1 else content

    facts_data.append({
        # Without a signature, fall back to positional numbering and a CID
        # computed from the cell content itself.
        "cell_num": sig["cell"] if sig else len(facts_data) + 1,
        "content": content,
        "facts_text": facts_text,
        "breadcrumb": breadcrumb,
        "cid": sig["cid"] if sig else compute_cid(content),
    })

    # Step over the signature cell only when one was actually found; otherwise
    # the next cell may itself be a facts cell and must not be skipped.
    cell_idx += 2 if sig else 1

print(f"Found {len(facts_data)} facts cells with CIDs")

# Read existing RDF notebook and extract signatures
rdf_nb = nbformat.read(rdf_path, as_version=4)
rdf_signatures = extract_signatures(rdf_nb)

RDF_HEADER_CELLS = 2  # Leading cells of the RDF notebook that are never rewritten


def _is_error_cell(source: str) -> bool:
    """Return True when a generated cell holds a '# Error:' placeholder.

    Generated cells start with '# Context:' / '# Cell:' header lines, so the
    marker is looked for on every line, not just the first one.
    """
    return any(line.startswith("# Error:") for line in source.splitlines())


def _failed_rdf_cell_numbers(notebook) -> set:
    """Collect cell numbers whose stored RDF content is an error placeholder.

    A content cell is the raw cell directly before its signature cell.
    """
    failed = set()
    cells = notebook.cells
    for idx in range(RDF_HEADER_CELLS + 1, len(cells)):
        if cells[idx].cell_type != 'raw':
            continue
        stored_sig = parse_signature(cells[idx].source)
        if not stored_sig:
            continue
        previous = cells[idx - 1]
        if previous.cell_type == 'raw' and _is_error_cell(previous.source):
            failed.add(stored_sig["cell"])
    return failed


def _upsert_rdf_cell(notebook, cell_num: int, content: str, signature: dict) -> None:
    """Insert or replace the content+signature pair for ``cell_num``.

    An existing pair is replaced at its current position so the notebook keeps
    document order; a new pair is appended. Any further duplicates of the same
    cell number are dropped. The header cells are left untouched.
    """
    header = list(notebook.cells[:RDF_HEADER_CELLS])
    body = list(notebook.cells[RDF_HEADER_CELLS:])

    def signature_of(nb_cell):
        return parse_signature(nb_cell.source) if nb_cell.cell_type == 'raw' else None

    kept = []
    insert_at = None
    idx = 0
    while idx < len(body):
        current_sig = signature_of(body[idx])
        next_sig = signature_of(body[idx + 1]) if idx + 1 < len(body) else None

        if current_sig and current_sig["cell"] == cell_num:
            step = 1  # Signature whose content cell is missing
        elif current_sig is None and next_sig and next_sig["cell"] == cell_num:
            # Content cell followed by its signature. Requiring that the
            # current cell is not itself a signature protects a neighbouring
            # cell's signature from being removed by mistake.
            step = 2
        else:
            kept.append(body[idx])
            idx += 1
            continue

        if insert_at is None:
            insert_at = len(kept)  # Remember where the old pair lived
        idx += step

    if insert_at is None:
        insert_at = len(kept)  # No previous version: append at the end
    kept[insert_at:insert_at] = [
        new_raw_cell(content),
        new_raw_cell(json.dumps(signature)),
    ]
    notebook.cells = header + kept


# Cells whose last generation attempt failed must be retried even if the
# source CID is unchanged.
failed_rdf_cells = _failed_rdf_cell_numbers(rdf_nb)

print(f"Found {len(rdf_signatures)} existing RDF signatures")
print(f"Timeout per cell: {CELL_TIMEOUT_SECONDS}s ({CELL_TIMEOUT_SECONDS // 60} min)")

# Process each facts cell
processed_count = 0
skipped_count = 0
error_count = 0

for facts_item in facts_data:
    cell_num = facts_item["cell_num"]
    source_cid = facts_item["cid"]

    # Skip cells that had errors in facts extraction
    if facts_item["facts_text"].startswith("# Error:"):
        print(f"  Facts {cell_num}: ⊘ Source had error, skipping")
        skipped_count += 1
        continue

    # Check if we already have up-to-date RDF for these facts. A stored error
    # placeholder does not count as up-to-date, otherwise a failed cell would
    # never be retried.
    existing_sig = rdf_signatures.get(cell_num)
    cid_matches = bool(existing_sig) and existing_sig["from_cid"] == source_cid
    previously_failed = cell_num in failed_rdf_cells
    if cid_matches and not previously_failed:
        print(f"  Facts {cell_num}: ⊘ Up-to-date (CID match), skipping")
        skipped_count += 1
        continue

    # Need to generate (or regenerate) this cell
    if cid_matches:
        print(f"  Facts {cell_num}: ↻ Previous attempt failed, retrying...", end=" ", flush=True)
    elif existing_sig:
        print(f"  Facts {cell_num}: ↻ Source changed, regenerating...", end=" ", flush=True)
    else:
        print(f"  Facts {cell_num}: + Generating...", end=" ", flush=True)

    start_time = time.time()

    # Call LLM to generate RDF
    try:
        with timeout_context(CELL_TIMEOUT_SECONDS):
            result = rdf_chain.invoke({
                "source_url": provenance["source_url"],
                "breadcrumb": facts_item["breadcrumb"],
                "entity_registry": format_entity_registry_for_prompt(registry),
                "facts": facts_item["facts_text"],
            })
        rdf_content = result.content
        elapsed = time.time() - start_time
        print(f"✓ ({len(rdf_content)} chars, {elapsed:.1f}s)")
        processed_count += 1
        failed_rdf_cells.discard(cell_num)
    except TimeoutException:
        rdf_content = f"# Error: Timeout after {CELL_TIMEOUT_SECONDS}s"
        print(f"⏱ Timeout")
        error_count += 1
        failed_rdf_cells.add(cell_num)
    except Exception as e:
        # Best-effort: record the failure in the notebook and move on.
        rdf_content = f"# Error: {e}"
        elapsed = time.time() - start_time
        print(f"✗ Error after {elapsed:.1f}s: {e}")
        error_count += 1
        failed_rdf_cells.add(cell_num)

    # Build the RDF cell content
    rdf_cell_content = f"""# Context: {facts_item['breadcrumb']}
# Cell: {cell_num} of {len(facts_data)}

{rdf_content}
"""
    rdf_cid = compute_cid(rdf_cell_content)
    signature = make_signature(cell_num, "rdf", rdf_cid, source_cid)

    # Replace the previous version in place, or append if there is none
    _upsert_rdf_cell(rdf_nb, cell_num, rdf_cell_content, signature)

    # Update signatures dict
    rdf_signatures[cell_num] = signature

    # Save notebook after each cell so an interrupted run keeps its progress
    with open(rdf_path, 'w', encoding='utf-8') as f:
        nbformat.write(rdf_nb, f)

print(f"\nRDF generation complete:")
print(f"  - {processed_count} generated")
print(f"  - {skipped_count} skipped (up-to-date)")
print(f"  - {error_count} errors/timeouts")

Processing 63 facts cells to generate RDF...
Timeout per cell: 300 seconds (5 minutes)
  Facts cell 1/63... ✓ (4425 chars, 17.6s)
  Facts cell 2/63... 

KeyboardInterrupt: 

In [None]:
# Recap of the RDF-generation stage. The RDF notebook was already updated
# incrementally, so this cell only reports where it lives.
print(
    f"RDF notebook: {rdf_path}",
    "  - Updated incrementally during processing",
    "  - Ready for review/editing before final export",
    sep="\n",
)

## Export Combined RDF

Combine all RDF cells into a single Turtle file with prefixes.

In [None]:
# Combine all RDF into a single Turtle file
all_triples = []

# Read the updated RDF notebook
rdf_nb = nbformat.read(rdf_path, as_version=4)

# Collect RDF from all raw cells (skip provenance, registry, and signature cells)
for cell in rdf_nb.cells[2:]:
    if cell.cell_type != 'raw':
        continue

    content = cell.source.strip()
    if not content:
        continue

    # Skip signature cells (JSON objects)
    if content.startswith('{') and '"cid"' in content:
        continue

    content_lines = content.split('\n')

    # Skip error placeholders. Generated cells begin with "# Context:" and
    # "# Cell:" header lines, so the "# Error:" marker is never the first
    # line. A multi-line error message would otherwise pass the comment-only
    # filter below and end up inside the Turtle file.
    if any(line.startswith('# Error:') for line in content_lines):
        continue

    # Skip comment-only cells
    has_triples = any(
        line.strip() and not line.strip().startswith('#')
        for line in content_lines
    )
    if has_triples:
        all_triples.append(content)

# Build complete Turtle file
turtle_output = f"""# RDF Knowledge Graph: {provenance['article_title']}
# Source: {provenance['source_url']}
# License: {provenance['license']}
# Generated: {datetime.now().isoformat()}

{formatted_prefixes}

# === Triples ===

"""

turtle_output += "\n\n".join(all_triples)
if all_triples:
    # Each chunk was stripped above; end the file with a newline.
    turtle_output += "\n"

# Save to file
turtle_path = os.path.join(OUTPUT_DIR, f"{article_slug}.ttl")
with open(turtle_path, 'w', encoding='utf-8') as f:
    f.write(turtle_output)

print(f"Exported RDF to: {turtle_path}")
print(f"  - {len(all_triples)} chunks of triples")
print(f"  - {len(turtle_output)} characters total")

## Save Initial Entity Registry

In [None]:
# Write the entity registry to a JSON file in the output directory, then list
# what it contains.
registry_path = os.path.join(OUTPUT_DIR, "entity_registry.json")
with open(registry_path, 'w', encoding='utf-8') as registry_file:
    registry_file.write(registry.to_json())

entity_lines = [
    f"  - {entry['label']} ({entry['type']}): {entry['uri']}"
    for entry in registry.entities.values()
]
print(
    f"Saved: {registry_path}",
    f"\nInitial entities: {len(registry.entities)}",
    *entity_lines,
    sep="\n",
)

Saved: data/entity_registry.json

Initial entities: 1
  - Albert Einstein (Person): https://en.wikipedia.org/wiki/Albert_Einstein#person_albert_einstein


## Pipeline Summary

In [None]:
# Final recap of the pipeline run and the artifacts it produced.
# NOTE(review): the facts and RDF counts are taken from `facts_data` and
# `rdf_signatures`, both assigned by the RDF-generation cell. The previous
# names (`extracted_facts`, `generated_rdf`) are not assigned by that cell,
# which persists results to the notebooks instead. Confirm that no earlier
# cell still defines them.
print("="*60)
print("PIPELINE COMPLETE")
print("="*60)
print(f"\nArticle: {ARTICLE_TITLE}")
print(f"Source: {source_url}")
print(f"License: {provenance['license']}")
print(f"\nGenerated artifacts:")
print(f"  1. {chunks_path}")
print(f"     - {len(contextual_chunks)} chunks with breadcrumb context")
print(f"  2. {facts_path}")
print(f"     - {len(facts_data)} cells with extracted factual statements")
print(f"  3. {rdf_path}")
print(f"     - {len(rdf_signatures)} cells with RDF triples")
print(f"  4. {turtle_path}")
print(f"     - Combined Turtle file for import")
print(f"\nEntity registry: {registry_path}")
print(f"\nThe intermediate notebooks can be reviewed and edited before re-export.")

PIPELINE COMPLETE

Article: Albert Einstein
Source: https://en.wikipedia.org/wiki/Albert_Einstein
License: CC BY-SA 4.0

Generated content notebooks (no Python code):
  1. data/albert_einstein_chunks.ipynb
     - 63 chunks with breadcrumb context
     - Markdown cells with unchanged source text
  2. data/albert_einstein_facts.ipynb
     - Placeholder cells for factual statements
     - Prompt template in header for LLM processing
  3. data/albert_einstein_rdf.ipynb
     - Placeholder cells for Turtle RDF
     - Prefixes and prompt in header

Entity registry: data/entity_registry.json

Workflow:
  1. Review/edit chunks notebook
  2. LLM/agent fills facts notebook from chunks
  3. Human reviews/edits facts
  4. LLM/agent fills RDF notebook from facts
  5. Human reviews/edits RDF
  6. Export final .ttl file
