perf(priority): change priority as a new state #1984
decouple priority to keep one zset
BREAKING CHANGE: priority is separated in its own zset, no duplication needed
Looks pretty good. Some small comments only.
local limiterDuration = opts['limiter'] and opts['limiter']['duration']
local integerDuration = math.floor(math.abs(limiterDuration))
rcall("PEXPIRE", rateLimiterKey, integerDuration)
end

-- check if we passed rate limit, we need to remove the job and return expireTime
Maybe we should rename "passed" to "exceeded" here.
if priority == 0 then
-- LIFO or FIFO
rcall("LPUSH", targetKey, jobId)
else
addJobWithPriority(priorityKey, priority, targetKey, jobId)
rcall("SET", "DEBUG", "DELAYED")
don't forget to remove the debug code 👍
src/commands/moveToActive-9.lua
Outdated
return moveJobFromWaitToActive(KEYS, ARGV[1], target, jobId, ARGV[2], maxJobs, expireTime, opts)
return moveJobFromWaitToActive(KEYS, ARGV[1], target, jobId, ARGV[2], maxJobs, expireTime, paused, opts)
else
local prioritizedJob = rcall("ZPOPMIN", KEYS[3])
This should be refactored.
else
rcall("RPUSH", targetKey, jobId)
rcall("ZADD", priorityKey, priority, timestamp .. ":" .. jobId)
rcall("HSET", jobKey, "pprefix", timestamp)
what about priority-prefix as the name of this new attribute, suggestions?
a shorter name, pp if we want to call it priority-prefix, maybe there is a better name 🤔
src/commands/changePriority-4.lua
Outdated
local isPrioritized = pprefix and (rcall("ZREM", KEYS[4], pprefix .. ":" .. jobId) > 0)
if isPrioritized then
-- Priority add
no need for the comment as the function is descriptive enough
src/classes/queue-getters.ts
Outdated
@@ -287,7 +288,12 @@ export class QueueGetters<
if (asc && multiCommands[index] === 'lrange') {
results = results.concat(result.reverse());
} else {
results = results.concat(result);
if(types[index] === "priority"){
const jobIds = result.map(pattern => pattern.substring(14));
14 here will not always be correct. It would be better to use a regexp that catches the jobId:
const regex = /^[\d]+:(.*)$/;
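A minimal sketch of that suggestion, assuming the members of the priority zset have the form `<numeric prefix>:<jobId>` as in the `ZADD` call shown earlier. The helper name `extractJobId` and the fallback for members without a prefix are hypothetical, not part of this PR:

```typescript
// Hypothetical helper: extracts the job id from a priority-zset member
// of the assumed form "<numeric prefix>:<jobId>".
const PRIORITY_MEMBER_REGEX = /^[\d]+:(.*)$/;

function extractJobId(member: string): string {
  const match = PRIORITY_MEMBER_REGEX.exec(member);
  // Assumption: fall back to the raw member when there is no numeric prefix.
  return match ? match[1] : member;
}

// Works regardless of how many digits the prefix has, and keeps any
// colons that are part of a custom job id.
const members = ['1687300000000:42', '99:my:custom:id', 'no-prefix'];
const jobIds = members.map(extractJobId);
```

Unlike `substring(14)`, this does not depend on the prefix having a fixed width.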
{% tabs %}
{% tab title="TypeScript" %}

```typescript
const job = await Job.create(queue, 'wall', { color: 'red' });

await job.update({
await job.updateData({
more descriptive name
Starts to look ready, just a few comments.
src/classes/job.ts
Outdated
throw new Error(`Priority should not be float`);
}

if (this.opts.priority > 2097152) {
You can assign this value to a named constant and also use the exponentiation operator, 2**21, so that it is clearer where the number comes from.
src/classes/job.ts
Outdated
}

if (this.opts.priority > 2097152) {
throw new Error(`Priority should not be greater than 2097152`);
The error should say: Priority should be between 0 and 2097152.
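The two review comments on this check could be combined roughly as follows. This is only a sketch: the names `PRIORITY_LIMIT` and `validatePriority` are hypothetical, and the lower bound of 0 is an assumption taken from the suggested error message rather than from the diff:

```typescript
// Hypothetical constant name; 2 ** 21 === 2097152.
const PRIORITY_LIMIT = 2 ** 21;

// Hypothetical standalone version of the check in src/classes/job.ts.
function validatePriority(priority: number): void {
  if (!Number.isInteger(priority)) {
    throw new Error('Priority should not be float');
  }
  // Assumption: negative priorities are rejected as well.
  if (priority < 0 || priority > PRIORITY_LIMIT) {
    throw new Error(`Priority should be between 0 and ${PRIORITY_LIMIT}`);
  }
}
```

Interpolating the constant into the message keeps the limit and the error text from drifting apart.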
src/classes/queue.ts
Outdated
/**
* Delete old priority helper key.
*/
async removePriorityKey(): Promise<number> {
-> removeDeprecatedPriorityKey
this comment was addressed a few commits ago
src/commands/moveToActive-10.lua
Outdated
@@ -85,7 +93,17 @@ if jobId then

if jobId then
-- this script is not really moving, it is preparing the job for processing
return moveJobFromWaitToActive(KEYS, ARGV[1], target, jobId, ARGV[2], maxJobs, expireTime, opts)
return moveJobFromWaitToActive(KEYS, ARGV[1], target, jobId, ARGV[2], maxJobs, expireTime, paused, opts)
The function name "moveJobFromWaitToActive" is starting to become annoying, especially now that we see it mixed with "moveJobFromPriorityToActive", which really does move the job to active. I think we could rename it to something more accurate, like "prepareJobForProcessing". What do you think?
sounds good to me
# [4.0.0](v3.15.8...v4.0.0) (2023-06-21)

### Bug Fixes

* **python:** pass right params to xtrim method ([#2004](#2004)) ([a55fd77](a55fd77))

### Performance Improvements

* **priority:** add prioritized as a new state ([#1984](#1984)) (python) ([42a890a](42a890a))

### BREAKING CHANGES

* **priority:** priority is separated in its own zset, no duplication needed
* feat(queue): add removeDeprecatedPriorityKey method
* refactor: change job method name update to updateData

ref [faster priority jobs](https://bullmq.io/news/062123/faster-priority-jobs/)