Skip to content

Commit

Permalink
Making Outbound generic
Browse files Browse the repository at this point in the history
  • Loading branch information
Mark Needham committed Jun 16, 2016
1 parent 2651e8c commit 1ed96d0
Show file tree
Hide file tree
Showing 22 changed files with 86 additions and 89 deletions.
Expand Up @@ -30,6 +30,7 @@
import org.neo4j.coreedge.catchup.storecopy.LocalDatabase;
import org.neo4j.coreedge.discovery.CoreServerSelectionException;
import org.neo4j.coreedge.helper.VolatileFuture;
import org.neo4j.coreedge.network.Message;
import org.neo4j.coreedge.raft.log.RaftLog;
import org.neo4j.coreedge.raft.log.RaftLogEntry;
import org.neo4j.coreedge.raft.log.segmented.InFlightMap;
Expand Down Expand Up @@ -111,7 +112,7 @@ public enum Timeouts implements RenewableTimeoutService.TimeoutName
private final VolatileFuture<MEMBER> volatileLeader = new VolatileFuture<>( null );

private final CoreServerSelectionStrategy defaultStrategy;
private final Outbound<MEMBER> outbound;
private final Outbound<MEMBER, RaftMessages.RaftMessage<MEMBER>> outbound;
private final Log log;
private Role currentRole = Role.FOLLOWER;

Expand All @@ -122,7 +123,7 @@ public RaftInstance( MEMBER myself, StateStorage<TermState> termStorage,
RaftStateMachine raftStateMachine, long electionTimeout, long heartbeatInterval,
RenewableTimeoutService renewableTimeoutService,
CoreServerSelectionStrategy defaultStrategy,
final Outbound<MEMBER> outbound,
Outbound<MEMBER, RaftMessages.RaftMessage<MEMBER>> outbound,
LogProvider logProvider, RaftMembershipManager<MEMBER> membershipManager,
RaftLogShippingManager<MEMBER> logShipping,
Supplier<DatabaseHealth> databaseHealthSupplier,
Expand Down
Expand Up @@ -269,7 +269,7 @@ else if ( isSafeToRemoveMember() && superfluousMembers().size() > 0 )
@Override
public void doConsensus( Set<MEMBER> newVotingMemberSet )
{
replicator.replicate( memberSetBuilder.build( newVotingMemberSet ), localDatabase.storeId() );
replicator.replicate( memberSetBuilder.build( newVotingMemberSet ) );
}

@Override
Expand Down
Expand Up @@ -23,21 +23,21 @@

import org.neo4j.coreedge.server.logging.MessageLogger;

public class LoggingOutbound<MEMBER> implements Outbound<MEMBER>
public class LoggingOutbound<MEMBER, MESSAGE extends Message> implements Outbound<MEMBER, MESSAGE>
{
private final Outbound<MEMBER> outbound;
private final Outbound<MEMBER,MESSAGE> outbound;
private final MEMBER me;
private final MessageLogger<MEMBER> messageLogger;

public LoggingOutbound( Outbound<MEMBER> outbound, MEMBER me, MessageLogger<MEMBER> messageLogger )
public LoggingOutbound( Outbound<MEMBER,MESSAGE> outbound, MEMBER me, MessageLogger<MEMBER> messageLogger )
{
this.outbound = outbound;
this.me = me;
this.messageLogger = messageLogger;
}

@Override
public void send( MEMBER to, Message... messages )
public void send( MEMBER to, MESSAGE... messages )
{
messageLogger.log( me, to, messages );
outbound.send( to, messages );
Expand Down
Expand Up @@ -27,12 +27,12 @@
* delivered to the wrong host.
* @param <MEMBER> The type of members that messages will be sent to.
*/
public interface Outbound<MEMBER>
public interface Outbound<MEMBER, MESSAGE extends Message>
{
/**
* Asynchronous, best effort delivery to destination.
* @param to destination
* @param messages The messages to send
*/
void send( MEMBER to, Message... messages );
void send( MEMBER to, MESSAGE... messages );
}
Expand Up @@ -30,23 +30,23 @@
import org.neo4j.coreedge.server.AdvertisedSocketAddress;
import org.neo4j.coreedge.server.CoreMember;

public class RaftOutbound implements Outbound<CoreMember>
public class RaftOutbound implements Outbound<CoreMember, RaftMessage<CoreMember>>
{
private final Outbound<AdvertisedSocketAddress> outbound;
private final Outbound<AdvertisedSocketAddress,Message> outbound;
private final LocalDatabase localDatabase;

public RaftOutbound( Outbound<AdvertisedSocketAddress> outbound, LocalDatabase localDatabase )
public RaftOutbound( Outbound<AdvertisedSocketAddress,Message> outbound, LocalDatabase localDatabase )
{
this.outbound = outbound;
this.localDatabase = localDatabase;
}

@Override
public void send( CoreMember to, Message... messages )
public void send( CoreMember to, RaftMessage<CoreMember>... messages )
{
@SuppressWarnings("unchecked")
StoreIdAwareMessage<CoreMember>[] storeIdAwareMessages = Arrays.stream( messages ).
map( m -> new StoreIdAwareMessage<>( localDatabase.storeId(), (RaftMessage<CoreMember>) m ) ).
map( m -> new StoreIdAwareMessage<>( localDatabase.storeId(), m ) ).
toArray( StoreIdAwareMessage[]::new );
outbound.send( to.getRaftAddress(), storeIdAwareMessages );
}
Expand Down
Expand Up @@ -19,6 +19,7 @@
*/
package org.neo4j.coreedge.raft.replication;

import org.neo4j.coreedge.network.Message;
import org.neo4j.coreedge.raft.RaftMessages;
import org.neo4j.coreedge.raft.net.Outbound;
import org.neo4j.kernel.impl.store.StoreId;
Expand All @@ -27,16 +28,17 @@
public class LeaderOnlyReplicator<MEMBER>
{
private final MEMBER source;
private final Outbound<MEMBER> outbound;
private final Outbound<MEMBER, RaftMessages.RaftMessage<MEMBER>> outbound;

public LeaderOnlyReplicator( MEMBER source, Outbound<MEMBER> outbound )
public LeaderOnlyReplicator( MEMBER source, Outbound<MEMBER, RaftMessages.RaftMessage<MEMBER>> outbound )
{
this.source = source;
this.outbound = outbound;
}

public void replicate( ReplicatedContent content, StoreId storeId )
public void replicate( ReplicatedContent content )
{
//noinspection unchecked
outbound.send( source, new RaftMessages.NewEntry.Request<>( source, content ) );
}
}
Expand Up @@ -23,6 +23,7 @@
import java.util.function.BiConsumer;

import org.neo4j.coreedge.catchup.storecopy.LocalDatabase;
import org.neo4j.coreedge.network.Message;
import org.neo4j.coreedge.raft.LeaderLocator;
import org.neo4j.coreedge.raft.NoLeaderFoundException;
import org.neo4j.coreedge.raft.RaftMessages;
Expand All @@ -38,24 +39,23 @@
public class RaftReplicator<MEMBER> implements Replicator<ReplicatedContent>, Listener<MEMBER>
{
private final MEMBER me;
private final Outbound<MEMBER> outbound;
private final Outbound<MEMBER, RaftMessages.RaftMessage<MEMBER>> outbound;
private final ProgressTracker progressTracker;
private final LocalSessionPool sessionPool;
private final RetryStrategy retryStrategy;
private final LocalDatabase localDatabase;

private MEMBER leader;

public RaftReplicator( LeaderLocator<MEMBER> leaderLocator, MEMBER me, Outbound<MEMBER> outbound,
public RaftReplicator( LeaderLocator<MEMBER> leaderLocator, MEMBER me,
Outbound<MEMBER, RaftMessages.RaftMessage<MEMBER>> outbound,
LocalSessionPool<MEMBER> sessionPool, ProgressTracker progressTracker,
RetryStrategy retryStrategy, LocalDatabase localDatabase )
RetryStrategy retryStrategy )
{
this.me = me;
this.outbound = outbound;
this.progressTracker = progressTracker;
this.sessionPool = sessionPool;
this.retryStrategy = retryStrategy;
this.localDatabase = localDatabase;

try
{
Expand All @@ -79,6 +79,7 @@ public Future<Object> replicate( ReplicatedContent command, boolean trackResult
RetryStrategy.Timeout timeout = retryStrategy.newTimeout();
do
{
//noinspection unchecked
outbound.send( leader, new RaftMessages.NewEntry.Request<>( me, operation ) );
try
{
Expand Down
Expand Up @@ -23,6 +23,7 @@
import java.time.Clock;

import org.neo4j.coreedge.catchup.storecopy.LocalDatabase;
import org.neo4j.coreedge.network.Message;
import org.neo4j.coreedge.raft.DelayedRenewableTimeoutService;
import org.neo4j.coreedge.raft.LeaderContext;
import org.neo4j.coreedge.raft.RaftMessages;
Expand Down Expand Up @@ -94,7 +95,7 @@ enum Mode
PIPELINE
}

private final Outbound<MEMBER> outbound;
private final Outbound<MEMBER, RaftMessages.RaftMessage<MEMBER>> outbound;
private final LogProvider logProvider;
private final Log log;
private final ReadableRaftLog raftLog;
Expand Down Expand Up @@ -126,7 +127,8 @@ public enum Timeouts implements RenewableTimeoutService.TimeoutName

private Mode mode = Mode.MISMATCH;

RaftLogShipper( Outbound<MEMBER> outbound, LogProvider logProvider, ReadableRaftLog raftLog, Clock clock,
RaftLogShipper( Outbound<MEMBER, RaftMessages.RaftMessage<MEMBER>> outbound, LogProvider logProvider,
ReadableRaftLog raftLog, Clock clock,
MEMBER leader, MEMBER follower, long leaderTerm, long leaderCommit, long retryTimeMillis,
int catchupBatchSize, int maxAllowedShippingLag, InFlightMap<Long, RaftLogEntry> inFlightMap )
{
Expand Down
Expand Up @@ -26,7 +26,9 @@
import java.util.Map;

import org.neo4j.coreedge.catchup.storecopy.LocalDatabase;
import org.neo4j.coreedge.network.Message;
import org.neo4j.coreedge.raft.LeaderContext;
import org.neo4j.coreedge.raft.RaftMessages;
import org.neo4j.coreedge.raft.log.RaftLogEntry;
import org.neo4j.coreedge.raft.log.ReadableRaftLog;
import org.neo4j.coreedge.raft.log.segmented.InFlightMap;
Expand All @@ -39,7 +41,7 @@

public class RaftLogShippingManager<MEMBER> implements RaftMembership.Listener
{
private final Outbound<MEMBER> outbound;
private final Outbound<MEMBER,RaftMessages.RaftMessage<MEMBER>> outbound;
private final LogProvider logProvider;
private final ReadableRaftLog raftLog;
private final Clock clock;
Expand All @@ -50,18 +52,18 @@ public class RaftLogShippingManager<MEMBER> implements RaftMembership.Listener
private final int catchupBatchSize;
private final int maxAllowedShippingLag;
private final InFlightMap<Long,RaftLogEntry> inFlightMap;
private final LocalDatabase localDatabase;

private Map<MEMBER,RaftLogShipper> logShippers = new HashMap<>();
private LeaderContext lastLeaderContext;

private boolean running;
private boolean destroyed = false;

public RaftLogShippingManager( Outbound<MEMBER> outbound, LogProvider logProvider, ReadableRaftLog raftLog,
public RaftLogShippingManager( Outbound<MEMBER,RaftMessages.RaftMessage<MEMBER>> outbound, LogProvider logProvider,
ReadableRaftLog raftLog,
Clock clock, MEMBER myself, RaftMembership<MEMBER> membership, long retryTimeMillis,
int catchupBatchSize, int maxAllowedShippingLag,
InFlightMap<Long, RaftLogEntry> inFlightMap, LocalDatabase localDatabase )
InFlightMap<Long, RaftLogEntry> inFlightMap )
{
this.outbound = outbound;
this.logProvider = logProvider;
Expand All @@ -73,8 +75,6 @@ public RaftLogShippingManager( Outbound<MEMBER> outbound, LogProvider logProvide
this.catchupBatchSize = catchupBatchSize;
this.maxAllowedShippingLag = maxAllowedShippingLag;
this.inFlightMap = inFlightMap;
this.localDatabase = localDatabase;

membership.registerListener( this );
}

Expand Down
Expand Up @@ -42,7 +42,7 @@

import static java.util.concurrent.TimeUnit.MICROSECONDS;

public class SenderService extends LifecycleAdapter implements Outbound<AdvertisedSocketAddress>
public class SenderService extends LifecycleAdapter implements Outbound<AdvertisedSocketAddress,Message>
{
private NonBlockingChannels nonBlockingChannels;

Expand Down
Expand Up @@ -44,6 +44,7 @@
import org.neo4j.coreedge.discovery.CoreTopologyService;
import org.neo4j.coreedge.discovery.DiscoveryServiceFactory;
import org.neo4j.coreedge.discovery.RaftDiscoveryServiceConnector;
import org.neo4j.coreedge.network.Message;
import org.neo4j.coreedge.raft.BatchingMessageHandler;
import org.neo4j.coreedge.raft.ContinuousJob;
import org.neo4j.coreedge.raft.DelayedRenewableTimeoutService;
Expand Down Expand Up @@ -290,7 +291,7 @@ public EnterpriseCoreEditionModule( final PlatformModule platformModule,
messageLogger = new NullMessageLogger<>();
}

LoggingOutbound<AdvertisedSocketAddress> loggingOutbound = new LoggingOutbound<>(
LoggingOutbound<AdvertisedSocketAddress,Message> loggingOutbound = new LoggingOutbound<>(
senderService, myself.getRaftAddress(), messageLogger );

ListenSocketAddress raftListenAddress = config.get( CoreEdgeClusterSettings.raft_listen_address );
Expand Down Expand Up @@ -381,7 +382,7 @@ fileSystem, new File( clusterStateDirectory, "session-tracker-state" ), "session

RaftReplicator<CoreMember> replicator = new RaftReplicator<>( raft, myself,
new RaftOutbound( loggingOutbound, localDatabase ), sessionPool, progressTracker,
new ExponentialBackoffStrategy( 10, SECONDS ), localDatabase );
new ExponentialBackoffStrategy( 10, SECONDS ) );

dependencies.satisfyDependency( raft );

Expand Down Expand Up @@ -610,7 +611,7 @@ private File createClusterStateDirectory( File dir, FileSystemAbstraction fileSy
}

private static RaftInstance<CoreMember> createRaft( LifeSupport life,
Outbound<AdvertisedSocketAddress> outbound,
Outbound<AdvertisedSocketAddress, Message> outbound,
CoreTopologyService discoveryService,
Config config,
MessageLogger<AdvertisedSocketAddress> messageLogger,
Expand Down Expand Up @@ -693,7 +694,7 @@ fileSystem, new File( clusterStateDirectory, "term-state" ), "term-state",
Clock.systemUTC(), myself, raftMembershipManager, electionTimeout,
config.get( CoreEdgeClusterSettings.catchup_batch_size ),
config.get( CoreEdgeClusterSettings.log_shipping_max_lag ),
inFlightMap, localDatabase );
inFlightMap );

RaftInstance<CoreMember> raftInstance = new RaftInstance<>(
myself, termState, voteState, raftLog, raftStateMachine, electionTimeout, heartbeatInterval,
Expand Down

0 comments on commit 1ed96d0

Please sign in to comment.