Skip to content
This repository has been archived by the owner on May 31, 2024. It is now read-only.

Commit

Permalink
Refactoring JmsStages to allow confirgurable recovery time for sessio…
Browse files Browse the repository at this point in the history
…ns that have been closed

after an exception.
  • Loading branch information
atooni committed Jan 16, 2019
1 parent a415cd6 commit 6fe506f
Show file tree
Hide file tree
Showing 30 changed files with 257 additions and 192 deletions.
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ class RoundtripConnectionVerifier(
.withHeader(replyToHeader(headerConfig.prefix), responseDest.asString).get .withHeader(replyToHeader(headerConfig.prefix), responseDest.asString).get


val pSettings : JmsProducerSettings = JmsProducerSettings( val pSettings : JmsProducerSettings = JmsProducerSettings(
log = log,
connectionFactory = cf, connectionFactory = cf,
jmsDestination = Some(requestDest), jmsDestination = Some(requestDest),
destinationResolver = s => new MessageDestinationResolver(headerConfig, s) destinationResolver = s => new MessageDestinationResolver(headerConfig, s)
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ class HttpQueueServiceSpec extends FreeSpec
val env : FlowEnvelope = FlowEnvelope(FlowMessage(msg)(FlowMessage.noProps)) val env : FlowEnvelope = FlowEnvelope(FlowMessage(msg)(FlowMessage.noProps))


val pSettings : JmsProducerSettings = JmsProducerSettings( val pSettings : JmsProducerSettings = JmsProducerSettings(
log = log,
connectionFactory = cf, connectionFactory = cf,
jmsDestination = Some(JmsQueue("Queue1")) jmsDestination = Some(JmsQueue("Queue1"))
) )
Expand All @@ -108,6 +109,7 @@ class HttpQueueServiceSpec extends FreeSpec
val env : FlowEnvelope = FlowEnvelope(FlowMessage(ByteString(msg))(FlowMessage.noProps)) val env : FlowEnvelope = FlowEnvelope(FlowMessage(ByteString(msg))(FlowMessage.noProps))


val pSettings : JmsProducerSettings = JmsProducerSettings( val pSettings : JmsProducerSettings = JmsProducerSettings(
log = log,
connectionFactory = cf, connectionFactory = cf,
jmsDestination = Some(JmsQueue("Queue1")) jmsDestination = Some(JmsQueue("Queue1"))
) )
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ class SttpQueueServiceSpec extends SimplePojoContainerSpec
val env : FlowEnvelope = FlowEnvelope(FlowMessage(msg)(FlowMessage.noProps)) val env : FlowEnvelope = FlowEnvelope(FlowMessage(msg)(FlowMessage.noProps))


val pSettings : JmsProducerSettings = JmsProducerSettings( val pSettings : JmsProducerSettings = JmsProducerSettings(
log = log,
connectionFactory = amqCF, connectionFactory = amqCF,
jmsDestination = Some(JmsQueue("Queue1")) jmsDestination = Some(JmsQueue("Queue1"))
) )
Expand All @@ -111,6 +112,7 @@ class SttpQueueServiceSpec extends SimplePojoContainerSpec
val env : FlowEnvelope = FlowEnvelope(FlowMessage(ByteString(msg))(FlowMessage.noProps)) val env : FlowEnvelope = FlowEnvelope(FlowMessage(ByteString(msg))(FlowMessage.noProps))


val pSettings : JmsProducerSettings = JmsProducerSettings( val pSettings : JmsProducerSettings = JmsProducerSettings(
log = log,
connectionFactory = amqCF, connectionFactory = amqCF,
jmsDestination = Some(JmsQueue("Queue1")) jmsDestination = Some(JmsQueue("Queue1"))
) )
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import blended.streams.{FlowProcessor, StreamControllerConfig}
import blended.util.logging.Logger import blended.util.logging.Logger
import com.typesafe.config.Config import com.typesafe.config.Config


import scala.concurrent.duration.FiniteDuration
import scala.util.Try import scala.util.Try


class InvalidBridgeConfigurationException(msg: String) extends Exception(msg) class InvalidBridgeConfigurationException(msg: String) extends Exception(msg)
Expand All @@ -38,7 +39,8 @@ case class JmsStreamConfig(
subscriberName : Option[String], subscriberName : Option[String],
header : List[HeaderProcessorConfig], header : List[HeaderProcessorConfig],
idSvc : Option[ContainerIdentifierService] = None, idSvc : Option[ContainerIdentifierService] = None,
rawConfig : Config rawConfig : Config,
sessionRecreateTimeout : FiniteDuration
) )


class JmsStreamBuilder( class JmsStreamBuilder(
Expand All @@ -49,9 +51,10 @@ class JmsStreamBuilder(
private val inId = s"${cfg.fromCf.vendor}:${cfg.fromCf.provider}:${cfg.fromDest.asString}" private val inId = s"${cfg.fromCf.vendor}:${cfg.fromCf.provider}:${cfg.fromDest.asString}"
private val outId = s"${cfg.toCf.vendor}:${cfg.toCf.provider}:${cfg.toDest.map(_.asString).getOrElse("out")}" private val outId = s"${cfg.toCf.vendor}:${cfg.toCf.provider}:${cfg.toDest.map(_.asString).getOrElse("out")}"
private val streamId = s"${cfg.headerCfg.prefix}.bridge.JmsStream($inId->$outId)" private val streamId = s"${cfg.headerCfg.prefix}.bridge.JmsStream($inId->$outId)"
private val bridgeLogger = Logger(streamId)


// configure the consumer // configure the consumer
private val srcSettings = JMSConsumerSettings(cfg.fromCf) private val srcSettings = JMSConsumerSettings(bridgeLogger, cfg.fromCf)
.withAcknowledgeMode(AcknowledgeMode.ClientAcknowledge) .withAcknowledgeMode(AcknowledgeMode.ClientAcknowledge)
.withDestination(Some(cfg.fromDest)) .withDestination(Some(cfg.fromDest))
.withSessionCount(cfg.listener) .withSessionCount(cfg.listener)
Expand All @@ -68,13 +71,14 @@ class JmsStreamBuilder(
} }


private val toSettings = JmsProducerSettings( private val toSettings = JmsProducerSettings(
log = bridgeLogger,
connectionFactory = cfg.toCf connectionFactory = cfg.toCf
) )
.withDestination(cfg.toDest) .withDestination(cfg.toDest)
.withDestinationResolver(destResolver) .withDestinationResolver(destResolver)
.withDeliveryMode(JmsDeliveryMode.Persistent) .withDeliveryMode(JmsDeliveryMode.Persistent)


private val bridgeLogger = Logger(streamId)


private val internalProvider : Try[BridgeProviderConfig] = cfg.registry.internalProvider private val internalProvider : Try[BridgeProviderConfig] = cfg.registry.internalProvider
private val internalId = (internalProvider.get.vendor, internalProvider.get.provider) private val internalId = (internalProvider.get.vendor, internalProvider.get.provider)
Expand Down Expand Up @@ -136,8 +140,7 @@ class JmsStreamBuilder(
Source.fromGraph(new JmsAckSourceStage( Source.fromGraph(new JmsAckSourceStage(
name = streamId + "-source", name = streamId + "-source",
settings = srcSettings, settings = srcSettings,
headerConfig = cfg.headerCfg, headerConfig = cfg.headerCfg
log = bridgeLogger
)) ))


val jmsSource : Source[FlowEnvelope, NotUsed] = if (cfg.inbound && cfg.header.nonEmpty) { val jmsSource : Source[FlowEnvelope, NotUsed] = if (cfg.inbound && cfg.header.nonEmpty) {
Expand All @@ -159,7 +162,7 @@ class JmsStreamBuilder(


jmsSource jmsSource
.via(Flow.fromGraph(g)) .via(Flow.fromGraph(g))
.via(jmsProducer(name = streamId + "-sink", settings = toSettings, autoAck = true, log = bridgeLogger)) .via(jmsProducer(name = streamId + "-sink", settings = toSettings, autoAck = true))
} }


bridgeLogger.info(s"Starting bridge stream with config [inbound=${cfg.inbound},trackTransaction=${cfg.trackTransaction}]") bridgeLogger.info(s"Starting bridge stream with config [inbound=${cfg.inbound},trackTransaction=${cfg.trackTransaction}]")
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import com.typesafe.config.Config


import scala.collection.JavaConverters._ import scala.collection.JavaConverters._
import scala.util.{Failure, Success} import scala.util.{Failure, Success}
import scala.concurrent.duration._


private[bridge] object BridgeControllerConfig { private[bridge] object BridgeControllerConfig {


Expand Down Expand Up @@ -101,7 +102,8 @@ class BridgeController(ctrlCfg: BridgeControllerConfig)(implicit system : ActorS
subscriberName = in.subscriberName, subscriberName = in.subscriberName,
header = in.header, header = in.header,
idSvc = Some(ctrlCfg.idSvc), idSvc = Some(ctrlCfg.idSvc),
rawConfig = ctrlCfg.rawConfig rawConfig = ctrlCfg.rawConfig,
sessionRecreateTimeout = in.sessionRecreateTimeout
) )


val streamCfg: StreamControllerConfig = new JmsStreamBuilder(inCfg).streamCfg val streamCfg: StreamControllerConfig = new JmsStreamBuilder(inCfg).streamCfg
Expand Down Expand Up @@ -133,7 +135,8 @@ class BridgeController(ctrlCfg: BridgeControllerConfig)(implicit system : ActorS
trackTransaction = TrackTransaction.FromMessage, trackTransaction = TrackTransaction.FromMessage,
subscriberName = None, subscriberName = None,
header = List.empty, header = List.empty,
rawConfig = ctrlCfg.rawConfig rawConfig = ctrlCfg.rawConfig,
sessionRecreateTimeout = 1.second
) )


val streamCfg: StreamControllerConfig = new JmsStreamBuilder(outCfg).streamCfg val streamCfg: StreamControllerConfig = new JmsStreamBuilder(outCfg).streamCfg
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import blended.streams.processor.HeaderProcessorConfig
import blended.util.config.Implicits._ import blended.util.config.Implicits._
import com.typesafe.config.Config import com.typesafe.config.Config


import scala.concurrent.duration._
import scala.util.Try import scala.util.Try


object InboundConfig { object InboundConfig {
Expand Down Expand Up @@ -39,6 +40,8 @@ object InboundConfig {
HeaderProcessorConfig.create(cfg) HeaderProcessorConfig.create(cfg)
} }


val sessionRecreateTimeout : FiniteDuration = cfg.getDuration("sessionRecreateTimeout", 1.second)

InboundConfig( InboundConfig(
name = name, name = name,
vendor = vendor, vendor = vendor,
Expand All @@ -48,7 +51,8 @@ object InboundConfig {
persistent = persistent, persistent = persistent,
subscriberName = subscriberName, subscriberName = subscriberName,
listener = listener, listener = listener,
header = header header = header,
sessionRecreateTimeout = sessionRecreateTimeout
) )
} }
} }
Expand All @@ -62,5 +66,6 @@ case class InboundConfig (
persistent : JmsDeliveryMode, persistent : JmsDeliveryMode,
subscriberName : Option[String], subscriberName : Option[String],
listener : Int, listener : Int,
header : List[HeaderProcessorConfig] header : List[HeaderProcessorConfig],
sessionRecreateTimeout : FiniteDuration
) )
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ class BridgeSpec extends SimplePojoContainerSpec
trackTransaction = TrackTransaction.Off, trackTransaction = TrackTransaction.Off,
subscriberName = None, subscriberName = None,
header = List.empty, header = List.empty,
rawConfig = ctrlCfg.rawConfig rawConfig = ctrlCfg.rawConfig,
sessionRecreateTimeout = 1.second
) )


private val streamCfg = new JmsStreamBuilder(cfg).streamCfg private val streamCfg = new JmsStreamBuilder(cfg).streamCfg
Expand All @@ -101,6 +102,7 @@ class BridgeSpec extends SimplePojoContainerSpec
} map { FlowEnvelope.apply } } map { FlowEnvelope.apply }


val pSettings : JmsProducerSettings = JmsProducerSettings( val pSettings : JmsProducerSettings = JmsProducerSettings(
log = log,
connectionFactory = external, connectionFactory = external,
jmsDestination = Some(JmsQueue("sampleIn")) jmsDestination = Some(JmsQueue("sampleIn"))
) )
Expand Down Expand Up @@ -148,6 +150,7 @@ class BridgeSpec extends SimplePojoContainerSpec
).get)) ).get))


val pSettings : JmsProducerSettings = JmsProducerSettings( val pSettings : JmsProducerSettings = JmsProducerSettings(
log = log,
connectionFactory = external, connectionFactory = external,
jmsDestination = Some(JmsQueue("SampleHeaderIn")) jmsDestination = Some(JmsQueue("SampleHeaderIn"))
) )
Expand Down
Original file line number Original file line Diff line number Diff line change
@@ -1,16 +1,17 @@
package blended.launcher.jvmrunner package blended.launcher.jvmrunner


import java.io.{ File, FileInputStream } import java.io.{File, FileInputStream}
import java.util.Properties import java.util.Properties


import scala.collection.JavaConverters._ import scala.collection.JavaConverters._
import scala.util.Try import scala.util.Try
import scala.util.control.NonFatal import scala.util.control.NonFatal

import blended.launcher.internal.ARM import blended.launcher.internal.ARM
import blended.updater.config.OverlayConfigCompanion import blended.updater.config.OverlayConfigCompanion
import blended.util.logging.Logger import blended.util.logging.Logger


import scala.annotation.tailrec

object JvmLauncher { object JvmLauncher {


private[this] lazy val log = Logger[JvmLauncher.type] private[this] lazy val log = Logger[JvmLauncher.type]
Expand All @@ -28,7 +29,7 @@ object JvmLauncher {
} }


/** /**
* A small Java wrapper rresponsiblefor controlling the actual Container JVM. * A small Java wrapper responsiblefor controlling the actual Container JVM.
*/ */
class JvmLauncher() { class JvmLauncher() {


Expand Down Expand Up @@ -166,7 +167,7 @@ class JvmLauncher() {
} }
} }


case class Config( case class JvmLauncherConfig(
classpath: Seq[File] = Seq(), classpath: Seq[File] = Seq(),
otherArgs: Seq[String] = Seq(), otherArgs: Seq[String] = Seq(),
action: Option[String] = None, action: Option[String] = None,
Expand All @@ -186,7 +187,9 @@ class JvmLauncher() {
override def toString(): String = prettyPrint override def toString(): String = prettyPrint
} }


def parse(args: Seq[String], initialConfig: Config = Config()): Config = { @tailrec
final def parse(args: Seq[String], initialConfig: JvmLauncherConfig = JvmLauncherConfig()): JvmLauncherConfig = {

args match { args match {
case Seq() => case Seq() =>
initialConfig initialConfig
Expand Down Expand Up @@ -214,13 +217,18 @@ class JvmLauncher() {
} }
} }


def checkConfig(config: Config): Try[Config] = Try { private def checkConfig(config: JvmLauncherConfig): Try[JvmLauncherConfig] = Try {
if (config.action.isEmpty) sys.error("Missing arguments for action: start|stop") if (config.action.isEmpty) {
if (config.classpath.isEmpty) Console.err.println("Warning: No classpath given") sys.error("Missing arguments for action: start|stop")
}

if (config.classpath.isEmpty) {
Console.err.println("Warning: No classpath given")
}
config config
} }


def startJava(classpath: Seq[File], private def startJava(classpath: Seq[File],
jvmOpts: Array[String], jvmOpts: Array[String],
arguments: Array[String], arguments: Array[String],
interactive: Boolean = false, interactive: Boolean = false,
Expand All @@ -231,30 +239,20 @@ class JvmLauncher() {
log.debug("About to run Java process") log.debug("About to run Java process")


// lookup java by JAVA_HOME env variable // lookup java by JAVA_HOME env variable
val javaHome = System.getenv("JAVA_HOME") val java = Option(System.getenv("JAVA_HOME")) match {
val java = case Some(javaHome) => s"$javaHome/bin/java"
if (javaHome != null) s"${ case None => "java"
javaHome }
}/bin/java"
else "java"


log.debug("Using java executable: " + java) log.debug("Using java executable: " + java)


val cpArgs = classpath match { val cpArgs = Option(classpath) match {
case null | Seq() => Array[String]() case None | Some(Seq()) => Array[String]()
case cp => Array("-cp", pathAsArg(classpath)) case Some(cp) => Array("-cp", pathAsArg(classpath))
} }
log.debug("Using classpath args: " + cpArgs.mkString(" ")) log.debug("Using classpath args: " + cpArgs.mkString(" "))


// FIXME: Only pass explicitly configured System properties to the inner JVM val command = Array(java) ++ cpArgs ++ jvmOpts ++ arguments
val propArgs = System.getProperties.asScala.map(p => s"-D${
p._1
}=${
p._2
}").toArray[String]
log.debug("Using property args: " + propArgs.mkString(" "))

val command = Array(java) ++ cpArgs ++ jvmOpts ++ propArgs ++ arguments


val pb = new ProcessBuilder(command: _*) val pb = new ProcessBuilder(command: _*)
log.debug("Run command: " + command.mkString(" ")) log.debug("Run command: " + command.mkString(" "))
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ class RunningProcess(process: Process, errorsIntoOutput: Boolean, interactive: B
* Starts a new thread which copies an InputStream into an Output stream. Does not close the streams. * Starts a new thread which copies an InputStream into an Output stream. Does not close the streams.
*/ */


def asyncCopy(in: InputStream, out: OutputStream, immediately: Boolean = false): Thread = private def asyncCopy(in: InputStream, out: OutputStream, immediately: Boolean = false): Thread =
new Thread("StreamCopyThread") { new Thread("StreamCopyThread") {
setDaemon(true) setDaemon(true)


Expand All @@ -84,7 +84,7 @@ class RunningProcess(process: Process, errorsIntoOutput: Boolean, interactive: B
/** /**
* Copies an InputStream into an OutputStream. Does not close the streams. * Copies an InputStream into an OutputStream. Does not close the streams.
*/ */
def copy(in: InputStream, out: OutputStream, immediately: Boolean = false): Unit = { private def copy(in: InputStream, out: OutputStream, immediately: Boolean = false): Unit = {
if (immediately) { if (immediately) {
while (true) { while (true) {
if (in.available > 0) { if (in.available > 0) {
Expand All @@ -109,4 +109,4 @@ class RunningProcess(process: Process, errorsIntoOutput: Boolean, interactive: B
} }
} }
} }
} }
22 changes: 15 additions & 7 deletions blended.launcher/src/runner/resources/bin/blended.sh
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -44,15 +44,23 @@ if [ "x$BLENDED_STRICT" != "x" ] ; then
LAUNCHER_OPTS="$LAUNCHER_OPTS --strict" LAUNCHER_OPTS="$LAUNCHER_OPTS --strict"
fi fi


LOGBACK_CONFIG_SETTING="-Dlogback.configurationFile=${BLENDED_HOME}/etc/logback.xml"

# Options for the service daemen JVM (outer) with controls the container JVM # Options for the service daemen JVM (outer) with controls the container JVM
JAVA_OPTS="${JAVA_OPTS} -Xmx24m" JAVA_OPTS="${JAVA_OPTS} -Xmx24m"
JAVA_OPTS="${JAVA_OPTS} -Dlogback.configurationFile=${BLENDED_HOME}/etc/logback.xml" JAVA_OPTS="${JAVA_OPTS} ${LOGBACK_CONFIG_SETTING}"
JAVA_OPTS="${JAVA_OPTS} -Dblended.home=${BLENDED_HOME}" JAVA_OPTS="${JAVA_OPTS} -Dblended.home=${BLENDED_HOME}"
JAVA_OPTS="${JAVA_OPTS} -Dsun.net.client.defaultConnectTimeout=500 -Dsun.net.client.defaultReadTimeout=500"


# Options for the container JVM (inner) started/managed by the service daemon JVM # Options for the container JVM (inner) started/managed by the service daemon JVM
# Use prefix "-jvmOpt=" to mark JVM options for the container JVM # Use prefix "-jvmOpt=" to mark each JVM option to be passed to the container JVM
#CONTAINER_JAVA_OPTS="${CONTAINER_JAVA_OPTS} -jvmOpt=-Xmx1024m"
CONTAINER_JAVA_OPTS="${CONTAINER_JAVA_OPTS} -jvmOpt=-Dsun.net.client.defaultConnectTimeout=500"
CONTAINER_JAVA_OPTS="${CONTAINER_JAVA_OPTS} -jvmOpt=-Dsun.net.client.defaultReadTimeout=500"
CONTAINER_JAVA_OPTS="${CONTAINER_JAVA_OPTS} -jvmOpt=${LOGBACK_CONFIG_SETTING}"
CONTAINER_JAVA_OPTS="${CONTAINER_JAVA_OPTS} -jvmOpt=-Dblended.home=${BLENDED_HOME}"

# Enable this when you need to debug SSL issues
CONTAINER_JAVA_OPTS="${CONTAINER_JAVA_OPTS} -jvmOpt=-Djavax.net.debug=ssl"


if [ -n "$DEBUG_PORT" ] ; then if [ -n "$DEBUG_PORT" ] ; then
if [ -n "$DEBUG_WAIT" ] ; then if [ -n "$DEBUG_WAIT" ] ; then
Expand All @@ -79,11 +87,11 @@ OUTER_CP="${BLENDED_HOME}/lib/*"
# semicolon-separated # semicolon-separated
INNER_CP="\ INNER_CP="\
${BLENDED_HOME}/etc;\ ${BLENDED_HOME}/etc;\
${BLENDED_HOME}/lib/blended.launcher-@blended.launcher.version@.jar;\ ${BLENDED_HOME}/lib/blended.launcher_@scala.binary.version@-@blended.launcher.version@.jar;\
${BLENDED_HOME}/lib/config-@typesafe.config.version@.jar;\ ${BLENDED_HOME}/lib/config-@typesafe.config.version@.jar;\
${BLENDED_HOME}/lib/org.osgi.core-@org.osgi.core.version@.jar;\ ${BLENDED_HOME}/lib/org.osgi.core-@org.osgi.core.version@.jar;\
${BLENDED_HOME}/lib/blended.updater.config-@blended.updater.config.version@.jar;\ ${BLENDED_HOME}/lib/blended.updater.config_@scala.binary.version@-@blended.updater.config.version@.jar;\
${BLENDED_HOME}/lib/blended.util.logging-@blended.util.logging.version@.jar;\ ${BLENDED_HOME}/lib/blended.util.logging_@scala.binary.version@-@blended.util.logging.version@.jar;\
${BLENDED_HOME}/lib/de.tototec.cmdoption-@cmdoption.version@.jar;\ ${BLENDED_HOME}/lib/de.tototec.cmdoption-@cmdoption.version@.jar;\
${BLENDED_HOME}/lib/scala-library-@scala.library.version@.jar;\ ${BLENDED_HOME}/lib/scala-library-@scala.library.version@.jar;\
${BLENDED_HOME}/lib/slf4j-api-@slf4j.version@.jar;\ ${BLENDED_HOME}/lib/slf4j-api-@slf4j.version@.jar;\
Expand Down
Loading

0 comments on commit 6fe506f

Please sign in to comment.