Skip to content

Commit

Permalink
Found a problem in message queue where null can be returned when leng…
Browse files Browse the repository at this point in the history
…th > 1. Fixed that.

Created an example for running long tests.
  • Loading branch information
Stephen Asbury committed Jul 12, 2018
1 parent e3c0b04 commit b32ff85
Show file tree
Hide file tree
Showing 4 changed files with 232 additions and 17 deletions.
4 changes: 2 additions & 2 deletions src/examples/java/io/nats/examples/examples.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ This folder contains a number of examples:
6. `NatsSub.java` - reads messages synchronously
7. `autobench` - benchmarks the current system/setup in several scenarios
8. `benchmark` - benchmark that supports multiple threads

There is also a larger example called `stan` that implements a server that can respond on multiple subjects, and several clients that send requests on those various subjects.
9. `stan` - A larger example that implements a server that can respond on multiple subjects, and several clients that send requests on those various subjects.
10. `stability` - a small producer and subscriber that run forever printing some status every so often. These are intended for long running tests without burning the CPU.

All of these examples take the server URL on the command line, which means that you can use the `tls` and `opentls` schemas to test over a secure connection.

Expand Down
104 changes: 104 additions & 0 deletions src/examples/java/io/nats/examples/stability/StabilityPub.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
// Copyright 2015-2018 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package io.nats.examples.stability;

import java.text.NumberFormat;
import java.time.Duration;
import java.time.Instant;

import io.nats.client.Connection;
import io.nats.client.Nats;
import io.nats.client.Options;
import io.nats.examples.benchmark.Utils;

public class StabilityPub {

static final String usageString =
"\nUsage: java NatsPub [server] <subject> <msgSize>"
+ "\n\nUse tls:// or opentls:// to require tls, via the Default SSLContext\n";

public static void main(String args[]) {
String subject;
String server;
int msgSize;
long messageCount = 0;
long payloadCount = 0;

if (args.length == 3) {
server = args[0];
subject = args[1];
msgSize = Integer.parseInt(args[2]);
} else if (args.length == 2) {
server = Options.DEFAULT_URL;
subject = args[0];
msgSize = Integer.parseInt(args[1]);
} else {
usage();
return;
}

try {
Options options = new Options.Builder().server(server).noReconnect().build();
Connection nc = Nats.connect(options);
Instant start = Instant.now();

byte[] payload = new byte[msgSize];

System.out.println("Running stability publisher for indefinite test, ctrl-c to cancel...\n");

while (true) {
nc.publish(subject, payload);

payloadCount += msgSize;
messageCount++;

// This is a long running test, we are going to try for a message rate around
// 10,000/sec not a lot but ok for wifi/slow consumers, the main point
// is to run a long time and be able to watch memory/stability over time
if (messageCount != 0 && messageCount % 1_000 == 0) {
nc.flush(Duration.ofSeconds(30));
try {
Thread.sleep(100);
} catch(Exception exp) {
// ignore it
}
}

if (messageCount != 0 && messageCount % 100_000 == 0) {
Instant finish = Instant.now();
System.out.printf("Running for %s\n", Duration.between(start, finish).toString()
.substring(2)
.replaceAll("(\\d[HMS])(?!$)", "$1 ")
.toLowerCase());
System.out.printf("Sent %s messages.\n", NumberFormat.getIntegerInstance().format(messageCount));
System.out.printf("Sent %s payload bytes.\n", Utils.humanBytes(payloadCount));
System.out.printf("Current memory usage is %s / %s / %s free/total/max\n",
Utils.humanBytes(Runtime.getRuntime().freeMemory()),
Utils.humanBytes(Runtime.getRuntime().totalMemory()),
Utils.humanBytes(Runtime.getRuntime().maxMemory()));
System.out.println();
}
}

} catch (Exception exp) {
exp.printStackTrace();
System.exit(-1);
}
}

static void usage() {
System.err.println(usageString);
System.exit(-1);
}
}
112 changes: 112 additions & 0 deletions src/examples/java/io/nats/examples/stability/StabilitySub.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
// Copyright 2015-2018 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package io.nats.examples.stability;

import java.text.NumberFormat;
import java.time.Duration;
import java.time.Instant;

import io.nats.client.Connection;
import io.nats.client.Message;
import io.nats.client.Nats;
import io.nats.client.Options;
import io.nats.client.Subscription;
import io.nats.examples.benchmark.Utils;

public class StabilitySub {

static final String usageString =
"\nUsage: java NatsSub [server] <subject>"
+ "\n\nUse tls:// or opentls:// to require tls, via the Default SSLContext\n";

public static void main(String args[]) {
String subject;
String server;
long nullCount = 0;
long messageCount = 0;
long payloadCount = 0;
long restarts = 0;

if (args.length == 2) {
server = args[0];
subject = args[1];
} else if (args.length == 1) {
server = Options.DEFAULT_URL;
subject = args[0];
} else {
usage();
return;
}

Instant start = Instant.now();

System.out.println("Running stability subscriber for indefinite test, ctrl-c to cancel...\n");

while (true) {
try {
Options options = new Options.Builder().server(server).noReconnect().build();
Connection nc = Nats.connect(options);
Subscription sub = nc.subscribe(subject);

try {
while(true) { // receive as long as we can
Message msg = sub.nextMessage(Duration.ofHours(1));

if (msg == null) {
nullCount++;
} else if (msg.getData() != null) {
payloadCount += msg.getData().length;
}

messageCount++;

if (messageCount != 0 && messageCount % 100_000 == 0) {
Instant finish = Instant.now();
System.out.printf("Running for %s\n", Duration.between(start, finish).toString()
.substring(2)
.replaceAll("(\\d[HMS])(?!$)", "$1 ")
.toLowerCase());
System.out.printf("Received %s messages.\n", NumberFormat.getIntegerInstance().format(messageCount));
System.out.printf("Received %s payload bytes.\n", Utils.humanBytes(payloadCount));
System.out.printf("Received %s null messages.\n", NumberFormat.getIntegerInstance().format(nullCount));
System.out.printf("Restarted %s times.\n", NumberFormat.getIntegerInstance().format(restarts));
System.out.printf("Current memory usage is %s / %s / %s free/total/max\n",
Utils.humanBytes(Runtime.getRuntime().freeMemory()),
Utils.humanBytes(Runtime.getRuntime().totalMemory()),
Utils.humanBytes(Runtime.getRuntime().maxMemory()));
System.out.println();
}
}
} catch (Exception exp) {
System.out.println("Exception from running connection, creating a new one.");
exp.printStackTrace();
System.out.println("Reconnecting...");
System.out.println();
}

restarts++;

} catch (Exception exp) {
System.out.println("Exception connecting, exiting...");
exp.printStackTrace();
System.exit(-1);
}
}
}

static void usage() {
System.err.println(usageString);
System.exit(-1);
}
}
29 changes: 14 additions & 15 deletions src/main/java/io/nats/client/impl/MessageQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,17 +79,18 @@ void signalAll() {

void push(NatsMessage msg) {
this.queue.add(msg);
this.length.incrementAndGet();
this.sizeInBytes.getAndAdd(msg.getSizeInBytes());
this.length.incrementAndGet();
signalOne();
}

public static final int MAX_SPINS = 200;
public static final int SPIN_WAIT = 50;
public static final int MAX_SPIN_TIME = SPIN_WAIT * MAX_SPINS;

void waitForTimeout(Duration timeout) throws InterruptedException {
NatsMessage waitForTimeout(Duration timeout) throws InterruptedException {
long timeoutNanos = (timeout != null) ? timeout.toNanos() : -1;
NatsMessage retVal = null;

if (timeoutNanos >= 0) {
Thread t = Thread.currentThread();
Expand All @@ -98,15 +99,19 @@ void waitForTimeout(Duration timeout) throws InterruptedException {
// Semi-spin for at most MAX_SPIN_TIME
if (timeoutNanos > MAX_SPIN_TIME) {
int count = 0;
while (this.length.get() == 0 && this.running.get() && count < MAX_SPINS) {
while (this.running.get() && (retVal = this.queue.poll()) == null && count < MAX_SPINS) {
count++;
LockSupport.parkNanos(SPIN_WAIT);
}
}

if (retVal != null) {
return retVal;
}

long now = start;

while (this.length.get() == 0 && this.running.get()) {
while (this.running.get() && (retVal = this.queue.poll()) == null) {
if (timeoutNanos > 0) { // If it is 0, keep it as zero, otherwise reduce based on time
now = System.nanoTime();
timeoutNanos = timeoutNanos - (now - start); //include the semi-spin time
Expand All @@ -130,6 +135,8 @@ void waitForTimeout(Duration timeout) throws InterruptedException {
}
}
}

return retVal;
}

NatsMessage pop(Duration timeout) throws InterruptedException {
Expand All @@ -140,13 +147,7 @@ NatsMessage pop(Duration timeout) throws InterruptedException {
NatsMessage retVal = this.queue.poll();

if (retVal == null && timeout != null) {
waitForTimeout(timeout);

if (!this.running.get()) {
return null;
}

retVal = this.queue.poll();
retVal = waitForTimeout(timeout);
}

if(retVal != null) {
Expand Down Expand Up @@ -182,13 +183,11 @@ NatsMessage accumulate(long maxSize, long maxMessages, Duration timeout)
NatsMessage msg = this.queue.poll();

if (msg == null) {
waitForTimeout(timeout);
msg = waitForTimeout(timeout);

if (!this.running.get() || (this.queue.peek() == null)) {
if (!this.running.get() || (msg == null)) {
return null;
}

msg = this.queue.poll();
}

long size = msg.getSizeInBytes();
Expand Down

0 comments on commit b32ff85

Please sign in to comment.