Skip to content

Commit

Permalink
Add dispatcher that delegates to the correct decoder in CoreClients
Browse files Browse the repository at this point in the history
This will avoid unnecessary buffer copies.

Also introduce a FileContent objects that wraps the ByteBuf to be use
in FileContentHandler in order to add FileContentDecoder and make it
more compatible with the other Decoder/Handler implementations.
  • Loading branch information
davidegrohmann committed Aug 16, 2016
1 parent fc67fa1 commit 53ef8b1
Show file tree
Hide file tree
Showing 27 changed files with 260 additions and 267 deletions.
Expand Up @@ -19,18 +19,20 @@
*/
package org.neo4j.coreedge.catchup;

public class CatchupClientProtocol
public class CatchupClientProtocol implements Protocol<CatchupClientProtocol.NextMessage>
{
private NextMessage nextMessage = NextMessage.MESSAGE_TYPE;

@Override
public void expect( NextMessage nextMessage )
{
this.nextMessage = nextMessage;
}

public boolean isExpecting( NextMessage message )
@Override
public NextMessage expecting()
{
return this.nextMessage == message;
return nextMessage;
}

public enum NextMessage
Expand Down
Expand Up @@ -152,7 +152,8 @@ protected void initChannel( SocketChannel ch ) throws Exception

private ChannelInboundHandler decoders( CatchupServerProtocol protocol )
{
RequestDecoderDispatcher decoderDispatcher = new RequestDecoderDispatcher( protocol, logProvider );
RequestDecoderDispatcher<CatchupServerProtocol.NextMessage> decoderDispatcher =
new RequestDecoderDispatcher<>( protocol, logProvider );
decoderDispatcher.register( NextMessage.TX_PULL, new TxPullRequestDecoder() );
decoderDispatcher.register( NextMessage.GET_STORE, new SimpleRequestDecoder( GetStoreRequest::new ) );
decoderDispatcher.register( NextMessage.GET_STORE_ID, new SimpleRequestDecoder( GetStoreIdRequest::new ) );
Expand Down
Expand Up @@ -19,16 +19,18 @@
*/
package org.neo4j.coreedge.catchup;

public class CatchupServerProtocol
public class CatchupServerProtocol implements Protocol<CatchupServerProtocol.NextMessage>
{
private NextMessage nextMessage = NextMessage.MESSAGE_TYPE;

@Override
public void expect( NextMessage nextMessage )
{
this.nextMessage = nextMessage;
}

NextMessage expecting()
@Override
public NextMessage expecting()
{
return nextMessage;
}
Expand Down
Expand Up @@ -27,7 +27,6 @@
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;

import static org.neo4j.coreedge.catchup.CatchupClientProtocol.NextMessage;
import static org.neo4j.coreedge.catchup.ResponseMessageType.from;

public class ClientMessageTypeHandler extends ChannelInboundHandlerAdapter
Expand All @@ -44,29 +43,29 @@ public ClientMessageTypeHandler( CatchupClientProtocol protocol, LogProvider log
@Override
public void channelRead( ChannelHandlerContext ctx, Object msg ) throws Exception
{
if ( protocol.isExpecting( NextMessage.MESSAGE_TYPE ) )
if ( CatchupClientProtocol.NextMessage.MESSAGE_TYPE.equals( protocol.expecting() ) )
{
ResponseMessageType responseMessageType = from( ((ByteBuf) msg).readByte() );

switch ( responseMessageType )
{
case STORE_ID:
protocol.expect( NextMessage.STORE_ID );
protocol.expect( CatchupClientProtocol.NextMessage.STORE_ID );
break;
case TX:
protocol.expect( NextMessage.TX_PULL_RESPONSE );
protocol.expect( CatchupClientProtocol.NextMessage.TX_PULL_RESPONSE );
break;
case FILE:
protocol.expect( NextMessage.FILE_HEADER );
protocol.expect( CatchupClientProtocol.NextMessage.FILE_HEADER );
break;
case STORE_COPY_FINISHED:
protocol.expect( NextMessage.STORE_COPY_FINISHED );
protocol.expect( CatchupClientProtocol.NextMessage.STORE_COPY_FINISHED );
break;
case CORE_SNAPSHOT:
protocol.expect( NextMessage.CORE_SNAPSHOT );
protocol.expect( CatchupClientProtocol.NextMessage.CORE_SNAPSHOT );
break;
case TX_STREAM_FINISHED:
protocol.expect( NextMessage.TX_STREAM_FINISHED );
protocol.expect( CatchupClientProtocol.NextMessage.TX_STREAM_FINISHED );
break;
default:
log.warn( "No handler found for message type %s", responseMessageType );
Expand Down
Expand Up @@ -19,24 +19,33 @@
*/
package org.neo4j.coreedge.catchup;

import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

import io.netty.channel.ChannelInboundHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;

import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

import org.neo4j.coreedge.catchup.CatchupClientProtocol.NextMessage;
import org.neo4j.coreedge.catchup.storecopy.FileContentDecoder;
import org.neo4j.coreedge.catchup.storecopy.FileHeaderDecoder;
import org.neo4j.coreedge.catchup.storecopy.GetStoreIdRequest;
import org.neo4j.coreedge.catchup.storecopy.GetStoreIdResponseDecoder;
import org.neo4j.coreedge.catchup.storecopy.GetStoreRequest;
import org.neo4j.coreedge.catchup.storecopy.StoreCopyFinishedResponseDecoder;
import org.neo4j.coreedge.catchup.storecopy.StoreFileReceiver;
import org.neo4j.coreedge.catchup.storecopy.StoreFileStreamingCompleteListener;
import org.neo4j.coreedge.catchup.storecopy.StoreFileStreams;
import org.neo4j.coreedge.catchup.storecopy.StoreIdReceiver;
import org.neo4j.coreedge.catchup.tx.PullRequestMonitor;
import org.neo4j.coreedge.catchup.tx.TxPullRequest;
import org.neo4j.coreedge.catchup.tx.TxPullResponse;
import org.neo4j.coreedge.catchup.tx.TxPullResponseDecoder;
import org.neo4j.coreedge.catchup.tx.TxPullResponseListener;
import org.neo4j.coreedge.catchup.tx.TxStreamCompleteListener;
import org.neo4j.coreedge.catchup.tx.TxStreamFinishedResponseDecoder;
import org.neo4j.coreedge.core.state.snapshot.CoreSnapshot;
import org.neo4j.coreedge.core.state.snapshot.CoreSnapshotDecoder;
import org.neo4j.coreedge.core.state.snapshot.CoreSnapshotListener;
import org.neo4j.coreedge.core.state.snapshot.CoreSnapshotRequest;
import org.neo4j.coreedge.discovery.TopologyService;
Expand All @@ -55,26 +64,26 @@
import static java.util.Arrays.asList;

public abstract class CoreClient extends LifecycleAdapter implements StoreFileReceiver, StoreIdReceiver,
StoreFileStreamingCompleteListener,
TxStreamCompleteListener, TxPullResponseListener,
CoreSnapshotListener
StoreFileStreamingCompleteListener, TxStreamCompleteListener, TxPullResponseListener, CoreSnapshotListener
{
private final PullRequestMonitor pullRequestMonitor;
private final LogProvider logProvider;
private final SenderService senderService;
private StoreFileStreams storeFileStreams;
private Consumer<StoreId> storeIdConsumer;
private final Outbound<MemberId,Message> outbound;
private final PullRequestMonitor pullRequestMonitor;

private final Listeners<StoreFileStreamingCompleteListener> storeFileStreamingCompleteListeners = new Listeners<>();
private final Listeners<TxStreamCompleteListener> txStreamCompleteListeners = new Listeners<>();
private final Listeners<TxPullResponseListener> txPullResponseListeners = new Listeners<>();
private CompletableFuture<CoreSnapshot> coreSnapshotFuture;

private Outbound<MemberId, Message> outbound;
private StoreFileStreams storeFileStreams;
private Consumer<StoreId> storeIdConsumer;
private CompletableFuture<CoreSnapshot> coreSnapshotFuture;

public CoreClient( LogProvider logProvider, ChannelInitializer<SocketChannel> channelInitializer, Monitors monitors,
int maxQueueSize, NonBlockingChannels nonBlockingChannels, TopologyService discoveryService,
long logThresholdMillis )
int maxQueueSize, NonBlockingChannels nonBlockingChannels, TopologyService discoveryService, long logThresholdMillis )
{
senderService =
this.logProvider = logProvider;
this.senderService =
new SenderService( channelInitializer, logProvider, monitors, maxQueueSize, nonBlockingChannels );
this.outbound = new CoreOutbound( discoveryService, senderService, logProvider, logThresholdMillis );
this.pullRequestMonitor = monitors.newMonitor( PullRequestMonitor.class );
Expand Down Expand Up @@ -163,8 +172,7 @@ public void setStoreIdConsumer( Consumer<StoreId> storeIdConsumer )
@Override
public void onFileStreamingComplete( long lastCommittedTxBeforeStoreCopy )
{
storeFileStreamingCompleteListeners.notify(
listener -> listener.onFileStreamingComplete( lastCommittedTxBeforeStoreCopy ) );
storeFileStreamingCompleteListeners.notify( listener -> listener.onFileStreamingComplete( lastCommittedTxBeforeStoreCopy ) );
}

@Override
Expand Down Expand Up @@ -200,4 +208,18 @@ public void removeTxStreamCompleteListener( TxStreamCompleteListener listener )
{
txStreamCompleteListeners.remove( listener );
}

protected ChannelInboundHandler decoders( CatchupClientProtocol protocol )
{
RequestDecoderDispatcher<NextMessage> decoderDispatcher =
new RequestDecoderDispatcher<>( protocol, logProvider );
decoderDispatcher.register( NextMessage.STORE_ID, new GetStoreIdResponseDecoder() );
decoderDispatcher.register( NextMessage.TX_PULL_RESPONSE, new TxPullResponseDecoder() );
decoderDispatcher.register( NextMessage.CORE_SNAPSHOT, new CoreSnapshotDecoder() );
decoderDispatcher.register( NextMessage.STORE_COPY_FINISHED, new StoreCopyFinishedResponseDecoder() );
decoderDispatcher.register( NextMessage.TX_STREAM_FINISHED, new TxStreamFinishedResponseDecoder() );
decoderDispatcher.register( NextMessage.FILE_HEADER, new FileHeaderDecoder() );
decoderDispatcher.register( NextMessage.FILE_CONTENTS, new FileContentDecoder() );
return decoderDispatcher;
}
}
Expand Up @@ -28,23 +28,18 @@
import java.util.concurrent.TimeUnit;

import org.neo4j.coreedge.catchup.storecopy.FileContentHandler;
import org.neo4j.coreedge.catchup.storecopy.FileHeaderDecoder;
import org.neo4j.coreedge.catchup.storecopy.FileHeaderHandler;
import org.neo4j.coreedge.catchup.storecopy.GetStoreRequestEncoder;
import org.neo4j.coreedge.catchup.storecopy.StoreCopyFinishedResponseDecoder;
import org.neo4j.coreedge.catchup.storecopy.StoreCopyFinishedResponseHandler;
import org.neo4j.coreedge.catchup.tx.TxPullRequestEncoder;
import org.neo4j.coreedge.catchup.tx.TxPullResponseDecoder;
import org.neo4j.coreedge.catchup.tx.TxPullResponseHandler;
import org.neo4j.coreedge.catchup.tx.TxStreamFinishedResponseDecoder;
import org.neo4j.coreedge.catchup.tx.TxStreamFinishedResponseHandler;
import org.neo4j.coreedge.core.state.snapshot.CoreSnapshotDecoder;
import org.neo4j.coreedge.core.state.snapshot.CoreSnapshotRequestEncoder;
import org.neo4j.coreedge.core.state.snapshot.CoreSnapshotResponseHandler;
import org.neo4j.coreedge.discovery.CoreTopologyService;
import org.neo4j.coreedge.logging.ExceptionLoggingHandler;
import org.neo4j.coreedge.messaging.IdleChannelReaperHandler;
import org.neo4j.coreedge.messaging.NonBlockingChannels;
import org.neo4j.coreedge.logging.ExceptionLoggingHandler;
import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.logging.LogProvider;

Expand Down Expand Up @@ -92,18 +87,12 @@ protected void initChannel( SocketChannel ch ) throws Exception

pipeline.addLast( new ClientMessageTypeHandler( protocol, logProvider ) );

pipeline.addLast( new TxPullResponseDecoder( protocol ) );
pipeline.addLast( new CoreSnapshotDecoder( protocol ) );
pipeline.addLast( new StoreCopyFinishedResponseDecoder( protocol ) );
pipeline.addLast( new TxStreamFinishedResponseDecoder( protocol ) );
pipeline.addLast( new FileHeaderDecoder( protocol ) );
pipeline.addLast( owner.decoders( protocol ) );

pipeline.addLast( new TxPullResponseHandler( protocol, owner ) );
pipeline.addLast( new CoreSnapshotResponseHandler( protocol, owner ) );
pipeline.addLast( new StoreCopyFinishedResponseHandler( protocol, owner ) );
pipeline.addLast( new TxStreamFinishedResponseHandler( protocol, owner ) );

// keep these after type-specific handlers since they process ByteBufs
pipeline.addLast( new FileHeaderHandler( protocol, logProvider ) );
pipeline.addLast( new FileContentHandler( protocol, owner ) );

Expand Down
@@ -0,0 +1,27 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.coreedge.catchup;

public interface Protocol<E extends Enum<E>>
{
void expect( E next );

E expecting();
}
Expand Up @@ -26,17 +26,16 @@
import java.util.HashMap;
import java.util.Map;

import org.neo4j.coreedge.catchup.CatchupServerProtocol.NextMessage;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;

public class RequestDecoderDispatcher extends ChannelInboundHandlerAdapter
class RequestDecoderDispatcher<E extends Enum<E>> extends ChannelInboundHandlerAdapter
{
private final Map<NextMessage, ChannelInboundHandler> decoders = new HashMap<>();
private final CatchupServerProtocol protocol;
private final Map<E, ChannelInboundHandler> decoders = new HashMap<>();
private final Protocol<E> protocol;
private final Log log;

public RequestDecoderDispatcher( CatchupServerProtocol protocol, LogProvider logProvider )
RequestDecoderDispatcher( Protocol<E> protocol, LogProvider logProvider )
{
this.protocol = protocol;
this.log = logProvider.getLog( getClass() );
Expand All @@ -45,18 +44,17 @@ public RequestDecoderDispatcher( CatchupServerProtocol protocol, LogProvider log
@Override
public void channelRead( ChannelHandlerContext ctx, Object msg ) throws Exception
{
NextMessage expecting = protocol.expecting();
E expecting = protocol.expecting();
ChannelInboundHandler delegate = decoders.get( expecting );
if ( delegate == null )
{
log.warn( "Unknown message %s", expecting );
return;
}

delegate.channelRead( ctx, msg );
}

public void register( NextMessage type, ChannelInboundHandler decoder )
public void register( E type, ChannelInboundHandler decoder )
{
assert !decoders.containsKey( type ) : "registering twice a decoder for the same type?";
decoders.put( type, decoder );
Expand Down
Expand Up @@ -27,7 +27,6 @@
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;

import static org.neo4j.coreedge.catchup.CatchupServerProtocol.NextMessage;

class ServerMessageTypeHandler extends ChannelInboundHandlerAdapter
{
Expand All @@ -43,25 +42,25 @@ class ServerMessageTypeHandler extends ChannelInboundHandlerAdapter
@Override
public void channelRead( ChannelHandlerContext ctx, Object msg ) throws Exception
{
if ( protocol.expecting().equals( NextMessage.MESSAGE_TYPE ) )
if ( CatchupServerProtocol.NextMessage.MESSAGE_TYPE.equals( protocol.expecting() ) )
{
RequestMessageType requestMessageType = RequestMessageType.from( ((ByteBuf) msg).readByte() );

if ( requestMessageType.equals( RequestMessageType.TX_PULL_REQUEST ) )
{
protocol.expect( NextMessage.TX_PULL );
protocol.expect( CatchupServerProtocol.NextMessage.TX_PULL );
}
else if ( requestMessageType.equals( RequestMessageType.STORE ) )
{
protocol.expect( NextMessage.GET_STORE );
protocol.expect( CatchupServerProtocol.NextMessage.GET_STORE );
}
else if ( requestMessageType.equals( RequestMessageType.STORE_ID ) )
{
protocol.expect( NextMessage.GET_STORE_ID );
protocol.expect( CatchupServerProtocol.NextMessage.GET_STORE_ID );
}
else if ( requestMessageType.equals( RequestMessageType.RAFT_STATE ) )
{
protocol.expect( NextMessage.GET_RAFT_STATE );
protocol.expect( CatchupServerProtocol.NextMessage.GET_RAFT_STATE );
}
else
{
Expand Down
Expand Up @@ -29,18 +29,16 @@

import org.neo4j.coreedge.catchup.CatchupClientProtocol;
import org.neo4j.coreedge.catchup.ClientMessageTypeHandler;
import org.neo4j.coreedge.catchup.CoreClient;
import org.neo4j.coreedge.catchup.RequestMessageTypeEncoder;
import org.neo4j.coreedge.catchup.ResponseMessageTypeEncoder;
import org.neo4j.coreedge.catchup.CoreClient;
import org.neo4j.coreedge.catchup.tx.TxPullRequestEncoder;
import org.neo4j.coreedge.catchup.tx.TxPullResponseDecoder;
import org.neo4j.coreedge.catchup.tx.TxPullResponseHandler;
import org.neo4j.coreedge.catchup.tx.TxStreamFinishedResponseDecoder;
import org.neo4j.coreedge.catchup.tx.TxStreamFinishedResponseHandler;
import org.neo4j.coreedge.discovery.TopologyService;
import org.neo4j.coreedge.logging.ExceptionLoggingHandler;
import org.neo4j.coreedge.messaging.IdleChannelReaperHandler;
import org.neo4j.coreedge.messaging.NonBlockingChannels;
import org.neo4j.coreedge.logging.ExceptionLoggingHandler;
import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.logging.LogProvider;

Expand Down Expand Up @@ -88,18 +86,12 @@ protected void initChannel( SocketChannel ch ) throws Exception

pipeline.addLast( new ClientMessageTypeHandler( protocol, logProvider ) );

pipeline.addLast( new GetStoreIdResponseDecoder( protocol ) );
pipeline.addLast( new TxPullResponseDecoder( protocol ) );
pipeline.addLast( new StoreCopyFinishedResponseDecoder( protocol ) );
pipeline.addLast( new TxStreamFinishedResponseDecoder( protocol ) );
pipeline.addLast( new FileHeaderDecoder( protocol ) );
pipeline.addLast( owner.decoders( protocol ) );

pipeline.addLast( new GetStoreIdResponseHandler( protocol, owner ) );
pipeline.addLast( new TxPullResponseHandler( protocol, owner ) );
pipeline.addLast( new StoreCopyFinishedResponseHandler( protocol, owner ) );
pipeline.addLast( new TxStreamFinishedResponseHandler( protocol, owner ) );

// keep these after type-specific handlers since they process ByteBufs
pipeline.addLast( new FileHeaderHandler( protocol, logProvider ) );
pipeline.addLast( new FileContentHandler( protocol, owner ) );

Expand Down

0 comments on commit 53ef8b1

Please sign in to comment.