Skip to content

Commit

Permalink
Add more test for XDC (#4025)
Browse files Browse the repository at this point in the history
  • Loading branch information
wxing1292 committed Mar 10, 2023
1 parent 7b6ce25 commit 122bb36
Showing 1 changed file with 234 additions and 20 deletions.
254 changes: 234 additions & 20 deletions tests/xdc/integration_failover_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -663,7 +663,7 @@ func (s *integrationClustersTestSuite) TestStickyWorkflowTaskFailover() {

s.failover(namespace, clusterName[0], int64(11), client2)

_, err = poller1.PollAndProcessWorkflowTask(true, false)
_, err = poller1.PollAndProcessWorkflowTask(false, false)
s.logger.Info("PollAndProcessWorkflowTask", tag.Error(err))
s.NoError(err)
s.True(workflowCompleted)
Expand Down Expand Up @@ -1562,7 +1562,7 @@ func (s *integrationClustersTestSuite) TestUserTimerFailover() {

for i := 1; i < 20; i++ {
if !workflowCompleted {
_, err = poller2.PollAndProcessWorkflowTask(true, false)
_, err = poller2.PollAndProcessWorkflowTask(false, false)
s.NoError(err)
time.Sleep(time.Second)
}
Expand Down Expand Up @@ -1828,8 +1828,8 @@ func (s *integrationClustersTestSuite) TestTransientWorkflowTaskFailover() {
s.True(workflowFinished)
}

func (s *integrationClustersTestSuite) TestCronWorkflowFailover() {
namespace := "test-cron-workflow-failover-" + common.GenerateRandomString(5)
func (s *integrationClustersTestSuite) TestCronWorkflowStartAndFailover() {
namespace := "test-cron-workflow-start-and-failover-" + common.GenerateRandomString(5)
client1 := s.cluster1.GetFrontendClient() // active
regReq := &workflowservice.RegisterNamespaceRequest{
Namespace: namespace,
Expand All @@ -1853,9 +1853,9 @@ func (s *integrationClustersTestSuite) TestCronWorkflowFailover() {
client2 := s.cluster2.GetFrontendClient() // standby

// start a workflow
id := "integration-cron-workflow-failover-test"
wt := "integration-cron-workflow-failover-test-type"
tl := "integration-cron-workflow-failover-test-taskqueue"
id := "integration-cron-workflow-start-and-failover-test"
wt := "integration-cron-workflow-start-and-failover-test-type"
tl := "integration-cron-workflow-start-and-failover-test-taskqueue"
identity := "worker1"
workflowType := &commonpb.WorkflowType{Name: wt}
taskQueue := &taskqueuepb.TaskQueue{Name: tl}
Expand All @@ -1875,8 +1875,12 @@ func (s *integrationClustersTestSuite) TestCronWorkflowFailover() {
s.NoError(err)
s.NotNil(we.GetRunId())

wfCompleted := false
var executions []*commonpb.WorkflowExecution
wtHandler := func(execution *commonpb.WorkflowExecution, wt *commonpb.WorkflowType,
previousStartedEventID, startedEventID int64, history *historypb.History) ([]*commandpb.Command, error) {
executions = append(executions, execution)
wfCompleted = true
return []*commandpb.Command{
{
CommandType: enumspb.COMMAND_TYPE_COMPLETE_WORKFLOW_EXECUTION,
Expand All @@ -1898,12 +1902,120 @@ func (s *integrationClustersTestSuite) TestCronWorkflowFailover() {

s.failover(namespace, clusterName[1], int64(2), client1)

// Run twice to make sure cron schedule is passed to standby.
for i := 0; i < 2; i++ {
_, err = poller2.PollAndProcessWorkflowTask(false, false)
s.NoError(err)
_, err = poller2.PollAndProcessWorkflowTask(false, false)
s.NoError(err)
s.True(wfCompleted)
events := s.getHistory(client2, namespace, executions[0])
s.Equal(int64(2), events[len(events)-1].GetVersion())

// terminate the remaining cron
_, err = client2.TerminateWorkflowExecution(tests.NewContext(), &workflowservice.TerminateWorkflowExecutionRequest{
Namespace: namespace,
WorkflowExecution: &commonpb.WorkflowExecution{
WorkflowId: id,
},
})
s.NoError(err)
}

func (s *integrationClustersTestSuite) TestCronWorkflowCompleteAndFailover() {
namespace := "test-cron-workflow-complete-and-failover-" + common.GenerateRandomString(5)
client1 := s.cluster1.GetFrontendClient() // active
regReq := &workflowservice.RegisterNamespaceRequest{
Namespace: namespace,
IsGlobalNamespace: true,
Clusters: clusterReplicationConfig,
ActiveClusterName: clusterName[0],
WorkflowExecutionRetentionPeriod: timestamp.DurationPtr(1 * time.Hour * 24),
}
_, err := client1.RegisterNamespace(tests.NewContext(), regReq)
s.NoError(err)
// Wait for namespace cache to pick the change
time.Sleep(cacheRefreshInterval)

descReq := &workflowservice.DescribeNamespaceRequest{
Namespace: namespace,
}
resp, err := client1.DescribeNamespace(tests.NewContext(), descReq)
s.NoError(err)
s.NotNil(resp)

client2 := s.cluster2.GetFrontendClient() // standby

// start a workflow
id := "integration-cron-workflow-complete-andfailover-test"
wt := "integration-cron-workflow-complete-andfailover-test-type"
tl := "integration-cron-workflow-complete-andfailover-test-taskqueue"
identity := "worker1"
workflowType := &commonpb.WorkflowType{Name: wt}
taskQueue := &taskqueuepb.TaskQueue{Name: tl}
startReq := &workflowservice.StartWorkflowExecutionRequest{
RequestId: uuid.New(),
Namespace: namespace,
WorkflowId: id,
WorkflowType: workflowType,
TaskQueue: taskQueue,
Input: nil,
WorkflowRunTimeout: timestamp.DurationPtr(100 * time.Second),
WorkflowTaskTimeout: timestamp.DurationPtr(1 * time.Second),
Identity: identity,
CronSchedule: "@every 5s",
}
we, err := client1.StartWorkflowExecution(tests.NewContext(), startReq)
s.NoError(err)
s.NotNil(we.GetRunId())

wfCompletionCount := 0
var executions []*commonpb.WorkflowExecution
wtHandler := func(execution *commonpb.WorkflowExecution, wt *commonpb.WorkflowType,
previousStartedEventID, startedEventID int64, history *historypb.History) ([]*commandpb.Command, error) {
wfCompletionCount += 1
executions = append(executions, execution)
return []*commandpb.Command{
{
CommandType: enumspb.COMMAND_TYPE_COMPLETE_WORKFLOW_EXECUTION,
Attributes: &commandpb.Command_CompleteWorkflowExecutionCommandAttributes{CompleteWorkflowExecutionCommandAttributes: &commandpb.CompleteWorkflowExecutionCommandAttributes{
Result: payloads.EncodeString("cron-test-result"),
}},
}}, nil
}

poller1 := tests.TaskPoller{
Engine: client1,
Namespace: namespace,
TaskQueue: taskQueue,
Identity: identity,
WorkflowTaskHandler: wtHandler,
Logger: s.logger,
T: s.T(),
}

poller2 := tests.TaskPoller{
Engine: client2,
Namespace: namespace,
TaskQueue: taskQueue,
Identity: identity,
WorkflowTaskHandler: wtHandler,
Logger: s.logger,
T: s.T(),
}

_, err = poller1.PollAndProcessWorkflowTask(false, false)
s.NoError(err)
s.Equal(1, wfCompletionCount)
events := s.getHistory(client1, namespace, executions[0])
s.Equal(int64(1), events[0].GetVersion())
s.Equal(int64(1), events[len(events)-1].GetVersion())

s.failover(namespace, clusterName[1], int64(2), client1)

_, err = poller2.PollAndProcessWorkflowTask(false, false)
s.NoError(err)
s.Equal(2, wfCompletionCount)
events = s.getHistory(client2, namespace, executions[1])
s.Equal(int64(1), events[0].GetVersion())
s.Equal(int64(2), events[len(events)-1].GetVersion())

_, err = client2.TerminateWorkflowExecution(tests.NewContext(), &workflowservice.TerminateWorkflowExecutionRequest{
Namespace: namespace,
WorkflowExecution: &commonpb.WorkflowExecution{
Expand All @@ -1913,8 +2025,8 @@ func (s *integrationClustersTestSuite) TestCronWorkflowFailover() {
s.NoError(err)
}

func (s *integrationClustersTestSuite) TestWorkflowRetryFailover() {
namespace := "test-workflow-retry-failover-" + common.GenerateRandomString(5)
func (s *integrationClustersTestSuite) TestWorkflowRetryStartAndFailover() {
namespace := "test-workflow-retry-start-and-failover-" + common.GenerateRandomString(5)
client1 := s.cluster1.GetFrontendClient() // active
regReq := &workflowservice.RegisterNamespaceRequest{
Namespace: namespace,
Expand All @@ -1938,9 +2050,9 @@ func (s *integrationClustersTestSuite) TestWorkflowRetryFailover() {
client2 := s.cluster2.GetFrontendClient() // standby

// start a workflow
id := "integration-workflow-retry-failover-test"
wt := "integration-workflow-retry-failover-test-type"
tl := "integration-workflow-retry-failover-test-taskqueue"
id := "integration-workflow-retry-start-and-failover-test"
wt := "integration-workflow-retry-start-and-failover-test-type"
tl := "integration-workflow-retry-start-and-failover-test-taskqueue"
identity := "worker1"
workflowType := &commonpb.WorkflowType{Name: wt}
taskQueue := &taskqueuepb.TaskQueue{Name: tl}
Expand Down Expand Up @@ -1995,22 +2107,124 @@ func (s *integrationClustersTestSuite) TestWorkflowRetryFailover() {
_, err = poller2.PollAndProcessWorkflowTask(false, false)
s.NoError(err)
events := s.getHistory(client2, namespace, executions[0])
s.Equal(enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_FAILED, events[len(events)-1].GetEventType())
s.Equal(int64(1), events[0].GetVersion())
s.Equal(int64(2), events[len(events)-1].GetVersion())
s.Equal(int32(1), events[0].GetWorkflowExecutionStartedEventAttributes().GetAttempt())
s.Equal(enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_FAILED, events[len(events)-1].GetEventType())

// second attempt
_, err = poller2.PollAndProcessWorkflowTask(false, false)
s.NoError(err)
events = s.getHistory(client2, namespace, executions[1])
s.Equal(int64(2), events[0].GetVersion())
s.Equal(int64(2), events[len(events)-1].GetVersion())
s.Equal(enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_FAILED, events[len(events)-1].GetEventType())
s.Equal(int32(2), events[0].GetWorkflowExecutionStartedEventAttributes().GetAttempt())
}

// TestWorkflowRetryFailAndFailover checks that a workflow retry which was
// scheduled before a namespace failover is picked up after the failover.
//
// Flow:
//  1. Register a global namespace whose active cluster is clusterName[0].
//  2. Start a workflow with a retry policy (MaximumAttempts: 3) through client1.
//  3. Fail the first attempt through poller1 and assert that the first and last
//     history events carry version 1 and that the attempt number is 1.
//  4. Fail the namespace over to clusterName[1] with failover version 2.
//  5. Fail the second attempt through poller2 and assert that its first event
//     carries version 1, its last event carries version 2, and the attempt
//     number is 2.
func (s *integrationClustersTestSuite) TestWorkflowRetryFailAndFailover() {
	namespace := "test-workflow-retry-fail-and-failover-" + common.GenerateRandomString(5)
	client1 := s.cluster1.GetFrontendClient() // active
	regReq := &workflowservice.RegisterNamespaceRequest{
		Namespace:                        namespace,
		IsGlobalNamespace:                true,
		Clusters:                         clusterReplicationConfig,
		ActiveClusterName:                clusterName[0],
		WorkflowExecutionRetentionPeriod: timestamp.DurationPtr(1 * time.Hour * 24),
	}
	_, err := client1.RegisterNamespace(tests.NewContext(), regReq)
	s.NoError(err)
	// Wait for namespace cache to pick the change
	time.Sleep(cacheRefreshInterval)

	descReq := &workflowservice.DescribeNamespaceRequest{
		Namespace: namespace,
	}
	resp, err := client1.DescribeNamespace(tests.NewContext(), descReq)
	s.NoError(err)
	s.NotNil(resp)

	client2 := s.cluster2.GetFrontendClient() // standby

	// start a workflow
	id := "integration-workflow-retry-fail-and-failover-test"
	wt := "integration-workflow-retry-fail-and-failover-test-type"
	tl := "integration-workflow-retry-fail-and-failover-test-taskqueue"
	identity := "worker1"
	workflowType := &commonpb.WorkflowType{Name: wt}
	taskQueue := &taskqueuepb.TaskQueue{Name: tl}
	startReq := &workflowservice.StartWorkflowExecutionRequest{
		RequestId:           uuid.New(),
		Namespace:           namespace,
		WorkflowId:          id,
		WorkflowType:        workflowType,
		TaskQueue:           taskQueue,
		Input:               nil,
		WorkflowRunTimeout:  timestamp.DurationPtr(100 * time.Second),
		WorkflowTaskTimeout: timestamp.DurationPtr(1 * time.Second),
		Identity:            identity,
		RetryPolicy: &commonpb.RetryPolicy{
			InitialInterval:        timestamp.DurationPtr(1 * time.Second),
			MaximumAttempts:        3,
			MaximumInterval:        timestamp.DurationPtr(1 * time.Second),
			NonRetryableErrorTypes: []string{"bad-bug"},
			BackoffCoefficient:     1,
		},
	}
	we, err := client1.StartWorkflowExecution(tests.NewContext(), startReq)
	s.NoError(err)
	s.NotNil(we.GetRunId())

	// The handler records every execution it is invoked for, so executions[i]
	// is the run handled by the (i+1)-th processed workflow task, and then
	// fails that run with a retryable error.
	var executions []*commonpb.WorkflowExecution
	wtHandler := func(execution *commonpb.WorkflowExecution, wt *commonpb.WorkflowType,
		previousStartedEventID, startedEventID int64, history *historypb.History) ([]*commandpb.Command, error) {
		executions = append(executions, execution)
		return []*commandpb.Command{
			{
				CommandType: enumspb.COMMAND_TYPE_FAIL_WORKFLOW_EXECUTION,
				Attributes: &commandpb.Command_FailWorkflowExecutionCommandAttributes{FailWorkflowExecutionCommandAttributes: &commandpb.FailWorkflowExecutionCommandAttributes{
					Failure: failure.NewServerFailure("retryable-error", false),
				}},
			}}, nil
	}

	poller1 := tests.TaskPoller{
		Engine:              client1,
		Namespace:           namespace,
		TaskQueue:           taskQueue,
		Identity:            identity,
		WorkflowTaskHandler: wtHandler,
		Logger:              s.logger,
		T:                   s.T(),
	}

	poller2 := tests.TaskPoller{
		Engine:              client2,
		Namespace:           namespace,
		TaskQueue:           taskQueue,
		Identity:            identity,
		WorkflowTaskHandler: wtHandler,
		Logger:              s.logger,
		T:                   s.T(),
	}

	// first attempt, failed through the originally active cluster
	_, err = poller1.PollAndProcessWorkflowTask(false, false)
	s.NoError(err)
	events := s.getHistory(client1, namespace, executions[0])
	s.Equal(int64(1), events[0].GetVersion())
	s.Equal(int64(1), events[len(events)-1].GetVersion())
	s.Equal(enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_FAILED, events[len(events)-1].GetEventType())
	s.Equal(int32(1), events[0].GetWorkflowExecutionStartedEventAttributes().GetAttempt())

	s.failover(namespace, clusterName[1], int64(2), client1)

	// second attempt, failed through the new active cluster. Only two workflow
	// tasks have been processed at this point, so the run under inspection is
	// executions[1]; its start event is expected to carry the pre-failover
	// version and its closing event the post-failover version.
	_, err = poller2.PollAndProcessWorkflowTask(false, false)
	s.NoError(err)
	events = s.getHistory(client2, namespace, executions[1])
	s.Equal(int64(1), events[0].GetVersion())
	s.Equal(int64(2), events[len(events)-1].GetVersion())
	s.Equal(enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_FAILED, events[len(events)-1].GetEventType())
	s.Equal(int32(2), events[0].GetWorkflowExecutionStartedEventAttributes().GetAttempt())
}

func (s *integrationClustersTestSuite) TestActivityHeartbeatFailover() {
Expand Down

0 comments on commit 122bb36

Please sign in to comment.