Skip to content

Commit

Permalink
fix(rate-limit): keep priority fifo order (#1991) fixes #1929 (python)
Browse files Browse the repository at this point in the history
  • Loading branch information
roggervalf committed Jun 16, 2023
1 parent dadeef3 commit 56bd7ad
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 12 deletions.
32 changes: 22 additions & 10 deletions src/commands/includes/moveJobFromWaitToActive.lua
Expand Up @@ -17,39 +17,51 @@
opts - limiter
]]

-- Includes
--- @include "pushBackJobWithPriority"

local function moveJobFromWaitToActive(keys, keyPrefix, targetKey, jobId, processedOn,
maxJobs, expireTime, opts)
rcall("ZREM", keys[3], jobId) -- remove from priority
local jobKey = keyPrefix .. jobId

-- Check if we need to perform rate limiting.
if maxJobs then
local rateLimiterKey = keys[6];
local jobCounter = tonumber(rcall("INCR", rateLimiterKey))

if jobCounter == 1 then
local limiterDuration = opts['limiter'] and opts['limiter']['duration']
local integerDuration = math.floor(math.abs(limiterDuration))
rcall("PEXPIRE", rateLimiterKey, integerDuration)
end

-- check if we passed rate limit, we need to remove the job and return expireTime
if expireTime > 0 then
-- remove from active queue and add back to the wait list
rcall("LREM", keys[2], 1, jobId)
rcall("RPUSH", targetKey, jobId)

local priority = tonumber(rcall("HGET", jobKey, "priority")) or 0

if priority > 0 then
pushBackJobWithPriority(keys[3], priority, targetKey, jobId)
else
rcall("RPUSH", targetKey, jobId)
end

-- Return when we can process more jobs
return {0, 0, expireTime, 0}
end

local jobCounter = tonumber(rcall("INCR", rateLimiterKey))

if jobCounter == 1 then
local limiterDuration = opts['limiter'] and opts['limiter']['duration']
local integerDuration = math.floor(math.abs(limiterDuration))
rcall("PEXPIRE", rateLimiterKey, integerDuration)
end
end

local jobKey = keyPrefix .. jobId
local lockKey = jobKey .. ':lock'

-- get a lock
if opts['token'] ~= "0" then
rcall("SET", lockKey, opts['token'], "PX", opts['lockDuration'])
end

rcall("ZREM", keys[3], jobId) -- remove from priority
rcall("XADD", keys[4], "*", "event", "active", "jobId", jobId, "prev", "waiting")
rcall("HSET", jobKey, "processedOn", processedOn)
rcall("HINCRBY", jobKey, "attemptsMade", 1)
Expand Down
17 changes: 17 additions & 0 deletions src/commands/includes/pushBackJobWithPriority.lua
@@ -0,0 +1,17 @@
--[[
Function to add push back job considering priority in front of same prioritized jobs.
]]
local function pushBackJobWithPriority(priorityKey, priority, targetKey, jobId)
rcall("ZADD", priorityKey, priority, jobId)
local count = rcall("ZCOUNT", priorityKey, 0, priority-1)

local len = rcall("LLEN", targetKey)
local id = rcall("LINDEX", targetKey, len - count)
rcall("ZADD", priorityKey, priority, jobId)

if id then
rcall("LINSERT", targetKey, "BEFORE", id, jobId)
else
rcall("RPUSH", targetKey, jobId)
end
end
4 changes: 2 additions & 2 deletions src/commands/moveJobFromActiveToWait-9.lua
Expand Up @@ -19,7 +19,7 @@
local rcall = redis.call

-- Includes
--- @include "includes/addJobWithPriority"
--- @include "includes/pushBackJobWithPriority"
--- @include "includes/getTargetQueueList"

local jobId = ARGV[1]
Expand All @@ -38,7 +38,7 @@ if lockToken == token and pttl > 0 then
local priority = tonumber(rcall("HGET", ARGV[3], "priority")) or 0

if priority > 0 then
addJobWithPriority(KEYS[8], priority, target, jobId)
pushBackJobWithPriority(KEYS[8], priority, target, jobId)
else
rcall("RPUSH", target, jobId)
end
Expand Down
72 changes: 72 additions & 0 deletions tests/test_rate_limiter.ts
Expand Up @@ -436,6 +436,78 @@ describe('Rate Limiter', function () {
await result;
await worker.close();
});

describe('when priority is the same for some jobs', () => {
it('should get jobs in fifo order', async function () {
this.timeout(6000);

const numJobs = 4;
const dynamicLimit = 250;
const duration = 100;

const worker = new Worker(
queueName,
async () => {
await worker.rateLimit(dynamicLimit);
throw Worker.RateLimitError();
},
{
connection,
limiter: {
max: 1,
duration,
},
},
);

for (let i = 1; i <= numJobs; i++) {
await queue.add(`${i}`, {}, { priority: 10 });
}

await delay(dynamicLimit);

const jobs = await queue.getJobs(['waiting'], 0, -1, true);
expect(jobs.map(x => x.name)).to.eql(['1', '2', '3', '4']);

await worker.close();
});
});

describe('when priority is different for some jobs', () => {
it('should get jobs in fifo order', async function () {
this.timeout(6000);

const numJobs = 4;
const dynamicLimit = 250;
const duration = 100;

const worker = new Worker(
queueName,
async () => {
await worker.rateLimit(dynamicLimit);
throw Worker.RateLimitError();
},
{
connection,
limiter: {
max: 1,
duration,
},
},
);

for (let i = 1; i <= numJobs; i++) {
await queue.add(`${i}`, {}, { priority: ((i - 1) % 2) + 1 });
}

await delay(dynamicLimit * 4);

const jobs = await queue.getJobs(['waiting'], 0, -1, true);
expect(jobs.map(x => x.name)).to.eql(['1', '3', '2', '4']);

await worker.close();
});
});
});

describe('when queue is paused', () => {
Expand Down

0 comments on commit 56bd7ad

Please sign in to comment.