Skip to content

Commit

Permalink
Add runID to step.invoke() when the run is scheduled (#1360)
Browse files Browse the repository at this point in the history
Annotate the `step.invoke` span with the target runID, so it can be used as reference when showing the trace.
This mainly helps with in-process runs where the invoke target might not have finished yet.

This PR also tweaks the wait virtual span to make sure it's reconstruction works as expected.

---------
Co-authored-by: Darwin D Wu <darwin67@users.noreply.github.com>
  • Loading branch information
darwin67 committed May 16, 2024
1 parent e156e49 commit 8546316
Show file tree
Hide file tree
Showing 6 changed files with 296 additions and 141 deletions.
8 changes: 5 additions & 3 deletions pkg/consts/otel.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,11 @@ const (

OtelSysStepSleepEndAt = "sys.step.sleep.end"

OtelSysStepWaitExpires = "sys.step.wait.expires"
OtelSysStepWaitExpired = "sys.step.wait.expired"
OtelSysStepWaitExpires = "sys.step.wait.expires"
OtelSysStepWaitExpired = "sys.step.wait.expired"
OtelSysStepWaitEventName = "sys.step.wait.event"
OtelSysStepWaitExpression = "sys.step.wait.expr"
OtelSysStepWaitMatchedEventID = "sys.step.wait.matched.event.id"

OtelSysStepInvokeExpires = "sys.step.invoke.expires"
OtelSysStepInvokeTargetFnID = "sys.step.invoke.fn.id"
Expand Down Expand Up @@ -83,7 +86,6 @@ const (
OtelScopeEvent = "event.inngest"
OtelScopeBatch = "batch.inngest"
OtelScopeDebounce = "debounce.inngest"
OtelScopeWait = "wait.inngest"
OtelScopeTrigger = "trigger.inngest"
OtelScopeCron = "cron.inngest"
OtelScopeEnv = "env.inngest"
Expand Down
70 changes: 58 additions & 12 deletions pkg/event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@ import (
"crypto/rand"
"encoding/json"
"errors"
"strings"
"time"

"github.com/google/uuid"
"github.com/inngest/inngest/pkg/consts"
"github.com/inngest/inngest/pkg/telemetry"
"github.com/oklog/ulid/v2"
)

Expand Down Expand Up @@ -135,8 +137,38 @@ func (e Event) IsFinishedEvent() bool {
// function. Note that this metadata is not present on all functions. For
// accessing an event's correlation ID, prefer using `Event.CorrelationID()`.
type InngestMetadata struct {
InvokeFnID string `json:"fn_id"`
InvokeCorrelationId string `json:"correlation_id,omitempty"`
SourceAppID string `json:"source_app_id"`
SourceFnID string `json:"source_fn_id"`
SourceFnVersion int `json:"source_fn_v"`
InvokeFnID string `json:"fn_id"`
InvokeCorrelationId string `json:"correlation_id,omitempty"`
InvokeTraceCarrier *telemetry.TraceCarrier `json:"tc,omitempty"`
InvokeExpiresAt int64 `json:"expire"`
InvokeGroupID string `json:"gid"`
InvokeDisplayName string `json:"name"`
}

func (m *InngestMetadata) Decode(data any) error {
byt, err := json.Marshal(data)
if err != nil {
return err
}
return json.Unmarshal(byt, m)
}

func (m *InngestMetadata) RunID() *ulid.ULID {
if len(m.InvokeCorrelationId) == 0 {
return nil
}
s := strings.Split(m.InvokeCorrelationId, ".")
if len(s) != 2 {
return nil
}

if id, err := ulid.Parse(s[0]); err == nil {
return &id
}
return nil
}

func (e Event) InngestMetadata() *InngestMetadata {
Expand Down Expand Up @@ -203,9 +235,16 @@ func (o ossTrackedEvent) GetWorkspaceID() uuid.UUID {
}

type NewInvocationEventOpts struct {
Event Event
FnID string
CorrelationID *string
SourceAppID string
SourceFnID string
SourceFnVersion int
Event Event
FnID string
CorrelationID *string
TraceCarrier *telemetry.TraceCarrier
ExpiresAt int64
GroupID string
DisplayName string
}

func NewInvocationEvent(opts NewInvocationEventOpts) Event {
Expand All @@ -222,14 +261,21 @@ func NewInvocationEvent(opts NewInvocationEventOpts) Event {
}
evt.Name = InvokeFnName

correlationID := ""
if opts.CorrelationID != nil {
correlationID = *opts.CorrelationID
}

evt.Data[consts.InngestEventDataPrefix] = InngestMetadata{
InvokeFnID: opts.FnID,
InvokeCorrelationId: func() string {
if opts.CorrelationID != nil {
return *opts.CorrelationID
}
return ""
}(),
InvokeFnID: opts.FnID,
InvokeCorrelationId: correlationID,
InvokeTraceCarrier: opts.TraceCarrier,
InvokeExpiresAt: opts.ExpiresAt,
InvokeGroupID: opts.GroupID,
InvokeDisplayName: opts.DisplayName,
SourceAppID: opts.SourceAppID,
SourceFnID: opts.SourceFnID,
SourceFnVersion: opts.SourceFnVersion,
}

return evt
Expand Down
Loading

0 comments on commit 8546316

Please sign in to comment.