Skip to content

Commit

Permalink
Merge 5a4fa85 into d07f58a
Browse files Browse the repository at this point in the history
  • Loading branch information
mattfrohligeraver committed Aug 31, 2018
2 parents d07f58a + 5a4fa85 commit a176ac2
Show file tree
Hide file tree
Showing 7 changed files with 107 additions and 6 deletions.
33 changes: 32 additions & 1 deletion localstack/services/awslambda/lambda_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@
LAMBDA_RUNTIME_DOTNETCORE2,
LAMBDA_RUNTIME_GOLANG)
from localstack.utils.common import (to_str, load_file, save_file, TMP_FILES, ensure_readable,
mkdir, unzip, is_zip_file, run, short_uid, is_jar_archive, timestamp, TIMESTAMP_FORMAT_MILLIS)
mkdir, unzip, is_zip_file, run, short_uid, is_jar_archive, timestamp, TIMESTAMP_FORMAT_MILLIS,
md5)
from localstack.utils.aws import aws_stack, aws_responses
from localstack.utils.analytics import event_publisher
from localstack.utils.cloudwatch.cloudwatch_util import cloudwatched
Expand Down Expand Up @@ -212,6 +213,36 @@ def process_kinesis_records(records, stream_name):
LOG.warning('Unable to run Lambda function on Kinesis records: %s %s' % (e, traceback.format_exc()))


def process_sqs_message(message_body, queue_name):
    """Feed an SQS message into the first Lambda event source mapped to the queue.

    :param message_body: raw message body string as received by SQS
    :param queue_name: name of the queue the message was sent to
    :return: True if a listening Lambda was invoked, False if no event source
        mapping exists for this queue, None if invoking the Lambda failed.
        (Callers only rely on truthiness, so False/None stay backward-compatible.)
    """
    try:
        queue_arn = aws_stack.sqs_queue_arn(queue_name)
        # pick the first event source mapping registered for this queue ARN
        source = next(iter(get_event_sources(source_arn=queue_arn)), None)
        if not source:
            # no Lambda is listening on this queue - explicitly signal "not handled"
            return False
        arn = source['FunctionArn']
        # compute the timestamp once so Sent/FirstReceive agree even if the
        # wall clock crosses a second boundary between the two reads
        sent_timestamp = '{}000'.format(int(time.time()))
        event = {'Records': [{
            'body': message_body,
            'receiptHandle': 'MessageReceiptHandle',
            'md5OfBody': md5(message_body),
            'eventSourceARN': queue_arn,
            'eventSource': 'aws:sqs',
            'awsRegion': aws_stack.get_local_region(),
            'messageId': str(uuid.uuid4()),
            'attributes': {
                'ApproximateFirstReceiveTimestamp': sent_timestamp,
                'SenderId': '123456789012',
                'ApproximateReceiveCount': '1',
                'SentTimestamp': sent_timestamp
            },
            'messageAttributes': {},
            'sqs': True,
        }]}
        run_lambda(event=event, context={}, func_arn=arn)
        return True
    except Exception as e:
        # best-effort: log and swallow so a broken Lambda does not break SQS itself
        LOG.warning('Unable to run Lambda function on SQS messages: %s %s' % (e, traceback.format_exc()))


def get_event_sources(func_name=None, source_arn=None):
result = []
for m in event_source_mappings:
Expand Down
32 changes: 31 additions & 1 deletion localstack/services/sqs/sqs_listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,30 @@
from requests.models import Request, Response
from localstack import config
from localstack.config import HOSTNAME_EXTERNAL
from localstack.utils.common import to_str
from localstack.utils.common import to_str, md5
from localstack.utils.analytics import event_publisher
from localstack.services.awslambda import lambda_api
from localstack.services.generic_proxy import ProxyListener


# XML namespace used by the SQS API responses (2012-11-05 API version)
XMLNS_SQS = 'http://queue.amazonaws.com/doc/2012-11-05/'


# Response body for a successful SendMessage call. The placeholders
# ({message_attr_hash}, {message_body_hash}, {message_id}) are filled in
# via str.format() when the response is constructed.
SUCCESSFUL_SEND_MESSAGE_XML_TEMPLATE = ''.join([
    '<?xml version="1.0"?>',
    '<SendMessageResponse xmlns="%s">' % XMLNS_SQS,
    '<SendMessageResult>',
    '<MD5OfMessageAttributes>{message_attr_hash}</MD5OfMessageAttributes>',
    '<MD5OfMessageBody>{message_body_hash}</MD5OfMessageBody>',
    '<MessageId>{message_id}</MessageId>',
    '</SendMessageResult>',
    '<ResponseMetadata>',
    '<RequestId>00000000-0000-0000-0000-000000000000</RequestId>',
    '</ResponseMetadata>',
    '</SendMessageResponse>',
])


class ProxyListenerSQS(ProxyListener):

def forward_request(self, method, path, data, headers):
Expand All @@ -24,6 +40,20 @@ def forward_request(self, method, path, data, headers):
encoded_data = urlencode(req_data, doseq=True)
request = Request(data=encoded_data, headers=headers, method=method)
return request
elif req_data.get('Action', [None])[0] == 'SendMessage':
queue_url = req_data.get('QueueUrl', [None])[0]
queue_name = queue_url[queue_url.rindex('/') + 1:]
message_body = req_data.get('MessageBody', [None])[0]
if lambda_api.process_sqs_message(message_body, queue_name):
# If a lambda was listening, do not add the message to the queue
new_response = Response()
new_response._content = SUCCESSFUL_SEND_MESSAGE_XML_TEMPLATE.format(
message_attr_hash=md5(data),
message_body_hash=md5(message_body),
message_id=str(uuid.uuid4()),
)
new_response.status_code = 200
return new_response

return True

Expand Down
2 changes: 2 additions & 0 deletions localstack/utils/aws/aws_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,8 @@ def get(obj, pool=None, type=None):
inst.table = table
else:
inst = DynamoDB(obj)
elif obj.startswith('arn:aws:sqs:'):
inst = SqsQueue(obj)
elif type:
for o in EventSource.filter_type(pool, type):
if o.name() == obj:
Expand Down
3 changes: 2 additions & 1 deletion localstack/utils/aws/aws_stack.py
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,8 @@ def s3_bucket_arn(bucket_name, account_id=None):

def sqs_queue_arn(queue_name, account_id=None):
    """Return the ARN for the given SQS queue name.

    NOTE: the region component is hard-coded because ElasticMQ (the
    backing SQS implementation) reports a static region of "elasticmq".
    """
    resolved_account = get_account_id(account_id)
    return 'arn:aws:sqs:elasticmq:{}:{}'.format(resolved_account, queue_name)


def sns_topic_arn(topic_name, account_id=None):
Expand Down
20 changes: 20 additions & 0 deletions localstack/utils/testutil.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,3 +242,23 @@ def send_dynamodb_request(path, action, request_body):
}
url = '{}/{}'.format(os.getenv('TEST_DYNAMODB_URL'), path)
return requests.put(url, data=request_body, headers=headers, verify=False)


def create_sqs_queue(queue_name):
    """Utility method to create a new queue via SQS API.

    Returns a dict with the 'QueueUrl' and 'QueueArn' of the new queue.
    """
    sqs_client = aws_stack.connect_to_service('sqs')

    # create the queue and resolve its ARN from the queue attributes
    queue_url = sqs_client.create_queue(QueueName=queue_name)['QueueUrl']
    attributes = sqs_client.get_queue_attributes(
        QueueUrl=queue_url,
        AttributeNames=['QueueArn'],
    )

    return {
        'QueueUrl': queue_url,
        'QueueArn': attributes['Attributes']['QueueArn'],
    }
4 changes: 4 additions & 0 deletions tests/integration/lambdas/lambda_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,10 @@ def deserialize_event(event):
assert kinesis['sequenceNumber']
kinesis['data'] = json.loads(to_str(base64.b64decode(kinesis['data'])))
return kinesis
sqs = event.get('sqs')
if sqs:
result = {'data': event['body']}
return result
return event.get('Sns')


Expand Down
19 changes: 16 additions & 3 deletions tests/integration/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
TEST_TABLE_NAME = 'test_stream_table'
TEST_LAMBDA_NAME_DDB = 'test_lambda_ddb'
TEST_LAMBDA_NAME_STREAM = 'test_lambda_stream'
TEST_LAMBDA_NAME_QUEUE = 'test_lambda_queue'
TEST_FIREHOSE_NAME = 'test_firehose'
TEST_BUCKET_NAME = lambda_integration.TEST_BUCKET_NAME
TEST_TOPIC_NAME = 'test_topic'
Expand Down Expand Up @@ -114,14 +115,14 @@ def test_firehose_kinesis_to_s3():
testutil.assert_objects(json.loads(to_str(test_data)), all_objects)


def test_kinesis_lambda_sns_ddb_streams():

def test_kinesis_lambda_sns_ddb_sqs_streams():
ddb_lease_table_suffix = '-kclapp'
dynamodb = aws_stack.connect_to_resource('dynamodb')
dynamodb_service = aws_stack.connect_to_service('dynamodb')
dynamodbstreams = aws_stack.connect_to_service('dynamodbstreams')
kinesis = aws_stack.connect_to_service('kinesis')
sns = aws_stack.connect_to_service('sns')
sqs = aws_stack.connect_to_service('sqs')

LOGGER.info('Creating test streams...')
run_safe(lambda: dynamodb_service.delete_table(
Expand Down Expand Up @@ -166,6 +167,11 @@ def process_records(records, shard_id):
testutil.create_lambda_function(func_name=TEST_LAMBDA_NAME_STREAM,
zip_file=zip_file, event_source_arn=kinesis_event_source_arn, runtime=LAMBDA_RUNTIME_PYTHON27)

# deploy test lambda connected to SQS queue
sqs_queue_info = testutil.create_sqs_queue(TEST_LAMBDA_NAME_QUEUE)
testutil.create_lambda_function(func_name=TEST_LAMBDA_NAME_QUEUE,
zip_file=zip_file, event_source_arn=sqs_queue_info['QueueArn'], runtime=LAMBDA_RUNTIME_PYTHON27)

# set number of items to update/put to table
num_events_ddb = 15
num_put_new_items = 5
Expand Down Expand Up @@ -231,10 +237,15 @@ def process_records(records, shard_id):
shard_id='shardId-000000000000', count=10)
assert len(latest) == 10

# send messages to SQS queue
num_events_sqs = 4
for i in range(num_events_sqs):
sqs.send_message(QueueUrl=sqs_queue_info['QueueUrl'], MessageBody=str(i))

LOGGER.info('Waiting some time before finishing test.')
time.sleep(2)

num_events = num_events_ddb + num_events_kinesis + num_events_sns
num_events = num_events_ddb + num_events_kinesis + num_events_sns + num_events_sqs

def check_events():
if len(EVENTS) != num_events:
Expand All @@ -259,6 +270,8 @@ def check_events():
assert len(stats2['Datapoints']) == 1
stats3 = get_lambda_metrics(TEST_LAMBDA_NAME_DDB)
assert len(stats3['Datapoints']) == num_events_ddb
stats3 = get_lambda_metrics(TEST_LAMBDA_NAME_QUEUE)
assert len(stats3['Datapoints']) == num_events_sqs


def test_kinesis_lambda_forward_chain():
Expand Down

0 comments on commit a176ac2

Please sign in to comment.