diff --git a/docs/gitbook/guide/jobs/delayed.md b/docs/gitbook/guide/jobs/delayed.md index b0ec16a248..e43beea0d3 100644 --- a/docs/gitbook/guide/jobs/delayed.md +++ b/docs/gitbook/guide/jobs/delayed.md @@ -20,9 +20,26 @@ await myQueue.add('house', { color: 'white' }, { delay: 5000 }); If you want to process the job after a specific point in time, just add the time remaining to that point in time. For example, let's say you want to process the job on the third of July 2035 at 10:30: ```typescript -const targetTime = new Date("03-07-2035 10:30"); +const targetTime = new Date('03-07-2035 10:30'); const delay = Number(targetTime) - Number(new Date()); await myQueue.add('house', { color: 'white' }, { delay }); ``` +## Change delay + +If you want to change the delay after inserting a delayed job, just use **changeDelay** method. For example, let's say you want to change the delay from 2000 to 4000 milliseconds: + +```typescript +const job = await Job.create(queue, 'test', { foo: 'bar' }, { delay: 2000 }); + +await job.changeDelay(4000); +``` + +{% hint style="warning" %} +Take in count that your job must be into delayed state when you change the delay. +{% endhint %} + +## Read more: + +- 💡 [Change Delay API Reference](https://api.docs.bullmq.io/classes/Job.html#changeDelay) diff --git a/docs/gitbook/guide/jobs/prioritized.md b/docs/gitbook/guide/jobs/prioritized.md index cc8798584a..85ec8f22d7 100644 --- a/docs/gitbook/guide/jobs/prioritized.md +++ b/docs/gitbook/guide/jobs/prioritized.md @@ -6,7 +6,7 @@ Jobs can also include a priority option. Using priorities, job's processing orde Adding prioritized jobs is a slower operation than the other types of jobs, with a complexity O(n) relative to the number of jobs waiting in the Queue. {% endhint %} -Note that the priorities go from 1 to MAX\_INT, whereas a lower number is always a higher priority than higher numbers. +Note that the priorities go from 1 to MAX_INT, whereas a lower number is always a higher priority than higher numbers. Jobs without a priority assigned will get the least priority. @@ -24,3 +24,25 @@ await myQueue.add('wall', { color: 'blue' }, { priority: 7 }); ``` If several jobs are added with the same priority value, then the jobs within that priority will be processed in FIFO (First in first out) fashion. + +## Change priority + +If you want to change the priority after inserting a job, just use the **changePriority** method. For example, let's say that you want to change the priority from 16 to 1: + +```typescript +const job = await Job.create(queue, 'test2', { foo: 'bar' }, { priority: 16 }); + +await job.changePriority({ + priority: 1, +}); +``` + +or if you want to use lifo option: + +```typescript +const job = await Job.create(queue, 'test2', { foo: 'bar' }, { priority: 16 }); + +await job.changePriority({ + lifo: true, +}); +``` diff --git a/src/classes/job.ts b/src/classes/job.ts index b514921cfb..9d72416a35 100644 --- a/src/classes/job.ts +++ b/src/classes/job.ts @@ -738,6 +738,18 @@ export class Job< this.delay = delay; } + /** + * Change job priority. + * + * @returns void + */ + async changePriority(opts: { + priority?: number; + lifo?: boolean; + }): Promise { + await this.scripts.changePriority(this.id, opts.priority, opts.lifo); + } + /** * Get this jobs children result values if any. * diff --git a/src/classes/scripts.ts b/src/classes/scripts.ts index 774c2cd5ed..2bbdb7a3ad 100644 --- a/src/classes/scripts.ts +++ b/src/classes/scripts.ts @@ -527,6 +527,40 @@ export class Scripts { return keys.concat([delay, JSON.stringify(timestamp), jobId]); } + async changePriority( + jobId: string, + priority = 0, + lifo = false, + ): Promise { + const client = await this.queue.client; + + const args = this.changePriorityArgs(jobId, priority, lifo); + const result = await (client).changePriority(args); + if (result < 0) { + throw this.finishedErrors(result, jobId, 'changePriority'); + } + } + + private changePriorityArgs( + jobId: string, + priority = 0, + lifo = false, + ): (string | number)[] { + const keys: (string | number)[] = [ + this.queue.keys.wait, + this.queue.keys.paused, + this.queue.keys.meta, + this.queue.keys.priority, + ]; + + return keys.concat([ + priority, + this.queue.toKey(jobId), + jobId, + lifo ? 1 : 0, + ]); + } + // Note: We have an issue here with jobs using custom job ids moveToDelayedArgs( jobId: string, diff --git a/src/commands/changePriority-4.lua b/src/commands/changePriority-4.lua new file mode 100644 index 0000000000..6a21c541d9 --- /dev/null +++ b/src/commands/changePriority-4.lua @@ -0,0 +1,50 @@ +--[[ + Change job priority + Input: + KEYS[1] 'wait', + KEYS[2] 'paused' + KEYS[3] 'meta' + KEYS[4] 'priority' + + ARGV[1] priority value + ARGV[2] job key + ARGV[3] job id + ARGV[4] lifo + + Output: + 0 - OK + -1 - Missing job +]] +local jobKey = ARGV[2] +local jobId = ARGV[3] +local priority = tonumber(ARGV[1]) +local rcall = redis.call + +-- Includes +--- @include "includes/addJobWithPriority" +--- @include "includes/getTargetQueueList" + +if rcall("EXISTS", jobKey) == 1 then + local target = getTargetQueueList(KEYS[3], KEYS[1], KEYS[2]) + + local numRemovedElements = rcall("LREM", target, -1, jobId) + if numRemovedElements > 0 then + rcall("ZREM", KEYS[4], jobId) + + -- Standard or priority add + if priority == 0 then + -- LIFO or FIFO + local pushCmd = ARGV[4] == '1' and 'RPUSH' or 'LPUSH'; + rcall(pushCmd, target, jobId) + else + -- Priority add + addJobWithPriority(KEYS[4], priority, target, jobId) + end + end + + rcall("HSET", jobKey, "priority", priority) + + return 0 +else + return -1 +end diff --git a/src/commands/includes/getTargetQueueList.lua b/src/commands/includes/getTargetQueueList.lua index 6a0a2a7b36..a208fdca6a 100644 --- a/src/commands/includes/getTargetQueueList.lua +++ b/src/commands/includes/getTargetQueueList.lua @@ -5,8 +5,8 @@ local function getTargetQueueList(queueMetaKey, waitKey, pausedKey) if rcall("HEXISTS", queueMetaKey, "paused") ~= 1 then - return waitKey + return waitKey, false else - return pausedKey + return pausedKey, true end end diff --git a/tests/test_job.ts b/tests/test_job.ts index 7804a5fc87..72b47ad4f6 100644 --- a/tests/test_job.ts +++ b/tests/test_job.ts @@ -294,7 +294,7 @@ describe('Job', function () { const job = await Job.create(queue, 'test', { foo: 'bar' }); await job.updateProgress(42); const storedJob = await Job.fromId(queue, job.id); - expect(storedJob.progress).to.be.equal(42); + expect(storedJob!.progress).to.be.equal(42); }); it('can set and get progress as object', async function () { @@ -609,7 +609,7 @@ describe('Job', function () { const token = 'my-token'; await Job.create(queue, 'test', { foo: 'bar' }, { attempts: 1 }); const job = (await worker.getNextJob(token)) as Job; - await delay(100); + await delay(105); await job.remove(); await expect( @@ -759,6 +759,207 @@ describe('Job', function () { }); }); + describe('.changePriority', () => { + it('can change priority of a job', async function () { + await Job.create(queue, 'test1', { foo: 'bar' }, { priority: 8 }); + const job = await Job.create( + queue, + 'test2', + { foo: 'bar' }, + { priority: 16 }, + ); + + await job.changePriority({ + priority: 1, + }); + + const worker = new Worker( + queueName, + async () => { + await delay(20); + }, + { connection }, + ); + await worker.waitUntilReady(); + + const completing = new Promise(resolve => { + worker.on( + 'completed', + after(2, job => { + expect(job.name).to.be.eql('test1'); + resolve(); + }), + ); + }); + + await completing; + + await worker.close(); + }); + + describe('when queue is paused', () => { + it('respects new priority', async () => { + await queue.pause(); + await Job.create(queue, 'test1', { foo: 'bar' }, { priority: 8 }); + const job = await Job.create( + queue, + 'test2', + { foo: 'bar' }, + { priority: 16 }, + ); + + await job.changePriority({ + priority: 1, + }); + + const worker = new Worker( + queueName, + async () => { + await delay(20); + }, + { connection }, + ); + await worker.waitUntilReady(); + + const completing = new Promise(resolve => { + worker.on( + 'completed', + after(2, job => { + expect(job.name).to.be.eql('test1'); + resolve(); + }), + ); + }); + + await queue.resume(); + + await completing; + + await worker.close(); + }); + }); + + describe('when lifo option is provided as true', () => { + it('moves job to the head of wait list', async () => { + await queue.pause(); + await Job.create(queue, 'test1', { foo: 'bar' }, { priority: 8 }); + const job = await Job.create( + queue, + 'test2', + { foo: 'bar' }, + { priority: 16 }, + ); + + await job.changePriority({ + lifo: true, + }); + + const worker = new Worker( + queueName, + async () => { + await delay(20); + }, + { connection }, + ); + await worker.waitUntilReady(); + + const completing = new Promise(resolve => { + worker.on( + 'completed', + after(2, job => { + expect(job.name).to.be.eql('test1'); + resolve(); + }), + ); + }); + + await queue.resume(); + + await completing; + + await worker.close(); + }); + }); + + describe('when lifo option is provided as false', () => { + it('moves job to the tail of wait list', async () => { + await queue.pause(); + const job = await Job.create( + queue, + 'test1', + { foo: 'bar' }, + { priority: 8 }, + ); + await Job.create(queue, 'test2', { foo: 'bar' }, { priority: 16 }); + + await job.changePriority({ + lifo: false, + }); + + const worker = new Worker( + queueName, + async () => { + await delay(20); + }, + { connection }, + ); + await worker.waitUntilReady(); + + const completing = new Promise(resolve => { + worker.on( + 'completed', + after(2, job => { + expect(job.name).to.be.eql('test1'); + resolve(); + }), + ); + }); + + await queue.resume(); + + await completing; + + await worker.close(); + }); + }); + + describe('when job is not in wait state', () => { + it('does not add a record in priority zset', async () => { + const job = await Job.create( + queue, + 'test1', + { foo: 'bar' }, + { delay: 500 }, + ); + + await job.changePriority({ + priority: 10, + }); + + const client = await queue.client; + const count = await client.zcard(`bull:${queueName}:priority`); + const priority = await client.hget( + `bull:${queueName}:${job.id}`, + 'priority', + ); + + expect(count).to.be.eql(0); + expect(priority).to.be.eql('10'); + }); + }); + + describe('when job does not exist', () => { + it('throws an error', async () => { + const job = await Job.create(queue, 'test', { foo: 'bar' }); + await job.remove(); + + await expect(job.changePriority({ priority: 2 })).to.be.rejectedWith( + `Missing key for job ${job.id}. changePriority`, + ); + }); + }); + }); + describe('.promote', () => { it('can promote a delayed job to be executed immediately', async () => { const job = await Job.create( @@ -792,7 +993,7 @@ describe('Job', function () { const done = new Promise(resolve => { worker.on('completed', job => { - completed.push(job.id); + completed.push(job.id!); if (completed.length > 3) { expect(completed).to.be.eql(['a', 'b', 'c', 'd']); resolve();