/
client.go
289 lines (259 loc) · 8.95 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
// Copyright 2020 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package etcd
import (
"context"
"time"
"github.com/benbjohnson/clock"
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/prometheus/client_golang/prometheus"
"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
cerrors "github.com/tikv/migration/cdc/pkg/errors"
"github.com/tikv/migration/cdc/pkg/retry"
)
// Names of the etcd operations wrapped by this package. They are used as
// keys into the Client metrics map and as the RPC name passed to retryRPC.
const (
	// EtcdPut names the Put operation.
	EtcdPut = "Put"
	// EtcdGet names the Get operation.
	EtcdGet = "Get"
	// EtcdTxn names the Txn operation.
	EtcdTxn = "Txn"
	// EtcdDel names the Delete operation.
	EtcdDel = "Del"
	// EtcdGrant names the lease Grant operation.
	EtcdGrant = "Grant"
	// EtcdRevoke names the lease Revoke operation.
	EtcdRevoke = "Revoke"
)
const (
	// backoffBaseDelayInMs is the base delay, in milliseconds, handed to
	// retry.WithBackoffBaseDelay.
	backoffBaseDelayInMs = 500
	// backoffMaxDelayInMs is the maximum delay, in milliseconds, handed to
	// retry.WithBackoffMaxDelay. In the previous backoff retry package the
	// DefaultMaxInterval was 60 * time.Second.
	backoffMaxDelayInMs = 60 * 1000
	// etcdWatchChTimeoutDuration: if no message arrives from an etcd watchCh
	// for this long, the watchCh is cancelled and a new one is requested
	// from the etcd client.
	etcdWatchChTimeoutDuration = 10 * time.Second
	// etcdRequestProgressDuration is the tick interval at which
	// RequestProgress of the etcd client is called.
	etcdRequestProgressDuration = time.Second
	// etcdWatchChBufferSize is arbitrarily specified; it will be modified in
	// the future.
	etcdWatchChBufferSize = 16
	// etcdClientTimeoutDuration is the timeout for the etcd client to
	// execute a remote call.
	etcdClientTimeoutDuration = 30 * time.Second
)
// maxTries is the try limit handed to retry.WithMaxTries. It is a var rather
// than a const so that tests can override it to speed themselves up.
var maxTries = int64(8)
// Client is a simple wrapper that adds retry to etcd RPC.
// Instances are built with Wrap; the underlying client is reachable via Unwrap.
type Client struct {
// cli is the wrapped etcd client that every method delegates to.
cli *clientv3.Client
// metrics maps an etcd operation name (EtcdPut, EtcdGet, ...) to the counter
// incremented for that operation; operations without an entry are not counted.
metrics map[string]prometheus.Counter
// clock is for making it easier to mock time-related data structures in unit tests
clock clock.Clock
}
// Wrap wraps a clientv3.Client and returns a Client that provides the etcd
// APIs required by TiCDC. metrics supplies the per-operation counters, and the
// returned Client uses a real clock obtained from clock.New().
func Wrap(cli *clientv3.Client, metrics map[string]prometheus.Counter) *Client {
	wrapped := &Client{
		cli:     cli,
		metrics: metrics,
		clock:   clock.New(),
	}
	return wrapped
}
// Unwrap returns the underlying clientv3.Client held by c.
func (c *Client) Unwrap() *clientv3.Client {
	raw := c.cli
	return raw
}
// retryRPC runs etcdRPC through retry.Do using the package's backoff settings
// (backoffBaseDelayInMs, backoffMaxDelayInMs, maxTries) and the retry
// predicate built by isRetryableError(rpcName).
//
// On each attempt a non-nil error whose cause is not context.Canceled is
// logged as a warning, and metric, when non-nil, is incremented regardless of
// the attempt's outcome. The value returned by retry.Do is returned as is.
//
// Note that retry.Do is given context.Background(), not a caller context.
func retryRPC(rpcName string, metric prometheus.Counter, etcdRPC func() error) error {
	// By default, PD etcd sets [3s, 6s) for election timeout.
	// Some rpc could fail due to etcd errors, like "proposal dropped".
	// Retry at least two election timeout to handle the case that two PDs restarted
	// (the first election maybe failed).
	// 16s = \sum_{n=0}^{6} 0.5*1.5^n
	attempt := func() error {
		rpcErr := etcdRPC()
		if rpcErr != nil && errors.Cause(rpcErr) != context.Canceled {
			log.Warn("etcd RPC failed", zap.String("RPC", rpcName), zap.Error(rpcErr))
		}
		// Count every attempt, successful or not.
		if metric != nil {
			metric.Inc()
		}
		return rpcErr
	}

	return retry.Do(
		context.Background(),
		attempt,
		retry.WithBackoffBaseDelay(backoffBaseDelayInMs),
		retry.WithBackoffMaxDelay(backoffMaxDelayInMs),
		retry.WithMaxTries(maxTries),
		retry.WithIsRetryableErr(isRetryableError(rpcName)),
	)
}
// Put delegates request to clientv3.KV.Put.
//
// The call goes through retryRPC under the EtcdPut name and counter. One
// context with an etcdClientTimeoutDuration timeout, derived from ctx, is
// shared by all attempts. resp holds the response of the last attempt.
func (c *Client) Put(
	ctx context.Context, key, val string, opts ...clientv3.OpOption,
) (resp *clientv3.PutResponse, err error) {
	putCtx, cancel := context.WithTimeout(ctx, etcdClientTimeoutDuration)
	defer cancel()

	doPut := func() (rpcErr error) {
		resp, rpcErr = c.cli.Put(putCtx, key, val, opts...)
		return rpcErr
	}
	err = retryRPC(EtcdPut, c.metrics[EtcdPut], doPut)
	return resp, err
}
// Get delegates request to clientv3.KV.Get.
//
// The call goes through retryRPC under the EtcdGet name and counter. One
// context with an etcdClientTimeoutDuration timeout, derived from ctx, is
// shared by all attempts. resp holds the response of the last attempt.
func (c *Client) Get(
	ctx context.Context, key string, opts ...clientv3.OpOption,
) (resp *clientv3.GetResponse, err error) {
	getCtx, cancel := context.WithTimeout(ctx, etcdClientTimeoutDuration)
	defer cancel()

	doGet := func() (rpcErr error) {
		resp, rpcErr = c.cli.Get(getCtx, key, opts...)
		return rpcErr
	}
	err = retryRPC(EtcdGet, c.metrics[EtcdGet], doGet)
	return resp, err
}
// Delete delegates request to clientv3.KV.Delete.
//
// The request is issued exactly once, bounded by an etcdClientTimeoutDuration
// timeout derived from ctx; it is deliberately not routed through retryRPC.
// The call is counted under the EtcdDel counter when one is registered in the
// metrics map. The response and error of the underlying client are returned
// unchanged.
func (c *Client) Delete(
	ctx context.Context, key string, opts ...clientv3.OpOption,
) (resp *clientv3.DeleteResponse, err error) {
	// Use the delete counter, not the EtcdTxn one, so deletes are not reported
	// as transactions. A missing or nil entry simply means "do not count".
	if metric := c.metrics[EtcdDel]; metric != nil {
		metric.Inc()
	}

	delCtx, cancel := context.WithTimeout(ctx, etcdClientTimeoutDuration)
	defer cancel()
	// We don't retry on delete operation. It's dangerous.
	return c.cli.Delete(delCtx, key, opts...)
}
// Txn delegates request to clientv3.KV.Txn.
//
// The EtcdTxn counter, if present in the metrics map, is incremented when the
// transaction is created. The returned clientv3.Txn is handed back as is, with
// no timeout or retry added by this wrapper.
func (c *Client) Txn(ctx context.Context) clientv3.Txn {
	counter, registered := c.metrics[EtcdTxn]
	if registered {
		counter.Inc()
	}
	return c.cli.Txn(ctx)
}
// Grant delegates request to clientv3.Lease.Grant.
//
// The call goes through retryRPC under the EtcdGrant name and counter. One
// context with an etcdClientTimeoutDuration timeout, derived from ctx, is
// shared by all attempts. resp holds the response of the last attempt.
func (c *Client) Grant(
	ctx context.Context, ttl int64,
) (resp *clientv3.LeaseGrantResponse, err error) {
	grantCtx, cancel := context.WithTimeout(ctx, etcdClientTimeoutDuration)
	defer cancel()

	doGrant := func() (rpcErr error) {
		resp, rpcErr = c.cli.Grant(grantCtx, ttl)
		return rpcErr
	}
	err = retryRPC(EtcdGrant, c.metrics[EtcdGrant], doGrant)
	return resp, err
}
// isRetryableError builds the retry predicate used for the RPC named rpcName.
//
// The predicate rejects any error that cerrors.IsRetryableError rejects. For
// EtcdRevoke it additionally rejects an rpctypes.EtcdError whose code is
// codes.NotFound, since that means the lease is already expired or revoked.
// Everything else is retryable.
func isRetryableError(rpcName string) retry.IsRetryable {
	return func(err error) bool {
		if !cerrors.IsRetryableError(err) {
			return false
		}
		if rpcName != EtcdRevoke {
			return true
		}
		// The assertion looks at err itself, not at a wrapped cause.
		etcdErr, isEtcdErr := err.(rpctypes.EtcdError)
		// it means the etcd lease is already expired or revoked
		leaseGone := isEtcdErr && etcdErr.Code() == codes.NotFound
		return !leaseGone
	}
}
// Revoke delegates request to clientv3.Lease.Revoke.
//
// The call goes through retryRPC under the EtcdRevoke name and counter; for
// that name isRetryableError does not retry an etcd NotFound error. One
// context with an etcdClientTimeoutDuration timeout, derived from ctx, is
// shared by all attempts. resp holds the response of the last attempt.
func (c *Client) Revoke(
	ctx context.Context, id clientv3.LeaseID,
) (resp *clientv3.LeaseRevokeResponse, err error) {
	revokeCtx, cancel := context.WithTimeout(ctx, etcdClientTimeoutDuration)
	defer cancel()

	doRevoke := func() (rpcErr error) {
		resp, rpcErr = c.cli.Revoke(revokeCtx, id)
		return rpcErr
	}
	err = retryRPC(EtcdRevoke, c.metrics[EtcdRevoke], doRevoke)
	return resp, err
}
// TimeToLive delegates request to clientv3.Lease.TimeToLive.
//
// The call goes through retryRPC. One context with an
// etcdClientTimeoutDuration timeout, derived from ctx, is shared by all
// attempts. resp holds the response of the last attempt.
//
// NOTE(review): the RPC is registered under the EtcdRevoke name and counter,
// so it is logged and counted as "Revoke" and inherits Revoke's NotFound
// retry rule. This looks like a copy-paste from Revoke; confirm whether a
// dedicated operation name is wanted.
func (c *Client) TimeToLive(
	ctx context.Context, lease clientv3.LeaseID, opts ...clientv3.LeaseOption,
) (resp *clientv3.LeaseTimeToLiveResponse, err error) {
	ttlCtx, cancel := context.WithTimeout(ctx, etcdClientTimeoutDuration)
	defer cancel()

	doTimeToLive := func() (rpcErr error) {
		resp, rpcErr = c.cli.TimeToLive(ttlCtx, lease, opts...)
		return rpcErr
	}
	err = retryRPC(EtcdRevoke, c.metrics[EtcdRevoke], doTimeToLive)
	return resp, err
}
// Watch delegates request to clientv3.Watcher.Watch.
//
// It allocates a channel buffered to etcdWatchChBufferSize, starts
// WatchWithChan in a new goroutine to feed it, and returns the channel
// immediately. WatchWithChan closes the channel when it exits.
func (c *Client) Watch(
	ctx context.Context, key string, opts ...clientv3.OpOption,
) clientv3.WatchChan {
	out := make(chan clientv3.WatchResponse, etcdWatchChBufferSize)
	go c.WatchWithChan(ctx, out, key, opts...)
	return out
}
// WatchWithChan maintains a watchCh and sends all msg from the watchCh to outCh.
//
// It opens an etcd watch on key with opts and forwards every response it
// receives to outCh. It returns only when ctx is done; on return outCh is
// closed and an info log is emitted, so callers must not close outCh themselves.
//
// A ticker firing every etcdRequestProgressDuration drives two things while
// waiting for responses: RequestProgress is called on each tick, and if no
// response has been received for etcdWatchChTimeoutDuration the current watch
// is cancelled and a new one is opened starting at the last recorded revision.
//
// NOTE(review): the re-opened watch uses clientv3.WithPrefix() and
// clientv3.WithRev(lastRevision) instead of the caller's opts — confirm that
// dropping opts is intended.
func (c *Client) WatchWithChan(
ctx context.Context, outCh chan<- clientv3.WatchResponse,
key string, opts ...clientv3.OpOption,
) {
// Always close outCh on exit so the consumer observes termination.
defer func() {
close(outCh)
log.Info("WatchWithChan exited")
}()
// get initial revision from opts to avoid revision fall back
lastRevision := getRevisionFromWatchOpts(opts...)
watchCtx, cancel := context.WithCancel(ctx)
// The deferred call is bound to this first cancel func; cancel funcs created
// later by a watch reset are invoked explicitly on the ctx.Done() branches.
defer cancel()
watchCh := c.cli.Watch(watchCtx, key, opts...)
ticker := c.clock.Ticker(etcdRequestProgressDuration)
defer ticker.Stop()
// Time of the most recent response from watchCh (or of the last watch reset).
lastReceivedResponseTime := c.clock.Now()
for {
select {
case <-ctx.Done():
cancel()
return
case response := <-watchCh:
lastReceivedResponseTime = c.clock.Now()
// Only error-free responses that are not progress notifications advance
// the revision used when the watch is re-opened.
if response.Err() == nil && !response.IsProgressNotify() {
lastRevision = response.Header.Revision
}
Loop:
// we must loop here until the response is sent to outCh
// or otherwise the response will be lost
for {
select {
case <-ctx.Done():
cancel()
return
case outCh <- response: // it may block here
break Loop
case <-ticker.C:
// The consumer is slow: only warn, never drop the pending response.
if c.clock.Since(lastReceivedResponseTime) >= etcdWatchChTimeoutDuration {
log.Warn("etcd client outCh blocking too long, the etcdWorker may be stuck", zap.Duration("duration", c.clock.Since(lastReceivedResponseTime)))
}
}
}
// Restart the tick interval after a response has been forwarded.
ticker.Reset(etcdRequestProgressDuration)
case <-ticker.C:
// A failed progress request is logged and otherwise ignored.
if err := c.RequestProgress(ctx); err != nil {
log.Warn("failed to request progress for etcd watcher", zap.Error(err))
}
if c.clock.Since(lastReceivedResponseTime) >= etcdWatchChTimeoutDuration {
// cancel the last cancel func to reset it
log.Warn("etcd client watchCh blocking too long, reset the watchCh", zap.Duration("duration", c.clock.Since(lastReceivedResponseTime)), zap.Stack("stack"))
cancel()
watchCtx, cancel = context.WithCancel(ctx)
watchCh = c.cli.Watch(watchCtx, key, clientv3.WithPrefix(), clientv3.WithRev(lastRevision))
// we need to reset lastReceivedResponseTime after reset Watch
lastReceivedResponseTime = c.clock.Now()
}
}
}
}
// RequestProgress requests a progress notify response be sent in all watch
// channels. It forwards ctx to the underlying client's RequestProgress and
// returns that call's error unchanged.
func (c *Client) RequestProgress(ctx context.Context) error {
	err := c.cli.RequestProgress(ctx)
	return err
}