Skip to content

Commit

Permalink
Remove custom retries for ReclaimResourcesWorkflow (#2865)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexshtin committed May 19, 2022
1 parent 29de7f9 commit b8b58b1
Show file tree
Hide file tree
Showing 4 changed files with 129 additions and 61 deletions.
19 changes: 13 additions & 6 deletions service/worker/deletenamespace/deleteexecutions/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ package deleteexecutions

import (
"context"
stderrors "errors"
"testing"

"github.com/golang/mock/gomock"
Expand Down Expand Up @@ -211,8 +212,12 @@ func Test_DeleteExecutionsWorkflow_ManyExecutions_ActivityError(t *testing.T) {

var a *Activities

env.OnActivity(a.GetNextPageTokenActivity, mock.Anything, mock.Anything).Return([]byte{3, 22, 83}, nil).Once()
env.OnActivity(a.DeleteExecutionsActivity, mock.Anything, mock.Anything).Return(DeleteExecutionsActivityResult{}, serviceerror.NewUnavailable("random error")).Once()
env.OnActivity(a.GetNextPageTokenActivity, mock.Anything, mock.Anything).
Return([]byte{3, 22, 83}, nil).
Times(40) // GoSDK defaultMaximumAttemptsForUnitTest value * defaultConcurrentDeleteExecutionsActivities.
env.OnActivity(a.DeleteExecutionsActivity, mock.Anything, mock.Anything).
Return(DeleteExecutionsActivityResult{}, serviceerror.NewUnavailable("specific_error_from_activity")).
Times(40) // GoSDK defaultMaximumAttemptsForUnitTest value * defaultConcurrentDeleteExecutionsActivities.

env.ExecuteWorkflow(DeleteExecutionsWorkflow, DeleteExecutionsParams{
NamespaceID: "namespace-id",
Expand All @@ -223,10 +228,12 @@ func Test_DeleteExecutionsWorkflow_ManyExecutions_ActivityError(t *testing.T) {
})

require.True(t, env.IsWorkflowCompleted())
wfErr := env.GetWorkflowError()
require.Error(t, wfErr)
var errApplication *temporal.ApplicationError
require.ErrorAs(t, wfErr, &errApplication)
err := env.GetWorkflowError()
require.Error(t, err)
var appErr *temporal.ApplicationError
require.True(t, stderrors.As(err, &appErr))
require.Contains(t, appErr.Error(), "unable to execute activity: DeleteExecutionsActivity")
require.Contains(t, appErr.Error(), "specific_error_from_activity")
}

func Test_DeleteExecutionsWorkflow_NoActivityMocks_ManyExecutions(t *testing.T) {
Expand Down
60 changes: 26 additions & 34 deletions service/worker/deletenamespace/reclaimresources/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,6 @@ const (
WorkflowName = "temporal-sys-reclaim-namespace-resources-workflow"

namespaceCacheRefreshDelay = 11 * time.Second

// Workflow will try to delete workflow executions (call API and wait for all executions to get deleted) maxDeleteExecutionsAttempts number of times.
// If there are still some executions left, workflow fails, and needs to be restarted manually (this indicates some serious problems with transfer/visibility task processing).
maxDeleteExecutionsAttempts = 3
)

type (
Expand All @@ -53,8 +49,9 @@ type (
}

ReclaimResourcesResult struct {
SuccessCount int
ErrorCount int
SuccessCount int
ErrorCount int
NamespaceDeleted bool
}
)

Expand Down Expand Up @@ -138,6 +135,7 @@ func ReclaimResourcesWorkflow(ctx workflow.Context, params ReclaimResourcesParam
return result, fmt.Errorf("%w: DeleteNamespaceActivity: %v", errors.ErrUnableToExecuteActivity, err)
}

result.NamespaceDeleted = true
logger.Info("Workflow finished successfully.", tag.WorkflowType(WorkflowName))
return result, nil
}
Expand All @@ -154,42 +152,36 @@ func deleteWorkflowExecutions(ctx workflow.Context, params ReclaimResourcesParam
return result, fmt.Errorf("%w: IsAdvancedVisibilityActivity: %v", errors.ErrUnableToExecuteActivity, err)
}

for deleteAttempt := int32(1); deleteAttempt <= maxDeleteExecutionsAttempts; deleteAttempt++ {
ctx2 := workflow.WithChildOptions(ctx, deleteExecutionsWorkflowOptions)
ctx2 = workflow.WithWorkflowID(ctx2, fmt.Sprintf("%s/%s", deleteexecutions.WorkflowName, params.Namespace))
var der deleteexecutions.DeleteExecutionsResult
err := workflow.ExecuteChildWorkflow(ctx2, deleteexecutions.DeleteExecutionsWorkflow, params.DeleteExecutionsParams).Get(ctx, &der)
if err != nil {
logger.Error("Unable to execute child workflow.", tag.WorkflowType(deleteexecutions.WorkflowName), tag.Error(err))
return result, fmt.Errorf("%w: %s: %v", errors.ErrUnableToExecuteChildWorkflow, deleteexecutions.WorkflowName, err)
}
// Accumulate total success count but only last error count to avoid double counting errors from the same workflow executions.
result.SuccessCount += der.SuccessCount
result.ErrorCount = der.ErrorCount

if isAdvancedVisibility {
ctx3 := workflow.WithActivityOptions(ctx, ensureNoExecutionsAdvVisibilityActivityOptions)
err = workflow.ExecuteActivity(ctx3, a.EnsureNoExecutionsAdvVisibilityActivity, params.NamespaceID, params.Namespace, der.ErrorCount).Get(ctx, nil)
} else {
ctx3 := workflow.WithActivityOptions(ctx, ensureNoExecutionsStdVisibilityOptionsActivity)
err = workflow.ExecuteActivity(ctx3, a.EnsureNoExecutionsStdVisibilityActivity, params.NamespaceID, params.Namespace).Get(ctx, nil)
}
if err == nil {
logger.Info("All workflow executions has been deleted successfully.", tag.WorkflowNamespace(params.Namespace.String()), tag.DeletedExecutionsCount(result.SuccessCount), tag.DeletedExecutionsErrorCount(result.ErrorCount))
return result, nil
}
ctx2 := workflow.WithChildOptions(ctx, deleteExecutionsWorkflowOptions)
ctx2 = workflow.WithWorkflowID(ctx2, fmt.Sprintf("%s/%s", deleteexecutions.WorkflowName, params.Namespace))
var der deleteexecutions.DeleteExecutionsResult
err = workflow.ExecuteChildWorkflow(ctx2, deleteexecutions.DeleteExecutionsWorkflow, params.DeleteExecutionsParams).Get(ctx, &der)
if err != nil {
logger.Error("Unable to execute child workflow.", tag.WorkflowType(deleteexecutions.WorkflowName), tag.Error(err))
return result, fmt.Errorf("%w: %s: %v", errors.ErrUnableToExecuteChildWorkflow, deleteexecutions.WorkflowName, err)
}
result.SuccessCount = der.SuccessCount
result.ErrorCount = der.ErrorCount

if isAdvancedVisibility {
ctx3 := workflow.WithActivityOptions(ctx, ensureNoExecutionsAdvVisibilityActivityOptions)
err = workflow.ExecuteActivity(ctx3, a.EnsureNoExecutionsAdvVisibilityActivity, params.NamespaceID, params.Namespace, der.ErrorCount).Get(ctx, nil)
} else {
ctx3 := workflow.WithActivityOptions(ctx, ensureNoExecutionsStdVisibilityOptionsActivity)
err = workflow.ExecuteActivity(ctx3, a.EnsureNoExecutionsStdVisibilityActivity, params.NamespaceID, params.Namespace).Get(ctx, nil)
}
if err != nil {
var appErr *temporal.ApplicationError
if stderrors.As(err, &appErr) {
switch appErr.Type() {
case errors.ExecutionsStillExistErrType, errors.NoProgressErrType, errors.NotDeletedExecutionsStillExistErrType:
logger.Info("Unable to delete workflow executions. Will try again.", tag.WorkflowNamespace(params.Namespace.String()), tag.Counter(der.ErrorCount), tag.Attempt(deleteAttempt))
continue
logger.Info("Unable to delete workflow executions.", tag.WorkflowNamespace(params.Namespace.String()), tag.Counter(der.ErrorCount))
return result, err
}
}
return result, fmt.Errorf("%w: EnsureNoExecutionsActivity: %v", errors.ErrUnableToExecuteActivity, err)
}

logger.Error("Unable to delete workflow executions after maximum attempts.", tag.WorkflowNamespace(params.Namespace.String()), tag.DeletedExecutionsCount(result.SuccessCount), tag.DeletedExecutionsErrorCount(result.ErrorCount), tag.Attempt(maxDeleteExecutionsAttempts))
return result, fmt.Errorf("unable to delete workflow executions after %d attempts", maxDeleteExecutionsAttempts)
logger.Info("All workflow executions has been deleted successfully.", tag.WorkflowNamespace(params.Namespace.String()), tag.DeletedExecutionsCount(result.SuccessCount), tag.DeletedExecutionsErrorCount(result.ErrorCount))
return result, nil
}
98 changes: 80 additions & 18 deletions service/worker/deletenamespace/reclaimresources/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"go.temporal.io/sdk/temporal"
"go.temporal.io/sdk/testsuite"

"go.temporal.io/server/common/log"
Expand Down Expand Up @@ -113,7 +114,9 @@ func Test_ReclaimResourcesWorkflow_EnsureNoExecutionsActivity_Error(t *testing.T
}, nil).Once()

env.OnActivity(a.IsAdvancedVisibilityActivity, mock.Anything).Return(true, nil).Once()
env.OnActivity(a.EnsureNoExecutionsAdvVisibilityActivity, mock.Anything, namespace.ID("namespace-id"), namespace.Name("namespace")).Return(stderrors.New("random error")).Once()
env.OnActivity(a.EnsureNoExecutionsAdvVisibilityActivity, mock.Anything, namespace.ID("namespace-id"), namespace.Name("namespace"), 0).
Return(stderrors.New("specific_error_from_activity")).
Times(10) // GoSDK defaultMaximumAttemptsForUnitTest value.

env.ExecuteWorkflow(ReclaimResourcesWorkflow, ReclaimResourcesParams{
DeleteExecutionsParams: deleteexecutions.DeleteExecutionsParams{
Expand All @@ -126,7 +129,10 @@ func Test_ReclaimResourcesWorkflow_EnsureNoExecutionsActivity_Error(t *testing.T
})

require.True(t, env.IsWorkflowCompleted())
require.Error(t, env.GetWorkflowError())
err := env.GetWorkflowError()
require.Error(t, err)
require.Contains(t, err.Error(), "unable to execute activity: EnsureNoExecutionsActivity")
require.Contains(t, err.Error(), "specific_error_from_activity")
}

func Test_ReclaimResourcesWorkflow_EnsureNoExecutionsActivity_ExecutionsStillExist(t *testing.T) {
Expand All @@ -153,15 +159,9 @@ func Test_ReclaimResourcesWorkflow_EnsureNoExecutionsActivity_ExecutionsStillExi
}, nil).Once()

env.OnActivity(a.IsAdvancedVisibilityActivity, mock.Anything).Return(true, nil).Once()
errorCount := 0
env.OnActivity(a.EnsureNoExecutionsAdvVisibilityActivity, mock.Anything, namespace.ID("namespace-id"), namespace.Name("namespace")).
Return(func(_ context.Context, namespaceID namespace.ID, namespaceName namespace.Name) error {
errorCount++
if errorCount < 20 {
return errors.ErrExecutionsStillExist
}
return stderrors.New("random error")
}).Once()
env.OnActivity(a.EnsureNoExecutionsAdvVisibilityActivity, mock.Anything, namespace.ID("namespace-id"), namespace.Name("namespace"), 0).
Return(errors.ErrExecutionsStillExist).
Times(10) // GoSDK defaultMaximumAttemptsForUnitTest value.

env.ExecuteWorkflow(ReclaimResourcesWorkflow, ReclaimResourcesParams{
DeleteExecutionsParams: deleteexecutions.DeleteExecutionsParams{
Expand All @@ -174,7 +174,10 @@ func Test_ReclaimResourcesWorkflow_EnsureNoExecutionsActivity_ExecutionsStillExi
})

require.True(t, env.IsWorkflowCompleted())
require.Error(t, env.GetWorkflowError())
err := env.GetWorkflowError()
var appErr *temporal.ApplicationError
require.True(t, stderrors.As(err, &appErr))
require.Equal(t, errors.ExecutionsStillExistErrType, appErr.Type())
}

func Test_ReclaimResourcesWorkflow_NoActivityMocks_Success(t *testing.T) {
Expand All @@ -190,17 +193,17 @@ func Test_ReclaimResourcesWorkflow_NoActivityMocks_Success(t *testing.T) {
NamespaceID: "namespace-id",
Namespace: "namespace",
}).DoAndReturn(func(_ context.Context, request *manager.CountWorkflowExecutionsRequest) (*manager.CountWorkflowExecutionsResponse, error) {
if countWorkflowExecutionsCallTimes == 9 {
if countWorkflowExecutionsCallTimes == 8 {
return &manager.CountWorkflowExecutionsResponse{
Count: 0,
}, nil
}
countWorkflowExecutionsCallTimes++
// Return same 1 eight times to emulate ErrNoProgress.
// Return same "1" 8 times to emulate ErrNoProgress.
return &manager.CountWorkflowExecutionsResponse{
Count: 1,
}, nil
}).Times(9)
}).Times(8)

metadataManager := persistence.NewMockMetadataManager(ctrl)
metadataManager.EXPECT().DeleteNamespaceByName(gomock.Any(), &persistence.DeleteNamespaceByNameRequest{
Expand Down Expand Up @@ -233,7 +236,7 @@ func Test_ReclaimResourcesWorkflow_NoActivityMocks_Success(t *testing.T) {
}).Return(deleteexecutions.DeleteExecutionsResult{
SuccessCount: 10,
ErrorCount: 0,
}, nil).Twice()
}, nil)

env.ExecuteWorkflow(ReclaimResourcesWorkflow, ReclaimResourcesParams{
DeleteExecutionsParams: deleteexecutions.DeleteExecutionsParams{
Expand All @@ -245,11 +248,70 @@ func Test_ReclaimResourcesWorkflow_NoActivityMocks_Success(t *testing.T) {
},
})

ctrl.Finish()
require.True(t, env.IsWorkflowCompleted())
require.NoError(t, env.GetWorkflowError())
var result ReclaimResourcesResult
require.NoError(t, env.GetWorkflowResult(&result))
require.Equal(t, 0, result.ErrorCount)
require.Equal(t, 20, result.SuccessCount) // DeleteExecutionsWorkflow is called twice (10+10).
require.Equal(t, 10, result.SuccessCount)
}

func Test_ReclaimResourcesWorkflow_NoActivityMocks_NoProgressMade(t *testing.T) {
testSuite := &testsuite.WorkflowTestSuite{}
env := testSuite.NewTestWorkflowEnvironment()

ctrl := gomock.NewController(t)
visibilityManager := manager.NewMockVisibilityManager(ctrl)
visibilityManager.EXPECT().GetName().Return("elasticsearch")

visibilityManager.EXPECT().CountWorkflowExecutions(gomock.Any(), &manager.CountWorkflowExecutionsRequest{
NamespaceID: "namespace-id",
Namespace: "namespace",
}).Return(&manager.CountWorkflowExecutionsResponse{
Count: 1,
}, nil).
Times(8)

a := &Activities{
visibilityManager: visibilityManager,
metricsClient: metrics.NoopClient,
logger: log.NewNoopLogger(),
}

env.RegisterActivity(a.IsAdvancedVisibilityActivity)
env.RegisterActivity(a.EnsureNoExecutionsAdvVisibilityActivity)

env.RegisterWorkflow(deleteexecutions.DeleteExecutionsWorkflow)
env.OnWorkflow(deleteexecutions.DeleteExecutionsWorkflow, mock.Anything, deleteexecutions.DeleteExecutionsParams{
Namespace: "namespace",
NamespaceID: "namespace-id",
Config: deleteexecutions.DeleteExecutionsConfig{
DeleteActivityRPS: 100,
PageSize: 1000,
PagesPerExecutionCount: 256,
ConcurrentDeleteExecutionsActivities: 4,
},
PreviousSuccessCount: 0,
PreviousErrorCount: 0,
}).Return(deleteexecutions.DeleteExecutionsResult{
SuccessCount: 10,
ErrorCount: 0,
}, nil).Twice()

env.ExecuteWorkflow(ReclaimResourcesWorkflow, ReclaimResourcesParams{
DeleteExecutionsParams: deleteexecutions.DeleteExecutionsParams{
Namespace: "namespace",
NamespaceID: "namespace-id",
Config: deleteexecutions.DeleteExecutionsConfig{},
PreviousSuccessCount: 0,
PreviousErrorCount: 0,
},
})

require.True(t, env.IsWorkflowCompleted())
err := env.GetWorkflowError()
require.Error(t, err)
var appErr *temporal.ApplicationError
require.True(t, stderrors.As(err, &appErr))
require.Equal(t, errors.NoProgressErrType, appErr.Type())
}
13 changes: 10 additions & 3 deletions service/worker/deletenamespace/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,19 +59,26 @@ type (
)

var (
retryPolicy = &temporal.RetryPolicy{
localRetryPolicy = &temporal.RetryPolicy{
InitialInterval: 1 * time.Second,
MaximumInterval: 10 * time.Second,
}

reclaimResourcesWorkflowRetryPolicy = &temporal.RetryPolicy{
InitialInterval: 60 * time.Second,
// ReclaimResourcesWorkflow will try to delete workflow executions (call `DeleteWorkflowExecution` and wait for all executions to be deleted) 3 times.
// If there are still executions left, ReclaimResourcesWorkflow fails, and needs to be restarted manually (this indicates some serious problems with transfer/visibility task processing).
MaximumAttempts: 3,
}

localActivityOptions = workflow.LocalActivityOptions{
RetryPolicy: retryPolicy,
RetryPolicy: localRetryPolicy,
StartToCloseTimeout: 30 * time.Second,
ScheduleToCloseTimeout: 5 * time.Minute,
}

reclaimResourcesWorkflowOptions = workflow.ChildWorkflowOptions{
RetryPolicy: retryPolicy,
RetryPolicy: reclaimResourcesWorkflowRetryPolicy,
// Important: this is required to make sure the child workflow is not terminated when delete namespace workflow is completed.
ParentClosePolicy: enumspb.PARENT_CLOSE_POLICY_ABANDON,
}
Expand Down

0 comments on commit b8b58b1

Please sign in to comment.