From beb1f9d544a154a11897114fb5683b7dfef5eb76 Mon Sep 17 00:00:00 2001 From: Preetam Dwivedi Date: Wed, 3 Jun 2026 15:49:39 -0700 Subject: [PATCH] refactor(extensions): per-queue Factory; drop queue from method signatures MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Summary ### Why? Behavioral extensions were global singletons frozen at process start, and two leaked the queue into vendor-agnostic method signatures (mergechecker.Check, buildrunner.Trigger). Each extension should instead be produced by a Factory the controller asks for using the queue, identified only by name, so an implementation can be selected/configured per queue. ### What? - Reduce entity.QueueConfig to {Name} (the registry of valid queues); all behavioral/VCS config moves into the factory implementations. - Each extension (changeprovider, pusher, scorer, conflict, mergechecker, buildrunner) ships a narrow Config{QueueName} and a Factory interface (For(cfg) (T, error)); concrete factories are written by integrators in the example wiring and tests. Generated mocks updated. - Drop the queue argument from mergechecker.Check(ctx, change) and buildrunner.Trigger(ctx, base, head, metadata); delete mergechecker MultiChecker. Status/Cancel stay buildID-keyed. - Add storage.Factory (default mysql) and thread it into every controller; the gateway land controller resolves For(req.Queue) — the one place per-queue storage routing is actionable. Orchestrator controllers carry a TODO(queue-aware) to derive the queue from the loaded entity for logging/metrics/storage. - Controllers resolve each extension from its Factory using the loaded entity's queue; buildsignal fetches the batch before polling Status. Example servers wire concrete static factory structs. ## Test Plan ✅ make build, make test (40 pass) ✅ make fmt / lint / check-gazelle / check-tidy / check-mocks --- Makefile | 2 +- .../submitqueue/gateway/server/BUILD.bazel | 1 + example/submitqueue/gateway/server/main.go | 14 ++- .../submitqueue/gateway/server/queues.yaml | 30 +----- .../orchestrator/server/BUILD.bazel | 2 + .../submitqueue/orchestrator/server/main.go | 83 +++++++++++----- submitqueue/entity/queue_config.go | 49 +--------- .../extension/buildrunner/build_runner.go | 20 +++- .../extension/buildrunner/mock/BUILD.bazel | 1 + .../buildrunner/mock/build_runner_mock.go | 48 ++++++++- .../extension/buildrunner/noop/noop.go | 2 +- .../extension/buildrunner/noop/noop_test.go | 4 +- .../changeprovider/change_provider.go | 17 ++++ .../extension/changeprovider/mock/BUILD.bazel | 13 +++ .../mock/change_provider_mock.go | 97 +++++++++++++++++++ submitqueue/extension/conflict/conflict.go | 15 +++ .../extension/conflict/mock/conflict_mock.go | 39 ++++++++ .../extension/mergechecker/BUILD.bazel | 18 +--- .../extension/mergechecker/github/checker.go | 3 +- .../mergechecker/github/checker_test.go | 10 +- .../extension/mergechecker/mergechecker.go | 17 +++- .../mergechecker/mock/mergechecker_mock.go | 47 ++++++++- submitqueue/extension/mergechecker/multi.go | 57 ----------- .../extension/mergechecker/multi_test.go | 82 ---------------- .../extension/pusher/mock/pusher_mock.go | 39 ++++++++ submitqueue/extension/pusher/pusher.go | 15 +++ submitqueue/extension/queueconfig/README.md | 2 +- .../extension/queueconfig/yaml/yaml_test.go | 6 -- submitqueue/extension/scorer/mock/BUILD.bazel | 1 + .../extension/scorer/mock/scorer_mock.go | 40 ++++++++ submitqueue/extension/scorer/scorer.go | 15 +++ submitqueue/extension/storage/BUILD.bazel | 13 ++- submitqueue/extension/storage/factory.go | 34 +++++++ submitqueue/extension/storage/factory_test.go | 48 +++++++++ .../extension/storage/mock/storage_mock.go | 39 ++++++++ submitqueue/extension/storage/storage.go | 12 +++ submitqueue/gateway/controller/land.go | 36 ++++--- submitqueue/gateway/controller/land_test.go | 36 ++++--- .../orchestrator/controller/batch/batch.go | 20 +++- .../controller/batch/batch_test.go | 9 +- .../orchestrator/controller/build/build.go | 22 +++-- .../controller/build/build_test.go | 22 +++-- .../controller/buildsignal/BUILD.bazel | 1 + .../controller/buildsignal/buildsignal.go | 34 +++++-- .../buildsignal/buildsignal_test.go | 8 +- .../orchestrator/controller/cancel/cancel.go | 7 +- .../controller/cancel/cancel_test.go | 2 +- .../controller/conclude/conclude.go | 7 +- .../controller/conclude/conclude_test.go | 2 +- .../orchestrator/controller/log/BUILD.bazel | 1 + .../orchestrator/controller/log/log.go | 7 +- .../orchestrator/controller/log/log_test.go | 3 +- .../orchestrator/controller/merge/BUILD.bazel | 1 + .../orchestrator/controller/merge/merge.go | 21 +++- .../controller/merge/merge_test.go | 48 +++++---- .../orchestrator/controller/score/BUILD.bazel | 1 + .../orchestrator/controller/score/score.go | 19 +++- .../controller/score/score_test.go | 14 ++- .../controller/speculate/speculate.go | 7 +- .../controller/speculate/speculate_test.go | 12 +-- .../orchestrator/controller/start/start.go | 7 +- .../controller/start/start_test.go | 2 +- .../controller/validate/BUILD.bazel | 1 + .../controller/validate/validate.go | 61 +++++++----- .../controller/validate/validate_test.go | 14 ++- 65 files changed, 936 insertions(+), 424 deletions(-) create mode 100644 submitqueue/extension/changeprovider/mock/BUILD.bazel create mode 100644 submitqueue/extension/changeprovider/mock/change_provider_mock.go delete mode 100644 submitqueue/extension/mergechecker/multi.go delete mode 100644 submitqueue/extension/mergechecker/multi_test.go create mode 100644 submitqueue/extension/storage/factory.go create mode 100644 submitqueue/extension/storage/factory_test.go diff --git a/Makefile b/Makefile index e010ef5c..7e2411de 100644 --- a/Makefile +++ b/Makefile @@ -334,7 +334,7 @@ local-stovepipe-gateway-start: build-stovepipe-gateway-linux ## Start Stovepipe mocks: ## Generate mock files using mockgen @echo "Generating mocks..." - @$(BAZEL) run @rules_go//go -- generate ./submitqueue/extension/storage/... ./submitqueue/extension/buildrunner/... ./submitqueue/extension/changestore/... ./extension/counter/... ./extension/messagequeue/... ./submitqueue/extension/queueconfig/... ./submitqueue/extension/mergechecker/... ./submitqueue/extension/pusher/... ./submitqueue/extension/scorer/... ./submitqueue/extension/conflict/... ./submitqueue/core/consumer/... + @$(BAZEL) run @rules_go//go -- generate ./submitqueue/extension/storage/... ./submitqueue/extension/buildrunner/... ./submitqueue/extension/changeprovider/... ./submitqueue/extension/changestore/... ./extension/counter/... ./extension/messagequeue/... ./submitqueue/extension/queueconfig/... ./submitqueue/extension/mergechecker/... ./submitqueue/extension/pusher/... ./submitqueue/extension/scorer/... ./submitqueue/extension/conflict/... ./submitqueue/core/consumer/... @echo "Mocks generated successfully!" proto: ## Generate protobuf files from .proto definitions diff --git a/example/submitqueue/gateway/server/BUILD.bazel b/example/submitqueue/gateway/server/BUILD.bazel index 86baf961..5d38b40b 100644 --- a/example/submitqueue/gateway/server/BUILD.bazel +++ b/example/submitqueue/gateway/server/BUILD.bazel @@ -15,6 +15,7 @@ go_library( "//extension/messagequeue/mysql", "//submitqueue/core/consumer", "//submitqueue/extension/queueconfig/yaml", + "//submitqueue/extension/storage", "//submitqueue/extension/storage/mysql", "//submitqueue/gateway/controller", "//submitqueue/gateway/protopb", diff --git a/example/submitqueue/gateway/server/main.go b/example/submitqueue/gateway/server/main.go index 368d05f1..f5336b10 100644 --- a/example/submitqueue/gateway/server/main.go +++ b/example/submitqueue/gateway/server/main.go @@ -32,6 +32,7 @@ import ( queueMySQL "github.com/uber/submitqueue/extension/messagequeue/mysql" "github.com/uber/submitqueue/submitqueue/core/consumer" yamlqueueconfig "github.com/uber/submitqueue/submitqueue/extension/queueconfig/yaml" + "github.com/uber/submitqueue/submitqueue/extension/storage" mysqlstorage "github.com/uber/submitqueue/submitqueue/extension/storage/mysql" "github.com/uber/submitqueue/submitqueue/gateway/controller" pb "github.com/uber/submitqueue/submitqueue/gateway/protopb" @@ -199,8 +200,15 @@ func run() error { }, )) - // Initialize request log store from shared app database connection - requestLogStore := mysqlstorage.NewRequestLogStore(appDB, scope.SubScope("request_log_store")) + // Initialize storage from the shared app database connection. The land + // controller takes a storage.Factory (static: every queue → this store); + // cancel/status use the request log store directly. + store, err := mysqlstorage.NewStorage(appDB, scope.SubScope("storage")) + if err != nil { + return fmt.Errorf("failed to create storage: %w", err) + } + stores := storage.NewStaticFactory(store) + requestLogStore := store.GetRequestLogStore() // Load queue configurations from YAML. Path is required so the gateway // can reject requests for unknown queues at the edge. @@ -215,7 +223,7 @@ func run() error { // Create controllers and wrap them for gRPC pingController := controller.NewPingController(logger, scope) - landController := controller.NewLandController(logger.Sugar(), scope, cnt, requestLogStore, queueConfigs, registry) + landController := controller.NewLandController(logger.Sugar(), scope, cnt, stores, queueConfigs, registry) cancelController := controller.NewCancelController(logger.Sugar(), scope, requestLogStore, registry) statusController := controller.NewStatusController(logger.Sugar(), scope, requestLogStore) gatewayServer := &GatewayServer{ diff --git a/example/submitqueue/gateway/server/queues.yaml b/example/submitqueue/gateway/server/queues.yaml index 3831dc44..5caa3219 100644 --- a/example/submitqueue/gateway/server/queues.yaml +++ b/example/submitqueue/gateway/server/queues.yaml @@ -1,31 +1,9 @@ # Example queue configurations consumed by the gateway YAML store. -# The gateway loads this file at startup; QUEUE_CONFIG_PATH must point -# at it. Each entry maps a queue name to the VCS repository + target -# branch and selects the extension implementations used downstream. +# The gateway loads this file at startup; QUEUE_CONFIG_PATH must point at it. +# QueueConfig is just the registry of valid queue names — all behavioral and +# VCS configuration lives in the extension factory implementations wired in the +# server, not here. queues: - name: test-queue - vcs_type: git - vcs_address: git@github.com:uber/submitqueue.git - target: main - build_runner: buildkite.com/uber/submitqueue-ci - change_provider: github - merge_checker: github - land_provider: github - - name: e2e-test-queue - vcs_type: git - vcs_address: git@github.com:uber/submitqueue.git - target: main - build_runner: buildkite.com/uber/submitqueue-ci - change_provider: github - merge_checker: github - land_provider: github - - name: e2e-cancel-queue - vcs_type: git - vcs_address: git@github.com:uber/submitqueue.git - target: main - build_runner: buildkite.com/uber/submitqueue-ci - change_provider: github - merge_checker: github - land_provider: github diff --git a/example/submitqueue/orchestrator/server/BUILD.bazel b/example/submitqueue/orchestrator/server/BUILD.bazel index deb18c87..93c13e67 100644 --- a/example/submitqueue/orchestrator/server/BUILD.bazel +++ b/example/submitqueue/orchestrator/server/BUILD.bazel @@ -26,11 +26,13 @@ go_library( "//submitqueue/extension/changeprovider/github", "//submitqueue/extension/changestore", "//submitqueue/extension/changestore/mysql", + "//submitqueue/extension/conflict", "//submitqueue/extension/conflict/all", "//submitqueue/extension/mergechecker", "//submitqueue/extension/mergechecker/github", "//submitqueue/extension/pusher", "//submitqueue/extension/pusher/git", + "//submitqueue/extension/scorer", "//submitqueue/extension/scorer/heuristic", "//submitqueue/extension/storage", "//submitqueue/extension/storage/mysql", diff --git a/example/submitqueue/orchestrator/server/main.go b/example/submitqueue/orchestrator/server/main.go index 3cc0ebdd..520f5667 100644 --- a/example/submitqueue/orchestrator/server/main.go +++ b/example/submitqueue/orchestrator/server/main.go @@ -45,11 +45,13 @@ import ( githubprovider "github.com/uber/submitqueue/submitqueue/extension/changeprovider/github" "github.com/uber/submitqueue/submitqueue/extension/changestore" mysqlchangestore "github.com/uber/submitqueue/submitqueue/extension/changestore/mysql" + "github.com/uber/submitqueue/submitqueue/extension/conflict" "github.com/uber/submitqueue/submitqueue/extension/conflict/all" "github.com/uber/submitqueue/submitqueue/extension/mergechecker" githubchecker "github.com/uber/submitqueue/submitqueue/extension/mergechecker/github" "github.com/uber/submitqueue/submitqueue/extension/pusher" gitpusher "github.com/uber/submitqueue/submitqueue/extension/pusher/git" + "github.com/uber/submitqueue/submitqueue/extension/scorer" "github.com/uber/submitqueue/submitqueue/extension/scorer/heuristic" "github.com/uber/submitqueue/submitqueue/extension/storage" mysqlstorage "github.com/uber/submitqueue/submitqueue/extension/storage/mysql" @@ -425,11 +427,48 @@ func newTopicRegistry(q extqueue.Queue, subscriberName string) (consumer.TopicRe // │ │ │ // └────────┴───────────────────────┘ +// Static per-extension factories for the example server: every queue resolves +// to the single configured implementation. A real deployment would vary the +// returned implementation by cfg.QueueName (and inject per-queue config). +type changeProviderFactory struct{ impl changeprovider.ChangeProvider } + +func (f changeProviderFactory) For(changeprovider.Config) (changeprovider.ChangeProvider, error) { + return f.impl, nil +} + +type mergeCheckerFactory struct{ impl mergechecker.MergeChecker } + +func (f mergeCheckerFactory) For(mergechecker.Config) (mergechecker.MergeChecker, error) { + return f.impl, nil +} + +type pusherFactory struct{ impl pusher.Pusher } + +func (f pusherFactory) For(pusher.Config) (pusher.Pusher, error) { return f.impl, nil } + +type buildRunnerFactory struct{ impl buildrunner.BuildRunner } + +func (f buildRunnerFactory) For(buildrunner.Config) (buildrunner.BuildRunner, error) { + return f.impl, nil +} + +type scorerFactory struct{ impl scorer.Scorer } + +func (f scorerFactory) For(scorer.Config) (scorer.Scorer, error) { return f.impl, nil } + +type conflictFactory struct{ impl conflict.Analyzer } + +func (f conflictFactory) For(conflict.Config) (conflict.Analyzer, error) { return f.impl, nil } + func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope tally.Scope, registry consumer.TopicRegistry, mc mergechecker.MergeChecker, cp changeprovider.ChangeProvider, psh pusher.Pusher, br buildrunner.BuildRunner, cnt counter.Counter, store storage.Storage, changeStore changestore.ChangeStore) error { + // Static storage factory: every queue resolves to the one configured store. + // The factory is the injection point for future per-queue backends. + stores := storage.NewStaticFactory(store) + requestController := start.NewController( logger, scope, - store, + stores, changeStore, registry, consumer.TopicKeyStart, @@ -442,7 +481,7 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t cancelController := cancel.NewController( logger, scope, - store, + stores, registry, consumer.TopicKeyCancel, "orchestrator-cancel", @@ -454,11 +493,11 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t validateController := validate.NewController( logger, scope, - store, + stores, changeStore, registry, - mc, - cp, + mergeCheckerFactory{impl: mc}, + changeProviderFactory{impl: cp}, consumer.TopicKeyValidate, "orchestrator-validate", ) @@ -471,10 +510,10 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t scope, registry, cnt, - store, + stores, // TODO: replace with a real conflict analyzer (e.g. one backed by // Tango target analysis). The "all" stub serializes the queue. - all.New(), + conflictFactory{impl: all.New()}, consumer.TopicKeyBatch, "orchestrator-batch", ) @@ -485,9 +524,9 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t scoreController := score.NewController( logger, scope, - store, + stores, // TODO: replace with a real scorer - heuristic.New( + scorerFactory{impl: heuristic.New( []heuristic.Bucket{ {Min: 0, Max: 1, Score: 0.95}, {Min: 2, Max: 5, Score: 0.80}, @@ -498,7 +537,7 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t return len(change.URIs), nil }, scope.SubScope("scorer"), - ), + )}, registry, consumer.TopicKeyScore, "orchestrator-score", @@ -510,7 +549,7 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t speculateController := speculate.NewController( logger, scope, - store, + stores, registry, consumer.TopicKeySpeculate, "orchestrator-speculate", @@ -522,8 +561,8 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t buildController := build.NewController( logger, scope, - store, - br, + stores, + buildRunnerFactory{impl: br}, registry, consumer.TopicKeyBuild, "orchestrator-build", @@ -535,8 +574,8 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t buildsignalController := buildsignal.NewController( logger, scope, - store, - br, + stores, + buildRunnerFactory{impl: br}, registry, consumer.TopicKeyBuildSignal, "orchestrator-buildsignal", @@ -548,9 +587,9 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t mergeController := merge.NewController( logger, scope, - store, + stores, registry, - psh, + pusherFactory{impl: psh}, consumer.TopicKeyMerge, "orchestrator-merge", ) @@ -561,7 +600,7 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t concludeController := conclude.NewController( logger, scope, - store, + stores, registry, consumer.TopicKeyConclude, "orchestrator-conclude", @@ -573,7 +612,7 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t logController := logctrl.NewController( logger, scope, - store, + stores, consumer.TopicKeyLog, "orchestrator-log", ) @@ -619,14 +658,10 @@ func newMergeChecker(logger *zap.Logger, scope tally.Scope) (mergechecker.MergeC client.Timeout = parseTimeout(os.Getenv("GITHUB_TIMEOUT"), 30*time.Second) - github := githubchecker.NewMergeChecker(githubchecker.Params{ + return githubchecker.NewMergeChecker(githubchecker.Params{ HTTPClient: client, Logger: logger.Sugar(), MetricsScope: scope.SubScope("mergechecker"), - }) - - return mergechecker.NewMultiChecker(map[string]mergechecker.MergeChecker{ - "github": github, }), nil } diff --git a/submitqueue/entity/queue_config.go b/submitqueue/entity/queue_config.go index 940d5319..97f66f2f 100644 --- a/submitqueue/entity/queue_config.go +++ b/submitqueue/entity/queue_config.go @@ -14,52 +14,13 @@ package entity -// QueueConfig holds the configuration for a single submit queue. -// Each queue maps a VCS repository + target to a processing pipeline. -// A repository can have multiple queues, but each queue has exactly one target. -// Immutable after creation. +// QueueConfig identifies a single submit queue. It is the registry of valid +// queue names; the gateway validates that a land request targets a known queue. +// All behavioral and VCS configuration lives in the extension factory +// implementations, which are constructed per integrator deployment — the system +// hands a factory only the queue name. Immutable after creation. type QueueConfig struct { // Name uniquely identifies this queue within the system. // Referenced by Request.Queue. Name string `json:"name" yaml:"name"` - - // VCSType identifies the version control system (e.g., "git", "svn", "perforce"). - // A queue operates on exactly one VCS. - VCSType string `json:"vcs_type" yaml:"vcs_type"` - - // VCSAddress identifies the repository in the version control system. - // The format is VCS-specific: - // - Git: remote URL (e.g., "git@github.com:uber/submitqueue.git") - // - Perforce: depot path (e.g., "//depot/project") - // - SVN: repository URL (e.g., "https://svn.example.com/repos/project") - VCSAddress string `json:"vcs_address" yaml:"vcs_address"` - - // Target is the landing target where changes are merged. - // The format is VCS-specific: - // - Git: branch ref (e.g., "main", "release/v2") - // - Perforce: stream or depot path (e.g., "//depot/main/...") - // - SVN: repository path (e.g., "trunk/") - Target string `json:"target" yaml:"target"` - - // BuildRunner identifies the CI pipeline or job that runs builds for this queue. - // Opaque to the system; meaningful only to the build runner extension implementation. - // Examples: - // - Buildkite: "buildkite.com/uber/submitqueue-ci" - // - Jenkins: "jenkins.example.com/job/submitqueue-verify" - BuildRunner string `json:"build_runner" yaml:"build_runner"` - - // ChangeProvider identifies the change provider implementation for this queue. - // Opaque to the system; meaningful only to the change provider extension implementation. - // Examples: "github", "gitlab", "phabricator" - ChangeProvider string `json:"change_provider" yaml:"change_provider"` - - // MergeChecker identifies the merge checker implementation for this queue. - // Opaque to the system; meaningful only to the merge checker extension implementation. - // Examples: "github", "gitlab" - MergeChecker string `json:"merge_checker" yaml:"merge_checker"` - - // LandProvider identifies the land provider implementation for this queue. - // Opaque to the system; meaningful only to the land provider extension implementation. - // Examples: "github", "gitlab" - LandProvider string `json:"land_provider" yaml:"land_provider"` } diff --git a/submitqueue/extension/buildrunner/build_runner.go b/submitqueue/extension/buildrunner/build_runner.go index f34a404a..ab4b7cc2 100644 --- a/submitqueue/extension/buildrunner/build_runner.go +++ b/submitqueue/extension/buildrunner/build_runner.go @@ -55,11 +55,10 @@ type BuildRunner interface { // asynchronously. Callers learn the build's progress via Status, not // via Trigger. // - // queueName selects the runner-specific job configuration. - // Returns an error if the request is invalid. + // The runner is already bound to its queue's job configuration by the + // Factory that built it. Returns an error if the request is invalid. Trigger( ctx context.Context, - queueName string, base []entity.Change, head []entity.Change, metadata entity.BuildMetadata, @@ -80,3 +79,18 @@ type BuildRunner interface { // on already-terminal builds. Returns an error if the build does not exist. Cancel(ctx context.Context, buildID entity.BuildID) error } + +// Config carries the per-queue identity handed to a Factory. The system knows +// only the queue name; everything an implementation needs (endpoint, pipeline, +// credentials) is injected at construction by the integrator. +type Config struct { + // QueueName identifies the queue this BuildRunner serves. + QueueName string +} + +// Factory builds the BuildRunner for a queue. Implementations are provided by +// integrators (and tests) and inject whatever they need at construction. +type Factory interface { + // For returns the BuildRunner for the given queue. + For(cfg Config) (BuildRunner, error) +} diff --git a/submitqueue/extension/buildrunner/mock/BUILD.bazel b/submitqueue/extension/buildrunner/mock/BUILD.bazel index 4b0375cf..957fbfbf 100644 --- a/submitqueue/extension/buildrunner/mock/BUILD.bazel +++ b/submitqueue/extension/buildrunner/mock/BUILD.bazel @@ -7,6 +7,7 @@ go_library( visibility = ["//visibility:public"], deps = [ "//submitqueue/entity", + "//submitqueue/extension/buildrunner", "@org_uber_go_mock//gomock", ], ) diff --git a/submitqueue/extension/buildrunner/mock/build_runner_mock.go b/submitqueue/extension/buildrunner/mock/build_runner_mock.go index d39477ae..4b7ac83a 100644 --- a/submitqueue/extension/buildrunner/mock/build_runner_mock.go +++ b/submitqueue/extension/buildrunner/mock/build_runner_mock.go @@ -14,6 +14,7 @@ import ( reflect "reflect" entity "github.com/uber/submitqueue/submitqueue/entity" + buildrunner "github.com/uber/submitqueue/submitqueue/extension/buildrunner" gomock "go.uber.org/mock/gomock" ) @@ -72,16 +73,55 @@ func (mr *MockBuildRunnerMockRecorder) Status(ctx, buildID any) *gomock.Call { } // Trigger mocks base method. -func (m *MockBuildRunner) Trigger(ctx context.Context, queueName string, base, head []entity.Change, metadata entity.BuildMetadata) (entity.BuildID, error) { +func (m *MockBuildRunner) Trigger(ctx context.Context, base, head []entity.Change, metadata entity.BuildMetadata) (entity.BuildID, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Trigger", ctx, queueName, base, head, metadata) + ret := m.ctrl.Call(m, "Trigger", ctx, base, head, metadata) ret0, _ := ret[0].(entity.BuildID) ret1, _ := ret[1].(error) return ret0, ret1 } // Trigger indicates an expected call of Trigger. -func (mr *MockBuildRunnerMockRecorder) Trigger(ctx, queueName, base, head, metadata any) *gomock.Call { +func (mr *MockBuildRunnerMockRecorder) Trigger(ctx, base, head, metadata any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Trigger", reflect.TypeOf((*MockBuildRunner)(nil).Trigger), ctx, queueName, base, head, metadata) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Trigger", reflect.TypeOf((*MockBuildRunner)(nil).Trigger), ctx, base, head, metadata) +} + +// MockFactory is a mock of Factory interface. +type MockFactory struct { + ctrl *gomock.Controller + recorder *MockFactoryMockRecorder + isgomock struct{} +} + +// MockFactoryMockRecorder is the mock recorder for MockFactory. +type MockFactoryMockRecorder struct { + mock *MockFactory +} + +// NewMockFactory creates a new mock instance. +func NewMockFactory(ctrl *gomock.Controller) *MockFactory { + mock := &MockFactory{ctrl: ctrl} + mock.recorder = &MockFactoryMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockFactory) EXPECT() *MockFactoryMockRecorder { + return m.recorder +} + +// For mocks base method. +func (m *MockFactory) For(cfg buildrunner.Config) (buildrunner.BuildRunner, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "For", cfg) + ret0, _ := ret[0].(buildrunner.BuildRunner) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// For indicates an expected call of For. +func (mr *MockFactoryMockRecorder) For(cfg any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "For", reflect.TypeOf((*MockFactory)(nil).For), cfg) } diff --git a/submitqueue/extension/buildrunner/noop/noop.go b/submitqueue/extension/buildrunner/noop/noop.go index 71bba9a2..97c6eab0 100644 --- a/submitqueue/extension/buildrunner/noop/noop.go +++ b/submitqueue/extension/buildrunner/noop/noop.go @@ -41,7 +41,7 @@ func New() buildrunner.BuildRunner { // Trigger returns a unique build ID without contacting any runner. // Inputs are ignored. -func (r *runner) Trigger(_ context.Context, _ string, _ []entity.Change, _ []entity.Change, _ entity.BuildMetadata) (entity.BuildID, error) { +func (r *runner) Trigger(_ context.Context, _ []entity.Change, _ []entity.Change, _ entity.BuildMetadata) (entity.BuildID, error) { return entity.BuildID{ID: fmt.Sprintf("noop-%d", r.counter.Add(1))}, nil } diff --git a/submitqueue/extension/buildrunner/noop/noop_test.go b/submitqueue/extension/buildrunner/noop/noop_test.go index 76955c32..dff0d09b 100644 --- a/submitqueue/extension/buildrunner/noop/noop_test.go +++ b/submitqueue/extension/buildrunner/noop/noop_test.go @@ -32,7 +32,7 @@ func TestRunner_Trigger(t *testing.T) { r := New() ctx := context.Background() - id1, err := r.Trigger(ctx, "queueA", + id1, err := r.Trigger(ctx, []entity.Change{{URIs: []string{"github://owner/repo/pull/1"}}}, []entity.Change{{URIs: []string{"github://owner/repo/pull/2"}}}, entity.BuildMetadata{"requester": "alice"}, @@ -41,7 +41,7 @@ func TestRunner_Trigger(t *testing.T) { assert.NotEmpty(t, id1.ID) // IDs are unique across calls, even with empty inputs. - id2, err := r.Trigger(ctx, "queueA", nil, nil, nil) + id2, err := r.Trigger(ctx, nil, nil, nil) require.NoError(t, err) assert.NotEqual(t, id1, id2) } diff --git a/submitqueue/extension/changeprovider/change_provider.go b/submitqueue/extension/changeprovider/change_provider.go index ea40884f..47a91144 100644 --- a/submitqueue/extension/changeprovider/change_provider.go +++ b/submitqueue/extension/changeprovider/change_provider.go @@ -14,6 +14,8 @@ package changeprovider +//go:generate mockgen -source=change_provider.go -destination=mock/change_provider_mock.go -package=mock + import ( "context" @@ -59,3 +61,18 @@ type ChangeProvider interface { // Returns a slice of ChangeInfo, one for each change in the stack. Get(ctx context.Context, change entity.Change) ([]ChangeInfo, error) } + +// Config carries the per-queue identity handed to a Factory. The system knows +// only the queue name; everything an implementation needs is injected at +// construction by the integrator. +type Config struct { + // QueueName identifies the queue this ChangeProvider serves. + QueueName string +} + +// Factory builds the ChangeProvider for a queue. Implementations are provided +// by integrators (and tests) and inject whatever they need at construction. +type Factory interface { + // For returns the ChangeProvider for the given queue. + For(cfg Config) (ChangeProvider, error) +} diff --git a/submitqueue/extension/changeprovider/mock/BUILD.bazel b/submitqueue/extension/changeprovider/mock/BUILD.bazel new file mode 100644 index 00000000..47bb4761 --- /dev/null +++ b/submitqueue/extension/changeprovider/mock/BUILD.bazel @@ -0,0 +1,13 @@ +load("@rules_go//go:def.bzl", "go_library") + +go_library( + name = "mock", + srcs = ["change_provider_mock.go"], + importpath = "github.com/uber/submitqueue/submitqueue/extension/changeprovider/mock", + visibility = ["//visibility:public"], + deps = [ + "//submitqueue/entity", + "//submitqueue/extension/changeprovider", + "@org_uber_go_mock//gomock", + ], +) diff --git a/submitqueue/extension/changeprovider/mock/change_provider_mock.go b/submitqueue/extension/changeprovider/mock/change_provider_mock.go new file mode 100644 index 00000000..948c32a0 --- /dev/null +++ b/submitqueue/extension/changeprovider/mock/change_provider_mock.go @@ -0,0 +1,97 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: change_provider.go +// +// Generated by this command: +// +// mockgen -source=change_provider.go -destination=mock/change_provider_mock.go -package=mock +// + +// Package mock is a generated GoMock package. +package mock + +import ( + context "context" + reflect "reflect" + + entity "github.com/uber/submitqueue/submitqueue/entity" + changeprovider "github.com/uber/submitqueue/submitqueue/extension/changeprovider" + gomock "go.uber.org/mock/gomock" +) + +// MockChangeProvider is a mock of ChangeProvider interface. +type MockChangeProvider struct { + ctrl *gomock.Controller + recorder *MockChangeProviderMockRecorder + isgomock struct{} +} + +// MockChangeProviderMockRecorder is the mock recorder for MockChangeProvider. +type MockChangeProviderMockRecorder struct { + mock *MockChangeProvider +} + +// NewMockChangeProvider creates a new mock instance. +func NewMockChangeProvider(ctrl *gomock.Controller) *MockChangeProvider { + mock := &MockChangeProvider{ctrl: ctrl} + mock.recorder = &MockChangeProviderMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockChangeProvider) EXPECT() *MockChangeProviderMockRecorder { + return m.recorder +} + +// Get mocks base method. +func (m *MockChangeProvider) Get(ctx context.Context, change entity.Change) ([]changeprovider.ChangeInfo, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Get", ctx, change) + ret0, _ := ret[0].([]changeprovider.ChangeInfo) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Get indicates an expected call of Get. +func (mr *MockChangeProviderMockRecorder) Get(ctx, change any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Get", reflect.TypeOf((*MockChangeProvider)(nil).Get), ctx, change) +} + +// MockFactory is a mock of Factory interface. +type MockFactory struct { + ctrl *gomock.Controller + recorder *MockFactoryMockRecorder + isgomock struct{} +} + +// MockFactoryMockRecorder is the mock recorder for MockFactory. +type MockFactoryMockRecorder struct { + mock *MockFactory +} + +// NewMockFactory creates a new mock instance. +func NewMockFactory(ctrl *gomock.Controller) *MockFactory { + mock := &MockFactory{ctrl: ctrl} + mock.recorder = &MockFactoryMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockFactory) EXPECT() *MockFactoryMockRecorder { + return m.recorder +} + +// For mocks base method. +func (m *MockFactory) For(cfg changeprovider.Config) (changeprovider.ChangeProvider, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "For", cfg) + ret0, _ := ret[0].(changeprovider.ChangeProvider) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// For indicates an expected call of For. +func (mr *MockFactoryMockRecorder) For(cfg any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "For", reflect.TypeOf((*MockFactory)(nil).For), cfg) +} diff --git a/submitqueue/extension/conflict/conflict.go b/submitqueue/extension/conflict/conflict.go index 40594a36..3c1d1e5c 100644 --- a/submitqueue/extension/conflict/conflict.go +++ b/submitqueue/extension/conflict/conflict.go @@ -66,3 +66,18 @@ type Analyzer interface { // retryable by the caller. Analyze(ctx context.Context, batch entity.Batch, inFlight []entity.Batch) ([]Conflict, error) } + +// Config carries the per-queue identity handed to a Factory. The system knows +// only the queue name; everything an implementation needs is injected at +// construction by the integrator. +type Config struct { + // QueueName identifies the queue this Analyzer serves. + QueueName string +} + +// Factory builds the Analyzer for a queue. Implementations are provided by +// integrators (and tests) and inject whatever they need at construction. +type Factory interface { + // For returns the Analyzer for the given queue. + For(cfg Config) (Analyzer, error) +} diff --git a/submitqueue/extension/conflict/mock/conflict_mock.go b/submitqueue/extension/conflict/mock/conflict_mock.go index 6ed3f10a..e0a795ca 100644 --- a/submitqueue/extension/conflict/mock/conflict_mock.go +++ b/submitqueue/extension/conflict/mock/conflict_mock.go @@ -56,3 +56,42 @@ func (mr *MockAnalyzerMockRecorder) Analyze(ctx, batch, inFlight any) *gomock.Ca mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Analyze", reflect.TypeOf((*MockAnalyzer)(nil).Analyze), ctx, batch, inFlight) } + +// MockFactory is a mock of Factory interface. +type MockFactory struct { + ctrl *gomock.Controller + recorder *MockFactoryMockRecorder + isgomock struct{} +} + +// MockFactoryMockRecorder is the mock recorder for MockFactory. +type MockFactoryMockRecorder struct { + mock *MockFactory +} + +// NewMockFactory creates a new mock instance. +func NewMockFactory(ctrl *gomock.Controller) *MockFactory { + mock := &MockFactory{ctrl: ctrl} + mock.recorder = &MockFactoryMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockFactory) EXPECT() *MockFactoryMockRecorder { + return m.recorder +} + +// For mocks base method. +func (m *MockFactory) For(cfg conflict.Config) (conflict.Analyzer, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "For", cfg) + ret0, _ := ret[0].(conflict.Analyzer) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// For indicates an expected call of For. +func (mr *MockFactoryMockRecorder) For(cfg any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "For", reflect.TypeOf((*MockFactory)(nil).For), cfg) +} diff --git a/submitqueue/extension/mergechecker/BUILD.bazel b/submitqueue/extension/mergechecker/BUILD.bazel index cba27dc3..029989d6 100644 --- a/submitqueue/extension/mergechecker/BUILD.bazel +++ b/submitqueue/extension/mergechecker/BUILD.bazel @@ -1,23 +1,9 @@ -load("@rules_go//go:def.bzl", "go_library", "go_test") +load("@rules_go//go:def.bzl", "go_library") go_library( name = "mergechecker", - srcs = [ - "mergechecker.go", - "multi.go", - ], + srcs = ["mergechecker.go"], importpath = "github.com/uber/submitqueue/submitqueue/extension/mergechecker", visibility = ["//visibility:public"], deps = ["//submitqueue/entity"], ) - -go_test( - name = "mergechecker_test", - srcs = ["multi_test.go"], - embed = [":mergechecker"], - deps = [ - "//submitqueue/entity", - "@com_github_stretchr_testify//assert", - "@com_github_stretchr_testify//require", - ], -) diff --git a/submitqueue/extension/mergechecker/github/checker.go b/submitqueue/extension/mergechecker/github/checker.go index ea54c7e3..4f92dca3 100644 --- a/submitqueue/extension/mergechecker/github/checker.go +++ b/submitqueue/extension/mergechecker/github/checker.go @@ -61,7 +61,7 @@ func NewMergeChecker(params Params) mergechecker.MergeChecker { } // Check assesses whether a change can merge cleanly using the GitHub GraphQL API. -func (c *mergeChecker) Check(ctx context.Context, queue string, change entity.Change) (result mergechecker.Result, retErr error) { +func (c *mergeChecker) Check(ctx context.Context, change entity.Change) (result mergechecker.Result, retErr error) { const opName = "check" op := metrics.Begin(c.metricsScope, opName) @@ -96,7 +96,6 @@ func (c *mergeChecker) Check(ctx context.Context, queue string, change entity.Ch if !mergeable { metrics.NamedCounter(c.metricsScope, opName, "not_mergeable", 1) c.logger.Infow("change not mergeable", - "queue", queue, "reason", reason, "change_uris", change.URIs, ) diff --git a/submitqueue/extension/mergechecker/github/checker_test.go b/submitqueue/extension/mergechecker/github/checker_test.go index e349c1ab..6bd28e71 100644 --- a/submitqueue/extension/mergechecker/github/checker_test.go +++ b/submitqueue/extension/mergechecker/github/checker_test.go @@ -83,7 +83,6 @@ func TestMergeChecker_Check(t *testing.T) { tests := []struct { name string handler http.HandlerFunc - queue string change entity.Change wantMergeable bool wantReason string @@ -94,7 +93,6 @@ func TestMergeChecker_Check(t *testing.T) { handler: graphQLHandler(t, []PRInfo{ {Number: 1, Mergeable: PRMergeableStateMergeable, BaseRefName: "main", HeadRefName: "feature-1", HeadRefOid: shaAFull, State: PRStateOpen}, }), - queue: "test-queue", change: entity.Change{URIs: []string{"github://uber/repo/pull/1/" + shaAFull}}, wantMergeable: true, }, @@ -103,7 +101,6 @@ func TestMergeChecker_Check(t *testing.T) { handler: graphQLHandler(t, []PRInfo{ {Number: 1, Mergeable: PRMergeableStateConflicting, BaseRefName: "main", HeadRefName: "feature-1", HeadRefOid: shaAFull, State: PRStateOpen}, }), - queue: "test-queue", change: entity.Change{URIs: []string{"github://uber/repo/pull/1/" + shaAFull}}, wantMergeable: false, wantReason: "PR #1 has merge conflicts", @@ -114,7 +111,6 @@ func TestMergeChecker_Check(t *testing.T) { {Number: 1, Mergeable: PRMergeableStateMergeable, BaseRefName: "main", HeadRefName: "feature-1", HeadRefOid: sha1Full, State: PRStateOpen}, {Number: 2, Mergeable: PRMergeableStateMergeable, BaseRefName: "feature-1", HeadRefName: "feature-2", HeadRefOid: sha2Full, State: PRStateOpen}, }), - queue: "test-queue", change: entity.Change{URIs: []string{"github://uber/repo/pull/1/" + sha1Full, "github://uber/repo/pull/2/" + sha2Full}}, wantMergeable: true, }, @@ -123,7 +119,6 @@ func TestMergeChecker_Check(t *testing.T) { handler: graphQLHandler(t, []PRInfo{ {Number: 1, Mergeable: PRMergeableStateUnknown, BaseRefName: "main", HeadRefName: "feature-1", HeadRefOid: shaAFull, State: PRStateOpen}, }), - queue: "test-queue", change: entity.Change{URIs: []string{"github://uber/repo/pull/1/" + shaAFull}}, wantErr: true, }, @@ -132,7 +127,6 @@ func TestMergeChecker_Check(t *testing.T) { handler: graphQLHandler(t, []PRInfo{ {Number: 1, Mergeable: PRMergeableStateMergeable, BaseRefName: "main", HeadRefName: "feature-1", HeadRefOid: shaNewFull, State: PRStateOpen}, }), - queue: "test-queue", change: entity.Change{URIs: []string{"github://uber/repo/pull/1/" + shaOldFull}}, wantMergeable: false, wantReason: fmt.Sprintf("PR #1 head SHA changed: expected %s, got %s", shaOldFull, shaNewFull), @@ -142,7 +136,6 @@ func TestMergeChecker_Check(t *testing.T) { handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { t.Fatal("should not reach server") }), - queue: "test-queue", change: entity.Change{URIs: []string{"invalid-change-id"}}, wantErr: true, }, @@ -152,7 +145,6 @@ func TestMergeChecker_Check(t *testing.T) { w.WriteHeader(http.StatusInternalServerError) _, _ = w.Write([]byte("internal server error")) }), - queue: "test-queue", change: entity.Change{URIs: []string{"github://uber/repo/pull/1/" + shaAFull}}, wantErr: true, }, @@ -164,7 +156,7 @@ func TestMergeChecker_Check(t *testing.T) { defer server.Close() mc := newTestMergeChecker(t, server.URL) - result, err := mc.Check(context.Background(), tt.queue, tt.change) + result, err := mc.Check(context.Background(), tt.change) if tt.wantErr { require.Error(t, err) return diff --git a/submitqueue/extension/mergechecker/mergechecker.go b/submitqueue/extension/mergechecker/mergechecker.go index ea83f5eb..50ef5d09 100644 --- a/submitqueue/extension/mergechecker/mergechecker.go +++ b/submitqueue/extension/mergechecker/mergechecker.go @@ -27,7 +27,7 @@ type MergeChecker interface { // Check is a fail-fast mergeability check that optimistically assesses // whether the changes can be merged. A positive result does not // guarantee that the changes will apply cleanly at merge time. - Check(ctx context.Context, queue string, change entity.Change) (Result, error) + Check(ctx context.Context, change entity.Change) (Result, error) } // Result holds the outcome of a mergeability check. @@ -38,3 +38,18 @@ type Result struct { // Empty when Mergeable is true. Reason string } + +// Config carries the per-queue identity handed to a Factory. The system knows +// only the queue name; everything an implementation needs is injected at +// construction by the integrator. +type Config struct { + // QueueName identifies the queue this MergeChecker serves. + QueueName string +} + +// Factory builds the MergeChecker for a queue. Implementations are provided by +// integrators (and tests) and inject whatever they need at construction. +type Factory interface { + // For returns the MergeChecker for the given queue. + For(cfg Config) (MergeChecker, error) +} diff --git a/submitqueue/extension/mergechecker/mock/mergechecker_mock.go b/submitqueue/extension/mergechecker/mock/mergechecker_mock.go index 68e98b10..eab22c58 100644 --- a/submitqueue/extension/mergechecker/mock/mergechecker_mock.go +++ b/submitqueue/extension/mergechecker/mock/mergechecker_mock.go @@ -43,16 +43,55 @@ func (m *MockMergeChecker) EXPECT() *MockMergeCheckerMockRecorder { } // Check mocks base method. -func (m *MockMergeChecker) Check(ctx context.Context, queue string, change entity.Change) (mergechecker.Result, error) { +func (m *MockMergeChecker) Check(ctx context.Context, change entity.Change) (mergechecker.Result, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Check", ctx, queue, change) + ret := m.ctrl.Call(m, "Check", ctx, change) ret0, _ := ret[0].(mergechecker.Result) ret1, _ := ret[1].(error) return ret0, ret1 } // Check indicates an expected call of Check. -func (mr *MockMergeCheckerMockRecorder) Check(ctx, queue, change any) *gomock.Call { +func (mr *MockMergeCheckerMockRecorder) Check(ctx, change any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Check", reflect.TypeOf((*MockMergeChecker)(nil).Check), ctx, queue, change) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Check", reflect.TypeOf((*MockMergeChecker)(nil).Check), ctx, change) +} + +// MockFactory is a mock of Factory interface. +type MockFactory struct { + ctrl *gomock.Controller + recorder *MockFactoryMockRecorder + isgomock struct{} +} + +// MockFactoryMockRecorder is the mock recorder for MockFactory. +type MockFactoryMockRecorder struct { + mock *MockFactory +} + +// NewMockFactory creates a new mock instance. +func NewMockFactory(ctrl *gomock.Controller) *MockFactory { + mock := &MockFactory{ctrl: ctrl} + mock.recorder = &MockFactoryMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockFactory) EXPECT() *MockFactoryMockRecorder { + return m.recorder +} + +// For mocks base method. +func (m *MockFactory) For(cfg mergechecker.Config) (mergechecker.MergeChecker, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "For", cfg) + ret0, _ := ret[0].(mergechecker.MergeChecker) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// For indicates an expected call of For. +func (mr *MockFactoryMockRecorder) For(cfg any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "For", reflect.TypeOf((*MockFactory)(nil).For), cfg) } diff --git a/submitqueue/extension/mergechecker/multi.go b/submitqueue/extension/mergechecker/multi.go deleted file mode 100644 index fc4c4e0d..00000000 --- a/submitqueue/extension/mergechecker/multi.go +++ /dev/null @@ -1,57 +0,0 @@ -// Copyright (c) 2025 Uber Technologies, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package mergechecker - -import ( - "context" - "fmt" - "strings" - - "github.com/uber/submitqueue/submitqueue/entity" -) - -// multiChecker dispatches mergeability checks to scheme-specific checkers -// based on the URI scheme of the first change URI. Each scheme -// (e.g., "github", "ghe", "ghes") maps to a checker configured for that host. -type multiChecker struct { - // checkers maps URI scheme values to their corresponding MergeChecker. - checkers map[string]MergeChecker -} - -// NewMultiChecker creates a MergeChecker that routes mergeability checks -// to scheme-specific checkers. The map keys correspond to URI schemes -// (e.g., "github", "ghe") extracted from the first change URI. -func NewMultiChecker(checkers map[string]MergeChecker) MergeChecker { - return &multiChecker{checkers: checkers} -} - -// Check dispatches the mergeability check to the checker registered for -// the change URI scheme. -func (m *multiChecker) Check(ctx context.Context, queue string, change entity.Change) (Result, error) { - if len(change.URIs) == 0 { - return Result{}, fmt.Errorf("no change URIs provided") - } - - scheme, _, ok := strings.Cut(change.URIs[0], "://") - if !ok || scheme == "" { - return Result{}, fmt.Errorf("invalid change URI %q: missing scheme", change.URIs[0]) - } - - checker, ok := m.checkers[scheme] - if !ok { - return Result{}, fmt.Errorf("no mergeability checker configured for scheme %q", scheme) - } - return checker.Check(ctx, queue, change) -} diff --git a/submitqueue/extension/mergechecker/multi_test.go b/submitqueue/extension/mergechecker/multi_test.go deleted file mode 100644 index 9cab5105..00000000 --- a/submitqueue/extension/mergechecker/multi_test.go +++ /dev/null @@ -1,82 +0,0 @@ -// Copyright (c) 2025 Uber Technologies, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package mergechecker - -import ( - "context" - "fmt" - "testing" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "github.com/uber/submitqueue/submitqueue/entity" -) - -// stubChecker is a test stub that returns a fixed result. -type stubChecker struct { - result Result - err error -} - -func (s *stubChecker) Check(_ context.Context, _ string, _ entity.Change) (Result, error) { - return s.result, s.err -} - -func TestMultiChecker_RoutesToCorrectChecker(t *testing.T) { - githubChecker := &stubChecker{result: Result{Mergeable: true}} - gheChecker := &stubChecker{result: Result{Mergeable: false}} - - mc := NewMultiChecker(map[string]MergeChecker{ - "github": githubChecker, - "ghe": gheChecker, - }) - - // Route to github checker - result, err := mc.Check(context.Background(), "test-queue", entity.Change{URIs: []string{"github://uber/repo/pull/1/abcdef0123456789abcdef0123456789abcdef01"}}) - require.NoError(t, err) - assert.True(t, result.Mergeable) - - // Route to ghe checker - result, err = mc.Check(context.Background(), "test-queue", entity.Change{URIs: []string{"ghe://uber/repo/pull/1/abcdef0123456789abcdef0123456789abcdef01"}}) - require.NoError(t, err) - assert.False(t, result.Mergeable) -} - -func TestMultiChecker_UnknownScheme(t *testing.T) { - mc := NewMultiChecker(map[string]MergeChecker{ - "github": &stubChecker{result: Result{Mergeable: true}}, - }) - - _, err := mc.Check(context.Background(), "test-queue", entity.Change{URIs: []string{"unknown://uber/repo/1/abcdef0123456789abcdef0123456789abcdef01"}}) - require.Error(t, err) -} - -func TestMultiChecker_PropagatesError(t *testing.T) { - mc := NewMultiChecker(map[string]MergeChecker{ - "github": &stubChecker{err: fmt.Errorf("api failure")}, - }) - - _, err := mc.Check(context.Background(), "test-queue", entity.Change{URIs: []string{"github://uber/repo/pull/1/abcdef0123456789abcdef0123456789abcdef01"}}) - require.Error(t, err) -} - -func TestMultiChecker_EmptyURIs(t *testing.T) { - mc := NewMultiChecker(map[string]MergeChecker{ - "github": &stubChecker{result: Result{Mergeable: true}}, - }) - - _, err := mc.Check(context.Background(), "test-queue", entity.Change{URIs: []string{}}) - require.Error(t, err) -} diff --git a/submitqueue/extension/pusher/mock/pusher_mock.go b/submitqueue/extension/pusher/mock/pusher_mock.go index 6ead5f51..5ff69e71 100644 --- a/submitqueue/extension/pusher/mock/pusher_mock.go +++ b/submitqueue/extension/pusher/mock/pusher_mock.go @@ -56,3 +56,42 @@ func (mr *MockPusherMockRecorder) Push(ctx, changes any) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Push", reflect.TypeOf((*MockPusher)(nil).Push), ctx, changes) } + +// MockFactory is a mock of Factory interface. +type MockFactory struct { + ctrl *gomock.Controller + recorder *MockFactoryMockRecorder + isgomock struct{} +} + +// MockFactoryMockRecorder is the mock recorder for MockFactory. +type MockFactoryMockRecorder struct { + mock *MockFactory +} + +// NewMockFactory creates a new mock instance. +func NewMockFactory(ctrl *gomock.Controller) *MockFactory { + mock := &MockFactory{ctrl: ctrl} + mock.recorder = &MockFactoryMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockFactory) EXPECT() *MockFactoryMockRecorder { + return m.recorder +} + +// For mocks base method. +func (m *MockFactory) For(cfg pusher.Config) (pusher.Pusher, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "For", cfg) + ret0, _ := ret[0].(pusher.Pusher) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// For indicates an expected call of For. +func (mr *MockFactoryMockRecorder) For(cfg any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "For", reflect.TypeOf((*MockFactory)(nil).For), cfg) +} diff --git a/submitqueue/extension/pusher/pusher.go b/submitqueue/extension/pusher/pusher.go index 102ca204..8d86e131 100644 --- a/submitqueue/extension/pusher/pusher.go +++ b/submitqueue/extension/pusher/pusher.go @@ -86,3 +86,18 @@ type Pusher interface { // commits. See the type-level docs for the atomicity contract. Push(ctx context.Context, changes []entity.Change) (Result, error) } + +// Config carries the per-queue identity handed to a Factory. The system knows +// only the queue name; everything an implementation needs (checkout, remote, +// target) is injected at construction by the integrator. +type Config struct { + // QueueName identifies the queue this Pusher serves. + QueueName string +} + +// Factory builds the Pusher for a queue. Implementations are provided by +// integrators (and tests) and inject whatever they need at construction. +type Factory interface { + // For returns the Pusher for the given queue. + For(cfg Config) (Pusher, error) +} diff --git a/submitqueue/extension/queueconfig/README.md b/submitqueue/extension/queueconfig/README.md index 4c94c8c8..aca1c46b 100644 --- a/submitqueue/extension/queueconfig/README.md +++ b/submitqueue/extension/queueconfig/README.md @@ -19,4 +19,4 @@ type Store interface { Queue configuration entity lives in `entity/queue_config.go`: -- **QueueConfig** — configuration for a single submit queue (name, VCS type, VCS repo, target) +- **QueueConfig** — identifies a single submit queue (just its name); the registry of valid queue names. Behavioral/VCS config lives in the extension factory implementations, not here. diff --git a/submitqueue/extension/queueconfig/yaml/yaml_test.go b/submitqueue/extension/queueconfig/yaml/yaml_test.go index f1077627..cd005e8e 100644 --- a/submitqueue/extension/queueconfig/yaml/yaml_test.go +++ b/submitqueue/extension/queueconfig/yaml/yaml_test.go @@ -123,10 +123,6 @@ func TestNewStore_MissingFile(t *testing.T) { func TestStore_Get(t *testing.T) { path := writeTempYAML(t, `queues: - name: main - vcs_type: git - vcs_address: git@github.com:uber/submitqueue.git - target: main - change_provider: github `) store, err := NewStore(path) require.NoError(t, err) @@ -135,8 +131,6 @@ func TestStore_Get(t *testing.T) { cfg, err := store.Get(context.Background(), "main") require.NoError(t, err) assert.Equal(t, "main", cfg.Name) - assert.Equal(t, "git", cfg.VCSType) - assert.Equal(t, "github", cfg.ChangeProvider) }) t.Run("unknown queue returns ErrNotFound", func(t *testing.T) { diff --git a/submitqueue/extension/scorer/mock/BUILD.bazel b/submitqueue/extension/scorer/mock/BUILD.bazel index 95b27c2b..6192dc00 100644 --- a/submitqueue/extension/scorer/mock/BUILD.bazel +++ b/submitqueue/extension/scorer/mock/BUILD.bazel @@ -7,6 +7,7 @@ go_library( visibility = ["//visibility:public"], deps = [ "//submitqueue/entity", + "//submitqueue/extension/scorer", "@org_uber_go_mock//gomock", ], ) diff --git a/submitqueue/extension/scorer/mock/scorer_mock.go b/submitqueue/extension/scorer/mock/scorer_mock.go index 55a7245e..9cfc706f 100644 --- a/submitqueue/extension/scorer/mock/scorer_mock.go +++ b/submitqueue/extension/scorer/mock/scorer_mock.go @@ -14,6 +14,7 @@ import ( reflect "reflect" entity "github.com/uber/submitqueue/submitqueue/entity" + scorer "github.com/uber/submitqueue/submitqueue/extension/scorer" gomock "go.uber.org/mock/gomock" ) @@ -55,3 +56,42 @@ func (mr *MockScorerMockRecorder) Score(ctx, change any) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Score", reflect.TypeOf((*MockScorer)(nil).Score), ctx, change) } + +// MockFactory is a mock of Factory interface. +type MockFactory struct { + ctrl *gomock.Controller + recorder *MockFactoryMockRecorder + isgomock struct{} +} + +// MockFactoryMockRecorder is the mock recorder for MockFactory. +type MockFactoryMockRecorder struct { + mock *MockFactory +} + +// NewMockFactory creates a new mock instance. +func NewMockFactory(ctrl *gomock.Controller) *MockFactory { + mock := &MockFactory{ctrl: ctrl} + mock.recorder = &MockFactoryMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockFactory) EXPECT() *MockFactoryMockRecorder { + return m.recorder +} + +// For mocks base method. +func (m *MockFactory) For(cfg scorer.Config) (scorer.Scorer, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "For", cfg) + ret0, _ := ret[0].(scorer.Scorer) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// For indicates an expected call of For. +func (mr *MockFactoryMockRecorder) For(cfg any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "For", reflect.TypeOf((*MockFactory)(nil).For), cfg) +} diff --git a/submitqueue/extension/scorer/scorer.go b/submitqueue/extension/scorer/scorer.go index 33dfe5c6..8b0141ad 100644 --- a/submitqueue/extension/scorer/scorer.go +++ b/submitqueue/extension/scorer/scorer.go @@ -28,3 +28,18 @@ type Scorer interface { // of a successful land for the given change. Score(ctx context.Context, change entity.Change) (float64, error) } + +// Config carries the per-queue identity handed to a Factory. The system knows +// only the queue name; everything an implementation needs is injected at +// construction by the integrator. +type Config struct { + // QueueName identifies the queue this Scorer serves. + QueueName string +} + +// Factory builds the Scorer for a queue. Implementations are provided by +// integrators (and tests) and inject whatever they need at construction. +type Factory interface { + // For returns the Scorer for the given queue. + For(cfg Config) (Scorer, error) +} diff --git a/submitqueue/extension/storage/BUILD.bazel b/submitqueue/extension/storage/BUILD.bazel index fe889fbc..0fd34935 100644 --- a/submitqueue/extension/storage/BUILD.bazel +++ b/submitqueue/extension/storage/BUILD.bazel @@ -1,4 +1,4 @@ -load("@rules_go//go:def.bzl", "go_library") +load("@rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "storage", @@ -7,6 +7,7 @@ go_library( "batch_store.go", "build_store.go", "change_provider_store.go", + "factory.go", "request_log_store.go", "request_store.go", "speculation_tree_store.go", @@ -16,3 +17,13 @@ go_library( visibility = ["//visibility:public"], deps = ["//submitqueue/entity"], ) + +go_test( + name = "storage_test", + srcs = ["factory_test.go"], + embed = [":storage"], + deps = [ + "@com_github_stretchr_testify//assert", + "@com_github_stretchr_testify//require", + ], +) diff --git a/submitqueue/extension/storage/factory.go b/submitqueue/extension/storage/factory.go new file mode 100644 index 00000000..f1c4fe15 --- /dev/null +++ b/submitqueue/extension/storage/factory.go @@ -0,0 +1,34 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package storage + +// staticFactory is the default Factory. It returns the same Storage for every +// queue, ignoring the name. It exists so call sites can route through the +// Factory contract today (a no-op) and adopt true per-queue backends later +// without further consumer changes. +type staticFactory struct { + storage Storage +} + +// NewStaticFactory returns a Factory that serves the given Storage for every +// queue name. +func NewStaticFactory(s Storage) Factory { + return staticFactory{storage: s} +} + +// For returns the configured Storage for any queue name. +func (f staticFactory) For(string) (Storage, error) { + return f.storage, nil +} diff --git a/submitqueue/extension/storage/factory_test.go b/submitqueue/extension/storage/factory_test.go new file mode 100644 index 00000000..0986a2f0 --- /dev/null +++ b/submitqueue/extension/storage/factory_test.go @@ -0,0 +1,48 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package storage + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// stubStorage is a minimal Storage used to verify the factory returns the +// wrapped instance. All accessors return zero values; they are never invoked. +type stubStorage struct{} + +func (stubStorage) GetRequestStore() RequestStore { return nil } +func (stubStorage) GetChangeProviderStore() ChangeProviderStore { return nil } +func (stubStorage) GetBatchStore() BatchStore { return nil } +func (stubStorage) GetBatchDependentStore() BatchDependentStore { return nil } +func (stubStorage) GetBuildStore() BuildStore { return nil } +func (stubStorage) GetSpeculationTreeStore() SpeculationTreeStore { return nil } +func (stubStorage) GetRequestLogStore() RequestLogStore { return nil } +func (stubStorage) Close() error { return nil } + +func TestStaticFactory_For(t *testing.T) { + s := stubStorage{} + f := NewStaticFactory(s) + + t.Run("returns the wrapped storage for any name", func(t *testing.T) { + for _, name := range []string{"", "queue-a", "queue-b"} { + got, err := f.For(name) + require.NoError(t, err) + assert.Equal(t, s, got) + } + }) +} diff --git a/submitqueue/extension/storage/mock/storage_mock.go b/submitqueue/extension/storage/mock/storage_mock.go index ddf0574a..2d78186d 100644 --- a/submitqueue/extension/storage/mock/storage_mock.go +++ b/submitqueue/extension/storage/mock/storage_mock.go @@ -151,3 +151,42 @@ func (mr *MockStorageMockRecorder) GetSpeculationTreeStore() *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetSpeculationTreeStore", reflect.TypeOf((*MockStorage)(nil).GetSpeculationTreeStore)) } + +// MockFactory is a mock of Factory interface. +type MockFactory struct { + ctrl *gomock.Controller + recorder *MockFactoryMockRecorder + isgomock struct{} +} + +// MockFactoryMockRecorder is the mock recorder for MockFactory. +type MockFactoryMockRecorder struct { + mock *MockFactory +} + +// NewMockFactory creates a new mock instance. +func NewMockFactory(ctrl *gomock.Controller) *MockFactory { + mock := &MockFactory{ctrl: ctrl} + mock.recorder = &MockFactoryMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockFactory) EXPECT() *MockFactoryMockRecorder { + return m.recorder +} + +// For mocks base method. +func (m *MockFactory) For(name string) (storage.Storage, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "For", name) + ret0, _ := ret[0].(storage.Storage) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// For indicates an expected call of For. +func (mr *MockFactoryMockRecorder) For(name any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "For", reflect.TypeOf((*MockFactory)(nil).For), name) +} diff --git a/submitqueue/extension/storage/storage.go b/submitqueue/extension/storage/storage.go index 1b84c4e5..1a185267 100644 --- a/submitqueue/extension/storage/storage.go +++ b/submitqueue/extension/storage/storage.go @@ -68,3 +68,15 @@ type Storage interface { // Close closes the storage and all underlying connections. Should only be called once at the end of the program. Close() error } + +// Factory returns the Storage backing a named queue. It exists so a single +// queue can be migrated to a different backend without affecting others; the +// default implementation (NewStaticFactory) returns the same Storage for +// every queue. Callers resolve the Storage per message from the queue name +// carried in the message envelope, before any entity lookup. +type Factory interface { + // For returns the Storage for the named queue. An empty name selects the + // default backend. It returns an error if no backend is configured for the + // queue. + For(name string) (Storage, error) +} diff --git a/submitqueue/gateway/controller/land.go b/submitqueue/gateway/controller/land.go index 037914bc..4ffe1212 100644 --- a/submitqueue/gateway/controller/land.go +++ b/submitqueue/gateway/controller/land.go @@ -61,25 +61,25 @@ func IsUnrecognizedQueue(err error) bool { // LandController handles land business logic for the gateway type LandController struct { - logger *zap.SugaredLogger - metricsScope tally.Scope - counter counter.Counter - requestLogStore storage.RequestLogStore - queueConfigs queueconfig.Store - registry consumer.TopicRegistry + logger *zap.SugaredLogger + metricsScope tally.Scope + counter counter.Counter + stores storage.Factory + queueConfigs queueconfig.Store + registry consumer.TopicRegistry } // NewLandController creates a new instance of the gateway land controller. // The controller publishes land requests to the topic registered under // consumer.TopicKeyStart in the registry. -func NewLandController(logger *zap.SugaredLogger, scope tally.Scope, counter counter.Counter, requestLogStore storage.RequestLogStore, queueConfigs queueconfig.Store, registry consumer.TopicRegistry) *LandController { +func NewLandController(logger *zap.SugaredLogger, scope tally.Scope, counter counter.Counter, stores storage.Factory, queueConfigs queueconfig.Store, registry consumer.TopicRegistry) *LandController { return &LandController{ - logger: logger, - metricsScope: scope.SubScope("land_controller"), - counter: counter, - requestLogStore: requestLogStore, - queueConfigs: queueConfigs, - registry: registry, + logger: logger, + metricsScope: scope.SubScope("land_controller"), + counter: counter, + stores: stores, + queueConfigs: queueConfigs, + registry: registry, } } @@ -110,6 +110,14 @@ func (c *LandController) Land(ctx context.Context, req *pb.LandRequest) (resp *p return nil, fmt.Errorf("LandController failed to look up queue %q: %w", queue, err) } + // Resolve the storage backend for this entityqueue. The queue is known up front + // here (from the request), so this is the one place per-queue storage + // routing is actionable. + store, err := c.stores.For(queue) + if err != nil { + return nil, fmt.Errorf("LandController failed to resolve storage for queue=%s: %w", queue, err) + } + // TODO: pass default queue land strategy to resolver function to process a default. strategy, err := resolveRequestLandStrategy(req.Strategy) if err != nil { @@ -133,7 +141,7 @@ func (c *LandController) Land(ctx context.Context, req *pb.LandRequest) (resp *p // It is important to record the status before publishing to the queue for processing. It is important to publish straight to the database and not via a entityqueue. // Gateway has to stay consistent with the request log. logEntry := entity.NewRequestLog(landRequest.ID, entity.RequestStatusAccepted, 0, "", nil) - if err := c.requestLogStore.Insert(ctx, logEntry); err != nil { + if err := store.GetRequestLogStore().Insert(ctx, logEntry); err != nil { return nil, fmt.Errorf("LandController failed to insert request log for sqid=%s: %w", landRequest.ID, err) } diff --git a/submitqueue/gateway/controller/land_test.go b/submitqueue/gateway/controller/land_test.go index 668e3490..bbe38ebf 100644 --- a/submitqueue/gateway/controller/land_test.go +++ b/submitqueue/gateway/controller/land_test.go @@ -30,6 +30,7 @@ import ( "github.com/uber/submitqueue/submitqueue/entity" "github.com/uber/submitqueue/submitqueue/extension/queueconfig" qcmock "github.com/uber/submitqueue/submitqueue/extension/queueconfig/mock" + "github.com/uber/submitqueue/submitqueue/extension/storage" storagemock "github.com/uber/submitqueue/submitqueue/extension/storage/mock" pb "github.com/uber/submitqueue/submitqueue/gateway/protopb" "go.uber.org/mock/gomock" @@ -61,11 +62,14 @@ func newTestRegistryWithNoopPublisher(t *testing.T, ctrl *gomock.Controller) con return registry } -// noopRequestLogStore returns a mock RequestLogStore that succeeds silently. -func noopRequestLogStore(ctrl *gomock.Controller) *storagemock.MockRequestLogStore { - s := storagemock.NewMockRequestLogStore(ctrl) - s.EXPECT().Insert(gomock.Any(), gomock.Any()).Return(nil).AnyTimes() - return s +// noopStorageFactory returns a storage.Factory whose RequestLogStore.Insert +// succeeds silently for any entityqueue. +func noopStorageFactory(ctrl *gomock.Controller) storage.Factory { + logStore := storagemock.NewMockRequestLogStore(ctrl) + logStore.EXPECT().Insert(gomock.Any(), gomock.Any()).Return(nil).AnyTimes() + store := storagemock.NewMockStorage(ctrl) + store.EXPECT().GetRequestLogStore().Return(logStore).AnyTimes() + return storage.NewStaticFactory(store) } // noopQueueConfigStore returns a mock queueconfig.Store that always reports @@ -80,7 +84,7 @@ func TestNewLandController(t *testing.T) { ctrl := gomock.NewController(t) cnt := countermock.NewMockCounter(ctrl) - controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopRequestLogStore(ctrl), noopQueueConfigStore(ctrl), newTestRegistryWithNoopPublisher(t, ctrl)) + controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopStorageFactory(ctrl), noopQueueConfigStore(ctrl), newTestRegistryWithNoopPublisher(t, ctrl)) require.NotNil(t, controller) } @@ -89,7 +93,7 @@ func TestLand_ReturnsSqid(t *testing.T) { cnt := countermock.NewMockCounter(ctrl) cnt.EXPECT().Next(gomock.Any(), gomock.Any()).Return(int64(1), nil) - controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopRequestLogStore(ctrl), noopQueueConfigStore(ctrl), newTestRegistryWithNoopPublisher(t, ctrl)) + controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopStorageFactory(ctrl), noopQueueConfigStore(ctrl), newTestRegistryWithNoopPublisher(t, ctrl)) ctx := context.Background() req := &pb.LandRequest{ @@ -107,7 +111,7 @@ func TestLand_ReturnsErrorOnCounterFailure(t *testing.T) { cnt := countermock.NewMockCounter(ctrl) cnt.EXPECT().Next(gomock.Any(), gomock.Any()).Return(int64(0), fmt.Errorf("counter unavailable")) - controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopRequestLogStore(ctrl), noopQueueConfigStore(ctrl), newTestRegistryWithNoopPublisher(t, ctrl)) + controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopStorageFactory(ctrl), noopQueueConfigStore(ctrl), newTestRegistryWithNoopPublisher(t, ctrl)) ctx := context.Background() req := &pb.LandRequest{ @@ -131,7 +135,7 @@ func TestLand_CounterDomainIncludesQueue(t *testing.T) { return 1, nil }, ) - controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopRequestLogStore(ctrl), noopQueueConfigStore(ctrl), newTestRegistryWithNoopPublisher(t, ctrl)) + controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopStorageFactory(ctrl), noopQueueConfigStore(ctrl), newTestRegistryWithNoopPublisher(t, ctrl)) ctx := context.Background() req := &pb.LandRequest{ @@ -148,7 +152,7 @@ func TestLand_ReturnsErrorOnEmptyQueue(t *testing.T) { ctrl := gomock.NewController(t) cnt := countermock.NewMockCounter(ctrl) - controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopRequestLogStore(ctrl), noopQueueConfigStore(ctrl), newTestRegistryWithNoopPublisher(t, ctrl)) + controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopStorageFactory(ctrl), noopQueueConfigStore(ctrl), newTestRegistryWithNoopPublisher(t, ctrl)) ctx := context.Background() req := &pb.LandRequest{ @@ -165,7 +169,7 @@ func TestLand_ReturnsErrorOnEmptyChangeUri(t *testing.T) { ctrl := gomock.NewController(t) cnt := countermock.NewMockCounter(ctrl) - controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopRequestLogStore(ctrl), noopQueueConfigStore(ctrl), newTestRegistryWithNoopPublisher(t, ctrl)) + controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopStorageFactory(ctrl), noopQueueConfigStore(ctrl), newTestRegistryWithNoopPublisher(t, ctrl)) ctx := context.Background() req := &pb.LandRequest{ @@ -182,7 +186,7 @@ func TestLand_ReturnsErrorOnNilChange(t *testing.T) { ctrl := gomock.NewController(t) cnt := countermock.NewMockCounter(ctrl) - controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopRequestLogStore(ctrl), noopQueueConfigStore(ctrl), newTestRegistryWithNoopPublisher(t, ctrl)) + controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopStorageFactory(ctrl), noopQueueConfigStore(ctrl), newTestRegistryWithNoopPublisher(t, ctrl)) ctx := context.Background() req := &pb.LandRequest{ @@ -202,7 +206,7 @@ func TestLand_ReturnsUnrecognizedQueueWhenStoreReportsNotFound(t *testing.T) { qcs := qcmock.NewMockStore(ctrl) qcs.EXPECT().Get(gomock.Any(), "missing-queue").Return(entity.QueueConfig{}, queueconfig.ErrNotFound) - controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopRequestLogStore(ctrl), qcs, newTestRegistryWithNoopPublisher(t, ctrl)) + controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopStorageFactory(ctrl), qcs, newTestRegistryWithNoopPublisher(t, ctrl)) ctx := context.Background() req := &pb.LandRequest{ @@ -228,7 +232,7 @@ func TestLand_PropagatesQueueConfigStoreError(t *testing.T) { qcs := qcmock.NewMockStore(ctrl) qcs.EXPECT().Get(gomock.Any(), "test-queue").Return(entity.QueueConfig{}, fmt.Errorf("config backend down")) - controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopRequestLogStore(ctrl), qcs, newTestRegistryWithNoopPublisher(t, ctrl)) + controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopStorageFactory(ctrl), qcs, newTestRegistryWithNoopPublisher(t, ctrl)) ctx := context.Background() req := &pb.LandRequest{ @@ -260,7 +264,7 @@ func TestLand_PublishesToQueue(t *testing.T) { }, ) - controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopRequestLogStore(ctrl), noopQueueConfigStore(ctrl), registry) + controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopStorageFactory(ctrl), noopQueueConfigStore(ctrl), registry) ctx := context.Background() req := &pb.LandRequest{ @@ -296,7 +300,7 @@ func TestLand_ContinuesWhenPublishFails(t *testing.T) { registry, publisher := newTestRegistry(t, ctrl) publisher.EXPECT().Publish(gomock.Any(), gomock.Any(), gomock.Any()).Return(fmt.Errorf("queue unavailable")) - controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopRequestLogStore(ctrl), noopQueueConfigStore(ctrl), registry) + controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopStorageFactory(ctrl), noopQueueConfigStore(ctrl), registry) ctx := context.Background() req := &pb.LandRequest{ diff --git a/submitqueue/orchestrator/controller/batch/batch.go b/submitqueue/orchestrator/controller/batch/batch.go index 189e5e6d..3a3b6dc2 100644 --- a/submitqueue/orchestrator/controller/batch/batch.go +++ b/submitqueue/orchestrator/controller/batch/batch.go @@ -39,7 +39,7 @@ type Controller struct { registry consumer.TopicRegistry counter counter.Counter store storage.Storage - analyzer conflict.Analyzer + analyzers conflict.Factory topicKey consumer.TopicKey consumerGroup string } @@ -53,18 +53,23 @@ func NewController( scope tally.Scope, registry consumer.TopicRegistry, counter counter.Counter, - store storage.Storage, - analyzer conflict.Analyzer, + stores storage.Factory, + analyzers conflict.Factory, topicKey consumer.TopicKey, consumerGroup string, ) *Controller { + // TODO(queue-aware): make this controller queue-aware during Process — derive the + // queue from the loaded entity and use it for structured logging, metrics scoping, + // and per-queue storage resolution. Today it uses the default store because the + // queue is only known after the by-ID load. + store, _ := stores.For("") return &Controller{ logger: logger.Named("batch_controller"), metricsScope: scope.SubScope("batch_controller"), registry: registry, counter: counter, store: store, - analyzer: analyzer, + analyzers: analyzers, topicKey: topicKey, consumerGroup: consumerGroup, } @@ -149,7 +154,12 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (r // Dedupe by batch ID since a single (analyzed, in-flight) pair may be // reported with multiple Conflict entries when different conflict types // apply; the dependency graph only tracks the relation. - conflicts, err := c.analyzer.Analyze(ctx, batch, activeBatches) + analyzer, err := c.analyzers.For(conflict.Config{QueueName: batch.Queue}) + if err != nil { + metrics.NamedCounter(c.metricsScope, opName, "conflict_analyzer_errors", 1) + return fmt.Errorf("failed to build conflict analyzer for queue=%s: %w", batch.Queue, err) + } + conflicts, err := analyzer.Analyze(ctx, batch, activeBatches) if err != nil { metrics.NamedCounter(c.metricsScope, opName, "conflict_analyzer_errors", 1) return fmt.Errorf("failed to analyze conflicts for batchID=%s: %w", batch.ID, err) diff --git a/submitqueue/orchestrator/controller/batch/batch_test.go b/submitqueue/orchestrator/controller/batch/batch_test.go index d18e195a..f5f6c70f 100644 --- a/submitqueue/orchestrator/controller/batch/batch_test.go +++ b/submitqueue/orchestrator/controller/batch/batch_test.go @@ -114,7 +114,10 @@ func newTestController(t *testing.T, ctrl *gomock.Controller, cnt *countermock.M ) require.NoError(t, err) - return NewController(logger, scope, registry, cnt, mockStorage, analyzer, consumer.TopicKeyBatch, "orchestrator-batch") + analyzerFactory := conflictmock.NewMockFactory(ctrl) + analyzerFactory.EXPECT().For(gomock.Any()).Return(analyzer, nil).AnyTimes() + + return NewController(logger, scope, registry, cnt, storage.NewStaticFactory(mockStorage), analyzerFactory, consumer.TopicKeyBatch, "orchestrator-batch") } func TestNewController(t *testing.T) { @@ -436,9 +439,11 @@ func TestController_Process_CASLostToCancel(t *testing.T) { ) require.NoError(t, err) + analyzerFactory := conflictmock.NewMockFactory(ctrl) + analyzerFactory.EXPECT().For(gomock.Any()).Return(all.New(), nil).AnyTimes() controller := NewController( zaptest.NewLogger(t).Sugar(), tally.NoopScope, registry, newSequentialCounter(ctrl), - mockStorage, all.New(), consumer.TopicKeyBatch, "orchestrator-batch", + storage.NewStaticFactory(mockStorage), analyzerFactory, consumer.TopicKeyBatch, "orchestrator-batch", ) msg := entityqueue.NewMessage(request.ID, requestIDPayload(t, request.ID), request.Queue, nil) diff --git a/submitqueue/orchestrator/controller/build/build.go b/submitqueue/orchestrator/controller/build/build.go index 7f19ff58..6e60fbc7 100644 --- a/submitqueue/orchestrator/controller/build/build.go +++ b/submitqueue/orchestrator/controller/build/build.go @@ -36,7 +36,7 @@ type Controller struct { logger *zap.SugaredLogger metricsScope tally.Scope store storage.Storage - buildRunner buildrunner.BuildRunner + buildRunners buildrunner.Factory registry consumer.TopicRegistry topicKey consumer.TopicKey consumerGroup string @@ -49,17 +49,22 @@ var _ consumer.Controller = (*Controller)(nil) func NewController( logger *zap.SugaredLogger, scope tally.Scope, - store storage.Storage, - buildRunner buildrunner.BuildRunner, + stores storage.Factory, + buildRunners buildrunner.Factory, registry consumer.TopicRegistry, topicKey consumer.TopicKey, consumerGroup string, ) *Controller { + // TODO(queue-aware): make this controller queue-aware during Process — derive the + // queue from the loaded entity and use it for structured logging, metrics scoping, + // and per-queue storage resolution. Today it uses the default store because the + // queue is only known after the by-ID load. + store, _ := stores.For("") return &Controller{ logger: logger.Named("build_controller"), metricsScope: scope.SubScope("build_controller"), store: store, - buildRunner: buildRunner, + buildRunners: buildRunners, registry: registry, topicKey: topicKey, consumerGroup: consumerGroup, @@ -126,10 +131,15 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (r return fmt.Errorf("failed to assemble head changes for batch %s: %w", batch.ID, err) } - // Trigger the build with the configured build manager. metadata is nil + // Trigger the build with the queue's build runner. metadata is nil // until a caller-supplied source materializes (e.g. requester / ticket // pulled off the originating LandRequest). - buildID, err := c.buildRunner.Trigger(ctx, batch.Queue, base, head, nil) + buildRunner, err := c.buildRunners.For(buildrunner.Config{QueueName: batch.Queue}) + if err != nil { + metrics.NamedCounter(c.metricsScope, opName, "trigger_errors", 1) + return fmt.Errorf("failed to build runner for batch %s: %w", batch.ID, err) + } + buildID, err := buildRunner.Trigger(ctx, base, head, nil) if err != nil { metrics.NamedCounter(c.metricsScope, opName, "trigger_errors", 1) return fmt.Errorf("failed to trigger build for batch %s: %w", batch.ID, err) diff --git a/submitqueue/orchestrator/controller/build/build_test.go b/submitqueue/orchestrator/controller/build/build_test.go index 90073c36..520c947d 100644 --- a/submitqueue/orchestrator/controller/build/build_test.go +++ b/submitqueue/orchestrator/controller/build/build_test.go @@ -75,6 +75,14 @@ func newMockStorage(ctrl *gomock.Controller, batch entity.Batch) *storagemock.Mo // newTestController creates a controller with test dependencies. br is the // build runner to inject; pass buildnoop.New() for the pass-through default. +// staticBuildRunnerFactory is a test factory that returns a fixed BuildRunner +// for any entityqueue. +type staticBuildRunnerFactory struct{ r buildrunner.BuildRunner } + +func (f staticBuildRunnerFactory) For(buildrunner.Config) (buildrunner.BuildRunner, error) { + return f.r, nil +} + // The wired registry exposes only the buildsignal topic — that is what the // controller publishes to after the RFC refactor. func newTestController(t *testing.T, ctrl *gomock.Controller, store *storagemock.MockStorage, br buildrunner.BuildRunner, publishErr error) *Controller { @@ -96,7 +104,7 @@ func newTestController(t *testing.T, ctrl *gomock.Controller, store *storagemock ) require.NoError(t, err) - return NewController(logger, scope, store, br, registry, consumer.TopicKeyBuild, "orchestrator-build") + return NewController(logger, scope, storage.NewStaticFactory(store), staticBuildRunnerFactory{r: br}, registry, consumer.TopicKeyBuild, "orchestrator-build") } func TestNewController(t *testing.T) { @@ -176,7 +184,7 @@ func TestController_Process_TriggersWithBaseAndHead(t *testing.T) { br := buildrunnermock.NewMockBuildRunner(ctrl) wantBase := []entity.Change{depReq.Change} wantHead := []entity.Change{head1.Change, head2.Change} - br.EXPECT().Trigger(gomock.Any(), headBatch.Queue, wantBase, wantHead, gomock.Nil()).Return(entity.BuildID{ID: "build-xyz"}, nil) + br.EXPECT().Trigger(gomock.Any(), wantBase, wantHead, gomock.Nil()).Return(entity.BuildID{ID: "build-xyz"}, nil) var publishedTopic string var published entity.BuildID @@ -197,7 +205,7 @@ func TestController_Process_TriggersWithBaseAndHead(t *testing.T) { ) require.NoError(t, err) - controller := NewController(zaptest.NewLogger(t).Sugar(), tally.NoopScope, store, br, registry, consumer.TopicKeyBuild, "orchestrator-build") + controller := NewController(zaptest.NewLogger(t).Sugar(), tally.NoopScope, storage.NewStaticFactory(store), staticBuildRunnerFactory{r: br}, registry, consumer.TopicKeyBuild, "orchestrator-build") msg := entityqueue.NewMessage(headBatch.ID, batchIDPayload(t, headBatch.ID), headBatch.Queue, nil) delivery := queuemock.NewMockDelivery(ctrl) @@ -239,7 +247,7 @@ func TestController_Process_BuildStoreAlreadyExistsIsSwallowed(t *testing.T) { store.EXPECT().GetBuildStore().Return(mockBuildStore).AnyTimes() br := buildrunnermock.NewMockBuildRunner(ctrl) - br.EXPECT().Trigger(gomock.Any(), batch.Queue, gomock.Any(), gomock.Any(), gomock.Any()).Return(entity.BuildID{ID: "build-dup"}, nil) + br.EXPECT().Trigger(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(entity.BuildID{ID: "build-dup"}, nil) publishCalled := false mockPub := queuemock.NewMockPublisher(ctrl) @@ -255,7 +263,7 @@ func TestController_Process_BuildStoreAlreadyExistsIsSwallowed(t *testing.T) { []consumer.TopicConfig{{Key: consumer.TopicKeyBuildSignal, Name: "buildsignal", Queue: mockQ}}, ) require.NoError(t, err) - controller := NewController(zaptest.NewLogger(t).Sugar(), tally.NoopScope, store, br, registry, consumer.TopicKeyBuild, "orchestrator-build") + controller := NewController(zaptest.NewLogger(t).Sugar(), tally.NoopScope, storage.NewStaticFactory(store), staticBuildRunnerFactory{r: br}, registry, consumer.TopicKeyBuild, "orchestrator-build") msg := entityqueue.NewMessage(batch.ID, batchIDPayload(t, batch.ID), batch.Queue, nil) delivery := queuemock.NewMockDelivery(ctrl) @@ -280,14 +288,14 @@ func TestController_Process_TriggerFailure(t *testing.T) { // No build store expectation: Trigger failure must short-circuit before Create. br := buildrunnermock.NewMockBuildRunner(ctrl) - br.EXPECT().Trigger(gomock.Any(), batch.Queue, gomock.Any(), gomock.Any(), gomock.Any()). + br.EXPECT().Trigger(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). Return(entity.BuildID{}, fmt.Errorf("provider down")) registry, err := consumer.NewTopicRegistry( []consumer.TopicConfig{{Key: consumer.TopicKeyBuildSignal, Name: "buildsignal", Queue: queuemock.NewMockQueue(ctrl)}}, ) require.NoError(t, err) - controller := NewController(zaptest.NewLogger(t).Sugar(), tally.NoopScope, store, br, registry, consumer.TopicKeyBuild, "orchestrator-build") + controller := NewController(zaptest.NewLogger(t).Sugar(), tally.NoopScope, storage.NewStaticFactory(store), staticBuildRunnerFactory{r: br}, registry, consumer.TopicKeyBuild, "orchestrator-build") msg := entityqueue.NewMessage(batch.ID, batchIDPayload(t, batch.ID), batch.Queue, nil) delivery := queuemock.NewMockDelivery(ctrl) diff --git a/submitqueue/orchestrator/controller/buildsignal/BUILD.bazel b/submitqueue/orchestrator/controller/buildsignal/BUILD.bazel index 1481d473..e3ca520a 100644 --- a/submitqueue/orchestrator/controller/buildsignal/BUILD.bazel +++ b/submitqueue/orchestrator/controller/buildsignal/BUILD.bazel @@ -29,6 +29,7 @@ go_test( "//submitqueue/core/consumer", "//submitqueue/entity", "//submitqueue/extension/buildrunner/mock", + "//submitqueue/extension/storage", "//submitqueue/extension/storage/mock", "@com_github_stretchr_testify//assert", "@com_github_stretchr_testify//require", diff --git a/submitqueue/orchestrator/controller/buildsignal/buildsignal.go b/submitqueue/orchestrator/controller/buildsignal/buildsignal.go index 9d98045f..cf2b9960 100644 --- a/submitqueue/orchestrator/controller/buildsignal/buildsignal.go +++ b/submitqueue/orchestrator/controller/buildsignal/buildsignal.go @@ -58,7 +58,7 @@ type Controller struct { logger *zap.SugaredLogger metricsScope tally.Scope store storage.Storage - buildRunner buildrunner.BuildRunner + buildRunners buildrunner.Factory registry consumer.TopicRegistry topicKey consumer.TopicKey consumerGroup string @@ -71,17 +71,22 @@ var _ consumer.Controller = (*Controller)(nil) func NewController( logger *zap.SugaredLogger, scope tally.Scope, - store storage.Storage, - buildRunner buildrunner.BuildRunner, + stores storage.Factory, + buildRunners buildrunner.Factory, registry consumer.TopicRegistry, topicKey consumer.TopicKey, consumerGroup string, ) *Controller { + // TODO(queue-aware): make this controller queue-aware during Process — derive the + // queue from the loaded entity and use it for structured logging, metrics scoping, + // and per-queue storage resolution. Today it uses the default store because the + // queue is only known after the by-ID load. + store, _ := stores.For("") return &Controller{ logger: logger.Named("buildsignal_controller"), metricsScope: scope.SubScope("buildsignal_controller"), store: store, - buildRunner: buildRunner, + buildRunners: buildRunners, registry: registry, topicKey: topicKey, consumerGroup: consumerGroup, @@ -132,7 +137,21 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (r "partition_key", msg.PartitionKey, ) - status, _, err := c.buildRunner.Status(ctx, buildID) + // Load the batch first: it gives us the queue (needed to build the right + // BuildRunner) and lets us short-circuit halted batches before polling. + batch, err := c.store.GetBatchStore().Get(ctx, build.BatchID) + if err != nil { + metrics.NamedCounter(c.metricsScope, opName, "storage_errors", 1) + return fmt.Errorf("failed to get batch %s: %w", build.BatchID, err) + } + + buildRunner, err := c.buildRunners.For(buildrunner.Config{QueueName: batch.Queue}) + if err != nil { + metrics.NamedCounter(c.metricsScope, opName, "status_errors", 1) + return fmt.Errorf("failed to build runner for batch %s: %w", batch.ID, err) + } + + status, _, err := buildRunner.Status(ctx, buildID) if err != nil { metrics.NamedCounter(c.metricsScope, opName, "status_errors", 1) return fmt.Errorf("failed to get status for build %s: %w", buildID.ID, err) @@ -143,11 +162,6 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (r // avoids noise. For Cancelling batches the cancel controller owns the // terminal write and the downstream fan-out, so further pipeline work // would race against it; silent ack is the only safe action. - batch, err := c.store.GetBatchStore().Get(ctx, build.BatchID) - if err != nil { - metrics.NamedCounter(c.metricsScope, opName, "storage_errors", 1) - return fmt.Errorf("failed to get batch %s: %w", build.BatchID, err) - } if entity.IsBatchStateHalted(batch.State) { metrics.NamedCounter(c.metricsScope, opName, "skipped_halted", 1) c.logger.Infow("skipping buildsignal publish for halted batch", diff --git a/submitqueue/orchestrator/controller/buildsignal/buildsignal_test.go b/submitqueue/orchestrator/controller/buildsignal/buildsignal_test.go index 6c732761..5d46ad0b 100644 --- a/submitqueue/orchestrator/controller/buildsignal/buildsignal_test.go +++ b/submitqueue/orchestrator/controller/buildsignal/buildsignal_test.go @@ -28,6 +28,7 @@ import ( "github.com/uber/submitqueue/submitqueue/core/consumer" "github.com/uber/submitqueue/submitqueue/entity" buildrunnermock "github.com/uber/submitqueue/submitqueue/extension/buildrunner/mock" + "github.com/uber/submitqueue/submitqueue/extension/storage" storagemock "github.com/uber/submitqueue/submitqueue/extension/storage/mock" "go.uber.org/mock/gomock" "go.uber.org/zap/zaptest" @@ -47,6 +48,8 @@ type testHarness struct { func newTestHarness(t *testing.T, ctrl *gomock.Controller) *testHarness { br := buildrunnermock.NewMockBuildRunner(ctrl) + brFactory := buildrunnermock.NewMockFactory(ctrl) + brFactory.EXPECT().For(gomock.Any()).Return(br, nil).AnyTimes() signalPub := queuemock.NewMockPublisher(ctrl) signalQ := queuemock.NewMockQueue(ctrl) @@ -71,8 +74,8 @@ func newTestHarness(t *testing.T, ctrl *gomock.Controller) *testHarness { c := NewController( zaptest.NewLogger(t).Sugar(), tally.NoopScope, - store, - br, + storage.NewStaticFactory(store), + brFactory, registry, consumer.TopicKeyBuildSignal, "orchestrator-buildsignal", @@ -200,6 +203,7 @@ func TestController_Process_StatusError(t *testing.T) { build := entity.Build{ID: "b-3", BatchID: "batch-3", Status: entity.BuildStatusAccepted} h.buildStore.EXPECT().Get(gomock.Any(), build.ID).Return(build, nil) + h.batchStore.EXPECT().Get(gomock.Any(), build.BatchID).Return(entity.Batch{ID: build.BatchID, State: entity.BatchStateSpeculating}, nil) h.br.EXPECT().Status(gomock.Any(), entity.BuildID{ID: build.ID}).Return(entity.BuildStatusUnknown, nil, errors.New("provider down")) // No UpdateStatus, no Publish, no PublishAfter expected. diff --git a/submitqueue/orchestrator/controller/cancel/cancel.go b/submitqueue/orchestrator/controller/cancel/cancel.go index cddd2336..93e693a7 100644 --- a/submitqueue/orchestrator/controller/cancel/cancel.go +++ b/submitqueue/orchestrator/controller/cancel/cancel.go @@ -84,11 +84,16 @@ var _ consumer.Controller = (*Controller)(nil) func NewController( logger *zap.SugaredLogger, scope tally.Scope, - store storage.Storage, + stores storage.Factory, registry consumer.TopicRegistry, topicKey consumer.TopicKey, consumerGroup string, ) *Controller { + // TODO(queue-aware): make this controller queue-aware during Process — derive the + // queue from the loaded entity and use it for structured logging, metrics scoping, + // and per-queue storage resolution. Today it uses the default store because the + // queue is only known after the by-ID load. + store, _ := stores.For("") return &Controller{ logger: logger.Named("cancel_controller"), metricsScope: scope.SubScope("cancel_controller"), diff --git a/submitqueue/orchestrator/controller/cancel/cancel_test.go b/submitqueue/orchestrator/controller/cancel/cancel_test.go index 82308c22..ca8c56f9 100644 --- a/submitqueue/orchestrator/controller/cancel/cancel_test.go +++ b/submitqueue/orchestrator/controller/cancel/cancel_test.go @@ -59,7 +59,7 @@ func newRegistry(t *testing.T, ctrl *gomock.Controller) (consumer.TopicRegistry, } func newController(t *testing.T, store storage.Storage, registry consumer.TopicRegistry) *Controller { - return NewController(zaptest.NewLogger(t).Sugar(), tally.NoopScope, store, registry, consumer.TopicKeyCancel, "orchestrator-cancel") + return NewController(zaptest.NewLogger(t).Sugar(), tally.NoopScope, storage.NewStaticFactory(store), registry, consumer.TopicKeyCancel, "orchestrator-cancel") } func newDelivery(t *testing.T, ctrl *gomock.Controller, payload []byte, partitionKey string) consumer.Delivery { diff --git a/submitqueue/orchestrator/controller/conclude/conclude.go b/submitqueue/orchestrator/controller/conclude/conclude.go index 21a23270..8bde94d0 100644 --- a/submitqueue/orchestrator/controller/conclude/conclude.go +++ b/submitqueue/orchestrator/controller/conclude/conclude.go @@ -45,11 +45,16 @@ var _ consumer.Controller = (*Controller)(nil) func NewController( logger *zap.SugaredLogger, scope tally.Scope, - store storage.Storage, + stores storage.Factory, registry consumer.TopicRegistry, topicKey consumer.TopicKey, consumerGroup string, ) *Controller { + // TODO(queue-aware): make this controller queue-aware during Process — derive the + // queue from the loaded entity and use it for structured logging, metrics scoping, + // and per-queue storage resolution. Today it uses the default store because the + // queue is only known after the by-ID load. + store, _ := stores.For("") return &Controller{ logger: logger.Named("conclude_controller"), metricsScope: scope.SubScope("conclude_controller"), diff --git a/submitqueue/orchestrator/controller/conclude/conclude_test.go b/submitqueue/orchestrator/controller/conclude/conclude_test.go index 18d10c73..1a86440c 100644 --- a/submitqueue/orchestrator/controller/conclude/conclude_test.go +++ b/submitqueue/orchestrator/controller/conclude/conclude_test.go @@ -56,7 +56,7 @@ func newTestController(t *testing.T, ctrl *gomock.Controller, mockStorage *stora registry, err := consumer.NewTopicRegistry(nil) require.NoError(t, err) - return NewController(logger, scope, mockStorage, registry, consumer.TopicKeyConclude, "orchestrator-conclude") + return NewController(logger, scope, storage.NewStaticFactory(mockStorage), registry, consumer.TopicKeyConclude, "orchestrator-conclude") } func TestNewController(t *testing.T) { diff --git a/submitqueue/orchestrator/controller/log/BUILD.bazel b/submitqueue/orchestrator/controller/log/BUILD.bazel index f700931c..d9594ff6 100644 --- a/submitqueue/orchestrator/controller/log/BUILD.bazel +++ b/submitqueue/orchestrator/controller/log/BUILD.bazel @@ -24,6 +24,7 @@ go_test( "//extension/messagequeue/mock", "//submitqueue/core/consumer", "//submitqueue/entity", + "//submitqueue/extension/storage", "//submitqueue/extension/storage/mock", "@com_github_stretchr_testify//require", "@com_github_uber_go_tally_v4//:tally", diff --git a/submitqueue/orchestrator/controller/log/log.go b/submitqueue/orchestrator/controller/log/log.go index 3c6212dd..77fece36 100644 --- a/submitqueue/orchestrator/controller/log/log.go +++ b/submitqueue/orchestrator/controller/log/log.go @@ -44,10 +44,15 @@ var _ consumer.Controller = (*Controller)(nil) func NewController( logger *zap.SugaredLogger, scope tally.Scope, - store storage.Storage, + stores storage.Factory, topicKey consumer.TopicKey, consumerGroup string, ) *Controller { + // TODO(queue-aware): make this controller queue-aware during Process — derive the + // queue from the loaded entity and use it for structured logging, metrics scoping, + // and per-queue storage resolution. Today it uses the default store because the + // queue is only known after the by-ID load. + store, _ := stores.For("") return &Controller{ logger: logger.Named("log_controller"), metricsScope: scope.SubScope("log_controller"), diff --git a/submitqueue/orchestrator/controller/log/log_test.go b/submitqueue/orchestrator/controller/log/log_test.go index f8a1319d..c77c8622 100644 --- a/submitqueue/orchestrator/controller/log/log_test.go +++ b/submitqueue/orchestrator/controller/log/log_test.go @@ -25,6 +25,7 @@ import ( queuemock "github.com/uber/submitqueue/extension/messagequeue/mock" "github.com/uber/submitqueue/submitqueue/core/consumer" "github.com/uber/submitqueue/submitqueue/entity" + "github.com/uber/submitqueue/submitqueue/extension/storage" storagemock "github.com/uber/submitqueue/submitqueue/extension/storage/mock" "go.uber.org/mock/gomock" "go.uber.org/zap/zaptest" @@ -35,7 +36,7 @@ func newTestController(t *testing.T, ctrl *gomock.Controller, store *storagemock logger := zaptest.NewLogger(t).Sugar() scope := tally.NoopScope - return NewController(logger, scope, store, consumer.TopicKeyLog, "orchestrator-log") + return NewController(logger, scope, storage.NewStaticFactory(store), consumer.TopicKeyLog, "orchestrator-log") } func TestController_Process(t *testing.T) { diff --git a/submitqueue/orchestrator/controller/merge/BUILD.bazel b/submitqueue/orchestrator/controller/merge/BUILD.bazel index d2424dd0..ea6c5f9c 100644 --- a/submitqueue/orchestrator/controller/merge/BUILD.bazel +++ b/submitqueue/orchestrator/controller/merge/BUILD.bazel @@ -29,6 +29,7 @@ go_test( "//submitqueue/entity", "//submitqueue/extension/pusher", "//submitqueue/extension/pusher/mock", + "//submitqueue/extension/storage", "//submitqueue/extension/storage/mock", "@com_github_stretchr_testify//assert", "@com_github_stretchr_testify//require", diff --git a/submitqueue/orchestrator/controller/merge/merge.go b/submitqueue/orchestrator/controller/merge/merge.go index 742f086e..117900e3 100644 --- a/submitqueue/orchestrator/controller/merge/merge.go +++ b/submitqueue/orchestrator/controller/merge/merge.go @@ -44,7 +44,7 @@ type Controller struct { metricsScope tally.Scope store storage.Storage registry consumer.TopicRegistry - pusher pusher.Pusher + pushers pusher.Factory topicKey consumer.TopicKey consumerGroup string } @@ -56,18 +56,23 @@ var _ consumer.Controller = (*Controller)(nil) func NewController( logger *zap.SugaredLogger, scope tally.Scope, - store storage.Storage, + stores storage.Factory, registry consumer.TopicRegistry, - pusherImpl pusher.Pusher, + pushers pusher.Factory, topicKey consumer.TopicKey, consumerGroup string, ) *Controller { + // TODO(queue-aware): make this controller queue-aware during Process — derive the + // queue from the loaded entity and use it for structured logging, metrics scoping, + // and per-queue storage resolution. Today it uses the default store because the + // queue is only known after the by-ID load. + store, _ := stores.For("") return &Controller{ logger: logger.Named("merge_controller"), metricsScope: scope.SubScope("merge_controller"), store: store, registry: registry, - pusher: pusherImpl, + pushers: pushers, topicKey: topicKey, consumerGroup: consumerGroup, } @@ -126,7 +131,13 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (r return fmt.Errorf("failed to collect changes for batch %s: %w", batch.ID, err) } - pushRes, pushErr := c.pusher.Push(ctx, changes) + push, err := c.pushers.For(pusher.Config{QueueName: batch.Queue}) + if err != nil { + coremetrics.NamedCounter(c.metricsScope, "process", "push_errors", 1) + return fmt.Errorf("failed to build pusher for batch %s: %w", batch.ID, err) + } + + pushRes, pushErr := push.Push(ctx, changes) var newState entity.BatchState switch { diff --git a/submitqueue/orchestrator/controller/merge/merge_test.go b/submitqueue/orchestrator/controller/merge/merge_test.go index 2d69468f..36b1e3c2 100644 --- a/submitqueue/orchestrator/controller/merge/merge_test.go +++ b/submitqueue/orchestrator/controller/merge/merge_test.go @@ -32,6 +32,7 @@ import ( "github.com/uber/submitqueue/submitqueue/entity" "github.com/uber/submitqueue/submitqueue/extension/pusher" pushermock "github.com/uber/submitqueue/submitqueue/extension/pusher/mock" + "github.com/uber/submitqueue/submitqueue/extension/storage" storagemock "github.com/uber/submitqueue/submitqueue/extension/storage/mock" ) @@ -67,15 +68,22 @@ func newRegistry(t *testing.T, ctrl *gomock.Controller, publishErr error) consum return registry } +// newPusherFactory wraps a Pusher in a factory that returns it for any entityqueue. +func newPusherFactory(ctrl *gomock.Controller, p pusher.Pusher) pusher.Factory { + f := pushermock.NewMockFactory(ctrl) + f.EXPECT().For(gomock.Any()).Return(p, nil).AnyTimes() + return f +} + func TestNewController(t *testing.T) { ctrl := gomock.NewController(t) store := storagemock.NewMockStorage(ctrl) c := NewController( zaptest.NewLogger(t).Sugar(), tally.NoopScope, - store, + storage.NewStaticFactory(store), newRegistry(t, ctrl, nil), - pushermock.NewMockPusher(ctrl), + newPusherFactory(ctrl, pushermock.NewMockPusher(ctrl)), consumer.TopicKeyMerge, "orchestrator-merge", ) @@ -136,9 +144,9 @@ func TestController_Process_SuccessfulMerge(t *testing.T) { c := NewController( zaptest.NewLogger(t).Sugar(), tally.NoopScope, - store, + storage.NewStaticFactory(store), newRegistry(t, ctrl, nil), - mockPusher, + newPusherFactory(ctrl, mockPusher), consumer.TopicKeyMerge, "orchestrator-merge", ) @@ -200,9 +208,9 @@ func TestController_Process_PassesAllChangesInBatchOrder(t *testing.T) { c := NewController( zaptest.NewLogger(t).Sugar(), tally.NoopScope, - store, + storage.NewStaticFactory(store), newRegistry(t, ctrl, nil), - mockPusher, + newPusherFactory(ctrl, mockPusher), consumer.TopicKeyMerge, "orchestrator-merge", ) @@ -247,9 +255,9 @@ func TestController_Process_PushConflictMarksBatchFailed(t *testing.T) { c := NewController( zaptest.NewLogger(t).Sugar(), tally.NoopScope, - store, + storage.NewStaticFactory(store), newRegistry(t, ctrl, nil), - mockPusher, + newPusherFactory(ctrl, mockPusher), consumer.TopicKeyMerge, "orchestrator-merge", ) @@ -293,9 +301,9 @@ func TestController_Process_PushInfraFailureReturnsError(t *testing.T) { c := NewController( zaptest.NewLogger(t).Sugar(), tally.NoopScope, - store, + storage.NewStaticFactory(store), newRegistry(t, ctrl, nil), - mockPusher, + newPusherFactory(ctrl, mockPusher), consumer.TopicKeyMerge, "orchestrator-merge", ) @@ -334,9 +342,9 @@ func TestController_Process_TerminalBatchSkipsPushButFansOut(t *testing.T) { c := NewController( zaptest.NewLogger(t).Sugar(), tally.NoopScope, - store, + storage.NewStaticFactory(store), newRegistry(t, ctrl, nil), - mockPusher, + newPusherFactory(ctrl, mockPusher), consumer.TopicKeyMerge, "orchestrator-merge", ) @@ -384,9 +392,9 @@ func TestController_Process_CancellingShortCircuit(t *testing.T) { c := NewController( zaptest.NewLogger(t).Sugar(), tally.NoopScope, - store, + storage.NewStaticFactory(store), registry, - mockPusher, + newPusherFactory(ctrl, mockPusher), consumer.TopicKeyMerge, "orchestrator-merge", ) @@ -409,9 +417,9 @@ func TestController_Process_BatchStoreGetFailureNotRetryable(t *testing.T) { c := NewController( zaptest.NewLogger(t).Sugar(), tally.NoopScope, - store, + storage.NewStaticFactory(store), newRegistry(t, ctrl, nil), - pushermock.NewMockPusher(ctrl), + newPusherFactory(ctrl, pushermock.NewMockPusher(ctrl)), consumer.TopicKeyMerge, "orchestrator-merge", ) @@ -448,9 +456,9 @@ func TestController_Process_RequestStoreFailurePropagates(t *testing.T) { c := NewController( zaptest.NewLogger(t).Sugar(), tally.NoopScope, - store, + storage.NewStaticFactory(store), newRegistry(t, ctrl, nil), - pushermock.NewMockPusher(ctrl), + newPusherFactory(ctrl, pushermock.NewMockPusher(ctrl)), consumer.TopicKeyMerge, "orchestrator-merge", ) @@ -496,9 +504,9 @@ func TestController_Process_PublishFailureSurfaces(t *testing.T) { c := NewController( zaptest.NewLogger(t).Sugar(), tally.NoopScope, - store, + storage.NewStaticFactory(store), newRegistry(t, ctrl, fmt.Errorf("queue down")), - mockPusher, + newPusherFactory(ctrl, mockPusher), consumer.TopicKeyMerge, "orchestrator-merge", ) diff --git a/submitqueue/orchestrator/controller/score/BUILD.bazel b/submitqueue/orchestrator/controller/score/BUILD.bazel index e1f9aa49..edaf8847 100644 --- a/submitqueue/orchestrator/controller/score/BUILD.bazel +++ b/submitqueue/orchestrator/controller/score/BUILD.bazel @@ -29,6 +29,7 @@ go_test( "//submitqueue/core/consumer", "//submitqueue/entity", "//submitqueue/extension/scorer/mock", + "//submitqueue/extension/storage", "//submitqueue/extension/storage/mock", "@com_github_stretchr_testify//assert", "@com_github_stretchr_testify//require", diff --git a/submitqueue/orchestrator/controller/score/score.go b/submitqueue/orchestrator/controller/score/score.go index d4aa5f2c..702bdd75 100644 --- a/submitqueue/orchestrator/controller/score/score.go +++ b/submitqueue/orchestrator/controller/score/score.go @@ -37,7 +37,7 @@ type Controller struct { logger *zap.SugaredLogger metricsScope tally.Scope store storage.Storage - scorer scorer.Scorer + scorers scorer.Factory registry consumer.TopicRegistry topicKey consumer.TopicKey consumerGroup string @@ -50,17 +50,22 @@ var _ consumer.Controller = (*Controller)(nil) func NewController( logger *zap.SugaredLogger, scope tally.Scope, - store storage.Storage, - scorer scorer.Scorer, + stores storage.Factory, + scorers scorer.Factory, registry consumer.TopicRegistry, topicKey consumer.TopicKey, consumerGroup string, ) *Controller { + // TODO(queue-aware): make this controller queue-aware during Process — derive the + // queue from the loaded entity and use it for structured logging, metrics scoping, + // and per-queue storage resolution. Today it uses the default store because the + // queue is only known after the by-ID load. + store, _ := stores.For("") return &Controller{ logger: logger.Named("score_controller"), metricsScope: scope.SubScope("score_controller"), store: store, - scorer: scorer, + scorers: scorers, registry: registry, topicKey: topicKey, consumerGroup: consumerGroup, @@ -177,13 +182,17 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (r // Uses multiplicative probability: if any single request fails, the entire batch fails, // so the batch score is the product of individual request scores. func (c *Controller) scoreBatch(ctx context.Context, batch entity.Batch) (float64, error) { + sc, err := c.scorers.For(scorer.Config{QueueName: batch.Queue}) + if err != nil { + return 0, fmt.Errorf("failed to build scorer for batch %s: %w", batch.ID, err) + } score := 1.0 for _, requestID := range batch.Contains { request, err := c.store.GetRequestStore().Get(ctx, requestID) if err != nil { return 0, fmt.Errorf("failed to get request %s: %w", requestID, err) } - s, err := c.scorer.Score(ctx, request.Change) + s, err := sc.Score(ctx, request.Change) if err != nil { return 0, fmt.Errorf("failed to score request %s: %w", requestID, err) } diff --git a/submitqueue/orchestrator/controller/score/score_test.go b/submitqueue/orchestrator/controller/score/score_test.go index ef866753..7f63555a 100644 --- a/submitqueue/orchestrator/controller/score/score_test.go +++ b/submitqueue/orchestrator/controller/score/score_test.go @@ -28,6 +28,7 @@ import ( "github.com/uber/submitqueue/submitqueue/core/consumer" "github.com/uber/submitqueue/submitqueue/entity" scorermock "github.com/uber/submitqueue/submitqueue/extension/scorer/mock" + "github.com/uber/submitqueue/submitqueue/extension/storage" storagemock "github.com/uber/submitqueue/submitqueue/extension/storage/mock" "go.uber.org/mock/gomock" "go.uber.org/zap/zaptest" @@ -103,7 +104,10 @@ func newTestController(t *testing.T, ctrl *gomock.Controller, store *storagemock ) require.NoError(t, err) - return NewController(logger, scope, store, scorer, registry, consumer.TopicKeyScore, "orchestrator-score") + scorerFactory := scorermock.NewMockFactory(ctrl) + scorerFactory.EXPECT().For(gomock.Any()).Return(scorer, nil).AnyTimes() + + return NewController(logger, scope, storage.NewStaticFactory(store), scorerFactory, registry, consumer.TopicKeyScore, "orchestrator-score") } func TestNewController(t *testing.T) { @@ -354,7 +358,9 @@ func TestController_Process_TerminalShortCircuit(t *testing.T) { ) require.NoError(t, err) - controller := NewController(logger, scope, mockStorage, mockScorer, registry, consumer.TopicKeyScore, "orchestrator-score") + scorerFactory := scorermock.NewMockFactory(ctrl) + scorerFactory.EXPECT().For(gomock.Any()).Return(mockScorer, nil).AnyTimes() + controller := NewController(logger, scope, storage.NewStaticFactory(mockStorage), scorerFactory, registry, consumer.TopicKeyScore, "orchestrator-score") msg := entityqueue.NewMessage(batch.ID, batchIDPayload(t, batch.ID), batch.Queue, nil) delivery := queuemock.NewMockDelivery(ctrl) @@ -402,7 +408,9 @@ func TestController_Process_CancellingShortCircuit(t *testing.T) { ) require.NoError(t, err) - controller := NewController(logger, scope, mockStorage, mockScorer, registry, consumer.TopicKeyScore, "orchestrator-score") + scorerFactory := scorermock.NewMockFactory(ctrl) + scorerFactory.EXPECT().For(gomock.Any()).Return(mockScorer, nil).AnyTimes() + controller := NewController(logger, scope, storage.NewStaticFactory(mockStorage), scorerFactory, registry, consumer.TopicKeyScore, "orchestrator-score") msg := entityqueue.NewMessage(batch.ID, batchIDPayload(t, batch.ID), batch.Queue, nil) delivery := queuemock.NewMockDelivery(ctrl) diff --git a/submitqueue/orchestrator/controller/speculate/speculate.go b/submitqueue/orchestrator/controller/speculate/speculate.go index 05471979..0760f830 100644 --- a/submitqueue/orchestrator/controller/speculate/speculate.go +++ b/submitqueue/orchestrator/controller/speculate/speculate.go @@ -71,11 +71,16 @@ const opName = "process" func NewController( logger *zap.SugaredLogger, scope tally.Scope, - store storage.Storage, + stores storage.Factory, registry consumer.TopicRegistry, topicKey consumer.TopicKey, consumerGroup string, ) *Controller { + // TODO(queue-aware): make this controller queue-aware during Process — derive the + // queue from the loaded entity and use it for structured logging, metrics scoping, + // and per-queue storage resolution. Today it uses the default store because the + // queue is only known after the by-ID load. + store, _ := stores.For("") return &Controller{ logger: logger.Named("speculate_controller"), metricsScope: scope.SubScope("speculate_controller"), diff --git a/submitqueue/orchestrator/controller/speculate/speculate_test.go b/submitqueue/orchestrator/controller/speculate/speculate_test.go index 2342bd17..6a684696 100644 --- a/submitqueue/orchestrator/controller/speculate/speculate_test.go +++ b/submitqueue/orchestrator/controller/speculate/speculate_test.go @@ -77,7 +77,7 @@ func newTestController(t *testing.T, ctrl *gomock.Controller, store *storagemock ) require.NoError(t, err) - return NewController(logger, scope, store, registry, consumer.TopicKeySpeculate, "orchestrator-speculate") + return NewController(logger, scope, storage.NewStaticFactory(store), registry, consumer.TopicKeySpeculate, "orchestrator-speculate") } // runProcess builds a delivery for batchID and invokes Process once. @@ -276,7 +276,7 @@ func TestController_Process_TerminalSelfHeals(t *testing.T) { require.NoError(t, err) logger := zaptest.NewLogger(t).Sugar() - controller := NewController(logger, tally.NoopScope, store, registry, consumer.TopicKeySpeculate, "orchestrator-speculate") + controller := NewController(logger, tally.NoopScope, storage.NewStaticFactory(store), registry, consumer.TopicKeySpeculate, "orchestrator-speculate") require.NoError(t, runProcess(t, ctrl, controller, batch.ID)) }) @@ -331,7 +331,7 @@ func TestController_Process_CancelledTerminalSelfHealsDependents(t *testing.T) { require.NoError(t, err) logger := zaptest.NewLogger(t).Sugar() - controller := NewController(logger, tally.NoopScope, store, registry, consumer.TopicKeySpeculate, "orchestrator-speculate") + controller := NewController(logger, tally.NoopScope, storage.NewStaticFactory(store), registry, consumer.TopicKeySpeculate, "orchestrator-speculate") require.NoError(t, runProcess(t, ctrl, controller, batch.ID)) @@ -399,7 +399,7 @@ func TestController_Process_CancellingTerminalFlow(t *testing.T) { require.NoError(t, err) logger := zaptest.NewLogger(t).Sugar() - controller := NewController(logger, tally.NoopScope, store, registry, consumer.TopicKeySpeculate, "orchestrator-speculate") + controller := NewController(logger, tally.NoopScope, storage.NewStaticFactory(store), registry, consumer.TopicKeySpeculate, "orchestrator-speculate") require.NoError(t, runProcess(t, ctrl, controller, batch.ID)) @@ -508,7 +508,7 @@ func TestController_Process_CancellingNoDependents(t *testing.T) { require.NoError(t, err) logger := zaptest.NewLogger(t).Sugar() - controller := NewController(logger, tally.NoopScope, store, registry, consumer.TopicKeySpeculate, "orchestrator-speculate") + controller := NewController(logger, tally.NoopScope, storage.NewStaticFactory(store), registry, consumer.TopicKeySpeculate, "orchestrator-speculate") require.NoError(t, runProcess(t, ctrl, controller, batch.ID)) } @@ -549,7 +549,7 @@ func TestController_Process_CancellingTerminalCASVersionMismatch(t *testing.T) { require.NoError(t, err) logger := zaptest.NewLogger(t).Sugar() - controller := NewController(logger, tally.NoopScope, store, registry, consumer.TopicKeySpeculate, "orchestrator-speculate") + controller := NewController(logger, tally.NoopScope, storage.NewStaticFactory(store), registry, consumer.TopicKeySpeculate, "orchestrator-speculate") err = runProcess(t, ctrl, controller, batch.ID) require.Error(t, err) diff --git a/submitqueue/orchestrator/controller/start/start.go b/submitqueue/orchestrator/controller/start/start.go index 5349835f..889481d0 100644 --- a/submitqueue/orchestrator/controller/start/start.go +++ b/submitqueue/orchestrator/controller/start/start.go @@ -54,12 +54,17 @@ var _ consumer.Controller = (*Controller)(nil) func NewController( logger *zap.SugaredLogger, scope tally.Scope, - store storage.Storage, + stores storage.Factory, changeStore changestore.ChangeStore, registry consumer.TopicRegistry, topicKey consumer.TopicKey, consumerGroup string, ) *Controller { + // TODO(queue-aware): make this controller queue-aware during Process — derive the + // queue from the loaded entity and use it for structured logging, metrics scoping, + // and per-queue storage resolution. Today it uses the default store because the + // queue is only known after the by-ID load. + store, _ := stores.For("") return &Controller{ logger: logger.Named("start_controller"), metricsScope: scope.SubScope("start_controller"), diff --git a/submitqueue/orchestrator/controller/start/start_test.go b/submitqueue/orchestrator/controller/start/start_test.go index 05995542..f7c9a6fa 100644 --- a/submitqueue/orchestrator/controller/start/start_test.go +++ b/submitqueue/orchestrator/controller/start/start_test.go @@ -63,7 +63,7 @@ func newTestController( ) require.NoError(t, err) - return NewController(logger, scope, store, cs, registry, consumer.TopicKeyStart, "orchestrator-start") + return NewController(logger, scope, storage.NewStaticFactory(store), cs, registry, consumer.TopicKeyStart, "orchestrator-start") } // newMockStorage creates a MockStorage with a MockRequestStore that succeeds on Create. diff --git a/submitqueue/orchestrator/controller/validate/BUILD.bazel b/submitqueue/orchestrator/controller/validate/BUILD.bazel index 91275d9c..19b7cf21 100644 --- a/submitqueue/orchestrator/controller/validate/BUILD.bazel +++ b/submitqueue/orchestrator/controller/validate/BUILD.bazel @@ -31,6 +31,7 @@ go_test( "//submitqueue/core/consumer", "//submitqueue/entity", "//submitqueue/extension/changeprovider", + "//submitqueue/extension/changeprovider/mock", "//submitqueue/extension/changestore/mock", "//submitqueue/extension/mergechecker", "//submitqueue/extension/mergechecker/mock", diff --git a/submitqueue/orchestrator/controller/validate/validate.go b/submitqueue/orchestrator/controller/validate/validate.go index 786dc9fd..22e581a6 100644 --- a/submitqueue/orchestrator/controller/validate/validate.go +++ b/submitqueue/orchestrator/controller/validate/validate.go @@ -37,15 +37,15 @@ import ( // merge conflicts, change metadata fetch), and publishes to the batch stage. Validation logic // is extensible to support additional checks. Implements consumer.Controller. type Controller struct { - logger *zap.SugaredLogger - metricsScope tally.Scope - store storage.Storage - changeStore changestore.ChangeStore - registry consumer.TopicRegistry - mergeChecker mergechecker.MergeChecker - changeProvider changeprovider.ChangeProvider - topicKey consumer.TopicKey - consumerGroup string + logger *zap.SugaredLogger + metricsScope tally.Scope + store storage.Storage + changeStore changestore.ChangeStore + registry consumer.TopicRegistry + mergeCheckers mergechecker.Factory + changeProviders changeprovider.Factory + topicKey consumer.TopicKey + consumerGroup string } // Verify Controller implements consumer.Controller interface at compile time. @@ -55,24 +55,29 @@ var _ consumer.Controller = (*Controller)(nil) func NewController( logger *zap.SugaredLogger, scope tally.Scope, - store storage.Storage, + stores storage.Factory, changeStore changestore.ChangeStore, registry consumer.TopicRegistry, - mergeChecker mergechecker.MergeChecker, - changeProvider changeprovider.ChangeProvider, + mergeCheckers mergechecker.Factory, + changeProviders changeprovider.Factory, topicKey consumer.TopicKey, consumerGroup string, ) *Controller { + // TODO(queue-aware): make this controller queue-aware during Process — derive the + // queue from the loaded entity and use it for structured logging, metrics scoping, + // and per-queue storage resolution. Today it uses the default store because the + // queue is only known after the by-ID load. + store, _ := stores.For("") return &Controller{ - logger: logger.Named("validate_controller"), - metricsScope: scope.SubScope("validate_controller"), - store: store, - changeStore: changeStore, - registry: registry, - mergeChecker: mergeChecker, - changeProvider: changeProvider, - topicKey: topicKey, - consumerGroup: consumerGroup, + logger: logger.Named("validate_controller"), + metricsScope: scope.SubScope("validate_controller"), + store: store, + changeStore: changeStore, + registry: registry, + mergeCheckers: mergeCheckers, + changeProviders: changeProviders, + topicKey: topicKey, + consumerGroup: consumerGroup, } } @@ -138,7 +143,12 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (r } // Merge conflict check - mergeResult, err := c.mergeChecker.Check(ctx, request.Queue, request.Change) + mergeChecker, err := c.mergeCheckers.For(mergechecker.Config{QueueName: request.Queue}) + if err != nil { + coremetrics.NamedCounter(c.metricsScope, "process", "merge_check_errors", 1) + return fmt.Errorf("failed to build merge checker for queue %s: %w", request.Queue, err) + } + mergeResult, err := mergeChecker.Check(ctx, request.Change) if err != nil { coremetrics.NamedCounter(c.metricsScope, "process", "merge_check_errors", 1) return fmt.Errorf("merge check failed: %w", err) @@ -154,7 +164,12 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (r } // Fetch change metadata - changeInfos, err := c.changeProvider.Get(ctx, request.Change) + changeProvider, err := c.changeProviders.For(changeprovider.Config{QueueName: request.Queue}) + if err != nil { + coremetrics.NamedCounter(c.metricsScope, "process", "change_provider_errors", 1) + return fmt.Errorf("failed to build change provider for queue %s: %w", request.Queue, err) + } + changeInfos, err := changeProvider.Get(ctx, request.Change) if err != nil { coremetrics.NamedCounter(c.metricsScope, "process", "change_provider_errors", 1) return fmt.Errorf("failed to fetch change information for request %s: %w", request.ID, err) diff --git a/submitqueue/orchestrator/controller/validate/validate_test.go b/submitqueue/orchestrator/controller/validate/validate_test.go index 82f98988..2556d0fc 100644 --- a/submitqueue/orchestrator/controller/validate/validate_test.go +++ b/submitqueue/orchestrator/controller/validate/validate_test.go @@ -28,6 +28,7 @@ import ( "github.com/uber/submitqueue/submitqueue/core/consumer" "github.com/uber/submitqueue/submitqueue/entity" "github.com/uber/submitqueue/submitqueue/extension/changeprovider" + changeprovidermock "github.com/uber/submitqueue/submitqueue/extension/changeprovider/mock" changemock "github.com/uber/submitqueue/submitqueue/extension/changestore/mock" "github.com/uber/submitqueue/submitqueue/extension/mergechecker" mergecheckermock "github.com/uber/submitqueue/submitqueue/extension/mergechecker/mock" @@ -65,7 +66,7 @@ func (m *mockChangeProvider) Get(ctx context.Context, change entity.Change) ([]c // newMergeableMock returns a mock MergeChecker that always returns mergeable. func newMergeableMock(ctrl *gomock.Controller) *mergecheckermock.MockMergeChecker { mc := mergecheckermock.NewMockMergeChecker(ctrl) - mc.EXPECT().Check(gomock.Any(), gomock.Any(), gomock.Any()).Return(mergechecker.Result{Mergeable: true}, nil).AnyTimes() + mc.EXPECT().Check(gomock.Any(), gomock.Any()).Return(mergechecker.Result{Mergeable: true}, nil).AnyTimes() return mc } @@ -118,7 +119,12 @@ func newTestController( cp := &mockChangeProvider{} - return NewController(logger, scope, store, cs, registry, mc, cp, consumer.TopicKeyValidate, "orchestrator-validate") + mcFactory := mergecheckermock.NewMockFactory(ctrl) + mcFactory.EXPECT().For(gomock.Any()).Return(mc, nil).AnyTimes() + cpFactory := changeprovidermock.NewMockFactory(ctrl) + cpFactory.EXPECT().For(gomock.Any()).Return(cp, nil).AnyTimes() + + return NewController(logger, scope, storage.NewStaticFactory(store), cs, registry, mcFactory, cpFactory, consumer.TopicKeyValidate, "orchestrator-validate") } func TestNewController(t *testing.T) { @@ -223,7 +229,7 @@ func TestController_Process_NotMergeable(t *testing.T) { ctrl := gomock.NewController(t) mc := mergecheckermock.NewMockMergeChecker(ctrl) - mc.EXPECT().Check(gomock.Any(), gomock.Any(), gomock.Any()).Return(mergechecker.Result{Mergeable: false}, nil) + mc.EXPECT().Check(gomock.Any(), gomock.Any()).Return(mergechecker.Result{Mergeable: false}, nil) request := entity.Request{ ID: "test-queue/123", @@ -250,7 +256,7 @@ func TestController_Process_MergeCheckError(t *testing.T) { ctrl := gomock.NewController(t) mc := mergecheckermock.NewMockMergeChecker(ctrl) - mc.EXPECT().Check(gomock.Any(), gomock.Any(), gomock.Any()).Return(mergechecker.Result{}, fmt.Errorf("merge check failed")) + mc.EXPECT().Check(gomock.Any(), gomock.Any()).Return(mergechecker.Result{}, fmt.Errorf("merge check failed")) request := entity.Request{ ID: "test-queue/123",