diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiDcSplitBrainSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiDcSplitBrainSpec.scala
index ac358c12d20a..c7b11a5c6286 100644
--- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiDcSplitBrainSpec.scala
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiDcSplitBrainSpec.scala
@@ -25,6 +25,7 @@ object MultiDcSplitBrainMultiJvmSpec extends MultiNodeConfig {
     """
       akka.loglevel = DEBUG
       akka.cluster.debug.verbose-heartbeat-logging = on
+      akka.cluster.debug.verbose-gossip-logging = on
       akka.remote.netty.tcp.connection-timeout = 5 s # speedup in case of connection issue
      akka.remote.retry-gate-closed-for = 1 s
      akka.cluster.multi-data-center {
@@ -34,7 +35,7 @@ object MultiDcSplitBrainMultiJvmSpec extends MultiNodeConfig {
        }
      }
      akka.cluster {
-        gossip-interval = 1s
+        gossip-interval = 500ms
        leader-actions-interval = 1s
        auto-down-unreachable-after = 1s
      }
@@ -69,8 +70,8 @@ abstract class MultiDcSplitBrainSpec
   val dc2 = List(third, fourth, fifth)
   var barrierCounter = 0
 
-  def splitDataCenters(notMembers: Set[RoleName]): Unit = {
-    val memberNodes = (dc1 ++ dc2).filterNot(notMembers)
+  def splitDataCenters(doNotVerify: Set[RoleName]): Unit = {
+    val memberNodes = (dc1 ++ dc2).filterNot(doNotVerify)
     val probe = TestProbe()
     runOn(memberNodes: _*) {
       cluster.subscribe(probe.ref, classOf[DataCenterReachabilityEvent])
@@ -145,7 +146,7 @@ abstract class MultiDcSplitBrainSpec
     "be able to have a data center member join while there is inter data center split" in within(20.seconds) {
 
       // introduce a split between data centers
-      splitDataCenters(notMembers = Set(fourth, fifth))
+      splitDataCenters(doNotVerify = Set(fourth, fifth))
 
       runOn(fourth) {
         cluster.join(third)
@@ -172,8 +173,10 @@ abstract class MultiDcSplitBrainSpec
       enterBarrier("inter-data-center-split-1-done")
     }
 
+    // fifth is still not a member of the cluster
+
     "be able to have data center member leave while there is inter data center split" in within(20.seconds) {
-      splitDataCenters(notMembers = Set(fifth))
+      splitDataCenters(doNotVerify = Set(fifth))
 
       runOn(fourth) {
         cluster.leave(fourth)
@@ -192,6 +195,8 @@ abstract class MultiDcSplitBrainSpec
       enterBarrier("inter-data-center-split-2-done")
     }
 
+    // fourth has left the cluster, fifth is still not a member
+
     "be able to have data center member restart (same host:port) while there is inter data center split" in within(60.seconds) {
       val subscribeProbe = TestProbe()
       runOn(first, second, third, fifth) {
@@ -199,6 +204,7 @@ abstract class MultiDcSplitBrainSpec
         subscribeProbe.expectMsgType[CurrentClusterState]
       }
       enterBarrier("subscribed")
+
       runOn(fifth) {
         Cluster(system).join(third)
       }
@@ -211,16 +217,18 @@ abstract class MultiDcSplitBrainSpec
       }
       enterBarrier("fifth-joined")
 
-      splitDataCenters(notMembers = Set(fourth))
+      splitDataCenters(doNotVerify = Set(fourth))
 
       runOn(fifth) {
         Cluster(system).shutdown()
       }
+
       runOn(third) {
         awaitAssert(clusterView.members.collect {
           case m if m.dataCenter == "dc2" ⇒ m.address
         } should ===(Set(address(third))))
       }
+
       enterBarrier("fifth-removed")
 
       runOn(fifth) {
@@ -230,11 +238,13 @@ abstract class MultiDcSplitBrainSpec
         enterBarrier("fifth-waiting-for-termination")
         Await.ready(system.whenTerminated, remaining)
 
+        val port = Cluster(system).selfAddress.port.get
         val restartedSystem = ActorSystem(
           system.name,
-          ConfigFactory.parseString(s"""
-            akka.remote.netty.tcp.port = ${Cluster(system).selfAddress.port.get}
-            akka.remote.artery.canonical.port = ${Cluster(system).selfAddress.port.get}
+          ConfigFactory.parseString(
+            s"""
+            akka.remote.netty.tcp.port = $port
+            akka.remote.artery.canonical.port = $port
             akka.coordinated-shutdown.terminate-actor-system = on
             """).withFallback(system.settings.config))
         Cluster(restartedSystem).join(thirdAddress)
@@ -254,6 +264,7 @@ abstract class MultiDcSplitBrainSpec
         }
         testConductor.shutdown(fifth)
       }
+
       runOn(remainingRoles: _*) {
         enterBarrier("fifth-restarted")
       }
@@ -267,13 +278,16 @@ abstract class MultiDcSplitBrainSpec
         subscribeProbe.expectMsgType[MemberRemoved].member.uniqueAddress should ===(fifthOriginalUniqueAddress.get)
         subscribeProbe.expectMsgType[MemberUp].member.address should ===(fifthOriginalUniqueAddress.get.address)
       }
+
       runOn(remainingRoles: _*) {
         enterBarrier("fifth-re-joined")
       }
+
       runOn(first) {
         // to shutdown the restartedSystem on fifth
         Cluster(system).leave(fifthOriginalUniqueAddress.get.address)
       }
+
       runOn(first, second, third) {
         awaitAssert({
           clusterView.members.map(_.address) should ===(Set(address(first), address(second), address(third)))
diff --git a/akka-multi-node-testkit/src/main/scala/akka/remote/testkit/MultiNodeSpec.scala b/akka-multi-node-testkit/src/main/scala/akka/remote/testkit/MultiNodeSpec.scala
index a8ffeeb90e9f..2c9bf69b792a 100644
--- a/akka-multi-node-testkit/src/main/scala/akka/remote/testkit/MultiNodeSpec.scala
+++ b/akka-multi-node-testkit/src/main/scala/akka/remote/testkit/MultiNodeSpec.scala
@@ -303,10 +303,10 @@ abstract class MultiNodeSpec(val myself: RoleName, _system: ActorSystem, _roles:
     if (selfIndex == 0) {
       testConductor.removeNode(myself)
       within(testConductor.Settings.BarrierTimeout.duration) {
-        awaitCond {
+        awaitCond({
           // Await.result(testConductor.getNodes, remaining).filterNot(_ == myself).isEmpty
-          testConductor.getNodes.await.filterNot(_ == myself).isEmpty
-        }
+          testConductor.getNodes.await.forall(_ == myself)
+        }, message = s"Nodes not shutdown: ${testConductor.getNodes.await}")
       }
     }
     shutdown(system, duration = shutdownTimeout)