Skip to content

Commit

Permalink
Simplifying AbstractByteBufferCommandTransport to write a single `B…
Browse files Browse the repository at this point in the history
…yteBuffer`
  • Loading branch information
jglick committed Jan 20, 2023
1 parent 2d296a8 commit f2ead41
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 27 deletions.
Expand Up @@ -70,18 +70,18 @@ public abstract class AbstractByteBufferCommandTransport extends CommandTranspor
* Our channel.
*/
private Channel channel;
/**
* The chunk header buffer.
*/
private final ByteBuffer writeChunkHeader = ByteBuffer.allocate(2);
@Deprecated
private final ByteBuffer writeChunkHeader;
/**
* The transport frame size.
*/
private int transportFrameSize = 8192;
@Deprecated
private ByteBuffer writeChunkBody;
/**
* The chunk body.
* The chunk header & body buffer.
*/
private ByteBuffer writeChunkBody = ByteBuffer.allocate(transportFrameSize);
private ByteBuffer writeChunkCombined;
/**
* The delegate, this is required as we cannot access some of the methods of {@link ChunkHeader} outside of the
* remoting module.
Expand Down Expand Up @@ -120,14 +120,45 @@ public abstract class AbstractByteBufferCommandTransport extends CommandTranspor
*/
private final ByteBufferQueue sendStaging = new ByteBufferQueue(transportFrameSize);

/**
* @deprecated Pass {@code true} to {@link #AbstractByteBufferCommandTransport(boolean)} and switch {@link #write(ByteBuffer, ByteBuffer)} to {@link #write(ByteBuffer)}.
*/
@Deprecated
protected AbstractByteBufferCommandTransport() {
this(false);
}

protected AbstractByteBufferCommandTransport(boolean combineBuffers) {
if (combineBuffers) {
writeChunkHeader = null;
writeChunkBody = null;
writeChunkCombined = ByteBuffer.allocate(transportFrameSize + 2);
} else { // deprecated
writeChunkHeader = ByteBuffer.allocate(2);
writeChunkBody = ByteBuffer.allocate(transportFrameSize);
writeChunkCombined = null;
}
}

/**
* @deprecated pass true to {@link #AbstractByteBufferCommandTransport(boolean)} and implement {@link #write(ByteBuffer)}
*/
@Deprecated
protected void write(ByteBuffer header, ByteBuffer data) throws IOException {
throw new AbstractMethodError("implement write(ByteBuffer, ByteBuffer) if !combineBuffers");
}

/**
* Write the packet.
*
* @param headerAndData the header and data to write.
* @param header the header to write.
* @param data the data to write.
* @throws IOException if the data could not be written.
*/
protected abstract void write(ByteBuffer header, ByteBuffer data) throws IOException;
protected void write(ByteBuffer headerAndData) throws IOException {
throw new AbstractMethodError("implement write(ByteBuffer) if combineBuffers");
}

/**
* Handle receiving some data.
Expand Down Expand Up @@ -247,7 +278,11 @@ public void setFrameSize(int transportFrameSize) {
}
this.transportFrameSize = transportFrameSize;
// this is the only one that matters when it comes to sizing as we have to accept any frame size on receive
writeChunkBody = ByteBuffer.allocate(transportFrameSize);
if (writeChunkHeader == null) {
writeChunkCombined = ByteBuffer.allocate(transportFrameSize + 2);
} else {
writeChunkBody = ByteBuffer.allocate(transportFrameSize);
}
}

/**
Expand Down Expand Up @@ -293,14 +328,23 @@ public final void write(Command cmd, boolean last) throws IOException {
int frame = remaining > transportFrameSize
? transportFrameSize
: (int) remaining; // # of bytes we send in this chunk
((Buffer) writeChunkHeader).clear();
ChunkHeader.write(writeChunkHeader, frame, remaining > transportFrameSize);
((Buffer) writeChunkHeader).flip();
((Buffer) writeChunkBody).clear();
((Buffer) writeChunkBody).limit(frame);
sendStaging.get(writeChunkBody);
((Buffer) writeChunkBody).flip();
write(writeChunkHeader, writeChunkBody);
if (writeChunkHeader == null) {
((Buffer) writeChunkCombined).clear();
ChunkHeader.write(writeChunkCombined, frame, remaining > transportFrameSize);
((Buffer) writeChunkCombined).limit(frame + 2);
sendStaging.get(writeChunkCombined);
((Buffer) writeChunkCombined).flip();
write(writeChunkCombined);
} else {
((Buffer) writeChunkHeader).clear();
ChunkHeader.write(writeChunkHeader, frame, remaining > transportFrameSize);
((Buffer) writeChunkHeader).flip();
((Buffer) writeChunkBody).clear();
((Buffer) writeChunkBody).limit(frame);
sendStaging.get(writeChunkBody);
((Buffer) writeChunkBody).flip();
write(writeChunkHeader, writeChunkBody);
}
remaining -= frame;
}
}
Expand Down
11 changes: 3 additions & 8 deletions src/main/java/hudson/remoting/Engine.java
Expand Up @@ -644,17 +644,12 @@ public void onError(Session session, Throwable x) {
class Transport extends AbstractByteBufferCommandTransport {
final Session session;
Transport(Session session) {
super(true);
this.session = session;
}
@Override
protected void write(ByteBuffer header, ByteBuffer data) throws IOException {
LOGGER.finest(() -> "sending message of length " + ChunkHeader.length(ChunkHeader.peek(header)));
// RemoteEndpoint.Async seems to lack isLast overloads
// adapted from https://stackoverflow.com/a/42170003/12916
ByteBuffer headerAndData = ByteBuffer.allocate(header.remaining() + data.remaining());
headerAndData.put(header.duplicate());
headerAndData.put(data.duplicate());
headerAndData.rewind();
protected void write(ByteBuffer headerAndData) throws IOException {
LOGGER.finest(() -> "sending message of length " + (headerAndData.remaining() - 2));
try {
session.getAsyncRemote().sendBinary(headerAndData).get(5, TimeUnit.MINUTES);
} catch (Exception x) {
Expand Down
Expand Up @@ -313,19 +313,19 @@ private class ByteBufferCommandTransport extends AbstractByteBufferCommandTransp
* @param remoteCapability the remote capability
*/
public ByteBufferCommandTransport(Capability remoteCapability) {
super(true);
this.remoteCapability = remoteCapability;
}

/**
* {@inheritDoc}
*/
@Override
protected void write(ByteBuffer header, ByteBuffer data) throws IOException {
protected void write(ByteBuffer headerAndData) throws IOException {
//TODO: Any way to get channel information here
if (isWriteOpen()) {
try {
ChannelApplicationLayer.this.write(header);
ChannelApplicationLayer.this.write(data);
ChannelApplicationLayer.this.write(headerAndData);
} catch (ClosedChannelException e) {
// Probably it should be another exception type at all
throw new ChannelClosedException(null, "Protocol stack cannot write data anymore. ChannelApplicationLayer reports that the NIO Channel is closed", e);
Expand Down

0 comments on commit f2ead41

Please sign in to comment.