Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions bench/worker_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@ func BenchmarkWorkerRunJob(b *testing.B) {
b.StopTimer()

for n := 0; n < b.N; n++ {
require.NoError(b, redistest.Reset(client))
require.NoError(b, redistest.Reset(client, "{ns-bench}"))

wp := work.NewWorkerPoolWithOptions(
struct{}{}, 1, "{ns1}", pool,
struct{}{}, 1, "{ns-bench}", pool,
work.WorkerPoolOptions{
SleepBackoffs: []int64{1000},
},
Expand All @@ -49,7 +49,7 @@ func BenchmarkWorkerRunJob(b *testing.B) {
return nil
})

enqueuer := work.NewEnqueuer("{ns1}", pool)
enqueuer := work.NewEnqueuer("{ns-bench}", pool)
for i := 0; i < k; i++ {
_, err := enqueuer.Enqueue("test", nil)
require.NoError(b, err)
Expand All @@ -68,11 +68,11 @@ func BenchmarkWorkerRunJob(b *testing.B) {
b.StopTimer()

for n := 0; n < b.N; n++ {
require.NoError(b, redistest.Reset(client))
require.NoError(b, redistest.Reset(client, "{ns-bench}"))

queue := work2.NewRedisQueue(client)
w := work2.NewWorker(&work2.WorkerOptions{
Namespace: "{ns1}",
Namespace: "{ns-bench}",
Queue: queue,
})
var wg sync.WaitGroup
Expand All @@ -93,7 +93,7 @@ func BenchmarkWorkerRunJob(b *testing.B) {
job := work2.NewJob()

err := queue.Enqueue(job, &work2.EnqueueOptions{
Namespace: "{ns1}",
Namespace: "{ns-bench}",
QueueID: "test",
})
require.NoError(b, err)
Expand Down
52 changes: 26 additions & 26 deletions http/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
func TestServer(t *testing.T) {
client := redistest.NewClient()
defer client.Close()
require.NoError(t, redistest.Reset(client))
require.NoError(t, redistest.Reset(client, "{ns-http}"))
q := work.NewRedisQueue(client)

srv := NewServer(&ServerOptions{
Expand Down Expand Up @@ -89,15 +89,15 @@ func TestServer(t *testing.T) {
},
{
reqMethod: "DELETE",
reqURL: "http://example.com/jobs?namespace=%7Bns1%7D&job_id=xxx",
reqURL: "http://example.com/jobs?namespace=%7Bns-http%7D&job_id=xxx",
respCode: 200,
respBody: "{\"namespace\":\"{ns1}\",\"queue_id\":\"\",\"job\":{\"ID\":\"xxx\",\"CreatedAt\":\"0001-01-01T00:00:00Z\",\"UpdatedAt\":\"0001-01-01T00:00:00Z\",\"EnqueuedAt\":\"0001-01-01T00:00:00Z\",\"Payload\":null,\"Retries\":0,\"LastError\":\"\"}}\n",
respBody: "{\"namespace\":\"{ns-http}\",\"queue_id\":\"\",\"job\":{\"ID\":\"xxx\",\"CreatedAt\":\"0001-01-01T00:00:00Z\",\"UpdatedAt\":\"0001-01-01T00:00:00Z\",\"EnqueuedAt\":\"0001-01-01T00:00:00Z\",\"Payload\":null,\"Retries\":0,\"LastError\":\"\"}}\n",
},
{
reqMethod: "GET",
reqURL: "http://example.com/jobs?namespace=%7Bns1%7D&job_id=xxx",
reqURL: "http://example.com/jobs?namespace=%7Bns-http%7D&job_id=xxx",
respCode: 200,
respBody: "{\"namespace\":\"{ns1}\",\"status\":\"completed\",\"job\":{\"ID\":\"xxx\",\"CreatedAt\":\"0001-01-01T00:00:00Z\",\"UpdatedAt\":\"0001-01-01T00:00:00Z\",\"EnqueuedAt\":\"0001-01-01T00:00:00Z\",\"Payload\":null,\"Retries\":0,\"LastError\":\"\"}}\n",
respBody: "{\"namespace\":\"{ns-http}\",\"status\":\"completed\",\"job\":{\"ID\":\"xxx\",\"CreatedAt\":\"0001-01-01T00:00:00Z\",\"UpdatedAt\":\"0001-01-01T00:00:00Z\",\"EnqueuedAt\":\"0001-01-01T00:00:00Z\",\"Payload\":null,\"Retries\":0,\"LastError\":\"\"}}\n",
},
{
// bad duration
Expand All @@ -122,91 +122,91 @@ func TestServer(t *testing.T) {
reqMethod: "POST",
reqURL: "http://example.com/jobs",
reqBody: `{
"namespace": "{ns1}",
"namespace": "{ns-http}",
"queue_id": "q1",
"payload": "payload1",
"delay": "10s"
}`,
respCode: 200,
respBody: "{\"namespace\":\"{ns1}\",\"queue_id\":\"q1\",\"job\":{\"ID\":\"[a-z0-9-]{36}\",\"CreatedAt\":\"[TZ0-9:-]+\",\"UpdatedAt\":\"[TZ0-9:-]+\",\"EnqueuedAt\":\"[TZ0-9:-]+\",\"Payload\":\"InBheWxvYWQxIg==\",\"Retries\":0,\"LastError\":\"\"}}",
respBody: "{\"namespace\":\"{ns-http}\",\"queue_id\":\"q1\",\"job\":{\"ID\":\"[a-z0-9-]{36}\",\"CreatedAt\":\"[TZ0-9:-]+\",\"UpdatedAt\":\"[TZ0-9:-]+\",\"EnqueuedAt\":\"[TZ0-9:-]+\",\"Payload\":\"InBheWxvYWQxIg==\",\"Retries\":0,\"LastError\":\"\"}}",
},
{
reqMethod: "GET",
reqURL: "http://example.com/metrics?namespace=%7Bns1%7D&queue_id=q1",
reqURL: "http://example.com/metrics?namespace=%7Bns-http%7D&queue_id=q1",
respCode: 200,
respBody: "{\"namespace\":\"{ns1}\",\"queue_id\":\"q1\",\"ready_total\":0,\"scheduled_total\":1,\"total\":1,\"latency\":0}\n",
respBody: "{\"namespace\":\"{ns-http}\",\"queue_id\":\"q1\",\"ready_total\":0,\"scheduled_total\":1,\"total\":1,\"latency\":0}\n",
},
{
reqMethod: "POST",
reqURL: "http://example.com/jobs",
reqBody: `{
"namespace": "{ns1}",
"namespace": "{ns-http}",
"queue_id": "q1",
"payload": "payload1"
}`,
respCode: 200,
respBody: "{\"namespace\":\"{ns1}\",\"queue_id\":\"q1\",\"job\":{\"ID\":\"[a-z0-9-]{36}\",\"CreatedAt\":\"[TZ0-9:-]+\",\"UpdatedAt\":\"[TZ0-9:-]+\",\"EnqueuedAt\":\"[TZ0-9:-]+\",\"Payload\":\"InBheWxvYWQxIg==\",\"Retries\":0,\"LastError\":\"\"}}",
respBody: "{\"namespace\":\"{ns-http}\",\"queue_id\":\"q1\",\"job\":{\"ID\":\"[a-z0-9-]{36}\",\"CreatedAt\":\"[TZ0-9:-]+\",\"UpdatedAt\":\"[TZ0-9:-]+\",\"EnqueuedAt\":\"[TZ0-9:-]+\",\"Payload\":\"InBheWxvYWQxIg==\",\"Retries\":0,\"LastError\":\"\"}}",
},
{
reqMethod: "GET",
reqURL: "http://example.com/metrics?namespace=%7Bns1%7D&queue_id=q1",
reqURL: "http://example.com/metrics?namespace=%7Bns-http%7D&queue_id=q1",
respCode: 200,
respBody: "{\"namespace\":\"{ns1}\",\"queue_id\":\"q1\",\"ready_total\":1,\"scheduled_total\":1,\"total\":2,\"latency\":[0-9]+}\n",
respBody: "{\"namespace\":\"{ns-http}\",\"queue_id\":\"q1\",\"ready_total\":1,\"scheduled_total\":1,\"total\":2,\"latency\":[0-9]+}\n",
},
{
reqMethod: "POST",
reqURL: "http://example.com/jobs",
reqBody: `{
"namespace": "{ns1}",
"namespace": "{ns-http}",
"queue_id": "q1",
"id": "id1",
"payload": "payload1"
}`,
respCode: 200,
respBody: "{\"namespace\":\"{ns1}\",\"queue_id\":\"q1\",\"job\":{\"ID\":\"id1\",\"CreatedAt\":\"[TZ0-9:-]+\",\"UpdatedAt\":\"[TZ0-9:-]+\",\"EnqueuedAt\":\"[TZ0-9:-]+\",\"Payload\":\"InBheWxvYWQxIg==\",\"Retries\":0,\"LastError\":\"\"}}",
respBody: "{\"namespace\":\"{ns-http}\",\"queue_id\":\"q1\",\"job\":{\"ID\":\"id1\",\"CreatedAt\":\"[TZ0-9:-]+\",\"UpdatedAt\":\"[TZ0-9:-]+\",\"EnqueuedAt\":\"[TZ0-9:-]+\",\"Payload\":\"InBheWxvYWQxIg==\",\"Retries\":0,\"LastError\":\"\"}}",
},
{
// same job id
reqMethod: "POST",
reqURL: "http://example.com/jobs",
reqBody: `{
"namespace": "{ns1}",
"namespace": "{ns-http}",
"queue_id": "q1",
"id": "id1",
"payload": "payload2"
}`,
respCode: 200,
respBody: "{\"namespace\":\"{ns1}\",\"queue_id\":\"q1\",\"job\":{\"ID\":\"id1\",\"CreatedAt\":\"[TZ0-9:-]+\",\"UpdatedAt\":\"[TZ0-9:-]+\",\"EnqueuedAt\":\"[TZ0-9:-]+\",\"Payload\":\"InBheWxvYWQxIg==\",\"Retries\":0,\"LastError\":\"\"}}",
respBody: "{\"namespace\":\"{ns-http}\",\"queue_id\":\"q1\",\"job\":{\"ID\":\"id1\",\"CreatedAt\":\"[TZ0-9:-]+\",\"UpdatedAt\":\"[TZ0-9:-]+\",\"EnqueuedAt\":\"[TZ0-9:-]+\",\"Payload\":\"InBheWxvYWQxIg==\",\"Retries\":0,\"LastError\":\"\"}}",
},
{
reqMethod: "GET",
reqURL: "http://example.com/metrics?namespace=%7Bns1%7D&queue_id=q1",
reqURL: "http://example.com/metrics?namespace=%7Bns-http%7D&queue_id=q1",
respCode: 200,
respBody: "{\"namespace\":\"{ns1}\",\"queue_id\":\"q1\",\"ready_total\":2,\"scheduled_total\":1,\"total\":3,\"latency\":[0-9]+}\n",
respBody: "{\"namespace\":\"{ns-http}\",\"queue_id\":\"q1\",\"ready_total\":2,\"scheduled_total\":1,\"total\":3,\"latency\":[0-9]+}\n",
},
{
reqMethod: "GET",
reqURL: "http://example.com/jobs?namespace=%7Bns1%7D&job_id=id1",
reqURL: "http://example.com/jobs?namespace=%7Bns-http%7D&job_id=id1",
respCode: 200,
respBody: "{\"namespace\":\"{ns1}\",\"status\":\"ready\",\"job\":{\"ID\":\"id1\",\"CreatedAt\":\"[TZ0-9:-]+\",\"UpdatedAt\":\"[TZ0-9:-]+\",\"EnqueuedAt\":\"[TZ0-9:-]+\",\"Payload\":\"InBheWxvYWQxIg==\",\"Retries\":0,\"LastError\":\"\"}}",
respBody: "{\"namespace\":\"{ns-http}\",\"status\":\"ready\",\"job\":{\"ID\":\"id1\",\"CreatedAt\":\"[TZ0-9:-]+\",\"UpdatedAt\":\"[TZ0-9:-]+\",\"EnqueuedAt\":\"[TZ0-9:-]+\",\"Payload\":\"InBheWxvYWQxIg==\",\"Retries\":0,\"LastError\":\"\"}}",
},
{
reqMethod: "DELETE",
reqURL: "http://example.com/jobs?namespace=%7Bns1%7D&job_id=id1",
reqURL: "http://example.com/jobs?namespace=%7Bns-http%7D&job_id=id1",
respCode: 500,
respBody: "{\"error\":\"work: empty queue id\"}\n",
},
{
reqMethod: "DELETE",
reqURL: "http://example.com/jobs?namespace=%7Bns1%7D&queue_id=q1&job_id=id1",
reqURL: "http://example.com/jobs?namespace=%7Bns-http%7D&queue_id=q1&job_id=id1",
respCode: 200,
respBody: "{\"namespace\":\"{ns1}\",\"queue_id\":\"q1\",\"job\":{\"ID\":\"id1\",\"CreatedAt\":\"[TZ0-9:-]+\",\"UpdatedAt\":\"[TZ0-9:-]+\",\"EnqueuedAt\":\"[TZ0-9:-]+\",\"Payload\":\"InBheWxvYWQxIg==\",\"Retries\":0,\"LastError\":\"\"}}",
respBody: "{\"namespace\":\"{ns-http}\",\"queue_id\":\"q1\",\"job\":{\"ID\":\"id1\",\"CreatedAt\":\"[TZ0-9:-]+\",\"UpdatedAt\":\"[TZ0-9:-]+\",\"EnqueuedAt\":\"[TZ0-9:-]+\",\"Payload\":\"InBheWxvYWQxIg==\",\"Retries\":0,\"LastError\":\"\"}}",
},
{
reqMethod: "GET",
reqURL: "http://example.com/metrics?namespace=%7Bns1%7D&queue_id=q1",
reqURL: "http://example.com/metrics?namespace=%7Bns-http%7D&queue_id=q1",
respCode: 200,
respBody: "{\"namespace\":\"{ns1}\",\"queue_id\":\"q1\",\"ready_total\":1,\"scheduled_total\":1,\"total\":2,\"latency\":[0-9]+}\n",
respBody: "{\"namespace\":\"{ns-http}\",\"queue_id\":\"q1\",\"ready_total\":1,\"scheduled_total\":1,\"total\":2,\"latency\":[0-9]+}\n",
},
} {
var reqBody io.Reader
Expand Down
25 changes: 25 additions & 0 deletions job.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,31 @@ type Job struct {
Retries int64 `msgpack:"retries"`
// If the job previously fails, LastError will be populated with error string.
LastError string `msgpack:"last_error"`

// AllowPromotion controls whether Enqueue and PromoteJob may lower this
// job's score.
//
// When false (default), both operations use ZADD XX GT semantics: once a
// job is scheduled at time T, a subsequent Enqueue with score T' < T is
// a no-op, and PromoteJob cannot reduce the score. This preserves the
// "deferral" guarantee relied on by deterministic-ID jobs that may be
// enqueued multiple times concurrently (a dedup pattern) and prevents
// PromoteJob from demoting a job whose score has been bumped to
// now + InvisibleSec by Dequeue.
//
// When true, both operations use ZADD XX (no GT). This lets an
// explicit caller lower the score, for example in an opt-in retry flow
// where backoff should override Dequeue's InvisibleSec mark, and lets
// PromoteJob advance a scheduled-but-pending job to now. The caller is
// responsible for ensuring this is safe — typically that the job has a
// unique ID and is not relied on for dedup-deferral.
//
// This field is write-only at the Go API: it is persisted to Redis on
// Enqueue and consulted server-side by Enqueue and PromoteJob, but it
// is NOT rehydrated onto jobs returned by Dequeue or BulkFind — those
// always observe the zero value. PromoteJob reads the persisted value
// directly from Redis, so callers do not need to round-trip it.
AllowPromotion bool `msgpack:"-" json:",omitempty"`
}

// InvalidJobPayloadError wraps json or msgpack decoding error.
Expand Down
16 changes: 8 additions & 8 deletions middleware/concurrent/dequeuer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
func TestDequeuer(t *testing.T) {
client := redistest.NewClient()
defer client.Close()
require.NoError(t, redistest.Reset(client))
require.NoError(t, redistest.Reset(client, "{ns-concurrent}"))

var called int
h1 := func(*work.DequeueOptions) (*work.Job, error) {
Expand All @@ -38,7 +38,7 @@ func TestDequeuer(t *testing.T) {
}

opt := &work.DequeueOptions{
Namespace: "{ns1}",
Namespace: "{ns-concurrent}",
QueueID: "q1",
At: time.Now(),
InvisibleSec: 60,
Expand Down Expand Up @@ -75,7 +75,7 @@ func TestDequeuer(t *testing.T) {

z, err := client.ZRangeByScoreWithScores(
context.Background(),
"{ns1}:lock:q1",
"{ns-concurrent}:lock:q1",
&redis.ZRangeBy{
Min: "-inf",
Max: "+inf",
Expand All @@ -87,7 +87,7 @@ func TestDequeuer(t *testing.T) {
require.EqualValues(t, opt.At.Unix()+60, z[0].Score)
require.EqualValues(t, opt.At.Unix()+60, z[1].Score)

require.NoError(t, client.ZRem(context.Background(), "{ns1}:lock:q1", "w1").Err())
require.NoError(t, client.ZRem(context.Background(), "{ns-concurrent}:lock:q1", "w1").Err())
optLater := *opt
optLater.At = opt.At.Add(10 * time.Second)
// worker 0 is locked already
Expand All @@ -105,7 +105,7 @@ func TestDequeuer(t *testing.T) {

z, err = client.ZRangeByScoreWithScores(
context.Background(),
"{ns1}:lock:q1",
"{ns-concurrent}:lock:q1",
&redis.ZRangeBy{
Min: "-inf",
Max: "+inf",
Expand Down Expand Up @@ -137,7 +137,7 @@ func TestDequeuer(t *testing.T) {

z, err = client.ZRangeByScoreWithScores(
context.Background(),
"{ns1}:lock:q1",
"{ns-concurrent}:lock:q1",
&redis.ZRangeBy{
Min: "-inf",
Max: "+inf",
Expand All @@ -155,7 +155,7 @@ func BenchmarkConcurrency(b *testing.B) {

client := redistest.NewClient()
defer client.Close()
require.NoError(b, redistest.Reset(client))
require.NoError(b, redistest.Reset(client, "{ns-concurrent}"))

var called int
h1 := func(*work.DequeueOptions) (*work.Job, error) {
Expand All @@ -178,7 +178,7 @@ func BenchmarkConcurrency(b *testing.B) {
}

opt := &work.DequeueOptions{
Namespace: "{ns1}",
Namespace: "{ns-concurrent}",
QueueID: "q1",
At: time.Now(),
InvisibleSec: 60,
Expand Down
2 changes: 1 addition & 1 deletion middleware/concurrent/local_dequeuer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func TestLocalDequeuer(t *testing.T) {
}

opt := &work.DequeueOptions{
Namespace: "{ns1}",
Namespace: "{ns-concurrent}",
QueueID: "q1",
At: time.Now(),
InvisibleSec: 60,
Expand Down
2 changes: 1 addition & 1 deletion middleware/discard/after_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
func TestAfter(t *testing.T) {
job := work.NewJob()
opt := &work.DequeueOptions{
Namespace: "{ns1}",
Namespace: "{ns-discard}",
QueueID: "q1",
}
d := After(time.Minute)
Expand Down
2 changes: 1 addition & 1 deletion middleware/discard/invalid_payload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
func TestInvalidPayload(t *testing.T) {
job := work.NewJob()
opt := &work.DequeueOptions{
Namespace: "{ns1}",
Namespace: "{ns-discard}",
QueueID: "q1",
}
h := InvalidPayload(func(*work.Job, *work.DequeueOptions) error {
Expand Down
2 changes: 1 addition & 1 deletion middleware/discard/max_retry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
func TestMaxRetry(t *testing.T) {
job := work.NewJob()
opt := &work.DequeueOptions{
Namespace: "{ns1}",
Namespace: "{ns-discard}",
QueueID: "q1",
}
d := MaxRetry(1)
Expand Down
6 changes: 3 additions & 3 deletions middleware/heartbeat/heartbeater_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@ import (
func TestHeartbeater(t *testing.T) {
client := redistest.NewClient()
defer client.Close()
require.NoError(t, redistest.Reset(client))
require.NoError(t, redistest.Reset(client, "{ns-heartbeat}"))

job := work.NewJob()
opt := &work.DequeueOptions{
Namespace: "{ns1}",
Namespace: "{ns-heartbeat}",
QueueID: "q1",
At: time.Now(),
InvisibleSec: 60,
Expand All @@ -41,7 +41,7 @@ func TestHeartbeater(t *testing.T) {

z, err := client.ZRangeByScoreWithScores(
context.Background(),
"{ns1}:queue:q1",
"{ns-heartbeat}:queue:q1",
&redis.ZRangeBy{
Min: "-inf",
Max: "+inf",
Expand Down
4 changes: 2 additions & 2 deletions middleware/logrus/logger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
func TestHandleFuncLogger(t *testing.T) {
job := work.NewJob()
opt := &work.DequeueOptions{
Namespace: "{ns1}",
Namespace: "{ns-logrus}",
QueueID: "q1",
}
h := HandleFuncLogger(func(*work.Job, *work.DequeueOptions) error {
Expand All @@ -31,7 +31,7 @@ func TestHandleFuncLogger(t *testing.T) {
func TestEnqueueFuncLogger(t *testing.T) {
job := work.NewJob()
opt := &work.EnqueueOptions{
Namespace: "{ns1}",
Namespace: "{ns-logrus}",
QueueID: "q1",
}
h := EnqueueFuncLogger(func(*work.Job, *work.EnqueueOptions) error {
Expand Down
8 changes: 4 additions & 4 deletions middleware/prometheus/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func TestHandleFuncMetrics(t *testing.T) {

job := work.NewJob()
opt := &work.DequeueOptions{
Namespace: "{ns1}",
Namespace: "{ns-prometheus}",
QueueID: "q1",
}
h := HandleFuncMetrics(func(*work.Job, *work.DequeueOptions) error {
Expand Down Expand Up @@ -58,7 +58,7 @@ func TestEnqueueFuncMetrics(t *testing.T) {

job := work.NewJob()
opt := &work.EnqueueOptions{
Namespace: "{ns1}",
Namespace: "{ns-prometheus}",
QueueID: "q1",
}
h := EnqueueFuncMetrics(func(*work.Job, *work.EnqueueOptions) error {
Expand Down Expand Up @@ -93,10 +93,10 @@ func TestExportWorkerMetrics(t *testing.T) {

client := redistest.NewClient()
defer client.Close()
require.NoError(t, redistest.Reset(client))
require.NoError(t, redistest.Reset(client, "{ns-prometheus}"))

w := work.NewWorker(&work.WorkerOptions{
Namespace: "{ns1}",
Namespace: "{ns-prometheus}",
Queue: work.NewRedisQueue(client),
})
err = w.Register("test",
Expand Down
Loading