Skip to content

Commit

Permalink
Split Outbound to have two sends which makes it much easier for
Browse files Browse the repository at this point in the history
clients to use
  • Loading branch information
Mark Needham committed Jun 16, 2016
1 parent 1ed96d0 commit af3abdb
Show file tree
Hide file tree
Showing 14 changed files with 180 additions and 84 deletions.
Expand Up @@ -23,6 +23,7 @@
import io.netty.channel.socket.SocketChannel;

import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;

import org.neo4j.coreedge.catchup.RequestMessageType;
Expand All @@ -47,6 +48,8 @@
import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.logging.LogProvider;

import static java.util.Arrays.asList;

public abstract class CoreClient extends LifecycleAdapter implements StoreFileReceiver,
StoreFileStreamingCompleteListener,
TxStreamCompleteListener, TxPullResponseListener, CoreSnapshotListener
Expand Down Expand Up @@ -92,7 +95,7 @@ public void pollForTransactions( AdvertisedSocketAddress serverAddress, long las

private void send( AdvertisedSocketAddress to, RequestMessageType messageType, Message contentMessage )
{
senderService.send( to, messageType, contentMessage );
senderService.send( to, asList( messageType, contentMessage ) );
}

@Override
Expand Down
Expand Up @@ -25,8 +25,6 @@
import java.util.List;
import java.util.Objects;

import com.sun.org.apache.xml.internal.security.algorithms.MessageDigestAlgorithm;

import org.neo4j.coreedge.network.Message;
import org.neo4j.coreedge.raft.log.RaftLogEntry;
import org.neo4j.coreedge.raft.replication.ReplicatedContent;
Expand Down
Expand Up @@ -19,6 +19,8 @@
*/
package org.neo4j.coreedge.raft.net;

import java.util.Collection;

import org.neo4j.coreedge.network.Message;

import org.neo4j.coreedge.server.logging.MessageLogger;
Expand All @@ -37,7 +39,14 @@ public LoggingOutbound( Outbound<MEMBER,MESSAGE> outbound, MEMBER me, MessageLog
}

@Override
public void send( MEMBER to, MESSAGE... messages )
public void send( MEMBER to, MESSAGE message )
{
messageLogger.log( me, to, message );
outbound.send( to, message );
}

@Override
public void send( MEMBER to, Collection<MESSAGE> messages )
{
messageLogger.log( me, to, messages );
outbound.send( to, messages );
Expand Down
Expand Up @@ -19,6 +19,8 @@
*/
package org.neo4j.coreedge.raft.net;

import java.util.Collection;

import org.neo4j.coreedge.network.Message;

/**
Expand All @@ -29,10 +31,17 @@
*/
public interface Outbound<MEMBER, MESSAGE extends Message>
{
/**
* Asynchronous, best effort delivery to destination.
* @param to destination
* @param message The message to send
*/
void send( MEMBER to, MESSAGE message );

/**
* Asynchronous, best effort delivery to destination.
* @param to destination
* @param messages The messages to send
*/
void send( MEMBER to, MESSAGE... messages );
void send( MEMBER to, Collection<MESSAGE> messages );
}
Expand Up @@ -19,17 +19,17 @@
*/
package org.neo4j.coreedge.raft.net;

import java.util.Arrays;
import java.util.Collection;

import org.neo4j.coreedge.catchup.storecopy.LocalDatabase;
import org.neo4j.coreedge.network.Message;

import org.neo4j.coreedge.raft.RaftMessages;
import org.neo4j.coreedge.raft.RaftMessages.RaftMessage;
import org.neo4j.coreedge.raft.RaftMessages.StoreIdAwareMessage;
import org.neo4j.coreedge.server.AdvertisedSocketAddress;
import org.neo4j.coreedge.server.CoreMember;

import static java.util.stream.Collectors.toList;

public class RaftOutbound implements Outbound<CoreMember, RaftMessage<CoreMember>>
{
private final Outbound<AdvertisedSocketAddress,Message> outbound;
Expand All @@ -42,12 +42,19 @@ public RaftOutbound( Outbound<AdvertisedSocketAddress,Message> outbound, LocalDa
}

@Override
public void send( CoreMember to, RaftMessage<CoreMember>... messages )
public void send( CoreMember to, RaftMessage<CoreMember> message )
{
outbound.send( to.getRaftAddress(), storeIdify( message ) );
}

@Override
public void send( CoreMember to, Collection<RaftMessage<CoreMember>> raftMessages )
{
outbound.send( to.getRaftAddress(), raftMessages.stream().map( this::storeIdify ).collect( toList() ) );
}

private StoreIdAwareMessage<CoreMember> storeIdify( RaftMessage<CoreMember> m )
{
@SuppressWarnings("unchecked")
StoreIdAwareMessage<CoreMember>[] storeIdAwareMessages = Arrays.stream( messages ).
map( m -> new StoreIdAwareMessage<>( localDatabase.storeId(), m ) ).
toArray( StoreIdAwareMessage[]::new );
outbound.send( to.getRaftAddress(), storeIdAwareMessages );
return new StoreIdAwareMessage<>( localDatabase.storeId(), m );
}
}
Expand Up @@ -135,6 +135,4 @@ private void writeMember( CoreMember member, WritableChannel buffer ) throws IOE
marshal.marshal( member.getCoreAddress(), buffer );
marshal.marshal( member.getRaftAddress(), buffer );
}


}
Expand Up @@ -19,6 +19,7 @@
*/
package org.neo4j.coreedge.server;

import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
Expand Down Expand Up @@ -71,7 +72,7 @@ public SenderService( ChannelInitializer<SocketChannel> channelInitializer,
}

@Override
public void send( AdvertisedSocketAddress to, Message... messages )
public void send( AdvertisedSocketAddress to, Message message )
{
serviceLock.readLock().lock();
try
Expand All @@ -81,27 +82,37 @@ public void send( AdvertisedSocketAddress to, Message... messages )
return;
}

MessageQueueMonitor monitor = monitors.newMonitor( MessageQueueMonitor.class, NonBlockingChannel.class );
NonBlockingChannel nonBlockingChannel = getAndUpdateLife( to, monitor );
monitor.register( to.socketAddress() );
for ( Object msg : messages )
{
nonBlockingChannel.send( msg );
}
channel( to ).send( message );
}
finally
{
serviceLock.readLock().unlock();
}
}

public int activeChannelCount()
@Override
public void send( AdvertisedSocketAddress to, Collection<Message> messages )
{
return nonBlockingChannels.size();
serviceLock.readLock().lock();
try
{
if ( !senderServiceRunning )
{
return;
}

NonBlockingChannel channel = channel( to );
messages.forEach( channel::send );
}
finally
{
serviceLock.readLock().unlock();
}
}

private NonBlockingChannel getAndUpdateLife( AdvertisedSocketAddress to, MessageQueueMonitor monitor )
private NonBlockingChannel channel( AdvertisedSocketAddress to )
{
MessageQueueMonitor monitor = monitors.newMonitor( MessageQueueMonitor.class, NonBlockingChannel.class );
NonBlockingChannel nonBlockingChannel = nonBlockingChannels.get( to );

if ( nonBlockingChannel == null )
Expand All @@ -116,6 +127,7 @@ private NonBlockingChannel getAndUpdateLife( AdvertisedSocketAddress to, Message
}
}

monitor.register( to.socketAddress() );
return nonBlockingChannel;
}

Expand Down Expand Up @@ -175,5 +187,4 @@ public synchronized void stop()
serviceLock.writeLock().unlock();
}
}

}
Expand Up @@ -20,6 +20,8 @@
package org.neo4j.coreedge.raft;

import org.neo4j.coreedge.network.Message;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
Expand Down Expand Up @@ -75,7 +77,8 @@ public void reconnect( long id )
disconnectedMembers.remove( id );
}

public class Outbound implements org.neo4j.coreedge.raft.net.Outbound<RaftTestMember,RaftMessages.RaftMessage<RaftTestMember>>
public class Outbound implements
org.neo4j.coreedge.raft.net.Outbound<RaftTestMember, RaftMessages.RaftMessage<RaftTestMember>>
{
private final long me;

Expand All @@ -85,20 +88,30 @@ public Outbound( long me )
}

@Override
public synchronized void send( RaftTestMember to, final RaftMessages.RaftMessage<RaftTestMember>... messages )
public void send( RaftTestMember to, RaftMessages.RaftMessage<RaftTestMember> message )
{
if ( !messageQueues.containsKey( to.getId() ) ||
disconnectedMembers.contains( to.getId() ) ||
disconnectedMembers.contains( me ) )

if ( canDeliver( to ) )
{
return;
messageQueues.get( to.getId() ).add( message );
}
}

for ( Message message : messages )
@Override
public void send( RaftTestMember to, Collection<RaftMessages.RaftMessage<RaftTestMember>> messages )
{
if ( canDeliver( to ) )
{
messageQueues.get( to.getId() ).add( message );
messageQueues.get( to.getId() ).addAll( messages );
}
}

private boolean canDeliver( RaftTestMember to )
{
return messageQueues.containsKey( to.getId() ) &&
!disconnectedMembers.contains( to.getId() ) &&
!disconnectedMembers.contains( me );
}
}

public class Inbound implements org.neo4j.coreedge.raft.net.Inbound
Expand Down
Expand Up @@ -21,6 +21,7 @@

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
Expand All @@ -33,29 +34,39 @@

public class OutboundMessageCollector implements Outbound<RaftTestMember, RaftMessages.RaftMessage<RaftTestMember>>
{
Map<RaftTestMember, List<Message>> sentMessages = new HashMap<>();
Map<RaftTestMember, List<RaftMessages.RaftMessage<RaftTestMember>>> sentMessages = new HashMap<>();

public void clear()
{
sentMessages.clear();
}

@Override
public void send( RaftTestMember to, RaftMessages.RaftMessage<RaftTestMember>... messages )
public void send( RaftTestMember to, RaftMessages.RaftMessage<RaftTestMember> message )
{
List<Message> messagesToMember = sentMessages.get( to );
raftMessages( to ).add( message );
}

@Override
public void send( RaftTestMember to, Collection<RaftMessages.RaftMessage<RaftTestMember>> messages )
{
raftMessages( to ).addAll( messages );
}

private List<RaftMessages.RaftMessage<RaftTestMember>> raftMessages( RaftTestMember to )
{
List<RaftMessages.RaftMessage<RaftTestMember>> messagesToMember = sentMessages.get( to );
if ( messagesToMember == null )
{
messagesToMember = new ArrayList<>();
sentMessages.put( to, messagesToMember );
}

Collections.addAll( messagesToMember, messages );
return messagesToMember;
}

public List<Message> sentTo( RaftTestMember member )
public List<RaftMessages.RaftMessage<RaftTestMember>> sentTo( RaftTestMember member )
{
List<Message> messages = sentMessages.get( member );
List<RaftMessages.RaftMessage<RaftTestMember>> messages = sentMessages.get( member );

if ( messages == null )
{
Expand All @@ -67,7 +78,7 @@ public List<Message> sentTo( RaftTestMember member )

public boolean hasAnyEntriesTo( RaftTestMember member )
{
List<Message> messages = sentMessages.get( member );
List<RaftMessages.RaftMessage<RaftTestMember>> messages = sentMessages.get( member );
return messages != null && messages.size() != 0;
}

Expand All @@ -79,10 +90,7 @@ public boolean hasEntriesTo( RaftTestMember member, RaftLogEntry... expectedMess
{
if ( message instanceof RaftMessages.AppendEntries.Request )
{
for ( RaftLogEntry actualEntry : ((RaftMessages.AppendEntries.Request) message).entries() )
{
actualMessages.add( actualEntry );
}
Collections.addAll( actualMessages, ((RaftMessages.AppendEntries.Request) message).entries() );
}
}

Expand Down
Expand Up @@ -20,6 +20,7 @@
package org.neo4j.coreedge.raft;

import java.time.Clock;
import java.util.Collection;
import java.util.function.Supplier;

import org.neo4j.coreedge.catchup.storecopy.LocalDatabase;
Expand Down Expand Up @@ -61,7 +62,19 @@ public class RaftInstanceBuilder<MEMBER>
NullLogProvider.getInstance() );

private Inbound<RaftMessages.RaftMessage<MEMBER>> inbound = handler -> {};
private Outbound<MEMBER, RaftMessages.RaftMessage<MEMBER>> outbound = ( advertisedSocketAddress, messages ) -> {};
private Outbound<MEMBER, RaftMessages.RaftMessage<MEMBER>> outbound =
new Outbound<MEMBER, RaftMessages.RaftMessage<MEMBER>>()
{
@Override
public void send( MEMBER to, RaftMessages.RaftMessage<MEMBER> message )
{
}

@Override
public void send( MEMBER to, Collection<RaftMessages.RaftMessage<MEMBER>> raftMessages )
{
}
};

private LogProvider logProvider = NullLogProvider.getInstance();
private Clock clock = Clock.systemUTC();
Expand Down

0 comments on commit af3abdb

Please sign in to comment.