/
etcd_client.go
234 lines (207 loc) · 8.3 KB
/
etcd_client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
/*
* Tencent is pleased to support the open source community by making TKEStack
* available.
*
* Copyright (C) 2012-2019 Tencent. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* https://opensource.org/licenses/Apache-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package options
import (
"context"
"fmt"
"sync/atomic"
"time"
grpcprom "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/spf13/pflag"
"github.com/spf13/viper"
"go.etcd.io/etcd/client/pkg/v3/transport"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/namespace"
"google.golang.org/grpc"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/storage/value"
"tkestack.io/tke/pkg/util/log"
)
// Command-line flag names registered by ETCDClientOptions.AddFlags.
const (
flagETCDServers = "etcd-servers"
flagETCDPrefix = "etcd-prefix"
flagETCDKeyFile = "etcd-keyfile"
flagETCDCertFile = "etcd-certfile"
flagETCDCAFile = "etcd-cafile"
flagETCDCompactionInterval = "etcd-compaction-interval"
flagETCDCountMetricPollPeriod = "etcd-count-metric-poll-period"
)
// Viper configuration keys. AddFlags binds each key to the flag of the same
// purpose, and ApplyFlags reads the resolved values back through these keys.
const (
configETCDServers = "etcd.servers"
configETCDPrefix = "etcd.prefix"
configETCDKeyFile = "etcd.keyfile"
configETCDCertFile = "etcd.certfile"
configETCDCAFile = "etcd.cafile"
configETCDCompactionInterval = "etcd.compaction_interval"
configETCDCountMetricPollPeriod = "etcd.count_metric_poll_period"
)
// keepaliveTime and keepaliveTimeout are handed to the etcd client in NewClient
// as DialKeepAliveTime and DialKeepAliveTimeout respectively.
// The short keepalive timeout and interval have been chosen to aggressively
// detect a failed etcd server without introducing much overhead.
const keepaliveTime = 30 * time.Second
const keepaliveTimeout = 10 * time.Second
// dialTimeout is the timeout for failing to establish a connection; NewClient
// passes it to the etcd client as DialTimeout.
// It is set to 20 seconds as times shorter than that will cause TLS connections to fail
// on heavily loaded arm64 CPUs (issue #64649).
const dialTimeout = 20 * time.Second
// ETCDClientOptions contains the options for a storage backend provided by etcd.
// ServerList, Prefix, KeyFile, CertFile, CAFile, CompactionInterval and
// CountMetricPollPeriod are exposed as flags by AddFlags and populated by
// ApplyFlags; the remaining fields are not set by the code in this file.
type ETCDClientOptions struct {
// Prefix is the prefix to all keys passed to storage.Interface methods.
// When non-empty, NewClient namespaces the client's KV with it.
Prefix string
// ServerList is the list of storage servers to connect with.
// ApplyFlags reports an error when it is empty.
ServerList []string
// TLS credentials. NewClient uses a nil TLS config (non-secure connection)
// when all three are empty.
KeyFile string
CertFile string
CAFile string
// Paging indicates whether the server implementation should allow paging (if it is
// supported). This is generally configured by feature gating, or by a specific
// resource type not wishing to allow paging, and is not intended for end users to
// set.
Paging bool
// Codec is the runtime.Codec associated with this backend.
// NOTE(review): not referenced by the code visible here; presumably used by the
// storage layer to encode and decode stored objects — confirm against consumers.
Codec runtime.Codec
// EncodeVersioner is the same groupVersioner used to build the
// storage encoder. Given a list of kinds the input object might belong
// to, the EncodeVersioner outputs the gvk the object will be
// converted to before being persisted in etcd.
EncodeVersioner runtime.GroupVersioner
// Transformer allows the value to be transformed prior to persisting into etcd.
Transformer value.Transformer
// CompactionInterval is an interval of requesting compaction from apiserver.
// If the value is 0, no compaction will be issued.
// NewETCDClientOptions defaults it to 5 minutes.
CompactionInterval time.Duration
// CountMetricPollPeriod specifies how often the count metric should be updated.
CountMetricPollPeriod time.Duration
}
// NewETCDClientOptions returns an ETCDClientOptions populated with defaults:
// the given key prefix and a compaction interval of five minutes. Every other
// field is left at its zero value.
func NewETCDClientOptions(defaultETCDPathPrefix string) *ETCDClientOptions {
	opts := new(ETCDClientOptions)
	opts.Prefix = defaultETCDPathPrefix
	opts.CompactionInterval = 5 * time.Minute
	return opts
}
// AddFlags registers the etcd client flags on fs, using the current field
// values of o as defaults, and binds every flag to its viper configuration key
// so that ApplyFlags can later read the resolved values.
func (o *ETCDClientOptions) AddFlags(fs *pflag.FlagSet) {
	// bind associates an already-registered flag with a viper key; the error
	// result of BindPFlag is ignored.
	bind := func(configKey, flagName string) {
		_ = viper.BindPFlag(configKey, fs.Lookup(flagName))
	}

	fs.StringSlice(flagETCDServers, o.ServerList,
		"List of etcd servers to connect with (scheme://ip:port), comma separated.")
	bind(configETCDServers, flagETCDServers)

	fs.String(flagETCDPrefix, o.Prefix,
		"The prefix to prepend to all resource paths in etcd.")
	bind(configETCDPrefix, flagETCDPrefix)

	fs.String(flagETCDKeyFile, o.KeyFile,
		"SSL key file used to secure etcd communication.")
	bind(configETCDKeyFile, flagETCDKeyFile)

	fs.String(flagETCDCertFile, o.CertFile,
		"SSL certification file used to secure etcd communication.")
	bind(configETCDCertFile, flagETCDCertFile)

	fs.String(flagETCDCAFile, o.CAFile,
		"SSL Certificate Authority file used to secure etcd communication.")
	bind(configETCDCAFile, flagETCDCAFile)

	fs.Duration(flagETCDCompactionInterval, o.CompactionInterval,
		"The interval of compaction requests. If 0, the compaction request from apiserver is disabled.")
	bind(configETCDCompactionInterval, flagETCDCompactionInterval)

	fs.Duration(flagETCDCountMetricPollPeriod, o.CountMetricPollPeriod,
		"Frequency of polling etcd for number of resources per type. 0 disables the metric collection.")
	bind(configETCDCountMetricPollPeriod, flagETCDCountMetricPollPeriod)
}
// ApplyFlags copies the etcd settings resolved by viper (from the command line
// or configuration file) into the options instance. It returns a non-empty
// error slice when no etcd server has been configured, and nil otherwise.
func (o *ETCDClientOptions) ApplyFlags() []error {
	o.Prefix = viper.GetString(configETCDPrefix)
	o.ServerList = viper.GetStringSlice(configETCDServers)
	o.KeyFile = viper.GetString(configETCDKeyFile)
	o.CertFile = viper.GetString(configETCDCertFile)
	o.CAFile = viper.GetString(configETCDCAFile)
	o.CompactionInterval = viper.GetDuration(configETCDCompactionInterval)
	o.CountMetricPollPeriod = viper.GetDuration(configETCDCountMetricPollPeriod)

	// The server list is the only mandatory setting.
	if len(o.ServerList) > 0 {
		return nil
	}
	return []error{fmt.Errorf("--%s must be specified", flagETCDServers)}
}
// NewClient builds an etcd v3 client from the options. The client dials the
// configured endpoints using the package's dial and keepalive timeouts, has
// the Prometheus gRPC client interceptors installed, and uses TLS only when at
// least one of CertFile, KeyFile or CAFile is set. When Prefix is non-empty the
// client's KV is wrapped so that its keys are namespaced under that prefix.
func (o *ETCDClientOptions) NewClient() (*clientv3.Client, error) {
	info := transport.TLSInfo{
		CertFile:      o.CertFile,
		KeyFile:       o.KeyFile,
		TrustedCAFile: o.CAFile,
	}
	secureConfig, err := info.ClientConfig()
	if err != nil {
		return nil, err
	}

	// NOTE: the client relies on a nil TLS config for non-secure connections,
	// so drop the generated config when no TLS material was supplied.
	noTLSMaterial := len(o.CertFile) == 0 && len(o.KeyFile) == 0 && len(o.CAFile) == 0
	if noTLSMaterial {
		secureConfig = nil
	}

	metricsInterceptors := []grpc.DialOption{
		grpc.WithUnaryInterceptor(grpcprom.UnaryClientInterceptor),
		grpc.WithStreamInterceptor(grpcprom.StreamClientInterceptor),
	}

	client, err := clientv3.New(clientv3.Config{
		Endpoints:            o.ServerList,
		TLS:                  secureConfig,
		DialTimeout:          dialTimeout,
		DialKeepAliveTime:    keepaliveTime,
		DialKeepAliveTimeout: keepaliveTimeout,
		DialOptions:          metricsInterceptors,
	})
	if err != nil {
		return nil, err
	}

	if o.Prefix == "" {
		return client, nil
	}
	client.KV = namespace.NewKV(client.KV, o.Prefix)
	return client, nil
}
// NewHealthCheck returns a callback that reports whether etcd is reachable.
//
// Constructing the etcd v3 client blocks and times out if etcd is not
// available, so the client is created by a background goroutine that retries
// at one-second intervals until it succeeds. Until a client exists, the
// callback returns the most recent construction error (or a "not yet
// established" error before the first attempt has finished). Afterwards each
// call lists the cluster members with a 2-second timeout and returns any
// failure, wrapped so callers can inspect the cause with errors.Is/errors.As.
//
// The error result of NewHealthCheck itself is always nil.
func (o *ETCDClientOptions) NewHealthCheck() (func() error, error) {
	clientValue := &atomic.Value{}
	clientErrMsg := &atomic.Value{}
	clientErrMsg.Store("etcd client connection not yet established")

	go func() {
		if err := wait.PollUntil(time.Second, func() (bool, error) {
			client, err := o.NewClient()
			if err != nil {
				// Remember the failure for the health check and keep retrying.
				clientErrMsg.Store(err.Error())
				return false, nil
			}
			// Publish the client before clearing the message: the callback only
			// loads the client after it has observed an empty message.
			clientValue.Store(client)
			clientErrMsg.Store("")
			return true, nil
		}, wait.NeverStop); err != nil {
			log.Error("Failed to wait poll until", log.Err(err))
		}
	}()

	return func() error {
		if errMsg := clientErrMsg.Load().(string); len(errMsg) > 0 {
			// Pass the message as an argument rather than as the format string,
			// so a '%' inside the stored error text is reported verbatim.
			return fmt.Errorf("%s", errMsg)
		}
		client := clientValue.Load().(*clientv3.Client)
		ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
		defer cancel()
		if _, err := client.Cluster.MemberList(ctx); err != nil {
			return fmt.Errorf("error listing etcd members: %w", err)
		}
		return nil
	}, nil
}