Skip to content

Commit

Permalink
feat(queue): add getPrioritized and getPrioritizedCount methods (#2005)
Browse files Browse the repository at this point in the history
  • Loading branch information
roggervalf committed Jun 23, 2023
1 parent 2e7c90d commit 7363abe
Show file tree
Hide file tree
Showing 13 changed files with 87 additions and 25 deletions.
10 changes: 7 additions & 3 deletions docs/gitbook/guide/architecture.md
Expand Up @@ -10,12 +10,16 @@ In order to use the full potential of Bull queues, it is important to understand

<figure><img src="../.gitbook/assets/mermaid-diagram-2023-06-22-093303.png" alt=""><figcaption><p>Lifecycle of a job - Queue</p></figcaption></figure>

When a job is added to a queue it can be in one of two states, it can either be in the “wait” status, which is, in fact, a waiting list, where all jobs must enter before they can be processed, or it can be in a delayed” status: a delayed status implies that the job is waiting for some timeout or to be promoted for being processed, however, a delayed job will not be processed directly, instead it will be placed at the beginning of the waiting list and processed as soon as a worker is idle.
When a job is added to a queue it can be in one of three states, it can either be in the **“wait”** status, which is, in fact, a waiting list, where all jobs must enter before they can be processed, or it can be in a **“prioritized“** status: a prioritized status implies that a job with higher priority will be processed first, or it can be in a **delayed”** status: a delayed status implies that the job is waiting for some timeout or to be promoted for being processed, however, a delayed job will not be processed directly, instead it will be placed at the beginning of the waiting list or at prioritized set and processed as soon as a worker is idle.

The next state for a job is the “active” state. The active state is represented by a set, and are jobs that are currently being processed, i.e. they are running in the `process` function explained in the previous chapter. A job can be in the active state for an unlimited amount of time until the process is completed or an exception is thrown so that the job will end in either the “completed” or the “failed” status.
{% hint style="warning" %}
Note that priorities go from 0 to 2^21, where 0 is the highest priority, this follows a similar standard as processed in Unix (https://en.wikipedia.org/wiki/Nice_(Unix), where a higher number means less priority).
{% endhint %}

The next state for a job is the **“active”** state. The active state is represented by a set, and are jobs that are currently being processed, i.e. they are running in the `process` function explained in the previous chapter. A job can be in the active state for an unlimited amount of time until the process is completed or an exception is thrown so that the job will end in either the **“completed”** or the **“failed”** status.

Another way to add a job is by the [`add`](https://api.docs.bullmq.io/classes/FlowProducer.html#add) method on a flow producer instance.

<figure><img src="../.gitbook/assets/mermaid-diagram-2023-06-22-095138.png" alt=""><figcaption><p>Lifecycle of a job - Flow Producer</p></figcaption></figure>

When a job is added by a flow producer, it can be in one of two states, it can either be in the “wait” status, when there aren't children, or it can be in a “waiting-children” status: a waiting-children status implies that the job is waiting for all its children to be completed, however, a waiting-children job will not be processed directly, instead it will be placed at the waiting list or at delayed set (if delay is provided) as soon as the last child is marked as completed.
When a job is added by a flow producer, it can be in one of three states, it can either be in the **“wait”** or **“prioritized“** or **“delayed“** status, when there aren't children, or it can be in a **“waiting-children”** status: a waiting-children status implies that the job is waiting for all its children to be completed, however, a waiting-children job will not be processed directly, instead it will be placed at the waiting list or at delayed set (if delay is provided) or at prioritized set (if delay is 0 and priority is greater than 0) as soon as the last child is marked as completed.
2 changes: 1 addition & 1 deletion docs/gitbook/guide/jobs/prioritized.md
Expand Up @@ -3,7 +3,7 @@
Jobs can also include a priority option. Using priorities, job's processing order will be affected by the specified priority instead of following a FIFO or LIFO pattern.

{% hint style="warning" %}
Adding prioritized jobs is a slower operation than the other types of jobs, with a complexity O(n) relative to the number of jobs waiting in the Queue.
Adding prioritized jobs is a slower operation than the other types of jobs, with a complexity O(log(n)) relative to the number of jobs in prioritized set in the Queue.
{% endhint %}

Note that the priorities go from 1 to MAX_INT, whereas a lower number is always a higher priority than higher numbers.
Expand Down
24 changes: 23 additions & 1 deletion src/classes/queue-getters.ts
Expand Up @@ -45,6 +45,7 @@ export class QueueGetters<
case 'completed':
case 'failed':
case 'delayed':
case 'prioritized':
case 'repeat':
case 'waiting-children':
return callback(key, count ? 'zcard' : 'zrange');
Expand Down Expand Up @@ -89,13 +90,15 @@ export class QueueGetters<
}

/**
Returns the number of jobs waiting to be processed. This includes jobs that are "waiting" or "delayed".
Returns the number of jobs waiting to be processed. This includes jobs that are
"waiting" or "delayed" or "prioritized" or "waiting-children".
*/
async count(): Promise<number> {
const count = await this.getJobCountByTypes(
'waiting',
'paused',
'delayed',
'prioritized',
'waiting-children',
);

Expand Down Expand Up @@ -173,6 +176,13 @@ export class QueueGetters<
return this.getJobCountByTypes('active');
}

/**
* Returns the number of jobs in prioritized status.
*/
getPrioritizedCount(): Promise<number> {
return this.getJobCountByTypes('prioritized');
}

/**
* Returns the number of jobs in waiting or paused statuses.
*/
Expand Down Expand Up @@ -235,6 +245,18 @@ export class QueueGetters<
return this.getJobs(['delayed'], start, end, true);
}

/**
* Returns the jobs that are in the "prioritized" status.
* @param start - zero based index from where to start returning jobs.
* @param end - zero based index where to stop returning jobs.
*/
getPrioritized(
start = 0,
end = -1,
): Promise<Job<DataType, ResultType, NameType>[]> {
return this.getJobs(['prioritized'], start, end, true);
}

/**
* Returns the jobs that are in the "completed" status.
* @param start - zero based index from where to start returning jobs.
Expand Down
8 changes: 2 additions & 6 deletions src/classes/scripts.ts
Expand Up @@ -493,13 +493,9 @@ export class Scripts {
});

if (isRedisVersionLowerThan(this.queue.redisVersion, '6.0.6')) {
return (<any>client).getState(
keys.concat([jobId, this.queue.toKey(jobId)]),
);
return (<any>client).getState(keys.concat([jobId]));
}
return (<any>client).getStateV2(
keys.concat([jobId, this.queue.toKey(jobId)]),
);
return (<any>client).getStateV2(keys.concat([jobId]));
}

async changeDelay(jobId: string, delay: number): Promise<void> {
Expand Down
1 change: 0 additions & 1 deletion src/commands/getState-8.lua
Expand Up @@ -12,7 +12,6 @@
KEYS[8] 'prioritized' key
ARGV[1] job id
ARGV[2] job key
Output:
'completed'
'failed'
Expand Down
1 change: 0 additions & 1 deletion src/commands/getStateV2-8.lua
Expand Up @@ -12,7 +12,6 @@
KEYS[8] 'prioritized' key
ARGV[1] job id
ARGV[2] job key
Output:
'completed'
'failed'
Expand Down
3 changes: 1 addition & 2 deletions src/commands/includes/prepareJobForProcessing.lua
Expand Up @@ -22,11 +22,10 @@
]]

-- Includes
--- @include "addJobWithPriority"
--- @include "pushBackJobWithPriority"

local function prepareJobForProcessing(keys, keyPrefix, targetKey, jobId, processedOn,
maxJobs, expireTime, paused, opts)
maxJobs, expireTime, opts)
local jobKey = keyPrefix .. jobId

-- Check if we need to perform rate limiting.
Expand Down
2 changes: 1 addition & 1 deletion src/commands/moveJobFromActiveToWait-9.lua
Expand Up @@ -31,7 +31,7 @@ local pttl = rcall("PTTL", KEYS[7])
if lockToken == token and pttl > 0 then
local removed = rcall("LREM", KEYS[1], 1, jobId)
if (removed > 0) then
local target, paused = getTargetQueueList(KEYS[6], KEYS[2], KEYS[5])
local target = getTargetQueueList(KEYS[6], KEYS[2], KEYS[5])

rcall("SREM", KEYS[3], jobId)

Expand Down
7 changes: 3 additions & 4 deletions src/commands/moveToActive-10.lua
Expand Up @@ -92,18 +92,17 @@ if jobId then
end

if jobId then
-- this script is not really moving, it is preparing the job for processing
return prepareJobForProcessing(KEYS, ARGV[1], target, jobId, ARGV[2], maxJobs, expireTime, paused, opts)
return prepareJobForProcessing(KEYS, ARGV[1], target, jobId, ARGV[2], maxJobs, expireTime, opts)
else
jobId = moveJobFromPriorityToActive(KEYS[3], KEYS[2], KEYS[10])
if jobId then
return prepareJobForProcessing(KEYS, ARGV[1], target, jobId, ARGV[2], maxJobs, expireTime, paused, opts)
return prepareJobForProcessing(KEYS, ARGV[1], target, jobId, ARGV[2], maxJobs, expireTime, opts)
end
end
else
jobId = moveJobFromPriorityToActive(KEYS[3], KEYS[2], KEYS[10])
if jobId then
return prepareJobForProcessing(KEYS, ARGV[1], target, jobId, ARGV[2], maxJobs, expireTime, paused, opts)
return prepareJobForProcessing(KEYS, ARGV[1], target, jobId, ARGV[2], maxJobs, expireTime, opts)
end
end

Expand Down
5 changes: 2 additions & 3 deletions src/commands/moveToFinished-13.lua
Expand Up @@ -210,15 +210,14 @@ if rcall("EXISTS", jobIdKey) == 1 then -- // Make sure job exists
if string.sub(jobId, 1, 2) == "0:" then
rcall("LREM", KEYS[2], 1, jobId)
else
-- this script is not really moving, it is preparing the job for processing
return prepareJobForProcessing(KEYS, ARGV[8], target, jobId, timestamp, maxJobs,
expireTime, paused, opts)
expireTime, opts)
end
else
jobId = moveJobFromPriorityToActive(KEYS[3], KEYS[2], KEYS[10])
if jobId then
return prepareJobForProcessing(KEYS, ARGV[8], target, jobId, timestamp, maxJobs,
expireTime, paused, opts)
expireTime, opts)
end
end

Expand Down
2 changes: 1 addition & 1 deletion tests/test_clean.ts
Expand Up @@ -620,7 +620,7 @@ describe('Cleaner', () => {
});

const count = await queue.count();
expect(count).to.be.eql(0);
expect(count).to.be.eql(1);

const priorityCount = await queue.getJobCounts('prioritized');
expect(priorityCount.prioritized).to.be.eql(1);
Expand Down
45 changes: 45 additions & 0 deletions tests/test_getters.ts
Expand Up @@ -331,6 +331,51 @@ describe('Jobs getters', function () {
await failed;
});

describe('.count', () => {
describe('when there are prioritized jobs', () => {
it('retries count considering prioritized jobs', async () => {
await queue.waitUntilReady();

for (const index of Array.from(Array(8).keys())) {
await queue.add('test', { idx: index }, { priority: index + 1 });
}
await queue.add('test', {});

const count = await queue.count();

expect(count).to.be.equal(9);
});
});
});

describe('.getPrioritized', () => {
it('retries prioritized job instances', async () => {
await queue.waitUntilReady();

for (const index of Array.from(Array(8).keys())) {
await queue.add('test', { idx: index }, { priority: index + 1 });
}

const prioritizedJobs = await queue.getPrioritized();

expect(prioritizedJobs.length).to.be.equal(8);
});
});

describe('.getPrioritizedCount', () => {
it('retries prioritized count', async () => {
await queue.waitUntilReady();

for (const index of Array.from(Array(8).keys())) {
await queue.add('test', { idx: index }, { priority: index + 1 });
}

const prioritizedCount = await queue.getPrioritizedCount();

expect(prioritizedCount).to.be.equal(8);
});
});

it('should get all failed jobs when no range is provided', async () => {
const worker = new Worker(
queueName,
Expand Down
2 changes: 1 addition & 1 deletion tests/test_queue.ts
Expand Up @@ -118,7 +118,7 @@ describe('queues', function () {

await Promise.all(added);
const count = await queue.count();
expect(count).to.be.eql(0);
expect(count).to.be.eql(100);
const priorityCount = await queue.getJobCountByTypes('prioritized');
expect(priorityCount).to.be.eql(100);

Expand Down

0 comments on commit 7363abe

Please sign in to comment.