[chore] [exporterhelper] Make memory queue benchmarks deterministic (#9247)

Ensure the queue always has enough capacity so that the benchmark
results are deterministic. Otherwise, it is unpredictable when the queue
starts rejecting new entries: that depends on how fast the consumer
processes them, which can fluctuate significantly.

Also, remove the multiple-consumer cases from the benchmarks as
redundant; they do not appear to provide any value.
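
The capacity rule is easy to see in isolation. Below is a minimal, self-contained sketch, not the collector's internal BoundedMemoryQueue API: the package, type, and function names (queuebench, boundedQueue, offer) are made up for illustration. It demonstrates the property the commit relies on: when the queue's capacity covers every request that will be offered, the reject-on-full branch can never fire, so each benchmark iteration performs the same amount of work regardless of how quickly the single consumer drains the queue.

// Minimal sketch only: a stand-in bounded queue with reject-on-full offer
// semantics, not the collector's implementation.
package queuebench

import (
    "errors"
    "sync"
    "testing"
)

var errQueueIsFull = errors.New("queue is full")

type boundedQueue struct{ ch chan int }

func newBoundedQueue(capacity int) *boundedQueue {
    return &boundedQueue{ch: make(chan int, capacity)}
}

// offer rejects instead of blocking when the buffer is full.
func (q *boundedQueue) offer(v int) error {
    select {
    case q.ch <- v:
        return nil
    default:
        return errQueueIsFull // the branch that made the old results fluctuate
    }
}

func benchmarkQueueUsage(b *testing.B, requests int) {
    b.ReportAllocs()
    for i := 0; i < b.N; i++ {
        // Capacity matches the number of offers, so no request is ever rejected.
        q := newBoundedQueue(requests)
        var wg sync.WaitGroup
        wg.Add(requests)
        go func() { // single consumer, as in the updated benchmark
            for range q.ch {
                wg.Done()
            }
        }()
        for j := 0; j < requests; j++ {
            if err := q.offer(j); err != nil {
                b.Fatal(err) // unreachable with a large enough capacity
            }
        }
        close(q.ch)
        wg.Wait()
    }
}

func Benchmark_Sketch_1000_requests(b *testing.B) { benchmarkQueueUsage(b, 1000) }

In the diff below, the same guarantee comes from constructing the queue with Capacity: 10 * requestsCount, which covers both sizers used here (one unit per request for RequestSizer, ten items per fakeReq{10} for ItemsSizer).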
dmitryax authored Jan 9, 2024
1 parent bb90069 commit cf4b97c
1 changed file: exporter/exporterhelper/internal/bounded_memory_queue_test.go (22 additions, 45 deletions)
@@ -7,7 +7,6 @@ package internal

 import (
     "context"
-    "errors"
     "strconv"
     "sync"
     "testing"
@@ -97,75 +96,53 @@ func TestShutdownWhileNotEmpty(t *testing.T) {
     }))
 }

-func Benchmark_QueueUsage_10000_requests_1_50000(b *testing.B) {
-    benchmarkQueueUsage(b, &RequestSizer[fakeReq]{}, 10000, 1, 50000)
+func Benchmark_QueueUsage_1000_requests(b *testing.B) {
+    benchmarkQueueUsage(b, &RequestSizer[fakeReq]{}, 1000)
 }

-func Benchmark_QueueUsage_10000_requests_10_50000(b *testing.B) {
-    benchmarkQueueUsage(b, &RequestSizer[fakeReq]{}, 10000, 10, 50000)
+func Benchmark_QueueUsage_100000_requests(b *testing.B) {
+    benchmarkQueueUsage(b, &RequestSizer[fakeReq]{}, 100000)
 }

-func Benchmark_QueueUsage_50000_requests_1_50000(b *testing.B) {
-    benchmarkQueueUsage(b, &RequestSizer[fakeReq]{}, 50000, 1, 50000)
+func Benchmark_QueueUsage_10000_items(b *testing.B) {
+    // each request has 10 items: 1000 requests = 10000 items
+    benchmarkQueueUsage(b, &ItemsSizer[fakeReq]{}, 1000)
 }

-func Benchmark_QueueUsage_50000_requests_10_50000(b *testing.B) {
-    benchmarkQueueUsage(b, &RequestSizer[fakeReq]{}, 50000, 10, 50000)
-}
-
-func Benchmark_QueueUsage_10000_requests_1_250000(b *testing.B) {
-    benchmarkQueueUsage(b, &RequestSizer[fakeReq]{}, 10000, 1, 250000)
-}
-
-func Benchmark_QueueUsage_10000_requests_10_250000(b *testing.B) {
-    benchmarkQueueUsage(b, &RequestSizer[fakeReq]{}, 10000, 10, 250000)
-}
-
-func Benchmark_QueueUsage_1M_items_10_250k(b *testing.B) {
-    benchmarkQueueUsage(b, &ItemsSizer[fakeReq]{}, 1000000, 10, 250000)
-}
-
-func Benchmark_QueueUsage_1M_items_10_1M(b *testing.B) {
-    benchmarkQueueUsage(b, &ItemsSizer[fakeReq]{}, 1000000, 10, 1000000)
-}
-
-func Benchmark_QueueUsage_100M_items_10_10M(b *testing.B) {
-    benchmarkQueueUsage(b, &ItemsSizer[fakeReq]{}, 100000000, 10, 10000000)
+func Benchmark_QueueUsage_1M_items(b *testing.B) {
+    // each request has 10 items: 100000 requests = 1M items
+    benchmarkQueueUsage(b, &ItemsSizer[fakeReq]{}, 100000)
 }

 func TestQueueUsage(t *testing.T) {
-    t.Run("with enough workers", func(t *testing.T) {
-        queueUsage(t, &RequestSizer[fakeReq]{}, 10000, 5, 1000)
+    t.Run("requests_based", func(t *testing.T) {
+        queueUsage(t, &RequestSizer[fakeReq]{}, 10)
     })
-    t.Run("past capacity", func(t *testing.T) {
-        queueUsage(t, &RequestSizer[fakeReq]{}, 10000, 2, 50000)
+    t.Run("items_based", func(t *testing.T) {
+        queueUsage(t, &ItemsSizer[fakeReq]{}, 10)
     })
 }

-func benchmarkQueueUsage(b *testing.B, sizer Sizer[fakeReq], capacity int, numConsumers int,
-    numberOfItems int) {
+func benchmarkQueueUsage(b *testing.B, sizer Sizer[fakeReq], requestsCount int) {
     b.ReportAllocs()
     for i := 0; i < b.N; i++ {
-        queueUsage(b, sizer, capacity, numConsumers, numberOfItems)
+        queueUsage(b, sizer, requestsCount)
     }
 }

-func queueUsage(tb testing.TB, sizer Sizer[fakeReq], capacity int, numConsumers int, numberOfItems int) {
+func queueUsage(tb testing.TB, sizer Sizer[fakeReq], requestsCount int) {
     var wg sync.WaitGroup
-    wg.Add(numberOfItems)
-    q := NewBoundedMemoryQueue[fakeReq](MemoryQueueSettings[fakeReq]{Sizer: sizer, Capacity: capacity})
-    consumers := NewQueueConsumers(q, numConsumers, func(context.Context, fakeReq) error {
+    wg.Add(requestsCount)
+    q := NewBoundedMemoryQueue[fakeReq](MemoryQueueSettings[fakeReq]{Sizer: sizer, Capacity: 10 * requestsCount})
+    consumers := NewQueueConsumers(q, 1, func(context.Context, fakeReq) error {
         wg.Done()
         return nil
     })
     require.NoError(tb, consumers.Start(context.Background(), componenttest.NewNopHost()))
-    for j := 0; j < numberOfItems; j++ {
-        if err := q.Offer(context.Background(), fakeReq{10}); errors.Is(err, ErrQueueIsFull) {
-            wg.Done()
-        }
+    for j := 0; j < requestsCount; j++ {
+        require.NoError(tb, q.Offer(context.Background(), fakeReq{10}))
     }
     assert.NoError(tb, consumers.Shutdown(context.Background()))

     wg.Wait()
 }

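For reference, the updated benchmarks run with the standard Go benchmark tooling from the collector repository, e.g.:

go test -run '^$' -bench 'Benchmark_QueueUsage' ./exporter/exporterhelper/internal/

The -run '^$' filter skips the regular unit tests, and allocation figures are reported because the benchmarks call b.ReportAllocs().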
