diff --git a/http-server-netty/src/main/java/io/micronaut/http/server/netty/binders/InputStreamBodyBinder.java b/http-server-netty/src/main/java/io/micronaut/http/server/netty/binders/InputStreamBodyBinder.java index a2433a745e5..89612e117ca 100644 --- a/http-server-netty/src/main/java/io/micronaut/http/server/netty/binders/InputStreamBodyBinder.java +++ b/http-server-netty/src/main/java/io/micronaut/http/server/netty/binders/InputStreamBodyBinder.java @@ -73,62 +73,80 @@ public BindingResult bind(ArgumentConversionContext co NettyHttpRequest nettyHttpRequest = (NettyHttpRequest) source; io.netty.handler.codec.http.HttpRequest nativeRequest = nettyHttpRequest.getNativeRequest(); if (nativeRequest instanceof StreamedHttpRequest) { - HttpContentProcessor processor = (HttpContentProcessor) processorResolver.resolve(nettyHttpRequest, context.getArgument()); PipedOutputStream outputStream = new PipedOutputStream(); try { - PipedInputStream inputStream = new PipedInputStream(outputStream); - processor.subscribe(new CompletionAwareSubscriber() { + PipedInputStream inputStream = new PipedInputStream(outputStream) { + private volatile HttpContentProcessor processor; - @Override - protected void doOnSubscribe(Subscription subscription) { - subscription.request(1); - } + private void init() { + if (processor == null) { + processor = (HttpContentProcessor) processorResolver.resolve(nettyHttpRequest, context.getArgument()); + processor.subscribe(new CompletionAwareSubscriber() { - @Override - protected void doOnNext(ByteBufHolder message) { - if (LOG.isTraceEnabled()) { - LOG.trace("Server received streaming message for argument [{}]: {}", context.getArgument(), message); - } - ByteBuf content = message.content(); - if (!(content instanceof EmptyByteBuf)) { - try { - byte[] bytes = ByteBufUtil.getBytes(content); - outputStream.write(bytes, 0, bytes.length); - } catch (IOException e) { - subscription.cancel(); - return; - } finally { - content.release(); - } + @Override + protected void doOnSubscribe(Subscription subscription) { + subscription.request(1); + } + + @Override + protected synchronized void doOnNext(ByteBufHolder message) { + if (LOG.isTraceEnabled()) { + LOG.trace("Server received streaming message for argument [{}]: {}", context.getArgument(), message); + } + ByteBuf content = message.content(); + if (!(content instanceof EmptyByteBuf)) { + try { + byte[] bytes = ByteBufUtil.getBytes(content); + outputStream.write(bytes, 0, bytes.length); + } catch (IOException e) { + subscription.cancel(); + return; + } finally { + content.release(); + } + } + subscription.request(1); + } + + @Override + protected synchronized void doOnError(Throwable t) { + if (LOG.isTraceEnabled()) { + LOG.trace("Server received error for argument [" + context.getArgument() + "]: " + t.getMessage(), t); + } + try { + outputStream.close(); + } catch (IOException ignored) { + } finally { + subscription.cancel(); + } + } + + @Override + protected synchronized void doOnComplete() { + if (LOG.isTraceEnabled()) { + LOG.trace("Done receiving messages for argument: {}", context.getArgument()); + } + try { + outputStream.close(); + } catch (IOException ignored) { + } + } + }); } - subscription.request(1); } @Override - protected void doOnError(Throwable t) { - if (LOG.isTraceEnabled()) { - LOG.trace("Server received error for argument [" + context.getArgument() + "]: " + t.getMessage(), t); - } - try { - outputStream.close(); - } catch (IOException ignored) { - } finally { - subscription.cancel(); - } + public synchronized int read(byte[] b, int off, int len) throws IOException { + init(); + return super.read(b, off, len); } @Override - protected void doOnComplete() { - if (LOG.isTraceEnabled()) { - LOG.trace("Done receiving messages for argument: {}", context.getArgument()); - } - try { - outputStream.close(); - } catch (IOException ignored) { - } + public synchronized int read() throws IOException { + init(); + return super.read(); } - - }); + }; return () -> Optional.of(inputStream); } catch (IOException e) { diff --git a/http-server-netty/src/test/groovy/io/micronaut/http/server/netty/stream/InputStreamBodySpec.groovy b/http-server-netty/src/test/groovy/io/micronaut/http/server/netty/stream/InputStreamBodySpec.groovy new file mode 100644 index 00000000000..b517ba56f46 --- /dev/null +++ b/http-server-netty/src/test/groovy/io/micronaut/http/server/netty/stream/InputStreamBodySpec.groovy @@ -0,0 +1,117 @@ +package io.micronaut.http.server.netty.stream + +import io.micronaut.context.ApplicationContext +import io.micronaut.http.* +import io.micronaut.http.annotation.* +import io.micronaut.http.client.HttpClient +import io.micronaut.http.client.annotation.Client +import io.micronaut.http.client.exceptions.HttpClientResponseException +import io.micronaut.http.client.multipart.MultipartBody +import io.micronaut.runtime.server.EmbeddedServer +import io.micronaut.scheduling.TaskExecutors +import io.micronaut.scheduling.annotation.ExecuteOn +import jakarta.inject.Inject +import spock.lang.AutoCleanup +import spock.lang.Issue +import spock.lang.Shared +import spock.lang.Specification + +import javax.annotation.Nullable +import java.util.concurrent.ConcurrentLinkedQueue +import java.util.concurrent.CountDownLatch +import java.util.concurrent.ExecutorService +import java.util.concurrent.Executors + +class InputStreamBodySpec extends Specification { + + @Shared @AutoCleanup EmbeddedServer embeddedServer = ApplicationContext.run(EmbeddedServer) + + @Issue('https://github.com/micronaut-projects/micronaut-core/issues/6100') + void "test apply load to InputStream read"() { + given: + HttpClient client = embeddedServer.applicationContext.createBean(HttpClient, embeddedServer.getURI()) + + when: + int max = 30 + CountDownLatch latch = new CountDownLatch(max) + + ExecutorService pool = Executors.newCachedThreadPool() + ConcurrentLinkedQueue responses = new ConcurrentLinkedQueue() + for (int i = 0; i < max; i++) { + pool.submit(() -> { + try { + MultipartBody multipartBody = MultipartBody.builder() + .addPart("myfile", + "largefile" * 1024) + .build() + HttpRequest request = HttpRequest.POST("/input-stream-test/hello", multipartBody) + .contentType(MediaType.MULTIPART_FORM_DATA_TYPE) + HttpResponse response = client.toBlocking() + .exchange(request) + responses.add(response.status()) + System.out.println(response.getStatus()) + System.out.println(response.getHeaders().asMap()) + + } catch (HttpClientResponseException e) { + System.out.println(e.getStatus()) + } catch (URISyntaxException e) { + e.printStackTrace() + } finally { + latch.countDown() + } + + }) + } + latch.await() + + then: + responses.size() == 30 + responses.every({ it == HttpStatus.OK}) + } +} + +@Controller("/input-stream-test") +class PayloadInputStream { + + @Inject + @Client("/") + private HttpClient httpClient + + private String responsePayload = "\n" + + " \n" + + " \n" + + " Hello World %s\n" + + " \n" + + " \n" + + "" + + + @Post("/hello") + @Produces(MediaType.APPLICATION_JSON) + @Consumes(MediaType.ALL) + @ExecuteOn(TaskExecutors.IO) + MutableHttpResponse stream(@Body @Nullable InputStream payload) throws IOException { + + //blocking read on injected http client + HttpResponse resp = httpClient.toBlocking().exchange(HttpRequest + .GET(URI.create("/input-stream-test/hello/other")), String.class) + System.out.println(resp.getBody(String.class).get()) + + + byte[] body = payload.bytes + String b = new String(body) + int l = body.length + + responsePayload = "{\n" + + "\t\"payload\" : {\n" + + "\t\t\"name\" : \"1542\"\n" + + "\t}\n" + + "}" + return HttpResponse.ok().body(responsePayload).contentType(MediaType.APPLICATION_JSON) + } + + @Get("/hello/other") + String other() { + return "Some body content" + } +} \ No newline at end of file diff --git a/inject/src/main/java/io/micronaut/context/BeanProvider.java b/inject/src/main/java/io/micronaut/context/BeanProvider.java index 36ac551f0d1..258f2af5973 100644 --- a/inject/src/main/java/io/micronaut/context/BeanProvider.java +++ b/inject/src/main/java/io/micronaut/context/BeanProvider.java @@ -144,4 +144,17 @@ default void ifResolvable(@NonNull Consumer consumer) { return (Argument) Argument.of(BeanProvider.class, Objects.requireNonNull(type, "Type cannot be null")); } + /** + * Allows selecting an alternative bean if the backing bean is not present. + * @param alternative The alternative, can be {@code null} + * @return The bean if present or else the supplied alternative + * @since 3.0.2 + */ + default @Nullable T orElse(@Nullable T alternative) { + if (isPresent()) { + return get(); + } else { + return alternative; + } + } }