Skip to content

Commit

Permalink
Refactor WithRequireLeader to make it part of the etcd store
Browse files Browse the repository at this point in the history
  • Loading branch information
serathius committed Jun 21, 2023
1 parent 9e0569f commit a9af2de
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 10 deletions.
13 changes: 12 additions & 1 deletion staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go
Expand Up @@ -885,7 +885,18 @@ func (s *store) Watch(ctx context.Context, key string, opts storage.ListOptions)
if err != nil {
return nil, err
}
return s.watcher.Watch(ctx, preparedKey, int64(rev), opts.Recursive, opts.ProgressNotify, s.transformer, opts.Predicate)
return s.watcher.Watch(s.watchContext(ctx), preparedKey, int64(rev), opts.Recursive, opts.ProgressNotify, s.transformer, opts.Predicate)
}

func (s *store) watchContext(ctx context.Context) context.Context {
// The etcd server waits until it cannot find a leader for 3 election
// timeouts to cancel existing streams. 3 is currently a hard coded
// constant. The election timeout defaults to 1000ms. If the cluster is
// healthy, when the leader is stopped, the leadership transfer should be
// smooth. (leader transfers its leadership before stopping). If leader is
// hard killed, other servers will take an election timeout to realize
// leader lost and start campaign.
return clientv3.WithRequireLeader(ctx)
}

func (s *store) getState(ctx context.Context, getResp *clientv3.GetResponse, key string, v reflect.Value, ignoreNotFound bool) (*objState, error) {
Expand Down
10 changes: 1 addition & 9 deletions staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go
Expand Up @@ -144,15 +144,7 @@ func (w *watcher) createWatchChan(ctx context.Context, key string, rev int64, re
// The filter doesn't filter out any object.
wc.internalPred = storage.Everything
}

// The etcd server waits until it cannot find a leader for 3 election
// timeouts to cancel existing streams. 3 is currently a hard coded
// constant. The election timeout defaults to 1000ms. If the cluster is
// healthy, when the leader is stopped, the leadership transfer should be
// smooth. (leader transfers its leadership before stopping). If leader is
// hard killed, other servers will take an election timeout to realize
// leader lost and start campaign.
wc.ctx, wc.cancel = context.WithCancel(clientv3.WithRequireLeader(ctx))
wc.ctx, wc.cancel = context.WithCancel(ctx)
return wc
}

Expand Down

0 comments on commit a9af2de

Please sign in to comment.