Skip to content

Commit

Permalink
#533 drafts receiveCloseStatus method
Browse files Browse the repository at this point in the history
Signed-off-by: Oleh Dokuka <shadowgun@i.ua>
  • Loading branch information
OlegDokuka committed Sep 1, 2019
1 parent 5ab4470 commit 51a897d
Show file tree
Hide file tree
Showing 4 changed files with 257 additions and 7 deletions.
Expand Up @@ -33,10 +33,12 @@
import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
import io.netty.handler.codec.http.websocketx.WebSocketCloseStatus;
import io.netty.handler.codec.http.websocketx.WebSocketVersion;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.netty.FutureMono;
import reactor.netty.NettyOutbound;
import reactor.netty.NettyPipeline;
Expand All @@ -55,6 +57,7 @@ final class WebsocketClientOperations extends HttpClientOperations
implements WebsocketInbound, WebsocketOutbound {

final WebSocketClientHandshaker handshaker;
final MonoProcessor<WebSocketCloseStatus> onCloseState;

volatile int closeSent;

Expand All @@ -64,6 +67,7 @@ final class WebsocketClientOperations extends HttpClientOperations
HttpClientOperations replaced) {
super(replaced);
Channel channel = channel();
onCloseState = MonoProcessor.create();

handshaker = WebSocketClientHandshakerFactory.newHandshaker(currentURI,
WebSocketVersion.V13,
Expand Down Expand Up @@ -195,13 +199,20 @@ public Mono<Void> sendClose(int rsv, int statusCode, @javax.annotation.Nullable
return sendClose(new CloseWebSocketFrame(true, rsv, statusCode, reasonText));
}

@Override
@SuppressWarnings("unchecked")
public Mono<WebSocketCloseStatus> receiveCloseStatus() {
return onCloseState.or((Mono)onTerminate());
}

Mono<Void> sendClose(CloseWebSocketFrame frame) {
if (CLOSE_SENT.get(this) == 0) {
//commented for now as we assume the close is always scheduled (deferFuture runs)
//onTerminate().subscribe(null, null, () -> ReactorNetty.safeRelease(frame));
return FutureMono.deferFuture(() -> {
if (CLOSE_SENT.getAndSet(this, 1) == 0) {
discard();
onCloseState.onNext(new WebSocketCloseStatus(frame.statusCode(), frame.reasonText()));
return channel().writeAndFlush(frame)
.addListener(ChannelFutureListener.CLOSE);
}
Expand All @@ -219,8 +230,15 @@ void sendCloseNow(@Nullable CloseWebSocketFrame frame) {
return;
}
if (CLOSE_SENT.getAndSet(this, 1) == 0) {
channel().writeAndFlush(frame == null ? new CloseWebSocketFrame() : frame)
.addListener(ChannelFutureListener.CLOSE);
if (frame != null) {
onCloseState.onNext(new WebSocketCloseStatus(frame.statusCode(), frame.reasonText()));
channel().writeAndFlush(frame)
.addListener(ChannelFutureListener.CLOSE);
} else {
onCloseState.onNext(new WebSocketCloseStatus(-1, ""));
channel().writeAndFlush(new CloseWebSocketFrame())
.addListener(ChannelFutureListener.CLOSE);
}
}
else if (frame != null) {
frame.release();
Expand Down
Expand Up @@ -33,12 +33,14 @@
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketCloseStatus;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory;
import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketServerCompressionHandler;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.netty.FutureMono;
import reactor.netty.NettyOutbound;
import reactor.netty.NettyPipeline;
Expand All @@ -58,8 +60,9 @@
final class WebsocketServerOperations extends HttpServerOperations
implements WebsocketInbound, WebsocketOutbound {

final WebSocketServerHandshaker handshaker;
final ChannelPromise handshakerResult;
final WebSocketServerHandshaker handshaker;
final ChannelPromise handshakerResult;
final MonoProcessor<WebSocketCloseStatus> onCloseState;

volatile int closeSent;

Expand All @@ -70,6 +73,7 @@ final class WebsocketServerOperations extends HttpServerOperations
super(replaced);

Channel channel = replaced.channel();
onCloseState = MonoProcessor.create();

// Handshake
WebSocketServerHandshakerFactory wsFactory =
Expand Down Expand Up @@ -162,6 +166,14 @@ protected void onOutboundError(Throwable err) {
}
}

@Override
protected void onInboundCancel() {
if (log.isDebugEnabled()) {
log.debug(format(channel(), "Cancelling Websocket inbound. Closing Websocket"));
}
sendCloseNow(null, f -> terminate());
}

@Override
public Mono<Void> sendClose() {
return sendClose(new CloseWebSocketFrame());
Expand All @@ -182,13 +194,20 @@ public Mono<Void> sendClose(int rsv, int statusCode, @Nullable String reasonText
return sendClose(new CloseWebSocketFrame(true, rsv, statusCode, reasonText));
}

@Override
@SuppressWarnings("unchecked")
public Mono<WebSocketCloseStatus> receiveCloseStatus() {
return onCloseState.or((Mono)onTerminate());
}

Mono<Void> sendClose(CloseWebSocketFrame frame) {
if (CLOSE_SENT.get(this) == 0) {
//commented for now as we assume the close is always scheduled (deferFuture runs)
//onTerminate().subscribe(null, null, () -> ReactorNetty.safeRelease(frame));
return FutureMono.deferFuture(() -> {
if (CLOSE_SENT.getAndSet(this, 1) == 0) {
discard();
onCloseState.onNext(new WebSocketCloseStatus(frame.statusCode(), frame.reasonText()));
return channel().writeAndFlush(frame)
.addListener(ChannelFutureListener.CLOSE);
}
Expand All @@ -206,9 +225,15 @@ void sendCloseNow(@Nullable CloseWebSocketFrame frame, ChannelFutureListener lis
return;
}
if (CLOSE_SENT.getAndSet(this, 1) == 0) {
ChannelFuture f = channel().writeAndFlush(
frame == null ? new CloseWebSocketFrame() : frame);
f.addListener(listener);
if (frame != null) {
onCloseState.onNext(new WebSocketCloseStatus(frame.statusCode(), frame.reasonText()));
channel().writeAndFlush(frame)
.addListener(listener);
} else {
onCloseState.onNext(new WebSocketCloseStatus(-1, ""));
channel().writeAndFlush(new CloseWebSocketFrame())
.addListener(listener);
}
}
else if (frame != null) {
frame.release();
Expand Down
Expand Up @@ -17,9 +17,11 @@
package reactor.netty.http.websocket;

import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.websocketx.WebSocketCloseStatus;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrameAggregator;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.netty.NettyInbound;
import reactor.util.annotation.Nullable;

Expand Down Expand Up @@ -48,6 +50,12 @@ public interface WebsocketInbound extends NettyInbound {
*/
HttpHeaders headers();

/**
* Receive the close status code and reason if sent by the remote peer,
* or empty if the connection completes otherwise.
*/
Mono<WebSocketCloseStatus> receiveCloseStatus();

/**
* Turn this {@link WebsocketInbound} into aggregating mode which will only produce
* fully formed frame that have been received fragmented.
Expand Down

0 comments on commit 51a897d

Please sign in to comment.