diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/EmptyStateMachine.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/EmptyStateMachine.java new file mode 100644 index 000000000000..b6c73a2afa1e --- /dev/null +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/EmptyStateMachine.java @@ -0,0 +1,40 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.coreedge.raft; + +import org.neo4j.coreedge.server.CoreMember; + +class EmptyStateMachine implements RaftStateMachine +{ + @Override + public void notifyCommitted( long commitIndex ) + { + } + + @Override + public void notifyNeedFreshSnapshot() + { + } + + @Override + public void downloadSnapshot( CoreMember from ) + { + } +} diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/LeaderLocator.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/LeaderLocator.java index 26327dd0eca9..b1c502b3d1d2 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/LeaderLocator.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/LeaderLocator.java @@ -26,7 +26,7 @@ public interface LeaderLocator { CoreMember getLeader() throws NoLeaderFoundException; - void registerListener( Listener listener ); + void registerListener( Listener listener ); - void unregisterListener( Listener listener ); + void unregisterListener( Listener listener ); } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/RaftInstance.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/RaftInstance.java index a9fc006e6198..e39260d0ef53 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/RaftInstance.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/RaftInstance.java @@ -27,8 +27,6 @@ import java.util.function.Predicate; import java.util.function.Supplier; -import org.neo4j.coreedge.catchup.storecopy.LocalDatabase; -import org.neo4j.coreedge.discovery.CoreTopologyService; import org.neo4j.coreedge.helper.VolatileFuture; import org.neo4j.coreedge.raft.log.RaftLog; import org.neo4j.coreedge.raft.log.RaftLogEntry; @@ -47,9 +45,6 @@ import org.neo4j.coreedge.raft.state.term.TermState; import org.neo4j.coreedge.raft.state.vote.VoteState; import org.neo4j.coreedge.server.CoreMember; -import org.neo4j.coreedge.server.core.LeaderOnlySelectionStrategy; -import org.neo4j.coreedge.server.core.NotMyselfSelectionStrategy; -import org.neo4j.coreedge.server.edge.CoreServerSelectionStrategy; import org.neo4j.kernel.impl.util.Listener; import org.neo4j.kernel.internal.DatabaseHealth; import org.neo4j.kernel.monitoring.Monitors; @@ -101,10 +96,8 @@ public enum Timeouts implements RenewableTimeoutService.TimeoutName private final long electionTimeout; private final Supplier databaseHealthSupplier; - private final LocalDatabase localDatabase; private final VolatileFuture volatileLeader = new VolatileFuture<>( null ); - private final CoreServerSelectionStrategy defaultStrategy; private final Outbound outbound; private final Log log; private Role currentRole = Role.FOLLOWER; @@ -112,16 +105,15 @@ public enum Timeouts implements RenewableTimeoutService.TimeoutName private RaftLogShippingManager logShipping; public RaftInstance( CoreMember myself, StateStorage termStorage, - StateStorage voteStorage, RaftLog entryLog, - RaftStateMachine raftStateMachine, long electionTimeout, long heartbeatInterval, - RenewableTimeoutService renewableTimeoutService, - CoreTopologyService discoveryService, - Outbound outbound, - LogProvider logProvider, RaftMembershipManager membershipManager, - RaftLogShippingManager logShipping, - Supplier databaseHealthSupplier, - InFlightMap inFlightMap, - Monitors monitors, LocalDatabase localDatabase ) + StateStorage voteStorage, RaftLog entryLog, + RaftStateMachine raftStateMachine, long electionTimeout, long heartbeatInterval, + RenewableTimeoutService renewableTimeoutService, + Outbound outbound, + LogProvider logProvider, RaftMembershipManager membershipManager, + RaftLogShippingManager logShipping, + Supplier databaseHealthSupplier, + InFlightMap inFlightMap, + Monitors monitors ) { this.myself = myself; this.entryLog = entryLog; @@ -130,12 +122,10 @@ public RaftInstance( CoreMember myself, StateStorage termStorage, this.heartbeatInterval = heartbeatInterval; this.renewableTimeoutService = renewableTimeoutService; - this.defaultStrategy = new NotMyselfSelectionStrategy( discoveryService, myself ); this.outbound = outbound; this.logShipping = logShipping; this.databaseHealthSupplier = databaseHealthSupplier; - this.localDatabase = localDatabase; this.log = logProvider.getLog( getClass() ); this.membershipManager = membershipManager; @@ -212,7 +202,7 @@ public CoreMember getLeader() throws NoLeaderFoundException return waitForLeader( 0, member -> member != null ); } - private CoreMember waitForLeader( long timeoutMillis, Predicate predicate ) throws NoLeaderFoundException + private CoreMember waitForLeader( long timeoutMillis, Predicate predicate ) throws NoLeaderFoundException { try { @@ -232,10 +222,10 @@ private CoreMember waitForLeader( long timeoutMillis, Predicate predicate ) thro } } - private Collection leaderListeners = new ArrayList<>(); + private Collection> leaderListeners = new ArrayList<>(); @Override - public synchronized void registerListener( Listener listener ) + public synchronized void registerListener( Listener listener ) { leaderListeners.add( listener ); listener.receive( state.leader() ); @@ -256,16 +246,13 @@ private void checkForSnapshotNeed( Outcome outcome ) { if ( outcome.needsFreshSnapshot() ) { - CoreServerSelectionStrategy strategy = outcome.isProcessable() - ? defaultStrategy - : new LeaderOnlySelectionStrategy( outcome ); - raftStateMachine.notifyNeedFreshSnapshot( strategy ); + raftStateMachine.notifyNeedFreshSnapshot(); } } private void notifyLeaderChanges( Outcome outcome ) { - for ( Listener listener : leaderListeners ) + for ( Listener listener : leaderListeners ) { listener.receive( outcome.getLeader() ); } @@ -314,7 +301,7 @@ public synchronized void handle( RaftMessages.RaftMessage incomingMessage ) { try { - Outcome outcome = currentRole.handler.handle( incomingMessage, state, log, localDatabase ); + Outcome outcome = currentRole.handler.handle( incomingMessage, state, log ); boolean newLeaderWasElected = leaderChanged( outcome, state.leader() ); boolean newCommittedEntry = outcome.getCommitIndex() > state.commitIndex(); @@ -410,7 +397,7 @@ public String toString() public static class BootstrapException extends Exception { - public BootstrapException( Throwable cause ) + BootstrapException( Throwable cause ) { super( cause ); } @@ -426,12 +413,12 @@ private long randomTimeoutRange() return electionTimeout; } - public Set votingMembers() + public Set votingMembers() { return membershipManager.votingMembers(); } - public Set replicationMembers() + public Set replicationMembers() { return membershipManager.replicationMembers(); } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/RaftMessageHandler.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/RaftMessageHandler.java index 4d0357ccdcfe..de28268cb854 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/RaftMessageHandler.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/RaftMessageHandler.java @@ -21,15 +21,11 @@ import java.io.IOException; -import org.neo4j.coreedge.catchup.storecopy.LocalDatabase; import org.neo4j.coreedge.raft.outcome.Outcome; -import org.neo4j.coreedge.raft.state.RaftState; import org.neo4j.coreedge.raft.state.ReadableRaftState; -import org.neo4j.kernel.impl.store.StoreId; import org.neo4j.logging.Log; public interface RaftMessageHandler { - Outcome handle( RaftMessages.RaftMessage message, ReadableRaftState context, Log log, LocalDatabase localDatabase ) - throws IOException; + Outcome handle( RaftMessages.RaftMessage message, ReadableRaftState context, Log log ) throws IOException; } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/RaftServer.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/RaftServer.java index 4dccf00e499f..84ca7cecb8a5 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/RaftServer.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/RaftServer.java @@ -148,7 +148,7 @@ protected void channelRead0( ChannelHandlerContext channelHandlerContext, { if ( localDatabase.isEmpty() ) { - raftStateMachine.notifyNeedFreshSnapshot( message::from ); + raftStateMachine.downloadSnapshot( message.from() ); } else { diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/RaftStateMachine.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/RaftStateMachine.java index 13fbedafdd07..791edb59b158 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/RaftStateMachine.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/RaftStateMachine.java @@ -19,7 +19,7 @@ */ package org.neo4j.coreedge.raft; -import org.neo4j.coreedge.server.edge.CoreServerSelectionStrategy; +import org.neo4j.coreedge.server.CoreMember; /** * The RAFT external entity that is interested in log entries and @@ -30,14 +30,15 @@ public interface RaftStateMachine /** * Called when the highest committed index increases. */ - default void notifyCommitted( long commitIndex ) {} + void notifyCommitted( long commitIndex ); /** * Download and install a snapshot of state from another member of the cluster. *

* Called when the consensus system no longer has the log entries required to * further update the state machine, because they have been deleted through pruning. - * @param strategy the strategy on how to pick a core to download from */ - default void notifyNeedFreshSnapshot( CoreServerSelectionStrategy strategy ) {} + void notifyNeedFreshSnapshot(); + + void downloadSnapshot( CoreMember from ); } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/membership/RaftMembershipManager.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/membership/RaftMembershipManager.java index c8150a4732ca..026565c8e782 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/membership/RaftMembershipManager.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/membership/RaftMembershipManager.java @@ -26,7 +26,6 @@ import java.util.HashSet; import java.util.Set; -import org.neo4j.coreedge.catchup.storecopy.LocalDatabase; import org.neo4j.coreedge.raft.log.RaftLog; import org.neo4j.coreedge.raft.log.RaftLogCursor; import org.neo4j.coreedge.raft.log.ReadableRaftLog; @@ -68,22 +67,19 @@ public class RaftMembershipManager implements RaftMembership, MembershipDriver private final Log log; private final int expectedClusterSize; private final StateStorage stateStorage; - private final LocalDatabase localDatabase; private final RaftMembershipState raftMembershipState; private long lastApplied = -1; public RaftMembershipManager( SendToMyself replicator, RaftGroup.Builder memberSetBuilder, RaftLog entryLog, - LogProvider logProvider, int expectedClusterSize, long electionTimeout, - Clock clock, long catchupTimeout, - StateStorage stateStorage, - LocalDatabase localDatabase) + LogProvider logProvider, int expectedClusterSize, long electionTimeout, + Clock clock, long catchupTimeout, + StateStorage stateStorage ) { this.replicator = replicator; this.memberSetBuilder = memberSetBuilder; this.entryLog = entryLog; this.expectedClusterSize = expectedClusterSize; this.stateStorage = stateStorage; - this.localDatabase = localDatabase; this.raftMembershipState = stateStorage.getInitialState(); this.log = logProvider.getLog( getClass() ); @@ -97,7 +93,7 @@ public void processLog( long commitIndex, Collection logCommands ) t { if ( logCommand instanceof TruncateLogCommand ) { - onTruncated(commitIndex); + onTruncated( commitIndex ); } if ( logCommand instanceof AppendLogEntry ) { @@ -201,9 +197,9 @@ private Pair> findLastMembershipEntry() throws IOExce { Pair> lastMembershipEntry = null; long index = 0; - try( RaftLogCursor cursor = entryLog.getEntryCursor( index ) ) + try ( RaftLogCursor cursor = entryLog.getEntryCursor( index ) ) { - while( cursor.next() ) + while ( cursor.next() ) { ReplicatedContent content = cursor.get().content(); if ( content instanceof RaftGroup ) diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/outcome/Outcome.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/outcome/Outcome.java index 8c8093bc2198..bf58733b5cce 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/outcome/Outcome.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/outcome/Outcome.java @@ -54,8 +54,6 @@ public class Outcome implements Message private long commitIndex; - private boolean processable; - /* Follower */ private CoreMember votedFor; private boolean renewElectionTimeout; @@ -66,7 +64,7 @@ public class Outcome implements Message private long lastLogIndexBeforeWeBecameLeader; /* Leader */ - private FollowerStates followerStates; + private FollowerStates followerStates; private Collection shipCommands = new ArrayList<>(); private boolean electedLeader; private boolean steppingDown; @@ -78,7 +76,7 @@ public Outcome( Role currentRole, ReadableRaftState ctx ) public Outcome( Role nextRole, long term, CoreMember leader, long leaderCommit, CoreMember votedFor, Set votesForMe, long lastLogIndexBeforeWeBecameLeader, - FollowerStates followerStates, boolean renewElectionTimeout, + FollowerStates followerStates, boolean renewElectionTimeout, Collection logCommands, Collection outgoingMessages, Collection shipCommands, long commitIndex ) { @@ -110,7 +108,6 @@ private void defaults( Role currentRole, ReadableRaftState ctx ) votedFor = ctx.votedFor(); renewElectionTimeout = false; needsFreshSnapshot = false; - processable = true; votesForMe = (currentRole == Role.CANDIDATE) ? new HashSet<>( ctx.votesForMe() ) : new HashSet<>(); @@ -165,16 +162,6 @@ public void markNeedForFreshSnapshot() this.needsFreshSnapshot = true; } - public void markUnprocessable() - { - this.processable = false; - } - - public boolean isProcessable() - { - return processable; - } - public void addVoteForMe( CoreMember voteFrom ) { this.votesForMe.add( voteFrom ); @@ -185,7 +172,7 @@ public void setLastLogIndexBeforeWeBecameLeader( long lastLogIndexBeforeWeBecame this.lastLogIndexBeforeWeBecameLeader = lastLogIndexBeforeWeBecameLeader; } - public void replaceFollowerStates( FollowerStates followerStates ) + public void replaceFollowerStates( FollowerStates followerStates ) { this.followerStates = followerStates; } @@ -274,7 +261,7 @@ public boolean needsFreshSnapshot() return needsFreshSnapshot; } - public Set getVotesForMe() + public Set getVotesForMe() { return votesForMe; } @@ -284,7 +271,7 @@ public long getLastLogIndexBeforeWeBecameLeader() return lastLogIndexBeforeWeBecameLeader; } - public FollowerStates getFollowerStates() + public FollowerStates getFollowerStates() { return followerStates; } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Appending.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Appending.java index 9344e5c43ad8..ec93990755be 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Appending.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Appending.java @@ -31,13 +31,11 @@ import org.neo4j.coreedge.raft.outcome.TruncateLogCommand; import org.neo4j.coreedge.raft.replication.ReplicatedContent; import org.neo4j.coreedge.raft.state.ReadableRaftState; -import org.neo4j.kernel.impl.store.StoreId; public class Appending { - public static void handleAppendEntriesRequest( - ReadableRaftState state, Outcome outcome, RaftMessages.AppendEntries.Request request, StoreId localStoreId ) - throws IOException + static void handleAppendEntriesRequest( ReadableRaftState state, Outcome outcome, + RaftMessages.AppendEntries.Request request ) throws IOException { if ( request.leaderTerm() < state.term() ) { @@ -113,8 +111,7 @@ else if ( logTerm != request.entries()[offset].term() ) } } - public static void appendNewEntry( ReadableRaftState ctx, Outcome outcome, ReplicatedContent - content ) throws IOException + static void appendNewEntry( ReadableRaftState ctx, Outcome outcome, ReplicatedContent content ) throws IOException { long prevLogIndex = ctx.entryLog().appendIndex(); long prevLogTerm = prevLogIndex == -1 ? -1 : @@ -128,7 +125,7 @@ public static void appendNewEntry( ReadableRaftState ctx, Outcome outcome, Repl outcome.addLogCommand( new AppendLogEntry( prevLogIndex + 1, newLogEntry ) ); } - public static void appendNewEntries( ReadableRaftState ctx, Outcome outcome, + static void appendNewEntries( ReadableRaftState ctx, Outcome outcome, List contents ) throws IOException { long prevLogIndex = ctx.entryLog().appendIndex(); diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Candidate.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Candidate.java index b10f02c74bef..90fc1501a54d 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Candidate.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Candidate.java @@ -21,7 +21,6 @@ import java.io.IOException; -import org.neo4j.coreedge.catchup.storecopy.LocalDatabase; import org.neo4j.coreedge.raft.NewLeaderBarrier; import org.neo4j.coreedge.raft.RaftMessageHandler; import org.neo4j.coreedge.raft.RaftMessages; @@ -34,11 +33,10 @@ import static org.neo4j.coreedge.raft.roles.Role.FOLLOWER; import static org.neo4j.coreedge.raft.roles.Role.LEADER; -public class Candidate implements RaftMessageHandler +class Candidate implements RaftMessageHandler { @Override - public Outcome handle( RaftMessages.RaftMessage message, ReadableRaftState ctx, - Log log, LocalDatabase localDatabase ) throws IOException + public Outcome handle( RaftMessages.RaftMessage message, ReadableRaftState ctx, Log log ) throws IOException { Outcome outcome = new Outcome( CANDIDATE, ctx ); @@ -77,7 +75,7 @@ public Outcome handle( RaftMessages.RaftMessage message, ReadableRaftState ctx, outcome.setNextRole( FOLLOWER ); log.info( "Moving to FOLLOWER state after receiving append entries request from %s at term %d (i am at %d)n", req.from(), req.leaderTerm(), ctx.term() ); - Appending.handleAppendEntriesRequest( ctx, outcome, req, localDatabase.storeId() ); + Appending.handleAppendEntriesRequest( ctx, outcome, req ); break; } @@ -127,7 +125,7 @@ else if ( res.term() < ctx.term() || !res.voteGranted() ) outcome.setNextRole( FOLLOWER ); log.info( "Moving to FOLLOWER state after receiving vote request from %s at term %d (i am at %d)", req.from(), req.term(), ctx.term() ); - Voting.handleVoteRequest( ctx, outcome, req, localDatabase.storeId() ); + Voting.handleVoteRequest( ctx, outcome, req ); break; } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Follower.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Follower.java index b6e0f9d28018..5c6afccef191 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Follower.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Follower.java @@ -21,7 +21,6 @@ import java.io.IOException; -import org.neo4j.coreedge.catchup.storecopy.LocalDatabase; import org.neo4j.coreedge.raft.RaftMessageHandler; import org.neo4j.coreedge.raft.RaftMessages; import org.neo4j.coreedge.raft.RaftMessages.AppendEntries; @@ -34,10 +33,9 @@ import static org.neo4j.coreedge.raft.roles.Role.CANDIDATE; import static org.neo4j.coreedge.raft.roles.Role.FOLLOWER; -public class Follower implements RaftMessageHandler +class Follower implements RaftMessageHandler { - public static boolean logHistoryMatches( ReadableRaftState ctx, long prevLogIndex, - long prevLogTerm ) + static boolean logHistoryMatches( ReadableRaftState ctx, long prevLogIndex, long prevLogTerm ) throws IOException { // NOTE: A prevLogIndex before or at our log's prevIndex means that we @@ -50,8 +48,7 @@ public static boolean logHistoryMatches( ReadableRaftState ctx, long prevLogInde ctx.entryLog().readEntryTerm( prevLogIndex ) == prevLogTerm; } - public static void commitToLogOnUpdate( - ReadableRaftState ctx, long indexOfLastNewEntry, long leaderCommit, Outcome outcome ) + static void commitToLogOnUpdate( ReadableRaftState ctx, long indexOfLastNewEntry, long leaderCommit, Outcome outcome ) { long newCommitIndex = min( leaderCommit, indexOfLastNewEntry ); @@ -61,8 +58,7 @@ public static void commitToLogOnUpdate( } } - public static void handleLeaderLogCompaction( - ReadableRaftState ctx, Outcome outcome, RaftMessages.LogCompactionInfo compactionInfo ) + private static void handleLeaderLogCompaction( ReadableRaftState ctx, Outcome outcome, RaftMessages.LogCompactionInfo compactionInfo ) { if ( compactionInfo.leaderTerm() < ctx.term() ) { @@ -76,8 +72,7 @@ public static void handleLeaderLogCompaction( } @Override - public Outcome handle( RaftMessages.RaftMessage message, ReadableRaftState ctx, Log log, - LocalDatabase localDatabase ) throws IOException + public Outcome handle( RaftMessages.RaftMessage message, ReadableRaftState ctx, Log log ) throws IOException { Outcome outcome = new Outcome( FOLLOWER, ctx ); @@ -91,15 +86,13 @@ public Outcome handle( RaftMessages.RaftMessage message, ReadableRaftState ctx, case APPEND_ENTRIES_REQUEST: { - Appending.handleAppendEntriesRequest( ctx, outcome, (AppendEntries.Request) message, - localDatabase.storeId() ); + Appending.handleAppendEntriesRequest( ctx, outcome, (AppendEntries.Request) message ); break; } case VOTE_REQUEST: { - Voting.handleVoteRequest( ctx, outcome, (RaftMessages.Vote.Request) message, - localDatabase.storeId() ); + Voting.handleVoteRequest( ctx, outcome, (RaftMessages.Vote.Request) message ); break; } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Heart.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Heart.java index 39dbcb2610b3..5b235a8c266c 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Heart.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Heart.java @@ -25,9 +25,9 @@ import org.neo4j.coreedge.raft.outcome.Outcome; import org.neo4j.coreedge.raft.state.ReadableRaftState; -public class Heart +class Heart { - public static void beat( ReadableRaftState state, Outcome outcome, RaftMessages.Heartbeat request ) throws IOException + static void beat( ReadableRaftState state, Outcome outcome, RaftMessages.Heartbeat request ) throws IOException { if ( request.leaderTerm() < state.term() ) { diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Leader.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Leader.java index 5533e6f860a3..2551899c3e56 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Leader.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Leader.java @@ -22,7 +22,6 @@ import java.io.IOException; import java.util.List; -import org.neo4j.coreedge.catchup.storecopy.LocalDatabase; import org.neo4j.coreedge.raft.Followers; import org.neo4j.coreedge.raft.RaftMessageHandler; import org.neo4j.coreedge.raft.RaftMessages; @@ -35,7 +34,6 @@ import org.neo4j.coreedge.raft.state.follower.FollowerStates; import org.neo4j.coreedge.server.CoreMember; import org.neo4j.helpers.collection.FilteringIterable; -import org.neo4j.kernel.impl.store.StoreId; import org.neo4j.logging.Log; import static java.lang.Math.max; @@ -49,8 +47,7 @@ private static Iterable replicationTargets( final ReadableRaftState return new FilteringIterable<>( ctx.replicationMembers(), member -> !member.equals( ctx.myself() ) ); } - private static void sendHeartbeats( ReadableRaftState ctx, - Outcome outcome, StoreId storeId ) throws IOException + private static void sendHeartbeats( ReadableRaftState ctx, Outcome outcome ) throws IOException { long commitIndex = ctx.leaderCommit(); long commitIndexTerm = ctx.entryLog().readEntryTerm( commitIndex ); @@ -64,7 +61,7 @@ private static void sendHeartbeats( ReadableRaftState ctx, @Override public Outcome handle( RaftMessages.RaftMessage message, ReadableRaftState ctx, - Log log, LocalDatabase localDatabase ) throws IOException + Log log ) throws IOException { Outcome outcome = new Outcome( LEADER, ctx ); @@ -89,7 +86,7 @@ public Outcome handle( RaftMessages.RaftMessage message, ReadableRaftState ctx, case HEARTBEAT_TIMEOUT: { - sendHeartbeats( ctx, outcome, localDatabase.storeId() ); + sendHeartbeats( ctx, outcome ); break; } @@ -117,7 +114,7 @@ else if ( req.leaderTerm() == ctx.term() ) outcome.setNextRole( FOLLOWER ); log.info( "Moving to FOLLOWER state after receiving append request at term %d (my term is " + "%d) from %s", req.leaderTerm(), ctx.term(), req.from() ); - Appending.handleAppendEntriesRequest( ctx, outcome, req, localDatabase.storeId() ); + Appending.handleAppendEntriesRequest( ctx, outcome, req ); break; } } @@ -211,7 +208,7 @@ else if ( response.term() > ctx.term() ) "%d) from %s", req.term(), ctx.term(), req.from() ); outcome.setNextRole( FOLLOWER ); - Voting.handleVoteRequest( ctx, outcome, req, localDatabase.storeId() ); + Voting.handleVoteRequest( ctx, outcome, req ); break; } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Voting.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Voting.java index 3903b5a08ea9..f9db4859c538 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Voting.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Voting.java @@ -25,13 +25,11 @@ import org.neo4j.coreedge.raft.outcome.Outcome; import org.neo4j.coreedge.raft.state.ReadableRaftState; import org.neo4j.coreedge.server.CoreMember; -import org.neo4j.kernel.impl.store.StoreId; public class Voting { - public static void handleVoteRequest( ReadableRaftState state, Outcome outcome, - RaftMessages.Vote.Request voteRequest, - StoreId storeId ) throws IOException + static void handleVoteRequest( ReadableRaftState state, Outcome outcome, + RaftMessages.Vote.Request voteRequest ) throws IOException { if ( voteRequest.term() > state.term() ) { diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/CoreState.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/CoreState.java index c6b8ab40f4b2..ea663196a256 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/CoreState.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/CoreState.java @@ -59,6 +59,7 @@ public class CoreState extends LifecycleAdapter implements RaftStateMachine, Log private final InFlightMap inFlightMap; private final Log log; private final CoreStateApplier applier; + private final CoreServerSelectionStrategy someoneElse; private final CoreStateDownloader downloader; private final RaftLogCommitIndexMonitor commitIndexMonitor; private final OperationBatcher batcher; @@ -79,6 +80,7 @@ public CoreState( ProgressTracker progressTracker, StateStorage lastFlushedStorage, StateStorage sessionStorage, + CoreServerSelectionStrategy someoneElse, CoreStateApplier applier, CoreStateDownloader downloader, InFlightMap inFlightMap, @@ -89,6 +91,7 @@ public CoreState( this.flushEvery = flushEvery; this.progressTracker = progressTracker; this.sessionStorage = sessionStorage; + this.someoneElse = someoneElse; this.applier = applier; this.downloader = downloader; this.log = logProvider.getLog( getClass() ); @@ -200,15 +203,15 @@ private void flush() throws Exception } @Override - public synchronized void notifyNeedFreshSnapshot( CoreServerSelectionStrategy strategy ) + public synchronized void notifyNeedFreshSnapshot() { try { - downloadSnapshot( strategy.coreServer() ); + downloadSnapshot( someoneElse.coreServer() ); } - catch ( InterruptedException | StoreCopyFailedException | CoreServerSelectionException e ) + catch ( CoreServerSelectionException e ) { - log.error( "Failed to download snapshot", e ); + log.error( "Failed to select server", e ); } } @@ -224,13 +227,20 @@ public void compact() throws IOException /** * Attempts to download a fresh snapshot from another core instance. + * * @param source The source address to attempt a download of a snapshot from. */ public synchronized void downloadSnapshot( CoreMember source ) - throws InterruptedException, StoreCopyFailedException { - applier.sync( true ); - downloader.downloadSnapshot( source, this ); + try + { + applier.sync( true ); + downloader.downloadSnapshot( source, this ); + } + catch ( InterruptedException | StoreCopyFailedException e ) + { + log.error( "Failed to download snapshot", e ); + } } private void handleOperations( long commandIndex, List operations ) diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/CoreStateMachines.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/CoreStateMachines.java index b3e4b35f1ea2..f94b23b0be7d 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/CoreStateMachines.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/CoreStateMachines.java @@ -68,7 +68,8 @@ public CoreStateMachines( ReplicatedIdAllocationStateMachine idAllocationStateMachine, CoreState coreState, RecoverTransactionLogState txLogState, - MonitoredRaftLog raftLog, LocalDatabase localDatabase ) + MonitoredRaftLog raftLog, + LocalDatabase localDatabase ) { this.replicatedTxStateMachine = replicatedTxStateMachine; this.labelTokenStateMachine = labelTokenStateMachine; diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/RaftState.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/RaftState.java index bc8d6ff6cd09..e39c5ff3934d 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/RaftState.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/RaftState.java @@ -45,9 +45,9 @@ public class RaftState implements ReadableRaftState private CoreMember leader; private long leaderCommit = -1; private final VoteState voteState; - private Set votesForMe = new HashSet<>(); + private Set votesForMe = new HashSet<>(); private long lastLogIndexBeforeWeBecameLeader = -1; - private FollowerStates followerStates = new FollowerStates<>(); + private FollowerStates followerStates = new FollowerStates<>(); private final RaftLog entryLog; private final InFlightMap inFlightMap; private long commitIndex = -1; @@ -76,13 +76,13 @@ public CoreMember myself() } @Override - public Set votingMembers() + public Set votingMembers() { return membership.votingMembers(); } @Override - public Set replicationMembers() + public Set replicationMembers() { return membership.replicationMembers(); } @@ -112,7 +112,7 @@ public CoreMember votedFor() } @Override - public Set votesForMe() + public Set votesForMe() { return votesForMe; } @@ -124,7 +124,7 @@ public long lastLogIndexBeforeWeBecameLeader() } @Override - public FollowerStates followerStates() + public FollowerStates followerStates() { return followerStates; } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/membership/RaftMembershipState.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/membership/RaftMembershipState.java index 37d3e2ec1ceb..20834b1193fb 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/membership/RaftMembershipState.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/membership/RaftMembershipState.java @@ -86,20 +86,20 @@ public void logIndex( long logIndex ) private void updateReplicationMembers() { - HashSet newReplicationMembers = new HashSet<>( votingMembers ); + HashSet newReplicationMembers = new HashSet<>( votingMembers ); newReplicationMembers.addAll( additionalReplicationMembers ); this.replicationMembers = newReplicationMembers; } @Override - public Set votingMembers() + public Set votingMembers() { return new HashSet<>( votingMembers ); } @Override - public Set replicationMembers() + public Set replicationMembers() { return new HashSet<>( replicationMembers ); } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/EnterpriseCoreEditionModule.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/EnterpriseCoreEditionModule.java index 2e08bc8ecd9c..aa3f7a5a5150 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/EnterpriseCoreEditionModule.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/EnterpriseCoreEditionModule.java @@ -318,11 +318,13 @@ public EnterpriseCoreEditionModule( final PlatformModule platformModule, InFlightMap inFlightMap = new InFlightMap<>(); + NotMyselfSelectionStrategy someoneElse = new NotMyselfSelectionStrategy( discoveryService, myself ); + coreState = dependencies.satisfyDependency( new CoreState( raftLog, config.get( CoreEdgeClusterSettings.state_machine_apply_max_batch_size ), config.get( CoreEdgeClusterSettings.state_machine_flush_window_size ), databaseHealthSupplier, logProvider, progressTracker, lastFlushedStorage, - sessionTrackerStorage, applier, downloader, inFlightMap, platformModule.monitors ) ); + sessionTrackerStorage, someoneElse, applier, downloader, inFlightMap, platformModule.monitors ) ); raftServer = new RaftServer( marshal, raftListenAddress, localDatabase, logProvider, coreState ); @@ -617,8 +619,8 @@ private static RaftInstance createRaft( LifeSupport life, RaftMembershipManager raftMembershipManager = new RaftMembershipManager( leaderOnlyReplicator, memberSetBuilder, raftLog, logProvider, expectedClusterSize, electionTimeout, systemUTC(), - config.get( CoreEdgeClusterSettings.join_catch_up_timeout ), raftMembershipStorage, - localDatabase ); + config.get( CoreEdgeClusterSettings.join_catch_up_timeout ), raftMembershipStorage + ); RaftLogShippingManager logShipping = new RaftLogShippingManager( raftOutbound, logProvider, raftLog, systemUTC(), @@ -628,10 +630,8 @@ expectedClusterSize, electionTimeout, systemUTC(), RaftInstance raftInstance = new RaftInstance( myself, termState, voteState, raftLog, raftStateMachine, electionTimeout, - heartbeatInterval, raftTimeoutService, - discoveryService, raftOutbound, - logProvider, raftMembershipManager, logShipping, databaseHealthSupplier, inFlightMap, monitors, - localDatabase ); + heartbeatInterval, raftTimeoutService, raftOutbound, logProvider, raftMembershipManager, + logShipping, databaseHealthSupplier, inFlightMap, monitors ); int queueSize = config.get( CoreEdgeClusterSettings.raft_in_queue_size ); int maxBatch = config.get( CoreEdgeClusterSettings.raft_in_queue_max_batch ); diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/AppendEntriesMessageFlowTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/AppendEntriesMessageFlowTest.java index c5ad125924da..f162846d7347 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/AppendEntriesMessageFlowTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/AppendEntriesMessageFlowTest.java @@ -26,19 +26,16 @@ import org.mockito.Mock; import org.mockito.runners.MockitoJUnitRunner; -import org.neo4j.coreedge.catchup.storecopy.LocalDatabase; import org.neo4j.coreedge.raft.log.RaftLogEntry; import org.neo4j.coreedge.raft.net.Outbound; import org.neo4j.coreedge.server.CoreMember; import org.neo4j.coreedge.server.RaftTestMemberSetBuilder; -import org.neo4j.kernel.impl.store.StoreId; import static org.mockito.Matchers.eq; import static org.mockito.Matchers.same; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; import static org.neo4j.coreedge.raft.TestMessageBuilders.appendEntriesRequest; import static org.neo4j.coreedge.raft.TestMessageBuilders.appendEntriesResponse; import static org.neo4j.coreedge.server.RaftTestMember.member; @@ -51,12 +48,8 @@ public class AppendEntriesMessageFlowTest private ReplicatedInteger data = ReplicatedInteger.valueOf( 1 ); - private final StoreId storeId = new StoreId( 1, 2, 3, 4, 5 ); - @Mock private Outbound outbound; - @Mock - private LocalDatabase localDatabase; ReplicatedInteger data( int value ) { @@ -69,10 +62,8 @@ ReplicatedInteger data( int value ) public void setup() { // given - when( localDatabase.storeId() ).thenReturn( storeId ); raft = new RaftInstanceBuilder( myself, 3, RaftTestMemberSetBuilder.INSTANCE ) .outbound( outbound ) - .localDatabase( localDatabase ) .build(); } diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/AppendEntriesRequestBuilder.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/AppendEntriesRequestBuilder.java index 659bfde39e9b..0f3a911c9b07 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/AppendEntriesRequestBuilder.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/AppendEntriesRequestBuilder.java @@ -24,7 +24,6 @@ import org.neo4j.coreedge.raft.log.RaftLogEntry; import org.neo4j.coreedge.server.CoreMember; -import org.neo4j.kernel.impl.store.StoreId; public class AppendEntriesRequestBuilder { diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/AppendEntriesResponseBuilder.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/AppendEntriesResponseBuilder.java index 5c2235f8968e..ad014e717269 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/AppendEntriesResponseBuilder.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/AppendEntriesResponseBuilder.java @@ -20,11 +20,10 @@ package org.neo4j.coreedge.raft; import org.neo4j.coreedge.server.CoreMember; -import org.neo4j.kernel.impl.store.StoreId; public class AppendEntriesResponseBuilder { - boolean success; + private boolean success; private long term = -1; private CoreMember from; private long matchIndex = -1; @@ -33,7 +32,7 @@ public class AppendEntriesResponseBuilder public RaftMessages.AppendEntries.Response build() { // a response of false should always have a match index of -1 - assert !(success == false && matchIndex != -1); + assert success || matchIndex == -1; return new RaftMessages.AppendEntries.Response( from, term, success, matchIndex, appendIndex ); } diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/BatchingMessageHandlerTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/BatchingMessageHandlerTest.java index 594776ae4524..0de67d51b07c 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/BatchingMessageHandlerTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/BatchingMessageHandlerTest.java @@ -25,7 +25,6 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; -import org.neo4j.coreedge.catchup.storecopy.LocalDatabase; import org.neo4j.coreedge.raft.net.Inbound; import org.neo4j.logging.NullLogProvider; @@ -35,10 +34,8 @@ public class BatchingMessageHandlerTest { - private static final long DEFAULT_TIMEOUT_MS = 15_000; private static final int MAX_BATCH = 16; private static final int QUEUE_SIZE = 64; - private LocalDatabase localDatabase = mock( LocalDatabase.class ); @Test public void shouldInvokeInnerHandlerWhenRun() throws Exception diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/CatchUpTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/CatchUpTest.java index 17ce112f4141..480805bba2f8 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/CatchUpTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/CatchUpTest.java @@ -30,7 +30,6 @@ import org.neo4j.coreedge.raft.membership.RaftTestGroup; import org.neo4j.coreedge.raft.replication.ReplicatedContent; import org.neo4j.coreedge.server.CoreMember; -import org.neo4j.kernel.impl.store.StoreId; import static org.hamcrest.CoreMatchers.hasItems; import static org.hamcrest.Matchers.empty; @@ -41,8 +40,6 @@ public class CatchUpTest { - private StoreId storeId = new StoreId( 1, 2, 3, 4, 5 ); - @Test public void happyClusterPropagatesUpdates() throws Throwable { diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/RaftInstanceBuilder.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/RaftInstanceBuilder.java index a93b56fb4202..fe9acfeb3519 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/RaftInstanceBuilder.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/RaftInstanceBuilder.java @@ -23,8 +23,6 @@ import java.util.Collection; import java.util.function.Supplier; -import org.neo4j.coreedge.catchup.storecopy.LocalDatabase; -import org.neo4j.coreedge.discovery.CoreTopologyService; import org.neo4j.coreedge.raft.log.InMemoryRaftLog; import org.neo4j.coreedge.raft.log.RaftLog; import org.neo4j.coreedge.raft.log.RaftLogEntry; @@ -46,8 +44,6 @@ import org.neo4j.logging.LogProvider; import org.neo4j.logging.NullLogProvider; -import static org.mockito.Mockito.mock; - public class RaftInstanceBuilder { private final CoreMember member; @@ -89,9 +85,8 @@ public void send( CoreMember to, Collection raftMessag private StateStorage raftMembership = new InMemoryStateStorage<>( new RaftMembershipState() ); private Monitors monitors = new Monitors(); - private RaftStateMachine raftStateMachine = new RaftStateMachine(){}; + private RaftStateMachine raftStateMachine = new EmptyStateMachine(); private final InFlightMap inFlightMap; - private LocalDatabase localDatabase = mock( LocalDatabase.class ); public RaftInstanceBuilder( CoreMember member, int expectedClusterSize, RaftGroup.Builder memberSetBuilder ) { @@ -106,26 +101,17 @@ public RaftInstance build() SendToMyself leaderOnlyReplicator = new SendToMyself( member, outbound ); RaftMembershipManager membershipManager = new RaftMembershipManager( leaderOnlyReplicator, memberSetBuilder, raftLog, logProvider, expectedClusterSize, electionTimeout, clock, catchupTimeout, - raftMembership, localDatabase ); + raftMembership ); RaftLogShippingManager logShipping = new RaftLogShippingManager( outbound, logProvider, raftLog, clock, member, membershipManager, retryTimeMillis, catchupBatchSize, maxAllowedShippingLag, inFlightMap ); - RaftInstance raft = new RaftInstance( member, termState, voteState, raftLog, raftStateMachine, electionTimeout, - heartbeatInterval, renewableTimeoutService, mock( CoreTopologyService.class), - outbound, - logProvider, membershipManager, logShipping, databaseHealthSupplier, inFlightMap, monitors, - localDatabase ); + heartbeatInterval, renewableTimeoutService, outbound, logProvider, + membershipManager, logShipping, databaseHealthSupplier, inFlightMap, monitors ); inbound.registerHandler( raft ); return raft; } - public RaftInstanceBuilder localDatabase( LocalDatabase localDatabase ) - { - this.localDatabase = localDatabase; - return this; - } - public RaftInstanceBuilder electionTimeout( long electionTimeout ) { this.electionTimeout = electionTimeout; diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/RaftInstanceTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/RaftInstanceTest.java index 6309d24303aa..de6a71f742ae 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/RaftInstanceTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/RaftInstanceTest.java @@ -23,7 +23,6 @@ import java.io.IOException; -import org.neo4j.coreedge.catchup.storecopy.LocalDatabase; import org.neo4j.coreedge.raft.log.InMemoryRaftLog; import org.neo4j.coreedge.raft.log.RaftLog; import org.neo4j.coreedge.raft.log.RaftLogCursor; @@ -33,7 +32,6 @@ import org.neo4j.coreedge.server.CoreMember; import org.neo4j.coreedge.server.RaftTestMemberSetBuilder; import org.neo4j.kernel.impl.core.DatabasePanicEventGenerator; -import org.neo4j.kernel.impl.store.StoreId; import org.neo4j.kernel.internal.DatabaseHealth; import org.neo4j.kernel.internal.KernelEventHandlers; import org.neo4j.kernel.monitoring.Monitors; @@ -47,8 +45,6 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; import static org.neo4j.coreedge.raft.RaftInstance.Timeouts.ELECTION; import static org.neo4j.coreedge.raft.TestMessageBuilders.appendEntriesRequest; import static org.neo4j.coreedge.raft.TestMessageBuilders.voteRequest; @@ -72,7 +68,6 @@ public class RaftInstanceTest private ReplicatedInteger data1 = ReplicatedInteger.valueOf( 1 ); private RaftLog raftLog = new InMemoryRaftLog(); - private StoreId storeId = new StoreId( 1, 2, 3, 4, 5 ); @Test public void shouldAlwaysStartAsFollower() throws Exception @@ -355,15 +350,10 @@ public void shouldThrowExceptionIfReceivesClientRequestWithNoLeaderElected() thr public void shouldPersistAtSpecifiedLogIndex() throws Exception { // given - StoreId storeId = new StoreId( 1, 2, 3, 4, 5 ); - LocalDatabase localDatabase = mock( LocalDatabase.class ); - when(localDatabase.storeId()).thenReturn( storeId ); - ControlledRenewableTimeoutService timeouts = new ControlledRenewableTimeoutService(); RaftInstance raft = new RaftInstanceBuilder( myself, 3, RaftTestMemberSetBuilder.INSTANCE ) .timeoutService( timeouts ) .raftLog( raftLog ) - .localDatabase( localDatabase ) .build(); raft.bootstrapWithInitialMembers( new RaftTestGroup( asSet( myself, member1, member2 ) ) ); // @logIndex=0 diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/elections/ElectionPerformanceIT.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/elections/ElectionPerformanceIT.java index 8c1917d844c5..408c31836be4 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/elections/ElectionPerformanceIT.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/elections/ElectionPerformanceIT.java @@ -70,7 +70,12 @@ public void notifyCommitted( long commitIndex ) } @Override - public void notifyNeedFreshSnapshot( CoreServerSelectionStrategy strategy ) + public void notifyNeedFreshSnapshot() + { + } + + @Override + public void downloadSnapshot( CoreMember from ) { } diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/membership/RaftMembershipManagerTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/membership/RaftMembershipManagerTest.java index 755e4f483eac..02c7391ced3c 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/membership/RaftMembershipManagerTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/membership/RaftMembershipManagerTest.java @@ -24,7 +24,6 @@ import java.util.Collections; import java.util.List; -import org.neo4j.coreedge.catchup.storecopy.LocalDatabase; import org.neo4j.coreedge.raft.log.InMemoryRaftLog; import org.neo4j.coreedge.raft.log.RaftLogEntry; import org.neo4j.coreedge.raft.outcome.AppendLogEntry; @@ -49,8 +48,6 @@ public class RaftMembershipManagerTest { - private LocalDatabase localDatabase = mock( LocalDatabase.class ); - @Test public void membershipManagerShouldUseLatestAppendedMembershipSetEntries() throws Exception @@ -61,7 +58,7 @@ public void membershipManagerShouldUseLatestAppendedMembershipSetEntries() RaftMembershipManager membershipManager = new RaftMembershipManager( null, RaftTestMemberSetBuilder.INSTANCE, log, NullLogProvider.getInstance(), 3, 1000, new FakeClock(), - 1000, new InMemoryStateStorage<>( new RaftMembershipState() ), localDatabase ); + 1000, new InMemoryStateStorage<>( new RaftMembershipState() ) ); // when membershipManager.processLog( 0, asList( @@ -83,7 +80,7 @@ public void membershipManagerShouldRevertToOldMembershipSetAfterTruncationCauses RaftMembershipManager membershipManager = new RaftMembershipManager( null, RaftTestMemberSetBuilder.INSTANCE, log, NullLogProvider.getInstance(), 3, 1000, new FakeClock(), - 1000, new InMemoryStateStorage<>( new RaftMembershipState() ), localDatabase ); + 1000, new InMemoryStateStorage<>( new RaftMembershipState() ) ); // when List logCommands = asList( @@ -113,7 +110,7 @@ public void membershipManagerShouldRevertToEarlierAppendedMembershipSetAfterTrun RaftMembershipManager membershipManager = new RaftMembershipManager( null, RaftTestMemberSetBuilder.INSTANCE, log, NullLogProvider.getInstance(), 3, 1000, new FakeClock(), - 1000, new InMemoryStateStorage<>( new RaftMembershipState() ), localDatabase ); + 1000, new InMemoryStateStorage<>( new RaftMembershipState() ) ); // when List logCommands = asList( @@ -148,7 +145,7 @@ public void shouldNotOverwriteCurrentStateWithPreviousState() throws Exception RaftMembershipManager membershipManager = new RaftMembershipManager( null, RaftTestMemberSetBuilder.INSTANCE, log, NullLogProvider.getInstance(), 3, 1000, new FakeClock(), - 1000, stateStorage, localDatabase ); + 1000, stateStorage ); // when membershipManager.processLog( 0, Collections.singletonList( new AppendLogEntry( 0, new RaftLogEntry( 0, new diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/net/RaftMessageProcessingTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/net/RaftMessageProcessingTest.java index 6a23be5c17e0..26b58ecd0f8b 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/net/RaftMessageProcessingTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/net/RaftMessageProcessingTest.java @@ -36,7 +36,6 @@ import org.neo4j.coreedge.raft.replication.ReplicatedContent; import org.neo4j.coreedge.raft.state.ChannelMarshal; import org.neo4j.coreedge.server.CoreMember; -import org.neo4j.kernel.impl.store.StoreId; import org.neo4j.storageengine.api.ReadPastEndException; import org.neo4j.storageengine.api.ReadableChannel; import org.neo4j.storageengine.api.WritableChannel; diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/replication/shipping/RaftLogShipperTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/replication/shipping/RaftLogShipperTest.java index 312b340d7bba..2ce0b56f4686 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/replication/shipping/RaftLogShipperTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/replication/shipping/RaftLogShipperTest.java @@ -29,7 +29,6 @@ import java.util.Collection; import java.util.concurrent.atomic.AtomicBoolean; -import org.neo4j.coreedge.catchup.storecopy.LocalDatabase; import org.neo4j.coreedge.raft.LeaderContext; import org.neo4j.coreedge.raft.OutboundMessageCollector; import org.neo4j.coreedge.raft.RaftMessages; @@ -43,7 +42,6 @@ import org.neo4j.coreedge.raft.log.segmented.InFlightMap; import org.neo4j.coreedge.server.CoreMember; import org.neo4j.helpers.collection.Iterables; -import org.neo4j.kernel.impl.store.StoreId; import org.neo4j.logging.Log; import org.neo4j.logging.LogProvider; import org.neo4j.test.DoubleLatch; @@ -80,7 +78,6 @@ public class RaftLogShipperTest private RaftLogEntry entry1 = new RaftLogEntry( 0, ReplicatedString.valueOf( "kedha" ) ); private RaftLogEntry entry2 = new RaftLogEntry( 0, ReplicatedInteger.valueOf( 2000 ) ); private RaftLogEntry entry3 = new RaftLogEntry( 0, ReplicatedString.valueOf( "chupchick" ) ); - private StoreId storeId = new StoreId( 1, 2, 3, 4, 5 ); @Before public void setup() @@ -111,13 +108,8 @@ public void teardown() private void startLogShipper() { - LocalDatabase localDatabase = mock( LocalDatabase.class ); - when( localDatabase.storeId() ).thenReturn( storeId ); - - logShipper = - new RaftLogShipper( outbound, logProvider, raftLog, clock, leader, follower, leaderTerm, leaderCommit, - retryTimeMillis, catchupBatchSize, maxAllowedShippingLag, new InFlightMap<>() - ); + logShipper = new RaftLogShipper( outbound, logProvider, raftLog, clock, leader, follower, leaderTerm, leaderCommit, + retryTimeMillis, catchupBatchSize, maxAllowedShippingLag, new InFlightMap<>() ); logShipper.start(); } diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/roles/AppendEntriesRequestTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/roles/AppendEntriesRequestTest.java index 767a8012e7b7..94cf93aecef1 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/roles/AppendEntriesRequestTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/roles/AppendEntriesRequestTest.java @@ -28,7 +28,6 @@ import java.util.Arrays; import java.util.Collection; -import org.neo4j.coreedge.catchup.storecopy.LocalDatabase; import org.neo4j.coreedge.raft.RaftMessages.AppendEntries.Response; import org.neo4j.coreedge.raft.ReplicatedString; import org.neo4j.coreedge.raft.log.InMemoryRaftLog; @@ -47,7 +46,6 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.mock; import static org.neo4j.coreedge.raft.MessageUtils.messageFor; import static org.neo4j.coreedge.raft.TestMessageBuilders.appendEntriesRequest; import static org.neo4j.coreedge.raft.roles.AppendEntriesRequestTest.ContentGenerator.content; @@ -58,8 +56,6 @@ public class AppendEntriesRequestTest { - private final LocalDatabase localDatabase = mock( LocalDatabase.class); - @Parameterized.Parameters(name = "{0} with leader {1} terms ahead.") public static Collection data() { @@ -94,7 +90,7 @@ public void shouldAcceptInitialEntry() throws Exception .prevLogIndex( -1 ) .prevLogTerm( -1 ) .logEntry( logEntry ) - .build(), state, log(), localDatabase ); + .build(), state, log() ); // then assertTrue( ((Response) messageFor( outcome, leader )).success() ); @@ -120,7 +116,7 @@ public void shouldAcceptInitialEntries() throws Exception .prevLogTerm( -1 ) .logEntry( logEntry1 ) .logEntry( logEntry2 ) - .build(), state, log(), localDatabase ); + .build(), state, log() ); // then assertTrue( ((Response) messageFor( outcome, leader )).success() ); @@ -145,7 +141,7 @@ public void shouldRejectDiscontinuousEntries() throws Exception .prevLogIndex( state.entryLog().appendIndex() + 1 ) .prevLogTerm( leaderTerm ) .logEntry( new RaftLogEntry( leaderTerm, content() ) ) - .build(), state, log(), localDatabase ); + .build(), state, log() ); // then Response response = (Response) messageFor( outcome, leader ); @@ -172,7 +168,7 @@ public void shouldAcceptContinuousEntries() throws Exception .prevLogIndex( raftLog.appendIndex() ) .prevLogTerm( leaderTerm ) .logEntry( new RaftLogEntry( leaderTerm, content() ) ) - .build(), state, log(), localDatabase ); + .build(), state, log() ); // then assertTrue( ((Response) messageFor( outcome, leader )).success() ); @@ -199,7 +195,7 @@ public void shouldTruncateOnReceiptOfConflictingEntry() throws Exception .prevLogIndex( raftLog.appendIndex() - 1 ) .prevLogTerm( -1 ) .logEntry( new RaftLogEntry( leaderTerm, content() ) ) - .build(), state, log(), localDatabase ); + .build(), state, log() ); // then assertTrue( ((Response) messageFor( outcome, leader )).success() ); @@ -226,7 +222,7 @@ public void shouldCommitEntry() throws Exception .prevLogIndex( raftLog.appendIndex() ) .prevLogTerm( leaderTerm ) .leaderCommit( 0 ) - .build(), state, log(), localDatabase ); + .build(), state, log() ); // then assertTrue( ((Response) messageFor( outcome, leader )).success() ); @@ -256,7 +252,7 @@ public void shouldAppendNewEntryAndCommitPreviouslyAppendedEntry() throws Except .prevLogTerm( leaderTerm ) .logEntry( newLogEntry ) .leaderCommit( 0 ) - .build(), state, log(), localDatabase ); + .build(), state, log() ); // then assertTrue( ((Response) messageFor( outcome, leader )).success() ); @@ -286,7 +282,7 @@ public void shouldNotCommitAheadOfMatchingHistory() throws Exception .prevLogIndex( raftLog.appendIndex() + 1 ) .prevLogTerm( leaderTerm ) .leaderCommit( 0 ) - .build(), state, log(), localDatabase ); + .build(), state, log() ); // then assertFalse( ((Response) messageFor( outcome, leader )).success() ); diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/roles/AppendingTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/roles/AppendingTest.java index 6481b1463d19..c9d141180ffd 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/roles/AppendingTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/roles/AppendingTest.java @@ -32,7 +32,6 @@ import org.neo4j.coreedge.raft.outcome.TruncateLogCommand; import org.neo4j.coreedge.raft.state.ReadableRaftState; import org.neo4j.coreedge.server.CoreMember; -import org.neo4j.kernel.impl.store.StoreId; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; @@ -47,7 +46,6 @@ public class AppendingTest { private CoreMember aMember = member( 0 ); - private StoreId storeId = new StoreId( 1, 2, 3, 4, 5 ); @Test public void shouldPerformTruncation() throws Exception @@ -73,7 +71,7 @@ public void shouldPerformTruncation() throws Exception localTermForAllEntries, new RaftLogEntry[]{ new RaftLogEntry( localTermForAllEntries + 1, ReplicatedInteger.valueOf( 2 ) )}, - appendIndex + 3 ), storeId ); + appendIndex + 3 ) ); // then // we must produce a TruncateLogCommand at the earliest mismatching index @@ -104,7 +102,7 @@ public void shouldNotAllowTruncationAtCommit() throws Exception localTermForAllEntries, new RaftLogEntry[]{ new RaftLogEntry( localTermForAllEntries + 1, ReplicatedInteger.valueOf( 2 ) )}, - commitIndex + 3 ), storeId ); + commitIndex + 3 ) ); fail( "Appending should not allow truncation at or before the commit index" ); } catch ( IllegalStateException expected ) @@ -137,7 +135,7 @@ public void shouldNotAllowTruncationBeforeCommit() throws Exception localTermForAllEntries, new RaftLogEntry[]{ new RaftLogEntry( localTermForAllEntries + 1, ReplicatedInteger.valueOf( 2 ) )}, - commitIndex + 3 ), storeId ); + commitIndex + 3 ) ); fail( "Appending should not allow truncation at or before the commit index" ); } catch ( IllegalStateException expected ) @@ -175,7 +173,7 @@ public void shouldNotAttemptToTruncateAtIndexBeforeTheLogPrevIndex() throws Exce prevTerm, new RaftLogEntry[]{ new RaftLogEntry( prevTerm, ReplicatedInteger.valueOf( 2 ) )}, - commitIndex + 3 ), storeId ); + commitIndex + 3 ) ); // then // there should be no truncate commands. Actually, the whole thing should be a no op diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/roles/CandidateTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/roles/CandidateTest.java index a2645abcfbbc..7f5312599437 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/roles/CandidateTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/roles/CandidateTest.java @@ -26,7 +26,6 @@ import java.io.IOException; -import org.neo4j.coreedge.catchup.storecopy.LocalDatabase; import org.neo4j.coreedge.raft.NewLeaderBarrier; import org.neo4j.coreedge.raft.log.RaftLogEntry; import org.neo4j.coreedge.raft.net.Inbound; @@ -41,7 +40,6 @@ import static org.hamcrest.CoreMatchers.hasItem; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; -import static org.mockito.Mockito.mock; import static org.neo4j.coreedge.raft.TestMessageBuilders.voteResponse; import static org.neo4j.coreedge.raft.roles.Role.CANDIDATE; import static org.neo4j.coreedge.raft.roles.Role.FOLLOWER; @@ -59,7 +57,6 @@ public class CandidateTest private Inbound inbound; private LogProvider logProvider = NullLogProvider.getInstance(); - private final LocalDatabase storeId = mock( LocalDatabase.class); @Test public void shouldBeElectedLeaderOnReceivingGrantedVoteResponseWithCurrentTerm() throws Exception @@ -72,7 +69,7 @@ public void shouldBeElectedLeaderOnReceivingGrantedVoteResponseWithCurrentTerm() .term( state.term() ) .from( member1 ) .grant() - .build(), state, log(), storeId ); + .build(), state, log() ); // then assertEquals( LEADER, outcome.getRole() ); @@ -91,7 +88,7 @@ public void shouldStayAsCandidateOnReceivingDeniedVoteResponseWithCurrentTerm() .term( state.term() ) .from( member1 ) .deny() - .build(), state, log(), storeId ); + .build(), state, log() ); // then assertEquals( CANDIDATE, outcome.getRole() ); @@ -110,7 +107,7 @@ public void shouldUpdateTermOnReceivingVoteResponseWithLaterTerm() throws Except .term( voterTerm ) .from( member1 ) .grant() - .build(), state, log(), storeId ); + .build(), state, log() ); // then assertEquals( FOLLOWER, outcome.getRole() ); @@ -130,7 +127,7 @@ public void shouldRejectVoteResponseWithOldTerm() throws Exception .term( voterTerm ) .from( member1 ) .grant() - .build(), state, log(), storeId ); + .build(), state, log() ); // then assertEquals( CANDIDATE, outcome.getRole() ); diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/roles/FollowerTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/roles/FollowerTest.java index b3bf9e7a26c3..cc6c96ab3c75 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/roles/FollowerTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/roles/FollowerTest.java @@ -26,7 +26,6 @@ import java.io.IOException; -import org.neo4j.coreedge.catchup.storecopy.LocalDatabase; import org.neo4j.coreedge.raft.RaftMessages; import org.neo4j.coreedge.raft.RaftMessages.RaftMessage; import org.neo4j.coreedge.raft.RaftMessages.Timeout.Election; @@ -36,7 +35,6 @@ import org.neo4j.coreedge.raft.outcome.Outcome; import org.neo4j.coreedge.raft.state.RaftState; import org.neo4j.coreedge.server.CoreMember; -import org.neo4j.coreedge.server.RaftTestMember; import org.neo4j.logging.Log; import org.neo4j.logging.NullLogProvider; @@ -44,7 +42,6 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.mock; import static org.neo4j.coreedge.raft.MessageUtils.messageFor; import static org.neo4j.coreedge.raft.RaftMessages.AppendEntries; import static org.neo4j.coreedge.raft.TestMessageBuilders.appendEntriesRequest; @@ -65,7 +62,6 @@ public class FollowerTest @Mock private Inbound inbound; - private final LocalDatabase storeId = mock( LocalDatabase.class); @Test public void followerShouldTransitToCandidateAndInstigateAnElectionAfterTimeout() throws Exception @@ -77,7 +73,7 @@ public void followerShouldTransitToCandidateAndInstigateAnElectionAfterTimeout() .build(); // when - Outcome outcome = new Follower().handle( new Election( myself ), state, log(), storeId ); + Outcome outcome = new Follower().handle( new Election( myself ), state, log() ); state.update( outcome ); @@ -100,7 +96,7 @@ public void shouldBecomeCandidateOnReceivingElectionTimeoutMessage() throws Exce Follower follower = new Follower(); // when - Outcome outcome = follower.handle( new Election( myself ), state, log(), storeId ); + Outcome outcome = follower.handle( new Election( myself ), state, log() ); // then assertEquals( CANDIDATE, outcome.getRole() ); @@ -126,7 +122,7 @@ public void followerReceivingHeartbeatIndicatingClusterIsAheadShouldElicitAppend .prevLogTerm( term ) // in the same term .build(); // no entries, this is a heartbeat - Outcome outcome = follower.handle( heartbeat, state, log(), storeId ); + Outcome outcome = follower.handle( heartbeat, state, log() ); assertEquals( 1, outcome.getOutgoingMessages().size() ); RaftMessage outgoing = outcome.getOutgoingMessages().iterator().next().message(); @@ -151,14 +147,14 @@ public void shouldTruncateIfTermDoesNotMatch() throws Exception new RaftLogEntry[]{ new RaftLogEntry( 2, ContentGenerator.content() ), }, - -1 ), state, log(), storeId ) ); + -1 ), state, log() ) ); RaftLogEntry[] entries = { new RaftLogEntry( 1, new ReplicatedString( "commit this!" ) ), }; Outcome outcome = follower.handle( - new AppendEntries.Request( member1, 1, -1, -1, entries, -1 ), state, log(), storeId ); + new AppendEntries.Request( member1, 1, -1, -1, entries, -1 ), state, log() ); state.update( outcome ); // then @@ -181,8 +177,7 @@ public void followerLearningAboutHigherCommitCausesValuesTobeAppliedToItsLog() t // when receiving AppEntries with high leader commit (3) Outcome outcome = follower.handle( new AppendEntries.Request( myself, 0, 2, 0, - new RaftLogEntry[] { new RaftLogEntry( 0, ContentGenerator.content() ) }, 3 ), state, log(), - storeId ); + new RaftLogEntry[] { new RaftLogEntry( 0, ContentGenerator.content() ) }, 3 ), state, log() ); state.update( outcome ); @@ -221,7 +216,7 @@ public void shouldUpdateCommitIndexIfNecessary() throws Exception Outcome outcome = follower.handle( appendEntriesRequest() .leaderTerm( term ).prevLogIndex( 2 ) .prevLogTerm( term ).leaderCommit( localCommitIndex + 4 ) - .build(), state, log(), storeId ); + .build(), state, log() ); state.update( outcome ); @@ -242,7 +237,7 @@ public void shouldRenewElectionTimeoutOnReceiptOfHeartbeatInCurrentOrHigherTerm( Follower follower = new Follower(); Outcome outcome = follower.handle( new RaftMessages.Heartbeat( myself, 1, 1, 1 ), - state, log(), storeId ); + state, log() ); // then assertTrue( outcome.electionTimeoutRenewed() ); @@ -260,7 +255,7 @@ public void shouldNotRenewElectionTimeoutOnReceiptOfHeartbeatInLowerTerm() throw Follower follower = new Follower(); Outcome outcome = follower.handle( new RaftMessages.Heartbeat( myself, 1, 1, 1 ), - state, log(), storeId ); + state, log() ); // then assertFalse( outcome.electionTimeoutRenewed() ); @@ -275,13 +270,13 @@ private void appendSomeEntriesToLog( RaftState raft, Follower follower, int numb { raft.update( follower.handle( new AppendEntries.Request( myself, term, i - 1, -1, new RaftLogEntry[] { new RaftLogEntry( term, ContentGenerator.content() ) }, -1 - ), raft, log(), storeId ) ); + ), raft, log() ) ); } else { raft.update( follower.handle( new AppendEntries.Request( myself, term, i - 1, term, new RaftLogEntry[]{new RaftLogEntry( term, ContentGenerator.content() )}, -1 ), raft, - log(), storeId ) ); + log() ) ); } } } diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/roles/HeartbeatTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/roles/HeartbeatTest.java index 3b0aed60ee1d..52bd13b3b198 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/roles/HeartbeatTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/roles/HeartbeatTest.java @@ -27,7 +27,6 @@ import java.util.Arrays; import java.util.Collection; -import org.neo4j.coreedge.catchup.storecopy.LocalDatabase; import org.neo4j.coreedge.raft.RaftMessages; import org.neo4j.coreedge.raft.log.InMemoryRaftLog; import org.neo4j.coreedge.raft.log.RaftLogEntry; @@ -39,7 +38,6 @@ import static org.hamcrest.Matchers.empty; import static org.junit.Assert.assertThat; -import static org.mockito.Mockito.mock; import static org.neo4j.coreedge.raft.TestMessageBuilders.heartbeat; import static org.neo4j.coreedge.raft.roles.AppendEntriesRequestTest.ContentGenerator.content; import static org.neo4j.coreedge.raft.state.RaftStateBuilder.raftState; @@ -48,8 +46,6 @@ @RunWith(Parameterized.class) public class HeartbeatTest { - private final LocalDatabase storeId = mock( LocalDatabase.class); - @Parameterized.Parameters(name = "{0} with leader {1} terms ahead.") public static Collection data() { @@ -86,7 +82,7 @@ public void shouldNotResultInCommitIfReferringToFutureEntries() throws Exception .leaderTerm( leaderTerm ) .build(); - Outcome outcome = role.handler.handle( heartbeat, state, log(), storeId ); + Outcome outcome = role.handler.handle( heartbeat, state, log() ); assertThat( outcome.getLogCommands(), empty()); } @@ -110,7 +106,7 @@ public void shouldNotResultInCommitIfHistoryMismatches() throws Exception .leaderTerm( leaderTerm ) .build(); - Outcome outcome = role.handler.handle( heartbeat, state, log(), storeId ); + Outcome outcome = role.handler.handle( heartbeat, state, log() ); assertThat( outcome.getCommitIndex(), Matchers.equalTo(0L) ); } @@ -134,7 +130,7 @@ public void shouldResultInCommitIfHistoryMatches() throws Exception .leaderTerm( leaderTerm ) .build(); - Outcome outcome = role.handler.handle( heartbeat, state, log(), storeId ); + Outcome outcome = role.handler.handle( heartbeat, state, log() ); assertThat( outcome.getLogCommands(), empty() ); diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/roles/LeaderTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/roles/LeaderTest.java index 46b49a8d5adc..1af2ef0135c5 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/roles/LeaderTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/roles/LeaderTest.java @@ -24,7 +24,6 @@ import org.mockito.Mock; import org.mockito.runners.MockitoJUnitRunner; -import org.neo4j.coreedge.catchup.storecopy.LocalDatabase; import org.neo4j.coreedge.raft.RaftMessages; import org.neo4j.coreedge.raft.RaftMessages.AppendEntries; import org.neo4j.coreedge.raft.RaftMessages.Timeout.Heartbeat; @@ -82,8 +81,6 @@ public class LeaderTest private LogProvider logProvider = NullLogProvider.getInstance(); - private final LocalDatabase localDatabase = mock( LocalDatabase.class); - private static final ReplicatedString CONTENT = ReplicatedString.valueOf( "some-content-to-raft" ); @Test @@ -103,7 +100,7 @@ public void leaderShouldNotRespondToSuccessResponseFromFollowerThatWillSoonUpToD ReadableRaftState state = mock( ReadableRaftState.class ); - FollowerStates followerState = new FollowerStates<>(); + FollowerStates followerState = new FollowerStates<>(); followerState = new FollowerStates<>( followerState, instance2, instance2State ); ReadableRaftLog logMock = mock( ReadableRaftLog.class ); @@ -119,13 +116,13 @@ public void leaderShouldNotRespondToSuccessResponseFromFollowerThatWillSoonUpToD RaftMessages.AppendEntries.Response response = appendEntriesResponse().success() .matchIndex( 90 ).term( 4 ).from( instance2 ).build(); - Outcome outcome = leader.handle( response, state, mock( Log.class ), localDatabase ); + Outcome outcome = leader.handle( response, state, mock( Log.class ) ); // then // The leader should not be trying to send any messages to that instance assertTrue( outcome.getOutgoingMessages().isEmpty() ); // And the follower state should be updated - FollowerStates leadersViewOfFollowerStates = outcome.getFollowerStates(); + FollowerStates leadersViewOfFollowerStates = outcome.getFollowerStates(); assertEquals( 90, leadersViewOfFollowerStates.get( instance2 ).getMatchIndex() ); } @@ -145,7 +142,7 @@ public void leaderShouldNotRespondToSuccessResponseThatIndicatesUpToDateFollower ReadableRaftState state = mock( ReadableRaftState.class ); - FollowerStates followerState = new FollowerStates<>(); + FollowerStates followerState = new FollowerStates<>(); followerState = new FollowerStates<>( followerState, instance2, instance2State ); ReadableRaftLog logMock = mock( ReadableRaftLog.class ); @@ -161,13 +158,13 @@ public void leaderShouldNotRespondToSuccessResponseThatIndicatesUpToDateFollower RaftMessages.AppendEntries.Response response = appendEntriesResponse().success() .matchIndex( 100 ).term( 4 ).from( instance2 ).build(); - Outcome outcome = leader.handle( response, state, mock( Log.class ), localDatabase ); + Outcome outcome = leader.handle( response, state, mock( Log.class ) ); // then // The leader should not be trying to send any messages to that instance assertTrue( outcome.getOutgoingMessages().isEmpty() ); // And the follower state should be updated - FollowerStates updatedFollowerStates = outcome.getFollowerStates(); + FollowerStates updatedFollowerStates = outcome.getFollowerStates(); assertEquals( 100, updatedFollowerStates.get( instance2 ).getMatchIndex() ); } @@ -188,7 +185,7 @@ public void leaderShouldRespondToSuccessResponseThatIndicatesLaggingFollowerWith ReadableRaftState state = mock( ReadableRaftState.class ); - FollowerStates followerState = new FollowerStates<>(); + FollowerStates followerState = new FollowerStates<>(); followerState = new FollowerStates<>( followerState, instance2, instance2State ); ReadableRaftLog logMock = mock( ReadableRaftLog.class ); @@ -208,7 +205,7 @@ public void leaderShouldRespondToSuccessResponseThatIndicatesLaggingFollowerWith .term( 231 ) .from( instance2 ).build(); - Outcome outcome = leader.handle( response, state, mock( Log.class ), localDatabase ); + Outcome outcome = leader.handle( response, state, mock( Log.class ) ); // then int matchCount = 0; @@ -241,7 +238,7 @@ public void leaderShouldIgnoreSuccessResponseThatIndicatesLaggingWhileLocalState ReadableRaftState state = mock( ReadableRaftState.class ); - FollowerStates followerState = new FollowerStates<>(); + FollowerStates followerState = new FollowerStates<>(); followerState = new FollowerStates<>( followerState, instance2, instance2State ); ReadableRaftLog logMock = mock( ReadableRaftLog.class ); @@ -261,13 +258,13 @@ public void leaderShouldIgnoreSuccessResponseThatIndicatesLaggingWhileLocalState .term( 4 ) .from( instance2 ).build(); - Outcome outcome = leader.handle( response, state, mock( Log.class ), localDatabase ); + Outcome outcome = leader.handle( response, state, mock( Log.class ) ); // then the leader should not send anything, since this is a delayed, out of order response to a previous append // request assertTrue( outcome.getOutgoingMessages().isEmpty() ); // The follower state should not be touched - FollowerStates updatedFollowerStates = outcome.getFollowerStates(); + FollowerStates updatedFollowerStates = outcome.getFollowerStates(); assertEquals( 100, updatedFollowerStates.get( instance2 ).getMatchIndex() ); } @@ -293,7 +290,7 @@ public void leaderShouldSpawnMismatchCommandOnFailure() throws Exception ReadableRaftState state = mock( ReadableRaftState.class ); - FollowerStates followerState = new FollowerStates<>(); + FollowerStates followerState = new FollowerStates<>(); followerState = new FollowerStates<>( followerState, instance2, instance2State ); RaftLog log = new InMemoryRaftLog(); @@ -316,7 +313,7 @@ public void leaderShouldSpawnMismatchCommandOnFailure() throws Exception .term( 4 ) .from( instance2 ).build(); - Outcome outcome = leader.handle( response, state, mock( Log.class ), localDatabase ); + Outcome outcome = leader.handle( response, state, mock( Log.class ) ); // then int mismatchCount = 0; @@ -344,7 +341,7 @@ public void leaderShouldRejectAppendEntriesResponseWithNewerTermAndBecomeAFollow .from( member1 ) .term( state.term() + 1 ) .build(); - Outcome outcome = leader.handle( message, state, log(), localDatabase ); + Outcome outcome = leader.handle( message, state, log() ); // then assertEquals( 0, count( outcome.getOutgoingMessages() ) ); @@ -366,7 +363,7 @@ public void leaderShouldSendHeartbeatsToAllClusterMembersOnReceiptOfHeartbeatTic Leader leader = new Leader(); // when - Outcome outcome = leader.handle( new Heartbeat( member1 ), state, log(), localDatabase ); + Outcome outcome = leader.handle( new Heartbeat( member1 ), state, log() ); // then assertTrue( messageFor( outcome, member1 ) instanceof RaftMessages.Heartbeat ); @@ -388,7 +385,7 @@ public void leaderShouldDecideToAppendToItsLogAndSendAppendEntriesMessageOnRecei member( 9 ), CONTENT ); // when - Outcome outcome = leader.handle( newEntryRequest, state, log(), localDatabase ); + Outcome outcome = leader.handle( newEntryRequest, state, log() ); //state.update( outcome ); // then @@ -419,7 +416,7 @@ public void leaderShouldHandleBatch() throws Exception batchRequest.add( valueOf( 2 ) ); // when - Outcome outcome = leader.handle( batchRequest, state, log(), localDatabase ); + Outcome outcome = leader.handle( batchRequest, state, log() ); // then BatchAppendLogEntries logCommand = (BatchAppendLogEntries) single( outcome.getLogCommands() ); @@ -462,7 +459,7 @@ public void leaderShouldCommitOnMajorityResponse() throws Exception // when a single instance responds (plus self == 2 out of 3 instances) Outcome outcome = leader.handle( new RaftMessages.AppendEntries.Response( member1, 0, true, 0, 0 ), - state, log(), localDatabase ); + state, log() ); // then assertEquals( 0L, outcome.getCommitIndex() ); @@ -491,7 +488,7 @@ public void leaderShouldCommitAllPreviouslyAppendedEntriesWhenCommittingLaterEnt // when Outcome outcome = leader.handle( new AppendEntries.Response( member1, 0, true, 2, 2 ), - state, log(), localDatabase ); + state, log() ); state.update( outcome ); diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/roles/NonFollowerVoteRequestTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/roles/NonFollowerVoteRequestTest.java index 3e7a7571e6b6..915237926727 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/roles/NonFollowerVoteRequestTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/roles/NonFollowerVoteRequestTest.java @@ -26,7 +26,6 @@ import java.io.IOException; import java.util.Collection; -import org.neo4j.coreedge.catchup.storecopy.LocalDatabase; import org.neo4j.coreedge.raft.RaftMessages; import org.neo4j.coreedge.raft.outcome.Outcome; import org.neo4j.coreedge.raft.state.RaftState; @@ -37,7 +36,6 @@ import static java.util.Arrays.asList; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; -import static org.mockito.Mockito.mock; import static org.neo4j.coreedge.raft.MessageUtils.messageFor; import static org.neo4j.coreedge.raft.TestMessageBuilders.voteRequest; import static org.neo4j.coreedge.raft.state.RaftStateBuilder.raftState; @@ -57,8 +55,6 @@ public static Collection data() { private CoreMember myself = member( 0 ); private CoreMember member1 = member( 1 ); - private final LocalDatabase storeId = mock( LocalDatabase.class); - @Test public void shouldRejectVoteRequestFromCurrentTerm() throws Exception { @@ -69,7 +65,7 @@ public void shouldRejectVoteRequestFromCurrentTerm() throws Exception Outcome outcome = role.handler.handle( voteRequest().from( member1 ).term( candidateTerm ) .lastLogIndex( 0 ) - .lastLogTerm( -1 ).build(), state, log(), storeId ); + .lastLogTerm( -1 ).build(), state, log() ); // then assertFalse( ((RaftMessages.Vote.Response) messageFor( outcome, member1 )).voteGranted() ); @@ -86,7 +82,7 @@ public void shouldRejectVoteRequestFromPreviousTerm() throws Exception Outcome outcome = role.handler.handle( voteRequest().from( member1 ).term( candidateTerm ) .lastLogIndex( 0 ) - .lastLogTerm( -1 ).build(), state, log(), storeId ); + .lastLogTerm( -1 ).build(), state, log() ); // then assertFalse( ((RaftMessages.Vote.Response) messageFor( outcome, member1 )).voteGranted() ); diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/roles/VoteRequestTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/roles/VoteRequestTest.java index c54afe9fbf94..95a2c83e1974 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/roles/VoteRequestTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/roles/VoteRequestTest.java @@ -26,7 +26,6 @@ import java.io.IOException; import java.util.Collection; -import org.neo4j.coreedge.catchup.storecopy.LocalDatabase; import org.neo4j.coreedge.raft.RaftMessages; import org.neo4j.coreedge.raft.outcome.Outcome; import org.neo4j.coreedge.raft.state.RaftState; @@ -38,7 +37,6 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.mock; import static org.neo4j.coreedge.raft.MessageUtils.messageFor; import static org.neo4j.coreedge.raft.TestMessageBuilders.voteRequest; import static org.neo4j.coreedge.raft.state.RaftStateBuilder.raftState; @@ -50,8 +48,6 @@ @RunWith(Parameterized.class) public class VoteRequestTest { - private final LocalDatabase storeId = mock( LocalDatabase.class); - @Parameterized.Parameters(name = "{0}") public static Collection data() { return asList( Role.values() ); @@ -75,7 +71,7 @@ public void shouldVoteForCandidateInLaterTerm() throws Exception Outcome outcome = role.handler.handle( voteRequest().from( member1 ).term( candidateTerm ) .lastLogIndex( 0 ) - .lastLogTerm( -1 ).build(), state, log(), storeId ); + .lastLogTerm( -1 ).build(), state, log() ); // then assertTrue( ((RaftMessages.Vote.Response) messageFor( outcome, member1 )).voteGranted() ); @@ -92,7 +88,7 @@ public void shouldDenyForCandidateInPreviousTerm() throws Exception Outcome outcome = role.handler.handle( voteRequest().from( member1 ).term( candidateTerm ) .lastLogIndex( 0 ) - .lastLogTerm( -1 ).build(), state, log(), storeId ); + .lastLogTerm( -1 ).build(), state, log() ); // then assertFalse( ((RaftMessages.Vote.Response) messageFor( outcome, member1 )).voteGranted() ); @@ -110,13 +106,13 @@ public void shouldVoteForOnlyOneCandidatePerTerm() throws Exception Outcome outcome1 = role.handler.handle( voteRequest().from( member1 ).term( candidateTerm ) .lastLogIndex( 0 ) - .lastLogTerm( -1 ).build(), state, log(), storeId ); + .lastLogTerm( -1 ).build(), state, log() ); state.update( outcome1 ); Outcome outcome2 = role.handler.handle( voteRequest().from( member2 ).term( candidateTerm ) .lastLogIndex( 0 ) - .lastLogTerm( -1 ).build(), state, log(), storeId ); + .lastLogTerm( -1 ).build(), state, log() ); // then assertFalse( ((RaftMessages.Vote.Response) messageFor( outcome2, member2 )).voteGranted() ); @@ -133,7 +129,7 @@ public void shouldStayInCurrentRoleOnRequestFromCurrentTerm() throws Exception Outcome outcome = role.handler.handle( voteRequest().from( member1 ).term( candidateTerm ) .lastLogIndex( 0 ) - .lastLogTerm( -1 ).build(), state, log(), storeId ); + .lastLogTerm( -1 ).build(), state, log() ); // then assertEquals( role, outcome.getRole() ); @@ -150,7 +146,7 @@ public void shouldMoveToFollowerIfRequestIsFromLaterTerm() throws Exception Outcome outcome = role.handler.handle( voteRequest().from( member1 ).term( candidateTerm ) .lastLogIndex( 0 ) - .lastLogTerm( -1 ).build(), state, log(), storeId ); + .lastLogTerm( -1 ).build(), state, log() ); // then assertEquals( Role.FOLLOWER, outcome.getRole() ); @@ -167,7 +163,7 @@ public void shouldUpdateTermIfRequestIsFromLaterTerm() throws Exception Outcome outcome = role.handler.handle( voteRequest().from( member1 ).term( candidateTerm ) .lastLogIndex( 0 ) - .lastLogTerm( -1 ).build(), state, log(), storeId ); + .lastLogTerm( -1 ).build(), state, log() ); // then assertEquals( candidateTerm, outcome.getTerm() ); diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/state/CoreStateTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/state/CoreStateTest.java index 4e2dc895beb9..d83b822104dc 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/state/CoreStateTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/state/CoreStateTest.java @@ -38,6 +38,7 @@ import org.neo4j.coreedge.raft.replication.session.LocalOperationId; import org.neo4j.coreedge.raft.replication.tx.CoreReplicatedContent; import org.neo4j.coreedge.raft.replication.tx.ReplicatedTransaction; +import org.neo4j.coreedge.server.edge.CoreServerSelectionStrategy; import org.neo4j.kernel.impl.core.DatabasePanicEventGenerator; import org.neo4j.kernel.internal.DatabaseHealth; import org.neo4j.kernel.monitoring.Monitors; @@ -79,7 +80,7 @@ public class CoreStateTest private final Monitors monitors = new Monitors(); private final CoreState coreState = new CoreState( raftLog, batchSize, flushEvery, () -> dbHealth, NullLogProvider.getInstance(), new ProgressTrackerImpl( globalSession ), lastFlushedStorage, - sessionStorage, applier, mock( CoreStateDownloader.class ), inFlightMap, monitors ); + sessionStorage, mock( CoreServerSelectionStrategy.class), applier, mock( CoreStateDownloader.class ), inFlightMap, monitors ); private ReplicatedTransaction nullTx = new ReplicatedTransaction( null ); diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/state/explorer/action/ProcessMessage.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/state/explorer/action/ProcessMessage.java index 6b7cdaa9eea7..7987e42e746b 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/state/explorer/action/ProcessMessage.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/state/explorer/action/ProcessMessage.java @@ -23,7 +23,6 @@ import java.util.LinkedList; import java.util.Queue; -import org.neo4j.coreedge.catchup.storecopy.LocalDatabase; import org.neo4j.coreedge.raft.RaftMessages; import org.neo4j.coreedge.raft.outcome.Outcome; import org.neo4j.coreedge.raft.state.explorer.ClusterState; @@ -56,8 +55,7 @@ public ClusterState advance( ClusterState previous ) throws IOException } ComparableRaftState memberState = previous.states.get( member ); ComparableRaftState newMemberState = new ComparableRaftState( memberState ); - Outcome outcome = previous.roles.get( member ).handler.handle( message, memberState, log, - mock( LocalDatabase.class) ); + Outcome outcome = previous.roles.get( member ).handler.handle( message, memberState, log ); newMemberState.update( outcome ); for ( RaftMessages.Directed outgoingMessage : outcome.getOutgoingMessages() )