INT-3846: Some fixes for the SimpleMessageStore
#1585
@celandro, feel free to comment! Thank you very much for the contribution anyway!
Looks great! I really like the per-group limit, quite clean.
@@ -233,8 +270,8 @@ public void removeMessageGroup(Object groupId) {
return;
}

groupUpperBound.release(groupIdToMessageGroup.get(groupId).size());
groupIdToMessageGroup.remove(groupId);
this.groupToUpperBound.remove(groupId).release(this.groupCapacity);
Can this NPE if groupId does not exist?
I don't think so, because the code here is robust enough against concurrent access:
lock.lockInterruptibly();
try {
if (!groupIdToMessageGroup.containsKey(groupId)) {
return;
}
this.groupToUpperBound.remove(groupId).release(this.groupCapacity);
this.groupIdToMessageGroup.remove(groupId);
}
finally {
lock.unlock();
}
You can see the lock, and the two modifying operations run only if groupId exists.
Agreed, I checked the addMessageToGroup code to make sure. I'd still be paranoid about it and add a null check on this.groupToUpperBound.remove(groupId) to future-proof the code.
+1
I've revised this, and it no longer makes sense to me to release that UpperBound.
If we remove the group, we should just clean up everything around it.
If something is waiting for the permits (addMessageToGroup), it should create a new UpperBound.
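The null check suggested above, combined with the decision not to release permits on group removal, might look like the following minimal sketch. `GroupBounds` and its methods are hypothetical names, and `java.util.concurrent.Semaphore` stands in for `UpperBound`, whose real API is not shown in this thread.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Semaphore;

// Hypothetical stand-in for the store's per-group bookkeeping; not the real SimpleMessageStore.
class GroupBounds {

    private final ConcurrentMap<Object, Semaphore> groupToUpperBound = new ConcurrentHashMap<>();

    private final int groupCapacity;

    GroupBounds(int groupCapacity) {
        this.groupCapacity = groupCapacity;
    }

    // Create the bound for a group on first use; later calls return the same instance.
    Semaphore boundFor(Object groupId) {
        return this.groupToUpperBound.computeIfAbsent(groupId, id -> new Semaphore(this.groupCapacity));
    }

    // Drop the bound without dereferencing the result of remove(),
    // so an unknown groupId cannot cause a NullPointerException.
    // Returns true if a bound was registered for the group.
    boolean removeGroup(Object groupId) {
        Semaphore removed = this.groupToUpperBound.remove(groupId);
        return removed != null;
    }

    boolean hasBound(Object groupId) {
        return this.groupToUpperBound.containsKey(groupId);
    }
}
```

Because nothing is released on removal, a caller that arrives after `removeGroup` simply gets a fresh bound with full capacity from `boundFor`.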
If you can get this build onto Maven Central, I can load test it before your release this week. It's a pain to test a foreign snapshot in our build system, or I'd test it off this branch.
@celandro, we don't publish any SNAPSHOT, or even M or RC, releases to Maven Central, but you can test it against Spring's repository:
We will let you know when it is available there. BTW, this PR is only for CC @garyrussell
Found a major bug in DelayHandler which makes this unusable:
private void doReleaseMessage(Message<?> message) {
if (removeDelayedMessageFromMessageStore(message)) {
this.messageStore.removeMessagesFromGroup(this.messageGroupId, message);
this.handleMessageInternal(message);
}
else {
if (logger.isDebugEnabled()) {
logger.debug("No message in the Message Store to release: " + message +
". Likely another instance has already released it.");
}
}
}
private boolean removeDelayedMessageFromMessageStore(Message<?> message) {
if (this.messageStore instanceof SimpleMessageStore) {
SimpleMessageGroup messageGroup =
(SimpleMessageGroup) this.messageStore.getMessageGroup(this.messageGroupId);
return messageGroup.remove(message);
}
else {
return ((MessageStore) this.messageStore).removeMessage(message.getHeaders().getId()) != null;
}
}
EDIT:
This fixes the issue but I don't like it. The UpperBound release has to be moved outside of the lock.
@Override
public void removeMessagesFromGroup(Object groupId, Collection<Message<?>> messages) {
UpperBound upperBound = this.groupToUpperBound.get(groupId);
upperBound.release(messages.size());
Lock lock = this.lockRegistry.obtain(groupId);
try {
lock.lockInterruptibly();
try {
SimpleMessageGroup group = this.groupIdToMessageGroup.get(groupId);
Assert.notNull(group, "MessageGroup for groupId '" + groupId + "' " +
"can not be located while attempting to remove Message(s) from the MessageGroup");
boolean modified = false;
for (Message<?> messageToRemove : messages) {
if (group.remove(messageToRemove)) {
modified = true;
}
}
if (modified) {
group.setLastModified(System.currentTimeMillis());
}
}
finally {
lock.unlock();
}
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new MessagingException("Interrupted while obtaining lock", e);
}
}
I'm sure there is a better way but I'm at a loss.
@@ -208,6 +239,12 @@ public MessageGroup addMessageToGroup(Object groupId, Message<?> message) {
if (group == null) {
group = new SimpleMessageGroup(groupId);
this.groupIdToMessageGroup.putIfAbsent(groupId, group);
this.groupToUpperBound.putIfAbsent(groupId, new UpperBound(this.groupCapacity));
}
if (!this.groupToUpperBound.get(groupId).tryAcquire(this.upperBoundTimeout)) {
We either need a new catch for InterruptedException here, or at least we need to fix the message in the MessagingException in the outer catch, since the interrupt might happen here rather than on the lock acquisition.
@garyrussell, unfortunately we don't have an InterruptedException here.
See the UpperBound.tryAcquire() source code:
catch (InterruptedException e) {
Thread.currentThread().interrupt();
return false;
}
Or am I missing anything here?
Ah, of course; yes, no problem; ignore my comment.
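For context, a timed acquire that swallows the interrupt, as the quoted catch clause does, could be sketched roughly as below. This is only an approximation under stated assumptions: `BoundSketch` is a hypothetical class built on `java.util.concurrent.Semaphore`, and only the catch clause itself comes from the thread.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical approximation of an UpperBound-like wrapper; only the catch clause is quoted in the thread.
class BoundSketch {

    private final Semaphore semaphore;

    BoundSketch(int capacity) {
        this.semaphore = new Semaphore(capacity);
    }

    // Wait up to timeoutMillis for a permit. An interrupt is not propagated:
    // the interrupt flag is restored and the caller simply sees 'false'.
    boolean tryAcquire(long timeoutMillis) {
        try {
            return this.semaphore.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    void release(int permits) {
        this.semaphore.release(permits);
    }
}
```

With this shape, the caller cannot tell a timeout from an interrupt by the return value alone; it has to check the thread's interrupt flag if the distinction matters.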
Hello, @celandro ! Thank you very much for pointing that out! Well, looking to the See:
Not sure yet how to fix or even overcome, but I see that your concern for the I guess we can disable |
That fixes 1 issue, but the tryAcquire inside the lockExclusively means that you can easily exceed the max acquire time. Moving the tryAcquire calls in all the addMessage methods outside the lock would fix that.
edit: and yes, it was nasty: my simple code that takes messages off a queue, puts them in a delayer and then aggregates and sends never got to the aggregate part :)
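The idea of acquiring the permit before taking the lock can be sketched as follows. Everything here is hypothetical (`AcquireBeforeLock`, a `Semaphore` in place of `UpperBound`, a single `ReentrantLock` in place of the lock registry); it only illustrates the ordering, not the actual store.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical bounded store; illustrates ordering only, not the real SimpleMessageStore.
class AcquireBeforeLock {

    private final Semaphore permits;

    private final long timeoutMillis;

    private final Lock lock = new ReentrantLock();

    private final List<String> messages = new ArrayList<>();

    AcquireBeforeLock(int capacity, long timeoutMillis) {
        this.permits = new Semaphore(capacity);
        this.timeoutMillis = timeoutMillis;
    }

    boolean add(String message) {
        try {
            // Wait for a free slot while holding no lock, so a waiting adder never blocks removers.
            if (!this.permits.tryAcquire(this.timeoutMillis, TimeUnit.MILLISECONDS)) {
                return false;
            }
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        // The critical section is short; plain lock() keeps the sketch free of a second interrupt path.
        this.lock.lock();
        try {
            this.messages.add(message);
            return true;
        }
        finally {
            this.lock.unlock();
        }
    }

    boolean remove(String message) {
        this.lock.lock();
        try {
            boolean removed = this.messages.remove(message);
            if (removed) {
                this.permits.release(); // free a slot only for a message that was really stored
            }
            return removed;
        }
        finally {
            this.lock.unlock();
        }
    }
}
```

Since the timed wait happens outside the lock, the time spent in `add` is bounded by the timeout plus a short critical section, and a remover can always get the lock to free a slot.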
That is an interesting point. But how about do not rely on the Having these all concerns, I'd say that we can't backport it to all those But I am definitely sure all these changes and fixes are very dangerous for those versions which have been out for a while already...
I will make a few changes and open a new pull request after I load test a few million messages through a DelayHandler. The code will be set up to log warnings if remove is called on messages that don't exist. Remember that the code in the release version will hang if you ever insert maxCapacity elements into the store. An imperfect fix is still much better than the release version.
Not sure what you mean. Please, find the JavaDocs for the
As I said: this "imperfect fix" produces a deadlock. Even if it is for some time ( Having that I even don't see reason to have this
I agree this version cannot go to production; I was saying I could make a version without this deadlock that will work. You are correct, it does not hang on the producer end, but it prevents any new elements from being added, ever. The effect is rejecting all new entries into the store or completely breaking a delayer. My consumers of the queue look hung in this situation, but it's actually the message store.
As for semaphores or atomic integers, I have no opinion. I was trying for the most minimally invasive changes originally, but I can certainly make this version have no race conditions, so that the store only holds more elements than allowed when remove is called on elements that do not exist (same issue as the current version).
I'm also a little concerned there may have been a performance degradation at some point that makes a delayer working off a large message store (~150K backlog) perform poorly. The remove methods being used are O(N) worst case; I will look into it. I'm only getting about 70/sec in my load test environment with this version, while my prod version based on 2.2.6 gets 1200/sec. There may be some other issue in my environment, or the locking differences may be the culprit. Will look today.
Perhaps I should extend SimpleMessageStore instead of modifying it? Or make an interface they both conform to so DelayHandler and others can check for that?
You definitely can make your own MessageGroupStore and inject it into the DelayHandler. For this one I must think more. I'll come back to it after lunch.
Pushed some fix. |
Good morning! Pushed some simple polishing, but it isn't the final solution yet. I still can't figure out the fix for the deadlock. Need to think more. Feel free to share your thoughts here on how to overcome that deadlock. Thanks in advance!
Pushed. Thanks for the feedback, contribution, testing and just for keeping the dialog going. Looking forward to more contributions!
Hi @celandro ! I know you are busy, too. We have some urgent plan to release 😄 Thank you for the contribution one more time! |
It looks good to me but I haven't had a chance to load test unfortunately. |
Unfortunately I was not able to test with this code change; however, I tested with just a modified extension of SimpleMessageStore. 2 issues
@Override
public void removeMessagesFromGroup(Object groupId, Collection<Message<?>> messages) {
Lock lock = this.lockRegistry.obtain(groupId);
try {
lock.lockInterruptibly();
try {
SimpleMessageGroup group = this.groupIdToMessageGroup.get(groupId);
Assert.notNull(group, "MessageGroup for groupId '" + groupId + "' " +
"can not be located while attempting to remove Message(s) from the MessageGroup");
UpperBound upperBound = this.groupToUpperBound.get(groupId);
Assert.state(upperBound != null, "'upperBound' must not be null.");
boolean modified = false;
upperBound.release(messages.size());
for (Message<?> messageToRemove : messages) {
if (group.remove(messageToRemove)) {
modified = true;
} else {
log.warn("Could not remove message, capacity may be exceeded");
}
}
if (modified) {
group.setLastModified(System.currentTimeMillis());
}
}
finally {
lock.unlock();
}
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new MessagingException("Interrupted while obtaining lock", e);
}
} |
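An alternative to releasing `messages.size()` permits up front and logging a warning is to release only as many permits as messages were really removed, which is what the commit notes for INT-3846 describe. A minimal sketch under stated assumptions: `CountedRelease` is hypothetical, a `Semaphore` stands in for `UpperBound`, and `synchronized` replaces the lock registry.

```java
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.concurrent.Semaphore;

// Hypothetical single-group store; a Semaphore stands in for UpperBound.
class CountedRelease {

    private final Set<String> group = new LinkedHashSet<>();

    private final Semaphore upperBound;

    CountedRelease(int capacity) {
        this.upperBound = new Semaphore(capacity);
    }

    synchronized boolean add(String message) {
        if (!this.upperBound.tryAcquire()) {
            return false; // store is full
        }
        if (!this.group.add(message)) {
            this.upperBound.release(); // duplicate: nothing stored, give the permit back
            return false;
        }
        return true;
    }

    // Release one permit per message that was actually present,
    // so a remove() of unknown messages cannot inflate the capacity.
    synchronized int removeAll(Collection<String> messages) {
        int removed = 0;
        for (String message : messages) {
            if (this.group.remove(message)) {
                removed++;
            }
        }
        if (removed > 0) {
            this.upperBound.release(removed);
        }
        return removed;
    }

    int availablePermits() {
        return this.upperBound.availablePermits();
    }
}
```

Returning the count also gives the caller a way to learn how many messages were removed, which the thread notes the existing contract does not offer.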
Hello, @celandro! Thank you again for the feedback! So, let me try to answer your concerns:
Not the case for now. This my
private boolean removeDelayedMessageFromMessageStore(Message<?> message) {
if (this.messageStore instanceof SimpleMessageStore) {
synchronized (this.messageGroupId) {
Collection<Message<?>> messages = this.messageStore.getMessageGroup(this.messageGroupId).getMessages();
if (messages.contains(message)) {
this.messageStore.removeMessageFromGroup(this.messageGroupId, message);
So, we call here
Another change looks like:
if (!(this.messageStore instanceof SimpleMessageStore)) {
this.messageStore.removeMessagesFromGroup(this.messageGroupId, message);
}
So, we don't perform
Therefore I don't see your issue right now.
It isn't clear to me what is
messageGroup.remove(message);
I agree and from the
Agree, remove is not an issue; however, .contains might be. I would do the following:
private void doReleaseMessage(Message<?> message) {
if (removeDelayedMessageFromMessageStore(message)) {
this.messageStore.removeMessagesFromGroup(this.messageGroupId, message);
this.handleMessageInternal(message);
}
else {
if (logger.isDebugEnabled()) {
logger.debug("No message in the Message Store to release: " + message +
". Likely another instance has already released it.");
}
}
}
private boolean removeDelayedMessageFromMessageStore(Message<?> message) {
if (this.messageStore instanceof SimpleMessageStore) {
// not handled in here. Due to potential locking on .contains, just return true
return true;
}
else {
return ((MessageStore) this.messageStore).removeMessage(message.getHeaders().getId()) != null;
}
}
Unfortunately the contract of SimpleMessageStore is not a great fit for use in a delay store, as there is no way to get the number of messages removed and it does not match the implementation of any other setup. Passing around the message itself instead of just the id and group seems strange to me as well. I am also concerned with the contract of MessageStore in general, as calling removeMessage followed by removeMessageFromGroup is unintuitive and implies that the caller needs to clean up what amounts to an index.
One more major issue: when configurable delays are enabled, SimpleMessageGroup reverts to O(n) performance, since there is no guaranteed ordering of the .remove calls. For example, in my situation, certain events have no delay and some events have a 30s delay. This causes the no-delay events to scan the entire queue before removal. These events then have the potential to block up the entire processing pipeline. I think a version of SimpleMessageGroup that works with only ConcurrentHashMaps would perform better. I will work on it.
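To illustrate the performance point, a group keyed by message id gives constant-time removal on average regardless of release order. This is a hedged sketch only: `RandomAccessGroup` is a hypothetical class, and a `String` payload stands in for `Message<?>`.

```java
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical id-keyed group; a String payload stands in for Message<?>.
class RandomAccessGroup {

    private final ConcurrentMap<UUID, String> messagesById = new ConcurrentHashMap<>();

    void add(UUID id, String payload) {
        this.messagesById.put(id, payload);
    }

    // Hash lookup instead of a queue scan; returns true only for the caller that removed the entry,
    // so two concurrent releases of the same message cannot both succeed.
    boolean remove(UUID id) {
        return this.messagesById.remove(id) != null;
    }

    int size() {
        return this.messagesById.size();
    }
}
```

The trade-off is that a plain hash map keeps no insertion order, so this shape would not suit a caller that uses the group as an actual queue.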
No, it's bad again. You don't check whether you really removed the message from the store. And therefore will end up with the issue for which that change in the I don't think that my Regarding all your other concerns... From one side I agree that For your idea around Your last issue isn't clear to me at all...
if (delay > 0) {
this.releaseMessageAfterDelay(requestMessage, delay);
return null;
}
Please be more specific, but in a new JIRA and with some test case to reproduce or, at least, to play with. Let's finish with this PR and summarize whether it is sufficient to merge and backport! Actually @garyrussell and I are just waiting here for your acceptance 😄
I understand your desire not to touch the contract. Looked at the version using ConcurrentHashMap and I think it looks good. I implemented something very similar as a different class since there might be some use case where they want to use it as an actual queue. Maybe name the class RandomAccessMessageStore. Regardless, I will test your version and let you know how it performs. Sorry we had just increased the default delay to 100ms with the others waiting 30000ms, my mistake. The random access should resolve the issue anyhow. |
Great! Thank you! |
I think it is good to release as well and resolves the issue with this bug. I see no reason to hold up the release. I will let you know about the other class you made. The only issue I see so far is the wiring is a bit awkward as you have to set the groupId to something like "delayerId.messageGroupId" and I'm about to run the load test... |
JIRAs:
https://jira.spring.io/browse/INT-3830
https://jira.spring.io/browse/INT-3523
https://jira.spring.io/browse/INT-3846
* Fix the OOM condition, when we `release()` `UpperBound` independently of the previous `remove` result (https://jira.spring.io/browse/INT-3846)
* Fix "confuse" around `groupCapacity`, when we really didn't care about individual groups (https://jira.spring.io/browse/INT-3523)
* Add `upperBoundTimeout` to have a hook to wait some time for the empty slot in the store (https://jira.spring.io/browse/INT-3830)
* Fix some JavaDocs warnings
* Fix some typos
…oup` when `MS` is `SimpleMessageStore`
* Remove `UpperBound.release()` operation from `SimpleMessageStore.removeGroup()`. The waiting process should worry about the new `UpperBound` instance.
But do that only for groups which already exist. For the new groups we have a fresh `UpperBound`, so no need to worry about dead lock and we can obtain a permit immediately.
Make some synchronization fixes according to the migration to the `LinkedHashSet`
… of `containsAll()`
Fix some typos
Pushed after rebase and Copyright fix, and |
So I should merge and that will be a new PR? |
Doh... Yes, of course we need doc. They will be here. |
Pushed Docs. |
Pushed fix for JavaDocs. |
@garyrussell, would you mind reviewing the last couple of commits so we can close this lo-o-ong track and 3 JIRAs and have a green graph in the JIRA project again 😄
Rebased, squashed, merged as 201f0fc Congrats @artembilan !! |
And special thanks to @celandro, who initiated the work and forced us to reconsider things. Looking forward to more similarly valuable contributions! 😄