diff --git a/server/app/facade.go b/server/app/facade.go index 5c0e098e93..6fc84b9d92 100644 --- a/server/app/facade.go +++ b/server/app/facade.go @@ -78,6 +78,7 @@ func newRunnerFacades( subscriptionManager, tracedb.Factory(testDB), testDB, + eventEmitter, ) transactionRunner := executor.NewTransactionRunner( diff --git a/server/executor/runner.go b/server/executor/runner.go index 58ca3d996e..4eb19954a5 100644 --- a/server/executor/runner.go +++ b/server/executor/runner.go @@ -7,6 +7,7 @@ import ( "github.com/kubeshop/tracetest/server/executor/trigger" "github.com/kubeshop/tracetest/server/expression" "github.com/kubeshop/tracetest/server/model" + "github.com/kubeshop/tracetest/server/model/events" "github.com/kubeshop/tracetest/server/subscription" "github.com/kubeshop/tracetest/server/tracedb" "go.opentelemetry.io/otel" @@ -37,6 +38,7 @@ func NewPersistentRunner( subscriptionManager *subscription.Manager, newTraceDBFn traceDBFactoryFn, dsRepo model.DataStoreRepository, + eventEmitter EventEmitter, ) PersistentRunner { return persistentRunner{ triggers: triggers, @@ -47,6 +49,7 @@ func NewPersistentRunner( newTraceDBFn: newTraceDBFn, dsRepo: dsRepo, subscriptionManager: subscriptionManager, + eventEmitter: eventEmitter, executeQueue: make(chan execReq, 5), exit: make(chan bool, 1), } @@ -61,6 +64,7 @@ type persistentRunner struct { subscriptionManager *subscription.Manager newTraceDBFn traceDBFactoryFn dsRepo model.DataStoreRepository + eventEmitter EventEmitter executeQueue chan execReq exit chan bool @@ -81,6 +85,12 @@ func (r persistentRunner) handleDBError(run model.Run, err error) { } } +func (r persistentRunner) handleError(run model.Run, err error) { + if err != nil { + fmt.Printf("test %s run #%d trigger DB error: %s\n", run.TestID, run.ID, err.Error()) + } +} + func (r persistentRunner) Start(workers int) { for i := 0; i < workers; i++ { go func() { @@ -160,15 +170,19 @@ func (r persistentRunner) processExecQueue(job execReq) { run := job.run.Start() r.handleDBError(run, r.updater.Update(job.ctx, run)) + err := r.eventEmitter.Emit(job.ctx, events.TriggerCreatedInfo(job.run.TestID, job.run.ID)) + if err != nil { + r.handleError(job.run, err) + } + triggerer, err := r.triggers.Get(job.test.ServiceUnderTest.Type) if err != nil { - // TODO: actually handle the error - panic(err) + r.handleError(job.run, err) } tdb, err := r.traceDB(job.ctx) if err != nil { - panic(err) + r.handleError(job.run, err) } traceID := tdb.GetTraceID() @@ -180,9 +194,24 @@ func (r persistentRunner) processExecQueue(job execReq) { Executor: job.executor, } + err = r.eventEmitter.Emit(job.ctx, events.TriggerResolveStart(job.run.TestID, job.run.ID)) + if err != nil { + r.handleError(job.run, err) + } + resolvedTest, err := triggerer.Resolve(job.ctx, job.test, triggerOptions) if err != nil { - panic(err) + emitErr := r.eventEmitter.Emit(job.ctx, events.TriggerResolveError(job.run.TestID, job.run.ID, err)) + if emitErr != nil { + r.handleError(job.run, emitErr) + } + + r.handleError(job.run, err) + } + + err = r.eventEmitter.Emit(job.ctx, events.TriggerResolveSuccess(job.run.TestID, job.run.ID)) + if err != nil { + r.handleError(job.run, err) } if job.test.ServiceUnderTest.Type == model.TriggerTypeTRACEID { @@ -192,15 +221,30 @@ func (r persistentRunner) processExecQueue(job execReq) { } } + err = r.eventEmitter.Emit(job.ctx, events.TriggerExecutionStart(job.run.TestID, job.run.ID)) + if err != nil { + r.handleError(job.run, err) + } + response, err := triggerer.Trigger(job.ctx, resolvedTest, triggerOptions) run = r.handleExecutionResult(run, response, err) if err != nil { + emitErr := r.eventEmitter.Emit(job.ctx, events.TriggerExecutionError(job.run.TestID, job.run.ID, err)) + if emitErr != nil { + r.handleError(job.run, emitErr) + } + fmt.Printf("test %s run #%d trigger error: %s\n", run.TestID, run.ID, err.Error()) r.subscriptionManager.PublishUpdate(subscription.Message{ ResourceID: run.TransactionStepResourceID(), Type: "run_update", Content: RunResult{Run: run, Err: err}, }) + } else { + err = r.eventEmitter.Emit(job.ctx, events.TriggerExecutionSuccess(job.run.TestID, job.run.ID)) + if err != nil { + r.handleDBError(job.run, err) + } } run.SpanID = response.SpanID diff --git a/server/executor/runner_test.go b/server/executor/runner_test.go index 4996902bab..efde5853ce 100644 --- a/server/executor/runner_test.go +++ b/server/executor/runner_test.go @@ -48,7 +48,6 @@ func TestPersistentRunner(t *testing.T) { test2 := model.Test{ID: id.ID("test2"), ServiceUnderTest: sampleTrigger} f := runnerSetup(t) - f.expectSuccessExecLong(test1) f.expectSuccessExec(test2) @@ -145,10 +144,13 @@ func runnerSetup(t *testing.T) runnerFixture { testDB := testdb.MockRepository{} testDB.Mock.On("DefaultDataStore", mock.Anything).Return(model.DataStore{Type: model.DataStoreTypeOTLP}, nil) + testDB.Mock.On("CreateTestRunEvent", mock.Anything).Return(noError) + + eventEmitter := executor.NewEventEmitter(&testDB, subscription.NewManager()) mtp.Test(t) return runnerFixture{ - runner: executor.NewPersistentRunner(reg, db, executor.NewDBUpdater(db), mtp, tracer, subscription.NewManager(), tracedb.Factory(&testDB), &testDB), + runner: executor.NewPersistentRunner(reg, db, executor.NewDBUpdater(db), mtp, tracer, subscription.NewManager(), tracedb.Factory(&testDB), &testDB, eventEmitter), mockExecutor: me, mockDB: db, mockTracePoller: mtp, diff --git a/server/model/events/events.go b/server/model/events/events.go index 1f10ead1aa..17a3f4bc39 100644 --- a/server/model/events/events.go +++ b/server/model/events/events.go @@ -23,14 +23,14 @@ func TriggerCreatedInfo(testID id.ID, runID int) model.TestRunEvent { } } -func TriggerResolveError(testID id.ID, runID int) model.TestRunEvent { +func TriggerResolveError(testID id.ID, runID int, err error) model.TestRunEvent { return model.TestRunEvent{ TestID: testID, RunID: runID, Stage: model.StageTrigger, Type: "RESOLVE_ERROR", Title: "Resolving trigger details failed", - Description: "Resolving trigger details failed", + Description: fmt.Sprintf("Resolving trigger details failed: %s", err.Error()), CreatedAt: time.Now(), DataStoreConnection: model.ConnectionResult{}, Polling: model.PollingInfo{}, @@ -98,6 +98,21 @@ func TriggerExecutionSuccess(testID id.ID, runID int) model.TestRunEvent { } } +func TriggerExecutionError(testID id.ID, runID int, err error) model.TestRunEvent { + return model.TestRunEvent{ + TestID: testID, + RunID: runID, + Stage: model.StageTrigger, + Type: "EXECUTION_ERROR", + Title: "Failed to trigger execution", + Description: fmt.Sprintf("Failed to trigger execution: %s", err.Error()), + CreatedAt: time.Now(), + DataStoreConnection: model.ConnectionResult{}, + Polling: model.PollingInfo{}, + Outputs: []model.OutputInfo{}, + } +} + func TriggerHTTPUnreachableHostError(testID id.ID, runID int) model.TestRunEvent { return model.TestRunEvent{ TestID: testID,