Skip to content

Commit

Permalink
Add reactive multipart request support
Browse files Browse the repository at this point in the history
This commit introduces reactive multipart support by adding
2 new methods to ServerWebExchange:
 - Flux<Part> getParts()
 - Mono<MultiValueMap<String, Part>> getAllParts()

The first one emits parts as a stream as they are received while the
second lazily parse all parts of a reactive request.

These operations are performed using a reactive MultipartResolver
which can be configured at HttpWebHandlerAdapter/WebHttpHandlerBuilder
level.

An implementation based on https://github.com/synchronoss/nio-multipart
is provided. The parsing of the request is non-blocking, but consuming the
parts is still blocking since NIO Multipart is currently using InputStream
and OutputStream. See synchronoss/nio-multipart#4
for more details about this limitation expected to be fixed in an upcoming
release.

Issue: SPR-14546
  • Loading branch information
sdeleuze committed Oct 25, 2016
1 parent 2075932 commit 6a38b53
Show file tree
Hide file tree
Showing 13 changed files with 918 additions and 1 deletion.
1 change: 1 addition & 0 deletions build.gradle
Expand Up @@ -777,6 +777,7 @@ project("spring-web") {
optional("javax.xml.bind:jaxb-api:${jaxbVersion}")
optional("javax.xml.ws:jaxws-api:${jaxwsVersion}")
optional("javax.mail:javax.mail-api:${javamailVersion}")
optional("org.synchronoss.cloud:nio-multipart-parser:1.0.0")
testCompile(project(":spring-context-support")) // for JafMediaTypeFactory
testCompile("io.projectreactor.addons:reactor-test:1.0.0.BUILD-SNAPSHOT")
testCompile("org.apache.taglibs:taglibs-standard-jstlel:1.2.1") {
Expand Down
@@ -0,0 +1,65 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.web.multipart.reactive;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.util.MultiValueMap;
import org.springframework.web.multipart.MultipartException;

/**
* A strategy interface for multipart resolution in accordance
* with <a href="https://tools.ietf.org/html/rfc7578">RFC 7578</a>.
* Implementations are typically usable both within an application context
* and standalone.
*
* @author Sebastien Deleuze
* @since 5.0
*/
public interface MultipartResolver {

/**
* Determine if the given request contains multipart content.
* <p>Will typically check for content type "multipart/form-data", but the actually
* accepted requests might depend on the capabilities of the resolver implementation.
* @param request the servlet request to be evaluated
* @return whether the request contains multipart content
*/
boolean isMultipart(ServerHttpRequest request);

/**
* Parse the given HTTP request into a stream of parts as they are received.
* @param request the servlet request (must be with a multipart content type)
* @return a {@link Flux} containing the various parts if the current request is a multipart
* one, or one emitting a {@link MultipartException} if the request is not multipart, or if
* implementation-specific problems are encountered (such as exceeding file size limits).
*/
Flux<Part> resolveParts(ServerHttpRequest request);

/**
* Lazily parse all parts of a multipart request.
* @param request the servlet request (must be with a multipart content type)
* @return a {@link Flux} containing a map with part name as key and the various parts
* as values if the current request is a multipart one, or one emitting a
* {@link MultipartException} if the request is not multipart, or if
* implementation-specific problems are encountered (such as exceeding file size limits).
*/
Mono<MultiValueMap<String, Part>> resolveAllParts(ServerHttpRequest request);

}
@@ -0,0 +1,70 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.web.multipart.reactive;

import java.io.File;
import java.util.Optional;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.http.HttpHeaders;

/**
* A representation of a part received in a multipart request. Could contain a file, the
* string or json value of a parameter.
*
* @author Sebastien Deleuze
* @since 5.0
*/
public interface Part {

/**
* @return the headers of this part
*/
HttpHeaders getHeaders();

/**
* @return the name of the parameter in the multipart form
*/
String getName();

/**
* @return optionally the filename if the part contains a file
*/
Optional<String> getFilename();

/**
* @return the content of the part as a String
*/
Mono<String> getValue();

/**
* @return the content of the part as a stream of bytes
*/
Flux<DataBuffer> getContent();

/**
* Transfer the file contained in this part to the specified destination.
* @param dest the destination file
* @return a {@link Mono} that indicates completion of the file transfer or an error,
* for example an {@link IllegalStateException} if the part does not contain a file
*/
Mono<Void> transferTo(File dest);

}
@@ -0,0 +1,167 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.web.multipart.reactive.nio;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

import org.synchronoss.cloud.nio.multipart.Multipart;
import org.synchronoss.cloud.nio.multipart.MultipartContext;
import org.synchronoss.cloud.nio.multipart.MultipartUtils;
import org.synchronoss.cloud.nio.multipart.NioMultipartParser;
import org.synchronoss.cloud.nio.multipart.NioMultipartParserListener;
import org.synchronoss.cloud.nio.stream.storage.StreamStorage;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;

import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import org.springframework.web.multipart.MultipartException;
import org.springframework.web.multipart.reactive.Part;
import org.springframework.web.multipart.reactive.MultipartResolver;

/**
* {@link MultipartResolver} implementation based on the NIO Multipart library.
*
* The parsing of the request is non-blocking, but consuming the parts is not yet
* since NIO Multipart is currently using {@code InputStream} / {@code OutputStream}.
* See <a href="https://github.com/synchronoss/nio-multipart/issues/4">this GitHub issue</a>
* for more details about this limitation expected to be fixed in an upcoming release.
*
* @author Sebastien Deleuze
* @since 5.0
* @see <a href="https://github.com/synchronoss/nio-multipart">NIO Multipart</a>
*/
public class NioMultipartResolver implements MultipartResolver {

@Override
public boolean isMultipart(ServerHttpRequest request) {
if (!HttpMethod.POST.equals(request.getMethod())) {
return false;
}
MediaType contentType = request.getHeaders().getContentType();
return (contentType != null && MultipartUtils.isMultipart(contentType.toString()));
}

@Override
public Flux<Part> resolveParts(ServerHttpRequest request) {
if (!isMultipart(request)) {
return Flux.empty();
}
return Flux.create(new NioMultipartConsumer(request));
}

@Override
public Mono<MultiValueMap<String, Part>> resolveAllParts(ServerHttpRequest request) {
return resolveParts(request)
.collectList()
.map(parts -> {
MultiValueMap<String, Part> partsMap = new LinkedMultiValueMap<>();
parts.forEach(part -> partsMap.add(part.getName(), part));
return partsMap;
});
}

private static class NioMultipartConsumer implements Consumer<FluxSink<Part>> {

private final ServerHttpRequest request;


public NioMultipartConsumer(ServerHttpRequest request) {
this.request = request;
}


@Override
public void accept(FluxSink<Part> emitter) {
HttpHeaders headers = request.getHeaders();
MultipartContext context = new MultipartContext(
headers.getContentType().toString(),
Math.toIntExact(headers.getContentLength()),
headers.getFirst(HttpHeaders.ACCEPT_CHARSET));
NioMultipartParser parser = Multipart.multipart(context).forNIO(new NioMultipartParserListener() {
@Override
public void onPartFinished(StreamStorage streamStorage, Map<String, List<String>> headersFromPart) {
HttpHeaders headers = new HttpHeaders();
headers.putAll(headersFromPart);
emitter.next(new NioPart(headers, streamStorage));
}

@Override
public void onFormFieldPartFinished(String fieldName, String fieldValue, Map<String, List<String>> headersFromPart) {
HttpHeaders headers = new HttpHeaders();
headers.putAll(headersFromPart);
emitter.next(new NioPart(headers, fieldValue));
}

@Override
public void onAllPartsFinished() {
emitter.complete();
}

@Override
public void onNestedPartStarted(Map<String, List<String>> headersFromParentPart) {
}

@Override
public void onNestedPartFinished() {
}

@Override
public void onError(String message, Throwable cause) {
emitter.error(new MultipartException(message, cause));
}
});

request.getBody().subscribe(buffer -> {
byte[] resultBytes = new byte[buffer.readableByteCount()];
buffer.read(resultBytes);
try {
parser.write(resultBytes);
}
catch (IOException ex) {
throw Exceptions.bubble(ex);
}

}, (e) -> {
try {
parser.close();
}
catch (IOException ex) {
throw Exceptions.bubble(e);
}
}, () -> {
try {
parser.close();
}
catch (IOException ex) {
throw Exceptions.bubble(ex);
}
});

}
}

}

0 comments on commit 6a38b53

Please sign in to comment.