forked from coreos/fleet
/
event.go
85 lines (73 loc) · 2.61 KB
/
event.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
package registry
import (
"path"
"strings"
"time"
"github.com/coreos/fleet/third_party/github.com/coreos/go-etcd/etcd"
log "github.com/coreos/fleet/third_party/github.com/golang/glog"
"github.com/coreos/fleet/event"
)
// EventStream watches etcd keys and republishes the changes it observes as
// fleet events (see Stream).
type EventStream struct {
	// etcd is the client handed to every watch goroutine started by Stream.
	etcd *etcd.Client
	// close is closed by Close; the watch and pipe goroutines started by
	// Stream receive from it to learn that they should stop.
	close chan bool
}
// NewEventStream returns an EventStream that uses client for its etcd
// watches. A fresh, unbuffered close channel is allocated for the stream.
func NewEventStream(client *etcd.Client) *EventStream {
	stream := new(EventStream)
	stream.etcd = client
	stream.close = make(chan bool)
	return stream
}
// Stream starts the background goroutines that feed eventchan and returns
// immediately.
//
// For every watched key prefix (jobs, machines, offers) and every filter
// registered for that prefix, one watch goroutine and one pipe goroutine are
// started, joined by their own unbuffered channel of etcd responses. All of
// them share the stream's close channel.
func (self *EventStream) Stream(eventchan chan *event.Event) {
	// filter turns an etcd response into an event, or nil to drop it.
	type filter func(*etcd.Response) *event.Event

	filtersByPrefix := map[string][]filter{
		path.Join(keyPrefix, jobPrefix): {
			filterEventJobCreated,
			filterEventJobScheduled,
			filterEventJobStopped,
		},
		path.Join(keyPrefix, machinePrefix): {
			self.filterEventMachineCreated,
			self.filterEventMachineRemoved,
		},
		path.Join(keyPrefix, offerPrefix): {
			self.filterEventJobOffered,
			filterEventJobBidSubmitted,
		},
	}

	// launch wires a single watcher to a single translating pipe.
	launch := func(prefix string, translate filter) {
		responses := make(chan *etcd.Response)
		go watch(self.etcd, responses, prefix, self.close)
		go pipe(responses, translate, eventchan, self.close)
	}

	for prefix, filters := range filtersByPrefix {
		for i := range filters {
			launch(prefix, filters[i])
		}
	}
}
// Close closes the stream's close channel, which is the stop signal received
// by the watch and pipe goroutines launched from Stream.
//
// Close must be called at most once: closing an already-closed channel
// panics.
func (self *EventStream) Close() {
	log.V(1).Info("Closing EventStream")
	done := self.close
	close(done)
}
// pipe receives etcd responses from etcdchan, converts each one with
// translate, and forwards every non-nil result to eventchan. Responses for
// which translate returns nil are logged and dropped.
//
// pipe returns as soon as closechan is closed. That includes the case where
// it is blocked handing an event to eventchan: the send is raced against
// closechan so that a consumer which has stopped reading cannot strand this
// goroutine after shutdown.
func pipe(etcdchan chan *etcd.Response, translate func(resp *etcd.Response) *event.Event, eventchan chan *event.Event, closechan chan bool) {
	for {
		select {
		case <-closechan:
			return
		case resp := <-etcdchan:
			log.V(2).Infof("Received response from etcd watcher: Action=%s ModifiedIndex=%d Key=%s", resp.Action, resp.Node.ModifiedIndex, resp.Node.Key)

			ev := translate(resp)
			if ev == nil {
				log.V(2).Infof("Discarding response(ModifiedIndex=%d) from etcd watcher", resp.Node.ModifiedIndex)
				continue
			}

			log.V(2).Infof("Translated response(ModifiedIndex=%d) to event(Type=%s)", resp.Node.ModifiedIndex, ev.Type)

			// Never block unconditionally on the consumer: give up on the
			// pending event if the stream is closed in the meantime.
			select {
			case eventchan <- ev:
			case <-closechan:
				return
			}
		}
	}
}
// watch repeatedly issues a recursive etcd watch on key and delivers each
// successful response to etcdchan. After a response is received, the next
// watch is started from that response's ModifiedIndex + 1. The first watch
// uses index 0.
//
// The loop exits when closechan is closed. Both the hand-off to etcdchan and
// the back-off after a failed watch are raced against closechan, so the
// goroutine cannot stay parked on a channel nobody reads, or sit out the
// full back-off, once the stream has been closed.
//
// NOTE(review): the stop argument passed to client.Watch is nil, so a watch
// that is already in flight is presumably not interrupted by closechan; the
// goroutine would then only notice the close after Watch returns. Confirm
// against the vendored go-etcd before wiring closechan through.
func watch(client *etcd.Client, etcdchan chan *etcd.Response, key string, closechan chan bool) {
	idx := uint64(0)
	for {
		// Single exit point: every interrupted wait below falls through to
		// this check on the next iteration.
		select {
		case <-closechan:
			log.V(2).Infof("Gracefully closing etcd watch loop: key=%s", key)
			return
		default:
		}

		log.V(2).Infof("Creating etcd watcher: key=%s, index=%d, machines=%s", key, idx, strings.Join(client.GetCluster(), ","))
		resp, err := client.Watch(key, idx, true, nil, nil)
		if err != nil {
			log.V(2).Infof("etcd watcher returned error: key=%s, err=\"%s\"", key, err.Error())
			// Let's not slam the etcd server in the event that we know
			// an unexpected error occurred. The wait is cut short if the
			// stream is closed.
			select {
			case <-time.After(time.Second):
			case <-closechan:
			}
			continue
		}

		idx = resp.Node.ModifiedIndex + 1

		// Deliver the response unless the stream is closed first.
		select {
		case etcdchan <- resp:
		case <-closechan:
		}
	}
}