import json
import os
from datetime import datetime
from typing import List

from benchmark import ROOT_DIR
from benchmark.dataset import Dataset
from engine.base_client.configure import BaseConfigurator
from engine.base_client.search import BaseSearcher
from engine.base_client.upload import BaseUploader

RESULTS_DIR = ROOT_DIR / "results"
RESULTS_DIR.mkdir(exist_ok=True)

DETAILED_RESULTS = bool(int(os.getenv("DETAILED_RESULTS", False)))
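# When DETAILED_RESULTS=1 is set in the environment, the per-request "latencies"
# (and, for searches, "precisions") lists are kept in the saved JSON; otherwise
# they are stripped below to keep result files small.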
class BaseClient:
    """Ties together the configure, upload, and search stages of a benchmark run."""

    def __init__(
        self,
        name: str,  # name of the experiment
        engine: str,  # name of the engine
        configurator: BaseConfigurator,
        uploader: BaseUploader,
        searchers: List[BaseSearcher],
    ):
        self.name = name
        self.configurator = configurator
        self.uploader = uploader
        self.searchers = searchers
        self.engine = engine

    @property
    def sparse_vector_support(self):
        return self.configurator.SPARSE_VECTOR_SUPPORT
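    # The two save_* helpers below serialize run parameters and stats as indented
    # JSON under RESULTS_DIR, with a timestamped filename per experiment run.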
    def save_search_results(
        self, dataset_name: str, results: dict, search_id: int, search_params: dict
    ):
        now = datetime.now()
        timestamp = now.strftime("%Y-%m-%d-%H-%M-%S")
        experiments_file = (
            f"{self.name}-{dataset_name}-search-{search_id}-{timestamp}.json"
        )
        result_path = RESULTS_DIR / experiments_file
        with open(result_path, "w") as out:
            out.write(
                json.dumps(
                    {
                        "params": {
                            "dataset": dataset_name,
                            "experiment": self.name,
                            "engine": self.engine,
                            **search_params,
                        },
                        "results": results,
                    },
                    indent=2,
                )
            )
        return result_path
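    # Illustrative filename (experiment and dataset names are hypothetical):
    #   my-experiment-my-dataset-search-0-2024-01-01-12-00-00.json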
    def save_upload_results(
        self, dataset_name: str, results: dict, upload_params: dict
    ):
        now = datetime.now()
        timestamp = now.strftime("%Y-%m-%d-%H-%M-%S")
        experiments_file = f"{self.name}-{dataset_name}-upload-{timestamp}.json"
        with open(RESULTS_DIR / experiments_file, "w") as out:
            upload_stats = {
                "params": {
                    "experiment": self.name,
                    "engine": self.engine,
                    "dataset": dataset_name,
                    **upload_params,
                },
                "results": results,
            }
            out.write(json.dumps(upload_stats, indent=2))
    def run_experiment(
        self,
        dataset: Dataset,
        skip_upload: bool = False,
        skip_search: bool = False,
        skip_if_exists: bool = True,
        parallels: List[int] = [],
    ):
        execution_params = self.configurator.execution_params(
            distance=dataset.config.distance, vector_size=dataset.config.vector_size
        )
        reader = dataset.get_reader(execution_params.get("normalize", False))
        if skip_if_exists:
            glob_pattern = f"{self.name}-{dataset.config.name}-search-*-*.json"
            existing_results = list(RESULTS_DIR.glob(glob_pattern))
            if len(existing_results) == len(self.searchers):
                print(
                    f"Skipping run for {self.name} since it already ran {len(self.searchers)} search configs previously"
                )
                return

        if not skip_upload:
            print("Experiment stage: Configure")
            self.configurator.configure(dataset)

            print("Experiment stage: Upload")
            upload_stats = self.uploader.upload(
                distance=dataset.config.distance, records=reader.read_data()
            )
            if not DETAILED_RESULTS:
                # Remove verbose stats from upload results
                upload_stats.pop("latencies", None)

            self.save_upload_results(
                dataset.config.name,
                upload_stats,
                upload_params={
                    **self.uploader.upload_params,
                    **self.configurator.collection_params,
                },
            )
        if not skip_search:
            print("Experiment stage: Search")
            for search_id, searcher in enumerate(self.searchers):
                if skip_if_exists:
                    glob_pattern = (
                        f"{self.name}-{dataset.config.name}-search-{search_id}-*.json"
                    )
                    existing_results = list(RESULTS_DIR.glob(glob_pattern))
                    print("Pattern", glob_pattern, "Results:", existing_results)
                    if len(existing_results) >= 1:
                        print(
                            f"Skipping search {search_id} as it already exists",
                        )
                        continue

                search_params = {**searcher.search_params}
                ef = search_params.get("search_params", {}).get("ef", "default")
                client_count = search_params.get("parallel", 1)
                filter_client_count = len(parallels) > 0
                if filter_client_count and (client_count not in parallels):
                    print(f"\tSkipping ef runtime: {ef}; #clients {client_count}")
                    continue

                print(f"\tRunning ef runtime: {ef}; #clients {client_count}")
                search_stats = searcher.search_all(
                    dataset.config.distance, reader.read_queries()
                )

                # ensure we specify the client count in the results
                search_params["parallel"] = client_count

                if not DETAILED_RESULTS:
                    # Remove verbose stats from search results
                    search_stats.pop("latencies", None)
                    search_stats.pop("precisions", None)

                self.save_search_results(
                    dataset.config.name, search_stats, search_id, search_params
                )

        print("Experiment stage: Done")
        print("Results saved to: ", RESULTS_DIR)

    def delete_client(self):
        self.uploader.delete_client()
        self.configurator.delete_client()
        for s in self.searchers:
            s.delete_client()
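# Usage sketch (illustrative; the concrete classes below are hypothetical stand-ins
# for engine-specific subclasses of BaseConfigurator/BaseUploader/BaseSearcher):
#
#   client = BaseClient(
#       name="my-experiment",
#       engine="my-engine",
#       configurator=MyConfigurator(...),
#       uploader=MyUploader(...),
#       searchers=[MySearcher(...)],
#   )
#   client.run_experiment(dataset)   # configure -> upload -> search, results to RESULTS_DIR
#   client.delete_client()           # release per-process clients when done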