/
endpointregistry.go
139 lines (110 loc) · 2.8 KB
/
endpointregistry.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
package routing
import (
"sync"
"time"
"github.com/zalando/skipper/eskip"
)
const defaultLastSeenTimeout = 1 * time.Minute
// Metrics describe the data about endpoint that could be
// used to perform better load balancing, fadeIn, etc.
type Metrics interface {
DetectedTime() time.Time
InflightRequests() int64
}
type entry struct {
detected time.Time
inflightRequests int64
}
var _ Metrics = &entry{}
func (e *entry) DetectedTime() time.Time {
return e.detected
}
func (e *entry) InflightRequests() int64 {
return e.inflightRequests
}
type EndpointRegistry struct {
lastSeen map[string]time.Time
lastSeenTimeout time.Duration
now func() time.Time
mu sync.Mutex
data map[string]*entry
}
var _ PostProcessor = &EndpointRegistry{}
type RegistryOptions struct {
LastSeenTimeout time.Duration
}
func (r *EndpointRegistry) Do(routes []*Route) []*Route {
now := r.now()
for _, route := range routes {
if route.BackendType == eskip.LBBackend {
for _, epi := range route.LBEndpoints {
metrics := r.GetMetrics(epi.Host)
if metrics.DetectedTime().IsZero() {
r.SetDetectedTime(epi.Host, now)
}
r.lastSeen[epi.Host] = now
}
}
}
for host, ts := range r.lastSeen {
if ts.Add(r.lastSeenTimeout).Before(now) {
r.mu.Lock()
if r.data[host].inflightRequests == 0 {
delete(r.lastSeen, host)
delete(r.data, host)
}
r.mu.Unlock()
}
}
return routes
}
func NewEndpointRegistry(o RegistryOptions) *EndpointRegistry {
if o.LastSeenTimeout == 0 {
o.LastSeenTimeout = defaultLastSeenTimeout
}
return &EndpointRegistry{
data: map[string]*entry{},
lastSeen: map[string]time.Time{},
lastSeenTimeout: o.LastSeenTimeout,
now: time.Now,
}
}
func (r *EndpointRegistry) GetMetrics(key string) Metrics {
r.mu.Lock()
defer r.mu.Unlock()
e := r.getOrInitEntryLocked(key)
copy := &entry{}
*copy = *e
return copy
}
func (r *EndpointRegistry) SetDetectedTime(key string, detected time.Time) {
r.mu.Lock()
defer r.mu.Unlock()
e := r.getOrInitEntryLocked(key)
e.detected = detected
}
func (r *EndpointRegistry) IncInflightRequest(key string) {
r.mu.Lock()
defer r.mu.Unlock()
e := r.getOrInitEntryLocked(key)
e.inflightRequests++
}
func (r *EndpointRegistry) DecInflightRequest(key string) {
r.mu.Lock()
defer r.mu.Unlock()
e := r.getOrInitEntryLocked(key)
e.inflightRequests--
}
// getOrInitEntryLocked returns pointer to endpoint registry entry
// which contains the information about endpoint representing the
// following key. r.mu must be held while calling this function and
// using of the entry returned. In general, key represents the "host:port"
// string
func (r *EndpointRegistry) getOrInitEntryLocked(key string) *entry {
e, ok := r.data[key]
if !ok {
e = &entry{}
r.data[key] = e
}
return e
}