From 133d0c269ab536d84da0a244751b8667ecb1be78 Mon Sep 17 00:00:00 2001 From: Robert Roeser Date: Sun, 15 Oct 2017 22:01:32 -0700 Subject: [PATCH] use MonoProcessor to close and not depend on netty --- .../netty/NettyDuplexConnection.java | 37 +++--- .../netty/WebsocketDuplexConnection.java | 114 +++++++++--------- 2 files changed, 82 insertions(+), 69 deletions(-) diff --git a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/NettyDuplexConnection.java b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/NettyDuplexConnection.java index 2afd85160..0132253f9 100644 --- a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/NettyDuplexConnection.java +++ b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/NettyDuplexConnection.java @@ -14,12 +14,12 @@ * limitations under the License. */ package io.rsocket.transport.netty; - import io.rsocket.DuplexConnection; import io.rsocket.Frame; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.core.publisher.MonoProcessor; import reactor.ipc.netty.NettyContext; import reactor.ipc.netty.NettyInbound; import reactor.ipc.netty.NettyOutbound; @@ -28,44 +28,51 @@ public class NettyDuplexConnection implements DuplexConnection { private final NettyInbound in; private final NettyOutbound out; private final NettyContext context; - + private final MonoProcessor onClose; + public NettyDuplexConnection(NettyInbound in, NettyOutbound out, NettyContext context) { this.in = in; this.out = out; this.context = context; + this.onClose = MonoProcessor.create(); + + context.onClose(onClose::onComplete); + this.onClose + .doFinally( + s -> { + this.context.dispose(); + this.context.channel().close(); + }) + .subscribe(); } - + @Override public Mono send(Publisher frames) { return Flux.from(frames).concatMap(this::sendOne).then(); } - + @Override public Mono sendOne(Frame frame) { return out.sendObject(frame.content()).then(); } - + @Override public Flux receive() { return in.receive().map(buf -> Frame.from(buf.retain())); } - + @Override public Mono close() { - return Mono.fromRunnable( - () -> { - context.dispose(); - context.channel().close(); - }); + return Mono.fromRunnable(onClose::onComplete); } - + @Override public Mono onClose() { - return context.onClose(); + return onClose; } - + @Override public double availability() { - return context.isDisposed() ? 0.0 : 1.0; + return onClose.isTerminated() ? 0.0 : 1.0; } } diff --git a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/WebsocketDuplexConnection.java b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/WebsocketDuplexConnection.java index 754f3feff..540f312a2 100644 --- a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/WebsocketDuplexConnection.java +++ b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/WebsocketDuplexConnection.java @@ -14,7 +14,6 @@ * limitations under the License. */ package io.rsocket.transport.netty; - import static io.netty.buffer.Unpooled.wrappedBuffer; import static io.rsocket.frame.FrameHeaderFlyweight.FRAME_LENGTH_SIZE; @@ -27,6 +26,7 @@ import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.core.publisher.MonoProcessor; import reactor.ipc.netty.NettyContext; import reactor.ipc.netty.NettyInbound; import reactor.ipc.netty.NettyOutbound; @@ -39,57 +39,63 @@ * back on for frames received. */ public class WebsocketDuplexConnection implements DuplexConnection { - private final NettyInbound in; - private final NettyOutbound out; - private final NettyContext context; - - public WebsocketDuplexConnection(NettyInbound in, NettyOutbound out, NettyContext context) { - this.in = in; - this.out = out; - this.context = context; - } - - @Override - public Mono send(Publisher frames) { - return Flux.from(frames).concatMap(this::sendOne).then(); - } - - @Override - public Mono sendOne(Frame frame) { - return out.sendObject(new BinaryWebSocketFrame(frame.content().skipBytes(FRAME_LENGTH_SIZE))) - .then(); - } - - @Override - public Flux receive() { - return in.receive() - .map( - buf -> { - CompositeByteBuf composite = context.channel().alloc().compositeBuffer(); - ByteBuf length = wrappedBuffer(new byte[FRAME_LENGTH_SIZE]); - FrameHeaderFlyweight.encodeLength(length, 0, buf.readableBytes()); - composite.addComponents(true, length, buf.retain()); - return Frame.from(composite); - }); - } - - @Override - public Mono close() { - return Mono.fromRunnable( - () -> { - if (!context.isDisposed()) { - context.channel().close(); - } - }); - } - - @Override - public Mono onClose() { - return context.onClose(); - } - - @Override - public double availability() { - return context.isDisposed() ? 0.0 : 1.0; - } + private final NettyInbound in; + private final NettyOutbound out; + private final NettyContext context; + private final MonoProcessor onClose; + + public WebsocketDuplexConnection(NettyInbound in, NettyOutbound out, NettyContext context) { + this.in = in; + this.out = out; + this.context = context; + this.onClose = MonoProcessor.create(); + + context.onClose(onClose::onComplete); + this.onClose + .doFinally( + s -> { + this.context.dispose(); + this.context.channel().close(); + }) + .subscribe(); + } + + @Override + public Mono send(Publisher frames) { + return Flux.from(frames).concatMap(this::sendOne).then(); + } + + @Override + public Mono sendOne(Frame frame) { + return out.sendObject(new BinaryWebSocketFrame(frame.content().skipBytes(FRAME_LENGTH_SIZE))) + .then(); + } + + @Override + public Flux receive() { + return in.receive() + .map( + buf -> { + CompositeByteBuf composite = context.channel().alloc().compositeBuffer(); + ByteBuf length = wrappedBuffer(new byte[FRAME_LENGTH_SIZE]); + FrameHeaderFlyweight.encodeLength(length, 0, buf.readableBytes()); + composite.addComponents(true, length, buf.retain()); + return Frame.from(composite); + }); + } + + @Override + public Mono close() { + return Mono.fromRunnable(onClose::onComplete); + } + + @Override + public Mono onClose() { + return onClose; + } + + @Override + public double availability() { + return onClose.isTerminated() ? 0.0 : 1.0; + } }