Skip to content

Commit

Permalink
logs
Browse files Browse the repository at this point in the history
Signed-off-by: Oleh Dokuka <odokuka@vmware.com>
Signed-off-by: Oleh Dokuka <oleh.dokuka@icloud.com>
Signed-off-by: OlegDokuka <odokuka@vmware.com>
  • Loading branch information
OlegDokuka committed Mar 27, 2023
1 parent 49eb695 commit 857ae08
Show file tree
Hide file tree
Showing 9 changed files with 126 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.Flux;
import reactor.netty.tcp.TcpClient;
import reactor.netty.tcp.TcpServer;
Expand All @@ -41,6 +44,13 @@ static void cleanup() throws ExecutionException, InterruptedException, TimeoutEx
executor.shutdownGracefully().get(30, TimeUnit.SECONDS);
}

Disposable channelChecker = Disposables.disposed();

@AfterEach
void cleanupCheck() {
channelChecker.dispose();
}

private final TransportPair transportPair =
new TransportPair<>(
() -> InetSocketAddress.createUnresolved("localhost", 0),
Expand All @@ -51,17 +61,14 @@ static void cleanup() throws ExecutionException, InterruptedException, TimeoutEx
.option(ChannelOption.ALLOCATOR, allocator)),
(address, allocator) -> {
DefaultChannelGroup channels = new DefaultChannelGroup(executor);
channelChecker =
Flux.interval(Duration.ofSeconds(1))
.doOnNext(__ -> channels.forEach(c -> logger.info("Active Channel " + c)))
.subscribe();
return TcpServerTransport.create(
TcpServer.create()
.bindAddress(() -> address)
.channelGroup(channels)
.doOnBound(
ds ->
Flux.interval(Duration.ofSeconds(1))
.doOnNext(
__ -> channels.forEach(c -> logger.info("Active Channel " + c)))
.takeUntilOther(ds.onDispose().log("-disposing-"))
.subscribe())
.doOnUnbound(
(ds) -> {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.Flux;
import reactor.netty.tcp.TcpClient;
import reactor.netty.tcp.TcpServer;
Expand All @@ -41,6 +44,13 @@ static void cleanup() throws ExecutionException, InterruptedException, TimeoutEx
executor.shutdownGracefully().get(30, TimeUnit.SECONDS);
}

Disposable channelChecker = Disposables.disposed();

@AfterEach
void cleanupCheck() {
channelChecker.dispose();
}

private final TransportPair transportPair =
new TransportPair<>(
() -> InetSocketAddress.createUnresolved("localhost", 0),
Expand All @@ -51,18 +61,15 @@ static void cleanup() throws ExecutionException, InterruptedException, TimeoutEx
.option(ChannelOption.ALLOCATOR, allocator)),
(address, allocator) -> {
DefaultChannelGroup channels = new DefaultChannelGroup(executor);
channelChecker =
Flux.interval(Duration.ofSeconds(1))
.doOnNext(__ -> channels.forEach(c -> logger.info("Active Channel " + c)))
.subscribe();
return TcpServerTransport.create(
TcpServer.create()
.bindAddress(() -> address)
.option(ChannelOption.ALLOCATOR, allocator)
.channelGroup(channels)
.doOnBound(
ds ->
Flux.interval(Duration.ofSeconds(1))
.doOnNext(
__ -> channels.forEach(c -> logger.info("Active Channel " + c)))
.takeUntilOther(ds.onDispose().log("-disposing-"))
.subscribe())
.doOnUnbound(
(ds) -> {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.Flux;
import reactor.netty.tcp.TcpClient;
import reactor.netty.tcp.TcpServer;
Expand All @@ -41,6 +44,13 @@ static void cleanup() throws ExecutionException, InterruptedException, TimeoutEx
executor.shutdownGracefully().get(30, TimeUnit.SECONDS);
}

Disposable channelChecker = Disposables.disposed();

@AfterEach
void cleanupCheck() {
channelChecker.dispose();
}

private final TransportPair transportPair =
new TransportPair<>(
() -> InetSocketAddress.createUnresolved("localhost", 0),
Expand All @@ -51,18 +61,15 @@ static void cleanup() throws ExecutionException, InterruptedException, TimeoutEx
.option(ChannelOption.ALLOCATOR, allocator)),
(address, allocator) -> {
DefaultChannelGroup channels = new DefaultChannelGroup(executor);
channelChecker =
Flux.interval(Duration.ofSeconds(1))
.doOnNext(__ -> channels.forEach(c -> logger.info("Active Channel " + c)))
.subscribe();
return TcpServerTransport.create(
TcpServer.create()
.bindAddress(() -> address)
.option(ChannelOption.ALLOCATOR, allocator)
.channelGroup(channels)
.doOnBound(
ds ->
Flux.interval(Duration.ofSeconds(1))
.doOnNext(
__ -> channels.forEach(c -> logger.info("Active Channel " + c)))
.takeUntilOther(ds.onDispose().log("-disposing-"))
.subscribe())
.doOnUnbound(
(ds) -> {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.netty.tcp.TcpClient;
Expand All @@ -30,6 +33,13 @@ static void cleanup() throws ExecutionException, InterruptedException, TimeoutEx
executor.shutdownGracefully().get(30, TimeUnit.SECONDS);
}

Disposable channelChecker = Disposables.disposed();

@AfterEach
void cleanupCheck() {
channelChecker.dispose();
}

private final TransportPair transportPair =
new TransportPair<>(
() -> new InetSocketAddress("localhost", 0),
Expand All @@ -46,18 +56,14 @@ static void cleanup() throws ExecutionException, InterruptedException, TimeoutEx
(address, allocator) -> {
try {
DefaultChannelGroup channels = new DefaultChannelGroup(executor);
channelChecker =
Flux.interval(Duration.ofSeconds(1))
.doOnNext(__ -> channels.forEach(c -> logger.info("Active Channel " + c)))
.subscribe();
SelfSignedCertificate ssc = new SelfSignedCertificate();
TcpServer server =
TcpServer.create()
.channelGroup(channels)
.doOnBound(
ds ->
Flux.interval(Duration.ofSeconds(1))
.doOnNext(
__ ->
channels.forEach(c -> logger.info("Active Channel " + c)))
.takeUntilOther(ds.onDispose().log("-disposing-"))
.subscribe())
.doOnUnbound(
(ds) -> {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.Flux;
import reactor.netty.tcp.TcpClient;
import reactor.netty.tcp.TcpServer;
Expand All @@ -42,6 +45,13 @@ static void cleanup() throws ExecutionException, InterruptedException, TimeoutEx
executor.shutdownGracefully().get(30, TimeUnit.SECONDS);
}

Disposable channelChecker = Disposables.disposed();

@AfterEach
void cleanupCheck() {
channelChecker.dispose();
}

private final TransportPair transportPair =
new TransportPair<>(
() -> InetSocketAddress.createUnresolved("localhost", 0),
Expand All @@ -52,18 +62,15 @@ static void cleanup() throws ExecutionException, InterruptedException, TimeoutEx
.option(ChannelOption.ALLOCATOR, allocator)),
(address, allocator) -> {
DefaultChannelGroup channels = new DefaultChannelGroup(executor);
channelChecker =
Flux.interval(Duration.ofSeconds(1))
.doOnNext(__ -> channels.forEach(c -> logger.info("Active Channel " + c)))
.subscribe();
return TcpServerTransport.create(
TcpServer.create()
.bindAddress(() -> address)
.option(ChannelOption.ALLOCATOR, allocator)
.channelGroup(channels)
.doOnBound(
ds ->
Flux.interval(Duration.ofSeconds(1))
.doOnNext(
__ -> channels.forEach(c -> logger.info("Active Channel " + c)))
.takeUntilOther(ds.onDispose())
.subscribe())
.doOnUnbound(
(ds) -> {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.Flux;
import reactor.netty.http.client.HttpClient;
import reactor.netty.http.server.HttpServer;
Expand All @@ -41,6 +44,13 @@ static void cleanup() throws ExecutionException, InterruptedException, TimeoutEx
executor.shutdownGracefully().get(30, TimeUnit.SECONDS);
}

Disposable channelChecker = Disposables.disposed();

@AfterEach
void cleanupCheck() {
channelChecker.dispose();
}

private final TransportPair transportPair =
new TransportPair<>(
() -> InetSocketAddress.createUnresolved("localhost", 0),
Expand All @@ -53,18 +63,15 @@ static void cleanup() throws ExecutionException, InterruptedException, TimeoutEx
""),
(address, allocator) -> {
DefaultChannelGroup channels = new DefaultChannelGroup(executor);
channelChecker =
Flux.interval(Duration.ofSeconds(1))
.doOnNext(__ -> channels.forEach(c -> logger.info("Active Channel " + c)))
.subscribe();
return WebsocketServerTransport.create(
HttpServer.create()
.host(address.getHostName())
.port(address.getPort())
.channelGroup(channels)
.doOnBound(
ds ->
Flux.interval(Duration.ofSeconds(1))
.doOnNext(
__ -> channels.forEach(c -> logger.info("Active Channel " + c)))
.takeUntilOther(ds.onDispose().log("-disposing-"))
.subscribe())
.doOnUnbound(
(ds) -> {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.Flux;
import reactor.netty.http.client.HttpClient;
import reactor.netty.http.server.HttpServer;
Expand All @@ -42,6 +45,13 @@ static void cleanup() throws ExecutionException, InterruptedException, TimeoutEx
executor.shutdownGracefully().get(30, TimeUnit.SECONDS);
}

Disposable channelChecker = Disposables.disposed();

@AfterEach
void cleanupCheck() {
channelChecker.dispose();
}

private final TransportPair transportPair =
new TransportPair<>(
() -> InetSocketAddress.createUnresolved("localhost", 0),
Expand All @@ -54,18 +64,15 @@ static void cleanup() throws ExecutionException, InterruptedException, TimeoutEx
""),
(address, allocator) -> {
DefaultChannelGroup channels = new DefaultChannelGroup(executor);
channelChecker =
Flux.interval(Duration.ofSeconds(1))
.doOnNext(__ -> channels.forEach(c -> logger.info("Active Channel " + c)))
.subscribe();
return WebsocketServerTransport.create(
HttpServer.create()
.host(address.getHostName())
.port(address.getPort())
.channelGroup(channels)
.doOnBound(
ds ->
Flux.interval(Duration.ofSeconds(1))
.doOnNext(
__ -> channels.forEach(c -> logger.info("Active Channel " + c)))
.takeUntilOther(ds.onDispose().log("-disposing-"))
.subscribe())
.doOnUnbound(
(ds) -> {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.netty.http.client.HttpClient;
Expand All @@ -47,6 +50,13 @@ static void cleanup() throws ExecutionException, InterruptedException, TimeoutEx
executor.shutdownGracefully().get(30, TimeUnit.SECONDS);
}

Disposable channelChecker = Disposables.disposed();

@AfterEach
void cleanupCheck() {
channelChecker.dispose();
}

private final TransportPair transportPair =
new TransportPair<>(
() -> new InetSocketAddress("localhost", 0),
Expand All @@ -66,6 +76,10 @@ static void cleanup() throws ExecutionException, InterruptedException, TimeoutEx
(address, allocator) -> {
try {
DefaultChannelGroup channels = new DefaultChannelGroup(executor);
channelChecker =
Flux.interval(Duration.ofSeconds(1))
.doOnNext(__ -> channels.forEach(c -> logger.info("Active Channel " + c)))
.subscribe();
SelfSignedCertificate ssc = new SelfSignedCertificate();
HttpServer server =
HttpServer.create()
Expand All @@ -76,14 +90,6 @@ static void cleanup() throws ExecutionException, InterruptedException, TimeoutEx
ssl.sslContext(
SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey())))
.channelGroup(channels)
.doOnBound(
ds ->
Flux.interval(Duration.ofSeconds(1))
.doOnNext(
__ ->
channels.forEach(c -> logger.info("Active Channel " + c)))
.takeUntilOther(ds.onDispose().log("-disposing-"))
.subscribe())
.doOnUnbound(
(ds) -> {
try {
Expand Down

0 comments on commit 857ae08

Please sign in to comment.