diff --git a/README.md b/README.md index 07df59c4..243aae0d 100644 --- a/README.md +++ b/README.md @@ -59,6 +59,7 @@ It works seamlessly across AEM on-premise, AMS, and AEMaaCS environments. - [Outputs example](#outputs-example) - [ACL example](#acl-example) - [Repo example](#repo-example) + - [Abortable example](#abortable-example) - [History](#history) - [Extension scripts](#extension-scripts) - [Example extension script](#example-extension-script) @@ -421,6 +422,38 @@ void doRun() { ACM ACL Repo Output +#### Abortable example + +For long-running scripts that process many nodes, it's important to support graceful abortion. This allows users to stop the script execution without leaving the repository in an inconsistent state. + +```groovy +void doRun() { + repo.queryRaw("SELECT * FROM [nt:base] WHERE ISDESCENDANTNODE('/content/acme/us/en')").forEach { resource -> + // Safe point + context.checkAborted() + + // Process resource + // TODO resource.save() etc. + } +} +``` + +Alternatively, you can use `context.isAborted()` for manual control: + +```groovy +void doRun() { + def assets = repo.queryRaw("SELECT * FROM [dam:Asset] WHERE ISDESCENDANTNODE('/content/dam')").iterator() + for (asset in assets) { + if (context.isAborted()) { + // Do clean when aborted + break + } + } + // Still remember to propagate abort status + context.checkAborted() +} +``` + ### History All code executions are logged in the history. You can see the status of each execution, including whether it was successful or failed. The history also provides detailed logs for each execution, including any errors that occurred. diff --git a/core/src/main/java/dev/vml/es/acm/core/code/AbortException.java b/core/src/main/java/dev/vml/es/acm/core/code/AbortException.java new file mode 100644 index 00000000..278d900d --- /dev/null +++ b/core/src/main/java/dev/vml/es/acm/core/code/AbortException.java @@ -0,0 +1,12 @@ +package dev.vml.es.acm.core.code; + +public class AbortException extends RuntimeException { + + public AbortException(String message) { + super(message); + } + + public AbortException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/core/src/main/java/dev/vml/es/acm/core/code/ExecutionContext.java b/core/src/main/java/dev/vml/es/acm/core/code/ExecutionContext.java index 97a2c33d..66b4c2c3 100644 --- a/core/src/main/java/dev/vml/es/acm/core/code/ExecutionContext.java +++ b/core/src/main/java/dev/vml/es/acm/core/code/ExecutionContext.java @@ -151,6 +151,23 @@ public void setSkipped(boolean skipped) { this.skipped = skipped; } + public boolean isAborted() { + return getCodeContext() + .getOsgiContext() + .getService(ExecutionQueue.class) + .isAborted(getId()); + } + + public void abort() { + throw new AbortException("Execution aborted gracefully!"); + } + + public void checkAborted() throws AbortException { + if (isAborted()) { + abort(); + } + } + public Inputs getInputs() { return inputs; } @@ -174,6 +191,7 @@ public Conditions getConditions() { private void customizeBinding() { Binding binding = getCodeContext().getBinding(); + binding.setVariable("context", this); binding.setVariable("schedules", schedules); binding.setVariable("arguments", inputs); // TODO deprecated binding.setVariable("inputs", inputs); diff --git a/core/src/main/java/dev/vml/es/acm/core/code/ExecutionQueue.java b/core/src/main/java/dev/vml/es/acm/core/code/ExecutionQueue.java index 90732afa..8f8a2e4f 100644 --- a/core/src/main/java/dev/vml/es/acm/core/code/ExecutionQueue.java +++ b/core/src/main/java/dev/vml/es/acm/core/code/ExecutionQueue.java @@ -10,6 +10,7 @@ import dev.vml.es.acm.core.util.StreamUtils; import java.util.*; import java.util.concurrent.CancellationException; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -52,7 +53,12 @@ public class ExecutionQueue implements JobExecutor, EventListener { @AttributeDefinition( name = "Async Poll Interval", description = "Interval in milliseconds to poll for job status.") - long asyncPollInterval() default 500L; + long asyncPollInterval() default 750L; + + @AttributeDefinition( + name = "Abort Timeout", + description = "Time in milliseconds to wait for graceful abort before forcing it.") + long abortTimeout() default -1; } @Reference @@ -69,6 +75,8 @@ public class ExecutionQueue implements JobExecutor, EventListener { private ExecutorService jobAsyncExecutor; + private final Map jobAborted = new ConcurrentHashMap<>(); + private Config config; @Activate @@ -128,6 +136,10 @@ public Optional findByExecutableId(String executableId) { .findFirst(); } + public boolean isAborted(String executionId) { + return Boolean.TRUE.equals(jobAborted.get(executionId)); + } + public Stream findAllSummaries() { return findJobs().map(job -> new QueuedExecutionSummary(executor, job)); } @@ -208,46 +220,93 @@ public JobExecutionResult process(Job job, JobExecutionContext context) { } }); - while (!future.isDone()) { - if (context.isStopped() || Thread.currentThread().isInterrupted()) { - future.cancel(true); - LOG.debug("Execution is cancelling '{}'", queuedExecution); - break; + try { + Long abortStartTime = null; + while (!future.isDone()) { + if (context.isStopped()) { + if (abortStartTime == null) { + abortStartTime = System.currentTimeMillis(); + jobAborted.put(job.getId(), Boolean.TRUE); + + if (config.abortTimeout() < 0) { + LOG.debug("Execution is aborting gracefully '{}' (no timeout)", queuedExecution); + } else { + LOG.debug( + "Execution is aborting '{}' (timeout: {}ms)", + queuedExecution, + config.abortTimeout()); + } + } else if (config.abortTimeout() >= 0) { + long abortDuration = System.currentTimeMillis() - abortStartTime; + if (abortDuration >= config.abortTimeout()) { + LOG.debug( + "Execution abort timeout exceeded ({}ms), forcing abort '{}'", + abortDuration, + queuedExecution); + future.cancel(true); + break; + } + } + } + + try { + Thread.sleep(config.asyncPollInterval()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.debug("Execution is interrupted '{}'", queuedExecution); + return context.result().cancelled(); + } } + try { - Thread.sleep(config.asyncPollInterval()); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - LOG.debug("Execution is interrupted '{}'", queuedExecution); - return context.result().cancelled(); + Execution immediateExecution = future.get(); + + if (immediateExecution.getStatus() == ExecutionStatus.SKIPPED) { + LOG.debug("Execution skipped '{}'", immediateExecution); + return context.result() + .message(QueuedMessage.of(ExecutionStatus.SKIPPED, null) + .toJson()) + .cancelled(); + } else { + LOG.debug("Execution succeeded '{}'", immediateExecution); + return context.result().succeeded(); + } + } catch (CancellationException e) { + LOG.debug("Execution aborted forcefully '{}'", queuedExecution); + return context.result() + .message(QueuedMessage.of(ExecutionStatus.ABORTED, ExceptionUtils.toString(e)) + .toJson()) + .cancelled(); + } catch (Exception e) { + AbortException abortException = findAbortException(e); + if (abortException != null) { + LOG.debug("Execution aborted gracefully '{}'", queuedExecution); + return context.result() + .message(QueuedMessage.of(ExecutionStatus.ABORTED, ExceptionUtils.toString(abortException)) + .toJson()) + .cancelled(); + } + + LOG.debug("Execution failed '{}'", queuedExecution, e); + return context.result() + .message(QueuedMessage.of(ExecutionStatus.FAILED, ExceptionUtils.toString(e)) + .toJson()) + .failed(); } + } finally { + jobAborted.remove(job.getId()); } + } - try { - Execution immediateExecution = future.get(); - - if (immediateExecution.getStatus() == ExecutionStatus.SKIPPED) { - LOG.debug("Execution skipped '{}'", immediateExecution); - return context.result() - .message(QueuedMessage.of(ExecutionStatus.SKIPPED, null).toJson()) - .cancelled(); - } else { - LOG.info("Execution succeeded '{}'", immediateExecution); - return context.result().succeeded(); + private AbortException findAbortException(Throwable e) { + Throwable current = e; + while (current != null) { + if (current instanceof AbortException) { + return (AbortException) current; } - } catch (CancellationException e) { - LOG.warn("Execution aborted '{}'", queuedExecution); - return context.result() - .message(QueuedMessage.of(ExecutionStatus.ABORTED, ExceptionUtils.toString(e)) - .toJson()) - .cancelled(); - } catch (Exception e) { - LOG.error("Execution failed '{}'", queuedExecution, e); - return context.result() - .message(QueuedMessage.of(ExecutionStatus.FAILED, ExceptionUtils.toString(e)) - .toJson()) - .failed(); + current = current.getCause(); } + return null; } private Execution executeAsync(ExecutionContextOptions contextOptions, QueuedExecution execution) diff --git a/core/src/main/java/dev/vml/es/acm/core/code/Executor.java b/core/src/main/java/dev/vml/es/acm/core/code/Executor.java index 9b20204a..02081f43 100644 --- a/core/src/main/java/dev/vml/es/acm/core/code/Executor.java +++ b/core/src/main/java/dev/vml/es/acm/core/code/Executor.java @@ -230,18 +230,28 @@ private ContextualExecution executeInternal(ExecutionContext context) { context.getOut().withLoggerTimestamps(config.logPrintingTimestamps()); } contentScript.run(); + + LOG.info("Execution succeeded '{}'", context.getId()); return execution.end(ExecutionStatus.SUCCEEDED); } finally { if (locking) { useLocker(resolverFactory, l -> l.unlock(lockName)); } } - } catch (Throwable e) { + } catch (AbortException e) { + LOG.warn("Execution aborted gracefully '{}'", context.getId()); execution.error(e); + return execution.end(ExecutionStatus.ABORTED); + } catch (Throwable e) { if ((e.getCause() != null && e.getCause() instanceof InterruptedException)) { + LOG.warn("Execution aborted forcefully '{}'", context.getId()); + execution.error(e); return execution.end(ExecutionStatus.ABORTED); + } else { + LOG.error("Execution failed '{}'", context.getId(), e); + execution.error(e); + return execution.end(ExecutionStatus.FAILED); } - return execution.end(ExecutionStatus.FAILED); } finally { statuses.remove(context.getId()); } diff --git a/core/src/main/java/dev/vml/es/acm/core/util/ExceptionUtils.java b/core/src/main/java/dev/vml/es/acm/core/util/ExceptionUtils.java index e6fff9f9..5da0d6ea 100644 --- a/core/src/main/java/dev/vml/es/acm/core/util/ExceptionUtils.java +++ b/core/src/main/java/dev/vml/es/acm/core/util/ExceptionUtils.java @@ -13,4 +13,5 @@ public static String toString(Throwable cause) { .map(org.apache.commons.lang3.exception.ExceptionUtils::getStackTrace) .orElse(null); } + } diff --git a/ui.content/src/main/content/jcr_root/conf/acm/settings/snippet/available/core/general/demo_processing.yml b/ui.content/src/main/content/jcr_root/conf/acm/settings/snippet/available/core/general/demo_processing.yml index 1e2e4f4d..8c8036e9 100644 --- a/ui.content/src/main/content/jcr_root/conf/acm/settings/snippet/available/core/general/demo_processing.yml +++ b/ui.content/src/main/content/jcr_root/conf/acm/settings/snippet/available/core/general/demo_processing.yml @@ -11,6 +11,7 @@ content: | println "Updating resources..." def max = 20 for (int i = 0; i < max; i++) { + context.checkAborted() Thread.sleep(1000) println "Updated (\${i + 1}/\${max})" } diff --git a/ui.frontend/src/components/ExecutionAbortButton.tsx b/ui.frontend/src/components/ExecutionAbortButton.tsx index 6eb409bf..eb61b3ec 100644 --- a/ui.frontend/src/components/ExecutionAbortButton.tsx +++ b/ui.frontend/src/components/ExecutionAbortButton.tsx @@ -1,6 +1,9 @@ -import { Button, ButtonGroup, Content, Dialog, DialogTrigger, Divider, Heading, Text } from '@adobe/react-spectrum'; +import { Button, ButtonGroup, Content, Dialog, DialogTrigger, Divider, Heading, InlineAlert, Text } from '@adobe/react-spectrum'; import { ToastQueue } from '@react-spectrum/toast'; +import AlertIcon from '@spectrum-icons/workflow/Alert'; import Cancel from '@spectrum-icons/workflow/Cancel'; +import CheckmarkCircle from '@spectrum-icons/workflow/CheckmarkCircle'; +import CloseCircle from '@spectrum-icons/workflow/CloseCircle'; import StopCircle from '@spectrum-icons/workflow/StopCircle'; import React, { useState } from 'react'; import { useDeepCompareEffect } from 'react-use'; @@ -78,8 +81,22 @@ const ExecutionAbortButton: React.FC = ({ execution, -

This action will abort current code execution.

-

Be aware that aborting execution may leave data in an inconsistent state.

+

+ The abort request signals the script to stop, but the script must explicitly check for this signal by calling context.checkAborted(). +

+

+ If the script doesn't check for abort, it will continue running until it completes naturally. Only if an abort timeout is configured (by default it's not), will the execution be forcefully terminated after the timeout expires. +

+

+ For scripts with loops or long-running operations, add context.checkAborted() at safe checkpoints (e.g., at the beginning of each loop iteration) to enable graceful termination and prevent data corruption. +

+ + + Warning + + Proceed with aborting only if the requirements above are met. + +