Skip to content

Commit

Permalink
Adding locks where context is accessed
Browse files Browse the repository at this point in the history
This commit addresses data race issues where span.context is accessed without locks in methods which can be called concurrently with method setBaggageItem which modifies context

Per issue #526

Signed-off-by: Keerthana Selvakumar <keerukeerthana8@gmail.com>
  • Loading branch information
keer25 authored and kselvaku committed Aug 26, 2020
1 parent cf8927b commit b138fa1
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 21 deletions.
38 changes: 20 additions & 18 deletions span.go
Expand Up @@ -99,6 +99,9 @@ func (s *Span) SetTag(key string, value interface{}) opentracing.Span {
return s.setTagInternal(key, value, true)
}

// setTagInternal sets tags in a thread-safe manner if lock param is set to true.
// lock param can be set to false if concurrent access in not expected like when span is created.
// The caller shouldn't obtain any lock on the span while calling this as it will lead to a deadlock.
func (s *Span) setTagInternal(key string, value interface{}, lock bool) opentracing.Span {
s.observer.OnSetTag(key, value)
if key == string(ext.SamplingPriority) && !setSamplingPriority(s, value) {
Expand All @@ -120,8 +123,8 @@ func (s *Span) setTagInternal(key string, value interface{}, lock bool) opentrac

// SpanContext returns span context
func (s *Span) SpanContext() SpanContext {
s.Lock()
defer s.Unlock()
s.RLock()
defer s.RUnlock()
return s.context
}

Expand Down Expand Up @@ -345,7 +348,7 @@ func (s *Span) FinishWithOptions(options opentracing.FinishOptions) {
decision := s.tracer.sampler.OnFinishSpan(s)
s.applySamplingDecision(decision, true)
}
if s.context.IsSampled() {
if s.SpanContext().IsSampled() {
s.Lock()
s.fixLogsIfDropped()
if len(options.LogRecords) > 0 || len(options.BulkLogData) > 0 {
Expand All @@ -366,8 +369,8 @@ func (s *Span) FinishWithOptions(options opentracing.FinishOptions) {

// Context implements opentracing.Span API
func (s *Span) Context() opentracing.SpanContext {
s.Lock()
defer s.Unlock()
s.RLock()
defer s.RUnlock()
return s.context
}

Expand Down Expand Up @@ -427,10 +430,10 @@ func (s *Span) serviceName() string {

func (s *Span) applySamplingDecision(decision SamplingDecision, lock bool) {
if !decision.Retryable {
s.context.samplingState.setFinal()
s.SpanContext().samplingState.setFinal()
}
if decision.Sample {
s.context.samplingState.setSampled()
s.SpanContext().samplingState.setSampled()
if len(decision.Tags) > 0 {
if lock {
s.Lock()
Expand All @@ -445,12 +448,12 @@ func (s *Span) applySamplingDecision(decision SamplingDecision, lock bool) {

// Span can be written to if it is sampled or the sampling decision has not been finalized.
func (s *Span) isWriteable() bool {
state := s.context.samplingState
state := s.SpanContext().samplingState
return !state.isFinal() || state.isSampled()
}

func (s *Span) isSamplingFinalized() bool {
return s.context.samplingState.isFinal()
return s.SpanContext().samplingState.isFinal()
}

// setSamplingPriority returns true if the flag was updated successfully, false otherwise.
Expand All @@ -469,26 +472,25 @@ func setSamplingPriority(s *Span, value interface{}) bool {
if !ok {
return false
}
ctx := s.SpanContext()
if val == 0 {
s.context.samplingState.unsetSampled()
s.context.samplingState.setFinal()
ctx.samplingState.unsetSampled()
ctx.samplingState.setFinal()
return true
}
if s.tracer.options.noDebugFlagOnForcedSampling {
s.context.samplingState.setSampled()
s.context.samplingState.setFinal()
ctx.samplingState.setSampled()
ctx.samplingState.setFinal()
return true
} else if s.tracer.isDebugAllowed(s.operationName) {
s.context.samplingState.setDebugAndSampled()
s.context.samplingState.setFinal()
ctx.samplingState.setDebugAndSampled()
ctx.samplingState.setFinal()
return true
}
return false
}

// EnableFirehose enables firehose flag on the span context
func EnableFirehose(s *Span) {
s.Lock()
defer s.Unlock()
s.context.samplingState.setFirehose()
s.SpanContext().samplingState.setFirehose()
}
11 changes: 10 additions & 1 deletion span_test.go
Expand Up @@ -363,7 +363,6 @@ func TestSpan_References(t *testing.T) {
}

func TestSpanContextRaces(t *testing.T) {
t.Skip("Skipped: test will panic with -race, see https://github.com/jaegertracing/jaeger-client-go/issues/526")
tracer, closer := NewTracer("test", NewConstSampler(true), NewNullReporter())
defer closer.Close()

Expand Down Expand Up @@ -395,6 +394,16 @@ func TestSpanContextRaces(t *testing.T) {
go accessor(func() {
span.BaggageItem("k")
})
go accessor(func() {
ext.SamplingPriority.Set(span, 0)
})
go accessor(func() {
EnableFirehose(span)
})
go accessor(func() {
span.SpanContext().samplingState.setFlag(flagDebug)
})
time.Sleep(100 * time.Millisecond)
span.Finish()
close(end)
}
4 changes: 2 additions & 2 deletions tracer.go
Expand Up @@ -439,7 +439,7 @@ func (t *Tracer) emitNewSpanMetrics(sp *Span, newTrace bool) {
func (t *Tracer) reportSpan(sp *Span) {
if !sp.isSamplingFinalized() {
t.metrics.SpansFinishedDelayedSampling.Inc(1)
} else if sp.context.IsSampled() {
} else if sp.SpanContext().IsSampled() {
t.metrics.SpansFinishedSampled.Inc(1)
} else {
t.metrics.SpansFinishedNotSampled.Inc(1)
Expand All @@ -448,7 +448,7 @@ func (t *Tracer) reportSpan(sp *Span) {
// Note: if the reporter is processing Span asynchronously then it needs to Retain() the span,
// and then Release() it when no longer needed.
// Otherwise, the span may be reused for another trace and its data may be overwritten.
if sp.context.IsSampled() {
if sp.SpanContext().IsSampled() {
t.reporter.Report(sp)
}

Expand Down

0 comments on commit b138fa1

Please sign in to comment.