Permalink
Browse files

Refactoring ActiveMQ client connection bundle

Adding support to verify connections before they are exposed to OSGi.
  • Loading branch information...
atooni committed Dec 21, 2018
1 parent 23dc4d5 commit ed263a7544d9320e8ea349dd60ddb9e28552b3a4
Showing with 602 additions and 47 deletions.
  1. +15 −0 blended.activemq.client/src/main/scala/blended/activemq/client/ConnectionVerifier.scala
  2. +64 −11 blended.activemq.client/src/main/scala/blended/activemq/client/internal/AmqClientActivator.scala
  3. +10 −0 ...d.activemq.client/src/main/scala/blended/activemq/client/internal/DefaultConnectionVerifier.scala
  4. +17 −0 ...emq.client/src/main/scala/blended/activemq/client/internal/DefaultVerificationFailedHandler.scala
  5. +102 −0 ...activemq.client/src/main/scala/blended/activemq/client/internal/RoundtripConnectionVerifier.scala
  6. +22 −0 blended.activemq.client/src/test/resources/application.conf
  7. +59 −0 blended.activemq.client/src/test/resources/default/etc/application.conf
  8. +59 −0 blended.activemq.client/src/test/resources/failing/etc/application.conf
  9. +36 −0 ....activemq.client/src/test/scala/blended/activemq/client/internal/DefaultClientActivatorSpec.scala
  10. +64 −0 ....activemq.client/src/test/scala/blended/activemq/client/internal/FailingClientActivatorSpec.scala
  11. +80 −0 ...vemq.client/src/test/scala/blended/activemq/client/internal/RoundtripConnectionVerifierSpec.scala
  12. +1 −2 blended.akka.http.jmsqueue/src/test/scala/blended/akka/http/jmsqueue/internal/AmqBrokerSupport.scala
  13. +22 −9 blended.jms.utils/src/main/scala/blended/jms/utils/BlendedSingleConnectionFactory.scala
  14. +1 −1 ...er/src/main/scala/blended/streams/dispatcher/internal/builder/DispatcherDestinationResolver.scala
  15. +9 −9 blended.streams.dispatcher/src/test/resources/container/etc/application.conf
  16. +1 −1 blended.streams/src/main/scala/blended/streams/jms/JmsAckSourceStage.scala
  17. +16 −6 blended.streams/src/main/scala/blended/streams/jms/JmsDestinationResolver.scala
  18. +7 −4 blended.streams/src/main/scala/blended/streams/jms/JmsStreamSupport.scala
  19. +1 −1 blended.streams/src/test/scala/blended/streams/jms/JmsAckSourceSpec.scala
  20. +4 −1 blended.testsupport.pojosr/src/main/scala/blended/testsupport/pojosr/PojoSrTestHelper.scala
  21. +12 −2 project/BlendedActivemqClient.scala
@@ -0,0 +1,15 @@
package blended.activemq.client

import blended.jms.utils.IdAwareConnectionFactory

import scala.concurrent.{ExecutionContext, Future}

trait ConnectionVerifier {
def verifyConnection(cf : IdAwareConnectionFactory)(implicit eCtxt: ExecutionContext) : Future[Boolean]
}

trait VerificationFailedHandler {
def verificationFailed(cf: IdAwareConnectionFactory) : Unit
}


@@ -1,26 +1,79 @@
package blended.activemq.client.internal

import blended.domino.TypesafeConfigWatching
import akka.actor.ActorSystem
import blended.activemq.client.{ConnectionVerifier, VerificationFailedHandler}
import blended.akka.ActorSystemWatching
import blended.jms.utils._
import blended.util.config.Implicits._
import blended.util.logging.Logger
import com.typesafe.config.Config
import domino.DominoActivator
import domino.logging.Logging
import javax.jms.ConnectionFactory
import org.apache.activemq.ActiveMQConnectionFactory

class AmqClientActivator extends DominoActivator with TypesafeConfigWatching with Logging {
import scala.concurrent.ExecutionContext
import scala.util.{Failure, Success}

class AmqClientActivator extends DominoActivator with ActorSystemWatching with Logging {

private[this] val log = Logger[AmqClientActivator]

whenBundleActive {
whenTypesafeConfigAvailable { (cfg, idSvc) =>

if (!cfg.isEmpty) {
val url = cfg.getString("brokerUrl")
log.info(s"Creating connection factory to broker [$url]")
val cf = new ActiveMQConnectionFactory(url)
cf.providesService[ConnectionFactory](Map("provider" -> "activemq"))
} else {
log.info("No ActiveMQ client configuration, no client started")
whenActorSystemAvailable { osgiCfg =>

implicit val eCtxt : ExecutionContext = osgiCfg.system.dispatcher

// First we register a default verifier
new DefaultConnectionVerifier().providesService[ConnectionVerifier]("name" -> "default")
new DefaultVerificationFailedHandler(osgiCfg.bundleContext).providesService[VerificationFailedHandler]("name" -> "default")

val verifierName = osgiCfg.config.getString("verifier")
val failedHandlerName = osgiCfg.config.getString("failedHandler")
log.info(s"ActiveMQ Client connections using verifier [$verifierName]")
log.info(s"Using verification failed handler [$failedHandlerName]")

whenAdvancedServicePresent[ConnectionVerifier](s"(name=$verifierName)") { verifier =>
whenAdvancedServicePresent[VerificationFailedHandler](s"(name=$failedHandlerName)") { failedHandler =>

val cfg: Config = osgiCfg.config
implicit val system: ActorSystem = osgiCfg.system

// TODO: Include connection verifier
val cfgMap: Map[String, Config] = cfg.getConfigMap("connections", Map.empty)
log.info(s"Starting ActiveMQ client connection(s) : [${cfgMap.values.mkString(",")}]")

cfgMap.foreach { case (key, config) =>
val connectionCfg: ConnectionConfig = BlendedJMSConnectionConfig.fromConfig(
stringResolver = osgiCfg.idSvc.resolvePropertyString
)(
vendor = "activemq",
provider = key,
cfg = config
).copy(
cfClassName = Some(classOf[ActiveMQConnectionFactory].getName())
)

val cf: IdAwareConnectionFactory = new BlendedSingleConnectionFactory(
connectionCfg, Some(osgiCfg.bundleContext)
)

verifier.verifyConnection(cf).onComplete {
case Success(b) => if (b) {
log.info(s"Connection [${cf.vendor}:${cf.provider}] verified and ready to use.")
cf.providesService[ConnectionFactory, IdAwareConnectionFactory](
"vendor" -> connectionCfg.vendor,
"provider" -> connectionCfg.provider
)
} else {
log.warn(s"Failed to verify connection [${cf.vendor}:${cf.provider}]...invoking failed handler")
failedHandler.verificationFailed(cf)
}
case Failure(t) =>
s"Unable to verify connection [${cf.vendor}:${cf.provider}]. This connection will not be active"
}
}
}
}
}
}
@@ -0,0 +1,10 @@
package blended.activemq.client.internal

import blended.activemq.client.ConnectionVerifier
import blended.jms.utils.IdAwareConnectionFactory

import scala.concurrent.{ExecutionContext, Future}

class DefaultConnectionVerifier extends ConnectionVerifier {
override def verifyConnection(cf: IdAwareConnectionFactory)(implicit eCtxt : ExecutionContext): Future[Boolean] = Future { true }
}
@@ -0,0 +1,17 @@
package blended.activemq.client.internal

import blended.activemq.client.VerificationFailedHandler
import blended.jms.utils.IdAwareConnectionFactory
import blended.util.logging.Logger
import org.osgi.framework.BundleContext

class DefaultVerificationFailedHandler(bundleContext : BundleContext) extends VerificationFailedHandler {

private val log : Logger = Logger[DefaultVerificationFailedHandler]

override def verificationFailed(cf: IdAwareConnectionFactory): Unit = {

log.error(s"Verification for connection [${cf.vendor}:${cf.provider}] has failed. Shutting down container ...")
bundleContext.getBundle(0).stop()
}
}
@@ -0,0 +1,102 @@
package blended.activemq.client.internal

import java.util.UUID

import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Materializer}
import blended.activemq.client.ConnectionVerifier
import blended.jms.utils.{IdAwareConnectionFactory, JmsDestination}
import blended.streams.jms.{JmsEnvelopeHeader, JmsProducerSettings, JmsStreamSupport, MessageDestinationResolver}
import blended.streams.message.FlowEnvelope
import blended.streams.transaction.FlowHeaderConfig
import blended.util.logging.Logger
import akka.pattern.after
import blended.streams.processor.Collector

import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.{Failure, Success}

class RoundtripConnectionVerifier(
probeMsg : () => FlowEnvelope,
verify : FlowEnvelope => Boolean,
requestDest : JmsDestination,
responseDest : JmsDestination,
headerConfig : FlowHeaderConfig,
retryInterval : FiniteDuration = 1.second,
receiveTimeout : FiniteDuration = 250.millis
)(implicit system : ActorSystem) extends ConnectionVerifier
with JmsStreamSupport
with JmsEnvelopeHeader {

private val log : Logger = Logger[RoundtripConnectionVerifier]
private val verified : Promise[Boolean] = Promise[Boolean]()

override def verifyConnection(cf: IdAwareConnectionFactory)(implicit eCtxt: ExecutionContext): Future[Boolean] = {
probe(cf)
verified.future
}

private def probe(cf: IdAwareConnectionFactory) : Unit = {

implicit val materializer : Materializer = ActorMaterializer()
implicit val eCtxt : ExecutionContext = system.dispatcher

val id : String = UUID.randomUUID().toString()

val probeEnv : FlowEnvelope = probeMsg()
.withHeader(corrIdHeader(headerConfig.prefix), id, true).get
.withHeader(replyToHeader(headerConfig.prefix), responseDest.asString).get

val pSettings : JmsProducerSettings = JmsProducerSettings(
connectionFactory = cf,
jmsDestination = Some(requestDest),
destinationResolver = s => new MessageDestinationResolver(headerConfig, s)
)

sendMessages(pSettings, log, probeEnv) match {
case Success(s) =>
log.info("Request message sent successfully")
s.shutdown()

implicit val to : FiniteDuration = receiveTimeout

val collector : Collector[FlowEnvelope] = receiveMessages(
headerCfg = headerConfig,
cf = cf,
dest = responseDest,
log = log,
listener = 1,
selector = Some(s"JMSCorrelationID='$id'")
)

collector.result.onComplete {
case Success(l) => l match {
case Nil =>
log.debug(s"No response received to verify connection [${cf.vendor}:${cf.provider}]")
scheduleRetry(cf)
case h :: _ =>
log.info(s"Verified client connection [${cf.vendor}:${cf.provider}]")
verified.complete(Success(verify(h)))
}

case Failure(t) =>
log.debug(s"Failed to receive verification response to verify connection [${cf.vendor}:${cf.provider}] : [${t.getMessage()}]")
scheduleRetry(cf)
}

case Failure(t) =>
log.debug(s"Failed to send verification request to verify connection [${cf.vendor}:${cf.provider}] : [${t.getMessage()}]")
scheduleRetry(cf)
}
}

private def scheduleRetry(cf: IdAwareConnectionFactory) : Unit = {

implicit val eCtxt : ExecutionContext = system.dispatcher

after[Unit](1.second, system.scheduler){
Future { probe(cf) }
}
}
}
@@ -0,0 +1,22 @@
akka {
loggers = ["akka.event.slf4j.Slf4jLogger"]
loglevel = info

log-config-on-start = off

actor {
debug {
receive = off
lifecycle = on
}
}
}

FixedPool {
type = Dispatcher
executor = "thread-pool-executor"
thread-pool-executor {
fixed-pool-size = 32
}
throughput = 1
}
@@ -0,0 +1,59 @@
akka {
loggers = ["akka.event.slf4j.Slf4jLogger"]
loglevel = debug

log-config-on-start = off

actor {
debug {
receive = off
lifecycle = on
}
}
}

FixedPool {
type = Dispatcher
executor = "thread-pool-executor"
thread-pool-executor {
fixed-pool-size = 32
}
throughput = 1
}

blended {
activemq {
// Specify the ActiveMq client connections to a remote broker. Each vlient connection will be exposed
// as a service of type IdAwareConnectionFactory and is subject to the normal keep alive operations
// and connection monitoring.
// Before the service is exposed, the connection verifier is invoked and establishes that the client
// is connected to the correct broker. The default verifier always evaluates to true and allows the
// connection.
// To override this behavior, a connection verifier with a different "name"-property can be registered
// in the container. The "verifier" setting below indicates, which verifier shall be used.
// Likewise, a VerificationFailedHandler can be registered to determine the action that has to be taken
// upon a failed verification. The DefaultVerificationFailedHandler is registered with the name "default"
// and shuts down the container.
// The Connection Factory will only be exposed to OSGi if the verification was successful. Verification
// only happens once when the activator of the AMQ Client bundle is started.
client {
verifier : "default"
failedHandler : "default"

connections {
conn1 {
properties {
brokerURL = "vm://conn1?create=false"
clientId = "foo"
}
}
conn2 {
properties {
brokerURL = "vm://conn2?create=false"
clientId = "bar"
}
}
}
}
}
}
@@ -0,0 +1,59 @@
akka {
loggers = ["akka.event.slf4j.Slf4jLogger"]
loglevel = debug

log-config-on-start = off

actor {
debug {
receive = off
lifecycle = on
}
}
}

FixedPool {
type = Dispatcher
executor = "thread-pool-executor"
thread-pool-executor {
fixed-pool-size = 32
}
throughput = 1
}

blended {
activemq {
// Specify the ActiveMq client connections to a remote broker. Each vlient connection will be exposed
// as a service of type IdAwareConnectionFactory and is subject to the normal keep alive operations
// and connection monitoring.
// Before the service is exposed, the connection verifier is invoked and establishes that the client
// is connected to the correct broker. The default verifier always evaluates to true and allows the
// connection.
// To override this behavior, a connection verifier with a different "name"-property can be registered
// in the container. The "verifier" setting below indicates, which verifier shall be used.
// Likewise, a VerificationFailedHandler can be registered to determine the action that has to be taken
// upon a failed verification. The DefaultVerificationFailedHandler is registered with the name "default"
// and shuts down the container.
// The Connection Factory will only be exposed to OSGi if the verification was successful. Verification
// only happens once when the activator of the AMQ Client bundle is started.
client {
verifier : "failing"
failedHandler : "failing"

connections {
conn1 {
properties {
brokerURL = "vm://conn1?create=false"
clientId = "foo"
}
}
conn2 {
properties {
brokerURL = "vm://conn2?create=false"
clientId = "bar"
}
}
}
}
}
}
Oops, something went wrong.

0 comments on commit ed263a7

Please sign in to comment.