-
Notifications
You must be signed in to change notification settings - Fork 151
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Offer timeout millis no longer hardcoded #1088
Conversation
@@ -123,7 +122,6 @@ protected void updateLastMessageReceived() { | |||
} | |||
|
|||
class MmTimerTask extends TimerTask { | |||
public String id = new NUID().nextSequence(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was for debugging that should have been removed
protected final static int DRAINING = 2; | ||
protected static final int STOPPED = 0; | ||
protected static final int RUNNING = 1; | ||
protected static final int DRAINING = 2; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
While I'm here follow standard coding
@@ -37,12 +37,17 @@ class MessageQueue { | |||
protected final LinkedBlockingQueue<NatsMessage> queue; | |||
protected final Lock filterLock; | |||
protected final boolean discardWhenFull; | |||
protected final long offerTimeoutMillis; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
new variable to hold the timeout
MessageQueue(boolean singleReaderMode, Duration requestCleanupInterval) { | ||
this(singleReaderMode, -1, false, requestCleanupInterval); | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I combined constructors which required cleanup up the places that the object is constructed. This is better because the calls are now more explicit as well as taking the new required input.
MessageQueue(boolean singleReaderMode, int publishHighwaterMark, boolean discardWhenFull) { | ||
this.queue = publishHighwaterMark > 0 ? new LinkedBlockingQueue<NatsMessage>(publishHighwaterMark) : new LinkedBlockingQueue<NatsMessage>(); | ||
MessageQueue(boolean singleReaderMode, int publishHighwaterMark, boolean discardWhenFull, Duration requestCleanupInterval) { | ||
this.queue = publishHighwaterMark > 0 ? new LinkedBlockingQueue<>(publishHighwaterMark) : new LinkedBlockingQueue<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
line 62, while I'm here, proper generic syntax.
MessageQueue(boolean singleReaderMode, int publishHighwaterMark) { | ||
this(singleReaderMode, publishHighwaterMark, false); | ||
private static long calculateOfferTimeoutMillis(Duration requestCleanupInterval) { | ||
return Math.max(1, requestCleanupInterval.toMillis() * 95 / 100); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Calculates the 95% and makes sure its > 0
@@ -143,7 +146,7 @@ void poisonTheQueue() { | |||
|
|||
boolean offer(NatsMessage msg) { | |||
try { | |||
return this.queue.offer(msg, 5, TimeUnit.SECONDS); | |||
return this.queue.offer(msg, offerTimeoutMillis, TimeUnit.MILLISECONDS); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use the new variable instead of hard coding.
|
||
// The "reconnect" buffer contains internal messages, and we will keep it unlimited in size | ||
reconnectOutgoing = new MessageQueue(true, 0); | ||
reconnectOutgoing = new MessageQueue(true, options.getRequestCleanupInterval()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
MessageQueue constructor change.
@@ -53,7 +53,7 @@ class NatsDispatcher extends NatsConsumer implements Dispatcher, Runnable { | |||
NatsDispatcher(NatsConnection conn, MessageHandler handler) { | |||
super(conn); | |||
this.defaultHandler = handler; | |||
this.incoming = new MessageQueue(true); | |||
this.incoming = new MessageQueue(true, conn.getOptions().getRequestCleanupInterval()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
MessageQueue constructor change.
@@ -44,7 +44,7 @@ class NatsSubscription extends NatsConsumer implements Subscription { | |||
this.unSubMessageLimit = new AtomicLong(-1); | |||
|
|||
if (this.dispatcher == null) { | |||
this.incoming = new MessageQueue(false); | |||
this.incoming = new MessageQueue(false, connection.getOptions().getRequestCleanupInterval()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
MessageQueue constructor change.
NatsConnection nc = new NatsConnection(options); | ||
njsm.subscription = new NatsSubscription("sid", "sub", "q", nc, null); | ||
// remove the connection so we can test the coverage | ||
njsm.subscription.connection = null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Needed b/c of the MessageQueue constructor change, because code now requires the options. This was a coverage test anyway, this should NEVER happen in real execution.
TestMessageManager tmm = new TestMessageManager(); | ||
NatsJetStreamSubscription sub = | ||
new NatsJetStreamSubscription(mockSid(), "sub", null, null, null, null, "stream", "con", tmm); | ||
new NatsJetStreamSubscription(mockSid(), "sub", null, nc, null, null, "stream", "con", tmm); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Needed b/c of the MessageQueue constructor change...
@@ -18,6 +18,8 @@ | |||
import java.util.concurrent.CompletableFuture; | |||
|
|||
public class MessageQueueBenchmark { | |||
static final Duration REQUEST_CLEANUP_INTERVAL = Duration.ofSeconds(5); | |||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
All changes in this test file are b/c of the MessageQueue constructor change
static final byte[] PING = "PING".getBytes(); | ||
static final byte[] ONE = "one".getBytes(); | ||
static final byte[] TWO = "two".getBytes(); | ||
static final byte[] THREE = "three".getBytes(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- Just cleanup.
- All other changes in this test file are b/c of the MessageQueue constructor change
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
The offer timeout was hard-coded to 5 seconds. This change takes the request cleanup intervalfrom Options and takes 95% of that time.