From 6a38b538353321c0a5fad839cf73cd8a98f7669a Mon Sep 17 00:00:00 2001 From: Sebastien Deleuze Date: Wed, 19 Oct 2016 11:15:03 +0200 Subject: [PATCH] Add reactive multipart request support This commit introduces reactive multipart support by adding 2 new methods to ServerWebExchange: - Flux getParts() - Mono> getAllParts() The first one emits parts as a stream as they are received while the second lazily parse all parts of a reactive request. These operations are performed using a reactive MultipartResolver which can be configured at HttpWebHandlerAdapter/WebHttpHandlerBuilder level. An implementation based on https://github.com/synchronoss/nio-multipart is provided. The parsing of the request is non-blocking, but consuming the parts is still blocking since NIO Multipart is currently using InputStream and OutputStream. See https://github.com/synchronoss/nio-multipart/issues/4 for more details about this limitation expected to be fixed in an upcoming release. Issue: SPR-14546 --- build.gradle | 1 + .../multipart/reactive/MultipartResolver.java | 65 ++++++ .../web/multipart/reactive/Part.java | 70 ++++++ .../reactive/nio/NioMultipartResolver.java | 167 ++++++++++++++ .../web/multipart/reactive/nio/NioPart.java | 141 ++++++++++++ .../web/server/ServerWebExchange.java | 19 ++ .../server/ServerWebExchangeDecorator.java | 12 + .../adapter/DefaultServerWebExchange.java | 31 +++ .../server/adapter/HttpWebHandlerAdapter.java | 23 +- .../server/adapter/WebHttpHandlerBuilder.java | 18 ++ .../reactive/MultipartIntegrationTests.java | 164 ++++++++++++++ .../reactive/NioMultipartResolverTests.java | 206 ++++++++++++++++++ .../web/multipart/reactive/foo.txt | 2 + 13 files changed, 918 insertions(+), 1 deletion(-) create mode 100644 spring-web/src/main/java/org/springframework/web/multipart/reactive/MultipartResolver.java create mode 100644 spring-web/src/main/java/org/springframework/web/multipart/reactive/Part.java create mode 100644 spring-web/src/main/java/org/springframework/web/multipart/reactive/nio/NioMultipartResolver.java create mode 100644 spring-web/src/main/java/org/springframework/web/multipart/reactive/nio/NioPart.java create mode 100644 spring-web/src/test/java/org/springframework/http/server/reactive/MultipartIntegrationTests.java create mode 100644 spring-web/src/test/java/org/springframework/web/multipart/reactive/NioMultipartResolverTests.java create mode 100644 spring-web/src/test/resources/org/springframework/web/multipart/reactive/foo.txt diff --git a/build.gradle b/build.gradle index dbd62d991401..f603973b5d1d 100644 --- a/build.gradle +++ b/build.gradle @@ -777,6 +777,7 @@ project("spring-web") { optional("javax.xml.bind:jaxb-api:${jaxbVersion}") optional("javax.xml.ws:jaxws-api:${jaxwsVersion}") optional("javax.mail:javax.mail-api:${javamailVersion}") + optional("org.synchronoss.cloud:nio-multipart-parser:1.0.0") testCompile(project(":spring-context-support")) // for JafMediaTypeFactory testCompile("io.projectreactor.addons:reactor-test:1.0.0.BUILD-SNAPSHOT") testCompile("org.apache.taglibs:taglibs-standard-jstlel:1.2.1") { diff --git a/spring-web/src/main/java/org/springframework/web/multipart/reactive/MultipartResolver.java b/spring-web/src/main/java/org/springframework/web/multipart/reactive/MultipartResolver.java new file mode 100644 index 000000000000..1f86ecb84fca --- /dev/null +++ b/spring-web/src/main/java/org/springframework/web/multipart/reactive/MultipartResolver.java @@ -0,0 +1,65 @@ +/* + * Copyright 2002-2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.web.multipart.reactive; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import org.springframework.http.server.reactive.ServerHttpRequest; +import org.springframework.util.MultiValueMap; +import org.springframework.web.multipart.MultipartException; + +/** + * A strategy interface for multipart resolution in accordance + * with RFC 7578. + * Implementations are typically usable both within an application context + * and standalone. + * + * @author Sebastien Deleuze + * @since 5.0 + */ +public interface MultipartResolver { + + /** + * Determine if the given request contains multipart content. + *

Will typically check for content type "multipart/form-data", but the actually + * accepted requests might depend on the capabilities of the resolver implementation. + * @param request the servlet request to be evaluated + * @return whether the request contains multipart content + */ + boolean isMultipart(ServerHttpRequest request); + + /** + * Parse the given HTTP request into a stream of parts as they are received. + * @param request the servlet request (must be with a multipart content type) + * @return a {@link Flux} containing the various parts if the current request is a multipart + * one, or one emitting a {@link MultipartException} if the request is not multipart, or if + * implementation-specific problems are encountered (such as exceeding file size limits). + */ + Flux resolveParts(ServerHttpRequest request); + + /** + * Lazily parse all parts of a multipart request. + * @param request the servlet request (must be with a multipart content type) + * @return a {@link Flux} containing a map with part name as key and the various parts + * as values if the current request is a multipart one, or one emitting a + * {@link MultipartException} if the request is not multipart, or if + * implementation-specific problems are encountered (such as exceeding file size limits). + */ + Mono> resolveAllParts(ServerHttpRequest request); + +} diff --git a/spring-web/src/main/java/org/springframework/web/multipart/reactive/Part.java b/spring-web/src/main/java/org/springframework/web/multipart/reactive/Part.java new file mode 100644 index 000000000000..2d8fa7fa6604 --- /dev/null +++ b/spring-web/src/main/java/org/springframework/web/multipart/reactive/Part.java @@ -0,0 +1,70 @@ +/* + * Copyright 2002-2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.web.multipart.reactive; + +import java.io.File; +import java.util.Optional; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.http.HttpHeaders; + +/** + * A representation of a part received in a multipart request. Could contain a file, the + * string or json value of a parameter. + * + * @author Sebastien Deleuze + * @since 5.0 + */ +public interface Part { + + /** + * @return the headers of this part + */ + HttpHeaders getHeaders(); + + /** + * @return the name of the parameter in the multipart form + */ + String getName(); + + /** + * @return optionally the filename if the part contains a file + */ + Optional getFilename(); + + /** + * @return the content of the part as a String + */ + Mono getValue(); + + /** + * @return the content of the part as a stream of bytes + */ + Flux getContent(); + + /** + * Transfer the file contained in this part to the specified destination. + * @param dest the destination file + * @return a {@link Mono} that indicates completion of the file transfer or an error, + * for example an {@link IllegalStateException} if the part does not contain a file + */ + Mono transferTo(File dest); + +} diff --git a/spring-web/src/main/java/org/springframework/web/multipart/reactive/nio/NioMultipartResolver.java b/spring-web/src/main/java/org/springframework/web/multipart/reactive/nio/NioMultipartResolver.java new file mode 100644 index 000000000000..9f0cbd6ce943 --- /dev/null +++ b/spring-web/src/main/java/org/springframework/web/multipart/reactive/nio/NioMultipartResolver.java @@ -0,0 +1,167 @@ +/* + * Copyright 2002-2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.web.multipart.reactive.nio; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.function.Consumer; + +import org.synchronoss.cloud.nio.multipart.Multipart; +import org.synchronoss.cloud.nio.multipart.MultipartContext; +import org.synchronoss.cloud.nio.multipart.MultipartUtils; +import org.synchronoss.cloud.nio.multipart.NioMultipartParser; +import org.synchronoss.cloud.nio.multipart.NioMultipartParserListener; +import org.synchronoss.cloud.nio.stream.storage.StreamStorage; +import reactor.core.Exceptions; +import reactor.core.publisher.Flux; +import reactor.core.publisher.FluxSink; +import reactor.core.publisher.Mono; + +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpMethod; +import org.springframework.http.MediaType; +import org.springframework.http.server.reactive.ServerHttpRequest; +import org.springframework.util.LinkedMultiValueMap; +import org.springframework.util.MultiValueMap; +import org.springframework.web.multipart.MultipartException; +import org.springframework.web.multipart.reactive.Part; +import org.springframework.web.multipart.reactive.MultipartResolver; + +/** + * {@link MultipartResolver} implementation based on the NIO Multipart library. + * + * The parsing of the request is non-blocking, but consuming the parts is not yet + * since NIO Multipart is currently using {@code InputStream} / {@code OutputStream}. + * See this GitHub issue + * for more details about this limitation expected to be fixed in an upcoming release. + * + * @author Sebastien Deleuze + * @since 5.0 + * @see NIO Multipart + */ +public class NioMultipartResolver implements MultipartResolver { + + @Override + public boolean isMultipart(ServerHttpRequest request) { + if (!HttpMethod.POST.equals(request.getMethod())) { + return false; + } + MediaType contentType = request.getHeaders().getContentType(); + return (contentType != null && MultipartUtils.isMultipart(contentType.toString())); + } + + @Override + public Flux resolveParts(ServerHttpRequest request) { + if (!isMultipart(request)) { + return Flux.empty(); + } + return Flux.create(new NioMultipartConsumer(request)); + } + + @Override + public Mono> resolveAllParts(ServerHttpRequest request) { + return resolveParts(request) + .collectList() + .map(parts -> { + MultiValueMap partsMap = new LinkedMultiValueMap<>(); + parts.forEach(part -> partsMap.add(part.getName(), part)); + return partsMap; + }); + } + + private static class NioMultipartConsumer implements Consumer> { + + private final ServerHttpRequest request; + + + public NioMultipartConsumer(ServerHttpRequest request) { + this.request = request; + } + + + @Override + public void accept(FluxSink emitter) { + HttpHeaders headers = request.getHeaders(); + MultipartContext context = new MultipartContext( + headers.getContentType().toString(), + Math.toIntExact(headers.getContentLength()), + headers.getFirst(HttpHeaders.ACCEPT_CHARSET)); + NioMultipartParser parser = Multipart.multipart(context).forNIO(new NioMultipartParserListener() { + @Override + public void onPartFinished(StreamStorage streamStorage, Map> headersFromPart) { + HttpHeaders headers = new HttpHeaders(); + headers.putAll(headersFromPart); + emitter.next(new NioPart(headers, streamStorage)); + } + + @Override + public void onFormFieldPartFinished(String fieldName, String fieldValue, Map> headersFromPart) { + HttpHeaders headers = new HttpHeaders(); + headers.putAll(headersFromPart); + emitter.next(new NioPart(headers, fieldValue)); + } + + @Override + public void onAllPartsFinished() { + emitter.complete(); + } + + @Override + public void onNestedPartStarted(Map> headersFromParentPart) { + } + + @Override + public void onNestedPartFinished() { + } + + @Override + public void onError(String message, Throwable cause) { + emitter.error(new MultipartException(message, cause)); + } + }); + + request.getBody().subscribe(buffer -> { + byte[] resultBytes = new byte[buffer.readableByteCount()]; + buffer.read(resultBytes); + try { + parser.write(resultBytes); + } + catch (IOException ex) { + throw Exceptions.bubble(ex); + } + + }, (e) -> { + try { + parser.close(); + } + catch (IOException ex) { + throw Exceptions.bubble(e); + } + }, () -> { + try { + parser.close(); + } + catch (IOException ex) { + throw Exceptions.bubble(ex); + } + }); + + } + } + +} diff --git a/spring-web/src/main/java/org/springframework/web/multipart/reactive/nio/NioPart.java b/spring-web/src/main/java/org/springframework/web/multipart/reactive/nio/NioPart.java new file mode 100644 index 000000000000..1543862b4e83 --- /dev/null +++ b/spring-web/src/main/java/org/springframework/web/multipart/reactive/nio/NioPart.java @@ -0,0 +1,141 @@ +/* + * Copyright 2002-2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.web.multipart.reactive.nio; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.channels.Channels; +import java.nio.channels.FileChannel; +import java.nio.channels.ReadableByteChannel; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.Optional; + +import org.synchronoss.cloud.nio.multipart.MultipartUtils; +import org.synchronoss.cloud.nio.stream.storage.StreamStorage; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.io.buffer.DataBufferFactory; +import org.springframework.core.io.buffer.DataBufferUtils; +import org.springframework.core.io.buffer.DefaultDataBufferFactory; +import org.springframework.http.HttpHeaders; +import org.springframework.http.MediaType; +import org.springframework.util.StreamUtils; +import org.springframework.web.multipart.reactive.Part; + +/** + * {@link Part} implementation based on the NIO Multipart library. + * + * @author Sebastien Deleuze + * @since 5.0 + * @see NIO Multipart + */ +public class NioPart implements Part { + + private final HttpHeaders headers; + + private final StreamStorage streamStorage; + + private final String value; + + private final DataBufferFactory bufferFactory = new DefaultDataBufferFactory(); + + + public NioPart(HttpHeaders headers, StreamStorage streamStorage) { + this.headers = headers; + this.streamStorage = streamStorage; + this.value = null; + } + + public NioPart(HttpHeaders headers, String value) { + this.headers = headers; + this.streamStorage = null; + this.value = value; + } + + + @Override + public String getName() { + return MultipartUtils.getFieldName(headers); + } + + @Override + public HttpHeaders getHeaders() { + return this.headers; + } + + @Override + public Optional getFilename() { + return Optional.ofNullable(MultipartUtils.getFileName(this.headers)); + } + + @Override + public Mono transferTo(File dest) { + if (!getFilename().isPresent()) { + return Mono.error(new IllegalStateException("The part does not contain a file.")); + } + try { + InputStream inputStream = this.streamStorage.getInputStream(); + // Get a FileChannel when possible in order to use zero copy mechanism + ReadableByteChannel inChannel = Channels.newChannel(inputStream); + FileChannel outChannel = new FileOutputStream(dest).getChannel(); + // NIO Multipart has previously limited the size of the content + long count = (inChannel instanceof FileChannel ? ((FileChannel)inChannel).size() : Long.MAX_VALUE); + long result = outChannel.transferFrom(inChannel, 0, count); + if (result < count) { + return Mono.error(new IOException( + "Could only write " + result + " out of " + count + " bytes")); + } + } + catch (IOException ex) { + return Mono.error(ex); + } + return Mono.empty(); + } + + @Override + public Mono getValue() { + if (this.value != null) { + return Mono.just(this.value); + } + MediaType contentType = this.headers.getContentType(); + Charset charset = (contentType.getCharset() == null ? StandardCharsets.UTF_8 : contentType.getCharset()); + try { + return Mono.just(StreamUtils.copyToString(this.streamStorage.getInputStream(), charset)); + } + catch (IOException e) { + return Mono.error(new IllegalStateException("Error while reading part content as a string", e)); + } + } + + @Override + // TODO Make this method fully non-blocking when NIO Multipart will allow it, see https://github.com/synchronoss/nio-multipart/issues/4 + public Flux getContent() { + if (this.value != null) { + DataBuffer buffer = this.bufferFactory.allocateBuffer(this.value.length()); + buffer.write(this.value.getBytes()); + return Flux.just(buffer); + } + InputStream inputStream = this.streamStorage.getInputStream(); + return DataBufferUtils.read(inputStream, this.bufferFactory, 4096); + } + +} diff --git a/spring-web/src/main/java/org/springframework/web/server/ServerWebExchange.java b/spring-web/src/main/java/org/springframework/web/server/ServerWebExchange.java index 3a48d41cba34..d6b53d18919e 100644 --- a/spring-web/src/main/java/org/springframework/web/server/ServerWebExchange.java +++ b/spring-web/src/main/java/org/springframework/web/server/ServerWebExchange.java @@ -21,10 +21,14 @@ import java.util.Map; import java.util.Optional; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import org.springframework.http.server.reactive.ServerHttpRequest; import org.springframework.http.server.reactive.ServerHttpResponse; +import org.springframework.util.MultiValueMap; +import org.springframework.web.multipart.MultipartException; +import org.springframework.web.multipart.reactive.Part; /** * Contract for an HTTP request-response interaction. Provides access to the HTTP @@ -118,4 +122,19 @@ public interface ServerWebExchange { */ boolean checkNotModified(String etag, Instant lastModified); + /** + * Return a stream of parts parsed as they are received. + * @return a {@link Flux} containing the various parts if the current request is a multipart + * one or one emitting a {@link MultipartException} if the request is not multipart, or if + * implementation-specific problems are encountered (such as exceeding file size limits). + */ + Flux getParts(); + + /** + * Lazily parse all parts of a multipart request. + * @return a {@link Mono} with a map containing all the parts parsed, with the name as key, + * or a {@link Mono} emitting an error the current request is not a multipart one. + */ + Mono> getAllParts(); + } diff --git a/spring-web/src/main/java/org/springframework/web/server/ServerWebExchangeDecorator.java b/spring-web/src/main/java/org/springframework/web/server/ServerWebExchangeDecorator.java index 9be776cda11b..36cb49bfe5a6 100644 --- a/spring-web/src/main/java/org/springframework/web/server/ServerWebExchangeDecorator.java +++ b/spring-web/src/main/java/org/springframework/web/server/ServerWebExchangeDecorator.java @@ -20,11 +20,14 @@ import java.util.Map; import java.util.Optional; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import org.springframework.http.server.reactive.ServerHttpRequest; import org.springframework.http.server.reactive.ServerHttpResponse; import org.springframework.util.Assert; +import org.springframework.util.MultiValueMap; +import org.springframework.web.multipart.reactive.Part; /** * Wraps another {@link ServerWebExchange} and delegates all methods to it. @@ -101,6 +104,15 @@ public boolean checkNotModified(String etag, Instant lastModified) { return this.getDelegate().checkNotModified(etag, lastModified); } + @Override + public Flux getParts() { + return this.getDelegate().getParts(); + } + + @Override + public Mono> getAllParts() { + return this.getDelegate().getAllParts(); + } @Override public String toString() { diff --git a/spring-web/src/main/java/org/springframework/web/server/adapter/DefaultServerWebExchange.java b/spring-web/src/main/java/org/springframework/web/server/adapter/DefaultServerWebExchange.java index a78b68f792f2..2f7886c1b25c 100644 --- a/spring-web/src/main/java/org/springframework/web/server/adapter/DefaultServerWebExchange.java +++ b/spring-web/src/main/java/org/springframework/web/server/adapter/DefaultServerWebExchange.java @@ -25,6 +25,7 @@ import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import org.springframework.http.HttpHeaders; @@ -33,7 +34,10 @@ import org.springframework.http.server.reactive.ServerHttpRequest; import org.springframework.http.server.reactive.ServerHttpResponse; import org.springframework.util.Assert; +import org.springframework.util.MultiValueMap; import org.springframework.util.StringUtils; +import org.springframework.web.multipart.reactive.MultipartResolver; +import org.springframework.web.multipart.reactive.Part; import org.springframework.web.server.ServerWebExchange; import org.springframework.web.server.WebSession; import org.springframework.web.server.session.WebSessionManager; @@ -57,11 +61,21 @@ public class DefaultServerWebExchange implements ServerWebExchange { private final Mono sessionMono; + private final MultipartResolver multipartResolver; + + private final Mono> allParts; + private volatile boolean notModified; public DefaultServerWebExchange(ServerHttpRequest request, ServerHttpResponse response, WebSessionManager sessionManager) { + this(request, response, sessionManager, null); + } + + + public DefaultServerWebExchange(ServerHttpRequest request, ServerHttpResponse response, + WebSessionManager sessionManager, MultipartResolver multipartResolver) { Assert.notNull(request, "'request' is required"); Assert.notNull(response, "'response' is required"); @@ -69,6 +83,10 @@ public DefaultServerWebExchange(ServerHttpRequest request, ServerHttpResponse re this.request = request; this.response = response; this.sessionMono = sessionManager.getSession(this).cache(); + this.multipartResolver = multipartResolver; + this.allParts = (multipartResolver == null ? + Mono.error(new UnsupportedOperationException("No MultipartResolver defined")) : + multipartResolver.resolveAllParts(request).cache()); } @@ -231,4 +249,17 @@ private boolean validateIfModifiedSince(Instant lastModified) { return true; } + @Override + public Flux getParts() { + if (this.multipartResolver == null) { + return Flux.error(new UnsupportedOperationException("No MultipartResolver defined")); + } + return this.multipartResolver.resolveParts(this.request); + } + + @Override + public Mono> getAllParts() { + return this.allParts; + } + } diff --git a/spring-web/src/main/java/org/springframework/web/server/adapter/HttpWebHandlerAdapter.java b/spring-web/src/main/java/org/springframework/web/server/adapter/HttpWebHandlerAdapter.java index b218a78312f2..d7775084d501 100644 --- a/spring-web/src/main/java/org/springframework/web/server/adapter/HttpWebHandlerAdapter.java +++ b/spring-web/src/main/java/org/springframework/web/server/adapter/HttpWebHandlerAdapter.java @@ -25,6 +25,7 @@ import org.springframework.http.server.reactive.ServerHttpRequest; import org.springframework.http.server.reactive.ServerHttpResponse; import org.springframework.util.Assert; +import org.springframework.web.multipart.reactive.MultipartResolver; import org.springframework.web.server.ServerWebExchange; import org.springframework.web.server.WebHandler; import org.springframework.web.server.handler.WebHandlerDecorator; @@ -38,6 +39,7 @@ * then invokes the target {@code WebHandler}. * * @author Rossen Stoyanchev + * @author Sebastien Deleuze * @since 5.0 */ public class HttpWebHandlerAdapter extends WebHandlerDecorator implements HttpHandler { @@ -46,6 +48,8 @@ public class HttpWebHandlerAdapter extends WebHandlerDecorator implements HttpHa private WebSessionManager sessionManager = new DefaultWebSessionManager(); + private MultipartResolver multipartResolver; + public HttpWebHandlerAdapter(WebHandler delegate) { super(delegate); @@ -71,6 +75,23 @@ public WebSessionManager getSessionManager() { return this.sessionManager; } + /** + * Configure a {@link MultipartResolver} to resolve parts of a multipart request. + *

By default, no {@link MultipartResolver} is set. + * @param multipartResolver the multipart resolver to use + */ + public void setMultipartResolver(MultipartResolver multipartResolver) { + Assert.notNull(multipartResolver, "MultipartResolver must not be null"); + this.multipartResolver = multipartResolver; + } + + /** + * Return the configured {@link MultipartResolver}. + */ + public MultipartResolver getMultipartResolver() { + return this.multipartResolver; + } + @Override public Mono handle(ServerHttpRequest request, ServerHttpResponse response) { @@ -87,7 +108,7 @@ public Mono handle(ServerHttpRequest request, ServerHttpResponse response) } protected ServerWebExchange createExchange(ServerHttpRequest request, ServerHttpResponse response) { - return new DefaultServerWebExchange(request, response, this.sessionManager); + return new DefaultServerWebExchange(request, response, this.sessionManager, this.multipartResolver); } } diff --git a/spring-web/src/main/java/org/springframework/web/server/adapter/WebHttpHandlerBuilder.java b/spring-web/src/main/java/org/springframework/web/server/adapter/WebHttpHandlerBuilder.java index 70bf4305192d..5fdd890f2a63 100644 --- a/spring-web/src/main/java/org/springframework/web/server/adapter/WebHttpHandlerBuilder.java +++ b/spring-web/src/main/java/org/springframework/web/server/adapter/WebHttpHandlerBuilder.java @@ -22,6 +22,7 @@ import org.springframework.http.server.reactive.HttpHandler; import org.springframework.util.Assert; import org.springframework.util.ObjectUtils; +import org.springframework.web.multipart.reactive.MultipartResolver; import org.springframework.web.server.ServerWebExchange; import org.springframework.web.server.WebExceptionHandler; import org.springframework.web.server.WebFilter; @@ -60,6 +61,8 @@ public class WebHttpHandlerBuilder { private WebSessionManager sessionManager; + private MultipartResolver multipartResolver; + /** * Private constructor. @@ -114,6 +117,18 @@ public WebHttpHandlerBuilder sessionManager(WebSessionManager sessionManager) { return this; } + /** + * Configure the {@link MultipartResolver} to set on the + * {@link ServerWebExchange WebServerExchange} + * created for each HTTP request. + * @param multipartResolver the multipart resolver + * @see HttpWebHandlerAdapter#setMultipartResolver(MultipartResolver) + */ + public WebHttpHandlerBuilder multipartResolver(MultipartResolver multipartResolver) { + this.multipartResolver = multipartResolver; + return this; + } + /** * Build the {@link HttpHandler}. */ @@ -129,6 +144,9 @@ public HttpHandler build() { if (this.sessionManager != null) { httpHandler.setSessionManager(this.sessionManager); } + if (this.multipartResolver != null) { + httpHandler.setMultipartResolver(this.multipartResolver); + } return httpHandler; } diff --git a/spring-web/src/test/java/org/springframework/http/server/reactive/MultipartIntegrationTests.java b/spring-web/src/test/java/org/springframework/http/server/reactive/MultipartIntegrationTests.java new file mode 100644 index 000000000000..167c97328865 --- /dev/null +++ b/spring-web/src/test/java/org/springframework/http/server/reactive/MultipartIntegrationTests.java @@ -0,0 +1,164 @@ +/* + * Copyright 2002-2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.http.server.reactive; + +import java.net.URI; +import java.util.Optional; +import java.util.logging.Level; + +import static org.junit.Assert.*; +import org.junit.Test; +import reactor.core.publisher.Mono; + +import org.springframework.core.io.ClassPathResource; +import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.http.HttpEntity; +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpStatus; +import org.springframework.http.MediaType; +import org.springframework.http.RequestEntity; +import org.springframework.http.ResponseEntity; +import org.springframework.util.LinkedMultiValueMap; +import org.springframework.util.MultiValueMap; +import org.springframework.web.client.RestTemplate; +import org.springframework.web.multipart.reactive.Part; +import org.springframework.web.multipart.reactive.nio.NioMultipartResolver; +import org.springframework.web.server.ServerWebExchange; +import org.springframework.web.server.WebHandler; +import org.springframework.web.server.adapter.HttpWebHandlerAdapter; + +public class MultipartIntegrationTests extends AbstractHttpHandlerIntegrationTests { + + @Override + protected HttpHandler createHttpHandler() { + HttpWebHandlerAdapter handler = new HttpWebHandlerAdapter(new CheckRequestHandler()); + handler.setMultipartResolver(new NioMultipartResolver()); + return handler; + } + + @Test + public void getParts() throws Exception { + RestTemplate restTemplate = new RestTemplate(); + RequestEntity> request = RequestEntity + .post(new URI("http://localhost:" + port + "/parts")) + .contentType(MediaType.MULTIPART_FORM_DATA) + .body(generateBody()); + ResponseEntity response = restTemplate.exchange(request, Void.class); + assertEquals(HttpStatus.OK, response.getStatusCode()); + } + + @Test + public void getAllParts() throws Exception { + RestTemplate restTemplate = new RestTemplate(); + RequestEntity> request = RequestEntity + .post(new URI("http://localhost:" + port + "/all-parts")) + .contentType(MediaType.MULTIPART_FORM_DATA) + .body(generateBody()); + ResponseEntity response = restTemplate.exchange(request, Void.class); + assertEquals(HttpStatus.OK, response.getStatusCode()); + } + + private MultiValueMap generateBody() { + HttpHeaders fooHeaders = new HttpHeaders(); + fooHeaders.setContentType(MediaType.TEXT_PLAIN); + ClassPathResource fooResource = new ClassPathResource("org/springframework/web/multipart/reactive/foo.txt"); + HttpEntity fooPart = new HttpEntity<>(fooResource, fooHeaders); + HttpEntity barPart = new HttpEntity<>("bar"); + MultiValueMap parts = new LinkedMultiValueMap<>(); + parts.add("fooPart", fooPart); + parts.add("barPart", barPart); + return parts; + } + + public static class CheckRequestHandler implements WebHandler { + + @Override + public Mono handle(ServerWebExchange exchange) { + + if (exchange.getRequest().getURI().getPath().equals("/parts")) { + return assertGetParts(exchange); + } + else if (exchange.getRequest().getURI().getPath().equals("/all-parts")) { + return assertGetAllParts(exchange); + } + return Mono.error(new AssertionError()); + } + + private Mono assertGetParts(ServerWebExchange exchange) { + return exchange.getParts() + .log("reactor", Level.SEVERE) + .doOnNext(part -> { + if (part.getName().equals("fooPart")) { + assertEquals("fooPart", part.getName()); + Optional filename = part.getFilename(); + assertTrue(filename.isPresent()); + assertEquals("foo.txt", filename.get()); + DataBuffer buffer = part + .getContent() + .reduce((s1, s2) -> s1.write(s2)) + .block(); + assertEquals(12, buffer.readableByteCount()); + byte[] byteContent = new byte[12]; + buffer.read(byteContent); + assertEquals("Lorem\nIpsum\n", new String(byteContent)); + } + else if (part.getName().equals("barPart")) { + assertEquals("barPart", part.getName()); + Optional filename = part.getFilename(); + assertFalse(filename.isPresent()); + assertEquals("bar", part.getValue().block()); + } + else { + fail(); + } + }) + .then(); + } + + private Mono assertGetAllParts(ServerWebExchange exchange) { + return exchange + .getAllParts() + .doOnNext(parts -> { + assertEquals(2, parts.size()); + assertTrue(parts.containsKey("fooPart")); + + Part part = parts.getFirst("fooPart"); + assertEquals("fooPart", part.getName()); + Optional filename = part.getFilename(); + assertTrue(filename.isPresent()); + assertEquals("foo.txt", filename.get()); + DataBuffer buffer = part + .getContent() + .reduce((s1, s2) -> s1.write(s2)) + .block(); + assertEquals(12, buffer.readableByteCount()); + byte[] byteContent = new byte[12]; + buffer.read(byteContent); + assertEquals("Lorem\nIpsum\n", new String(byteContent)); + + assertTrue(parts.containsKey("barPart")); + part = parts.getFirst("barPart"); + assertEquals("barPart", part.getName()); + filename = part.getFilename(); + assertFalse(filename.isPresent()); + assertEquals("bar", part.getValue().block()); + }) + .then(); + } + } + +} diff --git a/spring-web/src/test/java/org/springframework/web/multipart/reactive/NioMultipartResolverTests.java b/spring-web/src/test/java/org/springframework/web/multipart/reactive/NioMultipartResolverTests.java new file mode 100644 index 000000000000..cabb1c6ef344 --- /dev/null +++ b/spring-web/src/test/java/org/springframework/web/multipart/reactive/NioMultipartResolverTests.java @@ -0,0 +1,206 @@ +/* + * Copyright 2002-2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.web.multipart.reactive; + +import java.io.IOException; +import java.util.Optional; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import org.junit.Test; +import reactor.core.publisher.Flux; +import reactor.test.subscriber.ScriptedSubscriber; + +import org.springframework.core.io.ClassPathResource; +import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.http.HttpEntity; +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpMethod; +import org.springframework.http.MediaType; +import org.springframework.http.MockHttpOutputMessage; +import org.springframework.http.converter.FormHttpMessageConverter; +import org.springframework.http.server.reactive.ServerHttpRequest; +import org.springframework.mock.http.server.reactive.test.MockServerHttpRequest; +import org.springframework.util.LinkedMultiValueMap; +import org.springframework.util.MultiValueMap; +import org.springframework.web.multipart.reactive.nio.NioMultipartResolver; + +/** + * @author Sebastien Deleuze + */ +public class NioMultipartResolverTests { + + @Test + public void isMultipartOnValidMultipartRequest() { + NioMultipartResolver resolver = new NioMultipartResolver(); + MockServerHttpRequest request = new MockServerHttpRequest(HttpMethod.POST, "/foo"); + request.setHeader(HttpHeaders.CONTENT_TYPE.toString(), MediaType.MULTIPART_FORM_DATA_VALUE); + assertTrue(resolver.isMultipart(request)); + } + + @Test + public void isMultipartOnRequestWithInvalidMethod() { + NioMultipartResolver resolver = new NioMultipartResolver(); + MockServerHttpRequest request = new MockServerHttpRequest(HttpMethod.GET, "/foo"); + request.setHeader(HttpHeaders.CONTENT_TYPE.toString(), MediaType.MULTIPART_FORM_DATA_VALUE); + assertFalse(resolver.isMultipart(request)); + } + + @Test + public void resolvePartsOnRequestWithInvalidMethod() { + NioMultipartResolver resolver = new NioMultipartResolver(); + MockServerHttpRequest request = new MockServerHttpRequest(HttpMethod.GET, "/foo"); + request.setHeader(HttpHeaders.CONTENT_TYPE.toString(), MediaType.MULTIPART_FORM_DATA_VALUE); + ScriptedSubscriber.create() + .expectNextCount(0) + .expectComplete() + .verify(resolver.resolveParts(request)); + } + + @Test + public void resolveAllPartsOnRequestWithInvalidMethod() { + NioMultipartResolver resolver = new NioMultipartResolver(); + MockServerHttpRequest request = new MockServerHttpRequest(HttpMethod.GET, "/foo"); + request.setHeader(HttpHeaders.CONTENT_TYPE.toString(), MediaType.MULTIPART_FORM_DATA_VALUE); + ScriptedSubscriber.>create() + .expectNextWith(value -> value.size() == 0) + .expectComplete() + .verify(resolver.resolveAllParts(request)); + } + + @Test + public void isMultipartOnRequestWithInvalidContentType() { + NioMultipartResolver resolver = new NioMultipartResolver(); + MockServerHttpRequest request = new MockServerHttpRequest(HttpMethod.POST, "/foo"); + request.setHeader(HttpHeaders.CONTENT_TYPE.toString(), MediaType.TEXT_HTML_VALUE); + assertFalse(resolver.isMultipart(request)); + } + + @Test + public void resolvePartsOnRequestWithInvalidContentType() { + NioMultipartResolver resolver = new NioMultipartResolver(); + MockServerHttpRequest request = new MockServerHttpRequest(HttpMethod.POST, "/foo"); + request.setHeader(HttpHeaders.CONTENT_TYPE.toString(), MediaType.TEXT_HTML_VALUE); + ScriptedSubscriber.create() + .expectNextCount(0) + .expectComplete() + .verify(resolver.resolveParts(request)); + } + + @Test + public void resolveAllPartsOnRequestWithInvalidContentType() { + NioMultipartResolver resolver = new NioMultipartResolver(); + MockServerHttpRequest request = new MockServerHttpRequest(HttpMethod.POST, "/foo"); + request.setHeader(HttpHeaders.CONTENT_TYPE.toString(), MediaType.TEXT_HTML_VALUE); + ScriptedSubscriber.>create() + .expectNextWith(value -> value.size() == 0) + .expectComplete() + .verify(resolver.resolveAllParts(request)); + } + + @Test + public void resolveParts() throws IOException { + ServerHttpRequest request = generateMultipartRequest(); + MultipartResolver multipartResolver = new NioMultipartResolver(); + Flux partStream = multipartResolver.resolveParts(request); + ScriptedSubscriber.create() + .consumeNextWith(part -> { + assertEquals("fooPart", part.getName()); + Optional filename = part.getFilename(); + assertTrue(filename.isPresent()); + assertEquals("foo.txt", filename.get()); + DataBuffer buffer = part + .getContent() + .reduce((s1, s2) -> s1.write(s2)) + .block(); + assertEquals(12, buffer.readableByteCount()); + byte[] byteContent = new byte[12]; + buffer.read(byteContent); + assertEquals("Lorem\nIpsum\n", new String(byteContent)); + assertEquals("Lorem\nIpsum\n", part.getValue().block()); + + }) + .consumeNextWith(part -> { + assertEquals("barPart", part.getName()); + Optional filename = part.getFilename(); + assertFalse(filename.isPresent()); + assertEquals("bar", part.getValue().block()); + DataBuffer buffer = part + .getContent() + .reduce((s1, s2) -> s1.write(s2)) + .block(); + assertEquals(3, buffer.readableByteCount()); + byte[] byteContent = new byte[3]; + buffer.read(byteContent); + assertEquals("bar", new String(byteContent)); + }) + .expectComplete() + .verify(partStream); + } + + @Test + public void resolveAllParts() throws IOException { + ServerHttpRequest request = generateMultipartRequest(); + MultipartResolver multipartResolver = new NioMultipartResolver(); + MultiValueMap parts = multipartResolver.resolveAllParts(request).block(); + assertEquals(2, parts.size()); + + assertTrue(parts.containsKey("fooPart")); + Part part = parts.getFirst("fooPart"); + assertEquals("fooPart", part.getName()); + Optional filename = part.getFilename(); + assertTrue(filename.isPresent()); + assertEquals("foo.txt", filename.get()); + DataBuffer buffer = part + .getContent() + .reduce((s1, s2) -> s1.write(s2)) + .block(); + assertEquals(12, buffer.readableByteCount()); + byte[] byteContent = new byte[12]; + buffer.read(byteContent); + assertEquals("Lorem\nIpsum\n", new String(byteContent)); + + assertTrue(parts.containsKey("barPart")); + part = parts.getFirst("barPart"); + assertEquals("barPart", part.getName()); + filename = part.getFilename(); + assertFalse(filename.isPresent()); + assertEquals("bar", part.getValue().block()); + } + + private ServerHttpRequest generateMultipartRequest() throws IOException { + HttpHeaders fooHeaders = new HttpHeaders(); + fooHeaders.setContentType(MediaType.TEXT_PLAIN); + ClassPathResource fooResource = new ClassPathResource("org/springframework/web/multipart/reactive/foo.txt"); + HttpEntity fooPart = new HttpEntity<>(fooResource, fooHeaders); + HttpEntity barPart = new HttpEntity<>("bar"); + FormHttpMessageConverter converter = new FormHttpMessageConverter(); + MockHttpOutputMessage outputMessage = new MockHttpOutputMessage(); + MultiValueMap parts = new LinkedMultiValueMap<>(); + parts.add("fooPart", fooPart); + parts.add("barPart", barPart); + converter.write(parts, MediaType.MULTIPART_FORM_DATA, outputMessage); + byte[] content = outputMessage.getBodyAsBytes(); + MockServerHttpRequest request = new MockServerHttpRequest(HttpMethod.POST, "/foo"); + request.getHeaders().setContentType(outputMessage.getHeaders().getContentType()); + request.getHeaders().setContentLength(content.length); + request.setBody(new String(content)); + return request; + } + +} diff --git a/spring-web/src/test/resources/org/springframework/web/multipart/reactive/foo.txt b/spring-web/src/test/resources/org/springframework/web/multipart/reactive/foo.txt new file mode 100644 index 000000000000..08123a7d4d89 --- /dev/null +++ b/spring-web/src/test/resources/org/springframework/web/multipart/reactive/foo.txt @@ -0,0 +1,2 @@ +Lorem +Ipsum