# RAG Pipeline 

Original [Infotrend's CoreAI](https://github.com/Infotrend-Inc/CoreAI) Demo project: https://github.com/Infotrend-Inc/CoreAI-DemoProjects/tree/main/RAG_Pipeline 
**(Note: modified to use Ollama)**

## Description

This notebook demonstrates a **Retrieval-Augmented Generation (RAG)** pipeline using Ragbits, Docling, ChromaDB, and Ollama.

For more details about each tool:
1. Ragbits: https://ragbits.deepsense.ai/
2. Docling: https://docling-project.github.io/docling/ (used by Ragbits)
3. ChromaDB: https://www.trychroma.com/
4. Ollama: https://www.ollama.com/

# Prerequisites

To run this notebook with CoreAI, we need to install a few required libraries that are not pre-installed in the container.

**Create and Activate the Virtual Environment:**

Open a terminal within Jupyter (`File -> New -> Terminal`).
Navigate to this project's folder (where this notebook is located and where we want to set up the environment) and run:

```bash
export PROJECT_NAME="RAG_Pipeline"
export PIP_CACHE_DIR=`pwd`/.cache/pip
mkdir -p $PIP_CACHE_DIR
python -m venv --system-site-packages myvenv
source myvenv/bin/activate
pip install ipykernel
python -m ipykernel install --user --name=${PROJECT_NAME}_myvenv --display-name="Python (${PROJECT_NAME}_myvenv)"
echo ""; echo "Before continuing load the created Python kernel: Python (${PROJECT_NAME}_myvenv)"
```

This creates a local virtual environment so that installed packages live in the mounted `/iti` folder rather than modifying the container's own files.

**Load the Python kernel described above before running the cell below** (it might take a few seconds for the kernel to appear in the list of kernels).
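To confirm that the notebook is really running on the new kernel, a quick check like the following can help. This is a small sketch, not part of the original project: it only tells you whether *some* virtual environment is active, so also check that the printed interpreter path contains `myvenv`.

```python
import sys

def running_in_venv() -> bool:
    """Return True when the current interpreter belongs to a virtual environment."""
    # Inside a venv, sys.prefix points at the venv folder while
    # sys.base_prefix still points at the base Python installation.
    return sys.prefix != sys.base_prefix

print(f"Interpreter: {sys.executable}")
print(f"Running inside a virtual environment: {running_in_venv()}")
```

This works for venvs created with `--system-site-packages` too, since that option only changes which packages are visible, not the prefix.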

Then install the required libraries (from `requirements.txt`).
The rest of this notebook relies on the proper kernel being loaded and the environment variables below being set.

The following cell points the download and cache folders (pip, Hugging Face, EasyOCR) to a `.cache` folder inside the project, so downloads stay local to the running container and are reused as a cache on later runs.

In [None]:
import os
os.environ["ANONYMIZED_TELEMETRY"] = 'False'
def set_env_with_cache_dir(env_var_name: str, subdir: str):
    base_cache = os.path.join(os.getcwd(), ".cache")
    full_path = os.path.join(base_cache, subdir)
    os.environ[env_var_name] = full_path
    os.makedirs(full_path, exist_ok=True)
    print(f"{env_var_name}={full_path}")

set_env_with_cache_dir("PIP_CACHE_DIR", "pip")
set_env_with_cache_dir("HF_HOME", "huggingface")
set_env_with_cache_dir("EASYOCR_MODULE_PATH", "easyocr")

In [None]:
!. ./myvenv/bin/activate && pip install -r requirements.txt

Import the Python packages used in this notebook.

In [None]:
import asyncio
import chromadb
import torch
import numpy as np
import uuid
import requests
from pathlib import Path
from typing import List, Tuple, Optional
from openai import OpenAI
from chromadb.errors import NotFoundError
from chromadb.api.types import EmbeddingFunction, Documents
from chromadb.config import Settings
from transformers import AutoTokenizer, AutoModel
from ragbits.core.sources.local import LocalFileSource
from ragbits.document_search.documents.document import Document, DocumentType
from ragbits.document_search.ingestion.parsers.docling import DoclingDocumentParser
from ragbits.document_search.documents.element import TextElement, Element

device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"DEVICE: {device}")

## Asynchronous Document Parsing using Docling

This cell defines functions to:
- Determine file types from paths
- Parse single/multiple documents concurrently
- Extract structured elements using the Docling parser


In [None]:
async def parse_document(file_path: str):
    path = Path(file_path).resolve()
    print(f"\n Parsing all elements from '{file_path}'")
    doc_type = get_document_type(path)
    source = LocalFileSource(path=path)
    document = Document( local_path=path, metadata={ "document_type": doc_type, "source": source })
    parser = DoclingDocumentParser(ignore_images=True)
    elements = await parser.parse(document)
    return elements

def get_document_type(path: Path) -> DocumentType:
    ext = path.suffix.lower()
    if ext == ".pdf":
        return DocumentType.PDF
    elif ext == ".png":
        return DocumentType.PNG 
    elif ext == ".jpg":
        return DocumentType.JPG 
    elif ext == ".docx":
        return DocumentType.DOCX
    elif ext == ".pptx":
        return DocumentType.PPTX
    elif ext == ".xlsx":
        return DocumentType.XLSX
    elif ext == ".md":
        return DocumentType.MD
    elif ext == ".txt":
        return DocumentType.TXT
    elif ext == ".html":
        return DocumentType.HTML
    else:
        raise ValueError(f"Unsupported file type: {ext}")

async def process_files(file_paths: List[str]):
    tasks = [parse_document(file_path) for file_path in file_paths]
    results = await asyncio.gather(*tasks)
    return results
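`process_files` schedules one `parse_document` coroutine per file and collects them with `asyncio.gather`. The self-contained sketch below shows that pattern with a hypothetical stand-in coroutine, `fake_parse` (it is not Docling), so the timing behavior is easy to see.

```python
import asyncio
import time
from typing import List

async def fake_parse(path: str, delay: float = 0.2) -> List[str]:
    """Hypothetical stand-in for parse_document: waits, then returns fake elements."""
    await asyncio.sleep(delay)  # simulates work that yields to the event loop
    return [f"element from {path}"]

async def process_all(paths: List[str]) -> List[List[str]]:
    # gather runs all coroutines concurrently and returns results in input order
    return await asyncio.gather(*(fake_parse(p) for p in paths))

start = time.perf_counter()
results = asyncio.run(process_all(["a.pdf", "b.docx", "c.md"]))
elapsed = time.perf_counter() - start
print(results)
print(f"elapsed: {elapsed:.2f}s")  # close to one delay, not three
```

Run this as a plain script: inside Jupyter an event loop is already running, so use `await process_all(...)` instead of `asyncio.run(...)`, as the notebook does. Note that coroutines only overlap while they are awaiting; if a parser does CPU-heavy work without yielding, `gather` will give little speedup.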

## Building a Semantic Search Index with ChromaDB

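This cell defines the `ChromaDBIndex` class, which:
- Converts `TextElement` objects into vector embeddings using a Hugging Face `transformers` model with mean pooling
- Stores them in a persistent ChromaDB collection (under `.cache/chroma`)
- Supports semantic search by nearest-neighbor lookup (the collection uses ChromaDB's default distance, squared L2, so lower scores mean closer matches)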

The helper function `index_all_elements(...)` initializes and populates the index with parsed document elements.
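The `_mean_pooling` method averages the token vectors of each text while ignoring padding tokens. Below is a NumPy re-implementation of the same idea (the class itself uses torch tensors), with a toy input where the padded token would distort the result if it were counted.

```python
import numpy as np

def mean_pooling(token_embeddings: np.ndarray, attention_mask: np.ndarray) -> np.ndarray:
    """Average token vectors per sequence, ignoring padding positions.

    token_embeddings: (batch, seq_len, dim); attention_mask: (batch, seq_len) of 0/1.
    """
    mask = attention_mask[..., np.newaxis].astype(token_embeddings.dtype)  # (batch, seq_len, 1)
    summed = (token_embeddings * mask).sum(axis=1)                          # (batch, dim)
    counts = np.clip(mask.sum(axis=1), 1e-9, None)                          # avoid division by zero
    return summed / counts

# One sequence with three tokens; the last one is padding and must not count.
tokens = np.array([[[1.0, 2.0], [3.0, 4.0], [100.0, 100.0]]])
mask = np.array([[1, 1, 0]])
print(mean_pooling(tokens, mask))  # [[2. 3.]]
```

Some embedding models are trained with a different pooling strategy (for example, using only the first token), so check the chosen model's card for its recommended pooling.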

In [None]:
class ChromaDBIndex:

    def __init__(self, model_name, collection_name="ragbits_docs", clear_on_init=False):
        chromadb_path = os.path.join(os.getcwd(), ".cache", "chroma")
        # Disable telemetry: https://docs.trychroma.com/docs/overview/telemetry
        self.client = chromadb.PersistentClient(path=chromadb_path, settings=Settings(anonymized_telemetry=False))
        self.collection_name = collection_name
        if clear_on_init:
            # Start from an empty collection so re-running the notebook doesn't duplicate entries
            try:
                self.client.delete_collection(collection_name)
            except NotFoundError:
                pass

        # Reuse the collection if it exists, otherwise create it
        self.collection = self.client.get_or_create_collection(name=collection_name)
            
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.model = AutoModel.from_pretrained(model_name)

        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        print(f"using the device as {self.device}" )
        self.model.to(self.device) 

        self._item_to_source = {}

    def _mean_pooling(self, model_output, attention_mask):
        token_embeddings = model_output[0]
        input_mask_expanded = attention_mask.unsqueeze(-1).expand(token_embeddings.size()).float()
        return torch.sum(token_embeddings * input_mask_expanded, 1) / torch.clamp( input_mask_expanded.sum(1), min=1e-9)

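    def add_text_elements(self, elements: List[TextElement], source_id: str):
        if not elements:
            # Nothing to embed (e.g. a document with no text elements); tokenizing/adding empty input would fail
            print(f"No text elements to add from '{source_id}'")
            return
        ids = [str(uuid.uuid4()) for _ in elements]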
        texts = [element.content for element in elements]
        metadatas = [{"source": source_id, "location": str(element.location)} for element in elements]
        encoded_input = self.tokenizer( texts, padding=True, truncation=True, max_length=512, return_tensors="pt").to(self.device)
        with torch.no_grad():
            model_output = self.model( input_ids=encoded_input["input_ids"], attention_mask=encoded_input["attention_mask"])
        
        embeddings = self._mean_pooling(model_output, encoded_input["attention_mask"]).cpu().numpy().tolist()
    
        self.collection.add( embeddings=embeddings, documents=texts, ids=ids, metadatas=metadatas)
    
        for item_id in ids:
            self._item_to_source[item_id] = source_id
    
        print(f"Added {len(ids)} elements from '{source_id}'")

    def search(self, query: str, k: int = 5) -> List[Tuple[str, float]]:
        encoded_input = self.tokenizer([query], padding=True, truncation=True, return_tensors="pt").to(self.device)
        with torch.no_grad():
            model_output = self.model( input_ids=encoded_input["input_ids"], attention_mask=encoded_input["attention_mask"])
        
        query_embedding = self._mean_pooling(model_output, encoded_input["attention_mask"]).cpu().numpy().tolist()[0]
        results = self.collection.query( query_embeddings=[query_embedding], n_results=k )
        matched_texts = results.get("documents", [[]])[0]
        scores = results.get("distances", [[]])[0]
        return [(text, float(score)) for text, score in zip(matched_texts, scores)]

def index_all_elements(file_paths: List[str], all_elements_list: List[List[Element]], model_name):
    index = ChromaDBIndex(model_name, collection_name="ragbits_docs", clear_on_init=True)

    for file_path, elements in zip(file_paths, all_elements_list):
        source_id = file_path
        text_elements = [el for el in elements if isinstance(el, TextElement)]
        index.add_text_elements(text_elements, source_id)

    return index
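Because the collection is created without a distance setting, `search()` returns ChromaDB's default squared-L2 distances, and the mean-pooled vectors are not normalized, so these scores are not cosine similarities. The sketch below shows how the two measures relate: for unit-length vectors, squared L2 equals `2 - 2 * cosine`. If cosine ranking is preferred, ChromaDB lets you choose the space when creating a collection (e.g. `metadata={"hnsw:space": "cosine"}`); check the ChromaDB docs for your version.

```python
import numpy as np

def squared_l2(a: np.ndarray, b: np.ndarray) -> float:
    """Squared Euclidean distance (the metric behind ChromaDB's default 'l2' space)."""
    return float(np.sum((a - b) ** 2))

def cosine_similarity(a: np.ndarray, b: np.ndarray) -> float:
    return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))

a = np.array([3.0, 4.0])
b = np.array([4.0, 3.0])
a_unit = a / np.linalg.norm(a)
b_unit = b / np.linalg.norm(b)

print(squared_l2(a, b))            # 2.0 on the raw vectors
print(cosine_similarity(a, b))     # 0.96
# For unit-length vectors: squared L2 = 2 - 2 * cosine similarity
print(squared_l2(a_unit, b_unit))  # approximately 0.08
```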

## Generate Answer from Context using LLM

The `generate_answer()` function sends a prompt to an LLM served by Ollama through its OpenAI-compatible API (`http://localhost:11434/v1/`) and asks it to answer the question using the given context. If the context is insufficient, the model is instructed to say so. The `openai` client requires an API key, but Ollama ignores its value.

**Tip**: Feel free to modify the prompt in `generate_answer()` to better suit our needs or task objectives!



In [None]:
from openai import OpenAI

client = OpenAI(
    base_url='http://localhost:11434/v1/',
    api_key='ollama' # required but ignored
)

def generate_answer(context, question, temperature, max_tokens, model) -> str:
    prompt = f"""
You are a helpful assistant. Use the following context to answer the question.
If the context does not contain enough information, say so.

Context:
{context}

Question:
{question}

Answer:
"""

    print(f"PROMPT: {prompt}")
    
    response = client.chat.completions.create( model=model, messages=[{"role": "user", "content": prompt}], temperature=temperature, max_tokens=max_tokens)
    return response.choices[0].message.content.strip()
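Following the tip above, one option is to keep the prompt template in its own function so it can be inspected or edited without calling the model. `build_prompt` below is a hypothetical helper, not part of the original notebook; it reproduces the template used in `generate_answer()`, which could then call `build_prompt(context, question)`.

```python
def build_prompt(context: str, question: str) -> str:
    """Hypothetical helper: same template as generate_answer(), factored out for easy editing."""
    return f"""
You are a helpful assistant. Use the following context to answer the question.
If the context does not contain enough information, say so.

Context:
{context}

Question:
{question}

Answer:
"""

prompt = build_prompt("Chroma stores embeddings.", "What does Chroma store?")
print(prompt)
```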

## Model Configuration

Set the `model` variable to the name of a model available in your Ollama installation (run `ollama list` in a terminal to see the models already pulled).

We can also adjust:

- **Temperature:** Controls the randomness of the response.
  - Low (e.g., 0.2): More factual, predictable, and focused answers.
  - High (e.g., 1.0): More creative, diverse, and sometimes unexpected answers.

- **Max Tokens:** Sets the maximum length of the generated response. A token is roughly a word or part of a word. Use a lower number for short answers and a higher number for more detailed responses. The available limit can vary depending on the model.
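To make the temperature setting more concrete, here is a simplified illustration: the model's raw next-token scores (logits) are divided by the temperature before the softmax, so a low temperature concentrates probability on the top token and a high one spreads it out. The logits are toy values, and real samplers such as Ollama's also combine this with other settings (e.g. top-k/top-p), so treat this as a sketch rather than Ollama's exact sampler.

```python
import numpy as np

def softmax_with_temperature(logits, temperature: float) -> np.ndarray:
    """Turn raw next-token scores into probabilities, scaled by temperature (> 0)."""
    scaled = np.asarray(logits, dtype=float) / temperature
    scaled -= scaled.max()          # subtract the max for numerical stability
    probs = np.exp(scaled)
    return probs / probs.sum()

logits = [2.0, 1.0, 0.1]            # toy scores for three candidate tokens
for t in (0.2, 0.5, 1.0):
    print(t, np.round(softmax_with_temperature(logits, t), 3))
```

A temperature of exactly 0 would divide by zero here; servers typically treat it as always picking the top token.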

In [None]:
model = "llama3.1:8b"
temperature = 0.5
max_tokens = 10000

## Set Embedding Model

This model is used to convert document text (and queries) into vector embeddings for semantic search.
Choose any embedding model that can be loaded with Hugging Face `transformers` (`AutoTokenizer` and `AutoModel`), which is how `ChromaDBIndex` loads it.
Pick a model that balances indexing speed, memory use, and retrieval accuracy for your documents.

**Note:** Some embedding models require permission to execute custom code (in `transformers`, the `trust_remote_code=True` argument to `from_pretrained`). These models may also come with specific dependencies that are not included by default. Always review the model documentation before allowing code execution.

In [None]:
embedding_model = "mixedbread-ai/mxbai-embed-large-v1"  # Hugging Face model used by ChromaDBIndex (downloaded to HF_HOME)
# Ollama equivalent: "mxbai-embed-large:latest"

## Select Documents to Parse

Create the `docs` folder and upload documents to use for the RAG process.
We can include one or more files (e.g., PDFs, DOCX, Markdown, etc.).

In [None]:
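# Skip hidden entries (e.g. .ipynb_checkpoints) and sub-folders; only regular files are parsed
file_input = ",".join(
    f"docs/{f}" for f in sorted(os.listdir("./docs"))
    if not f.startswith(".") and os.path.isfile(os.path.join("docs", f))
)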
print(f"File list: {file_input}")

## Parse and Index Documents

This step:
- Parses each file asynchronously and extracts document elements
- Indexes the text content into ChromaDB for fast semantic search

Once this is done, we're ready to run queries against the indexed documents!

In [None]:
# Adding caching to speed up demo

import os
import time
import asyncio
import hashlib
from typing import List
import pickle

PARSED_DIR = "parsed"
os.makedirs(PARSED_DIR, exist_ok=True)

def sha256_file(path: str, chunk_size: int = 1024 * 1024) -> str:
    h = hashlib.sha256()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            h.update(chunk)
    return h.hexdigest()

async def parse_and_cache(file_path: str):
    """Parse a file (async), cache results to .pkl if not already cached."""
    sha = await asyncio.to_thread(sha256_file, file_path)
    cache_path = os.path.join(PARSED_DIR, f"{sha}.pkl")

    # Load cached result if available
    if os.path.exists(cache_path):
        # Already processed — just load the cached result
        print(f"[cache] Loading parsed data for {file_path}")
        with open(cache_path, "rb") as f:
            result = pickle.load(f)
        return result

    # Otherwise parse the document
    print(f"[parse] Processing {file_path} ...")
    parsed_result = await parse_document(file_path)  # existing async parser

    # Save to pickle
    with open(cache_path, "wb") as f:
        pickle.dump(parsed_result, f)
    
    return parsed_result

st = time.time()

files_to_parse = [f.strip() for f in file_input.split(",") if f.strip()]
all_elements = await asyncio.gather(*(parse_and_cache(f) for f in files_to_parse))

for file_path, elements in zip(files_to_parse, all_elements):
    print(f"\n Parsed {len(elements)} elements from '{file_path}'")

print(f"time taken to parse: {time.time() - st:.2f}s")
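The cache key used by `parse_and_cache` is the SHA-256 of the file *contents*, not its path. The self-contained sketch below (using temporary files) shows the consequence: a renamed copy of a document maps to the same key and reuses the cached parse, while any edit produces a new key and triggers a re-parse.

```python
import hashlib
import shutil
import tempfile
from pathlib import Path

def sha256_file(path: str, chunk_size: int = 1024 * 1024) -> str:
    """Hash a file in fixed-size chunks so large documents aren't read into memory at once."""
    h = hashlib.sha256()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            h.update(chunk)
    return h.hexdigest()

with tempfile.TemporaryDirectory() as tmp:
    original = Path(tmp, "report.md")
    original.write_text("# Report\nSome content.\n")
    renamed = Path(tmp, "copy_of_report.md")
    shutil.copy(original, renamed)

    same_key = sha256_file(str(original)) == sha256_file(str(renamed))
    renamed.write_text("# Report\nEdited content.\n")
    key_changed = sha256_file(str(original)) != sha256_file(str(renamed))

print(f"renamed copy reuses cache key: {same_key}")
print(f"edited file gets a new key:    {key_changed}")
```

Since the key ignores parser settings (such as `ignore_images`), clear the `parsed/` folder after changing them. Also, only load pickle files you created yourself: unpickling untrusted data can execute arbitrary code.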

In [None]:
st = time.time()
index = index_all_elements(files_to_parse, all_elements, embedding_model)
print(f"time taken to index: {time.time() - st:.2f}s")

In [None]:
# Free cached, unused GPU memory; the embedding model itself stays loaded for search
torch.cuda.empty_cache()

## Ask a Question

Now it's time to query the indexed documents!

- Enter a question.
- The system will:
  1. Search for relevant text chunks using semantic similarity.
  2. Send the top results along with the question to a language model.
  3. Return an answer that the prompt instructs the model to base only on the retrieved context.
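The query cells below build the context with `"\n".join(...)` over all search results. A small helper can make that step explicit and optionally drop weak matches. `build_context` and its `max_distance` cutoff are hypothetical additions, not part of the original notebook; with `max_distance=None` it reproduces the notebook's behavior. The sample distances are illustrative only: a useful cutoff depends on the embedding model and distance metric and would need to be found empirically.

```python
from typing import List, Optional, Tuple

def build_context(results: List[Tuple[str, float]],
                  max_distance: Optional[float] = None,
                  separator: str = "\n") -> str:
    """Join retrieved chunks into one context string.

    results: (text, distance) pairs as returned by ChromaDBIndex.search (lower = closer).
    max_distance: hypothetical cutoff; chunks farther than this are dropped.
    """
    kept = [text for text, distance in results
            if max_distance is None or distance <= max_distance]
    return separator.join(kept)

# Illustrative (made-up) search results
sample = [("Chunk about Ironic.", 120.5), ("Chunk about Nova.", 180.0), ("Unrelated chunk.", 410.2)]
print(build_context(sample))
print(build_context(sample, max_distance=200.0))
```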



In [None]:
query = "what is exosphere? How does it work, is it used in some project currently"

# Searching embeddings
results = index.search(query)
context = "\n".join([text for text, score in results])

# Generating the answer from the retrieved context
answer = generate_answer(context, query, temperature, max_tokens, model)

print(f"ANSWER: {answer}")

In [None]:
query = "discuss Bare Metal Management Using OpenStack Ironic"

# Searching embeddings
results = index.search(query)
context = "\n".join([text for text, score in results])

# Generating the answer from the retrieved context
answer = generate_answer(context, query, temperature, max_tokens, model)

print(answer)

In [None]:
# Switch to a different Ollama model for this query (it must already be pulled)
model = "gpt-oss:20b"

query = "discuss OpenStack virtualized environments"

# Searching embeddings
results = index.search(query)
context = "\n".join([text for text, score in results])

# Generating the answer from the retrieved context
answer = generate_answer(context, query, temperature, max_tokens, model)

print(answer)