Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 12 additions & 6 deletions internal/temporalcli/commands.workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -542,13 +542,19 @@ func (s *SingleWorkflowOrBatchOptions) workflowExecOrBatch(
return nil, nil, fmt.Errorf("cannot set run ID when query is set")
}

// Count the workflows that will be affected
count, err := cl.CountWorkflow(cctx, &workflowservice.CountWorkflowExecutionsRequest{Query: s.Query})
if err != nil {
return nil, nil, fmt.Errorf("failed counting workflows from query: %w", err)
// The count is only used in the confirmation prompt; skip the request when --yes
// bypasses it, so batch jobs can still proceed if the visibility API is timing out.
var promptMessage string
if s.Yes {
promptMessage = fmt.Sprintf("Start batch against workflows matching query %q? y/N", s.Query)
} else {
count, err := cl.CountWorkflow(cctx, &workflowservice.CountWorkflowExecutionsRequest{Query: s.Query})
if err != nil {
return nil, nil, fmt.Errorf("failed counting workflows from query: %w", err)
}
promptMessage = fmt.Sprintf("Start batch against approximately %v workflow(s)? y/N", count.Count)
}
yes, err := cctx.promptYes(
fmt.Sprintf("Start batch against approximately %v workflow(s)? y/N", count.Count), s.Yes)
yes, err := cctx.promptYes(promptMessage, s.Yes)
if err != nil {
return nil, nil, err
} else if !yes {
Expand Down
17 changes: 12 additions & 5 deletions internal/temporalcli/commands.workflow_reset.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,12 +134,19 @@ func (c *TemporalWorkflowResetCommand) runBatchResetWithPostOps(cctx *CommandCon
PostResetOperations: postOps,
},
}
count, err := cl.CountWorkflow(cctx, &workflowservice.CountWorkflowExecutionsRequest{Query: c.Query})
if err != nil {
return fmt.Errorf("failed counting workflows from query: %w", err)
// The count is only used in the confirmation prompt; skip the request when --yes
// bypasses it, so batch jobs can still proceed if the visibility API is timing out.
var promptMessage string
if c.Yes {
promptMessage = fmt.Sprintf("Start batch against workflows matching query %q? y/N", c.Query)
} else {
count, err := cl.CountWorkflow(cctx, &workflowservice.CountWorkflowExecutionsRequest{Query: c.Query})
if err != nil {
return fmt.Errorf("failed counting workflows from query: %w", err)
}
promptMessage = fmt.Sprintf("Start batch against approximately %v workflow(s)? y/N", count.Count)
}
yes, err := cctx.promptYes(
fmt.Sprintf("Start batch against approximately %v workflow(s)? y/N", count.Count), c.Yes)
yes, err := cctx.promptYes(promptMessage, c.Yes)
if err != nil {
return err
}
Expand Down
79 changes: 79 additions & 0 deletions internal/temporalcli/commands.workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,85 @@ func (s *SharedServerSuite) TestWorkflow_Terminate_BatchWorkflowSuccess_JSON() {
s.NotEmpty(jsonRes["batchJobId"])
}

// TestWorkflow_Terminate_BatchWorkflow_SkipsCountWhenYes verifies that --yes causes
// the batch terminate command to skip the CountWorkflowExecutions call. The count
// is only used for the "Start batch against approximately N workflow(s)?" prompt;
// when --yes bypasses the prompt, issuing it adds latency and prevents batch jobs
// from running on clusters whose visibility API is timing out.
func (s *SharedServerSuite) TestWorkflow_Terminate_BatchWorkflow_SkipsCountWhenYes() {
s.Worker().OnDevWorkflow(func(ctx workflow.Context, a any) (any, error) {
ctx.Done().Receive(ctx, nil)
return nil, ctx.Err()
})

var callLock sync.Mutex
var countCalls, startBatchCalls int
s.CommandHarness.Options.AdditionalClientGRPCDialOptions = append(
s.CommandHarness.Options.AdditionalClientGRPCDialOptions,
grpc.WithChainUnaryInterceptor(func(
ctx context.Context,
method string, req, reply any,
cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption,
) error {
callLock.Lock()
switch req.(type) {
case *workflowservice.CountWorkflowExecutionsRequest:
countCalls++
case *workflowservice.StartBatchOperationRequest:
startBatchCalls++
}
callLock.Unlock()
return invoker(ctx, method, req, reply, cc, opts...)
}),
)

// Start a workflow so the batch query has at least one match. The count assertion
// is independent of the match count (it asserts zero CountWorkflow calls regardless),
// but executing the batch against a real workflow keeps the test path realistic.
searchAttr := "keyword-" + uuid.NewString()
run, err := s.Client.ExecuteWorkflow(
s.Context,
client.StartWorkflowOptions{
TaskQueue: s.Worker().Options.TaskQueue,
SearchAttributes: map[string]any{"CustomKeywordField": searchAttr},
},
DevWorkflow,
"ignored",
)
s.NoError(err)
s.Eventually(func() bool {
resp, err := s.Client.ListWorkflow(s.Context, &workflowservice.ListWorkflowExecutionsRequest{
Query: "CustomKeywordField = '" + searchAttr + "'",
})
s.NoError(err)
return len(resp.Executions) == 1
}, 3*time.Second, 100*time.Millisecond)

res := s.Execute(
"workflow", "terminate",
"--address", s.Address(),
"--query", "CustomKeywordField = '"+searchAttr+"'",
"--reason", "skip-count-test",
"--yes",
)
s.NoError(res.Err)

callLock.Lock()
defer callLock.Unlock()
s.Equal(0, countCalls, "CountWorkflowExecutions must not be called when --yes is set")
s.Equal(1, startBatchCalls, "StartBatchOperation must still be called")

// Sanity-check: the prompt text should reflect the missing count.
s.NotContains(res.Stdout.String(), "approximately")
s.Contains(res.Stdout.String(), "matching query")

// Drain the workflow so the test fixture cleans up.
s.Eventually(func() bool {
err := run.Get(s.Context, nil)
return err != nil
}, 5*time.Second, 100*time.Millisecond)
}

func (s *SharedServerSuite) testTerminateBatchWorkflow(
total int,
rps float32,
Expand Down