From 365fd12de3be899decdfb84f054a697a6f206c4d Mon Sep 17 00:00:00 2001 From: Preetam Dwivedi Date: Wed, 3 Jun 2026 20:19:08 -0700 Subject: [PATCH] refactor(storage): revert per-queue Factory; storage is global MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Summary ### Why? PR #188 added `storage.Factory` (`For(queue) (Storage, error)`) and threaded it into every controller, leaving a `TODO(queue-aware)` in all 11 orchestrator controllers: each fell back to `stores.For("")` because storage is needed *before* the by-ID entity load, so the queue isn't known yet. That chicken-and-egg was the sole reason for the TODO. Behavioral extensions (mergechecker, buildrunner, scorer, conflict, changeprovider, pusher) are already correctly per-queue — resolved *after* the load from the loaded entity's queue. Storage, by contrast, does not vary by queue: every queue shares the same entity schema and tables. Per-queue storage, if ever needed, is better achieved by splitting the environment and wiring different stores per deployment, not an in-process Factory. ### What? - Remove the `storage.Factory` interface and delete the `staticFactory` / `NewStaticFactory` implementation (+ its test). Regenerate the storage mock (drops `MockFactory`). - Switch the 11 orchestrator controllers and the gateway `land` controller from `storage.Factory` to a single injected `storage.Storage`; drop the `stores.For("")` hack and the `TODO(queue-aware)` comments. Queue for logging and behavioral-extension resolution continues to come from the loaded entity. - Update example wiring (gateway + orchestrator `main.go`) to pass the store directly instead of `NewStaticFactory(store)`. - Behavioral-extension Factories are unchanged. No behavior change under today's single-store deployment — `For("")` already returned this exact store. ## Test Plan ✅ `make build`, `make test` (39 pass) ✅ `make gazelle` / `make mocks` / `make tidy` / `make fmt` (all in sync) --- .../submitqueue/gateway/server/BUILD.bazel | 1 - example/submitqueue/gateway/server/main.go | 8 ++-- .../submitqueue/orchestrator/server/main.go | 26 +++++----- submitqueue/extension/storage/BUILD.bazel | 13 +---- submitqueue/extension/storage/factory.go | 34 ------------- submitqueue/extension/storage/factory_test.go | 48 ------------------- .../extension/storage/mock/storage_mock.go | 39 --------------- submitqueue/extension/storage/storage.go | 12 ----- submitqueue/gateway/controller/land.go | 16 ++----- submitqueue/gateway/controller/land_test.go | 28 +++++------ .../orchestrator/controller/batch/batch.go | 7 +-- .../controller/batch/batch_test.go | 4 +- .../orchestrator/controller/build/build.go | 7 +-- .../controller/build/build_test.go | 8 ++-- .../controller/buildsignal/BUILD.bazel | 1 - .../controller/buildsignal/buildsignal.go | 7 +-- .../buildsignal/buildsignal_test.go | 3 +- .../orchestrator/controller/cancel/cancel.go | 7 +-- .../controller/cancel/cancel_test.go | 2 +- .../controller/conclude/conclude.go | 7 +-- .../controller/conclude/conclude_test.go | 2 +- .../orchestrator/controller/log/BUILD.bazel | 1 - .../orchestrator/controller/log/log.go | 7 +-- .../orchestrator/controller/log/log_test.go | 3 +- .../orchestrator/controller/merge/BUILD.bazel | 1 - .../orchestrator/controller/merge/merge.go | 7 +-- .../controller/merge/merge_test.go | 21 ++++---- .../orchestrator/controller/score/BUILD.bazel | 1 - .../orchestrator/controller/score/score.go | 7 +-- .../controller/score/score_test.go | 7 ++- .../controller/speculate/speculate.go | 7 +-- .../controller/speculate/speculate_test.go | 12 ++--- .../orchestrator/controller/start/start.go | 7 +-- .../controller/start/start_test.go | 2 +- .../controller/validate/validate.go | 7 +-- .../controller/validate/validate_test.go | 2 +- 36 files changed, 75 insertions(+), 297 deletions(-) delete mode 100644 submitqueue/extension/storage/factory.go delete mode 100644 submitqueue/extension/storage/factory_test.go diff --git a/example/submitqueue/gateway/server/BUILD.bazel b/example/submitqueue/gateway/server/BUILD.bazel index 5d38b40b..86baf961 100644 --- a/example/submitqueue/gateway/server/BUILD.bazel +++ b/example/submitqueue/gateway/server/BUILD.bazel @@ -15,7 +15,6 @@ go_library( "//extension/messagequeue/mysql", "//submitqueue/core/consumer", "//submitqueue/extension/queueconfig/yaml", - "//submitqueue/extension/storage", "//submitqueue/extension/storage/mysql", "//submitqueue/gateway/controller", "//submitqueue/gateway/protopb", diff --git a/example/submitqueue/gateway/server/main.go b/example/submitqueue/gateway/server/main.go index f5336b10..b2c814d8 100644 --- a/example/submitqueue/gateway/server/main.go +++ b/example/submitqueue/gateway/server/main.go @@ -32,7 +32,6 @@ import ( queueMySQL "github.com/uber/submitqueue/extension/messagequeue/mysql" "github.com/uber/submitqueue/submitqueue/core/consumer" yamlqueueconfig "github.com/uber/submitqueue/submitqueue/extension/queueconfig/yaml" - "github.com/uber/submitqueue/submitqueue/extension/storage" mysqlstorage "github.com/uber/submitqueue/submitqueue/extension/storage/mysql" "github.com/uber/submitqueue/submitqueue/gateway/controller" pb "github.com/uber/submitqueue/submitqueue/gateway/protopb" @@ -201,13 +200,12 @@ func run() error { )) // Initialize storage from the shared app database connection. The land - // controller takes a storage.Factory (static: every queue → this store); - // cancel/status use the request log store directly. + // controller writes to this store directly; cancel/status use the request + // log store directly. store, err := mysqlstorage.NewStorage(appDB, scope.SubScope("storage")) if err != nil { return fmt.Errorf("failed to create storage: %w", err) } - stores := storage.NewStaticFactory(store) requestLogStore := store.GetRequestLogStore() // Load queue configurations from YAML. Path is required so the gateway @@ -223,7 +221,7 @@ func run() error { // Create controllers and wrap them for gRPC pingController := controller.NewPingController(logger, scope) - landController := controller.NewLandController(logger.Sugar(), scope, cnt, stores, queueConfigs, registry) + landController := controller.NewLandController(logger.Sugar(), scope, cnt, store, queueConfigs, registry) cancelController := controller.NewCancelController(logger.Sugar(), scope, requestLogStore, registry) statusController := controller.NewStatusController(logger.Sugar(), scope, requestLogStore) gatewayServer := &GatewayServer{ diff --git a/example/submitqueue/orchestrator/server/main.go b/example/submitqueue/orchestrator/server/main.go index 520f5667..6fc80ace 100644 --- a/example/submitqueue/orchestrator/server/main.go +++ b/example/submitqueue/orchestrator/server/main.go @@ -461,14 +461,10 @@ type conflictFactory struct{ impl conflict.Analyzer } func (f conflictFactory) For(conflict.Config) (conflict.Analyzer, error) { return f.impl, nil } func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope tally.Scope, registry consumer.TopicRegistry, mc mergechecker.MergeChecker, cp changeprovider.ChangeProvider, psh pusher.Pusher, br buildrunner.BuildRunner, cnt counter.Counter, store storage.Storage, changeStore changestore.ChangeStore) error { - // Static storage factory: every queue resolves to the one configured store. - // The factory is the injection point for future per-queue backends. - stores := storage.NewStaticFactory(store) - requestController := start.NewController( logger, scope, - stores, + store, changeStore, registry, consumer.TopicKeyStart, @@ -481,7 +477,7 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t cancelController := cancel.NewController( logger, scope, - stores, + store, registry, consumer.TopicKeyCancel, "orchestrator-cancel", @@ -493,7 +489,7 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t validateController := validate.NewController( logger, scope, - stores, + store, changeStore, registry, mergeCheckerFactory{impl: mc}, @@ -510,7 +506,7 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t scope, registry, cnt, - stores, + store, // TODO: replace with a real conflict analyzer (e.g. one backed by // Tango target analysis). The "all" stub serializes the queue. conflictFactory{impl: all.New()}, @@ -524,7 +520,7 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t scoreController := score.NewController( logger, scope, - stores, + store, // TODO: replace with a real scorer scorerFactory{impl: heuristic.New( []heuristic.Bucket{ @@ -549,7 +545,7 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t speculateController := speculate.NewController( logger, scope, - stores, + store, registry, consumer.TopicKeySpeculate, "orchestrator-speculate", @@ -561,7 +557,7 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t buildController := build.NewController( logger, scope, - stores, + store, buildRunnerFactory{impl: br}, registry, consumer.TopicKeyBuild, @@ -574,7 +570,7 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t buildsignalController := buildsignal.NewController( logger, scope, - stores, + store, buildRunnerFactory{impl: br}, registry, consumer.TopicKeyBuildSignal, @@ -587,7 +583,7 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t mergeController := merge.NewController( logger, scope, - stores, + store, registry, pusherFactory{impl: psh}, consumer.TopicKeyMerge, @@ -600,7 +596,7 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t concludeController := conclude.NewController( logger, scope, - stores, + store, registry, consumer.TopicKeyConclude, "orchestrator-conclude", @@ -612,7 +608,7 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t logController := logctrl.NewController( logger, scope, - stores, + store, consumer.TopicKeyLog, "orchestrator-log", ) diff --git a/submitqueue/extension/storage/BUILD.bazel b/submitqueue/extension/storage/BUILD.bazel index 0fd34935..fe889fbc 100644 --- a/submitqueue/extension/storage/BUILD.bazel +++ b/submitqueue/extension/storage/BUILD.bazel @@ -1,4 +1,4 @@ -load("@rules_go//go:def.bzl", "go_library", "go_test") +load("@rules_go//go:def.bzl", "go_library") go_library( name = "storage", @@ -7,7 +7,6 @@ go_library( "batch_store.go", "build_store.go", "change_provider_store.go", - "factory.go", "request_log_store.go", "request_store.go", "speculation_tree_store.go", @@ -17,13 +16,3 @@ go_library( visibility = ["//visibility:public"], deps = ["//submitqueue/entity"], ) - -go_test( - name = "storage_test", - srcs = ["factory_test.go"], - embed = [":storage"], - deps = [ - "@com_github_stretchr_testify//assert", - "@com_github_stretchr_testify//require", - ], -) diff --git a/submitqueue/extension/storage/factory.go b/submitqueue/extension/storage/factory.go deleted file mode 100644 index f1c4fe15..00000000 --- a/submitqueue/extension/storage/factory.go +++ /dev/null @@ -1,34 +0,0 @@ -// Copyright (c) 2025 Uber Technologies, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package storage - -// staticFactory is the default Factory. It returns the same Storage for every -// queue, ignoring the name. It exists so call sites can route through the -// Factory contract today (a no-op) and adopt true per-queue backends later -// without further consumer changes. -type staticFactory struct { - storage Storage -} - -// NewStaticFactory returns a Factory that serves the given Storage for every -// queue name. -func NewStaticFactory(s Storage) Factory { - return staticFactory{storage: s} -} - -// For returns the configured Storage for any queue name. -func (f staticFactory) For(string) (Storage, error) { - return f.storage, nil -} diff --git a/submitqueue/extension/storage/factory_test.go b/submitqueue/extension/storage/factory_test.go deleted file mode 100644 index 0986a2f0..00000000 --- a/submitqueue/extension/storage/factory_test.go +++ /dev/null @@ -1,48 +0,0 @@ -// Copyright (c) 2025 Uber Technologies, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package storage - -import ( - "testing" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -// stubStorage is a minimal Storage used to verify the factory returns the -// wrapped instance. All accessors return zero values; they are never invoked. -type stubStorage struct{} - -func (stubStorage) GetRequestStore() RequestStore { return nil } -func (stubStorage) GetChangeProviderStore() ChangeProviderStore { return nil } -func (stubStorage) GetBatchStore() BatchStore { return nil } -func (stubStorage) GetBatchDependentStore() BatchDependentStore { return nil } -func (stubStorage) GetBuildStore() BuildStore { return nil } -func (stubStorage) GetSpeculationTreeStore() SpeculationTreeStore { return nil } -func (stubStorage) GetRequestLogStore() RequestLogStore { return nil } -func (stubStorage) Close() error { return nil } - -func TestStaticFactory_For(t *testing.T) { - s := stubStorage{} - f := NewStaticFactory(s) - - t.Run("returns the wrapped storage for any name", func(t *testing.T) { - for _, name := range []string{"", "queue-a", "queue-b"} { - got, err := f.For(name) - require.NoError(t, err) - assert.Equal(t, s, got) - } - }) -} diff --git a/submitqueue/extension/storage/mock/storage_mock.go b/submitqueue/extension/storage/mock/storage_mock.go index 2d78186d..ddf0574a 100644 --- a/submitqueue/extension/storage/mock/storage_mock.go +++ b/submitqueue/extension/storage/mock/storage_mock.go @@ -151,42 +151,3 @@ func (mr *MockStorageMockRecorder) GetSpeculationTreeStore() *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetSpeculationTreeStore", reflect.TypeOf((*MockStorage)(nil).GetSpeculationTreeStore)) } - -// MockFactory is a mock of Factory interface. -type MockFactory struct { - ctrl *gomock.Controller - recorder *MockFactoryMockRecorder - isgomock struct{} -} - -// MockFactoryMockRecorder is the mock recorder for MockFactory. -type MockFactoryMockRecorder struct { - mock *MockFactory -} - -// NewMockFactory creates a new mock instance. -func NewMockFactory(ctrl *gomock.Controller) *MockFactory { - mock := &MockFactory{ctrl: ctrl} - mock.recorder = &MockFactoryMockRecorder{mock} - return mock -} - -// EXPECT returns an object that allows the caller to indicate expected use. -func (m *MockFactory) EXPECT() *MockFactoryMockRecorder { - return m.recorder -} - -// For mocks base method. -func (m *MockFactory) For(name string) (storage.Storage, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "For", name) - ret0, _ := ret[0].(storage.Storage) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// For indicates an expected call of For. -func (mr *MockFactoryMockRecorder) For(name any) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "For", reflect.TypeOf((*MockFactory)(nil).For), name) -} diff --git a/submitqueue/extension/storage/storage.go b/submitqueue/extension/storage/storage.go index 1a185267..1b84c4e5 100644 --- a/submitqueue/extension/storage/storage.go +++ b/submitqueue/extension/storage/storage.go @@ -68,15 +68,3 @@ type Storage interface { // Close closes the storage and all underlying connections. Should only be called once at the end of the program. Close() error } - -// Factory returns the Storage backing a named queue. It exists so a single -// queue can be migrated to a different backend without affecting others; the -// default implementation (NewStaticFactory) returns the same Storage for -// every queue. Callers resolve the Storage per message from the queue name -// carried in the message envelope, before any entity lookup. -type Factory interface { - // For returns the Storage for the named queue. An empty name selects the - // default backend. It returns an error if no backend is configured for the - // queue. - For(name string) (Storage, error) -} diff --git a/submitqueue/gateway/controller/land.go b/submitqueue/gateway/controller/land.go index 4ffe1212..2567a049 100644 --- a/submitqueue/gateway/controller/land.go +++ b/submitqueue/gateway/controller/land.go @@ -64,7 +64,7 @@ type LandController struct { logger *zap.SugaredLogger metricsScope tally.Scope counter counter.Counter - stores storage.Factory + store storage.Storage queueConfigs queueconfig.Store registry consumer.TopicRegistry } @@ -72,12 +72,12 @@ type LandController struct { // NewLandController creates a new instance of the gateway land controller. // The controller publishes land requests to the topic registered under // consumer.TopicKeyStart in the registry. -func NewLandController(logger *zap.SugaredLogger, scope tally.Scope, counter counter.Counter, stores storage.Factory, queueConfigs queueconfig.Store, registry consumer.TopicRegistry) *LandController { +func NewLandController(logger *zap.SugaredLogger, scope tally.Scope, counter counter.Counter, store storage.Storage, queueConfigs queueconfig.Store, registry consumer.TopicRegistry) *LandController { return &LandController{ logger: logger, metricsScope: scope.SubScope("land_controller"), counter: counter, - stores: stores, + store: store, queueConfigs: queueConfigs, registry: registry, } @@ -110,14 +110,6 @@ func (c *LandController) Land(ctx context.Context, req *pb.LandRequest) (resp *p return nil, fmt.Errorf("LandController failed to look up queue %q: %w", queue, err) } - // Resolve the storage backend for this entityqueue. The queue is known up front - // here (from the request), so this is the one place per-queue storage - // routing is actionable. - store, err := c.stores.For(queue) - if err != nil { - return nil, fmt.Errorf("LandController failed to resolve storage for queue=%s: %w", queue, err) - } - // TODO: pass default queue land strategy to resolver function to process a default. strategy, err := resolveRequestLandStrategy(req.Strategy) if err != nil { @@ -141,7 +133,7 @@ func (c *LandController) Land(ctx context.Context, req *pb.LandRequest) (resp *p // It is important to record the status before publishing to the queue for processing. It is important to publish straight to the database and not via a entityqueue. // Gateway has to stay consistent with the request log. logEntry := entity.NewRequestLog(landRequest.ID, entity.RequestStatusAccepted, 0, "", nil) - if err := store.GetRequestLogStore().Insert(ctx, logEntry); err != nil { + if err := c.store.GetRequestLogStore().Insert(ctx, logEntry); err != nil { return nil, fmt.Errorf("LandController failed to insert request log for sqid=%s: %w", landRequest.ID, err) } diff --git a/submitqueue/gateway/controller/land_test.go b/submitqueue/gateway/controller/land_test.go index bbe38ebf..a3ac0707 100644 --- a/submitqueue/gateway/controller/land_test.go +++ b/submitqueue/gateway/controller/land_test.go @@ -62,14 +62,14 @@ func newTestRegistryWithNoopPublisher(t *testing.T, ctrl *gomock.Controller) con return registry } -// noopStorageFactory returns a storage.Factory whose RequestLogStore.Insert +// noopStorage returns a storage.Storage whose RequestLogStore.Insert // succeeds silently for any entityqueue. -func noopStorageFactory(ctrl *gomock.Controller) storage.Factory { +func noopStorage(ctrl *gomock.Controller) storage.Storage { logStore := storagemock.NewMockRequestLogStore(ctrl) logStore.EXPECT().Insert(gomock.Any(), gomock.Any()).Return(nil).AnyTimes() store := storagemock.NewMockStorage(ctrl) store.EXPECT().GetRequestLogStore().Return(logStore).AnyTimes() - return storage.NewStaticFactory(store) + return store } // noopQueueConfigStore returns a mock queueconfig.Store that always reports @@ -84,7 +84,7 @@ func TestNewLandController(t *testing.T) { ctrl := gomock.NewController(t) cnt := countermock.NewMockCounter(ctrl) - controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopStorageFactory(ctrl), noopQueueConfigStore(ctrl), newTestRegistryWithNoopPublisher(t, ctrl)) + controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopStorage(ctrl), noopQueueConfigStore(ctrl), newTestRegistryWithNoopPublisher(t, ctrl)) require.NotNil(t, controller) } @@ -93,7 +93,7 @@ func TestLand_ReturnsSqid(t *testing.T) { cnt := countermock.NewMockCounter(ctrl) cnt.EXPECT().Next(gomock.Any(), gomock.Any()).Return(int64(1), nil) - controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopStorageFactory(ctrl), noopQueueConfigStore(ctrl), newTestRegistryWithNoopPublisher(t, ctrl)) + controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopStorage(ctrl), noopQueueConfigStore(ctrl), newTestRegistryWithNoopPublisher(t, ctrl)) ctx := context.Background() req := &pb.LandRequest{ @@ -111,7 +111,7 @@ func TestLand_ReturnsErrorOnCounterFailure(t *testing.T) { cnt := countermock.NewMockCounter(ctrl) cnt.EXPECT().Next(gomock.Any(), gomock.Any()).Return(int64(0), fmt.Errorf("counter unavailable")) - controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopStorageFactory(ctrl), noopQueueConfigStore(ctrl), newTestRegistryWithNoopPublisher(t, ctrl)) + controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopStorage(ctrl), noopQueueConfigStore(ctrl), newTestRegistryWithNoopPublisher(t, ctrl)) ctx := context.Background() req := &pb.LandRequest{ @@ -135,7 +135,7 @@ func TestLand_CounterDomainIncludesQueue(t *testing.T) { return 1, nil }, ) - controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopStorageFactory(ctrl), noopQueueConfigStore(ctrl), newTestRegistryWithNoopPublisher(t, ctrl)) + controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopStorage(ctrl), noopQueueConfigStore(ctrl), newTestRegistryWithNoopPublisher(t, ctrl)) ctx := context.Background() req := &pb.LandRequest{ @@ -152,7 +152,7 @@ func TestLand_ReturnsErrorOnEmptyQueue(t *testing.T) { ctrl := gomock.NewController(t) cnt := countermock.NewMockCounter(ctrl) - controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopStorageFactory(ctrl), noopQueueConfigStore(ctrl), newTestRegistryWithNoopPublisher(t, ctrl)) + controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopStorage(ctrl), noopQueueConfigStore(ctrl), newTestRegistryWithNoopPublisher(t, ctrl)) ctx := context.Background() req := &pb.LandRequest{ @@ -169,7 +169,7 @@ func TestLand_ReturnsErrorOnEmptyChangeUri(t *testing.T) { ctrl := gomock.NewController(t) cnt := countermock.NewMockCounter(ctrl) - controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopStorageFactory(ctrl), noopQueueConfigStore(ctrl), newTestRegistryWithNoopPublisher(t, ctrl)) + controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopStorage(ctrl), noopQueueConfigStore(ctrl), newTestRegistryWithNoopPublisher(t, ctrl)) ctx := context.Background() req := &pb.LandRequest{ @@ -186,7 +186,7 @@ func TestLand_ReturnsErrorOnNilChange(t *testing.T) { ctrl := gomock.NewController(t) cnt := countermock.NewMockCounter(ctrl) - controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopStorageFactory(ctrl), noopQueueConfigStore(ctrl), newTestRegistryWithNoopPublisher(t, ctrl)) + controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopStorage(ctrl), noopQueueConfigStore(ctrl), newTestRegistryWithNoopPublisher(t, ctrl)) ctx := context.Background() req := &pb.LandRequest{ @@ -206,7 +206,7 @@ func TestLand_ReturnsUnrecognizedQueueWhenStoreReportsNotFound(t *testing.T) { qcs := qcmock.NewMockStore(ctrl) qcs.EXPECT().Get(gomock.Any(), "missing-queue").Return(entity.QueueConfig{}, queueconfig.ErrNotFound) - controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopStorageFactory(ctrl), qcs, newTestRegistryWithNoopPublisher(t, ctrl)) + controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopStorage(ctrl), qcs, newTestRegistryWithNoopPublisher(t, ctrl)) ctx := context.Background() req := &pb.LandRequest{ @@ -232,7 +232,7 @@ func TestLand_PropagatesQueueConfigStoreError(t *testing.T) { qcs := qcmock.NewMockStore(ctrl) qcs.EXPECT().Get(gomock.Any(), "test-queue").Return(entity.QueueConfig{}, fmt.Errorf("config backend down")) - controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopStorageFactory(ctrl), qcs, newTestRegistryWithNoopPublisher(t, ctrl)) + controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopStorage(ctrl), qcs, newTestRegistryWithNoopPublisher(t, ctrl)) ctx := context.Background() req := &pb.LandRequest{ @@ -264,7 +264,7 @@ func TestLand_PublishesToQueue(t *testing.T) { }, ) - controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopStorageFactory(ctrl), noopQueueConfigStore(ctrl), registry) + controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopStorage(ctrl), noopQueueConfigStore(ctrl), registry) ctx := context.Background() req := &pb.LandRequest{ @@ -300,7 +300,7 @@ func TestLand_ContinuesWhenPublishFails(t *testing.T) { registry, publisher := newTestRegistry(t, ctrl) publisher.EXPECT().Publish(gomock.Any(), gomock.Any(), gomock.Any()).Return(fmt.Errorf("queue unavailable")) - controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopStorageFactory(ctrl), noopQueueConfigStore(ctrl), registry) + controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopStorage(ctrl), noopQueueConfigStore(ctrl), registry) ctx := context.Background() req := &pb.LandRequest{ diff --git a/submitqueue/orchestrator/controller/batch/batch.go b/submitqueue/orchestrator/controller/batch/batch.go index 3a3b6dc2..e111d754 100644 --- a/submitqueue/orchestrator/controller/batch/batch.go +++ b/submitqueue/orchestrator/controller/batch/batch.go @@ -53,16 +53,11 @@ func NewController( scope tally.Scope, registry consumer.TopicRegistry, counter counter.Counter, - stores storage.Factory, + store storage.Storage, analyzers conflict.Factory, topicKey consumer.TopicKey, consumerGroup string, ) *Controller { - // TODO(queue-aware): make this controller queue-aware during Process — derive the - // queue from the loaded entity and use it for structured logging, metrics scoping, - // and per-queue storage resolution. Today it uses the default store because the - // queue is only known after the by-ID load. - store, _ := stores.For("") return &Controller{ logger: logger.Named("batch_controller"), metricsScope: scope.SubScope("batch_controller"), diff --git a/submitqueue/orchestrator/controller/batch/batch_test.go b/submitqueue/orchestrator/controller/batch/batch_test.go index f5f6c70f..d881de9f 100644 --- a/submitqueue/orchestrator/controller/batch/batch_test.go +++ b/submitqueue/orchestrator/controller/batch/batch_test.go @@ -117,7 +117,7 @@ func newTestController(t *testing.T, ctrl *gomock.Controller, cnt *countermock.M analyzerFactory := conflictmock.NewMockFactory(ctrl) analyzerFactory.EXPECT().For(gomock.Any()).Return(analyzer, nil).AnyTimes() - return NewController(logger, scope, registry, cnt, storage.NewStaticFactory(mockStorage), analyzerFactory, consumer.TopicKeyBatch, "orchestrator-batch") + return NewController(logger, scope, registry, cnt, mockStorage, analyzerFactory, consumer.TopicKeyBatch, "orchestrator-batch") } func TestNewController(t *testing.T) { @@ -443,7 +443,7 @@ func TestController_Process_CASLostToCancel(t *testing.T) { analyzerFactory.EXPECT().For(gomock.Any()).Return(all.New(), nil).AnyTimes() controller := NewController( zaptest.NewLogger(t).Sugar(), tally.NoopScope, registry, newSequentialCounter(ctrl), - storage.NewStaticFactory(mockStorage), analyzerFactory, consumer.TopicKeyBatch, "orchestrator-batch", + mockStorage, analyzerFactory, consumer.TopicKeyBatch, "orchestrator-batch", ) msg := entityqueue.NewMessage(request.ID, requestIDPayload(t, request.ID), request.Queue, nil) diff --git a/submitqueue/orchestrator/controller/build/build.go b/submitqueue/orchestrator/controller/build/build.go index 6e60fbc7..6a9ac930 100644 --- a/submitqueue/orchestrator/controller/build/build.go +++ b/submitqueue/orchestrator/controller/build/build.go @@ -49,17 +49,12 @@ var _ consumer.Controller = (*Controller)(nil) func NewController( logger *zap.SugaredLogger, scope tally.Scope, - stores storage.Factory, + store storage.Storage, buildRunners buildrunner.Factory, registry consumer.TopicRegistry, topicKey consumer.TopicKey, consumerGroup string, ) *Controller { - // TODO(queue-aware): make this controller queue-aware during Process — derive the - // queue from the loaded entity and use it for structured logging, metrics scoping, - // and per-queue storage resolution. Today it uses the default store because the - // queue is only known after the by-ID load. - store, _ := stores.For("") return &Controller{ logger: logger.Named("build_controller"), metricsScope: scope.SubScope("build_controller"), diff --git a/submitqueue/orchestrator/controller/build/build_test.go b/submitqueue/orchestrator/controller/build/build_test.go index 520c947d..72ed0dc0 100644 --- a/submitqueue/orchestrator/controller/build/build_test.go +++ b/submitqueue/orchestrator/controller/build/build_test.go @@ -104,7 +104,7 @@ func newTestController(t *testing.T, ctrl *gomock.Controller, store *storagemock ) require.NoError(t, err) - return NewController(logger, scope, storage.NewStaticFactory(store), staticBuildRunnerFactory{r: br}, registry, consumer.TopicKeyBuild, "orchestrator-build") + return NewController(logger, scope, store, staticBuildRunnerFactory{r: br}, registry, consumer.TopicKeyBuild, "orchestrator-build") } func TestNewController(t *testing.T) { @@ -205,7 +205,7 @@ func TestController_Process_TriggersWithBaseAndHead(t *testing.T) { ) require.NoError(t, err) - controller := NewController(zaptest.NewLogger(t).Sugar(), tally.NoopScope, storage.NewStaticFactory(store), staticBuildRunnerFactory{r: br}, registry, consumer.TopicKeyBuild, "orchestrator-build") + controller := NewController(zaptest.NewLogger(t).Sugar(), tally.NoopScope, store, staticBuildRunnerFactory{r: br}, registry, consumer.TopicKeyBuild, "orchestrator-build") msg := entityqueue.NewMessage(headBatch.ID, batchIDPayload(t, headBatch.ID), headBatch.Queue, nil) delivery := queuemock.NewMockDelivery(ctrl) @@ -263,7 +263,7 @@ func TestController_Process_BuildStoreAlreadyExistsIsSwallowed(t *testing.T) { []consumer.TopicConfig{{Key: consumer.TopicKeyBuildSignal, Name: "buildsignal", Queue: mockQ}}, ) require.NoError(t, err) - controller := NewController(zaptest.NewLogger(t).Sugar(), tally.NoopScope, storage.NewStaticFactory(store), staticBuildRunnerFactory{r: br}, registry, consumer.TopicKeyBuild, "orchestrator-build") + controller := NewController(zaptest.NewLogger(t).Sugar(), tally.NoopScope, store, staticBuildRunnerFactory{r: br}, registry, consumer.TopicKeyBuild, "orchestrator-build") msg := entityqueue.NewMessage(batch.ID, batchIDPayload(t, batch.ID), batch.Queue, nil) delivery := queuemock.NewMockDelivery(ctrl) @@ -295,7 +295,7 @@ func TestController_Process_TriggerFailure(t *testing.T) { []consumer.TopicConfig{{Key: consumer.TopicKeyBuildSignal, Name: "buildsignal", Queue: queuemock.NewMockQueue(ctrl)}}, ) require.NoError(t, err) - controller := NewController(zaptest.NewLogger(t).Sugar(), tally.NoopScope, storage.NewStaticFactory(store), staticBuildRunnerFactory{r: br}, registry, consumer.TopicKeyBuild, "orchestrator-build") + controller := NewController(zaptest.NewLogger(t).Sugar(), tally.NoopScope, store, staticBuildRunnerFactory{r: br}, registry, consumer.TopicKeyBuild, "orchestrator-build") msg := entityqueue.NewMessage(batch.ID, batchIDPayload(t, batch.ID), batch.Queue, nil) delivery := queuemock.NewMockDelivery(ctrl) diff --git a/submitqueue/orchestrator/controller/buildsignal/BUILD.bazel b/submitqueue/orchestrator/controller/buildsignal/BUILD.bazel index e3ca520a..1481d473 100644 --- a/submitqueue/orchestrator/controller/buildsignal/BUILD.bazel +++ b/submitqueue/orchestrator/controller/buildsignal/BUILD.bazel @@ -29,7 +29,6 @@ go_test( "//submitqueue/core/consumer", "//submitqueue/entity", "//submitqueue/extension/buildrunner/mock", - "//submitqueue/extension/storage", "//submitqueue/extension/storage/mock", "@com_github_stretchr_testify//assert", "@com_github_stretchr_testify//require", diff --git a/submitqueue/orchestrator/controller/buildsignal/buildsignal.go b/submitqueue/orchestrator/controller/buildsignal/buildsignal.go index cf2b9960..9ea902e4 100644 --- a/submitqueue/orchestrator/controller/buildsignal/buildsignal.go +++ b/submitqueue/orchestrator/controller/buildsignal/buildsignal.go @@ -71,17 +71,12 @@ var _ consumer.Controller = (*Controller)(nil) func NewController( logger *zap.SugaredLogger, scope tally.Scope, - stores storage.Factory, + store storage.Storage, buildRunners buildrunner.Factory, registry consumer.TopicRegistry, topicKey consumer.TopicKey, consumerGroup string, ) *Controller { - // TODO(queue-aware): make this controller queue-aware during Process — derive the - // queue from the loaded entity and use it for structured logging, metrics scoping, - // and per-queue storage resolution. Today it uses the default store because the - // queue is only known after the by-ID load. - store, _ := stores.For("") return &Controller{ logger: logger.Named("buildsignal_controller"), metricsScope: scope.SubScope("buildsignal_controller"), diff --git a/submitqueue/orchestrator/controller/buildsignal/buildsignal_test.go b/submitqueue/orchestrator/controller/buildsignal/buildsignal_test.go index 5d46ad0b..afc02e32 100644 --- a/submitqueue/orchestrator/controller/buildsignal/buildsignal_test.go +++ b/submitqueue/orchestrator/controller/buildsignal/buildsignal_test.go @@ -28,7 +28,6 @@ import ( "github.com/uber/submitqueue/submitqueue/core/consumer" "github.com/uber/submitqueue/submitqueue/entity" buildrunnermock "github.com/uber/submitqueue/submitqueue/extension/buildrunner/mock" - "github.com/uber/submitqueue/submitqueue/extension/storage" storagemock "github.com/uber/submitqueue/submitqueue/extension/storage/mock" "go.uber.org/mock/gomock" "go.uber.org/zap/zaptest" @@ -74,7 +73,7 @@ func newTestHarness(t *testing.T, ctrl *gomock.Controller) *testHarness { c := NewController( zaptest.NewLogger(t).Sugar(), tally.NoopScope, - storage.NewStaticFactory(store), + store, brFactory, registry, consumer.TopicKeyBuildSignal, diff --git a/submitqueue/orchestrator/controller/cancel/cancel.go b/submitqueue/orchestrator/controller/cancel/cancel.go index 93e693a7..cddd2336 100644 --- a/submitqueue/orchestrator/controller/cancel/cancel.go +++ b/submitqueue/orchestrator/controller/cancel/cancel.go @@ -84,16 +84,11 @@ var _ consumer.Controller = (*Controller)(nil) func NewController( logger *zap.SugaredLogger, scope tally.Scope, - stores storage.Factory, + store storage.Storage, registry consumer.TopicRegistry, topicKey consumer.TopicKey, consumerGroup string, ) *Controller { - // TODO(queue-aware): make this controller queue-aware during Process — derive the - // queue from the loaded entity and use it for structured logging, metrics scoping, - // and per-queue storage resolution. Today it uses the default store because the - // queue is only known after the by-ID load. - store, _ := stores.For("") return &Controller{ logger: logger.Named("cancel_controller"), metricsScope: scope.SubScope("cancel_controller"), diff --git a/submitqueue/orchestrator/controller/cancel/cancel_test.go b/submitqueue/orchestrator/controller/cancel/cancel_test.go index ca8c56f9..82308c22 100644 --- a/submitqueue/orchestrator/controller/cancel/cancel_test.go +++ b/submitqueue/orchestrator/controller/cancel/cancel_test.go @@ -59,7 +59,7 @@ func newRegistry(t *testing.T, ctrl *gomock.Controller) (consumer.TopicRegistry, } func newController(t *testing.T, store storage.Storage, registry consumer.TopicRegistry) *Controller { - return NewController(zaptest.NewLogger(t).Sugar(), tally.NoopScope, storage.NewStaticFactory(store), registry, consumer.TopicKeyCancel, "orchestrator-cancel") + return NewController(zaptest.NewLogger(t).Sugar(), tally.NoopScope, store, registry, consumer.TopicKeyCancel, "orchestrator-cancel") } func newDelivery(t *testing.T, ctrl *gomock.Controller, payload []byte, partitionKey string) consumer.Delivery { diff --git a/submitqueue/orchestrator/controller/conclude/conclude.go b/submitqueue/orchestrator/controller/conclude/conclude.go index 8bde94d0..21a23270 100644 --- a/submitqueue/orchestrator/controller/conclude/conclude.go +++ b/submitqueue/orchestrator/controller/conclude/conclude.go @@ -45,16 +45,11 @@ var _ consumer.Controller = (*Controller)(nil) func NewController( logger *zap.SugaredLogger, scope tally.Scope, - stores storage.Factory, + store storage.Storage, registry consumer.TopicRegistry, topicKey consumer.TopicKey, consumerGroup string, ) *Controller { - // TODO(queue-aware): make this controller queue-aware during Process — derive the - // queue from the loaded entity and use it for structured logging, metrics scoping, - // and per-queue storage resolution. Today it uses the default store because the - // queue is only known after the by-ID load. - store, _ := stores.For("") return &Controller{ logger: logger.Named("conclude_controller"), metricsScope: scope.SubScope("conclude_controller"), diff --git a/submitqueue/orchestrator/controller/conclude/conclude_test.go b/submitqueue/orchestrator/controller/conclude/conclude_test.go index 1a86440c..18d10c73 100644 --- a/submitqueue/orchestrator/controller/conclude/conclude_test.go +++ b/submitqueue/orchestrator/controller/conclude/conclude_test.go @@ -56,7 +56,7 @@ func newTestController(t *testing.T, ctrl *gomock.Controller, mockStorage *stora registry, err := consumer.NewTopicRegistry(nil) require.NoError(t, err) - return NewController(logger, scope, storage.NewStaticFactory(mockStorage), registry, consumer.TopicKeyConclude, "orchestrator-conclude") + return NewController(logger, scope, mockStorage, registry, consumer.TopicKeyConclude, "orchestrator-conclude") } func TestNewController(t *testing.T) { diff --git a/submitqueue/orchestrator/controller/log/BUILD.bazel b/submitqueue/orchestrator/controller/log/BUILD.bazel index d9594ff6..f700931c 100644 --- a/submitqueue/orchestrator/controller/log/BUILD.bazel +++ b/submitqueue/orchestrator/controller/log/BUILD.bazel @@ -24,7 +24,6 @@ go_test( "//extension/messagequeue/mock", "//submitqueue/core/consumer", "//submitqueue/entity", - "//submitqueue/extension/storage", "//submitqueue/extension/storage/mock", "@com_github_stretchr_testify//require", "@com_github_uber_go_tally_v4//:tally", diff --git a/submitqueue/orchestrator/controller/log/log.go b/submitqueue/orchestrator/controller/log/log.go index 77fece36..3c6212dd 100644 --- a/submitqueue/orchestrator/controller/log/log.go +++ b/submitqueue/orchestrator/controller/log/log.go @@ -44,15 +44,10 @@ var _ consumer.Controller = (*Controller)(nil) func NewController( logger *zap.SugaredLogger, scope tally.Scope, - stores storage.Factory, + store storage.Storage, topicKey consumer.TopicKey, consumerGroup string, ) *Controller { - // TODO(queue-aware): make this controller queue-aware during Process — derive the - // queue from the loaded entity and use it for structured logging, metrics scoping, - // and per-queue storage resolution. Today it uses the default store because the - // queue is only known after the by-ID load. - store, _ := stores.For("") return &Controller{ logger: logger.Named("log_controller"), metricsScope: scope.SubScope("log_controller"), diff --git a/submitqueue/orchestrator/controller/log/log_test.go b/submitqueue/orchestrator/controller/log/log_test.go index c77c8622..f8a1319d 100644 --- a/submitqueue/orchestrator/controller/log/log_test.go +++ b/submitqueue/orchestrator/controller/log/log_test.go @@ -25,7 +25,6 @@ import ( queuemock "github.com/uber/submitqueue/extension/messagequeue/mock" "github.com/uber/submitqueue/submitqueue/core/consumer" "github.com/uber/submitqueue/submitqueue/entity" - "github.com/uber/submitqueue/submitqueue/extension/storage" storagemock "github.com/uber/submitqueue/submitqueue/extension/storage/mock" "go.uber.org/mock/gomock" "go.uber.org/zap/zaptest" @@ -36,7 +35,7 @@ func newTestController(t *testing.T, ctrl *gomock.Controller, store *storagemock logger := zaptest.NewLogger(t).Sugar() scope := tally.NoopScope - return NewController(logger, scope, storage.NewStaticFactory(store), consumer.TopicKeyLog, "orchestrator-log") + return NewController(logger, scope, store, consumer.TopicKeyLog, "orchestrator-log") } func TestController_Process(t *testing.T) { diff --git a/submitqueue/orchestrator/controller/merge/BUILD.bazel b/submitqueue/orchestrator/controller/merge/BUILD.bazel index ea6c5f9c..d2424dd0 100644 --- a/submitqueue/orchestrator/controller/merge/BUILD.bazel +++ b/submitqueue/orchestrator/controller/merge/BUILD.bazel @@ -29,7 +29,6 @@ go_test( "//submitqueue/entity", "//submitqueue/extension/pusher", "//submitqueue/extension/pusher/mock", - "//submitqueue/extension/storage", "//submitqueue/extension/storage/mock", "@com_github_stretchr_testify//assert", "@com_github_stretchr_testify//require", diff --git a/submitqueue/orchestrator/controller/merge/merge.go b/submitqueue/orchestrator/controller/merge/merge.go index 117900e3..ee04b302 100644 --- a/submitqueue/orchestrator/controller/merge/merge.go +++ b/submitqueue/orchestrator/controller/merge/merge.go @@ -56,17 +56,12 @@ var _ consumer.Controller = (*Controller)(nil) func NewController( logger *zap.SugaredLogger, scope tally.Scope, - stores storage.Factory, + store storage.Storage, registry consumer.TopicRegistry, pushers pusher.Factory, topicKey consumer.TopicKey, consumerGroup string, ) *Controller { - // TODO(queue-aware): make this controller queue-aware during Process — derive the - // queue from the loaded entity and use it for structured logging, metrics scoping, - // and per-queue storage resolution. Today it uses the default store because the - // queue is only known after the by-ID load. - store, _ := stores.For("") return &Controller{ logger: logger.Named("merge_controller"), metricsScope: scope.SubScope("merge_controller"), diff --git a/submitqueue/orchestrator/controller/merge/merge_test.go b/submitqueue/orchestrator/controller/merge/merge_test.go index 36b1e3c2..09845a96 100644 --- a/submitqueue/orchestrator/controller/merge/merge_test.go +++ b/submitqueue/orchestrator/controller/merge/merge_test.go @@ -32,7 +32,6 @@ import ( "github.com/uber/submitqueue/submitqueue/entity" "github.com/uber/submitqueue/submitqueue/extension/pusher" pushermock "github.com/uber/submitqueue/submitqueue/extension/pusher/mock" - "github.com/uber/submitqueue/submitqueue/extension/storage" storagemock "github.com/uber/submitqueue/submitqueue/extension/storage/mock" ) @@ -81,7 +80,7 @@ func TestNewController(t *testing.T) { c := NewController( zaptest.NewLogger(t).Sugar(), tally.NoopScope, - storage.NewStaticFactory(store), + store, newRegistry(t, ctrl, nil), newPusherFactory(ctrl, pushermock.NewMockPusher(ctrl)), consumer.TopicKeyMerge, @@ -144,7 +143,7 @@ func TestController_Process_SuccessfulMerge(t *testing.T) { c := NewController( zaptest.NewLogger(t).Sugar(), tally.NoopScope, - storage.NewStaticFactory(store), + store, newRegistry(t, ctrl, nil), newPusherFactory(ctrl, mockPusher), consumer.TopicKeyMerge, @@ -208,7 +207,7 @@ func TestController_Process_PassesAllChangesInBatchOrder(t *testing.T) { c := NewController( zaptest.NewLogger(t).Sugar(), tally.NoopScope, - storage.NewStaticFactory(store), + store, newRegistry(t, ctrl, nil), newPusherFactory(ctrl, mockPusher), consumer.TopicKeyMerge, @@ -255,7 +254,7 @@ func TestController_Process_PushConflictMarksBatchFailed(t *testing.T) { c := NewController( zaptest.NewLogger(t).Sugar(), tally.NoopScope, - storage.NewStaticFactory(store), + store, newRegistry(t, ctrl, nil), newPusherFactory(ctrl, mockPusher), consumer.TopicKeyMerge, @@ -301,7 +300,7 @@ func TestController_Process_PushInfraFailureReturnsError(t *testing.T) { c := NewController( zaptest.NewLogger(t).Sugar(), tally.NoopScope, - storage.NewStaticFactory(store), + store, newRegistry(t, ctrl, nil), newPusherFactory(ctrl, mockPusher), consumer.TopicKeyMerge, @@ -342,7 +341,7 @@ func TestController_Process_TerminalBatchSkipsPushButFansOut(t *testing.T) { c := NewController( zaptest.NewLogger(t).Sugar(), tally.NoopScope, - storage.NewStaticFactory(store), + store, newRegistry(t, ctrl, nil), newPusherFactory(ctrl, mockPusher), consumer.TopicKeyMerge, @@ -392,7 +391,7 @@ func TestController_Process_CancellingShortCircuit(t *testing.T) { c := NewController( zaptest.NewLogger(t).Sugar(), tally.NoopScope, - storage.NewStaticFactory(store), + store, registry, newPusherFactory(ctrl, mockPusher), consumer.TopicKeyMerge, @@ -417,7 +416,7 @@ func TestController_Process_BatchStoreGetFailureNotRetryable(t *testing.T) { c := NewController( zaptest.NewLogger(t).Sugar(), tally.NoopScope, - storage.NewStaticFactory(store), + store, newRegistry(t, ctrl, nil), newPusherFactory(ctrl, pushermock.NewMockPusher(ctrl)), consumer.TopicKeyMerge, @@ -456,7 +455,7 @@ func TestController_Process_RequestStoreFailurePropagates(t *testing.T) { c := NewController( zaptest.NewLogger(t).Sugar(), tally.NoopScope, - storage.NewStaticFactory(store), + store, newRegistry(t, ctrl, nil), newPusherFactory(ctrl, pushermock.NewMockPusher(ctrl)), consumer.TopicKeyMerge, @@ -504,7 +503,7 @@ func TestController_Process_PublishFailureSurfaces(t *testing.T) { c := NewController( zaptest.NewLogger(t).Sugar(), tally.NoopScope, - storage.NewStaticFactory(store), + store, newRegistry(t, ctrl, fmt.Errorf("queue down")), newPusherFactory(ctrl, mockPusher), consumer.TopicKeyMerge, diff --git a/submitqueue/orchestrator/controller/score/BUILD.bazel b/submitqueue/orchestrator/controller/score/BUILD.bazel index edaf8847..e1f9aa49 100644 --- a/submitqueue/orchestrator/controller/score/BUILD.bazel +++ b/submitqueue/orchestrator/controller/score/BUILD.bazel @@ -29,7 +29,6 @@ go_test( "//submitqueue/core/consumer", "//submitqueue/entity", "//submitqueue/extension/scorer/mock", - "//submitqueue/extension/storage", "//submitqueue/extension/storage/mock", "@com_github_stretchr_testify//assert", "@com_github_stretchr_testify//require", diff --git a/submitqueue/orchestrator/controller/score/score.go b/submitqueue/orchestrator/controller/score/score.go index 702bdd75..2b1c3e61 100644 --- a/submitqueue/orchestrator/controller/score/score.go +++ b/submitqueue/orchestrator/controller/score/score.go @@ -50,17 +50,12 @@ var _ consumer.Controller = (*Controller)(nil) func NewController( logger *zap.SugaredLogger, scope tally.Scope, - stores storage.Factory, + store storage.Storage, scorers scorer.Factory, registry consumer.TopicRegistry, topicKey consumer.TopicKey, consumerGroup string, ) *Controller { - // TODO(queue-aware): make this controller queue-aware during Process — derive the - // queue from the loaded entity and use it for structured logging, metrics scoping, - // and per-queue storage resolution. Today it uses the default store because the - // queue is only known after the by-ID load. - store, _ := stores.For("") return &Controller{ logger: logger.Named("score_controller"), metricsScope: scope.SubScope("score_controller"), diff --git a/submitqueue/orchestrator/controller/score/score_test.go b/submitqueue/orchestrator/controller/score/score_test.go index 7f63555a..aec0c974 100644 --- a/submitqueue/orchestrator/controller/score/score_test.go +++ b/submitqueue/orchestrator/controller/score/score_test.go @@ -28,7 +28,6 @@ import ( "github.com/uber/submitqueue/submitqueue/core/consumer" "github.com/uber/submitqueue/submitqueue/entity" scorermock "github.com/uber/submitqueue/submitqueue/extension/scorer/mock" - "github.com/uber/submitqueue/submitqueue/extension/storage" storagemock "github.com/uber/submitqueue/submitqueue/extension/storage/mock" "go.uber.org/mock/gomock" "go.uber.org/zap/zaptest" @@ -107,7 +106,7 @@ func newTestController(t *testing.T, ctrl *gomock.Controller, store *storagemock scorerFactory := scorermock.NewMockFactory(ctrl) scorerFactory.EXPECT().For(gomock.Any()).Return(scorer, nil).AnyTimes() - return NewController(logger, scope, storage.NewStaticFactory(store), scorerFactory, registry, consumer.TopicKeyScore, "orchestrator-score") + return NewController(logger, scope, store, scorerFactory, registry, consumer.TopicKeyScore, "orchestrator-score") } func TestNewController(t *testing.T) { @@ -360,7 +359,7 @@ func TestController_Process_TerminalShortCircuit(t *testing.T) { scorerFactory := scorermock.NewMockFactory(ctrl) scorerFactory.EXPECT().For(gomock.Any()).Return(mockScorer, nil).AnyTimes() - controller := NewController(logger, scope, storage.NewStaticFactory(mockStorage), scorerFactory, registry, consumer.TopicKeyScore, "orchestrator-score") + controller := NewController(logger, scope, mockStorage, scorerFactory, registry, consumer.TopicKeyScore, "orchestrator-score") msg := entityqueue.NewMessage(batch.ID, batchIDPayload(t, batch.ID), batch.Queue, nil) delivery := queuemock.NewMockDelivery(ctrl) @@ -410,7 +409,7 @@ func TestController_Process_CancellingShortCircuit(t *testing.T) { scorerFactory := scorermock.NewMockFactory(ctrl) scorerFactory.EXPECT().For(gomock.Any()).Return(mockScorer, nil).AnyTimes() - controller := NewController(logger, scope, storage.NewStaticFactory(mockStorage), scorerFactory, registry, consumer.TopicKeyScore, "orchestrator-score") + controller := NewController(logger, scope, mockStorage, scorerFactory, registry, consumer.TopicKeyScore, "orchestrator-score") msg := entityqueue.NewMessage(batch.ID, batchIDPayload(t, batch.ID), batch.Queue, nil) delivery := queuemock.NewMockDelivery(ctrl) diff --git a/submitqueue/orchestrator/controller/speculate/speculate.go b/submitqueue/orchestrator/controller/speculate/speculate.go index 0760f830..05471979 100644 --- a/submitqueue/orchestrator/controller/speculate/speculate.go +++ b/submitqueue/orchestrator/controller/speculate/speculate.go @@ -71,16 +71,11 @@ const opName = "process" func NewController( logger *zap.SugaredLogger, scope tally.Scope, - stores storage.Factory, + store storage.Storage, registry consumer.TopicRegistry, topicKey consumer.TopicKey, consumerGroup string, ) *Controller { - // TODO(queue-aware): make this controller queue-aware during Process — derive the - // queue from the loaded entity and use it for structured logging, metrics scoping, - // and per-queue storage resolution. Today it uses the default store because the - // queue is only known after the by-ID load. - store, _ := stores.For("") return &Controller{ logger: logger.Named("speculate_controller"), metricsScope: scope.SubScope("speculate_controller"), diff --git a/submitqueue/orchestrator/controller/speculate/speculate_test.go b/submitqueue/orchestrator/controller/speculate/speculate_test.go index 6a684696..2342bd17 100644 --- a/submitqueue/orchestrator/controller/speculate/speculate_test.go +++ b/submitqueue/orchestrator/controller/speculate/speculate_test.go @@ -77,7 +77,7 @@ func newTestController(t *testing.T, ctrl *gomock.Controller, store *storagemock ) require.NoError(t, err) - return NewController(logger, scope, storage.NewStaticFactory(store), registry, consumer.TopicKeySpeculate, "orchestrator-speculate") + return NewController(logger, scope, store, registry, consumer.TopicKeySpeculate, "orchestrator-speculate") } // runProcess builds a delivery for batchID and invokes Process once. @@ -276,7 +276,7 @@ func TestController_Process_TerminalSelfHeals(t *testing.T) { require.NoError(t, err) logger := zaptest.NewLogger(t).Sugar() - controller := NewController(logger, tally.NoopScope, storage.NewStaticFactory(store), registry, consumer.TopicKeySpeculate, "orchestrator-speculate") + controller := NewController(logger, tally.NoopScope, store, registry, consumer.TopicKeySpeculate, "orchestrator-speculate") require.NoError(t, runProcess(t, ctrl, controller, batch.ID)) }) @@ -331,7 +331,7 @@ func TestController_Process_CancelledTerminalSelfHealsDependents(t *testing.T) { require.NoError(t, err) logger := zaptest.NewLogger(t).Sugar() - controller := NewController(logger, tally.NoopScope, storage.NewStaticFactory(store), registry, consumer.TopicKeySpeculate, "orchestrator-speculate") + controller := NewController(logger, tally.NoopScope, store, registry, consumer.TopicKeySpeculate, "orchestrator-speculate") require.NoError(t, runProcess(t, ctrl, controller, batch.ID)) @@ -399,7 +399,7 @@ func TestController_Process_CancellingTerminalFlow(t *testing.T) { require.NoError(t, err) logger := zaptest.NewLogger(t).Sugar() - controller := NewController(logger, tally.NoopScope, storage.NewStaticFactory(store), registry, consumer.TopicKeySpeculate, "orchestrator-speculate") + controller := NewController(logger, tally.NoopScope, store, registry, consumer.TopicKeySpeculate, "orchestrator-speculate") require.NoError(t, runProcess(t, ctrl, controller, batch.ID)) @@ -508,7 +508,7 @@ func TestController_Process_CancellingNoDependents(t *testing.T) { require.NoError(t, err) logger := zaptest.NewLogger(t).Sugar() - controller := NewController(logger, tally.NoopScope, storage.NewStaticFactory(store), registry, consumer.TopicKeySpeculate, "orchestrator-speculate") + controller := NewController(logger, tally.NoopScope, store, registry, consumer.TopicKeySpeculate, "orchestrator-speculate") require.NoError(t, runProcess(t, ctrl, controller, batch.ID)) } @@ -549,7 +549,7 @@ func TestController_Process_CancellingTerminalCASVersionMismatch(t *testing.T) { require.NoError(t, err) logger := zaptest.NewLogger(t).Sugar() - controller := NewController(logger, tally.NoopScope, storage.NewStaticFactory(store), registry, consumer.TopicKeySpeculate, "orchestrator-speculate") + controller := NewController(logger, tally.NoopScope, store, registry, consumer.TopicKeySpeculate, "orchestrator-speculate") err = runProcess(t, ctrl, controller, batch.ID) require.Error(t, err) diff --git a/submitqueue/orchestrator/controller/start/start.go b/submitqueue/orchestrator/controller/start/start.go index 889481d0..5349835f 100644 --- a/submitqueue/orchestrator/controller/start/start.go +++ b/submitqueue/orchestrator/controller/start/start.go @@ -54,17 +54,12 @@ var _ consumer.Controller = (*Controller)(nil) func NewController( logger *zap.SugaredLogger, scope tally.Scope, - stores storage.Factory, + store storage.Storage, changeStore changestore.ChangeStore, registry consumer.TopicRegistry, topicKey consumer.TopicKey, consumerGroup string, ) *Controller { - // TODO(queue-aware): make this controller queue-aware during Process — derive the - // queue from the loaded entity and use it for structured logging, metrics scoping, - // and per-queue storage resolution. Today it uses the default store because the - // queue is only known after the by-ID load. - store, _ := stores.For("") return &Controller{ logger: logger.Named("start_controller"), metricsScope: scope.SubScope("start_controller"), diff --git a/submitqueue/orchestrator/controller/start/start_test.go b/submitqueue/orchestrator/controller/start/start_test.go index f7c9a6fa..05995542 100644 --- a/submitqueue/orchestrator/controller/start/start_test.go +++ b/submitqueue/orchestrator/controller/start/start_test.go @@ -63,7 +63,7 @@ func newTestController( ) require.NoError(t, err) - return NewController(logger, scope, storage.NewStaticFactory(store), cs, registry, consumer.TopicKeyStart, "orchestrator-start") + return NewController(logger, scope, store, cs, registry, consumer.TopicKeyStart, "orchestrator-start") } // newMockStorage creates a MockStorage with a MockRequestStore that succeeds on Create. diff --git a/submitqueue/orchestrator/controller/validate/validate.go b/submitqueue/orchestrator/controller/validate/validate.go index 22e581a6..71615eb9 100644 --- a/submitqueue/orchestrator/controller/validate/validate.go +++ b/submitqueue/orchestrator/controller/validate/validate.go @@ -55,7 +55,7 @@ var _ consumer.Controller = (*Controller)(nil) func NewController( logger *zap.SugaredLogger, scope tally.Scope, - stores storage.Factory, + store storage.Storage, changeStore changestore.ChangeStore, registry consumer.TopicRegistry, mergeCheckers mergechecker.Factory, @@ -63,11 +63,6 @@ func NewController( topicKey consumer.TopicKey, consumerGroup string, ) *Controller { - // TODO(queue-aware): make this controller queue-aware during Process — derive the - // queue from the loaded entity and use it for structured logging, metrics scoping, - // and per-queue storage resolution. Today it uses the default store because the - // queue is only known after the by-ID load. - store, _ := stores.For("") return &Controller{ logger: logger.Named("validate_controller"), metricsScope: scope.SubScope("validate_controller"), diff --git a/submitqueue/orchestrator/controller/validate/validate_test.go b/submitqueue/orchestrator/controller/validate/validate_test.go index 2556d0fc..c8e8850a 100644 --- a/submitqueue/orchestrator/controller/validate/validate_test.go +++ b/submitqueue/orchestrator/controller/validate/validate_test.go @@ -124,7 +124,7 @@ func newTestController( cpFactory := changeprovidermock.NewMockFactory(ctrl) cpFactory.EXPECT().For(gomock.Any()).Return(cp, nil).AnyTimes() - return NewController(logger, scope, storage.NewStaticFactory(store), cs, registry, mcFactory, cpFactory, consumer.TopicKeyValidate, "orchestrator-validate") + return NewController(logger, scope, store, cs, registry, mcFactory, cpFactory, consumer.TopicKeyValidate, "orchestrator-validate") } func TestNewController(t *testing.T) {