In [1]:
import os
import re
import tqdm
import PyPDF2
import pandas as pd
from sentence_transformers import SentenceTransformer
import chromadb
from chromadb.utils import embedding_functions
from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline as hf_pipeline

def text_formatter(text: str) -> str:
    cleaned_text = text.replace("\n", " ").strip()
    cleaned_text = re.sub(r'\s+', ' ', cleaned_text)
    return cleaned_text

def split_into_sentences(text: str) -> list[str]:
    sentences = re.split(r'(?<=[.!?])\s+', text)
    return [s.strip() for s in sentences if s.strip()]

def open_and_read_pdf(pdf_path: str) -> list[dict]:
    pages_and_texts = []
    with open(pdf_path, 'rb') as file:
        pdf_reader = PyPDF2.PdfReader(file)
        for page_number in tqdm.tqdm(range(len(pdf_reader.pages)), desc="Reading PDF"):
            text = pdf_reader.pages[page_number].extract_text()
            text = text_formatter(text)
            pages_and_texts.append({
                "page_number": page_number,
                "text": text
            })
    return pages_and_texts

def split_list(input_list: list, slice_size: int, overlap: int = 5) -> list[list[str]]:
    chunks = []
    for i in range(0, len(input_list), slice_size - overlap):
        chunk = input_list[i:i + slice_size]
        if chunk:
            chunks.append(chunk)
    return chunks

def chunk_pdf_text(pages_and_texts: list[dict], num_sentence_chunk_size: int = 10, overlap: int = 5) -> pd.DataFrame:
    # Add sentence-level splits
    for item in pages_and_texts:
        item["sentences"] = split_into_sentences(item["text"])

    # Chunk sentences into overlapping windows
    pages_and_chunks = []
    for item in pages_and_texts:
        item["sentence_chunks"] = split_list(
            input_list=item["sentences"],
            slice_size=num_sentence_chunk_size,
            overlap=overlap
        )
        for sentence_chunk in item["sentence_chunks"]:
            joined_sentence_chunk = " ".join(sentence_chunk)
            joined_sentence_chunk = re.sub(r'\s+', ' ', joined_sentence_chunk)
            chunk_dict = {
                "page_number": item["page_number"],
                "sentence_chunk": joined_sentence_chunk,
                "chunk_char_count": len(joined_sentence_chunk),
                "chunk_word_count": len(joined_sentence_chunk.split(" ")),
                "chunk_token_count": len(joined_sentence_chunk) / 4.0  # rough token estimate
            }
            pages_and_chunks.append(chunk_dict)

    # Convert to DataFrame and filter by minimum token length (optional)
    df = pd.DataFrame(pages_and_chunks)
    return df

def embed_chunks(df: pd.DataFrame, embedding_model_name: str = "all-mpnet-base-v2", min_token_length: int = 30) -> list[dict]:
    # Filter short chunks
    filtered = df[df["chunk_token_count"] > min_token_length].to_dict(orient="records")

    # Load embedding model
    embedding_model = SentenceTransformer(embedding_model_name, device="cpu")

    # Generate embeddings
    for item in tqdm.tqdm(filtered, desc="Generating Embeddings"):
        item["embedding"] = embedding_model.encode(item["sentence_chunk"])

    return filtered

def store_in_chroma(chunks: list[dict], collection_name: str, chroma_db_path: str = "./chroma_db", embedding_model_name: str = "all-mpnet-base-v2") -> None:
    chroma_client = chromadb.PersistentClient(path=chroma_db_path)
    embedding_function = embedding_functions.SentenceTransformerEmbeddingFunction(
        model_name=embedding_model_name
    )

    collection = chroma_client.get_or_create_collection(
        name=collection_name,
        embedding_function=embedding_function,
        metadata={"hnsw:space": "cosine"}  # Using cosine similarity
    )

    ids = [str(i) for i in range(len(chunks))]
    documents = [item["sentence_chunk"] for item in chunks]
    metadatas = [{"page_number": str(item["page_number"])} for item in chunks]
    embeddings = [item["embedding"].tolist() for item in chunks]

    collection.upsert(
        ids=ids,
        documents=documents,
        metadatas=metadatas,
        embeddings=embeddings
    )
    print(f"Embeddings stored in Chroma collection '{collection_name}' at {chroma_db_path}.")

def query_chroma(query: str, collection_name: str, chroma_db_path: str = "./chroma_db", embedding_model_name: str = "all-mpnet-base-v2", n_results: int = 3) -> dict:
    embedding_model = SentenceTransformer(embedding_model_name, device="cpu")
    query_embedding = embedding_model.encode(query).tolist()

    chroma_client = chromadb.PersistentClient(path=chroma_db_path)
    collection = chroma_client.get_collection(name=collection_name)

    # Query
    results = collection.query(
        query_embeddings=[query_embedding],
        n_results=n_results
    )
    return results

  from .autonotebook import tqdm as notebook_tqdm
Reading PDF: 100%|██████████| 100/100 [00:01<00:00, 77.84it/s]
Generating Embeddings: 100%|██████████| 251/251 [00:23<00:00, 10.74it/s]
Add of existing embedding ID: 0
Add of existing embedding ID: 1
Add of existing embedding ID: 2
Add of existing embedding ID: 3
Add of existing embedding ID: 4
Add of existing embedding ID: 5
Add of existing embedding ID: 6
Add of existing embedding ID: 7
Add of existing embedding ID: 8
Add of existing embedding ID: 9
Add of existing embedding ID: 10
Add of existing embedding ID: 11
Add of existing embedding ID: 12
Add of existing embedding ID: 13


Processing complete. Embeddings stored in Chroma collection 'pdf_embeddings'.


In [None]:
def query_chroma(query: str, collection_name: str, chroma_db_path: str = "./chroma_db", embedding_model_name: str = "all-mpnet-base-v2", n_results: int = 3) -> dict:
    # Load embedding model to embed the query
    embedding_model = SentenceTransformer(embedding_model_name, device="cpu")
    query_embedding = embedding_model.encode(query).tolist()

    # Connect to Chroma and retrieve the collection
    chroma_client = chromadb.PersistentClient(path=chroma_db_path)
    collection = chroma_client.get_collection(name=collection_name)

    # Query
    results = collection.query(
        query_embeddings=[query_embedding],
        n_results=n_results
    )
    return results


In [2]:
results = collection.query(
    query_texts=["This is a query about diabetes"],
    n_results=2
)

In [3]:
print(results)

{'ids': [['53', '11']], 'embeddings': None, 'documents': [['Food and Nutrition Handbook for Extension Workers10WATER-SOLUBLE VITAMINS (CONTINUED) Vitamin B3 (Niacin)Fish, meat, chicken, eggs, whole grain cerealsEnables energy production in the body, supports appetite and central nervous system functions • Dermatitis • Dementia • Diarrhoea Vitamin B6 (pyridoxine)Legumes, avocado, dark green leafy vegetables (DGLV), whole grains, nuts and seeds, cabbage, banana, liver, chicken, meat, fish, potatoes, water melon, sun flowers seedsFacilitates metabolism and absorption of fats and proteins, promotes red blood cells formation, production of protein and nerve transmitters, antioxidantsTiredness, anaemia, irritability, depression, sore tongue, nausea, muscle twitching, dizziness, dermatitis (skin problem), neuropathy (nerve problem) Vitamin B 12 (cyanocobalamin)Seafood, liver, kidney, heart, whole grains, tuna, yoghurt, eggs, cheese, meat, chickenFormation of red blood cells, affects white blo

In [31]:
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer

def load_optimized_llm(model_name: str = "nvidia/Mistral-NeMo-Minitron-8B-Instruct") -> tuple:
    """
    Loads and optimizes the LLM model using torch.compile.

    Args:
        model_name: The model identifier from Hugging Face Hub.

    Returns:
        model: The optimized LLM model.
        tokenizer: The tokenizer associated with the model.
    """
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    model = AutoModelForCausalLM.from_pretrained(model_name)
    
    # Move model to appropriate device
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)
    
    # Optimize the model with torch.compile
    optimized_model = torch.compile(model)
    
    return optimized_model, tokenizer

In [32]:
import chromadb
from chromadb.utils import embedding_functions
from sentence_transformers import SentenceTransformer
import torch

def initialize_chroma(collection_name: str = "pdf_embeddings", db_path: str = "./chroma_db") -> chromadb.api.models.Collection:
    """
    Initializes and returns the Chroma collection.

    Args:
        collection_name: Name of the Chroma collection.
        db_path: Path to the Chroma DB.

    Returns:
        Chroma collection instance.
    """
    chroma_client = chromadb.PersistentClient(path=db_path)
    collection = chroma_client.get_or_create_collection(name=collection_name)
    return collection

def retrieve_documents(query: str, collection: chromadb.api.models.Collection, n_results: int = 2) -> list:
    """
    Retrieves relevant documents from Chroma DB based on the query.

    Args:
        query: The user's query string.
        collection: The Chroma collection to query.
        n_results: Number of top results to return.

    Returns:
        List of retrieved documents.
    """
    results = collection.query(
        query_texts=[query],
        n_results=n_results,
        include=['documents', 'metadatas']
    )
    return results['documents'][0]

def generate_response(model, tokenizer, prompt: str, max_length: int = 150) -> str:
    """
    Generates a response using the LLM based on the prompt.

    Args:
        model: The optimized LLM model.
        tokenizer: The tokenizer for the model.
        prompt: The text prompt to generate the response.
        max_length: Maximum length of the generated response.

    Returns:
        Generated text response.
    """
    inputs = tokenizer.encode(prompt, return_tensors="pt").to(model.device)
    outputs = model.generate(inputs, max_length=max_length, num_return_sequences=1)
    response = tokenizer.decode(outputs[0], skip_special_tokens=True)
    return response

In [33]:
def rag_pipeline(query: str, model, tokenizer, collection: chromadb.api.models.Collection, n_results: int = 2) -> str:
    """
    Performs Retrieval-Augmented Generation.

    Args:
        query: The user's query string.
        model: The optimized LLM model.
        tokenizer: The tokenizer for the model.
        collection: The Chroma collection to query.
        n_results: Number of top results to retrieve.

    Returns:
        Generated response based on retrieved documents.
    """
    # Retrieve relevant documents
    retrieved_docs = retrieve_documents(query, collection, n_results)
    
    # Combine retrieved documents into context
    context = "\n\n".join(retrieved_docs)
    
    # Create prompt for the LLM
    prompt = f"Context:\n{context}\n\nQuestion: {query}\nAnswer:"
    
    # Generate response
    answer = generate_response(model, tokenizer, prompt)
    
    return answer

In [34]:
def main():
    # Load and optimize the LLM model
    model, tokenizer = load_optimized_llm(model_name="gpt2")  # You can choose a different model
    
    # Initialize Chroma DB collection
    collection = initialize_chroma(collection_name="pdf_embeddings", db_path="./chroma_db")
    
    # Example query
    query = "This is a query about wheat"
    
    # Perform RAG
    response = rag_pipeline(query, model, tokenizer, collection, n_results=2)
    
    print("Generated Response:")
    print(response)

if __name__ == "__main__":
    main()

InvalidDimensionException: Embedding dimension 384 does not match collection dimensionality 768