From c82d43c6d6fef15b5bdd1d1331a95d92eb7aeda9 Mon Sep 17 00:00:00 2001
From: Michael Snowden
Date: Tue, 15 Nov 2022 14:11:08 -0800
Subject: [PATCH] Return an error whenever we exceed the number of pending child executions (#3586)

---
 host/client_integration_test.go        | 135 ++++++++++++++++++++++++-
 service/history/historyEngine2_test.go |  98 +++++++++++++++++-
 service/history/workflowTaskHandler.go |   5 +
 3 files changed, 230 insertions(+), 8 deletions(-)

diff --git a/host/client_integration_test.go b/host/client_integration_test.go
index fd773a885b7..15571f6eabe 100644
--- a/host/client_integration_test.go
+++ b/host/client_integration_test.go
@@ -41,6 +41,7 @@ import (
 	"github.com/stretchr/testify/suite"
 	commonpb "go.temporal.io/api/common/v1"
 	enumspb "go.temporal.io/api/enums/v1"
+	historypb "go.temporal.io/api/history/v1"
 	"go.temporal.io/sdk/activity"
 	sdkclient "go.temporal.io/sdk/client"
 	"go.temporal.io/sdk/converter"
@@ -49,6 +50,8 @@ import (
 	"go.temporal.io/sdk/workflow"
 
 	"go.temporal.io/server/api/adminservice/v1"
+	"go.temporal.io/server/common/backoff"
+	"go.temporal.io/server/common/dynamicconfig"
 	"go.temporal.io/server/common/log/tag"
 	"go.temporal.io/server/common/rpc"
 )
@@ -59,10 +62,11 @@ type (
 		// not merely log an error
 		*require.Assertions
 		IntegrationBase
-		hostPort  string
-		sdkClient sdkclient.Client
-		worker    worker.Worker
-		taskQueue string
+		hostPort                  string
+		sdkClient                 sdkclient.Client
+		worker                    worker.Worker
+		taskQueue                 string
+		maxPendingChildExecutions int
 	}
 )
 
@@ -77,6 +81,12 @@ func TestClientIntegrationSuite(t *testing.T) {
 }
 
 func (s *clientIntegrationSuite) SetupSuite() {
+	// maxPendingChildExecutions would ideally match the default NumPendingChildExecutionLimitError value, but that
+	// default is 1000, and a test with that many pending children takes too long to run (which is the point of this limit)
+	s.maxPendingChildExecutions = 10
+	s.dynamicConfigOverrides = map[dynamicconfig.Key]interface{}{
+		dynamicconfig.NumPendingChildExecutionLimitError: s.maxPendingChildExecutions,
+	}
 	s.setupSuite("testdata/clientintegrationtestcluster.yaml")
 
 	s.hostPort = "127.0.0.1:7134"
@@ -102,7 +112,10 @@ func (s *clientIntegrationSuite) SetupTest() {
 	}
 	s.sdkClient = sdkClient
 	s.taskQueue = s.randomizeStr("tq")
-	s.worker = worker.New(s.sdkClient, s.taskQueue, worker.Options{})
+
+	// We need to set this timeout to 0 to disable the deadlock detector. Otherwise, the deadlock detector will cause
+	// TestTooManyChildWorkflows to fail because it thinks there is a deadlock due to the blocked child workflows.
+	s.worker = worker.New(s.sdkClient, s.taskQueue, worker.Options{DeadlockDetectionTimeout: 0})
 	if err := s.worker.Start(); err != nil {
 		s.Logger.Fatal("Error when start worker", tag.Error(err))
 	}
@@ -442,6 +455,118 @@ func (s *clientIntegrationSuite) TestClientDataConverter_WithChild() {
 	s.Equal(2, d.NumOfCallFromPayloads)
 }
 
+func (s *clientIntegrationSuite) TestTooManyChildWorkflows() {
+	// To ensure that there is one pending child workflow before we try to create the next one,
+	// we create a child workflow here that signals the parent when it has started and then blocks forever.
+	parentWorkflowId := "client-integration-too-many-child-workflows"
+	blockingChildWorkflow := func(ctx workflow.Context) error {
+		workflow.SignalExternalWorkflow(ctx, parentWorkflowId, "", "blocking-child-started", nil)
+		workflow.GetSignalChannel(ctx, "unblock-child").Receive(ctx, nil)
+		return nil
+	}
+	childWorkflow := func(ctx workflow.Context) error {
+		return nil
+	}
+
+	// define a workflow which creates N blocked children, and then tries to create another, which should fail because
+	// it's now past the limit
+	maxPendingChildWorkflows := s.maxPendingChildExecutions
+	parentWorkflow := func(ctx workflow.Context) error {
+		childStarted := workflow.GetSignalChannel(ctx, "blocking-child-started")
+		for i := 0; i < maxPendingChildWorkflows; i++ {
+			childOptions := workflow.WithChildOptions(ctx, workflow.ChildWorkflowOptions{
+				WorkflowID: fmt.Sprintf("child-%d", i+1),
+			})
+			workflow.ExecuteChildWorkflow(childOptions, blockingChildWorkflow)
+		}
+		for i := 0; i < maxPendingChildWorkflows; i++ {
+			childStarted.Receive(ctx, nil)
+		}
+		return workflow.ExecuteChildWorkflow(workflow.WithChildOptions(ctx, workflow.ChildWorkflowOptions{
+			WorkflowID: fmt.Sprintf("child-%d", maxPendingChildWorkflows+1),
+		}), childWorkflow).Get(ctx, nil)
+	}
+
+	// register all the workflows
+	s.worker.RegisterWorkflow(blockingChildWorkflow)
+	s.worker.RegisterWorkflow(childWorkflow)
+	s.worker.RegisterWorkflow(parentWorkflow)
+
+	// start the parent workflow
+	timeout := time.Minute * 5
+	ctx, cancel := rpc.NewContextWithTimeoutAndVersionHeaders(timeout)
+	defer cancel()
+	options := sdkclient.StartWorkflowOptions{
+		ID:                 parentWorkflowId,
+		TaskQueue:          s.taskQueue,
+		WorkflowRunTimeout: timeout,
+	}
+	future, err := s.sdkClient.ExecuteWorkflow(ctx, options, parentWorkflow)
+	s.NoError(err)
+
+	// retry policy for some asynchronous operations
+	retry := func(operationCtx backoff.OperationCtx) error {
+		return backoff.ThrottleRetryContext(
+			ctx,
+			operationCtx,
+			backoff.NewExponentialRetryPolicy(time.Second),
+			func(err error) bool {
+				// all errors are retryable
+				return true
+			},
+		)
+	}
+
+	// wait until the workflow task to create the child workflow is retried
+	s.NoError(retry(func(ctx context.Context) error {
+		result, err := s.sdkClient.DescribeWorkflowExecution(ctx, parentWorkflowId, "")
+		s.NoError(err)
+		if result.PendingWorkflowTask != nil {
+			if result.PendingWorkflowTask.Attempt > 1 {
+				s.Len(result.PendingChildren, s.maxPendingChildExecutions)
+				return nil
+			}
+		}
+		return fmt.Errorf("the task to create the child workflow was never retried")
+	}))
+
+	// unblock the last child, allowing it to complete, which lowers the number of pending child workflows
+	s.NoError(s.sdkClient.SignalWorkflow(
+		ctx,
+		fmt.Sprintf("child-%d", maxPendingChildWorkflows),
+		"",
+		"unblock-child",
+		nil,
+	))
+
+	// verify that the parent workflow completes soon after the number of pending child workflows drops
+	s.NoError(retry(func(ctx context.Context) error {
+		return future.Get(ctx, nil)
+	}))
+
+	// verify that the workflow's history contains a task that failed because it would otherwise exceed the pending
+	// child workflow limit
+	history := s.sdkClient.GetWorkflowHistory(
+		ctx,
+		parentWorkflowId,
+		"",
+		true,
+		enumspb.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT,
+	)
+	for history.HasNext() {
+		event, err := history.Next()
+		s.NoError(err)
+		switch a := event.Attributes.(type) {
+		case *historypb.HistoryEvent_WorkflowTaskFailedEventAttributes:
+			if a.WorkflowTaskFailedEventAttributes.Cause == enumspb.
+				WORKFLOW_TASK_FAILED_CAUSE_PENDING_CHILD_WORKFLOWS_LIMIT_EXCEEDED {
+				return
+			}
+		}
+	}
+	s.Fail("did not find a history event for the failure")
+}
+
 func (s *clientIntegrationSuite) Test_StickyWorkerRestartWorkflowTask() {
 	testCases := []struct {
 		name     string
diff --git a/service/history/historyEngine2_test.go b/service/history/historyEngine2_test.go
index da4d52e6fcb..26ecaa71352 100644
--- a/service/history/historyEngine2_test.go
+++ b/service/history/historyEngine2_test.go
@@ -94,8 +94,9 @@ type (
 		historyEngine    *historyEngineImpl
 		mockExecutionMgr *persistence.MockExecutionManager
 
-		config *configs.Config
-		logger log.Logger
+		config        *configs.Config
+		logger        *log.MockLogger
+		errorMessages []string
 	}
 )
 
@@ -153,7 +154,14 @@ func (s *engine2Suite) SetupTest() {
 	s.mockClusterMetadata.EXPECT().ClusterNameForFailoverVersion(false, common.EmptyVersion).Return(cluster.TestCurrentClusterName).AnyTimes()
 	s.mockClusterMetadata.EXPECT().ClusterNameForFailoverVersion(true, tests.Version).Return(cluster.TestCurrentClusterName).AnyTimes()
 	s.workflowCache = workflow.NewCache(s.mockShard)
-	s.logger = s.mockShard.GetLogger()
+	s.logger = log.NewMockLogger(s.controller)
+	s.logger.EXPECT().Debug(gomock.Any(), gomock.Any()).AnyTimes()
+	s.logger.EXPECT().Info(gomock.Any(), gomock.Any()).AnyTimes()
+	s.logger.EXPECT().Warn(gomock.Any(), gomock.Any()).AnyTimes()
+	s.errorMessages = make([]string, 0)
+	s.logger.EXPECT().Error(gomock.Any(), gomock.Any()).AnyTimes().Do(func(msg string, tags ...tag.Tag) {
+		s.errorMessages = append(s.errorMessages, msg)
+	})
 
 	h := &historyEngineImpl{
 		currentClusterName: s.mockShard.GetClusterMetadata().GetCurrentClusterName(),
@@ -916,6 +924,90 @@ func (s *engine2Suite) TestRespondWorkflowTaskCompleted_StartChildWithSearchAttr
 	s.Nil(err)
 }
 
+func (s *engine2Suite) TestRespondWorkflowTaskCompleted_StartChildWorkflow_ExceedsLimit() {
+	namespaceID := tests.NamespaceID
+	taskQueue := "testTaskQueue"
+	identity := "testIdentity"
+	workflowType := "testWorkflowType"
+
+	we := commonpb.WorkflowExecution{
+		WorkflowId: tests.WorkflowID,
+		RunId:      tests.RunID,
+	}
+	ms := workflow.TestLocalMutableState(
+		s.historyEngine.shard,
+		s.mockEventsCache,
+		tests.LocalNamespaceEntry,
+		log.NewTestLogger(),
+		we.GetRunId(),
+	)
+
+	addWorkflowExecutionStartedEvent(
+		ms,
+		we,
+		workflowType,
+		taskQueue,
+		nil,
+		time.Minute,
+		time.Minute,
+		time.Minute,
+		identity,
+	)
+
+	s.mockNamespaceCache.EXPECT().GetNamespace(tests.Namespace).Return(tests.LocalNamespaceEntry, nil).AnyTimes()
+
+	var commands []*commandpb.Command
+	for i := 0; i < 6; i++ {
+		commands = append(
+			commands,
+			&commandpb.Command{
+				CommandType: enumspb.COMMAND_TYPE_START_CHILD_WORKFLOW_EXECUTION,
+				Attributes: &commandpb.Command_StartChildWorkflowExecutionCommandAttributes{
+					StartChildWorkflowExecutionCommandAttributes: &commandpb.StartChildWorkflowExecutionCommandAttributes{
+						Namespace:    tests.Namespace.String(),
+						WorkflowId:   tests.WorkflowID,
+						WorkflowType: &commonpb.WorkflowType{Name: workflowType},
+						TaskQueue:    &taskqueuepb.TaskQueue{Name: taskQueue},
+					}},
+			},
+		)
+	}
+
+	wt := addWorkflowTaskScheduledEvent(ms)
+	addWorkflowTaskStartedEvent(
+		ms,
+		wt.ScheduledEventID,
+		taskQueue,
+		identity,
+	)
+	taskToken := &tokenspb.Task{
+		Attempt:          1,
+		NamespaceId:      namespaceID.String(),
+		WorkflowId:       tests.WorkflowID,
+		RunId:            we.GetRunId(),
+		ScheduledEventId: 2,
+	}
+	taskTokenBytes, _ := taskToken.Marshal()
+	response := &persistence.GetWorkflowExecutionResponse{State: workflow.TestCloneToProto(ms)}
+
+	s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()).Return(response, nil).AnyTimes()
+
+	s.historyEngine.shard.GetConfig().NumPendingChildExecutionLimitError = func(namespace string) int {
+		return 5
+	}
+	_, err := s.historyEngine.RespondWorkflowTaskCompleted(metrics.AddMetricsContext(context.Background()), &historyservice.RespondWorkflowTaskCompletedRequest{
+		NamespaceId: tests.NamespaceID.String(),
+		CompleteRequest: &workflowservice.RespondWorkflowTaskCompletedRequest{
+			TaskToken: taskTokenBytes,
+			Commands:  commands,
+			Identity:  identity,
+		},
+	})

+	s.Error(err)
+	s.Assert().Equal([]string{"the number of pending child workflow executions, 5, " +
+		"has reached the error limit of 5 established with \"limit.numPendingChildExecution.error\""}, s.errorMessages)
+}
+
 func (s *engine2Suite) TestStartWorkflowExecution_BrandNew() {
 	namespaceID := tests.NamespaceID
 	workflowID := "workflowID"
diff --git a/service/history/workflowTaskHandler.go b/service/history/workflowTaskHandler.go
index ea467310057..03bfd7640f2 100644
--- a/service/history/workflowTaskHandler.go
+++ b/service/history/workflowTaskHandler.go
@@ -895,6 +895,11 @@ func (handler *workflowTaskHandlerImpl) handleCommandStartChildWorkflow(
 		return handler.failWorkflow(enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_START_CHILD_EXECUTION_ATTRIBUTES, err)
 	}
 
+	// fail the command if the number of pending child executions has already reached the configured limit
+	if err := handler.sizeLimitChecker.checkIfNumChildWorkflowsExceedsLimit(); err != nil {
+		return handler.failCommand(enumspb.WORKFLOW_TASK_FAILED_CAUSE_PENDING_CHILD_WORKFLOWS_LIMIT_EXCEEDED, err)
+	}
+
 	enabled := handler.config.EnableParentClosePolicy(parentNamespace.String())
 	if enabled {
 		enums.SetDefaultParentClosePolicy(&attr.ParentClosePolicy)
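
Note for reviewers: the hunk above calls handler.sizeLimitChecker.checkIfNumChildWorkflowsExceedsLimit(), but that method's body is not part of this patch. The following is a minimal, self-contained sketch of what such a check could look like. The pendingChildLimitChecker type and its fields are illustrative assumptions, not Temporal's actual implementation; only the error text (taken from the engine2Suite assertion above) and the dynamic config key name are grounded in the patch.

// Hypothetical sketch only; struct and field names are assumptions, not the real sizeLimitChecker.
package main

import "fmt"

// pendingChildLimitChecker stands in for the handler's size limit checker.
type pendingChildLimitChecker struct {
	numPendingChildExecutions int // in the real service this would come from mutable state
	errorLimit                int // dynamic config "limit.numPendingChildExecution.error"
}

// checkIfNumChildWorkflowsExceedsLimit returns an error once the pending child count has
// reached the configured error limit, so handleCommandStartChildWorkflow can fail the
// workflow task with WORKFLOW_TASK_FAILED_CAUSE_PENDING_CHILD_WORKFLOWS_LIMIT_EXCEEDED.
func (c *pendingChildLimitChecker) checkIfNumChildWorkflowsExceedsLimit() error {
	if c.errorLimit > 0 && c.numPendingChildExecutions >= c.errorLimit {
		return fmt.Errorf(
			"the number of pending child workflow executions, %d, has reached the error limit of %d established with %q",
			c.numPendingChildExecutions, c.errorLimit, "limit.numPendingChildExecution.error",
		)
	}
	return nil
}

func main() {
	// With 5 pending children and an error limit of 5, the check reports an error,
	// mirroring the unit test's expected log message.
	c := &pendingChildLimitChecker{numPendingChildExecutions: 5, errorLimit: 5}
	fmt.Println(c.checkIfNumChildWorkflowsExceedsLimit())
}

Design-wise, gating the check inside handleCommandStartChildWorkflow fails only the workflow task (not the workflow), so the task is retried later; the integration test relies on exactly this when it unblocks one child, the pending count drops below the limit, and the parent then completes.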