Skip to content

Commit

Permalink
[ADDED] ListKeys method for listing kv keys (#1490)
Browse files Browse the repository at this point in the history
Signed-off-by: Piotr Piotrowski <piotr@synadia.com>
  • Loading branch information
piotrpio committed Dec 13, 2023
1 parent f8caaf9 commit b95565d
Show file tree
Hide file tree
Showing 4 changed files with 239 additions and 0 deletions.
49 changes: 49 additions & 0 deletions jetstream/kv.go
Expand Up @@ -67,7 +67,10 @@ type (
// WatchAll will invoke the callback for all updates.
WatchAll(ctx context.Context, opts ...WatchOpt) (KeyWatcher, error)
// Keys will return all keys.
// DEPRECATED: Use ListKeys instead to avoid memory issues.
Keys(ctx context.Context, opts ...WatchOpt) ([]string, error)
// ListKeys will return all keys in a channel.
ListKeys(ctx context.Context, opts ...WatchOpt) (KeyLister, error)
// History will return all historical values for the key.
History(ctx context.Context, key string, opts ...WatchOpt) ([]KeyValueEntry, error)
// Bucket returns the current bucket name.
Expand All @@ -78,6 +81,12 @@ type (
Status(ctx context.Context) (KeyValueStatus, error)
}

// KeyLister is used to retrieve a list of key value store keys
KeyLister interface {
Keys() <-chan string
Stop() error
}

// KeyValueConfig is for configuring a KeyValue store.
KeyValueConfig struct {
Bucket string
Expand Down Expand Up @@ -923,6 +932,46 @@ func (kv *kvs) Keys(ctx context.Context, opts ...WatchOpt) ([]string, error) {
return keys, nil
}

type keyLister struct {
watcher KeyWatcher
keys chan string
}

// Keys will return all keys.
func (kv *kvs) ListKeys(ctx context.Context, opts ...WatchOpt) (KeyLister, error) {
opts = append(opts, IgnoreDeletes(), MetaOnly())
watcher, err := kv.WatchAll(ctx, opts...)
if err != nil {
return nil, err
}
kl := &keyLister{watcher: watcher, keys: make(chan string, 256)}

go func() {
defer close(kl.keys)
defer watcher.Stop()
for {
select {
case entry := <-watcher.Updates():
if entry == nil {
return
}
kl.keys <- entry.Key()
case <-ctx.Done():
return
}
}
}()
return kl, nil
}

func (kl *keyLister) Keys() <-chan string {
return kl.keys
}

func (kl *keyLister) Stop() error {
return kl.watcher.Stop()
}

// History will return all historical values for the key.
func (kv *kvs) History(ctx context.Context, key string, opts ...WatchOpt) ([]KeyValueEntry, error) {
opts = append(opts, IncludeHistory())
Expand Down
74 changes: 74 additions & 0 deletions jetstream/test/kv_test.go
Expand Up @@ -746,6 +746,80 @@ func TestKeyValueKeys(t *testing.T) {
}
}

func TestKeyValueListKeys(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)

nc, js := jsClient(t, s)
defer nc.Close()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

kv, err := js.CreateKeyValue(ctx, jetstream.KeyValueConfig{Bucket: "KVS", History: 2})
expectOk(t, err)

put := func(key, value string) {
t.Helper()
_, err := kv.Put(ctx, key, []byte(value))
expectOk(t, err)
}

// Put in a few names and ages.
put("name", "derek")
put("age", "22")
put("country", "US")
put("name", "ivan")
put("age", "33")
put("country", "US")
put("name", "rip")
put("age", "44")
put("country", "MT")

keys, err := kv.ListKeys(ctx)
expectOk(t, err)

kmap := make(map[string]struct{})
for key := range keys.Keys() {
if _, ok := kmap[key]; ok {
t.Fatalf("Already saw %q", key)
}
kmap[key] = struct{}{}
}
if len(kmap) != 3 {
t.Fatalf("Expected 3 total keys, got %d", len(kmap))
}
expected := map[string]struct{}{
"name": struct{}{},
"age": struct{}{},
"country": struct{}{},
}
if !reflect.DeepEqual(kmap, expected) {
t.Fatalf("Expected %+v but got %+v", expected, kmap)
}
// Make sure delete and purge do the right thing and not return the keys.
err = kv.Delete(ctx, "name")
expectOk(t, err)
err = kv.Purge(ctx, "country")
expectOk(t, err)

keys, err = kv.ListKeys(ctx)
expectOk(t, err)

kmap = make(map[string]struct{})
for key := range keys.Keys() {
if _, ok := kmap[key]; ok {
t.Fatalf("Already saw %q", key)
}
kmap[key] = struct{}{}
}
if len(kmap) != 1 {
t.Fatalf("Expected 1 total key, got %d", len(kmap))
}
if _, ok := kmap["age"]; !ok {
t.Fatalf("Expected %q to be only key present", "age")
}
}

func TestKeyValueCrossAccounts(t *testing.T) {
conf := createConfFile(t, []byte(`
jetstream: enabled
Expand Down
44 changes: 44 additions & 0 deletions kv.go
Expand Up @@ -65,7 +65,10 @@ type KeyValue interface {
// WatchAll will invoke the callback for all updates.
WatchAll(opts ...WatchOpt) (KeyWatcher, error)
// Keys will return all keys.
// DEPRECATED: Use ListKeys instead to avoid memory issues.
Keys(opts ...WatchOpt) ([]string, error)
// ListKeys will return all keys in a channel.
ListKeys(opts ...WatchOpt) (KeyLister, error)
// History will return all historical values for the key.
History(key string, opts ...WatchOpt) ([]KeyValueEntry, error)
// Bucket returns the current bucket name.
Expand Down Expand Up @@ -110,6 +113,12 @@ type KeyWatcher interface {
Stop() error
}

// KeyLister is used to retrieve a list of key value store keys
type KeyLister interface {
Keys() <-chan string
Stop() error
}

type WatchOpt interface {
configureWatcher(opts *watchOpts) error
}
Expand Down Expand Up @@ -842,6 +851,41 @@ func (kv *kvs) Keys(opts ...WatchOpt) ([]string, error) {
return keys, nil
}

type keyLister struct {
watcher KeyWatcher
keys chan string
}

// ListKeys will return all keys.
func (kv *kvs) ListKeys(opts ...WatchOpt) (KeyLister, error) {
opts = append(opts, IgnoreDeletes(), MetaOnly())
watcher, err := kv.WatchAll(opts...)
if err != nil {
return nil, err
}
kl := &keyLister{watcher: watcher, keys: make(chan string, 256)}

go func() {
defer close(kl.keys)
defer watcher.Stop()
for entry := range watcher.Updates() {
if entry == nil {
return
}
kl.keys <- entry.Key()
}
}()
return kl, nil
}

func (kl *keyLister) Keys() <-chan string {
return kl.keys
}

func (kl *keyLister) Stop() error {
return kl.watcher.Stop()
}

// History will return all values for the key.
func (kv *kvs) History(key string, opts ...WatchOpt) ([]KeyValueEntry, error) {
opts = append(opts, IncludeHistory())
Expand Down
72 changes: 72 additions & 0 deletions test/kv_test.go
Expand Up @@ -694,6 +694,78 @@ func TestKeyValueKeys(t *testing.T) {
}
}

func TestKeyValueListKeys(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)

nc, js := jsClient(t, s)
defer nc.Close()

kv, err := js.CreateKeyValue(&nats.KeyValueConfig{Bucket: "KVS", History: 2})
expectOk(t, err)

put := func(key, value string) {
t.Helper()
_, err := kv.Put(key, []byte(value))
expectOk(t, err)
}

// Put in a few names and ages.
put("name", "derek")
put("age", "22")
put("country", "US")
put("name", "ivan")
put("age", "33")
put("country", "US")
put("name", "rip")
put("age", "44")
put("country", "MT")

keys, err := kv.ListKeys()
expectOk(t, err)

kmap := make(map[string]struct{})
for key := range keys.Keys() {
if _, ok := kmap[key]; ok {
t.Fatalf("Already saw %q", key)
}
kmap[key] = struct{}{}
}
if len(kmap) != 3 {
t.Fatalf("Expected 3 total keys, got %d", len(kmap))
}
expected := map[string]struct{}{
"name": struct{}{},
"age": struct{}{},
"country": struct{}{},
}
if !reflect.DeepEqual(kmap, expected) {
t.Fatalf("Expected %+v but got %+v", expected, kmap)
}
// Make sure delete and purge do the right thing and not return the keys.
err = kv.Delete("name")
expectOk(t, err)
err = kv.Purge("country")
expectOk(t, err)

keys, err = kv.ListKeys()
expectOk(t, err)

kmap = make(map[string]struct{})
for key := range keys.Keys() {
if _, ok := kmap[key]; ok {
t.Fatalf("Already saw %q", key)
}
kmap[key] = struct{}{}
}
if len(kmap) != 1 {
t.Fatalf("Expected 1 total key, got %d", len(kmap))
}
if _, ok := kmap["age"]; !ok {
t.Fatalf("Expected %q to be only key present", "age")
}
}

func TestKeyValueCrossAccounts(t *testing.T) {
conf := createConfFile(t, []byte(`
jetstream: enabled
Expand Down

0 comments on commit b95565d

Please sign in to comment.