diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/RaftInstance.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/RaftInstance.java index 62e8fa74ae6c9..c0d99ed03a18f 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/RaftInstance.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/RaftInstance.java @@ -19,16 +19,7 @@ */ package org.neo4j.coreedge.raft; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Set; -import java.util.concurrent.TimeoutException; -import java.util.function.Predicate; -import java.util.function.Supplier; - import org.neo4j.coreedge.helper.VolatileFuture; -import org.neo4j.coreedge.network.Message; import org.neo4j.coreedge.raft.log.RaftLog; import org.neo4j.coreedge.raft.log.RaftLogCompactedException; import org.neo4j.coreedge.raft.log.RaftLogEntry; @@ -45,16 +36,22 @@ import org.neo4j.coreedge.raft.state.StateStorage; import org.neo4j.coreedge.raft.state.term.TermState; import org.neo4j.coreedge.raft.state.vote.VoteState; -import org.neo4j.graphdb.TransientFailureException; import org.neo4j.kernel.impl.util.Listener; import org.neo4j.kernel.internal.DatabaseHealth; import org.neo4j.kernel.monitoring.Monitors; import org.neo4j.logging.Log; import org.neo4j.logging.LogProvider; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Set; +import java.util.concurrent.TimeoutException; +import java.util.function.Predicate; +import java.util.function.Supplier; + import static java.lang.String.format; import static java.util.Collections.singletonList; - import static org.neo4j.coreedge.raft.roles.Role.LEADER; /** @@ -77,7 +74,8 @@ * * @param The membership type. */ -public class RaftInstance implements LeaderLocator, Inbound.MessageHandler, CoreMetaData +public class RaftInstance implements LeaderLocator, + Inbound.MessageHandler>, CoreMetaData { private final LeaderNotFoundMonitor leaderNotFoundMonitor; @@ -111,7 +109,7 @@ public RaftInstance( MEMBER myself, StateStorage termStorage, StateStorage> voteStorage, RaftLog entryLog, RaftStateMachine raftStateMachine, long electionTimeout, long heartbeatInterval, RenewableTimeoutService renewableTimeoutService, - final Inbound inbound, final Outbound outbound, + final Inbound> inbound, final Outbound outbound, LogProvider logProvider, RaftMembershipManager membershipManager, RaftLogShippingManager logShipping, Supplier databaseHealthSupplier, @@ -293,12 +291,11 @@ else if ( oldLeader != null && !oldLeader.equals( outcome.getLeader() ) ) return false; } - public synchronized void handle( Message incomingMessage ) + public synchronized void handle( RaftMessages.RaftMessage incomingMessage ) { try { - Outcome outcome = currentRole.handler.handle( - (RaftMessages.RaftMessage) incomingMessage, state, log ); + Outcome outcome = currentRole.handler.handle( incomingMessage, state, log ); boolean newLeaderWasElected = leaderChanged( outcome, state.leader() ); boolean newCommittedEntry = outcome.getCommitIndex() > state.commitIndex(); diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/RaftServer.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/RaftServer.java index 34df4df8a2856..69847b4eb2daa 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/RaftServer.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/RaftServer.java @@ -46,12 +46,12 @@ import org.neo4j.logging.Log; import org.neo4j.logging.LogProvider; -public class RaftServer extends LifecycleAdapter implements Inbound +public class RaftServer extends LifecycleAdapter implements Inbound> { private final ListenSocketAddress listenAddress; private final Log log; private final ByteBufMarshal marshal; - private MessageHandler messageHandler; + private MessageHandler> messageHandler; private EventLoopGroup workerGroup; private Channel channel; @@ -118,7 +118,7 @@ protected void initChannel( SocketChannel ch ) throws Exception } @Override - public void registerHandler( Inbound.MessageHandler handler ) + public void registerHandler( Inbound.MessageHandler> handler ) { this.messageHandler = handler; } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/net/Inbound.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/net/Inbound.java index 10425baae5df1..4cd286a3d16c2 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/net/Inbound.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/net/Inbound.java @@ -21,12 +21,12 @@ import org.neo4j.coreedge.network.Message; -public interface Inbound +public interface Inbound { - void registerHandler( MessageHandler handler ); + void registerHandler( MessageHandler handler ); - interface MessageHandler + interface MessageHandler { - void handle( Message message ); + void handle( M message ); } } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/net/LoggingInbound.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/net/LoggingInbound.java index 9caf767130eb9..a1f13596f0076 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/net/LoggingInbound.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/net/LoggingInbound.java @@ -24,13 +24,13 @@ import org.neo4j.coreedge.server.AdvertisedSocketAddress; import org.neo4j.coreedge.server.logging.MessageLogger; -public class LoggingInbound implements Inbound +public class LoggingInbound implements Inbound { - private final Inbound inbound; + private final Inbound inbound; private final MessageLogger messageLogger; private final AdvertisedSocketAddress me; - public LoggingInbound( Inbound inbound, MessageLogger messageLogger, + public LoggingInbound( Inbound inbound, MessageLogger messageLogger, AdvertisedSocketAddress me ) { this.inbound = inbound; @@ -39,11 +39,11 @@ public LoggingInbound( Inbound inbound, MessageLogger m } @Override - public void registerHandler( final MessageHandler handler ) + public void registerHandler( final MessageHandler handler ) { - inbound.registerHandler( new MessageHandler() + inbound.registerHandler( new MessageHandler() { - public synchronized void handle( Message message ) + public synchronized void handle( M message ) { messageLogger.log( me, message ); handler.handle( message ); diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/EnterpriseCoreEditionModule.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/EnterpriseCoreEditionModule.java index 52ea20ae7b3e8..9a7c7992f5f52 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/EnterpriseCoreEditionModule.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/EnterpriseCoreEditionModule.java @@ -43,11 +43,7 @@ import org.neo4j.coreedge.discovery.CoreDiscoveryService; import org.neo4j.coreedge.discovery.DiscoveryServiceFactory; import org.neo4j.coreedge.discovery.RaftDiscoveryServiceConnector; -import org.neo4j.coreedge.raft.DelayedRenewableTimeoutService; -import org.neo4j.coreedge.raft.LeaderLocator; -import org.neo4j.coreedge.raft.RaftInstance; -import org.neo4j.coreedge.raft.RaftServer; -import org.neo4j.coreedge.raft.RaftStateMachine; +import org.neo4j.coreedge.raft.*; import org.neo4j.coreedge.raft.log.InMemoryRaftLog; import org.neo4j.coreedge.raft.log.MonitoredRaftLog; import org.neo4j.coreedge.raft.log.NaiveDurableRaftLog; @@ -625,7 +621,8 @@ fileSystem, new File( clusterStateDirectory, "term-state" ), "term-state", throw new RuntimeException( e ); } - LoggingInbound loggingRaftInbound = new LoggingInbound( raftServer, messageLogger, myself.getRaftAddress() ); + LoggingInbound> + loggingRaftInbound = new LoggingInbound<>( raftServer, messageLogger, myself.getRaftAddress() ); long electionTimeout = config.get( CoreEdgeClusterSettings.leader_election_timeout ); long heartbeatInterval = electionTimeout / 3; diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/RaftInstanceBuilder.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/RaftInstanceBuilder.java index 4b681803f7c3e..947cddb8bb0ed 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/RaftInstanceBuilder.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/RaftInstanceBuilder.java @@ -53,7 +53,7 @@ public class RaftInstanceBuilder private RenewableTimeoutService renewableTimeoutService = new DelayedRenewableTimeoutService( Clock.SYSTEM_CLOCK, NullLogProvider.getInstance() ); - private Inbound inbound = handler -> {}; + private Inbound> inbound = handler -> {}; private Outbound outbound = ( advertisedSocketAddress, messages ) -> {}; private LogProvider logProvider = NullLogProvider.getInstance(); @@ -61,7 +61,6 @@ public class RaftInstanceBuilder private long electionTimeout = 500; private long heartbeatInterval = 150; - private long leaderWaitTimeout = 10000; private long catchupTimeout = 30000; private long retryTimeMillis = electionTimeout / 2; private int catchupBatchSize = 64; @@ -94,12 +93,6 @@ public RaftInstance build() membershipManager, logShipping, databaseHealthSupplier, monitors ); } - public RaftInstanceBuilder leaderWaitTimeout( long leaderWaitTimeout ) - { - this.leaderWaitTimeout = leaderWaitTimeout; - return this; - } - public RaftInstanceBuilder electionTimeout( long electionTimeout ) { this.electionTimeout = electionTimeout; diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/RaftInstanceTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/RaftInstanceTest.java index 5a44b7f966abe..91a36c5bbb4bc 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/RaftInstanceTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/RaftInstanceTest.java @@ -329,9 +329,8 @@ public void shouldThrowExceptionIfReceivesClientRequestWithNoLeaderElected() thr // Given ControlledRenewableTimeoutService timeouts = new ControlledRenewableTimeoutService(); - int leaderWaitTimeout = 10; RaftInstance raft = new RaftInstanceBuilder<>( myself, 3, RaftTestMemberSetBuilder.INSTANCE ) - .timeoutService( timeouts ).leaderWaitTimeout( leaderWaitTimeout ).build(); + .timeoutService( timeouts ).build(); raft.bootstrapWithInitialMembers( new RaftTestGroup( asSet( myself, member1, member2 ) ) ); // @logIndex=0 @@ -460,15 +459,12 @@ public void shouldMonitorLeaderNotFound() throws Exception // Given ControlledRenewableTimeoutService timeouts = new ControlledRenewableTimeoutService(); - int leaderWaitTimeout = 10; - Monitors monitors = new Monitors(); LeaderNotFoundMonitor leaderNotFoundMonitor = new StubLeaderNotFoundMonitor(); monitors.addMonitorListener( leaderNotFoundMonitor ); RaftInstance raft = new RaftInstanceBuilder<>( myself, 3, RaftTestMemberSetBuilder.INSTANCE ) .timeoutService( timeouts ) - .leaderWaitTimeout( leaderWaitTimeout ) .monitors(monitors) .build(); diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/RaftTestNetwork.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/RaftTestNetwork.java index 521450dbb1902..9dc300921de84 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/RaftTestNetwork.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/RaftTestNetwork.java @@ -278,9 +278,9 @@ public synchronized void run() } } - public class Inbound implements org.neo4j.coreedge.raft.net.Inbound + public class Inbound implements org.neo4j.coreedge.raft.net.Inbound { - private MessageHandler handler; + private MessageHandler handler; private final BlockingQueue Q = new ArrayBlockingQueue<>( 64, true ); private NetworkThread networkThread; @@ -311,7 +311,7 @@ public synchronized void deliver( Message message ) } @Override - public void registerHandler( MessageHandler handler ) + public void registerHandler( MessageHandler handler ) { this.handler = handler; }