Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Test coverage and add note about websocket to readme. #956

Merged
merged 4 commits into from
Aug 22, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
11 changes: 11 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,17 @@ Please see unit test for examples of this behavior.
and
`testMaxPayloadJs` in [JetStreamPubTests.cs](src/test/java/io/nats/client/impl/JetStreamPubTests.java)

#### Version 2.16.8 Websocket Support

As of version 2.16.8 Websocket (`ws` and `wss`) protocols are supported for connecting to the server.
For instance, your server bootstrap url might be `ws://my-nats-host:80` or `wss://my-nats-host:443`.

Your server must be properly configured for websocket, see the NATS.IO docs
[WebSocket Configuration Example](https://docs.nats.io/running-a-nats-service/configuration/websocket/websocket_conf)
for more information.

If you use secure websockets (wss), your connection must be securely configured in the same way you would configure a `tls` connection.

#### Version 2.16.0 Consumer Create

This release by default will use a new JetStream consumer create API when interacting with nats-server version 2.9.0 or higher.
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/io/nats/client/BaseConsumerContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public interface BaseConsumerContext {

/**
* Read the next message with provided max wait
* @param maxWait duration of max wait
* @param maxWait duration of max wait. Cannot be less than {@value BaseConsumeOptions#MIN_EXPIRES_MILLS} milliseconds.
* @return the next message or null if the max wait expires
* @throws IOException covers various communication issues with the NATS
* server such as timeout or interruption
Expand All @@ -48,7 +48,7 @@ public interface BaseConsumerContext {

/**
* Read the next message with provided max wait
* @param maxWaitMillis the max wait value in milliseconds
* @param maxWaitMillis the max wait value in milliseconds. Cannot be less than {@value BaseConsumeOptions#MIN_EXPIRES_MILLS} milliseconds.
* @return the next message or null if the max wait expires
* @throws IOException covers various communication issues with the NATS
* server such as timeout or interruption
Expand Down
4 changes: 1 addition & 3 deletions src/main/java/io/nats/client/impl/NatsConsumerContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -140,9 +140,7 @@ public String getConsumerName() {
*/
@Override
public ConsumerInfo getConsumerInfo() throws IOException, JetStreamApiException {
if (consumerName != null) {
cachedConsumerInfo = streamCtx.jsm.getConsumerInfo(streamCtx.streamName, cachedConsumerInfo.getName());
}
cachedConsumerInfo = streamCtx.jsm.getConsumerInfo(streamCtx.streamName, consumerName);
return cachedConsumerInfo;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@
* Implementation of Ordered Consumer Context
*/
public class NatsOrderedConsumerContext implements OrderedConsumerContext {
NatsConsumerContext impl;
private NatsConsumerContext impl;

NatsOrderedConsumerContext(NatsStreamContext streamContext, OrderedConsumerConfiguration config) {
impl = new NatsConsumerContext(streamContext, config);
}
Expand Down
33 changes: 30 additions & 3 deletions src/test/java/io/nats/client/impl/JetStreamTestBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,30 @@ public NatsMessage getTestMessage(String replyTo, String sid) {
// ----------------------------------------------------------------------------------------------------
// Management
// ----------------------------------------------------------------------------------------------------
public static StreamInfo createMemoryStream(JetStreamManagement jsm, String streamName, String... subjects)
throws IOException, JetStreamApiException {
public static class CreateStreamResult {
public final String stream = stream();
public final String subject = subject();
public StreamInfo si;

public CreateStreamResult info(StreamInfo si) {
this.si = si;
return this;
}
}

public static CreateStreamResult createMemoryStream(JetStreamManagement jsm) throws IOException, JetStreamApiException {
CreateStreamResult csr = new CreateStreamResult();
return csr.info(createMemoryStream(jsm, csr.stream, csr.subject));
}

public static StreamInfo createMemoryStream(JetStreamManagement jsm, String streamName, String... subjects) throws IOException, JetStreamApiException {
if (streamName == null) {
streamName = stream();
}

if (subjects == null || subjects.length == 0) {
subjects = new String[]{subject()};
}

StreamConfiguration sc = StreamConfiguration.builder()
.name(streamName)
Expand All @@ -90,8 +112,13 @@ public static StreamInfo createMemoryStream(JetStreamManagement jsm, String stre
return jsm.addStream(sc);
}

public static CreateStreamResult createMemoryStream(Connection nc)
throws IOException, JetStreamApiException {
return createMemoryStream(nc.jetStreamManagement());
}

public static StreamInfo createMemoryStream(Connection nc, String streamName, String... subjects)
throws IOException, JetStreamApiException {
throws IOException, JetStreamApiException {
return createMemoryStream(nc.jetStreamManagement(), streamName, subjects);
}

Expand Down