diff --git a/reactivesocket-core/build.gradle b/reactivesocket-core/build.gradle index 60fb959de..571b41756 100644 --- a/reactivesocket-core/build.gradle +++ b/reactivesocket-core/build.gradle @@ -1,3 +1,2 @@ dependencies { - testCompile 'io.reactivex:rxjava:2.0.0-DP0-20151003.214425-143' } \ No newline at end of file diff --git a/reactivesocket-core/src/test/java/io/reactivesocket/TestFlowControlRequestN.java b/reactivesocket-core/src/test/java/io/reactivesocket/FlowControlRequestNTest.java similarity index 73% rename from reactivesocket-core/src/test/java/io/reactivesocket/TestFlowControlRequestN.java rename to reactivesocket-core/src/test/java/io/reactivesocket/FlowControlRequestNTest.java index 1d877ea03..639168d85 100644 --- a/reactivesocket-core/src/test/java/io/reactivesocket/TestFlowControlRequestN.java +++ b/reactivesocket-core/src/test/java/io/reactivesocket/FlowControlRequestNTest.java @@ -29,18 +29,13 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; -import static io.reactivesocket.ConnectionSetupPayload.NO_FLAGS; -import static io.reactivesocket.TestUtil.byteToString; -import static io.reactivesocket.TestUtil.utf8EncodedPayload; -import static io.reactivex.Observable.error; -import static io.reactivex.Observable.fromPublisher; -import static io.reactivex.Observable.range; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -public class TestFlowControlRequestN { +import static io.reactivesocket.ConnectionSetupPayload.*; +import static io.reactivesocket.TestUtil.*; +import static org.junit.Assert.*; +import static rx.Observable.*; +import static rx.RxReactiveStreams.*; + +public class FlowControlRequestNTest { @Test(timeout=2000) public void testRequestStream_batches() throws InterruptedException { @@ -89,7 +84,7 @@ public void onNext(Payload t) { // be a "slow" consumer try { Thread.sleep(1000); - } catch (InterruptedException e) { } + } catch (InterruptedException ignored) { } System.out.println("Emitted on server: " + emitted.get() + " Received on client: " + received); } @@ -154,9 +149,8 @@ public void testRequestSubscription_batches() throws InterruptedException { @Test(timeout=2000) public void testRequestChannel_batches_downstream() throws InterruptedException { ControlledSubscriber s = new ControlledSubscriber(); - socketClient.requestChannel( - range(1, 10).map(i -> utf8EncodedPayload(String.valueOf(i), "1000")) - ).subscribe(s); + socketClient.requestChannel(toPublisher(range(1, 10).map(i -> utf8EncodedPayload(String.valueOf(i), "1000")))) + .subscribe(s); // if flatMap is being used, then each of the 10 streams will emit at least 128 (default) @@ -189,15 +183,14 @@ public void testRequestChannel_batches_downstream() throws InterruptedException public void testRequestChannel_batches_upstream_echo() throws InterruptedException { ControlledSubscriber s = new ControlledSubscriber(); AtomicInteger emittedClient = new AtomicInteger(); - socketClient.requestChannel( - range(1, 10000) - .doOnNext(n -> emittedClient.incrementAndGet()) - .doOnRequest(r -> System.out.println("CLIENT REQUESTS requestN: " + r)) - .map(i -> { - // metadata to route us to the echo behavior (only actually need - // this in the first payload) - return utf8EncodedPayload(String.valueOf(i), "echo"); - })).subscribe(s); + socketClient.requestChannel(toPublisher(range(1, 10000) + .doOnNext(n -> emittedClient.incrementAndGet()) + .doOnRequest(r -> System.out.println("CLIENT REQUESTS requestN: " + r)) + .map(i -> { + // metadata to route us to the echo behavior (only actually need + // this in the first payload) + return utf8EncodedPayload(String.valueOf(i), "echo"); + }))).subscribe(s); assertEquals(0, s.received.get()); assertEquals(0, emitted.get()); @@ -225,15 +218,14 @@ public void testRequestChannel_batches_upstream_echo() throws InterruptedExcepti public void testRequestChannel_batches_upstream_decoupled() throws InterruptedException { ControlledSubscriber s = new ControlledSubscriber(); AtomicInteger emittedClient = new AtomicInteger(); - socketClient.requestChannel( - range(1, 10000) + socketClient.requestChannel(toPublisher(range(1, 10000) .doOnNext(n -> emittedClient.incrementAndGet()) .doOnRequest(r -> System.out.println("CLIENT REQUESTS requestN: " + r)) .map(i -> { // metadata to route us to the echo behavior (only actually need this // in the first payload) return utf8EncodedPayload(String.valueOf(i), "decoupled"); - })).subscribe(s); + }))).subscribe(s); assertEquals(0, s.received.get()); assertEquals(0, emitted.get()); @@ -254,7 +246,7 @@ public void testRequestChannel_batches_upstream_decoupled() throws InterruptedEx + " requests and received " + s.received.get() + " responses"); } - private void waitForAsyncValue(AtomicInteger value, int n) throws InterruptedException { + private static void waitForAsyncValue(AtomicInteger value, int n) throws InterruptedException { while (value.get() != n && !Thread.interrupted()) { Thread.sleep(1); } @@ -270,7 +262,7 @@ private static class ControlledSubscriber implements Subscriber { @Override public void onSubscribe(Subscription s) { - this.subscription = s; + subscription = s; } @Override @@ -297,9 +289,9 @@ public void onComplete() { private static TestConnection clientConnection; private static ReactiveSocket socketServer; private static ReactiveSocket socketClient; - private static AtomicInteger emitted = new AtomicInteger(); - private static AtomicInteger numRequests = new AtomicInteger(); - private static AtomicLong requested = new AtomicLong(); + private static final AtomicInteger emitted = new AtomicInteger(); + private static final AtomicInteger numRequests = new AtomicInteger(); + private static final AtomicLong requested = new AtomicLong(); @Before public void init() { @@ -321,22 +313,22 @@ public static void setup() throws InterruptedException { public Publisher handleRequestStream(Payload payload) { String request = byteToString(payload.getData()); System.out.println("responder received requestStream: " + request); - return range(0, Integer.parseInt(request)) + return toPublisher(range(0, Integer.parseInt(request)) .doOnRequest(n -> System.out.println("requested in responder: " + n)) .doOnRequest(r -> requested.addAndGet(r)) .doOnRequest(r -> numRequests.incrementAndGet()) .doOnNext(i -> emitted.incrementAndGet()) - .map(i -> utf8EncodedPayload(String.valueOf(i), null)); + .map(i -> utf8EncodedPayload(String.valueOf(i), null))); } @Override public Publisher handleSubscription(Payload payload) { - return range(0, Integer.MAX_VALUE) + return toPublisher(range(0, Integer.MAX_VALUE) .doOnRequest(n -> System.out.println("requested in responder: " + n)) .doOnRequest(r -> requested.addAndGet(r)) .doOnRequest(r -> numRequests.incrementAndGet()) .doOnNext(i -> emitted.incrementAndGet()) - .map(i -> utf8EncodedPayload(String.valueOf(i), null)); + .map(i -> utf8EncodedPayload(String.valueOf(i), null))); } /** @@ -347,96 +339,101 @@ public Publisher handleChannel(Payload initialPayload, Publisher { + return toPublisher(toObservable(payloads).map(payload -> { String payloadData = byteToString(payload.getData()); - return utf8EncodedPayload(String.valueOf(payloadData) + "_echo", null); + return utf8EncodedPayload(payloadData + "_echo", null); }).doOnRequest(n -> System.out.println(">>> requested in echo responder: " + n)) .doOnRequest(r -> requested.addAndGet(r)) .doOnRequest(r -> numRequests.incrementAndGet()) .doOnError(t -> System.out.println("Error in 'echo' handler: " + t.getMessage())) - .doOnNext(i -> emitted.incrementAndGet()); - } else if (requestMetadata.equals("decoupled")) { + .doOnNext(i -> emitted.incrementAndGet())); + } else { + if ("decoupled".equals(requestMetadata)) { /* * Consume 300 from request and then stop requesting more (but no cancel from responder side) */ - fromPublisher(payloads).doOnNext(payload -> { - String payloadData = byteToString(payload.getData()); - System.out.println("DECOUPLED side-effect of request: " + payloadData); - }).subscribe(new Subscriber() { - - int count=0; - Subscription s; - - @Override - public void onError(Throwable e) { - - } - - @Override - public void onNext(Payload t) { - count++; - if(count == 50) { - s.request(250); + toPublisher(toObservable(payloads).doOnNext(payload -> { + String payloadData = byteToString(payload.getData()); + System.out.println("DECOUPLED side-effect of request: " + payloadData); + })).subscribe(new Subscriber() { + + int count; + Subscription s; + + @Override + public void onError(Throwable e) { + } - } - - @Override - public void onSubscribe(Subscription s) { - this.s = s; - // start with 50 - s.request(50); - } - - @Override - public void onComplete() { - // TODO Auto-generated method stub - - } - - - }); - - return range(1, 1000) - .doOnNext(n -> System.out.println("RESPONDER sending value: " + n)) - .map(i -> { - return utf8EncodedPayload(String.valueOf(i) + "_decoupled", null); - }) - .doOnRequest(n -> System.out.println(">>> requested in decoupled responder: " + n)) - .doOnRequest(r -> requested.addAndGet(r)) - .doOnRequest(r -> numRequests.incrementAndGet()) - .doOnError(t -> System.out.println("Error in 'decoupled' handler: " + t.getMessage())) - .doOnNext(i -> emitted.incrementAndGet()); - } else { - // TODO I want this to be concatMap instead of flatMap but apparently concatMap has a bug - return fromPublisher(payloads).flatMap(payload -> { - String payloadData = byteToString(payload.getData()); - System.out.println("responder handleChannel received payload: " + payloadData); - return range(0, Integer.parseInt(requestMetadata)) - .doOnRequest(n -> System.out.println("requested in responder [" + payloadData + "]: " + n)) - .doOnRequest(r -> requested.addAndGet(r)) - .doOnRequest(r -> numRequests.incrementAndGet()) - .doOnNext(i -> emitted.incrementAndGet()) - .map(i -> utf8EncodedPayload(String.valueOf(i), null)); - }).doOnRequest(n -> System.out.println(">>> response stream request(n) in responder: " + n)); + + @Override + public void onNext(Payload t) { + count++; + if (count == 50) { + s.request(250); + } + } + + @Override + public void onSubscribe(Subscription s) { + this.s = s; + // start with 50 + s.request(50); + } + + @Override + public void onComplete() { + // TODO Auto-generated method stub + + } + + + }); + + return toPublisher(range(1, 1000) + .doOnNext(n -> System.out.println("RESPONDER sending value: " + n)) + .map(i -> { + return utf8EncodedPayload(i + "_decoupled", null); + }) + .doOnRequest(n -> System.out + .println(">>> requested in decoupled responder: " + n)) + .doOnRequest(r -> requested.addAndGet(r)) + .doOnRequest(r -> numRequests.incrementAndGet()) + .doOnError(t -> System.out + .println("Error in 'decoupled' handler: " + t.getMessage())) + .doOnNext(i -> emitted.incrementAndGet())); + } else { + // TODO I want this to be concatMap instead of flatMap but apparently concatMap has a bug + return toPublisher(toObservable(payloads).flatMap(payload -> { + String payloadData = byteToString(payload.getData()); + System.out.println("responder handleChannel received payload: " + payloadData); + return range(0, Integer.parseInt(requestMetadata)) + .doOnRequest(n -> System.out + .println("requested in responder [" + payloadData + "]: " + n)) + .doOnRequest(r -> requested.addAndGet(r)) + .doOnRequest(r -> numRequests.incrementAndGet()) + .doOnNext(i -> emitted.incrementAndGet()) + .map(i -> utf8EncodedPayload(String.valueOf(i), null)); + }).doOnRequest(n -> System.out.println(">>> response stream request(n) in responder: " + n))); + } } } @Override public Publisher handleFireAndForget(Payload payload) { - return error(new RuntimeException("Not Found")); + return toPublisher(error(new RuntimeException("Not Found"))); } @Override public Publisher handleRequestResponse(Payload payload) { - return error(new RuntimeException("Not Found")); + return toPublisher(error(new RuntimeException("Not Found"))); } @Override public Publisher handleMetadataPush(Payload payload) { - return error(new RuntimeException("Not Found")); + return toPublisher(error(new RuntimeException("Not Found"))); } }, LeaseGovernor.UNLIMITED_LEASE_GOVERNOR, Throwable::printStackTrace); diff --git a/reactivesocket-core/src/test/java/io/reactivesocket/LeaseTest.java b/reactivesocket-core/src/test/java/io/reactivesocket/LeaseTest.java index 65c36738e..d513d4a6c 100644 --- a/reactivesocket-core/src/test/java/io/reactivesocket/LeaseTest.java +++ b/reactivesocket-core/src/test/java/io/reactivesocket/LeaseTest.java @@ -20,17 +20,17 @@ import org.junit.Before; import org.junit.Test; import org.reactivestreams.Publisher; -import io.reactivex.subscribers.TestSubscriber; +import rx.observers.TestSubscriber; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import static io.reactivesocket.TestUtil.byteToString; -import static io.reactivesocket.TestUtil.utf8EncodedPayload; import static io.reactivesocket.ConnectionSetupPayload.HONOR_LEASE; -import static org.junit.Assert.assertTrue; -import static io.reactivex.Observable.*; +import static io.reactivesocket.TestUtil.*; +import static org.junit.Assert.*; +import static rx.Observable.*; +import static rx.RxReactiveStreams.*; public class LeaseTest { private TestConnection clientConnection; @@ -38,11 +38,11 @@ public class LeaseTest { private ReactiveSocket socketClient; private TestingLeaseGovernor leaseGovernor; - private class TestingLeaseGovernor implements LeaseGovernor { + private static class TestingLeaseGovernor implements LeaseGovernor { private volatile Responder responder; private volatile long ttlExpiration; private volatile int grantedTickets; - private CountDownLatch latch = new CountDownLatch(1); + private final CountDownLatch latch = new CountDownLatch(1); @Override public synchronized void register(Responder responder) { @@ -85,45 +85,39 @@ public void setup() throws InterruptedException { @Override public Publisher handleRequestResponse(Payload payload) { - return just(utf8EncodedPayload("hello world", null)); + return toPublisher(just(utf8EncodedPayload("hello world", null))); } @Override public Publisher handleRequestStream(Payload payload) { - return - range(0, 100) - .map(i -> "hello world " + i) - .map(n -> utf8EncodedPayload(n, null) - ); + return toPublisher(range(0, 100).map(i -> "hello world " + i) + .map(n -> utf8EncodedPayload(n, null))); } @Override public Publisher handleSubscription(Payload payload) { - return interval(1, TimeUnit.MICROSECONDS) - .map(i -> "subscription " + i) - .map(n -> utf8EncodedPayload(n, null)); + return toPublisher(interval(1, TimeUnit.MICROSECONDS) + .map(i -> "subscription " + i) + .map(n -> utf8EncodedPayload(n, null))); } @Override public Publisher handleFireAndForget(Payload payload) { - return empty(); + return toPublisher(empty()); } /** * Use Payload.metadata for routing */ @Override - public Publisher handleChannel( - Payload initialPayload, Publisher inputs - ) { - return fromPublisher(inputs).map(p -> - utf8EncodedPayload(byteToString(p.getData()) + "_echo", null)); + public Publisher handleChannel(Payload initialPayload, Publisher inputs) { + return toPublisher(toObservable(inputs).map(p -> utf8EncodedPayload(byteToString(p.getData()) + + "_echo", null))); } @Override public Publisher handleMetadataPush(Payload payload) { - throw new IllegalStateException( - "TestingLeaseGovernor.handleMetadataPush is not implemented!"); + throw new IllegalStateException("TestingLeaseGovernor.handleMetadataPush is not implemented!"); } }, leaseGovernor, t -> {}); @@ -150,15 +144,13 @@ public void shutdown() { @Test(timeout=2000) public void testWriteWithoutLease() throws InterruptedException { // initially client doesn't have any availability - assertTrue(socketClient.availability() == 0.0); + assertEquals("Unexpected client availability.", 0.0, socketClient.availability(), 0.0); leaseGovernor.latch.await(); - assertTrue(socketClient.availability() == 0.0); + assertEquals("Unexpected client availability.", 0.0, socketClient.availability(), 0.0); // the first call will fail without a valid lease - Publisher response0 = socketClient.requestResponse( - TestUtil.utf8EncodedPayload("hello", null)); - TestSubscriber ts0 = new TestSubscriber<>();; - response0.subscribe(ts0); + Publisher response0 = socketClient.requestResponse(utf8EncodedPayload("hello", null)); + TestSubscriber ts0 = testSubscribe(response0); ts0.awaitTerminalEvent(500, TimeUnit.MILLISECONDS); // send a Lease(10 sec, 1 message), and wait for the availability on the client side @@ -166,20 +158,16 @@ public void testWriteWithoutLease() throws InterruptedException { awaitSocketAvailabilityChange(socketClient, 1.0, 10, TimeUnit.SECONDS); // the second call will succeed - Publisher response1 = socketClient.requestResponse( - TestUtil.utf8EncodedPayload("hello", null)); - TestSubscriber ts1 = new TestSubscriber<>();; - response1.subscribe(ts1); + Publisher response1 = socketClient.requestResponse(utf8EncodedPayload("hello", null)); + TestSubscriber ts1 = testSubscribe(response1); ts1.awaitTerminalEvent(500, TimeUnit.MILLISECONDS); ts1.assertNoErrors(); - ts1.assertValue(TestUtil.utf8EncodedPayload("hello world", null)); + ts1.assertValue(utf8EncodedPayload("hello world", null)); // the client consumed all its ticket, next call will fail // (even though the window is still ok) - Publisher response2 = socketClient.requestResponse( - TestUtil.utf8EncodedPayload("hello", null)); - TestSubscriber ts2 = new TestSubscriber<>(); - response2.subscribe(ts2); + Publisher response2 = socketClient.requestResponse(utf8EncodedPayload("hello", null)); + TestSubscriber ts2 = testSubscribe(response2); ts2.awaitTerminalEvent(500, TimeUnit.MILLISECONDS); ts2.assertError(RuntimeException.class); } @@ -187,9 +175,9 @@ public void testWriteWithoutLease() throws InterruptedException { @Test(timeout=2000) public void testLeaseOverwrite() throws InterruptedException { - assertTrue(socketClient.availability() == 0.0); + assertEquals("Unexpected client availability.", 0.0, socketClient.availability(), 0.0); leaseGovernor.latch.await(); - assertTrue(socketClient.availability() == 0.0); + assertEquals("Unexpected client availability.", 0.0, socketClient.availability(), 0.0); leaseGovernor.distribute(10_000, 100); awaitSocketAvailabilityChange(socketClient, 1.0, 10, TimeUnit.SECONDS); @@ -198,12 +186,8 @@ public void testLeaseOverwrite() throws InterruptedException { awaitSocketAvailabilityChange(socketClient, 0.0, 10, TimeUnit.SECONDS); } - private void awaitSocketAvailabilityChange( - ReactiveSocket socket, - double expected, - long timeout, - TimeUnit unit - ) throws InterruptedException { + private static void awaitSocketAvailabilityChange(ReactiveSocket socket, double expected, long timeout, + TimeUnit unit) throws InterruptedException { long waitTimeMs = 1L; long startTime = System.nanoTime(); long timeoutNanos = TimeUnit.NANOSECONDS.convert(timeout, unit); diff --git a/reactivesocket-core/src/test/java/io/reactivesocket/ReactiveSocketTest.java b/reactivesocket-core/src/test/java/io/reactivesocket/ReactiveSocketTest.java index 4c66e99e7..b23c2d6b5 100644 --- a/reactivesocket-core/src/test/java/io/reactivesocket/ReactiveSocketTest.java +++ b/reactivesocket-core/src/test/java/io/reactivesocket/ReactiveSocketTest.java @@ -17,9 +17,6 @@ import io.reactivesocket.lease.FairLeaseGovernor; import io.reactivesocket.rx.Completable; -import io.reactivex.disposables.Disposable; -import io.reactivex.observables.ConnectableObservable; -import io.reactivex.subscribers.TestSubscriber; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -30,7 +27,10 @@ import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; +import rx.observables.ConnectableObservable; +import rx.observers.TestSubscriber; +import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -38,17 +38,12 @@ import static io.reactivesocket.ConnectionSetupPayload.HONOR_LEASE; import static io.reactivesocket.ConnectionSetupPayload.NO_FLAGS; -import static io.reactivesocket.TestUtil.byteToString; -import static io.reactivesocket.TestUtil.utf8EncodedPayload; -import static io.reactivex.Observable.empty; -import static io.reactivex.Observable.error; -import static io.reactivex.Observable.fromPublisher; -import static io.reactivex.Observable.interval; -import static io.reactivex.Observable.just; -import static io.reactivex.Observable.range; +import static io.reactivesocket.TestUtil.*; +import static rx.Observable.*; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import static rx.RxReactiveStreams.*; @RunWith(Theories.class) public class ReactiveSocketTest { @@ -56,14 +51,15 @@ public class ReactiveSocketTest { private TestConnection clientConnection; private ReactiveSocket socketServer; private ReactiveSocket socketClient; - private AtomicBoolean helloSubscriptionRunning = new AtomicBoolean(false); - private AtomicReference lastFireAndForget = new AtomicReference<>(); - private AtomicReference lastMetadataPush = new AtomicReference<>(); - private AtomicReference lastServerError = new AtomicReference<>(); + private final AtomicBoolean helloSubscriptionRunning = new AtomicBoolean(false); + private final AtomicReference lastFireAndForget = new AtomicReference<>(); + private final AtomicReference lastMetadataPush = new AtomicReference<>(); + private final AtomicReference lastServerError = new AtomicReference<>(); private CountDownLatch lastServerErrorCountDown; private CountDownLatch fireAndForgetOrMetadataPush; - public static final @DataPoints int[] setupFlags = {NO_FLAGS, HONOR_LEASE}; + @DataPoints + public static final int[] setupFlags = {NO_FLAGS, HONOR_LEASE}; @Before public void setup() { @@ -81,9 +77,9 @@ public Publisher handleRequestResponse(Payload payload) { System.out.println("********************************************************************************************** requestResponse: " + request); if ("hello".equals(request)) { System.out.println("********************************************************************************************** respond hello"); - return just(utf8EncodedPayload("hello world", null)); + return toPublisher(just(utf8EncodedPayload("hello world", null))); } else { - return error(new RuntimeException("Not Found")); + return toPublisher(error(new RuntimeException("Not Found"))); } } @@ -91,9 +87,10 @@ public Publisher handleRequestResponse(Payload payload) { public Publisher handleRequestStream(Payload payload) { String request = byteToString(payload.getData()); if ("hello".equals(request)) { - return range(0, 100).map(i -> "hello world " + i).map(n -> utf8EncodedPayload(n, null)); + return toPublisher(range(0, 100).map(i -> "hello world " + i) + .map(n -> utf8EncodedPayload(n, null))); } else { - return error(new RuntimeException("Not Found")); + return toPublisher(error(new RuntimeException("Not Found"))); } } @@ -101,14 +98,14 @@ public Publisher handleRequestStream(Payload payload) { public Publisher handleSubscription(Payload payload) { String request = byteToString(payload.getData()); if ("hello".equals(request)) { - return interval(1, TimeUnit.MICROSECONDS) - .onBackpressureDrop() - .doOnSubscribe(s -> helloSubscriptionRunning.set(true)) - .doOnCancel(() -> helloSubscriptionRunning.set(false)) - .map(i -> "subscription " + i) - .map(n -> utf8EncodedPayload(n, null)); + return toPublisher(interval(1, TimeUnit.MICROSECONDS) + .onBackpressureDrop() + .doOnSubscribe(() -> helloSubscriptionRunning.set(true)) + .doOnUnsubscribe(() -> helloSubscriptionRunning.set(false)) + .map(i -> "subscription " + i) + .map(n -> utf8EncodedPayload(n, null))); } else { - return error(new RuntimeException("Not Found")); + return toPublisher(error(new RuntimeException("Not Found"))); } } @@ -118,12 +115,12 @@ public Publisher handleFireAndForget(Payload payload) { String request = byteToString(payload.getData()); lastFireAndForget.set(request); if ("log".equals(request)) { - return empty(); // success + return toPublisher(empty()); // success } else if ("blowup".equals(request)) { throw new RuntimeException("forced blowup to simulate handler error"); } else { lastFireAndForget.set("notFound"); - return error(new RuntimeException("Not Found")); + return toPublisher(error(new RuntimeException("Not Found"))); } } finally { fireAndForgetOrMetadataPush.countDown(); @@ -176,24 +173,18 @@ public Publisher handleMetadataPush(Payload payload) String request = byteToString(payload.getMetadata()); lastMetadataPush.set(request); if ("log".equals(request)) { - return empty(); // success + return toPublisher(empty()); // success } else if ("blowup".equals(request)) { throw new RuntimeException("forced blowup to simulate handler error"); } else { lastMetadataPush.set("notFound"); - return error(new RuntimeException("Not Found")); + return toPublisher(error(new RuntimeException("Not Found"))); } } finally { fireAndForgetOrMetadataPush.countDown(); } } - private Publisher echoChannel(Publisher echo) { - return fromPublisher(echo).map(p -> { - return utf8EncodedPayload(byteToString(p.getData()) + "_echo", null); - }); - } - // }, LeaseGovernor.UNLIMITED_LEASE_GOVERNOR, t -> { }, new FairLeaseGovernor(100, 10L, TimeUnit.SECONDS), t -> { t.printStackTrace(); @@ -331,12 +322,11 @@ public void testRequestResponse(int setupFlag) throws InterruptedException { startSockets(setupFlag); // perform request/response - Publisher response = socketClient.requestResponse(TestUtil.utf8EncodedPayload("hello", null)); - TestSubscriber ts = new TestSubscriber<>(); - response.subscribe(ts); + Publisher response = socketClient.requestResponse(utf8EncodedPayload("hello", null)); + TestSubscriber ts = testSubscribe(response); ts.awaitTerminalEvent(); ts.assertNoErrors(); - ts.assertValue(TestUtil.utf8EncodedPayload("hello world", null)); + ts.assertValue(utf8EncodedPayload("hello world", null)); } @Test(timeout=2000, expected=IllegalStateException.class) @@ -347,7 +337,7 @@ public void testRequestResponsePremature() throws InterruptedException { err -> err.printStackTrace() ); - Publisher response = socketClient.requestResponse(TestUtil.utf8EncodedPayload("hello", null)); + socketClient.requestResponse(utf8EncodedPayload("hello", null)); } @Test(timeout=2000) @@ -356,13 +346,12 @@ public void testRequestStream(int setupFlag) throws InterruptedException { startSockets(setupFlag); // perform request/stream - Publisher response = socketClient.requestStream(TestUtil.utf8EncodedPayload("hello", null)); - TestSubscriber ts = new TestSubscriber<>(); - response.subscribe(ts); + Publisher response = socketClient.requestStream(utf8EncodedPayload("hello", null)); + TestSubscriber ts = testSubscribe(response); ts.awaitTerminalEvent(); ts.assertNoErrors(); - assertEquals(100, ts.values().size()); - assertEquals("hello world 99", byteToString(ts.values().get(99).getData())); + assertEquals(100, ts.getOnNextEvents().size()); + assertEquals("hello world 99", byteToString(ts.getOnNextEvents().get(99).getData())); } @Test(timeout=4000) @@ -371,28 +360,27 @@ public void testRequestSubscription(int setupFlag) throws InterruptedException { startSockets(setupFlag); // perform request/subscription - Publisher response = socketClient.requestSubscription(TestUtil.utf8EncodedPayload("hello", null)); + Publisher response = socketClient.requestSubscription(utf8EncodedPayload("hello", null)); TestSubscriber ts = new TestSubscriber<>(); TestSubscriber ts2 = new TestSubscriber<>(); - ConnectableObservable published = fromPublisher(response).publish(); + ConnectableObservable published = toObservable(response).publish(); published.take(10).subscribe(ts); published.subscribe(ts2); - Disposable subscription = published.connect(); + rx.Subscription subscription = published.connect(); // ts completed due to take ts.awaitTerminalEvent(); ts.assertNoErrors(); - ts.assertComplete(); // ts2 should never complete ts2.assertNoErrors(); - ts2.assertNotTerminated(); + ts2.assertNoTerminalEvent(); // assert it is running still assertTrue(helloSubscriptionRunning.get()); // shut down the work - subscription.dispose(); + subscription.unsubscribe(); // wait for up to 2 seconds for the async CANCEL to occur (it sends a message up) for (int i = 0; i < 20; i++) { @@ -402,13 +390,14 @@ public void testRequestSubscription(int setupFlag) throws InterruptedException { try { Thread.sleep(100); } catch (InterruptedException e) { + e.printStackTrace(); } } // and then stopped after unsubscribing assertFalse(helloSubscriptionRunning.get()); - assertEquals(10, ts.values().size()); - assertEquals("subscription 9", byteToString(ts.values().get(9).getData())); + assertEquals(10, ts.getOnNextEvents().size()); + assertEquals("subscription 9", byteToString(ts.getOnNextEvents().get(9).getData())); } @Test(timeout=2000) @@ -418,13 +407,11 @@ public void testFireAndForgetSuccess(int setupFlag) throws InterruptedException // perform request/response - Publisher response = socketClient.fireAndForget(TestUtil.utf8EncodedPayload("log", null)); - TestSubscriber ts = new TestSubscriber<>(); - response.subscribe(ts); + Publisher response = socketClient.fireAndForget(utf8EncodedPayload("log", null)); + TestSubscriber ts = testSubscribe(response); // these only test client side since this is fireAndForgetOrMetadataPush ts.awaitTerminalEvent(); ts.assertNoErrors(); - ts.assertComplete(); // this waits for server-side fireAndForgetOrMetadataPush.await(); assertEquals("log", lastFireAndForget.get()); @@ -436,13 +423,11 @@ public void testFireAndForgetServerSideErrorNotFound(int setupFlag) throws Inter startSockets(setupFlag); // perform request/response - Publisher response = socketClient.fireAndForget(TestUtil.utf8EncodedPayload("unknown", null)); - TestSubscriber ts = new TestSubscriber<>(); - response.subscribe(ts); + Publisher response = socketClient.fireAndForget(utf8EncodedPayload("unknown", null)); + TestSubscriber ts = testSubscribe(response); // these only test client side since this is fireAndForgetOrMetadataPush ts.awaitTerminalEvent(); ts.assertNoErrors();// client-side won't see an error - ts.assertComplete(); // this waits for server-side fireAndForgetOrMetadataPush.await(); assertEquals("notFound", lastFireAndForget.get()); @@ -454,13 +439,11 @@ public void testFireAndForgetServerSideErrorHandlerBlowup(int setupFlag) throws startSockets(setupFlag); // perform request/response - Publisher response = socketClient.fireAndForget(TestUtil.utf8EncodedPayload("blowup", null)); - TestSubscriber ts = new TestSubscriber<>(); - response.subscribe(ts); + Publisher response = socketClient.fireAndForget(utf8EncodedPayload("blowup", null)); + TestSubscriber ts = testSubscribe(response); // these only test client side since this is fireAndForgetOrMetadataPush ts.awaitTerminalEvent(); ts.assertNoErrors();// client-side won't see an error - ts.assertComplete(); // this waits for server-side fireAndForgetOrMetadataPush.await(); assertEquals("blowup", lastFireAndForget.get()); @@ -473,34 +456,32 @@ public void testFireAndForgetServerSideErrorHandlerBlowup(int setupFlag) throws public void testRequestChannelEcho(int setupFlag) throws InterruptedException { startSockets(setupFlag); - Publisher inputs = just( - TestUtil.utf8EncodedPayload("1", "echo"), - TestUtil.utf8EncodedPayload("2", "echo") - ); + Publisher inputs = toPublisher(just(utf8EncodedPayload("1", "echo"), utf8EncodedPayload("2", "echo"))); Publisher outputs = socketClient.requestChannel(inputs); - TestSubscriber ts = new TestSubscriber<>(); - outputs.subscribe(ts); + TestSubscriber ts = testSubscribe(outputs); ts.awaitTerminalEvent(); ts.assertNoErrors(); - assertEquals(2, ts.values().size()); - assertEquals("1_echo", byteToString(ts.values().get(0).getData())); - assertEquals("2_echo", byteToString(ts.values().get(1).getData())); + assertEquals(2, ts.getOnNextEvents().size()); + assertEquals("1_echo", byteToString(ts.getOnNextEvents().get(0).getData())); + assertEquals("2_echo", byteToString(ts.getOnNextEvents().get(1).getData())); } + + @Test(timeout=2000) @Theory public void testRequestChannelNotFound(int setupFlag) throws InterruptedException { startSockets(setupFlag); - Publisher requestStream = just(TestUtil.utf8EncodedPayload(null, "someChannel")); + Publisher requestStream = toPublisher(just(utf8EncodedPayload(null, "someChannel"))); Publisher response = socketClient.requestChannel(requestStream); - TestSubscriber ts = new TestSubscriber<>(); - response.subscribe(ts); + TestSubscriber ts = testSubscribe(response); ts.awaitTerminalEvent(); - ts.assertTerminated(); - ts.assertNotComplete(); + ts.assertNotCompleted(); ts.assertNoValues(); - ts.assertErrorMessage("Not Found"); + List onErrorEvents = ts.getOnErrorEvents(); + assertEquals("Unexpected onErrorEvents count.", 1, onErrorEvents.size()); + assertEquals("Unexpected error message.", "Not Found", onErrorEvents.get(0).getMessage()); } @Test(timeout=2000) @@ -510,12 +491,11 @@ public void testMetadataPushSuccess(int setupFlag) throws InterruptedException { // perform request/response - Publisher response = socketClient.metadataPush(TestUtil.utf8EncodedPayload(null, "log")); - TestSubscriber ts = new TestSubscriber<>(); - response.subscribe(ts); + Publisher response = socketClient.metadataPush(utf8EncodedPayload(null, "log")); + TestSubscriber ts = testSubscribe(response); ts.awaitTerminalEvent(); ts.assertNoErrors(); - ts.assertComplete(); + ts.assertCompleted(); // this waits for server-side fireAndForgetOrMetadataPush.await(); assertEquals("log", lastMetadataPush.get()); @@ -528,11 +508,10 @@ public void testMetadataPushServerSideErrorNotFound(int setupFlag) throws Interr // perform request/response Publisher response = socketClient.metadataPush(TestUtil.utf8EncodedPayload(null, "unknown")); - TestSubscriber ts = new TestSubscriber<>(); - response.subscribe(ts); + TestSubscriber ts = testSubscribe(response); ts.awaitTerminalEvent(); ts.assertNoErrors();// client-side won't see an error - ts.assertComplete(); + ts.assertCompleted(); // this waits for server-side fireAndForgetOrMetadataPush.await(); assertEquals("notFound", lastMetadataPush.get()); @@ -544,12 +523,11 @@ public void testMetadataPushServerSideErrorHandlerBlowup(int setupFlag) throws I startSockets(setupFlag); // perform request/response - Publisher response = socketClient.metadataPush(TestUtil.utf8EncodedPayload(null, "blowup")); - TestSubscriber ts = new TestSubscriber<>(); - response.subscribe(ts); + Publisher response = socketClient.metadataPush(utf8EncodedPayload(null, "blowup")); + TestSubscriber ts = testSubscribe(response); ts.awaitTerminalEvent(); ts.assertNoErrors();// client-side won't see an error - ts.assertComplete(); + ts.assertCompleted(); // this waits for server-side fireAndForgetOrMetadataPush.await(); assertEquals("blowup", lastMetadataPush.get()); @@ -562,7 +540,7 @@ public void testMetadataPushServerSideErrorHandlerBlowup(int setupFlag) throws I public void testServerRequestResponse(int setupFlag) throws InterruptedException { startSockets(setupFlag, new RequestHandler.Builder() .withRequestResponse(payload -> { - return just(utf8EncodedPayload("hello world from client", null)); + return toPublisher(just(utf8EncodedPayload("hello world from client", null))); }).build()); CountDownLatch latch = new CountDownLatch(1); @@ -571,13 +549,10 @@ public void testServerRequestResponse(int setupFlag) throws InterruptedException }); latch.await(); - Publisher response = socketServer.requestResponse(TestUtil.utf8EncodedPayload("hello", null)); - TestSubscriber ts = new TestSubscriber<>(); - response.subscribe(ts); + Publisher response = socketServer.requestResponse(utf8EncodedPayload("hello", null)); + TestSubscriber ts = testSubscribe(response); ts.awaitTerminalEvent(); ts.assertNoErrors(); ts.assertValue(TestUtil.utf8EncodedPayload("hello world from client", null)); } - - } diff --git a/reactivesocket-core/src/test/java/io/reactivesocket/SerializedEventBus.java b/reactivesocket-core/src/test/java/io/reactivesocket/SerializedEventBus.java index 01018b9ee..b9c92cc3c 100644 --- a/reactivesocket-core/src/test/java/io/reactivesocket/SerializedEventBus.java +++ b/reactivesocket-core/src/test/java/io/reactivesocket/SerializedEventBus.java @@ -18,9 +18,10 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.function.Consumer; +import io.reactivesocket.rx.Disposable; import io.reactivesocket.rx.Observer; -import io.reactivex.subjects.PublishSubject; -import io.reactivex.subjects.Subject; +import rx.subjects.PublishSubject; +import rx.subjects.Subject; /** * Multicast eventbus that serializes incoming events. @@ -66,7 +67,7 @@ public void onComplete() { } @Override - public void onSubscribe(io.reactivesocket.rx.Disposable d) { + public void onSubscribe(Disposable d) { // TODO Auto-generated method stub } diff --git a/reactivesocket-core/src/test/java/io/reactivesocket/TestConnection.java b/reactivesocket-core/src/test/java/io/reactivesocket/TestConnection.java index 125b45280..c4c14d8c2 100644 --- a/reactivesocket-core/src/test/java/io/reactivesocket/TestConnection.java +++ b/reactivesocket-core/src/test/java/io/reactivesocket/TestConnection.java @@ -15,17 +15,16 @@ */ package io.reactivesocket; -import static io.reactivex.Observable.*; - -import java.io.IOException; - -import org.reactivestreams.Publisher; - import io.reactivesocket.rx.Completable; +import io.reactivesocket.rx.Disposable; import io.reactivesocket.rx.Observer; -import io.reactivex.Observable; -import io.reactivex.Scheduler.Worker; -import io.reactivex.schedulers.Schedulers; +import org.reactivestreams.Publisher; +import rx.Observable; +import rx.RxReactiveStreams; +import rx.Scheduler.Worker; +import rx.schedulers.Schedulers; + +import java.io.IOException; public class TestConnection implements DuplexConnection { @@ -34,7 +33,7 @@ public class TestConnection implements DuplexConnection { @Override public void addOutput(Publisher o, Completable callback) { - fromPublisher(o).flatMap(m -> { + RxReactiveStreams.toObservable(o).flatMap(m -> { // no backpressure on a Subject so just firehosing for this test write.send(m); return Observable. empty(); @@ -61,7 +60,7 @@ public io.reactivesocket.rx.Observable getInput() { public void subscribe(Observer o) { toInput.add(o); // we are okay with the race of sending data and cancelling ... since this is "hot" by definition and unsubscribing is a race. - o.onSubscribe(new io.reactivesocket.rx.Disposable() { + o.onSubscribe(new Disposable() { @Override public void dispose() { @@ -107,8 +106,8 @@ public void connectToServerConnection(TestConnection serverConnection, boolean l @Override public void close() throws IOException { - clientThread.dispose(); - serverThread.dispose(); + clientThread.unsubscribe(); + serverThread.unsubscribe(); } } \ No newline at end of file diff --git a/reactivesocket-core/src/test/java/io/reactivesocket/TestUtil.java b/reactivesocket-core/src/test/java/io/reactivesocket/TestUtil.java index 0ea5d7b12..2264df13d 100644 --- a/reactivesocket-core/src/test/java/io/reactivesocket/TestUtil.java +++ b/reactivesocket-core/src/test/java/io/reactivesocket/TestUtil.java @@ -16,11 +16,14 @@ package io.reactivesocket; import org.agrona.MutableDirectBuffer; +import org.reactivestreams.Publisher; +import rx.RxReactiveStreams; +import rx.observers.TestSubscriber; import java.nio.ByteBuffer; import java.nio.charset.Charset; -public class TestUtil +public final class TestUtil { private TestUtil() {} @@ -28,11 +31,13 @@ public static Frame utf8EncodedRequestFrame(final int streamId, final FrameType { return Frame.Request.from(streamId, type, new Payload() { + @Override public ByteBuffer getData() { return byteBufferFromUtf8String(data); } + @Override public ByteBuffer getMetadata() { return Frame.NULL_BYTEBUFFER; @@ -73,10 +78,16 @@ public static void copyFrame(final MutableDirectBuffer dst, final int offset, fi dst.putBytes(offset, frame.getByteBuffer(), frame.offset(), frame.length()); } + public static TestSubscriber testSubscribe(Publisher source) { + TestSubscriber subscriber = new TestSubscriber<>(); + RxReactiveStreams.toObservable(source).subscribe(subscriber); + return subscriber; + } + private static class PayloadImpl implements Payload // some JDK shoutout { - private ByteBuffer data; - private ByteBuffer metadata; + private final ByteBuffer data; + private final ByteBuffer metadata; public PayloadImpl(final String data, final String metadata) { @@ -99,24 +110,30 @@ public PayloadImpl(final String data, final String metadata) } } - public boolean equals(Object obj) - { - System.out.println("equals: " + obj); + public boolean equals(Object obj) { final Payload rhs = (Payload)obj; - - return (TestUtil.byteToString(data).equals(TestUtil.byteToString(rhs.getData()))) && - (TestUtil.byteToString(metadata).equals(TestUtil.byteToString(rhs.getMetadata()))); + return byteToString(data).equals(byteToString(rhs.getData())) && + byteToString(metadata).equals(byteToString(rhs.getMetadata())); } + @Override public ByteBuffer getData() { return data; } + @Override public ByteBuffer getMetadata() { return metadata; } + + @Override + public int hashCode() { + int result = data != null? data.hashCode() : 0; + result = 31 * result + (metadata != null? metadata.hashCode() : 0); + return result; + } } } diff --git a/reactivesocket-core/src/test/java/io/reactivesocket/TestTransportRequestN.java b/reactivesocket-core/src/test/java/io/reactivesocket/TransportRequestNTest.java similarity index 77% rename from reactivesocket-core/src/test/java/io/reactivesocket/TestTransportRequestN.java rename to reactivesocket-core/src/test/java/io/reactivesocket/TransportRequestNTest.java index eb27ebc2c..74641ecc2 100644 --- a/reactivesocket-core/src/test/java/io/reactivesocket/TestTransportRequestN.java +++ b/reactivesocket-core/src/test/java/io/reactivesocket/TransportRequestNTest.java @@ -16,11 +16,12 @@ package io.reactivesocket; import io.reactivesocket.lease.FairLeaseGovernor; -import io.reactivex.subscribers.TestSubscriber; import org.junit.After; import org.junit.Ignore; import org.junit.Test; import org.reactivestreams.Publisher; +import rx.RxReactiveStreams; +import rx.observers.TestSubscriber; import java.io.IOException; import java.util.concurrent.CountDownLatch; @@ -29,18 +30,15 @@ import java.util.concurrent.atomic.AtomicReference; import static io.reactivesocket.TestUtil.utf8EncodedPayload; -import static io.reactivex.Observable.error; -import static io.reactivex.Observable.fromPublisher; -import static io.reactivex.Observable.interval; -import static io.reactivex.Observable.just; -import static io.reactivex.Observable.range; +import static rx.Observable.*; import static org.junit.Assert.fail; +import static rx.RxReactiveStreams.toPublisher; /** * Ensure that request(n) from DuplexConnection "transport" layer is respected. * */ -public class TestTransportRequestN { +public class TransportRequestNTest { @Test(timeout = 3000) public void testRequestStreamWithNFromTransport() throws InterruptedException { @@ -49,7 +47,7 @@ public void testRequestStreamWithNFromTransport() throws InterruptedException { setup(clientConnection, serverConnection); TestSubscriber ts = new TestSubscriber<>(); - fromPublisher(socketClient.requestStream(utf8EncodedPayload("", null))) + RxReactiveStreams.toObservable(socketClient.requestStream(utf8EncodedPayload("", null))) .take(150) .subscribe(ts); @@ -65,11 +63,12 @@ public void testRequestStreamWithNFromTransport() throws InterruptedException { // we should not have received more than 11 (10 + default 1 that is requested) - if (ts.valueCount() > 11) { - fail("Received more (" + ts.valueCount() + ") than transport requested (11)"); + int valueCount = ts.getOnNextEvents().size(); + if (valueCount > 11) { + fail("Received more (" + valueCount + ") than transport requested (11)"); } - ts.cancel(); + ts.unsubscribe(); // since we are async, give time for emission to occur Thread.sleep(500); @@ -86,9 +85,9 @@ public void testRequestChannelDownstreamWithNFromTransport() throws InterruptedE setup(clientConnection, serverConnection); TestSubscriber ts = new TestSubscriber<>(); - fromPublisher(socketClient.requestChannel(just(utf8EncodedPayload("", null)))) - .take(150) - .subscribe(ts); + RxReactiveStreams.toObservable(socketClient.requestStream(utf8EncodedPayload("", null))) + .take(150) + .subscribe(ts); // wait for server to add output if (!serverConnection.awaitSubscription(1000)) { @@ -101,12 +100,12 @@ public void testRequestChannelDownstreamWithNFromTransport() throws InterruptedE Thread.sleep(500); // we should not have received more than 11 (10 + default 1 that is requested) - - if (ts.valueCount() > 11) { - fail("Received more (" + ts.valueCount() + ") than transport requested (11)"); + int valueCount = ts.getOnNextEvents().size(); + if (valueCount > 11) { + fail("Received more (" + valueCount + ") than transport requested (11)"); } - ts.cancel(); + ts.unsubscribe(); // since we are async, give time for emission to occur Thread.sleep(500); @@ -125,9 +124,9 @@ public void testRequestChannelUpstreamWithNFromTransport() throws InterruptedExc setup(clientConnection, serverConnection); TestSubscriber ts = new TestSubscriber<>(); - fromPublisher(socketClient.requestChannel(range(0, 1000).map(i -> utf8EncodedPayload("" + i, null)))) - .take(10) - .subscribe(ts); + RxReactiveStreams.toObservable(socketClient.requestStream(utf8EncodedPayload("", null))) + .take(150) + .subscribe(ts); // wait for server to add output if (!serverConnection.awaitSubscription(1000)) { @@ -141,12 +140,12 @@ public void testRequestChannelUpstreamWithNFromTransport() throws InterruptedExc Thread.sleep(500); // we should not have received more than 11 (10 + default 1 that is requested) - - if (ts.valueCount() > 11) { - fail("Received more (" + ts.valueCount() + ") than transport requested (11)"); + int valueCount = ts.getOnNextEvents().size(); + if (valueCount > 11) { + fail("Received more (" + valueCount + ") than transport requested (11)"); } - ts.cancel(); + ts.unsubscribe(); // since we are async, give time for emission to occur Thread.sleep(500); @@ -164,8 +163,8 @@ public void testRequestChannelUpstreamWithNFromTransport() throws InterruptedExc private TestConnectionWithControlledRequestN clientConnection; private ReactiveSocket socketServer; private ReactiveSocket socketClient; - private AtomicBoolean helloSubscriptionRunning = new AtomicBoolean(false); - private AtomicReference lastServerError = new AtomicReference<>(); + private final AtomicBoolean helloSubscriptionRunning = new AtomicBoolean(false); + private final AtomicReference lastServerError = new AtomicReference<>(); private CountDownLatch lastServerErrorCountDown; public void setup(TestConnectionWithControlledRequestN clientConnection, TestConnectionWithControlledRequestN serverConnection) throws InterruptedException { @@ -176,27 +175,28 @@ public void setup(TestConnectionWithControlledRequestN clientConnection, TestCon @Override public Publisher handleRequestResponse(Payload payload) { - return just(utf8EncodedPayload("request_response", null)); + return toPublisher(just(utf8EncodedPayload("request_response", null))); } @Override public Publisher handleRequestStream(Payload payload) { - return range(0, 10000).map(i -> "stream_response_" + i).map(n -> utf8EncodedPayload(n, null)); + return toPublisher(range(0, 10000).map(i -> "stream_response_" + i) + .map(n -> utf8EncodedPayload(n, null))); } @Override public Publisher handleSubscription(Payload payload) { - return interval(1, TimeUnit.MILLISECONDS) + return toPublisher(interval(1, TimeUnit.MILLISECONDS) .onBackpressureDrop() - .doOnSubscribe(s -> helloSubscriptionRunning.set(true)) - .doOnCancel(() -> helloSubscriptionRunning.set(false)) + .doOnSubscribe(() -> helloSubscriptionRunning.set(true)) + .doOnUnsubscribe(() -> helloSubscriptionRunning.set(false)) .map(i -> "subscription " + i) - .map(n -> utf8EncodedPayload(n, null)); + .map(n -> utf8EncodedPayload(n, null))); } @Override public Publisher handleFireAndForget(Payload payload) { - return error(new RuntimeException("Not Found")); + return toPublisher(error(new RuntimeException("Not Found"))); } /** @@ -204,12 +204,13 @@ public Publisher handleFireAndForget(Payload payload) { */ @Override public Publisher handleChannel(Payload initialPayload, Publisher inputs) { - return range(0, 10000).map(i -> "channel_response_" + i).map(n -> utf8EncodedPayload(n, null)); + return toPublisher(range(0, 10000).map(i -> "channel_response_" + i) + .map(n -> utf8EncodedPayload(n, null))); } @Override public Publisher handleMetadataPush(Payload payload) { - return error(new RuntimeException("Not Found")); + return toPublisher(error(new RuntimeException("Not Found"))); } }, new FairLeaseGovernor(100, 10L, TimeUnit.SECONDS), t -> { diff --git a/reactivesocket-core/src/test/java/io/reactivesocket/internal/ReassemblerTest.java b/reactivesocket-core/src/test/java/io/reactivesocket/internal/ReassemblerTest.java index 0b134d697..693a93538 100644 --- a/reactivesocket-core/src/test/java/io/reactivesocket/internal/ReassemblerTest.java +++ b/reactivesocket-core/src/test/java/io/reactivesocket/internal/ReassemblerTest.java @@ -21,7 +21,10 @@ import io.reactivesocket.TestUtil; import io.reactivesocket.internal.frame.FrameHeaderFlyweight; import io.reactivesocket.internal.frame.PayloadReassembler; -import io.reactivex.subjects.ReplaySubject; +import org.reactivestreams.Subscriber; +import rx.RxReactiveStreams; +import rx.observers.TestSubscriber; +import rx.subjects.ReplaySubject; import org.junit.Test; import java.nio.ByteBuffer; @@ -35,8 +38,9 @@ public class ReassemblerTest @Test public void shouldPassThroughUnfragmentedFrame() { - final ReplaySubject replaySubject = ReplaySubject.create(); - final PayloadReassembler reassembler = PayloadReassembler.with(replaySubject); + TestSubscriber ts = new TestSubscriber<>(); + Subscriber rsSub = RxReactiveStreams.toSubscriber(ts); + final PayloadReassembler reassembler = PayloadReassembler.with(rsSub); final String metadata = "metadata"; final String data = "data"; final ByteBuffer metadataBuffer = TestUtil.byteBufferFromUtf8String(metadata); @@ -44,16 +48,17 @@ public void shouldPassThroughUnfragmentedFrame() reassembler.onNext(Frame.Response.from(STREAM_ID, FrameType.NEXT, metadataBuffer, dataBuffer, 0)); - assertEquals(1, replaySubject.getValues().length); - assertEquals(data, TestUtil.byteToString(replaySubject.getValue().getData())); - assertEquals(metadata, TestUtil.byteToString(replaySubject.getValue().getMetadata())); + assertEquals(1, ts.getOnNextEvents().size()); + assertEquals(data, TestUtil.byteToString(ts.getOnNextEvents().get(0).getData())); + assertEquals(metadata, TestUtil.byteToString(ts.getOnNextEvents().get(0).getMetadata())); } @Test public void shouldNotPassThroughFragmentedFrameIfStillMoreFollowing() { - final ReplaySubject replaySubject = ReplaySubject.create(); - final PayloadReassembler reassembler = PayloadReassembler.with(replaySubject); + TestSubscriber ts = new TestSubscriber<>(); + Subscriber rsSub = RxReactiveStreams.toSubscriber(ts); + final PayloadReassembler reassembler = PayloadReassembler.with(rsSub); final String metadata = "metadata"; final String data = "data"; final ByteBuffer metadataBuffer = TestUtil.byteBufferFromUtf8String(metadata); @@ -61,14 +66,15 @@ public void shouldNotPassThroughFragmentedFrameIfStillMoreFollowing() reassembler.onNext(Frame.Response.from(STREAM_ID, FrameType.NEXT, metadataBuffer, dataBuffer, FrameHeaderFlyweight.FLAGS_RESPONSE_F)); - assertEquals(0, replaySubject.getValues().length); + assertEquals(0, ts.getOnNextEvents().size()); } @Test public void shouldReassembleTwoFramesWithFragmentedDataAndMetadata() { - final ReplaySubject replaySubject = ReplaySubject.create(); - final PayloadReassembler reassembler = PayloadReassembler.with(replaySubject); + TestSubscriber ts = new TestSubscriber<>(); + Subscriber rsSub = RxReactiveStreams.toSubscriber(ts); + final PayloadReassembler reassembler = PayloadReassembler.with(rsSub); final String metadata0 = "metadata0"; final String metadata1 = "md1"; final String metadata = metadata0 + metadata1; @@ -83,16 +89,17 @@ public void shouldReassembleTwoFramesWithFragmentedDataAndMetadata() reassembler.onNext(Frame.Response.from(STREAM_ID, FrameType.NEXT, metadata0Buffer, data0Buffer, FrameHeaderFlyweight.FLAGS_RESPONSE_F)); reassembler.onNext(Frame.Response.from(STREAM_ID, FrameType.NEXT, metadata1Buffer, data1Buffer, 0)); - assertEquals(1, replaySubject.getValues().length); - assertEquals(data, TestUtil.byteToString(replaySubject.getValue().getData())); - assertEquals(metadata, TestUtil.byteToString(replaySubject.getValue().getMetadata())); + assertEquals(1, ts.getOnNextEvents().size()); + assertEquals(data, TestUtil.byteToString(ts.getOnNextEvents().get(0).getData())); + assertEquals(metadata, TestUtil.byteToString(ts.getOnNextEvents().get(0).getMetadata())); } @Test public void shouldReassembleTwoFramesWithFragmentedData() { - final ReplaySubject replaySubject = ReplaySubject.create(); - final PayloadReassembler reassembler = PayloadReassembler.with(replaySubject); + TestSubscriber ts = new TestSubscriber<>(); + Subscriber rsSub = RxReactiveStreams.toSubscriber(ts); + final PayloadReassembler reassembler = PayloadReassembler.with(rsSub); final String metadata = "metadata"; final String data0 = "data0"; final String data1 = "d1"; @@ -104,16 +111,17 @@ public void shouldReassembleTwoFramesWithFragmentedData() reassembler.onNext(Frame.Response.from(STREAM_ID, FrameType.NEXT, metadataBuffer, data0Buffer, FrameHeaderFlyweight.FLAGS_RESPONSE_F)); reassembler.onNext(Frame.Response.from(STREAM_ID, FrameType.NEXT, Frame.NULL_BYTEBUFFER, data1Buffer, 0)); - assertEquals(1, replaySubject.getValues().length); - assertEquals(data, TestUtil.byteToString(replaySubject.getValue().getData())); - assertEquals(metadata, TestUtil.byteToString(replaySubject.getValue().getMetadata())); + assertEquals(1, ts.getOnNextEvents().size()); + assertEquals(data, TestUtil.byteToString(ts.getOnNextEvents().get(0).getData())); + assertEquals(metadata, TestUtil.byteToString(ts.getOnNextEvents().get(0).getMetadata())); } @Test public void shouldReassembleTwoFramesWithFragmentedMetadata() { - final ReplaySubject replaySubject = ReplaySubject.create(); - final PayloadReassembler reassembler = PayloadReassembler.with(replaySubject); + TestSubscriber ts = new TestSubscriber<>(); + Subscriber rsSub = RxReactiveStreams.toSubscriber(ts); + final PayloadReassembler reassembler = PayloadReassembler.with(rsSub); final String metadata0 = "metadata0"; final String metadata1 = "md1"; final String metadata = metadata0 + metadata1; @@ -125,16 +133,17 @@ public void shouldReassembleTwoFramesWithFragmentedMetadata() reassembler.onNext(Frame.Response.from(STREAM_ID, FrameType.NEXT, metadata0Buffer, dataBuffer, FrameHeaderFlyweight.FLAGS_RESPONSE_F)); reassembler.onNext(Frame.Response.from(STREAM_ID, FrameType.NEXT, metadata1Buffer, Frame.NULL_BYTEBUFFER, 0)); - assertEquals(1, replaySubject.getValues().length); - assertEquals(data, TestUtil.byteToString(replaySubject.getValue().getData())); - assertEquals(metadata, TestUtil.byteToString(replaySubject.getValue().getMetadata())); + assertEquals(1, ts.getOnNextEvents().size()); + assertEquals(data, TestUtil.byteToString(ts.getOnNextEvents().get(0).getData())); + assertEquals(metadata, TestUtil.byteToString(ts.getOnNextEvents().get(0).getMetadata())); } @Test public void shouldReassembleTwoFramesWithFragmentedDataAndMetadataWithMoreThanTwoFragments() { - final ReplaySubject replaySubject = ReplaySubject.create(); - final PayloadReassembler reassembler = PayloadReassembler.with(replaySubject); + TestSubscriber ts = new TestSubscriber<>(); + Subscriber rsSub = RxReactiveStreams.toSubscriber(ts); + final PayloadReassembler reassembler = PayloadReassembler.with(rsSub); final String metadata0 = "metadata0"; final String metadata1 = "md1"; final String metadata = metadata0 + metadata1; @@ -152,9 +161,8 @@ public void shouldReassembleTwoFramesWithFragmentedDataAndMetadataWithMoreThanTw reassembler.onNext(Frame.Response.from(STREAM_ID, FrameType.NEXT, metadata1Buffer, data1Buffer, FrameHeaderFlyweight.FLAGS_RESPONSE_F)); reassembler.onNext(Frame.Response.from(STREAM_ID, FrameType.NEXT, Frame.NULL_BYTEBUFFER, data2Buffer, 0)); - assertEquals(1, replaySubject.getValues().length); - assertEquals(data, TestUtil.byteToString(replaySubject.getValue().getData())); - assertEquals(metadata, TestUtil.byteToString(replaySubject.getValue().getMetadata())); + assertEquals(1, ts.getOnNextEvents().size()); + assertEquals(data, TestUtil.byteToString(ts.getOnNextEvents().get(0).getData())); + assertEquals(metadata, TestUtil.byteToString(ts.getOnNextEvents().get(0).getMetadata())); } - } diff --git a/reactivesocket-core/src/test/java/io/reactivesocket/internal/RequesterTest.java b/reactivesocket-core/src/test/java/io/reactivesocket/internal/RequesterTest.java index 56a680218..96eaa6c24 100644 --- a/reactivesocket-core/src/test/java/io/reactivesocket/internal/RequesterTest.java +++ b/reactivesocket-core/src/test/java/io/reactivesocket/internal/RequesterTest.java @@ -15,29 +15,23 @@ */ package io.reactivesocket.internal; -import static io.reactivesocket.TestUtil.*; -import static org.junit.Assert.*; -import static io.reactivesocket.ConnectionSetupPayload.NO_FLAGS; -import static io.reactivex.Observable.*; - -import java.util.Arrays; -import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.function.Consumer; - -import org.junit.Test; - -import io.reactivesocket.ConnectionSetupPayload; import io.reactivesocket.Frame; import io.reactivesocket.FrameType; import io.reactivesocket.LatchedCompletable; import io.reactivesocket.Payload; import io.reactivesocket.TestConnection; -import io.reactivesocket.rx.Completable; -import io.reactivex.subscribers.TestSubscriber; -import io.reactivex.Observable; -import io.reactivex.subjects.ReplaySubject; +import org.junit.Test; +import rx.observers.TestSubscriber; +import rx.subjects.ReplaySubject; + +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; + +import static io.reactivesocket.ConnectionSetupPayload.*; +import static io.reactivesocket.TestUtil.*; +import static org.junit.Assert.*; +import static rx.RxReactiveStreams.*; public class RequesterTest { @@ -48,11 +42,10 @@ public void testRequestResponseSuccess() throws InterruptedException { TestConnection conn = establishConnection(); ReplaySubject requests = captureRequests(conn); LatchedCompletable rc = new LatchedCompletable(1); - Requester p = Requester.createClientRequester(conn, ConnectionSetupPayload.create("UTF-8", "UTF-8", NO_FLAGS), ERROR_HANDLER, rc); + Requester p = Requester.createClientRequester(conn, create("UTF-8", "UTF-8", NO_FLAGS), ERROR_HANDLER, rc); rc.await(); - TestSubscriber ts = new TestSubscriber<>(); - p.requestResponse(utf8EncodedPayload("hello", null)).subscribe(ts); + TestSubscriber ts = testSubscribe(p.requestResponse(utf8EncodedPayload("hello", null))); ts.assertNoErrors(); assertEquals(2, requests.getValues().length); @@ -73,7 +66,7 @@ public void testRequestResponseSuccess() throws InterruptedException { ts.awaitTerminalEvent(500, TimeUnit.MILLISECONDS); ts.assertValue(utf8EncodedPayload("world", null)); - ts.assertComplete(); + ts.assertCompleted(); } @Test(timeout=2000) @@ -81,11 +74,10 @@ public void testRequestResponseError() throws InterruptedException { TestConnection conn = establishConnection(); ReplaySubject requests = captureRequests(conn); LatchedCompletable rc = new LatchedCompletable(1); - Requester p = Requester.createClientRequester(conn, ConnectionSetupPayload.create("UTF-8", "UTF-8", NO_FLAGS), ERROR_HANDLER, rc); + Requester p = Requester.createClientRequester(conn, create("UTF-8", "UTF-8", NO_FLAGS), ERROR_HANDLER, rc); rc.await(); - TestSubscriber ts = new TestSubscriber<>(); - p.requestResponse(utf8EncodedPayload("hello", null)).subscribe(ts); + TestSubscriber ts = testSubscribe(p.requestResponse(utf8EncodedPayload("hello", null))); assertEquals(2, requests.getValues().length); List requested = requests.take(2).toList().toBlocking().single(); @@ -103,7 +95,7 @@ public void testRequestResponseError() throws InterruptedException { conn.toInput.send(Frame.Error.from(2, new RuntimeException("Failed"))); ts.awaitTerminalEvent(500, TimeUnit.MILLISECONDS); ts.assertError(Exception.class); - assertEquals("Failed", ts.errors().get(0).getMessage()); + assertEquals("Failed", ts.getOnErrorEvents().get(0).getMessage()); } @Test(timeout=2000) @@ -111,12 +103,11 @@ public void testRequestResponseCancel() throws InterruptedException { TestConnection conn = establishConnection(); ReplaySubject requests = captureRequests(conn); LatchedCompletable rc = new LatchedCompletable(1); - Requester p = Requester.createClientRequester(conn, ConnectionSetupPayload.create("UTF-8", "UTF-8", NO_FLAGS), ERROR_HANDLER, rc); + Requester p = Requester.createClientRequester(conn, create("UTF-8", "UTF-8", NO_FLAGS), ERROR_HANDLER, rc); rc.await(); - TestSubscriber ts = new TestSubscriber<>(); - p.requestResponse(utf8EncodedPayload("hello", null)).subscribe(ts); - ts.cancel(); + TestSubscriber ts = testSubscribe(p.requestResponse(utf8EncodedPayload("hello", null))); + ts.unsubscribe(); assertEquals(3, requests.getValues().length); List requested = requests.take(3).toList().toBlocking().single(); @@ -136,7 +127,7 @@ public void testRequestResponseCancel() throws InterruptedException { assertEquals("", byteToString(three.getData())); assertEquals(FrameType.CANCEL, three.getType()); - ts.assertNotTerminated(); + ts.assertNoTerminalEvent(); ts.assertNoValues(); } @@ -146,11 +137,12 @@ public void testRequestStreamSuccess() throws InterruptedException { TestConnection conn = establishConnection(); ReplaySubject requests = captureRequests(conn); LatchedCompletable rc = new LatchedCompletable(1); - Requester p = Requester.createClientRequester(conn, ConnectionSetupPayload.create("UTF-8", "UTF-8", NO_FLAGS), ERROR_HANDLER, rc); + Requester p = Requester.createClientRequester(conn, create("UTF-8", "UTF-8", NO_FLAGS), ERROR_HANDLER, rc); rc.await(); TestSubscriber ts = new TestSubscriber<>(); - fromPublisher(p.requestStream(utf8EncodedPayload("hello", null))).map(pl -> byteToString(pl.getData())).subscribe(ts); + toObservable(p.requestStream(utf8EncodedPayload("hello", null))).map(pl -> byteToString(pl.getData())) + .subscribe(ts); assertEquals(2, requests.getValues().length); List requested = requests.take(2).toList().toBlocking().single(); @@ -172,8 +164,8 @@ public void testRequestStreamSuccess() throws InterruptedException { conn.toInput.send(utf8EncodedResponseFrame(2, FrameType.COMPLETE, "")); ts.awaitTerminalEvent(500, TimeUnit.MILLISECONDS); - ts.assertComplete(); - ts.assertValueSequence(Arrays.asList("hello", "world")); + ts.assertCompleted(); + ts.assertValues("hello", "world"); } // TODO REQUEST_N on initial frame not implemented yet @@ -182,11 +174,12 @@ public void testRequestStreamSuccessTake2AndCancel() throws InterruptedException TestConnection conn = establishConnection(); ReplaySubject requests = captureRequests(conn); LatchedCompletable rc = new LatchedCompletable(1); - Requester p = Requester.createClientRequester(conn, ConnectionSetupPayload.create("UTF-8", "UTF-8", NO_FLAGS), ERROR_HANDLER, rc); + Requester p = Requester.createClientRequester(conn, create("UTF-8", "UTF-8", NO_FLAGS), ERROR_HANDLER, rc); rc.await(); TestSubscriber ts = new TestSubscriber<>(); - Observable.fromPublisher(p.requestStream(utf8EncodedPayload("hello", null))).take(2).map(pl -> byteToString(pl.getData())).subscribe(ts); + toObservable(p.requestStream(utf8EncodedPayload("hello", null))).take(2).map(pl -> byteToString(pl.getData())) + .subscribe(ts); assertEquals(2, requests.getValues().length); List requested = requests.take(2).toList().toBlocking().single(); @@ -207,8 +200,8 @@ public void testRequestStreamSuccessTake2AndCancel() throws InterruptedException conn.toInput.send(utf8EncodedResponseFrame(2, FrameType.NEXT, "world")); ts.awaitTerminalEvent(500, TimeUnit.MILLISECONDS); - ts.assertComplete(); - ts.assertValueSequence(Arrays.asList("hello", "world")); + ts.assertCompleted(); + ts.assertValues("hello", "world"); assertEquals(3, requests.getValues().length); List requested2 = requests.take(3).toList().toBlocking().single(); @@ -225,11 +218,10 @@ public void testRequestStreamError() throws InterruptedException { TestConnection conn = establishConnection(); ReplaySubject requests = captureRequests(conn); LatchedCompletable rc = new LatchedCompletable(1); - Requester p = Requester.createClientRequester(conn, ConnectionSetupPayload.create("UTF-8", "UTF-8", NO_FLAGS), ERROR_HANDLER, rc); + Requester p = Requester.createClientRequester(conn, create("UTF-8", "UTF-8", NO_FLAGS), ERROR_HANDLER, rc); rc.await(); - TestSubscriber ts = new TestSubscriber<>(); - p.requestStream(utf8EncodedPayload("hello", null)).subscribe(ts); + TestSubscriber ts = testSubscribe(p.requestStream(utf8EncodedPayload("hello", null))); assertEquals(2, requests.getValues().length); List requested = requests.take(2).toList().toBlocking().single(); @@ -252,7 +244,7 @@ public void testRequestStreamError() throws InterruptedException { ts.awaitTerminalEvent(500, TimeUnit.MILLISECONDS); ts.assertError(Exception.class); ts.assertValue(utf8EncodedPayload("hello", null)); - assertEquals("Failure", ts.errors().get(0).getMessage()); + assertEquals("Failure", ts.getOnErrorEvents().get(0).getMessage()); } // @Test // TODO need to implement test for REQUEST_N behavior as a long stream is consumed @@ -262,11 +254,11 @@ public void testRequestStreamRequestNReplenishing() { /* **********************************************************************************************/ - private TestConnection establishConnection() { + private static TestConnection establishConnection() { return new TestConnection(); } - private ReplaySubject captureRequests(TestConnection conn) { + private static ReplaySubject captureRequests(TestConnection conn) { ReplaySubject rs = ReplaySubject.create(); rs.forEach(i -> System.out.println("capturedRequest => " + i)); conn.write.add(rs::onNext); diff --git a/reactivesocket-core/src/test/java/io/reactivesocket/internal/ResponderTest.java b/reactivesocket-core/src/test/java/io/reactivesocket/internal/ResponderTest.java index 283ba7459..1ab6ed9dd 100644 --- a/reactivesocket-core/src/test/java/io/reactivesocket/internal/ResponderTest.java +++ b/reactivesocket-core/src/test/java/io/reactivesocket/internal/ResponderTest.java @@ -22,13 +22,12 @@ import io.reactivesocket.ReactiveSocket; import io.reactivesocket.RequestHandler; import io.reactivesocket.TestConnection; -import io.reactivex.Observable; -import io.reactivex.schedulers.Schedulers; -import io.reactivex.schedulers.TestScheduler; -import io.reactivex.subjects.ReplaySubject; import org.junit.Test; import org.mockito.Mockito; -import org.reactivestreams.Subscription; +import rx.Observable; +import rx.schedulers.Schedulers; +import rx.schedulers.TestScheduler; +import rx.subjects.ReplaySubject; import java.util.List; import java.util.concurrent.TimeUnit; @@ -39,14 +38,11 @@ import static io.reactivesocket.TestUtil.byteToString; import static io.reactivesocket.TestUtil.utf8EncodedPayload; import static io.reactivesocket.TestUtil.utf8EncodedRequestFrame; -import static io.reactivex.Observable.error; -import static io.reactivex.Observable.interval; -import static io.reactivex.Observable.just; -import static io.reactivex.Observable.never; -import static io.reactivex.Observable.range; +import static rx.Observable.*; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import static rx.RxReactiveStreams.*; public class ResponderTest { @@ -61,7 +57,8 @@ public void testRequestResponseSuccess() throws InterruptedException { (setup, rs) -> new RequestHandler.Builder().withRequestResponse( request -> - just(utf8EncodedPayload(byteToString(request.getData()) + " world", null))).build(), + toPublisher(just(utf8EncodedPayload(byteToString(request.getData()) + " world", null)))) + .build(), NULL_LEASE_GOVERNOR, ERROR_HANDLER, lc, @@ -90,7 +87,8 @@ public void testRequestResponseError() throws InterruptedException { TestConnection conn = establishConnection(); LatchedCompletable lc = new LatchedCompletable(1); Responder.createServerResponder(conn, (setup, rs) -> new RequestHandler.Builder() - .withRequestResponse(request -> Observable.error(new Exception("Request Not Found"))).build(), + .withRequestResponse(request -> toPublisher(error(new Exception("Request Not Found")))) + .build(), NULL_LEASE_GOVERNOR, ERROR_HANDLER, lc, reactiveSocket); lc.await(); @@ -113,12 +111,12 @@ public void testRequestResponseCancel() throws InterruptedException { AtomicBoolean unsubscribed = new AtomicBoolean(); Observable delayed = never() .cast(Payload.class) - .doOnCancel(() -> unsubscribed.set(true)); + .doOnUnsubscribe(() -> unsubscribed.set(true)); TestConnection conn = establishConnection(); LatchedCompletable lc = new LatchedCompletable(1); Responder.createServerResponder(conn, (setup, rs) -> new RequestHandler.Builder() - .withRequestResponse(request -> delayed).build(), + .withRequestResponse(request -> toPublisher(delayed)).build(), NULL_LEASE_GOVERNOR, ERROR_HANDLER, lc, reactiveSocket); lc.await(); @@ -142,7 +140,8 @@ public void testRequestStreamSuccess() throws InterruptedException { LatchedCompletable lc = new LatchedCompletable(1); Responder.createServerResponder(conn, (setup, rs) -> new RequestHandler.Builder() .withRequestStream( - request -> range(Integer.parseInt(byteToString(request.getData())), 10).map(i -> utf8EncodedPayload(i + "!", null))).build(), + request -> toPublisher(range(Integer.parseInt(byteToString(request.getData())), 10) + .map(i -> utf8EncodedPayload(i + "!", null)))).build(), NULL_LEASE_GOVERNOR, ERROR_HANDLER, lc, reactiveSocket); lc.await(); @@ -175,9 +174,9 @@ public void testRequestStreamError() throws InterruptedException { TestConnection conn = establishConnection(); LatchedCompletable lc = new LatchedCompletable(1); Responder.createServerResponder(conn, (setup,rs) -> new RequestHandler.Builder() - .withRequestStream(request -> range(Integer.parseInt(byteToString(request.getData())), 3) + .withRequestStream(request -> toPublisher(range(Integer.parseInt(byteToString(request.getData())), 3) .map(i -> utf8EncodedPayload(i + "!", null)) - .concatWith(error(new Exception("Error Occurred!")))).build(), + .concatWith(error(new Exception("Error Occurred!"))))).build(), NULL_LEASE_GOVERNOR, ERROR_HANDLER, lc, reactiveSocket); lc.await(); @@ -211,7 +210,8 @@ public void testRequestStreamCancel() throws InterruptedException { TestScheduler ts = Schedulers.test(); LatchedCompletable lc = new LatchedCompletable(1); Responder.createServerResponder(conn, (setup,rs) -> new RequestHandler.Builder() - .withRequestStream(request -> interval(1000, TimeUnit.MILLISECONDS, ts).map(i -> utf8EncodedPayload(i + "!", null))).build(), + .withRequestStream(request -> toPublisher(interval(1000, TimeUnit.MILLISECONDS, ts).map(i -> utf8EncodedPayload(i + "!", null)))) + .build(), NULL_LEASE_GOVERNOR, ERROR_HANDLER, lc, reactiveSocket); lc.await(); @@ -253,7 +253,8 @@ public void testMultiplexedStreams() throws InterruptedException { TestConnection conn = establishConnection(); LatchedCompletable lc = new LatchedCompletable(1); Responder.createServerResponder(conn, (setup,rs) -> new RequestHandler.Builder() - .withRequestStream(request -> interval(1000, TimeUnit.MILLISECONDS, ts).map(i -> utf8EncodedPayload(i + "_" + byteToString(request.getData()), null))).build(), + .withRequestStream(request -> toPublisher(interval(1000, TimeUnit.MILLISECONDS, ts).map(i -> utf8EncodedPayload(i + "_" + byteToString(request.getData()), null)))) + .build(), NULL_LEASE_GOVERNOR, ERROR_HANDLER, lc, reactiveSocket); lc.await(); @@ -305,43 +306,18 @@ public void testMultiplexedStreams() throws InterruptedException { /* **********************************************************************************************/ - private ReplaySubject captureResponses(TestConnection conn) { + private static ReplaySubject captureResponses(TestConnection conn) { // capture all responses to client ReplaySubject rs = ReplaySubject.create(); conn.write.add(rs::onNext); return rs; } - private TestConnection establishConnection() { + private static TestConnection establishConnection() { return new TestConnection(); } - private org.reactivestreams.Subscriber PROTOCOL_SUBSCRIBER = new org.reactivestreams.Subscriber() { - - @Override - public void onSubscribe(Subscription s) { - s.request(Long.MAX_VALUE); - } - - @Override - public void onNext(Void t) { - - } - - @Override - public void onError(Throwable t) { - t.printStackTrace(); - } - - @Override - public void onComplete() { - - } - - }; - - - private void sendSetupFrame(TestConnection conn) { + private static void sendSetupFrame(TestConnection conn) { // setup conn.toInput.send(Frame.Setup.from(0, 0, 0, "UTF-8", "UTF-8", utf8EncodedPayload("", ""))); } diff --git a/reactivesocket-core/src/test/java/io/reactivesocket/internal/UnicastSubjectTest.java b/reactivesocket-core/src/test/java/io/reactivesocket/internal/UnicastSubjectTest.java index a2ddbd3a8..e4f545f11 100644 --- a/reactivesocket-core/src/test/java/io/reactivesocket/internal/UnicastSubjectTest.java +++ b/reactivesocket-core/src/test/java/io/reactivesocket/internal/UnicastSubjectTest.java @@ -20,9 +20,9 @@ import io.reactivesocket.Frame; import io.reactivesocket.FrameType; import io.reactivesocket.TestUtil; -import io.reactivesocket.internal.UnicastSubject; -import io.reactivex.subscribers.TestSubscriber; +import rx.observers.TestSubscriber; +import static io.reactivesocket.TestUtil.testSubscribe; import static org.junit.Assert.assertTrue; public class UnicastSubjectTest { @@ -31,11 +31,10 @@ public class UnicastSubjectTest { public void testSubscribeReceiveValue() { Frame f = TestUtil.utf8EncodedResponseFrame(1, FrameType.NEXT_COMPLETE, "response"); UnicastSubject us = UnicastSubject.create(); - TestSubscriber ts = new TestSubscriber<>(); - us.subscribe(ts); + TestSubscriber ts = testSubscribe(us); us.onNext(f); ts.assertValue(f); - ts.assertNotTerminated(); + ts.assertNoTerminalEvent(); } @Test(expected = NullPointerException.class) @@ -48,15 +47,12 @@ public void testNullPointerSendingWithoutSubscriber() { @Test public void testIllegalStateIfMultiSubscribe() { UnicastSubject us = UnicastSubject.create(); - TestSubscriber f1 = new TestSubscriber<>(); - us.subscribe(f1); - TestSubscriber f2 = new TestSubscriber<>(); - us.subscribe(f2); - - f1.assertNotTerminated(); - for (Throwable e : f2.errors()) { - assertTrue( IllegalStateException.class.isInstance(e) - || NullPointerException.class.isInstance(e)); + TestSubscriber f1 = testSubscribe(us); + TestSubscriber f2 = testSubscribe(us); + + f1.assertNoTerminalEvent(); + for (Throwable e : f2.getOnErrorEvents()) { + assertTrue( IllegalStateException.class.isInstance(e) || NullPointerException.class.isInstance(e)); } }