diff --git a/cmd/storagenode/internalcmd/cmd_lazy_filewalker.go b/cmd/storagenode/internalcmd/cmd_lazy_filewalker.go
index f1cdf0e6b9c1..c79099ac9c3b 100644
--- a/cmd/storagenode/internalcmd/cmd_lazy_filewalker.go
+++ b/cmd/storagenode/internalcmd/cmd_lazy_filewalker.go
@@ -10,6 +10,7 @@ import (
 	"io"
 	"net/url"
 	"os"
+	"sync"
 
 	"github.com/spf13/cobra"
 	"go.uber.org/zap"
@@ -52,10 +53,20 @@ type RunOptions struct {
 type LazyFilewalkerCmd struct {
 	Command *cobra.Command
 	*RunOptions
+	originalPreRunE func(cmd *cobra.Command, args []string) error
 }
 
 var _ execwrapper.Command = (*LazyFilewalkerCmd)(nil)
 
+// NewLazyFilewalkerCmd creates a new instance of LazyFilewalkerCmd.
+func NewLazyFilewalkerCmd(command *cobra.Command, opts *RunOptions) *LazyFilewalkerCmd {
+	return &LazyFilewalkerCmd{
+		Command:         command,
+		RunOptions:      opts,
+		originalPreRunE: command.PreRunE,
+	}
+}
+
 // SetArgs sets arguments for the command.
 // The command or executable path should be passed as the first argument.
 func (cmd *LazyFilewalkerCmd) SetArgs(args []string) {
@@ -67,8 +78,38 @@ func (cmd *LazyFilewalkerCmd) SetArgs(args []string) {
 	cmd.Command.SetArgs(args)
 }
 
+var (
+	// cobraMutex should be held while manually invoking *cobra.Command
+	// instances. It may be released after the invocation is complete, or once
+	// control is returned to caller code (i.e., in the Run methods).
+	//
+	// All of this silliness is simply to avoid running the first part of
+	// (*cobra.Command).ExecuteC() at the same time in multiple goroutines. It
+	// is not technically thread-safe. The data that is affected by the race
+	// condition does not matter for our purposes, so we aren't worried about
+	// that, but we also don't want to upset the race detector when we are
+	// running multiple tests that might invoke our commands in parallel.
+	cobraMutex sync.Mutex
+)
+
 // Run runs the LazyFileWalker.
 func (cmd *LazyFilewalkerCmd) Run() error {
+	cobraMutex.Lock()
+	wasUnlockedByPreRun := false
+	defer func() {
+		if !wasUnlockedByPreRun {
+			cobraMutex.Unlock()
+		}
+	}()
+	wrappedPreRun := cmd.originalPreRunE
+	if wrappedPreRun == nil {
+		wrappedPreRun = func(cmd *cobra.Command, args []string) error { return nil }
+	}
+	cmd.Command.PreRunE = func(cmd *cobra.Command, args []string) error {
+		cobraMutex.Unlock()
+		wasUnlockedByPreRun = true
+		return wrappedPreRun(cmd, args)
+	}
 	return cmd.Command.ExecuteContext(cmd.Ctx)
 }
 
@@ -136,9 +177,12 @@ func (r *RunOptions) tryCreateNewLogger() {
 	})
 
 	// this error is expected if the sink is already registered.
-	duplicateSinkErr := fmt.Errorf("sink factory already registered for scheme %q", writerkey)
-	if err != nil && err.Error() != duplicateSinkErr.Error() {
-		r.Logger.Error("failed to register logger sink", zap.Error(err))
+	if err != nil {
+		if err.Error() == fmt.Sprintf("sink factory already registered for scheme %q", writerkey) {
+			r.Logger.Info("logger sink already registered")
+		} else {
+			r.Logger.Error("failed to register logger sink", zap.Error(err))
+		}
 		return
 	}
 
diff --git a/cmd/storagenode/internalcmd/gc_filewalker.go b/cmd/storagenode/internalcmd/gc_filewalker.go
index 22a8faa4b1da..fdd66ac44971 100644
--- a/cmd/storagenode/internalcmd/gc_filewalker.go
+++ b/cmd/storagenode/internalcmd/gc_filewalker.go
@@ -42,10 +42,7 @@ func NewGCFilewalkerCmd() *LazyFilewalkerCmd {
 
 	process.Bind(cmd, &cfg)
 
-	return &LazyFilewalkerCmd{
-		Command:    cmd,
-		RunOptions: &runOpts,
-	}
+	return NewLazyFilewalkerCmd(cmd, &runOpts)
 }
 
 // Run runs the GCLazyFileWalker.
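Note: the cobraMutex/PreRunE arrangement above serializes cobra's non-thread-safe command setup across goroutines and releases the lock as soon as cobra hands control back to our code. The following standalone sketch illustrates the same pattern; it is not part of the diff, and names such as setupMu and the "demo" command are hypothetical.

// Sketch only: assumes the cobra dependency is available.
package main

import (
	"fmt"
	"sync"

	"github.com/spf13/cobra"
)

// setupMu serializes cobra's command setup, which is not thread-safe.
var setupMu sync.Mutex

func run(cmd *cobra.Command) error {
	setupMu.Lock()
	unlockedByPreRun := false
	defer func() {
		if !unlockedByPreRun {
			setupMu.Unlock()
		}
	}()
	cmd.PreRunE = func(cmd *cobra.Command, args []string) error {
		// Flag parsing and command resolution are done; release the lock early
		// so other goroutines can invoke their own commands.
		setupMu.Unlock()
		unlockedByPreRun = true
		return nil
	}
	return cmd.Execute()
}

func main() {
	cmd := &cobra.Command{
		Use: "demo",
		RunE: func(cmd *cobra.Command, args []string) error {
			fmt.Println("running")
			return nil
		},
	}
	if err := run(cmd); err != nil {
		fmt.Println("error:", err)
	}
}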
diff --git a/cmd/storagenode/internalcmd/trash_filewalker.go b/cmd/storagenode/internalcmd/trash_filewalker.go
new file mode 100644
index 000000000000..c07634f1c215
--- /dev/null
+++ b/cmd/storagenode/internalcmd/trash_filewalker.go
@@ -0,0 +1,107 @@
+// Copyright (C) 2024 Storj Labs, Inc.
+// See LICENSE for copying information.
+
+package internalcmd
+
+import (
+	"encoding/json"
+	"runtime"
+
+	"github.com/spf13/cobra"
+	"github.com/zeebo/errs"
+	"go.uber.org/zap"
+
+	"storj.io/common/process"
+	"storj.io/storj/storagenode/iopriority"
+	"storj.io/storj/storagenode/pieces"
+	"storj.io/storj/storagenode/pieces/lazyfilewalker"
+	"storj.io/storj/storagenode/storagenodedb"
+)
+
+// NewTrashFilewalkerCmd creates a new cobra command for running a trash cleanup filewalker.
+func NewTrashFilewalkerCmd() *LazyFilewalkerCmd {
+	var cfg FilewalkerCfg
+	var runOpts RunOptions
+
+	cmd := &cobra.Command{
+		Use:   lazyfilewalker.TrashCleanupFilewalkerCmdName,
+		Short: "An internal subcommand used to run a trash cleanup filewalker as a separate subprocess with lower IO priority",
+		RunE: func(cmd *cobra.Command, args []string) error {
+			runOpts.normalize(cmd)
+			runOpts.config = &cfg
+
+			return trashCmdRun(&runOpts)
+		},
+		FParseErrWhitelist: cobra.FParseErrWhitelist{
+			UnknownFlags: true,
+		},
+		Hidden: true,
+		Args:   cobra.ExactArgs(0),
+	}
+
+	process.Bind(cmd, &cfg)
+
+	return NewLazyFilewalkerCmd(cmd, &runOpts)
+}
+
+// trashCmdRun runs the TrashLazyFileWalker.
+func trashCmdRun(opts *RunOptions) (err error) {
+	if opts.config.LowerIOPriority {
+		if runtime.GOOS == "linux" {
+			// Pin the current goroutine to the current OS thread, so we can set the IO priority
+			// for the current thread.
+			// This is necessary because Go does not use CLONE_IO when creating new threads,
+			// so they do not share a single IO context.
+			runtime.LockOSThread()
+			defer runtime.UnlockOSThread()
+		}
+
+		err = iopriority.SetLowIOPriority()
+		if err != nil {
+			return err
+		}
+	}
+
+	log := opts.Logger
+
+	// Decode the data struct received from the main process
+	var req lazyfilewalker.TrashCleanupRequest
+	if err = json.NewDecoder(opts.stdin).Decode(&req); err != nil {
+		return errs.New("Error decoding data from stdin: %v", err)
+	}
+
+	// Validate the request data
+	switch {
+	case req.SatelliteID.IsZero():
+		return errs.New("SatelliteID is required")
+	case req.DateBefore.IsZero():
+		return errs.New("DateBefore is required")
+	}
+
+	log.Info("trash-filewalker started", zap.Time("dateBefore", req.DateBefore))
+
+	db, err := storagenodedb.OpenExisting(opts.Ctx, log.Named("db"), opts.config.DatabaseConfig())
+	if err != nil {
+		return errs.New("Error starting master database on storage node: %v", err)
+	}
+	log.Info("Database started")
+	defer func() {
+		err = errs.Combine(err, db.Close())
+	}()
+
+	filewalker := pieces.NewFileWalker(log, db.Pieces(), db.V0PieceInfo())
+	bytesDeleted, keysDeleted, err := filewalker.WalkCleanupTrash(opts.Ctx, req.SatelliteID, req.DateBefore)
+	if err != nil {
+		return err
+	}
+
+	resp := lazyfilewalker.TrashCleanupResponse{
+		BytesDeleted: bytesDeleted,
+		KeysDeleted:  keysDeleted,
+	}
+
+	log.Info("trash-filewalker completed", zap.Int64("bytesDeleted", bytesDeleted), zap.Int("numKeysDeleted", len(keysDeleted)))
+
+	// encode the response struct and write it to stdout
+	return json.NewEncoder(opts.stdout).Encode(resp)
+}
diff --git a/cmd/storagenode/internalcmd/used_space_filewalker.go b/cmd/storagenode/internalcmd/used_space_filewalker.go
index fe51ff2da2ea..0c35a32bf12f 100644
--- a/cmd/storagenode/internalcmd/used_space_filewalker.go
+++ b/cmd/storagenode/internalcmd/used_space_filewalker.go
@@ -41,10 +41,7 @@ func NewUsedSpaceFilewalkerCmd() *LazyFilewalkerCmd {
 
 	process.Bind(cmd, &cfg)
 
-	return &LazyFilewalkerCmd{
-		Command:    cmd,
-		RunOptions: &runOpts,
-	}
+	return NewLazyFilewalkerCmd(cmd, &runOpts)
 }
 
 // Run runs the UsedSpaceLazyFileWalker.
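Note: the new trash-cleanup-filewalker subprocess exchanges JSON over stdin/stdout with the parent process, mirroring the existing gc and used-space filewalkers. The sketch below illustrates that request/response contract only; it is not part of the diff and uses local stand-in structs rather than the real lazyfilewalker types.

// Sketch only: stand-in types, in-memory buffers instead of real pipes.
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"time"
)

type trashCleanupRequest struct {
	SatelliteID string    `json:"satelliteID"`
	DateBefore  time.Time `json:"dateBefore"`
}

type trashCleanupResponse struct {
	BytesDeleted int64    `json:"bytesDeleted"`
	KeysDeleted  []string `json:"keysDeleted"`
}

func main() {
	// Parent side: encode the request as it would be written to the child's stdin.
	var stdin bytes.Buffer
	_ = json.NewEncoder(&stdin).Encode(trashCleanupRequest{
		SatelliteID: "satellite-id-placeholder",
		DateBefore:  time.Now().Add(-7 * 24 * time.Hour),
	})

	// Child side: decode the request, do the cleanup work, then write the response to stdout.
	var req trashCleanupRequest
	_ = json.NewDecoder(&stdin).Decode(&req)

	var stdout bytes.Buffer
	_ = json.NewEncoder(&stdout).Encode(trashCleanupResponse{BytesDeleted: 1024, KeysDeleted: nil})
	fmt.Print(stdout.String())
}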
diff --git a/cmd/storagenode/main.go b/cmd/storagenode/main.go
index 58fa53cc8f05..454888222bbd 100644
--- a/cmd/storagenode/main.go
+++ b/cmd/storagenode/main.go
@@ -38,5 +38,5 @@ func main() {
 }
 
 func isFilewalkerCommand() bool {
-	return len(os.Args) > 1 && (os.Args[1] == lazyfilewalker.UsedSpaceFilewalkerCmdName || os.Args[1] == lazyfilewalker.GCFilewalkerCmdName)
+	return len(os.Args) > 1 && (os.Args[1] == lazyfilewalker.UsedSpaceFilewalkerCmdName || os.Args[1] == lazyfilewalker.GCFilewalkerCmdName || os.Args[1] == lazyfilewalker.TrashCleanupFilewalkerCmdName)
 }
diff --git a/cmd/storagenode/root.go b/cmd/storagenode/root.go
index 3dc7caf21ef5..f37ebaee50d8 100644
--- a/cmd/storagenode/root.go
+++ b/cmd/storagenode/root.go
@@ -64,6 +64,7 @@ func newRootCmd(setDefaults bool) (*cobra.Command, *Factory) {
 		// internal hidden commands
 		internalcmd.NewUsedSpaceFilewalkerCmd().Command,
 		internalcmd.NewGCFilewalkerCmd().Command,
+		internalcmd.NewTrashFilewalkerCmd().Command,
 	)
 
 	return cmd, factory
diff --git a/private/testplanet/storagenode.go b/private/testplanet/storagenode.go
index e3ace2533356..d22b7070fef3 100644
--- a/private/testplanet/storagenode.go
+++ b/private/testplanet/storagenode.go
@@ -306,6 +306,13 @@ func (planet *Planet) newStorageNode(ctx context.Context, prefix string, index,
 		cmd.Ctx = ctx
 		peer.Storage2.LazyFileWalker.TestingSetGCCmd(cmd)
 	}
+	{
+		// set up the trash cleanup lazyfilewalker filewalker
+		cmd := internalcmd.NewTrashFilewalkerCmd()
+		cmd.Logger = log.Named("trash-filewalker")
+		cmd.Ctx = ctx
+		peer.Storage2.LazyFileWalker.TestingSetTrashCleanupCmd(cmd)
+	}
 
 	return &StorageNode{
 		Name: prefix,
diff --git a/storagenode/blobstore/filestore/store.go b/storagenode/blobstore/filestore/store.go
index 953895ce163f..f5ff9f3d7bb3 100644
--- a/storagenode/blobstore/filestore/store.go
+++ b/storagenode/blobstore/filestore/store.go
@@ -175,7 +175,11 @@ func (store *blobStore) RestoreTrash(ctx context.Context, namespace []byte) (key
 // in the trash or could not be restored.
 func (store *blobStore) TryRestoreTrashBlob(ctx context.Context, ref blobstore.BlobRef) (err error) {
 	defer mon.Task()(&ctx)(&err)
-	return Error.Wrap(store.dir.TryRestoreTrashBlob(ctx, ref))
+	err = store.dir.TryRestoreTrashBlob(ctx, ref)
+	if os.IsNotExist(err) {
+		return err
+	}
+	return Error.Wrap(err)
 }
 
 // EmptyTrash removes files in trash that have been there since before trashedBefore.
diff --git a/storagenode/pieces/filewalker.go b/storagenode/pieces/filewalker.go
index cb7c88422069..9d183fc1366d 100644
--- a/storagenode/pieces/filewalker.go
+++ b/storagenode/pieces/filewalker.go
@@ -190,3 +190,25 @@ func (fw *FileWalker) WalkSatellitePiecesToTrash(ctx context.Context, satelliteI
 
 	return pieceIDs, piecesCount, piecesSkipped, errFileWalker.Wrap(err)
 }
+
+// WalkCleanupTrash looks at all trash per-day directories owned by the given satellite and
+// recursively deletes any of them that correspond to a time before the given dateBefore.
+//
+// This method returns the total number of bytes occupied by the deleted blobs and the piece
+// IDs of those blobs, so that callers can clean up any related records (such as piece
+// expiration entries) that refer to them.
+func (fw *FileWalker) WalkCleanupTrash(ctx context.Context, satelliteID storj.NodeID, dateBefore time.Time) (bytesDeleted int64, keysDeleted []storj.PieceID, err error) {
+	defer mon.Task()(&ctx)(&err)
+
+	bytesDeleted, deletedKeysList, err := fw.blobs.EmptyTrash(ctx, satelliteID[:], dateBefore)
+	keysDeleted = make([]storj.PieceID, 0, len(deletedKeysList))
+	for _, dk := range deletedKeysList {
+		pieceID, parseErr := storj.PieceIDFromBytes(dk)
+		if parseErr != nil {
+			fw.log.Error("stored blob has invalid pieceID", zap.ByteString("deletedKey", dk), zap.Error(parseErr))
+			continue
+		}
+		keysDeleted = append(keysDeleted, pieceID)
+	}
+	return bytesDeleted, keysDeleted, err
+}
diff --git a/storagenode/pieces/lazyfilewalker/supervisor.go b/storagenode/pieces/lazyfilewalker/supervisor.go
index 739adc304e54..96284cd495c7 100644
--- a/storagenode/pieces/lazyfilewalker/supervisor.go
+++ b/storagenode/pieces/lazyfilewalker/supervisor.go
@@ -21,6 +21,8 @@ const (
 	UsedSpaceFilewalkerCmdName = "used-space-filewalker"
 	// GCFilewalkerCmdName is the name of the gc-filewalker subcommand.
 	GCFilewalkerCmdName = "gc-filewalker"
+	// TrashCleanupFilewalkerCmdName is the name of the trash-cleanup-filewalker subcommand.
+	TrashCleanupFilewalkerCmdName = "trash-cleanup-filewalker"
 )
 
 var (
@@ -36,21 +38,24 @@ var (
 type Supervisor struct {
 	log *zap.Logger
 
-	executable    string
-	gcArgs        []string
-	usedSpaceArgs []string
+	executable       string
+	gcArgs           []string
+	usedSpaceArgs    []string
+	trashCleanupArgs []string
 
-	testingGCCmd        execwrapper.Command
-	testingUsedSpaceCmd execwrapper.Command
+	testingGCCmd           execwrapper.Command
+	testingUsedSpaceCmd    execwrapper.Command
+	testingTrashCleanupCmd execwrapper.Command
 }
 
 // NewSupervisor creates a new lazy filewalker Supervisor.
 func NewSupervisor(log *zap.Logger, config Config, executable string) *Supervisor {
 	return &Supervisor{
-		log:           log,
-		gcArgs:        append([]string{GCFilewalkerCmdName}, config.Args()...),
-		usedSpaceArgs: append([]string{UsedSpaceFilewalkerCmdName}, config.Args()...),
-		executable:    executable,
+		log:              log,
+		gcArgs:           append([]string{GCFilewalkerCmdName}, config.Args()...),
+		usedSpaceArgs:    append([]string{UsedSpaceFilewalkerCmdName}, config.Args()...),
+		trashCleanupArgs: append([]string{TrashCleanupFilewalkerCmdName}, config.Args()...),
+		executable:       executable,
 	}
 }
 
@@ -66,6 +71,12 @@ func (fw *Supervisor) TestingSetUsedSpaceCmd(cmd execwrapper.Command) {
 	fw.testingUsedSpaceCmd = cmd
 }
 
+// TestingSetTrashCleanupCmd sets the command for the trash cleanup filewalker subprocess.
+// The cmd acts as a replacement for the subprocess.
+func (fw *Supervisor) TestingSetTrashCleanupCmd(cmd execwrapper.Command) {
+	fw.testingTrashCleanupCmd = cmd
+}
+
 // UsedSpaceRequest is the request struct for the used-space-filewalker process.
 type UsedSpaceRequest struct {
 	SatelliteID storj.NodeID `json:"satelliteID"`
@@ -91,6 +102,18 @@ type GCFilewalkerResponse struct {
 	PiecesCount        int64          `json:"piecesCount"`
 }
 
+// TrashCleanupRequest is the request struct for the trash-cleanup-filewalker process.
+type TrashCleanupRequest struct {
+	SatelliteID storj.NodeID `json:"satelliteID"`
+	DateBefore  time.Time    `json:"dateBefore"`
+}
+
+// TrashCleanupResponse is the response struct for the trash-cleanup-filewalker process.
+type TrashCleanupResponse struct {
+	BytesDeleted int64           `json:"bytesDeleted"`
+	KeysDeleted  []storj.PieceID `json:"keysDeleted"`
+}
+
 // WalkAndComputeSpaceUsedBySatellite returns the total used space by satellite.
 func (fw *Supervisor) WalkAndComputeSpaceUsedBySatellite(ctx context.Context, satelliteID storj.NodeID) (piecesTotal int64, piecesContentSize int64, err error) {
 	defer mon.Task()(&ctx)(&err)
@@ -134,3 +157,23 @@ func (fw *Supervisor) WalkSatellitePiecesToTrash(ctx context.Context, satelliteI
 
 	return resp.PieceIDs, resp.PiecesCount, resp.PiecesSkippedCount, nil
 }
+
+// WalkCleanupTrash deletes per-day trash directories which are older than the given time.
+func (fw *Supervisor) WalkCleanupTrash(ctx context.Context, satelliteID storj.NodeID, dateBefore time.Time) (bytesDeleted int64, keysDeleted []storj.PieceID, err error) {
+	defer mon.Task()(&ctx)(&err)
+
+	req := TrashCleanupRequest{
+		SatelliteID: satelliteID,
+		DateBefore:  dateBefore,
+	}
+	var resp TrashCleanupResponse
+
+	log := fw.log.Named(TrashCleanupFilewalkerCmdName).With(zap.String("satelliteID", satelliteID.String()))
+
+	err = newProcess(fw.testingTrashCleanupCmd, log, fw.executable, fw.trashCleanupArgs).run(ctx, req, &resp)
+	if err != nil {
+		return 0, nil, err
+	}
+
+	return resp.BytesDeleted, resp.KeysDeleted, nil
+}
diff --git a/storagenode/pieces/store.go b/storagenode/pieces/store.go
index ba2bbcd99635..46c5463826d0 100644
--- a/storagenode/pieces/store.go
+++ b/storagenode/pieces/store.go
@@ -324,10 +324,14 @@ func (store *Store) Reader(ctx context.Context, satellite storj.NodeID, pieceID
 // It returns nil if the piece was restored, or an error if the piece was not in the trash.
 func (store *Store) TryRestoreTrashPiece(ctx context.Context, satellite storj.NodeID, pieceID storj.PieceID) (err error) {
 	defer mon.Task()(&ctx)(&err)
-	return Error.Wrap(store.blobs.TryRestoreTrashBlob(ctx, blobstore.BlobRef{
+	err = store.blobs.TryRestoreTrashBlob(ctx, blobstore.BlobRef{
 		Namespace: satellite.Bytes(),
 		Key:       pieceID.Bytes(),
-	}))
+	})
+	if os.IsNotExist(err) {
+		return err
+	}
+	return Error.Wrap(err)
 }
 
 // Delete deletes the specified piece.
@@ -414,20 +418,30 @@ func (store *Store) Trash(ctx context.Context, satellite storj.NodeID, pieceID s
 func (store *Store) EmptyTrash(ctx context.Context, satelliteID storj.NodeID, trashedBefore time.Time) (err error) {
 	defer mon.Task()(&ctx)(&err)
 
-	_, deletedIDs, err := store.blobs.EmptyTrash(ctx, satelliteID[:], trashedBefore)
-	if err != nil {
-		return Error.Wrap(err)
-	}
-
-	for _, deletedID := range deletedIDs {
-		pieceID, pieceIDErr := storj.PieceIDFromBytes(deletedID)
-		if pieceIDErr != nil {
-			return Error.Wrap(pieceIDErr)
+	var errList errs.Group
+	if store.config.EnableLazyFilewalker && store.lazyFilewalker != nil {
+		_, deletedIDs, err := store.lazyFilewalker.WalkCleanupTrash(ctx, satelliteID, trashedBefore)
+		errList.Add(err)
+		// the lazyfilewalker has already transmitted PieceIDs; we don't have to parse them
+		for _, deletedID := range deletedIDs {
+			_, err := store.expirationInfo.DeleteExpiration(ctx, satelliteID, deletedID)
+			errList.Add(err)
+		}
+	} else {
+		_, deletedIDs, err := store.blobs.EmptyTrash(ctx, satelliteID[:], trashedBefore)
+		errList.Add(err)
+		// we have this answer directly from the blobstore, and must translate the blob keys to PieceIDs
+		for _, deletedID := range deletedIDs {
+			pieceID, err := storj.PieceIDFromBytes(deletedID)
+			if err != nil {
+				store.log.Error("stored blob has invalid PieceID", zap.ByteString("deletedKey", deletedID), zap.Error(err))
+				continue
+			}
+			_, err = store.expirationInfo.DeleteExpiration(ctx, satelliteID, pieceID)
+			errList.Add(err)
 		}
-		_, deleteErr := store.expirationInfo.DeleteExpiration(ctx, satelliteID, pieceID)
-		err = errs.Combine(err, deleteErr)
 	}
 
-	return Error.Wrap(err)
+	return Error.Wrap(errList.Err())
 }
 
 // RestoreTrash restores all pieces in the trash.
diff --git a/storagenode/pieces/store_test.go b/storagenode/pieces/store_test.go
index dece52773806..6da2b7ec0efc 100644
--- a/storagenode/pieces/store_test.go
+++ b/storagenode/pieces/store_test.go
@@ -26,10 +26,13 @@ import (
 	"storj.io/common/storj"
 	"storj.io/common/testcontext"
 	"storj.io/common/testrand"
+	"storj.io/storj/cmd/storagenode/internalcmd"
 	"storj.io/storj/storagenode"
 	"storj.io/storj/storagenode/blobstore"
 	"storj.io/storj/storagenode/blobstore/filestore"
 	"storj.io/storj/storagenode/pieces"
+	"storj.io/storj/storagenode/pieces/lazyfilewalker"
+	"storj.io/storj/storagenode/pieces/lazyfilewalker/execwrapper"
 	"storj.io/storj/storagenode/storagenodedb/storagenodedbtest"
 	"storj.io/storj/storagenode/trust"
 )
@@ -896,3 +899,109 @@ func TestOverwriteV0WithV1(t *testing.T) {
 		require.NoError(t, err)
 	})
 }
+
+func TestEmptyTrash_lazyFilewalker(t *testing.T) {
+	storagenodedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db storagenode.DB) {
+		log := zaptest.NewLogger(t)
+		blobs := db.Pieces()
+		fw := pieces.NewFileWalker(log, blobs, nil)
+		cfg := pieces.DefaultConfig
+		cfg.EnableLazyFilewalker = true
+
+		lazyFwCfg := db.Config().LazyFilewalkerConfig()
+		lazyFw := lazyfilewalker.NewSupervisor(log, lazyFwCfg, "")
+		cmd := internalcmd.NewTrashFilewalkerCmd()
+		cmd.Logger = log.Named("trash-filewalker")
+		cmd.Ctx = ctx
+		runCount := 0
+		lazyFw.TestingSetTrashCleanupCmd(&execCommandWrapper{Command: cmd, count: &runCount})
+
+		startTime := time.Now()
+		store := pieces.NewStore(log, fw, lazyFw, blobs, nil, db.PieceExpirationDB(), db.PieceSpaceUsedDB(), cfg)
+
+		// build some data belonging to multiple satellites and store it in the piecestore
+		const (
+			numSatellites         = 2
+			numPiecesPerSatellite = 10
+			deleteDays            = 4
+		)
+		satellites := make([]storj.NodeID, numSatellites)
+		pieceIDs := make([][]storj.PieceID, numSatellites)
+		contents := make([][][]byte, numSatellites)
+		for sat := 0; sat < numSatellites; sat++ {
+			satellites[sat] = testrand.NodeID()
+			pieceIDs[sat] = make([]storj.PieceID, numPiecesPerSatellite)
+			contents[sat] = make([][]byte, numPiecesPerSatellite)
+
+			for p := 0; p < numPiecesPerSatellite; p++ {
+				pieceIDs[sat][p] = testrand.PieceID()
+				contents[sat][p] = testrand.Bytes(10 * memory.KiB)
+				storeSinglePiece(ctx, t, store, satellites[sat], pieceIDs[sat][p], contents[sat][p])
+			}
+		}
+
+		// trash the pieces on different "days"
+		for sat, satID := range satellites {
+			for p, pieceID := range pieceIDs[sat] {
+				trashNow := startTime.Add(time.Duration(p-numPiecesPerSatellite) * 24 * time.Hour)
+				err := store.Trash(ctx, satID, pieceID, trashNow)
+				require.NoError(t, err)
+			}
+		}
+
+		// empty the trash
+		deleteBefore := startTime.Add(-deleteDays * 24 * time.Hour)
+		for _, satID := range satellites {
+			err := store.EmptyTrash(ctx, satID, deleteBefore)
+			require.NoError(t, err)
+		}
+		// we should have run the lazy filewalker version of EmptyTrash twice now
+		assert.Equal(t, 2, runCount)
+
+		// and check that everything is the way we expect
+		for sat, satID := range satellites {
+			for p, pieceID := range pieceIDs[sat] {
+				err := store.TryRestoreTrashPiece(ctx, satID, pieceID)
+				if shouldBeDeleted(startTime.Add(time.Duration(p-numPiecesPerSatellite)*24*time.Hour), deleteBefore) {
+					require.Error(t, err)
+					require.True(t, os.IsNotExist(err), "Expected IsNotExist but got %+v", err)
+				} else {
+					require.NoError(t, err)
+				}
+			}
+		}
+	})
+}
+
+func storeSinglePiece(ctx *testcontext.Context, t testing.TB, store *pieces.Store, satelliteID storj.NodeID, pieceID storj.PieceID, data []byte) {
+	writer, err := store.Writer(ctx, satelliteID, pieceID, pb.PieceHashAlgorithm_SHA256)
+	require.NoError(t, err)
+
+	_, err = io.Copy(writer, bytes.NewReader(data))
+	require.NoError(t, err)
+
+	// commit
+	require.NoError(t, writer.Commit(ctx, &pb.PieceHeader{
+		Hash:          writer.Hash(),
+		HashAlgorithm: pb.PieceHashAlgorithm_SHA256,
+	}))
+}
+
+func shouldBeDeleted(trashTime, deleteBefore time.Time) bool {
+	return !trashTime.After(deleteBefore.Add(-24 * time.Hour))
+}
+
+type execCommandWrapper struct {
+	execwrapper.Command
+	count *int
+}
+
+func (w *execCommandWrapper) Start() error {
+	*w.count++
+	return w.Command.Start()
+}
+
+func (w *execCommandWrapper) Run() error {
+	*w.count++
+	return w.Command.Run()
+}
diff --git a/storagenode/pieces/trashchore.go b/storagenode/pieces/trashchore.go
index 2e082b7832bf..d760ae6afa2d 100644
--- a/storagenode/pieces/trashchore.go
+++ b/storagenode/pieces/trashchore.go
@@ -30,8 +30,6 @@ type TrashChore struct {
 	mu         sync.Mutex
 	done       bool
 	satellites map[storj.NodeID]*sync2.Workplace
-
-	minTimeStamp map[storj.NodeID]time.Time
 }
 
 const (
@@ -51,8 +49,6 @@ func NewTrashChore(log *zap.Logger, choreInterval, trashExpiryInterval time.Dura
 		Cycle: sync2.NewCycle(choreInterval),
 
 		satellites: map[storj.NodeID]*sync2.Workplace{},
-
-		minTimeStamp: map[storj.NodeID]time.Time{},
 	}
 }
 
@@ -69,14 +65,6 @@ func (chore *TrashChore) Run(ctx context.Context) (err error) {
 	var wg sync.WaitGroup
 	limiter := make(chan struct{}, 1)
 	for _, satellite := range chore.trust.GetSatellites(ctx) {
-		chore.mu.Lock()
-		nextViableRun := chore.minTimeStamp[satellite].Add(chore.trashExpiryInterval)
-		chore.mu.Unlock()
-		if time.Now().Before(nextViableRun) {
-			chore.log.Info("skipping emptying trash", zap.Stringer("Satellite ID", satellite), zap.Time("Next viable run", nextViableRun))
run", nextViableRun)) - continue - } - satellite := satellite place := chore.ensurePlace(satellite) wg.Add(1)