Skip to content

Commit

Permalink
[REM3-343] At RemoteConnection.RemoteWriteListener, only shutdown wri…
Browse files Browse the repository at this point in the history
…tes if flush task is not scheduled
  • Loading branch information
fl4via committed Aug 12, 2019
1 parent 40a5b42 commit 01c58f8
Showing 1 changed file with 25 additions and 4 deletions.
29 changes: 25 additions & 4 deletions src/main/java/org/jboss/remoting3/remote/RemoteConnection.java
Expand Up @@ -245,6 +245,7 @@ final class RemoteWriteListener implements ChannelListener<ConduitStreamSinkChan
private final Queue<Pooled<ByteBuffer>> queue = new ArrayDeque<Pooled<ByteBuffer>>();
private volatile XnioExecutor.Key heartKey;
private boolean closed;
private boolean flushing;
private ByteBuffer headerBuffer = ByteBuffer.allocateDirect(4);
private final ByteBuffer[] cachedArray = new ByteBuffer[] { headerBuffer, null };
private volatile long expireTime = -1;
Expand Down Expand Up @@ -337,12 +338,21 @@ public void shutdownWrites() {
synchronized (queue) {
closed = true;
terminateHeartbeat();
final ConduitStreamSinkChannel sinkChannel = connection.getSinkChannel();
if (! queue.isEmpty()) {
sinkChannel.resumeWrites();
return;
}
if (!flushing) {
doShutdownWrites();
}
}
}

private void doShutdownWrites() {
synchronized (queue) {
final ConduitStreamSinkChannel sinkChannel = connection.getSinkChannel();
try {
if (! queue.isEmpty()) {
sinkChannel.resumeWrites();
return;
}
sinkChannel.shutdownWrites();
if (! sinkChannel.flush()) {
sinkChannel.resumeWrites();
Expand Down Expand Up @@ -386,6 +396,11 @@ public void send(final Pooled<ByteBuffer> pooled, final boolean close) {
//that are to be send they can all be batched into a single write, while also
//preventing a resumeWrites unless it is actually required
if (identity != null) {
synchronized (queue) {
flushing = true;
}
//do not check for already flushing... we could end upt with a race where we
// havent finished the task but have already written previous data
connection.getIoThread().execute(flushTask);
} else
// if identity is null, we are opening connection
Expand Down Expand Up @@ -437,6 +452,12 @@ public void run() {
if(!queue.isEmpty()) {
connection.getSinkChannel().resumeWrites();
}
synchronized (queue) {
if (closed) {
doShutdownWrites();
}
flushing = false;
}
}
};
}
Expand Down

0 comments on commit 01c58f8

Please sign in to comment.