-
Notifications
You must be signed in to change notification settings - Fork 2.1k
/
watch.go
99 lines (85 loc) · 2.43 KB
/
watch.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
package consultopo
import (
"flag"
"fmt"
"path"
"time"
"golang.org/x/net/context"
"github.com/hashicorp/consul/api"
"github.com/youtube/vitess/go/vt/topo"
)
// watchPollDuration is the WaitTime handed to every blocking Consul Get
// issued by Watch. A canceled watch is only noticed once the in-flight poll
// returns, so cancellation can lag by up to this duration.
var watchPollDuration = flag.Duration("topo_consul_watch_poll_duration", 30*time.Second, "time of the long poll for watch queries. Interrupting a watch may wait for up to that time.")
// Watch is part of the topo.Backend interface.
//
// It reads the current value stored at filePath (relative to the cell's root)
// and, when the node exists, starts a goroutine that long-polls Consul for
// newer versions of it.
//
// Return values:
//   - The first *topo.WatchData is the current state. If the cell client
//     cannot be obtained, the initial read fails, or the node does not exist,
//     only its Err field is set and the channel and cancel function are nil.
//   - The channel receives one WatchData per observed ModifyIndex change.
//     Before the goroutine exits it sends a final WatchData with Err set
//     (poll error, topo.ErrNoNode, or the converted cancellation error) and
//     then closes the channel.
//   - The CancelFunc cancels the watch's own context.
//
// The ctx argument is only used to look up the cell client; the watch itself
// runs on a context derived from context.Background(). Cancellation is checked
// between polls, so it takes effect only after the in-flight poll returns.
func (s *Server) Watch(ctx context.Context, cell, filePath string) (*topo.WatchData, <-chan *topo.WatchData, topo.CancelFunc) {
	conn, err := s.clientForCell(ctx, cell)
	if err != nil {
		return &topo.WatchData{Err: fmt.Errorf("Watch cannot get cell: %v", err)}, nil, nil
	}

	// Read the node once up front so the caller gets a starting value.
	nodePath := path.Join(conn.root, filePath)
	first, _, err := conn.kv.Get(nodePath, nil)
	switch {
	case err != nil:
		return &topo.WatchData{Err: err}, nil, nil
	case first == nil:
		// A nil pair with no error means the key is absent.
		return &topo.WatchData{Err: topo.ErrNoNode}, nil, nil
	}

	initial := &topo.WatchData{
		Contents: first.Value,
		Version:  ConsulVersion(first.ModifyIndex),
	}

	// The watch gets its own context; canceling it is how the caller stops
	// the polling goroutine.
	watchCtx, watchCancel := context.WithCancel(context.Background())

	// Updates are delivered on this channel, which the goroutine closes on exit.
	changes := make(chan *topo.WatchData, 10)

	// lastIndex is the ModifyIndex of the most recent version we know about.
	lastIndex := first.ModifyIndex

	go func() {
		defer close(changes)

		for {
			// A Get carrying WaitIndex and WaitTime hands back the current
			// version once WaitTime elapses even if nothing changed, so an
			// unchanged ModifyIndex is treated as "no update" below.
			opts := &api.QueryOptions{
				WaitIndex: lastIndex,
				WaitTime:  *watchPollDuration,
			}
			latest, _, pollErr := conn.kv.Get(nodePath, opts)
			if pollErr != nil {
				// Polling failed: report it and end the watch.
				changes <- &topo.WatchData{Err: pollErr}
				return
			}
			if latest == nil {
				// The node was deleted while we were watching.
				changes <- &topo.WatchData{Err: topo.ErrNoNode}
				return
			}

			if latest.ModifyIndex != lastIndex {
				// Genuinely new version: publish it and remember its index
				// for the next poll.
				changes <- &topo.WatchData{
					Contents: latest.Value,
					Version:  ConsulVersion(latest.ModifyIndex),
				}
				lastIndex = latest.ModifyIndex
			}

			// Non-blocking check for cancellation before polling again.
			select {
			case <-watchCtx.Done():
				changes <- &topo.WatchData{Err: convertError(watchCtx.Err())}
				return
			default:
			}
		}
	}()

	return initial, changes, topo.CancelFunc(watchCancel)
}