diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java index 4df9d8ea0578..292ce1b84e29 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java @@ -230,11 +230,10 @@ public void write(byte[] b, int off, int len) throws IOException continue; // Should we aggregate? - int capacity = getBufferSize(); if (!complete && len<=_commitSize) { if (_aggregate == null) - _aggregate = _channel.getByteBufferPool().acquire(capacity, false); + _aggregate = _channel.getByteBufferPool().acquire(getBufferSize(), false); // YES - fill the aggregate with content from the buffer int filled = BufferUtil.fill(_aggregate, b, off, len); @@ -303,15 +302,30 @@ public void write(byte[] b, int off, int len) throws IOException // write any remaining content in the buffer directly if (len>0) + { // pass as readonly to avoid space stealing optimisation in HttpConnection - _channel.write(ByteBuffer.wrap(b, off, len).asReadOnlyBuffer(), complete); + ByteBuffer wrap = ByteBuffer.wrap(b, off, len); + ByteBuffer readonly = wrap.asReadOnlyBuffer(); + + // write a buffer capacity at a time to avoid JVM pooling large direct buffers + // http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6210541 + while (len>getBufferSize()) + { + int p=readonly.position(); + int l=p+getBufferSize(); + readonly.limit(p+getBufferSize()); + _channel.write(readonly,false); + len-=getBufferSize(); + readonly.limit(l+Math.min(len,getBufferSize())); + readonly.position(l); + } + _channel.write(readonly,complete); + } else if (complete) _channel.write(BufferUtil.EMPTY_BUFFER,complete); if (complete) - { closed(); - } } @@ -679,88 +693,10 @@ public void run() } } } - - private class AsyncWrite extends AsyncFlush + + + private abstract class AsyncICB extends IteratingCallback { - private final ByteBuffer _buffer; - private final boolean _complete; - private final int _len; - - public AsyncWrite(byte[] b, int off, int len, boolean complete) - { - _buffer=ByteBuffer.wrap(b, off, len); - _complete=complete; - _len=len; - } - - public AsyncWrite(ByteBuffer buffer, boolean complete) - { - _buffer=buffer; - _complete=complete; - _len=buffer.remaining(); - } - - @Override - protected State process() - { - // flush any content from the aggregate - if (BufferUtil.hasContent(_aggregate)) - { - _channel.write(_aggregate, _complete && _len==0, this); - return State.SCHEDULED; - } - - if (!_complete && _len0 && !_flushed) - { - _flushed=true; - _channel.write(_buffer, _complete, this); - return State.SCHEDULED; - } - else if (_len==0 && !_flushed) - { - _flushed=true; - _channel.write(BufferUtil.EMPTY_BUFFER, _complete, this); - return State.SCHEDULED; - } - - if (_complete) - closed(); - return State.SUCCEEDED; - } - } - - private class AsyncFlush extends IteratingCallback - { - protected boolean _flushed; - - public AsyncFlush() - { - } - - @Override - protected State process() - { - if (BufferUtil.hasContent(_aggregate)) - { - _flushed=true; - _channel.write(_aggregate, false, this); - return State.SCHEDULED; - } - - if (!_flushed) - { - _flushed=true; - _channel.write(BufferUtil.EMPTY_BUFFER,false,this); - return State.SCHEDULED; - } - - return State.SUCCEEDED; - } - @Override protected void completed() { @@ -806,8 +742,122 @@ public void failed(Throwable e) _onError=e; _channel.getState().onWritePossible(); } + } + + + private class AsyncFlush extends AsyncICB + { + protected volatile boolean _flushed; + + public AsyncFlush() + { + } + + @Override + protected State process() + { + if (BufferUtil.hasContent(_aggregate)) + { + _flushed=true; + _channel.write(_aggregate, false, this); + return State.SCHEDULED; + } + + if (!_flushed) + { + _flushed=true; + _channel.write(BufferUtil.EMPTY_BUFFER,false,this); + return State.SCHEDULED; + } + + return State.SUCCEEDED; + } + } + + + + private class AsyncWrite extends AsyncICB + { + private final ByteBuffer _buffer; + private final ByteBuffer _slice; + private final boolean _complete; + private final int _len; + protected volatile boolean _completed; + + public AsyncWrite(byte[] b, int off, int len, boolean complete) + { + _buffer=ByteBuffer.wrap(b, off, len); + _len=len; + // always use a view for large byte arrays to avoid JVM pooling large direct buffers + _slice=_len0;) + _handler._content.put(i,(byte)'x'); + _handler._arrayBuffer=new byte[8192]; + + String response=_connector.getResponses("GET / HTTP/1.0\nHost: localhost:80\n\n"); + assertThat(response,containsString("HTTP/1.1 200 OK")); + assertThat(response,containsString("Content-Length")); + } + + @Test public void testWriteBufferSmall() throws Exception { final Resource big = Resource.newClassPathResource("simple/big.txt"); + _handler._writeLengthIfKnown=false; _handler._content=BufferUtil.toBuffer(big,false); - _handler._buffer=BufferUtil.allocate(8); + _handler._byteBuffer=BufferUtil.allocate(8); String response=_connector.getResponses("GET / HTTP/1.0\nHost: localhost:80\n\n"); assertThat(response,containsString("HTTP/1.1 200 OK")); assertThat(response,Matchers.not(containsString("Content-Length"))); assertThat(response,containsString("400\tThis is a big file")); } - + @Test public void testWriteBufferMed() throws Exception { final Resource big = Resource.newClassPathResource("simple/big.txt"); + _handler._writeLengthIfKnown=false; _handler._content=BufferUtil.toBuffer(big,false); - _handler._buffer=BufferUtil.allocate(4000); + _handler._byteBuffer=BufferUtil.allocate(4000); String response=_connector.getResponses("GET / HTTP/1.0\nHost: localhost:80\n\n"); assertThat(response,containsString("HTTP/1.1 200 OK")); @@ -310,8 +412,9 @@ public void testWriteBufferMed() throws Exception public void testWriteBufferLarge() throws Exception { final Resource big = Resource.newClassPathResource("simple/big.txt"); + _handler._writeLengthIfKnown=false; _handler._content=BufferUtil.toBuffer(big,false); - _handler._buffer=BufferUtil.allocate(8192); + _handler._byteBuffer=BufferUtil.allocate(8192); String response=_connector.getResponses("GET / HTTP/1.0\nHost: localhost:80\n\n"); assertThat(response,containsString("HTTP/1.1 200 OK")); @@ -319,12 +422,29 @@ public void testWriteBufferLarge() throws Exception assertThat(response,containsString("400\tThis is a big file")); } + + @Test + public void testAsyncWriteByte() throws Exception + { + final Resource big = Resource.newClassPathResource("simple/big.txt"); + _handler._writeLengthIfKnown=false; + _handler._content=BufferUtil.toBuffer(big,false); + _handler._arrayBuffer=new byte[1]; + _handler._async=true; + + String response=_connector.getResponses("GET / HTTP/1.0\nHost: localhost:80\n\n"); + assertThat(response,containsString("HTTP/1.1 200 OK")); + assertThat(response,Matchers.not(containsString("Content-Length"))); + assertThat(response,containsString("400\tThis is a big file")); + } + @Test public void testAsyncWriteSmall() throws Exception { final Resource big = Resource.newClassPathResource("simple/big.txt"); + _handler._writeLengthIfKnown=false; _handler._content=BufferUtil.toBuffer(big,false); - _handler._bytes=new byte[8]; + _handler._arrayBuffer=new byte[8]; _handler._async=true; String response=_connector.getResponses("GET / HTTP/1.0\nHost: localhost:80\n\n"); @@ -337,8 +457,9 @@ public void testAsyncWriteSmall() throws Exception public void testAsyncWriteMed() throws Exception { final Resource big = Resource.newClassPathResource("simple/big.txt"); + _handler._writeLengthIfKnown=false; _handler._content=BufferUtil.toBuffer(big,false); - _handler._bytes=new byte[4000]; + _handler._arrayBuffer=new byte[4000]; _handler._async=true; String response=_connector.getResponses("GET / HTTP/1.0\nHost: localhost:80\n\n"); @@ -351,8 +472,9 @@ public void testAsyncWriteMed() throws Exception public void testAsyncWriteLarge() throws Exception { final Resource big = Resource.newClassPathResource("simple/big.txt"); + _handler._writeLengthIfKnown=false; _handler._content=BufferUtil.toBuffer(big,false); - _handler._bytes=new byte[8192]; + _handler._arrayBuffer=new byte[8192]; _handler._async=true; String response=_connector.getResponses("GET / HTTP/1.0\nHost: localhost:80\n\n"); @@ -361,12 +483,32 @@ public void testAsyncWriteLarge() throws Exception assertThat(response,containsString("400\tThis is a big file")); } + + @Test + public void testAsyncWriteHuge() throws Exception + { + _handler._writeLengthIfKnown=true; + _handler._content=BufferUtil.allocate(4*1024*1024); + _handler._content.limit(_handler._content.capacity()); + for (int i=_handler._content.capacity();i-->0;) + _handler._content.put(i,(byte)'x'); + _handler._arrayBuffer=new byte[8192]; + _handler._async=true; + + String response=_connector.getResponses("GET / HTTP/1.0\nHost: localhost:80\n\n"); + assertThat(response,containsString("HTTP/1.1 200 OK")); + assertThat(response,containsString("Content-Length")); + } + + + @Test public void testAsyncWriteBufferSmall() throws Exception { final Resource big = Resource.newClassPathResource("simple/big.txt"); + _handler._writeLengthIfKnown=false; _handler._content=BufferUtil.toBuffer(big,false); - _handler._buffer=BufferUtil.allocate(8); + _handler._byteBuffer=BufferUtil.allocate(8); _handler._async=true; String response=_connector.getResponses("GET / HTTP/1.0\nHost: localhost:80\n\n"); @@ -374,13 +516,14 @@ public void testAsyncWriteBufferSmall() throws Exception assertThat(response,Matchers.not(containsString("Content-Length"))); assertThat(response,containsString("400\tThis is a big file")); } - + @Test public void testAsyncWriteBufferMed() throws Exception { final Resource big = Resource.newClassPathResource("simple/big.txt"); + _handler._writeLengthIfKnown=false; _handler._content=BufferUtil.toBuffer(big,false); - _handler._buffer=BufferUtil.allocate(4000); + _handler._byteBuffer=BufferUtil.allocate(4000); _handler._async=true; String response=_connector.getResponses("GET / HTTP/1.0\nHost: localhost:80\n\n"); @@ -393,8 +536,9 @@ public void testAsyncWriteBufferMed() throws Exception public void testAsyncWriteBufferLarge() throws Exception { final Resource big = Resource.newClassPathResource("simple/big.txt"); + _handler._writeLengthIfKnown=false; _handler._content=BufferUtil.toBuffer(big,false); - _handler._buffer=BufferUtil.allocate(8192); + _handler._byteBuffer=BufferUtil.allocate(8192); _handler._async=true; String response=_connector.getResponses("GET / HTTP/1.0\nHost: localhost:80\n\n"); @@ -405,9 +549,10 @@ public void testAsyncWriteBufferLarge() throws Exception static class ContentHandler extends AbstractHandler { + boolean _writeLengthIfKnown=true; boolean _async; - ByteBuffer _buffer; - byte[] _bytes; + ByteBuffer _byteBuffer; + byte[] _arrayBuffer; InputStream _contentInputStream; ReadableByteChannel _contentChannel; ByteBuffer _content; @@ -435,7 +580,10 @@ public void handle(String target, Request baseRequest, HttpServletRequest reques return; } - if (_bytes!=null) + if (_content!=null && _writeLengthIfKnown) + response.setContentLength(_content.remaining()); + + if (_arrayBuffer!=null) { if (_async) { @@ -447,18 +595,23 @@ public void onWritePossible() throws IOException { while (out.isReady()) { + Assert.assertTrue(out.isReady()); int len=_content.remaining(); - if (len>_bytes.length) - len=_bytes.length; + if (len>_arrayBuffer.length) + len=_arrayBuffer.length; if (len==0) { async.complete(); break; } - _content.get(_bytes,0,len); - out.write(_bytes,0,len); + _content.get(_arrayBuffer,0,len); + if (len==1) + out.write(_arrayBuffer[0]); + else + out.write(_arrayBuffer,0,len); } + Assert.assertFalse(out.isReady()); } @Override @@ -472,20 +625,22 @@ public void onError(Throwable t) return; } - while(BufferUtil.hasContent(_content)) { int len=_content.remaining(); - if (len>_bytes.length) - len=_bytes.length; - _content.get(_bytes,0,len); - out.write(_bytes,0,len); + if (len>_arrayBuffer.length) + len=_arrayBuffer.length; + _content.get(_arrayBuffer,0,len); + if (len==1) + out.write(_arrayBuffer[0]); + else + out.write(_arrayBuffer,0,len); } return; } - if (_buffer!=null) + if (_byteBuffer!=null) { if (_async) { @@ -497,17 +652,19 @@ public void onWritePossible() throws IOException { while (out.isReady()) { + Assert.assertTrue(out.isReady()); if(BufferUtil.isEmpty(_content)) { async.complete(); break; } - BufferUtil.clearToFill(_buffer); - BufferUtil.put(_content,_buffer); - BufferUtil.flipToFlush(_buffer,0); - out.write(_buffer); + BufferUtil.clearToFill(_byteBuffer); + BufferUtil.put(_content,_byteBuffer); + BufferUtil.flipToFlush(_byteBuffer,0); + out.write(_byteBuffer); } + Assert.assertFalse(out.isReady()); } @Override @@ -524,10 +681,10 @@ public void onError(Throwable t) while(BufferUtil.hasContent(_content)) { - BufferUtil.clearToFill(_buffer); - BufferUtil.put(_content,_buffer); - BufferUtil.flipToFlush(_buffer,0); - out.write(_buffer); + BufferUtil.clearToFill(_byteBuffer); + BufferUtil.put(_content,_byteBuffer); + BufferUtil.flipToFlush(_byteBuffer,0); + out.write(_byteBuffer); } return; @@ -535,7 +692,6 @@ public void onError(Throwable t) if (_content!=null) { - response.setContentLength(_content.remaining()); if (_content.hasArray()) out.write(_content.array(),_content.arrayOffset()+_content.position(),_content.remaining()); else diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/HttpServerTestBase.java b/jetty-server/src/test/java/org/eclipse/jetty/server/HttpServerTestBase.java index 6e350cac472d..df1cb669f0fa 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/HttpServerTestBase.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/HttpServerTestBase.java @@ -784,8 +784,8 @@ public void testBigBlocks() throws Exception max = len; } - // Check that a direct content buffer was used as a chunk - Assert.assertEquals(128 * 1024, max); + // Check that biggest chunk was <= buffer size + Assert.assertEquals(_connector.getBean(HttpConnectionFactory.class).getHttpConfiguration().getOutputBufferSize() , max); // read and check the times are < 999ms String[] times = in.readLine().split(","); diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/HttpServerTestFixture.java b/jetty-server/src/test/java/org/eclipse/jetty/server/HttpServerTestFixture.java index 32fe7ebe159f..2462df8136bb 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/HttpServerTestFixture.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/HttpServerTestFixture.java @@ -46,7 +46,7 @@ public class HttpServerTestFixture protected Server _server; protected URI _serverURI; - protected NetworkConnector _connector; + protected ServerConnector _connector; protected String _scheme="http"; protected Socket newSocket(String host,int port) throws Exception @@ -64,7 +64,7 @@ public void before() _server = new Server(); } - protected void startServer(NetworkConnector connector) throws Exception + protected void startServer(ServerConnector connector) throws Exception { _connector = connector; _server.addConnector(_connector);