Skip to content

Commit

Permalink
Harden NettyInstalledProtocolsIT
Browse files Browse the repository at this point in the history
* Stricter semantics using sync*() which will rethrow exceptions.
* Fix concurreny issue by using concurrent list.
* Get rid of minor code warnings.
  • Loading branch information
martinfurmanski committed Mar 26, 2018
1 parent 01d5dc7 commit 9c99b39
Showing 1 changed file with 16 additions and 16 deletions.
Expand Up @@ -41,11 +41,11 @@


import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.time.Duration; import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.Optional; import java.util.Optional;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;
Expand All @@ -66,10 +66,10 @@
import org.neo4j.causalclustering.protocol.ProtocolInstallerRepository; import org.neo4j.causalclustering.protocol.ProtocolInstallerRepository;
import org.neo4j.kernel.configuration.Config; import org.neo4j.kernel.configuration.Config;
import org.neo4j.logging.NullLogProvider; import org.neo4j.logging.NullLogProvider;
import org.neo4j.ports.allocation.PortAuthority;
import org.neo4j.stream.Streams; import org.neo4j.stream.Streams;
import org.neo4j.test.assertion.Assert; import org.neo4j.test.assertion.Assert;


import static java.util.Arrays.asList;
import static java.util.Collections.singletonList; import static java.util.Collections.singletonList;
import static java.util.concurrent.TimeUnit.SECONDS; import static java.util.concurrent.TimeUnit.SECONDS;
import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.contains;
Expand All @@ -87,7 +87,7 @@ public NettyInstalledProtocolsIT( Parameters parameters )
this.parameters = parameters; this.parameters = parameters;
} }


private int timeoutSeconds = 10; private static final int TIMEOUT_SECONDS = 10;


@Parameterized.Parameters( name = "{0}" ) @Parameterized.Parameters( name = "{0}" )
public static Collection<Parameters> data() public static Collection<Parameters> data()
Expand All @@ -102,12 +102,13 @@ public static Collection<Parameters> data()
.collect( Collectors.toList() ); .collect( Collectors.toList() );
} }


@SuppressWarnings( "OptionalUsedAsFieldOrParameterType" )
private static Parameters raft1WithCompressionModifier( Optional<ModifierProtocol> protocol ) private static Parameters raft1WithCompressionModifier( Optional<ModifierProtocol> protocol )
{ {
List<String> versions = Streams.ofOptional( protocol ).map( Protocol::implementation ).collect( Collectors.toList() ); List<String> versions = Streams.ofOptional( protocol ).map( Protocol::implementation ).collect( Collectors.toList() );
return new Parameters( "Raft 1, modifiers: " + protocol, return new Parameters( "Raft 1, modifiers: " + protocol,
new ApplicationSupportedProtocols( RAFT, asList( RAFT_1.implementation() ) ), new ApplicationSupportedProtocols( RAFT, singletonList( RAFT_1.implementation() ) ),
asList( new ModifierSupportedProtocols( COMPRESSION, versions ) ) ); singletonList( new ModifierSupportedProtocols( COMPRESSION, versions ) ) );
} }


@Test @Test
Expand All @@ -119,13 +120,13 @@ public void shouldSuccessfullySendAndReceiveAMessage() throws Throwable
RaftMessages.ClusterIdAwareMessage.of( new ClusterId( UUID.randomUUID() ), raftMessage ); RaftMessages.ClusterIdAwareMessage.of( new ClusterId( UUID.randomUUID() ), raftMessage );


// when // when
client.send( networkMessage ); client.send( networkMessage ).syncUninterruptibly();


// then // then
Assert.assertEventually( Assert.assertEventually(
messages -> String.format( "Received messages %s should contain message decorating %s", messages, raftMessage ), messages -> String.format( "Received messages %s should contain message decorating %s", messages, raftMessage ),
() -> server.received(), () -> server.received(),
contains( messageMatches( networkMessage ) ), timeoutSeconds, TimeUnit.SECONDS ); contains( messageMatches( networkMessage ) ), TIMEOUT_SECONDS, TimeUnit.SECONDS );
} }


private Server server; private Server server;
Expand All @@ -145,7 +146,7 @@ public void setUp()
server = new Server( serverPipelineBuilderFactory ); server = new Server( serverPipelineBuilderFactory );
server.start( applicationProtocolRepository, modifierProtocolRepository ); server.start( applicationProtocolRepository, modifierProtocolRepository );


Config config = Config.builder().withSetting( CausalClusteringSettings.handshake_timeout, timeoutSeconds + "s" ).build(); Config config = Config.builder().withSetting( CausalClusteringSettings.handshake_timeout, TIMEOUT_SECONDS + "s" ).build();


client = new Client( applicationProtocolRepository, modifierProtocolRepository, clientPipelineBuilderFactory, config ); client = new Client( applicationProtocolRepository, modifierProtocolRepository, clientPipelineBuilderFactory, config );


Expand Down Expand Up @@ -184,7 +185,7 @@ static class Server
{ {
private Channel channel; private Channel channel;
private NioEventLoopGroup eventLoopGroup; private NioEventLoopGroup eventLoopGroup;
private final List<Object> received = new ArrayList<>(); private final List<Object> received = new CopyOnWriteArrayList<>();
private NettyPipelineBuilderFactory pipelineBuilderFactory; private NettyPipelineBuilderFactory pipelineBuilderFactory;


ChannelInboundHandler nettyHandler = new SimpleChannelInboundHandler<Object>() ChannelInboundHandler nettyHandler = new SimpleChannelInboundHandler<Object>()
Expand Down Expand Up @@ -214,7 +215,7 @@ void start( final ApplicationProtocolRepository applicationProtocolRepository, f
ServerBootstrap bootstrap = new ServerBootstrap().group( eventLoopGroup ) ServerBootstrap bootstrap = new ServerBootstrap().group( eventLoopGroup )
.channel( NioServerSocketChannel.class ) .channel( NioServerSocketChannel.class )
.option( ChannelOption.SO_REUSEADDR, true ) .option( ChannelOption.SO_REUSEADDR, true )
.localAddress( 0 ) .localAddress( PortAuthority.allocatePort() )
.childHandler( new HandshakeServerInitializer( applicationProtocolRepository, modifierProtocolRepository, .childHandler( new HandshakeServerInitializer( applicationProtocolRepository, modifierProtocolRepository,
protocolInstallerRepository, pipelineBuilderFactory, log ).asChannelInitializer() ); protocolInstallerRepository, pipelineBuilderFactory, log ).asChannelInitializer() );


Expand All @@ -223,9 +224,8 @@ void start( final ApplicationProtocolRepository applicationProtocolRepository, f


void stop() void stop()
{ {
channel.close().awaitUninterruptibly(); channel.close().syncUninterruptibly();
channel = null; eventLoopGroup.shutdownGracefully( 0, TIMEOUT_SECONDS, SECONDS );
eventLoopGroup.shutdownGracefully( 0, 0, SECONDS );
} }


int port() int port()
Expand Down Expand Up @@ -263,16 +263,16 @@ static class Client
@SuppressWarnings( "SameParameterValue" ) @SuppressWarnings( "SameParameterValue" )
void connect( int port ) void connect( int port )
{ {
ChannelFuture channelFuture = bootstrap.connect( "localhost", port ).awaitUninterruptibly(); ChannelFuture channelFuture = bootstrap.connect( "localhost", port ).syncUninterruptibly();
channel = channelFuture.channel(); channel = channelFuture.channel();
} }


void disconnect() void disconnect()
{ {
if ( channel != null ) if ( channel != null )
{ {
channel.close().awaitUninterruptibly(); channel.close().syncUninterruptibly();
eventLoopGroup.shutdownGracefully( 0, 0, SECONDS ); eventLoopGroup.shutdownGracefully( 0, TIMEOUT_SECONDS, SECONDS ).syncUninterruptibly();
} }
} }


Expand Down

0 comments on commit 9c99b39

Please sign in to comment.