mirrored from https://chromium.googlesource.com/infra/luci/luci-go
/
queries.go
138 lines (124 loc) · 5.82 KB
/
queries.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
// Copyright 2019 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package paged implements a helper for making paginated Datastore queries.
package paged
import (
"context"
"reflect"
"github.com/golang/protobuf/proto"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"go.chromium.org/luci/common/errors"
"go.chromium.org/luci/gae/service/datastore"
)
// Response is an interface implemented by ListResponses which support page
// tokens.
//
// Query writes the next page token by setting, via reflection, a string field
// named NextPageToken on the struct the Response points to, so values passed
// to Query are expected to be pointers to structs that have such a field.
type Response interface {
proto.Message
// GetNextPageToken returns a token to use to fetch the next page of results.
GetNextPageToken() string
}
// cursorCBType holds the reflect.Type describing datastore.CursorCB. It is
// derived from a nil *datastore.CursorCB so no CursorCB value is needed.
var cursorCBType = reflect.TypeOf((*datastore.CursorCB)(nil)).Elem()
// returnedNil is the one-element result slice produced by calling, through
// reflection, a function that returns a nil error. Its single element is a
// reflect.Value of type error holding nil.
var returnedNil = reflect.ValueOf(func() error { return nil }).Call(nil)
// returnedStop is the one-element result slice produced by calling, through
// reflection, a function that returns datastore.Stop. Its single element is a
// reflect.Value of type error holding datastore.Stop.
var returnedStop = reflect.ValueOf(func() error { return datastore.Stop }).Call(nil)
// Query executes a query to fetch the given page of results, invoking a
// callback function for each key or entity returned by the query. If the page
// isn't the last of the query, the given response will have its next page token
// set appropriately.
//
// A non-positive limit means to fetch all results starting at the given page
// token in a single page. An empty page token means to start at the first page.
// A page token which cannot be decoded results in a gRPC InvalidArgument error.
//
// The callback must be a non-nil function of one argument, the type of which is
// either *datastore.Key (implies keys-only query) or a pointer to a struct to
// decode the returned entity into. The callback should return an error, which
// if not nil halts the query, and if the error is not datastore.Stop, causes
// this function to return an error as well. See datastore.Run for more
// information. No maximum page size is imposed, use datastore.Stop to enforce
// one.
//
// The next page token is written by reflection into the NextPageToken field of
// the struct rsp points to. rsp is only inspected when a next page token
// actually needs to be set; if at that point rsp is not a non-nil pointer to a
// struct with a settable string field named NextPageToken, an error is
// returned.
func Query(c context.Context, lim int32, tok string, rsp Response, q *datastore.Query, cb interface{}) error {
	// Validate as much about the callback as this function relies on.
	// The rest is validated by datastore.Run.
	v := reflect.ValueOf(cb)
	if v.Kind() != reflect.Func {
		return errors.Reason("callback must be a function").Err()
	}
	if v.IsNil() {
		// A typed nil function passes the Kind check above but would panic
		// when called for the first result.
		return errors.Reason("callback must not be nil").Err()
	}
	t := v.Type()
	switch {
	case t.NumIn() != 1:
		return errors.Reason("callback function must accept one argument").Err()
	case t.NumOut() != 1:
		return errors.Reason("callback function must return one value").Err()
	}

	// Modify the query with the request parameters.
	if tok != "" {
		cur, err := datastore.DecodeCursor(c, tok)
		if err != nil {
			return status.Errorf(codes.InvalidArgument, "invalid page token %q", tok)
		}
		q = q.Start(cur)
	}
	if lim > 0 {
		// Peek ahead at the next result to determine if the cursor for the given page size
		// is worth returning. The cursor should be omitted if there are no further results.
		q = q.Limit(lim + 1)
	}

	// Wrap the callback with a custom function that grabs the cursor (if necessary) before
	// invoking the callback for each result up to the page size specified in the request.
	// This is the type of function datastore.Run will receive as an argument.
	// TODO(smut): Move this block to gae/datastore, since it doesn't depend on PagedRequest.
	t = reflect.FuncOf([]reflect.Type{t.In(0), cursorCBType}, []reflect.Type{t.Out(0)}, false)

	// cur stays nil until the end of the page has been reached, either because
	// the limit was hit or because the callback returned datastore.Stop.
	var cur datastore.Cursor

	// If the query is not limited and the callback never returns datastore.Stop, the query runs
	// until the end so it's not necessary to set the next page token. If the callback does
	// return datastore.Stop, save the cursor but peek at the next result. Only set the next
	// page token if there is a next result.
	// If the query is limited, the limit is set to one more than the specified value in order
	// to peek at the next result by default. Save the cursor at the limit but peek at the next
	// result. Only set the next page token if there is a next result. The callback may return
	// datastore.Stop ahead of the limit. If it does, save the cursor but peek at the next result.
	// Only set the next page token if there is a next result.
	i := int32(0)
	curCB := reflect.MakeFunc(t, func(args []reflect.Value) []reflect.Value {
		i++
		if cur != nil {
			// Cursor is set below, when the result is at the limit or datastore.Stop
			// is returned by the callback. Since the query is still running, there
			// are more results. Set the page token and halt the query. Don't invoke
			// the callback since it isn't expecting any more results.
			if err := setNextPageToken(rsp, cur.String()); err != nil {
				return []reflect.Value{reflect.ValueOf(err)}
			}
			return returnedStop
		}

		// Invoke the callback. Per t, it returns one argument (the error).
		ret := v.Call([]reflect.Value{args[0]})

		// Save the cursor if the callback wants to stop or the query is limited and
		// this is the last requested result. In either case peek at the next result.
		// The lim > 0 guard keeps the counter from ever matching a non-positive
		// limit, which means "unlimited".
		if ret[0].Interface() == datastore.Stop || (lim > 0 && i == lim && ret[0].IsNil()) {
			var err error
			cur, err = args[1].Interface().(datastore.CursorCB)()
			if err != nil {
				return []reflect.Value{reflect.ValueOf(errors.Annotate(err, "failed to fetch cursor").Err())}
			}
			// Swallow datastore.Stop (if any) so the query yields one more
			// result, which tells us whether a next page exists.
			return returnedNil
		}
		return ret
	}).Interface()

	if err := datastore.Run(c, q, curCB); err != nil {
		return errors.Annotate(err, "failed to fetch entities").Err()
	}
	return nil
}

// setNextPageToken stores tok in the NextPageToken field of the struct rsp
// points to. It returns an error, instead of panicking, when rsp is not a
// non-nil pointer to a struct with a settable string field of that name.
func setNextPageToken(rsp Response, tok string) error {
	p := reflect.ValueOf(rsp)
	if p.Kind() != reflect.Ptr || p.IsNil() {
		return errors.Reason("response must be a non-nil pointer to a struct").Err()
	}
	s := p.Elem()
	if s.Kind() != reflect.Struct {
		return errors.Reason("response must be a non-nil pointer to a struct").Err()
	}
	f := s.FieldByName("NextPageToken")
	if !f.IsValid() || f.Kind() != reflect.String || !f.CanSet() {
		return errors.Reason("response must have a settable string field NextPageToken").Err()
	}
	f.SetString(tok)
	return nil
}