In [1]:
# libraries
import os
import numpy as np
import pypdf
import json
import faiss

from langchain.text_splitter import RecursiveCharacterTextSplitter
from sentence_transformers import SentenceTransformer

# Gemini
import google.generativeai as genai
genai.configure(api_key=os.getenv("GEMINI_API_KEY"))

# Mistral
from mistralai import Mistral
client = Mistral(api_key=os.environ["MISTRAL_API_KEY"])

In [None]:
# query functions
def query_gemini(prompt: str, modelname="gemini-2.0-flash") -> str:
    """Queries the Gemini API with a prompt and returns the response."""
    model = genai.GenerativeModel(modelname)
    response = model.generate_content(prompt)
    return response.text

def query_mistral(prompt: str, modelname="ministral-3b-latest") -> str:
    """Queries the Mistral API with a prompt and returns the response."""
    chat_response = client.chat.complete(
        model = modelname,
        messages = [{"role": "user", "content": prompt,}]
    )

    response = chat_response.choices[0].message.content
    return response

# file load functions
def pdf_to_text(filepath) -> str:
    """Extract text from a PDF file."""
    reader = pypdf.PdfReader(filepath)
    text = " ".join([page.extract_text() for page in reader.pages if page.extract_text()]) # join pages
    return text

def txt_to_text(filepath: str) -> str:
    """Read text from a .txt file."""
    with open(filepath, "r", encoding="utf-8") as file:
        return file.read()

In [None]:
class Embedder:
    """Handles file loading, chunking and text embedding using SentenceTransformer and stores embeddings in FAISS & JSON."""
    
    def __init__(self, model_name: str):
        self.model = SentenceTransformer(model_name)
        self.dim = self.model.get_sentence_embedding_dimension()
        self.data_path = os.getenv("data_path", "")
        self.vector_embedding_path = os.getenv("embedding_path", "") + 'chunk_vectors.faiss'
        self.metadata_path = os.getenv("embedding_path", "") + 'chunk_metadata.json'
        self.index = None
        self.metadata = None

        if os.path.exists(self.vector_embedding_path) and os.path.exists(self.metadata_path):
                self._load_embeddings()

    def _load_embeddings(self):
        """Loads embeddings and metadata from files."""
        self.index = self._load_index()
        self.metadata = self._load_metadata()

    def _delete_embedding_files(self):
        """Deletes the vector embedding and metadata files."""
        if os.path.exists(self.vector_embedding_path):
            os.remove(self.vector_embedding_path)
        if os.path.exists(self.metadata_path):
            os.remove(self.metadata_path)
    
    def _load_documents(self):
        """Loads text data from the data_path."""
        docs = []
        for filename in os.listdir(self.data_path):
            file_path = os.path.join(self.data_path, filename)
            if filename.endswith(".pdf"):
                docs.append({"filename": filename, "text": pdf_to_text(file_path)})
            elif filename.endswith(".txt"):
                docs.append({"filename": filename, "text": txt_to_text(file_path)})
            elif os.path.isdir(file_path):
                continue
            else:
                print(f"Could not load file {filename}")
        return docs

    def _chunk_documents(self, chunk_size: int, chunk_overlap: int):
        """Splits texts into chunks."""
        docs = self._load_documents()
        splitter = RecursiveCharacterTextSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap)
        chunks = []
        for doc in docs:
            chunk_texts = splitter.split_text(doc["text"])
            filename = doc["filename"]
            for chunk_text in chunk_texts:
                chunks.append({"filename": filename, "text": chunk_text})

        return chunks 

    def encode(self, texts: list[str]) -> np.ndarray:
        """Returns vector embeddings for a list of texts."""
        return self.model.encode(texts, convert_to_numpy=True)
    
    def _create_index(self):
        """Creates FAISS index and stores it in the vector_embedding_path."""
        self.index = faiss.IndexFlatL2(self.dim)

    def _load_index(self):
        """Loads FAISS index from the vector_embedding_path."""
        return faiss.read_index(self.vector_embedding_path)
    
    def _save_index(self):
        """Saves FAISS index to the vector_embedding_path."""
        faiss.write_index(self.index, self.vector_embedding_path)
    
    def _create_metadata(self):
        """Creates metadata file if it does not exist."""
        if not os.path.exists(self.metadata_path):
            with open(self.metadata_path, "w") as f:
                json.dump([], f)

    def _load_metadata(self):
        """Loads metadata from JSON file if it exists; otherwise, returns an empty list."""
        try:
            with open(self.metadata_path, "r") as f:
                return json.load(f)
        except json.JSONDecodeError:
            pass

    def _save_metadata(self):
        """Saves metadata to JSON file."""
        with open(self.metadata_path, "w") as f:
            json.dump(self.metadata, f, indent=4)

    def _store_embeddings(self, chunks: list[dict]):
        """Stores vector embeddings in FAISS and saves text-index mapping in JSON."""
        chunk_vectors = self.encode([chunk["text"] for chunk in chunks])

        # Get the starting index for new entries
        start_idx = len(self.metadata)

        # Store vectors in FAISS
        self.index.add(chunk_vectors)
        
        # Store metadata (text -> FAISS index)
        for i, chunk in enumerate(chunks):
            self.metadata.append({
                "filename": chunk["filename"],
                "text": chunk["text"],
                "vector_id": start_idx + i
            })

        self._save_index()
        self._save_metadata()

    def reload_embeddings(self, chunk_size: int, chunk_overlap: int):
        """Reloads embeddings from the data_path."""
        
        # check if data path contains files
        if len(os.listdir(self.data_path)) == 0:
            print("No files found in data_path.")
            return
        
        self._delete_embedding_files()
        self._create_index()
        self._create_metadata()
        chunks = self._chunk_documents(chunk_size, chunk_overlap)
        self._store_embeddings(chunks)

    def search(self, query_text, top_k=3):
        """Searches FAISS and retrieves text metadata from JSON."""
        query_vector = self.encode([query_text])
        distances, indices = self.index.search(query_vector, top_k)
        return distances, indices


class RAG:
    def __init__(self, embedding_model_name="all-MiniLM-L6-v2", LLM_name="gemini-2.0-flash"):       
        self.data_path = os.getenv("data_path")
        self.vector_embedding_path = os.getenv("vector_embedding_path")
        self.embedder = Embedder(embedding_model_name) 
        self.LLM_name = LLM_name

    def _retrieve_context(self, query, top_k_chunks):
        distances, indices = self.embedder.search(query, top_k=top_k_chunks)
        relevant_chunks = [self.embedder.metadata[idx] for idx in indices[0]]
        for i, chunk in enumerate(relevant_chunks):
            chunk["distance"] = distances[0][i]
        return relevant_chunks
    
    def reload_embeddings(self, chunk_size=500, chunk_overlap=50):
        self.embedder.reload_embeddings(chunk_size, chunk_overlap)

    def query(self, query, top_k_chunks=3):
        relevant_chunks = self._retrieve_context(query, top_k_chunks)
        joined_chunks = " ".join([chunk["text"] for chunk in relevant_chunks])
        
        # Format the prompt
        prompt = f"""
        You are an AI assistant. Use the following retrieved context to answer the question.

        Context:
        {joined_chunks}

        Question:
        {query}
        """


        answer = query_gemini(prompt)

        response = {
            "chunks": relevant_chunks,
            "query": query,
            "answer": answer
        }	

        return response


In [15]:
em = Embedder("all-MiniLM-L6-v2")
em._delete_embedding_files()
#em._chunk_documents(500, 50)

In [19]:
em = Embedder("all-MiniLM-L6-v2")
em.reload_embeddings(500, 50)

In [21]:
# check number of vectors in faiss
em.index.ntotal == len(em.metadata)

True

In [None]:
# Example usage
rag = RAG(top_k_chunks=5)
rag.reload_embeddings()
rag.embedder.index.ntotal

879

In [23]:
query = "How does the RAGE tool work?"
response = rag.query(query)
print(response["answer"])

RAGE works by querying external sources and incorporating relevant information into its input context. It considers combinations of retrieved sources, creates prompts based on the user's question for each combination, and retrieves answers from a Large Language Model (LLM). After analyzing the answers, RAGE groups combinations by answer and displays the proportion of each answer. It identifies parts of the input context that, when removed, change the LLM's answer, providing counterfactual explanations. RAGE also includes pruning methods to manage the space of possible explanations.



In [24]:
response["chunks"]

[{'filename': 'Rorseth2024.pdf',
  'text': '1A video is available at https://vimeo.com/877281038.\n2The tool is available at http://lg-research-2.uwaterloo.ca:8092/rage.\narXiv:2405.13000v1  [cs.CL]  11 May 2024 Perturbation\nSearch\nCounterfactual\nSearch\nRetrieval Model\n(Pyserini BM25)\nLlama 2 Chat 7B LLMCounterfactual\nExplanations\nAnswers\nAnalysis\nLucene\nIndex\nUsers RAGE \nWeb App\n(Plotly Dash)\nKnowledge \nLLM \nFig. 1. The architecture of RAGE.\nknowledge about the topic and a provided set of sources. In',
  'vector_id': 332,
  'distance': np.float32(1.0285647)},
 {'filename': 'Rorseth2024.pdf',
  'text': 'set of combinations, RAGE considers all combinations of the\nretrieved sources Dq, or draws a fixed-size random sample of s\ncombinations. Based on the user’s original question, a prompt\nis created for each selected combination, which is then used to\nretrieve corresponding answers from the LLM. After analyzing\nthe answers, RAGE renders a table that groups combinatio

In [13]:
sum([len(chunk['text']) for chunk in rag.embedder.metadata]) / len(rag.embedder.metadata)

449.1808873720137

In [None]:
class Summarizer:
    def __init__(self, model="gemini", model_name="gemini-2.0-flash"):
        self.data_path = os.getenv("data_path")
        self.model = model
        self.model_name = model_name
        self.summary_extension = ".summary.txt"

    def summarize_text(self, text: str) -> str:
        prompt = f"Summarize the following document in one paragraph: \n{text}"
        if self.model == "gemini":
            response = query_gemini(prompt)
        elif self.model == "mistral":
            response = query_mistral(prompt)
        else:
            print(f"Model {self.model} not found.")
            return None
        return response
    
    def _load_document(self, path: str) -> str:
        if path.endswith(".pdf"):
            return pdf_to_text(path)
        elif path.endswith(".txt"):
            return txt_to_text(path)
        elif os.path.isdir(path):
            print(f"Could not load directory {path}")
        else:
            print(f"Could not load file {path}")

    def _write_file(self, path: str, text: str):
        with open(path, "w", encoding="utf-8") as file:
            file.write(text)
    
    def create_summaries(self, clear=True):
        if clear:
            for filename in os.listdir(self.data_path):
                if filename.endswith(self.summary_extension):
                    file_path = os.path.join(self.data_path, filename)
                    os.remove(file_path)

        summaries = {}

        for filename in os.listdir(self.data_path):
            file_path = os.path.join(self.data_path, filename)
            filename = os.path.splitext(filename)[0] # remove file extension
            
            text = self._load_document(file_path)
            summary = self.summarize_text(text)
            
            summary_path = f"{self.data_path}/{filename}{self.summary_extension}"
            summaries[summary_path] = summary
        
        for path, summary in summaries.items():
            self._write_file(path, summary)

In [None]:
# test summarizer
s = Summarizer(model="mistral")
s.create_summaries()

In [13]:
query = "What is usable XAI?"
embedding_model = Embedder("all-MiniLM-L6-v2")



# Load the index
index = faiss.read_index(os.getenv("vector_embedding_path"))

# Encode the query
query_embedding = embedding_model.encode([query])

# Search for the most similar chunks
distances, indices = index.search(query_embedding, 3)

# Retrieve the most similar chunks
#retrieved_context = [self.chunks[i] for i in indices[0]]

In [15]:
indices[0]

array([375, 726, 402])

In [30]:
rag = RAG(include_summaries=True)
rag._create_summaries()

#query = "What is usable XAI?"
#rag.create_embedding()
#print(rag.doc_names)
#rag.query(query)
#rag.create_embedding()

In [80]:
# Chunk data

def split_text_into_chunks(texts, chunk_size=1000, overlap=100):
    splitter = RecursiveCharacterTextSplitter(chunk_size=chunk_size, chunk_overlap=overlap)
    chunks = []
    for text in texts:
        chunks.extend(splitter.split_text(text))
    return chunks

text_chunks = split_text_into_chunks(pdf_texts)

In [111]:
def split_text_into_chunks(texts, chunk_size=1000, overlap=100):
    """
    Splits a list of texts into chunks and returns a list of dictionaries.

    :param texts: List of text documents
    :param chunk_size: Maximum chunk size
    :param overlap: Overlap between chunks
    :return: List of dictionaries with 'text' and 'document' keys
    """
    splitter = RecursiveCharacterTextSplitter(chunk_size=chunk_size, chunk_overlap=overlap)
    chunks = []

    for doc_index, text in enumerate(texts):
        split_texts = splitter.split_text(text)
        for chunk in split_texts:
            chunks.append({"text": chunk, "document": doc_index})  # Track source document

    return chunks

# Example usage
chunks = split_text_into_chunks(pdf_texts)


In [112]:
# Embed data

embedding_model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")

def get_embeddings_local(text_chunks):
    text_chunks = [chunk['text'] for chunk in chunks]
    return embedding_model.encode(text_chunks, convert_to_list=True)

vector_embeddings = get_embeddings_local(text_chunks)

In [113]:
# Convert embeddings to a NumPy array
embedding_matrix = np.array(vector_embeddings, dtype=np.float32)

# Create FAISS index (L2 similarity search)
index = faiss.IndexFlatL2(embedding_matrix.shape[1])

# Add embeddings to the index
index.add(embedding_matrix)

# Save the index for later use
faiss.write_index(index, "vector_index.faiss")

In [115]:
def search_faiss(query, top_k=2):
    query_embedding = embedding_model.encode([query], convert_to_numpy=True)
    distances, indices = index.search(query_embedding, top_k)
    
    # Create a dictionary with index as key and distance as value
    results = {int(idx): float(dist) for idx, dist in zip(indices[0], distances[0])}
    
    return results


query = "What are the advantages of RAG?"
result = search_faiss(query, top_k=2)
print(result)


{303: 1.0080444812774658, 310: 1.1675567626953125}


In [132]:
from statistics import mode

doc_origins = []
for chunk in result:
    doc = chunks[chunk]['document']
    doc_origins.append(doc)

mode(doc_origins)

2

In [138]:
# Extract top matching texts from FAISS
text_chunks = [chunk['text'] for chunk in chunks]
retrieved_context = "\n".join([text_chunks[idx] for idx in result.keys()])

## Research

In [76]:
# number of documents
n_docs = len(os.listdir('./data'))
print(f'number of documents: {n_docs}')

# number of chunks
n_chunks = len(text_chunks)
print(f'number of chunks: {n_chunks}')

# average chunk size
avg_chunk_size = round(sum([len(text) for text in text_chunks]) / n_chunks)
print(f'average chunk size (characters): {avg_chunk_size}')

number of documents: 3
number of chunks: 953
average chunk size (characters): 451


In [28]:
# query LLM n times
n = 5
query = 'How long is a banana?'

responses = []
for i in range(n):
    response = query_gemini(query)
    responses.append(response)

responses

['The length of a banana can vary depending on the variety, but a typical banana is **around 6 to 9 inches (15 to 23 centimeters) long.**\n',
 "The length of a banana can vary quite a bit, depending on the type and how it's grown. However, a typical banana is usually between **6 and 9 inches (15 to 23 cm) long**.\n",
 'The length of a banana can vary depending on the type and ripeness, but a typical banana is **about 6 to 9 inches (15 to 23 cm) long.**\n',
 'The length of a banana can vary, but on average:\n\n*   **Typical banana:** 6-8 inches (15-20 cm)\n\nKeep in mind that there are different varieties of bananas, some of which are smaller or larger than average.',
 'The length of a banana can vary depending on the variety, but a typical banana is **around 6-9 inches (15-23 cm) long**.\n']

## Code Archive

In [None]:
from transformers import pipeline
from huggingface_hub import login

login("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")

# Load a local LLM (example: Mistral-7B)
llm_pipeline = pipeline("text-generation", model="meta-llama/Llama-3.2-1B-Instruct")

def query_local_llm(query, retrieved_context):
    """Query a local LLM using the retrieved context."""

    prompt = f"""
    You are an AI assistant. Use the following retrieved context to answer the question.

    Context:
    {retrieved_context}

    Question:
    {query}

    Answer:
    """

    response = llm_pipeline(prompt, max_length=500, do_sample=True)
    return response[0]["generated_text"]

response = query_local_llm(query, retrieved_context)
