Skip to content

Commit

Permalink
Don't block the event loop when reading inputstream. (#6120)
Browse files Browse the repository at this point in the history
* Don't block the event loop when reading inputstream. Fixes #6100

* Use only publishOn

* add issue link

* synchronize access to input stream

* Lazily init processor

* Skip offloading to IO

Co-authored-by: jameskleeh <james.kleeh@gmail.com>
  • Loading branch information
graemerocher and jameskleeh committed Sep 9, 2021
1 parent 3b31252 commit 9de5f7b
Show file tree
Hide file tree
Showing 3 changed files with 192 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,62 +73,80 @@ public BindingResult<InputStream> bind(ArgumentConversionContext<InputStream> co
NettyHttpRequest nettyHttpRequest = (NettyHttpRequest) source;
io.netty.handler.codec.http.HttpRequest nativeRequest = nettyHttpRequest.getNativeRequest();
if (nativeRequest instanceof StreamedHttpRequest) {
HttpContentProcessor<ByteBufHolder> processor = (HttpContentProcessor<ByteBufHolder>) processorResolver.resolve(nettyHttpRequest, context.getArgument());
PipedOutputStream outputStream = new PipedOutputStream();
try {
PipedInputStream inputStream = new PipedInputStream(outputStream);
processor.subscribe(new CompletionAwareSubscriber<ByteBufHolder>() {
PipedInputStream inputStream = new PipedInputStream(outputStream) {
private volatile HttpContentProcessor<ByteBufHolder> processor;

@Override
protected void doOnSubscribe(Subscription subscription) {
subscription.request(1);
}
private void init() {
if (processor == null) {
processor = (HttpContentProcessor<ByteBufHolder>) processorResolver.resolve(nettyHttpRequest, context.getArgument());
processor.subscribe(new CompletionAwareSubscriber<ByteBufHolder>() {

@Override
protected void doOnNext(ByteBufHolder message) {
if (LOG.isTraceEnabled()) {
LOG.trace("Server received streaming message for argument [{}]: {}", context.getArgument(), message);
}
ByteBuf content = message.content();
if (!(content instanceof EmptyByteBuf)) {
try {
byte[] bytes = ByteBufUtil.getBytes(content);
outputStream.write(bytes, 0, bytes.length);
} catch (IOException e) {
subscription.cancel();
return;
} finally {
content.release();
}
@Override
protected void doOnSubscribe(Subscription subscription) {
subscription.request(1);
}

@Override
protected synchronized void doOnNext(ByteBufHolder message) {
if (LOG.isTraceEnabled()) {
LOG.trace("Server received streaming message for argument [{}]: {}", context.getArgument(), message);
}
ByteBuf content = message.content();
if (!(content instanceof EmptyByteBuf)) {
try {
byte[] bytes = ByteBufUtil.getBytes(content);
outputStream.write(bytes, 0, bytes.length);
} catch (IOException e) {
subscription.cancel();
return;
} finally {
content.release();
}
}
subscription.request(1);
}

@Override
protected synchronized void doOnError(Throwable t) {
if (LOG.isTraceEnabled()) {
LOG.trace("Server received error for argument [" + context.getArgument() + "]: " + t.getMessage(), t);
}
try {
outputStream.close();
} catch (IOException ignored) {
} finally {
subscription.cancel();
}
}

@Override
protected synchronized void doOnComplete() {
if (LOG.isTraceEnabled()) {
LOG.trace("Done receiving messages for argument: {}", context.getArgument());
}
try {
outputStream.close();
} catch (IOException ignored) {
}
}
});
}
subscription.request(1);
}

@Override
protected void doOnError(Throwable t) {
if (LOG.isTraceEnabled()) {
LOG.trace("Server received error for argument [" + context.getArgument() + "]: " + t.getMessage(), t);
}
try {
outputStream.close();
} catch (IOException ignored) {
} finally {
subscription.cancel();
}
public synchronized int read(byte[] b, int off, int len) throws IOException {
init();
return super.read(b, off, len);
}

@Override
protected void doOnComplete() {
if (LOG.isTraceEnabled()) {
LOG.trace("Done receiving messages for argument: {}", context.getArgument());
}
try {
outputStream.close();
} catch (IOException ignored) {
}
public synchronized int read() throws IOException {
init();
return super.read();
}

});
};

return () -> Optional.of(inputStream);
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package io.micronaut.http.server.netty.stream

import io.micronaut.context.ApplicationContext
import io.micronaut.http.*
import io.micronaut.http.annotation.*
import io.micronaut.http.client.HttpClient
import io.micronaut.http.client.annotation.Client
import io.micronaut.http.client.exceptions.HttpClientResponseException
import io.micronaut.http.client.multipart.MultipartBody
import io.micronaut.runtime.server.EmbeddedServer
import io.micronaut.scheduling.TaskExecutors
import io.micronaut.scheduling.annotation.ExecuteOn
import jakarta.inject.Inject
import spock.lang.AutoCleanup
import spock.lang.Issue
import spock.lang.Shared
import spock.lang.Specification

import javax.annotation.Nullable
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.CountDownLatch
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors

class InputStreamBodySpec extends Specification {

@Shared @AutoCleanup EmbeddedServer embeddedServer = ApplicationContext.run(EmbeddedServer)

@Issue('https://github.com/micronaut-projects/micronaut-core/issues/6100')
void "test apply load to InputStream read"() {
given:
HttpClient client = embeddedServer.applicationContext.createBean(HttpClient, embeddedServer.getURI())

when:
int max = 30
CountDownLatch latch = new CountDownLatch(max)

ExecutorService pool = Executors.newCachedThreadPool()
ConcurrentLinkedQueue<HttpStatus> responses = new ConcurrentLinkedQueue()
for (int i = 0; i < max; i++) {
pool.submit(() -> {
try {
MultipartBody multipartBody = MultipartBody.builder()
.addPart("myfile",
"largefile" * 1024)
.build()
HttpRequest request = HttpRequest.POST("/input-stream-test/hello", multipartBody)
.contentType(MediaType.MULTIPART_FORM_DATA_TYPE)
HttpResponse response = client.toBlocking()
.exchange(request)
responses.add(response.status())
System.out.println(response.getStatus())
System.out.println(response.getHeaders().asMap())

} catch (HttpClientResponseException e) {
System.out.println(e.getStatus())
} catch (URISyntaxException e) {
e.printStackTrace()
} finally {
latch.countDown()
}

})
}
latch.await()

then:
responses.size() == 30
responses.every({ it == HttpStatus.OK})
}
}

@Controller("/input-stream-test")
class PayloadInputStream {

@Inject
@Client("/")
private HttpClient httpClient

private String responsePayload = "<soap:Envelope xmlns:soap=\"http://schemas.xmlsoap.org/soap/envelope/\">\n" +
" <soap:Body>\n" +
" <ns2:getHelloWorldAsStringResponse xmlns:ns2=\"http://sample.soap.oracle/\">\n" +
" <return>Hello World %s</return>\n" +
" </ns2:getHelloWorldAsStringResponse>\n" +
" </soap:Body>\n" +
"</soap:Envelope>"


@Post("/hello")
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.ALL)
@ExecuteOn(TaskExecutors.IO)
MutableHttpResponse<String> stream(@Body @Nullable InputStream payload) throws IOException {

//blocking read on injected http client
HttpResponse<String> resp = httpClient.toBlocking().exchange(HttpRequest
.GET(URI.create("/input-stream-test/hello/other")), String.class)
System.out.println(resp.getBody(String.class).get())


byte[] body = payload.bytes
String b = new String(body)
int l = body.length

responsePayload = "{\n" +
"\t\"payload\" : {\n" +
"\t\t\"name\" : \"1542\"\n" +
"\t}\n" +
"}"
return HttpResponse.ok().body(responsePayload).contentType(MediaType.APPLICATION_JSON)
}

@Get("/hello/other")
String other() {
return "Some body content"
}
}
13 changes: 13 additions & 0 deletions inject/src/main/java/io/micronaut/context/BeanProvider.java
Original file line number Diff line number Diff line change
Expand Up @@ -144,4 +144,17 @@ default void ifResolvable(@NonNull Consumer<T> consumer) {
return (Argument) Argument.of(BeanProvider.class, Objects.requireNonNull(type, "Type cannot be null"));
}

/**
* Allows selecting an alternative bean if the backing bean is not present.
* @param alternative The alternative, can be {@code null}
* @return The bean if present or else the supplied alternative
* @since 3.0.2
*/
default @Nullable T orElse(@Nullable T alternative) {
if (isPresent()) {
return get();
} else {
return alternative;
}
}
}

0 comments on commit 9de5f7b

Please sign in to comment.