Skip to content

Commit

Permalink
INN-2874: Incorporate tracer into codebase for user traces (#1237)
Browse files Browse the repository at this point in the history
This change introduces a singleton tracer that can be used for user traces.
We want to keep the default global singleton for system traces in the future, so we don't cause any conflicts between the two.

The change also adds basic instrumentation from event ingestion to step runs for now.
The following are not included and will be added in follow-ups:
- invoke
- fan out
- tracestate forwarding to SDKs

---------
Co-authored-by: Darwin D Wu <darwin67@users.noreply.github.com>
  • Loading branch information
darwin67 committed Mar 20, 2024
1 parent 07d141d commit fc9dcf3
Show file tree
Hide file tree
Showing 14 changed files with 357 additions and 83 deletions.
18 changes: 11 additions & 7 deletions cmd/commands/dev.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,17 @@ func doDev(cmd *cobra.Command, args []string) {
retryInterval, _ := cmd.Flags().GetInt("retry-interval")
tick, _ := cmd.Flags().GetInt("tick")

if err := telemetry.NewUserTracer(ctx, telemetry.TracerOpts{
ServiceName: "devserver",
Type: telemetry.TracerTypeNoop, // No-op so it doesn't blow up stdout for the user
}); err != nil {
fmt.Println(err)
os.Exit(1)
}
defer func() {
_ = telemetry.CloseUserTracer(ctx)
}()

opts := devserver.StartOpts{
Config: *conf,
URLs: urls,
Expand All @@ -86,13 +97,6 @@ func doDev(cmd *cobra.Command, args []string) {
Tick: time.Duration(tick) * time.Millisecond,
}

close, err := telemetry.TracerSetup("devserver", telemetry.TracerTypeNoop)
if err != nil {
fmt.Println(err)
os.Exit(1)
}
defer close()

err = devserver.New(ctx, opts)
if err != nil {
fmt.Println(err)
Expand Down
4 changes: 3 additions & 1 deletion pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ func (a API) ReceiveEvent(w http.ResponseWriter, r *http.Request) {
StatusCode: http.StatusUnauthorized,
Error: "Event key is required",
})

return
}

Expand Down Expand Up @@ -158,7 +159,7 @@ func (a API) ReceiveEvent(w http.ResponseWriter, r *http.Request) {
evt.Timestamp = time.Now().UnixMilli()
}

id, err := a.handler(r.Context(), &evt)
id, err := a.handler(ctx, &evt)
if err != nil {
a.log.Error().Str("event", evt.Name).Err(err).Msg("error handling event")
return err
Expand Down Expand Up @@ -186,6 +187,7 @@ func (a API) ReceiveEvent(w http.ResponseWriter, r *http.Request) {
Status: 400,
Error: err,
})

return
}

Expand Down
23 changes: 23 additions & 0 deletions pkg/api/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,16 @@ import (

"github.com/go-chi/chi/v5"
"github.com/inngest/inngest/pkg/config"
"github.com/inngest/inngest/pkg/consts"
"github.com/inngest/inngest/pkg/event"
"github.com/inngest/inngest/pkg/logger"
"github.com/inngest/inngest/pkg/pubsub"
"github.com/inngest/inngest/pkg/service"
"github.com/inngest/inngest/pkg/telemetry"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/trace"
)

// NewService returns a new API service for ingesting events. Any additional
Expand Down Expand Up @@ -88,13 +94,21 @@ func (a *apiServer) handleEvent(ctx context.Context, e *event.Event) (string, er
l := logger.From(ctx).With().Str("caller", "api").Logger()
ctx = logger.With(ctx, l)

ctx, span := telemetry.UserTracer().Provider().
Tracer(consts.OtelScopeEventIngestion).
Start(ctx, "event-ingestion", trace.WithAttributes(
attribute.Bool(consts.OtelUserTraceFilterKey, true),
))
defer span.End()

l.Debug().Str("event", e.Name).Msg("handling event")

trackedEvent := event.NewOSSTrackedEvent(*e)

byt, err := json.Marshal(trackedEvent)
if err != nil {
l.Error().Err(err).Msg("error unmarshalling event as JSON")
span.SetStatus(codes.Error, "error parsing event as JSON")
return "", err
}

Expand All @@ -105,15 +119,24 @@ func (a *apiServer) handleEvent(ctx context.Context, e *event.Event) (string, er
Interface("event", trackedEvent.GetEvent()).
Msg("publishing event")

carrier := telemetry.NewTraceCarrier()
telemetry.UserTracer().Propagator().Inject(ctx, propagation.MapCarrier(carrier.Context))

err = a.publisher.Publish(
ctx,
a.config.EventStream.Service.TopicName(),
pubsub.Message{
Name: event.EventReceivedName,
Data: string(byt),
Timestamp: time.Now(),
Metadata: map[string]any{
consts.OtelPropagationKey: carrier,
},
},
)
if err != nil {
span.SetStatus(codes.Error, err.Error())
}

return trackedEvent.GetInternalID().String(), err
}
Expand Down
48 changes: 48 additions & 0 deletions pkg/consts/otel.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package consts

// System span attribute keys covering account, workspace, app, event, batch
// and idempotency data.
const (
	OtelSysAccountID      = "sys.account.id"
	OtelSysWorkspaceID    = "sys.workspace.id"
	OtelSysAppID          = "sys.app.id"
	OtelSysEventData      = "sys.event"
	OtelSysEventRequestID = "sys.event.request.id"
	OtelSysEventIDs       = "sys.event.ids"
	OtelSysBatchID        = "sys.batch.id"
	OtelSysIdempotencyKey = "sys.idempotency.key"
)

// System span attribute keys describing a function.
const (
	OtelSysFunctionID         = "sys.function.id"
	OtelSysFunctionSlug       = "sys.function.slug"
	OtelSysFunctionVersion    = "sys.function.version"
	OtelSysFunctionScheduleAt = "sys.function.time.schedule"
	OtelSysFunctionStartAt    = "sys.function.time.start"
	OtelSysFunctionEndAt      = "sys.function.time.end"
	OtelSysFunctionStatus     = "sys.function.status"
	OtelSysFunctionOutput     = "sys.function.output"
)

// System span attribute keys describing a step.
const (
	OtelSysStepScheduleAt      = "sys.step.time.schedule"
	OtelSysStepStartAt         = "sys.step.time.start"
	OtelSysStepEndAt           = "sys.step.time.end"
	OtelSysStepStatus          = "sys.step.status"
	OtelSysStepAttempt         = "sys.step.attempt"
	OtelSysStepOutput          = "sys.step.output"
	OtelSysStepOutputSizeBytes = "sys.step.output.size.bytes"
)

// SDK span attribute keys.
const (
	OtelAttrSDKServiceName = "sdk.app.id"
	OtelAttrSDKRunID       = "sdk.run.id"
)

// Otel scope names, passed as the tracer name when a span is created.
const (
	OtelScopeEventAPI       = "event.api.inngest"
	OtelScopeEventIngestion = "event.inngest"
	OtelScopeCron           = "cron.inngest"
	OtelScopeEnv            = "env.inngest"
	OtelScopeApp            = "app.env.inngest"
	OtelScopeFunction       = "function.app.env.inngest"
	OtelScopeStep           = "step.function.app.env.inngest"
)

// OtelUserTraceFilterKey is the otel collector filter key; it is set as a
// boolean span attribute on user trace spans.
const OtelUserTraceFilterKey = "inngest.user"

// OtelPropagationKey is the metadata key under which the trace carrier is
// stored when trace context is propagated between services.
const OtelPropagationKey = "sys.trace"
92 changes: 90 additions & 2 deletions pkg/execution/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/davecgh/go-spew/spew"
"github.com/fatih/structs"
"github.com/google/uuid"
"github.com/gosimple/slug"
"github.com/inngest/inngest/pkg/consts"
"github.com/inngest/inngest/pkg/enums"
"github.com/inngest/inngest/pkg/event"
Expand All @@ -30,9 +31,14 @@ import (
"github.com/inngest/inngest/pkg/inngest"
"github.com/inngest/inngest/pkg/inngest/log"
"github.com/inngest/inngest/pkg/logger"
"github.com/inngest/inngest/pkg/telemetry"
"github.com/oklog/ulid/v2"
"github.com/rs/zerolog"
"github.com/xhit/go-str2duration/v2"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/trace"
"golang.org/x/sync/errgroup"
"golang.org/x/sync/semaphore"
)
Expand Down Expand Up @@ -283,6 +289,19 @@ func (e *executor) Schedule(ctx context.Context, req execution.ScheduleRequest)
return nil, ErrFunctionDebounced
}

ctx, span := telemetry.UserTracer().Provider().
Tracer(consts.OtelScopeFunction).
Start(ctx, req.Function.GetSlug(), trace.WithAttributes(
attribute.Bool(consts.OtelUserTraceFilterKey, true),
attribute.String(consts.OtelSysAccountID, req.AccountID.String()),
attribute.String(consts.OtelSysWorkspaceID, req.WorkspaceID.String()),
attribute.String(consts.OtelSysAppID, req.AppID.String()),
attribute.String(consts.OtelSysFunctionID, req.Function.ID.String()),
attribute.String(consts.OtelSysFunctionSlug, req.Function.GetSlug()),
attribute.Int(consts.OtelSysFunctionVersion, req.Function.FunctionVersion),
))
defer span.End()

// Run IDs are created embedding the timestamp now, when the function is being scheduled.
// When running a cancellation, functions are cancelled at scheduling time based off of
// this run ID.
Expand All @@ -308,8 +327,22 @@ func (e *executor) Schedule(ctx context.Context, req execution.ScheduleRequest)
}

eventIDs := []ulid.ULID{}
spanEventIDs := []string{}
for _, e := range req.Events {
eventIDs = append(eventIDs, e.GetInternalID())
id := e.GetInternalID()
eventIDs = append(eventIDs, id)
spanEventIDs = append(spanEventIDs, id.String())
}

span.SetAttributes(
attribute.String(consts.OtelAttrSDKRunID, runID.String()),
attribute.StringSlice(consts.OtelSysEventIDs, spanEventIDs),
attribute.String(consts.OtelSysIdempotencyKey, key),
)
if len(req.Events) > 1 {
span.SetAttributes(
attribute.String(consts.OtelSysBatchID, req.BatchID.String()),
)
}

id := state.Identifier{
Expand Down Expand Up @@ -371,11 +404,20 @@ func (e *executor) Schedule(ctx context.Context, req execution.ScheduleRequest)
id.PriorityFactor = &factor
}

// Inject trace context into state metadata
stateMetadata := map[string]any{}
if req.Context != nil {
stateMetadata = req.Context
}
carrier := telemetry.NewTraceCarrier()
telemetry.UserTracer().Propagator().Inject(ctx, propagation.MapCarrier(carrier.Context))
stateMetadata[consts.OtelPropagationKey] = carrier

// Create a new function.
s, err := e.sm.New(ctx, state.Input{
Identifier: id,
EventBatchData: mapped,
Context: req.Context,
Context: stateMetadata,
})
if err == state.ErrIdentifierExists {
// This function was already created.
Expand Down Expand Up @@ -510,6 +552,29 @@ func (e *executor) Execute(ctx context.Context, id state.Identifier, item queue.
// reads in the future.
ctx = WithContextMetadata(ctx, md)

// Propagate trace context
if md.Context != nil {
if trace, ok := md.Context[consts.OtelPropagationKey]; ok {
carrier := telemetry.NewTraceCarrier()
if err := carrier.Unmarshal(trace); err == nil {
ctx = telemetry.UserTracer().Propagator().Extract(ctx, propagation.MapCarrier(carrier.Context))
}
}
}

ctx, span := telemetry.UserTracer().Provider().
Tracer(consts.OtelScopeStep).
Start(ctx, "running", trace.WithAttributes(
attribute.Bool(consts.OtelUserTraceFilterKey, true),
attribute.String(consts.OtelSysAccountID, id.AccountID.String()),
attribute.String(consts.OtelSysWorkspaceID, id.WorkspaceID.String()),
attribute.String(consts.OtelSysAppID, id.AppID.String()),
attribute.String(consts.OtelSysFunctionID, id.WorkflowID.String()),
attribute.Int(consts.OtelSysFunctionVersion, id.WorkflowVersion),
attribute.String(consts.OtelAttrSDKRunID, id.RunID.String()),
))
defer span.End()

if md.Status == enums.RunStatusCancelled {
return nil, state.ErrFunctionCancelled
}
Expand Down Expand Up @@ -615,9 +680,32 @@ func (e *executor) Execute(ctx context.Context, id state.Identifier, item queue.

resp, err := e.run(ctx, id, item, edge, s, stackIndex)
if resp == nil && err != nil {
span.SetStatus(codes.Error, err.Error())
if byt, err := json.Marshal(err.Error()); err == nil {
span.AddEvent(string(byt), trace.WithAttributes(
attribute.Bool(consts.OtelSysStepOutput, true),
))
}

return nil, err
}

if resp != nil {
spanName := strings.ToLower(slug.Make(resp.Step.Name))
span.SetName(spanName)

span.SetAttributes(
attribute.Int(consts.OtelSysStepStatus, resp.StatusCode),
attribute.Int(consts.OtelSysStepOutputSizeBytes, resp.OutputSize),
)

if byt, err := json.Marshal(resp.Output); err == nil {
span.AddEvent(string(byt), trace.WithAttributes(
attribute.Bool(consts.OtelSysStepOutput, true),
))
}
}

err = e.HandleResponse(ctx, id, item, edge, resp)
return resp, err
}
Expand Down
24 changes: 23 additions & 1 deletion pkg/execution/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/google/uuid"
"github.com/hashicorp/go-multierror"
"github.com/inngest/inngest/pkg/config"
"github.com/inngest/inngest/pkg/consts"
"github.com/inngest/inngest/pkg/cqrs"
"github.com/inngest/inngest/pkg/enums"
"github.com/inngest/inngest/pkg/event"
Expand All @@ -24,8 +25,12 @@ import (
"github.com/inngest/inngest/pkg/logger"
"github.com/inngest/inngest/pkg/pubsub"
"github.com/inngest/inngest/pkg/service"
"github.com/inngest/inngest/pkg/telemetry"
"github.com/oklog/ulid/v2"
"github.com/robfig/cron/v3"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/trace"
)

const (
Expand Down Expand Up @@ -233,7 +238,15 @@ func (s *svc) InitializeCrons(ctx context.Context) error {
}
cron := t.CronTrigger.Cron
_, err := s.cronmanager.AddFunc(cron, func() {
err := s.initialize(context.Background(), fn, event.NewOSSTrackedEvent(event.Event{
ctx, span := telemetry.UserTracer().Provider().
Tracer(consts.OtelScopeCron).
Start(ctx, "cron", trace.WithAttributes(
attribute.String(consts.OtelSysFunctionID, fn.ID.String()),
attribute.Int(consts.OtelSysFunctionVersion, fn.FunctionVersion),
))
defer span.End()

err := s.initialize(ctx, fn, event.NewOSSTrackedEvent(event.Event{
Data: map[string]any{
"cron": cron,
},
Expand Down Expand Up @@ -295,6 +308,15 @@ func (s *svc) handleMessage(ctx context.Context, m pubsub.Message) error {
return fmt.Errorf("unknown event type: %s", m.Name)
}

if m.Metadata != nil {
if trace, ok := m.Metadata[consts.OtelPropagationKey]; ok {
carrier := telemetry.NewTraceCarrier()
if err := carrier.Unmarshal(trace); err == nil {
ctx = telemetry.UserTracer().Propagator().Extract(ctx, propagation.MapCarrier(carrier.Context))
}
}
}

var tracked event.TrackedEvent
var err error

Expand Down
5 changes: 0 additions & 5 deletions pkg/execution/state/redis_state/consts.go

This file was deleted.

Loading

0 comments on commit fc9dcf3

Please sign in to comment.