Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

delete state when run is cancelled #1293

Merged
merged 1 commit into from
Apr 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions pkg/execution/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1546,6 +1546,9 @@ func (e *executor) Cancel(ctx context.Context, runID ulid.ULID, r execution.Canc
return fmt.Errorf("error cancelling function: %w", err)
}

if err := e.sm.Delete(ctx, s.Identifier()); err != nil {
logger.From(ctx).Error().Err(err).Msg("error deleting state after cancel")
}
// TODO: Load all pauses for the function and remove, once we index pauses.

fnCancelledErr := state.ErrFunctionCancelled.Error()
Expand Down
38 changes: 38 additions & 0 deletions pkg/execution/state/redis_state/redis_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -680,6 +680,44 @@ func (m mgr) LeasePause(ctx context.Context, id uuid.UUID) error {
}
}

// Delete deletes state from the state store. Previously, we would handle this in a
// lifecycle. Now, state stores must account for deletion directly. Note that if the
// state store is queue-aware, it must delete queue items for the run also. This may
// not always be the case.
func (m mgr) Delete(ctx context.Context, i state.Identifier) error {
// Ensure this context isn't cancelled; this is called in a goroutine.
callCtx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
defer cancel()

// Ensure function idempotency exists for the defined period.
key := m.kf.Idempotency(ctx, i)

cmd := m.r.B().Expire().Key(key).Seconds(int64(consts.FunctionIdempotencyPeriod.Seconds())).Build()
if err := m.r.Do(callCtx, cmd).Error(); err != nil {
return err
}

// Clear all other data for a job.
keys := []string{
m.kf.Actions(ctx, i),
m.kf.RunMetadata(ctx, i.RunID),
m.kf.Events(ctx, i),
m.kf.Stack(ctx, i.RunID),

// XXX: remove these in a state store refactor.
m.kf.Event(ctx, i),
m.kf.History(ctx, i.RunID),
m.kf.Errors(ctx, i),
}
for _, k := range keys {
cmd := m.r.B().Del().Key(k).Build()
if err := m.r.Do(callCtx, cmd).Error(); err != nil {
return err
}
}
return nil
}

func (m mgr) DeletePause(ctx context.Context, p state.Pause) error {
// Add a default event here, which is null and overwritten by everything. This is necessary
// to keep the same cluster key.
Expand Down
3 changes: 3 additions & 0 deletions pkg/execution/state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,9 @@ type Mutater interface {

UpdateMetadata(ctx context.Context, runID ulid.ULID, md MetadataUpdate) error

// Delete removes state from the state store.
Delete(ctx context.Context, i Identifier) error

// Cancel sets a function run metadata status to RunStatusCancelled, which prevents
// future execution of steps.
Cancel(ctx context.Context, i Identifier) error
Expand Down