Skip to content
This repository has been archived by the owner on May 31, 2024. It is now read-only.

Commit

Permalink
Fix more timing issues for unstable tests, add more events to detect …
Browse files Browse the repository at this point in the history
…test is finished (avoid using the extensive timeouts if possible)
  • Loading branch information
atooni committed May 20, 2020
1 parent aea13c3 commit 4a73819
Show file tree
Hide file tree
Showing 26 changed files with 259 additions and 97 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ jobs:
java-version: ${{ matrix.java-version }}

- name: Run tests
run: ./millw -i -k blended[${{ matrix.scala-version }}].__.testCached
run: ./millw -i -j 0 -k blended[${{ matrix.scala-version }}].__.testCached

- name: Upload test logs
uses: actions/upload-artifact@v2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,16 @@ import akka.testkit.{TestActorRef, TestProbe}
import blended.itestsupport.condition.{Condition, ConditionActor}
import blended.itestsupport.jolokia.JolokiaAvailableCondition
import blended.testsupport.TestActorSys
import org.scalatest.{Matchers, WordSpec}

import scala.concurrent.duration._
import blended.itestsupport.condition.ConditionActor.CheckCondition
import blended.itestsupport.condition.ConditionActor.ConditionCheckResult
import blended.jolokia.{JolokiaAddress, JolokiaClient}
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec
import blended.testsupport.BlendedTestSupport.freePort

class JolokiaConditionSpec extends WordSpec
class JolokiaConditionSpec extends AnyWordSpec
with Matchers {

"The JolokiaAvailableCondition" should {
Expand All @@ -22,7 +24,7 @@ class JolokiaConditionSpec extends WordSpec

val t = 10.seconds

val client : JolokiaClient = new JolokiaClient(JolokiaAddress("http://localhost:7777/jolokia"))
val client : JolokiaClient = new JolokiaClient(JolokiaAddress(System.getProperty("jolokia.agent")))
val condition = JolokiaAvailableCondition(client, Some(t))

val checker = TestActorRef(ConditionActor.props(cond = condition))
Expand All @@ -37,7 +39,7 @@ class JolokiaConditionSpec extends WordSpec

val t = 5.seconds

val client : JolokiaClient = new JolokiaClient(JolokiaAddress("http://localhost:8888/jolokia"))
val client : JolokiaClient = new JolokiaClient(JolokiaAddress(s"http://localhost:$freePort/jolokia"))
val condition = JolokiaAvailableCondition(client, Some(t))

val checker = TestActorRef(ConditionActor.props(cond = condition))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import blended.itestsupport.condition.ConditionActor.CheckCondition
import blended.itestsupport.condition.ConditionActor.ConditionCheckResult
import blended.testsupport.TestActorSys
import blended.testsupport.scalatest.LoggingFreeSpec
import blended.testsupport.BlendedTestSupport.freePort

class HttpAvailableConditionSpec extends LoggingFreeSpec with ScalatestRouteTest {

Expand All @@ -24,7 +25,7 @@ class HttpAvailableConditionSpec extends LoggingFreeSpec with ScalatestRouteTest

val t = 5.seconds

val condition = HttpAvailableCondition("http://localhost:8888/nonExisting", Some(t))
val condition = HttpAvailableCondition(s"http://localhost:$freePort/nonExisting", Some(t))

val checker = TestActorRef(ConditionActor.props(cond = condition))
checker.tell(CheckCondition, probe.ref)
Expand All @@ -36,17 +37,16 @@ class HttpAvailableConditionSpec extends LoggingFreeSpec with ScalatestRouteTest
implicit val system = testkit.system
val probe = TestProbe()

val localPort = 9999
val route = get {
path("hello") {
complete("Hello")
}
}

TestServer.withServer(localPort, route) { () =>
TestServer.withServer(route) { port =>
val t = 10.seconds

val condition = HttpAvailableCondition(s"http://localhost:${localPort}/hello", Some(t))
val condition = HttpAvailableCondition(s"http://localhost:${port}/hello", Some(t))

val checker = TestActorRef(ConditionActor.props(cond = condition))
checker.tell(CheckCondition, probe.ref)
Expand All @@ -59,22 +59,21 @@ class HttpAvailableConditionSpec extends LoggingFreeSpec with ScalatestRouteTest
implicit val system = testkit.system
val probe = TestProbe()

val localPort = 9999
val route = get {
path("hello") {
complete("Hello")
}
}

TestServer.withServer(localPort, route) { () =>
val t = 10.seconds
TestServer.withServer(route) { port =>
val t = 5.seconds

val condition = HttpAvailableCondition(s"http://localhost:${localPort}/missing", Some(t))
val condition = HttpAvailableCondition(s"http://localhost:${port}/missing", Some(t))

val checker = TestActorRef(ConditionActor.props(cond = condition))
checker.tell(CheckCondition, probe.ref)

probe.expectMsg(t, ConditionCheckResult(List.empty[Condition], List.empty[Condition]))
probe.expectMsg(t + 1.second, ConditionCheckResult(List.empty[Condition], List(condition)))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,24 +13,21 @@ object TestServer {

private[this] val log = Logger[TestServer.type]

def withServer(
port: Int,
route: Route
)(
f: => Unit
def withServer(route: Route)(
f : Int => Unit
)(
implicit
actorSystem: ActorSystem,
actorMaterializer: ActorMaterializer
): Unit = {

val serverFut = Http().bindAndHandle(route, "localhost", port)
val serverFut = Http().bindAndHandle(route, "localhost", 0)
val server = Await.result(serverFut, 10.seconds)
try {
log.info(s"Started test HTTP server on localhost:$port")
f
log.info(s"Started test HTTP server on ${server.localAddress}")
f(server.localAddress.getPort)
} finally {
log.info(s"Stopping test HTTP server on localhost:$port")
log.info(s"Stopping test HTTP server on ${server.localAddress}")
Await.result(server.unbind(), 10.seconds)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ abstract class BridgeSpecSupport extends SimplePojoContainerSpec
with ScalaCheckPropertyChecks
with JmsConnectionHelper {

override def timeout: FiniteDuration = 5.seconds
private implicit val to : FiniteDuration = timeout

protected val log : Logger = Logger(getClass().getName())
Expand Down Expand Up @@ -65,6 +64,16 @@ abstract class BridgeSpecSupport extends SimplePojoContainerSpec
protected def consumeMessages(
cf: IdAwareConnectionFactory,
destName : String,
expected : Int,
timeout : FiniteDuration
)(implicit system : ActorSystem) : Try[List[FlowEnvelope]] = consumeMessages(
cf = cf, destName = destName, completeOn = Some(l => l.size == expected), timeout = timeout
)

protected def consumeMessages(
cf: IdAwareConnectionFactory,
destName : String,
completeOn : Option[Seq[FlowEnvelope] => Boolean] = None,
timeout : FiniteDuration
)(implicit system : ActorSystem) : Try[List[FlowEnvelope]] = Try {

Expand All @@ -73,6 +82,7 @@ abstract class BridgeSpecSupport extends SimplePojoContainerSpec
cf = cf,
dest = JmsDestination.create(destName).get,
log = envLogger(log),
completeOn = completeOn,
timeout = Some(timeout)
)
Await.result(coll.result, timeout + 100.millis)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,11 @@ class InboundBridgeTrackedSpec extends BridgeSpecSupport {
val switch = sendInbound(external, msgCount)

val messages : List[FlowEnvelope] =
consumeMessages(internal, "bridge.data.in.activemq.external", timeout)(actorSys).get
consumeMessages(
cf = internal,
destName = "bridge.data.in.activemq.external",
timeout = timeout
)(actorSys).get

messages should have size(msgCount)

Expand Down Expand Up @@ -68,7 +72,12 @@ class InboundBridgeTrackedSpec extends BridgeSpecSupport {

val switch : KillSwitch = sendMessages("SampleHeaderIn", external)(env)

val result : List[FlowEnvelope] = consumeMessages(internal, "bridge.data.in.activemq.external", 5.seconds)(actorSys).get
val result : List[FlowEnvelope] = consumeMessages(
cf = internal,
destName = "bridge.data.in.activemq.external",
expected = 1,
timeout = timeout
)(actorSys).get

result should have size 1
result.head.header[String]("ResourceType") should be (Some(desc))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,12 @@ class InboundBridgeUntrackedSpec extends BridgeSpecSupport {
val switch = sendInbound(external, msgCount)

val messages : List[FlowEnvelope] =
consumeMessages(internal, "bridge.data.in.activemq.external", timeout)(actorSys).get
consumeMessages(
cf = internal,
destName = "bridge.data.in.activemq.external",
expected = msgCount,
timeout = timeout
)(actorSys).get

messages should have size msgCount

Expand All @@ -60,11 +65,16 @@ class InboundBridgeUntrackedSpec extends BridgeSpecSupport {
val switch : KillSwitch = sendMessages("sampleIn", external)(msgs:_*)

val messages : List[FlowEnvelope] =
consumeMessages(internal, "bridge.data.in.activemq.external", timeout)(actorSys).get
consumeMessages(
cf = internal,
destName = "bridge.data.in.activemq.external",
expected = msgs.size,
timeout = timeout
)(actorSys).get

messages should have size msgs.size

consumeEvents(internal, timeout)(actorSys).get should be (empty)
consumeEvents(cf = internal, timeout = timeout)(actorSys).get should be (empty)

switch.shutdown()
}
Expand All @@ -81,7 +91,12 @@ class InboundBridgeUntrackedSpec extends BridgeSpecSupport {
val switch : KillSwitch = sendMessages("sampleIn", external)(msgs:_*)

val messages : List[FlowEnvelope] =
consumeMessages(internal, "bridge.data.in.activemq.external", timeout)(actorSys).get
consumeMessages(
cf = internal,
destName = "bridge.data.in.activemq.external",
expected = msgs.size,
timeout = timeout
)(actorSys).get

messages should have size msgs.size

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,15 @@ class InboundRejectBridgeSpec extends BridgeSpecSupport {

val switch = sendInbound(external, msgCount)

consumeMessages(internal, "bridge.data.in.activemq.external", timeout)(actorSys).get should be (empty)
consumeMessages(cf = internal, destName = "bridge.data.in.activemq.external", timeout = timeout)(actorSys).get should be (empty)
consumeEvents(internal, timeout)(actorSys).get should be (empty)

consumeMessages(external, "sampleIn", timeout)(actorSys).get should have size(msgCount)
consumeMessages(
cf = external,
destName = "sampleIn",
expected = msgCount,
timeout = timeout
)(actorSys).get should have size(msgCount)

switch.shutdown()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,12 @@ class MapToExternalBridgeSpec extends BridgeSpecSupport {
val switch : KillSwitch = sendMessages("bridge.data.out", internal)(msgs:_*)

val messages : List[FlowEnvelope] =
consumeMessages(external, "sampleOut", timeout)(actorSystem).get
consumeMessages(
cf = external,
destName = "sampleOut",
expected = msgCount,
timeout = timeout
)(actorSystem).get

messages should have size(msgCount)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,12 @@ class OutboundBridgeSpec extends BridgeSpecSupport {
val switch = sendOutbound(internal, msgCount, track = false)

val messages : List[FlowEnvelope] =
consumeMessages(external, "sampleOut", timeout)(actorSys).get
consumeMessages(
cf = external,
destName = "sampleOut",
expected = msgCount,
timeout = timeout
)(actorSys).get

messages should have size(msgCount)

Expand All @@ -57,7 +62,12 @@ class OutboundBridgeSpec extends BridgeSpecSupport {
val switch = sendOutbound(internal, msgCount, true)

val messages : List[FlowEnvelope] =
consumeMessages(external, "sampleOut", timeout)(actorSys).get
consumeMessages(
cf = external,
destName = "sampleOut",
expected = msgCount,
timeout = timeout
)(actorSys).get

messages should have size(msgCount)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,12 +90,24 @@ class RouteAfterRetrySpec extends BridgeSpecSupport {
}
println()

val retried : List[FlowEnvelope] = consumeMessages(internal, "retries", timeout)(actorSys).get
val retried : List[FlowEnvelope] = consumeMessages(
cf = internal,
destName = "retries",
timeout = timeout
)(actorSys).get
retried should be (empty)

consumeEvents(internal, timeout)(actorSys).get should not be empty

consumeMessages(external, "sampleOut", timeout)(actorSys).get should have size(msgCount)
val messages : List[FlowEnvelope] =
consumeMessages(
cf = external,
destName = "sampleOut",
expected = msgCount,
timeout = timeout
)(actorSys).get

messages should have size(msgCount)

switch.shutdown()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,11 @@ class SendFailedRejectBridgeSpec extends BridgeSpecSupport {

val switch = sendOutbound(internal, msgCount, true)

val retried : List[FlowEnvelope] = consumeMessages(internal, "retries", timeout)(actorSys).get
val retried : List[FlowEnvelope] = consumeMessages(
cf = internal,
destName = "retries",
timeout = timeout
)(actorSys).get
retried should be (empty)

consumeEvents(internal, timeout)(actorSys).get should be (empty)
Expand All @@ -62,7 +66,12 @@ class SendFailedRejectBridgeSpec extends BridgeSpecSupport {
env.header[Unit]("UnitProperty") should be (Some(()))
}

consumeMessages(internal, "bridge.data.out.activemq.external", timeout)(actorSys).get should have size(msgCount)
consumeMessages(
cf = internal,
destName = "bridge.data.out.activemq.external",
expected = msgCount,
timeout = timeout
)(actorSys).get should have size(msgCount)

switch.shutdown()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,19 @@ class SendFailedRetryBridgeSpec extends BridgeSpecSupport {
"The outbound bridge should " - {

"pass the message to the retry destination and not generate a transaction event if the forwarding of the message fails" in {
val timeout : FiniteDuration = 1.second
val msgCount = 2

val actorSys = system(registry)
val (internal, external) = getConnectionFactories(registry)

val switch = sendOutbound(internal, msgCount, true)

val retried : List[FlowEnvelope] = consumeMessages(internal, "retries", timeout)(actorSys).get
val retried : List[FlowEnvelope] = consumeMessages(
cf = internal,
destName = "retries",
expected = msgCount,
timeout = timeout
)(actorSys).get

retried should have size(msgCount)

Expand All @@ -60,7 +64,7 @@ class SendFailedRetryBridgeSpec extends BridgeSpecSupport {
}

consumeEvents(internal, timeout)(actorSys).get should be (empty)
consumeMessages(external, "sampleOut", timeout)(actorSys).get should be (empty)
consumeMessages(cf = external, destName = "sampleOut", timeout = timeout)(actorSys).get should be (empty)

switch.shutdown()
}
Expand Down
Loading

0 comments on commit 4a73819

Please sign in to comment.