diff --git a/.chloggen/sharder.yaml b/.chloggen/sharder.yaml new file mode 100755 index 00000000000..a20fa89d4c4 --- /dev/null +++ b/.chloggen/sharder.yaml @@ -0,0 +1,11 @@ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: batchprocessor + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Change multiBatcher to use sync.Map, avoid global lock on fast path + +# One or more tracking issues or pull requests related to the change +issues: [7714] diff --git a/processor/batchprocessor/batch_processor.go b/processor/batchprocessor/batch_processor.go index 170240e5bc1..cb258867fb4 100644 --- a/processor/batchprocessor/batch_processor.go +++ b/processor/batchprocessor/batch_processor.go @@ -61,42 +61,28 @@ type batchProcessor struct { telemetry *batchProcessorTelemetry - // batcherFinder will be either *singletonBatcher or *multiBatcher - batcherFinder + // batcher will be either *singleShardBatcher or *multiShardBatcher + batcher batcher } -type batcherFinder interface { - findBatcher(ctx context.Context) (*batcher, error) +type batcher interface { + consume(ctx context.Context, data any) error currentMetadataCardinality() int } -// singleBatcher is used when metadataKeys is empty, to avoid the -// additional lock and map operations used in multiBatcher. -type singleBatcher struct { - *batcher -} - -// multiBatcher is used when metadataKeys is not empty. -type multiBatcher struct { - *batchProcessor - - lock sync.Mutex - batchers map[attribute.Set]*batcher -} - -// batcher is a single instance of the batcher logic. When metadata +// shard is a single instance of the batch logic. When metadata // keys are in use, one of these is created per distinct combination // of values. 
-type batcher struct { +type shard struct { // processor refers to this processor, for access to common // configuration. processor *batchProcessor // exportCtx is a context with the metadata key-values - // corresponding with this batcher set. + // corresponding with this shard set. exportCtx context.Context - // timer informs the batcher send a batch. + // timer informs the shard to send a batch. timer *time.Timer // newItem is used to receive data items from producers. @@ -143,15 +129,14 @@ func newBatchProcessor(set processor.CreateSettings, cfg *Config, batchFunc func metadataLimit: int(cfg.MetadataCardinalityLimit), } if len(bp.metadataKeys) == 0 { - bp.batcherFinder = &singleBatcher{bp.newBatcher(nil)} + bp.batcher = &singleShardBatcher{batcher: bp.newShard(nil)} } else { - bp.batcherFinder = &multiBatcher{ + bp.batcher = &multiShardBatcher{ batchProcessor: bp, - batchers: map[attribute.Set]*batcher{}, } } - bpt, err := newBatchProcessorTelemetry(set, bp.currentMetadataCardinality, useOtel) + bpt, err := newBatchProcessorTelemetry(set, bp.batcher.currentMetadataCardinality, useOtel) if err != nil { return nil, fmt.Errorf("error creating batch processor telemetry: %w", err) } @@ -160,12 +145,12 @@ func newBatchProcessor(set processor.CreateSettings, cfg *Config, batchFunc func return bp, nil } -// newBatcher gets or creates a batcher corresponding with attrs. -func (bp *batchProcessor) newBatcher(md map[string][]string) *batcher { +// newShard creates a shard for the given client metadata md. 
+func (bp *batchProcessor) newShard(md map[string][]string) *shard { exportCtx := client.NewContext(context.Background(), client.Info{ Metadata: client.NewMetadata(md), }) - b := &batcher{ + b := &shard{ processor: bp, newItem: make(chan any, runtime.NumCPU()), exportCtx: exportCtx, @@ -194,7 +179,7 @@ func (bp *batchProcessor) Shutdown(context.Context) error { return nil } -func (b *batcher) start() { +func (b *shard) start() { defer b.processor.goroutines.Done() // timerCh ensures we only block when there is a @@ -237,7 +222,7 @@ } } -func (b *batcher) processItem(item any) { +func (b *shard) processItem(item any) { b.batch.add(item) sent := false for b.batch.itemCount() > 0 && (!b.hasTimer() || b.batch.itemCount() >= b.processor.sendBatchSize) { @@ -251,23 +236,23 @@ } } -func (b *batcher) hasTimer() bool { +func (b *shard) hasTimer() bool { return b.timer != nil } -func (b *batcher) stopTimer() { +func (b *shard) stopTimer() { if b.hasTimer() && !b.timer.Stop() { <-b.timer.C } } -func (b *batcher) resetTimer() { +func (b *shard) resetTimer() { if b.hasTimer() { b.timer.Reset(b.processor.timeout) } } -func (b *batcher) sendItems(trigger trigger) { +func (b *shard) sendItems(trigger trigger) { sent, bytes, err := b.batch.export(b.exportCtx, b.processor.sendBatchMaxSize, b.processor.telemetry.detailed) if err != nil { b.processor.logger.Warn("Sender failed", zap.Error(err)) @@ -276,11 +261,33 @@ } } -func (sb *singleBatcher) findBatcher(_ context.Context) (*batcher, error) { - return sb.batcher, nil +// singleShardBatcher is used when metadataKeys is empty, to avoid the +// additional lock and map operations used in multiShardBatcher. 
+type singleShardBatcher struct { + batcher *shard +} + +func (sb *singleShardBatcher) consume(_ context.Context, data any) error { + sb.batcher.newItem <- data + return nil +} + +func (sb *singleShardBatcher) currentMetadataCardinality() int { + return 1 +} + +// multiShardBatcher is used when metadataKeys is not empty. +type multiShardBatcher struct { + *batchProcessor + batchers sync.Map + + // Guards the size and the storing logic to ensure no more than the limit number of shards are stored. + // If we are willing to allow "some" extra shards beyond the limit this can be removed and size can be made atomic. + lock sync.Mutex + size int } -func (mb *multiBatcher) findBatcher(ctx context.Context) (*batcher, error) { +func (mb *multiShardBatcher) consume(ctx context.Context, data any) error { // Get each metadata key value, form the corresponding // attribute set for use as a map lookup key. info := client.FromContext(ctx) @@ -300,63 +307,46 @@ } aset := attribute.NewSet(attrs...) - mb.lock.Lock() - defer mb.lock.Unlock() - - b, ok := mb.batchers[aset] - if ok { - return b, nil - } + b, ok := mb.batchers.Load(aset) + if !ok { + mb.lock.Lock() + if mb.metadataLimit != 0 && mb.size >= mb.metadataLimit { + mb.lock.Unlock() + return errTooManyBatchers + } - if limit := mb.metadataLimit; limit != 0 && len(mb.batchers) >= limit { - return nil, errTooManyBatchers + // aset.ToSlice() returns the sorted, deduplicated, + // and name-downcased list of attributes. + var loaded bool + b, loaded = mb.batchers.LoadOrStore(aset, mb.newShard(md)) + if !loaded { + mb.size++ + } + mb.lock.Unlock() } - - // aset.ToSlice() returns the sorted, deduplicated, - // and name-downcased list of attributes. 
- b = mb.newBatcher(md) - mb.batchers[aset] = b - return b, nil -} - -func (sb *singleBatcher) currentMetadataCardinality() int { - return 1 + b.(*shard).newItem <- data + return nil } -func (mb *multiBatcher) currentMetadataCardinality() int { +func (mb *multiShardBatcher) currentMetadataCardinality() int { mb.lock.Lock() defer mb.lock.Unlock() - return len(mb.batchers) + return mb.size } // ConsumeTraces implements TracesProcessor func (bp *batchProcessor) ConsumeTraces(ctx context.Context, td ptrace.Traces) error { - b, err := bp.findBatcher(ctx) - if err != nil { - return err - } - b.newItem <- td - return nil + return bp.batcher.consume(ctx, td) } // ConsumeMetrics implements MetricsProcessor func (bp *batchProcessor) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error { - b, err := bp.findBatcher(ctx) - if err != nil { - return err - } - b.newItem <- md - return nil + return bp.batcher.consume(ctx, md) } // ConsumeLogs implements LogsProcessor func (bp *batchProcessor) ConsumeLogs(ctx context.Context, ld plog.Logs) error { - b, err := bp.findBatcher(ctx) - if err != nil { - return err - } - b.newItem <- ld - return nil + return bp.batcher.consume(ctx, ld) } // newBatchTracesProcessor creates a new batch processor that batches traces by size or with timeout