Skip to content

Commit

Permalink
fix(flow): emit delayed event when parent is moved to delayed (#2055)
Browse files Browse the repository at this point in the history
  • Loading branch information
roggervalf committed Jul 10, 2023
1 parent da7dd07 commit f419ff1
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 7 deletions.
16 changes: 9 additions & 7 deletions src/commands/includes/updateParentDepsIfNeeded.lua
Expand Up @@ -26,15 +26,17 @@ local function updateParentDepsIfNeeded(parentKey, parentQueueKey, parentDepende
local parentDelayedKey = parentQueueKey .. ":delayed"
rcall("ZADD", parentDelayedKey, score, parentId)

rcall("XADD", parentQueueKey .. ":events", "*", "event", "delayed", "jobId", parentId,
"delay", delayedTimestamp)
addDelayMarkerIfNeeded(parentTarget, parentDelayedKey)
-- Standard or priority add
elseif priority == 0 then
rcall("RPUSH", parentTarget, parentId)
else
addJobWithPriority(parentWaitKey, parentQueueKey .. ":prioritized", priority, paused,
parentId, parentQueueKey .. ":pc")
if priority == 0 then
rcall("RPUSH", parentTarget, parentId)
else
addJobWithPriority(parentWaitKey, parentQueueKey .. ":prioritized", priority, paused,
parentId, parentQueueKey .. ":pc")
end
rcall("XADD", parentQueueKey .. ":events", "*", "event", "waiting", "jobId", parentId, "prev", "waiting-children")
end

rcall("XADD", parentQueueKey .. ":events", "*", "event", "waiting", "jobId", parentId, "prev", "waiting-children")
end
end
12 changes: 12 additions & 0 deletions tests/test_flow.ts
Expand Up @@ -2092,6 +2092,17 @@ describe('flows', () => {
connection,
},
);
const queueEvents = new QueueEvents(topQueueName, { connection });
await queueEvents.waitUntilReady();

const delayed = new Promise<void>(resolve => {
queueEvents.on('delayed', async ({ jobId, delay }) => {
const milliseconds = delay - Date.now();
expect(milliseconds).to.be.lessThanOrEqual(3000);
expect(milliseconds).to.be.greaterThan(2000);
resolve();
});
});

const completed = new Promise<void>((resolve, reject) => {
childrenWorker.on('completed', async function () {
Expand Down Expand Up @@ -2153,6 +2164,7 @@ describe('flows', () => {
expect(children[0].job.data.foo).to.be.eql('bar');

await completed;
await delayed;
await childrenWorker.close();

const isDelayed = await job.isDelayed();
Expand Down

0 comments on commit f419ff1

Please sign in to comment.