Skip to content

Commit

Permalink
Limit number of delete workflow executions retries (#2768)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexshtin committed Apr 29, 2022
1 parent f84d6d0 commit 2a938b8
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 10 deletions.
20 changes: 11 additions & 9 deletions service/worker/deletenamespace/reclaimresources/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,13 @@ import (
)

const (
// WorkflowName is the workflow name.
WorkflowName = "temporal-sys-reclaim-namespace-resources-workflow"

namespaceCacheRefreshDelay = 11 * time.Second

// Workflow will try to delete workflow executions (call API and wait for all executions to get deleted) maxDeleteExecutionsAttempts number of times.
// If there are still some executions left, workflow fails, and needs to be restarted manually (this indicates some serious problems with transfer/visibility task processing).
maxDeleteExecutionsAttempts = 3
)

type (
Expand Down Expand Up @@ -142,7 +145,6 @@ func ReclaimResourcesWorkflow(ctx workflow.Context, params ReclaimResourcesParam
func deleteWorkflowExecutions(ctx workflow.Context, params ReclaimResourcesParams) (ReclaimResourcesResult, error) {
var a *Activities
logger := workflow.GetLogger(ctx)
deleteAttempt := int32(1)
var result ReclaimResourcesResult

ctx1 := workflow.WithLocalActivityOptions(ctx, localActivityOptions)
Expand All @@ -152,7 +154,7 @@ func deleteWorkflowExecutions(ctx workflow.Context, params ReclaimResourcesParam
return result, fmt.Errorf("%w: IsAdvancedVisibilityActivity: %v", errors.ErrUnableToExecuteActivity, err)
}

for {
for deleteAttempt := int32(1); deleteAttempt <= maxDeleteExecutionsAttempts; deleteAttempt++ {
ctx2 := workflow.WithChildOptions(ctx, deleteExecutionsWorkflowOptions)
ctx2 = workflow.WithWorkflowID(ctx2, fmt.Sprintf("%s/%s", deleteexecutions.WorkflowName, params.Namespace))
var der deleteexecutions.DeleteExecutionsResult
Expand All @@ -166,27 +168,27 @@ func deleteWorkflowExecutions(ctx workflow.Context, params ReclaimResourcesParam

if isAdvancedVisibility {
ctx3 := workflow.WithActivityOptions(ctx, ensureNoExecutionsAdvVisibilityActivityOptions)
err = workflow.ExecuteActivity(ctx3, a.EnsureNoExecutionsAdvVisibilityActivity, params.NamespaceID, params.Namespace, isAdvancedVisibility).Get(ctx, nil)
err = workflow.ExecuteActivity(ctx3, a.EnsureNoExecutionsAdvVisibilityActivity, params.NamespaceID, params.Namespace).Get(ctx, nil)
} else {
ctx3 := workflow.WithActivityOptions(ctx, ensureNoExecutionsStdVisibilityOptionsActivity)
err = workflow.ExecuteActivity(ctx3, a.EnsureNoExecutionsStdVisibilityActivity, params.NamespaceID, params.Namespace, isAdvancedVisibility).Get(ctx, nil)
err = workflow.ExecuteActivity(ctx3, a.EnsureNoExecutionsStdVisibilityActivity, params.NamespaceID, params.Namespace).Get(ctx, nil)
}
if err == nil {
break
logger.Info("All workflow executions has been deleted successfully.", tag.WorkflowNamespace(params.Namespace.String()), tag.DeletedExecutionsCount(result.SuccessCount), tag.DeletedExecutionsErrorCount(result.ErrorCount))
return result, nil
}

var appErr *temporal.ApplicationError
if stderrors.As(err, &appErr) {
switch appErr.Type() {
case errors.ExecutionsStillExistErrType, errors.NoProgressErrType:
logger.Info("Unable to delete workflow executions. Will try again.", tag.WorkflowNamespace(params.Namespace.String()), tag.Counter(der.ErrorCount), tag.Attempt(deleteAttempt))
deleteAttempt++
continue
}
}
return result, fmt.Errorf("%w: EnsureNoExecutionsActivity: %v", errors.ErrUnableToExecuteActivity, err)
}

logger.Info("All workflow executions has been deleted successfully.", tag.WorkflowNamespace(params.Namespace.String()), tag.DeletedExecutionsCount(result.SuccessCount), tag.DeletedExecutionsErrorCount(result.ErrorCount))
return result, nil
logger.Error("Unable to delete workflow executions after maximum attempts.", tag.WorkflowNamespace(params.Namespace.String()), tag.DeletedExecutionsCount(result.SuccessCount), tag.DeletedExecutionsErrorCount(result.ErrorCount), tag.Attempt(maxDeleteExecutionsAttempts))
return result, fmt.Errorf("unable to delete workflow executions after %d attempts", maxDeleteExecutionsAttempts)
}
1 change: 0 additions & 1 deletion service/worker/deletenamespace/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ import (
)

const (
// WorkflowName is the workflow name.
WorkflowName = "temporal-sys-delete-namespace-workflow"
)

Expand Down

0 comments on commit 2a938b8

Please sign in to comment.