Permalink
Browse files

Adding a convenience method to subscribe an SQS queue to an SNS topic.

  • Loading branch information...
1 parent 31d8d71 commit cbf8a86aea1556a8314ff77e93510934405b8c81 Mitch.Garnaat committed Apr 20, 2010
Showing with 42 additions and 0 deletions.
  1. +42 −0 boto/sns/__init__.py
View
@@ -22,6 +22,7 @@
from boto.connection import AWSQueryConnection
from boto.sdb.regioninfo import SDBRegionInfo
import boto
+import uuid
try:
import json
except ImportError:
@@ -251,6 +252,47 @@ def subscribe(self, topic, protocol, endpoint):
boto.log.error('%s' % body)
raise self.ResponseError(response.status, response.reason, body)
+ def subscribe_sqs_queue(self, topic, queue):
+ """
+ Subscribe an SQS queue to a topic.
+
+ This is convenience method that handles most of the complexity involved
+ in using ans SQS queue as an endpoint for an SNS topic. To achieve this
+ the following operations are performed:
+ * The correct ARN is constructed for the SQS queue and that ARN is
+ then subscribed to the topic.
+ * A JSON policy document is contructed that grants permission to
+ the SNS topic to send messages to the SQS queue.
+ * This JSON policy is then associated with the SQS queue using
+ the queue's set_attribute method. If the queue already has
+ a policy associated with it, this process will add a Statement to
+ that policy. If no policy exists, a new policy will be created.
+
+ :type topic: string
+ :param topic: The name of the new topic.
+
+ :type queue: A boto Queue object
+ :param queue: The queue you wish to subscribe to the SNS Topic.
+ """
+ t = queue.id.split('/')
+ q_arn = 'arn:aws:sqs:%s:%s:%s' % (queue.connection.region.name,
+ t[1], t[2])
+ resp = self.subscribe(topic, 'sqs', q_arn)
+ policy = queue.get_attributes('Policy')
+ if 'Version' not in policy:
+ policy['Version'] = '2008-10-17'
+ if 'Statement' not in policy:
+ policy['Statement'] = []
+ statement = {'Action' : 'SQS:SendMessage',
+ 'Effect' : 'Allow',
+ 'Principal' : {'AWS' : '*'},
+ 'Resource' : q_arn,
+ 'Sid' : str(uuid.uuid4()),
+ 'Condition' : {'StringLike' : {'aws:SourceArn' : topic}}}
+ policy['Statement'].append(statement)
+ queue.set_attribute('Policy', json.dumps(policy))
+ return resp
+
def confirm_subscription(self, topic, token,
authenticate_on_unsubscribe=False):
"""

0 comments on commit cbf8a86

Please sign in to comment.