From 355e2739e568ba5daf05f1c598c10ee90bb86ecf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jacobo=20Coll=20Morag=C3=B3n?= Date: Fri, 9 May 2025 10:05:20 +0100 Subject: [PATCH 1/2] lib: Add async command execution. Allow stdin forwarding. #TASK-7524 --- .../java/org/opencb/commons/exec/Command.java | 172 +++++++++++------- .../commons/exec/NamedThreadFactory.java | 60 ++++++ .../opencb/commons/exec/RunnableProcess.java | 2 +- .../opencb/commons/exec/SingleProcess.java | 2 +- .../org/opencb/commons/exec/CommandTest.java | 35 ++++ 5 files changed, 207 insertions(+), 64 deletions(-) create mode 100644 commons-lib/src/main/java/org/opencb/commons/exec/NamedThreadFactory.java create mode 100644 commons-lib/src/test/java/org/opencb/commons/exec/CommandTest.java diff --git a/commons-lib/src/main/java/org/opencb/commons/exec/Command.java b/commons-lib/src/main/java/org/opencb/commons/exec/Command.java index efcbf7f2f..0e2294ac7 100755 --- a/commons-lib/src/main/java/org/opencb/commons/exec/Command.java +++ b/commons-lib/src/main/java/org/opencb/commons/exec/Command.java @@ -20,6 +20,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.DataOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -27,6 +28,10 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.stream.Collectors; public class Command extends RunnableProcess { @@ -35,6 +40,7 @@ public class Command extends RunnableProcess { // protected String pathScript; // protected String outDir; // protected Arguments arguments; + private static final int STDIN_BUFFER_SIZE = 128 * 1024; private String commandLine; private Map environment; @@ -47,6 +53,10 @@ public class Command extends RunnableProcess { private OutputStream outputOutputStream = null; private OutputStream errorOutputStream = null; + private static final ExecutorService EXECUTOR_SERVICE = Executors.newCachedThreadPool( + new NamedThreadFactory("command") + .setDaemon(true)); + private final String[] cmdArray; private boolean printOutput = true; @@ -73,8 +83,16 @@ public Command(String[] cmdArray, Map environment) { this.environment = environment; } + public Future async() { + return run(true); + } + @Override public void run() { + run(false); + } + + public Future run(boolean background) { try { startTime(); logger.debug(Commandline.describeCommand(cmdArray)); @@ -92,32 +110,19 @@ public void run() { } setStatus(Status.RUNNING); - InputStream is = proc.getInputStream(); - // Thread out = readOutputStream(is); - Thread readOutputStreamThread = readOutputStream(is); - InputStream es = proc.getErrorStream(); - // Thread err = readErrorStream(es); - Thread readErrorStreamThread = readErrorStream(es); - - proc.waitFor(); - readOutputStreamThread.join(); - readErrorStreamThread.join(); - endTime(); - - setExitValue(proc.exitValue()); - if (proc.exitValue() != 0) { - status = Status.ERROR; - // output = IOUtils.toString(proc.getInputStream()); - // error = IOUtils.toString(proc.getErrorStream()); - output = outputBuffer.toString(); - error = errorBuffer.toString(); - } - if (status != Status.KILLED && status != Status.TIMEOUT && status != Status.ERROR) { - status = Status.DONE; - // output = IOUtils.toString(proc.getInputStream()); - // error = IOUtils.toString(proc.getErrorStream()); - output = outputBuffer.toString(); - error = errorBuffer.toString(); + InputStream stdout = proc.getInputStream(); + Future readOutputStreamThread = readOutputStream(stdout); + InputStream stderr = proc.getErrorStream(); + Future readErrorStreamThread = readErrorStream(stderr); + + if (background) { + // Wait in the background + return EXECUTOR_SERVICE.submit(() -> { + waitFor(readOutputStreamThread, readErrorStreamThread); + return getStatus(); + }); + } else { + waitFor(readOutputStreamThread, readErrorStreamThread); } } catch (RuntimeException | IOException | InterruptedException e) { @@ -129,67 +134,97 @@ public void run() { exitValue = -1; logger.error("Exception occurred while executing Command " + exception, e); } + return null; + } + + private void waitFor(Future readOutputStreamThread, Future readErrorStreamThread) throws InterruptedException { + proc.waitFor(); + try { + readOutputStreamThread.get(); + readErrorStreamThread.get(); + } catch (ExecutionException e) { + throw new RuntimeException(e); + } + endTime(); + + setExitValue(proc.exitValue()); + if (proc.exitValue() != 0) { + status = Status.ERROR; + // output = IOUtils.toString(proc.getInputStream()); + // error = IOUtils.toString(proc.getErrorStream()); + output = outputBuffer.toString(); + error = errorBuffer.toString(); + } + if (status != Status.KILLED && status != Status.TIMEOUT && status != Status.ERROR) { + status = Status.DONE; + // output = IOUtils.toString(proc.getInputStream()); + // error = IOUtils.toString(proc.getErrorStream()); + output = outputBuffer.toString(); + error = errorBuffer.toString(); + } } @Override public void destroy() { - if (proc != null) { + if (proc != null && proc.isAlive()) { proc.destroy(); setStatus(Status.KILLED); } } - private Thread readOutputStream(InputStream ins) throws IOException { + private Future readOutputStream(InputStream ins) throws IOException { return readStream("stdout", outputOutputStream, outputBuffer, ins); } - private Thread readErrorStream(InputStream ins) throws IOException { + private Future readErrorStream(InputStream ins) throws IOException { return readStream("stderr", errorOutputStream, errorBuffer, ins); } - private Thread readStream(String outputName, OutputStream outputStream, StringBuffer stringBuffer, InputStream in) { - Thread thread = new Thread(() -> { + private Future readStream(String outputName, OutputStream outputStream, StringBuffer stringBuffer, InputStream in) { + return EXECUTOR_SERVICE.submit(() -> { try { - int bytesRead = 0; - int bufferLength; - byte[] buffer; + int bytesRead = 0; + int bufferLength; + byte[] buffer; - while (bytesRead != -1) { - // int x=in.available(); - // if (x<=0) - // continue ; + while (bytesRead != -1) { + // int x=in.available(); + // if (x<=0) + // continue ; - bufferLength = in.available(); - bufferLength = Math.max(bufferLength, 1); + bufferLength = in.available(); + bufferLength = Math.max(bufferLength, 1); - buffer = new byte[bufferLength]; - bytesRead = in.read(buffer, 0, bufferLength); + buffer = new byte[bufferLength]; + bytesRead = in.read(buffer, 0, bufferLength); - if (bytesRead == 0) { - Thread.sleep(500); + if (bytesRead == 0) { + Thread.sleep(500); + if (logger.isTraceEnabled()) { logger.trace(outputName + " - Sleep"); - } else if (bytesRead > 0) { + } + } else if (bytesRead > 0) { + if (logger.isTraceEnabled()) { logger.trace(outputName + " - last bytesRead = {})", bytesRead); - if (printOutput) { - System.err.print(new String(buffer)); - } - - if (outputStream == null) { - stringBuffer.append(new String(buffer)); - } else { - outputStream.write(buffer); - outputStream.flush(); - } + } + if (printOutput) { + System.err.print(new String(buffer)); + } + + if (outputStream == null) { + stringBuffer.append(new String(buffer)); + } else { + outputStream.write(buffer); + outputStream.flush(); } } - logger.debug("Read {} - Exit while", outputName); - } catch (Exception ex) { - logger.error("Error reading " + outputName, ex); - exception = ex.toString(); } - }, outputName + "_reader"); - thread.start(); - return thread; + logger.debug("Read {} - Exit while", outputName); + } catch (Exception ex) { + logger.error("Error reading " + outputName, ex); + exception = ex.toString(); + } + }); } /** @@ -275,4 +310,17 @@ public Command setPrintOutput(boolean printOutput) { this.printOutput = printOutput; return this; } + + public Process getProc() { + return proc; + } + + public DataOutputStream getStdin() { + if (status == Status.RUNNING) { + return new DataOutputStream(proc.getOutputStream()); + } else { + throw new IllegalStateException("Process is not running. Process status: " + status); + } + } + } diff --git a/commons-lib/src/main/java/org/opencb/commons/exec/NamedThreadFactory.java b/commons-lib/src/main/java/org/opencb/commons/exec/NamedThreadFactory.java new file mode 100644 index 000000000..fee4cb302 --- /dev/null +++ b/commons-lib/src/main/java/org/opencb/commons/exec/NamedThreadFactory.java @@ -0,0 +1,60 @@ +package org.opencb.commons.exec; + +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicInteger; + +public class NamedThreadFactory implements ThreadFactory { + + private static final AtomicInteger POOL_NUMBER = new AtomicInteger(1); + private final ThreadGroup group; + private final AtomicInteger threadNumber = new AtomicInteger(1); + private final String prefix; + private Integer priority; + private boolean daemon; + + public NamedThreadFactory(String namePrefix) { + SecurityManager s = System.getSecurityManager(); + group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); + prefix = namePrefix + "-" + + POOL_NUMBER.getAndIncrement() + + "-thread-"; + priority = Thread.NORM_PRIORITY; + daemon = false; + } + + public NamedThreadFactory setPriority(Integer priority) { + this.priority = priority; + return this; + } + + /** + * Marks this thread as either a {@linkplain #isDaemon daemon} thread + * or a user thread. The Java Virtual Machine exits when the only + * threads running are all daemon threads. + * + * @param daemon true to mark this thread as a daemon thread; + * @return this thread factory + */ + public NamedThreadFactory setDaemon(boolean daemon) { + this.daemon = daemon; + return this; + } + + public boolean isDaemon() { + return daemon; + } + + @Override + public Thread newThread(Runnable r) { + Thread t = new Thread(group, r, prefix + threadNumber.getAndIncrement()); + + t.setDaemon(daemon); + + if (priority != null) { + if (t.getPriority() != priority) { + t.setPriority(priority); + } + } + return t; + } +} diff --git a/commons-lib/src/main/java/org/opencb/commons/exec/RunnableProcess.java b/commons-lib/src/main/java/org/opencb/commons/exec/RunnableProcess.java index 6897fad4a..0e19f6f30 100755 --- a/commons-lib/src/main/java/org/opencb/commons/exec/RunnableProcess.java +++ b/commons-lib/src/main/java/org/opencb/commons/exec/RunnableProcess.java @@ -28,7 +28,7 @@ public abstract class RunnableProcess implements Runnable { protected String exception; protected int exitValue; - protected Status status; + protected Status status = Status.WAITING; public enum Status {WAITING, RUNNING, DONE, ERROR, TIMEOUT, KILLED} diff --git a/commons-lib/src/main/java/org/opencb/commons/exec/SingleProcess.java b/commons-lib/src/main/java/org/opencb/commons/exec/SingleProcess.java index f5dc7073e..7a641f622 100755 --- a/commons-lib/src/main/java/org/opencb/commons/exec/SingleProcess.java +++ b/commons-lib/src/main/java/org/opencb/commons/exec/SingleProcess.java @@ -22,7 +22,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; - +@Deprecated public class SingleProcess { private Logger logger; diff --git a/commons-lib/src/test/java/org/opencb/commons/exec/CommandTest.java b/commons-lib/src/test/java/org/opencb/commons/exec/CommandTest.java new file mode 100644 index 000000000..d1b8fd718 --- /dev/null +++ b/commons-lib/src/test/java/org/opencb/commons/exec/CommandTest.java @@ -0,0 +1,35 @@ +package org.opencb.commons.exec; + +import org.junit.Test; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertEquals; + +public class CommandTest { + + + @Test + public void testCommand() throws Exception { + // Test the Command class +// String commandLine = "bash -c 'echo Hello World ; echo Error Message >&2 ; cat -'"; +// String commandLine = "cat"; + String commandLine = "bash -c 'cat <(echo -n \"Hello - \") - <(echo -n \" - END\")'"; + Command command = new Command(commandLine); + command.setPrintOutput(false); + + // Execute the command + Future future = command.async(); + DataOutputStream stdin = command.getStdin(); + stdin.write("World from STDIN".getBytes()); + stdin.close(); + future.get(1000, TimeUnit.MILLISECONDS); + assertEquals(RunnableProcess.Status.DONE, command.getStatus()); + assertEquals("Hello - World from STDIN - END", command.getOutput()); + assertEquals("", command.getError()); + } + +} \ No newline at end of file From 64fbb5fa59d3490e250257be0960ded85c9ea998 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jacobo=20Coll=20Morag=C3=B3n?= Date: Fri, 9 May 2025 10:09:01 +0100 Subject: [PATCH 2/2] lib: Remove unused constant #TASK-7524 --- commons-lib/src/main/java/org/opencb/commons/exec/Command.java | 1 - 1 file changed, 1 deletion(-) diff --git a/commons-lib/src/main/java/org/opencb/commons/exec/Command.java b/commons-lib/src/main/java/org/opencb/commons/exec/Command.java index 0e2294ac7..dd9e7182e 100755 --- a/commons-lib/src/main/java/org/opencb/commons/exec/Command.java +++ b/commons-lib/src/main/java/org/opencb/commons/exec/Command.java @@ -40,7 +40,6 @@ public class Command extends RunnableProcess { // protected String pathScript; // protected String outDir; // protected Arguments arguments; - private static final int STDIN_BUFFER_SIZE = 128 * 1024; private String commandLine; private Map environment;