-
Notifications
You must be signed in to change notification settings - Fork 63
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Mailbox 285 Solve issues with flags update #556
Conversation
chibenwa
commented
Jan 11, 2017
- Flags Updates on something else than the last message
- Flags upates on duplicated mailboxIds
👍 |
(tested on my laptop with 10 threads that does 100 updates each) |
Concurrent test for MessageMapper and possible corrections needs to be done as well.... |
395436e
to
06cf4e5
Compare
|
||
package org.apache.james.mailbox.cassandra.mail; | ||
|
||
public class MailboxDeleteDuringUpdate extends RuntimeException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
*MailboxDeleteDuringUpdateException
} | ||
|
||
@ContractTest | ||
public void setFlagsShouldWorkWithConcurency() throws Exception { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
*Concurrency
List<MailboxMessage> messages = sut.find(ImmutableList.of(message1.getMessageId()), MessageMapper.FetchType.Body); | ||
assertThat(messages).hasSize(1); | ||
assertThat(messages.get(0).createFlags().getUserFlags()).hasSize(threadCount * updateCount); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you think you can add the same tests with some delete operations instead of add?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Won't it become highly correletated with execution order then ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nope: first create the n flags, then delete them in whatever order and just ensure all are removed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
my idea was simpler: first you create the n flags at once, without threads. And then you delete all of them with threads.
messageMapper.updateFlags(mailbox, | ||
new FlagsUpdateCalculator(new Flags("custom-" + threadNumber + "-" + i), | ||
FlagsUpdateMode.ADD), | ||
MessageRange.one(uid)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe you can extract all but this call in a lamba which could be a parameter to ConcurrentSetFlagTestRunnable and then make this one shared between the two test classes.
|
test this please |
CI CI CI !!!!
|
test this please |
Hmmm... I'm very sorry to say so but I have stricktly no idea on haw to write such tests without polluting implementation with something that would allow me to do barrier injection (what I don't want!) |
Rebased on master The only addition is the last commit. It introduced a mix of ADD and REMOVE. (Note : to avoid REMOVE of a flag being sceduled before it is added, I need to ensure it is done by the same thread) |
return mailboxIds.stream() | ||
.map(mailboxId -> (CassandraId) mailboxId) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I really prefer having the cast first
.map(composedMessageId -> flagsUpdateWithRetry(newState, updateMode, composedMessageId)) | ||
.distinct() | ||
.filter(mailboxId -> imapUidDAO.retrieve((CassandraMessageId) messageId, Optional.of((CassandraId) mailboxId)) | ||
.join() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not related to this review but the join here break the "no early join" rule
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well if we want to get ride of it, we need our retrier to handle futures
.findAny() | ||
.isPresent()) | ||
.map(mailboxId -> flagsUpdateWithRetry(newState, updateMode, mailboxId, messageId)) | ||
.filter(Optional::isPresent) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
flagsUpdateWithRetry being a private method, it could return a Stream on zero or one element and flatmap here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also mdified in CassandraMessageMapper
newFlagsWithModSeq.getLeft())); | ||
Pair<Flags, ComposedMessageIdWithMetaData> pair = new FunctionRunnerWithRetry(MAX_RETRY) | ||
.executeAndRetrieveObject(() -> tryFlagsUpdate(newState, updateMode, mailboxId, messageId)); | ||
return Optional.of(Pair.of(pair.getRight().getComposedMessageId().getMailboxId(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
pair.getRight is called 4 times, I think it deserves a temp (named) variable
CompletableFuture<Boolean> imapUidFuture = imapUidDAO.updateMetadata(composedMessageIdWithMetaData, oldModSeq); | ||
CompletableFuture<Boolean> messageIdFuture = messageIdDAO.updateMetadata(composedMessageIdWithMetaData, oldModSeq); | ||
return imapUidFuture.join() && messageIdFuture.join(); | ||
private Optional<Pair<Flags, ComposedMessageIdWithMetaData>> updateFlags(MailboxId mailboxId, MessageId messageId, Flags newState, MessageManager.FlagsUpdateMode updateMode) throws MailboxException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this method is too big
executorService.submit(new ConcurrentRunnableTask(i)); | ||
} | ||
executorService.shutdown(); | ||
assertThat(executorService.awaitTermination(1, TimeUnit.MINUTES)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you should not use an assert here, you can throw instead
.isTrue(); | ||
} | ||
|
||
protected abstract void performOperation(int threadNumber, int step) throws Exception; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
composition ! please requires a BiConsumer in your runner, it makes things way simpler to use
executorService.submit(new ConcurrentRunnableTask(i)); | ||
} | ||
executorService.shutdown(); | ||
assertThat(executorService.awaitTermination(1, TimeUnit.MINUTES)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would add that you should allow the caller to do that await, for example by returning a CompletableFuture.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No method to do that by completableFuture, but behaviour added
|
||
public void run() throws Exception { | ||
ExecutorService executorService = Executors.newFixedThreadPool(threadCount); | ||
for (int i = 0; i < threadCount; i++) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what about a java8 stream with range ? (:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's not in a java8 project
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
public abstract class ConcurrentTestRunner { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what about some tests ?
Modseq used for condition was not well retrieved
test this please |
test this please |
private Optional<Pair<Flags, ComposedMessageIdWithMetaData>> updateFlags(ComposedMessageIdWithMetaData oldComposedId, ComposedMessageIdWithMetaData newComposedId) { | ||
return imapUidDAO.updateMetadata(newComposedId, oldComposedId.getModSeq()) | ||
.thenCompose(updateSuccess -> Optional.of(updateSuccess) | ||
.filter(b -> b) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what are these filters?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To repkace if as asked by Matthieu
List<MailboxMessage> messages = sut.find(ImmutableList.of(message1.getMessageId()), MessageMapper.FetchType.Body); | ||
assertThat(messages).hasSize(1); | ||
assertThat(messages.get(0).createFlags().getUserFlags()).hasSize(threadCount * updateCount); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
my idea was simpler: first you create the n flags at once, without threads. And then you delete all of them with threads.
Here at least it even mixes add and deletes |
test this please |