storagenode/pieces: make a lazy filewalker for trash emptying
Emptying the trash is a huge, I/O-consuming task on many nodes. This
change allows use of a lazy filewalker (a filewalker run in a separate
process with low I/O priority) for emptying the trash.

Refs: #6609
Change-Id: I354d5af4cbf3730900f13e498ddcb2a3044dcf86
thepaul authored and Storj Robot committed Mar 5, 2024
1 parent 2e81bc6 commit b1dd96d
Showing 13 changed files with 381 additions and 48 deletions.
50 changes: 47 additions & 3 deletions cmd/storagenode/internalcmd/cmd_lazy_filewalker.go
@@ -10,6 +10,7 @@ import (
"io"
"net/url"
"os"
"sync"

"github.com/spf13/cobra"
"go.uber.org/zap"
@@ -52,10 +53,20 @@ type RunOptions struct {
type LazyFilewalkerCmd struct {
Command *cobra.Command
*RunOptions
originalPreRunE func(cmd *cobra.Command, args []string) error
}

var _ execwrapper.Command = (*LazyFilewalkerCmd)(nil)

// NewLazyFilewalkerCmd creates a new instance of LazyFilewalkerCmd.
func NewLazyFilewalkerCmd(command *cobra.Command, opts *RunOptions) *LazyFilewalkerCmd {
return &LazyFilewalkerCmd{
Command: command,
RunOptions: opts,
originalPreRunE: command.PreRunE,
}
}

// SetArgs sets arguments for the command.
// The command or executable path should be passed as the first argument.
func (cmd *LazyFilewalkerCmd) SetArgs(args []string) {
@@ -67,8 +78,38 @@ func (cmd *LazyFilewalkerCmd) SetArgs(args []string) {
cmd.Command.SetArgs(args)
}

var (
// cobraMutex should be held while manually invoking *cobra.Command
// instances. It may be released after the invocation is complete, or once
// control is returned to caller code (i.e., in the Run methods).
//
// All of this silliness is simply to avoid running the first part of
// (*cobra.Command).ExecuteC() at the same time in multiple goroutines. It
// is not technically thread-safe. The data that is affected by the race
// condition does not matter for our purposes, so we aren't worried about
// that, but we also don't want to upset the race detector when we are
// running multiple tests that might invoke our commands in parallel.
cobraMutex sync.Mutex
)

// Run runs the LazyFileWalker.
func (cmd *LazyFilewalkerCmd) Run() error {
cobraMutex.Lock()
wasUnlockedByPreRun := false
defer func() {
if !wasUnlockedByPreRun {
cobraMutex.Unlock()
}
}()
wrappedPreRun := cmd.originalPreRunE
if wrappedPreRun == nil {
wrappedPreRun = func(cmd *cobra.Command, args []string) error { return nil }
}
cmd.Command.PreRunE = func(cmd *cobra.Command, args []string) error {
cobraMutex.Unlock()
wasUnlockedByPreRun = true
return wrappedPreRun(cmd, args)
}
return cmd.Command.ExecuteContext(cmd.Ctx)
}

@@ -136,9 +177,12 @@ func (r *RunOptions) tryCreateNewLogger() {
})

// this error is expected if the sink is already registered.
duplicateSinkErr := fmt.Errorf("sink factory already registered for scheme %q", writerkey)
if err != nil && err.Error() != duplicateSinkErr.Error() {
r.Logger.Error("failed to register logger sink", zap.Error(err))
if err != nil {
if err.Error() == fmt.Sprintf("sink factory already registered for scheme %q", writerkey) {
r.Logger.Info("logger sink already registered")
} else {
r.Logger.Error("failed to register logger sink", zap.Error(err))
}
return
}

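A note on the mutex hand-off in Run above: the lock is taken before ExecuteContext and released inside the injected PreRunE, so only cobra's non-thread-safe setup phase is serialized while the walk itself runs unlocked. A minimal standalone sketch of the same pattern follows; the names frameworkMutex and invoke are illustrative, not part of this change.

package main

import (
	"fmt"
	"sync"
)

var frameworkMutex sync.Mutex

// invoke serializes the framework's racy setup phase, then hands unlock
// duty to the code that runs once setup is done. If setup fails before
// reaching that point, the deferred check releases the lock instead.
func invoke(setup func() error, body func()) {
	frameworkMutex.Lock()
	unlockedAfterSetup := false
	defer func() {
		if !unlockedAfterSetup {
			frameworkMutex.Unlock()
		}
	}()

	if err := setup(); err != nil {
		return // lock released by the deferred check
	}
	// Setup is over; everything past this point may run concurrently.
	frameworkMutex.Unlock()
	unlockedAfterSetup = true

	body()
}

func main() {
	invoke(func() error { fmt.Println("setup"); return nil },
		func() { fmt.Println("body") })
}
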
5 changes: 1 addition & 4 deletions cmd/storagenode/internalcmd/gc_filewalker.go
@@ -42,10 +42,7 @@ func NewGCFilewalkerCmd() *LazyFilewalkerCmd {

process.Bind(cmd, &cfg)

return &LazyFilewalkerCmd{
Command: cmd,
RunOptions: &runOpts,
}
return NewLazyFilewalkerCmd(cmd, &runOpts)
}

// Run runs the GCLazyFileWalker.
107 changes: 107 additions & 0 deletions cmd/storagenode/internalcmd/trash_filewalker.go
@@ -0,0 +1,107 @@
// Copyright (C) 2024 Storj Labs, Inc.
// See LICENSE for copying information.

package internalcmd

import (
"encoding/json"
"runtime"

"github.com/spf13/cobra"
"github.com/zeebo/errs"
"go.uber.org/zap"

"storj.io/common/process"
"storj.io/storj/storagenode/iopriority"
"storj.io/storj/storagenode/pieces"
"storj.io/storj/storagenode/pieces/lazyfilewalker"
"storj.io/storj/storagenode/storagenodedb"
)

// NewTrashFilewalkerCmd creates a new cobra command for running a trash cleanup filewalker.
func NewTrashFilewalkerCmd() *LazyFilewalkerCmd {
var cfg FilewalkerCfg
var runOpts RunOptions

cmd := &cobra.Command{
Use: lazyfilewalker.TrashCleanupFilewalkerCmdName,
Short: "An internal subcommand used to run a trash cleanup filewalker as a separate subprocess with lower IO priority",
RunE: func(cmd *cobra.Command, args []string) error {
runOpts.normalize(cmd)
runOpts.config = &cfg

return trashCmdRun(&runOpts)
},
FParseErrWhitelist: cobra.FParseErrWhitelist{
UnknownFlags: true,
},
Hidden: true,
Args: cobra.ExactArgs(0),
}

process.Bind(cmd, &cfg)

return NewLazyFilewalkerCmd(cmd, &runOpts)
}

// trashCmdRun runs the TrashLazyFileWalker.
func trashCmdRun(opts *RunOptions) (err error) {
if opts.config.LowerIOPriority {
if runtime.GOOS == "linux" {
// Pin the current goroutine to the current OS thread, so we can set the IO priority
// for the current thread.
// This is necessary because Go does not use CLONE_IO when creating new threads,
// so they do not share a single IO context.
runtime.LockOSThread()
defer runtime.UnlockOSThread()
}

err = iopriority.SetLowIOPriority()
if err != nil {
return err
}
}

log := opts.Logger

// Decode the data struct received from the main process
var req lazyfilewalker.TrashCleanupRequest
if err = json.NewDecoder(opts.stdin).Decode(&req); err != nil {
return errs.New("Error decoding data from stdin: %v", err)
}

// Validate the request data
switch {
case req.SatelliteID.IsZero():
return errs.New("SatelliteID is required")
case req.DateBefore.IsZero():
return errs.New("DateBefore is required")
}

log.Info("trash-filewalker started", zap.Time("dateBefore", req.DateBefore))

db, err := storagenodedb.OpenExisting(opts.Ctx, log.Named("db"), opts.config.DatabaseConfig())
if err != nil {
return errs.New("Error starting master database on storage node: %v", err)
}
log.Info("Database started")
defer func() {
err = errs.Combine(err, db.Close())
}()

filewalker := pieces.NewFileWalker(log, db.Pieces(), db.V0PieceInfo())
bytesDeleted, keysDeleted, err := filewalker.WalkCleanupTrash(opts.Ctx, req.SatelliteID, req.DateBefore)
if err != nil {
return err
}

resp := lazyfilewalker.TrashCleanupResponse{
BytesDeleted: bytesDeleted,
KeysDeleted: keysDeleted,
}

log.Info("trash-filewalker completed", zap.Int64("bytesDeleted", bytesDeleted), zap.Int("numKeysDeleted", len(keysDeleted)))

// encode the response struct and write it to stdout
return json.NewEncoder(opts.stdout).Encode(resp)
}
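
The new trash_filewalker.go above reads one JSON request from stdin and writes one JSON response to stdout. A rough sketch of the parent side of that exchange (the 24-hour cutoff is arbitrary, and the zero satellite ID is only a placeholder; trashCmdRun rejects zero values, so a real request must carry a non-zero ID):

package main

import (
	"encoding/json"
	"os"
	"time"

	"storj.io/common/storj"
	"storj.io/storj/storagenode/pieces/lazyfilewalker"
)

func main() {
	// The parent encodes a single request onto the child's stdin; here we
	// write to our own stdout just to show the wire format.
	req := lazyfilewalker.TrashCleanupRequest{
		SatelliteID: storj.NodeID{}, // placeholder; must be non-zero in practice
		DateBefore:  time.Now().Add(-24 * time.Hour),
	}
	if err := json.NewEncoder(os.Stdout).Encode(req); err != nil {
		panic(err)
	}
	// The child replies with one TrashCleanupResponse, along the lines of:
	//   {"bytesDeleted":1048576,"keysDeleted":["<piece id>", ...]}
}
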
5 changes: 1 addition & 4 deletions cmd/storagenode/internalcmd/used_space_filewalker.go
@@ -41,10 +41,7 @@ func NewUsedSpaceFilewalkerCmd() *LazyFilewalkerCmd {

process.Bind(cmd, &cfg)

return &LazyFilewalkerCmd{
Command: cmd,
RunOptions: &runOpts,
}
return NewLazyFilewalkerCmd(cmd, &runOpts)
}

// Run runs the UsedSpaceLazyFileWalker.
2 changes: 1 addition & 1 deletion cmd/storagenode/main.go
@@ -38,5 +38,5 @@ func main() {
}

func isFilewalkerCommand() bool {
return len(os.Args) > 1 && (os.Args[1] == lazyfilewalker.UsedSpaceFilewalkerCmdName || os.Args[1] == lazyfilewalker.GCFilewalkerCmdName)
return len(os.Args) > 1 && (os.Args[1] == lazyfilewalker.UsedSpaceFilewalkerCmdName || os.Args[1] == lazyfilewalker.GCFilewalkerCmdName || os.Args[1] == lazyfilewalker.TrashCleanupFilewalkerCmdName)
}
1 change: 1 addition & 0 deletions cmd/storagenode/root.go
@@ -64,6 +64,7 @@ func newRootCmd(setDefaults bool) (*cobra.Command, *Factory) {
// internal hidden commands
internalcmd.NewUsedSpaceFilewalkerCmd().Command,
internalcmd.NewGCFilewalkerCmd().Command,
internalcmd.NewTrashFilewalkerCmd().Command,
)

return cmd, factory
7 changes: 7 additions & 0 deletions private/testplanet/storagenode.go
@@ -306,6 +306,13 @@ func (planet *Planet) newStorageNode(ctx context.Context, prefix string, index,
cmd.Ctx = ctx
peer.Storage2.LazyFileWalker.TestingSetGCCmd(cmd)
}
{
// set up the trash cleanup lazyfilewalker filewalker
cmd := internalcmd.NewTrashFilewalkerCmd()
cmd.Logger = log.Named("trash-filewalker")
cmd.Ctx = ctx
peer.Storage2.LazyFileWalker.TestingSetTrashCleanupCmd(cmd)
}

return &StorageNode{
Name: prefix,
6 changes: 5 additions & 1 deletion storagenode/blobstore/filestore/store.go
@@ -175,7 +175,11 @@ func (store *blobStore) RestoreTrash(ctx context.Context, namespace []byte) (key
// in the trash or could not be restored.
func (store *blobStore) TryRestoreTrashBlob(ctx context.Context, ref blobstore.BlobRef) (err error) {
defer mon.Task()(&ctx)(&err)
return Error.Wrap(store.dir.TryRestoreTrashBlob(ctx, ref))
err = store.dir.TryRestoreTrashBlob(ctx, ref)
if os.IsNotExist(err) {
return err
}
return Error.Wrap(err)
}

// EmptyTrash removes files in trash that have been there since before trashedBefore.
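The TryRestoreTrashBlob change above deliberately returns the not-exist error unwrapped: os.IsNotExist only sees through *os.PathError-style errors, not through an errs class, so wrapping would hide the condition from callers. A small demonstration (the Error class mirrors the one in filestore, but the file path is made up):

package main

import (
	"errors"
	"fmt"
	"io/fs"
	"os"

	"github.com/zeebo/errs"
)

var Error = errs.Class("filestore error")

func main() {
	_, rawErr := os.Open("/no/such/file")

	// Unwrapped: os.IsNotExist recognizes the *fs.PathError directly.
	fmt.Println(os.IsNotExist(rawErr)) // true

	// Wrapped in an errs class, os.IsNotExist no longer matches...
	wrapped := Error.Wrap(rawErr)
	fmt.Println(os.IsNotExist(wrapped)) // false

	// ...although errors.Is still unwraps and matches.
	fmt.Println(errors.Is(wrapped, fs.ErrNotExist)) // true
}
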
22 changes: 22 additions & 0 deletions storagenode/pieces/filewalker.go
@@ -190,3 +190,25 @@ func (fw *FileWalker) WalkSatellitePiecesToTrash(ctx context.Context, satelliteI

return pieceIDs, piecesCount, piecesSkipped, errFileWalker.Wrap(err)
}

// WalkCleanupTrash looks at all trash per-day directories owned by the given satellite and
// recursively deletes any of them that correspond to a time before the given dateBefore.
//
// This method returns the total number of bytes occupied by the deleted blobs, and the piece
// IDs of the blobs that were deleted.
func (fw *FileWalker) WalkCleanupTrash(ctx context.Context, satelliteID storj.NodeID, dateBefore time.Time) (bytesDeleted int64, keysDeleted []storj.PieceID, err error) {
defer mon.Task()(&ctx)(&err)

bytesDeleted, deletedKeysList, err := fw.blobs.EmptyTrash(ctx, satelliteID[:], dateBefore)
keysDeleted = make([]storj.PieceID, 0, len(deletedKeysList))
for _, dk := range deletedKeysList {
pieceID, parseErr := storj.PieceIDFromBytes(dk)
if parseErr != nil {
fw.log.Error("stored blob has invalid pieceID", zap.ByteString("deletedKey", dk), zap.Error(parseErr))
continue
}
keysDeleted = append(keysDeleted, pieceID)
}
return bytesDeleted, keysDeleted, err
}
61 changes: 52 additions & 9 deletions storagenode/pieces/lazyfilewalker/supervisor.go
@@ -21,6 +21,8 @@ const (
UsedSpaceFilewalkerCmdName = "used-space-filewalker"
// GCFilewalkerCmdName is the name of the gc-filewalker subcommand.
GCFilewalkerCmdName = "gc-filewalker"
// TrashCleanupFilewalkerCmdName is the name of the trash-cleanup-filewalker subcommand.
TrashCleanupFilewalkerCmdName = "trash-cleanup-filewalker"
)

var (
Expand All @@ -36,21 +38,24 @@ var (
type Supervisor struct {
log *zap.Logger

executable string
gcArgs []string
usedSpaceArgs []string
executable string
gcArgs []string
usedSpaceArgs []string
trashCleanupArgs []string

testingGCCmd execwrapper.Command
testingUsedSpaceCmd execwrapper.Command
testingGCCmd execwrapper.Command
testingUsedSpaceCmd execwrapper.Command
testingTrashCleanupCmd execwrapper.Command
}

// NewSupervisor creates a new lazy filewalker Supervisor.
func NewSupervisor(log *zap.Logger, config Config, executable string) *Supervisor {
return &Supervisor{
log: log,
gcArgs: append([]string{GCFilewalkerCmdName}, config.Args()...),
usedSpaceArgs: append([]string{UsedSpaceFilewalkerCmdName}, config.Args()...),
executable: executable,
log: log,
gcArgs: append([]string{GCFilewalkerCmdName}, config.Args()...),
usedSpaceArgs: append([]string{UsedSpaceFilewalkerCmdName}, config.Args()...),
trashCleanupArgs: append([]string{TrashCleanupFilewalkerCmdName}, config.Args()...),
executable: executable,
}
}

@@ -66,6 +71,12 @@ func (fw *Supervisor) TestingSetUsedSpaceCmd(cmd execwrapper.Command) {
fw.testingUsedSpaceCmd = cmd
}

// TestingSetTrashCleanupCmd sets the command for the trash cleanup filewalker subprocess.
// The cmd acts as a replacement for the subprocess.
func (fw *Supervisor) TestingSetTrashCleanupCmd(cmd execwrapper.Command) {
fw.testingTrashCleanupCmd = cmd
}

// UsedSpaceRequest is the request struct for the used-space-filewalker process.
type UsedSpaceRequest struct {
SatelliteID storj.NodeID `json:"satelliteID"`
@@ -91,6 +102,18 @@ type GCFilewalkerResponse struct {
PiecesCount int64 `json:"piecesCount"`
}

// TrashCleanupRequest is the request struct for the trash-cleanup-filewalker process.
type TrashCleanupRequest struct {
SatelliteID storj.NodeID `json:"satelliteID"`
DateBefore time.Time `json:"dateBefore"`
}

// TrashCleanupResponse is the response struct for the trash-cleanup-filewalker process.
type TrashCleanupResponse struct {
BytesDeleted int64 `json:"bytesDeleted"`
KeysDeleted []storj.PieceID `json:"keysDeleted"`
}

// WalkAndComputeSpaceUsedBySatellite returns the total used space by satellite.
func (fw *Supervisor) WalkAndComputeSpaceUsedBySatellite(ctx context.Context, satelliteID storj.NodeID) (piecesTotal int64, piecesContentSize int64, err error) {
defer mon.Task()(&ctx)(&err)
@@ -134,3 +157,23 @@ func (fw *Supervisor) WalkSatellitePiecesToTrash(ctx context.Context, satelliteI

return resp.PieceIDs, resp.PiecesCount, resp.PiecesSkippedCount, nil
}

// WalkCleanupTrash deletes per-day trash directories which are older than the given time.
func (fw *Supervisor) WalkCleanupTrash(ctx context.Context, satelliteID storj.NodeID, dateBefore time.Time) (bytesDeleted int64, keysDeleted []storj.PieceID, err error) {
defer mon.Task()(&ctx)(&err)

req := TrashCleanupRequest{
SatelliteID: satelliteID,
DateBefore: dateBefore,
}
var resp TrashCleanupResponse

log := fw.log.Named(TrashCleanupFilewalkerCmdName).With(zap.String("satelliteID", satelliteID.String()))

err = newProcess(fw.testingTrashCleanupCmd, log, fw.executable, fw.trashCleanupArgs).run(ctx, req, &resp)
if err != nil {
return 0, nil, err
}

return resp.BytesDeleted, resp.KeysDeleted, nil
}
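
Putting the new supervisor method to use is a one-call affair. A usage sketch (emptyTrashLazily, the seven-day cutoff, and the fallback comment are illustrative, not part of this change; construction of the Supervisor itself is elided):

package example

import (
	"context"
	"time"

	"go.uber.org/zap"

	"storj.io/common/storj"
	"storj.io/storj/storagenode/pieces/lazyfilewalker"
)

// emptyTrashLazily deletes all per-day trash directories older than seven
// days for one satellite via the lazy filewalker subprocess.
func emptyTrashLazily(ctx context.Context, log *zap.Logger, sup *lazyfilewalker.Supervisor, satelliteID storj.NodeID) error {
	cutoff := time.Now().AddDate(0, 0, -7)

	bytesDeleted, keysDeleted, err := sup.WalkCleanupTrash(ctx, satelliteID, cutoff)
	if err != nil {
		// On failure a caller might fall back to an in-process filewalker.
		return err
	}

	log.Info("trash emptied",
		zap.Int64("bytesDeleted", bytesDeleted),
		zap.Int("numKeysDeleted", len(keysDeleted)))
	return nil
}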
