Permalink
Browse files

First cut of DispatcherActivatorSpec

  • Loading branch information...
atooni committed Nov 11, 2018
1 parent 82cf8ca commit ee498396829ad16b03e3c52bf8127efb13849307
Showing with 162 additions and 55 deletions.
  1. +1 −1 ...d.streams.dispatcher/src/main/scala/blended/streams/dispatcher/internal/DispatcherActivator.scala
  2. +6 −1 ...eams.dispatcher/src/main/scala/blended/streams/dispatcher/internal/ResourceTypeRouterConfig.scala
  3. +61 −10 ...ams.dispatcher/src/main/scala/blended/streams/dispatcher/internal/builder/DispatcherBuilder.scala
  4. +3 −2 ...patcher/src/main/scala/blended/streams/dispatcher/internal/builder/DispatcherBuilderSupport.scala
  5. +4 −3 ...ms.dispatcher/src/main/scala/blended/streams/dispatcher/internal/builder/RunnableDispatcher.scala
  6. +13 −6 blended.streams.dispatcher/src/test/resources/container/etc/application.conf
  7. +35 −6 ...reams.dispatcher/src/test/scala/blended/streams/dispatcher/internal/DispatcherActivatorSpec.scala
  8. +7 −7 ...ms.dispatcher/src/test/scala/blended/streams/dispatcher/internal/builder/CoreDispatcherSpec.scala
  9. +3 −3 ...streams/dispatcher/internal/builder/{OutboundDispatcherSpec.scala → DispatcherOutboundSpec.scala}
  10. +2 −3 ...treams.dispatcher/src/test/scala/blended/streams/dispatcher/internal/builder/DispatcherSpec.scala
  11. +3 −2 ...patcher/src/test/scala/blended/streams/dispatcher/internal/builder/WorklistEventhandlerSpec.scala
  12. +16 −7 blended.streams/src/main/scala/blended/streams/jms/JmsFlowSupport.scala
  13. +0 −2 blended.streams/src/main/scala/blended/streams/jms/JmsStreamSupport.scala
  14. +4 −2 blended.util.logging/src/main/scala/blended/util/logging/Logger.scala
  15. +2 −0 blended.util.logging/src/main/scala/blended/util/logging/LoggerJul.scala
  16. +2 −0 blended.util.logging/src/main/scala/blended/util/logging/LoggerSlf4j.scala
@@ -27,7 +27,7 @@ class DispatcherActivator extends DominoActivator
val bs = new DispatcherBuilderSupport {
override def containerConfig: Config = cfg.idSvc.containerContext.getContainerConfig()
override val streamLogger: Logger = Logger("flow.dispatcher")
override val streamLogger: Logger = Logger(headerConfig.prefix + ".dispatcher")
}
whenAdvancedServicePresent[IdAwareConnectionFactory](internalProvider.osgiBrokerFilter) { cf =>
@@ -19,6 +19,7 @@ object ResourceTypeRouterConfig {
private[this] val applicationLogHeaderPath = "applicationLogHeader"
private[this] val defaultHeaderPath = "defaultHeader"
private[this] val resourcetypesPath = "resourcetypes"
private[this] val handledExceptionsPath = "handledExceptions"
def create(
idSvc : ContainerIdentifierService,
@@ -53,13 +54,16 @@ object ResourceTypeRouterConfig {
DefaultHeaderConfig.create(cfg)
}
val handledExceptions : List[String] = cfg.getStringList(handledExceptionsPath, List.empty)
ResourceTypeRouterConfig(
defaultProvider = internalProvider,
eventProvider = eventProvider,
applicationLogHeader = logHeader,
defaultHeader = defaultHeader,
resourceTypeConfigs = routes,
providerRegistry = provider
providerRegistry = provider,
handledExceptions = handledExceptions
)
}
}
@@ -88,6 +92,7 @@ case class ResourceTypeRouterConfig(
providerRegistry : BridgeProviderRegistry,
applicationLogHeader : List[String],
defaultHeader : List[DefaultHeaderConfig],
handledExceptions : List[String],
resourceTypeConfigs : Map[String, ResourceTypeConfig]
)
@@ -7,6 +7,7 @@ import akka.stream.scaladsl.{Flow, GraphDSL, Merge}
import blended.container.context.api.ContainerIdentifierService
import blended.streams.FlowProcessor
import blended.streams.dispatcher.internal._
import blended.streams.jms.{JmsDeliveryMode, JmsEnvelopeHeader}
import blended.streams.message.{FlowEnvelope, FlowMessage}
import blended.streams.transaction.{FlowTransactionEvent, FlowTransactionFailed, FlowTransactionUpdate}
import blended.streams.worklist._
@@ -32,8 +33,9 @@ class JmsDestinationMissing(env: FlowEnvelope, outboundId : String)
case class DispatcherBuilder(
idSvc : ContainerIdentifierService,
dispatcherCfg: ResourceTypeRouterConfig
)(implicit val bs : DispatcherBuilderSupport) {
dispatcherCfg: ResourceTypeRouterConfig,
sendFlow : Flow[FlowEnvelope, FlowEnvelope, NotUsed]
)(implicit val bs : DispatcherBuilderSupport) extends JmsEnvelopeHeader {
private[this] val logger = Logger[DispatcherBuilder]
@@ -78,7 +80,7 @@ case class DispatcherBuilder(
// If after the oubound flow the envelope is marked with an exception,
// We will generate a worklist failed event, otherwise we will generate
// a Worklist completed event with the in bound envelope.
def outbound(sendFlow : Flow[FlowEnvelope, FlowEnvelope, NotUsed])
def outbound()
: Graph[FanOutShape2[FlowEnvelope, WorklistEvent, FlowEnvelope], NotUsed] = {
GraphDSL.create() { implicit b =>
@@ -219,6 +221,59 @@ case class DispatcherBuilder(
}
}
def errorHandler() : Flow[FlowEnvelope, FlowTransactionEvent, NotUsed] = {
val g : Graph[FlowShape[FlowEnvelope, FlowEnvelope], NotUsed] = GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val sendSplitter = b.add(FlowProcessor.partition[FlowEnvelope]{ env =>
dispatcherCfg.handledExceptions.contains(env.exception.map(_.getClass().getName()).getOrElse(""))
})
val merge = b.add(Merge[FlowEnvelope](2))
val routeError = b.add(Flow.fromFunction[FlowEnvelope, FlowEnvelope] { env =>
try {
val vendor = env.header[String](srcVendorHeader(bs.headerConfig.prefix)).get
val provider = env.header[String](srcProviderHeader(bs.headerConfig.prefix)).get
val errProvider = dispatcherCfg.providerRegistry.jmsProvider(vendor, provider).get
val dest = errProvider.errors.asString
bs.streamLogger.debug(s"Routing error envelope [${env.id}] to [$vendor:$provider:$dest]")
env
.withHeader(bs.headerDeliveryMode, JmsDeliveryMode.Persistent.asString).get
.withHeader(bs.headerBridgeVendor, vendor).get
.withHeader(bs.headerBridgeProvider, provider).get
.withHeader(bs.headerBridgeDest, dest).get
} catch {
case t : Throwable =>
bs.streamLogger.warn(s"Failed to resolve error routing for envelope [${env.id}] : [${t.getMessage()}]")
env
}
})
val sendError = b.add(sendFlow)
val ackError = Flow.fromFunction[FlowEnvelope, FlowEnvelope] { env =>
bs.streamLogger.debug(s"Acknowledging envelope [${env.id}]")
env.acknowledge()
env
}
sendSplitter.out0 ~> routeError ~> sendError ~> ackError ~> merge.in(0)
sendSplitter.out1 ~> merge.in(1)
FlowShape(sendSplitter.in, merge.out)
}
Flow.fromGraph(g)
.via(Flow.fromFunction[FlowEnvelope, FlowTransactionEvent] { env =>
FlowTransactionFailed(env.id, env.exception.map(_.getMessage()))
})
}
// The dispatcher processes a stream of inbound FlowEnvelopes and generates TransactionEvents to update
// a monitored FlowTransaction.
//
@@ -244,16 +299,14 @@ case class DispatcherBuilder(
// 9. The combined transaction events are passed down stream
def dispatcher(
sendFlow : Flow[FlowEnvelope, FlowEnvelope, NotUsed]
) : Graph[FlowShape[FlowEnvelope, FlowTransactionEvent], NotUsed] = {
def dispatcher() : Graph[FlowShape[FlowEnvelope, FlowTransactionEvent], NotUsed] = {
GraphDSL.create() { implicit b =>
// of course we start with the core
val callCore = b.add(core())
// we do need a send processor
val callSend = b.add(outbound(sendFlow))
val callSend = b.add(outbound())
// We will collect the errors here
val error = b.add(Merge[FlowEnvelope](3))
@@ -284,9 +337,7 @@ case class DispatcherBuilder(
// if envelopes marked with an exception again got to the error channel
callSend.out1 ~> error
val errHandler = b.add(Flow.fromFunction[FlowEnvelope, FlowTransactionEvent] { env =>
FlowTransactionFailed(env.id, env.exception.map(_.getMessage()))
})
val errHandler = b.add(errorHandler())
// generate and collect transaction failures for exceptions
error.out ~> errHandler ~> trans.in(1)
@@ -3,6 +3,7 @@ package blended.streams.dispatcher.internal.builder
import java.util.UUID
import blended.streams.dispatcher.internal.{OutboundRouteConfig, ResourceTypeConfig}
import blended.streams.jms.JmsEnvelopeHeader
import blended.streams.message.FlowEnvelope
import blended.streams.transaction.FlowHeaderConfig
import blended.streams.worklist.{FlowWorklistItem, Worklist, WorklistItem}
@@ -12,7 +13,7 @@ import com.typesafe.config.Config
import scala.reflect.ClassTag
import scala.util.{Failure, Success, Try}
trait DispatcherBuilderSupport {
trait DispatcherBuilderSupport extends JmsEnvelopeHeader {
def containerConfig : Config
def headerConfig : FlowHeaderConfig = FlowHeaderConfig.create(containerConfig.getConfig("blended.flow.header"))
@@ -34,7 +35,7 @@ trait DispatcherBuilderSupport {
def headerBridgeVendor : String = header("BridgeVendor")
def headerBridgeProvider : String = header("BridgeProvider")
def headerBridgeDest : String = header("BridgeDestination")
def headerBridgeDest : String = destHeader(headerConfig.prefix)
def headerCbeEnabled : String = header("CbeEnabled")
@@ -45,7 +45,7 @@ class RunnableDispatcher(
case (internal.vendor, internal.provider) =>
JmsDestination.create(env.header[String](bs.headerBridgeDest).get).get
case (v, p) =>
val dest = s"${internal.inbound.name}.$v.$p"
val dest = s"${internal.outbound.name}.$v.$p"
JmsDestination.create(dest).get
}
@@ -109,7 +109,8 @@ class RunnableDispatcher(
private val builder = DispatcherBuilder(
idSvc = idSvc,
dispatcherCfg = routerCfg
dispatcherCfg = routerCfg,
dispatcherSend()
)(bs)
def bridgeSource(
@@ -140,7 +141,7 @@ class RunnableDispatcher(
def start() : Unit = {
val dispatcher : Flow[FlowEnvelope, FlowTransactionEvent, NotUsed] =
Flow.fromGraph(builder.dispatcher(dispatcherSend()))
Flow.fromGraph(builder.dispatcher())
val transSend : Sink[FlowTransactionEvent, NotUsed] = transactionSend()
@@ -86,8 +86,9 @@ blended {
internal : false
inbound : "bridge.data.in"
outbound : "bridge.data.out"
errorQueue: "global.error"
eventQueue: "$[["${blended.flow.header.prefix}"Country]].global.evnt.out"
errors: "global.error"
transactions: "$[["${blended.flow.header.prefix}"Country]].global.trans.out"
cbes: "$[["${blended.flow.header.prefix}"Country]].global.evnt.out"
listener: 3
},
{
@@ -96,8 +97,9 @@ blended {
internal : false
inbound : "bridge.data.in"
outbound : "bridge.data.out"
errorQueue: "global.error"
eventQueue: "event.out"
errors: "global.error"
transactions: "global.transactions"
cbes: "event.out"
listener: 3
},
{
@@ -106,8 +108,9 @@ blended {
internal : false
inbound : "bridge.data.in"
outbound : "bridge.data.out"
errorQueue: "global.error"
eventQueue: "event.out"
errors: "global.error"
transactions: "global.transactions"
cbes: "event.out"
listener: 3
}
]
@@ -154,6 +157,10 @@ blended {
{ name : ${blended.flow.header.prefix}"BridgeDestination" }
]
handledExceptions = [
"blended.streams.dispatcher.internal.builder.IllegalResourceType"
]
resourcetypes = {
SagTest = {
@@ -2,21 +2,28 @@ package blended.streams.dispatcher.internal
import java.io.File
import akka.stream.ActorMaterializer
import blended.activemq.brokerstarter.BrokerActivator
import blended.akka.internal.BlendedAkkaActivator
import blended.jms.bridge.internal.BridgeActivator
import blended.jms.utils.JmsDestination
import blended.streams.dispatcher.internal.builder.DispatcherSpecSupport
import blended.streams.jms.JmsStreamSupport
import blended.streams.message.FlowEnvelope
import blended.testsupport.pojosr.PojoSrTestHelper
import blended.testsupport.{BlendedTestSupport, RequiresForkedJVM}
import blended.util.logging.Logger
import org.osgi.framework.BundleActivator
import org.scalatest.Matchers
import scala.concurrent.Await
import scala.concurrent.duration._
@RequiresForkedJVM
class DispatcherActivatorSpec extends DispatcherSpecSupport
with Matchers
with PojoSrTestHelper {
with PojoSrTestHelper
with JmsStreamSupport {
System.setProperty("AppCountry", country)
System.setProperty("AppLocation", location)
@@ -42,15 +49,37 @@ class DispatcherActivatorSpec extends DispatcherSpecSupport
withDispatcherConfig { ctxt =>
implicit val system = ctxt.system
implicit val materializer = ActorMaterializer()
implicit val eCtxt = ctxt.system.dispatcher
implicit val timeout = 3.seconds
implicit val timeout = 5.seconds
// make sure we can connect to all connection factories
val amq = jmsConnectionFactory(registry, ctxt)("activemq", "activemq", timeout)
val sonic = jmsConnectionFactory(registry, ctxt)("sonic75", "central", timeout)
val ccQueue = jmsConnectionFactory(registry, ctxt)("sagum", s"${country}_queue", timeout)
val amq = jmsConnectionFactory(registry, ctxt)("activemq", "activemq", timeout).get
val sonic = jmsConnectionFactory(registry, ctxt)("sonic75", "central", timeout).get
val ccQueue = jmsConnectionFactory(registry, ctxt)("sagum", s"${country}_queue", timeout).get
val switch = sendMessages(
headerCfg = ctxt.bs.headerConfig,
cf = sonic,
dest = JmsDestination.create("sonic.data.in").get,
log = Logger(loggerName),
msgs = FlowEnvelope().withHeader(ctxt.bs.headerResourceType, "Dummy").get
)
val coll = receiveMessages(
headerCfg = ctxt.bs.headerConfig,
cf = sonic,
dest = JmsDestination.create("global.error").get
)
pending
try {
val msgs = Await.result(coll.result, timeout + 1.second)
msgs should have size 1
} finally {
switch.shutdown()
}
}
}
}
@@ -24,6 +24,7 @@ class CoreDispatcherSpec extends DispatcherSpecSupport
with Matchers {
override def loggerName: String = classOf[CoreDispatcherSpec].getName()
val goodFlow = Flow.fromFunction[FlowEnvelope, FlowEnvelope]{env => env}
val defaultTimeout : FiniteDuration = 1.second
@@ -47,8 +48,6 @@ class CoreDispatcherSpec extends DispatcherSpecSupport
Collector[FlowEnvelope],
RunnableGraph[(ActorRef, KillSwitch)]
)= {
implicit val materializer : Materializer = ActorMaterializer()
val jmsCollector = Collector[FlowEnvelope]("jms")
@@ -66,7 +65,7 @@ class CoreDispatcherSpec extends DispatcherSpecSupport
val worklist : Inlet[WorklistEvent] = builder.add(wlCollector.sink).in
val error : Inlet[FlowEnvelope] = builder.add(errCollector.sink).in
val dispatcher = builder.add(DispatcherBuilder(ctxt.idSvc, ctxt.cfg)(ctxt.bs).core())
val dispatcher = builder.add(DispatcherBuilder(ctxt.idSvc, ctxt.cfg, goodFlow)(ctxt.bs).core())
dispatcher.out0 ~> out
dispatcher.out1 ~> worklist
@@ -91,8 +90,8 @@ class CoreDispatcherSpec extends DispatcherSpecSupport
testMessages : FlowEnvelope*
)(implicit system: ActorSystem) : Future[DispatcherResult] = {
implicit val eCtxt = system.dispatcher
implicit val materializer = ActorMaterializer()
implicit val eCtxt : ExecutionContext = system.dispatcher
implicit val materializer : Materializer = ActorMaterializer()
val source = StreamFactories.keepAliveSource[FlowEnvelope](testMessages.size)
val (jmsColl, wlColl, errorColl, g) = runnableDispatcher(ctxt, testMessages.size)
@@ -144,7 +143,8 @@ class CoreDispatcherSpec extends DispatcherSpecSupport
val builder = DispatcherBuilder(
idSvc = ctxt.idSvc,
dispatcherCfg = ctxt.cfg
dispatcherCfg = ctxt.cfg,
goodFlow
)(ctxt.bs)
val core = builder.core()
@@ -154,7 +154,7 @@ class CoreDispatcherSpec extends DispatcherSpecSupport
env
}
val dispatcher = builder.dispatcher(dummyOut)
val dispatcher = builder.dispatcher()
travesty.toFile(core, OutputFormat.SVG)(new File(BlendedTestSupport.projectTestOutput, "dispatcher_core.svg").getAbsolutePath())
travesty.toFile(event, OutputFormat.SVG)(new File(BlendedTestSupport.projectTestOutput, "dispatcher_wlEvent.svg").getAbsolutePath())
Oops, something went wrong.

0 comments on commit ee49839

Please sign in to comment.