diff --git a/cmd/kubeadm/app/phases/etcd/local.go b/cmd/kubeadm/app/phases/etcd/local.go index a0dff2060ec9..f7ba0c644337 100644 --- a/cmd/kubeadm/app/phases/etcd/local.go +++ b/cmd/kubeadm/app/phases/etcd/local.go @@ -118,6 +118,10 @@ func RemoveStackedEtcdMemberFromCluster(client clientset.Interface, cfg *kubeadm klog.V(2).Infof("[etcd] get the member id from peer: %s", etcdPeerAddress) id, err := etcdClient.GetMemberID(etcdPeerAddress) if err != nil { + if errors.Is(etcdutil.ErrNoMemberIDForPeerURL, err) { + klog.V(5).Infof("[etcd] member was already removed, because no member id exists for peer %s", etcdPeerAddress) + return nil + } return err } diff --git a/cmd/kubeadm/app/util/etcd/etcd.go b/cmd/kubeadm/app/util/etcd/etcd.go index ed1e4ba5f3cd..b1c120a78b6a 100644 --- a/cmd/kubeadm/app/util/etcd/etcd.go +++ b/cmd/kubeadm/app/util/etcd/etcd.go @@ -53,6 +53,8 @@ var etcdBackoff = wait.Backoff{ Jitter: 0.1, } +var ErrNoMemberIDForPeerURL = errors.New("no member id found for peer URL") + // ClusterInterrogator is an interface to get etcd cluster related information type ClusterInterrogator interface { CheckClusterHealth() error @@ -64,27 +66,69 @@ type ClusterInterrogator interface { RemoveMember(id uint64) ([]Member, error) } +type etcdClient interface { + // Close shuts down the client's etcd connections. + Close() error + + // Endpoints lists the registered endpoints for the client. + Endpoints() []string + + // MemberList lists the current cluster membership. + MemberList(ctx context.Context) (*clientv3.MemberListResponse, error) + + // MemberAdd adds a new member into the cluster. + MemberAdd(ctx context.Context, peerAddrs []string) (*clientv3.MemberAddResponse, error) + + // MemberAddAsLearner adds a new learner member into the cluster. + MemberAddAsLearner(ctx context.Context, peerAddrs []string) (*clientv3.MemberAddResponse, error) + + // MemberRemove removes an existing member from the cluster. + MemberRemove(ctx context.Context, id uint64) (*clientv3.MemberRemoveResponse, error) + + // MemberPromote promotes a member from raft learner (non-voting) to raft voting member. + MemberPromote(ctx context.Context, id uint64) (*clientv3.MemberPromoteResponse, error) + + // Status gets the status of the endpoint. + Status(ctx context.Context, endpoint string) (*clientv3.StatusResponse, error) + + // Sync synchronizes client's endpoints with the known endpoints from the etcd membership. + Sync(ctx context.Context) error +} + // Client provides connection parameters for an etcd cluster type Client struct { Endpoints []string - TLS *tls.Config + + newEtcdClient func(endpoints []string) (etcdClient, error) } // New creates a new EtcdCluster client func New(endpoints []string, ca, cert, key string) (*Client, error) { client := Client{Endpoints: endpoints} + var err error + var tlsConfig *tls.Config if ca != "" || cert != "" || key != "" { tlsInfo := transport.TLSInfo{ CertFile: cert, KeyFile: key, TrustedCAFile: ca, } - tlsConfig, err := tlsInfo.ClientConfig() + tlsConfig, err = tlsInfo.ClientConfig() if err != nil { return nil, err } - client.TLS = tlsConfig + } + + client.newEtcdClient = func(endpoints []string) (etcdClient, error) { + return clientv3.New(clientv3.Config{ + Endpoints: endpoints, + DialTimeout: etcdTimeout, + DialOptions: []grpc.DialOption{ + grpc.WithBlock(), // block until the underlying connection is up + }, + TLS: tlsConfig, + }) } return &client, nil @@ -190,24 +234,16 @@ func getRawEtcdEndpointsFromPodAnnotationWithoutRetry(client clientset.Interface // Sync synchronizes client's endpoints with the known endpoints from the etcd membership. func (c *Client) Sync() error { // Syncs the list of endpoints - var cli *clientv3.Client + var cli etcdClient var lastError error err := wait.ExponentialBackoff(etcdBackoff, func() (bool, error) { var err error - cli, err = clientv3.New(clientv3.Config{ - Endpoints: c.Endpoints, - DialTimeout: etcdTimeout, - DialOptions: []grpc.DialOption{ - grpc.WithBlock(), // block until the underlying connection is up - }, - TLS: c.TLS, - }) + cli, err = c.newEtcdClient(c.Endpoints) if err != nil { lastError = err return false, nil } defer cli.Close() - ctx, cancel := context.WithTimeout(context.Background(), etcdTimeout) err = cli.Sync(ctx) cancel() @@ -239,14 +275,7 @@ func (c *Client) listMembers() (*clientv3.MemberListResponse, error) { var lastError error var resp *clientv3.MemberListResponse err := wait.ExponentialBackoff(etcdBackoff, func() (bool, error) { - cli, err := clientv3.New(clientv3.Config{ - Endpoints: c.Endpoints, - DialTimeout: etcdTimeout, - DialOptions: []grpc.DialOption{ - grpc.WithBlock(), // block until the underlying connection is up - }, - TLS: c.TLS, - }) + cli, err := c.newEtcdClient(c.Endpoints) if err != nil { lastError = err return false, nil @@ -281,7 +310,7 @@ func (c *Client) GetMemberID(peerURL string) (uint64, error) { return member.GetID(), nil } } - return 0, nil + return 0, ErrNoMemberIDForPeerURL } // ListMembers returns the member list. @@ -304,14 +333,7 @@ func (c *Client) RemoveMember(id uint64) ([]Member, error) { var lastError error var resp *clientv3.MemberRemoveResponse err := wait.ExponentialBackoff(etcdBackoff, func() (bool, error) { - cli, err := clientv3.New(clientv3.Config{ - Endpoints: c.Endpoints, - DialTimeout: etcdTimeout, - DialOptions: []grpc.DialOption{ - grpc.WithBlock(), // block until the underlying connection is up - }, - TLS: c.TLS, - }) + cli, err := c.newEtcdClient(c.Endpoints) if err != nil { lastError = err return false, nil @@ -324,6 +346,10 @@ func (c *Client) RemoveMember(id uint64) ([]Member, error) { if err == nil { return true, nil } + if errors.Is(rpctypes.ErrMemberNotFound, err) { + klog.V(5).Infof("Member was already removed, because member %016x was not found", id) + return true, nil + } klog.V(5).Infof("Failed to remove etcd member: %v", err) lastError = err return false, nil @@ -353,20 +379,19 @@ func (c *Client) AddMember(name string, peerAddrs string) ([]Member, error) { return nil, errors.Wrapf(err, "error parsing peer address %s", peerAddrs) } + cli, err := c.newEtcdClient(c.Endpoints) + if err != nil { + return nil, err + } + defer cli.Close() + // Adds a new member to the cluster var ( lastError error respMembers []*etcdserverpb.Member ) err = wait.ExponentialBackoff(etcdBackoff, func() (bool, error) { - cli, err := clientv3.New(clientv3.Config{ - Endpoints: c.Endpoints, - DialTimeout: etcdTimeout, - DialOptions: []grpc.DialOption{ - grpc.WithBlock(), // block until the underlying connection is up - }, - TLS: c.TLS, - }) + cli, err := c.newEtcdClient(c.Endpoints) if err != nil { lastError = err return false, nil @@ -441,14 +466,7 @@ func (c *Client) getClusterStatus() (map[string]*clientv3.StatusResponse, error) var lastError error var resp *clientv3.StatusResponse err := wait.ExponentialBackoff(etcdBackoff, func() (bool, error) { - cli, err := clientv3.New(clientv3.Config{ - Endpoints: c.Endpoints, - DialTimeout: etcdTimeout, - DialOptions: []grpc.DialOption{ - grpc.WithBlock(), // block until the underlying connection is up - }, - TLS: c.TLS, - }) + cli, err := c.newEtcdClient(c.Endpoints) if err != nil { lastError = err return false, nil diff --git a/cmd/kubeadm/app/util/etcd/etcd_test.go b/cmd/kubeadm/app/util/etcd/etcd_test.go index d0aa67593b43..ac05118c0797 100644 --- a/cmd/kubeadm/app/util/etcd/etcd_test.go +++ b/cmd/kubeadm/app/util/etcd/etcd_test.go @@ -17,6 +17,7 @@ limitations under the License. package etcd import ( + "context" "fmt" "reflect" "strconv" @@ -24,6 +25,8 @@ import ( "github.com/pkg/errors" + pb "go.etcd.io/etcd/api/v3/etcdserverpb" + clientv3 "go.etcd.io/etcd/client/v3" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/wait" @@ -35,9 +38,64 @@ import ( testresources "k8s.io/kubernetes/cmd/kubeadm/test/resources" ) +var errNotImplemented = errors.New("not implemented") + +type fakeEtcdClient struct { + members []*pb.Member + endpoints []string +} + +// Close shuts down the client's etcd connections. +func (f *fakeEtcdClient) Close() error { + f.members = []*pb.Member{} + return nil +} + +// Endpoints lists the registered endpoints for the client. +func (f *fakeEtcdClient) Endpoints() []string { + return f.endpoints +} + +// MemberList lists the current cluster membership. +func (f *fakeEtcdClient) MemberList(_ context.Context) (*clientv3.MemberListResponse, error) { + return &clientv3.MemberListResponse{ + Members: f.members, + }, nil +} + +// MemberAdd adds a new member into the cluster. +func (f *fakeEtcdClient) MemberAdd(_ context.Context, peerAddrs []string) (*clientv3.MemberAddResponse, error) { + return nil, errNotImplemented +} + +// MemberAddAsLearner adds a new learner member into the cluster. +func (f *fakeEtcdClient) MemberAddAsLearner(_ context.Context, peerAddrs []string) (*clientv3.MemberAddResponse, error) { + return nil, errNotImplemented +} + +// MemberRemove removes an existing member from the cluster. +func (f *fakeEtcdClient) MemberRemove(_ context.Context, id uint64) (*clientv3.MemberRemoveResponse, error) { + return nil, errNotImplemented +} + +// MemberPromote promotes a member from raft learner (non-voting) to raft voting member. +func (f *fakeEtcdClient) MemberPromote(_ context.Context, id uint64) (*clientv3.MemberPromoteResponse, error) { + return nil, errNotImplemented +} + +// Status gets the status of the endpoint. +func (f *fakeEtcdClient) Status(_ context.Context, endpoint string) (*clientv3.StatusResponse, error) { + return nil, errNotImplemented +} + +// Sync synchronizes client's endpoints with the known endpoints from the etcd membership. +func (f *fakeEtcdClient) Sync(_ context.Context) error { + return errNotImplemented +} + func testGetURL(t *testing.T, getURLFunc func(*kubeadmapi.APIEndpoint) string, port int) { portStr := strconv.Itoa(port) - var tests = []struct { + tests := []struct { name string advertiseAddress string expectedURL string @@ -82,7 +140,7 @@ func TestGetPeerURL(t *testing.T) { func TestGetClientURLByIP(t *testing.T) { portStr := strconv.Itoa(constants.EtcdListenClientPort) - var tests = []struct { + tests := []struct { name string ip string expectedURL string @@ -118,7 +176,7 @@ func TestGetClientURLByIP(t *testing.T) { } func TestGetEtcdEndpointsWithBackoff(t *testing.T) { - var tests = []struct { + tests := []struct { name string pods []testresources.FakeStaticPod expectedEndpoints []string @@ -169,7 +227,7 @@ func TestGetEtcdEndpointsWithBackoff(t *testing.T) { } func TestGetRawEtcdEndpointsFromPodAnnotation(t *testing.T) { - var tests = []struct { + tests := []struct { name string pods []testresources.FakeStaticPod clientSetup func(*clientsetfake.Clientset) @@ -253,7 +311,7 @@ func TestGetRawEtcdEndpointsFromPodAnnotation(t *testing.T) { } func TestGetRawEtcdEndpointsFromPodAnnotationWithoutRetry(t *testing.T) { - var tests = []struct { + tests := []struct { name string pods []testresources.FakeStaticPod clientSetup func(*clientsetfake.Clientset) @@ -351,3 +409,88 @@ func TestGetRawEtcdEndpointsFromPodAnnotationWithoutRetry(t *testing.T) { }) } } + +func TestClient_GetMemberID(t *testing.T) { + type fields struct { + Endpoints []string + newEtcdClient func(endpoints []string) (etcdClient, error) + } + type args struct { + peerURL string + } + tests := []struct { + name string + fields fields + args args + want uint64 + wantErr error + }{ + { + name: "member ID found", + fields: fields{ + Endpoints: []string{}, + newEtcdClient: func(endpoints []string) (etcdClient, error) { + f := &fakeEtcdClient{ + members: []*pb.Member{ + { + ID: 1, + Name: "member1", + PeerURLs: []string{ + "https://member1:2380", + }, + }, + }, + } + return f, nil + }, + }, + args: args{ + peerURL: "https://member1:2380", + }, + wantErr: nil, + want: 1, + }, + { + name: "member ID not found", + fields: fields{ + Endpoints: []string{}, + newEtcdClient: func(endpoints []string) (etcdClient, error) { + f := &fakeEtcdClient{ + members: []*pb.Member{ + { + ID: 1, + Name: "member1", + PeerURLs: []string{ + "https://member1:2380", + }, + }, + }, + } + return f, nil + }, + }, + args: args{ + peerURL: "https://member2:2380", + }, + wantErr: ErrNoMemberIDForPeerURL, + want: 0, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + c := &Client{ + Endpoints: tt.fields.Endpoints, + newEtcdClient: tt.fields.newEtcdClient, + } + + got, err := c.GetMemberID(tt.args.peerURL) + if !errors.Is(tt.wantErr, err) { + t.Errorf("Client.GetMemberID() error = %v, wantErr %v", err, tt.wantErr) + return + } + if got != tt.want { + t.Errorf("Client.GetMemberID() = %v, want %v", got, tt.want) + } + }) + } +}