Skip to content

Commit

Permalink
Improve deletenamespace workflow errors (#2909)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexshtin committed May 27, 2022
1 parent 738b36f commit 4e70983
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 11 deletions.
6 changes: 5 additions & 1 deletion service/worker/deletenamespace/deleteexecutions/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,11 @@ func DeleteExecutionsWorkflow(ctx workflow.Context, params DeleteExecutionsParam
}

if nextPageToken == nil {
logger.Info("Finish deleting workflow executions.", tag.WorkflowNamespace(params.Namespace.String()), tag.DeletedExecutionsCount(result.SuccessCount), tag.DeletedExecutionsErrorCount(result.ErrorCount))
if result.ErrorCount == 0 {
logger.Info("Successfully deleted workflow executions.", tag.WorkflowNamespace(params.Namespace.String()), tag.DeletedExecutionsCount(result.SuccessCount))
} else {
logger.Error("Finish deleting workflow executions with some errors.", tag.WorkflowNamespace(params.Namespace.String()), tag.DeletedExecutionsCount(result.SuccessCount), tag.DeletedExecutionsErrorCount(result.ErrorCount))
}
return result, nil
}

Expand Down
24 changes: 19 additions & 5 deletions service/worker/deletenamespace/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ package errors

import (
"errors"
"fmt"

"go.temporal.io/sdk/temporal"
)
Expand All @@ -37,9 +38,22 @@ const (
)

var (
ErrUnableToExecuteActivity = errors.New("unable to execute activity")
ErrUnableToExecuteChildWorkflow = errors.New("unable to execute child workflow")
ErrExecutionsStillExist = temporal.NewApplicationError("executions are still exist", ExecutionsStillExistErrType)
ErrNoProgress = temporal.NewNonRetryableApplicationError("no progress were made", NoProgressErrType, nil)
ErrNotDeletedExecutionsStillExist = temporal.NewNonRetryableApplicationError("not deleted executions are still exist", NotDeletedExecutionsStillExistErrType, nil)
ErrUnableToExecuteActivity = errors.New("unable to execute activity")
ErrUnableToExecuteChildWorkflow = errors.New("unable to execute child workflow")
)

func NewExecutionsStillExistError(count int) error {
return temporal.NewApplicationError(fmt.Sprintf("%d executions are still exist", count), ExecutionsStillExistErrType, count)
}

func NewSomeExecutionsStillExistError() error {
return temporal.NewApplicationError("some executions are still exist", ExecutionsStillExistErrType)
}

func NewNoProgressError(count int) error {
return temporal.NewNonRetryableApplicationError(fmt.Sprintf("no progress were made: %d executions are still exist", count), NoProgressErrType, nil, count)
}

func NewNotDeletedExecutionsStillExistError(count int) error {
return temporal.NewNonRetryableApplicationError(fmt.Sprintf("%d not deleted executions are still exist", count), NotDeletedExecutionsStillExistErrType, nil, count)
}
8 changes: 4 additions & 4 deletions service/worker/deletenamespace/reclaimresources/activities.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,18 +92,18 @@ func (a *Activities) EnsureNoExecutionsAdvVisibilityActivity(ctx context.Context
// No progress were made. Something bad happened on task processor side or new executions were created during deletion.
// Return non-retryable error and workflow will try to delete executions again.
a.logger.Warn("No progress were made.", tag.WorkflowNamespace(nsName.String()), tag.Attempt(activityInfo.Attempt), tag.Counter(int(count)))
return errors.ErrNoProgress
return errors.NewNoProgressError(int(count))
}
}

a.logger.Warn("Some workflow executions still exist.", tag.WorkflowNamespace(nsName.String()), tag.Counter(int(count)))
activity.RecordHeartbeat(ctx, count)
return errors.ErrExecutionsStillExist
return errors.NewExecutionsStillExistError(int(count))
}

if notDeletedCount > 0 {
a.logger.Warn("Some workflow executions were not deleted and still exist.", tag.WorkflowNamespace(nsName.String()), tag.Counter(notDeletedCount))
return errors.ErrNotDeletedExecutionsStillExist
return errors.NewNotDeletedExecutionsStillExistError(notDeletedCount)
}

return nil
Expand All @@ -129,7 +129,7 @@ func (a *Activities) EnsureNoExecutionsStdVisibilityActivity(ctx context.Context

if len(resp.Executions) > 0 {
a.logger.Warn("Some workflow executions still exist.", tag.WorkflowNamespace(nsName.String()))
return errors.ErrExecutionsStillExist
return errors.NewSomeExecutionsStillExistError()
}
return nil
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ func Test_ReclaimResourcesWorkflow_EnsureNoExecutionsActivity_ExecutionsStillExi

env.OnActivity(a.IsAdvancedVisibilityActivity, mock.Anything).Return(true, nil).Once()
env.OnActivity(a.EnsureNoExecutionsAdvVisibilityActivity, mock.Anything, namespace.ID("namespace-id"), namespace.Name("namespace"), 0).
Return(errors.ErrExecutionsStillExist).
Return(errors.NewExecutionsStillExistError(1)).
Times(10) // GoSDK defaultMaximumAttemptsForUnitTest value.

env.ExecuteWorkflow(ReclaimResourcesWorkflow, ReclaimResourcesParams{
Expand Down

0 comments on commit 4e70983

Please sign in to comment.