Skip to content

Commit

Permalink
Refactor Protocol interface to solve code review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
davidegrohmann committed Aug 16, 2016
1 parent 24874ec commit 2b67141
Show file tree
Hide file tree
Showing 21 changed files with 97 additions and 81 deletions.
Expand Up @@ -19,14 +19,14 @@
*/
package org.neo4j.coreedge.catchup;

public class CatchupClientProtocol extends Protocol<CatchupClientProtocol.NextMessage>
public class CatchupClientProtocol extends Protocol<CatchupClientProtocol.State>
{
public CatchupClientProtocol()
{
super( NextMessage.MESSAGE_TYPE );
super( State.MESSAGE_TYPE );
}

public enum NextMessage
public enum State
{
MESSAGE_TYPE,
STORE_ID,
Expand Down
Expand Up @@ -35,7 +35,7 @@
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

import org.neo4j.coreedge.catchup.CatchupServerProtocol.NextMessage;
import org.neo4j.coreedge.catchup.CatchupServerProtocol.State;
import org.neo4j.coreedge.catchup.storecopy.FileHeaderEncoder;
import org.neo4j.coreedge.catchup.storecopy.GetStoreIdRequest;
import org.neo4j.coreedge.catchup.storecopy.GetStoreIdRequestHandler;
Expand Down Expand Up @@ -152,12 +152,12 @@ protected void initChannel( SocketChannel ch ) throws Exception

private ChannelInboundHandler decoders( CatchupServerProtocol protocol )
{
RequestDecoderDispatcher<CatchupServerProtocol.NextMessage> decoderDispatcher =
RequestDecoderDispatcher<State> decoderDispatcher =
new RequestDecoderDispatcher<>( protocol, logProvider );
decoderDispatcher.register( NextMessage.TX_PULL, new TxPullRequestDecoder() );
decoderDispatcher.register( NextMessage.GET_STORE, new SimpleRequestDecoder( GetStoreRequest::new ) );
decoderDispatcher.register( NextMessage.GET_STORE_ID, new SimpleRequestDecoder( GetStoreIdRequest::new ) );
decoderDispatcher.register( NextMessage.GET_RAFT_STATE, new SimpleRequestDecoder( CoreSnapshotRequest::new) );
decoderDispatcher.register( State.TX_PULL, new TxPullRequestDecoder() );
decoderDispatcher.register( State.GET_STORE, new SimpleRequestDecoder( GetStoreRequest::new ) );
decoderDispatcher.register( State.GET_STORE_ID, new SimpleRequestDecoder( GetStoreIdRequest::new ) );
decoderDispatcher.register( State.GET_RAFT_STATE, new SimpleRequestDecoder( CoreSnapshotRequest::new) );
return decoderDispatcher;
}

Expand Down
Expand Up @@ -19,14 +19,14 @@
*/
package org.neo4j.coreedge.catchup;

public class CatchupServerProtocol extends Protocol<CatchupServerProtocol.NextMessage>
public class CatchupServerProtocol extends Protocol<CatchupServerProtocol.State>
{
public CatchupServerProtocol()
{
super( NextMessage.MESSAGE_TYPE );
super( State.MESSAGE_TYPE );
}

public enum NextMessage
public enum State
{
MESSAGE_TYPE, GET_STORE, GET_STORE_ID, GET_RAFT_STATE, TX_PULL
}
Expand Down
Expand Up @@ -43,29 +43,29 @@ public ClientMessageTypeHandler( CatchupClientProtocol protocol, LogProvider log
@Override
public void channelRead( ChannelHandlerContext ctx, Object msg ) throws Exception
{
if ( CatchupClientProtocol.NextMessage.MESSAGE_TYPE.equals( protocol.expecting() ) )
if ( protocol.isExpecting( CatchupClientProtocol.State.MESSAGE_TYPE ) )
{
ResponseMessageType responseMessageType = from( ((ByteBuf) msg).readByte() );

switch ( responseMessageType )
{
case STORE_ID:
protocol.expect( CatchupClientProtocol.NextMessage.STORE_ID );
protocol.expect( CatchupClientProtocol.State.STORE_ID );
break;
case TX:
protocol.expect( CatchupClientProtocol.NextMessage.TX_PULL_RESPONSE );
protocol.expect( CatchupClientProtocol.State.TX_PULL_RESPONSE );
break;
case FILE:
protocol.expect( CatchupClientProtocol.NextMessage.FILE_HEADER );
protocol.expect( CatchupClientProtocol.State.FILE_HEADER );
break;
case STORE_COPY_FINISHED:
protocol.expect( CatchupClientProtocol.NextMessage.STORE_COPY_FINISHED );
protocol.expect( CatchupClientProtocol.State.STORE_COPY_FINISHED );
break;
case CORE_SNAPSHOT:
protocol.expect( CatchupClientProtocol.NextMessage.CORE_SNAPSHOT );
protocol.expect( CatchupClientProtocol.State.CORE_SNAPSHOT );
break;
case TX_STREAM_FINISHED:
protocol.expect( CatchupClientProtocol.NextMessage.TX_STREAM_FINISHED );
protocol.expect( CatchupClientProtocol.State.TX_STREAM_FINISHED );
break;
default:
log.warn( "No handler found for message type %s", responseMessageType );
Expand Down
Expand Up @@ -26,7 +26,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

import org.neo4j.coreedge.catchup.CatchupClientProtocol.NextMessage;
import org.neo4j.coreedge.catchup.CatchupClientProtocol.State;
import org.neo4j.coreedge.catchup.storecopy.FileContentDecoder;
import org.neo4j.coreedge.catchup.storecopy.FileHeaderDecoder;
import org.neo4j.coreedge.catchup.storecopy.GetStoreIdRequest;
Expand Down Expand Up @@ -211,15 +211,15 @@ public void removeTxStreamCompleteListener( TxStreamCompleteListener listener )

protected ChannelInboundHandler decoders( CatchupClientProtocol protocol )
{
RequestDecoderDispatcher<NextMessage> decoderDispatcher =
RequestDecoderDispatcher<State> decoderDispatcher =
new RequestDecoderDispatcher<>( protocol, logProvider );
decoderDispatcher.register( NextMessage.STORE_ID, new GetStoreIdResponseDecoder() );
decoderDispatcher.register( NextMessage.TX_PULL_RESPONSE, new TxPullResponseDecoder() );
decoderDispatcher.register( NextMessage.CORE_SNAPSHOT, new CoreSnapshotDecoder() );
decoderDispatcher.register( NextMessage.STORE_COPY_FINISHED, new StoreCopyFinishedResponseDecoder() );
decoderDispatcher.register( NextMessage.TX_STREAM_FINISHED, new TxStreamFinishedResponseDecoder() );
decoderDispatcher.register( NextMessage.FILE_HEADER, new FileHeaderDecoder() );
decoderDispatcher.register( NextMessage.FILE_CONTENTS, new FileContentDecoder() );
decoderDispatcher.register( State.STORE_ID, new GetStoreIdResponseDecoder() );
decoderDispatcher.register( State.TX_PULL_RESPONSE, new TxPullResponseDecoder() );
decoderDispatcher.register( State.CORE_SNAPSHOT, new CoreSnapshotDecoder() );
decoderDispatcher.register( State.STORE_COPY_FINISHED, new StoreCopyFinishedResponseDecoder() );
decoderDispatcher.register( State.TX_STREAM_FINISHED, new TxStreamFinishedResponseDecoder() );
decoderDispatcher.register( State.FILE_HEADER, new FileHeaderDecoder() );
decoderDispatcher.register( State.FILE_CONTENTS, new FileContentDecoder() );
return decoderDispatcher;
}
}
Expand Up @@ -19,22 +19,35 @@
*/
package org.neo4j.coreedge.catchup;

import java.util.Map;

public abstract class Protocol<E extends Enum<E>>
{
private E next;
private E state;

protected Protocol( E initialValue )
{
this.state = initialValue;
}

public void expect( E state )
{
this.state = state;
}

Protocol( E initialValue )
public boolean isExpecting( E state )
{
this.next = initialValue;
return this.state == state;
}

public void expect( E next )
public <T> T select( Map<E,T> map )
{
this.next = next;
return map.get( state );
}

public E expecting()
@Override
public String toString()
{
return next;
return getClass().getSimpleName() + "{" + "state=" + state + '}';
}
}
Expand Up @@ -44,19 +44,18 @@ class RequestDecoderDispatcher<E extends Enum<E>> extends ChannelInboundHandlerA
@Override
public void channelRead( ChannelHandlerContext ctx, Object msg ) throws Exception
{
E expecting = protocol.expecting();
ChannelInboundHandler delegate = decoders.get( expecting );
ChannelInboundHandler delegate = protocol.select( decoders );
if ( delegate == null )
{
log.warn( "Unregistered handler for message type %s", expecting );
log.warn( "Unregistered handler for protocol %s", protocol );
return;
}
delegate.channelRead( ctx, msg );
}

public void register( E type, ChannelInboundHandler decoder )
{
assert !decoders.containsKey( type ) : "registering twice a decoder for the same type?";
assert !decoders.containsKey( type ) : "registering twice a decoder for the same type (" + type + ")?";
decoders.put( type, decoder );
}
}
Expand Up @@ -42,25 +42,25 @@ class ServerMessageTypeHandler extends ChannelInboundHandlerAdapter
@Override
public void channelRead( ChannelHandlerContext ctx, Object msg ) throws Exception
{
if ( CatchupServerProtocol.NextMessage.MESSAGE_TYPE.equals( protocol.expecting() ) )
if ( protocol.isExpecting( CatchupServerProtocol.State.MESSAGE_TYPE ) )
{
RequestMessageType requestMessageType = RequestMessageType.from( ((ByteBuf) msg).readByte() );

if ( requestMessageType.equals( RequestMessageType.TX_PULL_REQUEST ) )
{
protocol.expect( CatchupServerProtocol.NextMessage.TX_PULL );
protocol.expect( CatchupServerProtocol.State.TX_PULL );
}
else if ( requestMessageType.equals( RequestMessageType.STORE ) )
{
protocol.expect( CatchupServerProtocol.NextMessage.GET_STORE );
protocol.expect( CatchupServerProtocol.State.GET_STORE );
}
else if ( requestMessageType.equals( RequestMessageType.STORE_ID ) )
{
protocol.expect( CatchupServerProtocol.NextMessage.GET_STORE_ID );
protocol.expect( CatchupServerProtocol.State.GET_STORE_ID );
}
else if ( requestMessageType.equals( RequestMessageType.RAFT_STATE ) )
{
protocol.expect( CatchupServerProtocol.NextMessage.GET_RAFT_STATE );
protocol.expect( CatchupServerProtocol.State.GET_RAFT_STATE );
}
else
{
Expand Down
Expand Up @@ -57,7 +57,7 @@ protected void channelRead0( ChannelHandlerContext ctx, FileContent fileContent

if ( expectedBytes <= 0 )
{
protocol.expect( CatchupClientProtocol.NextMessage.MESSAGE_TYPE );
protocol.expect( CatchupClientProtocol.State.MESSAGE_TYPE );
}
}
}
Expand Up @@ -26,7 +26,7 @@
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;

import static org.neo4j.coreedge.catchup.CatchupClientProtocol.NextMessage;
import static org.neo4j.coreedge.catchup.CatchupClientProtocol.State;

public class FileHeaderHandler extends SimpleChannelInboundHandler<FileHeader>
{
Expand All @@ -44,6 +44,6 @@ protected void channelRead0( ChannelHandlerContext ctx, FileHeader msg ) throws
{
log.info( "Receiving file: %s (%d bytes)", msg.fileName(), msg.fileLength() );
ctx.pipeline().get( FileContentHandler.class ).setExpectedFile( msg );
protocol.expect( NextMessage.FILE_CONTENTS );
protocol.expect( State.FILE_CONTENTS );
}
}
Expand Up @@ -30,7 +30,7 @@
import org.neo4j.coreedge.messaging.marsalling.storeid.StoreIdMarshal;
import org.neo4j.coreedge.identity.StoreId;

import static org.neo4j.coreedge.catchup.CatchupServerProtocol.NextMessage;
import static org.neo4j.coreedge.catchup.CatchupServerProtocol.State;

public class GetStoreIdRequestHandler extends SimpleChannelInboundHandler<GetStoreIdRequest>
{
Expand All @@ -51,6 +51,6 @@ protected void channelRead0( ChannelHandlerContext ctx, GetStoreIdRequest msg )
NetworkFlushableByteBuf channel = new NetworkFlushableByteBuf( ctx.alloc().buffer() );
StoreIdMarshal.marshal( storeId, channel );
ctx.writeAndFlush( channel.buffer() );
protocol.expect( NextMessage.MESSAGE_TYPE );
protocol.expect( State.MESSAGE_TYPE );
}
}
Expand Up @@ -38,10 +38,10 @@ class GetStoreIdResponseHandler extends SimpleChannelInboundHandler<GetStoreIdRe
@Override
protected void channelRead0( ChannelHandlerContext ctx, final GetStoreIdResponse msg ) throws Exception
{
if ( CatchupClientProtocol.NextMessage.STORE_ID.equals( protocol.expecting() ) )
if ( protocol.isExpecting( CatchupClientProtocol.State.STORE_ID ) )
{
storeIdReceiver.onStoreIdReceived( msg.storeId() );
protocol.expect( CatchupClientProtocol.NextMessage.MESSAGE_TYPE );
protocol.expect( CatchupClientProtocol.State.MESSAGE_TYPE );
}
else
{
Expand Down
Expand Up @@ -36,7 +36,7 @@
import org.neo4j.kernel.impl.transaction.log.checkpoint.CheckPointer;
import org.neo4j.kernel.impl.transaction.log.checkpoint.SimpleTriggerInfo;

import static org.neo4j.coreedge.catchup.CatchupServerProtocol.NextMessage;
import static org.neo4j.coreedge.catchup.CatchupServerProtocol.State;

public class GetStoreRequestHandler extends SimpleChannelInboundHandler<GetStoreRequest>
{
Expand All @@ -60,7 +60,7 @@ protected void channelRead0( ChannelHandlerContext ctx, GetStoreRequest msg ) th
long lastCheckPointedTx = checkPointerSupplier.get().tryCheckPoint( new SimpleTriggerInfo( "Store copy" ) );
sendFiles( ctx );
endStoreCopy( ctx, lastCheckPointedTx );
protocol.expect( NextMessage.MESSAGE_TYPE );
protocol.expect( State.MESSAGE_TYPE );
}

private void sendFiles( ChannelHandlerContext ctx ) throws IOException
Expand Down
Expand Up @@ -24,7 +24,7 @@

import org.neo4j.coreedge.catchup.CatchupClientProtocol;

import static org.neo4j.coreedge.catchup.CatchupClientProtocol.NextMessage;
import static org.neo4j.coreedge.catchup.CatchupClientProtocol.State;

public class StoreCopyFinishedResponseHandler extends SimpleChannelInboundHandler<StoreCopyFinishedResponse>
{
Expand All @@ -42,6 +42,6 @@ public StoreCopyFinishedResponseHandler( CatchupClientProtocol protocol,
protected void channelRead0( ChannelHandlerContext ctx, final StoreCopyFinishedResponse msg ) throws Exception
{
storeFileStreamingCompleteListener.onFileStreamingComplete( msg.lastCommittedTxBeforeStoreCopy() );
protocol.expect( NextMessage.MESSAGE_TYPE );
protocol.expect( State.MESSAGE_TYPE );
}
}
Expand Up @@ -25,7 +25,7 @@
import java.util.function.Supplier;

import org.neo4j.coreedge.catchup.CatchupServerProtocol;
import org.neo4j.coreedge.catchup.CatchupServerProtocol.NextMessage;
import org.neo4j.coreedge.catchup.CatchupServerProtocol.State;
import org.neo4j.coreedge.catchup.ResponseMessageType;
import org.neo4j.coreedge.identity.StoreId;
import org.neo4j.cursor.IOCursor;
Expand Down Expand Up @@ -100,7 +100,7 @@ else if ( transactionIdStore.getLastCommittedTransactionId() > startTxId )
ctx.flush();

monitor.increment();
protocol.expect( NextMessage.MESSAGE_TYPE );
protocol.expect( State.MESSAGE_TYPE );
}

@Override
Expand Down
Expand Up @@ -39,10 +39,10 @@ public TxPullResponseHandler( CatchupClientProtocol protocol,
@Override
protected void channelRead0( ChannelHandlerContext ctx, final TxPullResponse msg ) throws Exception
{
if ( CatchupClientProtocol.NextMessage.TX_PULL_RESPONSE.equals( protocol.expecting() ) )
if ( protocol.isExpecting( CatchupClientProtocol.State.TX_PULL_RESPONSE ) )
{
listener.onTxReceived( msg );
protocol.expect( CatchupClientProtocol.NextMessage.MESSAGE_TYPE );
protocol.expect( CatchupClientProtocol.State.MESSAGE_TYPE );
}
else
{
Expand Down
Expand Up @@ -24,7 +24,7 @@

import org.neo4j.coreedge.catchup.CatchupClientProtocol;

import static org.neo4j.coreedge.catchup.CatchupClientProtocol.NextMessage;
import static org.neo4j.coreedge.catchup.CatchupClientProtocol.State;

public class TxStreamFinishedResponseHandler extends SimpleChannelInboundHandler<TxStreamFinishedResponse>
{
Expand All @@ -42,6 +42,6 @@ public TxStreamFinishedResponseHandler( CatchupClientProtocol protocol, TxStream
protected void channelRead0( ChannelHandlerContext ctx, TxStreamFinishedResponse msg ) throws Exception
{
listener.onTxStreamingComplete( msg.lastTransactionIdSent(), msg.isSuccess() );
protocol.expect( NextMessage.MESSAGE_TYPE );
protocol.expect( State.MESSAGE_TYPE );
}
}
Expand Up @@ -28,7 +28,7 @@
import org.neo4j.coreedge.catchup.ResponseMessageType;
import org.neo4j.coreedge.core.state.CoreState;

import static org.neo4j.coreedge.catchup.CatchupServerProtocol.NextMessage;
import static org.neo4j.coreedge.catchup.CatchupServerProtocol.State;

public class CoreSnapshotRequestHandler extends SimpleChannelInboundHandler<CoreSnapshotRequest>
{
Expand All @@ -45,7 +45,7 @@ public CoreSnapshotRequestHandler( CatchupServerProtocol protocol, CoreState cor
protected void channelRead0( ChannelHandlerContext ctx, CoreSnapshotRequest msg ) throws Exception
{
sendStates( ctx, coreState.snapshot() );
protocol.expect( NextMessage.MESSAGE_TYPE );
protocol.expect( State.MESSAGE_TYPE );
}

private void sendStates( ChannelHandlerContext ctx, CoreSnapshot coreSnapshot ) throws IOException
Expand Down
Expand Up @@ -38,10 +38,10 @@ public CoreSnapshotResponseHandler( CatchupClientProtocol protocol, CoreSnapshot
@Override
protected void channelRead0( ChannelHandlerContext ctx, final CoreSnapshot coreSnapshot ) throws Exception
{
if ( CatchupClientProtocol.NextMessage.CORE_SNAPSHOT.equals( protocol.expecting() ) )
if ( protocol.isExpecting( CatchupClientProtocol.State.CORE_SNAPSHOT ) )
{
listener.onSnapshotReceived( coreSnapshot );
protocol.expect( CatchupClientProtocol.NextMessage.MESSAGE_TYPE );
protocol.expect( CatchupClientProtocol.State.MESSAGE_TYPE );
}
else
{
Expand Down

0 comments on commit 2b67141

Please sign in to comment.