Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion core/src/main/scala/org/elasticmq/MessageAttribute.scala
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,15 @@ case class BinaryMessageAttribute(binaryValue: Array[Byte], override val customT

def asBase64: String = Base64.getEncoder.encodeToString(binaryValue)
}

object BinaryMessageAttribute {
def fromBase64(base64Str: String, customType: Option[String] = None): BinaryMessageAttribute =
BinaryMessageAttribute(
binaryValue = Base64.getDecoder.decode(base64Str),
customType = customType
)

def fromByteBuffer(byteBuffer: ByteBuffer, customType: Option[String] = None) =
def fromByteBuffer(byteBuffer: ByteBuffer, customType: Option[String] = None): BinaryMessageAttribute =
BinaryMessageAttribute(
binaryValue = {
byteBuffer.clear()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,10 @@ package org.elasticmq.rest.sqs

import org.elasticmq.{BinaryMessageAttribute, MessageAttribute, NumberMessageAttribute, StringMessageAttribute}
import org.scalatest.matchers.should.Matchers
import software.amazon.awssdk.core.SdkBytes
import software.amazon.awssdk.services.sqs.model.{GetQueueUrlRequest => AwsSdkGetQueueUrlRequest, _}

import scala.collection.JavaConverters._
import software.amazon.awssdk.core.SdkBytes
import software.amazon.awssdk.services.sqs.model._
import software.amazon.awssdk.services.sqs.model.{GetQueueUrlRequest => AwsSdkGetQueueUrlRequest}

class AmazonJavaSdkV2TestSuite extends SqsClientServerWithSdkV2Communication with Matchers {

Expand Down Expand Up @@ -46,8 +45,7 @@ class AmazonJavaSdkV2TestSuite extends SqsClientServerWithSdkV2Communication wit
Map(
"red" -> StringMessageAttribute("fish"),
"blue" -> StringMessageAttribute("cat"),
// affected by https://github.com/softwaremill/elasticmq/issues/946
// "green" -> BinaryMessageAttribute("dog".getBytes("UTF-8")),
"green" -> BinaryMessageAttribute("dog".getBytes("UTF-8")),
"yellow" -> NumberMessageAttribute("1234567890"),
"orange" -> NumberMessageAttribute("0987654321", Some("custom"))
)
Expand All @@ -60,8 +58,7 @@ class AmazonJavaSdkV2TestSuite extends SqsClientServerWithSdkV2Communication wit
Map(
"red" -> StringMessageAttribute("fish"),
"blue" -> StringMessageAttribute("cat"),
// affected by https://github.com/softwaremill/elasticmq/issues/946
// "green" -> BinaryMessageAttribute("dog".getBytes("UTF-8")),
"green" -> BinaryMessageAttribute("dog".getBytes("UTF-8")),
"yellow" -> NumberMessageAttribute("1234567890"),
"orange" -> NumberMessageAttribute("0987654321", Some("custom"))
),
Expand All @@ -75,8 +72,7 @@ class AmazonJavaSdkV2TestSuite extends SqsClientServerWithSdkV2Communication wit
Map(
"red" -> StringMessageAttribute("fish"),
"blue" -> StringMessageAttribute("cat"),
// affected by https://github.com/softwaremill/elasticmq/issues/946
// "green" -> BinaryMessageAttribute("dog".getBytes("UTF-8")),
"green" -> BinaryMessageAttribute("dog".getBytes("UTF-8")),
"yellow" -> NumberMessageAttribute("1234567890"),
"orange" -> NumberMessageAttribute("0987654321", Some("custom"))
),
Expand All @@ -89,17 +85,17 @@ class AmazonJavaSdkV2TestSuite extends SqsClientServerWithSdkV2Communication wit
}

private def doTestSendAndReceiveMessageWithAttributes(
content: String,
messageAttributes: Map[String, MessageAttribute]
): Unit = {
content: String,
messageAttributes: Map[String, MessageAttribute]
): Unit = {
doTestSendAndReceiveMessageWithAttributes(content, messageAttributes, List("All"))
}

private def doTestSendAndReceiveMessageWithAttributes(
content: String,
messageAttributes: Map[String, MessageAttribute],
requestedAttributes: List[String]
): Unit = {
content: String,
messageAttributes: Map[String, MessageAttribute],
requestedAttributes: List[String]
): Unit = {
// Given
val queue = clientV2.createQueue(CreateQueueRequest.builder().queueName("testQueue1").build())

Expand Down Expand Up @@ -148,11 +144,11 @@ class AmazonJavaSdkV2TestSuite extends SqsClientServerWithSdkV2Communication wit
}

private def checkMessageAttributesMatchRequestedAttributes(
messageAttributes: Map[String, MessageAttribute],
requestedAttributes: List[String],
sendMessageRequest: SendMessageRequest,
message: Message
) = {
messageAttributes: Map[String, MessageAttribute],
requestedAttributes: List[String],
sendMessageRequest: SendMessageRequest,
message: Message
) = {
val filteredSendMessageAttr =
filterBasedOnRequestedAttributes(requestedAttributes, sendMessageRequest.messageAttributes.asScala.toMap).asJava
val filteredMessageAttributes = filterBasedOnRequestedAttributes(requestedAttributes, messageAttributes)
Expand Down Expand Up @@ -182,9 +178,9 @@ class AmazonJavaSdkV2TestSuite extends SqsClientServerWithSdkV2Communication wit
}

private def filterBasedOnRequestedAttributes[T](
requestedAttributes: List[String],
messageAttributes: Map[String, T]
): Map[String, T] = {
requestedAttributes: List[String],
messageAttributes: Map[String, T]
): Map[String, T] = {
if (requestedAttributes.contains("All")) {
messageAttributes
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package org.elasticmq.rest.sqs

import org.elasticmq.{BinaryMessageAttribute, MessageAttribute, NumberMessageAttribute, StringMessageAttribute}
import spray.json.DefaultJsonProtocol._
import spray.json.{JsObject, JsString, JsValue, JsonReader, RootJsonFormat}
import spray.json.{JsArray, JsObject, JsString, JsValue, JsonReader, RootJsonFormat}

trait MessageAttributesSupport {

Expand All @@ -18,20 +18,31 @@ trait MessageAttributesSupport {
case StringMessageAttribute(value, customType) =>
JsObject("DataType" -> JsString("String" + customTypeAsString(customType)), "StringValue" -> JsString(value))
case msg: BinaryMessageAttribute =>
JsObject("DataType" -> JsString("Binary" + customTypeAsString(msg.customType)), "BinaryValue" -> JsString(msg.asBase64))
JsObject(
"DataType" -> JsString("Binary" + customTypeAsString(msg.customType)),
"BinaryValue" -> JsString(msg.asBase64)
)
}

override def read(json: JsValue): MessageAttribute = {
val fields = json.asJsObject.fields

def pickFieldRaw(name: String) =
fields.getOrElse(name, throw new SQSException(s"Field $name is required"))

def pickField[O: JsonReader](name: String) =
fields.getOrElse(name, throw new SQSException(s"Field $name is required")).convertTo[O]
pickFieldRaw(name).convertTo[O]

val dataType = pickField[String]("DataType")
dataType match {
case SomeNumber(ct) => NumberMessageAttribute(pickField[String]("StringValue"), customType(ct))
case SomeString(ct) => StringMessageAttribute(pickField[String]("StringValue"), customType(ct))
case SomeBinary(ct) => BinaryMessageAttribute(pickField[Array[Byte]]("BinaryValue"), customType(ct))
case SomeBinary(ct) =>
pickFieldRaw("BinaryValue") match {
case arr: JsArray => BinaryMessageAttribute(arr.convertTo[Array[Byte]], customType(ct))
case str: JsString => BinaryMessageAttribute.fromBase64(str.convertTo[String], customType(ct))
case any: Any => throw new SQSException(s"Field BinaryValue has unsupported type $any")
}
case _ =>
throw new Exception("Currently only handles String, Number and Binary typed attributes")
}
Expand Down