Skip to content

Commit

Permalink
New leader listener
Browse files Browse the repository at this point in the history
- combine memberId+term into LeaderInfo
- introduce specific LeaderListener
  • Loading branch information
hugofirth committed Mar 11, 2018
1 parent 0c4fd5b commit 872c458
Show file tree
Hide file tree
Showing 24 changed files with 151 additions and 118 deletions.
1 change: 0 additions & 1 deletion ISSUE_TEMPLATE.md
Expand Up @@ -3,7 +3,6 @@
Please note that GitHub issues are only meant for bug reports/feature requests. If you have questions on how to use Neo4j, please ask on [StackOverflow](http://stackoverflow.com/questions/tagged/neo4j) instead of creating an issue here. Please note that GitHub issues are only meant for bug reports/feature requests. If you have questions on how to use Neo4j, please ask on [StackOverflow](http://stackoverflow.com/questions/tagged/neo4j) instead of creating an issue here.


If you want to make a feature request then there is no guideline, so feel free to stop reading and open an issue. If you have a bug report however, please continue reading. If you want to make a feature request then there is no guideline, so feel free to stop reading and open an issue. If you have a bug report however, please continue reading.
osjf
To help us understand your issue, please specify important details, primarily: To help us understand your issue, please specify important details, primarily:


- Neo4j version: X.Y.Z - Neo4j version: X.Y.Z
Expand Down
Expand Up @@ -85,9 +85,7 @@ public ReplicationModule( MemberId myself, PlatformModule platformModule, Config
leaderRetryStrategy, leaderRetryStrategy,
platformModule.availabilityGuard, platformModule.availabilityGuard,
logProvider, logProvider,
replicationLimit, replicationLimit ) );
platformModule.clock
) );
} }


public RaftReplicator getReplicator() public RaftReplicator getReplicator()
Expand Down
Expand Up @@ -17,36 +17,39 @@
* You should have received a copy of the GNU Affero General Public License * You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
package org.neo4j.causalclustering.discovery; package org.neo4j.causalclustering.core.consensus;


import java.io.Serializable; import java.io.Serializable;
import java.util.UUID;


import org.neo4j.causalclustering.identity.MemberId; import org.neo4j.causalclustering.identity.MemberId;


/** // TODO: basics for Serializable
* Simple struct representing a raft leader - move to the raft module and make sure its used properly there. public class LeaderInfo implements Serializable
*/
public class RaftLeader implements Serializable
{ {
public static final LeaderInfo INITIAL = new LeaderInfo( null, -1 );


private final MemberId memberId;
private final long term; private final long term;
private final UUID memberUUID;


public RaftLeader( MemberId memberId, long term ) public LeaderInfo( MemberId memberId, long term )
{ {
this.memberUUID = memberId.getUuid(); this.memberId = memberId;
this.term = term; this.term = term;
} }


public MemberId memberId()
{
return memberId;
}

public long term() public long term()
{ {
return term; return term;
} }


public MemberId memberId() @Override
public String toString()
{ {
return new MemberId( memberUUID ); return "LeaderInfo{" + "memberId=" + memberId + ", term=" + term + '}';
} }
} }

@@ -0,0 +1,25 @@
/*
* Copyright (c) 2002-2018 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.causalclustering.core.consensus;

/**
 * Callback for components that want to be told who the raft leader is.
 * <p>
 * Listeners are attached and detached through
 * {@link LeaderLocator#registerListener(LeaderListener)} and
 * {@link LeaderLocator#unregisterListener(LeaderListener)}.
 * <p>
 * The interface has a single abstract method, so it can be implemented with a lambda or method reference.
 */
@FunctionalInterface
public interface LeaderListener
{
    /**
     * Receives the leader information (member id together with the raft term) published by the locator.
     * <p>
     * NOTE(review): a locator may invoke this straight away on registration with its current view of the
     * leader, and may invoke it on state updates where the leader did not actually change - confirm against
     * the {@link LeaderLocator} implementation in use before relying on "only called on change" semantics.
     *
     * @param leaderInfo the leader and term being announced; its {@code memberId()} can be {@code null}
     * when no leader is known (as in {@code LeaderInfo.INITIAL}), so implementations should compare
     * null-safely.
     */
    void onLeaderSwitch( LeaderInfo leaderInfo );
}
Expand Up @@ -20,13 +20,12 @@
package org.neo4j.causalclustering.core.consensus; package org.neo4j.causalclustering.core.consensus;


import org.neo4j.causalclustering.identity.MemberId; import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.kernel.impl.util.Listener;


public interface LeaderLocator public interface LeaderLocator
{ {
MemberId getLeader() throws NoLeaderFoundException; MemberId getLeader() throws NoLeaderFoundException;


void registerListener( Listener<MemberId> listener ); void registerListener( LeaderListener listener );


void unregisterListener( Listener<MemberId> listener ); void unregisterListener( LeaderListener listener );
} }
Expand Up @@ -45,7 +45,6 @@
import org.neo4j.causalclustering.helper.VolatileFuture; import org.neo4j.causalclustering.helper.VolatileFuture;
import org.neo4j.causalclustering.identity.MemberId; import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.causalclustering.messaging.Outbound; import org.neo4j.causalclustering.messaging.Outbound;
import org.neo4j.kernel.impl.util.Listener;
import org.neo4j.kernel.monitoring.Monitors; import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.logging.Log; import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider; import org.neo4j.logging.LogProvider;
Expand Down Expand Up @@ -186,17 +185,17 @@ private MemberId waitForLeader( long timeoutMillis, Predicate<MemberId> predicat
} }
} }


private Collection<Listener<MemberId>> leaderListeners = new ArrayList<>(); private Collection<LeaderListener> leaderListeners = new ArrayList<>();


@Override @Override
public synchronized void registerListener( Listener<MemberId> listener ) public synchronized void registerListener( LeaderListener listener )
{ {
leaderListeners.add( listener ); leaderListeners.add( listener );
listener.receive( state.leader() ); listener.onLeaderSwitch( state.leaderInfo() );
} }


@Override @Override
public synchronized void unregisterListener( Listener listener ) public synchronized void unregisterListener( LeaderListener listener )
{ {
leaderListeners.remove( listener ); leaderListeners.remove( listener );
} }
Expand All @@ -213,10 +212,10 @@ public synchronized ExposedRaftState state()


private void notifyLeaderChanges( Outcome outcome ) private void notifyLeaderChanges( Outcome outcome )
{ {
for ( Listener<MemberId> listener : leaderListeners ) LeaderInfo leaderInfo = new LeaderInfo( outcome.getLeader(), outcome.getTerm() );
for ( LeaderListener listener : leaderListeners )
{ {
//TODO: Update to pass leader and term, probably creating a leader-term pair/immutable struct listener.onLeaderSwitch( leaderInfo );
listener.receive( outcome.getLeader() );
} }
} }


Expand Down
Expand Up @@ -23,6 +23,7 @@
import java.util.HashSet; import java.util.HashSet;
import java.util.Set; import java.util.Set;


import org.neo4j.causalclustering.core.consensus.LeaderInfo;
import org.neo4j.causalclustering.core.consensus.log.cache.InFlightCache; import org.neo4j.causalclustering.core.consensus.log.cache.InFlightCache;
import org.neo4j.causalclustering.core.consensus.log.RaftLog; import org.neo4j.causalclustering.core.consensus.log.RaftLog;
import org.neo4j.causalclustering.core.consensus.log.ReadableRaftLog; import org.neo4j.causalclustering.core.consensus.log.ReadableRaftLog;
Expand Down Expand Up @@ -52,6 +53,7 @@ public class RaftState implements ReadableRaftState
private VoteState voteState; private VoteState voteState;


private MemberId leader; private MemberId leader;
private LeaderInfo leaderInfo = LeaderInfo.INITIAL;
private Set<MemberId> votesForMe = new HashSet<>(); private Set<MemberId> votesForMe = new HashSet<>();
private Set<MemberId> preVotesForMe = new HashSet<>(); private Set<MemberId> preVotesForMe = new HashSet<>();
private Set<MemberId> heartbeatResponses = new HashSet<>(); private Set<MemberId> heartbeatResponses = new HashSet<>();
Expand Down Expand Up @@ -124,6 +126,12 @@ public MemberId leader()
return leader; return leader;
} }


@Override
public LeaderInfo leaderInfo()
{
return leaderInfo;
}

@Override @Override
public long leaderCommit() public long leaderCommit()
{ {
Expand Down Expand Up @@ -218,6 +226,7 @@ public void update( Outcome outcome ) throws IOException


logIfLeaderChanged( outcome.getLeader() ); logIfLeaderChanged( outcome.getLeader() );
leader = outcome.getLeader(); leader = outcome.getLeader();
leaderInfo = new LeaderInfo( outcome.getLeader(), outcome.getTerm() );


leaderCommit = outcome.getLeaderCommit(); leaderCommit = outcome.getLeaderCommit();
votesForMe = outcome.getVotesForMe(); votesForMe = outcome.getVotesForMe();
Expand Down
Expand Up @@ -21,6 +21,7 @@


import java.util.Set; import java.util.Set;


import org.neo4j.causalclustering.core.consensus.LeaderInfo;
import org.neo4j.causalclustering.core.consensus.log.ReadableRaftLog; import org.neo4j.causalclustering.core.consensus.log.ReadableRaftLog;
import org.neo4j.causalclustering.core.consensus.roles.follower.FollowerStates; import org.neo4j.causalclustering.core.consensus.roles.follower.FollowerStates;
import org.neo4j.causalclustering.identity.MemberId; import org.neo4j.causalclustering.identity.MemberId;
Expand All @@ -37,6 +38,8 @@ public interface ReadableRaftState


MemberId leader(); MemberId leader();


LeaderInfo leaderInfo();

long leaderCommit(); long leaderCommit();


MemberId votedFor(); MemberId votedFor();
Expand Down
Expand Up @@ -19,10 +19,11 @@
*/ */
package org.neo4j.causalclustering.core.replication; package org.neo4j.causalclustering.core.replication;


import java.time.Clock;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.function.BiConsumer; import java.util.function.BiConsumer;


import org.neo4j.causalclustering.core.consensus.LeaderInfo;
import org.neo4j.causalclustering.core.consensus.LeaderListener;
import org.neo4j.causalclustering.core.consensus.LeaderLocator; import org.neo4j.causalclustering.core.consensus.LeaderLocator;
import org.neo4j.causalclustering.core.consensus.NoLeaderFoundException; import org.neo4j.causalclustering.core.consensus.NoLeaderFoundException;
import org.neo4j.causalclustering.core.consensus.RaftMessages; import org.neo4j.causalclustering.core.consensus.RaftMessages;
Expand All @@ -33,15 +34,14 @@
import org.neo4j.causalclustering.messaging.Outbound; import org.neo4j.causalclustering.messaging.Outbound;
import org.neo4j.graphdb.DatabaseShutdownException; import org.neo4j.graphdb.DatabaseShutdownException;
import org.neo4j.kernel.AvailabilityGuard; import org.neo4j.kernel.AvailabilityGuard;
import org.neo4j.kernel.impl.util.Listener;
import org.neo4j.kernel.lifecycle.LifecycleAdapter; import org.neo4j.kernel.lifecycle.LifecycleAdapter;
import org.neo4j.logging.Log; import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider; import org.neo4j.logging.LogProvider;


/** /**
* A replicator implementation suitable in a RAFT context. Will handle resending due to timeouts and leader switches. * A replicator implementation suitable in a RAFT context. Will handle resending due to timeouts and leader switches.
*/ */
public class RaftReplicator extends LifecycleAdapter implements Replicator, Listener<MemberId> public class RaftReplicator extends LifecycleAdapter implements Replicator, LeaderListener
{ {
private final MemberId me; private final MemberId me;
private final Outbound<MemberId,RaftMessages.RaftMessage> outbound; private final Outbound<MemberId,RaftMessages.RaftMessage> outbound;
Expand All @@ -53,11 +53,10 @@ public class RaftReplicator extends LifecycleAdapter implements Replicator, List
private final TimeoutStrategy leaderTimeoutStrategy; private final TimeoutStrategy leaderTimeoutStrategy;
private final Log log; private final Log log;
private final Throttler throttler; private final Throttler throttler;
private final Clock clock;


public RaftReplicator( LeaderLocator leaderLocator, MemberId me, Outbound<MemberId,RaftMessages.RaftMessage> outbound, LocalSessionPool sessionPool, public RaftReplicator( LeaderLocator leaderLocator, MemberId me, Outbound<MemberId,RaftMessages.RaftMessage> outbound, LocalSessionPool sessionPool,
ProgressTracker progressTracker, TimeoutStrategy progressTimeoutStrategy, TimeoutStrategy leaderTimeoutStrategy, ProgressTracker progressTracker, TimeoutStrategy progressTimeoutStrategy, TimeoutStrategy leaderTimeoutStrategy,
AvailabilityGuard availabilityGuard, LogProvider logProvider, long replicationLimit, Clock clock ) AvailabilityGuard availabilityGuard, LogProvider logProvider, long replicationLimit )
{ {
this.me = me; this.me = me;
this.outbound = outbound; this.outbound = outbound;
Expand All @@ -68,7 +67,6 @@ public RaftReplicator( LeaderLocator leaderLocator, MemberId me, Outbound<Member
this.availabilityGuard = availabilityGuard; this.availabilityGuard = availabilityGuard;
this.throttler = new Throttler( replicationLimit ); this.throttler = new Throttler( replicationLimit );
this.leaderLocator = leaderLocator; this.leaderLocator = leaderLocator;
this.clock = clock;
leaderLocator.registerListener( this ); leaderLocator.registerListener( this );
log = logProvider.getLog( getClass() ); log = logProvider.getLog( getClass() );
} }
Expand Down Expand Up @@ -137,7 +135,7 @@ private Future<Object> replicate0( ReplicatedContent command, boolean trackResul
} }


@Override @Override
public void receive( MemberId leader ) public void onLeaderSwitch( LeaderInfo leaderInfo )
{ {
progressTracker.triggerReplicationEvent(); progressTracker.triggerReplicationEvent();
} }
Expand Down
Expand Up @@ -21,15 +21,16 @@


import java.util.function.BooleanSupplier; import java.util.function.BooleanSupplier;


import org.neo4j.causalclustering.core.consensus.LeaderInfo;
import org.neo4j.causalclustering.core.consensus.LeaderListener;
import org.neo4j.causalclustering.core.consensus.RaftMachine; import org.neo4j.causalclustering.core.consensus.RaftMachine;
import org.neo4j.causalclustering.identity.MemberId; import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.kernel.impl.util.Listener;


/** /**
* Determines whether it is safe to reuse freed ids, based on current leader and tracking its own transactions. * Determines whether it is safe to reuse freed ids, based on current leader and tracking its own transactions.
* This should guarantee that a single freed id only ends up on a single core. * This should guarantee that a single freed id only ends up on a single core.
*/ */
public class IdReusabilityCondition implements BooleanSupplier, Listener<MemberId> public class IdReusabilityCondition implements BooleanSupplier, LeaderListener
{ {
private static final BooleanSupplier ALWAYS_FALSE = () -> false; private static final BooleanSupplier ALWAYS_FALSE = () -> false;


Expand All @@ -54,9 +55,9 @@ public boolean getAsBoolean()
} }


@Override @Override
public void receive( MemberId newLeader ) public void onLeaderSwitch( LeaderInfo leaderInfo )
{ {
if ( myself.equals( newLeader ) ) if ( myself.equals( leaderInfo.memberId() ) )
{ {
// We just became leader // We just became leader
currentSupplier = new LeaderIdReusabilityCondition( commandIndexTracker, raftMachine ); currentSupplier = new LeaderIdReusabilityCondition( commandIndexTracker, raftMachine );
Expand Down
Expand Up @@ -19,9 +19,9 @@
*/ */
package org.neo4j.causalclustering.discovery; package org.neo4j.causalclustering.discovery;


import org.neo4j.causalclustering.core.consensus.LeaderInfo;
import org.neo4j.causalclustering.discovery.procedures.ClusterOverviewProcedure; import org.neo4j.causalclustering.discovery.procedures.ClusterOverviewProcedure;
import org.neo4j.causalclustering.identity.ClusterId; import org.neo4j.causalclustering.identity.ClusterId;
import org.neo4j.causalclustering.identity.MemberId;


/** /**
* Extends upon the topology service with a few extra services, connected to * Extends upon the topology service with a few extra services, connected to
Expand All @@ -47,13 +47,10 @@ public interface CoreTopologyService extends TopologyService
* Sets or updates the leader memberId for the given database (i.e. Raft consensus group). * Sets or updates the leader memberId for the given database (i.e. Raft consensus group).
* This is intended for informational purposes **only**, e.g. in {@link ClusterOverviewProcedure}. * This is intended for informational purposes **only**, e.g. in {@link ClusterOverviewProcedure}.
* The leadership information should otherwise be communicated via raft as before. * The leadership information should otherwise be communicated via raft as before.
* * @param leaderInfo Information about the new leader
* @param memberId The member ID to declare as the new leader
* @param dbName The database name for which memberId is the new leader * @param dbName The database name for which memberId is the new leader
* @param term The raft term for which memberId is the leader of dbName
*
*/ */
void setLeader( MemberId memberId, String dbName, long term ); void setLeader( LeaderInfo leaderInfo, String dbName );


interface Listener interface Listener
{ {
Expand Down
Expand Up @@ -40,13 +40,13 @@
import java.util.stream.Stream; import java.util.stream.Stream;


import org.neo4j.causalclustering.core.CausalClusteringSettings; import org.neo4j.causalclustering.core.CausalClusteringSettings;
import org.neo4j.causalclustering.core.consensus.LeaderInfo;
import org.neo4j.causalclustering.identity.ClusterId; import org.neo4j.causalclustering.identity.ClusterId;
import org.neo4j.causalclustering.identity.MemberId; import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.helpers.AdvertisedSocketAddress; import org.neo4j.helpers.AdvertisedSocketAddress;
import org.neo4j.kernel.configuration.Config; import org.neo4j.kernel.configuration.Config;
import org.neo4j.logging.Log; import org.neo4j.logging.Log;


import static java.util.Collections.emptyList;
import static java.util.Collections.emptyMap; import static java.util.Collections.emptyMap;
import static org.neo4j.causalclustering.core.CausalClusteringSettings.refuse_to_be_leader; import static org.neo4j.causalclustering.core.CausalClusteringSettings.refuse_to_be_leader;
import static java.util.stream.Stream.concat; import static java.util.stream.Stream.concat;
Expand Down Expand Up @@ -145,7 +145,7 @@ private static ClusterId getClusterId( HazelcastInstance hazelcastInstance, Stri
return uuid != null ? new ClusterId( uuid ) : null; return uuid != null ? new ClusterId( uuid ) : null;
} }


public static Set<String> getDBNames( HazelcastInstance hazelcastInstance ) private static Set<String> getDBNames( HazelcastInstance hazelcastInstance )
{ {
IMap<String, UUID> uuidPerDbCluster = hazelcastInstance.getMap( CLUSTER_UUID_DB_NAME_MAP ); IMap<String, UUID> uuidPerDbCluster = hazelcastInstance.getMap( CLUSTER_UUID_DB_NAME_MAP );
return uuidPerDbCluster.keySet(); return uuidPerDbCluster.keySet();
Expand Down Expand Up @@ -211,27 +211,27 @@ private static Map<MemberId,ReadReplicaInfo> readReplicas( HazelcastInstance haz
return result; return result;
} }


static boolean casLeaders( HazelcastInstance hazelcastInstance, MemberId leader, long term, String dbName ) static void casLeaders( HazelcastInstance hazelcastInstance, LeaderInfo leaderInfo, String dbName )
{ {
IAtomicReference<RaftLeader> leaderRef = hazelcastInstance.getAtomicReference( DB_NAME_LEADER_TERM_PREFIX + dbName ); IAtomicReference<LeaderInfo> leaderRef = hazelcastInstance.getAtomicReference( DB_NAME_LEADER_TERM_PREFIX + dbName );


RaftLeader expected = leaderRef.get(); LeaderInfo expected = leaderRef.get();


boolean noUpdate = Optional.ofNullable( expected ).map( RaftLeader::memberId ).equals( Optional.ofNullable( leader ) ); boolean noUpdate = Optional.ofNullable( expected ).map( LeaderInfo::memberId ).equals( Optional.ofNullable( leaderInfo.memberId() ) );


boolean greaterOrEqualTermExists = Optional.ofNullable( expected ).map(l -> l.term() >= term ).orElse( false ); boolean greaterOrEqualTermExists = Optional.ofNullable( expected ).map(l -> l.term() >= leaderInfo.term() ).orElse( false );


if ( greaterOrEqualTermExists || noUpdate ) if ( greaterOrEqualTermExists || noUpdate )
{ {
return false; return;
} }


return leaderRef.compareAndSet( expected, new RaftLeader( leader, term ) ); leaderRef.compareAndSet( expected, leaderInfo );
} }


static Optional<RaftLeader> getLeaderForDBName( HazelcastInstance hazelcastInstance, String dbName ) private static Optional<LeaderInfo> getLeaderForDBName( HazelcastInstance hazelcastInstance, String dbName )
{ {
IAtomicReference<RaftLeader> leader = hazelcastInstance.getAtomicReference( DB_NAME_LEADER_TERM_PREFIX + dbName ); IAtomicReference<LeaderInfo> leader = hazelcastInstance.getAtomicReference( DB_NAME_LEADER_TERM_PREFIX + dbName );
return Optional.ofNullable( leader.get() ); return Optional.ofNullable( leader.get() );
} }


Expand Down

0 comments on commit 872c458

Please sign in to comment.