diff --git a/direct-batch/build.gradle b/direct-batch/build.gradle index 72aad87..cd99e6a 100644 --- a/direct-batch/build.gradle +++ b/direct-batch/build.gradle @@ -38,7 +38,8 @@ repositories { } dependencies { - implementation 'io.nats:jnats:2.21.0-SNAPSHOT' + implementation 'io.nats:jnats:2.21.1' + implementation 'io.synadia:direct-batch:0.1.0' testImplementation 'io.nats:jnats-server-runner:1.2.8' testImplementation 'org.junit.jupiter:junit-jupiter:5.9.0' diff --git a/js-publish-extensions/README.md b/js-publish-extensions/README.md index 3f17828..231dfcf 100644 --- a/js-publish-extensions/README.md +++ b/js-publish-extensions/README.md @@ -4,8 +4,8 @@ Extensions specific to JetStream publishing. -**Current Release**: 0.1.0 -  **Current Snapshot**: 0.1.1-SNAPSHOT +**Current Release**: 0.3.0 +  **Current Snapshot**: 0.4.0-SNAPSHOT   **Gradle and Maven** `io.synadia:jnats-js-publish-extensions` [Dependencies Help](https://github.com/synadia-io/orbit.java?tab=readme-ov-file#dependencies) @@ -18,16 +18,31 @@ Extensions specific to JetStream publishing. This class parallels the standard JetStream publish api with methods that will retry the publish. -For how to use, please see the examples: +The examples: * [Publish Retrier Sync Example](src/examples/java/io/synadia/examples/PublishRetrierSyncExample.java) * [Publish Retrier Async Example](src/examples/java/io/synadia/examples/PublishRetrierAsyncExample.java) ### AsyncJsPublisher -This class is a full async message publish manager +This class is a full async message publish manager. +This utility provides a workflow of +1. Publishing a message async + * The number of inflight messages (published but not received acks) can be set. +2. Queueing and tracking of the in-flight PublishAck future +3. The ability to observe the queue and respond to events + * The message was published + * The message received a valid ack + * The publish completed with an exception + * The publish timed out. + +It can be combined with the retrier. +You must consider that when publishing async in this manner +it's possible for messages to be published out of order. +In that case you can use publish expectations. +If order of messages is a requirement, you -For how to use, please see the examples: * [Async Js Publisher Example](src/examples/java/io/synadia/examples/AsyncJsPublisherExample.java) +* [Async Js Publisher More Customized Example](src/examples/java/io/synadia/examples/AsyncJsPublisherCustomizedExample.java) --- Copyright (c) 2024-2025 Synadia Communications Inc. All Rights Reserved. diff --git a/js-publish-extensions/build.gradle b/js-publish-extensions/build.gradle index 9043fd5..f875e15 100644 --- a/js-publish-extensions/build.gradle +++ b/js-publish-extensions/build.gradle @@ -13,7 +13,7 @@ plugins { id 'signing' } -def jarVersion = "0.3.1" +def jarVersion = "0.4.0" group = 'io.synadia' def isMerge = System.getenv("BUILD_EVENT") == "push" @@ -41,7 +41,7 @@ repositories { } dependencies { - implementation 'io.nats:jnats:2.20.5' + implementation 'io.nats:jnats:2.21.1' implementation 'io.synadia:retrier:0.2.0' testImplementation 'io.nats:jnats-server-runner:1.2.8' diff --git a/js-publish-extensions/src/examples/java/io/synadia/examples/AsyncJsPublisherCustomThreadsExample.java b/js-publish-extensions/src/examples/java/io/synadia/examples/AsyncJsPublisherCustomThreadsExample.java index edc3985..a7192e3 100644 --- a/js-publish-extensions/src/examples/java/io/synadia/examples/AsyncJsPublisherCustomThreadsExample.java +++ b/js-publish-extensions/src/examples/java/io/synadia/examples/AsyncJsPublisherCustomThreadsExample.java @@ -68,7 +68,7 @@ public static void main(String[] args) { publisher.publishAsync(SUBJECT, ("data-" + x).getBytes()); } - while (publisher.preFlightSize() > 0 || publisher.inFlightSize() > 0) { + while (publisher.preFlightSize() > 0 || publisher.currentInFlight() > 0) { ExampleUtils.printStateThenWait(publisher, publishListener); } diff --git a/js-publish-extensions/src/examples/java/io/synadia/examples/AsyncJsPublisherCustomizedExample.java b/js-publish-extensions/src/examples/java/io/synadia/examples/AsyncJsPublisherCustomizedExample.java index f86c464..62e1d4c 100644 --- a/js-publish-extensions/src/examples/java/io/synadia/examples/AsyncJsPublisherCustomizedExample.java +++ b/js-publish-extensions/src/examples/java/io/synadia/examples/AsyncJsPublisherCustomizedExample.java @@ -43,7 +43,7 @@ public static void main(String[] args) { AsyncJsPublisher.Builder builder = AsyncJsPublisher.builder(nc.jetStream()) .maxInFlight(250) - .refillAllowedAt(100) + .resumeAmount(100) .pollTime(50) .holdPauseTime(150) .waitTimeout(3000) @@ -55,7 +55,7 @@ public static void main(String[] args) { publisher.publishAsync(SUBJECT, ("data-" + x).getBytes()); } - while (publisher.preFlightSize() > 0 || publisher.inFlightSize() > 0) { + while (publisher.preFlightSize() > 0 || publisher.currentInFlight() > 0) { ExampleUtils.printStateThenWait(publisher, publishListener); } diff --git a/js-publish-extensions/src/examples/java/io/synadia/examples/AsyncJsPublisherExample.java b/js-publish-extensions/src/examples/java/io/synadia/examples/AsyncJsPublisherExample.java index 8b531e5..5999104 100644 --- a/js-publish-extensions/src/examples/java/io/synadia/examples/AsyncJsPublisherExample.java +++ b/js-publish-extensions/src/examples/java/io/synadia/examples/AsyncJsPublisherExample.java @@ -8,7 +8,7 @@ import io.nats.client.Options; import io.nats.client.impl.ErrorListenerConsoleImpl; import io.synadia.jnats.extension.AsyncJsPublisher; -import io.synadia.retrier.RetryConfig; +import io.synadia.jnats.extension.PublishRetryConfig; public class AsyncJsPublisherExample { @@ -42,7 +42,7 @@ public static void main(String[] args) { // If you want to use retrying for publishing, you must give a Retry Config // -------------------------------------------------------------------------------- if (USE_RETRIER) { - builder.retryConfig(RetryConfig.DEFAULT_CONFIG); + builder.retryConfig(PublishRetryConfig.DEFAULT_CONFIG); } // The publisher is AutoCloseable @@ -52,7 +52,7 @@ public static void main(String[] args) { publisher.publishAsync(SUBJECT, ("data-" + x).getBytes()); } - while (publisher.preFlightSize() > 0 || publisher.inFlightSize() > 0) { + while (publisher.preFlightSize() > 0 || publisher.currentInFlight() > 0) { ExampleUtils.printStateThenWait(publisher, publishListener); } diff --git a/js-publish-extensions/src/examples/java/io/synadia/examples/ExamplePublishListener.java b/js-publish-extensions/src/examples/java/io/synadia/examples/ExamplePublishListener.java index f6db781..c227fb2 100644 --- a/js-publish-extensions/src/examples/java/io/synadia/examples/ExamplePublishListener.java +++ b/js-publish-extensions/src/examples/java/io/synadia/examples/ExamplePublishListener.java @@ -15,6 +15,8 @@ public class ExamplePublishListener implements AsyncJsPublishListener { public AtomicLong exceptioned = new AtomicLong(); public AtomicLong timedOut = new AtomicLong(); public AtomicLong start = new AtomicLong(); + public AtomicLong paused = new AtomicLong(); + public AtomicLong resumed = new AtomicLong(); @Override public void published(InFlight flight) { @@ -45,6 +47,24 @@ public void completedExceptionally(PostFlight postFlight) { @Override public void timeout(PostFlight postFlight) { timedOut.incrementAndGet(); - ExampleUtils.print("Timed-out", new String(postFlight.getBody()) ); + ExampleUtils.print("Timed-out", new String(postFlight.getBody())); + } + + @Override + public void paused(int currentInFlight, int maxInFlight, int resumeAmount) { + paused.incrementAndGet(); + ExampleUtils.print("Publishing paused." + + " Current In Flight: " + currentInFlight + + " Max In Flight: " + maxInFlight + + " Resume Amount: " + resumeAmount); + } + + @Override + public void resumed(int currentInFlight, int maxInFlight, int resumeAmount) { + resumed.incrementAndGet(); + ExampleUtils.print("Publishing resumed." + + " Current In Flight: " + currentInFlight + + " Max In Flight: " + maxInFlight + + " Resume Amount: " + resumeAmount); } } diff --git a/js-publish-extensions/src/examples/java/io/synadia/examples/ExampleUtils.java b/js-publish-extensions/src/examples/java/io/synadia/examples/ExampleUtils.java index 08ee4a2..abb9679 100644 --- a/js-publish-extensions/src/examples/java/io/synadia/examples/ExampleUtils.java +++ b/js-publish-extensions/src/examples/java/io/synadia/examples/ExampleUtils.java @@ -49,7 +49,7 @@ public static void printState(AsyncJsPublisher publisher, ExamplePublishListener print( "elapsed=" + publishListener.elapsed(), "pre-flight=" + publisher.preFlightSize(), - "in-flight=" + publisher.inFlightSize(), + "in-flight=" + publisher.currentInFlight(), "published=" + publishListener.published, "acked=" + publishListener.acked, "exceptioned=" + publishListener.exceptioned, diff --git a/js-publish-extensions/src/examples/java/io/synadia/examples/PublishRetrierAsyncExample.java b/js-publish-extensions/src/examples/java/io/synadia/examples/PublishRetrierAsyncExample.java index 17ca3ec..9999fd7 100644 --- a/js-publish-extensions/src/examples/java/io/synadia/examples/PublishRetrierAsyncExample.java +++ b/js-publish-extensions/src/examples/java/io/synadia/examples/PublishRetrierAsyncExample.java @@ -4,15 +4,13 @@ package io.synadia.examples; import io.nats.client.Connection; -import io.nats.client.JetStreamApiException; import io.nats.client.Nats; import io.nats.client.api.PublishAck; import io.nats.client.api.StorageType; import io.nats.client.api.StreamConfiguration; import io.synadia.jnats.extension.PublishRetrier; -import io.synadia.retrier.RetryConfig; +import io.synadia.jnats.extension.PublishRetryConfig; -import java.io.IOException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; @@ -23,44 +21,33 @@ public class PublishRetrierAsyncExample { public static void main(String[] args) { try (Connection nc = Nats.connect()) { - try { - nc.jetStreamManagement().deleteStream(STREAM); + // create the stream, delete any existing one first for example purposes. + try { nc.jetStreamManagement().deleteStream(STREAM); } catch (Exception ignore) {} + System.out.println("Creating Stream @ " + System.currentTimeMillis()); + nc.jetStreamManagement().addStream(StreamConfiguration.builder() + .name(STREAM) + .subjects(SUBJECT) + .storageType(StorageType.File) // so it's persistent for a server restart test + .build()); + + // default attempts is 2, we change this so you have time to kill the server to test. + // the default backoff is {250, 250, 500, 500, 3000, 5000} + // the default deadline is 1 hour + // the default Retry Conditions are timeouts and no responders (both IOExceptions) + PublishRetryConfig config = PublishRetryConfig.builder().attempts(10).build(); + + int num = 0; + while (true) { + long now = System.currentTimeMillis(); + System.out.println("Publishing @ " + (++num)); + CompletableFuture cfpa = PublishRetrier.publishAsync(config, nc.jetStream(), SUBJECT, ("data" + num).getBytes()); + PublishAck pa = cfpa.get(30, TimeUnit.SECONDS); + long elapsed = System.currentTimeMillis() - now; + System.out.println("Publish Ack after " + elapsed + " --> " + pa.getJv().toJson()); } - catch (Exception ignore) {} - - // since the default backoff is {250, 250, 500, 500, 3000, 5000} - new Thread(() -> { - try { - Thread.sleep(1100); - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - try { - System.out.println("Creating Stream @ " + System.currentTimeMillis()); - nc.jetStreamManagement().addStream(StreamConfiguration.builder() - .name(STREAM) - .subjects(SUBJECT) - .storageType(StorageType.Memory) - .build()); - } - catch (IOException | JetStreamApiException e) { - throw new RuntimeException(e); - } - }).start(); - - RetryConfig config = RetryConfig.builder().attempts(10).build(); - long now = System.currentTimeMillis(); - - System.out.println("Publishing @ " + now); - CompletableFuture cfpa = PublishRetrier.publishAsync(config, nc.jetStream(), SUBJECT, null); - PublishAck pa = cfpa.get(30, TimeUnit.SECONDS); - long done = System.currentTimeMillis(); - - System.out.println("Publish Ack: " + pa.getJv().toJson()); - System.out.println("Done @ " + done + ", Elapsed: " + (done - now)); } catch (Exception e) { + System.out.println("EXAMPLE EXCEPTION: " + e); e.printStackTrace(); } } diff --git a/js-publish-extensions/src/examples/java/io/synadia/examples/PublishRetrierSyncExample.java b/js-publish-extensions/src/examples/java/io/synadia/examples/PublishRetrierSyncExample.java index 360460d..45c08b6 100644 --- a/js-publish-extensions/src/examples/java/io/synadia/examples/PublishRetrierSyncExample.java +++ b/js-publish-extensions/src/examples/java/io/synadia/examples/PublishRetrierSyncExample.java @@ -4,61 +4,82 @@ package io.synadia.examples; import io.nats.client.Connection; -import io.nats.client.JetStreamApiException; +import io.nats.client.ErrorListener; import io.nats.client.Nats; +import io.nats.client.Options; import io.nats.client.api.PublishAck; import io.nats.client.api.StorageType; import io.nats.client.api.StreamConfiguration; import io.synadia.jnats.extension.PublishRetrier; -import io.synadia.retrier.RetryConfig; +import io.synadia.jnats.extension.PublishRetryConfig; +import io.synadia.jnats.extension.RetryCondition; -import java.io.IOException; +import java.time.Duration; +/** + * Publish sync retried example + */ public class PublishRetrierSyncExample { public static String STREAM = "pr-sync-stream"; public static String SUBJECT = "pr-sync-subject"; public static void main(String[] args) { - try (Connection nc = Nats.connect()) { - try { - nc.jetStreamManagement().deleteStream(STREAM); - } - catch (Exception ignore) {} + Options options = Options.builder() + .connectionListener((x,y) -> {}) + .errorListener(new ErrorListener() {}) + .build(); + try (Connection nc = Nats.connect(options)) { + // create the stream, delete any existing one first for example purposes. + try { nc.jetStreamManagement().deleteStream(STREAM); } catch (Exception ignore) {} + System.out.println("Creating Stream @ " + System.currentTimeMillis()); + nc.jetStreamManagement().addStream(StreamConfiguration.builder() + .name(STREAM) + .subjects(SUBJECT) + .storageType(StorageType.Memory) + .build()); - // since the default backoff is {250, 250, 500, 500, 3000, 5000} - new Thread(() -> { - try { - Thread.sleep(1100); - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); + // -------------------------------------------------------------------------------- + // PublishRetryConfig... + // default attempts is 2 + // default backoff is {250, 250, 500, 500, 3000, 5000} + // default deadline is unlimited + // default Retry Conditions are timeouts and no responders (both IOExceptions) + // and shown here for example + // -------------------------------------------------------------------------------- + // This config will retry 3 times with a wait of 500 millis between retries + // but since the deadline is short, the deadline will short circuit that. + // This should be tuned to match your needs. + // -------------------------------------------------------------------------------- + PublishRetryConfig config = PublishRetryConfig.builder() + .attempts(3) + .backoffPolicy(new long[]{500}) + .deadline(Duration.ofSeconds(2)) + .retryConditions(RetryCondition.NoResponders, RetryCondition.IoEx) + .build(); + + int num = 0; + boolean keepGoing; + do { + long now = System.currentTimeMillis(); + System.out.print("Publishing @ " + (++num) + "..."); + PublishAck pa = PublishRetrier.publish(config, nc.jetStream(), SUBJECT, null); + long elapsed = System.currentTimeMillis() - now; + keepGoing = false; + if (pa == null) { + System.out.println("No Publish Ack after " + elapsed); } - try { - System.out.println("Creating Stream @ " + System.currentTimeMillis()); - nc.jetStreamManagement().addStream(StreamConfiguration.builder() - .name(STREAM) - .subjects(SUBJECT) - .storageType(StorageType.Memory) - .build()); + else if (pa.hasError()) { + System.out.println("Publish Ack after " + elapsed + " but got error: " + pa.getError()); } - catch (IOException | JetStreamApiException e) { - throw new RuntimeException(e); + else { + keepGoing = true; + System.out.println("Publish Ack after " + elapsed + " --> " + pa.getJv().toJson()); } - }).start(); - - RetryConfig config = RetryConfig.builder().attempts(10).build(); - long now = System.currentTimeMillis(); - - System.out.println("Publishing @ " + now); - PublishAck pa = PublishRetrier.publish(config, nc.jetStream(), SUBJECT, null); - long done = System.currentTimeMillis(); - - System.out.println("Publish Ack: " + pa.getJv().toJson()); - System.out.println("Done @ " + done + ", Elapsed: " + (done - now)); + } while (keepGoing); } catch (Exception e) { - e.printStackTrace(); + System.out.println("Probably can't connect... " + e); } } } diff --git a/js-publish-extensions/src/examples/java/io/synadia/examples/TransactionalPublishExample.java b/js-publish-extensions/src/examples/java/io/synadia/examples/TransactionalPublishExample.java new file mode 100644 index 0000000..bf593a1 --- /dev/null +++ b/js-publish-extensions/src/examples/java/io/synadia/examples/TransactionalPublishExample.java @@ -0,0 +1,33 @@ +// Copyright (c) 2024-2025 Synadia Communications Inc. All Rights Reserved. +// See LICENSE and NOTICE file for details. + +package io.synadia.examples; + +import io.nats.client.Connection; +import io.nats.client.Nats; +import io.nats.client.Options; +import io.nats.client.impl.ErrorListenerConsoleImpl; + +public class TransactionalPublishExample { + + public static final String STREAM = "exampleStream"; + public static final String SUBJECT = "exampleSubject"; + + public static void main(String[] args) { + Options options = Options.builder() + .server(Options.DEFAULT_URL) + .connectionListener((connection, events) -> ExampleUtils.print("Connection Event:" + events.getEvent())) + .errorListener(new ErrorListenerConsoleImpl()) + .build(); + + try (Connection nc = Nats.connect(options)) { + ExampleUtils.setupStream(nc, STREAM, SUBJECT); + + + } + catch (Exception e) { + //noinspection CallToPrintStackTrace + e.printStackTrace(); + } + } +} diff --git a/js-publish-extensions/src/main/java/io/synadia/jnats/extension/AsyncJsPublishListener.java b/js-publish-extensions/src/main/java/io/synadia/jnats/extension/AsyncJsPublishListener.java index fb3255d..3ad502f 100644 --- a/js-publish-extensions/src/main/java/io/synadia/jnats/extension/AsyncJsPublishListener.java +++ b/js-publish-extensions/src/main/java/io/synadia/jnats/extension/AsyncJsPublishListener.java @@ -32,4 +32,22 @@ public interface AsyncJsPublishListener { * @param flight the flight representing the message */ void timeout(PostFlight flight); + + /** + * The engine has just paused publishing waiting for inflight to + * drop below the resumeInFlightAmount + * @param currentInFlight the number of messages in flight + * @param maxInFlight the number of in flight messages when publishing will be paused + * @param resumeAmount the number of in flight messages when publishing will resume after being paused + */ + void paused(int currentInFlight, int maxInFlight, int resumeAmount); + + /** + * The engine has just resumed publishing and will continue unless + * the number of messages in flight reaches the max + * @param currentInFlight the number of messages in flight + * @param maxInFlight the number of in flight messages when publishing will be paused + * @param resumeAmount the number of in flight messages when publishing will resume after being paused + */ + void resumed(int currentInFlight, int maxInFlight, int resumeAmount); } diff --git a/js-publish-extensions/src/main/java/io/synadia/jnats/extension/AsyncJsPublisher.java b/js-publish-extensions/src/main/java/io/synadia/jnats/extension/AsyncJsPublisher.java index 6c49ef2..9bf1c8e 100644 --- a/js-publish-extensions/src/main/java/io/synadia/jnats/extension/AsyncJsPublisher.java +++ b/js-publish-extensions/src/main/java/io/synadia/jnats/extension/AsyncJsPublisher.java @@ -6,7 +6,6 @@ import io.nats.client.*; import io.nats.client.api.PublishAck; import io.nats.client.impl.Headers; -import io.synadia.retrier.RetryConfig; import java.io.IOException; import java.util.concurrent.*; @@ -21,7 +20,7 @@ */ public class AsyncJsPublisher implements AutoCloseable { public static final int DEFAULT_MAX_IN_FLIGHT = 50; - public static final int DEFAULT_REFILL_AMOUNT = 0; + public static final int DEFAULT_RESUME_AMOUNT = 0; public static final long DEFAULT_POLL_TIME = 100; public static final long DEFAULT_HOLD_PAUSE_TIME = 100; public static final long DEFAULT_WAIT_TIMEOUT = DEFAULT_MAX_IN_FLIGHT * DEFAULT_POLL_TIME; @@ -33,16 +32,15 @@ public class AsyncJsPublisher implements AutoCloseable { private final Supplier messageIdSupplier; private final String idPrefix; private final int maxInFlight; - private final int refillAllowedAt; - private final RetryConfig retryConfig; + private final int resumeAmount; + private final PublishRetryConfig retryConfig; private final AsyncJsPublishListener publishListener; private final long pollTime; private final long holdPauseTime; private final long waitTimeout; - private final boolean processAcksInOrder; private final LinkedBlockingQueue preFlight; private final LinkedBlockingQueue inFlights; - private final AtomicBoolean notInHoldingPattern; + private final AtomicBoolean notPaused; private final AtomicBoolean draining; private final AtomicBoolean keepGoingPublishRunner; private final AtomicBoolean keepGoingFlightsRunner; @@ -66,13 +64,12 @@ private AsyncJsPublisher(Builder b) { messageIdSupplier = b.messageIdSupplier; } maxInFlight = b.maxInFlight; - refillAllowedAt = b.refillAllowedAt; + resumeAmount = b.resumeAmount; retryConfig = b.retryConfig; publishListener = b.publishListener; pollTime = b.pollTime; holdPauseTime = b.holdPauseTime; waitTimeout = b.waitTimeout; - processAcksInOrder = b.processAcksInOrder; if (b.notificationExecutorService == null) { notificationExecutorService = Executors.newFixedThreadPool(1); @@ -85,7 +82,7 @@ private AsyncJsPublisher(Builder b) { preFlight = new LinkedBlockingQueue<>(); inFlights = new LinkedBlockingQueue<>(); - notInHoldingPattern = new AtomicBoolean(true); + notPaused = new AtomicBoolean(true); draining = new AtomicBoolean(false); keepGoingPublishRunner = new AtomicBoolean(true); keepGoingFlightsRunner = new AtomicBoolean(true); @@ -158,7 +155,7 @@ public void close() throws Exception { * The number of messages currently in flight (published, awaiting ack) * @return the number */ - public int inFlightSize() { + public int currentInFlight() { return inFlights.size(); } @@ -195,11 +192,11 @@ public int getMaxInFlight() { } /** - * The configured refill allowed at + * The configured resume at amount * @return the value */ - public int getRefillAllowedAt() { - return refillAllowedAt; + public int getResumeAmount() { + return resumeAmount; } /** @@ -226,8 +223,9 @@ public long getWaitTimeout() { return waitTimeout; } + @Deprecated public boolean getProcessAcksInOrder() { - return processAcksInOrder; + return true; } /** @@ -236,7 +234,7 @@ public boolean getProcessAcksInOrder() { public void publishRunner() { try { while (keepGoingPublishRunner.get()) { - if (notInHoldingPattern.get()) { + if (notPaused.get()) { PreFlight pre = preFlight.poll(pollTime, TimeUnit.MILLISECONDS); if (pre != null) { if (pre == STOP_MARKER) { @@ -261,8 +259,10 @@ public void publishRunner() { // if we've reached the max in flight, put publishing on hold // this is reset by the flights runner when the condition is met - if (inFlights.size() >= maxInFlight) { - notInHoldingPattern.set(false); + int currentInFlight = inFlights.size(); + if (currentInFlight >= maxInFlight) { + notPaused.set(false); + notifyPaused(currentInFlight); } } } @@ -311,12 +311,15 @@ public void flightsRunner() { } catch (InterruptedException e) { Thread.currentThread().interrupt(); + keepGoingFlightsRunner.set(false); } // once the inFlight empty out/cross the refill threshold, // we are allowed to publish again - if (inFlights.size() <= refillAllowedAt) { - notInHoldingPattern.set(true); + int currentInFlight = inFlights.size(); + if (currentInFlight <= resumeAmount) { + notPaused.set(true); + notifyResumed(currentInFlight); } } } @@ -331,7 +334,10 @@ public void flightsRunner() { } private void handleExecutionException(ExecutionException e, InFlight inFlight) { - Throwable cause = e.getCause() == null ? e : e.getCause(); + Throwable cause = e; + while (cause.getCause() != null) { + cause = cause.getCause(); + } if (cause instanceof JetStreamApiException) { if (cause.getMessage().contains("10060") // expected stream does not match [10060] || (cause.getMessage().contains("10070")) // wrong last msg ID: [10070] @@ -378,6 +384,18 @@ private void notifyTimeout(PostFlight postFlight) { } } + private void notifyPaused(int currentInFlight) { + if (publishListener != null) { + notificationExecutorService.submit(() -> publishListener.paused(currentInFlight, maxInFlight, resumeAmount)); + } + } + + private void notifyResumed(int currentInFlight) { + if (publishListener != null) { + notificationExecutorService.submit(() -> publishListener.resumed(currentInFlight, maxInFlight, resumeAmount)); + } + } + /** * Creates a builder for the AsyncJsPublisher * @param js the JetStream context @@ -394,8 +412,8 @@ public static class Builder { JetStream js; Supplier messageIdSupplier; int maxInFlight = DEFAULT_MAX_IN_FLIGHT; - int refillAllowedAt = DEFAULT_REFILL_AMOUNT; - RetryConfig retryConfig; + int resumeAmount = DEFAULT_RESUME_AMOUNT; + PublishRetryConfig retryConfig; AsyncJsPublishListener publishListener; long pollTime = DEFAULT_POLL_TIME; long holdPauseTime = DEFAULT_HOLD_PAUSE_TIME; @@ -427,9 +445,9 @@ public Builder messageIdSupplier(Supplier messageIdSupplier) { * Defaults to {@value #DEFAULT_MAX_IN_FLIGHT} * In flight is defined as a message that has been published but not yet * completed either with a publish ack confirmation or an exception. - * Once the in flight maximum has been reached, no messages will be - * published until the number of messages in flight becomes less than - * or equal to the refill allowed at + * Once the in flight maximum has been reached, publishing will be paused + * and no moew messages will be published until the number of messages in flight + * becomes less than or equal to the resume amount * @param maxInFlight the maximum number of messages in flight * @return the builder */ @@ -439,25 +457,25 @@ public Builder maxInFlight(int maxInFlight) { } /** - * The number of messages to allow starting to refill the in flight queue - * if the in flight queue had reached the max in flight + * The number of messages to allow publishing to resume after a pause + * when in flight queue had reached the max in flight size * Defaults to 0 meaning it must be completely empty once it gets full * @param refillAllowedAt the amount * @return the builder */ - public Builder refillAllowedAt(int refillAllowedAt) { - this.refillAllowedAt = refillAllowedAt; + public Builder resumeAmount(int refillAllowedAt) { + this.resumeAmount = refillAllowedAt; return this; } /** - * If a retry config is supplied, the publish will be done with the Retrier - * using the supplied config. If no retry config is supplied, the publish + * If a retry config is supplied, the publish will be done using the supplied config. + * If no retry config is supplied, the publish * is just a standard one time publish * @param retryConfig the config * @return the buidler */ - public Builder retryConfig(RetryConfig retryConfig) { + public Builder retryConfig(PublishRetryConfig retryConfig) { this.retryConfig = retryConfig; return this; } @@ -509,13 +527,9 @@ public Builder waitTimeout(long waitTimeout) { } /** - * Defaults to true. Important if there are publish expectations on a message - * otherwise can be set to false. Will cause publish ack futures to be processed - * in the smae order they were published/queued - * @param processAcksInOrder the setting - * @return the builder * @deprecated Turns out processing in order is faster, - * my guess is that by waiting for the get, acks at the end of the queue complete. + * my guess is that by waiting for the get, + * acks at the end of the queue complete. */ @Deprecated public Builder processAcksInOrder(boolean processAcksInOrder) { @@ -559,17 +573,17 @@ public AsyncJsPublisher start() { * @param subject the subject to send the message to * @param headers optional headers to publish with the message. * @param body the message body - * @param options publish options + * @param po publish options * @return The future */ - public PreFlight publishAsync(String subject, Headers headers, byte[] body, PublishOptions options) { + public PreFlight publishAsync(String subject, Headers headers, byte[] body, PublishOptions po) { if (draining.get() || (!keepGoingPublishRunner.get() && !keepGoingFlightsRunner.get())) { throw new IllegalStateException("Cannot publish once drained or stopped."); } - String messageId = options != null && options.getMessageId() != null - ? options.getMessageId() : messageIdSupplier.get(); - PreFlight p = new PreFlight(messageId, subject, headers, body, options); + String messageId = po != null && po.getMessageId() != null + ? po.getMessageId() : messageIdSupplier.get(); + PreFlight p = new PreFlight(messageId, subject, headers, body, po); preFlight.offer(p); return p; } diff --git a/js-publish-extensions/src/main/java/io/synadia/jnats/extension/PublishRetrier.java b/js-publish-extensions/src/main/java/io/synadia/jnats/extension/PublishRetrier.java index 44a8e8d..1f82b53 100644 --- a/js-publish-extensions/src/main/java/io/synadia/jnats/extension/PublishRetrier.java +++ b/js-publish-extensions/src/main/java/io/synadia/jnats/extension/PublishRetrier.java @@ -4,22 +4,25 @@ package io.synadia.jnats.extension; import io.nats.client.JetStream; +import io.nats.client.JetStreamApiException; import io.nats.client.Message; import io.nats.client.PublishOptions; import io.nats.client.api.PublishAck; import io.nats.client.impl.Headers; -import io.nats.client.support.Status; -import io.synadia.retrier.RetryConfig; +import java.io.IOException; import java.util.concurrent.CompletableFuture; +import static io.synadia.jnats.extension.PublishRetryConfig.DEFAULT_CONFIG; import static io.synadia.retrier.Retrier.execute; -import static io.synadia.retrier.RetryConfig.DEFAULT_CONFIG; /** * The Publish Retrier provides methods which are built specifically for JetStream publishing. */ public class PublishRetrier { + + private static final String NO_RESPONDERS_TEXT = "No Responders"; + private PublishRetrier() {} /* ensures cannot be constructed */ /** @@ -34,10 +37,29 @@ private PublishRetrier() {} /* ensures cannot be constructed */ * @return The acknowledgement of the publish * @throws Exception various communication issues with the NATS server; only thrown if all retries failed. */ - public static PublishAck publish(RetryConfig config, JetStream js, String subject, Headers headers, byte[] body, PublishOptions options) throws Exception { - return execute(config, - () -> js.publish(subject, headers, body, options), - e -> e.getMessage().contains(Status.NO_RESPONDERS_TEXT)); + public static PublishAck publish(PublishRetryConfig config, JetStream js, String subject, Headers headers, byte[] body, PublishOptions options) throws Exception { + return execute(config.retryConfig, + () -> js.publish(subject, headers, body, options), e -> { + if (config.retryAll) { + return true; + } + if (e instanceof IOException) { + // No responders are actually surfaced as an IOException + // but we are treating it as its own entity, + // meaning no-responders does not follow the retryOnIoEx flag + // and we check it first. + if (e.getMessage().contains(NO_RESPONDERS_TEXT)) { + return config.retryOnNoResponders; + } + if (config.retryOnIoEx) { + return true; + } + } + if (config.retryOnJetStreamApiEx && e instanceof JetStreamApiException) { + return true; + } + return config.retryOnRuntimeEx && e instanceof RuntimeException; + }); } /** @@ -50,7 +72,7 @@ public static PublishAck publish(RetryConfig config, JetStream js, String subjec * @return The acknowledgement of the publish * @throws Exception various communication issues with the NATS server; only thrown if all retries failed. */ - public static PublishAck publish(RetryConfig config, JetStream js, String subject, byte[] body) throws Exception { + public static PublishAck publish(PublishRetryConfig config, JetStream js, String subject, byte[] body) throws Exception { return publish(config, js, subject, null, body, null); } @@ -65,7 +87,7 @@ public static PublishAck publish(RetryConfig config, JetStream js, String subjec * @return The acknowledgement of the publish * @throws Exception various communication issues with the NATS server; only thrown if all retries failed. */ - public static PublishAck publish(RetryConfig config, JetStream js, String subject, Headers headers, byte[] body) throws Exception { + public static PublishAck publish(PublishRetryConfig config, JetStream js, String subject, Headers headers, byte[] body) throws Exception { return publish(config, js, subject, headers, body, null); } @@ -80,7 +102,7 @@ public static PublishAck publish(RetryConfig config, JetStream js, String subjec * @return The acknowledgement of the publish * @throws Exception various communication issues with the NATS server; only thrown if all retries failed. */ - public static PublishAck publish(RetryConfig config, JetStream js, String subject, byte[] body, PublishOptions options) throws Exception { + public static PublishAck publish(PublishRetryConfig config, JetStream js, String subject, byte[] body, PublishOptions options) throws Exception { return publish(config, js, subject, null, body, options); } @@ -93,7 +115,7 @@ public static PublishAck publish(RetryConfig config, JetStream js, String subjec * @return The acknowledgement of the publish * @throws Exception various communication issues with the NATS server; only thrown if all retries failed. */ - public static PublishAck publish(RetryConfig config, JetStream js, Message message) throws Exception { + public static PublishAck publish(PublishRetryConfig config, JetStream js, Message message) throws Exception { return publish(config, js, message.getSubject(), message.getHeaders(), message.getData(), null); } @@ -107,7 +129,7 @@ public static PublishAck publish(RetryConfig config, JetStream js, Message messa * @return The acknowledgement of the publish * @throws Exception various communication issues with the NATS server; only thrown if all retries failed. */ - public static PublishAck publish(RetryConfig config, JetStream js, Message message, PublishOptions options) throws Exception { + public static PublishAck publish(PublishRetryConfig config, JetStream js, Message message, PublishOptions options) throws Exception { return publish(config, js, message.getSubject(), message.getHeaders(), message.getData(), options); } @@ -203,7 +225,7 @@ public static PublishAck publish(JetStream js, Message message, PublishOptions o * @param options publish options * @return The future */ - public static CompletableFuture publishAsync(RetryConfig config, JetStream js, String subject, Headers headers, byte[] body, PublishOptions options) { + public static CompletableFuture publishAsync(PublishRetryConfig config, JetStream js, String subject, Headers headers, byte[] body, PublishOptions options) { return CompletableFuture.supplyAsync(() -> { try { return publish(config, js, subject, headers, body, options); @@ -223,7 +245,7 @@ public static CompletableFuture publishAsync(RetryConfig config, Jet * @param body the message body * @return The future */ - public static CompletableFuture publishAsync(RetryConfig config, JetStream js, String subject, byte[] body) { + public static CompletableFuture publishAsync(PublishRetryConfig config, JetStream js, String subject, byte[] body) { return publishAsync(config, js, subject, null, body, null); } @@ -237,7 +259,7 @@ public static CompletableFuture publishAsync(RetryConfig config, Jet * @param body the message body * @return The future */ - public static CompletableFuture publishAsync(RetryConfig config, JetStream js, String subject, Headers headers, byte[] body) { + public static CompletableFuture publishAsync(PublishRetryConfig config, JetStream js, String subject, Headers headers, byte[] body) { return publishAsync(config, js, subject, headers, body, null); } @@ -251,7 +273,7 @@ public static CompletableFuture publishAsync(RetryConfig config, Jet * @param options publish options * @return The future */ - public static CompletableFuture publishAsync(RetryConfig config, JetStream js, String subject, byte[] body, PublishOptions options) { + public static CompletableFuture publishAsync(PublishRetryConfig config, JetStream js, String subject, byte[] body, PublishOptions options) { return publishAsync(config, js, subject, null, body, options); } @@ -263,7 +285,7 @@ public static CompletableFuture publishAsync(RetryConfig config, Jet * @param message the message to publish * @return The future */ - public static CompletableFuture publishAsync(RetryConfig config, JetStream js, Message message) { + public static CompletableFuture publishAsync(PublishRetryConfig config, JetStream js, Message message) { return publishAsync(config, js, message.getSubject(), message.getHeaders(), message.getData(), null); } @@ -276,7 +298,7 @@ public static CompletableFuture publishAsync(RetryConfig config, Jet * @param options publish options * @return The future */ - public static CompletableFuture publishAsync(RetryConfig config, JetStream js, Message message, PublishOptions options) { + public static CompletableFuture publishAsync(PublishRetryConfig config, JetStream js, Message message, PublishOptions options) { return publishAsync(config, js, message.getSubject(), message.getHeaders(), message.getData(), options); } diff --git a/js-publish-extensions/src/main/java/io/synadia/jnats/extension/PublishRetryConfig.java b/js-publish-extensions/src/main/java/io/synadia/jnats/extension/PublishRetryConfig.java new file mode 100644 index 0000000..1cb8844 --- /dev/null +++ b/js-publish-extensions/src/main/java/io/synadia/jnats/extension/PublishRetryConfig.java @@ -0,0 +1,146 @@ +// Copyright (c) 2024-2025 Synadia Communications Inc. All Rights Reserved. +// See LICENSE and NOTICE file for details. + +package io.synadia.jnats.extension; + +import io.synadia.retrier.RetryConfig; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +/** + * A class to config how publish retries are executed. + */ +public class PublishRetryConfig { + + public static final int DEFAULT_ATTEMPTS; + public static final long[] DEFAULT_BACKOFF_POLICY; + public static final List DEFAULT_RETRY_CONDITIONS; + public static final PublishRetryConfig DEFAULT_CONFIG; + + static { + // DEV NOTE. ORDER MATTERS. WE NEED THESE FOR THE CONFIG BUILDER + DEFAULT_ATTEMPTS = RetryConfig.DEFAULT_ATTEMPTS; + DEFAULT_BACKOFF_POLICY = RetryConfig.DEFAULT_BACKOFF_POLICY; + List list = new ArrayList<>(); + list.add(RetryCondition.IoEx); + list.add(RetryCondition.NoResponders); + DEFAULT_RETRY_CONDITIONS = Collections.unmodifiableList(list); + + // CONFIG BUILDER LAST + DEFAULT_CONFIG = PublishRetryConfig.builder().build(); + } + + public final RetryConfig retryConfig; + public final boolean retryAll; + public final boolean retryOnNoResponders; + public final boolean retryOnIoEx; + public final boolean retryOnJetStreamApiEx; + public final boolean retryOnRuntimeEx; + + public PublishRetryConfig(RetryConfig retryConfig, List retryConditions) { + this.retryConfig = retryConfig; + this.retryOnNoResponders = retryConditions.contains(RetryCondition.NoResponders); + this.retryOnIoEx = retryConditions.contains(RetryCondition.IoEx); + this.retryOnJetStreamApiEx = retryConditions.contains(RetryCondition.JetStreamApiEx); + this.retryOnRuntimeEx = retryConditions.contains(RetryCondition.RuntimeEx); + retryAll = retryOnNoResponders && retryOnIoEx && retryOnJetStreamApiEx && retryOnRuntimeEx; + } + + /** + * Creates a builder for the config. + * @return the builder + */ + public static Builder builder() { + return new Builder(); + } + + /** + * The builder class for the RetryConfig + */ + public static class Builder { + RetryConfig.Builder rcb = RetryConfig.builder(); + List retryConditions = new ArrayList<>(); + + public Builder() { + retryConditions.add(RetryCondition.NoResponders); + retryConditions.add(RetryCondition.IoEx); + } + + /** + * Set the backoff policy + * @param backoffPolicy the policy array + * @return the builder + */ + public Builder backoffPolicy(long[] backoffPolicy) { + rcb.backoffPolicy(backoffPolicy); + return this; + } + + /** + * Set the number of times to retry + * @param attempts the number of retry attempts + * @return the builder + */ + public Builder attempts(int attempts) { + rcb.attempts(attempts); + return this; + } + + /** + * Set the deadline. The retry will be given at max this much time to execute + * if expires before the number of retries. + * @param retryDeadlineMillis the deadline time in millis + * @return the builder + */ + public Builder deadline(long retryDeadlineMillis) { + rcb.deadline(retryDeadlineMillis); + return this; + } + + /** + * Set the deadline by duration. Will be truncated to millis. + * The retry will be given at max this much time to execute + * if expires before the number of retries. + * @param retryDeadline the deadline duration + * @return the builder + */ + public Builder deadline(Duration retryDeadline) { + rcb.deadline(retryDeadline.toMillis()); + return this; + } + + /** + * Set the exception conditions where the publisher allows the retrier to continue + * @param retryConditions the conditions + * @return the builder + */ + public Builder retryConditions(RetryCondition... retryConditions) { + List temp = new ArrayList<>(); + if (retryConditions != null) { + for (RetryCondition rc : retryConditions) { + if (rc != null) { + temp.add(rc); + } + } + } + if (temp.isEmpty()) { + this.retryConditions = DEFAULT_RETRY_CONDITIONS; + } + else { + this.retryConditions = temp; + } + return this; + } + + /** + * Builds the retry config. + * @return RetryConfig instance + */ + public PublishRetryConfig build() { + return new PublishRetryConfig(rcb.build(), retryConditions); + } + } +} diff --git a/js-publish-extensions/src/main/java/io/synadia/jnats/extension/RetryCondition.java b/js-publish-extensions/src/main/java/io/synadia/jnats/extension/RetryCondition.java new file mode 100644 index 0000000..a955b5c --- /dev/null +++ b/js-publish-extensions/src/main/java/io/synadia/jnats/extension/RetryCondition.java @@ -0,0 +1,27 @@ +// Copyright (c) 2024-2025 Synadia Communications Inc. All Rights Reserved. +// See LICENSE and NOTICE file for details. + +package io.synadia.jnats.extension; + +public enum RetryCondition { + + /** + * Any 503 No Responder + */ + NoResponders, + + /** + * Any IOException like a timeout / disconnect + */ + IoEx, + + /** + * Any JetStreamApiException + */ + JetStreamApiEx, + + /** + * Any RuntimeException + */ + RuntimeEx +} diff --git a/js-publish-extensions/src/test/java/io/synadia/jnats/extension/PublishRetrierTests.java b/js-publish-extensions/src/test/java/io/synadia/jnats/extension/PublishRetrierTests.java index 1d9395c..b852ef5 100644 --- a/js-publish-extensions/src/test/java/io/synadia/jnats/extension/PublishRetrierTests.java +++ b/js-publish-extensions/src/test/java/io/synadia/jnats/extension/PublishRetrierTests.java @@ -6,6 +6,7 @@ import io.nats.client.api.StreamConfiguration; import io.nats.client.impl.Headers; import io.nats.client.impl.NatsMessage; +import io.synadia.retrier.RetryConfig; import nats.io.ConsoleOutput; import nats.io.NatsServerRunner; import org.junit.jupiter.api.Test; @@ -15,8 +16,9 @@ import java.util.concurrent.TimeUnit; import java.util.logging.Level; -import static io.synadia.retrier.RetryConfig.DEFAULT_CONFIG; -import static org.junit.jupiter.api.Assertions.assertNotNull; +import static io.synadia.jnats.extension.PublishRetryConfig.DEFAULT_CONFIG; +import static io.synadia.jnats.extension.RetryCondition.*; +import static org.junit.jupiter.api.Assertions.*; public class PublishRetrierTests { static { @@ -32,6 +34,40 @@ interface AsyncRetryFunction { CompletableFuture execute(String subject) throws Exception; } + @Test + public void testConfig() { + PublishRetryConfig retryConfig = DEFAULT_CONFIG; + assertEquals(RetryConfig.DEFAULT_CONFIG.getAttempts(), retryConfig.retryConfig.getAttempts()); + assertEquals(RetryConfig.DEFAULT_CONFIG.getDeadline(), retryConfig.retryConfig.getDeadline()); + assertArrayEquals(RetryConfig.DEFAULT_CONFIG.getBackoffPolicy(), retryConfig.retryConfig.getBackoffPolicy()); + assertFalse(retryConfig.retryAll); + assertTrue(retryConfig.retryOnNoResponders); + assertTrue(retryConfig.retryOnIoEx); + assertFalse(retryConfig.retryOnJetStreamApiEx); + assertFalse(retryConfig.retryOnRuntimeEx); + + retryConfig = PublishRetryConfig.builder() + .retryConditions(NoResponders, IoEx, JetStreamApiEx, RuntimeEx) + .build(); + assertEquals(RetryConfig.DEFAULT_CONFIG.getAttempts(), retryConfig.retryConfig.getAttempts()); + assertEquals(RetryConfig.DEFAULT_CONFIG.getDeadline(), retryConfig.retryConfig.getDeadline()); + assertArrayEquals(RetryConfig.DEFAULT_CONFIG.getBackoffPolicy(), retryConfig.retryConfig.getBackoffPolicy()); + assertTrue(retryConfig.retryAll); + assertTrue(retryConfig.retryOnNoResponders); + assertTrue(retryConfig.retryOnIoEx); + assertTrue(retryConfig.retryOnJetStreamApiEx); + assertTrue(retryConfig.retryOnRuntimeEx); + + for (RetryCondition condition : RetryCondition.values()) { + retryConfig = PublishRetryConfig.builder().retryConditions(condition).build(); + assertFalse(retryConfig.retryAll); + assertEquals(condition == NoResponders, retryConfig.retryOnNoResponders); + assertEquals(condition == IoEx, retryConfig.retryOnIoEx); + assertEquals(condition == JetStreamApiEx, retryConfig.retryOnJetStreamApiEx); + assertEquals(condition == RuntimeEx, retryConfig.retryOnRuntimeEx); + } + } + @Test public void testRetryJsApis() throws Exception { try (NatsServerRunner runner = new NatsServerRunner(false, true)) { diff --git a/request-many/build.gradle b/request-many/build.gradle index bef9027..0fd3624 100644 --- a/request-many/build.gradle +++ b/request-many/build.gradle @@ -38,7 +38,7 @@ repositories { } dependencies { - implementation 'io.nats:jnats:2.20.5' + implementation 'io.nats:jnats:2.21.1' testImplementation 'io.nats:jnats-server-runner:1.2.8' testImplementation 'org.junit.jupiter:junit-jupiter:5.9.0' diff --git a/retrier/build.gradle b/retrier/build.gradle index 952da40..cc081ba 100644 --- a/retrier/build.gradle +++ b/retrier/build.gradle @@ -38,7 +38,7 @@ repositories { } dependencies { - implementation 'io.nats:jnats:2.20.5' + implementation 'io.nats:jnats:2.21.1' testImplementation 'io.nats:jnats-server-runner:1.2.8' testImplementation 'org.junit.jupiter:junit-jupiter:5.9.0'