Skip to content

Commit

Permalink
Passing trace context to signalfx
Browse files Browse the repository at this point in the history
  • Loading branch information
aubrey-stripe committed Jul 16, 2018
1 parent 8c63ff0 commit 80f39bc
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 10 deletions.
2 changes: 2 additions & 0 deletions flusher.go
Expand Up @@ -294,6 +294,8 @@ func (s *Server) flushForward(ctx context.Context, wms []WorkerMetrics) {
defer span.ClientFinish(s.TraceClient)
jmLength := 0
for _, wm := range wms {
jmLength += len(wm.globalCounters)
jmLength += len(wm.globalGauges)
jmLength += len(wm.histograms)
jmLength += len(wm.sets)
jmLength += len(wm.timers)
Expand Down
14 changes: 9 additions & 5 deletions proxy.go
Expand Up @@ -360,14 +360,18 @@ func (p *Proxy) RefreshDestinations(serviceName string, ring *consistent.Consist
return
}

// At the last moment, lock the mutex and defer so we unlock after setting.
// We do this after we've fetched info so we don't hold the lock during long
// queries, timeouts or errors. The flusher can lock the mutex and prevent us
// from updating at the same time.
updateRing(destinations, ring, mtx)
samples.Add(ssf.Gauge("discoverer.destination_number", float32(len(destinations)), srvTags))
}

// updateRing lock the mutex and defer so we unlock after setting.
// We do this after we've fetched info so we don't hold the lock during long
// queries, timeouts or errors. The flusher can lock the mutex and prevent us
// from updating at the same time.
func updateRing(destinations []string, ring *consistent.Consistent, mtx *sync.Mutex) {
mtx.Lock()
defer mtx.Unlock()
ring.Set(destinations)
samples.Add(ssf.Gauge("discoverer.destination_number", float32(len(destinations)), srvTags))
}

// Handler returns the Handler responsible for routing request processing.
Expand Down
11 changes: 6 additions & 5 deletions sinks/signalfx/signalfx.go
Expand Up @@ -47,15 +47,16 @@ func (c *collection) submit(ctx context.Context, cl *trace.Client) error {
errorCh := make(chan error, len(c.pointsByKey)+1)

submitOne := func(client dpsink.Sink, points []*datapoint.Datapoint) {
span, ctx := trace.StartSpanFromContext(ctx, "")
span, childCtx := trace.StartSpanFromContext(ctx, "")
span.SetTag("datapoint_count", len(points))
defer span.ClientFinish(cl)
err := client.AddDatapoints(ctx, points)
defer wg.Done()
err := client.AddDatapoints(childCtx, points)
if err != nil {
span.Error(err)
span.Add(ssf.Count("flush.error_total", 1, map[string]string{"cause": "io", "sink": "signalfx"}))
errorCh <- err
}
wg.Done()
}

wg.Add(1)
Expand Down Expand Up @@ -152,7 +153,7 @@ func (sfx *SignalFxSink) newPointCollection() *collection {

// Flush sends metrics to SignalFx
func (sfx *SignalFxSink) Flush(ctx context.Context, interMetrics []samplers.InterMetric) error {
span, ctx := trace.StartSpanFromContext(ctx, "")
span, subCtx := trace.StartSpanFromContext(ctx, "")
defer span.ClientFinish(sfx.traceClient)

flushStart := time.Now()
Expand Down Expand Up @@ -211,7 +212,7 @@ func (sfx *SignalFxSink) Flush(ctx context.Context, interMetrics []samplers.Inte
}
tags := map[string]string{"sink": "signalfx"}
span.Add(ssf.Count(sinks.MetricKeyTotalMetricsSkipped, float32(countSkipped), tags))
err := coll.submit(ctx, sfx.traceClient)
err := coll.submit(subCtx, sfx.traceClient)
if err != nil {
span.Error(err)
}
Expand Down

0 comments on commit 80f39bc

Please sign in to comment.