Skip to content

Commit

Permalink
The master of a two instance cluster will now serve writes on slave failure
Browse files Browse the repository at this point in the history

A two instance cluster cannot tolerate any failures, since quorum is the cluster
 size. This can be a problem in master+slave_only deployments, where the slave_only
 instance is meant as simply a read only replica. For this particular scenario,
 if the slave dies then it makes sense for the master to maintain write capabilities.
This commit changes the behaviour so that, for two instance clusters only, if
 the slave dies (or a partition happens), the master will continue serving
 writes.
  • Loading branch information
digitalstain committed Mar 9, 2017
1 parent 373f070 commit 326874b
Show file tree
Hide file tree
Showing 4 changed files with 219 additions and 5 deletions.
Expand Up @@ -245,7 +245,14 @@ public void memberIsUnavailable( String role, InstanceId unavailableId )
public void memberIsFailed( InstanceId instanceId ) public void memberIsFailed( InstanceId instanceId )
{ {
// If we don't have quorum anymore with the currently alive members, then go to pending // If we don't have quorum anymore with the currently alive members, then go to pending
if ( !isQuorum( getAliveCount(), getTotalCount() ) ) /*
* Unless this is a two instance cluster and we are the MASTER. This is an edge case in which a cluster
* of two instances gets a partition and we want to maintain write capability on one side.
* This, in combination with use of slave_only, is a cheap way to provide quasi-read-replica
* functionality for HA under the 2-instance scenario.
*/
if ( !isQuorum( getAliveCount(), getTotalCount() ) &&
!( getTotalCount() == 2 && state == HighAvailabilityMemberState.MASTER ) )
{ {
HighAvailabilityMemberState oldState = state; HighAvailabilityMemberState oldState = state;
changeStateToDetached(); changeStateToDetached();
Expand Down
@@ -0,0 +1,131 @@
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.kernel.ha;

import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;

import org.neo4j.graphdb.Node;
import org.neo4j.graphdb.Transaction;
import org.neo4j.kernel.ha.cluster.HighAvailabilityMemberState;
import org.neo4j.kernel.impl.ha.ClusterManager;
import org.neo4j.test.ha.ClusterRule;

import static org.junit.Assert.assertEquals;
import static org.neo4j.kernel.impl.ha.ClusterManager.allSeesAllAsAvailable;
import static org.neo4j.kernel.impl.ha.ClusterManager.clusterOfSize;
import static org.neo4j.kernel.impl.ha.ClusterManager.memberSeesOtherMemberAsFailed;

/**
 * Integration test for the special-cased behaviour of a two-member HA cluster:
 * the master keeps serving writes when its only peer fails (quorum equals the
 * cluster size, so this is normally a quorum-loss situation), while a slave
 * that loses the master still drops back to PENDING.
 */
public class TwoInstanceClusterIT
{
    @Rule
    public final ClusterRule clusterRule = new ClusterRule( getClass() );

    private ClusterManager.ManagedCluster cluster;

    @Before
    public void setup() throws Exception
    {
        // Deliberately a 2-member cluster: this is exactly the edge case under test.
        cluster = clusterRule
                .withSharedSetting( HaSettings.read_timeout, "1s" )
                .withSharedSetting( HaSettings.state_switch_timeout, "2s" )
                .withSharedSetting( HaSettings.com_chunk_size, "1024" )
                .withCluster( clusterOfSize( 2 ) )
                .startCluster();
    }

    @Test
    public void masterShouldRemainAvailableIfTheSlaveDiesAndRecovers() throws Throwable
    {
        HighlyAvailableGraphDatabase masterDb = cluster.getMaster();
        HighlyAvailableGraphDatabase slaveDb = cluster.getAnySlave();

        String key = "prop";
        String valueWrittenOnMaster = "value1";
        String valueWrittenOnSlave = "value2";

        // Kill the slave and wait until the master has observed the failure.
        ClusterManager.RepairKit slaveRepairKit = cluster.fail( slaveDb );
        cluster.await( memberSeesOtherMemberAsFailed( masterDb, slaveDb ) );

        // Despite having lost quorum, the two-instance master must still accept writes.
        long nodeCreatedOnMaster;
        try ( Transaction tx = masterDb.beginTx() )
        {
            Node node = masterDb.createNode();
            node.setProperty( key, valueWrittenOnMaster );
            nodeCreatedOnMaster = node.getId();
            tx.success();
        }

        // Bring the slave back and let the cluster settle.
        slaveRepairKit.repair();
        cluster.await( allSeesAllAsAvailable() );

        // The recovered slave must see the write made during its outage,
        // and writing through it must work again.
        long nodeCreatedOnSlave;
        try ( Transaction tx = slaveDb.beginTx() )
        {
            Node node = slaveDb.createNode();
            node.setProperty( key, valueWrittenOnSlave );
            assertEquals( valueWrittenOnMaster, slaveDb.getNodeById( nodeCreatedOnMaster ).getProperty( key ) );
            nodeCreatedOnSlave = node.getId();
            tx.success();
        }

        // The master must, in turn, see the write routed through the slave.
        try ( Transaction tx = masterDb.beginTx() )
        {
            assertEquals( valueWrittenOnSlave, masterDb.getNodeById( nodeCreatedOnSlave ).getProperty( key ) );
            tx.success();
        }
    }

    @Test
    public void slaveShouldMoveToPendingAndThenRecoverIfMasterDiesAndThenRecovers() throws Throwable
    {
        HighlyAvailableGraphDatabase masterDb = cluster.getMaster();
        HighlyAvailableGraphDatabase slaveDb = cluster.getAnySlave();

        String key = "prop";
        String value = "value1";

        // Kill the master and wait until the slave has observed the failure.
        ClusterManager.RepairKit masterRepairKit = cluster.fail( masterDb );
        cluster.await( memberSeesOtherMemberAsFailed( slaveDb, masterDb ) );

        // Unlike the master, a slave losing quorum must fall back to PENDING.
        assertEquals( HighAvailabilityMemberState.PENDING, slaveDb.getInstanceState() );

        masterRepairKit.repair();
        cluster.await( allSeesAllAsAvailable() );

        // Once the master is back, the slave should be writable again...
        long nodeCreatedOnSlave;
        try ( Transaction tx = slaveDb.beginTx() )
        {
            Node node = slaveDb.createNode();
            nodeCreatedOnSlave = node.getId();
            node.setProperty( key, value );
            tx.success();
        }

        // ...and that write must be visible from the master.
        try ( Transaction tx = masterDb.beginTx() )
        {
            assertEquals( value, masterDb.getNodeById( nodeCreatedOnSlave ).getProperty( key ) );
            tx.success();
        }
    }
}
Expand Up @@ -29,6 +29,7 @@
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -180,7 +181,7 @@ public void shouldSwitchToToSlaveOnMasterAvailableForSomeoneElse() throws Throwa
} }


@Test @Test
public void whenInMasterStateLosingQuorumShouldPutInPending() throws Throwable public void whenInMasterStateLosingQuorumFromTwoInstancesShouldRemainMaster() throws Throwable
{ {
// Given // Given
InstanceId me = new InstanceId( 1 ); InstanceId me = new InstanceId( 1 );
Expand Down Expand Up @@ -209,6 +210,46 @@ public void whenInMasterStateLosingQuorumShouldPutInPending() throws Throwable
// When // When
memberListener.memberIsFailed( new InstanceId( 2 ) ); memberListener.memberIsFailed( new InstanceId( 2 ) );


// Then
assertThat( stateMachine.getCurrentState(), equalTo( HighAvailabilityMemberState.MASTER ) );
assertThat( probe.instanceStops, is( false ) );
assertThat( probe.instanceDetached, is( false ) );
}

@Test
public void whenInMasterStateLosingQuorumFromThreeInstancesShouldGoToPending() throws Throwable
{
// Given
InstanceId me = new InstanceId( 1 );
InstanceId other1 = new InstanceId( 2 );
InstanceId other2 = new InstanceId( 3 );
HighAvailabilityMemberContext context = new SimpleHighAvailabilityMemberContext( me, false );

AvailabilityGuard guard = mock( AvailabilityGuard.class );
List<InstanceId> otherInstances = new LinkedList();
otherInstances.add( other1 );
otherInstances.add( other2 );
ObservedClusterMembers members = mockClusterMembers( me, emptyList(), otherInstances );

ClusterMemberEvents events = mock( ClusterMemberEvents.class );
ClusterMemberListenerContainer memberListenerContainer = mockAddClusterMemberListener( events );

HighAvailabilityMemberStateMachine stateMachine = buildMockedStateMachine( context, events, members, guard );

stateMachine.init();
ClusterMemberListener memberListener = memberListenerContainer.get();
HAStateChangeListener probe = new HAStateChangeListener();
stateMachine.addHighAvailabilityMemberListener( probe );

// Send it to MASTER
memberListener.coordinatorIsElected( me );
memberListener.memberIsAvailable( MASTER, me, URI.create( "ha://whatever" ), StoreId.DEFAULT );

assertThat( stateMachine.getCurrentState(), equalTo( HighAvailabilityMemberState.MASTER ) );

// When
memberListener.memberIsFailed( new InstanceId( 2 ) );

// Then // Then
assertThat( stateMachine.getCurrentState(), equalTo( HighAvailabilityMemberState.PENDING ) ); assertThat( stateMachine.getCurrentState(), equalTo( HighAvailabilityMemberState.PENDING ) );
assertThat( probe.instanceStops, is( false ) ); assertThat( probe.instanceStops, is( false ) );
Expand Down Expand Up @@ -252,9 +293,8 @@ public void whenInSlaveStateLosingOtherSlaveShouldNotPutInPending() throws Throw
assertThat( probe.instanceStops, is( false ) ); assertThat( probe.instanceStops, is( false ) );
} }



@Test @Test
public void whenInSlaveStateLosingMasterShouldPutInPending() throws Throwable public void whenInSlaveStateWith3MemberClusterLosingMasterShouldPutInPending() throws Throwable
{ {
// Given // Given
InstanceId me = new InstanceId( 1 ); InstanceId me = new InstanceId( 1 );
Expand Down Expand Up @@ -292,6 +332,43 @@ public void whenInSlaveStateLosingMasterShouldPutInPending() throws Throwable
verify( guard, times( 1 ) ).require( any( AvailabilityRequirement.class ) ); verify( guard, times( 1 ) ).require( any( AvailabilityRequirement.class ) );
} }


@Test
public void whenInSlaveStateWith2MemberClusterLosingMasterShouldPutInPending() throws Throwable
{
    // Given: a two-member cluster where this instance (1) is the slave and instance 2 is the master.
    InstanceId myId = new InstanceId( 1 );
    InstanceId masterId = new InstanceId( 2 );
    HighAvailabilityMemberContext memberContext = new SimpleHighAvailabilityMemberContext( myId, false );
    AvailabilityGuard availabilityGuard = mock( AvailabilityGuard.class );
    ObservedClusterMembers observedMembers = mockClusterMembers( myId, emptyList(), singletonList( masterId ) );

    ClusterMemberEvents memberEvents = mock( ClusterMemberEvents.class );
    ClusterMemberListenerContainer listenerContainer = mockAddClusterMemberListener( memberEvents );

    HighAvailabilityMemberStateMachine machine = buildMockedStateMachine( memberContext, memberEvents,
            observedMembers, availabilityGuard );

    machine.init();
    ClusterMemberListener listener = listenerContainer.get();
    HAStateChangeListener stateChangeProbe = new HAStateChangeListener();
    machine.addHighAvailabilityMemberListener( stateChangeProbe );

    // Drive this instance into the SLAVE state by electing and announcing the other member as master.
    listener.coordinatorIsElected( masterId );
    listener.memberIsAvailable( MASTER, masterId, URI.create( "ha://whatever" ), StoreId.DEFAULT );
    listener.memberIsAvailable( SLAVE, myId, URI.create( "ha://whatever3" ), StoreId.DEFAULT );

    assertThat( machine.getCurrentState(), equalTo( HighAvailabilityMemberState.SLAVE ) );

    // When: the master fails, leaving the slave without quorum.
    listener.memberIsFailed( masterId );

    // Then: the slave detaches into PENDING (the two-instance exception applies only to the MASTER).
    assertThat( machine.getCurrentState(), equalTo( HighAvailabilityMemberState.PENDING ) );
    assertThat( stateChangeProbe.instanceStops, is( false ) );
    assertThat( stateChangeProbe.instanceDetached, is( true ) );
    verify( availabilityGuard, times( 1 ) ).require( any( AvailabilityRequirement.class ) );
}

@Test @Test
public void whenInToMasterStateLosingQuorumShouldPutInPending() throws Throwable public void whenInToMasterStateLosingQuorumShouldPutInPending() throws Throwable
{ {
Expand Down
Expand Up @@ -45,7 +45,6 @@
import org.neo4j.kernel.impl.util.Listener; import org.neo4j.kernel.impl.util.Listener;
import org.neo4j.test.TargetDirectory; import org.neo4j.test.TargetDirectory;


import static org.neo4j.cluster.ClusterSettings.broadcast_timeout;
import static org.neo4j.cluster.ClusterSettings.default_timeout; import static org.neo4j.cluster.ClusterSettings.default_timeout;
import static org.neo4j.cluster.ClusterSettings.join_timeout; import static org.neo4j.cluster.ClusterSettings.join_timeout;
import static org.neo4j.graphdb.factory.GraphDatabaseSettings.pagecache_memory; import static org.neo4j.graphdb.factory.GraphDatabaseSettings.pagecache_memory;
Expand Down

0 comments on commit 326874b

Please sign in to comment.