diff --git a/src/main/java/hudson/remoting/AbstractByteBufferCommandTransport.java b/src/main/java/hudson/remoting/AbstractByteBufferCommandTransport.java index c72351009..9e2bfd212 100644 --- a/src/main/java/hudson/remoting/AbstractByteBufferCommandTransport.java +++ b/src/main/java/hudson/remoting/AbstractByteBufferCommandTransport.java @@ -70,18 +70,18 @@ public abstract class AbstractByteBufferCommandTransport extends CommandTranspor * Our channel. */ private Channel channel; - /** - * The chunk header buffer. - */ - private final ByteBuffer writeChunkHeader = ByteBuffer.allocate(2); + @Deprecated + private final ByteBuffer writeChunkHeader; /** * The transport frame size. */ private int transportFrameSize = 8192; + @Deprecated + private ByteBuffer writeChunkBody; /** - * The chunk body. + * The chunk header & body buffer. */ - private ByteBuffer writeChunkBody = ByteBuffer.allocate(transportFrameSize); + private ByteBuffer writeChunkCombined; /** * The delegate, this is required as we cannot access some of the methods of {@link ChunkHeader} outside of the * remoting module. @@ -120,14 +120,45 @@ public abstract class AbstractByteBufferCommandTransport extends CommandTranspor */ private final ByteBufferQueue sendStaging = new ByteBufferQueue(transportFrameSize); + /** + * @deprecated Pass {@code true} to {@link #AbstractByteBufferCommandTransport(boolean)} and switch {@link #write(ByteBuffer, ByteBuffer)} to {@link #write(ByteBuffer)}. + */ + @Deprecated + protected AbstractByteBufferCommandTransport() { + this(false); + } + + protected AbstractByteBufferCommandTransport(boolean combineBuffers) { + if (combineBuffers) { + writeChunkHeader = null; + writeChunkBody = null; + writeChunkCombined = ByteBuffer.allocate(transportFrameSize + 2); + } else { // deprecated + writeChunkHeader = ByteBuffer.allocate(2); + writeChunkBody = ByteBuffer.allocate(transportFrameSize); + writeChunkCombined = null; + } + } + + /** + * @deprecated pass true to {@link #AbstractByteBufferCommandTransport(boolean)} and implement {@link #write(ByteBuffer)} + */ + @Deprecated + protected void write(ByteBuffer header, ByteBuffer data) throws IOException { + throw new AbstractMethodError("implement write(ByteBuffer, ByteBuffer) if !combineBuffers"); + } + /** * Write the packet. * + * @param headerAndData the header and data to write. * @param header the header to write. * @param data the data to write. * @throws IOException if the data could not be written. */ - protected abstract void write(ByteBuffer header, ByteBuffer data) throws IOException; + protected void write(ByteBuffer headerAndData) throws IOException { + throw new AbstractMethodError("implement write(ByteBuffer) if combineBuffers"); + } /** * Handle receiving some data. @@ -247,7 +278,11 @@ public void setFrameSize(int transportFrameSize) { } this.transportFrameSize = transportFrameSize; // this is the only one that matters when it comes to sizing as we have to accept any frame size on receive - writeChunkBody = ByteBuffer.allocate(transportFrameSize); + if (writeChunkHeader == null) { + writeChunkCombined = ByteBuffer.allocate(transportFrameSize + 2); + } else { + writeChunkBody = ByteBuffer.allocate(transportFrameSize); + } } /** @@ -293,14 +328,23 @@ public final void write(Command cmd, boolean last) throws IOException { int frame = remaining > transportFrameSize ? transportFrameSize : (int) remaining; // # of bytes we send in this chunk - ((Buffer) writeChunkHeader).clear(); - ChunkHeader.write(writeChunkHeader, frame, remaining > transportFrameSize); - ((Buffer) writeChunkHeader).flip(); - ((Buffer) writeChunkBody).clear(); - ((Buffer) writeChunkBody).limit(frame); - sendStaging.get(writeChunkBody); - ((Buffer) writeChunkBody).flip(); - write(writeChunkHeader, writeChunkBody); + if (writeChunkHeader == null) { + ((Buffer) writeChunkCombined).clear(); + ChunkHeader.write(writeChunkCombined, frame, remaining > transportFrameSize); + ((Buffer) writeChunkCombined).limit(frame + 2); + sendStaging.get(writeChunkCombined); + ((Buffer) writeChunkCombined).flip(); + write(writeChunkCombined); + } else { + ((Buffer) writeChunkHeader).clear(); + ChunkHeader.write(writeChunkHeader, frame, remaining > transportFrameSize); + ((Buffer) writeChunkHeader).flip(); + ((Buffer) writeChunkBody).clear(); + ((Buffer) writeChunkBody).limit(frame); + sendStaging.get(writeChunkBody); + ((Buffer) writeChunkBody).flip(); + write(writeChunkHeader, writeChunkBody); + } remaining -= frame; } } diff --git a/src/main/java/hudson/remoting/Engine.java b/src/main/java/hudson/remoting/Engine.java index 13866ab1e..194bf145a 100644 --- a/src/main/java/hudson/remoting/Engine.java +++ b/src/main/java/hudson/remoting/Engine.java @@ -644,17 +644,12 @@ public void onError(Session session, Throwable x) { class Transport extends AbstractByteBufferCommandTransport { final Session session; Transport(Session session) { + super(true); this.session = session; } @Override - protected void write(ByteBuffer header, ByteBuffer data) throws IOException { - LOGGER.finest(() -> "sending message of length " + ChunkHeader.length(ChunkHeader.peek(header))); - // RemoteEndpoint.Async seems to lack isLast overloads - // adapted from https://stackoverflow.com/a/42170003/12916 - ByteBuffer headerAndData = ByteBuffer.allocate(header.remaining() + data.remaining()); - headerAndData.put(header.duplicate()); - headerAndData.put(data.duplicate()); - headerAndData.rewind(); + protected void write(ByteBuffer headerAndData) throws IOException { + LOGGER.finest(() -> "sending message of length " + (headerAndData.remaining() - 2)); try { session.getAsyncRemote().sendBinary(headerAndData).get(5, TimeUnit.MINUTES); } catch (Exception x) { diff --git a/src/main/java/org/jenkinsci/remoting/protocol/impl/ChannelApplicationLayer.java b/src/main/java/org/jenkinsci/remoting/protocol/impl/ChannelApplicationLayer.java index d467c6d07..1b756f491 100644 --- a/src/main/java/org/jenkinsci/remoting/protocol/impl/ChannelApplicationLayer.java +++ b/src/main/java/org/jenkinsci/remoting/protocol/impl/ChannelApplicationLayer.java @@ -313,6 +313,7 @@ private class ByteBufferCommandTransport extends AbstractByteBufferCommandTransp * @param remoteCapability the remote capability */ public ByteBufferCommandTransport(Capability remoteCapability) { + super(true); this.remoteCapability = remoteCapability; } @@ -320,12 +321,11 @@ public ByteBufferCommandTransport(Capability remoteCapability) { * {@inheritDoc} */ @Override - protected void write(ByteBuffer header, ByteBuffer data) throws IOException { + protected void write(ByteBuffer headerAndData) throws IOException { //TODO: Any way to get channel information here if (isWriteOpen()) { try { - ChannelApplicationLayer.this.write(header); - ChannelApplicationLayer.this.write(data); + ChannelApplicationLayer.this.write(headerAndData); } catch (ClosedChannelException e) { // Probably it should be another exception type at all throw new ChannelClosedException(null, "Protocol stack cannot write data anymore. ChannelApplicationLayer reports that the NIO Channel is closed", e);