Skip to content

Commit

Permalink
Add more tests for canceling the incoming data for websocket client/s…
Browse files Browse the repository at this point in the history
…erver
  • Loading branch information
violetagg committed Sep 1, 2019
1 parent 78950ac commit 871cc4c
Showing 1 changed file with 123 additions and 27 deletions.
150 changes: 123 additions & 27 deletions src/test/java/reactor/netty/http/server/HttpServerTests.java
Expand Up @@ -65,7 +65,6 @@
import org.junit.Ignore;
import org.junit.Test;
import org.reactivestreams.Publisher;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
Expand Down Expand Up @@ -1166,22 +1165,21 @@ public void testNormalConnectionCloseForWebSocketClient() {
MonoProcessor<WebSocketCloseStatus> statusServer = MonoProcessor.create();
MonoProcessor<WebSocketCloseStatus> statusClient = MonoProcessor.create();
List<String> test =
flux.collectList()
.block();
flux.collectList()
.block();
assertThat(test).isNotNull();

DisposableServer c = HttpServer
.create()
.port(0)
.handle((req, resp) -> resp.sendWebsocket((in, out) ->
out.sendString(flux)
.then(out.sendClose(4404, "test"))
.then(in.receiveCloseStatus()
.subscribeWith(statusServer)
.then())
))
.wiretap(true)
.bindNow();
DisposableServer c = HttpServer.create()
.port(0)
.handle((req, resp) -> resp.sendWebsocket((in, out) ->
out.sendString(flux)
.then(out.sendClose(4404, "test"))
.then(in.receiveCloseStatus()
.subscribeWith(statusServer)
.then())
))
.wiretap(true)
.bindNow();

HttpClient.create()
.port(c.address()
Expand Down Expand Up @@ -1305,25 +1303,74 @@ public void testCancelConnectionCloseForWebSocketClient() {
c.disposeNow();
}

@Test
public void testCancelReceivingForWebSocketClient() {
MonoProcessor<WebSocketCloseStatus> statusServer = MonoProcessor.create();
MonoProcessor<WebSocketCloseStatus> statusClient = MonoProcessor.create();

DisposableServer c = HttpServer.create()
.port(0)
.handle((req, resp) ->
resp.sendWebsocket((in, out) -> {
in.receiveCloseStatus()
.subscribeWith(statusServer);

return out.sendString(Flux.interval(Duration.ofMillis(10))
.map(l -> l + ""));
})
)
.wiretap(true)
.bindNow();

HttpClient.create()
.port(c.address()
.getPort())
.wiretap(true)
.websocket()
.uri("/")
.handle((in, out) -> {
in.receiveCloseStatus()
.subscribeWith(statusClient);

in.receive()
.take(1)
.subscribe();

return Mono.never();
})
.subscribe();


StepVerifier.create(statusClient)
.expectNext(new WebSocketCloseStatus(-1, ""))
.expectComplete()
.verify(Duration.ofSeconds(30));

StepVerifier.create(statusServer)
.expectNext(new WebSocketCloseStatus(-1, ""))
.expectComplete()
.verify(Duration.ofSeconds(30));

c.disposeNow();
}

@Test
public void testCancelConnectionCloseForWebSocketServer() {
MonoProcessor<WebSocketCloseStatus> statusServer = MonoProcessor.create();
MonoProcessor<WebSocketCloseStatus> statusClient = MonoProcessor.create();

DisposableServer c = HttpServer
.create()
.port(0)
.handle((req, resp) -> resp.sendWebsocket((in, out) -> {
in.receiveCloseStatus()
.subscribeWith(statusServer)
.then();
DisposableServer c = HttpServer.create()
.port(0)
.handle((req, resp) -> resp.sendWebsocket((in, out) -> {
in.receiveCloseStatus()
.subscribeWith(statusServer);

in.withConnection(Connection::dispose);
in.withConnection(Connection::dispose);

return Mono.never();
}))
.wiretap(true)
.bindNow();
return Mono.never();
}))
.wiretap(true)
.bindNow();

HttpClient.create()
.port(c.address()
Expand All @@ -1340,6 +1387,55 @@ public void testCancelConnectionCloseForWebSocketServer() {
.subscribe();


StepVerifier.create(statusClient)
.expectNext(new WebSocketCloseStatus(-1, ""))
.expectComplete()
.verify(Duration.ofSeconds(30));

StepVerifier.create(statusServer)
.expectNext(new WebSocketCloseStatus(-1, ""))
.expectComplete()
.verify(Duration.ofSeconds(30));

c.disposeNow();
}

@Test
public void testCancelReceivingForWebSocketServer() {
MonoProcessor<WebSocketCloseStatus> statusServer = MonoProcessor.create();
MonoProcessor<WebSocketCloseStatus> statusClient = MonoProcessor.create();

DisposableServer c = HttpServer.create()
.port(0)
.handle((req, resp) -> resp.sendWebsocket((in, out) -> {
in.receiveCloseStatus()
.subscribeWith(statusServer);

in.receive()
.take(1)
.subscribe();

return Mono.never();
}))
.wiretap(true)
.bindNow();

HttpClient.create()
.port(c.address()
.getPort())
.wiretap(true)
.websocket()
.uri("/")
.handle((in, out) -> {
in.receiveCloseStatus()
.subscribeWith(statusClient);

return out.sendString(Flux.interval(Duration.ofMillis(10))
.map(l -> l + ""));
})
.subscribe();


StepVerifier.create(statusClient)
.expectNext(new WebSocketCloseStatus(-1, ""))
.expectComplete()
Expand Down

0 comments on commit 871cc4c

Please sign in to comment.