Skip to content

Commit

Permalink
storagenode: trash with Bloom filter's receiving time
Browse files Browse the repository at this point in the history
Rather than using the current time when placing items in the trash, use the
time the Bloom filter was received. This reduces how long trash is kept,
while still providing sufficient recovery time.
  • Loading branch information
liori committed Feb 19, 2024
1 parent 759b419 commit 96dd51b
Show file tree
Hide file tree
Showing 13 changed files with 38 additions and 51 deletions.
2 changes: 1 addition & 1 deletion storagenode/blobstore/blob.go
Expand Up @@ -86,7 +86,7 @@ type Blobs interface {
// DeleteTrashNamespace deletes the trash folder for a given namespace.
DeleteTrashNamespace(ctx context.Context, namespace []byte) (err error)
// Trash marks a file for pending deletion.
Trash(ctx context.Context, ref BlobRef) error
Trash(ctx context.Context, ref BlobRef, timestamp time.Time) error
// RestoreTrash restores all files in the trash for a given namespace and returns the keys restored.
RestoreTrash(ctx context.Context, namespace []byte) ([][]byte, error)
// EmptyTrash removes all files in trash that were moved to trash prior to trashedBefore and returns the total bytes emptied and keys deleted.
Expand Down
37 changes: 14 additions & 23 deletions storagenode/blobstore/filestore/dir.go
Expand Up @@ -40,16 +40,13 @@ var pathEncoding = base32.NewEncoding("abcdefghijklmnopqrstuvwxyz234567").WithPa
type Dir struct {
log *zap.Logger
path string

trashnow func() time.Time // the function used by trash to determine "now"
}

// OpenDir opens existing folder for storing blobs.
func OpenDir(log *zap.Logger, path string) (*Dir, error) {
dir := &Dir{
log: log,
path: path,
trashnow: time.Now,
log: log,
path: path,
}

stat := func(path string) error {
Expand All @@ -67,9 +64,8 @@ func OpenDir(log *zap.Logger, path string) (*Dir, error) {
// NewDir returns folder for storing blobs.
func NewDir(log *zap.Logger, path string) (*Dir, error) {
dir := &Dir{
log: log,
path: path,
trashnow: time.Now,
log: log,
path: path,
}

return dir, errs.Combine(
Expand Down Expand Up @@ -316,13 +312,15 @@ func (dir *Dir) StatWithStorageFormat(ctx context.Context, ref blobstore.BlobRef
}

// Trash moves the blob specified by ref to the trash for every format version.
func (dir *Dir) Trash(ctx context.Context, ref blobstore.BlobRef) (err error) {
func (dir *Dir) Trash(ctx context.Context, ref blobstore.BlobRef, timestamp time.Time) (err error) {
defer mon.Task()(&ctx)(&err)
return dir.iterateStorageFormatVersions(ctx, ref, dir.TrashWithStorageFormat)
return dir.iterateStorageFormatVersions(ctx, ref, func(ctx context.Context, ref blobstore.BlobRef, formatVersion blobstore.FormatVersion) error {
return dir.TrashWithStorageFormat(ctx, ref, formatVersion, timestamp)
})
}

// TrashWithStorageFormat moves the blob specified by ref to the trash for the specified format version.
func (dir *Dir) TrashWithStorageFormat(ctx context.Context, ref blobstore.BlobRef, formatVer blobstore.FormatVersion) (err error) {
func (dir *Dir) TrashWithStorageFormat(ctx context.Context, ref blobstore.BlobRef, formatVer blobstore.FormatVersion, timestamp time.Time) (err error) {
blobsBasePath, err := dir.blobToBasePath(ref)
if err != nil {
return err
Expand All @@ -343,16 +341,15 @@ func (dir *Dir) TrashWithStorageFormat(ctx context.Context, ref blobstore.BlobRe
return err
}

// Change mtime to now. This allows us to check the mtime to know how long
// the file has been in the trash. If the file is restored this may make it
// take longer to be trashed again, but the simplicity is worth the
// trade-off.
// Change mtime to the logical time of removal. This allows us to check the
// mtime to know how long the file has been in the trash. If the file is
// restored this may make it take longer to be trashed again, but the
// simplicity is worth the trade-off.
//
// We change the mtime prior to moving the file so that if this call fails
// the file will not be in the trash with an unmodified mtime, which could
// result in its permanent deletion too soon.
now := dir.trashnow()
err = os.Chtimes(blobsVerPath, now, now)
err = os.Chtimes(blobsVerPath, timestamp, timestamp)
if os.IsNotExist(err) {
return nil
}
Expand All @@ -371,12 +368,6 @@ func (dir *Dir) TrashWithStorageFormat(ctx context.Context, ref blobstore.BlobRe
return err
}

// ReplaceTrashnow is a helper for tests to replace the trashnow function used
// when moving files to the trash.
func (dir *Dir) ReplaceTrashnow(trashnow func() time.Time) {
dir.trashnow = trashnow
}

// RestoreTrash moves every blob in the trash folder back into blobsdir.
func (dir *Dir) RestoreTrash(ctx context.Context, namespace []byte) (keysRestored [][]byte, err error) {
var errorsEncountered errs.Group
Expand Down
4 changes: 2 additions & 2 deletions storagenode/blobstore/filestore/store.go
Expand Up @@ -152,9 +152,9 @@ func (store *blobStore) DeleteTrashNamespace(ctx context.Context, namespace []by
}

// Trash moves the ref to a trash directory.
func (store *blobStore) Trash(ctx context.Context, ref blobstore.BlobRef) (err error) {
func (store *blobStore) Trash(ctx context.Context, ref blobstore.BlobRef, timestamp time.Time) (err error) {
defer mon.Task()(&ctx)(&err)
return Error.Wrap(store.dir.Trash(ctx, ref))
return Error.Wrap(store.dir.Trash(ctx, ref, timestamp))
}

// RestoreTrash moves every blob in the trash back into the regular location.
Expand Down
4 changes: 2 additions & 2 deletions storagenode/blobstore/filestore/store_test.go
Expand Up @@ -634,7 +634,7 @@ func TestEmptyTrash(t *testing.T) {
}

// Trash the ref
require.NoError(t, store.Trash(ctx, blobref))
require.NoError(t, store.Trash(ctx, blobref, time.Now()))
}
}

Expand Down Expand Up @@ -759,7 +759,7 @@ func TestTrashAndRestore(t *testing.T) {
}

// Trash the ref
require.NoError(t, store.Trash(ctx, blobref))
require.NoError(t, store.Trash(ctx, blobref, time.Now()))

// Verify files are gone
for _, file := range ref.files {
Expand Down
4 changes: 2 additions & 2 deletions storagenode/blobstore/testblobs/bad.go
Expand Up @@ -137,11 +137,11 @@ func (bad *BadBlobs) OpenWithStorageFormat(ctx context.Context, ref blobstore.Bl
}

// Trash moves the blob with the namespace and key to the trash.
func (bad *BadBlobs) Trash(ctx context.Context, ref blobstore.BlobRef) error {
func (bad *BadBlobs) Trash(ctx context.Context, ref blobstore.BlobRef, timestamp time.Time) error {
if err := bad.err.Err(); err != nil {
return err
}
return bad.blobs.Trash(ctx, ref)
return bad.blobs.Trash(ctx, ref, timestamp)
}

// RestoreTrash restores all files in the trash.
Expand Down
4 changes: 2 additions & 2 deletions storagenode/blobstore/testblobs/slow.go
Expand Up @@ -92,11 +92,11 @@ func (slow *SlowBlobs) OpenWithStorageFormat(ctx context.Context, ref blobstore.
}

// Trash moves the blob with the namespace and key to the trash.
func (slow *SlowBlobs) Trash(ctx context.Context, ref blobstore.BlobRef) error {
func (slow *SlowBlobs) Trash(ctx context.Context, ref blobstore.BlobRef, timestamp time.Time) error {
if err := slow.sleep(ctx); err != nil {
return errs.Wrap(err)
}
return slow.blobs.Trash(ctx, ref)
return slow.blobs.Trash(ctx, ref, timestamp)
}

// RestoreTrash restores all files in the trash.
Expand Down
4 changes: 2 additions & 2 deletions storagenode/pieces/cache.go
Expand Up @@ -289,13 +289,13 @@ func (blobs *BlobsUsageCache) ensurePositiveCacheValue(value *int64, name string
}

// Trash moves the ref to the trash and updates the cache.
func (blobs *BlobsUsageCache) Trash(ctx context.Context, blobRef blobstore.BlobRef) error {
func (blobs *BlobsUsageCache) Trash(ctx context.Context, blobRef blobstore.BlobRef, timestamp time.Time) error {
pieceTotal, pieceContentSize, err := blobs.pieceSizes(ctx, blobRef)
if err != nil {
return Error.Wrap(err)
}

err = blobs.Blobs.Trash(ctx, blobRef)
err = blobs.Blobs.Trash(ctx, blobRef, timestamp)
if err != nil {
return Error.Wrap(err)
}
Expand Down
8 changes: 4 additions & 4 deletions storagenode/pieces/cache_test.go
Expand Up @@ -224,7 +224,7 @@ func TestCacheServiceRun(t *testing.T) {
_, err = w.Write(testrand.Bytes(expTrashSize))
require.NoError(t, err)
require.NoError(t, w.Commit(ctx))
require.NoError(t, store.Trash(ctx, trashRef)) // trash it
require.NoError(t, store.Trash(ctx, trashRef, time.Now())) // trash it

// Now instantiate the cache
cache := pieces.NewBlobsUsageCache(log, store)
Expand Down Expand Up @@ -305,7 +305,7 @@ func TestCacheServiceRun_LazyFilewalker(t *testing.T) {
_, err = w.Write(testrand.Bytes(expTrashSize))
require.NoError(t, err)
require.NoError(t, w.Commit(ctx))
require.NoError(t, store.Trash(ctx, trashRef)) // trash it
require.NoError(t, store.Trash(ctx, trashRef, time.Now())) // trash it

// Set up the lazy filewalker
cfg := pieces.DefaultConfig
Expand Down Expand Up @@ -732,7 +732,7 @@ func TestCacheCreateDeleteAndTrash(t *testing.T) {
fileInfo, err := blobInfo.Stat(ctx)
require.NoError(t, err)
ref0Size := fileInfo.Size()
err = cache.Trash(ctx, refs[0])
err = cache.Trash(ctx, refs[0], time.Now())
require.NoError(t, err)
assertValues("trashed refs[0]", satelliteID, expPieceSize, len(pieceContent), int(ref0Size))

Expand All @@ -742,7 +742,7 @@ func TestCacheCreateDeleteAndTrash(t *testing.T) {
assertValues("restore trash for satellite", satelliteID, expPieceSize*len(refs), len(pieceContent)*len(refs), 0)

// Trash piece again
err = cache.Trash(ctx, refs[0])
err = cache.Trash(ctx, refs[0], time.Now())
require.NoError(t, err)
assertValues("trashed again", satelliteID, expPieceSize, len(pieceContent), int(ref0Size))

Expand Down
2 changes: 1 addition & 1 deletion storagenode/pieces/deleter.go
Expand Up @@ -182,7 +182,7 @@ func (d *Deleter) deleteOrTrash(ctx context.Context, satelliteID storj.NodeID, p
var errMsg string
var infoMsg string
if d.store.config.DeleteToTrash {
err = d.store.Trash(ctx, satelliteID, pieceID)
err = d.store.Trash(ctx, satelliteID, pieceID, time.Now())
errMsg = "could not send delete piece to trash"
infoMsg = "delete piece sent to trash"
} else {
Expand Down
4 changes: 2 additions & 2 deletions storagenode/pieces/store.go
Expand Up @@ -376,7 +376,7 @@ func (store *Store) DeleteSatelliteBlobs(ctx context.Context, satellite storj.No
// Trash moves the specified piece to the blob trash. If necessary, it converts
// the v0 piece to a v1 piece. It also marks the item as "trashed" in the
// pieceExpirationDB.
func (store *Store) Trash(ctx context.Context, satellite storj.NodeID, pieceID storj.PieceID) (err error) {
func (store *Store) Trash(ctx context.Context, satellite storj.NodeID, pieceID storj.PieceID, timestamp time.Time) (err error) {
defer mon.Task()(&ctx)(&err)

// Check if the MaxFormatVersionSupported piece exists. If not, we assume
Expand Down Expand Up @@ -404,7 +404,7 @@ func (store *Store) Trash(ctx context.Context, satellite storj.NodeID, pieceID s
err = errs.Combine(err, store.blobs.Trash(ctx, blobstore.BlobRef{
Namespace: satellite.Bytes(),
Key: pieceID.Bytes(),
}))
}, timestamp))

return Error.Wrap(err)
}
Expand Down
8 changes: 2 additions & 6 deletions storagenode/pieces/store_test.go
Expand Up @@ -384,12 +384,8 @@ func TestTrashAndRestore(t *testing.T) {

}

trashDurToUse := piece.trashDur
dir.ReplaceTrashnow(func() time.Time {
return time.Now().Add(-trashDurToUse)
})
// Trash the piece
require.NoError(t, store.Trash(ctx, satellite.satelliteID, piece.pieceID))
require.NoError(t, store.Trash(ctx, satellite.satelliteID, piece.pieceID, time.Now().Add(-piece.trashDur)))

// Confirm is missing
r, err := store.Reader(ctx, satellite.satelliteID, piece.pieceID)
Expand Down Expand Up @@ -446,7 +442,7 @@ func TestTrashAndRestore(t *testing.T) {
verifyPieceData(ctx, t, tStore, satellites[0].satelliteID, piece.pieceID, filestore.MaxFormatVersionSupported, lastFile.data, piece.expiration, publicKey)
} else {
// Expect the piece to be missing, it should be removed from the trash on EmptyTrash
r, err := store.Reader(ctx, satellites[1].satelliteID, piece.pieceID)
r, err := store.Reader(ctx, satellites[0].satelliteID, piece.pieceID)
require.Error(t, err)
require.Nil(t, r)
}
Expand Down
2 changes: 1 addition & 1 deletion storagenode/piecestore/endpoint_test.go
Expand Up @@ -351,7 +351,7 @@ func TestDownload(t *testing.T) {
// upload another piece that we will trash
trashPieceID := storj.PieceID{3}
trashPieceData, _, _ := uploadPiece(t, ctx, trashPieceID, planet.StorageNodes[0], planet.Uplinks[0], planet.Satellites[0])
err := planet.StorageNodes[0].Storage2.Store.Trash(ctx, planet.Satellites[0].ID(), trashPieceID)
err := planet.StorageNodes[0].Storage2.Store.Trash(ctx, planet.Satellites[0].ID(), trashPieceID, time.Now())
require.NoError(t, err)
_, err = planet.StorageNodes[0].Storage2.Store.Stat(ctx, planet.Satellites[0].ID(), trashPieceID)
require.Equal(t, true, errs.Is(err, os.ErrNotExist))
Expand Down
6 changes: 3 additions & 3 deletions storagenode/retain/retain.go
Expand Up @@ -338,7 +338,7 @@ func (s *Service) retainPieces(ctx context.Context, req Request) (err error) {

// if retain status is enabled, delete pieceid
if s.config.Status == Enabled {
if err = s.trash(ctx, satelliteID, pieceID); err != nil {
if err = s.trash(ctx, satelliteID, pieceID, started); err != nil {
s.log.Warn("failed to delete piece",
zap.Stringer("Satellite ID", satelliteID),
zap.Stringer("Piece ID", pieceID),
Expand Down Expand Up @@ -368,9 +368,9 @@ func (s *Service) retainPieces(ctx context.Context, req Request) (err error) {
}

// trash wraps moving a retained piece to the trash so that errors encountered during garbage collection are monitored.
func (s *Service) trash(ctx context.Context, satelliteID storj.NodeID, pieceID storj.PieceID) (err error) {
func (s *Service) trash(ctx context.Context, satelliteID storj.NodeID, pieceID storj.PieceID, timestamp time.Time) (err error) {
defer mon.Task()(&ctx, satelliteID)(&err)
return s.store.Trash(ctx, satelliteID, pieceID)
return s.store.Trash(ctx, satelliteID, pieceID, timestamp)
}

// HowManyQueued peeks at the number of bloom filters queued.
Expand Down

0 comments on commit 96dd51b

Please sign in to comment.