diff --git a/src/main/java/io/reactivesocket/Requester.java b/src/main/java/io/reactivesocket/Requester.java index 3ada2d379..15d48b5d3 100644 --- a/src/main/java/io/reactivesocket/Requester.java +++ b/src/main/java/io/reactivesocket/Requester.java @@ -15,17 +15,8 @@ */ package io.reactivesocket; -import static rx.Observable.empty; -import static rx.Observable.error; -import static rx.Observable.just; -import static rx.RxReactiveStreams.toObservable; -import static rx.RxReactiveStreams.toPublisher; - -import java.util.concurrent.atomic.AtomicBoolean; - import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; - import rx.Observable; import rx.Producer; import rx.observables.ConnectableObservable; @@ -33,6 +24,14 @@ import rx.subjects.PublishSubject; import rx.subscriptions.Subscriptions; +import java.util.concurrent.atomic.AtomicBoolean; + +import static rx.Observable.empty; +import static rx.Observable.error; +import static rx.Observable.just; +import static rx.RxReactiveStreams.toObservable; +import static rx.RxReactiveStreams.toPublisher; + /** * RProtocol implementation abstracted over a {@link DuplexConnection}. *
@@ -138,27 +137,36 @@ private void start() { Observable input = multicastedInputStream .filter(m -> m.getStreamId() == requestFrame.getStreamId()); + AtomicBoolean terminated = new AtomicBoolean(false); // combine input and output so errors and unsubscription are composed, then subscribe rx.Subscription subscription = Observable - .merge(input, written.cast(Frame.class)) - .takeUntil(m -> (m.getType() == FrameType.COMPLETE - || m.getType() == FrameType.ERROR)) - .flatMap(m -> { - // convert ERROR/COMPLETE messages into terminal events - if (m.getType() == FrameType.ERROR) { - return error(new Exception(m.getData())); - } else if (m.getType() == FrameType.COMPLETE) { - return empty();// unsubscribe handled in takeUntil above - } else if (m.getType() == FrameType.NEXT) { - return just(m.getData()); - } else { - return error(new Exception("Unexpected FrameType: " + m.getType())); - } - }).subscribe(Subscribers.from(child));// only propagate Observer methods, backpressure is via Producer above + .merge(input, written.cast(Frame.class)) + .takeUntil(m -> (m.getType() == FrameType.COMPLETE + || m.getType() == FrameType.ERROR)) + .flatMap(m -> { + // convert ERROR/COMPLETE messages into terminal events + if (m.getType() == FrameType.ERROR) { + terminated.set(true); + return error(new Exception(m.getData())); + } else if (m.getType() == FrameType.COMPLETE) { + terminated.set(true); + return empty();// unsubscribe handled in takeUntil above + } else if (m.getType() == FrameType.NEXT) { + return just(m.getData()); + } else if (m.getType() == FrameType.NEXT_COMPLETE) { + terminated.set(true); + return just(m.getData()); + } else { + return error(new Exception("Unexpected FrameType: " + m.getType())); + } + }) + .subscribe(Subscribers.from(child));// only propagate Observer methods, backpressure is via Producer above // if the child unsubscribes, we need to send a CANCEL message if we're not terminated child.add(Subscriptions.create(() -> { - writer.onNext(just(Frame.from(requestFrame.getStreamId(), FrameType.CANCEL, ""))); + if (!terminated.get()) { + writer.onNext(just(Frame.from(requestFrame.getStreamId(), FrameType.CANCEL, ""))); + } // after sending the CANCEL we then tear down this stream subscription.unsubscribe(); })); diff --git a/src/main/java/io/reactivesocket/Responder.java b/src/main/java/io/reactivesocket/Responder.java index fd7dfb544..4da9e545f 100644 --- a/src/main/java/io/reactivesocket/Responder.java +++ b/src/main/java/io/reactivesocket/Responder.java @@ -15,21 +15,20 @@ */ package io.reactivesocket; -import static rx.Observable.empty; -import static rx.Observable.error; -import static rx.Observable.just; -import static rx.RxReactiveStreams.toObservable; -import static rx.RxReactiveStreams.toPublisher; - -import java.util.concurrent.ConcurrentHashMap; - import org.reactivestreams.Publisher; - import rx.Observable; import rx.Subscriber; import rx.functions.Func0; import rx.functions.Func1; +import java.util.concurrent.ConcurrentHashMap; + +import static rx.Observable.empty; +import static rx.Observable.error; +import static rx.Observable.just; +import static rx.RxReactiveStreams.toObservable; +import static rx.RxReactiveStreams.toPublisher; + /** * Protocol implementation abstracted over a {@link DuplexConnection}. *
@@ -99,11 +98,7 @@ private Observable