Enforce the per-workflow pending activity constraint
MichaelSnowden committed Nov 17, 2022
1 parent 3ae2811 commit 206bf5b
Showing 2 changed files with 145 additions and 37 deletions.
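As background (not part of the commit itself), here is a minimal sketch of what the new constraint means for a workflow author: once a workflow has as many pending activities as dynamicconfig.NumPendingActivitiesLimitError allows, the workflow task that tries to schedule the next activity fails with WORKFLOW_TASK_FAILED_CAUSE_PENDING_ACTIVITIES_LIMIT_EXCEEDED and is retried until some pending activities complete. The activity someActivity and the imports (go.temporal.io/sdk/workflow, time) are assumed purely for illustration.

// Hypothetical illustration, not code from this commit.
func fanOutWorkflow(ctx workflow.Context, fanOut int) error {
	ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
		StartToCloseTimeout: time.Minute,
	})
	futures := make([]workflow.Future, 0, fanOut)
	for i := 0; i < fanOut; i++ {
		// Each call adds one pending activity; past the configured limit, scheduling the next one
		// fails the workflow task instead of letting the workflow's mutable state keep growing.
		futures = append(futures, workflow.ExecuteActivity(ctx, someActivity))
	}
	for _, f := range futures {
		if err := f.Get(ctx, nil); err != nil {
			return err
		}
	}
	return nil
}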
177 changes: 140 additions & 37 deletions host/client_integration_test.go
@@ -37,6 +37,7 @@ import (
"testing"
"time"

"github.com/pborman/uuid"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
commonpb "go.temporal.io/api/common/v1"
@@ -67,6 +68,9 @@ type (
worker worker.Worker
taskQueue string
maxPendingChildExecutions int
maxPendingActivities int
maxPendingCancelRequests int
maxPendingSignals int
}
)

@@ -81,11 +85,17 @@ func TestClientIntegrationSuite(t *testing.T) {
}

func (s *clientIntegrationSuite) SetupSuite() {
// maxPendingChildWorkflows should be NumPendingChildExecutionsLimit, but that value is 1000, and it takes
// too long to test (which is the point of this limit)
s.maxPendingChildExecutions = 10
// these limits are higher in production, but our tests would take too long if we set them that high
limit := 10
s.maxPendingChildExecutions = limit
s.maxPendingActivities = limit
s.maxPendingCancelRequests = limit
s.maxPendingSignals = limit
s.dynamicConfigOverrides = map[dynamicconfig.Key]interface{}{
dynamicconfig.NumPendingChildExecutionsLimitError: s.maxPendingChildExecutions,
dynamicconfig.NumPendingActivitiesLimitError: s.maxPendingActivities,
dynamicconfig.NumPendingCancelRequestsLimitError: s.maxPendingCancelRequests,
dynamicconfig.NumPendingSignalsLimitError: s.maxPendingSignals,
}
s.setupSuite("testdata/clientintegrationtestcluster.yaml")

@@ -114,7 +124,7 @@ func (s *clientIntegrationSuite) SetupTest() {
s.taskQueue = s.randomizeStr("tq")

// We need to set this timeout to 0 to disable the deadlock detector. Otherwise, the deadlock detector will cause
// TestTooManyChildWorkflows to fail because it thinks there is a deadlocked due to the blocked child workflows.
// TestTooManyChildWorkflows to fail because it thinks there is a deadlock due to the blocked child workflows.
s.worker = worker.New(s.sdkClient, s.taskQueue, worker.Options{DeadlockDetectionTimeout: 0})
if err := s.worker.Start(); err != nil {
s.Logger.Fatal("Error when start worker", tag.Error(err))
@@ -504,21 +514,8 @@ func (s *clientIntegrationSuite) TestTooManyChildWorkflows() {
future, err := s.sdkClient.ExecuteWorkflow(ctx, options, parentWorkflow)
s.NoError(err)

// retry policy for some asynchronous operations
retry := func(operationCtx backoff.OperationCtx) error {
return backoff.ThrottleRetryContext(
ctx,
operationCtx,
backoff.NewExponentialRetryPolicy(time.Second),
func(err error) bool {
// all errors are retryable
return true
},
)
}

// wait until the workflow task to create the child workflow is retried
s.NoError(retry(func(ctx context.Context) error {
// wait until we retry the workflow task to create the last child workflow
s.eventuallySucceeds(ctx, func(ctx context.Context) error {
result, err := s.sdkClient.DescribeWorkflowExecution(ctx, parentWorkflowId, "")
s.NoError(err)
if result.PendingWorkflowTask != nil {
@@ -528,7 +525,7 @@ func (s *clientIntegrationSuite) TestTooManyChildWorkflows() {
}
}
return fmt.Errorf("the task to create the child workflow was never retried")
}))
})

// unblock the last child, allowing it to complete, which lowers the number of pending child workflows
s.NoError(s.sdkClient.SignalWorkflow(
@@ -540,31 +537,137 @@ func (s *clientIntegrationSuite) TestTooManyChildWorkflows() {
))

// verify that the parent workflow completes soon after the number of pending child workflows drops
s.NoError(retry(func(ctx context.Context) error {
s.eventuallySucceeds(ctx, func(ctx context.Context) error {
return future.Get(ctx, nil)
}))
})

// verify that the workflow's history contains a task that failed because it would otherwise exceed the pending
// child workflow limit
	s.historyContainsFailureCausedBy(
		ctx,
		parentWorkflowId,
		enumspb.WORKFLOW_TASK_FAILED_CAUSE_PENDING_CHILD_WORKFLOWS_LIMIT_EXCEEDED,
	)
}

// TestTooManyPendingActivities verifies that we don't allow users to schedule new activities when they've already
// reached the limit for pending activities.
func (s *clientIntegrationSuite) TestTooManyPendingActivities() {
timeout := time.Minute
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

pendingActivities := make(chan activity.Info, s.maxPendingActivities)
pendingActivity := func(ctx context.Context) error {
pendingActivities <- activity.GetInfo(ctx)
return activity.ErrResultPending
}
s.worker.RegisterActivity(pendingActivity)
lastActivity := func(ctx context.Context) error {
return nil
}
s.worker.RegisterActivity(lastActivity)

readyToScheduleLastActivity := "ready-to-schedule-last-activity"
myWorkflow := func(ctx workflow.Context) error {
for i := 0; i < s.maxPendingActivities; i++ {
workflow.ExecuteActivity(workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: time.Minute,
ActivityID: fmt.Sprintf("pending-activity-%d", i),
}), pendingActivity)
}

workflow.GetSignalChannel(ctx, readyToScheduleLastActivity).Receive(ctx, nil)

return workflow.ExecuteActivity(workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: time.Minute,
ActivityID: "last-activity",
}), lastActivity).Get(ctx, nil)
}
s.worker.RegisterWorkflow(myWorkflow)

workflowId := uuid.New()
workflowRun, err := s.sdkClient.ExecuteWorkflow(ctx, sdkclient.StartWorkflowOptions{
ID: workflowId,
TaskQueue: s.taskQueue,
}, myWorkflow)
s.NoError(err)

// wait until all of the activities are started (but not finished) before trying to schedule the last one
var activityInfo activity.Info
for i := 0; i < s.maxPendingActivities; i++ {
activityInfo = <-pendingActivities
}
s.NoError(s.sdkClient.SignalWorkflow(ctx, workflowId, "", readyToScheduleLastActivity, nil))

// verify that we can't finish the workflow yet
{
ctx, cancel := context.WithTimeout(ctx, time.Millisecond*100)
defer cancel()
err = workflowRun.Get(ctx, nil)
s.Error(err, "the workflow should not be done while there are too many pending activities")
}

// wait until the workflow task to schedule the last activity is retried
s.eventuallySucceeds(ctx, func(ctx context.Context) error {
result, err := s.sdkClient.DescribeWorkflowExecution(ctx, workflowId, "")
s.NoError(err)
		if result.PendingWorkflowTask != nil {
			if len(result.PendingActivities) == s.maxPendingActivities {
				return nil
			}
		}
		return fmt.Errorf("we haven't retried the task to schedule the last activity")
})

	// mark one of the pending activities as complete and verify that the workflow can now complete
s.NoError(s.sdkClient.CompleteActivity(ctx, activityInfo.TaskToken, nil, nil))
s.eventuallySucceeds(ctx, func(ctx context.Context) error {
return workflowRun.Get(ctx, nil)
})

	// verify that the workflow's history contains a task that failed because it would otherwise exceed the pending
	// activities limit
s.historyContainsFailureCausedBy(
ctx,
workflowId,
enumspb.WORKFLOW_TASK_FAILED_CAUSE_PENDING_ACTIVITIES_LIMIT_EXCEEDED,
)
}

func (s *clientIntegrationSuite) eventuallySucceeds(ctx context.Context, operationCtx backoff.OperationCtx) {
s.T().Helper()
s.NoError(backoff.ThrottleRetryContext(
ctx,
operationCtx,
backoff.NewExponentialRetryPolicy(time.Second),
func(err error) bool {
// all errors are retryable
return true
},
))
}

func (s *clientIntegrationSuite) historyContainsFailureCausedBy(ctx context.Context, parentWorkflowId string, cause enumspb.WorkflowTaskFailedCause) {
s.T().Helper()
s.eventuallySucceeds(ctx, func(ctx context.Context) error {
history := s.sdkClient.GetWorkflowHistory(
ctx,
parentWorkflowId,
"",
true,
enumspb.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT,
)
for history.HasNext() {
event, err := history.Next()
s.NoError(err)
switch a := event.Attributes.(type) {
case *historypb.HistoryEvent_WorkflowTaskFailedEventAttributes:
if a.WorkflowTaskFailedEventAttributes.Cause == cause {
return nil
}
}
}
return fmt.Errorf("did not find a failed task whose cause was %q", cause)
})
}

func (s *clientIntegrationSuite) Test_StickyWorkerRestartWorkflowTask() {
5 changes: 5 additions & 0 deletions service/history/workflowTaskHandler.go
@@ -264,6 +264,11 @@ func (handler *workflowTaskHandlerImpl) handleCommandScheduleActivity(
); err != nil {
return nil, handler.failWorkflow(enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_SCHEDULE_ACTIVITY_ATTRIBUTES, err)
}
if err := handler.sizeLimitChecker.checkIfNumPendingActivitiesExceedsLimit(); err != nil {
handler.workflowTaskFailedCause = NewWorkflowTaskFailedCause(enumspb.
WORKFLOW_TASK_FAILED_CAUSE_PENDING_ACTIVITIES_LIMIT_EXCEEDED, err)
return nil, nil
}

enums.SetDefaultTaskQueueKind(&attr.GetTaskQueue().Kind)

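The body of checkIfNumPendingActivitiesExceedsLimit is not part of this diff. As a rough, assumption-labeled sketch, such a check presumably compares the workflow's current pending activity count against the dynamicconfig.NumPendingActivitiesLimitError value, along these lines (type and field names below are illustrative, not the real ones; fmt is the only import needed):

// Hypothetical sketch only; the real checker lives elsewhere in the history service.
type workflowSizeChecker struct {
	numPendingActivitiesLimit int        // fed from dynamicconfig.NumPendingActivitiesLimitError
	pendingActivityCount      func() int // e.g. the size of the workflow's pending activity map
}

func (c *workflowSizeChecker) checkIfNumPendingActivitiesExceedsLimit() error {
	count := c.pendingActivityCount()
	if c.numPendingActivitiesLimit > 0 && count >= c.numPendingActivitiesLimit {
		return fmt.Errorf("the number of pending activities, %d, has reached the per-workflow limit of %d",
			count, c.numPendingActivitiesLimit)
	}
	return nil
}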
