forked from tetrafolium/luci-gae
/
query_merger.go
308 lines (279 loc) · 8.81 KB
/
query_merger.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
// Copyright 2015 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package txnBuf
import (
"bytes"
"sort"
"go.chromium.org/gae/impl/memory"
ds "go.chromium.org/gae/service/datastore"
"go.chromium.org/gae/service/datastore/serialize"
"go.chromium.org/luci/common/data/stringset"
)
// queryToIter takes a FinalizedQuery and returns an iterator function which
// will produce either *items or errors.
//
// - d is the raw datastore to run this query on
// - filter is a function which will return true if the given key should be
// excluded from the result set.
func queryToIter(stopChan chan struct{}, fq *ds.FinalizedQuery, d ds.RawInterface) func() (*item, error) {
c := make(chan *item)
go func() {
defer close(c)
err := d.Run(fq, func(k *ds.Key, pm ds.PropertyMap, _ ds.CursorCB) error {
i := &item{key: k, data: pm}
select {
case c <- i:
return nil
case <-stopChan:
return ds.Stop
}
})
if err != nil {
c <- &item{err: err}
}
}()
return func() (*item, error) {
itm := <-c
if itm == nil {
return nil, nil
}
if itm.err != nil {
return nil, itm.err
}
return itm, nil
}
}
// adjustQuery applies various mutations to the query to make it suitable for
// merging. In general, this removes limits and offsets the 'distinct' modifier,
// and it ensures that if there are sort orders which won't appear in the
// result data that the query is transformed into a projection query which
// contains all of the data. A non-projection query will never be transformed
// in this way.
func adjustQuery(fq *ds.FinalizedQuery) (*ds.FinalizedQuery, error) {
q := fq.Original()
// The limit and offset must be done in-memory because otherwise we may
// request too few entities from the underlying store if many matching
// entities have been deleted in the buffered transaction.
q = q.Limit(-1)
q = q.Offset(-1)
// distinction must be done in-memory, because otherwise there's no way
// to merge in the effect of the in-flight changes (because there's no way
// to push back to the datastore "yeah, I know you told me that the (1, 2)
// result came from `/Bob,1`, but would you mind pretending that it didn't
// and tell me next the one instead?
q = q.Distinct(false)
// since we need to merge results, we must have all order-related fields
// in each result. The only time we wouldn't have all the data available would
// be for a keys-only or projection query. To fix this, we convert all
// Projection and KeysOnly queries to project on /all/ Orders.
//
// FinalizedQuery already guarantees that all projected fields show up in
// the Orders, but the projected fields could be a subset of the orders.
//
// Additionally on a keys-only query, any orders other than __key__ require
// conversion of this query to a projection query including those orders in
// order to merge the results correctly.
//
// In both cases, the resulting objects returned to the higher layers of the
// stack will only include the information requested by the user; keys-only
// queries will discard all PropertyMap data, and projection queries will
// discard any field data that the user didn't ask for.
orders := fq.Orders()
if len(fq.Project()) > 0 || (fq.KeysOnly() && len(orders) > 1) {
q = q.KeysOnly(false)
for _, o := range orders {
if o.Property == "__key__" {
continue
}
q = q.Project(o.Property)
}
}
return q.Finalize()
}
// runMergedQueries executes a user query `fq` against the parent datastore as
// well as the in-memory datastore, calling `cb` with the merged result set.
//
// It's expected that the caller of this function will apply limit and offset
// if the query contains those restrictions. This may convert the query to
// an expanded projection query with more data than the user asked for. It's the
// caller's responsibility to prune away the extra data.
//
// See also `dsTxnBuf.Run()`.
func runMergedQueries(fq *ds.FinalizedQuery, sizes *sizeTracker,
memDS, parentDS ds.RawInterface, cb func(k *ds.Key, data ds.PropertyMap) error) error {
toRun, err := adjustQuery(fq)
if err != nil {
return err
}
cmpLower, cmpUpper := memory.GetBinaryBounds(fq)
cmpOrder := fq.Orders()
cmpFn := func(i *item) string {
return i.getCmpRow(cmpLower, cmpUpper, cmpOrder)
}
dedup := stringset.Set(nil)
distinct := stringset.Set(nil)
distinctOrder := []ds.IndexColumn(nil)
if len(fq.Project()) > 0 { // the original query was a projection query
if fq.Distinct() {
// it was a distinct projection query, so we need to dedup by distinct
// options.
distinct = stringset.New(0)
proj := fq.Project()
distinctOrder = make([]ds.IndexColumn, len(proj))
for i, p := range proj {
distinctOrder[i].Property = p
}
}
} else {
// the original was a normal or keys-only query, so we need to dedup by keys.
dedup = stringset.New(0)
}
stopChan := make(chan struct{})
parIter := queryToIter(stopChan, toRun, parentDS)
memIter := queryToIter(stopChan, toRun, memDS)
parItemGet := func() (*item, error) {
for {
itm, err := parIter()
if itm == nil || err != nil {
return nil, err
}
encKey := itm.getEncKey()
if sizes.has(encKey) || (dedup != nil && dedup.Has(encKey)) {
continue
}
return itm, nil
}
}
memItemGet := func() (*item, error) {
for {
itm, err := memIter()
if itm == nil || err != nil {
return nil, err
}
if dedup != nil && dedup.Has(itm.getEncKey()) {
continue
}
return itm, nil
}
}
defer func() {
close(stopChan)
parItemGet()
memItemGet()
}()
pitm, err := parItemGet()
if err != nil {
return err
}
mitm, err := memItemGet()
if err != nil {
return err
}
for {
// the err can be set during the loop below. If we come around the bend and
// it's set, then we need to return it. We don't check it immediately
// because it's set after we already have a good result to return to the
// user.
if err != nil {
return err
}
usePitm := pitm != nil
if pitm != nil && mitm != nil {
usePitm = cmpFn(pitm) < cmpFn(mitm)
} else if pitm == nil && mitm == nil {
break
}
toUse := (*item)(nil)
// we check the error at the beginning of the loop.
if usePitm {
toUse = pitm
pitm, err = parItemGet()
} else {
toUse = mitm
mitm, err = memItemGet()
}
if dedup != nil {
if !dedup.Add(toUse.getEncKey()) {
continue
}
}
if distinct != nil {
// NOTE: We know that toUse will not be used after this point for
// comparison purposes, so re-use its cmpRow property for our distinct
// filter here.
toUse.cmpRow = ""
if !distinct.Add(toUse.getCmpRow(nil, nil, distinctOrder)) {
continue
}
}
if err := cb(toUse.key, toUse.data); err != nil {
return err
}
}
return nil
}
// toComparableString computes the byte-sortable 'order' string for the given
// key/PropertyMap.
//
// * start/end are byte sequences which are the inequality bounds of the
// query, if any. These are a serialized datastore.Property. If the
// inequality column is inverted, then start and end are also inverted and
// swapped with each other.
// * order is the list of sort orders in the actual executing queries.
// * k / pm are the data to derive a sortable string for.
//
// The result of this function is the series of serialized properties, one per
// order column, which represent this key/pm's first entry in the composite
// index that would point to it (e.g. the one with `order` sort orders).
func toComparableString(start, end []byte, order []ds.IndexColumn, k *ds.Key, pm ds.PropertyMap) (row, key []byte) {
doCmp := true
soFar := []byte{}
ps := serialize.PropertyMapPartially(k, nil)
for _, ord := range order {
row, ok := ps[ord.Property]
if !ok {
if pslice := pm.Slice(ord.Property); len(pslice) > 0 {
row = serialize.PropertySlice(pslice)
}
}
sort.Sort(row)
foundOne := false
for _, serialized := range row {
if ord.Descending {
serialized = serialize.Invert(serialized)
}
if doCmp {
maybe := serialize.Join(soFar, serialized)
cmp := bytes.Compare(maybe, start)
if cmp >= 0 {
foundOne = true
soFar = maybe
doCmp = len(soFar) < len(start)
break
}
} else {
foundOne = true
soFar = serialize.Join(soFar, serialized)
break
}
}
if !foundOne {
return nil, nil
}
}
if end != nil && bytes.Compare(soFar, end) >= 0 {
return nil, nil
}
return soFar, ps["__key__"][0]
}