Skip to content

Commit

Permalink
fixes tests
Browse files Browse the repository at this point in the history
Signed-off-by: Oleh Dokuka <shadowgun@i.ua>
  • Loading branch information
OlegDokuka committed May 11, 2020
1 parent 8d7a59d commit 7e035bc
Showing 1 changed file with 11 additions and 4 deletions.
15 changes: 11 additions & 4 deletions rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java
Expand Up @@ -50,7 +50,6 @@
import io.rsocket.keepalive.KeepAliveSupport;
import io.rsocket.lease.RequesterLeaseHandler;
import java.nio.channels.ClosedChannelException;
import java.util.concurrent.CancellationException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Consumer;
Expand All @@ -64,6 +63,7 @@
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.publisher.Operators;
import reactor.core.publisher.SignalType;
import reactor.core.publisher.UnicastProcessor;
Expand Down Expand Up @@ -109,6 +109,7 @@ class RSocketRequester implements RSocket {
private final RequesterLeaseHandler leaseHandler;
private final ByteBufAllocator allocator;
private final KeepAliveFramesAcceptor keepAliveFramesAcceptor;
private final MonoProcessor<Void> onClose;
private final Scheduler serialScheduler;

RSocketRequester(
Expand All @@ -129,6 +130,7 @@ class RSocketRequester implements RSocket {
this.leaseHandler = leaseHandler;
this.senders = new SynchronizedIntObjectHashMap<>();
this.receivers = new SynchronizedIntObjectHashMap<>();
this.onClose = MonoProcessor.create();
this.serialScheduler = serialScheduler;

// DO NOT Change the order here. The Send processor must be subscribed to before receiving
Expand Down Expand Up @@ -182,17 +184,17 @@ public double availability() {

@Override
public void dispose() {
tryTerminate(() -> new CancellationException("Disposed"));
tryShutdown();
}

@Override
public boolean isDisposed() {
return connection.isDisposed();
return onClose.isDisposed();
}

@Override
public Mono<Void> onClose() {
return connection.onClose();
return onClose;
}

private Mono<Void> handleFireAndForget(Payload payload) {
Expand Down Expand Up @@ -758,6 +760,11 @@ private void terminate(Throwable e) {
senders.clear();
receivers.clear();
sendProcessor.dispose();
if (e == CLOSED_CHANNEL_EXCEPTION) {
onClose.onComplete();
} else {
onClose.onError(e);
}
}

private void handleSendProcessorError(Throwable t) {
Expand Down

0 comments on commit 7e035bc

Please sign in to comment.