Skip to content

Commit

Permalink
Downstream some tracing changes to span reconstruction (#1343)
Browse files Browse the repository at this point in the history
* Downstream some tracing changes to span reconstruction

* Fix ineffectual context assignment
  • Loading branch information
jpwilliams committed May 10, 2024
1 parent 6907e72 commit 6565f87
Show file tree
Hide file tree
Showing 5 changed files with 189 additions and 14 deletions.
11 changes: 11 additions & 0 deletions pkg/consts/otel.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ const (
OtelSysFunctionStatusCode = "sys.function.status.code"
OtelSysFunctionOutput = "sys.function.output"

OtelSysStepDisplayName = "sys.step.display.name"
OtelSysStepOpcode = "sys.step.opcode"
OtelSysStepScheduleAt = "sys.step.time.schedule"
OtelSysStepStartAt = "sys.step.time.start"
OtelSysStepEndAt = "sys.step.time.end"
Expand All @@ -46,6 +48,15 @@ const (
OtelSysStepFirst = "sys.step.first"
OtelSysStepGroupID = "sys.step.group.id"

OtelSysStepSleepEndAt = "sys.step.sleep.end"

OtelSysStepInvokeExpires = "sys.step.invoke.expires"
OtelSysStepInvokeTargetFnID = "sys.step.invoke.fn.id"
OtelSysStepInvokeTriggeringEventID = "sys.step.invoke.event.outgoing.id"
OtelSysStepInvokeReturnedEventID = "sys.step.invoke.event.incoming.id"
OtelSysStepInvokeRunID = "sys.step.invoke.run.id"
OtelSysStepInvokeExpired = "sys.step.invoke.expired"

OtelSysStepRetry = "sys.step.retry"
OtelSysStepNextOpcode = "sys.step.next.opcode"
OtelSysStepNextTimestamp = "sys.step.next.time"
Expand Down
11 changes: 10 additions & 1 deletion pkg/execution/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@ package execution
import (
"context"
"encoding/json"
"github.com/inngest/inngest/pkg/execution/batch"
"time"

"github.com/inngest/inngest/pkg/execution/batch"

"github.com/google/uuid"
"github.com/inngest/inngest/pkg/event"
"github.com/inngest/inngest/pkg/execution/queue"
Expand Down Expand Up @@ -201,6 +202,10 @@ func (r *ResumeRequest) Error() string {
return r.withKey(StateErrorKey)
}

// HasError reports whether the resume request carries an error, i.e. whether
// Error returns a non-empty string.
func (r *ResumeRequest) HasError() bool {
	return len(r.Error()) > 0
}

// SetError sets `r.With` to `error` given a `name` and `message`.
func (r *ResumeRequest) SetError(name string, message string) {
r.With = map[string]any{
Expand All @@ -216,6 +221,10 @@ func (r *ResumeRequest) Data() string {
return r.withKey(StateDataKey)
}

// HasData reports whether the resume request carries data, i.e. whether Data
// returns a non-empty string.
func (r *ResumeRequest) HasData() bool {
	return len(r.Data()) > 0
}

// SetData sets `r.With` to `data` given any data to be set.
func (r *ResumeRequest) SetData(data any) {
r.With = map[string]any{
Expand Down
167 changes: 154 additions & 13 deletions pkg/execution/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -759,6 +759,7 @@ func (e *executor) Execute(ctx context.Context, id state.Identifier, item queue.
incoming := edge.Incoming
if edge.IncomingGeneratorStep != "" {
incoming = edge.IncomingGeneratorStep
span.SetAttributes(attribute.String(consts.OtelSysStepOpcode, enums.OpcodeStepRun.String()))
}
if resp, _ := s.ActionID(incoming); resp != nil {
// This has already successfully been executed.
Expand Down Expand Up @@ -787,9 +788,20 @@ func (e *executor) Execute(ctx context.Context, id state.Identifier, item queue.
span.SetName(spanName)

fnSpan.SetAttributes(attribute.Int64(consts.OtelSysFunctionStatusCode, enums.RunStatusRunning.ToCode()))

foundOp := op.Op
// The op changes based on the current state of the step, so we
// are required to normalize here.
switch foundOp {
case enums.OpcodeStep, enums.OpcodeStepError:
foundOp = enums.OpcodeStepRun
}

span.SetAttributes(
attribute.Int(consts.OtelSysStepStatusCode, resp.StatusCode),
attribute.Int(consts.OtelSysStepOutputSizeBytes, resp.OutputSize),
attribute.String(consts.OtelSysStepDisplayName, op.UserDefinedName()),
attribute.String(consts.OtelSysStepOpcode, foundOp.String()),
)

if byt, err := json.Marshal(resp.Output); err == nil {
Expand Down Expand Up @@ -1648,6 +1660,69 @@ func (e *executor) Resume(ctx context.Context, pause state.Pause, r execution.Re
}

if pause.Opcode != nil && *pause.Opcode == enums.OpcodeInvokeFunction.String() {
if pause.StepSpanID != nil && *pause.StepSpanID != "" {
if spanID, err := trace.SpanIDFromHex(*pause.StepSpanID); err == nil {
triggeringEventID := ""
if pause.TriggeringEventID != nil {
triggeringEventID = *pause.TriggeringEventID
}

returnedEventID := ""
if r.EventID != nil {
returnedEventID = r.EventID.String()
}

runID := ""
if r.RunID != nil {
runID = r.RunID.String()
}

targetFnID := ""
if pause.InvokeTargetFnID != nil {
targetFnID = *pause.InvokeTargetFnID
}

ts := time.Now()
if pause.TraceStartedAt != nil {
ts = (*pause.TraceStartedAt).Time()
}

var span *telemetry.Span
ctx, span = telemetry.NewSpan(ctx,
telemetry.WithScope(consts.OtelScopeStep),
telemetry.WithName("invoke"),
telemetry.WithTimestamp(ts),
telemetry.WithSpanID(spanID),
telemetry.WithSpanAttributes(
attribute.Bool(consts.OtelUserTraceFilterKey, true),
attribute.String(consts.OtelSysAccountID, pause.Identifier.AccountID.String()),
attribute.String(consts.OtelSysWorkspaceID, pause.Identifier.WorkspaceID.String()),
attribute.String(consts.OtelSysAppID, pause.Identifier.AppID.String()),
attribute.String(consts.OtelSysFunctionID, pause.Identifier.WorkflowID.String()),
// attribute.String(consts.OtelSysFunctionSlug, s.Function().GetSlug()),
attribute.Int(consts.OtelSysFunctionVersion, pause.Identifier.WorkflowVersion),
attribute.String(consts.OtelAttrSDKRunID, pause.Identifier.RunID.String()),
attribute.Int(consts.OtelSysStepAttempt, 0), // ?
attribute.Int(consts.OtelSysStepMaxAttempt, 1), // ?
attribute.String(consts.OtelSysStepGroupID, pause.GroupID),
attribute.String(consts.OtelSysStepOpcode, enums.OpcodeInvokeFunction.String()),
attribute.String(consts.OtelSysStepDisplayName, pause.StepName),

attribute.String(consts.OtelSysStepInvokeTargetFnID, targetFnID),
attribute.Int64(consts.OtelSysStepInvokeExpires, pause.Expires.Time().UnixMilli()),
attribute.String(consts.OtelSysStepInvokeTriggeringEventID, triggeringEventID),
attribute.String(consts.OtelSysStepInvokeReturnedEventID, returnedEventID),
attribute.String(consts.OtelSysStepInvokeRunID, runID),
attribute.Bool(consts.OtelSysStepInvokeExpired, r.EventID == nil),
),
)
if r.HasError() {
span.SetStatus(codes.Error, r.Error())
}
span.Send()
}
}

for _, e := range e.lifecycles {
go e.OnInvokeFunctionResumed(context.WithoutCancel(ctx), pause.Identifier, r, pause.GroupID)
}
Expand Down Expand Up @@ -1986,22 +2061,46 @@ func (e *executor) handleGeneratorStepPlanned(ctx context.Context, gen state.Gen
// handleGeneratorSleep handles the sleep opcode, ensuring that we enqueue the function to rerun
// at the correct time.
func (e *executor) handleGeneratorSleep(ctx context.Context, gen state.GeneratorOpcode, item queue.Item, edge queue.PayloadEdge) error {
span := trace.SpanFromContext(ctx)

dur, err := gen.SleepDuration()
if err != nil {
return err
}

executionSpan := trace.SpanFromContext(ctx)

nextEdge := inngest.Edge{
Outgoing: gen.ID, // Leaving sleep
Incoming: edge.Edge.Incoming, // To re-call the SDK
}

startedAt := time.Now()
endedAt := startedAt.Add(dur)

// Create another group for the next item which will run. We're enqueueing
// the function to run again after sleep, so need a new group.
groupID := uuid.New().String()
ctx = state.WithGroupID(ctx, groupID)
ctx, span := telemetry.NewSpan(ctx,
telemetry.WithScope(consts.OtelScopeStep),
telemetry.WithName("sleep"),
telemetry.WithTimestamp(startedAt),
telemetry.WithSpanAttributes(
attribute.Bool(consts.OtelUserTraceFilterKey, true),
attribute.String(consts.OtelSysAccountID, item.Identifier.AccountID.String()),
attribute.String(consts.OtelSysWorkspaceID, item.Identifier.WorkspaceID.String()),
attribute.String(consts.OtelSysAppID, item.Identifier.AppID.String()),
attribute.String(consts.OtelSysFunctionID, item.Identifier.WorkflowID.String()),
// attribute.String(consts.OtelSysFunctionSlug, s.Function().GetSlug()),
attribute.Int(consts.OtelSysFunctionVersion, item.Identifier.WorkflowVersion),
attribute.String(consts.OtelAttrSDKRunID, item.Identifier.RunID.String()),
attribute.Int(consts.OtelSysStepAttempt, 0), // ?
attribute.Int(consts.OtelSysStepMaxAttempt, 1), // ?
attribute.String(consts.OtelSysStepGroupID, groupID),
attribute.String(consts.OtelSysStepOpcode, enums.OpcodeSleep.String()),
attribute.String(consts.OtelSysStepDisplayName, gen.UserDefinedName()),
attribute.String(consts.OtelSysStepSleepEndAt, endedAt.Format(time.RFC3339Nano)),
),
)

until := time.Now().Add(dur)

Expand All @@ -2022,9 +2121,11 @@ func (e *executor) handleGeneratorSleep(ctx context.Context, gen state.Generator
}, until)
if err == redis_state.ErrQueueItemExists {
// Safely ignore this error.
span.Cancel(ctx)
return nil
}
span.SetAttributes(
span.Send()
executionSpan.SetAttributes(
attribute.String(consts.OtelSysStepNextOpcode, enums.OpcodeSleep.String()),
attribute.Int64(consts.OtelSysStepNextTimestamp, until.UnixMilli()),
)
Expand All @@ -2037,7 +2138,7 @@ func (e *executor) handleGeneratorSleep(ctx context.Context, gen state.Generator
}

func (e *executor) handleGeneratorInvokeFunction(ctx context.Context, gen state.GeneratorOpcode, item queue.Item, edge queue.PayloadEdge) error {
span := trace.SpanFromContext(ctx)
executionSpan := trace.SpanFromContext(ctx)
if e.handleSendingEvent == nil {
return fmt.Errorf("no handleSendingEvent function specified")
}
Expand All @@ -2063,8 +2164,45 @@ func (e *executor) handleGeneratorInvokeFunction(ctx context.Context, gen state.
uuid.NameSpaceOID,
[]byte(item.Identifier.RunID.String()+gen.ID),
)

opcode := gen.Op.String()
now := time.Now()

// Always create an invocation event.
evt := event.NewInvocationEvent(event.NewInvocationEventOpts{
Event: *opts.Payload,
FnID: opts.FunctionID,
CorrelationID: &correlationID,
})

ctx, span := telemetry.NewSpan(ctx,
telemetry.WithScope(consts.OtelScopeStep),
telemetry.WithName("invoke"),
telemetry.WithTimestamp(now),
telemetry.WithSpanAttributes(
attribute.Bool(consts.OtelUserTraceFilterKey, true),
attribute.String(consts.OtelSysAccountID, item.Identifier.AccountID.String()),
attribute.String(consts.OtelSysWorkspaceID, item.Identifier.WorkspaceID.String()),
attribute.String(consts.OtelSysAppID, item.Identifier.AppID.String()),
attribute.String(consts.OtelSysFunctionID, item.Identifier.WorkflowID.String()),
// attribute.String(consts.OtelSysFunctionSlug, s.Function().GetSlug()),
attribute.Int(consts.OtelSysFunctionVersion, item.Identifier.WorkflowVersion),
attribute.String(consts.OtelAttrSDKRunID, item.Identifier.RunID.String()),
attribute.Int(consts.OtelSysStepAttempt, 0), // ?
attribute.Int(consts.OtelSysStepMaxAttempt, 1), // ?
attribute.String(consts.OtelSysStepGroupID, item.GroupID),
attribute.String(consts.OtelSysStepOpcode, enums.OpcodeInvokeFunction.String()),
attribute.String(consts.OtelSysStepDisplayName, gen.UserDefinedName()),

attribute.String(consts.OtelSysStepInvokeTargetFnID, opts.FunctionID),
attribute.Int64(consts.OtelSysStepInvokeExpires, expires.UnixMilli()),
attribute.String(consts.OtelSysStepInvokeTriggeringEventID, evt.ID),
),
)
span.Send()

spanID := span.SpanContext().SpanID().String()
traceStartedAt := state.Time(now)

err = e.sm.SavePause(ctx, state.Pause{
ID: pauseID,
WorkspaceID: item.WorkspaceID,
Expand All @@ -2079,11 +2217,17 @@ func (e *executor) handleGeneratorInvokeFunction(ctx context.Context, gen state.
Expression: &strExpr,
DataKey: gen.ID,
InvokeCorrelationID: &correlationID,
StepSpanID: &spanID,
TriggeringEventID: &evt.ID,
TraceStartedAt: &traceStartedAt,
InvokeTargetFnID: &opts.FunctionID,
})
if err == state.ErrPauseAlreadyExists {
span.Cancel(ctx)
return nil
}
if err != nil {
span.Cancel(ctx)
return err
}

Expand All @@ -2104,27 +2248,24 @@ func (e *executor) handleGeneratorInvokeFunction(ctx context.Context, gen state.
},
}, expires)
if err == redis_state.ErrQueueItemExists {
span.Cancel(ctx)
return nil
}
span.SetAttributes(
executionSpan.SetAttributes(
attribute.String(consts.OtelSysStepNextOpcode, enums.OpcodeInvokeFunction.String()),
attribute.Int64(consts.OtelSysStepNextTimestamp, time.Now().UnixMilli()),
attribute.Int64(consts.OtelSysStepNextExpires, expires.UnixMilli()),
)

// Always create an invocation event.
evt := event.NewInvocationEvent(event.NewInvocationEventOpts{
Event: *opts.Payload,
FnID: opts.FunctionID,
CorrelationID: &correlationID,
})

err = e.handleSendingEvent(ctx, evt, item)
if err != nil {
span.Cancel(ctx)
// TODO Cancel pause/timeout?
return fmt.Errorf("error publishing internal invocation event: %w", err)
}

span.Send()

for _, e := range e.lifecycles {
go e.OnInvokeFunction(context.WithoutCancel(ctx), item.Identifier, item, gen, ulid.MustParse(evt.ID), correlationID)
}
Expand Down
6 changes: 6 additions & 0 deletions pkg/execution/state/driver_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,12 @@ func (g GeneratorOpcode) Output() (string, error) {
return "", nil
}

// IsError reports whether this op represents an error, for example a
// `StepError` being passed back from an SDK. It returns true exactly when
// the op's Error field is non-nil.
func (g GeneratorOpcode) IsError() bool {
return g.Error != nil
}

func (g GeneratorOpcode) WaitForEventOpts() (*WaitForEventOpts, error) {
if opts, ok := g.Opts.(*WaitForEventOpts); ok && opts != nil {
return opts, nil
Expand Down
8 changes: 8 additions & 0 deletions pkg/execution/state/pause.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,10 @@ type Pause struct {
ExpressionData map[string]any `json:"data"`
// InvokeCorrelationID is the correlation ID for the invoke pause.
InvokeCorrelationID *string `json:"icID,omitempty"`
// InvokeTargetFnID is the target function ID for the invoke pause.
// This is used to be able to accurately reconstruct the entire invocation
// span.
InvokeTargetFnID *string `json:"itFnID,omitempty"`
// OnTimeout indicates that this incoming edge should only be ran
// when the pause times out, if set to true.
OnTimeout bool `json:"onTimeout"`
Expand Down Expand Up @@ -182,6 +186,10 @@ type Pause struct {
// TriggeringEventID is the event that triggered the original run. This allows us
// to exclude the original event ID when considering triggers.
TriggeringEventID *string `json:"tID,omitempty"`
// StepSpanID is the span ID for the step that caused this pause.
StepSpanID *string `json:"ssID,omitempty"`
// TraceStartedAt is the time at which the span for this pause was started.
TraceStartedAt *Time `json:"tsa"`
}

func (p Pause) GetID() uuid.UUID {
Expand Down

0 comments on commit 6565f87

Please sign in to comment.