Skip to content

Commit

Permalink
Fix reset workflow in replication reapply (#3449)
Browse files Browse the repository at this point in the history
* Fix reset workflow in replication reapply
  • Loading branch information
yux0 authored and dnr committed Oct 10, 2022
1 parent f84be21 commit 082713b
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 20 deletions.
26 changes: 6 additions & 20 deletions service/history/nDCTransactionMgr.go
Expand Up @@ -30,10 +30,6 @@ import (
"context"
"time"

"go.temporal.io/server/common"

"go.temporal.io/server/common/persistence/serialization"

"github.com/pborman/uuid"
commonpb "go.temporal.io/api/common/v1"
enumspb "go.temporal.io/api/enums/v1"
Expand All @@ -45,6 +41,7 @@ import (
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/persistence"
"go.temporal.io/server/common/persistence/serialization"
"go.temporal.io/server/common/persistence/versionhistory"
"go.temporal.io/server/service/history/shard"
"go.temporal.io/server/service/history/workflow"
Expand Down Expand Up @@ -323,18 +320,6 @@ func (r *nDCTransactionMgrImpl) backfillWorkflowEventsReapply(
baseRunID := baseMutableState.GetExecutionState().GetRunId()
resetRunID := uuid.New()
baseRebuildLastEventID := baseMutableState.GetPreviousStartedEventID()

// TODO when https://github.com/uber/cadence/issues/2420 is finished, remove this block,
// since cannot reapply event to a finished workflow which had no workflow task started
if baseRebuildLastEventID == common.EmptyEventID {
r.logger.Warn("cannot reapply event to a finished workflow",
tag.WorkflowNamespaceID(namespaceID.String()),
tag.WorkflowID(workflowID),
)
r.metricsClient.IncCounter(metrics.HistoryReapplyEventsScope, metrics.EventReapplySkippedCount)
return persistence.UpdateWorkflowModeBypassCurrent, workflow.TransactionPolicyPassive, nil
}

baseVersionHistories := baseMutableState.GetExecutionInfo().GetVersionHistories()
baseCurrentVersionHistory, err := versionhistory.GetCurrentVersionHistory(baseVersionHistories)
if err != nil {
Expand Down Expand Up @@ -367,14 +352,15 @@ func (r *nDCTransactionMgrImpl) backfillWorkflowEventsReapply(
case *serviceerror.InvalidArgument:
// no-op. Usually this is due to reset workflow with pending child workflows
r.logger.Warn("Cannot reset workflow. Ignoring reapply events.", tag.Error(err))
// the target workflow is not reset so it is still the current workflow. It need to persist updated version histories.
return persistence.UpdateWorkflowModeUpdateCurrent, workflow.TransactionPolicyPassive, nil
case nil:
// no-op
// after the reset of target workflow (current workflow) with additional events to be reapplied
// target workflow is no longer the current workflow
return persistence.UpdateWorkflowModeBypassCurrent, workflow.TransactionPolicyPassive, nil
default:
return 0, workflow.TransactionPolicyActive, err
}
// after the reset of target workflow (current workflow) with additional events to be reapplied
// target workflow is no longer the current workflow
return persistence.UpdateWorkflowModeBypassCurrent, workflow.TransactionPolicyPassive, nil
}

// case 2
Expand Down
78 changes: 78 additions & 0 deletions service/history/nDCTransactionMgr_test.go
Expand Up @@ -255,6 +255,84 @@ func (s *nDCTransactionMgrSuite) TestBackfillWorkflow_CurrentWorkflow_Active_Clo
s.True(releaseCalled)
}

func (s *nDCTransactionMgrSuite) TestBackfillWorkflow_CurrentWorkflow_Closed_ResetFailed() {
ctx := context.Background()
now := time.Now().UTC()

namespaceID := namespace.ID("some random namespace ID")
workflowID := "some random workflow ID"
runID := "some random run ID"
lastWorkflowTaskStartedEventID := int64(9999)
nextEventID := lastWorkflowTaskStartedEventID * 2
lastWorkflowTaskStartedVersion := s.namespaceEntry.FailoverVersion()
versionHistory := versionhistory.NewVersionHistory([]byte("branch token"), []*historyspb.VersionHistoryItem{
{EventId: lastWorkflowTaskStartedEventID, Version: lastWorkflowTaskStartedVersion},
})
histories := versionhistory.NewVersionHistories(versionHistory)

releaseCalled := false

targetWorkflow := NewMocknDCWorkflow(s.controller)
weContext := workflow.NewMockContext(s.controller)
mutableState := workflow.NewMockMutableState(s.controller)
var releaseFn workflow.ReleaseCacheFunc = func(error) { releaseCalled = true }

workflowEvents := &persistence.WorkflowEvents{}

targetWorkflow.EXPECT().getContext().Return(weContext).AnyTimes()
targetWorkflow.EXPECT().getMutableState().Return(mutableState).AnyTimes()
targetWorkflow.EXPECT().getReleaseFn().Return(releaseFn).AnyTimes()

s.mockClusterMetadata.EXPECT().ClusterNameForFailoverVersion(s.namespaceEntry.IsGlobalNamespace(), s.namespaceEntry.FailoverVersion()).Return(cluster.TestCurrentClusterName).AnyTimes()
s.mockClusterMetadata.EXPECT().GetCurrentClusterName().Return(cluster.TestCurrentClusterName).AnyTimes()

mutableState.EXPECT().IsCurrentWorkflowGuaranteed().Return(false).AnyTimes()
mutableState.EXPECT().IsWorkflowExecutionRunning().Return(false).AnyTimes()
mutableState.EXPECT().GetNamespaceEntry().Return(s.namespaceEntry).AnyTimes()
mutableState.EXPECT().GetExecutionInfo().Return(&persistencespb.WorkflowExecutionInfo{
NamespaceId: namespaceID.String(),
WorkflowId: workflowID,
VersionHistories: histories,
}).AnyTimes()
mutableState.EXPECT().GetExecutionState().Return(&persistencespb.WorkflowExecutionState{
RunId: runID,
}).AnyTimes()
mutableState.EXPECT().GetNextEventID().Return(nextEventID).AnyTimes()
mutableState.EXPECT().GetPreviousStartedEventID().Return(lastWorkflowTaskStartedEventID)

s.mockWorkflowResetter.EXPECT().resetWorkflow(
ctx,
namespaceID,
workflowID,
runID,
versionHistory.GetBranchToken(),
lastWorkflowTaskStartedEventID,
lastWorkflowTaskStartedVersion,
nextEventID,
gomock.Any(),
gomock.Any(),
targetWorkflow,
eventsReapplicationResetWorkflowReason,
workflowEvents.Events,
enumspb.RESET_REAPPLY_TYPE_SIGNAL,
).Return(serviceerror.NewInvalidArgument("reset fail"))

s.mockExecutionMgr.EXPECT().GetCurrentExecution(gomock.Any(), &persistence.GetCurrentExecutionRequest{
ShardID: s.mockShard.GetShardID(),
NamespaceID: namespaceID.String(),
WorkflowID: workflowID,
}).Return(&persistence.GetCurrentExecutionResponse{RunID: runID}, nil)

weContext.EXPECT().PersistWorkflowEvents(gomock.Any(), workflowEvents).Return(int64(0), nil)
weContext.EXPECT().UpdateWorkflowExecutionWithNew(
gomock.Any(), now, persistence.UpdateWorkflowModeUpdateCurrent, nil, nil, workflow.TransactionPolicyPassive, (*workflow.TransactionPolicy)(nil),
).Return(nil)

err := s.transactionMgr.backfillWorkflow(ctx, now, targetWorkflow, workflowEvents)
s.NoError(err)
s.True(releaseCalled)
}

func (s *nDCTransactionMgrSuite) TestBackfillWorkflow_CurrentWorkflow_Passive_Open() {
ctx := context.Background()
now := time.Now().UTC()
Expand Down

0 comments on commit 082713b

Please sign in to comment.