Skip to content

Commit

Permalink
Don't log a warning when the number of child workflows is high
Browse files Browse the repository at this point in the history
  • Loading branch information
MichaelSnowden committed Nov 16, 2022
1 parent c14f643 commit fe0f70a
Show file tree
Hide file tree
Showing 6 changed files with 18 additions and 78 deletions.
2 changes: 0 additions & 2 deletions common/dynamicconfig/constants.go
Expand Up @@ -115,8 +115,6 @@ const (
MemoSizeLimitWarn = "limit.memoSize.warn"
// NumPendingChildExecutionLimitError is the per workflow pending child workflow limit
NumPendingChildExecutionLimitError = "limit.numPendingChildExecution.error"
// NumPendingChildExecutionLimitWarning is the per workflow pending child workflow limit for warning
NumPendingChildExecutionLimitWarning = "limit.numPendingChildExecution.warn"
// HistorySizeLimitError is the per workflow execution history size limit
HistorySizeLimitError = "limit.historySize.error"
// HistorySizeLimitWarn is the per workflow execution history size limit for warning
Expand Down
1 change: 0 additions & 1 deletion common/metrics/metric_defs.go
Expand Up @@ -1345,7 +1345,6 @@ var (
HistoryCount = NewDimensionlessHistogramDef("history_count")
SearchAttributesSize = NewBytesHistogramDef("search_attributes_size")
MemoSize = NewBytesHistogramDef("memo_size")
NumPendingChildWorkflowsHigh = NewCounterDef("num_pending_child_workflows_high")
NumPendingChildWorkflowsTooHigh = NewCounterDef("num_pending_child_workflows_too_high")

// Frontend
Expand Down
36 changes: 12 additions & 24 deletions service/history/commandChecker.go
Expand Up @@ -67,7 +67,6 @@ type (
memoSizeLimitWarn int
memoSizeLimitError int
numPendingChildExecutionLimitError int
numPendingChildExecutionLimitWarn int
}

workflowSizeChecker struct {
Expand Down Expand Up @@ -191,30 +190,19 @@ func (c *workflowSizeChecker) checkIfNumChildWorkflowsExceedsLimit() error {

numPending := len(c.mutableState.GetPendingChildExecutionInfos())
errLimit := c.numPendingChildExecutionLimitError
warnLimit := c.numPendingChildExecutionLimitWarn
if !withinLimit(numPending, errLimit) {
c.metricsHandler.Counter(metrics.NumPendingChildWorkflowsTooHigh.GetMetricName()).Record(1)
err := fmt.Errorf(
"the number of pending child workflow executions, %d, "+
"has reached the error limit of %d established with %q",
numPending,
errLimit,
dynamicconfig.NumPendingChildExecutionLimitError,
)
logger.Error(err.Error(), tag.Error(err))
return err
}
if !withinLimit(numPending, warnLimit) {
c.metricsHandler.Counter(metrics.NumPendingChildWorkflowsHigh.GetMetricName()).Record(1)
logger.Warn(fmt.Sprintf(
"The number of pending child workflow executions, %d, "+
"has reached the warning limit of %d established with %q.",
numPending,
warnLimit,
dynamicconfig.NumPendingChildExecutionLimitWarning,
))
if withinLimit(numPending, errLimit) {
return nil
}
return nil
c.metricsHandler.Counter(metrics.NumPendingChildWorkflowsTooHigh.GetMetricName()).Record(1)
err := fmt.Errorf(
"the number of pending child workflow executions, %d, "+
"has reached the error limit of %d established with %q",
numPending,
errLimit,
dynamicconfig.NumPendingChildExecutionLimitError,
)
logger.Error(err.Error(), tag.Error(err))
return err
}

func (c *workflowSizeChecker) checkIfSearchAttributesSizeExceedsLimit(
Expand Down
53 changes: 6 additions & 47 deletions service/history/commandChecker_test.go
Expand Up @@ -719,75 +719,38 @@ func TestWorkflowSizeChecker_NumChildWorkflows(t *testing.T) {
const (
errMsg = "the number of pending child workflow executions, 1, " +
"has reached the error limit of 1 established with \"limit.numPendingChildExecution.error\""
warnMsg = "The number of pending child workflow executions, 1, " +
"has reached the warning limit of 1 established with \"limit.numPendingChildExecution.warn\"."
)
for _, c := range []struct {
Name string

Name string
NumPendingChildExecutions int
WarnLimit int
ErrorLimit int

ExpectedWarningMsg string
ExpectedErrorMsg string

ExpectedMetric string
ExpectedErrorMsg string
ExpectedMetric string
}{
{
Name: "No limits and no child workflows",
Name: "No limit and no child workflows",
NumPendingChildExecutions: 0,
WarnLimit: 0,
ErrorLimit: 0,
},
{
Name: "No executions",
NumPendingChildExecutions: 0,
WarnLimit: 1,
ErrorLimit: 1,
},
{
Name: "Neither limit exceeded",
Name: "Limit not exceeded",
NumPendingChildExecutions: 2,
WarnLimit: 3,
ErrorLimit: 3,
},
{
Name: "Error limit exceeded without a warning limit",
NumPendingChildExecutions: 1,
WarnLimit: 0,
ErrorLimit: 1,

ExpectedErrorMsg: errMsg,
ExpectedMetric: "num_pending_child_workflows_too_high",
},
{
Name: "Error limit and warning limit both exceeded",
Name: "Error limit exceeded",
NumPendingChildExecutions: 1,
WarnLimit: 1,
ErrorLimit: 1,

ExpectedErrorMsg: errMsg,
ExpectedMetric: "num_pending_child_workflows_too_high",
},
{
Name: "Warning limit exceeded, but no error limit exists",
NumPendingChildExecutions: 1,
WarnLimit: 1,
ErrorLimit: 0,

ExpectedWarningMsg: warnMsg,
ExpectedMetric: "num_pending_child_workflows_high",
},
{
Name: "Only warning limit exceeded, and there is an error limit",
NumPendingChildExecutions: 1,
WarnLimit: 1,
ErrorLimit: 2,

ExpectedWarningMsg: warnMsg,
ExpectedMetric: "num_pending_child_workflows_high",
},
} {
t.Run(c.Name, func(t *testing.T) {
ctrl := gomock.NewController(t)
Expand Down Expand Up @@ -830,16 +793,12 @@ func TestWorkflowSizeChecker_NumChildWorkflows(t *testing.T) {
assert.Equal(t, "test-run-id", runID)
}

if len(c.ExpectedWarningMsg) > 0 {
logger.EXPECT().Warn(c.ExpectedWarningMsg, gomock.Any()).Do(assertMessage)
}
if len(c.ExpectedErrorMsg) > 0 {
logger.EXPECT().Error(c.ExpectedErrorMsg, gomock.Any()).Do(assertMessage)
}

checker := newWorkflowSizeChecker(workflowSizeLimits{
numPendingChildExecutionLimitError: c.ErrorLimit,
numPendingChildExecutionLimitWarn: c.WarnLimit,
}, mutableState, nil, nil, metricsHandler, logger)
err := checker.checkIfNumChildWorkflowsExceedsLimit()

Expand Down
2 changes: 0 additions & 2 deletions service/history/configs/config.go
Expand Up @@ -197,7 +197,6 @@ type Config struct {
HistoryCountLimitError dynamicconfig.IntPropertyFnWithNamespaceFilter
HistoryCountLimitWarn dynamicconfig.IntPropertyFnWithNamespaceFilter
NumPendingChildExecutionLimitError dynamicconfig.IntPropertyFnWithNamespaceFilter
NumPendingChildExecutionLimitWarn dynamicconfig.IntPropertyFnWithNamespaceFilter

// DefaultActivityRetryOptions specifies the out-of-box retry policy if
// none is configured on the Activity by the user.
Expand Down Expand Up @@ -433,7 +432,6 @@ func NewConfig(dc *dynamicconfig.Collection, numberOfShards int32, isAdvancedVis
MemoSizeLimitError: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.MemoSizeLimitError, 2*1024*1024),
MemoSizeLimitWarn: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.MemoSizeLimitWarn, 2*1024),
NumPendingChildExecutionLimitError: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.NumPendingChildExecutionLimitError, 1000),
NumPendingChildExecutionLimitWarn: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.NumPendingChildExecutionLimitWarning, 200),
HistorySizeLimitError: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.HistorySizeLimitError, 50*1024*1024),
HistorySizeLimitWarn: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.HistorySizeLimitWarn, 10*1024*1024),
HistoryCountLimitError: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.HistoryCountLimitError, 50*1024),
Expand Down
2 changes: 0 additions & 2 deletions service/history/workflowTaskHandlerCallbacks.go
Expand Up @@ -454,8 +454,6 @@ func (handler *workflowTaskHandlerCallbacksImpl) handleWorkflowTaskCompleted(
blobSizeLimitError: handler.config.BlobSizeLimitError(namespace.String()),
memoSizeLimitWarn: handler.config.MemoSizeLimitWarn(namespace.String()),
memoSizeLimitError: handler.config.MemoSizeLimitError(namespace.String()),
numPendingChildExecutionLimitWarn: handler.config.NumPendingChildExecutionLimitWarn(namespace.
String()),
numPendingChildExecutionLimitError: handler.config.NumPendingChildExecutionLimitError(namespace.
String()),
},
Expand Down

0 comments on commit fe0f70a

Please sign in to comment.