Skip to content

Commit

Permalink
Finalizing 3.0-M2
Browse files Browse the repository at this point in the history
  • Loading branch information
atooni committed Jan 6, 2019
1 parent 3d9dc30 commit 705bd81
Show file tree
Hide file tree
Showing 17 changed files with 98 additions and 37 deletions.
Expand Up @@ -4,6 +4,10 @@ import blended.jms.utils.IdAwareConnectionFactory

import scala.concurrent.{ExecutionContext, Future}

trait ConnectionVerifierFactory {
def createConnectionVerifier() : ConnectionVerifier
}

trait ConnectionVerifier {
def verifyConnection(cf : IdAwareConnectionFactory)(implicit eCtxt: ExecutionContext) : Future[Boolean]
}
Expand Down
@@ -1,17 +1,16 @@
package blended.activemq.client.internal
package blended.activemq.client

import java.util.UUID

import akka.actor.ActorSystem
import akka.pattern.after
import akka.stream.{ActorMaterializer, Materializer}
import blended.activemq.client.ConnectionVerifier
import blended.jms.utils.{IdAwareConnectionFactory, JmsDestination}
import blended.streams.jms.{JmsEnvelopeHeader, JmsProducerSettings, JmsStreamSupport, MessageDestinationResolver}
import blended.streams.message.FlowEnvelope
import blended.streams.processor.Collector
import blended.streams.transaction.FlowHeaderConfig
import blended.util.logging.Logger
import akka.pattern.after
import blended.streams.processor.Collector

import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future, Promise}
Expand All @@ -33,7 +32,12 @@ class RoundtripConnectionVerifier(
private val verified : Promise[Boolean] = Promise[Boolean]()

override def verifyConnection(cf: IdAwareConnectionFactory)(implicit eCtxt: ExecutionContext): Future[Boolean] = {
probe(cf)
after[Unit](10.millis, system. scheduler) {
Future {
probe(cf)
}
}

verified.future
}

Expand All @@ -54,6 +58,8 @@ class RoundtripConnectionVerifier(
destinationResolver = s => new MessageDestinationResolver(headerConfig, s)
)

log.info(s"Running verification probe for connection [${cf.vendor}:${cf.provider}]")

sendMessages(pSettings, log, probeEnv) match {
case Success(s) =>
log.info(s"Request message sent successfully to [${requestDest.asString}]")
Expand Down
@@ -1,7 +1,7 @@
package blended.activemq.client.internal

import akka.actor.ActorSystem
import blended.activemq.client.{ConnectionVerifier, VerificationFailedHandler}
import blended.activemq.client.{ConnectionVerifierFactory, VerificationFailedHandler}
import blended.akka.ActorSystemWatching
import blended.jms.utils._
import blended.util.config.Implicits._
Expand All @@ -12,7 +12,7 @@ import domino.logging.Logging
import javax.jms.ConnectionFactory
import org.apache.activemq.ActiveMQConnectionFactory

import scala.concurrent.ExecutionContext
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

class AmqClientActivator extends DominoActivator with ActorSystemWatching with Logging {
Expand All @@ -25,15 +25,18 @@ class AmqClientActivator extends DominoActivator with ActorSystemWatching with L
implicit val eCtxt : ExecutionContext = osgiCfg.system.dispatcher

// First we register a default verifier
new DefaultConnectionVerifier().providesService[ConnectionVerifier]("name" -> "default")
new DefaultVerificationFailedHandler(osgiCfg.bundleContext).providesService[VerificationFailedHandler]("name" -> "default")
new DefaultConnectionVerifierFactory()
.providesService[ConnectionVerifierFactory]("name" -> "default")

new DefaultVerificationFailedHandler(osgiCfg.bundleContext)
.providesService[VerificationFailedHandler]("name" -> "default")

val verifierName = osgiCfg.config.getString("verifier")
val failedHandlerName = osgiCfg.config.getString("failedHandler")
log.info(s"ActiveMQ Client connections using verifier [$verifierName]")
log.info(s"Using verification failed handler [$failedHandlerName]")

whenAdvancedServicePresent[ConnectionVerifier](s"(name=$verifierName)") { verifier =>
whenAdvancedServicePresent[ConnectionVerifierFactory](s"(name=$verifierName)") { verifierFactory =>
whenAdvancedServicePresent[VerificationFailedHandler](s"(name=$failedHandlerName)") { failedHandler =>

val cfg: Config = osgiCfg.config
Expand All @@ -58,7 +61,9 @@ class AmqClientActivator extends DominoActivator with ActorSystemWatching with L
connectionCfg, Some(osgiCfg.bundleContext)
)

verifier.verifyConnection(cf).onComplete {
val verified : Future[Boolean] = verifierFactory.createConnectionVerifier().verifyConnection(cf)

verified.onComplete {
case Success(b) => if (b) {
log.info(s"Connection [${cf.vendor}:${cf.provider}] verified and ready to use.")
cf.providesService[ConnectionFactory, IdAwareConnectionFactory](
Expand Down
@@ -1,10 +1,14 @@
package blended.activemq.client.internal

import blended.activemq.client.ConnectionVerifier
import blended.activemq.client.{ConnectionVerifier, ConnectionVerifierFactory}
import blended.jms.utils.IdAwareConnectionFactory

import scala.concurrent.{ExecutionContext, Future}

class DefaultConnectionVerifierFactory extends ConnectionVerifierFactory {
override def createConnectionVerifier(): ConnectionVerifier = new DefaultConnectionVerifier()
}

class DefaultConnectionVerifier extends ConnectionVerifier {
override def verifyConnection(cf: IdAwareConnectionFactory)(implicit eCtxt : ExecutionContext): Future[Boolean] = Future { true }
}
Expand Up @@ -2,7 +2,7 @@ package blended.activemq.client.internal

import java.io.File

import blended.activemq.client.{ConnectionVerifier, VerificationFailedHandler}
import blended.activemq.client.{ConnectionVerifier, ConnectionVerifierFactory, VerificationFailedHandler}
import blended.akka.internal.BlendedAkkaActivator
import blended.container.context.api.ContainerIdentifierService
import blended.jms.utils.IdAwareConnectionFactory
Expand All @@ -27,8 +27,12 @@ class FailingClientActivatorSpec extends SimplePojoContainerSpec

private class FailingActivator extends DominoActivator {

private val failVerifier : ConnectionVerifier = new ConnectionVerifier {
override def verifyConnection(cf: IdAwareConnectionFactory)(implicit eCtxt: ExecutionContext): Future[Boolean] = Future { false }
private val failFactory : ConnectionVerifierFactory = new ConnectionVerifierFactory {

override def createConnectionVerifier(): ConnectionVerifier = new ConnectionVerifier {
override def verifyConnection(cf: IdAwareConnectionFactory)(implicit eCtxt: ExecutionContext): Future[Boolean] = Future { false }
}

}

private val failHandler : VerificationFailedHandler = new VerificationFailedHandler {
Expand All @@ -38,7 +42,7 @@ class FailingClientActivatorSpec extends SimplePojoContainerSpec
}

whenBundleActive {
failVerifier.providesService[ConnectionVerifier]("name" -> "failing")
failFactory.providesService[ConnectionVerifierFactory]("name" -> "failing")
failHandler.providesService[VerificationFailedHandler]("name" -> "failing")
}
}
Expand Down
@@ -1,7 +1,7 @@
package blended.activemq.client.internal

import akka.actor.ActorSystem
import blended.activemq.client.ConnectionVerifier
import blended.activemq.client.{ConnectionVerifier, RoundtripConnectionVerifier}
import blended.jms.utils.{IdAwareConnectionFactory, JmsQueue, SimpleIdAwareConnectionFactory}
import blended.streams.message.{FlowEnvelope, FlowMessage}
import blended.streams.transaction.FlowHeaderConfig
Expand Down Expand Up @@ -76,5 +76,27 @@ class RoundtripConnectionVerifierSpec extends LoggingFreeSpec
val f = verifier.verifyConnection(cf)
assert(!Await.result(f, 5.seconds))
}

"stay unresolve if the connection to the broker did not succeed" in {

val ucf : IdAwareConnectionFactory = SimpleIdAwareConnectionFactory(
vendor = "amq",
provider = "unresolved",
clientId = "spec",
cf = new ActiveMQConnectionFactory("vm://unresolved?create=false")
)

val verifier : ConnectionVerifier = new RoundtripConnectionVerifier(
probeMsg = () => FlowEnvelope(FlowMessage("Hello Broker")(FlowMessage.noProps)),
verify = env => false,
requestDest = JmsQueue("roundtrip"),
responseDest = JmsQueue("roundtrip"),
headerConfig = FlowHeaderConfig(prefix = "App")
)

val f = verifier.verifyConnection(ucf)
Thread.sleep(5.seconds.toMillis)
assert(!f.isCompleted)
}
}
}
4 changes: 2 additions & 2 deletions blended.file/src/main/scala/blended/file/FilePollActor.scala
Expand Up @@ -2,7 +2,7 @@ package blended.file

import java.io.{File, FilenameFilter}

import akka.actor.{Actor, ActorLogging, ActorRef, Props}
import akka.actor.{Actor, ActorRef, Props}
import akka.pattern.ask
import akka.util.Timeout
import blended.akka.SemaphoreActor.{Acquire, Acquired, Release, Waiting}
Expand Down Expand Up @@ -61,7 +61,7 @@ class FilePollActor(
}

if (!srcDir.exists() || !srcDir.isDirectory() || !srcDir.canRead()) {
log.info(s"Directory [$srcDir] for [${cfg.id}]does not exist or is not readable.")
log.info(s"Directory [$srcDir] for [${cfg.id}] does not exist or is not readable.")
List.empty
} else if (locked()) {
List.empty
Expand Down
Expand Up @@ -50,6 +50,7 @@ class FileProcessActor extends Actor with ActorLogging {
context.become(cleanUp(requestor, cmd, success = true))

case Failure(e) =>
log.warning(s"Failed to process file [${cmd.f.getAbsolutePath()}] : [${e.getMessage()}]")
context.actorOf(Props[FileManipulationActor]).tell(RenameFile(tempFile, cmd.f), self)
context.become(cleanUp(requestor, cmd, success = false))
}
Expand Down
Expand Up @@ -24,11 +24,11 @@ class JMSFilePollHandler(
val body : ByteString = ByteString(Source.fromFile(file).mkString)

FlowEnvelope(FlowMessage(body)(header))
.withHeader("BlendedFileName", file.getName()).get
.withHeader("BlendedFilePath", file.getAbsolutePath()).get
.withHeader("BlendedFileName", cmd.f.getName()).get
.withHeader("BlendedFilePath", cmd.f.getAbsolutePath()).get
}

override def processFile(cmd: FileProcessCmd, f: File)(implicit system: ActorSystem): Try[Unit] = Try {
override def processFile(cmd: FileProcessCmd, f : File)(implicit system: ActorSystem): Try[Unit] = Try {

implicit val materializer : Materializer = ActorMaterializer()
implicit val eCtxt : ExecutionContext = system.dispatcher
Expand Down
@@ -1,6 +1,5 @@
package blended.file
import java.io.File
import java.util.concurrent.atomic.AtomicInteger

import akka.actor.ActorSystem

Expand All @@ -14,9 +13,9 @@ class FailingFileHandler extends FilePollHandler {

class SucceedingFileHandler extends FilePollHandler {

val count : AtomicInteger = new AtomicInteger(0)
var handled : List[FileProcessCmd] = List.empty

override def processFile(cmd: FileProcessCmd, f: File)(implicit system: ActorSystem): Try[Unit] = Try {
count.incrementAndGet()
handled = cmd :: handled
}
}
8 changes: 5 additions & 3 deletions blended.file/src/test/scala/blended/file/FilePollSpec.scala
Expand Up @@ -56,7 +56,9 @@ class FilePollSpec extends LoggingFreeSpec with Matchers {

probe.expectMsgType[FileProcessed]
srcFile.exists() should be (false)
handler.count.get should be (1)
handler.handled should have size(1)

handler.handled.head.f.getName() should be ("test.txt")
}

"The File Poller should" - {
Expand Down Expand Up @@ -99,8 +101,8 @@ class FilePollSpec extends LoggingFreeSpec with Matchers {

files.forall{ f => (f.getName().endsWith("txt") && !f.exists()) || (!f.getName().endsWith("txt") && f.exists()) } should be (true)

handler.count.get() should be (3)

handler.handled should have size(3)
handler.handled.head.f.getName() should be ("test2.txt")
}

"block the message processing if specified lock file exists (relative)" in TestActorSys { testkit =>
Expand Down
Expand Up @@ -34,7 +34,7 @@ class FileProcessActorSpec extends FreeSpec with Matchers {
probe.expectMsg(FileProcessed(cmd, success = true))
evtProbe.expectMsg(FileProcessed(cmd, success = true))

handler.count.get() should be (1)
handler.handled should have size(1)
srcFile.exists() should be (false)
}

Expand Down Expand Up @@ -75,7 +75,7 @@ class FileProcessActorSpec extends FreeSpec with Matchers {
}
}) should have size (1 + oldArchiveDirSize)

handler.count.get() should be (1)
handler.handled should have size(1)
srcFile.exists() should be (false)
}

Expand Down
Expand Up @@ -6,6 +6,7 @@ import akka.actor.ActorSystem
import akka.pattern.after
import akka.stream.stage.{AsyncCallback, GraphStageLogic}
import blended.jms.utils.{IdAwareConnectionFactory, JmsSession}
import blended.util.logging.Logger
import javax.jms._

import scala.concurrent.{ExecutionContext, Future, TimeoutException}
Expand All @@ -26,6 +27,8 @@ object JmsConnector {

trait JmsConnector[S <: JmsSession] { this: GraphStageLogic =>

private[this] val logger : Logger = Logger[JmsConnector.type]

implicit protected var ec : ExecutionContext = _
implicit protected var system : ActorSystem = _

Expand All @@ -44,6 +47,7 @@ trait JmsConnector[S <: JmsSession] { this: GraphStageLogic =>
}}

protected val fail: AsyncCallback[Throwable] = getAsyncCallback[Throwable]{e =>
logger.warn(s"Failing stage [$id]")
failStage(e)
}

Expand Down
Expand Up @@ -35,7 +35,8 @@ class JmsSinkStage(
) with JmsConnector[JmsProducerSession] {

override private[jms] val handleError = getAsyncCallback[Throwable]{ ex =>
failStage(ex)
log.warn(s"Failing stage [$name]")
failStage(ex)
}

private[this] val rnd = new Random()
Expand Down
Expand Up @@ -2,7 +2,7 @@ package blended.streams.jms

import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}

import akka.NotUsed
import akka.{Done, NotUsed}
import akka.actor.{ActorRef, ActorSystem}
import akka.stream._
import akka.stream.scaladsl.{Flow, Keep, RestartSource, Sink, Source}
Expand Down Expand Up @@ -38,10 +38,11 @@ trait JmsStreamSupport {
val hasException : AtomicBoolean = new AtomicBoolean(false)
val sendCount : AtomicInteger = new AtomicInteger(0)

val ((actor : ActorRef, killswitch : KillSwitch), errEnv: Future[Option[FlowEnvelope]]) =
val (((actor : ActorRef, killswitch : KillSwitch), done: Future[Done]), errEnv: Future[Option[FlowEnvelope]]) =
Source.actorRef[FlowEnvelope](msgs.size, OverflowStrategy.fail)
.viaMat(processFlow)(Keep.left)
.viaMat(KillSwitches.single)(Keep.both)
.watchTermination()(Keep.both)
.via(Flow.fromFunction[FlowEnvelope, FlowEnvelope]{env =>
if (env.exception.isDefined) {
env.exception.foreach { t =>
Expand All @@ -64,9 +65,17 @@ trait JmsStreamSupport {
// any have thrown an exception causing the stream to fail
do {
Thread.sleep(10)

if (hasException.get()) {
Await.result(errEnv, 1.second).flatMap(_.exception).foreach(t => throw t)
}

// if the stream has is finished before sending off all the messages, something went wrong.
// TODO: This looks strange
if (done.isCompleted) {
throw new Exception("Failed to create flow.")
}

} while(!hasException.get && sendCount.get < msgs.size)

killswitch
Expand Down
Expand Up @@ -4,13 +4,13 @@ import java.io._
import java.net.URL
import java.nio.file.{Files, Paths, StandardCopyOption}
import java.security.{DigestInputStream, MessageDigest}
import java.util.{Formatter, Properties}
import java.util.Formatter

import blended.updater.config.util.ConfigPropertyMapConverter
import com.typesafe.config.{Config, ConfigFactory, ConfigParseOptions, ConfigValue}

import scala.collection.JavaConverters.{asScalaBufferConverter, asScalaSetConverter, mapAsJavaMapConverter, seqAsJavaListConverter}
import scala.util.{Success, Try}
import scala.collection.JavaConverters.{asScalaBufferConverter, mapAsJavaMapConverter, seqAsJavaListConverter}
import scala.util.Try
import scala.util.control.NonFatal

object RuntimeConfigCompanion {
Expand Down
2 changes: 1 addition & 1 deletion project/BlendedUtil.scala
Expand Up @@ -14,7 +14,7 @@ object BlendedUtil extends ProjectFactory {
Dependencies.logbackCore % "test"
),
adaptBundle = b => b.copy(
exportPackage = Seq(b.bundleSymbolicName, s"${b.bundleSymbolicName}.protocol", s"${b.bundleSymbolicName}.config")
exportPackage = Seq(b.bundleSymbolicName, s"${b.bundleSymbolicName}.config")
)
)

Expand Down

0 comments on commit 705bd81

Please sign in to comment.