Skip to content

Commit

Permalink
Enforce mutable state size limit (#4350)
Browse files Browse the repository at this point in the history
* calculate ms size on db load

* enforce mutable state size limit

* comments

* track size on ms update

* update size on hbuilder change

* cleanup

* cleanup

* only check buffered events size when requested

* refactor size checks

* tests

* feedback

* check for existence before subtracting size

* tests

* change existence check

* feedback

* tests

* include buffered events if they will not be flushed

* include keys in size

* math

* update size in retry activity

* cleanup
  • Loading branch information
pdoerner committed May 24, 2023
1 parent df69684 commit 2fce7a5
Show file tree
Hide file tree
Showing 13 changed files with 506 additions and 62 deletions.
4 changes: 4 additions & 0 deletions common/dynamicconfig/constants.go
Expand Up @@ -156,6 +156,10 @@ const (
MutableStateActivityFailureSizeLimitError = "limit.mutableStateActivityFailureSize.error"
// MutableStateActivityFailureSizeLimitWarn is the per activity failure size warning limit for workflow mutable state
MutableStateActivityFailureSizeLimitWarn = "limit.mutableStateActivityFailureSize.warn"
// MutableStateSizeLimitError is the per workflow execution mutable state size limit in bytes
MutableStateSizeLimitError = "limit.mutableStateSize.error"
// MutableStateSizeLimitWarn is the per workflow execution mutable state size limit in bytes for warning
MutableStateSizeLimitWarn = "limit.mutableStateSize.warn"
// HistoryCountSuggestContinueAsNew is the workflow execution history event count limit to
// suggest continue-as-new (in workflow task started event)
HistoryCountSuggestContinueAsNew = "limit.historyCount.suggestContinueAsNew"
Expand Down
5 changes: 5 additions & 0 deletions common/log/tag/tags.go
Expand Up @@ -332,6 +332,11 @@ func WorkflowHistorySizeBytes(historySizeBytes int) ZapTag {
return NewInt("wf-history-size-bytes", historySizeBytes)
}

// WorkflowMutableStateSize builds the integer log tag that carries a
// workflow's mutable state size.
func WorkflowMutableStateSize(mutableStateSize int) ZapTag {
	const key = "wf-mutable-state-size"
	return NewInt(key, mutableStateSize)
}

// WorkflowEventCount returns tag for EventCount
func WorkflowEventCount(eventCount int) ZapTag {
return NewInt("wf-event-count", eventCount)
Expand Down
6 changes: 4 additions & 2 deletions common/util.go
Expand Up @@ -131,8 +131,10 @@ const (
FailureReasonCancelDetailsExceedsLimit = "Cancel details exceed size limit."
// FailureReasonHeartbeatExceedsLimit is failureReason for heartbeat exceeds limit
FailureReasonHeartbeatExceedsLimit = "Heartbeat details exceed size limit."
// FailureReasonSizeExceedsLimit is reason to fail workflow when history size or count exceed limit
FailureReasonSizeExceedsLimit = "Workflow history size / count exceeds limit."
// FailureReasonHistorySizeExceedsLimit is reason to fail workflow when history size or count exceed limit
FailureReasonHistorySizeExceedsLimit = "Workflow history size / count exceeds limit."
// FailureReasonMutableStateSizeExceedsLimit is reason to fail workflow when mutable state size exceeds limit
FailureReasonMutableStateSizeExceedsLimit = "Workflow mutable state size exceeds limit."
// FailureReasonTransactionSizeExceedsLimit is the failureReason for when transaction cannot be committed because it exceeds size limit
FailureReasonTransactionSizeExceedsLimit = "Transaction size exceeds limit."
)
Expand Down
4 changes: 4 additions & 0 deletions service/history/configs/config.go
Expand Up @@ -195,6 +195,8 @@ type Config struct {
HistoryCountSuggestContinueAsNew dynamicconfig.IntPropertyFnWithNamespaceFilter
MutableStateActivityFailureSizeLimitError dynamicconfig.IntPropertyFnWithNamespaceFilter
MutableStateActivityFailureSizeLimitWarn dynamicconfig.IntPropertyFnWithNamespaceFilter
MutableStateSizeLimitError dynamicconfig.IntPropertyFn
MutableStateSizeLimitWarn dynamicconfig.IntPropertyFn
NumPendingChildExecutionsLimit dynamicconfig.IntPropertyFnWithNamespaceFilter
NumPendingActivitiesLimit dynamicconfig.IntPropertyFnWithNamespaceFilter
NumPendingSignalsLimit dynamicconfig.IntPropertyFnWithNamespaceFilter
Expand Down Expand Up @@ -458,6 +460,8 @@ func NewConfig(
HistoryCountSuggestContinueAsNew: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.HistoryCountSuggestContinueAsNew, 4*1024),
MutableStateActivityFailureSizeLimitError: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.MutableStateActivityFailureSizeLimitError, 4*1024),
MutableStateActivityFailureSizeLimitWarn: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.MutableStateActivityFailureSizeLimitWarn, 2*1024),
MutableStateSizeLimitError: dc.GetIntProperty(dynamicconfig.MutableStateSizeLimitError, 8*1024*1024),
MutableStateSizeLimitWarn: dc.GetIntProperty(dynamicconfig.MutableStateSizeLimitWarn, 1*1024*1024),

ThrottledLogRPS: dc.GetIntProperty(dynamicconfig.HistoryThrottledLogRPS, 4),
EnableStickyQuery: dc.GetBoolPropertyFnWithNamespaceFilter(dynamicconfig.EnableStickyQuery, true),
Expand Down
6 changes: 4 additions & 2 deletions service/history/consts/const.go
Expand Up @@ -77,8 +77,10 @@ var (
ErrConsistentQueryBufferExceeded = serviceerror.NewUnavailable("consistent query buffer is full, cannot accept new consistent queries")
// ErrEmptyHistoryRawEventBatch indicate that one single batch of history raw events is of size 0
ErrEmptyHistoryRawEventBatch = serviceerror.NewInvalidArgument("encountered empty history batch")
// ErrSizeExceedsLimit is error indicating workflow execution has exceeded system defined limit
ErrSizeExceedsLimit = serviceerror.NewInvalidArgument(common.FailureReasonSizeExceedsLimit)
// ErrHistorySizeExceedsLimit is error indicating workflow execution has exceeded system defined history size limit
ErrHistorySizeExceedsLimit = serviceerror.NewInvalidArgument(common.FailureReasonHistorySizeExceedsLimit)
// ErrMutableStateSizeExceedsLimit is error indicating workflow execution has exceeded system defined mutable state size limit
ErrMutableStateSizeExceedsLimit = serviceerror.NewInvalidArgument(common.FailureReasonMutableStateSizeExceedsLimit)
// ErrUnknownCluster is error indicating unknown cluster
ErrUnknownCluster = serviceerror.NewInvalidArgument("unknown cluster")
// ErrBufferedQueryCleared is error indicating mutable state is cleared while buffered query is pending
Expand Down
144 changes: 109 additions & 35 deletions service/history/workflow/context.go
Expand Up @@ -148,6 +148,7 @@ type (
shard shard.Context
workflowKey definition.WorkflowKey
logger log.Logger
throttledLogger log.ThrottledLogger
metricsHandler metrics.Handler
clusterMetadata cluster.Metadata
timeSource clock.TimeSource
Expand All @@ -172,6 +173,7 @@ func NewContext(
shard: shard,
workflowKey: workflowKey,
logger: logger,
throttledLogger: shard.GetThrottledLogger(),
metricsHandler: shard.GetMetricsHandler().WithTags(metrics.OperationTag(metrics.WorkflowContextScope)),
clusterMetadata: shard.GetClusterMetadata(),
timeSource: shard.GetTimeSource(),
Expand Down Expand Up @@ -481,27 +483,38 @@ func (c *ContextImpl) UpdateWorkflowExecutionAsActive(
) error {

// We only perform this check on active cluster for the namespace
forceTerminate, err := c.enforceSizeCheck(ctx)
historyForceTerminate, err := c.enforceHistorySizeCheck(ctx)
if err != nil {
return err
}
msForceTerminate := false
if !historyForceTerminate {
msForceTerminate, err = c.enforceMutableStateSizeCheck(ctx)
if err != nil {
return err
}
}

if err := c.UpdateWorkflowExecutionWithNew(
err = c.UpdateWorkflowExecutionWithNew(
ctx,
persistence.UpdateWorkflowModeUpdateCurrent,
nil,
nil,
TransactionPolicyActive,
nil,
); err != nil {
)
if err != nil {
return err
}

if forceTerminate {
// Returns ResourceExhausted error back to caller after workflow execution is forced terminated
// Retrying the operation will give appropriate semantics operation should expect in the case of workflow
// execution being closed.
return consts.ErrSizeExceedsLimit
// Returns an InvalidArgument error (ErrHistorySizeExceedsLimit / ErrMutableStateSizeExceedsLimit) back to the
// caller after the workflow execution has been force-terminated. Retrying the operation will then observe the
// semantics the operation should expect for a closed workflow execution.
if historyForceTerminate {
return consts.ErrHistorySizeExceedsLimit
}
if msForceTerminate {
return consts.ErrMutableStateSizeExceedsLimit
}

return nil
Expand Down Expand Up @@ -869,9 +882,23 @@ func (c *ContextImpl) UpdateRegistry(ctx context.Context) update.Registry {
}

// Returns true if execution is forced terminated
func (c *ContextImpl) enforceSizeCheck(
// enforceHistorySizeCheck hard-terminates the workflow when
// maxHistorySizeExceeded reports that a still-running workflow has breached
// the history size or history count limits.
//
// It returns true when the workflow state has been overwritten to force
// terminate the execution on update, and false otherwise. A non-nil error is
// returned only when the forced termination itself fails.
func (c *ContextImpl) enforceHistorySizeCheck(
	ctx context.Context,
) (bool, error) {
	// Nothing to enforce while the history is within limits.
	if !c.maxHistorySizeExceeded() {
		return false, nil
	}

	terminateErr := c.forceTerminateWorkflow(ctx, common.FailureReasonHistorySizeExceedsLimit)
	if terminateErr != nil {
		return false, terminateErr
	}

	// Tell the caller the execution was force-terminated.
	return true, nil
}

// Returns true if the workflow is running and history size or event count should trigger a forced termination
// Prints a log message if history size or history event count are over the error or warn limits
func (c *ContextImpl) maxHistorySizeExceeded() bool {
namespaceName := c.GetNamespace().String()
historySizeLimitWarn := c.config.HistorySizeLimitWarn(namespaceName)
historySizeLimitError := c.config.HistorySizeLimitError(namespaceName)
Expand All @@ -881,53 +908,100 @@ func (c *ContextImpl) enforceSizeCheck(
historySize := int(c.GetHistorySize())
historyCount := int(c.MutableState.GetNextEventID() - 1)

// Hard terminate workflow if still running and breached size or count limit
if (historySize > historySizeLimitError || historyCount > historyCountLimitError) &&
c.MutableState.IsWorkflowExecutionRunning() {
c.logger.Error("history size exceeds error limit.",
c.logger.Warn("history size exceeds error limit.",
tag.WorkflowNamespaceID(c.workflowKey.NamespaceID),
tag.WorkflowID(c.workflowKey.WorkflowID),
tag.WorkflowRunID(c.workflowKey.RunID),
tag.WorkflowHistorySize(historySize),
tag.WorkflowEventCount(historyCount))

// Discard pending changes in MutableState so we can apply terminate state transition
c.Clear()
return true
}

// Reload mutable state
mutableState, err := c.LoadMutableState(ctx)
if err != nil {
return false, err
}
if historySize > historySizeLimitWarn || historyCount > historyCountLimitWarn {
c.throttledLogger.Warn("history size exceeds warn limit.",
tag.WorkflowNamespaceID(c.MutableState.GetExecutionInfo().NamespaceId),
tag.WorkflowID(c.MutableState.GetExecutionInfo().WorkflowId),
tag.WorkflowRunID(c.MutableState.GetExecutionState().RunId),
tag.WorkflowHistorySize(historySize),
tag.WorkflowEventCount(historyCount))
}

// Terminate workflow is written as a separate batch and might result in more than one event as we close the
// outstanding workflow task before terminating the workflow
eventBatchFirstEventID := mutableState.GetNextEventID()
if err := TerminateWorkflow(
mutableState,
eventBatchFirstEventID,
common.FailureReasonSizeExceedsLimit,
nil,
consts.IdentityHistoryService,
false,
); err != nil {
return false
}

// enforceMutableStateSizeCheck force-terminates the workflow when
// maxMutableStateSizeExceeded reports that the mutable state size is over the
// error limit.
//
// It returns true only if the execution is actually force-terminated. A
// non-nil error is returned only when the forced termination itself fails.
//
// TODO: ideally this check should be after closing mutable state tx, but that would require a large refactor
func (c *ContextImpl) enforceMutableStateSizeCheck(ctx context.Context) (bool, error) {
	if !c.maxMutableStateSizeExceeded() {
		return false, nil
	}

	// forceTerminateWorkflow silently does nothing for a workflow that is not
	// running. Report false in that case so callers do not surface a
	// size-limit error for an execution that was never terminated. The check
	// must happen before forceTerminateWorkflow, which clears and reloads the
	// mutable state.
	if !c.MutableState.IsWorkflowExecutionRunning() {
		return false, nil
	}

	if err := c.forceTerminateWorkflow(ctx, common.FailureReasonMutableStateSizeExceedsLimit); err != nil {
		return false, err
	}

	// Return true to caller to indicate workflow state is overwritten to force terminate execution on update
	return true, nil
}

if historySize > historySizeLimitWarn || historyCount > historyCountLimitWarn {
c.logger.Warn("history size exceeds warn limit.",
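// Returns true if the mutable state size is over the error limit and should trigger a forced termination.
// NOTE: unlike maxHistorySizeExceeded, this does not check whether the workflow is still running;
// forceTerminateWorkflow performs that check and does nothing for a workflow that is not running.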
// Prints a log message if mutable state size is over the error or warn limits
func (c *ContextImpl) maxMutableStateSizeExceeded() bool {
mutableStateSizeLimitError := c.config.MutableStateSizeLimitError()
mutableStateSizeLimitWarn := c.config.MutableStateSizeLimitWarn()

mutableStateSize := c.MutableState.GetApproximatePersistedSize()

if mutableStateSize > mutableStateSizeLimitError {
c.logger.Warn("mutable state size exceeds error limit.",
tag.WorkflowNamespaceID(c.workflowKey.NamespaceID),
tag.WorkflowID(c.workflowKey.WorkflowID),
tag.WorkflowRunID(c.workflowKey.RunID),
tag.WorkflowMutableStateSize(mutableStateSize))

return true
}

if mutableStateSize > mutableStateSizeLimitWarn {
c.throttledLogger.Warn("mutable state size exceeds warn limit.",
tag.WorkflowNamespaceID(c.MutableState.GetExecutionInfo().NamespaceId),
tag.WorkflowID(c.MutableState.GetExecutionInfo().WorkflowId),
tag.WorkflowRunID(c.MutableState.GetExecutionState().RunId),
tag.WorkflowHistorySize(historySize),
tag.WorkflowEventCount(historyCount))
tag.WorkflowMutableStateSize(mutableStateSize))
}

return false, nil
return false
}

// forceTerminateWorkflow terminates the workflow with the given failure
// reason, attributing the termination to consts.IdentityHistoryService.
//
// It does nothing and returns nil when the workflow is not running. Otherwise
// it discards the pending mutable state changes, reloads the mutable state,
// and applies the terminate transition to the reloaded state. Any error from
// reloading or from TerminateWorkflow is returned unchanged.
func (c *ContextImpl) forceTerminateWorkflow(
	ctx context.Context,
	failureReason string,
) error {
	if running := c.MutableState.IsWorkflowExecutionRunning(); !running {
		return nil
	}

	// Pending changes in MutableState are dropped first so that the terminate
	// state transition can be applied on top of a freshly loaded state.
	c.Clear()

	reloaded, loadErr := c.LoadMutableState(ctx)
	if loadErr != nil {
		return loadErr
	}

	// The termination goes out as its own batch, starting at the reloaded
	// state's next event ID. It might result in more than one event, as the
	// outstanding workflow task is closed before the workflow is terminated.
	return TerminateWorkflow(
		reloaded,
		reloaded.GetNextEventID(),
		failureReason,
		nil,
		consts.IdentityHistoryService,
		false,
	)
}

func emitStateTransitionCount(
Expand Down
1 change: 1 addition & 0 deletions service/history/workflow/mutable_state.go
Expand Up @@ -211,6 +211,7 @@ type (
IsCancelRequested() bool
IsCurrentWorkflowGuaranteed() bool
IsSignalRequested(requestID string) bool
GetApproximatePersistedSize() int

CurrentTaskQueue() *taskqueuepb.TaskQueue
SetStickyTaskQueue(name string, scheduleToStartTimeout *time.Duration)
Expand Down

0 comments on commit 2fce7a5

Please sign in to comment.