diff --git a/js-publish-extensions/README.md b/js-publish-extensions/README.md index 231dfcf..bcb5840 100644 --- a/js-publish-extensions/README.md +++ b/js-publish-extensions/README.md @@ -4,8 +4,8 @@ Extensions specific to JetStream publishing. -**Current Release**: 0.3.0 -  **Current Snapshot**: 0.4.0-SNAPSHOT +**Current Release**: 0.4.0 +  **Current Snapshot**: 0.4.1-SNAPSHOT   **Gradle and Maven** `io.synadia:jnats-js-publish-extensions` [Dependencies Help](https://github.com/synadia-io/orbit.java?tab=readme-ov-file#dependencies) @@ -17,23 +17,25 @@ Extensions specific to JetStream publishing. ### PublishRetrier This class parallels the standard JetStream publish api with methods that will retry the publish. - The examples: -* [Publish Retrier Sync Example](src/examples/java/io/synadia/examples/PublishRetrierSyncExample.java) -* [Publish Retrier Async Example](src/examples/java/io/synadia/examples/PublishRetrierAsyncExample.java) +* The [Publish Retrier Sync Example](src/examples/java/io/synadia/examples/PublishRetrierSyncExample.java) +demonstrates publishing synchronously with the retrier. + +* The [Publish Retrier Async Example](src/examples/java/io/synadia/examples/PublishRetrierAsyncExample.java) +demonstrates publishing asynchronously with the retrier. ### AsyncJsPublisher -This class is a full async message publish manager. -This utility provides a workflow of +This class is a full async message publish manager that provides: 1. Publishing a message async * The number of inflight messages (published but not received acks) can be set. -2. Queueing and tracking of the in-flight PublishAck future +2. Queueing and tracking of the inflight PublishAck future 3. The ability to observe the queue and respond to events * The message was published * The message received a valid ack * The publish completed with an exception * The publish timed out. + * Publishing was paused or resumed due to threshold settings It can be combined with the retrier. You must consider that when publishing async in this manner @@ -41,8 +43,12 @@ it's possible for messages to be published out of order. In that case you can use publish expectations. If order of messages is a requirement, you -* [Async Js Publisher Example](src/examples/java/io/synadia/examples/AsyncJsPublisherExample.java) -* [Async Js Publisher More Customized Example](src/examples/java/io/synadia/examples/AsyncJsPublisherCustomizedExample.java) +* The [Async Js Publisher Example](src/examples/java/io/synadia/examples/AsyncJsPublisherExample.java) +demonstrates basic use of the class. + +* The [Async Js Publisher Custom Threads Example](src/examples/java/io/synadia/examples/AsyncJsPublisherCustomThreadsExample.java) +has the identical workflow, but demonstrates the ability to provide the executors and threads manually instead of relying +on the built-in ones. --- Copyright (c) 2024-2025 Synadia Communications Inc. All Rights Reserved. diff --git a/js-publish-extensions/build.gradle b/js-publish-extensions/build.gradle index f875e15..8853648 100644 --- a/js-publish-extensions/build.gradle +++ b/js-publish-extensions/build.gradle @@ -13,7 +13,7 @@ plugins { id 'signing' } -def jarVersion = "0.4.0" +def jarVersion = "0.4.1" group = 'io.synadia' def isMerge = System.getenv("BUILD_EVENT") == "push" diff --git a/js-publish-extensions/docs/DraftDesign.md b/js-publish-extensions/docs/DraftDesign.md index de5f7db..92e6876 100644 --- a/js-publish-extensions/docs/DraftDesign.md +++ b/js-publish-extensions/docs/DraftDesign.md @@ -7,28 +7,39 @@ This document is the draft design describing a managed async publish utility. * JetStream context on which to publish #### Optional Properties -| Property | Description | -|---------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| String idPrefix | used to make unique identifiers around each message. Defaults to a NUID | -| int maxInFlight | no more than this number of messages can be waiting for publish ack. Defaults to 50. | -| int refillAllowedAt | if the queue size reaches maxInFlight, a hold is placed so no more messages can be published until the in flight queue contains this amount or less messages, at which time the hold is removed. Defaults to 0 which would be full sawtooth. Non zero provides for a window. | -| RetryConfig retryConfig | if the user wants to publish with retries, they must supply a config, otherwise the publish will be attempted only once. | -| long pollTime | the amount of time in ms to poll any given queue. Ensures polling doesn't block indefinitely. Defaults to 100ms | -| long holdPauseTime | the amount of time in ms to pause between checks when hold is on. Defaults to 100ms | -| long waitTimeout | the timeout when waiting for a publish to be acknowledged. Defaults to 5000ms | -| PublisherListener publisherListener | a callback for the user to see what's going on in the workflow, see description of Flight later | +| Property | Description | +|--------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| String idPrefix | used to make unique identifiers around each message. Defaults to a NUID | +| int maxInFlight | no more than this number of messages can be waiting for publish ack. Defaults to 50. | +| int refillAllowedAt | if the queue size reaches maxInFlight, a hold is placed so no more messages can be published until the in flight queue contains this amount or less messages, at which time the hold is removed. Defaults to 0 which would be full sawtooth. Non zero provides for a window. | +| RetryConfig retryConfig | if the user wants to publish with retries, they must supply a config, otherwise the publish will be attempted only once. | +| long pollTime | the amount of time in ms to poll any given queue. Ensures polling doesn't block indefinitely. Defaults to 100ms | +| long publishPauseTime | the amount of time in ms to pause between checks when hold is on. Defaults to 100ms | +| long waitTimeout | the timeout when waiting for a publish to be acknowledged. Defaults to 5000ms | +| PublisherListener publisherListener | a callback for the user to see what's going on in the workflow, see description of Flight later | ## PublisherListener Interface The callback interface for the user to get information about the publish workflow -| Method | Description | -|---------------------------------------------|-----------------------------------------------------------------------| -| void published(Flight flight) | the flight is ready when the message is published | -| void acked(Flight flight); | the publish ack was received | -| void completedExceptionally(Flight flight); | the publish exceptioned, such as a 503 or lower level request timeout | -| void timeout(Flight flight) | the ack was not returned in time based on waitTimeout | +| Method | Description | +|-----------------------------------------------------------------------|-----------------------------------------------------------------------| +| void published(Flight flight) | the flight is ready when the message is published | +| void acked(Flight flight) | the publish ack was received | +| void completedExceptionally(Flight flight) | the publish exceptioned, such as a 503 or lower level request timeout | +| void timeout(Flight flight) | the ack was not returned in time based on waitTimeout | +| void paused(int currentInFlight, int maxInFlight, int resumeAmount) | Publishing was paused due to in-flight conditions | +| void resumed(int currentInFlight, int maxInFlight, int resumeAmount) | Publishing was resumed due to in-flight conditions | + + /** + * The engine has just resumed publishing and will continue unless + * the number of messages in flight reaches the max + * @param currentInFlight the number of messages in flight + * @param maxInFlight the number of in flight messages when publishing will be paused + * @param resumeAmount the number of in flight messages when publishing will resume after being paused + */ + void resumed(int currentInFlight, int maxInFlight, int resumeAmount); ## Flight structure @@ -68,7 +79,7 @@ while keepGoing flag if in flight queue has reached maxInFlight put hold on notify listener to indicate published else in holding pattern - sleep holdPauseTime + sleep publishPauseTime ``` ## Flights Runner Pseudo Code: diff --git a/js-publish-extensions/src/examples/java/io/synadia/examples/AsyncJsPublisherCustomThreadsExample.java b/js-publish-extensions/src/examples/java/io/synadia/examples/AsyncJsPublisherCustomThreadsExample.java index a7192e3..b878494 100644 --- a/js-publish-extensions/src/examples/java/io/synadia/examples/AsyncJsPublisherCustomThreadsExample.java +++ b/js-publish-extensions/src/examples/java/io/synadia/examples/AsyncJsPublisherCustomThreadsExample.java @@ -4,88 +4,184 @@ package io.synadia.examples; import io.nats.client.Connection; +import io.nats.client.JetStream; import io.nats.client.Nats; import io.nats.client.Options; import io.nats.client.impl.ErrorListenerConsoleImpl; import io.synadia.jnats.extension.AsyncJsPublisher; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; public class AsyncJsPublisherCustomThreadsExample { - public static final int COUNT = 100_000; - public static final String STREAM = "customStream"; - public static final String SUBJECT = "customSubject"; + // -------------------------------------------------------------------------------- + // Example general configuration + // -------------------------------------------------------------------------------- + public static final String STREAM = "exampleStream"; + public static final String SUBJECT = "exampleSubject"; + public static final int PUBLISH_COUNT = 1_000_000; + + // -------------------------------------------------------------------------------- + // AsyncJsPublisher configuration + // These are the defaults if you don't manually set them in the builder. + // You can play with the values and see how it affects the run + // -------------------------------------------------------------------------------- + // public static final int DEFAULT_MAX_IN_FLIGHT = 50; + // public static final int DEFAULT_REFILL_AMOUNT = 0; + // public static final long DEFAULT_POLL_TIME = 100; + // public static final long DEFAULT_PAUSE_TIME = 100; + // public static final long DEFAULT_WAIT_TIMEOUT = DEFAULT_MAX_IN_FLIGHT * DEFAULT_POLL_TIME; + // -------------------------------------------------------------------------------- + public static final int MAX_IN_FLIGHT = 10000; + public static final int RESUME_AMOUNT = 1000; + public static final long POLL_TIME = 50; + public static final long PUBLISH_PAUSE_TIME = 100; + public static final long WAIT_TIMEOUT = 2500; public static void main(String[] args) { Options options = Options.builder() .server(Options.DEFAULT_URL) - .connectionListener((connection, events) -> ExampleUtils.print("Connection Event:" + events.getEvent())) + .connectionListener((connection, events) -> ExampleUtils.print("Connection Event", events.getEvent())) .errorListener(new ErrorListenerConsoleImpl()) .build(); try (Connection nc = Nats.connect(options)) { ExampleUtils.setupStream(nc, STREAM, SUBJECT); + JetStream js = nc.jetStream(); + // -------------------------------------------------------------------------------- - // The listener is important for the developer to have a window in to the publishing. - // see the ExamplePublishListener implementation + // Build the AsyncJsPublisher... // -------------------------------------------------------------------------------- - ExamplePublishListener publishListener = new ExamplePublishListener(); - AsyncJsPublisher.Builder builder = - AsyncJsPublisher.builder(nc.jetStream()) - .publishListener(publishListener); + // -------------------------------------------------------------------------------- + // The listener is important for the developer to have a window into the publishing + // It will be called as an executor task + // -------------------------------------------------------------------------------- + ExamplePublishListener publishListener = new ExamplePublishListener(); // -------------------------------------------------------------------------------- - // Custom Start + // Custom Notification Executor // -------------------------------------------------------------------------------- // Since we can envision developers wanting more control, the non-built-in code path // demonstrates the developer supplying its own threads for running // the event loops and for providing the ExecutorService for notification. - // The custom code here is essentially the same as the built-in, but shows how the - // developer can do it themselves. // -------------------------------------------------------------------------------- - // custom notification executor ExecutorService notificationExecutorService = Executors.newFixedThreadPool(1); - builder.notificationExecutorService(notificationExecutorService); + AsyncJsPublisher.Builder builder = + AsyncJsPublisher.builder(js) + .maxInFlight(MAX_IN_FLIGHT) + .resumeAmount(RESUME_AMOUNT) + .pollTime(POLL_TIME) + .publishPauseTime(PUBLISH_PAUSE_TIME) + .waitTimeout(WAIT_TIMEOUT) + .publishListener(publishListener) + .notificationExecutorService(notificationExecutorService); AsyncJsPublisher publisher = builder.build(); - // this custom start mimics what the built-in does but + // -------------------------------------------------------------------------------- + // Notice we called .build() instead of AsyncJsPublisher publisher = builder.start(); + // This is so we can provide our own / custom start. + // This mimics what the built-in does and // shows how to access the publish / flights runner Runnable(s) + // -------------------------------------------------------------------------------- Thread publishRunnerThread = new Thread(publisher::publishRunner); publishRunnerThread.start(); Thread flightsRunnerThread = new Thread(publisher::flightsRunner); flightsRunnerThread.start(); // -------------------------------------------------------------------------------- - // example logic + // Example logic // -------------------------------------------------------------------------------- - for (int x = 1; x <= COUNT; x++) { + + // -------------------------------------------------------------------------------- + // Add the entire count of messages to the publisher, it will put them in a queue + // Usually publishing will happen more organically, this is just for the example. + // -------------------------------------------------------------------------------- + for (int x = 1; x <= PUBLISH_COUNT; x++) { publisher.publishAsync(SUBJECT, ("data-" + x).getBytes()); } - while (publisher.preFlightSize() > 0 || publisher.currentInFlight() > 0) { - ExampleUtils.printStateThenWait(publisher, publishListener); + // -------------------------------------------------------------------------------- + // Once the listener detects that all the messages have been actually published, + // move to the next phase of the example. + // -------------------------------------------------------------------------------- + while (publishListener.publishedCount.get() < PUBLISH_COUNT) { + ExampleUtils.printStatus(publisher, publishListener, false); + //noinspection BusyWait + Thread.sleep(500); } - ExampleUtils.printState(publisher, publishListener); + // -------------------------------------------------------------------------------- + // Call stop with true for drain this tells the publisher that we are done with it + // but to drain (finish publishing) and finish handling in-flight messages. + // -------------------------------------------------------------------------------- + // There is also a no-parameter stop() that does drain. + // -------------------------------------------------------------------------------- + // If you call stop without drain == true, the publisher + // will finish any publish or in-flight check the current loop is in the middle of + // but then stop after that with unfinished work. + // -------------------------------------------------------------------------------- + publisher.stop(true); + + // -------------------------------------------------------------------------------- + // When we call stop, there might be up to one more poll (POLL_TIME) of the + // pre-flight (unpublished) queue, but after that, the publish thread/loop will + // finish and the PublishRunnerDoneFuture will complete. + // -------------------------------------------------------------------------------- + CompletableFuture future = publisher.getPublishRunnerDoneFuture(); + future.get(POLL_TIME + 10, TimeUnit.MILLISECONDS); + + // -------------------------------------------------------------------------------- + // When you try to publish to a stopped publisher, you get an exception + // -------------------------------------------------------------------------------- + try { + System.out.print("Attempting to publish after stop..."); + publisher.publishAsync(SUBJECT, "should fail".getBytes()); + System.out.println("SHOULD HAVE EXCEPTIONED!"); + } + catch (IllegalStateException e) { + System.out.println(" \"" + e + "\""); + } + + // -------------------------------------------------------------------------------- + // This future lets us know when all the in-flight messages are acked. + // -------------------------------------------------------------------------------- + System.out.println("Waiting for the flight runner to complete processing publish acks..."); + publisher.getFlightsRunnerDoneFuture().get(11, TimeUnit.MINUTES);// should be done much sooner + ExampleUtils.printStatus(publisher, publishListener, true); // -------------------------------------------------------------------------------- - // if you have a custom start, you probably want some custom closing - // again, the example mimics what the built-in does - // don't forget to call the publisher close, because it does some stuff + // close() actually shuts down the threads and the notification executor + // Since we have a custom executor and a custom start, you will also have custom + // closing again, the example mimics what the built-in does. + // You should call the publisher close() first, because it does some stuff + // and then you can clean up your own executors/threads/runners // -------------------------------------------------------------------------------- publisher.close(); notificationExecutorService.shutdown(); - if (!publisher.getPublishRunnerDoneLatch().await(publisher.getPollTime(), TimeUnit.MILLISECONDS)) { - publishRunnerThread.interrupt(); + + try { + // give the publish runner a little time to finish + publisher.getPublishRunnerDoneFuture().get(publisher.getPollTime(), TimeUnit.MILLISECONDS); + } + catch (TimeoutException e) { + // it didn't finish, it may still be alive, interrupt it + if (publishRunnerThread.isAlive()) { + publishRunnerThread.interrupt(); + } + } + + try { + // give the flights runner a little time to finish + publisher.getFlightsRunnerDoneFuture().get(publisher.getPollTime(), TimeUnit.MILLISECONDS); } - if (!publisher.getFlightsRunnerDoneLatch().await(publisher.getPollTime(), TimeUnit.MILLISECONDS)) { - flightsRunnerThread.interrupt(); + catch (TimeoutException e) { + // it didn't finish, it may still be alive, interrupt it + if (flightsRunnerThread.isAlive()) { + flightsRunnerThread.interrupt(); + } } } catch (Exception e) { diff --git a/js-publish-extensions/src/examples/java/io/synadia/examples/AsyncJsPublisherCustomizedExample.java b/js-publish-extensions/src/examples/java/io/synadia/examples/AsyncJsPublisherCustomizedExample.java deleted file mode 100644 index 62e1d4c..0000000 --- a/js-publish-extensions/src/examples/java/io/synadia/examples/AsyncJsPublisherCustomizedExample.java +++ /dev/null @@ -1,70 +0,0 @@ -// Copyright (c) 2024-2025 Synadia Communications Inc. All Rights Reserved. -// See LICENSE and NOTICE file for details. - -package io.synadia.examples; - -import io.nats.client.Connection; -import io.nats.client.Nats; -import io.nats.client.Options; -import io.nats.client.impl.ErrorListenerConsoleImpl; -import io.synadia.jnats.extension.AsyncJsPublisher; - -public class AsyncJsPublisherCustomizedExample { - - public static final int COUNT = 100_0000; - public static final String STREAM = "customStream"; - public static final String SUBJECT = "customSubject"; - - public static void main(String[] args) { - Options options = Options.builder() - .server(Options.DEFAULT_URL) - .connectionListener((connection, events) -> ExampleUtils.print("Connection Event:" + events.getEvent())) - .errorListener(new ErrorListenerConsoleImpl()) - .build(); - - try (Connection nc = Nats.connect(options)) { - ExampleUtils.setupStream(nc, STREAM, SUBJECT); - - // -------------------------------------------------------------------------------- - // The listener is important for the developer to have a window in to the publishing. - // see the ExamplePublishListener implementation - // -------------------------------------------------------------------------------- - ExamplePublishListener publishListener = new ExamplePublishListener(); - - // -------------------------------------------------------------------------------- - // These are the defaults from AsyncJsPublisher... - // -------------------------------------------------------------------------------- - // public static final int DEFAULT_MAX_IN_FLIGHT = 50; - // public static final int DEFAULT_REFILL_AMOUNT = 0; - // public static final long DEFAULT_POLL_TIME = 100; - // public static final long DEFAULT_PAUSE_TIME = 100; - // public static final long DEFAULT_WAIT_TIMEOUT = DEFAULT_MAX_IN_FLIGHT * DEFAULT_POLL_TIME; - // -------------------------------------------------------------------------------- - AsyncJsPublisher.Builder builder = - AsyncJsPublisher.builder(nc.jetStream()) - .maxInFlight(250) - .resumeAmount(100) - .pollTime(50) - .holdPauseTime(150) - .waitTimeout(3000) - .publishListener(publishListener); - - // The publisher is AutoCloseable - try (AsyncJsPublisher publisher = builder.start()) { - for (int x = 1; x <= COUNT; x++) { - publisher.publishAsync(SUBJECT, ("data-" + x).getBytes()); - } - - while (publisher.preFlightSize() > 0 || publisher.currentInFlight() > 0) { - ExampleUtils.printStateThenWait(publisher, publishListener); - } - - ExampleUtils.printState(publisher, publishListener); - } - } - catch (Exception e) { - //noinspection CallToPrintStackTrace - e.printStackTrace(); - } - } -} diff --git a/js-publish-extensions/src/examples/java/io/synadia/examples/AsyncJsPublisherExample.java b/js-publish-extensions/src/examples/java/io/synadia/examples/AsyncJsPublisherExample.java index 5999104..0c1c7c8 100644 --- a/js-publish-extensions/src/examples/java/io/synadia/examples/AsyncJsPublisherExample.java +++ b/js-publish-extensions/src/examples/java/io/synadia/examples/AsyncJsPublisherExample.java @@ -4,60 +4,135 @@ package io.synadia.examples; import io.nats.client.Connection; +import io.nats.client.JetStream; import io.nats.client.Nats; import io.nats.client.Options; import io.nats.client.impl.ErrorListenerConsoleImpl; import io.synadia.jnats.extension.AsyncJsPublisher; -import io.synadia.jnats.extension.PublishRetryConfig; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; public class AsyncJsPublisherExample { - public static final int COUNT = 100_000; + // -------------------------------------------------------------------------------- + // Example general configuration + // -------------------------------------------------------------------------------- public static final String STREAM = "exampleStream"; public static final String SUBJECT = "exampleSubject"; + public static final int PUBLISH_COUNT = 1_000_000; - public static final boolean USE_RETRIER = false; // set this to true to have each publish use retry logic + // -------------------------------------------------------------------------------- + // AsyncJsPublisher configuration + // These are the defaults if you don't manually set them in the builder. + // You can play with the values and see how it affects the run + // -------------------------------------------------------------------------------- + // public static final int DEFAULT_MAX_IN_FLIGHT = 50; + // public static final int DEFAULT_REFILL_AMOUNT = 0; + // public static final long DEFAULT_POLL_TIME = 100; + // public static final long DEFAULT_PAUSE_TIME = 100; + // public static final long DEFAULT_WAIT_TIMEOUT = DEFAULT_MAX_IN_FLIGHT * DEFAULT_POLL_TIME; + // -------------------------------------------------------------------------------- + public static final int MAX_IN_FLIGHT = 10000; + public static final int RESUME_AMOUNT = 1000; + public static final long POLL_TIME = 50; + public static final long PUBLISH_PAUSE_TIME = 100; + public static final long WAIT_TIMEOUT = 2500; public static void main(String[] args) { Options options = Options.builder() .server(Options.DEFAULT_URL) - .connectionListener((connection, events) -> ExampleUtils.print("Connection Event:" + events.getEvent())) + .connectionListener((connection, events) -> ExampleUtils.print("Connection Event", events.getEvent())) .errorListener(new ErrorListenerConsoleImpl()) .build(); try (Connection nc = Nats.connect(options)) { ExampleUtils.setupStream(nc, STREAM, SUBJECT); + JetStream js = nc.jetStream(); // -------------------------------------------------------------------------------- - // The listener is important for the developer to have a window in to the publishing. - // see the ExamplePublishListener implementation + // Build the AsyncJsPublisher... + // -------------------------------------------------------------------------------- + // The listener is important for the developer to have a window into the publishing + // It will be called as an executor task // -------------------------------------------------------------------------------- ExamplePublishListener publishListener = new ExamplePublishListener(); - AsyncJsPublisher.Builder builder = - AsyncJsPublisher.builder(nc.jetStream()) + AsyncJsPublisher.builder(js) + .maxInFlight(MAX_IN_FLIGHT) + .resumeAmount(RESUME_AMOUNT) + .pollTime(POLL_TIME) + .publishPauseTime(PUBLISH_PAUSE_TIME) + .waitTimeout(WAIT_TIMEOUT) .publishListener(publishListener); + AsyncJsPublisher publisher = builder.start(); + + // -------------------------------------------------------------------------------- + // Example logic + // -------------------------------------------------------------------------------- // -------------------------------------------------------------------------------- - // If you want to use retrying for publishing, you must give a Retry Config + // Add the entire count of messages to the publisher, it will put them in a queue + // Usually publishing will happen more organically, this is just for the example. // -------------------------------------------------------------------------------- - if (USE_RETRIER) { - builder.retryConfig(PublishRetryConfig.DEFAULT_CONFIG); + for (int x = 1; x <= PUBLISH_COUNT; x++) { + publisher.publishAsync(SUBJECT, ("data-" + x).getBytes()); } - // The publisher is AutoCloseable - try (AsyncJsPublisher publisher = builder.start()) { + // -------------------------------------------------------------------------------- + // Once the listener detects that all the messages have been actually published, + // move to the next phase of the example. + // -------------------------------------------------------------------------------- + while (publishListener.publishedCount.get() < PUBLISH_COUNT) { + ExampleUtils.printStatus(publisher, publishListener, false); + //noinspection BusyWait + Thread.sleep(500); + } - for (int x = 1; x <= COUNT; x++) { - publisher.publishAsync(SUBJECT, ("data-" + x).getBytes()); - } + // -------------------------------------------------------------------------------- + // Call stop with true for drain this tells the publisher that we are done with it + // but to drain (finish publishing) and finish handling in-flight messages. + // -------------------------------------------------------------------------------- + // There is also a no-parameter stop() that does drain. + // -------------------------------------------------------------------------------- + // If you call stop without drain == true, the publisher + // will finish any publish or in-flight check the current loop is in the middle of + // but then stop after that with unfinished work. + // -------------------------------------------------------------------------------- + publisher.stop(true); - while (publisher.preFlightSize() > 0 || publisher.currentInFlight() > 0) { - ExampleUtils.printStateThenWait(publisher, publishListener); - } + // -------------------------------------------------------------------------------- + // When we call stop, there might be up to one more poll (POLL_TIME) of the + // pre-flight (unpublished) queue, but after that, the publish thread/loop will + // finish and the PublishRunnerDoneFuture will complete. + // -------------------------------------------------------------------------------- + CompletableFuture future = publisher.getPublishRunnerDoneFuture(); + future.get(POLL_TIME + 10, TimeUnit.MILLISECONDS); - ExampleUtils.printState(publisher, publishListener); + // -------------------------------------------------------------------------------- + // When you try to publish to a stopped publisher, you get an exception + // -------------------------------------------------------------------------------- + try { + System.out.print("Attempting to publish after stop..."); + publisher.publishAsync(SUBJECT, "should fail".getBytes()); + System.out.println("SHOULD HAVE EXCEPTIONED!"); + } + catch (IllegalStateException e) { + System.out.println(" \"" + e + "\""); } + + // -------------------------------------------------------------------------------- + // This future lets us know when all the in-flight messages are acked. + // -------------------------------------------------------------------------------- + System.out.println("Waiting for the flight runner to complete processing publish acks..."); + publisher.getFlightsRunnerDoneFuture().get(11, TimeUnit.MINUTES);// should be done much sooner + ExampleUtils.printStatus(publisher, publishListener, true); + + // -------------------------------------------------------------------------------- + // This publisher is AutoCloseable, but we didn't start it in a try-resources, + // so just close it manually. + // -------------------------------------------------------------------------------- + publisher.close(); } catch (Exception e) { //noinspection CallToPrintStackTrace diff --git a/js-publish-extensions/src/examples/java/io/synadia/examples/ExamplePublishListener.java b/js-publish-extensions/src/examples/java/io/synadia/examples/ExamplePublishListener.java index c227fb2..4a9395c 100644 --- a/js-publish-extensions/src/examples/java/io/synadia/examples/ExamplePublishListener.java +++ b/js-publish-extensions/src/examples/java/io/synadia/examples/ExamplePublishListener.java @@ -7,64 +7,68 @@ import io.synadia.jnats.extension.InFlight; import io.synadia.jnats.extension.PostFlight; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; public class ExamplePublishListener implements AsyncJsPublishListener { - public AtomicLong published = new AtomicLong(); - public AtomicLong acked = new AtomicLong(); - public AtomicLong exceptioned = new AtomicLong(); - public AtomicLong timedOut = new AtomicLong(); - public AtomicLong start = new AtomicLong(); - public AtomicLong paused = new AtomicLong(); - public AtomicLong resumed = new AtomicLong(); + public AtomicLong startTime = new AtomicLong(); + public AtomicLong publishedCount = new AtomicLong(); + public AtomicLong ackedCount = new AtomicLong(); + public AtomicLong exceptionedCount = new AtomicLong(); + public AtomicLong timedOutCount = new AtomicLong(); + public AtomicLong pausedCount = new AtomicLong(); + public AtomicLong resumedCount = new AtomicLong(); + public AtomicBoolean paused = new AtomicBoolean(false); @Override public void published(InFlight flight) { - start.compareAndSet(0, System.currentTimeMillis()); - published.incrementAndGet(); + publishedCount.incrementAndGet(); + startTime.compareAndSet(0, System.currentTimeMillis()); // only sets the first publish. } public long elapsed() { - return System.currentTimeMillis() - start.get(); + return System.currentTimeMillis() - startTime.get(); } @Override public void acked(PostFlight postFlight) { - acked.incrementAndGet(); + ackedCount.incrementAndGet(); } @Override public void completedExceptionally(PostFlight postFlight) { - exceptioned.incrementAndGet(); + exceptionedCount.incrementAndGet(); if (postFlight.expectationFailed) { ExampleUtils.print("Expectation Failed", new String(postFlight.getBody()), postFlight.cause); } else { ExampleUtils.print("Completed Exceptionally", new String(postFlight.getBody()), postFlight.cause); } + + // TODO THIS IS DEBUG TRYING TO FIGURE OUT HOW TO HANDLE 429 + //noinspection DataFlowIssue + if (postFlight.cause.getMessage().contains("429")) { + ExampleUtils.print("429", postFlight.getSubject(), postFlight.getMessageId(), new String(postFlight.getBody())); + ExampleUtils.print("429", publishedCount.get(), ackedCount.get(), exceptionedCount.get(), timedOutCount.get()); + System.exit(-1); + } } @Override public void timeout(PostFlight postFlight) { - timedOut.incrementAndGet(); + timedOutCount.incrementAndGet(); ExampleUtils.print("Timed-out", new String(postFlight.getBody())); } @Override public void paused(int currentInFlight, int maxInFlight, int resumeAmount) { - paused.incrementAndGet(); - ExampleUtils.print("Publishing paused." + - " Current In Flight: " + currentInFlight + - " Max In Flight: " + maxInFlight + - " Resume Amount: " + resumeAmount); + pausedCount.incrementAndGet(); + paused.set(true); } @Override public void resumed(int currentInFlight, int maxInFlight, int resumeAmount) { - resumed.incrementAndGet(); - ExampleUtils.print("Publishing resumed." + - " Current In Flight: " + currentInFlight + - " Max In Flight: " + maxInFlight + - " Resume Amount: " + resumeAmount); + resumedCount.incrementAndGet(); + paused.set(false); } } diff --git a/js-publish-extensions/src/examples/java/io/synadia/examples/ExampleUtils.java b/js-publish-extensions/src/examples/java/io/synadia/examples/ExampleUtils.java index abb9679..5dd63c2 100644 --- a/js-publish-extensions/src/examples/java/io/synadia/examples/ExampleUtils.java +++ b/js-publish-extensions/src/examples/java/io/synadia/examples/ExampleUtils.java @@ -32,27 +32,28 @@ public static void setupStream(Connection nc, String stream, String subject) { } } - public static void print(Object... objects) { + public static void print(String label, Object... objects) { StringJoiner joiner = new StringJoiner(" | "); for (Object o: objects) { joiner.add(o.toString()); } - System.out.println(joiner); + System.out.println(label + ": " + joiner); } - public static void printStateThenWait(AsyncJsPublisher publisher, ExamplePublishListener publishListener) throws InterruptedException { - printState(publisher, publishListener); - Thread.sleep(100); + public static void printStatus(AsyncJsPublisher publisher, ExamplePublishListener listener, boolean stopped) { + print("Status", + pad(stopped ? "Stopped" : (listener.paused.get() ? "Paused" : "Active"), 7), + "pre-flight: " + pad(publisher.preFlightSize(), 8), + "in-flight: " + pad(publisher.currentInFlight(), 8), + "published/acked: " + pad(listener.publishedCount + "/" + listener.ackedCount, 17), + "paused/resumed: " + pad(listener.pausedCount + "/" + listener.resumedCount, 7), + "exceptioned/timed-out: " + pad(listener.exceptionedCount + "/" + listener.timedOutCount, 7), + "elapsed: " + listener.elapsed() + "ms" + ); } - public static void printState(AsyncJsPublisher publisher, ExamplePublishListener publishListener) { - print( - "elapsed=" + publishListener.elapsed(), - "pre-flight=" + publisher.preFlightSize(), - "in-flight=" + publisher.currentInFlight(), - "published=" + publishListener.published, - "acked=" + publishListener.acked, - "exceptioned=" + publishListener.exceptioned, - "timed out=" + publishListener.timedOut); + private static final String PADDING = " "; + private static String pad(Object s, int len) { + return (s + PADDING).substring(0, len); } } diff --git a/js-publish-extensions/src/examples/java/io/synadia/examples/PublishRetrierAsyncExample.java b/js-publish-extensions/src/examples/java/io/synadia/examples/PublishRetrierAsyncExample.java index 9999fd7..eeda93f 100644 --- a/js-publish-extensions/src/examples/java/io/synadia/examples/PublishRetrierAsyncExample.java +++ b/js-publish-extensions/src/examples/java/io/synadia/examples/PublishRetrierAsyncExample.java @@ -37,6 +37,7 @@ public static void main(String[] args) { PublishRetryConfig config = PublishRetryConfig.builder().attempts(10).build(); int num = 0; + //noinspection InfiniteLoopStatement while (true) { long now = System.currentTimeMillis(); System.out.println("Publishing @ " + (++num)); diff --git a/js-publish-extensions/src/examples/java/io/synadia/examples/TransactionalPublishExample.java b/js-publish-extensions/src/examples/java/io/synadia/examples/TransactionalPublishExample.java index bf593a1..023f92a 100644 --- a/js-publish-extensions/src/examples/java/io/synadia/examples/TransactionalPublishExample.java +++ b/js-publish-extensions/src/examples/java/io/synadia/examples/TransactionalPublishExample.java @@ -16,7 +16,7 @@ public class TransactionalPublishExample { public static void main(String[] args) { Options options = Options.builder() .server(Options.DEFAULT_URL) - .connectionListener((connection, events) -> ExampleUtils.print("Connection Event:" + events.getEvent())) + .connectionListener((connection, events) -> ExampleUtils.print("Connection Event", events.getEvent())) .errorListener(new ErrorListenerConsoleImpl()) .build(); diff --git a/js-publish-extensions/src/main/java/io/synadia/jnats/extension/AsyncJsPublishListener.java b/js-publish-extensions/src/main/java/io/synadia/jnats/extension/AsyncJsPublishListener.java index 3ad502f..63c95f5 100644 --- a/js-publish-extensions/src/main/java/io/synadia/jnats/extension/AsyncJsPublishListener.java +++ b/js-publish-extensions/src/main/java/io/synadia/jnats/extension/AsyncJsPublishListener.java @@ -27,8 +27,8 @@ public interface AsyncJsPublishListener { void completedExceptionally(PostFlight flight); /** - * The message has internally timed out waiting for the ack. Usually a sign of - * lost connection. + * The message has internally timed out waiting for the ack. + * Likely a sign of lost connection. * @param flight the flight representing the message */ void timeout(PostFlight flight); diff --git a/js-publish-extensions/src/main/java/io/synadia/jnats/extension/AsyncJsPublisher.java b/js-publish-extensions/src/main/java/io/synadia/jnats/extension/AsyncJsPublisher.java index 9bf1c8e..ea06328 100644 --- a/js-publish-extensions/src/main/java/io/synadia/jnats/extension/AsyncJsPublisher.java +++ b/js-publish-extensions/src/main/java/io/synadia/jnats/extension/AsyncJsPublisher.java @@ -22,10 +22,10 @@ public class AsyncJsPublisher implements AutoCloseable { public static final int DEFAULT_MAX_IN_FLIGHT = 50; public static final int DEFAULT_RESUME_AMOUNT = 0; public static final long DEFAULT_POLL_TIME = 100; - public static final long DEFAULT_HOLD_PAUSE_TIME = 100; + public static final long DEFAULT_PUBLISH_PAUSE_TIME = 100; public static final long DEFAULT_WAIT_TIMEOUT = DEFAULT_MAX_IN_FLIGHT * DEFAULT_POLL_TIME; - private static final PreFlight STOP_MARKER = new PreFlight("STOP", null, null, null, null); + private static final PreFlight DRAIN_MARKER = new PreFlight("DRAIN", null, null, null, null); private final AtomicLong messageIdGenerator; private final JetStream js; @@ -36,11 +36,11 @@ public class AsyncJsPublisher implements AutoCloseable { private final PublishRetryConfig retryConfig; private final AsyncJsPublishListener publishListener; private final long pollTime; - private final long holdPauseTime; + private final long publishPauseTime; private final long waitTimeout; private final LinkedBlockingQueue preFlight; private final LinkedBlockingQueue inFlights; - private final AtomicBoolean notPaused; + private final AtomicBoolean publishingNotPaused; private final AtomicBoolean draining; private final AtomicBoolean keepGoingPublishRunner; private final AtomicBoolean keepGoingFlightsRunner; @@ -48,8 +48,8 @@ public class AsyncJsPublisher implements AutoCloseable { private final boolean executorWasntUserSupplied; private final AtomicReference publishRunnerThread; private final AtomicReference flightsRunnerThread; - private final CountDownLatch publishRunnerDoneLatch; - private final CountDownLatch flightsRunnerDoneLatch; + private final CompletableFuture publishRunnerDoneFuture; + private final CompletableFuture flightsRunnerDoneFuture; private AsyncJsPublisher(Builder b) { js = b.js; @@ -68,7 +68,7 @@ private AsyncJsPublisher(Builder b) { retryConfig = b.retryConfig; publishListener = b.publishListener; pollTime = b.pollTime; - holdPauseTime = b.holdPauseTime; + publishPauseTime = b.publishPauseTime; waitTimeout = b.waitTimeout; if (b.notificationExecutorService == null) { @@ -82,15 +82,15 @@ private AsyncJsPublisher(Builder b) { preFlight = new LinkedBlockingQueue<>(); inFlights = new LinkedBlockingQueue<>(); - notPaused = new AtomicBoolean(true); + publishingNotPaused = new AtomicBoolean(true); draining = new AtomicBoolean(false); keepGoingPublishRunner = new AtomicBoolean(true); keepGoingFlightsRunner = new AtomicBoolean(true); publishRunnerThread = new AtomicReference<>(); flightsRunnerThread = new AtomicReference<>(); - publishRunnerDoneLatch = new CountDownLatch(1); - flightsRunnerDoneLatch = new CountDownLatch(1); + publishRunnerDoneFuture = new CompletableFuture<>(); + flightsRunnerDoneFuture = new CompletableFuture<>(); } /** @@ -107,22 +107,23 @@ public void start() { } /** - * stop the publisher + * stop the publisher with drain */ public void stop() { - keepGoingPublishRunner.set(false); - keepGoingFlightsRunner.set(false); + stop(true); } /** - * Drain the publish. - *

The manager stops accepting new publishes

- *

The manager tries to publish all already asked to be published

- * You can still call stop, which will just finish it's current work. + * stop the publisher, optionally drain + * @param drain whether to drain or not */ - public void drain() { - preFlight.offer(STOP_MARKER); - draining.set(true); + public void stop(boolean drain) { + if (drain) { + preFlight.offer(DRAIN_MARKER); + draining.set(true); + } + keepGoingPublishRunner.set(false); + keepGoingFlightsRunner.set(false); } /** @@ -130,23 +131,38 @@ public void drain() { */ @Override public void close() throws Exception { - stop(); + keepGoingPublishRunner.set(false); + keepGoingFlightsRunner.set(false); if (executorWasntUserSupplied) { notificationExecutorService.shutdown(); } - if (!publishRunnerDoneLatch.await(pollTime, TimeUnit.MILLISECONDS)) { - Thread t = publishRunnerThread.get(); - if (t != null && t.isAlive()) { - t.interrupt(); + Thread t = publishRunnerThread.get(); + if (t != null) { // thread is null if the user provided their own + try { + // give the publish runner a little time to finish + publishRunnerDoneFuture.get(pollTime + 10, TimeUnit.MILLISECONDS); + } + catch (TimeoutException e) { + // it didn't finish, it may still be alive, interrupt it + if (t.isAlive()) { + t.interrupt(); + } } } - if (!flightsRunnerDoneLatch.await(pollTime, TimeUnit.MILLISECONDS)) { - Thread t = flightsRunnerThread.get(); - if (t != null && t.isAlive()) { - t.interrupt(); + t = flightsRunnerThread.get(); + if (t != null) { // thread is null if the user provided their own + try { + // give the flights runner a little time to finish + flightsRunnerDoneFuture.get(pollTime + 10, TimeUnit.MILLISECONDS); + } + catch (TimeoutException e) { + // it didn't finish, it may still be alive, interrupt it + if (t.isAlive()) { + t.interrupt(); + } } } } @@ -168,19 +184,19 @@ public int preFlightSize() { } /** - * A latch that finishes when then publish runner event loop is complete + * A future that completes when then publish runner event loop is complete * @return the latch */ - public CountDownLatch getPublishRunnerDoneLatch() { - return publishRunnerDoneLatch; + public CompletableFuture getPublishRunnerDoneFuture() { + return publishRunnerDoneFuture; } /** - * A latch that finishes when then flights runner event loop is complete + * A future that completes when then flights runner event loop is complete * @return the latch */ - public CountDownLatch getFlightsRunnerDoneLatch() { - return flightsRunnerDoneLatch; + public CompletableFuture getFlightsRunnerDoneFuture() { + return flightsRunnerDoneFuture; } /** @@ -208,11 +224,12 @@ public long getPollTime() { } /** - * The configured hold pause time + * The configured amount of time to pause between checks + * to continue if publishing is in the paused state * @return the time in millis */ - public long getHoldPauseTime() { - return holdPauseTime; + public long getPublishPauseTime() { + return publishPauseTime; } /** @@ -233,11 +250,11 @@ public boolean getProcessAcksInOrder() { */ public void publishRunner() { try { - while (keepGoingPublishRunner.get()) { - if (notPaused.get()) { + while (keepGoingPublishRunner.get() || draining.get()) { + if (publishingNotPaused.get()) { PreFlight pre = preFlight.poll(pollTime, TimeUnit.MILLISECONDS); if (pre != null) { - if (pre == STOP_MARKER) { + if (pre == DRAIN_MARKER) { return; } @@ -261,14 +278,14 @@ public void publishRunner() { // this is reset by the flights runner when the condition is met int currentInFlight = inFlights.size(); if (currentInFlight >= maxInFlight) { - notPaused.set(false); + publishingNotPaused.set(false); notifyPaused(currentInFlight); } } } else { //noinspection BusyWait - Thread.sleep(holdPauseTime); + Thread.sleep(publishPauseTime); } } } @@ -277,7 +294,7 @@ public void publishRunner() { } finally { keepGoingPublishRunner.set(false); - publishRunnerDoneLatch.countDown(); + publishRunnerDoneFuture.complete(null); } } @@ -286,7 +303,7 @@ public void publishRunner() { */ public void flightsRunner() { try { - while (keepGoingFlightsRunner.get()) { + while (keepGoingFlightsRunner.get() || draining.get()) { InFlight head = inFlights.poll(pollTime, TimeUnit.MILLISECONDS); if (head == null) { // no inFlight? draining? no more queued? no more in inFlight? we are done! @@ -314,12 +331,15 @@ public void flightsRunner() { keepGoingFlightsRunner.set(false); } - // once the inFlight empty out/cross the refill threshold, - // we are allowed to publish again - int currentInFlight = inFlights.size(); - if (currentInFlight <= resumeAmount) { - notPaused.set(true); - notifyResumed(currentInFlight); + // if paused (not publishing), check if we can resume + if (!publishingNotPaused.get()) { + // once the inFlight size is LE the resume amount, + // we are allowed to resume (publish again) + int currentInFlight = inFlights.size(); + if (currentInFlight <= resumeAmount) { + publishingNotPaused.set(true); + notifyResumed(currentInFlight); + } } } } @@ -329,7 +349,7 @@ public void flightsRunner() { } finally { keepGoingFlightsRunner.set(false); - flightsRunnerDoneLatch.countDown(); + flightsRunnerDoneFuture.complete(null); } } @@ -416,7 +436,7 @@ public static class Builder { PublishRetryConfig retryConfig; AsyncJsPublishListener publishListener; long pollTime = DEFAULT_POLL_TIME; - long holdPauseTime = DEFAULT_HOLD_PAUSE_TIME; + long publishPauseTime = DEFAULT_PUBLISH_PAUSE_TIME; long waitTimeout = DEFAULT_WAIT_TIMEOUT; boolean processAcksInOrder = true; ExecutorService notificationExecutorService; @@ -506,12 +526,12 @@ public Builder pollTime(long pollTime) { * The amount of time to pause if the publish loop is in the holding pattern * The holding pattern happens once the in flight queue is filled to the max in flight * and the completed acks have not cleared the refill at amount. - * Defaults to {@value #DEFAULT_HOLD_PAUSE_TIME} - * @param holdPauseTime the time in milliseconds + * Defaults to {@value #DEFAULT_PUBLISH_PAUSE_TIME} + * @param publishPauseTime the time in milliseconds * @return the builder */ - public Builder holdPauseTime(long holdPauseTime) { - this.holdPauseTime = holdPauseTime; + public Builder publishPauseTime(long publishPauseTime) { + this.publishPauseTime = publishPauseTime; return this; } @@ -577,12 +597,14 @@ public AsyncJsPublisher start() { * @return The future */ public PreFlight publishAsync(String subject, Headers headers, byte[] body, PublishOptions po) { - if (draining.get() || (!keepGoingPublishRunner.get() && !keepGoingFlightsRunner.get())) { - throw new IllegalStateException("Cannot publish once drained or stopped."); + if (!keepGoingPublishRunner.get()) { + throw new IllegalStateException("Cannot publish once stopped."); } - String messageId = po != null && po.getMessageId() != null - ? po.getMessageId() : messageIdSupplier.get(); + String messageId = po == null || po.getMessageId() == null + ? messageIdSupplier.get() + : po.getMessageId() ; + PreFlight p = new PreFlight(messageId, subject, headers, body, po); preFlight.offer(p); return p;