Permalink
Browse files

Refactoring ConnectionHolder to faciltate easier test set up

  • Loading branch information...
atooni committed Dec 3, 2018
1 parent 5b2fbad commit b36e05d3a32dbe0bebac79798b3dbb00ba215866
Showing with 173 additions and 134 deletions.
  1. +3 −4 ...emq.brokerstarter/src/main/scala/blended/activemq/brokerstarter/internal/BrokerControlActor.scala
  2. +24 −19 blended.jms.utils/src/main/scala/blended/jms/utils/BlendedSingleConnectionFactory.scala
  3. +1 −2 blended.jms.utils/src/main/scala/blended/jms/utils/ConnectionFactoryActivator.scala
  4. +3 −5 blended.jms.utils/src/main/scala/blended/jms/utils/JmsSession.scala
  5. +95 −81 blended.jms.utils/src/main/scala/blended/jms/utils/internal/ConnectionHolder.scala
  6. +1 −1 blended.jms.utils/src/test/scala/blended/jms/utils/internal/CamelShutdownSpec.scala
  7. +8 −3 blended.jms.utils/src/test/scala/blended/jms/utils/internal/DummyConnection.scala
  8. +2 −0 blended.mgmt.ws/src/test/scala/blended/mgmt/ws/internal/MgmtWebSocketSpec.scala
  9. +2 −0 blended.security.login.impl/src/test/scala/blended/security/login/internal/TokenStoreSpec.scala
  10. +10 −7 blended.streams.testsupport/src/main/scala/blended/streams/testsupport/FlowMessageAssertion.scala
  11. +1 −1 ...ded.streams.testsupport/src/test/scala/blended/streams/testsupport/FlowMessageAssertionSpec.scala
  12. +3 −1 blended.streams/src/main/scala/blended/streams/jms/JMSConnector.scala
  13. +1 −1 blended.streams/src/main/scala/blended/streams/jms/JmsFlowSupport.scala
  14. +14 −6 blended.streams/src/main/scala/blended/streams/jms/JmsStreamSupport.scala
  15. +2 −2 blended.streams/src/test/scala/blended/streams/jms/JmsAckSourceSpec.scala
  16. +3 −1 blended.streams/src/test/scala/blended/streams/transaction/FlowTransactionStreamSpec.scala
@@ -148,10 +148,9 @@ class BrokerControlActor(brokerCfg: BrokerConfig, cfg: OSGIActorConfig, sslCtxt:
val jmsCfg = brokerCfg.copy(properties = brokerCfg.properties + ("brokerURL" -> url))
val cf = new BlendedSingleConnectionFactory(
jmsCfg,
cfg.system,
Some(cfg.bundleContext)
)
config = jmsCfg,
bundleContext = Some(cfg.bundleContext)
)(system = cfg.system)
svcReg = Some(cf.providesService[ConnectionFactory, IdAwareConnectionFactory](Map(
"vendor" -> brokerCfg.vendor,
@@ -18,24 +18,10 @@ trait IdAwareConnectionFactory extends ConnectionFactory with ProviderAware {
override def id : String = super.id + s"($clientId)"
}
class SimpleIdAwareConnectionFactory(
override val vendor : String,
override val provider : String,
override val clientId : String,
cf : ConnectionFactory,
) extends IdAwareConnectionFactory {
override def createConnection(): Connection =
cf.createConnection()
override def createConnection(userName: String, password: String): Connection =
cf.createConnection(userName, password)
}
class BlendedSingleConnectionFactory(
config : ConnectionConfig,
system: ActorSystem,
bundleContext : Option[BundleContext]
) extends IdAwareConnectionFactory {
)(implicit system: ActorSystem) extends IdAwareConnectionFactory {
override val vendor : String = config.vendor
override val provider : String = config.provider
@@ -49,10 +35,13 @@ class BlendedSingleConnectionFactory(
override val clientId : String = config.clientId
val holder = BlendedConnectionHolder(
config = config,
system = system
)
protected def createHolder(cfg : ConnectionConfig) : ConnectionHolder = if (config.useJndi) {
new JndiConnectionHolder(cfg)(system)
} else {
new ReflectionConfigHolder(cfg)(system)
}
private val holder = createHolder(config)
private[this] lazy val cfEnabled : Boolean = config.enabled && config.cfEnabled.forall(f => f(config))
@@ -121,3 +110,19 @@ class BlendedSingleConnectionFactory(
createConnection()
}
}
class SimpleIdAwareConnectionFactory(
override val vendor : String,
override val provider : String,
override val clientId : String,
cf : ConnectionFactory,
)(implicit system: ActorSystem) extends BlendedSingleConnectionFactory(
BlendedJMSConnectionConfig.defaultConfig.copy(
vendor = vendor,
provider = provider,
clientId = clientId
), None
) {
override protected def createHolder(cfg: ConnectionConfig) : ConnectionHolder = new FactoryConfigHolder(cfg, cf)
}
@@ -69,9 +69,8 @@ abstract class ConnectionFactoryActivator extends DominoActivator with ActorSyst
ctxtClassName = ctxtClass,
jmsClassloader = factoryClassLoader
),
system = osgiCfg.system,
bundleContext = Some(osgiCfg.bundleContext)
)
)(system = osgiCfg.system)
singleCf.providesService[ConnectionFactory, IdAwareConnectionFactory](
"vendor" -> cfVendor,
@@ -1,9 +1,7 @@
package blended.jms.utils
import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.atomic.AtomicBoolean
import blended.util.logging.Logger
import javax.jms._
import scala.concurrent.duration._
@@ -27,10 +25,10 @@ abstract class JmsSession {
}
case class JmsProducerSession(
val connection: Connection,
val session: Session,
connection: Connection,
session: Session,
override val sessionId : String,
val jmsDestination: Option[JmsDestination]
jmsDestination: Option[JmsDestination]
) extends JmsSession
class JmsConsumerSession(
@@ -5,7 +5,7 @@ import java.util
import java.util.concurrent.atomic.AtomicBoolean
import akka.actor.ActorSystem
import blended.jms.utils.{BlendedJMSConnection, ConnectionConfig, ConnectionException}
import blended.jms.utils.{BlendedJMSConnection, BlendedJMSConnectionConfig, ConnectionConfig, ConnectionException}
import blended.util.ReflectionHelper
import blended.util.logging.Logger
import javax.jms.{Connection, ConnectionFactory, ExceptionListener, JMSException}
@@ -14,28 +14,90 @@ import javax.naming.{Context, InitialContext}
import scala.util.Try
import scala.util.control.NonFatal
trait ConnectionHolder {
val vendor : String
val provider : String
def getConnection() : Option[BlendedJMSConnection]
abstract class ConnectionHolder(config : ConnectionConfig)(implicit system: ActorSystem) {
def connect() : Connection
val vendor : String = config.vendor
val provider : String = config.provider
def close() : Try[Unit]
}
private[this] val log = Logger[ConnectionHolder]
private[this] var conn : Option[BlendedJMSConnection] = None
private[this] var connecting : AtomicBoolean = new AtomicBoolean(false)
case class BlendedConnectionHolder(
config : ConnectionConfig,
system : ActorSystem
) extends ConnectionHolder {
def getConnectionFactory() : ConnectionFactory
override val vendor : String = config.vendor
override val provider : String = config.provider
def getConnection() : Option[BlendedJMSConnection] = {
log.trace(s"Underlying connection is established : [${conn.isDefined}]")
conn
}
private[this] val log = Logger[ConnectionHolder]
private[this] var conn : Option[BlendedJMSConnection] = None
@throws[JMSException]
def connect() : Connection = {
conn match {
case Some(c) => c
case None =>
private[this] var connecting : AtomicBoolean = new AtomicBoolean(false)
if (!connecting.getAndSet(true)) {
try {
log.info(s"Creating underlying connection for provider [$vendor:$provider] with client id [${config.clientId}]")
val cf : ConnectionFactory = getConnectionFactory()
val c = config.defaultUser match {
case None => cf.createConnection()
case Some(user) => cf.createConnection(user, config.defaultPassword.getOrElse(null))
}
try {
c.setClientID(config.clientId)
c.setExceptionListener(new ExceptionListener {
override def onException(e: JMSException): Unit = {
log.warn(s"Exception encountered in connection for provider [$vendor:$provider] : ${e.getMessage()}")
system.eventStream.publish(ConnectionException(vendor, provider, e))
}
})
} catch {
case NonFatal(e) =>
log.error(s"Error setting client Id [${config.clientId}]...Closing Connection...")
c.close()
throw e
}
c.start()
log.info(s"Successfully connected to [$vendor:$provider] with clientId [${config.clientId}]")
val wrappedConnection = new BlendedJMSConnection(c)
conn = Some(wrappedConnection)
wrappedConnection
} catch {
case e : JMSException =>
log.warn(s"Error creating connection [$vendor:$provider] : [${e.getMessage()}] ")
throw e
} finally {
connecting.set(false)
}
} else {
throw new JMSException(s"Connection Factory for provider [$provider] is still connecting.")
}
}
}
def close() : Try[Unit] = Try {
log.info(s"Closing underlying connection for provider [$provider]")
conn.foreach { c =>
c.connection.close()
}
conn = None
}
}
class JndiConnectionHolder(
config : ConnectionConfig
)(implicit system: ActorSystem) extends ConnectionHolder(config) {
private[this] val log : Logger = Logger[JndiConnectionHolder]
private[this] val initialContextEnv : util.Hashtable[String, Object] = {
val envMap = new util.Hashtable[String, Object]()
@@ -52,7 +114,7 @@ case class BlendedConnectionHolder(
envMap
}
private[this] def lookupConnectionFactory() : ConnectionFactory = {
override def getConnectionFactory(): ConnectionFactory = {
val oldLoader = Thread.currentThread().getContextClassLoader()
@@ -64,7 +126,7 @@ case class BlendedConnectionHolder(
case (Some(n), Some(c)) => (n,c)
case (_, _) =>
throw new JMSException(s"Context Factory class and JNDI name have to be defined for JNDI lookup [$vendor:$provider].")
}
}
config.jmsClassloader.foreach(Thread.currentThread().setContextClassLoader)
@@ -87,8 +149,15 @@ case class BlendedConnectionHolder(
Thread.currentThread().setContextClassLoader(oldLoader)
}
}
}
private[this] def createConnectionFactory() : ConnectionFactory = {
class ReflectionConfigHolder(
config: ConnectionConfig
)(implicit system: ActorSystem) extends ConnectionHolder(config) {
private[this] val log : Logger = Logger[ReflectionConfigHolder]
override def getConnectionFactory(): ConnectionFactory = {
val oldLoader = Thread.currentThread().getContextClassLoader()
@@ -118,66 +187,11 @@ case class BlendedConnectionHolder(
Thread.currentThread().setContextClassLoader(oldLoader)
}
}
}
def getConnection() : Option[BlendedJMSConnection] = conn
@throws[JMSException]
def connect() : Connection = conn match {
case Some(c) => c
case None =>
if (!connecting.getAndSet(true)) {
try {
log.info(s"Creating underlying connection for provider [$vendor:$provider] with client id [${config.clientId}]")
val cf : ConnectionFactory = if (config.useJndi) lookupConnectionFactory() else createConnectionFactory()
val c = config.defaultUser match {
case None => cf.createConnection()
case Some(user) => cf.createConnection(user, config.defaultPassword.getOrElse(null))
}
try {
c.setClientID(config.clientId)
c.setExceptionListener(new ExceptionListener {
override def onException(e: JMSException): Unit = {
log.warn(s"Exception encountered in connection for provider [$vendor:$provider] : ${e.getMessage()}")
system.eventStream.publish(ConnectionException(vendor, provider, e))
}
})
} catch {
case NonFatal(e) =>
log.error(s"Error setting client Id [${config.clientId}]...Closing Connection...")
c.close()
throw e
}
c.start()
log.info(s"Successfully connected to [$vendor:$provider] with clientId [${config.clientId}]")
val wrappedConnection = new BlendedJMSConnection(c)
conn = Some(wrappedConnection)
wrappedConnection
} catch {
case e : JMSException =>
log.warn(s"Error creating connection [$vendor:$provider] : [${e.getMessage()}] ")
throw e
} finally {
connecting.set(false)
}
} else {
throw new JMSException(s"Connection Factory for provider [$provider] is still connecting.")
}
}
def close() : Try[Unit] = Try {
log.info(s"Closing underlying connection for provider [$provider]")
conn.foreach { c =>
c.connection.close()
}
conn = None
}
class FactoryConfigHolder(
config: ConnectionConfig,
cf : ConnectionFactory
)(implicit system: ActorSystem) extends ConnectionHolder(config) {
override def getConnectionFactory(): ConnectionFactory = cf
}
@@ -40,7 +40,7 @@ class CamelShutdownSpec extends TestKit(ActorSystem("CamelShutdown"))
properties = Map( "brokerURL" -> amqCf().getBrokerURL())
)
val cf = new BlendedSingleConnectionFactory(cfg, system, None)
val cf = new BlendedSingleConnectionFactory(cfg, None)
val exceptionHandler : Exception => Unit = { e =>
@@ -1,6 +1,7 @@
package blended.jms.utils.internal
import blended.jms.utils.BlendedJMSConnection
import akka.actor.ActorSystem
import blended.jms.utils.{BlendedJMSConnection, BlendedJMSConnectionConfig}
import javax.jms._
import scala.util.Try
@@ -33,10 +34,14 @@ class DummyConnection extends Connection {
override def createDurableConnectionConsumer(topic: Topic, s: String, s1: String, serverSessionPool: ServerSessionPool, i: Int): ConnectionConsumer = ???
}
class DummyHolder(f : () => Connection) extends ConnectionHolder {
class DummyHolder(f : () => Connection)(implicit system: ActorSystem)
extends ConnectionHolder(BlendedJMSConnectionConfig.defaultConfig) {
override val vendor: String = "dummy"
override val provider: String = "dummy"
override def getConnectionFactory(): ConnectionFactory = ???
private[this] var conn : Option[BlendedJMSConnection] = None
override def getConnection(): Option[BlendedJMSConnection] = conn
@@ -53,4 +58,4 @@ class DummyHolder(f : () => Connection) extends ConnectionHolder {
conn.foreach{ c => c.connection.close() }
conn = None
}
}
}
@@ -22,6 +22,7 @@ import blended.security.login.impl.LoginActivator
import blended.security.login.rest.internal.RestLoginActivator
import blended.testsupport.BlendedTestSupport
import blended.testsupport.pojosr.{BlendedPojoRegistry, PojoSrTestHelper, SimplePojoContainerSpec}
import blended.testsupport.scalatest.LoggingFreeSpecLike
import blended.updater.config.ContainerInfo
import blended.updater.config.json.PrickleProtocol._
import blended.updater.remote.internal.RemoteUpdaterActivator
@@ -39,6 +40,7 @@ import scala.reflect.ClassTag
import scala.util.Try
class MgmtWebSocketSpec extends SimplePojoContainerSpec
with LoggingFreeSpecLike
with Matchers
with PojoSrTestHelper {
Oops, something went wrong.

0 comments on commit b36e05d

Please sign in to comment.