Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add XDC integration test for ResetWorkflow API #1469

Merged
merged 1 commit into from
Apr 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
171 changes: 171 additions & 0 deletions host/xdc/integration_failover_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1019,6 +1019,177 @@ GetHistoryLoop2:
s.True(eventsReplicated)
}

// testResetWorkflowRangeScheduleToStart resets a workflow on the active cluster,
// fails the namespace over to the second cluster, and checks that the reset run
// can be completed there and that both clusters report a completed history.
//
// Steps, in order:
//  1. Register a global namespace whose active cluster is clusterName[0].
//  2. Start a workflow through client1, signal it, and process one workflow
//     task with a handler that returns no commands.
//  3. Reset the run with WorkflowTaskFinishEventId 4.
//  4. Update the namespace so clusterName[1] becomes active and wait for the
//     namespace cache to refresh.
//  5. Process one workflow task through client2; the handler completes the
//     workflow on this second invocation.
//  6. Fetch the history of the reset run from both clusters and assert that the
//     last event is WorkflowExecutionCompleted.
//
// NOTE(review): the method name starts with a lowercase "test". Confirm whether
// the suite runner is expected to discover it, or whether it is intentionally
// kept out of the run.
func (s *integrationClustersTestSuite) testResetWorkflowRangeScheduleToStart() {
	namespace := "test-reset-workflow-failover-" + common.GenerateRandomString(5)
	client1 := s.cluster1.GetFrontendClient() // active
	regReq := &workflowservice.RegisterNamespaceRequest{
		Namespace:                        namespace,
		IsGlobalNamespace:                true,
		Clusters:                         clusterReplicationConfig,
		ActiveClusterName:                clusterName[0],
		WorkflowExecutionRetentionPeriod: timestamp.DurationPtr(1 * time.Hour * 24),
	}
	_, err := client1.RegisterNamespace(host.NewContext(), regReq)
	s.NoError(err)

	descReq := &workflowservice.DescribeNamespaceRequest{
		Namespace: namespace,
	}
	resp, err := client1.DescribeNamespace(host.NewContext(), descReq)
	s.NoError(err)
	s.NotNil(resp)
	// Wait for namespace cache to pick the change
	time.Sleep(cacheRefreshInterval)

	client2 := s.cluster2.GetFrontendClient() // standby

	// start a workflow
	id := "integration-reset-workflow-failover-test"
	wt := "integration-reset-workflow-failover-test-type"
	tl := "integration-reset-workflow-failover-test-taskqueue"
	identity := "worker1"
	workflowType := &commonpb.WorkflowType{Name: wt}
	taskQueue := &taskqueuepb.TaskQueue{Name: tl}
	startReq := &workflowservice.StartWorkflowExecutionRequest{
		RequestId:           uuid.New(),
		Namespace:           namespace,
		WorkflowId:          id,
		WorkflowType:        workflowType,
		TaskQueue:           taskQueue,
		Input:               nil,
		WorkflowRunTimeout:  timestamp.DurationPtr(100 * time.Second),
		WorkflowTaskTimeout: timestamp.DurationPtr(1 * time.Second),
		Identity:            identity,
	}
	we, err := client1.StartWorkflowExecution(host.NewContext(), startReq)
	s.NoError(err)
	// The run ID is a string, so a nil check can never fail; assert that it is
	// actually populated instead.
	s.NotEmpty(we.GetRunId())

	_, err = client1.SignalWorkflowExecution(host.NewContext(), &workflowservice.SignalWorkflowExecutionRequest{
		Namespace: namespace,
		WorkflowExecution: &commonpb.WorkflowExecution{
			WorkflowId: id,
			RunId:      we.RunId,
		},
		SignalName: "random signal name",
		Input: &commonpb.Payloads{Payloads: []*commonpb.Payload{
			{Data: []byte("random signal payload")},
		}},
		Identity: identity,
	})
	s.NoError(err)

	// workflow logic: the first workflow task returns no commands, every later
	// one completes the workflow. Both pollers below share this handler and its
	// captured flags.
	workflowComplete := false
	isWorkflowTaskProcessed := false
	wtHandler := func(execution *commonpb.WorkflowExecution, wt *commonpb.WorkflowType,
		previousStartedEventID, startedEventID int64, history *historypb.History) ([]*commandpb.Command, error) {

		if !isWorkflowTaskProcessed {
			isWorkflowTaskProcessed = true
			return []*commandpb.Command{}, nil
		}

		// Complete workflow after reset
		workflowComplete = true
		return []*commandpb.Command{{
			CommandType: enumspb.COMMAND_TYPE_COMPLETE_WORKFLOW_EXECUTION,
			Attributes: &commandpb.Command_CompleteWorkflowExecutionCommandAttributes{
				CompleteWorkflowExecutionCommandAttributes: &commandpb.CompleteWorkflowExecutionCommandAttributes{
					Result: payloads.EncodeString("Done"),
				}},
		}}, nil

	}

	// poller talks to cluster 1 (active before the failover).
	poller := host.TaskPoller{
		Engine:              client1,
		Namespace:           namespace,
		TaskQueue:           taskQueue,
		Identity:            identity,
		WorkflowTaskHandler: wtHandler,
		ActivityTaskHandler: nil,
		Logger:              s.logger,
		T:                   s.T(),
	}

	// poller2 talks to cluster 2 (active after the failover).
	poller2 := host.TaskPoller{
		Engine:              client2,
		Namespace:           namespace,
		TaskQueue:           taskQueue,
		Identity:            identity,
		WorkflowTaskHandler: wtHandler,
		ActivityTaskHandler: nil,
		Logger:              s.logger,
		T:                   s.T(),
	}

	_, err = poller.PollAndProcessWorkflowTask(false, false)
	s.logger.Info("PollAndProcessWorkflowTask", tag.Error(err))
	s.NoError(err)

	// events layout
	//  1. WorkflowExecutionStarted
	//  2. WorkflowTaskScheduled
	//  3. WorkflowExecutionSignaled
	//  4. WorkflowTaskStarted
	//  5. WorkflowTaskCompleted

	// Reset workflow execution
	resetResp, err := client1.ResetWorkflowExecution(host.NewContext(), &workflowservice.ResetWorkflowExecutionRequest{
		Namespace: namespace,
		WorkflowExecution: &commonpb.WorkflowExecution{
			WorkflowId: id,
			RunId:      we.RunId,
		},
		Reason:                    "reset execution from test",
		WorkflowTaskFinishEventId: 4, // before WorkflowTaskStarted
		RequestId:                 uuid.New(),
	})
	s.NoError(err)
	// resetResp.RunId is dereferenced below when building the history request.
	s.NotNil(resetResp)

	// update namespace to fail over
	updateReq := &workflowservice.UpdateNamespaceRequest{
		Namespace: namespace,
		ReplicationConfig: &replicationpb.NamespaceReplicationConfig{
			ActiveClusterName: clusterName[1],
		},
	}
	updateResp, err := client1.UpdateNamespace(host.NewContext(), updateReq)
	s.NoError(err)
	s.NotNil(updateResp)
	s.Equal(clusterName[1], updateResp.ReplicationConfig.GetActiveClusterName())
	s.Equal(int64(2), updateResp.GetFailoverVersion())

	// wait till failover completed
	time.Sleep(cacheRefreshInterval)

	_, err = poller2.PollAndProcessWorkflowTask(false, false)
	s.logger.Info("PollAndProcessWorkflowTask", tag.Error(err))
	s.NoError(err)
	s.True(workflowComplete)

	// Give the completion time to reach the other cluster before reading
	// history from both sides.
	time.Sleep(cacheRefreshInterval)

	getHistoryReq := &workflowservice.GetWorkflowExecutionHistoryRequest{
		Namespace: namespace,
		Execution: &commonpb.WorkflowExecution{
			WorkflowId: id,
			RunId:      resetResp.RunId,
		},
	}

	getHistoryResp, err := client1.GetWorkflowExecutionHistory(host.NewContext(), getHistoryReq)
	s.NoError(err)
	events := getHistoryResp.History.Events
	// Fail with an assertion rather than an index-out-of-range panic when the
	// history comes back empty.
	s.NotEmpty(events)
	s.Equal(enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED, events[len(events)-1].GetEventType())

	getHistoryResp, err = client2.GetWorkflowExecutionHistory(host.NewContext(), getHistoryReq)
	s.NoError(err)
	events = getHistoryResp.History.Events
	s.NotEmpty(events)
	s.Equal(enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED, events[len(events)-1].GetEventType())
}

func (s *integrationClustersTestSuite) TestContinueAsNewFailover() {
namespace := "test-continueAsNew-workflow-failover-" + common.GenerateRandomString(5)
client1 := s.cluster1.GetFrontendClient() // active
Expand Down
23 changes: 5 additions & 18 deletions service/history/nDCStateRebuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,27 +123,10 @@ func (r *nDCStateRebuilderImpl) rebuild(
return nil, 0, err
}

// need to specially handling the first batch, to initialize mutable state & state builder
batch, err := iter.Next()
switch err.(type) {
case nil:
// noop
case *serviceerror.DataLoss:
// log event
r.logger.Error("encounter data loss event", tag.WorkflowNamespaceID(baseWorkflowIdentifier.NamespaceID), tag.WorkflowID(baseWorkflowIdentifier.WorkflowID), tag.WorkflowRunID(baseWorkflowIdentifier.RunID))
return nil, 0, err
default:
return nil, 0, err
}

firstEventBatch := batch.(*historypb.History).Events
rebuiltMutableState, stateBuilder := r.initializeBuilders(
namespaceEntry,
now,
)
if err := r.applyEvents(targetWorkflowIdentifier, stateBuilder, firstEventBatch, requestID); err != nil {
return nil, 0, err
}

for iter.HasNext() {
batch, err := iter.Next()
Expand Down Expand Up @@ -183,7 +166,11 @@ func (r *nDCStateRebuilderImpl) rebuild(
baseLastEventID,
baseLastEventVersion,
)) {
return nil, 0, serviceerror.NewInternal(fmt.Sprintf("nDCStateRebuilder unable to rebuild mutable state to event ID: %v, version: %v", baseLastEventID, baseLastEventVersion))
return nil, 0, serviceerror.NewInvalidArgument(fmt.Sprintf(
"nDCStateRebuilder unable to rebuild mutable state to event ID: %v, version: %v, this event must be at the boundary",
baseLastEventID,
baseLastEventVersion,
))
}

// close rebuilt mutable state transaction clearing all generated tasks, etc.
Expand Down