storagenodedb: buffer up piece expirations
- use callback style interfaces rather than a bunch
  of slices. lowers memory usage
- don't individually delete pieces from the db and
  do bulk deletes. it doesn't matter if we fail to
  delete a piece and remove it from the ttl database
  because garbage collection will pick it up
- batch up writes to the ttl database to do 1000 at
  a time. this reduces database writes at the cost
  of potentially losing records but who cares because
  gc will pick it up
- remove the concept of recording if a piece failed
  to be deleted. it doesn't matter if we end up
  trying to delete a piece that's already deleted
  for some reason. just move on.
- adds a DeleteSkipV0 call that skips trying the
  delete on the V0 database used by the process
  that deletes expired objects. all the V0 pieces
  left that would have expired already have, and
  if not GC will get them eventually anyway.

Change-Id: I85066f124685a7495f917ff92c681dd1c27c6803
zeebo committed May 6, 2024
1 parent 9802535 commit aa84cb6
Showing 8 changed files with 186 additions and 363 deletions.
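
The callback-style interface described in the commit message is easiest to see as a type sketch. The actual definitions in storagenode/pieces are not part of this diff, so the shapes below are inferred from the call sites in the hunks that follow (GetExpired, SetExpiration, DeleteExpirations, and the ExpiredInfo fields used by the old tests); treat the exact signatures as assumptions. DeleteSkipV0 is a method on the piece store rather than on this DB, per the collector hunk.

```go
// Sketch of the callback-style expiration interface implied by the call sites
// in the hunks below. The real definitions live in storagenode/pieces and are
// not shown in this diff, so the exact signatures here are assumptions.
package pieces

import (
	"context"
	"time"

	"storj.io/common/storj"
)

// ExpiredInfo identifies a piece whose TTL has elapsed.
type ExpiredInfo struct {
	SatelliteID storj.NodeID
	PieceID     storj.PieceID
	InPieceInfo bool // true when the record lives in the V0 piece info table
}

// ExpirationFunc is invoked once per expired piece; returning false stops iteration.
type ExpirationFunc func(ctx context.Context, ei ExpiredInfo) bool

// PieceExpirationDB streams expired records to a callback instead of returning
// slices, and deletes expired rows in bulk rather than one piece at a time.
type PieceExpirationDB interface {
	GetExpired(ctx context.Context, expiresBefore time.Time, cb ExpirationFunc) error
	SetExpiration(ctx context.Context, satellite storj.NodeID, pieceID storj.PieceID, expiresAt time.Time) error
	DeleteExpirations(ctx context.Context, expiresAt time.Time) error
}
```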
46 changes: 9 additions & 37 deletions storagenode/collector/service.go
@@ -6,7 +6,6 @@ package collector

import (
"context"
"os"
"time"

"github.com/spacemonkeygo/monkit/v3"
@@ -74,50 +73,23 @@ func (service *Service) Collect(ctx context.Context, now time.Time) (err error)

service.usedSerials.DeleteExpired(now)

const maxBatches = 100
const batchSize = 1000

var count int64
defer func() {
if count > 0 {
service.log.Info("collect", zap.Int64("count", count))
}
}()

for k := 0; k < maxBatches; k++ {
infos, err := service.pieces.GetExpired(ctx, now, batchSize)
err = service.pieces.GetExpired(ctx, now, func(ctx context.Context, ei pieces.ExpiredInfo) bool {
err := service.pieces.DeleteSkipV0(ctx, ei.SatelliteID, ei.PieceID)
if err != nil {
return err
}
if len(infos) == 0 {
return nil
}

for _, expired := range infos {
err := service.pieces.Delete(ctx, expired.SatelliteID, expired.PieceID)
if err != nil {
if errs.Is(err, os.ErrNotExist) {
service.log.Warn("file does not exist", zap.Stringer("Satellite ID", expired.SatelliteID), zap.Stringer("Piece ID", expired.PieceID))
err := service.pieces.DeleteExpired(ctx, expired.SatelliteID, expired.PieceID)
if err != nil {
service.log.Error("unable to delete expired piece info from DB", zap.Stringer("Satellite ID", expired.SatelliteID), zap.Stringer("Piece ID", expired.PieceID), zap.Error(err))
continue
}
service.log.Info("deleted expired piece info from DB", zap.Stringer("Satellite ID", expired.SatelliteID), zap.Stringer("Piece ID", expired.PieceID))
continue
}
errfailed := service.pieces.DeleteFailed(ctx, expired, now)
if errfailed != nil {
service.log.Error("unable to update piece info", zap.Stringer("Satellite ID", expired.SatelliteID), zap.Stringer("Piece ID", expired.PieceID), zap.Error(errfailed))
}
service.log.Error("unable to delete piece", zap.Stringer("Satellite ID", expired.SatelliteID), zap.Stringer("Piece ID", expired.PieceID), zap.Error(err))
continue
}
service.log.Info("deleted expired piece", zap.Stringer("Satellite ID", expired.SatelliteID), zap.Stringer("Piece ID", expired.PieceID))

service.log.Warn("unable to delete piece", zap.Stringer("Satellite ID", ei.SatelliteID), zap.Stringer("Piece ID", ei.PieceID), zap.Error(err))
} else {
service.log.Debug("deleted expired piece", zap.Stringer("Satellite ID", ei.SatelliteID), zap.Stringer("Piece ID", ei.PieceID))
count++
}
}

return nil
return true
})
_ = service.pieces.DeleteExpired(ctx, now)
return errs.Wrap(err)
}
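
Since the extraction dropped the +/- diff markers, here is a hedged reconstruction of roughly what the new Collect body looks like after this change, assembled only from the added lines visible in the hunk above; anything outside the hunk (instrumentation, exact log wording elsewhere in the file) is an assumption.

```go
// Hedged reconstruction of the reworked Collect body, based on the added lines
// in the hunk above. It assumes the surrounding file's Service type and existing
// imports (context, time, zap, errs, storagenode/pieces).
func (service *Service) Collect(ctx context.Context, now time.Time) (err error) {
	service.usedSerials.DeleteExpired(now)

	var count int64
	defer func() {
		if count > 0 {
			service.log.Info("collect", zap.Int64("count", count))
		}
	}()

	// Stream expired pieces one at a time instead of building slices in memory.
	err = service.pieces.GetExpired(ctx, now, func(ctx context.Context, ei pieces.ExpiredInfo) bool {
		// Skip the V0 database: any V0 pieces that would have expired already have.
		err := service.pieces.DeleteSkipV0(ctx, ei.SatelliteID, ei.PieceID)
		if err != nil {
			// Only log failures; garbage collection will pick the piece up later.
			service.log.Warn("unable to delete piece", zap.Stringer("Satellite ID", ei.SatelliteID), zap.Stringer("Piece ID", ei.PieceID), zap.Error(err))
		} else {
			service.log.Debug("deleted expired piece", zap.Stringer("Satellite ID", ei.SatelliteID), zap.Stringer("Piece ID", ei.PieceID))
			count++
		}
		return true // keep iterating
	})

	// Remove all expired rows from the TTL database in one bulk delete.
	_ = service.pieces.DeleteExpired(ctx, now)
	return errs.Wrap(err)
}
```

The key design choice is that a failed blob delete is merely logged: the TTL rows are cleared in bulk afterwards, and any piece that slips through is reclaimed by garbage collection.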
68 changes: 0 additions & 68 deletions storagenode/collector/service_test.go
@@ -8,15 +8,11 @@ import (
"time"

"github.com/stretchr/testify/require"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"go.uber.org/zap/zaptest/observer"

"storj.io/common/memory"
"storj.io/common/testcontext"
"storj.io/common/testrand"
"storj.io/storj/private/testplanet"
"storj.io/storj/storagenode/collector"
)

func TestCollector(t *testing.T) {
@@ -112,67 +108,3 @@ func TestCollector(t *testing.T) {
require.Equal(t, 0, serialsPresent)
})
}

func TestCollector_fileNotFound(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 3, UplinkCount: 1,
Reconfigure: testplanet.Reconfigure{
Satellite: testplanet.ReconfigureRS(1, 1, 2, 2),
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
for _, storageNode := range planet.StorageNodes {
// stop collector, so we can start a new service manually
storageNode.Collector.Loop.Stop()
// stop order sender because we will stop satellite later
storageNode.Storage2.Orders.Sender.Pause()
}

expectedData := testrand.Bytes(5 * memory.KiB)

// upload some data to exactly 2 nodes that expires in 1 day
err := planet.Uplinks[0].UploadWithExpiration(ctx, planet.Satellites[0], "testbucket", "test/path", expectedData, time.Now().Add(1*24*time.Hour))
require.NoError(t, err)

// stop satellite to prevent audits
require.NoError(t, planet.StopPeer(planet.Satellites[0]))

collections := 0

// assume we are 2 days in the future
for _, storageNode := range planet.StorageNodes {
pieceStore := storageNode.DB.Pieces()

// verify that we actually have some data on storage nodes
used, err := pieceStore.SpaceUsedForBlobs(ctx)
require.NoError(t, err)
if used == 0 {
// this storage node didn't get picked for storing data
continue
}

// delete file before collector service runs
err = pieceStore.DeleteNamespace(ctx, planet.Satellites[0].Identity.ID.Bytes())
require.NoError(t, err)

// create new observed logger
observedZapCore, observedLogs := observer.New(zap.InfoLevel)
observedLogger := zap.New(observedZapCore)
// start new collector service
collectorService := collector.NewService(observedLogger, storageNode.Storage2.Store, storageNode.UsedSerials, storageNode.Config.Collector)
// collect all the data
err = collectorService.Collect(ctx, time.Now().Add(2*24*time.Hour))
require.NoError(t, err)
require.Equal(t, 2, observedLogs.Len())
// check "file does not exist" log
require.Equal(t, observedLogs.All()[0].Level, zapcore.WarnLevel)
require.Equal(t, observedLogs.All()[0].Message, "file does not exist")
// check piece info deleted from db log
require.Equal(t, observedLogs.All()[1].Level, zapcore.InfoLevel)
require.Equal(t, observedLogs.All()[1].Message, "deleted expired piece info from DB")

collections++
}

require.NotZero(t, collections)
})
}
31 changes: 13 additions & 18 deletions storagenode/pieces/db_test.go
@@ -4,6 +4,7 @@
package pieces_test

import (
"context"
"testing"
"time"

@@ -128,25 +129,22 @@ func TestV0PieceInfo(t *testing.T) {
require.Empty(t, cmp.Diff(info1, info1loaded, cmp.Comparer(pb.Equal)))

// getting no expired pieces
expired, err := pieceinfos.GetExpired(ctx, now.Add(-10*time.Hour), 10)
err = pieceinfos.GetExpired(ctx, now.Add(-10*time.Hour), func(_ context.Context, ei pieces.ExpiredInfo) bool {
t.Fatal("should not be called")
return false
})
assert.NoError(t, err)
assert.Len(t, expired, 0)

// getting expired pieces
var expired []pieces.ExpiredInfo
exp := now.Add(8 * 24 * time.Hour)
expired, err = pieceinfos.GetExpired(ctx, exp, 10)
err = pieceinfos.GetExpired(ctx, exp, func(_ context.Context, ei pieces.ExpiredInfo) bool {
expired = append(expired, ei)
return true
})
assert.NoError(t, err)
assert.Len(t, expired, 3)

// mark info0 deletion as a failure
err = pieceinfos.DeleteFailed(ctx, info0.SatelliteID, info0.PieceID, exp)
assert.NoError(t, err)

// this shouldn't return info0
expired, err = pieceinfos.GetExpired(ctx, exp, 10)
assert.NoError(t, err)
assert.Len(t, expired, 2)

// deleting
err = pieceinfos.Delete(ctx, info0.SatelliteID, info0.PieceID)
require.NoError(t, err)
@@ -190,18 +188,15 @@ func TestPieceinfo_Trivial(t *testing.T) {
require.NoError(t, err)
}

{ // Ensure DeleteFailed works at all
err := pieceinfos.DeleteFailed(ctx, satelliteID, pieceID, time.Now())
require.NoError(t, err)
}

{ // Ensure Delete works at all
err := pieceinfos.Delete(ctx, satelliteID, pieceID)
require.NoError(t, err)
}

{ // Ensure GetExpired works at all
_, err := pieceinfos.GetExpired(ctx, time.Now(), 1)
err := pieceinfos.GetExpired(ctx, time.Now(), func(_ context.Context, ei pieces.ExpiredInfo) bool {
return true
})
require.NoError(t, err)
}
})
57 changes: 19 additions & 38 deletions storagenode/pieces/pieceexpiration_test.go
@@ -4,10 +4,10 @@
package pieces_test

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"storj.io/common/testcontext"
@@ -24,24 +24,16 @@ func TestPieceExpirationDB(t *testing.T) {

satelliteID := testrand.NodeID()
pieceID := testrand.PieceID()
expectedExpireInfo := pieces.ExpiredInfo{
SatelliteID: satelliteID,
PieceID: pieceID,
InPieceInfo: false,
}

// GetExpired with no matches
expiredPieceIDs, err := expireDB.GetExpired(ctx, time.Now(), 1000)
err := expireDB.GetExpired(ctx, time.Now(), func(_ context.Context, ei pieces.ExpiredInfo) bool {
t.Fatal("should not be called")
return false
})
require.NoError(t, err)
require.Len(t, expiredPieceIDs, 0)

// DeleteExpiration with no matches
found, err := expireDB.DeleteExpiration(ctx, satelliteID, pieceID)
require.NoError(t, err)
require.False(t, found)

// DeleteFailed with no matches
err = expireDB.DeleteFailed(ctx, satelliteID, pieceID, time.Now())
err = expireDB.DeleteExpirations(ctx, time.Time{})
require.NoError(t, err)

expireAt := time.Now()
@@ -51,38 +43,27 @@
require.NoError(t, err)

// SetExpiration duplicate
err = expireDB.SetExpiration(ctx, satelliteID, pieceID, expireAt.Add(time.Hour))
require.Error(t, err)

// GetExpired normal usage
expiredPieceIDs, err = expireDB.GetExpired(ctx, expireAt.Add(time.Microsecond), 1000)
err = expireDB.SetExpiration(ctx, satelliteID, pieceID, expireAt.Add(-time.Hour))
require.NoError(t, err)
require.Len(t, expiredPieceIDs, 1)
assert.Equal(t, expiredPieceIDs[0], expectedExpireInfo)

deleteFailedAt := expireAt.Add(2 * time.Microsecond)

// DeleteFailed normal usage
err = expireDB.DeleteFailed(ctx, satelliteID, pieceID, deleteFailedAt)
require.NoError(t, err)

// GetExpired filters out rows with deletion_failed_at = t
expiredPieceIDs, err = expireDB.GetExpired(ctx, deleteFailedAt, 1000)
require.NoError(t, err)
require.Len(t, expiredPieceIDs, 0)
expiredPieceIDs, err = expireDB.GetExpired(ctx, deleteFailedAt.Add(time.Microsecond), 1000)
// GetExpired normal usage
var expired []pieces.ExpiredInfo
err = expireDB.GetExpired(ctx, expireAt, func(_ context.Context, ei pieces.ExpiredInfo) bool {
expired = append(expired, ei)
return true
})
require.NoError(t, err)
require.Len(t, expiredPieceIDs, 1)
assert.Equal(t, expiredPieceIDs[0], expectedExpireInfo)
require.Len(t, expired, 1)

// DeleteExpiration normal usage
found, err = expireDB.DeleteExpiration(ctx, satelliteID, pieceID)
err = expireDB.DeleteExpirations(ctx, expireAt)
require.NoError(t, err)
require.True(t, found)

// Should not be there anymore
expiredPieceIDs, err = expireDB.GetExpired(ctx, expireAt.Add(365*24*time.Hour), 1000)
err = expireDB.GetExpired(ctx, expireAt.Add(365*24*time.Hour), func(_ context.Context, ei pieces.ExpiredInfo) bool {
t.Fatal("should not be called")
return false
})
require.NoError(t, err)
require.Len(t, expiredPieceIDs, 0)
})
}
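
The write batching that gives the commit its title ("batch up writes to the ttl database to do 1000 at a time") is not visible in the hunks shown here. A minimal sketch of how such a buffer could work, with every name below hypothetical:

```go
// Illustrative sketch only: the commit message says TTL writes are batched
// 1000 at a time, but the batching code itself is not in the hunks shown here.
// Every name below (expirationBuffer, flushThreshold, write) is hypothetical.
package pieces

import (
	"context"
	"sync"
	"time"

	"storj.io/common/storj"
)

const flushThreshold = 1000 // batch size described in the commit message

type bufferedExpiration struct {
	satellite storj.NodeID
	pieceID   storj.PieceID
	expiresAt time.Time
}

type expirationBuffer struct {
	mu      sync.Mutex
	pending []bufferedExpiration
	write   func(ctx context.Context, batch []bufferedExpiration) error // bulk insert into the ttl db
}

// Add queues one expiration record and flushes once the buffer reaches the threshold.
func (b *expirationBuffer) Add(ctx context.Context, satellite storj.NodeID, pieceID storj.PieceID, expiresAt time.Time) error {
	b.mu.Lock()
	b.pending = append(b.pending, bufferedExpiration{satellite, pieceID, expiresAt})
	if len(b.pending) < flushThreshold {
		b.mu.Unlock()
		return nil
	}
	batch := b.pending
	b.pending = nil
	b.mu.Unlock()
	return b.write(ctx, batch)
}
```

Losing a buffered batch on crash is acceptable under the commit's stated assumption that garbage collection eventually removes any piece whose TTL record was dropped.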
