Return an error whenever we exceed the number of pending child executions (#3586)
MichaelSnowden committed Nov 15, 2022
1 parent e7f99d0 commit c82d43c
Showing 3 changed files with 230 additions and 8 deletions.
135 changes: 130 additions & 5 deletions host/client_integration_test.go
@@ -41,6 +41,7 @@ import (
"github.com/stretchr/testify/suite"
commonpb "go.temporal.io/api/common/v1"
enumspb "go.temporal.io/api/enums/v1"
historypb "go.temporal.io/api/history/v1"
"go.temporal.io/sdk/activity"
sdkclient "go.temporal.io/sdk/client"
"go.temporal.io/sdk/converter"
@@ -49,6 +50,8 @@ import (
"go.temporal.io/sdk/workflow"

"go.temporal.io/server/api/adminservice/v1"
"go.temporal.io/server/common/backoff"
"go.temporal.io/server/common/dynamicconfig"
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/rpc"
)
@@ -59,10 +62,11 @@ type (
// not merely log an error
*require.Assertions
IntegrationBase
hostPort string
sdkClient sdkclient.Client
worker worker.Worker
taskQueue string
hostPort string
sdkClient sdkclient.Client
worker worker.Worker
taskQueue string
maxPendingChildExecutions int
}
)

@@ -77,6 +81,12 @@ func TestClientIntegrationSuite(t *testing.T) {
}

func (s *clientIntegrationSuite) SetupSuite() {
// s.maxPendingChildExecutions would ideally be the default value of NumPendingChildExecutionLimitError, but that
// default is 1000, and creating that many blocked children takes too long in a test (which is the point of this limit)
s.maxPendingChildExecutions = 10
s.dynamicConfigOverrides = map[dynamicconfig.Key]interface{}{
dynamicconfig.NumPendingChildExecutionLimitError: s.maxPendingChildExecutions,
}
s.setupSuite("testdata/clientintegrationtestcluster.yaml")

s.hostPort = "127.0.0.1:7134"
@@ -102,7 +112,10 @@ func (s *clientIntegrationSuite) SetupTest() {
}
s.sdkClient = sdkClient
s.taskQueue = s.randomizeStr("tq")
s.worker = worker.New(s.sdkClient, s.taskQueue, worker.Options{})

// We need to set this timeout to 0 to disable the deadlock detector. Otherwise, the deadlock detector will cause
// TestTooManyChildWorkflows to fail because it thinks there is a deadlock due to the blocked child workflows.
s.worker = worker.New(s.sdkClient, s.taskQueue, worker.Options{DeadlockDetectionTimeout: 0})
if err := s.worker.Start(); err != nil {
s.Logger.Fatal("Error when start worker", tag.Error(err))
}
@@ -442,6 +455,118 @@ func (s *clientIntegrationSuite) TestClientDataConverter_WithChild() {
s.Equal(2, d.NumOfCallFromPayloads)
}

func (s *clientIntegrationSuite) TestTooManyChildWorkflows() {
// To ensure that there is one pending child workflow before we try to create the next one,
// we create a child workflow here that signals the parent when it has started and then blocks forever.
parentWorkflowId := "client-integration-too-many-child-workflows"
blockingChildWorkflow := func(ctx workflow.Context) error {
workflow.SignalExternalWorkflow(ctx, parentWorkflowId, "", "blocking-child-started", nil)
workflow.GetSignalChannel(ctx, "unblock-child").Receive(ctx, nil)
return nil
}
childWorkflow := func(ctx workflow.Context) error {
return nil
}

// define a workflow which creates N blocked children, and then tries to create another, which should fail because
// it's now past the limit
maxPendingChildWorkflows := s.maxPendingChildExecutions
parentWorkflow := func(ctx workflow.Context) error {
childStarted := workflow.GetSignalChannel(ctx, "blocking-child-started")
for i := 0; i < maxPendingChildWorkflows; i++ {
childOptions := workflow.WithChildOptions(ctx, workflow.ChildWorkflowOptions{
WorkflowID: fmt.Sprintf("child-%d", i+1),
})
workflow.ExecuteChildWorkflow(childOptions, blockingChildWorkflow)
}
for i := 0; i < maxPendingChildWorkflows; i++ {
childStarted.Receive(ctx, nil)
}
return workflow.ExecuteChildWorkflow(workflow.WithChildOptions(ctx, workflow.ChildWorkflowOptions{
WorkflowID: fmt.Sprintf("child-%d", maxPendingChildWorkflows+1),
}), childWorkflow).Get(ctx, nil)
}

// register all the workflows
s.worker.RegisterWorkflow(blockingChildWorkflow)
s.worker.RegisterWorkflow(childWorkflow)
s.worker.RegisterWorkflow(parentWorkflow)

// start the parent workflow
timeout := time.Minute * 5
ctx, cancel := rpc.NewContextWithTimeoutAndVersionHeaders(timeout)
defer cancel()
options := sdkclient.StartWorkflowOptions{
ID: parentWorkflowId,
TaskQueue: s.taskQueue,
WorkflowRunTimeout: timeout,
}
future, err := s.sdkClient.ExecuteWorkflow(ctx, options, parentWorkflow)
s.NoError(err)

// retry policy for some asynchronous operations
retry := func(operationCtx backoff.OperationCtx) error {
return backoff.ThrottleRetryContext(
ctx,
operationCtx,
backoff.NewExponentialRetryPolicy(time.Second),
func(err error) bool {
// all errors are retryable
return true
},
)
}

// wait until the workflow task to create the child workflow is retried
s.NoError(retry(func(ctx context.Context) error {
result, err := s.sdkClient.DescribeWorkflowExecution(ctx, parentWorkflowId, "")
s.NoError(err)
if result.PendingWorkflowTask != nil {
if result.PendingWorkflowTask.Attempt > 1 {
s.Len(result.PendingChildren, s.maxPendingChildExecutions)
return nil
}
}
return fmt.Errorf("the task to create the child workflow was never retried")
}))

// unblock the last child, allowing it to complete, which lowers the number of pending child workflows
s.NoError(s.sdkClient.SignalWorkflow(
ctx,
fmt.Sprintf("child-%d", maxPendingChildWorkflows),
"",
"unblock-child",
nil,
))

// verify that the parent workflow completes soon after the number of pending child workflows drops
s.NoError(retry(func(ctx context.Context) error {
return future.Get(ctx, nil)
}))

// verify that the workflow's history contains a task that failed because it would otherwise exceed the pending
// child workflow limit
history := s.sdkClient.GetWorkflowHistory(
ctx,
parentWorkflowId,
"",
true,
enumspb.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT,
)
for history.HasNext() {
event, err := history.Next()
s.NoError(err)
switch a := event.Attributes.(type) {
case *historypb.HistoryEvent_WorkflowTaskFailedEventAttributes:
if a.WorkflowTaskFailedEventAttributes.Cause == enumspb.
WORKFLOW_TASK_FAILED_CAUSE_PENDING_CHILD_WORKFLOWS_LIMIT_EXCEEDED {
return
}
}
}
s.Fail("did not find a history event for the failure")
}

func (s *clientIntegrationSuite) Test_StickyWorkerRestartWorkflowTask() {
testCases := []struct {
name string
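The retry closure in the test above wraps the server's backoff helpers. As a minimal standalone sketch of that pattern — assuming only the signatures already visible in the diff, where backoff.OperationCtx behaves like a func(context.Context) error — the same helper can poll any asynchronous condition until it holds:

package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"go.temporal.io/server/common/backoff"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
	defer cancel()

	attempts := 0
	// Retry the operation with exponential backoff until it stops returning an
	// error, mirroring how the test polls DescribeWorkflowExecution.
	err := backoff.ThrottleRetryContext(
		ctx,
		func(ctx context.Context) error {
			attempts++
			if attempts < 3 {
				return errors.New("condition not met yet")
			}
			return nil
		},
		backoff.NewExponentialRetryPolicy(time.Second),
		func(err error) bool {
			// all errors are retryable
			return true
		},
	)
	fmt.Println(attempts, err) // expected: 3 <nil>
}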
98 changes: 95 additions & 3 deletions service/history/historyEngine2_test.go
@@ -94,8 +94,9 @@ type (
historyEngine *historyEngineImpl
mockExecutionMgr *persistence.MockExecutionManager

config *configs.Config
logger log.Logger
config *configs.Config
logger *log.MockLogger
errorMessages []string
}
)

@@ -153,7 +154,14 @@ func (s *engine2Suite) SetupTest() {
s.mockClusterMetadata.EXPECT().ClusterNameForFailoverVersion(false, common.EmptyVersion).Return(cluster.TestCurrentClusterName).AnyTimes()
s.mockClusterMetadata.EXPECT().ClusterNameForFailoverVersion(true, tests.Version).Return(cluster.TestCurrentClusterName).AnyTimes()
s.workflowCache = workflow.NewCache(s.mockShard)
s.logger = s.mockShard.GetLogger()
s.logger = log.NewMockLogger(s.controller)
s.logger.EXPECT().Debug(gomock.Any(), gomock.Any()).AnyTimes()
s.logger.EXPECT().Info(gomock.Any(), gomock.Any()).AnyTimes()
s.logger.EXPECT().Warn(gomock.Any(), gomock.Any()).AnyTimes()
s.errorMessages = make([]string, 0)
s.logger.EXPECT().Error(gomock.Any(), gomock.Any()).AnyTimes().Do(func(msg string, tags ...tag.Tag) {
s.errorMessages = append(s.errorMessages, msg)
})

h := &historyEngineImpl{
currentClusterName: s.mockShard.GetClusterMetadata().GetCurrentClusterName(),
@@ -916,6 +924,90 @@ func (s *engine2Suite) TestRespondWorkflowTaskCompleted_StartChildWithSearchAttr
s.Nil(err)
}

func (s *engine2Suite) TestRespondWorkflowTaskCompleted_StartChildWorkflow_ExceedsLimit() {
namespaceID := tests.NamespaceID
taskQueue := "testTaskQueue"
identity := "testIdentity"
workflowType := "testWorkflowType"

we := commonpb.WorkflowExecution{
WorkflowId: tests.WorkflowID,
RunId: tests.RunID,
}
ms := workflow.TestLocalMutableState(
s.historyEngine.shard,
s.mockEventsCache,
tests.LocalNamespaceEntry,
log.NewTestLogger(),
we.GetRunId(),
)

addWorkflowExecutionStartedEvent(
ms,
we,
workflowType,
taskQueue,
nil,
time.Minute,
time.Minute,
time.Minute,
identity,
)

s.mockNamespaceCache.EXPECT().GetNamespace(tests.Namespace).Return(tests.LocalNamespaceEntry, nil).AnyTimes()

var commands []*commandpb.Command
for i := 0; i < 6; i++ {
commands = append(
commands,
&commandpb.Command{
CommandType: enumspb.COMMAND_TYPE_START_CHILD_WORKFLOW_EXECUTION,
Attributes: &commandpb.Command_StartChildWorkflowExecutionCommandAttributes{
StartChildWorkflowExecutionCommandAttributes: &commandpb.StartChildWorkflowExecutionCommandAttributes{
Namespace: tests.Namespace.String(),
WorkflowId: tests.WorkflowID,
WorkflowType: &commonpb.WorkflowType{Name: workflowType},
TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueue},
}},
},
)
}

wt := addWorkflowTaskScheduledEvent(ms)
addWorkflowTaskStartedEvent(
ms,
wt.ScheduledEventID,
taskQueue,
identity,
)
taskToken := &tokenspb.Task{
Attempt: 1,
NamespaceId: namespaceID.String(),
WorkflowId: tests.WorkflowID,
RunId: we.GetRunId(),
ScheduledEventId: 2,
}
taskTokenBytes, _ := taskToken.Marshal()
response := &persistence.GetWorkflowExecutionResponse{State: workflow.TestCloneToProto(ms)}
s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()).Return(response, nil).AnyTimes()

s.historyEngine.shard.GetConfig().NumPendingChildExecutionLimitError = func(namespace string) int {
return 5
}
_, err := s.historyEngine.RespondWorkflowTaskCompleted(metrics.AddMetricsContext(context.Background()), &historyservice.RespondWorkflowTaskCompletedRequest{
NamespaceId: tests.NamespaceID.String(),
CompleteRequest: &workflowservice.RespondWorkflowTaskCompletedRequest{
TaskToken: taskTokenBytes,
Commands: commands,
Identity: identity,
},
})

s.Error(err)
s.Assert().Equal([]string{"the number of pending child workflow executions, 5, " +
"has reached the error limit of 5 established with \"limit.numPendingChildExecution.error\""}, s.errorMessages)
}

func (s *engine2Suite) TestStartWorkflowExecution_BrandNew() {
namespaceID := tests.NamespaceID
workflowID := "workflowID"
5 changes: 5 additions & 0 deletions service/history/workflowTaskHandler.go
@@ -895,6 +895,11 @@ func (handler *workflowTaskHandlerImpl) handleCommandStartChildWorkflow(
return handler.failWorkflow(enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_START_CHILD_EXECUTION_ATTRIBUTES, err)
}

// child workflow limit
if err := handler.sizeLimitChecker.checkIfNumChildWorkflowsExceedsLimit(); err != nil {
return handler.failCommand(enumspb.WORKFLOW_TASK_FAILED_CAUSE_PENDING_CHILD_WORKFLOWS_LIMIT_EXCEEDED, err)
}

enabled := handler.config.EnableParentClosePolicy(parentNamespace.String())
if enabled {
enums.SetDefaultParentClosePolicy(&attr.ParentClosePolicy)
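This hunk only shows the call site; the checker behind sizeLimitChecker.checkIfNumChildWorkflowsExceedsLimit is not among the three files in this commit. A minimal stand-in, assuming only the behavior implied by the unit test above — compare the pending-child count against the NumPendingChildExecutionLimitError value and return the error message the test asserts on — could look like this:

package main

import "fmt"

// checkPendingChildLimit is a hypothetical, simplified stand-in for
// checkIfNumChildWorkflowsExceedsLimit; the real checker is not part of this diff.
func checkPendingChildLimit(numPending, errorLimit int, configKey string) error {
	if numPending >= errorLimit {
		return fmt.Errorf(
			"the number of pending child workflow executions, %d, has reached the error limit of %d established with %q",
			numPending, errorLimit, configKey,
		)
	}
	return nil
}

func main() {
	// Mirrors the unit test: 5 pending children against an error limit of 5
	// configured through "limit.numPendingChildExecution.error".
	fmt.Println(checkPendingChildLimit(5, 5, "limit.numPendingChildExecution.error"))
}

When the checker returns an error, handleCommandStartChildWorkflow fails the command with WORKFLOW_TASK_FAILED_CAUSE_PENDING_CHILD_WORKFLOWS_LIMIT_EXCEEDED, which is the history event the integration test scans for.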
