Skip to content

Commit

Permalink
INN-2912: Minor tweaks for span annotations (#1280)
Browse files Browse the repository at this point in the history
* change to store trigger IDs as comma delimited string

* add function status on schedule

* update WithLinks implementation

* embed link trace data if available

* store event data in trigger

* add event data to fn span

* add max attempts for step

---------

Co-authored-by: Darwin D Wu <darwin67@users.noreply.github.com>
  • Loading branch information
darwin67 and darwin67 committed Apr 17, 2024
1 parent 39c54a7 commit 0b7326d
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 5 deletions.
4 changes: 3 additions & 1 deletion pkg/consts/otel.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ const (
OtelSysStepStatus = "sys.step.status"
OtelSysStepStatusCode = "sys.step.status.code"
OtelSysStepAttempt = "sys.step.attempt"
OtelSysStepMaxAttempt = "sys.step.attempt.max"
OtelSysStepOutput = "sys.step.output"
OtelSysStepOutputSizeBytes = "sys.step.output.size.bytes"
OtelSysStepFirst = "sys.step.first"
Expand Down Expand Up @@ -68,5 +69,6 @@ const (
// otel collector filter keys
OtelUserTraceFilterKey = "inngest.user"

OtelPropagationKey = "sys.trace"
OtelPropagationKey = "sys.trace"
OtelPropagationLinkKey = "sys.trace.link"
)
28 changes: 26 additions & 2 deletions pkg/execution/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,12 +307,28 @@ func (e *executor) Schedule(ctx context.Context, req execution.ScheduleRequest)
attribute.String(consts.OtelSysFunctionSlug, req.Function.GetSlug()),
attribute.Int(consts.OtelSysFunctionVersion, req.Function.FunctionVersion),
attribute.String(consts.OtelAttrSDKRunID, runID.String()),
attribute.String(consts.OtelSysFunctionStatus, enums.RunStatusScheduled.String()),
),
)
defer span.End()
if len(req.Events) > 1 {
span.SetAttributes(attribute.String(consts.OtelSysBatchID, req.BatchID.String()))
}
if req.Context != nil {
if val, ok := req.Context[consts.OtelPropagationLinkKey]; ok {
if link, ok := val.(string); ok {
span.SetAttributes(attribute.String(consts.OtelPropagationLinkKey, link))
}
}
}
for _, e := range req.Events {
evt := e.GetEvent()
if byt, err := json.Marshal(evt); err == nil {
span.AddEvent(string(byt), trace.WithAttributes(
attribute.Bool(consts.OtelSysEventData, true),
))
}
}

var key string
if req.IdempotencyKey != nil {
Expand Down Expand Up @@ -342,7 +358,7 @@ func (e *executor) Schedule(ctx context.Context, req execution.ScheduleRequest)
eventIDsStr = append(eventIDsStr, id.String())
}
spanID := telemetry.NewSpanID(ctx)
span.SetAttributes(attribute.StringSlice(consts.OtelSysEventIDs, eventIDsStr))
span.SetAttributes(attribute.String(consts.OtelSysEventIDs, strings.Join(eventIDsStr, ",")))

id := state.Identifier{
WorkflowID: req.Function.ID,
Expand Down Expand Up @@ -628,13 +644,20 @@ func (e *executor) Execute(ctx context.Context, id state.Identifier, item queue.
attribute.String(consts.OtelSysFunctionSlug, s.Function().GetSlug()),
attribute.Int(consts.OtelSysFunctionVersion, id.WorkflowVersion),
attribute.String(consts.OtelAttrSDKRunID, id.RunID.String()),
attribute.StringSlice(consts.OtelSysEventIDs, evtIDs),
attribute.String(consts.OtelSysEventIDs, strings.Join(evtIDs, ",")),
attribute.String(consts.OtelSysIdempotencyKey, id.IdempotencyKey()),
),
)
if len(id.EventIDs) > 1 {
fnSpan.SetAttributes(attribute.String(consts.OtelSysBatchID, id.BatchID.String()))
}
for _, e := range s.Events() {
if byt, err := json.Marshal(e); err == nil {
fnSpan.AddEvent(string(byt), trace.WithAttributes(
attribute.Bool(consts.OtelSysEventData, true),
))
}
}

ctx, span := telemetry.NewSpan(ctx,
telemetry.WithScope(consts.OtelScopeExecution),
Expand All @@ -649,6 +672,7 @@ func (e *executor) Execute(ctx context.Context, id state.Identifier, item queue.
attribute.Int(consts.OtelSysFunctionVersion, id.WorkflowVersion),
attribute.String(consts.OtelAttrSDKRunID, id.RunID.String()),
attribute.Int(consts.OtelSysStepAttempt, item.Attempt),
attribute.Int(consts.OtelSysStepMaxAttempt, item.GetMaxAttempts()),
attribute.String(consts.OtelSysStepGroupID, item.GroupID),
),
)
Expand Down
8 changes: 6 additions & 2 deletions pkg/telemetry/span.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,13 @@ func WithSpanKind(k trace.SpanKind) SpanOpt {
}
}

func WithLinks(l []tracesdk.Link) SpanOpt {
func WithLinks(links ...tracesdk.Link) SpanOpt {
return func(s *spanOpt) {
s.links = l
for _, l := range links {
if l.SpanContext.IsValid() {
s.links = append(s.links, l)
}
}
}
}

Expand Down

0 comments on commit 0b7326d

Please sign in to comment.