forked from asonawalla/gazette
/
retry_watcher.go
71 lines (60 loc) · 1.93 KB
/
retry_watcher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
package consensus
import (
"time"
etcd "github.com/coreos/etcd/client"
"golang.org/x/net/context"
)
// RetryWatcher returns an etcd.Watcher for |key| which layers retry behavior
// over the Get() and Watcher() operations of |keysAPI|. It differs from a
// Watcher obtained directly from KeysAPI.Watcher() in the following ways:
//  * Any AfterIndex set in |watcherOpts| is overridden. The first call to
//    Next() instead performs a Get() (using |getOpts|), returns that response,
//    and begins watching after the Index the response reports.
//  * When the underlying watch reports that its event index or the watcher
//    itself was cleared, that error is returned and the underlying watch is
//    discarded. The following call to Next() performs a fresh Get(), which is
//    both returned to the caller and used to re-establish watch consistency.
//  * Whenever a value is available on |refreshTicker|, the next call to Next()
//    likewise discards the underlying watch and performs a fresh Get().
// Callers must therefore be able to handle a periodic "get" response.
func RetryWatcher(keysAPI etcd.KeysAPI, key string, getOpts *etcd.GetOptions,
	watcherOpts *etcd.WatcherOptions, refreshTicker <-chan time.Time) etcd.Watcher {

	// |cur| is deliberately left nil, so that the first Next() issues a Get().
	var w = new(retryWatcher)
	w.keysAPI = keysAPI
	w.key = key
	w.getOpts = getOpts
	w.watchOpts = watcherOpts
	w.refreshTicker = refreshTicker
	return w
}
// retryWatcher is the etcd.Watcher implementation returned by RetryWatcher.
// It alternates between a full Get() of |key| and an etcd.Watcher which
// resumes from the Index of that Get() response.
type retryWatcher struct {
// Client used to issue both Get() and Watcher() requests.
keysAPI etcd.KeysAPI
// Key passed to Get() and Watcher().
key string
// Options passed to each Get().
getOpts *etcd.GetOptions
// Template options which are cloned for each new Watcher. AfterIndex of the
// clone is overwritten with the Index of the preceding Get() response.
watchOpts *etcd.WatcherOptions
// When a value can be received from this channel, Next() discards |cur|
// and performs a Get().
refreshTicker <-chan time.Time
// Current underlying Watcher. While nil, Next() performs a Get() instead.
cur etcd.Watcher
}
// Next implements etcd.Watcher. If there is no current underlying Watcher
// (on the first call, after |refreshTicker| has fired, or after the previous
// watch was cleared by the server), Next performs a Get() of the key, returns
// its response, and on success starts a new Watcher after the Index which that
// response reports. Otherwise, Next delegates to the underlying Watcher and
// returns its response and error unmodified.
func (w *retryWatcher) Next(ctx context.Context) (*etcd.Response, error) {
	// Periodically force a full tree refresh. The default case keeps this
	// check non-blocking when the ticker has not fired.
	select {
	case <-w.refreshTicker:
		w.cur = nil
	default:
	}

	if w.cur != nil {
		r, err := w.cur.Next(ctx)

		if etcdErr, ok := err.(etcd.Error); ok {
			// If the error code indicates that further Next() attempts will fail,
			// clear the current etcd.Watcher to force a full tree refresh on the
			// following call. The error itself is still returned to the caller.
			switch etcdErr.Code {
			case etcd.ErrorCodeEventIndexCleared, etcd.ErrorCodeWatcherCleared:
				w.cur = nil
			}
		}
		return r, err
	}

	// No current Watcher. Perform a full tree refresh.
	r, err := w.keysAPI.Get(ctx, w.key, w.getOpts)
	if err != nil {
		// Leave |cur| nil, so that the next call attempts the Get() again.
		return r, err
	}

	// Clone the caller's options rather than mutating them. A nil |watchOpts|
	// is treated as zero-valued options instead of being dereferenced.
	var opts etcd.WatcherOptions
	if w.watchOpts != nil {
		opts = *w.watchOpts
	}
	opts.AfterIndex = r.Index
	w.cur = w.keysAPI.Watcher(w.key, &opts)

	return r, err
}