# Base part

In [1]:
import datetime
import random

import numpy as np
from pydantic import BaseModel
import abc
import logging

logger = logging.getLogger(__name__)


class DotModel(BaseModel):
    """A single point exchanged with a vector store: an identifier plus its vector."""

    # Identifier of the point; the clients in this file use it as the record id.
    uid: str
    # Vector (embedding) values of the point.
    vector: list[float]


class VectorClient(abc.ABC):
    """Interface shared by every vector store client used in the benchmark.

    Subclasses provide the three storage operations; the base class only
    contributes the timing helpers wrapped around a bulk upload.
    """

    @abc.abstractmethod
    def push_dots(self, dots: list[DotModel]):
        """Store a batch of points in the backend."""

    @abc.abstractmethod
    def query_dot(self, vector: list[float], limit: int = 15) -> list[DotModel]:
        """Return at most ``limit`` stored points closest to ``vector``."""

    @abc.abstractmethod
    def create_collection(self):
        """Create the backend collection that will hold the points."""

    def start_pushing(self):
        """Record the moment a bulk upload starts."""
        started_at = datetime.datetime.now()
        self._start_pushing_time = started_at

    def end_pushing(self):
        """Log the time elapsed since ``start_pushing`` was called."""
        finished_at = datetime.datetime.now()
        elapsed = finished_at - self._start_pushing_time
        client_name = type(self).__name__
        logger.info(f'end pushing {client_name} in {elapsed}')

In [None]:
from create_embeddings.schemas.embedding import ProductEmbedding, ModelEnum
from parsers.runnures.utils.csv import CsvReader

from create_embeddings.schemas.embedding_model import EmbeddingModelCsvFile, embedding_size_map
from create_embeddings.utils import get_csv_files_info

# Number of points accumulated before a batch is handed to client.push_dots().
BATCH_SIZE = 100
# Embedding model whose CSV files are used; files of other models are skipped.
USED_MODEL = ModelEnum.MULTILINGUAL_E5_LARGE_INSTRUCT
# Vector length looked up for USED_MODEL; the clients pass it to the backends as the vector size.
VECTOR_LEN = embedding_size_map[USED_MODEL]


def populate_embeddings(client: VectorClient):
    """Create the collection and upload every matching embedding in batches.

    Only CSV files that belong to ``USED_MODEL`` and whose path contains
    ``"OBI"`` are uploaded. Rows are sent to ``client.push_dots`` in batches of
    ``BATCH_SIZE``; the upload is bracketed by ``client.start_pushing()`` and
    ``client.end_pushing()`` so the client can time it.

    Args:
        client: Vector store client that receives the points.
    """
    csv_files_info: list[EmbeddingModelCsvFile] = get_csv_files_info()

    client.create_collection()
    logger.info(f'collection created {type(client).__name__}')

    i = 0  # running count of rows read across all files
    client.start_pushing()
    for csv_file_info in csv_files_info:
        if csv_file_info.embedding_model_name != USED_MODEL or "OBI" not in csv_file_info.file_path:
            continue

        logger.info(f'start new file {csv_file_info.file_path.split("/")[-1]}')

        batch: list[DotModel] = []
        csv_reader: CsvReader[ProductEmbedding] = CsvReader(csv_file_info.file_path, ProductEmbedding)
        for uid_vector in csv_reader:
            i += 1
            # DotModel declares the field as `uid`; passing `id=` fails validation.
            batch.append(DotModel(uid=uid_vector.uid, vector=uid_vector.embedding))
            if len(batch) == BATCH_SIZE:
                logger.info(f'uploading {i} points to {type(client).__name__}')
                client.push_dots(batch)
                batch = []

        # Flush the remainder of the file, but never send an empty batch.
        if batch:
            logger.info(f'uploading {i} points to {type(client).__name__}')
            client.push_dots(batch)
    client.end_pushing()


In [None]:
from typing import Generator
import time

# Length of one query load test in seconds (added to time.time() to get the deadline).
TEST_DURATION = 60


def call_get_dot_with_rps(client: VectorClient, rps: int):
    """Query ``client`` at a target rate for ``TEST_DURATION`` seconds.

    The loop spins on the clock and issues one ``query_dot`` call each time the
    next scheduled slot is reached. Slots are spaced ``1 / rps`` seconds apart
    and are scheduled relative to the previous slot, so if a query takes longer
    than one interval the following calls are issued back-to-back until the
    schedule is caught up.

    Args:
        client: Vector store client under test.
        rps: Target number of requests per second; must be positive.

    Raises:
        ValueError: If ``rps`` is not positive.
    """
    if rps <= 0:
        raise ValueError(f"rps must be positive, got {rps}")

    now = time.time()
    end_time = now + TEST_DURATION
    sleep_interval = 1 / rps
    next_call = now + sleep_interval
    point_generator = next_point()

    while now < end_time:
        now = time.time()
        if now >= next_call:
            # The VectorClient interface exposes `query_dot`; there is no `get_dot`.
            client.query_dot(next(point_generator))
            next_call = next_call + sleep_interval


def call_get_dot_with_max_single_core_rps(client: VectorClient):
    """Query ``client`` back-to-back from a single thread for ``TEST_DURATION`` seconds.

    Args:
        client: Vector store client under test.
    """
    end_time = time.time() + TEST_DURATION
    point_generator = next_point()

    # Re-read the clock right before every call so no query starts after the deadline.
    while time.time() < end_time:
        # The VectorClient interface exposes `query_dot`; there is no `get_dot`.
        client.query_dot(next(point_generator))


def next_point(include_random=True) -> Generator[list[float], None, None]:
    """Yield an endless stream of query vectors.

    Vectors are read from the CSV files that belong to ``USED_MODEL`` and whose
    path contains ``"STROYDVOR"``; once all files are exhausted they are read
    again from the start.

    Args:
        include_random: When true, each stored row is replaced with a random
            vector of length ``VECTOR_LEN`` with probability 1/2.

    Yields:
        A query vector as a list of floats.

    Raises:
        ValueError: If a full pass over the files produces no vector at all
            (otherwise the generator would spin forever without yielding).
    """
    csv_files_info: list[EmbeddingModelCsvFile] = get_csv_files_info()
    while True:
        yielded_any = False
        for csv_file_info in csv_files_info:
            if csv_file_info.embedding_model_name != USED_MODEL or "STROYDVOR" not in csv_file_info.file_path:
                continue

            csv_reader: CsvReader[ProductEmbedding] = CsvReader(csv_file_info.file_path, ProductEmbedding)
            for uid_vector in csv_reader:
                yielded_any = True
                # One coin flip per row, then move on to the next row. The previous
                # inner `while True` never left the first row of the first file.
                if include_random and random.randint(0, 1):
                    yield get_random_point(VECTOR_LEN)
                else:
                    yield uid_vector.embedding

        if not yielded_any:
            raise ValueError("no embeddings found for the configured model and source")


def get_random_point(l: int) -> list[float]:
    """Return ``l`` pseudo-random floats, each produced by ``random.random()``."""
    point: list[float] = []
    for _ in range(l):
        point.append(random.random())
    return point

# Clients

## Qdrant

```
docker run -d -p 8900:6333 \
    -v ~/volumes/benchmark_vector_db/qdrant:/qdrant/storage \
    qdrant/qdrant
```

In [None]:
from qdrant_client.http import models as qdrant_models
from qdrant_client import QdrantClient


class QdrantVectorClient(VectorClient):
    """``VectorClient`` backed by a Qdrant server listening on localhost:8900."""

    def __init__(self):
        QDRANT_CONNECTION_STRING = "http://localhost:8900"
        self.client = QdrantClient(url=QDRANT_CONNECTION_STRING)
        self.collection_name = "benchmarking_vector_db"

    def push_dots(self, dots: list[DotModel]):
        """Upload ``dots`` to the collection; an empty batch is a no-op."""
        if not dots:
            return

        points = [
            qdrant_models.PointStruct(id=dot.uid, vector=dot.vector)
            for dot in dots
        ]
        self.client.upload_points(
            collection_name=self.collection_name,
            points=points,
        )

    def query_dot(self, vector: list[float], limit: int = 15) -> list[DotModel]:
        """Return up to ``limit`` nearest points to ``vector``, including their vectors."""
        hits: list[qdrant_models.ScoredPoint] = self.client.query_points(
            collection_name=self.collection_name,
            query=vector,
            limit=limit,
            # Vectors are not returned unless requested; without this `hit.vector`
            # is None and DotModel validation fails.
            with_vectors=True,
        ).points

        return [DotModel(uid=str(hit.id), vector=hit.vector) for hit in hits]

    def create_collection(self):
        """Create the collection with cosine distance and on-disk HNSW storage.

        ``indexing_threshold=0`` is set for the bulk load; ``end_pushing``
        raises it to 10_000 once the upload is finished.
        """
        self.client.create_collection(
            collection_name=self.collection_name,
            vectors_config=qdrant_models.VectorParams(
                size=VECTOR_LEN, distance=qdrant_models.Distance.COSINE, on_disk=True
            ),
            hnsw_config=qdrant_models.HnswConfigDiff(m=16, ef_construct=128, on_disk=True),
            optimizers_config=qdrant_models.OptimizersConfigDiff(indexing_threshold=0),
        )

    def end_pushing(self):
        """Set the indexing threshold to 10_000, then log the upload duration."""
        self.client.update_collection(
            collection_name=self.collection_name,
            # `optimizers_config` is the documented parameter name and matches
            # the one used in create_collection().
            optimizers_config=qdrant_models.OptimizersConfigDiff(indexing_threshold=10_000),
        )

        # Must run inside the method: at class-body level `super()` has no
        # arguments to bind to and raises while the class is being created.
        super().end_pushing()

## Chroma

```
docker run -d --rm --name chromadb -p 8901:8000 -v ~/volumes/benchmark_vector_db/chroma:/chroma/chroma -e IS_PERSISTENT=TRUE chromadb/chroma:0.6.3
```

In [None]:
from chromadb.api.types import convert_np_embeddings_to_list
import chromadb


class ChromaVectorClient(VectorClient):
    """``VectorClient`` backed by a Chroma server listening on localhost:8901."""

    def __init__(self):
        self.client = chromadb.HttpClient(port=8901)
        self.collection_name = "benchmarking_vector_db"
        # Populated by create_collection(); None until then.
        self.collection: chromadb.Collection = None

    def _require_collection(self):
        """Return the collection handle or fail with a clear message."""
        if self.collection is None:
            raise RuntimeError("create_collection() must be called before using the collection")
        return self.collection

    def push_dots(self, dots: list[DotModel]):
        """Add ``dots`` to the collection; an empty batch is a no-op."""
        if not dots:
            return

        self._require_collection().add(
            ids=[dot.uid for dot in dots],
            embeddings=[dot.vector for dot in dots],
        )

    def query_dot(self, vector: list[float], limit: int = 15) -> list[DotModel]:
        """Return up to ``limit`` nearest points to ``vector``, including their vectors."""
        response: chromadb.QueryResult = self._require_collection().query(
            query_embeddings=[vector],
            n_results=limit,
            # Embeddings are not part of the default `include` set.
            include=["embeddings"],
        )

        # Query results are grouped per query vector; exactly one was sent.
        found_ids = response["ids"][0]
        found_vectors = response["embeddings"][0]

        dots: list[DotModel] = []
        for uid, found_vector in zip(found_ids, found_vectors):
            # Embeddings may come back as numpy arrays; DotModel wants plain floats.
            if hasattr(found_vector, "tolist"):
                found_vector = found_vector.tolist()
            dots.append(DotModel(uid=uid, vector=list(found_vector)))
        return dots

    def create_collection(self):
        """Create the collection with cosine space and HNSW M=16, construction_ef=128."""
        self.collection = self.client.create_collection(
            self.collection_name,
            metadata={
                "hnsw:space": "cosine",
                "hnsw:construction_ef": 128,
                "hnsw:M": 16,
            }
        )


## Elasticsearch

```
docker run --name elastic -p 8902:9200 -d -m 1GB -e ELASTIC_PASSWORD=password docker.elastic.co/elasticsearch/elasticsearch:8.17.3
```

In [None]:
from elasticsearch.helpers import bulk
from elasticsearch import Elasticsearch


class ElasticVectorClient(VectorClient):
    """``VectorClient`` backed by an Elasticsearch node listening on localhost:8902."""

    def __init__(self):
        self.client = Elasticsearch(
            "https://localhost:8902",
            basic_auth=("elastic", "password"),
            verify_certs=False,
        )
        self.collection_name = "benchmarking_vector_db"

    def push_dots(self, dots: list[DotModel]):
        """Bulk-index ``dots`` into the index; an empty batch is a no-op."""
        if not dots:
            return

        actions = [
            {
                "_op_type": "index",
                "_index": self.collection_name,
                "_id": dot.uid,
                "_source": {
                    "embedding": dot.vector
                }
            }
            for dot in dots
        ]
        bulk(self.client, actions)

    def query_dot(self, vector: list[float], limit: int = 15) -> list[DotModel]:
        """Return up to ``limit`` nearest points to ``vector`` using kNN search."""
        # Elasticsearch kNN search takes `field` / `query_vector` / `k`; the
        # `{"embedding": {"vector": ..., "k": ...}}` body is not Elasticsearch syntax.
        knn = {
            "field": "embedding",
            "query_vector": vector,
            "k": limit,
            # Candidates examined per shard: must be >= k and at most 10_000.
            "num_candidates": min(max(limit * 10, 100), 10_000),
        }

        response = self.client.search(index=self.collection_name, knn=knn, size=limit)
        return [
            DotModel(uid=hit['_id'], vector=hit['_source']['embedding'])
            for hit in response['hits']['hits']
        ]

    def create_collection(self):
        """Create the index with a cosine ``dense_vector`` field indexed by HNSW."""
        self.client.indices.create(
            # Must be the same index that push_dots/query_dot use.
            index=self.collection_name,
            mappings={
                "properties": {
                    "embedding": {
                        "type": "dense_vector",
                        "dims": VECTOR_LEN,
                        "index": True,
                        "similarity": "cosine",
                        "element_type": "float",
                        "index_options": {
                            "type": "hnsw",
                            "ef_construction": 128,
                            "m": 16,
                        }
                    }
                }
            }
        )


## Postgres

```
docker run --name pgvector -d -p 8903:5432 -e POSTGRES_USER=postgres -e POSTGRES_PASSWORD=password -e POSTGRES_DB=benchmarking_vector_db -v ~/volumes/benchmark_vector_db/pgvector:/var/lib/postgresql/data pgvector/pgvector:pg17
```

In [None]:
from uuid import uuid4
from sqlalchemy import create_engine, text
from elasticsearch.helpers import bulk
from elasticsearch import Elasticsearch


class PgvectorVectorClient(VectorClient):
    """``VectorClient`` backed by PostgreSQL with the pgvector extension (localhost:8903).

    Points are stored in a table named after ``self.collection_name`` with a
    text ``uid`` primary key and a ``vector(VECTOR_LEN)`` ``embedding`` column.
    """

    def __init__(self):
        self.engine = create_engine("postgresql+psycopg2://postgres:password@localhost:8903/benchmarking_vector_db")
        # Used as the table name. It is a fixed identifier set here, never user
        # input, so interpolating it into the SQL text below is safe.
        self.collection_name = "benchmarking_vector_db"

    @staticmethod
    def _to_pg_vector(vector: list[float]) -> str:
        """Render ``vector`` in pgvector's text form, e.g. ``[0.1,0.2]``."""
        return "[" + ",".join(repr(float(value)) for value in vector) + "]"

    @staticmethod
    def _from_pg_vector(raw: str) -> list[float]:
        """Parse pgvector's text form (``[0.1,0.2]``) back into a list of floats."""
        body = raw.strip()[1:-1]
        return [float(value) for value in body.split(",")] if body else []

    def push_dots(self, dots: list[DotModel]):
        """Insert ``dots`` in one transaction; an empty batch is a no-op."""
        if not dots:
            return

        rows = [
            {"uid": dot.uid, "embedding": self._to_pg_vector(dot.vector)}
            for dot in dots
        ]
        statement = text(
            f"INSERT INTO {self.collection_name} (uid, embedding) "
            "VALUES (:uid, CAST(:embedding AS vector))"
        )
        # engine.begin() commits on success and rolls back on error.
        with self.engine.begin() as connection:
            connection.execute(statement, rows)

    def query_dot(self, vector: list[float], limit: int = 15) -> list[DotModel]:
        """Return up to ``limit`` nearest points to ``vector`` by cosine distance."""
        statement = text(
            # `<=>` is pgvector's cosine-distance operator.
            f"SELECT uid, CAST(embedding AS text) AS embedding_text "
            f"FROM {self.collection_name} "
            "ORDER BY embedding <=> CAST(:query AS vector) "
            "LIMIT :limit"
        )
        with self.engine.connect() as connection:
            rows = connection.execute(
                statement,
                {"query": self._to_pg_vector(vector), "limit": limit},
            ).fetchall()

        return [DotModel(uid=row[0], vector=self._from_pg_vector(row[1])) for row in rows]

    def create_collection(self):
        """Enable pgvector, create the table and its cosine HNSW index (m=16, ef_construction=128)."""
        table = self.collection_name
        # Type modifiers cannot be bind parameters; VECTOR_LEN is a module constant.
        dims = int(VECTOR_LEN)
        with self.engine.begin() as connection:
            connection.execute(text("CREATE EXTENSION IF NOT EXISTS vector;"))
            connection.execute(
                text(
                    f"CREATE TABLE {table} ("
                    "uid text PRIMARY KEY, "
                    f"embedding vector({dims})"
                    ");"
                )
            )
            connection.execute(
                text(
                    f"CREATE INDEX {table}_embedding_hnsw_idx ON {table} "
                    "USING hnsw (embedding vector_cosine_ops) "
                    "WITH (m = 16, ef_construction = 128);"
                )
            )


# Run Tests