Skip to content
Permalink
Browse files

Adding JMSRetry Router and specs

  • Loading branch information...
atooni committed Apr 22, 2019
1 parent 0e52cc0 commit 3ff0bc081ef4fead39f41e9c02ea691408db6c37
@@ -78,8 +78,6 @@ class JmsStreamBuilder(
.withDestinationResolver(destResolver)
.withDeliveryMode(JmsDeliveryMode.Persistent)



private val internalProvider : Try[BridgeProviderConfig] = cfg.registry.internalProvider
private val internalId = (internalProvider.get.vendor, internalProvider.get.provider)

@@ -0,0 +1,75 @@
package blended.streams.jms

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.Materializer
import akka.stream.scaladsl.Source
import blended.jms.utils.{IdAwareConnectionFactory, JmsDestination}
import blended.streams.{StreamController, StreamControllerConfig}
import blended.streams.message.FlowEnvelope
import blended.streams.processor.AckProcessor
import blended.streams.transaction.FlowHeaderConfig
import blended.util.logging.Logger

import scala.concurrent.duration._

case class JmsRetryConfig(
cf : IdAwareConnectionFactory,
headerCfg : FlowHeaderConfig,
retryDestName : String,
failedDestName : String,
retryInterval : FiniteDuration,
maxRetries : Long = -1,
retryTimeout : FiniteDuration = 1.day
)

object JmsRetryProcessor {
def apply(name : String, cfg : JmsRetryConfig)(
implicit system : ActorSystem, materializer : Materializer
): JmsRetryProcessor = new JmsRetryProcessor(name, cfg)
}

class JmsRetryProcessor(name : String, cfg : JmsRetryConfig)(
implicit system : ActorSystem, materializer : Materializer
) extends JmsStreamSupport {

private[this] val retryLog : Logger = Logger(cfg.headerCfg.prefix + ".retry." + cfg.retryDestName)
private[this] val log : Logger = Logger[JmsRetryProcessor]

private[this] val retrySource : Source[FlowEnvelope, NotUsed] = {
val settings = JMSConsumerSettings(
log = retryLog,
connectionFactory = cfg.cf,
acknowledgeMode = AcknowledgeMode.ClientAcknowledge,
jmsDestination = Some(JmsDestination.create(cfg.retryDestName).get)
)

jmsConsumer(
name = settings.jmsDestination.get.asString,
settings = settings,
headerConfig = cfg.headerCfg,
minMessageDelay = Some(cfg.retryInterval)
)
}

def start() : Unit = {

log.info(s"Starting Jms Retry processor for [${cfg.retryDestName}] with retry interval [${cfg.retryInterval}]")

val src : Source[FlowEnvelope, NotUsed] = retrySource
.via(new AckProcessor(name + ".ack").flow)

// TODO: Load from config
val streamCfg : StreamControllerConfig = StreamControllerConfig(
name = name,
source = src,
minDelay = 10.seconds,
maxDelay = 3.minutes,
exponential = true,
onFailureOnly = true,
random = 0.2
)

system.actorOf(StreamController.props(streamCfg))
}
}
@@ -0,0 +1,50 @@
package blended.streams.jms

import java.text.SimpleDateFormat
import java.util.Date

import blended.streams.FlowProcessor
import blended.streams.transaction.FlowHeaderConfig

import scala.util.Try

class RetryCountExceededException(n : Long)
extends Exception(s"Maximum Retry [$n] count exceeded")

class RetryTimeoutException(t : Long)
extends Exception(s"Retry timeout [${new SimpleDateFormat("yyyy-MM-dd-HH:mm:ss:SSS").format(new Date(t))}] exceeded")

class MissingRetryDestinationException(d : String)
extends Exception(s"The retry destination header [$d] is missing in the message.")

class JmsRetryRouter(
retryCfg : JmsRetryConfig
) {

private[this] val headerCfg : FlowHeaderConfig = retryCfg.headerCfg

val resolve : FlowProcessor.IntegrationStep = env => Try {

val maxRetries : Long = env.header[Long](headerCfg.headerMaxRetries).getOrElse(retryCfg.maxRetries)
val retryCount : Long = env.header[Long](headerCfg.headerRetryCount).getOrElse(0L) + 1
val retryTimeout : Long = env.header[Long](headerCfg.headerRetryTimeout).getOrElse(retryCfg.retryTimeout.toMillis)
val firstRetry : Long = env.header[Long](headerCfg.headerFirstRetry).getOrElse(System.currentTimeMillis())

if (maxRetries > 0 && retryCount > maxRetries) {
throw new RetryCountExceededException(maxRetries)
}

if (System.currentTimeMillis() - firstRetry > retryTimeout) {
throw new RetryTimeoutException(firstRetry + retryTimeout)
}

if (env.header[String](headerCfg.headerRetryDestination).isEmpty) {
throw new MissingRetryDestinationException(headerCfg.headerRetryDestination)
}

env
.withHeader(headerCfg.headerMaxRetries, maxRetries).get
.withHeader(headerCfg.headerRetryCount, retryCount).get
.withHeader(headerCfg.headerFirstRetry, firstRetry).get
}
}
@@ -20,8 +20,8 @@ import scala.util.Try
trait JmsStreamSupport {
/**
* Process a sequence of messages with a given flow. If any of the processed
* messages cause an exception in the provided flow, the Stream will terminate
* with this exeption been thrown.
* messages cause an exception in the provided flow, the Stream will terminated
* with this exception been thrown.
*
* The resulting stream will expose a killswitch, so that it stays
* open and the test code needs to tear it down eventually.
@@ -19,6 +19,22 @@ object FlowHeaderConfig {
private val statePath = "transactionState"
private val trackTransactionPath = "trackTransaction"
private val trackSourcePath = "trackSource"
private val retryCountPath = "retryCount"
private val maxRetriesPath = "maxRetries"
private val retryTimeoutPath = "retryTimeout"
private val retryDestPath = "retryDestination"
private val firstRetryPath = "firstRetry"

private val transId = "TransactionId"
private val branchId = "BranchId"
private val transState = "TransactionState"
private val trackTrans = "TrackTransaction"
private val trackSource = "TrackSource"
private val retryCount = "RetryCount"
private val maxRetries = "MaxRetries"
private val retryTimeout = "RetryTimeout"
private val retryDest = "RetryDestination"
private val firstRetry = "FirstRetry"

val headerConfigPath : String = "blended.flow.header"
val header : String => String => String = prefix => name => prefix + name
@@ -27,21 +43,46 @@ object FlowHeaderConfig {
idSvc.containerContext.getContainerConfig().getConfig(FlowHeaderConfig.headerConfigPath)
)

def create(prefix : String) : FlowHeaderConfig = FlowHeaderConfig(
prefix = prefix,
headerTrans = header(prefix)(transId),
headerBranch = header(prefix)(branchId),
headerState = header(prefix)(transState),
headerTrack = header(prefix)(trackTrans),
headerTrackSource = header(prefix)(trackSource),
headerRetryCount = header(prefix)(retryCount),
headerMaxRetries = header(prefix)(maxRetries),
headerRetryTimeout = header(prefix)(retryTimeout),
headerRetryDestination = header(prefix)(retryDest),
headerFirstRetry = header(prefix)(firstRetry)
)

def create(cfg: Config): FlowHeaderConfig = {

val prefix = cfg.getString(prefixPath, "Blended")
val headerTrans = cfg.getString(transIdPath, "TransactionId")
val headerBranch = cfg.getString(branchIdPath, "BranchId")
val headerState = cfg.getString(statePath, "TransactionState")
val headerTrack = cfg.getString(trackTransactionPath, "TrackTransaction")
val headerTrackSource = cfg.getString(trackSourcePath, "TrackSource")
val headerTrans = cfg.getString(transIdPath, transId)
val headerBranch = cfg.getString(branchIdPath, branchId)
val headerState = cfg.getString(statePath, transState)
val headerTrack = cfg.getString(trackTransactionPath, trackTrans)
val headerTrackSource = cfg.getString(trackSourcePath, trackSource)
val headerRetryCount = cfg.getString(retryCountPath, retryCount)
val headerMaxRetries = cfg.getString(maxRetriesPath, maxRetries)
val headerRetryTimeout = cfg.getString(retryTimeoutPath, retryTimeout)
val headerRetryDest = cfg.getString(retryDestPath, retryDest)
val headerFirstRetry = cfg.getString(firstRetryPath, firstRetry)

FlowHeaderConfig(
prefix = prefix,
headerTrans = header(prefix)(headerTrans),
headerBranch = header(prefix)(headerBranch),
headerState = header(prefix)(headerState),
headerTrack = header(prefix)(headerTrack),
headerTrackSource = header(prefix)(headerTrackSource)
headerTrackSource = header(prefix)(headerTrackSource),
headerRetryCount = header(prefix)(headerRetryCount),
headerMaxRetries = header(prefix)(headerMaxRetries),
headerRetryTimeout = header(prefix)(headerRetryTimeout),
headerRetryDestination = header(prefix)(headerRetryDest),
headerFirstRetry = header(prefix)(headerFirstRetry)
)
}
}
@@ -52,7 +93,12 @@ case class FlowHeaderConfig(
headerBranch : String = "BranchId",
headerState : String = "TransactionState",
headerTrack : String = "TrackTransaction",
headerTrackSource : String = "TrackSource"
headerTrackSource : String = "TrackSource",
headerRetryCount : String = "RetryCount",
headerMaxRetries : String = "MaxRetries",
headerRetryTimeout : String = "RetryTimeout",
headerRetryDestination : String = "RetryDestination",
headerFirstRetry : String = "FirstRetry"
)

object FlowTransactionEvent {
@@ -0,0 +1,150 @@
package blended.streams.jms

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Source}
import akka.stream.{ActorMaterializer, Materializer}
import akka.testkit.TestKit
import blended.jms.utils.{IdAwareConnectionFactory, JmsDestination, SimpleIdAwareConnectionFactory}
import blended.streams.StreamFactories
import blended.streams.message.FlowEnvelope
import blended.streams.processor.Collector
import blended.streams.transaction.FlowHeaderConfig
import blended.testsupport.RequiresForkedJVM
import blended.testsupport.scalatest.LoggingFreeSpecLike
import blended.util.logging.Logger
import org.apache.activemq.ActiveMQConnectionFactory
import org.apache.activemq.broker.BrokerService
import org.apache.activemq.store.memory.MemoryPersistenceAdapter
import org.scalatest.{BeforeAndAfterAll, Matchers}

import scala.concurrent.{Await, ExecutionContext}
import scala.concurrent.duration._
import scala.util.{Failure, Success}

@RequiresForkedJVM
class JmsRetryProcessorSpec extends TestKit(ActorSystem("JmsRetrySpec"))
with LoggingFreeSpecLike
with Matchers
with JmsStreamSupport
with BeforeAndAfterAll {

private val brokerName : String = "retry"
private val consumerCount : Int = 5
private val headerCfg : FlowHeaderConfig = FlowHeaderConfig(prefix = "spec")

private lazy val amqCf : IdAwareConnectionFactory = SimpleIdAwareConnectionFactory(
vendor = "amq",
provider = "amq",
clientId = "spec",
cf = new ActiveMQConnectionFactory(s"vm://$brokerName?create=false&jms.prefetchPolicy.queuePrefetch=10")
)

private val broker : BrokerService = {

val b = new BrokerService()
b.setBrokerName(brokerName)
b.setPersistent(false)
b.setUseJmx(false)
b.setPersistenceAdapter(new MemoryPersistenceAdapter)
b.setDedicatedTaskRunner(true)

b.start()
b.waitUntilStarted()

b
}

private implicit val actorSystem : ActorSystem = system
private implicit val materializer : Materializer = ActorMaterializer()
private implicit val eCtxt : ExecutionContext = system.dispatcher

private val log : Logger = Logger[JmsRetryProcessorSpec]

override protected def afterAll(): Unit = {
broker.stop()
broker.waitUntilStopped()
system.terminate()
}

private val consumerSettings : String => JMSConsumerSettings = destName => {

val dest = JmsDestination.create(destName).get

JMSConsumerSettings(
log = log,
connectionFactory = amqCf,
jmsDestination = Some(dest),
sessionCount = consumerCount,
acknowledgeMode = AcknowledgeMode.ClientAcknowledge
)
}

private val producerSettings : String => JmsProducerSettings = destName => JmsProducerSettings(
log = log,
connectionFactory = amqCf,
jmsDestination = Some(JmsDestination.create(destName).get)
)

private val jmsConsumer : JMSConsumerSettings => Option[FiniteDuration] => Source[FlowEnvelope, NotUsed] =
cSettings => minMessageDelay => restartableConsumer(
name = "retry",
settings = cSettings,
headerConfig = headerCfg,
minMessageDelay = minMessageDelay
).via(Flow.fromFunction{env =>
env.acknowledge()
env
})

"The Jms Retry Processor should" - {

"Consume messages from the retry destination and reinsert them into the original destination" in {

val srcQueue : String = "myQueue"

val retryCfg : JmsRetryConfig = JmsRetryConfig(
cf = amqCf,
retryDestName = "retryQueue",
failedDestName = "retryFailed",
retryInterval = 3.seconds,
headerCfg = headerCfg
)

val retryProcessor = JmsRetryProcessor("spec", retryCfg)
retryProcessor.start()

val retryMsg : FlowEnvelope = FlowEnvelope()
.withHeader(headerCfg.headerRetryDestination, srcQueue).get

sendMessages(producerSettings(retryCfg.retryDestName), log, Seq(retryMsg):_*) match {
case Success(s) =>
val coll : Collector[FlowEnvelope] = StreamFactories.runSourceWithTimeLimit(
name = "retryConsumer",
source = jmsConsumer(consumerSettings(srcQueue))(None),
timeout = retryCfg.retryInterval + 500.millis
)(e => e.acknowledge())

s.shutdown()

val result : List[FlowEnvelope] = Await.result(coll.result, retryCfg.retryInterval + 1.second)
result should have size (1)

case Failure(t) => fail(t)
}
}

"Consume messages from the retry destination and pass them to the retry failed destination if the retry cont exceeds" in {
pending
}

"Consume messages from the retry destination and pass them to the retry failed destination if the retry timeout exceeds" in {
pending
}

"Consume messages from the retry destination and pass them to the retry failed destination if no original destination is known" in {
pending
}

}
}
Oops, something went wrong.

0 comments on commit 3ff0bc0

Please sign in to comment.
You can’t perform that action at this time.