Implement the BatchingProcessor (#5093)
* [WIP] Implement the BatchingProcessor

* Add TestExportSync

* Add TestChunker

* Test export error default to ErrorHandler

* Fix lint

* Fix chunk smaller than size error

* Add batch tests

* Fix lint

* Update OnEmit test

Check the length of records in the eventually assertion, given that is
what we are going to measure.

* Revert unneeded change to BatchingProcessor doc

* Add batch type

* Refactor testing of batching config

The BatchingProcessor is not expected to ultimately contain
configuration fields for queue size or export parameters (see #5093).
This will break TestNewBatchingProcessorConfiguration which tests the
configuration by evaluating the BatchingProcessor directly.

Instead, test the batchingConfig and rename the test to
TestNewBatchingConfig to match what is being tested.

* Implement the BatchingProcessor without polling

* Add TestBatchingProcessor

* Add ConcurrentSafe test

* Expand Shutdown tests

* Test context canceled for ForceFlush

* Refactor batch to queue

* Use exportSync

* Update docs and naming

* Split buffered export to its own type

* Update comments and naming

* Fix lint

* Remove redundant triggered type

* Add interval polling

* Refactor test structure

* Add custom ring implementation

* Add BenchmarkBatchingProcessor

* Fix merge

* Remove custom ring impl

* Remove BenchmarkBatchingProcessor

* Update dev docs

* Test nil exporter

* Update OnEmit test

Ensure the poll goroutine will completely flush the queue of batches.

* Test RetriggerFlushNonBlocking

* Update ascii diagram

* Fix flaky OnEmit

* Revert unnecessary change to test pkg name

* Use batching term in docs

* Document EnqueueExport

* Return from EnqueueExport if blocked

Do not wait for the enqueue to succeed.

* Do not drop failed flush log records

* Use cancelable ctx in concurrency test

* Fix comments

* Apply feedback

Do not spawn a goroutine for the flush operation.

* Return true from EnqueueExport when stopped

* Update sdk/log/batch.go

Co-authored-by: Robert Pająk <pellared@hotmail.com>

* Remove TODO

* Comment re-trigger in poll

---------

Co-authored-by: Robert Pająk <pellared@hotmail.com>
MrAlias and pellared committed Apr 18, 2024
1 parent fe3de70 commit 4af9c20
Showing 4 changed files with 508 additions and 26 deletions.
211 changes: 189 additions & 22 deletions sdk/log/batch.go
@@ -6,7 +6,10 @@ package log // import "go.opentelemetry.io/otel/sdk/log"
import (
"container/ring"
"context"
"errors"
"slices"
"sync"
"sync/atomic"
"time"
)

@@ -26,56 +29,220 @@ const (
var _ Processor = (*BatchingProcessor)(nil)

// BatchingProcessor is a processor that exports batches of log records.
// A BatchingProcessor must be created with [NewBatchingProcessor].
type BatchingProcessor struct {
exporter Exporter

maxQueueSize int
exportInterval time.Duration
exportTimeout time.Duration
exportMaxBatchSize int
// The BatchingProcessor is designed to provide the highest throughput of
// log records possible while being compatible with OpenTelemetry. The
// entry point of log records is the OnEmit method. This method is designed
// to receive records as fast as possible while still honoring shutdown
// commands. All records received are enqueued to queue.
//
// In order to block OnEmit as little as possible, a separate "poll"
// goroutine is spawned at the creation of a BatchingProcessor. This
// goroutine is responsible for batching the queue at regular polled
// intervals, or when it is directly signaled to.
//
// To keep the polling goroutine from backing up, all batches it makes are
// exported with a bufferExporter. This exporter allows the poll
// goroutine to enqueue an export payload that will be handled in a
// separate goroutine dedicated to the export. This asynchronous behavior
// allows the poll goroutine to maintain accurate interval polling.
//
// __BatchingProcessor__ __Poll Goroutine__ __Export Goroutine__
// || || || || || ||
// || ********** || || || || ********** ||
// || Records=>* OnEmit * || || | - ticker || || * export * ||
// || ********** || || | - trigger || || ********** ||
// || || || || | || || || ||
// || || || || | || || || ||
// || __________\/___ || || |*********** || || ______/\_______ ||
// || (____queue______)>=||=||===|* batch *===||=||=>[_export_buffer_] ||
// || || || |*********** || || ||
// ||_____________________|| ||__________________|| ||____________________||
//
//
// The "release valve" in this processing is the record queue. This queue
// is a ring buffer. It will overwrite the oldest records first when writes
// to OnEmit are made faster than the queue can be flushed. If batches
// cannot be flushed to the export buffer, the records will remain in the
// queue.

// exporter is the bufferExporter all batches are exported with.
exporter *bufferExporter

// q is the active queue of records that have not yet been exported.
q *queue
// batchSize is the minimum number of records needed before an export is
// triggered (unless the interval expires).
batchSize int

// pollTrigger triggers the poll goroutine to flush a batch from the queue.
// This is sent to when it is known that the queue contains at least one
// complete batch.
//
// When a send is made to the channel, the poll loop will be reset after
// the flush. If there are still enough records in the queue for another
// batch, the reset of the poll loop will automatically re-trigger itself.
// There is no need for the original sender to monitor and resend.
pollTrigger chan struct{}
// pollKill kills the poll goroutine. This is only expected to be closed
// once by the Shutdown method.
pollKill chan struct{}
// pollDone signals the poll goroutine has completed.
pollDone chan struct{}

// stopped holds the stopped state of the BatchingProcessor.
stopped atomic.Bool
}
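
The "release valve" described in the struct comment above can be illustrated with a small ring-buffer queue. This is a minimal sketch only: `ringQueue`, `newRingQueue`, and the `string` element type are hypothetical names chosen for illustration, not the SDK's `queue` type (which is not shown in this diff). It assumes a capacity greater than zero and is not safe for concurrent use.

```go
package main

import "fmt"

// ringQueue is a hypothetical fixed-capacity queue used only for
// illustration. When full, Enqueue overwrites the oldest element.
type ringQueue struct {
	buf   []string
	start int // index of the oldest stored element
	n     int // number of stored elements
}

// newRingQueue returns a queue holding at most capacity elements.
// The sketch assumes capacity > 0.
func newRingQueue(capacity int) *ringQueue {
	return &ringQueue{buf: make([]string, capacity)}
}

// Enqueue stores v and returns the number of elements now in the queue.
func (q *ringQueue) Enqueue(v string) int {
	if q.n == len(q.buf) {
		// Full: overwrite the oldest element and advance start.
		q.buf[q.start] = v
		q.start = (q.start + 1) % len(q.buf)
		return q.n
	}
	q.buf[(q.start+q.n)%len(q.buf)] = v
	q.n++
	return q.n
}

// Flush returns all stored elements, oldest first, and empties the queue.
func (q *ringQueue) Flush() []string {
	out := make([]string, 0, q.n)
	for i := 0; i < q.n; i++ {
		out = append(out, q.buf[(q.start+i)%len(q.buf)])
	}
	q.start, q.n = 0, 0
	return out
}

func main() {
	q := newRingQueue(3)
	for _, v := range []string{"a", "b", "c", "d"} {
		q.Enqueue(v)
	}
	fmt.Println(q.Flush()) // prints [b c d]: "a" was overwritten
}
```

Returning the length from `Enqueue` mirrors how `OnEmit` in this diff compares the result of `b.q.Enqueue(r)` against `batchSize` to decide whether to signal the poll goroutine.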

// NewBatchingProcessor decorates the provided exporter
// so that the log records are batched before exporting.
//
// All of the exporter's methods are called synchronously.
func NewBatchingProcessor(exporter Exporter, opts ...BatchProcessorOption) *BatchingProcessor {
cfg := newBatchingConfig(opts)
if exporter == nil {
// Do not panic on nil export.
exporter = defaultNoopExporter
}
cfg := newBatchingConfig(opts)
return &BatchingProcessor{
exporter: exporter,

maxQueueSize: cfg.maxQSize.Value,
exportInterval: cfg.expInterval.Value,
exportTimeout: cfg.expTimeout.Value,
exportMaxBatchSize: cfg.expMaxBatchSize.Value,
// Order is important here. Wrap the timeoutExporter with the chunkExporter
// to ensure each export completes in timeout (instead of all chunked
// exports).
exporter = newTimeoutExporter(exporter, cfg.expTimeout.Value)
// Use a chunkExporter to ensure ForceFlush and Shutdown calls are batched
// appropriately on export.
exporter = newChunkExporter(exporter, cfg.expMaxBatchSize.Value)

b := &BatchingProcessor{
// TODO: explore making the size of this configurable.
exporter: newBufferExporter(exporter, 1),

q: newQueue(cfg.maxQSize.Value),
batchSize: cfg.expMaxBatchSize.Value,
pollTrigger: make(chan struct{}, 1),
pollKill: make(chan struct{}),
}
b.pollDone = b.poll(cfg.expInterval.Value)
return b
}
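
The chunking step mentioned in the comments of NewBatchingProcessor can be sketched as follows. This is an illustration under assumptions: `chunks` is a hypothetical helper, not the SDK's `chunkExporter` (whose implementation is not part of this diff), and it operates on strings rather than `Record` values. It shows the idea that a large flush is split into pieces no larger than the maximum batch size, with a final chunk that may be smaller.

```go
package main

import "fmt"

// chunks splits records into consecutive slices of at most size elements.
// It is a hypothetical helper for illustration. A non-positive size
// disables chunking and returns the input as a single chunk.
func chunks(records []string, size int) [][]string {
	if size <= 0 {
		return [][]string{records}
	}
	var out [][]string
	for len(records) > size {
		out = append(out, records[:size])
		records = records[size:]
	}
	if len(records) > 0 {
		// The last chunk may hold fewer than size elements.
		out = append(out, records)
	}
	return out
}

func main() {
	fmt.Println(chunks([]string{"a", "b", "c", "d", "e"}, 2)) // prints [[a b] [c d] [e]]
}
```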

// poll spawns a goroutine to handle interval polling and batch exporting. The
// returned done chan is closed when the spawned goroutine completes.
func (b *BatchingProcessor) poll(interval time.Duration) (done chan struct{}) {
done = make(chan struct{})

ticker := time.NewTicker(interval)
// TODO: investigate using a sync.Pool instead of cloning.
buf := make([]Record, b.batchSize)
go func() {
defer close(done)
defer ticker.Stop()

for {
select {
case <-ticker.C:
case <-b.pollTrigger:
ticker.Reset(interval)
case <-b.pollKill:
return
}

qLen := b.q.TryDequeue(buf, func(r []Record) bool {
ok := b.exporter.EnqueueExport(r)
if ok {
buf = slices.Clone(buf)
}
return ok
})
if qLen >= b.batchSize {
// There is another full batch ready. Immediately trigger
// another export attempt.
select {
case b.pollTrigger <- struct{}{}:
default:
// Another flush signal already received.
}
}
}
}()
return done
}

// OnEmit batches provided log record.
func (b *BatchingProcessor) OnEmit(ctx context.Context, r Record) error {
// TODO (#5063): Implement.
func (b *BatchingProcessor) OnEmit(_ context.Context, r Record) error {
if b.stopped.Load() {
return nil
}
if n := b.q.Enqueue(r); n >= b.batchSize {
select {
case b.pollTrigger <- struct{}{}:
default:
// Flush chan full. The poll goroutine will handle this by
// re-sending any trigger until the queue has less than batchSize
// records.
}
}
return nil
}
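
Both poll and OnEmit signal through pollTrigger with a `select` that has a `default` case, so the sender never blocks. A minimal sketch of that pattern, using a hypothetical `trigger` helper and a channel with a buffer of one as in NewBatchingProcessor:

```go
package main

import "fmt"

// trigger performs a non-blocking send on ch and reports whether the
// signal was queued. It is a hypothetical helper for illustration.
func trigger(ch chan struct{}) bool {
	select {
	case ch <- struct{}{}:
		return true
	default:
		// A signal is already pending; dropping this one is safe because
		// the receiver will act on the pending signal.
		return false
	}
}

func main() {
	ch := make(chan struct{}, 1)
	fmt.Println(trigger(ch)) // true: the buffer slot was free
	fmt.Println(trigger(ch)) // false: a signal is already pending
	<-ch                     // the receiver consumes the signal
	fmt.Println(trigger(ch)) // true: the slot is free again
}
```

With a buffer of one, any number of concurrent senders collapse into a single pending wake-up, which is why OnEmit can return immediately when the channel is full.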

// Enabled returns true.
// Enabled returns if b is enabled.
func (b *BatchingProcessor) Enabled(context.Context, Record) bool {
return true
return !b.stopped.Load()
}

// Shutdown flushes queued log records and shuts down the decorated exporter.
func (b *BatchingProcessor) Shutdown(ctx context.Context) error {
// TODO (#5063): Implement.
return nil
if b.stopped.Swap(true) {
return nil
}

// Stop the poll goroutine.
close(b.pollKill)
select {
case <-b.pollDone:
case <-ctx.Done():
// Out of time.
return errors.Join(ctx.Err(), b.exporter.Shutdown(ctx))
}

// Flush the remaining queued records before exporter shutdown.
err := b.exporter.Export(ctx, b.q.Flush())
return errors.Join(err, b.exporter.Shutdown(ctx))
}
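
Shutdown guards against repeated calls with `stopped.Swap(true)`, which atomically sets the flag and returns its previous value. A minimal sketch of that guard, assuming a hypothetical `worker` type rather than the BatchingProcessor itself; without the guard, a second `close` of the kill channel would panic.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// worker is a hypothetical type used to illustrate an idempotent shutdown.
type worker struct {
	stopped atomic.Bool
	kill    chan struct{}
}

func newWorker() *worker {
	return &worker{kill: make(chan struct{})}
}

// Shutdown reports whether this call performed the shutdown. Only the
// first caller observes false from Swap and closes the kill channel.
func (w *worker) Shutdown() bool {
	if w.stopped.Swap(true) {
		return false
	}
	close(w.kill)
	return true
}

func main() {
	w := newWorker()
	fmt.Println(w.Shutdown()) // true
	fmt.Println(w.Shutdown()) // false, and no panic from a double close
}
```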

var errPartialFlush = errors.New("partial flush: export buffer full")

// Used for testing.
var ctxErr = func(ctx context.Context) error {
return ctx.Err()
}

// ForceFlush flushes queued log records and flushes the decorated exporter.
func (b *BatchingProcessor) ForceFlush(ctx context.Context) error {
// TODO (#5063): Implement.
return nil
if b.stopped.Load() {
return nil
}

buf := make([]Record, b.q.cap)
notFlushed := func() bool {
var flushed bool
_ = b.q.TryDequeue(buf, func(r []Record) bool {
flushed = b.exporter.EnqueueExport(r)
return flushed
})
return !flushed
}
var err error
// For as long as ctx allows, try to make a single flush of the queue.
for notFlushed() {
// Use ctxErr instead of calling ctx.Err directly so we can test
// the partial error return.
if e := ctxErr(ctx); e != nil {
err = errors.Join(e, errPartialFlush)
break
}
}
return errors.Join(err, b.exporter.ForceFlush(ctx))
}

// queue holds a queue of logging records.