Permalink
Show file tree
Hide file tree
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Browse files
[JENKINS-36871] Stripped back higher performance ByteBufferQueue Inpu…
…tStream implementation - Drops support for mark and unbounded read in return for faster performance
- Loading branch information
Showing
2 changed files
with
294 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@@ -0,0 +1,134 @@ | ||
/* | ||
* The MIT License | ||
* | ||
* Copyright (c) 2016, CloudBees, Inc. | ||
* | ||
* Permission is hereby granted, free of charge, to any person obtaining a copy | ||
* of this software and associated documentation files (the "Software"), to deal | ||
* in the Software without restriction, including without limitation the rights | ||
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | ||
* copies of the Software, and to permit persons to whom the Software is | ||
* furnished to do so, subject to the following conditions: | ||
* | ||
* The above copyright notice and this permission notice shall be included in | ||
* all copies or substantial portions of the Software. | ||
* | ||
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | ||
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | ||
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | ||
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | ||
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | ||
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN | ||
* THE SOFTWARE. | ||
*/ | ||
package org.jenkinsci.remoting.util; | ||
|
||
import java.io.IOException; | ||
import java.io.InputStream; | ||
import java.nio.BufferUnderflowException; | ||
import java.nio.ByteBuffer; | ||
|
||
/** | ||
* An {@link InputStream} backed by a set number of bytes from the head of a {@link ByteBufferQueue}. | ||
* Assumes that the backing {@link ByteBufferQueue} will not be read by another thread during calls to {@link #read()} | ||
* so all methods are non-blocking. Does not support {@link #mark(int)}. | ||
* | ||
* @since FIXME | ||
*/ | ||
public class FastByteBufferQueueInputStream extends InputStream { | ||
|
||
/** | ||
* The backing queue. | ||
*/ | ||
private final ByteBufferQueue queue; | ||
/** | ||
* How much to read off the queue. | ||
*/ | ||
private final int length; | ||
/** | ||
* How far we have read. | ||
*/ | ||
private int pos; | ||
|
||
/** | ||
* Constructs a new instance. | ||
* | ||
* @param queue the backing {@link ByteBufferQueue}. | ||
* @param length the limit of bytes to take from the backing queue. | ||
*/ | ||
public FastByteBufferQueueInputStream(ByteBufferQueue queue, int length) { | ||
this.queue = queue; | ||
this.length = length; | ||
} | ||
|
||
/** | ||
* {@inheritDoc} | ||
*/ | ||
@Override | ||
public int read() throws IOException { | ||
return pos++ >= length ? -1 : (queue.get() & 0xff); | ||
} | ||
|
||
/** | ||
* {@inheritDoc} | ||
*/ | ||
@Override | ||
public int read(byte[] b, int off, int len) throws IOException { | ||
int rem = length - pos; | ||
if (rem <= 0) { | ||
return -1; | ||
} | ||
int read = queue.get(b, off, len > rem ? rem : len); | ||
if (read <= 0) { | ||
return -1; | ||
} | ||
pos += read; | ||
return read; | ||
} | ||
|
||
/** | ||
* {@inheritDoc} | ||
*/ | ||
@Override | ||
public long skip(long n) throws IOException { | ||
if (pos >= length) { | ||
return -1; | ||
} | ||
if (pos + n >= length) { | ||
n = length - pos; | ||
} | ||
long skipped = queue.skip(n); | ||
pos += skipped; | ||
return skipped; | ||
} | ||
|
||
/** | ||
* {@inheritDoc} | ||
*/ | ||
@Override | ||
public int available() throws IOException { | ||
return pos >= length ? -1 : length - pos; | ||
} | ||
|
||
/** | ||
* {@inheritDoc} | ||
*/ | ||
@Override | ||
public synchronized void mark(int readlimit) { | ||
} | ||
|
||
/** | ||
* {@inheritDoc} | ||
*/ | ||
@Override | ||
public synchronized void reset() throws IOException { | ||
} | ||
|
||
/** | ||
* {@inheritDoc} | ||
*/ | ||
@Override | ||
public boolean markSupported() { | ||
return false; | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@@ -0,0 +1,160 @@ | ||
/* | ||
* The MIT License | ||
* | ||
* Copyright (c) 2016, CloudBees, Inc. | ||
* | ||
* Permission is hereby granted, free of charge, to any person obtaining a copy | ||
* of this software and associated documentation files (the "Software"), to deal | ||
* in the Software without restriction, including without limitation the rights | ||
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | ||
* copies of the Software, and to permit persons to whom the Software is | ||
* furnished to do so, subject to the following conditions: | ||
* | ||
* The above copyright notice and this permission notice shall be included in | ||
* all copies or substantial portions of the Software. | ||
* | ||
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | ||
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | ||
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | ||
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | ||
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | ||
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN | ||
* THE SOFTWARE. | ||
*/ | ||
package org.jenkinsci.remoting.util; | ||
|
||
import java.io.ByteArrayOutputStream; | ||
import java.io.IOException; | ||
import java.io.InputStream; | ||
import java.nio.ByteBuffer; | ||
import org.junit.Test; | ||
|
||
import static org.hamcrest.Matchers.is; | ||
import static org.junit.Assert.assertThat; | ||
import static org.junit.Assume.assumeThat; | ||
|
||
public class FastByteBufferQueueInputStreamTest { | ||
@Test | ||
public void readAll() throws Exception { | ||
String str = "AbCdEfGhIjKlMnOpQrStUvWxYz"; | ||
|
||
ByteBufferQueue queue = new ByteBufferQueue(10); | ||
queue.put(ByteBuffer.wrap(str.getBytes(Charsets.UTF_8))); | ||
FastByteBufferQueueInputStream instance = new FastByteBufferQueueInputStream(queue,26); | ||
|
||
assertThat(read(instance), is(str)); | ||
} | ||
|
||
@Test | ||
public void readLimit() throws Exception { | ||
String str = "AbCdEfGhIjKlMnOpQrStUvWxYz"; | ||
|
||
ByteBufferQueue queue = new ByteBufferQueue(10); | ||
queue.put(ByteBuffer.wrap(str.getBytes(Charsets.UTF_8))); | ||
FastByteBufferQueueInputStream instance = new FastByteBufferQueueInputStream(queue, 10); | ||
|
||
assertThat(read(instance, 10), is("AbCdEfGhIj")); | ||
} | ||
|
||
@Test | ||
public void readSome() throws Exception { | ||
String str = "AbCdEfGhIjKlMnOpQrStUvWxYz"; | ||
|
||
ByteBufferQueue queue = new ByteBufferQueue(10); | ||
queue.put(ByteBuffer.wrap(str.getBytes(Charsets.UTF_8))); | ||
FastByteBufferQueueInputStream instance = new FastByteBufferQueueInputStream(queue,26); | ||
|
||
assertThat(read(instance, 10), is("AbCdEfGhIj")); | ||
} | ||
|
||
@Test | ||
public void readBytes() throws Exception { | ||
String str = "AbCdEfGhIjKlMnOpQrStUvWxYz"; | ||
|
||
ByteBufferQueue queue = new ByteBufferQueue(10); | ||
queue.put(ByteBuffer.wrap(str.getBytes(Charsets.UTF_8))); | ||
FastByteBufferQueueInputStream instance = new FastByteBufferQueueInputStream(queue,26); | ||
|
||
byte[] bytes = new byte[10]; | ||
assertThat(instance.read(bytes), is(10)); | ||
assertThat(new String(bytes, Charsets.UTF_8), is("AbCdEfGhIj")); | ||
assertThat(instance.read(bytes), is(10)); | ||
assertThat(new String(bytes, Charsets.UTF_8), is("KlMnOpQrSt")); | ||
assertThat(instance.read(bytes), is(6)); | ||
assertThat(new String(bytes, Charsets.UTF_8), is("UvWxYzQrSt")); | ||
} | ||
|
||
@Test | ||
public void readBytesOffset() throws Exception { | ||
String str = "AbCdEfGhIjKlMnOpQrStUvWxYz"; | ||
|
||
ByteBufferQueue queue = new ByteBufferQueue(10); | ||
queue.put(ByteBuffer.wrap(str.getBytes(Charsets.UTF_8))); | ||
FastByteBufferQueueInputStream instance = new FastByteBufferQueueInputStream(queue,26); | ||
|
||
byte[] bytes = new byte[10]; | ||
assertThat(instance.read(bytes,5,3), is(3)); | ||
assertThat(new String(bytes, Charsets.UTF_8), is("\u0000\u0000\u0000\u0000\u0000AbC\u0000\u0000")); | ||
assertThat(instance.read(bytes, 0, 2), is(2)); | ||
assertThat(new String(bytes, Charsets.UTF_8), is("dE\u0000\u0000\u0000AbC\u0000\u0000")); | ||
assertThat(instance.read(bytes, 2, 8), is(8)); | ||
assertThat(new String(bytes, Charsets.UTF_8), is("dEfGhIjKlM")); | ||
assertThat(instance.read(bytes, 2, 8), is(8)); | ||
assertThat(new String(bytes, Charsets.UTF_8), is("dEnOpQrStU")); | ||
assertThat(instance.read(bytes, 2, 8), is(5)); | ||
assertThat(new String(bytes, Charsets.UTF_8), is("dEvWxYzStU")); | ||
} | ||
|
||
@Test | ||
public void skipRead() throws Exception { | ||
String str = "AbCdEfGhIjKlMnOpQrStUvWxYz"; | ||
|
||
ByteBufferQueue queue = new ByteBufferQueue(10); | ||
queue.put(ByteBuffer.wrap(str.getBytes(Charsets.UTF_8))); | ||
FastByteBufferQueueInputStream instance = new FastByteBufferQueueInputStream(queue,26); | ||
|
||
StringBuffer buf = new StringBuffer(); | ||
int b; | ||
do { | ||
if (instance.skip(1) != 1) { | ||
b = -1; | ||
} else { | ||
b = instance.read(); | ||
if (b != -1) { | ||
buf.append((char) b); | ||
} | ||
} | ||
} while (b != -1); | ||
|
||
assertThat(buf.toString(), is("bdfhjlnprtvxz")); | ||
} | ||
|
||
@Test | ||
public void markRead() throws Exception { | ||
String str = "AbCdEfGhIjKlMnOpQrStUvWxYz"; | ||
|
||
ByteBufferQueue queue = new ByteBufferQueue(10); | ||
queue.put(ByteBuffer.wrap(str.getBytes(Charsets.UTF_8))); | ||
FastByteBufferQueueInputStream instance = new FastByteBufferQueueInputStream(queue,26); | ||
assertThat(instance.markSupported(), is(false)); | ||
} | ||
|
||
private static String read(InputStream is) throws IOException { | ||
ByteArrayOutputStream tmp = new ByteArrayOutputStream(); | ||
int b; | ||
while (-1 != (b = is.read())) { | ||
tmp.write(b); | ||
} | ||
return new String(tmp.toByteArray(), Charsets.UTF_8); | ||
} | ||
|
||
private static String read(InputStream is, int count) throws IOException { | ||
ByteArrayOutputStream tmp = new ByteArrayOutputStream(); | ||
int b; | ||
while (count > 0 && -1 != (b = is.read())) { | ||
tmp.write(b); | ||
count--; | ||
} | ||
return new String(tmp.toByteArray(), Charsets.UTF_8); | ||
} | ||
} |