From cba67914d2d6ed349f2f4b001009144c04b28405 Mon Sep 17 00:00:00 2001 From: Andreas Veithen Date: Tue, 24 Oct 2017 11:10:20 -0500 Subject: [PATCH] SWS-707 - Enable streaming incoming responses Original pull request: #85 --- pom.xml | 6 ++ .../http/AbstractHttpSenderConnection.java | 39 ++++++--- .../AbstractHttpSenderConnectionTest.java | 85 +++++++++++++++++++ 3 files changed, 117 insertions(+), 13 deletions(-) create mode 100644 spring-ws-core/src/test/java/org/springframework/ws/transport/http/AbstractHttpSenderConnectionTest.java diff --git a/pom.xml b/pom.xml index af6ed1047..0e1dee519 100644 --- a/pom.xml +++ b/pom.xml @@ -88,6 +88,7 @@ 1.8.12 1.2.20 3.1 + 2.5 1.2 1.6.1 3.1 @@ -123,6 +124,11 @@ commons-logging ${commons-logging.version} + + commons-io + commons-io + ${commons-io.version} + org.springframework spring-core diff --git a/spring-ws-core/src/main/java/org/springframework/ws/transport/http/AbstractHttpSenderConnection.java b/spring-ws-core/src/main/java/org/springframework/ws/transport/http/AbstractHttpSenderConnection.java index 8cfa03c53..5ff80b80b 100644 --- a/spring-ws-core/src/main/java/org/springframework/ws/transport/http/AbstractHttpSenderConnection.java +++ b/spring-ws-core/src/main/java/org/springframework/ws/transport/http/AbstractHttpSenderConnection.java @@ -16,14 +16,13 @@ package org.springframework.ws.transport.http; -import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; +import java.io.PushbackInputStream; import java.util.Iterator; import java.util.zip.GZIPInputStream; import javax.xml.namespace.QName; -import org.springframework.util.FileCopyUtils; import org.springframework.util.StringUtils; import org.springframework.ws.transport.AbstractSenderConnection; import org.springframework.ws.transport.FaultAwareWebServiceConnection; @@ -33,13 +32,21 @@ * Abstract base class for {@link WebServiceConnection} implementations that send request over HTTP. * * @author Arjen Poutsma + * @author Andreas Veithen * @since 1.0.0 */ public abstract class AbstractHttpSenderConnection extends AbstractSenderConnection implements FaultAwareWebServiceConnection { - /** Buffer used for reading the response, when the content length is invalid. */ - private byte[] responseBuffer; + /** + * Cached result of {@link #hasResponse}. + */ + private Boolean hasResponse; + + /** + * The raw response input stream to use instead of calling {@link #getRawResponseInputStream()}. + */ + private PushbackInputStream rawResponseInputStream; @Override public final boolean hasError() throws IOException { @@ -69,23 +76,29 @@ protected final boolean hasResponse() throws IOException { HttpTransportConstants.STATUS_NO_CONTENT == responseCode) { return false; } + if (hasResponse != null) { + return hasResponse; + } long contentLength = getResponseContentLength(); if (contentLength < 0) { - if (responseBuffer == null) { - responseBuffer = FileCopyUtils.copyToByteArray(getRawResponseInputStream()); + rawResponseInputStream = new PushbackInputStream(getRawResponseInputStream()); + int b = rawResponseInputStream.read(); + if (b == -1) { + hasResponse = Boolean.FALSE; + } else { + hasResponse = Boolean.TRUE; + rawResponseInputStream.unread(b); } - contentLength = responseBuffer.length; + } else { + hasResponse = contentLength > 0; } - return contentLength > 0; + return hasResponse; } @Override protected final InputStream getResponseInputStream() throws IOException { - InputStream inputStream; - if (responseBuffer != null) { - inputStream = new ByteArrayInputStream(responseBuffer); - } - else { + InputStream inputStream = rawResponseInputStream; + if (inputStream == null) { inputStream = getRawResponseInputStream(); } return isGzipResponse() ? new GZIPInputStream(inputStream) : inputStream; diff --git a/spring-ws-core/src/test/java/org/springframework/ws/transport/http/AbstractHttpSenderConnectionTest.java b/spring-ws-core/src/test/java/org/springframework/ws/transport/http/AbstractHttpSenderConnectionTest.java new file mode 100644 index 000000000..acdce38dc --- /dev/null +++ b/spring-ws-core/src/test/java/org/springframework/ws/transport/http/AbstractHttpSenderConnectionTest.java @@ -0,0 +1,85 @@ +/* + * Copyright 2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.ws.transport.http; + +import java.io.ByteArrayInputStream; +import java.io.InputStream; +import java.util.Collections; +import java.util.Random; + +import org.apache.commons.io.IOUtils; +import org.apache.commons.io.input.CountingInputStream; +import org.easymock.Capture; +import org.junit.Test; + +import org.springframework.ws.WebServiceMessage; +import org.springframework.ws.WebServiceMessageFactory; + +import static org.easymock.EasyMock.*; +import static org.junit.Assert.*; + +/** + * @author Andreas Veithen + */ +public class AbstractHttpSenderConnectionTest { + + /** + * Tests that {@link AbstractHttpSenderConnection} doesn't consume the response stream before + * passing it to the message factory. This is a regression test for SWS-707. + * + * @param chunking + * Specifies whether the test should simulate a response with chunking enabled. + * @throws Exception + */ + private void testSupportsStreaming(boolean chunking) throws Exception { + byte[] content = new byte[16*1024]; + new Random().nextBytes(content); + CountingInputStream rawInputStream = new CountingInputStream(new ByteArrayInputStream(content)); + + AbstractHttpSenderConnection connection = createNiceMock(AbstractHttpSenderConnection.class); + expect(connection.getResponseCode()).andReturn(200); + // Simulate response with chunking enabled + expect(connection.getResponseContentLength()).andReturn(chunking ? -1L : content.length); + expect(connection.getRawResponseInputStream()).andReturn(rawInputStream); + expect(connection.getResponseHeaders(anyObject())).andReturn(Collections.emptyIterator()); + + // Create a mock message factory to capture the InputStream passed to it + WebServiceMessageFactory messageFactory = createNiceMock(WebServiceMessageFactory.class); + WebServiceMessage message = createNiceMock(WebServiceMessage.class); + Capture inputStreamCapture = new Capture<>(); + expect(messageFactory.createWebServiceMessage(capture(inputStreamCapture))).andReturn(message); + + replay(connection, messageFactory, message); + + connection.receive(messageFactory); + + assertTrue("The raw input stream has been completely consumed", + rawInputStream.getCount() < content.length); + assertArrayEquals("Unexpected content received by the message factory", + content, IOUtils.toByteArray(inputStreamCapture.getValue())); + } + + @Test + public void testSupportsStreamingWithChunkingEnabled() throws Exception { + testSupportsStreaming(true); + } + + @Test + public void testSupportsStreamingWithChunkingDisabled() throws Exception { + testSupportsStreaming(false); + } +} \ No newline at end of file