Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add reactive multipart request support #1201

Closed
wants to merge 6 commits into from

Conversation

sdeleuze
Copy link
Contributor

@sdeleuze sdeleuze commented Oct 12, 2016

This draft PR is a first Multipart support implementation here for high-level feedbacks and discussion.

It does not include RequestMapping level support, integration tests do not work yet with Reactor and RxNetty and NioMultipartResolver need to be rewritten using Flux.create() instead of ReplayProcessor.

@sdeleuze sdeleuze force-pushed the SPR-14546 branch 3 times, most recently from 21e701a to a15cc85 Compare October 13, 2016 16:17
@sdeleuze
Copy link
Contributor Author

sdeleuze commented Oct 13, 2016

I have updated this PR witgh the following changes:

  • Tests now run fine with Reactor Netty and RxNetty
  • getAllParts() now caches the resulting mono to allow multiple invocations
  • There is a single Part object suitable usable for file and non file parts
  • We now use Flux.create() instead of ReplayProcessor
  • NioPart#transferTo() now uses zero copy when possible (when a temporary file has been previously created)

@rstoyanchev Could you please make a high-level review of this commit?

@rstoyanchev
Copy link
Contributor

rstoyanchev commented Oct 14, 2016

@sdeleuze, apologies for suggesting it but ServerHttpRequest doesn't seem like the right level for exposing multipart support. Having MultipartResolver take ServerHttpRequest as input is already an indication that it's a higher level concern. Then MultipartResolver has to be injected into every request implementation, which is rather repetitive and artificial since it doesn't benefit from being in every request implementation. It's also a cyclical dependency even at the package level since the http package is logically at a level below the web package and should not depend on anything from it.

As it looks currently it's a better fit exposing it in ServerWebExchange. To avoid a circular dependency there once again, MultipartResolver and Part should be in the server package with maybe a server.multipart sub-package for implementations. For initialization MultipartResolver could be a field of HttpWebHandlerAdapter and then passed into ServerWebExchange instances. Very similar to WebSessionManager and WebSession.

For the caching, the cache operator should really be called at construction time or otherwise concurrent calls to getParts can each create a separate pipeline. See how DefaultWebExchange does it for WebSession and also note that DefaultWebSession uses the defer operator to ensure it doesn't do anything at the time of declaring the pipeline.

When the request is not multipart could we be more lenient and return an empty stream from getParts and a Mono with empty map from getAllParts perhaps? It flips the model from having to call isMultipartRequest first to being able to call getParts and getAllParts without error or side effect and optionally checking isMultipartRequest first if you have to.

Do we need Part.getContentType if we have getHeaders? Just thinking there are plenty of methods on part already?

I'm still not entirely sure how a JSON multipart would work out? Would it be a call to getValue() or getContent()? The underlying library seems to split the callbacks that way, but I'm not sure what it would do if there is no filename but there is a non-text/plain content-type.

I haven't looked into the implementation library but I presume it is non-blocking indeed? It seems to be returning java.io.InputStream.

@rstoyanchev rstoyanchev self-assigned this Oct 14, 2016
@sdeleuze
Copy link
Contributor Author

sdeleuze commented Oct 18, 2016

No worry indeed, I agree that ServerHttpRequest is not the right level to implement that for the reasons you mentioned. I also agree with your remarks on caching and being more lenient if the request is not multipart.

We can remove Part.getContentType() since this is just a shortcut to Part.getHeaders().getContentType() (with maybe a Javadoc hint to help users for file parts).

Currently I think JSON parts will be natively treated by the underlying library like a part with a String based value since it has no filename header. Even if in NioPart I allowed to get the content both as a String or as a Publisher<DataBuffer>, that could be not optimal.

Not sure if that makes sens, but should we add some HttpMessageReader based capabilities with some <T> Flux<T> getContentAsStream(Class<T> type) and <T> Mono<T> getContentAs(Class<T> type) methods available on Part ? Or should we delegate that part to a composite MultipartHttpMessageReader that could be initialized with a List<HttpMessageReader> like we do on the writing side for SSE?

About the NIO Multipart library, it seems to me that the request parsing is done in a non blocking way but sadly we can only access to data as String or InputStream depending on the part type, so it may be blocking I suppose. I have implemented an optimisation to be able to perform a zero-copy transfer if the InputStream is a FileInputStream.

I can take care of the changes discussed here if you are ok.

I have created an issue on their GitHub to ask them more insight about the blocking behavior implied by InputStream and that fact that non-file part content is only accessible as a String.

@rstoyanchev
Copy link
Contributor

Okay, I'm not too sure what you mean with the HttpMessageReader based capabilities. I would think that Part should only expose basic ways of getting to the data, but once you've updated the PR maybe it will be more clear.

@sdeleuze
Copy link
Contributor Author

I have pushed an updated PR, and asked a new question to NIO Multipart project lead about how to perform non-blocking processing with their current InputStream / OutputStream based API.

@sdeleuze sdeleuze force-pushed the SPR-14546 branch 4 times, most recently from e484c44 to c7fb77d Compare October 25, 2016 21:45
@sdeleuze sdeleuze changed the title Add reactive multipart request support (WIP) Add reactive multipart request support Oct 25, 2016
@sdeleuze
Copy link
Contributor Author

@rstoyanchev Could you please review the last version of this PR?

Copy link
Contributor

@rstoyanchev rstoyanchev left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Having ServerWebExchange to expose multipart methods and delegate to MultipartResolver internally looks alright. In fact the MultipartResolver seems to belong in the http package. It's like an HttpMessageReader and could even be used with an HttpMessageReader to further parse the content of a part. I suggest moving web.multipart.reactive to http.reactive right next to the codec package.

/**
* @return the content of the part as a String
*/
Mono<String> getValue();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Javadoc should mention the algorithm to determine the charset and what if there is no indication, is there a natural default or does it error out?

Also as this is a shortcut method it would make sense to call it getContentAsString(). Technically the method could be even left out but It helps to support form field values.

MultiValueMap<String, Part> partsMap = new LinkedMultiValueMap<>();
parts.forEach(part -> partsMap.add(part.getName(), part));
return partsMap;
});
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about using collectMap instead of collectList + map?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reactor collectMultimap() is using a Collection for the value type while LinkedMultiValueMap is taking a Map<K, List<V>> parameter, that's why I did not use it in my first implementation, but after more thoughts I was able to use it in my updated commit thanks to Collectors.toMap() to change the value type of the map.

}
catch (IOException ex) {
throw Exceptions.bubble(e);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this notify the parser about the error which calls error on the emitter, so that subscribers for the parts will get the error?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I made some changes to the error handling in order to notify the parser about errors and ensuring the only 1 error is emitted, but I am not 100% sure this is what you meant, so this is something we should discuss together.

public void onError(String message, Throwable cause) {
emitter.error(new MultipartException(message, cause));
}
});
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

An inner class perhaps given the number of methods?

* or a {@link Mono} emitting an error the current request is not a multipart one.
*/
Mono<MultiValueMap<String, Part>> getAllParts();

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These could be ordered right below getPrincipal() above with getAllParts ahead of getParts as the more common case. Same for decorator and default implementations.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed the order and also proposed new names in order to be more consistent with getFormData(): Mono<MultiValueMap<String, Part>> getFormParts() and Flux<Part> getFormPartsAsFlux(). Are you ok with these new names?

else {
fail();
}
})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could exchange.getParts() with collectToMap be used to get all parts first and then invoke a method that asserts all parts, which can also be called from assertGetAllParts?

@sdeleuze sdeleuze force-pushed the SPR-14546 branch 9 times, most recently from 00dddba to d3a23ce Compare October 30, 2016 11:52
@sdeleuze
Copy link
Contributor Author

@rstoyanchev I took in account your feedbacks and:

  • Changed the package to org.springframework.http.codec.multipart
  • Polished the Javadoc
  • Renamed getAllParts() to getFormParts()
  • Renamed getParts() to getFormPartsAsFlux()
  • Integrated the MultipartHttpMessageWriter commit

This commit introduces reactive multipart support by adding a new
MultipartHttpMessageReader interface (with default methods) and a
SynchronossMultipartHttpMessageReader implementation based on
the Synchronoss NIO Multipart implementation
(https://github.com/synchronoss/nio-multipart).

Issue: SPR-14546
This commit adds a reactive HttpMessageWriter that allows
to write multipart HTML forms with multipart/form-data
media type.

Issue: SPR-14546
With this commit, ServerCodecConfigurer is now exposed as a bean in
order to be provided to DefaultServerWebExchange via
WebHttpHandlerBuilder and HttpWebHandlerAdapter. This allows
DefaultServerWebExchange to get configured codecs for reading form or
multipart requests.

Issue: SPR-14546
@rstoyanchev
Copy link
Contributor

This is now in master.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants