Skip to content

Commit

Permalink
Ensure send timeout to send wiretap is sufficient
Browse files Browse the repository at this point in the history
  • Loading branch information
atooni committed Aug 4, 2022
1 parent 3673504 commit 065c154
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 10 deletions.
Expand Up @@ -71,7 +71,7 @@ object JmsOperationConfig {

encoding = cfg.getString(encodingPath, "UTF-8"),

wiratapTTL = cfg.getDurationOption(wiretapTTLPath),
wiretapTtl = cfg.getDurationOption(wiretapTTLPath),

reqResourceType = cfg.getStringOption(reqResourceTypePath),

Expand All @@ -98,7 +98,7 @@ case class JmsOperationConfig(

encoding : String,

wiratapTTL : Option[FiniteDuration],
wiretapTtl : Option[FiniteDuration],

reqResourceType: Option[String],

Expand Down
Expand Up @@ -16,7 +16,6 @@ import blended.streams.message.{BinaryFlowMessage, FlowEnvelope, FlowEnvelopeLog
import blended.streams.{BlendedStreamsConfig, FlowHeaderConfig, FlowProcessor, StreamController, StreamFactories}
import blended.util.logging.{LogLevel, Logger}

import java.util.concurrent.TimeUnit
import scala.collection.mutable
import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future, Promise}
Expand Down Expand Up @@ -89,17 +88,24 @@ class SimpleRestJmsService(
val resType : Option[String] = if (isRequest) opCfg.reqResourceType else opCfg.respResourceType

resType.foreach{ resType =>
log.info(s"Sending wiretap [${env.id}] [${if (isRequest) "request" else "response"}] to dispatcher with [$resType]")

val wtTtl : Long = opCfg.wiratapTTL.map(_.toMillis).getOrElse(0L)

val toSend = env
val wiretapMsg = env
.withHeader(headerCfg.headerResourceType, resType).get
.withHeader(headerCfg.headerBridgeVendor, internalCfg.vendor).get
.withHeader(headerCfg.headerBridgeProvider, internalCfg.provider).get
.withHeader(destHeader(headerCfg.prefix), internalCfg.inbound.asString).get
.removeHeader(replyToHeader(headerCfg.prefix))

wiretapJms.sendMessages(wiretapSettings, envLogger, FiniteDuration(wtTtl, TimeUnit.MILLISECONDS), toSend)
val toSend = opCfg.wiretapTtl.fold(
wiretapMsg.removeHeader(expireHeader(headerCfg.prefix))
)(
ttl => wiretapMsg.withHeader(expireHeader(headerCfg.prefix), System.currentTimeMillis() + ttl.toMillis).get
)

log.debug(s"Wiretap message : $toSend")

wiretapJms.sendMessages(wiretapSettings, envLogger, 10.seconds, toSend)
}
}

Expand Down
Expand Up @@ -23,8 +23,6 @@
content-types: ["text/xml", "application/json"]

requestResourceType: "Redeem"
responseResourceType: "RedeemResp"
wiretapTimeToLive = 10 minutes

}

Expand All @@ -42,6 +40,8 @@
"org.apache.cxf.request.uri": "RedeemService/direct"
}
jmsreply : false

requestResourceType: "direct"
}

soap: {
Expand Down
@@ -1,12 +1,21 @@
package blended.akka.http.restjms.internal

import akka.actor.ActorSystem
import blended.jms.utils.{IdAwareConnectionFactory, JmsDestination}
import blended.streams.jms.JmsStreamSupport
import blended.streams.message.FlowEnvelope
import blended.streams.processor.Collector
import blended.testsupport.RequiresForkedJVM
import blended.util.logging.Logger
import sttp.client._
import sttp.model.StatusCode

import scala.util.Try
import scala.concurrent.Await
import scala.concurrent.duration._

@RequiresForkedJVM
class JMSRequestorSpec extends AbstractJmsRequestorSpec {
class JMSRequestorSpec extends AbstractJmsRequestorSpec with JmsStreamSupport {

private val log : Logger = Logger[JMSRequestorSpec]
private implicit val backend = HttpURLConnectionBackend()
Expand All @@ -18,6 +27,26 @@ class JMSRequestorSpec extends AbstractJmsRequestorSpec {
.header("Content-Type", cType, true)
}

private def consumeMessages(
destName : String,
completeOn : Option[Seq[FlowEnvelope] => Boolean],
timeout : FiniteDuration
)(implicit system : ActorSystem) : Try[List[FlowEnvelope]] = Try {

val cf : IdAwareConnectionFactory = jmsConnectionFactory(registry, mustConnect = true, timeout = timeout).get

val coll : Collector[FlowEnvelope] = receiveMessages(
headerCfg = headerCfg,
cf = cf,
dest = JmsDestination.create(destName).get,
log = envLogger(log),
completeOn = completeOn,
timeout = Some(timeout),
ackTimeout = 1.second
)
Await.result(coll.result, timeout + 100.millis)
}

"The JMSRequestor should" - {

"respond to a posted message if the operation is configured [json]" in {
Expand All @@ -26,6 +55,11 @@ class JMSRequestorSpec extends AbstractJmsRequestorSpec {
response.code should be (StatusCode.Ok)
response.body should be (Right(MockResponses.json))
response.header("Content-Type") should be (Some("application/json"))

val wiretapped =
consumeMessages("bridge.data.in", Some(_.nonEmpty), 10.seconds)(actorSystem).get

wiretapped should not be(empty)
}

"respond to a posted message if the operation is configured [xml]" in {
Expand All @@ -34,6 +68,11 @@ class JMSRequestorSpec extends AbstractJmsRequestorSpec {
response.code should be (StatusCode.Ok)
response.body should be (Right(MockResponses.xml))
response.header("Content-Type") should be (Some("text/xml"))

val wiretapped =
consumeMessages("bridge.data.in", Some(_.nonEmpty), 10.seconds)(actorSystem).get

wiretapped should not be(empty)
}

"respond with a not found return code if the operation is not configured" in {
Expand All @@ -59,6 +98,11 @@ class JMSRequestorSpec extends AbstractJmsRequestorSpec {
response.code should be (StatusCode.Ok)
response.body should be (Right(""))
response.header("Content-Type") should be (Some("application/json"))

val wiretapped =
consumeMessages("bridge.data.in", Some(_.nonEmpty), 10.seconds)(actorSystem).get

wiretapped should not be(empty)
}

"respond directly with Accepted and an empty body if 'jmsreply' is set to false and isSoap is set to true in the config" in {
Expand Down

0 comments on commit 065c154

Please sign in to comment.