Skip to content

Commit

Permalink
feat(server): allow tests to be stopped in all test pipeline steps (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
schoren committed Jan 9, 2024
1 parent 3a75722 commit db11183
Show file tree
Hide file tree
Showing 7 changed files with 42 additions and 0 deletions.
6 changes: 6 additions & 0 deletions server/executor/assertion_runner.go
Expand Up @@ -45,6 +45,12 @@ func (e *defaultAssertionRunner) SetOutputQueue(pipeline.Enqueuer[Job]) {
}

func (e *defaultAssertionRunner) ProcessItem(ctx context.Context, job Job) {
select {
default:
case <-ctx.Done():
return
}

run, err := e.runAssertionsAndUpdateResult(ctx, job)

log.Printf("[AssertionRunner] Test %s Run %d: update channel start\n", job.Test.ID, job.Run.ID)
Expand Down
6 changes: 6 additions & 0 deletions server/executor/linter_runner.go
Expand Up @@ -45,6 +45,12 @@ func (e *defaultLinterRunner) SetOutputQueue(queue pipeline.Enqueuer[Job]) {
}

func (e *defaultLinterRunner) ProcessItem(ctx context.Context, job Job) {
select {
default:
case <-ctx.Done():
return
}

lintResource := e.analyzerGetter.GetDefault(ctx)

shouldSkip := lintResource.ShouldSkip()
Expand Down
6 changes: 6 additions & 0 deletions server/executor/tracepollerworker/evaluator_worker.go
Expand Up @@ -61,6 +61,12 @@ func (w *tracePollerEvaluatorWorker) SetOutputQueue(queue pipeline.Enqueuer[exec
}

func (w *tracePollerEvaluatorWorker) ProcessItem(ctx context.Context, job executor.Job) {
select {
default:
case <-ctx.Done():
return
}

ctx, span := w.state.tracer.Start(ctx, "Evaluating trace")
defer span.End()

Expand Down
6 changes: 6 additions & 0 deletions server/executor/tracepollerworker/fetcher_worker.go
Expand Up @@ -53,6 +53,12 @@ func (w *traceFetcherWorker) ProcessItem(ctx context.Context, job executor.Job)
return
}

select {
default:
case <-ctx.Done():
return
}

ctx, span := w.state.tracer.Start(ctx, "Fetching trace")
defer span.End()

Expand Down
6 changes: 6 additions & 0 deletions server/executor/trigger_executer_worker.go
Expand Up @@ -62,6 +62,12 @@ func (r triggerExecuterWorker) ProcessItem(ctx context.Context, job Job) {
return
}

select {
default:
case <-ctx.Done():
return
}

err := r.eventEmitter.Emit(ctx, events.TriggerExecutionStart(job.Run.TestID, job.Run.ID))
if err != nil {
r.handleError(job.Run, err)
Expand Down
6 changes: 6 additions & 0 deletions server/executor/trigger_resolver_worker.go
Expand Up @@ -77,6 +77,12 @@ func (r triggerResolverWorker) traceDB(ctx context.Context) (tracedb.TraceDB, er
}

func (r triggerResolverWorker) ProcessItem(ctx context.Context, job Job) {
select {
default:
case <-ctx.Done():
return
}

ctx, pollingSpan := r.tracer.Start(ctx, "Resolve trigger")
defer pollingSpan.End()

Expand Down
6 changes: 6 additions & 0 deletions server/executor/trigger_result_processor_worker.go
Expand Up @@ -58,6 +58,12 @@ func (r triggerResultProcessorWorker) handleError(run test.Run, err error) {
}

func (r triggerResultProcessorWorker) ProcessItem(ctx context.Context, job Job) {
select {
default:
case <-ctx.Done():
return
}

ctx, pollingSpan := r.tracer.Start(ctx, "Start processing trigger response")
defer pollingSpan.End()

Expand Down

0 comments on commit db11183

Please sign in to comment.