-
Notifications
You must be signed in to change notification settings - Fork 1.3k
/
etcd_client_generator.go
159 lines (135 loc) · 5.5 KB
/
etcd_client_generator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package internal
import (
"context"
"crypto/tls"
"time"
"github.com/pkg/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/rest"
"sigs.k8s.io/cluster-api/controlplane/kubeadm/internal/etcd"
"sigs.k8s.io/cluster-api/controlplane/kubeadm/internal/proxy"
)
// EtcdClientGenerator generates etcd clients that connect to specific etcd members on particular control plane nodes.
type EtcdClientGenerator struct {
	// restConfig is the Kubernetes REST config passed as the proxy KubeConfig
	// by the default createClient installed in NewEtcdClientGenerator.
	restConfig *rest.Config
	// tlsConfig is the TLS configuration given to NewEtcdClientGenerator.
	// NOTE(review): the default createClient closure captures the constructor
	// argument directly rather than reading this field, so changing this field
	// after construction does not affect that closure.
	tlsConfig *tls.Config
	// createClient opens an etcd client for the given endpoints. It is a field
	// rather than a method, so it can be swapped out (presumably as a test
	// seam — confirm against callers).
	createClient clientCreator
}

// clientCreator opens an etcd client connected to the given endpoints.
type clientCreator func(ctx context.Context, endpoints []string) (*etcd.Client, error)

// errEtcdNodeConnection marks errors caused by failing to reach or query the
// etcd instance on a specific node. getLeaderClient aggregates it into such
// errors, and forLeader uses errors.Is on it to decide that the failure is
// non-fatal and that the next node should be tried.
var errEtcdNodeConnection = errors.New("failed to connect to etcd node")
// NewEtcdClientGenerator builds an EtcdClientGenerator whose default
// createClient reaches etcd through a proxy to pods in the system namespace
// (metav1.NamespaceSystem) on port 2379. It uses tlsConfig for TLS and
// etcdDialTimeout as the dial timeout.
func NewEtcdClientGenerator(restConfig *rest.Config, tlsConfig *tls.Config, etcdDialTimeout time.Duration) *EtcdClientGenerator {
	generator := &EtcdClientGenerator{
		restConfig: restConfig,
		tlsConfig:  tlsConfig,
	}

	generator.createClient = func(ctx context.Context, endpoints []string) (*etcd.Client, error) {
		// KubeConfig is read from the generator at call time, while the TLS
		// config is the constructor argument captured by this closure.
		cfg := etcd.ClientConfiguration{
			Endpoints: endpoints,
			Proxy: proxy.Proxy{
				Kind:       "pods",
				Namespace:  metav1.NamespaceSystem,
				KubeConfig: generator.restConfig,
				Port:       2379,
			},
			TLSConfig:   tlsConfig,
			DialTimeout: etcdDialTimeout,
		}
		return etcd.NewClient(ctx, cfg)
	}

	return generator
}
// forFirstAvailableNode returns a client for the first node in nodeNames whose
// etcd instance accepts a connection. Nodes are tried in order. Each node's
// endpoint is derived with staticPodName("etcd", name).
//
// If no node can be reached, the per-node errors are collected and returned
// as a single wrapped aggregate. An empty nodeNames is rejected with an error,
// so this function never returns (nil, nil).
func (c *EtcdClientGenerator) forFirstAvailableNode(ctx context.Context, nodeNames []string) (*etcd.Client, error) {
	// This is an additional safeguard for avoiding this func to return nil, nil:
	// with no nodes the loop below would aggregate zero errors into nil.
	if len(nodeNames) == 0 {
		return nil, errors.New("invalid argument: forFirstAvailableNode can't be called with an empty list of nodes")
	}

	// Loop through the existing control plane nodes.
	var errs []error
	for _, name := range nodeNames {
		endpoints := []string{staticPodName("etcd", name)}
		client, err := c.createClient(ctx, endpoints)
		if err != nil {
			// Remember why this node failed and fall through to the next one.
			errs = append(errs, err)
			continue
		}
		return client, nil
	}
	return nil, errors.Wrap(kerrors.NewAggregate(errs), "could not establish a connection to any etcd node")
}
// forLeader returns a client connected to the etcd leader. It finds the leader
// by asking the nodes in nodeNames one at a time.
//
// A node whose failure matches errEtcdNodeConnection is skipped. Any other
// error aborts the search and is returned unchanged. If every node is skipped,
// the collected connection errors are returned as a wrapped aggregate. An empty
// nodeNames is rejected so that this function never returns (nil, nil).
func (c *EtcdClientGenerator) forLeader(ctx context.Context, nodeNames []string) (*etcd.Client, error) {
	// Guard against returning (nil, nil) when there is nothing to iterate.
	if len(nodeNames) == 0 {
		return nil, errors.New("invalid argument: forLeader can't be called with an empty list of nodes")
	}

	known := sets.NewString(nodeNames...)

	var connErrs []error
	for i := range nodeNames {
		leader, err := c.getLeaderClient(ctx, nodeNames[i], known)
		switch {
		case err == nil:
			return leader, nil
		case errors.Is(err, errEtcdNodeConnection):
			// Unreachable node: record the reason and try the next one.
			connErrs = append(connErrs, err)
		default:
			return nil, err
		}
	}
	return nil, errors.Wrap(kerrors.NewAggregate(connErrs), "could not establish a connection to the etcd leader")
}
// getLeaderClient returns an etcd client connected to the current etcd leader.
//
// It first opens a temporary "probe" client to the etcd instance on nodeName.
// It uses that client to read the member list and the reported leader ID. The
// probe client is closed before this function returns. If a member carries the
// leader ID and its name is in allNodes, a new client is opened through that
// node and returned.
//
// Failures to open the probe client or to list members are aggregated with
// errEtcdNodeConnection. The caller should treat these as non-fatal and may try
// another node. All other errors are returned without that marker:
//   - no member matches the leader ID,
//   - the leader has no corresponding Node,
//   - connecting to the leader's node fails.
func (c *EtcdClientGenerator) getLeaderClient(ctx context.Context, nodeName string, allNodes sets.String) (*etcd.Client, error) {
	// Get a temporary client to the etcd instance hosted on the node.
	probe, err := c.forFirstAvailableNode(ctx, []string{nodeName})
	if err != nil {
		return nil, kerrors.NewAggregate([]error{err, errEtcdNodeConnection})
	}
	// The probe is only used to discover the leader. It gets its own name so it
	// is never confused with the leader client returned below.
	defer probe.Close()

	// Get the list of members.
	members, err := probe.Members(ctx)
	if err != nil {
		return nil, kerrors.NewAggregate([]error{err, errEtcdNodeConnection})
	}

	// Get the leader member.
	var leaderMember *etcd.Member
	for _, member := range members {
		if member.ID == probe.LeaderID {
			leaderMember = member
			break
		}
	}

	// No reported member carries the leader ID, so there is no member name
	// through which a connection to the leader could be opened.
	if leaderMember == nil {
		return nil, errors.Errorf("etcd leader is reported as %x, but we couldn't find any matching member", probe.LeaderID)
	}

	// The leader exists but has no corresponding Node, so it cannot be reached
	// via the existing nodes. The control plane is in an invalid state.
	// TODO: In future we can eventually try to automatically remediate this condition by moving the leader
	// to another member with a corresponding node.
	if !allNodes.Has(leaderMember.Name) {
		return nil, errors.Errorf("etcd leader is reported as %x with name %q, but we couldn't find a corresponding Node in the cluster", leaderMember.ID, leaderMember.Name)
	}

	// Get a connection to the etcd leader via the node hosting it.
	return c.forFirstAvailableNode(ctx, []string{leaderMember.Name})
}