Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

etcdutil: fix ctx in watch loop #6445

Merged
merged 3 commits into from
May 15, 2023
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
25 changes: 21 additions & 4 deletions pkg/utils/etcdutil/etcdutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -427,19 +427,19 @@
defer logutil.LogPanic()
defer lw.wg.Done()

ctx, cancel := context.WithTimeout(lw.ctx, lw.loadTimeout)
ctx, cancel := context.WithCancel(lw.ctx)
defer cancel()
watchStartRevision := lw.initFromEtcd(ctx)

log.Info("start to watch loop", zap.String("name", lw.name), zap.String("key", lw.key))
for {
select {
case <-lw.ctx.Done():
case <-ctx.Done():
log.Info("server is closed, exit watch loop", zap.String("name", lw.name), zap.String("key", lw.key))
return
default:
}
nextRevision, err := lw.watch(lw.ctx, watchStartRevision)
nextRevision, err := lw.watch(ctx, watchStartRevision)
if err != nil {
log.Error("watcher canceled unexpectedly and a new watcher will start after a while for watch loop",
zap.String("name", lw.name),
Expand All @@ -461,6 +461,8 @@
watchStartRevision int64
err error
)
ctx, cancel := context.WithTimeout(ctx, lw.loadTimeout)
defer cancel()
ticker := time.NewTicker(defaultLoadFromEtcdRetryInterval)
defer ticker.Stop()
lhy1024 marked this conversation as resolved.
Show resolved Hide resolved

Expand Down Expand Up @@ -489,6 +491,8 @@
}
if err != nil {
log.Warn("meet error when loading in watch loop", zap.String("name", lw.name), zap.String("key", lw.key), zap.Error(err))
} else {
log.Info("load finished in watch loop", zap.String("name", lw.name), zap.String("key", lw.key))
}
lw.isLoadedCh <- err
return watchStartRevision
Expand All @@ -500,8 +504,12 @@

for {
WatchChan:
// In order to prevent a watch stream being stuck in a partitioned node,
// make sure to wrap context with "WithRequireLeader".
watchChanCtx, watchChanCancel := context.WithCancel(clientv3.WithRequireLeader(ctx))
defer watchChanCancel()
opts := append(lw.opts, clientv3.WithRev(revision))
watchChan := watcher.Watch(ctx, lw.key, opts...)
watchChan := watcher.Watch(watchChanCtx, lw.key, opts...)
select {
case <-ctx.Done():
return revision, nil
Expand All @@ -511,13 +519,15 @@
log.Warn("force load key failed in watch loop", zap.String("name", lw.name),
zap.String("key", lw.key), zap.Error(err))
}
watchChanCancel()
goto WatchChan
case wresp := <-watchChan:
if wresp.CompactRevision != 0 {
log.Warn("required revision has been compacted, use the compact revision in watch loop",
zap.Int64("required-revision", revision),
zap.Int64("compact-revision", wresp.CompactRevision))
revision = wresp.CompactRevision
watchChanCancel()

Check warning on line 530 in pkg/utils/etcdutil/etcdutil.go

View check run for this annotation

Codecov / codecov/patch

pkg/utils/etcdutil/etcdutil.go#L530

Added line #L530 was not covered by tests
goto WatchChan
} else if wresp.Err() != nil { // wresp.Err() contains CompactRevision not equal to 0
log.Error("watcher is canceled in watch loop",
Expand All @@ -532,11 +542,16 @@
log.Error("put failed in watch loop", zap.String("name", lw.name),
zap.String("key", lw.key), zap.Error(err))
}
log.Debug("put in watch loop", zap.String("name", lw.name),
zap.ByteString("key", event.Kv.Key),
rleungx marked this conversation as resolved.
Show resolved Hide resolved
zap.ByteString("value", event.Kv.Value))
case clientv3.EventTypeDelete:
if err := lw.deleteFn(event.Kv); err != nil {
log.Error("delete failed in watch loop", zap.String("name", lw.name),
zap.String("key", lw.key), zap.Error(err))
}
log.Debug("delete in watch loop", zap.String("name", lw.name),
zap.ByteString("key", event.Kv.Key))
}
}
if err := lw.postEventFn(); err != nil {
Expand All @@ -545,6 +560,7 @@
}
revision = wresp.Header.Revision + 1
}
watchChanCancel()
}
}

Expand Down Expand Up @@ -580,6 +596,7 @@
log.Error("put failed in watch loop when loading", zap.String("name", lw.name), zap.String("key", lw.key), zap.Error(err))
}
}
// Note: if there are no keys in etcd, resp.More is false, which also means the load is finished.
if !resp.More {
if err := lw.postEventFn(); err != nil {
log.Error("run post event failed in watch loop", zap.String("name", lw.name),
Expand Down