Skip to content

Commit

Permalink
Closes #65: When there are messages in the queue which will become av…
Browse files Browse the repository at this point in the history
…ailable, and clients awaiting for messages, scheduling a try-reply
  • Loading branch information
adamw committed Mar 23, 2016
1 parent 52d898d commit 3a6fd86
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import org.joda.time.DateTime
import org.elasticmq.util.Logging
import org.elasticmq.util.NowProvider
import org.elasticmq.OnDateTimeReceived
import scala.Some
import org.elasticmq.NewMessageData
import org.elasticmq.msg.DeleteMessage
import org.elasticmq.MessageId
Expand Down Expand Up @@ -94,12 +93,12 @@ trait QueueActorMessageOps extends Logging {

@tailrec
private def receiveMessage(deliveryTime: Long, newNextDelivery: MillisNextDelivery): Option[MessageData] = {
if (messageQueue.size == 0) {
if (messageQueue.isEmpty) {
None
} else {
val internalMessage = messageQueue.dequeue()
val id = MessageId(internalMessage.id)
if (internalMessage.nextDelivery > deliveryTime) {
if (!internalMessage.deliverable(deliveryTime)) {
// Putting the msg back. That's the youngest msg, so there is no msg that can be received.
messageQueue += internalMessage
None
Expand Down Expand Up @@ -136,7 +135,7 @@ trait QueueActorMessageOps extends Logging {
val msgId = deliveryReceipt.extractId.toString

messagesById.get(msgId).foreach { msgData =>
if (msgData.deliveryReceipt == Some(deliveryReceipt.receipt)) {
if (msgData.deliveryReceipt.contains(deliveryReceipt.receipt)) {
// Just removing the msg from the map. The msg will be removed from the queue when trying to receive it.
messagesById.remove(msgId)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ trait QueueActorStorage {
MillisNextDelivery(nextDelivery),
created,
MessageStatistics(firstReceive, receiveCount))

def deliverable(deliveryTime: Long): Boolean = nextDelivery <= deliveryTime
}

object InternalMessage {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ trait QueueActorWaitForMessagesOps extends ReplyingActor with QueueActorMessageO
this: QueueActorStorage =>

private var senderSequence = 0L
private var tryReplyScheduled = false
private val awaitingReply = new collection.mutable.HashMap[Long, AwaitingData]()

override def receive = super.receive orElse {
Expand All @@ -20,24 +21,30 @@ trait QueueActorWaitForMessagesOps extends ReplyingActor with QueueActorMessageO
originalSender ! replyWith
}
}

case TryReply =>
tryReply()
tryReplyScheduled = false
}

override def receiveAndReplyMessageMsg[T](msg: QueueMessageMsg[T]): ReplyAction[T] = msg match {
case SendMessage(message) => {
case SendMessage(message) =>
val result = super.receiveAndReplyMessageMsg(msg)
tryReply()
scheduleTryReplyWhenAvailable()
result
}
case rm@ReceiveMessages(visibilityTimeout, count, waitForMessagesOpt) => {

case rm@ReceiveMessages(visibilityTimeout, count, waitForMessagesOpt) =>
val result = super.receiveAndReplyMessageMsg(msg)
val waitForMessages = waitForMessagesOpt.getOrElse(queueData.receiveMessageWait)
if (result == ReplyWith(Nil) && waitForMessages.getMillis > 0) {
val seq = assignSequenceFor(rm)
logger.debug(s"${queueData.name}: Awaiting messages: start for sequence $seq.")
scheduleTimeoutReply(seq, waitForMessages)
scheduleTryReplyWhenAvailable()
DoNotReply()
} else result
}

case _ => super.receiveAndReplyMessageMsg(msg)
}

Expand Down Expand Up @@ -67,14 +74,45 @@ trait QueueActorWaitForMessagesOps extends ReplyingActor with QueueActorMessageO
}

private def scheduleTimeoutReply(seq: Long, waitForMessages: Duration) {
schedule(waitForMessages.getMillis, ReplyIfTimeout(seq, Nil))
}

private def scheduleTryReplyWhenAvailable(): Unit = {
@tailrec def dequeueUntilDeleted(): Unit = {
messageQueue.headOption match {
case Some(msg) if !messagesById.contains(msg.id) =>
messageQueue.dequeue()
dequeueUntilDeleted()
case _ => // stop
}
}

if (!tryReplyScheduled && awaitingReply.nonEmpty) {
dequeueUntilDeleted()

val deliveryTime = nowProvider.nowMillis

messageQueue.headOption match {
case Some(msg) if !msg.deliverable(deliveryTime) =>
schedule(msg.nextDelivery - deliveryTime + 1, TryReply)
tryReplyScheduled = true

case _ => // there are deliverable messages right now, no need to schedule a try-reply
}
}
}

private def schedule(afterMillis: Long, msg: Any): Unit = {
import context.dispatcher
context.system.scheduler.scheduleOnce(
scd.Duration(waitForMessages.getMillis, scd.MILLISECONDS),
scd.Duration(afterMillis, scd.MILLISECONDS),
self,
ReplyIfTimeout(seq, Nil))
msg)
}

case class ReplyIfTimeout(seq: Long, replyWith: AnyRef)

case class AwaitingData(originalSender: ActorRef, originalReceiveMessages: ReceiveMessages, waitStart: Long)

case object TryReply
}
Original file line number Diff line number Diff line change
Expand Up @@ -814,6 +814,19 @@ class AmazonJavaSdkTestSuite extends FunSuite with MustMatchers with BeforeAndAf
attributes.get("ApproximateNumberOfMessagesDelayed") must be ("0")
}

test("should receive delayed messages when waiting for messages") {
// Given
val queueUrl = client.createQueue(new CreateQueueRequest("testQueue1")).getQueueUrl

// When
client.sendMessage(new SendMessageRequest(queueUrl, "Message 1").withDelaySeconds(2))
val messages = client.receiveMessage(new ReceiveMessageRequest(queueUrl).withMaxNumberOfMessages(1).withWaitTimeSeconds(4))
.getMessages

// Then
messages.size must be (1)
}

def queueVisibilityTimeout(queueUrl: String) = getQueueLongAttribute(queueUrl, visibilityTimeoutAttribute)

def queueDelay(queueUrl: String) = getQueueLongAttribute(queueUrl, delaySecondsAttribute)
Expand Down

0 comments on commit 3a6fd86

Please sign in to comment.