diff --git a/src/commands/includes/updateParentDepsIfNeeded.lua b/src/commands/includes/updateParentDepsIfNeeded.lua index 07d7d18796..590776d4ae 100644 --- a/src/commands/includes/updateParentDepsIfNeeded.lua +++ b/src/commands/includes/updateParentDepsIfNeeded.lua @@ -26,15 +26,17 @@ local function updateParentDepsIfNeeded(parentKey, parentQueueKey, parentDepende local parentDelayedKey = parentQueueKey .. ":delayed" rcall("ZADD", parentDelayedKey, score, parentId) + rcall("XADD", parentQueueKey .. ":events", "*", "event", "delayed", "jobId", parentId, + "delay", delayedTimestamp) addDelayMarkerIfNeeded(parentTarget, parentDelayedKey) - -- Standard or priority add - elseif priority == 0 then - rcall("RPUSH", parentTarget, parentId) else - addJobWithPriority(parentWaitKey, parentQueueKey .. ":prioritized", priority, paused, - parentId, parentQueueKey .. ":pc") + if priority == 0 then + rcall("RPUSH", parentTarget, parentId) + else + addJobWithPriority(parentWaitKey, parentQueueKey .. ":prioritized", priority, paused, + parentId, parentQueueKey .. ":pc") + end + rcall("XADD", parentQueueKey .. ":events", "*", "event", "waiting", "jobId", parentId, "prev", "waiting-children") end - - rcall("XADD", parentQueueKey .. ":events", "*", "event", "waiting", "jobId", parentId, "prev", "waiting-children") end end diff --git a/tests/test_flow.ts b/tests/test_flow.ts index 0770d290a5..56b99b1741 100644 --- a/tests/test_flow.ts +++ b/tests/test_flow.ts @@ -2092,6 +2092,17 @@ describe('flows', () => { connection, }, ); + const queueEvents = new QueueEvents(topQueueName, { connection }); + await queueEvents.waitUntilReady(); + + const delayed = new Promise(resolve => { + queueEvents.on('delayed', async ({ jobId, delay }) => { + const milliseconds = delay - Date.now(); + expect(milliseconds).to.be.lessThanOrEqual(3000); + expect(milliseconds).to.be.greaterThan(2000); + resolve(); + }); + }); const completed = new Promise((resolve, reject) => { childrenWorker.on('completed', async function () { @@ -2153,6 +2164,7 @@ describe('flows', () => { expect(children[0].job.data.foo).to.be.eql('bar'); await completed; + await delayed; await childrenWorker.close(); const isDelayed = await job.isDelayed();