diff --git a/host/activity_test.go b/host/activity_test.go index 9d343f09dd0..b8ba10b91a0 100644 --- a/host/activity_test.go +++ b/host/activity_test.go @@ -259,7 +259,7 @@ func (s *integrationSuite) TestActivityHeartbeatDetailsDuringRetry() { TaskToken: taskToken, Details: heartbeatDetails}) s.NoError(err) // Trigger heartbeat timeout and retry - time.Sleep(time.Second * 2) + time.Sleep(time.Second * 4) } else if activityExecutedCount == 1 { // return an error and retry err = errors.New("retryable-error") diff --git a/host/ndc/ndc_integration_test.go b/host/ndc/ndc_integration_test.go index 35a9108226a..fe2f7cf47dd 100644 --- a/host/ndc/ndc_integration_test.go +++ b/host/ndc/ndc_integration_test.go @@ -997,15 +997,30 @@ func (s *nDCIntegrationTestSuite) TestEventsReapply_ZombieWorkflow() { s.generator = test.InitializeHistoryEventGenerator(s.namespace, version) // verify two batches of zombie workflow are call reapply API - s.mockAdminClient["standby"].(*adminservicemock.MockAdminServiceClient).EXPECT().ReapplyEvents(gomock.Any(), gomock.Any()).Return(&adminservice.ReapplyEventsResponse{}, nil).Times(2) + reapplyCount := 0 for i := 0; i < 2 && s.generator.HasNextVertex(); i++ { events := s.generator.GetNextVertices() historyEvents := &historypb.History{} + reapply := false for _, event := range events { - historyEvents.Events = append(historyEvents.Events, event.GetData().(*historypb.HistoryEvent)) + historyEvent := event.GetData().(*historypb.HistoryEvent) + if historyEvent.GetEventType() == enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED { + reapply = true + } + historyEvents.Events = append(historyEvents.Events, historyEvent) + } + if reapply { + reapplyCount += 1 } historyBatch = append(historyBatch, historyEvents) } + s.mockAdminClient["standby"].(*adminservicemock.MockAdminServiceClient).EXPECT().ReapplyEvents( + gomock.Any(), + gomock.Any(), + ).Return( + &adminservice.ReapplyEventsResponse{}, + nil, + ).Times(reapplyCount * 2) versionHistory = s.eventBatchesToVersionHistory(nil, historyBatch) s.applyEvents( diff --git a/host/xdc/integration_failover_test.go b/host/xdc/integration_failover_test.go index a1ff7e2df9b..c139576488a 100644 --- a/host/xdc/integration_failover_test.go +++ b/host/xdc/integration_failover_test.go @@ -160,18 +160,7 @@ func (s *integrationClustersTestSuite) TestNamespaceFailover() { s.NotNil(resp2) s.Equal(resp, resp2) - // update namespace to fail over - updateReq := &workflowservice.UpdateNamespaceRequest{ - Namespace: namespace, - ReplicationConfig: &replicationpb.NamespaceReplicationConfig{ - ActiveClusterName: clusterName[1], - }, - } - updateResp, err := client1.UpdateNamespace(host.NewContext(), updateReq) - s.NoError(err) - s.NotNil(updateResp) - s.Equal(clusterName[1], updateResp.ReplicationConfig.GetActiveClusterName()) - s.Equal(int64(2), updateResp.GetFailoverVersion()) + s.failover(namespace, clusterName[1], int64(2), client1) updated := false var resp3 *workflowservice.DescribeNamespaceResponse @@ -417,21 +406,7 @@ func (s *integrationClustersTestSuite) TestSimpleWorkflowFailover() { s.NoError(err) s.Equal("query-result", queryResultString) - // update namespace to fail over - updateReq := &workflowservice.UpdateNamespaceRequest{ - Namespace: namespace, - ReplicationConfig: &replicationpb.NamespaceReplicationConfig{ - ActiveClusterName: clusterName[1], - }, - } - updateResp, err := client1.UpdateNamespace(host.NewContext(), updateReq) - s.NoError(err) - s.NotNil(updateResp) - s.Equal(clusterName[1], updateResp.ReplicationConfig.GetActiveClusterName()) - s.Equal(int64(2), updateResp.GetFailoverVersion()) - - // wait till failover completed - time.Sleep(cacheRefreshInterval) + s.failover(namespace, clusterName[1], int64(2), client1) // check history matched getHistoryReq := &workflowservice.GetWorkflowExecutionHistoryRequest{ @@ -645,21 +620,7 @@ func (s *integrationClustersTestSuite) TestStickyWorkflowTaskFailover() { }) s.NoError(err) - // Update namespace to fail over - updateReq := &workflowservice.UpdateNamespaceRequest{ - Namespace: namespace, - ReplicationConfig: &replicationpb.NamespaceReplicationConfig{ - ActiveClusterName: clusterName[1], - }, - } - updateResp, err := client1.UpdateNamespace(host.NewContext(), updateReq) - s.NoError(err) - s.NotNil(updateResp) - s.Equal(clusterName[1], updateResp.ReplicationConfig.GetActiveClusterName()) - s.Equal(int64(2), updateResp.GetFailoverVersion()) - - // Wait for namespace cache to pick the change - time.Sleep(cacheRefreshInterval) + s.failover(namespace, clusterName[1], int64(2), client1) _, err = poller2.PollAndProcessWorkflowTaskWithAttemptAndRetry(false, false, false, true, 1, 5) s.logger.Info("PollAndProcessWorkflowTask", tag.Error(err)) @@ -678,18 +639,7 @@ func (s *integrationClustersTestSuite) TestStickyWorkflowTaskFailover() { }) s.NoError(err) - // Update namespace to fail over back - updateReq = &workflowservice.UpdateNamespaceRequest{ - Namespace: namespace, - ReplicationConfig: &replicationpb.NamespaceReplicationConfig{ - ActiveClusterName: clusterName[0], - }, - } - updateResp, err = client2.UpdateNamespace(host.NewContext(), updateReq) - s.NoError(err) - s.NotNil(updateResp) - s.Equal(clusterName[0], updateResp.ReplicationConfig.GetActiveClusterName()) - s.Equal(int64(11), updateResp.GetFailoverVersion()) + s.failover(namespace, clusterName[0], int64(11), client2) _, err = poller1.PollAndProcessWorkflowTask(true, false) s.logger.Info("PollAndProcessWorkflowTask", tag.Error(err)) @@ -790,21 +740,7 @@ func (s *integrationClustersTestSuite) TestStartWorkflowExecution_Failover_Workf s.NoError(err) s.Equal(1, workflowCompleteTimes) - // update namespace to fail over - updateReq := &workflowservice.UpdateNamespaceRequest{ - Namespace: namespace, - ReplicationConfig: &replicationpb.NamespaceReplicationConfig{ - ActiveClusterName: clusterName[1], - }, - } - updateResp, err := client1.UpdateNamespace(host.NewContext(), updateReq) - s.NoError(err) - s.NotNil(updateResp) - s.Equal(clusterName[1], updateResp.ReplicationConfig.GetActiveClusterName()) - s.Equal(int64(2), updateResp.GetFailoverVersion()) - - // wait till failover completed - time.Sleep(cacheRefreshInterval) + s.failover(namespace, clusterName[1], int64(2), client1) // start the same workflow in cluster 2 is not allowed if policy is AllowDuplicateFailedOnly startReq.RequestId = uuid.New() @@ -935,21 +871,7 @@ func (s *integrationClustersTestSuite) TestTerminateFailover() { s.logger.Info("PollAndProcessWorkflowTask", tag.Error(err)) s.NoError(err) - // update namespace to fail over - updateReq := &workflowservice.UpdateNamespaceRequest{ - Namespace: namespace, - ReplicationConfig: &replicationpb.NamespaceReplicationConfig{ - ActiveClusterName: clusterName[1], - }, - } - updateResp, err := client1.UpdateNamespace(host.NewContext(), updateReq) - s.NoError(err) - s.NotNil(updateResp) - s.Equal(clusterName[1], updateResp.ReplicationConfig.GetActiveClusterName()) - s.Equal(int64(2), updateResp.GetFailoverVersion()) - - // wait till failover completed - time.Sleep(cacheRefreshInterval) + s.failover(namespace, clusterName[1], int64(2), client1) // terminate workflow at cluster 2 terminateReason := "terminate reason" @@ -1148,21 +1070,7 @@ func (s *integrationClustersTestSuite) TestResetWorkflowFailover() { }) s.NoError(err) - // update namespace to fail over - updateReq := &workflowservice.UpdateNamespaceRequest{ - Namespace: namespace, - ReplicationConfig: &replicationpb.NamespaceReplicationConfig{ - ActiveClusterName: clusterName[1], - }, - } - updateResp, err := client1.UpdateNamespace(host.NewContext(), updateReq) - s.NoError(err) - s.NotNil(updateResp) - s.Equal(clusterName[1], updateResp.ReplicationConfig.GetActiveClusterName()) - s.Equal(int64(2), updateResp.GetFailoverVersion()) - - // wait till failover completed - time.Sleep(cacheRefreshInterval) + s.failover(namespace, clusterName[1], int64(2), client1) _, err = poller2.PollAndProcessWorkflowTask(false, false) s.logger.Info("PollAndProcessWorkflowTask", tag.Error(err)) @@ -1298,21 +1206,7 @@ func (s *integrationClustersTestSuite) TestContinueAsNewFailover() { s.NoError(err, strconv.Itoa(i)) } - // update namespace to fail over - updateReq := &workflowservice.UpdateNamespaceRequest{ - Namespace: namespace, - ReplicationConfig: &replicationpb.NamespaceReplicationConfig{ - ActiveClusterName: clusterName[1], - }, - } - updateResp, err := client1.UpdateNamespace(host.NewContext(), updateReq) - s.NoError(err) - s.NotNil(updateResp) - s.Equal(clusterName[1], updateResp.ReplicationConfig.GetActiveClusterName()) - s.Equal(int64(2), updateResp.GetFailoverVersion()) - - // wait till failover completed - time.Sleep(cacheRefreshInterval) + s.failover(namespace, clusterName[1], int64(2), client1) // finish the rest in cluster 2 for i := 0; i < 2; i++ { @@ -1438,21 +1332,7 @@ func (s *integrationClustersTestSuite) TestSignalFailover() { s.NoError(err) s.True(eventSignaled) - // Update namespace to fail over - updateReq := &workflowservice.UpdateNamespaceRequest{ - Namespace: namespace, - ReplicationConfig: &replicationpb.NamespaceReplicationConfig{ - ActiveClusterName: clusterName[1], - }, - } - updateResp, err := client1.UpdateNamespace(host.NewContext(), updateReq) - s.NoError(err) - s.NotNil(updateResp) - s.Equal(clusterName[1], updateResp.ReplicationConfig.GetActiveClusterName()) - s.Equal(int64(2), updateResp.GetFailoverVersion()) - - // Wait for namespace cache to pick the change - time.Sleep(cacheRefreshInterval) + s.failover(namespace, clusterName[1], int64(2), client1) // check history matched getHistoryReq := &workflowservice.GetWorkflowExecutionHistoryRequest{ @@ -1656,21 +1536,7 @@ func (s *integrationClustersTestSuite) TestUserTimerFailover() { } s.True(timerCreated) - // Update namespace to fail over - updateReq := &workflowservice.UpdateNamespaceRequest{ - Namespace: namespace, - ReplicationConfig: &replicationpb.NamespaceReplicationConfig{ - ActiveClusterName: clusterName[1], - }, - } - updateResp, err := client1.UpdateNamespace(host.NewContext(), updateReq) - s.NoError(err) - s.NotNil(updateResp) - s.Equal(clusterName[1], updateResp.ReplicationConfig.GetActiveClusterName()) - s.Equal(int64(2), updateResp.GetFailoverVersion()) - - // Wait for namespace cache to pick the change - time.Sleep(cacheRefreshInterval) + s.failover(namespace, clusterName[1], int64(2), client1) for i := 1; i < 20; i++ { if !workflowCompleted { @@ -1830,21 +1696,7 @@ func (s *integrationClustersTestSuite) TestActivityHeartbeatFailover() { err = poller1.PollAndProcessActivityTask(false) s.IsType(&serviceerror.NotFound{}, err) - // Update namespace to fail over - updateReq := &workflowservice.UpdateNamespaceRequest{ - Namespace: namespace, - ReplicationConfig: &replicationpb.NamespaceReplicationConfig{ - ActiveClusterName: clusterName[1], - }, - } - updateResp, err := client1.UpdateNamespace(host.NewContext(), updateReq) - s.NoError(err) - s.NotNil(updateResp) - s.Equal(clusterName[1], updateResp.ReplicationConfig.GetActiveClusterName()) - s.Equal(int64(2), updateResp.GetFailoverVersion()) - - // Wait for namespace cache to pick the change - time.Sleep(cacheRefreshInterval) + s.failover(namespace, clusterName[1], int64(2), client1) // Make sure the heartbeat details are sent to cluster2 even when the activity at cluster1 // has heartbeat timeout. Also make sure the information is recorded when the activity state @@ -1987,21 +1839,7 @@ func (s *integrationClustersTestSuite) TestTransientWorkflowTaskFailover() { _, err = poller1.PollAndProcessWorkflowTask(false, false) s.NoError(err) - // Update namespace to fail over - updateReq := &workflowservice.UpdateNamespaceRequest{ - Namespace: namespace, - ReplicationConfig: &replicationpb.NamespaceReplicationConfig{ - ActiveClusterName: clusterName[1], - }, - } - updateResp, err := client1.UpdateNamespace(host.NewContext(), updateReq) - s.NoError(err) - s.NotNil(updateResp) - s.Equal(clusterName[1], updateResp.ReplicationConfig.GetActiveClusterName()) - s.Equal(int64(2), updateResp.GetFailoverVersion()) - - // Wait for namespace cache to pick the change - time.Sleep(cacheRefreshInterval) + s.failover(namespace, clusterName[1], int64(2), client1) // for failover transient workflow task, it is guaranteed that the transient workflow task // after the failover has attempt 1 @@ -2079,22 +1917,7 @@ func (s *integrationClustersTestSuite) TestCronWorkflowFailover() { T: s.T(), } - // Failover during backoff - // Update namespace to fail over - updateReq := &workflowservice.UpdateNamespaceRequest{ - Namespace: namespace, - ReplicationConfig: &replicationpb.NamespaceReplicationConfig{ - ActiveClusterName: clusterName[1], - }, - } - updateResp, err := client1.UpdateNamespace(host.NewContext(), updateReq) - s.NoError(err) - s.NotNil(updateResp) - s.Equal(clusterName[1], updateResp.ReplicationConfig.GetActiveClusterName()) - s.Equal(int64(2), updateResp.GetFailoverVersion()) - - // Wait for namespace cache to pick the change - time.Sleep(cacheRefreshInterval) + s.failover(namespace, clusterName[1], int64(2), client1) // Run twice to make sure cron schedule is passed to standby. for i := 0; i < 2; i++ { @@ -2187,21 +2010,7 @@ func (s *integrationClustersTestSuite) TestWorkflowRetryFailover() { T: s.T(), } - // Update namespace to fail over - updateReq := &workflowservice.UpdateNamespaceRequest{ - Namespace: namespace, - ReplicationConfig: &replicationpb.NamespaceReplicationConfig{ - ActiveClusterName: clusterName[1], - }, - } - updateResp, err := client1.UpdateNamespace(host.NewContext(), updateReq) - s.NoError(err) - s.NotNil(updateResp) - s.Equal(clusterName[1], updateResp.ReplicationConfig.GetActiveClusterName()) - s.Equal(int64(2), updateResp.GetFailoverVersion()) - - // Wait for namespace cache to pick the change - time.Sleep(cacheRefreshInterval) + s.failover(namespace, clusterName[1], int64(2), client1) // First attempt _, err = poller2.PollAndProcessWorkflowTask(false, false) @@ -2246,3 +2055,28 @@ func (s *integrationClustersTestSuite) getHistory(client host.FrontendClient, na return events } + +func (s *integrationClustersTestSuite) failover( + namespace string, + targetCluster string, + targetFailoverVersion int64, + client host.FrontendClient, +) { + // wait for replication task propagation + time.Sleep(4 * time.Second) + + // update namespace to fail over + updateReq := &workflowservice.UpdateNamespaceRequest{ + Namespace: namespace, + ReplicationConfig: &replicationpb.NamespaceReplicationConfig{ + ActiveClusterName: targetCluster, + }, + } + updateResp, err := client.UpdateNamespace(host.NewContext(), updateReq) + s.NoError(err) + s.Equal(targetCluster, updateResp.ReplicationConfig.GetActiveClusterName()) + s.Equal(targetFailoverVersion, updateResp.GetFailoverVersion()) + + // wait till failover completed + time.Sleep(cacheRefreshInterval) +}