# © Artur Czarnecki. All rights reserved.
# Integrax framework – proprietary and confidential.
# Use, modification, or distribution without written permission is prohibited.

In [1]:
# Prepend the directory two levels above the current working directory to the
# module search path, ahead of every other entry.
# NOTE(review): presumably this is the repository root, so that the local
# `intergrax` package can be imported from the notebook — confirm.
import sys, os
sys.path.insert(0, os.path.abspath(os.path.join(os.getcwd(), "..", "..")))

# Hybrid Multi-Source RAG with Intergrax + LangGraph

This notebook demonstrates a **practical, end-to-end RAG workflow** that combines multiple knowledge sources into a single in-memory vector index and exposes it through a LangGraph-based agent.

We will use **Intergrax** components together with **LangGraph** to:

1. **Ingest content from multiple sources:**
   - Local PDF files (from a given directory),
   - Local DOCX/Word files (from a given directory),
   - Live web results using the Intergrax `WebSearchExecutor`.

2. **Build a unified RAG corpus:**
   - Normalize all documents into a common internal format,
   - (Optionally) attach basic metadata about origin (pdf / docx / web),
   - Split documents into chunks suitable for embedding.

3. **Create an in-memory vector index:**
   - Use an Intergrax embedding manager (e.g. OpenAI / Ollama),
   - Store embeddings in an **in-memory Chroma** collection via Intergrax vectorstore manager,
   - Keep everything ephemeral (no persistence, perfect for “ad-hoc research” scenarios).

4. **Answer user questions with a RAG agent:**
   - The user provides a natural language question,
   - LangGraph orchestrates the flow: load → merge → index → retrieve → answer,
   - An Intergrax `RagAnswerer` (or `WindowedAnswerer`) generates a **single, structured report**:
     - short summary of the relevant information,
     - key insights and conclusions,
     - optionally: recommendations / action items.

---

## What this notebook showcases

- How to combine **local files + web search** in a single RAG pipeline.
- How to plug Intergrax components (loaders, splitter, embeddings, vectorstore, RAG answerer, websearch) into a **LangGraph `StateGraph`**.
- How to build a **temporary, in-memory vector index** for one-off research tasks (no database setup required).
- A clean, production-oriented pattern that can be reused in:
  - internal knowledge explorers,
  - “research bot” agents,
  - prototype assistants for teams or clients.

All code and comments in this notebook are in **English** to keep it ready for public documentation, GitHub examples, and international collaborators.


## 1. Environment and configuration

In this section we prepare the environment for the hybrid multi-source RAG agent.

The goals of this step:

- Load all required configuration values (API keys, base paths, model names) from environment variables or a `.env` file.
- Import the core building blocks from:
  - the Intergrax framework (LLM adapter, embedding manager, vectorstore manager, document loaders, RAG answerer, websearch executor),
  - LangGraph (for defining and running the `StateGraph`),
  - Standard Python modules (`os`, `pathlib`, `typing`, etc.).
- Define the base directories for local documents:
  - one directory for PDF files (e.g. `./data/pdf`),
  - one directory for DOCX files (e.g. `./data/docx`).
- Decide on the core RAG parameters:
  - chunk size and overlap for splitting documents,
  - embedding model name,
  - LLM model name used by the answerer,
  - number of retrieved documents (`top_k`) during similarity search.
- Initialize the core Intergrax components that will be reused across the notebook:
  - an embedding manager (e.g. OpenAI-based or Ollama-based),
  - a vectorstore manager backed by an **in-memory** Chroma collection (no persistence),
  - an LLM adapter used by the RAG answerer (created in section 2, together with the web search layer),
  - a simple text splitter for turning documents into chunks.

At the end of this section we want to have a small configuration block that:

1. Reads configuration (API keys, paths, model names),
2. Instantiates the main Intergrax services (document loader, text splitter, embeddings, vectorstore); the LLM adapter follows in section 2,
3. Is easy to adjust for different environments (OpenAI vs local models, different directories, different Chroma settings).

In [None]:
from pathlib import Path
import os

from intergrax.globals.settings import GLOBAL_SETTINGS
from intergrax.rag.documents_loader import DocumentsLoader
from intergrax.rag.documents_splitter import DocumentsSplitter
from intergrax.rag.embedding_manager import EmbeddingManager
from intergrax.rag.vectorstore_manager import VectorstoreManager, VSConfig

import intergrax.logging  # initializes logging format/levels for the framework

# ---- Tenant / corpus configuration (for metadata + filtering) ----
# These three values are attached to indexed documents as metadata and are
# used again as the retrieval filter, so indexing and retrieval must agree.
TENANT = "intergrax"
CORPUS = "hybrid-multi-source"
VERSION = "v1"

# ---- Base directories for local documents ----
# You can adjust these to your actual layout.
# Paths are relative, i.e. resolved against the current working directory.
# For the hybrid demo we assume:
#   ../documents/hybrid-corpus/pdf
#   ../documents/hybrid-corpus/docx
BASE_DOCS_DIR = Path("../documents/hybrid-corpus")
PDF_DIR = BASE_DOCS_DIR / "pdf"
DOCX_DIR = BASE_DOCS_DIR / "docx"

# ---- Core RAG parameters ----
# NOTE: CHUNK_SIZE and CHUNK_OVERLAP are declared here for reference only;
# the splitter created in this cell is not passed either value.
CHUNK_SIZE = 800
CHUNK_OVERLAP = 150
# Default number of chunks requested from the retriever.
TOP_K = 8

# Embedding model configuration (using your existing Ollama-based setup)
EMBED_PROVIDER = "ollama"
EMBED_MODEL_NAME = GLOBAL_SETTINGS.default_ollama_embed_model
EMBED_DIM = 1536  # assumed dimension for this model; passed to the embedding manager as `assume_ollama_dim`

# Vectorstore configuration
# For *ephemeral* usage you can set `chroma_persist_directory=None`
# or point it to a throwaway path. For now we keep a dedicated collection.
VS_COLLECTION_NAME = "hybrid_multi_source_rag"
VS_PERSIST_DIR = None  # set to e.g. "chroma_db/hybrid_multi_source_rag_v1" if you want persistence

# ---- Instantiate core components ----
# Everything created here is a module-level singleton that the helper
# functions and graph nodes defined later in the notebook refer to by name.

# Loader and splitter (used later to build the hybrid corpus)
doc_loader = DocumentsLoader(
    verbose=True,
    # docx_mode="paragraphs" lets you load Word files in finer-grained segments
    docx_mode="paragraphs",
)

splitter = DocumentsSplitter(
    verbose=True,
    # Note: the splitter currently takes its chunking config from inside the class;
    # if you expose chunk_size/overlap in the future, you can wire CHUNK_SIZE here.
    # Until then CHUNK_SIZE / CHUNK_OVERLAP above have no effect on chunking.
)

# Embedding manager (Ollama-based embeddings)
embed_manager = EmbeddingManager(
    verbose=True,
    provider=EMBED_PROVIDER,
    model_name=EMBED_MODEL_NAME,
    assume_ollama_dim=EMBED_DIM,
)

# Vectorstore manager (Chroma backend)
# With VS_PERSIST_DIR left at None no persistence directory is configured.
vs_config = VSConfig(
    provider="chroma",
    collection_name=VS_COLLECTION_NAME,
    chroma_persist_directory=VS_PERSIST_DIR,
)

vectorstore = VectorstoreManager(
    config=vs_config,
    verbose=True,
)

# Make sure both local source directories exist (including missing parents),
# so that later loading steps do not have to special-case a fresh checkout.
PDF_DIR.mkdir(parents=True, exist_ok=True)
DOCX_DIR.mkdir(parents=True, exist_ok=True)


# One-shot summary of the effective configuration for this run.
print(
    "Environment initialized.",
    f"TENANT={TENANT}, CORPUS={CORPUS}, VERSION={VERSION}",
    f"PDF_DIR={PDF_DIR}",
    f"DOCX_DIR={DOCX_DIR}",
    f"Vectorstore collection={VS_COLLECTION_NAME}, persist={VS_PERSIST_DIR}",
    sep="\n",
)

2025-11-18 16:28:52,641 [INFO] [intergraxEmbeddingManager] Loading model 'rjmalagon/gte-qwen2-1.5b-instruct-embed-f16:latest' (provider=ollama)
2025-11-18 16:28:53,175 [INFO] HTTP Request: POST http://127.0.0.1:11434/api/embed "HTTP/1.1 200 OK"
2025-11-18 16:28:53,177 [INFO] [intergraxEmbeddingManager] Loaded. Embedding dim = 1536


[intergraxVectorstoreManager] Initialized provider=chroma, collection=hybrid_multi_source_rag
[intergraxVectorstoreManager] Existing count: 0
Environment initialized.
TENANT=intergrax, CORPUS=hybrid-multi-source, VERSION=v1
PDF_DIR=..\documents\hybrid-corpus\pdf
DOCX_DIR=..\documents\hybrid-corpus\docx
Vectorstore collection=hybrid_multi_source_rag, persist=None


## 2. Web search setup (Intergrax WebSearchExecutor)

In this section we configure the **web search layer** that will provide live web documents as one of the sources for the hybrid RAG corpus.

We:

- Load API keys from the environment,
- Initialize:
  - `OpenAIChatResponsesAdapter` (LLM used by web search),
  - `WebSearchExecutor` with `GoogleCSEProvider`,
  - `WebSearchContextBuilder` for building condensed context from web docs,
  - (optionally) `WebSearchAnswerer` for standalone web-only QA,
- Provide a small async helper function that returns **serialized web documents** (plain dicts) ready to be merged with local PDF/DOCX documents later in the notebook.

In [4]:

from typing import List, Dict, Any
import os

from dotenv import load_dotenv
from openai import Client

from intergrax.llm_adapters import OpenAIChatResponsesAdapter
from intergrax.websearch.service.websearch_executor import WebSearchExecutor
from intergrax.websearch.context.websearch_context_builder import WebSearchContextBuilder
from intergrax.websearch.service.websearch_answerer import WebSearchAnswerer
from intergrax.websearch.providers.google_cse_provider import GoogleCSEProvider

# Load configuration (API keys etc.) from a `.env` file into the process
# environment before the variables are read below.
load_dotenv()

# --- Environment variables for OpenAI + Google CSE ---
#
# Re-export the credentials through `os.environ` so the components created
# afterwards can read them. `os.environ` only accepts `str` values: assigning
# the `None` that a bare `os.getenv(name)` returns for an unset variable raises
# `TypeError`, so every lookup falls back to "" (as OPENAI_API_KEY already did).

os.environ["OPENAI_API_KEY"] = os.getenv("OPENAI_API_KEY", "")
os.environ["GOOGLE_CSE_API_KEY"] = os.getenv("GOOGLE_CSE_API_KEY", "")
os.environ["GOOGLE_CSE_CX"] = os.getenv("GOOGLE_CSE_CX", "")

# Report empty credentials right away; otherwise the first hint of a missing
# key would be a failure somewhere inside a later API call.
_missing_env_vars = [
    name
    for name in ("OPENAI_API_KEY", "GOOGLE_CSE_API_KEY", "GOOGLE_CSE_CX")
    if not os.environ[name]
]
if _missing_env_vars:
    print(f"[WARN] Missing environment variables: {', '.join(_missing_env_vars)}")

# --- LLM adapter used by the websearch layer (and later we can reuse it) ---

# NOTE(review): `Client()` is built without arguments — presumably it picks up
# OPENAI_API_KEY from the environment; confirm against the OpenAI SDK docs.
openai_client = Client()

llm_adapter = OpenAIChatResponsesAdapter(
    client=openai_client,
    model="gpt-5-mini",  # adjust to your preferred model
)

# --- WebSearchExecutor with Google CSE provider ---

# The `default_*` values are executor-level defaults; `websearch_fetch_serialized`
# passes its own `top_k` on every call.
websearch_executor = WebSearchExecutor(
    providers=[GoogleCSEProvider()],
    default_top_k=6,
    default_locale="en-US",
    default_region="en-US",
    default_language="en",
    default_safe_search=True,
    max_text_chars=2000,
)

# --- Context builder and (optional) web-only answerer ---

context_builder = WebSearchContextBuilder(
    max_docs=4,
    max_chars_per_doc=1500,
    include_snippet=True,
    include_url=True,
    source_label_prefix="Source",
)

# Wires the adapter, executor and context builder together for standalone,
# web-only question answering.
websearch_answerer = WebSearchAnswerer(
    adapter=llm_adapter,
    executor=websearch_executor,
    context_builder=context_builder,
    answer_language="en",
    # system_prompt=system_prompts.strict_web_rag,  # optional, if you have it
)

print("Web search layer initialized (Google CSE + OpenAI).")

# --- Async helper for raw web documents (serialized) ---

async def websearch_fetch_serialized(
    question: str,
    top_k: int = 8,
) -> List[Dict[str, Any]]:
    """
    Fetch web results for `question` in serialized (plain dict) form.

    Thin async wrapper around the module-level `websearch_executor`: the query
    and `top_k` are forwarded with `serialize=True`, and whatever the executor
    returns is handed back unchanged. The dicts can then be turned into RAG
    chunks or fed to `WebSearchContextBuilder`.
    """
    search_kwargs = {
        "query": question,
        "top_k": top_k,
        "serialize": True,
    }
    return await websearch_executor.search_async(**search_kwargs)


Web search layer initialized (Google CSE + OpenAI).


## 3. Hybrid RAG state definition

To orchestrate the hybrid RAG flow with LangGraph, we will use a single shared state
object that flows through all nodes.

The state should contain:

- `question`: the original user question,
- `pdf_docs`: documents loaded from local PDF files (before splitting),
- `docx_docs`: documents loaded from local DOCX files (before splitting),
- `web_docs_serialized`: web search results in serialized form (plain dicts coming from `WebSearchExecutor.search_async(..., serialize=True)`),
- `split_docs`: the final list of **chunked** documents (LangChain `Document` objects) ready for embedding,
- `vectorstore_ready`: a simple flag indicating that the vectorstore has been built/updated,
- `vectorstore_collection`: the name of the vectorstore collection used for indexing,
- `answer`: the final answer text produced by the RAG pipeline,
- `debug_info`: diagnostic information useful for inspecting what happened
  (counts of docs/chunks, vectorstore collection name, etc.).

We will represent this state as a `TypedDict` so that LangGraph can use it as the
graph state type. This also makes the code easier to reason about and refactor.

In [5]:
from typing import TypedDict, List, Dict, Any, Optional

from langchain_core.documents import Document
from langgraph.graph import StateGraph, END


class HybridRagState(TypedDict, total=False):
    """
    State dictionary passed from node to node in the hybrid RAG graph.

    Declared with `total=False`, so every key is optional: a run starts with
    little more than `question`, and each node adds the keys it is responsible
    for (loaded documents, chunks, indexing flags, answer, diagnostics).
    """

    # --- input ---
    # The user's natural-language question.
    question: str

    # --- raw sources ---
    # Unsplit local documents, one list per file type.
    pdf_docs: list[Document]
    docx_docs: list[Document]
    # Web results as plain dicts, as returned by
    # WebSearchExecutor.search_async(..., serialize=True).
    web_docs_serialized: list[dict[str, Any]]

    # --- derived corpus ---
    # Chunked documents from all sources, ready to be embedded and indexed.
    split_docs: list[Document]

    # --- indexing outcome ---
    # Whether chunks were ingested, and into which collection.
    vectorstore_ready: bool
    vectorstore_collection: Optional[str]

    # --- output ---
    # Final answer text.
    answer: str
    # Free-form diagnostics collected along the way (counts, paths, ...).
    debug_info: dict[str, Any]


## 4. Local documents loading (PDF + DOCX)

In this step we load **local documents** that will form the first part of the hybrid RAG corpus.

We use the existing Intergrax `DocumentsLoader` (instantiated above as `doc_loader`) to:

- Load PDF files from a dedicated directory (e.g. `PDF_DIR`),
- Load DOCX files from a dedicated directory (e.g. `DOCX_DIR`),
- Return them as LangChain `Document` objects.

Each document already carries metadata (such as `source_path`, `source_name`, etc.) that we will later
retain when splitting into chunks and inserting into the vectorstore.

The goals of this step:

- Provide small helper functions:
  - `load_pdf_docs(pdf_dir: Path) -> list[Document]`
  - `load_docx_docs(docx_dir: Path) -> list[Document]`
- Provide a convenience function that loads **both** PDF and DOCX documents and returns:
  - the documents,
  - basic debug information (counts per type),
- Prepare the structure we will later wrap into a LangGraph node that updates `HybridRagState`
  (`pdf_docs`, `docx_docs`, `debug_info`).


In [None]:
from langchain_core.documents import Document
from pathlib import Path
from typing import List, Dict, Any, Tuple


def load_pdf_docs(pdf_dir: Path) -> List[Document]:
    """
    Load the documents found under `pdf_dir` through the shared `doc_loader`.

    When the directory is missing, a warning is printed and an empty list is
    returned. Otherwise the result of `doc_loader.load_documents(...)` for
    that path is returned as-is. No file-type filtering happens here, so the
    directory is expected to contain only PDF files.
    """
    if pdf_dir.exists():
        loaded: List[Document] = doc_loader.load_documents(str(pdf_dir))
        print(f"[LOCAL LOAD] PDF docs loaded: {len(loaded)} from {pdf_dir}")
        return loaded

    print(f"[WARN] PDF directory does not exist: {pdf_dir}")
    return []


def load_docx_docs(docx_dir: Path) -> List[Document]:
    """
    Load the documents found under `docx_dir` through the shared `doc_loader`.

    The loader was created with `docx_mode='paragraphs'`, so Word files arrive
    already segmented by paragraph, before any RAG chunking is applied.
    When the directory is missing, a warning is printed and an empty list is
    returned.
    """
    if docx_dir.exists():
        loaded: List[Document] = doc_loader.load_documents(str(docx_dir))
        print(f"[LOCAL LOAD] DOCX docs loaded: {len(loaded)} from {docx_dir}")
        return loaded

    print(f"[WARN] DOCX directory does not exist: {docx_dir}")
    return []


def load_all_local_docs() -> Tuple[List[Document], List[Document], Dict[str, Any]]:
    """
    Load both local sources in one call.

    Reads PDFs from PDF_DIR and Word files from DOCX_DIR and returns
    `(pdf_docs, docx_docs, debug_info)`, where `debug_info` holds the two
    document counts and the two directory paths (as strings).
    """
    pdf_docs, docx_docs = load_pdf_docs(PDF_DIR), load_docx_docs(DOCX_DIR)
    n_pdf, n_docx = len(pdf_docs), len(docx_docs)

    debug_info: Dict[str, Any] = dict(
        pdf_docs_count=n_pdf,
        docx_docs_count=n_docx,
        pdf_dir=str(PDF_DIR),
        docx_dir=str(DOCX_DIR),
    )

    print(f"[LOCAL LOAD] Total local docs -> PDF: {n_pdf}, DOCX: {n_docx}")

    return pdf_docs, docx_docs, debug_info


## 5. Source loading nodes (local PDF/DOCX + web)

Now that we have:

- a shared `HybridRagState`,
- helpers for loading local documents (`load_all_local_docs()`),
- a helper for fetching web documents as serialized dicts (`websearch_fetch_serialized()`),

we can expose them as **LangGraph nodes**.

We will create two nodes:

1. `load_local_docs_node(state: HybridRagState) -> HybridRagState`  
   - Loads PDF and DOCX documents from the configured directories,
   - Stores them in `state["pdf_docs"]` and `state["docx_docs"]`,
   - Updates `state["debug_info"]` with basic counts and directory paths.

2. `load_web_docs_node(state: HybridRagState) -> HybridRagState` (async)  
   - Uses the user `question` from the state,
   - Calls `websearch_fetch_serialized(question, top_k=...)`,
   - Stores serialized web documents in `state["web_docs_serialized"]`,
   - Updates `state["debug_info"]` with the number of web documents.

These nodes will be the **first steps** of the hybrid RAG pipeline in the LangGraph graph.
Later nodes will split documents, build the vectorstore, and generate the final answer.


In [None]:
from typing import Dict, Any


def load_local_docs_node(state: HybridRagState) -> HybridRagState:
    """
    LangGraph node that brings the local PDF and DOCX documents into the state.

    Returns a new state dict: all incoming keys are carried over, `pdf_docs`
    and `docx_docs` are set, and the loader statistics are layered on top of
    any `debug_info` already present.
    """
    pdf_docs, docx_docs, local_stats = load_all_local_docs()

    # Existing diagnostics first, so the fresh loader stats win on key clashes.
    merged_debug = {**state.get("debug_info", {}), **local_stats}

    return {
        **state,
        "pdf_docs": pdf_docs,
        "docx_docs": docx_docs,
        "debug_info": merged_debug,
    }


async def load_web_docs_node(state: HybridRagState) -> HybridRagState:
    """
    LangGraph node (async) that adds live web search results to the state.

    Reads `state["question"]`; if it is missing, `None`, or blank, the web
    search is skipped and an empty result list is stored. Otherwise the
    question is passed to `websearch_fetch_serialized` with `top_k=8`.

    Returns a new state dict with all incoming keys carried over, plus:
      - `web_docs_serialized`: the serialized web documents (possibly empty),
      - `debug_info`: previous diagnostics extended with `web_docs_count`.
    """
    # `.get(key, "")` only covers a missing key; a key that is present but
    # None would reach `.strip()` and raise AttributeError, hence the `or`.
    question = (state.get("question") or "").strip()

    web_docs_serialized: list[Dict[str, Any]]
    if question:
        fetched = await websearch_fetch_serialized(
            question=question,
            top_k=8,
        )
        # Normalize a falsy result so the count below cannot fail.
        web_docs_serialized = fetched or []
    else:
        print("[WEB LOAD] Empty question in state; skipping web search.")
        web_docs_serialized = []

    # Same None-tolerance for diagnostics: copy so the incoming state's dict
    # is never mutated.
    debug = dict(state.get("debug_info") or {})
    debug["web_docs_count"] = len(web_docs_serialized)

    new_state: HybridRagState = {
        **state,
        "web_docs_serialized": web_docs_serialized,
        "debug_info": debug,
    }

    return new_state

## 6. Building a unified, chunked corpus

At this point we already have:

- Local documents:
  - `state["pdf_docs"]`: PDF documents loaded as `Document` objects,
  - `state["docx_docs"]`: DOCX documents loaded as `Document` objects,
- Web documents:
  - `state["web_docs_serialized"]`: web results as serialized dicts coming from
    `websearch_fetch_serialized(...)`.

In this step we:

1. Convert serialized web documents into LangChain `Document` objects, so that all
   sources (PDF, DOCX, web) share the same internal representation.
2. Merge all sources into a single list of `Document` objects.
3. Use the Intergrax `DocumentsSplitter` (`splitter`) to split the merged corpus into chunks suitable
   for embedding and retrieval.
4. Store the result in:
   - `state["split_docs"]`: list of chunked `Document` objects,
   - `state["debug_info"]["split_docs_count"]`: total number of chunks.

The splitter will:

- Preserve existing metadata on each document,
- Optionally add its own metadata (e.g. `chunk_id`, `chunk_index`, `parent_id`, etc.),
- Allow us to keep track of the origin of each chunk (`source_type`, `source_path`, `source_url`).

These chunked documents will be embedded and inserted into the vectorstore in the next step.


In [None]:
from langchain_core.documents import Document
from typing import List, Dict, Any


def web_docs_to_documents(
    web_docs_serialized: List[Dict[str, Any]],
) -> List[Document]:
    """
    Turn serialized web results (plain dicts) into LangChain `Document`s.

    Text selection: the first non-empty value among `content`, `text` and
    `snippet` (in that order) becomes `page_content`. Entries whose chosen
    text is empty or whitespace-only are dropped.

    Every produced document gets this metadata:
      - `source_type` fixed to 'web',
      - `source_url`, `source_title`, `source_snippet` copied from the entry's
        `url`, `title`, `snippet` (None when absent),
      - `tenant`, `corpus`, `version` from the notebook-level constants.
    """
    text_fields = ("content", "text", "snippet")
    converted: List[Document] = []

    for entry in web_docs_serialized:
        # First truthy candidate wins; "" if none of the fields is usable.
        body = next(filter(None, (entry.get(field) for field in text_fields)), "")

        if body.strip():
            converted.append(
                Document(
                    page_content=body,
                    metadata={
                        "source_type": "web",
                        "source_url": entry.get("url"),
                        "source_title": entry.get("title"),
                        "source_snippet": entry.get("snippet"),
                        "tenant": TENANT,
                        "corpus": CORPUS,
                        "version": VERSION,
                    },
                )
            )

    return converted


def build_unified_corpus(
    pdf_docs: List[Document],
    docx_docs: List[Document],
    web_docs_serialized: List[Dict[str, Any]],
) -> List[Document]:
    """
    Combine PDF, DOCX and web sources into one list of `Document` objects.

    Local documents are copied (the inputs are not modified) and their
    metadata is completed with `tenant`, `corpus`, `version` and
    `source_type='local'` — only for keys that are not already present.
    Web entries are converted via `web_docs_to_documents`.

    The result is ordered PDF, then DOCX, then web; a one-line summary of the
    per-source counts is printed.
    """
    # Defaults applied to local docs; values set by the loader take precedence.
    base_defaults = (
        ("tenant", TENANT),
        ("corpus", CORPUS),
        ("version", VERSION),
        ("source_type", "local"),
    )

    def _with_base_meta(original: Document) -> Document:
        meta = dict(original.metadata or {})
        for key, fallback in base_defaults:
            meta.setdefault(key, fallback)
        return Document(page_content=original.page_content, metadata=meta)

    local_pdf = list(map(_with_base_meta, pdf_docs))
    local_docx = list(map(_with_base_meta, docx_docs))
    web_docs = web_docs_to_documents(web_docs_serialized)

    merged: List[Document] = [*local_pdf, *local_docx, *web_docs]

    counts = (
        f"PDF: {len(local_pdf)}, DOCX: {len(local_docx)}, "
        f"WEB: {len(web_docs)}, TOTAL: {len(merged)}"
    )
    print("[UNIFIED CORPUS] Total documents before splitting -> " + counts)

    return merged


def split_corpus_node(state: HybridRagState) -> HybridRagState:
    """
    LangGraph node that merges all loaded sources and chunks them.

    Steps:
      1. Read `pdf_docs`, `docx_docs`, `web_docs_serialized` from the state
         (missing or None values count as empty) and merge them with
         `build_unified_corpus`.
      2. Run the module-level `splitter` over the merged list, using its
         default behaviour. An empty corpus skips this step.

    Returns a new state dict with `split_docs` set and `debug_info` extended
    with `split_docs_count`.
    """
    corpus = build_unified_corpus(
        pdf_docs=state.get("pdf_docs", []) or [],
        docx_docs=state.get("docx_docs", []) or [],
        web_docs_serialized=state.get("web_docs_serialized", []) or [],
    )

    chunks: List[Document]
    if corpus:
        # The splitter also accepts a per-chunk metadata callback
        # (`call_custom_metadata=...`); it is not used here.
        chunks = splitter.split_documents(documents=corpus)
    else:
        print("[SPLIT] No documents to split; unified corpus is empty.")
        chunks = []

    debug_info = {**state.get("debug_info", {}), "split_docs_count": len(chunks)}

    print(f"[SPLIT] Total chunks produced: {len(chunks)}")

    return {
        **state,
        "split_docs": chunks,
        "debug_info": debug_info,
    }


## 7. Indexing node (embeddings + Chroma vectorstore)

Now that we have a unified, chunked corpus in `state["split_docs"]`, we can:

1. Compute embeddings for all chunks using the Intergrax `EmbeddingManager` (`embed_manager`),
2. Generate **stable, unique IDs** for each chunk (reusing splitter metadata if available),
3. Deduplicate IDs inside the current batch (Chroma requires unique IDs),
4. Insert everything into the configured Intergrax `VectorstoreManager` (`vectorstore`, Chroma backend).

The indexing node will:

- Read `state["split_docs"]`,
- Use `embed_manager.embed_documents(docs=split_docs)` to obtain:
  - a list/array of embeddings,
  - a list of (possibly normalized) `Document` objects,
- Build an `ids` list:
  - Prefer `chunk_id` from document metadata (if present),
  - Fallback to a combination like `parent_id#ch{index}` or `source_path#ch{index}`,
- Call `vectorstore.add_documents(...)` with:
  - `documents`,
  - `embeddings`,
  - `ids`,
  - a shared `base_metadata` containing `tenant`, `corpus`, `version`.

On success it will set:

- `state["vectorstore_ready"] = True`,
- `state["vectorstore_collection"] = VS_COLLECTION_NAME`,
- Update `state["debug_info"]` with:
  - `indexed_docs_count`,
  - optional vectorstore count (`vectorstore.count()`), if available.


In [None]:
# 7. Indexing node (embeddings + Chroma vectorstore)
# --------------------------------------------------

from collections import Counter
from typing import List
from langchain_core.documents import Document


def _build_chunk_ids(docs: List[Document]) -> List[str]:
    """
    Build stable, unique IDs for chunk documents based on their metadata.

    Preference order:
      1. metadata["chunk_id"] (if provided by the splitter),
      2. "{parent_id}#ch{chunk_index:04d}",
      3. "{source_path or source_name or 'doc'}#ch{chunk_index:04d}".
    """
    ids: List[str] = []

    for d in docs:
        md = d.metadata or {}
        cid = md.get("chunk_id")

        if not cid:
            parent = (
                md.get("parent_id")
                or md.get("source_path")
                or md.get("source_name")
                or "doc"
            )
            idx = int(md.get("chunk_index", 0))
            cid = f"{parent}#ch{idx:04d}"

        ids.append(str(cid))

    return ids


def _dedup_ids_batch(
    ids: List[str],
    docs: List[Document],
    embs: List[List[float]],
) -> tuple[list[str], list[Document], list[List[float]]]:
    """
    Deduplicate IDs within the current batch.

    Chroma requires unique IDs per upsert; we keep the first occurrence.
    """
    seen = set()
    new_ids: list[str] = []
    new_docs: list[Document] = []
    new_embs: list[List[float]] = []

    for i, _id in enumerate(ids):
        if _id in seen:
            continue
        seen.add(_id)
        new_ids.append(_id)
        new_docs.append(docs[i])
        new_embs.append(embs[i])

    return new_ids, new_docs, new_embs


def index_corpus_node(state: HybridRagState) -> HybridRagState:
    """
    LangGraph node that embeds the chunked corpus and writes it to the
    vectorstore.

    Reads `state["split_docs"]`. If it is missing or empty, nothing is
    indexed and the returned state has `vectorstore_ready=False`.

    Otherwise:
      1. embeds all chunks with the module-level `embed_manager`,
      2. derives an ID per chunk via `_build_chunk_ids`,
      3. reports ID collisions and removes them with `_dedup_ids_batch`
         (first occurrence kept),
      4. ingests documents, embeddings and IDs into the module-level
         `vectorstore`, with `tenant` / `corpus` / `version` as base metadata.

    Returns a new state dict with all incoming keys carried over, plus
    `vectorstore_ready`, `vectorstore_collection`, and a `debug_info` extended
    with `indexed_docs_count`, `vectorstore_collection`, and — after a real
    ingest — `dropped_duplicate_chunks` and `vectorstore_total_count`
    (None when the count could not be read).
    """
    split_docs: List[Document] = state.get("split_docs", []) or []

    if not split_docs:
        print("[INDEX] No split_docs in state; skipping indexing.")
        debug = dict(state.get("debug_info", {}))
        debug.update(
            {
                "indexed_docs_count": 0,
                "vectorstore_collection": VS_COLLECTION_NAME,
            }
        )
        new_state: HybridRagState = {
            **state,
            "vectorstore_ready": False,
            "vectorstore_collection": VS_COLLECTION_NAME,
            "debug_info": debug,
        }
        return new_state

    # 1) Embed all chunks
    print(f"[INDEX] Embedding {len(split_docs)} chunks...")
    embeddings, documents = embed_manager.embed_documents(docs=split_docs)

    # 2) Build stable IDs
    ids = _build_chunk_ids(documents)

    # 3) Report collisions BEFORE de-duplicating. Checked afterwards, the list
    #    is unique by construction and dropped chunks would go unnoticed.
    colliding_ids = [k for k, v in Counter(ids).items() if v > 1]
    dropped_count = len(ids) - len(set(ids))
    if colliding_ids:
        print(
            f"[WARN] {len(colliding_ids)} duplicate chunk ID(s) in batch; "
            f"dropping {dropped_count} chunk(s), keeping first occurrences."
        )

    # 4) Deduplicate IDs inside the batch
    ids, documents, embeddings = _dedup_ids_batch(ids, documents, embeddings)

    # 5) Base metadata for the whole corpus
    base_metadata = {
        "tenant": TENANT,
        "corpus": CORPUS,
        "version": VERSION,
    }

    # 6) Ingest into vectorstore
    print(
        f"[INDEX] Ingesting {len(documents)} chunks into vectorstore "
        f"(collection={VS_COLLECTION_NAME}, persist={VS_PERSIST_DIR})..."
    )

    vectorstore.add_documents(
        documents=documents,
        embeddings=embeddings,
        ids=ids,
        batch_size=128,
        base_metadata=base_metadata,
    )

    # The total count is diagnostic only: a failure here is reported but must
    # not fail an ingest that has already succeeded.
    total_count = None
    try:
        total_count = vectorstore.count()
        print(f"[INDEX] Vectorstore total count after ingest: {total_count}")
    except Exception as e:
        print(f"[INDEX] Could not read vectorstore count: {e}")

    debug = dict(state.get("debug_info", {}))
    debug.update(
        {
            "indexed_docs_count": len(documents),
            "dropped_duplicate_chunks": dropped_count,
            "vectorstore_collection": VS_COLLECTION_NAME,
            "vectorstore_total_count": total_count,
        }
    )

    new_state: HybridRagState = {
        **state,
        "vectorstore_ready": True,
        "vectorstore_collection": VS_COLLECTION_NAME,
        "debug_info": debug,
    }

    return new_state


## 8. Retrieval and answer generation (RagRetriever + LLM)

With the vectorstore populated, we can now:

1. Retrieve the most relevant chunks for a given user question using
   the Intergrax `RagRetriever`,
2. Build a structured prompt (system + user messages) that:
   - injects the retrieved chunks as context,
   - clearly instructs the model to answer *only* based on this context,
3. Use the existing `llm_adapter` (`OpenAIChatResponsesAdapter`) to generate
   the final answer.

In this step we will:

- Instantiate a global `RagRetriever` bound to the same vectorstore
  and embedding manager used for indexing,
- Define a helper function that:
  - runs retrieval with appropriate filters (`tenant`, `corpus`, `version`),
  - returns ranked hits with scores and metadata,
- Define a small prompt builder that turns those hits into `ChatMessage` objects,
- Implement a LangGraph node:

  `rag_answer_node(state: HybridRagState) -> HybridRagState`

  which will:
  - read `state["question"]`,
  - run retrieval,
  - generate an answer using `llm_adapter.generate_messages(...)`,
  - store the answer in `state["answer"]`,
  - update `state["debug_info"]` with retrieval stats and a small preview
    of the hits.


In [None]:
# 8. Retrieval and answer generation
# ----------------------------------

from typing import List, Dict, Any

from intergrax.rag.rag_retriever import RagRetriever
from intergrax.llm.messages import ChatMessage


# --- Global RAG retriever (same vectorstore + embeddings as indexing) ---
# Built once at module level from the `vectorstore` and `embed_manager`
# instances that the indexing step writes with, so queries are embedded by
# the same manager that embedded the chunks.

rag_retriever = RagRetriever(
    vectorstore,
    embed_manager,
    verbose=True,
)


def run_rag_retrieval(
    question: str,
    top_k: int = TOP_K,
    score_threshold: float = 0.15,
    max_per_parent: int = 2,
    use_mmr: bool = True,
) -> List[Dict[str, Any]]:
    """
    Query the shared `rag_retriever` for chunks relevant to `question`.

    The search is always scoped to the notebook's TENANT / CORPUS / VERSION
    through a metadata `where` filter. All other arguments are forwarded
    unchanged to `rag_retriever.retrieve(...)`; embeddings are never included
    in the result and `prefetch_factor` is fixed at 5.

    Args:
        question: Natural-language query text.
        top_k: Forwarded as `top_k` (defaults to the notebook-wide TOP_K).
        score_threshold: Forwarded as `score_threshold`.
        max_per_parent: Forwarded as `max_per_parent`.
        use_mmr: Forwarded as `use_mmr`.

    Returns:
        Whatever `rag_retriever.retrieve(...)` returns. Later cells treat it
        as a list of hit dicts and read the keys 'content', 'metadata',
        'similarity_score' and 'rank' from each hit.
    """
    # One equality condition per scoping field, combined with "$and".
    scope_conditions = [
        {field: {"$eq": expected}}
        for field, expected in (
            ("tenant", TENANT),
            ("corpus", CORPUS),
            ("version", VERSION),
        )
    ]

    return rag_retriever.retrieve(
        question=question,
        top_k=top_k,
        score_threshold=score_threshold,
        where={"$and": scope_conditions},
        max_per_parent=max_per_parent,
        use_mmr=use_mmr,
        include_embeddings=False,
        prefetch_factor=5,
    )


def build_rag_messages(
    question: str,
    hits: List[Dict[str, Any]],
    max_context_chunks: int = 8,
) -> List[ChatMessage]:
    """
    Build the system + user messages for the RAG answer.

    - Takes the first `max_context_chunks` retrieved hits,
    - Renders them as a numbered context section ("[1] (source_type=..., source=...)"),
    - Asks the model to answer strictly based on that context.

    Args:
        question: The user question, embedded verbatim in the user message.
        hits: Retrieved hit dicts. Each hit must provide 'content'; 'metadata'
            is optional and may be missing or None.
        max_context_chunks: Upper bound on the number of hits placed in the prompt.

    Returns:
        A two-element list: the system message followed by the user message.
        When no hits are given, the context section reads "(no context retrieved)".

    Raises:
        KeyError: If a hit within the first `max_context_chunks` has no 'content' key.
    """
    context_parts = []
    for i, hit in enumerate(hits[:max_context_chunks], start=1):
        # `.get("metadata", {})` would still return None when the key is present
        # with a None value, so normalise both "missing" and "None" to an empty dict.
        metadata = hit.get("metadata") or {}
        source_type = metadata.get("source_type", "unknown")
        # First non-empty identifier wins; fall back to a positional label.
        source_name = (
            metadata.get("source_path")
            or metadata.get("source_name")
            or metadata.get("source_title")
            or f"chunk_{i}"
        )
        context_parts.append(
            f"[{i}] (source_type={source_type}, source={source_name})\n{hit['content']}"
        )

    context_block = "\n\n".join(context_parts) if context_parts else "(no context retrieved)"

    system_content = (
        "You are a strict RAG assistant. "
        "Answer the user's question **only** using the provided context.\n\n"
        "If the context does not contain enough information, say that explicitly "
        "and avoid guessing or hallucinating.\n"
    )

    user_content = (
        "Context from documents and web sources:\n"
        "--------------------------------------\n"
        f"{context_block}\n\n"
        "User question:\n"
        "--------------\n"
        f"{question}\n\n"
        "Please provide a concise but detailed answer, with clear structure "
        "(short summary first, then key points)."
    )

    return [
        ChatMessage(role="system", content=system_content),
        ChatMessage(role="user", content=user_content),
    ]


def rag_answer_node(state: HybridRagState) -> HybridRagState:
    """
    LangGraph node that produces the final answer for `state["question"]`.

    Flow:
      - an empty / whitespace-only question short-circuits with an empty answer
        and `rag_hits_count = 0` in the debug info,
      - otherwise chunks are retrieved via `run_rag_retrieval`, a prompt is built
        with `build_rag_messages`, and `llm_adapter.generate_messages` is called,
      - the returned state is a copy of the input with 'answer' and 'debug_info'
        replaced; 'debug_info' gains the hit count and a preview of the first
        three hits (rank, similarity score, source type, source).

    Preconditions:
      - `index_corpus_node` is expected to have populated the vectorstore. If
        `state["vectorstore_ready"]` is falsy a notice is printed and retrieval
        is attempted regardless.
    """
    user_question = state.get("question", "").strip()

    # Guard clause: nothing to answer.
    if not user_question:
        print("[RAG ANSWER] Empty question in state; returning without answer.")
        empty_debug = dict(state.get("debug_info", {}))
        empty_debug["rag_hits_count"] = 0
        return {**state, "answer": "", "debug_info": empty_debug}

    flagged_ready = state.get("vectorstore_ready", False)
    if not flagged_ready:
        print("[RAG ANSWER] Vectorstore not marked as ready; attempting retrieval anyway.")

    # Step 1: retrieval
    retrieved = run_rag_retrieval(question=user_question, top_k=TOP_K)
    print(f"[RAG ANSWER] Retrieved {len(retrieved)} hits for question.")

    # Step 2: prompt construction
    prompt_messages = build_rag_messages(
        user_question,
        retrieved,
        max_context_chunks=TOP_K,
    )

    # Step 3: generation through the LLM adapter
    answer_text = llm_adapter.generate_messages(
        prompt_messages,
        temperature=0.1,
        max_tokens=None,
    )

    # Step 4: compact preview of the top three hits for debugging
    preview = []
    for hit in retrieved[:3]:
        meta = hit.get("metadata", {})
        source = (
            meta.get("source_path")
            or meta.get("source_name")
            or meta.get("source_title")
        )
        preview.append(
            {
                "rank": hit.get("rank"),
                "similarity_score": hit.get("similarity_score"),
                "source_type": meta.get("source_type"),
                "source": source,
            }
        )

    # Copy so the incoming state's debug dict is not mutated.
    debug_snapshot = dict(state.get("debug_info", {}))
    debug_snapshot["rag_hits_count"] = len(retrieved)
    debug_snapshot["rag_hits_preview"] = preview

    return {**state, "answer": answer_text, "debug_info": debug_snapshot}


## 9. Building the LangGraph pipeline

Now that we have all the building blocks as standalone functions and nodes:

- `load_local_docs_node(state)`,
- `load_web_docs_node(state)`,
- `split_corpus_node(state)`,
- `index_corpus_node(state)`,
- `rag_answer_node(state)`,

we can combine them into a single `StateGraph[HybridRagState]`:

1. The graph entry point will be `load_local_docs`,
2. Then we invoke `load_web_docs` (async node),
3. Then `split_corpus` to build chunked documents,
4. Then `index_corpus` to embed and store chunks in the vectorstore,
5. Finally `rag_answer` to run retrieval and generate the final answer.

Because one of the nodes is async (`load_web_docs_node`), the compiled graph
will be executed via `hybrid_rag_graph.ainvoke(...)` and the top-level helper
`run_hybrid_rag_async(question: str)` will therefore be async as well.

For convenience, we also provide a small synchronous wrapper, `run_hybrid_rag(question: str)`,
built on `asyncio.run(...)` for environments that do not support `await` at the top level.


In [None]:
# 9. Building the LangGraph pipeline
# ----------------------------------

from langgraph.graph import StateGraph, END
from typing import Dict, Any
import asyncio


# --- Build the graph ---

graph_builder = StateGraph(HybridRagState)

# Pipeline steps in execution order: (node name, node callable).
pipeline_steps = [
    ("load_local_docs", load_local_docs_node),
    ("load_web_docs", load_web_docs_node),
    ("split_corpus", split_corpus_node),
    ("index_corpus", index_corpus_node),
    ("rag_answer", rag_answer_node),
]

# Register every step as a graph node.
for step_name, step_fn in pipeline_steps:
    graph_builder.add_node(step_name, step_fn)

# The first step is the entry point.
graph_builder.set_entry_point(pipeline_steps[0][0])

# Linear flow for the basic demo: each step feeds the next, the last one feeds END.
step_names = [name for name, _ in pipeline_steps]
for upstream, downstream in zip(step_names, step_names[1:] + [END]):
    graph_builder.add_edge(upstream, downstream)

# Compile into a runnable graph; it is invoked with `ainvoke` further below.
hybrid_rag_graph = graph_builder.compile()

print("Hybrid multi-source RAG graph compiled.")

from IPython.display import Image, display

# Render the compiled graph as a Mermaid PNG and show it inline.
graph_png = hybrid_rag_graph.get_graph().draw_mermaid_png()
display(Image(graph_png))

In [None]:
# 9.1 Helper: async runner for a single question
# ----------------------------------------------

async def run_hybrid_rag_async(question: str) -> Dict[str, Any]:
    """
    Execute the compiled hybrid RAG graph once for `question` (async).

    A fresh `HybridRagState` is seeded with the question and empty / default
    values for every other field, passed to `hybrid_rag_graph.ainvoke(...)`,
    and the interesting parts of the final state are returned.

    Returns:
        A dict with three keys:
          - 'question': the input question, unchanged,
          - 'answer': the final state's 'answer' ("" if absent),
          - 'debug_info': the final state's 'debug_info' ({} if absent).
    """
    seed_state: HybridRagState = dict(
        question=question,
        pdf_docs=[],
        docx_docs=[],
        web_docs_serialized=[],
        split_docs=[],
        vectorstore_ready=False,
        vectorstore_collection=VS_COLLECTION_NAME,
        answer="",
        debug_info={},
    )

    final_state = await hybrid_rag_graph.ainvoke(seed_state)

    answer = final_state.get("answer", "")
    debug_info = final_state.get("debug_info", {})
    return {"question": question, "answer": answer, "debug_info": debug_info}


# 9.2 Helper: sync wrapper for environments without top-level await
# -----------------------------------------------------------------

def run_hybrid_rag(question: str) -> Dict[str, Any]:
    """
    Synchronous convenience wrapper around `run_hybrid_rag_async`.

    Useful in plain Python scripts or notebooks that do not support
    top-level `await`.

    `asyncio.run(...)` refuses to start when the calling thread already has a
    running event loop (the usual situation inside Jupyter). In that case the
    coroutine is executed with `asyncio.run` on a short-lived worker thread,
    and this call blocks until it finishes.

    Args:
        question: The user question to run through the pipeline.

    Returns:
        The dict produced by `run_hybrid_rag_async` ('question', 'answer',
        'debug_info').

    Raises:
        Any exception raised by the pipeline is propagated to the caller,
        regardless of which execution path was taken.
    """
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop is running in this thread: the plain path is safe.
        return asyncio.run(run_hybrid_rag_async(question))

    # A loop is already running here; give the coroutine its own loop on a
    # separate thread instead of failing with RuntimeError.
    from concurrent.futures import ThreadPoolExecutor

    with ThreadPoolExecutor(max_workers=1) as pool:
        future = pool.submit(asyncio.run, run_hybrid_rag_async(question))
        return future.result()


In [None]:
# 9.3 Example usage
# -----------------

example_question = (
    "Summarize the key ideas from my local documents and recent web sources "
    "about AI Agents in ERP/CRM software, and propose concrete next steps "
    "to make this pipeline production-ready."
)

result = await run_hybrid_rag_async(example_question)

print("=== QUESTION ===")
print(result["question"])
print("\n=== ANSWER ===")
print(result["answer"])

print("\n=== DEBUG INFO ===")
for key, value in result["debug_info"].items():
    print(f"{key}: {value}")
