From 400841fa4de8558489daf01a8f8a29223fd4d0d8 Mon Sep 17 00:00:00 2001 From: Nathan Yergler Date: Wed, 13 May 2026 16:58:55 -0700 Subject: [PATCH 1/7] Per-job AllowPromotion opts out of ZADD GT guard Enqueue and PromoteJob have both used ZADD XX GT since #3 to preserve the deferral guarantee for jobs enqueued with deterministic IDs: once a schedule sits at time T in the future, a duplicate enqueue at T' < T must be a no-op, and PromoteJob must not demote a job whose score has been bumped to now + InvisibleSec by Dequeue. That is the right semantic for dedup-style jobs that may race their own re-enqueue. It is the wrong semantic for two cases that have grown up around the queue since: 1. Worker retry rescheduling. When a handler returns an error the retry middleware computes a backoff delay and calls Enqueue with score = now + delay. With GT, that score is rejected because the Dequeue invisibility mark (now + InvisibleSec, typically 60s) is greater. The configured Backoff is effectively dead for any value less than InvisibleSec; gated handlers re-run only on the InvisibleSec cadence regardless of how short the backoff is. 2. Subqueue PromoteOnAck. The subqueue middleware advances the next gated job after the prior handler Acks by calling PromoteJob on that job's ID. With GT, the score (still sitting at the InvisibleSec mark from its last dequeue/gated cycle) is also rejected and the next job continues to wait out its full invisibility window. Add a per-job AllowPromotion flag. Default false preserves today's GT semantics so dedup-deferral jobs are unaffected; setting true causes Enqueue to use plain ZADD XX so backoff can lower the score, and causes PromoteJob to use ZADD XX (without GT) so the next gated subqueue entry can be advanced. The flag rides in the job's msgpack storage so it survives the worker retry round-trip without callers having to track it across Enqueue/PromoteJob boundaries. The Enqueue Lua script splits the per-job arg list into two ZADD calls (one with gt, one without) so a mixed BulkEnqueue stays atomic. PromoteJob does an HGET to read the flag before issuing the ZADD; this adds one round-trip per promotion but keeps the API stable. --- job.go | 19 ++++++ redis_queue.go | 88 ++++++++++++++++++++------- redis_queue_test.go | 142 ++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 227 insertions(+), 22 deletions(-) diff --git a/job.go b/job.go index 329a293..7a075bb 100644 --- a/job.go +++ b/job.go @@ -30,6 +30,25 @@ type Job struct { Retries int64 `msgpack:"retries"` // If the job previously fails, LastError will be populated with error string. LastError string `msgpack:"last_error"` + + // AllowPromotion controls whether Enqueue and PromoteJob may lower this + // job's score. + // + // When false (default), both operations use ZADD XX GT semantics: once a + // job is scheduled at time T, a subsequent Enqueue with score T' < T is + // a no-op, and PromoteJob cannot reduce the score. This preserves the + // "deferral" guarantee relied on by deterministic-ID jobs that may be + // enqueued multiple times concurrently (a dedup pattern) and prevents + // PromoteJob from demoting a job whose score has been bumped to + // now + InvisibleSec by Dequeue. + // + // When true, both operations use ZADD XX (no GT). Backoff-based + // rescheduling from the worker retry path may then lower the score so + // the configured Backoff actually takes effect, and PromoteJob may + // advance a scheduled-but-pending job to now. The caller is responsible + // for ensuring this is safe — typically that the job has a unique ID + // and is not relied on for dedup-deferral. + AllowPromotion bool `msgpack:"allow_promotion,omitempty" json:",omitempty"` } // InvalidJobPayloadError wraps json or msgpack decoding error. diff --git a/redis_queue.go b/redis_queue.go index 0c58025..362dc3c 100644 --- a/redis_queue.go +++ b/redis_queue.go @@ -58,22 +58,41 @@ func NewRedisQueue(client redis.UniversalClient) RedisQueue { local queue_id = ARGV[2] local queue_key = table.concat({ns, "queue", queue_id}, ":") - local zadd_args = {} - - for i = 3,table.getn(ARGV),3 do + -- Per-job AllowPromotion (passed alongside each job) selects between + -- two ZADD variants. Guarded entries use ZADD ... gt so a duplicate + -- enqueue cannot demote an already-deferred deterministic-ID job; + -- promoted entries use plain ZADD so worker retry backoff can lower + -- the score below the InvisibleSec mark set by Dequeue. + local guarded_args = {} + local promoted_args = {} + + for i = 3,table.getn(ARGV),4 do local at = tonumber(ARGV[i]) - local job_id = ARGV[i+1] - local jobm = ARGV[i+2] + local allow_promotion = ARGV[i+1] + local job_id = ARGV[i+2] + local jobm = ARGV[i+3] local job_key = table.concat({ns, "job", job_id}, ":") -- update job fields redis.call("hset", job_key, "msgpack", jobm) - -- enqueue - table.insert(zadd_args, at) - table.insert(zadd_args, job_key) + if allow_promotion == "1" then + table.insert(promoted_args, at) + table.insert(promoted_args, job_key) + else + table.insert(guarded_args, at) + table.insert(guarded_args, job_key) + end + end + + local added = 0 + if table.getn(guarded_args) > 0 then + added = added + tonumber(redis.call("zadd", queue_key, "gt", unpack(guarded_args))) end - return redis.call("zadd", queue_key, "gt", unpack(zadd_args)) + if table.getn(promoted_args) > 0 then + added = added + tonumber(redis.call("zadd", queue_key, unpack(promoted_args))) + end + return added `) dequeueScript := redis.NewScript(` @@ -209,7 +228,7 @@ func (q *redisQueue) bulkEnqueueSmallBatch(jobs []*Job, opt *EnqueueOptions) err if len(jobs) == 0 { return nil } - args := make([]interface{}, 2+3*len(jobs)) + args := make([]interface{}, 2+4*len(jobs)) args[0] = opt.Namespace args[1] = opt.QueueID for i, job := range jobs { @@ -217,9 +236,14 @@ func (q *redisQueue) bulkEnqueueSmallBatch(jobs []*Job, opt *EnqueueOptions) err if err != nil { return err } - args[2+3*i] = job.EnqueuedAt.Unix() - args[2+3*i+1] = job.ID - args[2+3*i+2] = jobm + args[2+4*i] = job.EnqueuedAt.Unix() + if job.AllowPromotion { + args[2+4*i+1] = "1" + } else { + args[2+4*i+1] = "0" + } + args[2+4*i+2] = job.ID + args[2+4*i+3] = jobm } return q.enqueueScript.Run(context.Background(), q.client, []string{opt.Namespace}, args...).Err() } @@ -360,20 +384,40 @@ func (q *redisQueue) PromoteJob(jobID string, opt *PromoteOptions) error { queueKey := opt.Namespace + ":queue:" + opt.QueueID jobKey := opt.Namespace + ":job:" + jobID - // ZADD with both XX and GT flags: - // - XX: Only update existing members (don't resurrect completed jobs) - // - GT: Only update if new score > current score (don't demote processing jobs) + // Look up the job's AllowPromotion flag so PromoteJob honors the same + // per-job semantic as Enqueue. With AllowPromotion=false (default), + // PromoteJob keeps the GT guard and is effectively a no-op for any + // job whose score sits in the future (either deferred via dedup or + // in-flight via Dequeue's InvisibleSec mark). With AllowPromotion=true, + // the caller has asserted that the job's calling pattern is safe to + // demote — the typical use case is a subqueue handler middleware that + // promotes the next gated job after the prior handler Acks. // - // Safety guarantees: - // 1. If job was completed and removed: XX prevents re-adding it - // 2. If job is being processed (score = now + invisibleSec): GT prevents demotion - // 3. If job is pending (score <= now): Both flags allow promotion + // If the job is no longer stored (already Ack'd or never enqueued), + // the XX flag below would prevent (re-)adding it anyway, so a missing + // hash is treated as a no-op rather than an error. + jobm, err := q.client.HGet(context.Background(), jobKey, "msgpack").Result() + if err != nil { + if errors.Is(err, redis.Nil) { + return nil + } + return err + } + var job Job + if err := unmarshal(strings.NewReader(jobm), &job); err != nil { + return err + } + + // XX always: do not resurrect a job whose hash exists but was removed + // from the queue ZSET (e.g. after an Ack race) — and never add a + // member that doesn't already exist. GT only when the job did not + // opt into promotion. return q.client.ZAddArgs( context.Background(), queueKey, redis.ZAddArgs{ - XX: true, // Only update existing - GT: true, // Only if new score is greater + XX: true, + GT: !job.AllowPromotion, Members: []redis.Z{{ Score: float64(time.Now().Unix()), Member: jobKey, diff --git a/redis_queue_test.go b/redis_queue_test.go index eb5d0ad..f7433c1 100644 --- a/redis_queue_test.go +++ b/redis_queue_test.go @@ -409,6 +409,148 @@ func TestRedisQueuePromoteJob(t *testing.T) { require.Equal(t, float64(0), exists) } +func TestRedisQueueEnqueueGuardDoesNotDemote(t *testing.T) { + // AllowPromotion defaults to false. A second Enqueue of the same job + // with an earlier score must be a no-op so deterministic-ID dedup + // jobs cannot have their deferred run time clobbered. + client := redistest.NewClient() + defer client.Close() + require.NoError(t, redistest.Reset(client)) + q := NewRedisQueue(client) + + opts := &EnqueueOptions{Namespace: "{ns1}", QueueID: "q1"} + + job := NewJob() + deferredAt := time.Now().Add(time.Minute) + job.EnqueuedAt = deferredAt + require.NoError(t, q.Enqueue(job, opts)) + + queueKey := "{ns1}:queue:q1" + jobKey := fmt.Sprintf("{ns1}:job:%s", job.ID) + scoreAfterFirst, err := client.ZScore(context.Background(), queueKey, jobKey).Result() + require.NoError(t, err) + require.EqualValues(t, deferredAt.Unix(), scoreAfterFirst) + + // Re-enqueue the same ID with an earlier score (the dedup pattern). + job.EnqueuedAt = time.Now() + require.NoError(t, q.Enqueue(job, opts)) + + scoreAfterSecond, err := client.ZScore(context.Background(), queueKey, jobKey).Result() + require.NoError(t, err) + require.Equal(t, scoreAfterFirst, scoreAfterSecond, "GT must reject the earlier-score re-enqueue when AllowPromotion is false") +} + +func TestRedisQueueEnqueueAllowPromotionDemotes(t *testing.T) { + // AllowPromotion=true opts the job out of the GT guard so the worker + // retry path can lower the score from the Dequeue InvisibleSec mark + // back down to now + backoff. + client := redistest.NewClient() + defer client.Close() + require.NoError(t, redistest.Reset(client)) + q := NewRedisQueue(client) + + opts := &EnqueueOptions{Namespace: "{ns1}", QueueID: "q1"} + + job := NewJob() + job.AllowPromotion = true + deferredAt := time.Now().Add(time.Minute) + job.EnqueuedAt = deferredAt + require.NoError(t, q.Enqueue(job, opts)) + + queueKey := "{ns1}:queue:q1" + jobKey := fmt.Sprintf("{ns1}:job:%s", job.ID) + + // Re-enqueue the same ID with an earlier score. + earlierAt := time.Now() + job.EnqueuedAt = earlierAt + require.NoError(t, q.Enqueue(job, opts)) + + scoreAfter, err := client.ZScore(context.Background(), queueKey, jobKey).Result() + require.NoError(t, err) + require.EqualValues(t, earlierAt.Unix(), scoreAfter, "plain ZADD must lower the score when AllowPromotion is true") +} + +func TestRedisQueueBulkEnqueueMixedAllowPromotion(t *testing.T) { + // A single BulkEnqueue containing both guarded (AllowPromotion=false) + // and promotable (AllowPromotion=true) jobs must apply the right + // semantic to each, atomically. + client := redistest.NewClient() + defer client.Close() + require.NoError(t, redistest.Reset(client)) + q := NewRedisQueue(client) + + opts := &EnqueueOptions{Namespace: "{ns1}", QueueID: "q1"} + + guarded := NewJob() + deferredAt := time.Now().Add(time.Minute) + guarded.EnqueuedAt = deferredAt + + promotable := NewJob() + promotable.AllowPromotion = true + promotable.EnqueuedAt = deferredAt + + require.NoError(t, q.BulkEnqueue([]*Job{guarded, promotable}, opts)) + + // Re-enqueue both with an earlier score. + earlierAt := time.Now() + guarded.EnqueuedAt = earlierAt + promotable.EnqueuedAt = earlierAt + require.NoError(t, q.BulkEnqueue([]*Job{guarded, promotable}, opts)) + + queueKey := "{ns1}:queue:q1" + + guardedScore, err := client.ZScore(context.Background(), queueKey, fmt.Sprintf("{ns1}:job:%s", guarded.ID)).Result() + require.NoError(t, err) + require.EqualValues(t, deferredAt.Unix(), guardedScore, "guarded job must retain its later score") + + promotableScore, err := client.ZScore(context.Background(), queueKey, fmt.Sprintf("{ns1}:job:%s", promotable.ID)).Result() + require.NoError(t, err) + require.EqualValues(t, earlierAt.Unix(), promotableScore, "promotable job must accept the earlier score") +} + +func TestRedisQueuePromoteJobAllowPromotionDemotes(t *testing.T) { + // PromoteJob on a job with AllowPromotion=true must be able to advance + // the score even when the job is currently sitting at the InvisibleSec + // mark from a prior Dequeue — that is precisely the state the subqueue + // PromoteOnAck path needs to advance. + client := redistest.NewClient() + defer client.Close() + require.NoError(t, redistest.Reset(client)) + q := NewRedisQueue(client) + + opts := &EnqueueOptions{Namespace: "{ns1}", QueueID: "q1"} + + job := NewJob() + job.AllowPromotion = true + job.EnqueuedAt = time.Now() + require.NoError(t, q.Enqueue(job, opts)) + + dequeued, err := q.Dequeue(&DequeueOptions{ + Namespace: "{ns1}", + QueueID: "q1", + At: time.Now(), + InvisibleSec: 60, + }) + require.NoError(t, err) + require.Equal(t, job.ID, dequeued.ID) + + queueKey := "{ns1}:queue:q1" + jobKey := fmt.Sprintf("{ns1}:job:%s", job.ID) + scoreAfterDequeue, err := client.ZScore(context.Background(), queueKey, jobKey).Result() + require.NoError(t, err) + + beforePromote := time.Now().Unix() + require.NoError(t, q.PromoteJob(job.ID, &PromoteOptions{ + Namespace: opts.Namespace, + QueueID: opts.QueueID, + })) + + scoreAfterPromote, err := client.ZScore(context.Background(), queueKey, jobKey).Result() + require.NoError(t, err) + require.Less(t, int64(scoreAfterPromote), int64(scoreAfterDequeue), "PromoteJob must lower the score for AllowPromotion=true jobs") + require.GreaterOrEqual(t, int64(scoreAfterPromote), beforePromote) +} + func TestRedisQueuePromoteJobDoesNotDemote(t *testing.T) { client := redistest.NewClient() defer client.Close() From ca031403c36a6d1f81aed135f7851ae39f958199 Mon Sep 17 00:00:00 2001 From: Nathan Yergler Date: Wed, 13 May 2026 17:20:12 -0700 Subject: [PATCH 2/7] Store allow-promotion separately from job payload --- job.go | 2 +- redis_queue.go | 77 ++++++++++++++++++++++++++++++++------------- redis_queue_test.go | 19 +++++++++-- 3 files changed, 72 insertions(+), 26 deletions(-) diff --git a/job.go b/job.go index 7a075bb..c6f7a25 100644 --- a/job.go +++ b/job.go @@ -48,7 +48,7 @@ type Job struct { // advance a scheduled-but-pending job to now. The caller is responsible // for ensuring this is safe — typically that the job has a unique ID // and is not relied on for dedup-deferral. - AllowPromotion bool `msgpack:"allow_promotion,omitempty" json:",omitempty"` + AllowPromotion bool `msgpack:"-" json:",omitempty"` } // InvalidJobPayloadError wraps json or msgpack decoding error. diff --git a/redis_queue.go b/redis_queue.go index 362dc3c..2166328 100644 --- a/redis_queue.go +++ b/redis_queue.go @@ -68,10 +68,15 @@ func NewRedisQueue(client redis.UniversalClient) RedisQueue { for i = 3,table.getn(ARGV),4 do local at = tonumber(ARGV[i]) - local allow_promotion = ARGV[i+1] + local requested_allow_promotion = ARGV[i+1] local job_id = ARGV[i+2] local jobm = ARGV[i+3] local job_key = table.concat({ns, "job", job_id}, ":") + local allow_promotion = redis.call("hget", job_key, "allow_promotion") + if allow_promotion == false then + allow_promotion = requested_allow_promotion + redis.call("hset", job_key, "allow_promotion", allow_promotion) + end -- update job fields redis.call("hset", job_key, "msgpack", jobm) @@ -113,6 +118,7 @@ func NewRedisQueue(client redis.UniversalClient) RedisQueue { for i, job_key in pairs(job_keys) do local jobm = redis.call("hget", job_key, "msgpack") + local allow_promotion = redis.call("hget", job_key, "allow_promotion") -- job is deleted unexpectedly if jobm == false then @@ -123,7 +129,7 @@ func NewRedisQueue(client redis.UniversalClient) RedisQueue { table.insert(zadd_args, at + invis_sec) table.insert(zadd_args, job_key) end - table.insert(ret, jobm) + table.insert(ret, {jobm, allow_promotion}) end end if table.getn(zadd_args) > 0 then @@ -168,8 +174,13 @@ func NewRedisQueue(client redis.UniversalClient) RedisQueue { local job_id = ARGV[i] local job_key = table.concat({ns, "job", job_id}, ":") local jobm = redis.call("hget", job_key, "msgpack") + local allow_promotion = redis.call("hget", job_key, "allow_promotion") - table.insert(ret, jobm) + if jobm == false then + table.insert(ret, false) + else + table.insert(ret, {jobm, allow_promotion}) + end end return ret `) @@ -289,12 +300,12 @@ func (q *redisQueue) bulkDequeueSmallBatch(count int64, opt *DequeueOptions) ([] jobm := res.([]interface{}) jobs := make([]*Job, len(jobm)) for i, iface := range jobm { - var job Job - err := unmarshal(strings.NewReader(iface.(string)), &job) + jobFields := iface.([]interface{}) + job, err := jobFromRedisFields(jobFields) if err != nil { return nil, err } - jobs[i] = &job + jobs[i] = job } return jobs, nil } @@ -362,15 +373,18 @@ func (q *redisQueue) bulkFindSmallBatch(jobIDs []string, opt *FindOptions) ([]*J jobm := res.([]interface{}) jobs := make([]*Job, len(jobm)) for i, iface := range jobm { - switch payload := iface.(type) { - case string: - var job Job - err := unmarshal(strings.NewReader(payload), &job) - if err != nil { - return nil, err - } - jobs[i] = &job + if iface == nil { + continue + } + jobFields, ok := iface.([]interface{}) + if !ok { + continue } + job, err := jobFromRedisFields(jobFields) + if err != nil { + return nil, err + } + jobs[i] = job } return jobs, nil } @@ -384,8 +398,9 @@ func (q *redisQueue) PromoteJob(jobID string, opt *PromoteOptions) error { queueKey := opt.Namespace + ":queue:" + opt.QueueID jobKey := opt.Namespace + ":job:" + jobID - // Look up the job's AllowPromotion flag so PromoteJob honors the same - // per-job semantic as Enqueue. With AllowPromotion=false (default), + // Look up the separately-stored AllowPromotion metadata so PromoteJob + // honors the same per-job semantic as Enqueue without depending on the + // mutable serialized job payload. With AllowPromotion=false (default), // PromoteJob keeps the GT guard and is effectively a no-op for any // job whose score sits in the future (either deferred via dedup or // in-flight via Dequeue's InvisibleSec mark). With AllowPromotion=true, @@ -396,17 +411,13 @@ func (q *redisQueue) PromoteJob(jobID string, opt *PromoteOptions) error { // If the job is no longer stored (already Ack'd or never enqueued), // the XX flag below would prevent (re-)adding it anyway, so a missing // hash is treated as a no-op rather than an error. - jobm, err := q.client.HGet(context.Background(), jobKey, "msgpack").Result() + allowPromotion, err := q.client.HGet(context.Background(), jobKey, "allow_promotion").Result() if err != nil { if errors.Is(err, redis.Nil) { return nil } return err } - var job Job - if err := unmarshal(strings.NewReader(jobm), &job); err != nil { - return err - } // XX always: do not resurrect a job whose hash exists but was removed // from the queue ZSET (e.g. after an Ack race) — and never add a @@ -417,7 +428,7 @@ func (q *redisQueue) PromoteJob(jobID string, opt *PromoteOptions) error { queueKey, redis.ZAddArgs{ XX: true, - GT: !job.AllowPromotion, + GT: !redisAllowPromotion(allowPromotion), Members: []redis.Z{{ Score: float64(time.Now().Unix()), Member: jobKey, @@ -426,6 +437,28 @@ func (q *redisQueue) PromoteJob(jobID string, opt *PromoteOptions) error { ).Err() } +func jobFromRedisFields(fields []interface{}) (*Job, error) { + var job Job + if err := unmarshal(strings.NewReader(fields[0].(string)), &job); err != nil { + return nil, err + } + if len(fields) > 1 { + job.AllowPromotion = redisAllowPromotion(fields[1]) + } + return &job, nil +} + +func redisAllowPromotion(v interface{}) bool { + switch value := v.(type) { + case string: + return value == "1" + case []byte: + return string(value) == "1" + default: + return false + } +} + func (q *redisQueue) GetQueueMetrics(opt *QueueMetricsOptions) (*QueueMetrics, error) { err := opt.Validate() if err != nil { diff --git a/redis_queue_test.go b/redis_queue_test.go index f7433c1..caab5da 100644 --- a/redis_queue_test.go +++ b/redis_queue_test.go @@ -38,7 +38,8 @@ func TestRedisQueueEnqueue(t *testing.T) { jobm, err := marshal(job) require.NoError(t, err) require.Equal(t, map[string]string{ - "msgpack": string(jobm), + "allow_promotion": "0", + "msgpack": string(jobm), }, h) jobs, err := q.BulkFind([]string{job.ID, "not-exist-id"}, &FindOptions{ @@ -153,7 +154,8 @@ func TestRedisQueueDequeue(t *testing.T) { jobm, err := marshal(job) require.NoError(t, err) require.Equal(t, map[string]string{ - "msgpack": string(jobm), + "allow_promotion": "0", + "msgpack": string(jobm), }, h) z, err = client.ZRangeByScoreWithScores( @@ -206,7 +208,8 @@ func TestRedisQueueDequeueDeletedJob(t *testing.T) { jobm, err := marshal(job) require.NoError(t, err) require.Equal(t, map[string]string{ - "msgpack": string(jobm), + "allow_promotion": "0", + "msgpack": string(jobm), }, h) require.NoError(t, client.Del(context.Background(), jobKey).Err()) @@ -433,11 +436,21 @@ func TestRedisQueueEnqueueGuardDoesNotDemote(t *testing.T) { // Re-enqueue the same ID with an earlier score (the dedup pattern). job.EnqueuedAt = time.Now() + job.AllowPromotion = true require.NoError(t, q.Enqueue(job, opts)) scoreAfterSecond, err := client.ZScore(context.Background(), queueKey, jobKey).Result() require.NoError(t, err) require.Equal(t, scoreAfterFirst, scoreAfterSecond, "GT must reject the earlier-score re-enqueue when AllowPromotion is false") + + require.NoError(t, q.PromoteJob(job.ID, &PromoteOptions{ + Namespace: opts.Namespace, + QueueID: opts.QueueID, + })) + + scoreAfterPromote, err := client.ZScore(context.Background(), queueKey, jobKey).Result() + require.NoError(t, err) + require.Equal(t, scoreAfterFirst, scoreAfterPromote, "no-op re-enqueue must not flip AllowPromotion") } func TestRedisQueueEnqueueAllowPromotionDemotes(t *testing.T) { From 477cf9cd937c9da6be54048354270f662239e63d Mon Sep 17 00:00:00 2001 From: Nathan Yergler Date: Wed, 13 May 2026 17:31:46 -0700 Subject: [PATCH 3/7] Handle missing allow_promotion in PromoteJob --- redis_queue.go | 7 +++---- redis_queue_test.go | 42 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 45 insertions(+), 4 deletions(-) diff --git a/redis_queue.go b/redis_queue.go index 2166328..9488145 100644 --- a/redis_queue.go +++ b/redis_queue.go @@ -412,10 +412,9 @@ func (q *redisQueue) PromoteJob(jobID string, opt *PromoteOptions) error { // the XX flag below would prevent (re-)adding it anyway, so a missing // hash is treated as a no-op rather than an error. allowPromotion, err := q.client.HGet(context.Background(), jobKey, "allow_promotion").Result() - if err != nil { - if errors.Is(err, redis.Nil) { - return nil - } + if errors.Is(err, redis.Nil) { + allowPromotion = "" + } else if err != nil { return err } diff --git a/redis_queue_test.go b/redis_queue_test.go index caab5da..9c5e5ba 100644 --- a/redis_queue_test.go +++ b/redis_queue_test.go @@ -611,3 +611,45 @@ func TestRedisQueuePromoteJobDoesNotDemote(t *testing.T) { require.NoError(t, err) require.Equal(t, scoreAfterDequeue, scoreAfterPromote) } + +func TestRedisQueuePromoteJobMissingAllowPromotionDoesNotDemote(t *testing.T) { + client := redistest.NewClient() + defer client.Close() + require.NoError(t, redistest.Reset(client)) + q := NewRedisQueue(client) + + job := NewJob() + job.EnqueuedAt = time.Now() + + opts := &EnqueueOptions{ + Namespace: "{ns1}", + QueueID: "q1", + } + + require.NoError(t, q.Enqueue(job, opts)) + + jobKey := fmt.Sprintf("{ns1}:job:%s", job.ID) + require.NoError(t, client.HDel(context.Background(), jobKey, "allow_promotion").Err()) + + dequeuedJob, err := q.Dequeue(&DequeueOptions{ + Namespace: "{ns1}", + QueueID: "q1", + At: time.Now(), + InvisibleSec: 60, + }) + require.NoError(t, err) + require.Equal(t, job.ID, dequeuedJob.ID) + + queueKey := "{ns1}:queue:q1" + scoreAfterDequeue, err := client.ZScore(context.Background(), queueKey, jobKey).Result() + require.NoError(t, err) + + require.NoError(t, q.PromoteJob(job.ID, &PromoteOptions{ + Namespace: opts.Namespace, + QueueID: opts.QueueID, + })) + + scoreAfterPromote, err := client.ZScore(context.Background(), queueKey, jobKey).Result() + require.NoError(t, err) + require.Equal(t, scoreAfterDequeue, scoreAfterPromote) +} From 528749969cf586292f0434bdf6cee7881f744c6b Mon Sep 17 00:00:00 2001 From: Nathan Yergler Date: Wed, 13 May 2026 17:41:13 -0700 Subject: [PATCH 4/7] Update comments to make intent clear --- job.go | 12 ++++++------ redis_queue.go | 4 ++-- redis_queue_test.go | 6 +++--- 3 files changed, 11 insertions(+), 11 deletions(-) diff --git a/job.go b/job.go index c6f7a25..be03a09 100644 --- a/job.go +++ b/job.go @@ -42,12 +42,12 @@ type Job struct { // PromoteJob from demoting a job whose score has been bumped to // now + InvisibleSec by Dequeue. // - // When true, both operations use ZADD XX (no GT). Backoff-based - // rescheduling from the worker retry path may then lower the score so - // the configured Backoff actually takes effect, and PromoteJob may - // advance a scheduled-but-pending job to now. The caller is responsible - // for ensuring this is safe — typically that the job has a unique ID - // and is not relied on for dedup-deferral. + // When true, both operations use ZADD XX (no GT). This lets an + // explicit caller lower the score, for example in an opt-in retry flow + // where backoff should override Dequeue's InvisibleSec mark, and lets + // PromoteJob advance a scheduled-but-pending job to now. The caller is + // responsible for ensuring this is safe — typically that the job has a + // unique ID and is not relied on for dedup-deferral. AllowPromotion bool `msgpack:"-" json:",omitempty"` } diff --git a/redis_queue.go b/redis_queue.go index 9488145..600ffab 100644 --- a/redis_queue.go +++ b/redis_queue.go @@ -61,8 +61,8 @@ func NewRedisQueue(client redis.UniversalClient) RedisQueue { -- Per-job AllowPromotion (passed alongside each job) selects between -- two ZADD variants. Guarded entries use ZADD ... gt so a duplicate -- enqueue cannot demote an already-deferred deterministic-ID job; - -- promoted entries use plain ZADD so worker retry backoff can lower - -- the score below the InvisibleSec mark set by Dequeue. + -- promoted entries use plain ZADD so an explicit opt-in caller can + -- lower the score below the InvisibleSec mark set by Dequeue. local guarded_args = {} local promoted_args = {} diff --git a/redis_queue_test.go b/redis_queue_test.go index 9c5e5ba..af28510 100644 --- a/redis_queue_test.go +++ b/redis_queue_test.go @@ -454,9 +454,9 @@ func TestRedisQueueEnqueueGuardDoesNotDemote(t *testing.T) { } func TestRedisQueueEnqueueAllowPromotionDemotes(t *testing.T) { - // AllowPromotion=true opts the job out of the GT guard so the worker - // retry path can lower the score from the Dequeue InvisibleSec mark - // back down to now + backoff. + // AllowPromotion=true opts the job out of the GT guard so an explicit + // opt-in retry flow can lower the score from the Dequeue InvisibleSec + // mark back down to now + backoff. client := redistest.NewClient() defer client.Close() require.NoError(t, redistest.Reset(client)) From 9028585d05a31a2af4a799f7df7223d9f8042e13 Mon Sep 17 00:00:00 2001 From: Nathan Yergler Date: Wed, 13 May 2026 18:02:47 -0700 Subject: [PATCH 5/7] Simplify AllowPromotion read path and atomicize PromoteJob AllowPromotion is consulted only server-side (by Enqueue and PromoteJob), so rehydrating it onto jobs returned by Dequeue/BulkFind added a brittle fields[0].(string) assertion and a wider Lua-vs-Go contract for no benefit. Drop the rehydration; the Lua scripts return jobm strings as before, and the Go side reverts to its prior simple form. PromoteJob now runs as a single Lua script (HGET allow_promotion + ZADD XX [GT]) instead of two round trips. This closes a narrow race where an Ack + re-enqueue between the HGET and ZADD could let a stale "AllowPromotion=true" read demote a freshly-enqueued job, and matches the atomic style of every other queue op. --- job.go | 6 +++ redis_queue.go | 139 ++++++++++++++++++++++--------------------------- 2 files changed, 67 insertions(+), 78 deletions(-) diff --git a/job.go b/job.go index be03a09..de8cc17 100644 --- a/job.go +++ b/job.go @@ -48,6 +48,12 @@ type Job struct { // PromoteJob advance a scheduled-but-pending job to now. The caller is // responsible for ensuring this is safe — typically that the job has a // unique ID and is not relied on for dedup-deferral. + // + // This field is write-only at the Go API: it is persisted to Redis on + // Enqueue and consulted server-side by Enqueue and PromoteJob, but it + // is NOT rehydrated onto jobs returned by Dequeue or BulkFind — those + // always observe the zero value. PromoteJob reads the persisted value + // directly from Redis, so callers do not need to round-trip it. AllowPromotion bool `msgpack:"-" json:",omitempty"` } diff --git a/redis_queue.go b/redis_queue.go index 600ffab..d508e56 100644 --- a/redis_queue.go +++ b/redis_queue.go @@ -30,6 +30,7 @@ type redisQueue struct { dequeueScript *redis.Script ackScript *redis.Script findScript *redis.Script + promoteScript *redis.Script metricScript *redis.Script } @@ -118,7 +119,6 @@ func NewRedisQueue(client redis.UniversalClient) RedisQueue { for i, job_key in pairs(job_keys) do local jobm = redis.call("hget", job_key, "msgpack") - local allow_promotion = redis.call("hget", job_key, "allow_promotion") -- job is deleted unexpectedly if jobm == false then @@ -129,7 +129,7 @@ func NewRedisQueue(client redis.UniversalClient) RedisQueue { table.insert(zadd_args, at + invis_sec) table.insert(zadd_args, job_key) end - table.insert(ret, {jobm, allow_promotion}) + table.insert(ret, jobm) end end if table.getn(zadd_args) > 0 then @@ -174,17 +174,36 @@ func NewRedisQueue(client redis.UniversalClient) RedisQueue { local job_id = ARGV[i] local job_key = table.concat({ns, "job", job_id}, ":") local jobm = redis.call("hget", job_key, "msgpack") - local allow_promotion = redis.call("hget", job_key, "allow_promotion") - if jobm == false then - table.insert(ret, false) - else - table.insert(ret, {jobm, allow_promotion}) - end + table.insert(ret, jobm) end return ret `) + // PromoteJob is read-then-write (HGET allow_promotion, then ZADD) + // done atomically in a single script so an Ack + re-enqueue between + // the two operations cannot let a stale "AllowPromotion=true" read + // demote a freshly-enqueued job whose owner did not opt in. + promoteScript := redis.NewScript(` + local ns = ARGV[1] + local queue_id = ARGV[2] + local job_id = ARGV[3] + local at = ARGV[4] + local queue_key = table.concat({ns, "queue", queue_id}, ":") + local job_key = table.concat({ns, "job", job_id}, ":") + + -- XX always: never (re-)add a job whose hash is gone (Ack'd or never + -- enqueued). GT only when the job did not opt into promotion — without + -- GT, an explicit caller can lower the score from Dequeue's + -- InvisibleSec mark back down to now. + local allow_promotion = redis.call("hget", job_key, "allow_promotion") + if allow_promotion == "1" then + return redis.call("zadd", queue_key, "XX", at, job_key) + else + return redis.call("zadd", queue_key, "XX", "GT", at, job_key) + end + `) + metricScript := redis.NewScript(` local ns = ARGV[1] local queue_id = ARGV[2] @@ -213,6 +232,7 @@ func NewRedisQueue(client redis.UniversalClient) RedisQueue { dequeueScript: dequeueScript, ackScript: ackScript, findScript: findScript, + promoteScript: promoteScript, metricScript: metricScript, } } @@ -300,12 +320,12 @@ func (q *redisQueue) bulkDequeueSmallBatch(count int64, opt *DequeueOptions) ([] jobm := res.([]interface{}) jobs := make([]*Job, len(jobm)) for i, iface := range jobm { - jobFields := iface.([]interface{}) - job, err := jobFromRedisFields(jobFields) + var job Job + err := unmarshal(strings.NewReader(iface.(string)), &job) if err != nil { return nil, err } - jobs[i] = job + jobs[i] = &job } return jobs, nil } @@ -373,18 +393,15 @@ func (q *redisQueue) bulkFindSmallBatch(jobIDs []string, opt *FindOptions) ([]*J jobm := res.([]interface{}) jobs := make([]*Job, len(jobm)) for i, iface := range jobm { - if iface == nil { - continue - } - jobFields, ok := iface.([]interface{}) - if !ok { - continue - } - job, err := jobFromRedisFields(jobFields) - if err != nil { - return nil, err + switch payload := iface.(type) { + case string: + var job Job + err := unmarshal(strings.NewReader(payload), &job) + if err != nil { + return nil, err + } + jobs[i] = &job } - jobs[i] = job } return jobs, nil } @@ -395,67 +412,33 @@ func (q *redisQueue) PromoteJob(jobID string, opt *PromoteOptions) error { return err } - queueKey := opt.Namespace + ":queue:" + opt.QueueID - jobKey := opt.Namespace + ":job:" + jobID - - // Look up the separately-stored AllowPromotion metadata so PromoteJob - // honors the same per-job semantic as Enqueue without depending on the - // mutable serialized job payload. With AllowPromotion=false (default), - // PromoteJob keeps the GT guard and is effectively a no-op for any - // job whose score sits in the future (either deferred via dedup or - // in-flight via Dequeue's InvisibleSec mark). With AllowPromotion=true, - // the caller has asserted that the job's calling pattern is safe to - // demote — the typical use case is a subqueue handler middleware that - // promotes the next gated job after the prior handler Acks. + // promoteScript reads the separately-stored AllowPromotion metadata and + // performs the ZADD atomically. With AllowPromotion=false (default), the + // ZADD keeps the GT guard and is effectively a no-op for any job whose + // score sits in the future (either deferred via dedup or in-flight via + // Dequeue's InvisibleSec mark). With AllowPromotion=true, the caller + // has asserted that the job's calling pattern is safe to demote — the + // typical use case is a subqueue handler middleware that promotes the + // next gated job after the prior handler Acks. // // If the job is no longer stored (already Ack'd or never enqueued), - // the XX flag below would prevent (re-)adding it anyway, so a missing - // hash is treated as a no-op rather than an error. - allowPromotion, err := q.client.HGet(context.Background(), jobKey, "allow_promotion").Result() - if errors.Is(err, redis.Nil) { - allowPromotion = "" - } else if err != nil { - return err - } - - // XX always: do not resurrect a job whose hash exists but was removed - // from the queue ZSET (e.g. after an Ack race) — and never add a - // member that doesn't already exist. GT only when the job did not - // opt into promotion. - return q.client.ZAddArgs( + // the script's XX flag prevents (re-)adding it, so a missing hash is a + // no-op rather than an error. + err = q.promoteScript.Run( context.Background(), - queueKey, - redis.ZAddArgs{ - XX: true, - GT: !redisAllowPromotion(allowPromotion), - Members: []redis.Z{{ - Score: float64(time.Now().Unix()), - Member: jobKey, - }}, - }, + q.client, + []string{opt.Namespace}, + opt.Namespace, + opt.QueueID, + jobID, + time.Now().Unix(), ).Err() -} - -func jobFromRedisFields(fields []interface{}) (*Job, error) { - var job Job - if err := unmarshal(strings.NewReader(fields[0].(string)), &job); err != nil { - return nil, err - } - if len(fields) > 1 { - job.AllowPromotion = redisAllowPromotion(fields[1]) - } - return &job, nil -} - -func redisAllowPromotion(v interface{}) bool { - switch value := v.(type) { - case string: - return value == "1" - case []byte: - return string(value) == "1" - default: - return false + if errors.Is(err, redis.Nil) { + // ZADD XX with no member added returns nil; go-redis surfaces this + // as redis.Nil from the script. Treat it as a successful no-op. + return nil } + return err } func (q *redisQueue) GetQueueMetrics(opt *QueueMetricsOptions) (*QueueMetrics, error) { From 4256ddd2f3ba60ec68748612ac3722e3b0867956 Mon Sep 17 00:00:00 2001 From: Nathan Yergler Date: Wed, 13 May 2026 18:07:28 -0700 Subject: [PATCH 6/7] Fix Redis bulk enqueue batch sizing --- redis_queue.go | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/redis_queue.go b/redis_queue.go index d508e56..bfb7245 100644 --- a/redis_queue.go +++ b/redis_queue.go @@ -10,8 +10,7 @@ import ( "github.com/redis/go-redis/v9" ) -func batchSlice(n int) [][]int { - const size = 1000 +func batchSliceWithSize(n int, size int) [][]int { var batches [][]int for i := 0; i < n; i += size { j := i + size @@ -23,6 +22,11 @@ func batchSlice(n int) [][]int { return batches } +func batchSlice(n int) [][]int { + const size = 1000 + return batchSliceWithSize(n, size) +} + type redisQueue struct { client redis.UniversalClient @@ -242,7 +246,11 @@ func (q *redisQueue) Enqueue(job *Job, opt *EnqueueOptions) error { } func (q *redisQueue) BulkEnqueue(jobs []*Job, opt *EnqueueOptions) error { - for _, batch := range batchSlice(len(jobs)) { + // Keep the total script ARGV count within the previous 3-args-per-job budget. + // bulkEnqueueSmallBatch now sends 4 values per job, so the maximum safe batch + // size is 1000 * 3 / 4 = 750 jobs. + const bulkEnqueueBatchSize = 750 + for _, batch := range batchSliceWithSize(len(jobs), bulkEnqueueBatchSize) { err := q.bulkEnqueueSmallBatch(jobs[batch[0]:batch[1]], opt) if err != nil { return err From 8bd73a906a7af4fbb00baba9aa7ad8ac76b4b2da Mon Sep 17 00:00:00 2001 From: Nathan Yergler Date: Wed, 13 May 2026 18:26:35 -0700 Subject: [PATCH 7/7] Scope test Redis cleanup to per-package namespaces Reset previously called FlushAll, which wiped the entire Redis DB. Because go test ./... runs packages in parallel against a shared Redis instance, one package's Reset could erase another package's in-flight data mid-test, surfacing as flakes (most visibly the 100k-job bulk-enqueue tests). Reset now takes namespaces and scans+deletes only matching keys, and each test package uses a distinct namespace so parallel packages no longer collide. --- bench/worker_bench_test.go | 12 +- http/server_test.go | 52 +++---- middleware/concurrent/dequeuer_test.go | 16 +-- middleware/concurrent/local_dequeuer_test.go | 2 +- middleware/discard/after_test.go | 2 +- middleware/discard/invalid_payload_test.go | 2 +- middleware/discard/max_retry_test.go | 2 +- middleware/heartbeat/heartbeater_test.go | 6 +- middleware/logrus/logger_test.go | 4 +- middleware/prometheus/metrics_test.go | 8 +- middleware/recovery/catch_panic_test.go | 2 +- middleware/unique/enqueuer_test.go | 16 +-- redis_queue_test.go | 142 +++++++++---------- redistest/client.go | 58 +++++++- sidekiq/queue_test.go | 6 +- worker_test.go | 92 ++++++------ 16 files changed, 233 insertions(+), 189 deletions(-) diff --git a/bench/worker_bench_test.go b/bench/worker_bench_test.go index f6e2204..739361b 100644 --- a/bench/worker_bench_test.go +++ b/bench/worker_bench_test.go @@ -34,10 +34,10 @@ func BenchmarkWorkerRunJob(b *testing.B) { b.StopTimer() for n := 0; n < b.N; n++ { - require.NoError(b, redistest.Reset(client)) + require.NoError(b, redistest.Reset(client, "{ns-bench}")) wp := work.NewWorkerPoolWithOptions( - struct{}{}, 1, "{ns1}", pool, + struct{}{}, 1, "{ns-bench}", pool, work.WorkerPoolOptions{ SleepBackoffs: []int64{1000}, }, @@ -49,7 +49,7 @@ func BenchmarkWorkerRunJob(b *testing.B) { return nil }) - enqueuer := work.NewEnqueuer("{ns1}", pool) + enqueuer := work.NewEnqueuer("{ns-bench}", pool) for i := 0; i < k; i++ { _, err := enqueuer.Enqueue("test", nil) require.NoError(b, err) @@ -68,11 +68,11 @@ func BenchmarkWorkerRunJob(b *testing.B) { b.StopTimer() for n := 0; n < b.N; n++ { - require.NoError(b, redistest.Reset(client)) + require.NoError(b, redistest.Reset(client, "{ns-bench}")) queue := work2.NewRedisQueue(client) w := work2.NewWorker(&work2.WorkerOptions{ - Namespace: "{ns1}", + Namespace: "{ns-bench}", Queue: queue, }) var wg sync.WaitGroup @@ -93,7 +93,7 @@ func BenchmarkWorkerRunJob(b *testing.B) { job := work2.NewJob() err := queue.Enqueue(job, &work2.EnqueueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-bench}", QueueID: "test", }) require.NoError(b, err) diff --git a/http/server_test.go b/http/server_test.go index 2069c8c..cfef907 100644 --- a/http/server_test.go +++ b/http/server_test.go @@ -16,7 +16,7 @@ import ( func TestServer(t *testing.T) { client := redistest.NewClient() defer client.Close() - require.NoError(t, redistest.Reset(client)) + require.NoError(t, redistest.Reset(client, "{ns-http}")) q := work.NewRedisQueue(client) srv := NewServer(&ServerOptions{ @@ -89,15 +89,15 @@ func TestServer(t *testing.T) { }, { reqMethod: "DELETE", - reqURL: "http://example.com/jobs?namespace=%7Bns1%7D&job_id=xxx", + reqURL: "http://example.com/jobs?namespace=%7Bns-http%7D&job_id=xxx", respCode: 200, - respBody: "{\"namespace\":\"{ns1}\",\"queue_id\":\"\",\"job\":{\"ID\":\"xxx\",\"CreatedAt\":\"0001-01-01T00:00:00Z\",\"UpdatedAt\":\"0001-01-01T00:00:00Z\",\"EnqueuedAt\":\"0001-01-01T00:00:00Z\",\"Payload\":null,\"Retries\":0,\"LastError\":\"\"}}\n", + respBody: "{\"namespace\":\"{ns-http}\",\"queue_id\":\"\",\"job\":{\"ID\":\"xxx\",\"CreatedAt\":\"0001-01-01T00:00:00Z\",\"UpdatedAt\":\"0001-01-01T00:00:00Z\",\"EnqueuedAt\":\"0001-01-01T00:00:00Z\",\"Payload\":null,\"Retries\":0,\"LastError\":\"\"}}\n", }, { reqMethod: "GET", - reqURL: "http://example.com/jobs?namespace=%7Bns1%7D&job_id=xxx", + reqURL: "http://example.com/jobs?namespace=%7Bns-http%7D&job_id=xxx", respCode: 200, - respBody: "{\"namespace\":\"{ns1}\",\"status\":\"completed\",\"job\":{\"ID\":\"xxx\",\"CreatedAt\":\"0001-01-01T00:00:00Z\",\"UpdatedAt\":\"0001-01-01T00:00:00Z\",\"EnqueuedAt\":\"0001-01-01T00:00:00Z\",\"Payload\":null,\"Retries\":0,\"LastError\":\"\"}}\n", + respBody: "{\"namespace\":\"{ns-http}\",\"status\":\"completed\",\"job\":{\"ID\":\"xxx\",\"CreatedAt\":\"0001-01-01T00:00:00Z\",\"UpdatedAt\":\"0001-01-01T00:00:00Z\",\"EnqueuedAt\":\"0001-01-01T00:00:00Z\",\"Payload\":null,\"Retries\":0,\"LastError\":\"\"}}\n", }, { // bad duration @@ -122,91 +122,91 @@ func TestServer(t *testing.T) { reqMethod: "POST", reqURL: "http://example.com/jobs", reqBody: `{ - "namespace": "{ns1}", + "namespace": "{ns-http}", "queue_id": "q1", "payload": "payload1", "delay": "10s" }`, respCode: 200, - respBody: "{\"namespace\":\"{ns1}\",\"queue_id\":\"q1\",\"job\":{\"ID\":\"[a-z0-9-]{36}\",\"CreatedAt\":\"[TZ0-9:-]+\",\"UpdatedAt\":\"[TZ0-9:-]+\",\"EnqueuedAt\":\"[TZ0-9:-]+\",\"Payload\":\"InBheWxvYWQxIg==\",\"Retries\":0,\"LastError\":\"\"}}", + respBody: "{\"namespace\":\"{ns-http}\",\"queue_id\":\"q1\",\"job\":{\"ID\":\"[a-z0-9-]{36}\",\"CreatedAt\":\"[TZ0-9:-]+\",\"UpdatedAt\":\"[TZ0-9:-]+\",\"EnqueuedAt\":\"[TZ0-9:-]+\",\"Payload\":\"InBheWxvYWQxIg==\",\"Retries\":0,\"LastError\":\"\"}}", }, { reqMethod: "GET", - reqURL: "http://example.com/metrics?namespace=%7Bns1%7D&queue_id=q1", + reqURL: "http://example.com/metrics?namespace=%7Bns-http%7D&queue_id=q1", respCode: 200, - respBody: "{\"namespace\":\"{ns1}\",\"queue_id\":\"q1\",\"ready_total\":0,\"scheduled_total\":1,\"total\":1,\"latency\":0}\n", + respBody: "{\"namespace\":\"{ns-http}\",\"queue_id\":\"q1\",\"ready_total\":0,\"scheduled_total\":1,\"total\":1,\"latency\":0}\n", }, { reqMethod: "POST", reqURL: "http://example.com/jobs", reqBody: `{ - "namespace": "{ns1}", + "namespace": "{ns-http}", "queue_id": "q1", "payload": "payload1" }`, respCode: 200, - respBody: "{\"namespace\":\"{ns1}\",\"queue_id\":\"q1\",\"job\":{\"ID\":\"[a-z0-9-]{36}\",\"CreatedAt\":\"[TZ0-9:-]+\",\"UpdatedAt\":\"[TZ0-9:-]+\",\"EnqueuedAt\":\"[TZ0-9:-]+\",\"Payload\":\"InBheWxvYWQxIg==\",\"Retries\":0,\"LastError\":\"\"}}", + respBody: "{\"namespace\":\"{ns-http}\",\"queue_id\":\"q1\",\"job\":{\"ID\":\"[a-z0-9-]{36}\",\"CreatedAt\":\"[TZ0-9:-]+\",\"UpdatedAt\":\"[TZ0-9:-]+\",\"EnqueuedAt\":\"[TZ0-9:-]+\",\"Payload\":\"InBheWxvYWQxIg==\",\"Retries\":0,\"LastError\":\"\"}}", }, { reqMethod: "GET", - reqURL: "http://example.com/metrics?namespace=%7Bns1%7D&queue_id=q1", + reqURL: "http://example.com/metrics?namespace=%7Bns-http%7D&queue_id=q1", respCode: 200, - respBody: "{\"namespace\":\"{ns1}\",\"queue_id\":\"q1\",\"ready_total\":1,\"scheduled_total\":1,\"total\":2,\"latency\":[0-9]+}\n", + respBody: "{\"namespace\":\"{ns-http}\",\"queue_id\":\"q1\",\"ready_total\":1,\"scheduled_total\":1,\"total\":2,\"latency\":[0-9]+}\n", }, { reqMethod: "POST", reqURL: "http://example.com/jobs", reqBody: `{ - "namespace": "{ns1}", + "namespace": "{ns-http}", "queue_id": "q1", "id": "id1", "payload": "payload1" }`, respCode: 200, - respBody: "{\"namespace\":\"{ns1}\",\"queue_id\":\"q1\",\"job\":{\"ID\":\"id1\",\"CreatedAt\":\"[TZ0-9:-]+\",\"UpdatedAt\":\"[TZ0-9:-]+\",\"EnqueuedAt\":\"[TZ0-9:-]+\",\"Payload\":\"InBheWxvYWQxIg==\",\"Retries\":0,\"LastError\":\"\"}}", + respBody: "{\"namespace\":\"{ns-http}\",\"queue_id\":\"q1\",\"job\":{\"ID\":\"id1\",\"CreatedAt\":\"[TZ0-9:-]+\",\"UpdatedAt\":\"[TZ0-9:-]+\",\"EnqueuedAt\":\"[TZ0-9:-]+\",\"Payload\":\"InBheWxvYWQxIg==\",\"Retries\":0,\"LastError\":\"\"}}", }, { // same job id reqMethod: "POST", reqURL: "http://example.com/jobs", reqBody: `{ - "namespace": "{ns1}", + "namespace": "{ns-http}", "queue_id": "q1", "id": "id1", "payload": "payload2" }`, respCode: 200, - respBody: "{\"namespace\":\"{ns1}\",\"queue_id\":\"q1\",\"job\":{\"ID\":\"id1\",\"CreatedAt\":\"[TZ0-9:-]+\",\"UpdatedAt\":\"[TZ0-9:-]+\",\"EnqueuedAt\":\"[TZ0-9:-]+\",\"Payload\":\"InBheWxvYWQxIg==\",\"Retries\":0,\"LastError\":\"\"}}", + respBody: "{\"namespace\":\"{ns-http}\",\"queue_id\":\"q1\",\"job\":{\"ID\":\"id1\",\"CreatedAt\":\"[TZ0-9:-]+\",\"UpdatedAt\":\"[TZ0-9:-]+\",\"EnqueuedAt\":\"[TZ0-9:-]+\",\"Payload\":\"InBheWxvYWQxIg==\",\"Retries\":0,\"LastError\":\"\"}}", }, { reqMethod: "GET", - reqURL: "http://example.com/metrics?namespace=%7Bns1%7D&queue_id=q1", + reqURL: "http://example.com/metrics?namespace=%7Bns-http%7D&queue_id=q1", respCode: 200, - respBody: "{\"namespace\":\"{ns1}\",\"queue_id\":\"q1\",\"ready_total\":2,\"scheduled_total\":1,\"total\":3,\"latency\":[0-9]+}\n", + respBody: "{\"namespace\":\"{ns-http}\",\"queue_id\":\"q1\",\"ready_total\":2,\"scheduled_total\":1,\"total\":3,\"latency\":[0-9]+}\n", }, { reqMethod: "GET", - reqURL: "http://example.com/jobs?namespace=%7Bns1%7D&job_id=id1", + reqURL: "http://example.com/jobs?namespace=%7Bns-http%7D&job_id=id1", respCode: 200, - respBody: "{\"namespace\":\"{ns1}\",\"status\":\"ready\",\"job\":{\"ID\":\"id1\",\"CreatedAt\":\"[TZ0-9:-]+\",\"UpdatedAt\":\"[TZ0-9:-]+\",\"EnqueuedAt\":\"[TZ0-9:-]+\",\"Payload\":\"InBheWxvYWQxIg==\",\"Retries\":0,\"LastError\":\"\"}}", + respBody: "{\"namespace\":\"{ns-http}\",\"status\":\"ready\",\"job\":{\"ID\":\"id1\",\"CreatedAt\":\"[TZ0-9:-]+\",\"UpdatedAt\":\"[TZ0-9:-]+\",\"EnqueuedAt\":\"[TZ0-9:-]+\",\"Payload\":\"InBheWxvYWQxIg==\",\"Retries\":0,\"LastError\":\"\"}}", }, { reqMethod: "DELETE", - reqURL: "http://example.com/jobs?namespace=%7Bns1%7D&job_id=id1", + reqURL: "http://example.com/jobs?namespace=%7Bns-http%7D&job_id=id1", respCode: 500, respBody: "{\"error\":\"work: empty queue id\"}\n", }, { reqMethod: "DELETE", - reqURL: "http://example.com/jobs?namespace=%7Bns1%7D&queue_id=q1&job_id=id1", + reqURL: "http://example.com/jobs?namespace=%7Bns-http%7D&queue_id=q1&job_id=id1", respCode: 200, - respBody: "{\"namespace\":\"{ns1}\",\"queue_id\":\"q1\",\"job\":{\"ID\":\"id1\",\"CreatedAt\":\"[TZ0-9:-]+\",\"UpdatedAt\":\"[TZ0-9:-]+\",\"EnqueuedAt\":\"[TZ0-9:-]+\",\"Payload\":\"InBheWxvYWQxIg==\",\"Retries\":0,\"LastError\":\"\"}}", + respBody: "{\"namespace\":\"{ns-http}\",\"queue_id\":\"q1\",\"job\":{\"ID\":\"id1\",\"CreatedAt\":\"[TZ0-9:-]+\",\"UpdatedAt\":\"[TZ0-9:-]+\",\"EnqueuedAt\":\"[TZ0-9:-]+\",\"Payload\":\"InBheWxvYWQxIg==\",\"Retries\":0,\"LastError\":\"\"}}", }, { reqMethod: "GET", - reqURL: "http://example.com/metrics?namespace=%7Bns1%7D&queue_id=q1", + reqURL: "http://example.com/metrics?namespace=%7Bns-http%7D&queue_id=q1", respCode: 200, - respBody: "{\"namespace\":\"{ns1}\",\"queue_id\":\"q1\",\"ready_total\":1,\"scheduled_total\":1,\"total\":2,\"latency\":[0-9]+}\n", + respBody: "{\"namespace\":\"{ns-http}\",\"queue_id\":\"q1\",\"ready_total\":1,\"scheduled_total\":1,\"total\":2,\"latency\":[0-9]+}\n", }, } { var reqBody io.Reader diff --git a/middleware/concurrent/dequeuer_test.go b/middleware/concurrent/dequeuer_test.go index 805c7c3..e903e65 100644 --- a/middleware/concurrent/dequeuer_test.go +++ b/middleware/concurrent/dequeuer_test.go @@ -15,7 +15,7 @@ import ( func TestDequeuer(t *testing.T) { client := redistest.NewClient() defer client.Close() - require.NoError(t, redistest.Reset(client)) + require.NoError(t, redistest.Reset(client, "{ns-concurrent}")) var called int h1 := func(*work.DequeueOptions) (*work.Job, error) { @@ -38,7 +38,7 @@ func TestDequeuer(t *testing.T) { } opt := &work.DequeueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-concurrent}", QueueID: "q1", At: time.Now(), InvisibleSec: 60, @@ -75,7 +75,7 @@ func TestDequeuer(t *testing.T) { z, err := client.ZRangeByScoreWithScores( context.Background(), - "{ns1}:lock:q1", + "{ns-concurrent}:lock:q1", &redis.ZRangeBy{ Min: "-inf", Max: "+inf", @@ -87,7 +87,7 @@ func TestDequeuer(t *testing.T) { require.EqualValues(t, opt.At.Unix()+60, z[0].Score) require.EqualValues(t, opt.At.Unix()+60, z[1].Score) - require.NoError(t, client.ZRem(context.Background(), "{ns1}:lock:q1", "w1").Err()) + require.NoError(t, client.ZRem(context.Background(), "{ns-concurrent}:lock:q1", "w1").Err()) optLater := *opt optLater.At = opt.At.Add(10 * time.Second) // worker 0 is locked already @@ -105,7 +105,7 @@ func TestDequeuer(t *testing.T) { z, err = client.ZRangeByScoreWithScores( context.Background(), - "{ns1}:lock:q1", + "{ns-concurrent}:lock:q1", &redis.ZRangeBy{ Min: "-inf", Max: "+inf", @@ -137,7 +137,7 @@ func TestDequeuer(t *testing.T) { z, err = client.ZRangeByScoreWithScores( context.Background(), - "{ns1}:lock:q1", + "{ns-concurrent}:lock:q1", &redis.ZRangeBy{ Min: "-inf", Max: "+inf", @@ -155,7 +155,7 @@ func BenchmarkConcurrency(b *testing.B) { client := redistest.NewClient() defer client.Close() - require.NoError(b, redistest.Reset(client)) + require.NoError(b, redistest.Reset(client, "{ns-concurrent}")) var called int h1 := func(*work.DequeueOptions) (*work.Job, error) { @@ -178,7 +178,7 @@ func BenchmarkConcurrency(b *testing.B) { } opt := &work.DequeueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-concurrent}", QueueID: "q1", At: time.Now(), InvisibleSec: 60, diff --git a/middleware/concurrent/local_dequeuer_test.go b/middleware/concurrent/local_dequeuer_test.go index b3971a0..988db93 100644 --- a/middleware/concurrent/local_dequeuer_test.go +++ b/middleware/concurrent/local_dequeuer_test.go @@ -30,7 +30,7 @@ func TestLocalDequeuer(t *testing.T) { } opt := &work.DequeueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-concurrent}", QueueID: "q1", At: time.Now(), InvisibleSec: 60, diff --git a/middleware/discard/after_test.go b/middleware/discard/after_test.go index e07bf83..9f8105c 100644 --- a/middleware/discard/after_test.go +++ b/middleware/discard/after_test.go @@ -12,7 +12,7 @@ import ( func TestAfter(t *testing.T) { job := work.NewJob() opt := &work.DequeueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-discard}", QueueID: "q1", } d := After(time.Minute) diff --git a/middleware/discard/invalid_payload_test.go b/middleware/discard/invalid_payload_test.go index 5ef2c8c..3f706ae 100644 --- a/middleware/discard/invalid_payload_test.go +++ b/middleware/discard/invalid_payload_test.go @@ -10,7 +10,7 @@ import ( func TestInvalidPayload(t *testing.T) { job := work.NewJob() opt := &work.DequeueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-discard}", QueueID: "q1", } h := InvalidPayload(func(*work.Job, *work.DequeueOptions) error { diff --git a/middleware/discard/max_retry_test.go b/middleware/discard/max_retry_test.go index c8b15a7..b5257f4 100644 --- a/middleware/discard/max_retry_test.go +++ b/middleware/discard/max_retry_test.go @@ -11,7 +11,7 @@ import ( func TestMaxRetry(t *testing.T) { job := work.NewJob() opt := &work.DequeueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-discard}", QueueID: "q1", } d := MaxRetry(1) diff --git a/middleware/heartbeat/heartbeater_test.go b/middleware/heartbeat/heartbeater_test.go index 98e1b56..23acc57 100644 --- a/middleware/heartbeat/heartbeater_test.go +++ b/middleware/heartbeat/heartbeater_test.go @@ -14,11 +14,11 @@ import ( func TestHeartbeater(t *testing.T) { client := redistest.NewClient() defer client.Close() - require.NoError(t, redistest.Reset(client)) + require.NoError(t, redistest.Reset(client, "{ns-heartbeat}")) job := work.NewJob() opt := &work.DequeueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-heartbeat}", QueueID: "q1", At: time.Now(), InvisibleSec: 60, @@ -41,7 +41,7 @@ func TestHeartbeater(t *testing.T) { z, err := client.ZRangeByScoreWithScores( context.Background(), - "{ns1}:queue:q1", + "{ns-heartbeat}:queue:q1", &redis.ZRangeBy{ Min: "-inf", Max: "+inf", diff --git a/middleware/logrus/logger_test.go b/middleware/logrus/logger_test.go index effb308..b192e89 100644 --- a/middleware/logrus/logger_test.go +++ b/middleware/logrus/logger_test.go @@ -11,7 +11,7 @@ import ( func TestHandleFuncLogger(t *testing.T) { job := work.NewJob() opt := &work.DequeueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-logrus}", QueueID: "q1", } h := HandleFuncLogger(func(*work.Job, *work.DequeueOptions) error { @@ -31,7 +31,7 @@ func TestHandleFuncLogger(t *testing.T) { func TestEnqueueFuncLogger(t *testing.T) { job := work.NewJob() opt := &work.EnqueueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-logrus}", QueueID: "q1", } h := EnqueueFuncLogger(func(*work.Job, *work.EnqueueOptions) error { diff --git a/middleware/prometheus/metrics_test.go b/middleware/prometheus/metrics_test.go index f593e89..6921a57 100644 --- a/middleware/prometheus/metrics_test.go +++ b/middleware/prometheus/metrics_test.go @@ -20,7 +20,7 @@ func TestHandleFuncMetrics(t *testing.T) { job := work.NewJob() opt := &work.DequeueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-prometheus}", QueueID: "q1", } h := HandleFuncMetrics(func(*work.Job, *work.DequeueOptions) error { @@ -58,7 +58,7 @@ func TestEnqueueFuncMetrics(t *testing.T) { job := work.NewJob() opt := &work.EnqueueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-prometheus}", QueueID: "q1", } h := EnqueueFuncMetrics(func(*work.Job, *work.EnqueueOptions) error { @@ -93,10 +93,10 @@ func TestExportWorkerMetrics(t *testing.T) { client := redistest.NewClient() defer client.Close() - require.NoError(t, redistest.Reset(client)) + require.NoError(t, redistest.Reset(client, "{ns-prometheus}")) w := work.NewWorker(&work.WorkerOptions{ - Namespace: "{ns1}", + Namespace: "{ns-prometheus}", Queue: work.NewRedisQueue(client), }) err = w.Register("test", diff --git a/middleware/recovery/catch_panic_test.go b/middleware/recovery/catch_panic_test.go index 0f3e3f2..1b8eb16 100644 --- a/middleware/recovery/catch_panic_test.go +++ b/middleware/recovery/catch_panic_test.go @@ -10,7 +10,7 @@ import ( func TestCatchPanic(t *testing.T) { job := work.NewJob() opt := &work.DequeueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-recovery}", QueueID: "q1", } h := CatchPanic(func(*work.Job, *work.DequeueOptions) error { diff --git a/middleware/unique/enqueuer_test.go b/middleware/unique/enqueuer_test.go index 3c217e7..2b52c0a 100644 --- a/middleware/unique/enqueuer_test.go +++ b/middleware/unique/enqueuer_test.go @@ -13,7 +13,7 @@ import ( func TestEnqueuerBypass(t *testing.T) { client := redistest.NewClient() defer client.Close() - require.NoError(t, redistest.Reset(client)) + require.NoError(t, redistest.Reset(client, "{ns-unique}")) enq := Enqueuer(&EnqueuerOptions{ Client: client, @@ -30,7 +30,7 @@ func TestEnqueuerBypass(t *testing.T) { for i := 0; i < 3; i++ { job := work.NewJob() err := h(job, &work.EnqueueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-unique}", QueueID: "q1", }) require.NoError(t, err) @@ -41,7 +41,7 @@ func TestEnqueuerBypass(t *testing.T) { func TestEnqueuer(t *testing.T) { client := redistest.NewClient() defer client.Close() - require.NoError(t, redistest.Reset(client)) + require.NoError(t, redistest.Reset(client, "{ns-unique}")) enq := Enqueuer(&EnqueuerOptions{ Client: client, @@ -58,7 +58,7 @@ func TestEnqueuer(t *testing.T) { for i := 0; i < 3; i++ { job := work.NewJob() err := h(job, &work.EnqueueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-unique}", QueueID: "q1", }) require.NoError(t, err) @@ -66,10 +66,10 @@ func TestEnqueuer(t *testing.T) { require.Equal(t, 1, called) for i := 0; i < 3; i++ { - require.NoError(t, client.Del(context.Background(), "{ns1}:unique:q1:9f86d081884c7d659a2feaa0c55ad015a3bf4f1b2b0b822cd15d6c15b0f00a08").Err()) + require.NoError(t, client.Del(context.Background(), "{ns-unique}:unique:q1:9f86d081884c7d659a2feaa0c55ad015a3bf4f1b2b0b822cd15d6c15b0f00a08").Err()) job := work.NewJob() err := h(job, &work.EnqueueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-unique}", QueueID: "q1", }) require.NoError(t, err) @@ -82,7 +82,7 @@ func BenchmarkEnqueuer(b *testing.B) { client := redistest.NewClient() defer client.Close() - require.NoError(b, redistest.Reset(client)) + require.NoError(b, redistest.Reset(client, "{ns-unique}")) enq := Enqueuer(&EnqueuerOptions{ Client: client, @@ -101,7 +101,7 @@ func BenchmarkEnqueuer(b *testing.B) { for n := 0; n < b.N; n++ { job := work.NewJob() h(job, &work.EnqueueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-unique}", QueueID: "q1", }) } diff --git a/redis_queue_test.go b/redis_queue_test.go index af28510..a4983ea 100644 --- a/redis_queue_test.go +++ b/redis_queue_test.go @@ -14,7 +14,7 @@ import ( func TestRedisQueueEnqueue(t *testing.T) { client := redistest.NewClient() defer client.Close() - require.NoError(t, redistest.Reset(client)) + require.NoError(t, redistest.Reset(client, "{ns-work}")) q := NewRedisQueue(client) type message struct { @@ -26,12 +26,12 @@ func TestRedisQueueEnqueue(t *testing.T) { require.NoError(t, err) err = q.Enqueue(job, &EnqueueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-work}", QueueID: "q1", }) require.NoError(t, err) - jobKey := fmt.Sprintf("{ns1}:job:%s", job.ID) + jobKey := fmt.Sprintf("{ns-work}:job:%s", job.ID) h, err := client.HGetAll(context.Background(), jobKey).Result() require.NoError(t, err) @@ -43,7 +43,7 @@ func TestRedisQueueEnqueue(t *testing.T) { }, h) jobs, err := q.BulkFind([]string{job.ID, "not-exist-id"}, &FindOptions{ - Namespace: "{ns1}", + Namespace: "{ns-work}", }) require.NoError(t, err) require.Len(t, jobs, 2) @@ -53,12 +53,12 @@ func TestRedisQueueEnqueue(t *testing.T) { jobs[0].LastError = "hello world" err = q.Enqueue(jobs[0], &EnqueueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-work}", QueueID: "q1", }) require.NoError(t, err) jobs, err = q.BulkFind([]string{job.ID}, &FindOptions{ - Namespace: "{ns1}", + Namespace: "{ns-work}", }) require.NoError(t, err) require.Len(t, jobs, 1) @@ -67,7 +67,7 @@ func TestRedisQueueEnqueue(t *testing.T) { z, err := client.ZRangeByScoreWithScores( context.Background(), - "{ns1}:queue:q1", + "{ns-work}:queue:q1", &redis.ZRangeBy{ Min: "-inf", Max: "+inf", @@ -78,14 +78,14 @@ func TestRedisQueueEnqueue(t *testing.T) { require.EqualValues(t, job.EnqueuedAt.Unix(), z[0].Score) err = q.Enqueue(job.Delay(time.Minute), &EnqueueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-work}", QueueID: "q1", }) require.NoError(t, err) z, err = client.ZRangeByScoreWithScores( context.Background(), - "{ns1}:queue:q1", + "{ns-work}:queue:q1", &redis.ZRangeBy{ Min: "-inf", Max: "+inf", @@ -99,7 +99,7 @@ func TestRedisQueueEnqueue(t *testing.T) { func TestRedisQueueDequeue(t *testing.T) { client := redistest.NewClient() defer client.Close() - require.NoError(t, redistest.Reset(client)) + require.NoError(t, redistest.Reset(client, "{ns-work}")) q := NewRedisQueue(client) type message struct { @@ -109,10 +109,10 @@ func TestRedisQueueDequeue(t *testing.T) { job := NewJob() err := job.MarshalPayload(message{Text: "hello"}) require.NoError(t, err) - jobKey := fmt.Sprintf("{ns1}:job:%s", job.ID) + jobKey := fmt.Sprintf("{ns-work}:job:%s", job.ID) err = q.Enqueue(job, &EnqueueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-work}", QueueID: "q1", }) require.NoError(t, err) @@ -120,7 +120,7 @@ func TestRedisQueueDequeue(t *testing.T) { now := job.EnqueuedAt.Add(123 * time.Second) jobDequeued, err := q.Dequeue(&DequeueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-work}", QueueID: "q1", At: now, InvisibleSec: 0, @@ -130,7 +130,7 @@ func TestRedisQueueDequeue(t *testing.T) { z, err := client.ZRangeByScoreWithScores( context.Background(), - "{ns1}:queue:q1", + "{ns-work}:queue:q1", &redis.ZRangeBy{ Min: "-inf", Max: "+inf", @@ -141,7 +141,7 @@ func TestRedisQueueDequeue(t *testing.T) { require.EqualValues(t, job.EnqueuedAt.Unix(), z[0].Score) jobDequeued, err = q.Dequeue(&DequeueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-work}", QueueID: "q1", At: now, InvisibleSec: 60, @@ -160,7 +160,7 @@ func TestRedisQueueDequeue(t *testing.T) { z, err = client.ZRangeByScoreWithScores( context.Background(), - "{ns1}:queue:q1", + "{ns-work}:queue:q1", &redis.ZRangeBy{ Min: "-inf", Max: "+inf", @@ -172,7 +172,7 @@ func TestRedisQueueDequeue(t *testing.T) { // empty _, err = q.Dequeue(&DequeueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-work}", QueueID: "q1", At: now, InvisibleSec: 60, @@ -184,7 +184,7 @@ func TestRedisQueueDequeue(t *testing.T) { func TestRedisQueueDequeueDeletedJob(t *testing.T) { client := redistest.NewClient() defer client.Close() - require.NoError(t, redistest.Reset(client)) + require.NoError(t, redistest.Reset(client, "{ns-work}")) q := NewRedisQueue(client) type message struct { @@ -196,12 +196,12 @@ func TestRedisQueueDequeueDeletedJob(t *testing.T) { require.NoError(t, err) err = q.Enqueue(job, &EnqueueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-work}", QueueID: "q1", }) require.NoError(t, err) - jobKey := fmt.Sprintf("{ns1}:job:%s", job.ID) + jobKey := fmt.Sprintf("{ns-work}:job:%s", job.ID) h, err := client.HGetAll(context.Background(), jobKey).Result() require.NoError(t, err) @@ -215,7 +215,7 @@ func TestRedisQueueDequeueDeletedJob(t *testing.T) { require.NoError(t, client.Del(context.Background(), jobKey).Err()) _, err = q.Dequeue(&DequeueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-work}", QueueID: "q1", At: job.EnqueuedAt, InvisibleSec: 60, @@ -224,7 +224,7 @@ func TestRedisQueueDequeueDeletedJob(t *testing.T) { z, err := client.ZRangeByScoreWithScores( context.Background(), - "{ns1}:queue:q1", + "{ns-work}:queue:q1", &redis.ZRangeBy{ Min: "-inf", Max: "+inf", @@ -236,22 +236,22 @@ func TestRedisQueueDequeueDeletedJob(t *testing.T) { func TestRedisQueueAck(t *testing.T) { client := redistest.NewClient() defer client.Close() - require.NoError(t, redistest.Reset(client)) + require.NoError(t, redistest.Reset(client, "{ns-work}")) q := NewRedisQueue(client) job := NewJob() err := q.Enqueue(job, &EnqueueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-work}", QueueID: "q1", }) require.NoError(t, err) - jobKey := fmt.Sprintf("{ns1}:job:%s", job.ID) + jobKey := fmt.Sprintf("{ns-work}:job:%s", job.ID) z, err := client.ZRangeByScoreWithScores( context.Background(), - "{ns1}:queue:q1", + "{ns-work}:queue:q1", &redis.ZRangeBy{ Min: "-inf", Max: "+inf", @@ -266,14 +266,14 @@ func TestRedisQueueAck(t *testing.T) { require.EqualValues(t, 1, e) err = q.Ack(job, &AckOptions{ - Namespace: "{ns1}", + Namespace: "{ns-work}", QueueID: "q1", }) require.NoError(t, err) z, err = client.ZRangeByScoreWithScores( context.Background(), - "{ns1}:queue:q1", + "{ns-work}:queue:q1", &redis.ZRangeBy{ Min: "-inf", Max: "+inf", @@ -286,7 +286,7 @@ func TestRedisQueueAck(t *testing.T) { require.EqualValues(t, 0, e) err = q.Ack(job, &AckOptions{ - Namespace: "{ns1}", + Namespace: "{ns-work}", QueueID: "q1", }) require.NoError(t, err) @@ -295,36 +295,36 @@ func TestRedisQueueAck(t *testing.T) { func TestRedisQueueGetQueueMetrics(t *testing.T) { client := redistest.NewClient() defer client.Close() - require.NoError(t, redistest.Reset(client)) + require.NoError(t, redistest.Reset(client, "{ns-work}")) q := NewRedisQueue(client) job := NewJob() err := q.Enqueue(job, &EnqueueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-work}", QueueID: "q1", }) require.NoError(t, err) m, err := q.GetQueueMetrics(&QueueMetricsOptions{ - Namespace: "{ns1}", + Namespace: "{ns-work}", QueueID: "q1", At: job.EnqueuedAt, }) require.NoError(t, err) - require.Equal(t, "{ns1}", m.Namespace) + require.Equal(t, "{ns-work}", m.Namespace) require.Equal(t, "q1", m.QueueID) require.EqualValues(t, 1, m.ReadyTotal) require.EqualValues(t, 0, m.ScheduledTotal) require.True(t, 0 < m.Latency && m.Latency < time.Minute) m, err = q.GetQueueMetrics(&QueueMetricsOptions{ - Namespace: "{ns1}", + Namespace: "{ns-work}", QueueID: "q1", At: job.EnqueuedAt.Add(-time.Second), }) require.NoError(t, err) - require.Equal(t, "{ns1}", m.Namespace) + require.Equal(t, "{ns-work}", m.Namespace) require.Equal(t, "q1", m.QueueID) require.EqualValues(t, 0, m.ReadyTotal) require.EqualValues(t, 1, m.ScheduledTotal) @@ -334,7 +334,7 @@ func TestRedisQueueGetQueueMetrics(t *testing.T) { func TestRedisQueueBulkEnqueue(t *testing.T) { client := redistest.NewClient() defer client.Close() - require.NoError(t, redistest.Reset(client)) + require.NoError(t, redistest.Reset(client, "{ns-work}")) q := NewRedisQueue(client) const jobCount = 100000 @@ -345,12 +345,12 @@ func TestRedisQueueBulkEnqueue(t *testing.T) { } err := q.BulkEnqueue(jobs, &EnqueueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-work}", QueueID: "q1", }) require.NoError(t, err) - count, err := client.ZCard(context.Background(), "{ns1}:queue:q1").Result() + count, err := client.ZCard(context.Background(), "{ns-work}:queue:q1").Result() require.NoError(t, err) require.Equal(t, int64(jobCount), count) } @@ -358,7 +358,7 @@ func TestRedisQueueBulkEnqueue(t *testing.T) { func TestRedisQueuePromoteJob(t *testing.T) { client := redistest.NewClient() defer client.Close() - require.NoError(t, redistest.Reset(client)) + require.NoError(t, redistest.Reset(client, "{ns-work}")) q := NewRedisQueue(client) // Enqueue two jobs with old timestamps (in the past) @@ -368,7 +368,7 @@ func TestRedisQueuePromoteJob(t *testing.T) { job2.EnqueuedAt = time.Now().Add(-time.Hour) // 1 hour ago opts := &EnqueueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-work}", QueueID: "q1", } @@ -378,8 +378,8 @@ func TestRedisQueuePromoteJob(t *testing.T) { require.NoError(t, err) // Check initial score of job2 (should be old timestamp) - queueKey := "{ns1}:queue:q1" - jobKey := fmt.Sprintf("{ns1}:job:%s", job2.ID) + queueKey := "{ns-work}:queue:q1" + jobKey := fmt.Sprintf("{ns-work}:job:%s", job2.ID) initialScore, err := client.ZScore(context.Background(), queueKey, jobKey).Result() require.NoError(t, err) require.Equal(t, float64(job2.EnqueuedAt.Unix()), initialScore) @@ -407,7 +407,7 @@ func TestRedisQueuePromoteJob(t *testing.T) { require.NoError(t, err) // Verify non-existent job was not added to queue - exists, err := client.ZScore(context.Background(), queueKey, "{ns1}:job:non-existent-job").Result() + exists, err := client.ZScore(context.Background(), queueKey, "{ns-work}:job:non-existent-job").Result() require.Error(t, err) // redis.Nil error expected require.Equal(t, float64(0), exists) } @@ -418,18 +418,18 @@ func TestRedisQueueEnqueueGuardDoesNotDemote(t *testing.T) { // jobs cannot have their deferred run time clobbered. client := redistest.NewClient() defer client.Close() - require.NoError(t, redistest.Reset(client)) + require.NoError(t, redistest.Reset(client, "{ns-work}")) q := NewRedisQueue(client) - opts := &EnqueueOptions{Namespace: "{ns1}", QueueID: "q1"} + opts := &EnqueueOptions{Namespace: "{ns-work}", QueueID: "q1"} job := NewJob() deferredAt := time.Now().Add(time.Minute) job.EnqueuedAt = deferredAt require.NoError(t, q.Enqueue(job, opts)) - queueKey := "{ns1}:queue:q1" - jobKey := fmt.Sprintf("{ns1}:job:%s", job.ID) + queueKey := "{ns-work}:queue:q1" + jobKey := fmt.Sprintf("{ns-work}:job:%s", job.ID) scoreAfterFirst, err := client.ZScore(context.Background(), queueKey, jobKey).Result() require.NoError(t, err) require.EqualValues(t, deferredAt.Unix(), scoreAfterFirst) @@ -459,10 +459,10 @@ func TestRedisQueueEnqueueAllowPromotionDemotes(t *testing.T) { // mark back down to now + backoff. client := redistest.NewClient() defer client.Close() - require.NoError(t, redistest.Reset(client)) + require.NoError(t, redistest.Reset(client, "{ns-work}")) q := NewRedisQueue(client) - opts := &EnqueueOptions{Namespace: "{ns1}", QueueID: "q1"} + opts := &EnqueueOptions{Namespace: "{ns-work}", QueueID: "q1"} job := NewJob() job.AllowPromotion = true @@ -470,8 +470,8 @@ func TestRedisQueueEnqueueAllowPromotionDemotes(t *testing.T) { job.EnqueuedAt = deferredAt require.NoError(t, q.Enqueue(job, opts)) - queueKey := "{ns1}:queue:q1" - jobKey := fmt.Sprintf("{ns1}:job:%s", job.ID) + queueKey := "{ns-work}:queue:q1" + jobKey := fmt.Sprintf("{ns-work}:job:%s", job.ID) // Re-enqueue the same ID with an earlier score. earlierAt := time.Now() @@ -489,10 +489,10 @@ func TestRedisQueueBulkEnqueueMixedAllowPromotion(t *testing.T) { // semantic to each, atomically. client := redistest.NewClient() defer client.Close() - require.NoError(t, redistest.Reset(client)) + require.NoError(t, redistest.Reset(client, "{ns-work}")) q := NewRedisQueue(client) - opts := &EnqueueOptions{Namespace: "{ns1}", QueueID: "q1"} + opts := &EnqueueOptions{Namespace: "{ns-work}", QueueID: "q1"} guarded := NewJob() deferredAt := time.Now().Add(time.Minute) @@ -510,13 +510,13 @@ func TestRedisQueueBulkEnqueueMixedAllowPromotion(t *testing.T) { promotable.EnqueuedAt = earlierAt require.NoError(t, q.BulkEnqueue([]*Job{guarded, promotable}, opts)) - queueKey := "{ns1}:queue:q1" + queueKey := "{ns-work}:queue:q1" - guardedScore, err := client.ZScore(context.Background(), queueKey, fmt.Sprintf("{ns1}:job:%s", guarded.ID)).Result() + guardedScore, err := client.ZScore(context.Background(), queueKey, fmt.Sprintf("{ns-work}:job:%s", guarded.ID)).Result() require.NoError(t, err) require.EqualValues(t, deferredAt.Unix(), guardedScore, "guarded job must retain its later score") - promotableScore, err := client.ZScore(context.Background(), queueKey, fmt.Sprintf("{ns1}:job:%s", promotable.ID)).Result() + promotableScore, err := client.ZScore(context.Background(), queueKey, fmt.Sprintf("{ns-work}:job:%s", promotable.ID)).Result() require.NoError(t, err) require.EqualValues(t, earlierAt.Unix(), promotableScore, "promotable job must accept the earlier score") } @@ -528,10 +528,10 @@ func TestRedisQueuePromoteJobAllowPromotionDemotes(t *testing.T) { // PromoteOnAck path needs to advance. client := redistest.NewClient() defer client.Close() - require.NoError(t, redistest.Reset(client)) + require.NoError(t, redistest.Reset(client, "{ns-work}")) q := NewRedisQueue(client) - opts := &EnqueueOptions{Namespace: "{ns1}", QueueID: "q1"} + opts := &EnqueueOptions{Namespace: "{ns-work}", QueueID: "q1"} job := NewJob() job.AllowPromotion = true @@ -539,7 +539,7 @@ func TestRedisQueuePromoteJobAllowPromotionDemotes(t *testing.T) { require.NoError(t, q.Enqueue(job, opts)) dequeued, err := q.Dequeue(&DequeueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-work}", QueueID: "q1", At: time.Now(), InvisibleSec: 60, @@ -547,8 +547,8 @@ func TestRedisQueuePromoteJobAllowPromotionDemotes(t *testing.T) { require.NoError(t, err) require.Equal(t, job.ID, dequeued.ID) - queueKey := "{ns1}:queue:q1" - jobKey := fmt.Sprintf("{ns1}:job:%s", job.ID) + queueKey := "{ns-work}:queue:q1" + jobKey := fmt.Sprintf("{ns-work}:job:%s", job.ID) scoreAfterDequeue, err := client.ZScore(context.Background(), queueKey, jobKey).Result() require.NoError(t, err) @@ -567,7 +567,7 @@ func TestRedisQueuePromoteJobAllowPromotionDemotes(t *testing.T) { func TestRedisQueuePromoteJobDoesNotDemote(t *testing.T) { client := redistest.NewClient() defer client.Close() - require.NoError(t, redistest.Reset(client)) + require.NoError(t, redistest.Reset(client, "{ns-work}")) q := NewRedisQueue(client) // Enqueue a job @@ -575,7 +575,7 @@ func TestRedisQueuePromoteJobDoesNotDemote(t *testing.T) { job.EnqueuedAt = time.Now() opts := &EnqueueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-work}", QueueID: "q1", } @@ -584,7 +584,7 @@ func TestRedisQueuePromoteJobDoesNotDemote(t *testing.T) { // Dequeue the job (this sets score to now + invisibleSec) dequeueOpts := &DequeueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-work}", QueueID: "q1", At: time.Now(), InvisibleSec: 60, // 60 seconds @@ -594,8 +594,8 @@ func TestRedisQueuePromoteJobDoesNotDemote(t *testing.T) { require.Equal(t, job.ID, dequeuedJob.ID) // Check that job's score is now + invisibleSec - queueKey := "{ns1}:queue:q1" - jobKey := fmt.Sprintf("{ns1}:job:%s", job.ID) + queueKey := "{ns-work}:queue:q1" + jobKey := fmt.Sprintf("{ns-work}:job:%s", job.ID) scoreAfterDequeue, err := client.ZScore(context.Background(), queueKey, jobKey).Result() require.NoError(t, err) @@ -615,24 +615,24 @@ func TestRedisQueuePromoteJobDoesNotDemote(t *testing.T) { func TestRedisQueuePromoteJobMissingAllowPromotionDoesNotDemote(t *testing.T) { client := redistest.NewClient() defer client.Close() - require.NoError(t, redistest.Reset(client)) + require.NoError(t, redistest.Reset(client, "{ns-work}")) q := NewRedisQueue(client) job := NewJob() job.EnqueuedAt = time.Now() opts := &EnqueueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-work}", QueueID: "q1", } require.NoError(t, q.Enqueue(job, opts)) - jobKey := fmt.Sprintf("{ns1}:job:%s", job.ID) + jobKey := fmt.Sprintf("{ns-work}:job:%s", job.ID) require.NoError(t, client.HDel(context.Background(), jobKey, "allow_promotion").Err()) dequeuedJob, err := q.Dequeue(&DequeueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-work}", QueueID: "q1", At: time.Now(), InvisibleSec: 60, @@ -640,7 +640,7 @@ func TestRedisQueuePromoteJobMissingAllowPromotionDoesNotDemote(t *testing.T) { require.NoError(t, err) require.Equal(t, job.ID, dequeuedJob.ID) - queueKey := "{ns1}:queue:q1" + queueKey := "{ns-work}:queue:q1" scoreAfterDequeue, err := client.ZScore(context.Background(), queueKey, jobKey).Result() require.NoError(t, err) diff --git a/redistest/client.go b/redistest/client.go index 89ca770..0a62f34 100644 --- a/redistest/client.go +++ b/redistest/client.go @@ -23,12 +23,56 @@ func NewClient() redis.UniversalClient { }) } -// Reset is used to clear redis for next test. -func Reset(client redis.UniversalClient) error { - if cc, ok := client.(*redis.ClusterClient); ok { - return cc.ForEachMaster(context.Background(), func(ctx context.Context, c *redis.Client) error { - return c.FlushAll(ctx).Err() - }) +// Reset deletes keys belonging to the given namespaces so the caller starts +// from a clean slate. Scoping cleanup to namespaces lets tests in different +// packages run in parallel against the same Redis without one test's reset +// wiping another test's in-progress data (which a FlushAll would do). +// +// Passing no namespaces falls back to FlushAll for legacy callers. +func Reset(client redis.UniversalClient, namespaces ...string) error { + ctx := context.Background() + if len(namespaces) == 0 { + if cc, ok := client.(*redis.ClusterClient); ok { + return cc.ForEachMaster(ctx, func(ctx context.Context, c *redis.Client) error { + return c.FlushAll(ctx).Err() + }) + } + return client.FlushAll(ctx).Err() } - return client.FlushAll(context.Background()).Err() + + deleteMatching := func(ctx context.Context, c redis.Cmdable, pattern string) error { + var cursor uint64 + for { + keys, next, err := c.Scan(ctx, cursor, pattern, 1000).Result() + if err != nil { + return err + } + if len(keys) > 0 { + if err := c.Del(ctx, keys...).Err(); err != nil { + return err + } + } + if next == 0 { + return nil + } + cursor = next + } + } + + for _, ns := range namespaces { + pattern := ns + ":*" + if cc, ok := client.(*redis.ClusterClient); ok { + err := cc.ForEachMaster(ctx, func(ctx context.Context, c *redis.Client) error { + return deleteMatching(ctx, c, pattern) + }) + if err != nil { + return err + } + continue + } + if err := deleteMatching(ctx, client, pattern); err != nil { + return err + } + } + return nil } diff --git a/sidekiq/queue_test.go b/sidekiq/queue_test.go index 2a2a8f2..44ab979 100644 --- a/sidekiq/queue_test.go +++ b/sidekiq/queue_test.go @@ -14,7 +14,7 @@ import ( func TestSidekiqQueueExternalEnqueue(t *testing.T) { client := redistest.NewClient() defer client.Close() - require.NoError(t, redistest.Reset(client)) + require.NoError(t, redistest.Reset(client, "{sidekiq}")) q := NewQueue(client) job := work.NewJob() @@ -44,7 +44,7 @@ func TestSidekiqQueueExternalEnqueue(t *testing.T) { func TestSidekiqQueueExternalEnqueueScheduled(t *testing.T) { client := redistest.NewClient() defer client.Close() - require.NoError(t, redistest.Reset(client)) + require.NoError(t, redistest.Reset(client, "{sidekiq}")) q := NewQueue(client) job := work.NewJob() @@ -80,7 +80,7 @@ func TestSidekiqQueueExternalEnqueueScheduled(t *testing.T) { func TestSidekiqQueueExternalBulkEnqueue(t *testing.T) { client := redistest.NewClient() defer client.Close() - require.NoError(t, redistest.Reset(client)) + require.NoError(t, redistest.Reset(client, "{sidekiq}")) q := NewQueue(client) const jobCount = 100000 diff --git a/worker_test.go b/worker_test.go index 039c8ad..4d7acb5 100644 --- a/worker_test.go +++ b/worker_test.go @@ -16,10 +16,10 @@ import ( func TestWorkerStartStop(t *testing.T) { client := redistest.NewClient() defer client.Close() - require.NoError(t, redistest.Reset(client)) + require.NoError(t, redistest.Reset(client, "{ns-work}")) w := NewWorker(&WorkerOptions{ - Namespace: "{ns1}", + Namespace: "{ns-work}", Queue: NewRedisQueue(client), }) err := w.Register("test", @@ -41,10 +41,10 @@ func TestWorkerStartStop(t *testing.T) { func TestWorkerExportMetrics(t *testing.T) { client := redistest.NewClient() defer client.Close() - require.NoError(t, redistest.Reset(client)) + require.NoError(t, redistest.Reset(client, "{ns-work}")) w := NewWorker(&WorkerOptions{ - Namespace: "{ns1}", + Namespace: "{ns-work}", Queue: NewRedisQueue(client), }) err := w.Register("test", @@ -60,7 +60,7 @@ func TestWorkerExportMetrics(t *testing.T) { all, err := w.ExportMetrics() require.NoError(t, err) require.Len(t, all.Queue, 1) - require.Equal(t, all.Queue[0].Namespace, "{ns1}") + require.Equal(t, all.Queue[0].Namespace, "{ns-work}") require.Equal(t, all.Queue[0].QueueID, "test") } @@ -98,14 +98,14 @@ func waitEmpty(client redis.UniversalClient, key string, timeout time.Duration) func TestWorkerRunJobMultiQueue(t *testing.T) { client := redistest.NewClient() defer client.Close() - require.NoError(t, redistest.Reset(client)) + require.NoError(t, redistest.Reset(client, "{ns-work}")) type message struct { Text string } w := NewWorker(&WorkerOptions{ - Namespace: "{ns1}", + Namespace: "{ns-work}", Queue: NewRedisQueue(client), }) err := w.Register("test1", @@ -147,7 +147,7 @@ func TestWorkerRunJobMultiQueue(t *testing.T) { require.NoError(t, err) err = w.opt.Queue.Enqueue(job, &EnqueueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-work}", QueueID: "test1", }) require.NoError(t, err) @@ -159,32 +159,32 @@ func TestWorkerRunJobMultiQueue(t *testing.T) { require.NoError(t, err) err = w.opt.Queue.Enqueue(job, &EnqueueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-work}", QueueID: "test2", }) require.NoError(t, err) } - count, err := client.ZCard(context.Background(), "{ns1}:queue:test1").Result() + count, err := client.ZCard(context.Background(), "{ns-work}:queue:test1").Result() require.NoError(t, err) require.EqualValues(t, 3, count) - count, err = client.ZCard(context.Background(), "{ns1}:queue:test2").Result() + count, err = client.ZCard(context.Background(), "{ns-work}:queue:test2").Result() require.NoError(t, err) require.EqualValues(t, 3, count) w.Start() - err = waitEmpty(client, "{ns1}:queue:test1", 10*time.Second) + err = waitEmpty(client, "{ns-work}:queue:test1", 10*time.Second) require.NoError(t, err) - err = waitEmpty(client, "{ns1}:queue:test2", 10*time.Second) + err = waitEmpty(client, "{ns-work}:queue:test2", 10*time.Second) require.NoError(t, err) w.Stop() - count, err = client.ZCard(context.Background(), "{ns1}:queue:test1").Result() + count, err = client.ZCard(context.Background(), "{ns-work}:queue:test1").Result() require.NoError(t, err) require.EqualValues(t, 0, count) - count, err = client.ZCard(context.Background(), "{ns1}:queue:test2").Result() + count, err = client.ZCard(context.Background(), "{ns-work}:queue:test2").Result() require.NoError(t, err) require.EqualValues(t, 0, count) } @@ -192,10 +192,10 @@ func TestWorkerRunJobMultiQueue(t *testing.T) { func TestWorkerRunJob(t *testing.T) { client := redistest.NewClient() defer client.Close() - require.NoError(t, redistest.Reset(client)) + require.NoError(t, redistest.Reset(client, "{ns-work}")) w := NewWorker(&WorkerOptions{ - Namespace: "{ns1}", + Namespace: "{ns-work}", Queue: NewRedisQueue(client), }) err := w.Register("success", @@ -237,22 +237,22 @@ func TestWorkerRunJob(t *testing.T) { require.NoError(t, err) err = w.opt.Queue.Enqueue(job, &EnqueueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-work}", QueueID: "success", }) require.NoError(t, err) } - count, err := client.ZCard(context.Background(), "{ns1}:queue:success").Result() + count, err := client.ZCard(context.Background(), "{ns-work}:queue:success").Result() require.NoError(t, err) require.EqualValues(t, 3, count) w.Start() - err = waitEmpty(client, "{ns1}:queue:success", 10*time.Second) + err = waitEmpty(client, "{ns-work}:queue:success", 10*time.Second) require.NoError(t, err) w.Stop() - count, err = client.ZCard(context.Background(), "{ns1}:queue:success").Result() + count, err = client.ZCard(context.Background(), "{ns-work}:queue:success").Result() require.NoError(t, err) require.EqualValues(t, 0, count) @@ -262,28 +262,28 @@ func TestWorkerRunJob(t *testing.T) { require.NoError(t, err) err = w.opt.Queue.Enqueue(job, &EnqueueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-work}", QueueID: "failure", }) require.NoError(t, err) } - count, err = client.ZCard(context.Background(), "{ns1}:queue:failure").Result() + count, err = client.ZCard(context.Background(), "{ns-work}:queue:failure").Result() require.NoError(t, err) require.EqualValues(t, 3, count) w.Start() - err = waitEmpty(client, "{ns1}:queue:failure", 10*time.Second) + err = waitEmpty(client, "{ns-work}:queue:failure", 10*time.Second) require.NoError(t, err) w.Stop() - count, err = client.ZCard(context.Background(), "{ns1}:queue:failure").Result() + count, err = client.ZCard(context.Background(), "{ns-work}:queue:failure").Result() require.NoError(t, err) require.EqualValues(t, 3, count) for i := 0; i < 3; i++ { job, err := NewRedisQueue(client).Dequeue(&DequeueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-work}", QueueID: "failure", At: time.Now().Add(time.Hour), InvisibleSec: 3600, @@ -299,28 +299,28 @@ func TestWorkerRunJob(t *testing.T) { require.NoError(t, err) err = w.opt.Queue.Enqueue(job, &EnqueueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-work}", QueueID: "panic", }) require.NoError(t, err) } - count, err = client.ZCard(context.Background(), "{ns1}:queue:panic").Result() + count, err = client.ZCard(context.Background(), "{ns-work}:queue:panic").Result() require.NoError(t, err) require.EqualValues(t, 3, count) w.Start() - err = waitEmpty(client, "{ns1}:queue:panic", 10*time.Second) + err = waitEmpty(client, "{ns-work}:queue:panic", 10*time.Second) require.NoError(t, err) w.Stop() - count, err = client.ZCard(context.Background(), "{ns1}:queue:panic").Result() + count, err = client.ZCard(context.Background(), "{ns-work}:queue:panic").Result() require.NoError(t, err) require.EqualValues(t, 3, count) for i := 0; i < 3; i++ { job, err := NewRedisQueue(client).Dequeue(&DequeueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-work}", QueueID: "panic", At: time.Now().Add(time.Hour), InvisibleSec: 3600, @@ -334,43 +334,43 @@ func TestWorkerRunJob(t *testing.T) { func TestWorkerRunOnce(t *testing.T) { client := redistest.NewClient() defer client.Close() - require.NoError(t, redistest.Reset(client)) + require.NoError(t, redistest.Reset(client, "{ns-work}")) job := NewJob() err := NewRedisQueue(client).Enqueue(job, &EnqueueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-work}", QueueID: "success", }) require.NoError(t, err) - count, err := client.ZCard(context.Background(), "{ns1}:queue:success").Result() + count, err := client.ZCard(context.Background(), "{ns-work}:queue:success").Result() require.NoError(t, err) require.EqualValues(t, 1, count) job2 := NewJob() err = NewRedisQueue(client).Enqueue(job2, &EnqueueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-work}", QueueID: "failure", }) require.NoError(t, err) - count, err = client.ZCard(context.Background(), "{ns1}:queue:failure").Result() + count, err = client.ZCard(context.Background(), "{ns-work}:queue:failure").Result() require.NoError(t, err) require.EqualValues(t, 1, count) job3 := NewJob() err = NewRedisQueue(client).Enqueue(job3, &EnqueueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-work}", QueueID: "panic", }) require.NoError(t, err) - count, err = client.ZCard(context.Background(), "{ns1}:queue:panic").Result() + count, err = client.ZCard(context.Background(), "{ns-work}:queue:panic").Result() require.NoError(t, err) require.EqualValues(t, 1, count) w := NewWorker(&WorkerOptions{ - Namespace: "{ns1}", + Namespace: "{ns-work}", Queue: NewRedisQueue(client), }) @@ -384,7 +384,7 @@ func TestWorkerRunOnce(t *testing.T) { ) require.NoError(t, err) - count, err = client.ZCard(context.Background(), "{ns1}:queue:success").Result() + count, err = client.ZCard(context.Background(), "{ns-work}:queue:success").Result() require.NoError(t, err) require.EqualValues(t, 0, count) @@ -399,7 +399,7 @@ func TestWorkerRunOnce(t *testing.T) { require.Error(t, err) require.Equal(t, "no reason", err.Error()) - count, err = client.ZCard(context.Background(), "{ns1}:queue:failure").Result() + count, err = client.ZCard(context.Background(), "{ns-work}:queue:failure").Result() require.NoError(t, err) require.EqualValues(t, 1, count) @@ -414,7 +414,7 @@ func TestWorkerRunOnce(t *testing.T) { require.Error(t, err) require.True(t, strings.HasPrefix(err.Error(), "panic: unexpected")) - count, err = client.ZCard(context.Background(), "{ns1}:queue:panic").Result() + count, err = client.ZCard(context.Background(), "{ns-work}:queue:panic").Result() require.NoError(t, err) require.EqualValues(t, 1, count) } @@ -428,11 +428,11 @@ func TestWrappedHandlerError(t *testing.T) { func TestRetry(t *testing.T) { client := redistest.NewClient() defer client.Close() - require.NoError(t, redistest.Reset(client)) + require.NoError(t, redistest.Reset(client, "{ns-work}")) job := NewJob() opt := &DequeueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-work}", QueueID: "q1", InvisibleSec: 10, } @@ -448,7 +448,7 @@ func TestRetry(t *testing.T) { z, err := client.ZRangeByScoreWithScores( context.Background(), - "{ns1}:queue:q1", + "{ns-work}:queue:q1", &redis.ZRangeBy{ Min: "-inf", Max: "+inf", @@ -480,7 +480,7 @@ func TestRetry(t *testing.T) { z, err := client.ZRangeByScoreWithScores( context.Background(), - "{ns1}:queue:q1", + "{ns-work}:queue:q1", &redis.ZRangeBy{ Min: "-inf", Max: "+inf",