cloud_handler.go

package statsd

import (
	"context"
	"sync"
	"sync/atomic"
	"time"

	cloudTypes "github.com/atlassian/gostatsd/cloudprovider/types"
	"github.com/atlassian/gostatsd/types"

	log "github.com/Sirupsen/logrus"
	"golang.org/x/time/rate"
)

const (
	lookupChannelSize = 1024 // Arbitrary buffer size; should be large enough to avoid blocking senders.
// DefaultCacheRefreshPeriod is the default cache refresh period.
DefaultCacheRefreshPeriod = 1 * time.Minute
// DefaultCacheEvictAfterIdlePeriod is the default idle cache eviction period.
DefaultCacheEvictAfterIdlePeriod = 10 * time.Minute
// DefaultCacheTTL is the default cache TTL for successful lookups.
DefaultCacheTTL = 30 * time.Minute
// DefaultCacheNegativeTTL is the default cache TTL for failed lookups (errors or when instance was not found).
DefaultCacheNegativeTTL = 1 * time.Minute
)

type lookupResult struct {
	ip       types.IP
	instance *cloudTypes.Instance // Can be nil if lookup failed
}

type instanceHolder struct {
	lastAccessNano int64
	expires        time.Time            // When this record expires.
	instance       *cloudTypes.Instance // Can be nil if the lookup resulted in an error (instance not found/etc)
}

func (ih *instanceHolder) updateAccess() {
	atomic.StoreInt64(&ih.lastAccessNano, time.Now().UnixNano())
}

func (ih *instanceHolder) lastAccess() int64 {
	return atomic.LoadInt64(&ih.lastAccessNano)
}

// CacheOptions holds cache behaviour configuration.
type CacheOptions struct {
CacheRefreshPeriod time.Duration
CacheEvictAfterIdlePeriod time.Duration
CacheTTL time.Duration
CacheNegativeTTL time.Duration
}

// cloudHandler enriches metrics and events with additional information fetched from the cloud provider.
type cloudHandler struct {
cacheOpts CacheOptions
cloud cloudTypes.Interface // Cloud provider interface
next Handler
limiter *rate.Limiter
metricSource chan *types.Metric
eventSource chan *types.Event
wg sync.WaitGroup
rw sync.RWMutex // Protects cache
cache map[types.IP]*instanceHolder
}

// NewCloudHandler initialises a new cloud handler.
// If cacheOptions is nil, the default cache configuration is used.
func NewCloudHandler(cloud cloudTypes.Interface, next Handler, limiter *rate.Limiter, cacheOptions *CacheOptions) *cloudHandler {
if cacheOptions == nil {
cacheOptions = &CacheOptions{
CacheRefreshPeriod: DefaultCacheRefreshPeriod,
CacheEvictAfterIdlePeriod: DefaultCacheEvictAfterIdlePeriod,
CacheTTL: DefaultCacheTTL,
CacheNegativeTTL: DefaultCacheNegativeTTL,
}
}
return &cloudHandler{
cacheOpts: *cacheOptions,
cloud: cloud,
next: next,
limiter: limiter,
metricSource: make(chan *types.Metric),
eventSource: make(chan *types.Event),
cache: make(map[types.IP]*instanceHolder),
}
}
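
// A minimal wiring sketch (hypothetical names; assumes myCloudProvider
// implements cloudTypes.Interface and nextHandler is the next Handler in the
// pipeline):
//
//	limiter := rate.NewLimiter(rate.Limit(10), 1) // at most 10 lookups/sec, burst of 1
//	handler := NewCloudHandler(myCloudProvider, nextHandler, limiter, nil) // nil => default CacheOptions
//	go func() {
//		_ = handler.Run(ctx) // runs until ctx is cancelled
//	}()
//	_ = handler.DispatchMetric(ctx, metric)

// DispatchMetric enriches the metric from the cache and forwards it to the
// next handler when the instance is already known; otherwise it hands the
// metric to the Run goroutine for an asynchronous cloud lookup.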
func (ch *cloudHandler) DispatchMetric(ctx context.Context, m *types.Metric) error {
if ch.updateTagsAndHostname(m.SourceIP, &m.Tags, &m.Hostname) {
return ch.next.DispatchMetric(ctx, m)
}
select {
case <-ctx.Done():
return ctx.Err()
case ch.metricSource <- m:
return nil
}
}
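
// DispatchEvent mirrors DispatchMetric for events. It increments the wait
// group before queueing so that WaitForEvents can track in-flight events; the
// counter is decremented again in updateAndDispatchEvents.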
func (ch *cloudHandler) DispatchEvent(ctx context.Context, e *types.Event) error {
if ch.updateTagsAndHostname(e.SourceIP, &e.Tags, &e.Hostname) {
return ch.next.DispatchEvent(ctx, e)
}
ch.wg.Add(1) // Increment before sending to the channel
select {
case <-ctx.Done():
ch.wg.Done()
return ctx.Err()
case ch.eventSource <- e:
return nil
}
}

// WaitForEvents waits for all event-dispatching goroutines to finish.
func (ch *cloudHandler) WaitForEvents() {
ch.wg.Wait()
ch.next.WaitForEvents()
}
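
// Run is the handler's main loop. It owns ch.cache, starts the lookup
// dispatcher and multiplexes lookup results, cache refresh ticks and incoming
// metrics and events until the context is cancelled.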
func (ch *cloudHandler) Run(ctx context.Context) error {
toLookup := make(chan types.IP, lookupChannelSize) // IPs to lookup
lookupResults := make(chan *lookupResult)
awaitingEvents := make(map[types.IP][]*types.Event)
awaitingMetrics := make(map[types.IP][]*types.Metric)
var wg sync.WaitGroup
defer wg.Wait() // Wait for lookupDispatcher to stop
defer close(toLookup) // Tell lookupDispatcher to stop
wg.Add(1)
go ch.lookupDispatcher(ctx, &wg, toLookup, lookupResults)
refreshTicker := time.NewTicker(ch.cacheOpts.CacheRefreshPeriod)
defer refreshTicker.Stop()
	// Reading ch.cache from this goroutine requires no locking: this goroutine
	// is the only one that mutates the map, so its own reads can never race.
	// Mutations are performed under the exclusive (write) lock, and readers in
	// other goroutines take the read lock, so they always see a consistent map.
for {
select {
case <-ctx.Done():
return ctx.Err()
case lr := <-lookupResults:
ch.handleLookupResult(ctx, lr, awaitingMetrics, awaitingEvents)
case t := <-refreshTicker.C:
ch.doRefresh(ctx, toLookup, t)
case m := <-ch.metricSource:
ch.handleMetric(ctx, toLookup, m, awaitingMetrics)
case e := <-ch.eventSource:
ch.handleEvent(ctx, toLookup, e, awaitingEvents)
}
}
}
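
// doRefresh walks the cache, evicting entries that have been idle for longer
// than CacheEvictAfterIdlePeriod and requesting a fresh lookup for entries
// whose TTL has expired.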
func (ch *cloudHandler) doRefresh(ctx context.Context, toLookup chan<- types.IP, t time.Time) {
var toDelete []types.IP
now := t.UnixNano()
idleNano := ch.cacheOpts.CacheEvictAfterIdlePeriod.Nanoseconds()
for ip, holder := range ch.cache {
if now-holder.lastAccess() > idleNano {
// Entry was not used recently, remove it.
toDelete = append(toDelete, ip)
} else if t.After(holder.expires) {
// Entry needs a refresh.
select {
case <-ctx.Done():
return
case toLookup <- ip:
}
}
}
if len(toDelete) > 0 {
ch.rw.Lock()
for _, ip := range toDelete {
delete(ch.cache, ip)
}
ch.rw.Unlock()
}
}
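
// handleLookupResult caches the outcome of a lookup, using the negative TTL
// when the lookup failed and preserving the previous entry's last-access time,
// then dispatches any metrics and events that were waiting on this IP.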
func (ch *cloudHandler) handleLookupResult(ctx context.Context, lr *lookupResult, awaitingMetrics map[types.IP][]*types.Metric, awaitingEvents map[types.IP][]*types.Event) {
var ttl time.Duration
if lr.instance == nil {
ttl = ch.cacheOpts.CacheNegativeTTL
} else {
ttl = ch.cacheOpts.CacheTTL
}
now := time.Now()
newHolder := &instanceHolder{
expires: now.Add(ttl),
instance: lr.instance,
}
currentHolder := ch.cache[lr.ip]
if currentHolder == nil {
newHolder.lastAccessNano = now.UnixNano()
} else {
newHolder.lastAccessNano = currentHolder.lastAccess()
}
ch.rw.Lock()
ch.cache[lr.ip] = newHolder
ch.rw.Unlock()
metrics := awaitingMetrics[lr.ip]
if metrics != nil {
delete(awaitingMetrics, lr.ip)
go ch.updateAndDispatchMetrics(ctx, lr.instance, metrics...)
}
events := awaitingEvents[lr.ip]
if events != nil {
delete(awaitingEvents, lr.ip)
go ch.updateAndDispatchEvents(ctx, lr.instance, events...)
}
}
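
// handleMetric dispatches the metric immediately on a cache hit. On a miss it
// queues the metric and, for the first metric queued for an IP, requests a
// lookup.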
func (ch *cloudHandler) handleMetric(ctx context.Context, toLookup chan<- types.IP, m *types.Metric, awaitingMetrics map[types.IP][]*types.Metric) {
holder, ok := ch.cache[m.SourceIP]
if ok {
		// The cache was primed while the metric was queued. Use the cached value.
holder.updateAccess()
go ch.updateAndDispatchMetrics(ctx, holder.instance, m)
} else {
// Still nothing in the cache.
queue := awaitingMetrics[m.SourceIP]
awaitingMetrics[m.SourceIP] = append(queue, m)
if len(queue) == 0 {
// This is the first metric in the queue
select {
case <-ctx.Done():
case toLookup <- m.SourceIP:
}
}
}
}
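
// updateAndDispatchMetrics enriches the given metrics with instance data and
// forwards them to the next handler, stopping early if the context is done.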
func (ch *cloudHandler) updateAndDispatchMetrics(ctx context.Context, instance *cloudTypes.Instance, metrics ...*types.Metric) {
for _, m := range metrics {
updateInplace(&m.Tags, &m.Hostname, instance)
if err := ch.next.DispatchMetric(ctx, m); err != nil {
if err == context.Canceled || err == context.DeadlineExceeded {
return
}
log.Warnf("Failed to dispatch metric: %v", err)
}
}
}
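
// handleEvent is the event counterpart of handleMetric.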
func (ch *cloudHandler) handleEvent(ctx context.Context, toLookup chan<- types.IP, e *types.Event, awaitingEvents map[types.IP][]*types.Event) {
holder, ok := ch.cache[e.SourceIP]
if ok {
		// The cache was primed while the event was queued. Use the cached value.
holder.updateAccess()
go ch.updateAndDispatchEvents(ctx, holder.instance, e)
} else {
// Still nothing in the cache.
queue := awaitingEvents[e.SourceIP]
awaitingEvents[e.SourceIP] = append(queue, e)
if len(queue) == 0 {
// This is the first event in the queue
select {
case <-ctx.Done():
case toLookup <- e.SourceIP:
}
}
}
}
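
// updateAndDispatchEvents enriches and forwards events, decrementing the wait
// group for each event it processes (the counter is incremented in
// DispatchEvent).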
func (ch *cloudHandler) updateAndDispatchEvents(ctx context.Context, instance *cloudTypes.Instance, events ...*types.Event) {
var dispatched int
defer func() {
ch.wg.Add(-dispatched)
}()
for _, e := range events {
updateInplace(&e.Tags, &e.Hostname, instance)
dispatched++
if err := ch.next.DispatchEvent(ctx, e); err != nil {
if err == context.Canceled || err == context.DeadlineExceeded {
return
}
log.Warnf("Failed to dispatch event: %v", err)
}
}
}
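
// lookupDispatcher applies the rate limiter to each lookup request and runs
// every lookup in its own goroutine. It returns when toLookup is closed or the
// limiter reports a context error, after waiting for in-flight lookups.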
func (ch *cloudHandler) lookupDispatcher(ctx context.Context, wg *sync.WaitGroup, toLookup <-chan types.IP, lookupResults chan<- *lookupResult) {
defer wg.Done()
defer log.Info("Cloud lookup dispatcher stopped")
var wgLookups sync.WaitGroup
defer wgLookups.Wait() // Wait for all in-flight lookups to finish
for ip := range toLookup {
if err := ch.limiter.Wait(ctx); err != nil {
if err != context.Canceled && err != context.DeadlineExceeded {
				// Context errors are filtered out above; any other limiter error is unexpected, so log it.
log.Warnf("Error from limiter: %v", err)
}
return
}
wgLookups.Add(1)
go ch.doLookup(ctx, &wgLookups, ip, lookupResults)
}
}
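
// doLookup queries the cloud provider for a single IP and reports the result,
// which carries a nil instance if the lookup failed.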
func (ch *cloudHandler) doLookup(ctx context.Context, wg *sync.WaitGroup, ip types.IP, lookupResults chan<- *lookupResult) {
defer wg.Done()
instance, err := ch.cloud.Instance(ctx, ip)
if err != nil {
log.Debugf("Error retrieving instance details from cloud provider for %s: %v", ip, err)
}
res := &lookupResult{
ip: ip,
instance: instance,
}
select {
case <-ctx.Done():
case lookupResults <- res:
}
}
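
// updateTagsAndHostname applies cached instance data to tags and hostname in
// place, reporting whether there was a usable cache entry.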
func (ch *cloudHandler) updateTagsAndHostname(ip types.IP, tags *types.Tags, hostname *string) bool {
instance, ok := ch.getInstance(ip)
if ok {
updateInplace(tags, hostname, instance)
}
return ok
}
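
// getInstance returns the cached instance for ip and refreshes the entry's
// access time. UnknownIP is treated as a hit with no instance.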
func (ch *cloudHandler) getInstance(ip types.IP) (*cloudTypes.Instance, bool) {
if ip == types.UnknownIP {
return nil, true
}
ch.rw.RLock()
holder, ok := ch.cache[ip]
ch.rw.RUnlock()
if ok {
holder.updateAccess()
return holder.instance, true
}
return nil, false
}
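
// updateInplace overwrites the hostname with the instance ID and appends the
// instance region and tags. It is a no-op when instance is nil (failed lookup).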
func updateInplace(tags *types.Tags, hostname *string, instance *cloudTypes.Instance) {
	if instance != nil { // Positive cache hit (successful lookup, not a cached failure).
		// Update the hostname in place.
		*hostname = instance.ID
		// Update the tag list in place.
		*tags = append(*tags, "region:"+instance.Region)
		*tags = append(*tags, instance.Tags...)
}
}