Make WebFlux multipart support fully Reactive [SPR-17122] #21659

Open · spring-issuemaster opened this issue Aug 6, 2018 · 9 comments

@spring-issuemaster (Collaborator) commented Aug 6, 2018

Sébastien Deleuze opened SPR-17122 and commented

Spring Framework 5.0 and 5.1 provide support for reactive multipart requests by leveraging the Synchronoss nio-multipart library, which comes with several limitations. Spring WebFlux feeds data to the parser, which then invokes a callback when the entire content of a part is ready, potentially creating temporary files when the content is too big in order to avoid consuming too much memory. These limitations mainly come from the fact that Synchronoss's StreamStorage abstraction is based on InputStream / OutputStream.

As initially proposed by Arjen Poutsma, we should probably write our own reactive multipart implementation to overcome these limitations and provide fully reactive behavior, where the content of a file is Reactive Streams compliant (bytes reach the user as they are received), in order to give the user more control. If we provide this, I tend to think that we won't have to create temporary files.
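
As a sketch of the intended consumer-side usage (the controller and path are hypothetical; Part and its content() method are the existing API), bytes would stream to the handler with back pressure:

import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.http.codec.multipart.Part;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@RestController
class UploadController {

  @PostMapping("/upload")
  Mono<Void> upload(@RequestBody Flux<Part> parts) {
    // part.content() is a Flux<DataBuffer>: each buffer reaches the consumer
    // as it is received on the wire, with back pressure, instead of being
    // staged in memory or in a temporary file by the parser.
    return parts
        .concatMap(part -> part.content()
            .doOnNext(DataBufferUtils::release) // consume and release each buffer
            .then())
        .then();
  }
}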


Issue Links:

  • #21180 Race-Condition in Multipart upload proxy scenario
@spring-issuemaster (Collaborator) commented Aug 6, 2018

Rossen Stoyanchev commented

There are two styles of usage: one via Flux<Part> with @RequestBody, and the other through a MultiValueMap<String, Part>, which could happen through data binding or via @RequestPart.

For reading via Flux, using memory with back pressure makes sense. The MultiValueMap usage, however, could put a burden on memory if the content is over a certain size. We could recommend using Flux for reading larger content, and otherwise put a limit on input buffering. However, there might still be a case for temp files, e.g. data binding with a named FilePart bound onto a field.
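
A sketch of the two styles (class, method, and path names are hypothetical):

import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.http.codec.multipart.Part;
import org.springframework.util.MultiValueMap;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@RestController
class MultipartStyles {

  // Streaming style: parts are consumed one at a time with back pressure,
  // so memory use can stay bounded regardless of content size.
  @PostMapping("/stream")
  Mono<Void> stream(@RequestBody Flux<Part> parts) {
    return parts
        .concatMap(part -> part.content().doOnNext(DataBufferUtils::release).then())
        .then();
  }

  // Map style: every part must be available before the handler runs, which
  // is where input buffering limits (or temp files) come into play.
  @PostMapping("/map")
  Mono<Void> map(@RequestBody Mono<MultiValueMap<String, Part>> parts) {
    return parts.then();
  }
}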

@thekalinga commented Jun 22, 2019

Can someone correct me if I am wrong?

I see that the current implementation uses the bufferUntil operator, which buffers all the data in memory until the end of each part. So even if I use the Flux<Part> variation, we are dealing with a faulty implementation.

This means that if I am uploading a file of size 2 GB, a proportional amount of RAM is used up by data buffers, which is a recipe for OutOfMemoryErrors.

This is what I am referring to: DefaultMultipartMessageReader#read calls DataBufferUtils.split(body, boundaryNeedle),

and the implementation of split is

public static Flux<DataBuffer> split(Publisher<DataBuffer> dataBuffers, byte[] delimiter) {
  // a lot of code
  return Flux.from(dataBuffers)
      .flatMap(buffer -> endFrameOnDelimiter(buffer, matcher))
      .bufferUntil(buffer -> buffer == END_FRAME)
  // a lot more code
}

Since we are doing buffering, we are collecting all buffers into List<DataBuffer>s, which means users are forced to cache the whole file content (at least one full part) in RAM before it is consumed. What if I, as a user, don't want the framework to cache anything or write temporarily to disk to reduce memory pressure (which adds latency), but want back pressure (the whole point of using Reactive Streams) and want the framework not to pre-read and cache everything in memory?
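
As a standalone Reactor illustration of why bufferUntil implies full-part buffering (the string values stand in for DataBuffers):

import reactor.core.publisher.Flux;

public class BufferUntilDemo {
  public static void main(String[] args) {
    // bufferUntil accumulates elements into a List until the predicate
    // matches, so every buffer of a part is held in memory until the
    // part's end frame is seen.
    Flux.just("a", "b", "END", "c")
        .bufferUntil("END"::equals)
        .subscribe(System.out::println);
    // prints: [a, b, END]
    //         [c]
  }
}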

Here are my two cents on how the implementation might need to change:

split should return Flux<Flux<DataBuffer>> instead of Flux<DataBuffer>, where each element of the top-level Flux corresponds to one part of the HTTP request.

This way we don't buffer the data in memory, and back pressure is properly cascaded.
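
The difference in shape can be illustrated with windowUntil, the windowed counterpart of bufferUntil (again a standalone Reactor sketch):

import reactor.core.publisher.Flux;

public class WindowUntilDemo {
  public static void main(String[] args) {
    // windowUntil emits nested Fluxes instead of Lists: elements are
    // forwarded to the current inner Flux as they arrive, so nothing
    // accumulates and back pressure reaches the source.
    Flux.just("a", "b", "END", "c")
        .windowUntil("END"::equals)
        .concatMap(Flux::collectList) // collected here only for printing
        .subscribe(System.out::println);
    // prints: [a, b, END]
    //         [c]
  }
}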

Please let me know your thoughts.

@thekalinga commented Jun 22, 2019

Can we use concatMap instead of flatMap inside the split method, since we care about order?

Because the mapping inside flatMap is synchronous in this specific case, we don't actually observe out-of-order completion of the inner publishers, but using concatMap would make both the behaviour and the intent clear (see the illustration after the snippet below).

I am referring to

Flux.from(dataBuffers)
  .flatMap(buffer -> endFrameOnDelimiter(buffer, matcher))
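
A minimal standalone illustration of the ordering difference:

import java.time.Duration;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class OrderingDemo {
  public static void main(String[] args) {
    // flatMap subscribes to all inner publishers eagerly and merges results
    // as they complete, so asynchronous inners may finish out of order.
    Flux.just(300, 100, 200)
        .flatMap(d -> Mono.just(d).delayElement(Duration.ofMillis(d)))
        .doOnNext(System.out::println) // likely prints 100, 200, 300
        .blockLast();

    // concatMap subscribes to one inner publisher at a time and always
    // preserves the source order.
    Flux.just(300, 100, 200)
        .concatMap(d -> Mono.just(d).delayElement(Duration.ofMillis(d)))
        .doOnNext(System.out::println) // always prints 300, 100, 200
        .blockLast();
  }
}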
@thekalinga commented Jun 22, 2019

I have deleted a few comments and corrected a few others above, as they were wrong or misleading in their earlier versions.

@thekalinga commented Jun 22, 2019

Please note that I don't have a complete understanding of the source; it's possible that my understanding is incorrect.

@thekalinga commented Jun 22, 2019

If I am not wrong, we can instead add the following methods to get lazy reading without buffering anything in memory (a usage sketch follows the code):

public static Flux<Flux<DataBuffer>> streamingSplit(Publisher<DataBuffer> dataBuffers, byte[] delimiter) {
  DataBufferUtils.Matcher matcher = matcher(delimiter);
  return Flux.from(dataBuffers)
      .concatMap(buffer -> {
        return endFrameOnDelimiter(buffer, matcher, delimiter.length)
            .windowWhile(bufferFrame -> bufferFrame != END_FRAME);
      })
      .doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
}

private static Flux<DataBuffer> endFrameOnDelimiter(DataBuffer dataBuffer, DataBufferUtils.Matcher matcher, int delimiterLength) {
  List<DataBuffer> result = new ArrayList<>();
  do {
    int endIdx = matcher.match(dataBuffer);
    int readPosition = dataBuffer.readPosition();
    if (endIdx != -1) {
      int length = endIdx + 1 - readPosition;
      result.add(dataBuffer.retainedSlice(readPosition, length));
      result.add(END_FRAME);
      dataBuffer.readPosition(endIdx + delimiterLength + 1); // lets skip delimiter length
    } else {
      result.add(retain(dataBuffer));
      break;
    }
  } while (dataBuffer.readableByteCount() > 0);

  DataBufferUtils.release(dataBuffer);
  return Flux.fromIterable(result);
}
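
For example, a consumer of the proposed API could stream each part straight to disk with back pressure (a sketch; writeParts and the channel-per-part supplier are hypothetical, and closing the channels is omitted for brevity):

Mono<Void> writeParts(Flux<DataBuffer> body, byte[] boundaryNeedle,
    Supplier<AsynchronousFileChannel> channels) {
  return streamingSplit(body, boundaryNeedle)
      .concatMap(partContent ->
          DataBufferUtils.write(partContent, channels.get()) // stream to disk as buffers arrive
              .map(DataBufferUtils::release)                 // release each written buffer
              .then())
      .then();
}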

@poutsma @bclozel
Please let me know what you think.

@poutsma (Contributor) commented Jun 24, 2019

@thekalinga I think you are right, and have created #23184 to track this issue. Thank you for trying the 5.2 milestones and reporting bugs in them!

In the future, feel free to file an issue directly instead of commenting on a closed issue (though you might want to refer to your newly created issue in a comment, to get our attention).

@poutsma (Contributor) commented Jun 24, 2019

private static Flux<DataBuffer> endFrameOnDelimiter(DataBuffer dataBuffer, DataBufferUtils.Matcher matcher, int delimiterLength) {
  ...
      dataBuffer.readPosition(endIdx + delimiterLength + 1); // lets skip delimiter length
  ...
}

AFAICT, the only difference in your last snippet is the one that skips the delimiter length (shown above). But endIdx points to the index of the last byte of the delimiter, so there is no need to skip further ahead.
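
To make the index arithmetic concrete (a sketch; bufferOf is a hypothetical helper that wraps a String in a DataBuffer):

// For the content "abc--XYZ" and the delimiter "--":
DataBufferUtils.Matcher matcher =
    DataBufferUtils.matcher("--".getBytes(StandardCharsets.US_ASCII));
DataBuffer buffer = bufferOf("abc--XYZ"); // hypothetical helper

int endIdx = matcher.match(buffer); // 4: the index of the last byte of "--"
buffer.readPosition(endIdx + 1);    // already positioned at 'X', just past the
                                    // delimiter; adding delimiterLength on top
                                    // would skip "YZ" as well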

poutsma added a commit that referenced this issue Jul 10, 2019
The DefaultMultipartMessageReader has been removed for 5.2 and will be
part of a future release. This commit switches back to the
SynchronossPartHttpMessageReader.

gh-21659
@poutsma (Contributor) commented Jul 11, 2019

The refactoring of the DefaultMultipartParser proved to be too ambitious for the 5.2 release deadline, so we are pushing this issue back to the next release.
