From b7b7e51d1d54e7bcb3df8950e9f31661e1ac4c7e Mon Sep 17 00:00:00 2001
From: Dmitrii Anoshin
Date: Sat, 4 May 2024 09:20:48 -0700
Subject: [PATCH] [chore] [exporterhelper] Integrate capacity limiting into the communication channel (#9232)

Integrate capacity limiting into the internal channels used by both the
memory and persistent queues. Otherwise, with an independent capacity
limiter, it's hard to ensure that the queue size stays accurate going
forward.

Benchmarks before:
```
goos: darwin
goarch: arm64
Benchmark_QueueUsage_1000_requests-10       3252    325010 ns/op    246059 B/op    10 allocs/op
Benchmark_QueueUsage_100000_requests-10       39  29811116 ns/op  24002870 B/op    10 allocs/op
Benchmark_QueueUsage_10000_items-10         3404    349753 ns/op    246052 B/op    10 allocs/op
Benchmark_QueueUsage_1M_items-10              40  29415583 ns/op  24002858 B/op    10 allocs/op
BenchmarkPersistentQueue_TraceSpans
BenchmarkPersistentQueue_TraceSpans/#traces:_1_#spansPerTrace:_1-10     338180    3836 ns/op    2851 B/op     78 allocs/op
BenchmarkPersistentQueue_TraceSpans/#traces:_1_#spansPerTrace:_10-10     81369   15822 ns/op   14598 B/op    289 allocs/op
BenchmarkPersistentQueue_TraceSpans/#traces:_10_#spansPerTrace:_10-10    13066   90155 ns/op  130087 B/op   2417 allocs/op
```

Benchmarks after:
```
Benchmark_QueueUsage_1000_requests-10       4210    278175 ns/op    246055 B/op    10 allocs/op
Benchmark_QueueUsage_100000_requests-10       42  25835945 ns/op  24002968 B/op    10 allocs/op
Benchmark_QueueUsage_10000_items-10         4376    279571 ns/op    246056 B/op    10 allocs/op
Benchmark_QueueUsage_1M_items-10              42  26483907 ns/op  24002995 B/op    10 allocs/op
BenchmarkPersistentQueue_TraceSpans
BenchmarkPersistentQueue_TraceSpans/#traces:_1_#spansPerTrace:_1-10     328268    4251 ns/op    2854 B/op     78 allocs/op
BenchmarkPersistentQueue_TraceSpans/#traces:_1_#spansPerTrace:_10-10    101683   12238 ns/op   14582 B/op    289 allocs/op
BenchmarkPersistentQueue_TraceSpans/#traces:_10_#spansPerTrace:_10-10    13382   86464 ns/op  130154 B/op   2417 allocs/op
```
---
 exporter/exporterqueue/queue.go               |   4 +-
 .../internal/queue/bounded_memory_queue.go    |  23 +-
 .../queue/bounded_memory_queue_test.go        |  10 +-
 exporter/internal/queue/persistent_queue.go   | 204 ++++++++----------
 .../internal/queue/persistent_queue_test.go   | 124 +++++++++--
 exporter/internal/queue/queue.go              |  23 ++
 exporter/internal/queue/queue_capacity.go     |  74 -------
 .../internal/queue/queue_capacity_test.go     |  58 -----
 exporter/internal/queue/sized_channel.go      | 104 +++++++++
 exporter/internal/queue/sized_channel_test.go |  44 ++++
 10 files changed, 395 insertions(+), 273 deletions(-)
 delete mode 100644 exporter/internal/queue/queue_capacity.go
 delete mode 100644 exporter/internal/queue/queue_capacity_test.go
 create mode 100644 exporter/internal/queue/sized_channel.go
 create mode 100644 exporter/internal/queue/sized_channel_test.go

diff --git a/exporter/exporterqueue/queue.go b/exporter/exporterqueue/queue.go
index e97392097a3..5b568851b66 100644
--- a/exporter/exporterqueue/queue.go
+++ b/exporter/exporterqueue/queue.go
@@ -95,7 +95,7 @@ func sizerFromConfig[T itemsCounter](Config) queue.Sizer[T] {
 	return &queue.RequestSizer[T]{}
 }
 
-func capacityFromConfig(cfg Config) int {
+func capacityFromConfig(cfg Config) int64 {
 	// TODO: Handle other ways to measure the queue size once they are added.
- return cfg.QueueSize + return int64(cfg.QueueSize) } diff --git a/exporter/internal/queue/bounded_memory_queue.go b/exporter/internal/queue/bounded_memory_queue.go index 9f85e8496bf..98e1b281176 100644 --- a/exporter/internal/queue/bounded_memory_queue.go +++ b/exporter/internal/queue/bounded_memory_queue.go @@ -16,43 +16,38 @@ import ( // the producer are dropped. type boundedMemoryQueue[T any] struct { component.StartFunc - *queueCapacityLimiter[T] - items chan queueRequest[T] + *sizedChannel[memQueueEl[T]] + sizer Sizer[T] } // MemoryQueueSettings defines internal parameters for boundedMemoryQueue creation. type MemoryQueueSettings[T any] struct { Sizer Sizer[T] - Capacity int + Capacity int64 } // NewBoundedMemoryQueue constructs the new queue of specified capacity, and with an optional // callback for dropped items (e.g. useful to emit metrics). func NewBoundedMemoryQueue[T any](set MemoryQueueSettings[T]) Queue[T] { return &boundedMemoryQueue[T]{ - queueCapacityLimiter: newQueueCapacityLimiter[T](set.Sizer, set.Capacity), - items: make(chan queueRequest[T], set.Capacity), + sizedChannel: newSizedChannel[memQueueEl[T]](set.Capacity, nil, 0), + sizer: set.Sizer, } } // Offer is used by the producer to submit new item to the queue. Calling this method on a stopped queue will panic. func (q *boundedMemoryQueue[T]) Offer(ctx context.Context, req T) error { - if !q.queueCapacityLimiter.claim(req) { - return ErrQueueIsFull - } - q.items <- queueRequest[T]{ctx: ctx, req: req} - return nil + return q.sizedChannel.push(memQueueEl[T]{ctx: ctx, req: req}, q.sizer.Sizeof(req), nil) } // Consume applies the provided function on the head of queue. // The call blocks until there is an item available or the queue is stopped. // The function returns true when an item is consumed or false if the queue is stopped and emptied. func (q *boundedMemoryQueue[T]) Consume(consumeFunc func(context.Context, T) error) bool { - item, ok := <-q.items + item, ok := q.sizedChannel.pop(func(el memQueueEl[T]) int64 { return q.sizer.Sizeof(el.req) }) if !ok { return false } - q.queueCapacityLimiter.release(item.req) // the memory queue doesn't handle consume errors _ = consumeFunc(item.ctx, item.req) return true @@ -60,11 +55,11 @@ func (q *boundedMemoryQueue[T]) Consume(consumeFunc func(context.Context, T) err // Shutdown closes the queue channel to initiate draining of the queue. 
func (q *boundedMemoryQueue[T]) Shutdown(context.Context) error { - close(q.items) + q.sizedChannel.shutdown() return nil } -type queueRequest[T any] struct { +type memQueueEl[T any] struct { req T ctx context.Context } diff --git a/exporter/internal/queue/bounded_memory_queue_test.go b/exporter/internal/queue/bounded_memory_queue_test.go index d861893eda9..ae98dca41f1 100644 --- a/exporter/internal/queue/bounded_memory_queue_test.go +++ b/exporter/internal/queue/bounded_memory_queue_test.go @@ -133,7 +133,7 @@ func benchmarkQueueUsage(b *testing.B, sizer Sizer[fakeReq], requestsCount int) func queueUsage(tb testing.TB, sizer Sizer[fakeReq], requestsCount int) { var wg sync.WaitGroup wg.Add(requestsCount) - q := NewBoundedMemoryQueue[fakeReq](MemoryQueueSettings[fakeReq]{Sizer: sizer, Capacity: 10 * requestsCount}) + q := NewBoundedMemoryQueue[fakeReq](MemoryQueueSettings[fakeReq]{Sizer: sizer, Capacity: int64(10 * requestsCount)}) consumers := NewQueueConsumers(q, 1, func(context.Context, fakeReq) error { wg.Done() return nil @@ -156,3 +156,11 @@ func TestZeroSizeNoConsumers(t *testing.T) { assert.NoError(t, q.Shutdown(context.Background())) } + +type fakeReq struct { + itemsCount int +} + +func (r fakeReq) ItemsCount() int { + return r.itemsCount +} diff --git a/exporter/internal/queue/persistent_queue.go b/exporter/internal/queue/persistent_queue.go index 80a71f6bad7..07b0d1fb628 100644 --- a/exporter/internal/queue/persistent_queue.go +++ b/exporter/internal/queue/persistent_queue.go @@ -10,7 +10,6 @@ import ( "fmt" "strconv" "sync" - "sync/atomic" "go.uber.org/multierr" "go.uber.org/zap" @@ -44,7 +43,10 @@ import ( // index index x // xxxx deleted type persistentQueue[T any] struct { - *queueCapacityLimiter[T] + // sizedChannel is used by the persistent queue for two purposes: + // 1. a communication channel notifying the consumer that a new item is available. + // 2. capacity control based on the size of the items. + *sizedChannel[permanentQueueEl] set PersistentQueueSettings[T] logger *zap.Logger @@ -53,14 +55,10 @@ type persistentQueue[T any] struct { // isRequestSized indicates whether the queue is sized by the number of requests. isRequestSized bool - putChan chan struct{} - // mu guards everything declared below. 
 	mu                       sync.Mutex
 	readIndex                uint64
 	writeIndex               uint64
-	initIndexSize            uint64
-	initQueueSize            *atomic.Uint64
 	currentlyDispatchedItems []uint64
 	refClient                int64
 	stopped                  bool
@@ -86,7 +84,7 @@ var (
 
 type PersistentQueueSettings[T any] struct {
 	Sizer            Sizer[T]
-	Capacity         int
+	Capacity         int64
 	DataType         component.DataType
 	StorageID        component.ID
 	Marshaler        func(req T) ([]byte, error)
@@ -98,12 +96,9 @@ type PersistentQueueSettings[T any] struct {
 func NewPersistentQueue[T any](set PersistentQueueSettings[T]) Queue[T] {
 	_, isRequestSized := set.Sizer.(*RequestSizer[T])
 	return &persistentQueue[T]{
-		queueCapacityLimiter: newQueueCapacityLimiter[T](set.Sizer, set.Capacity),
-		set:                  set,
-		logger:               set.ExporterSettings.Logger,
-		initQueueSize:        &atomic.Uint64{},
-		isRequestSized:       isRequestSized,
-		putChan:              make(chan struct{}, set.Capacity),
+		set:            set,
+		logger:         set.ExporterSettings.Logger,
+		isRequestSized: isRequestSized,
 	}
 }
 
@@ -148,39 +143,49 @@ func (pq *persistentQueue[T]) initPersistentContiguousStorage(ctx context.Contex
 		pq.readIndex = 0
 		pq.writeIndex = 0
 	}
-	pq.initIndexSize = pq.writeIndex - pq.readIndex
 
-	// Ensure the communication channel has the same size as the queue
-	for i := 0; i < int(pq.initIndexSize); i++ {
-		pq.putChan <- struct{}{}
-	}
+	initIndexSize := pq.writeIndex - pq.readIndex
+
+	var (
+		initEls       []permanentQueueEl
+		initQueueSize uint64
+	)
 
-	// Read snapshot of the queue size from storage. It's not a problem if the value cannot be fetched,
-	// or it's not accurate. The queue size will be corrected once the recovered queue is drained.
-	if pq.initIndexSize > 0 {
+	// Pre-allocate the communication channel with the size of the restored queue.
+	if initIndexSize > 0 {
+		initQueueSize = initIndexSize
 		// If the queue is sized by the number of requests, no need to read the queue size from storage.
-		if pq.isRequestSized {
-			pq.initQueueSize.Store(pq.initIndexSize)
-			return
+		if !pq.isRequestSized {
+			if restoredQueueSize, err := pq.restoreQueueSizeFromStorage(ctx); err == nil {
+				initQueueSize = restoredQueueSize
+			}
 		}
-		res, err := pq.client.Get(ctx, queueSizeKey)
-		if err == nil {
-			var restoredQueueSize uint64
-			restoredQueueSize, err = bytesToItemIndex(res)
-			pq.initQueueSize.Store(restoredQueueSize)
-		}
-		if err != nil {
-			if errors.Is(err, errValueNotSet) {
-				pq.logger.Warn("Cannot read the queue size snapshot from storage. "+
-					"The reported queue size will be inaccurate until the initial queue is drained. "+
-					"It's expected when the items sized queue enabled for the first time", zap.Error(err))
-			} else {
-				pq.logger.Error("Failed to read the queue size snapshot from storage. "+
-					"The reported queue size will be inaccurate until the initial queue is drained.", zap.Error(err))
-			}
-		}
+		// Ensure the communication channel is filled with evenly sized elements up to the total restored queue size.
+		initEls = make([]permanentQueueEl, initIndexSize)
+	}
+
+	pq.sizedChannel = newSizedChannel[permanentQueueEl](pq.set.Capacity, initEls, int64(initQueueSize))
+}
+
+// permanentQueueEl is the type of the elements passed to the sizedChannel by the persistentQueue.
+type permanentQueueEl struct{}
+
+// restoreQueueSizeFromStorage restores the queue size from storage.
+func (pq *persistentQueue[T]) restoreQueueSizeFromStorage(ctx context.Context) (uint64, error) {
+	val, err := pq.client.Get(ctx, queueSizeKey)
+	if err != nil {
+		if errors.Is(err, errValueNotSet) {
+			pq.logger.Warn("Cannot read the queue size snapshot from storage. "+
+				"The reported queue size will be inaccurate until the initial queue is drained. "+
+				"This is expected when the items-sized queue is enabled for the first time", zap.Error(err))
+		} else {
+			pq.logger.Error("Failed to read the queue size snapshot from storage. "+
+				"The reported queue size will be inaccurate until the initial queue is drained.", zap.Error(err))
+		}
+		return 0, err
+	}
+	return bytesToItemIndex(val)
+}
 
 // Consume applies the provided function on the head of queue.
@@ -188,14 +193,24 @@ func (pq *persistentQueue[T]) initPersistentContiguousStorage(ctx context.Contex
 // The function returns true when an item is consumed or false if the queue is stopped.
 func (pq *persistentQueue[T]) Consume(consumeFunc func(context.Context, T) error) bool {
 	for {
+		var (
+			req                  T
+			onProcessingFinished func(error)
+			consumed             bool
+		)
+
 		// If we are stopped we still process all the other events in the channel before, but we
 		// return fast in the `getNextItem`, so we will free the channel fast and get to the stop.
-		_, ok := <-pq.putChan
+		_, ok := pq.sizedChannel.pop(func(permanentQueueEl) int64 {
+			req, onProcessingFinished, consumed = pq.getNextItem(context.Background())
+			if !consumed {
+				return 0
+			}
+			return pq.set.Sizer.Sizeof(req)
+		})
 		if !ok {
 			return false
 		}
-
-		req, onProcessingFinished, consumed := pq.getNextItem(context.Background())
 		if consumed {
 			onProcessingFinished(consumeFunc(context.Background(), req))
 			return true
@@ -203,31 +218,24 @@
 	}
 }
 
-// Size returns the current size of the queue.
-func (pq *persistentQueue[T]) Size() int {
-	return int(pq.initQueueSize.Load()) + pq.queueCapacityLimiter.Size()
-}
-
 func (pq *persistentQueue[T]) Shutdown(ctx context.Context) error {
-	close(pq.putChan)
+	// If the queue is not initialized, there is nothing to shut down.
+	if pq.client == nil {
+		return nil
+	}
+
 	pq.mu.Lock()
 	defer pq.mu.Unlock()
+	backupErr := pq.backupQueueSize(ctx)
+	pq.sizedChannel.shutdown()
 	// Mark this queue as stopped, so consumer don't start any more work.
 	pq.stopped = true
-	return multierr.Combine(
-		pq.backupQueueSize(ctx),
-		pq.unrefClient(ctx),
-	)
+	return multierr.Combine(backupErr, pq.unrefClient(ctx))
 }
 
 // backupQueueSize writes the current queue size to storage. The value is used to recover the queue size
 // in case if the collector is killed.
 func (pq *persistentQueue[T]) backupQueueSize(ctx context.Context) error {
-	// Client can be nil if the queue is not initialized yet.
-	if pq.client == nil {
-		return nil
-	}
-
 	// No need to write the queue size if the queue is sized by the number of requests.
 	// That information is already stored as difference between read and write indexes.
 	if pq.isRequestSized {
@@ -258,34 +266,31 @@ func (pq *persistentQueue[T]) Offer(ctx context.Context, req T) error {
 
 // putInternal is the internal version that requires caller to hold the mutex lock.
 func (pq *persistentQueue[T]) putInternal(ctx context.Context, req T) error {
-	if !pq.queueCapacityLimiter.claim(req) {
-		pq.logger.Warn("Maximum queue capacity reached")
-		return ErrQueueIsFull
-	}
+	err := pq.sizedChannel.push(permanentQueueEl{}, pq.set.Sizer.Sizeof(req), func() error {
+		itemKey := getItemKey(pq.writeIndex)
+		newIndex := pq.writeIndex + 1
+
+		reqBuf, err := pq.set.Marshaler(req)
+		if err != nil {
+			return err
+		}
 
-	itemKey := getItemKey(pq.writeIndex)
-	newIndex := pq.writeIndex + 1
+		// Carry out a transaction where we both add the item and update the write index
+		ops := []storage.Operation{
+			storage.SetOperation(writeIndexKey, itemIndexToBytes(newIndex)),
+			storage.SetOperation(itemKey, reqBuf),
+		}
+		if storageErr := pq.client.Batch(ctx, ops...); storageErr != nil {
+			return storageErr
+		}
 
-	reqBuf, err := pq.set.Marshaler(req)
+		pq.writeIndex = newIndex
+		return nil
+	})
 	if err != nil {
-		pq.queueCapacityLimiter.release(req)
 		return err
 	}
 
-	// Carry out a transaction where we both add the item and update the write index
-	ops := []storage.Operation{
-		storage.SetOperation(writeIndexKey, itemIndexToBytes(newIndex)),
-		storage.SetOperation(itemKey, reqBuf),
-	}
-	if storageErr := pq.client.Batch(ctx, ops...); storageErr != nil {
-		pq.queueCapacityLimiter.release(req)
-		return storageErr
-	}
-
-	pq.writeIndex = newIndex
-	// Inform the loop that there's some data to process
-	pq.putChan <- struct{}{}
-
 	// Back up the queue size to storage every 10 writes. The stored value is used to recover the queue size
 	// in case if the collector is killed. The recovered queue size is allowed to be inaccurate.
 	if (pq.writeIndex % 10) == 5 {
@@ -337,16 +342,6 @@ func (pq *persistentQueue[T]) getNextItem(ctx context.Context) (T, func(error),
 		return request, nil, false
 	}
 
-	pq.releaseCapacity(request)
-
-	// Back up the queue size to storage on every 10 reads. The stored value is used to recover the queue size
-	// in case if the collector is killed. The recovered queue size is allowed to be inaccurate.
-	if (pq.readIndex % 10) == 0 {
-		if qsErr := pq.backupQueueSize(ctx); qsErr != nil {
-			pq.logger.Error("Error writing queue size to storage", zap.Error(err))
-		}
-	}
-
 	// Increase the reference count, so the client is not closed while the request is being processed.
 	// The client cannot be closed because we hold the lock since last we checked `stopped`.
 	pq.refClient++
@@ -371,29 +366,18 @@ func (pq *persistentQueue[T]) getNextItem(ctx context.Context) (T, func(error),
 			pq.logger.Error("Error deleting item from queue", zap.Error(err))
 		}
 
-	}, true
-}
-
-// releaseCapacity releases the capacity of the queue. The caller must hold the mutex.
-func (pq *persistentQueue[T]) releaseCapacity(req T) {
-	// If the recovered queue size is not emptied yet, decrease it first.
-	if pq.initIndexSize > 0 {
-		pq.initIndexSize--
-		if pq.initIndexSize == 0 {
-			pq.initQueueSize.Store(0)
-			return
-		}
-		reqSize := pq.queueCapacityLimiter.sizeOf(req)
-		if pq.initQueueSize.Load() < reqSize {
-			pq.initQueueSize.Store(0)
-			return
+		// Back up the queue size to storage on every 10 reads. The stored value is used to recover the queue size
+		// in case the collector is killed. The recovered queue size is allowed to be inaccurate.
+		if (pq.readIndex % 10) == 0 {
+			if qsErr := pq.backupQueueSize(ctx); qsErr != nil {
+				pq.logger.Error("Error writing queue size to storage", zap.Error(err))
+			}
 		}
-		pq.initQueueSize.Add(^(reqSize - 1))
-		return
-	}
-	// Otherwise, decrease the current queue size.
-	pq.queueCapacityLimiter.release(req)
+
+		// Ensure the used size and the channel size are in sync.
+		pq.sizedChannel.syncSize()
+
+	}, true
 }
 
 // retrieveAndEnqueueNotDispatchedReqs gets the items for which sending was not finished, cleans the storage
diff --git a/exporter/internal/queue/persistent_queue_test.go b/exporter/internal/queue/persistent_queue_test.go
index 2b525fa792f..6385088d5b1 100644
--- a/exporter/internal/queue/persistent_queue_test.go
+++ b/exporter/internal/queue/persistent_queue_test.go
@@ -54,7 +54,7 @@ func (nh *mockHost) GetExtensions() map[component.ID]component.Component {
 }
 
 // createAndStartTestPersistentQueue creates and starts a fake queue with the given capacity and number of consumers.
-func createAndStartTestPersistentQueue(t *testing.T, sizer Sizer[tracesRequest], capacity int, numConsumers int,
+func createAndStartTestPersistentQueue(t *testing.T, sizer Sizer[tracesRequest], capacity int64, numConsumers int,
 	consumeFunc func(_ context.Context, item tracesRequest) error) Queue[tracesRequest] {
 	pq := NewPersistentQueue[tracesRequest](PersistentQueueSettings[tracesRequest]{
 		Sizer:            sizer,
@@ -90,16 +90,16 @@ func createTestPersistentQueueWithClient(client storage.Client) *persistentQueue
 	return pq
 }
 
-func createTestPersistentQueueWithRequestsCapacity(t testing.TB, ext storage.Extension, capacity int) *persistentQueue[tracesRequest] {
+func createTestPersistentQueueWithRequestsCapacity(t testing.TB, ext storage.Extension, capacity int64) *persistentQueue[tracesRequest] {
 	return createTestPersistentQueueWithCapacityLimiter(t, ext, &RequestSizer[tracesRequest]{}, capacity)
 }
 
-func createTestPersistentQueueWithItemsCapacity(t testing.TB, ext storage.Extension, capacity int) *persistentQueue[tracesRequest] {
+func createTestPersistentQueueWithItemsCapacity(t testing.TB, ext storage.Extension, capacity int64) *persistentQueue[tracesRequest] {
 	return createTestPersistentQueueWithCapacityLimiter(t, ext, &ItemsSizer[tracesRequest]{}, capacity)
 }
 
 func createTestPersistentQueueWithCapacityLimiter(t testing.TB, ext storage.Extension, sizer Sizer[tracesRequest],
-	capacity int) *persistentQueue[tracesRequest] {
+	capacity int64) *persistentQueue[tracesRequest] {
 	pq := NewPersistentQueue[tracesRequest](PersistentQueueSettings[tracesRequest]{
 		Sizer:            sizer,
 		Capacity:         capacity,
@@ -117,7 +117,7 @@ func TestPersistentQueue_FullCapacity(t *testing.T) {
 	tests := []struct {
 		name           string
 		sizer          Sizer[tracesRequest]
-		capacity       int
+		capacity       int64
 		sizeMultiplier int
 	}{
 		{
@@ -518,8 +518,6 @@ func TestPersistentQueueStartWithNonDispatched(t *testing.T) {
 		require.NoError(t, err)
 	}
 
-	// get one item out, but don't mark it as processed
-	<-ps.putChan
 	require.True(t, ps.Consume(func(context.Context, tracesRequest) error {
 		// put one more item in
 		require.NoError(t, ps.Offer(context.Background(), req))
@@ -847,34 +845,132 @@ func TestPersistentQueue_ItemsCapacityUsageIsNotPreserved(t *testing.T) {
 
 	newPQ := createTestPersistentQueueWithItemsCapacity(t, ext, 100)
 
-	// The queue items size cannot be restored to the previous size. Falls back to 0.
-	assert.Equal(t, 0, newPQ.Size())
+	// The queue items size cannot be restored, so it falls back to the request-based size.
+	assert.Equal(t, 2, newPQ.Size())
 
 	assert.NoError(t, newPQ.Offer(context.Background(), newTracesRequest(2, 5)))
 
-	// Only new items are reflected
-	assert.Equal(t, 10, newPQ.Size())
+	// Only new items are correctly reflected
+	assert.Equal(t, 12, newPQ.Size())
 
-	// Consuming old items should does not affect the size.
+	// Consuming a restored request should reduce the restored size by 20, but it should not go below zero.
 	assert.True(t, newPQ.Consume(func(_ context.Context, traces tracesRequest) error {
 		assert.Equal(t, 20, traces.traces.SpanCount())
 		return nil
 	}))
-	assert.Equal(t, 10, newPQ.Size())
+	assert.Equal(t, 0, newPQ.Size())
 
+	// Consuming another restored request should not affect the restored size since it's already dropped to 0.
 	assert.True(t, newPQ.Consume(func(_ context.Context, traces tracesRequest) error {
 		assert.Equal(t, 25, traces.traces.SpanCount())
 		return nil
 	}))
-	assert.Equal(t, 10, newPQ.Size())
+	assert.Equal(t, 0, newPQ.Size())
+
+	// Adding another batch should update the size accordingly
+	assert.NoError(t, newPQ.Offer(context.Background(), newTracesRequest(5, 5)))
+	assert.Equal(t, 25, newPQ.Size())
 
 	assert.True(t, newPQ.Consume(func(_ context.Context, traces tracesRequest) error {
 		assert.Equal(t, 10, traces.traces.SpanCount())
 		return nil
 	}))
+	assert.Equal(t, 15, newPQ.Size())
+
+	assert.NoError(t, newPQ.Shutdown(context.Background()))
+}
+
+// This test covers the case when the queue is restarted with less capacity than needed to restore the queued items.
+// In that case, the queue has to be restored anyway even if it exceeds the capacity limit.
+func TestPersistentQueue_RequestCapacityLessAfterRestart(t *testing.T) {
+	ext := NewMockStorageExtension(nil)
+	pq := createTestPersistentQueueWithRequestsCapacity(t, ext, 100)
+
+	assert.Equal(t, 0, pq.Size())
+
+	assert.NoError(t, pq.Offer(context.Background(), newTracesRequest(4, 10)))
+	assert.NoError(t, pq.Offer(context.Background(), newTracesRequest(2, 10)))
+	assert.NoError(t, pq.Offer(context.Background(), newTracesRequest(5, 5)))
+	assert.NoError(t, pq.Offer(context.Background(), newTracesRequest(1, 5)))
+
+	// Read the first request just to populate the read index in the storage.
+	// Otherwise, the write index won't be restored either.
+	assert.True(t, pq.Consume(func(_ context.Context, traces tracesRequest) error {
+		assert.Equal(t, 40, traces.traces.SpanCount())
+		return nil
+	}))
+	assert.Equal(t, 3, pq.Size())
+
+	assert.NoError(t, pq.Shutdown(context.Background()))
+
+	// The queue is restarted with less capacity than needed to restore the queued items, but with the same
+	// underlying storage. No need to drop requests that are over capacity since they are already in the storage.
+	newPQ := createTestPersistentQueueWithRequestsCapacity(t, ext, 2)
+
+	// The queue items size cannot be restored, so it falls back to the request-based size.
+	assert.Equal(t, 3, newPQ.Size())
+
+	// Queue is full
+	assert.Error(t, newPQ.Offer(context.Background(), newTracesRequest(2, 5)))
+
+	assert.True(t, newPQ.Consume(func(_ context.Context, traces tracesRequest) error {
+		assert.Equal(t, 20, traces.traces.SpanCount())
+		return nil
+	}))
+	assert.Equal(t, 2, newPQ.Size())
+
+	// Still full
+	assert.Error(t, newPQ.Offer(context.Background(), newTracesRequest(2, 5)))
+
+	assert.True(t, newPQ.Consume(func(_ context.Context, traces tracesRequest) error {
+		assert.Equal(t, 25, traces.traces.SpanCount())
+		return nil
+	}))
+	assert.Equal(t, 1, newPQ.Size())
+
+	// Now it can accept new items
+	assert.NoError(t, newPQ.Offer(context.Background(), newTracesRequest(2, 5)))
+
+	assert.NoError(t, newPQ.Shutdown(context.Background()))
+}
+
+// This test covers the case when the persistent storage is recovered from a snapshot which has
+// a bigger value for the used size than the size of the actual items in the storage.
+func TestPersistentQueue_RestoredUsedSizeIsCorrectedOnDrain(t *testing.T) { + ext := NewMockStorageExtension(nil) + pq := createTestPersistentQueueWithItemsCapacity(t, ext, 1000) + + assert.Equal(t, 0, pq.Size()) + + for i := 0; i < 6; i++ { + assert.NoError(t, pq.Offer(context.Background(), newTracesRequest(2, 5))) + } + assert.Equal(t, 60, pq.Size()) + + // Consume 30 items + for i := 0; i < 3; i++ { + assert.True(t, pq.Consume(func(context.Context, tracesRequest) error { return nil })) + } + // The used size is now 30, but the snapshot should have 50, because it's taken every 5 read/writes. + assert.Equal(t, 30, pq.Size()) + + // Create a new queue pointed to the same storage + newPQ := createTestPersistentQueueWithItemsCapacity(t, ext, 1000) + + // This is an incorrect size restored from the snapshot. + // In reality the size should be 30. Once the queue is drained, it will be updated to the correct size. + assert.Equal(t, 50, newPQ.Size()) + + assert.True(t, newPQ.Consume(func(context.Context, tracesRequest) error { return nil })) + assert.True(t, newPQ.Consume(func(context.Context, tracesRequest) error { return nil })) + assert.Equal(t, 30, newPQ.Size()) + + // Now the size must be correctly reflected + assert.True(t, newPQ.Consume(func(context.Context, tracesRequest) error { return nil })) assert.Equal(t, 0, newPQ.Size()) assert.NoError(t, newPQ.Shutdown(context.Background())) + assert.NoError(t, pq.Shutdown(context.Background())) } func requireCurrentlyDispatchedItemsEqual(t *testing.T, pq *persistentQueue[tracesRequest], compare []uint64) { diff --git a/exporter/internal/queue/queue.go b/exporter/internal/queue/queue.go index 0ae0703b05d..35bc504579e 100644 --- a/exporter/internal/queue/queue.go +++ b/exporter/internal/queue/queue.go @@ -34,3 +34,26 @@ type Queue[T any] interface { // Capacity returns the capacity of the queue. Capacity() int } + +type itemsCounter interface { + ItemsCount() int +} + +// Sizer is an interface that returns the size of the given element. +type Sizer[T any] interface { + Sizeof(T) int64 +} + +// ItemsSizer is a Sizer implementation that returns the size of a queue element as the number of items it contains. +type ItemsSizer[T itemsCounter] struct{} + +func (is *ItemsSizer[T]) Sizeof(el T) int64 { + return int64(el.ItemsCount()) +} + +// RequestSizer is a Sizer implementation that returns the size of a queue element as one request. +type RequestSizer[T any] struct{} + +func (rs *RequestSizer[T]) Sizeof(T) int64 { + return 1 +} diff --git a/exporter/internal/queue/queue_capacity.go b/exporter/internal/queue/queue_capacity.go deleted file mode 100644 index 1995febcd63..00000000000 --- a/exporter/internal/queue/queue_capacity.go +++ /dev/null @@ -1,74 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package queue // import "go.opentelemetry.io/collector/exporter/internal/queue" - -import ( - "sync/atomic" -) - -type itemsCounter interface { - ItemsCount() int -} - -// Sizer is an interface that returns the size of the given element. -type Sizer[T any] interface { - SizeOf(T) uint64 -} - -// ItemsSizer is a Sizer implementation that returns the size of a queue element as the number of items it contains. -type ItemsSizer[T itemsCounter] struct{} - -func (is *ItemsSizer[T]) SizeOf(el T) uint64 { - return uint64(el.ItemsCount()) -} - -// RequestSizer is a Sizer implementation that returns the size of a queue element as one request. 
-type RequestSizer[T any] struct{}
-
-func (rs *RequestSizer[T]) SizeOf(T) uint64 {
-	return 1
-}
-
-type queueCapacityLimiter[T any] struct {
-	used *atomic.Uint64
-	cap  uint64
-	sz   Sizer[T]
-}
-
-func (bcl queueCapacityLimiter[T]) Capacity() int {
-	return int(bcl.cap)
-}
-
-func (bcl queueCapacityLimiter[T]) Size() int {
-	return int(bcl.used.Load())
-}
-
-func (bcl queueCapacityLimiter[T]) claim(el T) bool {
-	size := bcl.sizeOf(el)
-	if bcl.used.Add(size) > bcl.cap {
-		bcl.releaseSize(size)
-		return false
-	}
-	return true
-}
-
-func (bcl queueCapacityLimiter[T]) release(el T) {
-	bcl.releaseSize(bcl.sizeOf(el))
-}
-
-func (bcl queueCapacityLimiter[T]) releaseSize(size uint64) {
-	bcl.used.Add(^(size - 1))
-}
-
-func (bcl queueCapacityLimiter[T]) sizeOf(el T) uint64 {
-	return bcl.sz.SizeOf(el)
-}
-
-func newQueueCapacityLimiter[T any](sizer Sizer[T], capacity int) *queueCapacityLimiter[T] {
-	return &queueCapacityLimiter[T]{
-		used: &atomic.Uint64{},
-		cap:  uint64(capacity),
-		sz:   sizer,
-	}
-}
diff --git a/exporter/internal/queue/queue_capacity_test.go b/exporter/internal/queue/queue_capacity_test.go
deleted file mode 100644
index 3dd6ad2b898..00000000000
--- a/exporter/internal/queue/queue_capacity_test.go
+++ /dev/null
@@ -1,58 +0,0 @@
-// Copyright The OpenTelemetry Authors
-// SPDX-License-Identifier: Apache-2.0
-
-package queue
-
-import (
-	"testing"
-
-	"github.com/stretchr/testify/assert"
-)
-
-func TestRequestsCapacityLimiter(t *testing.T) {
-	rl := newQueueCapacityLimiter[fakeReq](&RequestSizer[fakeReq]{}, 2)
-	assert.Equal(t, 0, rl.Size())
-	assert.Equal(t, 2, rl.Capacity())
-
-	req := fakeReq{itemsCount: 5}
-
-	assert.True(t, rl.claim(req))
-	assert.Equal(t, 1, rl.Size())
-
-	assert.True(t, rl.claim(req))
-	assert.Equal(t, 2, rl.Size())
-
-	assert.False(t, rl.claim(req))
-	assert.Equal(t, 2, rl.Size())
-
-	rl.release(req)
-	assert.Equal(t, 1, rl.Size())
-}
-
-func TestItemsCapacityLimiter(t *testing.T) {
-	rl := newQueueCapacityLimiter[fakeReq](&ItemsSizer[fakeReq]{}, 7)
-	assert.Equal(t, 0, rl.Size())
-	assert.Equal(t, 7, rl.Capacity())
-
-	req := fakeReq{itemsCount: 3}
-
-	assert.True(t, rl.claim(req))
-	assert.Equal(t, 3, rl.Size())
-
-	assert.True(t, rl.claim(req))
-	assert.Equal(t, 6, rl.Size())
-
-	assert.False(t, rl.claim(req))
-	assert.Equal(t, 6, rl.Size())
-
-	rl.release(req)
-	assert.Equal(t, 3, rl.Size())
-}
-
-type fakeReq struct {
-	itemsCount int
-}
-
-func (r fakeReq) ItemsCount() int {
-	return r.itemsCount
-}
diff --git a/exporter/internal/queue/sized_channel.go b/exporter/internal/queue/sized_channel.go
new file mode 100644
index 00000000000..1702a38ac2f
--- /dev/null
+++ b/exporter/internal/queue/sized_channel.go
@@ -0,0 +1,104 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package queue // import "go.opentelemetry.io/collector/exporter/internal/queue"
+
+import "sync/atomic"
+
+// sizedChannel is a channel wrapper for sized elements with a capacity set to the total size of all the elements.
+// The channel will accept elements until the total size of the elements reaches the capacity.
+type sizedChannel[T any] struct {
+	used *atomic.Int64
+
+	// We need to store the capacity in a separate field because the capacity of the channel can be higher.
+	// This happens when a persistent queue restored from disk is bigger than the pre-configured capacity.
+	cap int64
+	ch  chan T
+}
+
+// newSizedChannel creates a channel for sized elements. Each element is assigned a size by the caller at push time.
+// The capacity of the underlying channel usually should be equal to the capacity of the queue to
+// avoid blocking the producer. Optionally, the channel can be preloaded with the elements and their total size.
+func newSizedChannel[T any](capacity int64, els []T, totalSize int64) *sizedChannel[T] {
+	used := &atomic.Int64{}
+	used.Store(totalSize)
+
+	chCap := capacity
+	if chCap < int64(len(els)) {
+		chCap = int64(len(els))
+	}
+
+	ch := make(chan T, chCap)
+	for _, el := range els {
+		ch <- el
+	}
+
+	return &sizedChannel[T]{
+		used: used,
+		cap:  capacity,
+		ch:   ch,
+	}
+}
+
+// push puts the element into the queue with the given size if there is enough capacity.
+// Returns an error if the queue is full. The callback is called before the element is committed to the queue.
+// If the callback returns an error, the element is not put into the queue and the error is returned.
+// The size of the element MUST be positive.
+func (vcq *sizedChannel[T]) push(el T, size int64, callback func() error) error {
+	if vcq.used.Add(size) > vcq.cap {
+		vcq.used.Add(-size)
+		return ErrQueueIsFull
+	}
+	if callback != nil {
+		if err := callback(); err != nil {
+			vcq.used.Add(-size)
+			return err
+		}
+	}
+	vcq.ch <- el
+	return nil
+}
+
+// pop removes the element from the queue and returns it.
+// The call blocks until there is an item available or the queue is stopped.
+// The function returns true when an item is consumed or false if the queue is stopped and emptied.
+// The callback is called before the element is removed from the queue. It must return the size of the element.
+func (vcq *sizedChannel[T]) pop(callback func(T) (size int64)) (T, bool) {
+	el, ok := <-vcq.ch
+	if !ok {
+		return el, false
+	}
+
+	size := callback(el)
+
+	// The used size and the channel size might not be in sync with the queue when it's restored from disk,
+	// because we don't flush the current queue size to disk on every read/write.
+	// In that case we need to make sure it doesn't go below 0.
+	if vcq.used.Add(-size) < 0 {
+		vcq.used.Store(0)
+	}
+	return el, true
+}
+
+// syncSize updates the used size to 0 if the queue is empty.
+// The caller must ensure that it is not called concurrently with push.
+// It's used by the persistent queue to ensure the used value correctly reflects reality, which may not always
+// be the case when the queue size is restored from disk after a crash.
+func (vcq *sizedChannel[T]) syncSize() {
+	if len(vcq.ch) == 0 {
+		vcq.used.Store(0)
+	}
+}
+
+// shutdown closes the queue channel to initiate draining of the queue.
+func (vcq *sizedChannel[T]) shutdown() { + close(vcq.ch) +} + +func (vcq *sizedChannel[T]) Size() int { + return int(vcq.used.Load()) +} + +func (vcq *sizedChannel[T]) Capacity() int { + return int(vcq.cap) +} diff --git a/exporter/internal/queue/sized_channel_test.go b/exporter/internal/queue/sized_channel_test.go new file mode 100644 index 00000000000..02cd4bf8e68 --- /dev/null +++ b/exporter/internal/queue/sized_channel_test.go @@ -0,0 +1,44 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package queue + +import ( + "errors" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestSizedCapacityChannel(t *testing.T) { + q := newSizedChannel[int](7, nil, 0) + assert.NoError(t, q.push(1, 1, nil)) + assert.Equal(t, 1, q.Size()) + assert.Equal(t, 7, q.Capacity()) + + // failed callback should not allow the element to be added + assert.Error(t, q.push(2, 2, func() error { return errors.New("failed") })) + assert.Equal(t, 1, q.Size()) + + assert.NoError(t, q.push(3, 3, nil)) + assert.Equal(t, 4, q.Size()) + + // should not be able to send to the full queue + assert.Error(t, q.push(4, 4, nil)) + assert.Equal(t, 4, q.Size()) + + el, ok := q.pop(func(el int) int64 { return int64(el) }) + assert.Equal(t, 1, el) + assert.True(t, ok) + assert.Equal(t, 3, q.Size()) + + el, ok = q.pop(func(el int) int64 { return int64(el) }) + assert.Equal(t, 3, el) + assert.True(t, ok) + assert.Equal(t, 0, q.Size()) + + q.shutdown() + el, ok = q.pop(func(el int) int64 { return int64(el) }) + assert.False(t, ok) + assert.Equal(t, 0, el) +}
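
For readers unfamiliar with the claim-then-send pattern that `sized_channel.go` introduces, here is a minimal, self-contained sketch of the same idea. It is illustrative only, not the collector's internal API: the `sizedChan` type, `newSizedChan`, and `errQueueIsFull` names below are hypothetical stand-ins, and the preloading and push-callback handling of the real `sizedChannel` are omitted.

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

var errQueueIsFull = errors.New("sending queue is full")

// sizedChan pairs a buffered channel with an atomic size counter so that
// capacity is enforced in size units (e.g. item counts), not element counts.
type sizedChan[T any] struct {
	used atomic.Int64 // total size of the elements currently in the channel
	cap  int64        // maximum total size, tracked separately from the buffer
	ch   chan T
}

func newSizedChan[T any](capacity int64) *sizedChan[T] {
	// Every element has size >= 1, so the element count never exceeds the
	// capacity and the buffered send in push can never block.
	return &sizedChan[T]{cap: capacity, ch: make(chan T, capacity)}
}

// push reserves the size with a single atomic Add and rolls the reservation
// back on overflow, so concurrent producers cannot oversubscribe the capacity.
func (s *sizedChan[T]) push(el T, size int64) error {
	if s.used.Add(size) > s.cap {
		s.used.Add(-size)
		return errQueueIsFull
	}
	s.ch <- el
	return nil
}

// pop blocks for the next element and releases its size, clamping at zero
// the way the persistent queue does after restoring an inexact size snapshot.
func (s *sizedChan[T]) pop(sizeOf func(T) int64) (T, bool) {
	el, ok := <-s.ch
	if !ok {
		return el, false // channel closed and drained
	}
	if s.used.Add(-sizeOf(el)) < 0 {
		s.used.Store(0)
	}
	return el, true
}

func main() {
	q := newSizedChan[string](5)
	fmt.Println(q.push("abc", 3)) // <nil>
	fmt.Println(q.push("de", 2))  // <nil>: used == cap is still allowed
	fmt.Println(q.push("f", 1))   // sending queue is full
	el, _ := q.pop(func(s string) int64 { return int64(len(s)) })
	fmt.Println(el, q.used.Load()) // abc 2
}
```

The design point the sketch mirrors is that the size reservation happens before the channel send, which keeps the memory queue's `Offer` lock-free; the persistent queue additionally runs its storage transaction in a callback between the reservation and the send, rolling the reservation back if the write fails.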