diff --git a/sdk/log/batch.go b/sdk/log/batch.go
index 1089f75e69e..a321bf2f381 100644
--- a/sdk/log/batch.go
+++ b/sdk/log/batch.go
@@ -6,7 +6,10 @@ package log // import "go.opentelemetry.io/otel/sdk/log"
 import (
     "container/ring"
     "context"
+    "errors"
+    "slices"
     "sync"
+    "sync/atomic"
     "time"
 )
@@ -26,13 +29,70 @@ const (
 var _ Processor = (*BatchingProcessor)(nil)

 // BatchingProcessor is a processor that exports batches of log records.
+// A BatchingProcessor must be created with [NewBatchingProcessor].
 type BatchingProcessor struct {
-    exporter Exporter
-
-    maxQueueSize       int
-    exportInterval     time.Duration
-    exportTimeout      time.Duration
-    exportMaxBatchSize int
+    // The BatchingProcessor is designed to provide the highest throughput of
+    // log records possible while being compatible with OpenTelemetry. The
+    // entry point of log records is the OnEmit method. This method is designed
+    // to receive records as fast as possible while still honoring shutdown
+    // commands. All records received are enqueued to the queue.
+    //
+    // In order to block OnEmit as little as possible, a separate "poll"
+    // goroutine is spawned at the creation of a BatchingProcessor. This
+    // goroutine is responsible for batching the queue at regular polled
+    // intervals, or when it is directly signaled to.
+    //
+    // To keep the polling goroutine from backing up, all batches it makes are
+    // exported with a bufferedExporter. This exporter allows the poll
+    // goroutine to enqueue an export payload that will be handled in a
+    // separate goroutine dedicated to the export. This asynchronous behavior
+    // allows the poll goroutine to maintain accurate interval polling.
+    //
+    //   __BatchingProcessor__     __Poll Goroutine__     __Export Goroutine__
+    // ||                     || ||                  || ||                    ||
+    // ||          ********** || ||                  || ||     **********     ||
+    // || Records=>* OnEmit * || ||   | - ticker     || ||     * export *     ||
+    // ||          ********** || ||   | - trigger    || ||     **********     ||
+    // ||             ||      || ||   |              || ||         ||         ||
+    // ||             ||      || ||   |              || ||         ||         ||
+    // ||   __________\/___   || ||   |***********   || ||  ______/\_______   ||
+    // ||  (____queue______)>=||=||===|*  batch  *===||=||=>[_export_buffer_] ||
+    // ||                     || ||   |***********   || ||                    ||
+    // ||_____________________|| ||__________________|| ||____________________||
+    //
+    //
+    // The "release valve" in this processing is the record queue. This queue
+    // is a ring buffer. It will overwrite the oldest records first when writes
+    // to OnEmit are made faster than the queue can be flushed. If batches
+    // cannot be flushed to the export buffer, the records will remain in the
+    // queue.

+    // exporter is the bufferedExporter all batches are exported with.
+    exporter *bufferExporter
+
+    // q is the active queue of records that have not yet been exported.
+    q *queue
+    // batchSize is the minimum number of records needed before an export is
+    // triggered (unless the interval expires).
+    batchSize int
+
+    // pollTrigger triggers the poll goroutine to flush a batch from the queue.
+    // This is sent to when it is known that the queue contains at least one
+    // complete batch.
+    //
+    // When a send is made to the channel, the poll loop will be reset after
+    // the flush. If there are still enough records in the queue for another
+    // batch, the reset poll loop will automatically re-trigger itself. There
+    // is no need for the original sender to monitor and resend.
+    pollTrigger chan struct{}
+    // pollKill kills the poll goroutine. This is only expected to be closed
+    // once by the Shutdown method.
+    pollKill chan struct{}
+    // pollDone signals the poll goroutine has completed.
+    pollDone chan struct{}
+
+    // stopped holds the stopped state of the BatchingProcessor.
+    stopped atomic.Bool
 }

 // NewBatchingProcessor decorates the provided exporter
@@ -40,42 +100,149 @@ type BatchingProcessor struct {
 //
 // All of the exporter's methods are called synchronously.
 func NewBatchingProcessor(exporter Exporter, opts ...BatchProcessorOption) *BatchingProcessor {
+    cfg := newBatchingConfig(opts)
     if exporter == nil {
         // Do not panic on nil export.
         exporter = defaultNoopExporter
     }
-    cfg := newBatchingConfig(opts)
-    return &BatchingProcessor{
-        exporter: exporter,
-
-        maxQueueSize:       cfg.maxQSize.Value,
-        exportInterval:     cfg.expInterval.Value,
-        exportTimeout:      cfg.expTimeout.Value,
-        exportMaxBatchSize: cfg.expMaxBatchSize.Value,
+    // Order is important here. Wrap the timeoutExporter with the chunkExporter
+    // to ensure each export completes in timeout (instead of all chunked
+    // exports).
+    exporter = newTimeoutExporter(exporter, cfg.expTimeout.Value)
+    // Use a chunkExporter to ensure ForceFlush and Shutdown calls are batched
+    // appropriately on export.
+    exporter = newChunkExporter(exporter, cfg.expMaxBatchSize.Value)
+
+    b := &BatchingProcessor{
+        // TODO: explore making the size of this configurable.
+        exporter: newBufferExporter(exporter, 1),
+
+        q:           newQueue(cfg.maxQSize.Value),
+        batchSize:   cfg.expMaxBatchSize.Value,
+        pollTrigger: make(chan struct{}, 1),
+        pollKill:    make(chan struct{}),
     }
+    b.pollDone = b.poll(cfg.expInterval.Value)
+    return b
+}
+
+// poll spawns a goroutine to handle interval polling and batch exporting. The
+// returned done chan is closed when the spawned goroutine completes.
+func (b *BatchingProcessor) poll(interval time.Duration) (done chan struct{}) {
+    done = make(chan struct{})
+
+    ticker := time.NewTicker(interval)
+    // TODO: investigate using a sync.Pool instead of cloning.
+    buf := make([]Record, b.batchSize)
+    go func() {
+        defer close(done)
+        defer ticker.Stop()
+
+        for {
+            select {
+            case <-ticker.C:
+            case <-b.pollTrigger:
+                ticker.Reset(interval)
+            case <-b.pollKill:
+                return
+            }
+
+            qLen := b.q.TryDequeue(buf, func(r []Record) bool {
+                ok := b.exporter.EnqueueExport(r)
+                if ok {
+                    buf = slices.Clone(buf)
+                }
+                return ok
+            })
+            if qLen >= b.batchSize {
+                // There is another full batch ready. Immediately trigger
+                // another export attempt.
+                select {
+                case b.pollTrigger <- struct{}{}:
+                default:
+                    // Another flush signal already received.
+                }
+            }
+        }
+    }()
+    return done
+}

 // OnEmit batches provided log record.
-func (b *BatchingProcessor) OnEmit(ctx context.Context, r Record) error {
-    // TODO (#5063): Implement.
+func (b *BatchingProcessor) OnEmit(_ context.Context, r Record) error {
+    if b.stopped.Load() {
+        return nil
+    }
+    if n := b.q.Enqueue(r); n >= b.batchSize {
+        select {
+        case b.pollTrigger <- struct{}{}:
+        default:
+            // Flush chan full. The poll goroutine will handle this by
+            // re-sending any trigger until the queue has fewer than batchSize
+            // records.
+        }
+    }
     return nil
 }

-// Enabled returns true.
+// Enabled returns if b is enabled.
 func (b *BatchingProcessor) Enabled(context.Context, Record) bool {
-    return true
+    return !b.stopped.Load()
 }

 // Shutdown flushes queued log records and shuts down the decorated exporter.
 func (b *BatchingProcessor) Shutdown(ctx context.Context) error {
-    // TODO (#5063): Implement.
-    return nil
+    if b.stopped.Swap(true) {
+        return nil
+    }
+
+    // Stop the poll goroutine.
+    close(b.pollKill)
+    select {
+    case <-b.pollDone:
+    case <-ctx.Done():
+        // Out of time.
+        return errors.Join(ctx.Err(), b.exporter.Shutdown(ctx))
+    }
+
+    // Flush remaining queued records before exporter shutdown.
+    err := b.exporter.Export(ctx, b.q.Flush())
+    return errors.Join(err, b.exporter.Shutdown(ctx))
+}
+
+var errPartialFlush = errors.New("partial flush: export buffer full")
+
+// Used for testing.
+var ctxErr = func(ctx context.Context) error {
+    return ctx.Err()
 }

 // ForceFlush flushes queued log records and flushes the decorated exporter.
 func (b *BatchingProcessor) ForceFlush(ctx context.Context) error {
-    // TODO (#5063): Implement.
-    return nil
+    if b.stopped.Load() {
+        return nil
+    }
+
+    buf := make([]Record, b.q.cap)
+    notFlushed := func() bool {
+        var flushed bool
+        _ = b.q.TryDequeue(buf, func(r []Record) bool {
+            flushed = b.exporter.EnqueueExport(r)
+            return flushed
+        })
+        return !flushed
+    }
+    var err error
+    // For as long as ctx allows, try to make a single flush of the queue.
+    for notFlushed() {
+        // Use ctxErr instead of calling ctx.Err directly so we can test
+        // the partial error return.
+        if e := ctxErr(ctx); e != nil {
+            err = errors.Join(e, errPartialFlush)
+            break
+        }
+    }
+    return errors.Join(err, b.exporter.ForceFlush(ctx))
 }

 // queue holds a queue of logging records.
diff --git a/sdk/log/batch_test.go b/sdk/log/batch_test.go
index 7e44b1903f3..cfedb8c486f 100644
--- a/sdk/log/batch_test.go
+++ b/sdk/log/batch_test.go
@@ -4,6 +4,7 @@ package log // import "go.opentelemetry.io/otel/sdk/log"
 import (
+    "context"
     "slices"
     "strconv"
     "sync"
@@ -144,6 +145,301 @@ func TestNewBatchingConfig(t *testing.T) {
     }
 }

+func TestBatchingProcessor(t *testing.T) {
+    ctx := context.Background()
+
+    t.Run("NilExporter", func(t *testing.T) {
+        assert.NotPanics(t, func() { NewBatchingProcessor(nil) })
+    })
+
+    t.Run("Polling", func(t *testing.T) {
+        e := newTestExporter(nil)
+        const size = 15
+        b := NewBatchingProcessor(
+            e,
+            WithMaxQueueSize(2*size),
+            WithExportMaxBatchSize(2*size),
+            WithExportInterval(time.Nanosecond),
+            WithExportTimeout(time.Hour),
+        )
+        for _, r := range make([]Record, size) {
+            assert.NoError(t, b.OnEmit(ctx, r))
+        }
+        var got []Record
+        assert.Eventually(t, func() bool {
+            for _, r := range e.Records() {
+                got = append(got, r...)
+            }
+            return len(got) == size
+        }, 2*time.Second, time.Microsecond)
+        _ = b.Shutdown(ctx)
+    })
+
+    t.Run("OnEmit", func(t *testing.T) {
+        const batch = 10
+        e := newTestExporter(nil)
+        b := NewBatchingProcessor(
+            e,
+            WithMaxQueueSize(10*batch),
+            WithExportMaxBatchSize(batch),
+            WithExportInterval(time.Hour),
+            WithExportTimeout(time.Hour),
+        )
+        for _, r := range make([]Record, 10*batch) {
+            assert.NoError(t, b.OnEmit(ctx, r))
+        }
+        assert.Eventually(t, func() bool {
+            return e.ExportN() > 1
+        }, 2*time.Second, time.Microsecond, "multi-batch flush")
+
+        assert.NoError(t, b.Shutdown(ctx))
+        assert.GreaterOrEqual(t, e.ExportN(), 10)
+    })
+
+    t.Run("RetriggerFlushNonBlocking", func(t *testing.T) {
+        e := newTestExporter(nil)
+        e.ExportTrigger = make(chan struct{})
+
+        const batch = 10
+        b := NewBatchingProcessor(
+            e,
+            WithMaxQueueSize(3*batch),
+            WithExportMaxBatchSize(batch),
+            WithExportInterval(time.Hour),
+            WithExportTimeout(time.Hour),
+        )
+        for _, r := range make([]Record, 2*batch) {
+            assert.NoError(t, b.OnEmit(ctx, r))
+        }
+
+        var n int
+        require.Eventually(t, func() bool {
+            n = e.ExportN()
+            return n > 0
+        }, 2*time.Second, time.Microsecond, "blocked export not attempted")
+
+        var err error
+        require.Eventually(t, func() bool {
+            err = b.OnEmit(ctx, Record{})
+            return true
+        }, time.Second, time.Microsecond, "OnEmit blocked")
+        assert.NoError(t, err)
+
+        e.ExportTrigger <- struct{}{}
+        assert.Eventually(t, func() bool {
+            return e.ExportN() > n
+        }, 2*time.Second, time.Microsecond, "flush not retriggered")
+
+        close(e.ExportTrigger)
+        assert.NoError(t, b.Shutdown(ctx))
+        assert.Equal(t, 3, e.ExportN())
+    })
+
+    t.Run("Enabled", func(t *testing.T) {
+        b := NewBatchingProcessor(defaultNoopExporter)
+        assert.True(t, b.Enabled(ctx, Record{}))
+
+        _ = b.Shutdown(ctx)
+        assert.False(t, b.Enabled(ctx, Record{}))
+    })
+
+    t.Run("Shutdown", func(t *testing.T) {
+        t.Run("Error", func(t *testing.T) {
+            e := newTestExporter(assert.AnError)
+            b := NewBatchingProcessor(e)
+            assert.ErrorIs(t, b.Shutdown(ctx), assert.AnError, "exporter error not returned")
+            assert.NoError(t, b.Shutdown(ctx))
+        })
+
+        t.Run("Multiple", func(t *testing.T) {
+            e := newTestExporter(nil)
+            b := NewBatchingProcessor(e)
+
+            const shutdowns = 3
+            for i := 0; i < shutdowns; i++ {
+                assert.NoError(t, b.Shutdown(ctx))
+            }
+            assert.Equal(t, 1, e.ShutdownN(), "exporter Shutdown calls")
+        })
+
+        t.Run("OnEmit", func(t *testing.T) {
+            e := newTestExporter(nil)
+            b := NewBatchingProcessor(e)
+            assert.NoError(t, b.Shutdown(ctx))
+
+            want := e.ExportN()
+            assert.NoError(t, b.OnEmit(ctx, Record{}))
+            assert.Equal(t, want, e.ExportN(), "Export called after shutdown")
+        })
+
+        t.Run("ForceFlush", func(t *testing.T) {
+            e := newTestExporter(nil)
+            b := NewBatchingProcessor(e)
+
+            assert.NoError(t, b.OnEmit(ctx, Record{}))
+            assert.NoError(t, b.Shutdown(ctx))
+
+            assert.NoError(t, b.ForceFlush(ctx))
+            assert.Equal(t, 0, e.ForceFlushN(), "ForceFlush called after shutdown")
+        })
+
+        t.Run("CanceledContext", func(t *testing.T) {
+            e := newTestExporter(nil)
+            e.ExportTrigger = make(chan struct{})
+            t.Cleanup(func() { close(e.ExportTrigger) })
+            b := NewBatchingProcessor(e)
+
+            ctx := context.Background()
+            c, cancel := context.WithCancel(ctx)
+            cancel()
+
+            assert.ErrorIs(t, b.Shutdown(c), context.Canceled)
+        })
+    })
+
+    t.Run("ForceFlush", func(t *testing.T) {
+        t.Run("Flush", func(t *testing.T) {
+            e := newTestExporter(assert.AnError)
+            b := NewBatchingProcessor(
+                e,
+                WithMaxQueueSize(100),
+                WithExportMaxBatchSize(10),
+                WithExportInterval(time.Hour),
+                WithExportTimeout(time.Hour),
+            )
+            t.Cleanup(func() { _ = b.Shutdown(ctx) })
+
+            var r Record
+            r.SetBody(log.BoolValue(true))
+            require.NoError(t, b.OnEmit(ctx, r))
+
+            assert.ErrorIs(t, b.ForceFlush(ctx), assert.AnError, "exporter error not returned")
+            assert.Equal(t, 1, e.ForceFlushN(), "exporter ForceFlush calls")
+            if assert.Equal(t, 1, e.ExportN(), "exporter Export calls") {
+                got := e.Records()
+                if assert.Len(t, got[0], 1, "records received") {
+                    assert.Equal(t, r, got[0][0])
+                }
+            }
+        })
+
+        t.Run("ErrorPartialFlush", func(t *testing.T) {
+            e := newTestExporter(nil)
+            e.ExportTrigger = make(chan struct{})
+
+            ctxErrCalled := make(chan struct{})
+            orig := ctxErr
+            ctxErr = func(ctx context.Context) error {
+                close(ctxErrCalled)
+                return orig(ctx)
+            }
+            t.Cleanup(func() { ctxErr = orig })
+
+            const batch = 1
+            b := NewBatchingProcessor(
+                e,
+                WithMaxQueueSize(10*batch),
+                WithExportMaxBatchSize(batch),
+                WithExportInterval(time.Hour),
+                WithExportTimeout(time.Hour),
+            )
+
+            // Enqueue 10 x "batch size" amount of records.
+            for i := 0; i < 10*batch; i++ {
+                require.NoError(t, b.OnEmit(ctx, Record{}))
+            }
+            assert.Eventually(t, func() bool {
+                return e.ExportN() > 0
+            }, 2*time.Second, time.Microsecond)
+            // 1 export being performed, 1 export in buffer chan, >1 batch
+            // still in queue that an attempt to flush will be made on.
+            //
+            // Stop the poll routine to prevent contention with the queue lock.
+            // This is outside of "normal" operations, but we are testing if
+            // ForceFlush will return the correct error when an EnqueueExport
+            // fails and not if ForceFlush will ever get the queue lock in high
+            // throughput situations.
+            close(b.pollKill)
+            <-b.pollDone
+
+            // Cancel the flush ctx from the start so errPartialFlush is
+            // returned right away.
+            fCtx, cancel := context.WithCancel(ctx)
+            cancel()
+
+            errCh := make(chan error, 1)
+            go func() {
+                errCh <- b.ForceFlush(fCtx)
+                close(errCh)
+            }()
+            // Wait for ctxErrCalled to close before closing ExportTrigger so
+            // we know the errPartialFlush will be returned in ForceFlush.
+            <-ctxErrCalled
+            close(e.ExportTrigger)
+
+            err := <-errCh
+            assert.ErrorIs(t, err, errPartialFlush, "partial flush error")
+            assert.ErrorIs(t, err, context.Canceled, "ctx canceled error")
+        })
+
+        t.Run("CanceledContext", func(t *testing.T) {
+            e := newTestExporter(nil)
+            e.ExportTrigger = make(chan struct{})
+            b := NewBatchingProcessor(e)
+            t.Cleanup(func() { _ = b.Shutdown(ctx) })
+
+            var r Record
+            r.SetBody(log.BoolValue(true))
+            _ = b.OnEmit(ctx, r)
+            t.Cleanup(func() { _ = b.Shutdown(ctx) })
+            t.Cleanup(func() { close(e.ExportTrigger) })
+
+            c, cancel := context.WithCancel(ctx)
+            cancel()
+            assert.ErrorIs(t, b.ForceFlush(c), context.Canceled)
+        })
+    })
+
+    t.Run("ConcurrentSafe", func(t *testing.T) {
+        const goRoutines = 10
+
+        e := newTestExporter(nil)
+        b := NewBatchingProcessor(e)
+
+        ctx, cancel := context.WithCancel(ctx)
+        var wg sync.WaitGroup
+        for i := 0; i < goRoutines-1; i++ {
+            wg.Add(1)
+            go func() {
+                defer wg.Done()
+                for {
+                    select {
+                    case <-ctx.Done():
+                        return
+                    default:
+                        assert.NoError(t, b.OnEmit(ctx, Record{}))
+                        // Ignore partial flush errors.
+                        _ = b.ForceFlush(ctx)
+                    }
+                }
+            }()
+        }
+
+        require.Eventually(t, func() bool {
+            return e.ExportN() > 0
+        }, 2*time.Second, time.Microsecond, "export before shutdown")
+
+        wg.Add(1)
+        go func() {
+            defer wg.Done()
+            assert.NoError(t, b.Shutdown(ctx))
+            cancel()
+        }()
+
+        wg.Wait()
+    })
+}
+
 func TestQueue(t *testing.T) {
     var r Record
     r.SetBody(log.BoolValue(true))
diff --git a/sdk/log/exporter.go b/sdk/log/exporter.go
index 6ebf061db48..e9f2140e6cf 100644
--- a/sdk/log/exporter.go
+++ b/sdk/log/exporter.go
@@ -219,14 +219,33 @@ func (e *bufferExporter) enqueue(ctx context.Context, records []Record, rCh chan
 }

 // EnqueueExport enqueues an export of records in the context of ctx to be
-// performed asynchronously. This will return true if the exported is
-// successfully enqueued, false otherwise.
+// performed asynchronously. This will return true if the records are
+// successfully enqueued (or the bufferExporter is shut down), false otherwise.
+//
+// The passed records are held after this call returns.
 func (e *bufferExporter) EnqueueExport(records []Record) bool {
     if len(records) == 0 {
         // Nothing to enqueue, do not waste input space.
         return true
     }
-    return e.enqueue(context.Background(), records, nil) == nil
+
+    data := exportData{ctx: context.Background(), records: records}
+
+    e.inputMu.Lock()
+    defer e.inputMu.Unlock()
+
+    // Check stopped before enqueueing now that e.inputMu is held. This
+    // prevents sends on a closed chan when Shutdown is called concurrently.
+    if e.stopped.Load() {
+        return true
+    }
+
+    select {
+    case e.input <- data:
+        return true
+    default:
+        return false
+    }
 }

 // Export synchronously exports records in the context of ctx. This will not
diff --git a/sdk/log/exporter_test.go b/sdk/log/exporter_test.go
index 85c12860409..264abc3a513 100644
--- a/sdk/log/exporter_test.go
+++ b/sdk/log/exporter_test.go
@@ -583,7 +583,7 @@ func TestBufferExporter(t *testing.T) {
             e := newBufferExporter(exp, 1)
             _ = e.Shutdown(context.Background())

-            assert.False(t, e.EnqueueExport(make([]Record, 1)))
+            assert.True(t, e.EnqueueExport(make([]Record, 1)))
        })
    })
 }
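
A note on the "release valve" behavior described in the BatchingProcessor comment: the record queue overwrites its oldest entries rather than blocking OnEmit or growing without bound. The sketch below is not the container/ring-based queue used by the SDK; it is a minimal, hypothetical slice-backed ring that shows only the overwrite-oldest semantics.

package main

import "fmt"

// ring is a fixed-capacity FIFO that overwrites its oldest element when
// full, matching the "release valve" behavior described for the record queue.
type ring struct {
	buf   []int
	head  int // index of the oldest element
	count int
}

func newRing(capacity int) *ring { return &ring{buf: make([]int, capacity)} }

// Enqueue adds v, dropping the oldest element if the ring is full, and
// returns the number of stored elements.
func (r *ring) Enqueue(v int) int {
	tail := (r.head + r.count) % len(r.buf)
	r.buf[tail] = v
	if r.count == len(r.buf) {
		// Full: advance head past the element that was just overwritten.
		r.head = (r.head + 1) % len(r.buf)
	} else {
		r.count++
	}
	return r.count
}

// Flush returns and removes everything currently stored, oldest first.
func (r *ring) Flush() []int {
	out := make([]int, 0, r.count)
	for i := 0; i < r.count; i++ {
		out = append(out, r.buf[(r.head+i)%len(r.buf)])
	}
	r.head, r.count = 0, 0
	return out
}

func main() {
	q := newRing(3)
	for v := 1; v <= 5; v++ {
		q.Enqueue(v)
	}
	fmt.Println(q.Flush()) // [3 4 5]: 1 and 2 were overwritten.
}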
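
The poll goroutine combines three signals: a ticker for interval flushes, a capacity-1 trigger channel for demand flushes, and a kill channel for shutdown. The standalone sketch below mirrors that structure and the non-blocking trigger send used by OnEmit; the names (flusher, trigger, kill) are illustrative and this is not the SDK code itself.

package main

import (
	"fmt"
	"time"
)

// flusher runs a loop that flushes on a fixed interval, on demand via
// trigger, and stops when kill is closed. A demand-driven flush resets the
// ticker so the next interval is measured from the last flush.
func flusher(interval time.Duration, flush func(), trigger, kill chan struct{}) (done chan struct{}) {
	done = make(chan struct{})
	ticker := time.NewTicker(interval)
	go func() {
		defer close(done)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
			case <-trigger:
				ticker.Reset(interval)
			case <-kill:
				return
			}
			flush()
		}
	}()
	return done
}

func main() {
	trigger := make(chan struct{}, 1) // capacity 1: extra triggers are dropped.
	kill := make(chan struct{})
	done := flusher(100*time.Millisecond, func() { fmt.Println("flush") }, trigger, kill)

	// Non-blocking signal, like the send in OnEmit: if a trigger is already
	// pending, this send is dropped and the pending one suffices.
	select {
	case trigger <- struct{}{}:
	default:
	}

	time.Sleep(250 * time.Millisecond)
	close(kill) // Stop the loop, then wait for it to finish.
	<-done
}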
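
The ordering comment in NewBatchingProcessor (timeout wrapper inside the chunk wrapper) is easiest to see with plain function decorators. The names withTimeout and withChunks below are illustrative stand-ins for the unexported timeoutExporter and chunkExporter, not their implementations; the point is only that, with this nesting, every chunk produced by the outer wrapper gets its own timeout instead of sharing one across the whole flush.

package main

import (
	"context"
	"fmt"
	"time"
)

// export is a stand-in for Exporter.Export.
type export func(ctx context.Context, records []int) error

// withTimeout bounds a single call to next by timeout.
func withTimeout(next export, timeout time.Duration) export {
	return func(ctx context.Context, records []int) error {
		ctx, cancel := context.WithTimeout(ctx, timeout)
		defer cancel()
		return next(ctx, records)
	}
}

// withChunks splits records into batches of at most size and exports each.
func withChunks(next export, size int) export {
	return func(ctx context.Context, records []int) error {
		for len(records) > 0 {
			n := min(size, len(records))
			if err := next(ctx, records[:n]); err != nil {
				return err
			}
			records = records[n:]
		}
		return nil
	}
}

func main() {
	base := func(ctx context.Context, records []int) error {
		fmt.Println("exporting", len(records), "records")
		return nil
	}
	// Same order as NewBatchingProcessor: timeout applied first, chunking
	// wrapped around it, so each chunk gets a fresh timeout.
	e := withChunks(withTimeout(base, time.Second), 512)
	_ = e(context.Background(), make([]int, 1200)) // 512 + 512 + 176
}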
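
For context on how the processor added here is expected to be used, the following sketch wires a BatchingProcessor into a LoggerProvider and flushes it. The no-op exporter and the provider wiring via NewLoggerProvider/WithProcessor are assumptions about the surrounding SDK rather than part of this change; ForceFlush and Shutdown return errors combined with errors.Join, so callers can inspect individual causes with errors.Is.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	sdklog "go.opentelemetry.io/otel/sdk/log"
)

// noopExporter satisfies the Exporter interface; a real OTLP or stdout
// exporter would be used in practice.
type noopExporter struct{}

func (noopExporter) Export(context.Context, []sdklog.Record) error { return nil }
func (noopExporter) Shutdown(context.Context) error                { return nil }
func (noopExporter) ForceFlush(context.Context) error              { return nil }

func main() {
	ctx := context.Background()

	p := sdklog.NewBatchingProcessor(
		noopExporter{},
		sdklog.WithMaxQueueSize(4096),
		sdklog.WithExportMaxBatchSize(512),
		sdklog.WithExportInterval(time.Second),
	)
	provider := sdklog.NewLoggerProvider(sdklog.WithProcessor(p))

	// ... emit records through provider.Logger("example") ...

	// Flush and shut down; unwrap joined errors with errors.Is.
	if err := provider.ForceFlush(ctx); err != nil {
		fmt.Println("flush failed:", err, "canceled:", errors.Is(err, context.Canceled))
	}
	if err := provider.Shutdown(ctx); err != nil {
		fmt.Println("shutdown failed:", err)
	}
}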
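
The exporter.go change guards the non-blocking send in EnqueueExport with the existing inputMu and a stopped check so that a concurrent Shutdown cannot close the input channel between the check and the send. A distilled version of that pattern, with illustrative names and a simplified Stop, looks like this; the real bufferExporter carries more state and responsibilities.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// sender hands work to a background consumer without ever blocking and
// without risking a send on a closed channel.
type sender struct {
	mu      sync.Mutex
	stopped atomic.Bool
	input   chan int
}

// TrySend reports true if v was handed off or the sender is already stopped
// (mirroring EnqueueExport, which treats "shut down" as success so callers
// simply drop the payload); it reports false only when the buffer is full.
func (s *sender) TrySend(v int) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.stopped.Load() {
		return true
	}
	select {
	case s.input <- v:
		return true
	default:
		return false
	}
}

// Stop marks the sender stopped and closes input. Taking mu guarantees no
// TrySend is mid-send when the close happens.
func (s *sender) Stop() {
	s.stopped.Store(true)
	s.mu.Lock()
	defer s.mu.Unlock()
	close(s.input)
}

func main() {
	s := &sender{input: make(chan int, 1)}
	fmt.Println(s.TrySend(1)) // true: buffered.
	fmt.Println(s.TrySend(2)) // false: buffer full.
	s.Stop()
	fmt.Println(s.TrySend(3)) // true: stopped senders report success.
}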