Skip to content

Commit

Permalink
Executor: support function pausing (#1330)
Browse files Browse the repository at this point in the history
* minor: typo

* add FunctionPausedAt to ScheduleRequest

* add ErrFunctionSkipped

* skip running functions that are paused

* extract IsCron/CronSchedule logic for reuse

* implement OnFunctionSkipped lifecycle method

* minor: move isPaused to where it's actually needed

* minor: typo

* add RunStatusSkipped

* make history/lifecycle/OnFunctionSkipped a no-op for now

* Revert "add RunStatusSkipped"

This reverts commit ec3415f.

* remove unused HistoryTypeFunctionSkipped enum member
  • Loading branch information
cdzombak committed May 9, 2024
1 parent 931c4af commit 1530a30
Show file tree
Hide file tree
Showing 7 changed files with 111 additions and 43 deletions.
22 changes: 22 additions & 0 deletions pkg/event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,3 +234,25 @@ func NewInvocationEvent(opts NewInvocationEventOpts) Event {

return evt
}

func (e Event) IsCron() bool {
return IsCron(e.Name)
}

func (e Event) CronSchedule() *string {
if !IsCron(e.Name) {
return nil
}
return CronSchedule(e.Data)
}

func IsCron(evtName string) bool {
return evtName == FnCronName
}

func CronSchedule(evtData map[string]any) *string {
if cron, ok := evtData["cron"].(string); ok && cron != "" {
return &cron
}
return nil
}
2 changes: 2 additions & 0 deletions pkg/execution/debounce/debounce.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ type DebounceItem struct {
Event event.Event `json:"e"`
// Timeout is the timeout for the debounce, in unix milliseconds.
Timeout int64 `json:"t,omitempty"`
// FunctionPausedAt indicates whether the function is paused.
FunctionPausedAt *time.Time `json:"fpAt,omitempty"`
}

func (d DebounceItem) QueuePayload() DebouncePayload {
Expand Down
4 changes: 3 additions & 1 deletion pkg/execution/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,12 +169,14 @@ type ScheduleRequest struct {
BatchID *ulid.ULID
// IdempotencyKey represents an optional idempotency key for the function.
IdempotencyKey *string
// Context represents additional context used when initialiizing function runs.
// Context represents additional context used when initializing function runs.
Context map[string]any
// PreventDebounce prevents debouncing this function and immediately schedules
// execution. This is used after the debounce has finished to force execution
// of the function, instead of debouncing again.
PreventDebounce bool
// FunctionPausedAt indicates whether the function is paused.
FunctionPausedAt *time.Time
}

// CancelRequest stores information about the incoming cancellation request within
Expand Down
82 changes: 48 additions & 34 deletions pkg/execution/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ var (
ErrNoActionLoader = fmt.Errorf("no action loader provided")
ErrNoRuntimeDriver = fmt.Errorf("runtime driver for action not found")
ErrFunctionDebounced = fmt.Errorf("function debounced")
ErrFunctionSkipped = fmt.Errorf("function skipped")

ErrFunctionEnded = fmt.Errorf("function already ended")

Expand Down Expand Up @@ -290,38 +291,6 @@ func (e *executor) Schedule(ctx context.Context, req execution.ScheduleRequest)
// this run ID.
runID := ulid.MustNew(ulid.Now(), rand.Reader)

// span that tells when the function was queued
_, span := telemetry.NewSpan(ctx,
telemetry.WithScope(consts.OtelScopeTrigger),
telemetry.WithName(consts.OtelSpanTrigger),
telemetry.WithTimestamp(ulid.Time(runID.Time())),
telemetry.WithSpanAttributes(
attribute.Bool(consts.OtelUserTraceFilterKey, true),
attribute.String(consts.OtelSysAccountID, req.AccountID.String()),
attribute.String(consts.OtelSysWorkspaceID, req.WorkspaceID.String()),
attribute.String(consts.OtelSysAppID, req.AppID.String()),
attribute.String(consts.OtelSysFunctionID, req.Function.ID.String()),
attribute.String(consts.OtelSysFunctionSlug, req.Function.GetSlug()),
attribute.Int(consts.OtelSysFunctionVersion, req.Function.FunctionVersion),
attribute.String(consts.OtelAttrSDKRunID, runID.String()),
attribute.Int64(consts.OtelSysFunctionStatusCode, enums.RunStatusScheduled.ToCode()),
),
)
defer span.End()
if req.BatchID != nil {
span.SetAttributes(attribute.String(consts.OtelSysBatchID, req.BatchID.String()))
}
if req.PreventDebounce {
span.SetAttributes(attribute.Bool(consts.OtelSysDebounceTimeout, true))
}
if req.Context != nil {
if val, ok := req.Context[consts.OtelPropagationLinkKey]; ok {
if link, ok := val.(string); ok {
span.SetAttributes(attribute.String(consts.OtelPropagationLinkKey, link))
}
}
}

var key string
if req.IdempotencyKey != nil {
// Use the given idempotency key
Expand All @@ -347,8 +316,6 @@ func (e *executor) Schedule(ctx context.Context, req execution.ScheduleRequest)
id := e.GetInternalID()
eventIDs = append(eventIDs, id)
}
spanID := telemetry.NewSpanID(ctx)
span.SetEventIDs(req.Events...)

id := state.Identifier{
WorkflowID: req.Function.ID,
Expand All @@ -365,6 +332,51 @@ func (e *executor) Schedule(ctx context.Context, req execution.ScheduleRequest)
ReplayID: req.ReplayID,
}

isPaused := req.FunctionPausedAt != nil && req.FunctionPausedAt.Before(time.Now())
if isPaused {
for _, e := range e.lifecycles {
go e.OnFunctionSkipped(context.WithoutCancel(ctx), id, execution.SkipState{
CronSchedule: req.Events[0].GetEvent().CronSchedule(),
})
}
return nil, nil
}

// span that tells when the function was queued
_, span := telemetry.NewSpan(ctx,
telemetry.WithScope(consts.OtelScopeTrigger),
telemetry.WithName(consts.OtelSpanTrigger),
telemetry.WithTimestamp(ulid.Time(runID.Time())),
telemetry.WithSpanAttributes(
attribute.Bool(consts.OtelUserTraceFilterKey, true),
attribute.String(consts.OtelSysAccountID, req.AccountID.String()),
attribute.String(consts.OtelSysWorkspaceID, req.WorkspaceID.String()),
attribute.String(consts.OtelSysAppID, req.AppID.String()),
attribute.String(consts.OtelSysFunctionID, req.Function.ID.String()),
attribute.String(consts.OtelSysFunctionSlug, req.Function.GetSlug()),
attribute.Int(consts.OtelSysFunctionVersion, req.Function.FunctionVersion),
attribute.String(consts.OtelAttrSDKRunID, runID.String()),
attribute.Int64(consts.OtelSysFunctionStatusCode, enums.RunStatusScheduled.ToCode()),
),
)
defer span.End()

if req.BatchID != nil {
span.SetAttributes(attribute.String(consts.OtelSysBatchID, req.BatchID.String()))
}
if req.PreventDebounce {
span.SetAttributes(attribute.Bool(consts.OtelSysDebounceTimeout, true))
}
if req.Context != nil {
if val, ok := req.Context[consts.OtelPropagationLinkKey]; ok {
if link, ok := val.(string); ok {
span.SetAttributes(attribute.String(consts.OtelPropagationLinkKey, link))
}
}
}

span.SetEventIDs(req.Events...)

mapped := make([]map[string]any, len(req.Events))
for n, item := range req.Events {
evt := item.GetEvent()
Expand Down Expand Up @@ -423,6 +435,8 @@ func (e *executor) Schedule(ctx context.Context, req execution.ScheduleRequest)
telemetry.UserTracer().Propagator().Inject(ctx, propagation.MapCarrier(carrier.Context))
stateMetadata[consts.OtelPropagationKey] = carrier

spanID := telemetry.NewSpanID(ctx)

// Create a new function.
s, err := e.sm.New(ctx, state.Input{
Identifier: id,
Expand Down
9 changes: 9 additions & 0 deletions pkg/execution/history/lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,15 @@ func (l lifecycle) OnFunctionStarted(
}
}

// OnFunctionSkipped is called when a function run is skipped.
func (l lifecycle) OnFunctionSkipped(
_ context.Context,
_ state.Identifier,
_ execution.SkipState,
) {
// no-op for now.
}

// OnFunctionFinished is called when a function finishes. This will
// be called when a function completes successfully or permanently failed,
// with the final driver response indicating the type of success.
Expand Down
24 changes: 23 additions & 1 deletion pkg/execution/lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@ import (
"github.com/oklog/ulid/v2"
)

// SkipState represents the subset of state.State's data required for OnFunctionSkipped.
type SkipState struct {
// CronSchedule, if present, is the cron schedule string that triggered the skipped function.
CronSchedule *string
}

var _ LifecycleListener = (*NoopLifecyceListener)(nil)

// LifecycleListener listens to lifecycle events on the executor.
Expand All @@ -26,9 +32,17 @@ type LifecycleListener interface {
state.State,
)

// OnFunctionSkipped is called when a function run is skipped.
// Currently, this happens iff the function is paused.
OnFunctionSkipped(
context.Context,
state.Identifier,
SkipState,
)

// OnFunctionStarted is called when the function starts. This may be
// immediately after the function is scheduled, or in the case of increased
// latency (eg. due to debouncing or concurrency limits) some time after the
// latency (e.g. due to debouncing or concurrency limits) some time after the
// function is scheduled.
OnFunctionStarted(
context.Context,
Expand Down Expand Up @@ -162,6 +176,14 @@ func (NoopLifecyceListener) OnFunctionScheduled(
) {
}

// OnFunctionSkipped is called when a function run is skipped.
func (NoopLifecyceListener) OnFunctionSkipped(
context.Context,
state.Identifier,
SkipState,
) {
}

// OnFunctionStarted is called when the function starts. This may be
// immediately after the function is scheduled, or in the case of increased
// latency (eg. due to debouncing or concurrency limits) some time after the
Expand Down
11 changes: 4 additions & 7 deletions pkg/execution/state/state_instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,18 +117,15 @@ func (s memstate) CronSchedule() *string {
}

if data, ok := s.Event()["data"].(map[string]any); ok {
if cron, ok := data["cron"].(string); ok && cron != "" {
return &cron
}
return event.CronSchedule(data)
}

return nil
}

func (s memstate) IsCron() bool {
if name, _ := s.Event()["name"].(string); name != event.FnCronName {
return false
if name, _ := s.Event()["name"].(string); event.IsCron(name) {
return true
}

return true
return false
}

0 comments on commit 1530a30

Please sign in to comment.