Skip to content

Commit

Permalink
[REM3-338] Improve RemoteConnection handling of keep alive, by creati…
Browse files Browse the repository at this point in the history
…ng the heart beat task less often

At RemoteConnection.send, replace heatKey removal by an update to expire time, and reschedule heart beat every time it runs
  • Loading branch information
fl4via committed Jul 23, 2019
1 parent b606c7c commit c8a4b7d
Showing 1 changed file with 31 additions and 5 deletions.
36 changes: 31 additions & 5 deletions src/main/java/org/jboss/remoting3/remote/RemoteConnection.java
Expand Up @@ -243,10 +243,11 @@ Connection getConnection() {
final class RemoteWriteListener implements ChannelListener<ConduitStreamSinkChannel> {

private final Queue<Pooled<ByteBuffer>> queue = new ArrayDeque<Pooled<ByteBuffer>>();
private XnioExecutor.Key heartKey;
private volatile XnioExecutor.Key heartKey;
private boolean closed;
private ByteBuffer headerBuffer = ByteBuffer.allocateDirect(4);
private final ByteBuffer[] cachedArray = new ByteBuffer[] { headerBuffer, null };
private volatile long expireTime = -1;

RemoteWriteListener() {
}
Expand Down Expand Up @@ -312,7 +313,12 @@ public void handleEvent(final ConduitStreamSinkChannel channel) {
return;
} else {
if (heartbeatInterval != 0) {
this.heartKey = channel.getWriteThread().executeAfter(heartbeatCommand, heartbeatInterval, TimeUnit.MILLISECONDS);
this.expireTime = System.currentTimeMillis() + heartbeatInterval;
if (this.heartKey == null) {
final XnioExecutor executor = channel.getWriteThread();
final Runnable heartBeat = new HeartBeat(executor);
this.heartKey = executor.executeAfter(heartBeat, heartbeatInterval, TimeUnit.MILLISECONDS);
}
}
}
channel.suspendWrites();
Expand Down Expand Up @@ -356,8 +362,7 @@ public void shutdownWrites() {
public void send(final Pooled<ByteBuffer> pooled, final boolean close) {
connection.getIoThread().execute(() -> {
synchronized (queue) {
XnioExecutor.Key heartKey1 = RemoteWriteListener.this.heartKey;
if (heartKey1 != null) heartKey1.remove();
this.expireTime = System.currentTimeMillis() + heartbeatInterval;
if (closed) { pooled.free(); return; }
if (close) { closed = true; }
boolean free = true;
Expand Down Expand Up @@ -390,9 +395,30 @@ public void send(final Pooled<ByteBuffer> pooled, final boolean close) {
}
});
}

private class HeartBeat implements Runnable {

private final XnioExecutor executor;

public HeartBeat(XnioExecutor executor) {
this.executor = executor;
}

public void run() {
long currentTime = System.currentTimeMillis();
if (currentTime >= expireTime) {
sendAlive();
heartKey = executor.executeAfter(this, heartbeatInterval, TimeUnit.MILLISECONDS);
} else {
final long nextBeatInterval = expireTime - System.currentTimeMillis();
// prevent negative intervals by scheduling to run immediately if nextBeatInterval happens to be negative
heartKey = executor.executeAfter(this, nextBeatInterval < 0? 0: nextBeatInterval, TimeUnit.MILLISECONDS);
}

}
}
}

private final Runnable heartbeatCommand = this::sendAlive;

public String toString() {
return String.format("Remoting connection %08x to %s of %s", Integer.valueOf(hashCode()), connection.getPeerAddress(), getRemoteConnectionProvider().getConnectionProviderContext().getEndpoint());
Expand Down

0 comments on commit c8a4b7d

Please sign in to comment.