In [2]:
import dask.dataframe as dd
from sentence_transformers import SentenceTransformer
from pymongo import MongoClient
from dask.distributed import Client
from dask import delayed

In [3]:
def encode_chunk(chunk, VDB_model):
    return VDB_model.encode(chunk)

In [4]:
client = Client()  # Start a Dask client

df = dd.read_csv('DATASET/Captioned/*.csv')
captions = df["captioning"].compute().tolist()

ADDRESS = dict()  # key: caption, value: file_name
for _, row in df.iterrows():
    ADDRESS[row['captioning']] = row['file_name']

VDB_model = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2')

chunk_size = 200 
chunks = [captions[i:i + chunk_size] for i in range(0, len(captions), chunk_size)]


In [5]:
# Process each chunk independently and concatenate the results
delayed_tasks = [delayed(encode_chunk)(chunk, VDB_model) for chunk in chunks]
results = client.compute(delayed_tasks)  # Trigger computation in parallel

This may cause some slowdown.
Consider scattering data ahead of time and using futures.


  (['[0:00-10:00]\nThis is the live performance of a ...  Normalize()
))
Consider scattering large objects ahead of time
with client.scatter to reduce scheduler burden and 
keep data on workers

    future = client.submit(func, big_data)    # bad

    big_future = client.scatter(big_data)     # good
    future = client.submit(func, big_future)  # good


In [6]:
passage_embedding = []
for result in results:
    passage_embedding.extend(result.result())

In [7]:
client = MongoClient("mongodb://localhost:27017/?directConnection=true")
testdb = client.testdb
captionsData = testdb.captions
for caption in captions:
    captionsData.insert_one({
        "caption": caption,
        "file-name": ADDRESS[caption]
    })
captionsData.create_index("file-name")

'file-name_1'

In [15]:
def gendata(passage_embedding, ADDRESS, captions):
    for vec in range(len(passage_embedding)):
        yield {
            "_op_type": "create",
            "_index": "captions-index",
            "caption-vector": passage_embedding[vec],
            "file-name": ADDRESS[captions[vec]],
            "file-type": "audio"
        }

In [16]:
from elasticsearch import Elasticsearch, helpers
es = Elasticsearch(['https://66d731f461bf42b09d7000cf2ade257b.us-central1.gcp.cloud.es.io'], http_auth=("username", "password"))
helpers.bulk(es , gendata(passage_embedding, ADDRESS, captions))

  es = Elasticsearch(['https://66d731f461bf42b09d7000cf2ade257b.us-central1.gcp.cloud.es.io'], http_auth=("yashaswi", "kafkatest"))


(986, [])