diff --git a/exporter/exporterqueue/queue.go b/exporter/exporterqueue/queue.go index e97392097a3..5b568851b66 100644 --- a/exporter/exporterqueue/queue.go +++ b/exporter/exporterqueue/queue.go @@ -95,7 +95,7 @@ func sizerFromConfig[T itemsCounter](Config) queue.Sizer[T] { return &queue.RequestSizer[T]{} } -func capacityFromConfig(cfg Config) int { +func capacityFromConfig(cfg Config) int64 { // TODO: Handle other ways to measure the queue size once they are added. - return cfg.QueueSize + return int64(cfg.QueueSize) } diff --git a/exporter/internal/queue/bounded_memory_queue.go b/exporter/internal/queue/bounded_memory_queue.go index 9f85e8496bf..98e1b281176 100644 --- a/exporter/internal/queue/bounded_memory_queue.go +++ b/exporter/internal/queue/bounded_memory_queue.go @@ -16,43 +16,38 @@ import ( // the producer are dropped. type boundedMemoryQueue[T any] struct { component.StartFunc - *queueCapacityLimiter[T] - items chan queueRequest[T] + *sizedChannel[memQueueEl[T]] + sizer Sizer[T] } // MemoryQueueSettings defines internal parameters for boundedMemoryQueue creation. type MemoryQueueSettings[T any] struct { Sizer Sizer[T] - Capacity int + Capacity int64 } // NewBoundedMemoryQueue constructs the new queue of specified capacity, and with an optional // callback for dropped items (e.g. useful to emit metrics). func NewBoundedMemoryQueue[T any](set MemoryQueueSettings[T]) Queue[T] { return &boundedMemoryQueue[T]{ - queueCapacityLimiter: newQueueCapacityLimiter[T](set.Sizer, set.Capacity), - items: make(chan queueRequest[T], set.Capacity), + sizedChannel: newSizedChannel[memQueueEl[T]](set.Capacity, nil, 0), + sizer: set.Sizer, } } // Offer is used by the producer to submit new item to the queue. Calling this method on a stopped queue will panic. func (q *boundedMemoryQueue[T]) Offer(ctx context.Context, req T) error { - if !q.queueCapacityLimiter.claim(req) { - return ErrQueueIsFull - } - q.items <- queueRequest[T]{ctx: ctx, req: req} - return nil + return q.sizedChannel.push(memQueueEl[T]{ctx: ctx, req: req}, q.sizer.Sizeof(req), nil) } // Consume applies the provided function on the head of queue. // The call blocks until there is an item available or the queue is stopped. // The function returns true when an item is consumed or false if the queue is stopped and emptied. func (q *boundedMemoryQueue[T]) Consume(consumeFunc func(context.Context, T) error) bool { - item, ok := <-q.items + item, ok := q.sizedChannel.pop(func(el memQueueEl[T]) int64 { return q.sizer.Sizeof(el.req) }) if !ok { return false } - q.queueCapacityLimiter.release(item.req) // the memory queue doesn't handle consume errors _ = consumeFunc(item.ctx, item.req) return true @@ -60,11 +55,11 @@ func (q *boundedMemoryQueue[T]) Consume(consumeFunc func(context.Context, T) err // Shutdown closes the queue channel to initiate draining of the queue. 
func (q *boundedMemoryQueue[T]) Shutdown(context.Context) error { - close(q.items) + q.sizedChannel.shutdown() return nil } -type queueRequest[T any] struct { +type memQueueEl[T any] struct { req T ctx context.Context } diff --git a/exporter/internal/queue/bounded_memory_queue_test.go b/exporter/internal/queue/bounded_memory_queue_test.go index d861893eda9..ae98dca41f1 100644 --- a/exporter/internal/queue/bounded_memory_queue_test.go +++ b/exporter/internal/queue/bounded_memory_queue_test.go @@ -133,7 +133,7 @@ func benchmarkQueueUsage(b *testing.B, sizer Sizer[fakeReq], requestsCount int) func queueUsage(tb testing.TB, sizer Sizer[fakeReq], requestsCount int) { var wg sync.WaitGroup wg.Add(requestsCount) - q := NewBoundedMemoryQueue[fakeReq](MemoryQueueSettings[fakeReq]{Sizer: sizer, Capacity: 10 * requestsCount}) + q := NewBoundedMemoryQueue[fakeReq](MemoryQueueSettings[fakeReq]{Sizer: sizer, Capacity: int64(10 * requestsCount)}) consumers := NewQueueConsumers(q, 1, func(context.Context, fakeReq) error { wg.Done() return nil @@ -156,3 +156,11 @@ func TestZeroSizeNoConsumers(t *testing.T) { assert.NoError(t, q.Shutdown(context.Background())) } + +type fakeReq struct { + itemsCount int +} + +func (r fakeReq) ItemsCount() int { + return r.itemsCount +} diff --git a/exporter/internal/queue/persistent_queue.go b/exporter/internal/queue/persistent_queue.go index 80a71f6bad7..07b0d1fb628 100644 --- a/exporter/internal/queue/persistent_queue.go +++ b/exporter/internal/queue/persistent_queue.go @@ -10,7 +10,6 @@ import ( "fmt" "strconv" "sync" - "sync/atomic" "go.uber.org/multierr" "go.uber.org/zap" @@ -44,7 +43,10 @@ import ( // index index x // xxxx deleted type persistentQueue[T any] struct { - *queueCapacityLimiter[T] + // sizedChannel is used by the persistent queue for two purposes: + // 1. a communication channel notifying the consumer that a new item is available. + // 2. capacity control based on the size of the items. + *sizedChannel[permanentQueueEl] set PersistentQueueSettings[T] logger *zap.Logger @@ -53,14 +55,10 @@ type persistentQueue[T any] struct { // isRequestSized indicates whether the queue is sized by the number of requests. isRequestSized bool - putChan chan struct{} - // mu guards everything declared below. 
 	mu                       sync.Mutex
 	readIndex                uint64
 	writeIndex               uint64
-	initIndexSize            uint64
-	initQueueSize            *atomic.Uint64
 	currentlyDispatchedItems []uint64
 	refClient                int64
 	stopped                  bool
@@ -86,7 +84,7 @@ var (
 
 type PersistentQueueSettings[T any] struct {
 	Sizer            Sizer[T]
-	Capacity         int
+	Capacity         int64
 	DataType         component.DataType
 	StorageID        component.ID
 	Marshaler        func(req T) ([]byte, error)
@@ -98,12 +96,9 @@ type PersistentQueueSettings[T any] struct {
 func NewPersistentQueue[T any](set PersistentQueueSettings[T]) Queue[T] {
 	_, isRequestSized := set.Sizer.(*RequestSizer[T])
 	return &persistentQueue[T]{
-		queueCapacityLimiter: newQueueCapacityLimiter[T](set.Sizer, set.Capacity),
-		set:                  set,
-		logger:               set.ExporterSettings.Logger,
-		initQueueSize:        &atomic.Uint64{},
-		isRequestSized:       isRequestSized,
-		putChan:              make(chan struct{}, set.Capacity),
+		set:            set,
+		logger:         set.ExporterSettings.Logger,
+		isRequestSized: isRequestSized,
 	}
 }
 
@@ -148,39 +143,49 @@ func (pq *persistentQueue[T]) initPersistentContiguousStorage(ctx context.Contex
 		pq.readIndex = 0
 		pq.writeIndex = 0
 	}
-	pq.initIndexSize = pq.writeIndex - pq.readIndex
-	// Ensure the communication channel has the same size as the queue
-	for i := 0; i < int(pq.initIndexSize); i++ {
-		pq.putChan <- struct{}{}
-	}
+	initIndexSize := pq.writeIndex - pq.readIndex
+
+	var (
+		initEls       []permanentQueueEl
+		initQueueSize uint64
+	)
 
-	// Read snapshot of the queue size from storage. It's not a problem if the value cannot be fetched,
-	// or it's not accurate. The queue size will be corrected once the recovered queue is drained.
-	if pq.initIndexSize > 0 {
+	// Pre-allocate the communication channel with the size of the restored queue.
+	if initIndexSize > 0 {
+		initQueueSize = initIndexSize
 		// If the queue is sized by the number of requests, no need to read the queue size from storage.
-		if pq.isRequestSized {
-			pq.initQueueSize.Store(pq.initIndexSize)
-			return
+		if !pq.isRequestSized {
+			if restoredQueueSize, err := pq.restoreQueueSizeFromStorage(ctx); err == nil {
+				initQueueSize = restoredQueueSize
+			}
 		}
-		res, err := pq.client.Get(ctx, queueSizeKey)
-		if err == nil {
-			var restoredQueueSize uint64
-			restoredQueueSize, err = bytesToItemIndex(res)
-			pq.initQueueSize.Store(restoredQueueSize)
-		}
-		if err != nil {
-			if errors.Is(err, errValueNotSet) {
-				pq.logger.Warn("Cannot read the queue size snapshot from storage. "+
-					"The reported queue size will be inaccurate until the initial queue is drained. "+
-					"It's expected when the items sized queue enabled for the first time", zap.Error(err))
-			} else {
-				pq.logger.Error("Failed to read the queue size snapshot from storage. "+
-					"The reported queue size will be inaccurate until the initial queue is drained.", zap.Error(err))
-			}
+		// Ensure the communication channel is filled with evenly sized elements up to the total restored queue size.
+		initEls = make([]permanentQueueEl, initIndexSize)
+	}
+
+	pq.sizedChannel = newSizedChannel[permanentQueueEl](pq.set.Capacity, initEls, int64(initQueueSize))
+}
+
+// permanentQueueEl is the type of the elements passed to the sizedChannel by the persistentQueue.
+type permanentQueueEl struct{}
+
+// restoreQueueSizeFromStorage restores the queue size from storage.
+func (pq *persistentQueue[T]) restoreQueueSizeFromStorage(ctx context.Context) (uint64, error) {
+	val, err := pq.client.Get(ctx, queueSizeKey)
+	if err != nil {
+		if errors.Is(err, errValueNotSet) {
+			pq.logger.Warn("Cannot read the queue size snapshot from storage. "+
"+ + "The reported queue size will be inaccurate until the initial queue is drained. "+ + "It's expected when the items sized queue enabled for the first time", zap.Error(err)) + } else { + pq.logger.Error("Failed to read the queue size snapshot from storage. "+ + "The reported queue size will be inaccurate until the initial queue is drained.", zap.Error(err)) } + return 0, err } + return bytesToItemIndex(val) } // Consume applies the provided function on the head of queue. @@ -188,14 +193,24 @@ func (pq *persistentQueue[T]) initPersistentContiguousStorage(ctx context.Contex // The function returns true when an item is consumed or false if the queue is stopped. func (pq *persistentQueue[T]) Consume(consumeFunc func(context.Context, T) error) bool { for { + var ( + req T + onProcessingFinished func(error) + consumed bool + ) + // If we are stopped we still process all the other events in the channel before, but we // return fast in the `getNextItem`, so we will free the channel fast and get to the stop. - _, ok := <-pq.putChan + _, ok := pq.sizedChannel.pop(func(permanentQueueEl) int64 { + req, onProcessingFinished, consumed = pq.getNextItem(context.Background()) + if !consumed { + return 0 + } + return pq.set.Sizer.Sizeof(req) + }) if !ok { return false } - - req, onProcessingFinished, consumed := pq.getNextItem(context.Background()) if consumed { onProcessingFinished(consumeFunc(context.Background(), req)) return true @@ -203,31 +218,24 @@ func (pq *persistentQueue[T]) Consume(consumeFunc func(context.Context, T) error } } -// Size returns the current size of the queue. -func (pq *persistentQueue[T]) Size() int { - return int(pq.initQueueSize.Load()) + pq.queueCapacityLimiter.Size() -} - func (pq *persistentQueue[T]) Shutdown(ctx context.Context) error { - close(pq.putChan) + // If the queue is not initialized, there is nothing to shut down. + if pq.client == nil { + return nil + } + pq.mu.Lock() defer pq.mu.Unlock() + backupErr := pq.backupQueueSize(ctx) + pq.sizedChannel.shutdown() // Mark this queue as stopped, so consumer don't start any more work. pq.stopped = true - return multierr.Combine( - pq.backupQueueSize(ctx), - pq.unrefClient(ctx), - ) + return multierr.Combine(backupErr, pq.unrefClient(ctx)) } // backupQueueSize writes the current queue size to storage. The value is used to recover the queue size // in case if the collector is killed. func (pq *persistentQueue[T]) backupQueueSize(ctx context.Context) error { - // Client can be nil if the queue is not initialized yet. - if pq.client == nil { - return nil - } - // No need to write the queue size if the queue is sized by the number of requests. // That information is already stored as difference between read and write indexes. if pq.isRequestSized { @@ -258,34 +266,31 @@ func (pq *persistentQueue[T]) Offer(ctx context.Context, req T) error { // putInternal is the internal version that requires caller to hold the mutex lock. 
func (pq *persistentQueue[T]) putInternal(ctx context.Context, req T) error { - if !pq.queueCapacityLimiter.claim(req) { - pq.logger.Warn("Maximum queue capacity reached") - return ErrQueueIsFull - } + err := pq.sizedChannel.push(permanentQueueEl{}, pq.set.Sizer.Sizeof(req), func() error { + itemKey := getItemKey(pq.writeIndex) + newIndex := pq.writeIndex + 1 + + reqBuf, err := pq.set.Marshaler(req) + if err != nil { + return err + } - itemKey := getItemKey(pq.writeIndex) - newIndex := pq.writeIndex + 1 + // Carry out a transaction where we both add the item and update the write index + ops := []storage.Operation{ + storage.SetOperation(writeIndexKey, itemIndexToBytes(newIndex)), + storage.SetOperation(itemKey, reqBuf), + } + if storageErr := pq.client.Batch(ctx, ops...); storageErr != nil { + return storageErr + } - reqBuf, err := pq.set.Marshaler(req) + pq.writeIndex = newIndex + return nil + }) if err != nil { - pq.queueCapacityLimiter.release(req) return err } - // Carry out a transaction where we both add the item and update the write index - ops := []storage.Operation{ - storage.SetOperation(writeIndexKey, itemIndexToBytes(newIndex)), - storage.SetOperation(itemKey, reqBuf), - } - if storageErr := pq.client.Batch(ctx, ops...); storageErr != nil { - pq.queueCapacityLimiter.release(req) - return storageErr - } - - pq.writeIndex = newIndex - // Inform the loop that there's some data to process - pq.putChan <- struct{}{} - // Back up the queue size to storage every 10 writes. The stored value is used to recover the queue size // in case if the collector is killed. The recovered queue size is allowed to be inaccurate. if (pq.writeIndex % 10) == 5 { @@ -337,16 +342,6 @@ func (pq *persistentQueue[T]) getNextItem(ctx context.Context) (T, func(error), return request, nil, false } - pq.releaseCapacity(request) - - // Back up the queue size to storage on every 10 reads. The stored value is used to recover the queue size - // in case if the collector is killed. The recovered queue size is allowed to be inaccurate. - if (pq.readIndex % 10) == 0 { - if qsErr := pq.backupQueueSize(ctx); qsErr != nil { - pq.logger.Error("Error writing queue size to storage", zap.Error(err)) - } - } - // Increase the reference count, so the client is not closed while the request is being processed. // The client cannot be closed because we hold the lock since last we checked `stopped`. pq.refClient++ @@ -371,29 +366,18 @@ func (pq *persistentQueue[T]) getNextItem(ctx context.Context) (T, func(error), pq.logger.Error("Error deleting item from queue", zap.Error(err)) } - }, true -} - -// releaseCapacity releases the capacity of the queue. The caller must hold the mutex. -func (pq *persistentQueue[T]) releaseCapacity(req T) { - // If the recovered queue size is not emptied yet, decrease it first. - if pq.initIndexSize > 0 { - pq.initIndexSize-- - if pq.initIndexSize == 0 { - pq.initQueueSize.Store(0) - return - } - reqSize := pq.queueCapacityLimiter.sizeOf(req) - if pq.initQueueSize.Load() < reqSize { - pq.initQueueSize.Store(0) - return + // Back up the queue size to storage on every 10 reads. The stored value is used to recover the queue size + // in case if the collector is killed. The recovered queue size is allowed to be inaccurate. + if (pq.readIndex % 10) == 0 { + if qsErr := pq.backupQueueSize(ctx); qsErr != nil { + pq.logger.Error("Error writing queue size to storage", zap.Error(err)) + } } - pq.initQueueSize.Add(^(reqSize - 1)) - return - } - // Otherwise, decrease the current queue size. 
-	pq.queueCapacityLimiter.release(req)
+		// Ensure the used size and the channel size are in sync.
+		pq.sizedChannel.syncSize()
+
 	}, true
 }
 
 // retrieveAndEnqueueNotDispatchedReqs gets the items for which sending was not finished, cleans the storage
diff --git a/exporter/internal/queue/persistent_queue_test.go b/exporter/internal/queue/persistent_queue_test.go
index 2b525fa792f..6385088d5b1 100644
--- a/exporter/internal/queue/persistent_queue_test.go
+++ b/exporter/internal/queue/persistent_queue_test.go
@@ -54,7 +54,7 @@ func (nh *mockHost) GetExtensions() map[component.ID]component.Component {
 }
 
 // createAndStartTestPersistentQueue creates and starts a fake queue with the given capacity and number of consumers.
-func createAndStartTestPersistentQueue(t *testing.T, sizer Sizer[tracesRequest], capacity int, numConsumers int,
+func createAndStartTestPersistentQueue(t *testing.T, sizer Sizer[tracesRequest], capacity int64, numConsumers int,
 	consumeFunc func(_ context.Context, item tracesRequest) error) Queue[tracesRequest] {
 	pq := NewPersistentQueue[tracesRequest](PersistentQueueSettings[tracesRequest]{
 		Sizer: sizer,
@@ -90,16 +90,16 @@ func createTestPersistentQueueWithClient(client storage.Client) *persistentQueue
 	return pq
 }
 
-func createTestPersistentQueueWithRequestsCapacity(t testing.TB, ext storage.Extension, capacity int) *persistentQueue[tracesRequest] {
+func createTestPersistentQueueWithRequestsCapacity(t testing.TB, ext storage.Extension, capacity int64) *persistentQueue[tracesRequest] {
 	return createTestPersistentQueueWithCapacityLimiter(t, ext, &RequestSizer[tracesRequest]{}, capacity)
 }
 
-func createTestPersistentQueueWithItemsCapacity(t testing.TB, ext storage.Extension, capacity int) *persistentQueue[tracesRequest] {
+func createTestPersistentQueueWithItemsCapacity(t testing.TB, ext storage.Extension, capacity int64) *persistentQueue[tracesRequest] {
 	return createTestPersistentQueueWithCapacityLimiter(t, ext, &ItemsSizer[tracesRequest]{}, capacity)
 }
 
 func createTestPersistentQueueWithCapacityLimiter(t testing.TB, ext storage.Extension, sizer Sizer[tracesRequest],
-	capacity int) *persistentQueue[tracesRequest] {
+	capacity int64) *persistentQueue[tracesRequest] {
 	pq := NewPersistentQueue[tracesRequest](PersistentQueueSettings[tracesRequest]{
 		Sizer:    sizer,
 		Capacity: capacity,
@@ -117,7 +117,7 @@ func TestPersistentQueue_FullCapacity(t *testing.T) {
 	tests := []struct {
 		name           string
 		sizer          Sizer[tracesRequest]
-		capacity       int
+		capacity       int64
 		sizeMultiplier int
 	}{
 		{
@@ -518,8 +518,6 @@ func TestPersistentQueueStartWithNonDispatched(t *testing.T) {
 		require.NoError(t, err)
 	}
 
-	// get one item out, but don't mark it as processed
-	<-ps.putChan
 	require.True(t, ps.Consume(func(context.Context, tracesRequest) error {
 		// put one more item in
 		require.NoError(t, ps.Offer(context.Background(), req))
@@ -847,34 +845,136 @@ func TestPersistentQueue_ItemsCapacityUsageIsNotPreserved(t *testing.T) {
 
 	newPQ := createTestPersistentQueueWithItemsCapacity(t, ext, 100)
 
-	// The queue items size cannot be restored to the previous size. Falls back to 0.
-	assert.Equal(t, 0, newPQ.Size())
+	// The queue items size cannot be restored, so it falls back to the number of restored requests.
+	assert.Equal(t, 2, newPQ.Size())
 
 	assert.NoError(t, newPQ.Offer(context.Background(), newTracesRequest(2, 5)))
 
-	// Only new items are reflected
-	assert.Equal(t, 10, newPQ.Size())
+	// Only new items are correctly reflected
+	assert.Equal(t, 12, newPQ.Size())
 
-	// Consuming old items should does not affect the size.
+	// Consuming a restored request should reduce the restored size by 20, but it should not go below zero.
 	assert.True(t, newPQ.Consume(func(_ context.Context, traces tracesRequest) error {
 		assert.Equal(t, 20, traces.traces.SpanCount())
 		return nil
 	}))
-	assert.Equal(t, 10, newPQ.Size())
+	assert.Equal(t, 0, newPQ.Size())
 
+	// Consuming another restored request should not affect the restored size since it has already dropped to 0.
 	assert.True(t, newPQ.Consume(func(_ context.Context, traces tracesRequest) error {
 		assert.Equal(t, 25, traces.traces.SpanCount())
 		return nil
 	}))
-	assert.Equal(t, 10, newPQ.Size())
+	assert.Equal(t, 0, newPQ.Size())
+
+	// Adding another batch should update the size accordingly.
+	assert.NoError(t, newPQ.Offer(context.Background(), newTracesRequest(5, 5)))
+	assert.Equal(t, 25, newPQ.Size())
 
 	assert.True(t, newPQ.Consume(func(_ context.Context, traces tracesRequest) error {
 		assert.Equal(t, 10, traces.traces.SpanCount())
 		return nil
 	}))
+	assert.Equal(t, 15, newPQ.Size())
+
+	assert.NoError(t, newPQ.Shutdown(context.Background()))
+}
+
+// This test covers the case when the queue is restarted with less capacity than needed to restore the queued items.
+// In that case, the queue has to be restored anyway even if it exceeds the capacity limit.
+func TestPersistentQueue_RequestCapacityLessAfterRestart(t *testing.T) {
+	ext := NewMockStorageExtension(nil)
+	pq := createTestPersistentQueueWithRequestsCapacity(t, ext, 100)
+
+	assert.Equal(t, 0, pq.Size())
+
+	assert.NoError(t, pq.Offer(context.Background(), newTracesRequest(4, 10)))
+	assert.NoError(t, pq.Offer(context.Background(), newTracesRequest(2, 10)))
+	assert.NoError(t, pq.Offer(context.Background(), newTracesRequest(5, 5)))
+	assert.NoError(t, pq.Offer(context.Background(), newTracesRequest(1, 5)))
+
+	// Read the first request just to populate the read index in the storage.
+	// Otherwise, the write index won't be restored either.
+	assert.True(t, pq.Consume(func(_ context.Context, traces tracesRequest) error {
+		assert.Equal(t, 40, traces.traces.SpanCount())
+		return nil
+	}))
+	assert.Equal(t, 3, pq.Size())
+
+	assert.NoError(t, pq.Shutdown(context.Background()))
+
+	// The queue is restarted with less capacity than needed to restore the queued items, but with the same
+	// underlying storage. No need to drop requests that are over capacity since they are already in the storage.
+	newPQ := createTestPersistentQueueWithRequestsCapacity(t, ext, 2)
+
+	// The request-count size is restored from the read/write indexes even though it exceeds the new capacity.
+	assert.Equal(t, 3, newPQ.Size())
+
+	// Queue is full
+	assert.Error(t, newPQ.Offer(context.Background(), newTracesRequest(2, 5)))
+
+	assert.True(t, newPQ.Consume(func(_ context.Context, traces tracesRequest) error {
+		assert.Equal(t, 20, traces.traces.SpanCount())
+		return nil
+	}))
+	assert.Equal(t, 2, newPQ.Size())
+
+	// Still full
+	assert.Error(t, newPQ.Offer(context.Background(), newTracesRequest(2, 5)))
+
+	assert.True(t, newPQ.Consume(func(_ context.Context, traces tracesRequest) error {
+		assert.Equal(t, 25, traces.traces.SpanCount())
+		return nil
+	}))
+	assert.Equal(t, 1, newPQ.Size())
+
+	// Now it can accept new items
+	assert.NoError(t, newPQ.Offer(context.Background(), newTracesRequest(2, 5)))
+
+	assert.NoError(t, newPQ.Shutdown(context.Background()))
+}
+
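+// Note: the over-capacity restore above relies on newSizedChannel allocating the underlying channel
+// with max(capacity, len(initial elements)), so restored items always fit even if the configured
+// capacity shrank between runs.
+
+// This test covers the case when the persistent storage is recovered from a snapshot which has a
+// bigger value for the used size than the size of the actual items in the storage.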
+func TestPersistentQueue_RestoredUsedSizeIsCorrectedOnDrain(t *testing.T) {
+	ext := NewMockStorageExtension(nil)
+	pq := createTestPersistentQueueWithItemsCapacity(t, ext, 1000)
+
+	assert.Equal(t, 0, pq.Size())
+
+	for i := 0; i < 6; i++ {
+		assert.NoError(t, pq.Offer(context.Background(), newTracesRequest(2, 5)))
+	}
+	assert.Equal(t, 60, pq.Size())
+
+	// Consume 30 items
+	for i := 0; i < 3; i++ {
+		assert.True(t, pq.Consume(func(context.Context, tracesRequest) error { return nil }))
+	}
+	// The used size is now 30, but the stored snapshot still says 50: it's only persisted periodically, most recently after the 5th write.
+	assert.Equal(t, 30, pq.Size())
+
+	// Create a new queue pointing to the same storage
+	newPQ := createTestPersistentQueueWithItemsCapacity(t, ext, 1000)
+
+	// This is an incorrect size restored from the snapshot.
+	// In reality the size should be 30. Once the queue is drained, it will be updated to the correct size.
+	assert.Equal(t, 50, newPQ.Size())
+
+	assert.True(t, newPQ.Consume(func(context.Context, tracesRequest) error { return nil }))
+	assert.True(t, newPQ.Consume(func(context.Context, tracesRequest) error { return nil }))
+	assert.Equal(t, 30, newPQ.Size())
+
+	// Now the size must be correctly reflected
+	assert.True(t, newPQ.Consume(func(context.Context, tracesRequest) error { return nil }))
 	assert.Equal(t, 0, newPQ.Size())
 
 	assert.NoError(t, newPQ.Shutdown(context.Background()))
+	assert.NoError(t, pq.Shutdown(context.Background()))
 }
 
 func requireCurrentlyDispatchedItemsEqual(t *testing.T, pq *persistentQueue[tracesRequest], compare []uint64) {
diff --git a/exporter/internal/queue/queue.go b/exporter/internal/queue/queue.go
index 0ae0703b05d..35bc504579e 100644
--- a/exporter/internal/queue/queue.go
+++ b/exporter/internal/queue/queue.go
@@ -34,3 +34,26 @@ type Queue[T any] interface {
 	// Capacity returns the capacity of the queue.
 	Capacity() int
 }
+
+type itemsCounter interface {
+	ItemsCount() int
+}
+
+// Sizer is an interface that returns the size of the given element.
+type Sizer[T any] interface {
+	Sizeof(T) int64
+}
+
+// ItemsSizer is a Sizer implementation that returns the size of a queue element as the number of items it contains.
+type ItemsSizer[T itemsCounter] struct{}
+
+func (is *ItemsSizer[T]) Sizeof(el T) int64 {
+	return int64(el.ItemsCount())
+}
+
+// RequestSizer is a Sizer implementation that returns the size of a queue element as one request.
+type RequestSizer[T any] struct{}
+
+func (rs *RequestSizer[T]) Sizeof(T) int64 {
+	return 1
+}
diff --git a/exporter/internal/queue/queue_capacity.go b/exporter/internal/queue/queue_capacity.go
deleted file mode 100644
index 1995febcd63..00000000000
--- a/exporter/internal/queue/queue_capacity.go
+++ /dev/null
@@ -1,74 +0,0 @@
-// Copyright The OpenTelemetry Authors
-// SPDX-License-Identifier: Apache-2.0
-
-package queue // import "go.opentelemetry.io/collector/exporter/internal/queue"
-
-import (
-	"sync/atomic"
-)
-
-type itemsCounter interface {
-	ItemsCount() int
-}
-
-// Sizer is an interface that returns the size of the given element.
-type Sizer[T any] interface {
-	SizeOf(T) uint64
-}
-
-// ItemsSizer is a Sizer implementation that returns the size of a queue element as the number of items it contains.
-type ItemsSizer[T itemsCounter] struct{}
-
-func (is *ItemsSizer[T]) SizeOf(el T) uint64 {
-	return uint64(el.ItemsCount())
-}
-
-// RequestSizer is a Sizer implementation that returns the size of a queue element as one request.
-type RequestSizer[T any] struct{}
-
-func (rs *RequestSizer[T]) SizeOf(T) uint64 {
-	return 1
-}
-
-type queueCapacityLimiter[T any] struct {
-	used *atomic.Uint64
-	cap  uint64
-	sz   Sizer[T]
-}
-
-func (bcl queueCapacityLimiter[T]) Capacity() int {
-	return int(bcl.cap)
-}
-
-func (bcl queueCapacityLimiter[T]) Size() int {
-	return int(bcl.used.Load())
-}
-
-func (bcl queueCapacityLimiter[T]) claim(el T) bool {
-	size := bcl.sizeOf(el)
-	if bcl.used.Add(size) > bcl.cap {
-		bcl.releaseSize(size)
-		return false
-	}
-	return true
-}
-
-func (bcl queueCapacityLimiter[T]) release(el T) {
-	bcl.releaseSize(bcl.sizeOf(el))
-}
-
-func (bcl queueCapacityLimiter[T]) releaseSize(size uint64) {
-	bcl.used.Add(^(size - 1))
-}
-
-func (bcl queueCapacityLimiter[T]) sizeOf(el T) uint64 {
-	return bcl.sz.SizeOf(el)
-}
-
-func newQueueCapacityLimiter[T any](sizer Sizer[T], capacity int) *queueCapacityLimiter[T] {
-	return &queueCapacityLimiter[T]{
-		used: &atomic.Uint64{},
-		cap:  uint64(capacity),
-		sz:   sizer,
-	}
-}
diff --git a/exporter/internal/queue/queue_capacity_test.go b/exporter/internal/queue/queue_capacity_test.go
deleted file mode 100644
index 3dd6ad2b898..00000000000
--- a/exporter/internal/queue/queue_capacity_test.go
+++ /dev/null
@@ -1,58 +0,0 @@
-// Copyright The OpenTelemetry Authors
-// SPDX-License-Identifier: Apache-2.0
-
-package queue
-
-import (
-	"testing"
-
-	"github.com/stretchr/testify/assert"
-)
-
-func TestRequestsCapacityLimiter(t *testing.T) {
-	rl := newQueueCapacityLimiter[fakeReq](&RequestSizer[fakeReq]{}, 2)
-	assert.Equal(t, 0, rl.Size())
-	assert.Equal(t, 2, rl.Capacity())
-
-	req := fakeReq{itemsCount: 5}
-
-	assert.True(t, rl.claim(req))
-	assert.Equal(t, 1, rl.Size())
-
-	assert.True(t, rl.claim(req))
-	assert.Equal(t, 2, rl.Size())
-
-	assert.False(t, rl.claim(req))
-	assert.Equal(t, 2, rl.Size())
-
-	rl.release(req)
-	assert.Equal(t, 1, rl.Size())
-}
-
-func TestItemsCapacityLimiter(t *testing.T) {
-	rl := newQueueCapacityLimiter[fakeReq](&ItemsSizer[fakeReq]{}, 7)
-	assert.Equal(t, 0, rl.Size())
-	assert.Equal(t, 7, rl.Capacity())
-
-	req := fakeReq{itemsCount: 3}
-
-	assert.True(t, rl.claim(req))
-	assert.Equal(t, 3, rl.Size())
-
-	assert.True(t, rl.claim(req))
-	assert.Equal(t, 6, rl.Size())
-
-	assert.False(t, rl.claim(req))
-	assert.Equal(t, 6, rl.Size())
-
-	rl.release(req)
-	assert.Equal(t, 3, rl.Size())
-}
-
-type fakeReq struct {
-	itemsCount int
-}
-
-func (r fakeReq) ItemsCount() int {
-	return r.itemsCount
-}
diff --git a/exporter/internal/queue/sized_channel.go b/exporter/internal/queue/sized_channel.go
new file mode 100644
index 00000000000..1702a38ac2f
--- /dev/null
+++ b/exporter/internal/queue/sized_channel.go
@@ -0,0 +1,112 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package queue // import "go.opentelemetry.io/collector/exporter/internal/queue"
+
+import "sync/atomic"
+
+// sizedChannel is a channel wrapper for sized elements with a capacity set to the total size of all the elements.
+// The channel will accept elements until the total size of the elements reaches the capacity.
+type sizedChannel[T any] struct {
+	used *atomic.Int64
+
+	// We need to store the capacity in a separate field because the capacity of the channel can be higher.
+	// This happens when a persistent queue is restored from disk with a size bigger than the pre-configured capacity.
+	cap int64
+	ch  chan T
+}
+
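+// A minimal usage sketch (illustrative only):
+//
+//	ch := newSizedChannel[string](2, nil, 0)
+//	_ = ch.push("a", 1, nil)                          // accepted: used size 1 <= capacity 2
+//	_ = ch.push("b", 2, nil)                          // rejected with ErrQueueIsFull: 1+2 > 2
+//	el, ok := ch.pop(func(string) int64 { return 1 }) // el == "a", ok == true; used size back to 0
+
+// newSizedChannel creates a sized elements channel. Each element is accounted
+// for with the size the caller passes to push.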
+// capacity is the maximum total size of elements the channel accepts; the underlying channel is allocated
+// with at least this capacity to avoid blocking producers. Optionally, the channel can be preloaded with elements and their total size.
+func newSizedChannel[T any](capacity int64, els []T, totalSize int64) *sizedChannel[T] {
+	used := &atomic.Int64{}
+	used.Store(totalSize)
+
+	chCap := capacity
+	if chCap < int64(len(els)) {
+		chCap = int64(len(els))
+	}
+
+	ch := make(chan T, chCap)
+	for _, el := range els {
+		ch <- el
+	}
+
+	return &sizedChannel[T]{
+		used: used,
+		cap:  capacity,
+		ch:   ch,
+	}
+}
+
+// push puts the element into the queue with the given size if there is enough capacity.
+// Returns an error if the queue is full. The callback is called before the element is committed to the queue.
+// If the callback returns an error, the element is not put into the queue and the error is returned.
+// The given element size MUST be positive.
+func (vcq *sizedChannel[T]) push(el T, size int64, callback func() error) error {
+	if vcq.used.Add(size) > vcq.cap {
+		vcq.used.Add(-size)
+		return ErrQueueIsFull
+	}
+	if callback != nil {
+		if err := callback(); err != nil {
+			vcq.used.Add(-size)
+			return err
+		}
+	}
+	vcq.ch <- el
+	return nil
+}
+
+// pop removes the element from the queue and returns it.
+// The call blocks until there is an item available or the queue is stopped.
+// The function returns true when an item is consumed or false if the queue is stopped and emptied.
+// The callback is called before the element is removed from the queue. It must return the size of the element.
+func (vcq *sizedChannel[T]) pop(callback func(T) (size int64)) (T, bool) {
+	el, ok := <-vcq.ch
+	if !ok {
+		return el, false
+	}
+
+	size := callback(el)
+
+	// The used size and the channel size might not be in sync with the queue in case it was restored from disk
+	// because we don't flush the current queue size to disk on every read/write.
+	// In that case we need to make sure it doesn't go below 0.
+	if vcq.used.Add(-size) < 0 {
+		vcq.used.Store(0)
+	}
+	return el, true
+}
+
+// syncSize updates the used size to 0 if the queue is empty.
+// The caller must ensure that this call is not called concurrently with push.
+// It's used by the persistent queue to ensure the used value correctly reflects reality, which may not always
+// be the case if the queue size was restored from disk after a crash.
+func (vcq *sizedChannel[T]) syncSize() {
+	if len(vcq.ch) == 0 {
+		vcq.used.Store(0)
+	}
+}
+
+// shutdown closes the queue channel to initiate draining of the queue.
+func (vcq *sizedChannel[T]) shutdown() { + close(vcq.ch) +} + +func (vcq *sizedChannel[T]) Size() int { + return int(vcq.used.Load()) +} + +func (vcq *sizedChannel[T]) Capacity() int { + return int(vcq.cap) +} diff --git a/exporter/internal/queue/sized_channel_test.go b/exporter/internal/queue/sized_channel_test.go new file mode 100644 index 00000000000..02cd4bf8e68 --- /dev/null +++ b/exporter/internal/queue/sized_channel_test.go @@ -0,0 +1,44 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package queue + +import ( + "errors" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestSizedCapacityChannel(t *testing.T) { + q := newSizedChannel[int](7, nil, 0) + assert.NoError(t, q.push(1, 1, nil)) + assert.Equal(t, 1, q.Size()) + assert.Equal(t, 7, q.Capacity()) + + // failed callback should not allow the element to be added + assert.Error(t, q.push(2, 2, func() error { return errors.New("failed") })) + assert.Equal(t, 1, q.Size()) + + assert.NoError(t, q.push(3, 3, nil)) + assert.Equal(t, 4, q.Size()) + + // should not be able to send to the full queue + assert.Error(t, q.push(4, 4, nil)) + assert.Equal(t, 4, q.Size()) + + el, ok := q.pop(func(el int) int64 { return int64(el) }) + assert.Equal(t, 1, el) + assert.True(t, ok) + assert.Equal(t, 3, q.Size()) + + el, ok = q.pop(func(el int) int64 { return int64(el) }) + assert.Equal(t, 3, el) + assert.True(t, ok) + assert.Equal(t, 0, q.Size()) + + q.shutdown() + el, ok = q.pop(func(el int) int64 { return int64(el) }) + assert.False(t, ok) + assert.Equal(t, 0, el) +}
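
Note: a minimal sketch (illustrative only, not part of the patch) of how the two Sizer implementations
above change what "capacity" means, using a hypothetical exampleReq type against the package-internal
APIs introduced in this diff.

	package queue

	import "context"

	// exampleReq is a hypothetical request type satisfying the itemsCounter interface.
	type exampleReq struct{ items int }

	func (r exampleReq) ItemsCount() int { return r.items }

	func exampleSizers() {
		// With RequestSizer every request costs 1, so Capacity: 2 means "at most two requests".
		rq := NewBoundedMemoryQueue[exampleReq](MemoryQueueSettings[exampleReq]{
			Sizer: &RequestSizer[exampleReq]{}, Capacity: 2,
		})
		_ = rq.Offer(context.Background(), exampleReq{items: 100}) // size 1, accepted

		// With ItemsSizer the same request costs 100 items and exceeds Capacity: 2.
		iq := NewBoundedMemoryQueue[exampleReq](MemoryQueueSettings[exampleReq]{
			Sizer: &ItemsSizer[exampleReq]{}, Capacity: 2,
		})
		_ = iq.Offer(context.Background(), exampleReq{items: 100}) // returns ErrQueueIsFull
	}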