Skip to content

Commit

Permalink
fix(server): Datastore test pipeline (#3261)
Browse files Browse the repository at this point in the history
  • Loading branch information
xoscar committed Oct 13, 2023
1 parent e4debd9 commit a91e7a8
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 3 deletions.
1 change: 1 addition & 0 deletions server/app/app.go
Expand Up @@ -281,6 +281,7 @@ func (app *App) Start(opts ...appOption) error {
tracer,
tracedbFactory,
app.cfg,
meter,
)

dsTestPipeline.Start()
Expand Down
4 changes: 3 additions & 1 deletion server/app/ds_test_connection_pipeline.go
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/kubeshop/tracetest/server/pkg/pipeline"
"github.com/kubeshop/tracetest/server/testconnection"
"github.com/kubeshop/tracetest/server/tracedb"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/trace"
)

Expand All @@ -15,13 +16,14 @@ func buildDataStoreTestPipeline(
tracer trace.Tracer,
newTraceDBFn tracedb.FactoryFunc,
appConfig *config.AppConfig,
meter metric.Meter,
) *testconnection.DataStoreTestPipeline {
requestWorker := testconnection.NewDsTestConnectionRequest(tracer, newTraceDBFn, appConfig.DataStorePipelineTestConnectionEnabled())
notifyWorker := testconnection.NewDsTestConnectionNotify(dsTestListener, tracer)

pgQueue := pipeline.NewPostgresQueueDriver[testconnection.Job](pool, pgChannelName)

pipeline := pipeline.New(&testconnection.Configurer[testconnection.Job]{},
pipeline := pipeline.New(testconnection.NewConfigurer(meter),
pipeline.Step[testconnection.Job]{Processor: requestWorker, Driver: pgQueue.Channel("datastore_test_connection_request")},
pipeline.Step[testconnection.Job]{Processor: notifyWorker, Driver: pgQueue.Channel("datastore_test_connection_notify")},
)
Expand Down
13 changes: 11 additions & 2 deletions server/testconnection/pipeline.go
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/kubeshop/tracetest/server/model"
"github.com/kubeshop/tracetest/server/pkg/pipeline"
"github.com/kubeshop/tracetest/server/tracedb"
"go.opentelemetry.io/otel/metric"
)

type Job struct {
Expand All @@ -30,9 +31,17 @@ type DataStoreTestPipeline struct {
dsTestListener DsTestListener
}

type Configurer[T any] struct{}
type Configurer[T any] struct {
meter metric.Meter
}

func NewConfigurer(meter metric.Meter) *Configurer[Job] {
return &Configurer[Job]{meter: meter}
}

func (c *Configurer[Job]) Configure(_ *pipeline.Queue[Job]) {}
func (c *Configurer[Job]) Configure(queue *pipeline.Queue[Job]) {
queue.InitializeMetrics(c.meter)
}

func NewDataStoreTestPipeline(
pipeline *pipeline.Pipeline[Job],
Expand Down

0 comments on commit a91e7a8

Please sign in to comment.