Skip to content

Commit

Permalink
#533 drafts receiveCloseStatus method
Browse files Browse the repository at this point in the history
Signed-off-by: Oleh Dokuka <shadowgun@i.ua>
  • Loading branch information
OlegDokuka committed Sep 1, 2019
1 parent 5ab4470 commit 0939f23
Show file tree
Hide file tree
Showing 4 changed files with 253 additions and 7 deletions.
Expand Up @@ -33,10 +33,12 @@
import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
import io.netty.handler.codec.http.websocketx.WebSocketCloseStatus;
import io.netty.handler.codec.http.websocketx.WebSocketVersion;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.netty.FutureMono;
import reactor.netty.NettyOutbound;
import reactor.netty.NettyPipeline;
Expand All @@ -55,6 +57,7 @@ final class WebsocketClientOperations extends HttpClientOperations
implements WebsocketInbound, WebsocketOutbound {

final WebSocketClientHandshaker handshaker;
final MonoProcessor<WebSocketCloseStatus> onCloseState;

volatile int closeSent;

Expand All @@ -64,6 +67,7 @@ final class WebsocketClientOperations extends HttpClientOperations
HttpClientOperations replaced) {
super(replaced);
Channel channel = channel();
onCloseState = MonoProcessor.create();

handshaker = WebSocketClientHandshakerFactory.newHandshaker(currentURI,
WebSocketVersion.V13,
Expand Down Expand Up @@ -195,13 +199,20 @@ public Mono<Void> sendClose(int rsv, int statusCode, @javax.annotation.Nullable
return sendClose(new CloseWebSocketFrame(true, rsv, statusCode, reasonText));
}

@Override
@SuppressWarnings("unchecked")
public Mono<WebSocketCloseStatus> receiveCloseStatus() {
return onCloseState.or((Mono)onTerminate());
}

Mono<Void> sendClose(CloseWebSocketFrame frame) {
if (CLOSE_SENT.get(this) == 0) {
//commented for now as we assume the close is always scheduled (deferFuture runs)
//onTerminate().subscribe(null, null, () -> ReactorNetty.safeRelease(frame));
return FutureMono.deferFuture(() -> {
if (CLOSE_SENT.getAndSet(this, 1) == 0) {
discard();
onCloseState.onNext(new WebSocketCloseStatus(frame.statusCode(), frame.reasonText()));
return channel().writeAndFlush(frame)
.addListener(ChannelFutureListener.CLOSE);
}
Expand All @@ -219,8 +230,15 @@ void sendCloseNow(@Nullable CloseWebSocketFrame frame) {
return;
}
if (CLOSE_SENT.getAndSet(this, 1) == 0) {
channel().writeAndFlush(frame == null ? new CloseWebSocketFrame() : frame)
.addListener(ChannelFutureListener.CLOSE);
if (frame != null) {
onCloseState.onNext(new WebSocketCloseStatus(frame.statusCode(), frame.reasonText()));
channel().writeAndFlush(frame)
.addListener(ChannelFutureListener.CLOSE);
} else {
onCloseState.onNext(new WebSocketCloseStatus(-1, ""));
channel().writeAndFlush(new CloseWebSocketFrame())
.addListener(ChannelFutureListener.CLOSE);
}
}
else if (frame != null) {
frame.release();
Expand Down
Expand Up @@ -33,12 +33,14 @@
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketCloseStatus;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory;
import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketServerCompressionHandler;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.netty.FutureMono;
import reactor.netty.NettyOutbound;
import reactor.netty.NettyPipeline;
Expand All @@ -58,8 +60,9 @@
final class WebsocketServerOperations extends HttpServerOperations
implements WebsocketInbound, WebsocketOutbound {

final WebSocketServerHandshaker handshaker;
final ChannelPromise handshakerResult;
final WebSocketServerHandshaker handshaker;
final ChannelPromise handshakerResult;
final MonoProcessor<WebSocketCloseStatus> onCloseState;

volatile int closeSent;

Expand All @@ -70,6 +73,7 @@ final class WebsocketServerOperations extends HttpServerOperations
super(replaced);

Channel channel = replaced.channel();
onCloseState = MonoProcessor.create();

// Handshake
WebSocketServerHandshakerFactory wsFactory =
Expand Down Expand Up @@ -182,13 +186,20 @@ public Mono<Void> sendClose(int rsv, int statusCode, @Nullable String reasonText
return sendClose(new CloseWebSocketFrame(true, rsv, statusCode, reasonText));
}

@Override
@SuppressWarnings("unchecked")
public Mono<WebSocketCloseStatus> receiveCloseStatus() {
return onCloseState.or((Mono)onTerminate());
}

Mono<Void> sendClose(CloseWebSocketFrame frame) {
if (CLOSE_SENT.get(this) == 0) {
//commented for now as we assume the close is always scheduled (deferFuture runs)
//onTerminate().subscribe(null, null, () -> ReactorNetty.safeRelease(frame));
return FutureMono.deferFuture(() -> {
if (CLOSE_SENT.getAndSet(this, 1) == 0) {
discard();
onCloseState.onNext(new WebSocketCloseStatus(frame.statusCode(), frame.reasonText()));
return channel().writeAndFlush(frame)
.addListener(ChannelFutureListener.CLOSE);
}
Expand All @@ -206,9 +217,15 @@ void sendCloseNow(@Nullable CloseWebSocketFrame frame, ChannelFutureListener lis
return;
}
if (CLOSE_SENT.getAndSet(this, 1) == 0) {
ChannelFuture f = channel().writeAndFlush(
frame == null ? new CloseWebSocketFrame() : frame);
f.addListener(listener);
if (frame != null) {
onCloseState.onNext(new WebSocketCloseStatus(frame.statusCode(), frame.reasonText()));
channel().writeAndFlush(frame)
.addListener(ChannelFutureListener.CLOSE);
} else {
onCloseState.onNext(new WebSocketCloseStatus(-1, ""));
channel().writeAndFlush(new CloseWebSocketFrame())
.addListener(ChannelFutureListener.CLOSE);
}
}
else if (frame != null) {
frame.release();
Expand Down
Expand Up @@ -17,9 +17,11 @@
package reactor.netty.http.websocket;

import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.websocketx.WebSocketCloseStatus;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrameAggregator;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.netty.NettyInbound;
import reactor.util.annotation.Nullable;

Expand Down Expand Up @@ -48,6 +50,12 @@ public interface WebsocketInbound extends NettyInbound {
*/
HttpHeaders headers();

/**
* Receive the close status code and reason if sent by the remote peer,
* or empty if the connection completes otherwise.
*/
Mono<WebSocketCloseStatus> receiveCloseStatus();

/**
* Turn this {@link WebsocketInbound} into aggregating mode which will only produce
* fully formed frame that have been received fragmented.
Expand Down
203 changes: 203 additions & 0 deletions src/test/java/reactor/netty/http/server/HttpServerTests.java
Expand Up @@ -56,6 +56,7 @@
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.websocketx.WebSocketCloseStatus;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
Expand All @@ -64,8 +65,11 @@
import org.junit.Ignore;
import org.junit.Test;
import org.reactivestreams.Publisher;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.publisher.UnicastProcessor;
import reactor.netty.ByteBufFlux;
import reactor.netty.ChannelBindException;
import reactor.netty.Connection;
Expand Down Expand Up @@ -1153,4 +1157,203 @@ public void testExpectErrorWhenConnectionClosed() throws Exception {
assertThat(error.get()).isInstanceOf(AbortedException.class);
server.dispose();
}

@Test
public void testNormalConnectionCloseForWebSocketClient() {
Flux<String> flux = Flux.range(0, 100)
.map(n -> String.format("%010d", n));
UnicastProcessor<String> receiver = UnicastProcessor.create();
MonoProcessor<WebSocketCloseStatus> statusServer = MonoProcessor.create();
MonoProcessor<WebSocketCloseStatus> statusClient = MonoProcessor.create();
List<String> test =
flux.collectList()
.block();
assertThat(test).isNotNull();

DisposableServer c = HttpServer.create()
.port(0)
.handle((req, resp) -> resp.sendWebsocket((in, out) -> out.send(
flux.map(s -> ByteBufAllocator.DEFAULT.buffer().writeBytes(s.getBytes()))
)
.then(out.sendClose(4404, "test"))
.then(in.receiveCloseStatus().subscribeWith(statusServer).then())
))
.wiretap(true)
.bindNow();

HttpClient.create()
.port(c.address()
.getPort())
.wiretap(true)
.websocket()
.uri("/")
.handle((in, out) -> {
MonoProcessor done = MonoProcessor.create();
in.receiveCloseStatus()
.subscribeWith(statusClient);
in.receive()
.map(bb -> {
byte[] content = new byte[bb.capacity()];
bb.readBytes(content);
return new String(content);
})
.doFinally((s) -> done.onComplete())
.subscribeWith(receiver);
return done.then(Mono.delay(Duration.ofMillis(500)));
})
.blockLast();

StepVerifier.create(receiver)
.expectNextSequence(test)
.expectComplete()
.verify(Duration.ofSeconds(30));

StepVerifier.create(statusClient)
.expectNext(new WebSocketCloseStatus(4404, "test"))
.expectComplete()
.verify(Duration.ofSeconds(30));


StepVerifier.create(statusServer)
.expectNext(new WebSocketCloseStatus(4404, "test"))
.expectComplete()
.verify(Duration.ofSeconds(30));

c.disposeNow();
}


@Test
public void testNormalConnectionCloseForWebSocketServer() {
MonoProcessor<WebSocketCloseStatus> statusServer = MonoProcessor.create();
MonoProcessor<WebSocketCloseStatus> statusClient = MonoProcessor.create();

DisposableServer c = HttpServer.create()
.port(0)
.handle((req, resp) ->
resp.sendWebsocket((in, out) -> in.receiveCloseStatus()
.subscribeWith(statusServer)
.then())
)
.wiretap(true)
.bindNow();

HttpClient.create()
.port(c.address()
.getPort())
.wiretap(true)
.websocket()
.uri("/")
.handle((in, out) -> out.sendClose(4404, "test")
.then(in.receiveCloseStatus()
.subscribeWith(statusClient)))
.blockLast();

StepVerifier.create(statusClient)
.expectNext(new WebSocketCloseStatus(4404, "test"))
.expectComplete()
.verify(Duration.ofSeconds(30));

StepVerifier.create(statusServer)
.expectNext(new WebSocketCloseStatus(4404, "test"))
.expectComplete()
.verify(Duration.ofSeconds(30));

c.disposeNow();
}

@Test
public void testCancelConnectionCloseForWebSocketClient() {
MonoProcessor<WebSocketCloseStatus> statusServer = MonoProcessor.create();
MonoProcessor<WebSocketCloseStatus> statusClient = MonoProcessor.create();

DisposableServer c = HttpServer.create()
.port(0)
.handle((req, resp) ->
resp.sendWebsocket((in, out) -> in.receiveCloseStatus()
.subscribeWith(statusServer)
.then())
)
.wiretap(true)
.bindNow();

HttpClient.create()
.port(c.address()
.getPort())
.wiretap(true)
.websocket()
.uri("/")
.handle((in, out) -> {
in.receiveCloseStatus()
.subscribeWith(statusClient);

((Connection) in).dispose();

return Mono.never();
})
.subscribe();


StepVerifier.create(statusClient)
.expectNext(new WebSocketCloseStatus(-1, ""))
.expectComplete()
.verify(Duration.ofSeconds(30));

StepVerifier.create(statusServer)
.expectNext(new WebSocketCloseStatus(-1, ""))
.expectComplete()
.verify(Duration.ofSeconds(30));

c.disposeNow();
}

@Test
public void testCancelConnectionCloseForWebSocketServer() {
MonoProcessor<WebSocketCloseStatus> statusServer = MonoProcessor.create();
MonoProcessor<WebSocketCloseStatus> statusClient = MonoProcessor.create();

DisposableServer c = HttpServer
.create()
.port(0)
.handle((req, resp) -> resp.sendWebsocket((in, out) -> {
in.receiveCloseStatus()
.subscribeWith(statusServer)
.then();

((Connection) in).dispose();

return Mono.never();
}))
.wiretap(true)
.bindNow();

HttpClient.create()
.port(c.address()
.getPort())
.wiretap(true)
.websocket()
.uri("/")
.handle((in, out) -> {
in.receiveCloseStatus()
.subscribeWith(statusClient);

return Mono.never();
})
.subscribe();


StepVerifier.create(statusClient)
// FIXME: asymmetric behavior compares to client disposal
// .expectNext(new WebSocketCloseStatus(-1, ""))
.expectComplete()
.verify(Duration.ofSeconds(30));

StepVerifier.create(statusServer)
// FIXME: asymmetric behavior compares to client disposal
// .expectNext(new WebSocketCloseStatus(-1, ""))
.expectComplete()
.verify(Duration.ofSeconds(30));

c.disposeNow();
}
}

0 comments on commit 0939f23

Please sign in to comment.