In [1]:
import os
from time import time
from typing import Any, Dict, List, Tuple

from dotenv import load_dotenv
from huggingface_hub import snapshot_download
from langchain.text_splitter import RecursiveCharacterTextSplitter
import ray
from sentence_transformers import SentenceTransformer
import torch

import embedding_utils as eu

In [2]:
load_dotenv('minio.env')
MINIO_URL = os.environ['MINIO_URL']
MINIO_ACCESS_KEY = os.environ['MINIO_ACCESS_KEY']
MINIO_SECRET_KEY = os.environ['MINIO_SECRET_KEY']
if os.environ['MINIO_SECURE']=='true': MINIO_SECURE = True 
else: MINIO_SECURE = False 
PGVECTOR_HOST = os.environ['PGVECTOR_HOST']
PGVECTOR_DATABASE = os.environ['PGVECTOR_DATABASE']
PGVECTOR_USER = os.environ['PGVECTOR_USER']
PGVECTOR_PASSWORD = os.environ['PGVECTOR_PASSWORD']
PGVECTOR_PORT = os.environ['PGVECTOR_PORT']

MODELS_BUCKET = 'hf-models'
EMBEDDING_MODEL = 'intfloat/multilingual-e5-small' # Embedding model to use for converting text chunks to vector embeddings.
EMBEDDING_MODEL_REVISION = 'ffdcc22a9a5c973ef0470385cef91e1ecb461d9f'

RAY_BATCH_SIZE = 2  #100
EMBEDDING_BATCH_SIZE = 100
CHUNK_SIZE = 1000                   # Text chunk sizes which will be converted to vector embeddings
CHUNK_OVERLAP = 10
DIMENSION = 384                     # Embeddings size
ACTOR_POOL_SIZE = 1                 # Number of actors for the distributed map_batches function.
BUCKET_NAME = 'custom-corpus'         # Bucket name for batch creation of embeddings.

##### One time task to stage the embedding model in MinIO

In [3]:
#eu.upload_model_to_minio(MODELS_BUCKET, EMBEDDING_MODEL, EMBEDDING_MODEL_REVISION)

##### Callable class for distributed embedding

In [4]:
class Embed:

    def __init__(self):
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.logger = eu.create_logger()
        model_path = eu.download_model_from_minio(MODELS_BUCKET, EMBEDDING_MODEL, EMBEDDING_MODEL_REVISION)
        self.logger.info('Embedding object sucessfully downloaded.')

        self.embedding_model = SentenceTransformer(model_path, device=device)
        self.splitter = RecursiveCharacterTextSplitter(chunk_size=CHUNK_SIZE, chunk_overlap=CHUNK_OVERLAP, length_function=len)
        self.logger.info(f'torch cuda version: {torch.version.cuda}.')
        self.logger.info(f'Device: {device}')
        self.logger.info('Splitter object sucessfully created.')
        self.logger.info('Embedding object sucessfully created.')

    def __call__(self, batch_list: List[str]) -> None:
        document_list = batch_list["item"]
        self.logger.debug(f'type(text): {type(document_list)}, type(text_batch): {type(batch_list)}.')
        self.logger.debug(f'Batch list: {batch_list}')

        timings = []
        documents = []
        for document_data in document_list:
            start_time = time()
            bucket_name = document_data[0]
            object_name = document_data[1]
            self.logger.info(f'Embedding started for: {bucket_name} - {object_name}.')
            temp_file = eu.get_document_from_minio(bucket_name, object_name)
            file = open(temp_file, 'r')
            data = file.read()

            chunks = self.splitter.split_text(data)
            embeddings = self.embedding_model.encode(chunks, batch_size=EMBEDDING_BATCH_SIZE).tolist()
            eu.save_embeddings_to_vectordb(chunks, embeddings)
            
            self.logger.info(f'Embeddings complete for: {bucket_name} - {object_name}.')
            self.logger.debug(f'len(chunks): {len(chunks)} len(emb): {len(embeddings)}.')
            total_time_sec = time() - start_time
            documents.append(object_name)
            timings.append(total_time_sec)

        self.logger.info('Embeddings sucessfully created for batch.')

        return {'timings': timings, 'documents': documents}

In [5]:
ray.init(
    #address="ray://ray-cluster-kuberay-head-svc:10001",
    runtime_env={
        "env_vars": {
            "MINIO_URL": MINIO_URL,
            "MINIO_ACCESS_KEY": MINIO_ACCESS_KEY,
            "MINIO_SECRET_KEY": MINIO_SECRET_KEY,
            "MINIO_SECURE": str(MINIO_SECURE),
            "PGVECTOR_HOST": os.environ['PGVECTOR_HOST'],
            "PGVECTOR_DATABASE": os.environ['PGVECTOR_DATABASE'],
            "PGVECTOR_USER": os.environ['PGVECTOR_USER'],
            "PGVECTOR_PASSWORD": os.environ['PGVECTOR_PASSWORD'],
            "PGVECTOR_PORT": os.environ['PGVECTOR_PORT'],
        },
        "pip": [               
            "datasets==2.19.0",
            "huggingface_hub==0.22.2",
            "minio==7.2.7",
            "psycopg2-binary==2.9.9",
            "pyarrow==16.0.0",
            "sentence-transformers==3.0.1",
            "torch==2.3.0",
            "transformers==4.40.1",
        ]
    }
)

2024-07-29 09:48:18,375	INFO worker.py:1781 -- Started a local Ray instance.


0,1
Python version:,3.9.13
Ray version:,2.33.0


In [6]:
# The embedding class expects bucket_name and document_name pairs - so add bucket name to each entry in the list. 
document_list = eu.get_object_list(BUCKET_NAME)
list_for_ray = [[BUCKET_NAME, doc] for doc in document_list]

ray_ds = ray.data.from_items(list_for_ray)
print(type(ray_ds))
print(ray_ds.schema)

<class 'ray.data.dataset.MaterializedDataset'>
<bound method Dataset.schema of MaterializedDataset(num_blocks=4, num_rows=4, schema={item: list<item: string>})>


In [7]:
ds_embed = ray_ds.map_batches(
    Embed,
    concurrency=ACTOR_POOL_SIZE,
    batch_size=RAY_BATCH_SIZE,  # Large batch size to maximize GPU utilization.
    #num_gpus=1,            # 1 GPU for each actor.
    num_cpus=1,             # 1 CPU for each actor.
)

In [8]:
@ray.remote
def ray_data_task(ds_embed):
    results = []
    for row in ds_embed.iter_rows():
        documents = row['documents']
        timings = row['timings']
        results.append((documents, timings))
    return results

start_time = time()
results = ray.get(ray_data_task.remote(ds_embed))
#results = ray_data_task(ds_embed)
total_time_sec = time() - start_time

total_distributed = 0
for result in results:
    total_distributed += result[1]

print('Total elapsed time:', total_time_sec)
print('Total distributed:', total_distributed)
results

[36m(ray_data_task pid=89704)[0m Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-07-29_09-48-16_638467_89411/logs/ray-data
[36m(ray_data_task pid=89704)[0m Execution plan of Dataset: InputDataBuffer[Input] -> ActorPoolMapOperator[MapBatches(Embed)]


[36m(_MapWorker pid=89866)[0m /var/folders/_5/jt7lb09d49n9qscq4l2m3sph0000gn/T/hf-models/models--intfloat--multilingual-e5-small/snapshots/ffdcc22a9a5c973ef0470385cef91e1ecb461d9f
[36m(_MapWorker pid=89866)[0m 89866 2024-07-29 09:49:38,592 | INFO | Embedding object sucessfully downloaded.
[36m(_MapWorker pid=89866)[0m 89866 2024-07-29 09:49:40,039 | INFO | torch cuda version: None.
[36m(_MapWorker pid=89866)[0m 89866 2024-07-29 09:49:40,039 | INFO | Device: cpu
[36m(_MapWorker pid=89866)[0m 89866 2024-07-29 09:49:40,039 | INFO | Splitter object sucessfully created.
[36m(_MapWorker pid=89866)[0m 89866 2024-07-29 09:49:40,039 | INFO | Embedding object sucessfully created.
[36m(MapWorker(MapBatches(Embed)) pid=89866)[0m 89866 2024-07-29 09:49:40,057 | DEBUG | type(text): <class 'numpy.ndarray'>, type(text_batch): <class 'dict'>.
[36m(MapWorker(MapBatches(Embed)) pid=89866)[0m 89866 2024-07-29 09:49:40,057 | DEBUG | Batch list: {'item': array([array(['custom-corpus', 'A Tre

(pid=89704) - MapBatches(Embed) 1: 0 bundle [00:00, ? bundle/s]

(pid=89704) Running 0: 0 bundle [00:00, ? bundle/s]

Total elapsed time: 247.8459289073944
Total distributed: 206.50060486793518


[('A Treatise of Human Nature.txt', 108.67823910713196),
 ('The Art of War.txt', 33.333197832107544),
 ('The Strange Case of Dr Jekyll and Mr Hyde.txt', 14.420641899108887),
 ('Twenty Thousand Leagues under the Sea.txt', 50.06852602958679)]

In [9]:
ray.shutdown()