Skip to content

Commit

Permalink
Allow for increased steps based off of identifiers (#1304)
Browse files Browse the repository at this point in the history
* Allow for increased steps based off of identifiers

This allows customisation of step limits dependent on various factors in
the identifier.  Note that defaults do not change

* lints
  • Loading branch information
tonyhb committed Apr 25, 2024
1 parent 2f11095 commit 312da2e
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 7 deletions.
2 changes: 1 addition & 1 deletion pkg/devserver/devserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ func start(ctx context.Context, opts StartOpts) error {
eventTopic: opts.Config.EventStream.Service.Concrete.TopicName(),
},
),
executor.WithStepLimits(consts.DefaultMaxStepLimit),
executor.WithStepLimits(func(id state.Identifier) int { return consts.DefaultMaxStepLimit }),
executor.WithInvokeNotFoundHandler(getInvokeNotFoundHandler(ctx, pb, opts.Config.EventStream.Service.Concrete.TopicName())),
executor.WithSendingEventHandler(getSendingEventHandler(ctx, pb, opts.Config.EventStream.Service.Concrete.TopicName())),
executor.WithDebouncer(debouncer),
Expand Down
7 changes: 2 additions & 5 deletions pkg/execution/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,11 +168,8 @@ func WithLifecycleListeners(l ...execution.LifecycleListener) ExecutorOpt {
}
}

func WithStepLimits(limit uint) ExecutorOpt {
func WithStepLimits(limit func(id state.Identifier) int) ExecutorOpt {
return func(e execution.Executor) error {
if limit > consts.AbsoluteMaxStepLimit {
return fmt.Errorf("%d is greater than the absolute step limit of %d", limit, consts.AbsoluteMaxStepLimit)
}
e.(*executor).steplimit = limit
return nil
}
Expand Down Expand Up @@ -241,7 +238,7 @@ type executor struct {

lifecycles []execution.LifecycleListener

steplimit uint
steplimit func(id state.Identifier) int
}

func (e *executor) SetFinishHandler(f execution.FinishHandler) {
Expand Down
17 changes: 16 additions & 1 deletion pkg/execution/executor/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package executor

import (
"context"
"fmt"
"time"

"github.com/inngest/inngest/pkg/consts"
"github.com/inngest/inngest/pkg/enums"
"github.com/inngest/inngest/pkg/execution"
"github.com/inngest/inngest/pkg/execution/queue"
Expand Down Expand Up @@ -63,7 +65,20 @@ func (r *runValidator) checkCancelled(ctx context.Context) error {
}

func (r *runValidator) checkStepLimit(ctx context.Context) error {
if r.e.steplimit != 0 && len(r.s.Actions()) >= int(r.e.steplimit) {
var limit int

if r.e.steplimit != nil {
limit = r.e.steplimit(r.item.Identifier)
}

if limit == 0 {
limit = consts.DefaultMaxStepLimit
}
if limit > consts.AbsoluteMaxStepLimit {
return fmt.Errorf("%d is greater than the absolute step limit of %d", limit, consts.AbsoluteMaxStepLimit)
}

if limit > 0 && len(r.s.Actions()) >= limit {
// Update this function's state to overflowed, if running.
if r.md.Status == enums.RunStatusRunning {
// XXX: Update error to failed, set error message
Expand Down

0 comments on commit 312da2e

Please sign in to comment.