Skip to content

Commit

Permalink
fix #61 Offer a simple blocking alternative API for client/server
Browse files Browse the repository at this point in the history
This commit offers a simplified API to start/stop client and servers in a
blocking fashion.

 - `start` method blocks until initialized, then offers a simplified API to
 shutdown.
 - `startAndAwait` blocks for the whole lifecycle of the client/server, which
 would usually be used in a main class (only way to shut down some servers
 would be to sigkill).
  • Loading branch information
simonbasle committed Jul 3, 2017
1 parent 4cc5e04 commit d9d9563
Show file tree
Hide file tree
Showing 7 changed files with 349 additions and 1 deletion.
51 changes: 51 additions & 0 deletions src/main/java/reactor/ipc/netty/NettyConnector.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,15 @@

package reactor.ipc.netty;

import java.util.function.BiFunction;
import java.util.function.Consumer;
import javax.annotation.Nullable;

import io.netty.buffer.ByteBuf;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;
import reactor.ipc.connector.Connector;
import reactor.ipc.netty.tcp.BlockingNettyContext;

/**
* A Netty {@link Connector}
Expand All @@ -29,4 +36,48 @@
public interface NettyConnector<INBOUND extends NettyInbound, OUTBOUND extends NettyOutbound>
extends Connector<ByteBuf, ByteBuf, INBOUND, OUTBOUND> {

@Override
Mono<? extends NettyContext> newHandler(BiFunction<? super INBOUND, ? super OUTBOUND, ? extends Publisher<Void>> ioHandler);

/**
* Start a Client or Server in a blocking fashion, and wait for it to finish initializing.
* The returned {@link BlockingNettyContext} class offers a simplified API around operating
* the client/server in a blocking fashion, including to {@link BlockingNettyContext#shutdown() shut it down}.
*
* @param handler the handler to start the client or server with.
* @param <T>
* @return a {@link BlockingNettyContext}
*/
default <T extends BiFunction<INBOUND, OUTBOUND, ? extends Publisher<Void>>>
BlockingNettyContext start(T handler) {
return new BlockingNettyContext(newHandler(handler), getClass().getSimpleName());
}

/**
* Start a Client or Server in a fully blocking fashion, not only waiting for it to
* initialize but also blocking during the full lifecycle of the client/server.
* Since most servers will be long-lived, this is more adapted to running a server
* out of a main method, only allowing shutdown of the servers through sigkill.
* <p>
* Note that a {@link Runtime#addShutdownHook(Thread) JVM shutdown hook} is added
* by this method in order to properly disconnect the client/server upon receiving
* a sigkill signal.
*
* @param handler the handler to execute.
* @param onStart an optional callback to be invoked once the client/server has finished
* initializing (see {@link #startAndAwait(BiFunction, Consumer)}).
*/
default <T extends BiFunction<INBOUND, OUTBOUND, ? extends Publisher<Void>>>
void startAndAwait(T handler, @Nullable Consumer<BlockingNettyContext> onStart) {
BlockingNettyContext facade = new BlockingNettyContext(newHandler(handler), getClass().getSimpleName());

if (onStart != null) {
onStart.accept(facade);
}
Runtime.getRuntime().addShutdownHook(new Thread(facade::shutdown));

facade.getContext()
.onClose()
.block();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ public ChannelOperations<INBOUND, OUTBOUND> context(Consumer<NettyContext> conte
@Override
public void dispose() {
inbound.cancel();
//TODO shouldn't super.dispose be called there / channel closed?
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ final public void channelRead(ChannelHandlerContext ctx, Object msg)
try {
ChannelOperations<?, ?> ops = ChannelOperations.get(ctx.channel());
if (ops != null) {
ChannelOperations.get(ctx.channel()).onInboundNext(ctx, msg);
ops.onInboundNext(ctx, msg);
}
else {
if (log.isDebugEnabled()) {
Expand Down
16 changes: 16 additions & 0 deletions src/main/java/reactor/ipc/netty/http/server/HttpServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import reactor.ipc.netty.channel.ContextHandler;
import reactor.ipc.netty.http.HttpResources;
import reactor.ipc.netty.options.ServerOptions;
import reactor.ipc.netty.tcp.BlockingNettyContext;
import reactor.ipc.netty.tcp.TcpServer;

/**
Expand Down Expand Up @@ -156,6 +157,21 @@ public Mono<? extends NettyContext> newRouter(Consumer<? super HttpServerRoutes>
return newHandler(routes);
}

public BlockingNettyContext startRouter(Consumer<? super HttpServerRoutes> routesBuilder) {
Objects.requireNonNull(routesBuilder, "routeBuilder");
HttpServerRoutes routes = HttpServerRoutes.newRoutes();
routesBuilder.accept(routes);
return start(routes);
}

public void startRouterAndAwait(Consumer<? super HttpServerRoutes> routesBuilder,
Consumer<BlockingNettyContext> onStart) {
Objects.requireNonNull(routesBuilder, "routeBuilder");
HttpServerRoutes routes = HttpServerRoutes.newRoutes();
routesBuilder.accept(routes);
startAndAwait(routes, onStart);
}

static final LoggingHandler loggingHandler = new LoggingHandler(HttpServer.class);

final class TcpBridgeServer extends TcpServer
Expand Down
98 changes: 98 additions & 0 deletions src/main/java/reactor/ipc/netty/tcp/BlockingNettyContext.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package reactor.ipc.netty.tcp;

import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.concurrent.TimeoutException;

import reactor.core.publisher.Mono;
import reactor.ipc.netty.NettyContext;
import reactor.util.Logger;
import reactor.util.Loggers;

/**
* Wrap a {@link NettyContext} obtained from a {@link Mono} and offer methods to manage
* its lifecycle in a blocking fashion.
*
* @author Simon Baslé
*/
public class BlockingNettyContext {

private static final Logger LOG = Loggers.getLogger(BlockingNettyContext.class);

private final NettyContext context;
private final String description;

private Duration lifecycleTimeout;

public BlockingNettyContext(Mono<? extends NettyContext> contextAsync,
String description) {
this(contextAsync, description, Duration.ofSeconds(3));
}

public BlockingNettyContext(Mono<? extends NettyContext> contextAsync,
String description, Duration lifecycleTimeout) {
this.description = description;
this.lifecycleTimeout = lifecycleTimeout;
this.context = contextAsync
.timeout(lifecycleTimeout, Mono.error(new TimeoutException(description + " couldn't be started within " + lifecycleTimeout.toMillis() + "ms")))
.doOnNext(ctx -> LOG.info("Started {} on {}", description, ctx.address()))
.block();

context.onClose().subscribe(null,
e -> LOG.error("Stopped {} on {} with an error {}", description, context.address(), e),
() -> LOG.info("Stopped {} on {}", description, context.address()));
}

/**
* Change the lifecycle timeout applied to the {@link #shutdown()} operation (as this can
* only be called AFTER the {@link NettyContext} has been "started").
*
* @param timeout the new timeout to apply on shutdown.
*/
public void setLifecycleTimeout(Duration timeout) {
this.lifecycleTimeout = timeout;
}

/**
* Get the {@link NettyContext} wrapped by this facade.
* @return the original NettyContext.
*/
public NettyContext getContext() {
return context;
}

/**
* Return this server's port.
* @return The port the server is bound to.
*/
public int getPort() {
return context.address().getPort();
}

/**
* Return the server's host String. That is, the hostname or in case the server was bound
* to a literal IP adress, the IP string representation (rather than performing a reverse-DNS
* lookup).
*
* @return the host string, without reverse DNS lookup
* @see NettyContext#address()
* @see InetSocketAddress#getHostString()
*/
public String getHost() {
return context.address().getHostString();
}

/**
* Shut down the {@link NettyContext} and wait for its termination, up to the
* {@link #setLifecycleTimeout(Duration) lifecycle timeout}.
*/
public void shutdown() {
if (context.isDisposed()) {
return;
}
context.dispose();
context.onClose()
.timeout(lifecycleTimeout, Mono.error(new TimeoutException(description + " couldn't be stopped within " + lifecycleTimeout.toMillis() + "ms")))
.block();
}
}
157 changes: 157 additions & 0 deletions src/test/java/reactor/ipc/netty/tcp/BlockingNettyContextTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
package reactor.ipc.netty.tcp;

import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;

import io.netty.channel.Channel;
import io.netty.channel.embedded.EmbeddedChannel;
import org.junit.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.ipc.netty.NettyContext;
import reactor.ipc.netty.NettyPipeline;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;

public class BlockingNettyContextTest {

static final NettyContext NEVER_STOP_CONTEXT = new NettyContext() {
@Override
public Channel channel() {
return new EmbeddedChannel();
}

@Override
public InetSocketAddress address() {
return InetSocketAddress.createUnresolved("localhost", 4321);
}

@Override
public Mono<Void> onClose() {
return Mono.never();
}
};

@Test
public void simpleServerFromAsyncServer() throws InterruptedException {
BlockingNettyContext simpleServer =
TcpServer.create()
.start((in, out) -> out
.options(NettyPipeline.SendOptions::flushOnEach)
.sendString(
in.receive()
.asString()
.takeUntil(s -> s.endsWith("CONTROL"))
.map(s -> "ECHO: " + s.replaceAll("CONTROL", ""))
.concatWith(Mono.just("DONE"))
)
.neverComplete()
);

System.out.println(simpleServer.getHost());
System.out.println(simpleServer.getPort());

AtomicReference<List<String>> data1 = new AtomicReference<>();
AtomicReference<List<String>> data2 = new AtomicReference<>();

BlockingNettyContext simpleClient1 =
TcpClient.create(simpleServer.getPort())
.start((in, out) -> out.options(NettyPipeline.SendOptions::flushOnEach)
.sendString(Flux.just("Hello", "World", "CONTROL"))
.then(in.receive()
.asString()
.takeUntil(s -> s.endsWith("DONE"))
.map(s -> s.replaceAll("DONE", ""))
.filter(s -> !s.isEmpty())
.collectList()
.doOnNext(data1::set)
.doOnNext(System.err::println)
.then()));

BlockingNettyContext simpleClient2 =
TcpClient.create(simpleServer.getPort())
.start((in, out) -> out.options(NettyPipeline.SendOptions::flushOnEach)
.sendString(Flux.just("How", "Are", "You?", "CONTROL"))
.then(in.receive()
.asString()
.takeUntil(s -> s.endsWith("DONE"))
.map(s -> s.replaceAll("DONE", ""))
.filter(s -> !s.isEmpty())
.collectList()
.doOnNext(data2::set)
.doOnNext(System.err::println)
.then()));

Thread.sleep(100);
System.err.println("STOPPING 1");
simpleClient1.shutdown();

System.err.println("STOPPING 2");
simpleClient2.shutdown();

System.err.println("STOPPING SERVER");
simpleServer.shutdown();

assertThat(data1.get())
.allSatisfy(s -> assertThat(s).startsWith("ECHO: "));
assertThat(data2.get())
.allSatisfy(s -> assertThat(s).startsWith("ECHO: "));

assertThat(data1.get()
.toString()
.replaceAll("ECHO: ", "")
.replaceAll(", ", ""))
.isEqualTo("[HelloWorld]");
assertThat(data2.get()
.toString()
.replaceAll("ECHO: ", "")
.replaceAll(", ", ""))
.isEqualTo("[HowAreYou?]");
}

@Test
public void testTimeoutOnStart() {
assertThatExceptionOfType(RuntimeException.class)
.isThrownBy(() -> new BlockingNettyContext(Mono.never(), "TEST NEVER START", Duration.ofMillis(100)))
.withCauseExactlyInstanceOf(TimeoutException.class)
.withMessage("java.util.concurrent.TimeoutException: TEST NEVER START couldn't be started within 100ms");
}

@Test
public void testTimeoutOnStop() {
final BlockingNettyContext neverStop =
new BlockingNettyContext(Mono.just(NEVER_STOP_CONTEXT), "TEST NEVER STOP", Duration.ofMillis(100));

assertThatExceptionOfType(RuntimeException.class)
.isThrownBy(neverStop::shutdown)
.withCauseExactlyInstanceOf(TimeoutException.class)
.withMessage("java.util.concurrent.TimeoutException: TEST NEVER STOP couldn't be stopped within 100ms");
}

@Test
public void testTimeoutOnStopChangedTimeout() {
final BlockingNettyContext neverStop =
new BlockingNettyContext(Mono.just(NEVER_STOP_CONTEXT), "TEST NEVER STOP", Duration.ofMillis(500));

neverStop.setLifecycleTimeout(Duration.ofMillis(100));

assertThatExceptionOfType(RuntimeException.class)
.isThrownBy(neverStop::shutdown)
.withCauseExactlyInstanceOf(TimeoutException.class)
.withMessage("java.util.concurrent.TimeoutException: TEST NEVER STOP couldn't be stopped within 100ms");
}

@Test
public void getContextAddressAndHost() {
BlockingNettyContext
facade = new BlockingNettyContext(Mono.just(NEVER_STOP_CONTEXT), "foo");

assertThat(facade.getContext()).isSameAs(NEVER_STOP_CONTEXT);
assertThat(facade.getPort()).isEqualTo(NEVER_STOP_CONTEXT.address().getPort());
assertThat(facade.getHost()).isEqualTo(NEVER_STOP_CONTEXT.address().getHostString());
}
}

0 comments on commit d9d9563

Please sign in to comment.