Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(queue): different score purpose per state #2133

Merged
merged 8 commits into from Sep 20, 2023
12 changes: 9 additions & 3 deletions src/commands/cleanJobsInSet-2.lua
Expand Up @@ -34,13 +34,19 @@ local result
if ARGV[4] == "active" then
result = cleanList(KEYS[1], ARGV[1], rangeStart, rangeEnd, ARGV[2], false)
elseif ARGV[4] == "delayed" then
result = cleanSet(KEYS[1], ARGV[1], rangeStart, rangeEnd, ARGV[2], limit, {"processedOn", "timestamp"})
rangeEnd = "+inf"
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

zsets uses +inf instead of -1 on lists

result = cleanSet(KEYS[1], ARGV[1], rangeEnd, ARGV[2], limit,
{"processedOn", "timestamp"}, false)
elseif ARGV[4] == "prioritized" then
result = cleanSet(KEYS[1], ARGV[1], rangeStart, rangeEnd, ARGV[2], limit, {"timestamp"})
rangeEnd = "+inf"
result = cleanSet(KEYS[1], ARGV[1], rangeEnd, ARGV[2], limit,
{"timestamp"}, false)
elseif ARGV[4] == "wait" or ARGV[4] == "paused" then
result = cleanList(KEYS[1], ARGV[1], rangeStart, rangeEnd, ARGV[2], true)
else
result = cleanSet(KEYS[1], ARGV[1], rangeStart, rangeEnd, ARGV[2], limit, {"finishedOn"} )
rangeEnd = ARGV[2]
result = cleanSet(KEYS[1], ARGV[1], rangeEnd, ARGV[2], limit,
{"finishedOn"}, true)
end

rcall("XADD", KEYS[2], "*", "event", "cleaned", "count", result[2])
Expand Down
2 changes: 1 addition & 1 deletion src/commands/includes/cleanList.lua
Expand Up @@ -30,7 +30,7 @@ local function cleanList(listKey, jobKeyPrefix, rangeStart, rangeEnd,
-- Fetch all three of these (in that order) and use the first one that is set so that we'll leave jobs
-- that have been active within the grace period:
jobTS = getTimestamp(jobKey, {"finishedOn", "processedOn", "timestamp"})
if (not jobTS or jobTS < timestamp) then
if (not jobTS or jobTS <= timestamp) then
-- replace the entry with a deletion marker; the actual deletion will
-- occur at the end of the script
rcall("LSET", listKey, rangeEnd - jobIdsLen + i, deletionMarker)
Expand Down
16 changes: 11 additions & 5 deletions src/commands/includes/cleanSet.lua
Expand Up @@ -9,8 +9,8 @@
--- @include "getTimestamp"
--- @include "removeJob"

local function cleanSet(setKey, jobKeyPrefix, rangeStart, rangeEnd, timestamp, limit, attributes)
local jobs = getJobsInZset(setKey, rangeStart, rangeEnd, timestamp, limit)
local function cleanSet(setKey, jobKeyPrefix, rangeEnd, timestamp, limit, attributes, isFinished)
local jobs = getJobsInZset(setKey, rangeEnd, limit)
local deleted = {}
local deletedCount = 0
local jobTS
Expand All @@ -20,12 +20,18 @@ local function cleanSet(setKey, jobKeyPrefix, rangeStart, rangeEnd, timestamp, l
end

local jobKey = jobKeyPrefix .. job
-- * finishedOn says when the job was completed, but it isn't set unless the job has actually completed
jobTS = getTimestamp(jobKey, attributes)
if (not jobTS or jobTS < timestamp) then
if isFinished then
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when clean is made for completed or failed states, we should be ok deleting job keys without timestamp check, as when we get these records we uses timestamp as one of the limits, this is why I'm using <= to be consistent on the other states

removeJob(job, true, jobKeyPrefix)
deletedCount = deletedCount + 1
table.insert(deleted, job)
else
-- * finishedOn says when the job was completed, but it isn't set unless the job has actually completed
jobTS = getTimestamp(jobKey, attributes)
if (not jobTS or jobTS <= timestamp) then
removeJob(job, true, jobKeyPrefix)
deletedCount = deletedCount + 1
table.insert(deleted, job)
end
end
end

Expand Down
7 changes: 3 additions & 4 deletions src/commands/includes/getJobsInZset.lua
Expand Up @@ -2,11 +2,10 @@
-- of items in a sorted set only run a single iteration. If we simply used
-- ZRANGE, we may take a long time traversing through jobs that are within the
-- grace period.
local function getJobsInZset(zsetKey, rangeStart, rangeEnd, maxTimestamp, limit)
local function getJobsInZset(zsetKey, rangeEnd, limit)
if limit > 0 then
return rcall("ZRANGEBYSCORE", zsetKey, 0, maxTimestamp, "LIMIT", 0, limit)
return rcall("ZRANGEBYSCORE", zsetKey, 0, rangeEnd, "LIMIT", 0, limit)
else
return rcall("ZRANGE", zsetKey, rangeStart, rangeEnd)
return rcall("ZRANGEBYSCORE", zsetKey, 0, rangeEnd)
end
end

20 changes: 20 additions & 0 deletions tests/test_clean.ts
Expand Up @@ -175,6 +175,26 @@ describe('Cleaner', () => {
expect(count).to.be.eql(0);
});

it('should clean all delayed jobs when limit is given', async () => {
await queue.add('test', { some: 'data' }, { delay: 5000 });
await queue.add('test', { some: 'data' }, { delay: 5000 });
await delay(100);
const jobs = await queue.clean(0, 1000, 'delayed');
expect(jobs.length).to.be.eql(2);
const count = await queue.count();
expect(count).to.be.eql(0);
});

it('should clean all prioritized jobs when limit is given', async () => {
await queue.add('test', { some: 'data' }, { priority: 5000 });
await queue.add('test', { some: 'data' }, { priority: 5001 });
await delay(100);
const jobs = await queue.clean(0, 1000, 'prioritized');
expect(jobs.length).to.be.eql(2);
const count = await queue.count();
expect(count).to.be.eql(0);
});

describe('when prioritized state is provided', async () => {
it('should clean the number of jobs requested', async () => {
await queue.add('test', { some: 'data' }, { priority: 1 }); // as queue is empty, this job will be added to wait
Expand Down