Skip to content

Commit

Permalink
Change multiBatcher to use sync.Map, avoid global lock on fast path
Browse files Browse the repository at this point in the history
Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>
  • Loading branch information
bogdandrutu committed May 23, 2023
1 parent 179d4be commit 1218853
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 77 deletions.
11 changes: 11 additions & 0 deletions .chloggen/sharder.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: batchprocessor

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Change multiBatcher to use sync.Map, avoid global lock on fast path

# One or more tracking issues or pull requests related to the change
issues: [7714]
144 changes: 67 additions & 77 deletions processor/batchprocessor/batch_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,42 +61,28 @@ type batchProcessor struct {

telemetry *batchProcessorTelemetry

// batcherFinder will be either *singletonBatcher or *multiBatcher
batcherFinder
// batcher will be either *singleShardBatcher or *multiShardBatcher
batcher batcher
}

type batcherFinder interface {
findBatcher(ctx context.Context) (*batcher, error)
// batcher is the common interface implemented by *singleShardBatcher and
// *multiShardBatcher; the processor routes all incoming data through it.
type batcher interface {
	// consume routes one data item (traces, metrics, or logs) to the
	// appropriate shard for batching.
	consume(ctx context.Context, data any) error
	// currentMetadataCardinality reports the number of distinct metadata
	// key combinations (shards) in use; surfaced via telemetry.
	currentMetadataCardinality() int
}

// singleBatcher is used when metadataKeys is empty, to avoid the
// additional lock and map operations used in multiBatcher.
type singleBatcher struct {
*batcher
}

// multiBatcher is used when metadataKeys is not empty.
type multiBatcher struct {
*batchProcessor

lock sync.Mutex
batchers map[attribute.Set]*batcher
}

// batcher is a single instance of the batcher logic. When metadata
// shard is a single instance of the batch logic. When metadata
// keys are in use, one of these is created per distinct combination
// of values.
type batcher struct {
type shard struct {
// processor refers to this processor, for access to common
// configuration.
processor *batchProcessor

// exportCtx is a context with the metadata key-values
// corresponding with this batcher set.
// corresponding with this shard set.
exportCtx context.Context

// timer informs the batcher send a batch.
// timer informs the shard send a batch.
timer *time.Timer

// newItem is used to receive data items from producers.
Expand Down Expand Up @@ -143,15 +129,14 @@ func newBatchProcessor(set processor.CreateSettings, cfg *Config, batchFunc func
metadataLimit: int(cfg.MetadataCardinalityLimit),
}
if len(bp.metadataKeys) == 0 {
bp.batcherFinder = &singleBatcher{bp.newBatcher(nil)}
bp.batcher = &singleShardBatcher{batcher: bp.newShard(nil)}
} else {
bp.batcherFinder = &multiBatcher{
bp.batcher = &multiShardBatcher{
batchProcessor: bp,
batchers: map[attribute.Set]*batcher{},
}
}

bpt, err := newBatchProcessorTelemetry(set, bp.currentMetadataCardinality, useOtel)
bpt, err := newBatchProcessorTelemetry(set, bp.batcher.currentMetadataCardinality, useOtel)
if err != nil {
return nil, fmt.Errorf("error creating batch processor telemetry: %w", err)
}
Expand All @@ -160,12 +145,12 @@ func newBatchProcessor(set processor.CreateSettings, cfg *Config, batchFunc func
return bp, nil
}

// newBatcher gets or creates a batcher corresponding with attrs.
func (bp *batchProcessor) newBatcher(md map[string][]string) *batcher {
// newShard creates a new shard whose export context carries the given client metadata.
func (bp *batchProcessor) newShard(md map[string][]string) *shard {
exportCtx := client.NewContext(context.Background(), client.Info{
Metadata: client.NewMetadata(md),
})
b := &batcher{
b := &shard{
processor: bp,
newItem: make(chan any, runtime.NumCPU()),
exportCtx: exportCtx,
Expand Down Expand Up @@ -194,7 +179,7 @@ func (bp *batchProcessor) Shutdown(context.Context) error {
return nil
}

func (b *batcher) start() {
func (b *shard) start() {
defer b.processor.goroutines.Done()

// timerCh ensures we only block when there is a
Expand Down Expand Up @@ -237,7 +222,7 @@ func (b *batcher) start() {
}
}

func (b *batcher) processItem(item any) {
func (b *shard) processItem(item any) {
b.batch.add(item)
sent := false
for b.batch.itemCount() > 0 && (!b.hasTimer() || b.batch.itemCount() >= b.processor.sendBatchSize) {
Expand All @@ -251,23 +236,23 @@ func (b *batcher) processItem(item any) {
}
}

func (b *batcher) hasTimer() bool {
// hasTimer reports whether this shard has a flush timer; a nil timer
// means timeout-based flushing is disabled for this shard.
func (b *shard) hasTimer() bool {
	return b.timer != nil
}

func (b *batcher) stopTimer() {
// stopTimer stops the flush timer if one exists. When Stop reports that
// the timer already fired, the pending tick is drained from timer.C so a
// subsequent Reset cannot observe a stale expiration.
func (b *shard) stopTimer() {
	if b.hasTimer() && !b.timer.Stop() {
		<-b.timer.C
	}
}

func (b *batcher) resetTimer() {
// resetTimer restarts the flush timer with the processor's configured
// timeout. It is a no-op when timeout-based flushing is disabled.
func (b *shard) resetTimer() {
	if b.hasTimer() {
		b.timer.Reset(b.processor.timeout)
	}
}

func (b *batcher) sendItems(trigger trigger) {
func (b *shard) sendItems(trigger trigger) {
sent, bytes, err := b.batch.export(b.exportCtx, b.processor.sendBatchMaxSize, b.processor.telemetry.detailed)
if err != nil {
b.processor.logger.Warn("Sender failed", zap.Error(err))
Expand All @@ -276,11 +261,33 @@ func (b *batcher) sendItems(trigger trigger) {
}
}

func (sb *singleBatcher) findBatcher(_ context.Context) (*batcher, error) {
return sb.batcher, nil
// singleShardBatcher is used when metadataKeys is empty, to avoid the
// additional lock and map operations used in multiShardBatcher.
type singleShardBatcher struct {
	// batcher is the single shard through which all data is funneled.
	batcher *shard
}

// consume hands the item to the single shard's input channel. The send
// blocks when the channel buffer is full; no error is ever returned.
func (sb *singleShardBatcher) consume(_ context.Context, data any) error {
	sb.batcher.newItem <- data
	return nil
}

// currentMetadataCardinality always reports 1: with no metadata keys
// configured there is exactly one shard.
func (sb *singleShardBatcher) currentMetadataCardinality() int {
	return 1
}

// multiShardBatcher is used when metadataKeys is not empty.
type multiShardBatcher struct {
	*batchProcessor
	// batchers maps attribute.Set (built from the configured metadata
	// keys) to *shard; sync.Map keeps the existing-shard path lock-free.
	batchers sync.Map

	// Guards size and the storing logic to ensure no more than the limit
	// number of shards are stored. If we are willing to allow "some" items
	// beyond the limit, this can be removed and size can be made atomic.
	lock sync.Mutex
	size int
}

func (mb *multiBatcher) findBatcher(ctx context.Context) (*batcher, error) {
func (mb *multiShardBatcher) consume(ctx context.Context, data any) error {
// Get each metadata key value, form the corresponding
// attribute set for use as a map lookup key.
info := client.FromContext(ctx)
Expand All @@ -300,63 +307,46 @@ func (mb *multiBatcher) findBatcher(ctx context.Context) (*batcher, error) {
}
aset := attribute.NewSet(attrs...)

mb.lock.Lock()
defer mb.lock.Unlock()

b, ok := mb.batchers[aset]
if ok {
return b, nil
}
b, ok := mb.batchers.Load(aset)
if !ok {
mb.lock.Lock()
if mb.metadataLimit != 0 && mb.size >= mb.metadataLimit {
mb.lock.Unlock()
return errTooManyBatchers
}

if limit := mb.metadataLimit; limit != 0 && len(mb.batchers) >= limit {
return nil, errTooManyBatchers
			// LoadOrStore returns the existing shard for aset if another
			// goroutine stored one concurrently; otherwise it stores ours.
var loaded bool
b, loaded = mb.batchers.LoadOrStore(aset, mb.newShard(md))
if !loaded {
mb.size++
}
mb.lock.Unlock()
}

// aset.ToSlice() returns the sorted, deduplicated,
// and name-downcased list of attributes.
b = mb.newBatcher(md)
mb.batchers[aset] = b
return b, nil
}

func (sb *singleBatcher) currentMetadataCardinality() int {
return 1
b.(*shard).newItem <- data
return nil
}

func (mb *multiBatcher) currentMetadataCardinality() int {
func (mb *multiShardBatcher) currentMetadataCardinality() int {

Check warning on line 331 in processor/batchprocessor/batch_processor.go

View check run for this annotation

Codecov / codecov/patch

processor/batchprocessor/batch_processor.go#L331

Added line #L331 was not covered by tests
mb.lock.Lock()
defer mb.lock.Unlock()
return len(mb.batchers)
return mb.size

Check warning on line 334 in processor/batchprocessor/batch_processor.go

View check run for this annotation

Codecov / codecov/patch

processor/batchprocessor/batch_processor.go#L334

Added line #L334 was not covered by tests
}

// ConsumeTraces implements TracesProcessor. It delegates batching to the
// configured batcher (single- or multi-shard); an error is returned only
// when the multi-shard batcher rejects a new metadata combination.
func (bp *batchProcessor) ConsumeTraces(ctx context.Context, td ptrace.Traces) error {
	return bp.batcher.consume(ctx, td)
}

// ConsumeMetrics implements MetricsProcessor. It delegates batching to
// the configured batcher (single- or multi-shard); an error is returned
// only when the multi-shard batcher rejects a new metadata combination.
func (bp *batchProcessor) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error {
	return bp.batcher.consume(ctx, md)
}

// ConsumeLogs implements LogsProcessor. It delegates batching to the
// configured batcher (single- or multi-shard); an error is returned only
// when the multi-shard batcher rejects a new metadata combination.
func (bp *batchProcessor) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
	return bp.batcher.consume(ctx, ld)
}

// newBatchTracesProcessor creates a new batch processor that batches traces by size or with timeout
Expand Down

0 comments on commit 1218853

Please sign in to comment.