From 61fdf005d6794a801f265d525832c25a9daf0542 Mon Sep 17 00:00:00 2001 From: Daniel Baptista Dias Date: Wed, 5 Apr 2023 14:08:07 -0300 Subject: [PATCH] Adding PollingEvents (#2291) * Adding PollingEvents draft * Change mocking mechanisms for trace poller * Updating code with PR suggestions and integration env fixes --- TEST_RUN_EVENTS.csv | 73 ++++++++-------- api/testEvents.yaml | 2 - cli/openapi/model_polling_info.go | 42 +-------- server/app/facade.go | 2 + server/executor/poller_executor.go | 59 +++++++++++-- server/executor/poller_executor_test.go | 31 ++++--- server/executor/trace_poller.go | 82 +++++++++++++---- server/http/mappings/test_run_events.go | 7 +- server/model/events/events.go | 87 +++++++++---------- server/model/test_run_event.go | 7 +- server/openapi/model_polling_info.go | 2 - .../components/RunEvents/RunEventPolling.tsx | 7 -- web/src/models/TestRunEvent.model.ts | 2 - web/src/types/Generated.types.ts | 1 - 14 files changed, 221 insertions(+), 183 deletions(-) diff --git a/TEST_RUN_EVENTS.csv b/TEST_RUN_EVENTS.csv index 4cfd57e037..429709e200 100644 --- a/TEST_RUN_EVENTS.csv +++ b/TEST_RUN_EVENTS.csv @@ -1,37 +1,36 @@ -Test Run Events,,, -,,, -EVENT_SUFFIX,Description,, -_INFO,Adding extra context for an specific task,, -_SUCCESS,The task has finalized with no errors,, -_ERROR,An execution problem was found,, -_START,A new task section has begun,, -,,, -,,, -,,, -,,, -,,, -Stage ,Event Type,Description,Definition -Trigger,CREATED_INFO,Trigger Run has been created, -Trigger,RESOLVE_ERROR,Resolving trigger details failed, -Trigger,RESOLVE_SUCCESS,Successful resolving of trigger details, -Trigger,RESOLVE_START,Resolving trigger details based on environment variables, -Trigger,EXECUTION_START,Initial trigger execution, -Trigger,EXECUTION_SUCCESS,Successful trigger execution, -Trigger,HTTP_UNREACHABLE_HOST_ERROR,Tracetest could not reach the defined host in the trigger, -Trigger,DOCKER_COMPOSE_HOST_MISMATCH_ERROR,"We identified Tracetest is running inside a docker compose container, so if you are trying to access your local host machine please use the host.docker.internal hostname. For more information, see https://docs.docker.com/docker-for-mac/networking/#use-cases-and-workarounds", -Trigger,GRPC_UNREACHABLE_HOST_ERROR,Tracetest could not reach the defined host in the trigger, -Trace,FETCHING_START,Starting the trace fetching process, -Trace,QUEUED_INFO,Trace Run has been queued to start the fetching process, -Trace,DATA_STORE_CONNECTION_INFO,A Data store connection request has been executed,test connection result information -Trace,POLLING_START,Starting the trace polling process, -Trace,POLLING_ITERATION_INFO,A polling iteration has been executed,# of spans - iteration # - reason of next iteration -Trace,POLLING_SUCCESS,The polling strategy has succeeded in fetching the trace from the Data Store, -Trace,POLLING_ERROR,The polling strategy has failed to fetch the trace, -Trace,FETCHING_SUCCESS,The trace was successfully processed by the backend, -Trace,FETCHING_ERROR,The trace was not able to be fetched, -Trace,STOPPED_INFO,The test run was stopped during its execution, -Test,OUTPUT_GENERATION_WARNING,The value for output could not be generated, -Test,TEST_SPECS_RUN_SUCCESS,Test Specs were successfully executed, -Test,TEST_SPECS_RUN_ERROR,Test specs execution error, -Test,TEST_SPECS_RUN_START,Test specs execution start, -Test,TEST_SPECS_ASSERTION_ERROR,An assertion in the test spec failed +Test Run Events,,, +,,, +EVENT_SUFFIX,Description,, +_INFO,Adding extra context for an specific task,, +_SUCCESS,The task has finalized with no errors,, +_ERROR,An execution problem was found,, +_START,A new task section has begun,, +,,, +,,, +,,, +,,, +,,, +Stage ,Event Type,Description,Definition +Trigger,CREATED_INFO,Trigger Run has been created, +Trigger,RESOLVE_ERROR,Resolving trigger details failed, +Trigger,RESOLVE_SUCCESS,Successful resolving of trigger details, +Trigger,RESOLVE_START,Resolving trigger details based on environment variables, +Trigger,EXECUTION_START,Initial trigger execution, +Trigger,EXECUTION_SUCCESS,Successful trigger execution, +Trigger,HTTP_UNREACHABLE_HOST_ERROR,Tracetest could not reach the defined host in the trigger, +Trigger,DOCKER_COMPOSE_HOST_MISMATCH_ERROR,"We identified Tracetest is running inside a docker compose container, so if you are trying to access your local host machine please use the host.docker.internal hostname. For more information, see https://docs.docker.com/docker-for-mac/networking/#use-cases-and-workarounds", +Trigger,GRPC_UNREACHABLE_HOST_ERROR,Tracetest could not reach the defined host in the trigger, +Trace,DATA_STORE_CONNECTION_INFO,A Data store connection request has been executed,test connection result information +Trace,POLLING_START,Starting the trace polling process, +Trace,POLLING_ITERATION_INFO,A polling iteration has been executed,# of spans - iteration # - reason of next iteration +Trace,POLLING_SUCCESS,The polling strategy has succeeded in fetching the trace from the Data Store, +Trace,POLLING_ERROR,The polling strategy has failed to fetch the trace, +Trace,FETCHING_START,Starting the trace fetching process, +Trace,FETCHING_SUCCESS,The trace was successfully processed by the backend, +Trace,FETCHING_ERROR,The trace was not able to be fetched, +Trace,STOPPED_INFO,The test run was stopped during its execution, +Test,OUTPUT_GENERATION_WARNING,The value for output could not be generated, +Test,TEST_SPECS_RUN_SUCCESS,Test Specs were successfully executed, +Test,TEST_SPECS_RUN_ERROR,Test specs execution error, +Test,TEST_SPECS_RUN_START,Test specs execution start, +Test,TEST_SPECS_ASSERTION_ERROR,An assertion in the test spec failed diff --git a/api/testEvents.yaml b/api/testEvents.yaml index 55fc62ddd9..83a4634deb 100644 --- a/api/testEvents.yaml +++ b/api/testEvents.yaml @@ -39,8 +39,6 @@ components: type: string enum: - periodic - reasonNextIteration: - type: string isComplete: type: boolean periodic: diff --git a/cli/openapi/model_polling_info.go b/cli/openapi/model_polling_info.go index c3cc46ed2e..e081dd3301 100644 --- a/cli/openapi/model_polling_info.go +++ b/cli/openapi/model_polling_info.go @@ -19,10 +19,9 @@ var _ MappedNullable = &PollingInfo{} // PollingInfo struct for PollingInfo type PollingInfo struct { - Type *string `json:"type,omitempty"` - ReasonNextIteration *string `json:"reasonNextIteration,omitempty"` - IsComplete *bool `json:"isComplete,omitempty"` - Periodic *PollingInfoPeriodic `json:"periodic,omitempty"` + Type *string `json:"type,omitempty"` + IsComplete *bool `json:"isComplete,omitempty"` + Periodic *PollingInfoPeriodic `json:"periodic,omitempty"` } // NewPollingInfo instantiates a new PollingInfo object @@ -74,38 +73,6 @@ func (o *PollingInfo) SetType(v string) { o.Type = &v } -// GetReasonNextIteration returns the ReasonNextIteration field value if set, zero value otherwise. -func (o *PollingInfo) GetReasonNextIteration() string { - if o == nil || isNil(o.ReasonNextIteration) { - var ret string - return ret - } - return *o.ReasonNextIteration -} - -// GetReasonNextIterationOk returns a tuple with the ReasonNextIteration field value if set, nil otherwise -// and a boolean to check if the value has been set. -func (o *PollingInfo) GetReasonNextIterationOk() (*string, bool) { - if o == nil || isNil(o.ReasonNextIteration) { - return nil, false - } - return o.ReasonNextIteration, true -} - -// HasReasonNextIteration returns a boolean if a field has been set. -func (o *PollingInfo) HasReasonNextIteration() bool { - if o != nil && !isNil(o.ReasonNextIteration) { - return true - } - - return false -} - -// SetReasonNextIteration gets a reference to the given string and assigns it to the ReasonNextIteration field. -func (o *PollingInfo) SetReasonNextIteration(v string) { - o.ReasonNextIteration = &v -} - // GetIsComplete returns the IsComplete field value if set, zero value otherwise. func (o *PollingInfo) GetIsComplete() bool { if o == nil || isNil(o.IsComplete) { @@ -183,9 +150,6 @@ func (o PollingInfo) ToMap() (map[string]interface{}, error) { if !isNil(o.Type) { toSerialize["type"] = o.Type } - if !isNil(o.ReasonNextIteration) { - toSerialize["reasonNextIteration"] = o.ReasonNextIteration - } if !isNil(o.IsComplete) { toSerialize["isComplete"] = o.IsComplete } diff --git a/server/app/facade.go b/server/app/facade.go index 6fc84b9d92..a706fd20d4 100644 --- a/server/app/facade.go +++ b/server/app/facade.go @@ -59,6 +59,7 @@ func newRunnerFacades( execTestUpdater, tracedb.Factory(testDB), testDB, + eventEmitter, ) tracePoller := executor.NewTracePoller( @@ -67,6 +68,7 @@ func newRunnerFacades( execTestUpdater, assertionRunner, subscriptionManager, + eventEmitter, ) runner := executor.NewPersistentRunner( diff --git a/server/executor/poller_executor.go b/server/executor/poller_executor.go index 07332907ff..73d731fc77 100644 --- a/server/executor/poller_executor.go +++ b/server/executor/poller_executor.go @@ -2,11 +2,14 @@ package executor import ( "context" + "errors" "fmt" "log" "github.com/kubeshop/tracetest/server/model" + "github.com/kubeshop/tracetest/server/model/events" "github.com/kubeshop/tracetest/server/tracedb" + "github.com/kubeshop/tracetest/server/tracedb/connection" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" ) @@ -18,6 +21,7 @@ type DefaultPollerExecutor struct { updater RunUpdater newTraceDBFn traceDBFactoryFn dsRepo model.DataStoreRepository + eventEmitter EventEmitter } type InstrumentedPollerExecutor struct { @@ -25,11 +29,11 @@ type InstrumentedPollerExecutor struct { pollerExecutor PollerExecutor } -func (pe InstrumentedPollerExecutor) ExecuteRequest(request *PollingRequest) (bool, model.Run, error) { +func (pe InstrumentedPollerExecutor) ExecuteRequest(request *PollingRequest) (bool, string, model.Run, error) { _, span := pe.tracer.Start(request.ctx, "Fetch trace") defer span.End() - finished, run, err := pe.pollerExecutor.ExecuteRequest(request) + finished, finishReason, run, err := pe.pollerExecutor.ExecuteRequest(request) spanCount := 0 if run.Trace != nil { @@ -44,13 +48,17 @@ func (pe InstrumentedPollerExecutor) ExecuteRequest(request *PollingRequest) (bo attribute.Int("tracetest.run.trace_poller.amount_retrieved_spans", spanCount), } + if finishReason != "" { + attrs = append(attrs, attribute.String("tracetest.run.trace_poller.finish_reason", finishReason)) + } + if err != nil { attrs = append(attrs, attribute.String("tracetest.run.trace_poller.error", err.Error())) span.RecordError(err) } span.SetAttributes(attrs...) - return finished, run, err + return finished, finishReason, run, err } func NewPollerExecutor( @@ -59,6 +67,7 @@ func NewPollerExecutor( updater RunUpdater, newTraceDBFn traceDBFactoryFn, dsRepo model.DataStoreRepository, + eventEmitter EventEmitter, ) PollerExecutor { pollerExecutor := &DefaultPollerExecutor{ @@ -66,6 +75,7 @@ func NewPollerExecutor( updater: updater, newTraceDBFn: newTraceDBFn, dsRepo: dsRepo, + eventEmitter: eventEmitter, } return &InstrumentedPollerExecutor{ @@ -88,21 +98,52 @@ func (pe DefaultPollerExecutor) traceDB(ctx context.Context) (tracedb.TraceDB, e return tdb, nil } -func (pe DefaultPollerExecutor) ExecuteRequest(request *PollingRequest) (bool, model.Run, error) { +func (pe DefaultPollerExecutor) ExecuteRequest(request *PollingRequest) (bool, string, model.Run, error) { log.Printf("[PollerExecutor] Test %s Run %d: ExecuteRequest\n", request.test.ID, request.run.ID) run := request.run traceDB, err := pe.traceDB(request.ctx) if err != nil { log.Printf("[PollerExecutor] Test %s Run %d: GetDataStore error: %s\n", request.test.ID, request.run.ID, err.Error()) - return false, model.Run{}, err + return false, "", model.Run{}, err + } + + if request.IsFirstRequest() { + connectionResult := traceDB.TestConnection(request.ctx) + + err = pe.eventEmitter.Emit(request.ctx, events.TraceDataStoreConnectionInfo(request.test.ID, request.run.ID, connectionResult)) + if err != nil { + log.Printf("[PollerExecutor] Test %s Run %d: failed to emit TraceDataStoreConnectionInfo event: error: %s\n", request.test.ID, request.run.ID, err.Error()) + } + } + + err = pe.eventEmitter.Emit(request.ctx, events.TraceFetchingStart(request.test.ID, request.run.ID)) + if err != nil { + log.Printf("[PollerExecutor] Test %s Run %d: failed to emit TraceFetchingStart event: error: %s\n", request.test.ID, request.run.ID, err.Error()) } traceID := run.TraceID.String() trace, err := traceDB.GetTraceByID(request.ctx, traceID) if err != nil { + connectionResult := model.ConnectionResult{} + + if !errors.Is(err, connection.ErrTraceNotFound) { + // run test connection to give a diagnostic when an unknown error happens + connectionResult = traceDB.TestConnection(request.ctx) + } + + anotherErr := pe.eventEmitter.Emit(request.ctx, events.TraceFetchingError(request.test.ID, request.run.ID, connectionResult, err)) + if anotherErr != nil { + log.Printf("[PollerExecutor] Test %s Run %d: failed to emit TraceFetchingError event: error: %s\n", request.test.ID, request.run.ID, anotherErr.Error()) + } + log.Printf("[PollerExecutor] Test %s Run %d: GetTraceByID (traceID %s) error: %s\n", request.test.ID, request.run.ID, traceID, err.Error()) - return false, model.Run{}, err + return false, "", model.Run{}, err + } + + err = pe.eventEmitter.Emit(request.ctx, events.TraceFetchingSuccess(request.test.ID, request.run.ID)) + if err != nil { + log.Printf("[PollerExecutor] Test %s Run %d: failed to emit TraceFetchingSuccess event: error: %s\n", request.test.ID, request.run.ID, err.Error()) } trace.ID = run.TraceID @@ -113,7 +154,7 @@ func (pe DefaultPollerExecutor) ExecuteRequest(request *PollingRequest) (bool, m log.Printf("[PollerExecutor] Test %s Run %d: Not done polling. (%s)\n", request.test.ID, request.run.ID, reason) run.Trace = &trace request.run = run - return false, run, nil + return false, "", run, nil } log.Printf("[PollerExecutor] Test %s Run %d: Done polling. (%s)\n", request.test.ID, request.run.ID, reason) @@ -138,10 +179,10 @@ func (pe DefaultPollerExecutor) ExecuteRequest(request *PollingRequest) (bool, m err = pe.updater.Update(request.ctx, run) if err != nil { log.Printf("[PollerExecutor] Test %s Run %d: Update error: %s\n", request.test.ID, request.run.ID, err.Error()) - return false, model.Run{}, err + return false, "", model.Run{}, err } - return true, run, nil + return true, reason, run, nil } func (pe DefaultPollerExecutor) donePollingTraces(job *PollingRequest, traceDB tracedb.TraceDB, trace model.Trace) (bool, string) { diff --git a/server/executor/poller_executor_test.go b/server/executor/poller_executor_test.go index f78cc6462a..18ff826a2d 100644 --- a/server/executor/poller_executor_test.go +++ b/server/executor/poller_executor_test.go @@ -10,10 +10,12 @@ import ( "github.com/kubeshop/tracetest/server/executor/pollingprofile" "github.com/kubeshop/tracetest/server/id" "github.com/kubeshop/tracetest/server/model" + "github.com/kubeshop/tracetest/server/subscription" "github.com/kubeshop/tracetest/server/testdb" "github.com/kubeshop/tracetest/server/tracedb" "github.com/kubeshop/tracetest/server/tracedb/connection" "github.com/kubeshop/tracetest/server/tracing" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "go.opentelemetry.io/otel/trace" @@ -455,15 +457,17 @@ func executeAndValidatePollingRequests(t *testing.T, pollerExecutor executor.Pol for i, value := range expectedValues { request := executor.NewPollingRequest(ctx, test, run, i) - finished, anotherRun, err := pollerExecutor.ExecuteRequest(request) + finished, finishReason, anotherRun, err := pollerExecutor.ExecuteRequest(request) run = anotherRun // should store a run to use in another iteration require.NotNilf(t, run, "The test run should not be nil on iteration %d", i) if value.finished { require.Truef(t, finished, "The poller should have finished on iteration %d", i) + require.NotEmptyf(t, finishReason, "The poller should not have finish reason on iteration %d", i) } else { require.Falsef(t, finished, "The poller should have not finished on iteration %d", i) + require.Emptyf(t, finishReason, "The poller should have finish reason on iteration %d", i) } if value.expectNoTraceError { @@ -505,6 +509,7 @@ func getPollerExecutorWithMocks(t *testing.T, retryDelay, maxWaitTimeForTrace ti tracer := getTracerMock(t) testDB := getDataStoreRepositoryMock(t) traceDBFactory := getTraceDBMockFactory(t, tracePerIteration, &traceDBState{}) + eventEmitter := getEventEmitterMock(t, testDB) return executor.NewPollerExecutor( defaultProfileGetter{retryDelay, maxWaitTimeForTrace}, @@ -512,6 +517,7 @@ func getPollerExecutorWithMocks(t *testing.T, retryDelay, maxWaitTimeForTrace ti updater, traceDBFactory, testDB, + eventEmitter, ) } @@ -527,23 +533,22 @@ func getRunUpdaterMock(t *testing.T) executor.RunUpdater { } // DataStoreRepository -type dataStoreRepositoryMock struct { - testdb.MockRepository - // ... -} +func getDataStoreRepositoryMock(t *testing.T) model.Repository { + t.Helper() + + testDB := testdb.MockRepository{} -func (m dataStoreRepositoryMock) DefaultDataStore(_ context.Context) (model.DataStore, error) { - return model.DataStore{}, nil + testDB.Mock.On("DefaultDataStore", mock.Anything).Return(model.DataStore{Type: model.DataStoreTypeOTLP}, nil) + testDB.Mock.On("CreateTestRunEvent", mock.Anything).Return(noError) + + return &testDB } -func getDataStoreRepositoryMock(t *testing.T) model.Repository { +// EventEmitter +func getEventEmitterMock(t *testing.T, db model.Repository) executor.EventEmitter { t.Helper() - mock := new(dataStoreRepositoryMock) - mock.T = t - mock.Test(t) - - return mock + return executor.NewEventEmitter(db, subscription.NewManager()) } // Tracer diff --git a/server/executor/trace_poller.go b/server/executor/trace_poller.go index 8ed1262ea2..026e964b07 100644 --- a/server/executor/trace_poller.go +++ b/server/executor/trace_poller.go @@ -9,6 +9,7 @@ import ( "github.com/kubeshop/tracetest/server/executor/pollingprofile" "github.com/kubeshop/tracetest/server/model" + "github.com/kubeshop/tracetest/server/model/events" "github.com/kubeshop/tracetest/server/subscription" "github.com/kubeshop/tracetest/server/tracedb/connection" v1 "go.opentelemetry.io/proto/otlp/trace/v1" @@ -24,7 +25,7 @@ type PersistentTracePoller interface { } type PollerExecutor interface { - ExecuteRequest(*PollingRequest) (bool, model.Run, error) + ExecuteRequest(*PollingRequest) (bool, string, model.Run, error) } type TraceFetcher interface { @@ -41,6 +42,7 @@ func NewTracePoller( updater RunUpdater, assertionRunner AssertionRunner, subscriptionManager *subscription.Manager, + eventEmitter EventEmitter, ) PersistentTracePoller { return tracePoller{ updater: updater, @@ -50,34 +52,41 @@ func NewTracePoller( exit: make(chan bool, 1), assertionRunner: assertionRunner, subscriptionManager: subscriptionManager, + eventEmitter: eventEmitter, } } type tracePoller struct { - updater RunUpdater - ppGetter PollingProfileGetter - pollerExecutor PollerExecutor - assertionRunner AssertionRunner - + updater RunUpdater + ppGetter PollingProfileGetter + pollerExecutor PollerExecutor + assertionRunner AssertionRunner subscriptionManager *subscription.Manager + eventEmitter EventEmitter executeQueue chan PollingRequest exit chan bool } type PollingRequest struct { - ctx context.Context - test model.Test - run model.Run - count int + ctx context.Context + test model.Test + run model.Run + count int + hadRequeue bool +} + +func (pr PollingRequest) IsFirstRequest() bool { + return !pr.hadRequeue } func NewPollingRequest(ctx context.Context, test model.Test, run model.Run, count int) *PollingRequest { return &PollingRequest{ - ctx: ctx, - test: test, - run: run, - count: count, + ctx: ctx, + test: test, + run: run, + count: count, + hadRequeue: false, } } @@ -123,20 +132,45 @@ func (tp tracePoller) enqueueJob(job PollingRequest) { } func (tp tracePoller) processJob(job PollingRequest) { - finished, run, err := tp.pollerExecutor.ExecuteRequest(&job) + if job.IsFirstRequest() { + err := tp.eventEmitter.Emit(job.ctx, events.TracePollingStart(job.test.ID, job.run.ID)) + if err != nil { + log.Printf("[TracePoller] Test %s Run %d: fail to emit TracePollingStart event: %s \n", job.test.ID, job.run.ID, err.Error()) + } + } + + finished, finishReason, run, err := tp.pollerExecutor.ExecuteRequest(&job) if err != nil { log.Printf("[TracePoller] Test %s Run %d: ExecuteRequest Error: %s\n", job.test.ID, job.run.ID, err.Error()) - tp.handleTraceDBError(job, err) + jobFailed, reason := tp.handleTraceDBError(job, err) + + if jobFailed { + anotherErr := tp.eventEmitter.Emit(job.ctx, events.TracePollingError(job.test.ID, job.run.ID, reason, err)) + if anotherErr != nil { + log.Printf("[TracePoller] Test %s Run %d: fail to emit TracePollingError event: %s \n", job.test.ID, job.run.ID, err.Error()) + } + } + return } + err = tp.eventEmitter.Emit(job.ctx, events.TracePollingIterationInfo(job.test.ID, job.run.ID, len(run.Trace.Flat), job.count, finished)) + if err != nil { + log.Printf("[TracePoller] Test %s Run %d: failed to emit TracePollingIterationInfo event: error: %s\n", job.test.ID, job.run.ID, err.Error()) + } + if !finished { job.count += 1 tp.requeue(job) return } - log.Printf("[TracePoller] Test %s Run %d: Done polling. Completed polling after %d iterations, number of spans collected %d\n", job.test.ID, job.run.ID, job.count+1, len(run.Trace.Flat)) + log.Printf("[TracePoller] Test %s Run %d: Done polling (reason: %s). Completed polling after %d iterations, number of spans collected %d\n", job.test.ID, job.run.ID, finishReason, job.count+1, len(run.Trace.Flat)) + + err = tp.eventEmitter.Emit(job.ctx, events.TracePollingSuccess(job.test.ID, job.run.ID)) + if err != nil { + log.Printf("[TracePoller] Test %s Run %d: fail to emit TracePollingSuccess event: %s \n", job.test.ID, job.run.ID, err.Error()) + } tp.handleDBError(tp.updater.Update(job.ctx, run)) @@ -153,7 +187,7 @@ func (tp tracePoller) runAssertions(job PollingRequest) { tp.assertionRunner.RunAssertions(job.ctx, assertionRequest) } -func (tp tracePoller) handleTraceDBError(job PollingRequest, err error) { +func (tp tracePoller) handleTraceDBError(job PollingRequest, err error) (bool, string) { run := job.run pp := *tp.ppGetter.GetDefault(job.ctx).Periodic @@ -162,13 +196,19 @@ func (tp tracePoller) handleTraceDBError(job PollingRequest, err error) { if errors.Is(err, connection.ErrTraceNotFound) && time.Since(run.ServiceTriggeredAt) < pp.TimeoutDuration() { log.Println("[TracePoller] Trace not found on Data Store yet. Requeuing...") tp.requeue(job) - return + return false, "Trace not found" // job without fail } + reason := "" + if errors.Is(err, connection.ErrTraceNotFound) { + reason = "Timed out without finding trace" + err = fmt.Errorf("timed out waiting for traces after %s", pp.Timeout) fmt.Println("[TracePoller] Timed-out", err) } else { + reason = "Unexpected error" + err = fmt.Errorf("cannot fetch trace: %w", err) fmt.Println("[TracePoller] Unknown error", err) } @@ -180,6 +220,8 @@ func (tp tracePoller) handleTraceDBError(job PollingRequest, err error) { Type: "update_run", Content: RunResult{Run: run, Err: err}, }) + + return true, reason // job failed } func (tp tracePoller) requeue(job PollingRequest) { @@ -187,6 +229,8 @@ func (tp tracePoller) requeue(job PollingRequest) { pp := *tp.ppGetter.GetDefault(job.ctx).Periodic fmt.Printf("[TracePoller] Requeuing Test Run %d. Current iteration: %d\n", job.run.ID, job.count) time.Sleep(pp.RetryDelayDuration()) + + job.hadRequeue = true tp.enqueueJob(job) }() } diff --git a/server/http/mappings/test_run_events.go b/server/http/mappings/test_run_events.go index 2ff78a4c51..cd4148884f 100644 --- a/server/http/mappings/test_run_events.go +++ b/server/http/mappings/test_run_events.go @@ -31,10 +31,9 @@ func (m OpenAPI) TestRunEvent(in model.TestRunEvent) openapi.TestRunEvent { func (m OpenAPI) PollingInfo(in model.PollingInfo) openapi.PollingInfo { return openapi.PollingInfo{ - Type: string(in.Type), - ReasonNextIteration: in.ReasonNextIteration, - IsComplete: in.IsComplete, - Periodic: m.PeriodicPollingInfo(in.Periodic), + Type: string(in.Type), + IsComplete: in.IsComplete, + Periodic: m.PeriodicPollingInfo(in.Periodic), } } diff --git a/server/model/events/events.go b/server/model/events/events.go index cd13f0818a..8767037344 100644 --- a/server/model/events/events.go +++ b/server/model/events/events.go @@ -158,46 +158,16 @@ func TriggergRPCUnreachableHostError(testID id.ID, runID int, err error) model.T } } -func TraceFetchingStart(testID id.ID, runID int) model.TestRunEvent { - return model.TestRunEvent{ - TestID: testID, - RunID: runID, - Stage: model.StageTrace, - Type: "FETCHING_START", - Title: "Starting the trace fetching process", - Description: "Starting the trace fetching process", - CreatedAt: time.Now(), - DataStoreConnection: model.ConnectionResult{}, - Polling: model.PollingInfo{}, - Outputs: []model.OutputInfo{}, - } -} - -func TraceQueuedInfo(testID id.ID, runID int) model.TestRunEvent { - return model.TestRunEvent{ - TestID: testID, - RunID: runID, - Stage: model.StageTrace, - Type: "QUEUED_INFO", - Title: "Trace Run has been queued to start the fetching process", - Description: "Trace Run has been queued to start the fetching process", - CreatedAt: time.Now(), - DataStoreConnection: model.ConnectionResult{}, - Polling: model.PollingInfo{}, - Outputs: []model.OutputInfo{}, - } -} - -func TraceDataStoreConnectionInfo(testID id.ID, runID int) model.TestRunEvent { +func TraceDataStoreConnectionInfo(testID id.ID, runID int, connectionResult model.ConnectionResult) model.TestRunEvent { return model.TestRunEvent{ TestID: testID, RunID: runID, Stage: model.StageTrace, Type: "DATA_STORE_CONNECTION_INFO", - Title: "A Data store connection request has been executed,test connection result information", + Title: "A Data store connection request has been executed", Description: "A Data store connection request has been executed,test connection result information", CreatedAt: time.Now(), - DataStoreConnection: model.ConnectionResult{}, + DataStoreConnection: connectionResult, Polling: model.PollingInfo{}, Outputs: []model.OutputInfo{}, } @@ -213,23 +183,37 @@ func TracePollingStart(testID id.ID, runID int) model.TestRunEvent { Description: "Starting the trace polling process", CreatedAt: time.Now(), DataStoreConnection: model.ConnectionResult{}, - Polling: model.PollingInfo{}, - Outputs: []model.OutputInfo{}, + Polling: model.PollingInfo{ + Type: model.PollingTypePeriodic, + IsComplete: false, + Periodic: &model.PeriodicPollingConfig{ + NumberSpans: 0, + NumberIterations: 0, + }, + }, + Outputs: []model.OutputInfo{}, } } -func TracePollingIterationInfo(testID id.ID, runID int, numberOfSpans, iteration int, nextIterationReason string) model.TestRunEvent { +func TracePollingIterationInfo(testID id.ID, runID, numberOfSpans, iteration int, isComplete bool) model.TestRunEvent { return model.TestRunEvent{ TestID: testID, RunID: runID, Stage: model.StageTrace, Type: "POLLING_ITERATION_INFO", Title: "A polling iteration has been executed", - Description: fmt.Sprintf("A polling iteration has been executed, %d spans - iteration %d - reason of next iteration: %s", numberOfSpans, iteration, nextIterationReason), + Description: fmt.Sprintf("A polling iteration has been executed, %d spans collected - iteration %d", numberOfSpans, iteration), CreatedAt: time.Now(), DataStoreConnection: model.ConnectionResult{}, - Polling: model.PollingInfo{}, - Outputs: []model.OutputInfo{}, + Polling: model.PollingInfo{ + Type: model.PollingTypePeriodic, + IsComplete: isComplete, + Periodic: &model.PeriodicPollingConfig{ + NumberSpans: numberOfSpans, + NumberIterations: iteration, + }, + }, + Outputs: []model.OutputInfo{}, } } @@ -248,14 +232,29 @@ func TracePollingSuccess(testID id.ID, runID int) model.TestRunEvent { } } -func TracePollingError(testID id.ID, runID int) model.TestRunEvent { +func TracePollingError(testID id.ID, runID int, reason string, err error) model.TestRunEvent { return model.TestRunEvent{ TestID: testID, RunID: runID, Stage: model.StageTrace, Type: "POLLING_ERROR", Title: "The polling strategy has failed to fetch the trace", - Description: "The polling strategy has failed to fetch the trace", + Description: fmt.Sprintf("The polling strategy has failed to fetch the trace. Reason: %s Error: %s", reason, err.Error()), + CreatedAt: time.Now(), + DataStoreConnection: model.ConnectionResult{}, + Polling: model.PollingInfo{}, + Outputs: []model.OutputInfo{}, + } +} + +func TraceFetchingStart(testID id.ID, runID int) model.TestRunEvent { + return model.TestRunEvent{ + TestID: testID, + RunID: runID, + Stage: model.StageTrace, + Type: "FETCHING_START", + Title: "Starting the trace fetching process", + Description: "Starting the trace fetching process", CreatedAt: time.Now(), DataStoreConnection: model.ConnectionResult{}, Polling: model.PollingInfo{}, @@ -278,16 +277,16 @@ func TraceFetchingSuccess(testID id.ID, runID int) model.TestRunEvent { } } -func TraceFetchingError(testID id.ID, runID int) model.TestRunEvent { +func TraceFetchingError(testID id.ID, runID int, connectionResult model.ConnectionResult, err error) model.TestRunEvent { return model.TestRunEvent{ TestID: testID, RunID: runID, Stage: model.StageTrace, Type: "FETCHING_ERROR", Title: "The trace was not able to be fetched", - Description: "The trace was not able to be fetched", + Description: fmt.Sprintf("The trace was not able to be fetched from the DataStore. Error: %s", err), CreatedAt: time.Now(), - DataStoreConnection: model.ConnectionResult{}, + DataStoreConnection: connectionResult, Polling: model.PollingInfo{}, Outputs: []model.OutputInfo{}, } diff --git a/server/model/test_run_event.go b/server/model/test_run_event.go index a603a02206..e31048b52e 100644 --- a/server/model/test_run_event.go +++ b/server/model/test_run_event.go @@ -61,10 +61,9 @@ func (e TestRunEvent) ResourceID() string { } type PollingInfo struct { - Type PollingType - ReasonNextIteration string - IsComplete bool - Periodic *PeriodicPollingConfig + Type PollingType + IsComplete bool + Periodic *PeriodicPollingConfig } type PeriodicPollingConfig struct { diff --git a/server/openapi/model_polling_info.go b/server/openapi/model_polling_info.go index 752e3d02fd..9a637bd440 100644 --- a/server/openapi/model_polling_info.go +++ b/server/openapi/model_polling_info.go @@ -12,8 +12,6 @@ package openapi type PollingInfo struct { Type string `json:"type,omitempty"` - ReasonNextIteration string `json:"reasonNextIteration,omitempty"` - IsComplete bool `json:"isComplete,omitempty"` Periodic PollingInfoPeriodic `json:"periodic,omitempty"` diff --git a/web/src/components/RunEvents/RunEventPolling.tsx b/web/src/components/RunEvents/RunEventPolling.tsx index a320e2378e..b9f95a92cd 100644 --- a/web/src/components/RunEvents/RunEventPolling.tsx +++ b/web/src/components/RunEvents/RunEventPolling.tsx @@ -23,13 +23,6 @@ const RunEventPolling = ({event}: IPropsEvent) => ( {event.polling.periodic.numberIterations} - - -
- Reason why the next iteration will be executed: - {event.polling.reasonNextIteration} -
-
)} diff --git a/web/src/models/TestRunEvent.model.ts b/web/src/models/TestRunEvent.model.ts index 699e948dcd..0b196e7230 100644 --- a/web/src/models/TestRunEvent.model.ts +++ b/web/src/models/TestRunEvent.model.ts @@ -15,7 +15,6 @@ type TestRunEvent = Model< function PollingInfo({ type = PollingInfoType.Periodic, - reasonNextIteration = '', isComplete = false, periodic = {}, }: TRawPollingInfo): PollingInfo { @@ -23,7 +22,6 @@ function PollingInfo({ return { type: types.includes(type) ? type : PollingInfoType.Periodic, - reasonNextIteration, isComplete, periodic: { numberSpans: periodic?.numberSpans ?? 0, diff --git a/web/src/types/Generated.types.ts b/web/src/types/Generated.types.ts index d12e3774fc..cd85b4c926 100644 --- a/web/src/types/Generated.types.ts +++ b/web/src/types/Generated.types.ts @@ -1810,7 +1810,6 @@ export interface external { PollingInfo: { /** @enum {string} */ type?: "periodic"; - reasonNextIteration?: string; isComplete?: boolean; periodic?: { numberSpans?: number;