diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpSender.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpSender.java index 694ff157509c..99b865bd70ab 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpSender.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpSender.java @@ -22,12 +22,10 @@ import java.util.concurrent.Executor; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Supplier; import org.eclipse.jetty.client.api.ContentProvider; import org.eclipse.jetty.client.api.Request; import org.eclipse.jetty.client.api.Result; -import org.eclipse.jetty.http.HttpFields; import org.eclipse.jetty.http.HttpHeader; import org.eclipse.jetty.http.HttpHeaderValue; import org.eclipse.jetty.util.BufferUtil; @@ -67,7 +65,6 @@ public abstract class HttpSender implements AsyncContentProvider.Listener private final AtomicReference senderState = new AtomicReference<>(SenderState.IDLE); private final Callback commitCallback = new CommitCallback(); private final IteratingCallback contentCallback = new ContentCallback(); - private final Callback trailersCallback = new TrailersCallback(); private final Callback lastCallback = new LastCallback(); private final HttpChannel channel; private HttpContent content; @@ -444,15 +441,6 @@ private void terminateRequest(HttpExchange exchange, Throwable failure, Result r */ protected abstract void sendContent(HttpExchange exchange, HttpContent content, Callback callback); - /** - * Implementations should send the HTTP trailers and notify the given {@code callback} of the - * result of this operation. - * - * @param exchange the exchange to send - * @param callback the callback to notify - */ - protected abstract void sendTrailers(HttpExchange exchange, Callback callback); - protected void reset() { HttpContent content = this.content; @@ -745,20 +733,10 @@ private void process() throws Exception if (content == null) return; - HttpRequest request = exchange.getRequest(); - Supplier trailers = request.getTrailers(); - boolean hasContent = content.hasContent(); - if (!hasContent) + if (!content.hasContent()) { - if (trailers == null) - { - // No trailers or content to send, we are done. - someToSuccess(exchange); - } - else - { - sendTrailers(exchange, lastCallback); - } + // No content to send, we are done. + someToSuccess(exchange); } else { @@ -859,9 +837,7 @@ protected Action process() throws Exception if (lastContent) { - HttpRequest request = exchange.getRequest(); - Supplier trailers = request.getTrailers(); - sendContent(exchange, content, trailers == null ? lastCallback : trailersCallback); + sendContent(exchange, content, lastCallback); return Action.IDLE; } @@ -925,28 +901,6 @@ protected void onCompleteSuccess() } } - private class TrailersCallback implements Callback - { - @Override - public void succeeded() - { - HttpExchange exchange = getHttpExchange(); - if (exchange == null) - return; - sendTrailers(exchange, lastCallback); - } - - @Override - public void failed(Throwable x) - { - HttpContent content = HttpSender.this.content; - if (content == null) - return; - content.failed(x); - anyToFailure(x); - } - } - private class LastCallback implements Callback { @Override diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpSenderOverHTTP.java b/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpSenderOverHTTP.java index 4cab4008904f..1f8d04397dac 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpSenderOverHTTP.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpSenderOverHTTP.java @@ -59,7 +59,7 @@ protected void sendHeaders(HttpExchange exchange, HttpContent content, Callback { try { - new HeadersCallback(exchange, content, callback, getHttpChannel().getHttpConnection()).iterate(); + new HeadersCallback(exchange, content, callback).iterate(); } catch (Throwable x) { @@ -83,8 +83,8 @@ protected void sendContent(HttpExchange exchange, HttpContent content, Callback HttpGenerator.Result result = generator.generateRequest(null, null, chunk, contentBuffer, lastContent); if (LOG.isDebugEnabled()) LOG.debug("Generated content ({} bytes) - {}/{}", - contentBuffer == null ? -1 : contentBuffer.remaining(), - result, generator); + contentBuffer == null ? -1 : contentBuffer.remaining(), + result, generator); switch (result) { case NEED_CHUNK: @@ -138,21 +138,6 @@ protected void sendContent(HttpExchange exchange, HttpContent content, Callback } } - @Override - protected void sendTrailers(HttpExchange exchange, Callback callback) - { - try - { - new TrailersCallback(callback).iterate(); - } - catch (Throwable x) - { - if (LOG.isDebugEnabled()) - LOG.debug(x); - callback.failed(x); - } - } - @Override protected void reset() { @@ -191,19 +176,17 @@ private class HeadersCallback extends IteratingCallback private final HttpExchange exchange; private final Callback callback; private final MetaData.Request metaData; - private final HttpConnectionOverHTTP httpConnectionOverHTTP; private ByteBuffer headerBuffer; private ByteBuffer chunkBuffer; private ByteBuffer contentBuffer; private boolean lastContent; private boolean generated; - public HeadersCallback(HttpExchange exchange, HttpContent content, Callback callback, HttpConnectionOverHTTP httpConnectionOverHTTP) + public HeadersCallback(HttpExchange exchange, HttpContent content, Callback callback) { super(false); this.exchange = exchange; this.callback = callback; - this.httpConnectionOverHTTP = httpConnectionOverHTTP; HttpRequest request = exchange.getRequest(); ContentProvider requestContent = request.getContent(); @@ -231,10 +214,10 @@ protected Action process() throws Exception HttpGenerator.Result result = generator.generateRequest(metaData, headerBuffer, chunkBuffer, contentBuffer, lastContent); if (LOG.isDebugEnabled()) LOG.debug("Generated headers ({} bytes), chunk ({} bytes), content ({} bytes) - {}/{}", - headerBuffer == null ? -1 : headerBuffer.remaining(), - chunkBuffer == null ? -1 : chunkBuffer.remaining(), - contentBuffer == null ? -1 : contentBuffer.remaining(), - result, generator); + headerBuffer == null ? -1 : headerBuffer.remaining(), + chunkBuffer == null ? -1 : chunkBuffer.remaining(), + contentBuffer == null ? -1 : contentBuffer.remaining(), + result, generator); switch (result) { case NEED_HEADER: @@ -249,7 +232,8 @@ protected Action process() throws Exception } case NEED_CHUNK_TRAILER: { - return Action.SUCCEEDED; + chunkBuffer = httpClient.getByteBufferPool().acquire(httpClient.getRequestBufferSize(), false); + break; } case FLUSH: { @@ -260,11 +244,8 @@ protected Action process() throws Exception chunkBuffer = BufferUtil.EMPTY_BUFFER; if (contentBuffer == null) contentBuffer = BufferUtil.EMPTY_BUFFER; - - httpConnectionOverHTTP.addBytesOut( BufferUtil.length(headerBuffer) - + BufferUtil.length(chunkBuffer) - + BufferUtil.length(contentBuffer)); - + long bytes = headerBuffer.remaining() + chunkBuffer.remaining() + contentBuffer.remaining(); + getHttpChannel().getHttpConnection().addBytesOut(bytes); endPoint.write(this, headerBuffer, chunkBuffer, contentBuffer); generated = true; return Action.SCHEDULED; @@ -331,83 +312,6 @@ private void release() } } - private class TrailersCallback extends IteratingCallback - { - private final Callback callback; - private ByteBuffer chunkBuffer; - - public TrailersCallback(Callback callback) - { - this.callback = callback; - } - - @Override - protected Action process() throws Throwable - { - while (true) - { - HttpGenerator.Result result = generator.generateRequest(null, null, chunkBuffer, null, true); - if (LOG.isDebugEnabled()) - LOG.debug("Generated trailers {}/{}", result, generator); - switch (result) - { - case NEED_CHUNK_TRAILER: - { - chunkBuffer = httpClient.getByteBufferPool().acquire(httpClient.getRequestBufferSize(), false); - break; - } - case FLUSH: - { - EndPoint endPoint = getHttpChannel().getHttpConnection().getEndPoint(); - endPoint.write(this, chunkBuffer); - return Action.SCHEDULED; - } - case SHUTDOWN_OUT: - { - shutdownOutput(); - return Action.SUCCEEDED; - } - case DONE: - { - return Action.SUCCEEDED; - } - default: - { - throw new IllegalStateException(result.toString()); - } - } - } - } - - @Override - public void succeeded() - { - release(); - super.succeeded(); - } - - @Override - public void failed(Throwable x) - { - release(); - callback.failed(x); - super.failed(x); - } - - @Override - protected void onCompleteSuccess() - { - super.onCompleteSuccess(); - callback.succeeded(); - } - - private void release() - { - httpClient.getByteBufferPool().release(chunkBuffer); - chunkBuffer = null; - } - } - private class ByteBufferRecyclerCallback extends Callback.Nested { private final ByteBufferPool pool; @@ -435,7 +339,9 @@ public void succeeded() public void failed(Throwable x) { for (ByteBuffer buffer : buffers) + { pool.release(buffer); + } super.failed(x); } } diff --git a/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/http/HttpSenderOverFCGI.java b/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/http/HttpSenderOverFCGI.java index 3a5a6b63a4d8..93203f26d00d 100644 --- a/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/http/HttpSenderOverFCGI.java +++ b/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/http/HttpSenderOverFCGI.java @@ -125,10 +125,4 @@ protected void sendContent(HttpExchange exchange, HttpContent content, Callback getHttpChannel().flush(result); } } - - @Override - protected void sendTrailers(HttpExchange exchange, Callback callback) - { - callback.succeeded(); - } } diff --git a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/TrailersTest.java b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/TrailersTest.java index 5b81e21bf201..348bb41d02b6 100644 --- a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/TrailersTest.java +++ b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/TrailersTest.java @@ -18,16 +18,6 @@ package org.eclipse.jetty.http2.client; -import static org.hamcrest.Matchers.is; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertNull; -import static org.junit.jupiter.api.Assertions.assertTrue; - -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.equalTo; - import java.io.IOException; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; @@ -35,7 +25,6 @@ import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; - import javax.servlet.ServletInputStream; import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; @@ -58,10 +47,14 @@ import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.FuturePromise; import org.eclipse.jetty.util.Promise; - import org.eclipse.jetty.util.StringUtil; import org.junit.jupiter.api.Test; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; public class TrailersTest extends AbstractTest { @@ -289,7 +282,7 @@ public void onData(Stream stream, DataFrame frame, Callback callback) assertTrue(latch.await(5, TimeUnit.SECONDS)); - assertTrue( frames.size()==3, frames.toString()); + assertEquals(3, frames.size(), frames.toString()); HeadersFrame headers = (HeadersFrame)frames.get(0); DataFrame data = (DataFrame)frames.get(1); @@ -298,7 +291,7 @@ public void onData(Stream stream, DataFrame frame, Callback callback) assertFalse(headers.isEndStream()); assertFalse(data.isEndStream()); assertTrue(trailers.isEndStream()); - assertTrue(trailers.getMetaData().getFields().get(trailerName).equals(trailerValue)); + assertEquals(trailers.getMetaData().getFields().get(trailerName), trailerValue); } @Test @@ -358,6 +351,5 @@ public void onReset(Stream stream, ResetFrame frame) assertTrue(serverLatch.await(5, TimeUnit.SECONDS)); assertTrue(clientLatch.await(5, TimeUnit.SECONDS)); - } } diff --git a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpSenderOverHTTP2.java b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpSenderOverHTTP2.java index b37d77a8b94a..e376a91eeac4 100644 --- a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpSenderOverHTTP2.java +++ b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpSenderOverHTTP2.java @@ -56,42 +56,57 @@ protected void sendHeaders(HttpExchange exchange, final HttpContent content, fin String path = relativize(request.getPath()); HttpURI uri = HttpURI.createHttpURI(request.getScheme(), request.getHost(), request.getPort(), path, null, request.getQuery(), null); MetaData.Request metaData = new MetaData.Request(request.getMethod(), uri, HttpVersion.HTTP_2, request.getHeaders()); - Supplier trailers = request.getTrailers(); - metaData.setTrailerSupplier(trailers); - HeadersFrame headersFrame = new HeadersFrame(metaData, null, trailers == null && !content.hasContent()); - HttpChannelOverHTTP2 channel = getHttpChannel(); - Promise promise = new Promise() + Supplier trailerSupplier = request.getTrailers(); + metaData.setTrailerSupplier(trailerSupplier); + + HeadersFrame headersFrame; + Promise promise; + if (content.hasContent()) { - @Override - public void succeeded(Stream stream) + headersFrame = new HeadersFrame(metaData, null, false); + promise = new HeadersPromise(request, callback) { - channel.setStream(stream); - ((IStream)stream).setAttachment(channel); - long idleTimeout = request.getIdleTimeout(); - if (idleTimeout >= 0) - stream.setIdleTimeout(idleTimeout); - - if (content.hasContent() && !expects100Continue(request)) + @Override + public void succeeded(Stream stream) { - boolean advanced = content.advance(); - boolean lastContent = trailers == null && content.isLast(); - if (advanced || lastContent) + super.succeeded(stream); + if (expects100Continue(request)) + { + // Don't send the content yet. + callback.succeeded(); + } + else { - DataFrame dataFrame = new DataFrame(stream.getId(), content.getByteBuffer(), lastContent); - stream.data(dataFrame, callback); - return; + boolean advanced = content.advance(); + boolean lastContent = content.isLast(); + if (advanced || lastContent) + sendContent(stream, content, trailerSupplier, callback); + else + callback.succeeded(); } } - callback.succeeded(); - } - - @Override - public void failed(Throwable failure) + }; + } + else + { + HttpFields trailers = trailerSupplier == null ? null : trailerSupplier.get(); + boolean endStream = trailers == null || trailers.size() == 0; + headersFrame = new HeadersFrame(metaData, null, endStream); + promise = new HeadersPromise(request, callback) { - callback.failed(failure); - } - }; + @Override + public void succeeded(Stream stream) + { + super.succeeded(stream); + if (endStream) + callback.succeeded(); + else + sendTrailers(stream, trailers, callback); + } + }; + } // TODO optimize the send of HEADERS and DATA frames. + HttpChannelOverHTTP2 channel = getHttpChannel(); channel.getSession().newStream(headersFrame, promise, channel.getStreamListener()); } @@ -123,19 +138,59 @@ protected void sendContent(HttpExchange exchange, HttpContent content, Callback else { Stream stream = getHttpChannel().getStream(); - Supplier trailers = exchange.getRequest().getTrailers(); - DataFrame frame = new DataFrame(stream.getId(), content.getByteBuffer(), trailers == null && content.isLast()); - stream.data(frame, callback); + Supplier trailerSupplier = exchange.getRequest().getTrailers(); + sendContent(stream, content, trailerSupplier, callback); } } - @Override - protected void sendTrailers(HttpExchange exchange, Callback callback) + private void sendContent(Stream stream, HttpContent content, Supplier trailerSupplier, Callback callback) { - Supplier trailers = exchange.getRequest().getTrailers(); - MetaData metaData = new MetaData(HttpVersion.HTTP_2, trailers.get()); - Stream stream = getHttpChannel().getStream(); + boolean lastContent = content.isLast(); + HttpFields trailers = null; + boolean endStream = false; + if (lastContent) + { + trailers = trailerSupplier == null ? null : trailerSupplier.get(); + endStream = trailers == null || trailers.size() == 0; + } + DataFrame dataFrame = new DataFrame(stream.getId(), content.getByteBuffer(), endStream); + HttpFields fTrailers = trailers; + stream.data(dataFrame, endStream || !lastContent ? callback : Callback.from(() -> sendTrailers(stream, fTrailers, callback), callback::failed)); + } + + private void sendTrailers(Stream stream, HttpFields trailers, Callback callback) + { + MetaData metaData = new MetaData(HttpVersion.HTTP_2, trailers); HeadersFrame trailersFrame = new HeadersFrame(stream.getId(), metaData, null, true); stream.headers(trailersFrame, callback); } + + private class HeadersPromise implements Promise + { + private final HttpRequest request; + private final Callback callback; + + private HeadersPromise(HttpRequest request, Callback callback) + { + this.request = request; + this.callback = callback; + } + + @Override + public void succeeded(Stream stream) + { + HttpChannelOverHTTP2 channel = getHttpChannel(); + channel.setStream(stream); + ((IStream)stream).setAttachment(channel); + long idleTimeout = request.getIdleTimeout(); + if (idleTimeout >= 0) + stream.setIdleTimeout(idleTimeout); + } + + @Override + public void failed(Throwable x) + { + callback.failed(x); + } + } } diff --git a/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/RequestTrailersTest.java b/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/RequestTrailersTest.java new file mode 100644 index 000000000000..e6d321632d3f --- /dev/null +++ b/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/RequestTrailersTest.java @@ -0,0 +1,142 @@ +// +// ======================================================================== +// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.http2.client.http; + +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.eclipse.jetty.client.HttpRequest; +import org.eclipse.jetty.client.api.ContentResponse; +import org.eclipse.jetty.client.util.DeferredContentProvider; +import org.eclipse.jetty.client.util.StringContentProvider; +import org.eclipse.jetty.http.HttpFields; +import org.eclipse.jetty.http.HttpStatus; +import org.eclipse.jetty.http.HttpVersion; +import org.eclipse.jetty.http.MetaData; +import org.eclipse.jetty.http2.api.Stream; +import org.eclipse.jetty.http2.api.server.ServerSessionListener; +import org.eclipse.jetty.http2.frames.DataFrame; +import org.eclipse.jetty.http2.frames.HeadersFrame; +import org.eclipse.jetty.util.Callback; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class RequestTrailersTest extends AbstractTest +{ + @Test + public void testEmptyTrailersWithoutContent() throws Exception + { + testEmptyTrailers(null); + } + + @Test + public void testEmptyTrailersWithEagerContent() throws Exception + { + testEmptyTrailers("eager_content"); + } + + private void testEmptyTrailers(String content) throws Exception + { + CountDownLatch trailersLatch = new CountDownLatch(1); + start(new ServerSessionListener.Adapter() + { + @Override + public Stream.Listener onNewStream(Stream stream, HeadersFrame frame) + { + MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, new HttpFields()); + HeadersFrame responseFrame = new HeadersFrame(stream.getId(), response, null, true); + stream.headers(responseFrame, Callback.NOOP); + return new Stream.Listener.Adapter() + { + @Override + public void onHeaders(Stream stream, HeadersFrame frame) + { + trailersLatch.countDown(); + } + }; + } + }); + + HttpRequest request = (HttpRequest)client.newRequest("localhost", connector.getLocalPort()); + HttpFields trailers = new HttpFields(); + request.trailers(() -> trailers); + if (content != null) + request.content(new StringContentProvider(content)); + + ContentResponse response = request.send(); + assertEquals(HttpStatus.OK_200, response.getStatus()); + + // The client must not send the trailers. + assertFalse(trailersLatch.await(1, TimeUnit.SECONDS)); + } + + @Test + public void testEmptyTrailersWithDeferredContent() throws Exception + { + start(new ServerSessionListener.Adapter() + { + @Override + public Stream.Listener onNewStream(Stream stream, HeadersFrame frame) + { + return new Stream.Listener.Adapter() + { + @Override + public void onData(Stream stream, DataFrame dataFrame, Callback callback) + { + callback.succeeded(); + // We should not receive an empty HEADERS frame for the + // trailers, but instead a DATA frame with endStream=true. + if (dataFrame.isEndStream()) + { + MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, new HttpFields()); + HeadersFrame responseFrame = new HeadersFrame(stream.getId(), response, null, true); + stream.headers(responseFrame, Callback.NOOP); + } + } + }; + } + }); + + HttpRequest request = (HttpRequest)client.newRequest("localhost", connector.getLocalPort()); + HttpFields trailers = new HttpFields(); + request.trailers(() -> trailers); + DeferredContentProvider content = new DeferredContentProvider(); + request.content(content); + + CountDownLatch latch = new CountDownLatch(1); + request.send(result -> + { + assertTrue(result.isSucceeded()); + assertEquals(HttpStatus.OK_200, result.getResponse().getStatus()); + latch.countDown(); + }); + + // Send deferred content after a while. + Thread.sleep(1000); + content.offer(ByteBuffer.wrap("deferred_content".getBytes(StandardCharsets.UTF_8))); + content.close(); + + assertTrue(latch.await(5, TimeUnit.SECONDS)); + } +}