diff --git a/build.gradle b/build.gradle index 9017ca2aa..e72f3bc09 100644 --- a/build.gradle +++ b/build.gradle @@ -1,5 +1,5 @@ buildscript { - repositories { + repositories { jcenter() } @@ -12,6 +12,9 @@ apply plugin: 'reactivesocket-project' apply plugin: 'java' dependencies { + compile 'io.reactivex:rxjava:1.0.13' + compile 'io.reactivex:rxjava-reactive-streams:1.0.1' + compile 'org.reactivestreams:reactive-streams:1.0.0.final' testCompile 'junit:junit-dep:4.10' testCompile 'org.mockito:mockito-core:1.8.5' } diff --git a/src/main/java/io/reactivesocket/CancellationToken.java b/src/main/java/io/reactivesocket/CancellationToken.java new file mode 100644 index 000000000..b83197379 --- /dev/null +++ b/src/main/java/io/reactivesocket/CancellationToken.java @@ -0,0 +1,64 @@ +/** + * Copyright 2015 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.reactivesocket; + +import rx.Observable; +import rx.Subscriber; + +/** + * Intended to ONLY support a single Subscriber and Publisher for notification of a cancellation event, such as with takeUntil. + */ +/* package */ class CancellationToken extends Observable { + + public static CancellationToken create() { + return new CancellationToken(new State()); + } + + private State state; + + protected CancellationToken(State state) { + super(s -> { + synchronized (state) { + if (state.cancelled) { + s.onCompleted(); // always onComplete when cancelled + } else { + if (state.subscriber != null) { + throw new IllegalStateException("Only 1 Subscriber permitted on a CancellationSubject"); + } else { + state.subscriber = s; + } + } + } + }); + this.state = state; + } + + public final void cancel() { + Subscriber emitTo; + synchronized (state) { + state.cancelled = true; + emitTo = state.subscriber; + } + if (emitTo != null) { + emitTo.onCompleted(); + } + } + + public static class State { + private Subscriber subscriber; + private boolean cancelled = false; + } +} diff --git a/src/main/java/io/reactivesocket/DuplexConnection.java b/src/main/java/io/reactivesocket/DuplexConnection.java new file mode 100644 index 000000000..98cc047e1 --- /dev/null +++ b/src/main/java/io/reactivesocket/DuplexConnection.java @@ -0,0 +1,26 @@ +/** + * Copyright 2015 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.reactivesocket; + +import org.reactivestreams.Publisher; + +public interface DuplexConnection { + + Publisher getInput(); + + Publisher write(Message o); + +} diff --git a/src/main/java/io/reactivesocket/Message.java b/src/main/java/io/reactivesocket/Message.java new file mode 100644 index 000000000..83f1568bd --- /dev/null +++ b/src/main/java/io/reactivesocket/Message.java @@ -0,0 +1,138 @@ +/** + * Copyright 2015 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.reactivesocket; + +import java.nio.ByteBuffer; + +public class Message { + + private Message() { + } + + // not final so we can reuse this object + private ByteBuffer b; + private int messageId; + private MessageType type; + private String message; + + public ByteBuffer getBytes() { + return b; + } + + public String getMessage() { + if (type == null) { + decode(); + } + return message; + } + + public int getMessageId() { + if (type == null) { + decode(); + } + return messageId; + } + + public MessageType getMessageType() { + if (type == null) { + decode(); + } + return type; + } + + /** + * Mutates this Frame to contain the given ByteBuffer + * + * @param b + */ + public void wrap(ByteBuffer b) { + this.messageId = -1; + this.type = null; + this.message = null; + this.b = b; + } + + /** + * Construct a new Frame from the given ByteBuffer + * + * @param b + * @return + */ + public static Message from(ByteBuffer b) { + Message f = new Message(); + f.b = b; + return f; + } + + /** + * Mutates this Frame to contain the given message. + * + * @param messageId + * @param type + * @param message + */ + public void wrap(int messageId, MessageType type, String message) { + this.messageId = messageId; + this.type = type; + this.message = message; + this.b = getBytes(messageId, type, message); + } + + /** + * Construct a new Frame with the given message. + * + * @param messageId + * @param type + * @param message + * @return + */ + public static Message from(int messageId, MessageType type, String message) { + Message f = new Message(); + f.b = getBytes(messageId, type, message); + f.messageId = messageId; + f.type = type; + f.message = message; + return f; + } + + private static ByteBuffer getBytes(int messageId, MessageType type, String message) { + // TODO replace with binary + /** + * This is NOT how we want it for real. Just representing the idea for discussion. + */ + String s = "[" + type.ordinal() + "]" + getIdString(messageId) + message; + // TODO stop allocating ... use flywheels + return ByteBuffer.wrap(s.getBytes()); + } + + private static String getIdString(int id) { + return "[" + id + "]|"; + } + + private void decode() { + // TODO replace with binary + /** + * This is NOT how we want it for real. Just representing the idea for discussion. + */ + String data = new String(b.array()); + int separator = data.indexOf('|'); + String prefix = data.substring(0, separator); + this.type = MessageType.values[Integer.parseInt(prefix.substring(1, data.indexOf(']')))]; + this.messageId = Integer.parseInt(prefix.substring(prefix.lastIndexOf("[") + 1, prefix.length() - 1)); + this.message = data.substring(separator + 1, data.length()); + } + +} diff --git a/src/main/java/io/reactivesocket/MessageType.java b/src/main/java/io/reactivesocket/MessageType.java new file mode 100644 index 000000000..239123bdf --- /dev/null +++ b/src/main/java/io/reactivesocket/MessageType.java @@ -0,0 +1,22 @@ +/** + * Copyright 2015 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.reactivesocket; + +enum MessageType { + // DO NOT REORDER OR INSERT NEW ELEMENTS. THE ORDINAL IS PART OF THE PROTOCOL + SUBSCRIBE_REQUEST_RESPONSE, SUBSCRIBE_STREAM, STREAM_REQUEST, DISPOSE, NEXT_COMPLETE, NEXT, ERROR, COMPLETE; + public static MessageType[] values = MessageType.values(); // cached for performance +} \ No newline at end of file diff --git a/src/main/java/io/reactivesocket/ReactiveSocketServerProtocol.java b/src/main/java/io/reactivesocket/ReactiveSocketServerProtocol.java new file mode 100644 index 000000000..a06a47eac --- /dev/null +++ b/src/main/java/io/reactivesocket/ReactiveSocketServerProtocol.java @@ -0,0 +1,82 @@ +/** + * Copyright 2015 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.reactivesocket; + +import static rx.Observable.*; +import static rx.RxReactiveStreams.*; + +import java.util.concurrent.ConcurrentHashMap; + +import org.reactivestreams.Publisher; + +import rx.functions.Func1; + +public class ReactiveSocketServerProtocol { + + private Func1> requestResponseHandler; + private Func1> requestStreamHandler; + + ReactiveSocketServerProtocol( + Func1> requestResponseHandler, + Func1> requestStreamHandler) { + this.requestResponseHandler = requestResponseHandler; + this.requestStreamHandler = requestStreamHandler; + } + + public Publisher acceptConnection(DuplexConnection ws) { + /* state of cancellation subjects during connection */ + // TODO consider using the LongObjectHashMap from Agrona for perf improvement + // TODO consider alternate to PublishSubject that assumes a single subscriber and is lighter + final ConcurrentHashMap cancellationObservables = new ConcurrentHashMap<>(); + + return toPublisher(toObservable(ws.getInput()).flatMap(message -> { + if (message.getMessageType() == MessageType.SUBSCRIBE_REQUEST_RESPONSE) { + CancellationToken cancellationToken = CancellationToken.create(); + cancellationObservables.put(message.getMessageId(), cancellationToken); + + return toObservable(requestResponseHandler.call(message.getMessage())) + .single() // enforce that it is a request/response + .map(v -> Message.from(message.getMessageId(), MessageType.NEXT_COMPLETE, v)) + .onErrorReturn(err -> Message.from(message.getMessageId(), MessageType.ERROR, err.getMessage())) + .flatMap(payload -> toObservable(ws.write(payload))) + .takeUntil(cancellationToken) + .finallyDo(() -> cancellationObservables.remove(message.getMessageId())); + } else if (message.getMessageType() == MessageType.SUBSCRIBE_STREAM) { + CancellationToken cancellationToken = CancellationToken.create(); + cancellationObservables.put(message.getMessageId(), cancellationToken); + + //@formatter:off + return toObservable(requestStreamHandler.call(message.getMessage())) + .flatMap(s -> toObservable(ws.write(Message.from(message.getMessageId(), MessageType.NEXT, s))), + err -> toObservable(ws.write(Message.from(message.getMessageId(), MessageType.ERROR, err.getMessage()))), + () -> toObservable(ws.write(Message.from(message.getMessageId(), MessageType.COMPLETE, ""))) + ) + .takeUntil(cancellationToken) + .finallyDo(() -> cancellationObservables.remove(message.getMessageId())); + //@formatter:on + } else if (message.getMessageType() == MessageType.DISPOSE) { + CancellationToken cancellationToken = cancellationObservables.get(message.getMessageId()); + if (cancellationToken != null) { + cancellationToken.cancel(); + } + return empty(); + } else { + return error(new IllegalStateException("Unexpected prefix: " + message.getMessageType())); + } + })); + } + +} diff --git a/src/test/java/io/reactivesocket/MessageTest.java b/src/test/java/io/reactivesocket/MessageTest.java new file mode 100644 index 000000000..0b6844d06 --- /dev/null +++ b/src/test/java/io/reactivesocket/MessageTest.java @@ -0,0 +1,63 @@ +/** + * Copyright 2015 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.reactivesocket; + +import static org.junit.Assert.*; + +import java.nio.ByteBuffer; + +import org.junit.Test; + +public class MessageTest { + + @Test + public void testWriteThenRead() { + Message f = Message.from(1, MessageType.SUBSCRIBE_REQUEST_RESPONSE, "hello"); + assertEquals("hello", f.getMessage()); + assertEquals(MessageType.SUBSCRIBE_REQUEST_RESPONSE, f.getMessageType()); + assertEquals(1, f.getMessageId()); + + ByteBuffer b = f.getBytes(); + + Message f2 = Message.from(b); + assertEquals("hello", f2.getMessage()); + assertEquals(MessageType.SUBSCRIBE_REQUEST_RESPONSE, f2.getMessageType()); + assertEquals(1, f2.getMessageId()); + } + + @Test + public void testWrapMessage() { + Message f = Message.from(1, MessageType.SUBSCRIBE_REQUEST_RESPONSE, "hello"); + + f.wrap(2, MessageType.COMPLETE, "done"); + assertEquals("done", f.getMessage()); + assertEquals(MessageType.COMPLETE, f.getMessageType()); + assertEquals(2, f.getMessageId()); + } + + @Test + public void testWrapBytes() { + Message f = Message.from(1, MessageType.SUBSCRIBE_REQUEST_RESPONSE, "hello"); + Message f2 = Message.from(20, MessageType.COMPLETE, "another"); + + ByteBuffer b = f2.getBytes(); + f.wrap(b); + + assertEquals("another", f.getMessage()); + assertEquals(MessageType.COMPLETE, f.getMessageType()); + assertEquals(20, f.getMessageId()); + } +} diff --git a/src/test/java/io/reactivesocket/ReactiveSocketServerProtocolTest.java b/src/test/java/io/reactivesocket/ReactiveSocketServerProtocolTest.java new file mode 100644 index 000000000..cc14c997b --- /dev/null +++ b/src/test/java/io/reactivesocket/ReactiveSocketServerProtocolTest.java @@ -0,0 +1,288 @@ +/** + * Copyright 2015 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.reactivesocket; + +import static org.junit.Assert.*; +import static rx.Observable.*; +import static rx.RxReactiveStreams.*; + +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.junit.Test; +import org.reactivestreams.Subscription; + +import rx.Observable; +import rx.schedulers.Schedulers; +import rx.schedulers.TestScheduler; +import rx.subjects.ReplaySubject; + +public class ReactiveSocketServerProtocolTest { + + @Test + public void testRequestResponseSuccess() { + ReactiveSocketServerProtocol p = new ReactiveSocketServerProtocol( + request -> toPublisher(just(request + " world")), + null); + + TestConnection conn = establishConnection(p); + Observable cachedResponses = captureResponses(conn); + + // perform a request/response + conn.toServer.onNext(Message.from(1, MessageType.SUBSCRIBE_REQUEST_RESPONSE, "hello")); + + // assert + Message first = cachedResponses.toBlocking().first(); + assertEquals(1, first.getMessageId()); + assertEquals(MessageType.NEXT_COMPLETE, first.getMessageType()); + assertEquals("hello world", first.getMessage()); + } + + @Test + public void testRequestResponseError() { + ReactiveSocketServerProtocol p = new ReactiveSocketServerProtocol( + request -> toPublisher(error(new Exception("Request Not Found"))), + null); + + TestConnection conn = establishConnection(p); + Observable cachedResponses = captureResponses(conn); + + // perform a request/response + conn.toServer.onNext(Message.from(1, MessageType.SUBSCRIBE_REQUEST_RESPONSE, "hello")); + + // assert + Message first = cachedResponses.toBlocking().first(); + assertEquals(1, first.getMessageId()); + assertEquals(MessageType.ERROR, first.getMessageType()); + assertEquals("Request Not Found", first.getMessage()); + } + + @Test + public void testRequestResponseCancel() { + AtomicBoolean unsubscribed = new AtomicBoolean(); + Observable delayed = never() + .cast(String.class) + .doOnUnsubscribe(() -> unsubscribed.set(true)); + + ReactiveSocketServerProtocol p = new ReactiveSocketServerProtocol( + request -> toPublisher(delayed), + null); + + TestConnection conn = establishConnection(p); + ReplaySubject cachedResponses = captureResponses(conn); + + // perform a request/response + conn.toServer.onNext(Message.from(1, MessageType.SUBSCRIBE_REQUEST_RESPONSE, "hello")); + // assert no response + assertFalse(cachedResponses.hasAnyValue()); + // unsubscribe + assertFalse(unsubscribed.get()); + conn.toServer.onNext(Message.from(1, MessageType.DISPOSE, "")); + assertTrue(unsubscribed.get()); + } + + @Test + public void testRequestStreamSuccess() { + ReactiveSocketServerProtocol p = new ReactiveSocketServerProtocol( + null, + request -> toPublisher(range(Integer.parseInt(request), 10).map(i -> i + "!"))); + + TestConnection conn = establishConnection(p); + ReplaySubject cachedResponses = captureResponses(conn); + + // perform a request/response + conn.toServer.onNext(Message.from(1, MessageType.SUBSCRIBE_STREAM, "10")); + + // assert + assertEquals(11, cachedResponses.getValues().length); // 10 onNext + 1 onCompleted + List messages = cachedResponses.take(11).toList().toBlocking().first(); + + // 10 onNext messages + for (int i = 0; i < 10; i++) { + assertEquals(1, messages.get(i).getMessageId()); + assertEquals(MessageType.NEXT, messages.get(i).getMessageType()); + assertEquals((i + 10) + "!", messages.get(i).getMessage()); + } + + // last message is a COMPLETE + assertEquals(1, messages.get(10).getMessageId()); + assertEquals(MessageType.COMPLETE, messages.get(10).getMessageType()); + assertEquals("", messages.get(10).getMessage()); + } + + @Test + public void testRequestStreamError() { + ReactiveSocketServerProtocol p = new ReactiveSocketServerProtocol( + null, + request -> toPublisher(range(Integer.parseInt(request), 3) + .map(i -> i + "!") + .concatWith(error(new Exception("Error Occurred!"))))); + + TestConnection conn = establishConnection(p); + ReplaySubject cachedResponses = captureResponses(conn); + + // perform a request/response + conn.toServer.onNext(Message.from(1, MessageType.SUBSCRIBE_STREAM, "0")); + + // assert + assertEquals(4, cachedResponses.getValues().length); // 3 onNext + 1 onError + List messages = cachedResponses.take(4).toList().toBlocking().first(); + + // 3 onNext messages + for (int i = 0; i < 3; i++) { + assertEquals(1, messages.get(i).getMessageId()); + assertEquals(MessageType.NEXT, messages.get(i).getMessageType()); + assertEquals(i + "!", messages.get(i).getMessage()); + } + + // last message is an ERROR + assertEquals(1, messages.get(3).getMessageId()); + assertEquals(MessageType.ERROR, messages.get(3).getMessageType()); + assertEquals("Error Occurred!", messages.get(3).getMessage()); + } + + @Test + public void testRequestStreamCancel() { + TestScheduler ts = Schedulers.test(); + ReactiveSocketServerProtocol p = new ReactiveSocketServerProtocol( + null, + request -> toPublisher(interval(1000, TimeUnit.MILLISECONDS, ts).map(i -> i + "!"))); + + TestConnection conn = establishConnection(p); + ReplaySubject cachedResponses = captureResponses(conn); + + // perform a request/response + conn.toServer.onNext(Message.from(1, MessageType.SUBSCRIBE_STREAM, "/aRequest")); + + // no time has passed, so no values + assertEquals(0, cachedResponses.getValues().length); + ts.advanceTimeBy(1000, TimeUnit.MILLISECONDS); + assertEquals(1, cachedResponses.getValues().length); + ts.advanceTimeBy(2000, TimeUnit.MILLISECONDS); + assertEquals(3, cachedResponses.getValues().length); + // dispose + conn.toServer.onNext(Message.from(1, MessageType.DISPOSE, "")); + // still only 1 message + assertEquals(3, cachedResponses.getValues().length); + // advance again, nothing should happen + ts.advanceTimeBy(1000, TimeUnit.MILLISECONDS); + // should still only have 3 message, no ERROR or COMPLETED + assertEquals(3, cachedResponses.getValues().length); + + List messages = cachedResponses.take(3).toList().toBlocking().first(); + + // 3 onNext messages + for (int i = 0; i < 3; i++) { + assertEquals(1, messages.get(i).getMessageId()); + assertEquals(MessageType.NEXT, messages.get(i).getMessageType()); + assertEquals(i + "!", messages.get(i).getMessage()); + } + } + + @Test + public void testMultiplexedStreams() { + TestScheduler ts = Schedulers.test(); + ReactiveSocketServerProtocol p = new ReactiveSocketServerProtocol( + null, + request -> toPublisher(interval(1000, TimeUnit.MILLISECONDS, ts).map(i -> i + "_" + request))); + + TestConnection conn = establishConnection(p); + ReplaySubject cachedResponses = captureResponses(conn); + + // perform a request/response + conn.toServer.onNext(Message.from(1, MessageType.SUBSCRIBE_STREAM, "requestA")); + + // no time has passed, so no values + assertEquals(0, cachedResponses.getValues().length); + ts.advanceTimeBy(1000, TimeUnit.MILLISECONDS); + // we should have 1 message from A + assertEquals(1, cachedResponses.getValues().length); + // now request another stream + conn.toServer.onNext(Message.from(2, MessageType.SUBSCRIBE_STREAM, "requestB")); + // advance some more + ts.advanceTimeBy(2000, TimeUnit.MILLISECONDS); + // should have 3 from A and 2 from B + assertEquals(5, cachedResponses.getValues().length); + // dispose A, but leave B + conn.toServer.onNext(Message.from(1, MessageType.DISPOSE, "")); + // still same 5 messages + assertEquals(5, cachedResponses.getValues().length); + // advance again, should get 2 from B + ts.advanceTimeBy(2000, TimeUnit.MILLISECONDS); + assertEquals(7, cachedResponses.getValues().length); + + List messages = cachedResponses.take(7).toList().toBlocking().first(); + + // A messages (positions 0, 1, 3) incrementing 0, 1, 2 + assertEquals(1, messages.get(0).getMessageId()); + assertEquals("0_requestA", messages.get(0).getMessage()); + assertEquals(1, messages.get(1).getMessageId()); + assertEquals("1_requestA", messages.get(1).getMessage()); + assertEquals(1, messages.get(3).getMessageId()); + assertEquals("2_requestA", messages.get(3).getMessage()); + + // B messages (positions 2, 4, 5, 6) incrementing 0, 1, 2, 3 + assertEquals(2, messages.get(2).getMessageId()); + assertEquals("0_requestB", messages.get(2).getMessage()); + assertEquals(2, messages.get(4).getMessageId()); + assertEquals("1_requestB", messages.get(4).getMessage()); + assertEquals(2, messages.get(5).getMessageId()); + assertEquals("2_requestB", messages.get(5).getMessage()); + assertEquals(2, messages.get(6).getMessageId()); + assertEquals("3_requestB", messages.get(6).getMessage()); + } + + /* **********************************************************************************************/ + + private ReplaySubject captureResponses(TestConnection conn) { + // capture all responses to client + ReplaySubject rs = ReplaySubject.create(); + conn.toClient.subscribe(rs); + return rs; + } + + private TestConnection establishConnection(ReactiveSocketServerProtocol p) { + TestConnection conn = new TestConnection(); + p.acceptConnection(conn).subscribe(PROTOCOL_SUBSCRIBER); + return conn; + } + + private org.reactivestreams.Subscriber PROTOCOL_SUBSCRIBER = new org.reactivestreams.Subscriber() { + + @Override + public void onSubscribe(Subscription s) { + s.request(Long.MAX_VALUE); + } + + @Override + public void onNext(Void t) { + + } + + @Override + public void onError(Throwable t) { + t.printStackTrace(); + } + + @Override + public void onComplete() { + + } + + }; + +} diff --git a/src/test/java/io/reactivesocket/TestConnection.java b/src/test/java/io/reactivesocket/TestConnection.java new file mode 100644 index 000000000..fbc049551 --- /dev/null +++ b/src/test/java/io/reactivesocket/TestConnection.java @@ -0,0 +1,25 @@ +package io.reactivesocket; + +import static rx.RxReactiveStreams.*; + +import org.reactivestreams.Publisher; + +import rx.Observable; +import rx.subjects.PublishSubject; + +class TestConnection implements DuplexConnection { + + final PublishSubject toServer = PublishSubject.create(); + final PublishSubject toClient = PublishSubject.create(); + + @Override + public Publisher write(Message o) { + toClient.onNext(o); + return toPublisher(Observable. empty()); + } + + @Override + public Publisher getInput() { + return toPublisher(toServer); + } +} \ No newline at end of file