Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions example/server/orchestrator/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ go_library(
"//entity",
"//extension/changeprovider",
"//extension/changeprovider/github",
"//extension/changestore",
"//extension/changestore/mysql",
"//extension/counter",
"//extension/counter/mysql",
"//extension/mergechecker",
Expand Down
10 changes: 8 additions & 2 deletions example/server/orchestrator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ import (
"github.com/uber/submitqueue/entity"
"github.com/uber/submitqueue/extension/changeprovider"
githubprovider "github.com/uber/submitqueue/extension/changeprovider/github"
"github.com/uber/submitqueue/extension/changestore"
mysqlchangestore "github.com/uber/submitqueue/extension/changestore/mysql"
"github.com/uber/submitqueue/extension/counter"
mysqlcounter "github.com/uber/submitqueue/extension/counter/mysql"
"github.com/uber/submitqueue/extension/mergechecker"
Expand Down Expand Up @@ -154,6 +156,8 @@ func run() error {
return fmt.Errorf("failed to create storage: %w", err)
}

changeStore := mysqlchangestore.NewChangeStore(appDB, scope.SubScope("changestore"))

// Open queue database connection
// Docker Compose healthchecks ensure MySQL is ready before service starts
queueDSN := os.Getenv("QUEUE_MYSQL_DSN")
Expand Down Expand Up @@ -212,7 +216,7 @@ func run() error {
}

// Register controllers
if err := registerControllers(c, logger.Sugar(), scope, registry, mc, cp, psh, cnt, store); err != nil {
if err := registerControllers(c, logger.Sugar(), scope, registry, mc, cp, psh, cnt, store, changeStore); err != nil {
return err
}

Expand Down Expand Up @@ -397,11 +401,12 @@ func newTopicRegistry(q extqueue.Queue, subscriberName string) (consumer.TopicRe
// │ │ │
// └────────┴────────────────────────┘

func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope tally.Scope, registry consumer.TopicRegistry, mc mergechecker.MergeChecker, cp changeprovider.ChangeProvider, psh pusher.Pusher, cnt counter.Counter, store storage.Storage) error {
func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope tally.Scope, registry consumer.TopicRegistry, mc mergechecker.MergeChecker, cp changeprovider.ChangeProvider, psh pusher.Pusher, cnt counter.Counter, store storage.Storage, changeStore changestore.ChangeStore) error {
requestController := start.NewController(
logger,
scope,
store,
changeStore,
registry,
consumer.TopicKeyStart,
"orchestrator-start",
Expand All @@ -414,6 +419,7 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t
logger,
scope,
store,
changeStore,
registry,
mc,
cp,
Expand Down
2 changes: 2 additions & 0 deletions orchestrator/controller/start/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ go_library(
"//core/request",
"//entity",
"//entity/queue",
"//extension/changestore",
"//extension/storage",
"@com_github_uber_go_tally_v4//:tally",
"@org_uber_go_zap//:zap",
Expand All @@ -25,6 +26,7 @@ go_test(
"//core/errs",
"//entity",
"//entity/queue",
"//extension/changestore/mock",
"//extension/queue/mock",
"//extension/storage",
"//extension/storage/mock",
Expand Down
55 changes: 44 additions & 11 deletions orchestrator/controller/start/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,29 @@ import (
"context"
"errors"
"fmt"
"time"

"github.com/uber-go/tally/v4"
"github.com/uber/submitqueue/core/consumer"
corerequest "github.com/uber/submitqueue/core/request"
"github.com/uber/submitqueue/entity"
entityqueue "github.com/uber/submitqueue/entity/queue"
"github.com/uber/submitqueue/extension/changestore"
"github.com/uber/submitqueue/extension/storage"
"go.uber.org/zap"
)

// Controller handles start queue messages.
// It consumes requests, persists them to storage, and publishes to the validate stage.
// Implements consumer.Controller interface for integration with the consumer.
// It consumes requests, persists them to the request store, claims their URIs in
// the change store, and publishes to the validate stage. Both writes are idempotent
// on retries; the duplicate-detection check itself is performed downstream by the
// validate controller, which reads the change store and consults the request store
// for liveness. Implements consumer.Controller.
type Controller struct {
logger *zap.SugaredLogger
metricsScope tally.Scope
store storage.Storage
changeStore changestore.ChangeStore
registry consumer.TopicRegistry
topicKey consumer.TopicKey
consumerGroup string
Expand All @@ -48,6 +54,7 @@ func NewController(
logger *zap.SugaredLogger,
scope tally.Scope,
store storage.Storage,
changeStore changestore.ChangeStore,
registry consumer.TopicRegistry,
topicKey consumer.TopicKey,
consumerGroup string,
Expand All @@ -56,29 +63,28 @@ func NewController(
logger: logger.Named("start_controller"),
metricsScope: scope.SubScope("start_controller"),
store: store,
changeStore: changeStore,
registry: registry,
topicKey: topicKey,
consumerGroup: consumerGroup,
}
}

// Process processes a request delivery from the queue.
// Deserializes the request and publishes to the validate topic.
// Persists the request, claims its URIs in the change store, and publishes to validate.
// Returns nil to ack (success), or error to nack (retry).
func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) error {
c.metricsScope.Counter("received").Inc(1)

msg := delivery.Message()

// Deserialize land request from gateway
landRequest, err := entity.LandRequestFromBytes(msg.Payload)
if err != nil {
c.metricsScope.Counter("deserialize_errors").Inc(1)
// Non-retryable: malformed messages will never succeed regardless of retry count
return fmt.Errorf("failed to deserialize land request: %w", err)
}

// Construct the full versioned Request entity with orchestrator-owned fields
request := entity.Request{
ID: landRequest.ID,
Queue: landRequest.Queue,
Expand All @@ -100,22 +106,29 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er
"partition_key", msg.PartitionKey,
)

// Persist request to storage (idempotent — ErrAlreadyExists means a retry)
// Persist request to storage. ErrAlreadyExists means a queue redelivery of the same
// request_id (an at-least-once retry of THIS message), not a cross-request collision.
if err := c.store.GetRequestStore().Create(ctx, request); err != nil && !errors.Is(err, storage.ErrAlreadyExists) {
c.metricsScope.Counter("storage_errors").Inc(1)
return fmt.Errorf("failed to create request: %w", err)
}

// Record the "new" status in the request log
// Claim each URI in the change store. Different requests on the same URI write
// distinct rows (different request_id), so cross-request URI overlap does not
// collide on insert; the validate controller surfaces it via GetByURI + a
// liveness check against the request store.
if err := c.claimURIs(ctx, request); err != nil {
c.metricsScope.Counter("change_store_errors").Inc(1)
return fmt.Errorf("failed to claim URIs for request %s: %w", request.ID, err)
}

// Record the "new" status in the request log.
logEntry := entity.NewRequestLog(request.ID, entity.RequestStatusStarted, request.Version, "", nil)
// Using request.ID as the partition key to ensure ordering of log entries for the same request
// and parallel processing of log entries for different requests.
if err := corerequest.PublishLog(ctx, c.registry, logEntry, request.ID); err != nil {
c.metricsScope.Counter("request_log_errors").Inc(1)
return fmt.Errorf("failed to publish request log: %w", err)
}

// Publish to validate topic
if err := c.publish(ctx, consumer.TopicKeyValidate, request.ID, request.Queue); err != nil {
c.metricsScope.Counter("publish_errors").Inc(1)
return fmt.Errorf("failed to publish to validate: %w", err)
Expand All @@ -127,8 +140,28 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er
)

c.metricsScope.Counter("processed").Inc(1)
return nil
}

return nil // Success - message will be acked
// claimURIs persists one ChangeRecord per URI in the request. Each Create call is
// independent; the change store's per-PK idempotency makes the loop safe under
// queue redelivery (same (Queue, URI, RequestID) is a no-op on retry).
func (c *Controller) claimURIs(ctx context.Context, request entity.Request) error {
now := time.Now().UnixMilli()
for _, uri := range request.Change.URIs {
record := entity.ChangeRecord{
URI: uri,
RequestID: request.ID,
Queue: request.Queue,
CreatedAt: now,
UpdatedAt: now,
Version: 1,
}
if err := c.changeStore.Create(ctx, record); err != nil {
return fmt.Errorf("failed to claim uri=%s for request %s: %w", uri, request.ID, err)
}
}
return nil
}

// publish publishes a request ID to the specified topic key.
Expand Down
Loading
Loading