diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/protocol/v2/RaftProtocolClientInstaller.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/protocol/v2/RaftProtocolClientInstaller.java index 40c175f54d295..5ec943ce3b27b 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/protocol/v2/RaftProtocolClientInstaller.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/protocol/v2/RaftProtocolClientInstaller.java @@ -26,7 +26,7 @@ import java.util.List; import java.util.stream.Collectors; -import org.neo4j.causalclustering.messaging.marshalling.v2.encoding.ReplicatedContentChunkEncoder; +import org.neo4j.causalclustering.messaging.marshalling.ReplicatedContentChunkEncoder; import org.neo4j.causalclustering.messaging.marshalling.CoreReplicatedContentSerializer; import org.neo4j.causalclustering.messaging.marshalling.v2.encoding.ContentTypeEncoder; import org.neo4j.causalclustering.messaging.marshalling.v2.encoding.RaftLogEntryTermEncoder; diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/v2/encoding/ReplicatedContentChunkEncoder.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/ReplicatedContentChunkEncoder.java similarity index 88% rename from enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/v2/encoding/ReplicatedContentChunkEncoder.java rename to enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/ReplicatedContentChunkEncoder.java index 1442ad24fd404..0f1d3eabfba56 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/v2/encoding/ReplicatedContentChunkEncoder.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/ReplicatedContentChunkEncoder.java @@ -17,14 +17,12 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -package org.neo4j.causalclustering.messaging.marshalling.v2.encoding; +package org.neo4j.causalclustering.messaging.marshalling; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToByteEncoder; -import org.neo4j.causalclustering.messaging.marshalling.ReplicatedContentChunk; - public class ReplicatedContentChunkEncoder extends MessageToByteEncoder { @Override diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/messaging/SenderServiceIT.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/messaging/SenderServiceIT.java index a28602e666756..a228b48ca3f0a 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/messaging/SenderServiceIT.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/messaging/SenderServiceIT.java @@ -30,14 +30,17 @@ import org.junit.runners.Parameterized; import java.time.Duration; +import java.util.Arrays; import java.util.Collection; import java.util.UUID; import java.util.concurrent.Semaphore; +import java.util.stream.Collectors; +import java.util.stream.Stream; import org.neo4j.causalclustering.core.consensus.RaftMessages; +import org.neo4j.causalclustering.core.consensus.membership.MemberIdSet; import org.neo4j.causalclustering.core.consensus.protocol.v1.RaftProtocolClientInstaller; import org.neo4j.causalclustering.core.consensus.protocol.v1.RaftProtocolServerInstaller; -import org.neo4j.causalclustering.core.consensus.membership.MemberIdSet; import org.neo4j.causalclustering.identity.ClusterId; import org.neo4j.causalclustering.identity.MemberId; import org.neo4j.causalclustering.net.Server; @@ -61,7 +64,6 @@ import org.neo4j.ports.allocation.PortAuthority; import static java.util.Collections.emptyList; -import static java.util.Collections.singletonList; import static java.util.concurrent.TimeUnit.SECONDS; import static org.junit.Assert.assertTrue; import static org.neo4j.causalclustering.handlers.VoidPipelineWrapperFactory.VOID_WRAPPER; @@ -72,8 +74,8 @@ public class SenderServiceIT { private final LogProvider logProvider = NullLogProvider.getInstance(); - private final ApplicationSupportedProtocols supportedApplicationProtocol = - new ApplicationSupportedProtocols( Protocol.ApplicationProtocolCategory.RAFT, emptyList() ); + private final ApplicationSupportedProtocols supportedApplicationProtocol = new ApplicationSupportedProtocols( Protocol.ApplicationProtocolCategory.RAFT, + Arrays.asList( ApplicationProtocols.RAFT_1.implementation(), ApplicationProtocols.RAFT_2.implementation() ) ); private final Collection supportedModifierProtocols = emptyList(); private final ApplicationProtocolRepository applicationProtocolRepository = @@ -84,10 +86,18 @@ public class SenderServiceIT @Parameterized.Parameter public boolean blocking; - @Parameterized.Parameters( name = "blocking={0}" ) - public static Iterable params() + @Parameterized.Parameter( 1 ) + public ApplicationProtocols clientProtocol; + + @Parameterized.Parameters( name = "blocking={0} protocol={1}" ) + public static Iterable params() { - return asSet( true, false ); + return clientRepositories().stream().flatMap( r -> Stream.of( new Object[]{true, r}, new Object[]{false, r} ) ).collect( Collectors.toList() ); + } + + private static Collection clientRepositories() + { + return Arrays.asList( ApplicationProtocols.RAFT_1, ApplicationProtocols.RAFT_2 ); } @Test @@ -133,9 +143,11 @@ private Server raftServer( ChannelInboundHandler nettyHandler, int port ) { NettyPipelineBuilderFactory pipelineFactory = new NettyPipelineBuilderFactory( VOID_WRAPPER ); - RaftProtocolServerInstaller.Factory raftProtocolServerInstaller = new RaftProtocolServerInstaller.Factory( nettyHandler, pipelineFactory, logProvider ); + RaftProtocolServerInstaller.Factory factoryV1 = new RaftProtocolServerInstaller.Factory( nettyHandler, pipelineFactory, logProvider ); + org.neo4j.causalclustering.core.consensus.protocol.v2.RaftProtocolServerInstaller.Factory factoryV2 = + new org.neo4j.causalclustering.core.consensus.protocol.v2.RaftProtocolServerInstaller.Factory( nettyHandler, pipelineFactory, logProvider ); ProtocolInstallerRepository installer = - new ProtocolInstallerRepository<>( singletonList( raftProtocolServerInstaller ), ModifierProtocolInstaller.allServerInstallers ); + new ProtocolInstallerRepository<>( Arrays.asList( factoryV1, factoryV2 ), ModifierProtocolInstaller.allServerInstallers ); HandshakeServerInitializer channelInitializer = new HandshakeServerInitializer( applicationProtocolRepository, modifierProtocolRepository, installer, pipelineFactory, logProvider ); @@ -148,12 +160,13 @@ private SenderService raftSender() { NettyPipelineBuilderFactory pipelineFactory = new NettyPipelineBuilderFactory( VOID_WRAPPER ); - RaftProtocolClientInstaller.Factory raftProtocolClientInstaller = new RaftProtocolClientInstaller.Factory( pipelineFactory, logProvider ); + RaftProtocolClientInstaller.Factory factoryV1 = new RaftProtocolClientInstaller.Factory( pipelineFactory, logProvider ); + org.neo4j.causalclustering.core.consensus.protocol.v2.RaftProtocolClientInstaller.Factory factoryV2 = + new org.neo4j.causalclustering.core.consensus.protocol.v2.RaftProtocolClientInstaller.Factory( pipelineFactory, logProvider ); ProtocolInstallerRepository protocolInstaller = - new ProtocolInstallerRepository<>( singletonList( raftProtocolClientInstaller ), ModifierProtocolInstaller.allClientInstallers ); + new ProtocolInstallerRepository<>( Arrays.asList( factoryV1, factoryV2 ), ModifierProtocolInstaller.allClientInstallers ); - HandshakeClientInitializer channelInitializer = new HandshakeClientInitializer( - applicationProtocolRepository, + HandshakeClientInitializer channelInitializer = new HandshakeClientInitializer( clientRepository(), modifierProtocolRepository, protocolInstaller, pipelineFactory, @@ -163,4 +176,10 @@ private SenderService raftSender() return new SenderService( channelInitializer, logProvider ); } + + private ApplicationProtocolRepository clientRepository() + { + return new ApplicationProtocolRepository( new ApplicationProtocols[]{ApplicationProtocols.RAFT_2}, + new ApplicationSupportedProtocols( Protocol.ApplicationProtocolCategory.RAFT, emptyList() ) ); + } } diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/protocol/handshake/NettyInstalledProtocolsIT.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/protocol/handshake/NettyInstalledProtocolsIT.java index 8efa64a27d8af..2facf0bfcea89 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/protocol/handshake/NettyInstalledProtocolsIT.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/protocol/handshake/NettyInstalledProtocolsIT.java @@ -44,6 +44,7 @@ import java.net.InetSocketAddress; import java.time.Duration; +import java.util.Arrays; import java.util.Collection; import java.util.List; import java.util.Optional; @@ -54,8 +55,8 @@ import org.neo4j.causalclustering.core.CausalClusteringSettings; import org.neo4j.causalclustering.core.consensus.RaftMessages; -import org.neo4j.causalclustering.core.consensus.protocol.v1.RaftProtocolClientInstaller; -import org.neo4j.causalclustering.core.consensus.protocol.v1.RaftProtocolServerInstaller; +import org.neo4j.causalclustering.core.consensus.protocol.v2.RaftProtocolClientInstaller; +import org.neo4j.causalclustering.core.consensus.protocol.v2.RaftProtocolServerInstaller; import org.neo4j.causalclustering.handlers.VoidPipelineWrapperFactory; import org.neo4j.causalclustering.identity.ClusterId; import org.neo4j.causalclustering.identity.MemberId; @@ -77,6 +78,7 @@ import static org.hamcrest.Matchers.contains; import static org.neo4j.causalclustering.protocol.Protocol.ApplicationProtocolCategory.RAFT; import static org.neo4j.causalclustering.protocol.Protocol.ApplicationProtocols.RAFT_1; +import static org.neo4j.causalclustering.protocol.Protocol.ApplicationProtocols.RAFT_2; import static org.neo4j.causalclustering.protocol.Protocol.ModifierProtocolCategory.COMPRESSION; import static org.neo4j.test.assertion.Assert.assertEventually; @@ -99,10 +101,9 @@ public static Collection data() Stream> noModifierProtocols = Stream.of( Optional.empty() ); Stream> individualModifierProtocols = Stream.of( ModifierProtocols.values() ).map( Optional::of ); - // TODO permutations across differently identified modifiers, when we have some - - return Stream.concat( noModifierProtocols, individualModifierProtocols ) - .map( NettyInstalledProtocolsIT::raft1WithCompressionModifier ) + return Stream + .concat( noModifierProtocols, individualModifierProtocols ) + .flatMap( protocol -> Stream.of( raft1WithCompressionModifier( protocol ), raft2WithCompressionModifiers( protocol ) ) ) .collect( Collectors.toList() ); } @@ -110,8 +111,15 @@ public static Collection data() private static Parameters raft1WithCompressionModifier( Optional protocol ) { List versions = Streams.ofOptional( protocol ).map( Protocol::implementation ).collect( Collectors.toList() ); - return new Parameters( "Raft 1, modifiers: " + protocol, - new ApplicationSupportedProtocols( RAFT, singletonList( RAFT_1.implementation() ) ), + return new Parameters( "Raft 1, modifiers: " + protocol, new ApplicationSupportedProtocols( RAFT, singletonList( RAFT_1.implementation() ) ), + singletonList( new ModifierSupportedProtocols( COMPRESSION, versions ) ) ); + } + + @SuppressWarnings( "OptionalUsedAsFieldOrParameterType" ) + private static Parameters raft2WithCompressionModifiers( Optional protocol ) + { + List versions = Streams.ofOptional( protocol ).map( Protocol::implementation ).collect( Collectors.toList() ); + return new Parameters( "Raft 2, modifiers: " + protocol, new ApplicationSupportedProtocols( RAFT, singletonList( RAFT_2.implementation() ) ), singletonList( new ModifierSupportedProtocols( COMPRESSION, versions ) ) ); } @@ -208,10 +216,13 @@ protected void channelRead0( ChannelHandlerContext ctx, Object msg ) void start( final ApplicationProtocolRepository applicationProtocolRepository, final ModifierProtocolRepository modifierProtocolRepository ) { - RaftProtocolServerInstaller.Factory raftFactory = + RaftProtocolServerInstaller.Factory raftFactoryV2 = new RaftProtocolServerInstaller.Factory( nettyHandler, pipelineBuilderFactory, logProvider ); + org.neo4j.causalclustering.core.consensus.protocol.v1.RaftProtocolServerInstaller.Factory raftFactoryV1 = + new org.neo4j.causalclustering.core.consensus.protocol.v1.RaftProtocolServerInstaller.Factory( nettyHandler, pipelineBuilderFactory, + logProvider ); ProtocolInstallerRepository protocolInstallerRepository = - new ProtocolInstallerRepository<>( singletonList( raftFactory ), ModifierProtocolInstaller.allServerInstallers ); + new ProtocolInstallerRepository<>( Arrays.asList( raftFactoryV1, raftFactoryV2 ), ModifierProtocolInstaller.allServerInstallers ); eventLoopGroup = new NioEventLoopGroup(); ServerBootstrap bootstrap = new ServerBootstrap().group( eventLoopGroup ) @@ -251,9 +262,11 @@ static class Client Client( ApplicationProtocolRepository applicationProtocolRepository, ModifierProtocolRepository modifierProtocolRepository, NettyPipelineBuilderFactory pipelineBuilderFactory, Config config ) { - RaftProtocolClientInstaller.Factory raftFactory = new RaftProtocolClientInstaller.Factory( pipelineBuilderFactory, logProvider ); + RaftProtocolClientInstaller.Factory raftFactoryV2 = new RaftProtocolClientInstaller.Factory( pipelineBuilderFactory, logProvider ); + org.neo4j.causalclustering.core.consensus.protocol.v1.RaftProtocolClientInstaller.Factory raftFactoryV1 = + new org.neo4j.causalclustering.core.consensus.protocol.v1.RaftProtocolClientInstaller.Factory( pipelineBuilderFactory, logProvider ); ProtocolInstallerRepository protocolInstallerRepository = - new ProtocolInstallerRepository<>( singletonList( raftFactory ), ModifierProtocolInstaller.allClientInstallers ); + new ProtocolInstallerRepository<>( Arrays.asList( raftFactoryV1, raftFactoryV2 ), ModifierProtocolInstaller.allClientInstallers ); eventLoopGroup = new NioEventLoopGroup(); Duration handshakeTimeout = config.get( CausalClusteringSettings.handshake_timeout ); handshakeClientInitializer = new HandshakeClientInitializer( applicationProtocolRepository, modifierProtocolRepository,