diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/VersionCheckerChannelInboundHandler.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/VersionCheckerChannelInboundHandler.java
new file mode 100644
index 0000000000000..a7c9bd67ed0fb
--- /dev/null
+++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/VersionCheckerChannelInboundHandler.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright (c) 2002-2016 "Neo Technology,"
+ * Network Engine for Objects in Lund AB [http://neotechnology.com]
+ *
+ * This file is part of Neo4j.
+ *
+ * Neo4j is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+package org.neo4j.coreedge;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+
+import java.util.function.Predicate;
+
+import org.neo4j.coreedge.messaging.Message;
+import org.neo4j.logging.Log;
+import org.neo4j.logging.LogProvider;
+
+public abstract class VersionCheckerChannelInboundHandler extends SimpleChannelInboundHandler
+{
+ private final Predicate versionChecker;
+ private final Log log;
+
+ protected VersionCheckerChannelInboundHandler( Predicate versionChecker, LogProvider logProvider )
+ {
+ this.versionChecker = versionChecker;
+ this.log = logProvider.getLog( getClass() );
+ }
+
+ @Override
+ protected final void channelRead0( ChannelHandlerContext ctx, M message ) throws Exception
+ {
+ if ( !versionChecker.test( message ) )
+ {
+ log.error( "Unsupported version %d, unable to process message %s", message.version(), message );
+ return;
+ }
+
+ doChannelRead0( ctx, message );
+ }
+
+ protected abstract void doChannelRead0( ChannelHandlerContext ctx, M msg ) throws Exception;
+}
diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/CatchupServer.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/CatchupServer.java
index 91eb57538440e..9e57294755c40 100644
--- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/CatchupServer.java
+++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/CatchupServer.java
@@ -33,6 +33,7 @@
import io.netty.handler.stream.ChunkedWriteHandler;
import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
import java.util.function.Supplier;
import org.neo4j.coreedge.catchup.CatchupServerProtocol.State;
@@ -52,6 +53,7 @@
import org.neo4j.coreedge.core.state.snapshot.CoreSnapshotRequestHandler;
import org.neo4j.coreedge.identity.StoreId;
import org.neo4j.coreedge.logging.ExceptionLoggingHandler;
+import org.neo4j.coreedge.messaging.Message;
import org.neo4j.coreedge.messaging.address.ListenSocketAddress;
import org.neo4j.helpers.NamedThreadFactory;
import org.neo4j.kernel.NeoStoreDataSource;
@@ -63,6 +65,8 @@
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;
+import static org.neo4j.coreedge.messaging.Message.CURRENT_VERSION;
+
public class CatchupServer extends LifecycleAdapter
{
private final LogProvider logProvider;
@@ -135,14 +139,16 @@ protected void initChannel( SocketChannel ch ) throws Exception
pipeline.addLast( decoders( protocol ) );
- pipeline.addLast( new TxPullRequestHandler( protocol, storeIdSupplier,
- transactionIdStoreSupplier, logicalTransactionStoreSupplier,
- monitors, logProvider ) );
+ Predicate versionChecker = (m) -> m.version() == CURRENT_VERSION;
+ pipeline.addLast( new TxPullRequestHandler( versionChecker, protocol, storeIdSupplier,
+ transactionIdStoreSupplier, logicalTransactionStoreSupplier, monitors, logProvider ) );
pipeline.addLast( new ChunkedWriteHandler() );
- pipeline.addLast( new GetStoreRequestHandler( protocol, dataSourceSupplier,
- checkPointerSupplier ) );
- pipeline.addLast( new GetStoreIdRequestHandler( protocol, storeIdSupplier ) );
- pipeline.addLast( new CoreSnapshotRequestHandler( protocol, coreState ) );
+ pipeline.addLast( new GetStoreRequestHandler( versionChecker, protocol, dataSourceSupplier,
+ checkPointerSupplier, logProvider ) );
+ pipeline.addLast( new GetStoreIdRequestHandler( versionChecker, protocol, storeIdSupplier,
+ logProvider ) );
+ pipeline.addLast(
+ new CoreSnapshotRequestHandler( versionChecker, protocol, coreState, logProvider ) );
pipeline.addLast( new ExceptionLoggingHandler( log ) );
}
} );
diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/ClientMessageTypeHandler.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/ClientMessageTypeHandler.java
index 090d9d61ad2cd..5ea1a3b2bf60c 100644
--- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/ClientMessageTypeHandler.java
+++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/ClientMessageTypeHandler.java
@@ -45,7 +45,10 @@ public void channelRead( ChannelHandlerContext ctx, Object msg ) throws Exceptio
{
if ( protocol.isExpecting( CatchupClientProtocol.State.MESSAGE_TYPE ) )
{
- ResponseMessageType responseMessageType = from( ((ByteBuf) msg).readByte() );
+ ByteBuf buffer = (ByteBuf) msg;
+ byte version = buffer.readByte();
+ byte messageType = buffer.readByte();
+ ResponseMessageType responseMessageType = from( version, messageType );
switch ( responseMessageType )
{
@@ -68,7 +71,7 @@ public void channelRead( ChannelHandlerContext ctx, Object msg ) throws Exceptio
protocol.expect( CatchupClientProtocol.State.TX_STREAM_FINISHED );
break;
default:
- log.warn( "No handler found for message type %s", responseMessageType );
+ log.warn( "No handler found for version %d and message type %s", version, responseMessageType );
}
ReferenceCountUtil.release( msg );
diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/CoreClient.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/CoreClient.java
index 26ce1ec2e98b7..4da083c8229e1 100644
--- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/CoreClient.java
+++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/CoreClient.java
@@ -62,6 +62,7 @@
import org.neo4j.logging.LogProvider;
import static java.util.Arrays.asList;
+import static org.neo4j.coreedge.messaging.Message.CURRENT_VERSION;
public abstract class CoreClient extends LifecycleAdapter implements StoreFileReceiver, StoreIdReceiver,
StoreFileStreamingCompleteListener, TxStreamCompleteListener, TxPullResponseListener, CoreSnapshotListener
@@ -91,27 +92,27 @@ public CoreClient( LogProvider logProvider, ChannelInitializer ch
public void requestStore( MemberId serverAddress )
{
- GetStoreRequest getStoreRequest = new GetStoreRequest();
+ GetStoreRequest getStoreRequest = new GetStoreRequest( CURRENT_VERSION );
send( serverAddress, RequestMessageType.STORE, getStoreRequest );
}
public void requestStoreId( MemberId serverAddress )
{
- GetStoreIdRequest getStoreIdRequest = new GetStoreIdRequest();
+ GetStoreIdRequest getStoreIdRequest = new GetStoreIdRequest( CURRENT_VERSION );
send( serverAddress, RequestMessageType.STORE_ID, getStoreIdRequest );
}
public CompletableFuture requestCoreSnapshot( MemberId serverAddress )
{
coreSnapshotFuture = new CompletableFuture<>();
- CoreSnapshotRequest coreSnapshotRequest = new CoreSnapshotRequest();
+ CoreSnapshotRequest coreSnapshotRequest = new CoreSnapshotRequest( CURRENT_VERSION );
send( serverAddress, RequestMessageType.RAFT_STATE, coreSnapshotRequest );
return coreSnapshotFuture;
}
public void pollForTransactions( MemberId serverAddress, StoreId storeId, long lastTransactionId )
{
- TxPullRequest txPullRequest = new TxPullRequest( lastTransactionId, storeId );
+ TxPullRequest txPullRequest = new TxPullRequest( CURRENT_VERSION, lastTransactionId, storeId );
send( serverAddress, RequestMessageType.TX_PULL_REQUEST, txPullRequest );
pullRequestMonitor.txPullRequest( lastTransactionId );
}
diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/CoreToCoreClient.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/CoreToCoreClient.java
index 6494ff15e5392..c7344c0986fe9 100644
--- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/CoreToCoreClient.java
+++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/CoreToCoreClient.java
@@ -26,6 +26,7 @@
import io.netty.handler.timeout.IdleStateHandler;
import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
import org.neo4j.coreedge.catchup.storecopy.FileContentHandler;
import org.neo4j.coreedge.catchup.storecopy.FileHeaderHandler;
@@ -39,10 +40,13 @@
import org.neo4j.coreedge.discovery.CoreTopologyService;
import org.neo4j.coreedge.logging.ExceptionLoggingHandler;
import org.neo4j.coreedge.messaging.IdleChannelReaperHandler;
+import org.neo4j.coreedge.messaging.Message;
import org.neo4j.coreedge.messaging.NonBlockingChannels;
import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.logging.LogProvider;
+import static org.neo4j.coreedge.messaging.Message.CURRENT_VERSION;
+
public class CoreToCoreClient extends CoreClient
{
public CoreToCoreClient( LogProvider logProvider, ChannelInitializer channelInitializer, Monitors monitors,
@@ -89,12 +93,13 @@ protected void initChannel( SocketChannel ch ) throws Exception
pipeline.addLast( owner.decoders( protocol ) );
- pipeline.addLast( new TxPullResponseHandler( protocol, owner ) );
- pipeline.addLast( new CoreSnapshotResponseHandler( protocol, owner ) );
- pipeline.addLast( new StoreCopyFinishedResponseHandler( protocol, owner ) );
- pipeline.addLast( new TxStreamFinishedResponseHandler( protocol, owner ) );
- pipeline.addLast( new FileHeaderHandler( protocol, logProvider ) );
- pipeline.addLast( new FileContentHandler( protocol, owner ) );
+ Predicate versionChecker = (m) -> m.version() == CURRENT_VERSION;
+ pipeline.addLast( new TxPullResponseHandler( versionChecker, protocol, owner, logProvider ) );
+ pipeline.addLast( new CoreSnapshotResponseHandler( versionChecker, protocol, owner, logProvider ) );
+ pipeline.addLast( new StoreCopyFinishedResponseHandler( versionChecker, protocol, owner, logProvider ) );
+ pipeline.addLast( new TxStreamFinishedResponseHandler( versionChecker, protocol, owner, logProvider ) );
+ pipeline.addLast( new FileHeaderHandler( versionChecker, protocol, logProvider ) );
+ pipeline.addLast( new FileContentHandler( versionChecker, protocol, owner, logProvider ) );
pipeline.addLast( new IdleStateHandler( 0, 0, 2, TimeUnit.MINUTES) );
pipeline.addLast( new IdleChannelReaperHandler(nonBlockingChannels));
diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/RequestMessageType.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/RequestMessageType.java
index 7bcb77b3f7a70..f9ef6aa7b27ed 100644
--- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/RequestMessageType.java
+++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/RequestMessageType.java
@@ -21,28 +21,33 @@
import org.neo4j.coreedge.messaging.Message;
-import static java.lang.String.format;
-
public enum RequestMessageType implements Message
{
- TX_PULL_REQUEST( (byte) 1 ),
- STORE( (byte) 2 ),
- RAFT_STATE( (byte) 3 ),
- STORE_ID( (byte) 4 ),
- UNKNOWN( (byte) 404 );
+ TX_PULL_REQUEST( CURRENT_VERSION, (byte) 1 ),
+ STORE( CURRENT_VERSION, (byte) 2 ),
+ RAFT_STATE( CURRENT_VERSION, (byte) 3 ),
+ STORE_ID( CURRENT_VERSION, (byte) 4 ),
+ UNKNOWN( CURRENT_VERSION, (byte) 404 );
+ private byte version;
private byte messageType;
- RequestMessageType( byte messageType )
+ RequestMessageType( byte version, byte messageType )
{
+ this.version = version;
this.messageType = messageType;
}
- public static RequestMessageType from( byte b )
+ public static RequestMessageType from( byte version, byte messageType )
{
+ if ( version != CURRENT_VERSION )
+ {
+ return UNKNOWN;
+ }
+
for ( RequestMessageType responseMessageType : values() )
{
- if ( responseMessageType.messageType == b )
+ if ( responseMessageType.messageType == messageType )
{
return responseMessageType;
}
@@ -50,6 +55,12 @@ public static RequestMessageType from( byte b )
return UNKNOWN;
}
+ @Override
+ public byte version()
+ {
+ return version;
+ }
+
public byte messageType()
{
return messageType;
@@ -58,6 +69,6 @@ public byte messageType()
@Override
public String toString()
{
- return format( "RequestMessageType{messageType=%s}", messageType );
+ return "RequestMessageType{" + "version=" + version + ", messageType=" + messageType + '}';
}
}
diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/RequestMessageTypeEncoder.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/RequestMessageTypeEncoder.java
index 236114f6a122f..253b6248bf3a7 100644
--- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/RequestMessageTypeEncoder.java
+++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/RequestMessageTypeEncoder.java
@@ -19,18 +19,19 @@
*/
package org.neo4j.coreedge.catchup;
-import java.util.List;
-
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageEncoder;
+import java.util.List;
+
public class RequestMessageTypeEncoder extends MessageToMessageEncoder
{
@Override
protected void encode( ChannelHandlerContext ctx, RequestMessageType request, List