Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't block the event loop when reading inputstream. #6120

Merged
merged 6 commits into from
Sep 9, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -73,62 +73,80 @@ public BindingResult<InputStream> bind(ArgumentConversionContext<InputStream> co
NettyHttpRequest nettyHttpRequest = (NettyHttpRequest) source;
io.netty.handler.codec.http.HttpRequest nativeRequest = nettyHttpRequest.getNativeRequest();
if (nativeRequest instanceof StreamedHttpRequest) {
HttpContentProcessor<ByteBufHolder> processor = (HttpContentProcessor<ByteBufHolder>) processorResolver.resolve(nettyHttpRequest, context.getArgument());
PipedOutputStream outputStream = new PipedOutputStream();
try {
PipedInputStream inputStream = new PipedInputStream(outputStream);
processor.subscribe(new CompletionAwareSubscriber<ByteBufHolder>() {
PipedInputStream inputStream = new PipedInputStream(outputStream) {
private volatile HttpContentProcessor<ByteBufHolder> processor;

@Override
protected void doOnSubscribe(Subscription subscription) {
subscription.request(1);
}
private void init() {
if (processor == null) {
processor = (HttpContentProcessor<ByteBufHolder>) processorResolver.resolve(nettyHttpRequest, context.getArgument());
processor.subscribe(new CompletionAwareSubscriber<ByteBufHolder>() {

@Override
protected void doOnNext(ByteBufHolder message) {
if (LOG.isTraceEnabled()) {
LOG.trace("Server received streaming message for argument [{}]: {}", context.getArgument(), message);
}
ByteBuf content = message.content();
if (!(content instanceof EmptyByteBuf)) {
try {
byte[] bytes = ByteBufUtil.getBytes(content);
outputStream.write(bytes, 0, bytes.length);
} catch (IOException e) {
subscription.cancel();
return;
} finally {
content.release();
}
@Override
protected void doOnSubscribe(Subscription subscription) {
subscription.request(1);
}

@Override
protected synchronized void doOnNext(ByteBufHolder message) {
if (LOG.isTraceEnabled()) {
LOG.trace("Server received streaming message for argument [{}]: {}", context.getArgument(), message);
}
ByteBuf content = message.content();
if (!(content instanceof EmptyByteBuf)) {
try {
byte[] bytes = ByteBufUtil.getBytes(content);
outputStream.write(bytes, 0, bytes.length);
} catch (IOException e) {
subscription.cancel();
return;
} finally {
content.release();
}
}
subscription.request(1);
}

@Override
protected synchronized void doOnError(Throwable t) {
if (LOG.isTraceEnabled()) {
LOG.trace("Server received error for argument [" + context.getArgument() + "]: " + t.getMessage(), t);
}
try {
outputStream.close();
} catch (IOException ignored) {
} finally {
subscription.cancel();
}
}

@Override
protected synchronized void doOnComplete() {
if (LOG.isTraceEnabled()) {
LOG.trace("Done receiving messages for argument: {}", context.getArgument());
}
try {
outputStream.close();
} catch (IOException ignored) {
}
}
});
}
subscription.request(1);
}

@Override
protected void doOnError(Throwable t) {
if (LOG.isTraceEnabled()) {
LOG.trace("Server received error for argument [" + context.getArgument() + "]: " + t.getMessage(), t);
}
try {
outputStream.close();
} catch (IOException ignored) {
} finally {
subscription.cancel();
}
public synchronized int read(byte[] b, int off, int len) throws IOException {
init();
return super.read(b, off, len);
}

@Override
protected void doOnComplete() {
if (LOG.isTraceEnabled()) {
LOG.trace("Done receiving messages for argument: {}", context.getArgument());
}
try {
outputStream.close();
} catch (IOException ignored) {
}
public synchronized int read() throws IOException {
init();
return super.read();
}

});
};

return () -> Optional.of(inputStream);
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package io.micronaut.http.server.netty.stream

import io.micronaut.context.ApplicationContext
import io.micronaut.http.*
import io.micronaut.http.annotation.*
import io.micronaut.http.client.HttpClient
import io.micronaut.http.client.annotation.Client
import io.micronaut.http.client.exceptions.HttpClientResponseException
import io.micronaut.http.client.multipart.MultipartBody
import io.micronaut.runtime.server.EmbeddedServer
import io.micronaut.scheduling.TaskExecutors
import io.micronaut.scheduling.annotation.ExecuteOn
import jakarta.inject.Inject
import spock.lang.AutoCleanup
import spock.lang.Issue
import spock.lang.Shared
import spock.lang.Specification

import javax.annotation.Nullable
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.CountDownLatch
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors

class InputStreamBodySpec extends Specification {

@Shared @AutoCleanup EmbeddedServer embeddedServer = ApplicationContext.run(EmbeddedServer)

@Issue('https://github.com/micronaut-projects/micronaut-core/issues/6100')
void "test apply load to InputStream read"() {
given:
HttpClient client = embeddedServer.applicationContext.createBean(HttpClient, embeddedServer.getURI())

when:
int max = 30
CountDownLatch latch = new CountDownLatch(max)

ExecutorService pool = Executors.newCachedThreadPool()
ConcurrentLinkedQueue<HttpStatus> responses = new ConcurrentLinkedQueue()
for (int i = 0; i < max; i++) {
pool.submit(() -> {
try {
MultipartBody multipartBody = MultipartBody.builder()
.addPart("myfile",
"largefile" * 1024)
.build()
HttpRequest request = HttpRequest.POST("/input-stream-test/hello", multipartBody)
.contentType(MediaType.MULTIPART_FORM_DATA_TYPE)
HttpResponse response = client.toBlocking()
.exchange(request)
responses.add(response.status())
System.out.println(response.getStatus())
System.out.println(response.getHeaders().asMap())

} catch (HttpClientResponseException e) {
System.out.println(e.getStatus())
} catch (URISyntaxException e) {
e.printStackTrace()
} finally {
latch.countDown()
}

})
}
latch.await()

then:
responses.size() == 30
responses.every({ it == HttpStatus.OK})
}
}

@Controller("/input-stream-test")
class PayloadInputStream {

@Inject
@Client("/")
private HttpClient httpClient

private String responsePayload = "<soap:Envelope xmlns:soap=\"http://schemas.xmlsoap.org/soap/envelope/\">\n" +
" <soap:Body>\n" +
" <ns2:getHelloWorldAsStringResponse xmlns:ns2=\"http://sample.soap.oracle/\">\n" +
" <return>Hello World %s</return>\n" +
" </ns2:getHelloWorldAsStringResponse>\n" +
" </soap:Body>\n" +
"</soap:Envelope>"


@Post("/hello")
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.ALL)
@ExecuteOn(TaskExecutors.IO)
MutableHttpResponse<String> stream(@Body @Nullable InputStream payload) throws IOException {

//blocking read on injected http client
HttpResponse<String> resp = httpClient.toBlocking().exchange(HttpRequest
.GET(URI.create("/input-stream-test/hello/other")), String.class)
System.out.println(resp.getBody(String.class).get())


byte[] body = payload.bytes
String b = new String(body)
int l = body.length

responsePayload = "{\n" +
"\t\"payload\" : {\n" +
"\t\t\"name\" : \"1542\"\n" +
"\t}\n" +
"}"
return HttpResponse.ok().body(responsePayload).contentType(MediaType.APPLICATION_JSON)
}

@Get("/hello/other")
String other() {
return "Some body content"
}
}
13 changes: 13 additions & 0 deletions inject/src/main/java/io/micronaut/context/BeanProvider.java
Original file line number Diff line number Diff line change
Expand Up @@ -144,4 +144,17 @@ default void ifResolvable(@NonNull Consumer<T> consumer) {
return (Argument) Argument.of(BeanProvider.class, Objects.requireNonNull(type, "Type cannot be null"));
}

/**
* Allows selecting an alternative bean if the backing bean is not present.
* @param alternative The alternative, can be {@code null}
* @return The bean if present or else the supplied alternative
* @since 3.0.2
*/
default @Nullable T orElse(@Nullable T alternative) {
if (isPresent()) {
return get();
} else {
return alternative;
}
}
}