diff --git a/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/TcpFragmentationTransportTest.java b/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/TcpFragmentationTransportTest.java index 06e75190d..dbd036a50 100644 --- a/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/TcpFragmentationTransportTest.java +++ b/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/TcpFragmentationTransportTest.java @@ -29,6 +29,9 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import reactor.core.Disposable; +import reactor.core.Disposables; import reactor.core.publisher.Flux; import reactor.netty.tcp.TcpClient; import reactor.netty.tcp.TcpServer; @@ -41,6 +44,13 @@ static void cleanup() throws ExecutionException, InterruptedException, TimeoutEx executor.shutdownGracefully().get(30, TimeUnit.SECONDS); } + Disposable channelChecker = Disposables.disposed(); + + @AfterEach + void cleanupCheck() { + channelChecker.dispose(); + } + private final TransportPair transportPair = new TransportPair<>( () -> InetSocketAddress.createUnresolved("localhost", 0), @@ -51,17 +61,14 @@ static void cleanup() throws ExecutionException, InterruptedException, TimeoutEx .option(ChannelOption.ALLOCATOR, allocator)), (address, allocator) -> { DefaultChannelGroup channels = new DefaultChannelGroup(executor); + channelChecker = + Flux.interval(Duration.ofSeconds(1)) + .doOnNext(__ -> channels.forEach(c -> logger.info("Active Channel " + c))) + .subscribe(); return TcpServerTransport.create( TcpServer.create() .bindAddress(() -> address) .channelGroup(channels) - .doOnBound( - ds -> - Flux.interval(Duration.ofSeconds(1)) - .doOnNext( - __ -> channels.forEach(c -> logger.info("Active Channel " + c))) - .takeUntilOther(ds.onDispose().log("-disposing-")) - .subscribe()) .doOnUnbound( (ds) -> { try { diff --git a/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/TcpResumableTransportTest.java b/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/TcpResumableTransportTest.java index 36286b2fc..10dc18301 100644 --- a/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/TcpResumableTransportTest.java +++ b/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/TcpResumableTransportTest.java @@ -29,6 +29,9 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import reactor.core.Disposable; +import reactor.core.Disposables; import reactor.core.publisher.Flux; import reactor.netty.tcp.TcpClient; import reactor.netty.tcp.TcpServer; @@ -41,6 +44,13 @@ static void cleanup() throws ExecutionException, InterruptedException, TimeoutEx executor.shutdownGracefully().get(30, TimeUnit.SECONDS); } + Disposable channelChecker = Disposables.disposed(); + + @AfterEach + void cleanupCheck() { + channelChecker.dispose(); + } + private final TransportPair transportPair = new TransportPair<>( () -> InetSocketAddress.createUnresolved("localhost", 0), @@ -51,18 +61,15 @@ static void cleanup() throws ExecutionException, InterruptedException, TimeoutEx .option(ChannelOption.ALLOCATOR, allocator)), (address, allocator) -> { DefaultChannelGroup channels = new DefaultChannelGroup(executor); + channelChecker = + Flux.interval(Duration.ofSeconds(1)) + .doOnNext(__ -> channels.forEach(c -> logger.info("Active Channel " + c))) + .subscribe(); return TcpServerTransport.create( TcpServer.create() .bindAddress(() -> address) .option(ChannelOption.ALLOCATOR, allocator) .channelGroup(channels) - .doOnBound( - ds -> - Flux.interval(Duration.ofSeconds(1)) - .doOnNext( - __ -> channels.forEach(c -> logger.info("Active Channel " + c))) - .takeUntilOther(ds.onDispose().log("-disposing-")) - .subscribe()) .doOnUnbound( (ds) -> { try { diff --git a/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/TcpResumableWithFragmentationTransportTest.java b/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/TcpResumableWithFragmentationTransportTest.java index 7d9766d22..95da898b9 100644 --- a/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/TcpResumableWithFragmentationTransportTest.java +++ b/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/TcpResumableWithFragmentationTransportTest.java @@ -29,6 +29,9 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import reactor.core.Disposable; +import reactor.core.Disposables; import reactor.core.publisher.Flux; import reactor.netty.tcp.TcpClient; import reactor.netty.tcp.TcpServer; @@ -41,6 +44,13 @@ static void cleanup() throws ExecutionException, InterruptedException, TimeoutEx executor.shutdownGracefully().get(30, TimeUnit.SECONDS); } + Disposable channelChecker = Disposables.disposed(); + + @AfterEach + void cleanupCheck() { + channelChecker.dispose(); + } + private final TransportPair transportPair = new TransportPair<>( () -> InetSocketAddress.createUnresolved("localhost", 0), @@ -51,18 +61,15 @@ static void cleanup() throws ExecutionException, InterruptedException, TimeoutEx .option(ChannelOption.ALLOCATOR, allocator)), (address, allocator) -> { DefaultChannelGroup channels = new DefaultChannelGroup(executor); + channelChecker = + Flux.interval(Duration.ofSeconds(1)) + .doOnNext(__ -> channels.forEach(c -> logger.info("Active Channel " + c))) + .subscribe(); return TcpServerTransport.create( TcpServer.create() .bindAddress(() -> address) .option(ChannelOption.ALLOCATOR, allocator) .channelGroup(channels) - .doOnBound( - ds -> - Flux.interval(Duration.ofSeconds(1)) - .doOnNext( - __ -> channels.forEach(c -> logger.info("Active Channel " + c))) - .takeUntilOther(ds.onDispose().log("-disposing-")) - .subscribe()) .doOnUnbound( (ds) -> { try { diff --git a/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/TcpSecureTransportTest.java b/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/TcpSecureTransportTest.java index e0793cfe0..20e433c22 100644 --- a/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/TcpSecureTransportTest.java +++ b/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/TcpSecureTransportTest.java @@ -17,6 +17,9 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import reactor.core.Disposable; +import reactor.core.Disposables; import reactor.core.Exceptions; import reactor.core.publisher.Flux; import reactor.netty.tcp.TcpClient; @@ -30,6 +33,13 @@ static void cleanup() throws ExecutionException, InterruptedException, TimeoutEx executor.shutdownGracefully().get(30, TimeUnit.SECONDS); } + Disposable channelChecker = Disposables.disposed(); + + @AfterEach + void cleanupCheck() { + channelChecker.dispose(); + } + private final TransportPair transportPair = new TransportPair<>( () -> new InetSocketAddress("localhost", 0), @@ -46,18 +56,14 @@ static void cleanup() throws ExecutionException, InterruptedException, TimeoutEx (address, allocator) -> { try { DefaultChannelGroup channels = new DefaultChannelGroup(executor); + channelChecker = + Flux.interval(Duration.ofSeconds(1)) + .doOnNext(__ -> channels.forEach(c -> logger.info("Active Channel " + c))) + .subscribe(); SelfSignedCertificate ssc = new SelfSignedCertificate(); TcpServer server = TcpServer.create() .channelGroup(channels) - .doOnBound( - ds -> - Flux.interval(Duration.ofSeconds(1)) - .doOnNext( - __ -> - channels.forEach(c -> logger.info("Active Channel " + c))) - .takeUntilOther(ds.onDispose().log("-disposing-")) - .subscribe()) .doOnUnbound( (ds) -> { try { diff --git a/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/TcpTransportTest.java b/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/TcpTransportTest.java index 7414f171a..7456e92c8 100644 --- a/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/TcpTransportTest.java +++ b/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/TcpTransportTest.java @@ -29,6 +29,9 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import reactor.core.Disposable; +import reactor.core.Disposables; import reactor.core.publisher.Flux; import reactor.netty.tcp.TcpClient; import reactor.netty.tcp.TcpServer; @@ -42,6 +45,13 @@ static void cleanup() throws ExecutionException, InterruptedException, TimeoutEx executor.shutdownGracefully().get(30, TimeUnit.SECONDS); } + Disposable channelChecker = Disposables.disposed(); + + @AfterEach + void cleanupCheck() { + channelChecker.dispose(); + } + private final TransportPair transportPair = new TransportPair<>( () -> InetSocketAddress.createUnresolved("localhost", 0), @@ -52,18 +62,15 @@ static void cleanup() throws ExecutionException, InterruptedException, TimeoutEx .option(ChannelOption.ALLOCATOR, allocator)), (address, allocator) -> { DefaultChannelGroup channels = new DefaultChannelGroup(executor); + channelChecker = + Flux.interval(Duration.ofSeconds(1)) + .doOnNext(__ -> channels.forEach(c -> logger.info("Active Channel " + c))) + .subscribe(); return TcpServerTransport.create( TcpServer.create() .bindAddress(() -> address) .option(ChannelOption.ALLOCATOR, allocator) .channelGroup(channels) - .doOnBound( - ds -> - Flux.interval(Duration.ofSeconds(1)) - .doOnNext( - __ -> channels.forEach(c -> logger.info("Active Channel " + c))) - .takeUntilOther(ds.onDispose()) - .subscribe()) .doOnUnbound( (ds) -> { try { diff --git a/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/WebsocketResumableTransportTest.java b/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/WebsocketResumableTransportTest.java index d27b68747..3d6322eb0 100644 --- a/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/WebsocketResumableTransportTest.java +++ b/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/WebsocketResumableTransportTest.java @@ -29,6 +29,9 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import reactor.core.Disposable; +import reactor.core.Disposables; import reactor.core.publisher.Flux; import reactor.netty.http.client.HttpClient; import reactor.netty.http.server.HttpServer; @@ -41,6 +44,13 @@ static void cleanup() throws ExecutionException, InterruptedException, TimeoutEx executor.shutdownGracefully().get(30, TimeUnit.SECONDS); } + Disposable channelChecker = Disposables.disposed(); + + @AfterEach + void cleanupCheck() { + channelChecker.dispose(); + } + private final TransportPair transportPair = new TransportPair<>( () -> InetSocketAddress.createUnresolved("localhost", 0), @@ -53,18 +63,15 @@ static void cleanup() throws ExecutionException, InterruptedException, TimeoutEx ""), (address, allocator) -> { DefaultChannelGroup channels = new DefaultChannelGroup(executor); + channelChecker = + Flux.interval(Duration.ofSeconds(1)) + .doOnNext(__ -> channels.forEach(c -> logger.info("Active Channel " + c))) + .subscribe(); return WebsocketServerTransport.create( HttpServer.create() .host(address.getHostName()) .port(address.getPort()) .channelGroup(channels) - .doOnBound( - ds -> - Flux.interval(Duration.ofSeconds(1)) - .doOnNext( - __ -> channels.forEach(c -> logger.info("Active Channel " + c))) - .takeUntilOther(ds.onDispose().log("-disposing-")) - .subscribe()) .doOnUnbound( (ds) -> { try { diff --git a/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/WebsocketResumableWithFragmentationTransportTest.java b/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/WebsocketResumableWithFragmentationTransportTest.java index b89b575fa..e2b4f7549 100644 --- a/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/WebsocketResumableWithFragmentationTransportTest.java +++ b/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/WebsocketResumableWithFragmentationTransportTest.java @@ -29,6 +29,9 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import reactor.core.Disposable; +import reactor.core.Disposables; import reactor.core.publisher.Flux; import reactor.netty.http.client.HttpClient; import reactor.netty.http.server.HttpServer; @@ -42,6 +45,13 @@ static void cleanup() throws ExecutionException, InterruptedException, TimeoutEx executor.shutdownGracefully().get(30, TimeUnit.SECONDS); } + Disposable channelChecker = Disposables.disposed(); + + @AfterEach + void cleanupCheck() { + channelChecker.dispose(); + } + private final TransportPair transportPair = new TransportPair<>( () -> InetSocketAddress.createUnresolved("localhost", 0), @@ -54,18 +64,15 @@ static void cleanup() throws ExecutionException, InterruptedException, TimeoutEx ""), (address, allocator) -> { DefaultChannelGroup channels = new DefaultChannelGroup(executor); + channelChecker = + Flux.interval(Duration.ofSeconds(1)) + .doOnNext(__ -> channels.forEach(c -> logger.info("Active Channel " + c))) + .subscribe(); return WebsocketServerTransport.create( HttpServer.create() .host(address.getHostName()) .port(address.getPort()) .channelGroup(channels) - .doOnBound( - ds -> - Flux.interval(Duration.ofSeconds(1)) - .doOnNext( - __ -> channels.forEach(c -> logger.info("Active Channel " + c))) - .takeUntilOther(ds.onDispose().log("-disposing-")) - .subscribe()) .doOnUnbound( (ds) -> { try { diff --git a/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/WebsocketSecureTransportTest.java b/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/WebsocketSecureTransportTest.java index 1d3049dff..c9a71429f 100644 --- a/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/WebsocketSecureTransportTest.java +++ b/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/WebsocketSecureTransportTest.java @@ -33,6 +33,9 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import reactor.core.Disposable; +import reactor.core.Disposables; import reactor.core.Exceptions; import reactor.core.publisher.Flux; import reactor.netty.http.client.HttpClient; @@ -47,6 +50,13 @@ static void cleanup() throws ExecutionException, InterruptedException, TimeoutEx executor.shutdownGracefully().get(30, TimeUnit.SECONDS); } + Disposable channelChecker = Disposables.disposed(); + + @AfterEach + void cleanupCheck() { + channelChecker.dispose(); + } + private final TransportPair transportPair = new TransportPair<>( () -> new InetSocketAddress("localhost", 0), @@ -66,6 +76,10 @@ static void cleanup() throws ExecutionException, InterruptedException, TimeoutEx (address, allocator) -> { try { DefaultChannelGroup channels = new DefaultChannelGroup(executor); + channelChecker = + Flux.interval(Duration.ofSeconds(1)) + .doOnNext(__ -> channels.forEach(c -> logger.info("Active Channel " + c))) + .subscribe(); SelfSignedCertificate ssc = new SelfSignedCertificate(); HttpServer server = HttpServer.create() @@ -76,14 +90,6 @@ static void cleanup() throws ExecutionException, InterruptedException, TimeoutEx ssl.sslContext( SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()))) .channelGroup(channels) - .doOnBound( - ds -> - Flux.interval(Duration.ofSeconds(1)) - .doOnNext( - __ -> - channels.forEach(c -> logger.info("Active Channel " + c))) - .takeUntilOther(ds.onDispose().log("-disposing-")) - .subscribe()) .doOnUnbound( (ds) -> { try { diff --git a/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/WebsocketTransportTest.java b/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/WebsocketTransportTest.java index bd69bdfe3..45d3c66e5 100644 --- a/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/WebsocketTransportTest.java +++ b/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/WebsocketTransportTest.java @@ -29,6 +29,9 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import reactor.core.Disposable; +import reactor.core.Disposables; import reactor.core.publisher.Flux; import reactor.netty.http.client.HttpClient; import reactor.netty.http.server.HttpServer; @@ -41,6 +44,13 @@ static void cleanup() throws ExecutionException, InterruptedException, TimeoutEx executor.shutdownGracefully().get(30, TimeUnit.SECONDS); } + Disposable channelChecker = Disposables.disposed(); + + @AfterEach + void cleanupCheck() { + channelChecker.dispose(); + } + private final TransportPair transportPair = new TransportPair<>( () -> InetSocketAddress.createUnresolved("localhost", 0), @@ -53,18 +63,15 @@ static void cleanup() throws ExecutionException, InterruptedException, TimeoutEx ""), (address, allocator) -> { DefaultChannelGroup channels = new DefaultChannelGroup(executor); + channelChecker = + Flux.interval(Duration.ofSeconds(1)) + .doOnNext(__ -> channels.forEach(c -> logger.info("Active Channel " + c))) + .subscribe(); return WebsocketServerTransport.create( HttpServer.create() .host(address.getHostName()) .port(address.getPort()) .channelGroup(channels) - .doOnBound( - ds -> - Flux.interval(Duration.ofSeconds(1)) - .doOnNext( - __ -> channels.forEach(c -> logger.info("Active Channel " + c))) - .takeUntilOther(ds.onDispose().log("-disposing-")) - .subscribe()) .doOnUnbound( (ds) -> { try {