From 871cc4c67e40078c6ec3e088252b3f15b1428d4c Mon Sep 17 00:00:00 2001 From: Violeta Georgieva Date: Sun, 1 Sep 2019 23:22:18 +0300 Subject: [PATCH] Add more tests for canceling the incoming data for websocket client/server --- .../netty/http/server/HttpServerTests.java | 150 ++++++++++++++---- 1 file changed, 123 insertions(+), 27 deletions(-) diff --git a/src/test/java/reactor/netty/http/server/HttpServerTests.java b/src/test/java/reactor/netty/http/server/HttpServerTests.java index 5a617a04af..caf978c29c 100644 --- a/src/test/java/reactor/netty/http/server/HttpServerTests.java +++ b/src/test/java/reactor/netty/http/server/HttpServerTests.java @@ -65,7 +65,6 @@ import org.junit.Ignore; import org.junit.Test; import org.reactivestreams.Publisher; -import reactor.core.Disposable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.publisher.MonoProcessor; @@ -1166,22 +1165,21 @@ public void testNormalConnectionCloseForWebSocketClient() { MonoProcessor statusServer = MonoProcessor.create(); MonoProcessor statusClient = MonoProcessor.create(); List test = - flux.collectList() - .block(); + flux.collectList() + .block(); assertThat(test).isNotNull(); - DisposableServer c = HttpServer - .create() - .port(0) - .handle((req, resp) -> resp.sendWebsocket((in, out) -> - out.sendString(flux) - .then(out.sendClose(4404, "test")) - .then(in.receiveCloseStatus() - .subscribeWith(statusServer) - .then()) - )) - .wiretap(true) - .bindNow(); + DisposableServer c = HttpServer.create() + .port(0) + .handle((req, resp) -> resp.sendWebsocket((in, out) -> + out.sendString(flux) + .then(out.sendClose(4404, "test")) + .then(in.receiveCloseStatus() + .subscribeWith(statusServer) + .then()) + )) + .wiretap(true) + .bindNow(); HttpClient.create() .port(c.address() @@ -1305,25 +1303,74 @@ public void testCancelConnectionCloseForWebSocketClient() { c.disposeNow(); } + @Test + public void testCancelReceivingForWebSocketClient() { + MonoProcessor statusServer = MonoProcessor.create(); + MonoProcessor statusClient = MonoProcessor.create(); + + DisposableServer c = HttpServer.create() + .port(0) + .handle((req, resp) -> + resp.sendWebsocket((in, out) -> { + in.receiveCloseStatus() + .subscribeWith(statusServer); + + return out.sendString(Flux.interval(Duration.ofMillis(10)) + .map(l -> l + "")); + }) + ) + .wiretap(true) + .bindNow(); + + HttpClient.create() + .port(c.address() + .getPort()) + .wiretap(true) + .websocket() + .uri("/") + .handle((in, out) -> { + in.receiveCloseStatus() + .subscribeWith(statusClient); + + in.receive() + .take(1) + .subscribe(); + + return Mono.never(); + }) + .subscribe(); + + + StepVerifier.create(statusClient) + .expectNext(new WebSocketCloseStatus(-1, "")) + .expectComplete() + .verify(Duration.ofSeconds(30)); + + StepVerifier.create(statusServer) + .expectNext(new WebSocketCloseStatus(-1, "")) + .expectComplete() + .verify(Duration.ofSeconds(30)); + + c.disposeNow(); + } + @Test public void testCancelConnectionCloseForWebSocketServer() { MonoProcessor statusServer = MonoProcessor.create(); MonoProcessor statusClient = MonoProcessor.create(); - DisposableServer c = HttpServer - .create() - .port(0) - .handle((req, resp) -> resp.sendWebsocket((in, out) -> { - in.receiveCloseStatus() - .subscribeWith(statusServer) - .then(); + DisposableServer c = HttpServer.create() + .port(0) + .handle((req, resp) -> resp.sendWebsocket((in, out) -> { + in.receiveCloseStatus() + .subscribeWith(statusServer); - in.withConnection(Connection::dispose); + in.withConnection(Connection::dispose); - return Mono.never(); - })) - .wiretap(true) - .bindNow(); + return Mono.never(); + })) + .wiretap(true) + .bindNow(); HttpClient.create() .port(c.address() @@ -1340,6 +1387,55 @@ public void testCancelConnectionCloseForWebSocketServer() { .subscribe(); + StepVerifier.create(statusClient) + .expectNext(new WebSocketCloseStatus(-1, "")) + .expectComplete() + .verify(Duration.ofSeconds(30)); + + StepVerifier.create(statusServer) + .expectNext(new WebSocketCloseStatus(-1, "")) + .expectComplete() + .verify(Duration.ofSeconds(30)); + + c.disposeNow(); + } + + @Test + public void testCancelReceivingForWebSocketServer() { + MonoProcessor statusServer = MonoProcessor.create(); + MonoProcessor statusClient = MonoProcessor.create(); + + DisposableServer c = HttpServer.create() + .port(0) + .handle((req, resp) -> resp.sendWebsocket((in, out) -> { + in.receiveCloseStatus() + .subscribeWith(statusServer); + + in.receive() + .take(1) + .subscribe(); + + return Mono.never(); + })) + .wiretap(true) + .bindNow(); + + HttpClient.create() + .port(c.address() + .getPort()) + .wiretap(true) + .websocket() + .uri("/") + .handle((in, out) -> { + in.receiveCloseStatus() + .subscribeWith(statusClient); + + return out.sendString(Flux.interval(Duration.ofMillis(10)) + .map(l -> l + "")); + }) + .subscribe(); + + StepVerifier.create(statusClient) .expectNext(new WebSocketCloseStatus(-1, "")) .expectComplete()