Skip to content

Commit

Permalink
fix: improvements to the test run events (#2405)
Browse files Browse the repository at this point in the history
* fix: improvements to the test run events

* PR comment updates
  • Loading branch information
xoscar committed Apr 19, 2023
1 parent 7e772fc commit daf930c
Show file tree
Hide file tree
Showing 5 changed files with 90 additions and 67 deletions.
4 changes: 2 additions & 2 deletions server/executor/assertion_runner.go
Expand Up @@ -198,7 +198,7 @@ func (e *defaultAssertionRunner) emitFailedAssertions(ctx context.Context, req A

if errors.Is(spanAssertionResult.CompareErr, expression.ErrExpressionResolution) {
unwrappedError := errors.Unwrap(spanAssertionResult.CompareErr)
e.eventEmitter.Emit(ctx, events.TestSpecsAssertionError(
e.eventEmitter.Emit(ctx, events.TestSpecsAssertionWarning(
req.Run.TestID,
req.Run.ID,
unwrappedError,
Expand All @@ -208,7 +208,7 @@ func (e *defaultAssertionRunner) emitFailedAssertions(ctx context.Context, req A
}

if errors.Is(spanAssertionResult.CompareErr, expression.ErrInvalidSyntax) {
e.eventEmitter.Emit(ctx, events.TestSpecsAssertionError(
e.eventEmitter.Emit(ctx, events.TestSpecsAssertionWarning(
req.Run.TestID,
req.Run.ID,
spanAssertionResult.CompareErr,
Expand Down
40 changes: 16 additions & 24 deletions server/executor/poller_executor.go
Expand Up @@ -2,14 +2,12 @@ package executor

import (
"context"
"errors"
"fmt"
"log"

"github.com/kubeshop/tracetest/server/model"
"github.com/kubeshop/tracetest/server/model/events"
"github.com/kubeshop/tracetest/server/tracedb"
"github.com/kubeshop/tracetest/server/tracedb/connection"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)
Expand Down Expand Up @@ -117,50 +115,44 @@ func (pe DefaultPollerExecutor) ExecuteRequest(request *PollingRequest) (bool, s
log.Printf("[PollerExecutor] Test %s Run %d: failed to emit TraceDataStoreConnectionInfo event: error: %s\n", request.test.ID, request.run.ID, err.Error())
}
}
}

err = pe.eventEmitter.Emit(request.ctx, events.TraceFetchingStart(request.test.ID, request.run.ID))
if err != nil {
log.Printf("[PollerExecutor] Test %s Run %d: failed to emit TraceFetchingStart event: error: %s\n", request.test.ID, request.run.ID, err.Error())
err = pe.eventEmitter.Emit(request.ctx, events.TracePollingStart(request.test.ID, request.run.ID))
if err != nil {
log.Printf("[PollerExecutor] Test %s Run %d: failed to emit TracePollingStart event: error: %s\n", request.test.ID, request.run.ID, err.Error())
}
}

traceID := run.TraceID.String()
trace, err := traceDB.GetTraceByID(request.ctx, traceID)
if err != nil {
connectionResult := model.ConnectionResult{}

if !errors.Is(err, connection.ErrTraceNotFound) {
// run test connection to give a diagnostic when an unknown error happens
if testableTraceDB, ok := traceDB.(tracedb.TestableTraceDB); ok {
connectionResult = testableTraceDB.TestConnection(request.ctx)
}
}

anotherErr := pe.eventEmitter.Emit(request.ctx, events.TraceFetchingError(request.test.ID, request.run.ID, connectionResult, err))
anotherErr := pe.eventEmitter.Emit(request.ctx, events.TracePollingIterationInfo(request.test.ID, request.run.ID, 0, request.count, false, err.Error()))
if anotherErr != nil {
log.Printf("[PollerExecutor] Test %s Run %d: failed to emit TraceFetchingError event: error: %s\n", request.test.ID, request.run.ID, anotherErr.Error())
log.Printf("[PollerExecutor] Test %s Run %d: failed to emit TracePollingIterationInfo event: error: %s\n", request.test.ID, request.run.ID, anotherErr.Error())
}

log.Printf("[PollerExecutor] Test %s Run %d: GetTraceByID (traceID %s) error: %s\n", request.test.ID, request.run.ID, traceID, err.Error())
return false, "", model.Run{}, err
}

err = pe.eventEmitter.Emit(request.ctx, events.TraceFetchingSuccess(request.test.ID, request.run.ID))
if err != nil {
log.Printf("[PollerExecutor] Test %s Run %d: failed to emit TraceFetchingSuccess event: error: %s\n", request.test.ID, request.run.ID, err.Error())
}

trace.ID = run.TraceID

done, reason := pe.donePollingTraces(request, traceDB, trace)

if !done {
err := pe.eventEmitter.Emit(request.ctx, events.TracePollingIterationInfo(request.test.ID, request.run.ID, len(trace.Flat), request.count, false, reason))
if err != nil {
log.Printf("[PollerExecutor] Test %s Run %d: failed to emit TracePollingIterationInfo event: error: %s\n", request.test.ID, request.run.ID, err.Error())
}

log.Printf("[PollerExecutor] Test %s Run %d: Not done polling. (%s)\n", request.test.ID, request.run.ID, reason)
run.Trace = &trace
request.run = run
return false, "", run, nil
}

err = pe.eventEmitter.Emit(request.ctx, events.TracePollingSuccess(request.test.ID, request.run.ID, reason))
if err != nil {
log.Printf("[PollerExecutor] Test %s Run %d: failed to emit TracePollingSuccess event: error: %s\n", request.test.ID, request.run.ID, err.Error())
}

log.Printf("[PollerExecutor] Test %s Run %d: Done polling. (%s)\n", request.test.ID, request.run.ID, reason)

log.Printf("[PollerExecutor] Test %s Run %d: Start Sorting\n", request.test.ID, request.run.ID)
Expand Down
21 changes: 13 additions & 8 deletions server/executor/trace_poller.go
Expand Up @@ -16,6 +16,8 @@ import (
v1 "go.opentelemetry.io/proto/otlp/trace/v1"
)

const PollingRequestStartIteration = 1

type TracePoller interface {
Poll(context.Context, model.Test, model.Run)
}
Expand Down Expand Up @@ -123,7 +125,7 @@ func (tp tracePoller) Stop() {
func (tp tracePoller) Poll(ctx context.Context, test model.Test, run model.Run) {
log.Printf("[TracePoller] Test %s Run %d: Poll\n", test.ID, run.ID)

job := NewPollingRequest(ctx, test, run, 0)
job := NewPollingRequest(ctx, test, run, PollingRequestStartIteration)

tp.enqueueJob(*job)
}
Expand All @@ -141,12 +143,14 @@ func (tp tracePoller) processJob(job PollingRequest) {
}

if job.IsFirstRequest() {
err := tp.eventEmitter.Emit(job.ctx, events.TracePollingStart(job.test.ID, job.run.ID))
err := tp.eventEmitter.Emit(job.ctx, events.TraceFetchingStart(job.test.ID, job.run.ID))
if err != nil {
log.Printf("[TracePoller] Test %s Run %d: fail to emit TracePollingStart event: %s \n", job.test.ID, job.run.ID, err.Error())
}
}

fmt.Println("tracePoller processJob", job.count)

finished, finishReason, run, err := tp.pollerExecutor.ExecuteRequest(&job)
if err != nil {
log.Printf("[TracePoller] Test %s Run %d: ExecuteRequest Error: %s\n", job.test.ID, job.run.ID, err.Error())
Expand All @@ -157,16 +161,16 @@ func (tp tracePoller) processJob(job PollingRequest) {
if anotherErr != nil {
log.Printf("[TracePoller] Test %s Run %d: fail to emit TracePollingError event: %s \n", job.test.ID, job.run.ID, err.Error())
}

anotherErr = tp.eventEmitter.Emit(job.ctx, events.TraceFetchingError(job.test.ID, job.run.ID, err))
if anotherErr != nil {
log.Printf("[TracePoller] Test %s Run %d: fail to emit TracePollingError event: %s \n", job.test.ID, job.run.ID, err.Error())
}
}

return
}

err = tp.eventEmitter.Emit(job.ctx, events.TracePollingIterationInfo(job.test.ID, job.run.ID, len(run.Trace.Flat), job.count, finished))
if err != nil {
log.Printf("[TracePoller] Test %s Run %d: failed to emit TracePollingIterationInfo event: error: %s\n", job.test.ID, job.run.ID, err.Error())
}

if !finished {
job.count += 1
tp.requeue(job)
Expand All @@ -175,7 +179,7 @@ func (tp tracePoller) processJob(job PollingRequest) {

log.Printf("[TracePoller] Test %s Run %d: Done polling (reason: %s). Completed polling after %d iterations, number of spans collected %d\n", job.test.ID, job.run.ID, finishReason, job.count+1, len(run.Trace.Flat))

err = tp.eventEmitter.Emit(job.ctx, events.TracePollingSuccess(job.test.ID, job.run.ID))
err = tp.eventEmitter.Emit(job.ctx, events.TraceFetchingSuccess(job.test.ID, job.run.ID))
if err != nil {
log.Printf("[TracePoller] Test %s Run %d: fail to emit TracePollingSuccess event: %s \n", job.test.ID, job.run.ID, err.Error())
}
Expand Down Expand Up @@ -209,6 +213,7 @@ func (tp tracePoller) handleTraceDBError(job PollingRequest, err error) (bool, s
// Edge case: the trace still not avaiable on Data Store during polling
if errors.Is(err, connection.ErrTraceNotFound) && time.Since(run.ServiceTriggeredAt) < pp.TimeoutDuration() {
log.Println("[TracePoller] Trace not found on Data Store yet. Requeuing...")
job.count += 1
tp.requeue(job)
return false, "Trace not found" // job without fail
}
Expand Down
16 changes: 8 additions & 8 deletions server/model/events/events.go
Expand Up @@ -195,14 +195,14 @@ func TracePollingStart(testID id.ID, runID int) model.TestRunEvent {
}
}

func TracePollingIterationInfo(testID id.ID, runID, numberOfSpans, iteration int, isComplete bool) model.TestRunEvent {
func TracePollingIterationInfo(testID id.ID, runID, numberOfSpans, iteration int, isComplete bool, reason string) model.TestRunEvent {
return model.TestRunEvent{
TestID: testID,
RunID: runID,
Stage: model.StageTrace,
Type: "POLLING_ITERATION_INFO",
Title: "Trace polling iteration executed",
Description: "A trace polling iteration has been executed",
Description: fmt.Sprintf("A trace polling iteration has been executed. Reason: %s", reason),
CreatedAt: time.Now(),
DataStoreConnection: model.ConnectionResult{},
Polling: model.PollingInfo{
Expand All @@ -217,14 +217,14 @@ func TracePollingIterationInfo(testID id.ID, runID, numberOfSpans, iteration int
}
}

func TracePollingSuccess(testID id.ID, runID int) model.TestRunEvent {
func TracePollingSuccess(testID id.ID, runID int, reason string) model.TestRunEvent {
return model.TestRunEvent{
TestID: testID,
RunID: runID,
Stage: model.StageTrace,
Type: "POLLING_SUCCESS",
Title: "Trace polling strategy succeeded",
Description: "The polling strategy has succeeded in fetching the trace from the data store",
Description: fmt.Sprintf("The polling strategy has succeeded in fetching the trace from the data store. Reason: %s", reason),
CreatedAt: time.Now(),
DataStoreConnection: model.ConnectionResult{},
Polling: model.PollingInfo{},
Expand Down Expand Up @@ -277,7 +277,7 @@ func TraceFetchingSuccess(testID id.ID, runID int) model.TestRunEvent {
}
}

func TraceFetchingError(testID id.ID, runID int, connectionResult model.ConnectionResult, err error) model.TestRunEvent {
func TraceFetchingError(testID id.ID, runID int, err error) model.TestRunEvent {
return model.TestRunEvent{
TestID: testID,
RunID: runID,
Expand All @@ -286,7 +286,7 @@ func TraceFetchingError(testID id.ID, runID int, connectionResult model.Connecti
Title: "Trace fetching failed",
Description: fmt.Sprintf("The trace was not able to be fetched from the data store. Error: %s", err),
CreatedAt: time.Now(),
DataStoreConnection: connectionResult,
DataStoreConnection: model.ConnectionResult{},
Polling: model.PollingInfo{},
Outputs: []model.OutputInfo{},
}
Expand Down Expand Up @@ -382,12 +382,12 @@ func TestSpecsRunStart(testID id.ID, runID int) model.TestRunEvent {
}
}

func TestSpecsAssertionError(testID id.ID, runID int, err error, spanID string, assertion string) model.TestRunEvent {
func TestSpecsAssertionWarning(testID id.ID, runID int, err error, spanID string, assertion string) model.TestRunEvent {
return model.TestRunEvent{
TestID: testID,
RunID: runID,
Stage: model.StageTest,
Type: "TEST_SPECS_ASSERTION_ERROR",
Type: "TEST_SPECS_ASSERTION_WARNING",
Title: fmt.Sprintf(`Assertion '%s' failed`, assertion),
Description: fmt.Sprintf(`The assertion '%s' returned an error on span %s. Error: %s`, assertion, spanID, err.Error()),
CreatedAt: time.Now(),
Expand Down
76 changes: 51 additions & 25 deletions web/src/components/RunEvents/RunEventPolling.tsx
@@ -1,31 +1,57 @@
import {Typography} from 'antd';
import {capitalize} from 'lodash';
import {useSettingsValues} from 'providers/SettingsValues/SettingsValues.provider';
import {IPropsEvent} from './RunEvent';
import * as S from './RunEvents.styled';

const RunEventPolling = ({event}: IPropsEvent) => (
<S.EventContainer>
<S.Dot $logLevel={event.logLevel} />
<Typography.Title level={3}>{event.title}</Typography.Title>
<Typography.Text>{event.description}</Typography.Text>
{!!event.polling && (
<S.Content>
<S.Column>
<S.InfoIcon />
<div>
<Typography.Title level={3}>Number of spans collected:</Typography.Title>
<Typography.Text>{event.polling.periodic.numberSpans}</Typography.Text>
</div>
</S.Column>
<S.Column>
<S.InfoIcon />
<div>
<Typography.Title level={3}>Iteration number:</Typography.Title>
<Typography.Text>{event.polling.periodic.numberIterations}</Typography.Text>
</div>
</S.Column>
</S.Content>
)}
</S.EventContainer>
);
const RunEventPolling = ({event}: IPropsEvent) => {
const {
pollingProfile: {
strategy,
periodic: {retryDelay = '', timeout = ''},
},
} = useSettingsValues();

return (
<S.EventContainer>
<S.Dot $logLevel={event.logLevel} />
<Typography.Title level={3}>{event.title}</Typography.Title>
<Typography.Text>{event.description}</Typography.Text>
{!!event.polling && (
<S.Content>
<S.Column>
<S.InfoIcon />
<div>
<Typography.Title level={3}>Polling profile configuration:</Typography.Title>
<S.Column>
<Typography.Text>{capitalize(strategy)} strategy</Typography.Text>
</S.Column>
<S.Column>
<Typography.Text>{retryDelay} of retry delay</Typography.Text>
</S.Column>
<S.Column>
<Typography.Text>{timeout} of timeout</Typography.Text>
</S.Column>
</div>
</S.Column>
<S.Column>
<S.InfoIcon />
<div>
<Typography.Title level={3}>Number of spans collected:</Typography.Title>
<Typography.Text>{event.polling.periodic.numberSpans}</Typography.Text>
</div>
</S.Column>
<S.Column>
<S.InfoIcon />
<div>
<Typography.Title level={3}>Iteration number:</Typography.Title>
<Typography.Text>{event.polling.periodic.numberIterations}</Typography.Text>
</div>
</S.Column>
</S.Content>
)}
</S.EventContainer>
);
};

export default RunEventPolling;

0 comments on commit daf930c

Please sign in to comment.