Skip to content

Commit

Permalink
Fix a bug in the nio transport where the sender stop wasn't executing…
Browse files Browse the repository at this point in the history
… in the sender thread correctly.
  • Loading branch information
Jim Carroll committed May 12, 2017
1 parent 458a027 commit f00d17f
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 12 deletions.
Expand Up @@ -67,7 +67,6 @@ public void send(final Object message) throws MessageTransportException {
@Override
public synchronized void stop() {
running = false;
owner.idleSenders.remove(this);
dontInterrupt(() -> Thread.sleep(1));

final List<Object> drainTo = new ArrayList<>();
Expand All @@ -88,7 +87,7 @@ public synchronized void stop() {
else if ((System.currentTimeMillis() - startTime) > 500)
break;
else
Thread.yield();
dontInterrupt(() -> Thread.sleep(1));
}

// if X seconds have passed let's just close it from this side and move on.
Expand All @@ -101,6 +100,7 @@ else if ((System.currentTimeMillis() - startTime) > 500)
}

drainTo.forEach(o -> statsCollector.messageNotSent());
owner.idleSenders.remove(this);
owner.imDone(addr);
}

Expand Down
Expand Up @@ -11,6 +11,7 @@
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
Expand All @@ -26,7 +27,6 @@
import net.dempsy.transport.NodeAddress;
import net.dempsy.transport.SenderFactory;
import net.dempsy.transport.tcp.TcpAddress;
import net.dempsy.util.StupidHashMap;

public class NioSenderFactory implements SenderFactory {
public final static Logger LOGGER = LoggerFactory.getLogger(NioSenderFactory.class);
Expand All @@ -48,7 +48,7 @@ public class NioSenderFactory implements SenderFactory {

private final ConcurrentHashMap<TcpAddress, NioSender> senders = new ConcurrentHashMap<>();

final StupidHashMap<NioSender, NioSender> idleSenders = new StupidHashMap<>();
final ConcurrentHashMap<NioSender, NioSender> idleSenders = new ConcurrentHashMap<>();

// =======================================
// Read from NioSender
Expand Down Expand Up @@ -161,10 +161,10 @@ public static class Sending implements Runnable {
final AtomicBoolean isRunning;
final Selector selector;
final String nodeId;
final StupidHashMap<NioSender, NioSender> idleSenders;
final Map<NioSender, NioSender> idleSenders;
final NodeStatsCollector statsCollector;

Sending(final AtomicBoolean isRunning, final String nodeId, final StupidHashMap<NioSender, NioSender> idleSenders,
Sending(final AtomicBoolean isRunning, final String nodeId, final Map<NioSender, NioSender> idleSenders,
final NodeStatsCollector statsCollector) throws MessageTransportException {
this.isRunning = isRunning;
this.nodeId = nodeId;
Expand All @@ -190,6 +190,8 @@ public void run() {
// =====================================================================
// nothing ready ... might as well spend some time serializing messages
final Set<SelectionKey> keys = selector.keys();
// keys are removed when there's nothing to write to them. When there's no writing to do
// but there's data queued up to be written we can move to start serializing
if (keys != null && keys.size() > 0) {
numNothing = 0; // reset the yield count since we have something to do
final SenderHolder thisOneCanSerialize = keys.stream()
Expand All @@ -200,10 +202,19 @@ public void run() {
.orElse(null);
if (thisOneCanSerialize != null)
thisOneCanSerialize.trySerialize();
else { // see if we need to stop
final SelectionKey thisOneCanClose = keys.stream()
.filter(k -> ((SenderHolder) k.attachment()).shouldClose())
.findAny().orElse(null);
if (thisOneCanClose != null)
((SenderHolder) thisOneCanClose.attachment()).close(thisOneCanClose);
}
}
// =====================================================================
// otherwise there's no data to be written and (last we knew) no data
// to be serialized which results in (eventually) all keys being removed.
else { // nothing to serialize, do we have any new senders that need handling?
if (!checkForNewSenders()) { // if we didn't do anything then sleep/yield based on how long we've been bord.
if (!checkForNewSenders()) { // if we didn't do anything then sleep/yield based on how long we've been bored.
numNothing++;
if (numNothing > 1000) {
dontInterrupt(() -> Thread.sleep(1), ie -> {
Expand All @@ -230,9 +241,18 @@ public void run() {

if (key.isWritable()) {
final SenderHolder sh = (SenderHolder) key.attachment();
// write something and return whether or not we're done.
if (sh.writeSomethingReturnDone(key, statsCollector)) {
idleSenders.putIfAbsent(sh.sender, sh.sender);
key.cancel();
// if we're done, does that mean that we should be closing the connection?
if (sh.shouldClose()) {
if (!sh.close(key)) { // this should close the socket. If that works then this will also cancel the key.
idleSenders.putIfAbsent(sh.sender, sh.sender); // otherwise, drop it back on idleSenders so we can try the cancel again later.
key.cancel();
}
} else {
idleSenders.putIfAbsent(sh.sender, sh.sender);
key.cancel();
}
}
}
}
Expand Down
Expand Up @@ -77,6 +77,15 @@ private static ByteBuffer[] removeFirst(final ByteBuffer[] src, final boolean an
return tmp.toArray(new ByteBuffer[tmp.size()]);
}

public boolean close(final SelectionKey key) {
final SocketChannel channel = (SocketChannel) key.channel();
if (closeQuietly(channel, LOGGER, sender.nodeId + " failed to close previous channel to " + sender.addr)) {
// if we closed it then unregister and move on.
key.cancel();
return true;
} else return false;
}

public boolean writeSomethingReturnDone(final SelectionKey key, final NodeStatsCollector statsCollector) throws IOException {
prepareToWriteBestEffort();

Expand Down Expand Up @@ -145,8 +154,13 @@ public boolean writeSomethingReturnDone(final SelectionKey key, final NodeStatsC
statsCollector.messageSent(null);

return !(readyToWrite(false) || readyToSerialize());
} else
return sender.messages.peek() == null;
} else {
final Object peek = sender.messages.peek();
if (peek != null)
return (peek instanceof StopMessage); // we're "done" if the next message is a StopMessage.
else
return true; // we're done if there's no message left.
}

}

Expand Down
Expand Up @@ -22,12 +22,14 @@ public static ReturnableBufferOutput get() {
return ret;
}

public static void closeQuietly(final AutoCloseable ac, final Logger LOGGER, final String failedMessage) {
public static boolean closeQuietly(final AutoCloseable ac, final Logger LOGGER, final String failedMessage) {
try {
ac.close();
return true;
} catch (final Exception e) {
LOGGER.warn(failedMessage, e);
}
return false;
}

public static void dontInterrupt(final RunnableThrows<InterruptedException> runner) {
Expand Down

0 comments on commit f00d17f

Please sign in to comment.