Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions submitqueue/orchestrator/controller/conclude/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ go_library(
deps = [
"//core/metrics",
"//submitqueue/core/consumer",
"//submitqueue/core/request",
"//submitqueue/entity",
"//submitqueue/extension/storage",
"@com_github_uber_go_tally_v4//:tally",
Expand Down
75 changes: 64 additions & 11 deletions submitqueue/orchestrator/controller/conclude/conclude.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/uber-go/tally/v4"
"github.com/uber/submitqueue/core/metrics"
"github.com/uber/submitqueue/submitqueue/core/consumer"
corerequest "github.com/uber/submitqueue/submitqueue/core/request"
"github.com/uber/submitqueue/submitqueue/entity"
"github.com/uber/submitqueue/submitqueue/extension/storage"
"go.uber.org/zap"
Expand Down Expand Up @@ -101,27 +102,65 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (r
metrics.NamedCounter(c.metricsScope, "process", "unexpected_state_errors", 1)
return fmt.Errorf("unexpected batch state %q for batch %s: %w", batch.State, batch.ID, err)
}
requestStatus, err := requestStateToStatus(requestState)
if err != nil {
// Unreachable: batchStateToRequestState only returns terminal request states.
return fmt.Errorf("failed to map request state %s to status: %w", requestState, err)
}

// Update each request's state to reflect the batch outcome.
// Reconcile each request to the batch's terminal state and emit a terminal
// log entry. The flow is idempotent under at-least-once delivery: a prior
// attempt may have completed the CAS but failed before publishing the log,
// so the log publish must still run when the request is already in the
// target terminal state.
for _, requestID := range batch.Contains {
request, err := c.store.GetRequestStore().Get(ctx, requestID)
if err != nil {
metrics.NamedCounter(c.metricsScope, "process", "request_store_errors", 1)
return fmt.Errorf("failed to get request %s: %w", requestID, err)
}

newVersion := request.Version + 1
if err := c.store.GetRequestStore().UpdateState(ctx, requestID, request.Version, newVersion, requestState); err != nil {
metrics.NamedCounter(c.metricsScope, "process", "request_update_errors", 1)
return fmt.Errorf("failed to update request %s state to %s: %w", requestID, requestState, err)
switch {
case request.State == requestState:
// Idempotent retry: a prior delivery already wrote the terminal
// state. Skip the CAS and fall through to the log publish.
metrics.NamedCounter(c.metricsScope, "process", "already_reconciled", 1)
case entity.IsRequestStateTerminal(request.State):
// Divergent terminal state — a concurrent path (e.g. a racing
// cancel-not-yet-batched transition) reached terminal first. Skip
// the reconcile and the log publish; the other writer owns the
// terminal log entry for the state it actually wrote.
c.logger.Warnw("request already in different terminal state, skipping reconcile",
"batch_id", batch.ID,
"request_id", requestID,
"actual_state", string(request.State),
"expected_state", string(requestState),
)
metrics.NamedCounter(c.metricsScope, "process", "terminal_state_divergence", 1)
continue
default:
newVersion := request.Version + 1
if err := c.store.GetRequestStore().UpdateState(ctx, requestID, request.Version, newVersion, requestState); err != nil {
metrics.NamedCounter(c.metricsScope, "process", "request_update_errors", 1)
return fmt.Errorf("failed to update request %s state to %s: %w", requestID, requestState, err)
}
request.Version = newVersion
request.State = requestState

c.logger.Infow("updated request state",
"batch_id", batch.ID,
"request_id", requestID,
"new_state", string(requestState),
)
}
request.Version = newVersion

c.logger.Infow("updated request state",
"batch_id", batch.ID,
"request_id", requestID,
"new_state", string(requestState),
)
logEntry := entity.NewRequestLog(requestID, requestStatus, request.Version, "", map[string]string{
"batch_id": batch.ID,
})
if err := corerequest.PublishLog(ctx, c.registry, logEntry, requestID); err != nil {
metrics.NamedCounter(c.metricsScope, "process", "log_publish_errors", 1)
return fmt.Errorf("failed to publish request log for %s: %w", requestID, err)
}
}

return nil // Success - message will be acked
Expand Down Expand Up @@ -155,3 +194,17 @@ func batchStateToRequestState(state entity.BatchState) (entity.RequestState, err
return entity.RequestStateUnknown, fmt.Errorf("non-terminal batch state: %s", state)
}
}

// requestStateToStatus translates a terminal request state into the status
// value recorded on the request log entry.
//
// Landed, Error and Cancelled map to their like-named statuses. Any other
// state yields entity.RequestStatusUnknown together with a non-nil error.
func requestStateToStatus(state entity.RequestState) (entity.RequestStatus, error) {
	if state == entity.RequestStateLanded {
		return entity.RequestStatusLanded, nil
	}
	if state == entity.RequestStateError {
		return entity.RequestStatusError, nil
	}
	if state == entity.RequestStateCancelled {
		return entity.RequestStatusCancelled, nil
	}
	// Anything else is not one of the three terminal states handled above.
	return entity.RequestStatusUnknown, fmt.Errorf("non-terminal request state: %s", state)
}
113 changes: 99 additions & 14 deletions submitqueue/orchestrator/controller/conclude/conclude_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,10 @@ func batchIDPayload(t *testing.T, id string) []byte {
}

// newTestController creates a controller with test dependencies.
func newTestController(t *testing.T, ctrl *gomock.Controller, mockStorage *storagemock.MockStorage) *Controller {
// expectLogPublish controls whether the log topic publisher is wired with an
// expectation; tests that don't reach the log publish step pass false so an
// unexpected publish would fail the test.
func newTestController(t *testing.T, ctrl *gomock.Controller, mockStorage *storagemock.MockStorage, expectLogPublish bool) (*Controller, *queuemock.MockPublisher) {
logger := zaptest.NewLogger(t).Sugar()
scope := tally.NoopScope

Expand All @@ -53,15 +56,26 @@ func newTestController(t *testing.T, ctrl *gomock.Controller, mockStorage *stora
mockStorage.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes()
}

registry, err := consumer.NewTopicRegistry(nil)
mockPub := queuemock.NewMockPublisher(ctrl)
if expectLogPublish {
mockPub.EXPECT().Publish(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
}
mockQ := queuemock.NewMockQueue(ctrl)
mockQ.EXPECT().Publisher().Return(mockPub).AnyTimes()

registry, err := consumer.NewTopicRegistry(
[]consumer.TopicConfig{
{Key: consumer.TopicKeyLog, Name: "log", Queue: mockQ},
},
)
require.NoError(t, err)

return NewController(logger, scope, mockStorage, registry, consumer.TopicKeyConclude, "orchestrator-conclude")
return NewController(logger, scope, mockStorage, registry, consumer.TopicKeyConclude, "orchestrator-conclude"), mockPub
}

func TestNewController(t *testing.T) {
ctrl := gomock.NewController(t)
controller := newTestController(t, ctrl, nil)
controller, _ := newTestController(t, ctrl, nil, false)

require.NotNil(t, controller)
assert.Equal(t, consumer.TopicKeyConclude, controller.TopicKey())
Expand All @@ -71,11 +85,12 @@ func TestNewController(t *testing.T) {

func TestController_Process(t *testing.T) {
tests := []struct {
name string
batch entity.Batch
setupStore func(*gomock.Controller) *storagemock.MockStorage
wantErr bool
retryable bool
name string
batch entity.Batch
setupStore func(*gomock.Controller) *storagemock.MockStorage
expectLogPublish bool
wantErr bool
retryable bool
}{
{
name: "succeeded batch lands requests",
Expand Down Expand Up @@ -111,6 +126,7 @@ func TestController_Process(t *testing.T) {
mockStorage.EXPECT().GetRequestStore().Return(mockRequestStore).AnyTimes()
return mockStorage
},
expectLogPublish: true,
},
{
name: "failed batch errors requests",
Expand Down Expand Up @@ -142,6 +158,7 @@ func TestController_Process(t *testing.T) {
mockStorage.EXPECT().GetRequestStore().Return(mockRequestStore).AnyTimes()
return mockStorage
},
expectLogPublish: true,
},
{
name: "cancelled batch cancels requests",
Expand Down Expand Up @@ -173,6 +190,74 @@ func TestController_Process(t *testing.T) {
mockStorage.EXPECT().GetRequestStore().Return(mockRequestStore).AnyTimes()
return mockStorage
},
expectLogPublish: true,
},
{
name: "idempotent retry: request already in target terminal state still publishes log",
batch: entity.Batch{
ID: "test-queue/batch/8",
Queue: "test-queue",
Contains: []string{"test-queue/20"},
State: entity.BatchStateSucceeded,
Version: 2,
},
setupStore: func(ctrl *gomock.Controller) *storagemock.MockStorage {
mockBatchStore := storagemock.NewMockBatchStore(ctrl)
mockBatchStore.EXPECT().Get(gomock.Any(), "test-queue/batch/8").Return(entity.Batch{
ID: "test-queue/batch/8",
Queue: "test-queue",
Contains: []string{"test-queue/20"},
State: entity.BatchStateSucceeded,
Version: 2,
}, nil)

// Request is already Landed (prior delivery wrote it). UpdateState
// must NOT be called — gomock will fail the test if it is.
mockRequestStore := storagemock.NewMockRequestStore(ctrl)
mockRequestStore.EXPECT().Get(gomock.Any(), "test-queue/20").Return(entity.Request{
ID: "test-queue/20", Version: 7, State: entity.RequestStateLanded,
}, nil)

mockStorage := storagemock.NewMockStorage(ctrl)
mockStorage.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes()
mockStorage.EXPECT().GetRequestStore().Return(mockRequestStore).AnyTimes()
return mockStorage
},
expectLogPublish: true,
},
{
name: "divergent terminal state skips reconcile and log publish",
batch: entity.Batch{
ID: "test-queue/batch/9",
Queue: "test-queue",
Contains: []string{"test-queue/30"},
State: entity.BatchStateSucceeded,
Version: 2,
},
setupStore: func(ctrl *gomock.Controller) *storagemock.MockStorage {
mockBatchStore := storagemock.NewMockBatchStore(ctrl)
mockBatchStore.EXPECT().Get(gomock.Any(), "test-queue/batch/9").Return(entity.Batch{
ID: "test-queue/batch/9",
Queue: "test-queue",
Contains: []string{"test-queue/30"},
State: entity.BatchStateSucceeded,
Version: 2,
}, nil)

// Request is already in a *different* terminal state (Cancelled).
// Conclude must not write the log entry (the other writer owns it),
// and must not attempt UpdateState.
mockRequestStore := storagemock.NewMockRequestStore(ctrl)
mockRequestStore.EXPECT().Get(gomock.Any(), "test-queue/30").Return(entity.Request{
ID: "test-queue/30", Version: 5, State: entity.RequestStateCancelled,
}, nil)

mockStorage := storagemock.NewMockStorage(ctrl)
mockStorage.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes()
mockStorage.EXPECT().GetRequestStore().Return(mockRequestStore).AnyTimes()
return mockStorage
},
expectLogPublish: false,
},
{
name: "non-terminal batch state returns error",
Expand Down Expand Up @@ -201,7 +286,7 @@ func TestController_Process(t *testing.T) {
retryable: false,
},
{
name: "request store get failure is retryable",
name: "request store get failure returns error",
batch: entity.Batch{
ID: "test-queue/batch/5",
Queue: "test-queue",
Expand Down Expand Up @@ -231,7 +316,7 @@ func TestController_Process(t *testing.T) {
retryable: false,
},
{
name: "request store update failure is retryable",
name: "request store update failure returns error",
batch: entity.Batch{
ID: "test-queue/batch/6",
Queue: "test-queue",
Expand Down Expand Up @@ -296,7 +381,7 @@ func TestController_Process(t *testing.T) {
mockStorage = tt.setupStore(ctrl)
}

controller := newTestController(t, ctrl, mockStorage)
controller, _ := newTestController(t, ctrl, mockStorage, tt.expectLogPublish)

msg := entityqueue.NewMessage(tt.batch.ID, batchIDPayload(t, tt.batch.ID), tt.batch.Queue, nil)
delivery := queuemock.NewMockDelivery(ctrl)
Expand Down Expand Up @@ -324,7 +409,7 @@ func TestController_Process_StorageFailure(t *testing.T) {
mockStorage := storagemock.NewMockStorage(ctrl)
mockStorage.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes()

controller := newTestController(t, ctrl, mockStorage)
controller, _ := newTestController(t, ctrl, mockStorage, false)

msg := entityqueue.NewMessage("test-queue/batch/1", batchIDPayload(t, "test-queue/batch/1"), "test-queue", nil)
delivery := queuemock.NewMockDelivery(ctrl)
Expand All @@ -338,7 +423,7 @@ func TestController_Process_StorageFailure(t *testing.T) {

func TestController_InterfaceImplementation(t *testing.T) {
ctrl := gomock.NewController(t)
controller := newTestController(t, ctrl, nil)
controller, _ := newTestController(t, ctrl, nil, false)

var _ consumer.Controller = controller
}
Loading