Skip to content

Commit

Permalink
cleanups
Browse files Browse the repository at this point in the history
  • Loading branch information
davidegrohmann committed May 11, 2016
1 parent 96773ed commit 346d90e
Show file tree
Hide file tree
Showing 8 changed files with 34 additions and 51 deletions.
Expand Up @@ -19,16 +19,7 @@
*/
package org.neo4j.coreedge.raft;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Set;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;
import java.util.function.Supplier;

import org.neo4j.coreedge.helper.VolatileFuture;
import org.neo4j.coreedge.network.Message;
import org.neo4j.coreedge.raft.log.RaftLog;
import org.neo4j.coreedge.raft.log.RaftLogCompactedException;
import org.neo4j.coreedge.raft.log.RaftLogEntry;
Expand All @@ -45,16 +36,22 @@
import org.neo4j.coreedge.raft.state.StateStorage;
import org.neo4j.coreedge.raft.state.term.TermState;
import org.neo4j.coreedge.raft.state.vote.VoteState;
import org.neo4j.graphdb.TransientFailureException;
import org.neo4j.kernel.impl.util.Listener;
import org.neo4j.kernel.internal.DatabaseHealth;
import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Set;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;
import java.util.function.Supplier;

import static java.lang.String.format;
import static java.util.Collections.singletonList;

import static org.neo4j.coreedge.raft.roles.Role.LEADER;

/**
Expand All @@ -77,7 +74,8 @@
*
* @param <MEMBER> The membership type.
*/
public class RaftInstance<MEMBER> implements LeaderLocator<MEMBER>, Inbound.MessageHandler, CoreMetaData
public class RaftInstance<MEMBER> implements LeaderLocator<MEMBER>,
Inbound.MessageHandler<RaftMessages.RaftMessage<MEMBER>>, CoreMetaData
{
private final LeaderNotFoundMonitor leaderNotFoundMonitor;

Expand Down Expand Up @@ -111,7 +109,7 @@ public RaftInstance( MEMBER myself, StateStorage<TermState> termStorage,
StateStorage<VoteState<MEMBER>> voteStorage, RaftLog entryLog,
RaftStateMachine raftStateMachine, long electionTimeout, long heartbeatInterval,
RenewableTimeoutService renewableTimeoutService,
final Inbound inbound, final Outbound<MEMBER> outbound,
final Inbound<RaftMessages.RaftMessage<MEMBER>> inbound, final Outbound<MEMBER> outbound,
LogProvider logProvider, RaftMembershipManager<MEMBER> membershipManager,
RaftLogShippingManager<MEMBER> logShipping,
Supplier<DatabaseHealth> databaseHealthSupplier,
Expand Down Expand Up @@ -293,12 +291,11 @@ else if ( oldLeader != null && !oldLeader.equals( outcome.getLeader() ) )
return false;
}

public synchronized void handle( Message incomingMessage )
public synchronized void handle( RaftMessages.RaftMessage<MEMBER> incomingMessage )
{
try
{
Outcome<MEMBER> outcome = currentRole.handler.handle(
(RaftMessages.RaftMessage<MEMBER>) incomingMessage, state, log );
Outcome<MEMBER> outcome = currentRole.handler.handle( incomingMessage, state, log );

boolean newLeaderWasElected = leaderChanged( outcome, state.leader() );
boolean newCommittedEntry = outcome.getCommitIndex() > state.commitIndex();
Expand Down
Expand Up @@ -46,12 +46,12 @@
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;

public class RaftServer<MEMBER> extends LifecycleAdapter implements Inbound
public class RaftServer<MEMBER> extends LifecycleAdapter implements Inbound<RaftMessages.RaftMessage<MEMBER>>
{
private final ListenSocketAddress listenAddress;
private final Log log;
private final ByteBufMarshal<ReplicatedContent> marshal;
private MessageHandler messageHandler;
private MessageHandler<RaftMessages.RaftMessage<MEMBER>> messageHandler;
private EventLoopGroup workerGroup;
private Channel channel;

Expand Down Expand Up @@ -118,7 +118,7 @@ protected void initChannel( SocketChannel ch ) throws Exception
}

@Override
public void registerHandler( Inbound.MessageHandler handler )
public void registerHandler( Inbound.MessageHandler<RaftMessages.RaftMessage<MEMBER>> handler )
{
this.messageHandler = handler;
}
Expand Down
Expand Up @@ -21,12 +21,12 @@

import org.neo4j.coreedge.network.Message;

public interface Inbound
public interface Inbound<M extends Message>
{
void registerHandler( MessageHandler handler );
void registerHandler( MessageHandler<M> handler );

interface MessageHandler
interface MessageHandler<M extends Message>
{
void handle( Message message );
void handle( M message );
}
}
Expand Up @@ -24,13 +24,13 @@
import org.neo4j.coreedge.server.AdvertisedSocketAddress;
import org.neo4j.coreedge.server.logging.MessageLogger;

public class LoggingInbound implements Inbound
public class LoggingInbound<M extends Message> implements Inbound<M>
{
private final Inbound inbound;
private final Inbound<M> inbound;
private final MessageLogger<AdvertisedSocketAddress> messageLogger;
private final AdvertisedSocketAddress me;

public LoggingInbound( Inbound inbound, MessageLogger<AdvertisedSocketAddress> messageLogger,
public LoggingInbound( Inbound<M> inbound, MessageLogger<AdvertisedSocketAddress> messageLogger,
AdvertisedSocketAddress me )
{
this.inbound = inbound;
Expand All @@ -39,11 +39,11 @@ public LoggingInbound( Inbound inbound, MessageLogger<AdvertisedSocketAddress> m
}

@Override
public void registerHandler( final MessageHandler handler )
public void registerHandler( final MessageHandler<M> handler )
{
inbound.registerHandler( new MessageHandler()
inbound.registerHandler( new MessageHandler<M>()
{
public synchronized void handle( Message message )
public synchronized void handle( M message )
{
messageLogger.log( me, message );
handler.handle( message );
Expand Down
Expand Up @@ -43,11 +43,7 @@
import org.neo4j.coreedge.discovery.CoreDiscoveryService;
import org.neo4j.coreedge.discovery.DiscoveryServiceFactory;
import org.neo4j.coreedge.discovery.RaftDiscoveryServiceConnector;
import org.neo4j.coreedge.raft.DelayedRenewableTimeoutService;
import org.neo4j.coreedge.raft.LeaderLocator;
import org.neo4j.coreedge.raft.RaftInstance;
import org.neo4j.coreedge.raft.RaftServer;
import org.neo4j.coreedge.raft.RaftStateMachine;
import org.neo4j.coreedge.raft.*;
import org.neo4j.coreedge.raft.log.InMemoryRaftLog;
import org.neo4j.coreedge.raft.log.MonitoredRaftLog;
import org.neo4j.coreedge.raft.log.NaiveDurableRaftLog;
Expand Down Expand Up @@ -625,7 +621,8 @@ fileSystem, new File( clusterStateDirectory, "term-state" ), "term-state",
throw new RuntimeException( e );
}

LoggingInbound loggingRaftInbound = new LoggingInbound( raftServer, messageLogger, myself.getRaftAddress() );
LoggingInbound<RaftMessages.RaftMessage<CoreMember>>
loggingRaftInbound = new LoggingInbound<>( raftServer, messageLogger, myself.getRaftAddress() );

long electionTimeout = config.get( CoreEdgeClusterSettings.leader_election_timeout );
long heartbeatInterval = electionTimeout / 3;
Expand Down
Expand Up @@ -53,15 +53,14 @@ public class RaftInstanceBuilder<MEMBER>
private RenewableTimeoutService renewableTimeoutService = new DelayedRenewableTimeoutService( Clock.SYSTEM_CLOCK,
NullLogProvider.getInstance() );

private Inbound inbound = handler -> {};
private Inbound<RaftMessages.RaftMessage<MEMBER>> inbound = handler -> {};
private Outbound<MEMBER> outbound = ( advertisedSocketAddress, messages ) -> {};

private LogProvider logProvider = NullLogProvider.getInstance();
private Clock clock = Clock.SYSTEM_CLOCK;

private long electionTimeout = 500;
private long heartbeatInterval = 150;
private long leaderWaitTimeout = 10000;
private long catchupTimeout = 30000;
private long retryTimeMillis = electionTimeout / 2;
private int catchupBatchSize = 64;
Expand Down Expand Up @@ -94,12 +93,6 @@ public RaftInstance<MEMBER> build()
membershipManager, logShipping, databaseHealthSupplier, monitors );
}

public RaftInstanceBuilder<MEMBER> leaderWaitTimeout( long leaderWaitTimeout )
{
this.leaderWaitTimeout = leaderWaitTimeout;
return this;
}

public RaftInstanceBuilder<MEMBER> electionTimeout( long electionTimeout )
{
this.electionTimeout = electionTimeout;
Expand Down
Expand Up @@ -329,9 +329,8 @@ public void shouldThrowExceptionIfReceivesClientRequestWithNoLeaderElected() thr
// Given
ControlledRenewableTimeoutService timeouts = new ControlledRenewableTimeoutService();

int leaderWaitTimeout = 10;
RaftInstance<RaftTestMember> raft = new RaftInstanceBuilder<>( myself, 3, RaftTestMemberSetBuilder.INSTANCE )
.timeoutService( timeouts ).leaderWaitTimeout( leaderWaitTimeout ).build();
.timeoutService( timeouts ).build();

raft.bootstrapWithInitialMembers( new RaftTestGroup( asSet( myself, member1, member2 ) ) ); // @logIndex=0

Expand Down Expand Up @@ -460,15 +459,12 @@ public void shouldMonitorLeaderNotFound() throws Exception
// Given
ControlledRenewableTimeoutService timeouts = new ControlledRenewableTimeoutService();

int leaderWaitTimeout = 10;

Monitors monitors = new Monitors();
LeaderNotFoundMonitor leaderNotFoundMonitor = new StubLeaderNotFoundMonitor();
monitors.addMonitorListener( leaderNotFoundMonitor );

RaftInstance<RaftTestMember> raft = new RaftInstanceBuilder<>( myself, 3, RaftTestMemberSetBuilder.INSTANCE )
.timeoutService( timeouts )
.leaderWaitTimeout( leaderWaitTimeout )
.monitors(monitors)
.build();

Expand Down
Expand Up @@ -278,9 +278,9 @@ public synchronized void run()
}
}

public class Inbound implements org.neo4j.coreedge.raft.net.Inbound
public class Inbound implements org.neo4j.coreedge.raft.net.Inbound<Message>
{
private MessageHandler handler;
private MessageHandler<Message> handler;
private final BlockingQueue<Message> Q = new ArrayBlockingQueue<>( 64, true );
private NetworkThread networkThread;

Expand Down Expand Up @@ -311,7 +311,7 @@ public synchronized void deliver( Message message )
}

@Override
public void registerHandler( MessageHandler handler )
public void registerHandler( MessageHandler<Message> handler )
{
this.handler = handler;
}
Expand Down

0 comments on commit 346d90e

Please sign in to comment.