Skip to content

Commit

Permalink
Log warning when a trace attribute/event/link is discarded due to lim…
Browse files Browse the repository at this point in the history
…its (#5434)

Fix #5343

- Update the `evictionQueue` to log when it drops a value
- Update the `evictionQueue` to be declared over an `[T any]` parameter
so it knows what to log when it is dropping a value and to reduce the
`interface{}` allocation
- Add a `clone` method to replace the now unneeded
`interfaceArrayTo*Array` functions.
- Update the `recordingSpan` to log once that is dropped an attribute
when limits are reached.
  • Loading branch information
MrAlias committed May 30, 2024
1 parent fad23ee commit 5bfa9c5
Show file tree
Hide file tree
Showing 6 changed files with 95 additions and 42 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- Log a warning to the OpenTelemetry internal logger when a `Record` in `go.opentelemetry.io/otel/sdk/log` drops an attribute due to a limit being reached. (#5376)
- Identify the `Tracer` returned from the global `TracerProvider` in `go.opentelemetry.io/otel/global` with its schema URL. (#5426)
- Identify the `Meter` returned from the global `MeterProvider` in `go.opentelemetry.io/otel/global` with its schema URL. (#5426)
- Log a warning to the OpenTelemetry internal logger when a `Span` in `go.opentelemetry.io/otel/sdk/trace` drops an attribute, event, or link due to a limit being reached. (#5434)

## [1.27.0/0.49.0/0.3.0] 2024-05-21

Expand Down
31 changes: 26 additions & 5 deletions sdk/trace/evictedqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,38 @@

package trace // import "go.opentelemetry.io/otel/sdk/trace"

import (
"fmt"
"slices"
"sync"

"go.opentelemetry.io/otel/internal/global"
)

// evictedQueue is a FIFO queue with a configurable capacity.
type evictedQueue struct {
queue []interface{}
type evictedQueue[T any] struct {
queue []T
capacity int
droppedCount int
logDropped func()
}

func newEvictedQueue(capacity int) evictedQueue {
func newEvictedQueue[T any](capacity int) evictedQueue[T] {
var tVal T
msg := fmt.Sprintf("limit reached: dropping trace %T", tVal)
// Do not pre-allocate queue, do this lazily.
return evictedQueue{capacity: capacity}
return evictedQueue[T]{
capacity: capacity,
logDropped: sync.OnceFunc(func() { global.Warn(msg) }),
}
}

// add adds value to the evictedQueue eq. If eq is at capacity, the oldest
// queued value will be discarded and the drop count incremented.
func (eq *evictedQueue) add(value interface{}) {
func (eq *evictedQueue[T]) add(value T) {
if eq.capacity == 0 {
eq.droppedCount++
eq.logDropped()
return
}

Expand All @@ -28,6 +43,12 @@ func (eq *evictedQueue) add(value interface{}) {
copy(eq.queue[:eq.capacity-1], eq.queue[1:])
eq.queue = eq.queue[:eq.capacity-1]
eq.droppedCount++
eq.logDropped()
}
eq.queue = append(eq.queue, value)
}

// copy returns a copy of the evictedQueue.
func (eq *evictedQueue[T]) copy() []T {
return slices.Clone(eq.queue)
}
31 changes: 22 additions & 9 deletions sdk/trace/evictedqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,34 +6,47 @@ package trace
import (
"reflect"
"testing"

"github.com/stretchr/testify/assert"
)

func init() {
}

func TestAdd(t *testing.T) {
q := newEvictedQueue(3)
q := newEvictedQueue[string](3)
q.add("value1")
q.add("value2")
if wantLen, gotLen := 2, len(q.queue); wantLen != gotLen {
t.Errorf("got queue length %d want %d", gotLen, wantLen)
}
}

func (eq *evictedQueue) queueToArray() []string {
arr := make([]string, 0)
for _, value := range eq.queue {
arr = append(arr, value.(string))
}
return arr
func TestCopy(t *testing.T) {
q := newEvictedQueue[string](3)
q.add("value1")
cp := q.copy()

q.add("value2")
assert.Equal(t, []string{"value1"}, cp, "queue update modified copy")

cp[0] = "value0"
assert.Equal(t, "value1", q.queue[0], "copy update modified queue")
}

func TestDropCount(t *testing.T) {
q := newEvictedQueue(3)
q := newEvictedQueue[string](3)
var called bool
q.logDropped = func() { called = true }

q.add("value1")
assert.False(t, called, `"value1" logged as dropped`)
q.add("value2")
assert.False(t, called, `"value2" logged as dropped`)
q.add("value3")
assert.False(t, called, `"value3" logged as dropped`)
q.add("value1")
assert.True(t, called, `"value2" not logged as dropped`)
q.add("value4")
if wantLen, gotLen := 3, len(q.queue); wantLen != gotLen {
t.Errorf("got queue length %d want %d", gotLen, wantLen)
Expand All @@ -42,7 +55,7 @@ func TestDropCount(t *testing.T) {
t.Errorf("got drop count %d want %d", gotDropCount, wantDropCount)
}
wantArr := []string{"value3", "value1", "value4"}
gotArr := q.queueToArray()
gotArr := q.copy()

if wantLen, gotLen := len(wantArr), len(gotArr); gotLen != wantLen {
t.Errorf("got array len %d want %d", gotLen, wantLen)
Expand Down
54 changes: 28 additions & 26 deletions sdk/trace/span.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/internal/global"
"go.opentelemetry.io/otel/sdk/instrumentation"
"go.opentelemetry.io/otel/sdk/internal"
"go.opentelemetry.io/otel/sdk/resource"
Expand Down Expand Up @@ -137,12 +138,13 @@ type recordingSpan struct {
// ReadOnlySpan exported when the span ends.
attributes []attribute.KeyValue
droppedAttributes int
logDropAttrsOnce sync.Once

// events are stored in FIFO queue capped by configured limit.
events evictedQueue
events evictedQueue[Event]

// links are stored in FIFO queue capped by configured limit.
links evictedQueue
links evictedQueue[Link]

// executionTracerTaskEnd ends the execution tracer span.
executionTracerTaskEnd func()
Expand Down Expand Up @@ -219,7 +221,7 @@ func (s *recordingSpan) SetAttributes(attributes ...attribute.KeyValue) {
limit := s.tracer.provider.spanLimits.AttributeCountLimit
if limit == 0 {
// No attributes allowed.
s.droppedAttributes += len(attributes)
s.addDroppedAttr(len(attributes))
return
}

Expand All @@ -236,14 +238,30 @@ func (s *recordingSpan) SetAttributes(attributes ...attribute.KeyValue) {
for _, a := range attributes {
if !a.Valid() {
// Drop all invalid attributes.
s.droppedAttributes++
s.addDroppedAttr(1)
continue
}
a = truncateAttr(s.tracer.provider.spanLimits.AttributeValueLengthLimit, a)
s.attributes = append(s.attributes, a)
}
}

// Declared as a var so tests can override.
var logDropAttrs = func() {
global.Warn("limit reached: dropping trace Span attributes")
}

// addDroppedAttr adds incr to the count of dropped attributes.
//
// The first, and only the first, time this method is called a warning will be
// logged.
//
// This method assumes s.mu.Lock is held by the caller.
func (s *recordingSpan) addDroppedAttr(incr int) {
s.droppedAttributes += incr
s.logDropAttrsOnce.Do(logDropAttrs)
}

// addOverCapAttrs adds the attributes attrs to the span s while
// de-duplicating the attributes of s and attrs and dropping attributes that
// exceed the limit.
Expand Down Expand Up @@ -273,7 +291,7 @@ func (s *recordingSpan) addOverCapAttrs(limit int, attrs []attribute.KeyValue) {
for _, a := range attrs {
if !a.Valid() {
// Drop all invalid attributes.
s.droppedAttributes++
s.addDroppedAttr(1)
continue
}

Expand All @@ -286,7 +304,7 @@ func (s *recordingSpan) addOverCapAttrs(limit int, attrs []attribute.KeyValue) {
if len(s.attributes) >= limit {
// Do not just drop all of the remaining attributes, make sure
// updates are checked and performed.
s.droppedAttributes++
s.addDroppedAttr(1)
} else {
a = truncateAttr(s.tracer.provider.spanLimits.AttributeValueLengthLimit, a)
s.attributes = append(s.attributes, a)
Expand Down Expand Up @@ -585,7 +603,7 @@ func (s *recordingSpan) Links() []Link {
if len(s.links.queue) == 0 {
return []Link{}
}
return s.interfaceArrayToLinksArray()
return s.links.copy()
}

// Events returns the events of this span.
Expand All @@ -595,7 +613,7 @@ func (s *recordingSpan) Events() []Event {
if len(s.events.queue) == 0 {
return []Event{}
}
return s.interfaceArrayToEventArray()
return s.events.copy()
}

// Status returns the status of this span.
Expand Down Expand Up @@ -717,32 +735,16 @@ func (s *recordingSpan) snapshot() ReadOnlySpan {
}
sd.droppedAttributeCount = s.droppedAttributes
if len(s.events.queue) > 0 {
sd.events = s.interfaceArrayToEventArray()
sd.events = s.events.copy()
sd.droppedEventCount = s.events.droppedCount
}
if len(s.links.queue) > 0 {
sd.links = s.interfaceArrayToLinksArray()
sd.links = s.links.copy()
sd.droppedLinkCount = s.links.droppedCount
}
return &sd
}

func (s *recordingSpan) interfaceArrayToLinksArray() []Link {
linkArr := make([]Link, 0)
for _, value := range s.links.queue {
linkArr = append(linkArr, value.(Link))
}
return linkArr
}

func (s *recordingSpan) interfaceArrayToEventArray() []Event {
eventArr := make([]Event, 0)
for _, value := range s.events.queue {
eventArr = append(eventArr, value.(Event))
}
return eventArr
}

func (s *recordingSpan) addChild() {
if !s.IsRecording() {
return
Expand Down
16 changes: 16 additions & 0 deletions sdk/trace/span_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,22 @@ func TestTruncateAttr(t *testing.T) {
}
}

func TestLogDropAttrs(t *testing.T) {
orig := logDropAttrs
t.Cleanup(func() { logDropAttrs = orig })

var called bool
logDropAttrs = func() { called = true }

s := &recordingSpan{}
s.addDroppedAttr(1)
assert.True(t, called, "logDropAttrs not called")

called = false
s.addDroppedAttr(1)
assert.False(t, called, "logDropAttrs called multiple times for same Span")
}

func BenchmarkRecordingSpanSetAttributes(b *testing.B) {
var attrs []attribute.KeyValue
for i := 0; i < 100; i++ {
Expand Down
4 changes: 2 additions & 2 deletions sdk/trace/tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,8 @@ func (tr *tracer) newRecordingSpan(psc, sc trace.SpanContext, name string, sr Sa
spanKind: trace.ValidateSpanKind(config.SpanKind()),
name: name,
startTime: startTime,
events: newEvictedQueue(tr.provider.spanLimits.EventCountLimit),
links: newEvictedQueue(tr.provider.spanLimits.LinkCountLimit),
events: newEvictedQueue[Event](tr.provider.spanLimits.EventCountLimit),
links: newEvictedQueue[Link](tr.provider.spanLimits.LinkCountLimit),
tracer: tr,
}

Expand Down

0 comments on commit 5bfa9c5

Please sign in to comment.