From 194a8938d25d182955099760a1466305a425f2aa Mon Sep 17 00:00:00 2001 From: Haifeng He Date: Fri, 11 Aug 2023 15:36:00 -0700 Subject: [PATCH] Skip verifying workflow which has already passed retention time (#4770) **What changed?** Previous logic skip workflow of which retention is within a range of current time. But we actually just need to skip workflow already passed retention time. **Why?** **How did you test it?** **Potential risks** **Is hotfix candidate?** --- common/metrics/metric_defs.go | 18 ++++++++-------- service/worker/migration/activities.go | 10 ++++----- service/worker/migration/activities_test.go | 24 --------------------- 3 files changed, 13 insertions(+), 39 deletions(-) diff --git a/common/metrics/metric_defs.go b/common/metrics/metric_defs.go index bda42d7d5ae..3272e222fed 100644 --- a/common/metrics/metric_defs.go +++ b/common/metrics/metric_defs.go @@ -1658,15 +1658,15 @@ var ( ScheduleTerminateWorkflowErrors = NewCounterDef("schedule_terminate_workflow_errors") // Force replication - EncounterZombieWorkflowCount = NewCounterDef("encounter_zombie_workflow_count") - EncounterNotFoundWorkflowCount = NewCounterDef("encounter_not_found_workflow_count") - EncounterCloseToRetentionWorkflowCount = NewCounterDef("encounter_close_to_retention_workflow_count") - GenerateReplicationTasksLatency = NewTimerDef("generate_replication_tasks_latency") - VerifyReplicationTaskSuccess = NewCounterDef("verify_replication_task_success") - VerifyReplicationTaskNotFound = NewCounterDef("verify_replication_task_not_found") - VerifyReplicationTaskFailed = NewCounterDef("verify_replication_task_failed") - VerifyReplicationTasksLatency = NewTimerDef("verify_replication_tasks_latency") - VerifyDescribeMutableStateLatency = NewTimerDef("verify_describe_mutable_state_latency") + EncounterZombieWorkflowCount = NewCounterDef("encounter_zombie_workflow_count") + EncounterNotFoundWorkflowCount = NewCounterDef("encounter_not_found_workflow_count") + EncounterPassRetentionWorkflowCount = NewCounterDef("encounter_pass_retention_workflow_count") + GenerateReplicationTasksLatency = NewTimerDef("generate_replication_tasks_latency") + VerifyReplicationTaskSuccess = NewCounterDef("verify_replication_task_success") + VerifyReplicationTaskNotFound = NewCounterDef("verify_replication_task_not_found") + VerifyReplicationTaskFailed = NewCounterDef("verify_replication_task_failed") + VerifyReplicationTasksLatency = NewTimerDef("verify_replication_tasks_latency") + VerifyDescribeMutableStateLatency = NewTimerDef("verify_describe_mutable_state_latency") // Replication NamespaceReplicationTaskAckLevelGauge = NewGaugeDef("namespace_replication_task_ack_level") diff --git a/service/worker/migration/activities.go b/service/worker/migration/activities.go index 88413a3961d..3622fd1c77b 100644 --- a/service/worker/migration/activities.go +++ b/service/worker/migration/activities.go @@ -572,13 +572,11 @@ func (a *activities) canSkipWorkflowExecution( return true, reasonZombieWorkflow, nil } - // source and target cluster handles workflow retention separately. For a workflow which is abort to be deleted, - // it may be already deleted on target cluster but still exist on source, which cause verification delay. - // Here, we skip workflow of which retention time is close to current time to continue the verification. + // Skip verifying workflow which has already passed retention time. if closeTime := resp.GetDatabaseMutableState().GetExecutionInfo().GetCloseTime(); closeTime != nil && ns != nil && ns.Retention() > 0 { deleteTime := closeTime.Add(ns.Retention()) - if isCloseToCurrentTime(deleteTime, request.RetentionBiasDuration) { - a.forceReplicationMetricsHandler.Counter(metrics.EncounterCloseToRetentionWorkflowCount.GetMetricName()).Record(1) + if deleteTime.Before(time.Now()) { + a.forceReplicationMetricsHandler.Counter(metrics.EncounterPassRetentionWorkflowCount.GetMetricName()).Record(1) return true, reasonWorkflowCloseToRetention, nil } } @@ -644,7 +642,7 @@ func (a *activities) verifyReplicationTasks( } const ( - defaultNoProgressNotRetryableTimeout = 15 * time.Minute + defaultNoProgressNotRetryableTimeout = 30 * time.Minute ) func (a *activities) VerifyReplicationTasks(ctx context.Context, request *verifyReplicationTasksRequest) (verifyReplicationTasksResponse, error) { diff --git a/service/worker/migration/activities_test.go b/service/worker/migration/activities_test.go index aac853244a5..918041c785e 100644 --- a/service/worker/migration/activities_test.go +++ b/service/worker/migration/activities_test.go @@ -451,14 +451,6 @@ func (s *activitiesSuite) Test_verifyReplicationTasks() { } } -// Now -// │ │ bias │ -// ───────────────────┼────────▼─────────┼────── -// closeTime │ Skip Range │ -// │ deleteTime -// └───────────────────┘ -// retention - func (s *activitiesSuite) Test_verifyReplicationTasksSkipRetention() { bias := time.Minute request := verifyReplicationTasksRequest{ @@ -479,14 +471,6 @@ func (s *activitiesSuite) Test_verifyReplicationTasksSkipRetention() { }, { 30 * time.Second, - true, - }, - { - -(bias + time.Minute), - false, - }, - { - bias + time.Minute, false, }, } @@ -618,11 +602,3 @@ func (s *activitiesSuite) TestGenerateReplicationTasks_Failed() { // Only the generation of 1st execution suceeded. s.Equal(0, lastHeartBeat) } - -func (s *activitiesSuite) Test_isCloseToCurrentTime() { - d := time.Minute - now := time.Now() - s.True(isCloseToCurrentTime(now, d)) - s.False(isCloseToCurrentTime(now.Add(2*time.Minute), d)) - s.False(isCloseToCurrentTime(now.Add(-2*time.Minute), d)) -}