Skip to content

Commit a7d8322

Browse files
kaloyan-raev authored and Storj Robot committed
storagenode/collector: reverse flat file store deletion order
We have repeatedly killed expiration jobs that were running the old implementation, which left behind many hour files that had already been processed. Running the jobs again forces them to process these files again, which significantly delays the deletion of the remaining expired pieces. By reversing the order, the job will prioritize the expired pieces that have not been deleted yet. Change-Id: I95eaafe8ecc603fd939a34f91467b98766b715dd
1 parent 12b96a6 commit a7d8322

File tree

9 files changed

+198
-85
lines changed

9 files changed

+198
-85
lines changed

storagenode/collector/service.go

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ type Config struct {
2525
ExpirationGracePeriod time.Duration `help:"how long should the collector wait before deleting expired pieces. Should not be less than 30 min since nodes are allowed to be 30 mins out of sync with the satellite." default:"1h0m0s"`
2626
ExpirationBatchSize int `help:"how many expired pieces to delete in one batch. If <= 0, all expired pieces will be deleted in one batch. (ignored by flat file store)" default:"1000"`
2727
FlatFileBatchLimit int `help:"how many per hour flat files can be deleted in one batch." default:"5"`
28+
ReverseOrder bool `help:"delete expired pieces in reverse order (recently expired first)" default:"false"`
2829
}
2930

3031
// Service implements collecting expired pieces on the storage node.
@@ -37,7 +38,7 @@ type Service struct {
3738

3839
Loop *sync2.Cycle
3940

40-
limits pieces.ExpirationLimits
41+
opts pieces.ExpirationOptions
4142
expirationGracePeriod time.Duration
4243
}
4344

@@ -48,14 +49,16 @@ func NewService(log *zap.Logger, pieceStore *pieces.Store, usedSerials *usedseri
4849
config.ExpirationGracePeriod = 1 * time.Hour
4950
}
5051

52+
opts := pieces.DefaultExpirationOptions()
53+
opts.Limits.BatchSize = config.ExpirationBatchSize
54+
opts.Limits.FlatFileLimit = config.FlatFileBatchLimit
55+
opts.ReverseOrder = config.ReverseOrder
56+
5157
return &Service{
52-
log: log,
53-
pieces: pieceStore,
54-
usedSerials: usedSerials,
55-
limits: pieces.ExpirationLimits{
56-
BatchSize: config.ExpirationBatchSize,
57-
FlatFileLimit: config.FlatFileBatchLimit,
58-
},
58+
log: log,
59+
pieces: pieceStore,
60+
usedSerials: usedSerials,
61+
opts: opts,
5962
expirationGracePeriod: config.ExpirationGracePeriod,
6063
Loop: sync2.NewCycle(config.Interval),
6164
}
@@ -100,7 +103,7 @@ func (service *Service) Collect(ctx context.Context, now time.Time) (err error)
100103
}()
101104

102105
for {
103-
infoLists, err := service.pieces.GetExpiredBatchSkipV0(ctx, now, service.limits)
106+
infoLists, err := service.pieces.GetExpiredBatchSkipV0(ctx, now, service.opts)
104107
if err != nil {
105108
return errs.Wrap(err)
106109
}
@@ -128,7 +131,7 @@ func (service *Service) Collect(ctx context.Context, now time.Time) (err error)
128131
}
129132

130133
// delete the batch from the database
131-
if deleteErr := service.pieces.DeleteExpiredBatchSkipV0(ctx, now, service.limits); deleteErr != nil {
134+
if deleteErr := service.pieces.DeleteExpiredBatchSkipV0(ctx, now, service.opts); deleteErr != nil {
132135
service.log.Error("error during deleting expired pieces: ", zap.Error(deleteErr))
133136
return errs.Wrap(deleteErr)
134137
}

storagenode/pieces/combined_expiration_store.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -36,15 +36,15 @@ func (c *CombinedExpirationStore) SetExpiration(ctx context.Context, satellite s
3636
}
3737

3838
// GetExpired returns the expired pieces.
39-
func (c *CombinedExpirationStore) GetExpired(ctx context.Context, expiresBefore time.Time, limits ExpirationLimits) ([]*ExpiredInfoRecords, error) {
39+
func (c *CombinedExpirationStore) GetExpired(ctx context.Context, expiresBefore time.Time, opts ExpirationOptions) ([]*ExpiredInfoRecords, error) {
4040
var errList errs.Group
41-
expired, err := c.flatFileStore.GetExpired(ctx, expiresBefore, limits)
41+
expired, err := c.flatFileStore.GetExpired(ctx, expiresBefore, opts)
4242
if err != nil {
4343
errList.Add(err)
4444
}
4545

4646
if c.chainedDB != nil {
47-
chainedExpired, err := c.chainedDB.GetExpired(ctx, expiresBefore, limits)
47+
chainedExpired, err := c.chainedDB.GetExpired(ctx, expiresBefore, opts)
4848
if err != nil {
4949
errList.Add(err)
5050
}
@@ -72,17 +72,17 @@ func (c *CombinedExpirationStore) DeleteExpirations(ctx context.Context, expires
7272
}
7373

7474
// DeleteExpirationsBatch deletes the expirations for the given time.
75-
func (c *CombinedExpirationStore) DeleteExpirationsBatch(ctx context.Context, now time.Time, limits ExpirationLimits) error {
75+
func (c *CombinedExpirationStore) DeleteExpirationsBatch(ctx context.Context, now time.Time, opts ExpirationOptions) error {
7676
var errList errs.Group
7777

78-
c.log.Debug("deleting expired pieces from flat file store", zap.Time("expiresAt", now), zap.Any("limits", limits))
79-
if err := c.flatFileStore.DeleteExpirationsBatch(ctx, now, limits); err != nil {
78+
c.log.Debug("deleting expired pieces from flat file store", zap.Time("expiresAt", now), zap.Any("opts", opts))
79+
if err := c.flatFileStore.DeleteExpirationsBatch(ctx, now, opts); err != nil {
8080
errList.Add(err)
8181
}
8282

8383
if c.chainedDB != nil {
84-
c.log.Debug("deleting expired pieces from flat file store", zap.Time("expiresAt", now), zap.Any("limits", limits))
85-
errList.Add(c.chainedDB.DeleteExpirationsBatch(ctx, now, limits))
84+
c.log.Debug("deleting expired pieces from flat file store", zap.Time("expiresAt", now), zap.Any("opts", opts))
85+
errList.Add(c.chainedDB.DeleteExpirationsBatch(ctx, now, opts))
8686
}
8787

8888
return errList.Err()

storagenode/pieces/combined_expiration_store_test.go

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ func TestPieceExpirationCombinedStore(t *testing.T) {
5050
require.NoError(t, err)
5151

5252
// check to see that values are in both backends
53-
expirationLists, err := combinedStore.GetExpired(ctx, now.Add(72*time.Hour), pieces.DefaultExpirationLimits())
53+
expirationLists, err := combinedStore.GetExpired(ctx, now.Add(72*time.Hour), pieces.DefaultExpirationOptions())
5454
require.NoError(t, err)
5555
expirationInfos := pieces.FlattenExpirationInfoLists(expirationLists)
5656
require.Len(t, expirationInfos, 4)
@@ -71,14 +71,13 @@ func TestPieceExpirationCombinedStore(t *testing.T) {
7171
})
7272

7373
// delete up to now+36h
74-
err = combinedStore.DeleteExpirationsBatch(ctx, now.Add(36*time.Hour), pieces.ExpirationLimits{
75-
BatchSize: 10,
76-
FlatFileLimit: -1,
77-
})
74+
opts := pieces.DefaultExpirationOptions()
75+
opts.Limits.BatchSize = 10
76+
err = combinedStore.DeleteExpirationsBatch(ctx, now.Add(36*time.Hour), opts)
7877
require.NoError(t, err)
7978

8079
// piece1 should be deleted from both databases, and not the others
81-
expirationLists, err = combinedStore.GetExpired(ctx, now.Add(72*time.Hour), pieces.DefaultExpirationLimits())
80+
expirationLists, err = combinedStore.GetExpired(ctx, now.Add(72*time.Hour), pieces.DefaultExpirationOptions())
8281
require.NoError(t, err)
8382
expirationInfos = pieces.FlattenExpirationInfoLists(expirationLists)
8483
require.Len(t, expirationInfos, 2)
@@ -93,7 +92,7 @@ func TestPieceExpirationCombinedStore(t *testing.T) {
9392
})
9493

9594
// querying sqlite3 db only
96-
expirationLists, err = sqliteDB.GetExpired(ctx, now.Add(72*time.Hour), pieces.DefaultExpirationLimits())
95+
expirationLists, err = sqliteDB.GetExpired(ctx, now.Add(72*time.Hour), pieces.DefaultExpirationOptions())
9796
require.NoError(t, err)
9897
expirationInfos = pieces.FlattenExpirationInfoLists(expirationLists)
9998
require.Len(t, expirationInfos, 1)

storagenode/pieces/pieceexpiration.go

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ var monGetExpired = mon.Task()
136136
// GetExpired gets piece IDs that expire or have expired before the given time.
137137
// It returns the lastExpiration time, which can be used to delete all expirations
138138
// up to that time.
139-
func (peStore *PieceExpirationStore) GetExpired(ctx context.Context, now time.Time, limits ExpirationLimits) (infos []*ExpiredInfoRecords, err error) {
139+
func (peStore *PieceExpirationStore) GetExpired(ctx context.Context, now time.Time, opts ExpirationOptions) (infos []*ExpiredInfoRecords, err error) {
140140
defer monGetExpired(&ctx)(&err)
141141

142142
satellites, err := peStore.getSatellitesWithExpirations(ctx)
@@ -145,7 +145,7 @@ func (peStore *PieceExpirationStore) GetExpired(ctx context.Context, now time.Ti
145145
}
146146
var errList errs.Group
147147
for _, satelliteID := range satellites {
148-
satelliteInfos, err := peStore.GetExpiredForSatellite(ctx, satelliteID, now, limits)
148+
satelliteInfos, err := peStore.GetExpiredForSatellite(ctx, satelliteID, now, opts)
149149
if err != nil {
150150
errList.Add(ErrPieceExpiration.Wrap(err))
151151
}
@@ -161,13 +161,13 @@ var monDeleteExpirations = mon.Task()
161161
func (peStore *PieceExpirationStore) DeleteExpirations(ctx context.Context, expiresAt time.Time) (err error) {
162162
defer monDeleteExpirations(&ctx)(&err)
163163

164-
errList := peStore.deleteExpirationsFromAllSatellites(ctx, expiresAt, -1)
164+
errList := peStore.deleteExpirationsFromAllSatellites(ctx, expiresAt, DefaultExpirationOptions())
165165
return errList.Err()
166166
}
167167

168168
var monDeleteExpirationsFromAllSatellites = mon.Task()
169169

170-
func (peStore *PieceExpirationStore) deleteExpirationsFromAllSatellites(ctx context.Context, expiresAt time.Time, numFiles int) (errList errs.Group) {
170+
func (peStore *PieceExpirationStore) deleteExpirationsFromAllSatellites(ctx context.Context, expiresAt time.Time, opts ExpirationOptions) (errList errs.Group) {
171171
defer monDeleteExpirationsFromAllSatellites(&ctx)(nil)
172172

173173
satellites, err := peStore.getSatellitesWithExpirations(ctx)
@@ -176,7 +176,7 @@ func (peStore *PieceExpirationStore) deleteExpirationsFromAllSatellites(ctx cont
176176
return errList
177177
}
178178
for _, satelliteID := range satellites {
179-
err := peStore.DeleteExpirationsForSatellite(ctx, satelliteID, expiresAt, numFiles)
179+
err := peStore.DeleteExpirationsForSatellite(ctx, satelliteID, expiresAt, opts)
180180
errList.Add(ErrPieceExpiration.Wrap(err))
181181
}
182182
return errList
@@ -218,10 +218,10 @@ var monGetExpiredForSatellite = mon.Task()
218218

219219
// GetExpiredForSatellite gets piece IDs that expire or have expired before the
220220
// given time for a specific satellite.
221-
func (peStore *PieceExpirationStore) GetExpiredForSatellite(ctx context.Context, satellite storj.NodeID, now time.Time, limits ExpirationLimits) (infos *ExpiredInfoRecords, err error) {
221+
func (peStore *PieceExpirationStore) GetExpiredForSatellite(ctx context.Context, satellite storj.NodeID, now time.Time, opts ExpirationOptions) (infos *ExpiredInfoRecords, err error) {
222222
defer monGetExpiredForSatellite(&ctx)(&err)
223223

224-
elapsed, err := peStore.getElapsedHoursWithExpirations(ctx, satellite, now, limits.FlatFileLimit)
224+
elapsed, err := peStore.getElapsedHoursWithExpirations(ctx, satellite, now, opts)
225225
if err != nil {
226226
return nil, ErrPieceExpiration.Wrap(err)
227227
}
@@ -327,10 +327,10 @@ var monDeleteExpirationsForSatellite = mon.Task()
327327

328328
// DeleteExpirationsForSatellite deletes information about piece expirations
329329
// before the given time for a specific satellite.
330-
func (peStore *PieceExpirationStore) DeleteExpirationsForSatellite(ctx context.Context, satellite storj.NodeID, now time.Time, numFiles int) (err error) {
330+
func (peStore *PieceExpirationStore) DeleteExpirationsForSatellite(ctx context.Context, satellite storj.NodeID, now time.Time, opts ExpirationOptions) (err error) {
331331
defer monDeleteExpirationsForSatellite(&ctx)(&err)
332332

333-
elapsed, err := peStore.getElapsedHoursWithExpirations(ctx, satellite, now, numFiles)
333+
elapsed, err := peStore.getElapsedHoursWithExpirations(ctx, satellite, now, opts)
334334
if err != nil {
335335
return ErrPieceExpiration.Wrap(err)
336336
}
@@ -358,14 +358,14 @@ func (peStore *PieceExpirationStore) DeleteExpirationsForSatellite(ctx context.C
358358
}
359359

360360
// DeleteExpirationsBatch removes expiration records for pieces that have expired before the given time.
361-
func (peStore *PieceExpirationStore) DeleteExpirationsBatch(ctx context.Context, now time.Time, limits ExpirationLimits) error {
362-
errList := peStore.deleteExpirationsFromAllSatellites(ctx, now, limits.FlatFileLimit)
361+
func (peStore *PieceExpirationStore) DeleteExpirationsBatch(ctx context.Context, now time.Time, opts ExpirationOptions) error {
362+
errList := peStore.deleteExpirationsFromAllSatellites(ctx, now, opts)
363363
return errList.Err()
364364
}
365365

366366
var monGetElapsedHoursWithExpirations = mon.Task()
367367

368-
func (peStore *PieceExpirationStore) getElapsedHoursWithExpirations(ctx context.Context, satellite storj.NodeID, now time.Time, numOfFiles int) (elapsed []time.Time, err error) {
368+
func (peStore *PieceExpirationStore) getElapsedHoursWithExpirations(ctx context.Context, satellite storj.NodeID, now time.Time, opts ExpirationOptions) (elapsed []time.Time, err error) {
369369
defer monGetElapsedHoursWithExpirations(&ctx)(&err)
370370

371371
satelliteDir := PathEncoding.EncodeToString(satellite[:])
@@ -382,6 +382,9 @@ func (peStore *PieceExpirationStore) getElapsedHoursWithExpirations(ctx context.
382382
return nil, ErrPieceExpiration.Wrap(err)
383383
}
384384
slices.Sort(names)
385+
if opts.ReverseOrder {
386+
slices.Reverse(names)
387+
}
385388
for _, name := range names {
386389
hour, err := time.ParseInLocation(PieceExpirationFileNameFormat, name, time.UTC)
387390
if err != nil {
@@ -394,7 +397,7 @@ func (peStore *PieceExpirationStore) getElapsedHoursWithExpirations(ctx context.
394397
}
395398

396399
// if we have a limit, we can stop once we have enough elapsed hours
397-
if numOfFiles > 0 && len(elapsed) >= numOfFiles {
400+
if opts.Limits.FlatFileLimit > 0 && len(elapsed) >= opts.Limits.FlatFileLimit {
398401
break
399402
}
400403
}

storagenode/pieces/pieceexpiration_test.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ func PieceExpirationFunctionalityTest(ctx context.Context, t *testing.T, expireD
2727
pieceID := testrand.PieceID()
2828

2929
// GetExpired with no matches
30-
expiredLists, err := expireDB.GetExpired(ctx, time.Now(), DefaultExpirationLimits())
30+
expiredLists, err := expireDB.GetExpired(ctx, time.Now(), DefaultExpirationOptions())
3131
require.NoError(t, err)
3232
expired := FlattenExpirationInfoLists(expiredLists)
3333
require.Len(t, expired, 0)
@@ -47,7 +47,7 @@ func PieceExpirationFunctionalityTest(ctx context.Context, t *testing.T, expireD
4747
require.NoError(t, err)
4848

4949
// GetExpired normal usage
50-
expiredLists, err = expireDB.GetExpired(ctx, expireAt, DefaultExpirationLimits())
50+
expiredLists, err = expireDB.GetExpired(ctx, expireAt, DefaultExpirationOptions())
5151
require.NoError(t, err)
5252
expired = FlattenExpirationInfoLists(expiredLists)
5353
require.Len(t, expired, 1)
@@ -57,7 +57,7 @@ func PieceExpirationFunctionalityTest(ctx context.Context, t *testing.T, expireD
5757
require.NoError(t, err)
5858

5959
// Should not be there anymore
60-
expiredLists, err = expireDB.GetExpired(ctx, expireAt.Add(365*24*time.Hour), DefaultExpirationLimits())
60+
expiredLists, err = expireDB.GetExpired(ctx, expireAt.Add(365*24*time.Hour), DefaultExpirationOptions())
6161
expired = FlattenExpirationInfoLists(expiredLists)
6262
require.NoError(t, err)
6363
require.Len(t, expired, 0)
@@ -112,7 +112,7 @@ func TestPieceExpirationStoreInDepth(t *testing.T) {
112112
}
113113

114114
afterExpirations := now.Add(time.Duration(numSatellites*numPieces+1) * time.Hour)
115-
expirationLists, err := store.GetExpired(ctx, afterExpirations, DefaultExpirationLimits())
115+
expirationLists, err := store.GetExpired(ctx, afterExpirations, DefaultExpirationOptions())
116116
require.NoError(t, err)
117117

118118
require.Len(t, expirationLists, numSatellites)
@@ -136,10 +136,10 @@ func TestPieceExpirationStoreInDepth(t *testing.T) {
136136
}
137137
}
138138

139-
err = store.DeleteExpirationsForSatellite(ctx, satellites[0], afterExpirations, -1)
139+
err = store.DeleteExpirationsForSatellite(ctx, satellites[0], afterExpirations, DefaultExpirationOptions())
140140
require.NoError(t, err)
141141

142-
expirationLists, err = store.GetExpired(ctx, afterExpirations, DefaultExpirationLimits())
142+
expirationLists, err = store.GetExpired(ctx, afterExpirations, DefaultExpirationOptions())
143143
require.NoError(t, err)
144144
expirationInfos := FlattenExpirationInfoLists(expirationLists)
145145
require.Len(t, expirationInfos, (numSatellites-1)*numPieces)
@@ -157,7 +157,7 @@ func TestPieceExpirationStoreInDepth(t *testing.T) {
157157
err = store.DeleteExpirations(ctx, afterExpirations)
158158
require.NoError(t, err)
159159

160-
expirationLists, err = store.GetExpired(ctx, afterExpirations, DefaultExpirationLimits())
160+
expirationLists, err = store.GetExpired(ctx, afterExpirations, DefaultExpirationOptions())
161161
require.NoError(t, err)
162162
expirationInfos = FlattenExpirationInfoLists(expirationLists)
163163

@@ -208,7 +208,7 @@ func TestPieceExpirationStoreFileTruncation(t *testing.T) {
208208
require.NoError(t, f.Close())
209209

210210
// check all piece expirations in store
211-
expirationLists, err := store.GetExpired(ctx, now.Add(time.Hour), DefaultExpirationLimits())
211+
expirationLists, err := store.GetExpired(ctx, now.Add(time.Hour), DefaultExpirationOptions())
212212
require.NoError(t, err)
213213
require.Len(t, expirationLists, 1)
214214
require.Equal(t, satelliteID, expirationLists[0].SatelliteID)
@@ -222,7 +222,7 @@ func TestPieceExpirationStoreFileTruncation(t *testing.T) {
222222
err = store.SetExpiration(ctx, satelliteID, pieceID2, now, 200)
223223
require.NoError(t, err)
224224

225-
expirationLists, err = store.GetExpired(ctx, now.Add(time.Hour), DefaultExpirationLimits())
225+
expirationLists, err = store.GetExpired(ctx, now.Add(time.Hour), DefaultExpirationOptions())
226226
require.NoError(t, err)
227227
expirationInfos := FlattenExpirationInfoLists(expirationLists)
228228
require.Len(t, expirationInfos, 2)

0 commit comments

Comments
 (0)