Skip to content

Commit

Permalink
Enforce the pending cancel request limit (#3615)
Browse files Browse the repository at this point in the history
* Add limits on other workflow fields

* Add methods to check workflow size violations for several other fields

* Enforce the per-workflow pending activity constraint

* Enforce the per-workflow pending cancel request limit
  • Loading branch information
MichaelSnowden committed Nov 19, 2022
1 parent 0c735e4 commit df160db
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 41 deletions.
4 changes: 2 additions & 2 deletions host/archival_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,9 +162,9 @@ func (s *integrationSuite) TestVisibilityArchival() {
}
}

func (s *integrationSuite) getNamespaceID(namespace string) string {
func (s *IntegrationBase) getNamespaceID(namespace string) string {
namespaceResp, err := s.engine.DescribeNamespace(NewContext(), &workflowservice.DescribeNamespaceRequest{
Namespace: s.archivalNamespace,
Namespace: namespace,
})
s.NoError(err)
return namespaceResp.NamespaceInfo.GetId()
Expand Down
147 changes: 108 additions & 39 deletions host/client_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,11 @@ import (
"go.temporal.io/sdk/workflow"

"go.temporal.io/server/api/adminservice/v1"
"go.temporal.io/server/common"
"go.temporal.io/server/common/backoff"
"go.temporal.io/server/common/dynamicconfig"
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/persistence"
"go.temporal.io/server/common/rpc"
)

Expand Down Expand Up @@ -514,18 +516,11 @@ func (s *clientIntegrationSuite) TestTooManyChildWorkflows() {
future, err := s.sdkClient.ExecuteWorkflow(ctx, options, parentWorkflow)
s.NoError(err)

// wait until we retry workflow task to create the last child workflow
s.eventuallySucceeds(ctx, func(ctx context.Context) error {
result, err := s.sdkClient.DescribeWorkflowExecution(ctx, parentWorkflowId, "")
s.NoError(err)
if result.PendingWorkflowTask != nil {
if result.PendingWorkflowTask.Attempt > 1 {
s.Len(result.PendingChildren, s.maxPendingChildExecutions)
return nil
}
}
return fmt.Errorf("the task to create the child workflow was never retried")
})
s.historyContainsFailureCausedBy(
ctx,
parentWorkflowId,
enumspb.WORKFLOW_TASK_FAILED_CAUSE_PENDING_CHILD_WORKFLOWS_LIMIT_EXCEEDED,
)

// unblock the last child, allowing it to complete, which lowers the number of pending child workflows
s.NoError(s.sdkClient.SignalWorkflow(
Expand All @@ -540,18 +535,12 @@ func (s *clientIntegrationSuite) TestTooManyChildWorkflows() {
s.eventuallySucceeds(ctx, func(ctx context.Context) error {
return future.Get(ctx, nil)
})

s.historyContainsFailureCausedBy(
ctx,
parentWorkflowId,
enumspb.WORKFLOW_TASK_FAILED_CAUSE_PENDING_CHILD_WORKFLOWS_LIMIT_EXCEEDED,
)
}

// TestTooManyPendingActivities verifies that we don't allow users to schedule new activities when they've already
// reached the limit for pending activities.
func (s *clientIntegrationSuite) TestTooManyPendingActivities() {
timeout := time.Minute
timeout := time.Minute * 5
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

Expand Down Expand Up @@ -606,31 +595,107 @@ func (s *clientIntegrationSuite) TestTooManyPendingActivities() {
s.Error(err, "the workflow should not be done while there are too many pending activities")
}

// wait until the workflow task to schedule the last activity is retried
s.eventuallySucceeds(ctx, func(ctx context.Context) error {
result, err := s.sdkClient.DescribeWorkflowExecution(ctx, workflowId, "")
s.NoError(err)
if result.PendingWorkflowTask != nil {
if len(result.PendingActivities) == s.maxPendingActivities {
return nil
}
}
return fmt.Errorf("we haven't retried the task to schedule the last activity")
})

// mark one of the pending activities as complete and verify that the workflow can now complete
s.NoError(s.sdkClient.CompleteActivity(ctx, activityInfo.TaskToken, nil, nil))
s.eventuallySucceeds(ctx, func(ctx context.Context) error {
return workflowRun.Get(ctx, nil)
})

// verify that the workflow's history contains a task that failed because it would otherwise exceed the pending
// activities limit
s.historyContainsFailureCausedBy(
ctx,
workflowId,
enumspb.WORKFLOW_TASK_FAILED_CAUSE_PENDING_ACTIVITIES_LIMIT_EXCEEDED,
)

// mark one of the pending activities as complete and verify that the workflow can now complete
s.NoError(s.sdkClient.CompleteActivity(ctx, activityInfo.TaskToken, nil, nil))
s.eventuallySucceeds(ctx, func(ctx context.Context) error {
return workflowRun.Get(ctx, nil)
})
}

// TestTooManyCancelRequests exercises the per-workflow pending cancel request limit in two subtests:
//   - CancelAllWorkflowsAtOnce starts one workflow that requests cancellation of all target workflows and expects
//     a workflow task failure with cause WORKFLOW_TASK_FAILED_CAUSE_PENDING_REQUEST_CANCEL_LIMIT_EXCEEDED, with no
//     cancel requests recorded in the persisted workflow state.
//   - CancelWorkflowsInSeparateBatches splits the same targets across several workflows, each handling at most
//     s.maxPendingCancelRequests targets, and expects every one of them to complete without error.
func (s *clientIntegrationSuite) TestTooManyCancelRequests() {
// set a timeout for this whole test
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*5)
defer cancel()

// create a large number of blocked workflows
numTargetWorkflows := 50 // should be much greater than s.maxPendingCancelRequests
// the Await condition is always false, so a target never unblocks through the condition itself; presumably it only
// ends once it is canceled — confirm against the SDK's Await semantics
targetWorkflow := func(ctx workflow.Context) error {
return workflow.Await(ctx, func() bool {
return false
})
}
s.worker.RegisterWorkflow(targetWorkflow)
// targets get index-based IDs ("workflow-<i>") so that the canceler workflow below can address them by index
for i := 0; i < numTargetWorkflows; i++ {
_, err := s.sdkClient.ExecuteWorkflow(ctx, sdkclient.StartWorkflowOptions{
ID: fmt.Sprintf("workflow-%d", i),
TaskQueue: s.taskQueue,
}, targetWorkflow)
s.NoError(err)
}

// define a workflow that attempts to cancel a given subsequence of the blocked workflows
// (targets with index in the half-open range [start, stop))
cancelWorkflowsInRange := func(ctx workflow.Context, start, stop int) error {
var futures []workflow.Future
// issue every cancel request before waiting on any of them
for i := start; i < stop; i++ {
// the run ID argument is left empty; presumably this targets the current run of the workflow ID — TODO confirm
future := workflow.RequestCancelExternalWorkflow(ctx, fmt.Sprintf("workflow-%d", i), "")
futures = append(futures, future)
}
// return the first error reported by any of the cancel requests
for _, future := range futures {
if err := future.Get(ctx, nil); err != nil {
return err
}
}
return nil
}
s.worker.RegisterWorkflow(cancelWorkflowsInRange)

// try to cancel all the workflows at once and verify that we can't because of the limit violation
s.Run("CancelAllWorkflowsAtOnce", func() {
cancelerWorkflowId := "canceler-workflow-id"
run, err := s.sdkClient.ExecuteWorkflow(ctx, sdkclient.StartWorkflowOptions{
TaskQueue: s.taskQueue,
ID: cancelerWorkflowId,
}, cancelWorkflowsInRange, 0, numTargetWorkflows)
s.NoError(err)
s.historyContainsFailureCausedBy(ctx, cancelerWorkflowId, enumspb.WORKFLOW_TASK_FAILED_CAUSE_PENDING_REQUEST_CANCEL_LIMIT_EXCEEDED)
// the canceler workflow is expected not to finish successfully within a short window; ctx is shadowed only inside
// this bare block, while the deferred cancel runs when the enclosing subtest closure returns
{
ctx, cancel := context.WithTimeout(ctx, time.Second*3)
defer cancel()
s.Error(run.Get(ctx, nil))
}
// read the canceler workflow's persisted state directly from the execution manager of the test cluster
namespaceID := s.getNamespaceID(s.namespace)
shardID := common.WorkflowIDToHistoryShard(namespaceID, cancelerWorkflowId, s.testClusterConfig.HistoryConfig.NumHistoryShards)
workflowExecution, err := s.testCluster.GetExecutionManager().GetWorkflowExecution(ctx, &persistence.GetWorkflowExecutionRequest{
ShardID: shardID,
NamespaceID: namespaceID,
WorkflowID: cancelerWorkflowId,
RunID: run.GetRunID(),
})
s.NoError(err)
// the persisted state must hold no RequestCancelInfos at all after the failed workflow task
numCancelRequests := len(workflowExecution.State.RequestCancelInfos)
s.Assert().Zero(numCancelRequests)
// cancel the canceler workflow itself; presumably so that it does not keep running during the next subtest
err = s.sdkClient.CancelWorkflow(ctx, cancelerWorkflowId, "")
s.NoError(err)
})

// try to cancel all the workflows in separate batches of cancel workflows and verify that it works
s.Run("CancelWorkflowsInSeparateBatches", func() {
var runs []sdkclient.WorkflowRun
var stop int
// walk the targets in windows of s.maxPendingCancelRequests; the last window is clamped to numTargetWorkflows
for start := 0; start < numTargetWorkflows; start = stop {
stop = start + s.maxPendingCancelRequests
if stop > numTargetWorkflows {
stop = numTargetWorkflows
}
// no workflow ID is set for these canceler workflows, unlike in the first subtest
run, err := s.sdkClient.ExecuteWorkflow(ctx, sdkclient.StartWorkflowOptions{
TaskQueue: s.taskQueue,
}, cancelWorkflowsInRange, start, stop)
s.NoError(err)
runs = append(runs, run)
}

// all batches are started before any of them is awaited; each one must complete without error
for _, run := range runs {
s.NoError(run.Get(ctx, nil))
}
})
}

func (s *clientIntegrationSuite) eventuallySucceeds(ctx context.Context, operationCtx backoff.OperationCtx) {
Expand All @@ -646,12 +711,16 @@ func (s *clientIntegrationSuite) eventuallySucceeds(ctx context.Context, operati
))
}

func (s *clientIntegrationSuite) historyContainsFailureCausedBy(ctx context.Context, parentWorkflowId string, cause enumspb.WorkflowTaskFailedCause) {
func (s *clientIntegrationSuite) historyContainsFailureCausedBy(
ctx context.Context,
workflowId string,
cause enumspb.WorkflowTaskFailedCause,
) {
s.T().Helper()
s.eventuallySucceeds(ctx, func(ctx context.Context) error {
history := s.sdkClient.GetWorkflowHistory(
ctx,
parentWorkflowId,
workflowId,
"",
true,
enumspb.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT,
Expand Down
3 changes: 3 additions & 0 deletions service/history/workflowTaskHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -681,6 +681,9 @@ func (handler *workflowTaskHandlerImpl) handleCommandRequestCancelExternalWorkfl
); err != nil || handler.stopProcessing {
return err
}
if err := handler.sizeLimitChecker.checkIfNumPendingCancelRequestsExceedsLimit(); err != nil {
return handler.failCommand(enumspb.WORKFLOW_TASK_FAILED_CAUSE_PENDING_REQUEST_CANCEL_LIMIT_EXCEEDED, err)
}

cancelRequestID := uuid.New()
_, _, err := handler.mutableState.AddRequestCancelExternalWorkflowExecutionInitiatedEvent(
Expand Down

0 comments on commit df160db

Please sign in to comment.