Permalink
Browse files

Adding configuration options to StreamControllerConfig

  • Loading branch information...
atooni committed Jan 2, 2019
1 parent c735e96 commit 149820b10aef1a41623f97ba17276de10ddca088
@@ -4,30 +4,24 @@ import java.io.File
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Materializer}
import akka.util.ByteString
import blended.jms.utils.{IdAwareConnectionFactory, JmsDestination}
import blended.streams.jms.{JmsDeliveryMode, JmsProducerSettings, JmsStreamSupport}
import blended.streams.message.{FlowEnvelope, FlowMessage, MsgProperty}
import blended.streams.jms.{JmsProducerSettings, JmsStreamSupport}
import blended.streams.message.{FlowEnvelope, FlowMessage}
import blended.util.logging.Logger

import scala.concurrent.ExecutionContext
import scala.io.Source
import scala.util.{Success, Try}

class JMSFilePollHandler(
cf: IdAwareConnectionFactory,
dest: String,
deliveryMode: Int,
priority: Int,
ttl: Long,
props: Map[String, String]
settings : JmsProducerSettings,
header : FlowMessage.FlowMessageProps
) extends FilePollHandler with JmsStreamSupport {

private val log : Logger = Logger[JMSFilePollHandler]

private def createEnvelope(cmd : FileProcessCmd, file : File) : FlowEnvelope = {

val body : ByteString = ByteString(Source.fromFile(file).mkString)
val header : Map[String, MsgProperty] = props.mapValues(v => MsgProperty(v))

FlowEnvelope(FlowMessage(body)(header))
.withHeader("BlendedFileName", file.getName()).get
@@ -41,19 +35,13 @@ class JMSFilePollHandler(

val env : FlowEnvelope = createEnvelope(cmd, f)

val settings : JmsProducerSettings = JmsProducerSettings(
connectionFactory = cf,
jmsDestination = Some(JmsDestination.create(dest).get),
deliveryMode = JmsDeliveryMode.Persistent
)

sendMessages(
producerSettings = settings,
log = log,
env
) match {
case Success(s) => s.shutdown()
case f => f
case _ => // do nothing as the stream is already closed
}
}
}
@@ -13,8 +13,8 @@ import blended.streams.processor.{HeaderProcessorConfig, HeaderTransformProcesso
import blended.streams.transaction._
import blended.streams.{FlowProcessor, StreamControllerConfig}
import blended.util.logging.Logger
import com.typesafe.config.Config

import scala.concurrent.duration._
import scala.util.Try

class InvalidBridgeConfigurationException(msg: String) extends Exception(msg)
@@ -37,7 +37,8 @@ case class JmsStreamConfig(
headerCfg : FlowHeaderConfig,
subscriberName : Option[String],
header : List[HeaderProcessorConfig],
idSvc : Option[ContainerIdentifierService] = None
idSvc : Option[ContainerIdentifierService] = None,
rawConfig : Config
)

class JmsStreamBuilder(
@@ -165,10 +166,9 @@ class JmsStreamBuilder(
// The stream will be handled by an actor which that can be used to shutdown the stream
// and will restart the stream with a backoff strategy on failure
// TODO: Make restart parameters configurable
val streamCfg : StreamControllerConfig = StreamControllerConfig(
name = streamId,
source = stream,
exponential = false,
maxDelay = 30.seconds
)
val streamCfg : StreamControllerConfig = StreamControllerConfig.fromConfig(cfg.rawConfig).get
.copy(
name = streamId,
source = stream
)
}
@@ -45,7 +45,8 @@ private[bridge] object BridgeControllerConfig {
registry = registry,
headerCfg = headerCfg,
inbound = inboundList,
idSvc = idSvc
idSvc = idSvc,
rawConfig = cfg
)
}
}
@@ -55,7 +56,8 @@ private[bridge] case class BridgeControllerConfig(
registry : BridgeProviderRegistry,
headerCfg : FlowHeaderConfig,
inbound : List[InboundConfig],
idSvc : ContainerIdentifierService
idSvc : ContainerIdentifierService,
rawConfig : Config
)

object BridgeController{
@@ -98,7 +100,8 @@ class BridgeController(ctrlCfg: BridgeControllerConfig)(implicit system : ActorS
trackTransaction = TrackTransaction.On,
subscriberName = in.subscriberName,
header = in.header,
idSvc = Some(ctrlCfg.idSvc)
idSvc = Some(ctrlCfg.idSvc),
rawConfig = ctrlCfg.rawConfig
)

val streamCfg: StreamControllerConfig = new JmsStreamBuilder(inCfg).streamCfg
@@ -129,7 +132,8 @@ class BridgeController(ctrlCfg: BridgeControllerConfig)(implicit system : ActorS
registry = ctrlCfg.registry,
trackTransaction = TrackTransaction.FromMessage,
subscriberName = None,
header = List.empty
header = List.empty,
rawConfig = ctrlCfg.rawConfig
)

val streamCfg: StreamControllerConfig = new JmsStreamBuilder(outCfg).streamCfg
@@ -158,7 +162,7 @@ class BridgeController(ctrlCfg: BridgeControllerConfig)(implicit system : ActorS
log.warn("No internal JMS provider found in config")
}

case RemoveConnectionFactory(cf) => {
case RemoveConnectionFactory(cf) =>
log.info(s"Removing connection factory [${cf.vendor}:${cf.provider}]")

streams.filter{ case (key, _) => key.startsWith(cf.id) }.foreach { case (id, stream) =>
@@ -167,5 +171,4 @@ class BridgeController(ctrlCfg: BridgeControllerConfig)(implicit system : ActorS
streams -= id
}
}
}
}
@@ -73,7 +73,8 @@ class BridgeSpec extends SimplePojoContainerSpec
registry = ctrlCfg.registry,
trackTransaction = TrackTransaction.Off,
subscriberName = None,
header = List.empty
header = List.empty,
rawConfig = ctrlCfg.rawConfig
)

private val streamCfg = new JmsStreamBuilder(cfg).streamCfg
@@ -60,7 +60,8 @@ object ResourceTypeRouterConfig {
applicationLogHeader = logHeader,
defaultHeader = defaultHeader,
resourceTypeConfigs = routes,
providerRegistry = provider
providerRegistry = provider,
rawConfig = cfg
)
}
}
@@ -71,7 +72,8 @@ case class ResourceTypeRouterConfig(
providerRegistry : BridgeProviderRegistry,
applicationLogHeader : List[String],
defaultHeader : List[HeaderProcessorConfig],
resourceTypeConfigs : Map[String, ResourceTypeConfig]
resourceTypeConfigs : Map[String, ResourceTypeConfig],
rawConfig : Config
)

object ResourceTypeConfig {
@@ -167,10 +167,11 @@ class RunnableDispatcher(
val source = bridgeSource(internalProvider, provider, dispLogger).via(dispatcher)

// Prepare and start the dispatcher
val streamCfg = StreamControllerConfig(
name = dispLogger.name,
source = source.via(transactionSend())
)
val streamCfg = StreamControllerConfig.fromConfig(routerCfg.rawConfig).get
.copy(
name = dispLogger.name,
source = source.via(transactionSend())
)

val actor = system.actorOf(StreamController.props(streamCfg = streamCfg))

@@ -69,10 +69,11 @@ class TransactionOutbound(

val src : Source[FlowEnvelope, NotUsed] = jmsSource.get.via(transactionStream)

val streamCfg = StreamControllerConfig(
name = "transactionOut",
source = src
)
val streamCfg = StreamControllerConfig.fromConfig(dispatcherCfg.rawConfig).get
.copy(
name = "transactionOut",
source = src
)

system.actorOf(StreamController.props(streamCfg))
}
@@ -3,21 +3,44 @@ package blended.streams
import akka.NotUsed
import akka.actor.{Actor, ActorSystem, Props}
import akka.stream.scaladsl.{Keep, Sink, Source}
import akka.stream.{ActorMaterializer, KillSwitch, KillSwitches, Materializer}
import akka.stream.{KillSwitch, KillSwitches, Materializer}
import blended.streams.message.FlowEnvelope
import blended.util.logging.Logger
import com.typesafe.config.Config
import blended.util.config.Implicits._

import scala.concurrent.duration.{FiniteDuration, _}
import scala.util.{Failure, Random, Success}
import scala.util.{Failure, Random, Success, Try}

object StreamControllerConfig {

def fromConfig(cfg : Config) : Try[StreamControllerConfig] = Try {
val minDelay : FiniteDuration = cfg.getDuration("minDelay", 5.seconds)
val maxDelay : FiniteDuration = cfg.getDuration("maxDelay", 1.minute)
val exponential : Boolean = cfg.getBoolean("exponential", true)
val random : Double = cfg.getDouble("random", 0.2)
val onFailure : Boolean = cfg.getBoolean("onFailureOnly", true)

StreamControllerConfig(
name = "",
source = Source.empty,
minDelay = minDelay,
maxDelay = maxDelay,
exponential = exponential,
onFailureOnly = onFailure,
random = random
)
}
}

case class StreamControllerConfig(
name : String,
source : Source[FlowEnvelope, NotUsed],
minDelay : FiniteDuration = 5.seconds,
maxDelay : FiniteDuration = 1.minute,
exponential : Boolean = true,
onFailureOnly : Boolean = true,
random : Double = 0.2
minDelay : FiniteDuration,
maxDelay : FiniteDuration,
exponential : Boolean,
onFailureOnly : Boolean,
random : Double
)

object StreamController {
@@ -53,7 +76,11 @@ class StreamController(streamCfg: StreamControllerConfig)(implicit system : Acto
}

var newIntervalMillis : Double =
if (streamCfg.exponential) interval.toMillis * 2 else interval.toMillis + initialInterval.toMillis
if (streamCfg.exponential) {
interval.toMillis * 2
} else {
interval.toMillis + initialInterval.toMillis
}

newIntervalMillis = scala.math.min(
streamCfg.maxDelay.toMillis,
@@ -35,54 +35,54 @@ public static InputStream openFile(final String location, final ClassLoader load
loc = location;
}

LOG.debug("Resolving resource {}", loc);
LOG.trace("Resolving resource {}", loc);

try {
is = new BufferedInputStream(new FileInputStream(location));
} catch (Exception e) {
LOG.debug(e.getMessage());
LOG.trace(e.getMessage());
}

if (is == null) {
try {
LOG.debug("Resolving resource {} as URL", loc);
LOG.trace("Resolving resource {} as URL", loc);
URL url = new URL(location);
is = url.openStream();
} catch (Exception e) {
LOG.debug(e.getMessage());
LOG.trace(e.getMessage());
}
}

if (is == null) {
try {
LOG.debug("Resolving resource {} as File", loc);
LOG.trace("Resolving resource {} as File", loc);
final String path = ResourceResolver.class.getResource(location).getPath();
LOG.debug("Resolved path is {}", path);
LOG.trace("Resolved path is {}", path);
is = new FileInputStream(path);
} catch (Exception e) {
LOG.debug(e.getMessage());
LOG.trace(e.getMessage());
}
}

if (is == null) {
try {
LOG.debug("Resolving resource {} from Classpath", loc);
LOG.trace("Resolving resource {} from Classpath", loc);
is = loader.getResourceAsStream(location);
} catch (Exception e) {
LOG.debug(e.getMessage());
LOG.trace(e.getMessage());
}
}

if (is == null) {
try {
is = loader.getResourceAsStream("/" + location);
} catch (Exception e) {
LOG.debug(e.getMessage());
LOG.trace(e.getMessage());
}
}

if (is == null) {
LOG.debug("Resolving resource {} as ByteStream", loc);
LOG.trace("Resolving resource {} as ByteStream", loc);
is = new ByteArrayInputStream(location.getBytes());
}

@@ -44,6 +44,13 @@ trait ConfigDefaultGetter extends ConfigAccessor {
default
}

def getDouble(key: String, default: Double) : Double =
if (config.hasPath(key)) {
config.getDouble(key)
} else {
default
}

def getDuration(key: String, default : FiniteDuration) : FiniteDuration = {
if (config.hasPath(key)) {
config.getDuration(key).toMillis.millis

0 comments on commit 149820b

Please sign in to comment.