Skip to content

Commit

Permalink
go back to otel-go attribute.Set
Browse files Browse the repository at this point in the history
  • Loading branch information
jmacd committed Apr 5, 2023
1 parent 12528d1 commit de35032
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 35 deletions.
10 changes: 8 additions & 2 deletions processor/batchprocessor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,13 @@ regardless of size.
It must be greater than or equal to `send_batch_size`.
- `metadata_keys` (default = empty): When set, this processor will
create one batcher instance per distinct combination of values in
the `client.Metadata`. See notes about metadata batching below.
the `client.Metadata`.
- `metadata_cardinality_limit` (default = 100): When `metadata_keys` is
not empty, this setting limits the number of unique combinations of
metadata key values that will be processed over the lifetime of the
process.

See notes about metadata batching below.

Examples:

Expand All @@ -50,7 +56,7 @@ examples on using the processor.
Batching by metadata enables support for multi-tenant OpenTelemetry
Collector pipelines with batching over groups of data having the same
authorization metadata. For example
authorization metadata. For example:
```yaml
processors:
Expand Down
36 changes: 7 additions & 29 deletions processor/batchprocessor/batch_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.opentelemetry.io/collector/processor"
"go.opentelemetry.io/otel/attribute"
)

// errTooManyBatchers is returned when the MetadataCardinalityLimit has been reached.
Expand Down Expand Up @@ -84,7 +85,7 @@ type batchProcessor struct {
singleton *batcher

lock sync.Mutex
batchers map[attributeSet]*batcher
batchers map[attribute.Set]*batcher
}

// batcher is a single instance of the batcher logic. When metadata
Expand Down Expand Up @@ -169,29 +170,6 @@ func (bp *batchProcessor) newBatcher(md map[string][]string) *batcher {
return b
}

// newAttributeSet is like otel-go's attribute.NewSet(attrs...).
func newAttributeSet(attrs ...attributeKeyValue) attributeSet {
// Note: Key uniqueness is ensured in (Config).Validate()
// and sorted order is ensured in newBatchProcessor().
var aset attributeSet
for _, attr := range attrs {
aset = attributeSet(fmt.Sprint(aset, ";", attr))
}
return aset
}

// attributeString is like otel-go's attribute.String, for input
// to newAttributeSet.
func attributeString(k string, v string) attributeKeyValue {
return attributeKeyValue(fmt.Sprint(k, "=", v))
}

// attributeStringSlice is like otel-go's attribute.StringSlice, for
// input to newAttributeSet.
func attributeStringSlice(k string, v []string) attributeKeyValue {
return attributeKeyValue(fmt.Sprint(k, "∈", strings.Join(v, ",")))
}

func (bp *batchProcessor) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: true}
}
Expand All @@ -202,7 +180,7 @@ func (bp *batchProcessor) Start(context.Context, component.Host) error {
if len(bp.metadataKeys) == 0 {
bp.singleton = bp.newBatcher(nil)
} else {
bp.batchers = map[attributeSet]*batcher{}
bp.batchers = map[attribute.Set]*batcher{}
}
return nil
}
Expand Down Expand Up @@ -298,20 +276,20 @@ func (bp *batchProcessor) findBatcher(ctx context.Context) (*batcher, error) {
// attribute set for use as a map lookup key.
info := client.FromContext(ctx)
md := map[string][]string{}
var attrs []attributeKeyValue
var attrs []attribute.KeyValue
for _, k := range bp.metadataKeys {
// Lookup the value in the incoming metadata, copy it
// into the outgoing metadata, and create a unique
// value for the attributeSet.
vs := info.Metadata.Get(k)
md[k] = vs
if len(vs) == 1 {
attrs = append(attrs, attributeString(k, vs[0]))
attrs = append(attrs, attribute.String(k, vs[0]))
} else {
attrs = append(attrs, attributeStringSlice(k, vs))
attrs = append(attrs, attribute.StringSlice(k, vs))
}
}
aset := newAttributeSet(attrs...)
aset := attribute.NewSet(attrs...)

bp.lock.Lock()
defer bp.lock.Unlock()
Expand Down
4 changes: 0 additions & 4 deletions processor/batchprocessor/batch_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -944,10 +944,6 @@ func TestBatchProcessorDuplicateMetadataKeys(t *testing.T) {
require.Contains(t, err.Error(), "mytoken")
}

func TestAttributeEmptyStringUniqueness(t *testing.T) {
require.NotEqual(t, attributeStringSlice("key", nil), attributeString("key", ""))
}

func TestBatchProcessorMetadataCardinalityLimit(t *testing.T) {
const cardLimit = 10

Expand Down

0 comments on commit de35032

Please sign in to comment.