Skip to content

Commit

Permalink
storagenode/{pieces,blobstore}: save-state-resume feature for GC file…
Browse files Browse the repository at this point in the history
…walker

The bloomfilter is now stored on the disk from this change
https://review.dev.storj.io/c/storj/storj/+/12651, so we don't lose
the bloomfilter on restart. But that means that when the bloomfilter
on the disk is loaded and added to the retain queue, it will start
scanning the directories all over again. This will continuously be the
behavior till the node is able to complete a full scan which might never
happen on a large node.

In this patch, we make sure the node can resume from where it left off.
We read the two-letter directory names (or prefixes) from the satellite
blobs folder and sort them alphabetically before scanning.
During the scan, the last scanned prefix is stored in the db.

Updates #6708

Change-Id: Icb32cdc7dd49ef8ce44f6d771e4e33045078ed55
  • Loading branch information
profclems committed Apr 2, 2024
1 parent e4191b6 commit 0f90f06
Show file tree
Hide file tree
Showing 22 changed files with 276 additions and 77 deletions.
2 changes: 1 addition & 1 deletion cmd/storagenode/internalcmd/gc_filewalker.go
Expand Up @@ -99,7 +99,7 @@ func gcCmdRun(g *RunOptions) (err error) {

log.Info("gc-filewalker started", zap.Time("createdBefore", req.CreatedBefore), zap.Int("bloomFilterSize", len(req.BloomFilter)))

filewalker := pieces.NewFileWalker(log, db.Pieces(), db.V0PieceInfo())
filewalker := pieces.NewFileWalker(log, db.Pieces(), db.V0PieceInfo(), db.GCFilewalkerProgress())
numTrashed := 0
pieceIDs, piecesCount, piecesSkippedCount, err := filewalker.WalkSatellitePiecesToTrash(g.Ctx, req.SatelliteID, req.CreatedBefore, filter, func(pieceID storj.PieceID) error {
log.Debug("found a trash piece", zap.Stringer("pieceID", pieceID))
Expand Down
2 changes: 1 addition & 1 deletion cmd/storagenode/internalcmd/trash_filewalker.go
Expand Up @@ -89,7 +89,7 @@ func trashCmdRun(opts *RunOptions) (err error) {
err = errs.Combine(err, db.Close())
}()

filewalker := pieces.NewFileWalker(log, db.Pieces(), db.V0PieceInfo())
filewalker := pieces.NewFileWalker(log, db.Pieces(), db.V0PieceInfo(), db.GCFilewalkerProgress())
bytesDeleted, keysDeleted, err := filewalker.WalkCleanupTrash(opts.Ctx, req.SatelliteID, req.DateBefore)
if err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion cmd/storagenode/internalcmd/used_space_filewalker.go
Expand Up @@ -86,7 +86,7 @@ func usedSpaceCmdRun(opts *RunOptions) (err error) {

log.Info("used-space-filewalker started")

filewalker := pieces.NewFileWalker(log, db.Pieces(), db.V0PieceInfo())
filewalker := pieces.NewFileWalker(log, db.Pieces(), db.V0PieceInfo(), db.GCFilewalkerProgress())
total, contentSize, err := filewalker.WalkAndComputeSpaceUsedBySatellite(opts.Ctx, req.SatelliteID)
if err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion cmd/tools/segment-verify/main_test.go
Expand Up @@ -238,7 +238,7 @@ func TestCommandLineTool(t *testing.T) {

func deletePiecesRandomly(ctx context.Context, satelliteID storj.NodeID, node *testplanet.StorageNode, rate float64) (deletedPieces map[storj.PieceID]struct{}, err error) {
deletedPieces = make(map[storj.PieceID]struct{})
err = node.Storage2.FileWalker.WalkSatellitePieces(ctx, satelliteID, func(access pieces.StoredPieceAccess) error {
err = node.Storage2.FileWalker.WalkSatellitePieces(ctx, satelliteID, "", func(access pieces.StoredPieceAccess) error {
if rand.Float64() < rate {
path, err := access.FullPath(ctx)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion storagenode/blobstore/blob.go
Expand Up @@ -119,7 +119,7 @@ type Blobs interface {
// storage format V1 or greater, in the given namespace. If walkFunc returns a non-nil
// error, WalkNamespace will stop iterating and return the error immediately. The ctx
// parameter is intended to allow canceling iteration early.
WalkNamespace(ctx context.Context, namespace []byte, walkFunc func(BlobInfo) error) error
WalkNamespace(ctx context.Context, namespace []byte, startFromPrefix string, walkFunc func(BlobInfo) error) error

// CheckWritability tests writability of the storage directory by creating and deleting a file.
CheckWritability(ctx context.Context) error
Expand Down
95 changes: 72 additions & 23 deletions storagenode/blobstore/filestore/dir.go
Expand Up @@ -18,6 +18,7 @@ import (

"github.com/zeebo/errs"
"go.uber.org/zap"
"golang.org/x/exp/slices"

"storj.io/common/experiment"
"storj.io/common/storj"
Expand Down Expand Up @@ -584,7 +585,7 @@ func (dir *Dir) forEachTrashDayDir(ctx context.Context, namespace []byte, f func

func (dir *Dir) walkTrashDayDir(ctx context.Context, namespace []byte, dirTime time.Time, f func(info blobstore.BlobInfo) error) (err error) {
trashPath := dir.trashPath(namespace, dirTime)
return dir.walkNamespaceUnderPath(ctx, namespace, trashPath, f)
return dir.walkNamespaceUnderPath(ctx, namespace, trashPath, "", f)
}

func (dir *Dir) listTrashDayDirs(ctx context.Context, namespace []byte) (dirTimes []time.Time, err error) {
Expand Down Expand Up @@ -810,19 +811,19 @@ func (dir *Dir) listNamespacesInPath(ctx context.Context, path string) (ids [][]
// greater, in the given namespace. If walkFunc returns a non-nil error, WalkNamespace will stop
// iterating and return the error immediately. The ctx parameter is intended specifically to allow
// canceling iteration early.
func (dir *Dir) WalkNamespace(ctx context.Context, namespace []byte, walkFunc func(blobstore.BlobInfo) error) (err error) {
func (dir *Dir) WalkNamespace(ctx context.Context, namespace []byte, startFromPrefix string, walkFunc func(blobstore.BlobInfo) error) (err error) {
defer mon.Task()(&ctx)(&err)
return dir.walkNamespaceInPath(ctx, namespace, dir.blobsdir(), walkFunc)
return dir.walkNamespaceInPath(ctx, namespace, dir.blobsdir(), startFromPrefix, walkFunc)
}

func (dir *Dir) walkNamespaceInPath(ctx context.Context, namespace []byte, path string, walkFunc func(blobstore.BlobInfo) error) (err error) {
func (dir *Dir) walkNamespaceInPath(ctx context.Context, namespace []byte, path, startPrefix string, walkFunc func(blobstore.BlobInfo) error) (err error) {
defer mon.Task()(&ctx)(&err)
namespaceDir := PathEncoding.EncodeToString(namespace)
nsDir := filepath.Join(path, namespaceDir)
return dir.walkNamespaceUnderPath(ctx, namespace, nsDir, walkFunc)
return dir.walkNamespaceUnderPath(ctx, namespace, nsDir, startPrefix, walkFunc)
}

func (dir *Dir) walkNamespaceUnderPath(ctx context.Context, namespace []byte, nsDir string, walkFunc func(blobstore.BlobInfo) error) (err error) {
func (dir *Dir) walkNamespaceUnderPath(ctx context.Context, namespace []byte, nsDir, startPrefix string, walkFunc func(blobstore.BlobInfo) error) (err error) {
openDir, err := os.Open(nsDir)
if err != nil {
if os.IsNotExist(err) {
Expand All @@ -833,36 +834,54 @@ func (dir *Dir) walkNamespaceUnderPath(ctx context.Context, namespace []byte, ns
return err
}
defer func() { err = errs.Combine(err, openDir.Close()) }()

var subdirNames []string
for {
// check for context done both before and after our readdir() call
if err := ctx.Err(); err != nil {
return err
}
subdirNames, err := openDir.Readdirnames(nameBatchSize)
names, err := openDir.Readdirnames(nameBatchSize)
if err != nil {
if errors.Is(err, io.EOF) || os.IsNotExist(err) {
return nil
break
}
return err
}
if len(subdirNames) == 0 {
if len(names) == 0 {
return nil
}
if err := ctx.Err(); err != nil {
return err
}
for _, keyPrefix := range subdirNames {
if len(keyPrefix) != 2 {
// just an invalid subdir; could be garbage of many kinds. probably
// don't need to pass on this error
continue
}
err := walkNamespaceWithPrefix(ctx, dir.log, namespace, nsDir, keyPrefix, walkFunc)
if err != nil {
return err
}

subdirNames = append(subdirNames, names...)
}

dir.log.Debug("number of subdirs", zap.Int("count", len(subdirNames)))

// sort the dir names, so we can start from the startPrefix
sortPrefixes(subdirNames)

// just a little optimization: if we were somehow given a startPrefix that
// is the last subdir, we can skip the rest and just start from there.
if startPrefix != "" && subdirNames[len(subdirNames)-1] == startPrefix {
subdirNames = subdirNames[:len(subdirNames)-1]
}

for _, keyPrefix := range subdirNames {
if len(keyPrefix) != 2 {
// just an invalid subdir; could be garbage of many kinds. probably
// don't need to pass on this error
continue
}

if startPrefix != "" && startPrefix > keyPrefix {
continue
}
err := walkNamespaceWithPrefix(ctx, dir.log, namespace, nsDir, keyPrefix, walkFunc)
if err != nil {
return err
}
}

return nil
}

// migrateTrashToPerDayDirs migrates a trash directory that is _not_ using per-day directories
Expand Down Expand Up @@ -1050,3 +1069,33 @@ func (cde CorruptDataError) Path() string {
// Error returns a human-readable description of the corruption, including
// the affected path and the underlying error, so it satisfies the error
// interface.
func (cde CorruptDataError) Error() string {
	const format = "unrecoverable error accessing data on the storage file system (path=%v; error=%v). This is most likely due to disk bad sectors or a corrupted file system. Check your disk for bad sectors and integrity"
	return fmt.Sprintf(format, cde.path, cde.error)
}

// sortPrefixes sorts the given prefixes so that letters come before digits
// (a-z before 0-9) at each byte position. Names are compared byte by byte;
// when one name is a strict prefix of the other, the shorter one sorts
// first.
//
// Unlike an unconditional two-byte comparison, this handles names of any
// length. The caller passes raw Readdirnames output, which can contain
// stray entries shorter than two bytes; those must not cause an
// index-out-of-range panic here (they are filtered out later by the
// len(keyPrefix) != 2 check in the walker).
func sortPrefixes(prefixes []string) {
	// cmpByte orders two bytes with the letters-before-digits rule and
	// falls back to plain byte order for every other combination.
	cmpByte := func(x, y byte) int {
		xDigit := '0' <= x && x <= '9'
		yDigit := '0' <= y && y <= '9'
		xLetter := ('a' <= x && x <= 'z') || ('A' <= x && x <= 'Z')
		yLetter := ('a' <= y && y <= 'z') || ('A' <= y && y <= 'Z')
		switch {
		case xDigit && yLetter:
			return 1 // x (numeric) comes after y (alphabet)
		case xLetter && yDigit:
			return -1 // x (alphabet) comes before y (numeric)
		case x < y:
			return -1
		case x > y:
			return 1
		default:
			return 0
		}
	}
	slices.SortStableFunc(prefixes, func(a, b string) int {
		for i := 0; i < len(a) && i < len(b); i++ {
			if c := cmpByte(a[i], b[i]); c != 0 {
				return c
			}
		}
		// Equal up to the shorter length: a plain lexicographic comparison
		// now reduces to ordering by length (shorter first).
		return strings.Compare(a, b)
	})
}

// isDigit reports whether r is an ASCII decimal digit ('0' through '9').
func isDigit(r byte) bool {
	return r >= '0' && r <= '9'
}

// isLetter reports whether r is an ASCII letter (a-z or A-Z).
func isLetter(r byte) bool {
	lower := r | 0x20 // fold ASCII uppercase into lowercase
	return lower >= 'a' && lower <= 'z'
}
97 changes: 97 additions & 0 deletions storagenode/blobstore/filestore/dir_test.go
Expand Up @@ -6,6 +6,7 @@ package filestore
import (
"bytes"
"context"
"encoding/binary"
"fmt"
"io"
"os"
Expand All @@ -15,6 +16,7 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"go.uber.org/zap/zaptest"
"golang.org/x/exp/slices"

Expand Down Expand Up @@ -355,3 +357,98 @@ func writeTestBlob(ctx context.Context, t *testing.T, dir *Dir, ref blobstore.Bl
err = dir.Commit(ctx, f, ref, FormatV1)
require.NoError(t, err)
}

// BenchmarkDir_WalkNamespace measures WalkNamespace over namespaces with
// varying numbers of empty two-letter prefix directories (1024, 64, 512),
// using a fresh satellite namespace per sub-benchmark so the counts do not
// interfere with each other.
func BenchmarkDir_WalkNamespace(b *testing.B) {
	dir, err := NewDir(zap.NewNop(), b.TempDir())
	require.NoError(b, err)

	ctx := testcontext.New(b)

	// makeNamespace creates `count` empty prefix directories under a fresh
	// random satellite namespace and returns the raw namespace bytes.
	makeNamespace := func(count uint16) []byte {
		satelliteID := testrand.NodeID()
		namespace := PathEncoding.EncodeToString(satelliteID.Bytes())
		for i := uint16(0); i < count; i++ {
			keyPrefix := numToBase32Prefix(i)
			require.NoError(b, os.MkdirAll(filepath.Join(dir.blobsdir(), namespace, keyPrefix), 0700))
		}
		return satelliteID.Bytes()
	}

	// runBench registers one sub-benchmark that walks a namespace with
	// `count` prefix directories from the beginning (no resume prefix).
	runBench := func(name string, count uint16) {
		namespace := makeNamespace(count)
		b.Run(name, func(b *testing.B) {
			for i := 0; i < b.N; i++ {
				err := dir.WalkNamespace(ctx, namespace, "", func(ref blobstore.BlobInfo) error {
					return nil
				})
				require.NoError(b, err)
			}
		})
	}

	runBench("1024-prefixes", 32*32)
	runBench("64-prefixes", 32*2)
	runBench("512-prefixes", 32*16)
}

// Test_sortPrefixes verifies the letters-before-digits prefix ordering,
// both on the full set of 1024 generated base32 prefixes and on a
// hand-written unordered sample.
func Test_sortPrefixes(t *testing.T) {
	// Generate all 1024 two-character base32 prefixes; the generation
	// order is the expected sorted order for the first case.
	var str []string
	for i := uint16(0); i < 32*32; i++ {
		keyPrefix := numToBase32Prefix(i)
		str = append(str, keyPrefix)
	}

	type test struct {
		name     string
		prefixes []string
		expected []string
	}

	tests := []test{
		{
			name: "1024 prefixes sorted",
			// Clone the input: sortPrefixes sorts in place, and passing
			// the same slice as both input and expectation would make
			// the assertion compare the slice against itself (vacuous).
			prefixes: slices.Clone(str),
			expected: str,
		},
		{
			name:     "unordered prefixes",
			prefixes: []string{"77", "a2", "3z", "an", "b2", "a6", "b7", "aa", "7a", "23"},
			expected: []string{"aa", "an", "a2", "a6", "b2", "b7", "23", "3z", "7a", "77"},
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			sortPrefixes(tt.prefixes)
			assert.Equal(t, tt.expected, tt.prefixes)
		})
	}
}

// numToBase32Prefix gives the two character base32 prefix corresponding to the given
// 10-bit number. The value is shifted into the high bits of a big-endian
// uint16 so that the first two base32 characters (5 bits each) encode
// exactly those 10 bits.
func numToBase32Prefix(n uint16) string {
	var buf [2]byte
	binary.BigEndian.PutUint16(buf[:], n<<6)
	encoded := PathEncoding.EncodeToString(buf[:])
	return encoded[:2]
}
6 changes: 3 additions & 3 deletions storagenode/blobstore/filestore/store.go
Expand Up @@ -222,7 +222,7 @@ func (store *blobStore) SpaceUsedForBlobs(ctx context.Context) (space int64, err
// SpaceUsedForBlobsInNamespace adds up how much is used in the given namespace for blob storage.
func (store *blobStore) SpaceUsedForBlobsInNamespace(ctx context.Context, namespace []byte) (int64, error) {
var totalUsed int64
err := store.WalkNamespace(ctx, namespace, func(info blobstore.BlobInfo) error {
err := store.WalkNamespace(ctx, namespace, "", func(info blobstore.BlobInfo) error {
statInfo, statErr := info.Stat(ctx)
if statErr != nil {
store.log.Error("failed to stat blob", zap.Binary("namespace", namespace), zap.Binary("key", info.BlobRef().Key), zap.Error(statErr))
Expand Down Expand Up @@ -314,8 +314,8 @@ func (store *blobStore) ListNamespaces(ctx context.Context) (ids [][]byte, err e
// WalkNamespace executes walkFunc for each locally stored blob in the given namespace. If walkFunc
// returns a non-nil error, WalkNamespace will stop iterating and return the error immediately. The
// ctx parameter is intended specifically to allow canceling iteration early.
func (store *blobStore) WalkNamespace(ctx context.Context, namespace []byte, walkFunc func(blobstore.BlobInfo) error) (err error) {
return store.dir.WalkNamespace(ctx, namespace, walkFunc)
func (store *blobStore) WalkNamespace(ctx context.Context, namespace []byte, startFromPrefix string, walkFunc func(blobstore.BlobInfo) error) (err error) {
return store.dir.WalkNamespace(ctx, namespace, startFromPrefix, walkFunc)
}

// walkNamespaceInTrash executes walkFunc for each blob stored in the trash under the given
Expand Down
6 changes: 3 additions & 3 deletions storagenode/blobstore/filestore/store_test.go
Expand Up @@ -468,7 +468,7 @@ func TestStoreTraversals(t *testing.T) {
// keep track of which blobs we visit with WalkNamespace
found := make([]bool, len(expected.blobs))

err = store.WalkNamespace(ctx, expected.namespace, func(info blobstore.BlobInfo) error {
err = store.WalkNamespace(ctx, expected.namespace, "", func(info blobstore.BlobInfo) error {
gotBlobRef := info.BlobRef()
assert.Equal(t, expected.namespace, gotBlobRef.Namespace)
// find which blob this is in expected.blobs
Expand Down Expand Up @@ -508,7 +508,7 @@ func TestStoreTraversals(t *testing.T) {

// test WalkNamespace on a nonexistent namespace also
namespaceBase[len(namespaceBase)-1] = byte(numNamespaces)
err = store.WalkNamespace(ctx, namespaceBase, func(_ blobstore.BlobInfo) error {
err = store.WalkNamespace(ctx, namespaceBase, "", func(_ blobstore.BlobInfo) error {
t.Fatal("this should not have been called")
return nil
})
Expand All @@ -517,7 +517,7 @@ func TestStoreTraversals(t *testing.T) {
// check that WalkNamespace stops iterating after an error return
iterations := 0
expectedErr := errs.New("an expected error")
err = store.WalkNamespace(ctx, recordsToInsert[numNamespaces-1].namespace, func(_ blobstore.BlobInfo) error {
err = store.WalkNamespace(ctx, recordsToInsert[numNamespaces-1].namespace, "", func(_ blobstore.BlobInfo) error {
iterations++
if iterations == 2 {
return expectedErr
Expand Down
4 changes: 2 additions & 2 deletions storagenode/blobstore/testblobs/bad.go
Expand Up @@ -221,11 +221,11 @@ func (bad *BadBlobs) StatWithStorageFormat(ctx context.Context, ref blobstore.Bl
// WalkNamespace executes walkFunc for each locally stored blob in the given namespace.
// If walkFunc returns a non-nil error, WalkNamespace will stop iterating and return the
// error immediately.
func (bad *BadBlobs) WalkNamespace(ctx context.Context, namespace []byte, walkFunc func(blobstore.BlobInfo) error) error {
func (bad *BadBlobs) WalkNamespace(ctx context.Context, namespace []byte, startFromPrefix string, walkFunc func(blobstore.BlobInfo) error) error {
if err := bad.err.Err(); err != nil {
return err
}
return bad.blobs.WalkNamespace(ctx, namespace, walkFunc)
return bad.blobs.WalkNamespace(ctx, namespace, startFromPrefix, walkFunc)
}

// ListNamespaces returns all namespaces that might be storing data.
Expand Down
4 changes: 2 additions & 2 deletions storagenode/blobstore/testblobs/slow.go
Expand Up @@ -176,11 +176,11 @@ func (slow *SlowBlobs) TryRestoreTrashBlob(ctx context.Context, ref blobstore.Bl
// WalkNamespace executes walkFunc for each locally stored blob in the given namespace.
// If walkFunc returns a non-nil error, WalkNamespace will stop iterating and return the
// error immediately.
func (slow *SlowBlobs) WalkNamespace(ctx context.Context, namespace []byte, walkFunc func(blobstore.BlobInfo) error) error {
func (slow *SlowBlobs) WalkNamespace(ctx context.Context, namespace []byte, startFromPrefix string, walkFunc func(blobstore.BlobInfo) error) error {
if err := slow.sleep(ctx); err != nil {
return errs.Wrap(err)
}
return slow.blobs.WalkNamespace(ctx, namespace, walkFunc)
return slow.blobs.WalkNamespace(ctx, namespace, startFromPrefix, walkFunc)
}

// ListNamespaces returns all namespaces that might be storing data.
Expand Down
2 changes: 1 addition & 1 deletion storagenode/peer.go
Expand Up @@ -481,7 +481,7 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, revocationDB exten

{ // setup storage
peer.Storage2.BlobsCache = pieces.NewBlobsUsageCache(process.NamedLog(log, "blobscache"), peer.DB.Pieces())
peer.Storage2.FileWalker = pieces.NewFileWalker(process.NamedLog(log, "filewalker"), peer.Storage2.BlobsCache, peer.DB.V0PieceInfo())
peer.Storage2.FileWalker = pieces.NewFileWalker(process.NamedLog(log, "filewalker"), peer.Storage2.BlobsCache, peer.DB.V0PieceInfo(), peer.DB.GCFilewalkerProgress())

if config.Pieces.EnableLazyFilewalker {
executable, err := os.Executable()
Expand Down

3 comments on commit 0f90f06

@storjrobot
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This commit has been mentioned on Storj Community Forum (official). There might be relevant details there:

https://forum.storj.io/t/current-situation-with-garbage-collection/25711/191

@storjrobot
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This commit has been mentioned on Storj Community Forum (official). There might be relevant details there:

https://forum.storj.io/t/it-seems-that-the-new-feature-save-state-resume-gc-filewalker-isn-t-functioning-as-expected/25874/13

@storjrobot
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This commit has been mentioned on Storj Community Forum (official). There might be relevant details there:

https://forum.storj.io/t/storagenode-operational-updates/25843/1

Please sign in to comment.