From a97b145f605d6b5c7d9f6f83c844c9d2447c5786 Mon Sep 17 00:00:00 2001 From: Weyoss Date: Sat, 2 Dec 2023 11:25:59 +0100 Subject: [PATCH] fix(scheduler): handle unacked messages with retry delay correctly --- .../lua/publish-scheduled-message.lua | 99 ++++++++++++------- src/workers/publish-scheduled.worker.ts | 58 ++++++----- 2 files changed, 99 insertions(+), 58 deletions(-) diff --git a/src/common/redis-client/lua/publish-scheduled-message.lua b/src/common/redis-client/lua/publish-scheduled-message.lua index 5cc9fe79..955d7e51 100644 --- a/src/common/redis-client/lua/publish-scheduled-message.lua +++ b/src/common/redis-client/lua/publish-scheduled-message.lua @@ -16,16 +16,16 @@ local EMessagePropertyMessage = ARGV[9] local keyMessage = '' local keyQueuePending = '' +local keyQueueMessages = '' local keyQueueProperties = '' local keyPriorityQueuePending = '' local keyQueueScheduled = '' local keyScheduledMessage = '' local messageId = '' -local messagePriority = '' local message = '' local messageState = '' -local keyQueueMessages = '' +local messagePriority = '' local scheduledMessageId = '' local scheduledMessageNextScheduleTimestamp = '' local scheduledMessageState = '' @@ -37,14 +37,7 @@ local argvIndexOffset = 9 --- -local function publishMessage(queueType) - if queueType == EQueuePropertyQueueTypeLIFOQueue then - redis.call("RPUSH", keyQueuePending, messageId) - elseif queueType == EQueuePropertyQueueTypeFIFOQueue then - redis.call("LPUSH", keyQueuePending, messageId) - else - redis.call("ZADD", keyPriorityQueuePending, messagePriority, messageId) - end +local function addToQueueMessages() redis.call("SADD", keyQueueMessages, messageId) redis.call( "HMSET", keyMessage, @@ -55,62 +48,96 @@ local function publishMessage(queueType) redis.call("HINCRBY", keyQueueProperties, EQueuePropertyMessagesCount, 1) end -local function deleteScheduledMessage(queueExists) - redis.call("ZREM", keyScheduledMessages, scheduledMessageId) - redis.call("ZREM", keyQueueScheduled, scheduledMessageId) +local function publishMessage(queueType, msgId) + if queueType == EQueuePropertyQueueTypeLIFOQueue then + redis.call("RPUSH", keyQueuePending, msgId) + elseif queueType == EQueuePropertyQueueTypeFIFOQueue then + redis.call("LPUSH", keyQueuePending, msgId) + else + redis.call("ZADD", keyPriorityQueuePending, messagePriority, msgId) + end +end + +local function deletedScheduledMessage(updateMessageCount) redis.call("DEL", keyScheduledMessage) - if queueExists == true then + if updateMessageCount == true then redis.call("HINCRBY", keyQueueProperties, EQueuePropertyMessagesCount, -1) end end -local function scheduleMessage() - if scheduledMessageNextScheduleTimestamp == "0" then - deleteScheduledMessage(true) - else - redis.call("ZADD", keyScheduledMessages, scheduledMessageNextScheduleTimestamp, scheduledMessageId) - redis.call("ZADD", keyQueueScheduled, scheduledMessageNextScheduleTimestamp, scheduledMessageId) +local function removeFromScheduled() + redis.call("ZREM", keyScheduledMessages, scheduledMessageId) + redis.call("ZREM", keyQueueScheduled, scheduledMessageId) +end + +local function updateScheduledMessageProperties(status ) + if status == '' then redis.call("HSET", keyScheduledMessage, EMessagePropertyState, scheduledMessageState) + else + redis.call( + "HMSET", keyScheduledMessage, + EMessagePropertyState, scheduledMessageState, + EMessagePropertyStatus, status + ) end end +local function scheduleMessage() + redis.call("ZADD", keyScheduledMessages, scheduledMessageNextScheduleTimestamp, scheduledMessageId) + redis.call("ZADD", keyQueueScheduled, scheduledMessageNextScheduleTimestamp, scheduledMessageId) +end + local function handleMessage() local properties = redis.call("HMGET", keyQueueProperties, EQueuePropertyQueueType) local queueType = properties[1] - if (queueType == false) or (queueType ~= EQueuePropertyQueueTypeLIFOQueue and queueType ~= EQueuePropertyQueueTypeFIFOQueue and queueType ~= EQueuePropertyQueueTypePriorityQueue) then - deleteScheduledMessage(false) + if (queueType == false) then + removeFromScheduled() + deletedScheduledMessage(false) + elseif (queueType ~= EQueuePropertyQueueTypeLIFOQueue and queueType ~= EQueuePropertyQueueTypeFIFOQueue and queueType ~= EQueuePropertyQueueTypePriorityQueue) then + removeFromScheduled() + deletedScheduledMessage(true) + elseif messageId == '' then + publishMessage(queueType, scheduledMessageId) + removeFromScheduled() + updateScheduledMessageProperties(EMessagePropertyStatusPending) else - publishMessage(queueType) - scheduleMessage() + publishMessage(queueType, messageId) + addToQueueMessages(); + if scheduledMessageNextScheduleTimestamp == "0" then + removeFromScheduled() + deletedScheduledMessage(true) + else + scheduleMessage() + updateScheduledMessageProperties('') + end end end if #ARGV > argvIndexOffset then for index in pairs(ARGV) do if (index > argvIndexOffset) then - local idx = index % 8 - if idx == 2 then + local idx = index % 7 + if idx == 3 then messageId = ARGV[index] keyMessage = KEYS[keyIndexOffset + 1] keyQueuePending = KEYS[keyIndexOffset + 2] keyQueueProperties = KEYS[keyIndexOffset + 3] - keyPriorityQueuePending = KEYS[keyIndexOffset + 4] - keyQueueScheduled = KEYS[keyIndexOffset + 5] - keyScheduledMessage = KEYS[keyIndexOffset + 6] - keyIndexOffset = keyIndexOffset + 6 - elseif idx == 3 then - messagePriority = ARGV[index] + keyQueueMessages = KEYS[keyIndexOffset + 4] + keyPriorityQueuePending = KEYS[keyIndexOffset + 5] + keyQueueScheduled = KEYS[keyIndexOffset + 6] + keyScheduledMessage = KEYS[keyIndexOffset + 7] + keyIndexOffset = keyIndexOffset + 7 elseif idx == 4 then message = ARGV[index] elseif idx == 5 then messageState = ARGV[index] elseif idx == 6 then - keyQueueMessages = ARGV[index] - elseif idx == 7 then - scheduledMessageId = ARGV[index] + messagePriority = ARGV[index] elseif idx == 0 then - scheduledMessageNextScheduleTimestamp = ARGV[index] + scheduledMessageId = ARGV[index] elseif idx == 1 then + scheduledMessageNextScheduleTimestamp = ARGV[index] + elseif idx == 2 then scheduledMessageState = ARGV[index] handleMessage() end diff --git a/src/workers/publish-scheduled.worker.ts b/src/workers/publish-scheduled.worker.ts index c75a6139..59dc25e8 100644 --- a/src/workers/publish-scheduled.worker.ts +++ b/src/workers/publish-scheduled.worker.ts @@ -20,6 +20,7 @@ import { import { ICallback } from 'redis-smq-common'; import { _getMessages } from '../lib/queue/queue-messages/_get-message'; import { _fromMessage } from '../lib/message/_from-message'; +import { MessageState } from '../lib/message/message-state'; export class PublishScheduledWorker extends Worker { protected redisClient: RedisClient; @@ -68,23 +69,8 @@ export class PublishScheduledWorker extends Worker { messages, (msg, _, done) => { const ts = Date.now(); - const message = _fromMessage(msg, null, null); - const messageState = message - .resetScheduledParams() - .getSetMessageState() - .setPublishedAt(ts) - .setScheduledMessageId(msg.getRequiredId()); - const messageId = messageState.getId(); - const messagePriority = message.getPriority() ?? ''; - const queue = message.getDestinationQueue(); - const { - keyQueueProperties, - keyQueuePending, - keyPriorityQueuePending, - keyQueueScheduled, - keyQueueMessages, - } = redisKeys.getQueueKeys(queue); - const { keyMessage } = redisKeys.getMessageKeys(messageId); + const messagePriority = msg.getPriority() ?? ''; + const queue = msg.getDestinationQueue(); const { keyMessage: keyScheduledMessage } = redisKeys.getMessageKeys( msg.getRequiredId(), ); @@ -92,20 +78,48 @@ export class PublishScheduledWorker extends Worker { const scheduledMessageState = msg .getRequiredMessageState() .setLastScheduledAt(ts); + const { + keyQueueProperties, + keyQueuePending, + keyPriorityQueuePending, + keyQueueScheduled, + keyQueueMessages, + } = redisKeys.getQueueKeys(queue); + + let newMessage: Message | null = null; + let newMessageState: MessageState | null = null; + let newMessageId: string = ''; + let newKeyMessage: string = ''; + + const hasBeenUnacknowledged = + msg.getRetryDelay() > 0 && + msg.getRequiredMessageState().getAttempts() > 0; + + if (!hasBeenUnacknowledged) { + newMessage = _fromMessage(msg, null, null); + newMessageState = newMessage + .resetScheduledParams() + .getSetMessageState() + .setPublishedAt(ts) + .setScheduledMessageId(msg.getRequiredId()); + newMessageId = newMessageState.getId(); + newKeyMessage = redisKeys.getMessageKeys(newMessageId).keyMessage; + } + keys.push( - keyMessage, + newKeyMessage, keyQueuePending, keyQueueProperties, + keyQueueMessages, keyPriorityQueuePending, keyQueueScheduled, keyScheduledMessage, ); argv.push( - messageId, + newMessageId, + newMessage ? JSON.stringify(newMessage) : '', + newMessageState ? JSON.stringify(newMessageState) : '', messagePriority, - JSON.stringify(message), - JSON.stringify(messageState), - keyQueueMessages, msg.getRequiredId(), nextScheduleTimestamp, JSON.stringify(scheduledMessageState),