Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
171 changes: 109 additions & 62 deletions commons-lib/src/main/java/org/opencb/commons/exec/Command.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,18 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;

public class Command extends RunnableProcess {
Expand All @@ -47,6 +52,10 @@ public class Command extends RunnableProcess {
private OutputStream outputOutputStream = null;
private OutputStream errorOutputStream = null;

private static final ExecutorService EXECUTOR_SERVICE = Executors.newCachedThreadPool(
new NamedThreadFactory("command")
.setDaemon(true));

private final String[] cmdArray;
private boolean printOutput = true;

Expand All @@ -73,8 +82,16 @@ public Command(String[] cmdArray, Map<String, String> environment) {
this.environment = environment;
}

public Future<Status> async() {
return run(true);
}

@Override
public void run() {
run(false);
}

public Future<Status> run(boolean background) {
try {
startTime();
logger.debug(Commandline.describeCommand(cmdArray));
Expand All @@ -92,32 +109,19 @@ public void run() {
}
setStatus(Status.RUNNING);

InputStream is = proc.getInputStream();
// Thread out = readOutputStream(is);
Thread readOutputStreamThread = readOutputStream(is);
InputStream es = proc.getErrorStream();
// Thread err = readErrorStream(es);
Thread readErrorStreamThread = readErrorStream(es);

proc.waitFor();
readOutputStreamThread.join();
readErrorStreamThread.join();
endTime();

setExitValue(proc.exitValue());
if (proc.exitValue() != 0) {
status = Status.ERROR;
// output = IOUtils.toString(proc.getInputStream());
// error = IOUtils.toString(proc.getErrorStream());
output = outputBuffer.toString();
error = errorBuffer.toString();
}
if (status != Status.KILLED && status != Status.TIMEOUT && status != Status.ERROR) {
status = Status.DONE;
// output = IOUtils.toString(proc.getInputStream());
// error = IOUtils.toString(proc.getErrorStream());
output = outputBuffer.toString();
error = errorBuffer.toString();
InputStream stdout = proc.getInputStream();
Future<?> readOutputStreamThread = readOutputStream(stdout);
InputStream stderr = proc.getErrorStream();
Future<?> readErrorStreamThread = readErrorStream(stderr);

if (background) {
// Wait in the background
return EXECUTOR_SERVICE.submit(() -> {
waitFor(readOutputStreamThread, readErrorStreamThread);
return getStatus();
});
} else {
waitFor(readOutputStreamThread, readErrorStreamThread);
}

} catch (RuntimeException | IOException | InterruptedException e) {
Expand All @@ -129,67 +133,97 @@ public void run() {
exitValue = -1;
logger.error("Exception occurred while executing Command " + exception, e);
}
return null;
}

private void waitFor(Future<?> readOutputStreamThread, Future<?> readErrorStreamThread) throws InterruptedException {
proc.waitFor();
try {
readOutputStreamThread.get();
readErrorStreamThread.get();
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
endTime();

setExitValue(proc.exitValue());
if (proc.exitValue() != 0) {
status = Status.ERROR;
// output = IOUtils.toString(proc.getInputStream());
// error = IOUtils.toString(proc.getErrorStream());
output = outputBuffer.toString();
error = errorBuffer.toString();
}
if (status != Status.KILLED && status != Status.TIMEOUT && status != Status.ERROR) {
status = Status.DONE;
// output = IOUtils.toString(proc.getInputStream());
// error = IOUtils.toString(proc.getErrorStream());
output = outputBuffer.toString();
error = errorBuffer.toString();
}
}

@Override
public void destroy() {
if (proc != null) {
if (proc != null && proc.isAlive()) {
proc.destroy();
setStatus(Status.KILLED);
}
}

private Thread readOutputStream(InputStream ins) throws IOException {
private Future<?> readOutputStream(InputStream ins) throws IOException {
return readStream("stdout", outputOutputStream, outputBuffer, ins);
}

private Thread readErrorStream(InputStream ins) throws IOException {
private Future<?> readErrorStream(InputStream ins) throws IOException {
return readStream("stderr", errorOutputStream, errorBuffer, ins);
}

private Thread readStream(String outputName, OutputStream outputStream, StringBuffer stringBuffer, InputStream in) {
Thread thread = new Thread(() -> {
private Future<?> readStream(String outputName, OutputStream outputStream, StringBuffer stringBuffer, InputStream in) {
return EXECUTOR_SERVICE.submit(() -> {
try {
int bytesRead = 0;
int bufferLength;
byte[] buffer;
int bytesRead = 0;
int bufferLength;
byte[] buffer;

while (bytesRead != -1) {
// int x=in.available();
// if (x<=0)
// continue ;
while (bytesRead != -1) {
// int x=in.available();
// if (x<=0)
// continue ;

bufferLength = in.available();
bufferLength = Math.max(bufferLength, 1);
bufferLength = in.available();
bufferLength = Math.max(bufferLength, 1);

buffer = new byte[bufferLength];
bytesRead = in.read(buffer, 0, bufferLength);
buffer = new byte[bufferLength];
bytesRead = in.read(buffer, 0, bufferLength);

if (bytesRead == 0) {
Thread.sleep(500);
if (bytesRead == 0) {
Thread.sleep(500);
if (logger.isTraceEnabled()) {
logger.trace(outputName + " - Sleep");
} else if (bytesRead > 0) {
}
} else if (bytesRead > 0) {
if (logger.isTraceEnabled()) {
logger.trace(outputName + " - last bytesRead = {})", bytesRead);
if (printOutput) {
System.err.print(new String(buffer));
}

if (outputStream == null) {
stringBuffer.append(new String(buffer));
} else {
outputStream.write(buffer);
outputStream.flush();
}
}
if (printOutput) {
System.err.print(new String(buffer));
}

if (outputStream == null) {
stringBuffer.append(new String(buffer));
} else {
outputStream.write(buffer);
outputStream.flush();
}
}
logger.debug("Read {} - Exit while", outputName);
} catch (Exception ex) {
logger.error("Error reading " + outputName, ex);
exception = ex.toString();
}
}, outputName + "_reader");
thread.start();
return thread;
logger.debug("Read {} - Exit while", outputName);
} catch (Exception ex) {
logger.error("Error reading " + outputName, ex);
exception = ex.toString();
}
});
}

/**
Expand Down Expand Up @@ -275,4 +309,17 @@ public Command setPrintOutput(boolean printOutput) {
this.printOutput = printOutput;
return this;
}

public Process getProc() {
return proc;
}

public DataOutputStream getStdin() {
if (status == Status.RUNNING) {
return new DataOutputStream(proc.getOutputStream());
} else {
throw new IllegalStateException("Process is not running. Process status: " + status);
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package org.opencb.commons.exec;

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class NamedThreadFactory implements ThreadFactory {

private static final AtomicInteger POOL_NUMBER = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String prefix;
private Integer priority;
private boolean daemon;

public NamedThreadFactory(String namePrefix) {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
prefix = namePrefix + "-"
+ POOL_NUMBER.getAndIncrement()
+ "-thread-";
priority = Thread.NORM_PRIORITY;
daemon = false;
}

public NamedThreadFactory setPriority(Integer priority) {
this.priority = priority;
return this;
}

/**
* Marks this thread as either a {@linkplain #isDaemon daemon} thread
* or a user thread. The Java Virtual Machine exits when the only
* threads running are all daemon threads.
*
* @param daemon true to mark this thread as a daemon thread;
* @return this thread factory
*/
public NamedThreadFactory setDaemon(boolean daemon) {
this.daemon = daemon;
return this;
}

public boolean isDaemon() {
return daemon;
}

@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r, prefix + threadNumber.getAndIncrement());

t.setDaemon(daemon);

if (priority != null) {
if (t.getPriority() != priority) {
t.setPriority(priority);
}
}
return t;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public abstract class RunnableProcess implements Runnable {
protected String exception;
protected int exitValue;

protected Status status;
protected Status status = Status.WAITING;

public enum Status {WAITING, RUNNING, DONE, ERROR, TIMEOUT, KILLED}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;


@Deprecated
public class SingleProcess {

private Logger logger;
Expand Down
35 changes: 35 additions & 0 deletions commons-lib/src/test/java/org/opencb/commons/exec/CommandTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package org.opencb.commons.exec;

import org.junit.Test;

import java.io.DataOutputStream;
import java.io.IOException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import static org.junit.Assert.assertEquals;

public class CommandTest {


@Test
public void testCommand() throws Exception {
// Test the Command class
// String commandLine = "bash -c 'echo Hello World ; echo Error Message >&2 ; cat -'";
// String commandLine = "cat";
String commandLine = "bash -c 'cat <(echo -n \"Hello - \") - <(echo -n \" - END\")'";
Command command = new Command(commandLine);
command.setPrintOutput(false);

// Execute the command
Future<RunnableProcess.Status> future = command.async();
DataOutputStream stdin = command.getStdin();
stdin.write("World from STDIN".getBytes());
stdin.close();
future.get(1000, TimeUnit.MILLISECONDS);
assertEquals(RunnableProcess.Status.DONE, command.getStatus());
assertEquals("Hello - World from STDIN - END", command.getOutput());
assertEquals("", command.getError());
}

}
Loading