Skip to content

Commit

Permalink
feat(queue): add promoteJobs to promote all delayed jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
manast committed Jul 19, 2023
1 parent 75a594f commit 6074592
Show file tree
Hide file tree
Showing 4 changed files with 104 additions and 20 deletions.
22 changes: 20 additions & 2 deletions src/classes/queue.ts
Expand Up @@ -430,8 +430,11 @@ export class Queue<
/**
* Retry all the failed jobs.
*
* @param opts - contains number to limit how many jobs will be moved to wait status per iteration,
* state (failed, completed) failed by default or from which timestamp.
* @param opts: { count: number; state: FinishedStatus; timestamp: number}
* - count number to limit how many jobs will be moved to wait status per iteration,
* - state failed by default or completed.
* - timestamp from which timestamp to start moving jobs to wait status, default Date.now().
*
* @returns
*/
async retryJobs(
Expand All @@ -447,6 +450,21 @@ export class Queue<
} while (cursor);
}

/**
* Promote all the delayed jobs.
*
* @param opts: { count: number }
* - count number to limit how many jobs will be moved to wait status per iteration
*
* @returns
*/
async promoteJobs(opts: { count?: number } = {}): Promise<void> {
let cursor = 0;
do {
cursor = await this.scripts.promoteJobs(opts.count);
} while (cursor);
}

/**
* Trim the event stream to an approximately maxLength.
*
Expand Down
10 changes: 9 additions & 1 deletion src/classes/scripts.ts
Expand Up @@ -751,7 +751,7 @@ export class Scripts {
}

protected retryJobsArgs(
state: FinishedStatus,
state: FinishedStatus | 'delayed',
count: number,
timestamp: number,
): (string | number)[] {
Expand Down Expand Up @@ -781,6 +781,14 @@ export class Scripts {
return (<any>client).retryJobs(args);
}

async promoteJobs(count = 1000): Promise<number> {
const client = await this.queue.client;

const args = this.retryJobsArgs('delayed', count, Number.MAX_VALUE);

return (<any>client).retryJobs(args);
}

/**
* Attempts to reprocess a job
*
Expand Down
53 changes: 36 additions & 17 deletions src/commands/retryJobs-6.lua
@@ -1,10 +1,13 @@
--[[
Attempts to retry all failed jobs
Attempts to retry all jobs
Note: as this script now can be used also for completed and delayed jobs, the name "retry"
is not really accurate anymore.
Input:
KEYS[1] base key
KEYS[2] events stream
KEYS[3] state key (failed, completed)
KEYS[3] state key (failed, completed, delayed)
KEYS[4] 'wait'
KEYS[5] 'paused'
KEYS[6] 'meta'
Expand All @@ -26,28 +29,44 @@ local rcall = redis.call;
--- @include "includes/batches"
--- @include "includes/getTargetQueueList"

local target = getTargetQueueList(KEYS[6], KEYS[4], KEYS[5])
local metaKey = KEYS[6]
local target = getTargetQueueList(metaKey, KEYS[4], KEYS[5])

local jobs = rcall('ZRANGEBYSCORE', KEYS[3], 0, timestamp, 'LIMIT', 0, maxCount)
if (#jobs > 0) then
for i, key in ipairs(jobs) do
local jobKey = KEYS[1] .. key
rcall("HDEL", jobKey, "finishedOn", "processedOn", "failedReason", "returnvalue")

-- Emit waiting event
rcall("XADD", KEYS[2], "*", "event", "waiting", "jobId", key, "prev", ARGV[3]);
end

for from, to in batches(#jobs, 7000) do
rcall("ZREM", KEYS[3], unpack(jobs, from, to))
rcall("LPUSH", target, unpack(jobs, from, to))
end

if KEYS[3]:match("failed$") then
for i, key in ipairs(jobs) do
local jobKey = KEYS[1] .. key
rcall("HDEL", jobKey, "finishedOn", "processedOn", "failedReason")
end
end

if KEYS[3]:match("completed$") then
for i, key in ipairs(jobs) do
local jobKey = KEYS[1] .. key
rcall("HDEL", jobKey, "finishedOn", "processedOn", "returnvalue")
end
end

local maxEvents = rcall("HGET", metaKey, "opts.maxLenEvents") or 10000

for i, key in ipairs(jobs) do
-- Emit waiting event
rcall("XADD", KEYS[2], "MAXLEN", "~", maxEvents, "*", "event",
"waiting", "jobId", key, "prev", ARGV[3]);
end

for from, to in batches(#jobs, 7000) do
rcall("ZREM", KEYS[3], unpack(jobs, from, to))
rcall("LPUSH", target, unpack(jobs, from, to))
end
end

maxCount = maxCount - #jobs

if(maxCount <= 0) then
return 1
if (maxCount <= 0) then
return 1
end

return 0
39 changes: 39 additions & 0 deletions tests/test_queue.ts
Expand Up @@ -647,4 +647,43 @@ describe('queues', function () {
});
});
});

describe('.promoteJobs', () => {
it('promotes all delayed jobs by default', async () => {
await queue.waitUntilReady();
const jobCount = 8;

for (let i = 0; i < jobCount; i++) {
await queue.add('test', { idx: i }, { delay: 10000 });
}

const delayedCount = await queue.getJobCounts('delayed');
expect(delayedCount.delayed).to.be.equal(jobCount);

await queue.promoteJobs();

const waitingCount = await queue.getJobCounts('waiting');
expect(waitingCount.waiting).to.be.equal(jobCount);

const worker = new Worker(
queueName,
async () => {
await delay(10);
},
{ connection },
);
await worker.waitUntilReady();

const completing = new Promise<number>(resolve => {
worker.on('completed', after(jobCount, resolve));
});

await completing;

const promotedCount = await queue.getJobCounts('delayed');
expect(promotedCount.delayed).to.be.equal(0);

await worker.close();
});
});
});

0 comments on commit 6074592

Please sign in to comment.