Skip to content
Permalink
Browse files

Reviewing scheduling for retry mechanism

  • Loading branch information
atooni committed Mar 17, 2020
1 parent 085205e commit 86ee4520ff98c2d5bd20ccbfa0f61fc99331f9b8
Showing with 12 additions and 2 deletions.
  1. +12 −2 blended.streams/src/main/scala/blended/streams/jms/JmsConsumerStage.scala
@@ -100,6 +100,8 @@ final class JmsConsumerStage(
private[this] def addConsumer(s : String, c : MessageConsumer) : Unit = {
consumer.put(s, c)
consumerSettings.log.underlying.debug(s"Jms Consumer count of [$name] is [${consumer.size}]")
nextPollRelative = None
pollImmediately.invoke()
}

private[this] def removeConsumer(s : String) : Unit = {
@@ -151,8 +153,16 @@ final class JmsConsumerStage(
1.to(consumerSettings.sessionCount).map(i => s"$id-$i").toList

private var nextPollRelative : Option[FiniteDuration] = None
override protected def nextPoll(): Option[FiniteDuration] =
Some(nextPollRelative.getOrElse(consumerSettings.pollInterval))
override protected def nextPoll(): Option[FiniteDuration] = {
nextPollRelative match {
case None =>
Some(consumerSettings.pollInterval)
case Some(npr) =>
nextPollRelative = None
consumerSettings.log.underlying.debug(s"Overriding next poll interval with [$npr]")
Some(npr)
}
}

private def receive(session : JmsSession) : Try[Option[Message]] = Try {

0 comments on commit 86ee452

Please sign in to comment.
You can’t perform that action at this time.