Permalink
Browse files

Adding DefaultHeader Section in configuration & specs

  • Loading branch information...
atooni committed Oct 14, 2018
1 parent 2b9c087 commit b05584d98f06c6f47b25504f95602f2632556577
Showing with 278 additions and 92 deletions.
  1. +1 −1 ....activemq.brokerstarter/src/main/scala/blended/activemq/brokerstarter/internal/BrokerConfig.scala
  2. +1 −1 blended.akka.http.proxy/src/main/scala/blended/akka/http/proxy/BlendedAkkaHttpProxyActivator.scala
  3. +1 −1 blended.akka/src/main/scala/blended/akka/internal/BlendedAkkaActivator.scala
  4. +2 −0 blended.container.context.api/src/main/scala/blended/container/context/api/ContainerContext.scala
  5. +11 −5 ...ntainer.context.api/src/main/scala/blended/container/context/api/ContainerIdentifierService.scala
  6. +32 −14 ...ontainer.context.api/src/main/scala/blended/container/context/api/ContainerPropertyResolver.scala
  7. +4 −0 ...ded.container.context.api/src/test/scala/blended/container/context/api/PropertyResolverSpec.scala
  8. +12 −6 ....impl/src/main/scala/blended/container/context/impl/internal/ContainerIdentifierServiceImpl.scala
  9. +1 −1 blended.jms.bridge/src/main/scala/blended/jms/bridge/BridgeProviderConfig.scala
  10. +2 −2 blended.jms.bridge/src/main/scala/blended/jms/bridge/internal/BridgeActivator.scala
  11. +1 −1 blended.jms.bridge/src/main/scala/blended/jms/bridge/internal/InboundConfig.scala
  12. +1 −0 blended.jms.bridge/src/test/scala/blended/jms/bridge/internal/BridgeActivatorSpec.scala
  13. +4 −5 blended.jms.utils/src/main/scala/blended/jms/utils/BlendedJMSConnectionConfig.scala
  14. +3 −3 blended.security.ssl/src/main/scala/blended/security/ssl/internal/ConfigCommonNameProvider.scala
  15. +31 −1 blended.streams.dispatcher/src/main/scala/blended/streams/dispatcher/DispatcherBuilder.scala
  16. +7 −0 ...d.streams.dispatcher/src/main/scala/blended/streams/dispatcher/internal/DispatcherActivator.scala
  17. +35 −15 ...eams.dispatcher/src/main/scala/blended/streams/dispatcher/internal/ResourcetypeRouterConfig.scala
  18. +14 −5 ...ed.streams.dispatcher/src/test/scala/blended/streams/dispatcher/internal/DispatcherExecutor.scala
  19. +61 −14 ...ms.dispatcher/src/test/scala/blended/streams/dispatcher/internal/DispatcherGraphBuilderSpec.scala
  20. +1 −1 ....dispatcher/src/test/scala/blended/streams/dispatcher/internal/ResourceTypeRouterConfigSpec.scala
  21. +6 −0 blended.streams/src/main/scala/blended/streams/message/FlowEnvelope.scala
  22. +24 −8 blended.streams/src/main/scala/blended/streams/message/FlowMessage.scala
  23. +4 −2 blended.streams/src/main/scala/blended/streams/processor/HeaderTransformProcessor.scala
  24. +1 −0 blended.streams/src/test/scala/blended/streams/FlowProcessorSpec.scala
  25. +3 −3 blended.streams/src/test/scala/blended/streams/processor/HeaderProcessorSpec.scala
  26. +7 −3 blended.streams/src/test/scala/blended/streams/testapps/FlowMessageSpec.scala
  27. +8 −0 blended.testsupport.pojosr/src/main/scala/blended/testsupport/pojosr/PojoSrTestHelper.scala
@@ -19,7 +19,7 @@ object BrokerConfig {
def create(brokerName : String, idSvc: ContainerIdentifierService, cfg: Config) : Try[BrokerConfig] = Try {
def resolve(value: String) : String = idSvc.resolvePropertyString(value).get
def resolve(value: String) : String = idSvc.resolvePropertyString(value).map(_.toString()).get
val name = resolve(cfg.getString("brokerName", brokerName))
val provider = resolve(cfg.getString("provider", brokerName))
@@ -27,7 +27,7 @@ class BlendedAkkaHttpProxyActivator extends DominoActivator with ActorSystemWatc
// handle each configured proxy endpoint independently
config.get.paths.foreach { proxyTarget =>
// setup proxys route according to config and register it into the service registry
val proxyConfig = proxyTarget.copy(uri = cfg.idSvc.resolvePropertyString(proxyTarget.uri).get)
val proxyConfig = proxyTarget.copy(uri = cfg.idSvc.resolvePropertyString(proxyTarget.uri).map(_.toString()).get)
log.debug(s"About to setup proxy [${proxyConfig}]")
val sslContextFilter = "(type=client)"
@@ -15,7 +15,7 @@ class BlendedAkkaActivator extends DominoActivator {
whenServicePresent[ContainerIdentifierService] { svc =>
val ctConfig = svc.containerContext.getContainerConfig()
log.debug(s"$ctConfig")
log.trace(s"$ctConfig")
try {
val system : ActorSystem = ActorSystem.create("BlendedActorSystem", ctConfig, classOf[ActorSystem].getClassLoader())
@@ -2,6 +2,8 @@ package blended.container.context.api
import com.typesafe.config.Config
import scala.annotation.meta.beanGetter
trait ContainerContext {
def getContainerDirectory() : String
@@ -2,6 +2,8 @@ package blended.container.context.api
import java.util.UUID
import scala.annotation.meta.beanGetter
import scala.beans.BeanProperty
import scala.util.Try
class PropertyResolverException(msg: String) extends Exception(msg)
@@ -14,20 +16,24 @@ class PropertyResolverException(msg: String) extends Exception(msg)
* container meta data.
*/
trait ContainerIdentifierService {
@BeanProperty
lazy val uuid: String = UUID.randomUUID().toString()
@BeanProperty
val properties: Map[String, String]
@BeanProperty
val containerContext: ContainerContext
/**
* Try to resolve the properties inside a given String and return a string with the replaced properties values.
*/
def resolvePropertyString(value : String) : Try[String] = resolvePropertyString(value, Map.empty)
def resolvePropertyString(value : String) : Try[AnyRef] = resolvePropertyString(value, Map.empty)
def resolvePropertyString(value: String, additionalProps: Map[String, String]) : Try[String] =
Try(ContainerPropertyResolver.resolve(this, value, additionalProps))
def resolvePropertyString(value: String, additionalProps: Map[String, Any]) : Try[AnyRef] = Try {
def evaluatePropertyString(value: String, additionalProps: Map[String, Any]) : Try[Any] =
Try(ContainerPropertyResolver.evaluate(this, value, additionalProps))
val r = ContainerPropertyResolver.resolve(this, value, additionalProps)
r
}
}
object ContainerIdentifierService {
@@ -3,9 +3,6 @@ package blended.container.context.api
import blended.util.logging.Logger
import org.springframework.expression.spel.standard.SpelExpressionParser
import org.springframework.expression.spel.support.StandardEvaluationContext
import scala.collection.JavaConverters._
import scala.reflect.ClassTag
object ContainerPropertyResolver {
@@ -14,12 +11,16 @@ object ContainerPropertyResolver {
private[this] val log = Logger[ContainerPropertyResolver.type]
private[this] val startDelim = "$[["
private[this] val endDelim = "]]"
private[this] val resolveStartDelim = "$[["
private[this] val resolveEndDelim = "]]"
private[this] val evalStartDelim = "${{"
private[this] val evalEndDelim = "}}"
private[this] val parser = new SpelExpressionParser()
private[this] def extractRule(line: String) : (String, String, String) = {
private[this] def extractVariableElement(line: String, startDelim: String, endDelim: String) : (String, String, String) = {
line.lastIndexOf(startDelim) match {
case -1 => ("", line, "")
case start => line.indexOf(endDelim, start) match {
@@ -117,23 +118,40 @@ object ContainerPropertyResolver {
result
}
def resolve(idSvc: ContainerIdentifierService, line: String, additionalProps: Map[String, String] = Map.empty) : String = line.indexOf(startDelim) match {
case n if n < 0 => line
case n if n >= 0 =>
val (prefix, rule, suffix) = extractRule(line)
resolve(idSvc, prefix + processRule(idSvc, rule, additionalProps.mapValues(_.toString())) + suffix, additionalProps)
private[api] def resolve(idSvc: ContainerIdentifierService, line: String, additionalProps: Map[String, Any] = Map.empty) : AnyRef = {
log.trace(s"Resolving [$line]")
// First we check if we have replacements in "Blended Style"
line.indexOf(resolveStartDelim) match {
case n if n < 0 =>
// We don't have any
line.indexOf(evalStartDelim) match {
case i if i < 0 => line
case i if i >= 0 =>
val (prefix, eval, suffix) = extractVariableElement(line, evalStartDelim, evalEndDelim)
if (prefix.isEmpty && suffix.isEmpty) {
evaluate(idSvc, eval, additionalProps)
} else {
resolve(idSvc, prefix + evaluate(idSvc, eval, additionalProps.mapValues(_.toString())) + suffix, additionalProps)
}
}
case n if n >= 0 =>
val (prefix, rule, suffix) = extractVariableElement(line, resolveStartDelim, resolveEndDelim)
resolve(idSvc, prefix + processRule(idSvc, rule, additionalProps.mapValues(_.toString())) + suffix, additionalProps)
}
}
def evaluate(
private[api] def evaluate(
idSvc: ContainerIdentifierService, line: String, additionalProps : Map[String, Any] = Map.empty
) : AnyRef = {
val resolved = resolve(idSvc, line, additionalProps.mapValues(_.toString()))
val context = new StandardEvaluationContext()
context.setRootObject(idSvc)
additionalProps.foreach { case (k,v) => context.setVariable(k,v) }
context.setVariable("idSvc", idSvc)
val exp = parser.parseExpression(resolved)
val exp = parser.parseExpression(line)
val result = exp.getValue(context)
result
@@ -3,6 +3,7 @@ package blended.container.context.api
import com.typesafe.config.Config
import org.scalatest.{FreeSpec, Matchers}
import scala.beans.BeanProperty
import scala.util.control.NonFatal
@@ -29,8 +30,11 @@ class PropertyResolverSpec extends FreeSpec
val idSvc : ContainerIdentifierService = new ContainerIdentifierService {
@BeanProperty
override lazy val uuid: String = "id"
@BeanProperty
override val containerContext: ContainerContext = ctCtxt
@BeanProperty
override val properties: Map[String, String] = Map(
"foo" -> "bar",
"bar" -> "test",
@@ -4,17 +4,22 @@ import java.io.File
import java.nio.file.Files
import scala.collection.JavaConverters._
import scala.util.{ Success, Try }
import blended.container.context.api.{ ContainerContext, ContainerIdentifierService }
import scala.util.{Success, Try}
import blended.container.context.api.{ContainerContext, ContainerIdentifierService}
import blended.updater.config.RuntimeConfig
import blended.util.logging.Logger
import com.typesafe.config.{ Config, ConfigFactory, ConfigParseOptions }
import com.typesafe.config.{Config, ConfigFactory, ConfigParseOptions}
import scala.beans.BeanProperty
class ContainerIdentifierServiceImpl(override val containerContext: ContainerContext) extends ContainerIdentifierService {
class ContainerIdentifierServiceImpl(
@BeanProperty
override val containerContext: ContainerContext
) extends ContainerIdentifierService {
private[this] val log = Logger[ContainerIdentifierServiceImpl]
@BeanProperty
override lazy val uuid : String = {
val idFile = new File(System.getProperty("blended.home") + "/etc", s"blended.container.context.id")
val lines = Files.readAllLines(idFile.toPath)
@@ -26,6 +31,7 @@ class ContainerIdentifierServiceImpl(override val containerContext: ContainerCon
}
}
@BeanProperty
override val properties : Map[String,String] = {
val mandatoryPropNames : Seq[String] = Option(System.getProperty(RuntimeConfig.Properties.PROFILE_PROPERTY_KEYS)) match {
@@ -50,7 +56,7 @@ class ContainerIdentifierServiceImpl(override val containerContext: ContainerCon
throw new RuntimeException(msg)
}
val resolve : Map[String, Try[String]] = unresolved.map{ case (k,v) => (k, resolvePropertyString(v)) }
val resolve : Map[String, Try[String]] = unresolved.map{ case (k,v) => (k, resolvePropertyString(v).map(_.toString())) }
val resolveErrors = resolve.filter(_._2.isFailure)
@@ -23,7 +23,7 @@ object BridgeProviderConfig {
def create(idSvc: ContainerIdentifierService, cfg: Config) : Try[BridgeProviderConfig] = Try {
def resolve(value: String) : String = idSvc.resolvePropertyString(value).get
def resolve(value: String) : String = idSvc.resolvePropertyString(value).map(_.toString()).get
val errorQueue = resolve(cfg.getString("errorQueue", "blended.error"))
val eventQueue = resolve(cfg.getString("eventQueue", "blended.event"))
@@ -45,9 +45,9 @@ class BridgeActivator extends DominoActivator with ActorSystemWatching {
}.toList
val inboundList : List[InboundConfig ]=
osgiCfg.config.getConfigList("inbound").asScala.map { i =>
osgiCfg.config.getConfigList("inbound", List.empty).map { i =>
InboundConfig.create(osgiCfg.idSvc, i).get
}.toList
}
val queuePrefix = osgiCfg.config.getString("queuePrefix", "blended.bridge")
@@ -11,7 +11,7 @@ object InboundConfig {
def create(idSvc : ContainerIdentifierService, cfg: Config): Try[InboundConfig] = Try {
def resolve(value: String) : String = idSvc.resolvePropertyString(value).get
def resolve(value: String) : String = idSvc.resolvePropertyString(value).map(_.toString()).get
val name = resolve(cfg.getString("name"))
val vendor = resolve(cfg.getString("vendor"))
@@ -21,6 +21,7 @@ import org.scalatest.Matchers
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext}
import scala.util.{Failure, Success}
import MsgProperty.Implicits._
class BridgeActivatorSpec extends LoggingFreeSpec
@@ -3,10 +3,9 @@ package blended.jms.utils
import blended.container.context.api.ContainerIdentifierService
import blended.jms.utils.ConnectionFactoryActivator.{CF_JNDI_NAME, DEFAULT_PWD, DEFAULT_USER, USE_JNDI}
import blended.updater.config.util.ConfigPropertyMapConverter
import com.typesafe.config.Config
import blended.util.config.Implicits._
import com.typesafe.config.Config
import scala.collection.JavaConverters._
import scala.util.{Failure, Success, Try}
object BlendedJMSConnectionConfig {
@@ -36,7 +35,7 @@ object BlendedJMSConnectionConfig {
jmsClassloader = None
)
def fromConfig(stringResolver : String => Try[String])(vendor: String, provider: String, cfg: Config) : BlendedJMSConnectionConfig = {
def fromConfig(stringResolver : String => Try[Any])(vendor: String, provider: String, cfg: Config) : BlendedJMSConnectionConfig = {
val prov = cfg.getString("provider", provider)
val enabled = cfg.getBoolean("enabled", defaultConfig.enabled)
@@ -52,7 +51,7 @@ object BlendedJMSConnectionConfig {
val clientId = if (cfg.hasPath("clientId"))
stringResolver(cfg.getString("clientId")) match {
case Failure(t) => throw t
case Success(id) => id
case Success(id) => id.toString()
}
else
defaultConfig.clientId
@@ -66,7 +65,7 @@ object BlendedJMSConnectionConfig {
.mapValues { v =>
stringResolver(v) match {
case Failure(t) => throw t
case Success(s) => s
case Success(s) => s.toString()
}
}
@@ -9,11 +9,11 @@ import scala.util.Try
class ConfigCommonNameProvider (cfg : Config, idSvc: ContainerIdentifierService) extends CommonNameProvider {
override def commonName(): Try[String] = idSvc.resolvePropertyString(cfg.getString("commonName"))
override def commonName(): Try[String] = idSvc.resolvePropertyString(cfg.getString("commonName")).map(_.toString())
override def alternativeNames(): Try[List[String]] = Try {
cfg.getStringListOption("logicalHostnames").getOrElse(List.empty).map{ s =>
idSvc.resolvePropertyString(s).get
idSvc.resolvePropertyString(s).map(_.toString()).get
}
}
}
}
@@ -1,13 +1,22 @@
package blended.streams.dispatcher
import akka.NotUsed
import akka.stream._
import akka.stream.javadsl.RunnableGraph
import akka.stream.scaladsl.GraphDSL.Implicits._
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Sink, Source}
import blended.container.context.api.ContainerIdentifierService
import blended.streams.dispatcher.internal.ResourceTypeRouterConfig
import blended.streams.message.FlowEnvelope
import blended.streams.processor.HeaderTransformProcessor
case class DispatcherBuilder(
idSvc : ContainerIdentifierService,
// The Dispatcher configuration
cfg: ResourceTypeRouterConfig,
// Inbound messages
source : Source[FlowEnvelope, _],
@@ -21,6 +30,25 @@ case class DispatcherBuilder(
errorOut : Sink[FlowEnvelope, _]
) {
private val defaultHeader : Graph[FlowShape[FlowEnvelope, FlowEnvelope], NotUsed] = {
val noOverwrite = HeaderTransformProcessor(
name = "headerNoOverwrite",
rules = cfg.defaultHeader.filter(!_.overwrite).map(h => (h.name, h.value)),
overwrite = false,
idSvc = Some(idSvc)
).flow
val overwrite = HeaderTransformProcessor(
name = "headerNoOverwrite",
rules = cfg.defaultHeader.filter(_.overwrite).map(h => (h.name, h.value)),
overwrite = true,
idSvc = Some(idSvc)
).flow
Flow.fromGraph(noOverwrite).via(Flow.fromGraph(overwrite))
}
def build() = {
val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
@@ -30,11 +58,13 @@ case class DispatcherBuilder(
val event : Inlet[FlowEnvelope] = builder.add(eventOut).in
val error : Inlet[FlowEnvelope] = builder.add(errorOut).in
val header = builder.add(defaultHeader)
val errorSplit = builder.add(Broadcast[FlowEnvelope](2))
val toJms = builder.add(Flow[FlowEnvelope].filter(_.exception.isEmpty))
val toError = builder.add(Flow[FlowEnvelope].filter(_.exception.isDefined))
in ~> errorSplit.in
in ~> header ~> errorSplit.in
Source.empty[FlowEnvelope] ~> event
@@ -0,0 +1,7 @@
package blended.streams.dispatcher.internal
import domino.DominoActivator
class DispatcherActivator extends DominoActivator {
}
Oops, something went wrong.

0 comments on commit b05584d

Please sign in to comment.