Skip to content

Commit

Permalink
chore(server): split trigger worker into 3 workers (#3120)
Browse files Browse the repository at this point in the history
* chore(server): split trigger worker into 3 workers

* rename runner file and delete unit test

* fix test

* fix(trigger-worker): trace-id injection

* address sebastian's comment

* fix: instrumentation
  • Loading branch information
mathnogueira committed Sep 5, 2023
1 parent bdc477b commit a784025
Show file tree
Hide file tree
Showing 14 changed files with 417 additions and 697 deletions.
20 changes: 17 additions & 3 deletions server/app/test_pipeline.go
Expand Up @@ -68,16 +68,28 @@ func buildTestPipeline(
eventEmitter,
)

runner := executor.NewPersistentRunner(
triggerResolverWorker := executor.NewTriggerResolverWorker(
triggerRegistry,
execTestUpdater,
tracer,
subscriptionManager,
tracedbFactory,
dsRepo,
eventEmitter,
)

triggerExecuterWorker := executor.NewTriggerExecuterWorker(
triggerRegistry,
execTestUpdater,
tracer,
eventEmitter,
)

triggerResultProcessorWorker := executor.NewTriggerResultProcessorWorker(
tracer,
subscriptionManager,
eventEmitter,
)

cancelRunHandlerFn := executor.HandleRunCancelation(execTestUpdater, tracer, eventEmitter)

queueBuilder := executor.NewQueueBuilder().
Expand All @@ -92,7 +104,9 @@ func buildTestPipeline(
pgQueue := executor.NewPostgresQueueDriver(pool)

pipeline := executor.NewPipeline(queueBuilder,
executor.PipelineStep{Processor: runner, Driver: pgQueue.Channel("runner")},
executor.PipelineStep{Processor: triggerResolverWorker, Driver: pgQueue.Channel("trigger_resolve")},
executor.PipelineStep{Processor: triggerExecuterWorker, Driver: pgQueue.Channel("trigger_execute")},
executor.PipelineStep{Processor: triggerResultProcessorWorker, Driver: pgQueue.Channel("trigger_result")},
executor.PipelineStep{Processor: tracePoller, Driver: pgQueue.Channel("tracePoller")},
executor.PipelineStep{Processor: linterRunner, Driver: pgQueue.Channel("linterRunner")},
executor.PipelineStep{Processor: assertionRunner, Driver: pgQueue.Channel("assertionRunner")},
Expand Down
3 changes: 2 additions & 1 deletion server/executor/poller_executor_test.go
Expand Up @@ -26,7 +26,8 @@ import (
)

var (
randomIDGenerator = id.NewRandGenerator()
randomIDGenerator = id.NewRandGenerator()
noError error = nil
)

func Test_PollerExecutor_ExecuteRequest_NoRootSpan_NoSpanCase(t *testing.T) {
Expand Down
303 changes: 0 additions & 303 deletions server/executor/runner.go

This file was deleted.

0 comments on commit a784025

Please sign in to comment.