Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ val jclOverSlf4j = "org.slf4j" % "jcl-over-slf4j" % "2.0.12" // needed form amaz
val scalatest = "org.scalatest" %% "scalatest" % "3.2.18"
val awaitility = "org.awaitility" % "awaitility-scala" % "4.2.1"

val amazonJavaSdkSqs = "com.amazonaws" % "aws-java-sdk-sqs" % "1.12.472" exclude ("commons-logging", "commons-logging")
val amazonJavaSdkSqs = "com.amazonaws" % "aws-java-sdk-sqs" % "1.12.580" exclude ("commons-logging", "commons-logging")
val amazonJavaV2SdkSqs = "software.amazon.awssdk" % "sqs" % "2.21.43"

val pekkoVersion = "1.0.2"
Expand Down
33 changes: 18 additions & 15 deletions core/src/main/scala/org/elasticmq/ElasticMQError.scala
Original file line number Diff line number Diff line change
@@ -1,32 +1,35 @@
package org.elasticmq
import org.elasticmq.msg.MessageMoveTaskHandle

trait ElasticMQError {
sealed trait ElasticMQError {
val queueName: String
val code: String
val code: String // TODO: code should be handled in rest-sqs module
val message: String
}

class QueueAlreadyExists(val queueName: String) extends ElasticMQError {
final case class QueueAlreadyExists(val queueName: String) extends ElasticMQError {
val code = "QueueAlreadyExists"
val message = s"Queue already exists: $queueName"
}

case class QueueCreationError(queueName: String, reason: String) extends ElasticMQError {
val code = "QueueCreationError"
val message = s"Queue named $queueName could not be created because of $reason"
}

case class InvalidParameterValue(queueName: String, reason: String) extends ElasticMQError {
final case class InvalidParameterValue(queueName: String, reason: String) extends ElasticMQError {
val code = "InvalidParameterValue"
val message = reason
}

class MessageDoesNotExist(val queueName: String, messageId: MessageId) extends ElasticMQError {
val code = "MessageDoesNotExist"
val message = s"Message does not exist: $messageId in queue: $queueName"
}

class InvalidReceiptHandle(val queueName: String, receiptHandle: String) extends ElasticMQError {
final case class InvalidReceiptHandle(val queueName: String, receiptHandle: String) extends ElasticMQError {
val code = "ReceiptHandleIsInvalid"
val message = s"""The receipt handle "$receiptHandle" is not valid."""
}

final case class InvalidMessageMoveTaskHandle(val taskHandle: MessageMoveTaskHandle) extends ElasticMQError {
val code = "ResourceNotFoundException"
val message = s"""The task handle "$taskHandle" is not valid or does not exist"""

override val queueName: String = "invalid"
}

final case class MessageMoveTaskAlreadyRunning(val queueName: String) extends ElasticMQError {
val code = "AWS.SimpleQueueService.UnsupportedOperation"
val message = s"""A message move task is already running on queue "$queueName""""
}
26 changes: 22 additions & 4 deletions core/src/main/scala/org/elasticmq/actor/QueueManagerActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,23 @@ import org.elasticmq.actor.reply._
import org.elasticmq.msg._
import org.elasticmq.util.{Logging, NowProvider}

import scala.collection.mutable
import scala.reflect._

class QueueManagerActor(nowProvider: NowProvider, limits: Limits, queueEventListener: Option[ActorRef])
extends ReplyingActor
with QueueManagerActorStorage
with QueueManagerMessageMoveOps
with Logging {

type M[X] = QueueManagerMsg[X]
val ev: ClassTag[QueueManagerMsg[Unit]] = classTag[M[Unit]]

case class ActorWithQueueData(actorRef: ActorRef, queueData: QueueData)
private val queues = collection.mutable.HashMap[String, ActorWithQueueData]()
val queues: mutable.Map[MessageMoveTaskHandle, ActorWithQueueData] = mutable.HashMap[String, ActorWithQueueData]()

def receiveAndReply[T](msg: QueueManagerMsg[T]): ReplyAction[T] =
// TODO: create *Ops class like in QueueActor
def receiveAndReply[T](msg: QueueManagerMsg[T]): ReplyAction[T] = {
val self = context.self
msg match {
case CreateQueue(request) =>
queues.get(request.name) match {
Expand Down Expand Up @@ -63,7 +68,19 @@ class QueueManagerActor(nowProvider: NowProvider, limits: Limits, queueEventList
queues.collect {
case (name, actor) if actor.queueData.deadLettersQueue.exists(_.name == queueName) => name
}.toList

case StartMessageMoveTask(
sourceQueue,
sourceArn,
destinationQueue,
destinationArn,
maxNumberOfMessagesPerSecond
) =>
startMessageMoveTask(sourceQueue, sourceArn, destinationQueue, destinationArn, maxNumberOfMessagesPerSecond)
case MessageMoveTaskFinished(taskHandle) => onMessageMoveTaskFinished(taskHandle)
case CancelMessageMoveTask(taskHandle) => cancelMessageMoveTask(taskHandle)
}
}

protected def createQueueActor(
nowProvider: NowProvider,
Expand All @@ -88,7 +105,8 @@ class QueueManagerActor(nowProvider: NowProvider, limits: Limits, queueEventList
moveMessagesToQueueActor,
queueEventListener
)
)
),
s"queue-${queueData.name}"
)
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package org.elasticmq.actor
import org.apache.pekko.actor.{ActorContext, ActorRef}
import org.apache.pekko.util.Timeout
import org.elasticmq.QueueData

import scala.collection.mutable
import scala.concurrent.ExecutionContext
import scala.concurrent.duration.DurationInt

trait QueueManagerActorStorage {

def context: ActorContext

implicit lazy val ec: ExecutionContext = context.dispatcher
implicit lazy val timeout: Timeout = 5.seconds

case class ActorWithQueueData(actorRef: ActorRef, queueData: QueueData)
def queues: mutable.Map[String, ActorWithQueueData]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package org.elasticmq.actor
import org.apache.pekko.actor.ActorRef
import org.apache.pekko.util.Timeout
import org.elasticmq.actor.reply._
import org.elasticmq.msg.{CancelMovingMessages, GetQueueData, MessageMoveTaskHandle, StartMovingMessages}
import org.elasticmq.util.Logging
import org.elasticmq.{ElasticMQError, InvalidMessageMoveTaskHandle}

import scala.collection.mutable
import scala.concurrent.Future
import scala.util.{Failure, Success}

trait QueueManagerMessageMoveOps extends Logging {
this: QueueManagerActorStorage =>

private val messageMoveTasks = mutable.HashMap[MessageMoveTaskHandle, ActorRef]()

def startMessageMoveTask(
sourceQueue: ActorRef,
sourceArn: String,
destinationQueue: Option[ActorRef],
destinationArn: Option[String],
maxNumberOfMessagesPerSecond: Option[Int]
)(implicit timeout: Timeout): ReplyAction[Either[ElasticMQError, MessageMoveTaskHandle]] = {
val self = context.self
val replyTo = context.sender()
(for {
destinationQueueActorRef <- destinationQueue
.map(Future.successful)
.getOrElse(findDeadLetterQueueSource(sourceQueue))
result <- sourceQueue ? StartMovingMessages(
destinationQueueActorRef,
destinationArn,
sourceArn,
maxNumberOfMessagesPerSecond,
self
)
} yield (result, destinationQueueActorRef)).onComplete {
case Success((result, destinationQueueActorRef)) =>
result match {
case Right(taskHandle) =>
logger.debug("Message move task {} => {} created", sourceQueue, destinationQueueActorRef)
messageMoveTasks.put(taskHandle, sourceQueue)
replyTo ! Right(taskHandle)
case Left(error) =>
logger.error("Failed to start message move task: {}", error)
replyTo ! Left(error)
}
case Failure(ex) => logger.error("Failed to start message move task", ex)
}
DoNotReply()
}

def onMessageMoveTaskFinished(taskHandle: MessageMoveTaskHandle): ReplyAction[Unit] = {
logger.debug("Message move task {} finished", taskHandle)
messageMoveTasks.remove(taskHandle)
DoNotReply()
}

def cancelMessageMoveTask(taskHandle: MessageMoveTaskHandle): ReplyAction[Either[ElasticMQError, Long]] = {
logger.info("Cancelling message move task {}", taskHandle)
messageMoveTasks.get(taskHandle) match {
case Some(sourceQueue) =>
val replyTo = context.sender()
sourceQueue ? CancelMovingMessages() onComplete {
case Success(numMessageMoved) =>
logger.debug("Message move task {} cancelled", taskHandle)
messageMoveTasks.remove(taskHandle)
replyTo ! Right(numMessageMoved)
case Failure(ex) =>
logger.error("Failed to cancel message move task", ex)
replyTo ! Left(ex)
}
DoNotReply()
case None =>
ReplyWith(Left(new InvalidMessageMoveTaskHandle(taskHandle)))
}
}

private def findDeadLetterQueueSource(sourceQueue: ActorRef) = {
val queueDataF = sourceQueue ? GetQueueData()
queueDataF.map { queueData =>
queues
.filter { case (_, data) =>
data.queueData.deadLettersQueue.exists(dlqData => dlqData.name == queueData.name)
}
.head
._2
.actorRef
}
}
}
21 changes: 21 additions & 0 deletions core/src/main/scala/org/elasticmq/actor/queue/MessageQueue.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,22 @@ sealed trait MessageQueue {
*/
def getById(id: String): Option[InternalMessage]

/** Remove the first message from the queue
*
* @return
* The message or None if not exists
*/
def pop: Option[InternalMessage]

/** Get all messages in queue
*
* @return
* All messages in queue
*/
def all: Iterable[InternalMessage]

def size: Long

/** Drop all messages on the queue
*/
def clear(): Unit
Expand Down Expand Up @@ -137,8 +146,20 @@ object MessageQueue {

override def getById(id: String): Option[InternalMessage] = messagesById.get(id)

override def pop: Option[InternalMessage] = {
if (messageQueue.isEmpty) {
None
} else {
val firstMessage = messageQueue.dequeue()
remove(firstMessage.id)
Some(firstMessage)
}
}

override def all: Iterable[InternalMessage] = messagesById.values

override def size: Long = messageQueue.size

override def clear(): Unit = {
messagesById.clear()
messageQueue.clear()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package org.elasticmq.actor.queue

import org.apache.pekko.actor.ActorRef
import org.apache.pekko.actor.{ActorLogging, ActorRef}
import org.elasticmq.QueueData
import org.elasticmq.actor.reply.ReplyingActor
import org.elasticmq.actor.reply.{ReplyAction, ReplyingActor}
import org.elasticmq.msg._
import org.elasticmq.util.{Logging, NowProvider}

Expand All @@ -24,7 +24,7 @@ class QueueActor(
type M[X] = QueueMsg[X]
val ev = classTag[M[Unit]]

def receiveAndReply[T](msg: QueueMsg[T]) =
def receiveAndReply[T](msg: QueueMsg[T]): ReplyAction[T] =
msg match {
case m: QueueQueueMsg[T] =>
val replyAction = receiveAndReplyQueueMsg(m)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ trait QueueActorMessageOps
with DeleteMessageOps
with ReceiveMessageOps
with MoveMessageOps
with MoveMessagesAsyncOps
with Timers {
this: QueueActorStorage =>

Expand All @@ -38,6 +39,20 @@ trait QueueActorMessageOps
fifoMessagesHistory = fifoMessagesHistory.cleanOutdatedMessages(nowProvider)
DoNotReply()
case RestoreMessages(messages) => restoreMessages(messages)
case StartMovingMessages(
destinationQueue,
destinationArn,
sourceArn,
maxNumberOfMessagesPerSecond,
queueManager
) =>
startMovingMessages(destinationQueue, destinationArn, sourceArn, maxNumberOfMessagesPerSecond, queueManager)
case CancelMovingMessages() =>
cancelMovingMessages()
case MoveFirstMessage(destinationQueue, queueManager) =>
moveFirstMessage(destinationQueue, queueManager).send()
case GetMovingMessagesTasks() =>
getMovingMessagesTasks
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,9 @@ trait QueueActorStorage {
notificationF.onComplete {
case Success(_) =>
result match {
case Some(r) => actualSender ! r
case Some(r) =>
logger.debug(s"Sending message $r from ${context.self} to $actualSender")
actualSender ! r
case None =>
}
case Failure(ex) => logger.error(s"Failed to notify queue event listener. The state may be inconsistent.", ex)
Expand Down
Loading