manager.go
// Copyright 2016 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package datastorecache

import (
	"context"
	"fmt"
	"net/http"
	"strings"
	"sync/atomic"
	"time"

	"github.com/TriggerMail/luci-go/appengine/memlock"
	"github.com/TriggerMail/luci-go/common/clock"
	"github.com/TriggerMail/luci-go/common/errors"
	log "github.com/TriggerMail/luci-go/common/logging"
	"github.com/TriggerMail/luci-go/common/sync/parallel"
	"github.com/TriggerMail/luci-go/server/router"

	"go.chromium.org/gae/service/datastore"
	"go.chromium.org/gae/service/info"

	"github.com/julienschmidt/httprouter"
)
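
// errHTTPHandler adapts an error-returning handler function to a
// router.Handler. If the wrapped function fails, the error and its rendered
// stack are logged and written to the response with an HTTP 500 status.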
func errHTTPHandler(fn func(c context.Context, req *http.Request, params httprouter.Params) error) router.Handler {
return func(ctx *router.Context) {
err := fn(ctx.Context, ctx.Request, ctx.Params)
if err == nil {
// Handler returned no error, everything is good.
return
}
// Handler returned an error, dump it to output.
ctx.Writer.WriteHeader(http.StatusInternalServerError)
// Log all of our stack lines individually, so we don't overflow the
// maximum log message size with a full stack.
stk := errors.RenderStack(err)
log.WithError(err).Errorf(ctx.Context, "Handler returned error.")
for _, line := range stk {
log.Errorf(ctx.Context, ">> %s", line)
}
dumpErr := func() error {
for _, line := range stk {
if _, err := ctx.Writer.Write([]byte(line + "\n")); err != nil {
return err
}
}
return nil
}()
if dumpErr != nil {
log.WithError(dumpErr).Errorf(ctx.Context, "Failed to dump error stack.")
}
}
}

// manager is initialized to perform the management cron task.
type manager struct {
cache *Cache
// queryBatchSize is the size of the query batch to run. This must be >0.
queryBatchSize int32
}

// installCronRoute installs a handler for this manager's cron task into the
// supplied Router at the specified path.
//
// It is recommended to assert in the middleware that this endpoint is only
// accessible from a cron task.
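//
// A minimal wiring sketch (hedged: "myCache", the path, and the cron-only
// middleware are illustrative assumptions, not part of this file):
//
//	m := &manager{cache: myCache, queryBatchSize: 500}
//	m.installCronRoute("/internal/cron/ds-cache-manager", r,
//		base.Extend(gaemiddleware.RequireCron))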
func (m *manager) installCronRoute(path string, r *router.Router, base router.MiddlewareChain) {
r.GET(path, base, errHTTPHandler(m.handleManageCronGET))
}

func (m *manager) handleManageCronGET(c context.Context, req *http.Request, params httprouter.Params) error {
var h Handler
if hf := m.cache.HandlerFunc; hf != nil {
h = hf(c)
}
// NOTE: All manager runs currently have exactly one shard, #0.
const shardID = 0
shard := managerShard{
manager: m,
h: h,
now: clock.Now(c).UTC(),
st: managerShardStats{
			Shard: shardID + 1, // +1 because 0 is an invalid ID.
},
clientID: strings.Join([]string{
"datastore_cache_manager",
info.RequestID(c),
}, "\x00"),
		shard:    shardID,
shardKey: fmt.Sprintf("datastore_cache_manager_shard_%d", shardID),
}
return shard.run(c)
}

type managerShard struct {
*manager
// h is this manager's Handler. Note this can be nil, in which case the cached
// entries will be pruned eventually.
h Handler
clientID string
now time.Time
st managerShardStats
shard int
shardKey string
// entries is the number of observed cache entries.
entries int32
// errors is the number of errors encountered. If this is non-zero, our
// handler will return an error.
errors int32
}

func (ms *managerShard) observeEntry() { atomic.AddInt32(&ms.entries, 1) }
func (ms *managerShard) observeErrors(c int32) { atomic.AddInt32(&ms.errors, c) }

func (ms *managerShard) run(c context.Context) error {
	// Enter our cache namespace. We'll leave this when calling Handler
	// functions.
c = ms.cache.withNamespace(c)
c = log.SetField(c, "shard", ms.shard)
// Validate shard configuration.
switch {
case ms.queryBatchSize <= 0:
return errors.Reason("invalid query batch size %d", ms.queryBatchSize).Err()
}
// Take out a memlock on our cache shard.
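	// If another client already holds the lock (e.g. an overlapping cron
	// invocation), TryWithLock returns without running the callback, so at
	// most one manager run operates on a given shard at a time.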
return memlock.TryWithLock(c, ms.shardKey, ms.clientID, func(c context.Context) (rv error) {
// Output our stats on completion.
defer func() {
ms.st.LastEntryCount = int(ms.entries)
if rv == nil {
ms.st.LastSuccessfulRun = ms.now
}
			if err := datastore.Put(c, &ms.st); err != nil {
				log.WithError(err).Errorf(c, "Failed to Put() stats on completion.")
}
}()
if err := ms.runLocked(c); err != nil {
return errors.Annotate(err, "running maintenance loop").Err()
}
// If we observed errors during processing, note this.
if ms.errors > 0 {
return errors.Reason("%d error(s) encountered during processing", ms.errors).Err()
}
return nil
})
}

// runLocked runs the main maintenance loop.
//
// As the run is executed, stats can be collected in ms.st. These will be output
// to datastore on completion.
func (ms *managerShard) runLocked(c context.Context) error {
workers := ms.cache.Parallel
if workers <= 0 {
workers = 1
}
	// NOTE: This query does not currently constrain itself to the current
	// shard. This can be done using a "__key__" inequality filter to bound
	// the query.
prototype := entry{
CacheName: ms.cache.Name,
}
q := datastore.NewQuery(prototype.kind())
var (
totalEntries = 0
totalRefreshed = 0
totalPruned = 0
totalErrors = int32(0)
entries = make([]*entry, 0, ms.queryBatchSize)
putBuf = make([]*entry, ms.queryBatchSize)
deleteBuf = make([]*entry, ms.queryBatchSize)
)
	// Calculate our pruning threshold. If an entry's "LastAccessed" is <= this
	// threshold, it is a candidate for pruning.
var pruneThreshold time.Time
if pi := ms.cache.pruneInterval(); pi > 0 {
pruneThreshold = ms.now.Add(-pi)
}
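	// For example (illustrative numbers only): with a prune interval of 24h
	// and now = 2016-01-02T12:00Z, pruneThreshold is 2016-01-01T12:00Z, and
	// any entry last accessed at or before that instant will be pruned.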
// handleEntries refreshes the accumulated entries. It is used as a callback
// in between query batches, as well as a finalizer after the queries
// complete.
//
// handleEntries is, itself, not goroutine-safe.
//
	// It will refresh in parallel, adding entries to "putBuf" or "deleteBuf" as
	// appropriate. At the end of its operation, all deferred datastore
	// operations will execute, and the accumulated entries list will be cleared
	// for the next round.
handleEntries := func(c context.Context) error {
if len(entries) == 0 {
return nil
}
// Use atomic-friendly int32 values to index the put/delete buffers. This
// will let our parallel goroutines safely add entries with very low
// overhead.
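		// (atomic.AddInt32 returns the new, post-increment value, so
		// AddInt32(&idx, 1)-1 claims a unique buffer slot per call, even from
		// concurrent goroutines.)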
var putIdx, deleteIdx int32
putEntry := func(e *entry) { putBuf[atomic.AddInt32(&putIdx, 1)-1] = e }
deleteEntry := func(e *entry) { deleteBuf[atomic.AddInt32(&deleteIdx, 1)-1] = e }
// Process each entry in parallel.
//
// Each task will return a nil error, so the error result does not need to
// be observed.
_ = parallel.WorkPool(workers, func(taskC chan<- func() error) {
for _, e := range entries {
e := e
taskC <- func() error {
					// Is this entry a candidate for pruning?
if !(pruneThreshold.IsZero() || e.LastAccessed.After(pruneThreshold)) {
log.Fields{
"key": e.keyHash(),
"lastRefresh": e.LastRefreshed,
"lastAccessed": e.LastAccessed,
"pruneThreshold": pruneThreshold,
}.Infof(c, "Pruning expired cache entry.")
deleteEntry(e)
return nil
}
					// Is this cache entry a candidate for refresh?
if ms.h != nil {
refreshInterval := ms.h.RefreshInterval(e.Key)
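						// An entry qualifies when LastRefreshed is at or before
						// (now - refreshInterval): e.g. with a 1h interval,
						// anything last refreshed an hour or more ago is
						// refreshed again.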
if refreshInterval > 0 && !e.LastRefreshed.After(ms.now.Add(-refreshInterval)) {
// Call our Handler's Refresh function. We leave our cache namespace
// first.
switch value, delta, err := doRefresh(c, ms.h, e, ""); err {
case nil:
// Refresh successful! Update the entry.
//
								// Even if the data hasn't changed, the LastRefreshed time has, and
// the cost of the "Put" is the same either way.
e.LastRefreshed = ms.now
e.LastRefreshDelta = int64(delta)
e.loadValue(value)
putEntry(e)
return nil
case ErrDeleteCacheEntry:
log.Fields{
"key": e.keyHash(),
}.Debugf(c, "Refresh requested entry deletion.")
deleteEntry(e)
return nil
default:
log.Fields{
log.ErrorKey: err,
"key": e.keyHash(),
"lastRefresh": e.LastRefreshed,
}.Errorf(c, "Failed to refresh cache entry.")
atomic.AddInt32(&totalErrors, 1)
}
}
}
return nil
}
}
})
		// Clear our entries buffer for the next round.
entries = entries[:0]
		// Flush our put/delete buffers in parallel. A failure here is a
		// datastore failure; it is logged, tallied (best effort) into
		// totalErrors, and surfaced after the full run completes.
_ = parallel.FanOutIn(func(taskC chan<- func() error) {
if putIdx > 0 {
taskC <- func() error {
if err := datastore.Put(c, putBuf[:putIdx]); err != nil {
log.Fields{
log.ErrorKey: err,
"size": putIdx,
}.Errorf(c, "Failed to Put batch.")
atomic.AddInt32(&totalErrors, 1)
} else {
totalRefreshed += int(putIdx)
}
return nil
}
}
if deleteIdx > 0 {
taskC <- func() error {
if err := datastore.Delete(c, deleteBuf[:deleteIdx]); err != nil {
log.Fields{
log.ErrorKey: err,
"size": deleteIdx,
}.Errorf(c, "Failed to Delete batch.")
atomic.AddInt32(&totalErrors, 1)
} else {
totalPruned += int(deleteIdx)
}
return nil
}
}
})
return nil
}
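	// Run the query, buffering entries and flushing each full batch between
	// query rounds via handleEntries; a final flush below handles any
	// remainder.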
err := datastore.RunBatch(c, ms.queryBatchSize, q, func(e *entry) error {
totalEntries++
ms.observeEntry()
entries = append(entries, e)
if len(entries) >= int(ms.queryBatchSize) {
// Hit the end of a query batch. Process entries.
handleEntries(c)
}
return nil
})
if err != nil {
return errors.Annotate(err, "failed to run entry query").Err()
}
// Flush any outstanding entries (ignore error, will always be nil).
_ = handleEntries(c)
if totalErrors > 0 {
ms.observeErrors(totalErrors)
}
log.Fields{
"entries": totalEntries,
"errors": totalErrors,
"refreshed": totalRefreshed,
"pruned": totalPruned,
}.Infof(c, "Successfully updated cache entries.")
return nil
}