From daf930cf9661b6edf9d3eec126da8bec28732e1c Mon Sep 17 00:00:00 2001 From: Oscar Reyes Date: Wed, 19 Apr 2023 09:15:29 -0600 Subject: [PATCH] fix: improvements to the test run events (#2405) * fix: improvements to the test run events * PR comment updates --- server/executor/assertion_runner.go | 4 +- server/executor/poller_executor.go | 40 ++++------ server/executor/trace_poller.go | 21 +++-- server/model/events/events.go | 16 ++-- .../components/RunEvents/RunEventPolling.tsx | 76 +++++++++++++------ 5 files changed, 90 insertions(+), 67 deletions(-) diff --git a/server/executor/assertion_runner.go b/server/executor/assertion_runner.go index f8ac0e5c6d..870f4ed5ac 100644 --- a/server/executor/assertion_runner.go +++ b/server/executor/assertion_runner.go @@ -198,7 +198,7 @@ func (e *defaultAssertionRunner) emitFailedAssertions(ctx context.Context, req A if errors.Is(spanAssertionResult.CompareErr, expression.ErrExpressionResolution) { unwrappedError := errors.Unwrap(spanAssertionResult.CompareErr) - e.eventEmitter.Emit(ctx, events.TestSpecsAssertionError( + e.eventEmitter.Emit(ctx, events.TestSpecsAssertionWarning( req.Run.TestID, req.Run.ID, unwrappedError, @@ -208,7 +208,7 @@ func (e *defaultAssertionRunner) emitFailedAssertions(ctx context.Context, req A } if errors.Is(spanAssertionResult.CompareErr, expression.ErrInvalidSyntax) { - e.eventEmitter.Emit(ctx, events.TestSpecsAssertionError( + e.eventEmitter.Emit(ctx, events.TestSpecsAssertionWarning( req.Run.TestID, req.Run.ID, spanAssertionResult.CompareErr, diff --git a/server/executor/poller_executor.go b/server/executor/poller_executor.go index 518776757a..a68ecd28e8 100644 --- a/server/executor/poller_executor.go +++ b/server/executor/poller_executor.go @@ -2,14 +2,12 @@ package executor import ( "context" - "errors" "fmt" "log" "github.com/kubeshop/tracetest/server/model" "github.com/kubeshop/tracetest/server/model/events" "github.com/kubeshop/tracetest/server/tracedb" - "github.com/kubeshop/tracetest/server/tracedb/connection" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" ) @@ -117,50 +115,44 @@ func (pe DefaultPollerExecutor) ExecuteRequest(request *PollingRequest) (bool, s log.Printf("[PollerExecutor] Test %s Run %d: failed to emit TraceDataStoreConnectionInfo event: error: %s\n", request.test.ID, request.run.ID, err.Error()) } } - } - err = pe.eventEmitter.Emit(request.ctx, events.TraceFetchingStart(request.test.ID, request.run.ID)) - if err != nil { - log.Printf("[PollerExecutor] Test %s Run %d: failed to emit TraceFetchingStart event: error: %s\n", request.test.ID, request.run.ID, err.Error()) + err = pe.eventEmitter.Emit(request.ctx, events.TracePollingStart(request.test.ID, request.run.ID)) + if err != nil { + log.Printf("[PollerExecutor] Test %s Run %d: failed to emit TracePollingStart event: error: %s\n", request.test.ID, request.run.ID, err.Error()) + } } traceID := run.TraceID.String() trace, err := traceDB.GetTraceByID(request.ctx, traceID) if err != nil { - connectionResult := model.ConnectionResult{} - - if !errors.Is(err, connection.ErrTraceNotFound) { - // run test connection to give a diagnostic when an unknown error happens - if testableTraceDB, ok := traceDB.(tracedb.TestableTraceDB); ok { - connectionResult = testableTraceDB.TestConnection(request.ctx) - } - } - - anotherErr := pe.eventEmitter.Emit(request.ctx, events.TraceFetchingError(request.test.ID, request.run.ID, connectionResult, err)) + anotherErr := pe.eventEmitter.Emit(request.ctx, events.TracePollingIterationInfo(request.test.ID, request.run.ID, 0, request.count, false, err.Error())) if anotherErr != nil { - log.Printf("[PollerExecutor] Test %s Run %d: failed to emit TraceFetchingError event: error: %s\n", request.test.ID, request.run.ID, anotherErr.Error()) + log.Printf("[PollerExecutor] Test %s Run %d: failed to emit TracePollingIterationInfo event: error: %s\n", request.test.ID, request.run.ID, anotherErr.Error()) } log.Printf("[PollerExecutor] Test %s Run %d: GetTraceByID (traceID %s) error: %s\n", request.test.ID, request.run.ID, traceID, err.Error()) return false, "", model.Run{}, err } - err = pe.eventEmitter.Emit(request.ctx, events.TraceFetchingSuccess(request.test.ID, request.run.ID)) - if err != nil { - log.Printf("[PollerExecutor] Test %s Run %d: failed to emit TraceFetchingSuccess event: error: %s\n", request.test.ID, request.run.ID, err.Error()) - } - trace.ID = run.TraceID - done, reason := pe.donePollingTraces(request, traceDB, trace) - if !done { + err := pe.eventEmitter.Emit(request.ctx, events.TracePollingIterationInfo(request.test.ID, request.run.ID, len(trace.Flat), request.count, false, reason)) + if err != nil { + log.Printf("[PollerExecutor] Test %s Run %d: failed to emit TracePollingIterationInfo event: error: %s\n", request.test.ID, request.run.ID, err.Error()) + } + log.Printf("[PollerExecutor] Test %s Run %d: Not done polling. (%s)\n", request.test.ID, request.run.ID, reason) run.Trace = &trace request.run = run return false, "", run, nil } + err = pe.eventEmitter.Emit(request.ctx, events.TracePollingSuccess(request.test.ID, request.run.ID, reason)) + if err != nil { + log.Printf("[PollerExecutor] Test %s Run %d: failed to emit TracePollingSuccess event: error: %s\n", request.test.ID, request.run.ID, err.Error()) + } + log.Printf("[PollerExecutor] Test %s Run %d: Done polling. (%s)\n", request.test.ID, request.run.ID, reason) log.Printf("[PollerExecutor] Test %s Run %d: Start Sorting\n", request.test.ID, request.run.ID) diff --git a/server/executor/trace_poller.go b/server/executor/trace_poller.go index b44620d43a..c2193c01ed 100644 --- a/server/executor/trace_poller.go +++ b/server/executor/trace_poller.go @@ -16,6 +16,8 @@ import ( v1 "go.opentelemetry.io/proto/otlp/trace/v1" ) +const PollingRequestStartIteration = 1 + type TracePoller interface { Poll(context.Context, model.Test, model.Run) } @@ -123,7 +125,7 @@ func (tp tracePoller) Stop() { func (tp tracePoller) Poll(ctx context.Context, test model.Test, run model.Run) { log.Printf("[TracePoller] Test %s Run %d: Poll\n", test.ID, run.ID) - job := NewPollingRequest(ctx, test, run, 0) + job := NewPollingRequest(ctx, test, run, PollingRequestStartIteration) tp.enqueueJob(*job) } @@ -141,12 +143,14 @@ func (tp tracePoller) processJob(job PollingRequest) { } if job.IsFirstRequest() { - err := tp.eventEmitter.Emit(job.ctx, events.TracePollingStart(job.test.ID, job.run.ID)) + err := tp.eventEmitter.Emit(job.ctx, events.TraceFetchingStart(job.test.ID, job.run.ID)) if err != nil { log.Printf("[TracePoller] Test %s Run %d: fail to emit TracePollingStart event: %s \n", job.test.ID, job.run.ID, err.Error()) } } + fmt.Println("tracePoller processJob", job.count) + finished, finishReason, run, err := tp.pollerExecutor.ExecuteRequest(&job) if err != nil { log.Printf("[TracePoller] Test %s Run %d: ExecuteRequest Error: %s\n", job.test.ID, job.run.ID, err.Error()) @@ -157,16 +161,16 @@ func (tp tracePoller) processJob(job PollingRequest) { if anotherErr != nil { log.Printf("[TracePoller] Test %s Run %d: fail to emit TracePollingError event: %s \n", job.test.ID, job.run.ID, err.Error()) } + + anotherErr = tp.eventEmitter.Emit(job.ctx, events.TraceFetchingError(job.test.ID, job.run.ID, err)) + if anotherErr != nil { + log.Printf("[TracePoller] Test %s Run %d: fail to emit TracePollingError event: %s \n", job.test.ID, job.run.ID, err.Error()) + } } return } - err = tp.eventEmitter.Emit(job.ctx, events.TracePollingIterationInfo(job.test.ID, job.run.ID, len(run.Trace.Flat), job.count, finished)) - if err != nil { - log.Printf("[TracePoller] Test %s Run %d: failed to emit TracePollingIterationInfo event: error: %s\n", job.test.ID, job.run.ID, err.Error()) - } - if !finished { job.count += 1 tp.requeue(job) @@ -175,7 +179,7 @@ func (tp tracePoller) processJob(job PollingRequest) { log.Printf("[TracePoller] Test %s Run %d: Done polling (reason: %s). Completed polling after %d iterations, number of spans collected %d\n", job.test.ID, job.run.ID, finishReason, job.count+1, len(run.Trace.Flat)) - err = tp.eventEmitter.Emit(job.ctx, events.TracePollingSuccess(job.test.ID, job.run.ID)) + err = tp.eventEmitter.Emit(job.ctx, events.TraceFetchingSuccess(job.test.ID, job.run.ID)) if err != nil { log.Printf("[TracePoller] Test %s Run %d: fail to emit TracePollingSuccess event: %s \n", job.test.ID, job.run.ID, err.Error()) } @@ -209,6 +213,7 @@ func (tp tracePoller) handleTraceDBError(job PollingRequest, err error) (bool, s // Edge case: the trace still not avaiable on Data Store during polling if errors.Is(err, connection.ErrTraceNotFound) && time.Since(run.ServiceTriggeredAt) < pp.TimeoutDuration() { log.Println("[TracePoller] Trace not found on Data Store yet. Requeuing...") + job.count += 1 tp.requeue(job) return false, "Trace not found" // job without fail } diff --git a/server/model/events/events.go b/server/model/events/events.go index 525b078fed..3713b2cc7f 100644 --- a/server/model/events/events.go +++ b/server/model/events/events.go @@ -195,14 +195,14 @@ func TracePollingStart(testID id.ID, runID int) model.TestRunEvent { } } -func TracePollingIterationInfo(testID id.ID, runID, numberOfSpans, iteration int, isComplete bool) model.TestRunEvent { +func TracePollingIterationInfo(testID id.ID, runID, numberOfSpans, iteration int, isComplete bool, reason string) model.TestRunEvent { return model.TestRunEvent{ TestID: testID, RunID: runID, Stage: model.StageTrace, Type: "POLLING_ITERATION_INFO", Title: "Trace polling iteration executed", - Description: "A trace polling iteration has been executed", + Description: fmt.Sprintf("A trace polling iteration has been executed. Reason: %s", reason), CreatedAt: time.Now(), DataStoreConnection: model.ConnectionResult{}, Polling: model.PollingInfo{ @@ -217,14 +217,14 @@ func TracePollingIterationInfo(testID id.ID, runID, numberOfSpans, iteration int } } -func TracePollingSuccess(testID id.ID, runID int) model.TestRunEvent { +func TracePollingSuccess(testID id.ID, runID int, reason string) model.TestRunEvent { return model.TestRunEvent{ TestID: testID, RunID: runID, Stage: model.StageTrace, Type: "POLLING_SUCCESS", Title: "Trace polling strategy succeeded", - Description: "The polling strategy has succeeded in fetching the trace from the data store", + Description: fmt.Sprintf("The polling strategy has succeeded in fetching the trace from the data store. Reason: %s", reason), CreatedAt: time.Now(), DataStoreConnection: model.ConnectionResult{}, Polling: model.PollingInfo{}, @@ -277,7 +277,7 @@ func TraceFetchingSuccess(testID id.ID, runID int) model.TestRunEvent { } } -func TraceFetchingError(testID id.ID, runID int, connectionResult model.ConnectionResult, err error) model.TestRunEvent { +func TraceFetchingError(testID id.ID, runID int, err error) model.TestRunEvent { return model.TestRunEvent{ TestID: testID, RunID: runID, @@ -286,7 +286,7 @@ func TraceFetchingError(testID id.ID, runID int, connectionResult model.Connecti Title: "Trace fetching failed", Description: fmt.Sprintf("The trace was not able to be fetched from the data store. Error: %s", err), CreatedAt: time.Now(), - DataStoreConnection: connectionResult, + DataStoreConnection: model.ConnectionResult{}, Polling: model.PollingInfo{}, Outputs: []model.OutputInfo{}, } @@ -382,12 +382,12 @@ func TestSpecsRunStart(testID id.ID, runID int) model.TestRunEvent { } } -func TestSpecsAssertionError(testID id.ID, runID int, err error, spanID string, assertion string) model.TestRunEvent { +func TestSpecsAssertionWarning(testID id.ID, runID int, err error, spanID string, assertion string) model.TestRunEvent { return model.TestRunEvent{ TestID: testID, RunID: runID, Stage: model.StageTest, - Type: "TEST_SPECS_ASSERTION_ERROR", + Type: "TEST_SPECS_ASSERTION_WARNING", Title: fmt.Sprintf(`Assertion '%s' failed`, assertion), Description: fmt.Sprintf(`The assertion '%s' returned an error on span %s. Error: %s`, assertion, spanID, err.Error()), CreatedAt: time.Now(), diff --git a/web/src/components/RunEvents/RunEventPolling.tsx b/web/src/components/RunEvents/RunEventPolling.tsx index b9f95a92cd..57cdb71cca 100644 --- a/web/src/components/RunEvents/RunEventPolling.tsx +++ b/web/src/components/RunEvents/RunEventPolling.tsx @@ -1,31 +1,57 @@ import {Typography} from 'antd'; +import {capitalize} from 'lodash'; +import {useSettingsValues} from 'providers/SettingsValues/SettingsValues.provider'; import {IPropsEvent} from './RunEvent'; import * as S from './RunEvents.styled'; -const RunEventPolling = ({event}: IPropsEvent) => ( - - - {event.title} - {event.description} - {!!event.polling && ( - - - -
- Number of spans collected: - {event.polling.periodic.numberSpans} -
-
- - -
- Iteration number: - {event.polling.periodic.numberIterations} -
-
-
- )} -
-); +const RunEventPolling = ({event}: IPropsEvent) => { + const { + pollingProfile: { + strategy, + periodic: {retryDelay = '', timeout = ''}, + }, + } = useSettingsValues(); + + return ( + + + {event.title} + {event.description} + {!!event.polling && ( + + + +
+ Polling profile configuration: + + {capitalize(strategy)} strategy + + + {retryDelay} of retry delay + + + {timeout} of timeout + +
+
+ + +
+ Number of spans collected: + {event.polling.periodic.numberSpans} +
+
+ + +
+ Iteration number: + {event.polling.periodic.numberIterations} +
+
+
+ )} +
+ ); +}; export default RunEventPolling;