Skip to content

Commit

Permalink
ensures there is no extra logging about a dropped error (#859)
Browse files Browse the repository at this point in the history
Signed-off-by: Oleh Dokuka <shadowgun@i.ua>
  • Loading branch information
OlegDokuka committed Jun 4, 2020
1 parent 01e6e94 commit 8959bf7
Showing 1 changed file with 20 additions and 32 deletions.
52 changes: 20 additions & 32 deletions rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java
Expand Up @@ -412,20 +412,19 @@ protected void hookOnNext(Payload payload) {

@Override
protected void hookOnError(Throwable throwable) {
handleError(streamId, throwable);
if (sendingSubscriptions.remove(streamId, this)) {
handleError(streamId, throwable);
}
}

@Override
protected void hookOnComplete() {
if (isEmpty) {
sendProcessor.onNext(PayloadFrameCodec.encodeComplete(allocator, streamId));
if (sendingSubscriptions.remove(streamId, this)) {
sendProcessor.onNext(PayloadFrameCodec.encodeComplete(allocator, streamId));
}
}
}

@Override
protected void hookFinally(SignalType type) {
sendingSubscriptions.remove(streamId, this);
}
};

sendingSubscriptions.put(streamId, subscriber);
Expand Down Expand Up @@ -491,36 +490,17 @@ protected void hookOnNext(Payload payload) {

@Override
protected void hookOnComplete() {
sendProcessor.onNext(PayloadFrameCodec.encodeComplete(allocator, streamId));
if (sendingSubscriptions.remove(streamId, this)) {
sendProcessor.onNext(PayloadFrameCodec.encodeComplete(allocator, streamId));
}
}

@Override
protected void hookOnError(Throwable throwable) {
handleError(streamId, throwable);
}

@Override
protected void hookOnCancel() {
// specifically for requestChannel case so when requester sends Cancel frame so the
// whole chain MUST be terminated
// Note: CancelFrame is redundant from the responder side due to spec
// (https://github.com/rsocket/rsocket/blob/master/Protocol.md#request-channel)
// Upon receiving a CANCEL, the stream is terminated on the Responder.
// Upon sending a CANCEL, the stream is terminated on the Requester.
if (requestChannel != null) {
channelProcessors.remove(streamId, requestChannel);
try {
requestChannel.dispose();
} catch (Exception e) {
// might be thrown back if stream is cancelled
}
if (sendingSubscriptions.remove(streamId, this)) {
handleError(streamId, throwable);
}
}

@Override
protected void hookFinally(SignalType type) {
sendingSubscriptions.remove(streamId);
}
};

sendingSubscriptions.put(streamId, subscriber);
Expand Down Expand Up @@ -588,7 +568,15 @@ protected void hookOnError(Throwable throwable) {}

private void handleCancelFrame(int streamId) {
Subscription subscription = sendingSubscriptions.remove(streamId);
channelProcessors.remove(streamId);
Processor<Payload, Payload> processor = channelProcessors.remove(streamId);

if (processor != null) {
try {
processor.onError(new CancellationException("Disposed"));
} catch (Exception e) {
// ignore
}
}

if (subscription != null) {
subscription.cancel();
Expand Down

0 comments on commit 8959bf7

Please sign in to comment.