From 121ec577d79fba543e879a6f544f36ab63f1b9b2 Mon Sep 17 00:00:00 2001 From: Daniel Baptista Dias Date: Thu, 18 Jan 2024 16:22:23 -0300 Subject: [PATCH] fix: make test connection async on trace polling step (#3534) * fix: make test connection async on trace polling step * Update server/executor/tracepollerworker/starter_worker.go Co-authored-by: Matheus Nogueira --------- Co-authored-by: Matheus Nogueira --- agent/workers/poller.go | 2 +- server/app/app.go | 33 +++++----- server/app/test_pipeline.go | 3 + .../tracepollerworker/starter_worker.go | 66 +++++++++++++++---- 4 files changed, 73 insertions(+), 31 deletions(-) diff --git a/agent/workers/poller.go b/agent/workers/poller.go index 7704c7a38e..cb73eac008 100644 --- a/agent/workers/poller.go +++ b/agent/workers/poller.go @@ -108,7 +108,7 @@ func (w *PollerWorker) poll(ctx context.Context, request *proto.PollingRequest) w.logger.Error("Invalid datastore", zap.Error(err)) return err } - w.logger.Debug("Converted datastore", zap.Any("datastore", datastoreConfig)) + w.logger.Debug("Converted datastore", zap.Any("datastore", datastoreConfig), zap.Any("originalDatastore", request.Datastore)) if datastoreConfig == nil { w.logger.Error("Invalid datastore: nil") diff --git a/server/app/app.go b/server/app/app.go index 7c25f08c96..539f3e2eeb 100644 --- a/server/app/app.go +++ b/server/app/app.go @@ -247,6 +247,22 @@ func (app *App) Start(opts ...appOption) error { registerOtlpServer(app, tracesRepo, runRepo, eventEmitter, dataStoreRepo, subscriptionManager, tracer) } + testConnectionDriverFactory := pipeline.NewDriverFactory[testconnection.Job](natsConn) + dsTestListener := testconnection.NewListener() + dsTestPipeline := buildDataStoreTestPipeline( + testConnectionDriverFactory, + dsTestListener, + tracer, + tracedbFactory, + app.cfg, + meter, + ) + + dsTestPipeline.Start() + app.registerStopFn(func() { + dsTestPipeline.Stop() + }) + executorDriverFactory := pipeline.NewDriverFactory[executor.Job](natsConn) testPipeline := buildTestPipeline( executorDriverFactory, @@ -262,6 +278,7 @@ func (app *App) Start(opts ...appOption) error { subscriptionManager, triggerRegistry, tracedbFactory, + dsTestPipeline, app.cfg, meter, ) @@ -283,22 +300,6 @@ func (app *App) Start(opts ...appOption) error { testSuitePipeline.Stop() }) - testConnectionDriverFactory := pipeline.NewDriverFactory[testconnection.Job](natsConn) - dsTestListener := testconnection.NewListener() - dsTestPipeline := buildDataStoreTestPipeline( - testConnectionDriverFactory, - dsTestListener, - tracer, - tracedbFactory, - app.cfg, - meter, - ) - - dsTestPipeline.Start() - app.registerStopFn(func() { - dsTestPipeline.Stop() - }) - err = analytics.SendEvent("Server Started", "beacon", "", nil) if err != nil { return err diff --git a/server/app/test_pipeline.go b/server/app/test_pipeline.go index 6da3b91ff1..1db1e4d0e5 100644 --- a/server/app/test_pipeline.go +++ b/server/app/test_pipeline.go @@ -14,6 +14,7 @@ import ( "github.com/kubeshop/tracetest/server/pkg/pipeline" "github.com/kubeshop/tracetest/server/subscription" "github.com/kubeshop/tracetest/server/test" + "github.com/kubeshop/tracetest/server/testconnection" "github.com/kubeshop/tracetest/server/tracedb" "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/trace" @@ -33,6 +34,7 @@ func buildTestPipeline( subscriptionManager subscription.Manager, triggerRegistry *trigger.Registry, tracedbFactory tracedb.FactoryFunc, + dataStoreTestPipeline *testconnection.DataStoreTestPipeline, appConfig *config.AppConfig, meter metric.Meter, ) *executor.TestPipeline { @@ -66,6 +68,7 @@ func buildTestPipeline( execTestUpdater, subscriptionManager, tracer, + dataStoreTestPipeline, ) traceFetcherWorker := tracepollerworker.NewFetcherWorker( diff --git a/server/executor/tracepollerworker/starter_worker.go b/server/executor/tracepollerworker/starter_worker.go index 521e54b34c..1b6568880f 100644 --- a/server/executor/tracepollerworker/starter_worker.go +++ b/server/executor/tracepollerworker/starter_worker.go @@ -5,20 +5,31 @@ import ( "errors" "fmt" "log" + "sync" "github.com/kubeshop/tracetest/server/datastore" "github.com/kubeshop/tracetest/server/executor" + "github.com/kubeshop/tracetest/server/model" "github.com/kubeshop/tracetest/server/model/events" "github.com/kubeshop/tracetest/server/pkg/pipeline" "github.com/kubeshop/tracetest/server/resourcemanager" "github.com/kubeshop/tracetest/server/subscription" + "github.com/kubeshop/tracetest/server/testconnection" "github.com/kubeshop/tracetest/server/tracedb" "go.opentelemetry.io/otel/trace" ) type tracePollerStarterWorker struct { - state *workerState - outputQueue pipeline.Enqueuer[executor.Job] + state *workerState + dsTestPipeline dataStorePipeline + outputQueue pipeline.Enqueuer[executor.Job] +} + +type dataStorePipeline interface { + Run(context.Context, testconnection.Job) + NewJob(context.Context, datastore.DataStore) testconnection.Job + Subscribe(string, testconnection.NotifierFn) error + Unsubscribe(string) } func NewStarterWorker( @@ -28,6 +39,7 @@ func NewStarterWorker( updater executor.RunUpdater, subscriptionManager subscription.Manager, tracer trace.Tracer, + dsTestPipeline dataStorePipeline, ) *tracePollerStarterWorker { state := &workerState{ eventEmitter: eventEmitter, @@ -38,7 +50,10 @@ func NewStarterWorker( tracer: tracer, } - return &tracePollerStarterWorker{state: state} + return &tracePollerStarterWorker{ + state: state, + dsTestPipeline: dsTestPipeline, // this is necessary just for this worker + } } func (w *tracePollerStarterWorker) SetInputQueue(queue pipeline.Enqueuer[executor.Job]) { @@ -85,29 +100,52 @@ func (w *tracePollerStarterWorker) ProcessItem(ctx context.Context, job executor emitEvent(ctx, w.state, events.TraceFetchingStart(job.Test.ID, job.Run.ID)) - err = w.testConnection(ctx, traceDB, &job) + endpoints := traceDB.GetEndpoints() + ds, err := w.state.dsRepo.Current(ctx) + if err != nil { + wrappedError := fmt.Errorf("could not get current datastore: %w", err) + handleError(ctx, job, wrappedError, w.state, span) + return + } + + connectionResult, err := w.testConnection(ctx, traceDB, ds) if err != nil { + log.Printf("[TracePoller] TestConnection error: %s", err.Error()) handleError(ctx, job, err, w.state, span) return } + if connectionResult != nil { + emitEvent(ctx, w.state, events.TraceDataStoreConnectionInfo(job.Test.ID, job.Run.ID, *connectionResult)) + } + + emitEvent(ctx, w.state, events.TracePollingStart(job.Test.ID, job.Run.ID, string(ds.Type), endpoints)) + w.outputQueue.Enqueue(ctx, job) } -func (w *tracePollerStarterWorker) testConnection(ctx context.Context, traceDB tracedb.TraceDB, job *executor.Job) error { - if testableTraceDB, ok := traceDB.(tracedb.TestableTraceDB); ok { - connectionResult := testableTraceDB.TestConnection(ctx) - - emitEvent(ctx, w.state, events.TraceDataStoreConnectionInfo(job.Test.ID, job.Run.ID, connectionResult)) +func (w *tracePollerStarterWorker) testConnection(ctx context.Context, traceDB tracedb.TraceDB, ds datastore.DataStore) (*model.ConnectionResult, error) { + _, ok := traceDB.(tracedb.TestableTraceDB) + if !ok { + return nil, nil } - endpoints := traceDB.GetEndpoints() - ds, err := w.state.dsRepo.Current(ctx) + job := w.dsTestPipeline.NewJob(ctx, ds) + + wg := sync.WaitGroup{} + err := w.dsTestPipeline.Subscribe(job.ID, func(result testconnection.Job) { + job = result + wg.Done() + }) + if err != nil { - return fmt.Errorf("could not get current datastore: %w", err) + return nil, err } - emitEvent(ctx, w.state, events.TracePollingStart(job.Test.ID, job.Run.ID, string(ds.Type), endpoints)) + wg.Add(1) + w.dsTestPipeline.Run(ctx, job) + wg.Wait() + w.dsTestPipeline.Unsubscribe(job.ID) - return nil + return &job.TestResult, nil }