Skip to content
Permalink
Browse files

Finalizing JmsRetry Processor and specs

  • Loading branch information...
atooni committed Apr 23, 2019
1 parent f5fbd28 commit 9c3fa724d7fcc1526dadd1e083a4c7143adbf66f
@@ -14,7 +14,7 @@ import akka.stream.scaladsl.GraphDSL.Implicits._
import javax.jms.Session

import scala.concurrent.duration._
import scala.util.Try
import scala.util.{Failure, Success, Try}

case class JmsRetryConfig(
cf : IdAwareConnectionFactory,
@@ -35,18 +35,25 @@ class JmsRetryProcessor(name : String, retryCfg : JmsRetryConfig)(

private[this] var actor : Option[ActorRef] = None

private[this] val router = new JmsRetryRouter("route", retryCfg, retryLog)

class RetryDestinationResolver(
override val headerConfig : FlowHeaderConfig,
override val settings : JmsProducerSettings
override val settings : JmsProducerSettings,
val validator : FlowProcessor.IntegrationStep
) extends FlowHeaderConfigAware with JmsEnvelopeHeader {

override def sendParameter(session: Session, env: FlowEnvelope): Try[JmsSendParameter] = Try {

val dest : JmsDestination = env.exception match {
case None =>
JmsDestination.create(env.headerWithDefault[String](headerConfig.headerRetryDestination, retryCfg.failedDestName)).get

case Some(e) =>
JmsDestination.create(retryCfg.failedDestName).get
validator(env) match {
case Success(_) => JmsDestination.create(retryCfg.retryDestName).get
case Failure(_) => JmsDestination.create(retryCfg.failedDestName).get
}
}

JmsSendParameter(
@@ -75,16 +82,11 @@ class JmsRetryProcessor(name : String, retryCfg : JmsRetryConfig)(
)
}

protected def routeRetry : Graph[FlowShape[FlowEnvelope, FlowEnvelope], NotUsed] = {
val router = new JmsRetryRouter("route", retryCfg, retryLog)
router.flow
}

protected def sendToOriginal : Flow[FlowEnvelope, FlowEnvelope, NotUsed] = {
protected def resendMessage : Flow[FlowEnvelope, FlowEnvelope, NotUsed] = {
val producerSettings : JmsProducerSettings = JmsProducerSettings(
log = retryLog,
connectionFactory = retryCfg.cf,
destinationResolver = s => new RetryDestinationResolver(retryCfg.headerCfg, s),
destinationResolver = s => new RetryDestinationResolver(retryCfg.headerCfg, s, router.validate),
deliveryMode = JmsDeliveryMode.Persistent,
timeToLive = None,
clearPreviousException = true
@@ -97,31 +99,16 @@ class JmsRetryProcessor(name : String, retryCfg : JmsRetryConfig)(
)
}

protected def sendToRetry : Flow[FlowEnvelope, FlowEnvelope, NotUsed] = {

val producerSettings : JmsProducerSettings = JmsProducerSettings(
log = retryLog,
connectionFactory = retryCfg.cf,
destinationResolver = s => new SettingsDestinationResolver(s),
jmsDestination = Some(JmsDestination.create(retryCfg.retryDestName).get),
deliveryMode = JmsDeliveryMode.Persistent,
timeToLive = None,
clearPreviousException = true
)
protected def sendToOriginal : Flow[FlowEnvelope, FlowEnvelope, NotUsed] = resendMessage

jmsProducer(
name = name + ".retrySend",
settings = producerSettings,
autoAck = false
)
}
protected def sendToRetry : Flow[FlowEnvelope, FlowEnvelope, NotUsed] = resendMessage

protected def retryGraph : Graph[FlowShape[FlowEnvelope, FlowEnvelope], NotUsed] = {

GraphDSL.create() { implicit b =>

// determine the retry routing parameters from the message
val route = b.add(routeRetry)
val route = b.add(router.flow)

val routeSend = b.add(sendToOriginal)

@@ -2,8 +2,8 @@ package blended.streams.jms

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Flow
import akka.stream.{ActorMaterializer, Materializer}
import akka.testkit.TestKit
import blended.jms.utils.{IdAwareConnectionFactory, JmsDestination, SimpleIdAwareConnectionFactory}
import blended.streams.FlowProcessor
@@ -23,25 +23,47 @@ import scala.concurrent.{Await, ExecutionContext}
import scala.util.control.NonFatal
import scala.util.{Failure, Success, Try}

@RequiresForkedJVM
class JmsRetryProcessorSpec extends TestKit(ActorSystem("JmsRetrySpec"))
abstract class ProcessorSpecSupport(name: String) extends TestKit(ActorSystem(name))
with LoggingFreeSpecLike
with Matchers
with JmsStreamSupport
with BeforeAndAfterAll {

private val brokerName : String = "retry"
private val consumerCount : Int = 5
private val headerCfg : FlowHeaderConfig = FlowHeaderConfig.create(prefix = "spec")
lazy implicit val actorSystem : ActorSystem = system
lazy implicit val materializer = ActorMaterializer()
lazy implicit val eCtxt : ExecutionContext = actorSystem.dispatcher

private lazy val amqCf : IdAwareConnectionFactory = SimpleIdAwareConnectionFactory(
vendor = "amq",
provider = "amq",
clientId = "spec",
cf = new ActiveMQConnectionFactory(s"vm://$brokerName?create=false&jms.prefetchPolicy.queuePrefetch=10")
)
val log : Logger
val prefix : String = "spec"

val brokerName : String = "retry"
val consumerCount : Int = 5
val headerCfg : FlowHeaderConfig = FlowHeaderConfig.create(prefix = prefix)

private var brokerSvc : Option[BrokerService] = None

val amqCf : IdAwareConnectionFactory = {
val foo = broker

private val broker : BrokerService = {
SimpleIdAwareConnectionFactory(
vendor = "amq",
provider = "amq",
clientId = "spec",
cf = new ActiveMQConnectionFactory(s"vm://$brokerName?create=false&jms.prefetchPolicy.queuePrefetch=10")
)
}

def broker : BrokerService = {
brokerSvc match {
case Some(s) => s
case None =>
val b = createBroker
brokerSvc = Some(b)
b
}
}

private def createBroker : BrokerService = {

val b = new BrokerService()
b.setBrokerName(brokerName)
@@ -56,25 +78,26 @@ class JmsRetryProcessorSpec extends TestKit(ActorSystem("JmsRetrySpec"))
b
}

private implicit val actorSystem : ActorSystem = system
private implicit val materializer : Materializer = ActorMaterializer()
private implicit val eCtxt : ExecutionContext = system.dispatcher

private val log : Logger = Logger[JmsRetryProcessorSpec]
override protected def beforeAll(): Unit = {
createBroker
}

override protected def afterAll(): Unit = {
broker.stop()
broker.waitUntilStopped()
system.terminate()
brokerSvc.foreach { b =>
b.stop()
b.waitUntilStopped()
}
actorSystem.terminate()
}

private val producerSettings : String => JmsProducerSettings = destName => JmsProducerSettings(
def producerSettings : String => JmsProducerSettings = destName => JmsProducerSettings(
log = log,
connectionFactory = amqCf,
jmsDestination = Some(JmsDestination.create(destName).get)
)

private val retryCfg : JmsRetryConfig = JmsRetryConfig(
def retryCfg : JmsRetryConfig = JmsRetryConfig(
cf = amqCf,
retryDestName = "retryQueue",
failedDestName = "retryFailed",
@@ -84,14 +107,18 @@ class JmsRetryProcessorSpec extends TestKit(ActorSystem("JmsRetrySpec"))
headerCfg = headerCfg
)

def withExpectedDestination(destName : String, retryProcessor : JmsRetryProcessor)(env : FlowEnvelope): Try[List[FlowEnvelope]] = try {
def withExpectedDestination(
destName : String,
retryProcessor : JmsRetryProcessor,
timeout : FiniteDuration = retryCfg.retryInterval + 500.millis
)(env : FlowEnvelope): Try[List[FlowEnvelope]] = try {

log.info("Starting Retry Processor ...")
retryProcessor.start()

sendMessages(producerSettings(retryCfg.retryDestName), log, Seq(env):_*) match {
case Success(s) =>
val msgs : List[FlowEnvelope] = consumeMessages(destName)(retryCfg.retryInterval).get
val msgs : List[FlowEnvelope] = consumeMessages(destName)(timeout).get
s.shutdown()
Success(msgs)

@@ -117,46 +144,126 @@ class JmsRetryProcessorSpec extends TestKit(ActorSystem("JmsRetrySpec"))

Await.result(coll.result, timeout + 100.millis)
}
}

"The Jms Retry Processor should" - {
@RequiresForkedJVM
class JmsRetryProcessorForwardSpec extends ProcessorSpecSupport("retryForward") {

override val log: Logger = Logger[JmsRetryProcessorForwardSpec]

"Consume messages from the retry destination and reinsert them into the original destination" in {

val srcQueue : String = "myQueue"

val retryMsg : FlowEnvelope = FlowEnvelope()
.withHeader(headerCfg.headerRetryDestination, srcQueue).get

withExpectedDestination(srcQueue, new JmsRetryProcessor("spec", retryCfg))(retryMsg).get.headOption match {
case None =>
fail("Expected message in original JMS destination")
case Some(env) =>
env.header[Long](headerCfg.headerRetryCount) should be (Some(1))
}
}
}

@RequiresForkedJVM
class JmsRetryProcessorRetryCountSpec extends ProcessorSpecSupport("retryCount") {

override val log: Logger = Logger[JmsRetryProcessorRetryCountSpec]


"Consume messages from the retry destination and pass them to the retry failed destination if the retry cont exceeds" in {
val srcQueue : String = "myQueue"

val retryMsg : FlowEnvelope = FlowEnvelope()
.withHeader(headerCfg.headerRetryDestination, srcQueue).get
.withHeader(headerCfg.headerRetryCount, retryCfg.maxRetries).get

withExpectedDestination(retryCfg.failedDestName, new JmsRetryProcessor("spec", retryCfg))(retryMsg).get.headOption match {
case None => fail(s"Expected message in [${retryCfg.failedDestName}]")
case Some(env) =>
env.header[Long](headerCfg.headerRetryCount) should be (Some(retryCfg.maxRetries + 1))
}
}
}

@RequiresForkedJVM
class JmsRetryProcessorRetryTimeoutSpec extends ProcessorSpecSupport("retryTimeout") {

override val log: Logger = Logger[JmsRetryProcessorRetryTimeoutSpec]

// "Consume messages from the retry destination and reinsert them into the original destination" in {
//
// val srcQueue : String = "myQueue"
//
// val retryMsg : FlowEnvelope = FlowEnvelope()
// .withHeader(headerCfg.headerRetryDestination, srcQueue).get
//
// val env = withExpectedDestination(srcQueue, new JmsRetryProcessor("spec", retryCfg))(retryMsg).get
//
// env.header[Long](headerCfg.headerRetryCount) should be (Some(1))
// }
//
// "Consume messages from the retry destination and pass them to the retry failed destination if the retry cont exceeds" in {
// val srcQueue : String = "myQueue"
//
// val retryMsg : FlowEnvelope = FlowEnvelope()
// .withHeader(headerCfg.headerRetryDestination, srcQueue).get
// .withHeader(headerCfg.headerRetryCount, retryCfg.maxRetries).get
//
// withExpectedDestination(retryCfg.failedDestName, new JmsRetryProcessor("spec", retryCfg))(retryMsg)
// }
//
// "Consume messages from the retry destination and pass them to the retry failed destination if the retry timeout exceeds" in {
// val srcQueue : String = "myQueue"
//
// val retryMsg : FlowEnvelope = FlowEnvelope()
// .withHeader(headerCfg.headerRetryDestination, srcQueue).get
// .withHeader(headerCfg.headerFirstRetry, System.currentTimeMillis() - 2 * retryCfg.retryTimeout.toMillis).get
//
// withExpectedDestination(retryCfg.failedDestName, new JmsRetryProcessor("spec", retryCfg))(retryMsg)
// }
//
// "Consume messages from the retry destination and pass them to the retry failed destination if no original destination is known" in {
// val retryMsg : FlowEnvelope = FlowEnvelope()
//
// withExpectedDestination(retryCfg.failedDestName, new JmsRetryProcessor("spec", retryCfg))(retryMsg)
// }
"Consume messages from the retry destination and pass them to the retry failed destination if the retry timeout exceeds" in {
val srcQueue : String = "myQueue"

val retryMsg : FlowEnvelope = FlowEnvelope()
.withHeader(headerCfg.headerRetryDestination, srcQueue).get
.withHeader(headerCfg.headerFirstRetry, System.currentTimeMillis() - 2 * retryCfg.retryTimeout.toMillis).get

withExpectedDestination(retryCfg.failedDestName, new JmsRetryProcessor("spec", retryCfg))(retryMsg).get.headOption match {
case None => fail(s"Expected message in [${retryCfg.failedDestName}]")
case Some(env) =>

val now = System.currentTimeMillis()
val first = env.header[Long](headerCfg.headerFirstRetry).get

assert(first + retryCfg.retryTimeout.toMillis <= now)
}
}
}

class JmsRetryProcessorMissingDestinationSpec extends ProcessorSpecSupport("missingDest") {

override val log: Logger = Logger[JmsRetryProcessorMissingDestinationSpec]

"Consume messages from the retry destination and pass them to the retry failed destination if no original destination is known" in {
val retryMsg : FlowEnvelope = FlowEnvelope()

withExpectedDestination(retryCfg.failedDestName, new JmsRetryProcessor("spec", retryCfg))(retryMsg).get.headOption match {
case None => fail(s"Expected message in [${retryCfg.failedDestName}]")
case Some(env) =>
env.header[Long](headerCfg.headerRetryCount) should be (Some(1))
}
}
}

@RequiresForkedJVM
class JmsRetryProcessorSendToRetrySpec extends ProcessorSpecSupport("sendToRetry") {

override val log: Logger = Logger[JmsRetryProcessorSendToRetrySpec]

"Reinsert messages into the retry destination if the send to the original destination fails" in {
val srcQueue : String = "myQueue"

val router = new JmsRetryProcessor("spec", retryCfg.copy(maxRetries = 2, retryTimeout = 1.day)) {

// This causes the send to the original destination to fail within the flow, causing
// the envelope to travel the error path.
override protected def sendToOriginal: Flow[FlowEnvelope, FlowEnvelope, NotUsed] = Flow.fromFunction[FlowEnvelope, FlowEnvelope]{ env =>
env.withException(new Exception("Boom"))
}
}

val retryMsg : FlowEnvelope = FlowEnvelope()
.withHeader(headerCfg.headerRetryDestination, srcQueue).get

val messages = withExpectedDestination(srcQueue, router, retryCfg.retryInterval * 3)(retryMsg).get
messages should be (empty)

consumeMessages(retryCfg.failedDestName)(1.second).get.headOption match {
case None => fail(s"Expected message in [${retryCfg.failedDestName}]")
case Some(env) =>
env.header[Long](headerCfg.headerRetryCount) should be (Some(3))
}
}
}

@RequiresForkedJVM
class JmsRetryProcessorFailedSpec extends ProcessorSpecSupport("JmsRetrySpec") {

override val log: Logger = Logger[JmsRetryProcessorFailedSpec]

"The Jms Retry Processor should" - {

"Deny messages that cannot be processed correctly by the retry router" in {
val router = new JmsRetryProcessor("spec", retryCfg) {
@@ -186,10 +293,5 @@ class JmsRetryProcessorSpec extends TestKit(ActorSystem("JmsRetrySpec"))
messages should be (empty)
consumeMessages(retryCfg.retryDestName)(1.second).get should not be (empty)
}

"Retry messages that cannot be sent onwards to the retry destination" in {
pending
}

}
}
Oops, something went wrong.

0 comments on commit 9c3fa72

Please sign in to comment.
You can’t perform that action at this time.