Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#533 drafts receiveCloseStatus method #815

Merged
merged 1 commit into from Sep 1, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -33,10 +33,12 @@
import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
import io.netty.handler.codec.http.websocketx.WebSocketCloseStatus;
import io.netty.handler.codec.http.websocketx.WebSocketVersion;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.netty.FutureMono;
import reactor.netty.NettyOutbound;
import reactor.netty.NettyPipeline;
Expand All @@ -55,6 +57,7 @@ final class WebsocketClientOperations extends HttpClientOperations
implements WebsocketInbound, WebsocketOutbound {

final WebSocketClientHandshaker handshaker;
final MonoProcessor<WebSocketCloseStatus> onCloseState;

volatile int closeSent;

Expand All @@ -64,6 +67,7 @@ final class WebsocketClientOperations extends HttpClientOperations
HttpClientOperations replaced) {
super(replaced);
Channel channel = channel();
onCloseState = MonoProcessor.create();

handshaker = WebSocketClientHandshakerFactory.newHandshaker(currentURI,
WebSocketVersion.V13,
Expand Down Expand Up @@ -195,13 +199,20 @@ public Mono<Void> sendClose(int rsv, int statusCode, @javax.annotation.Nullable
return sendClose(new CloseWebSocketFrame(true, rsv, statusCode, reasonText));
}

@Override
@SuppressWarnings("unchecked")
public Mono<WebSocketCloseStatus> receiveCloseStatus() {
return onCloseState.or((Mono)onTerminate());
}

Mono<Void> sendClose(CloseWebSocketFrame frame) {
if (CLOSE_SENT.get(this) == 0) {
//commented for now as we assume the close is always scheduled (deferFuture runs)
//onTerminate().subscribe(null, null, () -> ReactorNetty.safeRelease(frame));
return FutureMono.deferFuture(() -> {
if (CLOSE_SENT.getAndSet(this, 1) == 0) {
discard();
onCloseState.onNext(new WebSocketCloseStatus(frame.statusCode(), frame.reasonText()));
return channel().writeAndFlush(frame)
.addListener(ChannelFutureListener.CLOSE);
}
Expand All @@ -219,8 +230,15 @@ void sendCloseNow(@Nullable CloseWebSocketFrame frame) {
return;
}
if (CLOSE_SENT.getAndSet(this, 1) == 0) {
channel().writeAndFlush(frame == null ? new CloseWebSocketFrame() : frame)
.addListener(ChannelFutureListener.CLOSE);
if (frame != null) {
onCloseState.onNext(new WebSocketCloseStatus(frame.statusCode(), frame.reasonText()));
channel().writeAndFlush(frame)
.addListener(ChannelFutureListener.CLOSE);
} else {
onCloseState.onNext(new WebSocketCloseStatus(-1, ""));
channel().writeAndFlush(new CloseWebSocketFrame())
.addListener(ChannelFutureListener.CLOSE);
}
}
else if (frame != null) {
frame.release();
Expand Down
Expand Up @@ -33,12 +33,14 @@
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketCloseStatus;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory;
import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketServerCompressionHandler;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.netty.FutureMono;
import reactor.netty.NettyOutbound;
import reactor.netty.NettyPipeline;
Expand All @@ -58,8 +60,9 @@
final class WebsocketServerOperations extends HttpServerOperations
implements WebsocketInbound, WebsocketOutbound {

final WebSocketServerHandshaker handshaker;
final ChannelPromise handshakerResult;
final WebSocketServerHandshaker handshaker;
final ChannelPromise handshakerResult;
final MonoProcessor<WebSocketCloseStatus> onCloseState;

volatile int closeSent;

Expand All @@ -70,6 +73,7 @@ final class WebsocketServerOperations extends HttpServerOperations
super(replaced);

Channel channel = replaced.channel();
onCloseState = MonoProcessor.create();

// Handshake
WebSocketServerHandshakerFactory wsFactory =
Expand Down Expand Up @@ -162,6 +166,14 @@ protected void onOutboundError(Throwable err) {
}
}

@Override
protected void onInboundCancel() {
if (log.isDebugEnabled()) {
log.debug(format(channel(), "Cancelling Websocket inbound. Closing Websocket"));
}
sendCloseNow(null, f -> terminate());
}

@Override
public Mono<Void> sendClose() {
return sendClose(new CloseWebSocketFrame());
Expand All @@ -182,13 +194,20 @@ public Mono<Void> sendClose(int rsv, int statusCode, @Nullable String reasonText
return sendClose(new CloseWebSocketFrame(true, rsv, statusCode, reasonText));
}

@Override
@SuppressWarnings("unchecked")
public Mono<WebSocketCloseStatus> receiveCloseStatus() {
return onCloseState.or((Mono)onTerminate());
}

Mono<Void> sendClose(CloseWebSocketFrame frame) {
if (CLOSE_SENT.get(this) == 0) {
//commented for now as we assume the close is always scheduled (deferFuture runs)
//onTerminate().subscribe(null, null, () -> ReactorNetty.safeRelease(frame));
return FutureMono.deferFuture(() -> {
if (CLOSE_SENT.getAndSet(this, 1) == 0) {
discard();
onCloseState.onNext(new WebSocketCloseStatus(frame.statusCode(), frame.reasonText()));
return channel().writeAndFlush(frame)
.addListener(ChannelFutureListener.CLOSE);
}
Expand All @@ -206,9 +225,15 @@ void sendCloseNow(@Nullable CloseWebSocketFrame frame, ChannelFutureListener lis
return;
}
if (CLOSE_SENT.getAndSet(this, 1) == 0) {
ChannelFuture f = channel().writeAndFlush(
frame == null ? new CloseWebSocketFrame() : frame);
f.addListener(listener);
if (frame != null) {
onCloseState.onNext(new WebSocketCloseStatus(frame.statusCode(), frame.reasonText()));
channel().writeAndFlush(frame)
.addListener(listener);
} else {
onCloseState.onNext(new WebSocketCloseStatus(-1, ""));
channel().writeAndFlush(new CloseWebSocketFrame())
.addListener(listener);
}
}
else if (frame != null) {
frame.release();
Expand Down
Expand Up @@ -17,9 +17,11 @@
package reactor.netty.http.websocket;

import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.websocketx.WebSocketCloseStatus;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrameAggregator;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.netty.NettyInbound;
import reactor.util.annotation.Nullable;

Expand Down Expand Up @@ -48,6 +50,12 @@ public interface WebsocketInbound extends NettyInbound {
*/
HttpHeaders headers();

/**
* Receive the close status code and reason if sent by the remote peer,
* or empty if the connection completes otherwise.
*/
Mono<WebSocketCloseStatus> receiveCloseStatus();

/**
* Turn this {@link WebsocketInbound} into aggregating mode which will only produce
* fully formed frame that have been received fragmented.
Expand Down