Skip to content

Commit

Permalink
fix(promote): consider empty queue when paused (#1335)
Browse files Browse the repository at this point in the history
  • Loading branch information
roggervalf committed Jul 26, 2022
1 parent 788faef commit 9f742e8
Show file tree
Hide file tree
Showing 9 changed files with 88 additions and 75 deletions.
1 change: 1 addition & 0 deletions src/classes/scripts.ts
Expand Up @@ -762,6 +762,7 @@ export class Scripts {
this.queue.keys.delayed,
this.queue.keys.wait,
this.queue.keys.paused,
this.queue.keys.meta,
this.queue.keys.priority,
this.queue.keys.events,
];
Expand Down
14 changes: 2 additions & 12 deletions src/commands/addJob-9.lua
Expand Up @@ -59,6 +59,7 @@ local parentData

-- Includes
--- @include "includes/destructureJobKey"
--- @include "includes/getTargetQueueList"
--- @include "includes/trimEvents"

if parentKey ~= nil then
Expand Down Expand Up @@ -145,18 +146,7 @@ elseif (delayedTimestamp ~= 0) then
delayedTimestamp)
rcall("XADD", KEYS[9], "*", "nextTimestamp", delayedTimestamp)
else
local target

-- We check for the meta.paused key to decide if we are paused or not
-- (since an empty list and !EXISTS are not really the same)
local paused
if rcall("HEXISTS", KEYS[3], "paused") ~= 1 then
target = KEYS[1]
paused = false
else
target = KEYS[2]
paused = true
end
local target = getTargetQueueList(KEYS[3], KEYS[1], KEYS[2])

-- Standard or priority add
if priority == 0 then
Expand Down
12 changes: 12 additions & 0 deletions src/commands/includes/getTargetQueueList.lua
@@ -0,0 +1,12 @@
--[[
Function to check for the meta.paused key to decide if we are paused or not
(since an empty list and !EXISTS are not really the same).
]]

local function getTargetQueueList(queueMetaKey, waitKey, pausedKey)
if rcall("HEXISTS", queueMetaKey, "paused") ~= 1 then
return waitKey
else
return pausedKey
end
end
32 changes: 12 additions & 20 deletions src/commands/moveStalledJobsToWait-8.lua
Expand Up @@ -7,10 +7,8 @@
KEYS[3] 'active', (LIST)
KEYS[4] 'failed', (ZSET)
KEYS[5] 'stalled-check', (KEY)
KEYS[6] 'meta', (KEY)
KEYS[7] 'paused', (LIST)
KEYS[8] 'event stream' (STREAM)
ARGV[1] Max stalled job count
Expand All @@ -25,6 +23,7 @@ local rcall = redis.call

-- Includes
--- @include "includes/batches"
--- @include "includes/getTargetQueueList"
--- @include "includes/removeJob"
--- @include "includes/removeJobsByMaxAge"
--- @include "includes/removeJobsByMaxCount"
Expand All @@ -43,15 +42,6 @@ local stalling = rcall('SMEMBERS', KEYS[1])
local stalled = {}
local failed = {}
if (#stalling > 0) then

local dst
-- wait or paused destination
if rcall("HEXISTS", KEYS[6], "paused") ~= 1 then
dst = KEYS[2]
else
dst = KEYS[7]
end

rcall('DEL', KEYS[1])

local MAX_STALLED_JOB_COUNT = tonumber(ARGV[1])
Expand Down Expand Up @@ -103,15 +93,17 @@ if (#stalling > 0) then

table.insert(failed, jobId)
else
-- Move the job back to the wait queue, to immediately be picked up by a waiting worker.
rcall("RPUSH", dst, jobId)
rcall("XADD", KEYS[8], "*", "event", "waiting", "jobId",
jobId, 'prev', 'active')

-- Emit the stalled event
rcall("XADD", KEYS[8], "*", "event", "stalled", "jobId",
jobId)
table.insert(stalled, jobId)
local target = getTargetQueueList(KEYS[6], KEYS[2], KEYS[7])

-- Move the job back to the wait queue, to immediately be picked up by a waiting worker.
rcall("RPUSH", target, jobId)
rcall("XADD", KEYS[8], "*", "event", "waiting", "jobId",
jobId, 'prev', 'active')

-- Emit the stalled event
rcall("XADD", KEYS[8], "*", "event", "stalled", "jobId",
jobId)
table.insert(stalled, jobId)
end
end
end
Expand Down
21 changes: 10 additions & 11 deletions src/commands/promote-5.lua → src/commands/promote-6.lua
Expand Up @@ -5,8 +5,9 @@
KEYS[1] 'delayed'
KEYS[2] 'wait'
KEYS[3] 'paused'
KEYS[4] 'priority'
KEYS[5] 'event stream'
KEYS[4] 'meta'
KEYS[5] 'priority'
KEYS[6] 'event stream'
ARGV[1] queue.toKey('')
ARGV[2] jobId
Expand All @@ -17,22 +18,20 @@
local rcall = redis.call;
local jobId = ARGV[2]

-- Includes
--- @include "includes/getTargetQueueList"

if rcall("ZREM", KEYS[1], jobId) == 1 then
local priority = tonumber(rcall("HGET", ARGV[1] .. jobId, "priority")) or 0

local target = KEYS[2];

if rcall("EXISTS", KEYS[3]) == 1 then
target = KEYS[3]
end
local target = getTargetQueueList(KEYS[4], KEYS[2], KEYS[3])

if priority == 0 then
-- LIFO or FIFO
rcall("LPUSH", target, jobId)
else
-- Priority add
rcall("ZADD", KEYS[4], priority, jobId)
local count = rcall("ZCOUNT", KEYS[4], 0, priority)
rcall("ZADD", KEYS[5], priority, jobId)
local count = rcall("ZCOUNT", KEYS[5], 0, priority)

local len = rcall("LLEN", target)
local id = rcall("LINDEX", target, len - (count - 1))
Expand All @@ -44,7 +43,7 @@ if rcall("ZREM", KEYS[1], jobId) == 1 then
end

-- Emit waiting event (wait..ing@token)
rcall("XADD", KEYS[5], "*", "event", "waiting", "jobId", jobId, "prev", "delayed");
rcall("XADD", KEYS[6], "*", "event", "waiting", "jobId", jobId, "prev", "delayed");

rcall("HSET", ARGV[1] .. jobId, "delay", 0)

Expand Down
10 changes: 4 additions & 6 deletions src/commands/retryJob-6.lua
Expand Up @@ -23,6 +23,9 @@
]]
local rcall = redis.call

-- Includes
--- @include "includes/getTargetQueueList"

if rcall("EXISTS", KEYS[4]) == 1 then

if ARGV[3] ~= "0" then
Expand All @@ -34,12 +37,7 @@ if rcall("EXISTS", KEYS[4]) == 1 then
end
end

local target
if rcall("HEXISTS", KEYS[5], "paused") ~= 1 then
target = KEYS[2]
else
target = KEYS[3]
end
local target = getTargetQueueList(KEYS[5], KEYS[2], KEYS[3])

rcall("LREM", KEYS[1], 0, ARGV[2])
rcall(ARGV[1], target, ARGV[2])
Expand Down
8 changes: 2 additions & 6 deletions src/commands/retryJobs-6.lua
Expand Up @@ -24,14 +24,10 @@ local rcall = redis.call;

-- Includes
--- @include "includes/batches"
--- @include "includes/getTargetQueueList"
--- @include "includes/getZSetItems"

local target
if rcall("HEXISTS", KEYS[6], "paused") ~= 1 then
target = KEYS[4]
else
target = KEYS[5]
end
local target = getTargetQueueList(KEYS[6], KEYS[4], KEYS[5])

local jobs = rcall('ZRANGEBYSCORE', KEYS[3], 0, timestamp, 'LIMIT', 0, maxCount)
if (#jobs > 0) then
Expand Down
10 changes: 4 additions & 6 deletions src/commands/updateDelaySet-7.lua
Expand Up @@ -20,6 +20,9 @@
]]
local rcall = redis.call

-- Includes
--- @include "includes/getTargetQueueList"

-- Try to get as much as 1000 jobs at once
local jobs = rcall("ZRANGEBYSCORE", KEYS[1], 0, tonumber(ARGV[2]) * 0x1000,
"LIMIT", 0, 1000)
Expand All @@ -28,12 +31,7 @@ if (#jobs > 0) then
rcall("ZREM", KEYS[1], unpack(jobs))

-- check if we need to use push in paused instead of waiting
local target
if rcall("HEXISTS", KEYS[5], "paused") ~= 1 then
target = KEYS[2]
else
target = KEYS[4]
end
local target = getTargetQueueList(KEYS[5], KEYS[2], KEYS[4])

for _, jobId in ipairs(jobs) do
local priority =
Expand Down
55 changes: 41 additions & 14 deletions tests/test_job.ts
Expand Up @@ -675,22 +675,49 @@ describe('Job', function () {
);
});

it('should promote delayed job to the right queue if queue is paused', async () => {
await queue.add('normal', { foo: 'bar' });
const delayedJob = await queue.add(
'delayed',
{ foo: 'bar' },
{ delay: 1 },
);
describe('when queue is paused', () => {
it('should promote delayed job to the right queue', async () => {
await queue.add('normal', { foo: 'bar' });
const delayedJob = await queue.add(
'delayed',
{ foo: 'bar' },
{ delay: 100 },
);

await queue.pause();
await delayedJob.promote();
await queue.resume();
await queue.pause();
await delayedJob.promote();

const waitingJobsCount = await queue.getWaitingCount();
expect(waitingJobsCount).to.be.equal(2);
const delayedJobsNewState = await delayedJob.getState();
expect(delayedJobsNewState).to.be.equal('waiting');
const pausedJobsCount = await queue.getJobCountByTypes('paused');
expect(pausedJobsCount).to.be.equal(2);
await queue.resume();

const waitingJobsCount = await queue.getWaitingCount();
expect(waitingJobsCount).to.be.equal(2);
const delayedJobsNewState = await delayedJob.getState();
expect(delayedJobsNewState).to.be.equal('waiting');
});

describe('when queue is empty', () => {
it('should promote delayed job to the right queue', async () => {
const delayedJob = await queue.add(
'delayed',
{ foo: 'bar' },
{ delay: 100 },
);

await queue.pause();
await delayedJob.promote();

const pausedJobsCount = await queue.getJobCountByTypes('paused');
expect(pausedJobsCount).to.be.equal(1);
await queue.resume();

const waitingJobsCount = await queue.getWaitingCount();
expect(waitingJobsCount).to.be.equal(1);
const delayedJobsNewState = await delayedJob.getState();
expect(delayedJobsNewState).to.be.equal('waiting');
});
});
});
});

Expand Down

0 comments on commit 9f742e8

Please sign in to comment.