Skip to content

SPR-13657 Buffered AsyncClientHttpResponse #915

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright 2002-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.http.client;

import org.springframework.http.HttpMethod;

import java.io.IOException;
import java.net.URI;

/**
* A buffered {@link AsyncClientHttpRequestFactory} adding buffering of the request and response streams.
* Configuring the request factory allows for multiple reads of the response body stream.
*
* @author Jakub Narloch
* @see AsyncClientHttpRequestFactory
*/
public class BufferingAsyncClientHttpRequestFactory implements AsyncClientHttpRequestFactory {

private AsyncClientHttpRequestFactory delegate;

/**
* Creates new instance of {@link BufferingAsyncClientHttpRequestFactory} class.
*
* @param delegate the delegates request factory
*/
public BufferingAsyncClientHttpRequestFactory(AsyncClientHttpRequestFactory delegate) {
this.delegate = delegate;
}

@Override
public AsyncClientHttpRequest createAsyncRequest(URI uri, HttpMethod httpMethod) throws IOException {
AsyncClientHttpRequest request = delegate.createAsyncRequest(uri, httpMethod);
if(!shouldBuffer(uri, httpMethod)) {
return request;
}
return new BufferingAsyncClientHttpRequestWrapper(request);
}

protected boolean shouldBuffer(URI uri, HttpMethod httpMethod) {
return true;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright 2002-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.http.client;

import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.util.StreamUtils;
import org.springframework.util.concurrent.ListenableFuture;

import java.io.IOException;
import java.net.URI;

/**
* A buffering {@link AsyncClientHttpRequest} allowing to buffer the request output stream.
*
* @author Jakub Narloch
* @see BufferingAsyncClientHttpRequestFactory
*/
public class BufferingAsyncClientHttpRequestWrapper extends AbstractBufferingAsyncClientHttpRequest {

private final AsyncClientHttpRequest request;

public BufferingAsyncClientHttpRequestWrapper(AsyncClientHttpRequest request) {
this.request = request;
}

@Override
public HttpMethod getMethod() {
return request.getMethod();
}

@Override
public URI getURI() {
return request.getURI();
}

@Override
protected ListenableFuture<ClientHttpResponse> executeInternal(HttpHeaders headers, byte[] bufferedOutput) throws IOException {
request.getHeaders().putAll(headers);
if(bufferedOutput.length > 0) {
StreamUtils.copy(bufferedOutput, request.getBody());
}
return new BufferingAsyncClientHttpResponseFutureAdapter(request.executeAsync());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright 2002-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.http.client;

import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureAdapter;

import java.util.concurrent.ExecutionException;

/**
* A {@link ListenableFutureAdapter} that returns buffered {@link ClientHttpResponse} that buffers the response
* input stream.
*
* @author Jakub Narloch
* @see BufferingAsyncClientHttpRequestFactory
*/
public class BufferingAsyncClientHttpResponseFutureAdapter extends ListenableFutureAdapter<ClientHttpResponse, ClientHttpResponse> {

/**
* Creates new instance of {@link BufferingAsyncClientHttpResponseFutureAdapter} with delegated listenable future.
*
* @param adaptee the delegated future
*/
public BufferingAsyncClientHttpResponseFutureAdapter(ListenableFuture<ClientHttpResponse> adaptee) {
super(adaptee);
}

@Override
protected ClientHttpResponse adapt(ClientHttpResponse adapteeResult) throws ExecutionException {
return new BufferingClientHttpResponseWrapper(adapteeResult);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package org.springframework.http.client;

import org.junit.Before;
import org.junit.Test;
import org.springframework.http.HttpMethod;
import org.springframework.util.FileCopyUtils;
import org.springframework.util.StreamUtils;
import org.springframework.util.StringUtils;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.SuccessCallback;

import java.io.IOException;
import java.net.URI;
import java.nio.charset.StandardCharsets;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;

/**
* Tests the {@link BufferingAsyncClientHttpRequestFactory} class.
*
* @author Jakub Narloch
*/
public class BufferingAsyncClientHttpRequestFactoryTests extends AbstractJettyServerTestCase {

private AsyncClientHttpRequestFactory requestFactory;

@Before
public void setUp() throws Exception {

requestFactory = new BufferingAsyncClientHttpRequestFactory(new HttpComponentsAsyncClientHttpRequestFactory());
}

@Test
public void testResponseBuffering() throws Exception {

byte[] message = "Async buffering works".getBytes("UTF-8");
ResponseBodyExtractor originalResponse = new ResponseBodyExtractor();
ResponseBodyExtractor responseCopy = new ResponseBodyExtractor();
AsyncClientHttpRequest request = requestFactory.createAsyncRequest(new URI(baseUrl + "/echo"), HttpMethod.PUT);
request.getHeaders().setContentLength(message.length);
FileCopyUtils.copy(message, request.getBody());

ListenableFuture<ClientHttpResponse> future = request.executeAsync();
future.addCallback(originalResponse, ex -> fail(ex.getMessage()));
future.addCallback(responseCopy, ex -> fail(ex.getMessage()));
future.get();

assertArrayEquals(originalResponse.body, responseCopy.body);
assertArrayEquals(message, originalResponse.body);
}

public class ResponseBodyExtractor implements SuccessCallback<ClientHttpResponse> {

private byte[] body;

@Override
public void onSuccess(ClientHttpResponse result) {
try {
body = StreamUtils.copyToByteArray(result.getBody());
} catch (IOException e) {
// ignores
}
}
}
}