From a78402517a0d3db189440ded28d3602a5488c089 Mon Sep 17 00:00:00 2001 From: Matheus Nogueira Date: Tue, 5 Sep 2023 18:00:21 -0300 Subject: [PATCH] chore(server): split trigger worker into 3 workers (#3120) * chore(server): split trigger worker into 3 workers * rename runner file and delete unit test * fix test * fix(trigger-worker): trace-id injection * address sebastian's comment * fix: instrumentation --- server/app/test_pipeline.go | 20 +- server/executor/poller_executor_test.go | 3 +- server/executor/runner.go | 303 -------------- server/executor/runner_test.go | 383 ------------------ server/executor/trigger/grpc.go | 2 +- server/executor/trigger/http.go | 2 +- server/executor/trigger/instrument.go | 2 +- server/executor/trigger/kafka.go | 2 +- server/executor/trigger/traceid.go | 2 +- server/executor/trigger/triggerer.go | 7 +- server/executor/trigger_executer_worker.go | 91 +++++ server/executor/trigger_resolver_worker.go | 147 +++++++ .../trigger_result_processor_worker.go | 137 +++++++ server/test/trigger/trigger.go | 13 + 14 files changed, 417 insertions(+), 697 deletions(-) delete mode 100644 server/executor/runner.go delete mode 100644 server/executor/runner_test.go create mode 100644 server/executor/trigger_executer_worker.go create mode 100644 server/executor/trigger_resolver_worker.go create mode 100644 server/executor/trigger_result_processor_worker.go diff --git a/server/app/test_pipeline.go b/server/app/test_pipeline.go index ba342f3d4b..3c4370739e 100644 --- a/server/app/test_pipeline.go +++ b/server/app/test_pipeline.go @@ -68,16 +68,28 @@ func buildTestPipeline( eventEmitter, ) - runner := executor.NewPersistentRunner( + triggerResolverWorker := executor.NewTriggerResolverWorker( triggerRegistry, execTestUpdater, tracer, - subscriptionManager, tracedbFactory, dsRepo, eventEmitter, ) + triggerExecuterWorker := executor.NewTriggerExecuterWorker( + triggerRegistry, + execTestUpdater, + tracer, + eventEmitter, + ) + + triggerResultProcessorWorker 
:= executor.NewTriggerResultProcessorWorker( + tracer, + subscriptionManager, + eventEmitter, + ) + cancelRunHandlerFn := executor.HandleRunCancelation(execTestUpdater, tracer, eventEmitter) queueBuilder := executor.NewQueueBuilder(). @@ -92,7 +104,9 @@ func buildTestPipeline( pgQueue := executor.NewPostgresQueueDriver(pool) pipeline := executor.NewPipeline(queueBuilder, - executor.PipelineStep{Processor: runner, Driver: pgQueue.Channel("runner")}, + executor.PipelineStep{Processor: triggerResolverWorker, Driver: pgQueue.Channel("trigger_resolve")}, + executor.PipelineStep{Processor: triggerExecuterWorker, Driver: pgQueue.Channel("trigger_execute")}, + executor.PipelineStep{Processor: triggerResultProcessorWorker, Driver: pgQueue.Channel("trigger_result")}, executor.PipelineStep{Processor: tracePoller, Driver: pgQueue.Channel("tracePoller")}, executor.PipelineStep{Processor: linterRunner, Driver: pgQueue.Channel("linterRunner")}, executor.PipelineStep{Processor: assertionRunner, Driver: pgQueue.Channel("assertionRunner")}, diff --git a/server/executor/poller_executor_test.go b/server/executor/poller_executor_test.go index 0fdc203987..be5a019454 100644 --- a/server/executor/poller_executor_test.go +++ b/server/executor/poller_executor_test.go @@ -26,7 +26,8 @@ import ( ) var ( - randomIDGenerator = id.NewRandGenerator() + randomIDGenerator = id.NewRandGenerator() + noError error = nil ) func Test_PollerExecutor_ExecuteRequest_NoRootSpan_NoSpanCase(t *testing.T) { diff --git a/server/executor/runner.go b/server/executor/runner.go deleted file mode 100644 index 94cfe9f07c..0000000000 --- a/server/executor/runner.go +++ /dev/null @@ -1,303 +0,0 @@ -package executor - -import ( - "context" - "errors" - "fmt" - "net/url" - "os" - "strings" - - "github.com/kubeshop/tracetest/server/analytics" - "github.com/kubeshop/tracetest/server/datastore" - triggerer "github.com/kubeshop/tracetest/server/executor/trigger" - "github.com/kubeshop/tracetest/server/expression" - 
"github.com/kubeshop/tracetest/server/model" - "github.com/kubeshop/tracetest/server/model/events" - "github.com/kubeshop/tracetest/server/subscription" - "github.com/kubeshop/tracetest/server/test" - "github.com/kubeshop/tracetest/server/test/trigger" - "github.com/kubeshop/tracetest/server/tracedb" - "go.opentelemetry.io/otel/trace" -) - -type RunResult struct { - Run test.Run - Err error -} - -type PersistentRunner interface { - QueueItemProcessor -} - -const ProcessorName = "test_runner" - -func NewPersistentRunner( - triggers *triggerer.Registry, - updater RunUpdater, - tracer trace.Tracer, - subscriptionManager *subscription.Manager, - newTraceDBFn tracedb.FactoryFunc, - dsRepo currentDataStoreGetter, - eventEmitter EventEmitter, -) *persistentRunner { - return &persistentRunner{ - triggers: triggers, - updater: updater, - tracer: tracer, - newTraceDBFn: newTraceDBFn, - dsRepo: dsRepo, - subscriptionManager: subscriptionManager, - eventEmitter: eventEmitter, - } -} - -type currentDataStoreGetter interface { - Current(context.Context) (datastore.DataStore, error) -} - -type persistentRunner struct { - triggers *triggerer.Registry - updater RunUpdater - tracer trace.Tracer - subscriptionManager *subscription.Manager - newTraceDBFn tracedb.FactoryFunc - dsRepo currentDataStoreGetter - eventEmitter EventEmitter - outputQueue Enqueuer -} - -func (r *persistentRunner) SetOutputQueue(queue Enqueuer) { - r.outputQueue = queue -} - -func (r persistentRunner) handleDBError(run test.Run, err error) { - if err != nil { - fmt.Printf("test %s run #%d trigger DB error: %s\n", run.TestID, run.ID, err.Error()) - } -} - -func (r persistentRunner) handleError(run test.Run, err error) { - if err != nil { - fmt.Printf("test %s run #%d trigger DB error: %s\n", run.TestID, run.ID, err.Error()) - } -} - -func (r persistentRunner) traceDB(ctx context.Context) (tracedb.TraceDB, error) { - ds, err := r.dsRepo.Current(ctx) - if err != nil { - return nil, fmt.Errorf("cannot get default 
datastore: %w", err) - } - - tdb, err := r.newTraceDBFn(ds) - if err != nil { - return nil, fmt.Errorf(`cannot get tracedb from DataStore config with ID "%s": %w`, ds.ID, err) - } - - return tdb, nil -} - -func (r persistentRunner) ProcessItem(ctx context.Context, job Job) { - run := job.Run.Start() - r.handleDBError(run, r.updater.Update(ctx, run)) - - err := r.eventEmitter.Emit(ctx, events.TriggerCreatedInfo(job.Run.TestID, job.Run.ID)) - if err != nil { - r.handleError(job.Run, err) - } - - triggererObj, err := r.triggers.Get(job.Test.Trigger.Type) - if err != nil { - r.handleError(job.Run, err) - } - - tdb, err := r.traceDB(ctx) - if err != nil { - r.handleError(job.Run, err) - } - - traceID := tdb.GetTraceID() - run.TraceID = traceID - r.handleDBError(run, r.updater.Update(ctx, run)) - - ds := []expression.DataStore{expression.VariableDataStore{ - Values: run.VariableSet.Values, - }} - - executor := expression.NewExecutor(ds...) - - triggerOptions := &triggerer.TriggerOptions{ - TraceID: traceID, - Executor: executor, - } - - err = r.eventEmitter.Emit(ctx, events.TriggerResolveStart(job.Run.TestID, job.Run.ID)) - if err != nil { - r.handleError(job.Run, err) - } - - resolvedTest, err := triggererObj.Resolve(ctx, job.Test, triggerOptions) - if err != nil { - emitErr := r.eventEmitter.Emit(ctx, events.TriggerResolveError(job.Run.TestID, job.Run.ID, err)) - if emitErr != nil { - r.handleError(job.Run, emitErr) - } - - r.handleError(job.Run, err) - } - - run.ResolvedTrigger = resolvedTest.Trigger - r.handleDBError(run, r.updater.Update(ctx, run)) - - err = r.eventEmitter.Emit(ctx, events.TriggerResolveSuccess(job.Run.TestID, job.Run.ID)) - if err != nil { - r.handleError(job.Run, err) - } - - if job.Test.Trigger.Type == trigger.TriggerTypeTraceID { - traceIDFromParam, err := trace.TraceIDFromHex(job.Test.Trigger.TraceID.ID) - if err == nil { - run.TraceID = traceIDFromParam - } - } - - err = r.eventEmitter.Emit(ctx, events.TriggerExecutionStart(job.Run.TestID, 
job.Run.ID)) - if err != nil { - r.handleError(job.Run, err) - } - - response, err := triggererObj.Trigger(ctx, resolvedTest, triggerOptions) - run = r.handleExecutionResult(run, response, err) - if err != nil { - if isConnectionError(err) { - r.emitUnreachableEndpointEvent(ctx, job, err) - - if isTargetLocalhost(job) && isServerRunningInsideContainer() { - r.emitMismatchEndpointEvent(ctx, job, err) - } - } - - emitErr := r.eventEmitter.Emit(ctx, events.TriggerExecutionError(job.Run.TestID, job.Run.ID, err)) - if emitErr != nil { - r.handleError(job.Run, emitErr) - } - - fmt.Printf("test %s run #%d trigger error: %s\n", run.TestID, run.ID, err.Error()) - r.subscriptionManager.PublishUpdate(subscription.Message{ - ResourceID: run.TransactionStepResourceID(), - Type: "run_update", - Content: RunResult{Run: run, Err: err}, - }) - } else { - err = r.eventEmitter.Emit(ctx, events.TriggerExecutionSuccess(job.Run.TestID, job.Run.ID)) - if err != nil { - r.handleDBError(job.Run, err) - } - } - - run.SpanID = response.SpanID - - r.handleDBError(run, r.updater.Update(ctx, run)) - if run.State != test.RunStateAwaitingTrace { - return - } - - job.Run = run - ctx, pollingSpan := r.tracer.Start(ctx, "Start Polling trace") - defer pollingSpan.End() - r.outputQueue.Enqueue(ctx, job) -} - -func (r persistentRunner) handleExecutionResult(run test.Run, response triggerer.Response, err error) test.Run { - run = run.TriggerCompleted(response.Result) - if err != nil { - run = run.TriggerFailed(err) - - analytics.SendEvent("test_run_finished", "error", "", &map[string]string{ - "finalState": string(run.State), - }) - - return run - } - - return run.SuccessfullyTriggered() -} - -func (r persistentRunner) emitUnreachableEndpointEvent(ctx context.Context, job Job, err error) { - var event model.TestRunEvent - switch job.Test.Trigger.Type { - case trigger.TriggerTypeHTTP: - event = events.TriggerHTTPUnreachableHostError(job.Run.TestID, job.Run.ID, err) - case trigger.TriggerTypeGRPC: - event 
= events.TriggergRPCUnreachableHostError(job.Run.TestID, job.Run.ID, err) - } - - emitErr := r.eventEmitter.Emit(ctx, event) - if emitErr != nil { - r.handleError(job.Run, emitErr) - } -} - -func (r persistentRunner) emitMismatchEndpointEvent(ctx context.Context, job Job, err error) { - emitErr := r.eventEmitter.Emit(ctx, events.TriggerDockerComposeHostMismatchError(job.Run.TestID, job.Run.ID)) - if emitErr != nil { - r.handleError(job.Run, emitErr) - } -} - -func isConnectionError(err error) bool { - for err != nil { - // a dial error means we couldn't open a TCP connection (either host is not available or DNS doesn't exist) - if strings.HasPrefix(err.Error(), "dial ") { - return true - } - - // it means a trigger timeout - if errors.Is(err, context.DeadlineExceeded) { - return true - } - - err = errors.Unwrap(err) - } - - return false -} - -func isTargetLocalhost(job Job) bool { - var endpoint string - switch job.Test.Trigger.Type { - case trigger.TriggerTypeHTTP: - endpoint = job.Test.Trigger.HTTP.URL - case trigger.TriggerTypeGRPC: - endpoint = job.Test.Trigger.GRPC.Address - } - - url, err := url.Parse(endpoint) - if err != nil { - return false - } - - // removes port - host := url.Host - colonPosition := strings.Index(url.Host, ":") - if colonPosition >= 0 { - host = host[0:colonPosition] - } - - return host == "localhost" || host == "127.0.0.1" -} - -func isServerRunningInsideContainer() bool { - // Check if running on Docker - // Reference: https://paulbradley.org/indocker/ - if _, err := os.Stat("/.dockerenv"); err == nil { - return true - } - - // Check if running on k8s - if os.Getenv("KUBERNETES_SERVICE_HOST") != "" { - return true - } - - return false -} diff --git a/server/executor/runner_test.go b/server/executor/runner_test.go deleted file mode 100644 index 8e1a911ed7..0000000000 --- a/server/executor/runner_test.go +++ /dev/null @@ -1,383 +0,0 @@ -package executor_test - -import ( - "context" - "math/rand" - "testing" - "time" - - 
"github.com/kubeshop/tracetest/server/config" - "github.com/kubeshop/tracetest/server/datastore" - "github.com/kubeshop/tracetest/server/executor" - "github.com/kubeshop/tracetest/server/executor/pollingprofile" - "github.com/kubeshop/tracetest/server/executor/testrunner" - triggerer "github.com/kubeshop/tracetest/server/executor/trigger" - "github.com/kubeshop/tracetest/server/pkg/id" - "github.com/kubeshop/tracetest/server/subscription" - "github.com/kubeshop/tracetest/server/test" - "github.com/kubeshop/tracetest/server/test/trigger" - "github.com/kubeshop/tracetest/server/testdb" - "github.com/kubeshop/tracetest/server/tracedb" - "github.com/kubeshop/tracetest/server/traces" - "github.com/kubeshop/tracetest/server/tracing" - "github.com/kubeshop/tracetest/server/variableset" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/mock" - "github.com/stretchr/testify/require" - "go.opentelemetry.io/otel/trace" -) - -func TestPersistentRunner(t *testing.T) { - t.Run("TestIsTriggerd", func(t *testing.T) { - t.Parallel() - - testObj := test.Test{ - ID: id.ID("test1"), - Trigger: sampleTrigger, - } - - f := runnerSetup(t) - f.expectSuccessExec(testObj) - - f.run([]test.Test{testObj}, 10*time.Millisecond) - - result := f.runsMock.runs[testObj.ID] - require.NotNil(t, result) - assert.Greater(t, result.ServiceTriggerCompletedAt.UnixNano(), result.CreatedAt.UnixNano()) - - f.assert(t) - }) - - t.Run("TestsCanBeTriggerdConcurrently", func(t *testing.T) { - t.Parallel() - - test1 := test.Test{ID: id.ID("test1"), Trigger: sampleTrigger} - test2 := test.Test{ID: id.ID("test2"), Trigger: sampleTrigger} - - f := runnerSetup(t) - f.expectSuccessExecLong(test1) - f.expectSuccessExec(test2) - - // the worker pool lib reverses the order of this test for some reason - // but this doesn't matter, as long as both tests run, - // we only care about the completion time in this test - f.run([]test.Test{test2, test1}, 100*time.Millisecond) - - run1 := 
f.runsMock.runs[test1.ID] - run2 := f.runsMock.runs[test2.ID] - - assert.Greater(t, run1.ServiceTriggerCompletedAt.UnixNano(), run2.ServiceTriggerCompletedAt.UnixNano(), "test1 did not complete after test2") - f.assert(t) - }) - -} - -var ( - noError error = nil - - sampleResponse = triggerer.Response{ - SpanAttributes: map[string]string{ - "tracetest.run.trigger.http.response_code": "200", - }, - Result: trigger.TriggerResult{ - Type: trigger.TriggerTypeHTTP, - HTTP: &trigger.HTTPResponse{ - StatusCode: 200, - Body: "this is the body", - Headers: []trigger.HTTPHeader{ - {Key: "Content-Type", Value: "text/plain"}, - }, - }, - }, - } - - sampleTrigger = trigger.Trigger{ - Type: trigger.TriggerTypeHTTP, - } -) - -type runnerFixture struct { - runner *executor.TestPipeline - dsMock *datastoreGetterMock - ppMock *pollingprofileGetterMock - trMock *testrunnerGetterMock - testMock *testGetterMock - runsMock *runsRepoMock - triggererMock *mockTriggerer - processorMock *mockProcessor -} - -func (f runnerFixture) run(tests []test.Test, ttl time.Duration) { - // TODO - fix this test - f.runner.Start() - time.Sleep(10 * time.Millisecond) - for _, testObj := range tests { - newRun := f.runner.Run(context.TODO(), testObj, test.RunMetadata{}, variableset.VariableSet{}, nil) - // readd this when using not-in-memory queues - // f.runsMock. - // On("GetRun", testObj.ID, newRun.ID). - // Return(newRun, noError) - f.processorMock. - On("ProcessItem", testObj.ID, newRun.ID, datastore.DataStoreSingleID, pollingprofile.DefaultPollingProfile.ID). 
- Return(newRun, noError) - } - time.Sleep(ttl) - f.runner.Stop() -} - -func (f runnerFixture) assert(t *testing.T) { - f.dsMock.AssertExpectations(t) - f.ppMock.AssertExpectations(t) - f.trMock.AssertExpectations(t) - f.testMock.AssertExpectations(t) - f.runsMock.AssertExpectations(t) - f.triggererMock.AssertExpectations(t) -} - -func (f runnerFixture) expectSuccessExecLong(test test.Test) { - f.triggererMock.expectTriggerTestLong(test) - f.expectSuccessResultPersist(test) -} - -func (f runnerFixture) expectSuccessExec(test test.Test) { - f.testMock.On("GetAugmented", test.ID).Return(test, noError) - f.triggererMock.expectTriggerTest(test) - f.expectSuccessResultPersist(test) -} - -func (f runnerFixture) expectSuccessResultPersist(test test.Test) { - f.testMock.On("GetAugmented", test.ID).Return(test, noError) - expectCreateRun(f.runsMock, test) - f.runsMock.On("UpdateRun", mock.Anything).Return(noError) - f.runsMock.On("UpdateRun", mock.Anything).Return(noError) -} - -func runnerSetup(t *testing.T) runnerFixture { - - dsMock := new(datastoreGetterMock) - dsMock.Test(t) - - ppMock := new(pollingprofileGetterMock) - ppMock.Test(t) - - trMock := new(testrunnerGetterMock) - trMock.Test(t) - - testMock := new(testGetterMock) - testMock.Test(t) - - runsMock := new(runsRepoMock) - runsMock.Test(t) - - triggererMock := new(mockTriggerer) - runsMock.Test(t) - - processorMock := new(mockProcessor) - processorMock.Test(t) - - tracesMock := new(mockTraces) - tracesMock.Test(t) - - sm := subscription.NewManager() - tracer, _ := tracing.NewTracer(context.Background(), config.Must(config.New())) - eventEmitter := executor.NewEventEmitter(getTestRunEventRepositoryMock(t, false), sm) - - registry := triggerer.NewRegistry(tracer, tracer) - registry.Add(triggererMock) - - runner := executor.NewPersistentRunner( - registry, - executor.NewDBUpdater(runsMock), - tracer, - sm, - tracedb.Factory(tracesMock), - dsMock, - eventEmitter, - ) - - queueBuilder := executor.NewQueueBuilder(). 
- WithDataStoreGetter(dsMock). - WithPollingProfileGetter(ppMock). - WithTestGetter(testMock). - WithRunGetter(runsMock) - - pipeline := executor.NewPipeline(queueBuilder, - executor.PipelineStep{Processor: runner, Driver: executor.NewInMemoryQueueDriver("runner")}, - executor.PipelineStep{Processor: processorMock, Driver: executor.NewInMemoryQueueDriver("runner")}, - ) - - testPipeline := executor.NewTestPipeline( - pipeline, - nil, - pipeline.GetQueueForStep(1), // processorMock queue - runsMock, - trMock, - ppMock, - dsMock, - ) - - return runnerFixture{ - runner: testPipeline, - dsMock: dsMock, - ppMock: ppMock, - trMock: trMock, - testMock: testMock, - runsMock: runsMock, - triggererMock: triggererMock, - processorMock: processorMock, - } -} - -type mockTraces struct { - mock.Mock -} - -func (r *mockTraces) Get(ctx context.Context, id trace.TraceID) (traces.Trace, error) { - args := r.Called(id) - return args.Get(0).(traces.Trace), args.Error(1) -} - -func (r *mockTraces) SaveTrace(ctx context.Context, trace *traces.Trace) error { - args := r.Called(trace) - return args.Error(0) -} - -type mockProcessor struct { - mock.Mock -} - -func (m *mockProcessor) ProcessItem(_ context.Context, job executor.Job) { - m.Called(job.Test.ID, job.Run.ID, job.DataStore.ID, job.PollingProfile.ID) -} - -func (m *mockProcessor) SetOutputQueue(_ executor.Enqueuer) {} - -type datastoreGetterMock struct { - mock.Mock -} - -func (r *datastoreGetterMock) Get(ctx context.Context, id id.ID) (datastore.DataStore, error) { - return r.Current(ctx) -} - -func (r *datastoreGetterMock) Current(context.Context) (datastore.DataStore, error) { - return datastore.DataStore{ - ID: datastore.DataStoreSingleID, - Name: "test", - Type: datastore.DataStoreTypeOTLP, - Default: true, - }, nil -} - -type pollingprofileGetterMock struct { - mock.Mock -} - -func (r *pollingprofileGetterMock) Get(ctx context.Context, _ id.ID) (pollingprofile.PollingProfile, error) { - return r.GetDefault(ctx), nil -} - 
-func (r *pollingprofileGetterMock) GetDefault(context.Context) pollingprofile.PollingProfile { - return pollingprofile.DefaultPollingProfile -} - -type testGetterMock struct { - mock.Mock -} - -func (r *testGetterMock) GetAugmented(_ context.Context, id id.ID) (test.Test, error) { - args := r.Called(id) - return args.Get(0).(test.Test), args.Error(1) -} - -type runsRepoMock struct { - testdb.MockRepository - - runs map[id.ID]test.Run -} - -func (m *runsRepoMock) CreateRun(_ context.Context, testObj test.Test, run test.Run) (test.Run, error) { - args := m.Called(testObj.ID) - if m.runs == nil { - m.runs = map[id.ID]test.Run{} - } - - run.ID = rand.Intn(100) - m.runs[testObj.ID] = run - - return run, args.Error(0) -} - -func (m *runsRepoMock) UpdateRun(_ context.Context, run test.Run) error { - args := m.Called(run.ID) - for k, v := range m.runs { - if v.ID == run.ID { - m.runs[k] = run - } - } - - return args.Error(0) -} - -func (r *runsRepoMock) GetRun(_ context.Context, testID id.ID, runID int) (test.Run, error) { - if run, ok := r.runs[testID]; ok && run.ID == runID { - return run, nil - } - - args := r.Called(testID, runID) - return args.Get(0).(test.Run), args.Error(1) -} - -func (r *runsRepoMock) GetRunByTraceID(_ context.Context, id trace.TraceID) (test.Run, error) { - args := r.Called(id) - return args.Get(0).(test.Run), args.Error(1) -} - -type testrunnerGetterMock struct { - mock.Mock -} - -func (r *testrunnerGetterMock) GetDefault(context.Context) testrunner.TestRunner { - return testrunner.DefaultTestRunner -} - -type mockTriggerer struct { - mock.Mock -} - -func (m *mockTriggerer) Type() trigger.TriggerType { - return trigger.TriggerTypeHTTP -} - -func (m *mockTriggerer) Trigger(_ context.Context, test test.Test, opts *triggerer.TriggerOptions) (triggerer.Response, error) { - args := m.Called(test.ID) - return args.Get(0).(triggerer.Response), args.Error(1) -} - -func (m *mockTriggerer) Resolve(_ context.Context, testObj test.Test, opts 
*triggerer.TriggerOptions) (test.Test, error) { - args := m.Called(testObj.ID) - return args.Get(0).(test.Test), args.Error(1) -} - -func (m *mockTriggerer) expectTriggerTest(test test.Test) *mock.Call { - return m. - On("Resolve", test.ID). - Return(test, noError). - On("Trigger", test.ID). - Return(sampleResponse, noError) -} - -func (m *mockTriggerer) expectTriggerTestLong(test test.Test) *mock.Call { - return m. - On("Trigger", test.ID). - After(50*time.Millisecond). - Return(sampleResponse, noError). - On("Resolve", test.ID). - Return(test, noError) -} - -func expectCreateRun(m *runsRepoMock, test test.Test) *mock.Call { - return m. - On("CreateRun", test.ID). - Return(noError) -} diff --git a/server/executor/trigger/grpc.go b/server/executor/trigger/grpc.go index f7dcb87687..41a32d312b 100644 --- a/server/executor/trigger/grpc.go +++ b/server/executor/trigger/grpc.go @@ -105,7 +105,7 @@ func (t *grpcTriggerer) Type() trigger.TriggerType { return trigger.TriggerTypeGRPC } -func (t *grpcTriggerer) Resolve(ctx context.Context, test test.Test, opts *TriggerOptions) (test.Test, error) { +func (t *grpcTriggerer) Resolve(ctx context.Context, test test.Test, opts *ResolveOptions) (test.Test, error) { grpc := test.Trigger.GRPC if grpc == nil { diff --git a/server/executor/trigger/http.go b/server/executor/trigger/http.go index 428355cb97..fb88056d4f 100644 --- a/server/executor/trigger/http.go +++ b/server/executor/trigger/http.go @@ -120,7 +120,7 @@ func (t *httpTriggerer) Type() trigger.TriggerType { return trigger.TriggerTypeHTTP } -func (t *httpTriggerer) Resolve(ctx context.Context, test test.Test, opts *TriggerOptions) (test.Test, error) { +func (t *httpTriggerer) Resolve(ctx context.Context, test test.Test, opts *ResolveOptions) (test.Test, error) { http := test.Trigger.HTTP if http == nil { diff --git a/server/executor/trigger/instrument.go b/server/executor/trigger/instrument.go index f7b4247266..2f324a9311 100644 --- a/server/executor/trigger/instrument.go 
+++ b/server/executor/trigger/instrument.go @@ -34,7 +34,7 @@ func (t *instrumentedTriggerer) Type() trigger.TriggerType { return trigger.TriggerType("instrumented") } -func (t *instrumentedTriggerer) Resolve(ctx context.Context, test test.Test, opts *TriggerOptions) (test.Test, error) { +func (t *instrumentedTriggerer) Resolve(ctx context.Context, test test.Test, opts *ResolveOptions) (test.Test, error) { return t.triggerer.Resolve(ctx, test, opts) } diff --git a/server/executor/trigger/kafka.go b/server/executor/trigger/kafka.go index 0d27ca797d..e54e0bc443 100644 --- a/server/executor/trigger/kafka.go +++ b/server/executor/trigger/kafka.go @@ -58,7 +58,7 @@ func (t *KafkaTriggerer) Type() trigger.TriggerType { return trigger.TriggerTypeKafka } -func (t *KafkaTriggerer) Resolve(ctx context.Context, test test.Test, opts *TriggerOptions) (test.Test, error) { +func (t *KafkaTriggerer) Resolve(ctx context.Context, test test.Test, opts *ResolveOptions) (test.Test, error) { kafkaConfig := test.Trigger.Kafka if kafkaConfig == nil { return test, fmt.Errorf("no settings provided for kafka triggerer") diff --git a/server/executor/trigger/traceid.go b/server/executor/trigger/traceid.go index 887a08a720..b3d2ed81dd 100644 --- a/server/executor/trigger/traceid.go +++ b/server/executor/trigger/traceid.go @@ -29,7 +29,7 @@ func (t *traceidTriggerer) Type() trigger.TriggerType { return trigger.TriggerTypeTraceID } -func (t *traceidTriggerer) Resolve(ctx context.Context, test test.Test, opts *TriggerOptions) (test.Test, error) { +func (t *traceidTriggerer) Resolve(ctx context.Context, test test.Test, opts *ResolveOptions) (test.Test, error) { traceid := test.Trigger.TraceID if traceid == nil { return test, fmt.Errorf("no settings provided for TRACEID triggerer") diff --git a/server/executor/trigger/triggerer.go b/server/executor/trigger/triggerer.go index 1cbc34b843..3451d9d58b 100644 --- a/server/executor/trigger/triggerer.go +++ b/server/executor/trigger/triggerer.go @@ -13,14 
+13,17 @@ import ( ) type TriggerOptions struct { - TraceID trace.TraceID + TraceID trace.TraceID +} + +type ResolveOptions struct { Executor expression.Executor } type Triggerer interface { Trigger(context.Context, test.Test, *TriggerOptions) (Response, error) Type() trigger.TriggerType - Resolve(context.Context, test.Test, *TriggerOptions) (test.Test, error) + Resolve(context.Context, test.Test, *ResolveOptions) (test.Test, error) } type Response struct { diff --git a/server/executor/trigger_executer_worker.go b/server/executor/trigger_executer_worker.go new file mode 100644 index 0000000000..c425d3f494 --- /dev/null +++ b/server/executor/trigger_executer_worker.go @@ -0,0 +1,91 @@ +package executor + +import ( + "context" + "fmt" + + "github.com/kubeshop/tracetest/server/analytics" + triggerer "github.com/kubeshop/tracetest/server/executor/trigger" + "github.com/kubeshop/tracetest/server/model/events" + "github.com/kubeshop/tracetest/server/test" + "go.opentelemetry.io/otel/trace" +) + +func NewTriggerExecuterWorker( + triggers *triggerer.Registry, + updater RunUpdater, + tracer trace.Tracer, + eventEmitter EventEmitter, +) *triggerExecuterWorker { + return &triggerExecuterWorker{ + triggers: triggers, + updater: updater, + tracer: tracer, + eventEmitter: eventEmitter, + } +} + +type triggerExecuterWorker struct { + triggers *triggerer.Registry + updater RunUpdater + tracer trace.Tracer + eventEmitter EventEmitter + outputQueue Enqueuer +} + +func (r *triggerExecuterWorker) SetOutputQueue(queue Enqueuer) { + r.outputQueue = queue +} + +func (r triggerExecuterWorker) handleDBError(run test.Run, err error) { + if err != nil { + fmt.Printf("test %s run #%d trigger DB error: %s\n", run.TestID, run.ID, err.Error()) + } +} + +func (r triggerExecuterWorker) handleError(run test.Run, err error) { + if err != nil { + fmt.Printf("test %s run #%d trigger DB error: %s\n", run.TestID, run.ID, err.Error()) + } +} + +func (r triggerExecuterWorker) ProcessItem(ctx 
context.Context, job Job) { + err := r.eventEmitter.Emit(ctx, events.TriggerExecutionStart(job.Run.TestID, job.Run.ID)) + if err != nil { + r.handleError(job.Run, err) + } + + triggererObj, err := r.triggers.Get(job.Test.Trigger.Type) + if err != nil { + r.handleError(job.Run, err) + } + + job.Test.Trigger = job.Run.ResolvedTrigger + run := job.Run + + response, err := triggererObj.Trigger(ctx, job.Test, &triggerer.TriggerOptions{ + TraceID: run.TraceID, + }) + run = r.handleExecutionResult(run, response, err) + run.SpanID = response.SpanID + + r.handleDBError(run, r.updater.Update(ctx, run)) + + job.Run = run + r.outputQueue.Enqueue(ctx, job) +} + +func (r triggerExecuterWorker) handleExecutionResult(run test.Run, response triggerer.Response, err error) test.Run { + run = run.TriggerCompleted(response.Result) + if err != nil { + run = run.TriggerFailed(err) + + analytics.SendEvent("test_run_finished", "error", "", &map[string]string{ + "finalState": string(run.State), + }) + + return run + } + + return run.SuccessfullyTriggered() +} diff --git a/server/executor/trigger_resolver_worker.go b/server/executor/trigger_resolver_worker.go new file mode 100644 index 0000000000..e74629ee57 --- /dev/null +++ b/server/executor/trigger_resolver_worker.go @@ -0,0 +1,147 @@ +package executor + +import ( + "context" + "fmt" + + "github.com/kubeshop/tracetest/server/datastore" + triggerer "github.com/kubeshop/tracetest/server/executor/trigger" + "github.com/kubeshop/tracetest/server/expression" + "github.com/kubeshop/tracetest/server/model/events" + "github.com/kubeshop/tracetest/server/test" + "github.com/kubeshop/tracetest/server/test/trigger" + "github.com/kubeshop/tracetest/server/tracedb" + "go.opentelemetry.io/otel/trace" +) + +type currentDataStoreGetter interface { + Current(context.Context) (datastore.DataStore, error) +} + +func NewTriggerResolverWorker( + triggers *triggerer.Registry, + updater RunUpdater, + tracer trace.Tracer, + newTraceDBFn tracedb.FactoryFunc, + 
dsRepo currentDataStoreGetter,
+	eventEmitter EventEmitter,
+) *triggerResolverWorker {
+	return &triggerResolverWorker{
+		triggers:     triggers,
+		updater:      updater,
+		tracer:       tracer,
+		newTraceDBFn: newTraceDBFn,
+		dsRepo:       dsRepo,
+		eventEmitter: eventEmitter,
+	}
+}
+
+// triggerResolverWorker resolves a job's trigger and passes the job on to
+// outputQueue.
+type triggerResolverWorker struct {
+	triggers     *triggerer.Registry
+	updater      RunUpdater
+	tracer       trace.Tracer
+	newTraceDBFn tracedb.FactoryFunc
+	dsRepo       currentDataStoreGetter
+	eventEmitter EventEmitter
+	outputQueue  Enqueuer
+}
+
+// SetOutputQueue sets the queue that receives each job after it is resolved.
+func (r *triggerResolverWorker) SetOutputQueue(queue Enqueuer) {
+	r.outputQueue = queue
+}
+
+// handleDBError logs err, if any, as a database error of the given run.
+func (r triggerResolverWorker) handleDBError(run test.Run, err error) {
+	if err != nil {
+		fmt.Printf("test %s run #%d trigger DB error: %s\n", run.TestID, run.ID, err.Error())
+	}
+}
+
+// handleError logs err, if any, against the given run. It does nothing for a
+// nil error, so the result of a fallible call can be passed in directly.
+func (r triggerResolverWorker) handleError(run test.Run, err error) {
+	if err != nil {
+		fmt.Printf("test %s run #%d trigger error: %s\n", run.TestID, run.ID, err.Error())
+	}
+}
+
+// traceDB builds a TraceDB from the current data store. On failure it returns
+// a nil TraceDB together with an error wrapping the cause.
+func (r triggerResolverWorker) traceDB(ctx context.Context) (tracedb.TraceDB, error) {
+	ds, err := r.dsRepo.Current(ctx)
+	if err != nil {
+		return nil, fmt.Errorf("cannot get default datastore: %w", err)
+	}
+
+	tdb, err := r.newTraceDBFn(ds)
+	if err != nil {
+		return nil, fmt.Errorf(`cannot get tracedb from DataStore config with ID "%s": %w`, ds.ID, err)
+	}
+
+	return tdb, nil
+}
+
+// ProcessItem prepares job for trigger execution: it marks the run as started,
+// assigns it a trace ID taken from the current trace DB, resolves the test's
+// trigger using the run's variable set, stores the result in
+// run.ResolvedTrigger and enqueues the updated job on the output queue.
+//
+// Failures to persist the run or to emit an event are logged and ignored. If
+// the triggerer lookup, the trace DB creation or the trigger resolution fails,
+// the error is logged and the job is dropped without being enqueued, because
+// continuing would use a triggerer or TraceDB that was never obtained, or hand
+// an unresolved trigger to the next worker.
+//
+// NOTE(review): a dropped job leaves the run as persisted so far; confirm
+// whether the run should also be marked as failed here.
+func (r triggerResolverWorker) ProcessItem(ctx context.Context, job Job) {
+	ctx, span := r.tracer.Start(ctx, "Resolve trigger")
+	defer span.End()
+
+	run := job.Run.Start()
+	r.handleDBError(run, r.updater.Update(ctx, run))
+
+	r.handleError(job.Run, r.eventEmitter.Emit(ctx, events.TriggerCreatedInfo(job.Run.TestID, job.Run.ID)))
+
+	triggererObj, err := r.triggers.Get(job.Test.Trigger.Type)
+	if err != nil {
+		r.handleError(job.Run, err)
+		return
+	}
+
+	tdb, err := r.traceDB(ctx)
+	if err != nil {
+		// traceDB returned a nil TraceDB; calling GetTraceID on it would panic.
+		r.handleError(job.Run, err)
+		return
+	}
+
+	run.TraceID = tdb.GetTraceID()
+	r.handleDBError(run, r.updater.Update(ctx, run))
+
+	ds := []expression.DataStore{expression.VariableDataStore{
+		Values: run.VariableSet.Values,
+	}}
+
+	triggerOptions := &triggerer.ResolveOptions{
+		Executor: expression.NewExecutor(ds...),
+	}
+
+	r.handleError(job.Run, r.eventEmitter.Emit(ctx, events.TriggerResolveStart(job.Run.TestID, job.Run.ID)))
+
+	resolvedTest, err := triggererObj.Resolve(ctx, job.Test, triggerOptions)
+	if err != nil {
+		r.handleError(job.Run, r.eventEmitter.Emit(ctx, events.TriggerResolveError(job.Run.TestID, job.Run.ID, err)))
+		r.handleError(job.Run, err)
+		return
+	}
+
+	// Only reached when resolution worked, so success is never reported after
+	// a resolve error.
+	r.handleError(job.Run, r.eventEmitter.Emit(ctx, events.TriggerResolveSuccess(job.Run.TestID, job.Run.ID)))
+
+	if job.Test.Trigger.Type == trigger.TriggerTypeTraceID {
+		// For trace-ID triggers the ID supplied by the test replaces the one
+		// taken from the trace DB; an ID that is not valid hex is ignored.
+		if traceIDFromParam, err := trace.TraceIDFromHex(job.Test.Trigger.TraceID.ID); err == nil {
+			run.TraceID = traceIDFromParam
+		}
+	}
+
+	run.ResolvedTrigger = resolvedTest.Trigger
+	r.handleDBError(run, r.updater.Update(ctx, run))
+	job.Run = run
+
+	r.outputQueue.Enqueue(ctx, job)
+}
diff --git a/server/executor/trigger_result_processor_worker.go b/server/executor/trigger_result_processor_worker.go
new file mode 100644
index 0000000000..b79c73b9c8
--- /dev/null
+++ b/server/executor/trigger_result_processor_worker.go
@@ -0,0 +1,158 @@
+package executor
+
+import (
+	"context"
+	"fmt"
+	"net/url"
+	"strings"
+
+	"github.com/kubeshop/tracetest/server/model"
+	"github.com/kubeshop/tracetest/server/model/events"
+	"github.com/kubeshop/tracetest/server/subscription"
+	"github.com/kubeshop/tracetest/server/test"
+	"github.com/kubeshop/tracetest/server/test/trigger"
+	"go.opentelemetry.io/otel/trace"
+)
+
+// RunResult is the payload published to run subscribers when a trigger
+// fails: the affected run together with the error the trigger reported.
+type RunResult struct {
+	Run test.Run
+	Err error
+}
+
+// NewTriggerResultProcessorWorker creates the worker that reports the outcome
+// of an executed trigger. The output queue is not set here; call
+// SetOutputQueue before the worker processes any job.
+func NewTriggerResultProcessorWorker(
+	tracer trace.Tracer,
+	subscriptionManager *subscription.Manager,
+	eventEmitter EventEmitter,
+) *triggerResultProcessorWorker {
+	return &triggerResultProcessorWorker{
+		tracer:              tracer,
+		subscriptionManager: subscriptionManager,
+		eventEmitter:        eventEmitter,
+	}
+}
+
+// triggerResultProcessorWorker emits events describing a trigger's outcome,
+// notifies subscribers about failures and passes the job on to outputQueue.
+type triggerResultProcessorWorker struct {
+	tracer              trace.Tracer
+	subscriptionManager *subscription.Manager
+	eventEmitter        EventEmitter
+	outputQueue         Enqueuer
+}
+
+// SetOutputQueue sets the queue that receives each job after it is processed.
+func (r *triggerResultProcessorWorker) SetOutputQueue(queue Enqueuer) {
+	r.outputQueue = queue
+}
+
+// handleDBError logs err, if any, as a database error of the given run.
+func (r triggerResultProcessorWorker) handleDBError(run test.Run, err error) {
+	if err != nil {
+		fmt.Printf("test %s run #%d trigger DB error: %s\n", run.TestID, run.ID, err.Error())
+	}
+}
+
+// handleError logs err, if any, against the given run. It does nothing for a
+// nil error, so the result of a fallible call can be passed in directly.
+func (r triggerResultProcessorWorker) handleError(run test.Run, err error) {
+	if err != nil {
+		fmt.Printf("test %s run #%d trigger error: %s\n", run.TestID, run.ID, err.Error())
+	}
+}
+
+// ProcessItem reports the outcome stored in job.Run.TriggerResult and then
+// forwards the job to the output queue, whether the trigger failed or not.
+//
+// On failure it emits a TriggerExecutionError event, preceded by more specific
+// events for connection errors, and publishes a "run_update" message carrying
+// a RunResult. On success it emits a TriggerExecutionSuccess event.
+func (r triggerResultProcessorWorker) ProcessItem(ctx context.Context, job Job) {
+	ctx, span := r.tracer.Start(ctx, "Start processing trigger response")
+	defer span.End()
+
+	if failure := job.Run.TriggerResult.Error; failure != nil {
+		err := failure.Error()
+		if failure.ConnectionError {
+			r.emitUnreachableEndpointEvent(ctx, job, err)
+
+			// Check the cheap flag first; the endpoint is only parsed when needed.
+			if failure.RunningOnContainer && isTargetLocalhost(job) {
+				r.emitMismatchEndpointEvent(ctx, job, err)
+			}
+		}
+
+		r.handleError(job.Run, r.eventEmitter.Emit(ctx, events.TriggerExecutionError(job.Run.TestID, job.Run.ID, err)))
+
+		fmt.Printf("test %s run #%d trigger error: %s\n", job.Run.TestID, job.Run.ID, err.Error())
+		r.subscriptionManager.PublishUpdate(subscription.Message{
+			ResourceID: job.Run.TransactionStepResourceID(),
+			Type:       "run_update",
+			Content:    RunResult{Run: job.Run, Err: err},
+		})
+	} else {
+		r.handleError(job.Run, r.eventEmitter.Emit(ctx, events.TriggerExecutionSuccess(job.Run.TestID, job.Run.ID)))
+	}
+
+	r.outputQueue.Enqueue(ctx, job)
+}
+
+// emitUnreachableEndpointEvent emits the unreachable-host event matching the
+// test's trigger type. Only HTTP and gRPC triggers have such an event; for any
+// other type nothing is emitted rather than sending a zero-value event.
+func (r triggerResultProcessorWorker) emitUnreachableEndpointEvent(ctx context.Context, job Job, err error) {
+	var event model.TestRunEvent
+	switch job.Test.Trigger.Type {
+	case trigger.TriggerTypeHTTP:
+		event = events.TriggerHTTPUnreachableHostError(job.Run.TestID, job.Run.ID, err)
+	case trigger.TriggerTypeGRPC:
+		event = events.TriggergRPCUnreachableHostError(job.Run.TestID, job.Run.ID, err)
+	default:
+		return
+	}
+
+	r.handleError(job.Run, r.eventEmitter.Emit(ctx, event))
+}
+
+// emitMismatchEndpointEvent emits the docker-compose host mismatch event for
+// the run. The error argument is not used.
+func (r triggerResultProcessorWorker) emitMismatchEndpointEvent(ctx context.Context, job Job, _ error) {
+	r.handleError(job.Run, r.eventEmitter.Emit(ctx, events.TriggerDockerComposeHostMismatchError(job.Run.TestID, job.Run.ID)))
+}
+
+// isTargetLocalhost reports whether the test's HTTP URL or gRPC address points
+// at the loopback host ("localhost", "127.0.0.1" or "::1"). It returns false
+// for other trigger types and for endpoints that cannot be parsed.
+func isTargetLocalhost(job Job) bool {
+	var endpoint string
+	switch job.Test.Trigger.Type {
+	case trigger.TriggerTypeHTTP:
+		endpoint = job.Test.Trigger.HTTP.URL
+	case trigger.TriggerTypeGRPC:
+		endpoint = job.Test.Trigger.GRPC.Address
+	}
+
+	// url.Parse only fills in Host when the input has an authority part. A bare
+	// "host:port" is read as scheme:opaque (or rejected), so give scheme-less
+	// endpoints a leading "//" to have them parsed as a network location.
+	if !strings.Contains(endpoint, "://") && !strings.HasPrefix(endpoint, "//") {
+		endpoint = "//" + endpoint
+	}
+
+	parsed, err := url.Parse(endpoint)
+	if err != nil {
+		return false
+	}
+
+	// Hostname drops the port and the brackets around IPv6 literals.
+	switch parsed.Hostname() {
+	case "localhost", "127.0.0.1", "::1":
+		return true
+	default:
+		return false
+	}
+}
diff --git a/server/test/trigger/trigger.go b/server/test/trigger/trigger.go
index bd83eff020..0809ae629a 100644
--- a/server/test/trigger/trigger.go
+++ b/server/test/trigger/trigger.go
@@ -1,5 +1,7 @@
 package trigger
 
+import "errors"
+
 type (
 	TriggerType string
 
@@ -17,5 +19,21 @@ type (
 		GRPC    *GRPCResponse    `json:"grpc,omitempty"`
 		TraceID *TraceIDResponse `json:"traceid,omitempty"`
 		Kafka   *KafkaResponse   `json:"kafka,omitempty"`
+		Error   *TriggerError    `json:"error,omitempty"`
+	}
+
+	// TriggerError carries the failure stored in a trigger result's Error
+	// field, in a form that can be serialized to JSON.
+	TriggerError struct {
+		ConnectionError    bool   `json:"connectionError"`
+		RunningOnContainer bool   `json:"runningOnContainer"`
+		ErrorMessage       string `json:"message"`
 	}
 )
+
+// Error returns ErrorMessage wrapped in a new error value. Because it returns
+// error rather than string, TriggerError does not itself implement the
+// built-in error interface.
+func (e TriggerError) Error() error {
+	return errors.New(e.ErrorMessage)
+}