Skip to content

Commit

Permalink
Fix: failWorkflowTask returns wrong nextEventBatchID when failing spe…
Browse files Browse the repository at this point in the history
…culative workflow tasks (#4354)
  • Loading branch information
alexshtin authored May 18, 2023
1 parent 3bfbb81 commit 9a6edae
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 18 deletions.
38 changes: 22 additions & 16 deletions service/history/historyEngine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,18 +89,19 @@ type (
suite.Suite
*require.Assertions

controller *gomock.Controller
mockShard *shard.ContextTest
mockTxProcessor *queues.MockQueue
mockTimerProcessor *queues.MockQueue
mockVisibilityProcessor *queues.MockQueue
mockArchivalProcessor *queues.MockQueue
mockNamespaceCache *namespace.MockRegistry
mockMatchingClient *matchingservicemock.MockMatchingServiceClient
mockHistoryClient *historyservicemock.MockHistoryServiceClient
mockClusterMetadata *cluster.MockMetadata
mockEventsReapplier *ndc.MockEventsReapplier
mockWorkflowResetter *ndc.MockWorkflowResetter
controller *gomock.Controller
mockShard *shard.ContextTest
mockTxProcessor *queues.MockQueue
mockTimerProcessor *queues.MockQueue
mockVisibilityProcessor *queues.MockQueue
mockArchivalProcessor *queues.MockQueue
mockMemoryScheduledQueue *queues.MockQueue
mockNamespaceCache *namespace.MockRegistry
mockMatchingClient *matchingservicemock.MockMatchingServiceClient
mockHistoryClient *historyservicemock.MockHistoryServiceClient
mockClusterMetadata *cluster.MockMetadata
mockEventsReapplier *ndc.MockEventsReapplier
mockWorkflowResetter *ndc.MockWorkflowResetter

workflowCache wcache.Cache
mockHistoryEngine *historyEngineImpl
Expand Down Expand Up @@ -134,14 +135,18 @@ func (s *engineSuite) SetupTest() {
s.mockTimerProcessor = queues.NewMockQueue(s.controller)
s.mockVisibilityProcessor = queues.NewMockQueue(s.controller)
s.mockArchivalProcessor = queues.NewMockQueue(s.controller)
s.mockMemoryScheduledQueue = queues.NewMockQueue(s.controller)
s.mockTxProcessor.EXPECT().Category().Return(tasks.CategoryTransfer).AnyTimes()
s.mockTimerProcessor.EXPECT().Category().Return(tasks.CategoryTimer).AnyTimes()
s.mockVisibilityProcessor.EXPECT().Category().Return(tasks.CategoryVisibility).AnyTimes()
s.mockArchivalProcessor.EXPECT().Category().Return(tasks.CategoryArchival).AnyTimes()
s.mockMemoryScheduledQueue.EXPECT().Category().Return(tasks.CategoryMemoryTimer).AnyTimes()
s.mockTxProcessor.EXPECT().NotifyNewTasks(gomock.Any()).AnyTimes()
s.mockTimerProcessor.EXPECT().NotifyNewTasks(gomock.Any()).AnyTimes()
s.mockVisibilityProcessor.EXPECT().NotifyNewTasks(gomock.Any()).AnyTimes()
s.mockArchivalProcessor.EXPECT().NotifyNewTasks(gomock.Any()).AnyTimes()
s.mockMemoryScheduledQueue.EXPECT().NotifyNewTasks(gomock.Any()).AnyTimes()

s.config = tests.NewDynamicConfig()
s.mockShard = shard.NewTestContext(
s.controller,
Expand Down Expand Up @@ -200,10 +205,11 @@ func (s *engineSuite) SetupTest() {
eventNotifier: eventNotifier,
config: s.config,
queueProcessors: map[tasks.Category]queues.Queue{
s.mockTxProcessor.Category(): s.mockTxProcessor,
s.mockTimerProcessor.Category(): s.mockTimerProcessor,
s.mockVisibilityProcessor.Category(): s.mockVisibilityProcessor,
s.mockArchivalProcessor.Category(): s.mockArchivalProcessor,
s.mockTxProcessor.Category(): s.mockTxProcessor,
s.mockTimerProcessor.Category(): s.mockTimerProcessor,
s.mockVisibilityProcessor.Category(): s.mockVisibilityProcessor,
s.mockArchivalProcessor.Category(): s.mockArchivalProcessor,
s.mockMemoryScheduledQueue.Category(): s.mockMemoryScheduledQueue,
},
eventsReapplier: s.mockEventsReapplier,
workflowResetter: s.mockWorkflowResetter,
Expand Down
4 changes: 2 additions & 2 deletions service/history/workflowTaskHandlerCallbacks.go
Original file line number Diff line number Diff line change
Expand Up @@ -980,7 +980,6 @@ func failWorkflowTask(
if err != nil {
return nil, common.EmptyEventID, err
}
nextEventBatchId := mutableState.GetNextEventID()
if _, err = mutableState.AddWorkflowTaskFailedEvent(
workflowTask,
wtFailedCause.failedCause,
Expand All @@ -990,9 +989,10 @@ func failWorkflowTask(
"",
"",
0); err != nil {
return nil, nextEventBatchId, err
return nil, common.EmptyEventID, err
}

nextEventBatchId := mutableState.GetNextEventID() - 1
// Return new mutable state back to the caller for further updates
return mutableState, nextEventBatchId, nil
}
Expand Down

0 comments on commit 9a6edae

Please sign in to comment.