Skip to content

Commit

Permalink
Add Fail handler to executor
Browse files Browse the repository at this point in the history
This calls lifecycles after marking a function as failed.
  • Loading branch information
tonyhb committed Feb 6, 2024
1 parent 01d108d commit 25ec570
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 0 deletions.
2 changes: 2 additions & 0 deletions pkg/execution/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ type Executor interface {
Cancel(ctx context.Context, runID ulid.ULID, r CancelRequest) error
// Resume resumes an in-progress function run from the given waitForEvent pause.
Resume(ctx context.Context, p state.Pause, r ResumeRequest) error
// Fail fails a functin with the given error.
Fail(ctx context.Context, runID ulid.ULID, err error) error

// AddLifecycleListener adds a lifecycle listener to run on hooks. This must
// always add to a list of listeners vs replace listeners.
Expand Down
42 changes: 42 additions & 0 deletions pkg/execution/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1281,6 +1281,48 @@ func (e *executor) handleAggregatePauses(ctx context.Context, evt event.TrackedE
return res, goerr
}

func (e *executor) Fail(ctx context.Context, runID ulid.ULID, err error) error {
s, err := e.sm.Load(ctx, runID)
if err != nil {

Check failure on line 1286 in pkg/execution/executor/executor.go

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

SA4009: argument err is overwritten before first use (staticcheck)
return fmt.Errorf("unable to load run: %w", err)

Check failure on line 1287 in pkg/execution/executor/executor.go

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

SA4009(related information): assignment to err (staticcheck)
}
md := s.Metadata()

switch md.Status {
case enums.RunStatusFailed, enums.RunStatusCompleted, enums.RunStatusOverflowed:
return ErrFunctionEnded
case enums.RunStatusCancelled:
return nil
}

if err := e.sm.SetStatus(ctx, md.Identifier, enums.RunStatusFailed); err != nil {
return fmt.Errorf("error cancelling function: %w", err)
}

msg := err.Error()
if err := e.runFinishHandler(ctx, s.Identifier(), s, state.DriverResponse{
Err: &msg,
}); err != nil {
logger.From(ctx).Error().Err(err).Msg("error running finish handler")
}

for _, e := range e.lifecycles {
go e.OnFunctionFinished(
context.WithoutCancel(ctx),
md.Identifier,
queue.Item{
Identifier: s.Identifier(),
},
state.DriverResponse{
Err: &msg,
},
s,
)
}

return nil
}

// Cancel cancels an in-progress function.
func (e *executor) Cancel(ctx context.Context, runID ulid.ULID, r execution.CancelRequest) error {
s, err := e.sm.Load(ctx, runID)
Expand Down

0 comments on commit 25ec570

Please sign in to comment.