Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/main/java/io/nats/client/BaseConsumeOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,10 @@ public class BaseConsumeOptions implements JsonSerializable {
protected BaseConsumeOptions(Builder b) {
bytes = b.bytes;
if (bytes > 0) {
messages = b.messages == -1 ? DEFAULT_MESSAGE_COUNT_WHEN_BYTES : b.messages;
messages = b.messages < 0 ? DEFAULT_MESSAGE_COUNT_WHEN_BYTES : b.messages;
}
else {
messages = b.messages == -1 ? DEFAULT_MESSAGE_COUNT : b.messages;
messages = b.messages < 0 ? DEFAULT_MESSAGE_COUNT : b.messages;
}

// validation handled in builder
Expand Down
8 changes: 4 additions & 4 deletions src/main/java/io/nats/client/ConnectionListener.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
* listener is configured in the {@link Options Options} at creation time.
*/
public interface ConnectionListener {
public enum Events {
enum Events {
/** The connection has successfully completed the handshake with the nats-server. */
CONNECTED(true, "opened"),
/** The connection is permanently closed, either by manual action or failed reconnects. */
Expand All @@ -29,7 +29,7 @@ public enum Events {
RECONNECTED(true, "reconnected"),
/** The connection was reconnected and the server has been notified of all subscriptions. */
RESUBSCRIBED(false, "subscriptions re-established"),
/** The connection was told about new servers from, from the current server. */
/** The connection was made aware of new servers from the current server connection. */
DISCOVERED_SERVERS(false, "discovered servers"),
/** Server Sent a lame duck mode. */
LAME_DUCK(false, "lame duck mode");
Expand Down Expand Up @@ -77,5 +77,5 @@ public String toString() {
* @param conn the connection associated with the error
* @param type the type of event that has occurred
*/
public void connectionEvent(Connection conn, Events type);
}
void connectionEvent(Connection conn, Events type);
}
40 changes: 0 additions & 40 deletions src/main/java/io/nats/client/JetStreamManagement.java
Original file line number Diff line number Diff line change
Expand Up @@ -271,46 +271,6 @@ public interface JetStreamManagement {
*/
MessageInfo getLastMessage(String streamName, String subject) throws IOException, JetStreamApiException;

/**
* Get MessageInfo for the first message of the subject.
* @param streamName the name of the stream.
* @param subject the subject to get the first message for.
* @return The MessageInfo
* @throws IOException covers various communication issues with the NATS
* server such as timeout or interruption
* @throws JetStreamApiException the request had an error related to the data
*/
MessageInfo getFirstMessage(String streamName, String subject) throws IOException, JetStreamApiException;

/**
* Get MessageInfo for the first message created at or after the start time.
* <p>
* This API is currently EXPERIMENTAL and is subject to change.
*
* @param streamName the name of the stream.
* @param startTime the start time to get the first message for.
* @return The MessageInfo
* @throws IOException covers various communication issues with the NATS
* server such as timeout or interruption
* @throws JetStreamApiException the request had an error related to the data
*/
MessageInfo getFirstMessage(String streamName, ZonedDateTime startTime) throws IOException, JetStreamApiException;

/**
* Get MessageInfo for the first message created at or after the start time matching the subject.
* <p>
* This API is currently EXPERIMENTAL and is subject to change.
*
* @param streamName the name of the stream.
* @param startTime the start time to get the first message for.
* @param subject the subject to get the first message for.
* @return The MessageInfo
* @throws IOException covers various communication issues with the NATS
* server such as timeout or interruption
* @throws JetStreamApiException the request had an error related to the data
*/
MessageInfo getFirstMessage(String streamName, ZonedDateTime startTime, String subject) throws IOException, JetStreamApiException;

/**
* Get MessageInfo for the message of the message sequence
* is equal to or greater the requested sequence for the subject.
Expand Down
10 changes: 0 additions & 10 deletions src/main/java/io/nats/client/StreamContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -163,16 +163,6 @@ public interface StreamContext {
*/
MessageInfo getLastMessage(String subject) throws IOException, JetStreamApiException;

/**
* Get MessageInfo for the first message of the subject.
* @param subject the subject to get the first message for.
* @return The MessageInfo
* @throws IOException covers various communication issues with the NATS
* server such as timeout or interruption
* @throws JetStreamApiException the request had an error related to the data
*/
MessageInfo getFirstMessage(String subject) throws IOException, JetStreamApiException;

/**
* Get MessageInfo for the message of the message sequence
* is equal to or greater the requested sequence for the subject.
Expand Down
24 changes: 0 additions & 24 deletions src/main/java/io/nats/client/impl/NatsJetStreamManagement.java
Original file line number Diff line number Diff line change
Expand Up @@ -281,30 +281,6 @@ public MessageInfo getLastMessage(String streamName, String subject) throws IOEx
return _getMessage(streamName, MessageGetRequest.lastForSubject(subject));
}

/**
* {@inheritDoc}
*/
@Override
public MessageInfo getFirstMessage(String streamName, String subject) throws IOException, JetStreamApiException {
return _getMessage(streamName, MessageGetRequest.firstForSubject(subject));
}

/**
* {@inheritDoc}
*/
@Override
public MessageInfo getFirstMessage(String streamName, ZonedDateTime startTime) throws IOException, JetStreamApiException {
return _getMessage(streamName, MessageGetRequest.firstForStartTime(startTime));
}

/**
* {@inheritDoc}
*/
@Override
public MessageInfo getFirstMessage(String streamName, ZonedDateTime startTime, String subject) throws IOException, JetStreamApiException {
return _getMessage(streamName, MessageGetRequest.firstForStartTimeAndSubject(startTime, subject));
}

/**
* {@inheritDoc}
*/
Expand Down
8 changes: 0 additions & 8 deletions src/main/java/io/nats/client/impl/NatsStreamContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -147,14 +147,6 @@ public MessageInfo getLastMessage(String subject) throws IOException, JetStreamA
return jsm.getLastMessage(streamName, subject);
}

/**
* {@inheritDoc}
*/
@Override
public MessageInfo getFirstMessage(String subject) throws IOException, JetStreamApiException {
return jsm.getFirstMessage(streamName, subject);
}

/**
* {@inheritDoc}
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1291,10 +1291,6 @@ private void validateGetMessage(JetStreamManagement jsm, TestingStreamContainer
assertMessageInfo(tsc, 1, 2, jsm.getNextMessage(tsc.stream, -1, tsc.subject(1)), beforeCreated);
assertMessageInfo(tsc, 0, 1, jsm.getNextMessage(tsc.stream, 0, tsc.subject(0)), beforeCreated);
assertMessageInfo(tsc, 1, 2, jsm.getNextMessage(tsc.stream, 0, tsc.subject(1)), beforeCreated);
assertMessageInfo(tsc, 0, 1, jsm.getFirstMessage(tsc.stream, tsc.subject(0)), beforeCreated);
assertMessageInfo(tsc, 1, 2, jsm.getFirstMessage(tsc.stream, tsc.subject(1)), beforeCreated);
assertMessageInfo(tsc, 0, 1, jsm.getFirstMessage(tsc.stream, beforeCreated), beforeCreated);
assertMessageInfo(tsc, 1, 2, jsm.getFirstMessage(tsc.stream, beforeCreated, tsc.subject(1)), beforeCreated);

assertMessageInfo(tsc, 0, 1, jsm.getNextMessage(tsc.stream, 1, tsc.subject(0)), beforeCreated);
assertMessageInfo(tsc, 1, 2, jsm.getNextMessage(tsc.stream, 1, tsc.subject(1)), beforeCreated);
Expand All @@ -1307,10 +1303,8 @@ private void validateGetMessage(JetStreamManagement jsm, TestingStreamContainer

assertStatus(10003, assertThrows(JetStreamApiException.class, () -> jsm.getMessage(tsc.stream, -1)));
assertStatus(10003, assertThrows(JetStreamApiException.class, () -> jsm.getMessage(tsc.stream, 0)));
assertStatus(10003, assertThrows(JetStreamApiException.class, () -> jsm.getFirstMessage(tsc.stream, DEFAULT_TIME)));
assertStatus(10037, assertThrows(JetStreamApiException.class, () -> jsm.getMessage(tsc.stream, 9)));
assertStatus(10037, assertThrows(JetStreamApiException.class, () -> jsm.getLastMessage(tsc.stream, "not-a-subject")));
assertStatus(10037, assertThrows(JetStreamApiException.class, () -> jsm.getFirstMessage(tsc.stream, "not-a-subject")));
assertStatus(10037, assertThrows(JetStreamApiException.class, () -> jsm.getNextMessage(tsc.stream, 9, tsc.subject(0))));
assertStatus(10037, assertThrows(JetStreamApiException.class, () -> jsm.getNextMessage(tsc.stream, 1, "not-a-subject")));
}
Expand Down
9 changes: 0 additions & 9 deletions src/test/java/io/nats/client/impl/SimplificationTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,6 @@ private static void _testStreamContext(JetStream js, TestingStreamContainer tsc,
MessageInfo mi = streamContext.getMessage(1);
assertEquals(1, mi.getSeq());

mi = streamContext.getFirstMessage(tsc.subject());
assertEquals(1, mi.getSeq());

mi = streamContext.getLastMessage(tsc.subject());
assertEquals(6, mi.getSeq());

Expand All @@ -123,12 +120,6 @@ private static void _testStreamContext(JetStream js, TestingStreamContainer tsc,

streamContext.purge(PurgeOptions.builder().sequence(5).build());
assertThrows(JetStreamApiException.class, () -> streamContext.getMessage(1));

mi = streamContext.getFirstMessage(tsc.subject());
assertEquals(5, mi.getSeq());

streamContext.purge();
assertThrows(JetStreamApiException.class, () -> streamContext.getFirstMessage(tsc.subject()));
}

static int FETCH_EPHEMERAL = 1;
Expand Down
Loading