Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ It works seamlessly across AEM on-premise, AMS, and AEMaaCS environments.
- [Outputs example](#outputs-example)
- [ACL example](#acl-example)
- [Repo example](#repo-example)
- [Abortable example](#abortable-example)
- [History](#history)
- [Extension scripts](#extension-scripts)
- [Example extension script](#example-extension-script)
Expand Down Expand Up @@ -421,6 +422,38 @@ void doRun() {

<img src="docs/screenshot-content-script-repo-output.png" width="720" alt="ACM ACL Repo Output">

#### Abortable example

For long-running scripts that process many nodes, it's important to support graceful abortion. This allows users to stop the script execution without leaving the repository in an inconsistent state.

```groovy
void doRun() {
repo.queryRaw("SELECT * FROM [nt:base] WHERE ISDESCENDANTNODE('/content/acme/us/en')").forEach { resource ->
// Safe point
context.checkAborted()

// Process resource
// TODO resource.save() etc.
}
}
```

Alternatively, you can use `context.isAborted()` for manual control:

```groovy
void doRun() {
def assets = repo.queryRaw("SELECT * FROM [dam:Asset] WHERE ISDESCENDANTNODE('/content/dam')").iterator()
for (asset in assets) {
if (context.isAborted()) {
// Do clean when aborted
break
}
}
// Still remember to propagate abort status
context.checkAborted()
}
```

### History

All code executions are logged in the history. You can see the status of each execution, including whether it was successful or failed. The history also provides detailed logs for each execution, including any errors that occurred.
Expand Down
12 changes: 12 additions & 0 deletions core/src/main/java/dev/vml/es/acm/core/code/AbortException.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package dev.vml.es.acm.core.code;

public class AbortException extends RuntimeException {

public AbortException(String message) {
super(message);
}

public AbortException(String message, Throwable cause) {
super(message, cause);
}
}
18 changes: 18 additions & 0 deletions core/src/main/java/dev/vml/es/acm/core/code/ExecutionContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,23 @@ public void setSkipped(boolean skipped) {
this.skipped = skipped;
}

public boolean isAborted() {
return getCodeContext()
.getOsgiContext()
.getService(ExecutionQueue.class)
.isAborted(getId());
}

public void abort() {
throw new AbortException("Execution aborted gracefully!");
}

public void checkAborted() throws AbortException {
if (isAborted()) {
abort();
}
}

public Inputs getInputs() {
return inputs;
}
Expand All @@ -174,6 +191,7 @@ public Conditions getConditions() {
private void customizeBinding() {
Binding binding = getCodeContext().getBinding();

binding.setVariable("context", this);
binding.setVariable("schedules", schedules);
binding.setVariable("arguments", inputs); // TODO deprecated
binding.setVariable("inputs", inputs);
Expand Down
127 changes: 93 additions & 34 deletions core/src/main/java/dev/vml/es/acm/core/code/ExecutionQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import dev.vml.es.acm.core.util.StreamUtils;
import java.util.*;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
Expand Down Expand Up @@ -52,7 +53,12 @@ public class ExecutionQueue implements JobExecutor, EventListener {
@AttributeDefinition(
name = "Async Poll Interval",
description = "Interval in milliseconds to poll for job status.")
long asyncPollInterval() default 500L;
long asyncPollInterval() default 750L;

@AttributeDefinition(
name = "Abort Timeout",
description = "Time in milliseconds to wait for graceful abort before forcing it.")
long abortTimeout() default -1;
}

@Reference
Expand All @@ -69,6 +75,8 @@ public class ExecutionQueue implements JobExecutor, EventListener {

private ExecutorService jobAsyncExecutor;

private final Map<String, Boolean> jobAborted = new ConcurrentHashMap<>();
Copy link
Collaborator Author

@krystian-panek-vmltech krystian-panek-vmltech Nov 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

jobManager.stopByJobId() does not make job.getJobState() == STOPPED but .. ACTIVE remains :(

only makes internal flag JobExecutionContext.isStopped() true

that's why I need to track/expose it on my own


private Config config;

@Activate
Expand Down Expand Up @@ -128,6 +136,10 @@ public Optional<Execution> findByExecutableId(String executableId) {
.findFirst();
}

public boolean isAborted(String executionId) {
return Boolean.TRUE.equals(jobAborted.get(executionId));
}

public Stream<ExecutionSummary> findAllSummaries() {
return findJobs().map(job -> new QueuedExecutionSummary(executor, job));
}
Expand Down Expand Up @@ -208,46 +220,93 @@ public JobExecutionResult process(Job job, JobExecutionContext context) {
}
});

while (!future.isDone()) {
if (context.isStopped() || Thread.currentThread().isInterrupted()) {
future.cancel(true);
LOG.debug("Execution is cancelling '{}'", queuedExecution);
break;
try {
Long abortStartTime = null;
while (!future.isDone()) {
if (context.isStopped()) {
if (abortStartTime == null) {
abortStartTime = System.currentTimeMillis();
jobAborted.put(job.getId(), Boolean.TRUE);

if (config.abortTimeout() < 0) {
LOG.debug("Execution is aborting gracefully '{}' (no timeout)", queuedExecution);
} else {
LOG.debug(
"Execution is aborting '{}' (timeout: {}ms)",
queuedExecution,
config.abortTimeout());
}
} else if (config.abortTimeout() >= 0) {
long abortDuration = System.currentTimeMillis() - abortStartTime;
if (abortDuration >= config.abortTimeout()) {
LOG.debug(
"Execution abort timeout exceeded ({}ms), forcing abort '{}'",
abortDuration,
queuedExecution);
future.cancel(true);
break;
}
}
}

try {
Thread.sleep(config.asyncPollInterval());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.debug("Execution is interrupted '{}'", queuedExecution);
return context.result().cancelled();
}
}

try {
Thread.sleep(config.asyncPollInterval());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.debug("Execution is interrupted '{}'", queuedExecution);
return context.result().cancelled();
Execution immediateExecution = future.get();

if (immediateExecution.getStatus() == ExecutionStatus.SKIPPED) {
LOG.debug("Execution skipped '{}'", immediateExecution);
return context.result()
.message(QueuedMessage.of(ExecutionStatus.SKIPPED, null)
.toJson())
.cancelled();
} else {
LOG.debug("Execution succeeded '{}'", immediateExecution);
return context.result().succeeded();
}
} catch (CancellationException e) {
LOG.debug("Execution aborted forcefully '{}'", queuedExecution);
return context.result()
.message(QueuedMessage.of(ExecutionStatus.ABORTED, ExceptionUtils.toString(e))
.toJson())
.cancelled();
} catch (Exception e) {
AbortException abortException = findAbortException(e);
if (abortException != null) {
LOG.debug("Execution aborted gracefully '{}'", queuedExecution);
return context.result()
.message(QueuedMessage.of(ExecutionStatus.ABORTED, ExceptionUtils.toString(abortException))
.toJson())
.cancelled();
}

LOG.debug("Execution failed '{}'", queuedExecution, e);
return context.result()
.message(QueuedMessage.of(ExecutionStatus.FAILED, ExceptionUtils.toString(e))
.toJson())
.failed();
}
} finally {
jobAborted.remove(job.getId());
}
}

try {
Execution immediateExecution = future.get();

if (immediateExecution.getStatus() == ExecutionStatus.SKIPPED) {
LOG.debug("Execution skipped '{}'", immediateExecution);
return context.result()
.message(QueuedMessage.of(ExecutionStatus.SKIPPED, null).toJson())
.cancelled();
} else {
LOG.info("Execution succeeded '{}'", immediateExecution);
return context.result().succeeded();
private AbortException findAbortException(Throwable e) {
Throwable current = e;
while (current != null) {
if (current instanceof AbortException) {
return (AbortException) current;
}
} catch (CancellationException e) {
LOG.warn("Execution aborted '{}'", queuedExecution);
return context.result()
.message(QueuedMessage.of(ExecutionStatus.ABORTED, ExceptionUtils.toString(e))
.toJson())
.cancelled();
} catch (Exception e) {
LOG.error("Execution failed '{}'", queuedExecution, e);
return context.result()
.message(QueuedMessage.of(ExecutionStatus.FAILED, ExceptionUtils.toString(e))
.toJson())
.failed();
current = current.getCause();
}
return null;
}

private Execution executeAsync(ExecutionContextOptions contextOptions, QueuedExecution execution)
Expand Down
14 changes: 12 additions & 2 deletions core/src/main/java/dev/vml/es/acm/core/code/Executor.java
Original file line number Diff line number Diff line change
Expand Up @@ -230,18 +230,28 @@ private ContextualExecution executeInternal(ExecutionContext context) {
context.getOut().withLoggerTimestamps(config.logPrintingTimestamps());
}
contentScript.run();

LOG.info("Execution succeeded '{}'", context.getId());
return execution.end(ExecutionStatus.SUCCEEDED);
} finally {
if (locking) {
useLocker(resolverFactory, l -> l.unlock(lockName));
}
}
} catch (Throwable e) {
} catch (AbortException e) {
LOG.warn("Execution aborted gracefully '{}'", context.getId());
execution.error(e);
return execution.end(ExecutionStatus.ABORTED);
} catch (Throwable e) {
if ((e.getCause() != null && e.getCause() instanceof InterruptedException)) {
LOG.warn("Execution aborted forcefully '{}'", context.getId());
execution.error(e);
return execution.end(ExecutionStatus.ABORTED);
} else {
LOG.error("Execution failed '{}'", context.getId(), e);
execution.error(e);
return execution.end(ExecutionStatus.FAILED);
}
return execution.end(ExecutionStatus.FAILED);
} finally {
statuses.remove(context.getId());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,5 @@ public static String toString(Throwable cause) {
.map(org.apache.commons.lang3.exception.ExceptionUtils::getStackTrace)
.orElse(null);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ content: |
println "Updating resources..."
def max = 20
for (int i = 0; i < max; i++) {
context.checkAborted()
Thread.sleep(1000)
println "Updated (\${i + 1}/\${max})"
}
Expand Down
23 changes: 20 additions & 3 deletions ui.frontend/src/components/ExecutionAbortButton.tsx
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import { Button, ButtonGroup, Content, Dialog, DialogTrigger, Divider, Heading, Text } from '@adobe/react-spectrum';
import { Button, ButtonGroup, Content, Dialog, DialogTrigger, Divider, Heading, InlineAlert, Text } from '@adobe/react-spectrum';
import { ToastQueue } from '@react-spectrum/toast';
import AlertIcon from '@spectrum-icons/workflow/Alert';
import Cancel from '@spectrum-icons/workflow/Cancel';
import CheckmarkCircle from '@spectrum-icons/workflow/CheckmarkCircle';
import CloseCircle from '@spectrum-icons/workflow/CloseCircle';
import StopCircle from '@spectrum-icons/workflow/StopCircle';
import React, { useState } from 'react';
import { useDeepCompareEffect } from 'react-use';
Expand Down Expand Up @@ -78,8 +81,22 @@ const ExecutionAbortButton: React.FC<ExecutionAbortButtonProps> = ({ execution,
</Heading>
<Divider />
<Content>
<p>This action will abort current code execution.</p>
<p>Be aware that aborting execution may leave data in an inconsistent state.</p>
<p>
<CheckmarkCircle size="XS" /> The abort request signals the script to stop, but the script must explicitly check for this signal by calling <code>context.checkAborted()</code>.
</p>
<p>
<CloseCircle size="XS" /> If the script doesn't check for abort, it will continue running until it completes naturally. Only if an abort timeout is configured (by default it's not), will the execution be forcefully terminated after the timeout expires.
</p>
<p>
<AlertIcon size="XS" /> For scripts with loops or long-running operations, add <code>context.checkAborted()</code> at safe checkpoints (e.g., at the beginning of each loop iteration) to enable graceful termination and prevent data corruption.
</p>

<InlineAlert width="100%" variant="negative" UNSAFE_style={{ padding: '8px' }} marginTop="size-200">
<Heading>Warning</Heading>
<Content>
Proceed with aborting only if the requirements above are met.
</Content>
</InlineAlert>
</Content>
<ButtonGroup>
<Button variant="secondary" onPress={() => setShowDialog(false)} isDisabled={isAborting}>
Expand Down