Skip to content

Commit

Permalink
Fix ImapIdleChA for scheduling race condition (#8670)
Browse files Browse the repository at this point in the history
* Fix ImapIdleChA for scheduling race condition

The IMAP IDLE is long-lived process and can be blocked
waiting for any reply from the server.
This way it is not suited to be used in a `TaskScheduler`
especially when it has only one thread in its pool in Spring Boot by default.
Another concurrent scheduled task is exactly an `ImapMailReceiver.IdleCanceller`.
With a single thread in a `TaskScheduler` pool it cannot be reached
therefore we never cancel and IDLE task and cannot react to the connection loss properly

* Rework the `ImapIdleChannelAdapter` logic to use a regular `Executor` and `while()` loop
with a `Thread.sleep()` when we lose connection
* Clean up the `ImapMailReceiverTests` from `TaskScheduler` not used anymore.
* Expose new `taskExecutor` option in the `ImapIdleChannelAdapterSpec` for Java DSL
* Enable `ImapMailReceiverTests.testIdleWithMessageMapping()` with an attempt to see
if this fix covers an unclear problem exposed before

* * The `testIdleWithMessageMapping()` still fails on GH actions
  • Loading branch information
artembilan committed Jul 12, 2023
1 parent 9597b7a commit e4c1851
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 135 deletions.
Expand Up @@ -17,10 +17,9 @@
package org.springframework.integration.mail;

import java.io.Serial;
import java.time.Instant;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

import jakarta.mail.Folder;
Expand All @@ -31,15 +30,13 @@
import org.springframework.beans.factory.BeanClassLoaderAware;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.integration.mail.event.MailIntegrationEvent;
import org.springframework.integration.transaction.IntegrationResourceHolder;
import org.springframework.integration.transaction.IntegrationResourceHolderSynchronization;
import org.springframework.integration.transaction.TransactionSynchronizationFactory;
import org.springframework.messaging.MessagingException;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.Trigger;
import org.springframework.scheduling.TriggerContext;
import org.springframework.transaction.support.TransactionSynchronization;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import org.springframework.util.Assert;
Expand All @@ -63,12 +60,10 @@ public class ImapIdleChannelAdapter extends MessageProducerSupport implements Be

private static final int DEFAULT_RECONNECT_DELAY = 10000;

private final ExceptionAwarePeriodicTrigger receivingTaskTrigger = new ExceptionAwarePeriodicTrigger();

private final IdleTask idleTask = new IdleTask();

private final ImapMailReceiver mailReceiver;

private Executor taskExecutor;

private TransactionSynchronizationFactory transactionSynchronizationFactory;

private ClassLoader classLoader;
Expand Down Expand Up @@ -100,6 +95,16 @@ public void setAdviceChain(List<Advice> adviceChain) {
this.adviceChain = adviceChain;
}

/**
* Provide a managed {@link Executor} to schedule a receiving IDLE task.
* @param taskExecutor the {@link Executor} to use.
* @since 6.2
*/
public void setTaskExecutor(Executor taskExecutor) {
Assert.notNull(taskExecutor, "'taskExecutor' must not be null");
this.taskExecutor = taskExecutor;
}

/**
* Specify whether the IDLE task should reconnect automatically after
* catching a {@link jakarta.mail.MessagingException} while waiting for messages. The
Expand Down Expand Up @@ -139,6 +144,10 @@ public void setApplicationEventPublisher(ApplicationEventPublisher applicationEv
protected void onInit() {
super.onInit();

if (this.taskExecutor == null) {
this.taskExecutor = new SimpleAsyncTaskExecutor(getBeanName() + "-");
}

Consumer<?> messageSenderToUse = new MessageSender();

if (!CollectionUtils.isEmpty(this.adviceChain)) {
Expand All @@ -153,16 +162,9 @@ protected void onInit() {
this.messageSender = (Consumer<Object>) messageSenderToUse;
}


/*
* Lifecycle implementation
*/

@Override // guarded by super#lifecycleLock
@Override
protected void doStart() {
TaskScheduler scheduler = getTaskScheduler();
Assert.notNull(scheduler, "'taskScheduler' must not be null");
this.receivingTask = scheduler.schedule(new ReceivingTask(), this.receivingTaskTrigger);
this.taskExecutor.execute(this::callIdle);
}

@Override
Expand Down Expand Up @@ -190,6 +192,70 @@ private void publishException(Exception ex) {
}
}

private void callIdle() {
while (isActive()) {
try {
processIdle();
logger.debug("Task completed successfully. Re-scheduling it again right away.");
}
catch (Exception ex) {
publishException(ex);
if (this.shouldReconnectAutomatically
&& ex.getCause() instanceof jakarta.mail.MessagingException messagingException) {

//run again after a delay
logger.info(messagingException,
() -> "Failed to execute IDLE task. Will attempt to resubmit in "
+ this.reconnectDelay + " milliseconds.");
delayNextIdleCall();
}
else {
logger.warn(ex,
"Failed to execute IDLE task. " +
"Won't resubmit since not a 'shouldReconnectAutomatically' " +
"or not a 'jakarta.mail.MessagingException'");
break;
}
}
}
}

private void processIdle() {
try {
logger.debug("waiting for mail");
this.mailReceiver.waitForNewMessages();
Folder folder = this.mailReceiver.getFolder();
if (folder != null && folder.isOpen() && isRunning()) {
Object[] mailMessages = this.mailReceiver.receive();
logger.debug(() -> "received " + mailMessages.length + " mail messages");
for (Object mailMessage : mailMessages) {
if (isRunning()) {
this.messageSender.accept(mailMessage);
}
}
}
}
catch (jakarta.mail.MessagingException ex) {
logger.warn(ex, "error occurred in idle task");
if (this.shouldReconnectAutomatically) {
throw new IllegalStateException("Failure in 'idle' task. Will resubmit.", ex);
}
else {
throw new MessagingException("Failure in 'idle' task. Will NOT resubmit.", ex);
}
}
}

private void delayNextIdleCall() {
try {
Thread.sleep(this.reconnectDelay);
}
catch (InterruptedException ex) {
Thread.currentThread().interrupt();
throw new IllegalStateException(ex);
}
}

private class MessageSender implements Consumer<Object> {

MessageSender() {
Expand Down Expand Up @@ -227,112 +293,6 @@ public void accept(Object mailMessage) {

}

private class ReceivingTask implements Runnable {

ReceivingTask() {
}

@Override
public void run() {
if (isRunning()) {
try {
ImapIdleChannelAdapter.this.idleTask.run();
logger.debug("Task completed successfully. Re-scheduling it again right away.");
}
catch (Exception ex) {
if (ImapIdleChannelAdapter.this.shouldReconnectAutomatically
&& ex.getCause() instanceof jakarta.mail.MessagingException messagingException) {

//run again after a delay
logger.info(messagingException,
() -> "Failed to execute IDLE task. Will attempt to resubmit in "
+ ImapIdleChannelAdapter.this.reconnectDelay + " milliseconds.");
ImapIdleChannelAdapter.this.receivingTaskTrigger.delayNextExecution();
}
else {
logger.warn(ex,
"Failed to execute IDLE task. " +
"Won't resubmit since not a 'shouldReconnectAutomatically' " +
"or not a 'jakarta.mail.MessagingException'");
ImapIdleChannelAdapter.this.receivingTaskTrigger.stop();
}
publishException(ex);
}
}
}

}


private class IdleTask implements Runnable {

IdleTask() {
}

@Override
public void run() {
if (isRunning()) {
try {
logger.debug("waiting for mail");
ImapIdleChannelAdapter.this.mailReceiver.waitForNewMessages();
Folder folder = ImapIdleChannelAdapter.this.mailReceiver.getFolder();
if (folder != null && folder.isOpen() && isRunning()) {
Object[] mailMessages = ImapIdleChannelAdapter.this.mailReceiver.receive();
logger.debug(() -> "received " + mailMessages.length + " mail messages");
for (Object mailMessage : mailMessages) {
if (isRunning()) {
ImapIdleChannelAdapter.this.messageSender.accept(mailMessage);
}
}
}
}
catch (jakarta.mail.MessagingException ex) {
logger.warn(ex, "error occurred in idle task");
if (ImapIdleChannelAdapter.this.shouldReconnectAutomatically) {
throw new IllegalStateException("Failure in 'idle' task. Will resubmit.", ex);
}
else {
throw new MessagingException("Failure in 'idle' task. Will NOT resubmit.", ex);
}
}
}
}

}

private class ExceptionAwarePeriodicTrigger implements Trigger {

private final AtomicBoolean delayNextExecution = new AtomicBoolean();

private final AtomicBoolean stop = new AtomicBoolean();


ExceptionAwarePeriodicTrigger() {
}

@Override
public Instant nextExecution(TriggerContext triggerContext) {
if (this.stop.getAndSet(false)) {
return null;
}
if (this.delayNextExecution.getAndSet(false)) {
return Instant.now().plusMillis(ImapIdleChannelAdapter.this.reconnectDelay);
}
else {
return Instant.now();
}
}

void delayNextExecution() {
this.delayNextExecution.set(true);
}

void stop() {
this.stop.set(true);
}

}

public class ImapIdleExceptionEvent extends MailIntegrationEvent {

@Serial
Expand Down
Expand Up @@ -22,6 +22,7 @@
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import java.util.function.Function;

Expand Down Expand Up @@ -395,6 +396,17 @@ public ImapIdleChannelAdapterSpec simpleContent(boolean simpleContent) {
return _this();
}

/**
* Provide a managed {@link Executor} to schedule a receiving IDLE task.
* @param taskExecutor the {@link Executor} to use.
* @return the spec.
* @since 6.2
*/
public ImapIdleChannelAdapterSpec taskExecutor(Executor taskExecutor) {
this.target.setTaskExecutor(taskExecutor);
return this;
}

@Override
public Map<Object, String> getComponentsToRegister() {
return this.componentsToRegister;
Expand Down
Expand Up @@ -228,7 +228,6 @@ public void testIdleWithServerGuts(ImapMailReceiver receiver, boolean mapped, bo
ImapIdleChannelAdapter adapter = new ImapIdleChannelAdapter(receiver);
QueueChannel channel = new QueueChannel();
adapter.setOutputChannel(channel);
adapter.setTaskScheduler(taskScheduler);
adapter.setReconnectDelay(10);
adapter.afterPropertiesSet();
adapter.start();
Expand Down Expand Up @@ -781,17 +780,14 @@ public void testConnectionException() throws Exception {
theEvent.set(event);
latch.countDown();
});
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
taskScheduler.initialize();
adapter.setTaskScheduler(taskScheduler);
adapter.setReconnectDelay(10);
adapter.afterPropertiesSet();
adapter.start();
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(theEvent.get().toString())
.endsWith("cause=java.lang.IllegalStateException: Failure in 'idle' task. Will resubmit.]");

adapter.stop();
taskScheduler.destroy();
}

@Test // see INT-1801
Expand Down Expand Up @@ -967,19 +963,15 @@ public void testIdleReconnects() throws Exception {
i.callRealMethod();
throw new FolderClosedException(folder, "test");
}).given(receiver).waitForNewMessages();
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
taskScheduler.initialize();
adapter.setTaskScheduler(taskScheduler);
adapter.setReconnectDelay(10);
adapter.afterPropertiesSet();
final CountDownLatch latch = new CountDownLatch(3);
CountDownLatch latch = new CountDownLatch(3);
adapter.setApplicationEventPublisher(e -> latch.countDown());
adapter.afterPropertiesSet();
adapter.start();
assertThat(latch.await(60, TimeUnit.SECONDS)).isTrue();
verify(store, atLeast(3)).connect();

adapter.stop();
taskScheduler.shutdown();
}

private void setUpScheduler(ImapMailReceiver mailReceiver, ThreadPoolTaskScheduler taskScheduler) {
Expand Down

0 comments on commit e4c1851

Please sign in to comment.