From 312da2e067497e6befb051ece9d48df4eba2b92c Mon Sep 17 00:00:00 2001 From: Tony Holdstock-Brown Date: Thu, 25 Apr 2024 21:58:37 +0100 Subject: [PATCH] Allow for increased steps based off of identifiers (#1304) * Allow for increased steps based off of identifiers This allows customisation of step limits dependent on various factors in the identifier. Note that defaults do not change * lints --- pkg/devserver/devserver.go | 2 +- pkg/execution/executor/executor.go | 7 ++----- pkg/execution/executor/validate.go | 17 ++++++++++++++++- 3 files changed, 19 insertions(+), 7 deletions(-) diff --git a/pkg/devserver/devserver.go b/pkg/devserver/devserver.go index 67dc73fe63..7508623696 100644 --- a/pkg/devserver/devserver.go +++ b/pkg/devserver/devserver.go @@ -218,7 +218,7 @@ func start(ctx context.Context, opts StartOpts) error { eventTopic: opts.Config.EventStream.Service.Concrete.TopicName(), }, ), - executor.WithStepLimits(consts.DefaultMaxStepLimit), + executor.WithStepLimits(func(id state.Identifier) int { return consts.DefaultMaxStepLimit }), executor.WithInvokeNotFoundHandler(getInvokeNotFoundHandler(ctx, pb, opts.Config.EventStream.Service.Concrete.TopicName())), executor.WithSendingEventHandler(getSendingEventHandler(ctx, pb, opts.Config.EventStream.Service.Concrete.TopicName())), executor.WithDebouncer(debouncer), diff --git a/pkg/execution/executor/executor.go b/pkg/execution/executor/executor.go index 12c74da09c..cf85c22cd0 100644 --- a/pkg/execution/executor/executor.go +++ b/pkg/execution/executor/executor.go @@ -168,11 +168,8 @@ func WithLifecycleListeners(l ...execution.LifecycleListener) ExecutorOpt { } } -func WithStepLimits(limit uint) ExecutorOpt { +func WithStepLimits(limit func(id state.Identifier) int) ExecutorOpt { return func(e execution.Executor) error { - if limit > consts.AbsoluteMaxStepLimit { - return fmt.Errorf("%d is greater than the absolute step limit of %d", limit, consts.AbsoluteMaxStepLimit) - } e.(*executor).steplimit = limit return nil } @@ -241,7 +238,7 @@ type executor struct { lifecycles []execution.LifecycleListener - steplimit uint + steplimit func(id state.Identifier) int } func (e *executor) SetFinishHandler(f execution.FinishHandler) { diff --git a/pkg/execution/executor/validate.go b/pkg/execution/executor/validate.go index 42c1997503..3cffc69f34 100644 --- a/pkg/execution/executor/validate.go +++ b/pkg/execution/executor/validate.go @@ -2,8 +2,10 @@ package executor import ( "context" + "fmt" "time" + "github.com/inngest/inngest/pkg/consts" "github.com/inngest/inngest/pkg/enums" "github.com/inngest/inngest/pkg/execution" "github.com/inngest/inngest/pkg/execution/queue" @@ -63,7 +65,20 @@ func (r *runValidator) checkCancelled(ctx context.Context) error { } func (r *runValidator) checkStepLimit(ctx context.Context) error { - if r.e.steplimit != 0 && len(r.s.Actions()) >= int(r.e.steplimit) { + var limit int + + if r.e.steplimit != nil { + limit = r.e.steplimit(r.item.Identifier) + } + + if limit == 0 { + limit = consts.DefaultMaxStepLimit + } + if limit > consts.AbsoluteMaxStepLimit { + return fmt.Errorf("%d is greater than the absolute step limit of %d", limit, consts.AbsoluteMaxStepLimit) + } + + if limit > 0 && len(r.s.Actions()) >= limit { // Update this function's state to overflowed, if running. if r.md.Status == enums.RunStatusRunning { // XXX: Update error to failed, set error message