forked from stripe/veneur
-
Notifications
You must be signed in to change notification settings - Fork 0
/
flusher.go
686 lines (598 loc) · 23.8 KB
/
flusher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
package veneur
import (
"bytes"
"compress/zlib"
"context"
"encoding/json"
"fmt"
"io/ioutil"
"net"
"net/http"
"net/url"
"strings"
"sync"
"time"
"github.com/Sirupsen/logrus"
"github.com/stripe/veneur/samplers"
"github.com/stripe/veneur/ssf"
"github.com/stripe/veneur/trace"
)
// Flush gathers this server's pending metrics and ships them to their
// destination (Datadog), dispatching to the local or global flush path
// depending on this instance's role.
func (s *Server) Flush() {
	span := tracer.StartSpan("flush", trace.NameTag("veneur.opentracing.flush")).(*trace.Span)
	defer span.Finish()

	// right now we have only one destination plugin, but eventually this is
	// where we would loop over our supported destinations
	ctx := span.Attach(context.Background())
	if !s.IsLocal() {
		s.FlushGlobal(ctx)
		return
	}
	s.FlushLocal(ctx)
}
// FlushGlobal sends any global metrics to their destination.
// It tallies the worker metrics (adding sets and global counters to the
// expected length, since only the global instance reports those), renders
// them into DDMetrics, reports flush counts, hands the result to any
// configured plugins, and POSTs it to the remote API.
func (s *Server) FlushGlobal(ctx context.Context) {
	span, _ := trace.StartSpanFromContext(ctx, "flush", trace.NameTag("veneur.opentracing.flush.FlushGlobal"))
	defer span.Finish()
	go s.flushEventsChecks()           // we can do all of this separately
	go s.flushTraces(span.Attach(ctx)) // this too!
	percentiles := s.HistogramPercentiles
	tempMetrics, ms := s.tallyMetrics(percentiles)
	// the global veneur instance is also responsible for reporting the sets
	// and global counters, so grow the length estimate accordingly
	ms.totalLength += ms.totalSets
	ms.totalLength += ms.totalGlobalCounters
	finalMetrics := s.generateDDMetrics(span.Attach(ctx), percentiles, tempMetrics, ms)
	s.reportMetricsFlushCounts(ms)
	s.reportGlobalMetricsFlushCounts(ms)
	// flush to plugins concurrently with the remote flush below
	// NOTE(review): this goroutine is never joined, so a slow plugin flush
	// may overlap the next flush interval — confirm this is intentional
	go func() {
		for _, p := range s.getPlugins() {
			start := time.Now()
			err := p.Flush(finalMetrics, s.Hostname)
			s.statsd.TimeInMilliseconds(fmt.Sprintf("flush.plugins.%s.total_duration_ns", p.Name()), float64(time.Since(start).Nanoseconds()), []string{"part:post"}, 1.0)
			if err != nil {
				countName := fmt.Sprintf("flush.plugins.%s.error_total", p.Name())
				s.statsd.Count(countName, 1, []string{}, 1.0)
			}
			s.statsd.Gauge(fmt.Sprintf("flush.plugins.%s.post_metrics_total", p.Name()), float64(len(finalMetrics)), nil, 1.0)
		}
	}()
	s.flushRemote(finalMetrics)
}
// FlushLocal takes the slices of metrics, combines then and marshals them to json
// for posting to Datadog. Unlike FlushGlobal, it flushes with an empty
// percentile list (percentile reporting is the global veneur's job) and
// forwards the raw worker metrics upstream via flushForward.
func (s *Server) FlushLocal(ctx context.Context) {
	span, _ := trace.StartSpanFromContext(ctx, "flush", trace.NameTag("veneur.opentracing.flush.FlushLocal"))
	defer span.Finish()
	go s.flushEventsChecks()           // we can do all of this separately
	go s.flushTraces(span.Attach(ctx)) // this too!
	// don't publish percentiles if we're a local veneur; that's the global
	// veneur's job
	var percentiles []float64
	tempMetrics, ms := s.tallyMetrics(percentiles)
	finalMetrics := s.generateDDMetrics(span.Attach(ctx), percentiles, tempMetrics, ms)
	s.reportMetricsFlushCounts(ms)
	// we don't report totalHistograms, totalSets, or totalTimers for local veneur instances
	// we cannot do this until we're done using tempMetrics within this function,
	// since not everything in tempMetrics is safe for sharing
	go s.flushForward(tempMetrics)
	// flush to plugins concurrently with the remote flush below
	// NOTE(review): this goroutine is never joined, so a slow plugin flush
	// may overlap the next flush interval — confirm this is intentional
	go func() {
		for _, p := range s.getPlugins() {
			start := time.Now()
			err := p.Flush(finalMetrics, s.Hostname)
			s.statsd.TimeInMilliseconds(fmt.Sprintf("flush.plugins.%s.total_duration_ns", p.Name()), float64(time.Since(start).Nanoseconds()), []string{"part:post"}, 1.0)
			if err != nil {
				countName := fmt.Sprintf("flush.plugins.%s.error_total", p.Name())
				s.statsd.Count(countName, 1, []string{}, 1.0)
			}
			s.statsd.Gauge(fmt.Sprintf("flush.plugins.%s.post_metrics_total", p.Name()), float64(len(finalMetrics)), nil, 1.0)
		}
	}()
	s.flushRemote(finalMetrics)
}
// metricsSummary holds the per-type counts of metrics gathered from the
// workers during a single flush, plus totalLength, the precomputed number
// of DDMetric points they are expected to expand into.
type metricsSummary struct {
	totalCounters   int
	totalGauges     int
	totalHistograms int
	totalSets       int
	totalTimers     int

	// global-only and local-only sampler counts, tallied separately because
	// they are flushed/reported differently by local and global instances
	totalGlobalCounters int

	totalLocalHistograms int
	totalLocalSets       int
	totalLocalTimers     int

	// estimated total number of DDMetric points (see tallyMetrics)
	totalLength int
}
// tallyMetrics gives a slight overestimate of the number
// of metrics we'll be reporting, so that we can pre-allocate
// a slice of the correct length instead of constantly appending
// for performance. It flushes every worker, collects the per-type counts
// into a metricsSummary, and returns the flushed WorkerMetrics alongside it.
func (s *Server) tallyMetrics(percentiles []float64) ([]WorkerMetrics, metricsSummary) {
	// allocating this long array to count up the sizes is cheaper than appending
	// the []DDMetrics together one at a time
	tempMetrics := make([]WorkerMetrics, 0, len(s.Workers))
	gatherStart := time.Now()
	ms := metricsSummary{}
	for i, w := range s.Workers {
		log.WithField("worker", i).Debug("Flushing")
		wm := w.Flush()
		tempMetrics = append(tempMetrics, wm)
		// accumulate per-type counts across all workers
		ms.totalCounters += len(wm.counters)
		ms.totalGauges += len(wm.gauges)
		ms.totalHistograms += len(wm.histograms)
		ms.totalSets += len(wm.sets)
		ms.totalTimers += len(wm.timers)
		ms.totalGlobalCounters += len(wm.globalCounters)
		ms.totalLocalHistograms += len(wm.localHistograms)
		ms.totalLocalSets += len(wm.localSets)
		ms.totalLocalTimers += len(wm.localTimers)
	}
	s.statsd.TimeInMilliseconds("flush.total_duration_ns", float64(time.Since(gatherStart).Nanoseconds()), []string{"part:gather"}, 1.0)
	// counters and gauges each produce one point; histogram-like samplers
	// expand into one point per percentile plus one per aggregate
	ms.totalLength = ms.totalCounters + ms.totalGauges +
		// histograms and timers each report a metric point for each percentile
		// plus a point for each of their aggregates
		(ms.totalTimers+ms.totalHistograms)*(s.HistogramAggregates.Count+len(percentiles)) +
		// local-only histograms will be flushed with percentiles, so we intentionally
		// use the original percentile list here.
		// remember that both the global veneur and the local instances have
		// 'local-only' histograms.
		ms.totalLocalSets + (ms.totalLocalTimers+ms.totalLocalHistograms)*(s.HistogramAggregates.Count+len(s.HistogramPercentiles))
	return tempMetrics, ms
}
// generateDDMetrics calls the Flush method on each
// counter/gauge/histogram/timer/set collected by the workers in order to
// generate a DDMetric corresponding to that value, finalizes hostnames and
// tags, and returns the combined slice.
//
// When percentiles is empty (the local-veneur case), histograms and timers
// only flush their local aggregate parts; local-only samplers always flush
// with the server's full percentile list since they are never forwarded.
func (s *Server) generateDDMetrics(ctx context.Context, percentiles []float64, tempMetrics []WorkerMetrics, ms metricsSummary) []samplers.DDMetric {
	span, _ := trace.StartSpanFromContext(ctx, "flush", trace.NameTag("veneur.opentracing.flush.generateDDMetrics"))
	defer span.Finish()
	// ms.totalLength is a slight overestimate, so one allocation suffices
	finalMetrics := make([]samplers.DDMetric, 0, ms.totalLength)
	for _, wm := range tempMetrics {
		for _, c := range wm.counters {
			finalMetrics = append(finalMetrics, c.Flush(s.interval)...)
		}
		for _, g := range wm.gauges {
			finalMetrics = append(finalMetrics, g.Flush()...)
		}
		// if we're a local veneur, then percentiles=nil, and only the local
		// parts (count, min, max) will be flushed
		for _, h := range wm.histograms {
			finalMetrics = append(finalMetrics, h.Flush(s.interval, percentiles, s.HistogramAggregates)...)
		}
		for _, t := range wm.timers {
			finalMetrics = append(finalMetrics, t.Flush(s.interval, percentiles, s.HistogramAggregates)...)
		}
		// local-only samplers should be flushed in their entirety, since they
		// will not be forwarded
		// we still want percentiles for these, even if we're a local veneur, so
		// we use the original percentile list when flushing them
		for _, h := range wm.localHistograms {
			finalMetrics = append(finalMetrics, h.Flush(s.interval, s.HistogramPercentiles, s.HistogramAggregates)...)
		}
		// named "set" rather than "s" so the loop variable does not shadow
		// the *Server receiver
		for _, set := range wm.localSets {
			finalMetrics = append(finalMetrics, set.Flush()...)
		}
		for _, t := range wm.localTimers {
			finalMetrics = append(finalMetrics, t.Flush(s.interval, s.HistogramPercentiles, s.HistogramAggregates)...)
		}
		// TODO (aditya) refactor this out so we don't
		// have to call IsLocal again
		if !s.IsLocal() {
			// sets have no local parts, so if we're a local veneur, there's
			// nothing to flush at all
			for _, set := range wm.sets {
				finalMetrics = append(finalMetrics, set.Flush()...)
			}
			// also do this for global counters
			// global counters have no local parts, so if we're a local veneur,
			// there's nothing to flush
			for _, gc := range wm.globalCounters {
				finalMetrics = append(finalMetrics, gc.Flush(s.interval)...)
			}
		}
	}
	finalizeMetrics(s.Hostname, s.Tags, finalMetrics)
	s.statsd.TimeInMilliseconds("flush.total_duration_ns", float64(time.Since(span.Start).Nanoseconds()), []string{"part:combine"}, 1.0)
	return finalMetrics
}
// reportMetricsFlushCounts reports the counts of
// Counters, Gauges, LocalHistograms, LocalSets, and LocalTimers
// as metrics. These are shared by both global and local flush operations.
// It does *not* report the totalHistograms, totalSets, or totalTimers
// because those are only performed by the global veneur instance.
// It also does not report the total metrics posted, because on the local veneur,
// that should happen *after* the flush-forward operation.
func (s *Server) reportMetricsFlushCounts(ms metricsSummary) {
	flushed := []struct {
		metricType string
		count      int
	}{
		{"counter", ms.totalCounters},
		{"gauge", ms.totalGauges},
		{"local_histogram", ms.totalLocalHistograms},
		{"local_set", ms.totalLocalSets},
		{"local_timer", ms.totalLocalTimers},
	}
	for _, f := range flushed {
		s.statsd.Count("worker.metrics_flushed_total", int64(f.count), []string{"metric_type:" + f.metricType}, 1.0)
	}
}
// reportGlobalMetricsFlushCounts reports the counts of
// globalCounters, totalHistograms, totalSets, and totalTimers,
// which are the four metrics reported *only* by the global
// veneur instance.
func (s *Server) reportGlobalMetricsFlushCounts(ms metricsSummary) {
	// we only report these lengths in FlushGlobal
	// since if we're the global veneur instance responsible for flushing them
	// this avoids double-counting problems where a local veneur reports
	// histograms that it received, and then a global veneur reports them
	// again
	flushed := []struct {
		metricType string
		count      int
	}{
		{"global_counter", ms.totalGlobalCounters},
		{"histogram", ms.totalHistograms},
		{"set", ms.totalSets},
		{"timer", ms.totalTimers},
	}
	for _, f := range flushed {
		s.statsd.Count("worker.metrics_flushed_total", int64(f.count), []string{"metric_type:" + f.metricType}, 1.0)
	}
}
// flushRemote breaks up the final metrics into chunks
// (to avoid hitting the size cap) and POSTs them to the remote API
// concurrently, waiting for all chunks to complete.
func (s *Server) flushRemote(finalMetrics []samplers.DDMetric) {
	s.statsd.Gauge("flush.post_metrics_total", float64(len(finalMetrics)), nil, 1.0)
	// Check to see if we have anything to do
	if len(finalMetrics) == 0 {
		log.Info("Nothing to flush, skipping.")
		return
	}
	// break the metrics into chunks of approximately equal size, such that
	// each chunk is less than the limit
	// we compute the chunks using rounding-up integer division
	workers := ((len(finalMetrics) - 1) / s.FlushMaxPerBody) + 1
	chunkSize := ((len(finalMetrics) - 1) / workers) + 1
	log.WithField("workers", workers).Debug("Worker count chosen")
	log.WithField("chunkSize", chunkSize).Debug("Chunk size chosen")

	flushStart := time.Now()
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		start := i * chunkSize
		end := start + chunkSize
		if end > len(finalMetrics) {
			// the final chunk absorbs whatever remains
			end = len(finalMetrics)
		}
		wg.Add(1)
		go s.flushPart(finalMetrics[start:end], &wg)
	}
	wg.Wait()
	s.statsd.TimeInMilliseconds("flush.total_duration_ns", float64(time.Since(flushStart).Nanoseconds()), []string{"part:post"}, 1.0)
	log.WithField("metrics", len(finalMetrics)).Info("Completed flush to Datadog")
}
// finalizeMetrics applies the "magic tags" host: and device: to each metric
// (moving them into the Hostname and DeviceName fields and removing them from
// the tag list), fills in the default hostname for metrics that didn't set
// one, and appends the server-wide tags.
//
// BUG FIX: the previous implementation deleted tags from the slice while
// ranging over it by index, which skipped the element shifted into the
// deleted slot (so two adjacent magic tags were mishandled) and re-read the
// stale final array slot. This version filters the tags in place instead.
func finalizeMetrics(hostname string, tags []string, finalMetrics []samplers.DDMetric) {
	for i := range finalMetrics {
		// Look for "magic tags" that override metric fields host and device.
		// kept aliases the Tags backing array; writes always trail reads, so
		// this in-place filter is safe.
		kept := finalMetrics[i].Tags[:0]
		for _, tag := range finalMetrics[i].Tags {
			if strings.HasPrefix(tag, "host:") {
				// Override the hostname with the tag, trimming off the prefix,
				// and drop the tag from the list
				finalMetrics[i].Hostname = strings.TrimPrefix(tag, "host:")
			} else if strings.HasPrefix(tag, "device:") {
				// Same as above, but device this time
				finalMetrics[i].DeviceName = strings.TrimPrefix(tag, "device:")
			} else {
				kept = append(kept, tag)
			}
		}
		finalMetrics[i].Tags = kept
		if finalMetrics[i].Hostname == "" {
			// No magic tag, set the hostname
			finalMetrics[i].Hostname = hostname
		}
		finalMetrics[i].Tags = append(finalMetrics[i].Tags, tags...)
	}
}
// flushPart flushes a single chunk of metrics to the remote API server,
// signalling wg when the POST has completed.
func (s *Server) flushPart(metricSlice []samplers.DDMetric, wg *sync.WaitGroup) {
	defer wg.Done()
	endpoint := fmt.Sprintf("%s/api/v1/series?api_key=%s", s.DDHostname, s.DDAPIKey)
	body := map[string][]samplers.DDMetric{
		"series": metricSlice,
	}
	s.postHelper(context.TODO(), endpoint, body, "flush", true)
}
// flushForward exports all global samplers (global counters, histograms,
// sets, and timers) as JSONMetrics and POSTs them to the upstream (global)
// veneur instance for aggregation. Metrics that fail to export are logged
// and skipped rather than aborting the whole forward.
func (s *Server) flushForward(wms []WorkerMetrics) {
	// pre-count the metrics so jsonMetrics can be allocated in one shot
	// BUG FIX: globalCounters were previously omitted from this tally even
	// though they are exported below, undercounting the capacity.
	jmLength := 0
	for _, wm := range wms {
		jmLength += len(wm.globalCounters)
		jmLength += len(wm.histograms)
		jmLength += len(wm.sets)
		jmLength += len(wm.timers)
	}
	jsonMetrics := make([]samplers.JSONMetric, 0, jmLength)
	exportStart := time.Now()
	for _, wm := range wms {
		for _, count := range wm.globalCounters {
			jm, err := count.Export()
			if err != nil {
				log.WithFields(logrus.Fields{
					logrus.ErrorKey: err,
					"type":          "counter",
					"name":          count.Name,
				}).Error("Could not export metric")
				continue
			}
			jsonMetrics = append(jsonMetrics, jm)
		}
		for _, histo := range wm.histograms {
			jm, err := histo.Export()
			if err != nil {
				log.WithFields(logrus.Fields{
					logrus.ErrorKey: err,
					"type":          "histogram",
					"name":          histo.Name,
				}).Error("Could not export metric")
				continue
			}
			jsonMetrics = append(jsonMetrics, jm)
		}
		for _, set := range wm.sets {
			jm, err := set.Export()
			if err != nil {
				log.WithFields(logrus.Fields{
					logrus.ErrorKey: err,
					"type":          "set",
					"name":          set.Name,
				}).Error("Could not export metric")
				continue
			}
			jsonMetrics = append(jsonMetrics, jm)
		}
		for _, timer := range wm.timers {
			jm, err := timer.Export()
			if err != nil {
				log.WithFields(logrus.Fields{
					logrus.ErrorKey: err,
					"type":          "timer",
					"name":          timer.Name,
				}).Error("Could not export metric")
				continue
			}
			// the exporter doesn't know that these two are "different"
			jm.Type = "timer"
			jsonMetrics = append(jsonMetrics, jm)
		}
	}
	s.statsd.TimeInMilliseconds("forward.duration_ns", float64(time.Since(exportStart).Nanoseconds()), []string{"part:export"}, 1.0)
	s.statsd.Gauge("forward.post_metrics_total", float64(len(jsonMetrics)), nil, 1.0)
	if len(jsonMetrics) == 0 {
		log.Debug("Nothing to forward, skipping.")
		return
	}
	// always re-resolve the host to avoid dns caching
	dnsStart := time.Now()
	endpoint, err := resolveEndpoint(fmt.Sprintf("%s/import", s.ForwardAddr))
	if err != nil {
		// not a fatal error if we fail
		// we'll just try to use the host as it was given to us
		s.statsd.Count("forward.error_total", 1, []string{"cause:dns"}, 1.0)
		log.WithError(err).Warn("Could not re-resolve host for forward")
	}
	s.statsd.TimeInMilliseconds("forward.duration_ns", float64(time.Since(dnsStart).Nanoseconds()), []string{"part:dns"}, 1.0)
	// the error has already been logged (if there was one), so we only care
	// about the success case
	if s.postHelper(context.TODO(), endpoint, jsonMetrics, "forward", true) == nil {
		log.WithField("metrics", len(jsonMetrics)).Info("Completed forward to upstream Veneur")
	}
}
// resolveEndpoint takes a url, attempts to resolve the url's host, and
// returns a new url whose host has been replaced by the first resolved
// address. On failure, it returns the argument unchanged along with the
// resulting error.
func resolveEndpoint(endpoint string) (string, error) {
	u, err := url.Parse(endpoint)
	if err != nil {
		// caution: this error contains the endpoint itself, so if the endpoint
		// has secrets in it, you have to remove them
		return endpoint, err
	}
	host, port, err := net.SplitHostPort(u.Host)
	if err != nil {
		return endpoint, err
	}
	addrs, err := net.LookupHost(host)
	if err != nil {
		return endpoint, err
	}
	if len(addrs) == 0 {
		return endpoint, &net.DNSError{
			Err:  "no hosts found",
			Name: host,
		}
	}
	u.Host = net.JoinHostPort(addrs[0], port)
	return u.String(), nil
}
// flushTraces drains the trace worker's ring buffer, converts each SSF
// sample into a DatadogTraceSpan, and POSTs the batch (uncompressed) to the
// trace endpoint. It is a no-op when tracing is disabled.
func (s *Server) flushTraces(ctx context.Context) {
	if !s.TracingEnabled() {
		return
	}
	span, _ := trace.StartSpanFromContext(ctx, "flush", trace.NameTag("veneur.opentracing.flush.flushTraces"))
	defer span.Finish()
	traces := s.TraceWorker.Flush()
	var finalTraces []*DatadogTraceSpan
	// traces is a ring; entries may be nil if the ring never filled
	traces.Do(func(t interface{}) {
		if t != nil {
			// note: this "span" shadows the tracing span declared above
			// within the callback
			span, ok := t.(ssf.SSFSample)
			if !ok {
				log.Error("Got an unknown object in tracing ring!")
				return
			}
			// -1 is a canonical way of passing in invalid info in Go
			// so we should support that too
			parentID := span.Trace.ParentId
			// check if this is the root span
			if parentID <= 0 {
				// we need parentId to be zero for json:omitempty to work
				parentID = 0
			}
			resource := span.Trace.Resource
			// flatten the sample's tag list into a map for the Meta field
			tags := map[string]string{}
			for _, tag := range span.Tags {
				tags[tag.Name] = tag.Value
			}
			// TODO implement additional metrics
			var metrics map[string]float64
			ddspan := &DatadogTraceSpan{
				TraceID:  span.Trace.TraceId,
				SpanID:   span.Trace.Id,
				ParentID: parentID,
				Service:  span.Service,
				Name:     span.Name,
				Resource: resource,
				Start:    span.Timestamp,
				Duration: span.Trace.Duration,
				// TODO don't hardcode
				Type:    "http",
				Error:   int64(span.Status),
				Metrics: metrics,
				Meta:    tags,
			}
			finalTraces = append(finalTraces, ddspan)
		}
	})
	if len(finalTraces) != 0 {
		// this endpoint is not documented to take an array... but it does
		// another curious constraint of this endpoint is that it does not
		// support "Content-Encoding: deflate"
		err := s.postHelper(span.Attach(ctx), fmt.Sprintf("%s/spans", s.DDTraceAddress), finalTraces, "flush_traces", false)
		if err == nil {
			log.WithField("traces", len(finalTraces)).Info("Completed flushing traces to Datadog")
		} else {
			log.WithFields(
				logrus.Fields{
					"traces":        len(finalTraces),
					logrus.ErrorKey: err}).Error("Error flushing traces to Datadog")
		}
	} else {
		log.Info("No traces to flush, skipping.")
	}
}
// flushEventsChecks drains the event worker's queued events and service
// checks, fills in the server's default hostname and tags where missing,
// and POSTs each batch to its respective (largely undocumented) endpoint.
// Failures are counted/logged inside postHelper; this function only logs
// the success cases.
func (s *Server) flushEventsChecks() {
	events, checks := s.EventWorker.Flush()
	s.statsd.Count("worker.events_flushed_total", int64(len(events)), nil, 1.0)
	s.statsd.Count("worker.checks_flushed_total", int64(len(checks)), nil, 1.0)
	// fill in the default hostname for packets that didn't set it
	for i := range events {
		if events[i].Hostname == "" {
			events[i].Hostname = s.Hostname
		}
		events[i].Tags = append(events[i].Tags, s.Tags...)
	}
	// same defaulting for service checks
	for i := range checks {
		if checks[i].Hostname == "" {
			checks[i].Hostname = s.Hostname
		}
		checks[i].Tags = append(checks[i].Tags, s.Tags...)
	}
	if len(events) != 0 {
		// this endpoint is not documented at all, its existence is only known from
		// the official dd-agent
		// we don't actually pass all the body keys that dd-agent passes here... but
		// it still works
		err := s.postHelper(context.TODO(), fmt.Sprintf("%s/intake?api_key=%s", s.DDHostname, s.DDAPIKey), map[string]map[string][]samplers.UDPEvent{
			"events": {
				"api": events,
			},
		}, "flush_events", true)
		if err == nil {
			log.WithField("events", len(events)).Info("Completed flushing events to Datadog")
		}
	}
	if len(checks) != 0 {
		// this endpoint is not documented to take an array... but it does
		// another curious constraint of this endpoint is that it does not
		// support "Content-Encoding: deflate"
		err := s.postHelper(context.TODO(), fmt.Sprintf("%s/api/v1/check_run?api_key=%s", s.DDHostname, s.DDAPIKey), checks, "flush_checks", false)
		if err == nil {
			log.WithField("checks", len(checks)).Info("Completed flushing service checks to Datadog")
		}
	}
}
// postHelper is shared code for POSTing to an endpoint, that consumes JSON,
// that is zlib-compressed, that returns 200/202 on success, and that has a
// small response.
// action is a string used for statsd metric names and log messages emitted
// from this function - probably a static string for each callsite.
// you can disable compression with compress=false for endpoints that don't
// support it.
// It returns nil only when the POST actually succeeded.
func (s *Server) postHelper(ctx context.Context, endpoint string, bodyObject interface{}, action string, compress bool) error {
	span, _ := trace.StartSpanFromContext(ctx, action, trace.NameTag("veneur.opentracing.flush.postHelper"))
	defer span.Finish()
	// attach this field to all the logs we generate
	innerLogger := log.WithField("action", action)

	marshalStart := time.Now()
	var (
		bodyBuffer bytes.Buffer
		encoder    *json.Encoder
		compressor *zlib.Writer
	)
	if compress {
		compressor = zlib.NewWriter(&bodyBuffer)
		encoder = json.NewEncoder(compressor)
	} else {
		encoder = json.NewEncoder(&bodyBuffer)
	}
	if err := encoder.Encode(bodyObject); err != nil {
		s.statsd.Count(action+".error_total", 1, []string{"cause:json"}, 1.0)
		innerLogger.WithError(err).Error("Could not render JSON")
		return err
	}
	if compress {
		// don't forget to flush leftover compressed bytes to the buffer
		if err := compressor.Close(); err != nil {
			s.statsd.Count(action+".error_total", 1, []string{"cause:compress"}, 1.0)
			innerLogger.WithError(err).Error("Could not finalize compression")
			return err
		}
	}
	s.statsd.TimeInMilliseconds(action+".duration_ns", float64(time.Since(marshalStart).Nanoseconds()), []string{"part:json"}, 1.0)

	// Len reports the unread length, so we have to record this before the
	// http client consumes it
	bodyLength := bodyBuffer.Len()
	s.statsd.Histogram(action+".content_length_bytes", float64(bodyLength), nil, 1.0)

	req, err := http.NewRequest(http.MethodPost, endpoint, &bodyBuffer)
	if err != nil {
		s.statsd.Count(action+".error_total", 1, []string{"cause:construct"}, 1.0)
		innerLogger.WithError(err).Error("Could not construct request")
		return err
	}
	req.Header.Set("Content-Type", "application/json")
	if compress {
		req.Header.Set("Content-Encoding", "deflate")
	}
	// we only make http requests at flush time, so keepalive is not a big win
	req.Close = true

	// tracing-header injection is best-effort; the POST proceeds regardless
	err = tracer.InjectRequest(span.Trace, req)
	if err != nil {
		s.statsd.Count("veneur.opentracing.flush.inject.errors", 1, nil, 1.0)
		innerLogger.WithError(err).Error("Error injecting header")
	}

	requestStart := time.Now()
	resp, err := s.HTTPClient.Do(req)
	if err != nil {
		if urlErr, ok := err.(*url.Error); ok {
			// if the error has the url in it, then retrieve the inner error
			// and ditch the url (which might contain secrets)
			err = urlErr.Err
		}
		s.statsd.Count(action+".error_total", 1, []string{"cause:io"}, 1.0)
		innerLogger.WithError(err).Error("Could not execute request")
		return err
	}
	s.statsd.TimeInMilliseconds(action+".duration_ns", float64(time.Since(requestStart).Nanoseconds()), []string{"part:post"}, 1.0)
	defer resp.Body.Close()

	responseBody, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		// this error is not fatal, since we only need the body for reporting
		// purposes
		s.statsd.Count(action+".error_total", 1, []string{"cause:readresponse"}, 1.0)
		innerLogger.WithError(err).Error("Could not read response body")
	}
	resultLogger := innerLogger.WithFields(logrus.Fields{
		"request_length":   bodyLength,
		"request_headers":  req.Header,
		"status":           resp.Status,
		"response_headers": resp.Header,
		"response":         string(responseBody),
	})

	if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusAccepted {
		s.statsd.Count(action+".error_total", 1, []string{fmt.Sprintf("cause:%d", resp.StatusCode)}, 1.0)
		resultLogger.Error("Could not POST")
		// BUG FIX: this used to `return err`, which is nil here whenever
		// reading the response body succeeded, so callers treated a rejected
		// POST as success. Return an explicit error instead; resp.Status
		// never contains the endpoint, so no secrets leak.
		return fmt.Errorf("unexpected response status %q", resp.Status)
	}

	// make sure the error metric isn't sparse
	s.statsd.Count(action+".error_total", 0, nil, 1.0)
	resultLogger.Debug("POSTed successfully")
	return nil
}