Skip to content

Commit

Permalink
Merge pull request #113318 from pacoxu/learner-mode
Browse files Browse the repository at this point in the history
kubeadm: enable etcd's learner mode when joining etcd members
  • Loading branch information
k8s-ci-robot committed Dec 17, 2022
2 parents 7f7bf68 + 37f5da9 commit fdafd50
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 3 deletions.
3 changes: 3 additions & 0 deletions cmd/kubeadm/app/features/features.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,15 @@ const (
PublicKeysECDSA = "PublicKeysECDSA"
// RootlessControlPlane is expected to be in alpha in v1.22
RootlessControlPlane = "RootlessControlPlane"
// EtcdLearnerMode is expected to be in alpha in v1.27
EtcdLearnerMode = "EtcdLearnerMode"
)

// InitFeatureGates are the default feature gates for the init command.
// Each entry maps a feature name constant to its FeatureSpec; all three gates
// listed here are registered as Alpha and are disabled by default.
var InitFeatureGates = FeatureList{
PublicKeysECDSA: {FeatureSpec: featuregate.FeatureSpec{Default: false, PreRelease: featuregate.Alpha}},
RootlessControlPlane: {FeatureSpec: featuregate.FeatureSpec{Default: false, PreRelease: featuregate.Alpha}},
EtcdLearnerMode: {FeatureSpec: featuregate.FeatureSpec{Default: false, PreRelease: featuregate.Alpha}},
}

// Feature represents a feature being gated
Expand Down
6 changes: 5 additions & 1 deletion cmd/kubeadm/app/phases/etcd/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,11 @@ func CreateStackedEtcdStaticPodManifestFile(client clientset.Interface, manifest
fmt.Printf("[etcd] Would add etcd member: %s\n", etcdPeerAddress)
} else {
klog.V(1).Infof("[etcd] Adding etcd member: %s", etcdPeerAddress)
cluster, err = etcdClient.AddMember(nodeName, etcdPeerAddress)
if features.Enabled(cfg.FeatureGates, features.EtcdLearnerMode) {
cluster, err = etcdClient.AddMemberAsLeanerAndPromote(nodeName, etcdPeerAddress)
} else {
cluster, err = etcdClient.AddMember(nodeName, etcdPeerAddress)
}
if err != nil {
return err
}
Expand Down
8 changes: 8 additions & 0 deletions cmd/kubeadm/app/phases/upgrade/staticpods_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,10 @@ func (c fakeTLSEtcdClient) ListMembers() ([]etcdutil.Member, error) {
return []etcdutil.Member{}, nil
}

// AddMemberAsLeanerAndPromote is a test stub: it ignores both arguments and
// reports an empty, non-nil member list together with a nil error.
func (c fakeTLSEtcdClient) AddMemberAsLeanerAndPromote(name, peerAddrs string) ([]etcdutil.Member, error) {
	members := make([]etcdutil.Member, 0)
	return members, nil
}

// AddMember is a test stub: it ignores both arguments and reports an empty,
// non-nil member list together with a nil error.
func (c fakeTLSEtcdClient) AddMember(name, peerAddrs string) ([]etcdutil.Member, error) {
	members := make([]etcdutil.Member, 0)
	return members, nil
}
Expand Down Expand Up @@ -286,6 +290,10 @@ func (c fakePodManifestEtcdClient) ListMembers() ([]etcdutil.Member, error) {
return []etcdutil.Member{}, nil
}

// AddMemberAsLeanerAndPromote is a test stub: it ignores both arguments and
// reports an empty, non-nil member list together with a nil error.
func (c fakePodManifestEtcdClient) AddMemberAsLeanerAndPromote(name, peerAddrs string) ([]etcdutil.Member, error) {
	members := make([]etcdutil.Member, 0)
	return members, nil
}

// AddMember is a test stub: it ignores both arguments and reports an empty,
// non-nil member list together with a nil error.
func (c fakePodManifestEtcdClient) AddMember(name, peerAddrs string) ([]etcdutil.Member, error) {
	members := make([]etcdutil.Member, 0)
	return members, nil
}
Expand Down
53 changes: 51 additions & 2 deletions cmd/kubeadm/app/util/etcd/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ type ClusterInterrogator interface {
Sync() error
ListMembers() ([]Member, error)
AddMember(name string, peerAddrs string) ([]Member, error)
AddMemberAsLeanerAndPromote(name string, peerAddrs string) ([]Member, error)
GetMemberID(peerURL string) (uint64, error)
RemoveMember(id uint64) ([]Member, error)
}
Expand Down Expand Up @@ -341,10 +342,20 @@ func (c *Client) RemoveMember(id uint64) ([]Member, error) {
return ret, nil
}

// AddMember adds a new member into the etcd cluster.
// It delegates to addMember with the learner flag turned off and returns
// whatever member list and error that call produces.
func (c *Client) AddMember(name, peerAddrs string) ([]Member, error) {
	const asLearner = false
	return c.addMember(name, peerAddrs, asLearner)
}

// AddMemberAsLeanerAndPromote adds a new learner member into the etcd cluster
// and promotes it to a voting member.
// It delegates to addMember with the learner flag turned on and returns
// whatever member list and error that call produces.
func (c *Client) AddMemberAsLeanerAndPromote(name, peerAddrs string) ([]Member, error) {
	const asLearner = true
	return c.addMember(name, peerAddrs, asLearner)
}

// addMember notifies an existing etcd cluster that a new member is joining, and
// returns the updated list of members. If the member has already been added to the
// cluster, this will return the existing list of etcd members.
func (c *Client) AddMember(name string, peerAddrs string) ([]Member, error) {
func (c *Client) addMember(name string, peerAddrs string, isLearner bool) ([]Member, error) {
// Parse the peer address, required to add the client URL later to the list
// of endpoints for this client. Parsing as a first operation to make sure that
// if this fails no member addition is performed on the etcd cluster.
Expand All @@ -357,6 +368,7 @@ func (c *Client) AddMember(name string, peerAddrs string) ([]Member, error) {
var (
lastError error
respMembers []*etcdserverpb.Member
learnerID uint64
)
err = wait.ExponentialBackoff(etcdBackoff, func() (bool, error) {
cli, err := clientv3.New(clientv3.Config{
Expand All @@ -376,6 +388,26 @@ func (c *Client) AddMember(name string, peerAddrs string) ([]Member, error) {
ctx, cancel := context.WithTimeout(context.Background(), etcdTimeout)
defer cancel()
var resp *clientv3.MemberAddResponse
if isLearner {
// if learnerID is set, it means the etcd member is already added successfully.
if learnerID == 0 {
klog.V(1).Infof("[etcd] Adding etcd member as learner: %016x", peerAddrs)
resp, err = cli.MemberAddAsLearner(ctx, []string{peerAddrs})
if err != nil {
lastError = err
return false, nil
}
learnerID = resp.Member.ID
}
err = memberPromote(ctx, cli, learnerID)
if err != nil {
lastError = err
return false, nil
}
respMembers = resp.Members
return true, nil
}

resp, err = cli.MemberAdd(ctx, []string{peerAddrs})
if err == nil {
respMembers = resp.Members
Expand Down Expand Up @@ -427,6 +459,23 @@ func (c *Client) AddMember(name string, peerAddrs string) ([]Member, error) {
return ret, nil
}

// memberPromote asks the etcd cluster, through cli, to promote the learner
// identified by learnerID to a voting member. It returns nil when the promote
// call succeeds and the error from cli.MemberPromote otherwise.
func memberPromote(ctx context.Context, cli *clientv3.Client, learnerID uint64) error {
	klog.V(1).Infof("[etcd] Promoting a learner as a voting member: %016x", learnerID)

	// TODO: warning logs from the etcd client should be removed.
	// The etcd client code prints warning logs for several reasons, including:
	//   1. the learner cannot be promoted yet (not synced)
	//   2. context deadline exceeded
	//   3. peer URLs already exist
	// Once the client offers a way to check whether the learner is ready to be
	// promoted, the retry logic can be revisited.
	if _, err := cli.MemberPromote(ctx, learnerID); err != nil {
		klog.V(5).Infof("[etcd] Promoting the learner %016x failed: %v", learnerID, err)
		return err
	}

	klog.V(1).Infof("[etcd] The learner was promoted as a voting member: %016x", learnerID)
	return nil
}

// CheckClusterHealth returns nil for status Up or error for status Down
func (c *Client) CheckClusterHealth() error {
_, err := c.getClusterStatus()
Expand Down

0 comments on commit fdafd50

Please sign in to comment.