Skip to content

Commit 3cb1987

Browse files
committed
Merge remote-tracking branch 'origin/master' into update.redisearch
2 parents ef8bf08 + 0d20796 commit 3cb1987

39 files changed

+1555
-845
lines changed

.github/workflows/continuous-benchmark.yaml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,4 +21,11 @@ jobs:
2121
export GCS_SECRET=${{ secrets.GCS_SECRET }}
2222
export POSTGRES_PASSWORD=${{ secrets.POSTGRES_PASSWORD }}
2323
export POSTGRES_HOST=${{ secrets.POSTGRES_HOST }}
24+
25+
# Benchmark the dev branch:
26+
export QDRANT_VERSION=ghcr/dev
27+
bash -x tools/run_ci.sh
28+
29+
# Benchmark the master branch:
30+
export QDRANT_VERSION=docker/master
2431
bash -x tools/run_ci.sh

.github/workflows/manual-benchmark.yaml

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,16 +4,16 @@ on:
44
repository_dispatch:
55
workflow_dispatch:
66
inputs:
7-
qdrant_branch:
8-
description: "Branch of qdrant to benchmark"
9-
default: dev
7+
qdrant_version:
8+
description: "Version of qdrant to benchmark (tags/v1.6.1, <commit-id>, my-branch, docker/v1.5.1, ghcr/dev)"
9+
default: ghcr/dev
1010
dataset:
1111
description: "Dataset to benchmark"
1212
default: laion-small-clip
1313

1414
jobs:
1515
runManualBenchmark:
16-
name: manual benchmark - ${{ inputs.qdrant_branch }} - ${{ inputs.dataset }}
16+
name: manual benchmark - ${{ inputs.qdrant_version }} - ${{ inputs.dataset }}
1717
runs-on: ubuntu-latest
1818
steps:
1919
- uses: actions/checkout@v3
@@ -27,7 +27,7 @@ jobs:
2727
export GCS_SECRET=${{ secrets.GCS_SECRET }}
2828
export POSTGRES_PASSWORD=${{ secrets.POSTGRES_PASSWORD }}
2929
export POSTGRES_HOST=${{ secrets.POSTGRES_HOST }}
30-
export QDRANT_VERSION=${{ inputs.qdrant_branch }}
30+
export QDRANT_VERSION=${{ inputs.qdrant_version }}
3131
export DATASETS=${{ inputs.dataset }}
3232
export POSTGRES_TABLE=benchmark_manual
3333
bash -x tools/run_ci.sh

engine/base_client/client.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,3 +129,10 @@ def run_experiment(
129129
print(f"\tskipping ef runtime: {ef}; #clients {parallel}")
130130
print("Experiment stage: Done")
131131
print("Results saved to: ", RESULTS_DIR)
132+
133+
def delete_client(self):
    """Propagate client teardown to every engine client this object holds:
    the uploader, the configurator, and each searcher."""
    self.uploader.delete_client()
    self.configurator.delete_client()
    for searcher in self.searchers:
        searcher.delete_client()

engine/base_client/configure.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,3 +23,6 @@ def configure(self, dataset: Dataset) -> Optional[dict]:
2323

2424
def execution_params(self, distance, vector_size) -> dict:
    """Return engine-specific execution parameters for the given distance
    metric and vector size; the base implementation defines none."""
    return {}
26+
27+
def delete_client(self):
    """Release any client/connection resources; the base configurator
    holds none, so this is a no-op for subclasses that don't override it."""
    pass

engine/clients/client_factory.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,11 @@
1818
OpenSearchSearcher,
1919
OpenSearchUploader,
2020
)
21+
from engine.clients.pgvector import (
22+
PgVectorConfigurator,
23+
PgVectorSearcher,
24+
PgVectorUploader,
25+
)
2126
from engine.clients.qdrant import QdrantConfigurator, QdrantSearcher, QdrantUploader
2227
from engine.clients.redis import RedisConfigurator, RedisSearcher, RedisUploader
2328
from engine.clients.weaviate import (
@@ -33,6 +38,7 @@
3338
"elasticsearch": ElasticConfigurator,
3439
"opensearch": OpenSearchConfigurator,
3540
"redis": RedisConfigurator,
41+
"pgvector": PgVectorConfigurator,
3642
}
3743

3844
ENGINE_UPLOADERS = {
@@ -42,6 +48,7 @@
4248
"elasticsearch": ElasticUploader,
4349
"opensearch": OpenSearchUploader,
4450
"redis": RedisUploader,
51+
"pgvector": PgVectorUploader,
4552
}
4653

4754
ENGINE_SEARCHERS = {
@@ -51,6 +58,7 @@
5158
"elasticsearch": ElasticSearcher,
5259
"opensearch": OpenSearchSearcher,
5360
"redis": RedisSearcher,
61+
"pgvector": PgVectorSearcher,
5462
}
5563

5664

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
from engine.clients.pgvector.configure import PgVectorConfigurator
2+
from engine.clients.pgvector.search import PgVectorSearcher
3+
from engine.clients.pgvector.upload import PgVectorUploader
4+
5+
__all__ = [
6+
"PgVectorConfigurator",
7+
"PgVectorSearcher",
8+
"PgVectorUploader",
9+
]

engine/clients/pgvector/config.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
PGVECTOR_PORT = 9200
2+
PGVECTOR_DB = "postgres"
3+
PGVECTOR_USER = "postgres"
4+
PGVECTOR_PASSWORD = "passwd"
5+
6+
7+
def get_db_config(host, connection_params):
8+
return {
9+
"host": host or "localhost",
10+
"dbname": PGVECTOR_DB,
11+
"user": PGVECTOR_USER,
12+
"password": PGVECTOR_PASSWORD,
13+
"autocommit": True,
14+
**connection_params,
15+
}
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
import pgvector.psycopg
2+
import psycopg
3+
4+
from benchmark.dataset import Dataset
5+
from engine.base_client import IncompatibilityError
6+
from engine.base_client.configure import BaseConfigurator
7+
from engine.base_client.distances import Distance
8+
from engine.clients.pgvector.config import get_db_config
9+
10+
11+
class PgVectorConfigurator(BaseConfigurator):
    """Prepares a PostgreSQL database with the pgvector extension for a
    benchmark run: creates the `items` table and its HNSW index."""

    # Benchmark distance metric -> pgvector operator class used by the index.
    # Distance.DOT has no mapping and is rejected in recreate().
    DISTANCE_MAPPING = {
        Distance.L2: "vector_l2_ops",
        Distance.COSINE: "vector_cosine_ops",
    }

    def __init__(self, host, collection_params: dict, connection_params: dict):
        super().__init__(host, collection_params, connection_params)
        self.conn = psycopg.connect(**get_db_config(host, connection_params))
        print("configure connection created")
        # Install the extension before any vector DDL, then register the
        # vector type adapter on this connection.
        self.conn.execute("CREATE EXTENSION IF NOT EXISTS vector;")
        pgvector.psycopg.register_vector(self.conn)

    def clean(self):
        """Drop the benchmark table (and dependent objects) if it exists."""
        self.conn.execute(
            "DROP TABLE IF EXISTS items CASCADE;",
        )

    def recreate(self, dataset: Dataset, collection_params):
        """Create the `items` table and its HNSW index for *dataset*.

        Raises IncompatibilityError for dot-product distance and for any
        distance missing from DISTANCE_MAPPING.

        NOTE(review): closes self.conn on success, so the configurator is
        single-use after recreate(); delete_client() then closes an
        already-closed connection — confirm this lifecycle is intended.
        """
        if dataset.config.distance == Distance.DOT:
            raise IncompatibilityError

        self.conn.execute(
            f"""CREATE TABLE items (
            id SERIAL PRIMARY KEY,
            embedding vector({dataset.config.vector_size}) NOT NULL
            );"""
        )
        # PLAIN storage keeps embeddings inline and uncompressed (no TOAST).
        self.conn.execute("ALTER TABLE items ALTER COLUMN embedding SET STORAGE PLAIN")

        try:
            hnsw_distance_type = self.DISTANCE_MAPPING[dataset.config.distance]
        except KeyError:
            raise IncompatibilityError(
                f"Unsupported distance metric: {dataset.config.distance}"
            )

        # HNSW build parameters come from the benchmark's hnsw_config section.
        self.conn.execute(
            f"CREATE INDEX on items USING hnsw(embedding {hnsw_distance_type}) WITH (m = {collection_params['hnsw_config']['m']}, ef_construction = {collection_params['hnsw_config']['ef_construct']})"
        )

        self.conn.close()

    def delete_client(self):
        # psycopg connection close is safe to call on an already-closed conn.
        self.conn.close()

engine/clients/pgvector/parser.py

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
import json
2+
from typing import Any, List, Optional
3+
4+
from engine.base_client import IncompatibilityError
5+
from engine.base_client.parser import BaseConditionParser, FieldValue
6+
7+
8+
class PgVectorConditionParser(BaseConditionParser):
    """Translates benchmark filter conditions into SQL WHERE-clause fragments
    for the pgvector (PostgreSQL) engine."""

    def build_condition(
        self, and_subfilters: Optional[List[Any]], or_subfilters: Optional[List[Any]]
    ) -> Optional[Any]:
        """Combine sub-filter fragments: the OR group and the AND group are
        each parenthesized, then joined with AND. Returns "" when both
        groups are empty."""
        clauses = []
        if or_subfilters is not None and len(or_subfilters) > 0:
            clauses.append(f"( {' OR '.join(or_subfilters)} )")
        if and_subfilters is not None and len(and_subfilters) > 0:
            # Fix: the original joined `or_subfilters` here, silently dropping
            # every AND sub-filter and duplicating the OR ones.
            clauses.append(f"( {' AND '.join(and_subfilters)} )")

        return " AND ".join(clauses)

    def build_exact_match_filter(self, field_name: str, value: FieldValue) -> Any:
        """Return an SQL equality test for *field_name*.

        Fix: the original `raise`d the f-string (a TypeError at runtime —
        strings are not exceptions) instead of returning it, and used the
        non-SQL `==` operator instead of `=`.
        NOTE(review): json.dumps double-quotes strings, which PostgreSQL
        parses as identifiers rather than string literals — confirm how
        string values should be quoted here.
        """
        return f"{field_name} = {json.dumps(value)}"

    def build_range_filter(
        self,
        field_name: str,
        lt: Optional[FieldValue],
        gt: Optional[FieldValue],
        lte: Optional[FieldValue],
        gte: Optional[FieldValue],
    ) -> Any:
        """AND together the comparison for each bound that was provided."""
        clauses = []
        if lt is not None:
            clauses.append(f"{field_name} < {lt}")
        if gt is not None:
            clauses.append(f"{field_name} > {gt}")
        if lte is not None:
            clauses.append(f"{field_name} <= {lte}")
        if gte is not None:
            clauses.append(f"{field_name} >= {gte}")
        return f"( {' AND '.join(clauses)} )"

    def build_geo_filter(
        self, field_name: str, lat: float, lon: float, radius: float
    ) -> Any:
        # Geo filtering is not implemented for the pgvector engine.
        raise IncompatibilityError

engine/clients/pgvector/search.py

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
import multiprocessing as mp
2+
from typing import List, Tuple
3+
4+
import numpy as np
5+
import psycopg
6+
from pgvector.psycopg import register_vector
7+
8+
from engine.base_client.distances import Distance
9+
from engine.base_client.search import BaseSearcher
10+
from engine.clients.pgvector.config import get_db_config
11+
from engine.clients.pgvector.parser import PgVectorConditionParser
12+
13+
14+
class PgVectorSearcher(BaseSearcher):
    """Per-process search client for pgvector: a shared class-level
    connection/cursor is opened by init_client and used by search_one."""

    conn = None
    cur = None
    distance = None
    search_params = {}
    parser = PgVectorConditionParser()

    @classmethod
    def init_client(cls, host, distance, connection_params: dict, search_params: dict):
        """Open the shared connection and cursor for this process and record
        the distance metric and engine search parameters."""
        cls.conn = psycopg.connect(**get_db_config(host, connection_params))
        register_vector(cls.conn)
        cls.cur = cls.conn.cursor()
        cls.distance = distance
        cls.search_params = search_params["search_params"]

    @classmethod
    def search_one(cls, vector, meta_conditions, top) -> List[Tuple[int, float]]:
        """Run one ANN query and return (id, score) rows, at most *top*.

        Raises NotImplementedError for metrics other than COSINE / L2.
        NOTE(review): meta_conditions is accepted but ignored — filtered
        search is never applied even though `parser` exists; confirm whether
        filtered datasets are expected to run against this engine.
        """
        # Session-level HNSW search breadth; set per query so it always
        # reflects the configured search params.
        cls.cur.execute(f"SET hnsw.ef_search = {cls.search_params['hnsw_ef']}")

        if cls.distance == Distance.COSINE:
            query = f"SELECT id, embedding <=> %s AS _score FROM items ORDER BY _score LIMIT {top};"
        elif cls.distance == Distance.L2:
            query = f"SELECT id, embedding <-> %s AS _score FROM items ORDER BY _score LIMIT {top};"
        else:
            raise NotImplementedError(f"Unsupported distance metric {cls.distance}")

        cls.cur.execute(
            query,
            (np.array(vector),),
        )
        return cls.cur.fetchall()

    @classmethod
    def delete_client(cls):
        """Close the cursor and connection if they were opened.

        Fix: the original closed cls.conn unguarded, so calling this before
        init_client (or after a failed init) raised AttributeError on None.
        """
        if cls.cur is not None:
            cls.cur.close()
        if cls.conn is not None:
            cls.conn.close()

0 commit comments

Comments
 (0)