MultiDcSplitBrainSpec: Turn on gossip logging; Increase gossip frequency (akka#24024)

The last time this test failed, there was no gossip to or from a node that
didn't see fifth coming back.

Also note that this test doesn't quite test what it says: the split brain is
repaired before the second actor system is started. Without extensions to the
multi-JVM test kit this can't be improved.

Refs akka#23306
chbatey authored and manonthegithub committed Jan 31, 2018
1 parent 31af358 commit f385905
Showing 2 changed files with 26 additions and 12 deletions.
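For quick reference, the two cluster settings this commit changes in the spec's test configuration, summarized from the diff below (the surrounding config is unchanged):

  akka.cluster.debug.verbose-gossip-logging = on   # newly enabled
  akka.cluster.gossip-interval = 500ms             # previously 1s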
Changes to MultiDcSplitBrainSpec:

@@ -25,6 +25,7 @@ object MultiDcSplitBrainMultiJvmSpec extends MultiNodeConfig {
"""
akka.loglevel = DEBUG
akka.cluster.debug.verbose-heartbeat-logging = on
+ akka.cluster.debug.verbose-gossip-logging = on
akka.remote.netty.tcp.connection-timeout = 5 s # speedup in case of connection issue
akka.remote.retry-gate-closed-for = 1 s
akka.cluster.multi-data-center {
@@ -34,7 +35,7 @@ object MultiDcSplitBrainMultiJvmSpec extends MultiNodeConfig {
}
}
akka.cluster {
- gossip-interval = 1s
+ gossip-interval = 500ms
leader-actions-interval = 1s
auto-down-unreachable-after = 1s
}
@@ -69,8 +70,8 @@ abstract class MultiDcSplitBrainSpec
val dc2 = List(third, fourth, fifth)
var barrierCounter = 0

- def splitDataCenters(notMembers: Set[RoleName]): Unit = {
- val memberNodes = (dc1 ++ dc2).filterNot(notMembers)
+ def splitDataCenters(doNotVerify: Set[RoleName]): Unit = {
+ val memberNodes = (dc1 ++ dc2).filterNot(doNotVerify)
val probe = TestProbe()
runOn(memberNodes: _*) {
cluster.subscribe(probe.ref, classOf[DataCenterReachabilityEvent])
@@ -145,7 +146,7 @@ abstract class MultiDcSplitBrainSpec

"be able to have a data center member join while there is inter data center split" in within(20.seconds) {
// introduce a split between data centers
- splitDataCenters(notMembers = Set(fourth, fifth))
+ splitDataCenters(doNotVerify = Set(fourth, fifth))

runOn(fourth) {
cluster.join(third)
@@ -172,8 +173,10 @@ abstract class MultiDcSplitBrainSpec
enterBarrier("inter-data-center-split-1-done")
}

+ // fifth is still not a member of the cluster

"be able to have data center member leave while there is inter data center split" in within(20.seconds) {
- splitDataCenters(notMembers = Set(fifth))
+ splitDataCenters(doNotVerify = Set(fifth))

runOn(fourth) {
cluster.leave(fourth)
@@ -192,13 +195,16 @@ abstract class MultiDcSplitBrainSpec
enterBarrier("inter-data-center-split-2-done")
}

+ // forth has left the cluster, fifth is still not a member

"be able to have data center member restart (same host:port) while there is inter data center split" in within(60.seconds) {
val subscribeProbe = TestProbe()
runOn(first, second, third, fifth) {
Cluster(system).subscribe(subscribeProbe.ref, InitialStateAsSnapshot, classOf[MemberUp], classOf[MemberRemoved])
subscribeProbe.expectMsgType[CurrentClusterState]
}
enterBarrier("subscribed")

runOn(fifth) {
Cluster(system).join(third)
}
@@ -211,16 +217,18 @@ abstract class MultiDcSplitBrainSpec
}
enterBarrier("fifth-joined")

- splitDataCenters(notMembers = Set(fourth))
+ splitDataCenters(doNotVerify = Set(fourth))

runOn(fifth) {
Cluster(system).shutdown()
}

runOn(third) {
awaitAssert(clusterView.members.collect {
case m if m.dataCenter == "dc2" ⇒ m.address
} should ===(Set(address(third))))
}

enterBarrier("fifth-removed")

runOn(fifth) {
@@ -230,11 +238,13 @@ abstract class MultiDcSplitBrainSpec
enterBarrier("fifth-waiting-for-termination")
Await.ready(system.whenTerminated, remaining)

+ val port = Cluster(system).selfAddress.port.get
val restartedSystem = ActorSystem(
system.name,
- ConfigFactory.parseString(s"""
- akka.remote.netty.tcp.port = ${Cluster(system).selfAddress.port.get}
- akka.remote.artery.canonical.port = ${Cluster(system).selfAddress.port.get}
+ ConfigFactory.parseString(
+ s"""
+ akka.remote.netty.tcp.port = $port
+ akka.remote.artery.canonical.port = $port
akka.coordinated-shutdown.terminate-actor-system = on
""").withFallback(system.settings.config))
Cluster(restartedSystem).join(thirdAddress)
@@ -254,6 +264,7 @@ abstract class MultiDcSplitBrainSpec
}
testConductor.shutdown(fifth)
}

runOn(remainingRoles: _*) {
enterBarrier("fifth-restarted")
}
@@ -267,13 +278,16 @@ abstract class MultiDcSplitBrainSpec
subscribeProbe.expectMsgType[MemberRemoved].member.uniqueAddress should ===(fifthOriginalUniqueAddress.get)
subscribeProbe.expectMsgType[MemberUp].member.address should ===(fifthOriginalUniqueAddress.get.address)
}

runOn(remainingRoles: _*) {
enterBarrier("fifth-re-joined")
}

runOn(first) {
// to shutdown the restartedSystem on fifth
Cluster(system).leave(fifthOriginalUniqueAddress.get.address)
}

runOn(first, second, third) {
awaitAssert({
clusterView.members.map(_.address) should ===(Set(address(first), address(second), address(third)))
Changes to MultiNodeSpec:

@@ -303,10 +303,10 @@ abstract class MultiNodeSpec(val myself: RoleName, _system: ActorSystem, _roles:
if (selfIndex == 0) {
testConductor.removeNode(myself)
within(testConductor.Settings.BarrierTimeout.duration) {
- awaitCond {
+ awaitCond({
// Await.result(testConductor.getNodes, remaining).filterNot(_ == myself).isEmpty
- testConductor.getNodes.await.filterNot(_ == myself).isEmpty
- }
+ testConductor.getNodes.await.forall(_ == myself)
+ }, message = s"Nodes not shutdown: ${testConductor.getNodes.await}")
}
}
shutdown(system, duration = shutdownTimeout)
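
The MultiNodeSpec change above keeps the shutdown wait's behavior and adds diagnostics: the predicate testConductor.getNodes.await.filterNot(_ == myself).isEmpty is replaced by the equivalent forall(_ == myself), and a message argument is passed so a timeout failure can report which nodes are still registered. A minimal sketch (plain Scala, sample data chosen here for illustration, not taken from the test kit) checking that the two predicate forms agree:

  object PredicateEquivalenceCheck extends App {
    val myself = "fifth"
    // Sample node lists standing in for testConductor.getNodes results.
    val samples = List(
      List.empty[String],
      List("fifth"),
      List("first", "fifth"),
      List("first", "second"))
    samples.foreach { nodes =>
      val before = nodes.filterNot(_ == myself).isEmpty // formulation before this commit
      val after  = nodes.forall(_ == myself)            // formulation after this commit
      assert(before == after, s"predicates disagree for $nodes")
    }
    println("old and new awaitCond predicates agree on all samples")
  }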
