/
sfxclient.go
370 lines (335 loc) · 12.2 KB
/
sfxclient.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
// Package sfxclient creates convenient go functions and wrappers to send metrics to SignalFx.
//
// The core of the library is HTTPSink which allows users to send metrics and events to SignalFx
// ad-hoc. A Scheduler is built on top of this to facilitate easy management of metrics for multiple
// SignalFx reporters at once in more complex libraries.
//
// HTTPSink
//
// The simplest way to send metrics and events to SignalFx is with HTTPSink. The only struct
// parameter that needs to be configured is AuthToken. To make it easier to create common
// Datapoint objects, wrappers exist for Gauge and Cumulative. An example of sending a hello
// world metric would look like this:
// func SendHelloWorld() {
// client := NewHTTPSink()
// client.AuthToken = "ABCDXYZ"
// ctx := context.Background()
// client.AddDatapoints(ctx, []*datapoint.Datapoint{
// GaugeF("hello.world", nil, 1.0),
// })
// }
//
// Scheduler
//
// To facilitate periodic sending of datapoints to SignalFx, a Scheduler abstraction exists. You
// can use this to report custom metrics to SignalFx at some periodic interval.
// type CustomApplication struct {
// queue chan int64
// }
// func (c *CustomApplication) Datapoints() []*datapoint.Datapoint {
// return []*datapoint.Datapoint {
// sfxclient.Gauge("queue.size", nil, len(queue)),
// }
// }
// func main() {
// scheduler := sfxclient.NewScheduler()
// scheduler.Sink.(*HTTPSink).AuthToken = "ABCD-XYZ"
// app := &CustomApplication{}
// scheduler.AddCallback(app)
// go scheduler.Schedule(context.Background())
// }
//
// RollingBucket and CumulativeBucket
//
// Because counting things and calculating percentiles like p99 or median are common operations,
// RollingBucket and CumulativeBucket exist to make this easier. They implement the Collector
// interface which allows users to add them to an existing Scheduler.
//
// To run integration tests, testing sending to SignalFx with an actual token, create a file named
// authinfo.json that has your auth Token, similar to the following
// {
// "AuthToken": "abcdefg"
// }
//
// Then execute the following:
// go test -v --tags=integration -run TestDatapointSending ./sfxclient/
package sfxclient
import (
"bytes"
"context"
"expvar"
"fmt"
"reflect"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/opentracing/opentracing-go"
"github.com/signalfx/golib/v3/datapoint"
"github.com/signalfx/golib/v3/errors"
"github.com/signalfx/golib/v3/log"
"github.com/signalfx/golib/v3/timekeeper"
)
const (
	// DefaultReportingDelay is the default interval a Scheduler uses to report metrics to SignalFx.
	DefaultReportingDelay = time.Second * 20
	// DefaultReportingTimeout is the default timeout value for Scheduler to log error message if reporting is not completed within this duration
	DefaultReportingTimeout = time.Second * 5
	// defaultCallbackGroup is the group used by the non-grouped helpers
	// (AddCallback, DefaultDimensions, RemoveCallback).
	defaultCallbackGroup = "default-callback-group"
)
// DefaultErrorHandler is the default way to handle errors by a scheduler. It logs them via
// log.DefaultLogger and swallows them (always returns nil, so Schedule keeps running).
var DefaultErrorHandler = func(err error) error {
	log.DefaultLogger.Log(log.Err, err, "Unable to handle error")
	return nil
}
// Sink is anything that can receive points collected by a Scheduler. This can be useful for
// stubbing out your collector to test the points that will be sent to SignalFx.
type Sink interface {
	// AddDatapoints sends a batch of datapoints; ctx governs cancellation.
	AddDatapoints(ctx context.Context, points []*datapoint.Datapoint) (err error)
}
// Collector is anything Scheduler can track that emits points.
type Collector interface {
	// Datapoints returns the current set of points to report.
	Datapoints() []*datapoint.Datapoint
}
// HashableCollector is a Collector function that can be inserted into a hashmap. You can use it
// to wrap a functional callback and insert it into a Scheduler.
type HashableCollector struct {
	// Callback is invoked on every collection cycle to produce datapoints.
	Callback func() []*datapoint.Datapoint
}
// CollectorFunc adapts a plain function into a Collector by wrapping it in a
// HashableCollector, which is usable as a hashmap key.
func CollectorFunc(callback func() []*datapoint.Datapoint) *HashableCollector {
	return &HashableCollector{Callback: callback}
}
// Datapoints calls the wrapped function.
func (h *HashableCollector) Datapoints() []*datapoint.Datapoint {
	return h.Callback()
}

// Compile-time check that HashableCollector satisfies Collector.
var _ Collector = CollectorFunc(nil)
// callbackPair is one named group of collectors plus the default dimensions
// applied to every point the group emits. Guarded by Scheduler.callbackMutex.
type callbackPair struct {
	// callbacks is the set of collectors registered in this group.
	callbacks map[Collector]struct{}
	// defaultDimensions is merged into each collected datapoint's dimensions.
	defaultDimensions map[string]string
	// expectedSize is the size of the last collection, used to pre-size the
	// next collection's slice.
	expectedSize int
}
// insertTimeStamp merges the group's default dimensions into each point and
// stamps zero timestamps with now (unless sendZeroTime is set). It also
// records len(ret) so the next collection can pre-size its slice.
func (c *callbackPair) insertTimeStamp(now time.Time, sendZeroTime bool, ret []*datapoint.Datapoint) {
	for _, point := range ret {
		// It's a bit dangerous to modify the map (we don't know how it was
		// passed in), so AddMaps builds a fresh copy instead of mutating.
		point.Dimensions = datapoint.AddMaps(c.defaultDimensions, point.Dimensions)
		if point.Timestamp.IsZero() && !sendZeroTime {
			point.Timestamp = now
		}
	}
	c.expectedSize = len(ret)
}
// getDatapoints collects points from every callback in the group, then applies
// default dimensions and timestamps. Caller must hold the scheduler's mutex.
func (c *callbackPair) getDatapoints(now time.Time, sendZeroTime bool) []*datapoint.Datapoint {
	points := make([]*datapoint.Datapoint, 0, c.expectedSize)
	for cb := range c.callbacks {
		points = append(points, cb.Datapoints()...)
	}
	c.insertTimeStamp(now, sendZeroTime, points)
	return points
}
// getDatapointsWithDebug is getDatapoints with tracing: each callback's
// collection runs inside its own span, named after the callback's concrete
// type, parented to parentSpan.
func (c *callbackPair) getDatapointsWithDebug(parentSpan opentracing.Span, now time.Time, sendZeroTime bool) []*datapoint.Datapoint {
	var nameBuf bytes.Buffer
	points := make([]*datapoint.Datapoint, 0, c.expectedSize)
	tracer := opentracing.GlobalTracer()
	for cb := range c.callbacks {
		nameBuf.Reset()
		nameBuf.WriteString(reflect.TypeOf(cb).String())
		span := tracer.StartSpan(nameBuf.String(), opentracing.ChildOf(parentSpan.Context()))
		points = append(points, cb.Datapoints()...)
		span.Finish()
	}
	c.insertTimeStamp(now, sendZeroTime, points)
	return points
}
// A Scheduler reports metrics to SignalFx at some timely manner.
type Scheduler struct {
	// Sink receives every collected batch of datapoints (an HTTPSink by default).
	Sink Sink
	// Timer abstracts wall-clock time so tests can substitute a fake timekeeper.
	Timer timekeeper.TimeKeeper
	// SendZeroTime, when true, leaves zero timestamps on datapoints instead of
	// stamping them with collection time.
	SendZeroTime bool
	// debug enables per-group/per-callback tracing spans during collection.
	// NOTE(review): written by Debug() without synchronization — TODO confirm
	// callers only toggle it before Schedule starts.
	debug bool
	// ErrorHandler is invoked with reporting errors; a non-nil return stops Schedule.
	ErrorHandler func(error) error
	// ReportingDelayNs is the reporting interval in nanoseconds; accessed atomically.
	ReportingDelayNs int64
	// ReportingTimeoutNs is the reporting timeout in nanoseconds; accessed atomically.
	ReportingTimeoutNs int64
	// callbackMutex guards callbackMap and previousDatapoints.
	callbackMutex sync.Mutex
	callbackMap map[string]*callbackPair
	// previousDatapoints is the last collected batch, exposed via Var().
	previousDatapoints []*datapoint.Datapoint
	// stats holds internal counters, updated atomically.
	stats struct {
		scheduledSleepCounts int64
		resetIntervalCounts int64
		reportingTimeoutCounts int64
	}
	// Prefix, when non-empty, is prepended to every reported metric name.
	Prefix string
}
// NewScheduler creates a default SignalFx scheduler that can report metrics to
// SignalFx at some interval: an HTTPSink, real wall-clock time, the default
// error handler, and the package-default reporting delay and timeout.
func NewScheduler() *Scheduler {
	s := &Scheduler{
		Sink:         NewHTTPSink(),
		Timer:        timekeeper.RealTime{},
		ErrorHandler: DefaultErrorHandler,
		callbackMap:  map[string]*callbackPair{},
	}
	s.ReportingDelayNs = DefaultReportingDelay.Nanoseconds()
	s.ReportingTimeoutNs = DefaultReportingTimeout.Nanoseconds()
	return s
}
// Var returns an expvar variable that prints the values of the previously
// reported datapoints. Reads previousDatapoints under the scheduler's mutex.
func (s *Scheduler) Var() expvar.Var {
	read := func() interface{} {
		s.callbackMutex.Lock()
		defer s.callbackMutex.Unlock()
		return s.previousDatapoints
	}
	return expvar.Func(read)
}
// CollectDatapoints gives a scheduler an external endpoint to be called and is
// thread safe: it collects from every group and applies the configured metric
// prefix, all under the scheduler's mutex.
func (s *Scheduler) CollectDatapoints() []*datapoint.Datapoint {
	s.callbackMutex.Lock()
	defer s.callbackMutex.Unlock()
	points := s.collectDatapoints()
	s.prependPrefix(points)
	return points
}
// collectDatapoints gathers points from every registered group and is not
// thread safe — the caller must hold callbackMutex. In debug mode each group
// and each callback gets its own tracing span.
func (s *Scheduler) collectDatapoints() []*datapoint.Datapoint {
	points := make([]*datapoint.Datapoint, 0, len(s.previousDatapoints))
	now := s.Timer.Now()
	if !s.debug {
		for _, pair := range s.callbackMap {
			points = append(points, pair.getDatapoints(now, s.SendZeroTime)...)
		}
		return points
	}
	tracer := opentracing.GlobalTracer()
	root := tracer.StartSpan("collect-datapoints")
	for group, pair := range s.callbackMap {
		groupSpan := tracer.StartSpan(group, opentracing.ChildOf(root.Context()))
		points = append(points, pair.getDatapointsWithDebug(groupSpan, now, s.SendZeroTime)...)
		groupSpan.Finish()
	}
	root.Finish()
	return points
}
// AddCallback adds a collector to the default group.
func (s *Scheduler) AddCallback(db Collector) {
	s.AddGroupedCallback(defaultCallbackGroup, db)
}
// DefaultDimensions sets a dimension map whose entries are added to every
// datapoint collected from the default group.
func (s *Scheduler) DefaultDimensions(dims map[string]string) {
	s.GroupedDefaultDimensions(defaultCallbackGroup, dims)
}
// GroupedDefaultDimensions sets the default dimensions applied to every
// datapoint collected from the given group, creating the group if it does not
// yet exist. The map is stored by reference; callers should not mutate it
// afterwards.
func (s *Scheduler) GroupedDefaultDimensions(group string, dims map[string]string) {
	s.callbackMutex.Lock()
	defer s.callbackMutex.Unlock()
	subgroup, exists := s.callbackMap[group]
	if !exists {
		subgroup = &callbackPair{
			callbacks: make(map[Collector]struct{}),
		}
		s.callbackMap[group] = subgroup
	}
	// One assignment covers both the new- and existing-group cases (the
	// previous version also set dims inside the creation literal, making
	// that initializer a dead store).
	subgroup.defaultDimensions = dims
}
// AddGroupedCallback adds a collector to a specific group, creating the group
// on first use. Adding the same collector twice is a no-op (set semantics).
func (s *Scheduler) AddGroupedCallback(group string, db Collector) {
	s.callbackMutex.Lock()
	defer s.callbackMutex.Unlock()
	subgroup, exists := s.callbackMap[group]
	if !exists {
		subgroup = &callbackPair{
			callbacks:         make(map[Collector]struct{}),
			defaultDimensions: map[string]string{},
		}
		s.callbackMap[group] = subgroup
	}
	// Single insertion point; the previous version also placed db in the
	// creation literal, making this line redundant on that path.
	subgroup.callbacks[db] = struct{}{}
}
// RemoveCallback removes a collector from the default group.
func (s *Scheduler) RemoveCallback(db Collector) {
	s.RemoveGroupedCallback(defaultCallbackGroup, db)
}
// RemoveGroupedCallback removes a collector from a specific group, and drops
// the group entirely once its last collector is gone. Removing from a missing
// group is a no-op.
func (s *Scheduler) RemoveGroupedCallback(group string, db Collector) {
	s.callbackMutex.Lock()
	defer s.callbackMutex.Unlock()
	pair, ok := s.callbackMap[group]
	if !ok {
		return
	}
	delete(pair.callbacks, db)
	if len(pair.callbacks) == 0 {
		delete(s.callbackMap, group)
	}
}
// ReportOnce will report any metrics saved in this reporter to SignalFx: it
// collects from all groups under the lock, remembers the batch for Var(),
// then sends it (with any configured Prefix) to the Sink. Returns the Sink's
// error, if any.
func (s *Scheduler) ReportOnce(ctx context.Context) error {
	// Collect inside a closure so the deferred unlock releases the mutex
	// before the (potentially slow) network send.
	datapoints := func() []*datapoint.Datapoint {
		s.callbackMutex.Lock()
		defer s.callbackMutex.Unlock()
		collected := s.collectDatapoints()
		s.previousDatapoints = collected
		return collected
	}()
	s.prependPrefix(datapoints)
	return s.Sink.AddDatapoints(ctx, datapoints)
}
// prependPrefix prepends s.Prefix to every datapoint's metric name. No-op
// when Prefix is empty.
func (s *Scheduler) prependPrefix(datapoints []*datapoint.Datapoint) {
	if s.Prefix == "" {
		return
	}
	// Loop variable renamed from "datapoint", which shadowed the imported
	// datapoint package inside the loop body.
	for _, dp := range datapoints {
		var sb strings.Builder
		sb.WriteString(s.Prefix)
		sb.WriteString(dp.Metric)
		dp.Metric = sb.String()
	}
}
// ReportingDelay sets the interval metrics are reported to SignalFx.
// Safe to call concurrently with Schedule; the new value takes effect on the
// next loop iteration.
func (s *Scheduler) ReportingDelay(delay time.Duration) {
	atomic.StoreInt64(&s.ReportingDelayNs, delay.Nanoseconds())
}
// ReportingTimeout sets the timeout value if reporting to SignalFx is not
// completed within this duration. Safe to call concurrently with Schedule.
func (s *Scheduler) ReportingTimeout(timeout time.Duration) {
	atomic.StoreInt64(&s.ReportingTimeoutNs, timeout.Nanoseconds())
}
// Debug enables tracing spans inside collectDatapoints for debugging.
// NOTE(review): s.debug is written without synchronization — presumably meant
// to be set before Schedule starts; confirm callers don't toggle it live.
func (s *Scheduler) Debug(debug bool) {
	s.debug = debug
}
// reportingTimeoutHandler is fired by a time.AfterFunc watchdog when a report
// has not completed within ReportingTimeoutNs. It bumps the timeout counter
// and forwards a descriptive error to the configured ErrorHandler (whose
// return value is deliberately ignored here — there is no caller to stop).
func (s *Scheduler) reportingTimeoutHandler() {
	atomic.AddInt64(&s.stats.reportingTimeoutCounts, 1)
	// fmt.Sprintf replaces the original fmt.Sprintln, which embedded a
	// trailing newline in the error string.
	_ = s.ErrorHandler(errors.New(fmt.Sprintf("reporting datapoints is not getting completed in allocated ns time %s",
		time.Duration(atomic.LoadInt64(&s.ReportingTimeoutNs)))))
}
// Schedule will run until either the ErrorHandler returns an error or the context is canceled. This is intended to
// be run inside a goroutine.
func (s *Scheduler) Schedule(ctx context.Context) error {
	lastReport := s.Timer.Now()
	for {
		// Reload the delay each iteration so ReportingDelay() takes effect live.
		reportingDelay := time.Duration(atomic.LoadInt64(&s.ReportingDelayNs))
		wakeupTime := lastReport.Add(reportingDelay)
		now := s.Timer.Now()
		// If we fell behind by more than one interval, restart the schedule
		// relative to now rather than trying to catch up.
		if now.After(wakeupTime) {
			wakeupTime = now.Add(reportingDelay)
			atomic.AddInt64(&s.stats.resetIntervalCounts, 1)
		}
		sleepTime := wakeupTime.Sub(now)
		atomic.AddInt64(&s.stats.scheduledSleepCounts, 1)
		select {
		case <-ctx.Done():
			return errors.Annotate(ctx.Err(), "context closed")
		case <-s.Timer.After(sleepTime):
			lastReport = s.Timer.Now()
			// Watchdog: logs via reportingTimeoutHandler if this report
			// overruns the configured timeout; stopped once it returns.
			rT := time.AfterFunc(time.Duration(atomic.LoadInt64(&s.ReportingTimeoutNs)), s.reportingTimeoutHandler)
			if err := errors.Annotate(s.ReportOnce(ctx), "failed reporting single metric"); err != nil {
				// A non-nil return from the ErrorHandler terminates the loop.
				if err2 := errors.Annotate(s.ErrorHandler(err), "error handler returned an error"); err2 != nil {
					return err2
				}
			}
			rT.Stop()
		}
	}
}