Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package org.elasticmq

import java.io.File

import scala.util.Random

package object test {
Expand Down Expand Up @@ -30,5 +29,6 @@ package object test {
dir.listFiles().filter(_.isDirectory).foreach(deleteDirRecursively)
dir.listFiles().filter(!_.isDirectory).foreach(_.delete())
dir.delete()
()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ case class FifoDeduplicationIdsHistory(
}

object FifoDeduplicationIdsHistory {
val DeduplicationIntervalMinutes = 5
val DeduplicationIntervalMinutes = 5L

private def apply(
messagesByDeduplicationId: Map[DeduplicationId, InternalMessage],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,9 @@ object MessageQueue {
messageQueue.clear()
}

override def remove(messageId: String): Unit = messagesById.remove(messageId)
override def remove(messageId: String): Unit = {
val _: Option[InternalMessage] = messagesById.remove(messageId)
}

override def filterNot(p: InternalMessage => Boolean): MessageQueue = {
val newMessageQueue = new SimpleMessageQueue
Expand Down Expand Up @@ -180,7 +182,7 @@ object MessageQueue {
messagesById += message.id -> message
val messageGroupId = getMessageGroupIdUnsafe(message)
val groupMessages = messagesbyMessageGroupId.getOrElseUpdate(messageGroupId, mutable.PriorityQueue.empty)
messagesbyMessageGroupId.put(messageGroupId, groupMessages += message)
val _ = messagesbyMessageGroupId.put(messageGroupId, groupMessages += message)
}

override def clear(): Unit = {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
package org.elasticmq.actor.queue

import org.elasticmq.{FifoDeduplicationIdsHistory, QueueStatistics}
import org.elasticmq.actor.reply.ReplyAction
import org.elasticmq.actor.reply.valueToReplyWith
import org.elasticmq.actor.reply.{valueToReplyWith, ReplyAction}
import org.elasticmq.msg._
import org.elasticmq.util.Logging

Expand Down Expand Up @@ -54,9 +53,9 @@ trait QueueActorQueueOps extends Logging {
}

private def getQueueStatistics(deliveryTime: Long) = {
var visible = 0
var invisible = 0
var delayed = 0
var visible = 0L
var invisible = 0L
var delayed = 0L

messageQueue.all.foreach { internalMessage =>
if (internalMessage.nextDelivery < deliveryTime) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ trait QueueActorWaitForMessagesOps extends ReplyingActor with QueueActorMessageO

override def receiveAndReplyMessageMsg[T](msg: QueueMessageMsg[T]): ReplyAction[T] = {
msg match {
case SendMessage(message) =>
case SendMessage(_) =>
val result = super.receiveAndReplyMessageMsg(msg)
tryReply()
scheduleTryReplyWhenAvailable()
Expand All @@ -50,7 +50,7 @@ trait QueueActorWaitForMessagesOps extends ReplyingActor with QueueActorMessageO
} else
result.send()

case uvm: UpdateVisibilityTimeout =>
case _: UpdateVisibilityTimeout =>
val result = super.receiveAndReplyMessageMsg(msg)
tryReply()
scheduleTryReplyWhenAvailable()
Expand Down Expand Up @@ -91,6 +91,7 @@ trait QueueActorWaitForMessagesOps extends ReplyingActor with QueueActorMessageO

private def scheduleTimeoutReply(seq: Long, waitForMessages: Duration): Unit = {
schedule(waitForMessages.toMillis, ReplyIfTimeout(seq, Nil))
()
}

private def scheduleTryReplyWhenAvailable(): Unit = {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package org.elasticmq.actor.queue

import scala.collection.mutable

import org.elasticmq.actor.queue.ReceiveRequestAttemptCache.ReceiveFailure
import org.elasticmq.actor.queue.ReceiveRequestAttemptCache.ReceiveFailure.{Expired, Invalid}
import org.elasticmq.util.NowProvider

import scala.collection.mutable

class ReceiveRequestAttemptCache {

private val cache: mutable.Map[String, (Long, List[InternalMessage])] = mutable.Map.empty
Expand All @@ -15,7 +15,7 @@ class ReceiveRequestAttemptCache {

def add(attemptId: String, messages: List[InternalMessage])(implicit nowProvider: NowProvider): Unit = {
// Cache the given attempt
cache.put(attemptId, (nowProvider.nowMillis, messages))
val _ = cache.put(attemptId, (nowProvider.nowMillis, messages))

// Remove attempts that are older than fifteen minutes
// TODO: Rather than do this on every add, maybe this could be done periodically?
Expand All @@ -27,7 +27,7 @@ class ReceiveRequestAttemptCache {
): Either[ReceiveFailure, Option[List[InternalMessage]]] = {
cache.get(attemptId) match {
case Some((cacheTime, _)) if cacheTime + FIVE_MINUTES < nowProvider.nowMillis =>
cache.remove(attemptId)
val _ = cache.remove(attemptId)
Left(Expired)
case Some((_, messages)) if !allValid(messages, messageQueue) =>
Left(Invalid)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ package org.elasticmq.actor.reply

import org.apache.pekko.actor.Actor
import org.apache.pekko.actor.Status.Failure

import scala.reflect.ClassTag
import scala.language.higherKinds

trait ReplyingActor extends Actor {
type M[X] <: Replyable[X]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package org.elasticmq.persistence
import org.elasticmq.persistence.TopologicalSorter.Node
import org.elasticmq.util.Logging

import scala.collection.mutable
import scala.collection.immutable.ListMap

object QueueSorter extends Logging {

Expand All @@ -22,7 +22,7 @@ object QueueSorter extends Logging {
private def createReferencedQueuesEdges(
nodes: List[CreateQueueMetadata]
): Map[CreateQueueMetadata, Set[CreateQueueMetadata]] = {
val edges = new mutable.ListMap[CreateQueueMetadata, Set[CreateQueueMetadata]]
var edges = new ListMap[CreateQueueMetadata, Set[CreateQueueMetadata]]

// create map to look up queues by name
val queueMap = nodes.map { cq => cq.name -> cq }.toMap
Expand All @@ -43,7 +43,7 @@ object QueueSorter extends Logging {
queueMap.get(queueName) match {
case Some(queue) =>
val edgesForNode = edges.getOrElse(cq, Set.empty)
edges.put(cq, edgesForNode + queue)
edges = edges.updated(cq, edgesForNode + queue)
case None => logger.error("{} queue {} not found", label, queueName)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,16 @@ package org.elasticmq.persistence.file

import org.apache.pekko.actor.{Actor, ActorRef}
import org.apache.pekko.util.Timeout
import org.elasticmq.{ElasticMQError, QueueData}
import org.elasticmq.actor.queue._
import org.elasticmq.actor.reply._
import org.elasticmq.msg.CreateQueue
import org.elasticmq.persistence.CreateQueueMetadata
import org.elasticmq.util.Logging
import org.elasticmq.{ElasticMQError, QueueData}

import scala.collection.mutable
import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.{Failure, Success}

class ConfigBasedQueuePersistenceActor(storagePath: String, baseQueues: List[CreateQueueMetadata])
Expand All @@ -25,15 +25,15 @@ class ConfigBasedQueuePersistenceActor(storagePath: String, baseQueues: List[Cre

def receive: Receive = {
case QueueEvent.QueueCreated(queue) =>
queues.put(queue.name, queue)
val _: Option[QueueData] = queues.put(queue.name, queue)
QueuePersister.saveToConfigFile(queues.values.toList, storagePath)

case QueueEvent.QueueDeleted(queueName) =>
queues.remove(queueName)
val _: Option[QueueData] = queues.remove(queueName)
QueuePersister.saveToConfigFile(queues.values.toList, storagePath)

case QueueEvent.QueueMetadataUpdated(queue) =>
queues.put(queue.name, queue)
val _: Option[QueueData] = queues.put(queue.name, queue)
QueuePersister.saveToConfigFile(queues.values.toList, storagePath)

case _: QueueEvent.MessageAdded | _: QueueEvent.MessageUpdated | _: QueueEvent.MessageRemoved =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ object QueueConfigUtil extends Logging {
.map(readPersistedQueuesFromConfig)
.getOrElse(Nil)

def readPersistedQueuesFromConfig(persistedQueuesConfig: Config): List[CreateQueueMetadata] =
def readPersistedQueuesFromConfig(persistedQueuesConfig: Config): List[CreateQueueMetadata] = {
Try(
persistedQueuesConfig
.getObject("queues")
Expand All @@ -30,6 +30,7 @@ object QueueConfigUtil extends Logging {
throw new IllegalStateException(ex)
}
}
}

private def getQueuesFromConfig(queuesConfig: Map[String, ConfigValue]): List[CreateQueueMetadata] = {
Try(getQueuesFromConfigUnsafe(queuesConfig)) match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ object QueuePersister {

def saveToConfigFile(queues: List[QueueData], storagePath: String): Unit = {
val queuesConfig: String = prepareQueuesConfig(queues)
new PrintWriter(storagePath) {
val _: PrintWriter = new PrintWriter(storagePath) {
write(queuesConfig)
close()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ case class DBMessage(
isFifo = false,
groupId,
deduplicationId.map(id => DeduplicationId(id)),
tracingId.map(TracingId),
tracingId.map(TracingId.apply),
sequenceNumber
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ class MessageRepository(queueName: String, db: DB) extends Logging {

if (db.persistenceConfig.pruneDataOnInit) {
logger.debug(s"Deleting stored messages for queue $queueName")
sql"drop table if exists $tableName".execute.apply()
val _: Boolean = sql"drop table if exists $tableName".execute.apply()
}

sql"""
Expand All @@ -34,7 +34,7 @@ class MessageRepository(queueName: String, db: DB) extends Logging {
)""".execute.apply()

def drop(): Unit = {
sql"drop table if exists $tableName".execute.apply()
val _: Boolean = sql"drop table if exists $tableName".execute.apply()
}

def findAll(): List[InternalMessage] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ class QueueRepository(db: DB) extends Logging {

if (db.persistenceConfig.pruneDataOnInit) {
logger.debug(s"Deleting stored queues")
sql"drop table if exists $tableName".execute.apply()
val _: Boolean = sql"drop table if exists $tableName".execute.apply()
}

sql"""
Expand All @@ -22,7 +22,7 @@ class QueueRepository(db: DB) extends Logging {
)""".execute.apply()

def drop(): Unit = {
sql"drop table if exists $tableName".execute.apply()
val _: Boolean = sql"drop table if exists $tableName".execute.apply()
}

def findAll(): List[CreateQueueMetadata] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ import org.elasticmq.persistence.CreateQueueMetadata
import org.elasticmq.util.Logging

import scala.collection.mutable
import scala.concurrent.duration.DurationInt
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration.DurationInt
import scala.util.{Failure, Success}

case class GetAllMessages(queueName: String) extends Replyable[List[InternalMessage]]
Expand All @@ -34,22 +34,24 @@ class SqlQueuePersistenceActor(
logger.debug(s"Storing queue data: $queueData")

if (repos.contains(queueData.name)) {
queueRepo.update(CreateQueueMetadata.from(queueData))
val _: Int = queueRepo.update(CreateQueueMetadata.from(queueData))
} else {
queueRepo.add(CreateQueueMetadata.from(queueData))
repos.put(queueData.name, new MessageRepository(queueData.name, db))
()
}

case QueueEvent.QueueDeleted(queueName) =>
logger.debug(s"Removing queue data for queue $queueName")
queueRepo.remove(queueName)
val _: Int = queueRepo.remove(queueName)
repos.remove(queueName).foreach(_.drop())

case QueueEvent.QueueMetadataUpdated(queueData) =>
logger.whenDebugEnabled {
logger.debug(s"Updating queue: $queueData")
}
queueRepo.update(CreateQueueMetadata.from(queueData))
()

case QueueEvent.MessageAdded(queueName, message) =>
logger.whenDebugEnabled {
Expand Down Expand Up @@ -85,7 +87,7 @@ class SqlQueuePersistenceActor(

private def createQueues(
queueManagerActor: ActorRef
)(implicit timeout: Timeout): Future[Either[List[ElasticMQError], OperationStatus]] = {
): Future[Either[List[ElasticMQError], OperationStatus]] = {
val persistedQueues = queueRepo.findAll()
val allQueues = CreateQueueMetadata.mergePersistedAndBaseQueues(persistedQueues, baseQueues)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,26 +1,24 @@
package org.elasticmq.persistence
package org.elasticmq.persistence.sql
import org.elasticmq.persistence.{CreateQueueMetadata, DeadLettersQueue}
import spray.json.{DefaultJsonProtocol, JsonFormat}

package object sql {
case class SerializableAttribute(
key: String,
primaryDataType: String,
stringValue: String,
customType: Option[String]
)

case class SerializableAttribute(
key: String,
primaryDataType: String,
stringValue: String,
customType: Option[String]
)

object SerializableAttributeProtocol extends DefaultJsonProtocol {
implicit val colorFormat: JsonFormat[SerializableAttribute] = jsonFormat4(SerializableAttribute)
}
object SerializableAttributeProtocol extends DefaultJsonProtocol {
implicit val colorFormat: JsonFormat[SerializableAttribute] = jsonFormat4(SerializableAttribute.apply)
}

object DeadLettersQueueProtocol extends DefaultJsonProtocol {
implicit val DeadLettersQueueFormat: JsonFormat[DeadLettersQueue] = jsonFormat2(DeadLettersQueue)
}
object DeadLettersQueueProtocol extends DefaultJsonProtocol {
implicit val DeadLettersQueueFormat: JsonFormat[DeadLettersQueue] = jsonFormat2(DeadLettersQueue.apply)
}

import DeadLettersQueueProtocol._
import org.elasticmq.persistence.sql.DeadLettersQueueProtocol._

object CreateQueueProtocol extends DefaultJsonProtocol {
implicit val CreateQueueFormat: JsonFormat[CreateQueueMetadata] = jsonFormat12(CreateQueueMetadata.apply)
}
object CreateQueueProtocol extends DefaultJsonProtocol {
implicit val CreateQueueFormat: JsonFormat[CreateQueueMetadata] = jsonFormat12(CreateQueueMetadata.apply)
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package org.elasticmq.rest.sqs

import org.elasticmq.{NumberMessageAttribute, BinaryMessageAttribute, StringMessageAttribute, MessageAttribute}
import org.elasticmq.{BinaryMessageAttribute, MessageAttribute, NumberMessageAttribute, StringMessageAttribute}

trait AttributesModule {
val attributeNamesReader = new AttributeNamesReader
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
package org.elasticmq.rest.sqs

import Constants.{EmptyRequestId, QueueUrlParameter}
import org.elasticmq.rest.sqs.Constants.QueueUrlParameter
import org.elasticmq.Limits

import java.util.regex.Pattern
import spray.json.DefaultJsonProtocol._
import spray.json.{JsonFormat, RootJsonFormat}
import spray.json.DefaultJsonProtocol._

import java.util.regex.Pattern
import scala.concurrent.Future
import scala.xml.Elem

Expand Down Expand Up @@ -40,7 +39,7 @@ trait BatchRequestsModule {
Future
.sequence(result)
.map(
_.foldLeft(Option.empty[List[Failed]], List.empty[R]) {
_.foldLeft((Option.empty[List[Failed]], List.empty[R])) {
case ((failures, successes), Left(failed)) =>
(failures.map(_ :+ failed).orElse(Some(List(failed))), successes)
case ((failures, successes), Right(success)) => (failures, successes :+ success)
Expand Down
Loading