Skip to content

Commit

Permalink
Skip offloading to IO
Browse files Browse the repository at this point in the history
  • Loading branch information
jameskleeh committed Sep 9, 2021
1 parent addc552 commit dcf80f3
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 88 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package io.micronaut.http.server.netty.binders;

import io.micronaut.core.annotation.Internal;
import io.micronaut.core.annotation.Nullable;
import io.micronaut.core.async.subscriber.CompletionAwareSubscriber;
import io.micronaut.core.convert.ArgumentConversionContext;
import io.micronaut.core.type.Argument;
Expand All @@ -27,27 +26,19 @@
import io.micronaut.http.server.netty.HttpContentProcessorResolver;
import io.micronaut.http.server.netty.NettyHttpRequest;
import io.micronaut.http.server.netty.NettyHttpServer;
import io.micronaut.scheduling.TaskExecutors;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufHolder;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.EmptyByteBuf;
import jakarta.inject.Named;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

import java.io.IOException;
import java.io.InputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;

/**
* Responsible for binding to a {@link InputStream} argument from the body of the request.
Expand All @@ -62,19 +53,12 @@ public class InputStreamBodyBinder implements NonBlockingBodyArgumentBinder<Inpu
private static final Logger LOG = LoggerFactory.getLogger(NettyHttpServer.class);

private final HttpContentProcessorResolver processorResolver;
private final Scheduler ioExecutor;

/**
* @param processorResolver The http content processor resolver
* @param ioExecutor The io executor
*/
public InputStreamBodyBinder(
HttpContentProcessorResolver processorResolver,
@Named(TaskExecutors.IO)
@Nullable
ExecutorService ioExecutor) {
public InputStreamBodyBinder(HttpContentProcessorResolver processorResolver) {
this.processorResolver = processorResolver;
this.ioExecutor = Schedulers.fromExecutor(ioExecutor != null ? ioExecutor : ForkJoinPool.commonPool());
}

@Override
Expand All @@ -91,72 +75,63 @@ public BindingResult<InputStream> bind(ArgumentConversionContext<InputStream> co
if (nativeRequest instanceof StreamedHttpRequest) {
PipedOutputStream outputStream = new PipedOutputStream();
try {

PipedInputStream inputStream = new PipedInputStream(outputStream) {
private volatile HttpContentProcessor<ByteBufHolder> processor;
@Override
public synchronized int read() throws IOException {
init();
return super.read();
}

private void init() {
if (processor == null) {
processor = (HttpContentProcessor<ByteBufHolder>) processorResolver.resolve(nettyHttpRequest, context.getArgument());
Flux.from(processor)
.publishOn(ioExecutor)
.subscribe(new CompletionAwareSubscriber<ByteBufHolder>() {

@Override
protected void doOnSubscribe(Subscription subscription) {
subscription.request(1);
}

@Override
protected synchronized void doOnNext(ByteBufHolder message) {
if (LOG.isTraceEnabled()) {
LOG.trace("Server received streaming message for argument [{}]: {}", context.getArgument(), message);
}
ByteBuf content = message.content();
if (!(content instanceof EmptyByteBuf)) {
try {
byte[] bytes = ByteBufUtil.getBytes(content);
outputStream.write(bytes, 0, bytes.length);
} catch (IOException e) {
subscription.cancel();
return;
} finally {
content.release();
}
}
subscription.request(1);
}

@Override
protected synchronized void doOnError(Throwable t) {
if (LOG.isTraceEnabled()) {
LOG.trace("Server received error for argument [" + context.getArgument() + "]: " + t.getMessage(), t);
}
try {
outputStream.close();
} catch (IOException ignored) {
} finally {
subscription.cancel();
}
}

@Override
protected synchronized void doOnComplete() {
if (LOG.isTraceEnabled()) {
LOG.trace("Done receiving messages for argument: {}", context.getArgument());
}
try {
outputStream.close();
} catch (IOException ignored) {
}
processor.subscribe(new CompletionAwareSubscriber<ByteBufHolder>() {

@Override
protected void doOnSubscribe(Subscription subscription) {
subscription.request(1);
}

@Override
protected synchronized void doOnNext(ByteBufHolder message) {
if (LOG.isTraceEnabled()) {
LOG.trace("Server received streaming message for argument [{}]: {}", context.getArgument(), message);
}
ByteBuf content = message.content();
if (!(content instanceof EmptyByteBuf)) {
try {
byte[] bytes = ByteBufUtil.getBytes(content);
outputStream.write(bytes, 0, bytes.length);
} catch (IOException e) {
subscription.cancel();
return;
} finally {
content.release();
}

});
}
subscription.request(1);
}

@Override
protected synchronized void doOnError(Throwable t) {
if (LOG.isTraceEnabled()) {
LOG.trace("Server received error for argument [" + context.getArgument() + "]: " + t.getMessage(), t);
}
try {
outputStream.close();
} catch (IOException ignored) {
} finally {
subscription.cancel();
}
}

@Override
protected synchronized void doOnComplete() {
if (LOG.isTraceEnabled()) {
LOG.trace("Done receiving messages for argument: {}", context.getArgument());
}
try {
outputStream.close();
} catch (IOException ignored) {
}
}
});
}
}

Expand All @@ -165,6 +140,12 @@ public synchronized int read(byte[] b, int off, int len) throws IOException {
init();
return super.read(b, off, len);
}

@Override
public synchronized int read() throws IOException {
init();
return super.read();
}
};

return () -> Optional.of(inputStream);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
*/
package io.micronaut.http.server.netty.binders;

import java.util.concurrent.ExecutorService;

import io.micronaut.context.BeanLocator;
import io.micronaut.context.BeanProvider;
import io.micronaut.context.event.BeanCreatedEvent;
Expand All @@ -28,8 +26,6 @@
import io.micronaut.http.server.HttpServerConfiguration;
import io.micronaut.http.server.netty.HttpContentProcessorResolver;
import io.micronaut.http.server.netty.multipart.MultipartBodyArgumentBinder;
import io.micronaut.scheduling.TaskExecutors;
import jakarta.inject.Named;
import jakarta.inject.Singleton;

/**
Expand All @@ -45,7 +41,6 @@ class NettyBinderRegistrar implements BeanCreatedEventListener<RequestBinderRegi
private final HttpContentProcessorResolver httpContentProcessorResolver;
private final BeanLocator beanLocator;
private final BeanProvider<HttpServerConfiguration> httpServerConfiguration;
private final BeanProvider<ExecutorService> ioExecutor;

/**
* Default constructor.
Expand All @@ -54,20 +49,16 @@ class NettyBinderRegistrar implements BeanCreatedEventListener<RequestBinderRegi
* @param httpContentProcessorResolver The processor resolver
* @param beanLocator The bean locator
* @param httpServerConfiguration The server config
* @param ioExecutor The executor service
*/
NettyBinderRegistrar(
@Nullable ConversionService<?> conversionService,
HttpContentProcessorResolver httpContentProcessorResolver,
BeanLocator beanLocator,
BeanProvider<HttpServerConfiguration> httpServerConfiguration,
@Named(TaskExecutors.IO)
BeanProvider<ExecutorService> ioExecutor) {
BeanProvider<HttpServerConfiguration> httpServerConfiguration) {
this.conversionService = conversionService == null ? ConversionService.SHARED : conversionService;
this.httpContentProcessorResolver = httpContentProcessorResolver;
this.beanLocator = beanLocator;
this.httpServerConfiguration = httpServerConfiguration;
this.ioExecutor = ioExecutor;
}

@Override
Expand All @@ -86,8 +77,7 @@ public RequestBinderRegistry onCreated(BeanCreatedEvent<RequestBinderRegistry> e
httpServerConfiguration
));
registry.addRequestArgumentBinder(new InputStreamBodyBinder(
httpContentProcessorResolver,
ioExecutor.orElse(null)
httpContentProcessorResolver
));
return registry;
}
Expand Down

0 comments on commit dcf80f3

Please sign in to comment.