Skip to content

Commit

Permalink
refactor(message): handle various errors when deleting a message
Browse files Browse the repository at this point in the history
  • Loading branch information
weyoss committed Jan 3, 2024
1 parent 97665fe commit a4505ae
Showing 1 changed file with 34 additions and 29 deletions.
63 changes: 34 additions & 29 deletions src/common/redis-client/lua/delete-message.lua
Original file line number Diff line number Diff line change
Expand Up @@ -44,40 +44,45 @@ local function updateQueue()
end

local function deleteMessage()
local deleted = 0;
local messageStatus = redis.call("HGET", keyMessage, EMessagePropertyStatus)
if messageStatus ~= EMessagePropertyStatusProcessing then
if messageStatus == EMessagePropertyStatusAcknowledged then
deleted = redis.call("LREM", keyQueueAcknowledged, 1, messageId)
elseif messageStatus == EMessagePropertyStatusDeadLettered then
deleted = redis.call("LREM", keyQueueDL, 1, messageId)
elseif messageStatus == EMessagePropertyStatusScheduled then
deleted = redis.call("ZREM", keyScheduledMessages, messageId)
redis.call("ZREM", keyQueueScheduled, messageId)
elseif messageStatus == EMessagePropertyStatusUnackDelaying then
deleted = redis.call("LREM", keyDelayedMessages, 1, messageId)
elseif messageStatus == EMessagePropertyStatusUnackRequeuing then
deleted = redis.call("LREM", keyRequeueMessages, 1, messageId)
elseif messageStatus == EMessagePropertyStatusPending then
local queueType = redis.call("HGET", keyQueueProperties, EQueuePropertyQueueType)
if queueType ~= false then
if queueType == EQueuePropertyQueueTypePriorityQueue then
deleted = redis.call("ZREM", keyPriorityQueuePending, messageId)
end
if queueType == EQueuePropertyQueueTypeFIFOQueue or queueType == EQueuePropertyQueueTypeLIFOQueue then
deleted = redis.call("LREM", keyQueuePending, 1, messageId)
end
if messageStatus == false then
return 'MESSAGE_NOT_FOUND'
end
if messageStatus == EMessagePropertyStatusProcessing then
return 'MESSAGE_IN_PROCESS'
end
local deleted = 0;
if messageStatus == EMessagePropertyStatusAcknowledged then
deleted = redis.call("LREM", keyQueueAcknowledged, 1, messageId)
elseif messageStatus == EMessagePropertyStatusDeadLettered then
deleted = redis.call("LREM", keyQueueDL, 1, messageId)
elseif messageStatus == EMessagePropertyStatusScheduled then
deleted = redis.call("ZREM", keyScheduledMessages, messageId)
redis.call("ZREM", keyQueueScheduled, messageId)
elseif messageStatus == EMessagePropertyStatusUnackDelaying then
deleted = redis.call("LREM", keyDelayedMessages, 1, messageId)
elseif messageStatus == EMessagePropertyStatusUnackRequeuing then
deleted = redis.call("LREM", keyRequeueMessages, 1, messageId)
elseif messageStatus == EMessagePropertyStatusPending then
local queueType = redis.call("HGET", keyQueueProperties, EQueuePropertyQueueType)
if queueType ~= false then
if queueType == EQueuePropertyQueueTypePriorityQueue then
deleted = redis.call("ZREM", keyPriorityQueuePending, messageId)
end
if queueType == EQueuePropertyQueueTypeFIFOQueue or queueType == EQueuePropertyQueueTypeLIFOQueue then
deleted = redis.call("LREM", keyQueuePending, 1, messageId)
end
end
end
if deleted == 1 then
redis.call("DEL", keyMessage)
return true
return 'OK'
end
return false
return 'MESSAGE_NOT_DELETED'
end


local deleteMessageStatus = 'INVALID_PARAMETERS'

if #ARGV > argvIndexOffset then
for index in pairs(ARGV) do
Expand All @@ -91,13 +96,13 @@ if #ARGV > argvIndexOffset then
keyQueueScheduled = KEYS[keyIndexOffset + 6]
keyPriorityQueuePending = KEYS[keyIndexOffset + 7]
keyIndexOffset = keyIndexOffset + 7
local reply = deleteMessage()
if reply == true then
updateQueue()
deleteMessageStatus = deleteMessage()
if deleteMessageStatus ~= 'OK' then
break
end
updateQueue()
end
end
return 'OK'
end

return 'INVALID_PARAMETERS'
return deleteMessageStatus

0 comments on commit a4505ae

Please sign in to comment.