Skip to content

Commit

Permalink
storagenode/{piece,blob}store: avoid extra trash checks
Browse files Browse the repository at this point in the history
We check to see if missing pieces/blobs are in the trash so that we can
get an idea via metrics if things are going into the trash incorrectly.
However, we've been doing that twice for every time we get a not-found
error, which is unnecessary.

The first check happens in the blobstore. We currently use that to
increment a metric counter, then we throw the information away.
Afterward, at the piecestore endpoint layer, we check again, then try to
restore the blob out of the trash if possible.

Instead of doing that, we will remove the check from the blobstore, then
increment the same metric counter from the piecestore side if
appropriate.

Change-Id: I4513baf5a98c1df11d0d903ddf6658d024176303
  • Loading branch information
thepaul authored and Storj Robot committed Feb 16, 2024
1 parent 63a81ec commit 121de29
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 29 deletions.
23 changes: 1 addition & 22 deletions storagenode/blobstore/filestore/dir.go
Expand Up @@ -173,18 +173,6 @@ func (dir *Dir) refToDirPath(ref blobstore.BlobRef, subDir string) (string, erro
return filepath.Join(subDir, namespace, key[:2], key[2:]), nil
}

// fileConfirmedInTrash returns true if it is able to confirm the file is in
// the trash. On errors, or if the file is not in the trash, it returns false.
func (dir *Dir) fileConfirmedInTrash(ctx context.Context, ref blobstore.BlobRef, formatVer blobstore.FormatVersion) bool {
trashBasePath, err := dir.refToDirPath(ref, dir.trashdir())
if err != nil {
return false
}
trashVerPath := blobPathForFormatVersion(trashBasePath, formatVer)
_, err = os.Stat(trashVerPath)
return err == nil
}

// blobPathForFormatVersion adjusts a bare blob path (as might have been generated by a call to
// blobToBasePath()) to what it should be for the given storage format version.
func blobPathForFormatVersion(path string, formatVersion blobstore.FormatVersion) string {
Expand Down Expand Up @@ -258,12 +246,7 @@ func (dir *Dir) Open(ctx context.Context, ref blobstore.BlobRef) (_ *os.File, _
if err == nil {
return file, formatVer, nil
}
if os.IsNotExist(err) {
// Check and monitor if the file is in the trash
if dir.fileConfirmedInTrash(ctx, ref, formatVer) {
monFileInTrash(ref.Namespace).Mark(1)
}
} else {
if !os.IsNotExist(err) {
return nil, FormatV0, Error.New("unable to open %q: %v", vPath, err)
}
}
Expand All @@ -284,10 +267,6 @@ func (dir *Dir) OpenWithStorageFormat(ctx context.Context, ref blobstore.BlobRef
return file, nil
}
if os.IsNotExist(err) {
// Check and monitor if the file is in the trash
if dir.fileConfirmedInTrash(ctx, ref, formatVer) {
monFileInTrash(ref.Namespace).Mark(1)
}
return nil, err
}
return nil, Error.New("unable to open %q: %v", vPath, err)
Expand Down
6 changes: 5 additions & 1 deletion storagenode/blobstore/filestore/store.go
Expand Up @@ -36,7 +36,11 @@ var (
_ blobstore.Blobs = (*blobStore)(nil)
)

func monFileInTrash(namespace []byte) *monkit.Meter {
// MonFileInTrash returns a monkit meter which counts the times a requested blob is
// found in the trash. It is exported so that it can be activated from outside this
// package for backwards compatibility. (It is no longer activated from inside this
// package.)
func MonFileInTrash(namespace []byte) *monkit.Meter {
return monStorage.Meter("open_file_in_trash", monkit.NewSeriesTag("namespace", hex.EncodeToString(namespace))) //mon:locked
}

Expand Down
10 changes: 4 additions & 6 deletions storagenode/piecestore/endpoint.go
Expand Up @@ -34,6 +34,7 @@ import (
"storj.io/drpc"
"storj.io/drpc/drpcctx"
"storj.io/storj/storagenode/bandwidth"
"storj.io/storj/storagenode/blobstore/filestore"
"storj.io/storj/storagenode/monitor"
"storj.io/storj/storagenode/orders"
"storj.io/storj/storagenode/orders/ordersfile"
Expand Down Expand Up @@ -680,16 +681,13 @@ func (endpoint *Endpoint) Download(stream pb.DRPCPiecestore_DownloadStream) (err
// continue serving the download request.
tryRestoreErr := endpoint.store.TryRestoreTrashPiece(ctx, limit.SatelliteId, limit.PieceId)
if tryRestoreErr != nil {
if !errs.Is(tryRestoreErr, fs.ErrNotExist) {
// file is not in trash, and we don't want to return a file rename error,
// so we return the original "file does not exist" error
tryRestoreErr = err
}
endpoint.monitor.VerifyDirReadableLoop.TriggerWait()
return rpcstatus.Wrap(rpcstatus.NotFound, tryRestoreErr)
// we want to return the original "file does not exist" error to the rpc client
return rpcstatus.Wrap(rpcstatus.NotFound, err)
}
restoredFromTrash = true
mon.Meter("download_file_in_trash", monkit.NewSeriesTag("namespace", limit.SatelliteId.String()), monkit.NewSeriesTag("piece_id", limit.PieceId.String())).Mark(1)
filestore.MonFileInTrash(limit.SatelliteId[:]).Mark(1)
log.Warn("file found in trash")

// try to open the file again
Expand Down

0 comments on commit 121de29

Please sign in to comment.