Skip to content

Commit

Permalink
Merge 938c838 into 1907a43
Browse files Browse the repository at this point in the history
  • Loading branch information
scottf committed May 2, 2024
2 parents 1907a43 + 938c838 commit 82bc16e
Show file tree
Hide file tree
Showing 14 changed files with 233 additions and 93 deletions.
4 changes: 3 additions & 1 deletion src/examples/java/io/nats/examples/benchmark/NatsBench2.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

import static io.nats.client.support.NatsConstants.OUTPUT_QUEUE_IS_FULL;

/**
* A utility class for measuring NATS performance, similar to the version in go
* and node. The various tradeoffs to make this code act/work like the other
Expand Down Expand Up @@ -279,7 +281,7 @@ public void run() {
nc.publish(subject, payload);
success = true;
} catch (IllegalStateException ex) {
if (ex.getMessage().contains("Output queue is full")) {
if (ex.getMessage().contains(OUTPUT_QUEUE_IS_FULL)) {
success = false;
Thread.sleep(1000);
} else {
Expand Down
5 changes: 2 additions & 3 deletions src/main/java/io/nats/client/Connection.java
Original file line number Diff line number Diff line change
Expand Up @@ -541,15 +541,14 @@ enum Status {
/**
* Immediately flushes the underlying connection buffer if the connection is valid.
* @throws IOException the connection flush fails
* @throws IllegalStateException the connection is not connected
*/
void flushBuffer() throws IOException;

/**
* Forces reconnect behavior. Stops the current connection including the reading and writing,
* copies already queued outgoing messages, and then begins the reconnect logic.
* @throws IOException
* @throws InterruptedException
* @throws IOException the forceReconnect fails
* @throws InterruptedException the connection is not connected
*/
void forceReconnect() throws IOException, InterruptedException;

Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/nats/client/ErrorListener.java
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ default void socketWriteTimeout(Connection conn) {}
* @param sub the JetStreamSubscription that this occurred on, if applicable
* @param pairs custom string pairs. I.E. "foo: ", fooObject, "bar-", barObject will be appended
* to the message like ", foo: <fooValue>, bar-<barValue>".
* @return
* @return the message
*/
default String supplyMessage(String label, Connection conn, Consumer consumer, Subscription sub, Object... pairs) {
StringBuilder sb = new StringBuilder(label == null ? "" : label);
Expand Down
33 changes: 29 additions & 4 deletions src/main/java/io/nats/client/Options.java
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,11 @@ public class Options {
*/
public static final Duration DEFAULT_SOCKET_WRITE_TIMEOUT = Duration.ofMinutes(1);

/**
* Constant used for calculating if a socket write timeout is large enough.
*/
public static final long MINIMUM_SOCKET_WRITE_TIMEOUT_GT_CONNECTION_TIMEOUT = 1000;

/**
* Default server ping interval. The client will send a ping to the server on this interval to insure liveness.
* The server may send pings to the client as well, these are handled automatically by the library,
Expand Down Expand Up @@ -1239,11 +1244,23 @@ public Builder maxControlLine(int bytes) {
* Set the timeout for connection attempts. Each server in the options is allowed this timeout
* so if 3 servers are tried with a timeout of 5s the total time could be 15s.
*
* @param time the time to wait
* @param connectionTimeout the time to wait
* @return the Builder for chaining
*/
public Builder connectionTimeout(Duration connectionTimeout) {
this.connectionTimeout = connectionTimeout;
return this;
}

/**
* Set the timeout for connection attempts. Each server in the options is allowed this timeout
* so if 3 servers are tried with a timeout of 5s the total time could be 15s.
*
* @param connectionTimeoutMillis the time to wait in milliseconds
* @return the Builder for chaining
*/
public Builder connectionTimeout(Duration time) {
this.connectionTimeout = time;
public Builder connectionTimeout(long connectionTimeoutMillis) {
this.connectionTimeout = Duration.ofMillis(connectionTimeoutMillis);
return this;
}

Expand Down Expand Up @@ -1727,9 +1744,17 @@ else if (useDefaultTls) {
new DefaultThreadFactory(threadPrefix));
}

if (socketWriteTimeout != null && socketWriteTimeout.toMillis() < 1) {
if (socketWriteTimeout == null || socketWriteTimeout.toMillis() < 1) {
socketWriteTimeout = null;
}
else {
long swtMin = connectionTimeout.toMillis() + MINIMUM_SOCKET_WRITE_TIMEOUT_GT_CONNECTION_TIMEOUT;
if (socketWriteTimeout.toMillis() < swtMin) {
throw new IllegalStateException("Socket Write Timeout must be at least "
+ MINIMUM_SOCKET_WRITE_TIMEOUT_GT_CONNECTION_TIMEOUT
+ " milliseconds greater than the Connection Timeout");
}
}

if (errorListener == null) {
errorListener = new ErrorListenerLoggerImpl();
Expand Down
27 changes: 18 additions & 9 deletions src/main/java/io/nats/client/impl/MessageQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.function.Predicate;

import static io.nats.client.support.NatsConstants.EMPTY_BODY;
import static io.nats.client.support.NatsConstants.OUTPUT_QUEUE_IS_FULL;

class MessageQueue {
protected static final int STOPPED = 0;
Expand All @@ -35,7 +36,7 @@ class MessageQueue {
protected final AtomicInteger running;
protected final boolean singleReaderMode;
protected final LinkedBlockingQueue<NatsMessage> queue;
protected final Lock filterLock;
protected final Lock editLock;
protected final int publishHighwaterMark;
protected final boolean discardWhenFull;
protected final long offerTimeoutMillis;
Expand Down Expand Up @@ -72,16 +73,24 @@ class MessageQueue {
// The poisonPill is used to stop poll and accumulate when the queue is stopped
this.poisonPill = new NatsMessage("_poison", null, EMPTY_BODY);

this.filterLock = new ReentrantLock();
editLock = new ReentrantLock();

this.singleReaderMode = singleReaderMode;
this.requestCleanupInterval = requestCleanupInterval;
}

MessageQueue(MessageQueue source) {
this(source.singleReaderMode, source.publishHighwaterMark, source.discardWhenFull, source.requestCleanupInterval);
source.queue.drainTo(queue);
length.set(queue.size());
}

void loadFromSourceQueue(MessageQueue source) {
editLock.lock();
try {
source.queue.drainTo(queue);
length.set(queue.size());
} finally {
editLock.unlock();
}
}

private static long calculateOfferTimeoutMillis(Duration requestCleanupInterval) {
Expand Down Expand Up @@ -124,21 +133,21 @@ boolean push(NatsMessage msg) {
}

boolean push(NatsMessage msg, boolean internal) {
this.filterLock.lock();
editLock.lock();
try {
// If we aren't running, then we need to obey the filter lock
// to avoid ordering problems
if (!internal && this.discardWhenFull) {
return this.queue.offer(msg);
}
if (!this.offer(msg)) {
throw new IllegalStateException("Output queue is full " + queue.size());
throw new IllegalStateException(OUTPUT_QUEUE_IS_FULL + queue.size());
}
this.sizeInBytes.getAndAdd(msg.getSizeInBytes());
this.length.incrementAndGet();
return true;
} finally {
this.filterLock.unlock();
editLock.unlock();
}
}

Expand Down Expand Up @@ -289,7 +298,7 @@ long sizeInBytes() {
}

void filter(Predicate<NatsMessage> p) {
this.filterLock.lock();
editLock.lock();
try {
if (this.isRunning()) {
throw new IllegalStateException("Filter is only supported when the queue is paused");
Expand All @@ -307,7 +316,7 @@ void filter(Predicate<NatsMessage> p) {
}
this.queue.addAll(newQueue);
} finally {
this.filterLock.unlock();
editLock.unlock();
}
}
}

0 comments on commit 82bc16e

Please sign in to comment.