Skip to content

Commit

Permalink
Release 0.5.0 (#71)
Browse files Browse the repository at this point in the history
Release 0.5.0
  • Loading branch information
Colin Sullivan committed Oct 19, 2017
1 parent 45c23b2 commit 64fe399
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 78 deletions.
152 changes: 83 additions & 69 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,30 +66,39 @@ We use RNG to generate unique inbox names. A peculiarity of the JDK on Linux (se

```java

ConnectionFactory cf = new ConnectionFactory(clusterID, clientID);
Connection sc = cf.createConnection();

// Simple Synchronous Publisher
sc.publish("foo", "Hello World".getBytes()); // does not return until an ack has been received from NATS Streaming server

// use latch to await delivery of message before shutting down
CountDownLatch latch = new CountDownLatch(1);

// Simple Async Subscriber
// Create a connection factory
StreamingConnectionFactory cf = new StreamingConnectionFactory("test-cluster", "bar");

// A StreamingConnection is a logical connection to the NATS streaming
// server. This API creates an underlying core NATS connection for
// convenience and simplicity. In most cases one would create a secure
// core NATS connection and pass it in via
// StreamingConnectionFactory.setNatsConnection(Connection nc)
StreamingConnection sc = cf.createConnection();

// This simple synchronous publish API blocks until an acknowledgement
// is returned from the server. If no exception is thrown, the message
// has been stored in NATS streaming.
sc.publish("foo", "Hello World".getBytes());

// Use a countdown latch to wait for our subscriber to receive the
// message we published above.
final CountDownLatch doneSignal = new CountDownLatch(1);

// Simple Async Subscriber that retrieves all available messages.
Subscription sub = sc.subscribe("foo", new MessageHandler() {
public void onMessage(Message m) {
latch.countDown();
System.out.printf("Received a message: %s\n", new String(m.getData()));
}
public void onMessage(Message m) {
System.out.printf("Received a message: %s\n", new String(m.getData()));
doneSignal.countDown();
}
}, new SubscriptionOptions.Builder().deliverAllAvailable().build());

// pause until message delivered
latch.await();
doneSignal.await();

// Unsubscribe
// Unsubscribe to clean up
sub.unsubscribe();

// Close connection
// Close the logical connection to NATS streaming
sc.close();
```

Expand Down Expand Up @@ -146,29 +155,30 @@ Durable subscriptions allow clients to assign a durable name to a subscription w
Doing this causes the NATS Streaming server to track the last acknowledged message for that clientID + durable name, so that only messages since the last acknowledged message will be delivered to the client.

```java
Connection sc = new ConnectionFactory("test-cluster", "client-123").createConnection();
StreamingConnection sc = new StreamingConnectionFactory("test-cluster", "client-123").createConnection();

// Subscribe with durable name
// Subscribe with a durable name
sc.subscribe("foo", new MessageHandler() {
public void onMessage(Message m) {
System.out.printf("Received a message: %s\n", m.getData());
}
}, new SubscriptionOptions.Builder().setDurableName("my-durable"));
...
// client receives message sequence 1-40
...
// client disconnects for an hour
...
// client reconnects with same clientID "client-123"
sc = new ConnectionFactory("test-cluster", "client-123").createConnection();
}, new SubscriptionOptions.Builder().durableName("my-durable").build());

// The client receives message sequence 1-40, then disconnects.
sc.close();

// Meanwhile more messages are published to subject "foo"

// Here the client reconnects with same clientID "client-123"
sc = new StreamingConnectionFactory("test-cluster", "client-123").createConnection();

// client re-subscribes to "foo" with same durable name "my-durable"
sc.subscribe("foo", new MessageHandler() {
public void onMessage(Message m) {
System.out.printf("Received a message: %s\n", m.getData());
}
}, new SubscriptionOptions.Builder().setDurableName("my-durable"));
...
}, new SubscriptionOptions.Builder().durableName("my-durable").build());

// client receives messages 41-current
```

Expand All @@ -185,19 +195,19 @@ The basic publish API (`Publish(subject, payload)`) is synchronous; it does not
Advanced users may wish to process these publish acknowledgements manually to achieve higher publish throughput by not waiting on individual acknowledgements during the publish operation. An asynchronous publish API is provided for this purpose:

```java
// will be invoked when a publish acknowledgement is received
AckHandler ackHandler = new AckHandler() { {
public void onAck(String ackedNuid, Exception err) {
if (err != null) {
System.err.printf("Error publishing msg id %s: %s\n", ackedNuid, err.getMessage());
} else {
System.out.printf("Received ack for msg id %s\n", ackedNuid);
}
// The ack handler will be invoked when a publish acknowledgement is received
AckHandler ackHandler = new AckHandler() {
public void onAck(String guid, Exception err) {
if (err != null) {
System.err.printf("Error publishing msg id %s: %s\n", guid, err.getMessage());
} else {
System.out.printf("Received ack for msg id %s\n", guid);
}
}

// can also use publish(subj, replysubj, payload, ah)
String nuid = sc.publish("foo", "Hello World".getBytes(), ackHandler) // returns immediately
};

// This returns immediately. The result of the publish can be handled in the ack handler.
String guid = sc.publish("foo", "Hello World".getBytes(), ackHandler);
```

### Message Acknowledgements and Redelivery
Expand All @@ -206,15 +216,21 @@ NATS Streaming offers At-Least-Once delivery semantics, meaning that once a mess
This timeout interval is specified by the subscription option `AckWait`, which defaults to 30 seconds.

By default, messages are automatically acknowledged by the NATS Streaming client library after the subscriber's message handler is invoked. However, there may be cases in which the subscribing client wishes to accelerate or defer acknowledgement of the message.
To do this, the client must set manual acknowledgement mode on the subscription, and invoke `Ack()` on the `Msg`. ex:
To do this, the client must set manual acknowledgement mode on the subscription, and individually acknowledge messages.
For example:

```java
// Subscribe with manual ack mode, and set AckWait to 60 seconds
sc.subscribe("foo", new MessageHandler() {
public void onMessage(Message m) {
m.ack(); // ack message before performing I/O intensive operation
...
System.out.printf("Received a message: %s\n", m.getData());

// You must manually ack when manualAcks() are set.
try {
m.ack();
} catch (IOException e) {
e.printStackTrace();
}
}
}, new SubscriptionOptions.Builder().setManualAcks(true), setAckWait(Duration.ofSeconds(60)));
```
Expand All @@ -230,22 +246,23 @@ This mismatch is commonly called a "fast producer/slow consumer" problem, and ma
NATS Streaming provides a connection option called `MaxPubAcksInFlight` that effectively limits the number of unacknowledged messages that a publisher may have in-flight at any given time. When this maximum is reached, further `PublishAsync()` calls will block until the number of unacknowledged messages falls below the specified limit. ex:

```java
ConnectionFactory cf = new ConnectionFactory(clusterID, clientID);
StreamingConnectionFactory cf = new StreamingConnectionFactory("test-cluster", "client-123");
cf.setMaxPubAcksInFlight(25);
Connection sc = cf.createConnection();

AckHandler ah = new MessageHandler() {
public void onAck(String nuid, Exception e) {
// process the ack
...
StreamingConnection sc = cf.createConnection();

AckHandler ah = new AckHandler() {
public void onAck(String guid, Exception e) {
// process the ack
}
}
};

for (int i = 1; i < 1000; i++) {
// If the server is unable to keep up with the publisher, the number of oustanding acks will eventually
// If the server is unable to keep up with the publisher, the number of oustanding acks will eventually
// reach the max and this call will block
String guid = sc.publish("foo", "Hello World".getBytes(), ah);
}

String guid = sc.publish("foo", "Hello World".getBytes(), ah);
// track the guid in application code to resend, log, etc. if an error is identified in the ack handler.
```

### Subscriber rate limiting
Expand All @@ -255,26 +272,23 @@ This option specifies the maximum number of outstanding acknowledgements (messag
When this limit is reached, NATS Streaming will suspend delivery of messages to this subscription until the number of unacknowledged messages falls below the specified limit. ex:

```java

// Subscribe with manual ack mode and a max in-flight limit of 25
int i = 0;
sc.subscribe("foo", new MessageHandler() {
public void onMessage(Message m) {
System.out.printf("Received message #%d: %s\n", ++i, m.getData())
...
// Does not ack, or takes a very long time to ack
...
// Message delivery will suspend when the number of unacknowledged messages reaches 25
System.out.printf("Received message : %s\n", m.getData());

// You must manually ack when manualAcks() are set. If acks fail or take too long,
// message delivery will suspend one the number of unacknowledged messages reaches 25
// due to the max in flight value.
try {
m.ack();
} catch (IOException e) {
e.printStackTrace();
}
}
}, new SubscriptionOptions.Builder().setManualAcks(true).setMaxInFlight(25).build());
}, new SubscriptionOptions.Builder().manualAcks().maxInFlight(25).build());
```
## Logging
This library logs error, warning, and debug information using the [Simple Logging Facade for Java (SLF4J)](www.slf4j.org) API.
This gives you, the downstream user, flexibility to choose which (if any) logging implementation you prefer.
### Q: Hey, what the heck is this `Failed to load class org.slf4j.impl.StaticLoggerBinder` exception?".
A: You're getting that message because slf4j can't find an actual logger implementation in your classpath.
Carefully reading [the link embedded in those exception messages](http://www.slf4j.org/codes.html#StaticLoggerBinder) is highly recommended!

## License

Expand Down
10 changes: 1 addition & 9 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

<groupId>io.nats</groupId>
<artifactId>java-nats-streaming</artifactId>
<version>0.5.1-SNAPSHOT</version>
<version>0.5.0</version>
<packaging>bundle</packaging>

<name>java-nats-streaming</name>
Expand Down Expand Up @@ -452,14 +452,6 @@
<profiles>
<profile>
<id>ossrh</id>
<properties>
<gpg.executable>gpg2</gpg.executable>
<gpg.keyname>${env.GPG_KEYNAME}</gpg.keyname>
<gpg.passphrase>${env.GPG_PASSPHRASE}</gpg.passphrase>
<gpg.defaultKeyring>false</gpg.defaultKeyring>
<gpg.publicKeyring>${env.GPG_DIR}/pubring.gpg</gpg.publicKeyring>
<gpg.secretKeyring>${env.GPG_DIR}/secring.gpg</gpg.secretKeyring>
</properties>
<activation>
<property>
<name>performRelease</name>
Expand Down

0 comments on commit 64fe399

Please sign in to comment.