From 030cd440d3b5ddbe05c328c5c81084c97529e252 Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Tue, 21 Jul 2015 21:04:03 -0700 Subject: [PATCH 1/2] API changes --- .../ReactiveSocketServerProtocol.java | 9 ++++++++- .../ReactiveSocketServerProtocolTest.java | 14 +++++++------- 2 files changed, 15 insertions(+), 8 deletions(-) diff --git a/src/main/java/io/reactivesocket/ReactiveSocketServerProtocol.java b/src/main/java/io/reactivesocket/ReactiveSocketServerProtocol.java index a06a47eac..f57882e0d 100644 --- a/src/main/java/io/reactivesocket/ReactiveSocketServerProtocol.java +++ b/src/main/java/io/reactivesocket/ReactiveSocketServerProtocol.java @@ -29,13 +29,19 @@ public class ReactiveSocketServerProtocol { private Func1> requestResponseHandler; private Func1> requestStreamHandler; - ReactiveSocketServerProtocol( + private ReactiveSocketServerProtocol( Func1> requestResponseHandler, Func1> requestStreamHandler) { this.requestResponseHandler = requestResponseHandler; this.requestStreamHandler = requestStreamHandler; } + public static ReactiveSocketServerProtocol create( + Func1> requestResponseHandler, + Func1> requestStreamHandler) { + return new ReactiveSocketServerProtocol(requestResponseHandler, requestStreamHandler); + } + public Publisher acceptConnection(DuplexConnection ws) { /* state of cancellation subjects during connection */ // TODO consider using the LongObjectHashMap from Agrona for perf improvement @@ -43,6 +49,7 @@ public Publisher acceptConnection(DuplexConnection ws) { final ConcurrentHashMap cancellationObservables = new ConcurrentHashMap<>(); return toPublisher(toObservable(ws.getInput()).flatMap(message -> { + System.out.println("message: " + message); if (message.getMessageType() == MessageType.SUBSCRIBE_REQUEST_RESPONSE) { CancellationToken cancellationToken = CancellationToken.create(); cancellationObservables.put(message.getMessageId(), cancellationToken); diff --git a/src/test/java/io/reactivesocket/ReactiveSocketServerProtocolTest.java b/src/test/java/io/reactivesocket/ReactiveSocketServerProtocolTest.java index cc14c997b..af390c8d3 100644 --- a/src/test/java/io/reactivesocket/ReactiveSocketServerProtocolTest.java +++ b/src/test/java/io/reactivesocket/ReactiveSocketServerProtocolTest.java @@ -35,7 +35,7 @@ public class ReactiveSocketServerProtocolTest { @Test public void testRequestResponseSuccess() { - ReactiveSocketServerProtocol p = new ReactiveSocketServerProtocol( + ReactiveSocketServerProtocol p = ReactiveSocketServerProtocol.create( request -> toPublisher(just(request + " world")), null); @@ -54,7 +54,7 @@ public void testRequestResponseSuccess() { @Test public void testRequestResponseError() { - ReactiveSocketServerProtocol p = new ReactiveSocketServerProtocol( + ReactiveSocketServerProtocol p = ReactiveSocketServerProtocol.create( request -> toPublisher(error(new Exception("Request Not Found"))), null); @@ -78,7 +78,7 @@ public void testRequestResponseCancel() { .cast(String.class) .doOnUnsubscribe(() -> unsubscribed.set(true)); - ReactiveSocketServerProtocol p = new ReactiveSocketServerProtocol( + ReactiveSocketServerProtocol p = ReactiveSocketServerProtocol.create( request -> toPublisher(delayed), null); @@ -97,7 +97,7 @@ public void testRequestResponseCancel() { @Test public void testRequestStreamSuccess() { - ReactiveSocketServerProtocol p = new ReactiveSocketServerProtocol( + ReactiveSocketServerProtocol p = ReactiveSocketServerProtocol.create( null, request -> toPublisher(range(Integer.parseInt(request), 10).map(i -> i + "!"))); @@ -126,7 +126,7 @@ public void testRequestStreamSuccess() { @Test public void testRequestStreamError() { - ReactiveSocketServerProtocol p = new ReactiveSocketServerProtocol( + ReactiveSocketServerProtocol p = ReactiveSocketServerProtocol.create( null, request -> toPublisher(range(Integer.parseInt(request), 3) .map(i -> i + "!") @@ -158,7 +158,7 @@ public void testRequestStreamError() { @Test public void testRequestStreamCancel() { TestScheduler ts = Schedulers.test(); - ReactiveSocketServerProtocol p = new ReactiveSocketServerProtocol( + ReactiveSocketServerProtocol p = ReactiveSocketServerProtocol.create( null, request -> toPublisher(interval(1000, TimeUnit.MILLISECONDS, ts).map(i -> i + "!"))); @@ -196,7 +196,7 @@ public void testRequestStreamCancel() { @Test public void testMultiplexedStreams() { TestScheduler ts = Schedulers.test(); - ReactiveSocketServerProtocol p = new ReactiveSocketServerProtocol( + ReactiveSocketServerProtocol p = ReactiveSocketServerProtocol.create( null, request -> toPublisher(interval(1000, TimeUnit.MILLISECONDS, ts).map(i -> i + "_" + request))); From 36124e6d685078a43322a41dad2525a51cfc996f Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Tue, 21 Jul 2015 21:04:27 -0700 Subject: [PATCH 2/2] Message from ByteBuffer Don't rely on .array() as it is optional. --- src/main/java/io/reactivesocket/Message.java | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/src/main/java/io/reactivesocket/Message.java b/src/main/java/io/reactivesocket/Message.java index 83f1568bd..2ed9bd097 100644 --- a/src/main/java/io/reactivesocket/Message.java +++ b/src/main/java/io/reactivesocket/Message.java @@ -127,7 +127,10 @@ private void decode() { /** * This is NOT how we want it for real. Just representing the idea for discussion. */ - String data = new String(b.array()); + byte[] copy = new byte[b.limit()]; + b.get(copy); + String data = new String(copy); + System.out.println("RAW: " + data); int separator = data.indexOf('|'); String prefix = data.substring(0, separator); this.type = MessageType.values[Integer.parseInt(prefix.substring(1, data.indexOf(']')))]; @@ -135,4 +138,15 @@ private void decode() { this.message = data.substring(separator + 1, data.length()); } + @Override + public String toString() { + if (type == null) { + try { + decode(); + } catch (Exception e) { + e.printStackTrace(); + } + } + return "Message => ID: " + messageId + " Type: " + type + " Data: " + message; + } }