Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
4de6b44
Add support for manticoresearch engine
donhardman Apr 9, 2024
553e67a
Use ef in parameters for manticoresearch instead of cutoff that is wr…
donhardman Apr 21, 2024
2ef6808
Update manticore to the latest version
donhardman May 9, 2024
d1dbce3
Fix issues with limited output when K is high
donhardman May 9, 2024
fd61682
Patch tqdm to start id from 1 instead of 0 cuz some engines uses 0 fo…
donhardman May 9, 2024
bd30f68
Adjust json reader to start id from 1 instead of 0 to solve auto incr…
donhardman May 9, 2024
fda22c6
Revert "Patch tqdm to start id from 1 instead of 0 cuz some engines u…
donhardman May 9, 2024
90b3e31
Fix random-100 database neighbours
donhardman May 9, 2024
988b4e8
Change approach how we altering ids
donhardman May 9, 2024
67ede06
Revert "Fix random-100 database neighbours"
donhardman May 9, 2024
7cc72be
Add timeouts to the connection
donhardman May 9, 2024
41dc943
Try to use keep alive connection for session when we benchmarking sea…
donhardman May 9, 2024
2b49697
Fetch only id for manticoresearch client the same way as other client…
donhardman May 10, 2024
b62d217
Update docker image and add quantization configs both for qdrant and …
donhardman Mar 10, 2025
74599c5
Run Optimize for manticore and add config of binary quantization for …
donhardman Mar 10, 2025
996d830
Disable timeout for optimize
donhardman Mar 10, 2025
e84f304
Set timeout in proper way
donhardman Mar 10, 2025
c2057dc
Add missing option word in optimize
donhardman Mar 10, 2025
face9f6
Update qdrant config
donhardman Mar 11, 2025
8479724
Add cutoff=1 to manticore and tune configs, update qdrant version
donhardman Mar 13, 2025
b8c8f77
Remove thread limitation for qdrant
donhardman Mar 16, 2025
27683b8
Add config for elasticsearch quantization
donhardman Apr 18, 2025
9b7ed61
Fix elasticserch config
donhardman Apr 18, 2025
f9c39bd
Set exact version of elasticsearch cuz of incompatibility issue
donhardman Apr 18, 2025
4d30aa9
Fix elasticsearch config
donhardman Apr 18, 2025
c81e110
Exclude parallel key for elastic
donhardman Apr 18, 2025
aa026b2
Revert "Fix elasticsearch config"
donhardman Apr 18, 2025
ed35127
Reapply "Fix elasticsearch config"
donhardman Apr 18, 2025
c259610
Update config of elastic again
donhardman Apr 18, 2025
2b53b01
Revert "Exclude parallel key for elastic"
donhardman Apr 18, 2025
74a8ac9
fix
donhardman Apr 18, 2025
16adbd5
Fix elasticsearch config
donhardman Apr 21, 2025
66d0839
Try to fix manticore search index mappings
donhardman Apr 23, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@
__pycache__
*.pyc
NOTES.md

.DS_Store
results/*
tools/custom/data.json
1 change: 0 additions & 1 deletion engine/base_client/search.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ def _search_one(cls, query, top: Optional[int] = None):
if query.expected_result:
ids = set(x[0] for x in search_res)
precision = len(ids.intersection(query.expected_result[:top])) / top

return precision, end - start

def search_all(
Expand Down
9 changes: 9 additions & 0 deletions engine/clients/client_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@
WeaviateUploader,
)

from engine.clients.manticoresearch import (
ManticoreSearchConfigurator,
ManticoreSearchSearcher,
ManticoreSearchUploader,
)

ENGINE_CONFIGURATORS = {
"qdrant": QdrantConfigurator,
"weaviate": WeaviateConfigurator,
Expand All @@ -39,6 +45,7 @@
"opensearch": OpenSearchConfigurator,
"redis": RedisConfigurator,
"pgvector": PgVectorConfigurator,
"manticoresearch": ManticoreSearchConfigurator,
}

ENGINE_UPLOADERS = {
Expand All @@ -49,6 +56,7 @@
"opensearch": OpenSearchUploader,
"redis": RedisUploader,
"pgvector": PgVectorUploader,
"manticoresearch": ManticoreSearchUploader,
}

ENGINE_SEARCHERS = {
Expand All @@ -59,6 +67,7 @@
"opensearch": OpenSearchSearcher,
"redis": RedisSearcher,
"pgvector": PgVectorSearcher,
"manticoresearch": ManticoreSearchSearcher,
}


Expand Down
5 changes: 4 additions & 1 deletion engine/clients/elasticsearch/search.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,10 @@ def search_one(cls, vector, meta_conditions, top) -> List[Tuple[int, float]]:
"field": "vector",
"query_vector": vector,
"k": top,
**{"num_candidates": 100, **cls.search_params},
**{
"num_candidates": 100,
**{k: v for k, v in cls.search_params.items() if k != 'parallel'}
}
}

meta_conditions = cls.parser.parse(meta_conditions)
Expand Down
3 changes: 3 additions & 0 deletions engine/clients/manticoresearch/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from engine.clients.manticoresearch.configure import ManticoreSearchConfigurator
from engine.clients.manticoresearch.search import ManticoreSearchSearcher
from engine.clients.manticoresearch.upload import ManticoreSearchUploader
2 changes: 2 additions & 0 deletions engine/clients/manticoresearch/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# Port of the Manticore Search HTTP endpoint that the benchmark clients send their requests to.
MANTICORESEARCH_PORT = 9308
# Name of the table that the benchmark drops, creates, fills, and queries.
MANTICORESEARCH_TABLE = "bench"
71 changes: 71 additions & 0 deletions engine/clients/manticoresearch/configure.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
from benchmark.dataset import Dataset
from engine.base_client import IncompatibilityError
from engine.base_client.configure import BaseConfigurator
from engine.base_client.distances import Distance
from engine.clients.manticoresearch.config import (
MANTICORESEARCH_PORT,
MANTICORESEARCH_TABLE,
)

import requests
import json
class ManticoreSearchConfigurator(BaseConfigurator):
    """Drop and create the benchmark table on a Manticore Search server.

    All statements are sent as raw SQL to the server's HTTP ``/sql`` endpoint.
    """

    # Benchmark distance -> value used for the ``hnsw_similarity`` column option.
    # Distances without an entry here are reported as incompatible.
    DISTANCE_MAPPING = {
        Distance.L2: "L2",
        Distance.COSINE: "COSINE",
    }

    # Dataset schema type -> Manticore column type. Types without an entry are
    # passed through to the CREATE TABLE statement unchanged.
    INDEX_TYPE_MAPPING = {
        "int": "uint",
        "keyword": "string",
        "text": "text",
        "float": "float",
        "geo": "json",  # Manticore typically handles geo as JSON
    }

    def __init__(self, host, collection_params, connection_params):
        """Store the server host and the collection/connection parameters.

        ``connection_params`` is forwarded as keyword arguments to every
        ``requests.post`` call made by this configurator.
        """
        self.host = host
        self.collection_params = collection_params
        self.connection_params = connection_params

    def _execute_sql(self, query, error_prefix):
        """POST one raw SQL statement; print ``error_prefix`` and the response body on a non-200 status."""
        url = f"http://{self.host}:{MANTICORESEARCH_PORT}/sql?mode=raw"
        # The statement is sent as a fully percent-encoded `query=` form body.
        body = "query=" + requests.utils.quote(query, safe="")
        response = requests.post(url, body, **self.connection_params)
        if response.status_code != 200:
            # Best effort: failures are reported, not raised.
            print(f"{error_prefix}: {response.text}")
        return response

    def clean(self):
        """Drop the benchmark table if it exists."""
        self._execute_sql(
            f"DROP TABLE IF EXISTS `{MANTICORESEARCH_TABLE}`",
            "Error cleaning table",
        )

    def recreate(self, dataset, collection_params):
        """Create the benchmark table with an HNSW vector column plus the dataset's schema fields.

        Args:
            dataset: benchmark dataset; ``dataset.config`` supplies ``distance``,
                ``vector_size`` and ``schema`` (field name -> type name).
            collection_params: engine parameters; the optional ``knn_options``
                mapping is rendered as ``key='value'`` pairs and appended to
                the vector column definition.

        Raises:
            IncompatibilityError: if the dataset's distance has no entry in
                ``DISTANCE_MAPPING`` (this includes ``Distance.DOT``).
        """
        distance = dataset.config.distance
        # Report every unmapped distance as an incompatibility instead of
        # letting the mapping lookup below fail with a bare KeyError.
        if distance == Distance.DOT or distance not in self.DISTANCE_MAPPING:
            raise IncompatibilityError

        knn_options = collection_params.get('knn_options', {})
        hnsw_options = ' '.join(
            f"{key}='{value}'" for key, value in knn_options.items()
        )

        vector_type = (
            f"float_vector knn_type='hnsw' "
            f"knn_dims='{dataset.config.vector_size}' "
            f"hnsw_similarity='{self.DISTANCE_MAPPING[distance]}' "
            f"{hnsw_options}"
        )

        # The vector column comes first, followed by one column per schema field.
        columns = [('vector', vector_type)]
        columns.extend(
            (field_name, self.INDEX_TYPE_MAPPING.get(field_type, field_type))
            for field_name, field_type in dataset.config.schema.items()
        )
        field_definitions = ', '.join(
            f"`{name}` {column_type}" for name, column_type in columns
        )

        query = f"""
        CREATE TABLE IF NOT EXISTS `{MANTICORESEARCH_TABLE}` (
            {field_definitions}
        )
        """
        self._execute_sql(query, "Error creating table")
38 changes: 38 additions & 0 deletions engine/clients/manticoresearch/parser.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
from typing import Any, List, Optional

from engine.base_client.parser import BaseConditionParser, FieldValue


class ManticoreSearchConditionParser(BaseConditionParser):
    """Translate benchmark filter conditions into nested dict filters.

    NOTE(review): the shapes produced here (``bool``/``must``/``should``,
    ``match``, ``range``, ``geo_distance``) look like Elasticsearch-style query
    DSL; confirm each one against Manticore's JSON filter syntax.
    """

    def build_condition(
        self, and_subfilters: Optional[List[Any]], or_subfilters: Optional[List[Any]]
    ) -> Optional[Any]:
        """Wrap the AND-subfilters as ``must`` and the OR-subfilters as ``should``.

        Either argument is passed through as-is, including ``None``.
        """
        clauses = {"must": and_subfilters, "should": or_subfilters}
        return {"bool": clauses}

    def build_exact_match_filter(self, field_name: str, value: FieldValue) -> Any:
        """Return a ``match`` filter pairing ``field_name`` with ``value``."""
        field_match = {field_name: value}
        return {"match": field_match}

    def build_range_filter(
        self,
        field_name: str,
        lt: Optional[FieldValue],
        gt: Optional[FieldValue],
        lte: Optional[FieldValue],
        gte: Optional[FieldValue],
    ) -> Any:
        """Return a ``range`` filter on ``field_name``.

        All four bounds are always present in the result; unset bounds are
        kept as ``None``.
        """
        bounds = {"lt": lt, "gt": gt, "lte": lte, "gte": gte}
        return {"range": {field_name: bounds}}

    def build_geo_filter(
        self, field_name: str, lat: float, lon: float, radius: float
    ) -> Any:
        """Return a ``geo_distance`` filter centred on (``lat``, ``lon``).

        ``radius`` is rendered with an ``m`` suffix.
        """
        center = {"lat": lat, "lon": lon}
        geo_clause = {"distance": f"{radius}m", field_name: center}
        return {"geo_distance": geo_clause}
50 changes: 50 additions & 0 deletions engine/clients/manticoresearch/search.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import multiprocessing as mp
import uuid
from typing import List, Tuple
from urllib.parse import urljoin

from engine.base_client.search import BaseSearcher
from engine.clients.manticoresearch.config import (
MANTICORESEARCH_PORT,
MANTICORESEARCH_TABLE,
)
from engine.clients.manticoresearch.parser import ManticoreSearchConditionParser
import requests

class ManticoreSearchSearcher(BaseSearcher):
    """Run KNN queries against the benchmark table through Manticore's HTTP ``/search`` endpoint."""

    # Extra keyword arguments forwarded to every `session.post` call.
    connection_params = {}
    # Engine search parameters; the optional "options" mapping is merged into the `knn` clause.
    search_params = {}
    parser = ManticoreSearchConditionParser()

    @classmethod
    def get_mp_start_method(cls):
        """Return "forkserver" when the platform offers it, otherwise "spawn"."""
        return "forkserver" if "forkserver" in mp.get_all_start_methods() else "spawn"

    @classmethod
    def init_client(cls, host, distance, connection_params: dict, search_params: dict):
        """Create the shared HTTP session and remember the search endpoint and parameters.

        ``distance`` is accepted for interface compatibility and is not used here.
        """
        cls.session = requests.Session()
        # One session is reused for all queries, with keep-alive requested.
        cls.session.headers.update(
            {"Connection": "keep-alive", "Content-Type": "application/json"}
        )
        cls.base_url = urljoin(f"http://{host}:{MANTICORESEARCH_PORT}", "/search")
        cls.search_params = search_params
        cls.connection_params = connection_params

    @classmethod
    def search_one(cls, vector, meta_conditions, top) -> List[Tuple[int, float]]:
        """Return up to ``top`` nearest neighbours of ``vector`` as ``(id, distance)`` pairs.

        Args:
            vector: query vector.
            meta_conditions: benchmark filter conditions, or a falsy value for none.
            top: number of neighbours requested (used as both ``k`` and ``limit``).

        Raises:
            requests.HTTPError: if the server answers with an error status.
        """
        knn_clause = {
            "field": "vector",
            "query_vector": vector,
            "k": top,
            # Tolerate a missing or null "options" entry.
            **(cls.search_params.get('options') or {}),
        }

        conditions = cls.parser.parse(meta_conditions)
        if conditions:
            # Manticore expects KNN result filters nested under `knn.filter`,
            # not merged into the top level of the request.
            knn_clause["filter"] = conditions

        request_body = {
            "index": MANTICORESEARCH_TABLE,
            "_source": "id",  # only ids are needed, not stored fields
            "knn": knn_clause,
            "limit": top,
        }

        response = cls.session.post(
            cls.base_url, json=request_body, **cls.connection_params
        )
        # Surface server errors directly instead of failing later on a missing "hits" key.
        response.raise_for_status()
        hits = response.json()["hits"]["hits"]
        # Documents are stored with id + 1, so shift back to the benchmark's ids.
        return [(int(hit["_id"]) - 1, hit["_knn_dist"]) for hit in hits]
70 changes: 70 additions & 0 deletions engine/clients/manticoresearch/upload.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import multiprocessing as mp
import uuid
from typing import List, Optional
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from engine.clients.manticoresearch.config import (
MANTICORESEARCH_PORT,
MANTICORESEARCH_TABLE,
)

from engine.base_client.upload import BaseUploader
import json

class ClosableSession(requests.Session):
    """A ``requests.Session`` that closes itself when the object is finalized."""

    def __del__(self):
        # Finalizer: close the session when the object is collected.
        self.close()

class ManticoreSearchUploader(BaseUploader):
    """Bulk-insert vectors into the benchmark table and finalize it after upload."""

    # Base URL of the server's HTTP API; set by `init_client`.
    api_url = None
    # Shared HTTP session with retries; set by `init_client`.
    session: requests.Session = None
    upload_params = {}

    @classmethod
    def get_mp_start_method(cls):
        """Return "forkserver" when the platform offers it, otherwise "spawn"."""
        return "forkserver" if "forkserver" in mp.get_all_start_methods() else "spawn"

    @classmethod
    def init_client(cls, host, distance, connection_params, upload_params):
        """Create the shared HTTP session used for uploads.

        ``distance`` and ``connection_params`` are accepted for interface
        compatibility and are not used here.
        """
        cls.api_url = f"http://{host}:{MANTICORESEARCH_PORT}"
        cls.host = host
        # Retry up to 5 times, with backoff, on 500/502/503/504 responses.
        retries = Retry(total=5, backoff_factor=0.1, status_forcelist=[500, 502, 503, 504])
        cls.session = ClosableSession()
        cls.session.mount("http://", HTTPAdapter(max_retries=retries))
        cls.session.headers.update({"Content-type": "application/x-ndjson"})
        cls.upload_params = upload_params

    @classmethod
    def upload_batch(cls, ids: List[int], vectors: List[list], metadata: Optional[List[dict]]):
        """Insert one batch of vectors through the ``/bulk`` endpoint.

        Args:
            ids: benchmark ids; each is stored as ``id + 1``.
            vectors: vectors to insert, aligned with ``ids``.
            metadata: accepted for interface compatibility; payload fields are
                currently NOT uploaded, only the vector is.

        Raises:
            requests.HTTPError: if the server answers with an error status.
        """
        statements = [
            {
                "insert": {
                    "index": MANTICORESEARCH_TABLE,
                    "id": doc_id + 1,  # we do not support id=0
                    "doc": {"vector": vector},
                }
            }
            for doc_id, vector in zip(ids, vectors)
        ]
        # One JSON statement per line (NDJSON).
        body = '\n'.join(json.dumps(statement) for statement in statements)
        response = cls.session.post(f"{cls.api_url}/bulk", body)
        response.raise_for_status()

    @classmethod
    def _post_sql(cls, query, **request_kwargs):
        """POST one raw SQL statement to ``/sql?mode=raw`` and raise on an error status."""
        response = cls.session.post(
            f"{cls.api_url}/sql?mode=raw",
            data="query=" + requests.utils.quote(query, safe=""),
            **request_kwargs,
        )
        response.raise_for_status()
        return response

    @classmethod
    def post_upload(cls, _distance):
        """Flush the RAM chunk and optimize the table once all batches are uploaded.

        Returns:
            An empty dict.

        Raises:
            requests.HTTPError: if either statement is answered with an error status.
        """
        # The keyword and the table identifier are separated by a space here;
        # the previously hand-encoded body had them run together.
        cls._post_sql(f"FLUSH RAMCHUNK `{MANTICORESEARCH_TABLE}`")
        # OPTIMIZE runs with sync=1, so the request is sent with timeout=None
        # (no client-side time limit).
        cls._post_sql(
            f"OPTIMIZE TABLE `{MANTICORESEARCH_TABLE}` OPTION sync=1 ,cutoff=1",
            timeout=None,
        )
        return {}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ version: '3.5'

services:
es:
image: docker.elastic.co/elasticsearch/elasticsearch:8.10.2
image: docker.elastic.co/elasticsearch/elasticsearch:8.18.0
environment:
ELASTIC_PASSWORD: "passwd"
KIBANA_PASSWORD: "passwd"
Expand Down
19 changes: 19 additions & 0 deletions engine/servers/manticoresearch-single-node/docker-compose.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
version: '3.7'

services:
searchd:
image: manticoresearch/manticore:dev-7.4.7-39a6f50
environment:
EXTRA: 1
ports:
- "9308:9308"
logging:
driver: "json-file"
options:
max-file: 1
max-size: 10m
deploy:
resources:
limits:
memory: 25Gb
cpus: "1.0"
6 changes: 2 additions & 4 deletions engine/servers/qdrant-single-node/docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,15 @@ version: '3.7'

services:
qdrant_bench:
image: ${CONTAINER_REGISTRY:-docker.io}/qdrant/qdrant:v1.8.2
image: ${CONTAINER_REGISTRY:-docker.io}/qdrant/qdrant:v1.13.4
network_mode: host
logging:
driver: "json-file"
options:
max-file: 1
max-size: 10m
environment:
- QDRANT__STORAGE__PERFORMANCE__OPTIMIZER_CPU_BUDGET=8
- QDRANT__STORAGE__PERFORMANCE__MAX_SEARCH_THREADS=8
deploy:
resources:
limits:
memory: 25Gb
cpus: "1.0"
Loading