Skip to content

Commit

Permalink
Refactor frontend poll wf tq (#4992)
Browse files Browse the repository at this point in the history
<!-- Describe what has changed in this PR -->
**What changed?**
Removed direct calls to persistence for `PollWorkflowTaskQueue` from the
frontend service.
The logic to get histories was moved to the matching service.
The matching service now includes history as part of its
`PollWorkflowTaskQueueResponse`.
Actual behavior should be unchanged. The existing logic is kept as part of
`workflow_handler_deprecated`, since it will be needed if the frontend gets a
response without a history from an older matching instance.
Uses updated protos from
#4968

<!-- Tell your future self why have you made these changes -->
**Why?**
Frontend shouldn't be responsible for doing database queries.

<!-- How have you verified this change? Tested locally? Added a unit
test? Checked in staging env? -->
**How did you test it?**
Existing tests and new test for error returned to matching from
persistence

<!-- Assuming the worst case, what can be broken when deploying this
change to production? -->
**Potential risks**
Inconsistent handling of errors between old logic and new

<!-- Is this PR a hotfix candidate or require that a notification be
sent to the broader community? (Yes/No) -->
**Is hotfix candidate?**
No
  • Loading branch information
pdoerner committed Oct 26, 2023
1 parent a14d0ef commit 949b0f9
Show file tree
Hide file tree
Showing 19 changed files with 1,050 additions and 522 deletions.
960 changes: 545 additions & 415 deletions api/historyservice/v1/request_response.pb.go

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions common/dynamicconfig/constants.go
Expand Up @@ -428,6 +428,8 @@ const (
MatchingLongPollExpirationInterval = "matching.longPollExpirationInterval"
// MatchingSyncMatchWaitDuration is the wait time for sync match
MatchingSyncMatchWaitDuration = "matching.syncMatchWaitDuration"
// MatchingHistoryMaxPageSize is the maximum page size of history events returned on PollWorkflowTaskQueue requests
MatchingHistoryMaxPageSize = "matching.historyMaxPageSize"
// MatchingLoadUserData can be used to entirely disable loading user data from persistence (and the inter node RPCs
// that propagate it). When turned off, features that rely on user data (e.g. worker versioning) will essentially
// be disabled. When disabled, matching will drop tasks for versioned workflows and activities to avoid breaking
Expand Down
18 changes: 17 additions & 1 deletion common/dynamicconfig/shared_constants.go
Expand Up @@ -24,7 +24,12 @@

package dynamicconfig

import "go.temporal.io/server/common/primitives"
import (
"math/rand"

"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/primitives"
)

const GlobalDefaultNumTaskQueuePartitions = 4

Expand Down Expand Up @@ -112,3 +117,14 @@ var DefaultDynamicRateLimitingParams = map[string]interface{}{
dynamicRateLimitMultiMinKey: dynamicRateLimitMultiMinDefault,
dynamicRateLimitMultiMaxKey: dynamicRateLimitMultiMaxDefault,
}

// AccessHistory reports whether the current call should take the new
// FE->History path. It draws one uniform random sample and compares it with
// the value returned by accessHistoryFraction, then records exactly one
// counter on metricsHandler: AccessHistoryNew when the result is true,
// AccessHistoryOld otherwise.
//
// Deprecated: interim config helper for dialing the fraction of FE->History
// calls. Remove once migration is complete.
func AccessHistory(accessHistoryFraction FloatPropertyFn, metricsHandler metrics.Handler) bool {
	sampled := rand.Float64() < accessHistoryFraction()
	if !sampled {
		metricsHandler.Counter(metrics.AccessHistoryOld).Record(1)
		return false
	}
	metricsHandler.Counter(metrics.AccessHistoryNew).Record(1)
	return true
}
2 changes: 2 additions & 0 deletions common/metrics/metric_defs.go
Expand Up @@ -1121,4 +1121,6 @@ const (
FrontendGetWorkflowExecutionHistoryTag = "GetWorkflowExecutionHistory"
FrontendGetWorkflowExecutionHistoryReverseTag = "GetWorkflowExecutionHistoryReverse"
FrontendRespondWorkflowTaskCompletedTag = "RespondWorkflowTaskCompleted"
MatchingPollWorkflowTaskQueueTag = "PollWorkflowTaskQueue"
HistoryHandleWorkflowTaskStartedTag = "HandleWorkflowTaskStarted"
)
2 changes: 2 additions & 0 deletions common/util.go
Expand Up @@ -522,6 +522,8 @@ func CreateMatchingPollWorkflowTaskQueueResponse(historyResponse *historyservice
StartedTime: historyResponse.StartedTime,
Queries: historyResponse.Queries,
Messages: historyResponse.Messages,
History: historyResponse.History,
NextPageToken: historyResponse.NextPageToken,
}

return matchingResp
Expand Down
Expand Up @@ -180,6 +180,8 @@ message RecordWorkflowTaskStartedResponse {
temporal.server.api.clock.v1.VectorClock clock = 15;
repeated temporal.api.protocol.v1.Message messages = 16;
int64 version = 17;
temporal.api.history.v1.History history = 18;
bytes next_page_token = 19;
}

message RecordActivityTaskStartedRequest {
Expand Down
5 changes: 3 additions & 2 deletions service/frontend/admin_handler.go
Expand Up @@ -69,6 +69,7 @@ import (
"go.temporal.io/server/common/cluster"
"go.temporal.io/server/common/config"
"go.temporal.io/server/common/convert"
"go.temporal.io/server/common/dynamicconfig"
"go.temporal.io/server/common/headers"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/log/tag"
Expand Down Expand Up @@ -935,7 +936,7 @@ func (adh *AdminHandler) GetWorkflowExecutionRawHistoryV2(ctx context.Context, r
return nil, err
}

if adh.config.accessHistory(adh.metricsHandler.WithTags(metrics.OperationTag(metrics.AdminGetWorkflowExecutionRawHistoryV2Tag))) {
if dynamicconfig.AccessHistory(adh.config.AccessHistoryFraction, adh.metricsHandler.WithTags(metrics.OperationTag(metrics.AdminGetWorkflowExecutionRawHistoryV2Tag))) {
response, err := adh.historyClient.GetWorkflowExecutionRawHistoryV2(ctx,
&historyservice.GetWorkflowExecutionRawHistoryV2Request{
NamespaceId: request.NamespaceId,
Expand Down Expand Up @@ -1586,7 +1587,7 @@ func (adh *AdminHandler) DeleteWorkflowExecution(
return nil, err
}

if adh.config.accessHistory(adh.metricsHandler.WithTags(metrics.OperationTag(metrics.AdminDeleteWorkflowExecutionTag))) {
if dynamicconfig.AccessHistory(adh.config.AccessHistoryFraction, adh.metricsHandler.WithTags(metrics.OperationTag(metrics.AdminDeleteWorkflowExecutionTag))) {
response, err := adh.historyClient.ForceDeleteWorkflowExecution(ctx,
&historyservice.ForceDeleteWorkflowExecutionRequest{
NamespaceId: namespaceID.String(),
Expand Down
11 changes: 0 additions & 11 deletions service/frontend/service.go
Expand Up @@ -25,7 +25,6 @@
package frontend

import (
"math/rand"
"net"
"os"
"sync"
Expand Down Expand Up @@ -430,16 +429,6 @@ func (s *Service) Stop() {
s.logger.Info("frontend stopped")
}

// accessHistory samples a uniform random number against
// c.AccessHistoryFraction and reports whether the sample fell below it. The
// outcome is counted on metricsHandler as AccessHistoryNew (true) or
// AccessHistoryOld (false).
//
// Deprecated: remove this interim dynamic config helper for dialing the
// fraction of FE->History calls.
func (c *Config) accessHistory(metricsHandler metrics.Handler) bool {
	useNew := rand.Float64() < c.AccessHistoryFraction()
	if useNew {
		metricsHandler.Counter(metrics.AccessHistoryNew).Record(1)
	} else {
		metricsHandler.Counter(metrics.AccessHistoryOld).Record(1)
	}
	return useNew
}

// GetFaultInjection returns the service's faultInjectionDataStoreFactory field.
func (s *Service) GetFaultInjection() *client.FaultInjectionDataStoreFactory {
return s.faultInjectionDataStoreFactory
}
86 changes: 82 additions & 4 deletions service/frontend/workflow_handler.go
Expand Up @@ -446,7 +446,7 @@ func (wh *WorkflowHandler) GetWorkflowExecutionHistory(ctx context.Context, requ
}
}

if wh.config.accessHistory(wh.metricsScope(ctx).WithTags(metrics.OperationTag(metrics.FrontendGetWorkflowExecutionHistoryTag))) {
if dynamicconfig.AccessHistory(wh.config.AccessHistoryFraction, wh.metricsScope(ctx).WithTags(metrics.OperationTag(metrics.FrontendGetWorkflowExecutionHistoryTag))) {
response, err := wh.historyClient.GetWorkflowExecutionHistory(ctx,
&historyservice.GetWorkflowExecutionHistoryRequest{
NamespaceId: namespaceID.String(),
Expand Down Expand Up @@ -491,7 +491,7 @@ func (wh *WorkflowHandler) GetWorkflowExecutionHistoryReverse(ctx context.Contex
request.MaximumPageSize = common.GetHistoryMaxPageSize
}

if wh.config.accessHistory(wh.metricsScope(ctx).WithTags(metrics.OperationTag(metrics.FrontendGetWorkflowExecutionHistoryReverseTag))) {
if dynamicconfig.AccessHistory(wh.config.AccessHistoryFraction, wh.metricsScope(ctx).WithTags(metrics.OperationTag(metrics.FrontendGetWorkflowExecutionHistoryReverseTag))) {
response, err := wh.historyClient.GetWorkflowExecutionHistoryReverse(ctx,
&historyservice.GetWorkflowExecutionHistoryReverseRequest{
NamespaceId: namespaceID.String(),
Expand Down Expand Up @@ -537,7 +537,85 @@ func (wh *WorkflowHandler) PollWorkflowTaskQueue(ctx context.Context, request *w
return nil, err
}

return wh.pollWorkflowTaskQueue(ctx, request)
callTime := time.Now().UTC()

namespaceEntry, err := wh.namespaceRegistry.GetNamespace(namespace.Name(request.GetNamespace()))
if err != nil {
return nil, err
}
namespaceID := namespaceEntry.ID()

wh.logger.Debug("Poll workflow task queue.", tag.WorkflowNamespace(namespaceEntry.Name().String()), tag.WorkflowNamespaceID(namespaceID.String()))
if err := wh.checkBadBinary(namespaceEntry, request.GetBinaryChecksum()); err != nil {
return nil, err
}

if contextNearDeadline(ctx, longPollTailRoom) {
return &workflowservice.PollWorkflowTaskQueueResponse{}, nil
}

pollerID := uuid.New()
matchingResp, err := wh.matchingClient.PollWorkflowTaskQueue(ctx, &matchingservice.PollWorkflowTaskQueueRequest{
NamespaceId: namespaceID.String(),
PollerId: pollerID,
PollRequest: request,
})
if err != nil {
contextWasCanceled := wh.cancelOutstandingPoll(ctx, namespaceID, enumspb.TASK_QUEUE_TYPE_WORKFLOW, request.TaskQueue, pollerID)
if contextWasCanceled {
// Clear error as we don't want to report context cancellation error to count against our SLA.
// It doesn't matter what to return here, client has already gone. But (nil,nil) is invalid gogo return pair.
return &workflowservice.PollWorkflowTaskQueueResponse{}, nil
}

// These errors are expected from some versioning situations. We should not log them, it'd be too noisy.
var newerBuild *serviceerror.NewerBuildExists // expected when versioned poller is superceded
var failedPrecond *serviceerror.FailedPrecondition // expected when user data is disabled
if errors.As(err, &newerBuild) || errors.As(err, &failedPrecond) {
return nil, err
}

// For all other errors log an error and return it back to client.
ctxTimeout := "not-set"
ctxDeadline, ok := ctx.Deadline()
if ok {
ctxTimeout = ctxDeadline.Sub(callTime).String()
}
wh.logger.Error("Unable to call matching.PollWorkflowTaskQueue.",
tag.WorkflowTaskQueueName(request.GetTaskQueue().GetName()),
tag.Timeout(ctxTimeout),
tag.Error(err))
return nil, err
}

if matchingResp.History == nil {
// Got an old matching response, need to lookup history
// Eventually empty history will only happen for sticky query tasks
return wh.createPollWorkflowTaskQueueResponse(
ctx,
namespaceID,
matchingResp,
matchingResp.BranchToken,
)
}

return &workflowservice.PollWorkflowTaskQueueResponse{
TaskToken: matchingResp.TaskToken,
WorkflowExecution: matchingResp.WorkflowExecution,
WorkflowType: matchingResp.WorkflowType,
PreviousStartedEventId: matchingResp.PreviousStartedEventId,
StartedEventId: matchingResp.StartedEventId,
Query: matchingResp.Query,
BacklogCountHint: matchingResp.BacklogCountHint,
Attempt: matchingResp.Attempt,
History: matchingResp.History,
NextPageToken: matchingResp.NextPageToken,
WorkflowExecutionTaskQueue: matchingResp.WorkflowExecutionTaskQueue,
ScheduledTime: matchingResp.ScheduledTime,
StartedTime: matchingResp.StartedTime,
Queries: matchingResp.Queries,
Messages: matchingResp.Messages,
}, nil
}

func contextNearDeadline(ctx context.Context, tailroom time.Duration) bool {
Expand Down Expand Up @@ -577,7 +655,7 @@ func (wh *WorkflowHandler) RespondWorkflowTaskCompleted(

wh.overrides.DisableEagerActivityDispatchForBuggyClients(ctx, request)

if wh.config.accessHistory(wh.metricsScope(ctx).WithTags(metrics.OperationTag(metrics.FrontendRespondWorkflowTaskCompletedTag))) {
if dynamicconfig.AccessHistory(wh.config.AccessHistoryFraction, wh.metricsScope(ctx).WithTags(metrics.OperationTag(metrics.FrontendRespondWorkflowTaskCompletedTag))) {
namespaceID, err := wh.namespaceRegistry.GetNamespaceID(namespace.Name(request.GetNamespace()))
if err != nil {
return nil, err
Expand Down
64 changes: 0 additions & 64 deletions service/frontend/workflow_handler_deprecated.go
Expand Up @@ -26,11 +26,7 @@ package frontend

import (
"context"
"errors"
"fmt"
"time"

"github.com/pborman/uuid"

commonpb "go.temporal.io/api/common/v1"
enumspb "go.temporal.io/api/enums/v1"
Expand Down Expand Up @@ -436,66 +432,6 @@ func (wh *WorkflowHandler) getWorkflowExecutionHistoryReverse(
}, nil
}

// pollWorkflowTaskQueue resolves the request's namespace, rejects bad
// binaries, forwards the poll to the matching service, and converts the
// matching response via createPollWorkflowTaskQueueResponse.
//
// An empty, non-nil response with a nil error is returned when the context is
// already near its deadline, or when cancelOutstandingPoll reports that the
// context was canceled after a failed poll.
//
// Deprecated: TBD.
func (wh *WorkflowHandler) pollWorkflowTaskQueue(ctx context.Context, request *workflowservice.PollWorkflowTaskQueueRequest) (_ *workflowservice.PollWorkflowTaskQueueResponse, retError error) {
	// Captured up front so a failed poll can log the timeout relative to call start.
	startedAt := time.Now().UTC()

	nsEntry, err := wh.namespaceRegistry.GetNamespace(namespace.Name(request.GetNamespace()))
	if err != nil {
		return nil, err
	}
	namespaceID := nsEntry.ID()

	wh.logger.Debug(
		"Poll workflow task queue.",
		tag.WorkflowNamespace(nsEntry.Name().String()),
		tag.WorkflowNamespaceID(namespaceID.String()),
	)
	if err := wh.checkBadBinary(nsEntry, request.GetBinaryChecksum()); err != nil {
		return nil, err
	}

	if contextNearDeadline(ctx, longPollTailRoom) {
		return &workflowservice.PollWorkflowTaskQueueResponse{}, nil
	}

	pollerID := uuid.New()
	matchingRequest := &matchingservice.PollWorkflowTaskQueueRequest{
		NamespaceId: namespaceID.String(),
		PollerId:    pollerID,
		PollRequest: request,
	}
	matchingResp, err := wh.matchingClient.PollWorkflowTaskQueue(ctx, matchingRequest)
	if err != nil {
		if wh.cancelOutstandingPoll(ctx, namespaceID, enumspb.TASK_QUEUE_TYPE_WORKFLOW, request.TaskQueue, pollerID) {
			// The context was canceled: drop the error so it does not count against our SLA.
			// The client has already gone, so the payload is irrelevant, but (nil, nil)
			// is an invalid gogo return pair.
			return &workflowservice.PollWorkflowTaskQueueResponse{}, nil
		}

		// Some versioning situations produce these errors as a matter of course;
		// logging them would be too noisy, so they are returned as-is.
		var (
			newerBuildErr    *serviceerror.NewerBuildExists   // expected when a versioned poller is superseded
			failedPrecondErr *serviceerror.FailedPrecondition // expected when user data is disabled
		)
		expected := errors.As(err, &newerBuildErr) || errors.As(err, &failedPrecondErr)
		if expected {
			return nil, err
		}

		// Anything else is logged and handed back to the client.
		timeoutText := "not-set"
		if deadline, hasDeadline := ctx.Deadline(); hasDeadline {
			timeoutText = deadline.Sub(startedAt).String()
		}
		wh.logger.Error(
			"Unable to call matching.PollWorkflowTaskQueue.",
			tag.WorkflowTaskQueueName(request.GetTaskQueue().GetName()),
			tag.Timeout(timeoutText),
			tag.Error(err),
		)
		return nil, err
	}

	pollResp, err := wh.createPollWorkflowTaskQueueResponse(ctx, namespaceID, matchingResp, matchingResp.GetBranchToken())
	if err != nil {
		// Return an explicit nil response on failure, regardless of what the helper produced.
		return nil, err
	}
	return pollResp, nil
}

// DEPRECATED: DO NOT MODIFY UNLESS ALSO APPLIED TO WorkflowHandler.RespondWorkflowTaskCompleted() and ./service/history/workflowTaskHandlerCallbacks.go
func (wh *WorkflowHandler) respondWorkflowTaskCompleted(
ctx context.Context,
Expand Down
5 changes: 5 additions & 0 deletions service/history/configs/config.go
Expand Up @@ -317,6 +317,9 @@ type Config struct {
WorkflowExecutionMaxTotalUpdates dynamicconfig.IntPropertyFnWithNamespaceFilter

SendRawWorkflowHistory dynamicconfig.BoolPropertyFnWithNamespaceFilter

// FrontendAccessHistoryFraction is an interim flag across 2 minor releases and will be removed once fully enabled.
FrontendAccessHistoryFraction dynamicconfig.FloatPropertyFn
}

const (
Expand Down Expand Up @@ -576,6 +579,8 @@ func NewConfig(
WorkflowExecutionMaxTotalUpdates: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.WorkflowExecutionMaxTotalUpdates, 2000),

SendRawWorkflowHistory: dc.GetBoolPropertyFnWithNamespaceFilter(dynamicconfig.SendRawWorkflowHistory, false),

FrontendAccessHistoryFraction: dc.GetFloat64Property(dynamicconfig.FrontendAccessHistoryFraction, 0.0),
}

return cfg
Expand Down

0 comments on commit 949b0f9

Please sign in to comment.