Skip to content

Commit

Permalink
Fix archival activities error handling (#3227)
Browse files Browse the repository at this point in the history
  • Loading branch information
yycptt authored and alexshtin committed Aug 15, 2022
1 parent 4b7507d commit 0bd7435
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 32 deletions.
34 changes: 16 additions & 18 deletions service/worker/archiver/activities.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"context"

commonpb "go.temporal.io/api/common/v1"
"go.temporal.io/api/serviceerror"
"go.temporal.io/sdk/activity"
"go.temporal.io/sdk/temporal"

Expand Down Expand Up @@ -59,11 +60,8 @@ func uploadHistoryActivity(ctx context.Context, request ArchiveRequest) (err err
sw := scope.StartTimer(metrics.ServiceLatency)
defer func() {
sw.Stop()
if err != nil {
if err.Error() == errUploadNonRetryable.Error() {
scope.IncCounter(metrics.ArchiverNonRetryableErrorCount)
}
err = temporal.NewNonRetryableApplicationError(err.Error(), "", nil)
if err == errUploadNonRetryable {
scope.IncCounter(metrics.ArchiverNonRetryableErrorCount)
}
}()
logger := tagLoggerWithHistoryRequest(tagLoggerWithActivityInfo(container.Logger, activity.GetInfo(ctx)), &request)
Expand Down Expand Up @@ -104,11 +102,8 @@ func deleteHistoryActivity(ctx context.Context, request ArchiveRequest) (err err
sw := scope.StartTimer(metrics.ServiceLatency)
defer func() {
sw.Stop()
if err != nil {
if err.Error() == errDeleteNonRetryable.Error() {
scope.IncCounter(metrics.ArchiverNonRetryableErrorCount)
}
err = temporal.NewNonRetryableApplicationError(err.Error(), "", nil)
if err == errDeleteNonRetryable {
scope.IncCounter(metrics.ArchiverNonRetryableErrorCount)
}
}()
_, err = container.HistoryClient.DeleteWorkflowExecution(ctx, &historyservice.DeleteWorkflowExecutionRequest{
Expand All @@ -123,9 +118,15 @@ func deleteHistoryActivity(ctx context.Context, request ArchiveRequest) (err err
if err == nil {
return nil
}
logger := tagLoggerWithHistoryRequest(tagLoggerWithActivityInfo(container.Logger, activity.GetInfo(ctx)), &request)
logger.Error("failed to delete history events", tag.Error(err))
if !common.IsPersistenceTransientError(err) {

if _, ok := err.(*serviceerror.WorkflowNotReady); !ok {
logger := tagLoggerWithHistoryRequest(tagLoggerWithActivityInfo(container.Logger, activity.GetInfo(ctx)), &request)
logger.Error("failed to delete workflow execution", tag.Error(err))
}

if !common.IsServiceTransientError(err) &&
!common.IsContextDeadlineExceededErr(err) &&
!common.IsContextCanceledErr(err) {
return errDeleteNonRetryable
}
return err
Expand All @@ -137,11 +138,8 @@ func archiveVisibilityActivity(ctx context.Context, request ArchiveRequest) (err
sw := scope.StartTimer(metrics.ServiceLatency)
defer func() {
sw.Stop()
if err != nil {
if err.Error() == errArchiveVisibilityNonRetryable.Error() {
scope.IncCounter(metrics.ArchiverNonRetryableErrorCount)
}
err = temporal.NewNonRetryableApplicationError(err.Error(), "", nil)
if err == errArchiveVisibilityNonRetryable {
scope.IncCounter(metrics.ArchiverNonRetryableErrorCount)
}
}()
logger := tagLoggerWithVisibilityRequest(tagLoggerWithActivityInfo(container.Logger, activity.GetInfo(ctx)), &request)
Expand Down
92 changes: 79 additions & 13 deletions service/worker/archiver/activities_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,15 @@ import (
"testing"

commonpb "go.temporal.io/api/common/v1"
"go.temporal.io/api/serviceerror"
"go.temporal.io/server/api/historyservice/v1"
"go.temporal.io/server/api/historyservicemock/v1"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/suite"

"go.temporal.io/sdk/activity"
"go.temporal.io/sdk/temporal"
"go.temporal.io/sdk/testsuite"
"go.temporal.io/sdk/worker"

Expand All @@ -61,8 +63,6 @@ const (

var (
testBranchToken = []byte{1, 2, 3}

errPersistenceNonRetryable = errors.New("persistence non-retryable error")
)

type activitiesSuite struct {
Expand Down Expand Up @@ -129,7 +129,10 @@ func (s *activitiesSuite) TestUploadHistory_Fail_InvalidURI() {
HistoryURI: "some invalid URI without scheme",
}
_, err := env.ExecuteActivity(uploadHistoryActivity, request)
s.Equal(errUploadNonRetryable.Error(), errors.Unwrap(err).Error())
applicationErr, ok := errors.Unwrap(err).(*temporal.ApplicationError)
s.True(ok)
s.True(applicationErr.NonRetryable())
s.Equal(errUploadNonRetryable.Error(), applicationErr.Error())
}

func (s *activitiesSuite) TestUploadHistory_Fail_GetArchiverError() {
Expand Down Expand Up @@ -161,7 +164,10 @@ func (s *activitiesSuite) TestUploadHistory_Fail_GetArchiverError() {
HistoryURI: testArchivalURI,
}
_, err := env.ExecuteActivity(uploadHistoryActivity, request)
s.Equal(errUploadNonRetryable.Error(), errors.Unwrap(err).Error())
applicationErr, ok := errors.Unwrap(err).(*temporal.ApplicationError)
s.True(ok)
s.True(applicationErr.NonRetryable())
s.Equal(errUploadNonRetryable.Error(), applicationErr.Error())
}

func (s *activitiesSuite) TestUploadHistory_Fail_ArchiveNonRetryableError() {
Expand Down Expand Up @@ -190,7 +196,10 @@ func (s *activitiesSuite) TestUploadHistory_Fail_ArchiveNonRetryableError() {
HistoryURI: testArchivalURI,
}
_, err := env.ExecuteActivity(uploadHistoryActivity, request)
s.Equal(errUploadNonRetryable.Error(), errors.Unwrap(err).Error())
applicationErr, ok := errors.Unwrap(err).(*temporal.ApplicationError)
s.True(ok)
s.True(applicationErr.NonRetryable())
s.Equal(errUploadNonRetryable.Error(), applicationErr.Error())
}

func (s *activitiesSuite) TestUploadHistory_Fail_ArchiveRetryableError() {
Expand Down Expand Up @@ -219,7 +228,10 @@ func (s *activitiesSuite) TestUploadHistory_Fail_ArchiveRetryableError() {
HistoryURI: testArchivalURI,
}
_, err := env.ExecuteActivity(uploadHistoryActivity, request)
s.Equal(testArchiveErr.Error(), errors.Unwrap(err).Error())
applicationErr, ok := errors.Unwrap(err).(*temporal.ApplicationError)
s.True(ok)
s.False(applicationErr.NonRetryable())
s.Equal(testArchiveErr.Error(), applicationErr.Error())
}

func (s *activitiesSuite) TestUploadHistory_Success() {
Expand Down Expand Up @@ -250,7 +262,46 @@ func (s *activitiesSuite) TestUploadHistory_Success() {
s.NoError(err)
}

func (s *activitiesSuite) TestDeleteHistoryActivity_Fail_DeleteFromV2NonRetryableError() {
// TestDeleteHistoryActivity_Fail_RetryableError runs deleteHistoryActivity against a
// history client mocked to return serviceerror.WorkflowNotReady from
// DeleteWorkflowExecution, then asserts that the activity error unwraps to a
// *temporal.ApplicationError that is not flagged as non-retryable.
func (s *activitiesSuite) TestDeleteHistoryActivity_Fail_RetryableError() {
	// Only the scope lookup is expected on the metrics client; this test sets no
	// IncCounter expectation on the returned scope.
	s.metricsClient.EXPECT().Scope(metrics.ArchiverDeleteHistoryActivityScope, []metrics.Tag{metrics.NamespaceTag(testNamespace)}).Return(s.metricsScope)

	bootstrap := &BootstrapContainer{
		Logger:           s.logger,
		MetricsClient:    s.metricsClient,
		HistoryV2Manager: s.mockExecutionMgr,
		HistoryClient:    s.historyClient,
	}

	// The activity reads its dependencies from the background activity context.
	activityEnv := s.NewTestActivityEnvironment()
	s.registerWorkflows(activityEnv)
	backgroundCtx := context.WithValue(context.Background(), bootstrapContainerKey, bootstrap)
	activityEnv.SetWorkerOptions(worker.Options{
		BackgroundActivityContext: backgroundCtx,
	})

	// The mocked history client must be called with exactly this delete request
	// and answers with WorkflowNotReady.
	expectedDelete := &historyservice.DeleteWorkflowExecutionRequest{
		NamespaceId: testNamespaceID,
		WorkflowExecution: &commonpb.WorkflowExecution{
			WorkflowId: testWorkflowID,
			RunId:      testRunID,
		},
		WorkflowVersion:    testCloseFailoverVersion,
		ClosedWorkflowOnly: true,
	}
	s.historyClient.EXPECT().DeleteWorkflowExecution(gomock.Any(), expectedDelete).Return(nil, &serviceerror.WorkflowNotReady{})

	archiveReq := ArchiveRequest{
		NamespaceID:          testNamespaceID,
		Namespace:            testNamespace,
		WorkflowID:           testWorkflowID,
		RunID:                testRunID,
		BranchToken:          testBranchToken,
		NextEventID:          testNextEventID,
		CloseFailoverVersion: testCloseFailoverVersion,
		HistoryURI:           testArchivalURI,
	}
	_, err := activityEnv.ExecuteActivity(deleteHistoryActivity, archiveReq)

	// The error directly wrapped by the activity error must be an application
	// error that has not been marked non-retryable.
	cause := errors.Unwrap(err)
	applicationErr, ok := cause.(*temporal.ApplicationError)
	s.True(ok)
	s.False(applicationErr.NonRetryable())
}

func (s *activitiesSuite) TestDeleteHistoryActivity_Fail_NonRetryableError() {
s.metricsClient.EXPECT().Scope(metrics.ArchiverDeleteHistoryActivityScope, []metrics.Tag{metrics.NamespaceTag(testNamespace)}).Return(s.metricsScope)
s.metricsScope.EXPECT().IncCounter(metrics.ArchiverNonRetryableErrorCount)
container := &BootstrapContainer{
Expand All @@ -272,7 +323,7 @@ func (s *activitiesSuite) TestDeleteHistoryActivity_Fail_DeleteFromV2NonRetryabl
},
WorkflowVersion: testCloseFailoverVersion,
ClosedWorkflowOnly: true,
}).Return(nil, errPersistenceNonRetryable)
}).Return(nil, &serviceerror.NotFound{})
request := ArchiveRequest{
NamespaceID: testNamespaceID,
Namespace: testNamespace,
Expand All @@ -284,7 +335,10 @@ func (s *activitiesSuite) TestDeleteHistoryActivity_Fail_DeleteFromV2NonRetryabl
HistoryURI: testArchivalURI,
}
_, err := env.ExecuteActivity(deleteHistoryActivity, request)
s.Equal(errDeleteNonRetryable.Error(), errors.Unwrap(err).Error())
applicationErr, ok := errors.Unwrap(err).(*temporal.ApplicationError)
s.True(ok)
s.True(applicationErr.NonRetryable())
s.Equal(errDeleteNonRetryable.Error(), applicationErr.Error())
}

func (s *activitiesSuite) TestArchiveVisibilityActivity_Fail_InvalidURI() {
Expand All @@ -307,7 +361,10 @@ func (s *activitiesSuite) TestArchiveVisibilityActivity_Fail_InvalidURI() {
VisibilityURI: "some invalid URI without scheme",
}
_, err := env.ExecuteActivity(archiveVisibilityActivity, request)
s.Equal(errArchiveVisibilityNonRetryable.Error(), errors.Unwrap(err).Error())
applicationErr, ok := errors.Unwrap(err).(*temporal.ApplicationError)
s.True(ok)
s.True(applicationErr.NonRetryable())
s.Equal(errArchiveVisibilityNonRetryable.Error(), applicationErr.Error())
}

func (s *activitiesSuite) TestArchiveVisibilityActivity_Fail_GetArchiverError() {
Expand All @@ -332,7 +389,10 @@ func (s *activitiesSuite) TestArchiveVisibilityActivity_Fail_GetArchiverError()
VisibilityURI: testArchivalURI,
}
_, err := env.ExecuteActivity(archiveVisibilityActivity, request)
s.Equal(errArchiveVisibilityNonRetryable.Error(), errors.Unwrap(err).Error())
applicationErr, ok := errors.Unwrap(err).(*temporal.ApplicationError)
s.True(ok)
s.True(applicationErr.NonRetryable())
s.Equal(errArchiveVisibilityNonRetryable.Error(), applicationErr.Error())
}

func (s *activitiesSuite) TestArchiveVisibilityActivity_Fail_ArchiveNonRetryableError() {
Expand All @@ -358,7 +418,10 @@ func (s *activitiesSuite) TestArchiveVisibilityActivity_Fail_ArchiveNonRetryable
VisibilityURI: testArchivalURI,
}
_, err := env.ExecuteActivity(archiveVisibilityActivity, request)
s.Equal(errArchiveVisibilityNonRetryable.Error(), errors.Unwrap(err).Error())
applicationErr, ok := errors.Unwrap(err).(*temporal.ApplicationError)
s.True(ok)
s.True(applicationErr.NonRetryable())
s.Equal(errArchiveVisibilityNonRetryable.Error(), applicationErr.Error())
}

func (s *activitiesSuite) TestArchiveVisibilityActivity_Fail_ArchiveRetryableError() {
Expand All @@ -384,7 +447,10 @@ func (s *activitiesSuite) TestArchiveVisibilityActivity_Fail_ArchiveRetryableErr
VisibilityURI: testArchivalURI,
}
_, err := env.ExecuteActivity(archiveVisibilityActivity, request)
s.Equal(testArchiveErr.Error(), errors.Unwrap(err).Error())
applicationErr, ok := errors.Unwrap(err).(*temporal.ApplicationError)
s.True(ok)
s.False(applicationErr.NonRetryable())
s.Equal(testArchiveErr.Error(), applicationErr.Error())
}

func (s *activitiesSuite) TestArchiveVisibilityActivity_Success() {
Expand Down
2 changes: 1 addition & 1 deletion service/worker/archiver/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func (h *handler) handleHistoryRequest(ctx workflow.Context, request *ArchiveReq
localActCtx := workflow.WithLocalActivityOptions(ctx, lao)
err = workflow.ExecuteLocalActivity(localActCtx, deleteHistoryActivity, *request).Get(localActCtx, nil)
if err != nil {
logger.Error("deleting history failed, this means zombie histories are left", tag.Error(err))
logger.Error("deleting workflow execution failed all retires, skip workflow deletion", tag.Error(err))
h.metricsClient.IncCounter(metrics.ArchiverScope, metrics.ArchiverDeleteFailedAllRetriesCount)
} else {
h.metricsClient.IncCounter(metrics.ArchiverScope, metrics.ArchiverDeleteSuccessCount)
Expand Down

0 comments on commit 0bd7435

Please sign in to comment.