Skip to content

Commit

Permalink
storage: implement Range
Browse files Browse the repository at this point in the history
By implementing a simpler Range, we can get rid of many methods
defined for storage.KeyValueStore.

Initially KeyValueStore was used to manage objects, hence it required
much more complex behaviour. However, we've moved away from this
implementation, hence we can significantly simplify everything.

Change-Id: Iaaa8c9c83d8601ad3c894767f17c0f5b356f7253
  • Loading branch information
egonelbre committed Apr 6, 2023
1 parent e599594 commit 5498424
Show file tree
Hide file tree
Showing 9 changed files with 163 additions and 38 deletions.
43 changes: 20 additions & 23 deletions certificate/authorization/db.go
Expand Up @@ -148,36 +148,33 @@ func (authDB *DB) Get(ctx context.Context, userID string) (_ Group, err error) {
// UserIDs returns a list of all userIDs present in the authorization database.
func (authDB *DB) UserIDs(ctx context.Context) (userIDs []string, err error) {
defer mon.Task()(&ctx)(&err)
err = authDB.db.Iterate(ctx, storage.IterateOptions{
Recurse: true,
}, func(ctx context.Context, iterator storage.Iterator) error {
var listItem storage.ListItem
for iterator.Next(ctx, &listItem) {
userIDs = append(userIDs, listItem.Key.String())
}
return nil
})

err = authDB.db.Range(ctx,
func(ctx context.Context, key storage.Key, _ storage.Value) error {
userIDs = append(userIDs, key.String())
return nil
})
return userIDs, ErrDBInternal.Wrap(err)
}

// List returns all authorizations in the database.
func (authDB *DB) List(ctx context.Context) (auths Group, err error) {
defer mon.Task()(&ctx)(&err)
err = authDB.db.Iterate(ctx, storage.IterateOptions{
Recurse: true,
}, func(ctx context.Context, iterator storage.Iterator) error {
var listErrs errs.Group
var listItem storage.ListItem
for iterator.Next(ctx, &listItem) {
var nextAuths Group
if err := nextAuths.Unmarshal(listItem.Value); err != nil {
listErrs.Add(err)

var errs errs.Group
err = authDB.db.Range(ctx,
func(ctx context.Context, key storage.Key, value storage.Value) error {
var group Group
err := group.Unmarshal(value)
if err != nil {
errs.Add(err)
return nil
}
auths = append(auths, nextAuths...)
}
return ErrDBInternal.Wrap(listErrs.Err())
})
return auths, ErrDBInternal.Wrap(err)
auths = append(auths, group...)
return nil
})
errs.Add(err)
return auths, ErrDBInternal.Wrap(errs.Err())
}

// Claim marks an authorization as claimed and records claim information.
Expand Down
21 changes: 6 additions & 15 deletions private/revocation/db.go
Expand Up @@ -108,25 +108,16 @@ func (db *DB) List(ctx context.Context) (revs []*extensions.Revocation, err erro
return nil, nil
}

keys, err := db.store.List(ctx, []byte{}, 0)
if err != nil {
return nil, extensions.ErrRevocationDB.Wrap(err)
}

marshaledRevs, err := db.store.GetAll(ctx, keys)
if err != nil {
return nil, extensions.ErrRevocationDB.Wrap(err)
}

for _, revBytes := range marshaledRevs {
err = db.store.Range(ctx, func(ctx context.Context, key storage.Key, value storage.Value) error {
rev := new(extensions.Revocation)
if err := rev.Unmarshal(revBytes); err != nil {
return nil, extensions.ErrRevocationDB.Wrap(err)
if err := rev.Unmarshal(value); err != nil {
return extensions.ErrRevocationDB.Wrap(err)
}

revs = append(revs, rev)
}
return revs, nil
return nil
})
return revs, extensions.ErrRevocationDB.Wrap(err)
}

// TestGetStore returns the internal store for testing.
Expand Down
11 changes: 11 additions & 0 deletions storage/boltdb/client.go
Expand Up @@ -286,6 +286,17 @@ func (client *Client) IterateWithoutLookupLimit(ctx context.Context, opts storag
})
}

// Range iterates over all items in unspecified order.
func (client *Client) Range(ctx context.Context, fn func(context.Context, storage.Key, storage.Value) error) (err error) {
defer mon.Task()(&ctx)(&err)

return client.view(func(bucket *bbolt.Bucket) error {
return bucket.ForEach(func(k, v []byte) error {
return fn(ctx, storage.Key(k), storage.Value(v))
})
})
}

type advancer interface {
PositionToFirst(prefix, first storage.Key) (key, value []byte)
SkipPrefix(prefix storage.Key) (key, value []byte)
Expand Down
3 changes: 3 additions & 0 deletions storage/common.go
Expand Up @@ -68,6 +68,9 @@ type KeyValueStore interface {
Delete(context.Context, Key) error
// DeleteMultiple deletes keys and returns nil for.
DeleteMultiple(context.Context, []Key) (Items, error)
// Range iterates over all items in unspecified order.
// The Key and Value are valid only for the duration of callback.
Range(ctx context.Context, fn func(context.Context, Key, Value) error) error
// List lists all keys starting from start and upto limit items.
List(ctx context.Context, start Key, limit int) (Keys, error)
// Iterate iterates over items based on opts.
Expand Down
29 changes: 29 additions & 0 deletions storage/redis/client.go
Expand Up @@ -183,6 +183,35 @@ func (client *Client) GetAll(ctx context.Context, keys storage.Keys) (_ storage.
return values, nil
}

// Range iterates over all items in unspecified order.
func (client *Client) Range(ctx context.Context, fn func(context.Context, storage.Key, storage.Value) error) (err error) {
defer mon.Task()(&ctx)(&err)

it := client.db.Scan(ctx, 0, "", 0).Iterator()

var lastKey string
var lastOk bool
for it.Next(ctx) {
key := it.Val()
// redis may return duplicates
if lastOk && key == lastKey {
continue
}
lastKey, lastOk = key, true

value, err := get(ctx, client.db, storage.Key(key))
if err != nil {
return Error.Wrap(err)
}

if err := fn(ctx, storage.Key(key), value); err != nil {
return err
}
}

return Error.Wrap(it.Err())
}

// Iterate iterates over items based on opts.
func (client *Client) Iterate(ctx context.Context, opts storage.IterateOptions, fn func(context.Context, storage.Iterator) error) (err error) {
defer mon.Task()(&ctx)(&err)
Expand Down
14 changes: 14 additions & 0 deletions storage/storelogger/logger.go
Expand Up @@ -77,6 +77,20 @@ func (store *Logger) List(ctx context.Context, first storage.Key, limit int) (_
return keys, err
}

// Range iterates over all items in unspecified order.
func (store *Logger) Range(ctx context.Context, fn func(context.Context, storage.Key, storage.Value) error) (err error) {
defer mon.Task()(&ctx)(&err)
store.log.Debug("Range")
return store.store.Range(ctx, func(ctx context.Context, key storage.Key, value storage.Value) error {
store.log.Debug(" ",
zap.ByteString("key", key),
zap.Int("value length", len(value)),
zap.Binary("truncated value", truncate(value)),
)
return fn(ctx, key, value)
})
}

// Iterate iterates over items based on opts.
func (store *Logger) Iterate(ctx context.Context, opts storage.IterateOptions, fn func(context.Context, storage.Iterator) error) (err error) {
defer mon.Task()(&ctx)(&err)
Expand Down
20 changes: 20 additions & 0 deletions storage/teststore/store.go
Expand Up @@ -34,6 +34,7 @@ type Client struct {
GetAll int
Delete int
Close int
Range int
Iterate int
CompareAndSwap int
}
Expand Down Expand Up @@ -231,6 +232,25 @@ func (store *Client) Close() error {
return nil
}

// Range iterates over all items in unspecified order.
func (store *Client) Range(ctx context.Context, fn func(context.Context, storage.Key, storage.Value) error) error {
store.mu.Lock()
store.CallCount.Range++
if store.forcedError() {
store.mu.Unlock()
return errors.New("internal error")
}
items := append([]storage.ListItem{}, store.Items...)
store.mu.Unlock()

for _, item := range items {
if err := fn(ctx, item.Key, item.Value); err != nil {
return err
}
}
return nil
}

// Iterate iterates over items based on opts.
func (store *Client) Iterate(ctx context.Context, opts storage.IterateOptions, fn func(context.Context, storage.Iterator) error) (err error) {
defer mon.Task()(&ctx)(&err)
Expand Down
1 change: 1 addition & 0 deletions storage/testsuite/test.go
Expand Up @@ -25,6 +25,7 @@ func RunTests(t *testing.T, store storage.KeyValueStore) {
defer ctx.Cleanup()
t.Run("CRUD", func(t *testing.T) { testCRUD(t, ctx, store) })
t.Run("Constraints", func(t *testing.T) { testConstraints(t, ctx, store) })
t.Run("Range", func(t *testing.T) { testRange(t, ctx, store) })
t.Run("Iterate", func(t *testing.T) { testIterate(t, ctx, store) })
t.Run("IterateAll", func(t *testing.T) { testIterateAll(t, ctx, store) })
t.Run("Prefix", func(t *testing.T) { testPrefix(t, ctx, store) })
Expand Down
59 changes: 59 additions & 0 deletions storage/testsuite/test_range.go
@@ -0,0 +1,59 @@
// Copyright (C) 2023 Storj Labs, Inc.
// See LICENSE for copying information.

package testsuite

import (
"context"
"errors"
"math/rand"
"sort"
"testing"

"github.com/stretchr/testify/require"

"storj.io/common/testcontext"
"storj.io/storj/storage"
)

func testRange(t *testing.T, ctx *testcontext.Context, store storage.KeyValueStore) {
err := store.Range(ctx, func(ctx context.Context, key storage.Key, value storage.Value) error {
return errors.New("empty store")
})
require.NoError(t, err)

items := storage.Items{
newItem("a", "a", false),
newItem("b/1", "b/1", false),
newItem("b/2", "b/2", false),
newItem("b/3", "b/3", false),
newItem("c", "c", false),
newItem("c/", "c/", false),
newItem("c//", "c//", false),
newItem("c/1", "c/1", false),
newItem("g", "g", false),
newItem("h", "h", false),
}
rand.Shuffle(len(items), items.Swap)
defer cleanupItems(t, ctx, store, items)

if err := storage.PutAll(ctx, store, items...); err != nil {
t.Fatalf("failed to setup: %v", err)
}

var output storage.Items
err = store.Range(ctx, func(ctx context.Context, key storage.Key, value storage.Value) error {
output = append(output, storage.ListItem{
Key: append([]byte{}, key...),
Value: append([]byte{}, value...),
})
return nil
})
require.NoError(t, err)

expected := storage.CloneItems(items)
sort.Sort(expected)
sort.Sort(output)

require.EqualValues(t, expected, output)
}

0 comments on commit 5498424

Please sign in to comment.