-
Notifications
You must be signed in to change notification settings - Fork 397
/
retain.go
370 lines (308 loc) · 9.91 KB
/
retain.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package retain
import (
"context"
"sync"
"time"
"github.com/spacemonkeygo/monkit/v3"
"github.com/zeebo/errs"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"storj.io/common/bloomfilter"
"storj.io/common/storj"
"storj.io/storj/storagenode/pieces"
)
var (
	// mon is the monkit scope for this package. It is used throughout the
	// file to instrument tasks (mon.Task) and to record garbage collection
	// metrics (mon.IntVal, mon.DurationVal).
	mon = monkit.Package()
	// Error is the default error class for retain errors.
	Error = errs.Class("retain")
)
// Config defines parameters for the retain service.
type Config struct {
	// MaxTimeSkew is subtracted from a request's CreatedBefore time before
	// pieces are selected, leaving room for clock differences between the
	// satellite and the storage node.
	MaxTimeSkew time.Duration `help:"allows for small differences in the satellite and storagenode clocks" default:"72h0m0s"`
	// Status selects whether retain requests are ignored (disabled), executed
	// (enabled), or only logged without trashing anything (debug).
	Status Status `help:"allows configuration to enable, disable, or test retain requests from the satellite. Options: (disabled/enabled/debug)" default:"enabled"`
	// Concurrency is the number of worker goroutines Run starts to process
	// queued requests.
	Concurrency int `help:"how many concurrent retain requests can be processed at the same time." default:"5"`
}
// Request contains all the info necessary to process a retain request.
type Request struct {
	// SatelliteID identifies the satellite the request belongs to. It is also
	// the key under which the request is queued, so at most one request per
	// satellite is pending at any time.
	SatelliteID storj.NodeID
	// CreatedBefore is the cutoff time sent with the request; the configured
	// MaxTimeSkew is subtracted from it before pieces are selected.
	CreatedBefore time.Time
	// Filter is the bloom filter handed to the piece store to decide which
	// pieces are candidates for the trash.
	Filter *bloomfilter.Filter
}
// Status is a type defining the enabled/disabled status of retain requests.
//
// The zero value is not a valid status: the constants below start at 1, and
// String reports any other value as "invalid".
type Status uint32

const (
	// Disabled means we do not do anything with retain requests.
	Disabled Status = iota + 1
	// Enabled means we fully enable retain requests and delete data not defined by bloom filter.
	Enabled
	// Debug means we partially enable retain requests, and print out pieces we should delete, without actually deleting them.
	Debug
)
// Set implements pflag.Value.
//
// It parses the textual form of a status ("disabled", "enabled" or "debug")
// and stores the result in v. Any other input leaves v untouched and returns
// an error describing the rejected value.
func (v *Status) Set(s string) error {
	var parsed Status

	switch s {
	case "debug":
		parsed = Debug
	case "enabled":
		parsed = Enabled
	case "disabled":
		parsed = Disabled
	default:
		return Error.New("invalid status %q", s)
	}

	*v = parsed
	return nil
}
// Type implements pflag.Value and reports the name of the flag's value type.
func (*Status) Type() string {
	const typeName = "storj.Status"
	return typeName
}
// String implements pflag.Value.
//
// It returns the textual form accepted by Set for the three known statuses
// and "invalid" for any other value, including the zero value.
func (v *Status) String() string {
	name := "invalid"

	switch *v {
	case Debug:
		name = "debug"
	case Enabled:
		name = "enabled"
	case Disabled:
		name = "disabled"
	}

	return name
}
// Service queues and processes retain requests from satellites.
//
// A Service holds a sync.Cond by value and therefore must not be copied after
// first use; always pass it around as *Service.
//
// architecture: Worker
type Service struct {
	log *zap.Logger
	config Config
	// cond is broadcast whenever the queue or the working set changes, or
	// when the service shuts down. Its lock (cond.L) guards queued, working
	// and started.
	cond sync.Cond
	// queued holds the pending request for each satellite. It is set to nil
	// when Run exits.
	queued map[storj.NodeID]Request
	// working is the set of satellites whose request is currently being
	// processed by a worker.
	working map[storj.NodeID]struct{}
	// group tracks the goroutines started by Run so that Run and Close can
	// wait for them.
	group errgroup.Group
	// closedOnce ensures closed is closed exactly once, whether shutdown is
	// triggered by Close or by context cancellation in Run.
	closedOnce sync.Once
	// closed is closed to signal shutdown.
	closed chan struct{}
	// started records that Run has been called; Run refuses to run twice.
	started bool
	// store is the piece store queried for trash candidates and used to move
	// pieces to the trash.
	store *pieces.Store
}
// NewService creates a new retain service.
//
// The returned service starts with an empty queue and no running workers;
// requests can be queued right away, but nothing is processed until Run is
// called.
func NewService(log *zap.Logger, store *pieces.Store, config Config) *Service {
	service := &Service{
		log:    log,
		store:  store,
		config: config,
		closed: make(chan struct{}),
		cond:   sync.Cond{L: &sync.Mutex{}},
	}
	service.queued = make(map[storj.NodeID]Request)
	service.working = make(map[storj.NodeID]struct{})
	return service
}
// Queue adds a retain request to the queue.
//
// Requests are keyed by satellite ID: if a request for the same satellite is
// already waiting, it is replaced by req. Waiting workers are woken up after
// the request is stored.
//
// true is returned if the request is queued and false is returned if it is
// rejected because the service has been closed.
func (s *Service) Queue(req Request) bool {
	s.cond.L.Lock()
	defer s.cond.L.Unlock()

	select {
	case <-s.closed:
		// Shutting down (or already shut down): accept nothing new.
		return false
	default:
		s.queued[req.SatelliteID] = req
		s.cond.Broadcast()
		return true
	}
}
// Run listens for queued retain requests and processes them as they come in.
//
// It starts one watcher goroutine plus config.Concurrency worker goroutines
// and blocks until all of them have exited, which happens after ctx is
// canceled or Close is called. Run may be called only once per Service: a
// second call, or a call after Close, returns an error without starting any
// goroutine. When Run exits, the queue is cleared.
//
// The returned error is whatever the errgroup reports for the goroutines.
//
// NOTE(review): the watcher cancels the sub-context when it exits, so a worker
// may observe ctx.Done() before s.closed and return ctx.Err() even when the
// shutdown was initiated by Close.
func (s *Service) Run(ctx context.Context) (err error) {
	defer mon.Task()(&ctx)(&err)
	// Hold the lock while we spawn the workers because a concurrent Close call
	// can race and wait for them. We later temporarily drop the lock while we
	// wait for the workers to exit.
	s.cond.L.Lock()
	defer s.cond.L.Unlock()
	// Ensure Run is only ever called once. Otherwise there are many subtle
	// concurrency bugs.
	if s.started {
		return Error.New("service already started")
	}
	s.started = true
	// Ensure Run doesn't start after the service is closed. Otherwise we may
	// leak some workers.
	select {
	case <-s.closed:
		return Error.New("service Closed")
	default:
	}
	// Create a sub-context that we can cancel.
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	// Start a watcher goroutine that does most of the work of Close in the
	// case the context is canceled, and broadcasts to anyone waiting on the
	// condition variable so that they re-check the state.
	s.group.Go(func() error {
		defer s.cond.Broadcast()
		defer cancel()
		select {
		case <-s.closed:
			return nil
		case <-ctx.Done():
			s.cond.L.Lock()
			s.closedOnce.Do(func() { close(s.closed) })
			s.cond.L.Unlock()
			return ctx.Err()
		}
	})
	for i := 0; i < s.config.Concurrency; i++ {
		s.group.Go(func() error {
			// Grab the lock to inspect shared state. Invariant: the lock is
			// held at the top of every loop iteration and when the worker
			// returns (the deferred Unlock releases it).
			s.cond.L.Lock()
			defer s.cond.L.Unlock()
			for {
				// If we have closed, exit.
				select {
				case <-ctx.Done():
					return ctx.Err()
				case <-s.closed:
					return nil
				default:
				}
				// Grab next item from queue.
				request, ok := s.next()
				if !ok {
					// Nothing in the queue that can be worked on: sleep until
					// the state changes (new item, finished item, shutdown).
					s.cond.Wait()
					continue
				}
				// Temporarily Unlock so others can work on the queue while
				// we're working on the retain request.
				s.cond.L.Unlock()
				// Signal to anyone waiting that the queue state changed.
				s.cond.Broadcast()
				// Run retaining process. Failures are logged only; they do
				// not stop the worker.
				err := s.retainPieces(ctx, request)
				if err != nil {
					s.log.Error("retain pieces failed", zap.Error(err))
				}
				// Mark the request as finished. Relock to maintain that
				// at the top of the for loop the lock is held.
				s.cond.L.Lock()
				s.finish(request)
				s.cond.Broadcast()
			}
		})
	}
	// Unlock while we wait for the workers to exit.
	s.cond.L.Unlock()
	err = s.group.Wait()
	s.cond.L.Lock()
	// Clear the queue after Wait has returned. We're sure no more entries
	// can be added after we acquire the mutex because Run spawned a watcher
	// goroutine that ensures the closed channel is closed before it exits,
	// and Queue rejects requests once closed is closed.
	s.queued = nil
	s.cond.Broadcast()
	return err
}
// next picks a queued request whose satellite is not currently being
// processed, removes it from the queue and marks the satellite as busy.
// The second result is false when no such request exists.
//
// The mutex must be held by the caller.
func (s *Service) next() (Request, bool) {
	for key, pending := range s.queued {
		// Only hand out work for satellites no other worker is busy with;
		// otherwise keep looking through the rest of the queue.
		if _, busy := s.working[pending.SatelliteID]; !busy {
			delete(s.queued, key)
			s.working[pending.SatelliteID] = struct{}{}
			return pending, true
		}
	}

	return Request{}, false
}
// finish removes the request's satellite from the working set so that a
// later request for the same satellite can be picked up.
//
// The mutex must be held by the caller.
func (s *Service) finish(request Request) {
	satelliteID := request.SatelliteID
	delete(s.working, satelliteID)
}
// Close signals shutdown, wakes up every goroutine waiting on the condition
// variable, and then blocks until all goroutines started by Run have exited.
// It is safe to call more than once and always returns nil.
func (s *Service) Close() error {
	// Mark the service as closed while holding the lock.
	func() {
		s.cond.L.Lock()
		defer s.cond.L.Unlock()
		s.closedOnce.Do(func() { close(s.closed) })
	}()

	s.cond.Broadcast()

	// The group's error is deliberately dropped: Run already returns it.
	_ = s.group.Wait()

	return nil
}
// TestWaitUntilEmpty blocks until there is neither a queued request nor a
// request being worked on. Because Run empties the queue on exit, this also
// returns once Run has finished and all workers are idle.
func (s *Service) TestWaitUntilEmpty() {
	s.cond.L.Lock()
	defer s.cond.L.Unlock()

	for {
		if len(s.queued) == 0 && len(s.working) == 0 {
			return
		}
		// Sleep until someone broadcasts a state change, then re-check.
		s.cond.Wait()
	}
}
// Status reports the retain status this service was configured with.
func (s *Service) Status() Status {
	configured := s.config.Status
	return configured
}
// retainPieces executes a single retain (garbage collection) request.
//
// It asks the piece store for the satellite's trash candidates, based on the
// request's bloom filter and its CreatedBefore time reduced by MaxTimeSkew,
// and then handles each candidate according to the configured Status:
//   - Disabled: returns nil immediately without touching the store.
//   - Debug: logs every candidate but does not trash it.
//   - Enabled: moves every candidate to the trash.
//
// A failure to trash an individual piece is logged and that piece is skipped,
// so one bad piece neither aborts the rest of the run nor suppresses the
// metrics and the summary log. The loop stops early only when ctx is done,
// because then every remaining attempt would fail as well.
//
// An error is returned if the candidate list cannot be obtained or if ctx is
// done while trashing; otherwise nil.
func (s *Service) retainPieces(ctx context.Context, req Request) (err error) {
	// if retain status is disabled, return immediately
	if s.config.Status == Disabled {
		return nil
	}

	defer mon.Task()(&ctx, req.SatelliteID, req.CreatedBefore)(&err)

	numDeleted := 0
	satelliteID := req.SatelliteID
	filter := req.Filter

	// subtract some time to leave room for clock difference between the satellite and storage node
	createdBefore := req.CreatedBefore.Add(-s.config.MaxTimeSkew)

	started := time.Now().UTC()
	filterHashCount, _ := filter.Parameters()
	mon.IntVal("garbage_collection_created_before").Observe(createdBefore.Unix())
	mon.IntVal("garbage_collection_filter_hash_count").Observe(int64(filterHashCount))
	mon.IntVal("garbage_collection_filter_size").Observe(filter.Size())
	mon.IntVal("garbage_collection_started").Observe(started.Unix())

	s.log.Info("Prepared to run a Retain request.",
		zap.Time("Created Before", createdBefore),
		zap.Int64("Filter Size", filter.Size()),
		zap.Stringer("Satellite ID", satelliteID))

	pieceIDs, piecesCount, piecesSkipped, err := s.store.SatellitePiecesToTrash(ctx, satelliteID, createdBefore, filter)
	if err != nil {
		return Error.Wrap(err)
	}

	piecesToDeleteCount := len(pieceIDs)

	for _, pieceID := range pieceIDs {
		s.log.Debug("About to move piece to trash",
			zap.Stringer("Satellite ID", satelliteID),
			zap.Stringer("Piece ID", pieceID),
			zap.String("Status", s.config.Status.String()))

		// if retain status is enabled, delete pieceid
		if s.config.Status == Enabled {
			// Use a local error so a per-piece failure never leaks into the
			// named return value observed by the deferred monitoring task.
			if trashErr := s.trash(ctx, satelliteID, pieceID); trashErr != nil {
				s.log.Warn("failed to delete piece",
					zap.Stringer("Satellite ID", satelliteID),
					zap.Stringer("Piece ID", pieceID),
					zap.Error(trashErr))

				// When the context is done, every further attempt would fail
				// too; stop instead of logging a warning for each piece.
				if ctxErr := ctx.Err(); ctxErr != nil {
					return Error.Wrap(ctxErr)
				}

				// Otherwise skip only this piece and keep going.
				continue
			}
		}

		// In Debug mode this counts pieces that would have been trashed.
		numDeleted++
	}

	mon.IntVal("garbage_collection_pieces_count").Observe(piecesCount)
	mon.IntVal("garbage_collection_pieces_skipped").Observe(piecesSkipped)
	mon.IntVal("garbage_collection_pieces_to_delete_count").Observe(int64(piecesToDeleteCount))
	mon.IntVal("garbage_collection_pieces_deleted").Observe(int64(numDeleted))
	mon.DurationVal("garbage_collection_loop_duration").Observe(time.Now().UTC().Sub(started))

	s.log.Info("Moved pieces to trash during retain", zap.Int("num deleted", numDeleted), zap.String("Retain Status", s.config.Status.String()))

	return nil
}
// trash moves a single piece of the given satellite to the trash via the
// piece store. It exists as a separate method so that the operation, and any
// error it produces during garbage collection, is tracked as its own
// monitored task.
func (s *Service) trash(ctx context.Context, satelliteID storj.NodeID, pieceID storj.PieceID) (err error) {
	defer mon.Task()(&ctx, satelliteID)(&err)

	err = s.store.Trash(ctx, satelliteID, pieceID)
	return err
}
// HowManyQueued peeks at the number of bloom filters queued.
//
// The queue is mutated by Queue and by the workers, and is set to nil when
// Run exits, always under the condition variable's lock. The lock is taken
// here as well so that reading the length does not race with those writers.
// After Run has exited the result is 0.
func (s *Service) HowManyQueued() int {
	s.cond.L.Lock()
	defer s.cond.L.Unlock()

	return len(s.queued)
}