Skip to content
Permalink
Browse files
1153: Some emails still not posted as comments in PR
Reviewed-by: kcr
  • Loading branch information
erikj79 committed Sep 3, 2021
1 parent 273d83d commit 3293f7afda04febc4ca212d2a7acedb682d5cb5f
Show file tree
Hide file tree
Showing 8 changed files with 137 additions and 56 deletions.
@@ -251,7 +251,7 @@ private void submitOrSchedule(WorkItem item) {

for (var pendingItem : pending.entrySet()) {
// If there are pending items of the same type that we cannot run concurrently with, replace them.
if (pendingItem.getKey().getClass().equals(item.getClass()) && !pendingItem.getKey().concurrentWith(item)) {
if (item.replaces(pendingItem.getKey())) {
log.finer("Discarding obsoleted item " + pendingItem.getKey() +
" in favor of item " + item);
DISCARDED_COUNTER.labels(item.botName(), item.workItemName()).inc();
@@ -33,6 +33,15 @@ public interface WorkItem {
*/
boolean concurrentWith(WorkItem other);

/**
* Returns true if this item should replace the other item in the queue. By default
* this is true if both items are of the same type, and cannot run concurrently with
* each other. In some cases we need a more specific condition.
*/
default boolean replaces(WorkItem other) {
return this.getClass().equals(other.getClass()) && !concurrentWith(other);
}

/**
* Execute the appropriate tasks with the provided scratch folder. Optionally return follow-up work items
* that will be scheduled for execution.
@@ -40,7 +40,7 @@ public class ArchiveReaderWorkItem implements WorkItem {

@Override
public String toString() {
return "ArchiveReaderWorkItem@" + list;
return "ArchiveReaderWorkItem@" + bot.repository().name();
}

@Override
@@ -55,6 +55,17 @@ public boolean concurrentWith(WorkItem other) {
return false;
}

/**
* An ArchiveReaderWorkItem can't run concurrently with another item that shares the same
* MailingListReader, but it only replaces an item that acts on the same repository.
*/
@Override
public boolean replaces(WorkItem other) {
return !concurrentWith(other)
&& (other instanceof ArchiveReaderWorkItem archiveReaderWorkItem)
&& bot.repository().name().equals(archiveReaderWorkItem.bot.repository().name());
}

@Override
public Collection<WorkItem> run(Path scratchPath) {
// Give the bot a chance to act on all found messages
@@ -35,20 +35,22 @@
import java.util.stream.Collectors;

public class MailingListArchiveReaderBot implements Bot {
private final EmailAddress archivePoster;
private final MailingListReader list;
private final Set<HostedRepository> repositories;
private final HostedRepository repository;
private final Map<EmailAddress, String> parsedConversations = new HashMap<>();
private final Map<EmailAddress, PullRequest> resolvedPullRequests = new HashMap<>();
private final Set<EmailAddress> parsedEmailIds = new HashSet<>();
private final Queue<CommentPosterWorkItem> commentQueue = new ConcurrentLinkedQueue<>();
private final Pattern pullRequestLinkPattern = Pattern.compile("^(?:PR: |Pull request:\\R)(.*?)$", Pattern.MULTILINE);
private final Logger log = Logger.getLogger("org.openjdk.skara.bots.mlbridge");

MailingListArchiveReaderBot(EmailAddress archivePoster, MailingListReader list, Set<HostedRepository> repositories) {
this.archivePoster = archivePoster;
MailingListArchiveReaderBot(MailingListReader list, HostedRepository repository) {
this.list = list;
this.repositories = repositories;
this.repository = repository;
}

public HostedRepository repository() {
return repository;
}

private synchronized void invalidate(List<Email> messages) {
@@ -103,10 +105,7 @@ synchronized void inspect(Conversation conversation) {
if (prLink.equals("invalid")) {
return;
}
var foundPr = repositories.stream()
.map(repository -> repository.parsePullRequestUrl(prLink))
.filter(Optional::isPresent)
.map(Optional::get).findAny();
var foundPr = repository.parsePullRequestUrl(prLink);
if (foundPr.isEmpty()) {
log.info("PR link that can't be matched to an actual PR: " + prLink);
parsedConversations.put(first.id(), "invalid");
@@ -24,7 +24,7 @@

import org.openjdk.skara.bot.*;
import org.openjdk.skara.email.EmailAddress;
import org.openjdk.skara.forge.HostedRepository;
import org.openjdk.skara.mailinglist.MailingListReader;
import org.openjdk.skara.network.URIBuilder;
import org.openjdk.skara.json.*;
import org.openjdk.skara.mailinglist.MailingListServerFactory;
@@ -89,8 +89,6 @@ public List<Bot> create(BotConfiguration configuration) {
var archiveRef = configuration.repositoryRef(specific.get("archive").asString());
var issueTracker = URIBuilder.base(specific.get("issues").asString()).build();

var allRepositories = new HashSet<HostedRepository>();

var readyLabels = specific.get("ready").get("labels").stream()
.map(JSONValue::asString)
.collect(Collectors.toSet());
@@ -101,8 +99,11 @@ public List<Bot> create(BotConfiguration configuration) {
var cooldown = specific.contains("cooldown") ? Duration.parse(specific.get("cooldown").asString()) : Duration.ofMinutes(1);
var mailmanServer = MailingListServerFactory.createMailmanServer(listArchive, listSmtp, Duration.ZERO);

var mailingListReaderMap = new HashMap<List<String>, MailingListReader>();

for (var repoConfig : specific.get("repositories").asArray()) {
var repo = repoConfig.get("repository").asString();
var hostedRepository = configuration.repository(repo);
var censusRepo = configuration.repository(repoConfig.get("census").asString());
var censusRef = configuration.repositoryRef(repoConfig.get("census").asString());

@@ -120,7 +121,13 @@ public List<Bot> create(BotConfiguration configuration) {
var listsForReading = listNamesForReading.stream()
.map(EmailAddress::localPart)
.collect(Collectors.toList());
var bot = new MailingListArchiveReaderBot(from, mailmanServer.getListReader(listsForReading.toArray(new String[0])), allRepositories);

// Reuse MailingListReaders with the exact same set of mailing lists between bots
// to benefit more from cached results.
if (!mailingListReaderMap.containsKey(listsForReading)) {
mailingListReaderMap.put(listsForReading, mailmanServer.getListReader(listsForReading.toArray(new String[0])));
}
var bot = new MailingListArchiveReaderBot(mailingListReaderMap.get(listsForReading), hostedRepository);
ret.add(bot);
}

@@ -137,7 +144,7 @@ public List<Bot> create(BotConfiguration configuration) {
repoConfig.get("webrevs").get("json").asBoolean();

var botBuilder = MailingListBridgeBot.newBuilder().from(from)
.repo(configuration.repository(repo))
.repo(hostedRepository)
.archive(archiveRepo)
.archiveRef(archiveRef)
.censusRepo(censusRepo)
@@ -169,8 +176,6 @@ public List<Bot> create(BotConfiguration configuration) {
botBuilder.branchInSubject(Pattern.compile(repoConfig.get("branchname").asString()));
}
ret.add(botBuilder.build());

allRepositories.add(configuration.repository(repo));
}

return ret;
@@ -88,7 +88,7 @@ void simpleArchive(TestInfo testInfo) throws IOException {
var mailmanServer = MailingListServerFactory.createMailmanServer(listServer.getArchive(), listServer.getSMTP(),
Duration.ZERO);
var mailmanList = mailmanServer.getListReader(listAddress.address());
var readerBot = new MailingListArchiveReaderBot(from, mailmanList, Set.of(archive));
var readerBot = new MailingListArchiveReaderBot(mailmanList, archive);

// Populate the projects repository
var localRepo = CheckableRepository.init(tempFolder.path(), author.repositoryType());
@@ -164,7 +164,7 @@ void rememberBridged(TestInfo testInfo) throws IOException {
var mailmanServer = MailingListServerFactory.createMailmanServer(listServer.getArchive(), listServer.getSMTP(),
Duration.ZERO);
var mailmanList = mailmanServer.getListReader(listAddress.address());
var readerBot = new MailingListArchiveReaderBot(from, mailmanList, Set.of(archive));
var readerBot = new MailingListArchiveReaderBot(mailmanList, archive);

// Populate the projects repository
var localRepo = CheckableRepository.init(tempFolder.path(), author.repositoryType());
@@ -197,7 +197,7 @@ void rememberBridged(TestInfo testInfo) throws IOException {
var updated = pr.comments();
assertEquals(2, updated.size());

var newReaderBot = new MailingListArchiveReaderBot(from, mailmanList, Set.of(archive));
var newReaderBot = new MailingListArchiveReaderBot(mailmanList, archive);
TestBotRunner.runPeriodicItems(newReaderBot);
TestBotRunner.runPeriodicItems(newReaderBot);

@@ -240,7 +240,7 @@ void largeEmail(TestInfo testInfo) throws IOException {
var mailmanServer = MailingListServerFactory.createMailmanServer(listServer.getArchive(), listServer.getSMTP(),
Duration.ZERO);
var mailmanList = mailmanServer.getListReader(listAddress.address());
var readerBot = new MailingListArchiveReaderBot(from, mailmanList, Set.of(archive));
var readerBot = new MailingListArchiveReaderBot(mailmanList, archive);

// Populate the projects repository
var localRepo = CheckableRepository.init(tempFolder.path(), author.repositoryType());
@@ -318,12 +318,6 @@ void branchMissing(TestInfo testInfo) throws IOException {
.issueTracker(URIBuilder.base("http://issues.test/browse/").build())
.build();

// The mailing list as well
var mailmanServer = MailingListServerFactory.createMailmanServer(listServer.getArchive(), listServer.getSMTP(),
Duration.ZERO);
var mailmanList = mailmanServer.getListReader(listAddress.address());
var readerBot = new MailingListArchiveReaderBot(from, mailmanList, Set.of(archive));

// Populate the projects repository
var localRepo = CheckableRepository.init(tempFolder.path(), author.repositoryType());
var masterHash = localRepo.resolve("master").orElseThrow();
@@ -89,45 +89,69 @@ public static List<Conversation> parseMbox(List<Email> emails) {
.filter(email -> !email.hasHeader("In-Reply-To"))
.collect(Collectors.toMap(Email::id, Conversation::new));

var outOfOrder = new ArrayList<Email>();
var lastOutOfOrderCount = -1;
while (outOfOrder.size() != lastOutOfOrderCount) {
lastOutOfOrderCount = outOfOrder.size();
outOfOrder.clear();

for (var email : emails) {
if (email.hasHeader("In-Reply-To")) {
var inReplyToMatcher = inReplyToPattern.matcher(email.headerValue("In-Reply-To"));
if (!inReplyToMatcher.find()) {
log.info("Cannot parse In-Reply-To header: " + email.headerValue("In-Reply-To"));
continue;
}
var inReplyTo = EmailAddress.from(inReplyToMatcher.group(1));
if (!idToMail.containsKey(inReplyTo)) {
log.info("Can't find parent: " + inReplyTo + " - discarding");
continue;
}
var parent = idToMail.get(inReplyTo);
if (!idToConversation.containsKey(inReplyTo)) {
outOfOrder.add(email);
} else {
var conversation = idToConversation.get(inReplyTo);
conversation.addReply(parent, email);
idToConversation.put(email.id(), conversation);
var unhandledEmails = emails;
var lastUnhandledCount = 0;
while (unhandledEmails.size() != lastUnhandledCount) {
lastUnhandledCount = unhandledEmails.size();
var emailsToCheck = unhandledEmails;
unhandledEmails = new ArrayList<>();

for (var email : emailsToCheck) {
if (!idToConversation.containsKey(email.id())) {
EmailAddress inReplyTo = findInReplyTo(idToMail, email);
if (inReplyTo != null) {
if (!idToConversation.containsKey(inReplyTo)) {
unhandledEmails.add(email);
} else {
var conversation = idToConversation.get(inReplyTo);
var parent = idToMail.get(inReplyTo);
conversation.addReply(parent, email);
idToConversation.put(email.id(), conversation);
}
}
}
}
}
if (!outOfOrder.isEmpty()) {
log.info("Out of order remaining: " + outOfOrder.size());
outOfOrder.forEach(oo -> log.info(" " + oo.id()));
if (!unhandledEmails.isEmpty()) {
log.info("Out of order remaining: " + unhandledEmails.size());
unhandledEmails.forEach(oo -> log.info(" " + oo.id()));
}

return idToConversation.values().stream()
.distinct()
.collect(Collectors.toList());
}

private static EmailAddress findInReplyTo(Map<EmailAddress, Email> idToMail, Email email) {
if (email.hasHeader("In-Reply-To")) {
var inReplyToMatcher = inReplyToPattern.matcher(email.headerValue("In-Reply-To"));
if (!inReplyToMatcher.find()) {
log.info("Cannot parse In-Reply-To header: " + email.headerValue("In-Reply-To"));
} else {
var inReplyTo = EmailAddress.from(inReplyToMatcher.group(1));
if (idToMail.containsKey(inReplyTo)) {
return inReplyTo;
}
}
}
if (email.hasHeader("References")) {
var references = email.headerValue("References");
var referenceList = Arrays.asList(references.split("\\s+"));
Collections.reverse(referenceList);
for (String reference : referenceList) {
var referenceMatcher = inReplyToPattern.matcher(reference);
if (referenceMatcher.find()) {
var referenceAddress = EmailAddress.from(referenceMatcher.group(1));
if (idToMail.containsKey(referenceAddress)) {
return referenceAddress;
}
}
}
}
log.info("Can't find parent for: " + email.id() + " - discarding");
return null;
}

public static String fromMail(Email mail) {
var mboxString = new StringWriter();
var mboxMail = new PrintWriter(mboxString);
@@ -378,4 +378,43 @@ void replyOutOfOrderSplit() throws IOException {
assertEquals(3, conversation.allMessages().size());
}
}

/**
* Tests that fallback on References field works when In-Reply-To points to a non
* existing email.
*/
@Test
void middleMessageMissing() throws IOException {
try (var folder = new TemporaryDirectory()) {
var rawMbox1 = folder.path().resolve("test1.mbox");
Files.writeString(rawMbox1, """
From test at example.com Wed Aug 21 17:22:50 2019
From: test at example.com (test at example.com)
Date: Wed, 21 Aug 2019 17:22:50 +0000
Subject: this is a test
Message-ID: <abc123@example.com>

First message

From test3 at example.com Wed Aug 21 17:42:50 2019
From: test3 at example.com (test3 at example.com)
Date: Wed, 21 Aug 2019 17:42:50 +0000
Subject: Re: this is a test
In-Reply-To: <def456@example.com>
References: <foo999@example.com>
<abc123@example.com>
<def456@example.com>
Message-ID: <ghi789@example.com>

Third message
""",
StandardCharsets.UTF_8);
var mbox = MailingListServerFactory.createMboxFileServer(folder.path());
var list = mbox.getListReader("test1");
var conversations = list.conversations(Duration.ofDays(365 * 100));
assertEquals(1, conversations.size());
var conversation = conversations.get(0);
assertEquals(2, conversation.allMessages().size());
}
}
}

1 comment on commit 3293f7a

@openjdk-notifier
Copy link

@openjdk-notifier openjdk-notifier bot commented on 3293f7a Sep 3, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.