storagenode/retain: store bloomfilter on disk
On very large nodes, the filewalker is unlikely to ever complete
a full scan of all the satellite blob directories.
For GC, this leaves a lot of trash files on the node instead of
freeing up the space for more uploads. And when the node is
restarted, the bloom filter is lost, so the node has to wait for
the satellite to send another one.

In this patch, we store the bloom filter on disk for each request,
load the stored filters on restart, and add them to the retain
queue so the GC filewalker can process them.

Updates #6725

Change-Id: I6455473470b22d128ebed3bbf46e76f2538e0485
profclems authored and Storj Robot committed Mar 20, 2024
1 parent f4602e6 commit d62e81b
Showing 8 changed files with 567 additions and 27 deletions.
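As a rough sketch of the approach described in the commit message: serialize each request's bloom filter into the cache directory under the same "<base32(satelliteID)>-<unixNanos>" name that Request.Filename() produces, and read it back on restart. This assumes the Bytes/NewFromBytes round-trip helpers on storj.io/common/bloomfilter's Filter; the commit's actual load path is NewRequestStore, used in the retain.go diff below.

package main

import (
	"fmt"
	"os"
	"path/filepath"
	"time"

	"storj.io/common/bloomfilter"
	"storj.io/common/storj"
	"storj.io/storj/storagenode/blobstore/filestore"
)

// saveFilter persists a serialized bloom filter using the same naming
// scheme as Request.Filename() in storagenode/retain.
func saveFilter(cacheDir string, satelliteID storj.NodeID, createdBefore time.Time, filter *bloomfilter.Filter) error {
	name := fmt.Sprintf("%s-%d",
		filestore.PathEncoding.EncodeToString(satelliteID.Bytes()),
		createdBefore.UnixNano())
	return os.WriteFile(filepath.Join(cacheDir, name), filter.Bytes(), 0o600)
}

// loadFilter reads a serialized filter back so the request can be
// re-queued after a restart.
func loadFilter(path string) (*bloomfilter.Filter, error) {
	data, err := os.ReadFile(path)
	if err != nil {
		return nil, err
	}
	return bloomfilter.NewFromBytes(data)
}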
1 change: 1 addition & 0 deletions private/testplanet/storagenode.go
@@ -202,6 +202,7 @@ func (planet *Planet) newStorageNode(ctx context.Context, prefix string, index,
MaxTimeSkew: 10 * time.Second,
Status: retain.Enabled,
Concurrency: 5,
CachePath: filepath.Join(planet.directory, "retain"),
},
Version: version.Config{
Config: planet.NewVersionConfig(),
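For a production node, the same knob lives on retain.Config; a minimal sketch with the new field set explicitly (the path literal is hypothetical, and the shipped default is $CONFDIR/retain per the help tag in the retain.go diff below):

package main

import (
	"time"

	"storj.io/storj/storagenode/retain"
)

// exampleRetainConfig mirrors the defaults from the help tags in
// storagenode/retain, with an explicit cache path.
func exampleRetainConfig() retain.Config {
	return retain.Config{
		MaxTimeSkew: 72 * time.Hour,
		Status:      retain.Enabled,
		Concurrency: 5,
		CachePath:   "/var/lib/storagenode/retain", // hypothetical; default is $CONFDIR/retain
	}
}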
25 changes: 13 additions & 12 deletions storagenode/blobstore/filestore/dir.go
@@ -40,7 +40,8 @@ const (
TrashUsesDayDirsIndicator = ".trash-uses-day-dirs-indicator"
)

var pathEncoding = base32.NewEncoding("abcdefghijklmnopqrstuvwxyz234567").WithPadding(base32.NoPadding)
// PathEncoding is the encoding used for the namespace and key in the filestore.
var PathEncoding = base32.NewEncoding("abcdefghijklmnopqrstuvwxyz234567").WithPadding(base32.NoPadding)

// Dir represents single folder for storing blobs.
type Dir struct {
@@ -125,7 +126,7 @@ func (dir *Dir) trashdir() string { return filepath.Join(dir.path, "trash") }

// trashPath returns the toplevel trash directory for the given namespace and timestamp.
func (dir *Dir) trashPath(namespace []byte, forTime time.Time) string {
namespaceStr := pathEncoding.EncodeToString(namespace)
namespaceStr := PathEncoding.EncodeToString(namespace)
dayDirName := forTime.UTC().Format("2006-01-02")
return filepath.Join(dir.trashdir(), namespaceStr, dayDirName)
}
@@ -136,7 +137,7 @@ func (dir *Dir) refToTrashPath(ref blobstore.BlobRef, forTime time.Time) (string
return "", blobstore.ErrInvalidBlobRef.New("")
}

key := pathEncoding.EncodeToString(ref.Key)
key := PathEncoding.EncodeToString(ref.Key)
if len(key) < 3 {
// ensure we always have enough characters to split [:2] and [2:]
key = "11" + key
@@ -217,8 +218,8 @@ func (dir *Dir) refToDirPath(ref blobstore.BlobRef, subDir string) (string, erro
return "", blobstore.ErrInvalidBlobRef.New("")
}

namespace := pathEncoding.EncodeToString(ref.Namespace)
key := pathEncoding.EncodeToString(ref.Key)
namespace := PathEncoding.EncodeToString(ref.Namespace)
key := PathEncoding.EncodeToString(ref.Key)
if len(key) < 3 {
// ensure we always have enough characters to split [:2] and [2:]
key = "11" + key
@@ -545,7 +546,7 @@ func (dir *Dir) DeleteTrashNamespace(ctx context.Context, namespace []byte) (err
return nil
})
errorsEncountered.Add(err)
namespaceEncoded := pathEncoding.EncodeToString(namespace)
namespaceEncoded := PathEncoding.EncodeToString(namespace)
namespaceTrashDir := filepath.Join(dir.trashdir(), namespaceEncoded)
err = removeButIgnoreIfNotExist(namespaceTrashDir)
errorsEncountered.Add(err)
@@ -587,7 +588,7 @@ func (dir *Dir) walkTrashDayDir(ctx context.Context, namespace []byte, dirTime t
}

func (dir *Dir) listTrashDayDirs(ctx context.Context, namespace []byte) (dirTimes []time.Time, err error) {
namespaceEncoded := pathEncoding.EncodeToString(namespace)
namespaceEncoded := PathEncoding.EncodeToString(namespace)
namespaceTrashDir := filepath.Join(dir.trashdir(), namespaceEncoded)
openDir, err := os.Open(namespaceTrashDir)
if err != nil {
@@ -752,7 +753,7 @@ func (dir *Dir) deleteWithStorageFormatInPath(ctx context.Context, path string,
func (dir *Dir) deleteNamespace(ctx context.Context, path string, ref []byte) (err error) {
defer mon.Task()(&ctx)(&err)

namespace := pathEncoding.EncodeToString(ref)
namespace := PathEncoding.EncodeToString(ref)
folderPath := filepath.Join(path, namespace)

err = os.RemoveAll(folderPath)
@@ -794,7 +795,7 @@ func (dir *Dir) listNamespacesInPath(ctx context.Context, path string) (ids [][]
return ids, nil
}
for _, name := range dirNames {
namespace, err := pathEncoding.DecodeString(name)
namespace, err := PathEncoding.DecodeString(name)
if err != nil {
// just an invalid directory entry, and not a namespace. probably
// don't need to pass on this error
@@ -816,7 +817,7 @@ func (dir *Dir) WalkNamespace(ctx context.Context, namespace []byte, walkFunc fu

func (dir *Dir) walkNamespaceInPath(ctx context.Context, namespace []byte, path string, walkFunc func(blobstore.BlobInfo) error) (err error) {
defer mon.Task()(&ctx)(&err)
namespaceDir := pathEncoding.EncodeToString(namespace)
namespaceDir := PathEncoding.EncodeToString(namespace)
nsDir := filepath.Join(path, namespaceDir)
return dir.walkNamespaceUnderPath(ctx, namespace, nsDir, walkFunc)
}
@@ -887,7 +888,7 @@ func (dir *Dir) migrateTrashToPerDayDirs(now time.Time) (err error) {

namespaces, err := dir.listNamespacesInTrash(context.Background())
for _, ns := range namespaces {
nsEncoded := pathEncoding.EncodeToString(ns)
nsEncoded := PathEncoding.EncodeToString(ns)
todayDirName := now.Format("2006-01-02")
nsPath := filepath.Join(dir.trashdir(), nsEncoded)
tempTodayDirPath := filepath.Join(dir.trashdir(), nsEncoded+"-"+todayDirName)
@@ -919,7 +920,7 @@ func decodeBlobInfo(namespace []byte, keyPrefix, keyDir, name string) (info blob
}
// in case we prepended '1' chars because the key was too short (1 is an invalid char in base32)
encodedKey = strings.TrimLeft(encodedKey, "1")
key, err := pathEncoding.DecodeString(encodedKey)
key, err := PathEncoding.DecodeString(encodedKey)
if err != nil {
return nil, false
}
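The changes in this file only rename pathEncoding to the exported PathEncoding (updating every call site) so that the retain package can reuse the same filesystem-safe alphabet for its cache filenames. A self-contained round-trip sketch of that encoding, using only the standard library:

package main

import (
	"encoding/base32"
	"fmt"
)

// Same alphabet and padding as filestore.PathEncoding: lowercase base32
// without padding, safe for case-insensitive filesystems.
var pathEncoding = base32.NewEncoding("abcdefghijklmnopqrstuvwxyz234567").WithPadding(base32.NoPadding)

func main() {
	namespace := []byte{0xde, 0xad, 0xbe, 0xef}
	encoded := pathEncoding.EncodeToString(namespace)
	decoded, err := pathEncoding.DecodeString(encoded)
	fmt.Printf("%s %x %v\n", encoded, decoded, err) // the round trip preserves the bytes
}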
8 changes: 4 additions & 4 deletions storagenode/blobstore/filestore/dir_test.go
@@ -89,9 +89,9 @@ func TestMigrateTrash(t *testing.T) {
require.NoError(t, os.Mkdir(filepath.Join(storeDir, "temp"), dirPermission))

for n, namespace := range namespaces {
namespaceStr := pathEncoding.EncodeToString(namespace)
namespaceStr := PathEncoding.EncodeToString(namespace)
for k, key := range keys[n] {
keyStr := pathEncoding.EncodeToString(key)
keyStr := PathEncoding.EncodeToString(key)
storageDir := filepath.Join(trashDir, namespaceStr, keyStr[:2])
require.NoError(t, os.MkdirAll(storageDir, 0700))
require.NoError(t, os.WriteFile(filepath.Join(storageDir, keyStr[2:]+".sj1"), data[n][k], 0600))
@@ -108,10 +108,10 @@
// expect that everything has been migrated and all pre-existing trash has been
// put into a day dir.
for n, namespace := range namespaces {
namespaceStr := pathEncoding.EncodeToString(namespace)
namespaceStr := PathEncoding.EncodeToString(namespace)
expectedDayDir := filepath.Join(trashDir, namespaceStr, trashTime.Format("2006-01-02"))
for k, key := range keys[n] {
keyStr := pathEncoding.EncodeToString(key)
keyStr := PathEncoding.EncodeToString(key)
storageDir := filepath.Join(expectedDayDir, keyStr[:2])
contents, err := os.ReadFile(filepath.Join(storageDir, keyStr[2:]+".sj1"))
require.NoError(t, err)
1 change: 1 addition & 0 deletions storagenode/peer.go
@@ -561,6 +561,7 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, revocationDB exten
peer.Storage2.Store,
config.Retain,
)

peer.Services.Add(lifecycle.Item{
Name: "retain",
Run: peer.Storage2.RetainService.Run,
67 changes: 56 additions & 11 deletions storagenode/retain/retain.go
@@ -5,6 +5,8 @@ package retain

import (
"context"
"fmt"
"strconv"
"sync"
"time"

@@ -15,6 +17,7 @@ import (

"storj.io/common/bloomfilter"
"storj.io/common/storj"
"storj.io/storj/storagenode/blobstore/filestore"
"storj.io/storj/storagenode/pieces"
)

@@ -30,6 +33,7 @@ type Config struct {
MaxTimeSkew time.Duration `help:"allows for small differences in the satellite and storagenode clocks" default:"72h0m0s"`
Status Status `help:"allows configuration to enable, disable, or test retain requests from the satellite. Options: (disabled/enabled/debug)" default:"enabled"`
Concurrency int `help:"how many concurrent retain requests can be processed at the same time." default:"5"`
CachePath string `help:"path to the cache directory for retain requests." default:"$CONFDIR/retain"`
}

// Request contains all the info necessary to process a retain request.
@@ -39,6 +43,30 @@ type Request struct {
Filter *bloomfilter.Filter
}

// Filename returns the filename used to store the request in the cache directory.
func (req *Request) Filename() string {
return fmt.Sprintf("%s-%s",
filestore.PathEncoding.EncodeToString(req.SatelliteID.Bytes()),
strconv.FormatInt(req.CreatedBefore.UnixNano(), 10),
)
}

// Queue manages the retain requests queue.
type Queue interface {
// Add adds a request to the queue.
Add(request Request) (bool, error)
// Remove removes a request from the queue.
// Returns true if there was a request to remove.
Remove(request Request) bool
// Next returns the next request from the queue.
Next() (Request, bool)
// Len returns the number of requests in the queue.
Len() int
// DeleteCache removes the request from the queue and deletes the cache file.
DeleteCache(request Request) error
}

// Status is a type defining the enabled/disabled status of retain requests.
type Status uint32

@@ -91,7 +119,7 @@ type Service struct {
config Config

cond sync.Cond
queued map[storj.NodeID]Request
queue Queue
working map[storj.NodeID]struct{}
group errgroup.Group

@@ -104,12 +132,18 @@

// NewService creates a new retain service.
func NewService(log *zap.Logger, store *pieces.Store, config Config) *Service {
log = log.With(zap.String("cachePath", config.CachePath))
cache, err := NewRequestStore(config.CachePath)
if err != nil {
log.Warn("encountered error(s) while loading cache", zap.Error(err))
}

return &Service{
log: log,
config: config,

cond: *sync.NewCond(&sync.Mutex{}),
queued: make(map[storj.NodeID]Request),
queue: &cache,
working: make(map[storj.NodeID]struct{}),
closed: make(chan struct{}),

@@ -129,10 +163,14 @@ func (s *Service) Queue(req Request) bool {
default:
}

s.queued[req.SatelliteID] = req
ok, err := s.queue.Add(req)
if err != nil {
s.log.Warn("encountered an error while adding request to queue", zap.Error(err), zap.Bool("Queued", ok), zap.Stringer("Satellite ID", req.SatelliteID))
}

s.cond.Broadcast()

return true
return ok
}

// Run listens for queued retain requests and processes them as they come in.
@@ -239,31 +277,38 @@ func (s *Service) Run(ctx context.Context) (err error) {
// Clear the queue after Wait has exited. We're sure no more entries
// can be added after we acquire the mutex because wait spawned a
// worker that ensures the closed channel is closed before it exits.
s.queued = nil
s.queue = nil
s.cond.Broadcast()

return err
}

// next returns next item from queue, requires mutex to be held.
func (s *Service) next() (Request, bool) {
for id, request := range s.queued {
for {
request, ok := s.queue.Next()
if !ok {
return Request{}, false
}
// Check whether a worker is already retaining for this satellite;
// if so, try to get something else from the queue.
if _, ok := s.working[request.SatelliteID]; ok {
continue
}
delete(s.queued, id)
// Mark this satellite as being worked on.
s.working[request.SatelliteID] = struct{}{}
s.queue.Remove(request)
return request, true
}
return Request{}, false
}

// finish marks the request as finished, requires mutex to be held.
// finish marks the request as finished and removes the cache, requires mutex to be held.
func (s *Service) finish(request Request) {
delete(s.working, request.SatelliteID)
err := s.queue.DeleteCache(request)
if err != nil {
s.log.Warn("encountered an error while removing request from queue", zap.Error(err), zap.Stringer("Satellite ID", request.SatelliteID))
}
}

// Close causes any pending Run to exit and waits for any retain requests to
@@ -285,7 +330,7 @@ func (s *Service) TestWaitUntilEmpty() {
s.cond.L.Lock()
defer s.cond.L.Unlock()

for len(s.queued) > 0 || len(s.working) > 0 {
for s.queue.Len() > 0 || len(s.working) > 0 {
s.cond.Wait()
}
}
@@ -372,5 +417,5 @@ func (s *Service) trash(ctx context.Context, satelliteID storj.NodeID, pieceID s
func (s *Service) TestingHowManyQueued() int {
s.cond.L.Lock()
defer s.cond.L.Unlock()
return len(s.queued)
return s.queue.Len()
}
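To make the Queue contract concrete, here is a simplified in-memory stand-in that keeps one request per satellite, much like the service's old queued map. It is not the NewRequestStore this commit adds (that implementation also persists each request under CachePath); it only illustrates the semantics Service relies on:

package retain

import "storj.io/common/storj"

// memQueue is an illustrative, non-persistent Queue implementation.
type memQueue struct {
	requests map[storj.NodeID]Request
}

var _ Queue = (*memQueue)(nil)

func newMemQueue() *memQueue {
	return &memQueue{requests: make(map[storj.NodeID]Request)}
}

// Add stores the request, overwriting any earlier request from the same
// satellite, and reports whether it was queued.
func (q *memQueue) Add(req Request) (bool, error) {
	q.requests[req.SatelliteID] = req
	return true, nil
}

// Remove reports whether there was a request for this satellite to remove.
func (q *memQueue) Remove(req Request) bool {
	if _, ok := q.requests[req.SatelliteID]; !ok {
		return false
	}
	delete(q.requests, req.SatelliteID)
	return true
}

// Next returns any queued request; iteration order is unspecified, which
// matches the old map-based behavior.
func (q *memQueue) Next() (Request, bool) {
	for _, req := range q.requests {
		return req, true
	}
	return Request{}, false
}

// Len reports how many requests are waiting.
func (q *memQueue) Len() int { return len(q.requests) }

// DeleteCache removes the entry; on disk this would also unlink the
// cache file.
func (q *memQueue) DeleteCache(req Request) error {
	q.Remove(req)
	return nil
}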
