Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix SNS subscription deleted when SQS endpoint was deleted #6645

Merged
merged 1 commit into from Aug 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
10 changes: 4 additions & 6 deletions localstack/services/sns/provider.py
Expand Up @@ -991,12 +991,10 @@ async def message_to_subscriber(
store_delivery_log(subscriber, False, message, message_id)
sns_error_to_dead_letter_queue(subscriber, message_body, str(exc))
if "NonExistentQueue" in str(exc):
LOG.info(
'Removing non-existent queue "%s" subscribed to topic "%s"',
queue_url,
topic_arn,
)
subscriptions.remove(subscriber)
LOG.debug("The SQS queue endpoint does not exist anymore")
# todo: if the queue got deleted, even if we recreate a queue with the same name/url
# AWS won't send to it anymore. Would need to unsub/resub.
# We should mark this subscription as "broken"
return

elif subscriber["Protocol"] == "lambda":
Expand Down
89 changes: 89 additions & 0 deletions tests/integration/test_sns.py
Expand Up @@ -2,6 +2,7 @@
import json
import queue
import random
import time
from operator import itemgetter

import pytest
Expand Down Expand Up @@ -2113,3 +2114,91 @@ def test_publish_to_gcm(self, sns_client):

sns_client.delete_endpoint(EndpointArn=endpoint_arn)
sns_client.delete_platform_application(PlatformApplicationArn=platform_app_arn)

@pytest.mark.aws_validated
@pytest.mark.skip_snapshot_verify(
paths=[
"$..Attributes.Owner",
"$..Attributes.ConfirmationWasAuthenticated",
"$..Attributes.RawMessageDelivery",
"$..Attributes.sqs_queue_url",
"$..Subscriptions..Owner",
]
)
def test_subscription_after_failure_to_deliver(
self,
sns_client,
sqs_client,
sns_create_topic,
sqs_create_queue,
sqs_queue_arn,
sqs_queue_exists,
sns_create_sqs_subscription,
sns_allow_topic_sqs_queue,
snapshot,
):
topic_arn = sns_create_topic()["TopicArn"]
queue_name = f"test-queue-{short_uid()}"
queue_url = sqs_create_queue(QueueName=queue_name)

subscription = sns_create_sqs_subscription(topic_arn=topic_arn, queue_url=queue_url)
subscription_arn = subscription["SubscriptionArn"]

dlq_url = sqs_create_queue()
dlq_arn = sqs_queue_arn(dlq_url)

sns_allow_topic_sqs_queue(
sqs_queue_url=dlq_url,
sqs_queue_arn=dlq_arn,
sns_topic_arn=topic_arn,
)

sub_attrs = sns_client.get_subscription_attributes(SubscriptionArn=subscription_arn)
snapshot.match("subscriptions-attrs", sub_attrs)

message = "test_dlq_before_sqs_endpoint_deleted"
sns_client.publish(TopicArn=topic_arn, Message=message)
response = sqs_client.receive_message(
QueueUrl=queue_url, WaitTimeSeconds=10, MaxNumberOfMessages=4
)
snapshot.match("messages-before-delete", response)
sqs_client.delete_message(
QueueUrl=queue_url, ReceiptHandle=response["Messages"][0]["ReceiptHandle"]
)

sqs_client.delete_queue(QueueUrl=queue_url)
# try to send a message before setting a DLQ
message = "test_dlq_after_sqs_endpoint_deleted"
sns_client.publish(TopicArn=topic_arn, Message=message)
# to avoid race condition, publish is async and the redrive policy can be in effect before the actual publish
time.sleep(1)

# check the subscription is still there after we deleted the queue
subscriptions = sns_client.list_subscriptions_by_topic(TopicArn=topic_arn)
snapshot.match("subscriptions", subscriptions)

sns_client.set_subscription_attributes(
SubscriptionArn=subscription_arn,
AttributeName="RedrivePolicy",
AttributeValue=json.dumps({"deadLetterTargetArn": dlq_arn}),
)

sub_attrs = sns_client.get_subscription_attributes(SubscriptionArn=subscription_arn)
snapshot.match("subscriptions-attrs-with-redrive", sub_attrs)

# AWS takes some time to delete the queue, which make the test fails as it delivers the message correctly
assert poll_condition(lambda: not sqs_queue_exists(queue_url), timeout=5)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

poll_condition 💯


# test sending and receiving multiple messages
for i in range(2):
message = f"test_dlq_after_sqs_endpoint_deleted_{i}"

sns_client.publish(TopicArn=topic_arn, Message=message)
response = sqs_client.receive_message(
QueueUrl=dlq_url, WaitTimeSeconds=10, MaxNumberOfMessages=4
)
sqs_client.delete_message(
QueueUrl=dlq_url, ReceiptHandle=response["Messages"][0]["ReceiptHandle"]
)

snapshot.match(f"message-{i}-after-delete", response)
127 changes: 127 additions & 0 deletions tests/integration/test_sns.snapshot.json
Expand Up @@ -1668,5 +1668,132 @@
"tests/integration/test_sns.py::TestSNSProvider::test_publish_sqs_from_sns_with_xray_propagation": {
"recorded-date": "09-08-2022, 11:35:46",
"recorded-content": {}
},
"tests/integration/test_sns.py::TestSNSProvider::test_subscription_after_failure_to_deliver": {
"recorded-date": "10-08-2022, 17:04:52",
"recorded-content": {
"subscriptions-attrs": {
"Attributes": {
"ConfirmationWasAuthenticated": "true",
"Endpoint": "arn:aws:sqs:<region>:111111111111:<resource:1>",
"Owner": "111111111111",
"PendingConfirmation": "false",
"Protocol": "sqs",
"RawMessageDelivery": "false",
"SubscriptionArn": "arn:aws:sns:<region>:111111111111:<resource:3>:<resource:2>",
"TopicArn": "arn:aws:sns:<region>:111111111111:<resource:3>"
},
"ResponseMetadata": {
"HTTPHeaders": {},
"HTTPStatusCode": 200
}
},
"messages-before-delete": {
"Messages": [
{
"Body": {
"Type": "Notification",
"MessageId": "<uuid:1>",
"TopicArn": "arn:aws:sns:<region>:111111111111:<resource:3>",
"Message": "test_dlq_before_sqs_endpoint_deleted",
"Timestamp": "date",
"SignatureVersion": "1",
"Signature": "<signature>",
"SigningCertURL": "https://sns.<region>.amazonaws.com/SimpleNotificationService-<signing-cert-file:1>",
"UnsubscribeURL": "<unsubscribe-domain>/?Action=Unsubscribe&SubscriptionArn=arn:aws:sns:<region>:111111111111:<resource:3>:<resource:2>"
},
"MD5OfBody": "<md5-hash>",
"MessageId": "<uuid:2>",
"ReceiptHandle": "<receipt-handle:1>"
}
],
"ResponseMetadata": {
"HTTPHeaders": {},
"HTTPStatusCode": 200
}
},
"subscriptions": {
"ResponseMetadata": {
"HTTPHeaders": {},
"HTTPStatusCode": 200
},
"Subscriptions": [
{
"Endpoint": "arn:aws:sqs:<region>:111111111111:<resource:1>",
"Owner": "111111111111",
"Protocol": "sqs",
"SubscriptionArn": "arn:aws:sns:<region>:111111111111:<resource:3>:<resource:2>",
"TopicArn": "arn:aws:sns:<region>:111111111111:<resource:3>"
}
]
},
"subscriptions-attrs-with-redrive": {
"Attributes": {
"ConfirmationWasAuthenticated": "true",
"Endpoint": "arn:aws:sqs:<region>:111111111111:<resource:1>",
"Owner": "111111111111",
"PendingConfirmation": "false",
"Protocol": "sqs",
"RawMessageDelivery": "false",
"RedrivePolicy": {
"deadLetterTargetArn": "arn:aws:sqs:<region>:111111111111:<resource:4>"
},
"SubscriptionArn": "arn:aws:sns:<region>:111111111111:<resource:3>:<resource:2>",
"TopicArn": "arn:aws:sns:<region>:111111111111:<resource:3>"
},
"ResponseMetadata": {
"HTTPHeaders": {},
"HTTPStatusCode": 200
}
},
"message-0-after-delete": {
"Messages": [
{
"Body": {
"Type": "Notification",
"MessageId": "<uuid:3>",
"TopicArn": "arn:aws:sns:<region>:111111111111:<resource:3>",
"Message": "test_dlq_after_sqs_endpoint_deleted_0",
"Timestamp": "date",
"SignatureVersion": "1",
"Signature": "<signature>",
"SigningCertURL": "https://sns.<region>.amazonaws.com/SimpleNotificationService-<signing-cert-file:1>",
"UnsubscribeURL": "<unsubscribe-domain>/?Action=Unsubscribe&SubscriptionArn=arn:aws:sns:<region>:111111111111:<resource:3>:<resource:2>"
},
"MD5OfBody": "<md5-hash>",
"MessageId": "<uuid:4>",
"ReceiptHandle": "<receipt-handle:2>"
}
],
"ResponseMetadata": {
"HTTPHeaders": {},
"HTTPStatusCode": 200
}
},
"message-1-after-delete": {
"Messages": [
{
"Body": {
"Type": "Notification",
"MessageId": "<uuid:5>",
"TopicArn": "arn:aws:sns:<region>:111111111111:<resource:3>",
"Message": "test_dlq_after_sqs_endpoint_deleted_1",
"Timestamp": "date",
"SignatureVersion": "1",
"Signature": "<signature>",
"SigningCertURL": "https://sns.<region>.amazonaws.com/SimpleNotificationService-<signing-cert-file:1>",
"UnsubscribeURL": "<unsubscribe-domain>/?Action=Unsubscribe&SubscriptionArn=arn:aws:sns:<region>:111111111111:<resource:3>:<resource:2>"
},
"MD5OfBody": "<md5-hash>",
"MessageId": "<uuid:6>",
"ReceiptHandle": "<receipt-handle:3>"
}
],
"ResponseMetadata": {
"HTTPHeaders": {},
"HTTPStatusCode": 200
}
}
}
}
}