-
Notifications
You must be signed in to change notification settings - Fork 68
/
astra_vectors_live.yaml
151 lines (143 loc) · 7.42 KB
/
astra_vectors_live.yaml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
description: |
This is a template for live vector search testing.
The goals of this test are to:
1. establish basic recall metrics on a knn computed dataset
2. exercise the system through each of the following phases:
schema: Install the schema required to run the test
rampup: Measure how long it takes to load a set of embeddings
search_and_index: Measure how the system responds to queries while it
is indexing recently ingested data.
#? await_index: Pause and wait for the system to complete compactions or index processing
search: Run vector search with a set of default (or overridden) parameters
search_and_rewrite: Run the same search operations as above, but while rewriting the data
search_and_invalidate: Run the same search operations as above, but while overwriting the data
with different content using the same vector id.
In all of these phases, it is important to instantiate the metrics with distinct names.
Also, aggregates of recall should include a total aggregate as well as a moving average.
# Named scenarios. Each step of `astra_vectors` is a `run` command that selects
# its ops with a `block:<name>` tag filter and labels the activity `target:astra`.
scenarios:
  astra_vectors:
    drop: run tags='block:drop' labels='target:astra' threads===1 cycles===2 driverconfig=app.conf
    schema: run tags='block:schema' labels='target:astra' threads===1 cycles===2
    rampup: run tags='block:rampup' labels='target:astra' threads=100 cycles=TEMPLATE(trainsize) errors=counter
    # The two search_and_index steps previously set `errors` twice
    # (errors=count,retry and errors=counter). Each now sets it exactly once.
    # NOTE(review): errors=counter (the later value, also used by rampup) was kept;
    # confirm that count,retry was not the intended handler chain.
    search_and_index_unthrottled: >-
      run tags='block:search_and_index,optype=select' labels='target:astra'
      cycles=TEMPLATE(testsize) threads=10 stride=500 errors=counter
    search_and_index: >-
      run alias=search_and_index tags='block:search_and_index,optype=select' labels='target:astra'
      cycles=TEMPLATE(testsize) stride=100 striderate=17.5
      errors=counter threads=500
    # one activity or two? data leap-frog? or concurrency separate for both?
    # await_index: run tags='block:await_index' # This would need to exit when a condition is met
    # stop_search_and_index: stop search_and_index
    # only possible if we have a triggering event to indicate it
    # live_search: run tags='block:search' labels='target:astra' threads=1 cycles=TEMPLATE(testsize,10000)
    search_and_rewrite: run tags='block:search_and_rewrite' labels='target:astra'
    search_and_invalidate: run tags='block:search_and_invalidate' labels='target:astra'
# Workload-wide parameters, declared at the top level of the document.
params:
  # Driver adapter name used for the ops in this workload.
  driver: cqld4
  # NOTE(review): presumably turns on per-op metrics instrumentation — confirm.
  instrument: true
# Binding recipes referenced from op templates as {name}.
# Every HDF5-backed recipe reads from testdata/TEMPLATE(dataset).hdf5.
bindings:
  # Row key: the input value rendered as a string.
  id: ToString()
  # Query vectors from the "/test" dataset, passed through ToCqlVector().
  test_floatlist: HdfFileToFloatList("testdata/TEMPLATE(dataset).hdf5", "/test"); ToCqlVector();
  # Read from "/neighbors"; handed to relevancy.accept(...) in the search_and_index verifier.
  relevant_indices: HdfFileToIntArray("testdata/TEMPLATE(dataset).hdf5", "/neighbors")
  # Read from "/distance"; not referenced by any op in this file.
  distance_floatlist: HdfFileToFloatList("testdata/TEMPLATE(dataset).hdf5", "/distance")
  # Vectors from the "/train" dataset, passed through ToCqlVector(); used by the insert ops.
  train_floatlist: HdfFileToFloatList("testdata/TEMPLATE(dataset).hdf5", "/train"); ToCqlVector();
  # Not referenced by any op in this file.
  synthetic_vectors: HashedFloatVectors(TEMPLATE(dimensions));
# Op blocks. Each block is selected by the `block:<name>` tag filter of the
# matching scenario step.
blocks:
  # Drops the index and the table; both statements are guarded by IF EXISTS.
  drop:
    params:
      cl: TEMPLATE(cl,LOCAL_QUORUM)
      prepared: false
    ops:
      # NOTE(review): this names the index exactly like the table, while
      # create_sai_index below creates an unnamed index — confirm that this
      # matches the generated index name (DROP TABLE follows in any case).
      drop_index: |
        DROP INDEX IF EXISTS TEMPLATE(keyspace,baselines).TEMPLATE(table,vectors);
      drop_table: |
        DROP TABLE IF EXISTS TEMPLATE(keyspace,baselines).TEMPLATE(table,vectors);
  # Creates the vector table and its storage-attached index.
  schema:
    params:
      cl: TEMPLATE(cl,LOCAL_QUORUM)
      prepared: false
    ops:
      # create_keyspace: |
      #   CREATE KEYSPACE IF NOT EXISTS TEMPLATE(keyspace,baselines)
      #   WITH replication = {'class': 'NetworkTopologyStrategy', 'TEMPLATE(region)': '3'};
      create_table: |
        CREATE TABLE IF NOT EXISTS TEMPLATE(keyspace,baselines).TEMPLATE(table,vectors) (
          key TEXT,
          value vector<float,TEMPLATE(dimensions)>,
          PRIMARY KEY (key)
        );
      create_sai_index: |
        CREATE CUSTOM INDEX IF NOT EXISTS ON TEMPLATE(keyspace,baselines).TEMPLATE(table,vectors) (value) USING 'StorageAttachedIndex'
        WITH OPTIONS = {'similarity_function' : 'TEMPLATE(similarity_function,cosine)'};
      # WITH OPTIONS = {'maximum_node_connections' : TEMPLATE(M,16), 'construction_beam_width' : TEMPLATE(ef,100), 'similarity_function' : 'TEMPLATE(similarity_function,dot_product)'};
  # Loads the training vectors, one row per cycle.
  rampup:
    params:
      cl: TEMPLATE(write_cl,LOCAL_QUORUM)
      prepared: true
    ops:
      insert: |
        INSERT INTO TEMPLATE(keyspace,baselines).TEMPLATE(table,vectors)
        (key, value) VALUES ({id},{train_floatlist});
  # await_index:
  #   ops:
  # ANN queries with relevancy measurement, plus an insert op tagged optype=insert.
  # The scenario steps above select only optype=select from this block.
  search_and_index:
    ops:
      select_ann_limit:
        prepared: |
          SELECT * FROM TEMPLATE(keyspace,baselines).TEMPLATE(table,vectors)
          ORDER BY value ANN OF {test_floatlist} LIMIT TEMPLATE(select_limit,100);
        tags:
          optype: select
        # Registers recall/precision/F1/RR/AP at k=100 and, with an "s_" name
        # prefix, at k in {1,2,3,5,10,25,50,75}.
        verifier-init: |
          relevancy=scriptingmetrics.newRelevancyMeasures(_parsed_op,"group","relevancy");
          for (int k in List.of(100)) {
            relevancy.addFunction(io.nosqlbench.engine.extensions.computefunctions.RelevancyFunctions.recall("recall",k));
            relevancy.addFunction(io.nosqlbench.engine.extensions.computefunctions.RelevancyFunctions.precision("precision",k));
            relevancy.addFunction(io.nosqlbench.engine.extensions.computefunctions.RelevancyFunctions.F1("F1",k));
            relevancy.addFunction(io.nosqlbench.engine.extensions.computefunctions.RelevancyFunctions.reciprocal_rank("RR",k));
            relevancy.addFunction(io.nosqlbench.engine.extensions.computefunctions.RelevancyFunctions.average_precision("AP",k));
          }
          for (int k in List.of(1,2,3,5,10,25,50,75)) {
            relevancy.addFunction(io.nosqlbench.engine.extensions.computefunctions.RelevancyFunctions.recall("s_recall",k));
            relevancy.addFunction(io.nosqlbench.engine.extensions.computefunctions.RelevancyFunctions.precision("s_precision",k));
            relevancy.addFunction(io.nosqlbench.engine.extensions.computefunctions.RelevancyFunctions.F1("s_F1",k));
            relevancy.addFunction(io.nosqlbench.engine.extensions.computefunctions.RelevancyFunctions.reciprocal_rank("s_RR",k));
            relevancy.addFunction(io.nosqlbench.engine.extensions.computefunctions.RelevancyFunctions.average_precision("s_AP",k));
          }
        # Feeds the expected and returned indices to the relevancy measures;
        # always returns true, so it never fails the op.
        verifier: |
          // driver-specific function
          actual_indices=cql_utils.cqlStringColumnToIntArray("key",result);
          // driver-agnostic function
          relevancy.accept({relevant_indices},actual_indices);
          // because we are "verifying" although this needs to be reorganized
          return true;
      insert_rewrite:
        prepared: |
          INSERT INTO TEMPLATE(keyspace,baselines).TEMPLATE(table,vectors)
          (key, value) VALUES ({id},{train_floatlist});
        tags:
          optype: insert
  # The two blocks below previously referenced {test_vector}, {train_vector} and
  # {rw_key}, none of which is declared in this file's bindings. They now use
  # the declared {test_floatlist}, {train_floatlist} and {id}, matching
  # search_and_index.
  search_and_rewrite:
    ops:
      select_ann_limit:
        stmt: |
          SELECT * FROM TEMPLATE(keyspace,baselines).TEMPLATE(table,vectors) ORDER BY value ANN OF {test_floatlist} LIMIT TEMPLATE(select_limit,100);
        verifier-init: |
          scriptingmetrics.newSummaryGauge(_parsed_op,"recall")
        # verifier: |
      upsert_same:
        stmt: |
          INSERT INTO TEMPLATE(keyspace,baselines).TEMPLATE(table,vectors)
          (key, value) VALUES ({id},{train_floatlist});
  search_and_invalidate:
    ops:
      select_ann_limit:
        stmt: |
          SELECT * FROM TEMPLATE(keyspace,baselines).TEMPLATE(table,vectors) ORDER BY value ANN OF {test_floatlist} LIMIT TEMPLATE(select_limit,100);
        # verifier-init: |
        # verifier: |
      # NOTE(review): the file description says this phase should overwrite rows
      # with different content under the same id; {train_floatlist} rewrites the
      # same content. Confirm which vector source is intended here.
      upsert_random: |
        INSERT INTO TEMPLATE(keyspace,baselines).TEMPLATE(table,vectors)
        (key, value) VALUES ({id},{train_floatlist});