Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf(priority): change priority as a new state #1984

Merged
merged 33 commits into from
Jun 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
193b92a
perf(priority): change priority as a new state
roggervalf Jun 14, 2023
d0b196e
refactor(priority): reuse addJobWithPriority include
roggervalf Jun 14, 2023
0f87cf8
chore: pass paused param in missing places
roggervalf Jun 14, 2023
d42e66c
refactor(change-priority): consider last job in wait list
roggervalf Jun 14, 2023
f386db6
refactor(pause): consider moving prioritized job to wait if needed
roggervalf Jun 14, 2023
6b77001
chore: fix test cases
roggervalf Jun 15, 2023
0f98015
chore: use correct variables in addJobWIthPriority
roggervalf Jun 15, 2023
18b3c1d
refactor(priority): set marker when wait len is 0
roggervalf Jun 15, 2023
499017f
chore: remove not needed check for last job
roggervalf Jun 15, 2023
933dc97
docs: add better description for addPriorityMarkerIfNeeded
roggervalf Jun 15, 2023
5c3db07
chore: update python pause script
roggervalf Jun 15, 2023
e36049c
chore: delete extra param
roggervalf Jun 15, 2023
be80d90
refactor: use timestamp for fifo order in priority zset
roggervalf Jun 17, 2023
b28fa1b
refactor(priority): consider fifo order with lexicographical order
roggervalf Jun 19, 2023
fb73c30
chore: restore python changelog
roggervalf Jun 19, 2023
220e0a4
chore(python): fix scripts references
roggervalf Jun 19, 2023
f535033
test: fix repeat test case
roggervalf Jun 19, 2023
0be0c0b
fix: flaky test
roggervalf Jun 19, 2023
bbf3afb
refactor: add moveJobFromPriorityToActive include
roggervalf Jun 19, 2023
a5de7da
test(priority): fix flaky test
roggervalf Jun 19, 2023
fab6b3a
chore: merge branch 'master' into better-priority
roggervalf Jun 20, 2023
527eef5
refactor(priority): use priority counter key
roggervalf Jun 20, 2023
9020413
chore: remove extra args
roggervalf Jun 20, 2023
acd0fef
refactor: update params
roggervalf Jun 20, 2023
3d3a8e3
refactor(priority): use counter and priority as score
roggervalf Jun 21, 2023
ab92413
feat(queue): add removePriorityKey method
roggervalf Jun 21, 2023
570ef51
refactor: reset priority counter when prioritized state is empty
roggervalf Jun 21, 2023
6f20687
refactor(drained): consider prioritized length
roggervalf Jun 21, 2023
8fc8fb7
refactor: change methods names updateData and removeDeprecatedPriorit…
roggervalf Jun 21, 2023
4f069c6
chore: fix typo
roggervalf Jun 21, 2023
070af22
refactor(push-back): re-add job at the head of same prioritized jobs
roggervalf Jun 21, 2023
6d13180
chore: address comments
roggervalf Jun 21, 2023
0a53a05
chore: merge branch 'master' into better-priority
roggervalf Jun 21, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 3 additions & 3 deletions docs/gitbook/guide/jobs/job-data.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,15 @@ job.data # { color: 'red' }

## Update data

If you want to change the data after inserting a job, just use the **update** method. For example:
If you want to change the data after inserting a job, just use the **updateData** method. For example:

{% tabs %}
{% tab title="TypeScript" %}

```typescript
const job = await Job.create(queue, 'wall', { color: 'red' });

await job.update({
await job.updateData({
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

more descriptive name

color: 'blue',
});

Expand All @@ -69,4 +69,4 @@ job.data # { color: 'blue' }

## Read more:

- 💡 [Update API Reference](https://api.docs.bullmq.io/classes/Job.html#update)
- 💡 [Update API Reference](https://api.docs.bullmq.io/classes/Job.html#updateData)
20 changes: 10 additions & 10 deletions docs/gitbook/patterns/process-step-jobs.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,15 @@ const worker = new Worker(
switch (step) {
case Step.Initial: {
await doInitialStepStuff();
await job.update({
await job.updateData({
step: Step.Second,
});
step = Step.Second;
break;
}
case Step.Second: {
await doSecondStepStuff();
await job.update({
await job.updateData({
step: Step.Finish,
});
step = Step.Finish;
Expand Down Expand Up @@ -67,15 +67,15 @@ const worker = new Worker(
case Step.Initial: {
await doInitialStepStuff();
await job.moveToDelayed(Date.now() + 200, token);
await job.update({
await job.updateData({
step: Step.Second,
});
step = Step.Second;
break;
}
case Step.Second: {
await doSecondStepStuff();
await job.update({
await job.updateData({
step: Step.Finish,
});
throw new DelayedError();
Expand Down Expand Up @@ -124,7 +124,7 @@ const worker = new Worker(
},
},
);
await job.update({
await job.updateData({
step: Step.Second,
});
step = Step.Second;
Expand All @@ -142,7 +142,7 @@ const worker = new Worker(
},
},
);
await job.update({
await job.updateData({
step: Step.Third,
});
step = Step.Third;
Expand All @@ -151,7 +151,7 @@ const worker = new Worker(
case Step.Third: {
const shouldWait = await job.moveToWaitingChildren(token);
if (!shouldWait) {
await job.update({
await job.updateData({
step: Step.Finish,
});
step = Step.Finish;
Expand Down Expand Up @@ -223,15 +223,15 @@ const worker = new Worker(
},
});

await job.update({
await job.updateData({
step: Step.Second,
});
step = Step.Second;
break;
}
case Step.Second: {
await doSecondStepStuff();
await job.update({
await job.updateData({
step: Step.Third,
});
step = Step.Third;
Expand All @@ -240,7 +240,7 @@ const worker = new Worker(
case Step.Third: {
const shouldWait = await job.moveToWaitingChildren(token);
if (!shouldWait) {
await job.update({
await job.updateData({
step: Step.Finish,
});
step = Step.Finish;
Expand Down
6 changes: 6 additions & 0 deletions python/bullmq/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,12 @@ def trimEvents(self, maxLength: int):
"""
return self.client.xtrim(f"{self.prefix}:{self.name}:events", maxlen = maxLength, approximate = "~")

def removeDeprecatedPriorityKey(self):
"""
Delete old priority helper key.
"""
return self.client.delete(f"{self.prefix}:{self.name}:priority")

async def getJobCounts(self, *types):
"""
Returns the job counts for each type specified or every list/set in the queue by default.
Expand Down
43 changes: 22 additions & 21 deletions python/bullmq/scripts.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,21 +30,21 @@ def __init__(self, prefix: str, queueName: str, redisConnection: RedisConnection
self.redisConnection = redisConnection
self.redisClient = redisConnection.conn
self.commands = {
"addJob": self.redisClient.register_script(self.getScript("addJob-8.lua")),
"changePriority": self.redisClient.register_script(self.getScript("changePriority-4.lua")),
"addJob": self.redisClient.register_script(self.getScript("addJob-9.lua")),
"changePriority": self.redisClient.register_script(self.getScript("changePriority-5.lua")),
"extendLock": self.redisClient.register_script(self.getScript("extendLock-2.lua")),
"getCounts": self.redisClient.register_script(self.getScript("getCounts-1.lua")),
"getState": self.redisClient.register_script(self.getScript("getState-7.lua")),
"getStateV2": self.redisClient.register_script(self.getScript("getStateV2-7.lua")),
"getState": self.redisClient.register_script(self.getScript("getState-8.lua")),
"getStateV2": self.redisClient.register_script(self.getScript("getStateV2-8.lua")),
"moveStalledJobsToWait": self.redisClient.register_script(self.getScript("moveStalledJobsToWait-8.lua")),
"moveToActive": self.redisClient.register_script(self.getScript("moveToActive-9.lua")),
"moveToActive": self.redisClient.register_script(self.getScript("moveToActive-10.lua")),
"moveToDelayed": self.redisClient.register_script(self.getScript("moveToDelayed-8.lua")),
"moveToFinished": self.redisClient.register_script(self.getScript("moveToFinished-12.lua")),
"moveToFinished": self.redisClient.register_script(self.getScript("moveToFinished-13.lua")),
"obliterate": self.redisClient.register_script(self.getScript("obliterate-2.lua")),
"pause": self.redisClient.register_script(self.getScript("pause-4.lua")),
"pause": self.redisClient.register_script(self.getScript("pause-5.lua")),
"removeJob": self.redisClient.register_script(self.getScript("removeJob-1.lua")),
"reprocessJob": self.redisClient.register_script(self.getScript("reprocessJob-6.lua")),
"retryJob": self.redisClient.register_script(self.getScript("retryJob-8.lua")),
"retryJob": self.redisClient.register_script(self.getScript("retryJob-9.lua")),
"retryJobs": self.redisClient.register_script(self.getScript("retryJobs-6.lua")),
"saveStacktrace": self.redisClient.register_script(self.getScript("saveStacktrace-1.lua")),
"updateData": self.redisClient.register_script(self.getScript("updateData-1.lua")),
Expand All @@ -53,7 +53,7 @@ def __init__(self, prefix: str, queueName: str, redisConnection: RedisConnection

# loop all the names and add them to the keys object
names = ["", "active", "wait", "paused", "completed", "failed", "delayed",
"stalled", "limiter", "priority", "id", "stalled-check", "meta", "events", "waiting-children"]
"stalled", "limiter", "prioritized", "id", "stalled-check", "meta", "pc", "events", "waiting-children"]
for name in names:
self.keys[name] = self.toKey(name)

Expand Down Expand Up @@ -96,7 +96,7 @@ def addJob(self, job: Job):
packedOpts = msgpack.packb(job.opts)

keys = self.getKeys(['wait', 'paused', 'meta', 'id',
'delayed', 'priority', 'completed', 'events'])
'delayed', 'prioritized', 'completed', 'events', 'pc'])

return self.commands["addJob"](keys=keys, args=[packedArgs, jsonData, packedOpts])

Expand All @@ -112,7 +112,8 @@ def retryJobArgs(self, job_id: str, lifo: bool, token: str):
keys.append(self.keys['meta'])
keys.append(self.keys['events'])
keys.append(self.keys['delayed'])
keys.append(self.keys['priority'])
keys.append(self.keys['prioritized'])
keys.append(self.keys['pc'])

push_cmd = "R" if lifo else "L"

Expand All @@ -127,7 +128,7 @@ def moveToDelayedArgs(self, job_id: str, timestamp: int, token: str):
if timestamp > 0:
max_timestamp = max_timestamp * 0x1000 + (convert_to_int(job_id) & 0xfff)

keys = self.getKeys(['wait', 'active', 'priority', 'delayed'])
keys = self.getKeys(['wait', 'active', 'prioritized', 'delayed'])
keys.append(self.toKey(job_id))
keys.append(self.keys['events'])
keys.append(self.keys['paused'])
Expand Down Expand Up @@ -163,9 +164,9 @@ def getCounts(self, types):

async def getState(self, job_id):
keys = self.getKeys(['completed', 'failed', 'delayed', 'active', 'wait',
'paused', 'waiting-children'])
'paused', 'waiting-children', 'prioritized'])

args = [job_id]
args = [job_id, self.toKey(job_id)]

redis_version = await self.redisConnection.getRedisVersion()

Expand All @@ -180,7 +181,8 @@ async def changePriority(self, job_id: str, priority:int = 0, lifo:bool = False)
keys = [self.keys['wait'],
self.keys['paused'],
self.keys['meta'],
self.keys['priority']]
self.keys['prioritized'],
self.keys['pc']]

args = [priority, self.toKey(job_id), job_id, 1 if lifo else 0]

Expand Down Expand Up @@ -231,7 +233,7 @@ def pause(self, pause: bool = True):
"""
src = "wait" if pause else "paused"
dst = "paused" if pause else "wait"
keys = self.getKeys([src, dst, 'meta', 'events'])
keys = self.getKeys([src, dst, 'meta', 'prioritized', 'events'])
return self.commands["pause"](keys, args=["paused" if pause else "resumed"])

async def obliterate(self, count: int, force: bool = False):
Expand Down Expand Up @@ -265,8 +267,8 @@ async def moveToActive(self, token: str, opts: dict, jobId: str = "") -> list[An
lockDuration = opts.get("lockDuration", 0)
limiter = opts.get("limiter", None)

keys = self.getKeys(['wait', 'active', 'priority', 'events',
'stalled', 'limiter', 'delayed', 'paused', 'meta'])
keys = self.getKeys(['wait', 'active', 'prioritized', 'events',
'stalled', 'limiter', 'delayed', 'paused', 'meta', 'pc'])
packedOpts = msgpack.packb(
{"token": token, "lockDuration": lockDuration, "limiter": limiter}, use_bin_type=True)
args = [self.keys[''], timestamp, jobId or "", packedOpts]
Expand Down Expand Up @@ -296,10 +298,9 @@ def moveToFinishedArgs(self, job: Job, val: Any, propVal: str, shouldRemove, tar
timestamp = round(time.time() * 1000)
metricsKey = self.toKey('metrics:' + target)

keys = self.getKeys(['wait', 'active', 'priority', 'events',
'stalled', 'limiter', 'delayed', 'paused', target])
keys = self.getKeys(['wait', 'active', 'prioritized', 'events',
'stalled', 'limiter', 'delayed', 'paused', 'meta', 'pc', target])
keys.append(self.toKey(job.id))
keys.append(self.keys['meta'])
keys.append(metricsKey)

def getKeepJobs(shouldRemove: bool | dict | int | None):
Expand Down
2 changes: 1 addition & 1 deletion src/classes/child-processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ function wrapJob(
/*
* Emulate the real job `update` function.
*/
update: async (data: any) => {
updateData: async (data: any) => {
send({
cmd: ParentCommand.Update,
value: data,
Expand Down
17 changes: 14 additions & 3 deletions src/classes/job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,7 @@ export class Job<
*
* @param data - the data that will replace the current jobs data.
*/
update(data: DataType): Promise<void> {
updateData(data: DataType): Promise<void> {
this.data = data;

return this.scripts.updateData<DataType, ReturnType, NameType>(this, data);
Expand Down Expand Up @@ -551,7 +551,7 @@ export class Job<
);

const result = await this.scripts.moveToFinished(this.id, args);
this.finishedOn = args[13] as number;
this.finishedOn = args[14] as number;

return result;
}
Expand Down Expand Up @@ -632,7 +632,7 @@ export class Job<
fetchNext,
);
(<any>multi).moveToFinished(args);
finishedOn = args[13];
finishedOn = args[14];
command = 'failed';
}

Expand Down Expand Up @@ -1085,6 +1085,17 @@ export class Job<
);
}

if (this.opts.priority) {
if (Math.trunc(this.opts.priority) !== this.opts.priority) {
throw new Error(`Priority should not be float`);
}

const priorityLimit = 2 ** 21;
if (this.opts.priority > 2 ** 21) {
throw new Error(`Priority should be between 0 and ${priorityLimit}`);
}
}

return this.scripts.addJob(
client,
jobData,
Expand Down
1 change: 1 addition & 0 deletions src/classes/queue-getters.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ export class QueueGetters<
'delayed',
'failed',
'paused',
'prioritized',
'waiting',
'waiting-children',
];
Expand Down
3 changes: 2 additions & 1 deletion src/classes/queue-keys.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ export class QueueKeys {
'resumed',
'id',
'delayed',
'priority',
'prioritized',
'stalled-check',
'completed',
'failed',
Expand All @@ -26,6 +26,7 @@ export class QueueKeys {
'meta',
'events',
'delay',
'pc',
].forEach(key => {
keys[key] = this.toKey(name, key);
});
Expand Down
9 changes: 9 additions & 0 deletions src/classes/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,7 @@ export class Queue<
| 'wait'
| 'active'
| 'paused'
| 'prioritized'
| 'delayed'
| 'failed' = 'completed',
): Promise<string[]> {
Expand Down Expand Up @@ -454,4 +455,12 @@ export class Queue<
const client = await this.client;
return client.xtrim(this.keys.events, 'MAXLEN', '~', maxLength);
}

/**
* Delete old priority helper key.
*/
async removeDeprecatedPriorityKey(): Promise<number> {
const client = await this.client;
return client.del(this.toKey('priority'));
}
}
2 changes: 1 addition & 1 deletion src/classes/sandbox.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ const sandbox = <T, R, N extends string>(
await job.log(msg.value);
break;
case ParentCommand.Update:
await job.update(msg.value);
await job.updateData(msg.value);
break;
}
};
Expand Down