From 48890aa89acd550bdc0b98df26382e9f5724c4d1 Mon Sep 17 00:00:00 2001 From: Preetam Dwivedi Date: Tue, 2 Jun 2026 10:16:28 -0700 Subject: [PATCH] feat(buildrunner): poll-driven buildsignal + pipeline wiring MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Summary ### Why? The orchestrator's build stage needs to drive the `BuildRunner` contract end-to-end: trigger the runner, persist the result, and poll `Status` until terminal so the batch state machine can react. Polling has to behave like the rest of the pipeline (queue-driven, partition-isolated, restart-safe) rather than running as an in-process timer loop. ### What? Stacks on top of the BuildRunner interface, noop, and `PublishAfter` PRs. Wires the runner into the orchestrator pipeline. The build poll loop runs as queue traffic inside the existing `buildsignal` consumer (no separate stage). On each delivery it loads the `Build` from storage, calls `BuildRunner.Status`, persists the result via `BuildStore.UpdateStatus`, publishes the batch ID to `speculate` so the state machine re-evaluates, and re-publishes itself via `Publisher.PublishAfter` until the build reaches a terminal state. A webhook-capable backend can publish into the same topic — the consumer cannot tell a poll-driven message from a push. Only the build **ID** travels on the queue (`entity.BuildID`); the consumer reloads the full `Build` from `BuildStore`, keeping the message small and storage the single source of truth — the same ID-on-the-queue, load-from-storage pattern the rest of the pipeline already uses for batches and requests. The controllers consume the runner's `entity.BuildID` signatures (`Trigger` returns one; `Status` takes one). 
Pieces: - `orchestrator/controller/build`: assembles `base` from `batch.Dependencies` and `head` from `batch.Contains`, calls `Trigger`, persists the initial `Build{Accepted}` via `BuildStore.Create` (`ErrAlreadyExists` is swallowed for redelivery), publishes the build ID to `buildsignal`. - `orchestrator/controller/buildsignal`: the polling consumer described above. It loads the `Build` by ID, then polls. `PollDelayAcceptedMs=5000`, `PollDelayRunningMs=2000` by default (vars so tests can override; a TODO notes these should move into the `queueconfig` extension). Error classification: only the `PublishAfter` re-schedule is wrapped retryable (`errs.NewRetryableError`) — it is the poll loop's heartbeat, so a transient enqueue blip nacks and replays (up to `MaxAttempts`) rather than rejecting the loop's only live message straight to DLQ. Deserialize, the `Build` load, `Status`, `UpdateStatus`, and the speculate publish stay non-retryable and reject to DLQ on first failure, where an operational republish is the recovery path. - `example/server/orchestrator/main.go`: passes the `BuildRunner` to both `build.NewController` and `buildsignal.NewController`; pipeline diagram updated. - root `BUILD.bazel`: adds `# gazelle:exclude .claude` so gazelle does not index nested worktrees as duplicate rule definitions and corrupt the canonical BUILD files. ## Test Plan - ✅ `bazel test //extension/buildrunner/... //orchestrator/controller/build/... //orchestrator/controller/buildsignal/... //extension/queue/...` — all pass. - ✅ `make fmt lint check-tidy check-gazelle check-mocks` — clean. - ✅ `make build` — all targets compile. - New coverage: build controller persist+publish path (with `ErrAlreadyExists` swallow), buildsignal poll loop (terminal forwards to speculate, non-terminal re-publishes via `PublishAfter` with per-status delay, retryable re-publish failure, non-retryable build-load / `Status` / `UpdateStatus` failures reject to DLQ). 
--- BUILD.bazel | 4 + core/consumer/registry.go | 6 +- example/server/orchestrator/BUILD.bazel | 2 + example/server/orchestrator/main.go | 20 +- orchestrator/controller/build/BUILD.bazel | 5 + orchestrator/controller/build/build.go | 80 ++++- orchestrator/controller/build/build_test.go | 207 ++++++++++++- .../controller/buildsignal/BUILD.bazel | 3 + .../controller/buildsignal/buildsignal.go | 158 ++++++++-- .../buildsignal/buildsignal_test.go | 283 +++++++++++++----- 10 files changed, 634 insertions(+), 134 deletions(-) diff --git a/BUILD.bazel b/BUILD.bazel index 45300fde..384eafe9 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -2,6 +2,10 @@ load("@gazelle//:def.bzl", "gazelle") # gazelle:prefix github.com/uber/submitqueue +# Exclude nested worktrees (created under .claude/worktrees) so gazelle does not +# index them as duplicate rule definitions and corrupt the canonical BUILD files. +# gazelle:exclude .claude + # Resolve protobuf import ambiguities - use the actual protopb packages, not the proto aliases # gazelle:resolve go github.com/uber/submitqueue/gateway/protopb //gateway/protopb # gazelle:resolve go github.com/uber/submitqueue/orchestrator/protopb //orchestrator/protopb diff --git a/core/consumer/registry.go b/core/consumer/registry.go index 64955337..ce817293 100644 --- a/core/consumer/registry.go +++ b/core/consumer/registry.go @@ -40,7 +40,11 @@ const ( TopicKeySpeculate TopicKey = "speculate" // TopicKeyBuild is the pipeline stage where speculated batches are published for builds. TopicKeyBuild TopicKey = "build" - // TopicKeyBuildSignal is the pipeline stage where builds are published for build signal processing. + // TopicKeyBuildSignal is the polling stage for triggered builds. 
Each + // message carries a build ID; the consumer loads the Build from storage, calls BuildRunner.Status, + // persists the latest status, publishes the batch ID to TopicKeySpeculate + // so the state machine re-evaluates, and re-publishes itself via + // PublishAfter when the build has not yet reached a terminal state. TopicKeyBuildSignal TopicKey = "buildsignal" // TopicKeyMerge is the pipeline stage where speculated batches are published for merging. TopicKeyMerge TopicKey = "merge" diff --git a/example/server/orchestrator/BUILD.bazel b/example/server/orchestrator/BUILD.bazel index 12cf41bc..f7538e42 100644 --- a/example/server/orchestrator/BUILD.bazel +++ b/example/server/orchestrator/BUILD.bazel @@ -14,6 +14,8 @@ go_library( "//core/consumer", "//core/httpclient", "//entity", + "//extension/buildrunner", + "//extension/buildrunner/noop", "//extension/changeprovider", "//extension/changeprovider/github", "//extension/changestore", diff --git a/example/server/orchestrator/main.go b/example/server/orchestrator/main.go index d903741e..48ec4994 100644 --- a/example/server/orchestrator/main.go +++ b/example/server/orchestrator/main.go @@ -33,6 +33,8 @@ import ( "github.com/uber/submitqueue/core/consumer" "github.com/uber/submitqueue/core/httpclient" "github.com/uber/submitqueue/entity" + "github.com/uber/submitqueue/extension/buildrunner" + buildnoop "github.com/uber/submitqueue/extension/buildrunner/noop" "github.com/uber/submitqueue/extension/changeprovider" githubprovider "github.com/uber/submitqueue/extension/changeprovider/github" "github.com/uber/submitqueue/extension/changestore" @@ -216,8 +218,12 @@ func run() error { return fmt.Errorf("failed to create pusher: %w", err) } + // Create build runner. The noop runner is the pass-through default + // (every build immediately succeeds) until a real backend is wired in. 
+ br := buildnoop.New() + // Register controllers - if err := registerControllers(c, logger.Sugar(), scope, registry, mc, cp, psh, cnt, store, changeStore); err != nil { + if err := registerControllers(c, logger.Sugar(), scope, registry, mc, cp, psh, br, cnt, store, changeStore); err != nil { return err } @@ -397,12 +403,12 @@ func newTopicRegistry(q extqueue.Queue, subscriberName string) (consumer.TopicRe // Pipeline: // // request → validate → batch → score → speculate → build → buildsignal ─┐ -// ↑ ↘ │ -// │ merge → conclude │ -// │ │ │ -// └────────┴────────────────────────┘ +// ↑ ↘ ↻ poll │ +// │ merge → conclude │ +// │ │ │ +// └────────┴───────────────────────┘ -func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope tally.Scope, registry consumer.TopicRegistry, mc mergechecker.MergeChecker, cp changeprovider.ChangeProvider, psh pusher.Pusher, cnt counter.Counter, store storage.Storage, changeStore changestore.ChangeStore) error { +func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope tally.Scope, registry consumer.TopicRegistry, mc mergechecker.MergeChecker, cp changeprovider.ChangeProvider, psh pusher.Pusher, br buildrunner.BuildRunner, cnt counter.Counter, store storage.Storage, changeStore changestore.ChangeStore) error { requestController := start.NewController( logger, scope, @@ -488,6 +494,7 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t logger, scope, store, + br, registry, consumer.TopicKeyBuild, "orchestrator-build", @@ -500,6 +507,7 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t logger, scope, store, + br, registry, consumer.TopicKeyBuildSignal, "orchestrator-buildsignal", diff --git a/orchestrator/controller/build/BUILD.bazel b/orchestrator/controller/build/BUILD.bazel index 50c0f3e4..a7ffcd70 100644 --- a/orchestrator/controller/build/BUILD.bazel +++ b/orchestrator/controller/build/BUILD.bazel @@ -10,6 +10,7 @@ go_library( 
"//core/metrics", "//entity", "//entity/queue", + "//extension/buildrunner", "//extension/storage", "@com_github_uber_go_tally_v4//:tally", "@org_uber_go_zap//:zap", @@ -25,7 +26,11 @@ go_test( "//core/errs", "//entity", "//entity/queue", + "//extension/buildrunner", + "//extension/buildrunner/mock", + "//extension/buildrunner/noop", "//extension/queue/mock", + "//extension/storage", "//extension/storage/mock", "@com_github_stretchr_testify//assert", "@com_github_stretchr_testify//require", diff --git a/orchestrator/controller/build/build.go b/orchestrator/controller/build/build.go index 0a450872..de61374b 100644 --- a/orchestrator/controller/build/build.go +++ b/orchestrator/controller/build/build.go @@ -16,6 +16,7 @@ package build import ( "context" + "errors" "fmt" "github.com/uber-go/tally/v4" @@ -23,6 +24,7 @@ import ( "github.com/uber/submitqueue/core/metrics" "github.com/uber/submitqueue/entity" entityqueue "github.com/uber/submitqueue/entity/queue" + "github.com/uber/submitqueue/extension/buildrunner" "github.com/uber/submitqueue/extension/storage" "go.uber.org/zap" ) @@ -34,6 +36,7 @@ type Controller struct { logger *zap.SugaredLogger metricsScope tally.Scope store storage.Storage + buildRunner buildrunner.BuildRunner registry consumer.TopicRegistry topicKey consumer.TopicKey consumerGroup string @@ -47,6 +50,7 @@ func NewController( logger *zap.SugaredLogger, scope tally.Scope, store storage.Storage, + buildRunner buildrunner.BuildRunner, registry consumer.TopicRegistry, topicKey consumer.TopicKey, consumerGroup string, @@ -55,6 +59,7 @@ func NewController( logger: logger.Named("build_controller"), metricsScope: scope.SubScope("build_controller"), store: store, + buildRunner: buildRunner, registry: registry, topicKey: topicKey, consumerGroup: consumerGroup, @@ -95,17 +100,45 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (r "partition_key", msg.PartitionKey, ) - // TODO: Add build logic - // - Trigger CI build - // - 
Track build status + // Assemble base (dependency batches in order) and head (this batch). + base, err := c.collectChanges(ctx, batch.Dependencies) + if err != nil { + metrics.NamedCounter(c.metricsScope, opName, "storage_errors", 1) + return fmt.Errorf("failed to assemble base changes for batch %s: %w", batch.ID, err) + } + head, err := c.collectChanges(ctx, []string{batch.ID}) + if err != nil { + metrics.NamedCounter(c.metricsScope, opName, "storage_errors", 1) + return fmt.Errorf("failed to assemble head changes for batch %s: %w", batch.ID, err) + } + + // Trigger the build with the configured build manager. metadata is nil + // until a caller-supplied source materializes (e.g. requester / ticket + // pulled off the originating LandRequest). + buildID, err := c.buildRunner.Trigger(ctx, batch.Queue, base, head, nil) + if err != nil { + metrics.NamedCounter(c.metricsScope, opName, "trigger_errors", 1) + return fmt.Errorf("failed to trigger build for batch %s: %w", batch.ID, err) + } build := entity.Build{ - ID: batch.ID, - BatchID: batch.ID, - Status: entity.BuildStatusAccepted, + ID: buildID.ID, + BatchID: batch.ID, + SpeculationPath: entity.SpeculationPathInfo{Base: append([]string{}, batch.Dependencies...)}, + Status: entity.BuildStatusAccepted, + } + + // Persist the initial Build snapshot so the buildsignal poll loop has a + // row to UpdateStatus against. ErrAlreadyExists is benign — a redelivery + // of this message after a previous successful Create. + if err := c.store.GetBuildStore().Create(ctx, build); err != nil && !errors.Is(err, storage.ErrAlreadyExists) { + metrics.NamedCounter(c.metricsScope, opName, "storage_errors", 1) + return fmt.Errorf("failed to persist build %s: %w", build.ID, err) } - // Publish build to build signal topic + // Hand off to the buildsignal poll loop; it calls Status, updates the + // persisted Build, publishes to speculate, and re-publishes itself via + // PublishAfter until terminal. 
if err := c.publish(ctx, consumer.TopicKeyBuildSignal, build); err != nil { metrics.NamedCounter(c.metricsScope, opName, "publish_errors", 1) return fmt.Errorf("failed to publish to buildsignal: %w", err) @@ -114,17 +147,44 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (r c.logger.Infow("published build to buildsignal", "batch_id", batch.ID, "build_id", build.ID, + "status", string(build.Status), "topic_key", consumer.TopicKeyBuildSignal, ) return nil // Success - message will be acked } -// publish publishes a build to the specified topic key. +// collectChanges loads each batch by ID and concatenates the Change values +// from its contained requests in batch order. Used to build the base +// (dependency batches) and head (this batch) inputs to BuildRunner.Trigger. +func (c *Controller) collectChanges(ctx context.Context, batchIDs []string) ([]entity.Change, error) { + if len(batchIDs) == 0 { + return nil, nil + } + var changes []entity.Change + for _, bID := range batchIDs { + b, err := c.store.GetBatchStore().Get(ctx, bID) + if err != nil { + return nil, fmt.Errorf("failed to get batch %s: %w", bID, err) + } + for _, reqID := range b.Contains { + req, err := c.store.GetRequestStore().Get(ctx, reqID) + if err != nil { + return nil, fmt.Errorf("failed to get request %s for batch %s: %w", reqID, bID, err) + } + changes = append(changes, req.Change) + } + } + return changes, nil +} + +// publish publishes a build's ID to the specified topic key. Only the +// identifier travels on the queue; the consumer loads the full Build from +// storage, keeping the message small and the store the single source of truth. 
func (c *Controller) publish(ctx context.Context, key consumer.TopicKey, build entity.Build) error { - payload, err := build.ToBytes() + payload, err := entity.BuildID{ID: build.ID}.ToBytes() if err != nil { - return fmt.Errorf("failed to serialize build: %w", err) + return fmt.Errorf("failed to serialize build ID: %w", err) } msg := entityqueue.NewMessage(build.ID, payload, build.BatchID, nil) diff --git a/orchestrator/controller/build/build_test.go b/orchestrator/controller/build/build_test.go index 0e595195..a015907a 100644 --- a/orchestrator/controller/build/build_test.go +++ b/orchestrator/controller/build/build_test.go @@ -26,7 +26,11 @@ import ( "github.com/uber/submitqueue/core/errs" "github.com/uber/submitqueue/entity" "github.com/uber/submitqueue/entity/queue" + "github.com/uber/submitqueue/extension/buildrunner" + buildrunnermock "github.com/uber/submitqueue/extension/buildrunner/mock" + buildnoop "github.com/uber/submitqueue/extension/buildrunner/noop" queuemock "github.com/uber/submitqueue/extension/queue/mock" + "github.com/uber/submitqueue/extension/storage" storagemock "github.com/uber/submitqueue/extension/storage/mock" "go.uber.org/mock/gomock" "go.uber.org/zap/zaptest" @@ -49,18 +53,31 @@ func testBatch() entity.Batch { } } -// newMockStorage creates a MockStorage with a MockBatchStore that returns the given batch on Get. +// newMockStorage creates a MockStorage with a MockBatchStore that returns the +// given batch on Get, a no-op MockRequestStore, and a MockBuildStore that +// accepts any Create call. Tests that care about Create arguments build their +// own MockBuildStore. 
func newMockStorage(ctrl *gomock.Controller, batch entity.Batch) *storagemock.MockStorage { mockBatchStore := storagemock.NewMockBatchStore(ctrl) mockBatchStore.EXPECT().Get(gomock.Any(), batch.ID).Return(batch, nil).AnyTimes() + mockRequestStore := storagemock.NewMockRequestStore(ctrl) + + mockBuildStore := storagemock.NewMockBuildStore(ctrl) + mockBuildStore.EXPECT().Create(gomock.Any(), gomock.Any()).Return(nil).AnyTimes() + store := storagemock.NewMockStorage(ctrl) store.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() + store.EXPECT().GetRequestStore().Return(mockRequestStore).AnyTimes() + store.EXPECT().GetBuildStore().Return(mockBuildStore).AnyTimes() return store } -// newTestController creates a controller with test dependencies. -func newTestController(t *testing.T, ctrl *gomock.Controller, store *storagemock.MockStorage, publishErr error) *Controller { +// newTestController creates a controller with test dependencies. br is the +// build runner to inject; pass buildnoop.New() for the pass-through default. +// The wired registry exposes only the buildsignal topic — that is what the +// controller publishes to after the RFC refactor. 
+func newTestController(t *testing.T, ctrl *gomock.Controller, store *storagemock.MockStorage, br buildrunner.BuildRunner, publishErr error) *Controller { logger := zaptest.NewLogger(t).Sugar() scope := tally.NoopScope @@ -79,14 +96,14 @@ func newTestController(t *testing.T, ctrl *gomock.Controller, store *storagemock ) require.NoError(t, err) - return NewController(logger, scope, store, registry, consumer.TopicKeyBuild, "orchestrator-build") + return NewController(logger, scope, store, br, registry, consumer.TopicKeyBuild, "orchestrator-build") } func TestNewController(t *testing.T) { ctrl := gomock.NewController(t) batch := testBatch() store := newMockStorage(ctrl, batch) - controller := newTestController(t, ctrl, store, nil) + controller := newTestController(t, ctrl, store, buildnoop.New(), nil) require.NotNil(t, controller) assert.Equal(t, consumer.TopicKeyBuild, controller.TopicKey()) @@ -99,7 +116,7 @@ func TestController_Process_Success(t *testing.T) { batch := testBatch() store := newMockStorage(ctrl, batch) - controller := newTestController(t, ctrl, store, nil) + controller := newTestController(t, ctrl, store, buildnoop.New(), nil) msg := queue.NewMessage(batch.ID, batchIDPayload(t, batch.ID), batch.Queue, nil) delivery := queuemock.NewMockDelivery(ctrl) @@ -110,6 +127,176 @@ func TestController_Process_Success(t *testing.T) { require.NoError(t, err) } +// TestController_Process_TriggersWithBaseAndHead verifies the controller +// splits the input to BuildRunner.Trigger into base (dependency batches in +// order, concatenated) and head (this batch's changes in order), persists +// the initial Accepted Build, and publishes it to the buildsignal topic. 
+func TestController_Process_TriggersWithBaseAndHead(t *testing.T) { + ctrl := gomock.NewController(t) + + depBatch := entity.Batch{ + ID: "test-queue/batch/dep", + Queue: "test-queue", + Contains: []string{"test-queue/dep-1"}, + } + headBatch := entity.Batch{ + ID: "test-queue/batch/head", + Queue: "test-queue", + State: entity.BatchStateSpeculating, + Version: 1, + Dependencies: []string{depBatch.ID}, + Contains: []string{"test-queue/head-1", "test-queue/head-2"}, + } + depReq := entity.Request{ID: "test-queue/dep-1", Change: entity.Change{URIs: []string{"github://o/r/pull/9/aaa"}}} + head1 := entity.Request{ID: "test-queue/head-1", Change: entity.Change{URIs: []string{"github://o/r/pull/1/aaa"}}} + head2 := entity.Request{ID: "test-queue/head-2", Change: entity.Change{URIs: []string{"github://o/r/pull/2/bbb"}}} + + mockBatchStore := storagemock.NewMockBatchStore(ctrl) + mockBatchStore.EXPECT().Get(gomock.Any(), headBatch.ID).Return(headBatch, nil).AnyTimes() + mockBatchStore.EXPECT().Get(gomock.Any(), depBatch.ID).Return(depBatch, nil).AnyTimes() + mockRequestStore := storagemock.NewMockRequestStore(ctrl) + mockRequestStore.EXPECT().Get(gomock.Any(), depReq.ID).Return(depReq, nil) + mockRequestStore.EXPECT().Get(gomock.Any(), head1.ID).Return(head1, nil) + mockRequestStore.EXPECT().Get(gomock.Any(), head2.ID).Return(head2, nil) + + var created entity.Build + mockBuildStore := storagemock.NewMockBuildStore(ctrl) + mockBuildStore.EXPECT().Create(gomock.Any(), gomock.Any()).DoAndReturn( + func(_ context.Context, b entity.Build) error { + created = b + return nil + }, + ).Times(1) + + store := storagemock.NewMockStorage(ctrl) + store.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() + store.EXPECT().GetRequestStore().Return(mockRequestStore).AnyTimes() + store.EXPECT().GetBuildStore().Return(mockBuildStore).AnyTimes() + + br := buildrunnermock.NewMockBuildRunner(ctrl) + wantBase := []entity.Change{depReq.Change} + wantHead := []entity.Change{head1.Change, 
head2.Change} + br.EXPECT().Trigger(gomock.Any(), headBatch.Queue, wantBase, wantHead, gomock.Nil()).Return(entity.BuildID{ID: "build-xyz"}, nil) + + var publishedTopic string + var published entity.BuildID + mockPub := queuemock.NewMockPublisher(ctrl) + mockPub.EXPECT().Publish(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( + func(_ context.Context, topic string, msg queue.Message) error { + publishedTopic = topic + bid, err := entity.BuildIDFromBytes(msg.Payload) + require.NoError(t, err) + published = bid + return nil + }, + ) + mockQ := queuemock.NewMockQueue(ctrl) + mockQ.EXPECT().Publisher().Return(mockPub).AnyTimes() + registry, err := consumer.NewTopicRegistry( + []consumer.TopicConfig{{Key: consumer.TopicKeyBuildSignal, Name: "buildsignal", Queue: mockQ}}, + ) + require.NoError(t, err) + + controller := NewController(zaptest.NewLogger(t).Sugar(), tally.NoopScope, store, br, registry, consumer.TopicKeyBuild, "orchestrator-build") + + msg := queue.NewMessage(headBatch.ID, batchIDPayload(t, headBatch.ID), headBatch.Queue, nil) + delivery := queuemock.NewMockDelivery(ctrl) + delivery.EXPECT().Message().Return(msg).AnyTimes() + delivery.EXPECT().Attempt().Return(1).AnyTimes() + + require.NoError(t, controller.Process(context.Background(), delivery)) + + // Only the build ID is published to buildsignal. + assert.Equal(t, "buildsignal", publishedTopic) + assert.Equal(t, "build-xyz", published.ID) + + // The full Build is persisted to storage (the source of truth the poll + // loop reloads), and its ID matches what was published. 
+ assert.Equal(t, "build-xyz", created.ID) + assert.Equal(t, headBatch.ID, created.BatchID) + assert.Equal(t, entity.BuildStatusAccepted, created.Status) + assert.Equal(t, []string{depBatch.ID}, created.SpeculationPath.Base) + assert.Equal(t, published.ID, created.ID) +} + +// TestController_Process_BuildStoreAlreadyExistsIsSwallowed covers the +// redelivery case: Create returns ErrAlreadyExists, the controller proceeds +// to publish to buildsignal anyway. The polling loop will pick up the +// existing row via UpdateStatus. +func TestController_Process_BuildStoreAlreadyExistsIsSwallowed(t *testing.T) { + ctrl := gomock.NewController(t) + + batch := testBatch() + + mockBatchStore := storagemock.NewMockBatchStore(ctrl) + mockBatchStore.EXPECT().Get(gomock.Any(), batch.ID).Return(batch, nil).AnyTimes() + mockBuildStore := storagemock.NewMockBuildStore(ctrl) + mockBuildStore.EXPECT().Create(gomock.Any(), gomock.Any()).Return(storage.ErrAlreadyExists) + + store := storagemock.NewMockStorage(ctrl) + store.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() + store.EXPECT().GetRequestStore().Return(storagemock.NewMockRequestStore(ctrl)).AnyTimes() + store.EXPECT().GetBuildStore().Return(mockBuildStore).AnyTimes() + + br := buildrunnermock.NewMockBuildRunner(ctrl) + br.EXPECT().Trigger(gomock.Any(), batch.Queue, gomock.Any(), gomock.Any(), gomock.Any()).Return(entity.BuildID{ID: "build-dup"}, nil) + + publishCalled := false + mockPub := queuemock.NewMockPublisher(ctrl) + mockPub.EXPECT().Publish(gomock.Any(), "buildsignal", gomock.Any()).DoAndReturn( + func(_ context.Context, _ string, _ queue.Message) error { + publishCalled = true + return nil + }, + ).Times(1) + mockQ := queuemock.NewMockQueue(ctrl) + mockQ.EXPECT().Publisher().Return(mockPub).AnyTimes() + registry, err := consumer.NewTopicRegistry( + []consumer.TopicConfig{{Key: consumer.TopicKeyBuildSignal, Name: "buildsignal", Queue: mockQ}}, + ) + require.NoError(t, err) + controller := 
NewController(zaptest.NewLogger(t).Sugar(), tally.NoopScope, store, br, registry, consumer.TopicKeyBuild, "orchestrator-build") + + msg := queue.NewMessage(batch.ID, batchIDPayload(t, batch.ID), batch.Queue, nil) + delivery := queuemock.NewMockDelivery(ctrl) + delivery.EXPECT().Message().Return(msg).AnyTimes() + delivery.EXPECT().Attempt().Return(1).AnyTimes() + + require.NoError(t, controller.Process(context.Background(), delivery)) + assert.True(t, publishCalled, "publish to buildsignal must run even when Create reports ErrAlreadyExists") +} + +// TestController_Process_TriggerFailure verifies a build-runner failure is +// surfaced as an error (nack) and nothing is persisted or published. +func TestController_Process_TriggerFailure(t *testing.T) { + ctrl := gomock.NewController(t) + + batch := testBatch() + mockBatchStore := storagemock.NewMockBatchStore(ctrl) + mockBatchStore.EXPECT().Get(gomock.Any(), batch.ID).Return(batch, nil).AnyTimes() + store := storagemock.NewMockStorage(ctrl) + store.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() + store.EXPECT().GetRequestStore().Return(storagemock.NewMockRequestStore(ctrl)).AnyTimes() + // No build store expectation: Trigger failure must short-circuit before Create. + + br := buildrunnermock.NewMockBuildRunner(ctrl) + br.EXPECT().Trigger(gomock.Any(), batch.Queue, gomock.Any(), gomock.Any(), gomock.Any()). 
+ Return(entity.BuildID{}, fmt.Errorf("provider down")) + + registry, err := consumer.NewTopicRegistry( + []consumer.TopicConfig{{Key: consumer.TopicKeyBuildSignal, Name: "buildsignal", Queue: queuemock.NewMockQueue(ctrl)}}, + ) + require.NoError(t, err) + controller := NewController(zaptest.NewLogger(t).Sugar(), tally.NoopScope, store, br, registry, consumer.TopicKeyBuild, "orchestrator-build") + + msg := queue.NewMessage(batch.ID, batchIDPayload(t, batch.ID), batch.Queue, nil) + delivery := queuemock.NewMockDelivery(ctrl) + delivery.EXPECT().Message().Return(msg).AnyTimes() + delivery.EXPECT().Attempt().Return(1).AnyTimes() + + require.Error(t, controller.Process(context.Background(), delivery)) +} + func TestController_Process_StorageFailure(t *testing.T) { ctrl := gomock.NewController(t) @@ -117,8 +304,10 @@ func TestController_Process_StorageFailure(t *testing.T) { mockBatchStore.EXPECT().Get(gomock.Any(), "test-queue/batch/1").Return(entity.Batch{}, fmt.Errorf("db connection lost")) store := storagemock.NewMockStorage(ctrl) store.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() + store.EXPECT().GetRequestStore().Return(storagemock.NewMockRequestStore(ctrl)).AnyTimes() + store.EXPECT().GetBuildStore().Return(storagemock.NewMockBuildStore(ctrl)).AnyTimes() - controller := newTestController(t, ctrl, store, nil) + controller := newTestController(t, ctrl, store, buildnoop.New(), nil) msg := queue.NewMessage("test-queue/batch/1", batchIDPayload(t, "test-queue/batch/1"), "test-queue", nil) delivery := queuemock.NewMockDelivery(ctrl) @@ -135,7 +324,7 @@ func TestController_Process_PublishFailure(t *testing.T) { batch := testBatch() store := newMockStorage(ctrl, batch) - controller := newTestController(t, ctrl, store, fmt.Errorf("publish failed")) + controller := newTestController(t, ctrl, store, buildnoop.New(), fmt.Errorf("publish failed")) msg := queue.NewMessage(batch.ID, batchIDPayload(t, batch.ID), batch.Queue, nil) delivery := 
 queuemock.NewMockDelivery(ctrl) @@ -150,7 +339,7 @@ func TestController_InterfaceImplementation(t *testing.T) { ctrl := gomock.NewController(t) batch := testBatch() store := newMockStorage(ctrl, batch) - controller := newTestController(t, ctrl, store, nil) + controller := newTestController(t, ctrl, store, buildnoop.New(), nil) var _ consumer.Controller = controller } diff --git a/orchestrator/controller/buildsignal/BUILD.bazel b/orchestrator/controller/buildsignal/BUILD.bazel index df5bb9dd..4c8d48e8 100644 --- a/orchestrator/controller/buildsignal/BUILD.bazel +++ b/orchestrator/controller/buildsignal/BUILD.bazel @@ -7,9 +7,11 @@ go_library( visibility = ["//visibility:public"], deps = [ "//core/consumer", + "//core/errs", "//core/metrics", "//entity", "//entity/queue", + "//extension/buildrunner", "//extension/storage", "@com_github_uber_go_tally_v4//:tally", "@org_uber_go_zap//:zap", @@ -25,6 +27,7 @@ go_test( "//core/errs", "//entity", "//entity/queue", + "//extension/buildrunner/mock", "//extension/queue/mock", "//extension/storage/mock", "@com_github_stretchr_testify//assert", diff --git a/orchestrator/controller/buildsignal/buildsignal.go b/orchestrator/controller/buildsignal/buildsignal.go index c7b92e53..05ac22de 100644 --- a/orchestrator/controller/buildsignal/buildsignal.go +++ b/orchestrator/controller/buildsignal/buildsignal.go @@ -12,6 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. +// Package buildsignal implements the build poll loop. Each message carries +// a build ID; the controller loads the Build from storage, calls BuildRunner.Status, writes the latest +// status to the BuildStore, publishes the batch ID to TopicKeySpeculate +// so the state machine re-evaluates, and re-publishes itself via +// PublishAfter when the build has not yet reached a terminal state. Each +// buildID partitions independently, so slow polls on one build do not +// block others. 
A webhook-capable backend can publish into this same +// topic — the controller cannot tell a poll-driven message from a push. package buildsignal import ( @@ -20,20 +28,37 @@ import ( "github.com/uber-go/tally/v4" "github.com/uber/submitqueue/core/consumer" + "github.com/uber/submitqueue/core/errs" "github.com/uber/submitqueue/core/metrics" "github.com/uber/submitqueue/entity" entityqueue "github.com/uber/submitqueue/entity/queue" + "github.com/uber/submitqueue/extension/buildrunner" "github.com/uber/submitqueue/extension/storage" "go.uber.org/zap" ) -// Controller handles build signal queue messages. -// It consumes builds from the build signal topic and publishes batch results to the speculate stage only if the build has reached a terminal state. -// Implements consumer.Controller interface for integration with the consumer. +// Poll delays for non-terminal statuses. Vars (not consts) so tests can +// shorten them; the orchestrator always uses the defaults. +// +// TODO: make these poll delays configurable per queue via the queueconfig +// extension instead of package-level vars, so operators can tune poll cadence +// without a code change. +var ( + // PollDelayAcceptedMs is the delay between Status calls while the build + // is queued by the runner but has not started executing. + PollDelayAcceptedMs int64 = 5000 + // PollDelayRunningMs is the delay between Status calls while the build + // is executing. + PollDelayRunningMs int64 = 2000 +) + +// Controller consumes build signal messages, polls BuildRunner.Status, +// persists the result, and drives the polling loop. 
 type Controller struct { logger *zap.SugaredLogger metricsScope tally.Scope store storage.Storage + buildRunner buildrunner.BuildRunner registry consumer.TopicRegistry topicKey consumer.TopicKey consumerGroup string @@ -47,6 +72,7 @@ func NewController( logger *zap.SugaredLogger, scope tally.Scope, store storage.Storage, + buildRunner buildrunner.BuildRunner, registry consumer.TopicRegistry, topicKey consumer.TopicKey, consumerGroup string, @@ -55,15 +81,26 @@ func NewController( logger: logger.Named("buildsignal_controller"), metricsScope: scope.SubScope("buildsignal_controller"), store: store, + buildRunner: buildRunner, registry: registry, topicKey: topicKey, consumerGroup: consumerGroup, } } -// Process processes a build signal delivery from the queue. -// Deserializes the build and publishes a batch result to the speculate topic. -// Returns nil to ack (success), or error to nack (retry). +// Process polls the build's current status, persists it, publishes the +// batch ID to speculate so the state machine re-evaluates, and re-publishes +// a delayed message back to this topic when the build is still in flight. +// Returns nil to ack (success), or error to nack/reject. +// +// Error classification: deserialize, the Build load, Status, UpdateStatus, and the speculate +// publish stay non-retryable — they reject straight to DLQ on the first +// failure, where the operational republish path is the recovery mechanism. +// Only the PublishAfter self-reschedule is retryable: it is the poll loop's +// heartbeat and runs only after status/persist/speculate have all succeeded, +// so a transient enqueue blip nacks and replays (up to MaxAttempts) rather +// than silently stalling the build, then still falls through to DLQ if it +// persists. 
func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (retErr error) { const opName = "process" @@ -72,50 +109,117 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (r msg := delivery.Message() - // Deserialize build entity - build, err := entity.BuildFromBytes(msg.Payload) + buildID, err := entity.BuildIDFromBytes(msg.Payload) if err != nil { metrics.NamedCounter(c.metricsScope, opName, "deserialize_errors", 1) - // Non-retryable: malformed messages will never succeed regardless of retry count - return fmt.Errorf("failed to deserialize build: %w", err) + // Non-retryable: malformed messages will never succeed. + return fmt.Errorf("failed to deserialize build ID: %w", err) + } + + // Only the build ID travels on the queue; load the full Build from + // storage, which is the single source of truth for its BatchID and the + // snapshot the poll loop updates. + build, err := c.store.GetBuildStore().Get(ctx, buildID.ID) + if err != nil { + metrics.NamedCounter(c.metricsScope, opName, "storage_errors", 1) + return fmt.Errorf("failed to get build %s: %w", buildID.ID, err) } - c.logger.Infow("received build signal event", + c.logger.Debugw("polling build status", "build_id", build.ID, "batch_id", build.BatchID, - "status", string(build.Status), "attempt", delivery.Attempt(), "partition_key", msg.PartitionKey, ) - // TODO: Add build signal processing logic - // - Evaluate build result (pass/fail) - // - Update batch state based on build outcome - - // Fetch batch from storage to get the partition key (queue) - batch, err := c.store.GetBatchStore().Get(ctx, build.BatchID) + status, _, err := c.buildRunner.Status(ctx, buildID) if err != nil { + metrics.NamedCounter(c.metricsScope, opName, "status_errors", 1) + return fmt.Errorf("failed to get status for build %s: %w", buildID.ID, err) + } + + build.Status = status + + if err := c.store.GetBuildStore().UpdateStatus(ctx, build.ID, status); err != nil { 
metrics.NamedCounter(c.metricsScope, opName, "storage_errors", 1) - return fmt.Errorf("failed to get batch %s: %w", build.BatchID, err) + return fmt.Errorf("failed to update status for build %s: %w", build.ID, err) } - // Publish batch to speculate topic - if err := c.publish(ctx, consumer.TopicKeySpeculate, batch.ID, batch.Queue); err != nil { + // Re-evaluate the batch state machine with the latest build status. + if err := c.publishBatchID(ctx, consumer.TopicKeySpeculate, build.BatchID, msg.PartitionKey); err != nil { metrics.NamedCounter(c.metricsScope, opName, "publish_errors", 1) return fmt.Errorf("failed to publish to speculate: %w", err) } - c.logger.Infow("published batch to speculate", + if status.IsTerminal() { + metrics.NamedCounter(c.metricsScope, opName, "terminal", 1, metrics.NewTag("status", string(status))) + c.logger.Infow("build reached terminal status", + "build_id", build.ID, + "batch_id", build.BatchID, + "status", string(status), + ) + return nil + } + + delayMs := pollDelay(status) + metrics.NamedCounter(c.metricsScope, opName, "rescheduled", 1, metrics.NewTag("status", string(status))) + if err := c.publishBuild(ctx, c.topicKey, build, delayMs); err != nil { + metrics.NamedCounter(c.metricsScope, opName, "publish_errors", 1) + // Retryable: this is the poll loop's heartbeat. A transient enqueue + // failure should nack and replay rather than DLQ the only message + // keeping this build's status loop alive. + return errs.NewRetryableError(fmt.Errorf("failed to re-publish to buildsignal: %w", err)) + } + + c.logger.Debugw("rescheduled build status poll", "build_id", build.ID, - "batch_id", batch.ID, - "topic_key", consumer.TopicKeySpeculate, + "status", string(status), + "delay_ms", delayMs, ) + return nil +} - return nil // Success - message will be acked +// pollDelay returns the delay before the next Status call for a non-terminal status. 
+func pollDelay(status entity.BuildStatus) int64 { + switch status { + case entity.BuildStatusRunning: + return PollDelayRunningMs + default: + // Accepted and any unknown non-terminal state. + return PollDelayAcceptedMs + } +} + +// publishBuild publishes a build's ID to the topic identified by key. delayMs > 0 +// uses PublishAfter; otherwise it uses Publish. Only the identifier travels on +// the queue — the consumer reloads the full Build from storage. +func (c *Controller) publishBuild(ctx context.Context, key consumer.TopicKey, build entity.Build, delayMs int64) error { + payload, err := entity.BuildID{ID: build.ID}.ToBytes() + if err != nil { + return fmt.Errorf("failed to serialize build ID: %w", err) + } + + msg := entityqueue.NewMessage(build.ID, payload, build.BatchID, nil) + + q, ok := c.registry.Queue(key) + if !ok { + return fmt.Errorf("no queue registered for topic key %s", key) + } + + topicName, ok := c.registry.TopicName(key) + if !ok { + return fmt.Errorf("no topic name registered for topic key %s", key) + } + + publisher := q.Publisher() + if delayMs > 0 { + return publisher.PublishAfter(ctx, topicName, msg, delayMs) + } + return publisher.Publish(ctx, topicName, msg) } -// publish publishes a batch ID to the specified topic key. -func (c *Controller) publish(ctx context.Context, key consumer.TopicKey, batchID string, partitionKey string) error { +// publishBatchID publishes a batch ID to the topic identified by key. 
+func (c *Controller) publishBatchID(ctx context.Context, key consumer.TopicKey, batchID string, partitionKey string) error { bid := entity.BatchID{ID: batchID} payload, err := bid.ToBytes() if err != nil { diff --git a/orchestrator/controller/buildsignal/buildsignal_test.go b/orchestrator/controller/buildsignal/buildsignal_test.go index fed0cb40..08498120 100644 --- a/orchestrator/controller/buildsignal/buildsignal_test.go +++ b/orchestrator/controller/buildsignal/buildsignal_test.go @@ -16,7 +16,7 @@ package buildsignal import ( "context" - "fmt" + "errors" "testing" "github.com/stretchr/testify/assert" @@ -25,130 +25,251 @@ import ( "github.com/uber/submitqueue/core/consumer" "github.com/uber/submitqueue/core/errs" "github.com/uber/submitqueue/entity" - "github.com/uber/submitqueue/entity/queue" + entityqueue "github.com/uber/submitqueue/entity/queue" + buildrunnermock "github.com/uber/submitqueue/extension/buildrunner/mock" queuemock "github.com/uber/submitqueue/extension/queue/mock" storagemock "github.com/uber/submitqueue/extension/storage/mock" "go.uber.org/mock/gomock" "go.uber.org/zap/zaptest" ) -// newMockStorage creates a MockStorage with a MockBatchStore that returns a batch for the given batchID. -func newMockStorage(ctrl *gomock.Controller, batchID string) *storagemock.MockStorage { - mockBatchStore := storagemock.NewMockBatchStore(ctrl) - mockBatchStore.EXPECT().Get(gomock.Any(), batchID).Return(entity.Batch{ - ID: batchID, - Queue: "test-queue", - }, nil).AnyTimes() - - store := storagemock.NewMockStorage(ctrl) - store.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() - return store +// testHarness wires a Controller against mock queues for two topic keys +// (buildsignal and speculate) so individual tests can assert which +// Publish / PublishAfter happens. 
+type testHarness struct { + controller *Controller + br *buildrunnermock.MockBuildRunner + buildStore *storagemock.MockBuildStore + signalPub *queuemock.MockPublisher + speculatePub *queuemock.MockPublisher } -// newTestController creates a controller with test dependencies. -func newTestController(t *testing.T, ctrl *gomock.Controller, store *storagemock.MockStorage, publishErr error) *Controller { - logger := zaptest.NewLogger(t).Sugar() - scope := tally.NoopScope +func newTestHarness(t *testing.T, ctrl *gomock.Controller) *testHarness { + br := buildrunnermock.NewMockBuildRunner(ctrl) - mockPub := queuemock.NewMockPublisher(ctrl) - mockPub.EXPECT().Publish(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( - func(ctx context.Context, topic string, msg queue.Message) error { - return publishErr - }, - ).AnyTimes() + signalPub := queuemock.NewMockPublisher(ctrl) + signalQ := queuemock.NewMockQueue(ctrl) + signalQ.EXPECT().Publisher().Return(signalPub).AnyTimes() - mockQ := queuemock.NewMockQueue(ctrl) - mockQ.EXPECT().Publisher().Return(mockPub).AnyTimes() + speculatePub := queuemock.NewMockPublisher(ctrl) + speculateQ := queuemock.NewMockQueue(ctrl) + speculateQ.EXPECT().Publisher().Return(speculatePub).AnyTimes() - registry, err := consumer.NewTopicRegistry( - []consumer.TopicConfig{{Key: consumer.TopicKeySpeculate, Name: "speculate", Queue: mockQ}}, - ) + registry, err := consumer.NewTopicRegistry([]consumer.TopicConfig{ + {Key: consumer.TopicKeyBuildSignal, Name: "buildsignal", Queue: signalQ}, + {Key: consumer.TopicKeySpeculate, Name: "speculate", Queue: speculateQ}, + }) require.NoError(t, err) - return NewController(logger, scope, store, registry, consumer.TopicKeyBuildSignal, "orchestrator-buildsignal") + buildStore := storagemock.NewMockBuildStore(ctrl) + store := storagemock.NewMockStorage(ctrl) + store.EXPECT().GetBuildStore().Return(buildStore).AnyTimes() + + c := NewController( + zaptest.NewLogger(t).Sugar(), + tally.NoopScope, + store, + br, + 
registry, + consumer.TopicKeyBuildSignal, + "orchestrator-buildsignal", + ) + return &testHarness{ + controller: c, + br: br, + buildStore: buildStore, + signalPub: signalPub, + speculatePub: speculatePub, + } +} + +// buildDelivery builds a delivery whose payload is the build's ID, matching +// the on-queue contract: only the identifier travels, the consumer loads the +// full Build from storage. +func buildDelivery(t *testing.T, ctrl *gomock.Controller, b entity.Build) consumer.Delivery { + t.Helper() + payload, err := entity.BuildID{ID: b.ID}.ToBytes() + require.NoError(t, err) + msg := entityqueue.NewMessage(b.ID, payload, b.BatchID, nil) + d := queuemock.NewMockDelivery(ctrl) + d.EXPECT().Message().Return(msg).AnyTimes() + d.EXPECT().Attempt().Return(1).AnyTimes() + return d } -func TestNewController(t *testing.T) { +func TestController_Identity(t *testing.T) { ctrl := gomock.NewController(t) - store := newMockStorage(ctrl, "test-queue/batch/1") - controller := newTestController(t, ctrl, store, nil) + h := newTestHarness(t, ctrl) - require.NotNil(t, controller) - assert.Equal(t, consumer.TopicKeyBuildSignal, controller.TopicKey()) - assert.Equal(t, "orchestrator-buildsignal", controller.ConsumerGroup()) - assert.Equal(t, "buildsignal", controller.Name()) + assert.Equal(t, "buildsignal", h.controller.Name()) + assert.Equal(t, consumer.TopicKeyBuildSignal, h.controller.TopicKey()) + assert.Equal(t, "orchestrator-buildsignal", h.controller.ConsumerGroup()) + + var _ consumer.Controller = h.controller } -func TestController_Process_Success(t *testing.T) { - ctrl := gomock.NewController(t) +// TestController_Process_Terminal verifies a terminal poll persists the +// status, publishes the batch ID to speculate, and does NOT re-publish to +// buildsignal. 
+func TestController_Process_Terminal(t *testing.T) { + tests := []struct { + name string + status entity.BuildStatus + }{ + {"succeeded", entity.BuildStatusSucceeded}, + {"failed", entity.BuildStatusFailed}, + {"cancelled", entity.BuildStatusCancelled}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctrl := gomock.NewController(t) + h := newTestHarness(t, ctrl) + + build := entity.Build{ID: "b-1", BatchID: "batch-1", Status: entity.BuildStatusAccepted} - store := newMockStorage(ctrl, "test-queue/batch/1") - controller := newTestController(t, ctrl, store, nil) + h.buildStore.EXPECT().Get(gomock.Any(), build.ID).Return(build, nil) + h.br.EXPECT().Status(gomock.Any(), entity.BuildID{ID: build.ID}).Return(tt.status, entity.BuildMetadata{}, nil) + h.buildStore.EXPECT().UpdateStatus(gomock.Any(), build.ID, tt.status).Return(nil) + h.speculatePub.EXPECT(). + Publish(gomock.Any(), "speculate", gomock.AssignableToTypeOf(entityqueue.Message{})). + DoAndReturn(func(_ context.Context, _ string, msg entityqueue.Message) error { + bid, err := entity.BatchIDFromBytes(msg.Payload) + require.NoError(t, err) + assert.Equal(t, build.BatchID, bid.ID) + return nil + }).Times(1) + // No PublishAfter expected on terminal. - build := entity.Build{ - ID: "build-123", - BatchID: "test-queue/batch/1", - Status: entity.BuildStatusAccepted, + err := h.controller.Process(context.Background(), buildDelivery(t, ctrl, build)) + require.NoError(t, err) + }) } +} - payload, err := build.ToBytes() - require.NoError(t, err) +// TestController_Process_NonTerminal verifies a non-terminal poll persists +// the status, publishes to speculate, AND re-publishes to buildsignal via +// PublishAfter with the per-status delay. 
+func TestController_Process_NonTerminal(t *testing.T) { + tests := []struct { + name string + status entity.BuildStatus + wantDelayMs int64 + }{ + {"accepted uses accepted delay", entity.BuildStatusAccepted, PollDelayAcceptedMs}, + {"running uses running delay", entity.BuildStatusRunning, PollDelayRunningMs}, + } - msg := queue.NewMessage("build-123", payload, "test-queue/batch/1", nil) - delivery := queuemock.NewMockDelivery(ctrl) - delivery.EXPECT().Message().Return(msg).AnyTimes() - delivery.EXPECT().Attempt().Return(1).AnyTimes() + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctrl := gomock.NewController(t) + h := newTestHarness(t, ctrl) - err = controller.Process(context.Background(), delivery) - require.NoError(t, err) + build := entity.Build{ID: "b-2", BatchID: "batch-2", Status: entity.BuildStatusAccepted} + + h.buildStore.EXPECT().Get(gomock.Any(), build.ID).Return(build, nil) + h.br.EXPECT().Status(gomock.Any(), entity.BuildID{ID: build.ID}).Return(tt.status, entity.BuildMetadata{}, nil) + h.buildStore.EXPECT().UpdateStatus(gomock.Any(), build.ID, tt.status).Return(nil) + h.speculatePub.EXPECT().Publish(gomock.Any(), "speculate", gomock.Any()).Return(nil).Times(1) + h.signalPub.EXPECT(). + PublishAfter(gomock.Any(), "buildsignal", gomock.AssignableToTypeOf(entityqueue.Message{}), tt.wantDelayMs). + DoAndReturn(func(_ context.Context, _ string, msg entityqueue.Message, _ int64) error { + bid, err := entity.BuildIDFromBytes(msg.Payload) + require.NoError(t, err) + // Re-published payload carries only the build ID. 
+ assert.Equal(t, build.ID, bid.ID) + return nil + }).Times(1) + + err := h.controller.Process(context.Background(), buildDelivery(t, ctrl, build)) + require.NoError(t, err) + }) + } } -func TestController_Process_InvalidJSON(t *testing.T) { +func TestController_Process_StatusError(t *testing.T) { ctrl := gomock.NewController(t) + h := newTestHarness(t, ctrl) + + build := entity.Build{ID: "b-3", BatchID: "batch-3", Status: entity.BuildStatusAccepted} + + h.buildStore.EXPECT().Get(gomock.Any(), build.ID).Return(build, nil) + h.br.EXPECT().Status(gomock.Any(), entity.BuildID{ID: build.ID}).Return(entity.BuildStatusUnknown, nil, errors.New("provider down")) + // No UpdateStatus, no Publish, no PublishAfter expected. + + err := h.controller.Process(context.Background(), buildDelivery(t, ctrl, build)) + require.Error(t, err) + // Non-retryable: rejects to DLQ on first failure; republish is the recovery path. + assert.False(t, errs.IsRetryable(err)) +} - store := newMockStorage(ctrl, "test-queue/batch/1") - controller := newTestController(t, ctrl, store, nil) +func TestController_Process_UpdateStatusError(t *testing.T) { + ctrl := gomock.NewController(t) + h := newTestHarness(t, ctrl) - invalidPayload := []byte(`{"invalid": json"}`) - msg := queue.NewMessage("invalid-msg", invalidPayload, "partition1", nil) - delivery := queuemock.NewMockDelivery(ctrl) - delivery.EXPECT().Message().Return(msg).AnyTimes() - delivery.EXPECT().Attempt().Return(1).AnyTimes() + build := entity.Build{ID: "b-4", BatchID: "batch-4", Status: entity.BuildStatusAccepted} - err := controller.Process(context.Background(), delivery) + h.buildStore.EXPECT().Get(gomock.Any(), build.ID).Return(build, nil) + h.br.EXPECT().Status(gomock.Any(), entity.BuildID{ID: build.ID}).Return(entity.BuildStatusRunning, nil, nil) + h.buildStore.EXPECT().UpdateStatus(gomock.Any(), build.ID, entity.BuildStatusRunning). + Return(errors.New("db unreachable")) + // No Publish / PublishAfter expected after the store failure. 
+ err := h.controller.Process(context.Background(), buildDelivery(t, ctrl, build)) require.Error(t, err) + // Non-retryable: rejects to DLQ on first failure; republish is the recovery path. assert.False(t, errs.IsRetryable(err)) } -func TestController_Process_PublishFailure(t *testing.T) { +// TestController_Process_RepublishError verifies that a failure to re-publish +// the delayed poll message is retryable: the re-schedule is the loop's +// heartbeat, so it should nack and replay rather than reject straight to DLQ. +// The preceding status/persist/speculate steps all succeed. +func TestController_Process_RepublishError(t *testing.T) { ctrl := gomock.NewController(t) + h := newTestHarness(t, ctrl) - store := newMockStorage(ctrl, "test-queue/batch/2") - controller := newTestController(t, ctrl, store, fmt.Errorf("publish failed")) + build := entity.Build{ID: "b-5", BatchID: "batch-5", Status: entity.BuildStatusAccepted} - build := entity.Build{ - ID: "build-456", - BatchID: "test-queue/batch/2", - Status: entity.BuildStatusRunning, - } + h.buildStore.EXPECT().Get(gomock.Any(), build.ID).Return(build, nil) + h.br.EXPECT().Status(gomock.Any(), entity.BuildID{ID: build.ID}).Return(entity.BuildStatusRunning, entity.BuildMetadata{}, nil) + h.buildStore.EXPECT().UpdateStatus(gomock.Any(), build.ID, entity.BuildStatusRunning).Return(nil) + h.speculatePub.EXPECT().Publish(gomock.Any(), "speculate", gomock.Any()).Return(nil).Times(1) + h.signalPub.EXPECT(). + PublishAfter(gomock.Any(), "buildsignal", gomock.Any(), PollDelayRunningMs). + Return(errors.New("queue unavailable")).Times(1) - payload, err := build.ToBytes() - require.NoError(t, err) + err := h.controller.Process(context.Background(), buildDelivery(t, ctrl, build)) + require.Error(t, err) + assert.True(t, errs.IsRetryable(err)) +} + +// TestController_Process_GetError verifies that a failure to load the Build +// from storage (only the ID is on the queue) surfaces an error. 
Non-retryable: +// it rejects to DLQ on first failure, consistent with other storage reads. +func TestController_Process_GetError(t *testing.T) { + ctrl := gomock.NewController(t) + h := newTestHarness(t, ctrl) - msg := queue.NewMessage(build.ID, payload, build.BatchID, nil) - delivery := queuemock.NewMockDelivery(ctrl) - delivery.EXPECT().Message().Return(msg).AnyTimes() - delivery.EXPECT().Attempt().Return(1).AnyTimes() + build := entity.Build{ID: "b-6", BatchID: "batch-6", Status: entity.BuildStatusAccepted} - err = controller.Process(context.Background(), delivery) - assert.Error(t, err) + h.buildStore.EXPECT().Get(gomock.Any(), build.ID).Return(entity.Build{}, errors.New("db unreachable")) + // No Status / UpdateStatus / Publish expected once the load fails. + + err := h.controller.Process(context.Background(), buildDelivery(t, ctrl, build)) + require.Error(t, err) + assert.False(t, errs.IsRetryable(err)) } -func TestController_InterfaceImplementation(t *testing.T) { +func TestController_Process_MalformedPayload(t *testing.T) { ctrl := gomock.NewController(t) - store := newMockStorage(ctrl, "test-queue/batch/1") - controller := newTestController(t, ctrl, store, nil) + h := newTestHarness(t, ctrl) - var _ consumer.Controller = controller + msg := entityqueue.NewMessage("bad", []byte(`{"invalid"`), "batch-bad", nil) + d := queuemock.NewMockDelivery(ctrl) + d.EXPECT().Message().Return(msg).AnyTimes() + d.EXPECT().Attempt().Return(1).AnyTimes() + + err := h.controller.Process(context.Background(), d) + require.Error(t, err) }