Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 33 additions & 25 deletions src/main/java/io/reactivesocket/Requester.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,24 +15,23 @@
*/
package io.reactivesocket;

import static rx.Observable.empty;
import static rx.Observable.error;
import static rx.Observable.just;
import static rx.RxReactiveStreams.toObservable;
import static rx.RxReactiveStreams.toPublisher;

import java.util.concurrent.atomic.AtomicBoolean;

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;

import rx.Observable;
import rx.Producer;
import rx.observables.ConnectableObservable;
import rx.observers.Subscribers;
import rx.subjects.PublishSubject;
import rx.subscriptions.Subscriptions;

import java.util.concurrent.atomic.AtomicBoolean;

import static rx.Observable.empty;
import static rx.Observable.error;
import static rx.Observable.just;
import static rx.RxReactiveStreams.toObservable;
import static rx.RxReactiveStreams.toPublisher;

/**
* RProtocol implementation abstracted over a {@link DuplexConnection}.
* <p>
Expand Down Expand Up @@ -138,27 +137,36 @@ private void start() {
Observable<Frame> input = multicastedInputStream
.filter(m -> m.getStreamId() == requestFrame.getStreamId());

AtomicBoolean terminated = new AtomicBoolean(false);
// combine input and output so errors and unsubscription are composed, then subscribe
rx.Subscription subscription = Observable
.merge(input, written.cast(Frame.class))
.takeUntil(m -> (m.getType() == FrameType.COMPLETE
|| m.getType() == FrameType.ERROR))
.flatMap(m -> {
// convert ERROR/COMPLETE messages into terminal events
if (m.getType() == FrameType.ERROR) {
return error(new Exception(m.getData()));
} else if (m.getType() == FrameType.COMPLETE) {
return empty();// unsubscribe handled in takeUntil above
} else if (m.getType() == FrameType.NEXT) {
return just(m.getData());
} else {
return error(new Exception("Unexpected FrameType: " + m.getType()));
}
}).subscribe(Subscribers.from(child));// only propagate Observer methods, backpressure is via Producer above
.merge(input, written.cast(Frame.class))
.takeUntil(m -> (m.getType() == FrameType.COMPLETE
|| m.getType() == FrameType.ERROR))
.flatMap(m -> {
// convert ERROR/COMPLETE messages into terminal events
if (m.getType() == FrameType.ERROR) {
terminated.set(true);
return error(new Exception(m.getData()));
} else if (m.getType() == FrameType.COMPLETE) {
terminated.set(true);
return empty();// unsubscribe handled in takeUntil above
} else if (m.getType() == FrameType.NEXT) {
return just(m.getData());
} else if (m.getType() == FrameType.NEXT_COMPLETE) {
terminated.set(true);
return just(m.getData());
} else {
return error(new Exception("Unexpected FrameType: " + m.getType()));
}
})
.subscribe(Subscribers.from(child));// only propagate Observer methods, backpressure is via Producer above

// if the child unsubscribes, we need to send a CANCEL message if we're not terminated
child.add(Subscriptions.create(() -> {
writer.onNext(just(Frame.from(requestFrame.getStreamId(), FrameType.CANCEL, "")));
if (!terminated.get()) {
writer.onNext(just(Frame.from(requestFrame.getStreamId(), FrameType.CANCEL, "")));
}
// after sending the CANCEL we then tear down this stream
subscription.unsubscribe();
}));
Expand Down
26 changes: 10 additions & 16 deletions src/main/java/io/reactivesocket/Responder.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,20 @@
*/
package io.reactivesocket;

import static rx.Observable.empty;
import static rx.Observable.error;
import static rx.Observable.just;
import static rx.RxReactiveStreams.toObservable;
import static rx.RxReactiveStreams.toPublisher;

import java.util.concurrent.ConcurrentHashMap;

import org.reactivestreams.Publisher;

import rx.Observable;
import rx.Subscriber;
import rx.functions.Func0;
import rx.functions.Func1;

import java.util.concurrent.ConcurrentHashMap;

import static rx.Observable.empty;
import static rx.Observable.error;
import static rx.Observable.just;
import static rx.RxReactiveStreams.toObservable;
import static rx.RxReactiveStreams.toPublisher;

/**
* Protocol implementation abstracted over a {@link DuplexConnection}.
* <p>
Expand Down Expand Up @@ -99,11 +98,7 @@ private Observable<Void> handleRequestResponse(DuplexConnection ws, Frame reques
toObservable(requestHandler.handleRequestResponse(requestFrame.getData()))
.single()// enforce that it is a request/response
.flatMap(v -> just(
// TODO evaluate this ... as it is less efficient than a special NEXT_COMPLETE type
// TODO as a stream of 2 can not be as easily optimized like a scalar response
// NEXT with immediate COMPLETE as we have a single NEXT
Frame.from(streamId, FrameType.NEXT, v),
Frame.from(streamId, FrameType.COMPLETE, "")))
Frame.from(streamId, FrameType.NEXT_COMPLETE, v)))
.onErrorReturn(err -> Frame.from(streamId, FrameType.ERROR, err.getMessage()))
.takeUntil(cancellationToken)
.finallyDo(() -> cancellationObservables.remove(streamId)))));
Expand Down Expand Up @@ -175,8 +170,7 @@ private Observable<Void> handleStream(

/**
* Fire-and-Forget so we invoke the handler and return nothing, not even errors.
*
* @param ws
*
* @param requestFrame
* @return
*/
Expand Down
35 changes: 17 additions & 18 deletions src/test/java/io/reactivesocket/ResponderTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,27 @@
*/
package io.reactivesocket;

import static org.junit.Assert.*;
import static rx.Observable.*;
import static rx.RxReactiveStreams.*;

import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import org.junit.Test;
import org.reactivestreams.Subscription;

import rx.Observable;
import rx.schedulers.Schedulers;
import rx.schedulers.TestScheduler;
import rx.subjects.ReplaySubject;

import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static rx.Observable.error;
import static rx.Observable.interval;
import static rx.Observable.just;
import static rx.Observable.never;
import static rx.Observable.range;
import static rx.RxReactiveStreams.toPublisher;

public class ResponderTest
{
@Test
Expand All @@ -45,20 +50,14 @@ public void testRequestResponseSuccess() {
// perform a request/response
conn.toInput.onNext(Frame.from(1, FrameType.REQUEST_RESPONSE, "hello"));

// TODO do we want to receive 2 frames, or just a single NEXT_COMPLETE?
assertEquals(2, cachedResponses.getValues().length);// 1 onNext + 1 onCompleted
List<Frame> frames = cachedResponses.take(2).toList().toBlocking().first();
assertEquals(1, cachedResponses.getValues().length);// 1 onNext + 1 onCompleted
List<Frame> frames = cachedResponses.take(1).toList().toBlocking().first();

// assert
Frame first = frames.get(0);
assertEquals(1, first.getStreamId());
assertEquals(FrameType.NEXT, first.getType());
assertEquals(FrameType.NEXT_COMPLETE, first.getType());
assertEquals("hello world", first.getData());

Frame second = frames.get(1);
assertEquals(1, second.getStreamId());
assertEquals(FrameType.COMPLETE, second.getType());
assertEquals("", second.getData());
}

@Test
Expand Down