Skip to content

Commit

Permalink
Only add closed listener once
Browse files Browse the repository at this point in the history
Also ensure that the expected methods are closing the channels.
  • Loading branch information
RagnarW committed Apr 25, 2018
1 parent eb3ffe5 commit 2c5976e
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 11 deletions.
Expand Up @@ -143,8 +143,7 @@ void send( CatchUpRequest request ) throws ConnectException
throw new ConnectException( "Channel is not connected" );
}
nettyChannel.write( request.messageType() );
nettyChannel.closeFuture().addListener( (ChannelFutureListener) future -> handler.onClose() );
nettyChannel.writeAndFlush( request ).addListener( ChannelFutureListener.CLOSE_ON_FAILURE );
nettyChannel.writeAndFlush( request );
}

Optional<Long> millisSinceLastResponse()
Expand All @@ -163,6 +162,8 @@ public void connect() throws Exception
{
ChannelFuture channelFuture = bootstrap.connect( destination.socketAddress() );
nettyChannel = channelFuture.sync().channel();
nettyChannel.closeFuture().addListener( (ChannelFutureListener) future -> handler.onClose() );

}

@Override
Expand Down
Expand Up @@ -23,17 +23,18 @@
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.MessageToByteEncoder;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.nio.channels.ClosedChannelException;
import java.time.Clock;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;

import org.neo4j.causalclustering.catchup.storecopy.GetStoreIdRequest;
import org.neo4j.causalclustering.net.Server;
Expand All @@ -45,6 +46,7 @@
import org.neo4j.ports.allocation.PortAuthority;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

public class CatchUpClientIT
Expand Down Expand Up @@ -72,9 +74,10 @@ public void shouldCloseHandlerIfChannelIsClosedInClient() throws LifecycleExcept
String hostname = "localhost";
int port = PortAuthority.allocatePort();
ListenSocketAddress listenSocketAddress = new ListenSocketAddress( hostname, port );
AtomicBoolean wasClosedByClient = new AtomicBoolean( false );

Server emptyServer = catchupServer( listenSocketAddress );
CatchUpClient closingClient = closingChannelCatchupClient();
CatchUpClient closingClient = closingChannelCatchupClient( wasClosedByClient );

lifeSupport.add( emptyServer );
lifeSupport.add( closingClient );
Expand All @@ -85,6 +88,7 @@ public void shouldCloseHandlerIfChannelIsClosedInClient() throws LifecycleExcept

// then
assertClosedChannelException( hostname, port, closingClient );
assertTrue( wasClosedByClient.get() );
}

@Test
Expand All @@ -94,9 +98,10 @@ public void shouldCloseHandlerIfChannelIsClosedOnServer()
String hostname = "localhost";
int port = PortAuthority.allocatePort();
ListenSocketAddress listenSocketAddress = new ListenSocketAddress( hostname, port );
AtomicBoolean wasClosedByServer = new AtomicBoolean( false );

Server closingChannelServer = closingChannelCatchupServer( listenSocketAddress );
CatchUpClient emptyClient = catchupClient();
Server closingChannelServer = closingChannelCatchupServer( listenSocketAddress, wasClosedByServer );
CatchUpClient emptyClient = emptyClient();

lifeSupport.add( closingChannelServer );
lifeSupport.add( emptyClient );
Expand All @@ -107,6 +112,19 @@ public void shouldCloseHandlerIfChannelIsClosedOnServer()

// then
assertClosedChannelException( hostname, port, emptyClient );
assertTrue( wasClosedByServer.get() );
}

private CatchUpClient emptyClient()
{
return catchupClient( new MessageToByteEncoder<GetStoreIdRequest>()
{
@Override
protected void encode( ChannelHandlerContext channelHandlerContext, GetStoreIdRequest getStoreIdRequest, ByteBuf byteBuf )
{
byteBuf.writeByte( (byte) 1 );
}
} );
}

private void assertClosedChannelException( String hostname, int port, CatchUpClient closingClient )
Expand All @@ -130,25 +148,27 @@ private CatchUpResponseAdaptor<Object> neverCompletingAdaptor()
return new CatchUpResponseAdaptor<>();
}

private CatchUpClient closingChannelCatchupClient()
private CatchUpClient closingChannelCatchupClient( AtomicBoolean wasClosedByClient )
{
return catchupClient( new MessageToByteEncoder()
{
@Override
protected void encode( ChannelHandlerContext ctx, Object msg, ByteBuf out )
{
wasClosedByClient.set( true );
ctx.channel().close();
}
} );
}

private Server closingChannelCatchupServer( ListenSocketAddress listenSocketAddress )
private Server closingChannelCatchupServer( ListenSocketAddress listenSocketAddress, AtomicBoolean wasClosedByServer )
{
return catchupServer( listenSocketAddress, new SimpleChannelInboundHandler<NioSocketChannel>()
return catchupServer( listenSocketAddress, new ByteToMessageDecoder()
{
@Override
protected void channelRead0( ChannelHandlerContext ctx, NioSocketChannel msg )
protected void decode( ChannelHandlerContext ctx, ByteBuf byteBuf, List<Object> list )
{
wasClosedByServer.set( true );
ctx.channel().close();
}
} );
Expand Down

0 comments on commit 2c5976e

Please sign in to comment.