Skip to content

Commit

Permalink
logs
Browse files Browse the repository at this point in the history
Signed-off-by: Oleh Dokuka <oleh.dokuka@icloud.com>
  • Loading branch information
OlegDokuka committed Mar 31, 2023
1 parent 5d57926 commit f92d4b6
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,8 @@ class RSocketRequester extends RequesterResponderSupport implements RSocket {

if (keepAliveTickPeriod != 0 && keepAliveHandler != null) {
KeepAliveSupport keepAliveSupport =
new ClientKeepAliveSupport(this.getAllocator(), keepAliveTickPeriod, keepAliveAckTimeout);
new ClientKeepAliveSupport(
connection, this.getAllocator(), keepAliveTickPeriod, keepAliveAckTimeout);
this.keepAliveFramesAcceptor =
keepAliveHandler.start(
keepAliveSupport,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.rsocket.DuplexConnection;
import io.rsocket.frame.KeepAliveFrameCodec;
import io.rsocket.resume.ResumeStateHolder;
import java.time.Duration;
Expand All @@ -37,6 +38,7 @@ public abstract class KeepAliveSupport implements KeepAliveFramesAcceptor {
final RuntimeException exception = new RuntimeException();
static final Logger LOGGER = LoggerFactory.getLogger(KeepAliveSupport.class);

final DuplexConnection connection;
final ByteBufAllocator allocator;
final Scheduler scheduler;
final Duration keepAliveInterval;
Expand All @@ -61,7 +63,11 @@ public abstract class KeepAliveSupport implements KeepAliveFramesAcceptor {
volatile long lastReceivedMillis;

private KeepAliveSupport(
ByteBufAllocator allocator, int keepAliveInterval, int keepAliveTimeout) {
DuplexConnection connection,
ByteBufAllocator allocator,
int keepAliveInterval,
int keepAliveTimeout) {
this.connection = connection;
this.allocator = allocator;
this.scheduler = Schedulers.parallel();
this.keepAliveInterval = Duration.ofMillis(keepAliveInterval);
Expand Down Expand Up @@ -141,7 +147,9 @@ void send(ByteBuf frame) {
+ " "
+ this
+ " "
+ Arrays.toString(exception.getStackTrace()));
+ Arrays.toString(exception.getStackTrace())
+ " "
+ connection);
if (onFrameSent != null) {
onFrameSent.accept(frame);
}
Expand Down Expand Up @@ -183,8 +191,11 @@ long localLastReceivedPosition() {
public static final class ClientKeepAliveSupport extends KeepAliveSupport {

public ClientKeepAliveSupport(
ByteBufAllocator allocator, int keepAliveInterval, int keepAliveTimeout) {
super(allocator, keepAliveInterval, keepAliveTimeout);
DuplexConnection connection,
ByteBufAllocator allocator,
int keepAliveInterval,
int keepAliveTimeout) {
super(connection, allocator, keepAliveInterval, keepAliveTimeout);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ void sessionTimeoutSmokeTest() {
true);

final KeepAliveSupport.ClientKeepAliveSupport keepAliveSupport =
new KeepAliveSupport.ClientKeepAliveSupport(transport.alloc(), 1000000, 10000000);
new KeepAliveSupport.ClientKeepAliveSupport(
resumableDuplexConnection, transport.alloc(), 1000000, 10000000);
session.setKeepAliveSupport(keepAliveSupport);

// connection is active. just advance time
Expand Down Expand Up @@ -208,7 +209,8 @@ void sessionTerminationOnWrongFrameTest() {
true);

final KeepAliveSupport.ClientKeepAliveSupport keepAliveSupport =
new KeepAliveSupport.ClientKeepAliveSupport(transport.alloc(), 1000000, 10000000);
new KeepAliveSupport.ClientKeepAliveSupport(
resumableDuplexConnection, transport.alloc(), 1000000, 10000000);
session.setKeepAliveSupport(keepAliveSupport);

// connection is active. just advance time
Expand Down Expand Up @@ -340,7 +342,8 @@ void shouldErrorWithNoRetriesOnErrorFrameTest() {
true);

final KeepAliveSupport.ClientKeepAliveSupport keepAliveSupport =
new KeepAliveSupport.ClientKeepAliveSupport(transport.alloc(), 1000000, 10000000);
new KeepAliveSupport.ClientKeepAliveSupport(
resumableDuplexConnection, transport.alloc(), 1000000, 10000000);
session.setKeepAliveSupport(keepAliveSupport);

// connection is active. just advance time
Expand Down Expand Up @@ -436,7 +439,8 @@ void shouldTerminateConnectionOnIllegalStateInKeepAliveFrame() {
true);

final KeepAliveSupport.ClientKeepAliveSupport keepAliveSupport =
new KeepAliveSupport.ClientKeepAliveSupport(transport.alloc(), 1000000, 10000000);
new KeepAliveSupport.ClientKeepAliveSupport(
resumableDuplexConnection, transport.alloc(), 1000000, 10000000);
keepAliveSupport.resumeState(session);
session.setKeepAliveSupport(keepAliveSupport);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ void sessionTimeoutSmokeTest() {
true);

final KeepAliveSupport.ClientKeepAliveSupport keepAliveSupport =
new KeepAliveSupport.ClientKeepAliveSupport(transport.alloc(), 1000000, 10000000);
new KeepAliveSupport.ClientKeepAliveSupport(
resumableDuplexConnection, transport.alloc(), 1000000, 10000000);
session.setKeepAliveSupport(keepAliveSupport);

// connection is active. just advance time
Expand Down Expand Up @@ -151,7 +152,8 @@ void shouldTerminateConnectionOnIllegalStateInKeepAliveFrame() {
true);

final KeepAliveSupport.ClientKeepAliveSupport keepAliveSupport =
new KeepAliveSupport.ClientKeepAliveSupport(transport.alloc(), 1000000, 10000000);
new KeepAliveSupport.ClientKeepAliveSupport(
resumableDuplexConnection, transport.alloc(), 1000000, 10000000);
keepAliveSupport.resumeState(session);
session.setKeepAliveSupport(keepAliveSupport);

Expand Down

0 comments on commit f92d4b6

Please sign in to comment.