Skip to content

Commit

Permalink
[JBPM-9334] Process execution stuck at ExecWorkItemHandler node if co…
Browse files Browse the repository at this point in the history
…mmand is long running
  • Loading branch information
abhijithumbe committed Sep 13, 2020
1 parent b7efa41 commit c5cebe6
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 49 deletions.
Expand Up @@ -17,9 +17,13 @@

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.time.Duration;
import java.time.OffsetDateTime;
import java.time.Period;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.apache.commons.exec.CommandLine;
import org.apache.commons.exec.DefaultExecutor;
Expand Down Expand Up @@ -48,7 +52,8 @@
parameters = {
@WidParameter(name = "Command", required = true),
@WidParameter(name = "Arguments", runtimeType = "java.util.List"),
@WidParameter(name = "CommandExecutionTimeout", runtimeType = "java.lang.String")
@WidParameter(name = "CommandExecutionTimeout", runtimeType = "java.lang.String"),
@WidParameter(name = "TimeoutInSeconds", runtimeType = "java.lang.String")
},
results = {
@WidResult(name = "Output")
Expand All @@ -66,7 +71,7 @@ public class ExecWorkItemHandler extends AbstractLogOrThrowWorkItemHandler {
private static final Logger logger = LoggerFactory.getLogger(ExecWorkItemHandler.class);
public static final String RESULT = "Output";
private String parsedCommandStr = "";
private long defaultTimeoutInSeconds=0;
private long defaultTimeout = 4L;

public void executeWorkItem(WorkItem workItem,
WorkItemManager manager) {
Expand All @@ -78,47 +83,18 @@ public void executeWorkItem(WorkItem workItem,

String command = (String) workItem.getParameter("Command");
List<String> arguments = (List<String>) workItem.getParameter("Arguments");
String commandExecutionTimeout = (String) workItem.getParameter("CommandExecutionTimeout");
String commandExecutionTimeout = (String) workItem.getParameter("TimeoutInSeconds");

if (commandExecutionTimeout ==null) {
this.SetDefaultTimeoutInSeconds(4L);
} else {
this.SetDefaultTimeoutInSeconds(Long.parseLong(commandExecutionTimeout));
if (commandExecutionTimeout != null) {
this.setDefaultTimeoutInSeconds(parsetimeout(commandExecutionTimeout));
}

Map<String, Object> results = new HashMap<>();
String executionResult = executecommand(command, arguments, defaultTimeout);

CommandLine commandLine = CommandLine.parse(command);
if (arguments != null && arguments.size() > 0) {
commandLine.addArguments(arguments.toArray(new String[0]),
true);
}

parsedCommandStr = commandLine.toString();

ExecuteWatchdog watchdog = new ExecuteWatchdog(java.time.Duration.ofSeconds(defaultTimeoutInSeconds).toMillis());
DefaultExecutor executor = new DefaultExecutor();
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
PumpStreamHandler streamHandler = new PumpStreamHandler(outputStream);
executor.setStreamHandler(streamHandler);
executor.setWatchdog(watchdog);
try {
executor.execute(commandLine);
} catch (IOException e) {
if (watchdog.killedProcess()) {
logger.debug("A timeout occured after "+java.time.Duration.ofSeconds(defaultTimeoutInSeconds).toMillis()+"ms while executing a command "+parsedCommandStr.replace(",", ""));
results.put(RESULT, outputStream.toString());
outputStream.close();
manager.completeWorkItem(workItem.getId(), results);
} else {
throw e;
}
}
Map<String, Object> results = new HashMap<>();

results.put(RESULT,
outputStream.toString());

outputStream.close();
executionResult);

manager.completeWorkItem(workItem.getId(),
results);
Expand All @@ -127,13 +103,70 @@ public void executeWorkItem(WorkItem workItem,
}
}

protected long parsetimeout(String durationStr) {
try {
if (durationStr.startsWith("PT")) { // ISO-8601 PTnHnMn.nS
return TimeUnit.MILLISECONDS.toSeconds(Duration.parse(durationStr).toMillis());
} else if (!durationStr.contains("T")) { // ISO-8601 PnYnMnWnD
Period period = Period.parse(durationStr);
OffsetDateTime now = OffsetDateTime.now();
return TimeUnit.MILLISECONDS.toSeconds(Duration.between(now, now.plus(period)).toMillis());
} else { // ISO-8601 PnYnMnWnDTnHnMn.nS
String[] elements = durationStr.split("T");
Period period = Period.parse(elements[0]);
Duration duration = Duration.parse("PT" + elements[1]);
OffsetDateTime now = OffsetDateTime.now();

return TimeUnit.MILLISECONDS
.toSeconds(Duration.between(now, now.plus(period).plus(duration)).toMillis());
}
} catch (Exception e) {
logger.error("Exception occured while parsing provided timeout" + durationStr
+ ".Default timeout will be used for command execution");
return defaultTimeout;
}
}

protected String executecommand(String command, List<String> arguments, long timeout) throws IOException {

String result = null;
CommandLine commandLine = CommandLine.parse(command);
if (arguments != null && arguments.size() > 0) {
commandLine.addArguments(arguments.toArray(new String[0]), true);
}

parsedCommandStr = commandLine.toString();

ExecuteWatchdog watchdog = new ExecuteWatchdog(Duration.ofSeconds(defaultTimeout).toMillis());
DefaultExecutor executor = new DefaultExecutor();
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
PumpStreamHandler streamHandler = new PumpStreamHandler(outputStream);
executor.setStreamHandler(streamHandler);
executor.setWatchdog(watchdog);
try {
executor.execute(commandLine);
} catch (IOException e) {
if (watchdog.killedProcess()) {
logger.error("A timeout occured after " + Duration.ofSeconds(defaultTimeout)
+ "sec while executing a command " + parsedCommandStr.replace(",", ""));
outputStream.reset();
outputStream.close();
return result = "A timeout occured after " + Duration.ofSeconds(defaultTimeout)
+ "sec while executing a command " + parsedCommandStr.replace(",", "");
}
}

return outputStream.toString();

}

public void abortWorkItem(WorkItem workItem,
WorkItemManager manager) {
// Do nothing, this work item cannot be aborted
}

public void SetDefaultTimeoutInSeconds(long defaultTimeoutInSeconds) {
this.defaultTimeoutInSeconds = defaultTimeoutInSeconds;
public void setDefaultTimeoutInSeconds(long defaultTimeout) {
this.defaultTimeout = defaultTimeout;
}

public String getParsedCommandStr() {
Expand Down
Expand Up @@ -86,32 +86,38 @@ public void testExecCommandWithArguments() throws Exception {
assertTrue(result.contains("java version") || result.contains("jdk version"));
}

@Test(timeout=6000)
@Test(timeout = 6000)
public void testExecCommandWithTimeout() throws Exception {

TestWorkItemManager manager = new TestWorkItemManager();
WorkItemImpl workItem = new WorkItemImpl();
workItem.setParameter("Command", "ping");
workItem.setParameter("Command",
"ping");
List<String> argumentList = new ArrayList<>();
argumentList.add("127.0.0.1");
workItem.setParameter("Arguments", argumentList);
workItem.setParameter("CommandExecutionTimeout", "5");
workItem.setParameter("Arguments",
argumentList);
workItem.setParameter("TimeoutInSeconds",
"PT5S");
ExecWorkItemHandler handler = new ExecWorkItemHandler();
handler.setLogThrownException(true);

handler.executeWorkItem(workItem, manager);

assertNotNull(manager.getResults());
assertEquals(1, manager.getResults().size());
handler.executeWorkItem(workItem,
manager);

assertNotNull(manager.getResults());
assertEquals(1,
manager.getResults().size());
assertTrue(manager.getResults().containsKey(workItem.getId()));

Map<String, Object> results = ((TestWorkItemManager) manager).getResults(workItem.getId());
String result = (String) results.get(ExecWorkItemHandler.RESULT);

assertEquals("[ping, 127.0.0.1]", handler.getParsedCommandStr());
assertEquals("[ping, 127.0.0.1]",
handler.getParsedCommandStr());

assertNotNull(result);
assertTrue(result.contains("time="));
assertTrue(result.contains("A timeout occured"));

}

Expand Down

0 comments on commit c5cebe6

Please sign in to comment.