Skip to content

Commit

Permalink
Fail query fast if workflow task attempt greater or equals 3 (#4359)
Browse files Browse the repository at this point in the history
* Fail query fast if workflow task attempt greater or equals 3
  • Loading branch information
yiminc committed May 19, 2023
1 parent 9a6edae commit 680e4e1
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 8 deletions.
1 change: 1 addition & 0 deletions common/rpc/interceptor/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,7 @@ func (ti *TelemetryInterceptor) handleError(
// we emit service_error_with_type metrics, no need to emit specific metric for these known error types.
case *serviceerror.AlreadyExists,
*serviceerror.CancellationAlreadyRequested,
*serviceerror.FailedPrecondition,
*serviceerror.NamespaceInvalidState,
*serviceerror.NamespaceNotActive,
*serviceerror.NamespaceNotFound,
Expand Down
13 changes: 11 additions & 2 deletions service/history/api/queryworkflow/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
querypb "go.temporal.io/api/query/v1"
"go.temporal.io/api/serviceerror"
"go.temporal.io/api/workflowservice/v1"
"go.temporal.io/server/common/log/tag"

"go.temporal.io/server/api/historyservice/v1"
"go.temporal.io/server/api/matchingservice/v1"
Expand All @@ -48,6 +49,9 @@ import (
"go.temporal.io/server/service/history/workflow"
)

// Fail query fast if workflow task keeps failing (attempt >= 3).
const failQueryWorkflowTaskAttemptCount = 3

func Invoke(
ctx context.Context,
request *historyservice.QueryWorkflowRequest,
Expand Down Expand Up @@ -117,10 +121,15 @@ func Invoke(
return nil, consts.ErrWorkflowTaskNotScheduled
}

if mutableState.IsTransientWorkflowTask() {
if mutableState.GetExecutionInfo().WorkflowTaskAttempt >= failQueryWorkflowTaskAttemptCount {
// while workflow task is failing, the query to that workflow will also fail. Failing fast here to prevent wasting
// resources to load history for a query that will fail.
return nil, serviceerror.NewFailedPrecondition("Cannot query workflow due to Workflow Task in failed state.")
shard.GetLogger().Info("Fail query fast due to WorkflowTask in failed state.",
tag.WorkflowNamespace(request.Request.Namespace),
tag.WorkflowNamespaceID(workflowKey.NamespaceID),
tag.WorkflowID(workflowKey.WorkflowID),
tag.WorkflowRunID(workflowKey.RunID))
return nil, serviceerror.NewWorkflowNotReady("Cannot query workflow due to Workflow Task in failed state.")
}

// There are two ways in which queries get dispatched to workflow worker. First, queries can be dispatched on workflow tasks.
Expand Down
14 changes: 8 additions & 6 deletions tests/query_workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,9 +277,10 @@ func (s *clientIntegrationSuite) TestQueryWorkflow_QueryFailedWorkflowTask() {

id := "test-query-failed-workflow-task"
workflowOptions := sdkclient.StartWorkflowOptions{
ID: id,
TaskQueue: s.taskQueue,
WorkflowRunTimeout: 20 * time.Second,
ID: id,
TaskQueue: s.taskQueue,
WorkflowTaskTimeout: time.Second * 1, // use shorter wft timeout to make this test faster
WorkflowRunTimeout: 20 * time.Second,
}
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
Expand All @@ -291,9 +292,10 @@ func (s *clientIntegrationSuite) TestQueryWorkflow_QueryFailedWorkflowTask() {
s.NotNil(workflowRun)
s.True(workflowRun.GetRunID() != "")

// wait for workflow to fail
time.Sleep(time.Second * 2)
// wait for workflow task to fail 3 times
time.Sleep(time.Second * 3) // 1st_attempt, 0_delay, 2nd_attempt, 1s_delay, 3rd_attempt
_, err = s.sdkClient.QueryWorkflow(ctx, id, "", "test")
s.Error(err)
s.IsType(&serviceerror.FailedPrecondition{}, err)
s.IsType(&serviceerror.WorkflowNotReady{}, err)

}

0 comments on commit 680e4e1

Please sign in to comment.