Skip to content
This repository has been archived by the owner on May 31, 2024. It is now read-only.

Commit

Permalink
Fixing Bridge inbound flow with optional header configurations
Browse files Browse the repository at this point in the history
  • Loading branch information
atooni committed Nov 18, 2018
1 parent 537e642 commit 13736af
Show file tree
Hide file tree
Showing 6 changed files with 142 additions and 83 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Merge, Source, Zip}
import akka.stream.{FlowShape, Graph, Materializer}
import blended.container.context.api.ContainerIdentifierService
import blended.jms.bridge.TrackTransaction.TrackTransaction
import blended.jms.utils.{IdAwareConnectionFactory, JmsDestination}
import blended.streams.jms._
Expand Down Expand Up @@ -35,7 +36,8 @@ case class JmsStreamConfig(
registry : BridgeProviderRegistry,
headerCfg : FlowHeaderConfig,
subscriberName : Option[String],
header : List[HeaderProcessorConfig]
header : List[HeaderProcessorConfig],
idSvc : Option[ContainerIdentifierService] = None
)

class JmsStreamBuilder(
Expand Down Expand Up @@ -212,15 +214,20 @@ class JmsStreamBuilder(
log = bridgeLogger
)

val header : Graph[FlowShape[FlowEnvelope, FlowEnvelope], NotUsed] = HeaderTransformProcessor(
name = streamId + "-header",
log = bridgeLogger,
rules = cfg.header
).flow(bridgeLogger)
val jmsSource : Source[FlowEnvelope, NotUsed] = if (cfg.inbound && cfg.header.nonEmpty) {

bridgeLogger.info(s"Creating Stream with header configs [${cfg.header}]")

val header : Graph[FlowShape[FlowEnvelope, FlowEnvelope], NotUsed] = HeaderTransformProcessor(
name = streamId + "-header",
log = bridgeLogger,
rules = cfg.header,
idSvc = cfg.idSvc
).flow(bridgeLogger)

val jmsSource : Source[FlowEnvelope, NotUsed] = if (cfg.inbound) {
src.via(header)
} else {
bridgeLogger.info(s"Creating Stream without additional header configs")
src
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ private[bridge] object BridgeControllerConfig {
internalCf = internalCf,
registry = registry,
headerCfg = headerCfg,
inbound = inboundList
inbound = inboundList,
idSvc = idSvc
)
}
}
Expand All @@ -55,7 +56,8 @@ private[bridge] case class BridgeControllerConfig(
internalCf : IdAwareConnectionFactory,
registry : BridgeProviderRegistry,
headerCfg : FlowHeaderConfig,
inbound : List[InboundConfig]
inbound : List[InboundConfig],
idSvc : ContainerIdentifierService
)

object BridgeController{
Expand Down Expand Up @@ -97,7 +99,8 @@ class BridgeController(ctrlCfg: BridgeControllerConfig)(implicit system : ActorS
headerCfg = ctrlCfg.headerCfg,
trackTransAction = TrackTransaction.On,
subscriberName = in.subscriberName,
header = in.header
header = in.header,
idSvc = Some(ctrlCfg.idSvc)
)

val streamCfg: StreamControllerConfig = new JmsStreamBuilder(inCfg).streamCfg
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,22 @@ blended {

inbound : [
{
name = "sampleIn",
name = "sampleIn"
vendor = "activemq"
provider = "external"
from = "sampleIn"
},
{
name = "SampleHeaderIn"
vendor = "activemq"
provider = "external"
from = "SampleHeaderIn"
header : [
{
name : "ResourceType"
expression : "${{#Description}}"
}
]
}
]
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ import blended.testsupport.pojosr.{BlendedPojoRegistry, PojoSrTestHelper, Simple
import blended.testsupport.scalatest.LoggingFreeSpecLike
import blended.util.logging.Logger
import org.osgi.framework.BundleActivator
import org.scalacheck.Gen
import org.scalatest.Matchers
import org.scalatest.prop.PropertyChecks

import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext}
Expand All @@ -28,7 +30,8 @@ class BridgeSpec extends SimplePojoContainerSpec
with LoggingFreeSpecLike
with PojoSrTestHelper
with Matchers
with JmsStreamSupport {
with JmsStreamSupport
with PropertyChecks {

private val log = Logger[BridgeSpec]

Expand All @@ -40,14 +43,45 @@ class BridgeSpec extends SimplePojoContainerSpec
"blended.jms.bridge" -> new BridgeActivator()
)

private def brokerFilter(provider : String) : String = s"(&(vendor=activemq)(provider=$provider))"
implicit val timeout = 5.seconds

private def withStartedBridge[T](t : FiniteDuration)(f : ActorSystem => BlendedPojoRegistry => Unit) : Unit= {
implicit val timeout : FiniteDuration = t
val system : ActorSystem = mandatoryService[ActorSystem](registry)(None)
private implicit val system : ActorSystem = mandatoryService[ActorSystem](registry)(None)
private implicit val materializer : ActorMaterializer = ActorMaterializer()
private implicit val ectxt : ExecutionContext = system.dispatcher

f(system)(registry)
}
private val (internal, external) = getConnectionFactories(registry)
private val idSvc = mandatoryService[ContainerIdentifierService](registry)(None)

private val headerCfg : FlowHeaderConfig =
FlowHeaderConfig.create(idSvc.containerContext.getContainerConfig().getConfig("blended.flow.header"))

private val destHeader = new JmsEnvelopeHeader(){}.destHeader(headerCfg.prefix)

val ctrlCfg : BridgeControllerConfig = BridgeControllerConfig.create(
cfg = idSvc.containerContext.getContainerConfig().getConfig("blended.jms.bridge"),
internalCf = internal,
idSvc = idSvc
)

val cfg : JmsStreamConfig = JmsStreamConfig(
inbound = true,
headerCfg = ctrlCfg.headerCfg,
fromCf = internal,
fromDest = JmsDestination.create(s"bridge.data.in.${external.vendor}.${external.provider}").get,
toCf = internal,
toDest = Some(JmsDestination.create(s"bridge.data.out.${external.vendor}.${external.provider}").get),
listener = 3,
selector = None,
registry = ctrlCfg.registry,
trackTransAction = TrackTransaction.Off,
subscriberName = None,
header = List.empty
)

private val streamCfg = new JmsStreamBuilder(cfg).streamCfg
system.actorOf(StreamController.props(streamCfg))

private def brokerFilter(provider : String) : String = s"(&(vendor=activemq)(provider=$provider))"

private def getConnectionFactories(sr: BlendedPojoRegistry)(implicit timeout : FiniteDuration) : (IdAwareConnectionFactory, IdAwareConnectionFactory) = {
val cf1 = mandatoryService[IdAwareConnectionFactory](sr)(Some(brokerFilter("internal")))
Expand All @@ -57,87 +91,66 @@ class BridgeSpec extends SimplePojoContainerSpec

"The bridge activator should" - {

"process in- and outbound messages" in {
"process normal in- and outbound messages" in {

implicit val timeout = 5.seconds
val msgCount = 2

withStartedBridge(timeout) { s => sr =>
val msgs : Seq[FlowEnvelope] = 1.to(msgCount).map { i =>
FlowMessage(s"Message $i")(FlowMessage.noProps)
.withHeader(destHeader, s"sampleOut.$i").get
.withHeader(headerCfg.headerTrack, true).get
} map { FlowEnvelope.apply }

implicit val system : ActorSystem = s
implicit val materializer : ActorMaterializer = ActorMaterializer()
implicit val ectxt : ExecutionContext = system.dispatcher
val switch = sendMessages(external, JmsQueue("sampleIn"), log, msgs:_*)

val (internal, external) = getConnectionFactories(sr)
val idSvc = mandatoryService[ContainerIdentifierService](sr)(None)

val headerCfg : FlowHeaderConfig =
FlowHeaderConfig.create(idSvc.containerContext.getContainerConfig().getConfig("blended.flow.header"))

val ctrlCfg : BridgeControllerConfig = BridgeControllerConfig.create(
cfg = idSvc.containerContext.getContainerConfig().getConfig("blended.jms.bridge"),
internalCf = internal,
idSvc = idSvc
)

val msgCount = 2

val destHeader = new JmsEnvelopeHeader(){}.destHeader(headerCfg.prefix)

val msgs = 1.to(msgCount).map { i =>
FlowMessage(s"Message $i")(FlowMessage.noProps)
.withHeader(destHeader, s"sampleOut.$i").get
.withHeader(headerCfg.headerTrack, true).get
} map { FlowEnvelope.apply }
1.to(msgCount).map { i =>
val messages = receiveMessages(ctrlCfg.headerCfg, external, JmsQueue(s"sampleOut.$i"))(1.second, system, materializer)
messages.result.map { l =>
l should have size(1)
}
}

val cfg : JmsStreamConfig = JmsStreamConfig(
inbound = true,
headerCfg = ctrlCfg.headerCfg,
fromCf = internal,
fromDest = JmsDestination.create(s"bridge.data.in.${external.vendor}.${external.provider}").get,
toCf = internal,
toDest = Some(JmsDestination.create(s"bridge.data.out.${external.vendor}.${external.provider}").get),
listener = 3,
selector = None,
registry = ctrlCfg.registry,
trackTransAction = TrackTransaction.Off,
subscriberName = None,
header = List.empty
)
val collector = receiveMessages(ctrlCfg.headerCfg, internal, JmsQueue("internal.transactions"))(1.second, system, materializer)

val streamCfg = new JmsStreamBuilder(cfg).streamCfg
val result = collector.result.map { l =>
val envelopes = l.map(env => FlowTransactionEvent.envelope2event(ctrlCfg.headerCfg)(env).get)

system.actorOf(StreamController.props(streamCfg))
envelopes should have size(msgCount * 2)
val (started, updated) = envelopes.partition(_.isInstanceOf[FlowTransactionStarted])

val switch = sendMessages(external, JmsQueue("sampleIn"), log, msgs:_*)
started should have size msgCount
updated should have size msgCount

1.to(msgCount).map { i =>
val messages = receiveMessages(ctrlCfg.headerCfg, external, JmsQueue(s"sampleOut.$i"))(1.second, system, materializer)
messages.result.map { l =>
l should have size(1)
}
}
assert(updated.forall(_.isInstanceOf[FlowTransactionUpdate]))

val collector = receiveMessages(ctrlCfg.headerCfg, internal, JmsQueue("internal.transactions"))(1.second, system, materializer)
val sIds = started.map(_.transactionId)
val uIds = updated.map(_.transactionId)

val result = collector.result.map { l =>
val envelopes = l.map(env => FlowTransactionEvent.envelope2event(ctrlCfg.headerCfg)(env).get)
assert(sIds.forall(id => uIds.contains(id)))
}

envelopes should have size(msgCount * 2)
val (started, updated) = envelopes.partition(_.isInstanceOf[FlowTransactionStarted])
Await.result(result, 3.seconds)
switch.shutdown()
}

started should have size msgCount
updated should have size msgCount
"process messages with optional header configs" in {

assert(updated.forall(_.isInstanceOf[FlowTransactionUpdate]))
forAll { (desc : String) =>
whenever(desc.nonEmpty) {
val env : FlowEnvelope = FlowEnvelope(FlowMessage("Header")(FlowMessage.props(
destHeader -> "SampleHeaderOut",
"Description" -> desc,
headerCfg.headerTrack -> false
).get))

val sIds = started.map(_.transactionId)
val uIds = updated.map(_.transactionId)
val switch = sendMessages(external, JmsQueue("SampleHeaderIn"), log, env)
val coll = receiveMessages(ctrlCfg.headerCfg, external, JmsQueue("SampleHeaderOut"))(1.second, system, materializer)
val result = Await.result(coll.result, 1100.millis)
switch.shutdown()

assert(sIds.forall(id => uIds.contains(id)))
result should have size 1
result.head.header[String]("ResourceType") should be (Some(desc))
}

Await.result(result, 3.seconds)
switch.shutdown()
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import java.io.File

import blended.container.context.impl.internal.ContainerIdentifierServiceImpl
import blended.jms.utils.{JmsDurableTopic, JmsQueue}
import blended.streams.processor.HeaderProcessorConfig
import blended.testsupport.BlendedTestSupport
import blended.testsupport.pojosr.MockContainerContext
import blended.testsupport.scalatest.LoggingFreeSpec
Expand Down Expand Up @@ -65,7 +66,29 @@ class InboundConfigSpec extends LoggingFreeSpec
inbound.from should be (JmsDurableTopic("de.09999.data.in", "de09999"))
inbound.provider should be (Some("de_topic"))
inbound.listener should be (4)
}

"initialize with optional headers correctly" in {
val cfgString =
"""
|{
| name = "test"
| vendor = "activemq"
| from = "inQueue"
| header : [
| {
| name : "ResourceType"
| expression : "Test"
| }
| ]
|}
""".stripMargin

val cfg = ConfigFactory.parseString(cfgString)
val inbound = InboundConfig.create(idSvc, cfg).get

inbound.header should have size 1
inbound.header.head should be (HeaderProcessorConfig("ResourceType", Some("Test"), true))
}
}
}
3 changes: 2 additions & 1 deletion project/BlendedJmsBridge.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ object BlendedJmsBridge extends ProjectFactory {
Dependencies.logbackCore % "test",
Dependencies.logbackClassic % "test",
Dependencies.activeMqBroker % "test",
Dependencies.scalatest % "test"
Dependencies.scalatest % "test",
Dependencies.scalacheck % "test"
),
adaptBundle = b => b.copy(
bundleActivator = s"${b.bundleSymbolicName}.internal.BridgeActivator"
Expand Down

0 comments on commit 13736af

Please sign in to comment.