Skip to content

Commit

Permalink
Add queue peek size control and additional metrics (#1327)
Browse files Browse the repository at this point in the history
Add attribute to control the queue peek size.

Also add additional metrics for instrumenting queue performance, and reorganize some of the existing counters into `telemetry`.
`scope` and `meter` has been removed from the queue attributes. these should be accessible globally without being embedded.

---------
Co-authored-by: Darwin D Wu <darwin67@users.noreply.github.com>
  • Loading branch information
darwin67 committed May 8, 2024
1 parent 4be9b44 commit 9b93885
Show file tree
Hide file tree
Showing 6 changed files with 382 additions and 194 deletions.
50 changes: 19 additions & 31 deletions pkg/execution/state/redis_state/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,6 @@ import (
"github.com/oklog/ulid/v2"
"github.com/redis/rueidis"
"github.com/rs/zerolog"
"github.com/uber-go/tally/v4"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/metric"
"gonum.org/v1/gonum/stat/sampleuv"
"lukechampine.com/frand"
)
Expand All @@ -37,6 +34,7 @@ var (
)

const (
pkgName = "redis_state.state.execution.inngest"

// PartitionLeaseDuration dictates how long a worker holds the lease for
// a partition. This gives the worker a right to scan all queue items
Expand All @@ -62,7 +60,7 @@ const (
PartitionLookahead = time.Second

QueuePeekMax int64 = 1000
QueuePeekDefault int64 = 200
QueuePeekDefault int64 = 250
QueueLeaseDuration = 10 * time.Second
ConfigLeaseDuration = 10 * time.Second
ConfigLeaseMax = 20 * time.Second
Expand Down Expand Up @@ -154,71 +152,65 @@ func WithName(name string) func(q *queue) {
}
}

func WithQueueLifecycles(l ...QueueLifecycleListener) func(q *queue) {
func WithQueueLifecycles(l ...QueueLifecycleListener) QueueOpt {
return func(q *queue) {
q.lifecycles = l
}
}

func WithMetricsScope(scope tally.Scope) func(q *queue) {
return func(q *queue) {
q.scope = scope
}
}

func WithPriorityFinder(pf PriorityFinder) func(q *queue) {
func WithPriorityFinder(pf PriorityFinder) QueueOpt {
return func(q *queue) {
q.pf = pf
}
}

func WithShardFinder(sf ShardFinder) func(q *queue) {
func WithShardFinder(sf ShardFinder) QueueOpt {
return func(q *queue) {
q.sf = sf
}
}

func WithQueueKeyGenerator(kg QueueKeyGenerator) func(q *queue) {
func WithQueueKeyGenerator(kg QueueKeyGenerator) QueueOpt {
return func(q *queue) {
q.kg = kg
}
}

func WithIdempotencyTTL(t time.Duration) func(q *queue) {
func WithIdempotencyTTL(t time.Duration) QueueOpt {
return func(q *queue) {
q.idempotencyTTL = t
}
}

func WithOtelMeter(m metric.Meter) func(q *queue) {
return func(q *queue) {
q.meter = m
}
}

// WithIdempotencyTTLFunc returns custom idempotecy durations given a QueueItem.
// This allows customization of the idempotency TTL based off of specific jobs.
func WithIdempotencyTTLFunc(f func(context.Context, QueueItem) time.Duration) func(q *queue) {
func WithIdempotencyTTLFunc(f func(context.Context, QueueItem) time.Duration) QueueOpt {
return func(q *queue) {
q.idempotencyTTLFunc = f
}
}

func WithNumWorkers(n int32) func(q *queue) {
func WithNumWorkers(n int32) QueueOpt {
return func(q *queue) {
q.numWorkers = n
}
}

func WithPeekSize(n int64) QueueOpt {
return func(q *queue) {
q.peek = n
}
}

// WithPollTick specifies the interval at which the queue will poll the backing store
// for available partitions.
func WithPollTick(t time.Duration) func(q *queue) {
func WithPollTick(t time.Duration) QueueOpt {
return func(q *queue) {
q.pollTick = t
}
}

func WithQueueItemIndexer(i QueueItemIndexer) func(q *queue) {
func WithQueueItemIndexer(i QueueItemIndexer) QueueOpt {
return func(q *queue) {
q.itemIndexer = i
}
Expand Down Expand Up @@ -345,8 +337,6 @@ func NewQueue(r rueidis.Client, opts ...QueueOpt) *queue {
pollTick: defaultPollTick,
idempotencyTTL: defaultIdempotencyTTL,
queueKindMapping: make(map[string]string),
scope: tally.NoopScope,
meter: otel.Meter("redis_state.queue"),
logger: logger.From(context.Background()),
partitionConcurrencyGen: func(ctx context.Context, p QueuePartition) (string, int) {
return p.Queue(), 10_000
Expand Down Expand Up @@ -399,6 +389,8 @@ type queue struct {
wg *sync.WaitGroup
// numWorkers stores the number of workers available to concurrently process jobs.
numWorkers int32
// peek sets the number of items to check on queue peeks
peek int64
// workers is a buffered channel which allows scanners to send queue items
// to workers to be processed
workers chan processItem
Expand Down Expand Up @@ -444,10 +436,6 @@ type queue struct {
shardLeases []leasedShard
shardLeaseLock *sync.Mutex

// metrics allows reporting of metrics
scope tally.Scope
meter metric.Meter

// backoffFunc is the backoff function to use when retrying operations.
backoffFunc backoff.BackoffFunc
}
Expand Down

0 comments on commit 9b93885

Please sign in to comment.