-
Notifications
You must be signed in to change notification settings - Fork 357
Closed
Labels
Description
io.rsocket.core.RSocketResponder#handleRequestResponse
/**
 * Subscribes to the responder's {@code response} for a single request/response exchange and
 * forwards the outcome to {@code sendProcessor} as frames tagged with {@code streamId}.
 *
 * <p>The subscriber is registered in {@code sendingSubscriptions} under {@code streamId} before
 * subscribing; the error path and the empty-completion path below remove it again.
 *
 * @param streamId id used as the key in {@code sendingSubscriptions} and passed to the frame
 *     encoders and to {@code handleError}
 * @param response the publisher whose single signal (value, error, or empty completion) is
 *     translated into outgoing frames
 */
private void handleRequestResponse(int streamId, Mono<Payload> response) {
final BaseSubscriber<Payload> subscriber =
new BaseSubscriber<Payload>() {
// Stays true until hookOnNext runs; nothing in this subscriber sets it back to true.
private boolean isEmpty = true;
@Override
protected void hookOnNext(Payload payload) {
if (isEmpty) {
isEmpty = false;
}
// Reject payloads that fail validation against mtu / maxFrameLength: release the payload,
// cancel the upstream subscription and report the failure through handleError.
if (!PayloadValidationUtils.isValid(mtu, payload, maxFrameLength)) {
payload.release();
cancel();
final IllegalArgumentException t =
new IllegalArgumentException(INVALID_PAYLOAD_ERROR_MESSAGE);
// NOTE(review): this path does not touch sendingSubscriptions itself; presumably
// handleError is responsible for any cleanup - confirm.
handleError(streamId, t);
return;
}
// Valid payload: encode it as a single frame and hand it to sendProcessor.
// NOTE(review): judging by the codec method's name, the payload is released during
// encoding - confirm against PayloadFrameCodec.
ByteBuf byteBuf =
PayloadFrameCodec.encodeNextCompleteReleasingPayload(allocator, streamId, payload);
sendProcessor.onNext(byteBuf);
}
@Override
protected void hookOnError(Throwable throwable) {
// Only report the error if this subscriber was still the one registered for streamId.
if (sendingSubscriptions.remove(streamId, this)) {
handleError(streamId, throwable);
}
}
@Override
protected void hookOnComplete() {
// Empty completion: deregister and send a bare complete frame.
// NOTE(review): when a value was emitted, isEmpty is false and this hook does nothing, so
// the entry added to sendingSubscriptions below is not removed by this subscriber on the
// normal value-then-complete path. Confirm whether it is removed elsewhere; if not, the
// subscriber stays referenced by the map.
if (isEmpty) {
if (sendingSubscriptions.remove(streamId, this)) {
sendProcessor.onNext(PayloadFrameCodec.encodeComplete(allocator, streamId));
}
}
}
};
// Register before subscribing so the hooks above can find and remove this subscriber.
sendingSubscriptions.put(streamId, subscriber);
// Discarded ReferenceCounted elements are handed to DROPPED_ELEMENTS_CONSUMER.
response.doOnDiscard(ReferenceCounted.class, DROPPED_ELEMENTS_CONSUMER).subscribe(subscriber);
}
`hookOnNext` sets `isEmpty = false`, but no code ever sets it back to `true`. As a result, once a payload has been emitted, the `sendingSubscriptions.remove(streamId, this)` call in `hookOnComplete` never gets a chance to execute, so the subscriber object stays in `sendingSubscriptions` and cannot be reclaimed.
Why not move `sendingSubscriptions.remove(streamId, this)` into `hookFinally`?