Permalink
Browse files

Creating sample configs to make travis work again

  • Loading branch information...
atooni committed Oct 26, 2018
1 parent 3688e3b commit 92d447d225d68ff235e016219edc43578ab37712
@@ -36,7 +36,6 @@ object ContainerPropertyResolver {
}
}
private[this] def extractVariableElement(line: String, startDelim: String, endDelim: String) : (String, String, String) = {
line.lastIndexOf(startDelim) match {
@@ -177,7 +176,7 @@ object ContainerPropertyResolver {
log.warn(s"Could not resolve expression [${line}], using empty String")
""
case Some(r) =>
log.debug(s"Evaluated [$line] to [$r][${r.getClass().getName()}]")
log.trace(s"Evaluated [$line] to [$r][${r.getClass().getName()}]")
r
}
}
@@ -225,7 +225,9 @@ object InboundRouteConfig {
InboundRouteConfig(
entry = JmsDestination.create(destName).get,
header = cfg.getStringMap(headerPath, Map.empty).mapValues(s => idSvc.resolvePropertyString(s).map(_.toString()).get)
// This will be resolved in message context
// TODO: generalize resolver concept
header = cfg.getStringMap(headerPath, Map.empty) // .mapValues(s => idSvc.resolvePropertyString(s).map(_.toString()).get)
)
}
}
@@ -49,16 +49,18 @@ case class DispatcherFanout(
case Some(c) =>
val use = idSvc.resolvePropertyString(c, env.flowMessage.header.mapValues(_.value)).map(_.asInstanceOf[Boolean]).get
val s = s"using header for [${env.id}]:[outboundMsg] block with expression [$c]"
if (use) {
bs.streamLogger.info(s"Using header for [${env.id}]:[outboundMsg] block with expression [$c]")
bs.streamLogger.info(s)
} else {
bs.streamLogger.info("Not " + s )
}
use
}
}
}
Try {
var newEnv : FlowEnvelope = env
.withHeader(bs.headerBridgeMaxRetry, outCfg.maxRetries).get
.withHeader(bs.headerBridgeClose, outCfg.autoComplete).get
@@ -70,7 +72,7 @@ case class DispatcherFanout(
outCfg.outboundHeader.filter(b => useHeaderBlock(b).get).foreach { oh =>
oh.header.foreach { case (header, value) =>
val resolved = idSvc.resolvePropertyString(value, env.flowMessage.header.mapValues(_.value)).get
bs.streamLogger.debug(s"Resolved property [$header] to [$resolved]")
bs.streamLogger.trace(s"[${newEnv.id}]:[${outCfg.id}] - resolved property [$header] to [$resolved]")
newEnv = newEnv.withHeader(header, resolved).get
}
}
@@ -120,7 +122,7 @@ case class DispatcherFanout(
val worklist = builder.add(Flow[Seq[(OutboundRouteConfig, FlowEnvelope)]].map(toWorklist))
val wlLog = builder.add(Flow.fromFunction[WorklistEvent, WorklistEvent] { evt =>
bs.streamLogger.debug(s"About to send worklist event [$evt]")
bs.streamLogger.trace(s"About to send worklist event [$evt]")
evt
})

This file was deleted.

Oops, something went wrong.
@@ -77,8 +77,6 @@ class CoreDispatcherSpec extends LoggingFreeSpec
val worklist : Inlet[WorklistEvent] = builder.add(worklistSink).in
val error : Inlet[FlowEnvelope] = builder.add(errorSink).in
val devNull = Inlet[FlowEnvelope]("devNull")
val dispatcher = builder.add(DispatcherBuilder(idSvc, dispatcherCfg)(bs).core())
dispatcher.out0 ~> out
@@ -5,7 +5,7 @@ import akka.stream.scaladsl.Sink
import akka.testkit.TestProbe
import blended.akka.internal.BlendedAkkaActivator
import blended.container.context.api.ContainerIdentifierService
import blended.jms.bridge.BridgeProviderRegistry
import blended.jms.bridge.{BridgeProviderConfig, BridgeProviderRegistry}
import blended.jms.bridge.internal.BridgeActivator
import blended.streams.dispatcher.internal.ResourceTypeRouterConfig
import blended.streams.message.FlowEnvelope
@@ -29,6 +29,9 @@ trait DispatcherSpecSupport extends SimplePojosrBlendedContainer with PojoSrTest
def baseDir : String
def loggerName : String
def providerId(vendor: String, provider: String) : String =
classOf[BridgeProviderConfig].getSimpleName() + s"($vendor:$provider)"
def collector[T](name : String)(implicit system : ActorSystem, clazz : ClassTag[T]) : (TestProbe, Sink[T, _]) = {
val p = TestProbe(name)
val actor = system.actorOf(CollectingActor.props[T](name, p.ref))
@@ -8,14 +8,12 @@ import akka.stream._
import akka.stream.scaladsl.{Flow, GraphDSL, Keep, Source}
import blended.streams.dispatcher.internal.OutboundRouteConfig
import blended.streams.message.{FlowEnvelope, FlowMessage}
import blended.streams.testsupport.StreamFactories
import blended.streams.worklist.{WorklistEvent, WorklistState}
import blended.testsupport.BlendedTestSupport
import blended.testsupport.scalatest.LoggingFreeSpec
import blended.util.logging.Logger
import org.scalatest.Matchers
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}
@@ -33,7 +31,6 @@ class FanoutSpec extends LoggingFreeSpec
override val streamLogger: Logger = Logger(loggerName)
}
def performFanout(
ctxt : DispatcherExecContext,
fanout : DispatcherFanout,
@@ -111,13 +108,8 @@ class FanoutSpec extends LoggingFreeSpec
val envOut = b.add(envSink)
val wlOut = b.add(wlsink)
val wlLog = b.add(Flow.fromFunction[WorklistEvent, WorklistEvent] { evt =>
bs.streamLogger.debug(s"Received worklist event [$evt]")
evt
})
fanoutGraph.out0 ~> envOut
fanoutGraph.out1 ~> wlLog ~> wlOut
fanoutGraph.out1 ~> wlOut
SinkShape(fanoutGraph.in)
}
@@ -136,14 +128,11 @@ class FanoutSpec extends LoggingFreeSpec
val envelope = FlowEnvelope(FlowMessage.noProps)
val rtCfg = ctxt.cfg.resourceTypeConfigs.get(resType).get
val source = StreamFactories.keepAliveSource[FlowEnvelope](1)
val source = Source(List(envelope.withContextObject(bs.rtConfigKey, rtCfg)))
val runnable = runnableFanout(source, fanout)
val (actor, switch) = runnable._3.run()
akka.pattern.after(100.millis, system.scheduler)(Future { switch.shutdown() })
actor ! envelope.withContextObject(bs.rtConfigKey, rtCfg)
runnable._3.run()
val envelopes = runnable._1.expectMsgType[List[FlowEnvelope]](1.second)
val worklists = runnable._2.expectMsgType[List[WorklistEvent]](1.second)

This file was deleted.

Oops, something went wrong.
@@ -6,6 +6,7 @@ import akka.stream.scaladsl.Flow
import blended.streams.FlowProcessor
import blended.testsupport.BlendedTestSupport
import blended.testsupport.scalatest.LoggingFreeSpec
import blended.util.logging.Logger
import org.scalatest.Matchers
import scala.util.Success
@@ -14,28 +15,32 @@ class OutboundDispatcherSpec extends LoggingFreeSpec
with Matchers
with DispatcherSpecSupport {
override def country: String = "cc"
override def location: String = "09999"
override def loggerName: String = getClass().getName()
override def baseDir : String = new File(BlendedTestSupport.projectTestOutput, "container").getAbsolutePath()
implicit val bs = new DispatcherBuilderSupport {
override val prefix: String = "SIB"
override val streamLogger: Logger = Logger(loggerName)
}
"The outbound flow of the dispatcher should" - {
"produce a worklist completed event for successfull completions of the outbound flow" in {
// withDispatcherConfig() { ctxt =>
// // a simple identity outbound flow for testing
// val outbound = Flow.fromGraph(FlowProcessor.fromFunction("out", bs.streamLogger){ env =>
// Success(env)
// })
//
// val g = DispatcherBuilder(ctxt.idSvc, ctxt.cfg).outbound(outbound)
//
// fail()
//
// }
//
withDispatcherConfig() { ctxt =>
// a simple identity outbound flow for testing
val outbound = Flow.fromGraph(FlowProcessor.fromFunction("out", bs.streamLogger){ env =>
Success(env)
})
val g = DispatcherBuilder(ctxt.idSvc, ctxt.cfg).outbound(outbound)
fail()
}
}
}
Oops, something went wrong.

0 comments on commit 92d447d

Please sign in to comment.