New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[REM3-388][REM3-387] At OutboundMessage.bufferWriter.accept(), move the message… #259
Conversation
@dmlloyd can you please have a look and let me know what you think of it? Thanks! |
long ackTimeout = OutboundMessage.this.ackTimeout; | ||
long initialTime = System.currentTimeMillis(); | ||
do { | ||
try { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
initialTime = System.currentTimeMillis(); // initialization should be here inside do-while block, otherwise do-while() assignment of (ackTimeout -= (System.currentTimeMillis() - initialTime)) is invalid (may grow too fast incidentally)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, you should always use nanoTime
for timeouts, not wall clock time.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@ropalka thanks for reviewing! The intention is that exactly. This block is there just to protect against spurious wakes. I.e., if I want to wait for a specific time and I'm awaken before that timeout expires without a notify. So, by comparing the elapsed time since initialTime everytime I'm awaken and the condition is not met, I can certify that the timeout has really expired.
@dmlloyd I was wondering that, I saw a PR to XNIO that used nanotime and converted that to ms, I'm going to do the same here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@ropalka see my response to David below. Yes, you are correct, the initialization of intialTime is inconsistent with updating the field in the while loop.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The problem is still there @fl4via , nanoTime() wasn't moved to do-while block.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These timeout loops are a challenge every time. Part of the reason is that every blocking and timeout API works a little bit differently, so there's no single template that always works for all cases.
Because of this, my usual approach for these is to write the loop as being completely unwound, until I have enough repeated code that I can clearly see where the loop is and how it enters and exits, and then once I'm sure it's correct I can carefully refactor it into a cleaner loop "shape". This is something I learned after many, many mistakes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@ropalka the initialization of initialTime outside the block was a deliberate choice. I just forgot to add a final to that field, btw.
So, I'm initializing initialTime outside of the block. This marks the beginning of the time I'll wait. My expectations is that the normal path is without spurious wakes, so, I'll probably reach the end of the while block and leave the do/while after a single iteration. I.e., I'll either have been able to acquireWindow, or I'll have verified that the timeout is expired. In the event there was a spurious wake and the window was not acquired, I reenter the loop, after updating ackTimeout based on current time minus initialTime, a value that is decremented from the total ack timeout. So, ackTimeout field is updated and then I wait for the remaining amount of time I have available. Because I'm always recalculating ackTimeout based on the original timeout (i.e., OutboundMessage.this.ackTimeout), initialTime does not have to be updated at every iteration of the loop. Unless I'm missing something, in that case please point me where is the mistake and I'll fix it right away :-)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for explanation @fl4via . Now I see what you had in mind.
if (cancelSent) { | ||
throw new MessageCancelledException(this + ": message was cancelled"); | ||
} | ||
} while (!decrementWindow(msgSize) && (ackTimeout -= (System.currentTimeMillis() - initialTime)) > 0); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This condition evaluation should be reordered to
(ackTimeout -= (System.currentTimeMillis() - initialTime)) > 0 && !decrementWindow(msgSize)
otherwise an attempt to decrement the window may occur after timeout.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@ropalka that's exactly the goal. To decrement the window if the condition is met. If that's the case, it means I was awaken for the right reason, and timeout has not been reached. This is the condition that is expected to be met unless there is something off with the connection. If the condition is not met, I'm going to check for timeout just to protect myself against spurious wakes.
long ackTimeout = OutboundMessage.this.ackTimeout; | ||
long initialTime = System.currentTimeMillis(); | ||
do { | ||
try { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, you should always use nanoTime
for timeouts, not wall clock time.
if (cancelSent) { | ||
throw new MessageCancelledException(this + ": message was cancelled"); | ||
} | ||
} while (!decrementWindow(msgSize) && (ackTimeout -= (System.currentTimeMillis() - initialTime)) > 0); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it intentional that ackTimeout
is always reduced by the total time elapsed, rather than the time since the last loop?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Aha, now I see why @ropalka found it strange that I was initiating the initialTime before the loop. Thank you both for catching this. This is not correct, I'll fix it.
@@ -50,7 +50,7 @@ | |||
*/ | |||
final class RemoteReadListener implements ChannelListener<ConduitStreamSourceChannel> { | |||
// FIXME temporarily using a system property as a solution to ACK TIMEOUT issue until an RFE is properly submitted | |||
private static final int MESSAGE_ACK_TIMEOUT = Integer.parseInt(WildFlySecurityManager.getPropertyPrivileged("org.jboss.remoting3.remote.message.ack.timeout", String.valueOf(RemotingOptions.DEFAULT_MESSAGE_ACK_TIMEOUT))); | |||
private static final long MESSAGE_ACK_TIMEOUT = Integer.parseInt(WildFlySecurityManager.getPropertyPrivileged("org.jboss.remoting3.remote.message.ack.timeout", String.valueOf(RemotingOptions.DEFAULT_MESSAGE_ACK_TIMEOUT))) * 1_000_000; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is subject to a wrapping error if the timeout is more than around 2000 milliseconds; maybe do it like this instead:
private static final long MESSAGE_ACK_TIMEOUT = Integer.parseInt(WildFlySecurityManager.getPropertyPrivileged("org.jboss.remoting3.remote.message.ack.timeout", String.valueOf(RemotingOptions.DEFAULT_MESSAGE_ACK_TIMEOUT))) * 1_000_000; | |
private static final long MESSAGE_ACK_TIMEOUT = Integer.parseInt(WildFlySecurityManager.getPropertyPrivileged("org.jboss.remoting3.remote.message.ack.timeout", String.valueOf(RemotingOptions.DEFAULT_MESSAGE_ACK_TIMEOUT))) * 1_000_000L; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I only have one comment; otherwise I think it looks good!
|
@ropalka as a second thought, I'm going to change this PR a bit, I'll let you know when it is ready |
… ack timeout wait to a loop to protect against spurious wakes. Plus, check for other causes for being awakened before assuming the message ack timeout has expired. The other causes for being awaked are all points where notifyAll was invoked. So, we need to check for closeReceived, closeCalled & !eof, and cancelSent before checking if the timeout expired withou a message ack
@ropalka ready, please, review and, if you think it is okay, merge it |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
long ackTimeout = OutboundMessage.this.ackTimeout; | ||
long initialTime = System.currentTimeMillis(); | ||
do { | ||
try { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for explanation @fl4via . Now I see what you had in mind.
… ack timeout wait to a loop to protect against spurious wakes. Plus, check for other causes for being awakened before assuming the message ack timeout has expired.
The other causes for being awaked are all points where notifyAll was invoked. So, we need to check for closeReceived, closeCalled & !eof, and cancelSent before checking if the timeout expired withou a message ack
Jiras: https://issues.redhat.com/browse/REM3-387
https://issues.redhat.com/browse/REM3-388