# **NarrRAG** - An orchestrated RAG framework for narrative extraction from topic model output

This implementation of NarrRAG uses:

*   llama3.2 from Ollama for all LLM tasks
*   ChromaDB and BM25 retrievers
*   LangChain and LangGraph for orchestration
*   Pydantic for data structures

The goal of NarrRAG is to extract narratives from document clusters. It is independent of the model used to create those clusters, and alternative LLMs, retrievers, and orchestration frameworks can be used as well.

### Setup and Dependencies

In [None]:
# delete later
# Mount Google Drive so the input/output files under /content/drive/... are
# accessible (Colab-only; remove when running elsewhere).
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# ollama installer (maybe move to setup.sh script?)
# IPython shell lines: refresh apt, install pciutils, then run the official
# Ollama install script.
! sudo apt update
! sudo apt install -y pciutils
!curl -fsSL https://ollama.com/install.sh |sh

[33m0% [Working][0m            Get:1 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,632 B]
[33m0% [Connecting to archive.ubuntu.com (91.189.91.83)] [Waiting for headers] [1 I[0m[33m0% [Connecting to archive.ubuntu.com (91.189.91.83)] [Waiting for headers] [Con[0m                                                                               Get:2 http://security.ubuntu.com/ubuntu jammy-security InRelease [129 kB]
                                                                               Get:3 https://cli.github.com/packages stable InRelease [3,917 B]
Get:4 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease [1,581 B]
Hit:5 http://archive.ubuntu.com/ubuntu jammy InRelease
Get:6 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [128 kB]
Get:7 https://r2u.stat.illinois.edu/ubuntu jammy InRelease [6,555 B]
Get:8 https://cli.github.com/packages stable/main amd64 Packages [346 B]
Get:9 https://developer.downl

In [None]:
import threading
import subprocess
import time

def run_ollama_serve():
  # Launch `ollama serve` as a child process. Popen returns immediately, so
  # this function does not wait for the server.
  subprocess.Popen(["ollama","serve"])
# Start the server from a background thread and give it a fixed 5 s to come
# up before `ollama pull` runs.
# NOTE(review): a fixed sleep is a race; polling the server until it answers
# would be more robust.
thread = threading.Thread(target=run_ollama_serve)

thread.start()
time.sleep(5)
!ollama pull llama3.2

[?2026h[?25l[1G[?25h[?2026l[?2026h[?25l[1G[?25h[?2026l[?2026h[?25l[1G[?25h[?2026l[?2026h[?25l[1G[?25h[?2026l[?2026h[?25l[1G[?25h[?2026l


In [None]:
# Install Python dependencies (IPython shell line).
# NOTE(review): `langgraph --upgrade` is listed twice; pip's --upgrade flag
# already applies to every package on the line.
!pip install langchain_community langchain-ollama chromadb torch jq rank-bm25 langgraph --upgrade langgraph --upgrade transformers

Collecting langchain_community
  Downloading langchain_community-0.3.29-py3-none-any.whl.metadata (2.9 kB)
Collecting langchain-ollama
  Downloading langchain_ollama-0.3.7-py3-none-any.whl.metadata (2.1 kB)
Collecting chromadb
  Downloading chromadb-1.0.20-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (7.3 kB)
Collecting jq
  Downloading jq-1.10.0-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (7.0 kB)
Collecting rank-bm25
  Downloading rank_bm25-0.2.2-py3-none-any.whl.metadata (3.2 kB)
Collecting langgraph
  Downloading langgraph-0.6.7-py3-none-any.whl.metadata (6.8 kB)
Collecting requests<3,>=2.32.5 (from langchain_community)
  Downloading requests-2.32.5-py3-none-any.whl.metadata (4.9 kB)
Collecting dataclasses-json<0.7,>=0.6.7 (from langchain_community)
  Downloading dataclasses_json-0.6.7-py3-none-any.whl.metadata (25 kB)
Collecting ollama<1.0.0,>=0.5.3 (from langchain-ollama)
  Downloading ollama-0.5.3-py3-none-any.whl.metadata (4.3 kB)
Co

In [None]:
#!pip install "transformers==4.40.2" --force-reinstall
#!pip install --upgrade sentence-transformers

In [None]:
#!pip install -U langchain-huggingface

In [None]:
from langchain_ollama.llms import OllamaLLM
from langchain_ollama import ChatOllama

import pandas as pd
import json
from typing import List, Optional, Dict
from pydantic import BaseModel, Field
from enum import Enum
from IPython.display import Markdown
import os
import numpy as np
from pathlib import Path


from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_community.vectorstores import Chroma
from langchain_community.retrievers import BM25Retriever
from langchain_community.document_loaders import JSONLoader
from langchain_community.llms import Ollama

from langchain.schema import Document
from langchain.prompts import PromptTemplate
from langchain.output_parsers import PydanticOutputParser
from langchain_core.tools import tool

from langgraph.graph import StateGraph, START

### Set up retrievers

In [None]:
# Input files on Google Drive: the clustered documents (CSV with Document,
# user, created and Topic columns) and the per-topic keyword lists (JSON).
CSV_PATH = "/content/drive/My Drive/narrRAG/mexico_seedtopics_merged.csv"
#CSV_PATH = "/content/drive/My Drive/narrRAG/sanbr_seedtopics_merged_tests.csv"
JSON_PATH = "/content/drive/My Drive/narrRAG/mexico_topic_keywords.json"

In [None]:
# Load the clustered documents and keep only rows that have all four columns.
df = pd.read_csv(CSV_PATH)
# NOTE(review): created_parsed is computed here but not carried into df_fil.
df["created_parsed"] = pd.to_datetime(df["created"], errors="coerce")
df_fil = df[['Document', 'user', 'created','Topic']].dropna(subset=['Document', 'user', 'created','Topic'])

# Topic id -> keyword list; retrieve_node joins the keywords into the query.
with open(JSON_PATH, "r") as f:
    topic_keywords = json.load(f)

In [None]:
from langchain_ollama import OllamaEmbeddings

# Embedding function for the Chroma store, served by the local Ollama server.
# NOTE(review): llama3.2 is a generative chat model; a dedicated embedding
# model may retrieve better — worth evaluating.
embedding_model = OllamaEmbeddings(
    model="llama3.2",
)

In [None]:
# Dense vector store for the news corpus. persist_directory is commented out,
# so presumably the index is not kept across sessions — confirm if needed.
chroma = Chroma(
    #persist_directory="/content/drive/My Drive/narrRAG/chroma_db",
    embedding_function=embedding_model
)

In [None]:
# Use only publisher title and published date as metadata for the chroma store, also change structure of the json

def metadata_func(record: dict, metadata: dict) -> dict:
    """Populate the Chroma metadata for one news record loaded by JSONLoader.

    Only the publisher title and the published date are kept.

    Args:
        record: One JSON object from the news file (an element selected by
            the loader's ``.[]`` jq schema).
        metadata: The metadata dict JSONLoader has already built (e.g.
            ``source``, ``seq_num``). It is updated in place.

    Returns:
        The same ``metadata`` dict with ``publisher_title`` and
        ``published_date`` set. Missing *or null* values become ``""``.
    """
    # `or {}` / `or ""` also cover keys that are present but null in the JSON.
    # `.get(key, default)` alone would return None there, which crashes on
    # `None.get` or stores None as a metadata value.
    publisher = record.get("publisher") or {}
    metadata["publisher_title"] = publisher.get("title") or ""
    metadata["published_date"] = record.get("published date") or ""
    return metadata

# Load the news corpus for the dense retriever: jq ".[]" iterates the top-level
# array, page_content comes from each record's "description", and metadata is
# reduced by metadata_func.
loader = JSONLoader(
    file_path="/content/drive/My Drive/narrRAG/puebla_news.json",
    jq_schema=".[]",
    content_key="description",
    metadata_func=metadata_func,
)

docs = loader.load()
print(docs[0].metadata)

{'source': '/content/drive/My Drive/narrRAG/puebla_news.json', 'seq_num': 1, 'publisher_title': "Moody's", 'published_date': 'Mon, 25 Sep 2017 07:00:00 GMT'}


In [None]:
# Embed and index the news documents in Chroma; the call returns the new ids.
chroma.add_documents(docs)

['31fc7757-fe82-4ffe-9b0d-80d986283f7e',
 '695ed805-f24b-48bd-ad4b-15b3e55fd5a5',
 'c44087a3-1deb-4a19-9335-b46f1fd01bda',
 'f5379060-ae05-4bf3-83dd-e09fa5ab1070',
 '98e9810c-3809-40c8-a09c-14720fd5b48a',
 '3a59121c-9968-4b44-9031-ff4cc42b561c',
 '185024e0-067d-40db-b268-fb525d17741a',
 'cf4d5380-8ca3-4d6f-838f-c760adcbb225',
 '3a8b80ea-1dde-4ddb-97cc-ffe0b3e03f0b',
 'aca32edd-8b23-4428-8227-11343a4fdbc1',
 '6f7631af-ae6c-4097-b82e-695d20d31b15',
 '4c6b5e8f-38c2-41df-873a-837480c3e087',
 'a271dfa9-fd2f-4b89-8bd4-dc266b58f58d',
 '83110d58-58fa-456f-a825-3fdf3b0c9c03',
 '7a2d74b9-52af-4023-b0ca-c589b1cbde24',
 '8800cd55-20a8-4237-9078-6356a524f850',
 '43222f4f-d8be-46ac-b91b-62e420f49b74',
 '8788af57-f110-4ec9-ab6d-470912176131',
 'f1cf9699-0053-45c3-9215-3f7f210ac8f1',
 '5a662f69-7787-468b-8a46-7b66cf872860',
 '71c2571e-8e81-4c82-a6fe-914661b9b151',
 '411f8d78-2fd3-4a38-97f7-8abd6636a2c2',
 '118ffe6f-7287-4708-a3f8-024c22e11cf4',
 '4b1c2269-3752-4aaf-9f08-4bfea87ffd5a',
 'f7aad868-d0a5-

In [None]:
# Build retriever
# NOTE(review): 50 dense hits are concatenated into the grading prompt, which
# may be large for llama3.2's context — consider a smaller k.
chroma_retriever = chroma.as_retriever(search_kwargs={"k": 50})  # retrieve top 50 docs

In [None]:
# Build bm25 retriever dict
# One lexical (BM25) retriever per topic, built only over that topic's
# documents and keyed by the topic id as a string.
bm25_retrievers = {}

# Cast topic ids to str so they match the str keys used by retrieve_node
# (topic_keywords keys / GraphState.topic_id).
# NOTE(review): if Topic was read as float (e.g. NaNs in the raw CSV), this
# yields "52.0", which will not match "52" — confirm the column dtype.
df_fil['Topic'] = df_fil['Topic'].astype(str)

# Loop over each topic to create a BM25 retriever
for topic_id in df_fil['Topic'].unique():
    topic_docs = df_fil[df_fil['Topic'] == topic_id]

    # Convert to LangChain Document objects
    documents = [
        Document(
            page_content=row['Document'],
            metadata={"topic": topic_id}
        )
        for _, row in topic_docs.iterrows()
    ]

    # Build BM25Retriever for this topic
    retriever = BM25Retriever.from_documents(documents)
    retriever.k = 10  # top-10 lexical hits per query

    # Add to dictionary
    bm25_retrievers[topic_id] = retriever

### Set up Pydantic models and graph state

In [None]:
# Setup pydantic models and graph state models
from pydantic.json_schema import SkipJsonSchema


# One extracted narrative (actor / action / event / description) for a topic.
# llm_struct uses this class as its structured-output schema, so the class's
# JSON schema is exactly what the LLM is asked to fill. Class docstrings end
# up in that schema, hence the use of # comments here.
class Narrative(BaseModel):
    # topic_id is assigned by the pipeline after every LLM call. It is hidden
    # from the JSON schema, because otherwise the LLM fills it with arbitrary
    # text or emits only this field. It defaults to "" so a result without it
    # still validates. Explicit topic_id=... arguments keep working.
    topic_id: SkipJsonSchema[str] = Field(default="", description="The topic ID of the narrative.")
    actor: str = Field(description="The actor(s) of the narrative.")
    action: str = Field(description="Action that is carried out by actor(s) or other entities or individuals.")
    event: str = Field(description="The event linking the actor(s) and their action.")
    description: str = Field(description="A one sentence long description of the narrative.")

# Structured-output schema for llm_struct_merge: one merged narrative plus the
# narratives it was merged from (not referenced by the graph nodes shown here).
class MergedNarratives(BaseModel):
    merged_narrative: Narrative
    merged_from: List[Narrative]

# Closed set of grader verdicts. The str mixin makes members compare equal to
# their plain string values.
class Grade(str, Enum):
    approved = "approved"  # grade_narrative moves the narrative to approved_narratives
    refine = "refine"      # route_after_grading sends the topic to the refine node

# Grader output: structured-output schema for llm_grader, and also produced
# directly by auto_grade_if_incomplete.
class GradedNarrative(BaseModel):
    grade: Grade  # Enum ensures only valid values
    explanation: str  # short reason for the decision

# A narrative bundled with the retrieved documents it was extracted from and is
# graded against. Used for pending (not yet approved) and approved narratives.
class ApprovedNarrativeWithDocs(BaseModel):
    narrative: Narrative
    documents_bm25: List[Document]
    documents_chroma: List[Document]

# LangGraph state for one topic, passed through the retrieve -> extract ->
# grade <-> refine nodes.
class GraphState(BaseModel):
    topic_id: str = Field(description="The topic ID of the narrative.")
    query: Optional[str] = None  # topic keywords joined into one query string
    documents_bm25: Optional[List[Document]] = None  # hits from the topic's BM25 retriever
    documents_chroma: Optional[List[Document]] = None  # hits from the Chroma retriever
    narratives: Optional[List[Narrative]] = Field(default_factory=list)  # NOTE(review): only ever reset to [] by the nodes shown
    grade_result: Optional[GradedNarrative] = None  # latest grading verdict
    approved_narratives: Optional[List[ApprovedNarrativeWithDocs]] = Field(default_factory=list)
    pending_narratives_with_docs: Dict[str, ApprovedNarrativeWithDocs] = Field(default_factory=dict)  # str(topic_id) -> narrative awaiting grading
    refine_counts: Dict[str, int] = Field(default_factory=dict)  # str(topic_id) -> refine rounds so far

# Setup LLMs
# All clients talk to the local Ollama llama3.2 model. The structured variants
# constrain the output to the given Pydantic schema (method='json_schema').
# NOTE(review): `llm` and `llm_struct_merge` are not used by the nodes shown here.
llm = ChatOllama(model="llama3.2")
llm_struct = ChatOllama(model='llama3.2').with_structured_output(Narrative, method='json_schema')
llm_struct_merge = ChatOllama(model='llama3.2').with_structured_output(MergedNarratives, method='json_schema')
llm_grader = ChatOllama(model='llama3.2').with_structured_output(GradedNarrative, method='json_schema')

### Set up model (graph node) functions

In [None]:
# RETRIEVE
def retrieve_node(state: GraphState) -> GraphState:
    """Retrieve candidate documents for ``state.topic_id``.

    The topic's keywords (from the module-level ``topic_keywords``) are joined
    into a single query. That query is sent to both the topic-specific BM25
    retriever and the shared Chroma retriever.

    Returns:
        A copy of ``state`` with ``query``, ``documents_bm25`` and
        ``documents_chroma`` set and ``narratives`` reset. All other fields
        (pending/approved narratives, refine counts, grade result) are carried
        over unchanged, as in the other graph nodes.

    Raises:
        KeyError: If the topic has no keyword list or no BM25 retriever.
    """
    topic_id = str(state.topic_id)
    query = " ".join(topic_keywords[topic_id])

    docs_bm25 = bm25_retrievers[topic_id].invoke(query)
    docs_chroma = chroma_retriever.invoke(query)

    # model_copy keeps the remaining state instead of resetting it to defaults.
    return state.model_copy(update={
        "topic_id": topic_id,
        "query": query,
        "documents_bm25": docs_bm25,
        "documents_chroma": docs_chroma,
        "narratives": [],
    })

In [None]:
# EXTRACT
from pydantic import ValidationError

def is_blank_narrative(data):
    """Return True when all four narrative text fields of *data* are empty.

    A field counts as empty when it is missing, None, or only whitespace.

    Args:
        data: Mapping that may hold "actor", "action", "event" and
            "description" (e.g. a raw LLM result dict).
    """
    return not any(
        (data.get(name) or '').strip()
        for name in ('actor', 'action', 'event', 'description')
    )

def extract_narrative(state: GraphState, max_attempts: int = 3) -> GraphState:
    """Extract a narrative for ``state.topic_id`` from its BM25 documents.

    The BM25 documents are concatenated into one extraction prompt, which is
    sent to ``llm_struct`` up to ``max_attempts`` times (one call per
    attempt). The first non-blank, schema-valid result is stored, together
    with both document lists, in ``pending_narratives_with_docs[str(topic_id)]``
    for the grade node.

    If every attempt fails, the last raw LLM result (if any) is salvaged into
    a possibly incomplete Narrative via ``model_construct``. The grade node can
    then send it to refinement. If there is nothing to salvage, ``state`` is
    returned unchanged.

    Args:
        state: Graph state. Reads ``topic_id``, ``documents_bm25``,
            ``documents_chroma`` and ``pending_narratives_with_docs``.
        max_attempts: Maximum number of LLM calls.

    Returns:
        A copy of ``state`` with an updated ``pending_narratives_with_docs``,
        or ``state`` itself when nothing could be extracted.
    """
    topic_id = str(state.topic_id)
    documents_bm25 = state.documents_bm25 or []
    documents_chroma = state.documents_chroma or []
    last_error = None
    last_raw_result = None

    # Normalise keys to str: grade_narrative looks the entry up by str(topic_id).
    updated_pending = {
        str(k): v for k, v in (state.pending_narratives_with_docs or {}).items()
    }

    # The prompt depends only on the documents, so build it once, not per attempt.
    docs_text = "\n".join(get_page_content(doc) for doc in documents_bm25)
    combined_text = (f"""
        You are a information extraction system.
        Your task:
        From the following documents, extract ONLY the information present to fill the following JSON object:
        {{
          "actor": "",
          "action": "",
          "event": "",
          "description": ""
        }}
        Rules:
        - STRICTLY use only the information found in the provided documents.
        - Absolutely NO external knowledge, assumptions, or inferred details.
        - Your output will be discarded if it contains information not directly from the documents.
        - Do NOT copy or reuse the examples below.
        - "action" should include at least one verb.
        - "event" is the object of the action and can include nouns and noun phrases.
        - "actor" can be any entity or multiple entities (individual, group, institution, public entity, country, etc.).
        - ONLY if you cannot determine an "actor", use "user".
        - "description" must summarize the narrative in one sentence and must be consistent with "actor","action" and "event".
        - Output ONLY the JSON object, nothing else.


        DOCUMENTS:
        -------------------
        {docs_text}

        """)

    for attempt in range(1, max_attempts + 1):
        print(f"🔄 Extract attempt {attempt} for topic {topic_id}...")
        # Reset per attempt so the handlers below never report a stale result
        # from an earlier attempt, or hit an unbound name on the first one.
        result = None
        try:
            # Exactly one LLM call per attempt, and it sits inside the try.
            # Parser failures (e.g. the LLM emitting only some fields) now
            # trigger a retry instead of escaping and aborting the topic.
            result = llm_struct.invoke(combined_text)
            print("RAW LLM result:", result)

            # Check blankness for both dict and Narrative results.
            raw_fields = result.model_dump() if isinstance(result, Narrative) else result
            if isinstance(raw_fields, dict) and is_blank_narrative(raw_fields):
                print(f"⚠️ All narrative fields blank, retrying (attempt {attempt})...")
                last_raw_result = result  # keep it for the fallback below
                continue

            narrative = result if isinstance(result, Narrative) else Narrative.model_validate(result)

            print("Narrative before overwrite:", narrative)
            # The topic id is owned by the pipeline, never by the LLM.
            narrative.topic_id = topic_id
            print("Narrative after overwrite:", narrative)

            updated_pending[topic_id] = ApprovedNarrativeWithDocs(
                narrative=narrative,
                documents_bm25=documents_bm25,
                documents_chroma=documents_chroma
            )
            return state.model_copy(update={
                "pending_narratives_with_docs": updated_pending
            })

        except ValidationError as ve:
            print(f"❌ Validation error parsing Narrative (attempt {attempt}): {ve}")
            print("Raw LLM output:", result)
            last_error = ve
            if result is not None:
                last_raw_result = result

        except Exception as e:
            print(f"❌ Unexpected error during extraction (attempt {attempt}): {e}")
            last_error = e
            if result is not None:
                last_raw_result = result

    print(f"❌ Failed to extract valid Narrative after {max_attempts} attempts. Last error: {last_error}")
    if last_raw_result is None:
        return state

    # Fallback: salvage whatever the LLM produced into a partial Narrative.
    # Missing or None fields become "". grade_narrative's completeness check
    # then grades the narrative as "refine".
    if isinstance(last_raw_result, dict):
        narrative = Narrative.model_construct(
            topic_id=topic_id,
            actor=last_raw_result.get("actor") or "",
            action=last_raw_result.get("action") or "",
            event=last_raw_result.get("event") or "",
            description=last_raw_result.get("description") or "",
        )
    elif isinstance(last_raw_result, Narrative):
        narrative = last_raw_result
        narrative.topic_id = topic_id
    else:
        narrative = Narrative.model_construct(
            topic_id=topic_id, actor="", action="", event="", description=""
        )

    updated_pending[topic_id] = ApprovedNarrativeWithDocs(
        narrative=narrative,
        documents_bm25=documents_bm25,
        documents_chroma=documents_chroma
    )
    return state.model_copy(update={
        "pending_narratives_with_docs": updated_pending
    })

In [None]:
# GRADE - NEW INDENTATION
MAX_REFINES = 100  # refine rounds per topic before grade_narrative force-approves


def auto_grade_if_incomplete(narrative: Narrative) -> Optional[GradedNarrative]:
    """Grade a narrative as ``refine`` without calling the LLM if it is incomplete.

    Args:
        narrative: Narrative to check. It may be a partial
            ``model_construct`` instance with missing or None fields.

    Returns:
        A ``GradedNarrative`` with grade ``refine`` that lists the missing or
        empty fields. Returns ``None`` when actor, action, event and
        description are all non-blank, in which case the caller grades with
        the LLM.
    """
    required_fields = ["actor", "action", "event", "description"]
    # `or ""` treats absent *and* None-valued fields as empty. Previously a
    # None value crashed on .strip().
    missing = [
        field for field in required_fields
        if not str(getattr(narrative, field, None) or "").strip()
    ]

    if missing:
        return GradedNarrative(
            grade=Grade.refine,
            explanation=f"Missing or empty fields: {', '.join(missing)}"
        )

    return None


def grade_narrative(state: GraphState) -> GraphState:
    """Grade the pending narrative of ``state.topic_id`` and update bookkeeping.

    Incomplete narratives are graded ``refine`` without an LLM call (via
    ``auto_grade_if_incomplete``). Complete ones are fact-checked by
    ``llm_grader`` against the concatenated Chroma + BM25 documents.

    Outcome:
      * approved: the narrative is appended to ``approved_narratives`` and
        removed from ``pending_narratives_with_docs`` and ``refine_counts``.
      * refine, or grading failed: ``refine_counts[topic]`` is incremented.
        Once it reaches ``MAX_REFINES`` the narrative is force-approved.

    Returns:
        A copy of ``state`` with ``approved_narratives``, ``refine_counts``,
        ``grade_result`` and ``pending_narratives_with_docs`` updated. The
        narrative itself is not modified.
    """
    if not state.pending_narratives_with_docs:
        print("⚠️ No pending narratives to grade.")
        return state.model_copy(update={
            "pending_narratives_with_docs": {},
            "grade_result": None
        })

    pending = state.pending_narratives_with_docs

    # Ensure topic_key is str for consistency
    topic_key = str(state.topic_id)
    if not isinstance(pending, dict) or topic_key not in pending:
        print(f"❌ Invalid or missing pending narrative for topic {topic_key}: {pending}")
        return state.model_copy(update={
            "pending_narratives_with_docs": pending if isinstance(pending, dict) else {},
            "grade_result": None
        })

    narrative_with_docs = pending[topic_key]
    narrative = narrative_with_docs.narrative

    # Log narrative fields, even if incomplete (for debugging)
    print("📝 Narrative to grade (may be partial):", narrative)

    # Check for missing fields before calling the LLM
    graded = auto_grade_if_incomplete(narrative)
    if graded:
        print(f"⚠️ Narrative is incomplete. Auto-graded as 'refine': {graded.explanation}")
    else:
        # The context is only needed for the LLM call, so build it here.
        docs_combined = (
            (narrative_with_docs.documents_chroma or []) +
            (narrative_with_docs.documents_bm25 or [])
        )
        context = "\n\n".join(get_page_content(doc) for doc in docs_combined)
        try:
            prompt = f"""
You are a narrative fact-checker. Your task is to analyze a narrative in the context of supporting documents and determine if it is consistent.

### Rules for Grading

Start by assuming the narrative is **approved**. Change it to **refine** only if:

1. The narrative **contradicts** the context (i.e. directly conflicts).
2. The narrative includes hallucinations (i.e. facts not present in the context).

✅ Approve if:
- The narrative is CONSISTENT with the context.
- The narrative does not contradict the context (i.e. tells the opposite).
- Approximate matches exist (e.g. "America" ≈ "US").
- The actor is "user" (this is always valid and must be **approved** if other fields are valid).

🧠 Do NOT:
- Guess or invent information.
- Consider grammar, tone, or style.
- Penalize narratives that are vague but not contradictory.

Use the GradedNarrative schema with fields:
- grade: Either 'approved' or 'refine'
- explanation: A short explanation for the decision.

Context:
{context}

Narrative:
{narrative}
"""
            graded_raw = llm_grader.invoke(prompt)
            print("✅ Grading result:", graded_raw, flush=True)
            graded = (
                graded_raw if isinstance(graded_raw, GradedNarrative)
                else GradedNarrative.model_validate(graded_raw)
            )
        except Exception as e:
            # Best effort: a failed grading is counted as a refine round below.
            print(f"⚠️ Could not parse grading result into GradedNarrative: {e}", flush=True)
            graded = None

    # Work on copies so the incoming state is never mutated. The `or`
    # guards cover the Optional fields, which may be None (list(None) fails).
    approved_narratives = list(state.approved_narratives or [])
    refine_counts = dict(state.refine_counts or {})
    pending_narratives_with_docs = dict(pending)

    approve = graded is not None and graded.grade == Grade.approved
    if not approve:
        refine_count = refine_counts.get(topic_key, 0) + 1
        # "Fallback": the grader produced no usable verdict at all.
        grading_failed = graded is None or graded.grade != Grade.refine
        if grading_failed:
            print(f"⚠️ Grading failed or incomplete, incrementing refine_count: {refine_count} for topic {topic_key}")
        if refine_count >= MAX_REFINES:
            label = " (fallback)" if grading_failed else ""
            print(f"⚠️ Max refine attempts reached{label} for topic {topic_key}, approving narrative.")
            approve = True
        else:
            refine_counts[topic_key] = refine_count

    if approve:
        approved_narratives.append(narrative_with_docs)
        refine_counts.pop(topic_key, None)
        pending_narratives_with_docs.pop(topic_key, None)

    return state.model_copy(update={
        "approved_narratives": approved_narratives,
        "refine_counts": refine_counts,
        "grade_result": graded,
        "pending_narratives_with_docs": pending_narratives_with_docs
    })


In [None]:
from pydantic import ValidationError

def refine_narrative(state: GraphState) -> GraphState:
    """Re-extract the narrative for ``state.topic_id`` after a ``refine`` grade.

    The extraction instructions are prefixed with the grader's explanation
    (when the last grade was ``refine``), so the LLM can fix the specific
    problem instead of repeating its previous answer. A single LLM call is
    made. On failure, a (possibly empty) Narrative is built with
    ``model_construct`` and handed back to the grade node.

    Returns:
        A copy of ``state`` whose ``pending_narratives_with_docs`` holds only
        the new narrative for this topic.
    """
    topic_id = str(state.topic_id)
    documents_bm25 = state.documents_bm25 or []
    documents_chroma = state.documents_chroma or []

    # 🔍 Include grading explanation if the last grade was 'refine'
    explanation_text = ""
    if state.grade_result and state.grade_result.grade == Grade.refine:
        reason = state.grade_result.explanation.strip()
        if reason:
            explanation_text = (
                "Note: In the previous attempt, the narrative was marked for refinement because:\n"
                f"\"{reason}\"\n\n"
            )

    # 🧠 Construct prompt with explanation and extraction instructions
    docs_text = "\n".join(get_page_content(doc) for doc in documents_bm25)
    extraction_prompt = (f"""
        You are a information extraction system.
        Your task:
        From the following documents, extract ONLY the information present to fill the following JSON object:
        {{
          "actor": "",
          "action": "",
          "event": "",
          "description": ""
        }}
        Rules:
        - STRICTLY use only the information found in the provided documents.
        - Absolutely NO external knowledge, assumptions, or inferred details.
        - Your output will be discarded if it contains information not directly from the documents.
        - Do NOT copy or reuse the examples below.
        - "action" should include at least one verb.
        - "event" is the object of the action and can include nouns and noun phrases.
        - "actor" can be any entity or multiple entities (individual, group, institution, public entity, country, etc.).
        - ONLY if you cannot determine an actor, use "user".
        - "description" must summarize the narrative in one sentence and must be consistent with "actor","action" and "event".
        - Output ONLY the JSON object, nothing else.


        DOCUMENTS:
        -------------------
        {docs_text}

        """)
    # Send the grader feedback along. Without it, the refine prompt is
    # identical to the extraction prompt and the LLM tends to repeat itself.
    combined_text = explanation_text + extraction_prompt

    result = None  # bound even if invoke() itself raises, for the handlers below
    try:
        result = llm_struct.invoke(combined_text)
        print("RAW LLM result (refine):", result)

        narrative = result if isinstance(result, Narrative) else Narrative.model_validate(result)

        print("Narrative before overwrite:", narrative)
        # The topic id is owned by the pipeline, never by the LLM.
        narrative.topic_id = topic_id
        print("Narrative after overwrite:", narrative)

    except ValidationError as ve:
        print(f"❌ Validation error parsing Narrative (refine): {ve}")
        print("Raw LLM output:", result)
        if isinstance(result, dict):
            # Salvage the partial result; None/missing fields become "" so the
            # grade node's completeness check can handle them.
            narrative = Narrative.model_construct(
                topic_id=topic_id,
                actor=result.get("actor") or "",
                action=result.get("action") or "",
                event=result.get("event") or "",
                description=result.get("description") or "",
            )
        else:
            narrative = Narrative.model_construct(
                topic_id=topic_id, actor="", action="", event="", description=""
            )

    except Exception as e:
        print(f"❌ Unexpected error during refinement: {e}")
        narrative = Narrative.model_construct(
            topic_id=topic_id, actor="", action="", event="", description=""
        )

    narrative_with_docs = ApprovedNarrativeWithDocs(
        narrative=narrative,
        documents_bm25=documents_bm25,
        documents_chroma=documents_chroma
    )

    return state.model_copy(update={
        "pending_narratives_with_docs": {topic_id: narrative_with_docs}
    })


### Execution: Invoking the Models

In [None]:
from langgraph.graph import StateGraph, END
from langchain_core.runnables import RunnableConfig

def build_graph():
    """Compile the per-topic NarrRAG LangGraph.

    Flow: retrieve -> extract -> grade, then grade -> refine -> grade repeats
    until route_after_grading returns END.
    """
    builder = StateGraph(GraphState)

    builder.add_node("retrieve", retrieve_node)
    builder.add_node("extract", extract_narrative)
    builder.add_node("refine", refine_narrative)
    builder.add_node("grade", grade_narrative)

    builder.set_entry_point("retrieve")

    # Linear flow
    builder.add_edge("retrieve", "extract")
    builder.add_edge("extract", "grade")
    builder.add_edge("refine", "grade")

    # Conditional routing after grading
    def route_after_grading(state: GraphState):
        """Return END when the topic is done, otherwise "refine".

        "Done" means the last grade was approved, or no pending narrative is
        left for the topic. The latter happens after a (force-)approval, but
        also when extract produced nothing at all. The message printed below
        does not distinguish these cases.
        """
        grade_result = state.grade_result
        topic_key = str(state.topic_id)
        pending = state.pending_narratives_with_docs

        # If narrative is approved, or no longer pending (force-approved or otherwise), we're done
        if (grade_result and grade_result.grade == Grade.approved) or \
           (not pending or topic_key not in pending):
            print(f"🎉 Narrative approved or force-approved for topic {state.topic_id}.")
            return END

        # Otherwise, keep refining
        print(f"🔁 Refining narrative for topic {state.topic_id}...")
        return "refine"

    builder.add_conditional_edges("grade", route_after_grading, {
        "refine": "refine",
        END: END
    })

    return builder.compile()

In [None]:
from pydantic import BaseModel
import traceback
from pathlib import Path
import json
import numpy as np
from pydantic import TypeAdapter

def safe_model_dump(obj, _seen=None):
    """Recursively turn Pydantic models (and dicts/lists containing them) into plain data.

    Models are dumped with ``model_dump(mode="python", by_alias=False)``.
    Dicts and lists are rebuilt, dropping entries whose value is ``None``.
    All other values are returned unchanged.

    Args:
        obj: Value to convert.
        _seen: Internal. Holds the ids of the dicts/lists on the current
            recursion path.

    Returns:
        The converted value. A dict/list that (directly or indirectly)
        contains itself is replaced by ``None`` where it recurs.
    """
    if _seen is None:
        _seen = set()

    if isinstance(obj, BaseModel):
        # model_dump builds a fresh dict tree; recurse into it so None values
        # are dropped the same way as for plain dicts.
        obj = obj.model_dump(mode="python", by_alias=False)

    if not isinstance(obj, (dict, list)):
        return obj

    # Only containers are tracked, and only while they are on the recursion
    # path. Small ints and interned strings share ids, a container referenced
    # twice is not a cycle, and ids of freed temporaries are reused. A global
    # "visited" set therefore wrongly turned repeated values into None.
    obj_id = id(obj)
    if obj_id in _seen:
        return None
    _seen.add(obj_id)
    try:
        if isinstance(obj, dict):
            return {k: safe_model_dump(v, _seen) for k, v in obj.items() if v is not None}
        return [safe_model_dump(i, _seen) for i in obj if i is not None]
    finally:
        _seen.discard(obj_id)


def convert_numpy_types(obj, _seen=None):
    """Recursively convert NumPy values into plain Python values for ``json.dump``.

    Dicts and lists are rebuilt, dropping entries whose value is ``None``.
    Conversions: ``np.integer`` -> ``int``, ``np.floating`` -> ``float``,
    ``np.ndarray`` -> nested list, any other ``np.generic`` -> ``.item()``.
    Everything else is returned as-is.

    Args:
        obj: Value to convert.
        _seen: Internal. Holds the ids of the dicts/lists on the current
            recursion path.

    Returns:
        The converted value. A dict/list that (directly or indirectly)
        contains itself is replaced by ``None`` where it recurs.
    """
    if _seen is None:
        _seen = set()

    if isinstance(obj, (dict, list)):
        # Track only containers, and only while they are on the recursion
        # path. Small ints and interned strings share ids, and a container
        # referenced twice is not a cycle, so a global "visited" set wrongly
        # turned repeated values into None.
        obj_id = id(obj)
        if obj_id in _seen:
            return None
        _seen.add(obj_id)
        try:
            if isinstance(obj, dict):
                return {k: convert_numpy_types(v, _seen) for k, v in obj.items() if v is not None}
            return [convert_numpy_types(i, _seen) for i in obj if i is not None]
        finally:
            _seen.discard(obj_id)

    if isinstance(obj, np.integer):
        return int(obj)
    if isinstance(obj, np.floating):
        return float(obj)
    if isinstance(obj, np.ndarray):
        return obj.tolist()
    if isinstance(obj, np.generic):
        return obj.item()
    return obj


def get_page_content(doc):
    """Return the text of a retrieved document, whatever shape it arrives in.

    Lookup order: a ``page_content`` attribute, then a dict key
    ``"page_content"``, then any callable ``.get`` (falling back to
    ``str(doc)``), and finally ``str(doc)`` itself.
    """
    if hasattr(doc, "page_content"):
        return doc.page_content
    if isinstance(doc, dict) and "page_content" in doc:
        return doc["page_content"]
    getter = getattr(doc, "get", None)
    if callable(getter):
        return getter("page_content", str(doc))
    return str(doc)

def run_narrative_extraction(topic_keywords: dict, output_dir: Path):
    """Run the NarrRAG graph for every topic and save the results as JSON.

    A fresh ``GraphState`` is pushed through ``build_graph()`` for each topic.
    The final state of each topic is written to ``output_dir/topic_<id>.json``.
    All approved narratives are also collected into
    ``output_dir/approved_narratives_global.json``. An error in one topic is
    printed with its traceback and does not stop the remaining topics.

    Args:
        topic_keywords: Mapping topic id -> keyword list. Keys are normalised
            to ``str``.
        output_dir: Directory for the JSON outputs. It is created, including
            missing parents, if needed.

    Returns:
        ``(all_approved_narratives, topic_results)``. ``topic_results`` maps
        topic id -> ``{"approved_narratives": ..., "final_state": ...}``.
    """
    topic_keywords = {str(k): v for k, v in topic_keywords.items()}
    # parents=True: the results folder's parent may not exist yet either.
    output_dir.mkdir(parents=True, exist_ok=True)
    graph = build_graph()
    all_approved_narratives = []
    topic_results = {}

    for topic_id, keywords in topic_keywords.items():
        try:
            print(f"\n🚀 Processing topic {topic_id}...")

            initial_state = GraphState(
                topic_id=topic_id,
                query=" ".join(keywords),
                pending_narratives_with_docs={},  # important for grading step
            )

            # Each refine round costs two graph steps (refine + grade). 500
            # leaves headroom above the roughly 200 steps that MAX_REFINES = 100
            # can need.
            final_state = graph.invoke(initial_state, {"recursion_limit": 500})

            # graph.invoke may hand back a dict rather than a GraphState; normalise.
            if not isinstance(final_state, GraphState):
                if isinstance(final_state, BaseModel):
                    raw_state = final_state.model_dump(mode="python", by_alias=False)
                elif isinstance(final_state, dict):
                    raw_state = final_state
                else:
                    raise TypeError(f"Unexpected type for final_state: {type(final_state)}")

                final_state = TypeAdapter(GraphState).validate_python(raw_state)

            approved_narratives = final_state.approved_narratives or []

            # Per-topic JSON dump. A serialization failure is reported but
            # does not discard this topic's in-memory results.
            try:
                result_dict = safe_model_dump(final_state)        # Handles nested BaseModels
                result_dict = convert_numpy_types(result_dict)    # Handles NumPy types
                with open(output_dir / f"topic_{topic_id}.json", "w") as f:
                    json.dump(result_dict, f, indent=2)
            except Exception as serialization_error:
                print(f"⚠️ Failed to serialize topic {topic_id}: {serialization_error}")
                traceback.print_exc()

            topic_results[topic_id] = {
                "approved_narratives": approved_narratives,
                "final_state": final_state
            }
            all_approved_narratives.extend(approved_narratives)
            print(f"✅ Topic {topic_id} done. {len(approved_narratives)} narrative(s) approved.")

        except Exception as e:
            print(f"❌ Error processing topic {topic_id}: {e}")
            traceback.print_exc()

    # Save all approved narratives globally
    approved_path = output_dir / "approved_narratives_global.json"
    with open(approved_path, "w") as f:
        json.dump(convert_numpy_types([
            safe_model_dump(n) for n in all_approved_narratives
        ]), f, indent=2)
    print(f"📋 Saved {len(all_approved_narratives)} total approved narratives.")

    return all_approved_narratives, topic_results

# Run the full pipeline over all topics.
# NOTE(review): inputs are read from ".../My Drive/narrRAG" but results are
# written to ".../MyDrive/narrRAG" — confirm both refer to the same Drive folder.
output_dir = Path("/content/drive/MyDrive/narrRAG/mexico_RAG_results")
approved, results = run_narrative_extraction(topic_keywords, output_dir)


[1;30;43mDie letzten 5000 Zeilen der Streamingausgabe wurden abgeschnitten.[0m
📝 Narrative to grade (may be partial): topic_id='52' actor='Nina Bonina Brown' action='blames, explains, says' event='Mexico City Earthquake' description='Nina Bonina Brown blames Mexico City for the earthquake she attributes to karma.'
✅ Grading result: grade=<Grade.refine: 'refine'> explanation="The narrative contradicts the context. The event is a natural disaster (earthquake), while Nina Bonina Brown's explanation attributes it to 'karma' or 'twisted logic'."
🔁 Refining narrative for topic 52...
RAW LLM result (refine): topic_id="Yeah, 'karma' is a great explanation." actor='Nina Bonina Brown' action='blames' event='Mexico City Earthquake' description='Nina Bonina Brown blames the Mexico City earthquake on Valentina Trolls as karma.'
Narrative before overwrite: topic_id="Yeah, 'karma' is a great explanation." actor='Nina Bonina Brown' action='blames' event='Mexico City Earthquake' description='Nina Bon

Traceback (most recent call last):
  File "/usr/local/lib/python3.12/dist-packages/langchain_core/output_parsers/pydantic.py", line 28, in _parse_obj
    return self.pydantic_object.model_validate(obj)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/dist-packages/pydantic/main.py", line 705, in model_validate
    return cls.__pydantic_validator__.validate_python(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
pydantic_core._pydantic_core.ValidationError: 4 validation errors for Narrative
actor
  Field required [type=missing, input_value={'topic_id': '}}<|python_tag|>assistant'}, input_type=dict]
    For further information visit https://errors.pydantic.dev/2.11/v/missing
action
  Field required [type=missing, input_value={'topic_id': '}}<|python_tag|>assistant'}, input_type=dict]
    For further information visit https://errors.pydantic.dev/2.11/v/missing
event
  Field required [type=missing, input_value={'topic_id': '}}<|python_tag|>assist

[1;30;43mDie letzten 5000 Zeilen der Streamingausgabe wurden abgeschnitten.[0m
🔁 Refining narrative for topic 336...
RAW LLM result (refine): topic_id='MX' actor='mexico' action='rescinds, withdraws, withdraws' event='Harvey aid offer, earthquake recovery' description='Mexico rescinds Harvey aid offer to focus on earthquake recovery after a powerful earthquake devastates the country.'
Narrative before overwrite: topic_id='MX' actor='mexico' action='rescinds, withdraws, withdraws' event='Harvey aid offer, earthquake recovery' description='Mexico rescinds Harvey aid offer to focus on earthquake recovery after a powerful earthquake devastates the country.'
Narrative after overwrite: topic_id='336' actor='mexico' action='rescinds, withdraws, withdraws' event='Harvey aid offer, earthquake recovery' description='Mexico rescinds Harvey aid offer to focus on earthquake recovery after a powerful earthquake devastates the country.'
📝 Narrative to grade (may be partial): topic_id='336' actor='m

Traceback (most recent call last):
  File "/usr/local/lib/python3.12/dist-packages/langchain_core/output_parsers/pydantic.py", line 28, in _parse_obj
    return self.pydantic_object.model_validate(obj)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/dist-packages/pydantic/main.py", line 705, in model_validate
    return cls.__pydantic_validator__.validate_python(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
pydantic_core._pydantic_core.ValidationError: 4 validation errors for Narrative
actor
  Field required [type=missing, input_value={'topic_id': '}}<|python_tag|>'}, input_type=dict]
    For further information visit https://errors.pydantic.dev/2.11/v/missing
action
  Field required [type=missing, input_value={'topic_id': '}}<|python_tag|>'}, input_type=dict]
    For further information visit https://errors.pydantic.dev/2.11/v/missing
event
  Field required [type=missing, input_value={'topic_id': '}}<|python_tag|>'}, input_type=dict]
   

RAW LLM result: topic_id='https://twitter.com/ramsNFL?lang=en' actor='user' action='streaming, hits' event='off coast of Mexico' description='User streams breaking news about an earthquake off the coast of Mexico'
Narrative before overwrite: topic_id='https://twitter.com/ramsNFL?lang=en' actor='user' action='streaming, hits' event='off coast of Mexico' description='User streams breaking news about an earthquake off the coast of Mexico'
Narrative after overwrite: topic_id='627' actor='user' action='streaming, hits' event='off coast of Mexico' description='User streams breaking news about an earthquake off the coast of Mexico'
📝 Narrative to grade (may be partial): topic_id='627' actor='user' action='streaming, hits' event='off coast of Mexico' description='User streams breaking news about an earthquake off the coast of Mexico'
✅ Grading result: grade=<Grade.approved: 'approved'> explanation="The narrative is consistent with the context and does not contradict it. It provides a specific 

In [None]:
import threading
import subprocess
import time

def run_ollama_serve():
    """Spawn `ollama serve` as a child process; Popen returns without waiting for it to exit."""
    server_cmd = ["ollama", "serve"]
    subprocess.Popen(server_cmd)


# Kick off the server launcher on a helper thread, then pause for a few seconds
# so the server has a chance to come up before `ollama pull` is run.
thread = threading.Thread(target=run_ollama_serve)
thread.start()
time.sleep(5)

!ollama pull llama3.2

[?2026h[?25l[1G[?25h[?2026l[?2026h[?25l[1G[?25h[?2026l[?2026h[?25l[1G[?25h[?2026l[?2026h[?25l[1G[?25h[?2026l[?2026h[?25l[1G[?25h[?2026l[?2026h[?25l[1G[?25h[?2026l


The latest functions work, BUT the actor is almost always "user", which should be addressed in the grade_narrative prompt. Also, `change` or `description` is sometimes missing. Overall: adjust the grade_narrative prompt and make sure the function does not alter the data coming from the previous nodes.


### Inspect output files

In [None]:
import json

# Load the deduplicated global results. The encoding is given explicitly so decoding
# does not depend on the platform's default locale (matches the CSV export cell).
with open('/content/drive/My Drive/narrRAG/sanbr_RAG_results/deduplicated_global.json', 'r', encoding='utf-8') as f:
    data = json.load(f)

In [None]:
# Pretty-print the loaded results, keeping non-ASCII characters readable.
print(json.dumps(data, ensure_ascii=False, indent=2))

{
  "topic_id": "-1",
  "query": "None",
  "documents_bm25": [],
  "documents_chroma": [],
  "narratives": [
    {
      "actor": "police",
      "action": "confirm",
      "change": "active shooting",
      "description": "Police confirm the incident is ongoing and have not confirmed the number of fatalities or injuries."
    }
  ],
  "grade_result": "None",
  "merged_from": [
    [
      {
        "actor": "Additional Resources",
        "action": "For those affected by the YouTube Headquarters shooting, the following resources are available:",
        "change": "Additional Resources",
        "description": "The National Alliance on Mental Illness (NAMI) Helpline is available at 1-800-950-NAMI (6264). The Crisis Text Line can be reached by texting HOME to 741741. The Substance Abuse and Mental Health Services Administration (SAMHSA) National Helpline can be reached at 1-800-662-HELP (4357)"
      },
      {
        "actor": "female",
        "action": "dead",
        "change": "",
 

In [None]:
import json
import csv
import os

# Path to the JSON file
json_path = '/content/drive/My Drive/narrRAG/sanbr_RAG_results/deduplicated_global.json'

# Load JSON
with open(json_path, 'r', encoding='utf-8') as f:
    data = json.load(f)

# Choose which narratives to export. Prefer "all_narratives"; if that key is absent,
# fall back to the top-level "narratives" list so the CSV is not silently written
# with only a header row.
if "all_narratives" in data:
    narratives = data["all_narratives"]
else:
    narratives = data.get("narratives", [])
narratives = narratives or []  # treat a null value in the JSON as "no narratives"

# Prepare CSV file path (same directory as JSON)
json_dir = os.path.dirname(json_path)
csv_file = os.path.join(json_dir, 'narratives.csv')

fields = ['actor', 'action', 'change', 'description']

with open(csv_file, 'w', newline='', encoding='utf-8') as f:
    writer = csv.DictWriter(f, fieldnames=fields)
    writer.writeheader()
    # Only the selected fields are written; a missing field becomes an empty string.
    writer.writerows({k: n.get(k, "") for k in fields} for n in narratives)

print(f"CSV written to {csv_file} ({len(narratives)} narratives)")

CSV written to /content/drive/My Drive/narrRAG/sanbr_RAG_results/narratives.csv


In [None]:
# Build a reduced copy of the seed-topic CSV so test runs finish faster.
import pandas as pd
from google.colab import drive

drive.mount("/content/drive")


Mounted at /content/drive


In [None]:
# Full merged seed-topic table stored on Google Drive.
CSV_PATH = "/content/drive/My Drive/narrRAG/sanbr_seedtopics_merged.csv"
df = pd.read_csv(filepath_or_buffer=CSV_PATH)


In [None]:
# Topic ids kept in the reduced test file (edit this list to change the subset).
TEST_TOPICS = [1, 2, 3, 4, 5, 6]

# Keep only rows whose 'Topic' value is one of TEST_TOPICS.
filtered_df = df[df['Topic'].isin(TEST_TOPICS)]

# Save to the specified path (index column is not written).
output_path = '/content/drive/My Drive/narrRAG/sanbr_seedtopics_merged_tests.csv'
filtered_df.to_csv(output_path, index=False)

In [None]:
#V3 repaired graph state (works on approved narratives now)

def _description_key(text):
    """Normalize a narrative description for matching: collapse whitespace and casefold."""
    return " ".join((text or "").split()).casefold()


def _build_merge_prompt(narratives):
    """Render the numbered narrative list and wrap it in the deduplication instructions."""
    merged_text = "\n\n".join(
        f"Narrative {i+1}:\nActor: {n.actor}\nAction: {n.action}\nChange: {n.change}\nDescription: {n.description}"
        for i, n in enumerate(narratives)
    )
    return (
        "You will receive a list of narratives. Your task is to group together only those narratives that are very similar "
        "(i.e., describe the same event or fact, even if phrased differently), and merge each group into a single cohesive narrative. "
        "Narratives that are about different events, actors, or facts should not be merged—they should remain as separate narratives. "
        "Return a list of merged narratives, where each element uses the MergedNarratives schema. "
        "If a narrative is unique, return it as a merged narrative with only itself in 'merged_from'.\n\n"
        f"{merged_text}"
    )


def deduplicate(state: GraphState) -> GraphState:
    """
    Deduplicate similar narratives in ``state.approved_narratives`` using the LLM.

    The approved narratives are rendered into a prompt and sent to
    ``llm_struct_merge``. Its result is expected to be one ``MergedNarratives``
    or a list of them. Each merged narrative becomes a new
    ``ApprovedNarrativeWithDocs``. Its BM25 and Chroma documents are the
    concatenation of the documents of the approved narratives listed in its
    ``merged_from``. Matching is by description, ignoring case and whitespace
    differences.

    Approved narratives that the LLM did not place in any ``merged_from`` group
    are kept unchanged, with their own documents, instead of being dropped.
    If the LLM paraphrases descriptions in ``merged_from``, this can leave an
    original next to its merged version. That is preferred over silent data loss.

    Args:
        state: Graph state; only ``approved_narratives`` is read.

    Returns:
        ``state`` itself when there are fewer than two approved narratives;
        otherwise a copy with ``approved_narratives`` replaced by the deduplicated list.

    Raises:
        ValueError: If the LLM result is neither a ``MergedNarratives`` nor a list.
    """
    approved = state.approved_narratives or []
    if len(approved) < 2:
        # Nothing to merge.
        return state

    narratives = [awd.narrative for awd in approved]
    merged_results = llm_struct_merge.invoke(_build_merge_prompt(narratives))

    # The LLM may hand back a single MergedNarratives instead of a list.
    if isinstance(merged_results, MergedNarratives):
        merged_results = [merged_results]
    elif not isinstance(merged_results, list):
        raise ValueError("Expected a list of MergedNarratives from LLM.")

    # Several approved narratives can share a description, so map each key to all of them.
    # Empty descriptions are not indexed, so they cannot spuriously match each other.
    by_description = {}
    for awd in approved:
        key = _description_key(awd.narrative.description)
        if key:
            by_description.setdefault(key, []).append(awd)

    covered = set()  # id() of every approved narrative absorbed into some merged group
    deduped_approved = []
    for mn in merged_results:
        bm25, chroma = [], []
        used = set()  # prevents adding one original's docs twice if the LLM repeats it
        for orig in mn.merged_from or []:
            for awd in by_description.get(_description_key(orig.description), []):
                if id(awd) in used:
                    continue
                used.add(id(awd))
                bm25.extend(awd.documents_bm25 or [])
                chroma.extend(awd.documents_chroma or [])
        covered |= used
        deduped_approved.append(
            ApprovedNarrativeWithDocs(
                narrative=mn.merged_narrative,
                documents_bm25=bm25,
                documents_chroma=chroma,
            )
        )

    # Keep any approved narrative the LLM left out of every group, rather than losing it.
    deduped_approved.extend(awd for awd in approved if id(awd) not in covered)

    return state.model_copy(update={"approved_narratives": deduped_approved})