Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(job): add changePriority method #1901

Merged
merged 8 commits into from
May 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
19 changes: 18 additions & 1 deletion docs/gitbook/guide/jobs/delayed.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,26 @@ await myQueue.add('house', { color: 'white' }, { delay: 5000 });
If you want to process the job after a specific point in time, just add the time remaining to that point in time. For example, let's say you want to process the job on the third of July 2035 at 10:30:

```typescript
const targetTime = new Date("03-07-2035 10:30");
const targetTime = new Date('03-07-2035 10:30');
const delay = Number(targetTime) - Number(new Date());

await myQueue.add('house', { color: 'white' }, { delay });
```

## Change delay

If you want to change the delay after inserting a delayed job, just use **changeDelay** method. For example, let's say you want to change the delay from 2000 to 4000 milliseconds:

```typescript
const job = await Job.create(queue, 'test', { foo: 'bar' }, { delay: 2000 });

await job.changeDelay(4000);
```

{% hint style="warning" %}
Take in count that your job must be into delayed state when you change the delay.
{% endhint %}

## Read more:

- 💡 [Change Delay API Reference](https://api.docs.bullmq.io/classes/Job.html#changeDelay)
24 changes: 23 additions & 1 deletion docs/gitbook/guide/jobs/prioritized.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ Jobs can also include a priority option. Using priorities, job's processing orde
Adding prioritized jobs is a slower operation than the other types of jobs, with a complexity O(n) relative to the number of jobs waiting in the Queue.
{% endhint %}

Note that the priorities go from 1 to MAX\_INT, whereas a lower number is always a higher priority than higher numbers.
Note that the priorities go from 1 to MAX_INT, whereas a lower number is always a higher priority than higher numbers.

Jobs without a priority assigned will get the least priority.

Expand All @@ -24,3 +24,25 @@ await myQueue.add('wall', { color: 'blue' }, { priority: 7 });
```

If several jobs are added with the same priority value, then the jobs within that priority will be processed in FIFO (First in first out) fashion.

## Change priority

If you want to change the priority after inserting a job, just use the **changePriority** method. For example, let's say that you want to change the priority from 16 to 1:

```typescript
const job = await Job.create(queue, 'test2', { foo: 'bar' }, { priority: 16 });

await job.changePriority({
priority: 1,
});
```

or if you want to use lifo option:

```typescript
const job = await Job.create(queue, 'test2', { foo: 'bar' }, { priority: 16 });

await job.changePriority({
lifo: true,
});
```
12 changes: 12 additions & 0 deletions src/classes/job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -738,6 +738,18 @@ export class Job<
this.delay = delay;
}

/**
* Change job priority.
*
* @returns void
*/
async changePriority(opts: {
priority?: number;
lifo?: boolean;
}): Promise<void> {
await this.scripts.changePriority(this.id, opts.priority, opts.lifo);
}

/**
* Get this jobs children result values if any.
*
Expand Down
34 changes: 34 additions & 0 deletions src/classes/scripts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -527,6 +527,40 @@ export class Scripts {
return keys.concat([delay, JSON.stringify(timestamp), jobId]);
}

async changePriority(
jobId: string,
priority = 0,
lifo = false,
): Promise<void> {
const client = await this.queue.client;

const args = this.changePriorityArgs(jobId, priority, lifo);
const result = await (<any>client).changePriority(args);
if (result < 0) {
throw this.finishedErrors(result, jobId, 'changePriority');
}
}

private changePriorityArgs(
jobId: string,
priority = 0,
lifo = false,
): (string | number)[] {
const keys: (string | number)[] = [
this.queue.keys.wait,
this.queue.keys.paused,
this.queue.keys.meta,
this.queue.keys.priority,
];

return keys.concat([
priority,
this.queue.toKey(jobId),
jobId,
lifo ? 1 : 0,
]);
}

// Note: We have an issue here with jobs using custom job ids
moveToDelayedArgs(
jobId: string,
Expand Down
50 changes: 50 additions & 0 deletions src/commands/changePriority-4.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
--[[
Change job priority
Input:
KEYS[1] 'wait',
KEYS[2] 'paused'
KEYS[3] 'meta'
KEYS[4] 'priority'

ARGV[1] priority value
ARGV[2] job key
ARGV[3] job id
ARGV[4] lifo

Output:
0 - OK
-1 - Missing job
]]
local jobKey = ARGV[2]
local jobId = ARGV[3]
local priority = tonumber(ARGV[1])
local rcall = redis.call

-- Includes
--- @include "includes/addJobWithPriority"
--- @include "includes/getTargetQueueList"

if rcall("EXISTS", jobKey) == 1 then
local target = getTargetQueueList(KEYS[3], KEYS[1], KEYS[2])

local numRemovedElements = rcall("LREM", target, -1, jobId)
if numRemovedElements > 0 then
rcall("ZREM", KEYS[4], jobId)

-- Standard or priority add
if priority == 0 then
-- LIFO or FIFO
local pushCmd = ARGV[4] == '1' and 'RPUSH' or 'LPUSH';
rcall(pushCmd, target, jobId)
else
-- Priority add
addJobWithPriority(KEYS[4], priority, target, jobId)
end
end

rcall("HSET", jobKey, "priority", priority)

return 0
else
return -1
end
4 changes: 2 additions & 2 deletions src/commands/includes/getTargetQueueList.lua
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@

local function getTargetQueueList(queueMetaKey, waitKey, pausedKey)
if rcall("HEXISTS", queueMetaKey, "paused") ~= 1 then
return waitKey
return waitKey, false
else
return pausedKey
return pausedKey, true
end
end