Skip to content

Commit

Permalink
fix: move trigger state check to post process run and fix small issues (
Browse files Browse the repository at this point in the history
#3137)

* fix(trigger): process run state after trigger on processor worker

* fix: some bugs found in tracetest

* fix: tracetest trigger executor
  • Loading branch information
mathnogueira committed Sep 8, 2023
1 parent 4e0515c commit 8489e0d
Show file tree
Hide file tree
Showing 7 changed files with 26 additions and 45 deletions.
2 changes: 1 addition & 1 deletion api/tests.yaml
Expand Up @@ -28,7 +28,7 @@ components:
type: object
properties:
id:
type: string
type: integer
readOnly: true
name:
type: string
Expand Down
1 change: 1 addition & 0 deletions server/app/test_pipeline.go
Expand Up @@ -88,6 +88,7 @@ func buildTestPipeline(
tracer,
subscriptionManager,
eventEmitter,
execTestUpdater,
)

cancelRunHandlerFn := executor.HandleRunCancelation(execTestUpdater, tracer, eventEmitter)
Expand Down
20 changes: 2 additions & 18 deletions server/executor/trigger_executer_worker.go
Expand Up @@ -8,7 +8,6 @@ import (
"os"
"strings"

"github.com/kubeshop/tracetest/server/analytics"
triggerer "github.com/kubeshop/tracetest/server/executor/trigger"
"github.com/kubeshop/tracetest/server/model/events"
"github.com/kubeshop/tracetest/server/test"
Expand Down Expand Up @@ -80,30 +79,15 @@ func (r triggerExecuterWorker) ProcessItem(ctx context.Context, job Job) {
}
}

run = r.handleExecutionResult(run, response, err)
run.SpanID = response.SpanID

run.TriggerResult = response.Result
run = run.TriggerCompleted(run.TriggerResult)
r.handleDBError(run, r.updater.Update(ctx, run))

job.Run = run
r.outputQueue.Enqueue(ctx, job)
}

func (r triggerExecuterWorker) handleExecutionResult(run test.Run, response triggerer.Response, err error) test.Run {
run = run.TriggerCompleted(response.Result)
if err != nil {
run = run.TriggerFailed(err)

analytics.SendEvent("test_run_finished", "error", "", &map[string]string{
"finalState": string(run.State),
})

return run
}

return run.SuccessfullyTriggered()
}

func isConnectionError(err error) bool {
for err != nil {
// a dial error means we couldn't open a TCP connection (either host is not available or DNS doesn't exist)
Expand Down
1 change: 0 additions & 1 deletion server/executor/trigger_resolver_worker.go
Expand Up @@ -107,7 +107,6 @@ func (r triggerResolverWorker) ProcessItem(ctx context.Context, job Job) {
}}

executor := expression.NewExecutor(ds...)

triggerOptions := &triggerer.ResolveOptions{
Executor: executor,
}
Expand Down
21 changes: 21 additions & 0 deletions server/executor/trigger_result_processor_worker.go
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"

"github.com/kubeshop/tracetest/server/analytics"
"github.com/kubeshop/tracetest/server/model"
"github.com/kubeshop/tracetest/server/model/events"
"github.com/kubeshop/tracetest/server/subscription"
Expand All @@ -21,11 +22,13 @@ func NewTriggerResultProcessorWorker(
tracer trace.Tracer,
subscriptionManager *subscription.Manager,
eventEmitter EventEmitter,
updater RunUpdater,
) *triggerResultProcessorWorker {
return &triggerResultProcessorWorker{
tracer: tracer,
subscriptionManager: subscriptionManager,
eventEmitter: eventEmitter,
updater: updater,
}
}

Expand All @@ -34,6 +37,7 @@ type triggerResultProcessorWorker struct {
subscriptionManager *subscription.Manager
eventEmitter EventEmitter
outputQueue Enqueuer
updater RunUpdater
}

func (r *triggerResultProcessorWorker) SetOutputQueue(queue Enqueuer) {
Expand All @@ -56,6 +60,7 @@ func (r triggerResultProcessorWorker) ProcessItem(ctx context.Context, job Job)
ctx, pollingSpan := r.tracer.Start(ctx, "Start processing trigger response")
defer pollingSpan.End()

job.Run = r.handleExecutionResult(job.Run)
triggerResult := job.Run.TriggerResult
if triggerResult.Error != nil {
err := triggerResult.Error.Error()
Expand Down Expand Up @@ -85,6 +90,8 @@ func (r triggerResultProcessorWorker) ProcessItem(ctx context.Context, job Job)
}
}

r.handleDBError(job.Run, r.updater.Update(ctx, job.Run))

r.outputQueue.Enqueue(ctx, job)
}

Expand All @@ -109,3 +116,17 @@ func (r triggerResultProcessorWorker) emitMismatchEndpointEvent(ctx context.Cont
r.handleError(job.Run, emitErr)
}
}

func (r triggerResultProcessorWorker) handleExecutionResult(run test.Run) test.Run {
if run.TriggerResult.Error != nil {
run = run.TriggerFailed(fmt.Errorf(run.TriggerResult.Error.ErrorMessage))

analytics.SendEvent("test_run_finished", "error", "", &map[string]string{
"finalState": string(run.State),
})

return run
}

return run.SuccessfullyTriggered()
}
24 changes: 0 additions & 24 deletions server/executor/workers/trigger_preparation_worker.go

This file was deleted.

2 changes: 1 addition & 1 deletion server/test/trigger/trigger.go
Expand Up @@ -15,7 +15,7 @@ type (

TriggerResult struct {
Type TriggerType `json:"type"`
HTTP *HTTPResponse `json:"httpRequest,omitempty"`
HTTP *HTTPResponse `json:"http,omitempty"`
GRPC *GRPCResponse `json:"grpc,omitempty"`
TraceID *TraceIDResponse `json:"traceid,omitempty"`
Kafka *KafkaResponse `json:"kafka,omitempty"`
Expand Down

0 comments on commit 8489e0d

Please sign in to comment.