diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index d93e64c7..e4adf0cd 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -27,20 +27,6 @@ jobs: with: node-version: 16 - - name: Install AWS cli 1 - run: | - curl "https://s3.amazonaws.com/aws-cli/awscli-bundle-1.27.127.zip" -o "awscliv1.zip" - unzip ./awscliv1.zip - ls - sudo ./awscli-bundle/install -i /usr/local/aws1 -b /usr/local/bin/aws1 - - - name: Install AWS cli 2 - run: | - curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip" - unzip ./awscliv2.zip - ls - sudo ./aws/install -i /usr/local/aws2 -b /usr/local/bin/aws2 - - name: Cache SBT id: cache-sbt uses: actions/cache@v2 @@ -76,18 +62,6 @@ jobs: with: node-version: 16 - - name: Install AWS cli 1 - run: | - curl "https://s3.amazonaws.com/aws-cli/awscli-bundle-1.27.127.zip" -o "awscliv1.zip" - unzip -qq ./awscliv1.zip - sudo ./awscli-bundle/install -i /usr/local/aws1 -b /usr/local/bin/aws1 - - - name: Install AWS cli 2 - run: | - curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip" - unzip -qq ./awscliv2.zip - sudo ./aws/install -i /usr/local/aws2 -b /usr/local/bin/aws2 - - name: Cache SBT id: cache-sbt uses: actions/cache@v2 @@ -122,18 +96,6 @@ jobs: with: node-version: 16 - - name: Install AWS cli 1 - run: | - curl "https://s3.amazonaws.com/aws-cli/awscli-bundle-1.27.127.zip" -o "awscliv1.zip" - unzip -qq ./awscliv1.zip - sudo ./awscli-bundle/install -i /usr/local/aws1 -b /usr/local/bin/aws1 - - - name: Install AWS cli 2 - run: | - curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip" - unzip -qq ./awscliv2.zip - sudo ./aws/install -i /usr/local/aws2 -b /usr/local/bin/aws2 - - name: Cache SBT id: cache-sbt uses: actions/cache@v2 diff --git a/build.sbt b/build.sbt index bb87df55..e7f502f1 100644 --- a/build.sbt +++ b/build.sbt @@ -52,11 +52,6 @@ val sprayJson = "io.spray" %% "spray-json" % "1.3.6" val pekkoHttpSprayJson = "org.apache.pekko" %% "pekko-http-spray-json" % pekkoHttpVersion val pekkoHttpTestkit = "org.apache.pekko" %% "pekko-http-testkit" % pekkoHttpVersion % "test" -val awsSpringMessagingVersion = "2.2.6.RELEASE" -val springVersion = "5.3.34" -val awsSpringMessaging = "org.springframework.cloud" % "spring-cloud-aws-messaging" % awsSpringMessagingVersion -val springWeb = "org.springframework" % "spring-web" % springVersion - val scala2Async = "org.scala-lang.modules" %% "scala-async" % "1.0.1" val scala3Async = "com.github.rssh" %% "shim-scala-async-dotty-cps-async" % "0.9.21" // allows cross compilation w/o changes in source code @@ -211,9 +206,7 @@ lazy val restSqsTestingAmazonJavaSdk: Project = libraryDependencies ++= Seq( amazonJavaSdkSqs, amazonJavaV2SdkSqs, - awsSpringMessaging, - jclOverSlf4j, - springWeb + jclOverSlf4j ) ++ common, publish / skip := true ) diff --git a/core/src/main/scala/org/elasticmq/MessageAttribute.scala b/core/src/main/scala/org/elasticmq/MessageAttribute.scala index cf57b26e..802b38ad 100644 --- a/core/src/main/scala/org/elasticmq/MessageAttribute.scala +++ b/core/src/main/scala/org/elasticmq/MessageAttribute.scala @@ -23,17 +23,17 @@ case class NumberMessageAttribute(stringValue: String, override val customType: protected override val primaryDataType: String = "Number" } -case class BinaryMessageAttribute(binaryValue: Array[Byte], override val customType: Option[String] = None) +case class BinaryMessageAttribute(binaryValue: Seq[Byte], override val customType: Option[String] = None) extends MessageAttribute(customType) { protected override val primaryDataType: String = "Binary" - def asBase64: String = Base64.getEncoder.encodeToString(binaryValue) + def asBase64: String = Base64.getEncoder.encodeToString(binaryValue.toArray) } object BinaryMessageAttribute { def fromBase64(base64Str: String, customType: Option[String] = None): BinaryMessageAttribute = BinaryMessageAttribute( - binaryValue = Base64.getDecoder.decode(base64Str), + binaryValue = Base64.getDecoder.decode(base64Str).toSeq, customType = customType ) @@ -44,7 +44,7 @@ object BinaryMessageAttribute { val value = new Array[Byte](byteBuffer.capacity()) byteBuffer.get(value) value - }, + }.toSeq, customType = customType ) } diff --git a/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/AmazonCliTestSuite.scala b/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/AmazonCliTestSuite.scala deleted file mode 100644 index 4226f0a8..00000000 --- a/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/AmazonCliTestSuite.scala +++ /dev/null @@ -1,543 +0,0 @@ -package org.elasticmq.rest.sqs - -import org.elasticmq.StringMessageAttribute -import org.elasticmq.rest.sqs.model.RedrivePolicy -import org.elasticmq.rest.sqs.model.RedrivePolicyJson.format -import org.scalatest.matchers.should.Matchers -import org.scalatest.prop.TableDrivenPropertyChecks -import org.scalatest.{Inside, LoneElement, OptionValues, Tag} -import spray.json.DefaultJsonProtocol.listFormat -import spray.json._ - -import scala.collection.mutable -import scala.language.postfixOps -import scala.sys.process._ - -object Only213 extends Tag("org.elasticmq.rest.sqs.Only213") -class AmazonCliTestSuite - extends SqsClientServerCommunication - with Matchers - with TableDrivenPropertyChecks - with OptionValues - with Inside - with LoneElement { - - val cliVersions = Table( - "cli version", - AWSCli(Option(System.getenv("AWS_CLI_V1_EXECUTABLE")).getOrElse("aws1"), "aws version 1"), - AWSCli(Option(System.getenv("AWS_CLI_V2_EXECUTABLE")).getOrElse("aws"), "aws version 2") - ) - - def createQueue(name: String, attributesJson: Option[String] = None)(implicit cli: AWSCli): CreateQueueResponse = { - val attributesOption = attributesJson.fold("")(json => s"--attributes='$json'") - val result = - s"""${cli.executable} sqs create-queue --endpoint=$ServiceEndpoint --region=us-west-1 --no-sign-request --queue-name=$name $attributesOption""" !! - - result.parseJson.convertTo[CreateQueueResponse] - } - - def tag(url: String, tags: String)(implicit cli: AWSCli): Unit = - s"""${cli.executable} sqs tag-queue --tags="$tags" --endpoint=$ServiceEndpoint --region=us-west-1 --no-sign-request --queue-url=$url""" !! - - def untag(url: String, tags: String)(implicit cli: AWSCli): Unit = - s"""${cli.executable} sqs untag-queue --tag-keys="$tags" --endpoint=$ServiceEndpoint --region=us-west-1 --no-sign-request --queue-url=$url""" !! - - def listTags(url: String)(implicit cli: AWSCli): ListQueueTagsResponse = { - val result = - s"""${cli.executable} sqs list-queue-tags --endpoint=$ServiceEndpoint --region=us-west-1 --no-sign-request --queue-url=$url""" !! - - result.parseJson.convertTo[ListQueueTagsResponse] - } - - def deleteQueue(url: String)(implicit cli: AWSCli): Unit = { - s"""${cli.executable} sqs delete-queue --endpoint=$ServiceEndpoint --region=us-west-1 --no-sign-request --queue-url=$url """ !! - } - - def getQueueUrl(name: String)(implicit cli: AWSCli): GetQueueURLResponse = { - val result = - s"""${cli.executable} sqs get-queue-url --endpoint=$ServiceEndpoint --region=us-west-1 --no-sign-request --queue-name=$name""" !! - - result.parseJson.convertTo[GetQueueURLResponse] - } - - def listQueues(prefix: Option[String] = None)(implicit cli: AWSCli): ListQueuesResponse = { - val prefixStr = prefix.fold("")(s => s"--queue-name-prefix=$s") - - val result = - s"""${cli.executable} sqs list-queues $prefixStr --endpoint=$ServiceEndpoint --region=us-west-1 --no-sign-request""" !! - - if (result.nonEmpty) { - result.parseJson.convertTo[ListQueuesResponse] - } else { - ListQueuesResponse(List.empty) - } - } - - def sendMessage(messageBody: String, queueUrl: String)(implicit cli: AWSCli): SendMessageResponse = { - val result = - s"""${cli.executable} sqs send-message --endpoint=$ServiceEndpoint --region=us-west-1 --no-sign-request --queue-url=$queueUrl --message-body=$messageBody""" !! - - result.parseJson.convertTo[SendMessageResponse] - } - - def sendMessageWithAttributes( - messageBody: String, - queueUrl: String, - messageAttributes: String - )(implicit cli: AWSCli): SendMessageResponse = { - val result = - s"""${cli.executable} sqs send-message --endpoint=$ServiceEndpoint --region=us-west-1 --no-sign-request --queue-url=$queueUrl --message-body=$messageBody --message-attributes='$messageAttributes'""" !! - - result.parseJson.convertTo[SendMessageResponse] - } - - def sendMessage( - messageBody: String, - queueUrl: String, - messageAttributes: Option[String], - systemAttributes: Option[String] - )(implicit cli: AWSCli): SendMessageResponse = { - val messageAttributesStr = messageAttributes.fold("")(v => s"--message-attributes='$v'") - val systemAttributesStr = systemAttributes.fold("")(v => s"--message-system-attributes='$v'") - - val result = - s"""${cli.executable} sqs send-message --endpoint=$ServiceEndpoint --region=us-west-1 --no-sign-request --queue-url=$queueUrl --message-body=$messageBody $messageAttributesStr $systemAttributesStr""" !! - - result.parseJson.convertTo[SendMessageResponse] - } - - def sendMessageWithAttributesAndSystemAttributes( - messageBody: String, - queueUrl: String, - messageAttributes: String, - systemAttributes: String - )(implicit cli: AWSCli): SendMessageResponse = - sendMessage(messageBody, queueUrl, Some(messageAttributes), Some(systemAttributes)) - - def receiveMessage( - queueUrl: String, - attributeNames: String = "All", - messageAttributeNames: String = "All", - maxNumberOfMessages: String = "10" - )(implicit - cli: AWSCli - ): ReceiveMessageResponse = { - val attributeNamesStr = s"--attribute-names='$attributeNames'" - val messageAttributeNamesStr = s"--message-attribute-names='$messageAttributeNames'" - val maxNumberOfMessagesStr = s"--max-number-of-messages='$maxNumberOfMessages'" - - val result = - s"""${cli.executable} sqs receive-message --endpoint=$ServiceEndpoint --region=us-west-1 --no-sign-request --queue-url=$queueUrl $attributeNamesStr $messageAttributeNamesStr $maxNumberOfMessagesStr""" !! - - result.parseJson.convertTo[ReceiveMessageResponse] - } - - def sendMessageBatch(queueUrl: String, entries: String)(implicit - cli: AWSCli - ): BatchResponse[BatchMessageSendResponseEntry] = { - val result = - s"""${cli.executable} sqs send-message-batch --endpoint=$ServiceEndpoint --region=us-west-1 --no-sign-request --queue-url=$queueUrl --entries='[$entries]'""" !! - - result.parseJson.convertTo[BatchResponse[BatchMessageSendResponseEntry]] - } - - def deleteMessage(queueUrl: String, receiptHandle: String)(implicit cli: AWSCli): Unit = { - s"""${cli.executable} sqs delete-message --endpoint=$ServiceEndpoint --region=us-west-1 --no-sign-request --queue-url=$queueUrl --receipt-handle="$receiptHandle" """ !! - } - - def deleteMessageBatch(queueUrl: String, entries: String)(implicit - cli: AWSCli - ): List[BatchDeleteMessageResponseEntry] = { - val result = - s"""${cli.executable} sqs delete-message-batch --endpoint=$ServiceEndpoint --region=us-west-1 --no-sign-request --queue-url=$queueUrl --entries='[$entries]'""" !! - - result.parseJson.asJsObject.fields - .getOrElse("Successful", fail("couldn't find successful field")) - .convertTo[List[BatchDeleteMessageResponseEntry]] - } - - forAll(cliVersions) { implicit version => - test(s"should create a queue and get queue url ${version.name}") { - - val create = createQueue("test-queue") - - val get = getQueueUrl("test-queue") - - get.QueueUrl shouldBe create.QueueUrl - } - - test(s"should list created queues ${version.name}") { - createQueue("test-queue1") - createQueue("test-queue2") - - val get = listQueues() - - get.QueueUrls should contain allOf (s"$ServiceEndpoint/$awsAccountId/test-queue2", s"$ServiceEndpoint/$awsAccountId/test-queue1") - } - - test(s"should list queues with the specified prefix ${version.name}") { - // Given - createQueue("aaa-test-queue1") - createQueue("aaa-test-queue2") - createQueue("bbb-test-queue2") - - // When - val get = listQueues(prefix = Some("aaa")) - - get.QueueUrls.length shouldBe 2 - get.QueueUrls should contain allOf (s"$ServiceEndpoint/$awsAccountId/aaa-test-queue2", s"$ServiceEndpoint/$awsAccountId/aaa-test-queue1") - } - - test(s"should send a single message to queue ${version.name}", Only213) { - // given - createQueue("test-queue") - val queueUrl = getQueueUrl("test-queue").QueueUrl - - // when - val message = sendMessage("simpleMessage", queueUrl) - - // then - message.MessageId.isEmpty shouldBe false - message.MD5OfMessageBody.isEmpty shouldBe false - message.MD5OfMessageAttributes shouldBe empty - message.MD5OfMessageSystemAttributes shouldBe empty - message.SequenceNumber shouldBe empty - } - - test(s"should send a single message with Attributes to queue ${version.name}", Only213) { - // given - createQueue("test-queue") - val queueUrl = getQueueUrl("test-queue").QueueUrl - val messageAttributes = """{ "firstAttribute": { "DataType": "String", "StringValue": "hello world" } }""" - - // when - val message = sendMessageWithAttributes("simpleMessage", queueUrl, messageAttributes) - - // then - message.MessageId.isEmpty shouldBe false - message.MD5OfMessageBody.isEmpty shouldBe false - message.MD5OfMessageAttributes.isEmpty shouldBe false - message.MD5OfMessageSystemAttributes shouldBe empty - message.SequenceNumber shouldBe empty - } - - test(s"should send a single message with Attributes and System Attributes to queue ${version.name}", Only213) { - // given - createQueue("test-queue") - val queueUrl = getQueueUrl("test-queue").QueueUrl - val messageAttributes = """{ "firstAttribute": { "DataType": "String", "StringValue": "hello world" } }""" - val systemAttributes = - """{ "AWSTraceHeader": { "DataType": "String", "StringValue": "your-xray-trace-header-string" } }""" - - // when - val message = - sendMessageWithAttributesAndSystemAttributes("simpleMessage", queueUrl, messageAttributes, systemAttributes) - - // then - message.MessageId.isEmpty shouldBe false - message.MD5OfMessageBody.isEmpty shouldBe false - message.MD5OfMessageAttributes.isEmpty shouldBe false - message.MD5OfMessageSystemAttributes.isEmpty shouldBe false - message.SequenceNumber shouldBe empty - } - - test(s"should fail if message is sent to missing queue ${version.name}", Only213) { - // given - val outLines = mutable.ListBuffer.empty[String] - val errLines = mutable.ListBuffer.empty[String] - - val logger = new ProcessLogger { - override def out(s: => String): Unit = outLines.append(s) - - override def err(s: => String): Unit = errLines.append(s) - - override def buffer[T](f: => T): T = f - } - - // when - val queueUrl = s"$ServiceEndpoint/$awsAccountId/miss" - - intercept[Exception] { - s"""${version.executable} sqs send-message --endpoint=$ServiceEndpoint --region=us-west-1 --no-sign-request --message-body=hello --queue-url=$queueUrl""" !! (logger) - } - - // then - outLines.mkString("\n") shouldBe empty - errLines.mkString("\n") should include("The specified queue does not exist.") - } - - test(s"should tag, untag and list queue tags ${version.name}", Only213) { - // given - val url = createQueue("test-queue").QueueUrl - - // when - tag(url, "a='A', b='B'") - - // then - listTags(url).Tags shouldBe Map("a" -> "A", "b" -> "B") - - // when - untag(url, "b") - - // then - listTags(url).Tags shouldBe Map("a" -> "A") - - deleteQueue(url) - } - - test(s"should add permission ${version.name}") { - // given - val url = createQueue("permission-test").QueueUrl - - // when ~> then - s"""${version.executable} sqs add-permission --label l --aws-account-ids=$awsAccountId --actions=get --endpoint=$ServiceEndpoint --region=us-west-1 --no-sign-request --queue-url=$url""" !! - } - - test(s"should remove permission ${version.name}", Only213) { - // given - val url = createQueue("permission-test").QueueUrl - s"""${version.executable} sqs add-permission --label l --aws-account-ids=$awsAccountId --actions=get --endpoint=$ServiceEndpoint --region=us-west-1 --no-sign-request --queue-url=$url""" ! - - // when ~> then - s"""${version.executable} sqs remove-permission --label l --endpoint=$ServiceEndpoint --region=us-west-1 --no-sign-request --queue-url=$url""" !! - } - - test(s"should purge queue ${version.name}") { - // given - val url = createQueue("permission-test").QueueUrl - - // when ~> then - s"""${version.executable} sqs purge-queue --endpoint=$ServiceEndpoint --region=us-west-1 --no-sign-request --queue-url=$url""" !! - } - - test(s"should delete created queue with ${version.name}") { - // given - val queue = createQueue("test-queue") - val listQueuesBeforeDelete = listQueues() - listQueuesBeforeDelete.QueueUrls.size shouldBe 1 - - // when - deleteQueue(queue.QueueUrl) - val listQueuesAfterDelete = listQueues() - - // then - listQueuesAfterDelete.QueueUrls.size shouldBe 0 - } - - test(s"should receive message with ${version.name}", Only213) { - // given - val queue = createQueue("test-queue") - val firstMessageBody = "simpleMessageOne" - val secondMessageBody = "simpleMessageTwo" - val firstMessageAttributes = - """{ "firstAttribute": { "DataType": "String", "StringValue": "hello world one" } }""" - val secondMessageAttributes = - """{ "secondAttribute": { "DataType": "String", "StringValue": "hello world two" } }""" - val firstMessage = sendMessageWithAttributes(firstMessageBody, queue.QueueUrl, firstMessageAttributes) - val secondMessage = sendMessageWithAttributes(secondMessageBody, queue.QueueUrl, secondMessageAttributes) - - // when - val receivedMessage = receiveMessage(queue.QueueUrl, "All", "All", "2") - - // then - receivedMessage.Messages.size shouldBe 2 - inside(receivedMessage.Messages.head) { case msg => - import msg._ - MessageId shouldBe firstMessage.MessageId - ReceiptHandle.nonEmpty shouldBe true - MD5OfBody shouldBe firstMessage.MD5OfMessageBody - Body shouldBe firstMessageBody - Attributes.get.size shouldBe 4 - Attributes.get.get("SentTimestamp").map(v => v.nonEmpty shouldBe true) - Attributes.get.get("ApproximateReceiveCount").map(_ shouldBe "1") - Attributes.get.get("ApproximateFirstReceiveTimestamp").map(v => v.nonEmpty shouldBe true) - Attributes.get.get("SenderId").map(_ shouldBe "127.0.0.1") - MD5OfMessageAttributes.nonEmpty shouldBe true - MessageAttributes.get.nonEmpty shouldBe true - MessageAttributes.get.get("firstAttribute").map { msgAtt => - msgAtt.getDataType() shouldBe "String" - msgAtt shouldBe a[StringMessageAttribute] - inside(msgAtt) { case stringMessageAttribute: StringMessageAttribute => - stringMessageAttribute.stringValue shouldBe "hello world one" - } - } - } - inside(receivedMessage.Messages(1)) { case msg => - import msg._ - MessageId shouldBe secondMessage.MessageId - ReceiptHandle.nonEmpty shouldBe true - MD5OfBody shouldBe secondMessage.MD5OfMessageBody - Body shouldBe secondMessageBody - Attributes.get.size shouldBe 4 - Attributes.get.get("SentTimestamp").map(v => v.nonEmpty shouldBe true) - Attributes.get.get("ApproximateReceiveCount").map(_ shouldBe "1") - Attributes.get.get("ApproximateFirstReceiveTimestamp").map(v => v.nonEmpty shouldBe true) - Attributes.get.get("SenderId").map(_ shouldBe "127.0.0.1") - MD5OfMessageAttributes.nonEmpty shouldBe true - MessageAttributes.get.nonEmpty shouldBe true - MessageAttributes.get.get("secondAttribute").map { msgAtt => - msgAtt.getDataType() shouldBe "String" - msgAtt shouldBe a[StringMessageAttribute] - inside(msgAtt) { case stringMessageAttribute: StringMessageAttribute => - stringMessageAttribute.stringValue shouldBe "hello world two" - } - } - } - } - - test(s"should send message batch with ${version.name}", Only213) { - // given - val queue = createQueue("test-queue") - val firstMessageBody = "messageOne" - val entries = s"""{"Id": "1", "MessageBody": "$firstMessageBody"}, {"Id": "2", "MessageBody": ""}""" - - // when - val batchMessages = sendMessageBatch(queue.QueueUrl, entries) - - // then - val failed = batchMessages.Failed.toList.flatten - failed.size shouldBe 1 - inside(failed.head) { case failedMessage => - import failedMessage._ - Id shouldBe "2" - SenderFault shouldBe true - Code shouldBe "InvalidAttributeValue" - Message shouldBe "The request must contain the parameter MessageBody." - } - - batchMessages.Successful.size shouldBe 1 - inside(batchMessages.Successful.head) { case successfulMessage => - import successfulMessage._ - Id shouldBe "1" - MessageId.nonEmpty shouldBe true - MD5OfMessageBody.nonEmpty shouldBe true - } - } - - test(s"should delete message with ${version.name}", Only213) { - // given - val queue = createQueue("test-queue") - val messageAttributes = - """{ "firstAttribute": { "DataType": "String", "StringValue": "hello world one" } }""" - sendMessageWithAttributes("simpleMessage", queue.QueueUrl, messageAttributes) - val receivedMessage = receiveMessage(queue.QueueUrl) - val receiptHandle = receivedMessage.Messages.head.ReceiptHandle - - // when - // then - deleteMessage(queue.QueueUrl, receiptHandle) - } - - test(s"should delete message batch with ${version.name}", Only213) { - // given - val queue = createQueue("test-queue") - val firstMessageBody = "simpleMessageOne" - val secondMessageBody = "simpleMessageTwo" - val firstMessageAttributes = - """{ "firstAttribute": { "DataType": "String", "StringValue": "hello world one" } }""" - val secondMessageAttributes = - """{ "secondAttribute": { "DataType": "String", "StringValue": "hello world two" } }""" - val firstMessage = sendMessageWithAttributes(firstMessageBody, queue.QueueUrl, firstMessageAttributes) - val secondMessage = sendMessageWithAttributes(secondMessageBody, queue.QueueUrl, secondMessageAttributes) - - val receivedMessages = receiveMessage(queue.QueueUrl) - val receiptHandles = receivedMessages.Messages.map(msg => msg.MessageId -> msg.ReceiptHandle).toMap - val entries = receiptHandles - .map { case (id, receiptHandle) => - s"""{"Id": "$id", "ReceiptHandle": "$receiptHandle"}""" - } - .mkString(", ") - - // when - val deleteMessageBatchEntries = deleteMessageBatch(queue.QueueUrl, entries) - - // then - deleteMessageBatchEntries.size shouldBe 2 - inside(deleteMessageBatchEntries.head) { case deletedMessage => - deletedMessage.Id shouldBe firstMessage.MessageId - } - inside(deleteMessageBatchEntries(1)) { case deletedMessage => - deletedMessage.Id shouldBe secondMessage.MessageId - } - } - - test(s"change message visibility with ${version.name}", Only213) { - // given - val queue = createQueue("test-queue") - sendMessageWithAttributes( - "hello", - queue.QueueUrl, - """{ "firstAttribute": { "DataType": "String", "StringValue": "hello world one" } }""" - ) - - val msg = receiveMessage(queue.QueueUrl).Messages.loneElement - - // when ~> then - s"""${version.executable} sqs change-message-visibility --visibility-timeout=100 --receipt-handle=${msg.ReceiptHandle} --endpoint=$ServiceEndpoint --region=us-west-1 --no-sign-request --queue-url=${queue.QueueUrl}""" !! - } - - test(s"change message visibility batch with ${version.name}", Only213) { - // given - val queue = createQueue("test-queue") - val attrs = """{ "firstAttribute": { "DataType": "String", "StringValue": "hello world one" } }""" - sendMessageWithAttributes( - "hello1", - queue.QueueUrl, - attrs - ) - sendMessageWithAttributes( - "hello2", - queue.QueueUrl, - attrs - ) - sendMessageWithAttributes( - "hello3", - queue.QueueUrl, - attrs - ) - - val entries = JsArray( - receiveMessage(queue.QueueUrl).Messages.map { m => - JsObject( - "Id" -> JsString(s"${m.MessageId}"), - "ReceiptHandle" -> JsString(s"${m.ReceiptHandle}"), - "VisibilityTimeout" -> JsNumber(500) - ) - }.toVector - ).compactPrint - - // then - s"""${version.executable} sqs change-message-visibility-batch --entries='$entries' --endpoint=$ServiceEndpoint --region=us-west-1 --no-sign-request --queue-url=${queue.QueueUrl}""" !! - } - - test(s"should set and get queue attributes with ${version.name}", Only213) { - val queue = createQueue("test") - - // when - s"${version.executable} sqs set-queue-attributes --endpoint=$ServiceEndpoint --region=us-west-1 --no-sign-request --queue-url ${queue.QueueUrl} --attributes='VisibilityTimeout=99'" !! - - val result = - s"${version.executable} sqs get-queue-attributes --endpoint=$ServiceEndpoint --region=us-west-1 --no-sign-request --queue-url ${queue.QueueUrl} --attribute-names='VisibilityTimeout'" !! - - result.parseJson.convertTo[GetQueueAttributesResponse].Attributes.get("VisibilityTimeout") shouldBe Some("99") - } - - test(s"should get source queues of a dlq with ${version.name}", Only213) { - // given - val dlq = createQueue("testDlq") - val redrivePolicy = - RedrivePolicy("testDlq", awsRegion, awsAccountId, 1).toJson.toString.replaceAll("\"", "\\\\\"") - val main = createQueue("main", Some(s"""{"RedrivePolicy":"$redrivePolicy"}""")) - - // when - val response = - s"""${version.executable} sqs list-dead-letter-source-queues --endpoint=$ServiceEndpoint --region=us-west-1 --no-sign-request --queue-url ${dlq.QueueUrl}""" !! - - // then - val result = response.parseJson.convertTo[ListDeadLetterSourceQueuesResponse] - result.queueUrls should contain only (main.QueueUrl) - } - } -} - -case class AWSCli(executable: String, name: String) diff --git a/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/AmazonJavaSdkTestSuite.scala b/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/AmazonJavaSdkTestSuite.scala index a07efdee..f2d3f97d 100644 --- a/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/AmazonJavaSdkTestSuite.scala +++ b/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/AmazonJavaSdkTestSuite.scala @@ -335,7 +335,7 @@ class AmazonJavaSdkTestSuite extends SqsClientServerCommunication with Matchers v match { case s: StringMessageAttribute => attr.setStringValue(s.stringValue) case n: NumberMessageAttribute => attr.setStringValue(n.stringValue) - case b: BinaryMessageAttribute => attr.setBinaryValue(ByteBuffer.wrap(b.binaryValue)) + case b: BinaryMessageAttribute => attr.setBinaryValue(ByteBuffer.wrap(b.binaryValue.toArray)) } message.addMessageAttributesEntry(k, attr) diff --git a/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/AmazonJavaSdkV2TestSuite.scala b/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/AmazonJavaSdkV2TestSuite.scala deleted file mode 100644 index 96e4f4fe..00000000 --- a/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/AmazonJavaSdkV2TestSuite.scala +++ /dev/null @@ -1,217 +0,0 @@ -package org.elasticmq.rest.sqs - -import org.elasticmq.rest.sqs.model.RedrivePolicy -import org.elasticmq.{BinaryMessageAttribute, MessageAttribute, NumberMessageAttribute, StringMessageAttribute} -import org.elasticmq.rest.sqs.model.RedrivePolicyJson.format -import org.scalatest.matchers.should.Matchers -import software.amazon.awssdk.core.SdkBytes -import software.amazon.awssdk.services.sqs.model.{GetQueueUrlRequest => AwsSdkGetQueueUrlRequest, _} -import spray.json.enrichAny - -import scala.collection.JavaConverters._ - -class AmazonJavaSdkV2TestSuite extends SqsClientServerWithSdkV2Communication with Matchers { - - test("should create a queue") { - clientV2.createQueue(CreateQueueRequest.builder().queueName("testQueue1").build()) - } - - test("should get queue url") { - // Given - clientV2.createQueue(CreateQueueRequest.builder().queueName("testQueue1").build()) - - // When - val queueUrl = clientV2.getQueueUrl(AwsSdkGetQueueUrlRequest.builder().queueName("testQueue1").build()).queueUrl() - - // Then - queueUrl shouldEqual "http://localhost:9321/123456789012/testQueue1" - } - - test("should fail to get queue url if queue doesn't exist") { - // When - val thrown = intercept[QueueDoesNotExistException] { - clientV2.getQueueUrl(AwsSdkGetQueueUrlRequest.builder().queueName("testQueue1").build()).queueUrl() - } - - // Then - thrown.awsErrorDetails().errorCode() shouldBe "QueueDoesNotExist" - thrown.awsErrorDetails().errorMessage() shouldBe "The specified queue does not exist." - } - - test("should send and receive a simple message") { - doTestSendAndReceiveMessage("test msg 123") - } - - test("should send and receive a simple message with message attributes") { - doTestSendAndReceiveMessageWithAttributes( - "Message 1", - Map( - "red" -> StringMessageAttribute("fish"), - "blue" -> StringMessageAttribute("cat"), - "green" -> BinaryMessageAttribute("dog".getBytes("UTF-8")), - "yellow" -> NumberMessageAttribute("1234567890"), - "orange" -> NumberMessageAttribute("0987654321", Some("custom")) - ) - ) - } - - test("should send a simple message with message attributes and only receive requested attributes") { - doTestSendAndReceiveMessageWithAttributes( - "Message 1", - Map( - "red" -> StringMessageAttribute("fish"), - "blue" -> StringMessageAttribute("cat"), - "green" -> BinaryMessageAttribute("dog".getBytes("UTF-8")), - "yellow" -> NumberMessageAttribute("1234567890"), - "orange" -> NumberMessageAttribute("0987654321", Some("custom")) - ), - List("red", "green", "orange") - ) - } - - test("should send a simple message with message attributes and only receive no requested attributes by default") { - doTestSendAndReceiveMessageWithAttributes( - "Message 1", - Map( - "red" -> StringMessageAttribute("fish"), - "blue" -> StringMessageAttribute("cat"), - "green" -> BinaryMessageAttribute("dog".getBytes("UTF-8")), - "yellow" -> NumberMessageAttribute("1234567890"), - "orange" -> NumberMessageAttribute("0987654321", Some("custom")) - ), - List() - ) - } - - test("should return DeadLetterQueueSourceArn in receive message attributes") { - // Given - clientV2.createQueue(CreateQueueRequest.builder().queueName("testDlq").build()) - val queue = clientV2.createQueue(CreateQueueRequest.builder() - .queueName("testQueue1") - .attributes(Map(QueueAttributeName.REDRIVE_POLICY -> RedrivePolicy("testDlq", awsRegion, awsAccountId, 1).toJson.toString).asJava) - .build()) - - // When - clientV2.sendMessage(SendMessageRequest.builder() - .queueUrl(queue.queueUrl()) - .messageBody("test123") - .build()) - val receiveResult = clientV2.receiveMessage(ReceiveMessageRequest.builder() - .queueUrl(queue.queueUrl()) - .attributeNamesWithStrings("All") - .build()) - - // Then - receiveResult.messages().asScala.toList.flatMap(_.attributes().asScala.toList) should contain( - (MessageSystemAttributeName.DEAD_LETTER_QUEUE_SOURCE_ARN, s"arn:aws:sqs:$awsRegion:$awsAccountId:testDlq") - ) - } - - private def doTestSendAndReceiveMessage(content: String): Unit = { - doTestSendAndReceiveMessageWithAttributes(content, Map(), List()) - } - - private def doTestSendAndReceiveMessageWithAttributes( - content: String, - messageAttributes: Map[String, MessageAttribute] - ): Unit = { - doTestSendAndReceiveMessageWithAttributes(content, messageAttributes, List("All")) - } - - private def doTestSendAndReceiveMessageWithAttributes( - content: String, - messageAttributes: Map[String, MessageAttribute], - requestedAttributes: List[String] - ): Unit = { - // Given - val queue = clientV2.createQueue(CreateQueueRequest.builder().queueName("testQueue1").build()) - - // When - val attributes = messageAttributes.map { - case (k, v: StringMessageAttribute) => - k -> MessageAttributeValue.builder().dataType(v.getDataType()).stringValue(v.stringValue).build() - case (k, v: NumberMessageAttribute) => - k -> MessageAttributeValue.builder().dataType(v.getDataType()).stringValue(v.stringValue).build() - case (k, v: BinaryMessageAttribute) => - k -> MessageAttributeValue - .builder() - .dataType(v.getDataType()) - .binaryValue(SdkBytes.fromByteArray(v.binaryValue)) - .build() - } - - val sendMessageRequest = SendMessageRequest - .builder() - .queueUrl(queue.queueUrl()) - .messageBody(content) - .messageAttributes(attributes.asJava) - .build() - - clientV2.sendMessage(sendMessageRequest) - - val message = receiveSingleMessageObject(queue.queueUrl(), requestedAttributes).orNull - - // Then - message.body() shouldBe content - checkMessageAttributesMatchRequestedAttributes(messageAttributes, requestedAttributes, sendMessageRequest, message) - } - - private def receiveSingleMessageObject(queueUrl: String, requestedAttributes: List[String]): Option[Message] = { - clientV2 - .receiveMessage( - ReceiveMessageRequest - .builder() - .queueUrl(queueUrl) - .messageAttributeNames(requestedAttributes.asJava) - .build() - ) - .messages - .asScala - .headOption - } - - private def checkMessageAttributesMatchRequestedAttributes( - messageAttributes: Map[String, MessageAttribute], - requestedAttributes: List[String], - sendMessageRequest: SendMessageRequest, - message: Message - ) = { - val filteredSendMessageAttr = - filterBasedOnRequestedAttributes(requestedAttributes, sendMessageRequest.messageAttributes.asScala.toMap).asJava - val filteredMessageAttributes = filterBasedOnRequestedAttributes(requestedAttributes, messageAttributes) - - message.messageAttributes should be(filteredSendMessageAttr) - message.messageAttributes.asScala.map { case (k, attr) => - ( - k, - if (attr.dataType.startsWith("String") && attr.stringValue != null) { - StringMessageAttribute(attr.stringValue).stringValue - } else if (attr.dataType.startsWith("Number") && attr.stringValue != null) { - NumberMessageAttribute(attr.stringValue).stringValue - } else { - BinaryMessageAttribute.fromByteBuffer(attr.binaryValue.asByteBuffer()).asBase64 - } - ) - } should be(filteredMessageAttributes.map { case (k, attr) => - ( - k, - attr match { - case s: StringMessageAttribute => s.stringValue - case n: NumberMessageAttribute => n.stringValue - case b: BinaryMessageAttribute => b.asBase64 - } - ) - }) - } - - private def filterBasedOnRequestedAttributes[T]( - requestedAttributes: List[String], - messageAttributes: Map[String, T] - ): Map[String, T] = { - if (requestedAttributes.contains("All")) { - messageAttributes - } else { - messageAttributes.filterKeys(k => requestedAttributes.contains(k)).toMap - } - } -} diff --git a/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/SpringAWSTestSuite.scala b/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/SpringAWSTestSuite.scala deleted file mode 100644 index 33a785db..00000000 --- a/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/SpringAWSTestSuite.scala +++ /dev/null @@ -1,40 +0,0 @@ -package org.elasticmq.rest.sqs - -import com.amazonaws.auth.{AWSCredentials, AWSCredentialsProvider} -import com.amazonaws.client.builder.AwsClientBuilder -import com.amazonaws.services.sqs.AmazonSQSAsyncClient -import com.amazonaws.services.sqs.model.CreateQueueRequest -import org.scalatest.matchers.should.Matchers -import org.springframework.cloud.aws.messaging.core.QueueMessagingTemplate - -class SpringAWSTestSuite extends SqsClientServerCommunication with Matchers { - - def withSpringClient(body: QueueMessagingTemplate => Unit) = { - val sqs = AmazonSQSAsyncClient - .asyncBuilder() - .withCredentials(new AWSCredentialsProvider { - override def getCredentials: AWSCredentials = null - override def refresh(): Unit = () - }) - .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(ServiceEndpoint, "us-west-1")) - .build() - - val template = new QueueMessagingTemplate(sqs) - try { - body(template) - } finally (sqs.shutdown()) - } - - test("should send message with spring aws") { - - withSpringClient { template => - // given - val queue = client.createQueue(new CreateQueueRequest("spring-test-queue")) - - // when ~> then - template.convertAndSend(queue.getQueueUrl, "hello") - - } - } - -} diff --git a/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/aws/AmazonJavaSdkNewTestSuite.scala b/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/aws/AmazonJavaSdkNewTestSuite.scala new file mode 100644 index 00000000..2168742a --- /dev/null +++ b/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/aws/AmazonJavaSdkNewTestSuite.scala @@ -0,0 +1,417 @@ +package org.elasticmq.rest.sqs.aws + +import org.elasticmq.rest.sqs.client._ +import org.elasticmq.rest.sqs.model.RedrivePolicy +import org.elasticmq.rest.sqs.model.RedrivePolicyJson.format +import org.elasticmq.rest.sqs.{AwsConfig, SqsClientServerCommunication, SqsClientServerWithSdkV2Communication, client} +import org.elasticmq.{BinaryMessageAttribute, MessageAttribute, NumberMessageAttribute, StringMessageAttribute} +import org.scalatest.concurrent.Eventually +import org.scalatest.funsuite.AnyFunSuite +import org.scalatest.matchers.should.Matchers +import spray.json.enrichAny + +import scala.concurrent.duration.DurationInt + +abstract class AmazonJavaSdkNewTestSuite + extends AnyFunSuite + with HasSqsTestClient + with AwsConfig + with Matchers + with Eventually { + + test("should create a queue") { + testClient.createQueue("testQueue1") + } + + test("should get queue url") { + // given + testClient.createQueue("testQueue1") + + // when + val queueUrl = testClient.getQueueUrl("testQueue1").toOption.get + + // then + queueUrl shouldEqual "http://localhost:9321/123456789012/testQueue1" + } + + test("should list queues") { + // given + testClient.createQueue("testQueue1") + testClient.createQueue("testQueue2") + + // when + val queueUrls = testClient.listQueues() + + // then + queueUrls shouldBe List( + "http://localhost:9321/123456789012/testQueue1", + "http://localhost:9321/123456789012/testQueue2" + ) + } + + test("should list queues with specified prefix") { + // given + testClient.createQueue("aaa-testQueue1") + testClient.createQueue("bbb-testQueue2") + testClient.createQueue("bbb-testQueue3") + + // when + val queueUrls = testClient.listQueues(Some("bbb")) + + // then + queueUrls shouldBe List( + "http://localhost:9321/123456789012/bbb-testQueue2", + "http://localhost:9321/123456789012/bbb-testQueue3" + ) + } + + test("should fail to get queue url if queue doesn't exist") { + testClient.getQueueUrl("testQueue1") shouldBe Left( + SqsClientError(QueueDoesNotExist, "The specified queue does not exist.") + ) + } + + test("should send and receive a simple message") { + doTestSendAndReceiveMessageWithAttributes("test msg 123") + } + + test("should send and receive a simple message with message attributes") { + doTestSendAndReceiveMessageWithAttributes( + "Message 1", + Map( + "red" -> StringMessageAttribute("fish"), + "blue" -> StringMessageAttribute("cat"), + "green" -> BinaryMessageAttribute("dog".getBytes("UTF-8")), + "yellow" -> NumberMessageAttribute("1234567890"), + "orange" -> NumberMessageAttribute("0987654321", Some("custom")) + ) + ) + } + + test("should send a simple message with message attributes and only receive requested attributes") { + doTestSendAndReceiveMessageWithAttributes( + "Message 1", + Map( + "red" -> StringMessageAttribute("fish"), + "blue" -> StringMessageAttribute("cat"), + "green" -> BinaryMessageAttribute("dog".getBytes("UTF-8")), + "yellow" -> NumberMessageAttribute("1234567890"), + "orange" -> NumberMessageAttribute("0987654321", Some("custom")) + ), + List("red", "green", "orange") + ) + } + + test("should send a simple message with message attributes and only receive no requested attributes by default") { + doTestSendAndReceiveMessageWithAttributes( + "Message 1", + Map( + "red" -> StringMessageAttribute("fish"), + "blue" -> StringMessageAttribute("cat"), + "green" -> BinaryMessageAttribute("dog".getBytes("UTF-8")), + "yellow" -> NumberMessageAttribute("1234567890"), + "orange" -> NumberMessageAttribute("0987654321", Some("custom")) + ), + List() + ) + } + + test("should send and receive a simple message with message attributes and AWSTraceHeader") { + val message = doTestSendAndReceiveMessageWithAttributes( + "Message 1", + Map( + "red" -> StringMessageAttribute("fish"), + "blue" -> StringMessageAttribute("cat"), + "green" -> BinaryMessageAttribute("dog".getBytes("UTF-8")), + "yellow" -> NumberMessageAttribute("1234567890"), + "orange" -> NumberMessageAttribute("0987654321", Some("custom")) + ), + requestedAttributes = List("All"), + awsTraceHeader = Some("abc-123"), + requestedSystemAttributes = List("AWSTraceHeader") + ) + + message.attributes shouldBe Map(AWSTraceHeader -> "abc-123") + } + + test("should return DeadLetterQueueSourceArn in receive message attributes") { + // given + testClient.createQueue("testDlq") + val queue = testClient.createQueue( + "testQueue1", + Map(RedrivePolicyAttributeName -> RedrivePolicy("testDlq", awsRegion, awsAccountId, 1).toJson.compactPrint) + ) + + // when + testClient.sendMessage(queue, "test123") + val receiveResult = testClient.receiveMessage(queue, List("All")) + + // then + receiveResult.flatMap(_.attributes.toList) should contain( + (DeadLetterQueueSourceArn, s"arn:aws:sqs:$awsRegion:$awsAccountId:testDlq") + ) + } + + test("should fail if message is sent to missing queue") { + testClient.sendMessage("http://localhost:9321/123456789012/missingQueue", "test123") shouldBe Left( + SqsClientError(QueueDoesNotExist, "The specified queue does not exist.") + ) + } + + test("should tag, untag and list queue tags") { + // given + val queueUrl = testClient.createQueue("testQueue1") + + // when + testClient.tagQueue(queueUrl, Map("tag1" -> "value1", "tag2" -> "value2")) + + // then + testClient.listQueueTags(queueUrl) shouldBe Map("tag1" -> "value1", "tag2" -> "value2") + + // when + testClient.untagQueue(queueUrl, List("tag1")) + + // then + testClient.listQueueTags(queueUrl) shouldBe Map("tag2" -> "value2") + } + + test("should add permission") { + // given + val queueUrl = testClient.createQueue("testQueue1") + + // expect + testClient.addPermission(queueUrl, "l", List(awsAccountId), List("get")) + } + + test("should remove permission") { + // given + val queueUrl = testClient.createQueue("testQueue1") + testClient.addPermission(queueUrl, "l", List(awsAccountId), List("get")) + + // expect + testClient.removePermission(queueUrl, "l") + } + + test("should delete queue") { + // given + val queueUrl = testClient.createQueue("testQueue1") + + // when + testClient.deleteQueue(queueUrl) + + // then + eventually(timeout(5.seconds), interval(100.millis)) { + testClient.listQueues() shouldBe empty + } + } + + test("should purge queue") { + // given + val queueUrl = testClient.createQueue("testQueue1") + testClient.sendMessage(queueUrl, "test123") + testClient.sendMessage(queueUrl, "test234") + testClient.sendMessage(queueUrl, "test345") + + // when + testClient.purgeQueue(queueUrl) + + // then + eventually(timeout(5.seconds), interval(100.millis)) { + testClient + .getQueueAttributes(queueUrl, ApproximateNumberOfMessagesAttributeName)( + ApproximateNumberOfMessagesAttributeName.value + ) + .toInt shouldBe 0 + } + } + + test("should send message batch") { + // given + val queueUrl = testClient.createQueue("testQueue1") + + // when + val response = testClient + .sendMessageBatch(queueUrl, List(SendMessageBatchEntry("1", "test123"), SendMessageBatchEntry("2", ""))) + .toOption + .get + + // then + response.successful should have size 1 + response.successful.head.id shouldBe "1" + response.successful.head.messageId should not be empty + response.successful.head.md5OfMessageBody should not be empty + + response.failed should have size 1 + response.failed.head.id shouldBe "2" + response.failed.head.senderFault shouldBe true + response.failed.head.code shouldBe "InvalidAttributeValue" + response.failed.head.message shouldBe "The request must contain the parameter MessageBody." + } + + test("should delete message") { + // given + val queueUrl = testClient.createQueue("testQueue1") + testClient.sendMessage(queueUrl, "test123") + val message = testClient.receiveMessage(queueUrl).head + + // when + testClient.deleteMessage(queueUrl, message.receiptHandle) + + // then + eventually(timeout(5.seconds), interval(100.millis)) { + val attrs = testClient.getQueueAttributes(queueUrl, AllAttributeNames) + attrs(ApproximateNumberOfMessagesAttributeName.value).toInt shouldBe 0 + attrs(ApproximateNumberOfMessagesNotVisibleAttributeName.value).toInt shouldBe 0 + } + } + + test("should delete message batch") { + // given + val queueUrl = testClient.createQueue("testQueue1") + testClient.sendMessage(queueUrl, "test123") + testClient.sendMessage(queueUrl, "test234") + testClient.sendMessage(queueUrl, "test345") + val messages = testClient.receiveMessage(queueUrl, maxNumberOfMessages = Some(3)) + + // then + messages should have size 3 + + // when + val result = + testClient + .deleteMessageBatch( + queueUrl, + List( + DeleteMessageBatchEntry("A", messages.head.receiptHandle), + DeleteMessageBatchEntry("B", messages(1).receiptHandle) + ) + ) + .toOption + .get + + // then + result.successful should have size 2 + result.successful.map(_.id) shouldBe List("A", "B") + result.failed shouldBe empty + } + + test("should change message visibility") { + // given + val queueUrl = testClient.createQueue("testQueue1") + testClient.sendMessage(queueUrl, "test123") + val message = testClient.receiveMessage(queueUrl).head + + // when + testClient.changeMessageVisibility(queueUrl, message.receiptHandle, 0) + val messagesReceivedAgain = testClient.receiveMessage(queueUrl) + + // then + messagesReceivedAgain should have size 1 + } + + test("should change message visibility batch") { + // given + val queueUrl = testClient.createQueue("testQueue1") + testClient.sendMessage(queueUrl, "test123") + testClient.sendMessage(queueUrl, "test234") + testClient.sendMessage(queueUrl, "test345") + val messages = testClient.receiveMessage(queueUrl, maxNumberOfMessages = Some(3)) + + // then + messages should have size 3 + + // when + val result = + testClient + .changeMessageVisibilityBatch( + queueUrl, + List( + ChangeMessageVisibilityBatchEntry("A", messages.head.receiptHandle, 0), + ChangeMessageVisibilityBatchEntry("B", messages(2).receiptHandle, 0) + ) + ) + .toOption + .get + val messagesReceivedAgain = testClient.receiveMessage(queueUrl, maxNumberOfMessages = Some(3)) + + // then + result.successful should have size 2 + result.successful.map(_.id) shouldBe List("A", "B") + result.failed shouldBe empty + + messagesReceivedAgain should have size 2 + } + + test("should list dead letter source queues") { + // given + val dlq1Url = testClient.createQueue("testDlq1") + val redrivePolicyJson = RedrivePolicy("testDlq1", awsRegion, awsAccountId, 3).toJson.compactPrint + val queue1Url = testClient.createQueue("testQueue1", Map(RedrivePolicyAttributeName -> redrivePolicyJson)) + val queue2Url = testClient.createQueue("testQueue2", Map(RedrivePolicyAttributeName -> redrivePolicyJson)) + val queue4Url = testClient.createQueue("testQueue4", Map(RedrivePolicyAttributeName -> redrivePolicyJson)) + testClient.createQueue("testDlq2") + testClient.createQueue( + "testQueue3", + Map(RedrivePolicyAttributeName -> RedrivePolicy("testDlq2", awsRegion, awsAccountId, 3).toJson.compactPrint) + ) + testClient.createQueue("testQueue5") + + // when + val result = testClient.listDeadLetterSourceQueues(dlq1Url) + + // then + result should contain theSameElementsAs Set(queue1Url, queue2Url, queue4Url) + } + + private def doTestSendAndReceiveMessageWithAttributes( + content: String, + messageAttributes: Map[String, MessageAttribute] = Map.empty, + requestedAttributes: List[String] = List.empty, + awsTraceHeader: Option[String] = None, + requestedSystemAttributes: List[String] = List.empty + ) = { + // given + val queue = testClient.createQueue("testQueue1") + testClient.sendMessage(queue, content, messageAttributes, awsTraceHeader) + val message = receiveSingleMessageObject(queue, requestedAttributes, requestedSystemAttributes).orNull + + // then + message.body shouldBe content + checkMessageAttributesMatchRequestedAttributes(messageAttributes, requestedAttributes, message) + + message + } + + private def receiveSingleMessageObject( + queueUrl: String, + requestedAttributes: List[String], + requestedSystemAttributes: List[String] + ): Option[client.ReceivedMessage] = { + testClient + .receiveMessage(queueUrl, systemAttributes = requestedSystemAttributes, messageAttributes = requestedAttributes) + .headOption + } + + private def checkMessageAttributesMatchRequestedAttributes( + messageAttributes: Map[String, MessageAttribute], + requestedAttributes: List[String], + message: client.ReceivedMessage + ) = { + val filteredMessageAttributes = filterBasedOnRequestedAttributes(requestedAttributes, messageAttributes) + message.messageAttributes should be(filteredMessageAttributes) + } + + private def filterBasedOnRequestedAttributes[T]( + requestedAttributes: List[String], + messageAttributes: Map[String, T] + ): Map[String, T] = { + if (requestedAttributes.contains("All")) { + messageAttributes + } else { + messageAttributes.filterKeys(k => requestedAttributes.contains(k)).toMap + } + } +} + +class AmazonJavaSdkV1TestSuite extends AmazonJavaSdkNewTestSuite with SqsClientServerCommunication +class AmazonJavaSdkV2TestSuite extends AmazonJavaSdkNewTestSuite with SqsClientServerWithSdkV2Communication diff --git a/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/client/AwsSdkV1SqsClient.scala b/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/client/AwsSdkV1SqsClient.scala index c896f5f6..d4f28f26 100644 --- a/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/client/AwsSdkV1SqsClient.scala +++ b/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/client/AwsSdkV1SqsClient.scala @@ -1,8 +1,11 @@ package org.elasticmq.rest.sqs.client import com.amazonaws.services.sqs.AmazonSQS -import com.amazonaws.services.sqs.model.{CancelMessageMoveTaskRequest, CreateQueueRequest, GetQueueAttributesRequest, ListMessageMoveTasksRequest, QueueDoesNotExistException, ReceiveMessageRequest, ResourceNotFoundException, SendMessageRequest, StartMessageMoveTaskRequest, UnsupportedOperationException} +import com.amazonaws.services.sqs.model.{BatchResultErrorEntry, CancelMessageMoveTaskRequest, ChangeMessageVisibilityBatchRequest, ChangeMessageVisibilityBatchRequestEntry, CreateQueueRequest, DeleteMessageBatchRequest, DeleteMessageBatchRequestEntry, GetQueueAttributesRequest, GetQueueUrlRequest, ListDeadLetterSourceQueuesRequest, ListMessageMoveTasksRequest, MessageAttributeValue, MessageSystemAttributeValue, PurgeQueueRequest, QueueDoesNotExistException, ReceiveMessageRequest, ResourceNotFoundException, SendMessageBatchRequest, SendMessageBatchRequestEntry, SendMessageRequest, StartMessageMoveTaskRequest, UnsupportedOperationException} +import org.elasticmq._ +import java.nio.ByteBuffer +import java.util import scala.collection.JavaConverters._ class AwsSdkV1SqsClient(client: AmazonSQS) extends SqsClient { @@ -21,19 +24,247 @@ class AwsSdkV1SqsClient(client: AmazonSQS) extends SqsClient { ) .getQueueUrl + override def getQueueUrl(queueName: String): Either[SqsClientError, QueueUrl] = interceptErrors { + client + .getQueueUrl( + new GetQueueUrlRequest() + .withQueueName(queueName) + ) + .getQueueUrl + } + + override def purgeQueue( + queueUrl: QueueUrl + ): Either[SqsClientError, Unit] = interceptErrors { + client.purgeQueue(new PurgeQueueRequest().withQueueUrl(queueUrl)) + } + + override def deleteQueue( + queueUrl: QueueUrl + ): Either[SqsClientError, Unit] = interceptErrors { + client.deleteQueue(queueUrl) + } + override def sendMessage( queueUrl: QueueUrl, - messageBody: String - ): Unit = client.sendMessage( - new SendMessageRequest() - .withQueueUrl(queueUrl) - .withMessageBody(messageBody) - ) - - override def receiveMessage(queueUrl: QueueUrl): List[ReceivedMessage] = - client.receiveMessage(new ReceiveMessageRequest().withQueueUrl(queueUrl)).getMessages.asScala.toList.map { msg => - ReceivedMessage(msg.getMessageId, msg.getReceiptHandle, msg.getBody) + messageBody: String, + messageAttributes: Map[String, MessageAttribute] = Map.empty, + awsTraceHeader: Option[String] = None + ): Either[SqsClientError, Unit] = interceptErrors { + client.sendMessage( + new SendMessageRequest() + .withQueueUrl(queueUrl) + .withMessageBody(messageBody) + .withMessageSystemAttributes( + mapAwsTraceHeader(awsTraceHeader) + ) + .withMessageAttributes(mapMessageAttributes(messageAttributes)) + ) + } + + override def sendMessageBatch( + queueUrl: QueueUrl, + entries: List[SendMessageBatchEntry] + ): Either[SqsClientError, SendMessageBatchResult] = interceptErrors { + val result = client.sendMessageBatch( + new SendMessageBatchRequest() + .withQueueUrl(queueUrl) + .withEntries( + entries + .map(entry => + new SendMessageBatchRequestEntry() + .withId(entry.id) + .withMessageBody(entry.messageBody) + .withDelaySeconds(entry.delaySeconds.map(Int.box).orNull) + .withMessageDeduplicationId(entry.messageDeduplicationId.orNull) + .withMessageGroupId(entry.messageGroupId.orNull) + .withMessageSystemAttributes(mapAwsTraceHeader(entry.awsTraceHeader)) + .withMessageAttributes(mapMessageAttributes(entry.messageAttributes)) + ) + .asJava + ) + ) + SendMessageBatchResult( + result.getSuccessful.asScala.map { entry => + SendMessageBatchSuccessEntry( + entry.getId, + entry.getMessageId, + entry.getMD5OfMessageBody, + Option(entry.getMD5OfMessageAttributes), + Option(entry.getMD5OfMessageSystemAttributes), + Option(entry.getSequenceNumber) + ) + }.toList, + mapBatchResultErrorEntries(result.getFailed) + ) + } + + override def deleteMessageBatch( + queueUrl: QueueUrl, + entries: List[DeleteMessageBatchEntry] + ): Either[ + SqsClientError, + DeleteMessageBatchResult + ] = interceptErrors { + val result = client.deleteMessageBatch( + new DeleteMessageBatchRequest() + .withQueueUrl(queueUrl) + .withEntries( + entries + .map(entry => + new DeleteMessageBatchRequestEntry() + .withId(entry.id) + .withReceiptHandle(entry.receiptHandle) + ) + .asJava + ) + ) + DeleteMessageBatchResult( + result.getSuccessful.asScala.map { entry => + DeleteMessageBatchSuccessEntry(entry.getId) + }.toList, + mapBatchResultErrorEntries(result.getFailed) + ) + } + + private def mapBatchResultErrorEntries( + failed: util.List[BatchResultErrorEntry] + ) = { + failed.asScala.map { entry => + BatchOperationErrorEntry( + entry.getId, + entry.isSenderFault, + entry.getCode, + entry.getMessage + ) + }.toList + } + + override def deleteMessage( + queueUrl: QueueUrl, + receiptHandle: MessageMoveTaskStatus + ): Unit = client.deleteMessage(queueUrl, receiptHandle) + + override def changeMessageVisibility( + queueUrl: QueueUrl, + receiptHandle: MessageMoveTaskStatus, + visibilityTimeout: Int + ): Unit = client.changeMessageVisibility(queueUrl, receiptHandle, visibilityTimeout) + + override def changeMessageVisibilityBatch( + queueUrl: QueueUrl, + entries: List[ + ChangeMessageVisibilityBatchEntry + ] + ): Either[ + SqsClientError, + ChangeMessageVisibilityBatchResult + ] = interceptErrors { + val result = client.changeMessageVisibilityBatch( + new ChangeMessageVisibilityBatchRequest() + .withQueueUrl(queueUrl) + .withEntries( + entries + .map(entry => + new ChangeMessageVisibilityBatchRequestEntry() + .withId(entry.id) + .withReceiptHandle(entry.receiptHandle) + .withVisibilityTimeout(entry.visibilityTimeout) + ) + .asJava + ) + ) + ChangeMessageVisibilityBatchResult( + result.getSuccessful.asScala.map { entry => + ChangeMessageVisibilityBatchSuccessEntry(entry.getId) + }.toList, + mapBatchResultErrorEntries(result.getFailed) + ) + } + + override def listDeadLetterSourceQueues(queueUrl: QueueUrl): List[QueueUrl] = client + .listDeadLetterSourceQueues(new ListDeadLetterSourceQueuesRequest().withQueueUrl(queueUrl)) + .getQueueUrls + .asScala + .toList + + private def mapAwsTraceHeader(awsTraceHeader: Option[MessageMoveTaskStatus]) = { + awsTraceHeader + .map(header => Map("AWSTraceHeader" -> new MessageSystemAttributeValue().withStringValue(header).withDataType("String")).asJava) + .orNull + } + + private def mapMessageAttributes( + messageAttributes: Map[ + MessageMoveTaskStatus, + MessageAttribute + ] + ) = { + messageAttributes.map { + case (k, v: StringMessageAttribute) => + k -> new MessageAttributeValue().withDataType(v.getDataType()).withStringValue(v.stringValue) + case (k, v: NumberMessageAttribute) => + k -> new MessageAttributeValue().withDataType(v.getDataType()).withStringValue(v.stringValue) + case (k, v: BinaryMessageAttribute) => + k -> new MessageAttributeValue() + .withDataType(v.getDataType()) + .withBinaryValue(ByteBuffer.wrap(v.binaryValue.toArray)) + }.asJava + } + + override def receiveMessage( + queueUrl: QueueUrl, + systemAttributes: List[String] = List.empty, + messageAttributes: List[String] = List.empty, + maxNumberOfMessages: Option[Int] = None + ): List[ReceivedMessage] = { + client + .receiveMessage( + new ReceiveMessageRequest() + .withQueueUrl(queueUrl) + .withAttributeNames(systemAttributes.asJava) + .withMessageAttributeNames(messageAttributes.asJava) + .withMaxNumberOfMessages(maxNumberOfMessages.map(Int.box).orNull) + ) + .getMessages + .asScala + .toList + .map { msg => + ReceivedMessage( + msg.getMessageId, + msg.getReceiptHandle, + msg.getBody, + mapSystemAttributes(msg.getAttributes), + mapMessageAttributes(msg.getMessageAttributes) + ) + } + } + + private def mapSystemAttributes( + attributes: java.util.Map[String, String] + ): Map[MessageSystemAttributeName, String] = { + attributes.asScala.toMap.map { case (k, v) => (MessageSystemAttributeName.from(k), v) } + } + + private def mapMessageAttributes( + attributes: java.util.Map[String, MessageAttributeValue] + ): Map[String, MessageAttribute] = { + attributes.asScala.toMap.map { case (k, v) => (k, mapMessageAttribute(v)) } + } + + private def mapMessageAttribute(attr: MessageAttributeValue): MessageAttribute = { + if (attr.getDataType.equals("String") && attr.getStringValue != null) { + StringMessageAttribute(attr.getStringValue) + } else if (attr.getDataType.startsWith("String.") && attr.getStringValue != null) { + StringMessageAttribute(attr.getStringValue, Some(attr.getDataType.stripPrefix("String."))) + } else if (attr.getDataType.equals("Number") && attr.getStringValue != null) { + NumberMessageAttribute(attr.getStringValue) + } else if (attr.getDataType.startsWith("Number.") && attr.getStringValue != null) { + NumberMessageAttribute(attr.getStringValue, Some(attr.getDataType.stripPrefix("Number."))) + } else { + BinaryMessageAttribute.fromByteBuffer(attr.getBinaryValue) } + } override def getQueueAttributes( queueUrl: QueueUrl, @@ -48,6 +279,26 @@ class AwsSdkV1SqsClient(client: AmazonSQS) extends SqsClient { .asScala .toMap + override def tagQueue(queueUrl: QueueUrl, tags: Map[String, String]): Unit = { + client.tagQueue(queueUrl, tags.asJava) + } + + override def untagQueue(queueUrl: QueueUrl, tagKeys: List[String]): Unit = { + client.untagQueue(queueUrl, tagKeys.asJava) + } + + override def listQueueTags( + queueUrl: QueueUrl + ): Map[String, String] = client.listQueueTags(queueUrl).getTags.asScala.toMap + + override def listQueues( + prefix: Option[String] + ): List[QueueUrl] = client + .listQueues(prefix.orNull) + .getQueueUrls + .asScala + .toList + override def startMessageMoveTask( sourceArn: Arn, maxNumberOfMessagesPerSecond: Option[Int] @@ -98,6 +349,18 @@ class AwsSdkV1SqsClient(client: AmazonSQS) extends SqsClient { .getApproximateNumberOfMessagesMoved } + override def addPermission( + queueUrl: QueueUrl, + label: MessageMoveTaskStatus, + awsAccountIds: List[MessageMoveTaskStatus], + actions: List[MessageMoveTaskStatus] + ): Unit = client.addPermission(queueUrl, label, awsAccountIds.asJava, actions.asJava) + + override def removePermission( + queueUrl: QueueUrl, + label: MessageMoveTaskStatus + ): Unit = client.removePermission(queueUrl, label) + private def interceptErrors[T](f: => T): Either[SqsClientError, T] = { try { Right(f) diff --git a/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/client/AwsSdkV2SqsClient.scala b/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/client/AwsSdkV2SqsClient.scala index 48fd325f..fbb5aa17 100644 --- a/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/client/AwsSdkV2SqsClient.scala +++ b/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/client/AwsSdkV2SqsClient.scala @@ -1,5 +1,7 @@ package org.elasticmq.rest.sqs.client -import software.amazon.awssdk.services.sqs.model.{CancelMessageMoveTaskRequest, CreateQueueRequest, GetQueueAttributesRequest, ListMessageMoveTasksRequest, QueueDoesNotExistException, ReceiveMessageRequest, ResourceNotFoundException, SendMessageRequest, StartMessageMoveTaskRequest, UnsupportedOperationException, QueueAttributeName => AwsQueueAttributeName} +import org.elasticmq.{BinaryMessageAttribute, MessageAttribute, NumberMessageAttribute, StringMessageAttribute} +import software.amazon.awssdk.core.SdkBytes +import software.amazon.awssdk.services.sqs.model.{AddPermissionRequest, BatchResultErrorEntry, CancelMessageMoveTaskRequest, ChangeMessageVisibilityBatchRequest, ChangeMessageVisibilityBatchRequestEntry, ChangeMessageVisibilityRequest, CreateQueueRequest, DeleteMessageBatchRequest, DeleteMessageBatchRequestEntry, DeleteMessageRequest, DeleteQueueRequest, GetQueueAttributesRequest, GetQueueUrlRequest, ListDeadLetterSourceQueuesRequest, ListMessageMoveTasksRequest, ListQueueTagsRequest, ListQueuesRequest, MessageAttributeValue, MessageSystemAttributeNameForSends, MessageSystemAttributeValue, PurgeQueueRequest, QueueDoesNotExistException, ReceiveMessageRequest, RemovePermissionRequest, ResourceNotFoundException, SendMessageBatchRequest, SendMessageBatchRequestEntry, SendMessageRequest, StartMessageMoveTaskRequest, TagQueueRequest, UnsupportedOperationException, UntagQueueRequest, MessageSystemAttributeName => SdkMessageSystemAttributeName, QueueAttributeName => AwsQueueAttributeName} import scala.collection.JavaConverters._ @@ -21,22 +23,257 @@ class AwsSdkV2SqsClient(client: software.amazon.awssdk.services.sqs.SqsClient) e ) .queueUrl() + override def getQueueUrl(queueName: String): Either[SqsClientError, QueueUrl] = interceptErrors { + client + .getQueueUrl(GetQueueUrlRequest.builder().queueName(queueName).build()) + .queueUrl() + } + + override def purgeQueue(queueUrl: QueueUrl): Either[SqsClientError, Unit] = interceptErrors { + client.purgeQueue(PurgeQueueRequest.builder().queueUrl(queueUrl).build()) + } + + override def deleteQueue(queueUrl: QueueUrl): Either[SqsClientError, Unit] = interceptErrors { + client.deleteQueue(DeleteQueueRequest.builder().queueUrl(queueUrl).build()) + } + override def sendMessage( queueUrl: QueueUrl, - messageBody: String - ): Unit = client.sendMessage( - SendMessageRequest - .builder() - .queueUrl(queueUrl) - .messageBody(messageBody) - .build() - ) - - override def receiveMessage(queueUrl: QueueUrl): List[ReceivedMessage] = - client.receiveMessage(ReceiveMessageRequest.builder().queueUrl(queueUrl).build()).messages().asScala.toList.map { - msg => - ReceivedMessage(msg.messageId(), msg.receiptHandle(), msg.body()) + messageBody: String, + messageAttributes: Map[String, MessageAttribute] = Map.empty, + awsTraceHeader: Option[String] = None + ): Either[SqsClientError, Unit] = interceptErrors { + client.sendMessage( + SendMessageRequest + .builder() + .queueUrl(queueUrl) + .messageBody(messageBody) + .messageSystemAttributes(mapAwsTraceHeader(awsTraceHeader)) + .messageAttributes(mapMessageAttributes(messageAttributes)) + .build() + ) + } + + private def mapAwsTraceHeader(awsTraceHeader: Option[String]) = { + awsTraceHeader + .map(header => + Map( + MessageSystemAttributeNameForSends.AWS_TRACE_HEADER -> MessageSystemAttributeValue + .builder() + .dataType("String") + .stringValue(header) + .build() + ) + ) + .getOrElse(Map.empty) + .asJava + } + + private def mapMessageAttributes( + messageAttributes: Map[ + String, + MessageAttribute + ] + ) = { + messageAttributes.map { + case (k, v: StringMessageAttribute) => + k -> MessageAttributeValue.builder().dataType(v.getDataType()).stringValue(v.stringValue).build() + case (k, v: NumberMessageAttribute) => + k -> MessageAttributeValue.builder().dataType(v.getDataType()).stringValue(v.stringValue).build() + case (k, v: BinaryMessageAttribute) => + k -> MessageAttributeValue + .builder() + .dataType(v.getDataType()) + .binaryValue(SdkBytes.fromByteArray(v.binaryValue.toArray)) + .build() + }.asJava + } + + override def sendMessageBatch( + queueUrl: QueueUrl, + entries: List[SendMessageBatchEntry] + ): Either[SqsClientError, SendMessageBatchResult] = interceptErrors { + val result = client.sendMessageBatch( + SendMessageBatchRequest + .builder() + .queueUrl(queueUrl) + .entries( + entries + .map(entry => + SendMessageBatchRequestEntry + .builder() + .id(entry.id) + .messageBody(entry.messageBody) + .delaySeconds(entry.delaySeconds.map(Int.box).orNull) + .messageDeduplicationId(entry.messageDeduplicationId.orNull) + .messageGroupId(entry.messageGroupId.orNull) + .messageSystemAttributes(mapAwsTraceHeader(entry.awsTraceHeader)) + .messageAttributes(mapMessageAttributes(entry.messageAttributes)) + .build() + ) + .asJava + ) + .build() + ) + SendMessageBatchResult( + result.successful().asScala.toList.map { entry => + SendMessageBatchSuccessEntry( + entry.id(), + entry.messageId(), + entry.md5OfMessageBody(), + Option(entry.md5OfMessageAttributes()), + Option(entry.md5OfMessageSystemAttributes()), + Option(entry.sequenceNumber()) + ) + }, + mapBatchResultErrorEntries(result.failed()) + ) + } + + private def mapBatchResultErrorEntries(failed: java.util.List[BatchResultErrorEntry]) = { + failed.asScala.toList.map { entry => + BatchOperationErrorEntry( + entry.id(), + entry.senderFault(), + entry.code(), + entry.message() + ) } + } + + override def deleteMessageBatch( + queueUrl: QueueUrl, + entries: List[DeleteMessageBatchEntry] + ): Either[SqsClientError, DeleteMessageBatchResult] = interceptErrors { + val result = client.deleteMessageBatch( + DeleteMessageBatchRequest + .builder() + .queueUrl(queueUrl) + .entries( + entries + .map(entry => + DeleteMessageBatchRequestEntry + .builder() + .id(entry.id) + .receiptHandle(entry.receiptHandle) + .build() + ) + .asJava + ) + .build() + ) + DeleteMessageBatchResult( + result.successful().asScala.toList.map { entry => + DeleteMessageBatchSuccessEntry(entry.id()) + }, + mapBatchResultErrorEntries(result.failed()) + ) + } + + override def receiveMessage( + queueUrl: QueueUrl, + systemAttributes: List[String] = List.empty, + messageAttributes: List[String] = List.empty, + maxNumberOfMessages: Option[Int] = None + ): List[ReceivedMessage] = + client + .receiveMessage( + ReceiveMessageRequest + .builder() + .queueUrl(queueUrl) + .attributeNamesWithStrings(systemAttributes.asJava) + .messageAttributeNames(messageAttributes.asJava) + .maxNumberOfMessages(maxNumberOfMessages.map(Int.box).orNull) + .build() + ) + .messages() + .asScala + .toList + .map { msg => + ReceivedMessage( + msg.messageId(), + msg.receiptHandle(), + msg.body(), + mapSystemAttributes(msg.attributes()), + mapMessageAttributes(msg.messageAttributes()) + ) + } + + override def deleteMessage(queueUrl: QueueUrl, receiptHandle: String): Unit = + client.deleteMessage(DeleteMessageRequest.builder().queueUrl(queueUrl).receiptHandle(receiptHandle).build()) + + override def changeMessageVisibility(queueUrl: QueueUrl, receiptHandle: String, visibilityTimeout: Int): Unit = + client.changeMessageVisibility( + ChangeMessageVisibilityRequest + .builder() + .queueUrl(queueUrl) + .receiptHandle(receiptHandle) + .visibilityTimeout(visibilityTimeout) + .build() + ) + + override def changeMessageVisibilityBatch( + queueUrl: QueueUrl, + entries: List[ChangeMessageVisibilityBatchEntry] + ): Either[SqsClientError, ChangeMessageVisibilityBatchResult] = interceptErrors { + val result = client.changeMessageVisibilityBatch( + ChangeMessageVisibilityBatchRequest + .builder() + .queueUrl(queueUrl) + .entries( + entries + .map(entry => + ChangeMessageVisibilityBatchRequestEntry + .builder() + .id(entry.id) + .receiptHandle(entry.receiptHandle) + .visibilityTimeout(entry.visibilityTimeout) + .build() + ) + .asJava + ) + .build() + ) + ChangeMessageVisibilityBatchResult( + result.successful().asScala.toList.map { entry => + ChangeMessageVisibilityBatchSuccessEntry(entry.id()) + }, + mapBatchResultErrorEntries(result.failed()) + ) + } + + override def listDeadLetterSourceQueues(queueUrl: QueueUrl): List[QueueUrl] = + client + .listDeadLetterSourceQueues(ListDeadLetterSourceQueuesRequest.builder().queueUrl(queueUrl).build()) + .queueUrls() + .asScala + .toList + + private def mapSystemAttributes( + attributes: java.util.Map[SdkMessageSystemAttributeName, String] + ): Map[MessageSystemAttributeName, String] = { + attributes.asScala.toMap.map { case (k, v) => (MessageSystemAttributeName.from(k.toString), v) } + } + + private def mapMessageAttributes( + attributes: java.util.Map[String, MessageAttributeValue] + ): Map[String, MessageAttribute] = { + attributes.asScala.toMap.map { case (k, v) => (k, mapMessageAttribute(v)) } + } + + private def mapMessageAttribute(attr: MessageAttributeValue): MessageAttribute = { + if (attr.dataType().equals("String") && attr.stringValue() != null) { + StringMessageAttribute(attr.stringValue()) + } else if (attr.dataType().startsWith("String.") && attr.stringValue() != null) { + StringMessageAttribute(attr.stringValue(), Some(attr.dataType().stripPrefix("String."))) + } else if (attr.dataType().equals("Number") && attr.stringValue() != null) { + NumberMessageAttribute(attr.stringValue()) + } else if (attr.dataType().startsWith("Number.") && attr.stringValue() != null) { + NumberMessageAttribute(attr.stringValue(), Some(attr.dataType().stripPrefix("Number."))) + } else { + BinaryMessageAttribute.fromByteBuffer(attr.binaryValue().asByteBuffer()) + } + } override def getQueueAttributes( queueUrl: QueueUrl, @@ -54,6 +291,57 @@ class AwsSdkV2SqsClient(client: software.amazon.awssdk.services.sqs.SqsClient) e .map { case (k, v) => (k.toString, v) } .toMap + override def tagQueue(queueUrl: QueueUrl, tags: Map[String, String]): Unit = { + client.tagQueue( + TagQueueRequest + .builder() + .queueUrl(queueUrl) + .tags(tags.asJava) + .build() + ) + } + + override def untagQueue( + queueUrl: QueueUrl, + tagKeys: List[MessageMoveTaskStatus] + ): Unit = { + client.untagQueue( + UntagQueueRequest + .builder() + .queueUrl(queueUrl) + .tagKeys(tagKeys.asJava) + .build() + ) + } + + override def listQueueTags(queueUrl: QueueUrl): Map[String, String] = { + client + .listQueueTags( + ListQueueTagsRequest + .builder() + .queueUrl(queueUrl) + .build() + ) + .tags() + .asScala + .toMap + } + + override def listQueues( + prefix: Option[MessageMoveTaskStatus] + ): List[QueueUrl] = { + client + .listQueues( + ListQueuesRequest + .builder() + .queueNamePrefix(prefix.orNull) + .build() + ) + .queueUrls() + .asScala + .toList + } + override def startMessageMoveTask( sourceArn: Arn, maxNumberOfMessagesPerSecond: Option[Int] @@ -108,6 +396,33 @@ class AwsSdkV2SqsClient(client: software.amazon.awssdk.services.sqs.SqsClient) e .approximateNumberOfMessagesMoved() } + override def addPermission( + queueUrl: QueueUrl, + label: String, + awsAccountIds: List[String], + actions: List[String] + ): Unit = + client + .addPermission( + AddPermissionRequest + .builder() + .queueUrl(queueUrl) + .label(label) + .awsAccountIds(awsAccountIds.asJava) + .actions(actions.asJava) + .build() + ) + + override def removePermission(queueUrl: QueueUrl, label: String): Unit = + client + .removePermission( + RemovePermissionRequest + .builder() + .queueUrl(queueUrl) + .label(label) + .build() + ) + private def interceptErrors[T](f: => T): Either[SqsClientError, T] = { try { Right(f) diff --git a/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/client/SqsClient.scala b/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/client/SqsClient.scala index 7fb188b5..51c2843b 100644 --- a/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/client/SqsClient.scala +++ b/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/client/SqsClient.scala @@ -1,12 +1,63 @@ package org.elasticmq.rest.sqs.client +import org.elasticmq.MessageAttribute trait SqsClient { def createQueue(queueName: String, attributes: Map[QueueAttributeName, String] = Map.empty): QueueUrl - def sendMessage(queueUrl: QueueUrl, messageBody: String): Unit - def receiveMessage(queueUrl: QueueUrl): List[ReceivedMessage] + def getQueueUrl(queueName: String): Either[SqsClientError, QueueUrl] + def deleteQueue(queueUrl: QueueUrl): Either[SqsClientError, Unit] + def purgeQueue(queueUrl: QueueUrl): Either[SqsClientError, Unit] def getQueueAttributes(queueUrl: QueueUrl, attributeNames: QueueAttributeName*): Map[String, String] - def startMessageMoveTask(sourceArn: Arn, maxNumberOfMessagesPerSecond: Option[Int] = None): Either[SqsClientError, TaskHandle] - def listMessageMoveTasks(sourceArn: Arn, maxResults: Option[Int] = None): Either[SqsClientError, List[MessageMoveTask]] + def tagQueue(queueUrl: QueueUrl, tags: Map[String, String]): Unit + def untagQueue(queueUrl: QueueUrl, tagKeys: List[String]): Unit + def listQueueTags(queueUrl: QueueUrl): Map[String, String] + + def listQueues(prefix: Option[String] = None): List[QueueUrl] + + def sendMessage( + queueUrl: QueueUrl, + messageBody: String, + messageAttributes: Map[String, MessageAttribute] = Map.empty, + awsTraceHeader: Option[String] = None + ): Either[SqsClientError, Unit] + + def receiveMessage( + queueUrl: QueueUrl, + systemAttributes: List[String] = List.empty, + messageAttributes: List[String] = List.empty, + maxNumberOfMessages: Option[Int] = None + ): List[ReceivedMessage] + + def deleteMessage(queueUrl: QueueUrl, receiptHandle: String): Unit + def changeMessageVisibility(queueUrl: QueueUrl, receiptHandle: String, visibilityTimeout: Int): Unit + + def sendMessageBatch( + queueUrl: QueueUrl, + entries: List[SendMessageBatchEntry] + ): Either[SqsClientError, SendMessageBatchResult] + + def deleteMessageBatch( + queueUrl: QueueUrl, + entries: List[DeleteMessageBatchEntry] + ): Either[SqsClientError, DeleteMessageBatchResult] + + def changeMessageVisibilityBatch( + queueUrl: QueueUrl, + entries: List[ChangeMessageVisibilityBatchEntry] + ): Either[SqsClientError, ChangeMessageVisibilityBatchResult] + + def startMessageMoveTask( + sourceArn: Arn, + maxNumberOfMessagesPerSecond: Option[Int] = None + ): Either[SqsClientError, TaskHandle] + def listMessageMoveTasks( + sourceArn: Arn, + maxResults: Option[Int] = None + ): Either[SqsClientError, List[MessageMoveTask]] def cancelMessageMoveTask(taskHandle: TaskHandle): Either[SqsClientError, ApproximateNumberOfMessagesMoved] + + def addPermission(queueUrl: QueueUrl, label: String, awsAccountIds: List[String], actions: List[String]): Unit + def removePermission(queueUrl: QueueUrl, label: String): Unit + + def listDeadLetterSourceQueues(queueUrl: QueueUrl): List[QueueUrl] } diff --git a/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/client/package.scala b/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/client/package.scala index 6d67fc51..8846ad3d 100644 --- a/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/client/package.scala +++ b/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/client/package.scala @@ -9,6 +9,7 @@ package object client { } package client { + import org.elasticmq.MessageAttribute sealed abstract class QueueAttributeName(val value: String) case object AllAttributeNames extends QueueAttributeName("All") case object PolicyAttributeName extends QueueAttributeName("Policy") @@ -35,7 +36,94 @@ package client { case object RedriveAllowPolicyAttributeName extends QueueAttributeName("RedriveAllowPolicy") case object SqsManagedSseEnabledAttributeName extends QueueAttributeName("SqsManagedSseEnabled") - case class ReceivedMessage(messageId: String, receiptHandle: String, body: String) + sealed abstract class MessageSystemAttributeName(val value: String) + case object SenderId extends MessageSystemAttributeName("SenderId") + case object SentTimestamp extends MessageSystemAttributeName("SentTimestamp") + case object ApproximateReceiveCount extends MessageSystemAttributeName("ApproximateReceiveCount") + case object ApproximateFirstReceiveTimestamp extends MessageSystemAttributeName("ApproximateFirstReceiveTimestamp") + case object SequenceNumber extends MessageSystemAttributeName("SequenceNumber") + case object MessageDeduplicationId extends MessageSystemAttributeName("MessageDeduplicationId") + case object MessageGroupId extends MessageSystemAttributeName("MessageGroupId") + case object AWSTraceHeader extends MessageSystemAttributeName("AWSTraceHeader") + case object DeadLetterQueueSourceArn extends MessageSystemAttributeName("DeadLetterQueueSourceArn") + + object MessageSystemAttributeName { + def from(value: String): MessageSystemAttributeName = { + value match { + case SenderId.value => SenderId + case SentTimestamp.value => SentTimestamp + case ApproximateReceiveCount.value => ApproximateReceiveCount + case ApproximateFirstReceiveTimestamp.value => ApproximateFirstReceiveTimestamp + case SequenceNumber.value => SequenceNumber + case MessageDeduplicationId.value => MessageDeduplicationId + case MessageGroupId.value => MessageGroupId + case AWSTraceHeader.value => AWSTraceHeader + case DeadLetterQueueSourceArn.value => DeadLetterQueueSourceArn + case _ => throw new IllegalArgumentException(s"Unknown message system attribute: $value") + } + } + } + + case class SendMessageBatchEntry( + id: String, + messageBody: String, + delaySeconds: Option[Int] = None, + messageDeduplicationId: Option[String] = None, + messageGroupId: Option[String] = None, + awsTraceHeader: Option[String] = None, + messageAttributes: Map[String, MessageAttribute] = Map.empty + ) + + case class SendMessageBatchSuccessEntry( + id: String, + messageId: String, + md5OfMessageBody: String, + md5OfMessageAttributes: Option[String], + md5OfMessageSystemAttributes: Option[String], + sequenceNumber: Option[String] + ) + + case class BatchOperationErrorEntry( + id: String, + senderFault: Boolean, + code: String, + message: String + ) + + case class SendMessageBatchResult( + successful: List[SendMessageBatchSuccessEntry], + failed: List[BatchOperationErrorEntry] + ) + + case class DeleteMessageBatchEntry(id: String, receiptHandle: String) + + case class DeleteMessageBatchSuccessEntry(id: String) + + case class DeleteMessageBatchResult( + successful: List[DeleteMessageBatchSuccessEntry], + failed: List[BatchOperationErrorEntry] + ) + + case class ChangeMessageVisibilityBatchEntry( + id: String, + receiptHandle: String, + visibilityTimeout: Int + ) + + case class ChangeMessageVisibilityBatchSuccessEntry(id: String) + + case class ChangeMessageVisibilityBatchResult( + successful: List[ChangeMessageVisibilityBatchSuccessEntry], + failed: List[BatchOperationErrorEntry] + ) + + case class ReceivedMessage( + messageId: String, + receiptHandle: String, + body: String, + attributes: Map[MessageSystemAttributeName, String], + messageAttributes: Map[String, MessageAttribute] + ) case class MessageMoveTask( taskHandle: TaskHandle, diff --git a/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/QueueAttributesOps.scala b/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/QueueAttributesOps.scala index 5ba196df..e8d8c7da 100644 --- a/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/QueueAttributesOps.scala +++ b/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/QueueAttributesOps.scala @@ -98,7 +98,12 @@ trait QueueAttributesOps extends AttributesModule with AwsConfiguration { } val rules = alwaysAvailableParameterRules ++ optionalRules.flatten ++ fifoRules - attributeValuesCalculator.calculate(attributeNames, rules: _*) + val attributeNamesToReturn = if (attributeNames.contains("All")) { + QueueReadableAttributeNames.AllAttributeNames + } else { + attributeNames + } + attributeValuesCalculator.calculate(attributeNamesToReturn, rules: _*) } Future.sequence(calculateAttributeValues(attributeNames).map(p => p._2.map((p._1, _)))) diff --git a/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/SQSRestServerBuilder.scala b/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/SQSRestServerBuilder.scala index 445e76e4..043dd775 100644 --- a/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/SQSRestServerBuilder.scala +++ b/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/SQSRestServerBuilder.scala @@ -11,7 +11,12 @@ import org.elasticmq.actor.QueueManagerActor import org.elasticmq.metrics.QueuesMetrics import org.elasticmq.rest.sqs.Constants._ import org.elasticmq.rest.sqs.XmlNsVersion.extractXmlNs -import org.elasticmq.rest.sqs.directives.{ AnyParamDirectives,AWSProtocolDirectives, ElasticMQDirectives, UnmatchedActionRoutes} +import org.elasticmq.rest.sqs.directives.{ + AnyParamDirectives, + AWSProtocolDirectives, + ElasticMQDirectives, + UnmatchedActionRoutes +} import org.elasticmq.rest.sqs.model.RequestPayload import org.elasticmq.util.{Logging, NowProvider} @@ -406,7 +411,7 @@ object MD5Util { addEncodedString(byteStream, n.stringValue.toString) case b: BinaryMessageAttribute => byteStream.write(2) - addEncodedByteArray(byteStream, b.binaryValue) + addEncodedByteArray(byteStream, b.binaryValue.toArray) } }