Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@
* limitations under the License.
*/
package io.rsocket.transport.netty;

import io.rsocket.DuplexConnection;
import io.rsocket.Frame;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.ipc.netty.NettyContext;
import reactor.ipc.netty.NettyInbound;
import reactor.ipc.netty.NettyOutbound;
Expand All @@ -28,44 +28,51 @@ public class NettyDuplexConnection implements DuplexConnection {
private final NettyInbound in;
private final NettyOutbound out;
private final NettyContext context;

private final MonoProcessor<Void> onClose;

public NettyDuplexConnection(NettyInbound in, NettyOutbound out, NettyContext context) {
this.in = in;
this.out = out;
this.context = context;
this.onClose = MonoProcessor.create();

context.onClose(onClose::onComplete);
this.onClose
.doFinally(
s -> {
this.context.dispose();
this.context.channel().close();
})
.subscribe();
}

@Override
public Mono<Void> send(Publisher<Frame> frames) {
return Flux.from(frames).concatMap(this::sendOne).then();
}

@Override
public Mono<Void> sendOne(Frame frame) {
return out.sendObject(frame.content()).then();
}

@Override
public Flux<Frame> receive() {
return in.receive().map(buf -> Frame.from(buf.retain()));
}

@Override
public Mono<Void> close() {
return Mono.fromRunnable(
() -> {
context.dispose();
context.channel().close();
});
return Mono.fromRunnable(onClose::onComplete);
}

@Override
public Mono<Void> onClose() {
return context.onClose();
return onClose;
}

@Override
public double availability() {
return context.isDisposed() ? 0.0 : 1.0;
return onClose.isTerminated() ? 0.0 : 1.0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
* limitations under the License.
*/
package io.rsocket.transport.netty;

import static io.netty.buffer.Unpooled.wrappedBuffer;
import static io.rsocket.frame.FrameHeaderFlyweight.FRAME_LENGTH_SIZE;

Expand All @@ -27,6 +26,7 @@
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.ipc.netty.NettyContext;
import reactor.ipc.netty.NettyInbound;
import reactor.ipc.netty.NettyOutbound;
Expand All @@ -39,57 +39,63 @@
* back on for frames received.
*/
public class WebsocketDuplexConnection implements DuplexConnection {
private final NettyInbound in;
private final NettyOutbound out;
private final NettyContext context;

public WebsocketDuplexConnection(NettyInbound in, NettyOutbound out, NettyContext context) {
this.in = in;
this.out = out;
this.context = context;
}

@Override
public Mono<Void> send(Publisher<Frame> frames) {
return Flux.from(frames).concatMap(this::sendOne).then();
}

@Override
public Mono<Void> sendOne(Frame frame) {
return out.sendObject(new BinaryWebSocketFrame(frame.content().skipBytes(FRAME_LENGTH_SIZE)))
.then();
}

@Override
public Flux<Frame> receive() {
return in.receive()
.map(
buf -> {
CompositeByteBuf composite = context.channel().alloc().compositeBuffer();
ByteBuf length = wrappedBuffer(new byte[FRAME_LENGTH_SIZE]);
FrameHeaderFlyweight.encodeLength(length, 0, buf.readableBytes());
composite.addComponents(true, length, buf.retain());
return Frame.from(composite);
});
}

@Override
public Mono<Void> close() {
return Mono.fromRunnable(
() -> {
if (!context.isDisposed()) {
context.channel().close();
}
});
}

@Override
public Mono<Void> onClose() {
return context.onClose();
}

@Override
public double availability() {
return context.isDisposed() ? 0.0 : 1.0;
}
private final NettyInbound in;
private final NettyOutbound out;
private final NettyContext context;
private final MonoProcessor<Void> onClose;

public WebsocketDuplexConnection(NettyInbound in, NettyOutbound out, NettyContext context) {
this.in = in;
this.out = out;
this.context = context;
this.onClose = MonoProcessor.create();

context.onClose(onClose::onComplete);
this.onClose
.doFinally(
s -> {
this.context.dispose();
this.context.channel().close();
})
.subscribe();
}

@Override
public Mono<Void> send(Publisher<Frame> frames) {
return Flux.from(frames).concatMap(this::sendOne).then();
}

@Override
public Mono<Void> sendOne(Frame frame) {
return out.sendObject(new BinaryWebSocketFrame(frame.content().skipBytes(FRAME_LENGTH_SIZE)))
.then();
}

@Override
public Flux<Frame> receive() {
return in.receive()
.map(
buf -> {
CompositeByteBuf composite = context.channel().alloc().compositeBuffer();
ByteBuf length = wrappedBuffer(new byte[FRAME_LENGTH_SIZE]);
FrameHeaderFlyweight.encodeLength(length, 0, buf.readableBytes());
composite.addComponents(true, length, buf.retain());
return Frame.from(composite);
});
}

@Override
public Mono<Void> close() {
return Mono.fromRunnable(onClose::onComplete);
}

@Override
public Mono<Void> onClose() {
return onClose;
}

@Override
public double availability() {
return onClose.isTerminated() ? 0.0 : 1.0;
}
}