Change multiBatcher to use sync.Map, avoid global lock on fast path #7714

Merged 1 commit on May 23, 2023
11 changes: 11 additions & 0 deletions .chloggen/sharder.yaml
@@ -0,0 +1,11 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: batchprocessor

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Change multiBatcher to use sync.Map, avoid global lock on fast path

# One or more tracking issues or pull requests related to the change
issues: [7714]
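The changelog entry above captures the whole change: the per-attribute-set shard lookup previously took a single global mutex on every item, while the new code serves the common case with a lock-free sync.Map read and holds the mutex only while a new shard may need to be created. The sketch below illustrates that contrast in simplified form; the identifiers (mutexBatcher, syncMapBatcher, find) are illustrative and are not the PR's literal code, which follows in the diff.

// Simplified before/after sketch of the shard lookup; identifiers are illustrative.
package sketch

import (
	"sync"

	"go.opentelemetry.io/otel/attribute"
)

type shard struct{ /* batching state elided */ }

// Before: every incoming item serializes on a single mutex, even when the
// shard for its attribute set already exists.
type mutexBatcher struct {
	lock     sync.Mutex
	batchers map[attribute.Set]*shard
}

func newMutexBatcher() *mutexBatcher {
	return &mutexBatcher{batchers: map[attribute.Set]*shard{}}
}

func (mb *mutexBatcher) find(aset attribute.Set) *shard {
	mb.lock.Lock()
	defer mb.lock.Unlock()
	if s, ok := mb.batchers[aset]; ok {
		return s
	}
	s := &shard{}
	mb.batchers[aset] = s
	return s
}

// After: the hot path is a lock-free sync.Map.Load; the mutex is taken only
// when a shard may need to be created, so the shard count stays exact for the
// cardinality limit (the limit check is elided here and shown in the diff below).
type syncMapBatcher struct {
	batchers sync.Map // attribute.Set -> *shard
	lock     sync.Mutex
	size     int
}

func (mb *syncMapBatcher) find(aset attribute.Set) *shard {
	if s, ok := mb.batchers.Load(aset); ok {
		return s.(*shard) // fast path: no lock at all
	}
	mb.lock.Lock()
	defer mb.lock.Unlock()
	s, loaded := mb.batchers.LoadOrStore(aset, &shard{})
	if !loaded {
		mb.size++ // a new shard was stored; count it
	}
	return s.(*shard)
}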
144 changes: 67 additions & 77 deletions processor/batchprocessor/batch_processor.go
@@ -61,42 +61,28 @@

telemetry *batchProcessorTelemetry

// batcherFinder will be either *singletonBatcher or *multiBatcher
batcherFinder
// batcher will be either *singletonBatcher or *multiBatcher
batcher batcher
}

type batcherFinder interface {
findBatcher(ctx context.Context) (*batcher, error)
type batcher interface {
consume(ctx context.Context, data any) error
currentMetadataCardinality() int
}

// singleBatcher is used when metadataKeys is empty, to avoid the
// additional lock and map operations used in multiBatcher.
type singleBatcher struct {
*batcher
}

// multiBatcher is used when metadataKeys is not empty.
type multiBatcher struct {
*batchProcessor

lock sync.Mutex
batchers map[attribute.Set]*batcher
}

// batcher is a single instance of the batcher logic. When metadata
// shard is a single instance of the batch logic. When metadata
// keys are in use, one of these is created per distinct combination
// of values.
type batcher struct {
type shard struct {
// processor refers to this processor, for access to common
// configuration.
processor *batchProcessor

// exportCtx is a context with the metadata key-values
// corresponding with this batcher set.
// corresponding with this shard set.
exportCtx context.Context

// timer informs the batcher send a batch.
// timer informs the shard send a batch.
timer *time.Timer

// newItem is used to receive data items from producers.
@@ -143,15 +129,14 @@
metadataLimit: int(cfg.MetadataCardinalityLimit),
}
if len(bp.metadataKeys) == 0 {
bp.batcherFinder = &singleBatcher{bp.newBatcher(nil)}
bp.batcher = &singleShardBatcher{batcher: bp.newShard(nil)}
} else {
bp.batcherFinder = &multiBatcher{
bp.batcher = &multiShardBatcher{
batchProcessor: bp,
batchers: map[attribute.Set]*batcher{},
}
}

bpt, err := newBatchProcessorTelemetry(set, bp.currentMetadataCardinality, useOtel)
bpt, err := newBatchProcessorTelemetry(set, bp.batcher.currentMetadataCardinality, useOtel)
if err != nil {
return nil, fmt.Errorf("error creating batch processor telemetry: %w", err)
}
@@ -160,12 +145,12 @@
return bp, nil
}

// newBatcher gets or creates a batcher corresponding with attrs.
func (bp *batchProcessor) newBatcher(md map[string][]string) *batcher {
// newShard gets or creates a batcher corresponding with attrs.
func (bp *batchProcessor) newShard(md map[string][]string) *shard {
exportCtx := client.NewContext(context.Background(), client.Info{
Metadata: client.NewMetadata(md),
})
b := &batcher{
b := &shard{
processor: bp,
newItem: make(chan any, runtime.NumCPU()),
exportCtx: exportCtx,
@@ -194,7 +179,7 @@
return nil
}

func (b *batcher) start() {
func (b *shard) start() {
defer b.processor.goroutines.Done()

// timerCh ensures we only block when there is a
@@ -237,7 +222,7 @@
}
}

func (b *batcher) processItem(item any) {
func (b *shard) processItem(item any) {
b.batch.add(item)
sent := false
for b.batch.itemCount() > 0 && (!b.hasTimer() || b.batch.itemCount() >= b.processor.sendBatchSize) {
@@ -251,23 +236,23 @@
}
}

func (b *batcher) hasTimer() bool {
func (b *shard) hasTimer() bool {
return b.timer != nil
}

func (b *batcher) stopTimer() {
func (b *shard) stopTimer() {
if b.hasTimer() && !b.timer.Stop() {
<-b.timer.C
}
}

func (b *batcher) resetTimer() {
func (b *shard) resetTimer() {
if b.hasTimer() {
b.timer.Reset(b.processor.timeout)
}
}

func (b *batcher) sendItems(trigger trigger) {
func (b *shard) sendItems(trigger trigger) {
sent, bytes, err := b.batch.export(b.exportCtx, b.processor.sendBatchMaxSize, b.processor.telemetry.detailed)
if err != nil {
b.processor.logger.Warn("Sender failed", zap.Error(err))
@@ -276,11 +261,33 @@
}
}

func (sb *singleBatcher) findBatcher(_ context.Context) (*batcher, error) {
return sb.batcher, nil
// singleShardBatcher is used when metadataKeys is empty, to avoid the
// additional lock and map operations used in multiBatcher.
type singleShardBatcher struct {
batcher *shard
}

func (sb *singleShardBatcher) consume(_ context.Context, data any) error {
sb.batcher.newItem <- data
return nil
}

func (sb *singleShardBatcher) currentMetadataCardinality() int {
return 1
}

// multiBatcher is used when metadataKeys is not empty.
type multiShardBatcher struct {
*batchProcessor
batchers sync.Map

// Guards the size and the storing logic to ensure no more than limit items are stored.
// If we are willing to allow "some" extra items than the limit this can be removed and size can be made atomic.
lock sync.Mutex
size int
}

func (mb *multiBatcher) findBatcher(ctx context.Context) (*batcher, error) {
func (mb *multiShardBatcher) consume(ctx context.Context, data any) error {
// Get each metadata key value, form the corresponding
// attribute set for use as a map lookup key.
info := client.FromContext(ctx)
@@ -300,63 +307,46 @@
}
aset := attribute.NewSet(attrs...)

mb.lock.Lock()
defer mb.lock.Unlock()

b, ok := mb.batchers[aset]
if ok {
return b, nil
}
b, ok := mb.batchers.Load(aset)
if !ok {
mb.lock.Lock()
if mb.metadataLimit != 0 && mb.size >= mb.metadataLimit {
mb.lock.Unlock()
return errTooManyBatchers
}

if limit := mb.metadataLimit; limit != 0 && len(mb.batchers) >= limit {
return nil, errTooManyBatchers
// aset.ToSlice() returns the sorted, deduplicated,
// and name-downcased list of attributes.
var loaded bool
b, loaded = mb.batchers.LoadOrStore(aset, mb.newShard(md))
if !loaded {
mb.size++
}
mb.lock.Unlock()
}

// aset.ToSlice() returns the sorted, deduplicated,
// and name-downcased list of attributes.
b = mb.newBatcher(md)
mb.batchers[aset] = b
return b, nil
}

func (sb *singleBatcher) currentMetadataCardinality() int {
return 1
b.(*shard).newItem <- data
return nil
}
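A note on the lookup key used above: attribute.Set is comparable and has a canonical form (attribute.NewSet sorts and deduplicates its inputs), which is what lets it serve both as the key of the old plain map and as the sync.Map key here. The following is a rough, self-contained illustration, not the PR's code; the metadata keys ("tenant", "region") are hypothetical.

// Rough illustration of turning client metadata into the attribute.Set lookup
// key; the metadata keys used here are hypothetical.
package main

import (
	"fmt"

	"go.opentelemetry.io/otel/attribute"
)

func main() {
	md := map[string][]string{
		"tenant": {"acme"},
		"region": {"eu-west-1"},
	}
	var attrs []attribute.KeyValue
	for k, vs := range md {
		// The real processor also handles multi-valued metadata; a single
		// value per key is enough for this illustration.
		attrs = append(attrs, attribute.String(k, vs[0]))
	}
	// attribute.NewSet sorts and deduplicates, so insertion order does not
	// matter: equal metadata always yields an equal Set.
	a := attribute.NewSet(attrs...)
	b := attribute.NewSet(
		attribute.String("region", "eu-west-1"),
		attribute.String("tenant", "acme"),
	)
	fmt.Println(a.Equals(&b)) // true
}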

func (mb *multiBatcher) currentMetadataCardinality() int {
func (mb *multiShardBatcher) currentMetadataCardinality() int {

mb.lock.Lock()
defer mb.lock.Unlock()
return len(mb.batchers)
return mb.size

}
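The comment on multiShardBatcher earlier in this hunk notes that the remaining mutex exists only to keep size from exceeding the metadata cardinality limit, and that it could be removed if a small overshoot were acceptable. A hedged sketch of that alternative, which is not part of this PR and uses illustrative names, might look like this:

// Hypothetical lock-free variant described by the comment on multiShardBatcher:
// the limit check and the counter update are no longer atomic as a pair, so the
// shard count can briefly overshoot the limit.
package sketch

import (
	"errors"
	"sync"
	"sync/atomic"

	"go.opentelemetry.io/otel/attribute"
)

// Stand-in for the processor's errTooManyBatchers.
var errTooManyBatchers = errors.New("too many distinct metadata combinations")

type shard struct{ /* batching state elided */ }

type atomicMultiShardBatcher struct {
	batchers      sync.Map     // attribute.Set -> *shard
	size          atomic.Int64 // approximate shard count
	metadataLimit int64
}

func (mb *atomicMultiShardBatcher) getOrCreate(aset attribute.Set) (*shard, error) {
	if b, ok := mb.batchers.Load(aset); ok {
		return b.(*shard), nil // fast path, same as the PR
	}
	if mb.metadataLimit != 0 && mb.size.Load() >= mb.metadataLimit {
		return nil, errTooManyBatchers
	}
	b, loaded := mb.batchers.LoadOrStore(aset, &shard{})
	if !loaded {
		// Several goroutines may pass the limit check concurrently, so the
		// counter can exceed metadataLimit by a few before it settles.
		mb.size.Add(1)
	}
	return b.(*shard), nil
}

The PR keeps the mutex instead, accepting a short critical section when a shard is created (and when the cardinality is read, as in the function just above) in exchange for a limit that is never exceeded.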

// ConsumeTraces implements TracesProcessor
func (bp *batchProcessor) ConsumeTraces(ctx context.Context, td ptrace.Traces) error {
b, err := bp.findBatcher(ctx)
if err != nil {
return err
}
b.newItem <- td
return nil
return bp.batcher.consume(ctx, td)
}

// ConsumeMetrics implements MetricsProcessor
func (bp *batchProcessor) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error {
b, err := bp.findBatcher(ctx)
if err != nil {
return err
}
b.newItem <- md
return nil
return bp.batcher.consume(ctx, md)
}

// ConsumeLogs implements LogsProcessor
func (bp *batchProcessor) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
b, err := bp.findBatcher(ctx)
if err != nil {
return err
}
b.newItem <- ld
return nil
return bp.batcher.consume(ctx, ld)
}

// newBatchTracesProcessor creates a new batch processor that batches traces by size or with timeout