Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import static org.elasticsearch.index.query.QueryBuilders.termQuery;

import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

import org.apache.james.backends.es.utils.TestingClientProvider;
import org.apache.james.util.concurrent.NamedThreadFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
import org.apache.james.mailbox.model.MessageId;
import org.apache.james.mailbox.model.SearchQuery;
import org.apache.james.mailbox.model.UpdatedFlags;
import org.apache.james.mailbox.store.mail.MessageMapperFactory;
import org.apache.james.mailbox.store.MailboxSessionMapperFactory;
import org.apache.james.mailbox.store.mail.model.Mailbox;
import org.apache.james.mailbox.store.mail.model.MailboxMessage;
import org.apache.james.mailbox.store.search.ListeningMessageSearchIndex;
Expand All @@ -68,7 +68,7 @@ public class ElasticSearchListeningMessageSearchIndex extends ListeningMessageSe
private final MessageToElasticSearchJson messageToElasticSearchJson;

@Inject
public ElasticSearchListeningMessageSearchIndex(MessageMapperFactory factory,
public ElasticSearchListeningMessageSearchIndex(MailboxSessionMapperFactory factory,
@Named(MailboxElasticSearchConstants.InjectionNames.MAILBOX) ElasticSearchIndexer indexer,
ElasticSearchSearcher searcher, MessageToElasticSearchJson messageToElasticSearchJson,
MailboxManager mailboxManager) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.util.List;
import java.util.Optional;

Expand All @@ -43,7 +42,7 @@
import org.apache.james.mailbox.mock.MockMailboxSession;
import org.apache.james.mailbox.model.TestId;
import org.apache.james.mailbox.model.UpdatedFlags;
import org.apache.james.mailbox.store.mail.MessageMapperFactory;
import org.apache.james.mailbox.store.MailboxSessionMapperFactory;
import org.apache.james.mailbox.store.mail.model.Mailbox;
import org.apache.james.mailbox.store.mail.model.MailboxMessage;
import org.elasticsearch.ElasticsearchException;
Expand All @@ -60,14 +59,12 @@
import com.google.common.collect.Lists;

public class ElasticSearchListeningMessageSearchIndexTest {


public static final long MODSEQ = 18L;
public static final MessageUid MESSAGE_UID = MessageUid.of(1);
public static final TestId MAILBOX_ID = TestId.of(12);
public static final String ELASTIC_SEARCH_ID = "12:1";
public static final String EXPECTED_JSON_CONTENT = "json content";
public static final String USERNAME = "username";
private static final long MODSEQ = 18L;
private static final MessageUid MESSAGE_UID = MessageUid.of(1);
private static final TestId MAILBOX_ID = TestId.of(12);
private static final String ELASTIC_SEARCH_ID = "12:1";
private static final String EXPECTED_JSON_CONTENT = "json content";
private static final String USERNAME = "username";

private ElasticSearchIndexer elasticSearchIndexer;
private MessageToElasticSearchJson messageToElasticSearchJson;
Expand All @@ -76,9 +73,8 @@ public class ElasticSearchListeningMessageSearchIndexTest {
private List<User> users;

@Before
public void setup() throws JsonProcessingException {

MessageMapperFactory mapperFactory = mock(MessageMapperFactory.class);
public void setup() {
MailboxSessionMapperFactory mapperFactory = mock(MailboxSessionMapperFactory.class);
messageToElasticSearchJson = mock(MessageToElasticSearchJson.class);
ElasticSearchSearcher elasticSearchSearcher = mock(ElasticSearchSearcher.class);
MailboxManager mockMailboxManager = mock(MailboxManager.class);
Expand Down Expand Up @@ -131,7 +127,7 @@ public void addShouldIndexEmailBodyWhenNotIndexableAttachment() throws Exception
verify(elasticSearchIndexer).index(eq(ELASTIC_SEARCH_ID), eq(EXPECTED_JSON_CONTENT));
}

private MailboxMessage mockedMessage(MessageUid messageId) throws IOException {
private MailboxMessage mockedMessage(MessageUid messageId) {
MailboxMessage message = mock(MailboxMessage.class);
when(message.getUid())
.thenReturn(messageId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@
import org.apache.james.mailbox.model.SearchQuery.UidCriterion;
import org.apache.james.mailbox.model.SearchQuery.UidRange;
import org.apache.james.mailbox.model.UpdatedFlags;
import org.apache.james.mailbox.store.mail.MessageMapperFactory;
import org.apache.james.mailbox.store.MailboxSessionMapperFactory;
import org.apache.james.mailbox.store.mail.model.Mailbox;
import org.apache.james.mailbox.store.mail.model.MailboxMessage;
import org.apache.james.mailbox.store.mail.model.impl.PropertyBuilder;
Expand Down Expand Up @@ -363,24 +363,22 @@ public class LuceneMessageSearchIndex extends ListeningMessageSearchIndex {

@Inject
public LuceneMessageSearchIndex(
MessageMapperFactory factory,
MailboxSessionMapperFactory factory,
MailboxId.Factory mailboxIdFactory,
Directory directory,
MessageId.Factory messageIdFactory,
MailboxManager mailboxManager
) throws IOException {
MailboxManager mailboxManager) throws IOException {
this(factory, mailboxIdFactory, directory, false, true, messageIdFactory, mailboxManager);
}

public LuceneMessageSearchIndex(
MessageMapperFactory factory,
MailboxSessionMapperFactory factory,
MailboxId.Factory mailboxIdFactory,
Directory directory,
boolean dropIndexOnStart,
boolean lenient,
MessageId.Factory messageIdFactory,
MailboxManager mailboxManager
) throws IOException {
MailboxManager mailboxManager) throws IOException {
super(factory, mailboxManager);
this.mailboxIdFactory = mailboxIdFactory;
this.messageIdFactory = messageIdFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,26 @@

import java.io.InputStream;
import java.util.List;
import java.util.stream.Stream;

import javax.inject.Inject;

import org.apache.james.mailbox.Event;
import org.apache.james.mailbox.MailboxManager;
import org.apache.james.mailbox.MailboxSession;
import org.apache.james.mailbox.Role;
import org.apache.james.mailbox.SystemMailboxesProvider;
import org.apache.james.mailbox.exception.MailboxException;
import org.apache.james.mailbox.model.MailboxId;
import org.apache.james.mailbox.store.event.EventFactory;
import org.apache.james.mailbox.model.MessageRange;
import org.apache.james.mailbox.store.MailboxSessionMapperFactory;
import org.apache.james.mailbox.store.event.MessageMoveEvent;
import org.apache.james.mailbox.store.event.SpamEventListener;
import org.apache.james.mailbox.store.mail.MessageMapper;
import org.apache.james.mailbox.store.mail.model.Mailbox;
import org.apache.james.mailbox.store.mail.model.MailboxMessage;
import org.apache.james.mailbox.store.mail.model.Message;
import org.apache.james.util.streams.Iterators;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -44,15 +51,20 @@
public class SpamAssassinListener implements SpamEventListener {

private static final Logger LOGGER = LoggerFactory.getLogger(SpamAssassinListener.class);
private static final int LIMIT = 1;

private final SpamAssassin spamAssassin;
private final SystemMailboxesProvider systemMailboxesProvider;
private final MailboxManager mailboxManager;
private final MailboxSessionMapperFactory mapperFactory;
private final ExecutionMode executionMode;

@Inject
SpamAssassinListener(SpamAssassin spamAssassin, SystemMailboxesProvider systemMailboxesProvider, ExecutionMode executionMode) {
SpamAssassinListener(SpamAssassin spamAssassin, SystemMailboxesProvider systemMailboxesProvider, MailboxManager mailboxManager, MailboxSessionMapperFactory mapperFactory, ExecutionMode executionMode) {
this.spamAssassin = spamAssassin;
this.systemMailboxesProvider = systemMailboxesProvider;
this.mailboxManager = mailboxManager;
this.mapperFactory = mapperFactory;
this.executionMode = executionMode;
}

Expand All @@ -68,32 +80,55 @@ public ExecutionMode getExecutionMode() {

@Override
public void event(Event event) {
if (event instanceof MessageMoveEvent) {
MessageMoveEvent messageMoveEvent = (MessageMoveEvent) event;
if (isMessageMovedToSpamMailbox(messageMoveEvent)) {
LOGGER.debug("Spam event detected");
ImmutableList<InputStream> messages = retrieveMessages(messageMoveEvent);
spamAssassin.learnSpam(messages, event.getUser());
try {
MailboxSession session = mailboxManager.createSystemSession(getClass().getCanonicalName());
if (event instanceof MessageMoveEvent) {
handleMessageMove(event, (MessageMoveEvent) event);
}
if (isMessageMovedOutOfSpamMailbox(messageMoveEvent)) {
ImmutableList<InputStream> messages = retrieveMessages(messageMoveEvent);
spamAssassin.learnHam(messages, event.getUser());
if (event instanceof Added) {
handleAdded(event, session, (Added) event);
}
} catch (MailboxException e) {
LOGGER.warn("Error while processing SPAM listener", e);
}
if (event instanceof EventFactory.AddedImpl) {
EventFactory.AddedImpl addedEvent = (EventFactory.AddedImpl) event;
if (isAppendedToInbox(addedEvent)) {
List<InputStream> contents = addedEvent.getAvailableMessages()
.values()
.stream()
.map(Throwing.function(MailboxMessage::getFullContent))
.collect(Guavate.toImmutableList());
spamAssassin.learnHam(contents, event.getUser());
}
}

private void handleAdded(Event event, MailboxSession session, Added addedEvent) throws MailboxException {
if (isAppendedToInbox(addedEvent)) {
Mailbox mailbox = mapperFactory.getMailboxMapper(session).findMailboxById(addedEvent.getMailboxId());
MessageMapper messageMapper = mapperFactory.getMessageMapper(session);

List<InputStream> contents = MessageRange.toRanges(addedEvent.getUids())
.stream()
.flatMap(range -> retrieveMessages(messageMapper, mailbox, range))
.map(Throwing.function(MailboxMessage::getFullContent))
.collect(Guavate.toImmutableList());
spamAssassin.learnHam(contents, event.getUser());
}
}

private void handleMessageMove(Event event, MessageMoveEvent messageMoveEvent) {
if (isMessageMovedToSpamMailbox(messageMoveEvent)) {
LOGGER.debug("Spam event detected");
ImmutableList<InputStream> messages = retrieveMessages(messageMoveEvent);
spamAssassin.learnSpam(messages, event.getUser());
}
if (isMessageMovedOutOfSpamMailbox(messageMoveEvent)) {
ImmutableList<InputStream> messages = retrieveMessages(messageMoveEvent);
spamAssassin.learnHam(messages, event.getUser());
}
}

private Stream<MailboxMessage> retrieveMessages(MessageMapper messageMapper, Mailbox mailbox, MessageRange range) {
try {
return Iterators.toStream(messageMapper.findInMailbox(mailbox, range, MessageMapper.FetchType.Full, LIMIT));
} catch (MailboxException e) {
LOGGER.warn("Can not retrieve message {} {}", mailbox.getMailboxId(), range.toString(), e);
return Stream.empty();
}
}

private boolean isAppendedToInbox(EventFactory.AddedImpl addedEvent) {
private boolean isAppendedToInbox(Added addedEvent) {
try {
return systemMailboxesProvider.findMailbox(Role.INBOX, addedEvent.getUser())
.getId().equals(addedEvent.getMailboxId());
Expand All @@ -103,7 +138,7 @@ private boolean isAppendedToInbox(EventFactory.AddedImpl addedEvent) {
}
}

public ImmutableList<InputStream> retrieveMessages(MessageMoveEvent messageMoveEvent) {
private ImmutableList<InputStream> retrieveMessages(MessageMoveEvent messageMoveEvent) {
return messageMoveEvent.getMessages()
.values()
.stream()
Expand Down
Loading