Skip to content

Commit

Permalink
Execute VerifyReplicationTasks as an individual activity (#4656)
Browse files Browse the repository at this point in the history
<!-- Describe what has changed in this PR -->
**What changed?**
Split the GenerateAndVerifyReplicationTasks activity into two activities:
GenerateReplicationTasks (reusing the existing one) and VerifyReplicationTasks.

<!-- Tell your future self why have you made these changes -->
**Why?**
Based on cluster tests, GenerateReplicationTasks is expensive (10ms
latency per `GenerateLastHistoryReplicationTasks` call). In the previous
implementation, verification ran after GenerateReplicationTasks within the
same activity, so we only got ~60 RPS for GenerateAndVerifyReplicationTasks.
By splitting the two, a single VerifyReplicationTasks activity can achieve
~100 RPS (the bottleneck is still GenerateReplicationTasks because of its
10ms latency).

Also moved the special handling of a workflow that is not found on the target
into VerifyReplicationTasks, which reduces the number of `DescribeMutableState`
calls on the source cluster. In the previous implementation, `DescribeMutableState`
was called for every replication task. Now we only call
`DescribeMutableState` if the workflow was not found on the target (which should be
rare in steady state). The downside is that we can potentially
replicate a Zombie workflow from source to target, but this should be avoidable by
eliminating Zombies during the migration process (i.e., deleting the workflow on
the target if migration is incomplete).

<!-- How have you verified this change? Tested locally? Added a unit
test? Checked in staging env? -->
**How did you test it?**
Unit test & cluster tests. 

<!-- Assuming the worst case, what can be broken when deploying this
change to production? -->
**Potential risks**
Low. The feature is disabled by default and only affects the force
replication workflow.

<!-- Is this PR a hotfix candidate or require that a notification be
sent to the broader community? (Yes/No) -->
**Is hotfix candidate?**
No.
  • Loading branch information
hehaifengcn committed Jul 22, 2023
1 parent 132ed5b commit 4644dc9
Show file tree
Hide file tree
Showing 5 changed files with 349 additions and 148 deletions.
11 changes: 7 additions & 4 deletions common/metrics/metric_defs.go
Expand Up @@ -1656,10 +1656,13 @@ var (
ScheduleTerminateWorkflowErrors = NewCounterDef("schedule_terminate_workflow_errors")

// Force replication
EncounterZombieWorkflowCount = NewCounterDef("encounter_zombie_workflow_count")
CreateReplicationTasksLatency = NewTimerDef("create_replication_tasks_latency")
VerifyReplicationTaskSuccess = NewCounterDef("verify_replication_task_success")
VerifyReplicationTasksLatency = NewTimerDef("verify_replication_tasks_latency")
EncounterZombieWorkflowCount = NewCounterDef("encounter_zombie_workflow_count")
GenerateReplicationTasksLatency = NewTimerDef("generate_replication_tasks_latency")
VerifyReplicationTaskSuccess = NewCounterDef("verify_replication_task_success")
VerifyReplicationTaskNotFound = NewCounterDef("verify_replication_task_not_found")
VerifyReplicationTaskFailed = NewCounterDef("verify_replication_task_failed")
VerifyReplicationTasksLatency = NewTimerDef("verify_replication_tasks_latency")
VerifyDescribeMutableStateLatency = NewTimerDef("verify_describe_mutable_state_latency")

// Replication
NamespaceReplicationTaskAckLevelGauge = NewGaugeDef("namespace_replication_task_ack_level")
Expand Down
149 changes: 62 additions & 87 deletions service/worker/migration/activities.go
Expand Up @@ -75,30 +75,22 @@ type (

// State Diagram
//
// NOT_CREATED
// │
// │
// CREATED_TO_BE_VERIFIED
// NOT_VERIFIED
// │
// ┌────────┴─────────┐
// │ │
// VERIFIED VERIFY_SKIPPED
const (
NOT_CREATED VerifyStatus = 0
CREATED_TO_BE_VERIFIED VerifyStatus = 1
VERIFIED VerifyStatus = 2
VERIFY_SKIPPED VerifyStatus = 3
NOT_VERIFIED VerifyStatus = 0
VERIFIED VerifyStatus = 1
VERIFY_SKIPPED VerifyStatus = 2

reasonZombieWorkflow = "Zombie workflow"
reasonWorkflowNotFound = "Workflow not found"
)

func (r VerifyResult) isNotCreated() bool {
return r.Status == NOT_CREATED
}

func (r VerifyResult) isCreatedToBeVerified() bool {
return r.Status == CREATED_TO_BE_VERIFIED
func (r VerifyResult) isNotVerified() bool {
return r.Status == NOT_VERIFIED
}

func (r VerifyResult) isVerified() bool {
Expand Down Expand Up @@ -436,6 +428,11 @@ func (a *activities) GenerateReplicationTasks(ctx context.Context, request *gene
ctx = a.setCallerInfoForGenReplicationTask(ctx, namespace.ID(request.NamespaceID))
rateLimiter := quotas.NewRateLimiter(request.RPS, int(math.Ceil(request.RPS)))

start := time.Now()
defer func() {
a.forceReplicationMetricsHandler.Timer(metrics.GenerateReplicationTasksLatency.GetMetricName()).Record(time.Since(start))
}()

startIndex := 0
if activity.HasHeartbeatDetails(ctx) {
var finishedIndex int
Expand All @@ -447,11 +444,12 @@ func (a *activities) GenerateReplicationTasks(ctx context.Context, request *gene
for i := startIndex; i < len(request.Executions); i++ {
we := request.Executions[i]
if err := a.generateWorkflowReplicationTask(ctx, rateLimiter, definition.NewWorkflowKey(request.NamespaceID, we.WorkflowId, we.RunId)); err != nil {
if _, isNotFound := err.(*serviceerror.NotFound); !isNotFound {
if !isNotFoundServiceError(err) {
a.logger.Error("force-replication failed to generate replication task", tag.WorkflowNamespaceID(request.NamespaceID), tag.WorkflowID(we.WorkflowId), tag.WorkflowRunID(we.RunId), tag.Error(err))
return err
}
}

activity.RecordHeartbeat(ctx, i)
}

Expand Down Expand Up @@ -550,71 +548,47 @@ func (a *activities) SeedReplicationQueueWithUserDataEntries(ctx context.Context
}
}

func (a *activities) createReplicationTasks(ctx context.Context, request *genearteAndVerifyReplicationTasksRequest, detail *replicationTasksHeartbeatDetails) error {
start := time.Now()
defer func() {
a.forceReplicationMetricsHandler.Timer(metrics.CreateReplicationTasksLatency.GetMetricName()).Record(time.Since(start))
}()
// isNotFoundServiceError reports whether err's dynamic type is exactly
// *serviceerror.NotFound. The check looks only at err itself; it does not
// unwrap wrapped errors.
func isNotFoundServiceError(err error) bool {
	switch err.(type) {
	case *serviceerror.NotFound:
		return true
	default:
		return false
	}
}

rateLimiter := quotas.NewRateLimiter(request.RPS, int(math.Ceil(request.RPS)))
func (a *activities) verifyHandleNotFoundWorkflow(
ctx context.Context,
namespaceID string,
we *commonpb.WorkflowExecution,
result *VerifyResult,
) error {
tags := []tag.Tag{tag.WorkflowType(forceReplicationWorkflowName), tag.WorkflowNamespaceID(namespaceID), tag.WorkflowID(we.WorkflowId), tag.WorkflowRunID(we.RunId)}
resp, err := a.historyClient.DescribeMutableState(ctx, &historyservice.DescribeMutableStateRequest{
NamespaceId: namespaceID,
Execution: we,
})

for i := 0; i < len(request.Executions); i++ {
r := &detail.Results[i]
if r.isCompleted() {
continue
if err != nil {
if isNotFoundServiceError(err) {
// Workflow could be deleted due to retention.
result.Status = VERIFY_SKIPPED
result.Reason = reasonWorkflowNotFound
return nil
}

we := request.Executions[i]
tags := []tag.Tag{tag.WorkflowType(forceReplicationWorkflowName), tag.WorkflowNamespaceID(request.NamespaceID), tag.WorkflowID(we.WorkflowId), tag.WorkflowRunID(we.RunId)}

resp, err := a.historyClient.DescribeMutableState(ctx, &historyservice.DescribeMutableStateRequest{
NamespaceId: request.NamespaceID,
Execution: &we,
})

switch err.(type) {
case nil:
if resp.GetDatabaseMutableState().GetExecutionState().GetState() == enumsspb.WORKFLOW_EXECUTION_STATE_ZOMBIE {
a.forceReplicationMetricsHandler.Counter(metrics.EncounterZombieWorkflowCount.GetMetricName()).Record(1)
a.logger.Info("createReplicationTasks skip Zombie workflow", tags...)

r.Status = VERIFY_SKIPPED
r.Reason = reasonZombieWorkflow
continue
}

// Only create replication task if it hasn't been already created
if r.isNotCreated() {
err := a.generateWorkflowReplicationTask(ctx, rateLimiter, definition.NewWorkflowKey(request.NamespaceID, we.WorkflowId, we.RunId))

switch err.(type) {
case nil:
r.Status = CREATED_TO_BE_VERIFIED
case *serviceerror.NotFound:
// rare case but in case if execution was deleted after above DescribeMutableState
r.Status = VERIFY_SKIPPED
r.Reason = reasonWorkflowNotFound
default:
a.logger.Error(fmt.Sprintf("createReplicationTasks failed to generate replication task. Error: %v", err), tags...)
return err
}
}

case *serviceerror.NotFound:
r.Status = VERIFY_SKIPPED
r.Reason = reasonWorkflowNotFound
return err
}

default:
return err
}
if resp.GetDatabaseMutableState().GetExecutionState().GetState() == enumsspb.WORKFLOW_EXECUTION_STATE_ZOMBIE {
a.forceReplicationMetricsHandler.Counter(metrics.EncounterZombieWorkflowCount.GetMetricName()).Record(1)
a.logger.Info("createReplicationTasks skip Zombie workflow", tags...)
result.Status = VERIFY_SKIPPED
result.Reason = reasonZombieWorkflow
}

return nil
}

func (a *activities) verifyReplicationTasks(
ctx context.Context,
request *genearteAndVerifyReplicationTasksRequest,
request *verifyReplicationTasksRequest,
detail *replicationTasksHeartbeatDetails,
remoteClient adminservice.AdminServiceClient,
) (verified bool, progress bool, err error) {
Expand All @@ -627,32 +601,41 @@ func (a *activities) verifyReplicationTasks(
for i := 0; i < len(request.Executions); i++ {
r := &detail.Results[i]
we := request.Executions[i]
if r.isNotCreated() {
// invalid state
return false, progress, temporal.NewNonRetryableApplicationError(fmt.Sprintf("verifyReplicationTasks: replication task for %v was not created", we), "", nil)
}

if r.isCompleted() {
continue
}

s := time.Now()
// Check if execution exists on remote cluster
_, err := remoteClient.DescribeMutableState(ctx, &adminservice.DescribeMutableStateRequest{
Namespace: request.Namespace,
Execution: &we,
})
a.forceReplicationMetricsHandler.Timer(metrics.VerifyDescribeMutableStateLatency.GetMetricName()).Record(time.Since(s))

switch err.(type) {
case nil:
a.forceReplicationMetricsHandler.Counter(metrics.VerifyReplicationTaskSuccess.GetMetricName()).Record(1)
a.forceReplicationMetricsHandler.WithTags(metrics.NamespaceTag(request.Namespace)).Counter(metrics.VerifyReplicationTaskSuccess.GetMetricName()).Record(1)
r.Status = VERIFIED
progress = true

case *serviceerror.NotFound:
detail.LastNotFoundWorkflowExecution = we
return false, progress, nil
a.forceReplicationMetricsHandler.WithTags(metrics.NamespaceTag(request.Namespace)).Counter(metrics.VerifyReplicationTaskNotFound.GetMetricName()).Record(1)
if err := a.verifyHandleNotFoundWorkflow(ctx, request.NamespaceID, &we, r); err != nil {
return false, progress, err
}

if r.isNotVerified() {
detail.LastNotFoundWorkflowExecution = we
return false, progress, nil
}

progress = true

default:
a.forceReplicationMetricsHandler.WithTags(metrics.NamespaceTag(request.Namespace), metrics.ServiceErrorTypeTag(err)).
Counter(metrics.VerifyReplicationTaskFailed.GetMetricName()).Record(1)

return false, progress, errors.WithMessage(err, "remoteClient.DescribeMutableState call failed")
}
}
Expand All @@ -665,7 +648,7 @@ const (
defaultNoProgressNotRetryableTimeout = 15 * time.Minute
)

func (a *activities) GenerateAndVerifyReplicationTasks(ctx context.Context, request *genearteAndVerifyReplicationTasksRequest) error {
func (a *activities) VerifyReplicationTasks(ctx context.Context, request *verifyReplicationTasksRequest) error {
ctx = headers.SetCallerInfo(ctx, headers.NewPreemptableCallerInfo(request.Namespace))
remoteClient := a.clientFactory.NewRemoteAdminClientWithTimeout(
request.TargetClusterEndpoint,
Expand All @@ -684,12 +667,6 @@ func (a *activities) GenerateAndVerifyReplicationTasks(ctx context.Context, requ
activity.RecordHeartbeat(ctx, details)
}

if err := a.createReplicationTasks(ctx, request, &details); err != nil {
return err
}

activity.RecordHeartbeat(ctx, details)

// Verify if replication tasks exist on target cluster. There are several cases where execution was not found on target cluster.
// 1. replication lag
// 2. Zombie workflow execution
Expand All @@ -704,10 +681,8 @@ func (a *activities) GenerateAndVerifyReplicationTasks(ctx context.Context, requ
// - more than NonRetryableTimeout, it means we potentially encountered #4. The activity returns
// a non-retryable error and the force-replication workflow will be restarted.
for {
var verified, progress bool
var err error

if verified, progress, err = a.verifyReplicationTasks(ctx, request, &details, remoteClient); err != nil {
verified, progress, err := a.verifyReplicationTasks(ctx, request, &details, remoteClient)
if err != nil {
return err
}

Expand Down

0 comments on commit 4644dc9

Please sign in to comment.