Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ADDED] ListKeys method for listing kv keys #1490

Merged
merged 1 commit into from Dec 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
49 changes: 49 additions & 0 deletions jetstream/kv.go
Expand Up @@ -67,7 +67,10 @@ type (
// WatchAll will invoke the callback for all updates.
WatchAll(ctx context.Context, opts ...WatchOpt) (KeyWatcher, error)
// Keys will return all keys.
// DEPRECATED: Use ListKeys instead to avoid memory issues.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

which was the memory issue?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Keys() could be used to fetch millions of keys all stored in a slice before returning. There should be a way of either paging or iterating over the result (which the new method provides). I could add some details to this comment.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have a use case now with > 600k keys.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah I see, makes sense to switch to use channels instead

Keys(ctx context.Context, opts ...WatchOpt) ([]string, error)
// ListKeys will return all keys in a channel.
ListKeys(ctx context.Context, opts ...WatchOpt) (KeyLister, error)
// History will return all historical values for the key.
History(ctx context.Context, key string, opts ...WatchOpt) ([]KeyValueEntry, error)
// Bucket returns the current bucket name.
Expand All @@ -78,6 +81,12 @@ type (
Status(ctx context.Context) (KeyValueStatus, error)
}

// KeyLister is used to retrieve a list of key value store keys
KeyLister interface {
Keys() <-chan string
Stop() error
}

// KeyValueConfig is for configuring a KeyValue store.
KeyValueConfig struct {
Bucket string
Expand Down Expand Up @@ -923,6 +932,46 @@ func (kv *kvs) Keys(ctx context.Context, opts ...WatchOpt) ([]string, error) {
return keys, nil
}

type keyLister struct {
watcher KeyWatcher
keys chan string
}

// Keys will return all keys.
func (kv *kvs) ListKeys(ctx context.Context, opts ...WatchOpt) (KeyLister, error) {
opts = append(opts, IgnoreDeletes(), MetaOnly())
watcher, err := kv.WatchAll(ctx, opts...)
if err != nil {
return nil, err
}
kl := &keyLister{watcher: watcher, keys: make(chan string, 256)}

go func() {
defer close(kl.keys)
defer watcher.Stop()
for {
select {
case entry := <-watcher.Updates():
if entry == nil {
return
}
kl.keys <- entry.Key()
case <-ctx.Done():
return
}
}
}()
return kl, nil
}

func (kl *keyLister) Keys() <-chan string {
return kl.keys
}

func (kl *keyLister) Stop() error {
return kl.watcher.Stop()
}

// History will return all historical values for the key.
func (kv *kvs) History(ctx context.Context, key string, opts ...WatchOpt) ([]KeyValueEntry, error) {
opts = append(opts, IncludeHistory())
Expand Down
74 changes: 74 additions & 0 deletions jetstream/test/kv_test.go
Expand Up @@ -746,6 +746,80 @@ func TestKeyValueKeys(t *testing.T) {
}
}

func TestKeyValueListKeys(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)

nc, js := jsClient(t, s)
defer nc.Close()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

kv, err := js.CreateKeyValue(ctx, jetstream.KeyValueConfig{Bucket: "KVS", History: 2})
expectOk(t, err)

put := func(key, value string) {
t.Helper()
_, err := kv.Put(ctx, key, []byte(value))
expectOk(t, err)
}

// Put in a few names and ages.
put("name", "derek")
put("age", "22")
put("country", "US")
put("name", "ivan")
put("age", "33")
put("country", "US")
put("name", "rip")
put("age", "44")
put("country", "MT")

keys, err := kv.ListKeys(ctx)
expectOk(t, err)

kmap := make(map[string]struct{})
for key := range keys.Keys() {
if _, ok := kmap[key]; ok {
t.Fatalf("Already saw %q", key)
}
kmap[key] = struct{}{}
}
if len(kmap) != 3 {
t.Fatalf("Expected 3 total keys, got %d", len(kmap))
}
expected := map[string]struct{}{
"name": struct{}{},
"age": struct{}{},
"country": struct{}{},
}
if !reflect.DeepEqual(kmap, expected) {
t.Fatalf("Expected %+v but got %+v", expected, kmap)
}
// Make sure delete and purge do the right thing and not return the keys.
err = kv.Delete(ctx, "name")
expectOk(t, err)
err = kv.Purge(ctx, "country")
expectOk(t, err)

keys, err = kv.ListKeys(ctx)
expectOk(t, err)

kmap = make(map[string]struct{})
for key := range keys.Keys() {
if _, ok := kmap[key]; ok {
t.Fatalf("Already saw %q", key)
}
kmap[key] = struct{}{}
}
if len(kmap) != 1 {
t.Fatalf("Expected 1 total key, got %d", len(kmap))
}
if _, ok := kmap["age"]; !ok {
t.Fatalf("Expected %q to be only key present", "age")
}
}

func TestKeyValueCrossAccounts(t *testing.T) {
conf := createConfFile(t, []byte(`
jetstream: enabled
Expand Down
44 changes: 44 additions & 0 deletions kv.go
Expand Up @@ -65,7 +65,10 @@ type KeyValue interface {
// WatchAll will invoke the callback for all updates.
WatchAll(opts ...WatchOpt) (KeyWatcher, error)
// Keys will return all keys.
// DEPRECATED: Use ListKeys instead to avoid memory issues.
Keys(opts ...WatchOpt) ([]string, error)
// ListKeys will return all keys in a channel.
ListKeys(opts ...WatchOpt) (KeyLister, error)
// History will return all historical values for the key.
History(key string, opts ...WatchOpt) ([]KeyValueEntry, error)
// Bucket returns the current bucket name.
Expand Down Expand Up @@ -110,6 +113,12 @@ type KeyWatcher interface {
Stop() error
}

// KeyLister is used to retrieve a list of key value store keys
type KeyLister interface {
Keys() <-chan string
Stop() error
}

type WatchOpt interface {
configureWatcher(opts *watchOpts) error
}
Expand Down Expand Up @@ -842,6 +851,41 @@ func (kv *kvs) Keys(opts ...WatchOpt) ([]string, error) {
return keys, nil
}

type keyLister struct {
watcher KeyWatcher
keys chan string
}

// ListKeys will return all keys.
func (kv *kvs) ListKeys(opts ...WatchOpt) (KeyLister, error) {
opts = append(opts, IgnoreDeletes(), MetaOnly())
watcher, err := kv.WatchAll(opts...)
if err != nil {
return nil, err
}
kl := &keyLister{watcher: watcher, keys: make(chan string, 256)}

go func() {
defer close(kl.keys)
defer watcher.Stop()
for entry := range watcher.Updates() {
if entry == nil {
return
}
kl.keys <- entry.Key()
}
}()
return kl, nil
}

func (kl *keyLister) Keys() <-chan string {
return kl.keys
}

func (kl *keyLister) Stop() error {
return kl.watcher.Stop()
}

// History will return all values for the key.
func (kv *kvs) History(key string, opts ...WatchOpt) ([]KeyValueEntry, error) {
opts = append(opts, IncludeHistory())
Expand Down
72 changes: 72 additions & 0 deletions test/kv_test.go
Expand Up @@ -694,6 +694,78 @@ func TestKeyValueKeys(t *testing.T) {
}
}

func TestKeyValueListKeys(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)

nc, js := jsClient(t, s)
defer nc.Close()

kv, err := js.CreateKeyValue(&nats.KeyValueConfig{Bucket: "KVS", History: 2})
expectOk(t, err)

put := func(key, value string) {
t.Helper()
_, err := kv.Put(key, []byte(value))
expectOk(t, err)
}

// Put in a few names and ages.
put("name", "derek")
put("age", "22")
put("country", "US")
put("name", "ivan")
put("age", "33")
put("country", "US")
put("name", "rip")
put("age", "44")
put("country", "MT")

keys, err := kv.ListKeys()
expectOk(t, err)

kmap := make(map[string]struct{})
for key := range keys.Keys() {
if _, ok := kmap[key]; ok {
t.Fatalf("Already saw %q", key)
}
kmap[key] = struct{}{}
}
if len(kmap) != 3 {
t.Fatalf("Expected 3 total keys, got %d", len(kmap))
}
expected := map[string]struct{}{
"name": struct{}{},
"age": struct{}{},
"country": struct{}{},
}
if !reflect.DeepEqual(kmap, expected) {
t.Fatalf("Expected %+v but got %+v", expected, kmap)
}
// Make sure delete and purge do the right thing and not return the keys.
err = kv.Delete("name")
expectOk(t, err)
err = kv.Purge("country")
expectOk(t, err)

keys, err = kv.ListKeys()
expectOk(t, err)

kmap = make(map[string]struct{})
for key := range keys.Keys() {
if _, ok := kmap[key]; ok {
t.Fatalf("Already saw %q", key)
}
kmap[key] = struct{}{}
}
if len(kmap) != 1 {
t.Fatalf("Expected 1 total key, got %d", len(kmap))
}
if _, ok := kmap["age"]; !ok {
t.Fatalf("Expected %q to be only key present", "age")
}
}

func TestKeyValueCrossAccounts(t *testing.T) {
conf := createConfFile(t, []byte(`
jetstream: enabled
Expand Down