From 082713b14af5b980d1ffaeba018c6ef330b34503 Mon Sep 17 00:00:00 2001 From: Yu Xia Date: Fri, 30 Sep 2022 17:16:58 -0700 Subject: [PATCH] Fix reset workflow in replication reapply (#3449) * Fix reset workflow in replication reapply --- service/history/nDCTransactionMgr.go | 26 ++------ service/history/nDCTransactionMgr_test.go | 78 +++++++++++++++++++++++ 2 files changed, 84 insertions(+), 20 deletions(-) diff --git a/service/history/nDCTransactionMgr.go b/service/history/nDCTransactionMgr.go index b84bcd4e0d3..94107e91788 100644 --- a/service/history/nDCTransactionMgr.go +++ b/service/history/nDCTransactionMgr.go @@ -30,10 +30,6 @@ import ( "context" "time" - "go.temporal.io/server/common" - - "go.temporal.io/server/common/persistence/serialization" - "github.com/pborman/uuid" commonpb "go.temporal.io/api/common/v1" enumspb "go.temporal.io/api/enums/v1" @@ -45,6 +41,7 @@ import ( "go.temporal.io/server/common/metrics" "go.temporal.io/server/common/namespace" "go.temporal.io/server/common/persistence" + "go.temporal.io/server/common/persistence/serialization" "go.temporal.io/server/common/persistence/versionhistory" "go.temporal.io/server/service/history/shard" "go.temporal.io/server/service/history/workflow" @@ -323,18 +320,6 @@ func (r *nDCTransactionMgrImpl) backfillWorkflowEventsReapply( baseRunID := baseMutableState.GetExecutionState().GetRunId() resetRunID := uuid.New() baseRebuildLastEventID := baseMutableState.GetPreviousStartedEventID() - - // TODO when https://github.com/uber/cadence/issues/2420 is finished, remove this block, - // since cannot reapply event to a finished workflow which had no workflow task started - if baseRebuildLastEventID == common.EmptyEventID { - r.logger.Warn("cannot reapply event to a finished workflow", - tag.WorkflowNamespaceID(namespaceID.String()), - tag.WorkflowID(workflowID), - ) - r.metricsClient.IncCounter(metrics.HistoryReapplyEventsScope, metrics.EventReapplySkippedCount) - return persistence.UpdateWorkflowModeBypassCurrent, workflow.TransactionPolicyPassive, nil - } - baseVersionHistories := baseMutableState.GetExecutionInfo().GetVersionHistories() baseCurrentVersionHistory, err := versionhistory.GetCurrentVersionHistory(baseVersionHistories) if err != nil { @@ -367,14 +352,15 @@ func (r *nDCTransactionMgrImpl) backfillWorkflowEventsReapply( case *serviceerror.InvalidArgument: // no-op. Usually this is due to reset workflow with pending child workflows r.logger.Warn("Cannot reset workflow. Ignoring reapply events.", tag.Error(err)) + // the target workflow is not reset so it is still the current workflow. It need to persist updated version histories. + return persistence.UpdateWorkflowModeUpdateCurrent, workflow.TransactionPolicyPassive, nil case nil: - // no-op + // after the reset of target workflow (current workflow) with additional events to be reapplied + // target workflow is no longer the current workflow + return persistence.UpdateWorkflowModeBypassCurrent, workflow.TransactionPolicyPassive, nil default: return 0, workflow.TransactionPolicyActive, err } - // after the reset of target workflow (current workflow) with additional events to be reapplied - // target workflow is no longer the current workflow - return persistence.UpdateWorkflowModeBypassCurrent, workflow.TransactionPolicyPassive, nil } // case 2 diff --git a/service/history/nDCTransactionMgr_test.go b/service/history/nDCTransactionMgr_test.go index 63b3f9c2023..98a3e13f56a 100644 --- a/service/history/nDCTransactionMgr_test.go +++ b/service/history/nDCTransactionMgr_test.go @@ -255,6 +255,84 @@ func (s *nDCTransactionMgrSuite) TestBackfillWorkflow_CurrentWorkflow_Active_Clo s.True(releaseCalled) } +func (s *nDCTransactionMgrSuite) TestBackfillWorkflow_CurrentWorkflow_Closed_ResetFailed() { + ctx := context.Background() + now := time.Now().UTC() + + namespaceID := namespace.ID("some random namespace ID") + workflowID := "some random workflow ID" + runID := "some random run ID" + lastWorkflowTaskStartedEventID := int64(9999) + nextEventID := lastWorkflowTaskStartedEventID * 2 + lastWorkflowTaskStartedVersion := s.namespaceEntry.FailoverVersion() + versionHistory := versionhistory.NewVersionHistory([]byte("branch token"), []*historyspb.VersionHistoryItem{ + {EventId: lastWorkflowTaskStartedEventID, Version: lastWorkflowTaskStartedVersion}, + }) + histories := versionhistory.NewVersionHistories(versionHistory) + + releaseCalled := false + + targetWorkflow := NewMocknDCWorkflow(s.controller) + weContext := workflow.NewMockContext(s.controller) + mutableState := workflow.NewMockMutableState(s.controller) + var releaseFn workflow.ReleaseCacheFunc = func(error) { releaseCalled = true } + + workflowEvents := &persistence.WorkflowEvents{} + + targetWorkflow.EXPECT().getContext().Return(weContext).AnyTimes() + targetWorkflow.EXPECT().getMutableState().Return(mutableState).AnyTimes() + targetWorkflow.EXPECT().getReleaseFn().Return(releaseFn).AnyTimes() + + s.mockClusterMetadata.EXPECT().ClusterNameForFailoverVersion(s.namespaceEntry.IsGlobalNamespace(), s.namespaceEntry.FailoverVersion()).Return(cluster.TestCurrentClusterName).AnyTimes() + s.mockClusterMetadata.EXPECT().GetCurrentClusterName().Return(cluster.TestCurrentClusterName).AnyTimes() + + mutableState.EXPECT().IsCurrentWorkflowGuaranteed().Return(false).AnyTimes() + mutableState.EXPECT().IsWorkflowExecutionRunning().Return(false).AnyTimes() + mutableState.EXPECT().GetNamespaceEntry().Return(s.namespaceEntry).AnyTimes() + mutableState.EXPECT().GetExecutionInfo().Return(&persistencespb.WorkflowExecutionInfo{ + NamespaceId: namespaceID.String(), + WorkflowId: workflowID, + VersionHistories: histories, + }).AnyTimes() + mutableState.EXPECT().GetExecutionState().Return(&persistencespb.WorkflowExecutionState{ + RunId: runID, + }).AnyTimes() + mutableState.EXPECT().GetNextEventID().Return(nextEventID).AnyTimes() + mutableState.EXPECT().GetPreviousStartedEventID().Return(lastWorkflowTaskStartedEventID) + + s.mockWorkflowResetter.EXPECT().resetWorkflow( + ctx, + namespaceID, + workflowID, + runID, + versionHistory.GetBranchToken(), + lastWorkflowTaskStartedEventID, + lastWorkflowTaskStartedVersion, + nextEventID, + gomock.Any(), + gomock.Any(), + targetWorkflow, + eventsReapplicationResetWorkflowReason, + workflowEvents.Events, + enumspb.RESET_REAPPLY_TYPE_SIGNAL, + ).Return(serviceerror.NewInvalidArgument("reset fail")) + + s.mockExecutionMgr.EXPECT().GetCurrentExecution(gomock.Any(), &persistence.GetCurrentExecutionRequest{ + ShardID: s.mockShard.GetShardID(), + NamespaceID: namespaceID.String(), + WorkflowID: workflowID, + }).Return(&persistence.GetCurrentExecutionResponse{RunID: runID}, nil) + + weContext.EXPECT().PersistWorkflowEvents(gomock.Any(), workflowEvents).Return(int64(0), nil) + weContext.EXPECT().UpdateWorkflowExecutionWithNew( + gomock.Any(), now, persistence.UpdateWorkflowModeUpdateCurrent, nil, nil, workflow.TransactionPolicyPassive, (*workflow.TransactionPolicy)(nil), + ).Return(nil) + + err := s.transactionMgr.backfillWorkflow(ctx, now, targetWorkflow, workflowEvents) + s.NoError(err) + s.True(releaseCalled) +} + func (s *nDCTransactionMgrSuite) TestBackfillWorkflow_CurrentWorkflow_Passive_Open() { ctx := context.Background() now := time.Now().UTC()