Permalink
Browse files

Correction Transaction log in case the inbound message is ent directl…

…y to the inbound queue of the internal bridge provider.
  • Loading branch information...
atooni committed Nov 26, 2018
1 parent dc6e255 commit 7ed4077cf43f384f404d317ad13435caecdcde4e
@@ -125,9 +125,9 @@ jobs:
script:
- bash $TRAVIS_BUILD_DIR/scripts/itest.sh
after_failure:
- $TRAVIS_BUILD_DIR/scripts/upload-directory.sh $TRAVIS_BUILD_DIR/container/target itest
- $TRAVIS_BUILD_DIR/scripts/upload-directory.sh $TRAVIS_BUILD_DIR/container/itest itest
after_success:
- $TRAVIS_BUILD_DIR/scripts/upload-directory.sh $TRAVIS_BUILD_DIR/container/target itest
- $TRAVIS_BUILD_DIR/scripts/upload-directory.sh $TRAVIS_BUILD_DIR/container/itest itest
# ------ Stage Site Deploy
- stage: Site
@@ -2,7 +2,7 @@ package blended.jms.bridge
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Merge, Source, Zip}
import akka.stream.scaladsl.{Flow, GraphDSL, Merge, Source}
import akka.stream.{FlowShape, Graph, Materializer}
import blended.container.context.api.ContainerIdentifierService
import blended.jms.bridge.TrackTransaction.TrackTransaction
@@ -11,11 +11,10 @@ import blended.streams.jms._
import blended.streams.message.FlowEnvelope
import blended.streams.processor.{HeaderProcessorConfig, HeaderTransformProcessor}
import blended.streams.transaction._
import blended.streams.worklist.WorklistState
import blended.streams.{FlowProcessor, StreamControllerConfig}
import blended.util.logging.Logger
import scala.concurrent.duration._
import scala.concurrent.duration._
import scala.util.Try
class InvalidBridgeConfigurationException(msg: String) extends Exception(msg)
@@ -107,94 +106,6 @@ class JmsStreamBuilder(
doTrack
}
private[bridge] def transactionSink(internalCf : IdAwareConnectionFactory, eventDest : JmsDestination) :
Flow[FlowEnvelope, FlowEnvelope, _] = {
val g = GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val settings = JmsProducerSettings(
connectionFactory = internalCf,
deliveryMode = JmsDeliveryMode.Persistent,
jmsDestination = Some(eventDest)
)
val producer = b.add(jmsProducer(
name = "event",
settings = settings,
autoAck = false,
log = bridgeLogger
))
val switchOffTracking = b.add(Flow.fromFunction[FlowEnvelope, FlowEnvelope] { env =>
bridgeLogger.debug(s"About to send envelope [$env]")
env.withHeader(cfg.headerCfg.headerTrack, false).get
})
switchOffTracking ~> producer
FlowShape(switchOffTracking.in, producer.out)
}
Flow.fromGraph(g)
}
private[bridge] val createTransaction : Flow[FlowEnvelope, FlowEnvelope, NotUsed] = {
def startTransaction(env: FlowEnvelope) : FlowTransactionEvent = {
FlowTransactionStarted(env.id, env.flowMessage.header)
}
def updateTransaction(env: FlowEnvelope) : FlowTransactionEvent = {
env.exception match {
case None =>
val branch = env.header[String](cfg.headerCfg.headerBranch).getOrElse("default")
FlowTransactionUpdate(env.id, env.flowMessage.header, WorklistState.Completed, branch)
case Some(e) => FlowTransactionFailed(env.id, env.flowMessage.header, Some(e.getMessage()))
}
}
val g = FlowProcessor.fromFunction("createTransaction", bridgeLogger){ env =>
Try {
val event : FlowTransactionEvent = if (cfg.inbound) {
startTransaction(env)
} else {
updateTransaction(env)
}
bridgeLogger.debug(s"Generated bridge transaction event [$event]")
FlowTransactionEvent.event2envelope(cfg.headerCfg)(event)
.withHeader(cfg.headerCfg.headerTrackSource, streamId).get
}
}
Flow.fromGraph(g)
}
private[bridge] def transactionWiretap(internalCf : IdAwareConnectionFactory, eventDest : JmsDestination) : Flow[FlowEnvelope, FlowEnvelope, NotUsed] = {
val g = GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val split = b.add(Broadcast[FlowEnvelope](2))
val trans = b.add(createTransaction)
val sink = b.add(transactionSink(internalCf, eventDest))
val zip = b.add(Zip[FlowEnvelope, FlowEnvelope]())
val select = b.add(Flow.fromFunction[(FlowEnvelope, FlowEnvelope), FlowEnvelope]{ _._2 })
split.out(1) ~> zip.in1
split.out(0) ~> trans ~> sink ~> zip.in0
zip.out ~> select
FlowShape(split.in, select.out)
}
Flow.fromGraph(g)
}
private val stream : Source[FlowEnvelope, NotUsed] = {
val g : Graph[FlowShape[FlowEnvelope, FlowEnvelope], NotUsed] = GraphDSL.create() { implicit b =>
@@ -203,7 +114,16 @@ class JmsStreamBuilder(
val trackSplit = b.add(trackFilter)
val mergeResult = b.add(Merge[FlowEnvelope](2))
val wiretap = transactionWiretap(internalCf.get, internalProvider.get.transactions)
val wiretap : Flow[FlowEnvelope, FlowEnvelope, NotUsed] =
new TransactionWiretap(
cf = internalCf.get,
eventDest = internalProvider.get.transactions,
headerCfg = cfg.headerCfg,
inbound = cfg.inbound,
trackSource = streamId,
log = bridgeLogger
).flow()
trackSplit.out0 ~> wiretap ~> mergeResult.in(0)
trackSplit.out1 ~> mergeResult.in(1)
@@ -2,16 +2,16 @@ package blended.streams.dispatcher.internal.builder
import akka.NotUsed
import akka.actor.{ActorRef, ActorSystem}
import akka.stream.scaladsl.{Flow, GraphDSL, Keep, Merge, Sink, Source}
import akka.stream._
import akka.stream.scaladsl.{Flow, GraphDSL, Source}
import blended.container.context.api.ContainerIdentifierService
import blended.jms.bridge.{BridgeProviderConfig, BridgeProviderRegistry}
import blended.jms.utils.{IdAwareConnectionFactory, JmsDestination}
import blended.streams.{FlowProcessor, StreamController, StreamControllerConfig}
import blended.streams.dispatcher.internal.ResourceTypeRouterConfig
import blended.streams.jms._
import blended.streams.message.FlowEnvelope
import blended.streams.transaction.{FlowTransactionEvent, FlowTransactionManager}
import blended.streams.transaction.{FlowTransactionEvent, FlowTransactionManager, TransactionWiretap}
import blended.streams.{StreamController, StreamControllerConfig}
import blended.util.logging.Logger
import scala.collection.mutable
@@ -116,12 +116,27 @@ class RunnableDispatcher(
}
)
RestartableJmsSource(
val source = jmsConsumer(
name = settings.jmsDestination.get.asString,
settings = settings,
headerConfig = bs.headerConfig,
log = logger
)
if (provider.internal) {
val startTransaction = new TransactionWiretap(
cf = cf,
eventDest = provider.transactions,
headerCfg = bs.headerConfig,
inbound = true,
trackSource = "internalDispatcher",
log = bs.streamLogger
).flow()
source.via(startTransaction)
} else {
source
}
}
def start() : Unit = {
@@ -20,7 +20,7 @@ class TransactionOutbound(
dispatcherCfg : ResourceTypeRouterConfig,
internalCf: IdAwareConnectionFactory,
log: Logger
)(implicit system : ActorSystem, bs: DispatcherBuilderSupport) {
)(implicit system : ActorSystem, bs: DispatcherBuilderSupport) extends JmsStreamSupport {
private implicit val materializer : Materializer = ActorMaterializer()
private val config = dispatcherCfg.providerRegistry.mandatoryProvider(internalCf.vendor, internalCf.provider)
@@ -33,7 +33,7 @@ class TransactionOutbound(
.withDestination(Some(config.get.transactions))
.withAcknowledgeMode(AcknowledgeMode.ClientAcknowledge)
RestartableJmsSource(
jmsConsumer(
name = "transactionOutbound",
settings = srcSettings,
headerConfig = headerConfig,
@@ -2,7 +2,7 @@ package blended.streams.jms
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.Flow
import akka.stream.scaladsl.{Flow, RestartSource, Source}
import akka.stream.{KillSwitch, Materializer}
import blended.jms.utils.{IdAwareConnectionFactory, JmsDestination}
import blended.streams.StreamFactories
@@ -54,7 +54,7 @@ trait JmsStreamSupport {
StreamFactories.runSourceWithTimeLimit(
dest.asString,
RestartableJmsSource(
restartableConsumer(
name = dest.asString,
headerConfig = headerCfg,
log = log,
@@ -82,4 +82,35 @@ trait JmsStreamSupport {
f
}
}
def jmsConsumer(
name : String,
settings : JMSConsumerSettings,
headerConfig: FlowHeaderConfig,
log: Logger
)(implicit system: ActorSystem): Source[FlowEnvelope, NotUsed] = {
if (settings.acknowledgeMode == AcknowledgeMode.ClientAcknowledge) {
Source.fromGraph(new JmsAckSourceStage(name, settings, headerConfig, log))
} else {
Source.fromGraph(new JmsSourceStage(name, settings, headerConfig, log))
}
}
def restartableConsumer(
name : String,
settings : JMSConsumerSettings,
headerConfig: FlowHeaderConfig,
log: Logger
)(implicit system: ActorSystem) : Source[FlowEnvelope, NotUsed] = {
val innerSource : Source[FlowEnvelope, NotUsed] = jmsConsumer(name, settings, headerConfig, log)
RestartSource.onFailuresWithBackoff(
minBackoff = 2.seconds,
maxBackoff = 10.seconds,
randomFactor = 0.2,
maxRestarts = 10,
) { () => innerSource }
}
}

This file was deleted.

Oops, something went wrong.
@@ -44,6 +44,16 @@ final case class FlowEnvelope private[message] (
flowContext : Map[String, Any] = Map.empty,
) {
override def toString: String = {
val err : String = exception.map(t => s"[exception=${t.getMessage}]").getOrElse("")
val ctxtKeys : String = flowContext match {
case e if e.isEmpty => ""
case m => s"[context keys=${m.keys.mkString(",")}]"
}
s"FlowEnvelope[$id][$requiresAcknowledge][$flowMessage]$ctxtKeys$err"
}
def header[T](key: String)(implicit m: Manifest[T]): Option[T] = flowMessage.header[T](key)
def headerWithDefault[T](key: String, default : T)(implicit m: Manifest[T]) : T = flowMessage.headerWithDefault[T](key, default)
@@ -57,16 +67,16 @@ final case class FlowEnvelope private[message] (
def removeHeader(keys: String*) : FlowEnvelope = copy(flowMessage.removeHeader(keys:_*))
def removeFromContext(key: String) : FlowEnvelope = copy(flowContext = flowContext.filter(_ != key))
def withContextObject(key: String, o: Any) : FlowEnvelope = copy(flowContext = flowContext.filter(_ != key) + (key -> o))
def removeFromContext(key: String) : FlowEnvelope = copy(flowContext = flowContext.filterKeys(_ != key))
def withContextObject(key: String, o: Any) : FlowEnvelope = copy(flowContext = flowContext.filterKeys(_ != key) + (key -> o))
def getFromContext[T](key: String) : Try[Option[T]] = Try { flowContext.get(key).map(_.asInstanceOf[T]) }
def clearException(): FlowEnvelope = copy(exception = None)
def withException(t: Throwable): FlowEnvelope = copy(exception = Some(t))
def withRequiresAcknowledge(b: Boolean): FlowEnvelope = copy(requiresAcknowledge = b)
def withAckHandler(handler : Option[AcknowledgeHandler]) = copy(ackHandler = handler)
def withAckHandler(handler : Option[AcknowledgeHandler]): FlowEnvelope = copy(ackHandler = handler)
// For the default we simply do nothing when a downstream consumer calls acknowledge
def acknowledge(): Unit = ackHandler.foreach(h => h.acknowledge(this))
@@ -171,7 +171,6 @@ sealed abstract class FlowMessage(msgHeader: FlowMessageProps) {
case class BaseFlowMessage(override val header: FlowMessageProps) extends FlowMessage(header) {
override def body(): Any = NotUsed
override def bodySize(): Int = 0
override def withHeader(key: String, value: Any, overwrite: Boolean = true): Try[FlowMessage] = Try {
@@ -61,6 +61,8 @@ class FlowTransactionStream(
case Success(t) =>
if (t.state == FlowTransactionState.Started || t.terminated) {
log.info(t.toString())
} else {
log.debug(t.toString())
}
Success(FlowTransaction.transaction2envelope(cfg)(t))
case Failure(t) =>
Oops, something went wrong.

0 comments on commit 7ed4077

Please sign in to comment.