Skip to content

Commit

Permalink
Improved how current cluster member role is determined
Browse files Browse the repository at this point in the history
ClusterMembers now use HighAvailabilityMemberStateMachine to determine role of
the current instance. Since state machine is able to deal with out of order
messages it knows better what the actual role is. State of other cluster members
is tracked by ObservedClusterMembers that uses cluster events to figure out what
roles, uris, etc. There is no guarantee that information in ObservedClusterMembers
is fully up-to-date because of possible events reordering.

Signed-off-by: @MishaDemianenko
  • Loading branch information
lutovich committed Oct 27, 2015
1 parent 08ade8e commit 3598df6
Show file tree
Hide file tree
Showing 11 changed files with 859 additions and 623 deletions.
Expand Up @@ -73,6 +73,7 @@
import org.neo4j.kernel.ha.cluster.SwitchToSlave; import org.neo4j.kernel.ha.cluster.SwitchToSlave;
import org.neo4j.kernel.ha.cluster.member.ClusterMembers; import org.neo4j.kernel.ha.cluster.member.ClusterMembers;
import org.neo4j.kernel.ha.cluster.member.HighAvailabilitySlaves; import org.neo4j.kernel.ha.cluster.member.HighAvailabilitySlaves;
import org.neo4j.kernel.ha.cluster.member.ObservedClusterMembers;
import org.neo4j.kernel.ha.com.RequestContextFactory; import org.neo4j.kernel.ha.com.RequestContextFactory;
import org.neo4j.kernel.ha.com.master.DefaultSlaveFactory; import org.neo4j.kernel.ha.com.master.DefaultSlaveFactory;
import org.neo4j.kernel.ha.com.master.Master; import org.neo4j.kernel.ha.com.master.Master;
Expand Down Expand Up @@ -370,11 +371,16 @@ public void elected( String role, InstanceId instanceId, URI electedMember )
clusterEventsDelegateInvocationHandler.setDelegate( localClusterEvents ); clusterEventsDelegateInvocationHandler.setDelegate( localClusterEvents );
clusterMemberAvailabilityDelegateInvocationHandler.setDelegate( localClusterMemberAvailability ); clusterMemberAvailabilityDelegateInvocationHandler.setDelegate( localClusterMemberAvailability );


members = dependencies.satisfyDependency( new ClusterMembers( clusterClient, clusterClient, clusterEvents, ObservedClusterMembers observedMembers = new ObservedClusterMembers( logging, clusterClient, clusterClient,
config.get( ClusterSettings.server_id ) ) ); clusterEvents, config.get( ClusterSettings.server_id ) );
memberStateMachine = paxosLife.add( new HighAvailabilityMemberStateMachine( memberContext, availabilityGuard,
members, clusterEvents, clusterClient, logging.getMessagesLog( HighAvailabilityMemberStateMachine.class HighAvailabilityMemberStateMachine stateMachine = new HighAvailabilityMemberStateMachine( memberContext,
) ) ); availabilityGuard, observedMembers, clusterEvents, clusterClient,
logging.getMessagesLog( HighAvailabilityMemberStateMachine.class ) );

members = dependencies.satisfyDependency( new ClusterMembers( observedMembers, stateMachine ) );

memberStateMachine = paxosLife.add( stateMachine );


HighAvailabilityConsoleLogger highAvailabilityConsoleLogger = new HighAvailabilityConsoleLogger( logging HighAvailabilityConsoleLogger highAvailabilityConsoleLogger = new HighAvailabilityConsoleLogger( logging
.getConsoleLog( HighAvailabilityConsoleLogger.class ), config.get( ClusterSettings .getConsoleLog( HighAvailabilityConsoleLogger.class ), config.get( ClusterSettings
Expand Down Expand Up @@ -714,12 +720,12 @@ public HighAvailabilityMemberState getInstanceState()


public String role() public String role()
{ {
return members.getSelf().getHARole(); return members.getCurrentMemberRole();
} }


public boolean isMaster() public boolean isMaster()
{ {
return memberStateMachine.getCurrentState() == HighAvailabilityMemberState.MASTER; return HighAvailabilityModeSwitcher.MASTER.equals( role() );
} }


@Override @Override
Expand Down
Expand Up @@ -28,7 +28,7 @@
import org.neo4j.helpers.Listeners; import org.neo4j.helpers.Listeners;
import org.neo4j.helpers.collection.Iterables; import org.neo4j.helpers.collection.Iterables;
import org.neo4j.kernel.AvailabilityGuard; import org.neo4j.kernel.AvailabilityGuard;
import org.neo4j.kernel.ha.cluster.member.ClusterMembers; import org.neo4j.kernel.ha.cluster.member.ObservedClusterMembers;
import org.neo4j.kernel.impl.store.StoreId; import org.neo4j.kernel.impl.store.StoreId;
import org.neo4j.kernel.impl.util.StringLogger; import org.neo4j.kernel.impl.util.StringLogger;
import org.neo4j.kernel.lifecycle.LifecycleAdapter; import org.neo4j.kernel.lifecycle.LifecycleAdapter;
Expand All @@ -48,7 +48,7 @@ public class HighAvailabilityMemberStateMachine extends LifecycleAdapter impleme
private final AvailabilityGuard availabilityGuard; private final AvailabilityGuard availabilityGuard;
private final ClusterMemberEvents events; private final ClusterMemberEvents events;
private final StringLogger logger; private final StringLogger logger;
private final ClusterMembers members; private final ObservedClusterMembers observedMembers;
private final Election election; private final Election election;


private Iterable<HighAvailabilityMemberListener> memberListeners = Listeners.newListeners(); private Iterable<HighAvailabilityMemberListener> memberListeners = Listeners.newListeners();
Expand All @@ -57,12 +57,14 @@ public class HighAvailabilityMemberStateMachine extends LifecycleAdapter impleme


public HighAvailabilityMemberStateMachine( HighAvailabilityMemberContext context, public HighAvailabilityMemberStateMachine( HighAvailabilityMemberContext context,
AvailabilityGuard availabilityGuard, AvailabilityGuard availabilityGuard,
ClusterMembers members, ClusterMemberEvents events, Election election, ObservedClusterMembers observedMembers,
ClusterMemberEvents events,
Election election,
StringLogger logger ) StringLogger logger )
{ {
this.context = context; this.context = context;
this.availabilityGuard = availabilityGuard; this.availabilityGuard = availabilityGuard;
this.members = members; this.observedMembers = observedMembers;
this.events = events; this.events = events;
this.election = election; this.election = election;
this.logger = logger; this.logger = logger;
Expand Down Expand Up @@ -337,12 +339,12 @@ public void notify( HighAvailabilityMemberListener listener )


private long getAliveCount() private long getAliveCount()
{ {
return Iterables.count( Iterables.filter( ClusterMembers.ALIVE, members.getMembers() ) ); return Iterables.count( observedMembers.getAliveMembers() );
} }


private long getTotalCount() private long getTotalCount()
{ {
return Iterables.count( members.getMembers() ); return Iterables.count( observedMembers.getMembers() );
} }
} }
} }
Expand Up @@ -19,40 +19,22 @@
*/ */
package org.neo4j.kernel.ha.cluster.member; package org.neo4j.kernel.ha.cluster.member;


import java.net.URI;
import java.util.HashMap;
import java.util.Map;

import org.neo4j.cluster.InstanceId; import org.neo4j.cluster.InstanceId;
import org.neo4j.cluster.member.ClusterMemberEvents; import org.neo4j.function.Function;
import org.neo4j.cluster.member.ClusterMemberListener;
import org.neo4j.cluster.protocol.cluster.Cluster;
import org.neo4j.cluster.protocol.cluster.ClusterConfiguration;
import org.neo4j.cluster.protocol.cluster.ClusterListener;
import org.neo4j.cluster.protocol.heartbeat.Heartbeat;
import org.neo4j.cluster.protocol.heartbeat.HeartbeatListener;
import org.neo4j.helpers.Predicate; import org.neo4j.helpers.Predicate;
import org.neo4j.helpers.collection.Iterables;
import org.neo4j.kernel.ha.cluster.HighAvailabilityMemberState;
import org.neo4j.kernel.ha.cluster.HighAvailabilityMemberStateMachine;
import org.neo4j.kernel.ha.cluster.HighAvailabilityModeSwitcher; import org.neo4j.kernel.ha.cluster.HighAvailabilityModeSwitcher;
import org.neo4j.kernel.impl.store.StoreId;
import org.neo4j.kernel.impl.util.CopyOnWriteHashMap;


/** /**
* Keeps an up to date list of members, their roles and availability for * Keeps a list of members, their roles and availability for display for example in JMX or REST.
* display for example in JMX. * <p>
* Member state info is based on {@link ObservedClusterMembers} and {@link HighAvailabilityMemberStateMachine}.
* State of the current member is always valid, all other instances are only 'best effort'.
*/ */
public class ClusterMembers public class ClusterMembers
{ {
public static final Predicate<ClusterMember> ALIVE = new Predicate<ClusterMember>()
{
@Override
public boolean accept( ClusterMember item )
{
return item.isAlive();
}
};

private final InstanceId me;

public static Predicate<ClusterMember> inRole( final String role ) public static Predicate<ClusterMember> inRole( final String role )
{ {
return new Predicate<ClusterMember>() return new Predicate<ClusterMember>()
Expand All @@ -77,161 +59,70 @@ public boolean accept( ClusterMember item )
}; };
} }


private final Map<InstanceId, ClusterMember> members = new CopyOnWriteHashMap<>(); private final ObservedClusterMembers observedClusterMembers;

private final HighAvailabilityMemberStateMachine stateMachine;
public ClusterMembers( Cluster cluster, Heartbeat heartbeat, ClusterMemberEvents events, InstanceId me )
{
this.me = me;
cluster.addClusterListener( new HAMClusterListener() );
heartbeat.addHeartbeatListener( new HAMHeartbeatListener() );
events.addClusterMemberListener( new HAMClusterMemberListener() );
}


public Iterable<ClusterMember> getMembers() public ClusterMembers( ObservedClusterMembers observedClusterMembers,
HighAvailabilityMemberStateMachine stateMachine )
{ {
return members.values(); this.observedClusterMembers = observedClusterMembers;
this.stateMachine = stateMachine;
} }


public ClusterMember getSelf() public ClusterMember getCurrentMember()
{ {
for ( ClusterMember clusterMember : getMembers() ) ClusterMember currentMember = observedClusterMembers.getCurrentMember();
if ( currentMember == null )
{ {
if ( clusterMember.getInstanceId().equals( me ) ) return null;
{
return clusterMember;
}
} }
return null; String currentRole = roleOf( stateMachine.getCurrentState() );
} return currentMember.availableAs( currentRole, currentMember.getHAUri(), currentMember.getStoreId() );

public synchronized void waitForEvent( long timeout ) throws InterruptedException
{
wait( timeout );
} }


private synchronized void eventOccurred() public String getCurrentMemberRole()
{ {
notifyAll(); ClusterMember currentMember = getCurrentMember();
return (currentMember == null) ? HighAvailabilityModeSwitcher.UNKNOWN : currentMember.getHARole();
} }


private ClusterMember getMember( InstanceId server ) public Iterable<ClusterMember> getMembers()
{ {
ClusterMember clusterMember = members.get( server ); return getActualMembers( observedClusterMembers.getMembers() );
if ( clusterMember == null )
{
throw new IllegalStateException( "Member " + server + " not found in " + new HashMap<>( members ) );
}
return clusterMember;
} }


private class HAMClusterListener extends ClusterListener.Adapter public Iterable<ClusterMember> getAliveMembers()
{ {
@Override return getActualMembers( observedClusterMembers.getAliveMembers() );
public void enteredCluster( ClusterConfiguration configuration )
{
Map<InstanceId, ClusterMember> newMembers = new HashMap<>();
for ( InstanceId memberClusterId : configuration.getMemberIds() )
{
newMembers.put( memberClusterId, new ClusterMember( memberClusterId ) );
}
members.clear();
members.putAll( newMembers );
}

@Override
public void leftCluster()
{
members.clear();
}

@Override
public void joinedCluster( InstanceId member, URI memberUri )
{
members.put( member, new ClusterMember( member ) );
}

@Override
public void leftCluster( InstanceId instanceId, URI member )
{
members.remove( instanceId );
}
} }


private class HAMClusterMemberListener extends ClusterMemberListener.Adapter private Iterable<ClusterMember> getActualMembers( Iterable<ClusterMember> members )
{ {
private InstanceId masterId = null; final ClusterMember currentMember = getCurrentMember();

if ( currentMember == null )
@Override
public void coordinatorIsElected( InstanceId coordinatorId )
{
if ( coordinatorId.equals( this.masterId ) )
{
return;
}
this.masterId = coordinatorId;
Map<InstanceId, ClusterMember> newMembers = new HashMap<>();
for ( Map.Entry<InstanceId, ClusterMember> memberEntry : members.entrySet() )
{
newMembers.put( memberEntry.getKey(), memberEntry.getValue().unavailableAs(
HighAvailabilityModeSwitcher.MASTER ).unavailableAs( HighAvailabilityModeSwitcher.SLAVE ) );
}
members.clear();
members.putAll( newMembers );
}

@Override
public void memberIsAvailable( String role, InstanceId instanceId, URI roleUri, StoreId storeId )
{
members.put( instanceId, getMember( instanceId ).availableAs( role, roleUri, storeId ) );
eventOccurred();
}

@Override
public void memberIsUnavailable( String role, InstanceId unavailableId )
{ {
ClusterMember member; return members;
try
{
member = getMember( unavailableId );
members.put( unavailableId, member.unavailableAs( role ) );
}
catch ( IllegalStateException e )
{
// Unknown member
}
} }

return Iterables.map( new Function<ClusterMember,ClusterMember>()
@Override
public void memberIsFailed( InstanceId instanceId )
{ {
// Make it unavailable for all its current roles @Override
ClusterMember member = getMember( instanceId ); public ClusterMember apply( ClusterMember member ) throws RuntimeException
for ( String role : member.getRoles() )
{ {
member = member.unavailableAs( role ); // ClusterMember is copy-on-write return currentMember.getInstanceId().equals( member.getInstanceId() ) ? currentMember : member;
} }
members.put( instanceId, member ); // replace with the new copy }, members );
}
} }


private class HAMHeartbeatListener extends HeartbeatListener.Adapter private static String roleOf( HighAvailabilityMemberState state )
{ {
@Override switch ( state )
public void failed( InstanceId server )
{ {
if ( members.containsKey( server ) ) case MASTER:
{ return HighAvailabilityModeSwitcher.MASTER;
members.put( server, getMember( server ).failed() ); case SLAVE:
} return HighAvailabilityModeSwitcher.SLAVE;
} default:

return HighAvailabilityModeSwitcher.UNKNOWN;
@Override
public void alive( InstanceId server )
{
if ( members.containsKey( server ) )
{
members.put( server, getMember( server ).alive() );
}
} }
} }
} }
Expand Up @@ -89,9 +89,8 @@ public Iterable<Slave> getSlaves()
// Return all cluster members which are currently SLAVEs, // Return all cluster members which are currently SLAVEs,
// are alive, and convert to Slave with a cache if possible // are alive, and convert to Slave with a cache if possible
return map( withDefaults( slaveForMember(), Functions.map( slaves ) ), return map( withDefaults( slaveForMember(), Functions.map( slaves ) ),
filter( ClusterMembers.ALIVE,
filter( inRole( HighAvailabilityModeSwitcher.SLAVE ), filter( inRole( HighAvailabilityModeSwitcher.SLAVE ),
clusterMembers.getMembers() ) ) ); clusterMembers.getAliveMembers() ) );
} }


@Override @Override
Expand Down

0 comments on commit 3598df6

Please sign in to comment.