Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add config for scheduler local activity sleep limit #5882

Merged
merged 1 commit into from
May 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -2306,6 +2306,12 @@ If the service configures with archival feature enabled, update worker.historySc
30.0,
`SchedulerNamespaceStartWorkflowRPS is the per-namespace limit for starting workflows by schedules`,
)
SchedulerLocalActivitySleepLimit = NewNamespaceDurationSetting(
"worker.schedulerLocalActivitySleepLimit",
1*time.Second,
`How long to sleep within a local activity before pushing to workflow level sleep (don't make this
close to or more than the workflow task timeout)`,
)
WorkerDeleteNamespaceActivityLimitsConfig = NewGlobalMapSetting(
"worker.deleteNamespaceActivityLimitsConfig",
map[string]any{},
Expand Down
29 changes: 15 additions & 14 deletions service/worker/scheduler/activities.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,9 @@ type (
namespaceID namespace.ID
// Rate limiter for start workflow requests. Note that the scope is all schedules in
// this namespace on this worker.
startWorkflowRateLimiter quotas.RateLimiter
singleResultStorageSizePerNs dynamicconfig.IntPropertyFnWithNamespaceFilter
startWorkflowRateLimiter quotas.RateLimiter
maxBlobSize dynamicconfig.IntPropertyFn
localActivitySleepLimit dynamicconfig.DurationPropertyFn
}

errFollow string
Expand Down Expand Up @@ -118,7 +119,7 @@ func (a *activities) waitForRateLimiterPermission(req *schedspb.StartWorkflowReq
return translateError(errBlocked, "StartWorkflowExecution")
}
delay := reservation.Delay()
if delay > 1*time.Second {
if delay > a.localActivitySleepLimit() {
// for a long sleep, ask the workflow to do it in workflow logic
return temporal.NewNonRetryableApplicationError(
rateLimitedErrorType, rateLimitedErrorType, nil, rateLimitedDetails{Delay: delay})
Expand Down Expand Up @@ -178,7 +179,7 @@ func (a *activities) tryWatchWorkflow(ctx context.Context, req *schedspb.WatchWo
req,
pollRes.WorkflowStatus,
a.Logger,
a.singleResultStorageSizePerNs(a.namespace.String())-recordOverheadSize,
a.maxBlobSize()-recordOverheadSize,
)
if pollRes.WorkflowStatus == enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING {
return rb.Build(nil)
Expand Down Expand Up @@ -297,23 +298,23 @@ func translateError(err error, msgPrefix string) error {
}

type responseBuilder struct {
request *schedspb.WatchWorkflowRequest
workflowStatus enumspb.WorkflowExecutionStatus
logger log.Logger
resultStorageNumberSize int
request *schedspb.WatchWorkflowRequest
workflowStatus enumspb.WorkflowExecutionStatus
logger log.Logger
maxBlobSize int
}

func newResponseBuilder(
request *schedspb.WatchWorkflowRequest,
workflowStatus enumspb.WorkflowExecutionStatus,
logger log.Logger,
resultStorageSize int,
maxBlobSize int,
) responseBuilder {
return responseBuilder{
request: request,
workflowStatus: workflowStatus,
logger: logger,
resultStorageNumberSize: resultStorageSize,
request: request,
workflowStatus: workflowStatus,
logger: logger,
maxBlobSize: maxBlobSize,
}
}

Expand Down Expand Up @@ -377,7 +378,7 @@ func (r responseBuilder) Build(event *historypb.HistoryEvent) (*schedspb.WatchWo
}

func (r responseBuilder) isTooBig(m proto.Message) bool {
return proto.Size(m) > r.resultStorageNumberSize
return proto.Size(m) > r.maxBlobSize
}

func (r responseBuilder) makeResponse(result *commonpb.Payloads, failure *failurepb.Failure) *schedspb.WatchWorkflowResponse {
Expand Down
22 changes: 13 additions & 9 deletions service/worker/scheduler/fx.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ package scheduler

import (
"fmt"
"time"

"go.uber.org/fx"

Expand Down Expand Up @@ -69,6 +70,7 @@ type (
enabledForNs dynamicconfig.BoolPropertyFnWithNamespaceFilter
globalNSStartWorkflowRPS dynamicconfig.FloatPropertyFnWithNamespaceFilter
maxBlobSize dynamicconfig.IntPropertyFnWithNamespaceFilter
localActivitySleepLimit dynamicconfig.DurationPropertyFnWithNamespaceFilter
}

activityDeps struct {
Expand All @@ -91,17 +93,18 @@ var Module = fx.Options(
)

func NewResult(
dcCollection *dynamicconfig.Collection,
dc *dynamicconfig.Collection,
specBuilder *SpecBuilder,
params activityDeps,
) fxResult {
return fxResult{
Component: &workerComponent{
specBuilder: specBuilder,
activityDeps: params,
enabledForNs: dynamicconfig.WorkerEnableScheduler.Get(dcCollection),
globalNSStartWorkflowRPS: dynamicconfig.SchedulerNamespaceStartWorkflowRPS.Get(dcCollection),
maxBlobSize: dynamicconfig.BlobSizeLimitError.Get(dcCollection),
enabledForNs: dynamicconfig.WorkerEnableScheduler.Get(dc),
globalNSStartWorkflowRPS: dynamicconfig.SchedulerNamespaceStartWorkflowRPS.Get(dc),
maxBlobSize: dynamicconfig.BlobSizeLimitError.Get(dc),
localActivitySleepLimit: dynamicconfig.SchedulerLocalActivitySleepLimit.Get(dc),
},
}
}
Expand All @@ -125,10 +128,11 @@ func (s *workerComponent) activities(name namespace.Name, id namespace.ID, detai
return float64(details.Multiplicity) * s.globalNSStartWorkflowRPS(name.String()) / float64(details.TotalWorkers)
}
return &activities{
activityDeps: s.activityDeps,
namespace: name,
namespaceID: id,
startWorkflowRateLimiter: quotas.NewDefaultOutgoingRateLimiter(localRPS),
singleResultStorageSizePerNs: s.maxBlobSize,
activityDeps: s.activityDeps,
namespace: name,
namespaceID: id,
startWorkflowRateLimiter: quotas.NewDefaultOutgoingRateLimiter(localRPS),
maxBlobSize: func() int { return s.maxBlobSize(name.String()) },
localActivitySleepLimit: func() time.Duration { return s.localActivitySleepLimit(name.String()) },
}
}
Loading