diff --git a/src/main/java/org/jboss/remoting3/remote/RemoteConnection.java b/src/main/java/org/jboss/remoting3/remote/RemoteConnection.java index 7b809215d..dc3289d7c 100644 --- a/src/main/java/org/jboss/remoting3/remote/RemoteConnection.java +++ b/src/main/java/org/jboss/remoting3/remote/RemoteConnection.java @@ -245,6 +245,7 @@ final class RemoteWriteListener implements ChannelListener> queue = new ArrayDeque>(); private volatile XnioExecutor.Key heartKey; private boolean closed; + private boolean flushing; private ByteBuffer headerBuffer = ByteBuffer.allocateDirect(4); private final ByteBuffer[] cachedArray = new ByteBuffer[] { headerBuffer, null }; private volatile long expireTime = -1; @@ -337,12 +338,21 @@ public void shutdownWrites() { synchronized (queue) { closed = true; terminateHeartbeat(); + final ConduitStreamSinkChannel sinkChannel = connection.getSinkChannel(); + if (! queue.isEmpty()) { + sinkChannel.resumeWrites(); + return; + } + if (!flushing) { + doShutdownWrites(); + } + } + } + + private void doShutdownWrites() { + synchronized (queue) { final ConduitStreamSinkChannel sinkChannel = connection.getSinkChannel(); try { - if (! queue.isEmpty()) { - sinkChannel.resumeWrites(); - return; - } sinkChannel.shutdownWrites(); if (! sinkChannel.flush()) { sinkChannel.resumeWrites(); @@ -386,6 +396,11 @@ public void send(final Pooled pooled, final boolean close) { //that are to be send they can all be batched into a single write, while also //preventing a resumeWrites unless it is actually required if (identity != null) { + synchronized (queue) { + flushing = true; + } + //do not check for already flushing... we could end upt with a race where we + // havent finished the task but have already written previous data connection.getIoThread().execute(flushTask); } else // if identity is null, we are opening connection @@ -437,6 +452,12 @@ public void run() { if(!queue.isEmpty()) { connection.getSinkChannel().resumeWrites(); } + synchronized (queue) { + if (closed) { + doShutdownWrites(); + } + flushing = false; + } } }; }