diff --git a/reactivesocket-core/src/main/java/io/reactivesocket/ClientReactiveSocket.java b/reactivesocket-core/src/main/java/io/reactivesocket/ClientReactiveSocket.java index f993de26f..111550ac4 100644 --- a/reactivesocket-core/src/main/java/io/reactivesocket/ClientReactiveSocket.java +++ b/reactivesocket-core/src/main/java/io/reactivesocket/ClientReactiveSocket.java @@ -301,7 +301,7 @@ private void handleFrame(int streamId, FrameType type, Frame frame) { } private void handleMissingResponseProcessor(int streamId, FrameType type, Frame frame) { - if (!streamIdSupplier.isValid(streamId)) { + if (!streamIdSupplier.isBeforeOrCurrent(streamId)) { if (type == FrameType.ERROR) { // message for stream that has never existed, we have a problem with // the overall connection and must tear down diff --git a/reactivesocket-core/src/main/java/io/reactivesocket/StreamIdSupplier.java b/reactivesocket-core/src/main/java/io/reactivesocket/StreamIdSupplier.java index a11f77fb2..931fb9673 100644 --- a/reactivesocket-core/src/main/java/io/reactivesocket/StreamIdSupplier.java +++ b/reactivesocket-core/src/main/java/io/reactivesocket/StreamIdSupplier.java @@ -29,15 +29,15 @@ public synchronized int nextStreamId() { return streamId; } - public synchronized boolean isValid(int streamId) { - return this.streamId < streamId; + public synchronized boolean isBeforeOrCurrent(int streamId) { + return this.streamId >= streamId && streamId > 0; } public static StreamIdSupplier clientSupplier() { - return new StreamIdSupplier(1); + return new StreamIdSupplier(-1); } public static StreamIdSupplier serverSupplier() { - return new StreamIdSupplier(2); + return new StreamIdSupplier(0); } } diff --git a/reactivesocket-core/src/test/java/io/reactivesocket/StreamIdSupplierTest.java b/reactivesocket-core/src/test/java/io/reactivesocket/StreamIdSupplierTest.java new file mode 100644 index 000000000..43b504175 --- /dev/null +++ b/reactivesocket-core/src/test/java/io/reactivesocket/StreamIdSupplierTest.java @@ -0,0 +1,69 @@ +package io.reactivesocket; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class StreamIdSupplierTest { + @Test + public void testClientSequence() { + StreamIdSupplier s = StreamIdSupplier.clientSupplier(); + assertEquals(1, s.nextStreamId()); + assertEquals(3, s.nextStreamId()); + assertEquals(5, s.nextStreamId()); + } + + @Test + public void testServerSequence() { + StreamIdSupplier s = StreamIdSupplier.serverSupplier(); + assertEquals(2, s.nextStreamId()); + assertEquals(4, s.nextStreamId()); + assertEquals(6, s.nextStreamId()); + } + + @Test + public void testClientIsValid() { + StreamIdSupplier s = StreamIdSupplier.clientSupplier(); + + assertFalse(s.isBeforeOrCurrent(1)); + assertFalse(s.isBeforeOrCurrent(3)); + + s.nextStreamId(); + assertTrue(s.isBeforeOrCurrent(1)); + assertFalse(s.isBeforeOrCurrent(3)); + + s.nextStreamId(); + assertTrue(s.isBeforeOrCurrent(3)); + + // negative + assertFalse(s.isBeforeOrCurrent(-1)); + // connection + assertFalse(s.isBeforeOrCurrent(0)); + // server also accepted (checked externally) + assertTrue(s.isBeforeOrCurrent(2)); + } + + @Test + public void testServerIsValid() { + StreamIdSupplier s = StreamIdSupplier.serverSupplier(); + + assertFalse(s.isBeforeOrCurrent(2)); + assertFalse(s.isBeforeOrCurrent(4)); + + s.nextStreamId(); + assertTrue(s.isBeforeOrCurrent(2)); + assertFalse(s.isBeforeOrCurrent(4)); + + s.nextStreamId(); + assertTrue(s.isBeforeOrCurrent(4)); + + // negative + assertFalse(s.isBeforeOrCurrent(-2)); + // connection + assertFalse(s.isBeforeOrCurrent(0)); + // client also accepted (checked externally) + assertTrue(s.isBeforeOrCurrent(1)); + } +}