This repository has been archived by the owner on Oct 16, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 7
/
eds_poller.go
118 lines (104 loc) · 3.39 KB
/
eds_poller.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
package emissary
import (
"context"
"sync"
"time"
"github.com/apex/log"
"github.com/segmentio/stats"
)
// edsPoller keeps track of which handlers are interested in which services
// and holds the resolver and ticker used to poll for their endpoints.
type edsPoller struct {
	// subscriptions maps a service name (the service to query in consul) to
	// the set of handlers interested in that service.
	subscriptions map[string]map[resultHandler]struct{}

	// resolver is our resolver; it is asked for the endpoints of a service.
	resolver Resolver

	// ticker decides when consul is polled. It is injected so that it can be
	// mocked in tests.
	ticker *time.Ticker

	// mutex is the read/write lock taken around every access to subscriptions.
	mutex sync.RWMutex
}
// newEdsPoller builds an edsPoller that uses the given resolver and ticker and
// starts out with an empty (but non-nil) subscription table.
func newEdsPoller(resolver Resolver, ticker *time.Ticker) *edsPoller {
	poller := new(edsPoller)
	poller.resolver = resolver
	poller.ticker = ticker
	// Allocate the outer map up front so subscriptions can be added directly.
	poller.subscriptions = map[string]map[resultHandler]struct{}{}
	return poller
}
// addSubscription registers handler as interested in service. The handler set
// for the service is created on first use; registering the same handler twice
// for a service leaves a single entry.
func (c *edsPoller) addSubscription(service string, handler resultHandler) {
	c.mutex.Lock()
	defer c.mutex.Unlock()

	interested := c.subscriptions[service]
	if interested == nil {
		interested = make(map[resultHandler]struct{})
		c.subscriptions[service] = interested
	}
	interested[handler] = struct{}{}
}
// removeSubscription unregisters handler from service. When that leaves the
// service without any handlers, the service entry itself is dropped as well.
// Removing from an unknown service is a no-op.
func (c *edsPoller) removeSubscription(service string, handler resultHandler) {
	c.mutex.Lock()
	defer c.mutex.Unlock()

	interested := c.subscriptions[service]
	// delete on a nil map is a no-op, so an unknown service is harmless here.
	delete(interested, handler)
	if len(interested) > 0 {
		return
	}
	delete(c.subscriptions, service)
}
// removeHandler unregisters handler from every service it is subscribed to,
// dropping any service that ends up with no handlers at all.
func (c *edsPoller) removeHandler(handler resultHandler) {
	c.mutex.Lock()
	defer c.mutex.Unlock()

	// Deleting entries from a map while ranging over it is safe in Go.
	for name := range c.subscriptions {
		interested := c.subscriptions[name]
		delete(interested, handler)
		if len(interested) != 0 {
			continue
		}
		delete(c.subscriptions, name)
	}
}
// get returns a snapshot of the handlers currently subscribed to service, in
// no particular order. The returned slice is a fresh copy, and it is empty
// (never nil) when the service has no subscribers. Primarily here for testing.
func (c *edsPoller) get(service string) []resultHandler {
	c.mutex.RLock()
	defer c.mutex.RUnlock()

	interested := c.subscriptions[service]

	// Copy the set's keys out so the caller never touches the live map.
	snapshot := make([]resultHandler, len(interested))
	next := 0
	for h := range interested {
		snapshot[next] = h
		next++
	}
	return snapshot
}
// pulse starts our main consul polling loop in a background goroutine and
// returns immediately. The goroutine runs one polling pass (see pollOnce) on
// every tick of c.ticker and exits once ctx is done. pulse does not stop the
// ticker and offers no way to wait for the goroutine to finish.
func (c *edsPoller) pulse(ctx context.Context) {
	go func() {
		for {
			select {
			case <-ctx.Done():
				log.Info("received done in edsPoller, exiting pulse")
				return
			case <-c.ticker.C:
				c.pollOnce(ctx)
			}
		}
	}()
}

// pollOnce performs a single polling pass: it snapshots the subscribed
// services, asks the resolver for the endpoints of each one, and hands the
// result to every handler subscribed to that service.
//
// Services are processed one at a time. The handlers of a service are invoked
// concurrently, and pollOnce waits for all of them before moving on to the
// next service.
//
// If the resolver returns an error for a service, the error is counted and
// logged and that service is skipped for this pass. Endpoints returned
// alongside an error are not meaningful, and passing them on would publish an
// empty endpoint list to the handlers after a transient lookup failure.
func (c *edsPoller) pollOnce(ctx context.Context) {
	// Copy the service names out under the read lock so the lock is not held
	// while talking to the resolver or running handlers.
	c.mutex.RLock()
	services := make([]string, 0, len(c.subscriptions))
	for service := range c.subscriptions {
		services = append(services, service)
	}
	c.mutex.RUnlock()

	// Fetch endpoints from consul for all services we're interested in.
	for _, service := range services {
		handlers := c.get(service)
		stats.Set("eds-poller.subscriptions.count", len(handlers), stats.Tag{Name: "service", Value: service})

		endpoints, err := c.resolver.Lookup(ctx, service)
		if err != nil {
			stats.Incr("resolver.error", stats.Tag{Name: "service", Value: service})
			log.Infof("error querying resolver %s", err)
			// Keep whatever the handlers were last given; try again next tick.
			continue
		}
		log.Infof("resolver found %d endpoint(s)", len(endpoints))

		// Build the result once; every handler receives the same value.
		result := EdsResult{Service: service, Endpoints: endpoints}

		// Send the results to each of the handlers.
		var wg sync.WaitGroup
		wg.Add(len(handlers))
		for _, h := range handlers {
			go func(h resultHandler) {
				defer wg.Done()
				h.handle(result)
			}(h)
		}
		log.Infof("waiting for consulResultHandlers on %s", service)
		wg.Wait()
	}
}