Skip to content

Commit

Permalink
make singleBatcher, multiBatcher
Browse files Browse the repository at this point in the history
  • Loading branch information
jmacd committed Apr 5, 2023
1 parent 6728d37 commit 8cf701d
Showing 1 changed file with 47 additions and 23 deletions.
70 changes: 47 additions & 23 deletions processor/batchprocessor/batch_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,24 @@ type batchProcessor struct {

telemetry *batchProcessorTelemetry

// singleton is used when metadataKeys is empty, to avoid the
// additional lock and map operations.
singleton *batcher
// batcherFinder will be either *singletonBatcher or *multiBatcher
batcherFinder
}

type batcherFinder interface {
findBatcher(ctx context.Context) (*batcher, error)
currentMetadataCardinality() int
}

// singleBatcher is used when metadataKeys is empty, to avoid the
// additional lock and map operations used in multiBatcher.
type singleBatcher struct {
*batcher
}

// multiBatcher is used when metadataKeys is not empty.
type multiBatcher struct {
*batchProcessor

lock sync.Mutex
batchers map[attribute.Set]*batcher
Expand Down Expand Up @@ -147,11 +162,21 @@ func newBatchProcessor(set processor.CreateSettings, cfg *Config, batchFunc func
metadataKeys: mks,
metadataLimit: int(cfg.MetadataCardinalityLimit),
}
if len(bp.metadataKeys) == 0 {
bp.batcherFinder = &singleBatcher{bp.newBatcher(nil)}
} else {
bp.batcherFinder = &multiBatcher{
batchProcessor: bp,
batchers: map[attribute.Set]*batcher{},
}
}

bpt, err := newBatchProcessorTelemetry(set, bp.currentMetadataCardinality, useOtel)
if err != nil {
return nil, fmt.Errorf("error creating batch processor telemetry: %w", err)
}
bp.telemetry = bpt

return bp, nil
}

Expand All @@ -178,11 +203,6 @@ func (bp *batchProcessor) Capabilities() consumer.Capabilities {
// Start is invoked during service startup.
func (bp *batchProcessor) Start(context.Context, component.Host) error {
bp.goroutines.Add(1)
if len(bp.metadataKeys) == 0 {
bp.singleton = bp.newBatcher(nil)
} else {
bp.batchers = map[attribute.Set]*batcher{}
}
return nil
}

Expand Down Expand Up @@ -268,17 +288,17 @@ func (b *batcher) sendItems(trigger trigger) {
}
}

func (bp *batchProcessor) findBatcher(ctx context.Context) (*batcher, error) {
if bp.singleton != nil {
return bp.singleton, nil
}
func (sb *singleBatcher) findBatcher(ctx context.Context) (*batcher, error) {
return sb.batcher, nil
}

func (mb *multiBatcher) findBatcher(ctx context.Context) (*batcher, error) {
// Get each metadata key value, form the corresponding
// attribute set for use as a map lookup key.
info := client.FromContext(ctx)
md := map[string][]string{}
var attrs []attribute.KeyValue
for _, k := range bp.metadataKeys {
for _, k := range mb.metadataKeys {
// Lookup the value in the incoming metadata, copy it
// into the outgoing metadata, and create a unique
// value for the attributeSet.
Expand All @@ -292,29 +312,33 @@ func (bp *batchProcessor) findBatcher(ctx context.Context) (*batcher, error) {
}
aset := attribute.NewSet(attrs...)

bp.lock.Lock()
defer bp.lock.Unlock()
mb.lock.Lock()
defer mb.lock.Unlock()

b, ok := bp.batchers[aset]
b, ok := mb.batchers[aset]
if ok {
return b, nil
}

if limit := bp.metadataLimit; limit != 0 && len(bp.batchers) >= int(limit) {
if limit := mb.metadataLimit; limit != 0 && len(mb.batchers) >= int(limit) {
return nil, errTooManyBatchers
}

// aset.ToSlice() returns the sorted, deduplicated,
// and name-downcased list of attributes.
b = bp.newBatcher(md)
bp.batchers[aset] = b
b = mb.newBatcher(md)
mb.batchers[aset] = b
return b, nil
}

func (bp *batchProcessor) currentMetadataCardinality() int {
bp.lock.Lock()
defer bp.lock.Unlock()
return len(bp.batchers)
func (sb *singleBatcher) currentMetadataCardinality() int {
return 1
}

func (mb *multiBatcher) currentMetadataCardinality() int {
mb.lock.Lock()
defer mb.lock.Unlock()
return len(mb.batchers)
}

// ConsumeTraces implements TracesProcessor
Expand Down

0 comments on commit 8cf701d

Please sign in to comment.