Skip to content

Commit

Permalink
Support step.waitForEvent in trace runs (#1358)
Browse files Browse the repository at this point in the history
* move shared attributes out and construct span for step.waitForEvent

* add span generation on handling wait

* add opcode

---------

Co-authored-by: Darwin D Wu <darwin67@users.noreply.github.com>
  • Loading branch information
darwin67 and darwin67 committed May 15, 2024
1 parent af9c9fb commit 1e9c9f2
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 34 deletions.
16 changes: 11 additions & 5 deletions pkg/consts/otel.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package consts

const (
OtelSpanEvent = "event"
OtelSpanCron = "cron"
OtelSpanBatch = "batch"
OtelSpanDebounce = "debounce"
OtelSpanTrigger = "trigger"
OtelSpanEvent = "event"
OtelSpanCron = "cron"
OtelSpanBatch = "batch"
OtelSpanDebounce = "debounce"
OtelSpanTrigger = "trigger"
OtelSpanInvoke = "invoke"
OtelSpanWaitForEvent = "wait"

// system attributes
OtelSysAccountID = "sys.account.id"
Expand Down Expand Up @@ -50,6 +52,9 @@ const (

OtelSysStepSleepEndAt = "sys.step.sleep.end"

OtelSysStepWaitExpires = "sys.step.wait.expires"
OtelSysStepWaitExpired = "sys.step.wait.expired"

OtelSysStepInvokeExpires = "sys.step.invoke.expires"
OtelSysStepInvokeTargetFnID = "sys.step.invoke.fn.id"
OtelSysStepInvokeTriggeringEventID = "sys.step.invoke.event.outgoing.id"
Expand Down Expand Up @@ -78,6 +83,7 @@ const (
OtelScopeEvent = "event.inngest"
OtelScopeBatch = "batch.inngest"
OtelScopeDebounce = "debounce.inngest"
OtelScopeWait = "wait.inngest"
OtelScopeTrigger = "trigger.inngest"
OtelScopeCron = "cron.inngest"
OtelScopeEnv = "env.inngest"
Expand Down
113 changes: 84 additions & 29 deletions pkg/execution/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1660,9 +1660,30 @@ func (e *executor) Resume(ctx context.Context, pause state.Pause, r execution.Re
return fmt.Errorf("error enqueueing after pause: %w", err)
}

ts := time.Now()
if pause.TraceStartedAt != nil {
ts = (*pause.TraceStartedAt).Time()
}

commonAttrs := []attribute.KeyValue{
attribute.Bool(consts.OtelUserTraceFilterKey, true),
attribute.String(consts.OtelSysAccountID, pause.Identifier.AccountID.String()),
attribute.String(consts.OtelSysWorkspaceID, pause.Identifier.WorkspaceID.String()),
attribute.String(consts.OtelSysAppID, pause.Identifier.AppID.String()),
attribute.String(consts.OtelSysFunctionID, pause.Identifier.WorkflowID.String()),
// attribute.String(consts.OtelSysFunctionSlug, s.Function().GetSlug()),
attribute.Int(consts.OtelSysFunctionVersion, pause.Identifier.WorkflowVersion),
attribute.String(consts.OtelAttrSDKRunID, pause.Identifier.RunID.String()),
attribute.Int(consts.OtelSysStepAttempt, 0), // ?
attribute.Int(consts.OtelSysStepMaxAttempt, 1), // ?
attribute.String(consts.OtelSysStepGroupID, pause.GroupID),
attribute.String(consts.OtelSysStepDisplayName, pause.StepName),
}

if pause.Opcode != nil && *pause.Opcode == enums.OpcodeInvokeFunction.String() {
if pause.StepSpanID != nil && *pause.StepSpanID != "" {
if spanID, err := trace.SpanIDFromHex(*pause.StepSpanID); err == nil {
// Used for spans
triggeringEventID := ""
if pause.TriggeringEventID != nil {
triggeringEventID = *pause.TriggeringEventID
Expand All @@ -1673,42 +1694,24 @@ func (e *executor) Resume(ctx context.Context, pause state.Pause, r execution.Re
returnedEventID = r.EventID.String()
}

runID := ""
if r.RunID != nil {
runID = r.RunID.String()
}

targetFnID := ""
if pause.InvokeTargetFnID != nil {
targetFnID = *pause.InvokeTargetFnID
}

ts := time.Now()
if pause.TraceStartedAt != nil {
ts = (*pause.TraceStartedAt).Time()
runID := ""
if r.RunID != nil {
runID = r.RunID.String()
}

var span *telemetry.Span
ctx, span = telemetry.NewSpan(ctx,
telemetry.WithScope(consts.OtelScopeStep),
telemetry.WithName("invoke"),
telemetry.WithName(consts.OtelSpanInvoke),
telemetry.WithTimestamp(ts),
telemetry.WithSpanID(spanID),
telemetry.WithSpanAttributes(
attribute.Bool(consts.OtelUserTraceFilterKey, true),
attribute.String(consts.OtelSysAccountID, pause.Identifier.AccountID.String()),
attribute.String(consts.OtelSysWorkspaceID, pause.Identifier.WorkspaceID.String()),
attribute.String(consts.OtelSysAppID, pause.Identifier.AppID.String()),
attribute.String(consts.OtelSysFunctionID, pause.Identifier.WorkflowID.String()),
// attribute.String(consts.OtelSysFunctionSlug, s.Function().GetSlug()),
attribute.Int(consts.OtelSysFunctionVersion, pause.Identifier.WorkflowVersion),
attribute.String(consts.OtelAttrSDKRunID, pause.Identifier.RunID.String()),
attribute.Int(consts.OtelSysStepAttempt, 0), // ?
attribute.Int(consts.OtelSysStepMaxAttempt, 1), // ?
attribute.String(consts.OtelSysStepGroupID, pause.GroupID),
attribute.String(consts.OtelSysStepOpcode, enums.OpcodeInvokeFunction.String()),
attribute.String(consts.OtelSysStepDisplayName, pause.StepName),

attribute.String(consts.OtelSysStepInvokeTargetFnID, targetFnID),
attribute.Int64(consts.OtelSysStepInvokeExpires, pause.Expires.Time().UnixMilli()),
attribute.String(consts.OtelSysStepInvokeTriggeringEventID, triggeringEventID),
Expand All @@ -1717,17 +1720,42 @@ func (e *executor) Resume(ctx context.Context, pause state.Pause, r execution.Re
attribute.Bool(consts.OtelSysStepInvokeExpired, r.EventID == nil),
),
)
span.Send()
defer span.End()
span.SetAttributes(commonAttrs...)
if r.HasError() {
span.SetStatus(codes.Error, r.Error())
}
span.Send()
}
}

for _, e := range e.lifecycles {
go e.OnInvokeFunctionResumed(context.WithoutCancel(ctx), pause.Identifier, r, pause.GroupID)
}
} else {
if pause.StepSpanID != nil && *pause.StepSpanID != "" {
if spanID, err := trace.SpanIDFromHex(*pause.StepSpanID); err == nil {
var span *telemetry.Span
ctx, span = telemetry.NewSpan(ctx,
telemetry.WithScope(consts.OtelScopeStep),
telemetry.WithName(consts.OtelSpanInvoke),
telemetry.WithTimestamp(ts),
telemetry.WithSpanID(spanID),
telemetry.WithSpanAttributes(
attribute.String(consts.OtelSysStepOpcode, enums.OpcodeWaitForEvent.String()),
attribute.Int64(consts.OtelSysStepWaitExpires, pause.Expires.Time().UnixMilli()),
attribute.Bool(consts.OtelSysStepWaitExpired, r.EventID == nil),
),
)
span.Send()
defer span.End()
span.SetAttributes(commonAttrs...)
if r.HasError() {
span.SetStatus(codes.Error, r.Error())
}
}
}

for _, e := range e.lifecycles {
go e.OnWaitForEventResumed(context.WithoutCancel(ctx), pause.Identifier, r, pause.GroupID)
}
Expand Down Expand Up @@ -2177,15 +2205,14 @@ func (e *executor) handleGeneratorInvokeFunction(ctx context.Context, gen state.

ctx, span := telemetry.NewSpan(ctx,
telemetry.WithScope(consts.OtelScopeStep),
telemetry.WithName("invoke"),
telemetry.WithName(consts.OtelSpanInvoke),
telemetry.WithTimestamp(now),
telemetry.WithSpanAttributes(
attribute.Bool(consts.OtelUserTraceFilterKey, true),
attribute.String(consts.OtelSysAccountID, item.Identifier.AccountID.String()),
attribute.String(consts.OtelSysWorkspaceID, item.Identifier.WorkspaceID.String()),
attribute.String(consts.OtelSysAppID, item.Identifier.AppID.String()),
attribute.String(consts.OtelSysFunctionID, item.Identifier.WorkflowID.String()),
// attribute.String(consts.OtelSysFunctionSlug, s.Function().GetSlug()),
attribute.Int(consts.OtelSysFunctionVersion, item.Identifier.WorkflowVersion),
attribute.String(consts.OtelAttrSDKRunID, item.Identifier.RunID.String()),
attribute.Int(consts.OtelSysStepAttempt, 0), // ?
Expand All @@ -2200,6 +2227,7 @@ func (e *executor) handleGeneratorInvokeFunction(ctx context.Context, gen state.
),
)
span.Send()
defer span.End()

spanID := span.SpanContext().SpanID().String()
traceStartedAt := state.Time(now)
Expand Down Expand Up @@ -2265,8 +2293,6 @@ func (e *executor) handleGeneratorInvokeFunction(ctx context.Context, gen state.
return fmt.Errorf("error publishing internal invocation event: %w", err)
}

span.Send()

for _, e := range e.lifecycles {
go e.OnInvokeFunction(context.WithoutCancel(ctx), item.Identifier, item, gen, ulid.MustParse(evt.ID), correlationID)
}
Expand All @@ -2275,7 +2301,7 @@ func (e *executor) handleGeneratorInvokeFunction(ctx context.Context, gen state.
}

func (e *executor) handleGeneratorWaitForEvent(ctx context.Context, gen state.GeneratorOpcode, item queue.Item, edge queue.PayloadEdge) error {
span := trace.SpanFromContext(ctx)
execSpan := trace.SpanFromContext(ctx)
opts, err := gen.WaitForEventOpts()
if err != nil {
return fmt.Errorf("unable to parse wait for event opts: %w", err)
Expand Down Expand Up @@ -2346,6 +2372,31 @@ func (e *executor) handleGeneratorWaitForEvent(ctx context.Context, gen state.Ge
}

opcode := gen.Op.String()
now := time.Now()

ctx, span := telemetry.NewSpan(ctx,
telemetry.WithScope(consts.OtelScopeWait),
telemetry.WithName(consts.OtelSpanWaitForEvent),
telemetry.WithTimestamp(now),
telemetry.WithSpanAttributes(
attribute.Bool(consts.OtelUserTraceFilterKey, true),
attribute.String(consts.OtelSysStepOpcode, enums.OpcodeWaitForEvent.String()),
attribute.String(consts.OtelSysAccountID, item.Identifier.AccountID.String()),
attribute.String(consts.OtelSysWorkspaceID, item.Identifier.WorkspaceID.String()),
attribute.String(consts.OtelSysAppID, item.Identifier.AppID.String()),
attribute.String(consts.OtelSysFunctionID, item.Identifier.WorkflowID.String()),
attribute.Int(consts.OtelSysFunctionVersion, item.Identifier.WorkflowVersion),
attribute.String(consts.OtelAttrSDKRunID, item.Identifier.RunID.String()),
attribute.Int(consts.OtelSysStepAttempt, 0),
attribute.Int(consts.OtelSysStepMaxAttempt, 1),
attribute.String(consts.OtelSysStepGroupID, item.GroupID),
),
)
span.Send()
defer span.End()
spanID := span.SpanContext().SpanID().String()
traceStartedAt := state.Time(now)

err = e.sm.SavePause(ctx, state.Pause{
ID: pauseID,
WorkspaceID: item.WorkspaceID,
Expand All @@ -2360,11 +2411,14 @@ func (e *executor) handleGeneratorWaitForEvent(ctx context.Context, gen state.Ge
Expression: expr,
ExpressionData: data,
DataKey: gen.ID,
StepSpanID: &spanID,
TraceStartedAt: &traceStartedAt,
})
if err == state.ErrPauseAlreadyExists {
return nil
}
if err != nil {
span.Cancel(ctx)
return err
}

Expand All @@ -2389,9 +2443,10 @@ func (e *executor) handleGeneratorWaitForEvent(ctx context.Context, gen state.Ge
},
}, expires)
if err == redis_state.ErrQueueItemExists {
span.Cancel(ctx)
return nil
}
span.SetAttributes(
execSpan.SetAttributes(
attribute.String(consts.OtelSysStepNextOpcode, enums.OpcodeWaitForEvent.String()),
attribute.Int64(consts.OtelSysStepNextTimestamp, time.Now().UnixMilli()),
attribute.Int64(consts.OtelSysStepNextExpires, expires.UnixMilli()),
Expand Down

0 comments on commit 1e9c9f2

Please sign in to comment.