Skip to content

Commit

Permalink
Merge pull request #29 from manusa/develop
Browse files Browse the repository at this point in the history
Develop
  • Loading branch information
manusa committed Sep 3, 2019
2 parents 0e92792 + bf0bceb commit d3c80a6
Show file tree
Hide file tree
Showing 10 changed files with 60 additions and 60 deletions.
3 changes: 2 additions & 1 deletion src/main/java/com/marcnuri/mnimapsync/MNIMAPSync.java
Expand Up @@ -38,6 +38,7 @@
*
* @author Marc Nuri <marc@marcnuri.com>
*/
@SuppressWarnings("WeakerAccess")
public class MNIMAPSync {

static final int THREADS = 5;
Expand Down Expand Up @@ -120,7 +121,7 @@ public void sync() {
indexTargetStore();
copySourceToTarget();
//Delete only if source store was completely indexed (this happens if no exceptions where raised)
if (syncOptions.getDelete() && sourceIndex != null && !sourceCopier.hasCopyException()) {
if (syncOptions.getDelete() && !sourceCopier.hasCopyException()) {
deleteFromTarget();
}
} catch (MessagingException | GeneralSecurityException ex) {
Expand Down
Expand Up @@ -72,7 +72,7 @@ public void run() {
}
folder.close(false);
} catch (MessagingException messagingException) {
index.getCrawlExceptions().add(messagingException);
index.addCrawlException(messagingException);
}
index.updatedIndexedMessageCount(indexedMessages);
index.updatedSkippedMessageCount(skippedMessages);
Expand Down
35 changes: 14 additions & 21 deletions src/main/java/com/marcnuri/mnimapsync/index/Index.java
Expand Up @@ -18,11 +18,7 @@

import static com.marcnuri.mnimapsync.imap.IMAPUtils.INBOX_MAILBOX;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -36,29 +32,27 @@
*/
public class Index {

private AtomicReference<String> folderSeparator;
private AtomicReference<String> inbox;
private final AtomicReference<String> folderSeparator;
private final AtomicReference<String> inbox;
private final Set<String> folders;
private final Map<String, Set<MessageId>> folderMessages;
private final AtomicLong indexedMessageCount;
private final AtomicLong skippedMessageCount;
//If no empty, the other processes shouldn't continue
private final List<MessagingException> crawlExceptions;
private final Set<MessagingException> crawlExceptions;

public Index() {
this.folderSeparator = new AtomicReference<>();
this.inbox = new AtomicReference<>();
this.folders = ConcurrentHashMap.newKeySet();
this.folderMessages = Collections.synchronizedMap(new HashMap<>());
this.folderMessages = new ConcurrentHashMap<>();
this.indexedMessageCount = new AtomicLong();
this.skippedMessageCount = new AtomicLong();
this.crawlExceptions = Collections.synchronizedList(new ArrayList<>());
this.crawlExceptions = ConcurrentHashMap.newKeySet();
}

public final boolean hasCrawlException() {
synchronized (crawlExceptions) {
return !crawlExceptions.isEmpty();
}
return !crawlExceptions.isEmpty();
}

public final void updatedIndexedMessageCount(long delta) {
Expand Down Expand Up @@ -104,17 +98,16 @@ public final long getSkippedMessageCount() {
return skippedMessageCount.longValue();
}

public synchronized Set<MessageId> getFolderMessages(String folder) {
synchronized (folderMessages) {
if (!folderMessages.containsKey(folder)) {
folderMessages.put(folder, Collections.synchronizedSet(new HashSet<>()));
}
return folderMessages.get(folder);
}
public Set<MessageId> getFolderMessages(String folder) {
return folderMessages.computeIfAbsent(folder, k -> ConcurrentHashMap.newKeySet());
}

final void addCrawlException(MessagingException exception) {
crawlExceptions.add(exception);
}

public final synchronized List<MessagingException> getCrawlExceptions() {
return crawlExceptions;
public final Set<MessagingException> getCrawlExceptions() {
return Collections.unmodifiableSet(crawlExceptions);
}


Expand Down
11 changes: 1 addition & 10 deletions src/main/java/com/marcnuri/mnimapsync/index/MessageId.java
Expand Up @@ -58,9 +58,6 @@ public class MessageId implements Serializable {
* will duplicate.
*
* It's a pity because fetching all of the HEADERS is a performance HOG
*
* @param message
* @throws MessageId.MessageIdException
*/
public MessageId(Message message) throws MessageIdException {
try {
Expand Down Expand Up @@ -111,9 +108,6 @@ public int hashCode() {

/**
* Really important. Different servers return different address values when they are invalid.
*
* @param addresses
* @return
*/
private static String[] parseAddress(String[] addresses) {
if (addresses != null) {
Expand All @@ -125,17 +119,14 @@ private static String[] parseAddress(String[] addresses) {
}
}
Collections.sort(ret);
return ret.toArray(new String[ret.size()]);
return ret.toArray(new String[0]);
}
return new String[0];
}


/**
* Adds required headers to fetch profile
*
* @param fetchProfile
* @return
*/
public static FetchProfile addHeaders(FetchProfile fetchProfile) {
fetchProfile.add(FetchProfile.Item.ENVELOPE);
Expand Down
Expand Up @@ -53,7 +53,7 @@ public static Index populateFromStore(final Index index, Store store, int thread
service.shutdown();
service.awaitTermination(1, TimeUnit.HOURS);
if (index.hasCrawlException()) {
messagingException = index.getCrawlExceptions().get(0);
messagingException = index.getCrawlExceptions().iterator().next();
}
if (messagingException != null) {
throw messagingException;
Expand Down
Expand Up @@ -71,7 +71,7 @@ public void run() {
final List<Message> toCopy = new ArrayList<>();
for (Message message : sourceMessages) {
try {
final MessageId id = new MessageId((IMAPMessage) message);
final MessageId id = new MessageId(message);
//Index message for deletion (if necessary)
if (storeCopier.getSourceIndex() != null) {
storeCopier.getSourceIndex().getFolderMessages(sourceFolderName).add(id);
Expand All @@ -93,13 +93,13 @@ public void run() {
fullProfile.add(FetchProfile.Item.FLAGS);
fullProfile.add(IMAPFolder.FetchProfileItem.HEADERS);
fullProfile.add(FetchProfile.Item.SIZE);
sourceFolder.fetch(toCopy.toArray(new Message[toCopy.size()]), fullProfile);
sourceFolder.fetch(toCopy.toArray(new Message[0]), fullProfile);
final Folder targetFolder = storeCopier.getTargetStore().getFolder(targetFolderName);
targetFolder.open(Folder.READ_WRITE);
for (Message message : toCopy) {
targetFolder.appendMessages(new Message[]{message});
try {
targetFolderMessages.add(new MessageId((IMAPMessage) message));
targetFolderMessages.add(new MessageId(message));
copied++;
if (copied % updateCount == 0) {
storeCopier.updatedMessagesCopiedCount(copied);
Expand Down
Expand Up @@ -18,7 +18,6 @@

import com.marcnuri.mnimapsync.index.Index;
import com.marcnuri.mnimapsync.index.MessageId;
import com.sun.mail.imap.IMAPMessage;
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;
Expand Down Expand Up @@ -53,7 +52,8 @@ public MessageDeleter(StoreDeleter storeDeleter,
}

public void run() {
long deleted = 0L, skipped = 0L;
long deleted = 0L;
long skipped = 0L;
try {
final Folder targetFolder = storeDeleter.getTargetStore().getFolder(targetFolderName);
//Opens a new connection per Thread
Expand All @@ -62,7 +62,7 @@ public void run() {
targetFolder.fetch(targetMessages, MessageId.addHeaders(new FetchProfile()));
for (Message message : targetMessages) {
try {
final MessageId id = new MessageId((IMAPMessage) message);
final MessageId id = new MessageId(message);
if (!sourceFolderMessages.contains(id)) {
message.setFlag(Flags.Flag.DELETED, true);
deleted++;
Expand Down
18 changes: 6 additions & 12 deletions src/main/java/com/marcnuri/mnimapsync/store/StoreCopier.java
Expand Up @@ -88,9 +88,6 @@ public final void copy() throws InterruptedException {
*
* It also indexes the source store folders if we want to delete target folders that no longer
* exist
*
* @param folder
* @throws MessagingException
*/
private void copySourceFolder(Folder folder) throws MessagingException {
final String sourceFolderName = folder.getFullName();
Expand All @@ -106,9 +103,9 @@ private void copySourceFolder(Folder folder) throws MessagingException {
throw new MessagingException(String.format(
"Couldn't create folder: %s in target server.", sourceFolderName));
}
updatedFoldersCopiedCount(1);
incrementFoldersCopiedCount();
} else {
updatedFoldersSkippedCount(1);
incrementFoldersSkippedCount();
}
//Folder recursion. Get all children
if ((folder.getType() & Folder.HOLDS_FOLDERS) == Folder.HOLDS_FOLDERS) {
Expand All @@ -121,9 +118,6 @@ private void copySourceFolder(Folder folder) throws MessagingException {
/**
* Once the folder structure has been created it copies messages recursively from the root
* folder.
*
* @param sourceFolder
* @throws MessagingException
*/
private void copySourceMessages(IMAPFolder sourceFolder) throws MessagingException {
if (sourceFolder != null) {
Expand Down Expand Up @@ -170,12 +164,12 @@ public final boolean hasCopyException() {
}
}

private void updatedFoldersCopiedCount(int delta) {
foldersCopiedCount.getAndAdd(delta);
private void incrementFoldersCopiedCount() {
foldersCopiedCount.getAndAdd(1);
}

private void updatedFoldersSkippedCount(int delta) {
foldersSkippedCount.getAndAdd(delta);
private void incrementFoldersSkippedCount() {
foldersSkippedCount.getAndAdd(1);
}

protected final void updatedMessagesCopiedCount(long delta) {
Expand Down
13 changes: 6 additions & 7 deletions src/main/java/com/marcnuri/mnimapsync/store/StoreDeleter.java
Expand Up @@ -20,7 +20,6 @@

import com.marcnuri.mnimapsync.MNIMAPSync;
import com.marcnuri.mnimapsync.index.Index;
import com.sun.mail.imap.IMAPFolder;
import com.sun.mail.imap.IMAPStore;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand Down Expand Up @@ -65,15 +64,15 @@ public final void delete() throws InterruptedException {
//Delete Folder Structure
deleteTargetFolder(targetStore.getDefaultFolder());
//Copy messages
deleteTargetMessages((IMAPFolder) targetStore.getDefaultFolder());
deleteTargetMessages(targetStore.getDefaultFolder());
} catch (MessagingException ex) {
Logger.getLogger(StoreDeleter.class.getName()).log(Level.SEVERE, null, ex);
}
service.shutdown();
service.awaitTermination(1, TimeUnit.DAYS);
}

private void deleteTargetMessages(IMAPFolder targetFolder) throws MessagingException {
private void deleteTargetMessages(Folder targetFolder) throws MessagingException {
if (targetFolder != null) {
final String targetFolderName = targetFolder.getFullName();
final String sourceFolderName = targetToSourceFolderName(targetFolderName, sourceIndex, targetIndex);
Expand All @@ -98,7 +97,7 @@ private void deleteTargetMessages(IMAPFolder targetFolder) throws MessagingExcep
//Folder recursion. Get all children
if ((targetFolder.getType() & Folder.HOLDS_FOLDERS) == Folder.HOLDS_FOLDERS) {
for (Folder child : targetFolder.list()) {
deleteTargetMessages((IMAPFolder) child);
deleteTargetMessages(child);
}
}
}
Expand All @@ -111,7 +110,7 @@ private void deleteTargetFolder(Folder folder) throws MessagingException {
if (!sourceIndex.containsFolder(sourceFolderName)) {
//Delete recursively
targetStore.getFolder(targetFolderName).delete(true);
updatedFoldersDeletedCount(1);
incrementFoldersDeletedCount();
}
//Folder recursion. Get all children
if ((folder.getType() & Folder.HOLDS_FOLDERS) == Folder.HOLDS_FOLDERS) {
Expand All @@ -121,8 +120,8 @@ private void deleteTargetFolder(Folder folder) throws MessagingException {
}
}

private void updatedFoldersDeletedCount(int delta) {
foldersDeletedCount.getAndAdd(delta);
private void incrementFoldersDeletedCount() {
foldersDeletedCount.getAndAdd(1);
}

protected final void updatedMessagesDeletedCount(long delta) {
Expand Down
Expand Up @@ -24,6 +24,8 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doAnswer;
Expand Down Expand Up @@ -68,7 +70,7 @@ void tearDown() {
}

@Test
void populateFromStore_storeHasFolders_ShouldPopulateIndex() throws Exception {
void populateFromStore_storeHasFolders_shouldPopulateIndex() throws Exception {
// Given
final Index index = new Index();
doReturn(1).when(defaultFolder).getMessageCount();
Expand All @@ -83,6 +85,26 @@ void populateFromStore_storeHasFolders_ShouldPopulateIndex() throws Exception {
assertThat(index.hasCrawlException(), equalTo(false));
}

@Test
void populateFromStore_indexHasExceptionsAndStoreHasFolders_shouldThrowException() throws Exception {
// Given
final Index index = new Index();
index.addCrawlException(new MessagingException("Indexing tasks went wrong at some point"));
doReturn(1).when(defaultFolder).getMessageCount();
// When
final MessagingException result = assertThrows(MessagingException.class, () -> {
populateFromStore(index, imapStore, 1);
fail();
});
// Then
verify(defaultFolder, times(1)).expunge();
assertThat(index.containsFolder("INBOX"), equalTo(true));
assertThat(index.containsFolder("Folder 1"), equalTo(true));
assertThat(index.containsFolder("Folder 2"), equalTo(true));
assertThat(index.hasCrawlException(), equalTo(true));
assertThat(result.getMessage(), equalTo("Indexing tasks went wrong at some point"));
}

private static IMAPFolder mockFolder(String name) throws MessagingException {
final IMAPFolder mockFolder = Mockito.mock(IMAPFolder.class);
doReturn(name).when(mockFolder).getFullName();
Expand Down

0 comments on commit d3c80a6

Please sign in to comment.