Move async gauge instrumentation as part of initialization (#1352)
The current otel gauge APIs only allow observable (async) instruments, not sync instrumentation.
Move the gauge registrations to queue initialization instead, so we don't attempt to register the callbacks multiple times, which also contributes to memory growth.

Ref:
https://pkg.go.dev/go.opentelemetry.io/otel/metric#hdr-Measurements

Also add an internal registry to make sure metrics are not registered multiple times.
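That registry isn't shown in the hunks below (the gauge helpers simply switch from recordGaugeMetric to registerAsyncGauge), but the idea can be sketched roughly as follows. This is a hypothetical illustration built on the public otel metric API; the internals and names used here are assumptions, not the actual pkg/telemetry implementation.

package telemetry

import (
	"context"
	"sync"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/metric"
)

var (
	registryMu sync.Mutex
	registered = map[string]struct{}{} // metric name -> already registered
)

// registerAsyncGauge registers an observable gauge at most once per metric name,
// so calling it again on a later instrumentation cycle does not stack another
// callback (the source of the memory growth mentioned above).
func registerAsyncGauge(ctx context.Context, meterName, metricName, description string, observe func(ctx context.Context) (int64, error)) error {
	registryMu.Lock()
	defer registryMu.Unlock()

	if _, ok := registered[metricName]; ok {
		return nil // already registered; skip duplicate registration
	}

	_, err := otel.Meter(meterName).Int64ObservableGauge(
		metricName,
		metric.WithDescription(description),
		metric.WithInt64Callback(func(ctx context.Context, o metric.Int64Observer) error {
			val, err := observe(ctx)
			if err != nil {
				return err
			}
			o.Observe(val)
			return nil
		}),
	)
	if err != nil {
		return err
	}

	registered[metricName] = struct{}{}
	return nil
}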

---------
Co-authored-by: Darwin D Wu <darwin67@users.noreply.github.com>
darwin67 committed May 13, 2024
1 parent 2f4146d commit bda3400
Showing 5 changed files with 153 additions and 109 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -17,3 +17,4 @@ Thumbs.db
/.vercel/

.idea
/tmp/
46 changes: 46 additions & 0 deletions pkg/execution/state/redis_state/queue.go
@@ -22,6 +22,7 @@ import (
osqueue "github.com/inngest/inngest/pkg/execution/queue"
"github.com/inngest/inngest/pkg/execution/state"
"github.com/inngest/inngest/pkg/logger"
"github.com/inngest/inngest/pkg/telemetry"
"github.com/oklog/ulid/v2"
"github.com/redis/rueidis"
"github.com/rs/zerolog"
@@ -215,6 +216,51 @@ func WithQueueItemIndexer(i QueueItemIndexer) QueueOpt {
}
}

// WithAsyncInstrumentation registers all the async instrumentation that needs to happen on
// each instrumentation cycle. These are mostly gauges for point-in-time metrics.
func WithAsyncInstrumentation(ctx context.Context) QueueOpt {
return func(q *queue) {
telemetry.GaugeWorkerQueueCapacity(ctx, q.capacity(), telemetry.GaugeOpt{PkgName: pkgName})

telemetry.GaugeGlobalQueuePartitionCount(ctx, telemetry.GaugeOpt{
PkgName: pkgName,
Observer: func(ctx context.Context) (int64, error) {
dur := time.Hour * 24 * 365
return q.partitionSize(ctx, q.kg.GlobalPartitionIndex(), getNow().Add(dur))
},
})

telemetry.GaugeGlobalQueuePartitionAvailable(ctx, telemetry.GaugeOpt{
PkgName: pkgName,
Observer: func(ctx context.Context) (int64, error) {
return q.partitionSize(ctx, q.kg.GlobalPartitionIndex(), getNow().Add(PartitionLookahead))
},
})

// Shard instrumentations
shards, err := q.getShards(ctx)
if err != nil {
q.logger.Error().Err(err).Msg("error retrieving shards")
}

telemetry.GaugeQueueShardCount(ctx, int64(len(shards)), telemetry.GaugeOpt{PkgName: pkgName})
for _, shard := range shards {
tags := map[string]any{"shard_name": shard.Name}

telemetry.GaugeQueueShardGuaranteedCapacityCount(ctx, int64(shard.GuaranteedCapacity), telemetry.GaugeOpt{PkgName: pkgName, Tags: tags})
telemetry.GaugeQueueShardLeaseCount(ctx, int64(len(shard.Leases)), telemetry.GaugeOpt{PkgName: pkgName, Tags: tags})
telemetry.GaugeQueueShardPartitionAvailableCount(ctx, telemetry.GaugeOpt{
PkgName: pkgName,
Tags: tags,
Observer: func(ctx context.Context) (int64, error) {
return q.partitionSize(ctx, q.kg.ShardPartitionIndex(shard.Name), getNow().Add(PartitionLookahead))
},
})
}
}
}

// WithDenyQueueNames specifies that the worker cannot select jobs from queue partitions
// within the given list of names. This means that the worker will never work on jobs
// in the specified queues.
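With the option above, the gauges are registered once when the queue is constructed rather than on every instrumentation cycle. A rough usage sketch follows; the NewQueue constructor call and the Redis address are assumptions for illustration, not part of this diff.

package main

import (
	"context"

	"github.com/inngest/inngest/pkg/execution/state/redis_state"
	"github.com/redis/rueidis"
)

func main() {
	ctx := context.Background()

	// Assumed client setup; the address is a placeholder.
	rc, err := rueidis.NewClient(rueidis.ClientOption{InitAddress: []string{"localhost:6379"}})
	if err != nil {
		panic(err)
	}

	// Hypothetical constructor call: WithAsyncInstrumentation is applied once at
	// initialization, so the observable gauge callbacks are registered a single time.
	q := redis_state.NewQueue(
		rc,
		redis_state.WithAsyncInstrumentation(ctx),
	)
	_ = q
}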
68 changes: 5 additions & 63 deletions pkg/execution/state/redis_state/queue_processor.go
@@ -134,7 +134,6 @@ func (q *queue) Run(ctx context.Context, f osqueue.RunFunc) error {
go q.worker(ctx, f)
}

go q.queueGauges(ctx)
go q.claimShards(ctx)
go q.claimSequentialLease(ctx)
go q.runScavenger(ctx)
@@ -198,9 +197,6 @@ func (q *queue) claimShards(ctx context.Context) {
return
}

// Report shard gauge metrics.
go q.shardGauges(ctx)

scanTick := time.NewTicker(ShardTickTime)
leaseTick := time.NewTicker(ShardLeaseTime / 2)

@@ -976,10 +972,11 @@ func (q *queue) process(ctx context.Context, p QueuePartition, qi QueueItem, s *
// Update the ewma
latencySem.Lock()
latencyAvg.Add(float64(latency))
telemetry.GaugeQueueItemLatencyEWMA(ctx, int64(latencyAvg.Value()/1e6), telemetry.GaugeOpt{
PkgName: pkgName,
Tags: map[string]any{"kind": qi.Data.Kind},
})
// TODO: Add this back when sync gauge instrumentation is available - https://github.com/open-telemetry/opentelemetry-go/pull/5304
// telemetry.GaugeQueueItemLatencyEWMA(ctx, int64(latencyAvg.Value()/1e6), telemetry.GaugeOpt{
// PkgName: pkgName,
// Tags: map[string]any{"kind": qi.Data.Kind},
// })
latencySem.Unlock()

// Set the metrics histogram and gauge, which reports the ewma value.
@@ -1132,61 +1129,6 @@ func (q *queue) isScavenger() bool {
return ulid.Time(l.Time()).After(getNow())
}

func (q *queue) queueGauges(ctx context.Context) {
// Report gauges to otel.
telemetry.GaugeWorkerQueueCapacity(ctx, q.capacity(), telemetry.GaugeOpt{PkgName: pkgName})

telemetry.GaugeGlobalQueuePartitionCount(ctx, telemetry.GaugeOpt{
PkgName: pkgName,
Observer: func(ctx context.Context) (int64, error) {
dur := time.Hour * 24 * 365
return q.partitionSize(ctx, q.kg.GlobalPartitionIndex(), getNow().Add(dur))
},
})

telemetry.GaugeGlobalQueuePartitionAvailable(ctx, telemetry.GaugeOpt{
PkgName: pkgName,
Observer: func(ctx context.Context) (int64, error) {
return q.partitionSize(ctx, q.kg.GlobalPartitionIndex(), getNow().Add(PartitionLookahead))
},
})
}

// shardGauges reports shard gauges via otel.
func (q *queue) shardGauges(ctx context.Context) {
tick := time.NewTicker(ShardTickTime)
for {
select {
case <-ctx.Done():
tick.Stop()
return
case <-tick.C:
// Reload shards.
shards, err := q.getShards(ctx)
if err != nil {
q.logger.Error().Err(err).Msg("error retrieving shards")
}

// Report gauges to otel.
telemetry.GaugeQueueShardCount(ctx, int64(len(shards)), telemetry.GaugeOpt{PkgName: pkgName})

for _, shard := range shards {
tags := map[string]any{"shard_name": shard.Name}

telemetry.GaugeQueueShardGuaranteedCapacityCount(ctx, int64(shard.GuaranteedCapacity), telemetry.GaugeOpt{PkgName: pkgName, Tags: tags})
telemetry.GaugeQueueShardLeaseCount(ctx, int64(len(shard.Leases)), telemetry.GaugeOpt{PkgName: pkgName, Tags: tags})
telemetry.GaugeQueueShardPartitionAvailableCount(ctx, telemetry.GaugeOpt{
PkgName: pkgName,
Tags: tags,
Observer: func(ctx context.Context) (int64, error) {
return q.partitionSize(ctx, q.kg.ShardPartitionIndex(shard.Name), getNow().Add(PartitionLookahead))
},
})
}
}
}
}

// trackingSemaphore returns a semaphore that tracks closely - but not atomically -
// the total number of items in the semaphore. This is best effort, and is loosely
// accurate to reduce further contention.
18 changes: 9 additions & 9 deletions pkg/telemetry/gauge.go
@@ -9,7 +9,7 @@ type GaugeOpt struct {
}

func GaugeQueueItemLatencyEWMA(ctx context.Context, value int64, opts GaugeOpt) {
recordGaugeMetric(ctx, gaugeOpt{
registerAsyncGauge(ctx, gaugeOpt{
Name: opts.PkgName,
MetricName: "queue_item_latency_ewma",
Description: "The moving average of the queue item latency",
@@ -21,7 +21,7 @@ func GaugeQueueItemLatencyEWMA(ctx context.Context, value int64, opts GaugeOpt)
}

func GaugeWorkerQueueCapacity(ctx context.Context, value int64, opts GaugeOpt) {
recordGaugeMetric(ctx, gaugeOpt{
registerAsyncGauge(ctx, gaugeOpt{
Name: opts.PkgName,
MetricName: "queue_capacity_total",
Description: "Capacity of current worker",
@@ -33,17 +33,17 @@ func GaugeWorkerQueueCapacity(ctx context.Context, value int64, opts GaugeOpt) {
}

func GaugeGlobalQueuePartitionCount(ctx context.Context, opts GaugeOpt) {
recordGaugeMetric(ctx, gaugeOpt{
registerAsyncGauge(ctx, gaugeOpt{
Name: opts.PkgName,
MetricName: "queue_global_partition_total_count",
MetricName: "queue_global_partition_count",
Description: "Number of total partitions in the global queue",
Attributes: opts.Tags,
Callback: opts.Observer,
})
}

func GaugeGlobalQueuePartitionAvailable(ctx context.Context, opts GaugeOpt) {
recordGaugeMetric(ctx, gaugeOpt{
registerAsyncGauge(ctx, gaugeOpt{
Name: opts.PkgName,
MetricName: "queue_global_partition_available_count",
Description: "Number of available partitions in the global queue",
@@ -53,7 +53,7 @@ func GaugeGlobalQueuePartitionAvailable(ctx context.Context, opts GaugeOpt) {
}

func GaugeQueueShardCount(ctx context.Context, value int64, opts GaugeOpt) {
recordGaugeMetric(ctx, gaugeOpt{
registerAsyncGauge(ctx, gaugeOpt{
Name: opts.PkgName,
MetricName: "queue_shards_count",
Description: "Number of shards in the queue",
@@ -65,7 +65,7 @@ func GaugeQueueShardCount(ctx context.Context, value int64, opts GaugeOpt) {
}

func GaugeQueueShardGuaranteedCapacityCount(ctx context.Context, value int64, opts GaugeOpt) {
recordGaugeMetric(ctx, gaugeOpt{
registerAsyncGauge(ctx, gaugeOpt{
Name: opts.PkgName,
MetricName: "queue_shards_guaranteed_capacity_count",
Description: "Shard guaranteed capacity",
@@ -77,7 +77,7 @@ func GaugeQueueShardGuaranteedCapacityCount(ctx context.Context, value int64, op
}

func GaugeQueueShardLeaseCount(ctx context.Context, value int64, opts GaugeOpt) {
recordGaugeMetric(ctx, gaugeOpt{
registerAsyncGauge(ctx, gaugeOpt{
Name: opts.PkgName,
MetricName: "queue_shards_lease_count",
Description: "Shard current lease count",
@@ -89,7 +89,7 @@ func GaugeQueueShardLeaseCount(ctx context.Context, value int64, opts GaugeOpt)
}

func GaugeQueueShardPartitionAvailableCount(ctx context.Context, opts GaugeOpt) {
recordGaugeMetric(ctx, gaugeOpt{
registerAsyncGauge(ctx, gaugeOpt{
Name: opts.PkgName,
MetricName: "queue_shard_partition_available_count",
Description: "The number of shard partitions available",