Skip to content

Commit

Permalink
Update docs and comments in MessageQueue and in readme regarding forc…
Browse files Browse the repository at this point in the history
…e reconnect. (#1142)
  • Loading branch information
scottf committed May 13, 2024
1 parent 52b5eec commit 8da8e3a
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 11 deletions.
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@ is an example of one api being added to the Connection interface. It should have
Going forward, when a release contains only bug fixes, it's appropriate to simply bump the patch.
But if an api is added, even one, then the minor version will be bumped.

##### Force Reconnect

There is a new `Connection` interface api: `void forceReconnect() throws IOException, InterruptedException;`
If you call this, your connection will be immediately close and the reconnect logic will be executed.

#### Version 2.17.4 Core Improvements

This release was full of core improvements which improve use of more asynchronous behaviors including
Expand Down
39 changes: 28 additions & 11 deletions src/main/java/io/nats/client/impl/MessageQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class MessageQueue {
protected final boolean singleReaderMode;
protected final LinkedBlockingQueue<NatsMessage> queue;
protected final Lock editLock;
protected final int publishHighwaterMark;
protected final int maxMessagesInOutgoingQueue;
protected final boolean discardWhenFull;
protected final long offerLockMillis;
protected final long offerTimeoutMillis;
Expand All @@ -62,17 +62,17 @@ class MessageQueue {
* If set to a number of messages, the publish command will block, which provides
* backpressure on a publisher if the writer is slow to push things onto the network. Publishers use the value of Options.getMaxMessagesInOutgoingQueue().
* @param singleReaderMode allows the use of "accumulate"
* @param publishHighwaterMark sets a limit on the size of the underlying queue
* @param maxMessagesInOutgoingQueue sets a limit on the size of the underlying queue
* @param discardWhenFull allows to discard messages when the underlying queue is full
* @param requestCleanupInterval is used to figure the offerTimeoutMillis
*/
MessageQueue(boolean singleReaderMode, int publishHighwaterMark, boolean discardWhenFull, Duration requestCleanupInterval) {
this(singleReaderMode, publishHighwaterMark, discardWhenFull, requestCleanupInterval, null);
MessageQueue(boolean singleReaderMode, int maxMessagesInOutgoingQueue, boolean discardWhenFull, Duration requestCleanupInterval) {
this(singleReaderMode, maxMessagesInOutgoingQueue, discardWhenFull, requestCleanupInterval, null);
}

MessageQueue(boolean singleReaderMode, int publishHighwaterMark, boolean discardWhenFull, Duration requestCleanupInterval, MessageQueue source) {
this.publishHighwaterMark = publishHighwaterMark;
this.queue = publishHighwaterMark > 0 ? new LinkedBlockingQueue<>(publishHighwaterMark) : new LinkedBlockingQueue<>();
MessageQueue(boolean singleReaderMode, int maxMessagesInOutgoingQueue, boolean discardWhenFull, Duration requestCleanupInterval, MessageQueue source) {
this.maxMessagesInOutgoingQueue = maxMessagesInOutgoingQueue;
this.queue = maxMessagesInOutgoingQueue > 0 ? new LinkedBlockingQueue<>(maxMessagesInOutgoingQueue) : new LinkedBlockingQueue<>();
this.discardWhenFull = discardWhenFull;
this.running = new AtomicInteger(RUNNING);
this.sizeInBytes = new AtomicLong(0);
Expand All @@ -87,7 +87,7 @@ class MessageQueue {

this.singleReaderMode = singleReaderMode;
this.requestCleanupInterval = requestCleanupInterval;

if (source != null) {
source.drainTo(this);
}
Expand Down Expand Up @@ -141,9 +141,26 @@ boolean push(NatsMessage msg) {
boolean push(NatsMessage msg, boolean internal) {
long start = System.currentTimeMillis();
try {
// try to get the lock, but don't wait forever
// assuming that if we are waiting for the lock
// another push likely has the lock and
/*
This was essentially a Head-Of-Line blocking problem.
So the crux of the problem was that many threads were waiting to push a message to the queue.
They all waited for the lock and once they had the lock they waited 5 seconds (4750 millis actually)
only to find out the queue was full. They released the lock, so then another thread acquired the lock,
and waited 5 seconds. So instead of being parallel, all these threads had to wait in line
200 * 4750 = 15.8 minutes
So what I did was try to acquire the lock but only wait 5 seconds.
If I could not acquire the lock, then I assumed that this means that we are in this exact situation,
another thread can't add b/c the queue is full, and so there is no point in even trying, so just throw the queue full exception.
If I did acquire the lock, I deducted the time spent waiting for the lock from the time allowed to try to add.
I took the max of that or 100 millis to try to add to the queue.
This ensures that the max total time each thread can take is 5100 millis in parallel.
Notes: The 5 seconds and the 4750 seconds is derived from the Options requestCleanupInterval, which defaults to 5 seconds and can be modified.
The 4750 is 95% of that time. The 100 ms minimum is arbitrary.
*/
if (!editLock.tryLock(offerLockMillis, TimeUnit.MILLISECONDS)) {
throw new IllegalStateException(OUTPUT_QUEUE_IS_FULL + queue.size());
}
Expand Down

0 comments on commit 8da8e3a

Please sign in to comment.