From 784ef19b0adaabc54eb9cf7047b0ee7d0bc19aea Mon Sep 17 00:00:00 2001 From: Matheus Nogueira Date: Wed, 4 Oct 2023 12:34:50 -0300 Subject: [PATCH] feat: report metrics about the test pipeline queues (#3213) * feat(server): report metrics about the test pipeline queues * fix: test_suitte_runner_test --- server/app/app.go | 2 ++ server/app/test_pipeline.go | 5 +++- server/app/test_suite_pipeline.go | 5 +++- server/executor/queue.go | 10 ++++++++ server/executor/test_suite_runner_test.go | 4 +++- server/pkg/pipeline/postgres_driver.go | 4 ++++ server/pkg/pipeline/queue.go | 28 +++++++++++++++++++++-- 7 files changed, 53 insertions(+), 5 deletions(-) diff --git a/server/app/app.go b/server/app/app.go index cb6a878cb1..bae622b020 100644 --- a/server/app/app.go +++ b/server/app/app.go @@ -256,6 +256,7 @@ func (app *App) Start(opts ...appOption) error { triggerRegistry, tracedbFactory, app.cfg, + meter, ) testPipeline.Start() app.registerStopFn(func() { @@ -267,6 +268,7 @@ func (app *App) Start(opts ...appOption) error { testSuiteRunRepository, testPipeline, subscriptionManager, + meter, ) testSuitePipeline.Start() diff --git a/server/app/test_pipeline.go b/server/app/test_pipeline.go index d777c9cdfd..06937776e5 100644 --- a/server/app/test_pipeline.go +++ b/server/app/test_pipeline.go @@ -15,6 +15,7 @@ import ( "github.com/kubeshop/tracetest/server/subscription" "github.com/kubeshop/tracetest/server/test" "github.com/kubeshop/tracetest/server/tracedb" + "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/trace" ) @@ -32,6 +33,7 @@ func buildTestPipeline( triggerRegistry *trigger.Registry, tracedbFactory tracedb.FactoryFunc, appConfig *config.AppConfig, + meter metric.Meter, ) *executor.TestPipeline { eventEmitter := executor.NewEventEmitter(treRepo, subscriptionManager) @@ -116,7 +118,8 @@ func buildTestPipeline( WithPollingProfileGetter(ppRepo). WithTestGetter(testRepo). WithRunGetter(runRepo). - WithInstanceID(instanceID) + WithInstanceID(instanceID). + WithMetricMeter(meter) pgQueue := pipeline.NewPostgresQueueDriver[executor.Job](pool, pgChannelName) diff --git a/server/app/test_suite_pipeline.go b/server/app/test_suite_pipeline.go index 15c5f86a2f..9b25c15dd9 100644 --- a/server/app/test_suite_pipeline.go +++ b/server/app/test_suite_pipeline.go @@ -5,6 +5,7 @@ import ( "github.com/kubeshop/tracetest/server/pkg/pipeline" "github.com/kubeshop/tracetest/server/subscription" "github.com/kubeshop/tracetest/server/testsuite" + "go.opentelemetry.io/otel/metric" ) func buildTestSuitePipeline( @@ -12,11 +13,13 @@ func buildTestSuitePipeline( runRepo *testsuite.RunRepository, testRunner *executor.TestPipeline, subscriptionManager *subscription.Manager, + meter metric.Meter, ) *executor.TestSuitesPipeline { tranRunner := executor.NewTestSuiteRunner(testRunner, runRepo, subscriptionManager) queueBuilder := executor.NewQueueConfigurer(). WithTestSuiteGetter(tranRepo). - WithTestSuiteRunGetter(runRepo) + WithTestSuiteRunGetter(runRepo). + WithMetricMeter(meter) pipeline := pipeline.New(queueBuilder, pipeline.Step[executor.Job]{Processor: tranRunner, Driver: pipeline.NewInMemoryQueueDriver[executor.Job]("testSuiteRunner")}, diff --git a/server/executor/queue.go b/server/executor/queue.go index 76f99031e3..0ec002eafc 100644 --- a/server/executor/queue.go +++ b/server/executor/queue.go @@ -17,6 +17,7 @@ import ( "github.com/kubeshop/tracetest/server/subscription" "github.com/kubeshop/tracetest/server/test" "github.com/kubeshop/tracetest/server/testsuite" + "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/propagation" "go.opentelemetry.io/otel/trace" ) @@ -187,6 +188,8 @@ type queueConfigurer[T any] struct { pollingProfiles pollingProfileGetter dataStores dataStoreGetter + meter metric.Meter + instanceID string } @@ -239,6 +242,11 @@ func (qb *queueConfigurer[T]) WithTestSuiteRunGetter(suiteRuns testSuiteRunGette return qb } +func (qb *queueConfigurer[T]) WithMetricMeter(meter metric.Meter) *queueConfigurer[T] { + qb.meter = meter + return qb +} + func (qb *queueConfigurer[T]) Configure(queue *pipeline.Queue[Job]) { q := &Queue{ cancelRunHandlerFn: qb.cancelRunHandlerFn, @@ -259,6 +267,8 @@ func (qb *queueConfigurer[T]) Configure(queue *pipeline.Queue[Job]) { queue.EnqueuePreprocessorFn = q.enqueuePreprocess queue.ListenPreprocessorFn = q.listenPreprocess + queue.InitializeMetrics(qb.meter) + } type Queue struct { diff --git a/server/executor/test_suite_runner_test.go b/server/executor/test_suite_runner_test.go index 42113923bd..1176353e71 100644 --- a/server/executor/test_suite_runner_test.go +++ b/server/executor/test_suite_runner_test.go @@ -21,6 +21,7 @@ import ( "github.com/kubeshop/tracetest/server/variableset" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/metric/noop" ) type fakeTestRunner struct { @@ -163,7 +164,8 @@ func runTestSuiteRunnerTest(t *testing.T, withErrors bool, assert func(t *testin queueBuilder := executor.NewQueueConfigurer(). WithTestSuiteGetter(transactionsRepo). - WithTestSuiteRunGetter(transactionRunRepo) + WithTestSuiteRunGetter(transactionRunRepo). + WithMetricMeter(noop.NewMeterProvider().Meter("noop")) pipeline := pipeline.New(queueBuilder, pipeline.Step[executor.Job]{Processor: runner, Driver: pipeline.NewInMemoryQueueDriver[executor.Job]("testSuiteRunner")}, diff --git a/server/pkg/pipeline/postgres_driver.go b/server/pkg/pipeline/postgres_driver.go index 934d29edc9..d84b3f44cb 100644 --- a/server/pkg/pipeline/postgres_driver.go +++ b/server/pkg/pipeline/postgres_driver.go @@ -203,3 +203,7 @@ func (ch *channel[T]) Enqueue(item T) { ch.log("notified postgres") } + +func (ch *channel[T]) Name() string { + return ch.name +} diff --git a/server/pkg/pipeline/queue.go b/server/pkg/pipeline/queue.go index 035421ca8b..cc8e5e091a 100644 --- a/server/pkg/pipeline/queue.go +++ b/server/pkg/pipeline/queue.go @@ -4,6 +4,8 @@ import ( "context" "github.com/alitto/pond" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" ) type Enqueuer[T any] interface { @@ -22,9 +24,16 @@ type QueueDriver[T any] interface { SetListener(Listener[T]) } +type namedDriver interface { + Name() string +} + type Queue[T any] struct { - driver QueueDriver[T] - itemProcessor QueueItemProcessor[T] + name string + driver QueueDriver[T] + itemProcessor QueueItemProcessor[T] + enqueueCounter metric.Int64Counter + listenCounter metric.Int64Counter EnqueuePreprocessorFn func(context.Context, T) T ListenPreprocessorFn func(context.Context, T) (context.Context, T) @@ -43,11 +52,20 @@ func NewQueue[T any](driver QueueDriver[T], itemProcessor QueueItemProcessor[T]) workerPool: pond.New(QueueWorkerCount, QueueWorkerBufferSize), } + if namedDriver, ok := driver.(namedDriver); ok { + queue.name = namedDriver.Name() + } + queue.SetDriver(driver) return queue } +func (q *Queue[T]) InitializeMetrics(meter metric.Meter) { + q.enqueueCounter, _ = meter.Int64Counter("messaging.enqueue.count") + q.listenCounter, _ = meter.Int64Counter("messaging.listen.count") +} + func (q *Queue[T]) SetDriver(driver QueueDriver[T]) { q.driver = driver driver.SetListener(q) @@ -67,6 +85,9 @@ func (q Queue[T]) Enqueue(ctx context.Context, item T) { item = q.EnqueuePreprocessorFn(ctx, item) } + q.enqueueCounter.Add(ctx, 1, metric.WithAttributes( + attribute.String("queue.name", q.name), + )) q.driver.Enqueue(item) }) } @@ -86,6 +107,9 @@ func (q Queue[T]) Listen(item T) { } q.workerPool.Submit(func() { + q.listenCounter.Add(ctx, 1, metric.WithAttributes( + attribute.String("queue.name", q.name), + )) q.itemProcessor.ProcessItem(ctx, item) }) }