Skip to content

Commit

Permalink
fix: error when kv_watch with no keys (#981)
Browse files Browse the repository at this point in the history
Signed-off-by: Derek Wang <whynowy@gmail.com>
  • Loading branch information
whynowy committed Aug 23, 2023
1 parent 4cbd729 commit 66c3219
Showing 1 changed file with 21 additions and 5 deletions.
26 changes: 21 additions & 5 deletions pkg/watermark/store/jetstream/kv_watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,15 +197,31 @@ func (jsw *jetStreamWatch) newWatcher(ctx context.Context) nats.KeyWatcher {

// lastUpdateKVTime returns the last update time of the kv store
func (jsw *jetStreamWatch) lastUpdateKVTime() time.Time {
keys, err := jsw.kvStore.Keys()
for err != nil {
jsw.log.Errorw("Failed to get keys", zap.String("watcher", jsw.GetKVName()), zap.Error(err))
var (
keys []string
err error
lastUpdate time.Time
value nats.KeyValueEntry
)

retryLoop:
for {
keys, err = jsw.kvStore.Keys()
if err == nil {
break retryLoop
} else {
// if there are no keys in the store, return zero time because there are no updates
// upstream will handle it
if err == nats.ErrNoKeysFound {
return time.Time{}
}
jsw.log.Errorw("Failed to get keys", zap.String("watcher", jsw.GetKVName()), zap.Error(err))
}
time.Sleep(100 * time.Millisecond)
}
var lastUpdate = time.Time{}

for _, key := range keys {
value, err := jsw.kvStore.Get(key)
value, err = jsw.kvStore.Get(key)
for err != nil {
jsw.log.Errorw("Failed to get value", zap.String("watcher", jsw.GetKVName()), zap.Error(err))
value, err = jsw.kvStore.Get(key)
Expand Down

0 comments on commit 66c3219

Please sign in to comment.