RAG ETL

In [1]:
# load imports
import os
import chromadb
from langchain_core.documents import Document
from chromadbx import UUIDGenerator
from tqdm import tqdm
import datetime
import pandas as pd
from joblib import Parallel, delayed
import uuid
from langchain_community.document_loaders import PyPDFLoader
from langchain_community.document_loaders import PyMuPDFLoader
from langchain_community.document_loaders import PyPDFium2Loader
from pymilvus import MilvusClient
from langchain_community.embeddings import HuggingFaceBgeEmbeddings
from pymilvus import CollectionSchema, FieldSchema, DataType, Collection


In [2]:
# function decorator to log the execution time of function
import functools
def log_execution_time(func):
    @functools.wraps(func)
    def log_execution_time_wrapper(*args, **kwargs):
        time_helper = TimeHelper()
        start_time = time_helper.get_utc_now()
        print(f"Execution Start time {start_time}")
        result = func(*args, **kwargs)
        elapsed = time_helper.get_elapsed_seconds(start_time)
        print(f"Execution End time {time_helper.get_utc_now()}. Elapsed total seconds {elapsed}")
        return result
    
    return log_execution_time_wrapper

In [3]:
class FileHelper():
    """
        Helper class for Files
    """
    def populate_file_paths(self, dir_path, files):
        directories = os.listdir(dir_path)
        for dircontent in directories:
            if dircontent.startswith("."):
                continue
            file_path = os.path.join(dir_path, dircontent)
            if os.path.isfile(file_path):
                files.append(file_path)
            elif os.path.isdir(file_path):
                self.populate_file_paths(file_path, files)

    def load_documents(self, file_path, document_loader):
        loader = document_loader(file_path)
        return loader.load_and_split()

    def read_documents(self, file_paths, document_loader, n_jobs=2):
        pages_all = []
        pages = Parallel(n_jobs=n_jobs)(delayed(self.load_documents)(file_path, document_loader) for file_path in file_paths)
        for page in pages:
            pages_all.extend(page)
        
        return pages_all

In [4]:
class IterHelper():
    def batched(self, iterable, n):
        if n < 1:
            raise ValueError('n must be at least one')

        for i in range(0, len(iterable), n):
            yield iterable[i: i + n]

class TimeHelper():
    def get_utc_now(self):
        return datetime.datetime.now(datetime.timezone.utc)

    def get_elapsed_seconds(self, start_time: datetime):
        return (self.get_utc_now() - start_time).total_seconds()

class IdHelper():
    def new_id(self):
        return str(uuid.uuid4())

    def new_ids(self, num: int):
        return [self.new_id() for x in range(num)]
   
class VectorDbRepository():
    def __init__(self, collection_name, repository):
        self.collection_name = collection_name
        self.repository = repository(collection_name)

    def delete_collection(self):
        self.repository.delete_collection()

    def get_or_create_collection(self, collection_name):
        return self.repository.get_or_create_collection(collection_name)

    def create(self, documents_list: list[Document]) -> None:
        return self.repository.create(documents_list)
        
    def get_data_by_source(self, source: str):
        return self.repository.get_data_by_source(source)

    def has_data_for_source(self, source: str):
        return self.repository.has_data_for_source(source)
    
    def delete_data_by_source(self, source: str):
        return self.repository.delete_data_by_source(source)

    def find(self, search_query: str):
        return self.repository.find(search_query)

In [5]:
class EmbeddingHelper():
    def __init__(self):
        model_name = "BAAI/bge-small-en"
        model_kwargs = {"device": "mps"}
        encode_kwargs = {"normalize_embeddings": True}
        self.hf_bge_embedding = HuggingFaceBgeEmbeddings(
            model_name=model_name,
            model_kwargs=model_kwargs,
            encode_kwargs=encode_kwargs)

    @log_execution_time
    def embed(self, query):
        print("Starting embedding")
       
        return self.hf_bge_embedding.embed_query(query)


In [6]:
class ChromaDbRepository():
    def __init__(self, collection_name):
        self.collection_name = collection_name
        self.client = chromadb.PersistentClient()
        self.collection = self.get_or_create_collection()

    def delete_collection(self):
        self.client.delete_collection(self.collection_name)

    def get_or_create_collection(self):
        return self.client.get_or_create_collection(self.collection_name)

    def create(self, documents_list: list[Document]) -> None:
        ids = IdHelper().new_ids(len(documents_list))
        embedding_helper = EmbeddingHelper()
        return self.collection.add(ids=ids,
                metadatas=[p.metadata for p in documents_list],
                documents=[p.page_content for p in documents_list],
                embeddings=[embedding_helper.embed(p.page_content) for p in documents_list])
        
    def get_data_by_source(self, source: str):
        return self.collection.get(where={"source":source})

    def has_data_for_source(self, source: str):
        data_by_source = self.get_data_by_source(source)
        return len(data_by_source["ids"]) > 0
    
    def delete_data_by_source(self, source: str):
        return self.collection.delete(where={"source":source})

    def find(self, search_query: str):
        results = self.collection.query(query_texts=[search_query])
        result_documents = results.get("documents")
        if result_documents and len(result_documents) > 0:
            return ' '.join(result_documents[0])
        

In [7]:
class PdfCollectionHelper():
    def __init__(self):
        self.embedding_helper = EmbeddingHelper()

    def get_schema(self):
        pdf_id = FieldSchema(
            name="id",
            dtype=DataType.VARCHAR,
            is_primary=True,
            auto_id=False,
            max_length=100)
        source = FieldSchema(
            name="source",
            dtype=DataType.VARCHAR,
            max_length=200)
        page_number = FieldSchema(
            name="page",
            dtype=DataType.VARCHAR,
            max_length=10)
        text = FieldSchema(
            name="text",
            dtype=DataType.VARCHAR,
            max_length=6000)
        text_vector = FieldSchema(
            name="text_vector",
            dtype=DataType.FLOAT_VECTOR,
            dim=384)
        return CollectionSchema(
            fields=[pdf_id, source, page_number, text, text_vector],
            description="PDF Documents",
            enable_dynamic_field=True)

    def get_index_params(self, client):
        index_params = client.prepare_index_params()
        index_params.add_index(field_name="source", index_type="AUTOINDEX")
        index_params.add_index(field_name="text_vector", index_type="AUTOINDEX", metric_type="L2")
        return index_params

    def new_data(self, document: Document):
        return {
            "id": IdHelper().new_id(),
            "source": document.metadata.get("source"),
            "page": str(document.metadata.get("page")),
            "text": document.page_content,
            "text_vector": self.embedding_helper.embed(document.page_content)
        }

class MilvusDbRepository():
    def __init__(self, collection_name):
        self.collection_name = collection_name
        self.client = MilvusClient(uri="http://localhost:19530", token="root:Milvus")
        self.get_or_create_collection()
   
    def get_collection_helper(self):
        if self.collection_name == "pdfs":
            return PdfCollectionHelper()

    def delete_collection(self):
        if self.client.has_collection(self.collection_name):
            print(f"Deleting collection {self.collection_name}")
            self.client.drop_collection(collection_name = self.collection_name)

    def get_or_create_collection(self):
        if not self.client.has_collection(self.collection_name):
            print(f"Creating collection {self.collection_name}")
            collection_helper = self.get_collection_helper()      
            self.client.create_collection(
                collection_name = self.collection_name,
                schema = collection_helper.get_schema(),
                index_params = collection_helper.get_index_params(self.client))

    def create(self, documents_list: list[Document]) -> None:
        collection_helper = self.get_collection_helper()
        data = [collection_helper.new_data(d) for d in documents_list]
        print("data to add ", data)
        return self.client.insert(
            collection_name=self.collection_name,
            data=data)
        
    def get_data_by_source(self, source: str):
        return self.client.query(
            collection_name=self.collection_name,
            filter=f"source == '{source}'")

    def has_data_for_source(self, source: str):
        data_by_source = self.get_data_by_source(source)
        return len(data_by_source) > 0
    
    def delete_data_by_source(self, source: str):
        # Delete entities by a filter expression
        return self.client.delete(
            collection_name=self.collection_name,
            filter=f"source == '{source}'")

    def find(self, search_query: str):
        search_embedding = EmbeddingHelper().embed(search_query)
        search_result = self.client.search(collection_name=self.collection_name,
            data=[search_embedding],
            output_fields=["text"])
        print('Search result ', search_result)
        if len(search_result) > 0:
            docs = [x.get('entity').get('text') for x in search_result[0]]
            return ' '.join(docs)


In [8]:
# constants
ROOT_DIR = "/Users/pujanmaharjan/pdfs"

file_helper = FileHelper()

files = []
root_path = os.path.join(ROOT_DIR, "agile")
print('root path ', root_path)
file_helper.populate_file_paths(root_path, files)
files

root path  /Users/pujanmaharjan/pdfs/agile


['/Users/pujanmaharjan/pdfs/agile/Kameron H. Clean Code. An Agile Guide to Software Craft 2023.pdf',
 '/Users/pujanmaharjan/pdfs/agile/akka/Abraham F. Akka in Action (MEAP v13) 2ed 2023.pdf']

In [9]:
# # Experiment pdf loaders
# pdf_loader_results = []
# pdf_loaders = [PyPDFLoader, PyMuPDFLoader, PyPDFium2Loader]
# # pdf_loaders = [PyPDFium2Loader]
# time_helper = TimeHelper()
# for pdf_loader in pdf_loaders:
#     start_time = time_helper.get_utc_now()
#     pages = file_helper.read_documents(files, pdf_loader, n_jobs=5)
#     pdf_loader_results.append({"loader": pdf_loader.__name__,
#                                "elapsed": time_helper.get_elapsed_seconds(start_time),
#                                "pages_count": len(pages)})
    
# pdf_loader_results_df = pd.DataFrame(pdf_loader_results)
# pdf_loader_results_df.sort_values(by="elapsed")

# # The result shows PyMuPDFLoader is fastest

In [10]:
pages = file_helper.read_documents(files, PyMuPDFLoader, n_jobs=5)

In [11]:

@log_execution_time
def add_to_db(db_repository: VectorDbRepository, pages, batch_size):
    sources = list(set([p.metadata['source'] for p in pages]))
    for source in sources:
        if db_repository.has_data_for_source(source):
            print("previous data found so delete them ", source)
            db_repository.delete_data_by_source(source)

    iter_helper = IterHelper()
    for page_batch in tqdm(iter_helper.batched(pages, batch_size), total=len(pages)/batch_size):
        db_repository.create(page_batch)


In [12]:
db_repository = VectorDbRepository("pdfs", MilvusDbRepository)

In [13]:
db_repository.repository.client.get_load_state('pdfs')

{'state': <LoadState: Loaded>}

In [14]:
db_repository = VectorDbRepository("pdfs", MilvusDbRepository)
add_to_db(db_repository, pages[0:4], 3)

Execution Start time 2024-07-29 08:31:03.609423+00:00
previous data found so delete them  /Users/pujanmaharjan/pdfs/agile/Kameron H. Clean Code. An Agile Guide to Software Craft 2023.pdf


  from tqdm.autonotebook import tqdm, trange


Execution Start time 2024-07-29 08:31:10.958862+00:00
Starting embedding


 75%|███████▌  | 1/1.3333333333333333 [00:07<00:02,  7.48s/it]

Execution End time 2024-07-29 08:31:11.355007+00:00. Elapsed total seconds 0.396136
Execution Start time 2024-07-29 08:31:11.355260+00:00
Starting embedding
Execution End time 2024-07-29 08:31:11.415499+00:00. Elapsed total seconds 0.060232
Execution Start time 2024-07-29 08:31:11.415558+00:00
Starting embedding
Execution End time 2024-07-29 08:31:11.475509+00:00. Elapsed total seconds 0.059943
data to add  [{'id': 'aebae495-f777-4dd4-b90c-8dd565a2309a', 'source': '/Users/pujanmaharjan/pdfs/agile/Kameron H. Clean Code. An Agile Guide to Software Craft 2023.pdf', 'page': '0', 'text': 'Clean Code: An Agile Guide to Software Craft\n\xa0\nKameron Hussain and Frahaan Hussain\n\xa0\nPublished by Sonar Publishing, 2023.', 'text_vector': [-0.047733038663864136, -0.02504950761795044, -0.001811690628528595, -0.03612646460533142, 0.038309499621391296, -0.016000768169760704, -0.01492363028228283, 0.015392203815281391, -0.011930588632822037, 0.01908709481358528, 0.009967081248760223, 0.004012013319

2it [00:11,  5.63s/it]                                        

Execution Start time 2024-07-29 08:31:15.179922+00:00
Starting embedding
Execution End time 2024-07-29 08:31:15.251580+00:00. Elapsed total seconds 0.071652
data to add  [{'id': 'e15174af-4134-4c60-8585-aadc027bd5f6', 'source': '/Users/pujanmaharjan/pdfs/agile/Kameron H. Clean Code. An Agile Guide to Software Craft 2023.pdf', 'page': '3', 'text': 'Chapter 10: Classes\n\xa0\nChapter 13: Concurrency\n\xa0\nChapter 15: JUnit Internals\n\xa0\nChapter 19: Appendix B: Decimal I/O\n\xa0\nChapter 20: Appendix C: How to Transform Employee', 'text_vector': [-0.10066168010234833, -0.045353103429079056, 0.027762454003095627, -0.0646040216088295, -0.01319350115954876, 0.0023368492256850004, 0.04312436655163765, 0.024412378668785095, -0.01708403415977955, -0.020047901198267937, 0.014584182761609554, -0.040169451385736465, 0.003421908477321267, 0.00972665473818779, 0.032264918088912964, 0.01330861821770668, 0.009933099150657654, 0.051750537008047104, -0.016256136819720268, 0.007255255710333586, 0.046




In [15]:
db_repository_c = VectorDbRepository("pdfs", ChromaDbRepository)
add_to_db(db_repository_c, pages[0:4], 3)

Execution Start time 2024-07-29 08:31:15.461804+00:00
previous data found so delete them  /Users/pujanmaharjan/pdfs/agile/Kameron H. Clean Code. An Agile Guide to Software Craft 2023.pdf


 75%|███████▌  | 1/1.3333333333333333 [00:03<00:01,  3.49s/it]

Execution Start time 2024-07-29 08:31:19.337306+00:00
Starting embedding
Execution End time 2024-07-29 08:31:19.370417+00:00. Elapsed total seconds 0.033102
Execution Start time 2024-07-29 08:31:19.370498+00:00
Starting embedding
Execution End time 2024-07-29 08:31:19.387873+00:00. Elapsed total seconds 0.017369
Execution Start time 2024-07-29 08:31:19.387897+00:00
Starting embedding
Execution End time 2024-07-29 08:31:19.424267+00:00. Elapsed total seconds 0.036362


2it [00:06,  3.33s/it]                                        

Execution Start time 2024-07-29 08:31:22.543322+00:00
Starting embedding
Execution End time 2024-07-29 08:31:22.575790+00:00. Elapsed total seconds 0.032461
Execution End time 2024-07-29 08:31:22.592035+00:00. Elapsed total seconds 7.130228





In [16]:
chromadb_repo = ChromaDbRepository("pdfs")

In [17]:
chromadb_repo.find("clean code")

Number of requested results 10 is greater than number of elements in index 4, updating n_results = 4


'Table of Contents\n\xa0\nTitle Page\n\xa0\nCopyright Page\n\xa0\nClean Code: An Agile Guide to Software Craft\n\xa0\nChapter 1: Clean Code\n\xa0\nChapter 2: Meaningful Names\n\xa0\nChapter 3: Functions\n\xa0\nChapter 4: Comments\n\xa0\nChapter 5: Formatting\n\xa0\nChapter 6: Objects and Data Structures\n\xa0\nChapter 7: Error Handling\n\xa0\nChapter 8: Boundaries\n\xa0\nChapter 9: Unit Tests While every precaution has been taken in the preparation of this book, the\npublisher assumes no responsibility for errors or omissions, or for\ndamages resulting from the use of the information contained herein.\n\xa0\nCLEAN CODE: AN AGILE GUIDE TO SOFTWARE CRAFT\n\xa0\nFirst edition. October 15, 2023.\n\xa0\nCopyright © 2023 Kameron Hussain and Frahaan Hussain.\n\xa0\nWritten by Kameron Hussain and Frahaan Hussain. Clean Code: An Agile Guide to Software Craft\n\xa0\nKameron Hussain and Frahaan Hussain\n\xa0\nPublished by Sonar Publishing, 2023. Chapter 10: Classes\n\xa0\nChapter 13: Concurrency\

In [18]:
milvus_repo = MilvusDbRepository("pdfs")


In [19]:
milvus_repo.find("clean code")

Execution Start time 2024-07-29 08:31:28.858922+00:00
Starting embedding
Execution End time 2024-07-29 08:31:28.947421+00:00. Elapsed total seconds 0.088489
Search result  data: ["[{'id': '5882477b-4513-49e8-9b34-5df7f5c74020', 'distance': 0.2022618055343628, 'entity': {'text': 'While every precaution has been taken in the preparation of this book, the\\npublisher assumes no responsibility for errors or omissions, or for\\ndamages resulting from the use of the information contained herein.\\n\\xa0\\nCLEAN CODE: AN AGILE GUIDE TO SOFTWARE CRAFT\\n\\xa0\\nFirst edition. October 15, 2023.\\n\\xa0\\nCopyright © 2023 Kameron Hussain and Frahaan Hussain.\\n\\xa0\\nWritten by Kameron Hussain and Frahaan Hussain.'}}, {'id': '8f52153b-a3a1-4b86-9379-7e4c4ea68f5e', 'distance': 0.20561350882053375, 'entity': {'text': 'Table of Contents\\n\\xa0\\nTitle Page\\n\\xa0\\nCopyright Page\\n\\xa0\\nClean Code: An Agile Guide to Software Craft\\n\\xa0\\nChapter 1: Clean Code\\n\\xa0\\nChapter 2: Meaningfu

'While every precaution has been taken in the preparation of this book, the\npublisher assumes no responsibility for errors or omissions, or for\ndamages resulting from the use of the information contained herein.\n\xa0\nCLEAN CODE: AN AGILE GUIDE TO SOFTWARE CRAFT\n\xa0\nFirst edition. October 15, 2023.\n\xa0\nCopyright © 2023 Kameron Hussain and Frahaan Hussain.\n\xa0\nWritten by Kameron Hussain and Frahaan Hussain. Table of Contents\n\xa0\nTitle Page\n\xa0\nCopyright Page\n\xa0\nClean Code: An Agile Guide to Software Craft\n\xa0\nChapter 1: Clean Code\n\xa0\nChapter 2: Meaningful Names\n\xa0\nChapter 3: Functions\n\xa0\nChapter 4: Comments\n\xa0\nChapter 5: Formatting\n\xa0\nChapter 6: Objects and Data Structures\n\xa0\nChapter 7: Error Handling\n\xa0\nChapter 8: Boundaries\n\xa0\nChapter 9: Unit Tests Clean Code: An Agile Guide to Software Craft\n\xa0\nKameron Hussain and Frahaan Hussain\n\xa0\nPublished by Sonar Publishing, 2023. Chapter 10: Classes\n\xa0\nChapter 13: Concurrency\