Skip to content

Commit

Permalink
Merge pull request #96 from rchodava/ic/process-runner-listener
Browse files Browse the repository at this point in the history
Add process listener to be able to notify without having to wait for …
  • Loading branch information
rchodava committed Nov 5, 2016
2 parents e1e1c46 + ddaadc1 commit ba1529c
Show file tree
Hide file tree
Showing 9 changed files with 153 additions and 55 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package foundation.stack.datamill;

/**
* @author Israel Colomer (israelcolomer@gmail.com)
*/
public interface ProcessOutputListener {
void output(String message, boolean errorStream);
}
131 changes: 86 additions & 45 deletions core/src/main/java/foundation/stack/datamill/ProcessRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.google.common.util.concurrent.MoreExecutors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.Marker;

import java.io.BufferedReader;
import java.io.File;
Expand Down Expand Up @@ -42,52 +43,12 @@ public Thread newThread(Runnable r) {
}
}));

public static void runProcess(File workingDirectory, String... command) throws IOException {
runProcess(workingDirectory, false, command);
public static ExecutionBuilder run(String... command) {
return new ExecutionBuilder(new ProcessBuilder().command(command));
}

public static void runProcess(File workingDirectory, boolean bufferOutput, String... command) throws IOException {
logger.debug("{}", Joiner.on(' ').join(command));

Process process = new ProcessBuilder().directory(workingDirectory).command(command).start();

try {
readLinesFromStream(process.getInputStream(), bufferOutput);
readLinesFromStream(process.getErrorStream(), bufferOutput);
} catch (InterruptedException e) {
throw new IOException(e);
}
}

public static ExecutionResult runProcessAndWait(File workingDirectory, String... command) throws IOException {
return runProcessAndWait(workingDirectory, false, command);
}

public static ExecutionResult runProcessAndWait(File workingDirectory, boolean bufferOutput, String... command)
throws IOException {
logger.debug("{}", Joiner.on(' ').join(command));

Process process = new ProcessBuilder().directory(workingDirectory).command(command).start();

try {
ListenableFuture<List<String>> standardOutputFuture, standardErrorFuture;

standardOutputFuture = readLinesFromStream(process.getInputStream(), bufferOutput);
standardErrorFuture = readLinesFromStream(process.getErrorStream(), bufferOutput);

int exitCode = process.waitFor();

if (bufferOutput) {
List<List<String>> results = Futures.allAsList(standardOutputFuture, standardErrorFuture).get(1, TimeUnit.SECONDS);
return new ExecutionResult(exitCode, results.size() > 0 ? results.get(0) : null, results.size() > 1 ? results.get(1) : null);
}
return new ExecutionResult(exitCode);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
throw new IOException(e);
}
}

private static ListenableFuture<List<String>> readLinesFromStream(InputStream inputStream, boolean bufferOutput)
private static ListenableFuture<List<String>> readLinesFromStream(
InputStream inputStream, boolean errorStream, Marker logMarker, boolean bufferOutput, ProcessOutputListener listener)
throws InterruptedException {
BufferedReader processOutput = new BufferedReader(new InputStreamReader(inputStream));
List<String> output = new CopyOnWriteArrayList<>();
Expand All @@ -101,7 +62,11 @@ private static ListenableFuture<List<String>> readLinesFromStream(InputStream in
output.add(line);
}

logger.debug(line);
if (listener != null) {
listener.output(line, errorStream);
}

logger.debug(logMarker, line);
}
} while (line != null && !Thread.interrupted());
} catch (IOException e) {
Expand All @@ -110,6 +75,82 @@ private static ListenableFuture<List<String>> readLinesFromStream(InputStream in
});
}

public static class ExecutionBuilder {
private final ProcessBuilder builder;
private boolean bufferOutput;
private ProcessOutputListener outputListener;
private Marker logMarker;

public ExecutionBuilder(ProcessBuilder builder) {
this.builder = builder;
}

public ExecutionBuilder workingDirectory(File workingDirectory) {
builder.directory(workingDirectory);
return this;
}

public ExecutionBuilder bufferOutput(boolean bufferOutput) {
this.bufferOutput = bufferOutput;
return this;
}

public ExecutionBuilder logMarker(Marker logMarker) {
this.logMarker = logMarker;
return this;
}

public ExecutionBuilder outputListener(ProcessOutputListener outputListener) {
this.outputListener = outputListener;
return this;
}

public ExecutionResult runAndWait() throws IOException {
logger.debug(logMarker, "{}", Joiner.on(' ').join(builder.command()));

Process process = builder.start();

try {
ListenableFuture<List<String>> standardOutputFuture, standardErrorFuture;

standardOutputFuture = readLinesFromStream(process.getInputStream(), false, logMarker, bufferOutput, outputListener);
standardErrorFuture = readLinesFromStream(process.getErrorStream(), true, logMarker, bufferOutput, outputListener);

int exitCode = process.waitFor();

if (bufferOutput) {
List<List<String>> results = Futures.allAsList(standardOutputFuture, standardErrorFuture)
.get(1, TimeUnit.SECONDS);
if (exitCode != 0 && results.size() == 2) {
logger.debug(logMarker, "Exit code was {} with standard error: {}", exitCode, results.get(1));
}

return new ExecutionResult(
exitCode,
results.size() > 0 ? results.get(0) : null,
results.size() > 1 ? results.get(1) : null);
}

return new ExecutionResult(exitCode);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
throw new IOException(e);
}
}

public void run() throws IOException {
logger.debug(logMarker, "{}", Joiner.on(' ').join(builder.command()));

Process process = builder.start();

try {
readLinesFromStream(process.getInputStream(), false, logMarker, bufferOutput, outputListener);
readLinesFromStream(process.getErrorStream(), true, logMarker, bufferOutput, outputListener);
} catch (InterruptedException e) {
throw new IOException(e);
}
}
}

public static class ExecutionResult {
private final int exitCode;
private final List<String> bufferedStandardOutput;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,8 +216,14 @@ private void sendResponse(ChannelHandlerContext context, HttpRequest originalReq
sendContent(context, buffer);
}
})
.finallyDo(() -> {
sendResponseEnd(context, originalRequest);
.doAfterTerminate(() -> {
if (first[0]) {
sendFullResponse(context, originalRequest,
serverResponse.status().getCode(),
serverResponse.headers());
} else {
sendResponseEnd(context, originalRequest);
}
}).subscribe();
}
});
Expand Down
12 changes: 12 additions & 0 deletions core/src/main/java/foundation/stack/datamill/json/JsonArray.java
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,18 @@ public Object asObject(Class<?> type) {
return this;
}

public List<JsonObject> asJsonObjects() {
int length = array.length();
ArrayList<JsonObject> objects = new ArrayList<>(length);

for (int i = 0; i < length; i++) {
JSONObject object = array.getJSONObject(i);
objects.add(new JsonObject(object));
}

return objects;
}

@Override
public boolean isBoolean() {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ protected Object defaultCase(JSONObject value1, String value2, JsonProperty valu

final JSONObject object;

private JsonObject(JSONObject object) {
JsonObject(JSONObject object) {
this.object = object;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public class ProcessRunnerTest {
@Test
public void runProcessAndWait_ReturnsExpectedResults_OnSuccessfulCommandExecution() throws IOException {
String[] command = new String[] {"java", "-version"};
ProcessRunner.ExecutionResult executionResult = ProcessRunner.runProcessAndWait(null, true, command);
ProcessRunner.ExecutionResult executionResult = ProcessRunner.run(command).bufferOutput(true).runAndWait();
assertEquals(executionResult.getExitCode(), 0);
// It is counter intuitive, but jdk uses standard error instead of standard output for showing version
assertEquals(3, executionResult.getBufferedStandardError().size());
Expand All @@ -31,7 +31,7 @@ public void runProcessAndWait_ReturnsExpectedResults_OnSuccessfulCommandExecutio
@Test
public void runProcessAndWait_NoOutput_OnSuccessfulCommandExecution() throws IOException {
String[] command = new String[] {"java", "-version"};
ProcessRunner.ExecutionResult executionResult = ProcessRunner.runProcessAndWait(null, false, command);
ProcessRunner.ExecutionResult executionResult = ProcessRunner.run(command).runAndWait();
assertEquals(executionResult.getExitCode(), 0);
assertNull(executionResult.getBufferedStandardError());
assertNull(executionResult.getBufferedStandardOutput());
Expand All @@ -41,7 +41,8 @@ public void runProcessAndWait_NoOutput_OnSuccessfulCommandExecution() throws IOE
public void runProcessAndWait_ReturnsExpectedResults_OnFailingCommandExecution() throws IOException {
Path tmpDir = Paths.get(System.getProperty("java.io.tmpdir"));
String[] command = new String[] {"git", "status"};
ProcessRunner.ExecutionResult executionResult = ProcessRunner.runProcessAndWait(tmpDir.toFile(), true, command);
ProcessRunner.ExecutionResult executionResult = ProcessRunner.run(command)
.bufferOutput(true).workingDirectory(tmpDir.toFile()).runAndWait();
assertTrue(executionResult.getExitCode() != 0);
assertTrue(executionResult.getBufferedStandardOutput().isEmpty());
assertEquals("fatal: Not a git repository (or any of the parent directories): .git", executionResult.getBufferedStandardError().get(0));
Expand All @@ -50,12 +51,12 @@ public void runProcessAndWait_ReturnsExpectedResults_OnFailingCommandExecution()
@Test
public void runProcess_PerformsAsExpected_OnSuccessfulCommandExecution() throws IOException {
Path tempDir = Paths.get(System.getProperty("java.io.tmpdir"), File.separator, "test");
ProcessRunner.runProcess(null, "mkdir", tempDir.toString());
ProcessRunner.run("mkdir", tempDir.toString()).run();

boolean runningOnWindows = runningOnWindows();
String[] command = runningOnWindows ? new String[] {"dir", tempDir.toString()} : new String[] {"ls", "-la", tempDir.toString()};

ProcessRunner.ExecutionResult executionResult = ProcessRunner.runProcessAndWait(null, true, command);
ProcessRunner.ExecutionResult executionResult = ProcessRunner.run(command).bufferOutput(true).runAndWait();
assertEquals(executionResult.getExitCode(), 0);
assertTrue(executionResult.getBufferedStandardError().isEmpty());
if (runningOnWindows) {
Expand All @@ -66,6 +67,16 @@ public void runProcess_PerformsAsExpected_OnSuccessfulCommandExecution() throws
}
}

@Test
public void runProcess_NotifiesListenerOfOutput() throws IOException {
String[] command = new String[] {"java", "-version"};
StringBuilder output = new StringBuilder();
ProcessOutputListener listener = (message, error) -> output.append(message);
ProcessRunner.ExecutionResult executionResult = ProcessRunner.run(command).outputListener(listener).runAndWait();
assertEquals(executionResult.getExitCode(), 0);
assertTrue(output.toString().startsWith("java version "));
}

private static boolean runningOnWindows() {
return System.getProperty("os.name", "generic").toLowerCase(Locale.ENGLISH).contains("win");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ public void singleChunkResponseSent() throws Exception {
}

@Test
public void multipeResponseChunksSent() throws Exception {
public void multipleResponseChunksSent() throws Exception {
ExecutorService service = Executors.newSingleThreadExecutor();
ClientToServerChannelHandler handler = new ClientToServerChannelHandler(service, route, null);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package foundation.stack.datamill.json;

import org.junit.Test;

import java.util.List;

import static org.junit.Assert.assertEquals;

/**
* @author Ravi Chodavarapu (rchodava@gmail.com)
*/
public class JsonArrayTest {
@Test
public void asObjectsTest() throws Exception {
List<JsonObject> objects = new JsonArray("[{\"key\": \"value\"}, {\"key\":\"value\"}]").asJsonObjects();
assertEquals(2, objects.size());
assertEquals("value", objects.get(0).get("key").asString());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,8 @@ public void executeCommandFromRelativeLocation(String command, String relativePa
try {
File temporaryDirectory = getOrCreateTemporaryDirectory();
File workingDirectory = resolvedRelativePath != null ? new File(temporaryDirectory, resolvedRelativePath) : temporaryDirectory;
ProcessRunner.ExecutionResult executionResult = ProcessRunner.runProcessAndWait(workingDirectory, resolvedCommand.split(" "));
ProcessRunner.ExecutionResult executionResult = ProcessRunner.run(resolvedCommand.split(" "))
.workingDirectory(workingDirectory).runAndWait();
if (executionResult.getExitCode() != 0) {
logger.error("Error while executing {} with message {}", resolvedCommand, executionResult.getBufferedStandardError());
fail("Received result code " + executionResult.getExitCode() + " after executing " + resolvedCommand);
Expand Down

0 comments on commit ba1529c

Please sign in to comment.