-
Notifications
You must be signed in to change notification settings - Fork 2.3k
/
watcher.go
124 lines (109 loc) · 2.53 KB
/
watcher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
package router
import (
"errors"
"sync"
"time"
)
// ErrWatcherStopped is the sentinel error reported once the routing table
// watcher has been stopped.
var ErrWatcherStopped = errors.New("watcher stopped")
// EventType identifies the kind of change a routing table event describes.
type EventType int

// Routing table event types.
const (
	// Create marks an event emitted for a newly created route.
	Create EventType = iota
	// Delete marks an event emitted when an existing route is removed.
	Delete
	// Update marks an event emitted when an existing route is changed.
	Update
)

// String returns the lower-case, human readable name of the event type.
// Any value other than the declared constants is rendered as "unknown".
func (t EventType) String() string {
	// Name table indexed by the event type constants themselves.
	names := [...]string{
		Create: "create",
		Delete: "delete",
		Update: "update",
	}
	if t < 0 || int(t) >= len(names) {
		return "unknown"
	}
	return names[t]
}
// Event describes a single change to the routing table.
// It is the value returned by a call to Next on the watcher.
type Event struct {
// Id is the unique id of the event
Id string
// Type defines the type of the event (Create, Delete or Update)
Type EventType
// Timestamp is the event timestamp
Timestamp time.Time
// Route is the table route carried by the event
Route Route
}
// Watcher defines the routing table watcher interface.
// A Watcher returns updates to the routing table as events.
type Watcher interface {
// Next is a blocking call that returns the next watch result
Next() (*Event, error)
// Chan returns the event channel
Chan() (<-chan *Event, error)
// Stop stops the watcher
Stop()
}
// WatchOption is a functional option that selects which routes of the table
// a watcher observes.
type WatchOption func(*WatchOptions)

// WatchOptions holds the settings of a table watcher.
// TODO: expand the options to watch based on other criteria
type WatchOptions struct {
	// Service restricts the watch to the routes of a specific service.
	Service string
}

// WatchService returns a WatchOption that sets the service whose routes
// should be watched. s is the microservice name.
func WatchService(s string) WatchOption {
	setService := func(opts *WatchOptions) {
		opts.Service = s
	}
	return setService
}
// tableWatcher implements routing table Watcher
type tableWatcher struct {
// RWMutex is write-locked by Stop while it checks and closes done
sync.RWMutex
// id is the watcher identifier
// NOTE(review): not read by the methods visible in this file — confirm usage
id string
// opts holds the watch options; Next uses opts.Service to filter events
opts WatchOptions
// resChan carries the events consumed by Next and handed out by Chan
resChan chan *Event
// done is closed by Stop; Next returns ErrWatcherStopped once it is closed
done chan struct{}
}
// Next blocks until the next event matching the watched service is available
// and returns it. Events for other services are skipped; a watcher whose
// service option is "*" receives every event.
//
// It returns ErrWatcherStopped once the watcher has been stopped, or when the
// event channel has been closed.
// TODO: right now we only allow to watch particular service
func (w *tableWatcher) Next() (*Event, error) {
	for {
		select {
		case res, ok := <-w.resChan:
			if !ok {
				// A closed channel yields nil events forever; treat it as the
				// end of the watch instead of dereferencing nil below.
				return nil, ErrWatcherStopped
			}
			if res == nil {
				// Skip nil events rather than panicking on res.Route.
				continue
			}
			if svc := w.opts.Service; svc == res.Route.Service || svc == "*" {
				return res, nil
			}
			// Event belongs to a service that is not being watched; keep waiting.
		case <-w.done:
			return nil, ErrWatcherStopped
		}
	}
}
// Chan exposes the watcher's event channel as a receive-only channel.
// The channel is handed out as-is: events read from it are not filtered by
// the watched service the way Next filters them. The returned error is
// always nil.
func (w *tableWatcher) Chan() (<-chan *Event, error) {
	var events <-chan *Event = w.resChan
	return events, nil
}
// Stop shuts the routing table watcher down by closing its done channel.
// Calling Stop on an already stopped watcher has no effect.
func (w *tableWatcher) Stop() {
	w.Lock()
	defer w.Unlock()

	// Probe done without blocking to find out whether it was closed before.
	alreadyStopped := false
	select {
	case <-w.done:
		alreadyStopped = true
	default:
	}

	if !alreadyStopped {
		close(w.done)
	}
}