/
watch.go
198 lines (176 loc) · 4.96 KB
/
watch.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
// Package watch implements better watch semantics on top of etcd.
// See this issue for the reasoning behind the package:
// https://github.com/coreos/etcd/issues/7362
package watch
import (
"context"
etcd "github.com/coreos/etcd/clientv3"
"github.com/gogo/protobuf/proto"
)
// EventType identifies what kind of occurrence an Event describes.
type EventType int

// The possible values of EventType.
const (
	// EventPut marks an event carrying an item that was put into etcd.
	EventPut EventType = iota
	// EventDelete marks an event for an item that was removed from etcd.
	EventDelete
	// EventError marks an event that carries an error instead of an item.
	EventError
)
// Event describes one occurrence reported by a Watcher: either a change to
// an item in etcd, or an error.
type Event struct {
	// Key and Value are the raw key and value of the item the event is about.
	Key, Value []byte
	// PrevKey and PrevValue are the raw key and value of the item's previous
	// version. They are only filled in when etcd reports a previous version.
	PrevKey, PrevValue []byte
	// Type tells which kind of event this is.
	Type EventType
	// Rev is the etcd modification revision of the item.
	Rev int64
	// Err holds the error carried by an EventError event.
	Err error
}
// Unmarshal copies the event's key into *key as a string and decodes the
// event's value into val. The returned error is whatever val.Unmarshal
// reports. key must not be nil.
func (e *Event) Unmarshal(key *string, val proto.Unmarshaler) error {
	k, raw := string(e.Key), e.Value
	*key = k
	return val.Unmarshal(raw)
}
// UnmarshalPrev copies the key of the event's previous item into *key as a
// string and decodes the previous value into val. The returned error is
// whatever val.Unmarshal reports. key must not be nil.
func (e *Event) UnmarshalPrev(key *string, val proto.Unmarshaler) error {
	k, raw := string(e.PrevKey), e.PrevValue
	*key = k
	return val.Unmarshal(raw)
}
// Watcher is a source of events that can be shut down by its consumer.
type Watcher interface {
	// Watch returns the receive-only channel on which events are delivered.
	Watch() <-chan *Event
	// Close tells the watcher that the caller is done receiving events.
	Close()
}
// watcher is the package's Watcher implementation: a pair of channels.
type watcher struct {
	// eventCh is the channel handed out by Watch.
	eventCh chan *Event
	// done is closed by Close.
	done chan struct{}
}
// Watch exposes the watcher's event channel as a receive-only channel.
func (w *watcher) Watch() <-chan *Event {
	var events <-chan *Event = w.eventCh
	return events
}
// Close closes the watcher's done channel. Because closing an already
// closed channel panics, Close must be called at most once.
func (w *watcher) Close() {
	done := w.done
	close(done)
}
// byModRev wraps a Get response so that its key-value pairs can be ordered
// by modification revision.
type byModRev struct {
	etcd.GetResponse
}
// Len reports how many key-value pairs the wrapped response holds.
func (s byModRev) Len() int {
	kvs := s.Kvs
	return len(kvs)
}
// Swap exchanges the key-value pairs at positions i and j.
func (s byModRev) Swap(i, j int) {
	// kvs shares its backing array with the wrapped response, so swapping
	// through it reorders the response's own pairs.
	kvs := s.Kvs
	kvs[i], kvs[j] = kvs[j], kvs[i]
}
// Less reports whether the pair at position i was modified at an earlier
// revision than the pair at position j.
func (s byModRev) Less(i, j int) bool {
	left, right := s.Kvs[i], s.Kvs[j]
	return left.ModRevision < right.ModRevision
}
// NewWatcher returns a Watcher that reports events for the keys under the
// given etcd prefix. Events do not include the previous version of values;
// use NewWatcherWithPrev for that.
func NewWatcher(ctx context.Context, client *etcd.Client, prefix string) (Watcher, error) {
	const withPrev = false
	return newWatcher(ctx, client, prefix, withPrev)
}
// NewWatcherWithPrev behaves like NewWatcher, but additionally asks etcd for
// the previous version of each changed item so that events can carry it.
func NewWatcherWithPrev(ctx context.Context, client *etcd.Client, prefix string) (Watcher, error) {
	const withPrev = true
	return newWatcher(ctx, client, prefix, withPrev)
}
// newWatcher is the shared implementation behind NewWatcher and
// NewWatcherWithPrev.
//
// It first lists every key under prefix, sorted by ascending modification
// revision, and then watches the prefix starting at the revision right after
// the listing, so changes made between the list and the watch are not
// missed. A background goroutine delivers the listed items as EventPut
// events, followed by the watch events, on the returned Watcher's channel.
// When withPrev is true the watch asks etcd for the previous key-value of
// each changed item.
//
// The goroutine stops when the Watcher is closed, when ctx is done, or when
// the watch reports an error. On error it tries to deliver one final
// EventError event. In every case it then closes the event channel and the
// underlying etcd watcher.
//
// An error is returned directly only when the initial listing fails.
func newWatcher(ctx context.Context, client *etcd.Client, prefix string, withPrev bool) (Watcher, error) {
	eventCh := make(chan *Event)
	done := make(chan struct{})
	// Firstly we list the collection to get the current items.
	// Sort them by ascending order because that's how the items would have
	// been returned if we watched them from the beginning.
	resp, err := client.Get(ctx, prefix, etcd.WithPrefix(), etcd.WithSort(etcd.SortByModRevision, etcd.SortAscend))
	if err != nil {
		return nil, err
	}
	nextRevision := resp.Header.Revision + 1

	// watchOptions builds the options for a watch starting at rev. It is
	// used for the initial watch and for every re-created watch, so a
	// re-created watch keeps requesting previous key-values when withPrev
	// is set.
	watchOptions := func(rev int64) []etcd.OpOption {
		options := []etcd.OpOption{etcd.WithPrefix(), etcd.WithRev(rev)}
		if withPrev {
			options = append(options, etcd.WithPrevKV())
		}
		return options
	}

	// Now we issue a watch that uses the revision timestamp returned by the
	// Get request earlier. That way even if some items are added between
	// when we list the collection and when we start watching the collection,
	// we won't miss any items.
	etcdWatcher := etcd.NewWatcher(client)
	rch := etcdWatcher.Watch(ctx, prefix, watchOptions(nextRevision)...)
	go func() (retErr error) {
		defer func() {
			if retErr != nil {
				// Report the error unless the consumer has already closed
				// the watcher and is no longer listening.
				select {
				case eventCh <- &Event{
					Err:  retErr,
					Type: EventError,
				}:
				case <-done:
				}
			}
			close(eventCh)
			// Best-effort cleanup: nobody is left to report a close error to.
			etcdWatcher.Close()
		}()
		// Replay the listed items first. Also select on done so the
		// goroutine does not block forever if the consumer closes the
		// watcher before draining them.
		for _, etcdKv := range resp.Kvs {
			select {
			case eventCh <- &Event{
				Key:   etcdKv.Key,
				Value: etcdKv.Value,
				Type:  EventPut,
				Rev:   etcdKv.ModRevision,
			}:
			case <-done:
				return nil
			}
		}
		for {
			var watchResp etcd.WatchResponse
			var ok bool
			select {
			case watchResp, ok = <-rch:
			case <-done:
				return nil
			}
			if !ok {
				// The watch channel was closed. If that happened because
				// ctx is done, a new watch on the same ctx could not make
				// progress, so stop and report the reason.
				if err := ctx.Err(); err != nil {
					return err
				}
				// Otherwise re-create the watch from the next revision we
				// have not yet delivered.
				if err := etcdWatcher.Close(); err != nil {
					return err
				}
				etcdWatcher = etcd.NewWatcher(client)
				rch = etcdWatcher.Watch(ctx, prefix, watchOptions(nextRevision)...)
				continue
			}
			if err := watchResp.Err(); err != nil {
				return err
			}
			for _, etcdEv := range watchResp.Events {
				ev := &Event{
					Key:   etcdEv.Kv.Key,
					Value: etcdEv.Kv.Value,
					Rev:   etcdEv.Kv.ModRevision,
				}
				if etcdEv.PrevKv != nil {
					ev.PrevKey = etcdEv.PrevKv.Key
					ev.PrevValue = etcdEv.PrevKv.Value
				}
				if etcdEv.Type == etcd.EventTypePut {
					ev.Type = EventPut
				} else {
					ev.Type = EventDelete
				}
				select {
				case eventCh <- ev:
				case <-done:
					return nil
				}
			}
			// Everything up to this response's revision has been delivered;
			// a re-created watch must resume right after it.
			nextRevision = watchResp.Header.Revision + 1
		}
	}()
	return &watcher{
		eventCh: eventCh,
		done:    done,
	}, nil
}
// MakeWatcher builds a Watcher directly from caller-supplied channels:
// Watch returns eventCh, and Close closes done.
func MakeWatcher(eventCh chan *Event, done chan struct{}) Watcher {
	w := new(watcher)
	w.eventCh = eventCh
	w.done = done
	return w
}