diff --git a/example/server/orchestrator/BUILD.bazel b/example/server/orchestrator/BUILD.bazel index 084a1857..698662c3 100644 --- a/example/server/orchestrator/BUILD.bazel +++ b/example/server/orchestrator/BUILD.bazel @@ -16,6 +16,8 @@ go_library( "//entity", "//extension/changeprovider", "//extension/changeprovider/github", + "//extension/changestore", + "//extension/changestore/mysql", "//extension/counter", "//extension/counter/mysql", "//extension/mergechecker", diff --git a/example/server/orchestrator/main.go b/example/server/orchestrator/main.go index cbc7265c..fc39eb2a 100644 --- a/example/server/orchestrator/main.go +++ b/example/server/orchestrator/main.go @@ -35,6 +35,8 @@ import ( "github.com/uber/submitqueue/entity" "github.com/uber/submitqueue/extension/changeprovider" githubprovider "github.com/uber/submitqueue/extension/changeprovider/github" + "github.com/uber/submitqueue/extension/changestore" + mysqlchangestore "github.com/uber/submitqueue/extension/changestore/mysql" "github.com/uber/submitqueue/extension/counter" mysqlcounter "github.com/uber/submitqueue/extension/counter/mysql" "github.com/uber/submitqueue/extension/mergechecker" @@ -154,6 +156,8 @@ func run() error { return fmt.Errorf("failed to create storage: %w", err) } + changeStore := mysqlchangestore.NewChangeStore(appDB, scope.SubScope("changestore")) + // Open queue database connection // Docker Compose healthchecks ensure MySQL is ready before service starts queueDSN := os.Getenv("QUEUE_MYSQL_DSN") @@ -212,7 +216,7 @@ func run() error { } // Register controllers - if err := registerControllers(c, logger.Sugar(), scope, registry, mc, cp, psh, cnt, store); err != nil { + if err := registerControllers(c, logger.Sugar(), scope, registry, mc, cp, psh, cnt, store, changeStore); err != nil { return err } @@ -397,11 +401,12 @@ func newTopicRegistry(q extqueue.Queue, subscriberName string) (consumer.TopicRe // │ │ │ // └────────┴────────────────────────┘ -func registerControllers(c 
consumer.Consumer, logger *zap.SugaredLogger, scope tally.Scope, registry consumer.TopicRegistry, mc mergechecker.MergeChecker, cp changeprovider.ChangeProvider, psh pusher.Pusher, cnt counter.Counter, store storage.Storage) error { +func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope tally.Scope, registry consumer.TopicRegistry, mc mergechecker.MergeChecker, cp changeprovider.ChangeProvider, psh pusher.Pusher, cnt counter.Counter, store storage.Storage, changeStore changestore.ChangeStore) error { requestController := start.NewController( logger, scope, store, + changeStore, registry, consumer.TopicKeyStart, "orchestrator-start", @@ -414,6 +419,7 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t logger, scope, store, + changeStore, registry, mc, cp, diff --git a/orchestrator/controller/start/BUILD.bazel b/orchestrator/controller/start/BUILD.bazel index df436444..85ce2a89 100644 --- a/orchestrator/controller/start/BUILD.bazel +++ b/orchestrator/controller/start/BUILD.bazel @@ -10,6 +10,7 @@ go_library( "//core/request", "//entity", "//entity/queue", + "//extension/changestore", "//extension/storage", "@com_github_uber_go_tally_v4//:tally", "@org_uber_go_zap//:zap", @@ -25,6 +26,7 @@ go_test( "//core/errs", "//entity", "//entity/queue", + "//extension/changestore/mock", "//extension/queue/mock", "//extension/storage", "//extension/storage/mock", diff --git a/orchestrator/controller/start/start.go b/orchestrator/controller/start/start.go index 643f6ef1..f3d449a8 100644 --- a/orchestrator/controller/start/start.go +++ b/orchestrator/controller/start/start.go @@ -18,23 +18,29 @@ import ( "context" "errors" "fmt" + "time" "github.com/uber-go/tally/v4" "github.com/uber/submitqueue/core/consumer" corerequest "github.com/uber/submitqueue/core/request" "github.com/uber/submitqueue/entity" entityqueue "github.com/uber/submitqueue/entity/queue" + "github.com/uber/submitqueue/extension/changestore" 
"github.com/uber/submitqueue/extension/storage" "go.uber.org/zap" ) // Controller handles start queue messages. -// It consumes requests, persists them to storage, and publishes to the validate stage. -// Implements consumer.Controller interface for integration with the consumer. +// It consumes requests, persists them to the request store, claims their URIs in +// the change store, and publishes to the validate stage. Both writes are idempotent +// on retries; the duplicate-detection check itself is performed downstream by the +// validate controller, which reads the change store and consults the request store +// for liveness. Implements consumer.Controller. type Controller struct { logger *zap.SugaredLogger metricsScope tally.Scope store storage.Storage + changeStore changestore.ChangeStore registry consumer.TopicRegistry topicKey consumer.TopicKey consumerGroup string @@ -48,6 +54,7 @@ func NewController( logger *zap.SugaredLogger, scope tally.Scope, store storage.Storage, + changeStore changestore.ChangeStore, registry consumer.TopicRegistry, topicKey consumer.TopicKey, consumerGroup string, @@ -56,6 +63,7 @@ func NewController( logger: logger.Named("start_controller"), metricsScope: scope.SubScope("start_controller"), store: store, + changeStore: changeStore, registry: registry, topicKey: topicKey, consumerGroup: consumerGroup, @@ -63,14 +71,13 @@ func NewController( } // Process processes a request delivery from the queue. -// Deserializes the request and publishes to the validate topic. +// Persists the request, claims its URIs in the change store, and publishes to validate. // Returns nil to ack (success), or error to nack (retry). 
func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) error { c.metricsScope.Counter("received").Inc(1) msg := delivery.Message() - // Deserialize land request from gateway landRequest, err := entity.LandRequestFromBytes(msg.Payload) if err != nil { c.metricsScope.Counter("deserialize_errors").Inc(1) @@ -78,7 +85,6 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er return fmt.Errorf("failed to deserialize land request: %w", err) } - // Construct the full versioned Request entity with orchestrator-owned fields request := entity.Request{ ID: landRequest.ID, Queue: landRequest.Queue, @@ -100,22 +106,29 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er "partition_key", msg.PartitionKey, ) - // Persist request to storage (idempotent — ErrAlreadyExists means a retry) + // Persist request to storage. ErrAlreadyExists means a queue redelivery of the same + // request_id (an at-least-once retry of THIS message), not a cross-request collision. if err := c.store.GetRequestStore().Create(ctx, request); err != nil && !errors.Is(err, storage.ErrAlreadyExists) { c.metricsScope.Counter("storage_errors").Inc(1) return fmt.Errorf("failed to create request: %w", err) } - // Record the "new" status in the request log + // Claim each URI in the change store. Different requests on the same URI write + // distinct rows (different request_id), so cross-request URI overlap does not + // collide on insert; the validate controller surfaces it via GetByURI + a + // liveness check against the request store. + if err := c.claimURIs(ctx, request); err != nil { + c.metricsScope.Counter("change_store_errors").Inc(1) + return fmt.Errorf("failed to claim URIs for request %s: %w", request.ID, err) + } + + // Record the "new" status in the request log. 
logEntry := entity.NewRequestLog(request.ID, entity.RequestStatusStarted, request.Version, "", nil) - // Using request.ID as the partition key to ensure ordering of log entries for the same request - // and parallel processing of log entries for different requests. if err := corerequest.PublishLog(ctx, c.registry, logEntry, request.ID); err != nil { c.metricsScope.Counter("request_log_errors").Inc(1) return fmt.Errorf("failed to publish request log: %w", err) } - // Publish to validate topic if err := c.publish(ctx, consumer.TopicKeyValidate, request.ID, request.Queue); err != nil { c.metricsScope.Counter("publish_errors").Inc(1) return fmt.Errorf("failed to publish to validate: %w", err) @@ -127,8 +140,28 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er ) c.metricsScope.Counter("processed").Inc(1) + return nil +} - return nil // Success - message will be acked +// claimURIs persists one ChangeRecord per URI in the request. Each Create call is +// independent; the change store's per-PK idempotency makes the loop safe under +// queue redelivery (same (Queue, URI, RequestID) is a no-op on retry). +func (c *Controller) claimURIs(ctx context.Context, request entity.Request) error { + now := time.Now().UnixMilli() + for _, uri := range request.Change.URIs { + record := entity.ChangeRecord{ + URI: uri, + RequestID: request.ID, + Queue: request.Queue, + CreatedAt: now, + UpdatedAt: now, + Version: 1, + } + if err := c.changeStore.Create(ctx, record); err != nil { + return fmt.Errorf("failed to claim uri=%s for request %s: %w", uri, request.ID, err) + } + } + return nil } // publish publishes a request ID to the specified topic key. 
diff --git a/orchestrator/controller/start/start_test.go b/orchestrator/controller/start/start_test.go index b294f09c..8793b912 100644 --- a/orchestrator/controller/start/start_test.go +++ b/orchestrator/controller/start/start_test.go @@ -26,6 +26,7 @@ import ( "github.com/uber/submitqueue/core/errs" "github.com/uber/submitqueue/entity" "github.com/uber/submitqueue/entity/queue" + changemock "github.com/uber/submitqueue/extension/changestore/mock" queuemock "github.com/uber/submitqueue/extension/queue/mock" "github.com/uber/submitqueue/extension/storage" storagemock "github.com/uber/submitqueue/extension/storage/mock" @@ -34,7 +35,13 @@ import ( ) // newTestController creates a controller with test dependencies. -func newTestController(t *testing.T, ctrl *gomock.Controller, store *storagemock.MockStorage, publishErr error) *Controller { +func newTestController( + t *testing.T, + ctrl *gomock.Controller, + store *storagemock.MockStorage, + cs *changemock.MockChangeStore, + publishErr error, +) *Controller { logger := zaptest.NewLogger(t).Sugar() scope := tally.NoopScope @@ -56,7 +63,7 @@ func newTestController(t *testing.T, ctrl *gomock.Controller, store *storagemock ) require.NoError(t, err) - return NewController(logger, scope, store, registry, consumer.TopicKeyStart, "orchestrator-request") + return NewController(logger, scope, store, cs, registry, consumer.TopicKeyStart, "orchestrator-start") } // newMockStorage creates a MockStorage with a MockRequestStore that succeeds on Create. @@ -69,44 +76,52 @@ func newMockStorage(ctrl *gomock.Controller) *storagemock.MockStorage { return store } +// newMockChangeStore returns a MockChangeStore that accepts any Create call. +func newMockChangeStore(ctrl *gomock.Controller) *changemock.MockChangeStore { + cs := changemock.NewMockChangeStore(ctrl) + cs.EXPECT().Create(gomock.Any(), gomock.Any()).Return(nil).AnyTimes() + return cs +} + +// makeDelivery builds a MockDelivery wrapping a serialized LandRequest. 
+func makeDelivery(t *testing.T, ctrl *gomock.Controller, lr entity.LandRequest) *queuemock.MockDelivery { + payload, err := lr.ToBytes() + require.NoError(t, err) + + msg := queue.NewMessage(lr.ID, payload, lr.Queue, nil) + delivery := queuemock.NewMockDelivery(ctrl) + delivery.EXPECT().Message().Return(msg).AnyTimes() + delivery.EXPECT().Attempt().Return(1).AnyTimes() + return delivery +} + func TestNewController(t *testing.T) { ctrl := gomock.NewController(t) - controller := newTestController(t, ctrl, newMockStorage(ctrl), nil) + controller := newTestController(t, ctrl, newMockStorage(ctrl), newMockChangeStore(ctrl), nil) require.NotNil(t, controller) assert.Equal(t, consumer.TopicKeyStart, controller.TopicKey()) - assert.Equal(t, "orchestrator-request", controller.ConsumerGroup()) + assert.Equal(t, "orchestrator-start", controller.ConsumerGroup()) assert.Equal(t, "start", controller.Name()) } func TestController_Process_Success(t *testing.T) { ctrl := gomock.NewController(t) + controller := newTestController(t, ctrl, newMockStorage(ctrl), newMockChangeStore(ctrl), nil) - controller := newTestController(t, ctrl, newMockStorage(ctrl), nil) - - landRequest := entity.LandRequest{ + delivery := makeDelivery(t, ctrl, entity.LandRequest{ ID: "test-queue/123", Queue: "test-queue", Change: entity.Change{URIs: []string{"github://uber/service/pull/456/abc123def"}}, LandStrategy: entity.RequestLandStrategyRebase, - } - - payload, err := landRequest.ToBytes() - require.NoError(t, err) - - msg := queue.NewMessage("test-queue/123", payload, "test-queue", nil) - delivery := queuemock.NewMockDelivery(ctrl) - delivery.EXPECT().Message().Return(msg).AnyTimes() - delivery.EXPECT().Attempt().Return(1).AnyTimes() + }) - err = controller.Process(context.Background(), delivery) - require.NoError(t, err) + require.NoError(t, controller.Process(context.Background(), delivery)) } func TestController_Process_InvalidJSON(t *testing.T) { ctrl := gomock.NewController(t) - - controller := 
newTestController(t, ctrl, newMockStorage(ctrl), nil) + controller := newTestController(t, ctrl, newMockStorage(ctrl), newMockChangeStore(ctrl), nil) invalidPayload := []byte(`{"invalid": json"}`) msg := queue.NewMessage("invalid-msg", invalidPayload, "partition1", nil) @@ -114,10 +129,7 @@ func TestController_Process_InvalidJSON(t *testing.T) { delivery.EXPECT().Message().Return(msg).AnyTimes() delivery.EXPECT().Attempt().Return(1).AnyTimes() - // Process the delivery err := controller.Process(context.Background(), delivery) - - // Should return NonRetryableError for malformed messages require.Error(t, err) assert.False(t, errs.IsRetryable(err)) } @@ -125,7 +137,6 @@ func TestController_Process_InvalidJSON(t *testing.T) { func TestController_Process_ConstructsRequestWithStateAndVersion(t *testing.T) { ctrl := gomock.NewController(t) - // Capture the request passed to Create var capturedRequest entity.Request mockReqStore := storagemock.NewMockRequestStore(ctrl) mockReqStore.EXPECT().Create(gomock.Any(), gomock.Any()).DoAndReturn( @@ -137,7 +148,7 @@ func TestController_Process_ConstructsRequestWithStateAndVersion(t *testing.T) { store := storagemock.NewMockStorage(ctrl) store.EXPECT().GetRequestStore().Return(mockReqStore).AnyTimes() - controller := newTestController(t, ctrl, store, nil) + controller := newTestController(t, ctrl, store, newMockChangeStore(ctrl), nil) landRequest := entity.LandRequest{ ID: "test-queue/42", @@ -145,19 +156,9 @@ func TestController_Process_ConstructsRequestWithStateAndVersion(t *testing.T) { Change: entity.Change{URIs: []string{"github://uber/service/pull/1/abc123def"}}, LandStrategy: entity.RequestLandStrategySquashRebase, } + delivery := makeDelivery(t, ctrl, landRequest) + require.NoError(t, controller.Process(context.Background(), delivery)) - payload, err := landRequest.ToBytes() - require.NoError(t, err) - - msg := queue.NewMessage(landRequest.ID, payload, landRequest.Queue, nil) - delivery := queuemock.NewMockDelivery(ctrl) - 
delivery.EXPECT().Message().Return(msg).AnyTimes() - delivery.EXPECT().Attempt().Return(1).AnyTimes() - - err = controller.Process(context.Background(), delivery) - require.NoError(t, err) - - // Verify the controller sets State and Version on the constructed Request assert.Equal(t, landRequest.ID, capturedRequest.ID) assert.Equal(t, landRequest.Queue, capturedRequest.Queue) assert.Equal(t, landRequest.Change.URIs, capturedRequest.Change.URIs) @@ -179,26 +180,16 @@ func TestController_Process_AllStrategies(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { ctrl := gomock.NewController(t) + controller := newTestController(t, ctrl, newMockStorage(ctrl), newMockChangeStore(ctrl), nil) - controller := newTestController(t, ctrl, newMockStorage(ctrl), nil) - - landRequest := entity.LandRequest{ + delivery := makeDelivery(t, ctrl, entity.LandRequest{ ID: fmt.Sprintf("queue/%s", tt.strategy), Queue: "test-queue", Change: entity.Change{URIs: []string{"github://uber/service/pull/1/aaa111bbb"}}, LandStrategy: tt.strategy, - } - - payload, err := landRequest.ToBytes() - require.NoError(t, err) + }) - msg := queue.NewMessage(landRequest.ID, payload, landRequest.Queue, nil) - delivery := queuemock.NewMockDelivery(ctrl) - delivery.EXPECT().Message().Return(msg).AnyTimes() - delivery.EXPECT().Attempt().Return(1).AnyTimes() - - err = controller.Process(context.Background(), delivery) - require.NoError(t, err) + require.NoError(t, controller.Process(context.Background(), delivery)) }) } } @@ -206,55 +197,54 @@ func TestController_Process_AllStrategies(t *testing.T) { func TestController_Process_MultipleChanges(t *testing.T) { ctrl := gomock.NewController(t) - controller := newTestController(t, ctrl, newMockStorage(ctrl), nil) - - landRequest := entity.LandRequest{ - ID: "queue/999", - Queue: "test-queue", - Change: entity.Change{ - URIs: []string{ - "github://uber/monorepo/pull/1/aaa111", - "github://uber/monorepo/pull/2/bbb222", - 
"github://uber/monorepo/pull/3/ccc333", - }, + cs := changemock.NewMockChangeStore(ctrl) + var captured []entity.ChangeRecord + cs.EXPECT().Create(gomock.Any(), gomock.Any()).DoAndReturn( + func(ctx context.Context, record entity.ChangeRecord) error { + captured = append(captured, record) + return nil }, - LandStrategy: entity.RequestLandStrategySquashRebase, - } + ).Times(3) - payload, err := landRequest.ToBytes() - require.NoError(t, err) + controller := newTestController(t, ctrl, newMockStorage(ctrl), cs, nil) - msg := queue.NewMessage(landRequest.ID, payload, landRequest.Queue, nil) - delivery := queuemock.NewMockDelivery(ctrl) - delivery.EXPECT().Message().Return(msg).AnyTimes() - delivery.EXPECT().Attempt().Return(1).AnyTimes() - - err = controller.Process(context.Background(), delivery) - require.NoError(t, err) + uris := []string{ + "github://uber/monorepo/pull/1/aaa111", + "github://uber/monorepo/pull/2/bbb222", + "github://uber/monorepo/pull/3/ccc333", + } + delivery := makeDelivery(t, ctrl, entity.LandRequest{ + ID: "queue/999", + Queue: "test-queue", + Change: entity.Change{URIs: uris}, + LandStrategy: entity.RequestLandStrategySquashRebase, + }) + + require.NoError(t, controller.Process(context.Background(), delivery)) + + require.Len(t, captured, len(uris)) + for i, r := range captured { + assert.Equal(t, uris[i], r.URI) + assert.Equal(t, "queue/999", r.RequestID) + assert.Equal(t, "test-queue", r.Queue) + assert.Equal(t, int32(1), r.Version) + assert.Positive(t, r.CreatedAt) + assert.Equal(t, r.CreatedAt, r.UpdatedAt) + } } func TestController_Process_PublishFailure(t *testing.T) { ctrl := gomock.NewController(t) + controller := newTestController(t, ctrl, newMockStorage(ctrl), newMockChangeStore(ctrl), fmt.Errorf("publish failed")) - controller := newTestController(t, ctrl, newMockStorage(ctrl), fmt.Errorf("publish failed")) - - landRequest := entity.LandRequest{ + delivery := makeDelivery(t, ctrl, entity.LandRequest{ ID: "test-queue/123", Queue: 
"test-queue", Change: entity.Change{URIs: []string{"github://uber/service/pull/1/xyz789abc"}}, LandStrategy: entity.RequestLandStrategyRebase, - } - - payload, err := landRequest.ToBytes() - require.NoError(t, err) - - msg := queue.NewMessage(landRequest.ID, payload, landRequest.Queue, nil) - delivery := queuemock.NewMockDelivery(ctrl) - delivery.EXPECT().Message().Return(msg).AnyTimes() - delivery.EXPECT().Attempt().Return(1).AnyTimes() + }) - err = controller.Process(context.Background(), delivery) - assert.Error(t, err) + assert.Error(t, controller.Process(context.Background(), delivery)) } func TestController_Process_StorageFailure(t *testing.T) { @@ -265,24 +255,16 @@ func TestController_Process_StorageFailure(t *testing.T) { store := storagemock.NewMockStorage(ctrl) store.EXPECT().GetRequestStore().Return(mockReqStore).AnyTimes() - controller := newTestController(t, ctrl, store, nil) + controller := newTestController(t, ctrl, store, newMockChangeStore(ctrl), nil) - landRequest := entity.LandRequest{ + delivery := makeDelivery(t, ctrl, entity.LandRequest{ ID: "test-queue/123", Queue: "test-queue", Change: entity.Change{URIs: []string{"github://uber/service/pull/1/xyz789abc"}}, LandStrategy: entity.RequestLandStrategyRebase, - } - - payload, err := landRequest.ToBytes() - require.NoError(t, err) - - msg := queue.NewMessage(landRequest.ID, payload, landRequest.Queue, nil) - delivery := queuemock.NewMockDelivery(ctrl) - delivery.EXPECT().Message().Return(msg).AnyTimes() - delivery.EXPECT().Attempt().Return(1).AnyTimes() + }) - err = controller.Process(context.Background(), delivery) + err := controller.Process(context.Background(), delivery) require.Error(t, err) assert.False(t, errs.IsRetryable(err)) } @@ -295,31 +277,39 @@ func TestController_Process_AlreadyExistsSucceeds(t *testing.T) { store := storagemock.NewMockStorage(ctrl) store.EXPECT().GetRequestStore().Return(mockReqStore).AnyTimes() - controller := newTestController(t, ctrl, store, nil) + controller 
:= newTestController(t, ctrl, store, newMockChangeStore(ctrl), nil) - landRequest := entity.LandRequest{ + delivery := makeDelivery(t, ctrl, entity.LandRequest{ ID: "test-queue/123", Queue: "test-queue", Change: entity.Change{URIs: []string{"github://uber/service/pull/1/xyz789abc"}}, LandStrategy: entity.RequestLandStrategyRebase, - } + }) - payload, err := landRequest.ToBytes() - require.NoError(t, err) + require.NoError(t, controller.Process(context.Background(), delivery)) +} - msg := queue.NewMessage(landRequest.ID, payload, landRequest.Queue, nil) - delivery := queuemock.NewMockDelivery(ctrl) - delivery.EXPECT().Message().Return(msg).AnyTimes() - delivery.EXPECT().Attempt().Return(1).AnyTimes() +func TestController_Process_ChangeStoreFailure(t *testing.T) { + ctrl := gomock.NewController(t) - // Should succeed even though Create returns ErrAlreadyExists (idempotent) - err = controller.Process(context.Background(), delivery) - require.NoError(t, err) + cs := changemock.NewMockChangeStore(ctrl) + cs.EXPECT().Create(gomock.Any(), gomock.Any()).Return(fmt.Errorf("change store down")) + + controller := newTestController(t, ctrl, newMockStorage(ctrl), cs, nil) + + delivery := makeDelivery(t, ctrl, entity.LandRequest{ + ID: "test-queue/123", + Queue: "test-queue", + Change: entity.Change{URIs: []string{"github://uber/service/pull/1/xyz789abc"}}, + LandStrategy: entity.RequestLandStrategyRebase, + }) + + require.Error(t, controller.Process(context.Background(), delivery)) } func TestController_InterfaceImplementation(t *testing.T) { ctrl := gomock.NewController(t) - controller := newTestController(t, ctrl, newMockStorage(ctrl), nil) + controller := newTestController(t, ctrl, newMockStorage(ctrl), newMockChangeStore(ctrl), nil) var _ consumer.Controller = controller } diff --git a/orchestrator/controller/validate/BUILD.bazel b/orchestrator/controller/validate/BUILD.bazel index a7da4c8b..17d19ac5 100644 --- a/orchestrator/controller/validate/BUILD.bazel +++ 
b/orchestrator/controller/validate/BUILD.bazel @@ -12,6 +12,7 @@ go_library( "//entity", "//entity/queue", "//extension/changeprovider", + "//extension/changestore", "//extension/mergechecker", "//extension/storage", "@com_github_uber_go_tally_v4//:tally", @@ -29,9 +30,11 @@ go_test( "//entity", "//entity/queue", "//extension/changeprovider", + "//extension/changestore/mock", "//extension/mergechecker", "//extension/mergechecker/mock", "//extension/queue/mock", + "//extension/storage", "//extension/storage/mock", "@com_github_stretchr_testify//assert", "@com_github_stretchr_testify//require", diff --git a/orchestrator/controller/validate/validate.go b/orchestrator/controller/validate/validate.go index d541d16c..8998e6fb 100644 --- a/orchestrator/controller/validate/validate.go +++ b/orchestrator/controller/validate/validate.go @@ -16,6 +16,7 @@ package validate import ( "context" + "errors" "fmt" "github.com/uber-go/tally/v4" @@ -25,19 +26,21 @@ import ( "github.com/uber/submitqueue/entity" entityqueue "github.com/uber/submitqueue/entity/queue" "github.com/uber/submitqueue/extension/changeprovider" + "github.com/uber/submitqueue/extension/changestore" "github.com/uber/submitqueue/extension/mergechecker" "github.com/uber/submitqueue/extension/storage" "go.uber.org/zap" ) // Controller handles validate queue messages. -// It consumes requests, performs validation checks (merge conflicts, duplicate requests, etc.), -// and publishes to the batch stage. Validation logic is extensible to support additional checks. -// Implements consumer.Controller interface for integration with the consumer. +// It consumes requests, performs validation checks (duplicate detection via the change store, +// merge conflicts, change metadata fetch), and publishes to the batch stage. Validation logic +// is extensible to support additional checks. Implements consumer.Controller. 
type Controller struct { logger *zap.SugaredLogger metricsScope tally.Scope store storage.Storage + changeStore changestore.ChangeStore registry consumer.TopicRegistry mergeChecker mergechecker.MergeChecker changeProvider changeprovider.ChangeProvider @@ -53,6 +56,7 @@ func NewController( logger *zap.SugaredLogger, scope tally.Scope, store storage.Storage, + changeStore changestore.ChangeStore, registry consumer.TopicRegistry, mergeChecker mergechecker.MergeChecker, changeProvider changeprovider.ChangeProvider, @@ -63,6 +67,7 @@ func NewController( logger: logger.Named("validate_controller"), metricsScope: scope.SubScope("validate_controller"), store: store, + changeStore: changeStore, registry: registry, mergeChecker: mergeChecker, changeProvider: changeProvider, @@ -72,8 +77,8 @@ func NewController( } // Process processes a validate delivery from the queue. -// Deserializes the request and publishes to the batch topic. -// Returns nil to ack (success), or error to nack (retry). +// Runs duplicate detection, merge-conflict check, change metadata fetch, then publishes to batch. +// Returns nil to ack (success or non-retryable rejection), error to nack (retry). func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (retErr error) { op := coremetrics.Begin(c.metricsScope, "process") defer func() { op.Complete(retErr) }() @@ -103,6 +108,22 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (r "partition_key", msg.PartitionKey, ) + // Duplicate detection: look for any other in-flight request that has already + // claimed an overlapping URI in this queue. Per-queue partition leasing + // (see core/consumer + extension/queue) guarantees serial processing within + // a queue, so the read-then-claim sequence below is race-free. 
+ if dupID, err := c.checkDuplicate(ctx, request); err != nil { + return err + } else if dupID != "" { + c.logger.Infow("duplicate request detected", + "request_id", request.ID, + "queue", request.Queue, + "duplicate_id", dupID, + ) + coremetrics.NamedCounter(c.metricsScope, "process", "duplicate_requests", 1) + return errs.NewUserError(fmt.Errorf("request %s is a duplicate of in-flight request %s", request.ID, dupID)) + } + // Merge conflict check mergeResult, err := c.mergeChecker.Check(ctx, request.Queue, request.Change) if err != nil { @@ -122,13 +143,8 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (r // Fetch change metadata changeInfos, err := c.changeProvider.Get(ctx, request.Change) if err != nil { - c.logger.Errorw("failed to fetch change information", - "request_id", request.ID, - "change_uris", request.Change.URIs, - "error", err, - ) coremetrics.NamedCounter(c.metricsScope, "process", "change_provider_errors", 1) - return fmt.Errorf("failed to fetch change information: %w", err) + return fmt.Errorf("failed to fetch change information for request %s: %w", request.ID, err) } c.logger.Infow("fetched change information", @@ -151,6 +167,47 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (r return nil // Success - message will be acked } +// checkDuplicate looks for any other in-flight request whose URIs overlap with this +// request's. The change rows are written upstream by the start controller; validate +// is read-only here. For each URI it queries the change store, walks the returned +// candidates skipping self/duplicates/orphans/terminals, and short-circuits on the +// first live duplicate. Returns that request_id, or "" if none. +// +// Per-URI / per-record reads keep the contract backend-agnostic; the typical request +// has 1-5 URIs, so the loop is cheap. 
+func (c *Controller) checkDuplicate(ctx context.Context, request entity.Request) (string, error) { + seenOwners := make(map[string]struct{}) + for _, uri := range request.Change.URIs { + records, err := c.changeStore.GetByURI(ctx, request.Queue, uri) + if err != nil { + coremetrics.NamedCounter(c.metricsScope, "process", "change_store_query_errors", 1) + return "", fmt.Errorf("failed to query change store for request %s uri=%s: %w", request.ID, uri, err) + } + for _, rec := range records { + if rec.RequestID == request.ID { + continue // skip rows belonging to this request itself + } + if _, ok := seenOwners[rec.RequestID]; ok { + continue + } + seenOwners[rec.RequestID] = struct{}{} + + owner, err := c.store.GetRequestStore().Get(ctx, rec.RequestID) + if errors.Is(err, storage.ErrNotFound) { + continue + } + if err != nil { + coremetrics.NamedCounter(c.metricsScope, "process", "storage_errors", 1) + return "", fmt.Errorf("failed to look up overlapping request %s: %w", rec.RequestID, err) + } + if !entity.IsRequestStateTerminal(owner.State) { + return rec.RequestID, nil + } + } + } + return "", nil +} + // publish publishes a request ID to the specified topic key. 
func (c *Controller) publish(ctx context.Context, key consumer.TopicKey, requestID string, partitionKey string) error { rid := entity.RequestID{ID: requestID} diff --git a/orchestrator/controller/validate/validate_test.go b/orchestrator/controller/validate/validate_test.go index 0d1b2ca7..ad592e2e 100644 --- a/orchestrator/controller/validate/validate_test.go +++ b/orchestrator/controller/validate/validate_test.go @@ -27,9 +27,11 @@ import ( "github.com/uber/submitqueue/entity" "github.com/uber/submitqueue/entity/queue" "github.com/uber/submitqueue/extension/changeprovider" + changemock "github.com/uber/submitqueue/extension/changestore/mock" "github.com/uber/submitqueue/extension/mergechecker" mergecheckermock "github.com/uber/submitqueue/extension/mergechecker/mock" queuemock "github.com/uber/submitqueue/extension/queue/mock" + "github.com/uber/submitqueue/extension/storage" storagemock "github.com/uber/submitqueue/extension/storage/mock" "go.uber.org/mock/gomock" "go.uber.org/zap/zaptest" @@ -46,7 +48,6 @@ func requestIDPayload(t *testing.T, id string) []byte { type mockChangeProvider struct{} func (m *mockChangeProvider) Get(ctx context.Context, change entity.Change) ([]changeprovider.ChangeInfo, error) { - // Return simple test data return []changeprovider.ChangeInfo{ { URI: "github://org/repo/123/abc123", @@ -69,17 +70,34 @@ func newMergeableMock(ctrl *gomock.Controller) *mergecheckermock.MockMergeChecke } // newMockStorage creates a MockStorage with a MockRequestStore that returns the given request on Get. -func newMockStorage(ctrl *gomock.Controller, request entity.Request) *storagemock.MockStorage { +// The returned MockRequestStore is exposed so individual tests can layer additional Get expectations. 
+func newMockStorage(ctrl *gomock.Controller, request entity.Request) (*storagemock.MockStorage, *storagemock.MockRequestStore) { mockReqStore := storagemock.NewMockRequestStore(ctrl) mockReqStore.EXPECT().Get(gomock.Any(), request.ID).Return(request, nil).AnyTimes() store := storagemock.NewMockStorage(ctrl) store.EXPECT().GetRequestStore().Return(mockReqStore).AnyTimes() - return store + return store, mockReqStore +} + +// newMockChangeStore creates a MockChangeStore with default no-overlap behavior. +// Tests that need to simulate overlap can override GetByURI with their own EXPECT. +// Validate is read-only against the change store — it never calls Create. +func newMockChangeStore(ctrl *gomock.Controller) *changemock.MockChangeStore { + cs := changemock.NewMockChangeStore(ctrl) + cs.EXPECT().GetByURI(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes() + return cs } // newTestController creates a controller with test dependencies. -func newTestController(t *testing.T, ctrl *gomock.Controller, store *storagemock.MockStorage, mc mergechecker.MergeChecker, publishErr error) *Controller { +func newTestController( + t *testing.T, + ctrl *gomock.Controller, + store *storagemock.MockStorage, + cs *changemock.MockChangeStore, + mc mergechecker.MergeChecker, + publishErr error, +) *Controller { logger := zaptest.NewLogger(t).Sugar() scope := tally.NoopScope @@ -100,7 +118,7 @@ func newTestController(t *testing.T, ctrl *gomock.Controller, store *storagemock cp := &mockChangeProvider{} - return NewController(logger, scope, store, registry, mc, cp, consumer.TopicKeyValidate, "orchestrator-validate") + return NewController(logger, scope, store, cs, registry, mc, cp, consumer.TopicKeyValidate, "orchestrator-validate") } func TestNewController(t *testing.T) { @@ -114,8 +132,8 @@ func TestNewController(t *testing.T) { State: entity.RequestStateStarted, Version: 1, } - store := newMockStorage(ctrl, request) - controller := newTestController(t, ctrl, store, mc, 
nil) + store, _ := newMockStorage(ctrl, request) + controller := newTestController(t, ctrl, store, newMockChangeStore(ctrl), mc, nil) require.NotNil(t, controller) assert.Equal(t, consumer.TopicKeyValidate, controller.TopicKey()) @@ -135,17 +153,15 @@ func TestController_Process_Success(t *testing.T) { State: entity.RequestStateStarted, Version: 1, } - store := newMockStorage(ctrl, request) - - controller := newTestController(t, ctrl, store, mc, nil) + store, _ := newMockStorage(ctrl, request) + controller := newTestController(t, ctrl, store, newMockChangeStore(ctrl), mc, nil) msg := queue.NewMessage("test-queue/123", requestIDPayload(t, request.ID), "test-queue", nil) delivery := queuemock.NewMockDelivery(ctrl) delivery.EXPECT().Message().Return(msg).AnyTimes() delivery.EXPECT().Attempt().Return(1).AnyTimes() - err := controller.Process(context.Background(), delivery) - require.NoError(t, err) + require.NoError(t, controller.Process(context.Background(), delivery)) } func TestController_Process_StorageFailure(t *testing.T) { @@ -157,7 +173,7 @@ func TestController_Process_StorageFailure(t *testing.T) { store := storagemock.NewMockStorage(ctrl) store.EXPECT().GetRequestStore().Return(mockReqStore).AnyTimes() - controller := newTestController(t, ctrl, store, mc, nil) + controller := newTestController(t, ctrl, store, newMockChangeStore(ctrl), mc, nil) msg := queue.NewMessage("test-queue/123", requestIDPayload(t, "test-queue/123"), "test-queue", nil) delivery := queuemock.NewMockDelivery(ctrl) @@ -181,25 +197,24 @@ func TestController_Process_PublishFailure(t *testing.T) { State: entity.RequestStateStarted, Version: 1, } - store := newMockStorage(ctrl, request) + store, _ := newMockStorage(ctrl, request) - controller := newTestController(t, ctrl, store, mc, fmt.Errorf("publish failed")) + controller := newTestController(t, ctrl, store, newMockChangeStore(ctrl), mc, fmt.Errorf("publish failed")) msg := queue.NewMessage(request.ID, requestIDPayload(t, request.ID), 
request.Queue, nil) delivery := queuemock.NewMockDelivery(ctrl) delivery.EXPECT().Message().Return(msg).AnyTimes() delivery.EXPECT().Attempt().Return(1).AnyTimes() - err := controller.Process(context.Background(), delivery) - assert.Error(t, err) + assert.Error(t, controller.Process(context.Background(), delivery)) } func TestController_InterfaceImplementation(t *testing.T) { ctrl := gomock.NewController(t) mc := newMergeableMock(ctrl) request := entity.Request{ID: "test-queue/123", Queue: "test-queue"} - store := newMockStorage(ctrl, request) - controller := newTestController(t, ctrl, store, mc, nil) + store, _ := newMockStorage(ctrl, request) + controller := newTestController(t, ctrl, store, newMockChangeStore(ctrl), mc, nil) var _ consumer.Controller = controller } @@ -218,9 +233,8 @@ func TestController_Process_NotMergeable(t *testing.T) { State: entity.RequestStateStarted, Version: 1, } - store := newMockStorage(ctrl, request) - - controller := newTestController(t, ctrl, store, mc, nil) + store, _ := newMockStorage(ctrl, request) + controller := newTestController(t, ctrl, store, newMockChangeStore(ctrl), mc, nil) msg := queue.NewMessage(request.ID, requestIDPayload(t, request.ID), request.Queue, nil) delivery := queuemock.NewMockDelivery(ctrl) @@ -229,7 +243,7 @@ func TestController_Process_NotMergeable(t *testing.T) { err := controller.Process(context.Background(), delivery) require.Error(t, err) - assert.False(t, errs.IsRetryable(err)) + assert.True(t, errs.IsUserError(err)) } func TestController_Process_MergeCheckError(t *testing.T) { @@ -246,9 +260,8 @@ func TestController_Process_MergeCheckError(t *testing.T) { State: entity.RequestStateStarted, Version: 1, } - store := newMockStorage(ctrl, request) - - controller := newTestController(t, ctrl, store, mc, nil) + store, _ := newMockStorage(ctrl, request) + controller := newTestController(t, ctrl, store, newMockChangeStore(ctrl), mc, nil) msg := queue.NewMessage(request.ID, requestIDPayload(t, request.ID), 
request.Queue, nil)
 	delivery := queuemock.NewMockDelivery(ctrl)
@@ -259,3 +272,212 @@ func TestController_Process_MergeCheckError(t *testing.T) {
 	require.Error(t, err)
 	assert.False(t, errs.IsRetryable(err))
 }
+
+// TestController_Process_DuplicateDetection drives Process through the change-store
+// overlap check. Each case configures the GetByURI results and the request-store
+// answer for every overlapping owner, then asserts whether Process succeeds, returns
+// a user error (live duplicate found), or returns a non-user error (lookup failure).
+func TestController_Process_DuplicateDetection(t *testing.T) {
+	const (
+		queueName     = "test-queue"
+		newRequestID  = queueName + "/123"
+		dupRequestID  = queueName + "/100"
+		uriA          = "github://uber/service/pull/1/abc"
+		uriB          = "github://uber/service/pull/2/def"
+		anotherReqID  = queueName + "/050"
+		orphanReqID   = queueName + "/999"
+		terminalReqID = queueName + "/200"
+	)
+
+	tests := []struct {
+		name           string
+		requestURIs    []string                         // URIs on the new request; defaults to [uriA]
+		byURI          map[string][]entity.ChangeRecord // GetByURI mock returns
+		ownerLookup    map[string]entity.Request
+		ownerNotFound  map[string]bool
+		ownerErr       map[string]error
+		wantUserErr    bool
+		wantUnexpected bool
+	}{
+		{
+			name:  "no overlap proceeds to merge check",
+			byURI: map[string][]entity.ChangeRecord{uriA: nil},
+		},
+		{
+			name: "overlap with live in-flight request returns user error",
+			byURI: map[string][]entity.ChangeRecord{
+				uriA: {{URI: uriA, RequestID: dupRequestID, Queue: queueName}},
+			},
+			ownerLookup: map[string]entity.Request{
+				dupRequestID: {ID: dupRequestID, Queue: queueName, State: entity.RequestStateStarted, Version: 1},
+			},
+			wantUserErr: true,
+		},
+		{
+			name: "overlap with terminal owner is skipped",
+			byURI: map[string][]entity.ChangeRecord{
+				uriA: {{URI: uriA, RequestID: terminalReqID, Queue: queueName}},
+			},
+			ownerLookup: map[string]entity.Request{
+				terminalReqID: {ID: terminalReqID, Queue: queueName, State: entity.RequestStateLanded, Version: 5},
+			},
+		},
+		{
+			name: "overlap with orphan owner (ErrNotFound) is skipped",
+			byURI: map[string][]entity.ChangeRecord{
+				uriA: {{URI: uriA, RequestID: orphanReqID, Queue: queueName}},
+			},
+			ownerNotFound: map[string]bool{orphanReqID: true},
+		},
+		{
+			// The owner must be terminal: a live owner short-circuits on uriA and never
+			// reaches the dedupe path. The single Get expectation registered below (gomock
+			// defaults to exactly one call) fails the test if the owner is looked up again for uriB.
+			name:        "multi-URI same terminal owner deduped to single Get call",
+			requestURIs: []string{uriA, uriB},
+			byURI: map[string][]entity.ChangeRecord{
+				uriA: {{URI: uriA, RequestID: terminalReqID, Queue: queueName}},
+				uriB: {{URI: uriB, RequestID: terminalReqID, Queue: queueName}},
+			},
+			ownerLookup: map[string]entity.Request{
+				terminalReqID: {ID: terminalReqID, Queue: queueName, State: entity.RequestStateLanded, Version: 5},
+			},
+		},
+		{
+			name:        "first URI's owner is terminal, second URI's owner is live",
+			requestURIs: []string{uriA, uriB},
+			byURI: map[string][]entity.ChangeRecord{
+				uriA: {{URI: uriA, RequestID: terminalReqID, Queue: queueName}},
+				uriB: {{URI: uriB, RequestID: anotherReqID, Queue: queueName}},
+			},
+			ownerLookup: map[string]entity.Request{
+				terminalReqID: {ID: terminalReqID, State: entity.RequestStateError, Version: 3},
+				anotherReqID:  {ID: anotherReqID, State: entity.RequestStateProcessing, Version: 4},
+			},
+			wantUserErr: true,
+		},
+		{
+			// Store doesn't exclude self; controller filters by RequestID and must not look up its own row.
+			name: "self row in result is filtered (no Get call)",
+			byURI: map[string][]entity.ChangeRecord{
+				uriA: {{URI: uriA, RequestID: newRequestID, Queue: queueName}},
+			},
+		},
+		{
+			name: "self row mixed with live other returns the other",
+			byURI: map[string][]entity.ChangeRecord{
+				uriA: {
+					{URI: uriA, RequestID: newRequestID, Queue: queueName},
+					{URI: uriA, RequestID: dupRequestID, Queue: queueName},
+				},
+			},
+			ownerLookup: map[string]entity.Request{
+				dupRequestID: {ID: dupRequestID, Queue: queueName, State: entity.RequestStateStarted, Version: 1},
+			},
+			wantUserErr: true,
+		},
+		{
+			name: "owner lookup unexpected error propagates",
+			byURI: map[string][]entity.ChangeRecord{
+				uriA: {{URI: uriA, RequestID: dupRequestID, Queue: queueName}},
+			},
+			ownerErr: map[string]error{
+				dupRequestID: fmt.Errorf("db down"),
+			},
+			wantUnexpected: true,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			ctrl := gomock.NewController(t)
+			mc := newMergeableMock(ctrl)
+
+			uris := tt.requestURIs
+			if uris == nil {
+				uris = []string{uriA}
+			}
+
+			request := entity.Request{
+				ID:           newRequestID,
+				Queue:        queueName,
+				Change:       entity.Change{URIs: uris},
+				LandStrategy: entity.RequestLandStrategyRebase,
+				State:        entity.RequestStateStarted,
+				Version:      1,
+			}
+
+			mockReqStore := storagemock.NewMockRequestStore(ctrl)
+			mockReqStore.EXPECT().Get(gomock.Any(), request.ID).Return(request, nil)
+			for id, req := range tt.ownerLookup {
+				mockReqStore.EXPECT().Get(gomock.Any(), id).Return(req, nil)
+			}
+			for id := range tt.ownerNotFound {
+				mockReqStore.EXPECT().Get(gomock.Any(), id).Return(entity.Request{}, storage.WrapNotFound(fmt.Errorf("missing")))
+			}
+			for id, e := range tt.ownerErr {
+				mockReqStore.EXPECT().Get(gomock.Any(), id).Return(entity.Request{}, e)
+			}
+			store := storagemock.NewMockStorage(ctrl)
+			store.EXPECT().GetRequestStore().Return(mockReqStore).AnyTimes()
+
+			cs := changemock.NewMockChangeStore(ctrl)
+			// At most one GetByURI per URI on the request. The controller short-circuits on the
+			// first live duplicate, so MaxTimes(1) (which permits zero calls) lets later URIs go un-queried.
+			for _, u := range uris {
+				cs.EXPECT().GetByURI(gomock.Any(), queueName, u).Return(tt.byURI[u], nil).MaxTimes(1)
+			}
+
+			controller := newTestController(t, ctrl, store, cs, mc, nil)
+
+			msg := queue.NewMessage(request.ID, requestIDPayload(t, request.ID), request.Queue, nil)
+			delivery := queuemock.NewMockDelivery(ctrl)
+			delivery.EXPECT().Message().Return(msg).AnyTimes()
+			delivery.EXPECT().Attempt().Return(1).AnyTimes()
+
+			err := controller.Process(context.Background(), delivery)
+
+			switch {
+			case tt.wantUnexpected:
+				require.Error(t, err)
+				assert.False(t, errs.IsUserError(err), "owner lookup failure should not be a user error")
+			case tt.wantUserErr:
+				require.Error(t, err)
+				assert.True(t, errs.IsUserError(err), "duplicate detection should be a user error")
+			default:
+				require.NoError(t, err)
+			}
+		})
+	}
+}
+
+// TestController_Process_ChangeStoreQueryFailure verifies that a GetByURI failure
+// surfaces from Process as an error that is not classified as a user error.
+func TestController_Process_ChangeStoreQueryFailure(t *testing.T) {
+	ctrl := gomock.NewController(t)
+	mc := newMergeableMock(ctrl)
+
+	request := entity.Request{
+		ID:           "test-queue/123",
+		Queue:        "test-queue",
+		Change:       entity.Change{URIs: []string{"github://uber/service/pull/1/abc"}},
+		LandStrategy: entity.RequestLandStrategyRebase,
+		State:        entity.RequestStateStarted,
+		Version:      1,
+	}
+	store, _ := newMockStorage(ctrl, request)
+
+	cs := changemock.NewMockChangeStore(ctrl)
+	cs.EXPECT().GetByURI(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, fmt.Errorf("change store down"))
+
+	controller := newTestController(t, ctrl, store, cs, mc, nil)
+
+	msg := queue.NewMessage(request.ID, requestIDPayload(t, request.ID), request.Queue, nil)
+	delivery := queuemock.NewMockDelivery(ctrl)
+	delivery.EXPECT().Message().Return(msg).AnyTimes()
+	delivery.EXPECT().Attempt().Return(1).AnyTimes()
+
+	err := controller.Process(context.Background(), delivery)
+	require.Error(t, err)
+	assert.False(t, errs.IsUserError(err), "infra error should not be classified as user error")
+}