Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Automated cherry pick of #92118: kubeadm: don't re-add an etcd member if it already exists for #92235

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
30 changes: 25 additions & 5 deletions cmd/kubeadm/app/phases/etcd/local.go
Expand Up @@ -130,16 +130,36 @@ func CreateStackedEtcdStaticPodManifestFile(client clientset.Interface, manifest
return err
}

// notifies the other members of the etcd cluster about the joining member
etcdPeerAddress := etcdutil.GetPeerURL(endpoint)

klog.V(1).Infof("Adding etcd member: %s", etcdPeerAddress)
initialCluster, err := etcdClient.AddMember(nodeName, etcdPeerAddress)
klog.V(1).Infoln("[etcd] Getting the list of existing members")
initialCluster, err := etcdClient.ListMembers()
if err != nil {
return err
}
fmt.Println("[etcd] Announced new etcd member joining to the existing etcd cluster")
klog.V(1).Infof("Updated etcd member list: %v", initialCluster)

// only add the new member if it doesn't already exists
var exists bool
klog.V(1).Infof("[etcd] Checking if the etcd member already exists: %s", etcdPeerAddress)
for _, member := range initialCluster {
if member.PeerURL == etcdPeerAddress {
exists = true
break
}
}

if exists {
klog.V(1).Infof("[etcd] Etcd member already exists: %s", etcdPeerAddress)
} else {
klog.V(1).Infof("[etcd] Adding etcd member: %s", etcdPeerAddress)
initialCluster, err = etcdClient.AddMember(nodeName, etcdPeerAddress)
if err != nil {
return err
}

fmt.Println("[etcd] Announced new etcd member joining to the existing etcd cluster")
klog.V(1).Infof("Updated etcd member list: %v", initialCluster)
}

fmt.Printf("[etcd] Creating static Pod manifest for %q\n", kubeadmconstants.Etcd)

Expand Down
4 changes: 4 additions & 0 deletions cmd/kubeadm/app/phases/upgrade/compute_test.go
Expand Up @@ -121,6 +121,10 @@ func (f fakeEtcdClient) RemoveMember(id uint64) ([]etcdutil.Member, error) {
return []etcdutil.Member{}, nil
}

func (f fakeEtcdClient) ListMembers() ([]etcdutil.Member, error) {
return []etcdutil.Member{}, nil
}

func getEtcdVersion(v *versionutil.Version) string {
return constants.SupportedEtcdVersion[uint8(v.Minor())]
}
Expand Down
8 changes: 8 additions & 0 deletions cmd/kubeadm/app/phases/upgrade/staticpods_test.go
Expand Up @@ -268,6 +268,10 @@ func (c fakeTLSEtcdClient) RemoveMember(id uint64) ([]etcdutil.Member, error) {
return []etcdutil.Member{}, nil
}

func (c fakeTLSEtcdClient) ListMembers() ([]etcdutil.Member, error) {
return []etcdutil.Member{}, nil
}

type fakePodManifestEtcdClient struct{ ManifestDir, CertificatesDir string }

func (c fakePodManifestEtcdClient) WaitForClusterAvailable(retries int, retryInterval time.Duration) (bool, error) {
Expand Down Expand Up @@ -309,6 +313,10 @@ func (c fakePodManifestEtcdClient) RemoveMember(id uint64) ([]etcdutil.Member, e
return []etcdutil.Member{}, nil
}

func (c fakePodManifestEtcdClient) ListMembers() ([]etcdutil.Member, error) {
return []etcdutil.Member{}, nil
}

func TestStaticPodControlPlane(t *testing.T) {
tests := []struct {
description string
Expand Down
41 changes: 32 additions & 9 deletions cmd/kubeadm/app/util/etcd/etcd.go
Expand Up @@ -55,6 +55,7 @@ type ClusterInterrogator interface {
GetVersion() (string, error)
WaitForClusterAvailable(retries int, retryInterval time.Duration) (bool, error)
Sync() error
ListMembers() ([]Member, error)
AddMember(name string, peerAddrs string) ([]Member, error)
GetMemberID(peerURL string) (uint64, error)
RemoveMember(id uint64) ([]Member, error)
Expand Down Expand Up @@ -176,8 +177,36 @@ type Member struct {
PeerURL string
}

// ListMembers returns the member list.
func (c *Client) ListMembers() ([]Member, error) {
resp, err := c.listMembers()
if err != nil {
return nil, err
}

ret := make([]Member, 0, len(resp.Members))
for _, m := range resp.Members {
ret = append(ret, Member{Name: m.Name, PeerURL: m.PeerURLs[0]})
}
return ret, nil
}

// GetMemberID returns the member ID of the given peer URL
func (c *Client) GetMemberID(peerURL string) (uint64, error) {
resp, err := c.listMembers()
if err != nil {
return 0, err
}

for _, member := range resp.Members {
if member.GetPeerURLs()[0] == peerURL {
return member.GetID(), nil
}
}
return 0, nil
}

func (c *Client) listMembers() (*clientv3.MemberListResponse, error) {
cli, err := clientv3.New(clientv3.Config{
Endpoints: c.Endpoints,
DialTimeout: dialTimeout,
Expand All @@ -187,7 +216,7 @@ func (c *Client) GetMemberID(peerURL string) (uint64, error) {
TLS: c.TLS,
})
if err != nil {
return 0, err
return nil, err
}
defer cli.Close()

Expand All @@ -206,15 +235,9 @@ func (c *Client) GetMemberID(peerURL string) (uint64, error) {
return false, nil
})
if err != nil {
return 0, lastError
}

for _, member := range resp.Members {
if member.GetPeerURLs()[0] == peerURL {
return member.GetID(), nil
}
return nil, lastError
}
return 0, nil
return resp, nil
}

// RemoveMember notifies an etcd cluster to remove an existing member
Expand Down