Skip to content

Commit

Permalink
Adapt tests to use multile protcol versions
Browse files Browse the repository at this point in the history
  • Loading branch information
RagnarW authored and martinfurmanski committed Jun 11, 2018
1 parent 6f08d1e commit f3298e6
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 29 deletions.
Expand Up @@ -26,7 +26,7 @@
import java.util.List; import java.util.List;
import java.util.stream.Collectors; import java.util.stream.Collectors;


import org.neo4j.causalclustering.messaging.marshalling.v2.encoding.ReplicatedContentChunkEncoder; import org.neo4j.causalclustering.messaging.marshalling.ReplicatedContentChunkEncoder;
import org.neo4j.causalclustering.messaging.marshalling.CoreReplicatedContentSerializer; import org.neo4j.causalclustering.messaging.marshalling.CoreReplicatedContentSerializer;
import org.neo4j.causalclustering.messaging.marshalling.v2.encoding.ContentTypeEncoder; import org.neo4j.causalclustering.messaging.marshalling.v2.encoding.ContentTypeEncoder;
import org.neo4j.causalclustering.messaging.marshalling.v2.encoding.RaftLogEntryTermEncoder; import org.neo4j.causalclustering.messaging.marshalling.v2.encoding.RaftLogEntryTermEncoder;
Expand Down
Expand Up @@ -17,14 +17,12 @@
* You should have received a copy of the GNU Affero General Public License * You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
package org.neo4j.causalclustering.messaging.marshalling.v2.encoding; package org.neo4j.causalclustering.messaging.marshalling;


import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder; import io.netty.handler.codec.MessageToByteEncoder;


import org.neo4j.causalclustering.messaging.marshalling.ReplicatedContentChunk;

public class ReplicatedContentChunkEncoder extends MessageToByteEncoder<ReplicatedContentChunk> public class ReplicatedContentChunkEncoder extends MessageToByteEncoder<ReplicatedContentChunk>
{ {
@Override @Override
Expand Down
Expand Up @@ -30,14 +30,17 @@
import org.junit.runners.Parameterized; import org.junit.runners.Parameterized;


import java.time.Duration; import java.time.Duration;
import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.Semaphore; import java.util.concurrent.Semaphore;
import java.util.stream.Collectors;
import java.util.stream.Stream;


import org.neo4j.causalclustering.core.consensus.RaftMessages; import org.neo4j.causalclustering.core.consensus.RaftMessages;
import org.neo4j.causalclustering.core.consensus.membership.MemberIdSet;
import org.neo4j.causalclustering.core.consensus.protocol.v1.RaftProtocolClientInstaller; import org.neo4j.causalclustering.core.consensus.protocol.v1.RaftProtocolClientInstaller;
import org.neo4j.causalclustering.core.consensus.protocol.v1.RaftProtocolServerInstaller; import org.neo4j.causalclustering.core.consensus.protocol.v1.RaftProtocolServerInstaller;
import org.neo4j.causalclustering.core.consensus.membership.MemberIdSet;
import org.neo4j.causalclustering.identity.ClusterId; import org.neo4j.causalclustering.identity.ClusterId;
import org.neo4j.causalclustering.identity.MemberId; import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.causalclustering.net.Server; import org.neo4j.causalclustering.net.Server;
Expand All @@ -61,7 +64,6 @@
import org.neo4j.ports.allocation.PortAuthority; import org.neo4j.ports.allocation.PortAuthority;


import static java.util.Collections.emptyList; import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
import static java.util.concurrent.TimeUnit.SECONDS; import static java.util.concurrent.TimeUnit.SECONDS;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.neo4j.causalclustering.handlers.VoidPipelineWrapperFactory.VOID_WRAPPER; import static org.neo4j.causalclustering.handlers.VoidPipelineWrapperFactory.VOID_WRAPPER;
Expand All @@ -72,8 +74,8 @@ public class SenderServiceIT
{ {
private final LogProvider logProvider = NullLogProvider.getInstance(); private final LogProvider logProvider = NullLogProvider.getInstance();


private final ApplicationSupportedProtocols supportedApplicationProtocol = private final ApplicationSupportedProtocols supportedApplicationProtocol = new ApplicationSupportedProtocols( Protocol.ApplicationProtocolCategory.RAFT,
new ApplicationSupportedProtocols( Protocol.ApplicationProtocolCategory.RAFT, emptyList() ); Arrays.asList( ApplicationProtocols.RAFT_1.implementation(), ApplicationProtocols.RAFT_2.implementation() ) );
private final Collection<ModifierSupportedProtocols> supportedModifierProtocols = emptyList(); private final Collection<ModifierSupportedProtocols> supportedModifierProtocols = emptyList();


private final ApplicationProtocolRepository applicationProtocolRepository = private final ApplicationProtocolRepository applicationProtocolRepository =
Expand All @@ -84,10 +86,18 @@ public class SenderServiceIT
@Parameterized.Parameter @Parameterized.Parameter
public boolean blocking; public boolean blocking;


@Parameterized.Parameters( name = "blocking={0}" ) @Parameterized.Parameter( 1 )
public static Iterable<Boolean> params() public ApplicationProtocols clientProtocol;

@Parameterized.Parameters( name = "blocking={0} protocol={1}" )
public static Iterable<Object[]> params()
{ {
return asSet( true, false ); return clientRepositories().stream().flatMap( r -> Stream.of( new Object[]{true, r}, new Object[]{false, r} ) ).collect( Collectors.toList() );
}

private static Collection<ApplicationProtocols> clientRepositories()
{
return Arrays.asList( ApplicationProtocols.RAFT_1, ApplicationProtocols.RAFT_2 );
} }


@Test @Test
Expand Down Expand Up @@ -133,9 +143,11 @@ private Server raftServer( ChannelInboundHandler nettyHandler, int port )
{ {
NettyPipelineBuilderFactory pipelineFactory = new NettyPipelineBuilderFactory( VOID_WRAPPER ); NettyPipelineBuilderFactory pipelineFactory = new NettyPipelineBuilderFactory( VOID_WRAPPER );


RaftProtocolServerInstaller.Factory raftProtocolServerInstaller = new RaftProtocolServerInstaller.Factory( nettyHandler, pipelineFactory, logProvider ); RaftProtocolServerInstaller.Factory factoryV1 = new RaftProtocolServerInstaller.Factory( nettyHandler, pipelineFactory, logProvider );
org.neo4j.causalclustering.core.consensus.protocol.v2.RaftProtocolServerInstaller.Factory factoryV2 =
new org.neo4j.causalclustering.core.consensus.protocol.v2.RaftProtocolServerInstaller.Factory( nettyHandler, pipelineFactory, logProvider );
ProtocolInstallerRepository<ProtocolInstaller.Orientation.Server> installer = ProtocolInstallerRepository<ProtocolInstaller.Orientation.Server> installer =
new ProtocolInstallerRepository<>( singletonList( raftProtocolServerInstaller ), ModifierProtocolInstaller.allServerInstallers ); new ProtocolInstallerRepository<>( Arrays.asList( factoryV1, factoryV2 ), ModifierProtocolInstaller.allServerInstallers );


HandshakeServerInitializer channelInitializer = new HandshakeServerInitializer( applicationProtocolRepository, modifierProtocolRepository, HandshakeServerInitializer channelInitializer = new HandshakeServerInitializer( applicationProtocolRepository, modifierProtocolRepository,
installer, pipelineFactory, logProvider ); installer, pipelineFactory, logProvider );
Expand All @@ -148,12 +160,13 @@ private SenderService raftSender()
{ {
NettyPipelineBuilderFactory pipelineFactory = new NettyPipelineBuilderFactory( VOID_WRAPPER ); NettyPipelineBuilderFactory pipelineFactory = new NettyPipelineBuilderFactory( VOID_WRAPPER );


RaftProtocolClientInstaller.Factory raftProtocolClientInstaller = new RaftProtocolClientInstaller.Factory( pipelineFactory, logProvider ); RaftProtocolClientInstaller.Factory factoryV1 = new RaftProtocolClientInstaller.Factory( pipelineFactory, logProvider );
org.neo4j.causalclustering.core.consensus.protocol.v2.RaftProtocolClientInstaller.Factory factoryV2 =
new org.neo4j.causalclustering.core.consensus.protocol.v2.RaftProtocolClientInstaller.Factory( pipelineFactory, logProvider );
ProtocolInstallerRepository<ProtocolInstaller.Orientation.Client> protocolInstaller = ProtocolInstallerRepository<ProtocolInstaller.Orientation.Client> protocolInstaller =
new ProtocolInstallerRepository<>( singletonList( raftProtocolClientInstaller ), ModifierProtocolInstaller.allClientInstallers ); new ProtocolInstallerRepository<>( Arrays.asList( factoryV1, factoryV2 ), ModifierProtocolInstaller.allClientInstallers );


HandshakeClientInitializer channelInitializer = new HandshakeClientInitializer( HandshakeClientInitializer channelInitializer = new HandshakeClientInitializer( clientRepository(),
applicationProtocolRepository,
modifierProtocolRepository, modifierProtocolRepository,
protocolInstaller, protocolInstaller,
pipelineFactory, pipelineFactory,
Expand All @@ -163,4 +176,10 @@ private SenderService raftSender()


return new SenderService( channelInitializer, logProvider ); return new SenderService( channelInitializer, logProvider );
} }

private ApplicationProtocolRepository clientRepository()
{
return new ApplicationProtocolRepository( new ApplicationProtocols[]{ApplicationProtocols.RAFT_2},
new ApplicationSupportedProtocols( Protocol.ApplicationProtocolCategory.RAFT, emptyList() ) );
}
} }
Expand Up @@ -44,6 +44,7 @@


import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.time.Duration; import java.time.Duration;
import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.Optional; import java.util.Optional;
Expand All @@ -54,8 +55,8 @@


import org.neo4j.causalclustering.core.CausalClusteringSettings; import org.neo4j.causalclustering.core.CausalClusteringSettings;
import org.neo4j.causalclustering.core.consensus.RaftMessages; import org.neo4j.causalclustering.core.consensus.RaftMessages;
import org.neo4j.causalclustering.core.consensus.protocol.v1.RaftProtocolClientInstaller; import org.neo4j.causalclustering.core.consensus.protocol.v2.RaftProtocolClientInstaller;
import org.neo4j.causalclustering.core.consensus.protocol.v1.RaftProtocolServerInstaller; import org.neo4j.causalclustering.core.consensus.protocol.v2.RaftProtocolServerInstaller;
import org.neo4j.causalclustering.handlers.VoidPipelineWrapperFactory; import org.neo4j.causalclustering.handlers.VoidPipelineWrapperFactory;
import org.neo4j.causalclustering.identity.ClusterId; import org.neo4j.causalclustering.identity.ClusterId;
import org.neo4j.causalclustering.identity.MemberId; import org.neo4j.causalclustering.identity.MemberId;
Expand All @@ -77,6 +78,7 @@
import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.contains;
import static org.neo4j.causalclustering.protocol.Protocol.ApplicationProtocolCategory.RAFT; import static org.neo4j.causalclustering.protocol.Protocol.ApplicationProtocolCategory.RAFT;
import static org.neo4j.causalclustering.protocol.Protocol.ApplicationProtocols.RAFT_1; import static org.neo4j.causalclustering.protocol.Protocol.ApplicationProtocols.RAFT_1;
import static org.neo4j.causalclustering.protocol.Protocol.ApplicationProtocols.RAFT_2;
import static org.neo4j.causalclustering.protocol.Protocol.ModifierProtocolCategory.COMPRESSION; import static org.neo4j.causalclustering.protocol.Protocol.ModifierProtocolCategory.COMPRESSION;
import static org.neo4j.test.assertion.Assert.assertEventually; import static org.neo4j.test.assertion.Assert.assertEventually;


Expand All @@ -99,19 +101,25 @@ public static Collection<Parameters> data()
Stream<Optional<ModifierProtocol>> noModifierProtocols = Stream.of( Optional.empty() ); Stream<Optional<ModifierProtocol>> noModifierProtocols = Stream.of( Optional.empty() );
Stream<Optional<ModifierProtocol>> individualModifierProtocols = Stream.of( ModifierProtocols.values() ).map( Optional::of ); Stream<Optional<ModifierProtocol>> individualModifierProtocols = Stream.of( ModifierProtocols.values() ).map( Optional::of );


// TODO permutations across differently identified modifiers, when we have some return Stream

.concat( noModifierProtocols, individualModifierProtocols )
return Stream.concat( noModifierProtocols, individualModifierProtocols ) .flatMap( protocol -> Stream.of( raft1WithCompressionModifier( protocol ), raft2WithCompressionModifiers( protocol ) ) )
.map( NettyInstalledProtocolsIT::raft1WithCompressionModifier )
.collect( Collectors.toList() ); .collect( Collectors.toList() );
} }


@SuppressWarnings( "OptionalUsedAsFieldOrParameterType" ) @SuppressWarnings( "OptionalUsedAsFieldOrParameterType" )
private static Parameters raft1WithCompressionModifier( Optional<ModifierProtocol> protocol ) private static Parameters raft1WithCompressionModifier( Optional<ModifierProtocol> protocol )
{ {
List<String> versions = Streams.ofOptional( protocol ).map( Protocol::implementation ).collect( Collectors.toList() ); List<String> versions = Streams.ofOptional( protocol ).map( Protocol::implementation ).collect( Collectors.toList() );
return new Parameters( "Raft 1, modifiers: " + protocol, return new Parameters( "Raft 1, modifiers: " + protocol, new ApplicationSupportedProtocols( RAFT, singletonList( RAFT_1.implementation() ) ),
new ApplicationSupportedProtocols( RAFT, singletonList( RAFT_1.implementation() ) ), singletonList( new ModifierSupportedProtocols( COMPRESSION, versions ) ) );
}

@SuppressWarnings( "OptionalUsedAsFieldOrParameterType" )
private static Parameters raft2WithCompressionModifiers( Optional<ModifierProtocol> protocol )
{
List<String> versions = Streams.ofOptional( protocol ).map( Protocol::implementation ).collect( Collectors.toList() );
return new Parameters( "Raft 2, modifiers: " + protocol, new ApplicationSupportedProtocols( RAFT, singletonList( RAFT_2.implementation() ) ),
singletonList( new ModifierSupportedProtocols( COMPRESSION, versions ) ) ); singletonList( new ModifierSupportedProtocols( COMPRESSION, versions ) ) );
} }


Expand Down Expand Up @@ -208,10 +216,13 @@ protected void channelRead0( ChannelHandlerContext ctx, Object msg )


void start( final ApplicationProtocolRepository applicationProtocolRepository, final ModifierProtocolRepository modifierProtocolRepository ) void start( final ApplicationProtocolRepository applicationProtocolRepository, final ModifierProtocolRepository modifierProtocolRepository )
{ {
RaftProtocolServerInstaller.Factory raftFactory = RaftProtocolServerInstaller.Factory raftFactoryV2 =
new RaftProtocolServerInstaller.Factory( nettyHandler, pipelineBuilderFactory, logProvider ); new RaftProtocolServerInstaller.Factory( nettyHandler, pipelineBuilderFactory, logProvider );
org.neo4j.causalclustering.core.consensus.protocol.v1.RaftProtocolServerInstaller.Factory raftFactoryV1 =
new org.neo4j.causalclustering.core.consensus.protocol.v1.RaftProtocolServerInstaller.Factory( nettyHandler, pipelineBuilderFactory,
logProvider );
ProtocolInstallerRepository<ProtocolInstaller.Orientation.Server> protocolInstallerRepository = ProtocolInstallerRepository<ProtocolInstaller.Orientation.Server> protocolInstallerRepository =
new ProtocolInstallerRepository<>( singletonList( raftFactory ), ModifierProtocolInstaller.allServerInstallers ); new ProtocolInstallerRepository<>( Arrays.asList( raftFactoryV1, raftFactoryV2 ), ModifierProtocolInstaller.allServerInstallers );


eventLoopGroup = new NioEventLoopGroup(); eventLoopGroup = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap().group( eventLoopGroup ) ServerBootstrap bootstrap = new ServerBootstrap().group( eventLoopGroup )
Expand Down Expand Up @@ -251,9 +262,11 @@ static class Client
Client( ApplicationProtocolRepository applicationProtocolRepository, ModifierProtocolRepository modifierProtocolRepository, Client( ApplicationProtocolRepository applicationProtocolRepository, ModifierProtocolRepository modifierProtocolRepository,
NettyPipelineBuilderFactory pipelineBuilderFactory, Config config ) NettyPipelineBuilderFactory pipelineBuilderFactory, Config config )
{ {
RaftProtocolClientInstaller.Factory raftFactory = new RaftProtocolClientInstaller.Factory( pipelineBuilderFactory, logProvider ); RaftProtocolClientInstaller.Factory raftFactoryV2 = new RaftProtocolClientInstaller.Factory( pipelineBuilderFactory, logProvider );
org.neo4j.causalclustering.core.consensus.protocol.v1.RaftProtocolClientInstaller.Factory raftFactoryV1 =
new org.neo4j.causalclustering.core.consensus.protocol.v1.RaftProtocolClientInstaller.Factory( pipelineBuilderFactory, logProvider );
ProtocolInstallerRepository<ProtocolInstaller.Orientation.Client> protocolInstallerRepository = ProtocolInstallerRepository<ProtocolInstaller.Orientation.Client> protocolInstallerRepository =
new ProtocolInstallerRepository<>( singletonList( raftFactory ), ModifierProtocolInstaller.allClientInstallers ); new ProtocolInstallerRepository<>( Arrays.asList( raftFactoryV1, raftFactoryV2 ), ModifierProtocolInstaller.allClientInstallers );
eventLoopGroup = new NioEventLoopGroup(); eventLoopGroup = new NioEventLoopGroup();
Duration handshakeTimeout = config.get( CausalClusteringSettings.handshake_timeout ); Duration handshakeTimeout = config.get( CausalClusteringSettings.handshake_timeout );
handshakeClientInitializer = new HandshakeClientInitializer( applicationProtocolRepository, modifierProtocolRepository, handshakeClientInitializer = new HandshakeClientInitializer( applicationProtocolRepository, modifierProtocolRepository,
Expand Down

0 comments on commit f3298e6

Please sign in to comment.