Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@ load("@gazelle//:def.bzl", "gazelle")

# gazelle:prefix github.com/uber/submitqueue

# Exclude nested worktrees (created under .claude/worktrees) so gazelle does not
# index them as duplicate rule definitions and corrupt the canonical BUILD files.
# gazelle:exclude .claude

# Resolve protobuf import ambiguities - use the actual protopb packages, not the proto aliases
# gazelle:resolve go github.com/uber/submitqueue/gateway/protopb //gateway/protopb
# gazelle:resolve go github.com/uber/submitqueue/orchestrator/protopb //orchestrator/protopb
Expand Down
6 changes: 5 additions & 1 deletion core/consumer/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,11 @@ const (
TopicKeySpeculate TopicKey = "speculate"
// TopicKeyBuild is the pipeline stage where speculated batches are published for builds.
TopicKeyBuild TopicKey = "build"
// TopicKeyBuildSignal is the pipeline stage where builds are published for build signal processing.
// TopicKeyBuildSignal is the polling stage for triggered builds. Each
// message carries a Build; the consumer calls BuildRunner.Status,
// persists the latest status, publishes the batch ID to TopicKeySpeculate
// so the state machine re-evaluates, and re-publishes itself via
// PublishAfter when the build has not yet reached a terminal state.
TopicKeyBuildSignal TopicKey = "buildsignal"
// TopicKeyMerge is the pipeline stage where speculated batches are published for merging.
TopicKeyMerge TopicKey = "merge"
Expand Down
2 changes: 2 additions & 0 deletions example/server/orchestrator/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ go_library(
"//core/consumer",
"//core/httpclient",
"//entity",
"//extension/buildrunner",
"//extension/buildrunner/noop",
"//extension/changeprovider",
"//extension/changeprovider/github",
"//extension/changestore",
Expand Down
20 changes: 14 additions & 6 deletions example/server/orchestrator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ import (
"github.com/uber/submitqueue/core/consumer"
"github.com/uber/submitqueue/core/httpclient"
"github.com/uber/submitqueue/entity"
"github.com/uber/submitqueue/extension/buildrunner"
buildnoop "github.com/uber/submitqueue/extension/buildrunner/noop"
"github.com/uber/submitqueue/extension/changeprovider"
githubprovider "github.com/uber/submitqueue/extension/changeprovider/github"
"github.com/uber/submitqueue/extension/changestore"
Expand Down Expand Up @@ -216,8 +218,12 @@ func run() error {
return fmt.Errorf("failed to create pusher: %w", err)
}

// Create build runner. The noop runner is the pass-through default
// (every build immediately succeeds) until a real backend is wired in.
br := buildnoop.New()

// Register controllers
if err := registerControllers(c, logger.Sugar(), scope, registry, mc, cp, psh, cnt, store, changeStore); err != nil {
if err := registerControllers(c, logger.Sugar(), scope, registry, mc, cp, psh, br, cnt, store, changeStore); err != nil {
return err
}

Expand Down Expand Up @@ -397,12 +403,12 @@ func newTopicRegistry(q extqueue.Queue, subscriberName string) (consumer.TopicRe
// Pipeline:
//
// request → validate → batch → score → speculate → build → buildsignal ─┐
// ↑ ↘
// │ merge → conclude
// │ │
// └────────┴───────────────────────
// ↑ ↘ ↻ poll
// │ merge → conclude │
// │ │ │
// └────────┴───────────────────────┘

func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope tally.Scope, registry consumer.TopicRegistry, mc mergechecker.MergeChecker, cp changeprovider.ChangeProvider, psh pusher.Pusher, cnt counter.Counter, store storage.Storage, changeStore changestore.ChangeStore) error {
func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope tally.Scope, registry consumer.TopicRegistry, mc mergechecker.MergeChecker, cp changeprovider.ChangeProvider, psh pusher.Pusher, br buildrunner.BuildRunner, cnt counter.Counter, store storage.Storage, changeStore changestore.ChangeStore) error {
requestController := start.NewController(
logger,
scope,
Expand Down Expand Up @@ -488,6 +494,7 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t
logger,
scope,
store,
br,
registry,
consumer.TopicKeyBuild,
"orchestrator-build",
Expand All @@ -500,6 +507,7 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t
logger,
scope,
store,
br,
registry,
consumer.TopicKeyBuildSignal,
"orchestrator-buildsignal",
Expand Down
5 changes: 5 additions & 0 deletions orchestrator/controller/build/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ go_library(
"//core/metrics",
"//entity",
"//entity/queue",
"//extension/buildrunner",
"//extension/storage",
"@com_github_uber_go_tally_v4//:tally",
"@org_uber_go_zap//:zap",
Expand All @@ -25,7 +26,11 @@ go_test(
"//core/errs",
"//entity",
"//entity/queue",
"//extension/buildrunner",
"//extension/buildrunner/mock",
"//extension/buildrunner/noop",
"//extension/queue/mock",
"//extension/storage",
"//extension/storage/mock",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
Expand Down
80 changes: 70 additions & 10 deletions orchestrator/controller/build/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@ package build

import (
"context"
"errors"
"fmt"

"github.com/uber-go/tally/v4"
"github.com/uber/submitqueue/core/consumer"
"github.com/uber/submitqueue/core/metrics"
"github.com/uber/submitqueue/entity"
entityqueue "github.com/uber/submitqueue/entity/queue"
"github.com/uber/submitqueue/extension/buildrunner"
"github.com/uber/submitqueue/extension/storage"
"go.uber.org/zap"
)
Expand All @@ -34,6 +36,7 @@ type Controller struct {
logger *zap.SugaredLogger
metricsScope tally.Scope
store storage.Storage
buildRunner buildrunner.BuildRunner
registry consumer.TopicRegistry
topicKey consumer.TopicKey
consumerGroup string
Expand All @@ -47,6 +50,7 @@ func NewController(
logger *zap.SugaredLogger,
scope tally.Scope,
store storage.Storage,
buildRunner buildrunner.BuildRunner,
registry consumer.TopicRegistry,
topicKey consumer.TopicKey,
consumerGroup string,
Expand All @@ -55,6 +59,7 @@ func NewController(
logger: logger.Named("build_controller"),
metricsScope: scope.SubScope("build_controller"),
store: store,
buildRunner: buildRunner,
registry: registry,
topicKey: topicKey,
consumerGroup: consumerGroup,
Expand Down Expand Up @@ -95,17 +100,45 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (r
"partition_key", msg.PartitionKey,
)

// TODO: Add build logic
// - Trigger CI build
// - Track build status
// Assemble base (dependency batches in order) and head (this batch).
base, err := c.collectChanges(ctx, batch.Dependencies)
if err != nil {
metrics.NamedCounter(c.metricsScope, opName, "storage_errors", 1)
return fmt.Errorf("failed to assemble base changes for batch %s: %w", batch.ID, err)
}
head, err := c.collectChanges(ctx, []string{batch.ID})
if err != nil {
metrics.NamedCounter(c.metricsScope, opName, "storage_errors", 1)
return fmt.Errorf("failed to assemble head changes for batch %s: %w", batch.ID, err)
}

// Trigger the build with the configured build manager. metadata is nil
// until a caller-supplied source materializes (e.g. requester / ticket
// pulled off the originating LandRequest).
buildID, err := c.buildRunner.Trigger(ctx, batch.Queue, base, head, nil)
if err != nil {
metrics.NamedCounter(c.metricsScope, opName, "trigger_errors", 1)
return fmt.Errorf("failed to trigger build for batch %s: %w", batch.ID, err)
}

build := entity.Build{
ID: batch.ID,
BatchID: batch.ID,
Status: entity.BuildStatusAccepted,
ID: buildID.ID,
BatchID: batch.ID,
SpeculationPath: entity.SpeculationPathInfo{Base: append([]string{}, batch.Dependencies...)},
Status: entity.BuildStatusAccepted,
}

// Persist the initial Build snapshot so the buildsignal poll loop has a
// row to UpdateStatus against. ErrAlreadyExists is benign — a redelivery
// of this message after a previous successful Create.
if err := c.store.GetBuildStore().Create(ctx, build); err != nil && !errors.Is(err, storage.ErrAlreadyExists) {
metrics.NamedCounter(c.metricsScope, opName, "storage_errors", 1)
return fmt.Errorf("failed to persist build %s: %w", build.ID, err)
}

// Publish build to build signal topic
// Hand off to the buildsignal poll loop; it calls Status, updates the
// persisted Build, publishes to speculate, and re-publishes itself via
// PublishAfter until terminal.
if err := c.publish(ctx, consumer.TopicKeyBuildSignal, build); err != nil {
metrics.NamedCounter(c.metricsScope, opName, "publish_errors", 1)
return fmt.Errorf("failed to publish to buildsignal: %w", err)
Expand All @@ -114,17 +147,44 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (r
c.logger.Infow("published build to buildsignal",
"batch_id", batch.ID,
"build_id", build.ID,
"status", string(build.Status),
"topic_key", consumer.TopicKeyBuildSignal,
)

return nil // Success - message will be acked
}

// publish publishes a build to the specified topic key.
// collectChanges loads each batch by ID and concatenates the Change values
// from its contained requests in batch order. Used to build the base
// (dependency batches) and head (this batch) inputs to BuildRunner.Trigger.
func (c *Controller) collectChanges(ctx context.Context, batchIDs []string) ([]entity.Change, error) {
if len(batchIDs) == 0 {
return nil, nil
}
var changes []entity.Change
for _, bID := range batchIDs {
b, err := c.store.GetBatchStore().Get(ctx, bID)
if err != nil {
return nil, fmt.Errorf("failed to get batch %s: %w", bID, err)
}
for _, reqID := range b.Contains {
req, err := c.store.GetRequestStore().Get(ctx, reqID)
if err != nil {
return nil, fmt.Errorf("failed to get request %s for batch %s: %w", reqID, bID, err)
}
changes = append(changes, req.Change)
}
}
return changes, nil
}

// publish publishes a build's ID to the specified topic key. Only the
// identifier travels on the queue; the consumer loads the full Build from
// storage, keeping the message small and the store the single source of truth.
func (c *Controller) publish(ctx context.Context, key consumer.TopicKey, build entity.Build) error {
payload, err := build.ToBytes()
payload, err := entity.BuildID{ID: build.ID}.ToBytes()
if err != nil {
return fmt.Errorf("failed to serialize build: %w", err)
return fmt.Errorf("failed to serialize build ID: %w", err)
}

msg := entityqueue.NewMessage(build.ID, payload, build.BatchID, nil)
Expand Down
Loading
Loading