Skip to content

Commit

Permalink
Improved assert condition
Browse files Browse the repository at this point in the history
  • Loading branch information
lefou committed Dec 5, 2018
1 parent 41cda73 commit 1b1e7b6
Showing 1 changed file with 24 additions and 20 deletions.
Expand Up @@ -12,11 +12,12 @@ import akka.util.Timeout
import blended.jms.utils.{BlendedJMSConnectionConfig, JMSSupport}
import javax.jms.{Connection, MessageProducer, Session}
import org.scalatest.FreeSpecLike

import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.util.Try

import blended.testsupport.scalatest.LoggingFreeSpecLike

case class PingExecute(
count: Long,
con: Connection,
Expand All @@ -37,41 +38,41 @@ class PingExecutor extends Actor {
context.become(executing(sender()))
}

def executing(requestor: ActorRef) : Receive = {
case Terminated(_) => context.stop(self)
def executing(requestor: ActorRef): Receive = {
case Terminated(_) => context.stop(self)
case m => requestor ! m
}
}

abstract class JMSPingPerformerSpec extends TestKit(ActorSystem("JMSPingPerformer"))
with FreeSpecLike
with LoggingFreeSpecLike
with ImplicitSender
with JMSSupport {

private[this] val counter = new AtomicLong(0)
private[this] implicit val materializer = ActorMaterializer()

val pingQueue : String
val pingTopic : String
val pingQueue: String
val pingTopic: String

val cfg : BlendedJMSConnectionConfig
var con : Option[Connection]
val cfg: BlendedJMSConnectionConfig
var con: Option[Connection]

val bulkCount : Int = 100000
val bulkCount: Int = 100000
val bulkTimeout = Math.max(1, bulkCount / 50000).minutes

private[this] implicit val eCtxt : ExecutionContext = system.dispatchers.lookup("FixedPool")
private[this] implicit val eCtxt: ExecutionContext = system.dispatchers.lookup("FixedPool")

private[this] def execPing(exec : PingExecute)(implicit to: Timeout) : Future[PingResult] = {
private[this] def execPing(exec: PingExecute)(implicit to: Timeout): Future[PingResult] = {
(system.actorOf(Props[PingExecutor]) ? exec).mapTo[PingResult]
}

private[this] val pingSuccess : PartialFunction[Any, Boolean] = {
private[this] val pingSuccess: PartialFunction[Any, Boolean] = {
case PingSuccess(_) => true
case _ => false
}

private[this] val pingFailed : PartialFunction[Any, Boolean] = {
private[this] val pingFailed: PartialFunction[Any, Boolean] = {
case PingFailed(_) => true
case _ => false
}
Expand Down Expand Up @@ -149,15 +150,15 @@ abstract class JMSPingPerformerSpec extends TestKit(ActorSystem("JMSPingPerforme

"does not leak threads on successful pings" in {

val src = Source(1.to(bulkCount)).map { i : Int =>
val src = Source(1.to(bulkCount)).map { i: Int =>
execPing(PingExecute(
count = counter.incrementAndGet(),
con = con.get,
cfg = cfg.copy(clientId = "jmsPing", pingDestination = s"topic:$pingTopic")
))(3.seconds)
}

val result = src.mapAsync(10)(i => i).runFold(true)( (c,i) => c && i.isInstanceOf[PingSuccess])
val result = src.mapAsync(10)(i => i).runFold(true)((c, i) => c && i.isInstanceOf[PingSuccess])

assert(Await.result(result, bulkTimeout))
Thread.sleep(10000)
Expand All @@ -166,7 +167,7 @@ abstract class JMSPingPerformerSpec extends TestKit(ActorSystem("JMSPingPerforme

"does not leak threads on failed ping inits" in {

val src = Source(1.to(bulkCount)).map { i : Int =>
val src = Source(1.to(bulkCount)).map { i: Int =>
execPing(PingExecute(
count = counter.incrementAndGet(),
con = con.get,
Expand All @@ -175,7 +176,7 @@ abstract class JMSPingPerformerSpec extends TestKit(ActorSystem("JMSPingPerforme
))(10.seconds)
}

val result = src.mapAsync(10)(i => i).runFold(true)( (c,i) => c && i == PingTimeout)
val result = src.mapAsync(10)(i => i).runFold(true)((c, i) => c && i == PingTimeout)

assert(Await.result(result, bulkTimeout * 2))
Thread.sleep(10000)
Expand All @@ -184,7 +185,7 @@ abstract class JMSPingPerformerSpec extends TestKit(ActorSystem("JMSPingPerforme

"does not leak threads on failed ping probes" in {

val src = Source(1.to(bulkCount)).map { i : Int =>
val src = Source(1.to(bulkCount)).map { i: Int =>
execPing(PingExecute(
count = counter.incrementAndGet(),
con = con.get,
Expand All @@ -193,9 +194,12 @@ abstract class JMSPingPerformerSpec extends TestKit(ActorSystem("JMSPingPerforme
))(3.seconds)
}

val result = src.mapAsync(10)(i => i).runFold(true)( (c,i) => c && i.isInstanceOf[PingFailed])
val result: Future[(Int, Int)] = src.mapAsync(10)(i => i).runFold((0, 0)) {
case ((otherCount, failedCount), i: PingFailed) => (otherCount, failedCount + 1)
case ((otherCount, failedCount), i) => (otherCount + 1, failedCount)
}

assert(Await.result(result, bulkTimeout))
assert(Await.result(result, bulkTimeout) === (0, bulkCount))
Thread.sleep(10000)
assert(threadCount() <= 100)
}
Expand Down

0 comments on commit 1b1e7b6

Please sign in to comment.