Skip to content

Commit

Permalink
Truncate activity failure in mutable state (#4338)
Browse files Browse the repository at this point in the history
  • Loading branch information
yycptt committed May 15, 2023
1 parent a05d21c commit 9d7545b
Show file tree
Hide file tree
Showing 5 changed files with 150 additions and 46 deletions.
5 changes: 5 additions & 0 deletions common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,11 @@ const (
HistoryCountLimitError = "limit.historyCount.error"
// HistoryCountLimitWarn is the per workflow execution history event count limit for warning
HistoryCountLimitWarn = "limit.historyCount.warn"
// MutableStateActivityFailureSizeLimitError is the per activity failure size limit for workflow mutable state.
// If exceeded, failure will be truncated before being stored in mutable state.
MutableStateActivityFailureSizeLimitError = "limit.mutableStateActivityFailureSize.error"
// MutableStateActivityFailureSizeLimitWarn is the per activity failure size warning limit for workflow mutable state
MutableStateActivityFailureSizeLimitWarn = "limit.mutableStateActivityFailureSize.warn"
// HistoryCountSuggestContinueAsNew is the workflow execution history event count limit to
// suggest continue-as-new (in workflow task started event)
HistoryCountSuggestContinueAsNew = "limit.historyCount.suggestContinueAsNew"
Expand Down
5 changes: 5 additions & 0 deletions common/log/tag/tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,11 @@ func WorkflowTaskQueueName(taskQueueName string) ZapTag {

// size limit

// BlobSize returns tag for BlobSize
func BlobSize(blobSize int64) ZapTag {
return NewInt64("blob-size", blobSize)
}

// WorkflowSize returns tag for WorkflowSize
func WorkflowSize(workflowSize int64) ZapTag {
return NewInt64("wf-size", workflowSize)
Expand Down
60 changes: 32 additions & 28 deletions service/history/configs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,20 +182,22 @@ type Config struct {
DurableArchivalEnabled dynamicconfig.BoolPropertyFn

// Size limit related settings
BlobSizeLimitError dynamicconfig.IntPropertyFnWithNamespaceFilter
BlobSizeLimitWarn dynamicconfig.IntPropertyFnWithNamespaceFilter
MemoSizeLimitError dynamicconfig.IntPropertyFnWithNamespaceFilter
MemoSizeLimitWarn dynamicconfig.IntPropertyFnWithNamespaceFilter
HistorySizeLimitError dynamicconfig.IntPropertyFnWithNamespaceFilter
HistorySizeLimitWarn dynamicconfig.IntPropertyFnWithNamespaceFilter
HistorySizeSuggestContinueAsNew dynamicconfig.IntPropertyFnWithNamespaceFilter
HistoryCountLimitError dynamicconfig.IntPropertyFnWithNamespaceFilter
HistoryCountLimitWarn dynamicconfig.IntPropertyFnWithNamespaceFilter
HistoryCountSuggestContinueAsNew dynamicconfig.IntPropertyFnWithNamespaceFilter
NumPendingChildExecutionsLimit dynamicconfig.IntPropertyFnWithNamespaceFilter
NumPendingActivitiesLimit dynamicconfig.IntPropertyFnWithNamespaceFilter
NumPendingSignalsLimit dynamicconfig.IntPropertyFnWithNamespaceFilter
NumPendingCancelsRequestLimit dynamicconfig.IntPropertyFnWithNamespaceFilter
BlobSizeLimitError dynamicconfig.IntPropertyFnWithNamespaceFilter
BlobSizeLimitWarn dynamicconfig.IntPropertyFnWithNamespaceFilter
MemoSizeLimitError dynamicconfig.IntPropertyFnWithNamespaceFilter
MemoSizeLimitWarn dynamicconfig.IntPropertyFnWithNamespaceFilter
HistorySizeLimitError dynamicconfig.IntPropertyFnWithNamespaceFilter
HistorySizeLimitWarn dynamicconfig.IntPropertyFnWithNamespaceFilter
HistorySizeSuggestContinueAsNew dynamicconfig.IntPropertyFnWithNamespaceFilter
HistoryCountLimitError dynamicconfig.IntPropertyFnWithNamespaceFilter
HistoryCountLimitWarn dynamicconfig.IntPropertyFnWithNamespaceFilter
HistoryCountSuggestContinueAsNew dynamicconfig.IntPropertyFnWithNamespaceFilter
MutableStateActivityFailureSizeLimitError dynamicconfig.IntPropertyFnWithNamespaceFilter
MutableStateActivityFailureSizeLimitWarn dynamicconfig.IntPropertyFnWithNamespaceFilter
NumPendingChildExecutionsLimit dynamicconfig.IntPropertyFnWithNamespaceFilter
NumPendingActivitiesLimit dynamicconfig.IntPropertyFnWithNamespaceFilter
NumPendingSignalsLimit dynamicconfig.IntPropertyFnWithNamespaceFilter
NumPendingCancelsRequestLimit dynamicconfig.IntPropertyFnWithNamespaceFilter

// DefaultActivityRetryOptions specifies the out-of-box retry policy if
// none is configured on the Activity by the user.
Expand Down Expand Up @@ -440,20 +442,22 @@ func NewConfig(
ArchiveSignalTimeout: dc.GetDurationProperty(dynamicconfig.ArchiveSignalTimeout, 300*time.Millisecond),
DurableArchivalEnabled: dc.GetBoolProperty(dynamicconfig.DurableArchivalEnabled, true),

BlobSizeLimitError: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.BlobSizeLimitError, 2*1024*1024),
BlobSizeLimitWarn: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.BlobSizeLimitWarn, 512*1024),
MemoSizeLimitError: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.MemoSizeLimitError, 2*1024*1024),
MemoSizeLimitWarn: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.MemoSizeLimitWarn, 2*1024),
NumPendingChildExecutionsLimit: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.NumPendingChildExecutionsLimitError, 2000),
NumPendingActivitiesLimit: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.NumPendingActivitiesLimitError, 2000),
NumPendingSignalsLimit: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.NumPendingSignalsLimitError, 2000),
NumPendingCancelsRequestLimit: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.NumPendingCancelRequestsLimitError, 2000),
HistorySizeLimitError: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.HistorySizeLimitError, 50*1024*1024),
HistorySizeLimitWarn: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.HistorySizeLimitWarn, 10*1024*1024),
HistorySizeSuggestContinueAsNew: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.HistorySizeSuggestContinueAsNew, 4*1024*1024),
HistoryCountLimitError: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.HistoryCountLimitError, 50*1024),
HistoryCountLimitWarn: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.HistoryCountLimitWarn, 10*1024),
HistoryCountSuggestContinueAsNew: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.HistoryCountSuggestContinueAsNew, 4*1024),
BlobSizeLimitError: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.BlobSizeLimitError, 2*1024*1024),
BlobSizeLimitWarn: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.BlobSizeLimitWarn, 512*1024),
MemoSizeLimitError: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.MemoSizeLimitError, 2*1024*1024),
MemoSizeLimitWarn: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.MemoSizeLimitWarn, 2*1024),
NumPendingChildExecutionsLimit: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.NumPendingChildExecutionsLimitError, 2000),
NumPendingActivitiesLimit: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.NumPendingActivitiesLimitError, 2000),
NumPendingSignalsLimit: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.NumPendingSignalsLimitError, 2000),
NumPendingCancelsRequestLimit: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.NumPendingCancelRequestsLimitError, 2000),
HistorySizeLimitError: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.HistorySizeLimitError, 50*1024*1024),
HistorySizeLimitWarn: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.HistorySizeLimitWarn, 10*1024*1024),
HistorySizeSuggestContinueAsNew: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.HistorySizeSuggestContinueAsNew, 4*1024*1024),
HistoryCountLimitError: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.HistoryCountLimitError, 50*1024),
HistoryCountLimitWarn: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.HistoryCountLimitWarn, 10*1024),
HistoryCountSuggestContinueAsNew: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.HistoryCountSuggestContinueAsNew, 4*1024),
MutableStateActivityFailureSizeLimitError: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.MutableStateActivityFailureSizeLimitError, 4*1024),
MutableStateActivityFailureSizeLimitWarn: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.MutableStateActivityFailureSizeLimitWarn, 2*1024),

ThrottledLogRPS: dc.GetIntProperty(dynamicconfig.HistoryThrottledLogRPS, 4),
EnableStickyQuery: dc.GetBoolPropertyFnWithNamespaceFilter(dynamicconfig.EnableStickyQuery, true),
Expand Down
38 changes: 37 additions & 1 deletion service/history/workflow/mutable_state_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ import (
"go.temporal.io/server/common/convert"
"go.temporal.io/server/common/definition"
"go.temporal.io/server/common/enums"
"go.temporal.io/server/common/failure"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/metrics"
Expand Down Expand Up @@ -3972,7 +3973,7 @@ func (ms *MutableStateImpl) RetryActivity(
ai.StartedTime = timestamp.TimePtr(time.Time{})
ai.TimerTaskStatus = TimerTaskStatusNone
ai.RetryLastWorkerIdentity = ai.StartedIdentity
ai.RetryLastFailure = failure
ai.RetryLastFailure = ms.truncateRetryableActivityFailure(failure)

if err := ms.taskGenerator.GenerateActivityRetryTasks(
ai.ScheduledEventId,
Expand All @@ -3985,6 +3986,41 @@ func (ms *MutableStateImpl) RetryActivity(
return enumspb.RETRY_STATE_IN_PROGRESS, nil
}

func (ms *MutableStateImpl) truncateRetryableActivityFailure(
activityFailure *failurepb.Failure,
) *failurepb.Failure {
namespaceName := ms.namespaceEntry.Name().String()
failureSize := activityFailure.Size()

if failureSize <= ms.config.MutableStateActivityFailureSizeLimitWarn(namespaceName) {
return activityFailure
}

throttledLogger := log.With(
ms.shard.GetThrottledLogger(),
tag.WorkflowNamespace(namespaceName),
tag.WorkflowID(ms.executionInfo.WorkflowId),
tag.WorkflowRunID(ms.executionState.RunId),
tag.BlobSize(int64(failureSize)),
tag.BlobSizeViolationOperation("RetryActivity"),
)

sizeLimitError := ms.config.MutableStateActivityFailureSizeLimitError(namespaceName)
if failureSize <= sizeLimitError {
throttledLogger.Warn("Activity failure size exceeds warning limit for mutable state.")
return activityFailure
}

throttledLogger.Warn("Activity failure size exceeds error limit for mutable state, truncated.")

// nonRetryable is set to false here as only retryable failures are recorded in mutable state.
// also when this method is called, the check for isRetryable is already done, so the value
// is only for visibility/debugging purpose.
serverFailure := failure.NewServerFailure(common.FailureReasonFailureExceedsLimit, false)
serverFailure.Cause = failure.Truncate(activityFailure, sizeLimitError)
return serverFailure
}

// TODO mutable state should generate corresponding transfer / timer tasks according to
// updates accumulated, while currently all transfer / timer tasks are managed manually

Expand Down
88 changes: 71 additions & 17 deletions service/history/workflow/mutable_state_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
commandpb "go.temporal.io/api/command/v1"
commonpb "go.temporal.io/api/common/v1"
enumspb "go.temporal.io/api/enums/v1"
failurepb "go.temporal.io/api/failure/v1"
historypb "go.temporal.io/api/history/v1"
"go.temporal.io/api/serviceerror"
taskqueuepb "go.temporal.io/api/taskqueue/v1"
Expand Down Expand Up @@ -110,6 +111,8 @@ func (s *mutableStateSuite) SetupTest() {
// set the checksum probabilities to 100% for exercising during test
s.mockConfig.MutableStateChecksumGenProbability = func(namespace string) int { return 100 }
s.mockConfig.MutableStateChecksumVerifyProbability = func(namespace string) int { return 100 }
s.mockConfig.MutableStateActivityFailureSizeLimitWarn = func(namespace string) int { return 1 * 1024 }
s.mockConfig.MutableStateActivityFailureSizeLimitError = func(namespace string) int { return 2 * 1024 }
s.mockShard.SetEventsCacheForTesting(s.mockEventsCache)

s.testScope = s.mockShard.Resource.MetricsScope.(tally.TestScope)
Expand Down Expand Up @@ -904,49 +907,41 @@ func (s *mutableStateSuite) TestReplicateActivityTaskStartedEvent() {
}

func (s *mutableStateSuite) TestTotalEntitiesCount() {
namespaceEntry := s.newNamespaceCacheEntry()
mutableState := TestLocalMutableState(
s.mockShard,
s.mockEventsCache,
namespaceEntry,
s.logger,
uuid.New(),
)
s.mockEventsCache.EXPECT().PutEvent(gomock.Any(), gomock.Any()).AnyTimes()

// scheduling, starting & completing workflow task is omitted here

workflowTaskCompletedEventID := int64(4)
_, _, err := mutableState.AddActivityTaskScheduledEvent(
_, _, err := s.mutableState.AddActivityTaskScheduledEvent(
workflowTaskCompletedEventID,
&commandpb.ScheduleActivityTaskCommandAttributes{},
false,
)
s.NoError(err)

_, _, err = mutableState.AddStartChildWorkflowExecutionInitiatedEvent(
_, _, err = s.mutableState.AddStartChildWorkflowExecutionInitiatedEvent(
workflowTaskCompletedEventID,
uuid.New(),
&commandpb.StartChildWorkflowExecutionCommandAttributes{},
namespace.ID(uuid.New()),
)
s.NoError(err)

_, _, err = mutableState.AddTimerStartedEvent(
_, _, err = s.mutableState.AddTimerStartedEvent(
workflowTaskCompletedEventID,
&commandpb.StartTimerCommandAttributes{},
)
s.NoError(err)

_, _, err = mutableState.AddRequestCancelExternalWorkflowExecutionInitiatedEvent(
_, _, err = s.mutableState.AddRequestCancelExternalWorkflowExecutionInitiatedEvent(
workflowTaskCompletedEventID,
uuid.New(),
&commandpb.RequestCancelExternalWorkflowExecutionCommandAttributes{},
namespace.ID(uuid.New()),
)
s.NoError(err)

_, _, err = mutableState.AddSignalExternalWorkflowExecutionInitiatedEvent(
_, _, err = s.mutableState.AddSignalExternalWorkflowExecutionInitiatedEvent(
workflowTaskCompletedEventID,
uuid.New(),
&commandpb.SignalExternalWorkflowExecutionCommandAttributes{
Expand All @@ -959,7 +954,7 @@ func (s *mutableStateSuite) TestTotalEntitiesCount() {
)
s.NoError(err)

_, err = mutableState.AddWorkflowExecutionSignaled(
_, err = s.mutableState.AddWorkflowExecutionSignaled(
"signalName",
&commonpb.Payloads{},
"identity",
Expand All @@ -969,12 +964,12 @@ func (s *mutableStateSuite) TestTotalEntitiesCount() {
s.NoError(err)

s.mockShard.Resource.ClusterMetadata.EXPECT().ClusterNameForFailoverVersion(
namespaceEntry.IsGlobalNamespace(),
mutableState.GetCurrentVersion(),
tests.LocalNamespaceEntry.IsGlobalNamespace(),
s.mutableState.GetCurrentVersion(),
).Return(cluster.TestCurrentClusterName)
s.mockShard.Resource.ClusterMetadata.EXPECT().GetCurrentClusterName().Return(cluster.TestCurrentClusterName)

mutation, _, err := mutableState.CloseTransactionAsMutation(
mutation, _, err := s.mutableState.CloseTransactionAsMutation(
TransactionPolicyActive,
)
s.NoError(err)
Expand Down Expand Up @@ -1048,3 +1043,62 @@ func (s *mutableStateSuite) TestSpeculativeWorkflowTaskNotPersisted() {
})
}
}

func (s *mutableStateSuite) TestRetryActivity_TruncateRetryableFailure() {
s.mockEventsCache.EXPECT().PutEvent(gomock.Any(), gomock.Any()).AnyTimes()

// scheduling, starting & completing workflow task is omitted here

workflowTaskCompletedEventID := int64(4)
_, activityInfo, err := s.mutableState.AddActivityTaskScheduledEvent(
workflowTaskCompletedEventID,
&commandpb.ScheduleActivityTaskCommandAttributes{
ActivityId: "5",
ActivityType: &commonpb.ActivityType{Name: "activity-type"},
TaskQueue: &taskqueuepb.TaskQueue{Name: "task-queue"},
RetryPolicy: &commonpb.RetryPolicy{
InitialInterval: timestamp.DurationFromSeconds(1),
},
},
false,
)
s.NoError(err)

_, err = s.mutableState.AddActivityTaskStartedEvent(
activityInfo,
activityInfo.ScheduledEventId,
uuid.New(),
"worker-identity",
)
s.NoError(err)

failureSizeErrorLimit := s.mockConfig.MutableStateActivityFailureSizeLimitError(
s.mutableState.namespaceEntry.Name().String(),
)

activityFailure := &failurepb.Failure{
Message: "activity failure with large details",
Source: "application",
FailureInfo: &failurepb.Failure_ApplicationFailureInfo{ApplicationFailureInfo: &failurepb.ApplicationFailureInfo{
Type: "application-failure-type",
NonRetryable: false,
Details: &commonpb.Payloads{
Payloads: []*commonpb.Payload{
{
Data: make([]byte, failureSizeErrorLimit*2),
},
},
},
}},
}
s.Greater(activityFailure.Size(), failureSizeErrorLimit)

retryState, err := s.mutableState.RetryActivity(activityInfo, activityFailure)
s.NoError(err)
s.Equal(enumspb.RETRY_STATE_IN_PROGRESS, retryState)

activityInfo, ok := s.mutableState.GetActivityInfo(activityInfo.ScheduledEventId)
s.True(ok)
s.LessOrEqual(activityInfo.RetryLastFailure.Size(), failureSizeErrorLimit)
s.Equal(activityFailure.GetMessage(), activityInfo.RetryLastFailure.Cause.GetMessage())
}

0 comments on commit 9d7545b

Please sign in to comment.