From df011ca53836476c35c47abb236aae4c505dc4f3 Mon Sep 17 00:00:00 2001 From: Gene Taylor Date: Wed, 10 May 2017 14:02:32 +1000 Subject: [PATCH] #97: Make receive-message match sqs behaviour SQS only includes message attributes if they are specified in the recieve message request, while ElasticMQ was returning all attributes by default. I have updated the ReceiveMessageDirective to respect the messageAttributeName parameter and to no longer include attributes unless specified. --- .../rest/sqs/AmazonJavaSdkTestSuite.scala | 70 ++++++++++++++++--- .../rest/sqs/ReceiveMessageDirectives.scala | 21 +++++- 2 files changed, 81 insertions(+), 10 deletions(-) diff --git a/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/AmazonJavaSdkTestSuite.scala b/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/AmazonJavaSdkTestSuite.scala index c2983d125..06be440c7 100644 --- a/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/AmazonJavaSdkTestSuite.scala +++ b/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/AmazonJavaSdkTestSuite.scala @@ -8,6 +8,8 @@ import com.amazonaws.auth.BasicAWSCredentials import com.amazonaws.services.sqs.{AmazonSQS, AmazonSQSClient} import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ + import com.amazonaws.services.sqs.model._ import scala.util.control.Exception._ import com.amazonaws.AmazonServiceException @@ -169,6 +171,24 @@ class AmazonJavaSdkTestSuite extends FunSuite with Matchers with BeforeAndAfter )) } + test("should send a simple message with message attributes and only receive requested attributes") { + doTestSendAndReceiveMessageWithAttributes("Message 1", Map( + "red" -> StringMessageAttribute("fish"), + "blue" -> StringMessageAttribute("cat"), + "green" -> BinaryMessageAttribute("dog".getBytes("UTF-8")), + "yellow" -> NumberMessageAttribute("1234567890") + ), List("red", "green")) + } + + test("should send a simple message with message attributes and only receive no requested attributes by default") { + doTestSendAndReceiveMessageWithAttributes("Message 1", Map( + "red" -> StringMessageAttribute("fish"), + "blue" -> StringMessageAttribute("cat"), + "green" -> BinaryMessageAttribute("dog".getBytes("UTF-8")), + "yellow" -> NumberMessageAttribute("1234567890") + ), List()) + } + test("should send and receive a message with caret return and new line characters") { doTestSendAndReceiveMessage("a\rb\r\nc\nd") } @@ -189,7 +209,9 @@ class AmazonJavaSdkTestSuite extends FunSuite with Matchers with BeforeAndAfter doTestSendAndReceiveMessage(builder.toString()) } - def doTestSendAndReceiveMessageWithAttributes(content: String, messageAttributes: Map[String, MessageAttribute]) { + def doTestSendAndReceiveMessageWithAttributes(content: String, + messageAttributes: Map[String, MessageAttribute], + requestedAttributes: List[String]) { // Given val queueUrl = client.createQueue(new CreateQueueRequest("testQueue1")).getQueueUrl @@ -208,11 +230,21 @@ class AmazonJavaSdkTestSuite extends FunSuite with Matchers with BeforeAndAfter } client.sendMessage(sendMessage) - val message = receiveSingleMessageObject(queueUrl).orNull + val message = receiveSingleMessageObject(queueUrl, requestedAttributes).orNull // Then message.getBody should be(content) - message.getMessageAttributes should be(sendMessage.getMessageAttributes) // Checks they match + checkMessageAttributesMatchRequestedAttributes(messageAttributes, requestedAttributes, sendMessage, message) + } + + private def checkMessageAttributesMatchRequestedAttributes(messageAttributes: Map[String, MessageAttribute], + requestedAttributes: List[String], + sendMessage: SendMessageRequest, + message: Message) = { + val filteredSendMessageAttr = filterBasedOnRequestedAttributes(requestedAttributes, sendMessage.getMessageAttributes.toMap).asJava + val filteredMessageAttributes = filterBasedOnRequestedAttributes(requestedAttributes, messageAttributes) + + message.getMessageAttributes should be(filteredSendMessageAttr) // Checks they match message.getMessageAttributes.map { case (k, attr) => (k, if (attr.getDataType.startsWith("String") && attr.getStringValue != null) { StringMessageAttribute(attr.getStringValue).stringValue @@ -221,7 +253,7 @@ class AmazonJavaSdkTestSuite extends FunSuite with Matchers with BeforeAndAfter } else { BinaryMessageAttribute.fromByteBuffer(attr.getBinaryValue).asBase64 }) - } should be(messageAttributes.map { case (k, attr) => + } should be(filteredMessageAttributes.map { case (k, attr) => (k, attr match { case s: StringMessageAttribute => s.stringValue case n: NumberMessageAttribute => n.stringValue @@ -230,11 +262,25 @@ class AmazonJavaSdkTestSuite extends FunSuite with Matchers with BeforeAndAfter }) // Checks they match map } + private def filterBasedOnRequestedAttributes[T](requestedAttributes: List[String], messageAttributes: Map[String, T]): Map[String, T] = { + if (requestedAttributes.contains("All")) { + messageAttributes + } else { + messageAttributes.filterKeys(k => requestedAttributes.contains(k)) + } + } + + // Alias for send and receive with no attributes def doTestSendAndReceiveMessage(content: String) { - doTestSendAndReceiveMessageWithAttributes(content, Map()) + doTestSendAndReceiveMessageWithAttributes(content, Map(), List()) } + def doTestSendAndReceiveMessageWithAttributes(content: String, messageAttributes: Map[String, MessageAttribute]): Unit = { + doTestSendAndReceiveMessageWithAttributes(content, messageAttributes, List("All")) + } + + test("should receive two messages in a batch") { // Given val queueUrl = client.createQueue(new CreateQueueRequest("testQueue1")).getQueueUrl @@ -926,7 +972,11 @@ class AmazonJavaSdkTestSuite extends FunSuite with Matchers with BeforeAndAfter .toLong } - def receiveSingleMessage(queueUrl: String) = { + def receiveSingleMessage(queueUrl: String): Option[String] = { + receiveSingleMessage(queueUrl, List("All")) + } + + def receiveSingleMessage(queueUrl: String, requestedAttributes: List[String]): Option[String] = { val messages = client.receiveMessage(new ReceiveMessageRequest(queueUrl)).getMessages if (messages.size() == 0) { None @@ -935,8 +985,12 @@ class AmazonJavaSdkTestSuite extends FunSuite with Matchers with BeforeAndAfter } } - def receiveSingleMessageObject(queueUrl: String) = { - val messages = client.receiveMessage(new ReceiveMessageRequest(queueUrl)).getMessages + def receiveSingleMessageObject(queueUrl: String): Option[Message] = { + receiveSingleMessageObject(queueUrl, List("All")) + } + + def receiveSingleMessageObject(queueUrl: String, requestedAttributes: List[String]): Option[Message] = { + val messages = client.receiveMessage(new ReceiveMessageRequest(queueUrl).withMessageAttributeNames(requestedAttributes)).getMessages if (messages.size() == 0) { None } else { diff --git a/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/ReceiveMessageDirectives.scala b/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/ReceiveMessageDirectives.scala index f17bec358..d56af6b68 100644 --- a/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/ReceiveMessageDirectives.scala +++ b/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/ReceiveMessageDirectives.scala @@ -16,6 +16,7 @@ trait ReceiveMessageDirectives { this: ElasticMQDirectives with AttributesModule val SenderIdAttribute = "SenderId" val MaxNumberOfMessagesAttribute = "MaxNumberOfMessages" val WaitTimeSecondsAttribute = "WaitTimeSeconds" + val MessageAttributeNamePattern = "MessageAttributeName\\.\\d".r val AllAttributeNames = SentTimestampAttribute :: ApproximateReceiveCountAttribute :: ApproximateFirstReceiveTimestampAttribute :: SenderIdAttribute :: Nil @@ -38,6 +39,8 @@ trait ReceiveMessageDirectives { this: ElasticMQDirectives with AttributesModule val waitTimeSecondsFromParameters = waitTimeSecondsAttributeOpt.map(Duration.standardSeconds) + val messageAttributeNames = getMessageAttributeNames(p) + ifStrictLimits(maxNumberOfMessagesFromParameters < 1 || maxNumberOfMessagesFromParameters > 10) { "ReadCountOutOfRange" } @@ -64,20 +67,29 @@ trait ReceiveMessageDirectives { this: ElasticMQDirectives with AttributesModule }).toString)) } + def getFilteredAttributeNames(messageAttributeNames: Iterable[String], msg: MessageData) = { + if (messageAttributeNames.exists(s => s == "All" || s == ".*")) { + msg.messageAttributes + } else { + msg.messageAttributes.filterKeys(k => messageAttributeNames.exists(s => s == k || k.r.findFirstIn(s).isDefined)) + } + } + msgsFuture.map { msgs => respondWith { {msgs.map { msg => val receipt = msg.deliveryReceipt.map(_.receipt).getOrElse(throw new RuntimeException("No receipt for a received msg.")) + val filteredMessageAttributes = getFilteredAttributeNames(messageAttributeNames, msg) {msg.id.id} {receipt} {md5Digest(msg.content)} {XmlUtil.convertTexWithCRToNodeSeq(msg.content)} {attributesToXmlConverter.convert(calculateAttributeValues(msg))} - {md5AttributeDigest(msg.messageAttributes)} - {messageAttributesToXmlConverter.convert(msg.messageAttributes.toList)} + {md5AttributeDigest(filteredMessageAttributes)} + {messageAttributesToXmlConverter.convert(filteredMessageAttributes.toList)} }} @@ -89,4 +101,9 @@ trait ReceiveMessageDirectives { this: ElasticMQDirectives with AttributesModule } } } + + def getMessageAttributeNames(p: AnyParams): Iterable[String] = { + p.filterKeys(k => MessageReadeableAttributeNames.MessageAttributeNamePattern.findFirstIn(k).isDefined).values + } + } \ No newline at end of file