-
Notifications
You must be signed in to change notification settings - Fork 402
/
cache.go
494 lines (431 loc) · 15.5 KB
/
cache.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package pieces
import (
"context"
"fmt"
"sync"
"time"
"github.com/zeebo/errs"
"go.uber.org/zap"
"storj.io/common/storj"
"storj.io/common/sync2"
"storj.io/storj/storage"
)
// CacheService updates the space used cache.
//
// architecture: Chore
type CacheService struct {
	log        *zap.Logger
	usageCache *BlobsUsageCache // in-memory totals kept in sync with disk activity
	store      *Store           // provides the disk walk and the persistent spaceUsedDB
	Loop       *sync2.Cycle     // drives the periodic persist of cache totals to the DB
	// InitFence is released once the cache's Run method returns or when it has
	// completed its first loop. This is useful for testing.
	InitFence sync2.Fence
}
// NewService creates a new cache service that updates the space usage cache on startup and syncs the cache values to
// persistent storage on an interval.
func NewService(log *zap.Logger, usageCache *BlobsUsageCache, pieces *Store, interval time.Duration) *CacheService {
	service := &CacheService{
		log:        log,
		usageCache: usageCache,
		store:      pieces,
	}
	service.Loop = sync2.NewCycle(interval)
	return service
}
// Run recalculates the space used cache once and also runs a loop to sync the space used cache
// to persistent storage on an interval.
func (service *CacheService) Run(ctx context.Context) (err error) {
	defer mon.Task()(&ctx)(&err)
	// Release the fence even on an early error return so anything waiting on
	// initialization is never blocked forever.
	defer service.InitFence.Release()

	// Snapshot the totals before the disk walk below; the walk can take a long
	// time, and Recalculate uses this snapshot to estimate activity that
	// happened while iterating.
	totalsAtStart := service.usageCache.copyCacheTotals()

	// recalculate the cache once
	piecesTotal, piecesContentSize, totalsBySatellite, err := service.store.SpaceUsedTotalAndBySatellite(ctx)
	if err != nil {
		service.log.Error("error getting current used space: ", zap.Error(err))
		return err
	}
	trashTotal, err := service.usageCache.Blobs.SpaceUsedForTrash(ctx)
	if err != nil {
		service.log.Error("error getting current used space for trash: ", zap.Error(err))
		return err
	}
	service.usageCache.Recalculate(
		piecesTotal,
		totalsAtStart.piecesTotal,
		piecesContentSize,
		totalsAtStart.piecesContentSize,
		trashTotal,
		totalsAtStart.trashTotal,
		totalsBySatellite,
		totalsAtStart.spaceUsedBySatellite,
	)

	if err = service.store.spaceUsedDB.Init(ctx); err != nil {
		service.log.Error("error during init space usage db: ", zap.Error(err))
		return err
	}

	return service.Loop.Run(ctx, func(ctx context.Context) (err error) {
		defer mon.Task()(&ctx)(&err)

		// on a loop sync the cache values to the db so that we have the them saved
		// in the case that the storagenode restarts
		//
		// NOTE: a persist failure is logged but deliberately not returned —
		// the shadowed err below never reaches the `return err` of the outer
		// closure, so the sync loop keeps running and retries next interval.
		if err := service.PersistCacheTotals(ctx); err != nil {
			service.log.Error("error persisting cache totals to the database: ", zap.Error(err))
		}
		// Release after the first completed loop iteration (see InitFence doc).
		service.InitFence.Release()
		return err
	})
}
// PersistCacheTotals saves the current totals of the space used cache to the database
// so that if the storagenode restarts it can retrieve the latest space used
// values without needing to recalculate since that could take a long time.
func (service *CacheService) PersistCacheTotals(ctx context.Context) error {
	cache := service.usageCache
	// Hold the cache lock for the whole write so the three persisted values
	// form a consistent snapshot.
	cache.mu.Lock()
	defer cache.mu.Unlock()

	db := service.store.spaceUsedDB
	if err := db.UpdatePieceTotals(ctx, cache.piecesTotal, cache.piecesContentSize); err != nil {
		return err
	}
	if err := db.UpdatePieceTotalsForAllSatellites(ctx, cache.spaceUsedBySatellite); err != nil {
		return err
	}
	return db.UpdateTrashTotal(ctx, cache.trashTotal)
}
// Init initializes the space used cache with the most recent values that were stored persistently.
//
// It reads the piece totals, the per-satellite totals, and the trash total from
// spaceUsedDB and seeds the in-memory cache with them; any read error is logged
// and returned, leaving the cache unmodified.
func (service *CacheService) Init(ctx context.Context) (err error) {
	piecesTotal, piecesContentSize, err := service.store.spaceUsedDB.GetPieceTotals(ctx)
	if err != nil {
		service.log.Error("CacheServiceInit error during initializing space usage cache GetTotal:", zap.Error(err))
		return err
	}

	totalsBySatellite, err := service.store.spaceUsedDB.GetPieceTotalsForAllSatellites(ctx)
	if err != nil {
		service.log.Error("CacheServiceInit error during initializing space usage cache GetTotalsForAllSatellites:", zap.Error(err))
		return err
	}

	trashTotal, err := service.store.spaceUsedDB.GetTrashTotal(ctx)
	if err != nil {
		service.log.Error("CacheServiceInit error during initializing space usage cache GetTrashTotal:", zap.Error(err))
		return err
	}

	// All reads succeeded; install the persisted values atomically under the
	// cache's own lock.
	service.usageCache.init(piecesTotal, piecesContentSize, trashTotal, totalsBySatellite)
	return nil
}
// Close stops the background sync loop; it always returns nil.
func (service *CacheService) Close() (err error) {
	service.Loop.Close()
	return nil
}
// BlobsUsageCache is a blob storage with a cache for storing
// totals of current space used.
//
// The following names have the following meaning:
// - piecesTotal: the total space used by pieces, including headers
// - piecesContentSize: the space used by piece content, not including headers
// - trashTotal: the total space used in the trash, including headers
//
// pieceTotal and pieceContentSize are the corollary for a single file.
//
// architecture: Database
type BlobsUsageCache struct {
	storage.Blobs // the wrapped blob store; cache methods intercept mutating calls

	log *zap.Logger

	mu sync.Mutex // guards all of the totals below
	// cached totals, updated incrementally on every Delete/Trash/etc. call
	piecesTotal          int64
	piecesContentSize    int64
	trashTotal           int64
	spaceUsedBySatellite map[storj.NodeID]SatelliteUsage
}
// NewBlobsUsageCache creates a new disk blob store with a space used cache.
// All totals start at zero; the per-satellite map is pre-created so Update can
// write into it immediately.
func NewBlobsUsageCache(log *zap.Logger, blob storage.Blobs) *BlobsUsageCache {
	cache := &BlobsUsageCache{
		Blobs: blob,
		log:   log,
	}
	cache.spaceUsedBySatellite = make(map[storj.NodeID]SatelliteUsage)
	return cache
}
// NewBlobsUsageCacheTest creates a new disk blob store with a space used cache,
// pre-seeded with the given totals. Intended for tests.
func NewBlobsUsageCacheTest(log *zap.Logger, blob storage.Blobs, piecesTotal, piecesContentSize, trashTotal int64, spaceUsedBySatellite map[storj.NodeID]SatelliteUsage) *BlobsUsageCache {
	cache := &BlobsUsageCache{
		Blobs: blob,
		log:   log,
	}
	cache.piecesTotal = piecesTotal
	cache.piecesContentSize = piecesContentSize
	cache.trashTotal = trashTotal
	cache.spaceUsedBySatellite = spaceUsedBySatellite
	return cache
}
// init overwrites all cached totals in one critical section.
func (blobs *BlobsUsageCache) init(pieceTotal, contentSize, trashTotal int64, totalsBySatellite map[storj.NodeID]SatelliteUsage) {
	blobs.mu.Lock()
	blobs.piecesTotal = pieceTotal
	blobs.piecesContentSize = contentSize
	blobs.trashTotal = trashTotal
	blobs.spaceUsedBySatellite = totalsBySatellite
	blobs.mu.Unlock()
}
// SpaceUsedBySatellite returns the current total space used for a specific
// satellite for all pieces. An unknown satellite yields zero values (the map's
// zero SatelliteUsage).
func (blobs *BlobsUsageCache) SpaceUsedBySatellite(ctx context.Context, satelliteID storj.NodeID) (piecesTotal int64, piecesContentSize int64, err error) {
	blobs.mu.Lock()
	defer blobs.mu.Unlock()
	usage := blobs.spaceUsedBySatellite[satelliteID]
	return usage.Total, usage.ContentSize, nil
}
// SpaceUsedForPieces returns the current total used space for all pieces:
// total including headers, then content-only size.
func (blobs *BlobsUsageCache) SpaceUsedForPieces(ctx context.Context) (int64, int64, error) {
	blobs.mu.Lock()
	total, contentSize := blobs.piecesTotal, blobs.piecesContentSize
	blobs.mu.Unlock()
	return total, contentSize, nil
}
// SpaceUsedForTrash returns the current total used space for the trash dir.
func (blobs *BlobsUsageCache) SpaceUsedForTrash(ctx context.Context) (int64, error) {
	blobs.mu.Lock()
	trash := blobs.trashTotal
	blobs.mu.Unlock()
	return trash, nil
}
// Delete gets the size of the piece that is going to be deleted then deletes it and
// updates the space used cache accordingly.
//
// The size is read before the delete so the cache can be decremented by the
// exact amount of space freed.
func (blobs *BlobsUsageCache) Delete(ctx context.Context, blobRef storage.BlobRef) error {
	pieceTotal, pieceContentSize, err := blobs.pieceSizes(ctx, blobRef)
	if err != nil {
		return Error.Wrap(err)
	}

	if err := blobs.Blobs.Delete(ctx, blobRef); err != nil {
		return Error.Wrap(err)
	}

	satelliteID, err := storj.NodeIDFromBytes(blobRef.Namespace)
	if err != nil {
		// Wrap for consistency with every other error path in this method
		// (and with the parallel Trash method).
		return Error.Wrap(err)
	}

	// The piece is gone from disk; remove it from the totals. Trash is
	// unaffected by a direct delete.
	blobs.Update(ctx, satelliteID, -pieceTotal, -pieceContentSize, 0)
	blobs.log.Debug("deleted piece", zap.String("Satellite ID", satelliteID.String()), zap.Int64("disk space freed in bytes", pieceContentSize))
	return nil
}
// pieceSizes stats the referenced blob and returns its full size (including
// header) and its content-only size.
func (blobs *BlobsUsageCache) pieceSizes(ctx context.Context, blobRef storage.BlobRef) (pieceTotal int64, pieceContentSize int64, err error) {
	info, err := blobs.Stat(ctx, blobRef)
	if err != nil {
		return 0, 0, err
	}
	access, err := newStoredPieceAccess(nil, info)
	if err != nil {
		return 0, 0, err
	}
	return access.Size(ctx)
}
// Update applies the given deltas to the cached totals, clamping every total
// at zero (a negative total indicates drift and is logged, then reset).
func (blobs *BlobsUsageCache) Update(ctx context.Context, satelliteID storj.NodeID, piecesTotalDelta, piecesContentSizeDelta, trashDelta int64) {
	blobs.mu.Lock()
	defer blobs.mu.Unlock()

	blobs.piecesTotal += piecesTotalDelta
	blobs.piecesContentSize += piecesContentSizeDelta
	blobs.trashTotal += trashDelta

	blobs.ensurePositiveCacheValue(&blobs.piecesTotal, "piecesTotal")
	blobs.ensurePositiveCacheValue(&blobs.piecesContentSize, "piecesContentSize")
	blobs.ensurePositiveCacheValue(&blobs.trashTotal, "trashTotal")

	current := blobs.spaceUsedBySatellite[satelliteID]
	updated := SatelliteUsage{
		Total:       current.Total + piecesTotalDelta,
		ContentSize: current.ContentSize + piecesContentSizeDelta,
	}
	blobs.ensurePositiveCacheValue(&updated.Total, "satPiecesTotal")
	blobs.ensurePositiveCacheValue(&updated.ContentSize, "satPiecesContentSize")
	blobs.spaceUsedBySatellite[satelliteID] = updated
}
// ensurePositiveCacheValue clamps a cached total at zero, logging whenever a
// negative value had to be corrected.
func (blobs *BlobsUsageCache) ensurePositiveCacheValue(value *int64, name string) {
	if *value < 0 {
		blobs.log.Error(fmt.Sprintf("%s < 0", name), zap.Int64(name, *value))
		*value = 0
	}
}
// Trash moves the ref to the trash and updates the cache.
func (blobs *BlobsUsageCache) Trash(ctx context.Context, blobRef storage.BlobRef) error {
	pieceTotal, pieceContentSize, err := blobs.pieceSizes(ctx, blobRef)
	if err != nil {
		return Error.Wrap(err)
	}

	if err := blobs.Blobs.Trash(ctx, blobRef); err != nil {
		return Error.Wrap(err)
	}

	satelliteID, err := storj.NodeIDFromBytes(blobRef.Namespace)
	if err != nil {
		return Error.Wrap(err)
	}

	// The bytes leave the piece totals and enter the trash total.
	blobs.Update(ctx, satelliteID, -pieceTotal, -pieceContentSize, pieceTotal)
	return nil
}
// EmptyTrash empties the trash and updates the cache, returning the number of
// bytes freed and the keys that were removed.
func (blobs *BlobsUsageCache) EmptyTrash(ctx context.Context, namespace []byte, trashedBefore time.Time) (int64, [][]byte, error) {
	satelliteID, err := storj.NodeIDFromBytes(namespace)
	if err != nil {
		return 0, nil, err
	}

	bytesEmptied, keys, err := blobs.Blobs.EmptyTrash(ctx, namespace, trashedBefore)
	if err != nil {
		return 0, nil, err
	}

	// Only the trash total shrinks; piece totals were already decremented
	// when the pieces were trashed.
	blobs.Update(ctx, satelliteID, 0, 0, -bytesEmptied)
	return bytesEmptied, keys, nil
}
// RestoreTrash restores the trash for the namespace and updates the cache.
// Size lookups that fail for individual restored keys are accumulated and
// returned alongside the full list of restored keys.
func (blobs *BlobsUsageCache) RestoreTrash(ctx context.Context, namespace []byte) ([][]byte, error) {
	satelliteID, err := storj.NodeIDFromBytes(namespace)
	if err != nil {
		return nil, err
	}

	restored, err := blobs.Blobs.RestoreTrash(ctx, namespace)
	if err != nil {
		return nil, err
	}

	for _, key := range restored {
		ref := storage.BlobRef{
			Namespace: namespace,
			Key:       key,
		}
		pieceTotal, pieceContentSize, sizeErr := blobs.pieceSizes(ctx, ref)
		if sizeErr != nil {
			// Remember the failure but keep accounting for the other keys.
			err = errs.Combine(err, sizeErr)
			continue
		}
		// Move the bytes back out of the trash total into the piece totals.
		blobs.Update(ctx, satelliteID, pieceTotal, pieceContentSize, -pieceTotal)
	}

	return restored, err
}
// copyCacheTotals takes a consistent snapshot of the numeric totals under the
// lock. The embedded Blobs and the logger are deliberately left out of the
// copy; only the counters and a cloned per-satellite map are returned.
func (blobs *BlobsUsageCache) copyCacheTotals() BlobsUsageCache {
	blobs.mu.Lock()
	defer blobs.mu.Unlock()

	perSatellite := make(map[storj.NodeID]SatelliteUsage, len(blobs.spaceUsedBySatellite))
	for id, usage := range blobs.spaceUsedBySatellite {
		perSatellite[id] = usage
	}
	return BlobsUsageCache{
		piecesTotal:          blobs.piecesTotal,
		piecesContentSize:    blobs.piecesContentSize,
		trashTotal:           blobs.trashTotal,
		spaceUsedBySatellite: perSatellite,
	}
}
// Recalculate estimates new totals for the space used cache. In order to get new totals for the
// space used cache, we had to iterate over all the pieces on disk. Since that can potentially take
// a long time, here we need to check if we missed any additions/deletions while we were iterating and
// estimate how many bytes missed then add those to the space used result of iteration.
//
// The *AtStart arguments are the cache totals snapshotted before the iteration
// began; the plain arguments are the totals produced by the iteration itself.
func (blobs *BlobsUsageCache) Recalculate(
	piecesTotal,
	piecesTotalAtStart,
	piecesContentSize,
	piecesContentSizeAtStart,
	trashTotal,
	trashTotalAtStart int64,
	totalsBySatellite,
	totalsBySatelliteAtStart map[storj.NodeID]SatelliteUsage,
) {
	// Snapshot the totals as they stand now that the iteration is done; the
	// difference between this and the at-start snapshot is the activity that
	// happened while iterating, which estimate() folds in (half of the delta).
	totalsAtEnd := blobs.copyCacheTotals()

	estimatedPiecesTotal := estimate(
		piecesTotal,
		piecesTotalAtStart,
		totalsAtEnd.piecesTotal,
	)
	estimatedTotalTrash := estimate(
		trashTotal,
		trashTotalAtStart,
		totalsAtEnd.trashTotal,
	)
	estimatedPiecesContentSize := estimate(
		piecesContentSize,
		piecesContentSizeAtStart,
		totalsAtEnd.piecesContentSize,
	)

	// Estimate per-satellite totals for every satellite the iteration saw.
	var estimatedTotalsBySatellite = map[storj.NodeID]SatelliteUsage{}
	for ID, values := range totalsBySatellite {
		estimatedTotal := estimate(
			values.Total,
			totalsBySatelliteAtStart[ID].Total,
			totalsAtEnd.spaceUsedBySatellite[ID].Total,
		)
		estimatedPiecesContentSize := estimate(
			values.ContentSize,
			totalsBySatelliteAtStart[ID].ContentSize,
			totalsAtEnd.spaceUsedBySatellite[ID].ContentSize,
		)
		// if the estimatedTotal is zero then there is no data stored
		// for this satelliteID so don't add it to the cache
		if estimatedTotal == 0 && estimatedPiecesContentSize == 0 {
			continue
		}
		estimatedTotalsBySatellite[ID] = SatelliteUsage{
			Total:       estimatedTotal,
			ContentSize: estimatedPiecesContentSize,
		}
	}

	// find any saIDs that are in totalsAtEnd but not in totalsBySatellite
	// (satellites whose pieces only appeared after the iteration passed them);
	// estimate them from a zero iterated total.
	missedWhenIterationEnded := getMissed(totalsAtEnd.spaceUsedBySatellite,
		totalsBySatellite,
	)
	if len(missedWhenIterationEnded) > 0 {
		for ID := range missedWhenIterationEnded {
			estimatedTotal := estimate(
				0,
				totalsBySatelliteAtStart[ID].Total,
				totalsAtEnd.spaceUsedBySatellite[ID].Total,
			)
			estimatedPiecesContentSize := estimate(
				0,
				totalsBySatelliteAtStart[ID].ContentSize,
				totalsAtEnd.spaceUsedBySatellite[ID].ContentSize,
			)
			if estimatedTotal == 0 && estimatedPiecesContentSize == 0 {
				continue
			}
			estimatedTotalsBySatellite[ID] = SatelliteUsage{
				Total:       estimatedTotal,
				ContentSize: estimatedPiecesContentSize,
			}
		}
	}

	// Install the estimated totals as the new cache state.
	blobs.mu.Lock()
	blobs.piecesTotal = estimatedPiecesTotal
	blobs.piecesContentSize = estimatedPiecesContentSize
	blobs.trashTotal = estimatedTotalTrash
	blobs.spaceUsedBySatellite = estimatedTotalsBySatellite
	blobs.mu.Unlock()
}
// estimate reconciles a freshly iterated space-used total with the cache
// activity observed during the iteration. If the iterated total already
// matches the cache at iteration end, it is used as-is; otherwise half of the
// delta that accumulated during the iteration is folded in, on the assumption
// that half of the missed writes/deletes happened before the iterator reached
// them and half after. The result is never negative.
func estimate(newSpaceUsedTotal, totalAtIterationStart, totalAtIterationEnd int64) int64 {
	estimated := newSpaceUsedTotal
	if newSpaceUsedTotal != totalAtIterationEnd {
		estimated += (totalAtIterationEnd - totalAtIterationStart) / 2
	}
	if estimated < 0 {
		return 0
	}
	return estimated
}
// getMissed returns the entries present in endTotals but absent from newTotals.
func getMissed(endTotals, newTotals map[storj.NodeID]SatelliteUsage) map[storj.NodeID]SatelliteUsage {
	missed := make(map[storj.NodeID]SatelliteUsage)
	for id, vals := range endTotals {
		if _, found := newTotals[id]; found {
			continue
		}
		missed[id] = vals
	}
	return missed
}
// Close satisfies the pieces interface. The cache itself has nothing to
// release, so this is a no-op that always returns nil.
func (blobs *BlobsUsageCache) Close() error {
	return nil
}
// TestCreateV0 creates a new V0 blob that can be written. This is only appropriate in test situations.
func (blobs *BlobsUsageCache) TestCreateV0(ctx context.Context, ref storage.BlobRef) (_ storage.BlobWriter, err error) {
	// Unchecked type assertion: panics if the wrapped Blobs implementation
	// does not expose the test-only TestCreateV0 hook, which is acceptable
	// since this method is only called from tests.
	fStore := blobs.Blobs.(interface {
		TestCreateV0(ctx context.Context, ref storage.BlobRef) (_ storage.BlobWriter, err error)
	})
	return fStore.TestCreateV0(ctx, ref)
}