Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion direct-batch/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ repositories {
}

dependencies {
implementation 'io.nats:jnats:2.21.0-SNAPSHOT'
implementation 'io.nats:jnats:2.21.1'
implementation 'io.synadia:direct-batch:0.1.0'

testImplementation 'io.nats:jnats-server-runner:1.2.8'
testImplementation 'org.junit.jupiter:junit-jupiter:5.9.0'
Expand Down
25 changes: 20 additions & 5 deletions js-publish-extensions/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@

Extensions specific to JetStream publishing.

**Current Release**: 0.1.0
  **Current Snapshot**: 0.1.1-SNAPSHOT
**Current Release**: 0.3.0
  **Current Snapshot**: 0.4.0-SNAPSHOT
  **Gradle and Maven** `io.synadia:jnats-js-publish-extensions`
[Dependencies Help](https://github.com/synadia-io/orbit.java?tab=readme-ov-file#dependencies)

Expand All @@ -18,16 +18,31 @@ Extensions specific to JetStream publishing.

This class parallels the standard JetStream publish api with methods that will retry the publish.

For how to use, please see the examples:
The examples:
* [Publish Retrier Sync Example](src/examples/java/io/synadia/examples/PublishRetrierSyncExample.java)
* [Publish Retrier Async Example](src/examples/java/io/synadia/examples/PublishRetrierAsyncExample.java)

### AsyncJsPublisher

This class is a full async message publish manager
This class is a full async message publish manager.
This utility provides a workflow of
1. Publishing a message async
* The number of inflight messages (published but not received acks) can be set.
2. Queueing and tracking of the in-flight PublishAck future
3. The ability to observe the queue and respond to events
* The message was published
* The message received a valid ack
* The publish completed with an exception
* The publish timed out.

It can be combined with the retrier.
You must consider that when publishing async in this manner
it's possible for messages to be published out of order.
In that case you can use publish expectations.
If order of messages is a requirement, you

For how to use, please see the examples:
* [Async Js Publisher Example](src/examples/java/io/synadia/examples/AsyncJsPublisherExample.java)
* [Async Js Publisher More Customized Example](src/examples/java/io/synadia/examples/AsyncJsPublisherCustomizedExample.java)

---
Copyright (c) 2024-2025 Synadia Communications Inc. All Rights Reserved.
Expand Down
4 changes: 2 additions & 2 deletions js-publish-extensions/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ plugins {
id 'signing'
}

def jarVersion = "0.3.1"
def jarVersion = "0.4.0"
group = 'io.synadia'

def isMerge = System.getenv("BUILD_EVENT") == "push"
Expand Down Expand Up @@ -41,7 +41,7 @@ repositories {
}

dependencies {
implementation 'io.nats:jnats:2.20.5'
implementation 'io.nats:jnats:2.21.1'
implementation 'io.synadia:retrier:0.2.0'

testImplementation 'io.nats:jnats-server-runner:1.2.8'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public static void main(String[] args) {
publisher.publishAsync(SUBJECT, ("data-" + x).getBytes());
}

while (publisher.preFlightSize() > 0 || publisher.inFlightSize() > 0) {
while (publisher.preFlightSize() > 0 || publisher.currentInFlight() > 0) {
ExampleUtils.printStateThenWait(publisher, publishListener);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public static void main(String[] args) {
AsyncJsPublisher.Builder builder =
AsyncJsPublisher.builder(nc.jetStream())
.maxInFlight(250)
.refillAllowedAt(100)
.resumeAmount(100)
.pollTime(50)
.holdPauseTime(150)
.waitTimeout(3000)
Expand All @@ -55,7 +55,7 @@ public static void main(String[] args) {
publisher.publishAsync(SUBJECT, ("data-" + x).getBytes());
}

while (publisher.preFlightSize() > 0 || publisher.inFlightSize() > 0) {
while (publisher.preFlightSize() > 0 || publisher.currentInFlight() > 0) {
ExampleUtils.printStateThenWait(publisher, publishListener);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import io.nats.client.Options;
import io.nats.client.impl.ErrorListenerConsoleImpl;
import io.synadia.jnats.extension.AsyncJsPublisher;
import io.synadia.retrier.RetryConfig;
import io.synadia.jnats.extension.PublishRetryConfig;

public class AsyncJsPublisherExample {

Expand Down Expand Up @@ -42,7 +42,7 @@ public static void main(String[] args) {
// If you want to use retrying for publishing, you must give a Retry Config
// --------------------------------------------------------------------------------
if (USE_RETRIER) {
builder.retryConfig(RetryConfig.DEFAULT_CONFIG);
builder.retryConfig(PublishRetryConfig.DEFAULT_CONFIG);
}

// The publisher is AutoCloseable
Expand All @@ -52,7 +52,7 @@ public static void main(String[] args) {
publisher.publishAsync(SUBJECT, ("data-" + x).getBytes());
}

while (publisher.preFlightSize() > 0 || publisher.inFlightSize() > 0) {
while (publisher.preFlightSize() > 0 || publisher.currentInFlight() > 0) {
ExampleUtils.printStateThenWait(publisher, publishListener);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ public class ExamplePublishListener implements AsyncJsPublishListener {
public AtomicLong exceptioned = new AtomicLong();
public AtomicLong timedOut = new AtomicLong();
public AtomicLong start = new AtomicLong();
public AtomicLong paused = new AtomicLong();
public AtomicLong resumed = new AtomicLong();

@Override
public void published(InFlight flight) {
Expand Down Expand Up @@ -45,6 +47,24 @@ public void completedExceptionally(PostFlight postFlight) {
@Override
public void timeout(PostFlight postFlight) {
timedOut.incrementAndGet();
ExampleUtils.print("Timed-out", new String(postFlight.getBody()) );
ExampleUtils.print("Timed-out", new String(postFlight.getBody()));
}

@Override
public void paused(int currentInFlight, int maxInFlight, int resumeAmount) {
paused.incrementAndGet();
ExampleUtils.print("Publishing paused." +
" Current In Flight: " + currentInFlight +
" Max In Flight: " + maxInFlight +
" Resume Amount: " + resumeAmount);
}

@Override
public void resumed(int currentInFlight, int maxInFlight, int resumeAmount) {
resumed.incrementAndGet();
ExampleUtils.print("Publishing resumed." +
" Current In Flight: " + currentInFlight +
" Max In Flight: " + maxInFlight +
" Resume Amount: " + resumeAmount);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public static void printState(AsyncJsPublisher publisher, ExamplePublishListener
print(
"elapsed=" + publishListener.elapsed(),
"pre-flight=" + publisher.preFlightSize(),
"in-flight=" + publisher.inFlightSize(),
"in-flight=" + publisher.currentInFlight(),
"published=" + publishListener.published,
"acked=" + publishListener.acked,
"exceptioned=" + publishListener.exceptioned,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,13 @@
package io.synadia.examples;

import io.nats.client.Connection;
import io.nats.client.JetStreamApiException;
import io.nats.client.Nats;
import io.nats.client.api.PublishAck;
import io.nats.client.api.StorageType;
import io.nats.client.api.StreamConfiguration;
import io.synadia.jnats.extension.PublishRetrier;
import io.synadia.retrier.RetryConfig;
import io.synadia.jnats.extension.PublishRetryConfig;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

Expand All @@ -23,44 +21,33 @@ public class PublishRetrierAsyncExample {

public static void main(String[] args) {
try (Connection nc = Nats.connect()) {
try {
nc.jetStreamManagement().deleteStream(STREAM);
// create the stream, delete any existing one first for example purposes.
try { nc.jetStreamManagement().deleteStream(STREAM); } catch (Exception ignore) {}
System.out.println("Creating Stream @ " + System.currentTimeMillis());
nc.jetStreamManagement().addStream(StreamConfiguration.builder()
.name(STREAM)
.subjects(SUBJECT)
.storageType(StorageType.File) // so it's persistent for a server restart test
.build());

// default attempts is 2, we change this so you have time to kill the server to test.
// the default backoff is {250, 250, 500, 500, 3000, 5000}
// the default deadline is 1 hour
// the default Retry Conditions are timeouts and no responders (both IOExceptions)
PublishRetryConfig config = PublishRetryConfig.builder().attempts(10).build();

int num = 0;
while (true) {
long now = System.currentTimeMillis();
System.out.println("Publishing @ " + (++num));
CompletableFuture<PublishAck> cfpa = PublishRetrier.publishAsync(config, nc.jetStream(), SUBJECT, ("data" + num).getBytes());
PublishAck pa = cfpa.get(30, TimeUnit.SECONDS);
long elapsed = System.currentTimeMillis() - now;
System.out.println("Publish Ack after " + elapsed + " --> " + pa.getJv().toJson());
}
catch (Exception ignore) {}

// since the default backoff is {250, 250, 500, 500, 3000, 5000}
new Thread(() -> {
try {
Thread.sleep(1100);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
try {
System.out.println("Creating Stream @ " + System.currentTimeMillis());
nc.jetStreamManagement().addStream(StreamConfiguration.builder()
.name(STREAM)
.subjects(SUBJECT)
.storageType(StorageType.Memory)
.build());
}
catch (IOException | JetStreamApiException e) {
throw new RuntimeException(e);
}
}).start();

RetryConfig config = RetryConfig.builder().attempts(10).build();
long now = System.currentTimeMillis();

System.out.println("Publishing @ " + now);
CompletableFuture<PublishAck> cfpa = PublishRetrier.publishAsync(config, nc.jetStream(), SUBJECT, null);
PublishAck pa = cfpa.get(30, TimeUnit.SECONDS);
long done = System.currentTimeMillis();

System.out.println("Publish Ack: " + pa.getJv().toJson());
System.out.println("Done @ " + done + ", Elapsed: " + (done - now));
}
catch (Exception e) {
System.out.println("EXAMPLE EXCEPTION: " + e);
e.printStackTrace();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,61 +4,82 @@
package io.synadia.examples;

import io.nats.client.Connection;
import io.nats.client.JetStreamApiException;
import io.nats.client.ErrorListener;
import io.nats.client.Nats;
import io.nats.client.Options;
import io.nats.client.api.PublishAck;
import io.nats.client.api.StorageType;
import io.nats.client.api.StreamConfiguration;
import io.synadia.jnats.extension.PublishRetrier;
import io.synadia.retrier.RetryConfig;
import io.synadia.jnats.extension.PublishRetryConfig;
import io.synadia.jnats.extension.RetryCondition;

import java.io.IOException;
import java.time.Duration;

/**
* Publish sync retried example
*/
public class PublishRetrierSyncExample {

public static String STREAM = "pr-sync-stream";
public static String SUBJECT = "pr-sync-subject";

public static void main(String[] args) {
try (Connection nc = Nats.connect()) {
try {
nc.jetStreamManagement().deleteStream(STREAM);
}
catch (Exception ignore) {}
Options options = Options.builder()
.connectionListener((x,y) -> {})
.errorListener(new ErrorListener() {})
.build();
try (Connection nc = Nats.connect(options)) {
// create the stream, delete any existing one first for example purposes.
try { nc.jetStreamManagement().deleteStream(STREAM); } catch (Exception ignore) {}
System.out.println("Creating Stream @ " + System.currentTimeMillis());
nc.jetStreamManagement().addStream(StreamConfiguration.builder()
.name(STREAM)
.subjects(SUBJECT)
.storageType(StorageType.Memory)
.build());

// since the default backoff is {250, 250, 500, 500, 3000, 5000}
new Thread(() -> {
try {
Thread.sleep(1100);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
// --------------------------------------------------------------------------------
// PublishRetryConfig...
// default attempts is 2
// default backoff is {250, 250, 500, 500, 3000, 5000}
// default deadline is unlimited
// default Retry Conditions are timeouts and no responders (both IOExceptions)
// and shown here for example
// --------------------------------------------------------------------------------
// This config will retry 3 times with a wait of 500 millis between retries
// but since the deadline is short, the deadline will short circuit that.
// This should be tuned to match your needs.
// --------------------------------------------------------------------------------
PublishRetryConfig config = PublishRetryConfig.builder()
.attempts(3)
.backoffPolicy(new long[]{500})
.deadline(Duration.ofSeconds(2))
.retryConditions(RetryCondition.NoResponders, RetryCondition.IoEx)
.build();

int num = 0;
boolean keepGoing;
do {
long now = System.currentTimeMillis();
System.out.print("Publishing @ " + (++num) + "...");
PublishAck pa = PublishRetrier.publish(config, nc.jetStream(), SUBJECT, null);
long elapsed = System.currentTimeMillis() - now;
keepGoing = false;
if (pa == null) {
System.out.println("No Publish Ack after " + elapsed);
}
try {
System.out.println("Creating Stream @ " + System.currentTimeMillis());
nc.jetStreamManagement().addStream(StreamConfiguration.builder()
.name(STREAM)
.subjects(SUBJECT)
.storageType(StorageType.Memory)
.build());
else if (pa.hasError()) {
System.out.println("Publish Ack after " + elapsed + " but got error: " + pa.getError());
}
catch (IOException | JetStreamApiException e) {
throw new RuntimeException(e);
else {
keepGoing = true;
System.out.println("Publish Ack after " + elapsed + " --> " + pa.getJv().toJson());
}
}).start();

RetryConfig config = RetryConfig.builder().attempts(10).build();
long now = System.currentTimeMillis();

System.out.println("Publishing @ " + now);
PublishAck pa = PublishRetrier.publish(config, nc.jetStream(), SUBJECT, null);
long done = System.currentTimeMillis();

System.out.println("Publish Ack: " + pa.getJv().toJson());
System.out.println("Done @ " + done + ", Elapsed: " + (done - now));
} while (keepGoing);
}
catch (Exception e) {
e.printStackTrace();
System.out.println("Probably can't connect... " + e);
}
}
}
Loading