Skip to content

Commit

Permalink
Add versioning to all core-edge messages
Browse files Browse the repository at this point in the history
  • Loading branch information
davidegrohmann committed Aug 16, 2016
1 parent 6f48bb9 commit 7d3cc58
Show file tree
Hide file tree
Showing 98 changed files with 1,019 additions and 507 deletions.
@@ -0,0 +1,55 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.coreedge;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

import java.util.function.Predicate;

import org.neo4j.coreedge.messaging.Message;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;

public abstract class VersionCheckerChannelInboundHandler<M extends Message> extends SimpleChannelInboundHandler<M>
{
private final Predicate<Message> versionChecker;
private final Log log;

protected VersionCheckerChannelInboundHandler( Predicate<Message> versionChecker, LogProvider logProvider )
{
this.versionChecker = versionChecker;
this.log = logProvider.getLog( getClass() );
}

@Override
protected final void channelRead0( ChannelHandlerContext ctx, M message ) throws Exception
{
if ( !versionChecker.test( message ) )
{
log.error( "Unsupported version %d, unable to process message %s", message.version(), message );
return;
}

doChannelRead0( ctx, message );
}

protected abstract void doChannelRead0( ChannelHandlerContext ctx, M msg ) throws Exception;
}
Expand Up @@ -33,6 +33,7 @@
import io.netty.handler.stream.ChunkedWriteHandler; import io.netty.handler.stream.ChunkedWriteHandler;


import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.function.Supplier; import java.util.function.Supplier;


import org.neo4j.coreedge.catchup.CatchupServerProtocol.State; import org.neo4j.coreedge.catchup.CatchupServerProtocol.State;
Expand All @@ -52,6 +53,7 @@
import org.neo4j.coreedge.core.state.snapshot.CoreSnapshotRequestHandler; import org.neo4j.coreedge.core.state.snapshot.CoreSnapshotRequestHandler;
import org.neo4j.coreedge.identity.StoreId; import org.neo4j.coreedge.identity.StoreId;
import org.neo4j.coreedge.logging.ExceptionLoggingHandler; import org.neo4j.coreedge.logging.ExceptionLoggingHandler;
import org.neo4j.coreedge.messaging.Message;
import org.neo4j.coreedge.messaging.address.ListenSocketAddress; import org.neo4j.coreedge.messaging.address.ListenSocketAddress;
import org.neo4j.helpers.NamedThreadFactory; import org.neo4j.helpers.NamedThreadFactory;
import org.neo4j.kernel.NeoStoreDataSource; import org.neo4j.kernel.NeoStoreDataSource;
Expand All @@ -63,6 +65,8 @@
import org.neo4j.logging.Log; import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider; import org.neo4j.logging.LogProvider;


import static org.neo4j.coreedge.messaging.Message.CURRENT_VERSION;

public class CatchupServer extends LifecycleAdapter public class CatchupServer extends LifecycleAdapter
{ {
private final LogProvider logProvider; private final LogProvider logProvider;
Expand Down Expand Up @@ -135,14 +139,16 @@ protected void initChannel( SocketChannel ch ) throws Exception


pipeline.addLast( decoders( protocol ) ); pipeline.addLast( decoders( protocol ) );


pipeline.addLast( new TxPullRequestHandler( protocol, storeIdSupplier, Predicate<Message> versionChecker = (m) -> m.version() == CURRENT_VERSION;
transactionIdStoreSupplier, logicalTransactionStoreSupplier, pipeline.addLast( new TxPullRequestHandler( versionChecker, protocol, storeIdSupplier,
monitors, logProvider ) ); transactionIdStoreSupplier, logicalTransactionStoreSupplier, monitors, logProvider ) );
pipeline.addLast( new ChunkedWriteHandler() ); pipeline.addLast( new ChunkedWriteHandler() );
pipeline.addLast( new GetStoreRequestHandler( protocol, dataSourceSupplier, pipeline.addLast( new GetStoreRequestHandler( versionChecker, protocol, dataSourceSupplier,
checkPointerSupplier ) ); checkPointerSupplier, logProvider ) );
pipeline.addLast( new GetStoreIdRequestHandler( protocol, storeIdSupplier ) ); pipeline.addLast( new GetStoreIdRequestHandler( versionChecker, protocol, storeIdSupplier,
pipeline.addLast( new CoreSnapshotRequestHandler( protocol, coreState ) ); logProvider ) );
pipeline.addLast(
new CoreSnapshotRequestHandler( versionChecker, protocol, coreState, logProvider ) );
pipeline.addLast( new ExceptionLoggingHandler( log ) ); pipeline.addLast( new ExceptionLoggingHandler( log ) );
} }
} ); } );
Expand Down
Expand Up @@ -45,7 +45,10 @@ public void channelRead( ChannelHandlerContext ctx, Object msg ) throws Exceptio
{ {
if ( protocol.isExpecting( CatchupClientProtocol.State.MESSAGE_TYPE ) ) if ( protocol.isExpecting( CatchupClientProtocol.State.MESSAGE_TYPE ) )
{ {
ResponseMessageType responseMessageType = from( ((ByteBuf) msg).readByte() ); ByteBuf buffer = (ByteBuf) msg;
byte version = buffer.readByte();
byte messageType = buffer.readByte();
ResponseMessageType responseMessageType = from( version, messageType );


switch ( responseMessageType ) switch ( responseMessageType )
{ {
Expand All @@ -68,7 +71,7 @@ public void channelRead( ChannelHandlerContext ctx, Object msg ) throws Exceptio
protocol.expect( CatchupClientProtocol.State.TX_STREAM_FINISHED ); protocol.expect( CatchupClientProtocol.State.TX_STREAM_FINISHED );
break; break;
default: default:
log.warn( "No handler found for message type %s", responseMessageType ); log.warn( "No handler found for version %d and message type %s", version, responseMessageType );
} }


ReferenceCountUtil.release( msg ); ReferenceCountUtil.release( msg );
Expand Down
Expand Up @@ -62,6 +62,7 @@
import org.neo4j.logging.LogProvider; import org.neo4j.logging.LogProvider;


import static java.util.Arrays.asList; import static java.util.Arrays.asList;
import static org.neo4j.coreedge.messaging.Message.CURRENT_VERSION;


public abstract class CoreClient extends LifecycleAdapter implements StoreFileReceiver, StoreIdReceiver, public abstract class CoreClient extends LifecycleAdapter implements StoreFileReceiver, StoreIdReceiver,
StoreFileStreamingCompleteListener, TxStreamCompleteListener, TxPullResponseListener, CoreSnapshotListener StoreFileStreamingCompleteListener, TxStreamCompleteListener, TxPullResponseListener, CoreSnapshotListener
Expand Down Expand Up @@ -91,27 +92,27 @@ public CoreClient( LogProvider logProvider, ChannelInitializer<SocketChannel> ch


public void requestStore( MemberId serverAddress ) public void requestStore( MemberId serverAddress )
{ {
GetStoreRequest getStoreRequest = new GetStoreRequest(); GetStoreRequest getStoreRequest = new GetStoreRequest( CURRENT_VERSION );
send( serverAddress, RequestMessageType.STORE, getStoreRequest ); send( serverAddress, RequestMessageType.STORE, getStoreRequest );
} }


public void requestStoreId( MemberId serverAddress ) public void requestStoreId( MemberId serverAddress )
{ {
GetStoreIdRequest getStoreIdRequest = new GetStoreIdRequest(); GetStoreIdRequest getStoreIdRequest = new GetStoreIdRequest( CURRENT_VERSION );
send( serverAddress, RequestMessageType.STORE_ID, getStoreIdRequest ); send( serverAddress, RequestMessageType.STORE_ID, getStoreIdRequest );
} }


public CompletableFuture<CoreSnapshot> requestCoreSnapshot( MemberId serverAddress ) public CompletableFuture<CoreSnapshot> requestCoreSnapshot( MemberId serverAddress )
{ {
coreSnapshotFuture = new CompletableFuture<>(); coreSnapshotFuture = new CompletableFuture<>();
CoreSnapshotRequest coreSnapshotRequest = new CoreSnapshotRequest(); CoreSnapshotRequest coreSnapshotRequest = new CoreSnapshotRequest( CURRENT_VERSION );
send( serverAddress, RequestMessageType.RAFT_STATE, coreSnapshotRequest ); send( serverAddress, RequestMessageType.RAFT_STATE, coreSnapshotRequest );
return coreSnapshotFuture; return coreSnapshotFuture;
} }


public void pollForTransactions( MemberId serverAddress, StoreId storeId, long lastTransactionId ) public void pollForTransactions( MemberId serverAddress, StoreId storeId, long lastTransactionId )
{ {
TxPullRequest txPullRequest = new TxPullRequest( lastTransactionId, storeId ); TxPullRequest txPullRequest = new TxPullRequest( CURRENT_VERSION, lastTransactionId, storeId );
send( serverAddress, RequestMessageType.TX_PULL_REQUEST, txPullRequest ); send( serverAddress, RequestMessageType.TX_PULL_REQUEST, txPullRequest );
pullRequestMonitor.txPullRequest( lastTransactionId ); pullRequestMonitor.txPullRequest( lastTransactionId );
} }
Expand Down
Expand Up @@ -26,6 +26,7 @@
import io.netty.handler.timeout.IdleStateHandler; import io.netty.handler.timeout.IdleStateHandler;


import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;


import org.neo4j.coreedge.catchup.storecopy.FileContentHandler; import org.neo4j.coreedge.catchup.storecopy.FileContentHandler;
import org.neo4j.coreedge.catchup.storecopy.FileHeaderHandler; import org.neo4j.coreedge.catchup.storecopy.FileHeaderHandler;
Expand All @@ -39,10 +40,13 @@
import org.neo4j.coreedge.discovery.CoreTopologyService; import org.neo4j.coreedge.discovery.CoreTopologyService;
import org.neo4j.coreedge.logging.ExceptionLoggingHandler; import org.neo4j.coreedge.logging.ExceptionLoggingHandler;
import org.neo4j.coreedge.messaging.IdleChannelReaperHandler; import org.neo4j.coreedge.messaging.IdleChannelReaperHandler;
import org.neo4j.coreedge.messaging.Message;
import org.neo4j.coreedge.messaging.NonBlockingChannels; import org.neo4j.coreedge.messaging.NonBlockingChannels;
import org.neo4j.kernel.monitoring.Monitors; import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.logging.LogProvider; import org.neo4j.logging.LogProvider;


import static org.neo4j.coreedge.messaging.Message.CURRENT_VERSION;

public class CoreToCoreClient extends CoreClient public class CoreToCoreClient extends CoreClient
{ {
public CoreToCoreClient( LogProvider logProvider, ChannelInitializer channelInitializer, Monitors monitors, public CoreToCoreClient( LogProvider logProvider, ChannelInitializer channelInitializer, Monitors monitors,
Expand Down Expand Up @@ -89,12 +93,13 @@ protected void initChannel( SocketChannel ch ) throws Exception


pipeline.addLast( owner.decoders( protocol ) ); pipeline.addLast( owner.decoders( protocol ) );


pipeline.addLast( new TxPullResponseHandler( protocol, owner ) ); Predicate<Message> versionChecker = (m) -> m.version() == CURRENT_VERSION;
pipeline.addLast( new CoreSnapshotResponseHandler( protocol, owner ) ); pipeline.addLast( new TxPullResponseHandler( versionChecker, protocol, owner, logProvider ) );
pipeline.addLast( new StoreCopyFinishedResponseHandler( protocol, owner ) ); pipeline.addLast( new CoreSnapshotResponseHandler( versionChecker, protocol, owner, logProvider ) );
pipeline.addLast( new TxStreamFinishedResponseHandler( protocol, owner ) ); pipeline.addLast( new StoreCopyFinishedResponseHandler( versionChecker, protocol, owner, logProvider ) );
pipeline.addLast( new FileHeaderHandler( protocol, logProvider ) ); pipeline.addLast( new TxStreamFinishedResponseHandler( versionChecker, protocol, owner, logProvider ) );
pipeline.addLast( new FileContentHandler( protocol, owner ) ); pipeline.addLast( new FileHeaderHandler( versionChecker, protocol, logProvider ) );
pipeline.addLast( new FileContentHandler( versionChecker, protocol, owner, logProvider ) );


pipeline.addLast( new IdleStateHandler( 0, 0, 2, TimeUnit.MINUTES) ); pipeline.addLast( new IdleStateHandler( 0, 0, 2, TimeUnit.MINUTES) );
pipeline.addLast( new IdleChannelReaperHandler(nonBlockingChannels)); pipeline.addLast( new IdleChannelReaperHandler(nonBlockingChannels));
Expand Down
Expand Up @@ -21,35 +21,46 @@


import org.neo4j.coreedge.messaging.Message; import org.neo4j.coreedge.messaging.Message;


import static java.lang.String.format;

public enum RequestMessageType implements Message public enum RequestMessageType implements Message
{ {
TX_PULL_REQUEST( (byte) 1 ), TX_PULL_REQUEST( CURRENT_VERSION, (byte) 1 ),
STORE( (byte) 2 ), STORE( CURRENT_VERSION, (byte) 2 ),
RAFT_STATE( (byte) 3 ), RAFT_STATE( CURRENT_VERSION, (byte) 3 ),
STORE_ID( (byte) 4 ), STORE_ID( CURRENT_VERSION, (byte) 4 ),
UNKNOWN( (byte) 404 ); UNKNOWN( CURRENT_VERSION, (byte) 404 );


private byte version;
private byte messageType; private byte messageType;


RequestMessageType( byte messageType ) RequestMessageType( byte version, byte messageType )
{ {
this.version = version;
this.messageType = messageType; this.messageType = messageType;
} }


public static RequestMessageType from( byte b ) public static RequestMessageType from( byte version, byte messageType )
{ {
if ( version != CURRENT_VERSION )
{
return UNKNOWN;
}

for ( RequestMessageType responseMessageType : values() ) for ( RequestMessageType responseMessageType : values() )
{ {
if ( responseMessageType.messageType == b ) if ( responseMessageType.messageType == messageType )
{ {
return responseMessageType; return responseMessageType;
} }
} }
return UNKNOWN; return UNKNOWN;
} }


@Override
public byte version()
{
return version;
}

public byte messageType() public byte messageType()
{ {
return messageType; return messageType;
Expand All @@ -58,6 +69,6 @@ public byte messageType()
@Override @Override
public String toString() public String toString()
{ {
return format( "RequestMessageType{messageType=%s}", messageType ); return "RequestMessageType{" + "version=" + version + ", messageType=" + messageType + '}';
} }
} }
Expand Up @@ -19,18 +19,19 @@
*/ */
package org.neo4j.coreedge.catchup; package org.neo4j.coreedge.catchup;


import java.util.List;

import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageEncoder; import io.netty.handler.codec.MessageToMessageEncoder;


import java.util.List;

public class RequestMessageTypeEncoder extends MessageToMessageEncoder<RequestMessageType> public class RequestMessageTypeEncoder extends MessageToMessageEncoder<RequestMessageType>
{ {
@Override @Override
protected void encode( ChannelHandlerContext ctx, RequestMessageType request, List<Object> out ) throws Exception protected void encode( ChannelHandlerContext ctx, RequestMessageType request, List<Object> out ) throws Exception
{ {
ByteBuf encoded = ctx.alloc().buffer(); ByteBuf encoded = ctx.alloc().buffer();
encoded.writeByte( request.version() );
encoded.writeByte( request.messageType() ); encoded.writeByte( request.messageType() );
out.add( encoded ); out.add( encoded );
} }
Expand Down
Expand Up @@ -19,37 +19,50 @@
*/ */
package org.neo4j.coreedge.catchup; package org.neo4j.coreedge.catchup;


import static java.lang.String.format; import org.neo4j.coreedge.messaging.Message;


public enum ResponseMessageType public enum ResponseMessageType implements Message
{ {
TX( (byte) 1 ), TX( CURRENT_VERSION, (byte) 1 ),
STORE_ID( (byte) 2 ), STORE_ID( CURRENT_VERSION, (byte) 2 ),
FILE( (byte) 3 ), FILE( CURRENT_VERSION, (byte) 3 ),
STORE_COPY_FINISHED( (byte) 4 ), STORE_COPY_FINISHED( CURRENT_VERSION, (byte) 4 ),
CORE_SNAPSHOT( (byte) 5 ), CORE_SNAPSHOT( CURRENT_VERSION, (byte) 5 ),
TX_STREAM_FINISHED( (byte) 6 ), TX_STREAM_FINISHED( CURRENT_VERSION, (byte) 6 ),
UNKNOWN( (byte) 200 ),; UNKNOWN( CURRENT_VERSION, (byte) 200 ),;


private byte version;
private byte messageType; private byte messageType;


ResponseMessageType( byte messageType ) ResponseMessageType( byte version, byte messageType )
{ {
this.version = version;
this.messageType = messageType; this.messageType = messageType;
} }


public static ResponseMessageType from( byte b ) public static ResponseMessageType from( byte version, byte messageType )
{ {
if ( version != CURRENT_VERSION )
{
return UNKNOWN;
}

for ( ResponseMessageType responseMessageType : values() ) for ( ResponseMessageType responseMessageType : values() )
{ {
if ( responseMessageType.messageType == b ) if ( responseMessageType.messageType == messageType )
{ {
return responseMessageType; return responseMessageType;
} }
} }
return UNKNOWN; return UNKNOWN;
} }


@Override
public byte version()
{
return version;
}

public byte messageType() public byte messageType()
{ {
return messageType; return messageType;
Expand All @@ -58,6 +71,6 @@ public byte messageType()
@Override @Override
public String toString() public String toString()
{ {
return format( "ResponseMessageType{messageType=%s}", messageType ); return "ResponseMessageType{" + "version=" + version + ", messageType=" + messageType + '}';
} }
} }
Expand Up @@ -31,6 +31,7 @@ public class ResponseMessageTypeEncoder extends MessageToMessageEncoder<Response
protected void encode( ChannelHandlerContext ctx, ResponseMessageType response, List<Object> out ) throws Exception protected void encode( ChannelHandlerContext ctx, ResponseMessageType response, List<Object> out ) throws Exception
{ {
ByteBuf encoded = ctx.alloc().buffer(); ByteBuf encoded = ctx.alloc().buffer();
encoded.writeByte( response.version() );
encoded.writeByte( response.messageType() ); encoded.writeByte( response.messageType() );
out.add( encoded ); out.add( encoded );
} }
Expand Down
Expand Up @@ -44,7 +44,10 @@ public void channelRead( ChannelHandlerContext ctx, Object msg ) throws Exceptio
{ {
if ( protocol.isExpecting( CatchupServerProtocol.State.MESSAGE_TYPE ) ) if ( protocol.isExpecting( CatchupServerProtocol.State.MESSAGE_TYPE ) )
{ {
RequestMessageType requestMessageType = RequestMessageType.from( ((ByteBuf) msg).readByte() ); ByteBuf buffer = (ByteBuf) msg;
byte version = buffer.readByte();
byte messageType = buffer.readByte();
RequestMessageType requestMessageType = RequestMessageType.from( version, messageType );


if ( requestMessageType.equals( RequestMessageType.TX_PULL_REQUEST ) ) if ( requestMessageType.equals( RequestMessageType.TX_PULL_REQUEST ) )
{ {
Expand All @@ -64,7 +67,7 @@ else if ( requestMessageType.equals( RequestMessageType.RAFT_STATE ) )
} }
else else
{ {
log.warn( "No handler found for message type %s", requestMessageType ); log.warn( "No handler found for version %d and message type %s", version, requestMessageType );
} }


ReferenceCountUtil.release( msg ); ReferenceCountUtil.release( msg );
Expand Down

0 comments on commit 7d3cc58

Please sign in to comment.