Skip to content

Commit

Permalink
Fix query (#2826)
Browse files Browse the repository at this point in the history
Fix query for 2 cases:
1) No workflow task has ever been scheduled, usually due to firstWorkflowTaskBackoff.
    In this case, return a WorkflowNotReady error.
2) No workflow task has ever been started, usually due to a race condition or because the worker is down.
    In this case, do not dispatch the query directly; instead, buffer the query and send it with the next workflow task.
    The query might time out if no worker picks up the scheduled workflow task.
  • Loading branch information
yiminc committed May 10, 2022
1 parent 153fb24 commit 668c851
Show file tree
Hide file tree
Showing 5 changed files with 149 additions and 16 deletions.
1 change: 1 addition & 0 deletions client/history/metricClient.go
Original file line number Diff line number Diff line change
Expand Up @@ -662,6 +662,7 @@ func (c *metricClient) finishMetricsRecording(
*serviceerror.NotFound,
*serviceerror.QueryFailed,
*serviceerror.NamespaceNotFound,
*serviceerror.WorkflowNotReady,
*serviceerror.WorkflowExecutionAlreadyStarted:
// noop - these errors are not of interest and would generate too many logs
default:
Expand Down
3 changes: 2 additions & 1 deletion common/rpc/interceptor/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,8 @@ func (ti *TelemetryInterceptor) handleError(
}

switch err := err.(type) {
case *serviceerrors.StickyWorkerUnavailable:
case *serviceerror.WorkflowNotReady,
*serviceerrors.StickyWorkerUnavailable:
// we emit service_errors_with_type metrics, no need to emit specific metric for this error type.
// TODO deprecate all metrics below
case *serviceerrors.ShardOwnershipLost:
Expand Down
120 changes: 120 additions & 0 deletions host/query_workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,16 @@ package host

import (
"context"
"sync"
"sync/atomic"
"time"

commonpb "go.temporal.io/api/common/v1"
sdkclient "go.temporal.io/sdk/client"
"go.temporal.io/sdk/temporal"
"go.temporal.io/sdk/worker"
"go.temporal.io/sdk/workflow"
"go.temporal.io/server/service/history/consts"

"go.temporal.io/server/common/log/tag"
)
Expand Down Expand Up @@ -138,3 +143,118 @@ func (s *clientIntegrationSuite) TestQueryWorkflow_Consistent_PiggybackQuery() {
// verify query sees all signals before it
s.Equal("pauseabc", queryResultStr)
}

// TestQueryWorkflow_QueryWhileBackoff checks that a query issued against a run
// whose first workflow task is still being held back by a backoff is rejected
// with an error containing ErrWorkflowTaskNotScheduled.
//
// The workflow always fails with an application error and is started with a
// retry policy, so the test polls the current run's history until its start
// event reports a positive FirstWorkflowTaskBackoff, then sends the query.
func (s *clientIntegrationSuite) TestQueryWorkflow_QueryWhileBackoff() {
	const (
		workflowID   = "test-query-before-backoff"
		pollAttempts = 5
		pollInterval = time.Second
	)

	// Registers a query handler and then fails, which makes the retry policy
	// below kick in.
	workflowFn := func(ctx workflow.Context) (string, error) {
		workflow.SetQueryHandler(ctx, "test", func() (string, error) {
			return "should-reach-here", nil
		})
		return "", temporal.NewApplicationError("retry-me", "test-error")
	}
	s.worker.RegisterWorkflow(workflowFn)

	retryPolicy := &temporal.RetryPolicy{
		InitialInterval: 10 * time.Second,
	}
	startOptions := sdkclient.StartWorkflowOptions{
		ID:                 workflowID,
		TaskQueue:          s.taskQueue,
		WorkflowRunTimeout: 20 * time.Second,
		RetryPolicy:        retryPolicy,
	}

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	run, err := s.sdkClient.ExecuteWorkflow(ctx, startOptions, workflowFn)
	if err != nil {
		s.Logger.Fatal("Start workflow failed with err", tag.Error(err))
	}
	s.NotNil(run)
	s.True(run.GetRunID() != "")

	// Poll until the current run is one that was started with a backoff.
	backoffObserved := false
	for attempt := 0; attempt < pollAttempts; attempt++ {
		events := s.getHistory(s.namespace, &commonpb.WorkflowExecution{
			WorkflowId: workflowID,
		})
		s.True(len(events) > 0)

		startedAttrs := events[0].GetWorkflowExecutionStartedEventAttributes()
		s.NotNil(startedAttrs)

		if backoff := startedAttrs.FirstWorkflowTaskBackoff; backoff != nil && *backoff > 0 {
			backoffObserved = true
			break
		}

		// The retry run (which carries the backoff) is not there yet; try again shortly.
		time.Sleep(pollInterval)
	}
	s.True(backoffObserved)

	_, err = s.sdkClient.QueryWorkflow(ctx, workflowID, "", "test")
	s.Error(err)
	s.ErrorContains(err, consts.ErrWorkflowTaskNotScheduled.Error())
}

// TestQueryWorkflow_QueryBeforeStart checks that a query sent before any worker
// has picked up the first workflow task is held until a worker starts, and is
// then answered with the state the workflow reached on that task.
//
// The suite worker is stopped first, the workflow is started, and the query is
// issued from a background goroutine. A fresh worker is started about two
// seconds later; the query is expected to succeed with "started" and to have
// taken longer than one second.
func (s *clientIntegrationSuite) TestQueryWorkflow_QueryBeforeStart() {
	// stop the worker, so the workflow won't be started before query
	s.worker.Stop()

	workflowFn := func(ctx workflow.Context) (string, error) {
		status := "initialized"
		workflow.SetQueryHandler(ctx, "test", func() (string, error) {
			return status, nil
		})

		status = "started"
		workflow.Sleep(ctx, time.Hour)
		return "", nil
	}

	id := "test-query-before-start"
	workflowOptions := sdkclient.StartWorkflowOptions{
		ID:                 id,
		TaskQueue:          s.taskQueue,
		WorkflowRunTimeout: 20 * time.Second,
	}
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()
	workflowRun, err := s.sdkClient.ExecuteWorkflow(ctx, workflowOptions, workflowFn)
	if err != nil {
		s.Logger.Fatal("Start workflow failed with err", tag.Error(err))
	}

	s.NotNil(workflowRun)
	s.True(workflowRun.GetRunID() != "")

	// The goroutine only records what happened. Assertions are made on the
	// test goroutine after wg.Wait(), because fail-fast assertions must not be
	// invoked from any other goroutine.
	var (
		queryErr       error
		decodeErr      error
		queryResultStr string
		queryDuration  time.Duration
	)

	wg := sync.WaitGroup{}
	wg.Add(1)
	go func() {
		defer wg.Done()

		startTime := time.Now()
		queryResult, err := s.sdkClient.QueryWorkflow(ctx, id, "", "test")
		queryDuration = time.Since(startTime)
		if err != nil {
			// Do not touch queryResult when the query itself failed.
			queryErr = err
			return
		}
		decodeErr = queryResult.Get(&queryResultStr)
	}()

	// delay 2s to start worker, this will block query for 2s
	time.Sleep(time.Second * 2)
	s.worker = worker.New(s.sdkClient, s.taskQueue, worker.Options{})
	s.worker.RegisterWorkflow(workflowFn)
	if err := s.worker.Start(); err != nil {
		s.Logger.Fatal("Error when start worker", tag.Error(err))
	}

	// wait query
	wg.Wait()

	s.NoError(queryErr)
	s.NoError(decodeErr)

	// verify the query was answered after the first workflow task ran
	s.Equal("started", queryResultStr)

	// verify the query was actually blocked until the worker came up
	s.True(queryDuration > time.Second)
}
2 changes: 2 additions & 0 deletions service/history/consts/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ var (
ErrChildExecutionNotFound = serviceerror.NewNotFound("Pending child execution not found.")
// ErrWorkflowNotReady is the error indicating that the workflow mutable state is missing information necessary to handle the request.
ErrWorkflowNotReady = serviceerror.NewWorkflowNotReady("Workflow state is not ready to handle the request.")
// ErrWorkflowTaskNotScheduled is the error indicating that no workflow task has been scheduled yet.
ErrWorkflowTaskNotScheduled = serviceerror.NewWorkflowNotReady("Workflow task is not scheduled yet.")

// FailedWorkflowStatuses is a set of failed workflow close states, used for start workflow policy
// for start workflow execution API
Expand Down
39 changes: 24 additions & 15 deletions service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -681,7 +681,7 @@ func (e *historyEngineImpl) QueryWorkflow(
}
}

de, err := e.shard.GetNamespaceRegistry().GetNamespaceByID(namespaceID)
nsEntry, err := e.shard.GetNamespaceRegistry().GetNamespaceByID(namespaceID)
if err != nil {
return nil, err
}
Expand All @@ -701,27 +701,36 @@ func (e *historyEngineImpl) QueryWorkflow(
return nil, err
}

if !mutableState.HasProcessedOrPendingWorkflowTask() {
// workflow has no workflow task ever scheduled, this usually is due to firstWorkflowTaskBackoff (cron / retry)
// in this case, don't buffer the query, because it is almost certain the query will time out.
return nil, consts.ErrWorkflowTaskNotScheduled
}

// There are two ways in which queries get dispatched to workflow worker. First, queries can be dispatched on workflow tasks.
// These workflow tasks potentially contain new events and queries. The events are treated as coming before the query in time.
// The second way in which queries are dispatched to workflow worker is directly through matching; in this approach queries can be
// dispatched to workflow worker immediately even if there are outstanding events that came before the query. The following logic
// is used to determine if a query can be safely dispatched directly through matching or must be dispatched on a workflow task.
//
// There are three cases in which a query can be dispatched directly through matching safely, without violating strong consistency level:
// 1. the namespace is not active, in this case history is immutable so a query dispatched at any time is consistent
// 2. the workflow is not running, whenever a workflow is not running dispatching query directly is consistent
// 3. if there is no pending or started workflow tasks it means no events came before query arrived, so its safe to dispatch directly
safeToDispatchDirectly := !de.ActiveInCluster(e.clusterMetadata.GetCurrentClusterName()) ||
!mutableState.IsWorkflowExecutionRunning() ||
(!mutableState.HasPendingWorkflowTask() && !mutableState.HasInFlightWorkflowTask())
if safeToDispatchDirectly {
release(nil)
msResp, err := e.getMutableState(ctx, namespaceID, *request.GetRequest().GetExecution())
if err != nil {
return nil, err
// Precondition to dispatch query directly to matching is workflow has at least one WorkflowTaskStarted event. Otherwise, sdk would panic.
if mutableState.GetPreviousStartedEventID() != common.EmptyEventID {
// There are three cases in which a query can be dispatched directly through matching safely, without violating strong consistency level:
// 1. the namespace is not active, in this case history is immutable so a query dispatched at any time is consistent
// 2. the workflow is not running, whenever a workflow is not running dispatching query directly is consistent
// 3. if there is no pending or started workflow tasks it means no events came before query arrived, so its safe to dispatch directly
safeToDispatchDirectly := !nsEntry.ActiveInCluster(e.clusterMetadata.GetCurrentClusterName()) ||
!mutableState.IsWorkflowExecutionRunning() ||
(!mutableState.HasPendingWorkflowTask() && !mutableState.HasInFlightWorkflowTask())
if safeToDispatchDirectly {
release(nil)
msResp, err := e.getMutableState(ctx, namespaceID, *request.GetRequest().GetExecution())
if err != nil {
return nil, err
}
req.Execution.RunId = msResp.Execution.RunId
return e.queryDirectlyThroughMatching(ctx, msResp, request.GetNamespaceId(), req, scope)
}
req.Execution.RunId = msResp.Execution.RunId
return e.queryDirectlyThroughMatching(ctx, msResp, request.GetNamespaceId(), req, scope)
}

// If we get here it means query could not be dispatched through matching directly, so it must block
Expand Down

0 comments on commit 668c851

Please sign in to comment.