Improve integration tests stability #1495

Merged
2 changes: 1 addition & 1 deletion host/activity_test.go
@@ -259,7 +259,7 @@ func (s *integrationSuite) TestActivityHeartbeatDetailsDuringRetry() {
 			TaskToken: taskToken, Details: heartbeatDetails})
 		s.NoError(err)
 		// Trigger heartbeat timeout and retry
-		time.Sleep(time.Second * 2)
+		time.Sleep(time.Second * 4)
 	} else if activityExecutedCount == 1 {
 		// return an error and retry
 		err = errors.New("retryable-error")
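Note: the only change in this file is the longer sleep, which gives the service more headroom to detect the missed heartbeat and schedule the retry before the activity reports again. A rough sketch of an alternative (not part of this change; the timeout value below is an assumption, not taken from the test) would derive the wait from the activity's heartbeat timeout so the margin survives future tuning:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// Hypothetical values: the heartbeat timeout here is assumed, not read from the test.
	heartbeatTimeout := 3 * time.Second
	wait := heartbeatTimeout + time.Second // keep a margin past the timeout

	fmt.Printf("sleeping %v to let the heartbeat timeout fire\n", wait)
	time.Sleep(wait)
}
```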
19 changes: 17 additions & 2 deletions host/ndc/ndc_integration_test.go
@@ -997,15 +997,30 @@ func (s *nDCIntegrationTestSuite) TestEventsReapply_ZombieWorkflow() {
 	s.generator = test.InitializeHistoryEventGenerator(s.namespace, version)

 	// verify two batches of zombie workflow are call reapply API
-	s.mockAdminClient["standby"].(*adminservicemock.MockAdminServiceClient).EXPECT().ReapplyEvents(gomock.Any(), gomock.Any()).Return(&adminservice.ReapplyEventsResponse{}, nil).Times(2)
+	reapplyCount := 0
 	for i := 0; i < 2 && s.generator.HasNextVertex(); i++ {
 		events := s.generator.GetNextVertices()
 		historyEvents := &historypb.History{}
+		reapply := false
 		for _, event := range events {
-			historyEvents.Events = append(historyEvents.Events, event.GetData().(*historypb.HistoryEvent))
+			historyEvent := event.GetData().(*historypb.HistoryEvent)
+			if historyEvent.GetEventType() == enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED {
+				reapply = true
+			}
+			historyEvents.Events = append(historyEvents.Events, historyEvent)
 		}
+		if reapply {
+			reapplyCount += 1
+		}
 		historyBatch = append(historyBatch, historyEvents)
 	}
+	s.mockAdminClient["standby"].(*adminservicemock.MockAdminServiceClient).EXPECT().ReapplyEvents(
+		gomock.Any(),
+		gomock.Any(),
+	).Return(
+		&adminservice.ReapplyEventsResponse{},
+		nil,
+	).Times(reapplyCount * 2)

 	versionHistory = s.eventBatchesToVersionHistory(nil, historyBatch)
 	s.applyEvents(
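Note: the old expectation assumed the generator would always produce exactly two qualifying batches; the new code counts the batches that actually contain a WorkflowExecutionSignaled event and sizes the ReapplyEvents expectation from that count. A minimal, self-contained sketch of the counting idea (event types reduced to strings; these are not the test's real types):

```go
package main

import "fmt"

// countReapplyBatches returns how many batches contain a
// WorkflowExecutionSignaled event, mirroring the loop in the test above.
func countReapplyBatches(batches [][]string) int {
	count := 0
	for _, batch := range batches {
		for _, eventType := range batch {
			if eventType == "WorkflowExecutionSignaled" {
				count++
				break
			}
		}
	}
	return count
}

func main() {
	batches := [][]string{
		{"WorkflowTaskScheduled", "WorkflowExecutionSignaled"},
		{"ActivityTaskScheduled"},
	}
	// The gomock expectation is then sized with .Times(...) from this count
	// instead of a constant that can drift from the generated history.
	fmt.Println(countReapplyBatches(batches)) // 1
}
```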
244 changes: 39 additions & 205 deletions host/xdc/integration_failover_test.go
@@ -160,18 +160,7 @@ func (s *integrationClustersTestSuite) TestNamespaceFailover() {
 	s.NotNil(resp2)
 	s.Equal(resp, resp2)

-	// update namespace to fail over
-	updateReq := &workflowservice.UpdateNamespaceRequest{
-		Namespace: namespace,
-		ReplicationConfig: &replicationpb.NamespaceReplicationConfig{
-			ActiveClusterName: clusterName[1],
-		},
-	}
-	updateResp, err := client1.UpdateNamespace(host.NewContext(), updateReq)
-	s.NoError(err)
-	s.NotNil(updateResp)
-	s.Equal(clusterName[1], updateResp.ReplicationConfig.GetActiveClusterName())
-	s.Equal(int64(2), updateResp.GetFailoverVersion())
+	s.failover(namespace, clusterName[1], int64(2), client1)

 	updated := false
 	var resp3 *workflowservice.DescribeNamespaceResponse
@@ -417,21 +406,7 @@ func (s *integrationClustersTestSuite) TestSimpleWorkflowFailover() {
 	s.NoError(err)
 	s.Equal("query-result", queryResultString)

-	// update namespace to fail over
-	updateReq := &workflowservice.UpdateNamespaceRequest{
-		Namespace: namespace,
-		ReplicationConfig: &replicationpb.NamespaceReplicationConfig{
-			ActiveClusterName: clusterName[1],
-		},
-	}
-	updateResp, err := client1.UpdateNamespace(host.NewContext(), updateReq)
-	s.NoError(err)
-	s.NotNil(updateResp)
-	s.Equal(clusterName[1], updateResp.ReplicationConfig.GetActiveClusterName())
-	s.Equal(int64(2), updateResp.GetFailoverVersion())
-
-	// wait till failover completed
-	time.Sleep(cacheRefreshInterval)
+	s.failover(namespace, clusterName[1], int64(2), client1)

 	// check history matched
 	getHistoryReq := &workflowservice.GetWorkflowExecutionHistoryRequest{
@@ -645,21 +620,7 @@ func (s *integrationClustersTestSuite) TestStickyWorkflowTaskFailover() {
 	})
 	s.NoError(err)

-	// Update namespace to fail over
-	updateReq := &workflowservice.UpdateNamespaceRequest{
-		Namespace: namespace,
-		ReplicationConfig: &replicationpb.NamespaceReplicationConfig{
-			ActiveClusterName: clusterName[1],
-		},
-	}
-	updateResp, err := client1.UpdateNamespace(host.NewContext(), updateReq)
-	s.NoError(err)
-	s.NotNil(updateResp)
-	s.Equal(clusterName[1], updateResp.ReplicationConfig.GetActiveClusterName())
-	s.Equal(int64(2), updateResp.GetFailoverVersion())
-
-	// Wait for namespace cache to pick the change
-	time.Sleep(cacheRefreshInterval)
+	s.failover(namespace, clusterName[1], int64(2), client1)

 	_, err = poller2.PollAndProcessWorkflowTaskWithAttemptAndRetry(false, false, false, true, 1, 5)
 	s.logger.Info("PollAndProcessWorkflowTask", tag.Error(err))
@@ -678,18 +639,7 @@ func (s *integrationClustersTestSuite) TestStickyWorkflowTaskFailover() {
 	})
 	s.NoError(err)

-	// Update namespace to fail over back
-	updateReq = &workflowservice.UpdateNamespaceRequest{
-		Namespace: namespace,
-		ReplicationConfig: &replicationpb.NamespaceReplicationConfig{
-			ActiveClusterName: clusterName[0],
-		},
-	}
-	updateResp, err = client2.UpdateNamespace(host.NewContext(), updateReq)
-	s.NoError(err)
-	s.NotNil(updateResp)
-	s.Equal(clusterName[0], updateResp.ReplicationConfig.GetActiveClusterName())
-	s.Equal(int64(11), updateResp.GetFailoverVersion())
+	s.failover(namespace, clusterName[0], int64(11), client2)

 	_, err = poller1.PollAndProcessWorkflowTask(true, false)
 	s.logger.Info("PollAndProcessWorkflowTask", tag.Error(err))
@@ -790,21 +740,7 @@ func (s *integrationClustersTestSuite) TestStartWorkflowExecution_Failover_Workf
 	s.NoError(err)
 	s.Equal(1, workflowCompleteTimes)

-	// update namespace to fail over
-	updateReq := &workflowservice.UpdateNamespaceRequest{
-		Namespace: namespace,
-		ReplicationConfig: &replicationpb.NamespaceReplicationConfig{
-			ActiveClusterName: clusterName[1],
-		},
-	}
-	updateResp, err := client1.UpdateNamespace(host.NewContext(), updateReq)
-	s.NoError(err)
-	s.NotNil(updateResp)
-	s.Equal(clusterName[1], updateResp.ReplicationConfig.GetActiveClusterName())
-	s.Equal(int64(2), updateResp.GetFailoverVersion())
-
-	// wait till failover completed
-	time.Sleep(cacheRefreshInterval)
+	s.failover(namespace, clusterName[1], int64(2), client1)

 	// start the same workflow in cluster 2 is not allowed if policy is AllowDuplicateFailedOnly
 	startReq.RequestId = uuid.New()
@@ -935,21 +871,7 @@ func (s *integrationClustersTestSuite) TestTerminateFailover() {
 	s.logger.Info("PollAndProcessWorkflowTask", tag.Error(err))
 	s.NoError(err)

-	// update namespace to fail over
-	updateReq := &workflowservice.UpdateNamespaceRequest{
-		Namespace: namespace,
-		ReplicationConfig: &replicationpb.NamespaceReplicationConfig{
-			ActiveClusterName: clusterName[1],
-		},
-	}
-	updateResp, err := client1.UpdateNamespace(host.NewContext(), updateReq)
-	s.NoError(err)
-	s.NotNil(updateResp)
-	s.Equal(clusterName[1], updateResp.ReplicationConfig.GetActiveClusterName())
-	s.Equal(int64(2), updateResp.GetFailoverVersion())
-
-	// wait till failover completed
-	time.Sleep(cacheRefreshInterval)
+	s.failover(namespace, clusterName[1], int64(2), client1)

 	// terminate workflow at cluster 2
 	terminateReason := "terminate reason"
@@ -1148,21 +1070,7 @@ func (s *integrationClustersTestSuite) TestResetWorkflowFailover() {
 	})
 	s.NoError(err)

-	// update namespace to fail over
-	updateReq := &workflowservice.UpdateNamespaceRequest{
-		Namespace: namespace,
-		ReplicationConfig: &replicationpb.NamespaceReplicationConfig{
-			ActiveClusterName: clusterName[1],
-		},
-	}
-	updateResp, err := client1.UpdateNamespace(host.NewContext(), updateReq)
-	s.NoError(err)
-	s.NotNil(updateResp)
-	s.Equal(clusterName[1], updateResp.ReplicationConfig.GetActiveClusterName())
-	s.Equal(int64(2), updateResp.GetFailoverVersion())
-
-	// wait till failover completed
-	time.Sleep(cacheRefreshInterval)
+	s.failover(namespace, clusterName[1], int64(2), client1)

 	_, err = poller2.PollAndProcessWorkflowTask(false, false)
 	s.logger.Info("PollAndProcessWorkflowTask", tag.Error(err))
@@ -1298,21 +1206,7 @@ func (s *integrationClustersTestSuite) TestContinueAsNewFailover() {
 	s.NoError(err, strconv.Itoa(i))
 	}

-	// update namespace to fail over
-	updateReq := &workflowservice.UpdateNamespaceRequest{
-		Namespace: namespace,
-		ReplicationConfig: &replicationpb.NamespaceReplicationConfig{
-			ActiveClusterName: clusterName[1],
-		},
-	}
-	updateResp, err := client1.UpdateNamespace(host.NewContext(), updateReq)
-	s.NoError(err)
-	s.NotNil(updateResp)
-	s.Equal(clusterName[1], updateResp.ReplicationConfig.GetActiveClusterName())
-	s.Equal(int64(2), updateResp.GetFailoverVersion())
-
-	// wait till failover completed
-	time.Sleep(cacheRefreshInterval)
+	s.failover(namespace, clusterName[1], int64(2), client1)

 	// finish the rest in cluster 2
 	for i := 0; i < 2; i++ {
@@ -1438,21 +1332,7 @@ func (s *integrationClustersTestSuite) TestSignalFailover() {
 	s.NoError(err)
 	s.True(eventSignaled)

-	// Update namespace to fail over
-	updateReq := &workflowservice.UpdateNamespaceRequest{
-		Namespace: namespace,
-		ReplicationConfig: &replicationpb.NamespaceReplicationConfig{
-			ActiveClusterName: clusterName[1],
-		},
-	}
-	updateResp, err := client1.UpdateNamespace(host.NewContext(), updateReq)
-	s.NoError(err)
-	s.NotNil(updateResp)
-	s.Equal(clusterName[1], updateResp.ReplicationConfig.GetActiveClusterName())
-	s.Equal(int64(2), updateResp.GetFailoverVersion())
-
-	// Wait for namespace cache to pick the change
-	time.Sleep(cacheRefreshInterval)
+	s.failover(namespace, clusterName[1], int64(2), client1)

 	// check history matched
 	getHistoryReq := &workflowservice.GetWorkflowExecutionHistoryRequest{
@@ -1656,21 +1536,7 @@ func (s *integrationClustersTestSuite) TestUserTimerFailover() {
 	}
 	s.True(timerCreated)

-	// Update namespace to fail over
-	updateReq := &workflowservice.UpdateNamespaceRequest{
-		Namespace: namespace,
-		ReplicationConfig: &replicationpb.NamespaceReplicationConfig{
-			ActiveClusterName: clusterName[1],
-		},
-	}
-	updateResp, err := client1.UpdateNamespace(host.NewContext(), updateReq)
-	s.NoError(err)
-	s.NotNil(updateResp)
-	s.Equal(clusterName[1], updateResp.ReplicationConfig.GetActiveClusterName())
-	s.Equal(int64(2), updateResp.GetFailoverVersion())
-
-	// Wait for namespace cache to pick the change
-	time.Sleep(cacheRefreshInterval)
+	s.failover(namespace, clusterName[1], int64(2), client1)

 	for i := 1; i < 20; i++ {
 		if !workflowCompleted {
@@ -1830,21 +1696,7 @@ func (s *integrationClustersTestSuite) TestActivityHeartbeatFailover() {
 	err = poller1.PollAndProcessActivityTask(false)
 	s.IsType(&serviceerror.NotFound{}, err)

-	// Update namespace to fail over
-	updateReq := &workflowservice.UpdateNamespaceRequest{
-		Namespace: namespace,
-		ReplicationConfig: &replicationpb.NamespaceReplicationConfig{
-			ActiveClusterName: clusterName[1],
-		},
-	}
-	updateResp, err := client1.UpdateNamespace(host.NewContext(), updateReq)
-	s.NoError(err)
-	s.NotNil(updateResp)
-	s.Equal(clusterName[1], updateResp.ReplicationConfig.GetActiveClusterName())
-	s.Equal(int64(2), updateResp.GetFailoverVersion())
-
-	// Wait for namespace cache to pick the change
-	time.Sleep(cacheRefreshInterval)
+	s.failover(namespace, clusterName[1], int64(2), client1)

 	// Make sure the heartbeat details are sent to cluster2 even when the activity at cluster1
 	// has heartbeat timeout. Also make sure the information is recorded when the activity state
@@ -1987,21 +1839,7 @@ func (s *integrationClustersTestSuite) TestTransientWorkflowTaskFailover() {
 	_, err = poller1.PollAndProcessWorkflowTask(false, false)
 	s.NoError(err)

-	// Update namespace to fail over
-	updateReq := &workflowservice.UpdateNamespaceRequest{
-		Namespace: namespace,
-		ReplicationConfig: &replicationpb.NamespaceReplicationConfig{
-			ActiveClusterName: clusterName[1],
-		},
-	}
-	updateResp, err := client1.UpdateNamespace(host.NewContext(), updateReq)
-	s.NoError(err)
-	s.NotNil(updateResp)
-	s.Equal(clusterName[1], updateResp.ReplicationConfig.GetActiveClusterName())
-	s.Equal(int64(2), updateResp.GetFailoverVersion())
-
-	// Wait for namespace cache to pick the change
-	time.Sleep(cacheRefreshInterval)
+	s.failover(namespace, clusterName[1], int64(2), client1)

 	// for failover transient workflow task, it is guaranteed that the transient workflow task
 	// after the failover has attempt 1
@@ -2079,22 +1917,7 @@ func (s *integrationClustersTestSuite) TestCronWorkflowFailover() {
 		T: s.T(),
 	}

-	// Failover during backoff
-	// Update namespace to fail over
-	updateReq := &workflowservice.UpdateNamespaceRequest{
-		Namespace: namespace,
-		ReplicationConfig: &replicationpb.NamespaceReplicationConfig{
-			ActiveClusterName: clusterName[1],
-		},
-	}
-	updateResp, err := client1.UpdateNamespace(host.NewContext(), updateReq)
-	s.NoError(err)
-	s.NotNil(updateResp)
-	s.Equal(clusterName[1], updateResp.ReplicationConfig.GetActiveClusterName())
-	s.Equal(int64(2), updateResp.GetFailoverVersion())
-
-	// Wait for namespace cache to pick the change
-	time.Sleep(cacheRefreshInterval)
+	s.failover(namespace, clusterName[1], int64(2), client1)

 	// Run twice to make sure cron schedule is passed to standby.
 	for i := 0; i < 2; i++ {
@@ -2187,21 +2010,7 @@ func (s *integrationClustersTestSuite) TestWorkflowRetryFailover() {
 		T: s.T(),
 	}

-	// Update namespace to fail over
-	updateReq := &workflowservice.UpdateNamespaceRequest{
-		Namespace: namespace,
-		ReplicationConfig: &replicationpb.NamespaceReplicationConfig{
-			ActiveClusterName: clusterName[1],
-		},
-	}
-	updateResp, err := client1.UpdateNamespace(host.NewContext(), updateReq)
-	s.NoError(err)
-	s.NotNil(updateResp)
-	s.Equal(clusterName[1], updateResp.ReplicationConfig.GetActiveClusterName())
-	s.Equal(int64(2), updateResp.GetFailoverVersion())
-
-	// Wait for namespace cache to pick the change
-	time.Sleep(cacheRefreshInterval)
+	s.failover(namespace, clusterName[1], int64(2), client1)

 	// First attempt
 	_, err = poller2.PollAndProcessWorkflowTask(false, false)
@@ -2246,3 +2055,28 @@ func (s *integrationClustersTestSuite) getHistory(client host.FrontendClient, na

 	return events
 }
+
+func (s *integrationClustersTestSuite) failover(
+	namespace string,
+	targetCluster string,
+	targetFailoverVersion int64,
+	client host.FrontendClient,
+) {
+	// wait for replication task propagation
+	time.Sleep(4 * time.Second)
+
+	// update namespace to fail over
+	updateReq := &workflowservice.UpdateNamespaceRequest{
+		Namespace: namespace,
+		ReplicationConfig: &replicationpb.NamespaceReplicationConfig{
+			ActiveClusterName: targetCluster,
+		},
+	}
+	updateResp, err := client.UpdateNamespace(host.NewContext(), updateReq)
+	s.NoError(err)
+	s.Equal(targetCluster, updateResp.ReplicationConfig.GetActiveClusterName())
+	s.Equal(targetFailoverVersion, updateResp.GetFailoverVersion())
+
+	// wait till failover completed
+	time.Sleep(cacheRefreshInterval)
+}
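Note: the new helper centralizes the failover sequence that every test above previously inlined: wait for replication to propagate, switch the namespace's active cluster, assert the expected failover version, then wait out the namespace cache refresh. If the fixed sleeps ever prove flaky, one possible variant (a sketch only, not part of this change, and assuming the suite's existing host helpers) would poll DescribeNamespace until the active cluster flips:

```go
// Hypothetical variant, not in this PR: poll until the namespace reports the
// target cluster as active instead of sleeping for a fixed interval.
func (s *integrationClustersTestSuite) waitForActiveCluster(
	client host.FrontendClient,
	namespace string,
	targetCluster string,
) {
	for i := 0; i < 30; i++ {
		resp, err := client.DescribeNamespace(host.NewContext(), &workflowservice.DescribeNamespaceRequest{
			Namespace: namespace,
		})
		s.NoError(err)
		if resp.ReplicationConfig.GetActiveClusterName() == targetCluster {
			return
		}
		time.Sleep(200 * time.Millisecond)
	}
	s.Fail("namespace did not become active on " + targetCluster)
}
```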