From efc50ae756fa67fac4e08dcc7b9bdc9febbe57bb Mon Sep 17 00:00:00 2001 From: Andreas Gies Date: Tue, 4 Jun 2019 16:30:08 +0200 Subject: [PATCH 1/3] Adding unit test for jms handling of binary messages with empty body --- .../jms/bridge/internal/BridgeSpec.scala | 22 +++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/blended.jms.bridge/src/test/scala/blended/jms/bridge/internal/BridgeSpec.scala b/blended.jms.bridge/src/test/scala/blended/jms/bridge/internal/BridgeSpec.scala index de0143653..00e87efc6 100644 --- a/blended.jms.bridge/src/test/scala/blended/jms/bridge/internal/BridgeSpec.scala +++ b/blended.jms.bridge/src/test/scala/blended/jms/bridge/internal/BridgeSpec.scala @@ -11,7 +11,7 @@ import blended.akka.internal.BlendedAkkaActivator import blended.container.context.api.ContainerIdentifierService import blended.jms.utils.{IdAwareConnectionFactory, JmsDestination, JmsQueue} import blended.streams.jms._ -import blended.streams.message.{FlowEnvelope, FlowMessage, TextFlowMessage} +import blended.streams.message.{BinaryFlowMessage, FlowEnvelope, FlowMessage, TextFlowMessage} import blended.streams.processor.Collector import blended.streams.transaction.{FlowHeaderConfig, FlowTransactionEvent, FlowTransactionStarted, FlowTransactionUpdate} import blended.testsupport.pojosr.{BlendedPojoRegistry, PojoSrTestHelper, SimplePojoContainerSpec} @@ -141,7 +141,25 @@ class InboundBridgeUntrackedSpec extends BridgeSpecSupport { implicit val timeout : FiniteDuration = 1.second - val msg : TextFlowMessage = TextFlowMessage(null, FlowMessage.noProps) + val msg : FlowMessage = TextFlowMessage(null, FlowMessage.noProps) + val msgs : Seq[FlowEnvelope] = Seq(FlowEnvelope(msg)) + + val switch : KillSwitch = sendMessages("sampleIn", external)(msgs:_*) + + val messages : List[FlowEnvelope] = + consumeMessages(internal, "bridge.data.in.activemq.external")(1.second, system, materializer).get + + messages should have size msgs.size + + consumeEvents().get should be (empty) + + switch.shutdown() + } + + "process messages with an empty binary body" in { + implicit val timeout : FiniteDuration = 1.second + + val msg : FlowMessage = BinaryFlowMessage(Array.empty[Byte], FlowMessage.noProps) val msgs : Seq[FlowEnvelope] = Seq(FlowEnvelope(msg)) val switch : KillSwitch = sendMessages("sampleIn", external)(msgs:_*) From 32b675cc14d3cb27c1719aef1cad449a68a7f15a Mon Sep 17 00:00:00 2001 From: Andreas Gies Date: Tue, 4 Jun 2019 17:36:12 +0200 Subject: [PATCH 2/3] Releasing 3.0-M15 --- version.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/version.txt b/version.txt index 0609f8e46..7b39c270c 100644 --- a/version.txt +++ b/version.txt @@ -1 +1 @@ -3.0-M15-SNAPSHOT +3.0-M15 From 75608b13cb4118e239a1654992b169fa8e67cd77 Mon Sep 17 00:00:00 2001 From: Andreas Gies Date: Thu, 6 Jun 2019 21:25:03 +0200 Subject: [PATCH 3/3] Avoiding that a clearBody() changes the MessageType --- .../internal/builder/DispatcherFanout.scala | 2 +- .../blended/streams/jms/JmsFlowSupport.scala | 4 +++- .../blended/streams/message/FlowMessage.scala | 16 +++++++++++++--- version.txt | 2 +- 4 files changed, 18 insertions(+), 6 deletions(-) diff --git a/blended.streams.dispatcher/src/main/scala/blended/streams/dispatcher/internal/builder/DispatcherFanout.scala b/blended.streams.dispatcher/src/main/scala/blended/streams/dispatcher/internal/builder/DispatcherFanout.scala index b734fc5a8..b6c613b5d 100644 --- a/blended.streams.dispatcher/src/main/scala/blended/streams/dispatcher/internal/builder/DispatcherFanout.scala +++ b/blended.streams.dispatcher/src/main/scala/blended/streams/dispatcher/internal/builder/DispatcherFanout.scala @@ -91,7 +91,7 @@ case class DispatcherFanout( } newEnv = if (oh.clearBody) { - newEnv.copy(flowMessage = BaseFlowMessage(newEnv.flowMessage.header)) + newEnv.copy(flowMessage = newEnv.flowMessage.clearBody()) } else { newEnv } diff --git a/blended.streams/src/main/scala/blended/streams/jms/JmsFlowSupport.scala b/blended.streams/src/main/scala/blended/streams/jms/JmsFlowSupport.scala index 28d2080a6..df8b6c4ca 100644 --- a/blended.streams/src/main/scala/blended/streams/jms/JmsFlowSupport.scala +++ b/blended.streams/src/main/scala/blended/streams/jms/JmsFlowSupport.scala @@ -51,6 +51,7 @@ trait JmsEnvelopeHeader { val deliveryModeHeader : String => String = s => jmsHeaderPrefix(s) + "DeliveryMode" val replyToHeader : String => String = s => jmsHeaderPrefix(s) + "ReplyTo" val timestampHeader : String => String = s => jmsHeaderPrefix(s) + "Timestamp" + val typeHeader : String => String = s => jmsHeaderPrefix(s) + "Type" val replyToQueueName : String = "replyTo" } @@ -90,7 +91,8 @@ object JmsFlowSupport extends JmsEnvelopeHeader { srcDestHeader(prefix) -> dest, priorityHeader(prefix) -> msg.getJMSPriority(), deliveryModeHeader(prefix) -> delMode, - timestampHeader(prefix) -> msg.getJMSTimestamp() + timestampHeader(prefix) -> msg.getJMSTimestamp(), + typeHeader(prefix) -> msg.getJMSType() ).get val expireHeaderMap : Map[String, MsgProperty] = msg.getJMSExpiration() match { diff --git a/blended.streams/src/main/scala/blended/streams/message/FlowMessage.scala b/blended.streams/src/main/scala/blended/streams/message/FlowMessage.scala index be2487157..3a4142736 100644 --- a/blended.streams/src/main/scala/blended/streams/message/FlowMessage.scala +++ b/blended.streams/src/main/scala/blended/streams/message/FlowMessage.scala @@ -85,6 +85,8 @@ import blended.streams.message.FlowMessage.FlowMessageProps sealed abstract class FlowMessage(msgHeader: FlowMessageProps) { def body() : Any + def clearBody() : FlowMessage = this + def header : FlowMessageProps = msgHeader def bodySize() : Int @@ -173,7 +175,7 @@ sealed abstract class FlowMessage(msgHeader: FlowMessageProps) { def withHeader(key : String, value: Any, overwrite: Boolean = true) : Try[FlowMessage] - protected def doRemoveHeader(keys : String*) : FlowMessageProps = header.filter(k => !keys.contains(k)) + protected def doRemoveHeader(keys : String*) : FlowMessageProps = header.filterKeys(k => !keys.contains(k)) protected def newHeader(key : String, value: Any, overwrite: Boolean) : Try[FlowMessageProps] = Try { if (overwrite) { @@ -221,13 +223,17 @@ case class BinaryFlowMessage(content : ByteString, override val header: FlowMess } override def removeHeader(keys: String*): FlowMessage = copy(header = doRemoveHeader(keys:_*)) + + override def clearBody(): FlowMessage = BinaryFlowMessage(Array.empty[Byte], header) } case class TextFlowMessage(content : String, override val header: FlowMessageProps) extends FlowMessage(header) { - private val textContent : Option [String] = Option(content) + private val textContent : Option[String] = Option(content) - override def body(): Any = textContent.getOrElse(null) + // scalastyle:off null + override def body(): Any = textContent.orNull + // scalastyle:on null def getText(): String = textContent.getOrElse("") @@ -238,4 +244,8 @@ case class TextFlowMessage(content : String, override val header: FlowMessagePro } override def removeHeader(keys: String*): FlowMessage = copy(header = doRemoveHeader(keys:_*)) + + // scalastyle:off null + override def clearBody(): FlowMessage = TextFlowMessage("", header) + // scalastyle:on null } diff --git a/version.txt b/version.txt index 7b39c270c..c72685bf0 100644 --- a/version.txt +++ b/version.txt @@ -1 +1 @@ -3.0-M15 +3.0-M16