Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

1153: Some emails still not posted as comments in PR #1214

Closed
wants to merge 3 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
@@ -251,7 +251,7 @@ private void submitOrSchedule(WorkItem item) {

for (var pendingItem : pending.entrySet()) {
// If there are pending items of the same type that we cannot run concurrently with, replace them.
if (pendingItem.getKey().getClass().equals(item.getClass()) && !pendingItem.getKey().concurrentWith(item)) {
if (item.replaces(pendingItem.getKey())) {
erikj79 marked this conversation as resolved.
Show resolved Hide resolved
log.finer("Discarding obsoleted item " + pendingItem.getKey() +
" in favor of item " + item);
DISCARDED_COUNTER.labels(item.botName(), item.workItemName()).inc();
@@ -33,6 +33,15 @@ public interface WorkItem {
*/
boolean concurrentWith(WorkItem other);

/**
* Returns true if this item should replace the other item in the queue. By default
* this is true if both items are of the same type, and cannot run concurrently with
* each other. In some cases we need a more specific condition.
*/
default boolean replaces(WorkItem other) {
return this.getClass().equals(other.getClass()) && !concurrentWith(other);
}

/**
* Execute the appropriate tasks with the provided scratch folder. Optionally return follow-up work items
* that will be scheduled for execution.
@@ -40,7 +40,7 @@ public class ArchiveReaderWorkItem implements WorkItem {

@Override
public String toString() {
return "ArchiveReaderWorkItem@" + list;
return "ArchiveReaderWorkItem@" + bot.repository().name();
}

@Override
@@ -55,6 +55,17 @@ public boolean concurrentWith(WorkItem other) {
return false;
}

/**
* An ArchiveReaderWorkItem can't run concurrently with another item that shares the same
* MailingListReader, but it only replaces an item that acts on the same repository.
*/
@Override
public boolean replaces(WorkItem other) {
return !concurrentWith(other)
&& (other instanceof ArchiveReaderWorkItem archiveReaderWorkItem)
&& bot.repository().name().equals(archiveReaderWorkItem.bot.repository().name());
}

@Override
public Collection<WorkItem> run(Path scratchPath) {
// Give the bot a chance to act on all found messages
@@ -35,20 +35,22 @@
import java.util.stream.Collectors;

public class MailingListArchiveReaderBot implements Bot {
private final EmailAddress archivePoster;
private final MailingListReader list;
private final Set<HostedRepository> repositories;
private final HostedRepository repository;
private final Map<EmailAddress, String> parsedConversations = new HashMap<>();
private final Map<EmailAddress, PullRequest> resolvedPullRequests = new HashMap<>();
private final Set<EmailAddress> parsedEmailIds = new HashSet<>();
private final Queue<CommentPosterWorkItem> commentQueue = new ConcurrentLinkedQueue<>();
private final Pattern pullRequestLinkPattern = Pattern.compile("^(?:PR: |Pull request:\\R)(.*?)$", Pattern.MULTILINE);
private final Logger log = Logger.getLogger("org.openjdk.skara.bots.mlbridge");

MailingListArchiveReaderBot(EmailAddress archivePoster, MailingListReader list, Set<HostedRepository> repositories) {
this.archivePoster = archivePoster;
MailingListArchiveReaderBot(MailingListReader list, HostedRepository repository) {
this.list = list;
this.repositories = repositories;
this.repository = repository;
}

public HostedRepository repository() {
return repository;
}

private synchronized void invalidate(List<Email> messages) {
@@ -103,10 +105,7 @@ synchronized void inspect(Conversation conversation) {
if (prLink.equals("invalid")) {
return;
}
var foundPr = repositories.stream()
.map(repository -> repository.parsePullRequestUrl(prLink))
.filter(Optional::isPresent)
.map(Optional::get).findAny();
var foundPr = repository.parsePullRequestUrl(prLink);
if (foundPr.isEmpty()) {
log.info("PR link that can't be matched to an actual PR: " + prLink);
parsedConversations.put(first.id(), "invalid");
@@ -24,7 +24,7 @@

import org.openjdk.skara.bot.*;
import org.openjdk.skara.email.EmailAddress;
import org.openjdk.skara.forge.HostedRepository;
import org.openjdk.skara.mailinglist.MailingListReader;
import org.openjdk.skara.network.URIBuilder;
import org.openjdk.skara.json.*;
import org.openjdk.skara.mailinglist.MailingListServerFactory;
@@ -89,8 +89,6 @@ public List<Bot> create(BotConfiguration configuration) {
var archiveRef = configuration.repositoryRef(specific.get("archive").asString());
var issueTracker = URIBuilder.base(specific.get("issues").asString()).build();

var allRepositories = new HashSet<HostedRepository>();

var readyLabels = specific.get("ready").get("labels").stream()
.map(JSONValue::asString)
.collect(Collectors.toSet());
@@ -101,8 +99,11 @@ public List<Bot> create(BotConfiguration configuration) {
var cooldown = specific.contains("cooldown") ? Duration.parse(specific.get("cooldown").asString()) : Duration.ofMinutes(1);
var mailmanServer = MailingListServerFactory.createMailmanServer(listArchive, listSmtp, Duration.ZERO);

var mailingListReaderMap = new HashMap<List<String>, MailingListReader>();

for (var repoConfig : specific.get("repositories").asArray()) {
var repo = repoConfig.get("repository").asString();
var hostedRepository = configuration.repository(repo);
var censusRepo = configuration.repository(repoConfig.get("census").asString());
var censusRef = configuration.repositoryRef(repoConfig.get("census").asString());

@@ -120,7 +121,13 @@ public List<Bot> create(BotConfiguration configuration) {
var listsForReading = listNamesForReading.stream()
.map(EmailAddress::localPart)
.collect(Collectors.toList());
var bot = new MailingListArchiveReaderBot(from, mailmanServer.getListReader(listsForReading.toArray(new String[0])), allRepositories);

// Reuse MailingListReaders with the exact same set of mailing lists between bots
// to benefit more from cached results.
if (!mailingListReaderMap.containsKey(listsForReading)) {
mailingListReaderMap.put(listsForReading, mailmanServer.getListReader(listsForReading.toArray(new String[0])));
}
var bot = new MailingListArchiveReaderBot(mailingListReaderMap.get(listsForReading), hostedRepository);
ret.add(bot);
}

@@ -137,7 +144,7 @@ public List<Bot> create(BotConfiguration configuration) {
repoConfig.get("webrevs").get("json").asBoolean();

var botBuilder = MailingListBridgeBot.newBuilder().from(from)
.repo(configuration.repository(repo))
.repo(hostedRepository)
.archive(archiveRepo)
.archiveRef(archiveRef)
.censusRepo(censusRepo)
@@ -169,8 +176,6 @@ public List<Bot> create(BotConfiguration configuration) {
botBuilder.branchInSubject(Pattern.compile(repoConfig.get("branchname").asString()));
}
ret.add(botBuilder.build());

allRepositories.add(configuration.repository(repo));
}

return ret;
@@ -88,7 +88,7 @@ void simpleArchive(TestInfo testInfo) throws IOException {
var mailmanServer = MailingListServerFactory.createMailmanServer(listServer.getArchive(), listServer.getSMTP(),
Duration.ZERO);
var mailmanList = mailmanServer.getListReader(listAddress.address());
var readerBot = new MailingListArchiveReaderBot(from, mailmanList, Set.of(archive));
var readerBot = new MailingListArchiveReaderBot(mailmanList, archive);

// Populate the projects repository
var localRepo = CheckableRepository.init(tempFolder.path(), author.repositoryType());
@@ -164,7 +164,7 @@ void rememberBridged(TestInfo testInfo) throws IOException {
var mailmanServer = MailingListServerFactory.createMailmanServer(listServer.getArchive(), listServer.getSMTP(),
Duration.ZERO);
var mailmanList = mailmanServer.getListReader(listAddress.address());
var readerBot = new MailingListArchiveReaderBot(from, mailmanList, Set.of(archive));
var readerBot = new MailingListArchiveReaderBot(mailmanList, archive);

// Populate the projects repository
var localRepo = CheckableRepository.init(tempFolder.path(), author.repositoryType());
@@ -197,7 +197,7 @@ void rememberBridged(TestInfo testInfo) throws IOException {
var updated = pr.comments();
assertEquals(2, updated.size());

var newReaderBot = new MailingListArchiveReaderBot(from, mailmanList, Set.of(archive));
var newReaderBot = new MailingListArchiveReaderBot(mailmanList, archive);
TestBotRunner.runPeriodicItems(newReaderBot);
TestBotRunner.runPeriodicItems(newReaderBot);

@@ -240,7 +240,7 @@ void largeEmail(TestInfo testInfo) throws IOException {
var mailmanServer = MailingListServerFactory.createMailmanServer(listServer.getArchive(), listServer.getSMTP(),
Duration.ZERO);
var mailmanList = mailmanServer.getListReader(listAddress.address());
var readerBot = new MailingListArchiveReaderBot(from, mailmanList, Set.of(archive));
var readerBot = new MailingListArchiveReaderBot(mailmanList, archive);

// Populate the projects repository
var localRepo = CheckableRepository.init(tempFolder.path(), author.repositoryType());
@@ -318,12 +318,6 @@ void branchMissing(TestInfo testInfo) throws IOException {
.issueTracker(URIBuilder.base("http://issues.test/browse/").build())
.build();

// The mailing list as well
var mailmanServer = MailingListServerFactory.createMailmanServer(listServer.getArchive(), listServer.getSMTP(),
Duration.ZERO);
var mailmanList = mailmanServer.getListReader(listAddress.address());
var readerBot = new MailingListArchiveReaderBot(from, mailmanList, Set.of(archive));

// Populate the projects repository
var localRepo = CheckableRepository.init(tempFolder.path(), author.repositoryType());
var masterHash = localRepo.resolve("master").orElseThrow();
@@ -89,45 +89,69 @@ public static List<Conversation> parseMbox(List<Email> emails) {
.filter(email -> !email.hasHeader("In-Reply-To"))
.collect(Collectors.toMap(Email::id, Conversation::new));

var outOfOrder = new ArrayList<Email>();
var lastOutOfOrderCount = -1;
while (outOfOrder.size() != lastOutOfOrderCount) {
lastOutOfOrderCount = outOfOrder.size();
outOfOrder.clear();

for (var email : emails) {
if (email.hasHeader("In-Reply-To")) {
var inReplyToMatcher = inReplyToPattern.matcher(email.headerValue("In-Reply-To"));
if (!inReplyToMatcher.find()) {
log.info("Cannot parse In-Reply-To header: " + email.headerValue("In-Reply-To"));
continue;
}
var inReplyTo = EmailAddress.from(inReplyToMatcher.group(1));
if (!idToMail.containsKey(inReplyTo)) {
log.info("Can't find parent: " + inReplyTo + " - discarding");
continue;
}
var parent = idToMail.get(inReplyTo);
if (!idToConversation.containsKey(inReplyTo)) {
outOfOrder.add(email);
} else {
var conversation = idToConversation.get(inReplyTo);
conversation.addReply(parent, email);
idToConversation.put(email.id(), conversation);
var unhandledEmails = emails;
var lastUnhandledCount = 0;
while (unhandledEmails.size() != lastUnhandledCount) {
lastUnhandledCount = unhandledEmails.size();
var emailsToCheck = unhandledEmails;
unhandledEmails = new ArrayList<>();

for (var email : emailsToCheck) {
if (!idToConversation.containsKey(email.id())) {
EmailAddress inReplyTo = findInReplyTo(idToMail, email);
if (inReplyTo != null) {
if (!idToConversation.containsKey(inReplyTo)) {
unhandledEmails.add(email);
} else {
var conversation = idToConversation.get(inReplyTo);
var parent = idToMail.get(inReplyTo);
conversation.addReply(parent, email);
idToConversation.put(email.id(), conversation);
}
}
}
}
}
if (!outOfOrder.isEmpty()) {
log.info("Out of order remaining: " + outOfOrder.size());
outOfOrder.forEach(oo -> log.info(" " + oo.id()));
if (!unhandledEmails.isEmpty()) {
log.info("Out of order remaining: " + unhandledEmails.size());
unhandledEmails.forEach(oo -> log.info(" " + oo.id()));
}

return idToConversation.values().stream()
.distinct()
.collect(Collectors.toList());
}

private static EmailAddress findInReplyTo(Map<EmailAddress, Email> idToMail, Email email) {
if (email.hasHeader("In-Reply-To")) {
var inReplyToMatcher = inReplyToPattern.matcher(email.headerValue("In-Reply-To"));
if (!inReplyToMatcher.find()) {
log.info("Cannot parse In-Reply-To header: " + email.headerValue("In-Reply-To"));
} else {
var inReplyTo = EmailAddress.from(inReplyToMatcher.group(1));
if (idToMail.containsKey(inReplyTo)) {
return inReplyTo;
}
}
}
if (email.hasHeader("References")) {
var references = email.headerValue("References");
var referenceList = Arrays.asList(references.split("\\s+"));
Collections.reverse(referenceList);
for (String reference : referenceList) {
var referenceMatcher = inReplyToPattern.matcher(reference);
if (referenceMatcher.find()) {
var referenceAddress = EmailAddress.from(referenceMatcher.group(1));
if (idToMail.containsKey(referenceAddress)) {
return referenceAddress;
}
}
}
}
log.info("Can't find parent for: " + email.id() + " - discarding");
return null;
}

public static String fromMail(Email mail) {
var mboxString = new StringWriter();
var mboxMail = new PrintWriter(mboxString);
@@ -378,4 +378,43 @@ void replyOutOfOrderSplit() throws IOException {
assertEquals(3, conversation.allMessages().size());
}
}

/**
* Tests that fallback on References field works when In-Reply-To points to a non
* existing email.
*/
@Test
void middleMessageMissing() throws IOException {
try (var folder = new TemporaryDirectory()) {
var rawMbox1 = folder.path().resolve("test1.mbox");
Files.writeString(rawMbox1, """
From test at example.com Wed Aug 21 17:22:50 2019
From: test at example.com (test at example.com)
Date: Wed, 21 Aug 2019 17:22:50 +0000
Subject: this is a test
Message-ID: <abc123@example.com>

First message

From test3 at example.com Wed Aug 21 17:42:50 2019
From: test3 at example.com (test3 at example.com)
Date: Wed, 21 Aug 2019 17:42:50 +0000
Subject: Re: this is a test
In-Reply-To: <def456@example.com>
References: <foo999@example.com>
<abc123@example.com>
<def456@example.com>
Message-ID: <ghi789@example.com>

Third message
""",
StandardCharsets.UTF_8);
var mbox = MailingListServerFactory.createMboxFileServer(folder.path());
var list = mbox.getListReader("test1");
var conversations = list.conversations(Duration.ofDays(365 * 100));
assertEquals(1, conversations.size());
var conversation = conversations.get(0);
assertEquals(2, conversation.allMessages().size());
}
}
}