Improvements to WritableBodyPublisher #54

Closed
gpsfl opened this issue Jun 10, 2022 · 3 comments

Comments

gpsfl commented Jun 10, 2022

Right now users have to manually start a thread which writes into WritableBodyPublisher. This has two disadvantages:

  1. Data is written into the internal buffer even if the HTTP request fails or is never executed
  2. Users need to manually close the stream/channel and handle exceptions

Instead, I suggest passing a callback to WritableBodyPublisher that is invoked in a new thread after the HTTP client subscribes to the body publisher. This thread would also close the publisher and handle exceptions. Users could also pass a custom executor to launch the thread.

Current version:

```java
var requestBody = WritableBodyPublisher.create();
var request = MutableRequest.POST("https://example.com", requestBody);

CompletableFuture.runAsync(() -> {
  try (var outputStream = requestBody.outputStream()) {
    outputStream.write(...);
  } catch (IOException ioe) {
    requestBody.closeExceptionally(ioe);
  }
});

client.sendAsync(request, BodyHandlers.discarding());
```

Suggestion:

```java
// Optionally pass an executor using WritableBodyPublisher.withStream(executor, callback)
var requestBody = WritableBodyPublisher.withStream(outputStream -> {
  // This is called in a new thread once the body publisher is subscribed to.
  // The publisher is closed afterwards, either normally or exceptionally.
  outputStream.write(...);
});
var request = MutableRequest.POST("https://example.com", requestBody);

client.sendAsync(request, BodyHandlers.discarding());
```

gpsfl commented Jun 10, 2022

One more question regarding WritableBodyPublisher: is there a reason why it buffers everything in memory instead of blocking the writer until data is consumed? If you send a big file over a slow connection, can't this cause the application to run out of memory?

mizosoft commented Jun 12, 2022

Hi @gpsfl, thanks for your suggestions!

Passing a callback that's invoked only when content is needed sounds neat. However, mixing it into the WritableBodyPublisher API might be confusing, since, for instance, WritableBodyPublisher::outputStream would then no longer be needed. A good choice would be to add something like MoreBodyPublishers.ofOutputStream(callback, executor) that returns a BodyPublisher.
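The ofOutputStream(callback, executor) idea can be sketched with only the JDK, assuming Java 11+. Everything below (OutputStreamBodyPublisher, BodyWriter, PublishingStream) is a hypothetical illustration, not Methanol's API. The callback is started on the given executor only inside subscribe(), that is, once the HTTP client actually subscribes; the body completes normally if the callback returns and exceptionally if it throws, so a failed writer never produces a silently truncated body. As a stand-in buffer it uses java.util.concurrent.SubmissionPublisher, whose submit() blocks the writer while the subscriber's buffer is full (Flow.defaultBufferSize() chunks by default, a bound on chunks rather than bytes), and the writer's next write() fails once the client has cancelled its subscription.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.net.http.HttpRequest;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.concurrent.Executor;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

/** Hypothetical sketch of a lazily started, callback-based BodyPublisher (not Methanol's API). */
public final class OutputStreamBodyPublisher implements HttpRequest.BodyPublisher {

  /** Hypothetical callback type: writes the whole request body to {@code out}. */
  @FunctionalInterface
  public interface BodyWriter {
    void writeTo(OutputStream out) throws IOException;
  }

  private final BodyWriter writer;
  private final Executor executor;

  private OutputStreamBodyPublisher(BodyWriter writer, Executor executor) {
    this.writer = writer;
    this.executor = executor;
  }

  public static HttpRequest.BodyPublisher ofOutputStream(BodyWriter writer, Executor executor) {
    return new OutputStreamBodyPublisher(writer, executor);
  }

  @Override
  public long contentLength() {
    return -1; // length is unknown up front
  }

  @Override
  public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
    // One publisher per subscription, so a re-subscription runs the callback again.
    var publisher = new SubmissionPublisher<ByteBuffer>();
    publisher.subscribe(subscriber);
    // The callback only starts here, after the client has subscribed.
    executor.execute(() -> {
      try {
        writer.writeTo(new PublishingStream(publisher));
        publisher.close(); // completes the body normally
      } catch (IOException | RuntimeException e) {
        publisher.closeExceptionally(e); // fails the body instead of truncating it
      }
    });
  }

  /** Adapts OutputStream writes into ByteBuffers submitted to the subscriber. */
  private static final class PublishingStream extends OutputStream {
    private final SubmissionPublisher<ByteBuffer> publisher;

    PublishingStream(SubmissionPublisher<ByteBuffer> publisher) {
      this.publisher = publisher;
    }

    @Override
    public void write(int b) throws IOException {
      write(new byte[] {(byte) b}, 0, 1);
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
      if (publisher.getNumberOfSubscribers() == 0) {
        // The client cancelled its subscription (e.g. the request failed): stop writing.
        throw new IOException("request body subscription was cancelled");
      }
      // Copy because callers may reuse their array; submit() blocks while the buffer is full.
      publisher.submit(ByteBuffer.wrap(Arrays.copyOfRange(b, off, off + len)));
    }
  }
}
```

Such a publisher could be passed anywhere a BodyPublisher is accepted, for example as the body of MutableRequest.POST. Because each subscription re-runs the callback, the callback should be able to produce the same content more than once if the client retries.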

Regarding request failure, it can happen either before or after the HTTP client subscribes to the publisher. In the latter case, WritableBodyPublisher should actually stop the writer thread by setting closed = true when the subscription is cancelled. I don't know how I missed this!

Otherwise, one can do:

```java
var requestBody = WritableBodyPublisher.create();
var request = MutableRequest.POST("https://example.com", requestBody);

CompletableFuture.runAsync(() -> {
  try (var outputStream = requestBody.outputStream()) {
    outputStream.write(...);
  } catch (IOException ioe) {
    requestBody.closeExceptionally(ioe);
  }
});

Methanol.create().sendAsync(request, BodyHandlers.discarding())
    .whenComplete((r, t) -> {
      if (t != null) {
        requestBody.closeExceptionally(t);
      }
    });
```

to handle both cases. However, the callback option seems neater.

Regarding WritableBodyPublisher's lack of a backpressure strategy (as you've noticed, it just buffers content as it receives it), it's not clear whether such a strategy would be beneficial in practice. I've noticed that the HTTP client is pretty eager in fetching the request's content (this makes ProgressTracker inaccurate when uploading; try running the upload sample). Blocking the writer thread when buffered content hits a specified maximum wouldn't be that useful, as the client's eagerness keeps that maximum from being reached in the first place. However, I need to investigate this further when I have time.
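The alternative being weighed here, blocking the writer thread once buffered content reaches a specified maximum, could look like the following minimal sketch. BoundedByteQueue and maxBufferedBytes are hypothetical names, not part of Methanol: the writer's OutputStream would call put(), and the publisher's subscription would call poll() when the subscriber requests data (that wiring is omitted). If the client drains content as eagerly as described, put() would rarely block; the limit only takes effect when the consumer falls behind, for example on a slow connection.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Hypothetical sketch: a writer-side buffer that blocks the writer thread once
 * maxBufferedBytes are pending, instead of growing without bound.
 */
public final class BoundedByteQueue {
  private final long maxBufferedBytes;
  private final ArrayDeque<ByteBuffer> chunks = new ArrayDeque<>();
  private final ReentrantLock lock = new ReentrantLock();
  private final Condition notFull = lock.newCondition();
  private long bufferedBytes; // guarded by lock

  public BoundedByteQueue(long maxBufferedBytes) {
    if (maxBufferedBytes <= 0) {
      throw new IllegalArgumentException("maxBufferedBytes must be positive");
    }
    this.maxBufferedBytes = maxBufferedBytes;
  }

  /** Called by the writer thread; blocks while the buffer is at or over its limit. */
  public void put(byte[] data) throws InterruptedException {
    lock.lock();
    try {
      // A chunk is admitted whenever the buffer is below the limit, so one chunk
      // larger than the limit overshoots once instead of blocking forever.
      while (bufferedBytes >= maxBufferedBytes) {
        notFull.await();
      }
      chunks.addLast(ByteBuffer.wrap(data.clone())); // copy: caller may reuse its array
      bufferedBytes += data.length;
    } finally {
      lock.unlock();
    }
  }

  /** Called on the consumer side when more data is requested; returns null if empty. */
  public ByteBuffer poll() {
    lock.lock();
    try {
      ByteBuffer chunk = chunks.pollFirst();
      if (chunk != null) {
        bufferedBytes -= chunk.remaining();
        notFull.signalAll(); // wake writers blocked in put()
      }
      return chunk;
    } finally {
      lock.unlock();
    }
  }

  public long bufferedBytes() {
    lock.lock();
    try {
      return bufferedBytes;
    } finally {
      lock.unlock();
    }
  }
}
```

Measuring the limit in bytes rather than in chunks keeps memory use bounded no matter how the writer sizes its writes, at the cost of one overshoot when a single chunk exceeds the remaining room.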

gpsfl commented Jun 12, 2022

> Passing a callback that's invoked only when content is needed sounds neat. However, mixing it into the WritableBodyPublisher API might be confusing, since, for instance, WritableBodyPublisher::outputStream would then no longer be needed. A good choice would be to add something like MoreBodyPublishers.ofOutputStream(callback, executor) that returns a BodyPublisher.

This sounds great! We currently have a similar method in our project that creates a WritableBodyPublisher and a new thread; however, it starts the thread before the BodyPublisher is subscribed to.
