Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 16 additions & 10 deletions js-publish-extensions/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@

Extensions specific to JetStream publishing.

**Current Release**: 0.3.0
  **Current Snapshot**: 0.4.0-SNAPSHOT
**Current Release**: 0.4.0
  **Current Snapshot**: 0.4.1-SNAPSHOT
  **Gradle and Maven** `io.synadia:jnats-js-publish-extensions`
[Dependencies Help](https://github.com/synadia-io/orbit.java?tab=readme-ov-file#dependencies)

Expand All @@ -17,32 +17,38 @@ Extensions specific to JetStream publishing.
### PublishRetrier

This class parallels the standard JetStream publish api with methods that will retry the publish.

The examples:
* [Publish Retrier Sync Example](src/examples/java/io/synadia/examples/PublishRetrierSyncExample.java)
* [Publish Retrier Async Example](src/examples/java/io/synadia/examples/PublishRetrierAsyncExample.java)
* The [Publish Retrier Sync Example](src/examples/java/io/synadia/examples/PublishRetrierSyncExample.java)
demonstrates publishing synchronously with the retrier.

* The [Publish Retrier Async Example](src/examples/java/io/synadia/examples/PublishRetrierAsyncExample.java)
demonstrates publishing asynchronously with the retrier.

### AsyncJsPublisher

This class is a full async message publish manager.
This utility provides a workflow of
This class is a full async message publish manager that provides:
1. Publishing a message async
* The number of inflight messages (published but not received acks) can be set.
2. Queueing and tracking of the in-flight PublishAck future
2. Queueing and tracking of the inflight PublishAck future
3. The ability to observe the queue and respond to events
* The message was published
* The message received a valid ack
* The publish completed with an exception
* The publish timed out.
* Publishing was paused or resumed due to threshold settings

It can be combined with the retrier.
You must consider that when publishing async in this manner
it's possible for messages to be published out of order.
In that case you can use publish expectations.
If order of messages is a requirement, you

* [Async Js Publisher Example](src/examples/java/io/synadia/examples/AsyncJsPublisherExample.java)
* [Async Js Publisher More Customized Example](src/examples/java/io/synadia/examples/AsyncJsPublisherCustomizedExample.java)
* The [Async Js Publisher Example](src/examples/java/io/synadia/examples/AsyncJsPublisherExample.java)
demonstrates basic use of the class.

* The [Async Js Publisher Custom Threads Example](src/examples/java/io/synadia/examples/AsyncJsPublisherCustomThreadsExample.java)
has the identical workflow, but demonstrates the ability to provide the executors and threads manually instead of relying
on the built-in ones.

---
Copyright (c) 2024-2025 Synadia Communications Inc. All Rights Reserved.
Expand Down
2 changes: 1 addition & 1 deletion js-publish-extensions/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ plugins {
id 'signing'
}

def jarVersion = "0.4.0"
def jarVersion = "0.4.1"
group = 'io.synadia'

def isMerge = System.getenv("BUILD_EVENT") == "push"
Expand Down
45 changes: 28 additions & 17 deletions js-publish-extensions/docs/DraftDesign.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,28 +7,39 @@ This document is the draft design describing a managed async publish utility.
* JetStream context on which to publish

#### Optional Properties
| Property | Description |
|---------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| String idPrefix | used to make unique identifiers around each message. Defaults to a NUID |
| int maxInFlight | no more than this number of messages can be waiting for publish ack. Defaults to 50. |
| int refillAllowedAt | if the queue size reaches maxInFlight, a hold is placed so no more messages can be published until the in flight queue contains this amount or less messages, at which time the hold is removed. Defaults to 0 which would be full sawtooth. Non zero provides for a window. |
| RetryConfig retryConfig | if the user wants to publish with retries, they must supply a config, otherwise the publish will be attempted only once. |
| long pollTime | the amount of time in ms to poll any given queue. Ensures polling doesn't block indefinitely. Defaults to 100ms |
| long holdPauseTime | the amount of time in ms to pause between checks when hold is on. Defaults to 100ms |
| long waitTimeout | the timeout when waiting for a publish to be acknowledged. Defaults to 5000ms |
| PublisherListener publisherListener | a callback for the user to see what's going on in the workflow, see description of Flight later |
| Property | Description |
|--------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| String idPrefix | used to make unique identifiers around each message. Defaults to a NUID |
| int maxInFlight | no more than this number of messages can be waiting for publish ack. Defaults to 50. |
| int refillAllowedAt | if the queue size reaches maxInFlight, a hold is placed so no more messages can be published until the in flight queue contains this amount or less messages, at which time the hold is removed. Defaults to 0 which would be full sawtooth. Non zero provides for a window. |
| RetryConfig retryConfig | if the user wants to publish with retries, they must supply a config, otherwise the publish will be attempted only once. |
| long pollTime | the amount of time in ms to poll any given queue. Ensures polling doesn't block indefinitely. Defaults to 100ms |
| long publishPauseTime | the amount of time in ms to pause between checks when hold is on. Defaults to 100ms |
| long waitTimeout | the timeout when waiting for a publish to be acknowledged. Defaults to 5000ms |
| PublisherListener publisherListener | a callback for the user to see what's going on in the workflow, see description of Flight later |


## PublisherListener Interface

The callback interface for the user to get information about the publish workflow

| Method | Description |
|---------------------------------------------|-----------------------------------------------------------------------|
| void published(Flight flight) | the flight is ready when the message is published |
| void acked(Flight flight); | the publish ack was received |
| void completedExceptionally(Flight flight); | the publish exceptioned, such as a 503 or lower level request timeout |
| void timeout(Flight flight) | the ack was not returned in time based on waitTimeout |
| Method | Description |
|-----------------------------------------------------------------------|-----------------------------------------------------------------------|
| void published(Flight flight) | the flight is ready when the message is published |
| void acked(Flight flight) | the publish ack was received |
| void completedExceptionally(Flight flight) | the publish exceptioned, such as a 503 or lower level request timeout |
| void timeout(Flight flight) | the ack was not returned in time based on waitTimeout |
| void paused(int currentInFlight, int maxInFlight, int resumeAmount) | Publishing was paused due to in-flight conditions |
| void resumed(int currentInFlight, int maxInFlight, int resumeAmount) | Publishing was resumed due to in-flight conditions |

/**
* The engine has just resumed publishing and will continue unless
* the number of messages in flight reaches the max
* @param currentInFlight the number of messages in flight
* @param maxInFlight the number of in flight messages when publishing will be paused
* @param resumeAmount the number of in flight messages when publishing will resume after being paused
*/
void resumed(int currentInFlight, int maxInFlight, int resumeAmount);

## Flight structure

Expand Down Expand Up @@ -68,7 +79,7 @@ while keepGoing flag
if in flight queue has reached maxInFlight put hold on
notify listener to indicate published
else in holding pattern
sleep holdPauseTime
sleep publishPauseTime
```

## Flights Runner Pseudo Code:
Expand Down
Loading