Skip to content

Commit

Permalink
Update handling of unavailability event for current member.
Browse files Browse the repository at this point in the history
Unavailability listener updated to work as follows:
as soon as we receive an unavailability event about ourselves, something has gone wrong
in the cluster and we need to perform new elections.
Elections should be triggered for all states except HighAvailabilityMemberState.PENDING, since
in that state either nothing has happened yet, or we have already made the switch and are waiting
for the election to start, so there is no reason to start one again.
Simplify InvalidEpochExceptionHandler to only generate unavailability event.
  • Loading branch information
MishaDemianenko committed Oct 26, 2015
1 parent ceedb2a commit eea557c
Show file tree
Hide file tree
Showing 7 changed files with 325 additions and 353 deletions.
Expand Up @@ -483,7 +483,7 @@ protected void createModeSwitcher()
@Override
public void handle()
{
highAvailabilityModeSwitcher.forceElections();
highAvailabilityModeSwitcher.postMemberUnavailable();
}
};

Expand Down
Expand Up @@ -235,16 +235,49 @@ public void notify( HighAvailabilityMemberListener listener )
}
}


/**
* As soon as we receive an unavailability message and the instanceId belongs to us, depending on the current
* state we do the following:
* <ul>
* <li>if current state is <b>not</b> {@link HighAvailabilityMemberState#PENDING} we trigger switch to
* {@link
* HighAvailabilityMemberState#PENDING} and force new elections.</li>
* <li>if current state is {@link HighAvailabilityMemberState#PENDING}
* we only log debug message</li>
* </ul>
* The assumption here is: as soon as we receive an unavailability event about ourselves, something
* has gone wrong in the cluster and we need to perform new elections.
* Elections should be triggered for all states except {@link HighAvailabilityMemberState#PENDING}, since
* in that state either nothing has happened yet, or we have already made the switch and are waiting
* for the election to start, so there is no reason to start one again.
* <p>
* Listener invoked from sync block in {@link org.neo4j.cluster.member.paxos.PaxosClusterMemberEvents} so we
* should not have any racing here.
*</p>
* @param role The role for which the member is unavailable
* @param unavailableId The id of the member which became unavailable for that role
*/
@Override
public void memberIsUnavailable( String role, InstanceId unavailableId )
{
if ( context.getMyId().equals( unavailableId ) &&
HighAvailabilityModeSwitcher.SLAVE.equals( role ) &&
state == HighAvailabilityMemberState.SLAVE )
if ( context.getMyId().equals( unavailableId ) )
{
HighAvailabilityMemberState oldState = state;
changeStateToPending();
logger.debug( "Got memberIsUnavailable(" + unavailableId + "), moved to " + state + " from " + oldState );
if ( HighAvailabilityMemberState.PENDING != state )
{
HighAvailabilityMemberState oldState = state;
changeStateToPending();
logger.debug( "Got memberIsUnavailable(" + unavailableId + "), moved to " + state + " from " +
oldState );
logger.debug( "Forcing new round of elections." );
election.performRoleElections();
}
else
{
logger.debug( "Got memberIsUnavailable(" + unavailableId + "), but already in " +
HighAvailabilityMemberState.PENDING + " state, will skip state change and " +
"new election.");
}
}
else
{
Expand Down
Expand Up @@ -24,7 +24,6 @@
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

import org.neo4j.cluster.BindingListener;
Expand Down Expand Up @@ -99,7 +98,6 @@ public static InstanceId getServerId( URI haUri )
private volatile URI me;
private volatile Future<?> modeSwitcherFuture;
private volatile HighAvailabilityMemberState currentTargetState;
private final AtomicBoolean canAskForElections = new AtomicBoolean( true );

public HighAvailabilityModeSwitcher( SwitchToSlave switchToSlave,
SwitchToMaster switchToMaster,
Expand Down Expand Up @@ -209,13 +207,9 @@ public void removeModeSwitcher( ModeSwitcher modeSwitcher )
modeSwitchListeners = Listeners.removeListener( modeSwitcher, modeSwitchListeners );
}

public void forceElections()
public void postMemberUnavailable()
{
if ( canAskForElections.compareAndSet( true, false ) )
{
clusterMemberAvailability.memberIsUnavailable( HighAvailabilityModeSwitcher.SLAVE );
election.performRoleElections();
}
clusterMemberAvailability.memberIsUnavailable( HighAvailabilityModeSwitcher.SLAVE );
}

private void stateChanged( HighAvailabilityMemberChangeEvent event )
Expand All @@ -239,12 +233,6 @@ private void stateChanged( HighAvailabilityMemberChangeEvent event )
switch ( event.getNewState() )
{
case TO_MASTER:

if ( event.getOldState().equals( HighAvailabilityMemberState.SLAVE ) )
{
clusterMemberAvailability.memberIsUnavailable( SLAVE );
}

switchToMaster();
break;
case TO_SLAVE:
Expand Down Expand Up @@ -300,7 +288,6 @@ public void notify( ModeSwitcher listener )
try
{
masterHaURI = switchToMaster.switchToMaster( haCommunicationLife, me );
canAskForElections.set( true );
}
catch ( Throwable e )
{
Expand Down Expand Up @@ -380,7 +367,6 @@ public void notify( ModeSwitcher listener )
else
{
slaveHaURI = resultingSlaveHaURI;
canAskForElections.set( true );
}
}
catch ( HighAvailabilityStoreFailureException e )
Expand Down
Expand Up @@ -22,13 +22,4 @@
/**
 * Callback invoked when an invalid-epoch condition is detected, allowing the
 * cluster member to react (e.g. by posting an unavailability event).
 */
public interface InvalidEpochExceptionHandler
{
    /**
     * React to an invalid epoch being observed. Implementations decide the
     * recovery action; no arguments are supplied and no result is expected.
     */
    void handle();

    // Null-object implementation: a handler that deliberately does nothing.
    // NOTE(review): this commit removes the constant — only generating the
    // unavailability event remains; kept here as captured in the diff.
    InvalidEpochExceptionHandler NONE = new InvalidEpochExceptionHandler()
    {
        @Override
        public void handle()
        {
            // intentionally empty: NONE swallows the event by design
        }
    };
}
Expand Up @@ -19,16 +19,15 @@
*/
package org.neo4j.kernel.ha;

import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;

import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;

import org.neo4j.cluster.ClusterSettings;
import org.neo4j.cluster.InstanceId;
import org.neo4j.cluster.client.ClusterClient;
Expand All @@ -42,25 +41,20 @@
import org.neo4j.graphdb.Node;
import org.neo4j.graphdb.Transaction;
import org.neo4j.graphdb.factory.GraphDatabaseSettings;
import org.neo4j.helpers.collection.Iterables;
import org.neo4j.helpers.collection.MapUtil;
import org.neo4j.kernel.InternalAbstractGraphDatabase;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.ha.cluster.HighAvailabilityMemberState;
import org.neo4j.kernel.ha.com.master.InvalidEpochException;
import org.neo4j.kernel.impl.ha.ClusterManager;
import org.neo4j.kernel.impl.ha.ClusterManager.RepairKit;
import org.neo4j.kernel.logging.DevNullLoggingService;
import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.test.CleanupRule;
import org.neo4j.test.ha.ClusterRule;
import org.neo4j.tooling.GlobalGraphOperations;

import static java.util.concurrent.TimeUnit.SECONDS;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;

import static org.neo4j.cluster.protocol.cluster.ClusterConfiguration.COORDINATOR;
import static org.neo4j.helpers.Predicates.not;
import static org.neo4j.kernel.impl.ha.ClusterManager.allSeesAllAsAvailable;
Expand All @@ -75,7 +69,7 @@ public class ClusterTopologyChangesIT
@Rule
public final CleanupRule cleanup = new CleanupRule();

protected ClusterManager.ManagedCluster cluster;
private ClusterManager.ManagedCluster cluster;

@Before
public void setup() throws Exception
Expand All @@ -88,12 +82,7 @@ public void setup() throws Exception
.startCluster();
}

@After
public void cleanup()
{
cluster = null;
}


@Test
public void masterRejoinsAfterFailureAndReelection() throws Throwable
{
Expand Down Expand Up @@ -242,14 +231,6 @@ public void enteredCluster( ClusterConfiguration clusterConfiguration )
assertEquals( new InstanceId( 2 ), coordinatorIdWhenReJoined.get() );
}

private static long nodeCountOn( HighlyAvailableGraphDatabase db )
{
try ( Transaction ignored = db.beginTx() )
{
return Iterables.count( GlobalGraphOperations.at( db ).getAllNodes() );
}
}

private static ClusterClient clusterClientOf( HighlyAvailableGraphDatabase db )
{
return db.getDependencyResolver().resolveDependency( ClusterClient.class );
Expand Down Expand Up @@ -298,18 +279,4 @@ private static void attemptTransactions( HighlyAvailableGraphDatabase... dbs )
}
}
}

private static void assertHasInvalidEpoch( HighlyAvailableGraphDatabase db )
{
InvalidEpochException invalidEpochException = null;
try
{
createNodeOn( db );
}
catch ( InvalidEpochException e )
{
invalidEpochException = e;
}
assertNotNull( "Expected InvalidEpochException was not thrown", invalidEpochException );
}
}

0 comments on commit eea557c

Please sign in to comment.