Skip to content

Commit

Permalink
Complete pause support in batch handling (#1356)
Browse files Browse the repository at this point in the history
* minor: typo

* Add function pause data as a new option on RetrieveAndScheduleBatch/AppendAndScheduleBatch

* deprecate ScheduleBatchPayload.FunctionPausedAt

* migrate remaining callsites to new method

* allow passing nil opts instead of empty structs
  • Loading branch information
cdzombak committed May 15, 2024
1 parent 3fb08b2 commit c548109
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 17 deletions.
14 changes: 7 additions & 7 deletions pkg/execution/batch/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,11 @@ type ScheduleBatchOpts struct {
}

type ScheduleBatchPayload struct {
BatchID ulid.ULID `json:"batchID"`
AccountID uuid.UUID `json:"acctID"`
WorkspaceID uuid.UUID `json:"wsID"`
AppID uuid.UUID `json:"appID"`
FunctionID uuid.UUID `json:"fnID"`
FunctionVersion int `json:"fnV"`
FunctionPausedAt *time.Time `json:"fpAt,omitempty"`
BatchID ulid.ULID `json:"batchID"`
AccountID uuid.UUID `json:"acctID"`
WorkspaceID uuid.UUID `json:"wsID"`
AppID uuid.UUID `json:"appID"`
FunctionID uuid.UUID `json:"fnID"`
FunctionVersion int `json:"fnV"`
DeprecatedFunctionPausedAt *time.Time `json:"fpAt,omitempty"` // deprecated
}
11 changes: 11 additions & 0 deletions pkg/execution/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,12 @@ type Executor interface {
// InvokeNotFoundHandler invokes the invoke not found handler.
InvokeNotFoundHandler(context.Context, InvokeNotFoundHandlerOpts) error

AppendAndScheduleBatchWithOpts(ctx context.Context, fn inngest.Function, bi batch.BatchItem, opts *BatchExecOpts) error
// deprecated; use AppendAndScheduleBatchWithOpts in new code
AppendAndScheduleBatch(ctx context.Context, fn inngest.Function, bi batch.BatchItem) error

RetrieveAndScheduleBatchWithOpts(ctx context.Context, fn inngest.Function, payload batch.ScheduleBatchPayload, opts *BatchExecOpts) error
// deprecated; use RetrieveAndScheduleBatchWithOpts in new code
RetrieveAndScheduleBatch(ctx context.Context, fn inngest.Function, payload batch.ScheduleBatchPayload) error
}

Expand All @@ -137,6 +142,12 @@ type InvokeNotFoundHandlerOpts struct {
Result any
}

// BatchExecOpts communicates state and options that are relevant only when scheduling a batch
// to be worked on *imminently* (i.e. ~now, not at some future time).
type BatchExecOpts struct {
FunctionPausedAt *time.Time
}

// FinishHandler is a function that handles functions finishing in the executor.
// It should be used to send the given events.
type FinishHandler func(context.Context, state.State, []event.Event) error
Expand Down
32 changes: 25 additions & 7 deletions pkg/execution/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func NewExecutor(opts ...ExecutorOpt) (execution.Executor, error) {
return m, nil
}

// ExecutorOpt modifies the built in executor on creation.
// ExecutorOpt modifies the built-in executor on creation.
type ExecutorOpt func(m execution.Executor) error

func WithCancellationChecker(c cancellation.Checker) ExecutorOpt {
Expand Down Expand Up @@ -2439,14 +2439,22 @@ func (e *executor) extractTraceCtx(ctx context.Context, id state.Identifier, ite
return ctx
}

// AppendAndScheduleBatch appends a new batch item. If a new batch is created, it will be scheduled to run
func (e *executor) AppendAndScheduleBatch(ctx context.Context, fn inngest.Function, bi batch.BatchItem) error {
return e.AppendAndScheduleBatchWithOpts(ctx, fn, bi, nil)
}

// AppendAndScheduleBatchWithOpts appends a new batch item. If a new batch is created, it will be scheduled to run
// after the batch timeout. If the item finalizes the batch, a function run is immediately scheduled.
func (e executor) AppendAndScheduleBatch(ctx context.Context, fn inngest.Function, bi batch.BatchItem) error {
func (e *executor) AppendAndScheduleBatchWithOpts(ctx context.Context, fn inngest.Function, bi batch.BatchItem, opts *execution.BatchExecOpts) error {
result, err := e.batcher.Append(ctx, bi, fn)
if err != nil {
return err
}

if opts == nil {
opts = &execution.BatchExecOpts{}
}

switch result.Status {
case enums.BatchAppend:
// noop
Expand All @@ -2473,11 +2481,13 @@ func (e executor) AppendAndScheduleBatch(ctx context.Context, fn inngest.Functio
case enums.BatchFull:
// start execution immediately
batchID := ulid.MustParse(result.BatchID)
if err := e.RetrieveAndScheduleBatch(ctx, fn, batch.ScheduleBatchPayload{
if err := e.RetrieveAndScheduleBatchWithOpts(ctx, fn, batch.ScheduleBatchPayload{
BatchID: batchID,
AppID: bi.AppID,
WorkspaceID: bi.WorkspaceID,
AccountID: bi.AccountID,
}, &execution.BatchExecOpts{
FunctionPausedAt: opts.FunctionPausedAt,
}); err != nil {
return fmt.Errorf("could not retrieve and schedule batch items: %w", err)
}
Expand All @@ -2488,13 +2498,21 @@ func (e executor) AppendAndScheduleBatch(ctx context.Context, fn inngest.Functio
return nil
}

// RetrieveAndScheduleBatch retrieves all items from a started batch and schedules a function run
func (e executor) RetrieveAndScheduleBatch(ctx context.Context, fn inngest.Function, payload batch.ScheduleBatchPayload) error {
func (e *executor) RetrieveAndScheduleBatch(ctx context.Context, fn inngest.Function, payload batch.ScheduleBatchPayload) error {
return e.RetrieveAndScheduleBatchWithOpts(ctx, fn, payload, nil)
}

// RetrieveAndScheduleBatchWithOpts retrieves all items from a started batch and schedules a function run
func (e *executor) RetrieveAndScheduleBatchWithOpts(ctx context.Context, fn inngest.Function, payload batch.ScheduleBatchPayload, opts *execution.BatchExecOpts) error {
evtList, err := e.batcher.RetrieveItems(ctx, payload.BatchID)
if err != nil {
return err
}

if opts == nil {
opts = &execution.BatchExecOpts{}
}

evtIDs := make([]string, len(evtList))
events := make([]event.TrackedEvent, len(evtList))
for i, e := range evtList {
Expand Down Expand Up @@ -2535,7 +2553,7 @@ func (e executor) RetrieveAndScheduleBatch(ctx context.Context, fn inngest.Funct
Events: events,
BatchID: &payload.BatchID,
IdempotencyKey: &key,
FunctionPausedAt: payload.FunctionPausedAt,
FunctionPausedAt: opts.FunctionPausedAt,
})
if err != nil {
span.SetStatus(codes.Error, err.Error())
Expand Down
4 changes: 2 additions & 2 deletions pkg/execution/executor/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,14 +318,14 @@ func (s *svc) handleScheduledBatch(ctx context.Context, item queue.Item) error {
return err
}

if err := s.exec.RetrieveAndScheduleBatch(ctx, *fn, batch.ScheduleBatchPayload{
if err := s.exec.RetrieveAndScheduleBatchWithOpts(ctx, *fn, batch.ScheduleBatchPayload{
BatchID: batchID,
AccountID: item.Identifier.AccountID,
WorkspaceID: item.Identifier.WorkspaceID,
AppID: item.Identifier.AppID,
FunctionID: item.Identifier.WorkflowID,
FunctionVersion: fn.FunctionVersion,
}); err != nil {
}, nil); err != nil {
return fmt.Errorf("could not retrieve and schedule batch items: %w", err)
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/execution/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -616,7 +616,7 @@ func (s *svc) initialize(ctx context.Context, fn inngest.Function, evt event.Tra
Event: evt.GetEvent(),
}

if err := s.executor.AppendAndScheduleBatch(ctx, fn, bi); err != nil {
if err := s.executor.AppendAndScheduleBatchWithOpts(ctx, fn, bi, nil); err != nil {
return fmt.Errorf("could not append and schedule batch item: %w", err)
}

Expand Down

0 comments on commit c548109

Please sign in to comment.