From 80f39bc702f5dded1b3465bf3b37692f7c719343 Mon Sep 17 00:00:00 2001 From: Aubrey Tull Date: Fri, 13 Jul 2018 12:54:47 -0700 Subject: [PATCH] Passing trace context to signalfx --- flusher.go | 2 ++ proxy.go | 14 +++++++++----- sinks/signalfx/signalfx.go | 11 ++++++----- 3 files changed, 17 insertions(+), 10 deletions(-) diff --git a/flusher.go b/flusher.go index 33e91664c..05103e6c0 100644 --- a/flusher.go +++ b/flusher.go @@ -294,6 +294,8 @@ func (s *Server) flushForward(ctx context.Context, wms []WorkerMetrics) { defer span.ClientFinish(s.TraceClient) jmLength := 0 for _, wm := range wms { + jmLength += len(wm.globalCounters) + jmLength += len(wm.globalGauges) jmLength += len(wm.histograms) jmLength += len(wm.sets) jmLength += len(wm.timers) diff --git a/proxy.go b/proxy.go index 7bd281ae4..44fc8ddc7 100644 --- a/proxy.go +++ b/proxy.go @@ -360,14 +360,18 @@ func (p *Proxy) RefreshDestinations(serviceName string, ring *consistent.Consist return } - // At the last moment, lock the mutex and defer so we unlock after setting. - // We do this after we've fetched info so we don't hold the lock during long - // queries, timeouts or errors. The flusher can lock the mutex and prevent us - // from updating at the same time. + updateRing(destinations, ring, mtx) + samples.Add(ssf.Gauge("discoverer.destination_number", float32(len(destinations)), srvTags)) +} + +// updateRing lock the mutex and defer so we unlock after setting. +// We do this after we've fetched info so we don't hold the lock during long +// queries, timeouts or errors. The flusher can lock the mutex and prevent us +// from updating at the same time. +func updateRing(destinations []string, ring *consistent.Consistent, mtx *sync.Mutex) { mtx.Lock() defer mtx.Unlock() ring.Set(destinations) - samples.Add(ssf.Gauge("discoverer.destination_number", float32(len(destinations)), srvTags)) } // Handler returns the Handler responsible for routing request processing. diff --git a/sinks/signalfx/signalfx.go b/sinks/signalfx/signalfx.go index 5ae610df9..eb5256175 100644 --- a/sinks/signalfx/signalfx.go +++ b/sinks/signalfx/signalfx.go @@ -47,15 +47,16 @@ func (c *collection) submit(ctx context.Context, cl *trace.Client) error { errorCh := make(chan error, len(c.pointsByKey)+1) submitOne := func(client dpsink.Sink, points []*datapoint.Datapoint) { - span, ctx := trace.StartSpanFromContext(ctx, "") + span, childCtx := trace.StartSpanFromContext(ctx, "") + span.SetTag("datapoint_count", len(points)) defer span.ClientFinish(cl) - err := client.AddDatapoints(ctx, points) + defer wg.Done() + err := client.AddDatapoints(childCtx, points) if err != nil { span.Error(err) span.Add(ssf.Count("flush.error_total", 1, map[string]string{"cause": "io", "sink": "signalfx"})) errorCh <- err } - wg.Done() } wg.Add(1) @@ -152,7 +153,7 @@ func (sfx *SignalFxSink) newPointCollection() *collection { // Flush sends metrics to SignalFx func (sfx *SignalFxSink) Flush(ctx context.Context, interMetrics []samplers.InterMetric) error { - span, ctx := trace.StartSpanFromContext(ctx, "") + span, subCtx := trace.StartSpanFromContext(ctx, "") defer span.ClientFinish(sfx.traceClient) flushStart := time.Now() @@ -211,7 +212,7 @@ func (sfx *SignalFxSink) Flush(ctx context.Context, interMetrics []samplers.Inte } tags := map[string]string{"sink": "signalfx"} span.Add(ssf.Count(sinks.MetricKeyTotalMetricsSkipped, float32(countSkipped), tags)) - err := coll.submit(ctx, sfx.traceClient) + err := coll.submit(subCtx, sfx.traceClient) if err != nil { span.Error(err) }