Skip to content

Commit

Permalink
fix(scheduler): handle unacked messages with retry delay correctly
Browse files Browse the repository at this point in the history
  • Loading branch information
weyoss committed Dec 2, 2023
1 parent 1cff3e3 commit a97b145
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 58 deletions.
99 changes: 63 additions & 36 deletions src/common/redis-client/lua/publish-scheduled-message.lua
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,16 @@ local EMessagePropertyMessage = ARGV[9]

local keyMessage = ''
local keyQueuePending = ''
local keyQueueMessages = ''
local keyQueueProperties = ''
local keyPriorityQueuePending = ''
local keyQueueScheduled = ''
local keyScheduledMessage = ''

local messageId = ''
local messagePriority = ''
local message = ''
local messageState = ''
local keyQueueMessages = ''
local messagePriority = ''
local scheduledMessageId = ''
local scheduledMessageNextScheduleTimestamp = ''
local scheduledMessageState = ''
Expand All @@ -37,14 +37,7 @@ local argvIndexOffset = 9

---

local function publishMessage(queueType)
if queueType == EQueuePropertyQueueTypeLIFOQueue then
redis.call("RPUSH", keyQueuePending, messageId)
elseif queueType == EQueuePropertyQueueTypeFIFOQueue then
redis.call("LPUSH", keyQueuePending, messageId)
else
redis.call("ZADD", keyPriorityQueuePending, messagePriority, messageId)
end
local function addToQueueMessages()
redis.call("SADD", keyQueueMessages, messageId)
redis.call(
"HMSET", keyMessage,
Expand All @@ -55,62 +48,96 @@ local function publishMessage(queueType)
redis.call("HINCRBY", keyQueueProperties, EQueuePropertyMessagesCount, 1)
end

local function deleteScheduledMessage(queueExists)
redis.call("ZREM", keyScheduledMessages, scheduledMessageId)
redis.call("ZREM", keyQueueScheduled, scheduledMessageId)
local function publishMessage(queueType, msgId)
if queueType == EQueuePropertyQueueTypeLIFOQueue then
redis.call("RPUSH", keyQueuePending, msgId)
elseif queueType == EQueuePropertyQueueTypeFIFOQueue then
redis.call("LPUSH", keyQueuePending, msgId)
else
redis.call("ZADD", keyPriorityQueuePending, messagePriority, msgId)
end
end

local function deletedScheduledMessage(updateMessageCount)
redis.call("DEL", keyScheduledMessage)
if queueExists == true then
if updateMessageCount == true then
redis.call("HINCRBY", keyQueueProperties, EQueuePropertyMessagesCount, -1)
end
end

local function scheduleMessage()
if scheduledMessageNextScheduleTimestamp == "0" then
deleteScheduledMessage(true)
else
redis.call("ZADD", keyScheduledMessages, scheduledMessageNextScheduleTimestamp, scheduledMessageId)
redis.call("ZADD", keyQueueScheduled, scheduledMessageNextScheduleTimestamp, scheduledMessageId)
local function removeFromScheduled()
redis.call("ZREM", keyScheduledMessages, scheduledMessageId)
redis.call("ZREM", keyQueueScheduled, scheduledMessageId)
end

local function updateScheduledMessageProperties(status )
if status == '' then
redis.call("HSET", keyScheduledMessage, EMessagePropertyState, scheduledMessageState)
else
redis.call(
"HMSET", keyScheduledMessage,
EMessagePropertyState, scheduledMessageState,
EMessagePropertyStatus, status
)
end
end

local function scheduleMessage()
redis.call("ZADD", keyScheduledMessages, scheduledMessageNextScheduleTimestamp, scheduledMessageId)
redis.call("ZADD", keyQueueScheduled, scheduledMessageNextScheduleTimestamp, scheduledMessageId)
end

local function handleMessage()
local properties = redis.call("HMGET", keyQueueProperties, EQueuePropertyQueueType)
local queueType = properties[1]
if (queueType == false) or (queueType ~= EQueuePropertyQueueTypeLIFOQueue and queueType ~= EQueuePropertyQueueTypeFIFOQueue and queueType ~= EQueuePropertyQueueTypePriorityQueue) then
deleteScheduledMessage(false)
if (queueType == false) then
removeFromScheduled()
deletedScheduledMessage(false)
elseif (queueType ~= EQueuePropertyQueueTypeLIFOQueue and queueType ~= EQueuePropertyQueueTypeFIFOQueue and queueType ~= EQueuePropertyQueueTypePriorityQueue) then
removeFromScheduled()
deletedScheduledMessage(true)
elseif messageId == '' then
publishMessage(queueType, scheduledMessageId)
removeFromScheduled()
updateScheduledMessageProperties(EMessagePropertyStatusPending)
else
publishMessage(queueType)
scheduleMessage()
publishMessage(queueType, messageId)
addToQueueMessages();
if scheduledMessageNextScheduleTimestamp == "0" then
removeFromScheduled()
deletedScheduledMessage(true)
else
scheduleMessage()
updateScheduledMessageProperties('')
end
end
end

if #ARGV > argvIndexOffset then
for index in pairs(ARGV) do
if (index > argvIndexOffset) then
local idx = index % 8
if idx == 2 then
local idx = index % 7
if idx == 3 then
messageId = ARGV[index]
keyMessage = KEYS[keyIndexOffset + 1]
keyQueuePending = KEYS[keyIndexOffset + 2]
keyQueueProperties = KEYS[keyIndexOffset + 3]
keyPriorityQueuePending = KEYS[keyIndexOffset + 4]
keyQueueScheduled = KEYS[keyIndexOffset + 5]
keyScheduledMessage = KEYS[keyIndexOffset + 6]
keyIndexOffset = keyIndexOffset + 6
elseif idx == 3 then
messagePriority = ARGV[index]
keyQueueMessages = KEYS[keyIndexOffset + 4]
keyPriorityQueuePending = KEYS[keyIndexOffset + 5]
keyQueueScheduled = KEYS[keyIndexOffset + 6]
keyScheduledMessage = KEYS[keyIndexOffset + 7]
keyIndexOffset = keyIndexOffset + 7
elseif idx == 4 then
message = ARGV[index]
elseif idx == 5 then
messageState = ARGV[index]
elseif idx == 6 then
keyQueueMessages = ARGV[index]
elseif idx == 7 then
scheduledMessageId = ARGV[index]
messagePriority = ARGV[index]
elseif idx == 0 then
scheduledMessageNextScheduleTimestamp = ARGV[index]
scheduledMessageId = ARGV[index]
elseif idx == 1 then
scheduledMessageNextScheduleTimestamp = ARGV[index]
elseif idx == 2 then
scheduledMessageState = ARGV[index]
handleMessage()
end
Expand Down
58 changes: 36 additions & 22 deletions src/workers/publish-scheduled.worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import {
import { ICallback } from 'redis-smq-common';
import { _getMessages } from '../lib/queue/queue-messages/_get-message';
import { _fromMessage } from '../lib/message/_from-message';
import { MessageState } from '../lib/message/message-state';

export class PublishScheduledWorker extends Worker {
protected redisClient: RedisClient;
Expand Down Expand Up @@ -68,44 +69,57 @@ export class PublishScheduledWorker extends Worker {
messages,
(msg, _, done) => {
const ts = Date.now();
const message = _fromMessage(msg, null, null);
const messageState = message
.resetScheduledParams()
.getSetMessageState()
.setPublishedAt(ts)
.setScheduledMessageId(msg.getRequiredId());
const messageId = messageState.getId();
const messagePriority = message.getPriority() ?? '';
const queue = message.getDestinationQueue();
const {
keyQueueProperties,
keyQueuePending,
keyPriorityQueuePending,
keyQueueScheduled,
keyQueueMessages,
} = redisKeys.getQueueKeys(queue);
const { keyMessage } = redisKeys.getMessageKeys(messageId);
const messagePriority = msg.getPriority() ?? '';
const queue = msg.getDestinationQueue();
const { keyMessage: keyScheduledMessage } = redisKeys.getMessageKeys(
msg.getRequiredId(),
);
const nextScheduleTimestamp = msg.getNextScheduledTimestamp();
const scheduledMessageState = msg
.getRequiredMessageState()
.setLastScheduledAt(ts);
const {
keyQueueProperties,
keyQueuePending,
keyPriorityQueuePending,
keyQueueScheduled,
keyQueueMessages,
} = redisKeys.getQueueKeys(queue);

let newMessage: Message | null = null;
let newMessageState: MessageState | null = null;
let newMessageId: string = '';
let newKeyMessage: string = '';

const hasBeenUnacknowledged =
msg.getRetryDelay() > 0 &&
msg.getRequiredMessageState().getAttempts() > 0;

if (!hasBeenUnacknowledged) {
newMessage = _fromMessage(msg, null, null);
newMessageState = newMessage
.resetScheduledParams()
.getSetMessageState()
.setPublishedAt(ts)
.setScheduledMessageId(msg.getRequiredId());
newMessageId = newMessageState.getId();
newKeyMessage = redisKeys.getMessageKeys(newMessageId).keyMessage;
}

keys.push(
keyMessage,
newKeyMessage,
keyQueuePending,
keyQueueProperties,
keyQueueMessages,
keyPriorityQueuePending,
keyQueueScheduled,
keyScheduledMessage,
);
argv.push(
messageId,
newMessageId,
newMessage ? JSON.stringify(newMessage) : '',
newMessageState ? JSON.stringify(newMessageState) : '',
messagePriority,
JSON.stringify(message),
JSON.stringify(messageState),
keyQueueMessages,
msg.getRequiredId(),
nextScheduleTimestamp,
JSON.stringify(scheduledMessageState),
Expand Down

0 comments on commit a97b145

Please sign in to comment.