Permalink
Browse files

Refactoring Specs

  • Loading branch information...
atooni committed Nov 8, 2018
1 parent 27dfb7c commit 999c6fff736161de331ed9a12bb79747e1521ccd
Showing with 433 additions and 212 deletions.
  1. +79 −81 ...emq.brokerstarter/src/main/scala/blended/activemq/brokerstarter/internal/BrokerControlActor.scala
  2. +1 −0 blended.akka/src/main/scala/blended/akka/internal/BlendedAkkaActivator.scala
  3. +3 −0 blended.jms.bridge/src/main/scala/blended/jms/bridge/BridgeProviderRegistry.scala
  4. +0 −44 blended.jms.bridge/src/test/scala/blended/jms/bridge/internal/AbstractStreamRunner.scala
  5. +16 −3 blended.jms.utils/src/main/scala/blended/jms/utils/BlendedSingleConnectionFactory.scala
  6. +1 −2 blended.jms.utils/src/main/scala/blended/jms/utils/internal/JmsConnectionController.scala
  7. +49 −2 ...d.streams.dispatcher/src/main/scala/blended/streams/dispatcher/internal/DispatcherActivator.scala
  8. +2 −7 ...eams.dispatcher/src/main/scala/blended/streams/dispatcher/internal/ResourceTypeRouterConfig.scala
  9. +2 −1 ...patcher/src/main/scala/blended/streams/dispatcher/internal/builder/DispatcherBuilderSupport.scala
  10. +6 −1 ...eams.dispatcher/src/main/scala/blended/streams/dispatcher/internal/builder/DispatcherFanout.scala
  11. +163 −0 ...ms.dispatcher/src/main/scala/blended/streams/dispatcher/internal/builder/RunnableDispatcher.scala
  12. +15 −15 blended.streams.dispatcher/src/test/resources/container/etc/application.conf
  13. +6 −1 ...reams.dispatcher/src/test/scala/blended/streams/dispatcher/internal/DispatcherActivatorSpec.scala
  14. +1 −4 ...dispatcher/src/test/scala/blended/streams/dispatcher/internal/builder/DispatcherSpecSupport.scala
  15. +2 −0 ...spatcher/src/test/scala/blended/streams/dispatcher/internal/builder/TransactionOutboundSpec.scala
  16. +60 −44 blended.streams/src/main/scala/blended/streams/jms/JmsDestinationResolver.scala
  17. +2 −3 blended.streams/src/main/scala/blended/streams/jms/JmsFlowSupport.scala
  18. +0 −1 blended.streams/src/main/scala/blended/streams/jms/JmsStreamSupport.scala
  19. +2 −0 blended.streams/src/main/scala/blended/streams/message/FlowEnvelope.scala
  20. +8 −3 blended.testsupport.pojosr/src/main/scala/blended/testsupport/pojosr/PojoSrTestHelper.scala
  21. +11 −0 blended.testsupport/src/main/scala/blended/testsupport/RequiresForkedJVM.scala
  22. +2 −0 project/BlendedStreamsDispatcher.scala
  23. +1 −0 project/BlendedTestsupportPojosr.scala
  24. +1 −0 project/ProjectSettings.scala
@@ -1,9 +1,11 @@
package blended.activemq.brokerstarter.internal
import java.lang.management.ManagementFactory
import java.net.URI
import java.util.UUID
import java.util.concurrent.atomic.AtomicLong
import akka.actor.{Actor, OneForOneStrategy, Props, SupervisorStrategy}
import akka.pattern.{Backoff, BackoffSupervisor}
import akka.actor.{Actor, PoisonPill, Props}
import blended.akka.OSGIActorConfig
import blended.jms.utils.{BlendedJMSConnectionConfig, BlendedSingleConnectionFactory, IdAwareConnectionFactory}
import blended.util.logging.Logger
@@ -16,7 +18,7 @@ import org.apache.activemq.broker.{BrokerFactory, BrokerService, DefaultBrokerFa
import org.apache.activemq.xbean.XBeanBrokerFactory
import org.osgi.framework.{BundleContext, ServiceRegistration}
import scala.concurrent.duration._
import scala.concurrent.Future
import scala.language.reflectiveCalls
import scala.util.control.NonFatal
@@ -54,34 +56,25 @@ class BrokerControlSupervisor(
log.debug(s"Configuring Broker controller for [$brokerCfg]")
val controlProps = BrokerControlActor.props(brokerCfg, cfg, sslContext)
val restartProps = BackoffSupervisor.props(
Backoff.onStop(
childProps = controlProps,
childName = brokerCfg.brokerName,
minBackoff = 10.seconds,
maxBackoff = 2.minutes,
randomFactor = 0.2,
maxNrOfRetries = -1
).withAutoReset(10.seconds)
.withSupervisorStrategy(
OneForOneStrategy() {
case _ : Throwable => SupervisorStrategy.Restart
}
)
)
context.system.actorOf(restartProps, brokerCfg.brokerName)
val actor = context.system.actorOf(controlProps, brokerCfg.brokerName)
actor ! BrokerControlActor.StartBroker
}
case Stop =>
context.children.foreach(a => context.stop(a))
context.children.foreach { a =>
a ! BrokerControlActor.StopBroker
a ! PoisonPill
}
}
}
object BrokerControlActor {
case object StartBroker
case object StopBroker
case class BrokerStarted(uuid : String)
val debugCnt : AtomicLong = new AtomicLong(0L)
def props(
brokerCfg : BrokerConfig,
@@ -94,47 +87,59 @@ class BrokerControlActor(brokerCfg: BrokerConfig, cfg: OSGIActorConfig, sslCtxt:
extends Actor {
private[this] val log = Logger[BrokerControlActor]
private[this] var broker : Option[BrokerService] = None
private[this] var svcReg : Option[ServiceRegistration[_]] = None
private[this] val uuid = UUID.randomUUID().toString()
override def toString: String = s"BrokerControlActor(${brokerCfg})"
override def preStart(): Unit = self ! BrokerControlActor.StartBroker
// Memorize pending cleanup tasks
private[this] var cleanUp : List[() => Unit] = List.empty
private[this] def startBroker() :
(BrokerService, ServiceRegistration[BlendedSingleConnectionFactory]) = {
private[this] def startBroker() : Unit = {
val oldLoader = Thread.currentThread().getContextClassLoader()
val cfgDir = cfg.idSvc.containerContext.getProfileConfigDirectory()
val uri = s"file://$cfgDir/${brokerCfg.file}"
try {
log.info(s"Starting ActiveMQ broker [${brokerCfg.brokerName}] with config file [$uri] ")
BrokerFactory.setStartDefault(false)
Thread.currentThread().setContextClassLoader(classOf[DefaultBrokerFactory].getClassLoader())
BrokerFactory.setStartDefault(false)
val brokerFactory = new XBeanBrokerFactory()
brokerFactory.setValidate(false)
val broker = brokerFactory.createBroker(new URI(uri))
val b = brokerFactory.createBroker(new URI(uri))
broker = Some(b)
sslCtxt.foreach{ ctxt =>
val amqSslContext = new org.apache.activemq.broker.SslContext()
amqSslContext.setSSLContext(ctxt)
broker.setSslContext(amqSslContext)
b.setSslContext(amqSslContext)
}
// TODO: set Datadirectories from Code ?
broker.setBrokerName(brokerCfg.brokerName)
broker.start()
broker.waitUntilStarted()
log.info(s"ActiveMQ broker [${brokerCfg.brokerName}] started successfully.")
val actor = context.self
// TODO: set Datadirectories from Code ??
val f = Future {
b.setBrokerName(brokerCfg.brokerName)
b.setStartAsync(false)
b.start()
b.waitUntilStarted()
log.info(s"ActiveMQ broker [${brokerCfg.brokerName}] started successfully.")
actor ! BrokerControlActor.BrokerStarted(uuid)
}(context.system.dispatcher)
} catch {
case NonFatal(t) =>
log.warn(t)(s"Error starting ActiveMQ broker [${brokerCfg.brokerName}]")
throw t
} finally {
Thread.currentThread().setContextClassLoader(oldLoader)
}
}
val registrar = new Object with ServiceProviding {
private[this] def registerService(): Unit = {
if (svcReg.isEmpty) {
new Object with ServiceProviding {
override protected def capsuleContext: CapsuleContext = new SimpleDynamicCapsuleContext()
@@ -160,69 +165,62 @@ class BrokerControlActor(brokerCfg: BrokerConfig, cfg: OSGIActorConfig, sslCtxt:
Some(cfg.bundleContext)
)
val svcReg : ServiceRegistration[BlendedSingleConnectionFactory] = cf.providesService[ConnectionFactory, IdAwareConnectionFactory](Map(
svcReg = Some(cf.providesService[ConnectionFactory, IdAwareConnectionFactory](Map(
"vendor" -> brokerCfg.vendor,
"provider" -> brokerCfg.provider,
"brokerName" -> brokerCfg.brokerName
))
)))
}
cleanUp = List(() => stopBroker(broker, registrar.svcReg))
(broker, registrar.svcReg)
} catch {
case NonFatal(t) =>
log.warn(t)(s"Error starting ActiveMQ broker [${brokerCfg.brokerName}]")
throw t
} finally {
Thread.currentThread().setContextClassLoader(oldLoader)
}
}
private[this] def stopBroker(broker: BrokerService, svcReg: ServiceRegistration[BlendedSingleConnectionFactory]) : Unit = {
log.info(s"Stopping ActiveMQ Broker [${brokerCfg.brokerName}]")
try {
broker.stop()
broker.waitUntilStopped()
} catch {
case t : Throwable =>
log.error(t)(s"Error stopping ActiveMQ broker [${brokerCfg.brokerName}]")
} finally {
private[this] def stopBroker() : Unit = {
broker.foreach { b =>
log.info(s"Stopping ActiveMQ Broker [${brokerCfg.brokerName}]")
try {
b.stop()
b.waitUntilStopped()
} catch {
case t : Throwable =>
log.error(t)(s"Error stopping ActiveMQ broker [${brokerCfg.brokerName}]")
} finally {
try {
log.info(s"Removing OSGi service for Activemq Broker [${brokerCfg.brokerName}]")
svcReg.unregister()
svcReg.foreach(_.unregister())
} catch {
case _ : IllegalStateException => // was already unregistered
}
cleanUp = List.empty
}
}
broker = None
svcReg = None
}
override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
log.error(reason)(s"Error starting Active MQ broker [${brokerCfg.brokerName}]")
super.preRestart(reason, message)
}
override def postStop(): Unit = cleanUp.foreach(t => t())
override def receive : Receive = withoutBroker
def withoutBroker : Receive = {
case BrokerControlActor.StartBroker =>
val (broker, reg) = startBroker()
context.become(withBroker(broker, reg))
case BrokerControlActor.StopBroker =>
log.debug("Ignoring stop command for ActiveMQ as Broker is already stopped")
override def postStop(): Unit = broker.foreach{ b =>
b.stop()
b.waitUntilStopped()
}
def withBroker(broker: BrokerService, reg: ServiceRegistration[BlendedSingleConnectionFactory]) : Receive = {
private val jvmId = ManagementFactory.getRuntimeMXBean().getName()
override def receive : Receive = {
case BrokerControlActor.StartBroker =>
log.debug("Ignoring start command for ActiveMQ as Broker is already started")
log.debug(s"Received StartBroker Command for [$brokerCfg] [$jvmId][$uuid-${BrokerControlActor.debugCnt.incrementAndGet()}]")
if (broker.isEmpty) { startBroker() }
case started : BrokerControlActor.BrokerStarted =>
log.debug(s"Received BrokerStarted Event for [$brokerCfg] [$jvmId][$uuid-${BrokerControlActor.debugCnt.incrementAndGet()}]")
if (started.uuid == uuid) {
broker.foreach{ b => registerService() }
}
case BrokerControlActor.StopBroker =>
stopBroker(broker, reg)
context.become(withoutBroker)
log.debug(s"Received StopBroker Command for [$brokerCfg] [$jvmId][$uuid-${BrokerControlActor.debugCnt.incrementAndGet()}]")
stopBroker()
}
}
@@ -25,6 +25,7 @@ class BlendedAkkaActivator extends DominoActivator {
onStop {
// TODO: Should we really wait here ?
log.info("Stopping ActorSystem")
Await.result(system.terminate(), 10.seconds)
}
} catch {
@@ -10,6 +10,8 @@ class BridgeProviderRegistry(
provider : List[BridgeProviderConfig]
) {
val allProvider : List[BridgeProviderConfig] = provider
def internalProvider : Try[BridgeProviderConfig] = Try {
provider.find(_.internal) match {
case None => throw new NoInternalBridgeProviderException
@@ -26,4 +28,5 @@ class BridgeProviderRegistry(
def jmsProvider(v: String, p: String): Option[BridgeProviderConfig] =
provider.find(bp => bp.vendor == v && bp.provider == p)
}

This file was deleted.

Oops, something went wrong.
@@ -1,9 +1,9 @@
package blended.jms.utils
import java.lang.management.ManagementFactory
import javax.jms.{Connection, ConnectionFactory, JMSException}
import javax.management.ObjectName
import javax.jms.{Connection, ConnectionFactory, JMSException}
import javax.management.{MBeanInfo, ObjectName}
import akka.actor.ActorSystem
import akka.util.Timeout
import blended.jms.utils.internal._
@@ -52,7 +52,20 @@ class BlendedSingleConnectionFactory(
val jmxBean = new ConnectionMonitor(vendor, provider, clientId)
val objName = new ObjectName(s"blended:type=ConnectionMonitor,vendor=$vendor,provider=$provider")
jmxServer.registerMBean(jmxBean, objName)
if (jmxServer.isRegistered(objName)) {
try {
jmxServer.unregisterMBean(objName)
} catch {
case t : Throwable =>
}
}
try {
jmxServer.registerMBean(jmxBean, objName)
} catch {
case t : Throwable => log.warn(s"Could not register MBean [${objName.toString}]")
}
Some(jmxBean)
} else {
@@ -12,7 +12,6 @@ import scala.util.control.NonFatal
import scala.util.{Failure, Success}
object JmsConnectionController {
def props(holder: ConnectionHolder) = Props(new JmsConnectionController(holder))
}
@@ -55,4 +54,4 @@ class JmsConnectionController(holder: ConnectionHolder) extends Actor with Actor
}
override def toString: String = "JMSConnectionController(" + holder.toString() + ")"
}
}
Oops, something went wrong.

0 comments on commit 999c6ff

Please sign in to comment.