diff --git a/exporter/exporterhelper/queue_sender_test.go b/exporter/exporterhelper/queue_sender_test.go index 97d8220a2d4..6caceb115e2 100644 --- a/exporter/exporterhelper/queue_sender_test.go +++ b/exporter/exporterhelper/queue_sender_test.go @@ -6,7 +6,6 @@ package exporterhelper import ( "context" "errors" - "sync/atomic" "testing" "time" @@ -365,38 +364,39 @@ func TestQueuedRetryPersistenceEnabledStorageError(t *testing.T) { } func TestQueuedRetryPersistentEnabled_shutdown_dataIsRequeued(t *testing.T) { - produceCounter := &atomic.Uint32{} - qCfg := NewDefaultQueueSettings() qCfg.NumConsumers = 1 + storageID := component.NewIDWithName("file_storage", "storage") + qCfg.StorageID = &storageID // enable persistence to ensure data is re-queued on shutdown + rCfg := NewDefaultRetrySettings() rCfg.InitialInterval = time.Millisecond rCfg.MaxElapsedTime = 0 // retry infinitely so shutdown can be triggered - be, err := newBaseExporter(defaultSettings, "", false, nil, nil, newNoopObsrepSender, WithRetry(rCfg), WithQueue(qCfg)) + mockReq := newErrorRequest() + be, err := newBaseExporter(defaultSettings, "", false, mockRequestMarshaler, mockRequestUnmarshaler(mockReq), + newNoopObsrepSender, WithRetry(rCfg), WithQueue(qCfg)) require.NoError(t, err) - // wraps original queue so we can count operations - be.queueSender.(*queueSender).queue = &producerConsumerQueueWithCounter{ - Queue: be.queueSender.(*queueSender).queue, - produceCounter: produceCounter, + var extensions = map[component.ID]component.Component{ + storageID: internal.NewMockStorageExtension(nil), } - be.queueSender.(*queueSender).requeuingEnabled = true + host := &mockHost{ext: extensions} - require.NoError(t, be.Start(context.Background(), &mockHost{})) + require.NoError(t, be.Start(context.Background(), host)) // Invoke queuedRetrySender so the producer will put the item for consumer to poll - require.NoError(t, be.send(context.Background(), newErrorRequest())) + require.NoError(t, be.send(context.Background(), mockReq)) - // first wait for the item to be produced to the queue initially + // first wait for the item to be consumed from the queue assert.Eventually(t, func() bool { - return produceCounter.Load() == uint32(1) + return be.queueSender.(*queueSender).queue.Size() == 0 }, time.Second, 1*time.Millisecond) // shuts down and ensure the item is produced in the queue again require.NoError(t, be.Shutdown(context.Background())) assert.Eventually(t, func() bool { - return produceCounter.Load() == uint32(2) + return be.queueSender.(*queueSender).queue.Size() == 1 }, time.Second, 1*time.Millisecond) } diff --git a/exporter/exporterhelper/retry_sender_test.go b/exporter/exporterhelper/retry_sender_test.go index 235a9f3ccbc..89a7ab577ce 100644 --- a/exporter/exporterhelper/retry_sender_test.go +++ b/exporter/exporterhelper/retry_sender_test.go @@ -21,12 +21,11 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/consumer/consumererror" - "go.opentelemetry.io/collector/exporter/exporterhelper/internal" "go.opentelemetry.io/collector/exporter/exportertest" "go.opentelemetry.io/collector/internal/testdata" ) -func mockRequestUnmarshaler(mr *mockRequest) RequestUnmarshaler { +func mockRequestUnmarshaler(mr Request) RequestUnmarshaler { return func(bytes []byte) (Request, error) { return mr, nil } @@ -405,13 +404,3 @@ func tagsMatchLabelKeys(tags []tag.Tag, keys []metricdata.LabelKey, labels []met } return true } - -type producerConsumerQueueWithCounter struct { - internal.Queue[Request] - produceCounter *atomic.Uint32 -} - -func (pcq *producerConsumerQueueWithCounter) Offer(ctx context.Context, item Request) error { - pcq.produceCounter.Add(1) - return pcq.Queue.Offer(ctx, item) -}