Skip to content

Commit

Permalink
Merge branch 'release-3.x'
Browse files Browse the repository at this point in the history
  • Loading branch information
atooni committed Jun 6, 2019
2 parents 6769187 + 75608b1 commit aebcc4e
Show file tree
Hide file tree
Showing 11 changed files with 54 additions and 26 deletions.
Expand Up @@ -11,7 +11,7 @@ import blended.akka.internal.BlendedAkkaActivator
import blended.container.context.api.ContainerIdentifierService
import blended.jms.utils.{IdAwareConnectionFactory, JmsDestination, JmsQueue}
import blended.streams.jms._
import blended.streams.message.{FlowEnvelope, FlowMessage, TextFlowMessage}
import blended.streams.message.{BinaryFlowMessage, FlowEnvelope, FlowMessage, TextFlowMessage}
import blended.streams.processor.Collector
import blended.streams.transaction.{FlowHeaderConfig, FlowTransactionEvent, FlowTransactionStarted, FlowTransactionUpdate}
import blended.testsupport.pojosr.{BlendedPojoRegistry, PojoSrTestHelper, SimplePojoContainerSpec}
Expand Down Expand Up @@ -141,7 +141,25 @@ class InboundBridgeUntrackedSpec extends BridgeSpecSupport {

implicit val timeout : FiniteDuration = 1.second

val msg : TextFlowMessage = TextFlowMessage(null, FlowMessage.noProps)
val msg : FlowMessage = TextFlowMessage(null, FlowMessage.noProps)
val msgs : Seq[FlowEnvelope] = Seq(FlowEnvelope(msg))

val switch : KillSwitch = sendMessages("sampleIn", external)(msgs : _*)

val messages : List[FlowEnvelope] =
consumeMessages(internal, "bridge.data.in.activemq.external")(1.second, system, materializer).get

messages should have size msgs.size

consumeEvents().get should be(empty)

switch.shutdown()
}

"process messages with an empty binary body" in {
implicit val timeout : FiniteDuration = 1.second

val msg : FlowMessage = BinaryFlowMessage(Array.empty[Byte], FlowMessage.noProps)
val msgs : Seq[FlowEnvelope] = Seq(FlowEnvelope(msg))

val switch : KillSwitch = sendMessages("sampleIn", external)(msgs : _*)
Expand Down
Expand Up @@ -90,7 +90,6 @@ trait MgmtReporter extends Actor with PrickleSupport {
super.postStop()
}


private def handleTick(state : MgmtReporterState) : Receive = {
case Tick =>
config.foreach { cfg =>
Expand Down
Expand Up @@ -46,7 +46,7 @@ class OsgiMgmtReporter(cfg : OSGIActorConfig) extends OSGIActor(cfg) with MgmtRe
x
}

override protected val idSvc: ContainerIdentifierService = cfg.idSvc
override protected val idSvc : ContainerIdentifierService = cfg.idSvc
}

object OsgiMgmtReporter {
Expand Down
Expand Up @@ -22,14 +22,14 @@ class MgmtMockClients(config : Config) {
private[this] val rnd = new Random()

private[this] val ctCtxt : ContainerContext = new ContainerContext {
override def getContainerDirectory(): String = "."
override def getContainerConfigDirectory(): String = getContainerDirectory()
override def getContainerLogDirectory(): String = getContainerDirectory()
override def getProfileDirectory(): String = getContainerDirectory()
override def getProfileConfigDirectory(): String = getContainerDirectory()
override def getContainerHostname(): String = "localhost"
override def getContainerCryptoSupport(): ContainerCryptoSupport = BlendedCryptoSupport.initCryptoSupport("pwd.txt")
override def getContainerConfig(): TSConfig = ConfigFactory.empty()
override def getContainerDirectory() : String = "."
override def getContainerConfigDirectory() : String = getContainerDirectory()
override def getContainerLogDirectory() : String = getContainerDirectory()
override def getProfileDirectory() : String = getContainerDirectory()
override def getProfileConfigDirectory() : String = getContainerDirectory()
override def getContainerHostname() : String = "localhost"
override def getContainerCryptoSupport() : ContainerCryptoSupport = BlendedCryptoSupport.initCryptoSupport("pwd.txt")
override def getContainerConfig() : TSConfig = ConfigFactory.empty()
}

implicit val system : ActorSystem = ActorSystem("MgmtMockClients")
Expand All @@ -51,7 +51,7 @@ class MgmtMockClients(config : Config) {
)

val idSvc : ContainerIdentifierService = new ContainerIdentifierServiceImpl(ctCtxt) {
override lazy val uuid: String = ci.containerId
override lazy val uuid : String = ci.containerId
}

system.actorOf(ContainerActor.props(reporterConfig, idSvc), name = "container-" + ci.containerId)
Expand Down
Expand Up @@ -65,9 +65,9 @@ trait ArtifactRepoRoutes {
path(Segment) { repoId =>
getRepoFile(repoId, "")
} ~
path(Segment / Remaining) { (repoId, rest) =>
getRepoFile(repoId, rest)
}
path(Segment / Remaining) { (repoId, rest) =>
getRepoFile(repoId, rest)
}
}

}
Expand Up @@ -24,7 +24,7 @@ class MemoryKeystoreSpec extends LoggingFreeSpec
cnProvider = cnProvider
)

private def newHostCertificate(cn : String, issuedBy : CertificateHolder, days: Int) : CertificateHolder =
private def newHostCertificate(cn : String, issuedBy : CertificateHolder, days : Int) : CertificateHolder =
createHostCertificate(cn, issuedBy, days) match {
case Success(h) => h
case Failure(t) => fail(t)
Expand All @@ -45,7 +45,7 @@ class MemoryKeystoreSpec extends LoggingFreeSpec
val ms : MemoryKeystore = MemoryKeystore(Map.empty)
ms.nextCertificateTimeout() match {
case Success(to) => assert(start <= to.getTime() && System.currentTimeMillis() >= to.getTime())
case Failure(t) => fail(t)
case Failure(t) => fail(t)
}
}

Expand All @@ -58,10 +58,9 @@ class MemoryKeystoreSpec extends LoggingFreeSpec
val ms : MemoryKeystore = MemoryKeystore(Map("root" -> root))
ms.nextCertificateTimeout() match {
case Success(to) => assert(start <= to.getTime() && to.getTime() <= end)
case Failure(t) => fail(t)
case Failure(t) => fail(t)
}


}

"not allow inconsistent updates" in {
Expand Down
Expand Up @@ -93,7 +93,7 @@ case class DispatcherFanout(
}

newEnv = if (oh.clearBody) {
newEnv.copy(flowMessage = BaseFlowMessage(newEnv.flowMessage.header))
newEnv.copy(flowMessage = newEnv.flowMessage.clearBody())
} else {
newEnv
}
Expand Down
Expand Up @@ -51,6 +51,7 @@ trait JmsEnvelopeHeader {
val deliveryModeHeader : String => String = s => jmsHeaderPrefix(s) + "DeliveryMode"
val replyToHeader : String => String = s => jmsHeaderPrefix(s) + "ReplyTo"
val timestampHeader : String => String = s => jmsHeaderPrefix(s) + "Timestamp"
val typeHeader : String => String = s => jmsHeaderPrefix(s) + "Type"

val replyToQueueName : String = "replyTo"
}
Expand Down Expand Up @@ -90,7 +91,8 @@ object JmsFlowSupport extends JmsEnvelopeHeader {
srcDestHeader(prefix) -> dest,
priorityHeader(prefix) -> msg.getJMSPriority(),
deliveryModeHeader(prefix) -> delMode,
timestampHeader(prefix) -> msg.getJMSTimestamp()
timestampHeader(prefix) -> msg.getJMSTimestamp(),
typeHeader(prefix) -> msg.getJMSType()
).get

val expireHeaderMap : Map[String, MsgProperty] = msg.getJMSExpiration() match {
Expand Down
Expand Up @@ -86,6 +86,8 @@ import blended.streams.message.FlowMessage.FlowMessageProps
sealed abstract class FlowMessage(msgHeader : FlowMessageProps) {

def body() : Any
def clearBody() : FlowMessage = this

def header : FlowMessageProps = msgHeader

def bodySize() : Int
Expand Down Expand Up @@ -175,7 +177,7 @@ sealed abstract class FlowMessage(msgHeader : FlowMessageProps) {

def withHeader(key : String, value : Any, overwrite : Boolean = true) : Try[FlowMessage]

protected def doRemoveHeader(keys : String*) : FlowMessageProps = header.filter(k => !keys.contains(k))
protected def doRemoveHeader(keys : String*) : FlowMessageProps = header.filterKeys(k => !keys.contains(k))

protected def newHeader(key : String, value : Any, overwrite : Boolean) : Try[FlowMessageProps] = Try {
if (overwrite) {
Expand Down Expand Up @@ -223,13 +225,17 @@ case class BinaryFlowMessage(content : ByteString, override val header : FlowMes
}

override def removeHeader(keys : String*) : FlowMessage = copy(header = doRemoveHeader(keys : _*))

override def clearBody() : FlowMessage = BinaryFlowMessage(Array.empty[Byte], header)
}

case class TextFlowMessage(content : String, override val header : FlowMessageProps) extends FlowMessage(header) {

private val textContent : Option[String] = Option(content)

override def body() : Any = textContent.getOrElse(null)
// scalastyle:off null
override def body() : Any = textContent.orNull
// scalastyle:on null

def getText() : String = textContent.getOrElse("")

Expand All @@ -240,4 +246,8 @@ case class TextFlowMessage(content : String, override val header : FlowMessagePr
}

override def removeHeader(keys : String*) : FlowMessage = copy(header = doRemoveHeader(keys : _*))

// scalastyle:off null
override def clearBody() : FlowMessage = TextFlowMessage("", header)
// scalastyle:on null
}
Expand Up @@ -19,7 +19,7 @@ object Artifact extends ((String, Option[String], Option[String]) => Artifact) {
// scalastyle:off null
fileName : String = null,
sha1Sum : String = null
// scalastyle:on null
// scalastyle:on null
) : Artifact = {
Artifact(
url = url,
Expand Down
Expand Up @@ -16,7 +16,7 @@ class ArtifactDownloader(mvnRepositories : List[String])

private[this] val log = Logger[ArtifactDownloader]

private def fileIssue(file : File, artifact: Artifact) : Option[String] = {
private def fileIssue(file : File, artifact : Artifact) : Option[String] = {
if (!file.exists()) {
Some(s"File does not exist: $file")
} else {
Expand Down

0 comments on commit aebcc4e

Please sign in to comment.