diff --git a/common-test/src/main/scala/org/elasticmq/test/package.scala b/common-test/src/main/scala/org/elasticmq/test/package.scala index 922c35048..54cd98c68 100644 --- a/common-test/src/main/scala/org/elasticmq/test/package.scala +++ b/common-test/src/main/scala/org/elasticmq/test/package.scala @@ -1,7 +1,6 @@ package org.elasticmq import java.io.File - import scala.util.Random package object test { @@ -30,5 +29,6 @@ package object test { dir.listFiles().filter(_.isDirectory).foreach(deleteDirRecursively) dir.listFiles().filter(!_.isDirectory).foreach(_.delete()) dir.delete() + () } } diff --git a/core/src/main/scala/org/elasticmq/FifoDeduplicationIdsHistory.scala b/core/src/main/scala/org/elasticmq/FifoDeduplicationIdsHistory.scala index d34cc8e05..985a27dfe 100644 --- a/core/src/main/scala/org/elasticmq/FifoDeduplicationIdsHistory.scala +++ b/core/src/main/scala/org/elasticmq/FifoDeduplicationIdsHistory.scala @@ -65,7 +65,7 @@ case class FifoDeduplicationIdsHistory( } object FifoDeduplicationIdsHistory { - val DeduplicationIntervalMinutes = 5 + val DeduplicationIntervalMinutes = 5L private def apply( messagesByDeduplicationId: Map[DeduplicationId, InternalMessage], diff --git a/core/src/main/scala/org/elasticmq/actor/queue/MessageQueue.scala b/core/src/main/scala/org/elasticmq/actor/queue/MessageQueue.scala index 79ec2f809..51521ac79 100644 --- a/core/src/main/scala/org/elasticmq/actor/queue/MessageQueue.scala +++ b/core/src/main/scala/org/elasticmq/actor/queue/MessageQueue.scala @@ -144,7 +144,9 @@ object MessageQueue { messageQueue.clear() } - override def remove(messageId: String): Unit = messagesById.remove(messageId) + override def remove(messageId: String): Unit = { + val _: Option[InternalMessage] = messagesById.remove(messageId) + } override def filterNot(p: InternalMessage => Boolean): MessageQueue = { val newMessageQueue = new SimpleMessageQueue @@ -180,7 +182,7 @@ object MessageQueue { messagesById += message.id -> message val messageGroupId = getMessageGroupIdUnsafe(message) val groupMessages = messagesbyMessageGroupId.getOrElseUpdate(messageGroupId, mutable.PriorityQueue.empty) - messagesbyMessageGroupId.put(messageGroupId, groupMessages += message) + val _ = messagesbyMessageGroupId.put(messageGroupId, groupMessages += message) } override def clear(): Unit = { diff --git a/core/src/main/scala/org/elasticmq/actor/queue/QueueActorQueueOps.scala b/core/src/main/scala/org/elasticmq/actor/queue/QueueActorQueueOps.scala index 041b61326..140d693cd 100644 --- a/core/src/main/scala/org/elasticmq/actor/queue/QueueActorQueueOps.scala +++ b/core/src/main/scala/org/elasticmq/actor/queue/QueueActorQueueOps.scala @@ -1,8 +1,7 @@ package org.elasticmq.actor.queue import org.elasticmq.{FifoDeduplicationIdsHistory, QueueStatistics} -import org.elasticmq.actor.reply.ReplyAction -import org.elasticmq.actor.reply.valueToReplyWith +import org.elasticmq.actor.reply.{valueToReplyWith, ReplyAction} import org.elasticmq.msg._ import org.elasticmq.util.Logging @@ -54,9 +53,9 @@ trait QueueActorQueueOps extends Logging { } private def getQueueStatistics(deliveryTime: Long) = { - var visible = 0 - var invisible = 0 - var delayed = 0 + var visible = 0L + var invisible = 0L + var delayed = 0L messageQueue.all.foreach { internalMessage => if (internalMessage.nextDelivery < deliveryTime) { diff --git a/core/src/main/scala/org/elasticmq/actor/queue/QueueActorWaitForMessagesOps.scala b/core/src/main/scala/org/elasticmq/actor/queue/QueueActorWaitForMessagesOps.scala index 8cb81d7cc..69f0bc315 100644 --- a/core/src/main/scala/org/elasticmq/actor/queue/QueueActorWaitForMessagesOps.scala +++ b/core/src/main/scala/org/elasticmq/actor/queue/QueueActorWaitForMessagesOps.scala @@ -31,7 +31,7 @@ trait QueueActorWaitForMessagesOps extends ReplyingActor with QueueActorMessageO override def receiveAndReplyMessageMsg[T](msg: QueueMessageMsg[T]): ReplyAction[T] = { msg match { - case SendMessage(message) => + case SendMessage(_) => val result = super.receiveAndReplyMessageMsg(msg) tryReply() scheduleTryReplyWhenAvailable() @@ -50,7 +50,7 @@ trait QueueActorWaitForMessagesOps extends ReplyingActor with QueueActorMessageO } else result.send() - case uvm: UpdateVisibilityTimeout => + case _: UpdateVisibilityTimeout => val result = super.receiveAndReplyMessageMsg(msg) tryReply() scheduleTryReplyWhenAvailable() @@ -91,6 +91,7 @@ trait QueueActorWaitForMessagesOps extends ReplyingActor with QueueActorMessageO private def scheduleTimeoutReply(seq: Long, waitForMessages: Duration): Unit = { schedule(waitForMessages.toMillis, ReplyIfTimeout(seq, Nil)) + () } private def scheduleTryReplyWhenAvailable(): Unit = { diff --git a/core/src/main/scala/org/elasticmq/actor/queue/ReceiveRequestAttemptCache.scala b/core/src/main/scala/org/elasticmq/actor/queue/ReceiveRequestAttemptCache.scala index e10aebbf5..6fa4fa052 100644 --- a/core/src/main/scala/org/elasticmq/actor/queue/ReceiveRequestAttemptCache.scala +++ b/core/src/main/scala/org/elasticmq/actor/queue/ReceiveRequestAttemptCache.scala @@ -1,11 +1,11 @@ package org.elasticmq.actor.queue -import scala.collection.mutable - import org.elasticmq.actor.queue.ReceiveRequestAttemptCache.ReceiveFailure import org.elasticmq.actor.queue.ReceiveRequestAttemptCache.ReceiveFailure.{Expired, Invalid} import org.elasticmq.util.NowProvider +import scala.collection.mutable + class ReceiveRequestAttemptCache { private val cache: mutable.Map[String, (Long, List[InternalMessage])] = mutable.Map.empty @@ -15,7 +15,7 @@ class ReceiveRequestAttemptCache { def add(attemptId: String, messages: List[InternalMessage])(implicit nowProvider: NowProvider): Unit = { // Cache the given attempt - cache.put(attemptId, (nowProvider.nowMillis, messages)) + val _ = cache.put(attemptId, (nowProvider.nowMillis, messages)) // Remove attempts that are older than fifteen minutes // TODO: Rather than do this on every add, maybe this could be done periodically? @@ -27,7 +27,7 @@ class ReceiveRequestAttemptCache { ): Either[ReceiveFailure, Option[List[InternalMessage]]] = { cache.get(attemptId) match { case Some((cacheTime, _)) if cacheTime + FIVE_MINUTES < nowProvider.nowMillis => - cache.remove(attemptId) + val _ = cache.remove(attemptId) Left(Expired) case Some((_, messages)) if !allValid(messages, messageQueue) => Left(Invalid) diff --git a/core/src/main/scala/org/elasticmq/actor/reply/ReplyingActor.scala b/core/src/main/scala/org/elasticmq/actor/reply/ReplyingActor.scala index c11abfa4a..fc5ff6fa4 100644 --- a/core/src/main/scala/org/elasticmq/actor/reply/ReplyingActor.scala +++ b/core/src/main/scala/org/elasticmq/actor/reply/ReplyingActor.scala @@ -2,8 +2,8 @@ package org.elasticmq.actor.reply import org.apache.pekko.actor.Actor import org.apache.pekko.actor.Status.Failure + import scala.reflect.ClassTag -import scala.language.higherKinds trait ReplyingActor extends Actor { type M[X] <: Replyable[X] diff --git a/persistence/persistence-core/src/main/scala/org/elasticmq/persistence/QueueSorter.scala b/persistence/persistence-core/src/main/scala/org/elasticmq/persistence/QueueSorter.scala index 8b018f253..76f63e894 100644 --- a/persistence/persistence-core/src/main/scala/org/elasticmq/persistence/QueueSorter.scala +++ b/persistence/persistence-core/src/main/scala/org/elasticmq/persistence/QueueSorter.scala @@ -3,7 +3,7 @@ package org.elasticmq.persistence import org.elasticmq.persistence.TopologicalSorter.Node import org.elasticmq.util.Logging -import scala.collection.mutable +import scala.collection.immutable.ListMap object QueueSorter extends Logging { @@ -22,7 +22,7 @@ object QueueSorter extends Logging { private def createReferencedQueuesEdges( nodes: List[CreateQueueMetadata] ): Map[CreateQueueMetadata, Set[CreateQueueMetadata]] = { - val edges = new mutable.ListMap[CreateQueueMetadata, Set[CreateQueueMetadata]] + var edges = new ListMap[CreateQueueMetadata, Set[CreateQueueMetadata]] // create map to look up queues by name val queueMap = nodes.map { cq => cq.name -> cq }.toMap @@ -43,7 +43,7 @@ object QueueSorter extends Logging { queueMap.get(queueName) match { case Some(queue) => val edgesForNode = edges.getOrElse(cq, Set.empty) - edges.put(cq, edgesForNode + queue) + edges = edges.updated(cq, edgesForNode + queue) case None => logger.error("{} queue {} not found", label, queueName) } } diff --git a/persistence/persistence-file/src/main/scala/org/elasticmq/persistence/file/ConfigBasedQueuePersistenceActor.scala b/persistence/persistence-file/src/main/scala/org/elasticmq/persistence/file/ConfigBasedQueuePersistenceActor.scala index d3716ddd3..ab9d7ccfb 100644 --- a/persistence/persistence-file/src/main/scala/org/elasticmq/persistence/file/ConfigBasedQueuePersistenceActor.scala +++ b/persistence/persistence-file/src/main/scala/org/elasticmq/persistence/file/ConfigBasedQueuePersistenceActor.scala @@ -2,16 +2,16 @@ package org.elasticmq.persistence.file import org.apache.pekko.actor.{Actor, ActorRef} import org.apache.pekko.util.Timeout +import org.elasticmq.{ElasticMQError, QueueData} import org.elasticmq.actor.queue._ import org.elasticmq.actor.reply._ import org.elasticmq.msg.CreateQueue import org.elasticmq.persistence.CreateQueueMetadata import org.elasticmq.util.Logging -import org.elasticmq.{ElasticMQError, QueueData} import scala.collection.mutable -import scala.concurrent.duration._ import scala.concurrent.{ExecutionContext, Future} +import scala.concurrent.duration._ import scala.util.{Failure, Success} class ConfigBasedQueuePersistenceActor(storagePath: String, baseQueues: List[CreateQueueMetadata]) @@ -25,15 +25,15 @@ class ConfigBasedQueuePersistenceActor(storagePath: String, baseQueues: List[Cre def receive: Receive = { case QueueEvent.QueueCreated(queue) => - queues.put(queue.name, queue) + val _: Option[QueueData] = queues.put(queue.name, queue) QueuePersister.saveToConfigFile(queues.values.toList, storagePath) case QueueEvent.QueueDeleted(queueName) => - queues.remove(queueName) + val _: Option[QueueData] = queues.remove(queueName) QueuePersister.saveToConfigFile(queues.values.toList, storagePath) case QueueEvent.QueueMetadataUpdated(queue) => - queues.put(queue.name, queue) + val _: Option[QueueData] = queues.put(queue.name, queue) QueuePersister.saveToConfigFile(queues.values.toList, storagePath) case _: QueueEvent.MessageAdded | _: QueueEvent.MessageUpdated | _: QueueEvent.MessageRemoved => diff --git a/persistence/persistence-file/src/main/scala/org/elasticmq/persistence/file/QueueConfigUtil.scala b/persistence/persistence-file/src/main/scala/org/elasticmq/persistence/file/QueueConfigUtil.scala index 00cf8055e..24021d47f 100644 --- a/persistence/persistence-file/src/main/scala/org/elasticmq/persistence/file/QueueConfigUtil.scala +++ b/persistence/persistence-file/src/main/scala/org/elasticmq/persistence/file/QueueConfigUtil.scala @@ -17,7 +17,7 @@ object QueueConfigUtil extends Logging { .map(readPersistedQueuesFromConfig) .getOrElse(Nil) - def readPersistedQueuesFromConfig(persistedQueuesConfig: Config): List[CreateQueueMetadata] = + def readPersistedQueuesFromConfig(persistedQueuesConfig: Config): List[CreateQueueMetadata] = { Try( persistedQueuesConfig .getObject("queues") @@ -30,6 +30,7 @@ object QueueConfigUtil extends Logging { throw new IllegalStateException(ex) } } + } private def getQueuesFromConfig(queuesConfig: Map[String, ConfigValue]): List[CreateQueueMetadata] = { Try(getQueuesFromConfigUnsafe(queuesConfig)) match { diff --git a/persistence/persistence-file/src/main/scala/org/elasticmq/persistence/file/QueuePersister.scala b/persistence/persistence-file/src/main/scala/org/elasticmq/persistence/file/QueuePersister.scala index f2f42e810..036ae6cfe 100644 --- a/persistence/persistence-file/src/main/scala/org/elasticmq/persistence/file/QueuePersister.scala +++ b/persistence/persistence-file/src/main/scala/org/elasticmq/persistence/file/QueuePersister.scala @@ -41,7 +41,7 @@ object QueuePersister { def saveToConfigFile(queues: List[QueueData], storagePath: String): Unit = { val queuesConfig: String = prepareQueuesConfig(queues) - new PrintWriter(storagePath) { + val _: PrintWriter = new PrintWriter(storagePath) { write(queuesConfig) close() } diff --git a/persistence/persistence-sql/src/main/scala/org/elasticmq/persistence/sql/DBMessage.scala b/persistence/persistence-sql/src/main/scala/org/elasticmq/persistence/sql/DBMessage.scala index cd61bac72..d2b02c7c4 100644 --- a/persistence/persistence-sql/src/main/scala/org/elasticmq/persistence/sql/DBMessage.scala +++ b/persistence/persistence-sql/src/main/scala/org/elasticmq/persistence/sql/DBMessage.scala @@ -56,7 +56,7 @@ case class DBMessage( isFifo = false, groupId, deduplicationId.map(id => DeduplicationId(id)), - tracingId.map(TracingId), + tracingId.map(TracingId.apply), sequenceNumber ) } diff --git a/persistence/persistence-sql/src/main/scala/org/elasticmq/persistence/sql/MessageRepository.scala b/persistence/persistence-sql/src/main/scala/org/elasticmq/persistence/sql/MessageRepository.scala index 1ecb82493..ef2748c7e 100644 --- a/persistence/persistence-sql/src/main/scala/org/elasticmq/persistence/sql/MessageRepository.scala +++ b/persistence/persistence-sql/src/main/scala/org/elasticmq/persistence/sql/MessageRepository.scala @@ -14,7 +14,7 @@ class MessageRepository(queueName: String, db: DB) extends Logging { if (db.persistenceConfig.pruneDataOnInit) { logger.debug(s"Deleting stored messages for queue $queueName") - sql"drop table if exists $tableName".execute.apply() + val _: Boolean = sql"drop table if exists $tableName".execute.apply() } sql""" @@ -34,7 +34,7 @@ class MessageRepository(queueName: String, db: DB) extends Logging { )""".execute.apply() def drop(): Unit = { - sql"drop table if exists $tableName".execute.apply() + val _: Boolean = sql"drop table if exists $tableName".execute.apply() } def findAll(): List[InternalMessage] = { diff --git a/persistence/persistence-sql/src/main/scala/org/elasticmq/persistence/sql/QueueRepository.scala b/persistence/persistence-sql/src/main/scala/org/elasticmq/persistence/sql/QueueRepository.scala index 46aa424c6..830125c97 100644 --- a/persistence/persistence-sql/src/main/scala/org/elasticmq/persistence/sql/QueueRepository.scala +++ b/persistence/persistence-sql/src/main/scala/org/elasticmq/persistence/sql/QueueRepository.scala @@ -12,7 +12,7 @@ class QueueRepository(db: DB) extends Logging { if (db.persistenceConfig.pruneDataOnInit) { logger.debug(s"Deleting stored queues") - sql"drop table if exists $tableName".execute.apply() + val _: Boolean = sql"drop table if exists $tableName".execute.apply() } sql""" @@ -22,7 +22,7 @@ class QueueRepository(db: DB) extends Logging { )""".execute.apply() def drop(): Unit = { - sql"drop table if exists $tableName".execute.apply() + val _: Boolean = sql"drop table if exists $tableName".execute.apply() } def findAll(): List[CreateQueueMetadata] = { diff --git a/persistence/persistence-sql/src/main/scala/org/elasticmq/persistence/sql/SqlQueuePersistenceActor.scala b/persistence/persistence-sql/src/main/scala/org/elasticmq/persistence/sql/SqlQueuePersistenceActor.scala index 14c29b699..e71d295d4 100644 --- a/persistence/persistence-sql/src/main/scala/org/elasticmq/persistence/sql/SqlQueuePersistenceActor.scala +++ b/persistence/persistence-sql/src/main/scala/org/elasticmq/persistence/sql/SqlQueuePersistenceActor.scala @@ -10,8 +10,8 @@ import org.elasticmq.persistence.CreateQueueMetadata import org.elasticmq.util.Logging import scala.collection.mutable -import scala.concurrent.duration.DurationInt import scala.concurrent.{ExecutionContext, Future} +import scala.concurrent.duration.DurationInt import scala.util.{Failure, Success} case class GetAllMessages(queueName: String) extends Replyable[List[InternalMessage]] @@ -34,15 +34,16 @@ class SqlQueuePersistenceActor( logger.debug(s"Storing queue data: $queueData") if (repos.contains(queueData.name)) { - queueRepo.update(CreateQueueMetadata.from(queueData)) + val _: Int = queueRepo.update(CreateQueueMetadata.from(queueData)) } else { queueRepo.add(CreateQueueMetadata.from(queueData)) repos.put(queueData.name, new MessageRepository(queueData.name, db)) + () } case QueueEvent.QueueDeleted(queueName) => logger.debug(s"Removing queue data for queue $queueName") - queueRepo.remove(queueName) + val _: Int = queueRepo.remove(queueName) repos.remove(queueName).foreach(_.drop()) case QueueEvent.QueueMetadataUpdated(queueData) => @@ -50,6 +51,7 @@ class SqlQueuePersistenceActor( logger.debug(s"Updating queue: $queueData") } queueRepo.update(CreateQueueMetadata.from(queueData)) + () case QueueEvent.MessageAdded(queueName, message) => logger.whenDebugEnabled { @@ -85,7 +87,7 @@ class SqlQueuePersistenceActor( private def createQueues( queueManagerActor: ActorRef - )(implicit timeout: Timeout): Future[Either[List[ElasticMQError], OperationStatus]] = { + ): Future[Either[List[ElasticMQError], OperationStatus]] = { val persistedQueues = queueRepo.findAll() val allQueues = CreateQueueMetadata.mergePersistedAndBaseQueues(persistedQueues, baseQueues) diff --git a/persistence/persistence-sql/src/main/scala/org/elasticmq/persistence/sql/package.scala b/persistence/persistence-sql/src/main/scala/org/elasticmq/persistence/sql/package.scala index 4d35edef1..d51075654 100644 --- a/persistence/persistence-sql/src/main/scala/org/elasticmq/persistence/sql/package.scala +++ b/persistence/persistence-sql/src/main/scala/org/elasticmq/persistence/sql/package.scala @@ -1,26 +1,24 @@ -package org.elasticmq.persistence +package org.elasticmq.persistence.sql +import org.elasticmq.persistence.{CreateQueueMetadata, DeadLettersQueue} import spray.json.{DefaultJsonProtocol, JsonFormat} -package object sql { +case class SerializableAttribute( + key: String, + primaryDataType: String, + stringValue: String, + customType: Option[String] +) - case class SerializableAttribute( - key: String, - primaryDataType: String, - stringValue: String, - customType: Option[String] - ) - - object SerializableAttributeProtocol extends DefaultJsonProtocol { - implicit val colorFormat: JsonFormat[SerializableAttribute] = jsonFormat4(SerializableAttribute) - } +object SerializableAttributeProtocol extends DefaultJsonProtocol { + implicit val colorFormat: JsonFormat[SerializableAttribute] = jsonFormat4(SerializableAttribute.apply) +} - object DeadLettersQueueProtocol extends DefaultJsonProtocol { - implicit val DeadLettersQueueFormat: JsonFormat[DeadLettersQueue] = jsonFormat2(DeadLettersQueue) - } +object DeadLettersQueueProtocol extends DefaultJsonProtocol { + implicit val DeadLettersQueueFormat: JsonFormat[DeadLettersQueue] = jsonFormat2(DeadLettersQueue.apply) +} - import DeadLettersQueueProtocol._ +import org.elasticmq.persistence.sql.DeadLettersQueueProtocol._ - object CreateQueueProtocol extends DefaultJsonProtocol { - implicit val CreateQueueFormat: JsonFormat[CreateQueueMetadata] = jsonFormat12(CreateQueueMetadata.apply) - } +object CreateQueueProtocol extends DefaultJsonProtocol { + implicit val CreateQueueFormat: JsonFormat[CreateQueueMetadata] = jsonFormat12(CreateQueueMetadata.apply) } diff --git a/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/AttributesModule.scala b/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/AttributesModule.scala index 3fa66e10a..6b8a82f79 100644 --- a/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/AttributesModule.scala +++ b/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/AttributesModule.scala @@ -1,6 +1,6 @@ package org.elasticmq.rest.sqs -import org.elasticmq.{NumberMessageAttribute, BinaryMessageAttribute, StringMessageAttribute, MessageAttribute} +import org.elasticmq.{BinaryMessageAttribute, MessageAttribute, NumberMessageAttribute, StringMessageAttribute} trait AttributesModule { val attributeNamesReader = new AttributeNamesReader diff --git a/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/BatchRequestsModule.scala b/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/BatchRequestsModule.scala index 619a22126..06d42b6b3 100644 --- a/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/BatchRequestsModule.scala +++ b/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/BatchRequestsModule.scala @@ -1,12 +1,11 @@ package org.elasticmq.rest.sqs -import Constants.{EmptyRequestId, QueueUrlParameter} +import org.elasticmq.rest.sqs.Constants.QueueUrlParameter import org.elasticmq.Limits - -import java.util.regex.Pattern -import spray.json.DefaultJsonProtocol._ import spray.json.{JsonFormat, RootJsonFormat} +import spray.json.DefaultJsonProtocol._ +import java.util.regex.Pattern import scala.concurrent.Future import scala.xml.Elem @@ -40,7 +39,7 @@ trait BatchRequestsModule { Future .sequence(result) .map( - _.foldLeft(Option.empty[List[Failed]], List.empty[R]) { + _.foldLeft((Option.empty[List[Failed]], List.empty[R])) { case ((failures, successes), Left(failed)) => (failures.map(_ :+ failed).orElse(Some(List(failed))), successes) case ((failures, successes), Right(success)) => (failures, successes :+ success) diff --git a/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/ChangeMessageVisibilityBatchDirectives.scala b/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/ChangeMessageVisibilityBatchDirectives.scala index 21318aab1..87d903f21 100644 --- a/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/ChangeMessageVisibilityBatchDirectives.scala +++ b/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/ChangeMessageVisibilityBatchDirectives.scala @@ -1,14 +1,15 @@ package org.elasticmq.rest.sqs -import Constants._ import org.elasticmq.{DeliveryReceipt, MillisVisibilityTimeout} -import org.elasticmq.msg.UpdateVisibilityTimeout import org.elasticmq.actor.reply._ +import org.elasticmq.msg.UpdateVisibilityTimeout import org.elasticmq.rest.sqs.Action.ChangeMessageVisibilityBatch +import org.elasticmq.rest.sqs.Constants._ import org.elasticmq.rest.sqs.directives.ElasticMQDirectives import org.elasticmq.rest.sqs.model.RequestPayload import spray.json.DefaultJsonProtocol._ import spray.json.RootJsonFormat + import scala.concurrent.Future import scala.xml.Elem diff --git a/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/ChangeMessageVisibilityDirectives.scala b/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/ChangeMessageVisibilityDirectives.scala index 0fb6fb8d8..f53d26073 100644 --- a/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/ChangeMessageVisibilityDirectives.scala +++ b/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/ChangeMessageVisibilityDirectives.scala @@ -1,16 +1,14 @@ package org.elasticmq.rest.sqs -import org.apache.pekko.http.scaladsl.model.HttpEntity +import org.elasticmq.{DeliveryReceipt, MillisVisibilityTimeout} import org.elasticmq.actor.reply._ import org.elasticmq.msg.UpdateVisibilityTimeout import org.elasticmq.rest.sqs.Action.ChangeMessageVisibility import org.elasticmq.rest.sqs.Constants._ import org.elasticmq.rest.sqs.directives.ElasticMQDirectives -import org.elasticmq.{DeliveryReceipt, MillisVisibilityTimeout} -import spray.json.RootJsonFormat -import spray.json.DefaultJsonProtocol._ -import org.apache.pekko.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._ import org.elasticmq.rest.sqs.model.RequestPayload +import spray.json.DefaultJsonProtocol._ +import spray.json.RootJsonFormat trait ChangeMessageVisibilityDirectives { this: ElasticMQDirectives with ResponseMarshaller => def changeMessageVisibility(p: RequestPayload)(implicit marshallerDependencies: MarshallerDependencies) = { @@ -31,7 +29,7 @@ trait ChangeMessageVisibilityDirectives { this: ElasticMQDirectives with Respons } } - case class ChangeMessageVisibilityActionRequest(QueueUrl: String, ReceiptHandle: String, VisibilityTimeout: Int) + case class ChangeMessageVisibilityActionRequest(QueueUrl: String, ReceiptHandle: String, VisibilityTimeout: Long) object ChangeMessageVisibilityActionRequest { implicit val requestJsonFormat: RootJsonFormat[ChangeMessageVisibilityActionRequest] = jsonFormat3( @@ -43,7 +41,7 @@ trait ChangeMessageVisibilityDirectives { this: ElasticMQDirectives with Respons override def read(params: Map[String, String]): ChangeMessageVisibilityActionRequest = { val queueUrl = requiredParameter(params)(QueueUrlParameter) val receiptHandle = requiredParameter(params)(ReceiptHandleParameter) - val visibilityTimeout = requiredParameter(params)(VisibilityTimeoutParameter).toInt + val visibilityTimeout = requiredParameter(params)(VisibilityTimeoutParameter).toLong ChangeMessageVisibilityActionRequest(queueUrl, receiptHandle, visibilityTimeout) } } diff --git a/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/DeleteQueueDirectives.scala b/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/DeleteQueueDirectives.scala index 42d58f86f..ef4ec2a78 100644 --- a/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/DeleteQueueDirectives.scala +++ b/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/DeleteQueueDirectives.scala @@ -1,16 +1,16 @@ package org.elasticmq.rest.sqs -import Constants._ import org.elasticmq.actor.reply._ - -import scala.async.Async._ import org.elasticmq.msg.DeleteQueue import org.elasticmq.rest.sqs.Action.{DeleteQueue => DeleteQueueAction} +import org.elasticmq.rest.sqs.Constants._ import org.elasticmq.rest.sqs.directives.ElasticMQDirectives import org.elasticmq.rest.sqs.model.RequestPayload import spray.json.DefaultJsonProtocol._ import spray.json.RootJsonFormat +import scala.async.Async._ + trait DeleteQueueDirectives { this: ElasticMQDirectives with QueueURLModule with ResponseMarshaller => def deleteQueue(p: RequestPayload)(implicit marshallerDependencies: MarshallerDependencies) = { p.action(DeleteQueueAction) { diff --git a/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/ReceiveMessageDirectives.scala b/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/ReceiveMessageDirectives.scala index 565a0732e..0b19413f7 100644 --- a/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/ReceiveMessageDirectives.scala +++ b/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/ReceiveMessageDirectives.scala @@ -44,7 +44,7 @@ trait ReceiveMessageDirectives { p.action(ReceiveMessage) { val requestParameters = p.as[ReceiveMessageActionRequest] queueActorAndDataFromQueueUrl(requestParameters.QueueUrl) { (queueActor, queueData) => - val visibilityTimeoutParameterOpt: Option[Int] = requestParameters.VisibilityTimeout + val visibilityTimeoutParameterOpt: Option[Long] = requestParameters.VisibilityTimeout val maxNumberOfMessagesAttributeOpt: Option[Int] = requestParameters.MaxNumberOfMessages val waitTimeSecondsAttributeOpt: Option[Long] = requestParameters.WaitTimeSeconds @@ -162,7 +162,7 @@ trait ReceiveMessageDirectives { MessageAttributeNames: Option[List[String]], QueueUrl: String, ReceiveRequestAttemptId: Option[String], - VisibilityTimeout: Option[Int], + VisibilityTimeout: Option[Long], WaitTimeSeconds: Option[Long] ) @@ -173,7 +173,7 @@ trait ReceiveMessageDirectives { MessageAttributeNames: Option[List[String]], QueueUrl: String, ReceiveRequestAttemptId: Option[String], - VisibilityTimeout: Option[Int], + VisibilityTimeout: Option[Long], WaitTimeSeconds: Option[Long] ): ReceiveMessageActionRequest = { new ReceiveMessageActionRequest( @@ -200,7 +200,7 @@ trait ReceiveMessageDirectives { val messageAttributeNames = getMessageAttributeNames(params).toList val queueUrl = requiredParameter(params)(QueueUrlParameter) val receiveRequestAttemptId = params.get(MessageReadeableAttributeNames.ReceiveRequestAttemptIdAttribute) - val visibilityTimeout = params.get(VisibilityTimeoutParameter).map(_.toInt) + val visibilityTimeout = params.get(VisibilityTimeoutParameter).map(_.toLong) val waitTimeSeconds = params.get(MessageReadeableAttributeNames.WaitTimeSecondsAttribute).map(_.toLong) ReceiveMessageActionRequest( Some(attributeNames), diff --git a/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/SQSRestServerBuilder.scala b/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/SQSRestServerBuilder.scala index 2dff116fd..4ae9fcb74 100644 --- a/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/SQSRestServerBuilder.scala +++ b/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/SQSRestServerBuilder.scala @@ -1,24 +1,18 @@ package org.elasticmq.rest.sqs +import com.typesafe.config.ConfigFactory import org.apache.pekko.actor.{ActorRef, ActorSystem, Props} import org.apache.pekko.http.scaladsl.Http -import org.apache.pekko.http.scaladsl.server.{Directive1, Directives} -import org.apache.pekko.stream.ActorMaterializer +import org.apache.pekko.http.scaladsl.server.Directive1 +import org.apache.pekko.stream.Materializer import org.apache.pekko.util.Timeout -import com.typesafe.config.ConfigFactory import org.elasticmq._ import org.elasticmq.actor.QueueManagerActor import org.elasticmq.metrics.QueuesMetrics import org.elasticmq.rest.sqs.Constants._ import org.elasticmq.rest.sqs.XmlNsVersion.extractXmlNs -import org.elasticmq.rest.sqs.directives.{ - AWSProtocolDirectives, - AnyParamDirectives, - ElasticMQDirectives, - UnmatchedActionRoutes -} +import org.elasticmq.rest.sqs.directives.{AnyParamDirectives, AWSProtocolDirectives, ElasticMQDirectives, UnmatchedActionRoutes} import org.elasticmq.rest.sqs.model.RequestPayload -import org.elasticmq.rest.sqs.{AWSProtocol, XmlNsVersion} import org.elasticmq.util.{Logging, NowProvider} import java.io.ByteArrayOutputStream @@ -30,10 +24,10 @@ import java.util.concurrent.atomic.AtomicReference import javax.management.ObjectName import scala.collection.immutable.TreeMap import scala.collection.mutable.ArrayBuffer -import scala.concurrent.duration._ import scala.concurrent.{Await, Future} -import scala.util.control.NonFatal +import scala.concurrent.duration._ import scala.util.{Failure, Success, Try} +import scala.util.control.NonFatal import scala.xml._ /** By default: