Skip to content

Commit

Permalink
pull status handling (#819)
Browse files Browse the repository at this point in the history
  • Loading branch information
scottf committed Mar 20, 2023
1 parent 443968f commit 73c6039
Show file tree
Hide file tree
Showing 35 changed files with 2,291 additions and 1,398 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ repositories {
dependencies {
implementation 'net.i2p.crypto:eddsa:0.3.0'
testImplementation 'org.junit.jupiter:junit-jupiter:5.9.0'
testImplementation 'io.nats:jnats-server-runner:1.2.1'
testImplementation 'io.nats:jnats-server-runner:1.2.5'
testImplementation 'nl.jqno.equalsverifier:equalsverifier:3.12.3'
}

Expand Down
15 changes: 2 additions & 13 deletions src/examples/java/io/nats/examples/ExampleUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package io.nats.examples;

import io.nats.client.*;
import io.nats.client.impl.ErrorListenerLoggerImpl;

import java.time.Duration;
import java.util.concurrent.ThreadLocalRandom;
Expand All @@ -31,19 +32,7 @@ public static String getServer(String[] args) {

public static final ConnectionListener EXAMPLE_CONNECTION_LISTENER = (conn, type) -> System.out.println("Status change "+ type);

public static final ErrorListener EXAMPLE_ERROR_LISTENER = new ErrorListener() {
public void exceptionOccurred(Connection conn, Exception exp) {
System.out.println("Exception " + exp.getMessage());
}

public void errorOccurred(Connection conn, String type) {
System.out.println("Error " + type);
}

public void slowConsumerDetected(Connection conn, Consumer consumer) {
System.out.println("Slow consumer");
}
};
public static final ErrorListener EXAMPLE_ERROR_LISTENER = new ErrorListenerLoggerImpl();

public static Options createExampleOptions(String[] args) throws Exception {
String server = getServer(args);
Expand Down
56 changes: 54 additions & 2 deletions src/examples/java/io/nats/examples/jetstream/NatsJsUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.nats.client.api.StorageType;
import io.nats.client.api.StreamConfiguration;
import io.nats.client.api.StreamInfo;
import io.nats.client.impl.NatsJetStreamMetaData;
import io.nats.client.impl.NatsMessage;

import java.io.IOException;
Expand Down Expand Up @@ -178,8 +179,16 @@ public static void publish(JetStream js, String subject, int count) throws IOExc
publish(js, subject, "data", count, -1, false);
}

public static void publish(JetStream js, String subject, int count, int msgSize) throws IOException, JetStreamApiException {
publish(js, subject, "data", count, msgSize, false);
}

public static void publish(JetStream js, String subject, String prefix, int count) throws IOException, JetStreamApiException {
publish(js, subject, prefix, count, -1, true);
publish(js, subject, prefix, count, -1, false);
}

public static void publish(JetStream js, String subject, String prefix, int count, int msgSize) throws IOException, JetStreamApiException {
publish(js, subject, prefix, count, msgSize, false);
}

public static void publish(JetStream js, String subject, String prefix, int count, boolean verbose) throws IOException, JetStreamApiException {
Expand All @@ -205,7 +214,11 @@ public static void publish(JetStream js, String subject, String prefix, int coun
}

public static byte[] makeData(String prefix, int msgSize, boolean verbose, int x) {
String text = prefix + "#" + x + "#";
if (msgSize == 0) {
return null;
}

String text = prefix + "-" + x + ".";
if (verbose) {
System.out.print(" " + text);
}
Expand Down Expand Up @@ -342,6 +355,17 @@ public static void printObject(Object o, String... subObjectNames) {
System.out.println(s);
}

public static String metaString(NatsJetStreamMetaData meta) {
return "Meta{" +
"str='" + meta.getStream() + '\'' +
", con='" + meta.getConsumer() + '\'' +
", delivered=" + meta.deliveredCount() +
", strSeq=" + meta.streamSequence() +
", conSeq=" + meta.consumerSequence() +
", pending=" + meta.pendingCount() +
'}';
}

// ----------------------------------------------------------------------------------------------------
// REPORT
// ----------------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -385,4 +409,32 @@ public static int count408s(List<Message> messages) {
}
return count;
}

public static void createCleanMemStream(JetStreamManagement jsm, String stream, String... subs) throws IOException, JetStreamApiException {
try {
jsm.deleteStream(stream);
}
catch (Exception ignore) {}

StreamConfiguration sc = StreamConfiguration.builder()
.name(stream)
.storageType(StorageType.Memory)
.subjects(subs)
.build();
jsm.addStream(sc);
}

public static void createCleanFileStream(JetStreamManagement jsm, String stream, String... subs) throws IOException, JetStreamApiException {
try {
jsm.deleteStream(stream);
}
catch (Exception ignore) {}

StreamConfiguration sc = StreamConfiguration.builder()
.name(stream)
.storageType(StorageType.File)
.subjects(subs)
.build();
jsm.addStream(sc);
}
}
23 changes: 21 additions & 2 deletions src/main/java/io/nats/client/ErrorListener.java
Original file line number Diff line number Diff line change
Expand Up @@ -91,14 +91,33 @@ default void heartbeatAlarm(Connection conn, JetStreamSubscription sub,
long lastStreamSequence, long lastConsumerSequence) {}

/**
* Called by the connection when an unhandled status is received.
*
* Called when an unhandled status is received in a push subscription.
* @param conn The connection that had the issue
* @param sub the JetStreamSubscription that this occurred on
* @param status the status
*/
default void unhandledStatus(Connection conn, JetStreamSubscription sub, Status status) {}

/**
* Called when a pull subscription receives a status message that indicates either
* the subscription or pull might be problematic
*
* @param conn The connection that had the issue
* @param sub the JetStreamSubscription that this occurred on
* @param status the status
*/
default void pullStatusWarning(Connection conn, JetStreamSubscription sub, Status status) {}

/**
* Called when a pull subscription receives a status message that indicates either
* the subscription cannot continue or the pull request cannot be processed.
*
* @param conn The connection that had the issue
* @param sub the JetStreamSubscription that this occurred on
* @param status the status
*/
default void pullStatusError(Connection conn, JetStreamSubscription sub, Status status) {}

enum FlowControlSource { FLOW_CONTROL, HEARTBEAT }

/**
Expand Down
25 changes: 23 additions & 2 deletions src/main/java/io/nats/client/JetStreamStatusException.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,31 @@
* JetStreamStatusException is used to indicate an unknown status message was received.
*/
public class JetStreamStatusException extends IllegalStateException {
public static final String DEFAULT_DESCRIPTION = "Unknown or unprocessed status message";

private final JetStreamSubscription sub;
private final String description;
private final Status status;

/**
* Construct an exception with a status message
*
* @param sub the subscription
* @param status the status
*/
public JetStreamStatusException(JetStreamSubscription sub, Status status) {
super("Unknown or unprocessed status message: " + status.getMessage());
this(sub, DEFAULT_DESCRIPTION, status);
}

/**
* Construct an exception with a status message
* @param sub the subscription
* @param description custom description
* @param status the status
*/
public JetStreamStatusException(JetStreamSubscription sub, String description, Status status) {
super(description + ": " + status.getMessage());
this.sub = sub;
this.description = description;
this.status = status;
}

Expand All @@ -43,6 +56,14 @@ public JetStreamSubscription getSubscription() {
return sub;
}

/**
* Get the description
* @return the description
*/
public String getDescription() {
return description;
}

/**
* Get the full status object
*
Expand Down
43 changes: 18 additions & 25 deletions src/main/java/io/nats/client/JetStreamSubscription.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package io.nats.client;

import io.nats.client.api.ConsumerInfo;
import io.nats.client.support.PullStatus;

import java.io.IOException;
import java.time.Duration;
Expand All @@ -27,9 +28,8 @@ public interface JetStreamSubscription extends Subscription {

/**
* Initiate pull with the specified batch size.
*
* ! Pull subscriptions only. Push subscription will throw IllegalStateException
* ! Primitive API for Advanced use only. Prefer Fetch or Iterate
* ! Primitive API for ADVANCED use only, officially not supported. Prefer fetch, iterate or reader.
*
* @param batchSize the size of the batch
* @throws IllegalStateException if not a pull subscription.
Expand All @@ -38,11 +38,8 @@ public interface JetStreamSubscription extends Subscription {

/**
* Initiate pull with the specified request options
*
* ! Pull subscriptions only. Push subscription will throw IllegalStateException
* ! Primitive API for Advanced use only. Prefer Fetch or Iterate
*
* IMPORTANT! PullRequestOptions ARE CURRENTLY EXPERIMENTAL AND SUBJECT TO CHANGE.
* ! Primitive API for ADVANCED use only, officially not supported. Prefer fetch, iterate or reader.
*
* @param pullRequestOptions the options object
* @throws IllegalStateException if not a pull subscription.
Expand All @@ -53,7 +50,7 @@ public interface JetStreamSubscription extends Subscription {
* Initiate pull in noWait mode with the specified batch size.
*
* ! Pull subscriptions only. Push subscription will throw IllegalStateException
* ! Primitive API for Advanced use only. Prefer Fetch or Iterate
* ! Primitive API for ADVANCED use only, officially not supported. Prefer fetch, iterate or reader.
*
* @param batchSize the size of the batch
* @throws IllegalStateException if not a pull subscription.
Expand All @@ -62,9 +59,8 @@ public interface JetStreamSubscription extends Subscription {

/**
* Initiate pull in noWait mode with the specified batch size.
*
* ! Pull subscriptions only. Push subscription will throw IllegalStateException
* ! Primitive API for Advanced use only. Prefer Fetch or Iterate
* ! Primitive API for ADVANCED use only, officially not supported. Prefer fetch, iterate or reader.
*
* @param batchSize the size of the batch
* @param expiresIn how long from now this request should be expired from the server wait list
Expand All @@ -74,9 +70,8 @@ public interface JetStreamSubscription extends Subscription {

/**
* Initiate pull in noWait mode with the specified batch size.
*
* ! Pull subscriptions only. Push subscription will throw IllegalStateException
* ! Primitive API for Advanced use only. Prefer Fetch or Iterate
* ! Primitive API for ADVANCED use only, officially not supported. Prefer fetch, iterate or reader.
*
* @param batchSize the size of the batch
* @param expiresInMillis how long from now this request should be expired from the server wait list, in milliseconds
Expand All @@ -89,13 +84,12 @@ public interface JetStreamSubscription extends Subscription {
* <p>
* <code>sub.nextMessage(timeout)</code> can return a:
* <ul>
* <li>regular message
* <li>regular JetStream message
* <li>null
* </ul>
* <p>
*
* ! Pull subscriptions only. Push subscription will throw IllegalStateException
* ! Primitive API for Advanced use only. Prefer Fetch or Iterate
* ! Primitive API for ADVANCED use only, officially not supported. Prefer fetch, iterate or reader.
*
* @param batchSize the size of the batch
* @param expiresIn how long from now this request should be expired from the server wait list
Expand All @@ -109,13 +103,12 @@ public interface JetStreamSubscription extends Subscription {
* <p>
* <code>sub.nextMessage(timeout)</code> can return a:
* <ul>
* <li>regular message
* <li>regular JetStream message
* <li>null
* </ul>
* <p>
*
* ! Pull subscriptions only. Push subscription will throw IllegalStateException
* ! Primitive API for Advanced use only. Prefer Fetch or Iterate
* ! Primitive API for ADVANCED use only, officially not supported. Prefer fetch, iterate or reader.
*
* @param batchSize the size of the batch
* @param expiresInMillis how long from now this request should be expired from the server wait list, in milliseconds
Expand All @@ -128,7 +121,6 @@ public interface JetStreamSubscription extends Subscription {
* This uses <code>pullExpiresIn</code> under the covers, and manages all responses
* from <code>sub.nextMessage(...)</code> to only return regular JetStream messages.
* This can only be used when the subscription is pull based.
*
* ! Pull subscriptions only. Push subscription will throw IllegalStateException
*
* @param batchSize the size of the batch
Expand All @@ -144,7 +136,6 @@ public interface JetStreamSubscription extends Subscription {
* This uses <code>pullExpiresIn</code> under the covers, and manages all responses
* from <code>sub.nextMessage(...)</code> to only return regular JetStream messages.
* This can only be used when the subscription is pull based.
*
* ! Pull subscriptions only. Push subscription will throw IllegalStateException
*
* @param batchSize the size of the batch
Expand All @@ -161,7 +152,6 @@ public interface JetStreamSubscription extends Subscription {
* receive the first message within the max wait period. It will stop if the batch is
* fulfilled or if there are fewer than batch size messages. 408 Status messages
* are ignored and will not count toward the fulfilled batch size.
*
* ! Pull subscriptions only. Push subscription will throw IllegalStateException
*
* @param batchSize the size of the batch
Expand All @@ -178,7 +168,6 @@ public interface JetStreamSubscription extends Subscription {
* receive the first message within the max wait period. It will stop if the batch is
* fulfilled or if there are fewer than batch size messages. 408 Status messages
* are ignored and will not count toward the fulfilled batch size.
*
* ! Pull subscriptions only. Push subscription will throw IllegalStateException
*
* @param batchSize the size of the batch
Expand All @@ -193,11 +182,8 @@ public interface JetStreamSubscription extends Subscription {
* Prepares a reader. A reader looks like a push sync subscription,
* meaning it is just an endless stream of messages to ask for by nextMessage,
* but uses pull under the covers.
*
* ! Pull subscriptions only. Push subscription will throw IllegalStateException
*
* THIS API IS CONSIDERED EXPERIMENTAL AND SUBJECT TO CHANGE
*
* @param batchSize the size of the batch
* @param repullAt the point in the current batch to tell the server to start the next batch
*
Expand All @@ -214,4 +200,11 @@ public interface JetStreamSubscription extends Subscription {
* @throws JetStreamApiException the request had an error related to the data
*/
ConsumerInfo getConsumerInfo() throws IOException, JetStreamApiException;
}

/**
* Get the current status of pull requests for this subscription
* ! Pull subscriptions only. Push subscription will throw IllegalStateException
* @return the PullStatus object
*/
PullStatus getPullStatus();
}
2 changes: 0 additions & 2 deletions src/main/java/io/nats/client/PullRequestOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@

/**
* The PullRequestOptions class specifies the options for pull requests
*
* IMPORTANT! PullRequestOptions ARE CURRENTLY EXPERIMENTAL AND SUBJECT TO CHANGE.
*/
public class PullRequestOptions implements JsonSerializable {

Expand Down
1 change: 1 addition & 0 deletions src/main/java/io/nats/client/PullSubscribeOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
* Options are set using the {@link PullSubscribeOptions.Builder} or static helper methods.
*/
public class PullSubscribeOptions extends SubscribeOptions {
public static final PullSubscribeOptions DEFAULT_PULL_OPTS = PullSubscribeOptions.builder().build();

private PullSubscribeOptions(Builder builder) {
super(builder, true, false, null, null, -1, -1);
Expand Down
1 change: 1 addition & 0 deletions src/main/java/io/nats/client/PushSubscribeOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
* Options are set using the {@link PushSubscribeOptions.Builder} or static helper methods.
*/
public class PushSubscribeOptions extends SubscribeOptions {
public static final PushSubscribeOptions DEFAULT_PUSH_OPTS = PushSubscribeOptions.builder().build();

private PushSubscribeOptions(Builder builder, boolean ordered, String deliverSubject, String deliverGroup,
long pendingMessageLimit, long pendingByteLimit) {
Expand Down
1 change: 0 additions & 1 deletion src/main/java/io/nats/client/SubscribeOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
* The SubscribeOptions is the base class for PushSubscribeOptions and PullSubscribeOptions
*/
public abstract class SubscribeOptions {

public static final long DEFAULT_ORDERED_HEARTBEAT = 5000;

protected final String stream;
Expand Down

0 comments on commit 73c6039

Please sign in to comment.