From 9b938851b8200d44f25e3d16aa1188b5dc8a4bb1 Mon Sep 17 00:00:00 2001
From: Darwin <5746693+darwin67@users.noreply.github.com>
Date: Wed, 8 May 2024 07:22:46 -0700
Subject: [PATCH] Add queue peek size control and additional metrics (#1327)

Add an attribute to control the queue peek size. Also add additional
metrics for instrumenting queue performance, and reorganize some of the
existing counters into `telemetry`.

`scope` and `meter` have been removed from the queue attributes. These
should be accessible globally without being embedded.

---------

Co-authored-by: Darwin D Wu
---
 pkg/execution/state/redis_state/queue.go      |  50 ++--
 .../state/redis_state/queue_processor.go      | 227 ++++++------
 pkg/telemetry/counter.go                      | 152 ++++++++++++
 pkg/telemetry/gauge.go                        |  99 ++++++++
 pkg/telemetry/histogram.go                    |  30 +++
 pkg/telemetry/utils.go                        |  18 +-
 6 files changed, 382 insertions(+), 194 deletions(-)
 create mode 100644 pkg/telemetry/counter.go
 create mode 100644 pkg/telemetry/gauge.go
 create mode 100644 pkg/telemetry/histogram.go

diff --git a/pkg/execution/state/redis_state/queue.go b/pkg/execution/state/redis_state/queue.go
index 2774fffa3..636152cf9 100644
--- a/pkg/execution/state/redis_state/queue.go
+++ b/pkg/execution/state/redis_state/queue.go
@@ -24,9 +24,6 @@ import (
 	"github.com/oklog/ulid/v2"
 	"github.com/redis/rueidis"
 	"github.com/rs/zerolog"
-	"github.com/uber-go/tally/v4"
-	"go.opentelemetry.io/otel"
-	"go.opentelemetry.io/otel/metric"
 	"gonum.org/v1/gonum/stat/sampleuv"
 	"lukechampine.com/frand"
 )
@@ -37,6 +34,7 @@ var (
 )
 
 const (
+	pkgName = "redis_state.state.execution.inngest"
 
 	// PartitionLeaseDuration dictates how long a worker holds the lease for
 	// a partition. This gives the worker a right to scan all queue items
@@ -62,7 +60,7 @@ const (
 	PartitionLookahead = time.Second
 
 	QueuePeekMax     int64 = 1000
-	QueuePeekDefault int64 = 200
+	QueuePeekDefault int64 = 250
 	QueueLeaseDuration     = 10 * time.Second
 	ConfigLeaseDuration    = 10 * time.Second
 	ConfigLeaseMax         = 20 * time.Second
@@ -154,71 +152,65 @@ func WithName(name string) func(q *queue) {
 	}
 }
 
-func WithQueueLifecycles(l ...QueueLifecycleListener) func(q *queue) {
+func WithQueueLifecycles(l ...QueueLifecycleListener) QueueOpt {
 	return func(q *queue) {
 		q.lifecycles = l
 	}
 }
 
-func WithMetricsScope(scope tally.Scope) func(q *queue) {
-	return func(q *queue) {
-		q.scope = scope
-	}
-}
-
-func WithPriorityFinder(pf PriorityFinder) func(q *queue) {
+func WithPriorityFinder(pf PriorityFinder) QueueOpt {
 	return func(q *queue) {
 		q.pf = pf
 	}
 }
 
-func WithShardFinder(sf ShardFinder) func(q *queue) {
+func WithShardFinder(sf ShardFinder) QueueOpt {
 	return func(q *queue) {
 		q.sf = sf
 	}
 }
 
-func WithQueueKeyGenerator(kg QueueKeyGenerator) func(q *queue) {
+func WithQueueKeyGenerator(kg QueueKeyGenerator) QueueOpt {
 	return func(q *queue) {
 		q.kg = kg
 	}
 }
 
-func WithIdempotencyTTL(t time.Duration) func(q *queue) {
+func WithIdempotencyTTL(t time.Duration) QueueOpt {
 	return func(q *queue) {
 		q.idempotencyTTL = t
 	}
 }
 
-func WithOtelMeter(m metric.Meter) func(q *queue) {
-	return func(q *queue) {
-		q.meter = m
-	}
-}
-
 // WithIdempotencyTTLFunc returns custom idempotecy durations given a QueueItem.
 // This allows customization of the idempotency TTL based off of specific jobs.
-func WithIdempotencyTTLFunc(f func(context.Context, QueueItem) time.Duration) func(q *queue) { +func WithIdempotencyTTLFunc(f func(context.Context, QueueItem) time.Duration) QueueOpt { return func(q *queue) { q.idempotencyTTLFunc = f } } -func WithNumWorkers(n int32) func(q *queue) { +func WithNumWorkers(n int32) QueueOpt { return func(q *queue) { q.numWorkers = n } } +func WithPeekSize(n int64) QueueOpt { + return func(q *queue) { + q.peek = n + } +} + // WithPollTick specifies the interval at which the queue will poll the backing store // for available partitions. -func WithPollTick(t time.Duration) func(q *queue) { +func WithPollTick(t time.Duration) QueueOpt { return func(q *queue) { q.pollTick = t } } -func WithQueueItemIndexer(i QueueItemIndexer) func(q *queue) { +func WithQueueItemIndexer(i QueueItemIndexer) QueueOpt { return func(q *queue) { q.itemIndexer = i } @@ -345,8 +337,6 @@ func NewQueue(r rueidis.Client, opts ...QueueOpt) *queue { pollTick: defaultPollTick, idempotencyTTL: defaultIdempotencyTTL, queueKindMapping: make(map[string]string), - scope: tally.NoopScope, - meter: otel.Meter("redis_state.queue"), logger: logger.From(context.Background()), partitionConcurrencyGen: func(ctx context.Context, p QueuePartition) (string, int) { return p.Queue(), 10_000 @@ -399,6 +389,8 @@ type queue struct { wg *sync.WaitGroup // numWorkers stores the number of workers available to concurrently process jobs. numWorkers int32 + // peek sets the number of items to check on queue peeks + peek int64 // workers is a buffered channel which allows scanners to send queue items // to workers to be processed workers chan processItem @@ -444,10 +436,6 @@ type queue struct { shardLeases []leasedShard shardLeaseLock *sync.Mutex - // metrics allows reporting of metrics - scope tally.Scope - meter metric.Meter - // backoffFunc is the backoff function to use when retrying operations. 
backoffFunc backoff.BackoffFunc } diff --git a/pkg/execution/state/redis_state/queue_processor.go b/pkg/execution/state/redis_state/queue_processor.go index e62aac510..3b1dd2101 100644 --- a/pkg/execution/state/redis_state/queue_processor.go +++ b/pkg/execution/state/redis_state/queue_processor.go @@ -13,10 +13,8 @@ import ( "github.com/VividCortex/ewma" osqueue "github.com/inngest/inngest/pkg/execution/queue" "github.com/inngest/inngest/pkg/execution/state" + "github.com/inngest/inngest/pkg/telemetry" "github.com/oklog/ulid/v2" - "github.com/uber-go/tally/v4" - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/metric" "golang.org/x/sync/errgroup" "golang.org/x/sync/semaphore" "gonum.org/v1/gonum/stat/sampleuv" @@ -24,25 +22,6 @@ import ( const ( minWorkersFree = 5 - - // Mtric consts - counterQueueItemsStarted = "queue_items_started_total" // Queue item started - counterQueueItemsErrored = "queue_items_errored_total" // Queue item errored - counterQueueItemsComplete = "queue_items_complete_total" // Queue item finished - counterQueueItemsEnqueued = "queue_items_enqueued_total" // Item enqueued - counterQueueItemsProcessLeaseExists = "queue_items_process_lease_exists_total" // Scanned an item with an exisitng lease - counterQueueItemsLeaseConflict = "queue_items_lease_conflict_total" // Attempt to lease an item with an existing lease - counterQueueItemsGone = "queue_items_gone_total" // Attempt to lease a dequeued item - counterSequentialLeaseClaims = "queue_sequential_lease_claims_total" // Sequential lease claimed by worker - counterPartitionProcessNoCapacity = "partition_process_no_capacity_total" // Processing items but there's no more capacity - counterPartitionProcessItems = "partition_process_items_total" // Leased a queue item within a partition to begin work - counterConcurrencyLimit = "concurrency_limit_processing_total" - counterPartitionProcess = "partition_process_total" - counterPartitionLeaseConflict = "partition_lease_conflict_total" - counterPartitionConcurrencyLimitReached = "partition_concurrency_limit_reached_total" - counterPartitionGone = "partition_gone_total" - gaugeQueueItemLatencyEWMA = "queue_item_latency_ewma" - histogramItemLatency = "queue_item_latency_duration" ) var ( @@ -59,20 +38,6 @@ var ( latencyAvg ewma.MovingAverage latencySem *sync.Mutex - latencyBuckets = tally.DurationBuckets{ - time.Millisecond, - 5 * time.Millisecond, - 10 * time.Millisecond, - 25 * time.Millisecond, - 50 * time.Millisecond, - 100 * time.Millisecond, - 250 * time.Millisecond, - 500 * time.Millisecond, - time.Second, - 10 * time.Second, - time.Minute, - } - startedAtKey = startedAtCtxKey{} sojournKey = sojournCtxKey{} latencyKey = latencyCtxKey{} @@ -130,9 +95,10 @@ func (q *queue) Enqueue(ctx context.Context, item osqueue.Item, at time.Time) er queueName = item.QueueName } - go q.scope.Tagged(map[string]string{ - "kind": item.Kind, - }).Counter(counterQueueItemsEnqueued).Inc(1) + telemetry.IncrQueueItemEnqueuedCounter(ctx, telemetry.CounterOpt{ + PkgName: pkgName, + Tags: map[string]any{"kind": item.Kind}, + }) qi := QueueItem{ ID: id, @@ -489,7 +455,7 @@ func (q *queue) claimSequentialLease(ctx context.Context) { if q.seqLeaseID == nil { // Only track this if we're creating a new lease, not if we're renewing // a lease. 
- go q.scope.Counter(counterSequentialLeaseClaims).Inc(1) + telemetry.IncrQueueSequentialLeaseClaimsCounter(ctx, telemetry.CounterOpt{PkgName: pkgName}) } q.seqLeaseID = leaseID q.seqLeaseLock.Unlock() @@ -552,7 +518,7 @@ func (q *queue) runScavenger(ctx context.Context) { if q.scavengerLeaseID == nil { // Only track this if we're creating a new lease, not if we're renewing // a lease. - go q.scope.Counter(counterSequentialLeaseClaims).Inc(1) + telemetry.IncrQueueSequentialLeaseClaimsCounter(ctx, telemetry.CounterOpt{PkgName: pkgName}) } q.scavengerLeaseID = leaseID q.scavengerLeaseLock.Unlock() @@ -631,7 +597,7 @@ func (q *queue) scan(ctx context.Context) error { if q.capacity() == 0 { // no longer any available workers for partition, so we can skip // work - q.int64counter(ctx, "inngest_queue_scan_no_capacity_total", 1) + telemetry.IncrQueueScanNoCapacityCounter(ctx, telemetry.CounterOpt{PkgName: pkgName}) return nil } if err := q.processPartition(ctx, &p, shard); err != nil { @@ -647,9 +613,9 @@ func (q *queue) scan(ctx context.Context) error { return err } - q.int64counter(ctx, "inngest_queue_partition_processed_total", 1, attribute.KeyValue{ - Key: attribute.Key("shard"), - Value: attribute.StringValue(metricShardName), + telemetry.IncrQueuePartitionProcessedCounter(ctx, telemetry.CounterOpt{ + PkgName: pkgName, + Tags: map[string]any{"shard": metricShardName}, }) return nil }) @@ -661,8 +627,6 @@ func (q *queue) scan(ctx context.Context) error { // NOTE: Shard is only passed as a reference if the partition was peeked from // a shard. It exists for accounting and tracking purposes only, eg. to report shard metrics. func (q *queue) processPartition(ctx context.Context, p *QueuePartition, shard *QueueShard) error { - q.scope.Counter(counterPartitionProcess).Inc(1) - // Attempt to lease items. This checks partition-level concurrency limits // // For oprimization, because this is the only thread that can be leasing @@ -683,18 +647,18 @@ func (q *queue) processPartition(ctx context.Context, p *QueuePartition, shard * // scanning of jobs altogether. go l.OnConcurrencyLimitReached(context.WithoutCancel(ctx), p.WorkflowID) } - q.scope.Counter(counterPartitionConcurrencyLimitReached).Inc(1) + telemetry.IncrQueuePartitionConcurrencyLimitCounter(ctx, telemetry.CounterOpt{PkgName: pkgName}) return q.PartitionRequeue(ctx, p, getNow().Truncate(time.Second).Add(PartitionConcurrencyLimitRequeueExtension), true) } if err == ErrPartitionAlreadyLeased { - q.scope.Counter(counterPartitionLeaseConflict).Inc(1) + telemetry.IncrQueuePartitionLeaseContentionCounter(ctx, telemetry.CounterOpt{PkgName: pkgName}) return nil } if err == ErrPartitionNotFound { // Another worker must have pocessed this partition between // this worker's peek and process. Increase partition // contention metric and continue. This is unsolvable. - q.scope.Counter(counterPartitionGone).Inc(1) + telemetry.IncrPartitionGoneCounter(ctx, telemetry.CounterOpt{PkgName: pkgName}) return nil } if err != nil { @@ -718,6 +682,7 @@ func (q *queue) processPartition(ctx context.Context, p *QueuePartition, shard * if err != nil { return err } + telemetry.IncrQueuePeekedCounter(ctx, int64(len(queue)), telemetry.CounterOpt{PkgName: pkgName}) var ( processErr error @@ -729,7 +694,7 @@ func (q *queue) processPartition(ctx context.Context, p *QueuePartition, shard * ) // Record the number of partitions we're leasing. 
- q.int64counter(ctx, "inngest_queue_partition_lease_total", 1) + telemetry.IncrQueuePartitionLeasedCounter(ctx, telemetry.CounterOpt{PkgName: pkgName}) // staticTime is used as the processing time for all items in the queue. // We process queue items sequentially, and time progresses linearly as each @@ -749,19 +714,19 @@ ProcessLoop: if q.capacity() == 0 { // no longer any available workers for partition, so we can skip // work for now. - q.int64counter(ctx, "inngest_queue_process_no_capacity_total", 1) + telemetry.IncrQueueProcessNoCapacityCounter(ctx, telemetry.CounterOpt{PkgName: pkgName}) break ProcessLoop } item := qi if item.IsLeased(getNow()) { - q.int64counter(ctx, "inngest_queue_partition_lease_contention_total", 1) + telemetry.IncrQueueItemLeaseContentionCounter(ctx, telemetry.CounterOpt{PkgName: pkgName}) continue } // Cbeck if there's capacity from our local workers atomically prior to leasing our tiems. if !q.sem.TryAcquire(1) { - q.int64counter(ctx, "inngest_queue_partition_process_no_capacity_total", 1) + telemetry.IncrQueuePartitionProcessNoCapacityCounter(ctx, telemetry.CounterOpt{PkgName: pkgName}) break } @@ -815,7 +780,7 @@ ProcessLoop: case ErrQueueItemThrottled: ctrRateLimit++ processErr = nil - q.int64counter(ctx, "inngest_queue_throttled_total", 1) + telemetry.IncrQueueThrottledCounter(ctx, telemetry.CounterOpt{PkgName: pkgName}) continue case ErrPartitionConcurrencyLimit, ErrAccountConcurrencyLimit: ctrConcurrency++ @@ -885,7 +850,7 @@ ProcessLoop: go l.OnConcurrencyLimitReached(context.WithoutCancel(ctx), p.WorkflowID) } // Requeue this partition as we hit concurrency limits. - q.int64counter(ctx, "inngest_queue_partition_concurrency_limit_total", 1) + telemetry.IncrQueuePartitionConcurrencyLimitCounter(ctx, telemetry.CounterOpt{PkgName: pkgName}) return q.PartitionRequeue(ctx, p, getNow().Truncate(time.Second).Add(PartitionConcurrencyLimitRequeueExtension), true) } @@ -915,10 +880,6 @@ func (q *queue) process(ctx context.Context, p QueuePartition, qi QueueItem, s * var err error leaseID := qi.LeaseID - scope := q.scope.Tagged(map[string]string{ - "kind": qi.Data.Kind, - }) - // Allow the main runner to block until this work is done q.wg.Add(1) defer q.wg.Done() @@ -1010,14 +971,19 @@ func (q *queue) process(ctx context.Context, p QueuePartition, qi QueueItem, s * // Update the ewma latencySem.Lock() latencyAvg.Add(float64(latency)) - scope.Gauge(gaugeQueueItemLatencyEWMA).Update(latencyAvg.Value() / 1e6) + telemetry.GaugeQueueItemLatencyEWMA(ctx, int64(latencyAvg.Value()/1e6), telemetry.GaugeOpt{ + PkgName: pkgName, + Tags: map[string]any{"kind": qi.Data.Kind}, + }) latencySem.Unlock() // Set the metrics historgram and gauge, which reports the ewma value. 
- scope.Histogram(histogramItemLatency, latencyBuckets).RecordDuration(latency) + telemetry.HistogramQueueItemLatency(ctx, latency.Milliseconds(), telemetry.HistogramOpt{ + PkgName: pkgName, + }) }() - go scope.Counter(counterQueueItemsStarted).Inc(1) + telemetry.IncrQueueItemStartedCounter(ctx, telemetry.CounterOpt{PkgName: pkgName}) runInfo := osqueue.RunInfo{ Latency: latency, @@ -1033,11 +999,11 @@ func (q *queue) process(ctx context.Context, p QueuePartition, qi QueueItem, s * err := f(jobCtx, runInfo, qi.Data) extendLeaseTick.Stop() if err != nil { - go scope.Counter(counterQueueItemsErrored).Inc(1) + telemetry.IncrQueueItemErroredCounter(ctx, telemetry.CounterOpt{PkgName: pkgName}) errCh <- err return } - go scope.Counter(counterQueueItemsComplete).Inc(1) + telemetry.IncrQueueItemCompletedCounter(ctx, telemetry.CounterOpt{PkgName: pkgName}) // Closing this channel prevents the goroutine which extends lease from leaking, // and dequeues the job @@ -1132,11 +1098,17 @@ func (q *queue) capacity() int64 { // peekSize returns the total number of available workers which can consume individual // queue items. func (q *queue) peekSize() int64 { - f := q.capacity() - if f > QueuePeekMax { - return QueuePeekMax + size := q.peek + if size == 0 { + size = QueuePeekMax + } + + cap := q.capacity() + if size > cap { + size = cap } - return f + + return size } func (q *queue) isSequential() bool { @@ -1157,39 +1129,22 @@ func (q *queue) isScavenger() bool { func (q *queue) queueGauges(ctx context.Context) { // Report gauges to otel. - _, _ = q.meter.Int64ObservableGauge( - "inngest_queue_capacity_total", - metric.WithDescription("Capacity of current worker"), - metric.WithInt64Callback(func(ctx context.Context, o metric.Int64Observer) error { - o.Observe(q.capacity()) - return nil - }), - ) - _, _ = q.meter.Int64ObservableGauge( - "inngest_queue_global_partition_total_count", - metric.WithDescription("Number of total partitions in the global queue"), - metric.WithInt64Callback(func(ctx context.Context, o metric.Int64Observer) error { - cnt, err := q.partitionSize(ctx, q.kg.GlobalPartitionIndex(), getNow().Add(time.Hour*24*365)) - if err != nil { - q.logger.Error().Err(err).Msg("error getting global partition total for gauge") - } - o.Observe(cnt) - return nil - }), - ) - // Report gauges to otel. - _, _ = q.meter.Int64ObservableGauge( - "inngest_queue_global_partition_available_count", - metric.WithDescription("Number of available partitions in the global queue"), - metric.WithInt64Callback(func(ctx context.Context, o metric.Int64Observer) error { - cnt, err := q.partitionSize(ctx, q.kg.GlobalPartitionIndex(), getNow().Add(PartitionLookahead)) - if err != nil { - q.logger.Error().Err(err).Msg("error getting global partition available for gauge") - } - o.Observe(cnt) - return nil - }), - ) + telemetry.GaugeWorkerQueueCapacity(ctx, q.capacity(), telemetry.GaugeOpt{PkgName: pkgName}) + + telemetry.GaugeGlobalQueuePartitionCount(ctx, telemetry.GaugeOpt{ + PkgName: pkgName, + Observer: func(ctx context.Context) (int64, error) { + dur := time.Hour * 24 * 365 + return q.partitionSize(ctx, q.kg.GlobalPartitionIndex(), getNow().Add(dur)) + }, + }) + + telemetry.GaugeGlobalQueuePartitionAvailable(ctx, telemetry.GaugeOpt{ + PkgName: pkgName, + Observer: func(ctx context.Context) (int64, error) { + return q.partitionSize(ctx, q.kg.GlobalPartitionIndex(), getNow().Add(PartitionLookahead)) + }, + }) } // shardGauges reports shard gauges via otel. 
@@ -1208,64 +1163,28 @@ func (q *queue) shardGauges(ctx context.Context) { case <-tick.C: // Reload shards. shards, err = q.getShards(ctx) + if err != nil { + q.logger.Error().Err(err).Msg("error retrieving shards") + } } } }() // Report gauges to otel. - _, _ = q.meter.Int64ObservableGauge( - "inngest_queue_shards_count", - metric.WithDescription("Number of shards in the queue"), - metric.WithInt64Callback(func(ctx context.Context, o metric.Int64Observer) error { - o.Observe(int64(len(shards))) - return err - }), - ) - _, _ = q.meter.Int64ObservableGauge( - "inngest_queue_shards_guaranteed_capacity_count", - metric.WithDescription("Shard guaranteed capacity, by shard name"), - metric.WithInt64Callback(func(ctx context.Context, o metric.Int64Observer) error { - for _, shard := range shards { - o.Observe(int64(shard.GuaranteedCapacity), metric.WithAttributes( - attribute.KeyValue{Key: attribute.Key("shard_name"), Value: attribute.StringValue(shard.Name)}, - )) - } - return err - }), - ) - _, _ = q.meter.Int64ObservableGauge( - "inngest_queue_shards_lease_count", - metric.WithDescription("Shard current lease count, by shard name"), - metric.WithInt64Callback(func(ctx context.Context, o metric.Int64Observer) error { - for _, shard := range shards { - o.Observe(int64(len(shard.Leases)), metric.WithAttributes( - attribute.KeyValue{Key: attribute.Key("shard_name"), Value: attribute.StringValue(shard.Name)}, - )) - } - return err - }), - ) - _, _ = q.meter.Int64ObservableGauge( - "inngest_queue_shard_partition_available_count", - metric.WithDescription("The number of avaialble partitions by shard"), - metric.WithInt64Callback(func(ctx context.Context, o metric.Int64Observer) error { - for _, shard := range shards { - cnt, err := q.partitionSize(ctx, q.kg.ShardPartitionIndex(shard.Name), getNow().Add(PartitionLookahead)) - if err != nil { - q.logger.Error().Err(err).Msg("error getting shard partition size for gauge") - } - o.Observe(cnt, metric.WithAttributes( - attribute.KeyValue{Key: attribute.Key("shard_name"), Value: attribute.StringValue(shard.Name)}, - )) - } - return nil - }), - ) -} + telemetry.GaugeQueueShardCount(ctx, int64(len(shards)), telemetry.GaugeOpt{PkgName: pkgName}) -func (q *queue) int64counter(ctx context.Context, name string, value int64, attrs ...attribute.KeyValue) { - if c, err := q.meter.Int64Counter(name); err == nil { - c.Add(ctx, value, metric.WithAttributes(attrs...)) + for _, shard := range shards { + tags := map[string]any{"shard_name": shard.Name} + + telemetry.GaugeQueueShardGuaranteedCapacityCount(ctx, int64(shard.GuaranteedCapacity), telemetry.GaugeOpt{PkgName: pkgName, Tags: tags}) + telemetry.GaugeQueueShardLeaseCount(ctx, int64(len(shard.Leases)), telemetry.GaugeOpt{PkgName: pkgName, Tags: tags}) + telemetry.GaugeQueueShardPartitionAvailableCount(ctx, telemetry.GaugeOpt{ + PkgName: pkgName, + Tags: tags, + Observer: func(ctx context.Context) (int64, error) { + return q.partitionSize(ctx, q.kg.ShardPartitionIndex(shard.Name), getNow().Add(PartitionLookahead)) + }, + }) } } diff --git a/pkg/telemetry/counter.go b/pkg/telemetry/counter.go new file mode 100644 index 000000000..afb8025a4 --- /dev/null +++ b/pkg/telemetry/counter.go @@ -0,0 +1,152 @@ +package telemetry + +import "context" + +type CounterOpt struct { + PkgName string + Tags map[string]any +} + +func IncrQueuePeekedCounter(ctx context.Context, incr int64, opts CounterOpt) { + recordCounterMetric(ctx, incr, counterOpt{ + Name: opts.PkgName, + MetricName: "queue_peeked_total", + Description: "The total 
number of queues peeked", + Attributes: opts.Tags, + }) +} + +func IncrQueuePartitionLeasedCounter(ctx context.Context, opts CounterOpt) { + recordCounterMetric(ctx, 1, counterOpt{ + Name: opts.PkgName, + MetricName: "queue_partition_lease_total", + Description: "The total number of queue partitions leased", + Attributes: opts.Tags, + }) +} + +func IncrQueueProcessNoCapacityCounter(ctx context.Context, opts CounterOpt) { + recordCounterMetric(ctx, 1, counterOpt{ + Name: opts.PkgName, + MetricName: "queue_process_no_capacity_total", + Description: "Total number of times the queue no longer has capacity to process items", + Attributes: opts.Tags, + }) +} + +func IncrQueuePartitionLeaseContentionCounter(ctx context.Context, opts CounterOpt) { + recordCounterMetric(ctx, 1, counterOpt{ + Name: opts.PkgName, + MetricName: "queue_partition_lease_contention_total", + Description: "The total number of times contention occurred for partition leasing", + Attributes: opts.Tags, + }) +} + +func IncrQueueItemLeaseContentionCounter(ctx context.Context, opts CounterOpt) { + recordCounterMetric(ctx, 1, counterOpt{ + Name: opts.PkgName, + MetricName: "queue_item_lease_contention_total", + Description: "The total number of times contention occurred for item leasing", + Attributes: opts.Tags, + }) +} + +func IncrQueuePartitionProcessNoCapacityCounter(ctx context.Context, opts CounterOpt) { + recordCounterMetric(ctx, 1, counterOpt{ + Name: opts.PkgName, + MetricName: "queue_partition_process_no_capacity_total", + Description: "The number of times the queue no longer has capacity to process partitions", + Attributes: opts.Tags, + }) +} + +func IncrQueueThrottledCounter(ctx context.Context, opts CounterOpt) { + recordCounterMetric(ctx, 1, counterOpt{ + Name: opts.PkgName, + MetricName: "queue_throttled_total", + Description: "The number of times the queue has been throttled", + Attributes: opts.Tags, + }) +} + +func IncrQueuePartitionConcurrencyLimitCounter(ctx context.Context, opts CounterOpt) { + recordCounterMetric(ctx, 1, counterOpt{ + Name: opts.PkgName, + MetricName: "queue_partition_concurrency_limit_total", + Description: "The total number of times the queue partition hits concurrency limits", + Attributes: opts.Tags, + }) +} + +func IncrQueueScanNoCapacityCounter(ctx context.Context, opts CounterOpt) { + recordCounterMetric(ctx, 1, counterOpt{ + Name: opts.PkgName, + MetricName: "queue_scan_no_capacity_total", + Description: "The total number of times the queue no longer have workers to scan", + Attributes: opts.Tags, + }) +} + +func IncrQueuePartitionProcessedCounter(ctx context.Context, opts CounterOpt) { + recordCounterMetric(ctx, 1, counterOpt{ + Name: opts.PkgName, + MetricName: "queue_partition_processed_total", + Description: "The total number of queue partitions processed", + Attributes: opts.Tags, + }) +} + +func IncrPartitionGoneCounter(ctx context.Context, opts CounterOpt) { + recordCounterMetric(ctx, 1, counterOpt{ + Name: opts.PkgName, + MetricName: "partition_gone_total", + Description: "The total number of times a worker didn't find a partition", + Attributes: opts.Tags, + }) +} + +func IncrQueueItemEnqueuedCounter(ctx context.Context, opts CounterOpt) { + recordCounterMetric(ctx, 1, counterOpt{ + Name: opts.PkgName, + MetricName: "queue_items_enqueued_total", + Description: "Total number of queue items enqueued", + Attributes: opts.Tags, + }) +} + +func IncrQueueItemStartedCounter(ctx context.Context, opts CounterOpt) { + recordCounterMetric(ctx, 1, counterOpt{ + Name: opts.PkgName, + 
MetricName: "queue_items_started_total", + Description: "Total number of queue items started", + Attributes: opts.Tags, + }) +} + +func IncrQueueItemErroredCounter(ctx context.Context, opts CounterOpt) { + recordCounterMetric(ctx, 1, counterOpt{ + Name: opts.PkgName, + MetricName: "queue_items_errored_total", + Description: "Total number of queue items errored", + Attributes: opts.Tags, + }) +} + +func IncrQueueItemCompletedCounter(ctx context.Context, opts CounterOpt) { + recordCounterMetric(ctx, 1, counterOpt{ + Name: opts.PkgName, + MetricName: "queue_items_completed_total", + Description: "Total number of queue items completed", + Attributes: opts.Tags, + }) +} + +func IncrQueueSequentialLeaseClaimsCounter(ctx context.Context, opts CounterOpt) { + recordCounterMetric(ctx, 1, counterOpt{ + Name: opts.PkgName, + MetricName: "queue_sequential_lease_claims_total", + Description: "Total number of sequential lease claimed by worker", + Attributes: opts.Tags, + }) +} diff --git a/pkg/telemetry/gauge.go b/pkg/telemetry/gauge.go new file mode 100644 index 000000000..9742f4a5f --- /dev/null +++ b/pkg/telemetry/gauge.go @@ -0,0 +1,99 @@ +package telemetry + +import "context" + +type GaugeOpt struct { + PkgName string + Tags map[string]any + Observer GaugeCallback +} + +func GaugeQueueItemLatencyEWMA(ctx context.Context, value int64, opts GaugeOpt) { + recordGaugeMetric(ctx, gaugeOpt{ + Name: opts.PkgName, + MetricName: "queue_item_latency_ewma", + Description: "The moving average of the queue item latency", + Attributes: opts.Tags, + Callback: func(ctx context.Context) (int64, error) { + return value, nil + }, + }) +} + +func GaugeWorkerQueueCapacity(ctx context.Context, value int64, opts GaugeOpt) { + recordGaugeMetric(ctx, gaugeOpt{ + Name: opts.PkgName, + MetricName: "queue_capacity_total", + Description: "Capacity of current worker", + Attributes: opts.Tags, + Callback: func(ctx context.Context) (int64, error) { + return value, nil + }, + }) +} + +func GaugeGlobalQueuePartitionCount(ctx context.Context, opts GaugeOpt) { + recordGaugeMetric(ctx, gaugeOpt{ + Name: opts.PkgName, + MetricName: "queue_global_partition_total_count", + Description: "Number of total partitions in the global queue", + Attributes: opts.Tags, + Callback: opts.Observer, + }) +} + +func GaugeGlobalQueuePartitionAvailable(ctx context.Context, opts GaugeOpt) { + recordGaugeMetric(ctx, gaugeOpt{ + Name: opts.PkgName, + MetricName: "queue_global_partition_available_count", + Description: "Number of available partitions in the global queue", + Attributes: opts.Tags, + Callback: opts.Observer, + }) +} + +func GaugeQueueShardCount(ctx context.Context, value int64, opts GaugeOpt) { + recordGaugeMetric(ctx, gaugeOpt{ + Name: opts.PkgName, + MetricName: "queue_shards_count", + Description: "Number of shards in the queue", + Attributes: opts.Tags, + Callback: func(ctx context.Context) (int64, error) { + return value, nil + }, + }) +} + +func GaugeQueueShardGuaranteedCapacityCount(ctx context.Context, value int64, opts GaugeOpt) { + recordGaugeMetric(ctx, gaugeOpt{ + Name: opts.PkgName, + MetricName: "queue_shards_guaranteed_capacity_count", + Description: "Shard guaranteed capacity", + Attributes: opts.Tags, + Callback: func(ctx context.Context) (int64, error) { + return value, nil + }, + }) +} + +func GaugeQueueShardLeaseCount(ctx context.Context, value int64, opts GaugeOpt) { + recordGaugeMetric(ctx, gaugeOpt{ + Name: opts.PkgName, + MetricName: "queue_shards_lease_count", + Description: "Shard current lease count", + Attributes: 
opts.Tags, + Callback: func(ctx context.Context) (int64, error) { + return value, nil + }, + }) +} + +func GaugeQueueShardPartitionAvailableCount(ctx context.Context, opts GaugeOpt) { + recordGaugeMetric(ctx, gaugeOpt{ + Name: opts.PkgName, + MetricName: "queue_shard_partition_available_count", + Description: "The number of shard partitions available", + Attributes: opts.Tags, + Callback: opts.Observer, + }) +} diff --git a/pkg/telemetry/histogram.go b/pkg/telemetry/histogram.go new file mode 100644 index 000000000..45aadc8ec --- /dev/null +++ b/pkg/telemetry/histogram.go @@ -0,0 +1,30 @@ +package telemetry + +import "context" + +var ( + // in milliseconds + // defaultBoundaries = []float64{10, 50, 100, 200, 500, 1000, 2000, 5000, 10000} + queueItemLatencyBoundaries = []float64{ + 5, 10, 50, 100, 200, 500, // < 1s + 1000, 2000, 5000, 30_000, // < 1m + 60_000, 300_000, // < 10m + 600_000, 1_800_000, // < 1h + } +) + +type HistogramOpt struct { + PkgName string + Tags map[string]any +} + +func HistogramQueueItemLatency(ctx context.Context, value int64, opts HistogramOpt) { + recordIntHistogramMetric(ctx, value, histogramOpt{ + Name: opts.PkgName, + MetricName: "queue_item_latency_duration", + Description: "Distribution of queue item latency", + Attributes: opts.Tags, + Unit: "ms", + Boundaries: queueItemLatencyBoundaries, + }) +} diff --git a/pkg/telemetry/utils.go b/pkg/telemetry/utils.go index 4b1eed922..d97c32cca 100644 --- a/pkg/telemetry/utils.go +++ b/pkg/telemetry/utils.go @@ -20,7 +20,7 @@ func env() string { return val } -type CounterOpt struct { +type counterOpt struct { Name string Description string Meter metric.Meter @@ -31,7 +31,7 @@ type CounterOpt struct { // RecordCounterMetric increments the counter by the provided value. // The meter used can either be passed in or is the global meter -func RecordCounterMetric(ctx context.Context, incr int64, opts CounterOpt) { +func recordCounterMetric(ctx context.Context, incr int64, opts counterOpt) { attrs := []attribute.KeyValue{} if opts.Attributes != nil { attrs = append(attrs, parseAttributes(opts.Attributes)...) @@ -50,14 +50,14 @@ func RecordCounterMetric(ctx context.Context, incr int64, opts CounterOpt) { metric.WithUnit(opts.Unit), ) if err != nil { - log.From(ctx).Error().Err(err).Msg(fmt.Sprintf("error for meter: %s", opts.MetricName)) + log.From(ctx).Error().Err(err).Str("metric", opts.MetricName).Msg("error recording counter metric") return } c.Add(ctx, incr, metric.WithAttributes(attrs...)) } -type GaugeOpt struct { +type gaugeOpt struct { Name string Description string MetricName string @@ -71,7 +71,7 @@ type GaugeCallback func(ctx context.Context) (int64, error) // RecordGaugeMetric records the gauge value via a callback. 
// The callback needs to be passed in so it doesn't get captured as a closure when instrumenting the value -func RecordGaugeMetric(ctx context.Context, opts GaugeOpt) { +func recordGaugeMetric(ctx context.Context, opts gaugeOpt) { // use the global one by default meter := otel.Meter(opts.Name) if opts.Meter != nil { @@ -100,12 +100,12 @@ func RecordGaugeMetric(ctx context.Context, opts GaugeOpt) { metric.WithUnit(opts.Unit), metric.WithInt64Callback(observe), ); err != nil { - log.From(ctx).Error().Err(err).Msg(fmt.Sprintf("error for meter: %s", opts.MetricName)) + log.From(ctx).Error().Err(err).Str("metric", opts.MetricName).Msg("error recording gauge metric") return } } -type HistogramOpt struct { +type histogramOpt struct { Name string Description string Meter metric.Meter @@ -117,7 +117,7 @@ type HistogramOpt struct { // RecordIntHistogramMetric records the observed value for distributions. // Bucket can be provided -func RecordIntHistogramMetric(ctx context.Context, value int64, opts HistogramOpt) { +func recordIntHistogramMetric(ctx context.Context, value int64, opts histogramOpt) { // use the global one by default meter := otel.Meter(opts.Name) if opts.Meter != nil { @@ -133,7 +133,7 @@ func RecordIntHistogramMetric(ctx context.Context, value int64, opts HistogramOp ) if err != nil { - log.From(ctx).Err(err).Msg(fmt.Sprintf("error for meter: %s", opts.MetricName)) + log.From(ctx).Err(err).Str("metric", opts.MetricName).Msg("error recording histogram metric") return }
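
Usage sketch (not part of the patch): the snippet below shows how a caller might wire up the new `WithPeekSize` option when constructing the queue. `NewQueue`, `WithNumWorkers`, and `WithPeekSize` come from this diff; the Redis client setup, the package import, and the concrete values are illustrative assumptions.

package main

import (
	"log"

	"github.com/inngest/inngest/pkg/execution/state/redis_state"
	"github.com/redis/rueidis"
)

func main() {
	// Illustrative Redis client setup; the address is an assumption.
	rc, err := rueidis.NewClient(rueidis.ClientOption{InitAddress: []string{"localhost:6379"}})
	if err != nil {
		log.Fatal(err)
	}

	// Cap each queue peek at 500 items. When WithPeekSize is not supplied,
	// peekSize() falls back to QueuePeekMax, and the result is always
	// clamped to the current worker capacity.
	q := redis_state.NewQueue(
		rc,
		redis_state.WithNumWorkers(100),
		redis_state.WithPeekSize(500),
	)
	_ = q
}

Because peekSize() clamps to q.capacity(), a configured peek size larger than the available worker pool has no additional effect.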
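
The helpers in the new pkg/telemetry/counter.go all share one shape: an exported Incr...Counter function takes a CounterOpt (package name plus optional tags) and delegates to the unexported recordCounterMetric, which resolves the otel meter and attributes. As a sketch of how a future counter could follow the same pattern — the function and metric name below are hypothetical and not part of this change:

package telemetry

import "context"

// IncrQueueItemRequeuedCounter is a hypothetical example mirroring the
// helpers in counter.go; "queue_items_requeued_total" is an illustrative
// metric name and is not introduced by this patch.
func IncrQueueItemRequeuedCounter(ctx context.Context, opts CounterOpt) {
	recordCounterMetric(ctx, 1, counterOpt{
		Name:        opts.PkgName,
		MetricName:  "queue_items_requeued_total",
		Description: "Total number of queue items requeued",
		Attributes:  opts.Tags,
	})
}

A call site would then look like the ones added to queue_processor.go, e.g. telemetry.IncrQueueItemRequeuedCounter(ctx, telemetry.CounterOpt{PkgName: pkgName, Tags: map[string]any{"kind": item.Kind}}).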
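
The gauge helpers come in two shapes, both funneling into recordGaugeMetric, which registers an observable gauge with an int64 callback: point-value helpers such as GaugeQueueShardCount wrap an already-computed value in a trivial callback, while observer helpers such as GaugeGlobalQueuePartitionAvailable take GaugeOpt.Observer so the value is computed at collection time rather than being captured once in a closure (the point made by the comment in utils.go). A minimal, self-contained sketch of the observer shape; countBacklog and the wiring are assumptions standing in for the real q.partitionSize call sites:

package main

import (
	"context"

	"github.com/inngest/inngest/pkg/telemetry"
)

// countBacklog is a hypothetical stand-in for a real data-source query such
// as q.partitionSize(...) in queue_processor.go.
func countBacklog(ctx context.Context) (int64, error) {
	return 42, nil
}

func main() {
	ctx := context.Background()

	// The Observer runs when the otel SDK collects the instrument, so the
	// reported value is always fresh instead of a value captured at
	// registration time.
	telemetry.GaugeGlobalQueuePartitionAvailable(ctx, telemetry.GaugeOpt{
		PkgName: "redis_state.state.execution.inngest",
		Observer: func(ctx context.Context) (int64, error) {
			return countBacklog(ctx)
		},
	})
}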