Skip to content

Commit

Permalink
Ignore caller context cancellation error (#1734)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexshtin committed Jul 16, 2021
1 parent b123ce4 commit b36dbeb
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 44 deletions.
5 changes: 5 additions & 0 deletions common/log/tag/tags.go
Expand Up @@ -858,3 +858,8 @@ func TLSKeyFile(filePath string) ZapTag {
func TLSCertFiles(filePaths []string) ZapTag {
	// All supplied paths are recorded together under one fixed tag key.
	const key = "tls-cert-files"
	return NewStringsTag(key, filePaths)
}

// Timeout builds a ZapTag that records the given value under the "timeout" key.
// The value is passed through unchanged as the string supplied by the caller.
func Timeout(timeoutValue string) ZapTag {
	const key = "timeout"
	return NewStringTag(key, timeoutValue)
}
95 changes: 51 additions & 44 deletions service/frontend/workflowHandler.go
Expand Up @@ -766,19 +766,22 @@ func (wh *WorkflowHandler) PollWorkflowTaskQueue(ctx context.Context, request *w

err = backoff.Retry(op, frontendServiceRetryPolicy, common.IsServiceTransientError)
if err != nil {
errCancel := wh.cancelOutstandingPoll(ctx, err, namespaceID, enumspb.TASK_QUEUE_TYPE_WORKFLOW, request.TaskQueue, pollerID)
if errCancel != nil {
// For all other errors log an error and return it back to client.
ctxTimeout := "not-set"
ctxDeadline, ok := ctx.Deadline()
if ok {
ctxTimeout = ctxDeadline.Sub(callTime).String()
}
wh.GetLogger().Error("PollWorkflowTaskQueue failed.",
tag.WorkflowTaskQueueName(request.GetTaskQueue().GetName()),
tag.Value(ctxTimeout),
tag.Error(errCancel))
contextWasCanceled := wh.cancelOutstandingPoll(ctx, namespaceID, enumspb.TASK_QUEUE_TYPE_WORKFLOW, request.TaskQueue, pollerID)
if contextWasCanceled {
// Clear the error: a context cancellation error should not count against our SLA.
// It doesn't matter what is returned here because the client has already gone, but (nil, nil) is an invalid gogo return pair.
return &workflowservice.PollWorkflowTaskQueueResponse{}, nil
}
// For all other errors log an error and return it back to client.
ctxTimeout := "not-set"
ctxDeadline, ok := ctx.Deadline()
if ok {
ctxTimeout = ctxDeadline.Sub(callTime).String()
}
wh.GetLogger().Error("Unable to call matching.PollWorkflowTaskQueue.",
tag.WorkflowTaskQueueName(request.GetTaskQueue().GetName()),
tag.Timeout(ctxTimeout),
tag.Error(err))
return nil, err
}

Expand Down Expand Up @@ -1028,19 +1031,24 @@ func (wh *WorkflowHandler) PollActivityTaskQueue(ctx context.Context, request *w

err = backoff.Retry(op, frontendServiceRetryPolicy, common.IsServiceTransientError)
if err != nil {
errCancel := wh.cancelOutstandingPoll(ctx, err, namespaceID, enumspb.TASK_QUEUE_TYPE_ACTIVITY, request.TaskQueue, pollerID)
if errCancel != nil {
// For all other errors log an error and return it back to client.
ctxTimeout := "not-set"
ctxDeadline, ok := ctx.Deadline()
if ok {
ctxTimeout = ctxDeadline.Sub(callTime).String()
}
wh.GetLogger().Error("PollActivityTaskQueue failed.",
tag.WorkflowTaskQueueName(request.GetTaskQueue().GetName()),
tag.Value(ctxTimeout),
tag.Error(errCancel))
contextWasCanceled := wh.cancelOutstandingPoll(ctx, namespaceID, enumspb.TASK_QUEUE_TYPE_ACTIVITY, request.TaskQueue, pollerID)
if contextWasCanceled {
// Clear the error: a context cancellation error should not count against our SLA.
// It doesn't matter what is returned here because the client has already gone, but (nil, nil) is an invalid gogo return pair.
return &workflowservice.PollActivityTaskQueueResponse{}, nil
}

// For all other errors log an error and return it back to client.
ctxTimeout := "not-set"
ctxDeadline, ok := ctx.Deadline()
if ok {
ctxTimeout = ctxDeadline.Sub(callTime).String()
}
wh.GetLogger().Error("Unable to call matching.PollActivityTaskQueue.",
tag.WorkflowTaskQueueName(request.GetTaskQueue().GetName()),
tag.Timeout(ctxTimeout),
tag.Error(err))

return nil, err
}
return &workflowservice.PollActivityTaskQueueResponse{
Expand Down Expand Up @@ -3385,29 +3393,28 @@ func (wh *WorkflowHandler) isListRequestPageSizeTooLarge(pageSize int32, namespa
pageSize > int32(wh.config.ESIndexMaxResultWindow())
}

func (wh *WorkflowHandler) cancelOutstandingPoll(ctx context.Context, err error, namespaceID string, taskQueueType enumspb.TaskQueueType,
taskQueue *taskqueuepb.TaskQueue, pollerID string) error {
// cancelOutstandingPoll cancels the outstanding poll if the context was canceled and returns true. Otherwise it returns false.
func (wh *WorkflowHandler) cancelOutstandingPoll(ctx context.Context, namespaceID string, taskQueueType enumspb.TaskQueueType,
taskQueue *taskqueuepb.TaskQueue, pollerID string) bool {
// First check whether the context was canceled. This means the client connection to frontend is closed.
if ctx.Err() == context.Canceled {
// Our rpc stack does not propagate context cancellation to the other service. Let's make an explicit
// call to matching to notify it that this poller is gone, to prevent any tasks being dispatched to zombie pollers.
_, err = wh.GetMatchingClient().CancelOutstandingPoll(context.Background(), &matchingservice.CancelOutstandingPollRequest{
NamespaceId: namespaceID,
TaskQueueType: taskQueueType,
TaskQueue: taskQueue,
PollerId: pollerID,
})
// We can not do much if this call fails. Just log the error and move on
if err != nil {
wh.GetLogger().Warn("Failed to cancel outstanding poller.",
tag.WorkflowTaskQueueName(taskQueue.GetName()), tag.Error(err))
}

// clear error as we don't want to report context cancellation error to count against our SLA
return nil
if ctx.Err() != context.Canceled {
return false
}
// Our rpc stack does not propagate context cancellation to the other service. Let's make an explicit
// call to matching to notify it that this poller is gone, to prevent any tasks being dispatched to zombie pollers.
_, err := wh.GetMatchingClient().CancelOutstandingPoll(context.Background(), &matchingservice.CancelOutstandingPollRequest{
NamespaceId: namespaceID,
TaskQueueType: taskQueueType,
TaskQueue: taskQueue,
PollerId: pollerID,
})
// We can not do much if this call fails. Just log the error and move on.
if err != nil {
wh.GetLogger().Warn("Failed to cancel outstanding poller.",
tag.WorkflowTaskQueueName(taskQueue.GetName()), tag.Error(err))
}

return err
return true
}

func (wh *WorkflowHandler) checkBadBinary(namespaceEntry *cache.NamespaceCacheEntry, binaryChecksum string) error {
Expand Down

0 comments on commit b36dbeb

Please sign in to comment.