Add XDC integration test for ResetWorkflow API (#1469)
* Add cross DC integration test for ResetWorkflow API
* Rewrite verbose code logic
wxing1292 committed Apr 16, 2021
1 parent bd21495 commit 10822d6
Showing 2 changed files with 176 additions and 18 deletions.
171 changes: 171 additions & 0 deletions host/xdc/integration_failover_test.go
@@ -1019,6 +1019,177 @@ GetHistoryLoop2:
s.True(eventsReplicated)
}

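// testResetWorkflowRangeScheduleToStart registers a global namespace active in
// cluster1, starts and signals a workflow, resets it, fails the namespace over
// to cluster2, completes the reset run there, and verifies the completed
// history is visible from both clusters.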
func (s *integrationClustersTestSuite) testResetWorkflowRangeScheduleToStart() {
namespace := "test-reset-workflow-failover-" + common.GenerateRandomString(5)
client1 := s.cluster1.GetFrontendClient() // active
regReq := &workflowservice.RegisterNamespaceRequest{
Namespace: namespace,
IsGlobalNamespace: true,
Clusters: clusterReplicationConfig,
ActiveClusterName: clusterName[0],
WorkflowExecutionRetentionPeriod: timestamp.DurationPtr(1 * time.Hour * 24),
}
_, err := client1.RegisterNamespace(host.NewContext(), regReq)
s.NoError(err)

descReq := &workflowservice.DescribeNamespaceRequest{
Namespace: namespace,
}
resp, err := client1.DescribeNamespace(host.NewContext(), descReq)
s.NoError(err)
s.NotNil(resp)
// Wait for the namespace cache to pick up the change
time.Sleep(cacheRefreshInterval)

client2 := s.cluster2.GetFrontendClient() // standby

// start a workflow
id := "integration-reset-workflow-failover-test"
wt := "integration-reset-workflow-failover-test-type"
tl := "integration-reset-workflow-failover-test-taskqueue"
identity := "worker1"
workflowType := &commonpb.WorkflowType{Name: wt}
taskQueue := &taskqueuepb.TaskQueue{Name: tl}
startReq := &workflowservice.StartWorkflowExecutionRequest{
RequestId: uuid.New(),
Namespace: namespace,
WorkflowId: id,
WorkflowType: workflowType,
TaskQueue: taskQueue,
Input: nil,
WorkflowRunTimeout: timestamp.DurationPtr(100 * time.Second),
WorkflowTaskTimeout: timestamp.DurationPtr(1 * time.Second),
Identity: identity,
}
we, err := client1.StartWorkflowExecution(host.NewContext(), startReq)
s.NoError(err)
s.NotNil(we.GetRunId())

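// signal the workflow so the pre-reset history contains a
// WorkflowExecutionSignaled event (event 3 in the layout below)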
_, err = client1.SignalWorkflowExecution(host.NewContext(), &workflowservice.SignalWorkflowExecutionRequest{
Namespace: namespace,
WorkflowExecution: &commonpb.WorkflowExecution{
WorkflowId: id,
RunId: we.RunId,
},
SignalName: "random signal name",
Input: &commonpb.Payloads{Payloads: []*commonpb.Payload{
{Data: []byte("random signal payload")},
}},
Identity: identity,
})
s.NoError(err)

// workflow logic
workflowComplete := false
isWorkflowTaskProcessed := false
wtHandler := func(execution *commonpb.WorkflowExecution, wt *commonpb.WorkflowType,
previousStartedEventID, startedEventID int64, history *historypb.History) ([]*commandpb.Command, error) {

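// first workflow task: mark it processed and return no commands,
// leaving the workflow open so the test can reset it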
if !isWorkflowTaskProcessed {
isWorkflowTaskProcessed = true
return []*commandpb.Command{}, nil
}

// Complete workflow after reset
workflowComplete = true
return []*commandpb.Command{{
CommandType: enumspb.COMMAND_TYPE_COMPLETE_WORKFLOW_EXECUTION,
Attributes: &commandpb.Command_CompleteWorkflowExecutionCommandAttributes{
CompleteWorkflowExecutionCommandAttributes: &commandpb.CompleteWorkflowExecutionCommandAttributes{
Result: payloads.EncodeString("Done"),
}},
}}, nil
}

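// poller processes workflow tasks via the cluster1 frontend (currently active)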
poller := host.TaskPoller{
Engine: client1,
Namespace: namespace,
TaskQueue: taskQueue,
Identity: identity,
WorkflowTaskHandler: wtHandler,
ActivityTaskHandler: nil,
Logger: s.logger,
T: s.T(),
}

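// poller2 targets the cluster2 frontend; it only receives tasks once the
// namespace fails over below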
poller2 := host.TaskPoller{
Engine: client2,
Namespace: namespace,
TaskQueue: taskQueue,
Identity: identity,
WorkflowTaskHandler: wtHandler,
ActivityTaskHandler: nil,
Logger: s.logger,
T: s.T(),
}

_, err = poller.PollAndProcessWorkflowTask(false, false)
s.logger.Info("PollAndProcessWorkflowTask", tag.Error(err))
s.NoError(err)

// events layout
// 1. WorkflowExecutionStarted
// 2. WorkflowTaskScheduled
// 3. WorkflowExecutionSignaled
// 4. WorkflowTaskStarted
// 5. WorkflowTaskCompleted

// Reset the workflow execution to just before WorkflowTaskStarted (event 4 above); the reset creates a new run
resetResp, err := client1.ResetWorkflowExecution(host.NewContext(), &workflowservice.ResetWorkflowExecutionRequest{
Namespace: namespace,
WorkflowExecution: &commonpb.WorkflowExecution{
WorkflowId: id,
RunId: we.RunId,
},
Reason: "reset execution from test",
WorkflowTaskFinishEventId: 4, // before WorkflowTaskStarted
RequestId: uuid.New(),
})
s.NoError(err)

// update the namespace replication config to fail over to the other cluster
updateReq := &workflowservice.UpdateNamespaceRequest{
Namespace: namespace,
ReplicationConfig: &replicationpb.NamespaceReplicationConfig{
ActiveClusterName: clusterName[1],
},
}
updateResp, err := client1.UpdateNamespace(host.NewContext(), updateReq)
s.NoError(err)
s.NotNil(updateResp)
s.Equal(clusterName[1], updateResp.ReplicationConfig.GetActiveClusterName())
s.Equal(int64(2), updateResp.GetFailoverVersion())

// wait until the failover has completed
time.Sleep(cacheRefreshInterval)

_, err = poller2.PollAndProcessWorkflowTask(false, false)
s.logger.Info("PollAndProcessWorkflowTask", tag.Error(err))
s.NoError(err)
s.True(workflowComplete)

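// allow the completed run's history to replicate back to cluster1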
time.Sleep(cacheRefreshInterval)

getHistoryReq := &workflowservice.GetWorkflowExecutionHistoryRequest{
Namespace: namespace,
Execution: &commonpb.WorkflowExecution{
WorkflowId: id,
RunId: resetResp.RunId,
},
}

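// the last event of the reset run should be WorkflowExecutionCompleted on both clusters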
getHistoryResp, err := client1.GetWorkflowExecutionHistory(host.NewContext(), getHistoryReq)
s.NoError(err)
events := getHistoryResp.History.Events
s.Equal(enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED, events[len(events)-1].GetEventType())

getHistoryResp, err = client2.GetWorkflowExecutionHistory(host.NewContext(), getHistoryReq)
s.NoError(err)
events = getHistoryResp.History.Events
s.Equal(enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED, events[len(events)-1].GetEventType())
}

func (s *integrationClustersTestSuite) TestContinueAsNewFailover() {
namespace := "test-continueAsNew-workflow-failover-" + common.GenerateRandomString(5)
client1 := s.cluster1.GetFrontendClient() // active
23 changes: 5 additions & 18 deletions service/history/nDCStateRebuilder.go
@@ -123,27 +123,10 @@ func (r *nDCStateRebuilderImpl) rebuild(
return nil, 0, err
}

// the first batch needs special handling, to initialize mutable state & state builder
batch, err := iter.Next()
switch err.(type) {
case nil:
// noop
case *serviceerror.DataLoss:
// log event
r.logger.Error("encounter data loss event", tag.WorkflowNamespaceID(baseWorkflowIdentifier.NamespaceID), tag.WorkflowID(baseWorkflowIdentifier.WorkflowID), tag.WorkflowRunID(baseWorkflowIdentifier.RunID))
return nil, 0, err
default:
return nil, 0, err
}

firstEventBatch := batch.(*historypb.History).Events
rebuiltMutableState, stateBuilder := r.initializeBuilders(
namespaceEntry,
now,
)
if err := r.applyEvents(targetWorkflowIdentifier, stateBuilder, firstEventBatch, requestID); err != nil {
return nil, 0, err
}

for iter.HasNext() {
batch, err := iter.Next()
@@ -183,7 +166,11 @@ func (r *nDCStateRebuilderImpl) rebuild(
baseLastEventID,
baseLastEventVersion,
)) {
return nil, 0, serviceerror.NewInternal(fmt.Sprintf("nDCStateRebuilder unable to rebuild mutable state to event ID: %v, version: %v", baseLastEventID, baseLastEventVersion))
return nil, 0, serviceerror.NewInvalidArgument(fmt.Sprintf(
"nDCStateRebuilder unable to rebuild mutable state to event ID: %v, version: %v, this event must be at the boundary",
baseLastEventID,
baseLastEventVersion,
))
}

// close rebuilt mutable state transaction clearing all generated tasks, etc.
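For context on the "Rewrite verbose code logic" bullet: the removed block above read the first history batch before the loop just to initialize the builders, while every later batch was read inside the loop. Below is a minimal, self-contained sketch of the consolidated shape; it is illustrative only, not Temporal code, and the iterator, error type, and apply step are stand-ins.

package main

import (
	"errors"
	"fmt"
)

// dataLossError stands in for serviceerror.DataLoss in the real code.
type dataLossError struct{ msg string }

func (e *dataLossError) Error() string { return e.msg }

// batchIterator stands in for the history batch iterator; a nil batch
// simulates a corrupt read that surfaces as a data loss error.
type batchIterator struct {
	batches [][]string
	pos     int
}

func (it *batchIterator) HasNext() bool { return it.pos < len(it.batches) }

func (it *batchIterator) Next() ([]string, error) {
	batch := it.batches[it.pos]
	it.pos++
	if batch == nil {
		return nil, &dataLossError{msg: "corrupt history batch"}
	}
	return batch, nil
}

// rebuild handles every batch, including the first, inside one loop; the
// pre-loop special case for the first batch is gone.
func rebuild(it *batchIterator) error {
	initialized := false
	for it.HasNext() {
		batch, err := it.Next()
		var dataLoss *dataLossError
		switch {
		case err == nil:
			// noop
		case errors.As(err, &dataLoss):
			fmt.Println("encountered data loss event") // log, then propagate
			return err
		default:
			return err
		}
		if !initialized {
			initialized = true // initialize mutable state & state builder here
		}
		fmt.Printf("applied %d events\n", len(batch))
	}
	return nil
}

func main() {
	it := &batchIterator{batches: [][]string{{"WorkflowExecutionStarted"}, {"WorkflowTaskScheduled"}}}
	if err := rebuild(it); err != nil {
		fmt.Println("rebuild failed:", err)
	}
}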
