Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't block the event loop when reading inputstream. #6120

Merged
merged 6 commits into from
Sep 9, 2021
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package io.micronaut.http.server.netty.binders;

import io.micronaut.core.annotation.Internal;
import io.micronaut.core.annotation.Nullable;
import io.micronaut.core.async.subscriber.CompletionAwareSubscriber;
import io.micronaut.core.convert.ArgumentConversionContext;
import io.micronaut.core.type.Argument;
Expand All @@ -26,19 +27,27 @@
import io.micronaut.http.server.netty.HttpContentProcessorResolver;
import io.micronaut.http.server.netty.NettyHttpRequest;
import io.micronaut.http.server.netty.NettyHttpServer;
import io.micronaut.scheduling.TaskExecutors;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufHolder;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.EmptyByteBuf;
import jakarta.inject.Named;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

import java.io.IOException;
import java.io.InputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;

/**
* Responsible for binding to a {@link InputStream} argument from the body of the request.
Expand All @@ -53,12 +62,19 @@ public class InputStreamBodyBinder implements NonBlockingBodyArgumentBinder<Inpu
private static final Logger LOG = LoggerFactory.getLogger(NettyHttpServer.class);

private final HttpContentProcessorResolver processorResolver;
private final Scheduler ioExecutor;

/**
* @param processorResolver The http content processor resolver
* @param ioExecutor The io executor
*/
public InputStreamBodyBinder(HttpContentProcessorResolver processorResolver) {
public InputStreamBodyBinder(
HttpContentProcessorResolver processorResolver,
@Named(TaskExecutors.IO)
@Nullable
ExecutorService ioExecutor) {
this.processorResolver = processorResolver;
this.ioExecutor = Schedulers.fromExecutor(ioExecutor != null ? ioExecutor : ForkJoinPool.commonPool());
}

@Override
Expand All @@ -77,59 +93,61 @@ public BindingResult<InputStream> bind(ArgumentConversionContext<InputStream> co
PipedOutputStream outputStream = new PipedOutputStream();
try {
PipedInputStream inputStream = new PipedInputStream(outputStream);
processor.subscribe(new CompletionAwareSubscriber<ByteBufHolder>() {
Flux.from(processor)
.onBackpressureBuffer()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't this lead to OOM?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

InputStream is not thread safe, so I don't see an alternative

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then I don't think this is an improvement

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Provide a reproducer for your OOM theory in that case. "I don't think" is not a valid argument opposing a bug fix

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will try to create the scenario. In the meantime, relax with your tone

.publishOn(ioExecutor)
.subscribe(new CompletionAwareSubscriber<ByteBufHolder>() {

@Override
protected void doOnSubscribe(Subscription subscription) {
subscription.request(1);
}

@Override
protected void doOnNext(ByteBufHolder message) {
if (LOG.isTraceEnabled()) {
LOG.trace("Server received streaming message for argument [{}]: {}", context.getArgument(), message);
}
ByteBuf content = message.content();
if (!(content instanceof EmptyByteBuf)) {
try {
byte[] bytes = ByteBufUtil.getBytes(content);
outputStream.write(bytes, 0, bytes.length);
} catch (IOException e) {
subscription.cancel();
return;
} finally {
content.release();
@Override
protected void doOnSubscribe(Subscription subscription) {
subscription.request(1);
}
}
subscription.request(1);
}

@Override
protected void doOnError(Throwable t) {
if (LOG.isTraceEnabled()) {
LOG.trace("Server received error for argument [" + context.getArgument() + "]: " + t.getMessage(), t);
}
try {
outputStream.close();
} catch (IOException ignored) {
} finally {
subscription.cancel();
}
}
@Override
protected synchronized void doOnNext(ByteBufHolder message) {
if (LOG.isTraceEnabled()) {
LOG.trace("Server received streaming message for argument [{}]: {}", context.getArgument(), message);
}
ByteBuf content = message.content();
if (!(content instanceof EmptyByteBuf)) {
try {
byte[] bytes = ByteBufUtil.getBytes(content);
outputStream.write(bytes, 0, bytes.length);
} catch (IOException e) {
subscription.cancel();
return;
} finally {
content.release();
}
}
subscription.request(1);
}

@Override
protected void doOnComplete() {
if (LOG.isTraceEnabled()) {
LOG.trace("Done receiving messages for argument: {}", context.getArgument());
}
try {
outputStream.close();
} catch (IOException ignored) {
}
}
@Override
protected synchronized void doOnError(Throwable t) {
if (LOG.isTraceEnabled()) {
LOG.trace("Server received error for argument [" + context.getArgument() + "]: " + t.getMessage(), t);
}
try {
outputStream.close();
} catch (IOException ignored) {
} finally {
subscription.cancel();
}
}

});
@Override
protected synchronized void doOnComplete() {
if (LOG.isTraceEnabled()) {
LOG.trace("Done receiving messages for argument: {}", context.getArgument());
}
try {
outputStream.close();
} catch (IOException ignored) {
}
}

});
return () -> Optional.of(inputStream);
} catch (IOException e) {
context.reject(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package io.micronaut.http.server.netty.binders;

import java.util.concurrent.ExecutorService;

import io.micronaut.context.BeanLocator;
import io.micronaut.context.BeanProvider;
import io.micronaut.context.event.BeanCreatedEvent;
Expand All @@ -26,6 +28,8 @@
import io.micronaut.http.server.HttpServerConfiguration;
import io.micronaut.http.server.netty.HttpContentProcessorResolver;
import io.micronaut.http.server.netty.multipart.MultipartBodyArgumentBinder;
import io.micronaut.scheduling.TaskExecutors;
import jakarta.inject.Named;
import jakarta.inject.Singleton;

/**
Expand All @@ -41,6 +45,7 @@ class NettyBinderRegistrar implements BeanCreatedEventListener<RequestBinderRegi
private final HttpContentProcessorResolver httpContentProcessorResolver;
private final BeanLocator beanLocator;
private final BeanProvider<HttpServerConfiguration> httpServerConfiguration;
private final BeanProvider<ExecutorService> ioExecutor;

/**
* Default constructor.
Expand All @@ -49,16 +54,20 @@ class NettyBinderRegistrar implements BeanCreatedEventListener<RequestBinderRegi
* @param httpContentProcessorResolver The processor resolver
* @param beanLocator The bean locator
* @param httpServerConfiguration The server config
* @param ioExecutor The executor service
*/
NettyBinderRegistrar(
@Nullable ConversionService<?> conversionService,
HttpContentProcessorResolver httpContentProcessorResolver,
BeanLocator beanLocator,
BeanProvider<HttpServerConfiguration> httpServerConfiguration) {
BeanProvider<HttpServerConfiguration> httpServerConfiguration,
@Named(TaskExecutors.IO)
BeanProvider<ExecutorService> ioExecutor) {
this.conversionService = conversionService == null ? ConversionService.SHARED : conversionService;
this.httpContentProcessorResolver = httpContentProcessorResolver;
this.beanLocator = beanLocator;
this.httpServerConfiguration = httpServerConfiguration;
this.ioExecutor = ioExecutor;
}

@Override
Expand All @@ -77,7 +86,8 @@ public RequestBinderRegistry onCreated(BeanCreatedEvent<RequestBinderRegistry> e
httpServerConfiguration
));
registry.addRequestArgumentBinder(new InputStreamBodyBinder(
httpContentProcessorResolver
httpContentProcessorResolver,
ioExecutor.orElse(null)
));
return registry;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package io.micronaut.http.server.netty.stream

import io.micronaut.context.ApplicationContext
import io.micronaut.http.*
import io.micronaut.http.annotation.*
import io.micronaut.http.client.HttpClient
import io.micronaut.http.client.annotation.Client
import io.micronaut.http.client.exceptions.HttpClientResponseException
import io.micronaut.http.client.multipart.MultipartBody
import io.micronaut.runtime.server.EmbeddedServer
import io.micronaut.scheduling.TaskExecutors
import io.micronaut.scheduling.annotation.ExecuteOn
import jakarta.inject.Inject
import spock.lang.AutoCleanup
import spock.lang.Issue
import spock.lang.Shared
import spock.lang.Specification

import javax.annotation.Nullable
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.CountDownLatch
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors

class InputStreamBodySpec extends Specification {

@Shared @AutoCleanup EmbeddedServer embeddedServer = ApplicationContext.run(EmbeddedServer)

@Issue('https://github.com/micronaut-projects/micronaut-core/issues/6100')
void "test apply load to InputStream read"() {
given:
HttpClient client = embeddedServer.applicationContext.createBean(HttpClient, embeddedServer.getURI())

when:
int max = 30
CountDownLatch latch = new CountDownLatch(max)

ExecutorService pool = Executors.newCachedThreadPool()
ConcurrentLinkedQueue<HttpStatus> responses = new ConcurrentLinkedQueue()
for (int i = 0; i < max; i++) {
pool.submit(() -> {
try {
MultipartBody multipartBody = MultipartBody.builder()
.addPart("myfile",
"largefile" * 1024)
.build()
HttpRequest request = HttpRequest.POST("/input-stream-test/hello", multipartBody)
.contentType(MediaType.MULTIPART_FORM_DATA_TYPE)
HttpResponse response = client.toBlocking()
.exchange(request)
responses.add(response.status())
System.out.println(response.getStatus())
System.out.println(response.getHeaders().asMap())

} catch (HttpClientResponseException e) {
System.out.println(e.getStatus())
} catch (URISyntaxException e) {
e.printStackTrace()
} finally {
latch.countDown()
}

})
}
latch.await()

then:
responses.size() == 30
responses.every({ it == HttpStatus.OK})
}
}

@Controller("/input-stream-test")
class PayloadInputStream {

@Inject
@Client("/")
private HttpClient httpClient

private String responsePayload = "<soap:Envelope xmlns:soap=\"http://schemas.xmlsoap.org/soap/envelope/\">\n" +
" <soap:Body>\n" +
" <ns2:getHelloWorldAsStringResponse xmlns:ns2=\"http://sample.soap.oracle/\">\n" +
" <return>Hello World %s</return>\n" +
" </ns2:getHelloWorldAsStringResponse>\n" +
" </soap:Body>\n" +
"</soap:Envelope>"


@Post("/hello")
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.ALL)
@ExecuteOn(TaskExecutors.IO)
MutableHttpResponse<String> stream(@Body @Nullable InputStream payload) throws IOException {

//blocking read on injected http client
HttpResponse<String> resp = httpClient.toBlocking().exchange(HttpRequest
.GET(URI.create("/input-stream-test/hello/other")), String.class)
System.out.println(resp.getBody(String.class).get())


byte[] body = payload.bytes
String b = new String(body)
int l = body.length

responsePayload = "{\n" +
"\t\"payload\" : {\n" +
"\t\t\"name\" : \"1542\"\n" +
"\t}\n" +
"}"
return HttpResponse.ok().body(responsePayload).contentType(MediaType.APPLICATION_JSON)
}

@Get("/hello/other")
String other() {
return "Some body content"
}
}
13 changes: 13 additions & 0 deletions inject/src/main/java/io/micronaut/context/BeanProvider.java
Original file line number Diff line number Diff line change
Expand Up @@ -144,4 +144,17 @@ default void ifResolvable(@NonNull Consumer<T> consumer) {
return (Argument) Argument.of(BeanProvider.class, Objects.requireNonNull(type, "Type cannot be null"));
}

/**
* Allows selecting an alternative bean if the backing bean is not present.
* @param alternative The alternative, can be {@code null}
* @return The bean if present or else the supplied alternative
* @since 3.0.2
*/
default @Nullable T orElse(@Nullable T alternative) {
if (isPresent()) {
return get();
} else {
return alternative;
}
}
}