Skip to content

Commit

Permalink
Don't block the event loop when reading inputstream. Fixes #6100
Browse files Browse the repository at this point in the history
  • Loading branch information
graemerocher committed Sep 8, 2021
1 parent acd3cc6 commit 0d69008
Show file tree
Hide file tree
Showing 4 changed files with 207 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package io.micronaut.http.server.netty.binders;

import io.micronaut.core.annotation.Internal;
import io.micronaut.core.annotation.Nullable;
import io.micronaut.core.async.subscriber.CompletionAwareSubscriber;
import io.micronaut.core.convert.ArgumentConversionContext;
import io.micronaut.core.type.Argument;
Expand All @@ -26,19 +27,27 @@
import io.micronaut.http.server.netty.HttpContentProcessorResolver;
import io.micronaut.http.server.netty.NettyHttpRequest;
import io.micronaut.http.server.netty.NettyHttpServer;
import io.micronaut.scheduling.TaskExecutors;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufHolder;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.EmptyByteBuf;
import jakarta.inject.Named;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

import java.io.IOException;
import java.io.InputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;

/**
* Responsible for binding to a {@link InputStream} argument from the body of the request.
Expand All @@ -53,12 +62,19 @@ public class InputStreamBodyBinder implements NonBlockingBodyArgumentBinder<Inpu
private static final Logger LOG = LoggerFactory.getLogger(NettyHttpServer.class);

private final HttpContentProcessorResolver processorResolver;
private final Scheduler ioExecutor;

/**
* @param processorResolver The http content processor resolver
* @param ioExecutor The io executor
*/
public InputStreamBodyBinder(HttpContentProcessorResolver processorResolver) {
public InputStreamBodyBinder(
HttpContentProcessorResolver processorResolver,
@Named(TaskExecutors.IO)
@Nullable
ExecutorService ioExecutor) {
this.processorResolver = processorResolver;
this.ioExecutor = Schedulers.fromExecutor(ioExecutor != null ? ioExecutor : ForkJoinPool.commonPool());
}

@Override
Expand All @@ -77,59 +93,62 @@ public BindingResult<InputStream> bind(ArgumentConversionContext<InputStream> co
PipedOutputStream outputStream = new PipedOutputStream();
try {
PipedInputStream inputStream = new PipedInputStream(outputStream);
processor.subscribe(new CompletionAwareSubscriber<ByteBufHolder>() {
Flux.from(processor)
.onBackpressureBuffer()
.publishOn(ioExecutor)
.subscribeOn(ioExecutor)
.subscribe(new CompletionAwareSubscriber<ByteBufHolder>() {

@Override
protected void doOnSubscribe(Subscription subscription) {
subscription.request(1);
}

@Override
protected void doOnNext(ByteBufHolder message) {
if (LOG.isTraceEnabled()) {
LOG.trace("Server received streaming message for argument [{}]: {}", context.getArgument(), message);
}
ByteBuf content = message.content();
if (!(content instanceof EmptyByteBuf)) {
try {
byte[] bytes = ByteBufUtil.getBytes(content);
outputStream.write(bytes, 0, bytes.length);
} catch (IOException e) {
subscription.cancel();
return;
} finally {
content.release();
@Override
protected void doOnSubscribe(Subscription subscription) {
subscription.request(1);
}
}
subscription.request(1);
}

@Override
protected void doOnError(Throwable t) {
if (LOG.isTraceEnabled()) {
LOG.trace("Server received error for argument [" + context.getArgument() + "]: " + t.getMessage(), t);
}
try {
outputStream.close();
} catch (IOException ignored) {
} finally {
subscription.cancel();
}
}
@Override
protected void doOnNext(ByteBufHolder message) {
if (LOG.isTraceEnabled()) {
LOG.trace("Server received streaming message for argument [{}]: {}", context.getArgument(), message);
}
ByteBuf content = message.content();
if (!(content instanceof EmptyByteBuf)) {
try {
byte[] bytes = ByteBufUtil.getBytes(content);
outputStream.write(bytes, 0, bytes.length);
} catch (IOException e) {
subscription.cancel();
return;
} finally {
content.release();
}
}
subscription.request(1);
}

@Override
protected void doOnComplete() {
if (LOG.isTraceEnabled()) {
LOG.trace("Done receiving messages for argument: {}", context.getArgument());
}
try {
outputStream.close();
} catch (IOException ignored) {
}
}
@Override
protected void doOnError(Throwable t) {
if (LOG.isTraceEnabled()) {
LOG.trace("Server received error for argument [" + context.getArgument() + "]: " + t.getMessage(), t);
}
try {
outputStream.close();
} catch (IOException ignored) {
} finally {
subscription.cancel();
}
}

});
@Override
protected void doOnComplete() {
if (LOG.isTraceEnabled()) {
LOG.trace("Done receiving messages for argument: {}", context.getArgument());
}
try {
outputStream.close();
} catch (IOException ignored) {
}
}

});
return () -> Optional.of(inputStream);
} catch (IOException e) {
context.reject(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package io.micronaut.http.server.netty.binders;

import java.util.concurrent.ExecutorService;

import io.micronaut.context.BeanLocator;
import io.micronaut.context.BeanProvider;
import io.micronaut.context.event.BeanCreatedEvent;
Expand All @@ -26,6 +28,8 @@
import io.micronaut.http.server.HttpServerConfiguration;
import io.micronaut.http.server.netty.HttpContentProcessorResolver;
import io.micronaut.http.server.netty.multipart.MultipartBodyArgumentBinder;
import io.micronaut.scheduling.TaskExecutors;
import jakarta.inject.Named;
import jakarta.inject.Singleton;

/**
Expand All @@ -41,6 +45,7 @@ class NettyBinderRegistrar implements BeanCreatedEventListener<RequestBinderRegi
private final HttpContentProcessorResolver httpContentProcessorResolver;
private final BeanLocator beanLocator;
private final BeanProvider<HttpServerConfiguration> httpServerConfiguration;
private final BeanProvider<ExecutorService> ioExecutor;

/**
* Default constructor.
Expand All @@ -49,16 +54,20 @@ class NettyBinderRegistrar implements BeanCreatedEventListener<RequestBinderRegi
* @param httpContentProcessorResolver The processor resolver
* @param beanLocator The bean locator
* @param httpServerConfiguration The server config
* @param ioExecutor The executor service
*/
NettyBinderRegistrar(
@Nullable ConversionService<?> conversionService,
HttpContentProcessorResolver httpContentProcessorResolver,
BeanLocator beanLocator,
BeanProvider<HttpServerConfiguration> httpServerConfiguration) {
BeanProvider<HttpServerConfiguration> httpServerConfiguration,
@Named(TaskExecutors.IO)
BeanProvider<ExecutorService> ioExecutor) {
this.conversionService = conversionService == null ? ConversionService.SHARED : conversionService;
this.httpContentProcessorResolver = httpContentProcessorResolver;
this.beanLocator = beanLocator;
this.httpServerConfiguration = httpServerConfiguration;
this.ioExecutor = ioExecutor;
}

@Override
Expand All @@ -77,7 +86,8 @@ public RequestBinderRegistry onCreated(BeanCreatedEvent<RequestBinderRegistry> e
httpServerConfiguration
));
registry.addRequestArgumentBinder(new InputStreamBodyBinder(
httpContentProcessorResolver
httpContentProcessorResolver,
ioExecutor.orElse(null)
));
return registry;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
package io.micronaut.http.server.netty.stream

import io.micronaut.context.ApplicationContext
import io.micronaut.http.*
import io.micronaut.http.annotation.*
import io.micronaut.http.client.HttpClient
import io.micronaut.http.client.annotation.Client
import io.micronaut.http.client.exceptions.HttpClientResponseException
import io.micronaut.http.client.multipart.MultipartBody
import io.micronaut.runtime.server.EmbeddedServer
import io.micronaut.scheduling.TaskExecutors
import io.micronaut.scheduling.annotation.ExecuteOn
import jakarta.inject.Inject
import spock.lang.AutoCleanup
import spock.lang.Shared
import spock.lang.Specification

import javax.annotation.Nullable
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.CountDownLatch
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors

class InputStreamBodySpec extends Specification {

@Shared @AutoCleanup EmbeddedServer embeddedServer = ApplicationContext.run(EmbeddedServer)

void "test apply load to InputStream read"() {
given:
HttpClient client = embeddedServer.applicationContext.createBean(HttpClient, embeddedServer.getURI())

when:
int max = 30
CountDownLatch latch = new CountDownLatch(max)

ExecutorService pool = Executors.newCachedThreadPool()
ConcurrentLinkedQueue<HttpStatus> responses = new ConcurrentLinkedQueue()
for (int i = 0; i < max; i++) {
pool.submit(() -> {
try {
MultipartBody multipartBody = MultipartBody.builder()
.addPart("myfile",
"largefile" * 1024)
.build()
HttpRequest request = HttpRequest.POST("/input-stream-test/hello", multipartBody)
.contentType(MediaType.MULTIPART_FORM_DATA_TYPE)
HttpResponse response = client.toBlocking()
.exchange(request)
responses.add(response.status())
System.out.println(response.getStatus())
System.out.println(response.getHeaders().asMap())

} catch (HttpClientResponseException e) {
System.out.println(e.getStatus())
} catch (URISyntaxException e) {
e.printStackTrace()
} finally {
latch.countDown()
}

})
}
latch.await()

then:
responses.size() == 30
responses.every({ it == HttpStatus.OK})
}
}

@Controller("/input-stream-test")
class PayloadInputStream {

@Inject
@Client("/")
private HttpClient httpClient

private String responsePayload = "<soap:Envelope xmlns:soap=\"http://schemas.xmlsoap.org/soap/envelope/\">\n" +
" <soap:Body>\n" +
" <ns2:getHelloWorldAsStringResponse xmlns:ns2=\"http://sample.soap.oracle/\">\n" +
" <return>Hello World %s</return>\n" +
" </ns2:getHelloWorldAsStringResponse>\n" +
" </soap:Body>\n" +
"</soap:Envelope>"


@Post("/hello")
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.ALL)
@ExecuteOn(TaskExecutors.IO)
MutableHttpResponse<String> stream(@Body @Nullable InputStream payload) throws IOException {

//blocking read on injected http client
HttpResponse<String> resp = httpClient.toBlocking().exchange(HttpRequest
.GET(URI.create("/input-stream-test/hello/other")), String.class)
System.out.println(resp.getBody(String.class).get())


byte[] body = payload.bytes
String b = new String(body)
int l = body.length

responsePayload = "{\n" +
"\t\"payload\" : {\n" +
"\t\t\"name\" : \"1542\"\n" +
"\t}\n" +
"}"
return HttpResponse.ok().body(responsePayload).contentType(MediaType.APPLICATION_JSON)
}

@Get("/hello/other")
String other() {
return "Some body content"
}
}
13 changes: 13 additions & 0 deletions inject/src/main/java/io/micronaut/context/BeanProvider.java
Original file line number Diff line number Diff line change
Expand Up @@ -144,4 +144,17 @@ default void ifResolvable(@NonNull Consumer<T> consumer) {
return (Argument) Argument.of(BeanProvider.class, Objects.requireNonNull(type, "Type cannot be null"));
}

/**
* Allows selecting an alternative bean if the backing bean is not present.
* @param alternative The alternative, can be {@code null}
* @return The bean if present or else the supplied alternative
* @since 3.0.2
*/
default @Nullable T orElse(@Nullable T alternative) {
if (isPresent()) {
return get();
} else {
return alternative;
}
}
}

0 comments on commit 0d69008

Please sign in to comment.