diff --git a/Makefile b/Makefile index b75fa5ac..241ea9d4 100644 --- a/Makefile +++ b/Makefile @@ -274,7 +274,7 @@ local-stovepipe-stop: ## Stop Stovepipe service mocks: ## Generate mock files using mockgen @echo "Generating mocks..." - @$(BAZEL) run @rules_go//go -- generate ./extension/storage/... ./extension/changestore/... ./extension/counter/... ./extension/queue/... ./extension/mergechecker/... ./extension/scorer/... ./core/consumer/... + @$(BAZEL) run @rules_go//go -- generate ./extension/storage/... ./extension/changestore/... ./extension/counter/... ./extension/queue/... ./extension/mergechecker/... ./extension/pusher/... ./extension/scorer/... ./extension/conflict/... ./core/consumer/... @echo "Mocks generated successfully!" proto: ## Generate protobuf files from .proto definitions diff --git a/example/server/orchestrator/BUILD.bazel b/example/server/orchestrator/BUILD.bazel index 698662c3..12cf41bc 100644 --- a/example/server/orchestrator/BUILD.bazel +++ b/example/server/orchestrator/BUILD.bazel @@ -18,6 +18,7 @@ go_library( "//extension/changeprovider/github", "//extension/changestore", "//extension/changestore/mysql", + "//extension/conflict/all", "//extension/counter", "//extension/counter/mysql", "//extension/mergechecker", diff --git a/example/server/orchestrator/main.go b/example/server/orchestrator/main.go index fc39eb2a..d903741e 100644 --- a/example/server/orchestrator/main.go +++ b/example/server/orchestrator/main.go @@ -37,6 +37,7 @@ import ( githubprovider "github.com/uber/submitqueue/extension/changeprovider/github" "github.com/uber/submitqueue/extension/changestore" mysqlchangestore "github.com/uber/submitqueue/extension/changestore/mysql" + "github.com/uber/submitqueue/extension/conflict/all" "github.com/uber/submitqueue/extension/counter" mysqlcounter "github.com/uber/submitqueue/extension/counter/mysql" "github.com/uber/submitqueue/extension/mergechecker" @@ -436,6 +437,9 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t registry, cnt, store, + // TODO: replace with a real conflict analyzer (e.g. one backed by + // Tango target analysis). The "all" stub serializes the queue. + all.New(), consumer.TopicKeyBatch, "orchestrator-batch", ) diff --git a/extension/conflict/BUILD.bazel b/extension/conflict/BUILD.bazel new file mode 100644 index 00000000..9792ba94 --- /dev/null +++ b/extension/conflict/BUILD.bazel @@ -0,0 +1,9 @@ +load("@rules_go//go:def.bzl", "go_library") + +go_library( + name = "conflict", + srcs = ["conflict.go"], + importpath = "github.com/uber/submitqueue/extension/conflict", + visibility = ["//visibility:public"], + deps = ["//entity"], +) diff --git a/extension/conflict/README.md b/extension/conflict/README.md new file mode 100644 index 00000000..4813f641 --- /dev/null +++ b/extension/conflict/README.md @@ -0,0 +1,46 @@ +# Conflict + +Vendor-agnostic interface for detecting conflicts between a candidate batch +and the batches already in flight. + +## Interface + +`Analyzer` exposes a single `Analyze` method that takes the candidate batch +and the list of in-flight batches it might conflict with. It returns the +subset of in-flight batches that conflict with the candidate, each paired +with a `ConflictType` describing the kind of conflict. An empty result means +the candidate is free to advance independently. + +Callers are responsible for filtering out the candidate itself and any +terminal batches from the in-flight list before invoking the analyzer. The +analyzer itself stays free of lifecycle knowledge. A non-nil error reports +an infrastructure failure of the analysis and should be treated as +retryable by the caller. + +The analyzer is intentionally pure with respect to batch state: it does not +mutate inputs, does not read storage, and may be called concurrently. Real +implementations are expected to resolve the batch contents (e.g. changed +build targets, modified files) via whichever upstream system they depend +on, and to return as much classification detail as that system supports. + +## Implementations + +- [`all/`](all/) — pessimistic stub: reports every in-flight batch as a + `ConflictTypeConservative` conflict. Useful as a worst-case baseline and + for wiring tests where speculation must serialize. +- [`none/`](none/) — optimistic stub: reports no conflicts. Useful as a + best-case baseline and for wiring tests where speculation should run all + batches in parallel. + +## Adding a new backend + +1. Create `extension/conflict/{backend}/` with an `Analyzer` implementation. +2. Resolve each `entity.Batch` into whatever signal the backend needs + (e.g. changed build targets, files touched, dependency graphs). +3. Emit one `Conflict` per (in-flight batch, detected conflict type). Pick + the most specific `ConflictType` your backend can determine; use + `ConflictTypeConservative` only when the backend cannot prove the absence + of a conflict and falls back to a pessimistic default. Introduce a new + `ConflictType` constant when you can classify the conflict more precisely. +4. Return a plain error for transient infrastructure failures so callers + can classify and retry. diff --git a/extension/conflict/all/BUILD.bazel b/extension/conflict/all/BUILD.bazel new file mode 100644 index 00000000..f66b479c --- /dev/null +++ b/extension/conflict/all/BUILD.bazel @@ -0,0 +1,24 @@ +load("@rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "all", + srcs = ["all.go"], + importpath = "github.com/uber/submitqueue/extension/conflict/all", + visibility = ["//visibility:public"], + deps = [ + "//entity", + "//extension/conflict", + ], +) + +go_test( + name = "all_test", + srcs = ["all_test.go"], + embed = [":all"], + deps = [ + "//entity", + "//extension/conflict", + "@com_github_stretchr_testify//assert", + "@com_github_stretchr_testify//require", + ], +) diff --git a/extension/conflict/all/all.go b/extension/conflict/all/all.go new file mode 100644 index 00000000..4b753151 --- /dev/null +++ b/extension/conflict/all/all.go @@ -0,0 +1,51 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package all provides a conflict.Analyzer that pessimistically reports a +// conflict against every in-flight batch. It is intended as a stub for +// wiring tests and as a worst-case baseline for speculation behavior. +package all + +import ( + "context" + + "github.com/uber/submitqueue/entity" + "github.com/uber/submitqueue/extension/conflict" +) + +// analyzer is a conflict.Analyzer that reports every in-flight batch as a +// conflict, classified as ConflictTypeConservative. +type analyzer struct{} + +// New returns a conflict.Analyzer that reports a conflict against every +// in-flight batch. +func New() conflict.Analyzer { + return analyzer{} +} + +// Analyze returns one ConflictTypeConservative Conflict per in-flight batch, +// preserving the input order. Returns an empty slice when inFlight is empty. +func (analyzer) Analyze(_ context.Context, _ entity.Batch, inFlight []entity.Batch) ([]conflict.Conflict, error) { + if len(inFlight) == 0 { + return nil, nil + } + conflicts := make([]conflict.Conflict, len(inFlight)) + for i, b := range inFlight { + conflicts[i] = conflict.Conflict{ + BatchID: b.ID, + Type: conflict.ConflictTypeConservative, + } + } + return conflicts, nil +} diff --git a/extension/conflict/all/all_test.go b/extension/conflict/all/all_test.go new file mode 100644 index 00000000..02e7f521 --- /dev/null +++ b/extension/conflict/all/all_test.go @@ -0,0 +1,68 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package all + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/uber/submitqueue/entity" + "github.com/uber/submitqueue/extension/conflict" +) + +func TestAnalyze(t *testing.T) { + batch := entity.Batch{ID: "queueA/batch/10"} + + tests := []struct { + name string + inFlight []entity.Batch + want []conflict.Conflict + }{ + { + name: "no in-flight batches yields no conflicts", + inFlight: nil, + want: nil, + }, + { + name: "empty in-flight slice yields no conflicts", + inFlight: []entity.Batch{}, + want: nil, + }, + { + name: "every in-flight batch is reported in input order", + inFlight: []entity.Batch{ + {ID: "queueA/batch/1"}, + {ID: "queueA/batch/2"}, + {ID: "queueA/batch/3"}, + }, + want: []conflict.Conflict{ + {BatchID: "queueA/batch/1", Type: conflict.ConflictTypeConservative}, + {BatchID: "queueA/batch/2", Type: conflict.ConflictTypeConservative}, + {BatchID: "queueA/batch/3", Type: conflict.ConflictTypeConservative}, + }, + }, + } + + a := New() + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := a.Analyze(context.Background(), batch, tt.inFlight) + require.NoError(t, err) + assert.Equal(t, tt.want, got) + }) + } +} diff --git a/extension/conflict/conflict.go b/extension/conflict/conflict.go new file mode 100644 index 00000000..b9c98229 --- /dev/null +++ b/extension/conflict/conflict.go @@ -0,0 +1,68 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package conflict + +//go:generate mockgen -source=conflict.go -destination=mock/conflict_mock.go -package=mock + +import ( + "context" + + "github.com/uber/submitqueue/entity" +) + +// ConflictType classifies why two batches are considered to conflict. +// New values may be added as more sophisticated analyzers are introduced. +type ConflictType string + +const ( + // ConflictTypeUnknown is the unreachable zero value, set by default when + // the structure is initialized. It should never be seen in the system. + ConflictTypeUnknown ConflictType = "" + // ConflictTypeConservative means the analyzer treated the batches as + // conflicting because it could not prove otherwise, without identifying a + // specific reason. Used by conservative analyzers that serialize + // everything by default. + ConflictTypeConservative ConflictType = "conservative" + // ConflictTypeTargetOverlap means the two batches modify one or more of + // the same build targets and may therefore interfere with each other. + ConflictTypeTargetOverlap ConflictType = "target_overlap" +) + +// Conflict reports a single conflict between the analyzed batch and one of +// the in-flight batches. +type Conflict struct { + // BatchID is the ID of the in-flight batch that conflicts with the + // analyzed batch. + BatchID string + // Type classifies the conflict. A single (analyzed, in-flight) pair may + // be reported with multiple Conflict entries when different conflict + // types apply. + Type ConflictType +} + +// Analyzer detects conflicts between a candidate batch and the batches +// already in flight, so the speculation layer can decide which batches can +// safely advance in parallel. +type Analyzer interface { + // Analyze returns the subset of inFlight batches that conflict with + // batch, each paired with the type of conflict detected. An empty + // result means batch does not conflict with any in-flight batch. + // + // Callers should not include batch itself in inFlight; terminal batches + // should be filtered out before calling. A non-nil error indicates the + // analysis itself failed (infrastructure issue) and should be treated as + // retryable by the caller. + Analyze(ctx context.Context, batch entity.Batch, inFlight []entity.Batch) ([]Conflict, error) +} diff --git a/extension/conflict/mock/BUILD.bazel b/extension/conflict/mock/BUILD.bazel new file mode 100644 index 00000000..b88453ca --- /dev/null +++ b/extension/conflict/mock/BUILD.bazel @@ -0,0 +1,13 @@ +load("@rules_go//go:def.bzl", "go_library") + +go_library( + name = "mock", + srcs = ["conflict_mock.go"], + importpath = "github.com/uber/submitqueue/extension/conflict/mock", + visibility = ["//visibility:public"], + deps = [ + "//entity", + "//extension/conflict", + "@org_uber_go_mock//gomock", + ], +) diff --git a/extension/conflict/mock/conflict_mock.go b/extension/conflict/mock/conflict_mock.go new file mode 100644 index 00000000..fbc09eb6 --- /dev/null +++ b/extension/conflict/mock/conflict_mock.go @@ -0,0 +1,58 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: conflict.go +// +// Generated by this command: +// +// mockgen -source=conflict.go -destination=mock/conflict_mock.go -package=mock +// + +// Package mock is a generated GoMock package. +package mock + +import ( + context "context" + reflect "reflect" + + entity "github.com/uber/submitqueue/entity" + conflict "github.com/uber/submitqueue/extension/conflict" + gomock "go.uber.org/mock/gomock" +) + +// MockAnalyzer is a mock of Analyzer interface. +type MockAnalyzer struct { + ctrl *gomock.Controller + recorder *MockAnalyzerMockRecorder + isgomock struct{} +} + +// MockAnalyzerMockRecorder is the mock recorder for MockAnalyzer. +type MockAnalyzerMockRecorder struct { + mock *MockAnalyzer +} + +// NewMockAnalyzer creates a new mock instance. +func NewMockAnalyzer(ctrl *gomock.Controller) *MockAnalyzer { + mock := &MockAnalyzer{ctrl: ctrl} + mock.recorder = &MockAnalyzerMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockAnalyzer) EXPECT() *MockAnalyzerMockRecorder { + return m.recorder +} + +// Analyze mocks base method. +func (m *MockAnalyzer) Analyze(ctx context.Context, batch entity.Batch, inFlight []entity.Batch) ([]conflict.Conflict, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Analyze", ctx, batch, inFlight) + ret0, _ := ret[0].([]conflict.Conflict) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Analyze indicates an expected call of Analyze. +func (mr *MockAnalyzerMockRecorder) Analyze(ctx, batch, inFlight any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Analyze", reflect.TypeOf((*MockAnalyzer)(nil).Analyze), ctx, batch, inFlight) +} diff --git a/extension/conflict/none/BUILD.bazel b/extension/conflict/none/BUILD.bazel new file mode 100644 index 00000000..0ce19267 --- /dev/null +++ b/extension/conflict/none/BUILD.bazel @@ -0,0 +1,23 @@ +load("@rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "none", + srcs = ["none.go"], + importpath = "github.com/uber/submitqueue/extension/conflict/none", + visibility = ["//visibility:public"], + deps = [ + "//entity", + "//extension/conflict", + ], +) + +go_test( + name = "none_test", + srcs = ["none_test.go"], + embed = [":none"], + deps = [ + "//entity", + "@com_github_stretchr_testify//assert", + "@com_github_stretchr_testify//require", + ], +) diff --git a/extension/conflict/none/none.go b/extension/conflict/none/none.go new file mode 100644 index 00000000..7637a404 --- /dev/null +++ b/extension/conflict/none/none.go @@ -0,0 +1,38 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package none provides a conflict.Analyzer that never reports a conflict. +// It is intended as a stub for wiring tests and as a best-case baseline for +// speculation behavior (maximum parallelism). +package none + +import ( + "context" + + "github.com/uber/submitqueue/entity" + "github.com/uber/submitqueue/extension/conflict" +) + +// analyzer is a conflict.Analyzer that always returns no conflicts. +type analyzer struct{} + +// New returns a conflict.Analyzer that never reports a conflict. +func New() conflict.Analyzer { + return analyzer{} +} + +// Analyze always returns a nil conflict slice, regardless of inputs. +func (analyzer) Analyze(_ context.Context, _ entity.Batch, _ []entity.Batch) ([]conflict.Conflict, error) { + return nil, nil +} diff --git a/extension/conflict/none/none_test.go b/extension/conflict/none/none_test.go new file mode 100644 index 00000000..a3e93695 --- /dev/null +++ b/extension/conflict/none/none_test.go @@ -0,0 +1,53 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package none + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/uber/submitqueue/entity" +) + +func TestAnalyze(t *testing.T) { + batch := entity.Batch{ID: "queueA/batch/10"} + + tests := []struct { + name string + inFlight []entity.Batch + }{ + {name: "no in-flight batches", inFlight: nil}, + {name: "empty in-flight slice", inFlight: []entity.Batch{}}, + { + name: "many in-flight batches", + inFlight: []entity.Batch{ + {ID: "queueA/batch/1"}, + {ID: "queueA/batch/2"}, + {ID: "queueA/batch/3"}, + }, + }, + } + + a := New() + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := a.Analyze(context.Background(), batch, tt.inFlight) + require.NoError(t, err) + assert.Empty(t, got) + }) + } +} diff --git a/orchestrator/controller/batch/BUILD.bazel b/orchestrator/controller/batch/BUILD.bazel index 73c66d9b..22692a1a 100644 --- a/orchestrator/controller/batch/BUILD.bazel +++ b/orchestrator/controller/batch/BUILD.bazel @@ -9,6 +9,7 @@ go_library( "//core/consumer", "//entity", "//entity/queue", + "//extension/conflict", "//extension/counter", "//extension/storage", "@com_github_uber_go_tally_v4//:tally", @@ -24,6 +25,9 @@ go_test( "//core/consumer", "//entity", "//entity/queue", + "//extension/conflict", + "//extension/conflict/all", + "//extension/conflict/mock", "//extension/counter/mock", "//extension/queue/mock", "//extension/storage/mock", diff --git a/orchestrator/controller/batch/batch.go b/orchestrator/controller/batch/batch.go index 9330c7b4..2dadde22 100644 --- a/orchestrator/controller/batch/batch.go +++ b/orchestrator/controller/batch/batch.go @@ -22,6 +22,7 @@ import ( "github.com/uber/submitqueue/core/consumer" "github.com/uber/submitqueue/entity" entityqueue "github.com/uber/submitqueue/entity/queue" + "github.com/uber/submitqueue/extension/conflict" "github.com/uber/submitqueue/extension/counter" "github.com/uber/submitqueue/extension/storage" "go.uber.org/zap" @@ -36,6 +37,7 @@ type Controller struct { registry consumer.TopicRegistry counter counter.Counter store storage.Storage + analyzer conflict.Analyzer topicKey consumer.TopicKey consumerGroup string } @@ -50,6 +52,7 @@ func NewController( registry consumer.TopicRegistry, counter counter.Counter, store storage.Storage, + analyzer conflict.Analyzer, topicKey consumer.TopicKey, consumerGroup string, ) *Controller { @@ -59,6 +62,7 @@ func NewController( registry: registry, counter: counter, store: store, + analyzer: analyzer, topicKey: topicKey, consumerGroup: consumerGroup, } @@ -112,10 +116,9 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er Version: 1, } - // TODO: run Target Analyzer to understand new batch's dependency graph, run it against other active batches to understand the conflicts. - // So far we'll just assume that the new batch conflicts with all active batches, which results in a serial non-parallelized queue. - - // Get active batches for this queue to set as dependencies. + // Get active batches for this queue and ask the conflict analyzer which + // of them the new batch must serialize behind. The dependency set drives + // the speculation graph downstream. activeBatches, err := c.store.GetBatchStore().GetByQueueAndStates(ctx, request.Queue, []entity.BatchState{ entity.BatchStateCreated, entity.BatchStateSpeculating, @@ -126,29 +129,43 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er return fmt.Errorf("failed to get active batches for queue=%s: %w", request.Queue, err) } - for _, dep := range activeBatches { - batch.Dependencies = append(batch.Dependencies, dep.ID) + // Dedupe by batch ID since a single (analyzed, in-flight) pair may be + // reported with multiple Conflict entries when different conflict types + // apply; the dependency graph only tracks the relation. + conflicts, err := c.analyzer.Analyze(ctx, batch, activeBatches) + if err != nil { + c.metricsScope.Counter("conflict_analyzer_errors").Inc(1) + return fmt.Errorf("failed to analyze conflicts for batchID=%s: %w", batch.ID, err) + } + + seen := make(map[string]struct{}, len(conflicts)) + conflictingIDs := make([]string, 0, len(conflicts)) + for _, cf := range conflicts { + if _, ok := seen[cf.BatchID]; ok { + continue + } + seen[cf.BatchID] = struct{}{} + conflictingIDs = append(conflictingIDs, cf.BatchID) } - // Create batch dependent entities (reverse relationship of batch.Dependencies). - // For each dependency, record the new batch as a dependent. - // If existing dependents are found in the store, append them. - for _, dep := range activeBatches { - // Get existing reverse index entry for the dependency. - existing, err := c.store.GetBatchDependentStore().Get(ctx, dep.ID) + batch.Dependencies = conflictingIDs + + // Update reverse index for each conflicting batch (BatchDependent = + // "batches that depend on me"). One UpdateDependents call per conflict. + for _, depID := range conflictingIDs { + existing, err := c.store.GetBatchDependentStore().Get(ctx, depID) if err != nil { c.metricsScope.Counter("batch_dependent_store_errors").Inc(1) - return fmt.Errorf("failed to get batch dependent for batchID=%s: %w", dep.ID, err) + return fmt.Errorf("failed to get batch dependent for batchID=%s: %w", depID, err) } dependents := append(existing.Dependents, batch.ID) newVersion := existing.Version + 1 - if err := c.store.GetBatchDependentStore().UpdateDependents(ctx, dep.ID, existing.Version, newVersion, dependents); err != nil { + if err := c.store.GetBatchDependentStore().UpdateDependents(ctx, depID, existing.Version, newVersion, dependents); err != nil { c.metricsScope.Counter("batch_dependent_store_errors").Inc(1) - return fmt.Errorf("failed to update batch dependent index for existing batchID=%s and new batchID=%s: %w", dep.ID, batch.ID, err) + return fmt.Errorf("failed to update batch dependent index for existing batchID=%s and new batchID=%s: %w", depID, batch.ID, err) } - existing.Version = newVersion } // Create new reverse index entry for the new batch. It would be empty for now, but will be updated as new batches are created that conflict with this batch. diff --git a/orchestrator/controller/batch/batch_test.go b/orchestrator/controller/batch/batch_test.go index 91d41331..3bb49d42 100644 --- a/orchestrator/controller/batch/batch_test.go +++ b/orchestrator/controller/batch/batch_test.go @@ -26,6 +26,9 @@ import ( "github.com/uber/submitqueue/core/consumer" "github.com/uber/submitqueue/entity" "github.com/uber/submitqueue/entity/queue" + "github.com/uber/submitqueue/extension/conflict" + "github.com/uber/submitqueue/extension/conflict/all" + conflictmock "github.com/uber/submitqueue/extension/conflict/mock" countermock "github.com/uber/submitqueue/extension/counter/mock" queuemock "github.com/uber/submitqueue/extension/queue/mock" storagemock "github.com/uber/submitqueue/extension/storage/mock" @@ -66,7 +69,8 @@ func testRequest() entity.Request { // newTestController creates a controller with test dependencies. // If mockStorage is nil, a default MockStorage with an empty batch store is created. -func newTestController(t *testing.T, ctrl *gomock.Controller, cnt *countermock.MockCounter, mockStorage *storagemock.MockStorage, publishErr error) *Controller { +// If analyzer is nil, the "all" conflict analyzer is used (every active batch becomes a dependency). +func newTestController(t *testing.T, ctrl *gomock.Controller, cnt *countermock.MockCounter, mockStorage *storagemock.MockStorage, analyzer conflict.Analyzer, publishErr error) *Controller { logger := zaptest.NewLogger(t).Sugar() scope := tally.NoopScope @@ -88,6 +92,10 @@ func newTestController(t *testing.T, ctrl *gomock.Controller, cnt *countermock.M mockStorage.EXPECT().GetRequestStore().Return(mockReqStore).AnyTimes() } + if analyzer == nil { + analyzer = all.New() + } + mockPub := queuemock.NewMockPublisher(ctrl) mockPub.EXPECT().Publish(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( func(ctx context.Context, topic string, msg queue.Message) error { @@ -103,12 +111,12 @@ func newTestController(t *testing.T, ctrl *gomock.Controller, cnt *countermock.M ) require.NoError(t, err) - return NewController(logger, scope, registry, cnt, mockStorage, consumer.TopicKeyBatch, "orchestrator-batch") + return NewController(logger, scope, registry, cnt, mockStorage, analyzer, consumer.TopicKeyBatch, "orchestrator-batch") } func TestNewController(t *testing.T) { ctrl := gomock.NewController(t) - controller := newTestController(t, ctrl, newSequentialCounter(ctrl), nil, nil) + controller := newTestController(t, ctrl, newSequentialCounter(ctrl), nil, nil, nil) require.NotNil(t, controller) assert.Equal(t, consumer.TopicKeyBatch, controller.TopicKey()) @@ -119,7 +127,7 @@ func TestNewController(t *testing.T) { func TestController_Process_Success(t *testing.T) { ctrl := gomock.NewController(t) - controller := newTestController(t, ctrl, newSequentialCounter(ctrl), nil, nil) + controller := newTestController(t, ctrl, newSequentialCounter(ctrl), nil, nil, nil) request := testRequest() msg := queue.NewMessage(request.ID, requestIDPayload(t, request.ID), request.Queue, nil) @@ -140,7 +148,7 @@ func TestController_Process_StorageFailure(t *testing.T) { mockStorage := storagemock.NewMockStorage(ctrl) mockStorage.EXPECT().GetRequestStore().Return(mockReqStore).AnyTimes() - controller := newTestController(t, ctrl, newSequentialCounter(ctrl), mockStorage, nil) + controller := newTestController(t, ctrl, newSequentialCounter(ctrl), mockStorage, nil, nil) msg := queue.NewMessage("test-queue/123", requestIDPayload(t, "test-queue/123"), "test-queue", nil) delivery := queuemock.NewMockDelivery(ctrl) @@ -154,7 +162,7 @@ func TestController_Process_StorageFailure(t *testing.T) { func TestController_Process_PublishFailure(t *testing.T) { ctrl := gomock.NewController(t) - controller := newTestController(t, ctrl, newSequentialCounter(ctrl), nil, fmt.Errorf("publish failed")) + controller := newTestController(t, ctrl, newSequentialCounter(ctrl), nil, nil, fmt.Errorf("publish failed")) request := testRequest() msg := queue.NewMessage(request.ID, requestIDPayload(t, request.ID), request.Queue, nil) @@ -171,7 +179,7 @@ func TestController_Process_CounterFailure(t *testing.T) { cnt := countermock.NewMockCounter(ctrl) cnt.EXPECT().Next(gomock.Any(), gomock.Any()).Return(int64(0), fmt.Errorf("counter unavailable")) - controller := newTestController(t, ctrl, cnt, nil, nil) + controller := newTestController(t, ctrl, cnt, nil, nil, nil) request := testRequest() msg := queue.NewMessage(request.ID, requestIDPayload(t, request.ID), request.Queue, nil) @@ -230,7 +238,7 @@ func TestController_Process_WithDependencies(t *testing.T) { mockStorage.EXPECT().GetBatchDependentStore().Return(mockBatchDependentStore).AnyTimes() mockStorage.EXPECT().GetRequestStore().Return(mockReqStore).AnyTimes() - controller := newTestController(t, ctrl, newSequentialCounter(ctrl), mockStorage, nil) + controller := newTestController(t, ctrl, newSequentialCounter(ctrl), mockStorage, nil, nil) msg := queue.NewMessage(request.ID, requestIDPayload(t, request.ID), request.Queue, nil) delivery := queuemock.NewMockDelivery(ctrl) @@ -241,9 +249,89 @@ func TestController_Process_WithDependencies(t *testing.T) { require.NoError(t, err) } +func TestController_Process_AnalyzerSelectsSubset(t *testing.T) { + ctrl := gomock.NewController(t) + + request := testRequest() + + // Two active batches in flight; analyzer picks only one as a conflict. + activeBatches := []entity.Batch{ + {ID: "test-queue/batch/1", Queue: "test-queue", State: entity.BatchStateCreated, Version: 1}, + {ID: "test-queue/batch/2", Queue: "test-queue", State: entity.BatchStateSpeculating, Version: 2}, + } + + mockBatchStore := storagemock.NewMockBatchStore(ctrl) + mockBatchStore.EXPECT().GetByQueueAndStates(gomock.Any(), "test-queue", gomock.Any()).Return(activeBatches, nil) + mockBatchStore.EXPECT().Create(gomock.Any(), gomock.Any()).Return(nil) + + mockBatchDependentStore := storagemock.NewMockBatchDependentStore(ctrl) + // Only batch/2 is selected by the analyzer, so only it gets a reverse-index update. + mockBatchDependentStore.EXPECT().Get(gomock.Any(), "test-queue/batch/2").Return(entity.BatchDependent{ + BatchID: "test-queue/batch/2", + Version: 5, + }, nil) + mockBatchDependentStore.EXPECT().UpdateDependents(gomock.Any(), "test-queue/batch/2", int32(5), int32(6), gomock.Any()).Return(nil) + mockBatchDependentStore.EXPECT().Create(gomock.Any(), gomock.Any()).Return(nil) + + mockReqStore := storagemock.NewMockRequestStore(ctrl) + mockReqStore.EXPECT().Get(gomock.Any(), request.ID).Return(request, nil) + + mockStorage := storagemock.NewMockStorage(ctrl) + mockStorage.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() + mockStorage.EXPECT().GetBatchDependentStore().Return(mockBatchDependentStore).AnyTimes() + mockStorage.EXPECT().GetRequestStore().Return(mockReqStore).AnyTimes() + + // Analyzer returns duplicate Conflict entries for the same batch (different + // conflict types) to prove the controller dedupes by BatchID. + analyzer := conflictmock.NewMockAnalyzer(ctrl) + analyzer.EXPECT().Analyze(gomock.Any(), gomock.Any(), gomock.Any()).Return([]conflict.Conflict{ + {BatchID: "test-queue/batch/2", Type: conflict.ConflictTypeConservative}, + {BatchID: "test-queue/batch/2", Type: conflict.ConflictTypeTargetOverlap}, + }, nil) + + controller := newTestController(t, ctrl, newSequentialCounter(ctrl), mockStorage, analyzer, nil) + + msg := queue.NewMessage(request.ID, requestIDPayload(t, request.ID), request.Queue, nil) + delivery := queuemock.NewMockDelivery(ctrl) + delivery.EXPECT().Message().Return(msg).AnyTimes() + delivery.EXPECT().Attempt().Return(1).AnyTimes() + + err := controller.Process(context.Background(), delivery) + require.NoError(t, err) +} + +func TestController_Process_AnalyzerFailure(t *testing.T) { + ctrl := gomock.NewController(t) + + request := testRequest() + + mockBatchStore := storagemock.NewMockBatchStore(ctrl) + mockBatchStore.EXPECT().GetByQueueAndStates(gomock.Any(), "test-queue", gomock.Any()).Return(nil, nil) + + mockReqStore := storagemock.NewMockRequestStore(ctrl) + mockReqStore.EXPECT().Get(gomock.Any(), request.ID).Return(request, nil) + + mockStorage := storagemock.NewMockStorage(ctrl) + mockStorage.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() + mockStorage.EXPECT().GetRequestStore().Return(mockReqStore).AnyTimes() + + analyzer := conflictmock.NewMockAnalyzer(ctrl) + analyzer.EXPECT().Analyze(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, fmt.Errorf("analyzer down")) + + controller := newTestController(t, ctrl, newSequentialCounter(ctrl), mockStorage, analyzer, nil) + + msg := queue.NewMessage(request.ID, requestIDPayload(t, request.ID), request.Queue, nil) + delivery := queuemock.NewMockDelivery(ctrl) + delivery.EXPECT().Message().Return(msg).AnyTimes() + delivery.EXPECT().Attempt().Return(1).AnyTimes() + + err := controller.Process(context.Background(), delivery) + require.Error(t, err) +} + func TestController_InterfaceImplementation(t *testing.T) { ctrl := gomock.NewController(t) - controller := newTestController(t, ctrl, newSequentialCounter(ctrl), nil, nil) + controller := newTestController(t, ctrl, newSequentialCounter(ctrl), nil, nil, nil) var _ consumer.Controller = controller }