Skip to content

Commit

Permalink
fix sqs message group visibility when sending messages (#9975)
Browse files Browse the repository at this point in the history
  • Loading branch information
thrau committed Jan 3, 2024
1 parent 4a5a9fd commit 367ff33
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 1 deletion.
4 changes: 3 additions & 1 deletion localstack/services/sqs/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -938,7 +938,9 @@ def _put_message(self, message: SqsMessage):
# put the message into the group
message_group.push(message)

# if a message becomes visible in the queue, that message's group becomes visible also
# if an older message becomes visible again in the queue, that message's group becomes visible also.
if message.receive_count < 1:
return
if message_group in self.inflight_groups:
self.inflight_groups.remove(message_group)
self.message_group_queue.put_nowait(message_group)
Expand Down
48 changes: 48 additions & 0 deletions tests/aws/services/sqs/test_sqs.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import json
import os
import re
import threading
import time
from queue import Empty, Queue
from threading import Timer
from typing import Dict

Expand Down Expand Up @@ -1312,6 +1314,52 @@ def test_external_host_via_header_complete_message_lifecycle(self, monkeypatch):
assert result.status_code == 200
assert message_body in result.text

@markers.aws.validated
def test_fifo_message_group_visibility(self, sqs_create_queue, aws_client):
queue_url = sqs_create_queue(
QueueName=f"queue-{short_uid()}.fifo",
Attributes={
"FifoQueue": "true",
"VisibilityTimeout": "60",
"ContentBasedDeduplication": "true",
},
)

queue = Queue()

def _receive_message():
"""Worker thread for receiving messages"""
response = aws_client.sqs.receive_message(
QueueUrl=queue_url, MaxNumberOfMessages=1, WaitTimeSeconds=10
)
if not response.get("Messages"):
return None
queue.put(response["Messages"][0])

# start three concurrent listeners
threading.Thread(target=_receive_message).start()
threading.Thread(target=_receive_message).start()
threading.Thread(target=_receive_message).start()

# send a message to the queue
aws_client.sqs.send_message(QueueUrl=queue_url, MessageBody="message-1", MessageGroupId="1")

# one worker should return immediately with the message and put the message group into "inflight"
message = queue.get(timeout=2)
assert message["Body"] == "message-1"

# sending new messages to the message group should not modify its visibility, so the message group is still
# in inflight mode, even after waiting 2 seconds on the message.
aws_client.sqs.send_message(QueueUrl=queue_url, MessageBody="message-2", MessageGroupId="1")
with pytest.raises(Empty):
# if the queue is not empty, it means one of the threads received a message when it shouldn't
queue.get(timeout=2)

# now we delete the original message, which should make the group visible immediately
aws_client.sqs.delete_message(QueueUrl=queue_url, ReceiptHandle=message["ReceiptHandle"])
message = queue.get(timeout=2)
assert message["Body"] == "message-2"

@markers.aws.validated
def test_fifo_messages_in_order_after_timeout(self, sqs_create_queue, aws_client):
# issue 4287
Expand Down
3 changes: 3 additions & 0 deletions tests/aws/services/sqs/test_sqs.validation.json
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@
"tests/aws/services/sqs/test_sqs.py::TestSqsProvider::test_fifo_message_attributes": {
"last_validated_date": "2023-11-14T10:58:35+00:00"
},
"tests/aws/services/sqs/test_sqs.py::TestSqsProvider::test_fifo_message_group_visibility": {
"last_validated_date": "2024-01-03T00:55:48+00:00"
},
"tests/aws/services/sqs/test_sqs.py::TestSqsProvider::test_fifo_queue_send_message_with_delay_seconds_fails": {
"last_validated_date": "2023-11-14T10:58:29+00:00"
},
Expand Down

0 comments on commit 367ff33

Please sign in to comment.