From cf840925aa86719f76c2f6b6229d8631367d8342 Mon Sep 17 00:00:00 2001 From: roggervalf Date: Mon, 15 May 2023 21:28:59 -0500 Subject: [PATCH 1/6] feat(job): add changePriority method --- src/classes/scripts.ts | 34 +++++++++++++++++++++ src/commands/addJob-8.lua | 2 +- src/commands/changePriority-4.lua | 49 +++++++++++++++++++++++++++++++ 3 files changed, 84 insertions(+), 1 deletion(-) create mode 100644 src/commands/changePriority-4.lua diff --git a/src/classes/scripts.ts b/src/classes/scripts.ts index 6d4d0aaf63..eba8c797f6 100644 --- a/src/classes/scripts.ts +++ b/src/classes/scripts.ts @@ -527,6 +527,40 @@ export class Scripts { return keys.concat([delay, JSON.stringify(timestamp), jobId]); } + async changePriority( + jobId: string, + priority = 0, + lifo = false, + ): Promise { + const client = await this.queue.client; + + const args = this.changePriorityArgs(jobId, priority, lifo); + const result = await (client).changePriority(args); + if (result < 0) { + throw this.finishedErrors(result, jobId, 'changePriority'); + } + } + + private changePriorityArgs( + jobId: string, + priority = 0, + lifo = false, + ): (string | number)[] { + const keys: (string | number)[] = [ + this.queue.keys.wait, + this.queue.keys.paused, + this.queue.keys.meta, + this.queue.keys.priority, + ]; + + return keys.concat([ + priority, + this.queue.toKey(jobId), + jobId, + lifo ? 1 : 0, + ]); + } + // Note: We have an issue here with jobs using custom job ids moveToDelayedArgs( jobId: string, diff --git a/src/commands/addJob-8.lua b/src/commands/addJob-8.lua index ab2aad7b78..0f30724e8d 100644 --- a/src/commands/addJob-8.lua +++ b/src/commands/addJob-8.lua @@ -101,7 +101,7 @@ else rcall("XADD", KEYS[8], "*", "event", "duplicated", "jobId", jobId) return jobId .. "" -- convert to string - end + end end -- Store the job. diff --git a/src/commands/changePriority-4.lua b/src/commands/changePriority-4.lua new file mode 100644 index 0000000000..b5525ff48f --- /dev/null +++ b/src/commands/changePriority-4.lua @@ -0,0 +1,49 @@ +--[[ + Change job priority + Input: + KEYS[1] 'wait', + KEYS[2] 'paused' + KEYS[3] 'meta' + KEYS[4] 'priority' + + ARGV[1] priority value + ARGV[2] job key + ARGV[3] job id + ARGV[4] lifo + + Output: + 0 - OK + -1 - Missing job +]] +local jobKey = ARGV[2] +local jobId = ARGV[3] +local rcall = redis.call + +-- Includes +--- @include "includes/addJobWithPriority" +--- @include "includes/getTargetQueueList" + +if rcall("EXISTS", jobKey) == 1 then + local target = getTargetQueueList(KEYS[3], KEYS[1], KEYS[2]) + + local numRemovedElements = rcall("LREM", target, -1, jobId) + if numRemovedElements > 0 then + rcall("ZREM", KEYS[5], jobId) + + -- Standard or priority add + if priority == 0 then + -- LIFO or FIFO + local pushCmd = ARGV[4] == 1 and 'RPUSH' or 'LPUSH'; + rcall(pushCmd, target, jobId) + else + -- Priority add + addJobWithPriority(KEYS[5], tonumber(ARGV[1]), target, jobId) + end + end + + rcall("HSET", jobKey, "priority", tonumber(ARGV[1])) + + return 0 +else + return -1 +end From c39413d6ada73c687b1bd0c44e89b92afa38b77e Mon Sep 17 00:00:00 2001 From: roggervalf Date: Mon, 15 May 2023 21:33:34 -0500 Subject: [PATCH 2/6] chore: add method in job --- src/classes/job.ts | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/src/classes/job.ts b/src/classes/job.ts index 0779275dfa..e1e1db50db 100644 --- a/src/classes/job.ts +++ b/src/classes/job.ts @@ -738,6 +738,18 @@ export class Job< this.delay = delay; } + /** + * Change job priority. + * + * @returns void + */ + async changePriority(opts: { + priority?: number; + lifo?: boolean; + }): Promise { + await this.scripts.changePriority(this.id, opts.priority, opts.lifo); + } + /** * Get this jobs children result values if any. * From a92cfe27d3a1e6d9be85be0a13f34c8a2f16ae00 Mon Sep 17 00:00:00 2001 From: roggervalf Date: Tue, 16 May 2023 20:53:14 -0500 Subject: [PATCH 3/6] test(job): add test cases for changePriority --- src/commands/changePriority-4.lua | 9 +- tests/test_job.ts | 205 +++++++++++++++++++++++++++++- 2 files changed, 208 insertions(+), 6 deletions(-) diff --git a/src/commands/changePriority-4.lua b/src/commands/changePriority-4.lua index b5525ff48f..6a21c541d9 100644 --- a/src/commands/changePriority-4.lua +++ b/src/commands/changePriority-4.lua @@ -17,6 +17,7 @@ ]] local jobKey = ARGV[2] local jobId = ARGV[3] +local priority = tonumber(ARGV[1]) local rcall = redis.call -- Includes @@ -28,20 +29,20 @@ if rcall("EXISTS", jobKey) == 1 then local numRemovedElements = rcall("LREM", target, -1, jobId) if numRemovedElements > 0 then - rcall("ZREM", KEYS[5], jobId) + rcall("ZREM", KEYS[4], jobId) -- Standard or priority add if priority == 0 then -- LIFO or FIFO - local pushCmd = ARGV[4] == 1 and 'RPUSH' or 'LPUSH'; + local pushCmd = ARGV[4] == '1' and 'RPUSH' or 'LPUSH'; rcall(pushCmd, target, jobId) else -- Priority add - addJobWithPriority(KEYS[5], tonumber(ARGV[1]), target, jobId) + addJobWithPriority(KEYS[4], priority, target, jobId) end end - rcall("HSET", jobKey, "priority", tonumber(ARGV[1])) + rcall("HSET", jobKey, "priority", priority) return 0 else diff --git a/tests/test_job.ts b/tests/test_job.ts index 1049374240..6869a85d62 100644 --- a/tests/test_job.ts +++ b/tests/test_job.ts @@ -294,7 +294,7 @@ describe('Job', function () { const job = await Job.create(queue, 'test', { foo: 'bar' }); await job.updateProgress(42); const storedJob = await Job.fromId(queue, job.id); - expect(storedJob.progress).to.be.equal(42); + expect(storedJob!.progress).to.be.equal(42); }); it('can set and get progress as object', async function () { @@ -733,6 +733,207 @@ describe('Job', function () { }); }); + describe('.changePriority', () => { + it('can change priority of a job', async function () { + await Job.create(queue, 'test1', { foo: 'bar' }, { priority: 8 }); + const job = await Job.create( + queue, + 'test2', + { foo: 'bar' }, + { priority: 16 }, + ); + + await job.changePriority({ + priority: 1, + }); + + const worker = new Worker( + queueName, + async () => { + await delay(20); + }, + { connection }, + ); + await worker.waitUntilReady(); + + const completing = new Promise(resolve => { + worker.on( + 'completed', + after(2, job => { + expect(job.name).to.be.eql('test1'); + resolve(); + }), + ); + }); + + await completing; + + await worker.close(); + }); + + describe('when queue is paused', () => { + it('respects new priority', async () => { + await queue.pause(); + await Job.create(queue, 'test1', { foo: 'bar' }, { priority: 8 }); + const job = await Job.create( + queue, + 'test2', + { foo: 'bar' }, + { priority: 16 }, + ); + + await job.changePriority({ + priority: 1, + }); + + const worker = new Worker( + queueName, + async () => { + await delay(20); + }, + { connection }, + ); + await worker.waitUntilReady(); + + const completing = new Promise(resolve => { + worker.on( + 'completed', + after(2, job => { + expect(job.name).to.be.eql('test1'); + resolve(); + }), + ); + }); + + await queue.resume(); + + await completing; + + await worker.close(); + }); + }); + + describe('when lifo option is provided as true', () => { + it('moves job to the head of wait list', async () => { + await queue.pause(); + await Job.create(queue, 'test1', { foo: 'bar' }, { priority: 8 }); + const job = await Job.create( + queue, + 'test2', + { foo: 'bar' }, + { priority: 16 }, + ); + + await job.changePriority({ + lifo: true, + }); + + const worker = new Worker( + queueName, + async () => { + await delay(20); + }, + { connection }, + ); + await worker.waitUntilReady(); + + const completing = new Promise(resolve => { + worker.on( + 'completed', + after(2, job => { + expect(job.name).to.be.eql('test1'); + resolve(); + }), + ); + }); + + await queue.resume(); + + await completing; + + await worker.close(); + }); + }); + + describe('when lifo option is provided as false', () => { + it('moves job to the tail of wait list', async () => { + await queue.pause(); + const job = await Job.create( + queue, + 'test1', + { foo: 'bar' }, + { priority: 8 }, + ); + await Job.create(queue, 'test2', { foo: 'bar' }, { priority: 16 }); + + await job.changePriority({ + lifo: false, + }); + + const worker = new Worker( + queueName, + async () => { + await delay(20); + }, + { connection }, + ); + await worker.waitUntilReady(); + + const completing = new Promise(resolve => { + worker.on( + 'completed', + after(2, job => { + expect(job.name).to.be.eql('test1'); + resolve(); + }), + ); + }); + + await queue.resume(); + + await completing; + + await worker.close(); + }); + }); + + describe('when job is not in wait state', () => { + it('does not add a record in priority zset', async () => { + const job = await Job.create( + queue, + 'test1', + { foo: 'bar' }, + { delay: 500 }, + ); + + await job.changePriority({ + priority: 10, + }); + + const client = await queue.client; + const count = await client.zcard(`bull:${queueName}:priority`); + const priority = await client.hget( + `bull:${queueName}:${job.id}`, + 'priority', + ); + + expect(count).to.be.eql(0); + expect(priority).to.be.eql('10'); + }); + }); + + describe('when job does not exist', () => { + it('throws an error', async () => { + const job = await Job.create(queue, 'test', { foo: 'bar' }); + await job.remove(); + + await expect(job.changePriority({ priority: 2 })).to.be.rejectedWith( + `Missing key for job ${job.id}. changePriority`, + ); + }); + }); + }); + describe('.promote', () => { it('can promote a delayed job to be executed immediately', async () => { const job = await Job.create( @@ -766,7 +967,7 @@ describe('Job', function () { const done = new Promise(resolve => { worker.on('completed', job => { - completed.push(job.id); + completed.push(job.id!); if (completed.length > 3) { expect(completed).to.be.eql(['a', 'b', 'c', 'd']); resolve(); From 2d99f6f1f44970002a4ce179d5c60463c91f5953 Mon Sep 17 00:00:00 2001 From: roggervalf Date: Tue, 16 May 2023 21:07:39 -0500 Subject: [PATCH 4/6] test: try to fix a flaky test --- tests/test_clean.ts | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/tests/test_clean.ts b/tests/test_clean.ts index 508d4cb243..63997b3f53 100644 --- a/tests/test_clean.ts +++ b/tests/test_clean.ts @@ -49,7 +49,13 @@ describe('Cleaner', () => { }); it('should clean two jobs from the queue', async () => { - const worker = new Worker(queueName, async () => {}, { connection }); + const worker = new Worker( + queueName, + async () => { + await delay(10); + }, + { connection }, + ); await worker.waitUntilReady(); const completing = new Promise(resolve => { @@ -67,7 +73,7 @@ describe('Cleaner', () => { ]); await completing; - await delay(1); + await delay(10); const jobs = await queue.clean(0, 0); expect(jobs.length).to.be.eql(2); From 9a34b832d079b8868752dc4d0e7ba03124a90b68 Mon Sep 17 00:00:00 2001 From: roggervalf Date: Tue, 23 May 2023 22:11:01 -0500 Subject: [PATCH 5/6] docs(prioritized): add changePriority documentation --- docs/gitbook/guide/jobs/delayed.md | 19 ++++++++++++++++++- docs/gitbook/guide/jobs/prioritized.md | 24 +++++++++++++++++++++++- 2 files changed, 41 insertions(+), 2 deletions(-) diff --git a/docs/gitbook/guide/jobs/delayed.md b/docs/gitbook/guide/jobs/delayed.md index b0ec16a248..c149683430 100644 --- a/docs/gitbook/guide/jobs/delayed.md +++ b/docs/gitbook/guide/jobs/delayed.md @@ -20,9 +20,26 @@ await myQueue.add('house', { color: 'white' }, { delay: 5000 }); If you want to process the job after a specific point in time, just add the time remaining to that point in time. For example, let's say you want to process the job on the third of July 2035 at 10:30: ```typescript -const targetTime = new Date("03-07-2035 10:30"); +const targetTime = new Date('03-07-2035 10:30'); const delay = Number(targetTime) - Number(new Date()); await myQueue.add('house', { color: 'white' }, { delay }); ``` +## Change delay + +If you want to change the delay after inserting a delayed job, just use **changeDelay** method. For example, let's say you want to change the delay from 2000 to 4000 milliseconds: + +```typescript +const job = await Job.create(queue, 'test', { foo: 'bar' }, { delay: 2000 }); + +await job.changeDelay(4000); +``` + +{% hint style="warning" %} +Take in count that your job must be in delayed state when you change the delay. +{% endhint %} + +## Read more: + +- 💡 [Change Delay API Reference](https://api.docs.bullmq.io/classes/Job.html#changeDelay) diff --git a/docs/gitbook/guide/jobs/prioritized.md b/docs/gitbook/guide/jobs/prioritized.md index cc8798584a..749674b2cc 100644 --- a/docs/gitbook/guide/jobs/prioritized.md +++ b/docs/gitbook/guide/jobs/prioritized.md @@ -6,7 +6,7 @@ Jobs can also include a priority option. Using priorities, job's processing orde Adding prioritized jobs is a slower operation than the other types of jobs, with a complexity O(n) relative to the number of jobs waiting in the Queue. {% endhint %} -Note that the priorities go from 1 to MAX\_INT, whereas a lower number is always a higher priority than higher numbers. +Note that the priorities go from 1 to MAX_INT, whereas a lower number is always a higher priority than higher numbers. Jobs without a priority assigned will get the least priority. @@ -24,3 +24,25 @@ await myQueue.add('wall', { color: 'blue' }, { priority: 7 }); ``` If several jobs are added with the same priority value, then the jobs within that priority will be processed in FIFO (First in first out) fashion. + +## Change priority + +If you want to change the priority after inserting a job, just use **changePriority** method. For example, let's say you want to change the priority from 16 to 1: + +```typescript +const job = await Job.create(queue, 'test2', { foo: 'bar' }, { priority: 16 }); + +await job.changePriority({ + priority: 1, +}); +``` + +or if you want to use lifo option: + +```typescript +const job = await Job.create(queue, 'test2', { foo: 'bar' }, { priority: 16 }); + +await job.changePriority({ + lifo: true, +}); +``` From d660f613a0d93fa0184f837840edb021f24f1105 Mon Sep 17 00:00:00 2001 From: roggervalf Date: Wed, 31 May 2023 17:43:09 -0500 Subject: [PATCH 6/6] docs: address suggestions --- docs/gitbook/guide/jobs/delayed.md | 2 +- docs/gitbook/guide/jobs/prioritized.md | 2 +- src/commands/includes/getTargetQueueList.lua | 4 ++-- tests/test_job.ts | 2 +- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/docs/gitbook/guide/jobs/delayed.md b/docs/gitbook/guide/jobs/delayed.md index c149683430..e43beea0d3 100644 --- a/docs/gitbook/guide/jobs/delayed.md +++ b/docs/gitbook/guide/jobs/delayed.md @@ -37,7 +37,7 @@ await job.changeDelay(4000); ``` {% hint style="warning" %} -Take in count that your job must be in delayed state when you change the delay. +Take in count that your job must be into delayed state when you change the delay. {% endhint %} ## Read more: diff --git a/docs/gitbook/guide/jobs/prioritized.md b/docs/gitbook/guide/jobs/prioritized.md index 749674b2cc..85ec8f22d7 100644 --- a/docs/gitbook/guide/jobs/prioritized.md +++ b/docs/gitbook/guide/jobs/prioritized.md @@ -27,7 +27,7 @@ If several jobs are added with the same priority value, then the jobs within tha ## Change priority -If you want to change the priority after inserting a job, just use **changePriority** method. For example, let's say you want to change the priority from 16 to 1: +If you want to change the priority after inserting a job, just use the **changePriority** method. For example, let's say that you want to change the priority from 16 to 1: ```typescript const job = await Job.create(queue, 'test2', { foo: 'bar' }, { priority: 16 }); diff --git a/src/commands/includes/getTargetQueueList.lua b/src/commands/includes/getTargetQueueList.lua index 6a0a2a7b36..a208fdca6a 100644 --- a/src/commands/includes/getTargetQueueList.lua +++ b/src/commands/includes/getTargetQueueList.lua @@ -5,8 +5,8 @@ local function getTargetQueueList(queueMetaKey, waitKey, pausedKey) if rcall("HEXISTS", queueMetaKey, "paused") ~= 1 then - return waitKey + return waitKey, false else - return pausedKey + return pausedKey, true end end diff --git a/tests/test_job.ts b/tests/test_job.ts index 7a69dbe264..72b47ad4f6 100644 --- a/tests/test_job.ts +++ b/tests/test_job.ts @@ -609,7 +609,7 @@ describe('Job', function () { const token = 'my-token'; await Job.create(queue, 'test', { foo: 'bar' }, { attempts: 1 }); const job = (await worker.getNextJob(token)) as Job; - await delay(100); + await delay(105); await job.remove(); await expect(