From b64f1740295e03e2e2398980fb1ba90a92110219 Mon Sep 17 00:00:00 2001 From: Yuri Schimke Date: Wed, 28 Dec 2016 14:28:08 +0000 Subject: [PATCH 1/2] StreamIdSupplier returns 1,3,5... or 2,4,6... --- .../io/reactivesocket/StreamIdSupplier.java | 10 ++- .../reactivesocket/StreamIdSupplierTest.java | 69 +++++++++++++++++++ 2 files changed, 76 insertions(+), 3 deletions(-) create mode 100644 reactivesocket-core/src/test/java/io/reactivesocket/StreamIdSupplierTest.java diff --git a/reactivesocket-core/src/main/java/io/reactivesocket/StreamIdSupplier.java b/reactivesocket-core/src/main/java/io/reactivesocket/StreamIdSupplier.java index a11f77fb2..1bcd57ae6 100644 --- a/reactivesocket-core/src/main/java/io/reactivesocket/StreamIdSupplier.java +++ b/reactivesocket-core/src/main/java/io/reactivesocket/StreamIdSupplier.java @@ -30,14 +30,18 @@ public synchronized int nextStreamId() { } public synchronized boolean isValid(int streamId) { - return this.streamId < streamId; + return (isEven(streamId) == isEven(this.streamId)) && this.streamId >= streamId && streamId > 0; + } + + private boolean isEven(int streamId) { + return streamId % 2 == 0; } public static StreamIdSupplier clientSupplier() { - return new StreamIdSupplier(1); + return new StreamIdSupplier(-1); } public static StreamIdSupplier serverSupplier() { - return new StreamIdSupplier(2); + return new StreamIdSupplier(0); } } diff --git a/reactivesocket-core/src/test/java/io/reactivesocket/StreamIdSupplierTest.java b/reactivesocket-core/src/test/java/io/reactivesocket/StreamIdSupplierTest.java new file mode 100644 index 000000000..4e3c3f317 --- /dev/null +++ b/reactivesocket-core/src/test/java/io/reactivesocket/StreamIdSupplierTest.java @@ -0,0 +1,69 @@ +package io.reactivesocket; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class StreamIdSupplierTest { + @Test + public void testClientSequence() { + StreamIdSupplier s = StreamIdSupplier.clientSupplier(); + assertEquals(1, s.nextStreamId()); + assertEquals(3, s.nextStreamId()); + assertEquals(5, s.nextStreamId()); + } + + @Test + public void testServerSequence() { + StreamIdSupplier s = StreamIdSupplier.serverSupplier(); + assertEquals(2, s.nextStreamId()); + assertEquals(4, s.nextStreamId()); + assertEquals(6, s.nextStreamId()); + } + + @Test + public void testClientIsValid() { + StreamIdSupplier s = StreamIdSupplier.clientSupplier(); + + assertFalse(s.isValid(1)); + assertFalse(s.isValid(3)); + + s.nextStreamId(); + assertTrue(s.isValid(1)); + assertFalse(s.isValid(3)); + + s.nextStreamId(); + assertTrue(s.isValid(3)); + + // negative + assertFalse(s.isValid(-1)); + // connection + assertFalse(s.isValid(0)); + // server + assertFalse(s.isValid(2)); + } + + @Test + public void testServerIsValid() { + StreamIdSupplier s = StreamIdSupplier.serverSupplier(); + + assertFalse(s.isValid(2)); + assertFalse(s.isValid(4)); + + s.nextStreamId(); + assertTrue(s.isValid(2)); + assertFalse(s.isValid(4)); + + s.nextStreamId(); + assertTrue(s.isValid(4)); + + // negative + assertFalse(s.isValid(-2)); + // connection + assertFalse(s.isValid(0)); + // server + assertFalse(s.isValid(1)); + } +} From 92312b63559793c3f09be79d651a050660d57e1f Mon Sep 17 00:00:00 2001 From: Yuri Schimke Date: Tue, 3 Jan 2017 09:09:13 +0000 Subject: [PATCH 2/2] review comments - don't split client/server --- .../reactivesocket/ClientReactiveSocket.java | 2 +- .../io/reactivesocket/StreamIdSupplier.java | 8 ++--- .../reactivesocket/StreamIdSupplierTest.java | 36 +++++++++---------- 3 files changed, 21 insertions(+), 25 deletions(-) diff --git a/reactivesocket-core/src/main/java/io/reactivesocket/ClientReactiveSocket.java b/reactivesocket-core/src/main/java/io/reactivesocket/ClientReactiveSocket.java index f993de26f..111550ac4 100644 --- a/reactivesocket-core/src/main/java/io/reactivesocket/ClientReactiveSocket.java +++ b/reactivesocket-core/src/main/java/io/reactivesocket/ClientReactiveSocket.java @@ -301,7 +301,7 @@ private void handleFrame(int streamId, FrameType type, Frame frame) { } private void handleMissingResponseProcessor(int streamId, FrameType type, Frame frame) { - if (!streamIdSupplier.isValid(streamId)) { + if (!streamIdSupplier.isBeforeOrCurrent(streamId)) { if (type == FrameType.ERROR) { // message for stream that has never existed, we have a problem with // the overall connection and must tear down diff --git a/reactivesocket-core/src/main/java/io/reactivesocket/StreamIdSupplier.java b/reactivesocket-core/src/main/java/io/reactivesocket/StreamIdSupplier.java index 1bcd57ae6..931fb9673 100644 --- a/reactivesocket-core/src/main/java/io/reactivesocket/StreamIdSupplier.java +++ b/reactivesocket-core/src/main/java/io/reactivesocket/StreamIdSupplier.java @@ -29,12 +29,8 @@ public synchronized int nextStreamId() { return streamId; } - public synchronized boolean isValid(int streamId) { - return (isEven(streamId) == isEven(this.streamId)) && this.streamId >= streamId && streamId > 0; - } - - private boolean isEven(int streamId) { - return streamId % 2 == 0; + public synchronized boolean isBeforeOrCurrent(int streamId) { + return this.streamId >= streamId && streamId > 0; } public static StreamIdSupplier clientSupplier() { diff --git a/reactivesocket-core/src/test/java/io/reactivesocket/StreamIdSupplierTest.java b/reactivesocket-core/src/test/java/io/reactivesocket/StreamIdSupplierTest.java index 4e3c3f317..43b504175 100644 --- a/reactivesocket-core/src/test/java/io/reactivesocket/StreamIdSupplierTest.java +++ b/reactivesocket-core/src/test/java/io/reactivesocket/StreamIdSupplierTest.java @@ -27,43 +27,43 @@ public void testServerSequence() { public void testClientIsValid() { StreamIdSupplier s = StreamIdSupplier.clientSupplier(); - assertFalse(s.isValid(1)); - assertFalse(s.isValid(3)); + assertFalse(s.isBeforeOrCurrent(1)); + assertFalse(s.isBeforeOrCurrent(3)); s.nextStreamId(); - assertTrue(s.isValid(1)); - assertFalse(s.isValid(3)); + assertTrue(s.isBeforeOrCurrent(1)); + assertFalse(s.isBeforeOrCurrent(3)); s.nextStreamId(); - assertTrue(s.isValid(3)); + assertTrue(s.isBeforeOrCurrent(3)); // negative - assertFalse(s.isValid(-1)); + assertFalse(s.isBeforeOrCurrent(-1)); // connection - assertFalse(s.isValid(0)); - // server - assertFalse(s.isValid(2)); + assertFalse(s.isBeforeOrCurrent(0)); + // server also accepted (checked externally) + assertTrue(s.isBeforeOrCurrent(2)); } @Test public void testServerIsValid() { StreamIdSupplier s = StreamIdSupplier.serverSupplier(); - assertFalse(s.isValid(2)); - assertFalse(s.isValid(4)); + assertFalse(s.isBeforeOrCurrent(2)); + assertFalse(s.isBeforeOrCurrent(4)); s.nextStreamId(); - assertTrue(s.isValid(2)); - assertFalse(s.isValid(4)); + assertTrue(s.isBeforeOrCurrent(2)); + assertFalse(s.isBeforeOrCurrent(4)); s.nextStreamId(); - assertTrue(s.isValid(4)); + assertTrue(s.isBeforeOrCurrent(4)); // negative - assertFalse(s.isValid(-2)); + assertFalse(s.isBeforeOrCurrent(-2)); // connection - assertFalse(s.isValid(0)); - // server - assertFalse(s.isValid(1)); + assertFalse(s.isBeforeOrCurrent(0)); + // client also accepted (checked externally) + assertTrue(s.isBeforeOrCurrent(1)); } }