In [2]:
from langchain_community.document_loaders import PyPDFLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
import os
from tqdm import tqdm
from embedding_model import Embedder
import chromadb
import torch
import hashlib

import math



##### Use the `hashlib` library (SHA-256) for persistent, content-based ID assignment

Reference: https://cookbook.chromadb.dev/core/document-ids/#hashes

In [3]:
def generate_sha256_hash_from_text(text):
    """Return the SHA-256 hex digest of ``text`` encoded as UTF-8.

    The same input text always produces the same 64-character hex string,
    which makes it usable as a deterministic, content-derived ID.
    """
    encoded = text.encode('utf-8')
    return hashlib.sha256(encoded).hexdigest()


In [4]:
generate_sha256_hash_from_text(text="Hello Worl3232d!")

'3eb787d0b3d7fe92710642d8b377187a8f320eaf6d6e4e015d2a5a150477c8bf'

In [5]:
def augment_multiple_query(query, model="gpt-3.5-turbo", system_prompt=None, client=None):
    """Ask a chat model for alternative phrasings of ``query`` (query expansion).

    Args:
        query: the user's original query.
        model: OpenAI chat model name.
        system_prompt: system message telling the model how to expand the
            query. Defaults to the notebook-level ``SYSTEM_PROMT_QUERY_EXPANSION``
            (NOTE(review): that constant is not defined in any visible cell).
        client: OpenAI client; defaults to the notebook-level ``openai_client``,
            which is created further down in the notebook, so run that cell first.

    Returns:
        List of expanded queries: one per non-blank line of the model's reply,
        with surrounding whitespace stripped.
    """
    if system_prompt is None:
        system_prompt = SYSTEM_PROMT_QUERY_EXPANSION
    if client is None:
        client = openai_client

    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": query},
    ]
    response = client.chat.completions.create(model=model, messages=messages)
    # ``content`` may be None; blank lines would otherwise become empty "queries"
    # that still get embedded and searched.
    content = response.choices[0].message.content or ""
    return [line.strip() for line in content.splitlines() if line.strip()]

### Usage


```bash
docker run tinyrag --persistent_storage [path]  
```


```bash
docker-compose up
```
### This should deploy a RAG API with 4 endpoints:

 - /tinyrag/upload_file
 
 - /tinyrag/upload_zip
 
 - /tinyrag/query (optional `?expand` flag to enable query expansion)
 
 - /tinyrag/reset
 
 
 **TODO:** think more about the API design.

In [6]:
class PdfChunksLoader_ChromaDB():
    """Split PDFs into text chunks and insert them into a ChromaDB collection.

    Chunk IDs are derived from a SHA-256 hash of the chunk text, so re-loading
    the same PDF neither duplicates chunks nor recomputes their embeddings.
    Because only the text is hashed, two chunks with identical text (e.g. from
    different PDFs) share one ID, and only the first one inserted is stored.
    """

    def __init__(self, collection, embedder, text_splitter=None):
        """
        Args:
            collection: ChromaDB collection that receives the chunks.
            embedder: object whose ``compute_embeddings(list_of_str)`` returns
                something with a ``.tolist()`` method (one row per text).
            text_splitter: LangChain text splitter; defaults to a
                ``RecursiveCharacterTextSplitter`` (1500-char chunks, 100 overlap).
        """
        self.collection = collection
        self.embedder = embedder
        # NOTE(review): not read anywhere in this class; kept for compatibility.
        self.id = 0
        if text_splitter is None:
            text_splitter = RecursiveCharacterTextSplitter(
                chunk_size=1500,
                chunk_overlap=100,
                separators=["\n", "\t", ".", ",", " ", ""],
            )
        self.text_splitter = text_splitter

    def _extract_pdf_chunks(self, path):
        """Load the PDF at ``path`` and split it with ``self.text_splitter``.

        Returns:
            The LangChain documents produced by ``load_and_split`` (each has
            ``page_content`` and ``metadata``).
        """
        loader = PyPDFLoader(path)
        return loader.load_and_split(text_splitter=self.text_splitter)

    def _get_chunk_id(self, chunk):
        """Return a deterministic ID: ``"chunkID_"`` + SHA-256 of the chunk text."""
        return "chunkID_" + generate_sha256_hash_from_text(chunk.page_content)

    def filter_existing_docs(self, docs_ids_map):
        """Return the part of ``docs_ids_map`` whose IDs are not yet stored.

        Args:
            docs_ids_map: dict mapping chunk ID -> document.

        Returns:
            A new dict with the same ID -> document pairs, minus every ID that
            is already present in ``self.collection``.
        """
        if not docs_ids_map:
            return {}
        # Look the IDs up in this loader's own collection; a set gives O(1)
        # membership checks below.
        existing_ids = set(self.collection.get(ids=list(docs_ids_map))["ids"])
        return {doc_id: doc for doc_id, doc in docs_ids_map.items()
                if doc_id not in existing_ids}

    def populate(self, documents):
        """Embed ``documents`` and add them to the collection, skipping known chunks.

        Args:
            documents: list of chunks exposing ``page_content`` and ``metadata``.

        Prints "Documents already exist..." when every chunk is already stored.
        """
        # TODO: add a batch size for computing embeddings.
        if not documents:
            return
        # A dict keeps one entry per ID, so duplicate texts inside this batch
        # collapse into a single chunk (the last occurrence wins).
        docs_id_map = {self._get_chunk_id(chunk): chunk for chunk in documents}
        new_docs = self.filter_existing_docs(docs_id_map)
        if not new_docs:
            print("Documents already exist...")
            return
        # Embed only the new chunks, so stored ones cost no embedding work.
        texts = [chunk.page_content for chunk in new_docs.values()]
        self.collection.add(
            documents=texts,
            metadatas=[chunk.metadata for chunk in new_docs.values()],
            embeddings=self.embedder.compute_embeddings(texts).tolist(),
            ids=list(new_docs.keys()),
        )



In [7]:

class RetrievalAugmentedGenerator():
    """Retrieval side of the RAG pipeline, backed by one ChromaDB collection.

    Uploads PDF chunks through ``PdfChunksLoader_ChromaDB`` and runs
    similarity queries against the same collection.
    """

    def __init__(self, db_client, embedder, collection_name):
        """
        Args:
            db_client: ChromaDB client (e.g. ``HttpClient`` or ``PersistentClient``).
            embedder: object whose ``compute_embeddings(list_of_str)`` result has
                a ``.tolist()`` method.
            collection_name: collection to use; created if it does not exist.
        """
        self.db_client = db_client
        self.embedder = embedder
        self.collection_name = collection_name
        self.collection = self.db_client.get_or_create_collection(name=self.collection_name)
        self.chunk_loader = PdfChunksLoader_ChromaDB(self.collection, embedder)

    def upload_pdf_file(self, path_file, batch_size=5):
        """Split the PDF at ``path_file`` into chunks and store them in batches.

        Raises:
            ValueError: if ``batch_size`` is smaller than 1.
        """
        if batch_size < 1:
            raise ValueError(f"batch_size must be >= 1, got {batch_size}")
        docs = self.chunk_loader._extract_pdf_chunks(path_file)
        # Stepping by batch_size yields each batch's start offset.
        for start in tqdm(range(0, len(docs), batch_size),
                          desc=f"[{path_file}] loading batches:"):
            self.chunk_loader.populate(docs[start:start + batch_size])
        print(f"[{path_file}]: All batches loaded successfully...")

    def query_with_embeddings(self, embeddings, top_k):
        """Return the ``top_k`` nearest stored chunks for each query embedding."""
        return self.collection.query(query_embeddings=embeddings,
                                     n_results=top_k)

    def query_with_text(self, queries, top_k):
        """Embed ``queries`` and return the ``top_k`` nearest chunks for each.

        Args:
            queries: list of query strings, or a single string.
            top_k: number of results per query.
        """
        if isinstance(queries, str):
            queries = [queries]
        embeddings = self.embedder.compute_embeddings(queries).tolist()
        return self.query_with_embeddings(embeddings, top_k)

    def get(self, ids=None, where=None, limit=None):
        """Fetch stored chunks by ID and/or metadata filter.

        Thin wrapper around the collection's ``get``; returns its result dict.
        """
        return self.collection.get(ids=ids, where=where, limit=limit)

    def reset_collection(self):
        """Remove every stored chunk by dropping and recreating the collection.

        The chunk loader is re-pointed at the new collection so later uploads
        land there.
        """
        self.db_client.delete_collection(name=self.collection_name)
        self.collection = self.db_client.get_or_create_collection(name=self.collection_name)
        self.chunk_loader.collection = self.collection
    
    
    
    

In [8]:
# Connect to a ChromaDB server; host/port can be overridden through the
# CHROMA_HOST / CHROMA_PORT environment variables. For a local on-disk store
# instead, use:
#     db_client = chromadb.PersistentClient(path="./persistent_storage")
CHROMA_HOST = os.environ.get("CHROMA_HOST", "localhost")
CHROMA_PORT = int(os.environ.get("CHROMA_PORT", "8000"))
db_client = chromadb.HttpClient(host=CHROMA_HOST, port=CHROMA_PORT)

# The model and its tokenizer come from the same checkpoint.
EMBEDDING_MODEL_NAME = 'sentence-transformers/all-MiniLM-L12-v2'
embedder = Embedder(model_name=EMBEDDING_MODEL_NAME,
                    tokenizer_name=EMBEDDING_MODEL_NAME)



2024-03-17 10:43:08.074163: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [9]:
rag = RetrievalAugmentedGenerator(db_client=db_client,
                                  embedder=embedder,
                                  collection_name="default_collection")

In [10]:
rag.upload_pdf_file(path_file="library/pthreads.pdf")

[library/pthreads.pdf] loading batches:: 100%|██| 14/14 [01:04<00:00,  4.63s/it]

[library/pthreads.pdf]: All batches loaded successfully...





In [11]:
# Number of items currently stored in the collection (shown as the cell output).
rag.collection.count()

66

In [56]:
relevant_chunks = rag.query_with_text(
    queries=["Mutexes and semaphores for avoiding deadlocks"],
    top_k=5,
)

In [73]:
sources = "\n".join(map(str, relevant_chunks["metadatas"][0]))

In [86]:
# Scratch check of str.format placeholders; the expression below reproduces
# the output displayed for this cell, so Restart & Run All stays consistent.
promt_template = "Hello {}; I am {}"
promt_template.format("good", "bad")



'Hello good; I am bad'

## Creating a basic LLM generator

In [46]:
from openai import OpenAI
import os
from dotenv import load_dotenv, find_dotenv

# Load variables from the nearest .env file (if any) into os.environ.
# The result is not bound to `_`, which IPython uses for the last cell output.
load_dotenv(find_dotenv())

OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")
if not OPENAI_API_KEY:
    raise RuntimeError("OPENAI_API_KEY is not set; export it or add it to a .env file.")

openai_client = OpenAI(api_key=OPENAI_API_KEY)

In [89]:
class OpenAI_LLMGenerator():
    """Answer a question with an OpenAI chat model, grounded on retrieved chunks.

    The user message embeds the retrieved chunk texts; the returned answer is
    followed by the metadata of those chunks so the reader can see the sources.
    """

    def __init__(self, openai_client, system_promt, max_token=1500, modelname="gpt-3.5-turbo"):
        """
        Args:
            openai_client: client exposing ``chat.completions.create``.
            system_promt: system message sent with every request.
            max_token: upper bound on generated tokens, sent as ``max_tokens``.
            modelname: chat model name.
        """
        self.system_promt = system_promt
        self.client = openai_client
        self.modelname = modelname
        self.max_token = max_token

    def _create_userpromt_from_chunks(self, query, relevant_chunks):
        """Build the user message: the question followed by the joined chunks.

        Only the first query's results are used: ``relevant_chunks["documents"][0]``
        is expected to be the list of chunk texts (Chroma ``query`` result layout).
        """
        joint_relevant_chunks = "\n<EOD>\n".join(relevant_chunks["documents"][0])

        USER_PROMT_TEMPLATE = f""" You need to answer this question using provided information: {query}
                            
                            Here's the related chunks of documents. Each chunks ends with special token <EOD>:
                            
                            {joint_relevant_chunks}
                           
                      """

        return USER_PROMT_TEMPLATE

    def _get_chunkssources_info(self, relevant_chunks):
        """Return the first query's chunk metadata, one ``str(dict)`` per line."""
        return "\n".join(str(meta) for meta in relevant_chunks["metadatas"][0])

    def generate_response(self, query_text, relevant_chunks):
        """Ask the chat model to answer ``query_text`` using ``relevant_chunks``.

        Returns:
            The model's answer followed by an "[INFO]" section listing the
            metadata of every chunk that was placed in the prompt.
        """
        messages = [
            {"role": "system", "content": self.system_promt},
            {
                "role": "user",
                "content": self._create_userpromt_from_chunks(query=query_text,
                                                              relevant_chunks=relevant_chunks),
            },
        ]

        response = self.client.chat.completions.create(
            model=self.modelname,
            messages=messages,
            # Apply the configured generation limit.
            max_tokens=self.max_token,
        )

        # The API allows ``content`` to be null; treat that as an empty answer.
        content = response.choices[0].message.content or ""
        content += "\n\n\n [INFO] Related chunks used for generation:\n\n" + self._get_chunkssources_info(relevant_chunks)
        return content

        
        

In [90]:
SYS_PROMT_GENERATION = (
    "You are a helpful and knowledgeable advisor that uses provided information "
    "to combine your knowledge with this info. Be helpful"
)

In [91]:
llmgen = OpenAI_LLMGenerator(openai_client=openai_client,
                             system_promt=SYS_PROMT_GENERATION)

In [92]:
print(llmgen.generate_response(
    query_text="What are mutexes and semaphores for?",
    relevant_chunks=relevant_chunks,
))

Mutexes and semaphores are synchronization mechanisms used in multithreaded programming to manage access to shared resources and coordinate the execution of threads.

- Mutexes are used to prevent multiple threads from accessing a shared resource concurrently. They allow exclusive access to the resource, ensuring that only one thread can access it at a time. Mutexes help in avoiding race conditions and ensuring data integrity.
  
- Semaphores, on the other hand, are used to manage synchronization between multiple threads. A semaphore is essentially a counter that can control access to a shared resource by allowing a specified number of threads to access it simultaneously. 

From the provided information:
- Mutexes help in preventing deadlocks by allowing one thread to block the execution of another, but if not used properly, they can lead to deadlocks.
  
- Semaphores can be used to block threads when a queue empties until new jobs become available, ensuring that threads do not exit pr