Permalink
Browse files

Refactoring config to use timeunits rather than numbers

  • Loading branch information...
atooni committed Nov 20, 2018
1 parent 01b3d55 commit ab7ff92abd79d1a5802198f2298c45a1a768a245
@@ -6,6 +6,7 @@ import blended.util.config.Implicits._
import com.typesafe.config.Config
import org.apache.activemq.ActiveMQConnectionFactory
import scala.concurrent.duration.FiniteDuration
import scala.util.Try
case class BrokerConfig (
@@ -16,12 +17,12 @@ case class BrokerConfig (
override val pingEnabled : Boolean,
override val pingTolerance : Int,
override val pingDestination : String,
override val pingInterval : Int,
override val pingTimeout : Int,
override val minReconnect : Int,
override val maxReconnectTimeout : Int,
override val pingInterval : FiniteDuration,
override val pingTimeout : FiniteDuration,
override val minReconnect : FiniteDuration,
override val maxReconnectTimeout : Option[FiniteDuration],
override val properties : Map[String, String],
override val retryInterval : Int,
override val retryInterval : FiniteDuration,
brokerName : String,
file : String,
withSsl : Boolean,
@@ -6,6 +6,7 @@ import blended.updater.config.util.ConfigPropertyMapConverter
import blended.util.config.Implicits._
import com.typesafe.config.Config
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}
object BlendedJMSConnectionConfig {
@@ -17,11 +18,11 @@ object BlendedJMSConnectionConfig {
jmxEnabled = true,
pingEnabled = true,
pingTolerance = 5,
pingInterval = 30,
pingTimeout = 3000,
retryInterval = 5,
minReconnect = 300,
maxReconnectTimeout = -1,
pingInterval = 30.seconds,
pingTimeout = 3.seconds,
retryInterval = 5.seconds,
minReconnect = 5.minutes,
maxReconnectTimeout = None,
clientId = "$[[" + ContainerIdentifierService.containerId + "]]",
defaultUser = None,
defaultPassword = None,
@@ -39,11 +40,11 @@ object BlendedJMSConnectionConfig {
val jmxEnabled : Config => Boolean = cfg => cfg.getBoolean("jmxEnabled", defaultConfig.jmxEnabled)
val pingEnabled : Config => Boolean = cfg => cfg.getBoolean("pingEnabled", defaultConfig.pingEnabled)
val pingTolerance : Config => Int = cfg => cfg.getInt("pingTolerance", defaultConfig.pingTolerance)
val pingInterval : Config => Int = cfg => cfg.getInt("pingInterval", defaultConfig.pingInterval)
val pingTimeout : Config => Int = cfg => cfg.getInt("pingTimeout", defaultConfig.pingTimeout)
val retryInterval : Config => Int = cfg => cfg.getInt("retryInterval", defaultConfig.retryInterval)
val minReconnect : Config => Int = cfg => cfg.getInt("minReconnect", defaultConfig.minReconnect)
val maxReconnectTimeout : Config => Int = cfg => cfg.getInt("maxReconnectTimeout", defaultConfig.maxReconnectTimeout)
val pingInterval : Config => FiniteDuration = cfg => cfg.getDuration("pingInterval", defaultConfig.pingInterval)
val pingTimeout : Config => FiniteDuration = cfg => cfg.getDuration("pingTimeout", defaultConfig.pingTimeout)
val retryInterval : Config => FiniteDuration = cfg => cfg.getDuration("retryInterval", defaultConfig.retryInterval)
val minReconnect : Config => FiniteDuration = cfg => cfg.getDuration("minReconnect", defaultConfig.minReconnect)
val maxReconnectTimeout : Config => Option[FiniteDuration] = cfg => cfg.getDurationOption("maxReconnectTimeout")
val defaultUser : Config => Option[String] = cfg => cfg.getStringOption(DEFAULT_USER)
val defaultPasswd : Config => Option[String] = cfg => cfg.getStringOption(DEFAULT_PWD)
@@ -105,11 +106,11 @@ case class BlendedJMSConnectionConfig(
override val jmxEnabled : Boolean,
override val pingEnabled : Boolean,
override val pingTolerance : Int,
override val pingInterval : Int,
override val pingTimeout : Int,
override val retryInterval : Int,
override val minReconnect : Int,
override val maxReconnectTimeout: Int,
override val pingInterval : FiniteDuration,
override val pingTimeout : FiniteDuration,
override val retryInterval : FiniteDuration,
override val minReconnect : FiniteDuration,
override val maxReconnectTimeout: Option[FiniteDuration],
override val clientId : String,
override val defaultUser : Option[String],
override val defaultPassword : Option[String],
@@ -1,5 +1,7 @@
package blended.jms.utils
import scala.concurrent.duration.FiniteDuration
trait ConnectionConfig {
val vendor : String
@@ -8,11 +10,11 @@ trait ConnectionConfig {
val jmxEnabled : Boolean
val pingEnabled : Boolean
val pingTolerance : Int
val pingInterval : Int
val pingTimeout : Int
val retryInterval : Int
val minReconnect : Int
val maxReconnectTimeout: Int
val pingInterval : FiniteDuration
val pingTimeout : FiniteDuration
val retryInterval : FiniteDuration
val minReconnect : FiniteDuration
val maxReconnectTimeout: Option[FiniteDuration]
val clientId : String
val defaultUser : Option[String]
val defaultPassword : Option[String]
@@ -2,13 +2,12 @@ package blended.jms.utils.internal
import java.text.SimpleDateFormat
import java.util.Date
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicLong
import akka.actor.{Actor, ActorLogging, ActorRef, Cancellable, Props}
import akka.event.LoggingReceive
import blended.jms.utils.internal.ConnectionState._
import blended.jms.utils.{BlendedJMSConnection, BlendedJMSConnectionConfig, ConnectionConfig, ConnectionException}
import blended.jms.utils.{BlendedJMSConnection, ConnectionConfig, ConnectionException}
import javax.jms.Connection
import scala.concurrent.duration._
@@ -44,10 +43,10 @@ class ConnectionStateManager(config: ConnectionConfig, monitor: ActorRef, holder
// the retry Schedule is the time interval we retry a connection after a failed connect attempt
// usually that is only a fraction of the ping interval (i.e. 5 seconds)
val retrySchedule = config.retryInterval.seconds
val retrySchedule : FiniteDuration = config.retryInterval
// The schedule is the interval for the normal connection ping
val schedule = Duration(config.pingInterval, TimeUnit.SECONDS)
val schedule : FiniteDuration = config.pingInterval
// The ping timer is used to schedule ping messages over the underlying connection to check it's
// health
@@ -171,7 +170,7 @@ class ConnectionStateManager(config: ConnectionConfig, monitor: ActorRef, holder
// All good, happily disconnected
case ConnectionClosed =>
conn = None
checkConnection(config.minReconnect.seconds, true)
checkConnection(config.minReconnect, true)
switchState(
disconnected(),
publishEvents(state, s"Connection for provider [$vendor:$provider] successfully closed.")
@@ -247,7 +246,7 @@ class ConnectionStateManager(config: ConnectionConfig, monitor: ActorRef, holder
val remaining : Double = s.lastDisconnect match {
case None => 0
case Some(l) => config.minReconnect.seconds.toMillis - (System.currentTimeMillis() - l.getTime())
case Some(l) => config.minReconnect.toMillis - (System.currentTimeMillis() - l.getTime())
}
// if we were ever disconnected from the JMS provider since the container start we will check
@@ -286,10 +285,12 @@ class ConnectionStateManager(config: ConnectionConfig, monitor: ActorRef, holder
// This only happens if we have configured a maximum reconnect timeout in the config and we ever
// had a connection since this container was last restarted and we haven't started the timer yet
val newState = if (config.maxReconnectTimeout > 0 && state.firstReconnectAttempt.isEmpty && state.lastDisconnect.isDefined) {
val newState = if (config.maxReconnectTimeout.isDefined && state.firstReconnectAttempt.isEmpty && state.lastDisconnect.isDefined) {
events = (s"Starting max reconnect timeout monitor for provider [$vendor:$provider] with [${config.maxReconnectTimeout}]s") :: events
state.copy(firstReconnectAttempt = Some(lastConnectAttempt))
} else state
} else {
state
}
controller ! Connect(lastConnectAttempt, config.clientId)
@@ -307,9 +308,14 @@ class ConnectionStateManager(config: ConnectionConfig, monitor: ActorRef, holder
log.error(e, s"Error connecting to JMS provider [$vendor:$provider].")
if (config.maxReconnectTimeout > 0 && s.firstReconnectAttempt.isDefined) {
if (config.maxReconnectTimeout.isDefined && s.firstReconnectAttempt.isDefined) {
s.firstReconnectAttempt.foreach { t =>
if ((System.currentTimeMillis() - t.getTime()) / 1000l > config.maxReconnectTimeout) {
val restart : Boolean = config.maxReconnectTimeout.exists{ to =>
(System.currentTimeMillis() - t.getTime()).millis > to
}
if (restart) {
val e = new Exception(s"Unable to reconnect to JMS provider [$vendor:$provider] in [${config.maxReconnectTimeout}]s. Restarting container ...")
monitor ! RestartContainer(e)
result = true
@@ -330,7 +336,7 @@ class ConnectionStateManager(config: ConnectionConfig, monitor: ActorRef, holder
pingTimer = None
// Notify the connection controller of the disconnect
controller ! Disconnect(config.minReconnect.seconds)
controller ! Disconnect(config.minReconnect)
switchState(closing(), s.copy(status = CLOSING))
}
@@ -351,7 +357,7 @@ class ConnectionStateManager(config: ConnectionConfig, monitor: ActorRef, holder
private[this] def reconnect(s: ConnectionState) : Unit = {
disconnect(s)
checkConnection((config.minReconnect + 1).seconds)
checkConnection(config.minReconnect + 1.seconds)
}
private[this] def ping(c: Connection) : Unit = {
@@ -5,7 +5,7 @@ import java.util.concurrent.atomic.AtomicLong
import akka.actor.{Actor, ActorLogging, ActorRef, Cancellable, Props}
import akka.pattern.pipe
import blended.jms.utils.{BlendedJMSConnectionConfig, ConnectionConfig, JMSSupport}
import blended.jms.utils.{ConnectionConfig, JMSSupport}
import blended.util.logging.Logger
import javax.jms._
@@ -55,7 +55,7 @@ private[internal] trait PingOperations { this : JMSSupport =>
def initialisePing(con: Connection, config: ConnectionConfig, pingId: String)(implicit eCtxt: ExecutionContext) : Future[PingInfo] = Future {
val timeOutMillis = config.pingTimeout.millis.toMillis
val timeOutMillis = config.pingTimeout.toMillis
var session : Option[Session] = None
var consumer : Option[MessageConsumer] = None
@@ -176,7 +176,7 @@ class JmsPingPerformer(config: ConnectionConfig, con: Connection, operations: Pi
case ExecutePing(pingActor, id) =>
pingId = s"$id-$pingId"
log.info(s"Executing ping [$pingId] for connection factory [${config.vendor}:${config.provider}]")
timer = Some(context.system.scheduler.scheduleOnce(config.pingTimeout.millis, self, Timeout))
timer = Some(context.system.scheduler.scheduleOnce(config.pingTimeout, self, Timeout))
context.become(initializing(pingActor).orElse(timeoutHandler(pingActor)))
operations.initialisePing(con, config, pingId).pipeTo(self)
}
@@ -71,7 +71,7 @@ class ConnectionStateManagerSpec extends TestKit(ActorSystem("ConnectionManger")
val probe = TestProbe()
val props = ConnectionStateManager.props(
cfg.copy(
minReconnect = 3
minReconnect = 3.seconds
),
probe.ref,
holder
@@ -140,7 +140,7 @@ abstract class JMSPingPerformerSpec extends TestKit(ActorSystem("JMSPingPerforme
execPing(PingExecute(
count = counter.incrementAndGet(),
con = con.get,
cfg = cfg.copy(clientId = "jmsPing", pingDestination = s"topic:$pingTopic", pingTimeout = 1),
cfg = cfg.copy(clientId = "jmsPing", pingDestination = s"topic:$pingTopic", pingTimeout = 1.milli),
operations = timingOut
))(3.seconds), 3.seconds
)
@@ -170,7 +170,7 @@ abstract class JMSPingPerformerSpec extends TestKit(ActorSystem("JMSPingPerforme
execPing(PingExecute(
count = counter.incrementAndGet(),
con = con.get,
cfg = cfg.copy(clientId = "jmsPing", pingDestination = s"topic:$pingTopic", pingTimeout = 50),
cfg = cfg.copy(clientId = "jmsPing", pingDestination = s"topic:$pingTopic", pingTimeout = 50.millis),
operations = timingOut
))(10.seconds)
}
@@ -1,9 +1,10 @@
package blended.util.config
import scala.collection.JavaConverters._
import com.typesafe.config.Config
import scala.concurrent.duration._
trait ConfigDefaultGetter extends ConfigAccessor {
implicit class RichDefaultConfig(config: Config) {
@@ -43,6 +44,14 @@ trait ConfigDefaultGetter extends ConfigAccessor {
default
}
def getDuration(key: String, default : FiniteDuration) : FiniteDuration = {
if (config.hasPath(key)) {
config.getDuration(key).toMillis.millis
} else {
default
}
}
def getStringList(key: String, default: List[String]): List[String] =
if (config.hasPath(key)) {
config.getStringList(key).asScala.toList
@@ -1,9 +1,10 @@
package blended.util.config
import scala.collection.JavaConverters.asScalaBufferConverter
import com.typesafe.config.Config
import scala.concurrent.duration._
trait ConfigOptionGetter extends ConfigAccessor {
implicit class RichOptionConfig(config: Config) {
@@ -43,6 +44,13 @@ trait ConfigOptionGetter extends ConfigAccessor {
None
}
def getDurationOption(key : String) : Option[FiniteDuration] =
if(config.hasPath(key)) {
Option(config.getDuration(key).toMillis.millis)
} else {
None
}
def getStringListOption(key: String): Option[List[String]] =
if (config.hasPath(key)) {
Option(config.getStringList(key).asScala.toList)
@@ -1,6 +1,11 @@
#!/bin/bash
git clone https://github.com/woq-blended/akka.git akka
cd akka
git checkout osgi
sbt publishM2 > akka-sbt.out
cd ..
if [ ! -f$HOME/.m2/repository/com/typesafe/akka/akka-actor_2.12/2.5.17.1/akka-actor_2.12-2.5.17.1.jar ]
then
git clone https://github.com/woq-blended/akka.git akka
cd akka
git checkout osgi
sbt publishM2 > akka-sbt.out
cd ..
fi

0 comments on commit ab7ff92

Please sign in to comment.