Skip to content

Commit

Permalink
Merge a216558 into 6c6e893
Browse files Browse the repository at this point in the history
  • Loading branch information
adriangalera committed Oct 30, 2018
2 parents 6c6e893 + a216558 commit d65d344
Showing 1 changed file with 41 additions and 40 deletions.
81 changes: 41 additions & 40 deletions localstack/services/sns/sns_listener.py
Expand Up @@ -65,47 +65,48 @@ def forward_request(self, method, path, data, headers):
elif req_action == 'Publish':
message = req_data['Message'][0]
sqs_client = aws_stack.connect_to_service('sqs')
for subscriber in SNS_SUBSCRIPTIONS[topic_arn]:
filter_policy = json.loads(subscriber.get('FilterPolicy', '{}'))
message_attributes = get_message_attributes(req_data)
if check_filter_policy(filter_policy, message_attributes):
if subscriber['Protocol'] == 'sqs':
endpoint = subscriber['Endpoint']
if 'sqs_queue_url' in subscriber:
queue_url = subscriber.get('sqs_queue_url')
elif '://' in endpoint:
queue_url = endpoint
else:
queue_name = endpoint.split(':')[5]
queue_url = aws_stack.get_sqs_queue_url(queue_name)
subscriber['sqs_queue_url'] = queue_url
try:
sqs_client.send_message(
QueueUrl=queue_url,
MessageBody=create_sns_message_body(subscriber, req_data)
if topic_arn is not None:
for subscriber in SNS_SUBSCRIPTIONS[topic_arn]:
filter_policy = json.loads(subscriber.get('FilterPolicy', '{}'))
message_attributes = get_message_attributes(req_data)
if check_filter_policy(filter_policy, message_attributes):
if subscriber['Protocol'] == 'sqs':
endpoint = subscriber['Endpoint']
if 'sqs_queue_url' in subscriber:
queue_url = subscriber.get('sqs_queue_url')
elif '://' in endpoint:
queue_url = endpoint
else:
queue_name = endpoint.split(':')[5]
queue_url = aws_stack.get_sqs_queue_url(queue_name)
subscriber['sqs_queue_url'] = queue_url
try:
sqs_client.send_message(
QueueUrl=queue_url,
MessageBody=create_sns_message_body(subscriber, req_data)
)
except Exception as exc:
return make_error(message=str(exc), code=400)
elif subscriber['Protocol'] == 'lambda':
lambda_api.process_sns_notification(
subscriber['Endpoint'],
topic_arn, message, subject=req_data.get('Subject', [None])[0]
)
elif subscriber['Protocol'] in ['http', 'https']:
try:
message_body = create_sns_message_body(subscriber, req_data)
except Exception as exc:
return make_error(message=str(exc), code=400)
requests.post(
subscriber['Endpoint'],
headers={
'Content-Type': 'text/plain',
'x-amz-sns-message-type': 'Notification'
},
data=message_body
)
except Exception as exc:
return make_error(message=str(exc), code=400)
elif subscriber['Protocol'] == 'lambda':
lambda_api.process_sns_notification(
subscriber['Endpoint'],
topic_arn, message, subject=req_data.get('Subject', [None])[0]
)
elif subscriber['Protocol'] in ['http', 'https']:
try:
message_body = create_sns_message_body(subscriber, req_data)
except Exception as exc:
return make_error(message=str(exc), code=400)
requests.post(
subscriber['Endpoint'],
headers={
'Content-Type': 'text/plain',
'x-amz-sns-message-type': 'Notification'
},
data=message_body
)
else:
LOGGER.warning('Unexpected protocol "%s" for SNS subscription' % subscriber['Protocol'])
else:
LOGGER.warning('Unexpected protocol "%s" for SNS subscription' % subscriber['Protocol'])
# return response here because we do not want the request to be forwarded to SNS
return make_response(req_action)

Expand Down

0 comments on commit d65d344

Please sign in to comment.