Improve integration tests stability (#1495)
* Improve the stability of TestActivityHeartbeatDetailsDuringRetry
* Improve the stability of TestUserTimerFailover
* Improve the stability of TestEventsReapply_ZombieWorkflow
wxing1292 committed Apr 27, 2021
1 parent fee1c43 commit e67e845
Showing 3 changed files with 57 additions and 208 deletions.
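
The xdc failover tests all repeated the same UpdateNamespace-plus-sleep boilerplate; this commit replaces those blocks with a single suite helper, added at the bottom of host/xdc/integration_failover_test.go (see the final hunk below). A standalone sketch of that pattern, assuming the go.temporal.io/api client types and a placeholder cacheRefreshInterval (the real delay comes from the host test setup); the package and function names here are illustrative, not from the repo:

package xdctest // illustrative package name, not from the repo

import (
	"context"
	"fmt"
	"time"

	replicationpb "go.temporal.io/api/replication/v1"
	"go.temporal.io/api/workflowservice/v1"
)

// cacheRefreshInterval is a placeholder; the real value comes from the host test setup.
const cacheRefreshInterval = 20 * time.Second

// failover switches the namespace's active cluster and waits for the change
// to take effect, mirroring the suite helper this commit adds.
func failover(
	ctx context.Context,
	client workflowservice.WorkflowServiceClient,
	namespace string,
	targetCluster string,
	targetFailoverVersion int64,
) error {
	// wait for replication task propagation
	time.Sleep(4 * time.Second)

	// update namespace to fail over
	resp, err := client.UpdateNamespace(ctx, &workflowservice.UpdateNamespaceRequest{
		Namespace: namespace,
		ReplicationConfig: &replicationpb.NamespaceReplicationConfig{
			ActiveClusterName: targetCluster,
		},
	})
	if err != nil {
		return err
	}
	if resp.GetReplicationConfig().GetActiveClusterName() != targetCluster {
		return fmt.Errorf("active cluster is %q, want %q",
			resp.GetReplicationConfig().GetActiveClusterName(), targetCluster)
	}
	if resp.GetFailoverVersion() != targetFailoverVersion {
		return fmt.Errorf("failover version is %d, want %d",
			resp.GetFailoverVersion(), targetFailoverVersion)
	}

	// wait till failover completed (namespace cache refresh)
	time.Sleep(cacheRefreshInterval)
	return nil
}

In the suite itself these checks are expressed with the test assertions (s.NoError, s.Equal) rather than returned errors, and each test performs a failover with a single call such as s.failover(namespace, clusterName[1], int64(2), client1).
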
host/activity_test.go (2 changes: 1 addition & 1 deletion)
@@ -259,7 +259,7 @@ func (s *integrationSuite) TestActivityHeartbeatDetailsDuringRetry() {
TaskToken: taskToken, Details: heartbeatDetails})
s.NoError(err)
// Trigger heartbeat timeout and retry
-time.Sleep(time.Second * 2)
+time.Sleep(time.Second * 4)
} else if activityExecutedCount == 1 {
// return an error and retry
err = errors.New("retryable-error")
host/ndc/ndc_integration_test.go (19 changes: 17 additions & 2 deletions)
@@ -997,15 +997,30 @@ func (s *nDCIntegrationTestSuite) TestEventsReapply_ZombieWorkflow() {
s.generator = test.InitializeHistoryEventGenerator(s.namespace, version)

// verify that two batches of the zombie workflow call the reapply API
-s.mockAdminClient["standby"].(*adminservicemock.MockAdminServiceClient).EXPECT().ReapplyEvents(gomock.Any(), gomock.Any()).Return(&adminservice.ReapplyEventsResponse{}, nil).Times(2)
+reapplyCount := 0
for i := 0; i < 2 && s.generator.HasNextVertex(); i++ {
events := s.generator.GetNextVertices()
historyEvents := &historypb.History{}
+reapply := false
for _, event := range events {
-historyEvents.Events = append(historyEvents.Events, event.GetData().(*historypb.HistoryEvent))
+historyEvent := event.GetData().(*historypb.HistoryEvent)
+if historyEvent.GetEventType() == enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED {
+reapply = true
+}
+historyEvents.Events = append(historyEvents.Events, historyEvent)
}
+if reapply {
+reapplyCount += 1
+}
historyBatch = append(historyBatch, historyEvents)
}
+s.mockAdminClient["standby"].(*adminservicemock.MockAdminServiceClient).EXPECT().ReapplyEvents(
+gomock.Any(),
+gomock.Any(),
+).Return(
+&adminservice.ReapplyEventsResponse{},
+nil,
+).Times(reapplyCount * 2)

versionHistory = s.eventBatchesToVersionHistory(nil, historyBatch)
s.applyEvents(
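
The zombie-workflow change above stops hardcoding the ReapplyEvents expectation at .Times(2) and instead counts how many generated batches actually contain a WorkflowExecutionSignaled event, since only those batches should reach the reapply API. A minimal, self-contained restatement of that counting step using the go.temporal.io/api types; countReapplyBatches is an illustrative name, not a function from the repo:

package ndctest // illustrative package name, not from the repo

import (
	enumspb "go.temporal.io/api/enums/v1"
	historypb "go.temporal.io/api/history/v1"
)

// countReapplyBatches returns how many history batches contain at least one
// WorkflowExecutionSignaled event; the test multiplies this count by two to
// set the expected number of ReapplyEvents calls on the mocked admin client.
func countReapplyBatches(batches []*historypb.History) int {
	count := 0
	for _, batch := range batches {
		for _, event := range batch.GetEvents() {
			if event.GetEventType() == enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED {
				count++
				break
			}
		}
	}
	return count
}
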
host/xdc/integration_failover_test.go (244 changes: 39 additions & 205 deletions)
@@ -160,18 +160,7 @@ func (s *integrationClustersTestSuite) TestNamespaceFailover() {
s.NotNil(resp2)
s.Equal(resp, resp2)

-// update namespace to fail over
-updateReq := &workflowservice.UpdateNamespaceRequest{
-Namespace: namespace,
-ReplicationConfig: &replicationpb.NamespaceReplicationConfig{
-ActiveClusterName: clusterName[1],
-},
-}
-updateResp, err := client1.UpdateNamespace(host.NewContext(), updateReq)
-s.NoError(err)
-s.NotNil(updateResp)
-s.Equal(clusterName[1], updateResp.ReplicationConfig.GetActiveClusterName())
-s.Equal(int64(2), updateResp.GetFailoverVersion())
+s.failover(namespace, clusterName[1], int64(2), client1)

updated := false
var resp3 *workflowservice.DescribeNamespaceResponse
@@ -417,21 +406,7 @@ func (s *integrationClustersTestSuite) TestSimpleWorkflowFailover() {
s.NoError(err)
s.Equal("query-result", queryResultString)

-// update namespace to fail over
-updateReq := &workflowservice.UpdateNamespaceRequest{
-Namespace: namespace,
-ReplicationConfig: &replicationpb.NamespaceReplicationConfig{
-ActiveClusterName: clusterName[1],
-},
-}
-updateResp, err := client1.UpdateNamespace(host.NewContext(), updateReq)
-s.NoError(err)
-s.NotNil(updateResp)
-s.Equal(clusterName[1], updateResp.ReplicationConfig.GetActiveClusterName())
-s.Equal(int64(2), updateResp.GetFailoverVersion())
-
-// wait till failover completed
-time.Sleep(cacheRefreshInterval)
+s.failover(namespace, clusterName[1], int64(2), client1)

// check history matched
getHistoryReq := &workflowservice.GetWorkflowExecutionHistoryRequest{
@@ -645,21 +620,7 @@ func (s *integrationClustersTestSuite) TestStickyWorkflowTaskFailover() {
})
s.NoError(err)

-// Update namespace to fail over
-updateReq := &workflowservice.UpdateNamespaceRequest{
-Namespace: namespace,
-ReplicationConfig: &replicationpb.NamespaceReplicationConfig{
-ActiveClusterName: clusterName[1],
-},
-}
-updateResp, err := client1.UpdateNamespace(host.NewContext(), updateReq)
-s.NoError(err)
-s.NotNil(updateResp)
-s.Equal(clusterName[1], updateResp.ReplicationConfig.GetActiveClusterName())
-s.Equal(int64(2), updateResp.GetFailoverVersion())
-
-// Wait for namespace cache to pick the change
-time.Sleep(cacheRefreshInterval)
+s.failover(namespace, clusterName[1], int64(2), client1)

_, err = poller2.PollAndProcessWorkflowTaskWithAttemptAndRetry(false, false, false, true, 1, 5)
s.logger.Info("PollAndProcessWorkflowTask", tag.Error(err))
@@ -678,18 +639,7 @@ func (s *integrationClustersTestSuite) TestStickyWorkflowTaskFailover() {
})
s.NoError(err)

-// Update namespace to fail over back
-updateReq = &workflowservice.UpdateNamespaceRequest{
-Namespace: namespace,
-ReplicationConfig: &replicationpb.NamespaceReplicationConfig{
-ActiveClusterName: clusterName[0],
-},
-}
-updateResp, err = client2.UpdateNamespace(host.NewContext(), updateReq)
-s.NoError(err)
-s.NotNil(updateResp)
-s.Equal(clusterName[0], updateResp.ReplicationConfig.GetActiveClusterName())
-s.Equal(int64(11), updateResp.GetFailoverVersion())
+s.failover(namespace, clusterName[0], int64(11), client2)

_, err = poller1.PollAndProcessWorkflowTask(true, false)
s.logger.Info("PollAndProcessWorkflowTask", tag.Error(err))
@@ -790,21 +740,7 @@ func (s *integrationClustersTestSuite) TestStartWorkflowExecution_Failover_Workf
s.NoError(err)
s.Equal(1, workflowCompleteTimes)

-// update namespace to fail over
-updateReq := &workflowservice.UpdateNamespaceRequest{
-Namespace: namespace,
-ReplicationConfig: &replicationpb.NamespaceReplicationConfig{
-ActiveClusterName: clusterName[1],
-},
-}
-updateResp, err := client1.UpdateNamespace(host.NewContext(), updateReq)
-s.NoError(err)
-s.NotNil(updateResp)
-s.Equal(clusterName[1], updateResp.ReplicationConfig.GetActiveClusterName())
-s.Equal(int64(2), updateResp.GetFailoverVersion())
-
-// wait till failover completed
-time.Sleep(cacheRefreshInterval)
+s.failover(namespace, clusterName[1], int64(2), client1)

// starting the same workflow in cluster 2 is not allowed if the policy is AllowDuplicateFailedOnly
startReq.RequestId = uuid.New()
@@ -935,21 +871,7 @@ func (s *integrationClustersTestSuite) TestTerminateFailover() {
s.logger.Info("PollAndProcessWorkflowTask", tag.Error(err))
s.NoError(err)

-// update namespace to fail over
-updateReq := &workflowservice.UpdateNamespaceRequest{
-Namespace: namespace,
-ReplicationConfig: &replicationpb.NamespaceReplicationConfig{
-ActiveClusterName: clusterName[1],
-},
-}
-updateResp, err := client1.UpdateNamespace(host.NewContext(), updateReq)
-s.NoError(err)
-s.NotNil(updateResp)
-s.Equal(clusterName[1], updateResp.ReplicationConfig.GetActiveClusterName())
-s.Equal(int64(2), updateResp.GetFailoverVersion())
-
-// wait till failover completed
-time.Sleep(cacheRefreshInterval)
+s.failover(namespace, clusterName[1], int64(2), client1)

// terminate workflow at cluster 2
terminateReason := "terminate reason"
@@ -1148,21 +1070,7 @@ func (s *integrationClustersTestSuite) TestResetWorkflowFailover() {
})
s.NoError(err)

-// update namespace to fail over
-updateReq := &workflowservice.UpdateNamespaceRequest{
-Namespace: namespace,
-ReplicationConfig: &replicationpb.NamespaceReplicationConfig{
-ActiveClusterName: clusterName[1],
-},
-}
-updateResp, err := client1.UpdateNamespace(host.NewContext(), updateReq)
-s.NoError(err)
-s.NotNil(updateResp)
-s.Equal(clusterName[1], updateResp.ReplicationConfig.GetActiveClusterName())
-s.Equal(int64(2), updateResp.GetFailoverVersion())
-
-// wait till failover completed
-time.Sleep(cacheRefreshInterval)
+s.failover(namespace, clusterName[1], int64(2), client1)

_, err = poller2.PollAndProcessWorkflowTask(false, false)
s.logger.Info("PollAndProcessWorkflowTask", tag.Error(err))
@@ -1298,21 +1206,7 @@ func (s *integrationClustersTestSuite) TestContinueAsNewFailover() {
s.NoError(err, strconv.Itoa(i))
}

-// update namespace to fail over
-updateReq := &workflowservice.UpdateNamespaceRequest{
-Namespace: namespace,
-ReplicationConfig: &replicationpb.NamespaceReplicationConfig{
-ActiveClusterName: clusterName[1],
-},
-}
-updateResp, err := client1.UpdateNamespace(host.NewContext(), updateReq)
-s.NoError(err)
-s.NotNil(updateResp)
-s.Equal(clusterName[1], updateResp.ReplicationConfig.GetActiveClusterName())
-s.Equal(int64(2), updateResp.GetFailoverVersion())
-
-// wait till failover completed
-time.Sleep(cacheRefreshInterval)
+s.failover(namespace, clusterName[1], int64(2), client1)

// finish the rest in cluster 2
for i := 0; i < 2; i++ {
@@ -1438,21 +1332,7 @@ func (s *integrationClustersTestSuite) TestSignalFailover() {
s.NoError(err)
s.True(eventSignaled)

-// Update namespace to fail over
-updateReq := &workflowservice.UpdateNamespaceRequest{
-Namespace: namespace,
-ReplicationConfig: &replicationpb.NamespaceReplicationConfig{
-ActiveClusterName: clusterName[1],
-},
-}
-updateResp, err := client1.UpdateNamespace(host.NewContext(), updateReq)
-s.NoError(err)
-s.NotNil(updateResp)
-s.Equal(clusterName[1], updateResp.ReplicationConfig.GetActiveClusterName())
-s.Equal(int64(2), updateResp.GetFailoverVersion())
-
-// Wait for namespace cache to pick the change
-time.Sleep(cacheRefreshInterval)
+s.failover(namespace, clusterName[1], int64(2), client1)

// check history matched
getHistoryReq := &workflowservice.GetWorkflowExecutionHistoryRequest{
@@ -1656,21 +1536,7 @@ func (s *integrationClustersTestSuite) TestUserTimerFailover() {
}
s.True(timerCreated)

-// Update namespace to fail over
-updateReq := &workflowservice.UpdateNamespaceRequest{
-Namespace: namespace,
-ReplicationConfig: &replicationpb.NamespaceReplicationConfig{
-ActiveClusterName: clusterName[1],
-},
-}
-updateResp, err := client1.UpdateNamespace(host.NewContext(), updateReq)
-s.NoError(err)
-s.NotNil(updateResp)
-s.Equal(clusterName[1], updateResp.ReplicationConfig.GetActiveClusterName())
-s.Equal(int64(2), updateResp.GetFailoverVersion())
-
-// Wait for namespace cache to pick the change
-time.Sleep(cacheRefreshInterval)
+s.failover(namespace, clusterName[1], int64(2), client1)

for i := 1; i < 20; i++ {
if !workflowCompleted {
@@ -1830,21 +1696,7 @@ func (s *integrationClustersTestSuite) TestActivityHeartbeatFailover() {
err = poller1.PollAndProcessActivityTask(false)
s.IsType(&serviceerror.NotFound{}, err)

-// Update namespace to fail over
-updateReq := &workflowservice.UpdateNamespaceRequest{
-Namespace: namespace,
-ReplicationConfig: &replicationpb.NamespaceReplicationConfig{
-ActiveClusterName: clusterName[1],
-},
-}
-updateResp, err := client1.UpdateNamespace(host.NewContext(), updateReq)
-s.NoError(err)
-s.NotNil(updateResp)
-s.Equal(clusterName[1], updateResp.ReplicationConfig.GetActiveClusterName())
-s.Equal(int64(2), updateResp.GetFailoverVersion())
-
-// Wait for namespace cache to pick the change
-time.Sleep(cacheRefreshInterval)
+s.failover(namespace, clusterName[1], int64(2), client1)

// Make sure the heartbeat details are sent to cluster2 even when the activity at cluster1
// has heartbeat timeout. Also make sure the information is recorded when the activity state
@@ -1987,21 +1839,7 @@ func (s *integrationClustersTestSuite) TestTransientWorkflowTaskFailover() {
_, err = poller1.PollAndProcessWorkflowTask(false, false)
s.NoError(err)

-// Update namespace to fail over
-updateReq := &workflowservice.UpdateNamespaceRequest{
-Namespace: namespace,
-ReplicationConfig: &replicationpb.NamespaceReplicationConfig{
-ActiveClusterName: clusterName[1],
-},
-}
-updateResp, err := client1.UpdateNamespace(host.NewContext(), updateReq)
-s.NoError(err)
-s.NotNil(updateResp)
-s.Equal(clusterName[1], updateResp.ReplicationConfig.GetActiveClusterName())
-s.Equal(int64(2), updateResp.GetFailoverVersion())
-
-// Wait for namespace cache to pick the change
-time.Sleep(cacheRefreshInterval)
+s.failover(namespace, clusterName[1], int64(2), client1)

// for failover transient workflow task, it is guaranteed that the transient workflow task
// after the failover has attempt 1
@@ -2079,22 +1917,7 @@ func (s *integrationClustersTestSuite) TestCronWorkflowFailover() {
T: s.T(),
}

-// Failover during backoff
-// Update namespace to fail over
-updateReq := &workflowservice.UpdateNamespaceRequest{
-Namespace: namespace,
-ReplicationConfig: &replicationpb.NamespaceReplicationConfig{
-ActiveClusterName: clusterName[1],
-},
-}
-updateResp, err := client1.UpdateNamespace(host.NewContext(), updateReq)
-s.NoError(err)
-s.NotNil(updateResp)
-s.Equal(clusterName[1], updateResp.ReplicationConfig.GetActiveClusterName())
-s.Equal(int64(2), updateResp.GetFailoverVersion())
-
-// Wait for namespace cache to pick the change
-time.Sleep(cacheRefreshInterval)
+s.failover(namespace, clusterName[1], int64(2), client1)

// Run twice to make sure cron schedule is passed to standby.
for i := 0; i < 2; i++ {
@@ -2187,21 +2010,7 @@ func (s *integrationClustersTestSuite) TestWorkflowRetryFailover() {
T: s.T(),
}

-// Update namespace to fail over
-updateReq := &workflowservice.UpdateNamespaceRequest{
-Namespace: namespace,
-ReplicationConfig: &replicationpb.NamespaceReplicationConfig{
-ActiveClusterName: clusterName[1],
-},
-}
-updateResp, err := client1.UpdateNamespace(host.NewContext(), updateReq)
-s.NoError(err)
-s.NotNil(updateResp)
-s.Equal(clusterName[1], updateResp.ReplicationConfig.GetActiveClusterName())
-s.Equal(int64(2), updateResp.GetFailoverVersion())
-
-// Wait for namespace cache to pick the change
-time.Sleep(cacheRefreshInterval)
+s.failover(namespace, clusterName[1], int64(2), client1)

// First attempt
_, err = poller2.PollAndProcessWorkflowTask(false, false)
@@ -2246,3 +2055,28 @@ func (s *integrationClustersTestSuite) getHistory(client host.FrontendClient, na

return events
}

+func (s *integrationClustersTestSuite) failover(
+namespace string,
+targetCluster string,
+targetFailoverVersion int64,
+client host.FrontendClient,
+) {
+// wait for replication task propagation
+time.Sleep(4 * time.Second)
+
+// update namespace to fail over
+updateReq := &workflowservice.UpdateNamespaceRequest{
+Namespace: namespace,
+ReplicationConfig: &replicationpb.NamespaceReplicationConfig{
+ActiveClusterName: targetCluster,
+},
+}
+updateResp, err := client.UpdateNamespace(host.NewContext(), updateReq)
+s.NoError(err)
+s.Equal(targetCluster, updateResp.ReplicationConfig.GetActiveClusterName())
+s.Equal(targetFailoverVersion, updateResp.GetFailoverVersion())
+
+// wait till failover completed
+time.Sleep(cacheRefreshInterval)
+}
