diff --git a/core/src/main/scala/org/elasticmq/QueueData.scala b/core/src/main/scala/org/elasticmq/QueueData.scala index b3057e183..e91f52789 100644 --- a/core/src/main/scala/org/elasticmq/QueueData.scala +++ b/core/src/main/scala/org/elasticmq/QueueData.scala @@ -3,15 +3,14 @@ package org.elasticmq import org.joda.time.{Duration, DateTime} case class QueueData(name: String, - defaultVisibilityTimeout: MillisVisibilityTimeout, - delay: Duration, - receiveMessageWait: Duration, - created: DateTime, - lastModified: DateTime, - deadLettersQueue: Option[DeadLettersQueueData] = None, - maxReceiveCount: Option[Int] = None, - isFifo: Boolean = false, - hasContentBasedDeduplication: Boolean = false -) + defaultVisibilityTimeout: MillisVisibilityTimeout, + delay: Duration, + receiveMessageWait: Duration, + created: DateTime, + lastModified: DateTime, + deadLettersQueue: Option[DeadLettersQueueData] = None, + maxReceiveCount: Option[Int] = None, + isFifo: Boolean = false, + hasContentBasedDeduplication: Boolean = false) case class DeadLettersQueueData(name: String, maxReceiveCount: Int) diff --git a/core/src/main/scala/org/elasticmq/actor/queue/InternalMessage.scala b/core/src/main/scala/org/elasticmq/actor/queue/InternalMessage.scala index 519c19259..6bec5d7e1 100644 --- a/core/src/main/scala/org/elasticmq/actor/queue/InternalMessage.scala +++ b/core/src/main/scala/org/elasticmq/actor/queue/InternalMessage.scala @@ -5,20 +5,32 @@ import java.util.UUID import scala.collection.mutable import org.elasticmq.util.NowProvider -import org.elasticmq.{DeliveryReceipt, MessageAttribute, MessageData, MessageId, MessageStatistics, MillisNextDelivery, NeverReceived, NewMessageData, OnDateTimeReceived, QueueData, Received} +import org.elasticmq.{ + DeliveryReceipt, + MessageAttribute, + MessageData, + MessageId, + MessageStatistics, + MillisNextDelivery, + NeverReceived, + NewMessageData, + OnDateTimeReceived, + QueueData, + Received +} import org.joda.time.DateTime case class InternalMessage(id: String, - deliveryReceipts: mutable.Buffer[String], - var nextDelivery: Long, - content: String, - messageAttributes: Map[String, MessageAttribute], - created: DateTime, - var firstReceive: Received, - var receiveCount: Int, - isFifo: Boolean, - messageGroupId: Option[String], - messageDeduplicationId: Option[String]) + deliveryReceipts: mutable.Buffer[String], + var nextDelivery: Long, + content: String, + messageAttributes: Map[String, MessageAttribute], + created: DateTime, + var firstReceive: Received, + var receiveCount: Int, + isFifo: Boolean, + messageGroupId: Option[String], + messageDeduplicationId: Option[String]) extends Comparable[InternalMessage] { // Priority queues have biggest elements first @@ -34,10 +46,10 @@ case class InternalMessage(id: String, } /** - * Keep track of delivering this message to a client - * - * @param nextDeliveryMillis When this message should become available for its next delivery - */ + * Keep track of delivering this message to a client + * + * @param nextDeliveryMillis When this message should become available for its next delivery + */ def trackDelivery(nextDeliveryMillis: MillisNextDelivery)(implicit nowProvider: NowProvider): Unit = { deliveryReceipts += DeliveryReceipt.generate(MessageId(id)).receipt nextDelivery = nextDeliveryMillis.millis @@ -48,24 +60,26 @@ case class InternalMessage(id: String, } } - def toMessageData = MessageData( - MessageId(id), - deliveryReceipts.lastOption.map(DeliveryReceipt(_)), - content, - messageAttributes, - MillisNextDelivery(nextDelivery), - created, - MessageStatistics(firstReceive, receiveCount), - messageGroupId, - messageDeduplicationId) + def toMessageData = + MessageData( + MessageId(id), + deliveryReceipts.lastOption.map(DeliveryReceipt(_)), + content, + messageAttributes, + MillisNextDelivery(nextDelivery), + created, + MessageStatistics(firstReceive, receiveCount), + messageGroupId, + messageDeduplicationId + ) - def toNewMessageData = NewMessageData( - Some(MessageId(id)), - content, - messageAttributes, - MillisNextDelivery(nextDelivery), - messageGroupId, - messageDeduplicationId) + def toNewMessageData = + NewMessageData(Some(MessageId(id)), + content, + messageAttributes, + MillisNextDelivery(nextDelivery), + messageGroupId, + messageDeduplicationId) def deliverable(deliveryTime: Long): Boolean = nextDelivery <= deliveryTime } @@ -85,9 +99,9 @@ object InternalMessage { 0, queueData.isFifo, newMessageData.messageGroupId, - newMessageData.messageDeduplicationId) + newMessageData.messageDeduplicationId + ) } private def generateId() = MessageId(UUID.randomUUID().toString) } - diff --git a/core/src/main/scala/org/elasticmq/actor/queue/MessageQueue.scala b/core/src/main/scala/org/elasticmq/actor/queue/MessageQueue.scala index a9ce3c171..714cf7e20 100644 --- a/core/src/main/scala/org/elasticmq/actor/queue/MessageQueue.scala +++ b/core/src/main/scala/org/elasticmq/actor/queue/MessageQueue.scala @@ -6,66 +6,68 @@ import scala.collection.mutable sealed trait MessageQueue { /** - * Add a message onto the queue. Note that this doesn't do any deduplication, that should've happened in an earlier - * step. - * - * @param message The message to add onto the queue - */ + * Add a message onto the queue. Note that this doesn't do any deduplication, that should've happened in an earlier + * step. + * + * @param message The message to add onto the queue + */ def +=(message: InternalMessage): Unit /** - * Get the messages indexed by their unique id - * - * @return The messages indexed by their id - */ + * Get the messages indexed by their unique id + * + * @return The messages indexed by their id + */ def byId: Map[String, InternalMessage] /** - * Drop all messages on the queue - */ + * Drop all messages on the queue + */ def clear(): Unit /** - * Remove the message with the given id - * - * @param messageId The id of the message to remove - */ + * Remove the message with the given id + * + * @param messageId The id of the message to remove + */ def remove(messageId: String): Unit /** - * Return a message queue where all the messages on the queue do not match the given predicate function - * - * @param p The predicate function to filter the message by. Any message that does not match the predicate will be - * retained on the new queue - * @return The new message queue - */ + * Return a message queue where all the messages on the queue do not match the given predicate function + * + * @param p The predicate function to filter the message by. Any message that does not match the predicate will be + * retained on the new queue + * @return The new message queue + */ def filterNot(p: InternalMessage => Boolean): MessageQueue /** - * Dequeues `count` messages from the queue - * - * @param count The number of messages to dequeue from the queue - * @param deliveryTime The timestamp from which messages should be available (usually, this is the current millis - * since epoch. It is useful to pass in a special value during the tests however.) - * @return The dequeued messages, if any - */ + * Dequeues `count` messages from the queue + * + * @param count The number of messages to dequeue from the queue + * @param deliveryTime The timestamp from which messages should be available (usually, this is the current millis + * since epoch. It is useful to pass in a special value during the tests however.) + * @return The dequeued messages, if any + */ def dequeue(count: Int, deliveryTime: Long): List[InternalMessage] /** - * Get the next available message on the given queue - * - * @param priorityQueue The queue for which to get the next available message. It's assumed the messages on this - * queue all belong to the same message group. - * @param deliveryTime The timestamp from which messages should be available - * @param accBatch An accumulator holding the messages that have already been retrieved. - * @param accMessage An accumulator holding the messages that have been dequeued from the priority queue and - * cannot be delivered. These messages should be put back on the queue before returning - * to the caller - * @return - */ + * Get the next available message on the given queue + * + * @param priorityQueue The queue for which to get the next available message. It's assumed the messages on this + * queue all belong to the same message group. + * @param deliveryTime The timestamp from which messages should be available + * @param accBatch An accumulator holding the messages that have already been retrieved. + * @param accMessage An accumulator holding the messages that have been dequeued from the priority queue and + * cannot be delivered. These messages should be put back on the queue before returning + * to the caller + * @return + */ @tailrec - protected final def nextVisibleMessage(priorityQueue: mutable.PriorityQueue[InternalMessage], deliveryTime: Long, - accBatch: List[InternalMessage], accMessage: Seq[InternalMessage] = Seq.empty): Option[InternalMessage] = { + protected final def nextVisibleMessage(priorityQueue: mutable.PriorityQueue[InternalMessage], + deliveryTime: Long, + accBatch: List[InternalMessage], + accMessage: Seq[InternalMessage] = Seq.empty): Option[InternalMessage] = { if (priorityQueue.nonEmpty) { val msg = priorityQueue.dequeue() @@ -100,15 +102,16 @@ sealed trait MessageQueue { object MessageQueue { - def apply(isFifo: Boolean): MessageQueue = if (isFifo) { - new FifoMessageQueue - } else { - new SimpleMessageQueue - } + def apply(isFifo: Boolean): MessageQueue = + if (isFifo) { + new FifoMessageQueue + } else { + new SimpleMessageQueue + } /** - * A "simple" straightforward message queue. The queue represents the common SQS behaviour - */ + * A "simple" straightforward message queue. The queue represents the common SQS behaviour + */ class SimpleMessageQueue extends MessageQueue { protected val messagesById: mutable.HashMap[String, InternalMessage] = mutable.HashMap.empty protected val messageQueue: mutable.PriorityQueue[InternalMessage] = mutable.PriorityQueue.empty @@ -146,15 +149,15 @@ object MessageQueue { } else { nextVisibleMessage(messageQueue, deliveryTime, acc) match { case Some(msg) => dequeue0(count - 1, deliveryTime, acc :+ msg) - case None => acc + case None => acc } } } } /** - * A FIFO queue that mimics SQS' FIFO queue implementation - */ + * A FIFO queue that mimics SQS' FIFO queue implementation + */ class FifoMessageQueue extends SimpleMessageQueue { private val messagesbyMessageGroupId = mutable.HashMap.empty[String, mutable.PriorityQueue[InternalMessage]] @@ -199,18 +202,19 @@ object MessageQueue { acc } else { dequeueFromFifo(acc, deliveryTime) match { - case Some(msg) => dequeue0(count -1, deliveryTime, acc :+ msg) - case None => acc + case Some(msg) => dequeue0(count - 1, deliveryTime, acc :+ msg) + case None => acc } } } /** - * Dequeue a message from the fifo queue. Try to dequeue a message from the same message group as the previous - * message before trying other message groups. - */ - private def dequeueFromFifo(accBatch: List[InternalMessage], deliveryTime: Long, - triedMessageGroups: Set[String] = Set.empty): Option[InternalMessage] = { + * Dequeue a message from the fifo queue. Try to dequeue a message from the same message group as the previous + * message before trying other message groups. + */ + private def dequeueFromFifo(accBatch: List[InternalMessage], + deliveryTime: Long, + triedMessageGroups: Set[String] = Set.empty): Option[InternalMessage] = { val messageGroupIdHint = accBatch.lastOption.map(getMessageGroupIdUnsafe).filterNot(triedMessageGroups.contains) messageGroupIdHint.orElse(randomMessageGroup(triedMessageGroups)).flatMap { messageGroupId => dequeueFromMessageGroup(messageGroupId, deliveryTime, accBatch) @@ -219,10 +223,11 @@ object MessageQueue { } /** - * Try to dequeue a message from the given message group - */ - private def dequeueFromMessageGroup(messageGroupId: String, deliveryTime: Long, - accBatch: List[InternalMessage]): Option[InternalMessage] = { + * Try to dequeue a message from the given message group + */ + private def dequeueFromMessageGroup(messageGroupId: String, + deliveryTime: Long, + accBatch: List[InternalMessage]): Option[InternalMessage] = { messagesbyMessageGroupId.get(messageGroupId) match { case Some(priorityQueue) if priorityQueue.nonEmpty => val msg = nextVisibleMessage(priorityQueue, deliveryTime, accBatch) @@ -237,38 +242,38 @@ object MessageQueue { } /** - * Return a message group id that has at least 1 message active on the queue and that is not part of the given set - * of `triedMessageGroupIds` - * - * @param triedMessageGroupIds The ids of message groups to ignore - * @return The id of a random message group that is not part of `triedMessageGroupIds` - */ + * Return a message group id that has at least 1 message active on the queue and that is not part of the given set + * of `triedMessageGroupIds` + * + * @param triedMessageGroupIds The ids of message groups to ignore + * @return The id of a random message group that is not part of `triedMessageGroupIds` + */ private def randomMessageGroup(triedMessageGroupIds: Set[String]): Option[String] = { val remainingMessageGroupIds = messagesbyMessageGroupId.keySet -- triedMessageGroupIds remainingMessageGroupIds.headOption } /** - * Get the message group id from a given message. If the message has no message group id, an - * [[IllegalStateException]] will be thrown. - * - * @param msg The message to get the message group id for - * @return The message group id - * @throws IllegalStateException if the message has no message group id - */ + * Get the message group id from a given message. If the message has no message group id, an + * [[IllegalStateException]] will be thrown. + * + * @param msg The message to get the message group id for + * @return The message group id + * @throws IllegalStateException if the message has no message group id + */ private def getMessageGroupIdUnsafe(msg: InternalMessage): String = getMessageGroupIdUnsafe(msg.messageGroupId) /** - * Get the message group id from an optional string. If the given optional string is empty, an - * [[IllegalStateException]] will be thrown - * - * @param messageGroupId The optional string - * @return The message group id - * @throws IllegalStateException if the optional string holds no message group id - */ + * Get the message group id from an optional string. If the given optional string is empty, an + * [[IllegalStateException]] will be thrown + * + * @param messageGroupId The optional string + * @return The message group id + * @throws IllegalStateException if the optional string holds no message group id + */ private def getMessageGroupIdUnsafe(messageGroupId: Option[String]) = - messageGroupId.getOrElse(throw new IllegalStateException( - "Messages on a FIFO queue are required to have a message group id")) + messageGroupId.getOrElse( + throw new IllegalStateException("Messages on a FIFO queue are required to have a message group id")) } } diff --git a/core/src/main/scala/org/elasticmq/actor/queue/QueueActorMessageOps.scala b/core/src/main/scala/org/elasticmq/actor/queue/QueueActorMessageOps.scala index 404c178d3..c85da9ecc 100644 --- a/core/src/main/scala/org/elasticmq/actor/queue/QueueActorMessageOps.scala +++ b/core/src/main/scala/org/elasticmq/actor/queue/QueueActorMessageOps.scala @@ -13,12 +13,12 @@ trait QueueActorMessageOps extends Logging { def nowProvider: NowProvider def receiveAndReplyMessageMsg[T](msg: QueueMessageMsg[T]): ReplyAction[T] = msg match { - case SendMessage(message) => sendMessage(message) + case SendMessage(message) => sendMessage(message) case UpdateVisibilityTimeout(messageId, visibilityTimeout) => updateVisibilityTimeout(messageId, visibilityTimeout) case ReceiveMessages(visibilityTimeout, count, _, receiveRequestAttemptId) => receiveMessages(visibilityTimeout, count, receiveRequestAttemptId) case DeleteMessage(deliveryReceipt) => deleteMessage(deliveryReceipt) - case LookupMessage(messageId) => messageQueue.byId.get(messageId.id).map(_.toMessageData) + case LookupMessage(messageId) => messageQueue.byId.get(messageId.id).map(_.toMessageData) } private def sendMessage(message: NewMessageData): MessageData = { @@ -30,7 +30,7 @@ trait QueueActorMessageOps extends Logging { // deleted message (that was sent less than 5 minutes ago, the new message should not be added). messageQueue.byId.values.find(isDuplicate(message, _)) match { case Some(messageOnQueue) => messageOnQueue.toMessageData - case None => addMessage(message) + case None => addMessage(message) } } else { addMessage(message) @@ -38,12 +38,12 @@ trait QueueActorMessageOps extends Logging { } /** - * Check whether a new message is a duplicate of the message that's on the queue. - * - * @param newMessage The message that needs to be added to the queue - * @param queueMessage The message that's already on the queue - * @return Whether the new message counts as a duplicate - */ + * Check whether a new message is a duplicate of the message that's on the queue. + * + * @param newMessage The message that needs to be added to the queue + * @param queueMessage The message that's already on the queue + * @return Whether the new message counts as a duplicate + */ private def isDuplicate(newMessage: NewMessageData, queueMessage: InternalMessage): Boolean = { lazy val isWithinDeduplicationWindow = queueMessage.created.plusMinutes(5).isAfter(nowProvider.now) newMessage.messageDeduplicationId == queueMessage.messageDeduplicationId && isWithinDeduplicationWindow @@ -87,21 +87,22 @@ trait QueueActorMessageOps extends Logging { } } - protected def receiveMessages(visibilityTimeout: VisibilityTimeout, count: Int, - receiveRequestAttemptId: Option[String]): List[MessageData] = { + protected def receiveMessages(visibilityTimeout: VisibilityTimeout, + count: Int, + receiveRequestAttemptId: Option[String]): List[MessageData] = { implicit val np = nowProvider val messages = receiveRequestAttemptId - .flatMap(getMessagesFromRequestAttemptCache) - .getOrElse(getMessagesFromQueue(visibilityTimeout, count)) - .map { internalMessage => - // Putting the msg again into the queue, with a new next delivery - val newNextDelivery = computeNextDelivery(visibilityTimeout) - internalMessage.trackDelivery(newNextDelivery) - messageQueue += internalMessage - - logger.debug(s"${queueData.name}: Receiving message ${internalMessage.id}") - internalMessage - } + .flatMap(getMessagesFromRequestAttemptCache) + .getOrElse(getMessagesFromQueue(visibilityTimeout, count)) + .map { internalMessage => + // Putting the msg again into the queue, with a new next delivery + val newNextDelivery = computeNextDelivery(visibilityTimeout) + internalMessage.trackDelivery(newNextDelivery) + messageQueue += internalMessage + + logger.debug(s"${queueData.name}: Receiving message ${internalMessage.id}") + internalMessage + } receiveRequestAttemptId.foreach { attemptId => receiveRequestAttemptCache.add(attemptId, messages) @@ -111,11 +112,11 @@ trait QueueActorMessageOps extends Logging { } private def getMessagesFromRequestAttemptCache(receiveRequestAttemptId: String)( - implicit np: NowProvider): Option[List[InternalMessage]] = { + implicit np: NowProvider): Option[List[InternalMessage]] = { receiveRequestAttemptCache.get(receiveRequestAttemptId, messageQueue) match { - case Left(Expired) => throw new RuntimeException("Attempt expired") - case Left(Invalid) => throw new RuntimeException("Invalid") - case Right(None) => None + case Left(Expired) => throw new RuntimeException("Attempt expired") + case Left(Invalid) => throw new RuntimeException("Invalid") + case Right(None) => None case Right(Some(messages)) => Some(messages) } } @@ -140,7 +141,7 @@ trait QueueActorMessageOps extends Logging { } private def getVisibilityTimeoutMillis(visibilityTimeout: VisibilityTimeout) = visibilityTimeout match { - case DefaultVisibilityTimeout => queueData.defaultVisibilityTimeout.millis + case DefaultVisibilityTimeout => queueData.defaultVisibilityTimeout.millis case MillisVisibilityTimeout(millis) => millis } @@ -154,4 +155,4 @@ trait QueueActorMessageOps extends Logging { } } } -} \ No newline at end of file +} diff --git a/core/src/main/scala/org/elasticmq/actor/queue/QueueActorQueueOps.scala b/core/src/main/scala/org/elasticmq/actor/queue/QueueActorQueueOps.scala index 5bb5981b6..1d8fa12c9 100644 --- a/core/src/main/scala/org/elasticmq/actor/queue/QueueActorQueueOps.scala +++ b/core/src/main/scala/org/elasticmq/actor/queue/QueueActorQueueOps.scala @@ -33,7 +33,7 @@ trait QueueActorQueueOps extends Logging { if (internalMessage.nextDelivery < deliveryTime) { visible += 1 } else if (internalMessage.deliveryReceipts.nonEmpty) { - invisible +=1 + invisible += 1 } else { delayed += 1 } diff --git a/core/src/main/scala/org/elasticmq/actor/queue/QueueActorWaitForMessagesOps.scala b/core/src/main/scala/org/elasticmq/actor/queue/QueueActorWaitForMessagesOps.scala index 6a019e1fb..6a2d26531 100644 --- a/core/src/main/scala/org/elasticmq/actor/queue/QueueActorWaitForMessagesOps.scala +++ b/core/src/main/scala/org/elasticmq/actor/queue/QueueActorWaitForMessagesOps.scala @@ -39,7 +39,7 @@ trait QueueActorWaitForMessagesOps extends ReplyingActor with QueueActorMessageO scheduleTryReplyWhenAvailable() result - case rm@ReceiveMessages(_, _, waitForMessagesOpt, _) => + case rm @ ReceiveMessages(_, _, waitForMessagesOpt, _) => val result = super.receiveAndReplyMessageMsg(msg) val waitForMessages = waitForMessagesOpt.getOrElse(queueData.receiveMessageWait) @@ -63,7 +63,9 @@ trait QueueActorWaitForMessagesOps extends ReplyingActor with QueueActorMessageO @tailrec private def tryReply() { awaitingReply.headOption match { - case Some((seq, AwaitingData(originalSender, ReceiveMessages(visibilityTimeout, count, _, receiveRequestAttemptId), _))) => + case Some( + (seq, + AwaitingData(originalSender, ReceiveMessages(visibilityTimeout, count, _, receiveRequestAttemptId), _))) => val received = super.receiveMessages(visibilityTimeout, count, receiveRequestAttemptId) if (received != Nil) { @@ -100,7 +102,7 @@ trait QueueActorWaitForMessagesOps extends ReplyingActor with QueueActorMessageO messageQueue.byId.values.toList.sortBy(_.nextDelivery).headOption match { case Some(msg) => scheduledTryReply = Some(schedule(msg.nextDelivery - deliveryTime + 1, TryReply)) - case None => + case None => } } } diff --git a/core/src/main/scala/org/elasticmq/actor/queue/ReceiveRequestAttemptCache.scala b/core/src/main/scala/org/elasticmq/actor/queue/ReceiveRequestAttemptCache.scala index 16c971f83..7bc6ebd81 100644 --- a/core/src/main/scala/org/elasticmq/actor/queue/ReceiveRequestAttemptCache.scala +++ b/core/src/main/scala/org/elasticmq/actor/queue/ReceiveRequestAttemptCache.scala @@ -8,57 +8,58 @@ import org.elasticmq.util.NowProvider class ReceiveRequestAttemptCache { - private val cache: mutable.Map[String, (Long, List[InternalMessage])] = mutable.Map.empty + private val cache: mutable.Map[String, (Long, List[InternalMessage])] = mutable.Map.empty private val FIVE_MINUTES = 5 * 60 * 1000 private val FIFTEEN_MINUTES = 15 * 60 * 1000 - def add(attemptId: String, messages: List[InternalMessage])(implicit nowProvider: NowProvider): Unit = { + def add(attemptId: String, messages: List[InternalMessage])(implicit nowProvider: NowProvider): Unit = { // Cache the given attempt cache.put(attemptId, (nowProvider.nowMillis, messages)) // Remove attempts that are older than fifteen minutes // TODO: Rather than do this on every add, maybe this could be done periodically? clean() - } + } - def get(attemptId: String, messageQueue: MessageQueue)(implicit nowProvider: NowProvider): Either[ReceiveFailure, Option[List[InternalMessage]]] = { - cache.get(attemptId) match { - case Some((cacheTime, _)) if cacheTime + FIVE_MINUTES < nowProvider.nowMillis => - cache.remove(attemptId) - Left(Expired) - case Some((_, messages)) if !allValid(messages, messageQueue) => - Left(Invalid) - case Some((_, messages)) => - Right(Some(messages)) - case None => - Right(None) - } - } + def get(attemptId: String, messageQueue: MessageQueue)( + implicit nowProvider: NowProvider): Either[ReceiveFailure, Option[List[InternalMessage]]] = { + cache.get(attemptId) match { + case Some((cacheTime, _)) if cacheTime + FIVE_MINUTES < nowProvider.nowMillis => + cache.remove(attemptId) + Left(Expired) + case Some((_, messages)) if !allValid(messages, messageQueue) => + Left(Invalid) + case Some((_, messages)) => + Right(Some(messages)) + case None => + Right(None) + } + } /** - * Ensure that all the given messages are still valid on the queue - * - * @param messages The messages to check - * @param messageQueue The queue that should contain the messages - * @return `true` if all the given messages are still valid and present on the queue - */ - private def allValid(messages: List[InternalMessage], messageQueue: MessageQueue): Boolean = { - messages.forall { lastAttemptMessage => - messageQueue.byId.get(lastAttemptMessage.id) match { - case None => - // If the message has been deleted, than this message can no longer be returned for the same request attempt. - false - case Some(messageOnQueue) => - // Verify the message hasn't been updated since the last time we've received it - messageOnQueue.nextDelivery == lastAttemptMessage.nextDelivery - } - } - } + * Ensure that all the given messages are still valid on the queue + * + * @param messages The messages to check + * @param messageQueue The queue that should contain the messages + * @return `true` if all the given messages are still valid and present on the queue + */ + private def allValid(messages: List[InternalMessage], messageQueue: MessageQueue): Boolean = { + messages.forall { lastAttemptMessage => + messageQueue.byId.get(lastAttemptMessage.id) match { + case None => + // If the message has been deleted, than this message can no longer be returned for the same request attempt. + false + case Some(messageOnQueue) => + // Verify the message hasn't been updated since the last time we've received it + messageOnQueue.nextDelivery == lastAttemptMessage.nextDelivery + } + } + } /** - * Remove any keys that are older than fifteen minutes - */ + * Remove any keys that are older than fifteen minutes + */ private def clean()(implicit nowProvider: NowProvider): Unit = { val expiredKeys = cache.collect { case (oldAttemptId, (time, _)) if time + FIFTEEN_MINUTES < nowProvider.nowMillis => oldAttemptId @@ -69,17 +70,17 @@ class ReceiveRequestAttemptCache { object ReceiveRequestAttemptCache { - sealed trait ReceiveFailure - object ReceiveFailure { + sealed trait ReceiveFailure + object ReceiveFailure { - /** + /** * Indicates the attempt id has expired */ - case object Expired extends ReceiveFailure + case object Expired extends ReceiveFailure /** - * Indicates the attempt cannot be retried as a message has been deleted from the queue - */ + * Indicates the attempt cannot be retried as a message has been deleted from the queue + */ case object Invalid extends ReceiveFailure - } + } } diff --git a/core/src/main/scala/org/elasticmq/msg/QueueMsg.scala b/core/src/main/scala/org/elasticmq/msg/QueueMsg.scala index afe7bd9de..6863271f6 100644 --- a/core/src/main/scala/org/elasticmq/msg/QueueMsg.scala +++ b/core/src/main/scala/org/elasticmq/msg/QueueMsg.scala @@ -20,7 +20,10 @@ case class ClearQueue() extends QueueQueueMsg[Unit] case class SendMessage(message: NewMessageData) extends QueueMessageMsg[MessageData] case class UpdateVisibilityTimeout(messageId: MessageId, visibilityTimeout: VisibilityTimeout) extends QueueMessageMsg[Either[MessageDoesNotExist, Unit]] -case class ReceiveMessages(visibilityTimeout: VisibilityTimeout, count: Int, waitForMessages: Option[Duration], - receiveRequestAttemptId: Option[String]) extends QueueMessageMsg[List[MessageData]] +case class ReceiveMessages(visibilityTimeout: VisibilityTimeout, + count: Int, + waitForMessages: Option[Duration], + receiveRequestAttemptId: Option[String]) + extends QueueMessageMsg[List[MessageData]] case class DeleteMessage(deliveryReceipt: DeliveryReceipt) extends QueueMessageMsg[Unit] case class LookupMessage(messageId: MessageId) extends QueueMessageMsg[Option[MessageData]] diff --git a/core/src/test/scala/org/elasticmq/actor/QueueActorMsgOpsTest.scala b/core/src/test/scala/org/elasticmq/actor/QueueActorMsgOpsTest.scala index 6d0c9d028..68507a745 100644 --- a/core/src/test/scala/org/elasticmq/actor/QueueActorMsgOpsTest.scala +++ b/core/src/test/scala/org/elasticmq/actor/QueueActorMsgOpsTest.scala @@ -263,9 +263,9 @@ class QueueActorMsgOpsTest extends ActorTest with QueueManagerForEachTest with D List(receiveResult2) <- queueActor ? ReceiveMessages(DefaultVisibilityTimeout, 1, None, None) } yield { // Then - lookupResult.statistics should be (MessageStatistics.empty) - receiveResult1.statistics should be (MessageStatistics(OnDateTimeReceived(new DateTime(100L)), 1)) - receiveResult2.statistics should be (MessageStatistics(OnDateTimeReceived(new DateTime(100L)), 2)) + lookupResult.statistics should be(MessageStatistics.empty) + receiveResult1.statistics should be(MessageStatistics(OnDateTimeReceived(new DateTime(100L)), 1)) + receiveResult2.statistics should be(MessageStatistics(OnDateTimeReceived(new DateTime(100L)), 2)) } } @@ -346,7 +346,10 @@ class QueueActorMsgOpsTest extends ActorTest with QueueManagerForEachTest with D Right(queueActor) <- queueManagerActor ? CreateQueue(q1) // When - receiveResultsFuture = queueActor ? ReceiveMessages(DefaultVisibilityTimeout, 5, Some(Duration.millis(1000L)), None) + receiveResultsFuture = queueActor ? ReceiveMessages(DefaultVisibilityTimeout, + 5, + Some(Duration.millis(1000L)), + None) _ <- { Thread.sleep(500); nowProvider.mutableNowMillis.set(200L); queueActor ? SendMessage(msg) @@ -370,8 +373,14 @@ class QueueActorMsgOpsTest extends ActorTest with QueueManagerForEachTest with D Right(queueActor) <- queueManagerActor ? CreateQueue(q1) // When - receiveResults1Future = queueActor ? ReceiveMessages(DefaultVisibilityTimeout, 5, Some(Duration.millis(1000L)), None) - receiveResults2Future = queueActor ? ReceiveMessages(DefaultVisibilityTimeout, 5, Some(Duration.millis(1000L)), None) + receiveResults1Future = queueActor ? ReceiveMessages(DefaultVisibilityTimeout, + 5, + Some(Duration.millis(1000L)), + None) + receiveResults2Future = queueActor ? ReceiveMessages(DefaultVisibilityTimeout, + 5, + Some(Duration.millis(1000L)), + None) _ <- { Thread.sleep(500); queueActor ? SendMessage(msg) } @@ -399,9 +408,18 @@ class QueueActorMsgOpsTest extends ActorTest with QueueManagerForEachTest with D Right(queueActor) <- queueManagerActor ? CreateQueue(q1) // When - receiveResults1Future = queueActor ? ReceiveMessages(DefaultVisibilityTimeout, 5, Some(Duration.millis(1000L)), None) - receiveResults2Future = queueActor ? ReceiveMessages(DefaultVisibilityTimeout, 5, Some(Duration.millis(1000L)), None) - receiveResults3Future = queueActor ? ReceiveMessages(DefaultVisibilityTimeout, 5, Some(Duration.millis(1000L)), None) + receiveResults1Future = queueActor ? ReceiveMessages(DefaultVisibilityTimeout, + 5, + Some(Duration.millis(1000L)), + None) + receiveResults2Future = queueActor ? ReceiveMessages(DefaultVisibilityTimeout, + 5, + Some(Duration.millis(1000L)), + None) + receiveResults3Future = queueActor ? ReceiveMessages(DefaultVisibilityTimeout, + 5, + Some(Duration.millis(1000L)), + None) _ <- { Thread.sleep(500); queueActor ? SendMessage(msg1); diff --git a/core/src/test/scala/org/elasticmq/actor/queue/ReceiveRequestAttemptCacheTest.scala b/core/src/test/scala/org/elasticmq/actor/queue/ReceiveRequestAttemptCacheTest.scala index d16fb0c10..62870fdda 100644 --- a/core/src/test/scala/org/elasticmq/actor/queue/ReceiveRequestAttemptCacheTest.scala +++ b/core/src/test/scala/org/elasticmq/actor/queue/ReceiveRequestAttemptCacheTest.scala @@ -8,66 +8,88 @@ import org.elasticmq.actor.test.MutableNowProvider import org.joda.time.DateTime import org.scalatest.{FunSuite, Matchers} -class ReceiveRequestAttemptCacheTest extends FunSuite with Matchers{ +class ReceiveRequestAttemptCacheTest extends FunSuite with Matchers { - test("should retrieve message by attempt id or return the appropriate failure") { - val cache = new ReceiveRequestAttemptCache + test("should retrieve message by attempt id or return the appropriate failure") { + val cache = new ReceiveRequestAttemptCache - implicit val nowProvider = new MutableNowProvider - nowProvider.mutableNowMillis.set(1L) + implicit val nowProvider = new MutableNowProvider + nowProvider.mutableNowMillis.set(1L) - val attemptId1 = "attempt-1" - val attemptId2 = "attempt-2" - val msg1 = InternalMessage("id-1", mutable.Buffer.empty, 1L, "content", Map.empty, DateTime.now(), NeverReceived, - receiveCount = 0, isFifo = false, messageGroupId = None, messageDeduplicationId = None) - val msg2 = msg1.copy(id = "id-2") - val msg3 = msg1.copy(id = "id-3") + val attemptId1 = "attempt-1" + val attemptId2 = "attempt-2" + val msg1 = InternalMessage( + "id-1", + mutable.Buffer.empty, + 1L, + "content", + Map.empty, + DateTime.now(), + NeverReceived, + receiveCount = 0, + isFifo = false, + messageGroupId = None, + messageDeduplicationId = None + ) + val msg2 = msg1.copy(id = "id-2") + val msg3 = msg1.copy(id = "id-3") - val messageQueue = MessageQueue(isFifo = false) - messageQueue += msg1 - messageQueue += msg2 - messageQueue += msg3 + val messageQueue = MessageQueue(isFifo = false) + messageQueue += msg1 + messageQueue += msg2 + messageQueue += msg3 - cache.add(attemptId1, List(msg1, msg2)) - cache.add(attemptId2, List(msg3)) + cache.add(attemptId1, List(msg1, msg2)) + cache.add(attemptId2, List(msg3)) - // Verify the happy path - cache.get(attemptId1, messageQueue).right.get should be(Some(List(msg1, msg2))) - cache.get(attemptId2, messageQueue).right.get should be(Some(List(msg3))) - cache.get("unknown-attempt", messageQueue).right.get should be(empty) + // Verify the happy path + cache.get(attemptId1, messageQueue).right.get should be(Some(List(msg1, msg2))) + cache.get(attemptId2, messageQueue).right.get should be(Some(List(msg3))) + cache.get("unknown-attempt", messageQueue).right.get should be(empty) - // Using the attempt id from another queue - messageQueue.remove(msg1.id) - messageQueue.remove(msg3.id) - cache.get(attemptId1, messageQueue).left.get should be(Invalid) + // Using the attempt id from another queue + messageQueue.remove(msg1.id) + messageQueue.remove(msg3.id) + cache.get(attemptId1, messageQueue).left.get should be(Invalid) - // Verify expirations (5 minutes later + some change) - nowProvider.mutableNowMillis.set(1L + 5 * 60 * 1000 + 2000) - cache.get(attemptId1, messageQueue).left.get should be(ReceiveRequestAttemptCache.ReceiveFailure.Expired) - cache.get(attemptId2, messageQueue).left.get should be(ReceiveRequestAttemptCache.ReceiveFailure.Expired) - } + // Verify expirations (5 minutes later + some change) + nowProvider.mutableNowMillis.set(1L + 5 * 60 * 1000 + 2000) + cache.get(attemptId1, messageQueue).left.get should be(ReceiveRequestAttemptCache.ReceiveFailure.Expired) + cache.get(attemptId2, messageQueue).left.get should be(ReceiveRequestAttemptCache.ReceiveFailure.Expired) + } - test("adding an attempt should clean out any old attempts") { - // Given - val cache = new ReceiveRequestAttemptCache - implicit val nowProvider = new MutableNowProvider - nowProvider.mutableNowMillis.set(1L) - val attemptId1 = "attempt-1" - val attemptId2 = "attempt-2" - val msg1 = InternalMessage("id-1", mutable.Buffer.empty, 1L, "content", Map.empty, DateTime.now(), NeverReceived, - receiveCount = 0, isFifo = false, messageGroupId = None, messageDeduplicationId = None) - val msg2 = msg1.copy(id = "id-2") - val messageQueue = MessageQueue(isFifo = false) - messageQueue += msg1 - messageQueue += msg2 - cache.add(attemptId1, List(msg1)) + test("adding an attempt should clean out any old attempts") { + // Given + val cache = new ReceiveRequestAttemptCache + implicit val nowProvider = new MutableNowProvider + nowProvider.mutableNowMillis.set(1L) + val attemptId1 = "attempt-1" + val attemptId2 = "attempt-2" + val msg1 = InternalMessage( + "id-1", + mutable.Buffer.empty, + 1L, + "content", + Map.empty, + DateTime.now(), + NeverReceived, + receiveCount = 0, + isFifo = false, + messageGroupId = None, + messageDeduplicationId = None + ) + val msg2 = msg1.copy(id = "id-2") + val messageQueue = MessageQueue(isFifo = false) + messageQueue += msg1 + messageQueue += msg2 + cache.add(attemptId1, List(msg1)) - // Sanity-check the attempt is still in the cache - cache.get(attemptId1, messageQueue).right.get should be(defined) + // Sanity-check the attempt is still in the cache + cache.get(attemptId1, messageQueue).right.get should be(defined) - // 15 minutes pass, the attempt should be removed from the cache to prevent OOMs - nowProvider.mutableNowMillis.set(15 * 60 * 1000 + 2) - cache.add(attemptId2, List(msg2)) - cache.get(attemptId1, messageQueue).right.get should be(empty) - } + // 15 minutes pass, the attempt should be removed from the cache to prevent OOMs + nowProvider.mutableNowMillis.set(15 * 60 * 1000 + 2) + cache.add(attemptId2, List(msg2)) + cache.get(attemptId1, messageQueue).right.get should be(empty) + } } diff --git a/core/src/test/scala/org/elasticmq/actor/test/DataCreationHelpers.scala b/core/src/test/scala/org/elasticmq/actor/test/DataCreationHelpers.scala index 3e3ad2f46..f6be474dc 100644 --- a/core/src/test/scala/org/elasticmq/actor/test/DataCreationHelpers.scala +++ b/core/src/test/scala/org/elasticmq/actor/test/DataCreationHelpers.scala @@ -20,19 +20,45 @@ trait DataCreationHelpers { deadLettersQueue, None) - def createMessageData(id: String, content: String, messageAttributes: Map[String, MessageAttribute], - nextDelivery: MillisNextDelivery, deliveryReceipt: Option[DeliveryReceipt] = None, - messageGroupId: Option[String] = None, messageDeduplicationId: Option[String] = None) = - MessageData(MessageId(id), deliveryReceipt, content, messageAttributes, nextDelivery, new DateTime(0), - MessageStatistics(NeverReceived, 0), messageGroupId, messageDeduplicationId) + def createMessageData(id: String, + content: String, + messageAttributes: Map[String, MessageAttribute], + nextDelivery: MillisNextDelivery, + deliveryReceipt: Option[DeliveryReceipt] = None, + messageGroupId: Option[String] = None, + messageDeduplicationId: Option[String] = None) = + MessageData( + MessageId(id), + deliveryReceipt, + content, + messageAttributes, + nextDelivery, + new DateTime(0), + MessageStatistics(NeverReceived, 0), + messageGroupId, + messageDeduplicationId + ) - def createNewMessageData(id: String, content: String, messageAttributes: Map[String, MessageAttribute], - nextDelivery: MillisNextDelivery, messageGroupId: Option[String] = None, - messageDeduplicationId: Option[String] = None) = - NewMessageData(Some(MessageId(id)), content, messageAttributes, nextDelivery, messageGroupId, - messageDeduplicationId) + def createNewMessageData(id: String, + content: String, + messageAttributes: Map[String, MessageAttribute], + nextDelivery: MillisNextDelivery, + messageGroupId: Option[String] = None, + messageDeduplicationId: Option[String] = None) = + NewMessageData(Some(MessageId(id)), + content, + messageAttributes, + nextDelivery, + messageGroupId, + messageDeduplicationId) def createNewMessageData(messageData: MessageData) = - NewMessageData(Some(messageData.id), messageData.content, messageData.messageAttributes, messageData.nextDelivery, - messageData.messageGroupId, messageData.messageDeduplicationId) + NewMessageData( + Some(messageData.id), + messageData.content, + messageData.messageAttributes, + messageData.nextDelivery, + messageData.messageGroupId, + messageData.messageDeduplicationId + ) } diff --git a/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/AmazonJavaSdkTestSuite.scala b/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/AmazonJavaSdkTestSuite.scala index 03ce4a76c..61e46b524 100644 --- a/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/AmazonJavaSdkTestSuite.scala +++ b/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/AmazonJavaSdkTestSuite.scala @@ -326,9 +326,12 @@ class AmazonJavaSdkTestSuite extends FunSuite with Matchers with BeforeAndAfter test("FIFO queues should return an error if an invalid message group id parameter is provided") { // Given - val fifoQueueUrl = client.createQueue(new CreateQueueRequest("testQueue.fifo") - .addAttributesEntry("FifoQueue", "true") - .addAttributesEntry("ContentBasedDeduplication", "true")).getQueueUrl + val fifoQueueUrl = client + .createQueue( + new CreateQueueRequest("testQueue.fifo") + .addAttributesEntry("FifoQueue", "true") + .addAttributesEntry("ContentBasedDeduplication", "true")) + .getQueueUrl val regularQueueUrl = client.createQueue(new CreateQueueRequest("testQueue1")).getQueueUrl // An illegal character @@ -356,17 +359,18 @@ class AmazonJavaSdkTestSuite extends FunSuite with Matchers with BeforeAndAfter test("FIFO queues do not support delaying individual messages") { val queueUrl = createFifoQueue() an[AmazonSQSException] shouldBe thrownBy { - client.sendMessage(new SendMessageRequest(queueUrl, "body") + client.sendMessage( + new SendMessageRequest(queueUrl, "body") .withMessageDeduplicationId("1") .withMessageGroupId("1") - .withDelaySeconds(10) - ) + .withDelaySeconds(10)) } - val result = client.sendMessageBatch(new SendMessageBatchRequest(queueUrl).withEntries( - new SendMessageBatchRequestEntry("1", "Message 1").withMessageGroupId("1"), - new SendMessageBatchRequestEntry("2", "Message 2").withMessageGroupId("2").withDelaySeconds(10) - )) + val result = client.sendMessageBatch( + new SendMessageBatchRequest(queueUrl).withEntries( + new SendMessageBatchRequestEntry("1", "Message 1").withMessageGroupId("1"), + new SendMessageBatchRequestEntry("2", "Message 2").withMessageGroupId("2").withDelaySeconds(10) + )) result.getSuccessful should have size 1 result.getFailed should have size 1 } @@ -386,9 +390,12 @@ class AmazonJavaSdkTestSuite extends FunSuite with Matchers with BeforeAndAfter test("FIFO queues should return an error if an invalid message deduplication id parameter is provided") { // Given - val fifoQueueUrl = client.createQueue(new CreateQueueRequest("testQueue.fifo") - .addAttributesEntry("FifoQueue", "true") - .addAttributesEntry("ContentBasedDeduplication", "true")).getQueueUrl + val fifoQueueUrl = client + .createQueue( + new CreateQueueRequest("testQueue.fifo") + .addAttributesEntry("FifoQueue", "true") + .addAttributesEntry("ContentBasedDeduplication", "true")) + .getQueueUrl val regularQueueUrl = client.createQueue(new CreateQueueRequest("testQueue1")).getQueueUrl // An illegal character @@ -411,11 +418,11 @@ class AmazonJavaSdkTestSuite extends FunSuite with Matchers with BeforeAndAfter test("FIFO queues need a content based deduplication strategy when no message deduplication ids are provided") { // Given val noStrategyRequest = new CreateQueueRequest("testQueue1.fifo") - .addAttributesEntry("FifoQueue", "true") + .addAttributesEntry("FifoQueue", "true") val noStrategyQueueUrl = client.createQueue(noStrategyRequest).getQueueUrl val withStrategyRequest = new CreateQueueRequest("testQueue2.fifo") - .addAttributesEntry("FifoQueue", "true") - .addAttributesEntry("ContentBasedDeduplication", "true") + .addAttributesEntry("FifoQueue", "true") + .addAttributesEntry("ContentBasedDeduplication", "true") val withStrategyQueueUrl = client.createQueue(withStrategyRequest).getQueueUrl // When @@ -431,7 +438,8 @@ class AmazonJavaSdkTestSuite extends FunSuite with Matchers with BeforeAndAfter result2.isLeft should be(false) } - test("FIFO queues should not return a second message for the same message group if the first has not been deleted yet") { + test( + "FIFO queues should not return a second message for the same message group if the first has not been deleted yet") { // Given val queueUrl = createFifoQueue() @@ -482,10 +490,14 @@ class AmazonJavaSdkTestSuite extends FunSuite with Matchers with BeforeAndAfter // When sending 4 distinct messages (based on dedup id), two for each message group val group1 = "group1" val group2 = "group2" - client.sendMessage(new SendMessageRequest(queueUrl, group1).withMessageGroupId(group1).withMessageDeduplicationId("1")) - client.sendMessage(new SendMessageRequest(queueUrl, group1).withMessageGroupId(group1).withMessageDeduplicationId("2")) - client.sendMessage(new SendMessageRequest(queueUrl, group2).withMessageGroupId(group2).withMessageDeduplicationId("3")) - client.sendMessage(new SendMessageRequest(queueUrl, group2).withMessageGroupId(group2).withMessageDeduplicationId("4")) + client.sendMessage( + new SendMessageRequest(queueUrl, group1).withMessageGroupId(group1).withMessageDeduplicationId("1")) + client.sendMessage( + new SendMessageRequest(queueUrl, group1).withMessageGroupId(group1).withMessageDeduplicationId("2")) + client.sendMessage( + new SendMessageRequest(queueUrl, group2).withMessageGroupId(group2).withMessageDeduplicationId("3")) + client.sendMessage( + new SendMessageRequest(queueUrl, group2).withMessageGroupId(group2).withMessageDeduplicationId("4")) val messages1 = client.receiveMessage(new ReceiveMessageRequest(queueUrl).withMaxNumberOfMessages(2)).getMessages val messages2 = client.receiveMessage(new ReceiveMessageRequest(queueUrl).withMaxNumberOfMessages(2)).getMessages @@ -515,7 +527,8 @@ class AmazonJavaSdkTestSuite extends FunSuite with Matchers with BeforeAndAfter messages.head } // Messages received in a batch should be in order as well - val batchReceive = client.receiveMessage(new ReceiveMessageRequest(queueUrl).withMaxNumberOfMessages(10)).getMessages + val batchReceive = + client.receiveMessage(new ReceiveMessageRequest(queueUrl).withMaxNumberOfMessages(10)).getMessages val allMessages = deliveredSingleReceives ++ batchReceive allMessages.map(_.getBody) should be(messageBodies) @@ -545,9 +558,12 @@ class AmazonJavaSdkTestSuite extends FunSuite with Matchers with BeforeAndAfter test("FIFO queues should return an error if an invalid receive request attempt id parameter is provided") { // Given - val fifoQueueUrl = client.createQueue(new CreateQueueRequest("testQueue.fifo") - .addAttributesEntry("FifoQueue", "true") - .addAttributesEntry("ContentBasedDeduplication", "true")).getQueueUrl + val fifoQueueUrl = client + .createQueue( + new CreateQueueRequest("testQueue.fifo") + .addAttributesEntry("FifoQueue", "true") + .addAttributesEntry("ContentBasedDeduplication", "true")) + .getQueueUrl val regularQueueUrl = client.createQueue(new CreateQueueRequest("testQueue1")).getQueueUrl // An illegal character @@ -579,7 +595,6 @@ class AmazonJavaSdkTestSuite extends FunSuite with Matchers with BeforeAndAfter client.deleteMessage(queueUrl, messages1.head.getReceiptHandle) val messages2 = client.receiveMessage(new ReceiveMessageRequest(queueUrl).withMaxNumberOfMessages(4)).getMessages - // Then sentMessages.map(_.getMessageId).toSet should have size 1 messages1.map(_.getBody) should have size 1 @@ -592,7 +607,8 @@ class AmazonJavaSdkTestSuite extends FunSuite with Matchers with BeforeAndAfter // When for (i <- 1 to 10) { - client.sendMessage(new SendMessageRequest(queueUrl, s"Message $i") + client.sendMessage( + new SendMessageRequest(queueUrl, s"Message $i") .withMessageDeduplicationId("DedupId") .withMessageGroupId("1")) } @@ -601,23 +617,69 @@ class AmazonJavaSdkTestSuite extends FunSuite with Matchers with BeforeAndAfter client.deleteMessage(queueUrl, m1.get.getReceiptHandle) val m2 = receiveSingleMessage(queueUrl) - // Then m1.map(_.getBody) should be(Some("Message 1")) m2 should be(empty) } + test("FIFO queue messages should return FIFO attribute names") { + val queueUrl = createFifoQueue() + + // When + val messageGroupId = "myMessageGroupId" + val deduplicationId = "myDedupId" + for (i <- 0 to 10) { + client.sendMessage( + new SendMessageRequest(queueUrl, s"Message $i") + .withMessageDeduplicationId(deduplicationId + i) + .withMessageGroupId(messageGroupId + i)) + } + + // Then + val messages = client + .receiveMessage(new ReceiveMessageRequest(queueUrl).withAttributeNames("All").withMaxNumberOfMessages(2)) + .getMessages + messages should have size 2 + messages.foreach { message => + message.getAttributes.get("MessageGroupId") should startWith(messageGroupId) + message.getAttributes.get("MessageDeduplicationId") should startWith(deduplicationId) + } + + // Specific attributes + val withMessageGroupIdMessages = client + .receiveMessage(new ReceiveMessageRequest(queueUrl).withAttributeNames("MessageGroupId").withMaxNumberOfMessages(2)) + .getMessages + withMessageGroupIdMessages should have size 2 + withMessageGroupIdMessages.foreach { message => + val attrs = message.getAttributes.toMap + attrs("MessageGroupId") should startWith(messageGroupId) + attrs.get("MessageDeduplicationId") should be(empty) + } + + val withDedupIdMessages = client + .receiveMessage(new ReceiveMessageRequest(queueUrl).withAttributeNames("MessageDeduplicationId").withMaxNumberOfMessages(2)) + .getMessages + withDedupIdMessages should have size 2 + withDedupIdMessages.foreach { message => + val attrs = message.getAttributes.toMap + attrs.get("MessageGroupId") should be(empty) + attrs("MessageDeduplicationId") should startWith(deduplicationId) + } + } + test("FIFO queues should be purgable") { // Given val queueUrl = createFifoQueue() client.sendMessage(new SendMessageRequest(queueUrl, "Body 1").withMessageGroupId("1")) client.sendMessage(new SendMessageRequest(queueUrl, "Body 2").withMessageGroupId("1")) - val attributes1 = client.getQueueAttributes(new GetQueueAttributesRequest(queueUrl).withAttributeNames("All")).getAttributes + val attributes1 = + client.getQueueAttributes(new GetQueueAttributesRequest(queueUrl).withAttributeNames("All")).getAttributes val m1 = receiveSingleMessage(queueUrl) // When client.purgeQueue(new PurgeQueueRequest().withQueueUrl(queueUrl)) - val attributes2 = client.getQueueAttributes(new GetQueueAttributesRequest(queueUrl).withAttributeNames("All")).getAttributes + val attributes2 = + client.getQueueAttributes(new GetQueueAttributesRequest(queueUrl).withAttributeNames("All")).getAttributes client.sendMessage(new SendMessageRequest(queueUrl, "Body 3").withMessageGroupId("1")) val m2 = receiveSingleMessage(queueUrl) @@ -1395,11 +1457,13 @@ class AmazonJavaSdkTestSuite extends FunSuite with Matchers with BeforeAndAfter test("should validate redrive policy json") { // Then a[AmazonSQSException] shouldBe thrownBy { - client.createQueue(new CreateQueueRequest("q1") + client.createQueue( + new CreateQueueRequest("q1") .withAttributes(Map(redrivePolicyAttribute -> "not a proper json policy"))) } a[AmazonSQSException] shouldBe thrownBy { - client.createQueue(new CreateQueueRequest("q1") + client.createQueue( + new CreateQueueRequest("q1") .withAttributes(Map(redrivePolicyAttribute -> """{"wrong": "json"}"""))) } } diff --git a/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/CreateQueueDirectives.scala b/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/CreateQueueDirectives.scala index dc589bffc..16cf1609e 100644 --- a/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/CreateQueueDirectives.scala +++ b/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/CreateQueueDirectives.scala @@ -78,7 +78,8 @@ trait CreateQueueDirectives { redrivePolicy.map(rd => DeadLettersQueueData(rd.queueName, rd.maxReceiveCount)), maxReceiveCount = None, isFifo, - hasContentBasedDeduplication) + hasContentBasedDeduplication + ) if (!queueName.matches("[\\p{Alnum}\\._-]*")) { throw SQSException.invalidParameterValue diff --git a/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/ReceiveMessageDirectives.scala b/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/ReceiveMessageDirectives.scala index 5588bb162..2ab713f1c 100644 --- a/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/ReceiveMessageDirectives.scala +++ b/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/ReceiveMessageDirectives.scala @@ -20,9 +20,12 @@ trait ReceiveMessageDirectives { val WaitTimeSecondsAttribute = "WaitTimeSeconds" val ReceiveRequestAttemptIdAttribute = "ReceiveRequestAttemptId" val MessageAttributeNamePattern = "MessageAttributeName\\.\\d".r + val MessageDeduplicationIdAttribute = "MessageDeduplicationId" + val MessageGroupIdAttribute = "MessageGroupId" val AllAttributeNames = SentTimestampAttribute :: ApproximateReceiveCountAttribute :: - ApproximateFirstReceiveTimestampAttribute :: SenderIdAttribute :: Nil + ApproximateFirstReceiveTimestampAttribute :: SenderIdAttribute :: MessageDeduplicationIdAttribute :: + MessageGroupIdAttribute :: Nil } def receiveMessage(p: AnyParams) = { @@ -67,10 +70,12 @@ trait ReceiveMessageDirectives { verifyMessageWaitTime(waitTimeSecondsAttributeOpt) val msgsFuture = queueActor ? ReceiveMessages(visibilityTimeoutFromParameters, - maxNumberOfMessagesFromParameters, waitTimeSecondsFromParameters, receiveRequestAttemptId) + maxNumberOfMessagesFromParameters, + waitTimeSecondsFromParameters, + receiveRequestAttemptId) - lazy val attributeNames = - attributeNamesReader.read(p, AllAttributeNames) + val attributeNames = attributeNamesReader.read(p, AllAttributeNames) + println(attributeNames) def calculateAttributeValues(msg: MessageData): List[(String, String)] = { import AttributeValuesCalculator.Rule @@ -80,6 +85,8 @@ trait ReceiveMessageDirectives { Rule(SenderIdAttribute, () => "127.0.0.1"), Rule(SentTimestampAttribute, () => msg.created.getMillis.toString), Rule(ApproximateReceiveCountAttribute, () => msg.statistics.approximateReceiveCount.toString), + Rule(MessageDeduplicationIdAttribute, () => msg.messageDeduplicationId.getOrElse("")), + Rule(MessageGroupIdAttribute, () => msg.messageGroupId.getOrElse("")), Rule( ApproximateFirstReceiveTimestampAttribute, () => diff --git a/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/SendMessageBatchDirectives.scala b/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/SendMessageBatchDirectives.scala index 5af647577..441aec45f 100644 --- a/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/SendMessageBatchDirectives.scala +++ b/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/SendMessageBatchDirectives.scala @@ -13,8 +13,9 @@ trait SendMessageBatchDirectives { queueActorAndDataFromRequest(p) { (queueActor, queueData) => verifyMessagesNotTooLong(p) val resultsFuture = batchRequest(SendMessageBatchPrefix, p) { (messageData, id) => - doSendMessage(queueActor, messageData, queueData).map { case (message, digest, messageAttributeDigest) => - + doSendMessage(queueActor, messageData, queueData).map { + case (message, digest, messageAttributeDigest) => + {id} {messageAttributeDigest} {digest} diff --git a/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/SendMessageDirectives.scala b/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/SendMessageDirectives.scala index 2a6e488bb..15e17f9a4 100644 --- a/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/SendMessageDirectives.scala +++ b/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/SendMessageDirectives.scala @@ -25,9 +25,10 @@ trait SendMessageDirectives { this: ElasticMQDirectives with SQSLimitsModule => def sendMessage(p: AnyParams): Route = { p.action("SendMessage") { queueActorAndDataFromRequest(p) { (queueActor, queueData) => - doSendMessage(queueActor, p, queueData).map { case (message, digest, messageAttributeDigest) => - respondWith { - + doSendMessage(queueActor, p, queueData).map { + case (message, digest, messageAttributeDigest) => + respondWith { + {messageAttributeDigest} {digest} @@ -92,7 +93,9 @@ trait SendMessageDirectives { this: ElasticMQDirectives with SQSLimitsModule => }.toMap } - def doSendMessage(queueActor: ActorRef, parameters: Map[String, String], queueData: QueueData): Future[(MessageData, String, String)] = { + def doSendMessage(queueActor: ActorRef, + parameters: Map[String, String], + queueData: QueueData): Future[(MessageData, String, String)] = { val body = parameters(MessageBodyParameter) val messageAttributes = getMessageAttributes(parameters) @@ -140,10 +143,10 @@ trait SendMessageDirectives { this: ElasticMQDirectives with SQSLimitsModule => val delaySecondsOption = parameters.parseOptionalLong(DelaySecondsParameter) match { // FIFO queues don't support delays case Some(_) if queueData.isFifo => throw SQSException.invalidParameterValue - case d => d + case d => d } - val messageToSend = createMessage(body, messageAttributes, delaySecondsOption, messageGroupId, - messageDeduplicationId) + val messageToSend = + createMessage(body, messageAttributes, delaySecondsOption, messageGroupId, messageDeduplicationId) val digest = md5Digest(body) val messageAttributeDigest = md5AttributeDigest(messageAttributes) @@ -181,8 +184,11 @@ trait SendMessageDirectives { this: ElasticMQDirectives with SQSLimitsModule => findInvalidCharacter(0) } - private def createMessage(body: String, messageAttributes: Map[String, MessageAttribute], - delaySecondsOption: Option[Long], groupId: Option[String], deduplicationId: Option[String]) = { + private def createMessage(body: String, + messageAttributes: Map[String, MessageAttribute], + delaySecondsOption: Option[Long], + groupId: Option[String], + deduplicationId: Option[String]) = { val nextDelivery = delaySecondsOption match { case None => ImmediateNextDelivery case Some(delaySeconds) => AfterMillisNextDelivery(delaySeconds * 1000) @@ -192,6 +198,7 @@ trait SendMessageDirectives { this: ElasticMQDirectives with SQSLimitsModule => } private def sha256Hash(text: String): String = { - String.format("%064x", new java.math.BigInteger(1, MessageDigest.getInstance("SHA-256").digest(text.getBytes("UTF-8")))) + String.format("%064x", + new java.math.BigInteger(1, MessageDigest.getInstance("SHA-256").digest(text.getBytes("UTF-8")))) } } diff --git a/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/directives/ElasticMQDirectives.scala b/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/directives/ElasticMQDirectives.scala index 446294e12..f03979cac 100644 --- a/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/directives/ElasticMQDirectives.scala +++ b/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/directives/ElasticMQDirectives.scala @@ -17,10 +17,10 @@ trait ElasticMQDirectives with Logging { /** - * A valid FIFO parameter value is at most 128 characters and can contain - * - alphanumeric characters (a-z , A-Z , 0-9 ) and - * - punctuation (!"#$%'()*+,-./:;=?@[\]^_`{|}~ ). - */ + * A valid FIFO parameter value is at most 128 characters and can contain + * - alphanumeric characters (a-z , A-Z , 0-9 ) and + * - punctuation (!"#$%'()*+,-./:;=?@[\]^_`{|}~ ). + */ private val validFifoParameterValueCharsRe = """^[a-zA-Z0-9!"#\$%&'\(\)\*\+,-\./:;<=>?@\[\\\]\^_`\{|\}~]{1,128}$""".r def rootPath(body: Route) = { @@ -30,12 +30,12 @@ trait ElasticMQDirectives } /** - * Valid values are alphanumeric characters and punctuation (!"#$%&'()*+,-./:;<=>?@[\]^_`{|}~). The maximum length is - * 128 characters - * - * @param propValue The string to validate - * @return `true` if the string is valid, false otherwise - */ + * Valid values are alphanumeric characters and punctuation (!"#$%&'()*+,-./:;<=>?@[\]^_`{|}~). The maximum length is + * 128 characters + * + * @param propValue The string to validate + * @return `true` if the string is valid, false otherwise + */ protected def isValidFifoPropertyValue(propValue: String): Boolean = validFifoParameterValueCharsRe.findFirstIn(propValue).isDefined }