Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
2b67b79
[ADDED] Added support for drain.
Aug 15, 2018
c2760f4
Bumped version to 2.1 in changelog and build.gradle.
Aug 15, 2018
0ad7832
Fixed flaky test.
Aug 15, 2018
5beb799
Added more JDK for travis build.
Aug 16, 2018
0770c54
Fixed javadoc bug found in jdk9+ builds
Aug 16, 2018
aacb04e
Removed oracle-ea version from travis checks.
Aug 16, 2018
741aba1
Removed openjdk11 from build for now (gradle reflection issue).
Aug 16, 2018
0fbf382
Fixed two tests that were flaky when running on slower travis vms.
Aug 16, 2018
ea5e851
Hardened drain against timing issues when dispatcher is in a call back.
Aug 17, 2018
24bb053
[ADDED] Added support for NKeys - forcing 2 dependencies
Aug 20, 2018
3577fba
Changed timing for flaky test
Aug 20, 2018
e3b9da4
Fixed a flaky test
Aug 20, 2018
9e2274a
[FIX] Fixed a problem with a flaky test, related to upper/lower case …
Aug 20, 2018
65f5826
Tuning flaky test with drain and queues.
Aug 20, 2018
0b17d40
Tuning flaky drain/queue tests that has problems on jdk10 on travis
Aug 21, 2018
06aee7c
Major rework for drain.
Aug 23, 2018
641ee09
Tuning drain with queue test for travis.
Aug 23, 2018
c320471
Continuing to tune timing for queue handoff test for travis.
Aug 23, 2018
cc481ca
Continuing to clean up drain/queue test for travis.
Aug 23, 2018
716c2cc
Fixed nkey private key impl to match go version, and added test for s…
Aug 23, 2018
2dbbfd7
added tests for public/private from seed
Aug 23, 2018
3202051
Removed NKey from 2.1.0
Aug 28, 2018
5a329d4
[Fixed] Fixed a problem with null pointer when there are too many pin…
Aug 31, 2018
07384c6
Prep for 2.1.0 release
Sep 6, 2018
9e69e67
Minor doc change.
Sep 6, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,11 @@ language: java
sudo: required
jdk:
- oraclejdk8
- oraclejdk9
- oraclejdk10
- openjdk8
- openjdk9
- openjdk10
before_script:
- wget "https://github.com/nats-io/gnatsd/releases/download/$gnatsd_version/gnatsd-$gnatsd_version-linux-amd64.zip"
-O tmp.zip
Expand All @@ -24,7 +29,7 @@ cache:
- "$HOME/.gradle/wrapper/"
after_success:
- "./gradlew test jacocoTestReport coveralls"
- test ${TRAVIS_BRANCH} != 'master' && "./gradlew uploadArchives" # Disable master for now, it fails due to ip address issues
- test ${TRAVIS_BRANCH} != 'master' && test ${TRAVIS_JDK_VERSION} == 'oraclejdk8' && "./gradlew uploadArchives" # Disable master for now, it fails due to ip address issues
#Disable for now, upload archives fails because of IP address changes - "test ${TRAVIS_PULL_REQUEST} != 'true' && test ${TRAVIS_BRANCH} = 'master' && ./gradlew closeAndReleaseRepository"
env:
global:
Expand Down
7 changes: 6 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@

# Change Log

## Version 2.0.0
## Version 2.1.0

* [ADDED] Support for consumer or connection drain. (The new API led to a version bump.)
* [FIXED] Fixed an issue with null pointer when ping/pong and reconnect interacted poorly.

## Version 2.0.2

* [FIXED] In a cluster situation the library wasn't using each server's auth info if it was in the URI.

Expand Down
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@ A [Java](http://java.com) client for the [NATS messaging system](https://nats.io

## A Note on Versions

This is version 2.0 of the java-nats library. This version is a ground up rewrite of the original library. Part of the goal of this re-write was to address the excessive use of threads, we created a Dispatcher construct to allow applications to control thread creation more intentionally. This version also removes all non-JDK runtime dependencies.
This is version 2.1 of the java-nats library. This version is a ground-up rewrite of the original library. Part of the goal of this rewrite was to address the excessive use of threads, so we created a Dispatcher construct to allow applications to control thread creation more intentionally. This version also removes all non-JDK runtime dependencies.

The API is [simple to use](#listening-for-incoming-messages) and highly [performant](#Benchmarking).

Version 2.0 uses a simplified versioning scheme. Any issues will be fixed in the incremental version number. As a major release, the major version has been updated to 2.0 to allow clients to limit there use of this new API.
Version 2.1 uses a simplified versioning scheme. Any issues will be fixed in the incremental version number. As a major release, the major version has been updated to 2 to allow clients to limit their use of this new API. With the addition of drain() we are updating to 2.1.

Previous versions are still available in the repo.

Expand Down
4 changes: 2 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ plugins {

// Update version here, repeated check-ins not into master will have snapshot on them
def versionMajor = 2
def versionMinor = 0
def versionPatch = 2
def versionMinor = 1
def versionPatch = 0
def versionModifier = ""
def branch = System.getenv("TRAVIS_BRANCH");

Expand Down
63 changes: 59 additions & 4 deletions src/examples/java/io/nats/examples/benchmark/NatsBench.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.LockSupport;

/**
* A utility class for measuring NATS performance, similar to the version in go and node.
Expand Down Expand Up @@ -184,8 +185,14 @@ public void run() {
}

class PubWorker extends Worker {
PubWorker(Future<Boolean> starter, Phaser finisher, int numMsgs, int size, boolean secure) {

private AtomicLong start;
private long targetPubRate;

PubWorker(Future<Boolean> starter, Phaser finisher, int numMsgs, int size, boolean secure, long targetPubRate) {
super(starter, finisher, numMsgs, size, secure);
this.start = new AtomicLong();
this.targetPubRate = targetPubRate;
}

@Override
Expand All @@ -201,22 +208,66 @@ public void run() {

// Wait for the signal
starter.get(60, TimeUnit.SECONDS);
long start = System.nanoTime();
start.set(System.nanoTime());
for (int i = 0; i < numMsgs; i++) {
nc.publish(subject, payload);
sent.incrementAndGet();
}
nc.flush(Duration.ZERO);
long end = System.nanoTime();

bench.addPubSample(new Sample(numMsgs, size, start, end, nc.getStatistics()));
bench.addPubSample(new Sample(numMsgs, size, start.get(), end, nc.getStatistics()));
nc.close();
} catch (Exception e) {
errorQueue.add(e);
} finally {
finisher.arrive();
}
}


/**
 * Throttles this publisher toward {@code targetPubRate} messages per second.
 *
 * Each call counts one message in {@code sent}. On every 1000th message the
 * observed rate since {@code start} is compared with the target, the
 * per-message delay is nudged by 5% in the appropriate direction, and the
 * thread is parked for 1000 times that delay. For payloads under 64 bytes the
 * connection is additionally flushed every 100,000 messages.
 *
 * When {@code targetPubRate} is zero or negative the method returns immediately
 * without counting the message.
 *
 * @param nc the connection to flush periodically
 * @throws InterruptedException if the thread is interrupted during the periodic flush
 */
void adjustAndSleep(Connection nc) throws InterruptedException {

    if (this.targetPubRate <= 0) {
        return;
    }

    long count = sent.incrementAndGet();

    if (count % 1000 != 0) { // Only sleep every 1000 messages
        return;
    }

    long now = System.nanoTime();
    long start = this.start.get();
    double rate = (1e9 * (double) count) / ((double) (now - start));
    double delay = (1.0 / ((double) this.targetPubRate));
    double adjust = delay / 20.0; // 5%

    if (adjust == 0) {
        adjust = 1e-9; // 1ns min
    }

    if (rate < this.targetPubRate) {
        delay -= adjust;
    } else if (rate > this.targetPubRate) {
        delay += adjust;
    }

    if (delay < 0) {
        delay = 0;
    }

    delay = delay * 1000; // we are doing this every 1000 messages

    long nanos = (long) (delay * 1e9);
    LockSupport.parkNanos(nanos);

    // Flush small messages regularly
    if (this.size < 64 && count != 0 && count % 100_000 == 0) {
        try {
            nc.flush(Duration.ZERO);
        } catch (InterruptedException e) {
            // Never swallow an interrupt: the method already declares it, so let the caller see it.
            throw e;
        } catch (Exception ignored) {
            // The periodic flush is best-effort; a failure here must not abort the benchmark run.
        }
    }
}
}

/**
Expand Down Expand Up @@ -290,7 +341,11 @@ public void runTest(String title, int pubCount, int subCount) throws Exception {
perPubMsgs = remaining;
}

new Thread(new PubWorker(starter, finisher, perPubMsgs, this.size, secure), "Pub-"+i).start();
if (subCount == 0) {
new Thread(new PubWorker(starter, finisher, perPubMsgs, this.size, secure, 0), "Pub-"+i).start();
} else {
new Thread(new PubWorker(starter, finisher, perPubMsgs, this.size, secure, 2_000_000), "Pub-"+i).start();
}

remaining -= perPubMsgs;
}
Expand Down
35 changes: 35 additions & 0 deletions src/main/java/io/nats/client/Connection.java
Original file line number Diff line number Diff line change
Expand Up @@ -234,10 +234,45 @@ public enum Status {
*/
public void flush(Duration timeout) throws TimeoutException, InterruptedException;

/**
* Drain tells the connection to process in flight messages before closing.
*
* Drain initially drains all of the consumers, stopping incoming messages.
* Next, publishing is halted and a flush call is used to ensure all published
* messages have reached the server.
* Finally the connection is closed.
*
* In order to drain subscribers, an unsub protocol message is sent to the server followed by a flush.
* These two steps occur before drain returns. The remaining steps occur in a background thread.
* This method tries to manage the timeout properly, so that if the timeout is 1 second, and the flush
* takes 100ms, the remaining steps have 900ms in the background thread.
*
* The connection will try to let all messages be drained, but when the timeout is reached
* the connection is closed and any outstanding dispatcher threads are interrupted.
*
* A future is used to allow this call to be treated as synchronous or asynchronous as
* needed by the application. The value of the future will be true if all of the subscriptions
* were drained in the timeout, and false otherwise. The future is completed after the connection
* is closed, so any connection handler notifications will happen before the future completes.
*
* @param timeout The time to wait for the drain to succeed, pass 0 to wait
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In other languages we use -1 for "forever".

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since we don't support 0 time, i think this is ok. Also, changing would require a major revision bump.

* forever. Drain involves moving messages to and from the server
* so a very short timeout is not recommended. If the timeout is reached before
* the drain completes, the connection is simply closed, which can result in message
* loss.
* @return A future that can be used to check if the drain has completed
* @throws InterruptedException if the thread is interrupted
* @throws TimeoutException if the initial flush times out
*/
public CompletableFuture<Boolean> drain(Duration timeout) throws TimeoutException, InterruptedException;

/**
* Close the connection and release all blocking calls like {@link #flush flush}
* and {@link Subscription#nextMessage(Duration) nextMessage}.
*
* If close() is called after {@link #drain(Duration) drain} it will wait up to the connection timeout
* to return, but it will not initiate a close. The drain takes precedence and will initiate the close.
*
* @throws InterruptedException if the thread, or one owned by the connection is interrupted during the close
*/
public void close() throws InterruptedException ;
Expand Down
19 changes: 19 additions & 0 deletions src/main/java/io/nats/client/Consumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@

package io.nats.client;

import java.time.Duration;
import java.util.concurrent.CompletableFuture;

/**
* A Consumer in the NATS library is an object that represents an incoming queue of
* messages. There are two types of consumers {@link Dispatcher} and {@link Subscription}.
Expand Down Expand Up @@ -96,4 +99,20 @@ public interface Consumer {
* For a subscription the answer is false after unsubscribe. For a dispatcher, false after stop.
*/
public boolean isActive();

/**
* Drain tells the consumer to process in-flight or cached messages, but to stop receiving new ones. The library will
* flush the unsubscribe call(s), ensuring that any publish calls made by this client are included. When all messages
* are processed the consumer effectively becomes unsubscribed.
*
* A future is used to allow this call to be treated as synchronous or asynchronous as
* needed by the application.
*
* @param timeout The time to wait for the drain to succeed, pass 0 to wait
* forever. Drain involves moving messages to and from the server
* so a very short timeout is not recommended.
* @return A future that can be used to check if the drain has completed
* @throws InterruptedException if the thread is interrupted
*/
public CompletableFuture<Boolean> drain(Duration timeout) throws InterruptedException;
}
51 changes: 39 additions & 12 deletions src/main/java/io/nats/client/impl/MessageQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,26 @@

import java.time.Duration;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.LockSupport;
import java.util.function.Predicate;

class MessageQueue {
private final static int STOPPED = 0;
private final static int RUNNING = 1;
private final static int DRAINING = 2;

private final AtomicLong length;
private final AtomicLong sizeInBytes;
private final AtomicBoolean running;
private final AtomicInteger running;
private final boolean singleThreadedReader;
private final ConcurrentLinkedQueue<NatsMessage> queue;
private final ConcurrentLinkedQueue<Thread> waiters;

MessageQueue(boolean singleReaderMode) {
this.queue = new ConcurrentLinkedQueue<>();
this.running = new AtomicBoolean(true);
this.running = new AtomicInteger(RUNNING);
this.sizeInBytes = new AtomicLong(0);
this.length = new AtomicLong(0);

Expand All @@ -43,19 +47,32 @@ boolean isSingleReaderMode() {
}

boolean isRunning() {
return this.running.get();
return this.running.get() != STOPPED;
}

/**
 * Reports whether the queue state is currently DRAINING.
 *
 * @return true only when the state value equals DRAINING
 */
boolean isDraining() {
    final int state = this.running.get();
    return DRAINING == state;
}

void pause() {
this.running.set(false);
this.running.set(STOPPED);
signalAll();
}

void resume() {
this.running.set(true);
this.running.set(RUNNING);
signalAll();
}

/**
 * Moves the queue into the DRAINING state, then calls signalAll().
 */
void drain() {
    running.set(DRAINING);
    this.signalAll();
}

/**
 * Reports whether a drain has finished: the state is DRAINING and the
 * queue length is zero.
 *
 * @return true when the queue is draining and its length is 0
 */
boolean isDrained() {
    if (this.running.get() != DRAINING) {
        return false;
    }
    return this.length() == 0;
}

void signalOne() {
Thread t = waiters.poll();
if (t != null) {
Expand Down Expand Up @@ -99,7 +116,12 @@ NatsMessage waitForTimeout(Duration timeout) throws InterruptedException {
// Semi-spin for at most MAX_SPIN_TIME
if (timeoutNanos > MAX_SPIN_TIME) {
int count = 0;
while (this.running.get() && (retVal = this.queue.poll()) == null && count < MAX_SPINS) {
while (this.isRunning() && (retVal = this.queue.poll()) == null && count < MAX_SPINS) {

if (this.isDraining()) {
break;
}

count++;
LockSupport.parkNanos(SPIN_WAIT);
}
Expand All @@ -111,7 +133,12 @@ NatsMessage waitForTimeout(Duration timeout) throws InterruptedException {

long now = start;

while (this.running.get() && (retVal = this.queue.poll()) == null) {
while (this.isRunning() && (retVal = this.queue.poll()) == null) {

if (this.isDraining()) {
break;
}

if (timeoutNanos > 0) { // If it is 0, keep it as zero, otherwise reduce based on time
now = System.nanoTime();
timeoutNanos = timeoutNanos - (now - start); //include the semi-spin time
Expand Down Expand Up @@ -140,7 +167,7 @@ NatsMessage waitForTimeout(Duration timeout) throws InterruptedException {
}

NatsMessage pop(Duration timeout) throws InterruptedException {
if (!this.running.get()) {
if (!this.isRunning()) {
return null;
}

Expand Down Expand Up @@ -176,7 +203,7 @@ NatsMessage accumulate(long maxSize, long maxMessages, Duration timeout)
throw new IllegalStateException("Accumulate is only supported in single reader mode.");
}

if (!this.running.get()) {
if (!this.isRunning()) {
return null;
}

Expand All @@ -185,7 +212,7 @@ NatsMessage accumulate(long maxSize, long maxMessages, Duration timeout)
if (msg == null) {
msg = waitForTimeout(timeout);

if (!this.running.get() || (msg == null)) {
if (!this.isRunning() || (msg == null)) {
return null;
}
}
Expand Down Expand Up @@ -247,7 +274,7 @@ long sizeInBytes() {
}

void filter(Predicate<NatsMessage> p) {
if (this.running.get()) {
if (this.isRunning()) {
throw new IllegalStateException("Filter is only supported when the queue is paused");
}

Expand Down
Loading