Skip to content

Commit

Permalink
Fail force-replication early if any GenerateReplicationTasks/Generate…
Browse files Browse the repository at this point in the history
…AndVerifyReplicationTasks activity returns error
  • Loading branch information
hehaifengcn committed Jul 18, 2023
1 parent 45d340d commit ca5bf4e
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 2 deletions.
2 changes: 1 addition & 1 deletion service/worker/migration/activities.go
Expand Up @@ -645,7 +645,7 @@ func (a *activities) verifyReplicationTasks(
return false, progress, nil

default:
return false, progress, err
return false, progress, fmt.Errorf("remoteClient.DescribeMutableState failed. error: %v", err)
}
}

Expand Down
10 changes: 9 additions & 1 deletion service/worker/migration/force_replication_workflow.go
Expand Up @@ -371,6 +371,7 @@ func enqueueReplicationTasks(ctx workflow.Context, workflowExecutionsCh workflow
var a *activities
var futures []workflow.Future
var workflowExecutions []commonpb.WorkflowExecution
var lastActivityErr error

for workflowExecutionsCh.Receive(ctx, &workflowExecutions) {
var replicationTaskFuture workflow.Future
Expand All @@ -394,10 +395,17 @@ func enqueueReplicationTasks(ctx workflow.Context, workflowExecutionsCh workflow
pendingActivities++
selector.AddFuture(replicationTaskFuture, func(f workflow.Future) {
pendingActivities--

if err := f.Get(ctx, nil); err != nil {
lastActivityErr = err
}
})

if pendingActivities == params.ConcurrentActivityCount {
if pendingActivities >= params.ConcurrentActivityCount {
selector.Select(ctx) // this will block until one of the in-flight activities completes
if lastActivityErr != nil {
return lastActivityErr
}
}

futures = append(futures, replicationTaskFuture)
Expand Down

0 comments on commit ca5bf4e

Please sign in to comment.