Skip to content

Commit

Permalink
logs
Browse files Browse the repository at this point in the history
Signed-off-by: Oleh Dokuka <oleh.dokuka@icloud.com>
  • Loading branch information
OlegDokuka committed Mar 28, 2023
1 parent 02d166b commit 0004a6b
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,17 @@
import io.rsocket.internal.BaseDuplexConnection;
import java.net.SocketAddress;
import java.util.Objects;
import java.util.logging.Level;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;
import reactor.netty.Connection;

/** An implementation of {@link DuplexConnection} that connects via TCP. */
public final class TcpDuplexConnection extends BaseDuplexConnection {
static final Logger LOGGER = LoggerFactory.getLogger(TcpDuplexConnection.class);

private final String side;
private final Connection connection;
Expand All @@ -57,7 +62,11 @@ public TcpDuplexConnection(String side, Connection connection) {
.outbound()
.send(sender.hide())
.then()
.doFinally(__ -> connection.dispose())
.doFinally(
__ -> {
connection.dispose();
LOGGER.info(this + " has been disposed");
})
.subscribe();
}

Expand Down Expand Up @@ -94,7 +103,16 @@ public void sendErrorAndClose(RSocketErrorException e) {

@Override
public Flux<ByteBuf> receive() {
return connection.inbound().receive().map(FrameLengthCodec::frame);
return connection
.inbound()
.receive()
.map(FrameLengthCodec::frame)
.log(
this + " receive=",
Level.INFO,
SignalType.ON_COMPLETE,
SignalType.ON_ERROR,
SignalType.CANCEL);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,12 @@
import io.rsocket.internal.BaseDuplexConnection;
import java.net.SocketAddress;
import java.util.Objects;
import java.util.logging.Level;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;
import reactor.netty.Connection;

/**
Expand All @@ -36,6 +40,7 @@
* stitched back on for frames received.
*/
public final class WebsocketDuplexConnection extends BaseDuplexConnection {
static final Logger LOGGER = LoggerFactory.getLogger(WebsocketDuplexConnection.class);

private final String side;
private final Connection connection;
Expand All @@ -62,7 +67,11 @@ public WebsocketDuplexConnection(String side, Connection connection) {
.outbound()
.sendObject(sender.map(BinaryWebSocketFrame::new).hide())
.then()
.doFinally(__ -> connection.dispose())
.doFinally(
__ -> {
connection.dispose();
LOGGER.info(this + " has been disposed");
})
.subscribe();
}

Expand Down Expand Up @@ -93,7 +102,15 @@ public Mono<Void> onClose() {

@Override
public Flux<ByteBuf> receive() {
return connection.inbound().receive();
return connection
.inbound()
.receive()
.log(
this + " receive=",
Level.INFO,
SignalType.ON_COMPLETE,
SignalType.ON_ERROR,
SignalType.CANCEL);
}

@Override
Expand Down

0 comments on commit 0004a6b

Please sign in to comment.