diff --git a/README.md b/README.md index dc02dc20f..32e9b89e7 100644 --- a/README.md +++ b/README.md @@ -68,6 +68,17 @@ Please see unit test for examples of this behavior. and `testMaxPayloadJs` in [JetStreamPubTests.cs](src/test/java/io/nats/client/impl/JetStreamPubTests.java) +#### Version 2.16.8 Websocket Support + +As of version 2.16.8 Websocket (`ws` and `wss`) protocols are supported for connecting to the server. +For instance, your server bootstrap url might be `ws://my-nats-host:80` or `wss://my-nats-host:443`. + +Your server must be properly configured for websocket, see the NATS.IO docs +[WebSocket Configuration Example](https://docs.nats.io/running-a-nats-service/configuration/websocket/websocket_conf) +for more information. + +If you use secure websockets (wss), your connection must be securely configured in the same way you would configure a `tls` connection. + #### Version 2.16.0 Consumer Create This release by default will use a new JetStream consumer create API when interacting with nats-server version 2.9.0 or higher. diff --git a/src/main/java/io/nats/client/BaseConsumerContext.java b/src/main/java/io/nats/client/BaseConsumerContext.java index 157fc2902..30735fdd9 100644 --- a/src/main/java/io/nats/client/BaseConsumerContext.java +++ b/src/main/java/io/nats/client/BaseConsumerContext.java @@ -35,7 +35,7 @@ public interface BaseConsumerContext { /** * Read the next message with provided max wait - * @param maxWait duration of max wait + * @param maxWait duration of max wait. Cannot be less than {@value BaseConsumeOptions#MIN_EXPIRES_MILLS} milliseconds. * @return the next message or null if the max wait expires * @throws IOException covers various communication issues with the NATS * server such as timeout or interruption @@ -48,7 +48,7 @@ public interface BaseConsumerContext { /** * Read the next message with provided max wait - * @param maxWaitMillis the max wait value in milliseconds + * @param maxWaitMillis the max wait value in milliseconds. Cannot be less than {@value BaseConsumeOptions#MIN_EXPIRES_MILLS} milliseconds. * @return the next message or null if the max wait expires * @throws IOException covers various communication issues with the NATS * server such as timeout or interruption diff --git a/src/main/java/io/nats/client/impl/NatsConsumerContext.java b/src/main/java/io/nats/client/impl/NatsConsumerContext.java index b04dd23a1..bea6eece4 100644 --- a/src/main/java/io/nats/client/impl/NatsConsumerContext.java +++ b/src/main/java/io/nats/client/impl/NatsConsumerContext.java @@ -140,9 +140,7 @@ public String getConsumerName() { */ @Override public ConsumerInfo getConsumerInfo() throws IOException, JetStreamApiException { - if (consumerName != null) { - cachedConsumerInfo = streamCtx.jsm.getConsumerInfo(streamCtx.streamName, cachedConsumerInfo.getName()); - } + cachedConsumerInfo = streamCtx.jsm.getConsumerInfo(streamCtx.streamName, consumerName); return cachedConsumerInfo; } diff --git a/src/main/java/io/nats/client/impl/NatsOrderedConsumerContext.java b/src/main/java/io/nats/client/impl/NatsOrderedConsumerContext.java index 887e60547..942895506 100644 --- a/src/main/java/io/nats/client/impl/NatsOrderedConsumerContext.java +++ b/src/main/java/io/nats/client/impl/NatsOrderedConsumerContext.java @@ -23,7 +23,8 @@ * Implementation of Ordered Consumer Context */ public class NatsOrderedConsumerContext implements OrderedConsumerContext { - NatsConsumerContext impl; + private NatsConsumerContext impl; + NatsOrderedConsumerContext(NatsStreamContext streamContext, OrderedConsumerConfiguration config) { impl = new NatsConsumerContext(streamContext, config); } diff --git a/src/test/java/io/nats/client/impl/JetStreamTestBase.java b/src/test/java/io/nats/client/impl/JetStreamTestBase.java index 7402f6a70..84ca20f97 100644 --- a/src/test/java/io/nats/client/impl/JetStreamTestBase.java +++ b/src/test/java/io/nats/client/impl/JetStreamTestBase.java @@ -79,8 +79,30 @@ public NatsMessage getTestMessage(String replyTo, String sid) { // ---------------------------------------------------------------------------------------------------- // Management // ---------------------------------------------------------------------------------------------------- - public static StreamInfo createMemoryStream(JetStreamManagement jsm, String streamName, String... subjects) - throws IOException, JetStreamApiException { + public static class CreateStreamResult { + public final String stream = stream(); + public final String subject = subject(); + public StreamInfo si; + + public CreateStreamResult info(StreamInfo si) { + this.si = si; + return this; + } + } + + public static CreateStreamResult createMemoryStream(JetStreamManagement jsm) throws IOException, JetStreamApiException { + CreateStreamResult csr = new CreateStreamResult(); + return csr.info(createMemoryStream(jsm, csr.stream, csr.subject)); + } + + public static StreamInfo createMemoryStream(JetStreamManagement jsm, String streamName, String... subjects) throws IOException, JetStreamApiException { + if (streamName == null) { + streamName = stream(); + } + + if (subjects == null || subjects.length == 0) { + subjects = new String[]{subject()}; + } StreamConfiguration sc = StreamConfiguration.builder() .name(streamName) @@ -90,8 +112,13 @@ public static StreamInfo createMemoryStream(JetStreamManagement jsm, String stre return jsm.addStream(sc); } + public static CreateStreamResult createMemoryStream(Connection nc) + throws IOException, JetStreamApiException { + return createMemoryStream(nc.jetStreamManagement()); + } + public static StreamInfo createMemoryStream(Connection nc, String streamName, String... subjects) - throws IOException, JetStreamApiException { + throws IOException, JetStreamApiException { return createMemoryStream(nc.jetStreamManagement(), streamName, subjects); } diff --git a/src/test/java/io/nats/client/impl/SimplificationTests.java b/src/test/java/io/nats/client/impl/SimplificationTests.java index e9ce5da53..c40abc67f 100644 --- a/src/test/java/io/nats/client/impl/SimplificationTests.java +++ b/src/test/java/io/nats/client/impl/SimplificationTests.java @@ -45,51 +45,63 @@ public void testStreamContext() throws Exception { assertThrows(JetStreamApiException.class, () -> nc.getStreamContext(STREAM, JetStreamOptions.DEFAULT_JS_OPTIONS)); assertThrows(JetStreamApiException.class, () -> js.getStreamContext(STREAM)); - createMemoryStream(jsm, STREAM, SUBJECT); - StreamContext streamContext = nc.getStreamContext(STREAM); - assertEquals(STREAM, streamContext.getStreamName()); - _testStreamContext(js, streamContext); - - jsm.deleteStream(STREAM); - - createMemoryStream(jsm, STREAM, SUBJECT); - streamContext = js.getStreamContext(STREAM); - assertEquals(STREAM, streamContext.getStreamName()); - _testStreamContext(js, streamContext); + CreateStreamResult csr = createMemoryStream(jsm); + StreamContext streamContext = nc.getStreamContext(csr.stream); + assertEquals(csr.stream, streamContext.getStreamName()); + _testStreamContext(js, csr, streamContext); + + csr = createMemoryStream(jsm); + streamContext = js.getStreamContext(csr.stream); + assertEquals(csr.stream, streamContext.getStreamName()); + _testStreamContext(js, csr, streamContext); }); } - private static void _testStreamContext(JetStream js, StreamContext streamContext) throws IOException, JetStreamApiException { - assertThrows(JetStreamApiException.class, () -> streamContext.getConsumerContext(DURABLE)); - assertThrows(JetStreamApiException.class, () -> streamContext.deleteConsumer(DURABLE)); + private static void _testStreamContext(JetStream js, CreateStreamResult csr, StreamContext streamContext) throws IOException, JetStreamApiException { + String durable = durable(); + assertThrows(JetStreamApiException.class, () -> streamContext.getConsumerContext(durable)); + assertThrows(JetStreamApiException.class, () -> streamContext.deleteConsumer(durable)); - ConsumerConfiguration cc = ConsumerConfiguration.builder().durable(DURABLE).build(); + ConsumerConfiguration cc = ConsumerConfiguration.builder().durable(durable).build(); ConsumerContext consumerContext = streamContext.createOrUpdateConsumer(cc); ConsumerInfo ci = consumerContext.getConsumerInfo(); - assertEquals(STREAM, ci.getStreamName()); - assertEquals(DURABLE, ci.getName()); + assertEquals(csr.stream, ci.getStreamName()); + assertEquals(durable, ci.getName()); - ci = streamContext.getConsumerInfo(DURABLE); + ci = streamContext.getConsumerInfo(durable); assertNotNull(ci); - assertEquals(STREAM, ci.getStreamName()); - assertEquals(DURABLE, ci.getName()); + assertEquals(csr.stream, ci.getStreamName()); + assertEquals(durable, ci.getName()); assertEquals(1, streamContext.getConsumerNames().size()); assertEquals(1, streamContext.getConsumers().size()); - assertNotNull(streamContext.getConsumerContext(DURABLE)); - streamContext.deleteConsumer(DURABLE); + consumerContext = streamContext.getConsumerContext(durable); + assertNotNull(consumerContext); + assertEquals(durable, consumerContext.getConsumerName()); + + ci = consumerContext.getConsumerInfo(); + assertNotNull(ci); + assertEquals(csr.stream, ci.getStreamName()); + assertEquals(durable, ci.getName()); - assertThrows(JetStreamApiException.class, () -> streamContext.getConsumerContext(DURABLE)); - assertThrows(JetStreamApiException.class, () -> streamContext.deleteConsumer(DURABLE)); + ci = consumerContext.getCachedConsumerInfo(); + assertNotNull(ci); + assertEquals(csr.stream, ci.getStreamName()); + assertEquals(durable, ci.getName()); + + streamContext.deleteConsumer(durable); + + assertThrows(JetStreamApiException.class, () -> streamContext.getConsumerContext(durable)); + assertThrows(JetStreamApiException.class, () -> streamContext.deleteConsumer(durable)); // coverage - js.publish(SUBJECT, "one".getBytes()); - js.publish(SUBJECT, "two".getBytes()); - js.publish(SUBJECT, "three".getBytes()); - js.publish(SUBJECT, "four".getBytes()); - js.publish(SUBJECT, "five".getBytes()); - js.publish(SUBJECT, "six".getBytes()); + js.publish(csr.subject, "one".getBytes()); + js.publish(csr.subject, "two".getBytes()); + js.publish(csr.subject, "three".getBytes()); + js.publish(csr.subject, "four".getBytes()); + js.publish(csr.subject, "five".getBytes()); + js.publish(csr.subject, "six".getBytes()); assertTrue(streamContext.deleteMessage(3)); assertTrue(streamContext.deleteMessage(4, true)); @@ -97,13 +109,13 @@ private static void _testStreamContext(JetStream js, StreamContext streamContext MessageInfo mi = streamContext.getMessage(1); assertEquals(1, mi.getSeq()); - mi = streamContext.getFirstMessage(SUBJECT); + mi = streamContext.getFirstMessage(csr.subject); assertEquals(1, mi.getSeq()); - mi = streamContext.getLastMessage(SUBJECT); + mi = streamContext.getLastMessage(csr.subject); assertEquals(6, mi.getSeq()); - mi = streamContext.getNextMessage(3, SUBJECT); + mi = streamContext.getNextMessage(3, csr.subject); assertEquals(5, mi.getSeq()); assertNotNull(streamContext.getStreamInfo()); @@ -112,62 +124,86 @@ private static void _testStreamContext(JetStream js, StreamContext streamContext streamContext.purge(PurgeOptions.builder().sequence(5).build()); assertThrows(JetStreamApiException.class, () -> streamContext.getMessage(1)); - mi = streamContext.getFirstMessage(SUBJECT); + mi = streamContext.getFirstMessage(csr.subject); assertEquals(5, mi.getSeq()); streamContext.purge(); - assertThrows(JetStreamApiException.class, () -> streamContext.getFirstMessage(SUBJECT)); + assertThrows(JetStreamApiException.class, () -> streamContext.getFirstMessage(csr.subject)); } + static int FETCH_EPHEMERAL = 1; + static int FETCH_DURABLE = 2; + static int FETCH_ORDERED = 3; @Test public void testFetch() throws Exception { runInJsServer(this::mustBeAtLeast291, nc -> { - createDefaultTestStream(nc); + CreateStreamResult csr = createMemoryStream(nc); JetStream js = nc.jetStream(); for (int x = 1; x <= 20; x++) { - js.publish(SUBJECT, ("test-fetch-msg-" + x).getBytes()); + js.publish(csr.subject, ("test-fetch-msg-" + x).getBytes()); } - // 1. Different fetch sizes demonstrate expiration behavior + for (int f = FETCH_EPHEMERAL; f <= FETCH_ORDERED; f++) { + // 1. Different fetch sizes demonstrate expiration behavior - // 1A. equal number of messages than the fetch size - _testFetch("1A", nc, 20, 0, 20); + // 1A. equal number of messages than the fetch size + _testFetch("1A", nc, csr, 20, 0, 20, f); - // 1B. more messages than the fetch size - _testFetch("1B", nc, 10, 0, 10); + // 1B. more messages than the fetch size + _testFetch("1B", nc, csr, 10, 0, 10, f); - // 1C. fewer messages than the fetch size - _testFetch("1C", nc, 40, 0, 40); + // 1C. fewer messages than the fetch size + _testFetch("1C", nc, csr, 40, 0, 40, f); - // 1D. simple-consumer-40msgs was created in 1C and has no messages available - _testFetch("1D", nc, 40, 0, 40); + // 1D. simple-consumer-40msgs was created in 1C and has no messages available + _testFetch("1D", nc, csr, 40, 0, 40, f); - // 2. Different max bytes sizes demonstrate expiration behavior - // - each test message is approximately 100 bytes + // 2. Different max bytes sizes demonstrate expiration behavior + // - each test message is approximately 100 bytes - // 2A. max bytes is reached before message count - _testFetch("2A", nc, 0, 750, 20); + // 2A. max bytes is reached before message count + _testFetch("2A", nc, csr, 0, 750, 20, f); - // 2B. fetch size is reached before byte count - _testFetch("2B", nc, 10, 1500, 10); + // 2B. fetch size is reached before byte count + _testFetch("2B", nc, csr, 10, 1500, 10, f); - // 2C. fewer bytes than the byte count - _testFetch("2C", nc, 0, 3000, 40); + if (f > FETCH_EPHEMERAL) { + // 2C. fewer bytes than the byte count + _testFetch("2C", nc, csr, 0, 3000, 40, f); + } + } }); } - private static void _testFetch(String label, Connection nc, int maxMessages, int maxBytes, int testAmount) throws Exception { + private static void _testFetch(String label, Connection nc, CreateStreamResult csr, int maxMessages, int maxBytes, int testAmount, int fetchType) throws Exception { JetStreamManagement jsm = nc.jetStreamManagement(); JetStream js = nc.jetStream(); - String name = generateConsumerName(maxMessages, maxBytes); + StreamContext sc = js.getStreamContext(csr.stream); - // Pre define a consumer - ConsumerConfiguration cc = ConsumerConfiguration.builder().durable(name).build(); - jsm.addOrUpdateConsumer(STREAM, cc); + BaseConsumerContext consumerContext; + if (fetchType == FETCH_ORDERED) { + consumerContext = sc.createOrderedConsumer(new OrderedConsumerConfiguration()); + // coverage + } + else { + // Pre define a consumer + String name = generateConsumerName(maxMessages, maxBytes); + ConsumerConfiguration.Builder builder = ConsumerConfiguration.builder(); + ConsumerConfiguration cc; + if (fetchType == FETCH_DURABLE) { + name = name + "D"; + cc = builder.durable(name).build(); + } + else { + name = name + "E"; + cc = builder.name(name).inactiveThreshold(10_000).build(); + } + jsm.addOrUpdateConsumer(csr.stream, cc); - // Consumer[Context] - ConsumerContext consumerContext = js.getConsumerContext(STREAM, name); + // Consumer[Context] + consumerContext = sc.getConsumerContext(name); + } // Custom consume options FetchConsumeOptions.Builder builder = FetchConsumeOptions.builder().expiresIn(2000); @@ -352,22 +388,32 @@ public void testNext() throws Exception { JetStreamManagement jsm = nc.jetStreamManagement(); JetStream js = nc.jetStream(); - createDefaultTestStream(jsm); - jsPublish(js, SUBJECT, 4); + CreateStreamResult csr = createMemoryStream(jsm); + jsPublish(js, csr.subject, 4); + + String name = name(); // Pre define a consumer - ConsumerConfiguration cc = ConsumerConfiguration.builder().durable(NAME).build(); - jsm.addOrUpdateConsumer(STREAM, cc); + ConsumerConfiguration cc = ConsumerConfiguration.builder().durable(name).build(); + jsm.addOrUpdateConsumer(csr.stream, cc); // Consumer[Context] - ConsumerContext consumerContext = js.getConsumerContext(STREAM, NAME); - - assertThrows(IllegalArgumentException.class, () -> consumerContext.next(1)); + ConsumerContext consumerContext = js.getConsumerContext(csr.stream, name); + assertThrows(IllegalArgumentException.class, () -> consumerContext.next(1)); // max wait too small assertNotNull(consumerContext.next(1000)); assertNotNull(consumerContext.next(Duration.ofMillis(1000))); assertNotNull(consumerContext.next(null)); assertNotNull(consumerContext.next()); assertNull(consumerContext.next(1000)); + + StreamContext sc = js.getStreamContext(csr.stream); + OrderedConsumerContext occ = sc.createOrderedConsumer(new OrderedConsumerConfiguration()); + assertThrows(IllegalArgumentException.class, () -> occ.next(1)); // max wait too small + assertNotNull(occ.next(1000)); + assertNotNull(occ.next(Duration.ofMillis(1000))); + assertNotNull(occ.next(null)); + assertNotNull(occ.next()); + assertNull(occ.next(1000)); }); } @@ -376,38 +422,38 @@ public void testCoverage() throws Exception { runInJsServer(this::mustBeAtLeast291, nc -> { JetStreamManagement jsm = nc.jetStreamManagement(); - createDefaultTestStream(jsm); + CreateStreamResult csr = createMemoryStream(jsm); JetStream js = nc.jetStream(); // Pre define a consumer - jsm.addOrUpdateConsumer(STREAM, ConsumerConfiguration.builder().durable(name(1)).build()); - jsm.addOrUpdateConsumer(STREAM, ConsumerConfiguration.builder().durable(name(2)).build()); - jsm.addOrUpdateConsumer(STREAM, ConsumerConfiguration.builder().durable(name(3)).build()); - jsm.addOrUpdateConsumer(STREAM, ConsumerConfiguration.builder().durable(name(4)).build()); + jsm.addOrUpdateConsumer(csr.stream, ConsumerConfiguration.builder().durable(name(1)).build()); + jsm.addOrUpdateConsumer(csr.stream, ConsumerConfiguration.builder().durable(name(2)).build()); + jsm.addOrUpdateConsumer(csr.stream, ConsumerConfiguration.builder().durable(name(3)).build()); + jsm.addOrUpdateConsumer(csr.stream, ConsumerConfiguration.builder().durable(name(4)).build()); // Stream[Context] - StreamContext sctx1 = nc.getStreamContext(STREAM); - nc.getStreamContext(STREAM, JetStreamOptions.DEFAULT_JS_OPTIONS); - js.getStreamContext(STREAM); + StreamContext sctx1 = nc.getStreamContext(csr.stream); + nc.getStreamContext(csr.stream, JetStreamOptions.DEFAULT_JS_OPTIONS); + js.getStreamContext(csr.stream); // Consumer[Context] - ConsumerContext cctx1 = nc.getConsumerContext(STREAM, name(1)); - ConsumerContext cctx2 = nc.getConsumerContext(STREAM, name(2), JetStreamOptions.DEFAULT_JS_OPTIONS); - ConsumerContext cctx3 = js.getConsumerContext(STREAM, name(3)); + ConsumerContext cctx1 = nc.getConsumerContext(csr.stream, name(1)); + ConsumerContext cctx2 = nc.getConsumerContext(csr.stream, name(2), JetStreamOptions.DEFAULT_JS_OPTIONS); + ConsumerContext cctx3 = js.getConsumerContext(csr.stream, name(3)); ConsumerContext cctx4 = sctx1.getConsumerContext(name(4)); ConsumerContext cctx5 = sctx1.createOrUpdateConsumer(ConsumerConfiguration.builder().durable(name(5)).build()); ConsumerContext cctx6 = sctx1.createOrUpdateConsumer(ConsumerConfiguration.builder().durable(name(6)).build()); - closeConsumer(cctx1.iterate(), name(1), true); - closeConsumer(cctx2.iterate(ConsumeOptions.DEFAULT_CONSUME_OPTIONS), name(2), true); - closeConsumer(cctx3.consume(m -> {}), name(3), true); - closeConsumer(cctx4.consume(ConsumeOptions.DEFAULT_CONSUME_OPTIONS, m -> {}), name(4), true); - closeConsumer(cctx5.fetchMessages(1), name(5), false); - closeConsumer(cctx6.fetchBytes(1000), name(6), false); + after(cctx1.iterate(), name(1), true); + after(cctx2.iterate(ConsumeOptions.DEFAULT_CONSUME_OPTIONS), name(2), true); + after(cctx3.consume(m -> {}), name(3), true); + after(cctx4.consume(ConsumeOptions.DEFAULT_CONSUME_OPTIONS, m -> {}), name(4), true); + after(cctx5.fetchMessages(1), name(5), false); + after(cctx6.fetchBytes(1000), name(6), false); }); } - private void closeConsumer(MessageConsumer con, String name, boolean doStop) throws Exception { + private void after(MessageConsumer con, String name, boolean doStop) throws Exception { ConsumerInfo ci = con.getConsumerInfo(); assertEquals(name, ci.getName()); if (doStop) { @@ -547,33 +593,39 @@ protected Boolean beforeQueueProcessorImpl(NatsMessage msg) { } @Test - public void testOrderedActives() throws Exception { + public void testOrderedBehaviors() throws Exception { runInJsServer(this::mustBeAtLeast291, nc -> { // Setup JetStream js = nc.jetStream(); JetStreamManagement jsm = nc.jetStreamManagement(); - String stream = stream("soa"); - String subject = subject("uoa"); - createMemoryStream(jsm, stream, subject); - - StreamContext sc = js.getStreamContext(stream); - - jsPublish(js, subject, 101, 6); - - OrderedConsumerConfiguration occ = new OrderedConsumerConfiguration().filterSubject(subject); // Get this in place before subscriptions are made ((NatsJetStream)js)._pullOrderedMessageManagerFactory = OrderedPullNextTestDropSimulator::new; - testOrderedActiveNext(sc, occ); + + CreateStreamResult csr = createMemoryStream(jsm); + StreamContext sc = js.getStreamContext(csr.stream); + jsPublish(js, csr.subject, 101, 6); + testOrderedBehaviorNext(sc, new OrderedConsumerConfiguration().filterSubject(csr.subject)); + try { jsm.deleteStream(csr.stream); } catch (Exception ignore) {}; // Get this in place before subscriptions are made ((NatsJetStream)js)._pullOrderedMessageManagerFactory = OrderedPullTestDropSimulator::new; - testOrderedActiveFetch(sc, occ); - testOrderedActiveIterable(sc, occ); + + csr = createMemoryStream(jsm); + sc = js.getStreamContext(csr.stream); + jsPublish(js, csr.subject, 101, 6); + testOrderedBehaviorFetch(sc, new OrderedConsumerConfiguration().filterSubject(csr.subject)); + try { jsm.deleteStream(csr.stream); } catch (Exception ignore) {}; + + csr = createMemoryStream(jsm); + sc = js.getStreamContext(csr.stream); + jsPublish(js, csr.subject, 101, 6); + testOrderedBehaviorIterable(sc, new OrderedConsumerConfiguration().filterSubject(csr.subject)); + try { jsm.deleteStream(csr.stream); } catch (Exception ignore) {}; }); } - private static void testOrderedActiveNext(StreamContext sc, OrderedConsumerConfiguration occ) throws Exception { + private static void testOrderedBehaviorNext(StreamContext sc, OrderedConsumerConfiguration occ) throws Exception { OrderedConsumerContext ctx = sc.createOrderedConsumer(occ); // Loop through the messages to make sure I get stream sequence 1 to 6 int expectedStreamSeq = 1; @@ -587,7 +639,7 @@ private static void testOrderedActiveNext(StreamContext sc, OrderedConsumerConfi } } - private static void testOrderedActiveFetch(StreamContext sc, OrderedConsumerConfiguration occ) throws Exception { + private static void testOrderedBehaviorFetch(StreamContext sc, OrderedConsumerConfiguration occ) throws Exception { OrderedConsumerContext ctx = sc.createOrderedConsumer(occ); try (FetchConsumer fcon = ctx.fetchMessages(6)) { // Loop through the messages to make sure I get stream sequence 1 to 6 @@ -603,7 +655,7 @@ private static void testOrderedActiveFetch(StreamContext sc, OrderedConsumerConf } } - private static void testOrderedActiveIterable(StreamContext sc, OrderedConsumerConfiguration occ) throws Exception { + private static void testOrderedBehaviorIterable(StreamContext sc, OrderedConsumerConfiguration occ) throws Exception { OrderedConsumerContext ctx = sc.createOrderedConsumer(occ); try (IterableConsumer icon = ctx.iterate()) { // Loop through the messages to make sure I get stream sequence 1 to 6 @@ -722,9 +774,9 @@ public void testOrderedMultipleWays() throws Exception { seq++; } - m = fc.nextMessage(); - assertNull(m); + assertNull(fc.nextMessage()); assertTrue(fc.isFinished()); + assertNull(fc.nextMessage()); // just some coverage } // can do others now diff --git a/src/test/java/io/nats/client/support/JwtUtilsTests.java b/src/test/java/io/nats/client/support/JwtUtilsTests.java index c706ec217..6850d3446 100644 --- a/src/test/java/io/nats/client/support/JwtUtilsTests.java +++ b/src/test/java/io/nats/client/support/JwtUtilsTests.java @@ -21,19 +21,34 @@ import java.util.List; import static io.nats.client.support.JwtUtils.*; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertThrows; +import static io.nats.client.utils.TestBase.sleep; +import static org.junit.jupiter.api.Assertions.*; public class JwtUtilsTests { + static NKey USER_KEY = NKey.fromSeed("SUAGL3KX4ZBBD53BNNLSHGAAGCMXSEYZ6NTYUBUCPZQGHYNK3ZRQBUDPRY".toCharArray()); + static NKey SIGNING_KEY = NKey.fromSeed("SAANJIBNEKGCRUWJCPIWUXFBFJLR36FJTFKGBGKAT7AQXH2LVFNQWZJMQU".toCharArray()); + static String ACCOUNT_ID = "ACXZRALIL22WRETDRXYKOYDB7XC3E7MBSVUSUMFACO6OM5VPRNFMOOO6"; @Test public void issueUserJWTSuccessMinimal() throws Exception { - NKey userKey = NKey.fromSeed("SUAGL3KX4ZBBD53BNNLSHGAAGCMXSEYZ6NTYUBUCPZQGHYNK3ZRQBUDPRY".toCharArray()); - NKey signingKey = NKey.fromSeed("SAANJIBNEKGCRUWJCPIWUXFBFJLR36FJTFKGBGKAT7AQXH2LVFNQWZJMQU".toCharArray()); - String accountId = "ACXZRALIL22WRETDRXYKOYDB7XC3E7MBSVUSUMFACO6OM5VPRNFMOOO6"; - String jwt = issueUserJWT(signingKey, accountId, new String(userKey.getPublicKey()), null, null, null, 1633043378, "audience"); + String expectedClaimBody = "{\"aud\":\"audience\",\"jti\":\"PASRIRDVH2NWAPOKCO7TFIJVWI2OESTOH4CJ2PSGYH77YPQRXPVA\",\"iat\":1633043378,\"iss\":\"ADQ4BYM5KICR5OXDSP3S3WVJ5CYEORGQKT72SVRF2ZDVA7LTFKMCIPGY\",\"name\":\"UA6KOMQ67XOE3FHE37W4OXADVXVYISBNLTBUT2LSY5VFKAIJ7CRDR2RZ\",\"sub\":\"UA6KOMQ67XOE3FHE37W4OXADVXVYISBNLTBUT2LSY5VFKAIJ7CRDR2RZ\",\"nats\":{\"issuer_account\":\"ACXZRALIL22WRETDRXYKOYDB7XC3E7MBSVUSUMFACO6OM5VPRNFMOOO6\",\"type\":\"user\",\"version\":2,\"subs\":-1,\"data\":-1,\"payload\":-1}}"; + String expectedCred = "-----BEGIN NATS USER JWT-----\n" + + "eyJ0eXAiOiJKV1QiLCAiYWxnIjoiZWQyNTUxOS1ua2V5In0.eyJhdWQiOiJhdWRpZW5jZSIsImp0aSI6IlBBU1JJUkRWSDJOV0FQT0tDTzdURklKVldJMk9FU1RPSDRDSjJQU0dZSDc3WVBRUlhQVkEiLCJpYXQiOjE2MzMwNDMzNzgsImlzcyI6IkFEUTRCWU01S0lDUjVPWERTUDNTM1dWSjVDWUVPUkdRS1Q3MlNWUkYyWkRWQTdMVEZLTUNJUEdZIiwibmFtZSI6IlVBNktPTVE2N1hPRTNGSEUzN1c0T1hBRFZYVllJU0JOTFRCVVQyTFNZNVZGS0FJSjdDUkRSMlJaIiwic3ViIjoiVUE2S09NUTY3WE9FM0ZIRTM3VzRPWEFEVlhWWUlTQk5MVEJVVDJMU1k1VkZLQUlKN0NSRFIyUloiLCJuYXRzIjp7Imlzc3Vlcl9hY2NvdW50IjoiQUNYWlJBTElMMjJXUkVURFJYWUtPWURCN1hDM0U3TUJTVlVTVU1GQUNPNk9NNVZQUk5GTU9PTzYiLCJ0eXBlIjoidXNlciIsInZlcnNpb24iOjIsInN1YnMiOi0xLCJkYXRhIjotMSwicGF5bG9hZCI6LTF9fQ.nt_bErX7UuqDSxC8NUORaB0r4IS_33Wds1vV_o0HRI-BwE9UxM-zAFtq43o3-d98s6u1jASgVXp0h81om8mVDw\n" + + "------END NATS USER JWT------\n" + + "\n" + + "************************* IMPORTANT *************************\n" + + " NKEY Seed printed below can be used to sign and prove identity.\n" + + " NKEYs are sensitive and should be treated as secrets.\n" + + "\n" + + "-----BEGIN USER NKEY SEED-----\n" + + "SUAGL3KX4ZBBD53BNNLSHGAAGCMXSEYZ6NTYUBUCPZQGHYNK3ZRQBUDPRY\n" + + "------END USER NKEY SEED------\n" + + "\n" + + "*************************************************************\n"; + + String jwt = issueUserJWT(SIGNING_KEY, ACCOUNT_ID, new String(USER_KEY.getPublicKey()), null, null, null, 1633043378, "audience"); String claimBody = getClaimBody(jwt); - String cred = String.format(NATS_USER_JWT_FORMAT, jwt, new String(userKey.getSeed())); + String cred = String.format(NATS_USER_JWT_FORMAT, jwt, new String(USER_KEY.getSeed())); /* Formatted Claim Body: { @@ -53,9 +68,15 @@ public void issueUserJWTSuccessMinimal() throws Exception { } } */ - String expectedClaimBody = "{\"aud\":\"audience\",\"jti\":\"PASRIRDVH2NWAPOKCO7TFIJVWI2OESTOH4CJ2PSGYH77YPQRXPVA\",\"iat\":1633043378,\"iss\":\"ADQ4BYM5KICR5OXDSP3S3WVJ5CYEORGQKT72SVRF2ZDVA7LTFKMCIPGY\",\"name\":\"UA6KOMQ67XOE3FHE37W4OXADVXVYISBNLTBUT2LSY5VFKAIJ7CRDR2RZ\",\"sub\":\"UA6KOMQ67XOE3FHE37W4OXADVXVYISBNLTBUT2LSY5VFKAIJ7CRDR2RZ\",\"nats\":{\"issuer_account\":\"ACXZRALIL22WRETDRXYKOYDB7XC3E7MBSVUSUMFACO6OM5VPRNFMOOO6\",\"type\":\"user\",\"version\":2,\"subs\":-1,\"data\":-1,\"payload\":-1}}"; + assertEquals(expectedClaimBody, claimBody); + assertEquals(expectedCred, cred); + } + + @Test + public void issueUserJWTSuccessMinimalCoverageNonAudienceApi() throws Exception { + String expectedClaimBody = "{\"jti\":\"WQYMEEISPFLDXLNAOEF3TC2FWLZCKVCPQZWMDBDCGT3ZTSSHSBYA\",\"iat\":1633043378,\"iss\":\"ADQ4BYM5KICR5OXDSP3S3WVJ5CYEORGQKT72SVRF2ZDVA7LTFKMCIPGY\",\"name\":\"UA6KOMQ67XOE3FHE37W4OXADVXVYISBNLTBUT2LSY5VFKAIJ7CRDR2RZ\",\"sub\":\"UA6KOMQ67XOE3FHE37W4OXADVXVYISBNLTBUT2LSY5VFKAIJ7CRDR2RZ\",\"nats\":{\"issuer_account\":\"ACXZRALIL22WRETDRXYKOYDB7XC3E7MBSVUSUMFACO6OM5VPRNFMOOO6\",\"type\":\"user\",\"version\":2,\"subs\":-1,\"data\":-1,\"payload\":-1}}"; String expectedCred = "-----BEGIN NATS USER JWT-----\n" + - "eyJ0eXAiOiJKV1QiLCAiYWxnIjoiZWQyNTUxOS1ua2V5In0.eyJhdWQiOiJhdWRpZW5jZSIsImp0aSI6IlBBU1JJUkRWSDJOV0FQT0tDTzdURklKVldJMk9FU1RPSDRDSjJQU0dZSDc3WVBRUlhQVkEiLCJpYXQiOjE2MzMwNDMzNzgsImlzcyI6IkFEUTRCWU01S0lDUjVPWERTUDNTM1dWSjVDWUVPUkdRS1Q3MlNWUkYyWkRWQTdMVEZLTUNJUEdZIiwibmFtZSI6IlVBNktPTVE2N1hPRTNGSEUzN1c0T1hBRFZYVllJU0JOTFRCVVQyTFNZNVZGS0FJSjdDUkRSMlJaIiwic3ViIjoiVUE2S09NUTY3WE9FM0ZIRTM3VzRPWEFEVlhWWUlTQk5MVEJVVDJMU1k1VkZLQUlKN0NSRFIyUloiLCJuYXRzIjp7Imlzc3Vlcl9hY2NvdW50IjoiQUNYWlJBTElMMjJXUkVURFJYWUtPWURCN1hDM0U3TUJTVlVTVU1GQUNPNk9NNVZQUk5GTU9PTzYiLCJ0eXBlIjoidXNlciIsInZlcnNpb24iOjIsInN1YnMiOi0xLCJkYXRhIjotMSwicGF5bG9hZCI6LTF9fQ.nt_bErX7UuqDSxC8NUORaB0r4IS_33Wds1vV_o0HRI-BwE9UxM-zAFtq43o3-d98s6u1jASgVXp0h81om8mVDw\n" + + "eyJ0eXAiOiJKV1QiLCAiYWxnIjoiZWQyNTUxOS1ua2V5In0.eyJqdGkiOiJXUVlNRUVJU1BGTERYTE5BT0VGM1RDMkZXTFpDS1ZDUFFaV01EQkRDR1QzWlRTU0hTQllBIiwiaWF0IjoxNjMzMDQzMzc4LCJpc3MiOiJBRFE0QllNNUtJQ1I1T1hEU1AzUzNXVko1Q1lFT1JHUUtUNzJTVlJGMlpEVkE3TFRGS01DSVBHWSIsIm5hbWUiOiJVQTZLT01RNjdYT0UzRkhFMzdXNE9YQURWWFZZSVNCTkxUQlVUMkxTWTVWRktBSUo3Q1JEUjJSWiIsInN1YiI6IlVBNktPTVE2N1hPRTNGSEUzN1c0T1hBRFZYVllJU0JOTFRCVVQyTFNZNVZGS0FJSjdDUkRSMlJaIiwibmF0cyI6eyJpc3N1ZXJfYWNjb3VudCI6IkFDWFpSQUxJTDIyV1JFVERSWFlLT1lEQjdYQzNFN01CU1ZVU1VNRkFDTzZPTTVWUFJORk1PT082IiwidHlwZSI6InVzZXIiLCJ2ZXJzaW9uIjoyLCJzdWJzIjotMSwiZGF0YSI6LTEsInBheWxvYWQiOi0xfX0.5Zq3wTy_5WeBYmFmYjI1OArCJ8IiJpFCflm5tnoWB7bYOOWoINR2t8a0i2fuGVYtFmDBAV8im_G2sibtlu53AQ\n" + "------END NATS USER JWT------\n" + "\n" + "************************* IMPORTANT *************************\n" + @@ -67,18 +88,44 @@ public void issueUserJWTSuccessMinimal() throws Exception { "------END USER NKEY SEED------\n" + "\n" + "*************************************************************\n"; + + String jwt = issueUserJWT(SIGNING_KEY, ACCOUNT_ID, new String(USER_KEY.getPublicKey()), null, null, null, 1633043378, null); + _testMinimalNoAudience(expectedClaimBody, expectedCred, USER_KEY, jwt); + + jwt = issueUserJWT(SIGNING_KEY, ACCOUNT_ID, new String(USER_KEY.getPublicKey()), null, null, null, 1633043378); + _testMinimalNoAudience(expectedClaimBody, expectedCred, USER_KEY, jwt); + } + + private static void _testMinimalNoAudience(String expectedClaimBody, String expectedCred, NKey userKey, String jwt) { + String claimBody = getClaimBody(jwt); + String cred = String.format(NATS_USER_JWT_FORMAT, jwt, new String(userKey.getSeed())); + /* + Formatted Claim Body: + { + "jti": "WQYMEEISPFLDXLNAOEF3TC2FWLZCKVCPQZWMDBDCGT3ZTSSHSBYA", + "iat": 1633043378, + "iss": "ADQ4BYM5KICR5OXDSP3S3WVJ5CYEORGQKT72SVRF2ZDVA7LTFKMCIPGY", + "name": "UA6KOMQ67XOE3FHE37W4OXADVXVYISBNLTBUT2LSY5VFKAIJ7CRDR2RZ", + "sub": "UA6KOMQ67XOE3FHE37W4OXADVXVYISBNLTBUT2LSY5VFKAIJ7CRDR2RZ", + "nats": { + "issuer_account": "ACXZRALIL22WRETDRXYKOYDB7XC3E7MBSVUSUMFACO6OM5VPRNFMOOO6", + "type": "user", + "version": 2, + "subs": -1, + "data": -1, + "payload": -1 + } + } + */ assertEquals(expectedClaimBody, claimBody); assertEquals(expectedCred, cred); } @Test public void issueUserJWTSuccessAllArgs() throws Exception { - NKey userKey = NKey.fromSeed("SUAGL3KX4ZBBD53BNNLSHGAAGCMXSEYZ6NTYUBUCPZQGHYNK3ZRQBUDPRY".toCharArray()); - NKey signingKey = NKey.fromSeed("SAANJIBNEKGCRUWJCPIWUXFBFJLR36FJTFKGBGKAT7AQXH2LVFNQWZJMQU".toCharArray()); - String accountId = "ACXZRALIL22WRETDRXYKOYDB7XC3E7MBSVUSUMFACO6OM5VPRNFMOOO6"; - String jwt = issueUserJWT(signingKey, accountId, new String(userKey.getPublicKey()), "name", Duration.ofSeconds(100), new String[]{"tag1", "tag\\two"}, 1633043378, "audience"); + String jwt = issueUserJWT(SIGNING_KEY, ACCOUNT_ID, new String(USER_KEY.getPublicKey()), "name", Duration.ofSeconds(100), new String[]{"tag1", "tag\\two"}, 1633043378, "audience"); String claimBody = getClaimBody(jwt); - String cred = String.format(NATS_USER_JWT_FORMAT, jwt, new String(userKey.getSeed())); + String cred = String.format(NATS_USER_JWT_FORMAT, jwt, new String(USER_KEY.getSeed())); /* Formatted Claim Body: { @@ -120,9 +167,6 @@ public void issueUserJWTSuccessAllArgs() throws Exception { @Test public void issueUserJWTSuccessCustom() throws Exception { - NKey userKey = NKey.fromSeed("SUAGL3KX4ZBBD53BNNLSHGAAGCMXSEYZ6NTYUBUCPZQGHYNK3ZRQBUDPRY".toCharArray()); - NKey signingKey = NKey.fromSeed("SAANJIBNEKGCRUWJCPIWUXFBFJLR36FJTFKGBGKAT7AQXH2LVFNQWZJMQU".toCharArray()); - UserClaim userClaim = new UserClaim("ACXZRALIL22WRETDRXYKOYDB7XC3E7MBSVUSUMFACO6OM5VPRNFMOOO6") .pub(new Permission() .allow(new String[] {"pub-allow-subject"}) @@ -132,9 +176,9 @@ public void issueUserJWTSuccessCustom() throws Exception { .deny(new String[] {"sub-deny-subject"})) .tags(new String[]{"tag1", "tag\\two"}); - String jwt = issueUserJWT(signingKey, new String(userKey.getPublicKey()), "custom", null, 1633043378, userClaim); + String jwt = issueUserJWT(SIGNING_KEY, new String(USER_KEY.getPublicKey()), "custom", null, 1633043378, userClaim); String claimBody = getClaimBody(jwt); - String cred = String.format(NATS_USER_JWT_FORMAT, jwt, new String(userKey.getSeed())); + String cred = String.format(NATS_USER_JWT_FORMAT, jwt, new String(USER_KEY.getSeed())); /* Formatted Claim Body: { @@ -183,17 +227,14 @@ public void issueUserJWTSuccessCustom() throws Exception { @Test public void issueUserJWTSuccessCustomLimits() throws Exception { - NKey userKey = NKey.fromSeed("SUAGL3KX4ZBBD53BNNLSHGAAGCMXSEYZ6NTYUBUCPZQGHYNK3ZRQBUDPRY".toCharArray()); - NKey signingKey = NKey.fromSeed("SAANJIBNEKGCRUWJCPIWUXFBFJLR36FJTFKGBGKAT7AQXH2LVFNQWZJMQU".toCharArray()); - UserClaim userClaim = new UserClaim("ACXZRALIL22WRETDRXYKOYDB7XC3E7MBSVUSUMFACO6OM5VPRNFMOOO6") .subs(1) .data(2) .payload(3); - String jwt = issueUserJWT(signingKey, new String(userKey.getPublicKey()), null, null, 1633043378, userClaim); + String jwt = issueUserJWT(SIGNING_KEY, new String(USER_KEY.getPublicKey()), null, null, 1633043378, userClaim); String claimBody = getClaimBody(jwt); - String cred = String.format(NATS_USER_JWT_FORMAT, jwt, new String(userKey.getSeed())); + String cred = String.format(NATS_USER_JWT_FORMAT, jwt, new String(USER_KEY.getSeed())); /* Formatted Claim Body: @@ -236,8 +277,7 @@ public void issueUserJWTBadSigningKey() { NKey userKey = NKey.fromSeed("SUAGL3KX4ZBBD53BNNLSHGAAGCMXSEYZ6NTYUBUCPZQGHYNK3ZRQBUDPRY".toCharArray()); // should be account, but this is a user key: NKey signingKey = NKey.fromSeed("SUAIW7IZ2YDQYLTE4FJ64ZBX7UMLCN57V6GHALKMUSMJCU5PJDNUO6BVUI".toCharArray()); - String accountId = "ACXZRALIL22WRETDRXYKOYDB7XC3E7MBSVUSUMFACO6OM5VPRNFMOOO6"; - IllegalArgumentException e = assertThrows(IllegalArgumentException.class, () -> issueUserJWT(signingKey, accountId, new String(userKey.getPublicKey()), null, null, null, 1633043378)); + IllegalArgumentException e = assertThrows(IllegalArgumentException.class, () -> issueUserJWT(signingKey, ACCOUNT_ID, new String(userKey.getPublicKey()), null, null, null, 1633043378)); assertEquals("issueUserJWT requires an account key for the signingKey parameter, but got USER", e.getMessage()); } @@ -255,8 +295,7 @@ public void issueUserJWTBadAccountId() { public void issueUserJWTBadPublicUserKey() { NKey userKey = NKey.fromSeed("SAADFHQTEKYBOCG4CPEPNAJ5FLRX4G4WTCNTAIOKN3LARLHGVKB4BRUHYY".toCharArray()); NKey signingKey = NKey.fromSeed("SAANJIBNEKGCRUWJCPIWUXFBFJLR36FJTFKGBGKAT7AQXH2LVFNQWZJMQU".toCharArray()); - String accountId = "ACXZRALIL22WRETDRXYKOYDB7XC3E7MBSVUSUMFACO6OM5VPRNFMOOO6"; - IllegalArgumentException e = assertThrows(IllegalArgumentException.class, () -> issueUserJWT(signingKey, accountId, new String(userKey.getPublicKey()), null, null, null, 1633043378)); + IllegalArgumentException e = assertThrows(IllegalArgumentException.class, () -> issueUserJWT(signingKey, ACCOUNT_ID, new String(userKey.getPublicKey()), null, null, null, 1633043378)); assertEquals("issueUserJWT requires a user key for the publicUserKey parameter, but got ACCOUNT", e.getMessage()); } @@ -283,6 +322,13 @@ public void testUserClaimJson() { assertEquals(FULL_JSON, uc.toJson()); } + @Test + public void testMiscCoverage() { + long seconds = JwtUtils.currentTimeSeconds(); + sleep(1000); + assertTrue(JwtUtils.currentTimeSeconds() > seconds); + } + private static final String BASIC_JSON = "{\"issuer_account\":\"test-issuer-account\",\"type\":\"user\",\"version\":2,\"subs\":-1,\"data\":-1,\"payload\":-1}"; private static final String FULL_JSON = "{\"issuer_account\":\"test-issuer-account\",\"tags\":[\"tag1\",\"tag2\"],\"type\":\"user\",\"version\":2,\"pub\":{\"allow\":[\"pa1\",\"pa2\"],\"deny\":[\"pd1\",\"pd2\"]},\"sub\":{\"allow\":[\"sa1\",\"sa2\"],\"deny\":[\"sd1\",\"sd2\"]},\"resp\":{\"max\":99,\"ttl\":999000000},\"src\":[\"src1\",\"src2\"],\"times\":[{\"start\":\"01:15:00\",\"end\":\"03:15:00\"}],\"times_location\":\"US\\/Eastern\",\"subs\":42,\"data\":43,\"payload\":44,\"bearer_token\":true,\"allowed_connection_types\":[\"nats\",\"tls\"]}"; diff --git a/src/test/java/io/nats/client/utils/TestBase.java b/src/test/java/io/nats/client/utils/TestBase.java index 7f7d1ebe1..fe47715dd 100644 --- a/src/test/java/io/nats/client/utils/TestBase.java +++ b/src/test/java/io/nats/client/utils/TestBase.java @@ -237,20 +237,31 @@ private static void cleanupJs(Connection c) public static final String KEY = "key"; public static final String DATA = "data"; + public static String variant(Object variant) { + return variant == null ? NUID.nextGlobalSequence() : "" + variant; + } + + public static String stream() { + return STREAM + "-" + variant(null); + } + public static String stream(Object variant) { - return STREAM + "-" + variant; + return STREAM + "-" + variant(variant); } public static String mirror(Object variant) { - return MIRROR + "-" + variant; + return MIRROR + "-" + variant(variant); } public static String source(Object variant) { - return SOURCE + "-" + variant; + return SOURCE + "-" + variant(variant); } + public static String subject() { + return SUBJECT + "-" + variant(null); + } public static String subject(Object variant) { - return SUBJECT + "-" + variant; + return SUBJECT + "-" + variant(variant); } public static String subjectDot(String field) { @@ -258,39 +269,47 @@ public static String subjectDot(String field) { } public static String queue(Object variant) { - return QUEUE + "-" + variant; + return QUEUE + "-" + variant(variant); + } + + public static String durable() { + return DURABLE + "-" + variant(null); } public static String durable(Object variant) { - return DURABLE + "-" + variant; + return DURABLE + "-" + variant(variant); } public static String durable(String vary, Object variant) { - return DURABLE + "-" + vary + "-" + variant; + return DURABLE + "-" + vary + "-" + variant(variant); + } + + public static String name() { + return NAME + "-" + variant(null); } public static String name(Object variant) { - return NAME + "-" + variant; + return NAME + "-" + variant(variant); } public static String deliver(Object variant) { - return DELIVER + "-" + variant; + return DELIVER + "-" + variant(variant); } public static String bucket(Object variant) { - return BUCKET + "-" + variant; + return BUCKET + "-" + variant(variant); } public static String key(Object variant) { - return KEY + "-" + variant; + return KEY + "-" + variant(variant); } public static String messageId(Object variant) { - return MESSAGE_ID + "-" + variant; + return MESSAGE_ID + "-" + variant(variant); } public static String data(Object variant) { - return DATA + "-" + variant; + return DATA + "-" + variant(variant); } public static byte[] dataBytes(Object variant) { diff --git a/src/test/java/io/nats/service/ServiceTests.java b/src/test/java/io/nats/service/ServiceTests.java index cd37dca90..db9b0a8b3 100644 --- a/src/test/java/io/nats/service/ServiceTests.java +++ b/src/test/java/io/nats/service/ServiceTests.java @@ -824,6 +824,9 @@ public void testGroupConstruction() { assertNull(g1.getNext()); assertNull(g2.getNext()); assertNull(g3.getNext()); + assertTrue(g1.toString().contains(subject(1))); // coverage + assertTrue(g2.toString().contains(subject(2))); // coverage + assertTrue(g3.toString().contains(subject(3))); // coverage assertEquals(g1, g1.appendGroup(g2)); assertEquals(subject(2), g1.getNext().getName()); @@ -832,12 +835,14 @@ public void testGroupConstruction() { assertEquals(subject(1) + DOT + subject(2), g1.getSubject()); assertEquals(subject(2), g2.getName()); assertEquals(subject(2), g2.getSubject()); + assertTrue(g1.toString().contains(subject(2))); // coverage assertEquals(g1, g1.appendGroup(g3)); assertEquals(subject(2), g1.getNext().getName()); assertEquals(subject(3), g1.getNext().getNext().getName()); assertEquals(subject(1), g1.getName()); assertEquals(subject(1) + DOT + subject(2) + DOT + subject(3), g1.getSubject()); + assertTrue(g1.toString().contains(subject(3))); // coverage g1 = new Group("foo.*"); assertEquals("foo.*", g1.getName());