Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions example/server/orchestrator/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ go_library(
"//core/consumer",
"//core/httpclient",
"//entity",
"//extension/build",
"//extension/build/noop",
"//extension/changeprovider",
"//extension/changeprovider/github",
"//extension/changestore",
Expand Down
11 changes: 9 additions & 2 deletions example/server/orchestrator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ import (
"github.com/uber/submitqueue/core/consumer"
"github.com/uber/submitqueue/core/httpclient"
"github.com/uber/submitqueue/entity"
buildext "github.com/uber/submitqueue/extension/build"
buildnoop "github.com/uber/submitqueue/extension/build/noop"
"github.com/uber/submitqueue/extension/changeprovider"
githubprovider "github.com/uber/submitqueue/extension/changeprovider/github"
"github.com/uber/submitqueue/extension/changestore"
Expand Down Expand Up @@ -216,8 +218,12 @@ func run() error {
return fmt.Errorf("failed to create pusher: %w", err)
}

// Create build manager. The noop manager is the pass-through default
// (every build immediately succeeds) until a real provider is wired in.
bm := buildnoop.New()

// Register controllers
if err := registerControllers(c, logger.Sugar(), scope, registry, mc, cp, psh, cnt, store, changeStore); err != nil {
if err := registerControllers(c, logger.Sugar(), scope, registry, mc, cp, psh, bm, cnt, store, changeStore); err != nil {
return err
}

Expand Down Expand Up @@ -402,7 +408,7 @@ func newTopicRegistry(q extqueue.Queue, subscriberName string) (consumer.TopicRe
// │ │ │
// └────────┴────────────────────────┘

func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope tally.Scope, registry consumer.TopicRegistry, mc mergechecker.MergeChecker, cp changeprovider.ChangeProvider, psh pusher.Pusher, cnt counter.Counter, store storage.Storage, changeStore changestore.ChangeStore) error {
func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope tally.Scope, registry consumer.TopicRegistry, mc mergechecker.MergeChecker, cp changeprovider.ChangeProvider, psh pusher.Pusher, bm buildext.BuildManager, cnt counter.Counter, store storage.Storage, changeStore changestore.ChangeStore) error {
requestController := start.NewController(
logger,
scope,
Expand Down Expand Up @@ -488,6 +494,7 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t
logger,
scope,
store,
bm,
registry,
consumer.TopicKeyBuild,
"orchestrator-build",
Expand Down
4 changes: 4 additions & 0 deletions orchestrator/controller/build/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ go_library(
"//core/metrics",
"//entity",
"//entity/queue",
"//extension/build",
"//extension/storage",
"@com_github_uber_go_tally_v4//:tally",
"@org_uber_go_zap//:zap",
Expand All @@ -25,6 +26,9 @@ go_test(
"//core/errs",
"//entity",
"//entity/queue",
"//extension/build",
"//extension/build/mock",
"//extension/build/noop",
"//extension/queue/mock",
"//extension/storage/mock",
"@com_github_stretchr_testify//assert",
Expand Down
32 changes: 27 additions & 5 deletions orchestrator/controller/build/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/uber/submitqueue/core/metrics"
"github.com/uber/submitqueue/entity"
entityqueue "github.com/uber/submitqueue/entity/queue"
"github.com/uber/submitqueue/extension/build"
"github.com/uber/submitqueue/extension/storage"
"go.uber.org/zap"
)
Expand All @@ -34,6 +35,7 @@ type Controller struct {
logger *zap.SugaredLogger
metricsScope tally.Scope
store storage.Storage
buildManager build.BuildManager
registry consumer.TopicRegistry
topicKey consumer.TopicKey
consumerGroup string
Expand All @@ -47,6 +49,7 @@ func NewController(
logger *zap.SugaredLogger,
scope tally.Scope,
store storage.Storage,
buildManager build.BuildManager,
registry consumer.TopicRegistry,
topicKey consumer.TopicKey,
consumerGroup string,
Expand All @@ -55,6 +58,7 @@ func NewController(
logger: logger.Named("build_controller"),
metricsScope: scope.SubScope("build_controller"),
store: store,
buildManager: buildManager,
registry: registry,
topicKey: topicKey,
consumerGroup: consumerGroup,
Expand Down Expand Up @@ -95,14 +99,31 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (r
"partition_key", msg.PartitionKey,
)

// TODO: Add build logic
// - Trigger CI build
// - Track build status
// Assemble the changes to build from the batch's requests.
changes := make([]entity.BuildChange, 0, len(batch.Contains))
for _, reqID := range batch.Contains {
req, err := c.store.GetRequestStore().Get(ctx, reqID)
if err != nil {
metrics.NamedCounter(c.metricsScope, opName, "storage_errors", 1)
return fmt.Errorf("failed to get request %s for batch %s: %w", reqID, batch.ID, err)
}
changes = append(changes, entity.BuildChange{
Change: req.Change,
Action: entity.ChangeActionValidate,
})
}

// Trigger the build with the configured build manager.
buildID, status, err := c.buildManager.Trigger(ctx, batch.Queue, changes)
if err != nil {
metrics.NamedCounter(c.metricsScope, opName, "trigger_errors", 1)
return fmt.Errorf("failed to trigger build for batch %s: %w", batch.ID, err)
}

build := entity.Build{
ID: batch.ID,
ID: buildID,
BatchID: batch.ID,
Status: entity.BuildStatusAccepted,
Status: status,
}

// Publish build to build signal topic
Expand All @@ -114,6 +135,7 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (r
c.logger.Infow("published build to buildsignal",
"batch_id", batch.ID,
"build_id", build.ID,
"status", string(build.Status),
"topic_key", consumer.TopicKeyBuildSignal,
)

Expand Down
111 changes: 103 additions & 8 deletions orchestrator/controller/build/build_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ import (
"github.com/uber/submitqueue/core/errs"
"github.com/uber/submitqueue/entity"
"github.com/uber/submitqueue/entity/queue"
"github.com/uber/submitqueue/extension/build"
buildmock "github.com/uber/submitqueue/extension/build/mock"
buildnoop "github.com/uber/submitqueue/extension/build/noop"
queuemock "github.com/uber/submitqueue/extension/queue/mock"
storagemock "github.com/uber/submitqueue/extension/storage/mock"
"go.uber.org/mock/gomock"
Expand Down Expand Up @@ -59,8 +62,9 @@ func newMockStorage(ctrl *gomock.Controller, batch entity.Batch) *storagemock.Mo
return store
}

// newTestController creates a controller with test dependencies.
func newTestController(t *testing.T, ctrl *gomock.Controller, store *storagemock.MockStorage, publishErr error) *Controller {
// newTestController creates a controller with test dependencies. bm is the
// build manager to inject; pass buildnoop.New() for the pass-through default.
func newTestController(t *testing.T, ctrl *gomock.Controller, store *storagemock.MockStorage, bm build.BuildManager, publishErr error) *Controller {
logger := zaptest.NewLogger(t).Sugar()
scope := tally.NoopScope

Expand All @@ -79,14 +83,14 @@ func newTestController(t *testing.T, ctrl *gomock.Controller, store *storagemock
)
require.NoError(t, err)

return NewController(logger, scope, store, registry, consumer.TopicKeyBuild, "orchestrator-build")
return NewController(logger, scope, store, bm, registry, consumer.TopicKeyBuild, "orchestrator-build")
}

func TestNewController(t *testing.T) {
ctrl := gomock.NewController(t)
batch := testBatch()
store := newMockStorage(ctrl, batch)
controller := newTestController(t, ctrl, store, nil)
controller := newTestController(t, ctrl, store, buildnoop.New(), nil)

require.NotNil(t, controller)
assert.Equal(t, consumer.TopicKeyBuild, controller.TopicKey())
Expand All @@ -99,7 +103,7 @@ func TestController_Process_Success(t *testing.T) {

batch := testBatch()
store := newMockStorage(ctrl, batch)
controller := newTestController(t, ctrl, store, nil)
controller := newTestController(t, ctrl, store, buildnoop.New(), nil)

msg := queue.NewMessage(batch.ID, batchIDPayload(t, batch.ID), batch.Queue, nil)
delivery := queuemock.NewMockDelivery(ctrl)
Expand All @@ -110,6 +114,97 @@ func TestController_Process_Success(t *testing.T) {
require.NoError(t, err)
}

// TestController_Process_TriggersBuildWithChanges verifies the controller
// assembles one BuildChange per request in the batch, triggers the build, and
// publishes a Build carrying the manager's returned ID and status.
func TestController_Process_TriggersBuildWithChanges(t *testing.T) {
ctrl := gomock.NewController(t)

batch := entity.Batch{
ID: "test-queue/batch/1",
Queue: "test-queue",
State: entity.BatchStateCreated,
Version: 1,
Contains: []string{"test-queue/1", "test-queue/2"},
}
req1 := entity.Request{ID: "test-queue/1", Change: entity.Change{URIs: []string{"github://o/r/pull/1/aaa"}}}
req2 := entity.Request{ID: "test-queue/2", Change: entity.Change{URIs: []string{"github://o/r/pull/2/bbb"}}}

mockBatchStore := storagemock.NewMockBatchStore(ctrl)
mockBatchStore.EXPECT().Get(gomock.Any(), batch.ID).Return(batch, nil)
mockRequestStore := storagemock.NewMockRequestStore(ctrl)
mockRequestStore.EXPECT().Get(gomock.Any(), req1.ID).Return(req1, nil)
mockRequestStore.EXPECT().Get(gomock.Any(), req2.ID).Return(req2, nil)
store := storagemock.NewMockStorage(ctrl)
store.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes()
store.EXPECT().GetRequestStore().Return(mockRequestStore).AnyTimes()

bm := buildmock.NewMockBuildManager(ctrl)
wantChanges := []entity.BuildChange{
{Change: req1.Change, Action: entity.ChangeActionValidate},
{Change: req2.Change, Action: entity.ChangeActionValidate},
}
bm.EXPECT().Trigger(gomock.Any(), batch.Queue, wantChanges).Return("build-xyz", entity.BuildStatusRunning, nil)

var published entity.Build
mockPub := queuemock.NewMockPublisher(ctrl)
mockPub.EXPECT().Publish(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(
func(_ context.Context, _ string, msg queue.Message) error {
b, err := entity.BuildFromBytes(msg.Payload)
require.NoError(t, err)
published = b
return nil
},
)
mockQ := queuemock.NewMockQueue(ctrl)
mockQ.EXPECT().Publisher().Return(mockPub).AnyTimes()
registry, err := consumer.NewTopicRegistry(
[]consumer.TopicConfig{{Key: consumer.TopicKeyBuildSignal, Name: "buildsignal", Queue: mockQ}},
)
require.NoError(t, err)

controller := NewController(zaptest.NewLogger(t).Sugar(), tally.NoopScope, store, bm, registry, consumer.TopicKeyBuild, "orchestrator-build")

msg := queue.NewMessage(batch.ID, batchIDPayload(t, batch.ID), batch.Queue, nil)
delivery := queuemock.NewMockDelivery(ctrl)
delivery.EXPECT().Message().Return(msg).AnyTimes()
delivery.EXPECT().Attempt().Return(1).AnyTimes()

require.NoError(t, controller.Process(context.Background(), delivery))
assert.Equal(t, "build-xyz", published.ID)
assert.Equal(t, batch.ID, published.BatchID)
assert.Equal(t, entity.BuildStatusRunning, published.Status)
}

// TestController_Process_TriggerFailure verifies a build-manager failure is
// surfaced as an error (nack) and nothing is published.
func TestController_Process_TriggerFailure(t *testing.T) {
ctrl := gomock.NewController(t)

batch := testBatch()
mockBatchStore := storagemock.NewMockBatchStore(ctrl)
mockBatchStore.EXPECT().Get(gomock.Any(), batch.ID).Return(batch, nil)
store := storagemock.NewMockStorage(ctrl)
store.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes()

bm := buildmock.NewMockBuildManager(ctrl)
bm.EXPECT().Trigger(gomock.Any(), batch.Queue, gomock.Any()).
Return("", entity.BuildStatusUnknown, fmt.Errorf("provider down"))

registry, err := consumer.NewTopicRegistry(
[]consumer.TopicConfig{{Key: consumer.TopicKeyBuildSignal, Name: "buildsignal", Queue: queuemock.NewMockQueue(ctrl)}},
)
require.NoError(t, err)
controller := NewController(zaptest.NewLogger(t).Sugar(), tally.NoopScope, store, bm, registry, consumer.TopicKeyBuild, "orchestrator-build")

msg := queue.NewMessage(batch.ID, batchIDPayload(t, batch.ID), batch.Queue, nil)
delivery := queuemock.NewMockDelivery(ctrl)
delivery.EXPECT().Message().Return(msg).AnyTimes()
delivery.EXPECT().Attempt().Return(1).AnyTimes()

require.Error(t, controller.Process(context.Background(), delivery))
}

func TestController_Process_StorageFailure(t *testing.T) {
ctrl := gomock.NewController(t)

Expand All @@ -118,7 +213,7 @@ func TestController_Process_StorageFailure(t *testing.T) {
store := storagemock.NewMockStorage(ctrl)
store.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes()

controller := newTestController(t, ctrl, store, nil)
controller := newTestController(t, ctrl, store, buildnoop.New(), nil)

msg := queue.NewMessage("test-queue/batch/1", batchIDPayload(t, "test-queue/batch/1"), "test-queue", nil)
delivery := queuemock.NewMockDelivery(ctrl)
Expand All @@ -135,7 +230,7 @@ func TestController_Process_PublishFailure(t *testing.T) {

batch := testBatch()
store := newMockStorage(ctrl, batch)
controller := newTestController(t, ctrl, store, fmt.Errorf("publish failed"))
controller := newTestController(t, ctrl, store, buildnoop.New(), fmt.Errorf("publish failed"))

msg := queue.NewMessage(batch.ID, batchIDPayload(t, batch.ID), batch.Queue, nil)
delivery := queuemock.NewMockDelivery(ctrl)
Expand All @@ -150,7 +245,7 @@ func TestController_InterfaceImplementation(t *testing.T) {
ctrl := gomock.NewController(t)
batch := testBatch()
store := newMockStorage(ctrl, batch)
controller := newTestController(t, ctrl, store, nil)
controller := newTestController(t, ctrl, store, buildnoop.New(), nil)

var _ consumer.Controller = controller
}
Loading