Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Automated cherry pick of #117792: kubeadm: Use internal etcd client through an interface #117724: kubeadm: Make etcd member removal idempotent #118057

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 4 additions & 0 deletions cmd/kubeadm/app/phases/etcd/local.go
Expand Up @@ -118,6 +118,10 @@ func RemoveStackedEtcdMemberFromCluster(client clientset.Interface, cfg *kubeadm
klog.V(2).Infof("[etcd] get the member id from peer: %s", etcdPeerAddress)
id, err := etcdClient.GetMemberID(etcdPeerAddress)
if err != nil {
if errors.Is(etcdutil.ErrNoMemberIDForPeerURL, err) {
klog.V(5).Infof("[etcd] member was already removed, because no member id exists for peer %s", etcdPeerAddress)
return nil
}
return err
}

Expand Down
104 changes: 58 additions & 46 deletions cmd/kubeadm/app/util/etcd/etcd.go
Expand Up @@ -53,6 +53,8 @@ var etcdBackoff = wait.Backoff{
Jitter: 0.1,
}

// ErrNoMemberIDForPeerURL is the sentinel error returned by GetMemberID when
// no member ID is found for the requested peer URL.
var ErrNoMemberIDForPeerURL = errors.New("no member id found for peer URL")

// ClusterInterrogator is an interface to get etcd cluster related information
type ClusterInterrogator interface {
CheckClusterHealth() error
Expand All @@ -64,27 +66,69 @@ type ClusterInterrogator interface {
RemoveMember(id uint64) ([]Member, error)
}

// etcdClient is the set of etcd client operations that Client relies on.
// Client obtains a value of this type through its newEtcdClient field rather
// than constructing a concrete client directly.
// NOTE(review): presumably this indirection exists so a fake client can be
// substituted in tests — confirm against the test files.
type etcdClient interface {
// Close shuts down the client's etcd connections.
Close() error

// Endpoints lists the registered endpoints for the client.
Endpoints() []string

// MemberList lists the current cluster membership.
MemberList(ctx context.Context) (*clientv3.MemberListResponse, error)

// MemberAdd adds a new member into the cluster.
MemberAdd(ctx context.Context, peerAddrs []string) (*clientv3.MemberAddResponse, error)

// MemberAddAsLearner adds a new learner member into the cluster.
MemberAddAsLearner(ctx context.Context, peerAddrs []string) (*clientv3.MemberAddResponse, error)

// MemberRemove removes an existing member from the cluster.
MemberRemove(ctx context.Context, id uint64) (*clientv3.MemberRemoveResponse, error)

// MemberPromote promotes a member from raft learner (non-voting) to raft voting member.
MemberPromote(ctx context.Context, id uint64) (*clientv3.MemberPromoteResponse, error)

// Status gets the status of the endpoint.
Status(ctx context.Context, endpoint string) (*clientv3.StatusResponse, error)

// Sync synchronizes client's endpoints with the known endpoints from the etcd membership.
Sync(ctx context.Context) error
}

// Client provides connection parameters for an etcd cluster
type Client struct {
// Endpoints is the list of etcd endpoints handed to newEtcdClient whenever
// a connection is opened.
Endpoints []string
// TLS is the client TLS configuration. New only populates it when at least
// one of the ca, cert or key arguments is non-empty.
TLS *tls.Config

// newEtcdClient builds an etcdClient for the given endpoints. New sets it
// to a function that calls clientv3.New.
newEtcdClient func(endpoints []string) (etcdClient, error)
}

// New creates a new EtcdCluster client
func New(endpoints []string, ca, cert, key string) (*Client, error) {
client := Client{Endpoints: endpoints}

var err error
var tlsConfig *tls.Config
if ca != "" || cert != "" || key != "" {
tlsInfo := transport.TLSInfo{
CertFile: cert,
KeyFile: key,
TrustedCAFile: ca,
}
tlsConfig, err := tlsInfo.ClientConfig()
tlsConfig, err = tlsInfo.ClientConfig()
if err != nil {
return nil, err
}
client.TLS = tlsConfig
}

client.newEtcdClient = func(endpoints []string) (etcdClient, error) {
return clientv3.New(clientv3.Config{
Endpoints: endpoints,
DialTimeout: etcdTimeout,
DialOptions: []grpc.DialOption{
grpc.WithBlock(), // block until the underlying connection is up
},
TLS: tlsConfig,
})
}

return &client, nil
Expand Down Expand Up @@ -190,24 +234,16 @@ func getRawEtcdEndpointsFromPodAnnotationWithoutRetry(client clientset.Interface
// Sync synchronizes client's endpoints with the known endpoints from the etcd membership.
func (c *Client) Sync() error {
// Syncs the list of endpoints
var cli *clientv3.Client
var cli etcdClient
var lastError error
err := wait.ExponentialBackoff(etcdBackoff, func() (bool, error) {
var err error
cli, err = clientv3.New(clientv3.Config{
Endpoints: c.Endpoints,
DialTimeout: etcdTimeout,
DialOptions: []grpc.DialOption{
grpc.WithBlock(), // block until the underlying connection is up
},
TLS: c.TLS,
})
cli, err = c.newEtcdClient(c.Endpoints)
if err != nil {
lastError = err
return false, nil
}
defer cli.Close()

ctx, cancel := context.WithTimeout(context.Background(), etcdTimeout)
err = cli.Sync(ctx)
cancel()
Expand Down Expand Up @@ -239,14 +275,7 @@ func (c *Client) listMembers() (*clientv3.MemberListResponse, error) {
var lastError error
var resp *clientv3.MemberListResponse
err := wait.ExponentialBackoff(etcdBackoff, func() (bool, error) {
cli, err := clientv3.New(clientv3.Config{
Endpoints: c.Endpoints,
DialTimeout: etcdTimeout,
DialOptions: []grpc.DialOption{
grpc.WithBlock(), // block until the underlying connection is up
},
TLS: c.TLS,
})
cli, err := c.newEtcdClient(c.Endpoints)
if err != nil {
lastError = err
return false, nil
Expand Down Expand Up @@ -281,7 +310,7 @@ func (c *Client) GetMemberID(peerURL string) (uint64, error) {
return member.GetID(), nil
}
}
return 0, nil
return 0, ErrNoMemberIDForPeerURL
}

// ListMembers returns the member list.
Expand All @@ -304,14 +333,7 @@ func (c *Client) RemoveMember(id uint64) ([]Member, error) {
var lastError error
var resp *clientv3.MemberRemoveResponse
err := wait.ExponentialBackoff(etcdBackoff, func() (bool, error) {
cli, err := clientv3.New(clientv3.Config{
Endpoints: c.Endpoints,
DialTimeout: etcdTimeout,
DialOptions: []grpc.DialOption{
grpc.WithBlock(), // block until the underlying connection is up
},
TLS: c.TLS,
})
cli, err := c.newEtcdClient(c.Endpoints)
if err != nil {
lastError = err
return false, nil
Expand All @@ -324,6 +346,10 @@ func (c *Client) RemoveMember(id uint64) ([]Member, error) {
if err == nil {
return true, nil
}
if errors.Is(rpctypes.ErrMemberNotFound, err) {
klog.V(5).Infof("Member was already removed, because member %016x was not found", id)
return true, nil
}
klog.V(5).Infof("Failed to remove etcd member: %v", err)
lastError = err
return false, nil
Expand Down Expand Up @@ -359,14 +385,7 @@ func (c *Client) AddMember(name string, peerAddrs string) ([]Member, error) {
respMembers []*etcdserverpb.Member
)
err = wait.ExponentialBackoff(etcdBackoff, func() (bool, error) {
cli, err := clientv3.New(clientv3.Config{
Endpoints: c.Endpoints,
DialTimeout: etcdTimeout,
DialOptions: []grpc.DialOption{
grpc.WithBlock(), // block until the underlying connection is up
},
TLS: c.TLS,
})
cli, err := c.newEtcdClient(c.Endpoints)
if err != nil {
lastError = err
return false, nil
Expand Down Expand Up @@ -441,14 +460,7 @@ func (c *Client) getClusterStatus() (map[string]*clientv3.StatusResponse, error)
var lastError error
var resp *clientv3.StatusResponse
err := wait.ExponentialBackoff(etcdBackoff, func() (bool, error) {
cli, err := clientv3.New(clientv3.Config{
Endpoints: c.Endpoints,
DialTimeout: etcdTimeout,
DialOptions: []grpc.DialOption{
grpc.WithBlock(), // block until the underlying connection is up
},
TLS: c.TLS,
})
cli, err := c.newEtcdClient(c.Endpoints)
if err != nil {
lastError = err
return false, nil
Expand Down