diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/CatchupClientProtocol.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/CatchupClientProtocol.java index 661ca0fb9b263..5ce89281c169d 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/CatchupClientProtocol.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/CatchupClientProtocol.java @@ -19,18 +19,20 @@ */ package org.neo4j.coreedge.catchup; -public class CatchupClientProtocol +public class CatchupClientProtocol implements Protocol { private NextMessage nextMessage = NextMessage.MESSAGE_TYPE; + @Override public void expect( NextMessage nextMessage ) { this.nextMessage = nextMessage; } - public boolean isExpecting( NextMessage message ) + @Override + public NextMessage expecting() { - return this.nextMessage == message; + return nextMessage; } public enum NextMessage diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/CatchupServer.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/CatchupServer.java index f9e157c295fef..89a1735e2ec99 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/CatchupServer.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/CatchupServer.java @@ -152,7 +152,8 @@ protected void initChannel( SocketChannel ch ) throws Exception private ChannelInboundHandler decoders( CatchupServerProtocol protocol ) { - RequestDecoderDispatcher decoderDispatcher = new RequestDecoderDispatcher( protocol, logProvider ); + RequestDecoderDispatcher decoderDispatcher = + new RequestDecoderDispatcher<>( protocol, logProvider ); decoderDispatcher.register( NextMessage.TX_PULL, new TxPullRequestDecoder() ); decoderDispatcher.register( NextMessage.GET_STORE, new SimpleRequestDecoder( GetStoreRequest::new ) ); decoderDispatcher.register( NextMessage.GET_STORE_ID, new SimpleRequestDecoder( GetStoreIdRequest::new ) ); diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/CatchupServerProtocol.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/CatchupServerProtocol.java index 590c26cc07a95..526d3a761d503 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/CatchupServerProtocol.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/CatchupServerProtocol.java @@ -19,16 +19,18 @@ */ package org.neo4j.coreedge.catchup; -public class CatchupServerProtocol +public class CatchupServerProtocol implements Protocol { private NextMessage nextMessage = NextMessage.MESSAGE_TYPE; + @Override public void expect( NextMessage nextMessage ) { this.nextMessage = nextMessage; } - NextMessage expecting() + @Override + public NextMessage expecting() { return nextMessage; } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/ClientMessageTypeHandler.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/ClientMessageTypeHandler.java index f148c4b5f0216..7244cbdce4a8d 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/ClientMessageTypeHandler.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/ClientMessageTypeHandler.java @@ -27,7 +27,6 @@ import org.neo4j.logging.Log; import org.neo4j.logging.LogProvider; -import static org.neo4j.coreedge.catchup.CatchupClientProtocol.NextMessage; import static org.neo4j.coreedge.catchup.ResponseMessageType.from; public class ClientMessageTypeHandler extends ChannelInboundHandlerAdapter @@ -44,29 +43,29 @@ public ClientMessageTypeHandler( CatchupClientProtocol protocol, LogProvider log @Override public void channelRead( ChannelHandlerContext ctx, Object msg ) throws Exception { - if ( protocol.isExpecting( NextMessage.MESSAGE_TYPE ) ) + if ( CatchupClientProtocol.NextMessage.MESSAGE_TYPE.equals( protocol.expecting() ) ) { ResponseMessageType responseMessageType = from( ((ByteBuf) msg).readByte() ); switch ( responseMessageType ) { case STORE_ID: - protocol.expect( NextMessage.STORE_ID ); + protocol.expect( CatchupClientProtocol.NextMessage.STORE_ID ); break; case TX: - protocol.expect( NextMessage.TX_PULL_RESPONSE ); + protocol.expect( CatchupClientProtocol.NextMessage.TX_PULL_RESPONSE ); break; case FILE: - protocol.expect( NextMessage.FILE_HEADER ); + protocol.expect( CatchupClientProtocol.NextMessage.FILE_HEADER ); break; case STORE_COPY_FINISHED: - protocol.expect( NextMessage.STORE_COPY_FINISHED ); + protocol.expect( CatchupClientProtocol.NextMessage.STORE_COPY_FINISHED ); break; case CORE_SNAPSHOT: - protocol.expect( NextMessage.CORE_SNAPSHOT ); + protocol.expect( CatchupClientProtocol.NextMessage.CORE_SNAPSHOT ); break; case TX_STREAM_FINISHED: - protocol.expect( NextMessage.TX_STREAM_FINISHED ); + protocol.expect( CatchupClientProtocol.NextMessage.TX_STREAM_FINISHED ); break; default: log.warn( "No handler found for message type %s", responseMessageType ); diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/CoreClient.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/CoreClient.java index 251aedcff77b1..882fd894b1c02 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/CoreClient.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/CoreClient.java @@ -19,14 +19,20 @@ */ package org.neo4j.coreedge.catchup; -import java.util.concurrent.CompletableFuture; -import java.util.function.Consumer; - +import io.netty.channel.ChannelInboundHandler; import io.netty.channel.ChannelInitializer; import io.netty.channel.socket.SocketChannel; +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; + +import org.neo4j.coreedge.catchup.CatchupClientProtocol.NextMessage; +import org.neo4j.coreedge.catchup.storecopy.FileContentDecoder; +import org.neo4j.coreedge.catchup.storecopy.FileHeaderDecoder; import org.neo4j.coreedge.catchup.storecopy.GetStoreIdRequest; +import org.neo4j.coreedge.catchup.storecopy.GetStoreIdResponseDecoder; import org.neo4j.coreedge.catchup.storecopy.GetStoreRequest; +import org.neo4j.coreedge.catchup.storecopy.StoreCopyFinishedResponseDecoder; import org.neo4j.coreedge.catchup.storecopy.StoreFileReceiver; import org.neo4j.coreedge.catchup.storecopy.StoreFileStreamingCompleteListener; import org.neo4j.coreedge.catchup.storecopy.StoreFileStreams; @@ -34,9 +40,12 @@ import org.neo4j.coreedge.catchup.tx.PullRequestMonitor; import org.neo4j.coreedge.catchup.tx.TxPullRequest; import org.neo4j.coreedge.catchup.tx.TxPullResponse; +import org.neo4j.coreedge.catchup.tx.TxPullResponseDecoder; import org.neo4j.coreedge.catchup.tx.TxPullResponseListener; import org.neo4j.coreedge.catchup.tx.TxStreamCompleteListener; +import org.neo4j.coreedge.catchup.tx.TxStreamFinishedResponseDecoder; import org.neo4j.coreedge.core.state.snapshot.CoreSnapshot; +import org.neo4j.coreedge.core.state.snapshot.CoreSnapshotDecoder; import org.neo4j.coreedge.core.state.snapshot.CoreSnapshotListener; import org.neo4j.coreedge.core.state.snapshot.CoreSnapshotRequest; import org.neo4j.coreedge.discovery.TopologyService; @@ -55,26 +64,26 @@ import static java.util.Arrays.asList; public abstract class CoreClient extends LifecycleAdapter implements StoreFileReceiver, StoreIdReceiver, - StoreFileStreamingCompleteListener, - TxStreamCompleteListener, TxPullResponseListener, - CoreSnapshotListener + StoreFileStreamingCompleteListener, TxStreamCompleteListener, TxPullResponseListener, CoreSnapshotListener { - private final PullRequestMonitor pullRequestMonitor; + private final LogProvider logProvider; private final SenderService senderService; - private StoreFileStreams storeFileStreams; - private Consumer storeIdConsumer; + private final Outbound outbound; + private final PullRequestMonitor pullRequestMonitor; + private final Listeners storeFileStreamingCompleteListeners = new Listeners<>(); private final Listeners txStreamCompleteListeners = new Listeners<>(); private final Listeners txPullResponseListeners = new Listeners<>(); - private CompletableFuture coreSnapshotFuture; - private Outbound outbound; + private StoreFileStreams storeFileStreams; + private Consumer storeIdConsumer; + private CompletableFuture coreSnapshotFuture; public CoreClient( LogProvider logProvider, ChannelInitializer channelInitializer, Monitors monitors, - int maxQueueSize, NonBlockingChannels nonBlockingChannels, TopologyService discoveryService, - long logThresholdMillis ) + int maxQueueSize, NonBlockingChannels nonBlockingChannels, TopologyService discoveryService, long logThresholdMillis ) { - senderService = + this.logProvider = logProvider; + this.senderService = new SenderService( channelInitializer, logProvider, monitors, maxQueueSize, nonBlockingChannels ); this.outbound = new CoreOutbound( discoveryService, senderService, logProvider, logThresholdMillis ); this.pullRequestMonitor = monitors.newMonitor( PullRequestMonitor.class ); @@ -163,8 +172,7 @@ public void setStoreIdConsumer( Consumer storeIdConsumer ) @Override public void onFileStreamingComplete( long lastCommittedTxBeforeStoreCopy ) { - storeFileStreamingCompleteListeners.notify( - listener -> listener.onFileStreamingComplete( lastCommittedTxBeforeStoreCopy ) ); + storeFileStreamingCompleteListeners.notify( listener -> listener.onFileStreamingComplete( lastCommittedTxBeforeStoreCopy ) ); } @Override @@ -200,4 +208,18 @@ public void removeTxStreamCompleteListener( TxStreamCompleteListener listener ) { txStreamCompleteListeners.remove( listener ); } + + protected ChannelInboundHandler decoders( CatchupClientProtocol protocol ) + { + RequestDecoderDispatcher decoderDispatcher = + new RequestDecoderDispatcher<>( protocol, logProvider ); + decoderDispatcher.register( NextMessage.STORE_ID, new GetStoreIdResponseDecoder() ); + decoderDispatcher.register( NextMessage.TX_PULL_RESPONSE, new TxPullResponseDecoder() ); + decoderDispatcher.register( NextMessage.CORE_SNAPSHOT, new CoreSnapshotDecoder() ); + decoderDispatcher.register( NextMessage.STORE_COPY_FINISHED, new StoreCopyFinishedResponseDecoder() ); + decoderDispatcher.register( NextMessage.TX_STREAM_FINISHED, new TxStreamFinishedResponseDecoder() ); + decoderDispatcher.register( NextMessage.FILE_HEADER, new FileHeaderDecoder() ); + decoderDispatcher.register( NextMessage.FILE_CONTENTS, new FileContentDecoder() ); + return decoderDispatcher; + } } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/CoreToCoreClient.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/CoreToCoreClient.java index 43f8c1acbd33f..6494ff15e5392 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/CoreToCoreClient.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/CoreToCoreClient.java @@ -28,23 +28,18 @@ import java.util.concurrent.TimeUnit; import org.neo4j.coreedge.catchup.storecopy.FileContentHandler; -import org.neo4j.coreedge.catchup.storecopy.FileHeaderDecoder; import org.neo4j.coreedge.catchup.storecopy.FileHeaderHandler; import org.neo4j.coreedge.catchup.storecopy.GetStoreRequestEncoder; -import org.neo4j.coreedge.catchup.storecopy.StoreCopyFinishedResponseDecoder; import org.neo4j.coreedge.catchup.storecopy.StoreCopyFinishedResponseHandler; import org.neo4j.coreedge.catchup.tx.TxPullRequestEncoder; -import org.neo4j.coreedge.catchup.tx.TxPullResponseDecoder; import org.neo4j.coreedge.catchup.tx.TxPullResponseHandler; -import org.neo4j.coreedge.catchup.tx.TxStreamFinishedResponseDecoder; import org.neo4j.coreedge.catchup.tx.TxStreamFinishedResponseHandler; -import org.neo4j.coreedge.core.state.snapshot.CoreSnapshotDecoder; import org.neo4j.coreedge.core.state.snapshot.CoreSnapshotRequestEncoder; import org.neo4j.coreedge.core.state.snapshot.CoreSnapshotResponseHandler; import org.neo4j.coreedge.discovery.CoreTopologyService; +import org.neo4j.coreedge.logging.ExceptionLoggingHandler; import org.neo4j.coreedge.messaging.IdleChannelReaperHandler; import org.neo4j.coreedge.messaging.NonBlockingChannels; -import org.neo4j.coreedge.logging.ExceptionLoggingHandler; import org.neo4j.kernel.monitoring.Monitors; import org.neo4j.logging.LogProvider; @@ -92,18 +87,12 @@ protected void initChannel( SocketChannel ch ) throws Exception pipeline.addLast( new ClientMessageTypeHandler( protocol, logProvider ) ); - pipeline.addLast( new TxPullResponseDecoder( protocol ) ); - pipeline.addLast( new CoreSnapshotDecoder( protocol ) ); - pipeline.addLast( new StoreCopyFinishedResponseDecoder( protocol ) ); - pipeline.addLast( new TxStreamFinishedResponseDecoder( protocol ) ); - pipeline.addLast( new FileHeaderDecoder( protocol ) ); + pipeline.addLast( owner.decoders( protocol ) ); pipeline.addLast( new TxPullResponseHandler( protocol, owner ) ); pipeline.addLast( new CoreSnapshotResponseHandler( protocol, owner ) ); pipeline.addLast( new StoreCopyFinishedResponseHandler( protocol, owner ) ); pipeline.addLast( new TxStreamFinishedResponseHandler( protocol, owner ) ); - - // keep these after type-specific handlers since they process ByteBufs pipeline.addLast( new FileHeaderHandler( protocol, logProvider ) ); pipeline.addLast( new FileContentHandler( protocol, owner ) ); diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/Protocol.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/Protocol.java new file mode 100644 index 0000000000000..6aa1e2976e5d3 --- /dev/null +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/Protocol.java @@ -0,0 +1,27 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.coreedge.catchup; + +public interface Protocol> +{ + void expect( E next ); + + E expecting(); +} diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/RequestDecoderDispatcher.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/RequestDecoderDispatcher.java index 70a64bf910803..d5b8863b69132 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/RequestDecoderDispatcher.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/RequestDecoderDispatcher.java @@ -26,17 +26,16 @@ import java.util.HashMap; import java.util.Map; -import org.neo4j.coreedge.catchup.CatchupServerProtocol.NextMessage; import org.neo4j.logging.Log; import org.neo4j.logging.LogProvider; -public class RequestDecoderDispatcher extends ChannelInboundHandlerAdapter +class RequestDecoderDispatcher> extends ChannelInboundHandlerAdapter { - private final Map decoders = new HashMap<>(); - private final CatchupServerProtocol protocol; + private final Map decoders = new HashMap<>(); + private final Protocol protocol; private final Log log; - public RequestDecoderDispatcher( CatchupServerProtocol protocol, LogProvider logProvider ) + RequestDecoderDispatcher( Protocol protocol, LogProvider logProvider ) { this.protocol = protocol; this.log = logProvider.getLog( getClass() ); @@ -45,18 +44,17 @@ public RequestDecoderDispatcher( CatchupServerProtocol protocol, LogProvider log @Override public void channelRead( ChannelHandlerContext ctx, Object msg ) throws Exception { - NextMessage expecting = protocol.expecting(); + E expecting = protocol.expecting(); ChannelInboundHandler delegate = decoders.get( expecting ); if ( delegate == null ) { log.warn( "Unknown message %s", expecting ); return; } - delegate.channelRead( ctx, msg ); } - public void register( NextMessage type, ChannelInboundHandler decoder ) + public void register( E type, ChannelInboundHandler decoder ) { assert !decoders.containsKey( type ) : "registering twice a decoder for the same type?"; decoders.put( type, decoder ); diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/ServerMessageTypeHandler.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/ServerMessageTypeHandler.java index eaf42a3b6e46b..ad74af870a8c5 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/ServerMessageTypeHandler.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/ServerMessageTypeHandler.java @@ -27,7 +27,6 @@ import org.neo4j.logging.Log; import org.neo4j.logging.LogProvider; -import static org.neo4j.coreedge.catchup.CatchupServerProtocol.NextMessage; class ServerMessageTypeHandler extends ChannelInboundHandlerAdapter { @@ -43,25 +42,25 @@ class ServerMessageTypeHandler extends ChannelInboundHandlerAdapter @Override public void channelRead( ChannelHandlerContext ctx, Object msg ) throws Exception { - if ( protocol.expecting().equals( NextMessage.MESSAGE_TYPE ) ) + if ( CatchupServerProtocol.NextMessage.MESSAGE_TYPE.equals( protocol.expecting() ) ) { RequestMessageType requestMessageType = RequestMessageType.from( ((ByteBuf) msg).readByte() ); if ( requestMessageType.equals( RequestMessageType.TX_PULL_REQUEST ) ) { - protocol.expect( NextMessage.TX_PULL ); + protocol.expect( CatchupServerProtocol.NextMessage.TX_PULL ); } else if ( requestMessageType.equals( RequestMessageType.STORE ) ) { - protocol.expect( NextMessage.GET_STORE ); + protocol.expect( CatchupServerProtocol.NextMessage.GET_STORE ); } else if ( requestMessageType.equals( RequestMessageType.STORE_ID ) ) { - protocol.expect( NextMessage.GET_STORE_ID ); + protocol.expect( CatchupServerProtocol.NextMessage.GET_STORE_ID ); } else if ( requestMessageType.equals( RequestMessageType.RAFT_STATE ) ) { - protocol.expect( NextMessage.GET_RAFT_STATE ); + protocol.expect( CatchupServerProtocol.NextMessage.GET_RAFT_STATE ); } else { diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/storecopy/EdgeToCoreClient.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/storecopy/EdgeToCoreClient.java index 694c7818589f0..395bbcb91953e 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/storecopy/EdgeToCoreClient.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/storecopy/EdgeToCoreClient.java @@ -29,18 +29,16 @@ import org.neo4j.coreedge.catchup.CatchupClientProtocol; import org.neo4j.coreedge.catchup.ClientMessageTypeHandler; +import org.neo4j.coreedge.catchup.CoreClient; import org.neo4j.coreedge.catchup.RequestMessageTypeEncoder; import org.neo4j.coreedge.catchup.ResponseMessageTypeEncoder; -import org.neo4j.coreedge.catchup.CoreClient; import org.neo4j.coreedge.catchup.tx.TxPullRequestEncoder; -import org.neo4j.coreedge.catchup.tx.TxPullResponseDecoder; import org.neo4j.coreedge.catchup.tx.TxPullResponseHandler; -import org.neo4j.coreedge.catchup.tx.TxStreamFinishedResponseDecoder; import org.neo4j.coreedge.catchup.tx.TxStreamFinishedResponseHandler; import org.neo4j.coreedge.discovery.TopologyService; +import org.neo4j.coreedge.logging.ExceptionLoggingHandler; import org.neo4j.coreedge.messaging.IdleChannelReaperHandler; import org.neo4j.coreedge.messaging.NonBlockingChannels; -import org.neo4j.coreedge.logging.ExceptionLoggingHandler; import org.neo4j.kernel.monitoring.Monitors; import org.neo4j.logging.LogProvider; @@ -88,18 +86,12 @@ protected void initChannel( SocketChannel ch ) throws Exception pipeline.addLast( new ClientMessageTypeHandler( protocol, logProvider ) ); - pipeline.addLast( new GetStoreIdResponseDecoder( protocol ) ); - pipeline.addLast( new TxPullResponseDecoder( protocol ) ); - pipeline.addLast( new StoreCopyFinishedResponseDecoder( protocol ) ); - pipeline.addLast( new TxStreamFinishedResponseDecoder( protocol ) ); - pipeline.addLast( new FileHeaderDecoder( protocol ) ); + pipeline.addLast( owner.decoders( protocol ) ); pipeline.addLast( new GetStoreIdResponseHandler( protocol, owner ) ); pipeline.addLast( new TxPullResponseHandler( protocol, owner ) ); pipeline.addLast( new StoreCopyFinishedResponseHandler( protocol, owner ) ); pipeline.addLast( new TxStreamFinishedResponseHandler( protocol, owner ) ); - - // keep these after type-specific handlers since they process ByteBufs pipeline.addLast( new FileHeaderHandler( protocol, logProvider ) ); pipeline.addLast( new FileContentHandler( protocol, owner ) ); diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/storecopy/FileContent.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/storecopy/FileContent.java new file mode 100644 index 0000000000000..41bc035ab7a72 --- /dev/null +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/storecopy/FileContent.java @@ -0,0 +1,49 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.coreedge.catchup.storecopy; + +import io.netty.buffer.ByteBuf; + +import java.io.IOException; +import java.io.OutputStream; + +class FileContent implements AutoCloseable +{ + private final ByteBuf msg; + + FileContent( ByteBuf msg ) + { + msg.retain(); + this.msg = msg; + } + + int writeTo( OutputStream stream ) throws IOException + { + int bytes = msg.readableBytes(); + msg.readBytes( stream, bytes ); + return bytes; + } + + @Override + public void close() + { + msg.release(); + } +} diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/storecopy/FileContentDecoder.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/storecopy/FileContentDecoder.java new file mode 100644 index 0000000000000..67fed8d916e4c --- /dev/null +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/storecopy/FileContentDecoder.java @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.coreedge.catchup.storecopy; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.MessageToMessageDecoder; + +import java.util.List; + +public class FileContentDecoder extends MessageToMessageDecoder +{ + @Override + protected void decode( ChannelHandlerContext ctx, ByteBuf msg, List out ) throws Exception + { + out.add( new FileContent( msg ) ); + } +} diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/storecopy/FileContentHandler.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/storecopy/FileContentHandler.java index 280eeedc8e19d..164ea3cc4231e 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/storecopy/FileContentHandler.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/storecopy/FileContentHandler.java @@ -19,17 +19,14 @@ */ package org.neo4j.coreedge.catchup.storecopy; -import java.io.OutputStream; - -import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; -import org.neo4j.coreedge.catchup.CatchupClientProtocol; +import java.io.OutputStream; -import static org.neo4j.coreedge.catchup.CatchupClientProtocol.NextMessage; +import org.neo4j.coreedge.catchup.CatchupClientProtocol; -public class FileContentHandler extends SimpleChannelInboundHandler +public class FileContentHandler extends SimpleChannelInboundHandler { private final CatchupClientProtocol protocol; private long expectedBytes = 0; @@ -50,22 +47,17 @@ void setExpectedFile( FileHeader fileHeader ) } @Override - protected void channelRead0( ChannelHandlerContext ctx, ByteBuf msg ) throws Exception + protected void channelRead0( ChannelHandlerContext ctx, FileContent fileContent ) throws Exception { - if ( protocol.isExpecting( NextMessage.FILE_CONTENTS ) ) + try ( FileContent content = fileContent; + OutputStream outputStream = location.getStoreFileStreams().createStream( destination ) ) { - int bytesInMessage = msg.readableBytes(); - try ( OutputStream outputStream = location.getStoreFileStreams().createStream( destination ) ) - { - msg.readBytes( outputStream, bytesInMessage ); - } - - expectedBytes -= bytesInMessage; + expectedBytes -= content.writeTo( outputStream ); + } - if ( expectedBytes <= 0 ) - { - protocol.expect( NextMessage.MESSAGE_TYPE ); - } + if ( expectedBytes <= 0 ) + { + protocol.expect( CatchupClientProtocol.NextMessage.MESSAGE_TYPE ); } } } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/storecopy/FileHeaderDecoder.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/storecopy/FileHeaderDecoder.java index 7262eef648649..a0da4c8088978 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/storecopy/FileHeaderDecoder.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/storecopy/FileHeaderDecoder.java @@ -19,42 +19,21 @@ */ package org.neo4j.coreedge.catchup.storecopy; -import java.util.List; - import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToMessageDecoder; -import org.neo4j.coreedge.catchup.CatchupClientProtocol; - -import static org.neo4j.coreedge.catchup.CatchupClientProtocol.NextMessage; +import java.util.List; public class FileHeaderDecoder extends MessageToMessageDecoder { - private final CatchupClientProtocol protocol; - - public FileHeaderDecoder( CatchupClientProtocol protocol ) - { - this.protocol = protocol; - } - @Override protected void decode( ChannelHandlerContext ctx, ByteBuf msg, List out ) throws Exception { - if ( protocol.isExpecting( NextMessage.FILE_HEADER ) ) - { - int nameLength = msg.readInt(); - byte[] name = new byte[nameLength]; - msg.readBytes( name ); - - long fileLength = msg.readLong(); - - out.add( new FileHeader( new String( name ), fileLength ) ); - } - else - { - out.add( Unpooled.copiedBuffer( msg ) ); - } + int nameLength = msg.readInt(); + byte[] name = new byte[nameLength]; + msg.readBytes( name ); + long fileLength = msg.readLong(); + out.add( new FileHeader( new String( name ), fileLength ) ); } } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/storecopy/GetStoreIdResponseDecoder.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/storecopy/GetStoreIdResponseDecoder.java index 0a97723259f24..b5934d5a1b6cd 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/storecopy/GetStoreIdResponseDecoder.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/storecopy/GetStoreIdResponseDecoder.java @@ -20,37 +20,21 @@ package org.neo4j.coreedge.catchup.storecopy; import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToMessageDecoder; import java.util.List; -import org.neo4j.coreedge.catchup.CatchupClientProtocol; +import org.neo4j.coreedge.identity.StoreId; import org.neo4j.coreedge.messaging.NetworkReadableClosableChannelNetty4; import org.neo4j.coreedge.messaging.marsalling.storeid.StoreIdMarshal; -import org.neo4j.coreedge.identity.StoreId; -class GetStoreIdResponseDecoder extends MessageToMessageDecoder +public class GetStoreIdResponseDecoder extends MessageToMessageDecoder { - private final CatchupClientProtocol protocol; - - GetStoreIdResponseDecoder( CatchupClientProtocol protocol ) - { - this.protocol = protocol; - } - @Override protected void decode( ChannelHandlerContext ctx, ByteBuf msg, List out ) throws Exception { - if ( protocol.isExpecting( CatchupClientProtocol.NextMessage.STORE_ID ) ) - { - StoreId storeId = StoreIdMarshal.unmarshal( new NetworkReadableClosableChannelNetty4( msg ) ); - out.add( new GetStoreIdResponse( storeId ) ); - } - else - { - out.add( Unpooled.copiedBuffer( msg ) ); - } + StoreId storeId = StoreIdMarshal.unmarshal( new NetworkReadableClosableChannelNetty4( msg ) ); + out.add( new GetStoreIdResponse( storeId ) ); } } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/storecopy/GetStoreIdResponseHandler.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/storecopy/GetStoreIdResponseHandler.java index 9bd27d3231c30..2b1f60efd5e13 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/storecopy/GetStoreIdResponseHandler.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/storecopy/GetStoreIdResponseHandler.java @@ -38,7 +38,7 @@ class GetStoreIdResponseHandler extends SimpleChannelInboundHandler { - private final CatchupClientProtocol protocol; - - public StoreCopyFinishedResponseDecoder( CatchupClientProtocol protocol ) - { - this.protocol = protocol; - } - @Override protected void decode( ChannelHandlerContext ctx, ByteBuf msg, List out ) throws Exception { - if ( protocol.isExpecting( CatchupClientProtocol.NextMessage.STORE_COPY_FINISHED ) ) - { - out.add( new StoreCopyFinishedResponse( msg.readLong() ) ); - } - else - { - out.add( Unpooled.copiedBuffer( msg ) ); - } - + out.add( new StoreCopyFinishedResponse( msg.readLong() ) ); } } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/tx/TxPullResponseDecoder.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/tx/TxPullResponseDecoder.java index e0859709ec086..2002d938d1b51 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/tx/TxPullResponseDecoder.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/tx/TxPullResponseDecoder.java @@ -20,56 +20,38 @@ package org.neo4j.coreedge.catchup.tx; import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToMessageDecoder; import java.util.List; -import org.neo4j.coreedge.catchup.CatchupClientProtocol; +import org.neo4j.coreedge.identity.StoreId; import org.neo4j.coreedge.messaging.NetworkReadableClosableChannelNetty4; import org.neo4j.coreedge.messaging.marsalling.storeid.StoreIdMarshal; -import org.neo4j.coreedge.identity.StoreId; import org.neo4j.kernel.impl.storageengine.impl.recordstorage.RecordStorageCommandReaderFactory; import org.neo4j.kernel.impl.transaction.CommittedTransactionRepresentation; import org.neo4j.kernel.impl.transaction.log.PhysicalTransactionCursor; import org.neo4j.kernel.impl.transaction.log.entry.LogEntryReader; import org.neo4j.kernel.impl.transaction.log.entry.VersionAwareLogEntryReader; -import static org.neo4j.coreedge.catchup.CatchupClientProtocol.NextMessage; - public class TxPullResponseDecoder extends MessageToMessageDecoder { - private final CatchupClientProtocol protocol; - - public TxPullResponseDecoder( CatchupClientProtocol protocol ) - { - this.protocol = protocol; - } - @Override protected void decode( ChannelHandlerContext ctx, ByteBuf msg, List out ) throws Exception { - if ( protocol.isExpecting( NextMessage.TX_PULL_RESPONSE ) ) - { - NetworkReadableClosableChannelNetty4 logChannel = new NetworkReadableClosableChannelNetty4( msg ); - StoreId storeId = StoreIdMarshal.unmarshal( logChannel ); - LogEntryReader reader = new VersionAwareLogEntryReader<>( - new RecordStorageCommandReaderFactory() ); - PhysicalTransactionCursor transactionCursor = - new PhysicalTransactionCursor<>( logChannel, reader ); + NetworkReadableClosableChannelNetty4 logChannel = new NetworkReadableClosableChannelNetty4( msg ); + StoreId storeId = StoreIdMarshal.unmarshal( logChannel ); + LogEntryReader reader = + new VersionAwareLogEntryReader<>( new RecordStorageCommandReaderFactory() ); + PhysicalTransactionCursor transactionCursor = + new PhysicalTransactionCursor<>( logChannel, reader ); - transactionCursor.next(); - CommittedTransactionRepresentation tx = transactionCursor.get(); + transactionCursor.next(); + CommittedTransactionRepresentation tx = transactionCursor.get(); - if ( tx != null ) - { - out.add( new TxPullResponse( storeId, tx ) ); - } - } - else + if ( tx != null ) { - out.add( Unpooled.copiedBuffer( msg ) ); + out.add( new TxPullResponse( storeId, tx ) ); } } } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/tx/TxPullResponseHandler.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/tx/TxPullResponseHandler.java index 0d549805c60cb..feb4ecdcdc4d8 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/tx/TxPullResponseHandler.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/tx/TxPullResponseHandler.java @@ -39,7 +39,7 @@ public TxPullResponseHandler( CatchupClientProtocol protocol, @Override protected void channelRead0( ChannelHandlerContext ctx, final TxPullResponse msg ) throws Exception { - if ( protocol.isExpecting( CatchupClientProtocol.NextMessage.TX_PULL_RESPONSE ) ) + if ( CatchupClientProtocol.NextMessage.TX_PULL_RESPONSE.equals( protocol.expecting() ) ) { listener.onTxReceived( msg ); protocol.expect( CatchupClientProtocol.NextMessage.MESSAGE_TYPE ); diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/tx/TxStreamFinishedResponseDecoder.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/tx/TxStreamFinishedResponseDecoder.java index 3ff0a9c941bfd..737fe803a14de 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/tx/TxStreamFinishedResponseDecoder.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/tx/TxStreamFinishedResponseDecoder.java @@ -19,38 +19,19 @@ */ package org.neo4j.coreedge.catchup.tx; -import java.util.List; - import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToMessageDecoder; -import org.neo4j.coreedge.catchup.CatchupClientProtocol; - -import static org.neo4j.coreedge.catchup.CatchupClientProtocol.NextMessage; +import java.util.List; public class TxStreamFinishedResponseDecoder extends MessageToMessageDecoder { - private final CatchupClientProtocol protocol; - - public TxStreamFinishedResponseDecoder( CatchupClientProtocol protocol ) - { - this.protocol = protocol; - } - @Override protected void decode( ChannelHandlerContext ctx, ByteBuf msg, List out ) throws Exception { - if ( protocol.isExpecting( NextMessage.TX_STREAM_FINISHED ) ) - { - long lastTransactionIdSent = msg.readLong(); - boolean success = msg.readBoolean(); - out.add( new TxStreamFinishedResponse( lastTransactionIdSent, success ) ); - } - else - { - out.add( Unpooled.copiedBuffer( msg ) ); - } + long lastTransactionIdSent = msg.readLong(); + boolean success = msg.readBoolean(); + out.add( new TxStreamFinishedResponse( lastTransactionIdSent, success ) ); } } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/state/snapshot/CoreSnapshotDecoder.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/state/snapshot/CoreSnapshotDecoder.java index 38f35bcea4651..cee1aae639e3f 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/state/snapshot/CoreSnapshotDecoder.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/state/snapshot/CoreSnapshotDecoder.java @@ -19,37 +19,19 @@ */ package org.neo4j.coreedge.core.state.snapshot; -import java.util.List; - import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToMessageDecoder; -import org.neo4j.coreedge.catchup.CatchupClientProtocol; -import org.neo4j.coreedge.messaging.NetworkReadableClosableChannelNetty4; +import java.util.List; -import static org.neo4j.coreedge.catchup.CatchupClientProtocol.NextMessage; +import org.neo4j.coreedge.messaging.NetworkReadableClosableChannelNetty4; public class CoreSnapshotDecoder extends MessageToMessageDecoder { - private final CatchupClientProtocol protocol; - - public CoreSnapshotDecoder( CatchupClientProtocol protocol ) - { - this.protocol = protocol; - } - @Override protected void decode( ChannelHandlerContext ctx, ByteBuf msg, List out ) throws Exception { - if ( protocol.isExpecting( NextMessage.CORE_SNAPSHOT ) ) - { - out.add( new CoreSnapshot.Marshal().unmarshal( new NetworkReadableClosableChannelNetty4( msg ) ) ); - } - else - { - out.add( Unpooled.copiedBuffer( msg ) ); - } + out.add( new CoreSnapshot.Marshal().unmarshal( new NetworkReadableClosableChannelNetty4( msg ) ) ); } } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/state/snapshot/CoreSnapshotResponseHandler.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/state/snapshot/CoreSnapshotResponseHandler.java index 0fc9cded46da2..ca13ced315fb7 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/state/snapshot/CoreSnapshotResponseHandler.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/state/snapshot/CoreSnapshotResponseHandler.java @@ -38,7 +38,7 @@ public CoreSnapshotResponseHandler( CatchupClientProtocol protocol, CoreSnapshot @Override protected void channelRead0( ChannelHandlerContext ctx, final CoreSnapshot coreSnapshot ) throws Exception { - if ( protocol.isExpecting( CatchupClientProtocol.NextMessage.CORE_SNAPSHOT ) ) + if ( CatchupClientProtocol.NextMessage.CORE_SNAPSHOT.equals( protocol.expecting() ) ) { listener.onSnapshotReceived( coreSnapshot ); protocol.expect( CatchupClientProtocol.NextMessage.MESSAGE_TYPE ); diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/catchup/RequestDecoderDispatcherTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/catchup/RequestDecoderDispatcherTest.java index 5f2fb45455c2a..7e5133e36f9da 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/catchup/RequestDecoderDispatcherTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/catchup/RequestDecoderDispatcherTest.java @@ -28,22 +28,41 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; -import static org.neo4j.coreedge.catchup.CatchupServerProtocol.NextMessage.TX_PULL; import static org.neo4j.logging.AssertableLogProvider.inLog; public class RequestDecoderDispatcherTest { - private final CatchupServerProtocol protocol = new CatchupServerProtocol(); + private final Protocol protocol = new Protocol() + { + private Type next; + + @Override + public void expect( Type next ) + { + this.next = next; + } + + @Override + public Type expecting() + { + return next; + } + }; private final AssertableLogProvider logProvider = new AssertableLogProvider(); + private enum Type + { + type + } + @Test public void shouldDispatchToRegisteredDecoder() throws Exception { // given - RequestDecoderDispatcher dispatcher = new RequestDecoderDispatcher( protocol, logProvider ); - protocol.expect( TX_PULL ); + RequestDecoderDispatcher dispatcher = new RequestDecoderDispatcher<>( protocol, logProvider ); + protocol.expect( Type.type ); ChannelInboundHandler delegate = mock( ChannelInboundHandler.class ); - dispatcher.register( TX_PULL, delegate ); + dispatcher.register( Type.type, delegate ); ChannelHandlerContext ctx = mock( ChannelHandlerContext.class ); Object msg = new Object(); @@ -60,15 +79,15 @@ public void shouldDispatchToRegisteredDecoder() throws Exception public void shouldLogAWarningIfThereIsNoDecoderForTheMessageType() throws Exception { // given - RequestDecoderDispatcher dispatcher = new RequestDecoderDispatcher( protocol, logProvider ); - protocol.expect( TX_PULL ); + RequestDecoderDispatcher dispatcher = new RequestDecoderDispatcher<>( protocol, logProvider ); + protocol.expect( Type.type ); // when dispatcher.channelRead( mock( ChannelHandlerContext.class ), new Object() ); // then AssertableLogProvider.LogMatcher matcher = - inLog( RequestDecoderDispatcher.class ).warn( "Unknown message %s", TX_PULL ); + inLog( RequestDecoderDispatcher.class ).warn( "Unknown message %s", Type.type ); logProvider.assertExactly( matcher ); } diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/catchup/storecopy/StoreCopyFinishedResponseEncodeDecodeTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/catchup/storecopy/StoreCopyFinishedResponseEncodeDecodeTest.java index d88c228ba58d1..087ba0e825822 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/catchup/storecopy/StoreCopyFinishedResponseEncodeDecodeTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/catchup/storecopy/StoreCopyFinishedResponseEncodeDecodeTest.java @@ -22,25 +22,17 @@ import io.netty.channel.embedded.EmbeddedChannel; import org.junit.Test; -import org.neo4j.coreedge.catchup.CatchupClientProtocol; - import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotSame; -import static org.neo4j.coreedge.catchup.CatchupClientProtocol.NextMessage; - public class StoreCopyFinishedResponseEncodeDecodeTest { @Test public void shouldEncodeAndDecodePullRequestMessage() { - CatchupClientProtocol protocol = new CatchupClientProtocol(); - protocol.expect( NextMessage.STORE_COPY_FINISHED ); - - EmbeddedChannel channel = new EmbeddedChannel( new StoreCopyFinishedResponseEncoder(), - new StoreCopyFinishedResponseDecoder( protocol ) ); - // given + EmbeddedChannel channel = + new EmbeddedChannel( new StoreCopyFinishedResponseEncoder(), new StoreCopyFinishedResponseDecoder() ); final long arbitraryId = 23; StoreCopyFinishedResponse sent = new StoreCopyFinishedResponse( arbitraryId ); diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/catchup/tx/TxPullResponseEncodeDecodeTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/catchup/tx/TxPullResponseEncodeDecodeTest.java index 30c972567d2e3..685ff85f0c268 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/catchup/tx/TxPullResponseEncodeDecodeTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/catchup/tx/TxPullResponseEncodeDecodeTest.java @@ -22,7 +22,6 @@ import io.netty.channel.embedded.EmbeddedChannel; import org.junit.Test; -import org.neo4j.coreedge.catchup.CatchupClientProtocol; import org.neo4j.coreedge.identity.StoreId; import org.neo4j.kernel.impl.store.record.NodeRecord; import org.neo4j.kernel.impl.transaction.CommittedTransactionRepresentation; @@ -36,21 +35,14 @@ import static java.util.Arrays.asList; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotSame; -import static org.neo4j.coreedge.catchup.CatchupClientProtocol.NextMessage; public class TxPullResponseEncodeDecodeTest { @Test public void shouldEncodeAndDecodePullResponseMessage() { - CatchupClientProtocol protocol = new CatchupClientProtocol(); - protocol.expect( NextMessage.TX_PULL_RESPONSE ); - - EmbeddedChannel channel = new EmbeddedChannel( - new TxPullResponseEncoder(), - new TxPullResponseDecoder( protocol ) ); - // given + EmbeddedChannel channel = new EmbeddedChannel( new TxPullResponseEncoder(), new TxPullResponseDecoder() ); TxPullResponse sent = new TxPullResponse( new StoreId( 1, 2, 3, 4 ), newCommittedTransactionRepresentation() ); // when diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/catchup/tx/TxStreamFinishedResponseEncodeDecodeTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/catchup/tx/TxStreamFinishedResponseEncodeDecodeTest.java index daa20fed19bf3..df35094790b6e 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/catchup/tx/TxStreamFinishedResponseEncodeDecodeTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/catchup/tx/TxStreamFinishedResponseEncodeDecodeTest.java @@ -22,25 +22,17 @@ import io.netty.channel.embedded.EmbeddedChannel; import org.junit.Test; -import org.neo4j.coreedge.catchup.CatchupClientProtocol; - import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotSame; -import static org.neo4j.coreedge.catchup.CatchupClientProtocol.NextMessage; - public class TxStreamFinishedResponseEncodeDecodeTest { @Test public void shouldEncodeAndDecodePullRequestMessage() { - CatchupClientProtocol protocol = new CatchupClientProtocol(); - protocol.expect( NextMessage.TX_STREAM_FINISHED ); - - EmbeddedChannel channel = new EmbeddedChannel( new TxStreamFinishedResponseEncoder(), - new TxStreamFinishedResponseDecoder( protocol ) ); - // given + EmbeddedChannel channel = + new EmbeddedChannel( new TxStreamFinishedResponseEncoder(), new TxStreamFinishedResponseDecoder() ); final long arbitraryId = 23; TxStreamFinishedResponse sent = new TxStreamFinishedResponse( arbitraryId, true ); diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/CoreToCoreCopySnapshotIT.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/CoreToCoreCopySnapshotIT.java index bfd382d8bf88d..f57c5779469e8 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/CoreToCoreCopySnapshotIT.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/CoreToCoreCopySnapshotIT.java @@ -43,6 +43,7 @@ import static org.junit.Assert.assertEquals; import static org.neo4j.coreedge.core.CoreEdgeClusterSettings.raft_log_pruning_frequency; import static org.neo4j.coreedge.core.CoreEdgeClusterSettings.raft_log_pruning_strategy; +import static org.neo4j.coreedge.core.CoreEdgeClusterSettings.raft_log_rotation_size; import static org.neo4j.coreedge.discovery.Cluster.dataMatchesEventually; import static org.neo4j.coreedge.scenarios.SampleData.createData; import static org.neo4j.helpers.collection.MapUtil.stringMap;