Solves the problem of detecting failures when the cluster suffers partitions

HeartbeatContextImpl will now consult its own suspicions before deciding
whether a suspected instance has actually failed. Previously, only failed
instances were ignored when determining the quorum of suspicions. Now,
suspected instances are also not counted towards the required suspicion
quorum, effectively allowing minority partitions to mark all disconnected
instances as failed and eventually have HA move to the PENDING state.
digitalstain committed Jun 27, 2016
1 parent dc25f97 commit 195eabc
Showing 6 changed files with 101 additions and 28 deletions.
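The quorum rule described in the commit message can be made concrete with a small, self-contained sketch (illustrative only; the class, method and variable names below are invented and are not the Neo4j code): in a five-instance cluster split into a 2/3 partition, the minority side never reaches a plain majority of suspicions, but it does reach a majority of the instances it does not itself suspect.

// A self-contained illustration of the suspicion-quorum change; all names are invented for the example.
public class SuspicionQuorumSketch
{
    // Old rule (roughly): every instance not already marked failed counts towards the quorum.
    static boolean isFailedOldRule( int clusterSize, int failedCount, int suspicionsOfNode, boolean nodeAlreadyFailed )
    {
        int adjust = nodeAlreadyFailed ? 0 : 1;
        return suspicionsOfNode > ( clusterSize - failedCount - adjust ) / 2;
    }

    // New rule: instances that the local instance itself suspects are excluded from the quorum.
    static boolean isFailedNewRule( int clusterSize, int suspectedByMe, int suspicionsOfNode )
    {
        return suspicionsOfNode > ( clusterSize - suspectedByMe ) / 2;
    }

    public static void main( String[] args )
    {
        // Five instances, partitioned so that only instances 1 and 2 can see each other.
        // Each of the two suspects the three unreachable instances, so every unreachable
        // instance is suspected by exactly two instances.
        int clusterSize = 5;
        int suspicionsOfNode = 2; // suspected by me and by my one reachable peer
        int suspectedByMe = 3;    // I suspect the three instances on the other side

        // Old rule: 2 > (5 - 0 - 1) / 2 = 2 is false, so the minority never marks anything as failed.
        System.out.println( isFailedOldRule( clusterSize, 0, suspicionsOfNode, false ) ); // false

        // New rule: 2 > (5 - 3) / 2 = 1 is true, so the minority marks the unreachable
        // instances as failed, loses quorum and HA can move to PENDING.
        System.out.println( isFailedNewRule( clusterSize, suspectedByMe, suspicionsOfNode ) ); // true
    }
}

Running the sketch prints false for the old rule and true for the new one, which is the behaviour the first hunk below implements in HeartbeatContextImpl.isFailed().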
@@ -299,21 +299,17 @@ public void serverLeftCluster( InstanceId node )
     @Override
     public boolean isFailed( InstanceId node )
     {
-        List<InstanceId> suspicions = getSuspicionsOf( node );
+        List<InstanceId> suspicionsForNode = getSuspicionsOf( node );
+        int countOfInstancesSuspectedByMe = getSuspicionsFor( getMyId() ).size();

         /*
-         * This looks weird but trust me, there is a reason for it.
-         * See below in the test, where we subtract the failed size() from the total cluster size? If the instance
-         * under question is already in the failed set then that's it, as expected. But if it is not in the failed set
-         * then we must not take it's opinion under consideration (which we implicitly don't for every member of the
-         * failed set). That's what the adjust represents - the node's opinion on whether it is alive or not. Run a
-         * 3 cluster simulation in your head with 2 instances failed and one coming back online and you'll see why.
+         * If more than half *non suspected instances* suspect this node, fail it. This takes care of partitions
+         * that contain less than half of the cluster, ensuring that they will eventually detect the disconnect without
+         * waiting to have a majority of suspicions. This is accomplished by counting as quorum only instances
+         * that are not suspected by me.
          */
-        int adjust = failed.contains( node ) ? 0 : 1;
-
-        // If more than half suspect this node, fail it
-        return suspicions.size() >
-                (commonState.configuration().getMembers().size() - failed.size() - adjust) / 2;
+        return suspicionsForNode.size() >
+                (commonState.configuration().getMembers().size() - countOfInstancesSuspectedByMe ) / 2;
     }

     /**
@@ -352,13 +348,13 @@ public Set<InstanceId> getSuspicionsFor( InstanceId instanceId )
         return new HashSet<>( suspicions );
     }

-    private Set<InstanceId> suspicionsFor( InstanceId uri )
+    private Set<InstanceId> suspicionsFor( InstanceId instanceId )
     {
-        Set<InstanceId> serverSuspicions = nodeSuspicions.get( uri );
+        Set<InstanceId> serverSuspicions = nodeSuspicions.get( instanceId );
         if ( serverSuspicions == null )
         {
             serverSuspicions = new HashSet<>();
-            nodeSuspicions.put( uri, serverSuspicions );
+            nodeSuspicions.put( instanceId, serverSuspicions );
         }
         return serverSuspicions;
     }
@@ -20,10 +20,9 @@
 package org.neo4j.cluster.protocol.atomicbroadcast.multipaxos.context;

 import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Set;

 import org.junit.Test;

@@ -148,4 +147,77 @@ public void alive( InstanceId server )
         assertEquals( 1, failed.size() );
         assertTrue( failed.contains( member3 ) );
     }
+
+    @Test
+    public void majorityOfNonSuspectedInstancesShouldBeEnoughToMarkAnInstanceAsFailed() throws Exception
+    {
+        // Given
+        InstanceId me = new InstanceId( 1 );
+        InstanceId member2 = new InstanceId( 2 );
+        InstanceId member3 = new InstanceId( 3 );
+        InstanceId member4 = new InstanceId( 4 );
+        InstanceId member5 = new InstanceId( 5 );
+
+        Timeouts timeouts = mock( Timeouts.class );
+
+        CommonContextState commonState = mock( CommonContextState.class );
+        ClusterConfiguration configuration = mock( ClusterConfiguration.class );
+        when( commonState.configuration() ).thenReturn( configuration );
+        when( configuration.getMembers() ).thenReturn( members( 5 ) );
+        when( configuration.getMemberIds() ).thenReturn( ids( 5 ) );
+
+        DelayedDirectExecutor executor = new DelayedDirectExecutor( NullLogProvider.getInstance() );
+        HeartbeatContext context =
+                new HeartbeatContextImpl( me, commonState, NullLogProvider.getInstance(), timeouts,
+                        executor );
+
+        final List<InstanceId> failed = new ArrayList<>( 4 );
+        HeartbeatListener listener = new HeartbeatListener()
+        {
+            @Override
+            public void failed( InstanceId server )
+            {
+                failed.add( server );
+            }
+
+            @Override
+            public void alive( InstanceId server )
+            {
+                failed.remove( server );
+            }
+        };
+
+        context.addHeartbeatListener( listener );
+
+        // when
+        // only two suspicions come in; no failure should be registered yet, since this is not a majority
+        context.suspect( member2 );
+        context.suspect( member3 );
+        executor.drain();
+
+        // then
+        assertEquals( 0, failed.size() );
+
+        // when
+        // another instance also suspects them, giving a majority of the non suspected instances, so 2 and 3 must fail
+        Set<InstanceId> suspicionsFrom5 = new HashSet<>();
+        suspicionsFrom5.add( member2 );
+        suspicionsFrom5.add( member3 );
+        context.suspicions( member5, suspicionsFrom5 );
+        executor.drain();
+
+        // then
+        assertEquals( 2, failed.size() );
+        assertTrue( failed.contains( member2 ) );
+        assertTrue( failed.contains( member3 ) );
+
+        // when
+        // an instance sends a heartbeat, it should be set as alive
+        context.alive( member2 );
+        executor.drain();
+
+        // then
+        assertEquals( 1, failed.size() );
+        assertTrue( failed.contains( member3 ) );
+    }
 }
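The numbers in this new test line up with the quorum formula above: with five members, once member5's suspicions arrive, member2 and member3 are each suspected by two instances while the local instance itself suspects two instances, so 2 > (5 - 2) / 2 = 1 and both are reported as failed; before that, a single suspicion is not enough, since 1 > 1 is false.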
@@ -271,14 +271,14 @@ public void memberIsFailed( InstanceId instanceId )
             HighAvailabilityMemberState oldState = state;
             changeStateToDetached();
             log.debug( "Got memberIsFailed(" + instanceId + ") and cluster lost quorum to continue, moved to "
-                    + state + " from " + oldState );
+                    + state + " from " + oldState + ", while maintaining read only capability." );
         }
         else if ( instanceId.equals( context.getElectedMasterId() ) && state == HighAvailabilityMemberState.SLAVE )
         {
             HighAvailabilityMemberState oldState = state;
             changeStateToDetached();
             log.debug( "Got memberIsFailed(" + instanceId + ") which was the master and i am a slave, moved to "
-                    + state + " from " + oldState );
+                    + state + " from " + oldState + ", while maintaining read only capability." );
         }
         else
         {
@@ -334,6 +334,9 @@ public void notify( HighAvailabilityMemberListener listener )
                 listener.instanceDetached( event );
             }
         } );
+
+        context.setAvailableHaMasterId( null );
+        context.setElectedMasterId( null );
    }

    private long getAliveCount()
@@ -485,8 +485,9 @@ public void notify( ModeSwitcher listener )
        {
            modeSwitcherFuture.get( 10, TimeUnit.SECONDS );
        }
-        catch ( Exception ignored )
+        catch ( Exception e )
        {
+            msgLog.warn( "Exception received while waiting for switching to pending", e );
        }
    }

@@ -530,8 +531,9 @@ public void notify( ModeSwitcher listener )
        {
            modeSwitcherFuture.get( 10, TimeUnit.SECONDS );
        }
-        catch ( Exception ignored )
+        catch ( Exception e )
        {
+            msgLog.warn( "Exception received while waiting for switching to detached", e );
        }
    }

@@ -19,7 +19,6 @@
  */
 package org.neo4j.kernel.ha;

-import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;

@@ -40,11 +39,10 @@
 import static org.neo4j.kernel.ha.cluster.HighAvailabilityMemberState.PENDING;
 import static org.neo4j.kernel.impl.ha.ClusterManager.allSeesAllAsAvailable;
 import static org.neo4j.kernel.impl.ha.ClusterManager.masterAvailable;
-import static org.neo4j.kernel.impl.ha.ClusterManager.masterSeesMembers;
 import static org.neo4j.kernel.impl.ha.ClusterManager.masterSeesSlavesAsAvailable;
 import static org.neo4j.kernel.impl.ha.ClusterManager.memberSeesOtherMemberAsFailed;

-public class ClusterPartitionTestIT
+public class ClusterPartitionIT
 {
     @Rule
     public LoggerRule logger = new LoggerRule();
@@ -144,6 +142,7 @@ public void losingQuorumIncrementallyShouldMakeAllInstancesPendingAndReadOnly()
                .withProvider( ClusterManager.clusterOfSize( clusterSize ) )
                .withSharedConfig( stringMap(
                        ClusterSettings.heartbeat_interval.name(), "1",
+                        ClusterSettings.heartbeat_timeout.name(), "3",
                        HaSettings.tx_push_factor.name(), "4" ) ) // so we know the initial data made it everywhere
                .build();

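Presumably the newly added ClusterSettings.heartbeat_timeout of 3, alongside the existing heartbeat_interval of 1, keeps suspicion and failure detection fast enough for the test's await windows while leaving the timeout comfortably above the interval.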
@@ -206,7 +205,6 @@ public void losingQuorumIncrementallyShouldMakeAllInstancesPendingAndReadOnly()
    }

    @Test
-    @Ignore("Currently failing because the clustering layer does not properly detect such failures. WIP.")
    public void losingQuorumAbruptlyShouldMakeAllInstancesPendingAndReadOnly() throws Throwable
    {
        int clusterSize = 5; // we need 5 to differentiate between all other instances gone and just quorum being gone
@@ -215,6 +213,7 @@ public void losingQuorumAbruptlyShouldMakeAllInstancesPendingAndReadOnly() throws Throwable
                .withProvider( ClusterManager.clusterOfSize( clusterSize ) )
                .withSharedConfig( stringMap(
                        ClusterSettings.heartbeat_interval.name(), "1",
+                        ClusterSettings.heartbeat_timeout.name(), "3",
                        HaSettings.tx_push_factor.name(), "4" ) ) // so we know the initial data made it everywhere
                .build();

@@ -263,12 +262,11 @@ public void losingQuorumAbruptlyShouldMakeAllInstancesPendingAndReadOnly() throws Throwable
            rk1.repair();

            cluster.await( masterAvailable( failed2, failed3 ) );
-            cluster.await( masterSeesMembers( 2 ) );
+            cluster.await( masterSeesSlavesAsAvailable( 2 ) );

            ensureInstanceIsWritable( master );
            ensureInstanceIsWritable( remainingSlave );
            ensureInstanceIsWritable( failed1 );
-
        }
        finally
        {
@@ -614,7 +614,7 @@ public void shouldUseProperServerIdWhenDemotingFromMasterOnException() throws Throwable
    }

    @Test
-    public void shouldSwitchToSlaveForNullMasterAndBeSilentWhenMovingToDetached() throws Exception
+    public void shouldSwitchToSlaveForNullMasterAndBeSilentWhenMovingToDetached() throws Throwable
    {
        // Given
        ClusterMemberAvailability availability = mock( ClusterMemberAvailability.class );

@@ -630,6 +630,8 @@ public void shouldSwitchToSlaveForNullMasterAndBeSilentWhenMovingToDetached() throws Throwable
        toTest.addModeSwitcher( mockSwitcher );

        // When
+        toTest.init();
+        toTest.start();
        toTest.instanceDetached( new HighAvailabilityMemberChangeEvent( HighAvailabilityMemberState.MASTER,
                HighAvailabilityMemberState.PENDING, null, null ) );

