diff --git a/ISSUE_TEMPLATE.md b/ISSUE_TEMPLATE.md index 6f6d40abfb826..591b3fe579d91 100644 --- a/ISSUE_TEMPLATE.md +++ b/ISSUE_TEMPLATE.md @@ -3,7 +3,6 @@ Please note that GitHub issues are only meant for bug reports/feature requests. If you have questions on how to use Neo4j, please ask on [StackOverflow](http://stackoverflow.com/questions/tagged/neo4j) instead of creating an issue here. If you want to make a feature request then there is no guideline, so feel free to stop reading and open an issue. If you have a bug report however, please continue reading. -osjf To help us understand your issue, please specify important details, primarily: - Neo4j version: X.Y.Z diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/ReplicationModule.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/ReplicationModule.java index cbbbf4ebec324..c8bc00c22d8d1 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/ReplicationModule.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/ReplicationModule.java @@ -85,9 +85,7 @@ public ReplicationModule( MemberId myself, PlatformModule platformModule, Config leaderRetryStrategy, platformModule.availabilityGuard, logProvider, - replicationLimit, - platformModule.clock - ) ); + replicationLimit ) ); } public RaftReplicator getReplicator() diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/RaftLeader.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/LeaderInfo.java similarity index 68% rename from enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/RaftLeader.java rename to enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/LeaderInfo.java index 6595f42129629..c8f7aa3317a34 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/RaftLeader.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/LeaderInfo.java @@ -17,36 +17,39 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -package org.neo4j.causalclustering.discovery; +package org.neo4j.causalclustering.core.consensus; import java.io.Serializable; -import java.util.UUID; import org.neo4j.causalclustering.identity.MemberId; -/** - * Simple struct representing a raft leader - move to the raft module and make sure its used properly there. - */ -public class RaftLeader implements Serializable +// TODO: basics for Serializable +public class LeaderInfo implements Serializable { + public static final LeaderInfo INITIAL = new LeaderInfo( null, -1 ); + private final MemberId memberId; private final long term; - private final UUID memberUUID; - public RaftLeader( MemberId memberId, long term ) + public LeaderInfo( MemberId memberId, long term ) { - this.memberUUID = memberId.getUuid(); + this.memberId = memberId; this.term = term; } + public MemberId memberId() + { + return memberId; + } + public long term() { return term; } - public MemberId memberId() + @Override + public String toString() { - return new MemberId( memberUUID ); + return "LeaderInfo{" + "memberId=" + memberId + ", term=" + term + '}'; } } - diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/LeaderListener.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/LeaderListener.java new file mode 100644 index 0000000000000..a6368ff423aaa --- /dev/null +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/LeaderListener.java @@ -0,0 +1,25 @@ +/* + * Copyright (c) 2002-2018 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.causalclustering.core.consensus; + +public interface LeaderListener +{ + void onLeaderSwitch( LeaderInfo leaderInfo ); +} diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/LeaderLocator.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/LeaderLocator.java index 76e8ea43a706e..73ba105c67f44 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/LeaderLocator.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/LeaderLocator.java @@ -20,13 +20,12 @@ package org.neo4j.causalclustering.core.consensus; import org.neo4j.causalclustering.identity.MemberId; -import org.neo4j.kernel.impl.util.Listener; public interface LeaderLocator { MemberId getLeader() throws NoLeaderFoundException; - void registerListener( Listener listener ); + void registerListener( LeaderListener listener ); - void unregisterListener( Listener listener ); + void unregisterListener( LeaderListener listener ); } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/RaftMachine.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/RaftMachine.java index 5333f97df7a0c..71d77510c132d 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/RaftMachine.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/RaftMachine.java @@ -45,7 +45,6 @@ import org.neo4j.causalclustering.helper.VolatileFuture; import org.neo4j.causalclustering.identity.MemberId; import org.neo4j.causalclustering.messaging.Outbound; -import org.neo4j.kernel.impl.util.Listener; import org.neo4j.kernel.monitoring.Monitors; import org.neo4j.logging.Log; import org.neo4j.logging.LogProvider; @@ -186,17 +185,17 @@ private MemberId waitForLeader( long timeoutMillis, Predicate predicat } } - private Collection> leaderListeners = new ArrayList<>(); + private Collection leaderListeners = new ArrayList<>(); @Override - public synchronized void registerListener( Listener listener ) + public synchronized void registerListener( LeaderListener listener ) { leaderListeners.add( listener ); - listener.receive( state.leader() ); + listener.onLeaderSwitch( state.leaderInfo() ); } @Override - public synchronized void unregisterListener( Listener listener ) + public synchronized void unregisterListener( LeaderListener listener ) { leaderListeners.remove( listener ); } @@ -213,10 +212,10 @@ public synchronized ExposedRaftState state() private void notifyLeaderChanges( Outcome outcome ) { - for ( Listener listener : leaderListeners ) + LeaderInfo leaderInfo = new LeaderInfo( outcome.getLeader(), outcome.getTerm() ); + for ( LeaderListener listener : leaderListeners ) { - //TODO: Update to pass leader and term, probably creating a leader-term pair/immutable struct - listener.receive( outcome.getLeader() ); + listener.onLeaderSwitch( leaderInfo ); } } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/state/RaftState.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/state/RaftState.java index f86d32f635770..796d554809c19 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/state/RaftState.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/state/RaftState.java @@ -23,6 +23,7 @@ import java.util.HashSet; import java.util.Set; +import org.neo4j.causalclustering.core.consensus.LeaderInfo; import org.neo4j.causalclustering.core.consensus.log.cache.InFlightCache; import org.neo4j.causalclustering.core.consensus.log.RaftLog; import org.neo4j.causalclustering.core.consensus.log.ReadableRaftLog; @@ -52,6 +53,7 @@ public class RaftState implements ReadableRaftState private VoteState voteState; private MemberId leader; + private LeaderInfo leaderInfo = LeaderInfo.INITIAL; private Set votesForMe = new HashSet<>(); private Set preVotesForMe = new HashSet<>(); private Set heartbeatResponses = new HashSet<>(); @@ -124,6 +126,12 @@ public MemberId leader() return leader; } + @Override + public LeaderInfo leaderInfo() + { + return leaderInfo; + } + @Override public long leaderCommit() { @@ -218,6 +226,7 @@ public void update( Outcome outcome ) throws IOException logIfLeaderChanged( outcome.getLeader() ); leader = outcome.getLeader(); + leaderInfo = new LeaderInfo( outcome.getLeader(), outcome.getTerm() ); leaderCommit = outcome.getLeaderCommit(); votesForMe = outcome.getVotesForMe(); diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/state/ReadableRaftState.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/state/ReadableRaftState.java index eef9d9ecd0de2..dc3f84e1b06c5 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/state/ReadableRaftState.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/state/ReadableRaftState.java @@ -21,6 +21,7 @@ import java.util.Set; +import org.neo4j.causalclustering.core.consensus.LeaderInfo; import org.neo4j.causalclustering.core.consensus.log.ReadableRaftLog; import org.neo4j.causalclustering.core.consensus.roles.follower.FollowerStates; import org.neo4j.causalclustering.identity.MemberId; @@ -37,6 +38,8 @@ public interface ReadableRaftState MemberId leader(); + LeaderInfo leaderInfo(); + long leaderCommit(); MemberId votedFor(); diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/replication/RaftReplicator.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/replication/RaftReplicator.java index 5f034f979ef94..7abeff6b6df41 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/replication/RaftReplicator.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/replication/RaftReplicator.java @@ -19,10 +19,11 @@ */ package org.neo4j.causalclustering.core.replication; -import java.time.Clock; import java.util.concurrent.Future; import java.util.function.BiConsumer; +import org.neo4j.causalclustering.core.consensus.LeaderInfo; +import org.neo4j.causalclustering.core.consensus.LeaderListener; import org.neo4j.causalclustering.core.consensus.LeaderLocator; import org.neo4j.causalclustering.core.consensus.NoLeaderFoundException; import org.neo4j.causalclustering.core.consensus.RaftMessages; @@ -33,7 +34,6 @@ import org.neo4j.causalclustering.messaging.Outbound; import org.neo4j.graphdb.DatabaseShutdownException; import org.neo4j.kernel.AvailabilityGuard; -import org.neo4j.kernel.impl.util.Listener; import org.neo4j.kernel.lifecycle.LifecycleAdapter; import org.neo4j.logging.Log; import org.neo4j.logging.LogProvider; @@ -41,7 +41,7 @@ /** * A replicator implementation suitable in a RAFT context. Will handle resending due to timeouts and leader switches. */ -public class RaftReplicator extends LifecycleAdapter implements Replicator, Listener +public class RaftReplicator extends LifecycleAdapter implements Replicator, LeaderListener { private final MemberId me; private final Outbound outbound; @@ -53,11 +53,10 @@ public class RaftReplicator extends LifecycleAdapter implements Replicator, List private final TimeoutStrategy leaderTimeoutStrategy; private final Log log; private final Throttler throttler; - private final Clock clock; public RaftReplicator( LeaderLocator leaderLocator, MemberId me, Outbound outbound, LocalSessionPool sessionPool, ProgressTracker progressTracker, TimeoutStrategy progressTimeoutStrategy, TimeoutStrategy leaderTimeoutStrategy, - AvailabilityGuard availabilityGuard, LogProvider logProvider, long replicationLimit, Clock clock ) + AvailabilityGuard availabilityGuard, LogProvider logProvider, long replicationLimit ) { this.me = me; this.outbound = outbound; @@ -68,7 +67,6 @@ public RaftReplicator( LeaderLocator leaderLocator, MemberId me, Outbound replicate0( ReplicatedContent command, boolean trackResul } @Override - public void receive( MemberId leader ) + public void onLeaderSwitch( LeaderInfo leaderInfo ) { progressTracker.triggerReplicationEvent(); } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/machines/id/IdReusabilityCondition.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/machines/id/IdReusabilityCondition.java index 48bdefd7b4b5f..d20640ae445ad 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/machines/id/IdReusabilityCondition.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/machines/id/IdReusabilityCondition.java @@ -21,15 +21,16 @@ import java.util.function.BooleanSupplier; +import org.neo4j.causalclustering.core.consensus.LeaderInfo; +import org.neo4j.causalclustering.core.consensus.LeaderListener; import org.neo4j.causalclustering.core.consensus.RaftMachine; import org.neo4j.causalclustering.identity.MemberId; -import org.neo4j.kernel.impl.util.Listener; /** * Determines whether it is safe to reuse freed ids, based on current leader and tracking its own transactions. * This should guarantee that a single freed id only ends up on a single core. */ -public class IdReusabilityCondition implements BooleanSupplier, Listener +public class IdReusabilityCondition implements BooleanSupplier, LeaderListener { private static final BooleanSupplier ALWAYS_FALSE = () -> false; @@ -54,9 +55,9 @@ public boolean getAsBoolean() } @Override - public void receive( MemberId newLeader ) + public void onLeaderSwitch( LeaderInfo leaderInfo ) { - if ( myself.equals( newLeader ) ) + if ( myself.equals( leaderInfo.memberId() ) ) { // We just became leader currentSupplier = new LeaderIdReusabilityCondition( commandIndexTracker, raftMachine ); diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/CoreTopologyService.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/CoreTopologyService.java index 0da36acbbef50..2aa0f72328575 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/CoreTopologyService.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/CoreTopologyService.java @@ -19,9 +19,9 @@ */ package org.neo4j.causalclustering.discovery; +import org.neo4j.causalclustering.core.consensus.LeaderInfo; import org.neo4j.causalclustering.discovery.procedures.ClusterOverviewProcedure; import org.neo4j.causalclustering.identity.ClusterId; -import org.neo4j.causalclustering.identity.MemberId; /** * Extends upon the topology service with a few extra services, connected to @@ -47,13 +47,10 @@ public interface CoreTopologyService extends TopologyService * Sets or updates the leader memberId for the given database (i.e. Raft consensus group). * This is intended for informational purposes **only**, e.g. in {@link ClusterOverviewProcedure}. * The leadership information should otherwise be communicated via raft as before. - * - * @param memberId The member ID to declare as the new leader + * @param leaderInfo Information about the new leader * @param dbName The database name for which memberId is the new leader - * @param term The raft term for which memberId is the leader of dbName - * */ - void setLeader( MemberId memberId, String dbName, long term ); + void setLeader( LeaderInfo leaderInfo, String dbName ); interface Listener { diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/HazelcastClusterTopology.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/HazelcastClusterTopology.java index c56c702d39993..c3717478deed5 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/HazelcastClusterTopology.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/HazelcastClusterTopology.java @@ -40,13 +40,13 @@ import java.util.stream.Stream; import org.neo4j.causalclustering.core.CausalClusteringSettings; +import org.neo4j.causalclustering.core.consensus.LeaderInfo; import org.neo4j.causalclustering.identity.ClusterId; import org.neo4j.causalclustering.identity.MemberId; import org.neo4j.helpers.AdvertisedSocketAddress; import org.neo4j.kernel.configuration.Config; import org.neo4j.logging.Log; -import static java.util.Collections.emptyList; import static java.util.Collections.emptyMap; import static org.neo4j.causalclustering.core.CausalClusteringSettings.refuse_to_be_leader; import static java.util.stream.Stream.concat; @@ -145,7 +145,7 @@ private static ClusterId getClusterId( HazelcastInstance hazelcastInstance, Stri return uuid != null ? new ClusterId( uuid ) : null; } - public static Set getDBNames( HazelcastInstance hazelcastInstance ) + private static Set getDBNames( HazelcastInstance hazelcastInstance ) { IMap uuidPerDbCluster = hazelcastInstance.getMap( CLUSTER_UUID_DB_NAME_MAP ); return uuidPerDbCluster.keySet(); @@ -211,27 +211,27 @@ private static Map readReplicas( HazelcastInstance haz return result; } - static boolean casLeaders( HazelcastInstance hazelcastInstance, MemberId leader, long term, String dbName ) + static void casLeaders( HazelcastInstance hazelcastInstance, LeaderInfo leaderInfo, String dbName ) { - IAtomicReference leaderRef = hazelcastInstance.getAtomicReference( DB_NAME_LEADER_TERM_PREFIX + dbName ); + IAtomicReference leaderRef = hazelcastInstance.getAtomicReference( DB_NAME_LEADER_TERM_PREFIX + dbName ); - RaftLeader expected = leaderRef.get(); + LeaderInfo expected = leaderRef.get(); - boolean noUpdate = Optional.ofNullable( expected ).map( RaftLeader::memberId ).equals( Optional.ofNullable( leader ) ); + boolean noUpdate = Optional.ofNullable( expected ).map( LeaderInfo::memberId ).equals( Optional.ofNullable( leaderInfo.memberId() ) ); - boolean greaterOrEqualTermExists = Optional.ofNullable( expected ).map(l -> l.term() >= term ).orElse( false ); + boolean greaterOrEqualTermExists = Optional.ofNullable( expected ).map(l -> l.term() >= leaderInfo.term() ).orElse( false ); if ( greaterOrEqualTermExists || noUpdate ) { - return false; + return; } - return leaderRef.compareAndSet( expected, new RaftLeader( leader, term ) ); + leaderRef.compareAndSet( expected, leaderInfo ); } - static Optional getLeaderForDBName( HazelcastInstance hazelcastInstance, String dbName ) + private static Optional getLeaderForDBName( HazelcastInstance hazelcastInstance, String dbName ) { - IAtomicReference leader = hazelcastInstance.getAtomicReference( DB_NAME_LEADER_TERM_PREFIX + dbName ); + IAtomicReference leader = hazelcastInstance.getAtomicReference( DB_NAME_LEADER_TERM_PREFIX + dbName ); return Optional.ofNullable( leader.get() ); } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/HazelcastCoreTopologyService.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/HazelcastCoreTopologyService.java index 672d862aaa634..0938c2990f87e 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/HazelcastCoreTopologyService.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/HazelcastCoreTopologyService.java @@ -38,6 +38,7 @@ import com.hazelcast.core.MembershipListener; import org.neo4j.causalclustering.core.CausalClusteringSettings; +import org.neo4j.causalclustering.core.consensus.LeaderInfo; import org.neo4j.causalclustering.helper.RobustJobSchedulerWrapper; import org.neo4j.causalclustering.identity.ClusterId; import org.neo4j.causalclustering.identity.MemberId; @@ -82,8 +83,7 @@ public class HazelcastCoreTopologyService extends AbstractTopologyService implem private String membershipRegistrationId; private JobScheduler.JobHandle refreshJob; - private volatile MemberId localLeader; - private volatile long term; + private volatile LeaderInfo leaderInfo = LeaderInfo.INITIAL; private volatile HazelcastInstance hazelcastInstance; private volatile ReadReplicaTopology readReplicaTopology = ReadReplicaTopology.EMPTY; private volatile CoreTopology coreTopology = CoreTopology.EMPTY; @@ -105,7 +105,6 @@ protected HazelcastCoreTopologyService( Config config, MemberId myself, JobSched this.refreshPeriod = config.get( CausalClusteringSettings.cluster_topology_refresh ).toMillis(); this.hostnameResolver = hostnameResolver; this.topologyServiceRetryStrategy = topologyServiceRetryStrategy; - this.term = -1L; this.localDBName = config.get( CausalClusteringSettings.database ); } @@ -130,12 +129,11 @@ public boolean setClusterId( ClusterId clusterId, String dbName ) throws Interru } @Override - public void setLeader( MemberId memberId, String dbName, long term ) + public void setLeader( LeaderInfo leaderInfo, String dbName ) { - if ( this.term < term ) + if ( this.leaderInfo.term() < leaderInfo.term() ) { - localLeader = memberId; - this.term = term; + this.leaderInfo = leaderInfo; } } @@ -352,10 +350,10 @@ private Optional retrieveSocketAddress( MemberId member private void refreshRoles() throws InterruptedException { - if ( localLeader != null && localLeader.equals( myself ) ) + if ( leaderInfo != null && leaderInfo.memberId().equals( myself ) ) { waitOnHazelcastInstanceCreation(); - HazelcastClusterTopology.casLeaders( hazelcastInstance, localLeader, term, localDBName ); + HazelcastClusterTopology.casLeaders( hazelcastInstance, leaderInfo, localDBName ); } } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/RaftCoreTopologyConnector.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/RaftCoreTopologyConnector.java index cfd18707d5971..75cd745e5f619 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/RaftCoreTopologyConnector.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/RaftCoreTopologyConnector.java @@ -21,15 +21,16 @@ import java.util.Set; +import org.neo4j.causalclustering.core.consensus.LeaderInfo; +import org.neo4j.causalclustering.core.consensus.LeaderListener; import org.neo4j.causalclustering.core.consensus.RaftMachine; import org.neo4j.causalclustering.identity.MemberId; -import org.neo4j.kernel.impl.util.Listener; import org.neo4j.kernel.lifecycle.LifecycleAdapter; /** * Makes the Raft aware of changes to the core topology and visa versa */ -public class RaftCoreTopologyConnector extends LifecycleAdapter implements CoreTopologyService.Listener, Listener +public class RaftCoreTopologyConnector extends LifecycleAdapter implements CoreTopologyService.Listener, LeaderListener { private final CoreTopologyService coreTopologyService; private final RaftMachine raftMachine; @@ -57,11 +58,9 @@ public synchronized void onCoreTopologyChange( CoreTopology coreTopology ) } @Override - public void receive( MemberId notification ) + public void onLeaderSwitch( LeaderInfo leaderInfo ) { - //TODO: Create LeaderListener interface implementing Listener - //Don't like this pattern as its not clear form the api that receive() is called from raftMachine. - coreTopologyService.setLeader( notification, dbName, raftMachine.term() ); + coreTopologyService.setLeader( leaderInfo, dbName ); } @Override diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/identity/MemberId.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/identity/MemberId.java index e258cee40b36c..1dce6a0ddc630 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/identity/MemberId.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/identity/MemberId.java @@ -20,6 +20,7 @@ package org.neo4j.causalclustering.identity; import java.io.IOException; +import java.io.Serializable; import java.util.Objects; import java.util.UUID; @@ -29,7 +30,8 @@ import static java.lang.String.format; -public class MemberId +// TODO: basics for Serializable +public class MemberId implements Serializable { private final UUID uuid; private final String shortName; diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/election/ElectionUtil.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/election/ElectionUtil.java index 6597ed0dc967d..484fd69eb1a01 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/election/ElectionUtil.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/election/ElectionUtil.java @@ -28,10 +28,10 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import org.neo4j.causalclustering.core.consensus.LeaderListener; import org.neo4j.causalclustering.core.consensus.RaftMachine; import org.neo4j.causalclustering.identity.MemberId; import org.neo4j.helpers.collection.Iterables; -import org.neo4j.kernel.impl.util.Listener; public class ElectionUtil { @@ -74,24 +74,24 @@ private static Runnable leaderViewUpdatingListener( RaftMachine raft, Iterable leaderViews, long viewCount, CompletableFuture futureAgreedLeader ) { - Listener listener = newLeader -> + LeaderListener listener = newLeader -> { synchronized ( leaderViews ) { - leaderViews.put( raft.identity(), newLeader ); + leaderViews.put( raft.identity(), newLeader.memberId() ); boolean leaderIsValid = false; for ( RaftMachine validRaft : validRafts ) { - if ( validRaft.identity().equals( newLeader ) ) + if ( validRaft.identity().equals( newLeader.memberId() ) ) { leaderIsValid = true; } } - if ( newLeader != null && leaderIsValid && allAgreeOnLeader( leaderViews, viewCount, newLeader ) ) + if ( newLeader.memberId() != null && leaderIsValid && allAgreeOnLeader( leaderViews, viewCount, newLeader.memberId() ) ) { - futureAgreedLeader.complete( newLeader ); + futureAgreedLeader.complete( newLeader.memberId() ); } } }; diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/explorer/ComparableRaftState.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/explorer/ComparableRaftState.java index 382e221143efb..2929ec2517140 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/explorer/ComparableRaftState.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/explorer/ComparableRaftState.java @@ -24,6 +24,7 @@ import java.util.Objects; import java.util.Set; +import org.neo4j.causalclustering.core.consensus.LeaderInfo; import org.neo4j.causalclustering.core.consensus.log.cache.ConsecutiveInFlightCache; import org.neo4j.causalclustering.core.consensus.log.cache.InFlightCache; import org.neo4j.causalclustering.core.consensus.log.RaftLog; @@ -47,6 +48,7 @@ public class ComparableRaftState implements ReadableRaftState private final Log log; protected long term; protected MemberId leader; + private LeaderInfo leaderInfo = LeaderInfo.INITIAL; private long leaderCommit = -1; private MemberId votedFor; private Set votesForMe = new HashSet<>(); @@ -108,6 +110,12 @@ public MemberId leader() return leader; } + @Override + public LeaderInfo leaderInfo() + { + return leaderInfo; + } + @Override public long leaderCommit() { diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/replication/RaftReplicatorTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/replication/RaftReplicatorTest.java index 6743e99b03ca2..9ea605c0eecdb 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/replication/RaftReplicatorTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/replication/RaftReplicatorTest.java @@ -24,7 +24,6 @@ import org.junit.Test; import org.junit.rules.ExpectedException; -import java.time.Clock; import java.util.UUID; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; @@ -84,8 +83,7 @@ public void shouldSendReplicatedContentToLeader() throws Exception RaftReplicator replicator = new RaftReplicator( leaderLocator, myself, outbound, sessionPool, - capturedProgress, noWaitTimeoutStrategy, noWaitTimeoutStrategy, availabilityGuard, NullLogProvider.getInstance(), replicationLimit, - Clock.systemUTC() ); + capturedProgress, noWaitTimeoutStrategy, noWaitTimeoutStrategy, availabilityGuard, NullLogProvider.getInstance(), replicationLimit ); ReplicatedInteger content = ReplicatedInteger.valueOf( 5 ); Thread replicatingThread = replicatingThread( replicator, content, false ); @@ -112,8 +110,7 @@ public void shouldResendAfterTimeout() throws Exception CapturingOutbound outbound = new CapturingOutbound<>(); RaftReplicator replicator = new RaftReplicator( leaderLocator, myself, outbound, - sessionPool, capturedProgress, noWaitTimeoutStrategy, noWaitTimeoutStrategy, availabilityGuard, NullLogProvider.getInstance(), replicationLimit, - Clock.systemUTC() ); + sessionPool, capturedProgress, noWaitTimeoutStrategy, noWaitTimeoutStrategy, availabilityGuard, NullLogProvider.getInstance(), replicationLimit ); ReplicatedInteger content = ReplicatedInteger.valueOf( 5 ); Thread replicatingThread = replicatingThread( replicator, content, false ); @@ -143,7 +140,7 @@ public void shouldRetryGettingLeader() throws Exception RaftReplicator replicator = new RaftReplicator( leaderLocator, myself, outbound, sessionPool, capturedProgress, noWaitTimeoutStrategy, new SpyRetryStrategy( leaderRetries ), - availabilityGuard, NullLogProvider.getInstance(), replicationLimit, Clock.systemUTC() ); + availabilityGuard, NullLogProvider.getInstance(), replicationLimit ); ReplicatedInteger content = ReplicatedInteger.valueOf( 5 ); Thread replicatingThread = replicatingThread( replicator, content, false ); @@ -168,8 +165,7 @@ public void shouldReleaseSessionWhenFinished() throws Exception CapturingOutbound outbound = new CapturingOutbound<>(); RaftReplicator replicator = new RaftReplicator( leaderLocator, myself, outbound, - sessionPool, capturedProgress, noWaitTimeoutStrategy, noWaitTimeoutStrategy, availabilityGuard, NullLogProvider.getInstance(), replicationLimit, - Clock.systemUTC() ); + sessionPool, capturedProgress, noWaitTimeoutStrategy, noWaitTimeoutStrategy, availabilityGuard, NullLogProvider.getInstance(), replicationLimit ); ReplicatedInteger content = ReplicatedInteger.valueOf( 5 ); Thread replicatingThread = replicatingThread( replicator, content, true ); @@ -200,7 +196,7 @@ public void stopReplicationOnShutdown() throws NoLeaderFoundException, Interrupt RaftReplicator replicator = new RaftReplicator( leaderLocator, myself, outbound, sessionPool, capturedProgress, noWaitTimeoutStrategy, noWaitTimeoutStrategy, - availabilityGuard, NullLogProvider.getInstance(), replicationLimit, Clock.systemUTC() ); + availabilityGuard, NullLogProvider.getInstance(), replicationLimit ); ReplicatedInteger content = ReplicatedInteger.valueOf( 5 ); ReplicatingThread replicatingThread = replicatingThread( replicator, content, true ); diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/state/machines/id/IdReusabilityConditionTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/state/machines/id/IdReusabilityConditionTest.java index 81f63a05df7e6..f51ca6e52d962 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/state/machines/id/IdReusabilityConditionTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/state/machines/id/IdReusabilityConditionTest.java @@ -24,6 +24,7 @@ import java.util.UUID; +import org.neo4j.causalclustering.core.consensus.LeaderInfo; import org.neo4j.causalclustering.core.consensus.RaftMachine; import org.neo4j.causalclustering.core.consensus.state.ExposedRaftState; import org.neo4j.causalclustering.identity.MemberId; @@ -62,7 +63,7 @@ public void shouldNeverReuseWhenNotLeader() { MemberId someoneElse = new MemberId( UUID.randomUUID() ); - idReusabilityCondition.receive( someoneElse ); + idReusabilityCondition.onLeaderSwitch( new LeaderInfo( someoneElse, 1 )); assertFalse( idReusabilityCondition.getAsBoolean() ); } @@ -74,7 +75,7 @@ public void shouldNotReturnTrueWithPendingTransactions() when( commandIndexTracker.getAppliedCommandIndex() ).thenReturn( 2L ); // gap-free when( state.lastLogIndexBeforeWeBecameLeader() ).thenReturn( 5L ); - idReusabilityCondition.receive( myself ); + idReusabilityCondition.onLeaderSwitch( new LeaderInfo( myself, 1 ) ); assertFalse( idReusabilityCondition.getAsBoolean() ); assertFalse( idReusabilityCondition.getAsBoolean() ); @@ -92,7 +93,7 @@ public void shouldOnlyReturnTrueWhenOldTransactionsBeenApplied() when( commandIndexTracker.getAppliedCommandIndex() ).thenReturn( 2L, 5L, 6L ); // gap-free when( state.lastLogIndexBeforeWeBecameLeader() ).thenReturn( 5L ); - idReusabilityCondition.receive( myself ); + idReusabilityCondition.onLeaderSwitch( new LeaderInfo( myself, 1 ) ); assertFalse( idReusabilityCondition.getAsBoolean() ); assertFalse( idReusabilityCondition.getAsBoolean() ); @@ -110,14 +111,14 @@ public void shouldNotReuseIfReelection() when( commandIndexTracker.getAppliedCommandIndex() ).thenReturn( 2L, 5L, 6L ); // gap-free when( state.lastLogIndexBeforeWeBecameLeader() ).thenReturn( 5L ); - idReusabilityCondition.receive( myself ); + idReusabilityCondition.onLeaderSwitch( new LeaderInfo( myself, 1 ) ); assertFalse( idReusabilityCondition.getAsBoolean() ); assertFalse( idReusabilityCondition.getAsBoolean() ); assertTrue( idReusabilityCondition.getAsBoolean() ); MemberId someoneElse = new MemberId( UUID.randomUUID() ); - idReusabilityCondition.receive( someoneElse ); + idReusabilityCondition.onLeaderSwitch( new LeaderInfo( someoneElse, 1 ) ); assertFalse( idReusabilityCondition.getAsBoolean() ); } diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/state/machines/id/ReplicatedIdGeneratorTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/state/machines/id/ReplicatedIdGeneratorTest.java index a8473e0d92045..821eedecdc9fa 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/state/machines/id/ReplicatedIdGeneratorTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/state/machines/id/ReplicatedIdGeneratorTest.java @@ -29,6 +29,7 @@ import java.util.Set; import java.util.UUID; +import org.neo4j.causalclustering.core.consensus.LeaderInfo; import org.neo4j.causalclustering.core.consensus.RaftMachine; import org.neo4j.causalclustering.core.consensus.state.ExposedRaftState; import org.neo4j.causalclustering.identity.MemberId; @@ -165,7 +166,7 @@ public void shouldReuseIdOnlyWhenLeader() when( commandIndexTracker.getAppliedCommandIndex() ).thenReturn( 6L ); // gap-free when( state.lastLogIndexBeforeWeBecameLeader() ).thenReturn( 5L ); - idReusabilityCondition.receive( myself ); + idReusabilityCondition.onLeaderSwitch( new LeaderInfo( myself, 1 ) ); idGenerator.freeId( 10 ); assertEquals( 1, idGenerator.getDefragCount() ); @@ -210,7 +211,7 @@ public void freeIdOnlyWhenReusabilityConditionAllows() when( commandIndexTracker.getAppliedCommandIndex() ).thenReturn( 4L, 6L ); // gap-free when( state.lastLogIndexBeforeWeBecameLeader() ).thenReturn( 5L ); - idReusabilityCondition.receive( myself ); + idReusabilityCondition.onLeaderSwitch( new LeaderInfo( myself, 1 ) ); assertEquals( 24, idGenerator.nextId() ); idGenerator.freeId( 11 ); diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/state/snapshot/CoreStateDownloaderServiceTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/state/snapshot/CoreStateDownloaderServiceTest.java index b56aa01dc875b..5ff0ae4eb4dea 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/state/snapshot/CoreStateDownloaderServiceTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/state/snapshot/CoreStateDownloaderServiceTest.java @@ -30,6 +30,7 @@ import java.util.concurrent.atomic.AtomicInteger; import org.neo4j.causalclustering.catchup.CatchupAddressProvider; +import org.neo4j.causalclustering.core.consensus.LeaderListener; import org.neo4j.causalclustering.core.consensus.LeaderLocator; import org.neo4j.causalclustering.core.consensus.NoLeaderFoundException; import org.neo4j.causalclustering.core.state.CommandApplicationProcess; @@ -37,7 +38,6 @@ import org.neo4j.function.Predicates; import org.neo4j.helpers.AdvertisedSocketAddress; import org.neo4j.kernel.impl.util.CountingJobScheduler; -import org.neo4j.kernel.impl.util.Listener; import org.neo4j.kernel.impl.util.Neo4jJobScheduler; import org.neo4j.logging.Log; import org.neo4j.logging.LogProvider; @@ -149,13 +149,13 @@ public MemberId getLeader() throws NoLeaderFoundException } @Override - public void registerListener( Listener listener ) + public void registerListener( LeaderListener listener ) { // do nothing } @Override - public void unregisterListener( Listener listener ) + public void unregisterListener( LeaderListener listener ) { // do nothing } diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/HazelcastClusterTopologyTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/HazelcastClusterTopologyTest.java index e64fbcd74015f..97a3a64e1a486 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/HazelcastClusterTopologyTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/HazelcastClusterTopologyTest.java @@ -20,7 +20,6 @@ package org.neo4j.causalclustering.discovery; import com.hazelcast.client.impl.MemberImpl; -import com.hazelcast.concurrent.atomicreference.AtomicReferenceProxy; import com.hazelcast.core.HazelcastInstance; import com.hazelcast.core.IAtomicReference; import com.hazelcast.core.IMap; @@ -32,7 +31,6 @@ import org.junit.Test; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -46,6 +44,7 @@ import java.util.stream.Stream; import org.neo4j.causalclustering.core.CausalClusteringSettings; +import org.neo4j.causalclustering.core.consensus.LeaderInfo; import org.neo4j.causalclustering.helpers.CausalClusteringTestHelpers; import org.neo4j.causalclustering.identity.MemberId; import org.neo4j.helpers.AdvertisedSocketAddress; @@ -245,14 +244,14 @@ public void shouldCorrectlyReturnCoreMemberRoles() .mapToObj( ignored -> new MemberId( UUID.randomUUID() ) ).collect( Collectors.toList() ); @SuppressWarnings( "unchecked" ) - IAtomicReference leaderRef = mock( IAtomicReference.class ); + IAtomicReference leaderRef = mock( IAtomicReference.class ); MemberId chosenLeaderId = members.get( 0 ); - when( leaderRef.get() ).thenReturn( new RaftLeader( chosenLeaderId, 0L ) ); + when( leaderRef.get() ).thenReturn( new LeaderInfo( chosenLeaderId, 0L ) ); @SuppressWarnings( "unchecked" ) IMap uuidDBMap = mock( IMap.class ); when( uuidDBMap.keySet() ).thenReturn( Collections.singleton( DEFAULT_DB_NAME ) ); - when( hzInstance.getAtomicReference( startsWith( DB_NAME_LEADER_TERM_PREFIX ) ) ).thenReturn( leaderRef ); + when( hzInstance.getAtomicReference( startsWith( DB_NAME_LEADER_TERM_PREFIX ) ) ).thenReturn( leaderRef ); when( hzInstance.getMap( eq( CLUSTER_UUID_DB_NAME_MAP ) ) ).thenReturn( uuidDBMap ); // when diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/SharedDiscoveryCoreClient.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/SharedDiscoveryCoreClient.java index 116c480b78fcf..d3f4567854574 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/SharedDiscoveryCoreClient.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/SharedDiscoveryCoreClient.java @@ -23,6 +23,7 @@ import java.util.Optional; import org.neo4j.causalclustering.core.CausalClusteringSettings; +import org.neo4j.causalclustering.core.consensus.LeaderInfo; import org.neo4j.causalclustering.identity.ClusterId; import org.neo4j.causalclustering.identity.MemberId; import org.neo4j.helpers.AdvertisedSocketAddress; @@ -40,9 +41,7 @@ class SharedDiscoveryCoreClient extends AbstractTopologyService implements CoreT private final boolean refusesToBeLeader; private final String localDBName; - private MemberId localLeader; - private long term; - + private volatile LeaderInfo leaderInfo = LeaderInfo.INITIAL; private volatile CoreTopology coreTopology; private volatile ReadReplicaTopology readReplicaTopology; @@ -55,7 +54,6 @@ class SharedDiscoveryCoreClient extends AbstractTopologyService implements CoreT this.coreServerInfo = extractCoreServerInfo( config ); this.log = logProvider.getLog( getClass() ); this.refusesToBeLeader = config.get( CausalClusteringSettings.refuse_to_be_leader ); - this.term = -1L; this.localDBName = config.get( CausalClusteringSettings.database ); } @@ -91,13 +89,12 @@ public Map allCoreRoles() } @Override - public void setLeader( MemberId memberId, String dbName, long term ) + public void setLeader( LeaderInfo newLeader, String dbName ) { - if ( this.term < term && memberId != null ) + if ( this.leaderInfo.term() < newLeader.term() && newLeader.memberId() != null ) { - localLeader = memberId; - this.term = term; - sharedDiscoveryService.casLeaders( localLeader, term, localDBName ); + this.leaderInfo = newLeader; + sharedDiscoveryService.casLeaders( newLeader, localDBName ); } } @@ -193,6 +190,6 @@ public boolean refusesToBeLeader() public String toString() { return "SharedDiscoveryCoreClient{" + "myself=" + myself + ", coreServerInfo=" + coreServerInfo + ", refusesToBeLeader=" + refusesToBeLeader + - ", localDBName='" + localDBName + '\'' + ", localLeader=" + localLeader + ", term=" + term + ", coreTopology=" + coreTopology + '}'; + ", localDBName='" + localDBName + '\'' + ", leaderInfo=" + leaderInfo + ", coreTopology=" + coreTopology + '}'; } } diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/SharedDiscoveryService.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/SharedDiscoveryService.java index c4d37f948e9c0..b04c66893dbe4 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/SharedDiscoveryService.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/SharedDiscoveryService.java @@ -31,6 +31,7 @@ import java.util.stream.Collectors; import java.util.stream.Stream; +import org.neo4j.causalclustering.core.consensus.LeaderInfo; import org.neo4j.causalclustering.identity.ClusterId; import org.neo4j.causalclustering.identity.MemberId; @@ -42,7 +43,7 @@ public class SharedDiscoveryService private final ConcurrentMap readReplicas; private final Set listeningClients; private final ConcurrentMap clusterIdDbNames; - private final ConcurrentMap leaderMap; + private final ConcurrentMap leaderMap; private final CountDownLatch enoughMembers; SharedDiscoveryService() @@ -125,19 +126,19 @@ void unRegisterReadReplica( SharedDiscoveryReadReplicaClient client ) notifyCoreClients(); } - synchronized void casLeaders( MemberId leader, long term, String dbName ) + synchronized void casLeaders( LeaderInfo leaderInfo, String dbName ) { - Optional current = Optional.ofNullable( leaderMap.get( dbName ) ); + Optional current = Optional.ofNullable( leaderMap.get( dbName ) ); - boolean noUpdate = current.map( RaftLeader::memberId ).equals( Optional.ofNullable( leader ) ); + boolean noUpdate = current.map( LeaderInfo::memberId ).equals( Optional.ofNullable( leaderInfo.memberId() ) ); - boolean greaterOrEqualTermExists = current.map(l -> l.term() >= term ).orElse( false ); + boolean greaterOrEqualTermExists = current.map(l -> l.term() >= leaderInfo.term() ).orElse( false ); boolean success = !( greaterOrEqualTermExists || noUpdate ); if ( success ) { - leaderMap.put( dbName, new RaftLeader( leader, term ) ); + leaderMap.put( dbName, leaderInfo ); } } @@ -161,7 +162,7 @@ Map getCoreRoles() .map( dbName -> Optional.ofNullable( leaderMap.get( dbName ) ) ) .filter( Optional::isPresent ) .map( Optional::get ) - .map( RaftLeader::memberId ) + .map( LeaderInfo::memberId ) .collect( Collectors.toSet()); Function roleMapper = m -> allLeaders.contains( m ) ? RoleInfo.LEADER : RoleInfo.FOLLOWER;