Skip to content

Commit

Permalink
Merge pull request #233 from nats-io/2.4.4
Browse files Browse the repository at this point in the history
2.4.4
  • Loading branch information
Stephen Asbury committed May 9, 2019
2 parents 5f29104 + 5b5c329 commit 71511a2
Show file tree
Hide file tree
Showing 21 changed files with 401 additions and 103 deletions.
8 changes: 4 additions & 4 deletions .travis.yml
Expand Up @@ -8,10 +8,10 @@ jdk:
- openjdk9
- openjdk10
before_script:
- wget "https://github.com/nats-io/gnatsd/releases/download/$gnatsd_version/gnatsd-$gnatsd_version-linux-amd64.zip"
- wget "https://github.com/nats-io/nats-server/releases/download/$nats_server_version/gnatsd-$nats_server_version-linux-amd64.zip"
-O tmp.zip
- unzip tmp.zip
- mv gnatsd-$gnatsd_version-linux-amd64 gnatsd
- mv gnatsd-$nats_server_version-linux-amd64 nats-server
before_install:
- openssl aes-256-cbc -K $encrypted_f07928735f08_key -iv $encrypted_f07928735f08_iv
-in .travis/nats.travis.gpg.enc -out .travis/nats.travis.gpg -d
Expand All @@ -32,8 +32,8 @@ after_success:
#Disable for now, upload archives fails because of IP address changes - "test ${TRAVIS_PULL_REQUEST} != 'true' && test ${TRAVIS_BRANCH} = 'master' && ./gradlew closeAndReleaseRepository"
env:
global:
- gnatsd_version=v1.3.0
- gnatsd_path=$TRAVIS_BUILD_DIR/gnatsd/gnatsd
- nats_server_version=v1.4.0
- nats_server_path=$TRAVIS_BUILD_DIR/nats-server/gnatsd
- GPG_KEYRING_FILE=.travis/nats.travis.gpg
- secure: yvOfk7kJzzTQ38n444jTDets24FZmxewwb3lrhXwpHTwOnQyq/B8QaHeqvhneECMc0Bq5M4blTlJ/wOWJAvs61POv2QVkyw+u8cVNROzkb8GPaH4ybPo8HMl33EHFNqh1KRo2C9hAPMYbbTjKCVY2UdkdfJ2l4lN/Awk7uEDX8ckc/sENhDeQjY/xoGZUP28O568Eg4ZxN3fr3WEV/0T+R15YyL2X0ev8MiGJM5TojXnNFKdb5fkUodRWwiY8JDn5xzP7xUzzen7MqE/5YNTcIC6haU8LToJM2gXEQtdoWLZqMPWr7k4A+eTBO5vl9qWrPBaOodFJYKzEjrEDfHj5RR9uaufEsnwQzXKw1ODrIFVZiC2n73j/tatWDI+vjnJ5tO+VMwWj53qdBYrvYeyewIT3cz9rrDHH8fGINsKAsk6HgWM3SMgeNSuXjRN0ePxEph5FVQ3ZUjF1ZXp90O7kjD5kXg/jVs6GrhCviRT3fx6Z4hyat9ytshy66jqcttHEfJ5sSOBg8fVbWJjLbxmghWUFp1fuc0HGNiMJStEyOBai5AkG6uJccTlgjlNL/8mgEF+fxo8HGVyStQzRnr7LJuCmWW9hx/aBVmqXR4p6cRgsSO09PvHRmcsLQoktCxVxsvcfblQqMbiQKjsJ4tXLe0U88DMOHnEGOgtik/tt+4=
- secure: isW18c01AJEDAPUUl6rKcewHxOqItTW0TiiEIrWQqQP/C3O06WgAbiFYVFPJ9zCi6me0Wj3YMmEoxiYBhFdgH/O5xoQnnU7xIfD9hcmByglsoyGsK/Wz0wcERoVf9bfbVQkj9q/Mg7kaUZCMWqcFR3CqHEGu8UH5x7ecDW5FXfAQDjN5czT1j1VAwhHZCfIktJuy/GzoFGgRJpvnFPSlHmi0I8fApoX43tmOCkTVHnaXt9CDL3A5EIKtok5dwu0FF5d9hQFncJB8gqGxd+r8a3W3+0Gfgdou3x+AlGTf3R62LgB03GY0MFrMVfanWJE1ORdV0o9hC3AiwOsKBTungZ0arQeXtDXHSeMY52O6u7C8MCwQgbTmzO2YsmMwwTL98PPQxEJ6c8r7WBAfxzxxRTJ/QjPqQdyWV9dFWOnsmEhBLM2Wi858dJlw5fDEoHgy8EUZTQcquUWqEzTJca1VdrLza/PlND8dqfAjxqINtpsXu88JsLUu5VjFiLwln5NpdNKfcY4oaPiLLYdrSgdxBfHCCISP+r8iqgKLDguFwza3xcPSFwqtEq8aYmy0fjgd0c9hlz6oe0NvLc4kPJf4q9NDjffUXBciiv8VXdL3YyRG67h9AF+ndbM8NHsup5FfmALfq2bGIpe4USIqoOAZFUSa35hPDW87C7Z4vvPvb9I=
Expand Down
8 changes: 8 additions & 0 deletions CHANGELOG.md
@@ -1,6 +1,14 @@

# Change Log

## Version 2.4.4

* [FIXED] - #230 - removed extra executor allocation
* [FIXED] - #231 - found a problem with message ordering when filtering pings on reconnect, caused issues with reconnect in general
* [FIXED] - #226 - added more doc about ping intervals and max ping
* [FIXED] - #224 - resolved a latency problem with windows due to the cost of the message queues spinwait/lock
* [CHANGED] - started support for renaming gnatsd to nats-server, full release isn't done so using gnatsd for tests still

## Version 2.4.3

* [FIXED] - #223 - made SID public in the message
Expand Down
18 changes: 10 additions & 8 deletions README.md
Expand Up @@ -12,6 +12,8 @@ A [Java](http://java.com) client for the [NATS messaging system](https://nats.io

## A Note on Versions

The NATS server renamed itself from gnatsd to nats-server around 2.4.4. This and other files try to use the new names, but some underlying code may change over several versions. If you are building yourself, please keep an eye out for issues and report them.

This is version 2.1 of the java-nats library. This version is a ground up rewrite of the original library. Part of the goal of this re-write was to address the excessive use of threads, we created a Dispatcher construct to allow applications to control thread creation more intentionally. This version also removes all non-JDK runtime dependencies.

The API is [simple to use](#listening-for-incoming-messages) and highly [performant](#Benchmarking).
Expand All @@ -36,9 +38,9 @@ The java-nats client is provided in a single jar file, with a single external de

### Downloading the Jar

You can download the latest jar at [https://search.maven.org/remotecontent?filepath=io/nats/jnats/2.4.3/jnats-2.4.3.jar](https://search.maven.org/remotecontent?filepath=io/nats/jnats/2.4.3/jnats-2.4.3.jar).
You can download the latest jar at [https://search.maven.org/remotecontent?filepath=io/nats/jnats/2.4.4/jnats-2.4.4.jar](https://search.maven.org/remotecontent?filepath=io/nats/jnats/2.4.4/jnats-2.4.4.jar).

The examples are available at [https://search.maven.org/remotecontent?filepath=io/nats/jnats/2.4.3/jnats-2.4.3-examples.jar](https://search.maven.org/remotecontent?filepath=io/nats/jnats/2.4.3/jnats-2.4.3-examples.jar).
The examples are available at [https://search.maven.org/remotecontent?filepath=io/nats/jnats/2.4.4/jnats-2.4.4-examples.jar](https://search.maven.org/remotecontent?filepath=io/nats/jnats/2.4.4/jnats-2.4.4-examples.jar).

To use NKeys, you will need the ed25519 library, which can be downloaded at [https://repo1.maven.org/maven2/net/i2p/crypto/eddsa/0.3.0/eddsa-0.3.0.jar](https://repo1.maven.org/maven2/net/i2p/crypto/eddsa/0.3.0/eddsa-0.3.0.jar).

Expand All @@ -48,7 +50,7 @@ The NATS client is available in the Maven central repository, and can be importe

```groovy
dependencies {
implementation 'io.nats:jnats:2.4.3'
implementation 'io.nats:jnats:2.4.4'
}
```

Expand All @@ -74,7 +76,7 @@ The NATS client is available on the Maven central repository, and can be importe
<dependency>
<groupId>io.nats</groupId>
<artifactId>jnats</artifactId>
<version>2.4.3</version>
<version>2.4.4</version>
</dependency>
```

Expand All @@ -101,7 +103,7 @@ NATS uses RNG to generate unique inbox names. A peculiarity of the JDK on Linux

## Basic Usage

Sending and receiving with NATS is as simple as connecting to the gnatsd and publishing or subscribing for messages. A number of examples are provided in this repo as described in [examples.md](src/examples/java/io/nats/examples/examples.md).
Sending and receiving with NATS is as simple as connecting to the nats-server and publishing or subscribing for messages. A number of examples are provided in this repo as described in [examples.md](src/examples/java/io/nats/examples/examples.md).

### Connecting

Expand Down Expand Up @@ -232,7 +234,7 @@ If you want to try out these techniques, take a look at the [examples.md](src/ex

### Clusters & Reconnecting

The Java client will automatically reconnect if it loses its connection the gnatsd. If given a single server, the client will keep trying that one. If given a list of servers, the client will rotate between them. When the gnatsd servers are in a cluster, they will tell the client about the other servers, so that in the simplest case a client could connect to one server, learn about the cluster and reconnect to another server if its initial one goes down.
The Java client will automatically reconnect if it loses its connection the nats-server. If given a single server, the client will keep trying that one. If given a list of servers, the client will rotate between them. When the nats servers are in a cluster, they will tell the client about the other servers, so that in the simplest case a client could connect to one server, learn about the cluster and reconnect to another server if its initial one goes down.

To tell the connection about multiple servers for the initial connection, use the `servers()` method on the options builder, or call `server()` multiple times.

Expand All @@ -245,7 +247,7 @@ Reconnection behavior is controlled via a few options, see the javadoc for the O

## Benchmarking

The `io.nats.examples` package contains two benchmarking tools, modeled after tools in other NATS clients. Both examples run against an existing gnatsd. The first called `io.nats.examples.benchmark.NatsBench` runs two simple tests, the first simply publishes messages, the second also receives messages. Tests are run with 1 thread/connection per publisher or subscriber. Running on an iMac (2017), with 4.2 GHz Intel Core i7 and 64GB of memory produced results like:
The `io.nats.examples` package contains two benchmarking tools, modeled after tools in other NATS clients. Both examples run against an existing nats-server. The first called `io.nats.examples.benchmark.NatsBench` runs two simple tests, the first simply publishes messages, the second also receives messages. Tests are run with 1 thread/connection per publisher or subscriber. Running on an iMac (2017), with 4.2 GHz Intel Core i7 and 64GB of memory produced results like:

```AsciiDoc
Starting benchmark(s) [msgs=5000000, msgsize=256, pubs=2, subs=2]
Expand Down Expand Up @@ -371,7 +373,7 @@ The java doc is located in `build/docs` and the example jar is in `build/libs`.

which will create a folder called `build/reports/jacoco` containing the file `index.html` you can open and use to browse the coverage. Keep in mind we have focused on library test coverage, not coverage for the examples.

Many of the tests run gnatsd on a custom port. If gnatsd is in your path they should just work, but in cases where it is not, or an IDE running tests has issues with the path you can specify the gnatsd location with the environment variable `gnatsd_path`.
Many of the tests run nats-server on a custom port. If nats-server is in your path they should just work, but in cases where it is not, or an IDE running tests has issues with the path you can specify the nats-server location with the environment variable `nats_-_server_path`.

## License

Expand Down
4 changes: 2 additions & 2 deletions build.gradle
Expand Up @@ -14,9 +14,9 @@ plugins {
// Be sure to update Nats.java with the latest version, the change log and the package-info.java
def versionMajor = 2
def versionMinor = 4
def versionPatch = 3
def versionPatch = 4
def versionModifier = ""
def jarVersion = "2.4.3"
def jarVersion = "2.4.4"
def branch = System.getenv("TRAVIS_BRANCH");

def getVersionName = { ->
Expand Down
93 changes: 93 additions & 0 deletions src/examples/java/io/nats/examples/RawTCPLatencyTest.java
@@ -0,0 +1,93 @@
package io.nats.examples;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.util.logging.Level;
import java.util.logging.Logger;

public class RawTCPLatencyTest {

static boolean isServer;
static String host = "localhost";
static int port = 1234;
static int warmIters = 1000, runIters = 10000;

static InputStream in;
static OutputStream out;

public static void main(String[] args) {
if (args.length < 1) {
System.out.println("Parameters: server/client host port");
return;
}
isServer = args[0].startsWith("s");
if (args.length > 1) {
host = args[1];
}
if (args.length > 2) {
port = Integer.parseInt(args[2]);
}
try {
if (isServer) {
runServer();
} else {
runClient();
}
} catch (IOException ex) {
Logger.getLogger(RawTCPLatencyTest.class.getName()).log(Level.SEVERE, null, ex);
}
}

private static void runServer() throws IOException {
ServerSocket serverSocket = new ServerSocket(port);
while (true) {
Socket socket = serverSocket.accept();
System.out.println("Connected");
socket.setTcpNoDelay(true);
socket.setReceiveBufferSize(2 * 1024 * 1024);
socket.setSendBufferSize(2 * 1024 * 1024);
in = socket.getInputStream();
out = socket.getOutputStream();
try {
while (true) {
int rq = in.read();
out.write(rq);
}
} catch (IOException e) {
System.out.println("Disconnected");
}
}
}

private static void runClient() throws SocketException, IOException {
Socket socket = new Socket();
socket.setTcpNoDelay(true);
socket.setReceiveBufferSize(2 * 1024 * 1024);
socket.setSendBufferSize(2 * 1024 * 1024);
socket.connect(new InetSocketAddress(host, port), 1000);
in = socket.getInputStream();
out = socket.getOutputStream();
System.out.println("Connected");
for (int i = 0; i < warmIters; i++) {
sendRecv();
}
System.out.println("Warmed");
long t0 = System.nanoTime();
for (int i = 0; i < runIters; i++) {
sendRecv();
}
long t1 = System.nanoTime();
System.out.println("Average latency " + (1.0 * (t1 - t0)) / (1000000.0 * runIters) + " ms");
socket.close();
}

private static void sendRecv() throws IOException {
out.write(11);
in.read();
}
}
113 changes: 65 additions & 48 deletions src/examples/java/io/nats/examples/autobench/NatsAutoBench.java
Expand Up @@ -38,6 +38,7 @@ public static void main(String args[]) {
boolean utf8 = false;
int baseMsgs = 100_000;
int latencyMsgs = 5_000;
long maxSize = 8*1024;

if (args.length > 0) {
for (String s : args) {
Expand All @@ -49,12 +50,15 @@ public static void main(String args[]) {
} else if (s.equals("small")) {
baseMsgs = 5_000;
latencyMsgs = 250;
maxSize = 1024;
} else if (s.equals("tiny")) {
baseMsgs = 1_000;
latencyMsgs = 50;
maxSize = 1024;
} else if (s.equals("nano")) {
baseMsgs = 10;
latencyMsgs = 5;
maxSize = 512;
} else if (s.equals("help")) {
usage();
return;
Expand All @@ -64,7 +68,7 @@ public static void main(String args[]) {
}
}

System.out.printf("Connecting to gnatsd at %s\n", server);
System.out.printf("Connecting to NATS server at %s\n", server);

try {
Options.Builder builder = new Options.Builder().
Expand All @@ -78,7 +82,7 @@ public static void main(String args[]) {
}

Options connectOptions = builder.build();
List<AutoBenchmark> tests = buildTestList(baseMsgs, latencyMsgs);
List<AutoBenchmark> tests = buildTestList(baseMsgs, latencyMsgs, maxSize);

System.out.println("Running warmup");
runWarmup(connectOptions);
Expand Down Expand Up @@ -125,7 +129,7 @@ public static void main(String args[]) {
}

public static void runWarmup(Options connectOptions) throws Exception {
AutoBenchmark warmup = (new PubSubBenchmark("warmup", 1_000_000, 64));
AutoBenchmark warmup = (new PubSubBenchmark("warmup", 100_000, 64));
warmup.execute(connectOptions);

if (warmup.getException() != null) {
Expand All @@ -134,56 +138,69 @@ public static void runWarmup(Options connectOptions) throws Exception {
}
}

public static List<AutoBenchmark> buildTestList(int baseMsgs, int latencyMsgs) {
public static List<AutoBenchmark> buildTestList(int baseMsgs, int latencyMsgs, long maxSize) {
ArrayList<AutoBenchmark> tests = new ArrayList<>();

int[] sizes = {0, 8, 32, 256, 512, 1024, 4*1024, 8*1024};
int[] msgsMultiple = {100, 100, 100, 100, 100, 10, 5, 1};
int[] msgsDivider = {5, 5, 10, 10, 10, 10, 10, 10};

/**/
tests.add(new PubBenchmark("PubOnly 0b", 100 * baseMsgs, 0));
tests.add(new PubBenchmark("PubOnly 8b", 100 * baseMsgs, 8));
tests.add(new PubBenchmark("PubOnly 32b", 100 * baseMsgs, 32));
tests.add(new PubBenchmark("PubOnly 256b", 100 * baseMsgs, 256));
tests.add(new PubBenchmark("PubOnly 512b", 100 * baseMsgs, 512));
tests.add(new PubBenchmark("PubOnly 1k", 10 * baseMsgs, 1024));
tests.add(new PubBenchmark("PubOnly 4k", 5 * baseMsgs, 4*1024));
tests.add(new PubBenchmark("PubOnly 8k", baseMsgs, 8*1024));

tests.add(new PubSubBenchmark("PubSub 0b", 100 * baseMsgs, 0));
tests.add(new PubSubBenchmark("PubSub 8b", 100 * baseMsgs, 8));
tests.add(new PubSubBenchmark("PubSub 32b", 100 * baseMsgs, 32));
tests.add(new PubSubBenchmark("PubSub 256b", 100 * baseMsgs, 256));
tests.add(new PubSubBenchmark("PubSub 512b", 50 * baseMsgs, 512));
tests.add(new PubSubBenchmark("PubSub 1k", 10 * baseMsgs, 1024));
tests.add(new PubSubBenchmark("PubSub 4k", baseMsgs, 4*1024));
tests.add(new PubSubBenchmark("PubSub 8k", baseMsgs, 8*1024));

tests.add(new PubDispatchBenchmark("PubDispatch 0b", 100 * baseMsgs, 0));
tests.add(new PubDispatchBenchmark("PubDispatch 8b", 100 * baseMsgs, 8));
tests.add(new PubDispatchBenchmark("PubDispatch 32b", 100 * baseMsgs, 32));
tests.add(new PubDispatchBenchmark("PubDispatch 256b", 100 * baseMsgs, 256));
tests.add(new PubDispatchBenchmark("PubDispatch 512b", 50 * baseMsgs, 512));
tests.add(new PubDispatchBenchmark("PubDispatch 1k", 10 * baseMsgs, 1024));
tests.add(new PubDispatchBenchmark("PubDispatch 4k", baseMsgs, 4*1024));
tests.add(new PubDispatchBenchmark("PubDispatch 8k", baseMsgs, 8*1024));

for(int i=0; i<sizes.length; i++) {
int size = sizes[i];
int msgMult = msgsMultiple[i];

if(size > maxSize) {
break;
}

tests.add(new PubBenchmark("PubOnly "+size, msgMult * baseMsgs, size));
}

for(int i=0; i<sizes.length; i++) {
int size = sizes[i];
int msgMult = msgsMultiple[i];

if(size > maxSize) {
break;
}

tests.add(new PubSubBenchmark("PubSub "+size, msgMult * baseMsgs, size));
}

for(int i=0; i<sizes.length; i++) {
int size = sizes[i];
int msgMult = msgsMultiple[i];

if(size > maxSize) {
break;
}

tests.add(new PubDispatchBenchmark("PubDispatch "+size, msgMult * baseMsgs, size));
}

// Request reply is a 4 message trip, and runs the full loop before sending another message
// so we run fewer because the client cannot batch any socket calls to the server together
tests.add(new ReqReplyBenchmark("ReqReply 0b", baseMsgs / 5, 0));
tests.add(new ReqReplyBenchmark("ReqReply 8b", baseMsgs / 5, 8));
tests.add(new ReqReplyBenchmark("ReqReply 32b", baseMsgs / 10, 32));
tests.add(new ReqReplyBenchmark("ReqReply 256b", baseMsgs / 10, 256));
tests.add(new ReqReplyBenchmark("ReqReply 512b", baseMsgs / 10, 512));
tests.add(new ReqReplyBenchmark("ReqReply 1k", baseMsgs / 10, 1024));
tests.add(new ReqReplyBenchmark("ReqReply 4k", baseMsgs / 10, 4*1024));
tests.add(new ReqReplyBenchmark("ReqReply 8k", baseMsgs / 10, 8*1024));

tests.add(new LatencyBenchmark("Latency 0b", latencyMsgs, 0));
tests.add(new LatencyBenchmark("Latency 8b", latencyMsgs, 8));
tests.add(new LatencyBenchmark("Latency 32b", latencyMsgs, 32));
tests.add(new LatencyBenchmark("Latency 256b", latencyMsgs, 256));
tests.add(new LatencyBenchmark("Latency 512b", latencyMsgs, 512));
tests.add(new LatencyBenchmark("Latency 1k", latencyMsgs, 1024));
tests.add(new LatencyBenchmark("Latency 4k", latencyMsgs, 4 * 1024));
tests.add(new LatencyBenchmark("Latency 8k", latencyMsgs, 8 * 1024));
for(int i=0; i<sizes.length; i++) {
int size = sizes[i];
int msgDivide = msgsDivider[i];

if(size > maxSize) {
break;
}

tests.add(new ReqReplyBenchmark("ReqReply "+size, baseMsgs / msgDivide, size));
}

for(int i=0; i<sizes.length; i++) {
int size = sizes[i];

if(size > maxSize) {
break;
}

tests.add(new LatencyBenchmark("Latency "+size, latencyMsgs, size));
}
/**/

return tests;
Expand Down

0 comments on commit 71511a2

Please sign in to comment.