From 206bf5bd578f2a1fb800507017f46662774e7068 Mon Sep 17 00:00:00 2001
From: Michael Snowden
Date: Wed, 16 Nov 2022 23:46:25 -0800
Subject: [PATCH] Enforce the per-workflow pending activity constraint

---
 host/client_integration_test.go        | 177 +++++++++++++++++++------
 service/history/workflowTaskHandler.go |   5 +
 2 files changed, 145 insertions(+), 37 deletions(-)

diff --git a/host/client_integration_test.go b/host/client_integration_test.go
index ef3300dda2d0..e50143ab4e25 100644
--- a/host/client_integration_test.go
+++ b/host/client_integration_test.go
@@ -37,6 +37,7 @@ import (
 	"testing"
 	"time"
 
+	"github.com/pborman/uuid"
 	"github.com/stretchr/testify/require"
 	"github.com/stretchr/testify/suite"
 	commonpb "go.temporal.io/api/common/v1"
@@ -67,6 +68,9 @@ type (
 		worker                    worker.Worker
 		taskQueue                 string
 		maxPendingChildExecutions int
+		maxPendingActivities      int
+		maxPendingCancelRequests  int
+		maxPendingSignals         int
 	}
 )
@@ -81,11 +85,17 @@ func TestClientIntegrationSuite(t *testing.T) {
 }
 
 func (s *clientIntegrationSuite) SetupSuite() {
-	// maxPendingChildWorkflows should be NumPendingChildExecutionsLimit, but that value is 1000, and it takes
-	// too long to test (which is the point of this limit)
-	s.maxPendingChildExecutions = 10
+	// these limits are higher in production, but our tests would take too long if we set them that high
+	limit := 10
+	s.maxPendingChildExecutions = limit
+	s.maxPendingActivities = limit
+	s.maxPendingCancelRequests = limit
+	s.maxPendingSignals = limit
 	s.dynamicConfigOverrides = map[dynamicconfig.Key]interface{}{
 		dynamicconfig.NumPendingChildExecutionsLimitError: s.maxPendingChildExecutions,
+		dynamicconfig.NumPendingActivitiesLimitError:      s.maxPendingActivities,
+		dynamicconfig.NumPendingCancelRequestsLimitError:  s.maxPendingCancelRequests,
+		dynamicconfig.NumPendingSignalsLimitError:         s.maxPendingSignals,
 	}
 	s.setupSuite("testdata/clientintegrationtestcluster.yaml")
@@ -114,7 +124,7 @@ func (s *clientIntegrationSuite) SetupTest() {
 	s.taskQueue = s.randomizeStr("tq")
 
 	// We need to set this timeout to 0 to disable the deadlock detector. Otherwise, the deadlock detector will cause
-	// TestTooManyChildWorkflows to fail because it thinks there is a deadlocked due to the blocked child workflows.
+	// TestTooManyChildWorkflows to fail because it thinks there is a deadlock due to the blocked child workflows.
 	s.worker = worker.New(s.sdkClient, s.taskQueue, worker.Options{DeadlockDetectionTimeout: 0})
 	if err := s.worker.Start(); err != nil {
 		s.Logger.Fatal("Error when start worker", tag.Error(err))
 	}
@@ -504,21 +514,8 @@ func (s *clientIntegrationSuite) TestTooManyChildWorkflows() {
 	future, err := s.sdkClient.ExecuteWorkflow(ctx, options, parentWorkflow)
 	s.NoError(err)
 
-	// retry policy for some asynchronous operations
-	retry := func(operationCtx backoff.OperationCtx) error {
-		return backoff.ThrottleRetryContext(
-			ctx,
-			operationCtx,
-			backoff.NewExponentialRetryPolicy(time.Second),
-			func(err error) bool {
-				// all errors are retryable
-				return true
-			},
-		)
-	}
-
-	// wait until the workflow task to create the child workflow is retried
-	s.NoError(retry(func(ctx context.Context) error {
+	// wait until we retry the workflow task to create the last child workflow
+	s.eventuallySucceeds(ctx, func(ctx context.Context) error {
 		result, err := s.sdkClient.DescribeWorkflowExecution(ctx, parentWorkflowId, "")
 		s.NoError(err)
 		if result.PendingWorkflowTask != nil {
@@ -528,7 +525,7 @@
 			}
 		}
 		return fmt.Errorf("the task to create the child workflow was never retried")
-	}))
+	})
 
 	// unblock the last child, allowing it to complete, which lowers the number of pending child workflows
 	s.NoError(s.sdkClient.SignalWorkflow(
@@ -540,31 +537,137 @@
 	))
 
 	// verify that the parent workflow completes soon after the number of pending child workflows drops
-	s.NoError(retry(func(ctx context.Context) error {
+	s.eventuallySucceeds(ctx, func(ctx context.Context) error {
 		return future.Get(ctx, nil)
-	}))
+	})
 
-	// verify that the workflow's history contains a task that failed because it would otherwise exceed the pending
-	// child workflow limit
-	history := s.sdkClient.GetWorkflowHistory(
+	s.historyContainsFailureCausedBy(
 		ctx,
 		parentWorkflowId,
-		"",
-		true,
-		enumspb.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT,
+		enumspb.WORKFLOW_TASK_FAILED_CAUSE_PENDING_CHILD_WORKFLOWS_LIMIT_EXCEEDED,
 	)
-	for history.HasNext() {
-		event, err := history.Next()
+}
+
+// TestTooManyPendingActivities verifies that we don't allow users to schedule new activities when they've already
+// reached the limit for pending activities.
+func (s *clientIntegrationSuite) TestTooManyPendingActivities() {
+	timeout := time.Minute
+	ctx, cancel := context.WithTimeout(context.Background(), timeout)
+	defer cancel()
+
+	pendingActivities := make(chan activity.Info, s.maxPendingActivities)
+	pendingActivity := func(ctx context.Context) error {
+		pendingActivities <- activity.GetInfo(ctx)
+		return activity.ErrResultPending
+	}
+	s.worker.RegisterActivity(pendingActivity)
+	lastActivity := func(ctx context.Context) error {
+		return nil
+	}
+	s.worker.RegisterActivity(lastActivity)
+
+	readyToScheduleLastActivity := "ready-to-schedule-last-activity"
+	myWorkflow := func(ctx workflow.Context) error {
+		for i := 0; i < s.maxPendingActivities; i++ {
+			workflow.ExecuteActivity(workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
+				StartToCloseTimeout: time.Minute,
+				ActivityID:          fmt.Sprintf("pending-activity-%d", i),
+			}), pendingActivity)
+		}
+
+		workflow.GetSignalChannel(ctx, readyToScheduleLastActivity).Receive(ctx, nil)
+
+		return workflow.ExecuteActivity(workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
+			StartToCloseTimeout: time.Minute,
+			ActivityID:          "last-activity",
+		}), lastActivity).Get(ctx, nil)
+	}
+	s.worker.RegisterWorkflow(myWorkflow)
+
+	workflowId := uuid.New()
+	workflowRun, err := s.sdkClient.ExecuteWorkflow(ctx, sdkclient.StartWorkflowOptions{
+		ID:        workflowId,
+		TaskQueue: s.taskQueue,
+	}, myWorkflow)
+	s.NoError(err)
+
+	// wait until all of the activities are started (but not finished) before trying to schedule the last one
+	var activityInfo activity.Info
+	for i := 0; i < s.maxPendingActivities; i++ {
+		activityInfo = <-pendingActivities
+	}
+	s.NoError(s.sdkClient.SignalWorkflow(ctx, workflowId, "", readyToScheduleLastActivity, nil))
+
+	// verify that we can't finish the workflow yet
+	{
+		ctx, cancel := context.WithTimeout(ctx, time.Millisecond*100)
+		defer cancel()
+		err = workflowRun.Get(ctx, nil)
+		s.Error(err, "the workflow should not be done while there are too many pending activities")
+	}
+
+	// wait until the workflow task to schedule the last activity is retried
+	s.eventuallySucceeds(ctx, func(ctx context.Context) error {
+		result, err := s.sdkClient.DescribeWorkflowExecution(ctx, workflowId, "")
 		s.NoError(err)
-		switch a := event.Attributes.(type) {
-		case *historypb.HistoryEvent_WorkflowTaskFailedEventAttributes:
-			if a.WorkflowTaskFailedEventAttributes.Cause == enumspb.
-				WORKFLOW_TASK_FAILED_CAUSE_PENDING_CHILD_WORKFLOWS_LIMIT_EXCEEDED {
-				return
+		if result.PendingWorkflowTask != nil {
+			if len(result.PendingActivities) == s.maxPendingActivities {
+				return nil
 			}
 		}
-	}
-	s.Fail("did not find a history event for the failure")
+		return fmt.Errorf("we haven't retried the task to schedule the last activity")
+	})
+
+	// mark one of the pending activities as complete and verify that the workflow can now complete
+	s.NoError(s.sdkClient.CompleteActivity(ctx, activityInfo.TaskToken, nil, nil))
+	s.eventuallySucceeds(ctx, func(ctx context.Context) error {
+		return workflowRun.Get(ctx, nil)
+	})
+
+	// verify that the workflow's history contains a task that failed because it would otherwise exceed the pending
+	// activities limit
+	s.historyContainsFailureCausedBy(
+		ctx,
+		workflowId,
+		enumspb.WORKFLOW_TASK_FAILED_CAUSE_PENDING_ACTIVITIES_LIMIT_EXCEEDED,
+	)
+}
+
+func (s *clientIntegrationSuite) eventuallySucceeds(ctx context.Context, operationCtx backoff.OperationCtx) {
+	s.T().Helper()
+	s.NoError(backoff.ThrottleRetryContext(
+		ctx,
+		operationCtx,
+		backoff.NewExponentialRetryPolicy(time.Second),
+		func(err error) bool {
+			// all errors are retryable
+			return true
+		},
+	))
+}
+
+func (s *clientIntegrationSuite) historyContainsFailureCausedBy(ctx context.Context, parentWorkflowId string, cause enumspb.WorkflowTaskFailedCause) {
+	s.T().Helper()
+	s.eventuallySucceeds(ctx, func(ctx context.Context) error {
+		history := s.sdkClient.GetWorkflowHistory(
+			ctx,
+			parentWorkflowId,
+			"",
+			true,
+			enumspb.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT,
+		)
+		for history.HasNext() {
+			event, err := history.Next()
+			s.NoError(err)
+			switch a := event.Attributes.(type) {
+			case *historypb.HistoryEvent_WorkflowTaskFailedEventAttributes:
+				if a.WorkflowTaskFailedEventAttributes.Cause == cause {
+					return nil
+				}
+			}
+		}
+		return fmt.Errorf("did not find a failed task whose cause was %q", cause)
+	})
 }
 
 func (s *clientIntegrationSuite) Test_StickyWorkerRestartWorkflowTask() {
diff --git a/service/history/workflowTaskHandler.go b/service/history/workflowTaskHandler.go
index 03bfd7640f21..b8a59a73c822 100644
--- a/service/history/workflowTaskHandler.go
+++ b/service/history/workflowTaskHandler.go
@@ -264,6 +264,11 @@ func (handler *workflowTaskHandlerImpl) handleCommandScheduleActivity(
 	); err != nil {
 		return nil, handler.failWorkflow(enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_SCHEDULE_ACTIVITY_ATTRIBUTES, err)
 	}
+	if err := handler.sizeLimitChecker.checkIfNumPendingActivitiesExceedsLimit(); err != nil {
+		handler.workflowTaskFailedCause = NewWorkflowTaskFailedCause(enumspb.
+			WORKFLOW_TASK_FAILED_CAUSE_PENDING_ACTIVITIES_LIMIT_EXCEEDED, err)
+		return nil, nil
+	}
 	enums.SetDefaultTaskQueueKind(&attr.GetTaskQueue().Kind)
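
Note: the new branch in handleCommandScheduleActivity relies on sizeLimitChecker.checkIfNumPendingActivitiesExceedsLimit(), whose body is not part of this diff. The sketch below is only a rough illustration of the intent, not the actual Temporal implementation: the type, field, and accessor names are assumptions made for this example, and the real checker is wired differently.

package history

import "fmt"

// pendingActivityCounter is a stand-in for the mutable state the real checker consults.
type pendingActivityCounter interface {
	// GetPendingActivityCount is an assumed accessor; the real interface differs.
	GetPendingActivityCount() int
}

// workflowSizeLimitChecker is a hypothetical shape for the handler's sizeLimitChecker.
type workflowSizeLimitChecker struct {
	mutableState              pendingActivityCounter
	numPendingActivitiesLimit int // in this sketch, fed from dynamicconfig.NumPendingActivitiesLimitError
}

// checkIfNumPendingActivitiesExceedsLimit returns an error when scheduling one more
// activity would push the workflow past the configured per-workflow pending-activity limit.
func (c *workflowSizeLimitChecker) checkIfNumPendingActivitiesExceedsLimit() error {
	count := c.mutableState.GetPendingActivityCount()
	if c.numPendingActivitiesLimit > 0 && count >= c.numPendingActivitiesLimit {
		return fmt.Errorf(
			"the number of pending activities, %d, has reached the per-workflow limit of %d",
			count, c.numPendingActivitiesLimit,
		)
	}
	return nil
}

Under a check of this kind, the test above drives the workflow to exactly s.maxPendingActivities pending activities, so the next ScheduleActivityTask command fails the workflow task with WORKFLOW_TASK_FAILED_CAUSE_PENDING_ACTIVITIES_LIMIT_EXCEEDED (rather than failing the workflow), and the task is retried until one pending activity completes.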