From 5498424e1f68d423a4a740e1c0535b415157110b Mon Sep 17 00:00:00 2001 From: Egon Elbre Date: Thu, 6 Apr 2023 12:53:25 +0300 Subject: [PATCH] storage: implement Range By implementing a simpler Range, we can get rid of many methods defined for storage.KeyValueStore. Initially KeyValueStore was used to manage objects, hence it required much more complex behaviour. However, we've moved away from this implementation, hence we can significantly simplify everything. Change-Id: Iaaa8c9c83d8601ad3c894767f17c0f5b356f7253 --- certificate/authorization/db.go | 43 +++++++++++------------- private/revocation/db.go | 21 ++++-------- storage/boltdb/client.go | 11 ++++++ storage/common.go | 3 ++ storage/redis/client.go | 29 ++++++++++++++++ storage/storelogger/logger.go | 14 ++++++++ storage/teststore/store.go | 20 +++++++++++ storage/testsuite/test.go | 1 + storage/testsuite/test_range.go | 59 +++++++++++++++++++++++++++++++++ 9 files changed, 163 insertions(+), 38 deletions(-) create mode 100644 storage/testsuite/test_range.go diff --git a/certificate/authorization/db.go b/certificate/authorization/db.go index 540b2efd64bf..4bfc6ada5a46 100644 --- a/certificate/authorization/db.go +++ b/certificate/authorization/db.go @@ -148,36 +148,33 @@ func (authDB *DB) Get(ctx context.Context, userID string) (_ Group, err error) { // UserIDs returns a list of all userIDs present in the authorization database. func (authDB *DB) UserIDs(ctx context.Context) (userIDs []string, err error) { defer mon.Task()(&ctx)(&err) - err = authDB.db.Iterate(ctx, storage.IterateOptions{ - Recurse: true, - }, func(ctx context.Context, iterator storage.Iterator) error { - var listItem storage.ListItem - for iterator.Next(ctx, &listItem) { - userIDs = append(userIDs, listItem.Key.String()) - } - return nil - }) + + err = authDB.db.Range(ctx, + func(ctx context.Context, key storage.Key, _ storage.Value) error { + userIDs = append(userIDs, key.String()) + return nil + }) return userIDs, ErrDBInternal.Wrap(err) } // List returns all authorizations in the database. func (authDB *DB) List(ctx context.Context) (auths Group, err error) { defer mon.Task()(&ctx)(&err) - err = authDB.db.Iterate(ctx, storage.IterateOptions{ - Recurse: true, - }, func(ctx context.Context, iterator storage.Iterator) error { - var listErrs errs.Group - var listItem storage.ListItem - for iterator.Next(ctx, &listItem) { - var nextAuths Group - if err := nextAuths.Unmarshal(listItem.Value); err != nil { - listErrs.Add(err) + + var errs errs.Group + err = authDB.db.Range(ctx, + func(ctx context.Context, key storage.Key, value storage.Value) error { + var group Group + err := group.Unmarshal(value) + if err != nil { + errs.Add(err) + return nil } - auths = append(auths, nextAuths...) - } - return ErrDBInternal.Wrap(listErrs.Err()) - }) - return auths, ErrDBInternal.Wrap(err) + auths = append(auths, group...) + return nil + }) + errs.Add(err) + return auths, ErrDBInternal.Wrap(errs.Err()) } // Claim marks an authorization as claimed and records claim information. diff --git a/private/revocation/db.go b/private/revocation/db.go index 2d15caf306ed..96fa5d56a0ef 100644 --- a/private/revocation/db.go +++ b/private/revocation/db.go @@ -108,25 +108,16 @@ func (db *DB) List(ctx context.Context) (revs []*extensions.Revocation, err erro return nil, nil } - keys, err := db.store.List(ctx, []byte{}, 0) - if err != nil { - return nil, extensions.ErrRevocationDB.Wrap(err) - } - - marshaledRevs, err := db.store.GetAll(ctx, keys) - if err != nil { - return nil, extensions.ErrRevocationDB.Wrap(err) - } - - for _, revBytes := range marshaledRevs { + err = db.store.Range(ctx, func(ctx context.Context, key storage.Key, value storage.Value) error { rev := new(extensions.Revocation) - if err := rev.Unmarshal(revBytes); err != nil { - return nil, extensions.ErrRevocationDB.Wrap(err) + if err := rev.Unmarshal(value); err != nil { + return extensions.ErrRevocationDB.Wrap(err) } revs = append(revs, rev) - } - return revs, nil + return nil + }) + return revs, extensions.ErrRevocationDB.Wrap(err) } // TestGetStore returns the internal store for testing. diff --git a/storage/boltdb/client.go b/storage/boltdb/client.go index 7b3d49c9eb2a..21f35eb54831 100644 --- a/storage/boltdb/client.go +++ b/storage/boltdb/client.go @@ -286,6 +286,17 @@ func (client *Client) IterateWithoutLookupLimit(ctx context.Context, opts storag }) } +// Range iterates over all items in unspecified order. +func (client *Client) Range(ctx context.Context, fn func(context.Context, storage.Key, storage.Value) error) (err error) { + defer mon.Task()(&ctx)(&err) + + return client.view(func(bucket *bbolt.Bucket) error { + return bucket.ForEach(func(k, v []byte) error { + return fn(ctx, storage.Key(k), storage.Value(v)) + }) + }) +} + type advancer interface { PositionToFirst(prefix, first storage.Key) (key, value []byte) SkipPrefix(prefix storage.Key) (key, value []byte) diff --git a/storage/common.go b/storage/common.go index 4522248a803e..d4a6a427f81b 100644 --- a/storage/common.go +++ b/storage/common.go @@ -68,6 +68,9 @@ type KeyValueStore interface { Delete(context.Context, Key) error // DeleteMultiple deletes keys and returns nil for. DeleteMultiple(context.Context, []Key) (Items, error) + // Range iterates over all items in unspecified order. + // The Key and Value are valid only for the duration of callback. + Range(ctx context.Context, fn func(context.Context, Key, Value) error) error // List lists all keys starting from start and upto limit items. List(ctx context.Context, start Key, limit int) (Keys, error) // Iterate iterates over items based on opts. diff --git a/storage/redis/client.go b/storage/redis/client.go index 2e3940dd497e..7496a3a7745a 100644 --- a/storage/redis/client.go +++ b/storage/redis/client.go @@ -183,6 +183,35 @@ func (client *Client) GetAll(ctx context.Context, keys storage.Keys) (_ storage. return values, nil } +// Range iterates over all items in unspecified order. +func (client *Client) Range(ctx context.Context, fn func(context.Context, storage.Key, storage.Value) error) (err error) { + defer mon.Task()(&ctx)(&err) + + it := client.db.Scan(ctx, 0, "", 0).Iterator() + + var lastKey string + var lastOk bool + for it.Next(ctx) { + key := it.Val() + // redis may return duplicates + if lastOk && key == lastKey { + continue + } + lastKey, lastOk = key, true + + value, err := get(ctx, client.db, storage.Key(key)) + if err != nil { + return Error.Wrap(err) + } + + if err := fn(ctx, storage.Key(key), value); err != nil { + return err + } + } + + return Error.Wrap(it.Err()) +} + // Iterate iterates over items based on opts. func (client *Client) Iterate(ctx context.Context, opts storage.IterateOptions, fn func(context.Context, storage.Iterator) error) (err error) { defer mon.Task()(&ctx)(&err) diff --git a/storage/storelogger/logger.go b/storage/storelogger/logger.go index b132161d986a..33b5675900c7 100644 --- a/storage/storelogger/logger.go +++ b/storage/storelogger/logger.go @@ -77,6 +77,20 @@ func (store *Logger) List(ctx context.Context, first storage.Key, limit int) (_ return keys, err } +// Range iterates over all items in unspecified order. +func (store *Logger) Range(ctx context.Context, fn func(context.Context, storage.Key, storage.Value) error) (err error) { + defer mon.Task()(&ctx)(&err) + store.log.Debug("Range") + return store.store.Range(ctx, func(ctx context.Context, key storage.Key, value storage.Value) error { + store.log.Debug(" ", + zap.ByteString("key", key), + zap.Int("value length", len(value)), + zap.Binary("truncated value", truncate(value)), + ) + return fn(ctx, key, value) + }) +} + // Iterate iterates over items based on opts. func (store *Logger) Iterate(ctx context.Context, opts storage.IterateOptions, fn func(context.Context, storage.Iterator) error) (err error) { defer mon.Task()(&ctx)(&err) diff --git a/storage/teststore/store.go b/storage/teststore/store.go index 41d77d4a87e5..194b0dd0bc4f 100644 --- a/storage/teststore/store.go +++ b/storage/teststore/store.go @@ -34,6 +34,7 @@ type Client struct { GetAll int Delete int Close int + Range int Iterate int CompareAndSwap int } @@ -231,6 +232,25 @@ func (store *Client) Close() error { return nil } +// Range iterates over all items in unspecified order. +func (store *Client) Range(ctx context.Context, fn func(context.Context, storage.Key, storage.Value) error) error { + store.mu.Lock() + store.CallCount.Range++ + if store.forcedError() { + store.mu.Unlock() + return errors.New("internal error") + } + items := append([]storage.ListItem{}, store.Items...) + store.mu.Unlock() + + for _, item := range items { + if err := fn(ctx, item.Key, item.Value); err != nil { + return err + } + } + return nil +} + // Iterate iterates over items based on opts. func (store *Client) Iterate(ctx context.Context, opts storage.IterateOptions, fn func(context.Context, storage.Iterator) error) (err error) { defer mon.Task()(&ctx)(&err) diff --git a/storage/testsuite/test.go b/storage/testsuite/test.go index f817728c9028..b592c859511b 100644 --- a/storage/testsuite/test.go +++ b/storage/testsuite/test.go @@ -25,6 +25,7 @@ func RunTests(t *testing.T, store storage.KeyValueStore) { defer ctx.Cleanup() t.Run("CRUD", func(t *testing.T) { testCRUD(t, ctx, store) }) t.Run("Constraints", func(t *testing.T) { testConstraints(t, ctx, store) }) + t.Run("Range", func(t *testing.T) { testRange(t, ctx, store) }) t.Run("Iterate", func(t *testing.T) { testIterate(t, ctx, store) }) t.Run("IterateAll", func(t *testing.T) { testIterateAll(t, ctx, store) }) t.Run("Prefix", func(t *testing.T) { testPrefix(t, ctx, store) }) diff --git a/storage/testsuite/test_range.go b/storage/testsuite/test_range.go new file mode 100644 index 000000000000..3be9e36bbb22 --- /dev/null +++ b/storage/testsuite/test_range.go @@ -0,0 +1,59 @@ +// Copyright (C) 2023 Storj Labs, Inc. +// See LICENSE for copying information. + +package testsuite + +import ( + "context" + "errors" + "math/rand" + "sort" + "testing" + + "github.com/stretchr/testify/require" + + "storj.io/common/testcontext" + "storj.io/storj/storage" +) + +func testRange(t *testing.T, ctx *testcontext.Context, store storage.KeyValueStore) { + err := store.Range(ctx, func(ctx context.Context, key storage.Key, value storage.Value) error { + return errors.New("empty store") + }) + require.NoError(t, err) + + items := storage.Items{ + newItem("a", "a", false), + newItem("b/1", "b/1", false), + newItem("b/2", "b/2", false), + newItem("b/3", "b/3", false), + newItem("c", "c", false), + newItem("c/", "c/", false), + newItem("c//", "c//", false), + newItem("c/1", "c/1", false), + newItem("g", "g", false), + newItem("h", "h", false), + } + rand.Shuffle(len(items), items.Swap) + defer cleanupItems(t, ctx, store, items) + + if err := storage.PutAll(ctx, store, items...); err != nil { + t.Fatalf("failed to setup: %v", err) + } + + var output storage.Items + err = store.Range(ctx, func(ctx context.Context, key storage.Key, value storage.Value) error { + output = append(output, storage.ListItem{ + Key: append([]byte{}, key...), + Value: append([]byte{}, value...), + }) + return nil + }) + require.NoError(t, err) + + expected := storage.CloneItems(items) + sort.Sort(expected) + sort.Sort(output) + + require.EqualValues(t, expected, output) +}