Skip to content

Commit

Permalink
Fix sanitize mutable state after replication (#3479)
Browse files Browse the repository at this point in the history
* Fix sanitize mutable state after replication
  • Loading branch information
yux0 authored and dnr committed Oct 17, 2022
1 parent 45bd55d commit bb7b1f4
Show file tree
Hide file tree
Showing 6 changed files with 64 additions and 13 deletions.
2 changes: 2 additions & 0 deletions common/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ const (
LastBlobNextPageToken = -1
// EndMessageID is the id of the end message, here we use the int64 max
EndMessageID int64 = 1<<63 - 1
// EmptyTaskID is the id of the empty task
EmptyTaskID int64 = 0
)

const (
Expand Down
41 changes: 41 additions & 0 deletions host/xdc/integration_failover_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
commandpb "go.temporal.io/api/command/v1"
commonpb "go.temporal.io/api/common/v1"
enumspb "go.temporal.io/api/enums/v1"
filterpb "go.temporal.io/api/filter/v1"
historypb "go.temporal.io/api/history/v1"
querypb "go.temporal.io/api/query/v1"
replicationpb "go.temporal.io/api/replication/v1"
Expand Down Expand Up @@ -2175,6 +2176,21 @@ func (s *integrationClustersTestSuite) TestLocalNamespaceMigration() {
s.Equal(1, len(nsResp.ReplicationConfig.Clusters))
time.Sleep(cacheRefreshInterval)

// Start wf1 (in local ns)
workflowID8 := "global-ns-wf-1"
run8, err := client1.ExecuteWorkflow(testCtx, sdkclient.StartWorkflowOptions{
ID: workflowID8,
TaskQueue: taskqueue,
WorkflowRunTimeout: time.Second * 30,
}, testWorkflowFn, time.Millisecond*10)

s.NoError(err)
s.NotEmpty(run8.GetRunID())
s.logger.Info("start wf8", tag.WorkflowRunID(run8.GetRunID()))
// wait until wf1 complete
err = run8.Get(testCtx, nil)
s.NoError(err)

// this will buffer after ns promotion
err = client1.SignalWorkflow(testCtx, workflowID7, run7.GetRunID(), "signal-name", "signal-value")
s.NoError(err)
Expand Down Expand Up @@ -2284,11 +2300,36 @@ func (s *integrationClustersTestSuite) TestLocalNamespaceMigration() {
Namespace: namespace,
})
s.NoError(err)
feClient2 := s.cluster2.GetFrontendClient()
verify := func(wfID string, expectedRunID string) {
desc1, err := client2.DescribeWorkflowExecution(testCtx, wfID, "")
s.NoError(err)
s.Equal(expectedRunID, desc1.WorkflowExecutionInfo.Execution.RunId)
s.Equal(enumspb.WORKFLOW_EXECUTION_STATUS_COMPLETED, desc1.WorkflowExecutionInfo.Status)
resp, err := feClient2.GetWorkflowExecutionHistoryReverse(testCtx, &workflowservice.GetWorkflowExecutionHistoryReverseRequest{
Namespace: namespace,
Execution: &commonpb.WorkflowExecution{
WorkflowId: wfID,
RunId: expectedRunID,
},
MaximumPageSize: 1,
NextPageToken: nil,
})
s.NoError(err)
s.True(len(resp.GetHistory().GetEvents()) > 0)
listWorkflowResp, err := feClient2.ListClosedWorkflowExecutions(
testCtx,
&workflowservice.ListClosedWorkflowExecutionsRequest{
Namespace: namespace,
MaximumPageSize: 1000,
Filters: &workflowservice.ListClosedWorkflowExecutionsRequest_ExecutionFilter{
ExecutionFilter: &filterpb.WorkflowExecutionFilter{
WorkflowId: wfID,
}},
},
)
s.NoError(err)
s.True(len(listWorkflowResp.GetExecutions()) > 0)
}
verify(workflowID, run1.GetRunID())
verify(workflowID2, run2.GetRunID())
Expand Down
24 changes: 13 additions & 11 deletions service/history/nDCHistoryReplicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ func (r *nDCHistoryReplicatorImpl) ApplyWorkflowState(
return err
}

lastEventTime, err := r.backfillHistory(
lastEventTime, lastFirstTxnID, err := r.backfillHistory(
ctx,
request.GetRemoteCluster(),
namespaceID,
Expand All @@ -293,6 +293,8 @@ func (r *nDCHistoryReplicatorImpl) ApplyWorkflowState(
r.logger,
ns,
request.GetWorkflowState(),
lastFirstTxnID,
lastEventItem.GetVersion(),
)
if err != nil {
return err
Expand Down Expand Up @@ -836,7 +838,7 @@ func (r *nDCHistoryReplicatorImpl) backfillHistory(
lastEventID int64,
lastEventVersion int64,
branchToken []byte,
) (*time.Time, error) {
) (*time.Time, int64, error) {

// Get the last batch node id to check if the history data is already in DB.
localHistoryIterator := collection.NewPagingIterator(r.getHistoryFromLocalPaginationFn(
Expand All @@ -854,7 +856,7 @@ func (r *nDCHistoryReplicatorImpl) backfillHistory(
}
case *serviceerror.NotFound:
default:
return nil, err
return nil, common.EmptyTaskID, err
}
}

Expand All @@ -874,11 +876,11 @@ func (r *nDCHistoryReplicatorImpl) backfillHistory(
},
)
if err != nil {
return nil, err
return nil, common.EmptyTaskID, err
}
historyBranch := resp.BranchInfo

prevTxnID := common.EmptyVersion
prevTxnID := common.EmptyTaskID
var lastHistoryBatch *commonpb.DataBlob
var prevBranchID string
sortedAncestors := sortAncestors(historyBranch.GetAncestors())
Expand All @@ -889,7 +891,7 @@ BackfillLoop:
for remoteHistoryIterator.HasNext() {
historyBlob, err := remoteHistoryIterator.Next()
if err != nil {
return nil, err
return nil, common.EmptyTaskID, err
}

if historyBlob.nodeID <= lastBatchNodeID {
Expand All @@ -910,7 +912,7 @@ BackfillLoop:
currentAncestor = sortedAncestors[sortedAncestorsIdx]
branchID = currentAncestor.GetBranchId()
if historyBlob.nodeID < currentAncestor.GetBeginNodeId() || historyBlob.nodeID >= currentAncestor.GetEndNodeId() {
return nil, serviceerror.NewInternal(
return nil, common.EmptyTaskID, serviceerror.NewInternal(
fmt.Sprintf("The backfill history blob node id %d is not in acestoer range [%d, %d]",
historyBlob.nodeID,
currentAncestor.GetBeginNodeId(),
Expand All @@ -931,11 +933,11 @@ BackfillLoop:
},
})
if err != nil {
return nil, err
return nil, common.EmptyTaskID, err
}
txnID, err := r.shard.GenerateTaskID()
if err != nil {
return nil, err
return nil, common.EmptyTaskID, err
}
_, err = r.executionMgr.AppendRawHistoryNodes(ctx, &persistence.AppendRawHistoryNodesRequest{
ShardID: r.shard.GetShardID(),
Expand All @@ -952,7 +954,7 @@ BackfillLoop:
),
})
if err != nil {
return nil, err
return nil, common.EmptyTaskID, err
}
prevTxnID = txnID
prevBranchID = branchID
Expand All @@ -964,7 +966,7 @@ BackfillLoop:
if len(events) > 0 {
lastEventTime = events[len(events)-1].EventTime
}
return lastEventTime, nil
return lastEventTime, prevTxnID, nil
}

func sortAncestors(ans []*persistencespb.HistoryBranchRange) []*persistencespb.HistoryBranchRange {
Expand Down
1 change: 1 addition & 0 deletions service/history/shard/context_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -702,6 +702,7 @@ func (s *ContextImpl) CreateWorkflowExecution(
); err != nil {
return nil, err
}
s.updateCloseTaskIDs(request.NewWorkflowSnapshot.ExecutionInfo, request.NewWorkflowSnapshot.Tasks)

currentRangeID := s.getRangeIDLocked()
request.RangeID = currentRangeID
Expand Down
7 changes: 6 additions & 1 deletion service/history/workflow/mutable_state_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,8 @@ func NewSanitizedMutableState(
logger log.Logger,
namespaceEntry *namespace.Namespace,
mutableStateRecord *persistencespb.WorkflowMutableState,
lastFirstEventTxnID int64,
lastWriteVersion int64,
) (*MutableStateImpl, error) {

mutableState, err := newMutableStateFromDB(shard, eventsCache, logger, namespaceEntry, mutableStateRecord, 1)
Expand All @@ -365,12 +367,15 @@ func NewSanitizedMutableState(
}

// sanitize data
mutableState.executionInfo.LastFirstEventTxnId = common.EmptyVersion
mutableState.executionInfo.LastFirstEventTxnId = lastFirstEventTxnID
mutableState.executionInfo.CloseVisibilityTaskId = common.EmptyVersion
mutableState.executionInfo.CloseTransferTaskId = common.EmptyVersion
// TODO: after adding cluster to clock info, no need to reset clock here
mutableState.executionInfo.ParentClock = nil
for _, childExecutionInfo := range mutableState.pendingChildExecutionInfoIDs {
childExecutionInfo.Clock = nil
}
mutableState.currentVersion = lastWriteVersion
return mutableState, nil
}

Expand Down
2 changes: 1 addition & 1 deletion service/history/workflow/mutable_state_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,7 @@ func (s *mutableStateSuite) TestSanitizedMutableState() {
}}

mutableStateProto := mutableState.CloneToProto()
sanitizedMutableState, err := NewSanitizedMutableState(s.mockShard, s.mockEventsCache, s.logger, tests.LocalNamespaceEntry, mutableStateProto)
sanitizedMutableState, err := NewSanitizedMutableState(s.mockShard, s.mockEventsCache, s.logger, tests.LocalNamespaceEntry, mutableStateProto, 0, 0)
s.NoError(err)
s.Equal(int64(0), sanitizedMutableState.executionInfo.LastFirstEventTxnId)
s.Nil(sanitizedMutableState.executionInfo.ParentClock)
Expand Down

0 comments on commit bb7b1f4

Please sign in to comment.