Skip to content

Commit

Permalink
Reuse batching logic in executor (#1321)
Browse files Browse the repository at this point in the history
This change introduces two new executor methods, `AppendAndScheduleBatch` and `RetrieveAndScheduleBatch`, which offer high-level batch handling capabilities and are used throughout the batching implementation. Existing usages have been refactored to remove duplicated logic.
  • Loading branch information
BrunoScheufler committed May 8, 2024
1 parent 9b93885 commit 5540d01
Show file tree
Hide file tree
Showing 6 changed files with 135 additions and 118 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,5 @@ pkg/devserver/static/
Thumbs.db

/.vercel/

.idea
4 changes: 3 additions & 1 deletion pkg/execution/batch/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,10 @@ func (b redisBatchManager) ScheduleExecution(ctx context.Context, opts ScheduleB
Identifier: state.Identifier{
WorkflowID: opts.FunctionID,
WorkflowVersion: opts.FunctionVersion,
RunID: ulid.Make(),
Key: fmt.Sprintf("batchschedule:%s", opts.BatchID),
AccountID: opts.AccountID,
WorkspaceID: opts.WorkspaceID,
AppID: opts.AppID,
},
Attempt: 0,
MaxAttempts: &maxAttempts,
Expand Down
4 changes: 4 additions & 0 deletions pkg/execution/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package execution
import (
"context"
"encoding/json"
"github.com/inngest/inngest/pkg/execution/batch"
"time"

"github.com/google/uuid"
Expand Down Expand Up @@ -121,6 +122,9 @@ type Executor interface {

// InvokeNotFoundHandler invokes the invoke not found handler.
InvokeNotFoundHandler(context.Context, InvokeNotFoundHandlerOpts) error

AppendAndScheduleBatch(ctx context.Context, fn inngest.Function, bi batch.BatchItem) error
RetrieveAndScheduleBatch(ctx context.Context, fn inngest.Function, payload batch.ScheduleBatchPayload) error
}

// PublishFinishedEventOpts represents the options for publishing a finished event.
Expand Down
115 changes: 115 additions & 0 deletions pkg/execution/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2282,6 +2282,121 @@ func (e *executor) extractTraceCtx(ctx context.Context, id state.Identifier, ite
return ctx
}

// AppendAndScheduleBatch appends bi to the batch for fn and performs the
// follow-up work indicated by the append status:
//
//   - enums.BatchAppend: nothing further is done.
//   - enums.BatchNew: the batch execution is scheduled to run once
//     fn.EventBatch.Timeout has elapsed from now.
//   - enums.BatchFull: the batch items are retrieved and a function run is
//     scheduled immediately via RetrieveAndScheduleBatch.
//
// An error is returned when the append fails, when the returned batch ID or
// the configured timeout cannot be parsed, when fn has no batch configuration
// for a newly created batch, when scheduling fails, or when the append status
// is not one of the values above.
func (e *executor) AppendAndScheduleBatch(ctx context.Context, fn inngest.Function, bi batch.BatchItem) error {
	result, err := e.batcher.Append(ctx, bi, fn)
	if err != nil {
		return err
	}

	switch result.Status {
	case enums.BatchAppend:
		// noop
	case enums.BatchNew:
		// Guard the dereference below; without a batch configuration there
		// is no timeout to schedule against.
		if fn.EventBatch == nil {
			return fmt.Errorf("function %s has no batch configuration", fn.ID)
		}

		// Parse instead of MustParse so a malformed ID surfaces as an error
		// rather than a panic.
		batchID, err := ulid.Parse(result.BatchID)
		if err != nil {
			return fmt.Errorf("invalid batch ID %q: %w", result.BatchID, err)
		}

		dur, err := time.ParseDuration(fn.EventBatch.Timeout)
		if err != nil {
			return fmt.Errorf("invalid batch timeout %q: %w", fn.EventBatch.Timeout, err)
		}

		if err := e.batcher.ScheduleExecution(ctx, batch.ScheduleBatchOpts{
			ScheduleBatchPayload: batch.ScheduleBatchPayload{
				BatchID:         batchID,
				AccountID:       bi.AccountID,
				WorkspaceID:     bi.WorkspaceID,
				AppID:           bi.AppID,
				FunctionID:      bi.FunctionID,
				FunctionVersion: bi.FunctionVersion,
			},
			At: time.Now().Add(dur),
		}); err != nil {
			return err
		}
	case enums.BatchFull:
		// start execution immediately
		batchID, err := ulid.Parse(result.BatchID)
		if err != nil {
			return fmt.Errorf("invalid batch ID %q: %w", result.BatchID, err)
		}

		// Populate the same payload fields as the BatchNew path so both
		// routes hand RetrieveAndScheduleBatch an equivalent payload.
		if err := e.RetrieveAndScheduleBatch(ctx, fn, batch.ScheduleBatchPayload{
			BatchID:         batchID,
			AccountID:       bi.AccountID,
			WorkspaceID:     bi.WorkspaceID,
			AppID:           bi.AppID,
			FunctionID:      bi.FunctionID,
			FunctionVersion: bi.FunctionVersion,
		}); err != nil {
			return fmt.Errorf("could not retrieve and schedule batch items: %w", err)
		}
	default:
		return fmt.Errorf("invalid status of batch append ops: %d", result.Status)
	}

	return nil
}

// RetrieveAndScheduleBatch retrieves all items of the batch identified by
// payload.BatchID, schedules a single function run for fn with those items as
// its events, and then expires the batch keys.
//
// The work is recorded in a new root telemetry span carrying the account,
// workspace, app, function, batch and event IDs. The run is scheduled with
// the idempotency key "<fn.ID>-<payload.BatchID>".
//
// An error is returned if the items cannot be retrieved, if scheduling fails,
// or if the batch keys cannot be expired.
func (e *executor) RetrieveAndScheduleBatch(ctx context.Context, fn inngest.Function, payload batch.ScheduleBatchPayload) error {
	evtList, err := e.batcher.RetrieveItems(ctx, payload.BatchID)
	if err != nil {
		return err
	}

	// Convert the batch items to tracked events and collect their IDs for
	// the span attributes. The loop variable is deliberately not named e so
	// that it does not shadow the receiver.
	evtIDs := make([]string, len(evtList))
	events := make([]event.TrackedEvent, len(evtList))
	for i, evt := range evtList {
		events[i] = evt
		evtIDs[i] = evt.GetInternalID().String()
	}

	ctx, span := telemetry.NewSpan(ctx,
		telemetry.WithScope(consts.OtelScopeBatch),
		telemetry.WithName(consts.OtelSpanBatch),
		telemetry.WithNewRoot(),
		telemetry.WithSpanAttributes(
			attribute.Bool(consts.OtelUserTraceFilterKey, true),
			attribute.String(consts.OtelSysAccountID, payload.AccountID.String()),
			attribute.String(consts.OtelSysWorkspaceID, payload.WorkspaceID.String()),
			attribute.String(consts.OtelSysAppID, payload.AppID.String()),
			attribute.String(consts.OtelSysFunctionID, fn.ID.String()),
			attribute.String(consts.OtelSysBatchID, payload.BatchID.String()),
			attribute.String(consts.OtelSysEventIDs, strings.Join(evtIDs, ",")),
		))
	defer span.End()

	// still process events in case the user disables batching while a batch is still in-flight
	if fn.EventBatch != nil {
		// A batch holding exactly MaxSize items is tagged as full; any other
		// size is tagged as having been started by the timeout.
		if len(events) == fn.EventBatch.MaxSize {
			span.SetAttributes(attribute.Bool(consts.OtelSysBatchFull, true))
		} else {
			span.SetAttributes(attribute.Bool(consts.OtelSysBatchTimeout, true))
		}
	}

	key := fmt.Sprintf("%s-%s", fn.ID, payload.BatchID)
	identifier, err := e.Schedule(ctx, execution.ScheduleRequest{
		AccountID:      payload.AccountID,
		WorkspaceID:    payload.WorkspaceID,
		AppID:          payload.AppID,
		Function:       fn,
		Events:         events,
		BatchID:        &payload.BatchID,
		IdempotencyKey: &key,
	})
	if err != nil {
		span.SetStatus(codes.Error, err.Error())
		return err
	}

	if identifier != nil {
		span.SetAttributes(attribute.String(consts.OtelAttrSDKRunID, identifier.RunID.String()))
	} else {
		// NOTE(review): Schedule returned no identifier and no error;
		// presumably no run was created, so the span is flagged with
		// OtelSysStepDelete. Confirm the intended meaning against Schedule.
		span.SetAttributes(attribute.Bool(consts.OtelSysStepDelete, true))
	}

	if err := e.batcher.ExpireKeys(ctx, payload.BatchID); err != nil {
		// Record the failure on the span, mirroring the Schedule error path.
		span.SetStatus(codes.Error, err.Error())
		return err
	}

	return nil
}

// extractTraceCtxFromMap extracts the trace context from a map, if it exists.
// If it doesn't or it is invalid, it nil.
func extractTraceCtxFromMap(ctx context.Context, target map[string]any) (context.Context, bool) {
Expand Down
51 changes: 9 additions & 42 deletions pkg/execution/executor/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,48 +318,15 @@ func (s *svc) handleScheduledBatch(ctx context.Context, item queue.Item) error {
return err
}

// TODO: this logic is repeated in runner, consolidate it somewhere
// convert batch items to tracked events
items, err := s.batcher.RetrieveItems(ctx, batchID)
if err != nil {
return err
}
events := make([]event.TrackedEvent, len(items))
for i, item := range items {
events[i] = item
}

ctx, span := telemetry.NewSpan(ctx,
telemetry.WithScope(consts.OtelScopeBatch),
telemetry.WithName(consts.OtelSpanBatch),
telemetry.WithSpanAttributes(
attribute.String(consts.OtelSysAccountID, item.Identifier.AccountID.String()),
attribute.String(consts.OtelSysWorkspaceID, item.Identifier.WorkspaceID.String()),
attribute.String(consts.OtelSysAppID, item.Identifier.AppID.String()),
attribute.String(consts.OtelSysFunctionID, item.Identifier.WorkflowID.String()),
attribute.String(consts.OtelSysBatchID, batchID.String()),
attribute.Bool(consts.OtelSysBatchFull, true),
),
)
defer span.End()

// start execution
id := fmt.Sprintf("%s-%s", opts.FunctionID, batchID)
_, err = s.exec.Schedule(ctx, execution.ScheduleRequest{
AccountID: opts.AccountID,
WorkspaceID: opts.WorkspaceID,
AppID: opts.AppID,
Function: *fn,
Events: events,
BatchID: &batchID,
IdempotencyKey: &id,
})
if err != nil {
return err
}

if err := s.batcher.ExpireKeys(ctx, batchID); err != nil {
return err
if err := s.exec.RetrieveAndScheduleBatch(ctx, *fn, batch.ScheduleBatchPayload{
BatchID: batchID,
AccountID: item.Identifier.AccountID,
WorkspaceID: item.Identifier.WorkspaceID,
AppID: item.Identifier.AppID,
FunctionID: item.Identifier.WorkflowID,
FunctionVersion: fn.FunctionVersion,
}); err != nil {
return fmt.Errorf("could not retrieve and schedule batch items: %w", err)
}

return nil
Expand Down
77 changes: 2 additions & 75 deletions pkg/execution/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/inngest/inngest/pkg/config"
"github.com/inngest/inngest/pkg/consts"
"github.com/inngest/inngest/pkg/cqrs"
"github.com/inngest/inngest/pkg/enums"
"github.com/inngest/inngest/pkg/event"
"github.com/inngest/inngest/pkg/execution"
"github.com/inngest/inngest/pkg/execution/batch"
Expand Down Expand Up @@ -616,81 +615,9 @@ func (s *svc) initialize(ctx context.Context, fn inngest.Function, evt event.Tra
EventID: evt.GetInternalID(),
Event: evt.GetEvent(),
}
result, err := s.batcher.Append(ctx, bi, fn)
if err != nil {
return err
}

switch result.Status {
case enums.BatchAppend:
// noop
case enums.BatchNew:
dur, err := time.ParseDuration(fn.EventBatch.Timeout)
if err != nil {
return err
}
at := time.Now().Add(dur)

if err := s.batcher.ScheduleExecution(ctx, batch.ScheduleBatchOpts{
ScheduleBatchPayload: batch.ScheduleBatchPayload{
BatchID: ulid.MustParse(result.BatchID),
AccountID: bi.AccountID,
WorkspaceID: bi.WorkspaceID,
AppID: bi.AppID,
FunctionID: bi.FunctionID,
FunctionVersion: bi.FunctionVersion,
},
At: at,
}); err != nil {
return err
}
case enums.BatchFull:
// start execution immediately
batchID := ulid.MustParse(result.BatchID)

// TODO: this logic is repeated in executor, consolidate it somewhere
evtList, err := s.batcher.RetrieveItems(ctx, batchID)
if err != nil {
return err
}

events := make([]event.TrackedEvent, len(evtList))
for i, e := range evtList {
events[i] = e
}

ctx, span := telemetry.NewSpan(ctx,
telemetry.WithScope(consts.OtelScopeBatch),
telemetry.WithName(consts.OtelSpanBatch),
telemetry.WithSpanAttributes(
attribute.String(consts.OtelSysAccountID, bi.AccountID.String()),
attribute.String(consts.OtelSysWorkspaceID, bi.WorkspaceID.String()),
attribute.String(consts.OtelSysAppID, bi.AppID.String()),
attribute.String(consts.OtelSysFunctionID, bi.FunctionID.String()),
attribute.String(consts.OtelSysBatchID, batchID.String()),
attribute.Bool(consts.OtelSysBatchFull, true),
))
defer span.End()

key := fmt.Sprintf("%s-%s", fn.ID, batchID)
_, err = s.executor.Schedule(ctx, execution.ScheduleRequest{
AccountID: bi.AccountID,
WorkspaceID: bi.WorkspaceID,
AppID: bi.AppID,
Function: fn,
Events: events,
BatchID: &batchID,
IdempotencyKey: &key,
})
if err != nil {
return err
}

if err := s.batcher.ExpireKeys(ctx, batchID); err != nil {
return err
}
default:
return fmt.Errorf("invalid status of batch append ops: %d", result.Status)
if err := s.executor.AppendAndScheduleBatch(ctx, fn, bi); err != nil {
return fmt.Errorf("could not append and schedule batch item: %w", err)
}

return nil
Expand Down

0 comments on commit 5540d01

Please sign in to comment.