From 195eabc111f1a10bb02635ee3968159a3014a20f Mon Sep 17 00:00:00 2001 From: Chris Gioran Date: Wed, 22 Jun 2016 18:07:57 +0300 Subject: [PATCH] Solves the problem of detecting failures when the cluster suffers partitions HeartbeatContextImpl will now consult its suspicions before making a decision whether a suspected instance is actually failed or not. Previously, only failed instances would be ignored when determining quorum of suspicions. Now, suspected instances will not be counted towards the required suspicion cap, effectively allowing minority partitions to mark all disconnected instances as failed and eventually have HA move to PENDING state. --- .../context/HeartbeatContextImpl.java | 26 +++---- .../context/HeartbeatContextImplTest.java | 76 ++++++++++++++++++- .../HighAvailabilityMemberStateMachine.java | 7 +- .../cluster/HighAvailabilityModeSwitcher.java | 6 +- ...ionTestIT.java => ClusterPartitionIT.java} | 10 +-- .../HighAvailabilityModeSwitcherTest.java | 4 +- 6 files changed, 101 insertions(+), 28 deletions(-) rename enterprise/ha/src/test/java/org/neo4j/kernel/ha/{ClusterPartitionTestIT.java => ClusterPartitionIT.java} (98%) diff --git a/enterprise/cluster/src/main/java/org/neo4j/cluster/protocol/atomicbroadcast/multipaxos/context/HeartbeatContextImpl.java b/enterprise/cluster/src/main/java/org/neo4j/cluster/protocol/atomicbroadcast/multipaxos/context/HeartbeatContextImpl.java index a908e0931f551..9cef2b06dd372 100644 --- a/enterprise/cluster/src/main/java/org/neo4j/cluster/protocol/atomicbroadcast/multipaxos/context/HeartbeatContextImpl.java +++ b/enterprise/cluster/src/main/java/org/neo4j/cluster/protocol/atomicbroadcast/multipaxos/context/HeartbeatContextImpl.java @@ -299,21 +299,17 @@ public void serverLeftCluster( InstanceId node ) @Override public boolean isFailed( InstanceId node ) { - List suspicions = getSuspicionsOf( node ); + List suspicionsForNode = getSuspicionsOf( node ); + int countOfInstancesSuspectedByMe = getSuspicionsFor( getMyId() ).size(); /* - * This looks weird but trust me, there is a reason for it. - * See below in the test, where we subtract the failed size() from the total cluster size? If the instance - * under question is already in the failed set then that's it, as expected. But if it is not in the failed set - * then we must not take it's opinion under consideration (which we implicitly don't for every member of the - * failed set). That's what the adjust represents - the node's opinion on whether it is alive or not. Run a - * 3 cluster simulation in your head with 2 instances failed and one coming back online and you'll see why. + * If more than half *non suspected instances* suspect this node, fail it. This takes care of partitions + * that contain less than half of the cluster, ensuring that they will eventually detect the disconnect without + * waiting to have a majority of suspicions. This is accomplished by counting as quorum only instances + * that are not suspected by me. */ - int adjust = failed.contains( node ) ? 0 : 1; - - // If more than half suspect this node, fail it - return suspicions.size() > - (commonState.configuration().getMembers().size() - failed.size() - adjust) / 2; + return suspicionsForNode.size() > + (commonState.configuration().getMembers().size() - countOfInstancesSuspectedByMe ) / 2; } /** @@ -352,13 +348,13 @@ public Set getSuspicionsFor( InstanceId instanceId ) return new HashSet<>( suspicions ); } - private Set suspicionsFor( InstanceId uri ) + private Set suspicionsFor( InstanceId instanceId ) { - Set serverSuspicions = nodeSuspicions.get( uri ); + Set serverSuspicions = nodeSuspicions.get( instanceId ); if ( serverSuspicions == null ) { serverSuspicions = new HashSet<>(); - nodeSuspicions.put( uri, serverSuspicions ); + nodeSuspicions.put( instanceId, serverSuspicions ); } return serverSuspicions; } diff --git a/enterprise/cluster/src/test/java/org/neo4j/cluster/protocol/atomicbroadcast/multipaxos/context/HeartbeatContextImplTest.java b/enterprise/cluster/src/test/java/org/neo4j/cluster/protocol/atomicbroadcast/multipaxos/context/HeartbeatContextImplTest.java index 86cc7ab121fb5..d1de984396c92 100644 --- a/enterprise/cluster/src/test/java/org/neo4j/cluster/protocol/atomicbroadcast/multipaxos/context/HeartbeatContextImplTest.java +++ b/enterprise/cluster/src/test/java/org/neo4j/cluster/protocol/atomicbroadcast/multipaxos/context/HeartbeatContextImplTest.java @@ -20,10 +20,9 @@ package org.neo4j.cluster.protocol.atomicbroadcast.multipaxos.context; import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.Set; import org.junit.Test; @@ -148,4 +147,77 @@ public void alive( InstanceId server ) assertEquals( 1, failed.size() ); assertTrue( failed.contains( member3 ) ); } + + @Test + public void majorityOfNonSuspectedInstancesShouldBeEnoughToMarkAnInstanceAsFailed() throws Exception + { + // Given + InstanceId me = new InstanceId( 1 ); + InstanceId member2 = new InstanceId( 2 ); + InstanceId member3 = new InstanceId( 3 ); + InstanceId member4 = new InstanceId( 4 ); + InstanceId member5 = new InstanceId( 5 ); + + Timeouts timeouts = mock( Timeouts.class ); + + CommonContextState commonState = mock( CommonContextState.class ); + ClusterConfiguration configuration = mock ( ClusterConfiguration.class ); + when( commonState.configuration() ).thenReturn( configuration ); + when( configuration.getMembers() ).thenReturn( members( 5 ) ); + when( configuration.getMemberIds() ).thenReturn( ids( 5 ) ); + + DelayedDirectExecutor executor = new DelayedDirectExecutor( NullLogProvider.getInstance() ); + HeartbeatContext context = + new HeartbeatContextImpl( me, commonState, NullLogProvider.getInstance(), timeouts, + executor ); + + final List failed = new ArrayList<>( 4 ); + HeartbeatListener listener = new HeartbeatListener() + { + @Override + public void failed( InstanceId server ) + { + failed.add( server ); + } + + @Override + public void alive( InstanceId server ) + { + failed.remove( server ); + } + }; + + context.addHeartbeatListener( listener ); + + // when + // just two suspicions come, no extra failing action should be taken since this is not majority + context.suspect( member2 ); + context.suspect( member3 ); + executor.drain(); + + // then + assertEquals( 0, failed.size() ); + + // when + // the another instance suspects them, therefore have a majority of non suspected, then 2 and 3 must fail + Set suspicionsFrom5 = new HashSet<>(); + suspicionsFrom5.add( member2 ); + suspicionsFrom5.add( member3 ); + context.suspicions( member5, suspicionsFrom5 ); + executor.drain(); + + // then + assertEquals( 2, failed.size() ); + assertTrue( failed.contains( member2 ) ); + assertTrue( failed.contains( member3 ) ); + + // when + // an instance sends a heartbeat, it should be set as alive + context.alive( member2 ); + executor.drain(); + + // then + assertEquals( 1, failed.size() ); + assertTrue( failed.contains( member3 ) ); + } } diff --git a/enterprise/ha/src/main/java/org/neo4j/kernel/ha/cluster/HighAvailabilityMemberStateMachine.java b/enterprise/ha/src/main/java/org/neo4j/kernel/ha/cluster/HighAvailabilityMemberStateMachine.java index 47959461a2ae0..92efd9226fa23 100644 --- a/enterprise/ha/src/main/java/org/neo4j/kernel/ha/cluster/HighAvailabilityMemberStateMachine.java +++ b/enterprise/ha/src/main/java/org/neo4j/kernel/ha/cluster/HighAvailabilityMemberStateMachine.java @@ -271,14 +271,14 @@ public void memberIsFailed( InstanceId instanceId ) HighAvailabilityMemberState oldState = state; changeStateToDetached(); log.debug( "Got memberIsFailed(" + instanceId + ") and cluster lost quorum to continue, moved to " - + state + " from " + oldState ); + + state + " from " + oldState + ", while maintaining read only capability." ); } else if ( instanceId.equals( context.getElectedMasterId() ) && state == HighAvailabilityMemberState.SLAVE ) { HighAvailabilityMemberState oldState = state; changeStateToDetached(); log.debug( "Got memberIsFailed(" + instanceId + ") which was the master and i am a slave, moved to " - + state + " from " + oldState ); + + state + " from " + oldState + ", while maintaining read only capability." ); } else { @@ -334,6 +334,9 @@ public void notify( HighAvailabilityMemberListener listener ) listener.instanceDetached( event ); } } ); + + context.setAvailableHaMasterId( null ); + context.setElectedMasterId( null ); } private long getAliveCount() diff --git a/enterprise/ha/src/main/java/org/neo4j/kernel/ha/cluster/HighAvailabilityModeSwitcher.java b/enterprise/ha/src/main/java/org/neo4j/kernel/ha/cluster/HighAvailabilityModeSwitcher.java index 5fd755d6d8246..053b5920e6395 100644 --- a/enterprise/ha/src/main/java/org/neo4j/kernel/ha/cluster/HighAvailabilityModeSwitcher.java +++ b/enterprise/ha/src/main/java/org/neo4j/kernel/ha/cluster/HighAvailabilityModeSwitcher.java @@ -485,8 +485,9 @@ public void notify( ModeSwitcher listener ) { modeSwitcherFuture.get( 10, TimeUnit.SECONDS ); } - catch ( Exception ignored ) + catch ( Exception e ) { + msgLog.warn( "Exception received while waiting for switching to pending", e ); } } @@ -530,8 +531,9 @@ public void notify( ModeSwitcher listener ) { modeSwitcherFuture.get( 10, TimeUnit.SECONDS ); } - catch ( Exception ignored ) + catch ( Exception e ) { + msgLog.warn( "Exception received while waiting for switching to detached", e ); } } diff --git a/enterprise/ha/src/test/java/org/neo4j/kernel/ha/ClusterPartitionTestIT.java b/enterprise/ha/src/test/java/org/neo4j/kernel/ha/ClusterPartitionIT.java similarity index 98% rename from enterprise/ha/src/test/java/org/neo4j/kernel/ha/ClusterPartitionTestIT.java rename to enterprise/ha/src/test/java/org/neo4j/kernel/ha/ClusterPartitionIT.java index e4832a88216e7..00e93925695c8 100644 --- a/enterprise/ha/src/test/java/org/neo4j/kernel/ha/ClusterPartitionTestIT.java +++ b/enterprise/ha/src/test/java/org/neo4j/kernel/ha/ClusterPartitionIT.java @@ -19,7 +19,6 @@ */ package org.neo4j.kernel.ha; -import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; @@ -40,11 +39,10 @@ import static org.neo4j.kernel.ha.cluster.HighAvailabilityMemberState.PENDING; import static org.neo4j.kernel.impl.ha.ClusterManager.allSeesAllAsAvailable; import static org.neo4j.kernel.impl.ha.ClusterManager.masterAvailable; -import static org.neo4j.kernel.impl.ha.ClusterManager.masterSeesMembers; import static org.neo4j.kernel.impl.ha.ClusterManager.masterSeesSlavesAsAvailable; import static org.neo4j.kernel.impl.ha.ClusterManager.memberSeesOtherMemberAsFailed; -public class ClusterPartitionTestIT +public class ClusterPartitionIT { @Rule public LoggerRule logger = new LoggerRule(); @@ -144,6 +142,7 @@ public void losingQuorumIncrementallyShouldMakeAllInstancesPendingAndReadOnly() .withProvider( ClusterManager.clusterOfSize( clusterSize ) ) .withSharedConfig( stringMap( ClusterSettings.heartbeat_interval.name(), "1", + ClusterSettings.heartbeat_timeout.name(), "3", HaSettings.tx_push_factor.name(), "4" ) ) // so we know the initial data made it everywhere .build(); @@ -206,7 +205,6 @@ public void losingQuorumIncrementallyShouldMakeAllInstancesPendingAndReadOnly() } @Test - @Ignore("Currently failing because the clustering layer does not properly detect such failures. WIP.") public void losingQuorumAbruptlyShouldMakeAllInstancesPendingAndReadOnly() throws Throwable { int clusterSize = 5; // we need 5 to differentiate between all other instances gone and just quorum being gone @@ -215,6 +213,7 @@ public void losingQuorumAbruptlyShouldMakeAllInstancesPendingAndReadOnly() throw .withProvider( ClusterManager.clusterOfSize( clusterSize ) ) .withSharedConfig( stringMap( ClusterSettings.heartbeat_interval.name(), "1", + ClusterSettings.heartbeat_timeout.name(), "3", HaSettings.tx_push_factor.name(), "4" ) ) // so we know the initial data made it everywhere .build(); @@ -263,12 +262,11 @@ public void losingQuorumAbruptlyShouldMakeAllInstancesPendingAndReadOnly() throw rk1.repair(); cluster.await( masterAvailable( failed2, failed3 ) ); - cluster.await( masterSeesMembers( 2 ) ); + cluster.await( masterSeesSlavesAsAvailable( 2 ) ); ensureInstanceIsWritable( master ); ensureInstanceIsWritable( remainingSlave ); ensureInstanceIsWritable( failed1 ); - } finally { diff --git a/enterprise/ha/src/test/java/org/neo4j/kernel/ha/cluster/HighAvailabilityModeSwitcherTest.java b/enterprise/ha/src/test/java/org/neo4j/kernel/ha/cluster/HighAvailabilityModeSwitcherTest.java index fc98e1f90ef32..c4f4ab151a73c 100644 --- a/enterprise/ha/src/test/java/org/neo4j/kernel/ha/cluster/HighAvailabilityModeSwitcherTest.java +++ b/enterprise/ha/src/test/java/org/neo4j/kernel/ha/cluster/HighAvailabilityModeSwitcherTest.java @@ -614,7 +614,7 @@ public void shouldUseProperServerIdWhenDemotingFromMasterOnException() throws Th } @Test - public void shouldSwitchToSlaveForNullMasterAndBeSilentWhenMovingToDetached() throws Exception + public void shouldSwitchToSlaveForNullMasterAndBeSilentWhenMovingToDetached() throws Throwable { // Given ClusterMemberAvailability availability = mock( ClusterMemberAvailability.class ); @@ -630,6 +630,8 @@ public void shouldSwitchToSlaveForNullMasterAndBeSilentWhenMovingToDetached() th toTest.addModeSwitcher( mockSwitcher ); // When + toTest.init(); + toTest.start(); toTest.instanceDetached( new HighAvailabilityMemberChangeEvent( HighAvailabilityMemberState.MASTER, HighAvailabilityMemberState.PENDING, null, null ) );