Skip to content

Commit

Permalink
Netty: Make sure channel closing never happens on i/o thread
Browse files Browse the repository at this point in the history
Similar to NettyTransport.doStop() all actions which disconnect
from a node (and thus call awaitUnterruptibly) should not be executed
on the I/O thread.

This patch ensures that all disconnects happen in the generic threadpool, trying to avoid unnecessary `disconnectFromNode` calls.

Also added a missing return statement in case the component was not yet
started when catching an exception on the netty layer.

Closes elastic#7726
  • Loading branch information
spinscale authored and javanna committed Sep 15, 2014
1 parent 407b120 commit ce2dcb5
Showing 1 changed file with 24 additions and 9 deletions.
Expand Up @@ -470,6 +470,7 @@ public BoundTransportAddress boundAddress() {
void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
if (!lifecycle.started()) {
// ignore
return;
}
if (isCloseConnectionException(e.getCause())) {
logger.trace("close connection exception caught on transport layer [{}], disconnecting from relevant node", e.getCause(), ctx.getChannel());
Expand Down Expand Up @@ -794,14 +795,20 @@ private boolean disconnectFromNode(DiscoveryNode node, Channel channel, String r
/**
* Disconnects from a node if a channel is found as part of that nodes channels.
*/
private void disconnectFromNodeChannel(Channel channel, Throwable failure) {
for (DiscoveryNode node : connectedNodes.keySet()) {
if (disconnectFromNode(node, channel, ExceptionsHelper.detailedMessage(failure))) {
// if we managed to find this channel and disconnect from it, then break, no need to check on
// the rest of the nodes
break;
private void disconnectFromNodeChannel(final Channel channel, final Throwable failure) {
threadPool().generic().execute(new Runnable() {

@Override
public void run() {
for (DiscoveryNode node : connectedNodes.keySet()) {
if (disconnectFromNode(node, channel, ExceptionsHelper.detailedMessage(failure))) {
// if we managed to find this channel and disconnect from it, then break, no need to check on
// the rest of the nodes
break;
}
}
}
}
});
}

private Channel nodeChannel(DiscoveryNode node, TransportRequestOptions options) throws ConnectTransportException {
Expand Down Expand Up @@ -885,8 +892,16 @@ private ChannelCloseListener(DiscoveryNode node) {
}

@Override
public void operationComplete(ChannelFuture future) throws Exception {
disconnectFromNode(node, future.getChannel(), "channel closed event");
public void operationComplete(final ChannelFuture future) throws Exception {
NodeChannels nodeChannels = connectedNodes.get(node);
if (nodeChannels != null && nodeChannels.hasChannel(future.getChannel())) {
threadPool().generic().execute(new Runnable() {
@Override
public void run() {
disconnectFromNode(node, future.getChannel(), "channel closed event");
}
});
}
}
}

Expand Down

0 comments on commit ce2dcb5

Please sign in to comment.