Skip to content

Commit

Permalink
Enforce the per-workflow pending signal limit
Browse files (browse the repository at this point in the history)
  • Loading branch information
MichaelSnowden committed Nov 18, 2022
1 parent e465c2e commit 1a08613
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 87 deletions.
139 changes: 52 additions & 87 deletions host/client_integration_test.go
Expand Up @@ -49,6 +49,7 @@ import (
"go.temporal.io/sdk/temporal"
"go.temporal.io/sdk/worker"
"go.temporal.io/sdk/workflow"
"go.uber.org/multierr"

"go.temporal.io/server/api/adminservice/v1"
"go.temporal.io/server/common"
Expand Down Expand Up @@ -698,108 +699,72 @@ func (s *clientIntegrationSuite) TestTooManyCancelRequests() {
})
}

func (s *clientIntegrationSuite) TestTooManyCancelRequests() {
// set a timeout for this whole test
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*5)
func (s *clientIntegrationSuite) TestTooManyPendingSignals() {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()

// create a large number of blocked workflows
numTargetWorkflows := 50 // should be much greater than s.maxPendingCancelRequests
targetWorkflow := func(ctx workflow.Context) error {
return workflow.Await(ctx, func() bool {
return false
})
}
s.worker.RegisterWorkflow(targetWorkflow)
for i := 0; i < numTargetWorkflows; i++ {
_, err := s.sdkClient.ExecuteWorkflow(ctx, sdkclient.StartWorkflowOptions{
ID: fmt.Sprintf("workflow-%d", i),
TaskQueue: s.taskQueue,
}, targetWorkflow)
s.NoError(err)
}

// define a workflow that attempts to cancel a given subsequence of the blocked workflows
cancelWorkflowsInRange := func(ctx workflow.Context, start, stop int) error {
receiverId := "receiver-id"
signalName := "my-signal"
sender := func(ctx workflow.Context, n int) error {
var futures []workflow.Future
for i := start; i < stop; i++ {
future := workflow.RequestCancelExternalWorkflow(ctx, fmt.Sprintf("workflow-%d", i), "")
for i := 0; i < n; i++ {
future := workflow.SignalExternalWorkflow(ctx, receiverId, "", signalName, nil)
futures = append(futures, future)
}
var errs error
for _, future := range futures {
if err := future.Get(ctx, nil); err != nil {
return err
}
err := future.Get(ctx, nil)
errs = multierr.Combine(errs, err)
}
return nil
return errs
}
s.worker.RegisterWorkflow(cancelWorkflowsInRange)
s.worker.RegisterWorkflow(sender)

// try to cancel all the workflows at once and verify that we can't because of the limit violation
s.Run("CancelAllWorkflowsAtOnce", func() {
cancelerWorkflowId := "canceler-workflow-id"
run, err := s.sdkClient.ExecuteWorkflow(ctx, sdkclient.StartWorkflowOptions{
receiver := func(ctx workflow.Context) error {
channel := workflow.GetSignalChannel(ctx, signalName)
for {
channel.Receive(ctx, nil)
}
}
s.worker.RegisterWorkflow(receiver)
_, err := s.sdkClient.ExecuteWorkflow(ctx, sdkclient.StartWorkflowOptions{
TaskQueue: s.taskQueue,
ID: receiverId,
}, receiver)
s.NoError(err)

successTimeout := time.Second * 5
s.Run("TooManySignals", func() {
senderId := "sender-1"
senderRun, err := s.sdkClient.ExecuteWorkflow(ctx, sdkclient.StartWorkflowOptions{
TaskQueue: s.taskQueue,
ID: cancelerWorkflowId,
}, cancelWorkflowsInRange, 0, numTargetWorkflows)
ID: senderId,
}, sender, s.maxPendingSignals+1)
s.NoError(err)
s.workflowTaskEventuallyRetried(ctx, cancelerWorkflowId)
s.historyContainsFailureCausedBy(ctx, cancelerWorkflowId, enumspb.WORKFLOW_TASK_FAILED_CAUSE_PENDING_REQUEST_CANCEL_LIMIT_EXCEEDED)
{
ctx, cancel := context.WithTimeout(ctx, time.Second*3)
ctx, cancel := context.WithTimeout(ctx, successTimeout)
defer cancel()
s.Error(run.Get(ctx, nil))
}
namespaceID := s.getNamespaceID(s.namespace)
shardID := common.WorkflowIDToHistoryShard(namespaceID, cancelerWorkflowId, s.testClusterConfig.HistoryConfig.NumHistoryShards)
workflowExecution, err := s.testCluster.GetExecutionManager().GetWorkflowExecution(ctx, &persistence.GetWorkflowExecutionRequest{
ShardID: shardID,
NamespaceID: namespaceID,
WorkflowID: cancelerWorkflowId,
RunID: run.GetRunID(),
})
s.NoError(err)
numCancelRequests := len(workflowExecution.State.RequestCancelInfos)
s.Assert().Zero(numCancelRequests)
err = s.sdkClient.CancelWorkflow(ctx, cancelerWorkflowId, "")
s.NoError(err)
})

// try to cancel all the workflows in separate batches of cancel workflows and verify that it works
s.Run("CancelWorkflowsInSeparateBatches", func() {
var runs []sdkclient.WorkflowRun
var stop int
for start := 0; start < numTargetWorkflows; start = stop {
stop = start + s.maxPendingCancelRequests
if stop > numTargetWorkflows {
stop = numTargetWorkflows
}
run, err := s.sdkClient.ExecuteWorkflow(ctx, sdkclient.StartWorkflowOptions{
TaskQueue: s.taskQueue,
}, cancelWorkflowsInRange, start, stop)
s.NoError(err)
runs = append(runs, run)
}

for _, run := range runs {
s.NoError(run.Get(ctx, nil))
err := senderRun.Get(ctx, nil)
s.Error(err)
}
s.historyContainsFailureCausedBy(
ctx,
senderId,
enumspb.WORKFLOW_TASK_FAILED_CAUSE_PENDING_SIGNALS_LIMIT_EXCEEDED,
)
s.NoError(s.sdkClient.CancelWorkflow(ctx, senderId, ""))
})
}

func (s *clientIntegrationSuite) workflowTaskEventuallyRetried(ctx context.Context, cancelerWorkflowId string) {
s.T().Helper()
s.eventuallySucceeds(ctx, func(ctx context.Context) error {
result, err := s.sdkClient.DescribeWorkflowExecution(ctx, cancelerWorkflowId, "")
if err != nil {
return err
}
if result.PendingWorkflowTask != nil {
if result.PendingWorkflowTask.Attempt > 1 {
return nil
}
}
return fmt.Errorf("workflow task was never retried")
s.Run("NotTooManySignals", func() {
senderID := "sender-2"
senderRun, err := s.sdkClient.ExecuteWorkflow(ctx, sdkclient.StartWorkflowOptions{
TaskQueue: s.taskQueue,
ID: senderID,
}, sender, s.maxPendingSignals)
s.NoError(err)
ctx, cancel := context.WithTimeout(ctx, successTimeout)
defer cancel()
err = senderRun.Get(ctx, nil)
s.NoError(err)
})
}

Expand Down
3 changes: 3 additions & 0 deletions service/history/workflowTaskHandler.go
Expand Up @@ -955,6 +955,9 @@ func (handler *workflowTaskHandlerImpl) handleCommandSignalExternalWorkflow(
); err != nil || handler.stopProcessing {
return err
}
if err := handler.sizeLimitChecker.checkIfNumPendingSignalsExceedsLimit(); err != nil {
return handler.failCommand(enumspb.WORKFLOW_TASK_FAILED_CAUSE_PENDING_SIGNALS_LIMIT_EXCEEDED, err)
}

if err := handler.sizeLimitChecker.checkIfPayloadSizeExceedsLimit(
metrics.CommandTypeTag(enumspb.COMMAND_TYPE_SIGNAL_EXTERNAL_WORKFLOW_EXECUTION.String()),
Expand Down

0 comments on commit 1a08613

Please sign in to comment.