Skip to content

Commit

Permalink
Add a per-workflow limit to in-flight updates (#4307)
Browse files Browse the repository at this point in the history
  • Loading branch information
Matt McShane committed May 11, 2023
1 parent 2339531 commit 25b7cbc
Show file tree
Hide file tree
Showing 6 changed files with 260 additions and 108 deletions.
3 changes: 3 additions & 0 deletions common/dynamicconfig/constants.go
Expand Up @@ -556,6 +556,9 @@ const (
// DurableArchivalEnabled is the flag to enable durable archival
DurableArchivalEnabled = "history.durableArchivalEnabled"

// WorkflowExecutionMaxInFlightUpdates is the max number of updates that can be in-flight (admitted but not yet completed) for any given workflow execution.
WorkflowExecutionMaxInFlightUpdates = "history.maxInFlightUpdates"

// ReplicatorTaskBatchSize is batch size for ReplicatorProcessor
ReplicatorTaskBatchSize = "history.replicatorTaskBatchSize"
// ReplicatorMaxSkipTaskCount is maximum number of tasks that can be skipped during tasks pagination due to not meeting filtering conditions (e.g. missed namespace).
Expand Down
5 changes: 5 additions & 0 deletions service/history/configs/config.go
Expand Up @@ -297,6 +297,8 @@ type Config struct {
ArchivalProcessorUpdateAckIntervalJitterCoefficient dynamicconfig.FloatPropertyFn
ArchivalProcessorArchiveDelay dynamicconfig.DurationPropertyFn
ArchivalBackendMaxRPS dynamicconfig.FloatPropertyFn

WorkflowExecutionMaxInFlightUpdates dynamicconfig.IntPropertyFnWithNamespaceFilter
}

const (
Expand Down Expand Up @@ -531,6 +533,9 @@ func NewConfig(
ArchivalProcessorPollBackoffInterval: dc.GetDurationProperty(dynamicconfig.ArchivalProcessorPollBackoffInterval, 5*time.Second),
ArchivalProcessorArchiveDelay: dc.GetDurationProperty(dynamicconfig.ArchivalProcessorArchiveDelay, 5*time.Minute),
ArchivalBackendMaxRPS: dc.GetFloat64Property(dynamicconfig.ArchivalBackendMaxRPS, 10000.0),

// workflow update related
WorkflowExecutionMaxInFlightUpdates: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.WorkflowExecutionMaxInFlightUpdates, 10),
}

return cfg
Expand Down
6 changes: 6 additions & 0 deletions service/history/workflow/context.go
Expand Up @@ -851,11 +851,17 @@ func (c *ContextImpl) ReapplyEvents(

func (c *ContextImpl) UpdateRegistry(ctx context.Context) update.Registry {
if c.updateRegistry == nil {
nsIDStr := c.MutableState.GetNamespaceEntry().ID().String()
c.updateRegistry = update.NewRegistry(
c.MutableState,
update.WithLogger(c.logger),
update.WithMetrics(c.metricsHandler),
update.WithTracerProvider(trace.SpanFromContext(ctx).TracerProvider()),
update.WithInFlightLimit(
func() int {
return c.config.WorkflowExecutionMaxInFlightUpdates(nsIDStr)
},
),
)
}
return c.updateRegistry
Expand Down
26 changes: 26 additions & 0 deletions service/history/workflow/update/registry.go
Expand Up @@ -26,10 +26,14 @@ package update

import (
"context"
"fmt"
"math"
"sync"

"go.opentelemetry.io/otel/trace"
enumspb "go.temporal.io/api/enums/v1"
protocolpb "go.temporal.io/api/protocol/v1"
"go.temporal.io/api/serviceerror"
updatepb "go.temporal.io/api/update/v1"
persistencespb "go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/common/log"
Expand Down Expand Up @@ -77,13 +81,20 @@ type (
updates map[string]*Update
store UpdateStore
instrumentation instrumentation
maxInFlight func() int
}

regOpt func(*RegistryImpl)
)

//revive:disable:unexported-return I *want* it to be unexported

func WithInFlightLimit(f func() int) regOpt {
return func(r *RegistryImpl) {
r.maxInFlight = f
}
}

// WithLogger sets the log.Logger to be used by an UpdateRegistry and its
// Updates.
func WithLogger(l log.Logger) regOpt {
Expand Down Expand Up @@ -117,6 +128,7 @@ func NewRegistry(store UpdateStore, opts ...regOpt) *RegistryImpl {
updates: make(map[string]*Update),
store: store,
instrumentation: noopInstrumentation,
maxInFlight: func() int { return math.MaxInt },
}
for _, opt := range opts {
opt(r)
Expand All @@ -135,6 +147,9 @@ func (r *RegistryImpl) FindOrCreate(ctx context.Context, id string) (*Update, bo
if upd, ok := r.findLocked(ctx, id); ok {
return upd, true, nil
}
if err := r.admit(ctx); err != nil {
return nil, false, err
}
upd := New(id, r.remover(id), withInstrumentation(&r.instrumentation))
r.updates[id] = upd
return upd, false, nil
Expand Down Expand Up @@ -206,6 +221,17 @@ func (r *RegistryImpl) remover(id string) func() {
}
}

func (r *RegistryImpl) admit(context.Context) error {
max := r.maxInFlight()
if len(r.updates) >= max {
return serviceerror.NewResourceExhausted(
enumspb.RESOURCE_EXHAUSTED_CAUSE_CONCURRENT_LIMIT,
fmt.Sprintf("update concurrent in-flight limit has been reached (%v)", max),
)
}
return nil
}

func (r *RegistryImpl) findLocked(ctx context.Context, id string) (*Update, bool) {
upd, ok := r.updates[id]
if ok {
Expand Down

0 comments on commit 25b7cbc

Please sign in to comment.