Skip to content

Commit

Permalink
Register/unregister in the fixed order (#1198)
Browse files Browse the repository at this point in the history
- change the processors' map to array
- increase test coverage

Signed-off-by: Hui Kang <kangh@us.ibm.com>
Co-authored-by: Tyler Yahn <MrAlias@users.noreply.github.com>

Co-authored-by: Tyler Yahn <MrAlias@users.noreply.github.com>
  • Loading branch information
huikang and MrAlias committed Sep 24, 2020
1 parent 559fecd commit b97533a
Show file tree
Hide file tree
Showing 6 changed files with 83 additions and 42 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -65,6 +65,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- Rename `NewProvider` to `NewTracerProvider` in the `go.opentelemetry.io/otel/sdk/trace` package. (#1190)
- Renamed `SamplingDecision` values to comply with OpenTelemetry specification change. (#1192)
- Renamed Zipkin attribute names from `ot.status_code & ot.status_description` to `otel.status_code & otel.status_description`. (#1201)
- The default SDK now invokes registered `SpanProcessor`s in the order they were registered with the `TracerProvider`. (#1195)

### Removed

Expand Down
43 changes: 30 additions & 13 deletions sdk/trace/provider.go
Expand Up @@ -109,32 +109,49 @@ func (p *TracerProvider) Tracer(name string, opts ...apitrace.TracerOption) apit
func (p *TracerProvider) RegisterSpanProcessor(s SpanProcessor) {
p.mu.Lock()
defer p.mu.Unlock()
new := make(spanProcessorMap)
if old, ok := p.spanProcessors.Load().(spanProcessorMap); ok {
for k, v := range old {
new[k] = v
}
new := spanProcessorStates{}
if old, ok := p.spanProcessors.Load().(spanProcessorStates); ok {
new = append(new, old...)
}
newSpanSync := &spanProcessorState{
sp: s,
state: &sync.Once{},
}
new[s] = &sync.Once{}
new = append(new, newSpanSync)
p.spanProcessors.Store(new)
}

// UnregisterSpanProcessor removes the given SpanProcessor from the list of SpanProcessors
func (p *TracerProvider) UnregisterSpanProcessor(s SpanProcessor) {
p.mu.Lock()
defer p.mu.Unlock()
new := make(spanProcessorMap)
if old, ok := p.spanProcessors.Load().(spanProcessorMap); ok {
for k, v := range old {
new[k] = v
new := spanProcessorStates{}
old, ok := p.spanProcessors.Load().(spanProcessorStates)
if !ok || len(old) == 0 {
return
}
new = append(new, old...)

// stop the span processor if it is started and remove it from the list
var stopOnce *spanProcessorState
var idx int
for i, sps := range new {
if sps.sp == s {
stopOnce = sps
idx = i
}
}
if stopOnce, ok := new[s]; ok && stopOnce != nil {
stopOnce.Do(func() {
if stopOnce != nil {
stopOnce.state.Do(func() {
s.Shutdown()
})
}
delete(new, s)
if len(new) > 1 {
copy(new[idx:], new[idx+1:])
}
new[len(new)-1] = nil
new = new[:len(new)-1]

p.spanProcessors.Store(new)
}

Expand Down
8 changes: 4 additions & 4 deletions sdk/trace/span.go
Expand Up @@ -139,17 +139,17 @@ func (s *span) End(options ...apitrace.SpanOption) {
}
config := apitrace.NewSpanConfig(options...)
s.endOnce.Do(func() {
sps, _ := s.tracer.provider.spanProcessors.Load().(spanProcessorMap)
mustExportOrProcess := len(sps) > 0
sps, ok := s.tracer.provider.spanProcessors.Load().(spanProcessorStates)
mustExportOrProcess := ok && len(sps) > 0
if mustExportOrProcess {
sd := s.makeSpanData()
if config.Timestamp.IsZero() {
sd.EndTime = internal.MonotonicEndTime(sd.StartTime)
} else {
sd.EndTime = config.Timestamp
}
for sp := range sps {
sp.OnEnd(sd)
for _, sp := range sps {
sp.sp.OnEnd(sd)
}
}
})
Expand Down
6 changes: 5 additions & 1 deletion sdk/trace/span_processor.go
Expand Up @@ -43,4 +43,8 @@ type SpanProcessor interface {
ForceFlush()
}

type spanProcessorMap map[SpanProcessor]*sync.Once
type spanProcessorState struct {
sp SpanProcessor
state *sync.Once
}
type spanProcessorStates []*spanProcessorState
61 changes: 40 additions & 21 deletions sdk/trace/span_processor_test.go
Expand Up @@ -45,47 +45,66 @@ func (t *testSpanProcesor) ForceFlush() {
func TestRegisterSpanProcessort(t *testing.T) {
name := "Register span processor before span starts"
tp := basicTracerProvider(t)
sp := NewTestSpanProcessor()
tp.RegisterSpanProcessor(sp)
sps := []*testSpanProcesor{
NewTestSpanProcessor(),
NewTestSpanProcessor(),
}

for _, sp := range sps {
tp.RegisterSpanProcessor(sp)
}

tr := tp.Tracer("SpanProcessor")
_, span := tr.Start(context.Background(), "OnStart")
span.End()
wantCount := 1
gotCount := len(sp.spansStarted)
if gotCount != wantCount {
t.Errorf("%s: started count: got %d, want %d\n", name, gotCount, wantCount)
}
gotCount = len(sp.spansEnded)
if gotCount != wantCount {
t.Errorf("%s: ended count: got %d, want %d\n", name, gotCount, wantCount)

for _, sp := range sps {
gotCount := len(sp.spansStarted)
if gotCount != wantCount {
t.Errorf("%s: started count: got %d, want %d\n", name, gotCount, wantCount)
}
gotCount = len(sp.spansEnded)
if gotCount != wantCount {
t.Errorf("%s: ended count: got %d, want %d\n", name, gotCount, wantCount)
}
}
}

func TestUnregisterSpanProcessor(t *testing.T) {
name := "Start span after unregistering span processor"
tp := basicTracerProvider(t)
sp := NewTestSpanProcessor()
tp.RegisterSpanProcessor(sp)
sps := []*testSpanProcesor{
NewTestSpanProcessor(),
NewTestSpanProcessor(),
}

for _, sp := range sps {
tp.RegisterSpanProcessor(sp)
}

tr := tp.Tracer("SpanProcessor")
_, span := tr.Start(context.Background(), "OnStart")
span.End()
tp.UnregisterSpanProcessor(sp)
for _, sp := range sps {
tp.UnregisterSpanProcessor(sp)
}

// start another span after unregistering span processor.
_, span = tr.Start(context.Background(), "Start span after unregister")
span.End()

wantCount := 1
gotCount := len(sp.spansStarted)
if gotCount != wantCount {
t.Errorf("%s: started count: got %d, want %d\n", name, gotCount, wantCount)
}

gotCount = len(sp.spansEnded)
if gotCount != wantCount {
t.Errorf("%s: ended count: got %d, want %d\n", name, gotCount, wantCount)
for _, sp := range sps {
wantCount := 1
gotCount := len(sp.spansStarted)
if gotCount != wantCount {
t.Errorf("%s: started count: got %d, want %d\n", name, gotCount, wantCount)
}

gotCount = len(sp.spansEnded)
if gotCount != wantCount {
t.Errorf("%s: ended count: got %d, want %d\n", name, gotCount, wantCount)
}
}
}

Expand Down
6 changes: 3 additions & 3 deletions sdk/trace/tracer.go
Expand Up @@ -58,9 +58,9 @@ func (tr *tracer) Start(ctx context.Context, name string, options ...apitrace.Sp
span.tracer = tr

if span.IsRecording() {
sps, _ := tr.provider.spanProcessors.Load().(spanProcessorMap)
for sp := range sps {
sp.OnStart(span.data)
sps, _ := tr.provider.spanProcessors.Load().(spanProcessorStates)
for _, sp := range sps {
sp.sp.OnStart(span.data)
}
}

Expand Down

0 comments on commit b97533a

Please sign in to comment.