Fix 5s delay on sticky queue (#2811)
A sticky worker marks its workflows as running on a sticky queue, but when the
sticky worker goes away (graceful shutdown or forceful kill), it does not notify
the server. When a new workflow task or query is scheduled, the server still
dispatches it to the old sticky queue, which no longer has a worker polling it,
so the dispatch only ends in the sticky queue's 5s schedule_to_start timeout. No
matter how long the old sticky worker has been gone, that 5s delay is always
paid. This PR fixes that by checking whether there is no inflight poller for the
sticky queue and no poller has been seen within the last 10s. In that case,
matching returns a new StickyWorkerUnavailable error, and history retries the
task using the original task queue.
yiminc committed May 7, 2022
1 parent 1f44265 commit 95dfa07
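
To make the new availability check concrete, here is a minimal sketch in Go of the rule described above. It is illustrative only: stickyQueueState, its fields, and stickyPollerUnavailableWindow are hypothetical names, and it assumes the serviceerror package added by this PR exposes a NewStickyWorkerUnavailable constructor; the real logic lives in the matching engine's task queue handling and differs in detail.

```go
package matching

import (
	"time"

	serviceerrors "go.temporal.io/server/common/serviceerror"
)

// stickyPollerUnavailableWindow mirrors the 10s window described in the
// commit message (hypothetical constant name).
const stickyPollerUnavailableWindow = 10 * time.Second

// stickyQueueState is a hypothetical stand-in for the matching engine's
// per-queue poller bookkeeping.
type stickyQueueState struct {
	inflightPollers int       // pollers currently blocked on this sticky queue
	lastPollTime    time.Time // when a poller was last seen on this queue
}

// checkStickyWorkerAvailable fails fast with StickyWorkerUnavailable when no
// poller is waiting on the sticky queue and none has been seen within the
// window, instead of letting the task sit out the 5s schedule_to_start
// timeout.
func (s *stickyQueueState) checkStickyWorkerAvailable() error {
	if s.inflightPollers > 0 {
		return nil // a worker is actively polling the sticky queue
	}
	if time.Since(s.lastPollTime) < stickyPollerUnavailableWindow {
		return nil // a poller was seen recently; the worker may still be alive
	}
	return serviceerrors.NewStickyWorkerUnavailable()
}
```

History treats this error as a signal to retry on the original task queue rather than as a dispatch failure, which is why the metric and telemetry changes below exclude it from error counting.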
Showing 17 changed files with 580 additions and 1,373 deletions.
219 changes: 190 additions & 29 deletions api/errordetails/v1/message.pb.go

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions client/history/metricClient.go
@@ -632,6 +632,7 @@ func (c *metricClient) finishMetricsRecording(
 	case *serviceerror.Canceled,
 		*serviceerror.DeadlineExceeded,
 		*serviceerror.NotFound,
+		*serviceerror.QueryFailed,
 		*serviceerror.NamespaceNotFound,
 		*serviceerror.WorkflowExecutionAlreadyStarted:
 		// noop - not interesting and too many logs
5 changes: 4 additions & 1 deletion client/matching/metricClient.go
@@ -30,6 +30,7 @@ import (
 
 	"go.temporal.io/api/serviceerror"
 	taskqueuepb "go.temporal.io/api/taskqueue/v1"
+	serviceerrors "go.temporal.io/server/common/serviceerror"
 	"google.golang.org/grpc"
 
 	"go.temporal.io/server/api/matchingservice/v1"
@@ -258,10 +259,12 @@ func (c *metricClient) finishMetricsRecording(
 ) {
 	if err != nil {
 		switch err.(type) {
-		case *serviceerror.Canceled,
+		case *serviceerrors.StickyWorkerUnavailable,
+			*serviceerror.Canceled,
 			*serviceerror.DeadlineExceeded,
 			*serviceerror.NotFound,
 			*serviceerror.NamespaceNotFound,
+			*serviceerror.QueryFailed,
 			*serviceerror.WorkflowExecutionAlreadyStarted:
 			// noop - not interesting and too many logs
 		default:
3 changes: 3 additions & 0 deletions common/rpc/interceptor/telemetry.go
@@ -159,6 +159,9 @@ func (ti *TelemetryInterceptor) handleError(
 	}
 
 	switch err := err.(type) {
+	case *serviceerrors.StickyWorkerUnavailable:
+		// we emit service_errors_with_type metrics, no need to emit specific metric for this error type.
+		// TODO deprecate all metrics below
 	case *serviceerrors.ShardOwnershipLost:
 		scope.IncCounter(metrics.ServiceErrShardOwnershipLostCounter)
 	case *serviceerrors.TaskAlreadyStarted:
5 changes: 5 additions & 0 deletions common/serviceerror/convert.go
@@ -58,6 +58,11 @@ func FromStatus(st *status.Status) error {
 		case *errordetails.RetryReplicationFailure:
 			return newRetryReplication(st, errDetails)
 		}
+	case codes.Unavailable:
+		switch errDetails.(type) {
+		case *errordetails.StickyWorkerUnavailableFailure:
+			return newStickyWorkerUnavailable(st)
+		}
 	}
 
 	return serviceerror.FromStatus(st)
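
On the history side, the retry described in the commit message can be pictured as follows. This is a hedged sketch, not the PR's actual call site: dispatchWithStickyFallback and its parameters are hypothetical wiring, and it assumes the StickyWorkerUnavailable error (reconstructed by convert.go above) is matchable with errors.As on the matching client's error.

```go
package history

import (
	"context"
	"errors"

	taskqueuepb "go.temporal.io/api/taskqueue/v1"

	"go.temporal.io/server/api/matchingservice/v1"
	serviceerrors "go.temporal.io/server/common/serviceerror"
)

// dispatchWithStickyFallback pushes a workflow task to the sticky queue and,
// if matching reports the sticky worker as gone, immediately retries on the
// workflow's original task queue instead of waiting out the 5s
// schedule_to_start timeout.
func dispatchWithStickyFallback(
	ctx context.Context,
	client matchingservice.MatchingServiceClient,
	req *matchingservice.AddWorkflowTaskRequest,
	originalTaskQueue *taskqueuepb.TaskQueue,
) error {
	_, err := client.AddWorkflowTask(ctx, req)

	var stickyUnavailable *serviceerrors.StickyWorkerUnavailable
	if errors.As(err, &stickyUnavailable) {
		// Sticky worker is presumed gone: fall back to the original queue.
		req.TaskQueue = originalTaskQueue
		_, err = client.AddWorkflowTask(ctx, req)
	}
	return err
}
```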
