/
server_blob_fetcher.go
375 lines (338 loc) · 14.2 KB
/
server_blob_fetcher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
// Copyright 2016 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package vsync
import "container/heap"
import "math/rand"
import "sync"
import "time"
import "v.io/v23/context"
import wire "v.io/v23/services/syncbase"
import "v.io/x/lib/vlog"
import "v.io/x/lib/nsync"
import blob "v.io/x/ref/services/syncbase/localblobstore"
import "v.io/x/ref/services/syncbase/server/interfaces"
// This file contains the machinery that runs in syncgroup "servers" that try
// to fetch blobs within the syncgroup so that they may be stored on the server
// for reliability and availability.
// ---------------------------------
// A BlobFetcherFunc can be passed into the public calls of this abstraction
// to customize how to fetch a blob.  It is invoked with the context given to
// NewBlobFetcher(), the blob to fetch, and the opaque clientData supplied by
// the caller of StartFetchingBlob(); a nil return means the blob was fetched
// successfully and no retry is needed.
type BlobFetcherFunc func(ctx *context.T, blobRef wire.BlobRef, clientData interface{}) error
// ---------------------------------
// A blobFetchState records the state of a blob that this server is attempting
// to fetch.  There is at most one blobFetchState per BlobRef; it lives in
// bf.blobMap, and additionally in the bf.blobQueue heap while waiting for its
// next fetch attempt (heapIndex is -1 while a fetch is in progress).
type blobFetchState struct {
	bf         *blobFetcher    // The associated blobFetcher.
	bst        blob.BlobStore  // The associated blob store.
	blobRef    wire.BlobRef    // the blob's Id.
	clientData interface{}     // provided by client, and passed to fetchFunc
	fetchFunc  BlobFetcherFunc // function to be used to fetch blob

	// The fields below are protected by bf.mu.
	fetchAttempts uint32    // number of fetch attempts so far; incremented after each attempt completes (see fetchABlob), seeded from the on-disk Signpost
	stopFetching  bool      // Whether to abandon in-progress fetches, and not restart.  Monotonic: false to true.
	nextAttempt   time.Time // time of next attempted fetch.
	err           error     // error recorded on the last fetch.
	heapIndex     int       // index, if in a heap; -1 if being fetched.
	expiry        time.Time // additional fetch attempts are not made after this time
}
// ---------------------------------
// A blobFetchStateHeap is a heap of blobFetchState pointers ordered by
// nextAttempt, earliest first.  Each element's heapIndex field tracks its
// current position; it is set to -1 when the element leaves the heap.
type blobFetchStateHeap []*blobFetchState

// The methods below implement "container/heap".Interface.

func (h blobFetchStateHeap) Len() int { return len(h) }

func (h blobFetchStateHeap) Less(i int, j int) bool {
	return h[i].nextAttempt.Before(h[j].nextAttempt)
}

func (h blobFetchStateHeap) Swap(i int, j int) {
	h[i], h[j] = h[j], h[i]
	h[i].heapIndex = i
	h[j].heapIndex = j
}

func (h *blobFetchStateHeap) Push(x interface{}) {
	entry := x.(*blobFetchState)
	entry.heapIndex = len(*h)
	*h = append(*h, entry)
}

func (h *blobFetchStateHeap) Pop() interface{} {
	last := len(*h) - 1
	popped := (*h)[last]
	popped.heapIndex = -1 // no longer in the heap
	*h = (*h)[:last]
	return popped
}
// ---------------------------------
// A blobFetcher records the state of all blobs that this server is attempting
// to fetch.  Create one with NewBlobFetcher(), which starts a
// fetchStarterThread() goroutine; that goroutine runs until the context is
// cancelled, after which WaitForExit() can be used to wait for full shutdown.
type blobFetcher struct {
	ctx *context.T // Context passed to NewBlobFetcher().

	mu nsync.Mu // protects fields below, plus most fields in blobFetchState.

	blobMap   map[wire.BlobRef]*blobFetchState // blobs that should be fetched
	blobQueue blobFetchStateHeap               // same elements as blobMap; heap prioritized by nextAttempt time

	maxFetcherThreads          int  // number of threads allowed to fetch blobs.
	curFetcherThreads          int  // number of fetcher threads that exist (<= maxFetcherThreads)
	fetchStarterThreadShutdown bool // whether the fetchStarterThread thread has shut down.  Monotonic: false to true.

	startFetchThreadCV nsync.CV // a fetch thread can be started (canStartAFetchThread())
	shutdownCV         nsync.CV // shutdown is complete (isShutdown() returns true)
}
// canStartAFetchThread() returns whether a new fetcher thread should be
// started by *bf's fetchStarterThread.  The condition is that the number of
// outstanding fetcher threads is below the maximum, and the next blob to fetch
// has reached its "nextAttempt" deadline.
// Places in *firstBlobDeadline the nextAttempt time of the blob to fetch,
// or nsync.NoDeadline if none.
// Called with bf.mu held.
func (bf *blobFetcher) canStartAFetchThread(firstBlobDeadline *time.Time) bool {
	*firstBlobDeadline = nsync.NoDeadline
	if bf.curFetcherThreads >= bf.maxFetcherThreads || len(bf.blobQueue) == 0 {
		return false // no spare thread capacity, or nothing queued.
	}
	next := bf.blobQueue[0].nextAttempt
	if !next.Before(time.Now()) {
		// Not yet time to fetch; tell the caller when it will be.
		*firstBlobDeadline = next
		return false
	}
	return true
}
// isShutdown() returns whether *bf has been shut down: the fetchStarterThread
// has finished, and no fetcher threads remain.
// Called with bf.mu held.
func (bf *blobFetcher) isShutdown() bool {
	return bf.curFetcherThreads == 0 && bf.fetchStarterThreadShutdown
}
// fetchStarterThread() creates threads to fetch blobs (using fetchABlob()).
// It runs until the ctx passed to NewBlobFetcher() becomes cancelled or
// expires.  On exit, it sets bf.fetchStarterThreadShutdown and, if no fetcher
// threads remain, wakes any goroutine blocked in WaitForExit().
// Runs in its own goroutine, started by NewBlobFetcher().
func (bf *blobFetcher) fetchStarterThread() {
	bf.mu.Lock()
	for bf.ctx.Err() == nil {
		var deadline time.Time
		// Wait until either we're shut down (bf.ctx.Done() is closed),
		// or we can start a new fetch thread on some blob.
		// canStartAFetchThread() computes the deadline for the CV wait.
		for !bf.canStartAFetchThread(&deadline) &&
			bf.startFetchThreadCV.WaitWithDeadline(&bf.mu, deadline, bf.ctx.Done()) == nsync.OK {
		}
		// Re-check both conditions: the wait may have ended because the
		// deadline expired or the context was cancelled, not because a
		// fetch can actually be started.
		if bf.ctx.Err() == nil && bf.canStartAFetchThread(&deadline) {
			// Remove the first blob from the priority queue, and start a thread to fetch it.
			var toFetch *blobFetchState = heap.Pop(&bf.blobQueue).(*blobFetchState)
			if toFetch.heapIndex != -1 {
				panic("blobFetchState unexpectedly in heap")
			}
			bf.curFetcherThreads++ // decremented by fetchABlob() when it finishes.
			go bf.fetchABlob(toFetch)
		}
	}
	bf.fetchStarterThreadShutdown = true
	if bf.isShutdown() {
		bf.shutdownCV.Broadcast() // wake WaitForExit().
	}
	bf.mu.Unlock()
}
// fetchABlob() attempts to fetch the blob identified by *toFetch.  If the
// fetch was unsuccessful and if the client has not requested that fetching be
// abandoned, the fetch is queued for retrying.  Otherwise, the *toFetch
// is removed from *bf.
// Runs in its own goroutine; each such goroutine is counted in
// bf.curFetcherThreads.
func (bf *blobFetcher) fetchABlob(toFetch *blobFetchState) {
	bf.mu.Lock()
	// Invariant: fetchStarterThread() popped *toFetch from the heap
	// (setting heapIndex to -1) before handing it to this goroutine.
	if toFetch.heapIndex != -1 {
		panic("blobFetchState unexpectedly on heap")
	}
	bf.mu.Unlock()
	// Perform the (potentially slow) fetch without holding bf.mu.
	var err error = toFetch.fetchFunc(bf.ctx, toFetch.blobRef, toFetch.clientData)
	bf.mu.Lock()
	toFetch.err = err
	toFetch.fetchAttempts++
	// Maintain fetchAttempts in the on-disk Signpost data structure.
	var sp interfaces.Signpost
	toFetch.bst.GetSignpost(bf.ctx, toFetch.blobRef, &sp)
	// We may write the Signpost back, even if the GetSignpost() call failed.
	// On failure, sp will be a canonical empty Signpost.
	if toFetch.fetchAttempts > sp.FetchAttempts {
		sp.FetchAttempts = toFetch.fetchAttempts
		// SetSignpost() error deliberately ignored: the attempt count is
		// best-effort persistence.
		toFetch.bst.SetSignpost(bf.ctx, toFetch.blobRef, &sp)
	}
	var nextFetchTime time.Time = time.Now().Add(bf.fetchDelay(toFetch.fetchAttempts))
	if err == nil || bf.ctx.Err() != nil || toFetch.stopFetching || nextFetchTime.After(toFetch.expiry) {
		// fetched blob, or we're told not to retry.
		delete(bf.blobMap, toFetch.blobRef)
	} else { // failed to fetch blob; try again
		toFetch.nextAttempt = nextFetchTime
		heap.Push(&bf.blobQueue, toFetch)
	}
	if toFetch.heapIndex == 0 || bf.curFetcherThreads == bf.maxFetcherThreads {
		// new lowest fetch time, or there were no free threads.
		bf.startFetchThreadCV.Broadcast()
	}
	bf.curFetcherThreads--
	if bf.isShutdown() {
		bf.shutdownCV.Broadcast() // wake WaitForExit().
	}
	bf.mu.Unlock()
}
// fetchDelay() returns how long the blobFetcher should wait before attempting
// to fetch a blob for which there have already been "fetchAttempts" failed
// fetch attempts.
// The delay has a random component, and grows exponentially with failures,
// from about 1.5s up to about 11 days.  (fetchAttempts will be 1 on the first
// call for a blob, since this is not invoked until there's been a failure.)
func (bf *blobFetcher) fetchDelay(fetchAttempts uint32) time.Duration {
	// Cap the exponent so the delay tops out around a million seconds---11 days.
	const maxShift = 20
	if fetchAttempts > maxShift {
		fetchAttempts = maxShift
	}
	base := (500 + time.Duration(rand.Int31n(500))) * time.Millisecond
	return base << fetchAttempts
}
// StartFetchingBlob() adds a blobRef to blobFetcher *bf, if it's not already
// known to it, and not shutting down.  The client-provided function
// fetchFunc() will be used to fetch the blob.  Attempts to fetch the blob
// will continue until expiry, though at least one new attempt will be made,
// even if after expiry.
func (bf *blobFetcher) StartFetchingBlob(bst blob.BlobStore, blobRef wire.BlobRef,
	clientData interface{}, expiry time.Time, fetchFunc BlobFetcherFunc) {
	bf.mu.Lock()
	defer bf.mu.Unlock()
	if bf.ctx.Err() != nil {
		return // shutting down; accept no new blobs.
	}
	if existing, ok := bf.blobMap[blobRef]; ok {
		// Already being fetched; just extend the expiry if the new one is later.
		if expiry.After(existing.expiry) {
			existing.expiry = expiry
		}
		return
	}
	bfs := &blobFetchState{
		bf:         bf,
		bst:        bst,
		blobRef:    blobRef,
		clientData: clientData,
		fetchFunc:  fetchFunc,
		expiry:     expiry,
		heapIndex:  -1,
	}
	// Seed the attempt count from the on-disk Signpost, if available, so the
	// retry backoff continues from where earlier incarnations left off.
	var sp interfaces.Signpost
	if err := bst.GetSignpost(bf.ctx, blobRef, &sp); err == nil {
		bfs.fetchAttempts = sp.FetchAttempts
		bfs.nextAttempt = time.Now().Add(bf.fetchDelay(bfs.fetchAttempts))
	}
	bf.blobMap[blobRef] = bfs
	heap.Push(&bf.blobQueue, bfs)
	if bfs.heapIndex == 0 { // a new lowest fetch time
		bf.startFetchThreadCV.Broadcast()
	}
}
// StopFetchingBlob() removes blobRef from blobFetcher *bf.
// It may still be being fetched, but failures will no longer be retried,
// and an in-progress fetch may be halted if possible.
func (bf *blobFetcher) StopFetchingBlob(blobRef wire.BlobRef) {
	bf.mu.Lock()
	defer bf.mu.Unlock()
	bfs, ok := bf.blobMap[blobRef]
	if !ok {
		return // unknown blob; nothing to do.
	}
	bfs.stopFetching = true // tell any in-progress fetcher thread to stop if it can.
	if bfs.heapIndex != -1 { // not currently being fetched: forget the blob now.
		delete(bf.blobMap, bfs.blobRef)
		heap.Remove(&bf.blobQueue, bfs.heapIndex)
	}
	// Otherwise the fetcher thread forgets the blob when it finishes.
}
// NewBlobFetcher() returns a new blobFetcher that can use maxThreads fetcher
// threads.  The fetcher runs until ctx is cancelled; callers may then use
// WaitForExit() to wait for its goroutines to drain.
func NewBlobFetcher(ctx *context.T, maxThreads int) *blobFetcher {
	bf := &blobFetcher{
		ctx:               ctx,
		maxFetcherThreads: maxThreads,
		blobMap:           make(map[wire.BlobRef]*blobFetchState),
	}
	// "container/heap"'s spec requires Init()---apparently even on an empty heap.
	heap.Init(&bf.blobQueue)
	go bf.fetchStarterThread()
	return bf
}
// WaitForExit() blocks until *bf is fully shut down.  Typically, this is used
// after cancelling the context passed to NewBlobFetcher().
func (bf *blobFetcher) WaitForExit() {
	bf.mu.Lock()
	defer bf.mu.Unlock()
	for !bf.isShutdown() {
		bf.shutdownCV.Wait(&bf.mu)
	}
}
// ---------------------------------
// serverBlobScan() scans the blobs in *s, and for those blobs for which it is
// a server in some syncgroup that fetches the blobs, it gives the blob ids to
// blob fetcher *bf.  Servers are assumed to have space to keep all blobs.
// The function fetchFunc() is used to fetch blobs, and is passed clientData
// on each call.  Returns the first stream error encountered, if any.
func (s *syncService) serverBlobScan(ctx *context.T, bf *blobFetcher,
	clientData interface{}, fetchFunc BlobFetcherFunc) (err error) {
	// Construct a map that indicates which blobs are available locally
	// (i.e. have a finalized local copy).
	haveBlob := make(map[wire.BlobRef]bool)
	var bs blob.Stream = s.bst.ListBlobIds(ctx)
	for bs.Advance() && ctx.Err() == nil {
		br, brErr := s.bst.NewBlobReader(ctx, bs.Value())
		if brErr == nil {
			if br.IsFinalized() {
				haveBlob[wire.BlobRef(bs.Value())] = true
			}
			// Close the reader to avoid leaking one per blob per scan;
			// the close error is deliberately ignored (best-effort probe).
			br.Close()
		}
	}
	err = bs.Err()
	bs.Cancel() // in case we didn't finish advancing.
	// For every blob whose id has been seen locally, there is a Signpost.
	// Iterate over them, and fetch the ones not available locally for
	// which the current syncbase is a "server" in some syncgroup.
	if err == nil && ctx.Err() == nil {
		var sps blob.SignpostStream = s.bst.NewSignpostStream(ctx)
		for sps.Advance() && ctx.Err() == nil {
			blobRef := sps.BlobId()
			if !haveBlob[blobRef] &&
				len(s.syncgroupsWithServer(ctx, wire.Id{}, s.name, sps.Signpost().SgIds)) != 0 {
				// We do not have the blob locally, and this
				// syncbase is a server in some syncgroup in
				// which the blob has been seen.
				bf.StartFetchingBlob(s.bst, blobRef, clientData,
					time.Now().Add(720*time.Hour /*a month*/), fetchFunc)
			}
		}
		err = sps.Err()
		sps.Cancel() // in case we didn't finish advancing.
	}
	return err
}
// SyncServiceBlobFetcher() returns a pointer to the blobFetcher associated
// with syncService *s, created by a call to ServerBlobFetcher().  It may
// return nil if no such pointer exists.
func (s *syncService) SyncServiceBlobFetcher() *blobFetcher {
	s.blobFetcherMu.Lock()
	defer s.blobFetcherMu.Unlock()
	return s.blobFetcher
}
// DefaultBlobFetcherFunc() is a BlobFetcherFunc that fetches blob blobRef in
// the normal way.  clientData must be the *syncService on whose behalf the
// fetch is performed.
func DefaultBlobFetcherFunc(ctx *context.T, blobRef wire.BlobRef, clientData interface{}) error {
	return clientData.(*syncService).fetchBlobRemote(ctx, blobRef, nil, nil, 0)
}
// ServerBlobFetcher() calls serverBlobScan() repeatedly on ssm (which must contain a *syncService),
// with gaps specified by parameters in parameters.go before scanning them all again.  It
// stops only if the context *ctx is cancelled.  DefaultBlobFetcherFunc() is
// used to fetch blobs, with the *syncService passed as its clientData.
// If done!=nil, done.Done() is called just before the routine returns.
func ServerBlobFetcher(ctx *context.T, ssm interfaces.SyncServerMethods, done *sync.WaitGroup) {
	bf := NewBlobFetcher(ctx, serverBlobFetchConcurrency)
	ss := ssm.(*syncService)
	// Publish the fetcher so that SyncServiceBlobFetcher() can return it.
	ss.blobFetcherMu.Lock()
	ss.blobFetcher = bf
	ss.blobFetcherMu.Unlock()
	var delay time.Duration = serverBlobFetchInitialScanDelay
	errCount := 0 // state for limiting log records
	for ctx.Err() == nil {
		// Sleep for "delay", but wake early if the context is cancelled.
		select {
		case <-time.After(delay):
		case <-ctx.Done():
		}
		startTime := time.Now()
		if ctx.Err() == nil {
			if err := ss.serverBlobScan(ctx, bf, ss, DefaultBlobFetcherFunc); err != nil {
				// Log only at exponentially spaced error counts so a
				// persistent failure does not flood the log.
				if (errCount & (errCount - 1)) == 0 { // errCount is 0 or a power of 2.
					vlog.Errorf("ServerBlobFetcher:%d: %v", errCount, err)
				}
				errCount++
			}
		}
		// Delay the next scan in proportion to how long the last scan took.
		delay = serverBlobFetchExtraScanDelay + serverBlobFetchScanDelayMultiplier*time.Since(startTime)
	}
	// Unpublish the fetcher, then wait for its goroutines to drain.
	ss.blobFetcherMu.Lock()
	ss.blobFetcher = nil
	ss.blobFetcherMu.Unlock()
	bf.WaitForExit()
	if done != nil {
		done.Done()
	}
}