Skip to content

Commit

Permalink
Fix for streaming bodies that have empty content
Browse files Browse the repository at this point in the history
Allow retrieving objects that make up JSON array
Refactor process runner code to use a builder pattern
Allow specifying logging marker, whether to buffer output and process output listener
  • Loading branch information
rchodava committed Nov 5, 2016
1 parent bfbe666 commit 60930df
Show file tree
Hide file tree
Showing 7 changed files with 116 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@
/**
* @author Israel Colomer (israelcolomer@gmail.com)
*/
public interface ProcessRunnerListener {
void notify(String message);
public interface ProcessOutputListener {
void output(String message, boolean errorStream);
}
139 changes: 85 additions & 54 deletions core/src/main/java/foundation/stack/datamill/ProcessRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.google.common.util.concurrent.MoreExecutors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.Marker;

import java.io.BufferedReader;
import java.io.File;
Expand Down Expand Up @@ -42,60 +43,12 @@ public Thread newThread(Runnable r) {
}
}));

public static void runProcess(File workingDirectory, String... command) throws IOException {
runProcess(workingDirectory, false, command);
public static ExecutionBuilder run(String... command) {
return new ExecutionBuilder(new ProcessBuilder().command(command));
}

public static void runProcess(File workingDirectory, boolean bufferOutput, String... command) throws IOException {
logger.debug("{}", Joiner.on(' ').join(command));

Process process = new ProcessBuilder().directory(workingDirectory).command(command).start();

try {
readLinesFromStream(process.getInputStream(), bufferOutput, null);
readLinesFromStream(process.getErrorStream(), bufferOutput, null);
} catch (InterruptedException e) {
throw new IOException(e);
}
}

public static ExecutionResult runProcessAndWait(File workingDirectory, String... command) throws IOException {
return runProcessAndWait(workingDirectory, false, command);
}

public static ExecutionResult runProcessAndWait(File workingDirectory, boolean bufferOutput, String... command)
throws IOException {
return runProcessAndWait(workingDirectory, bufferOutput, null, command);
}

public static ExecutionResult runProcessAndWait(File workingDirectory, boolean bufferOutput, ProcessRunnerListener listener, String... command)
throws IOException {
logger.debug("{}", Joiner.on(' ').join(command));

Process process = new ProcessBuilder().directory(workingDirectory).command(command).start();

try {
ListenableFuture<List<String>> standardOutputFuture, standardErrorFuture;

standardOutputFuture = readLinesFromStream(process.getInputStream(), bufferOutput, listener);
standardErrorFuture = readLinesFromStream(process.getErrorStream(), bufferOutput, listener);

int exitCode = process.waitFor();

if (bufferOutput) {
List<List<String>> results = Futures.allAsList(standardOutputFuture, standardErrorFuture).get(1, TimeUnit.SECONDS);
if (exitCode != 0 && results.size() == 2) {
logger.debug("Exit code was {} with standard error: {}", exitCode, results.get(1));
}
return new ExecutionResult(exitCode, results.size() > 0 ? results.get(0) : null, results.size() > 1 ? results.get(1) : null);
}
return new ExecutionResult(exitCode);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
throw new IOException(e);
}
}

private static ListenableFuture<List<String>> readLinesFromStream(InputStream inputStream, boolean bufferOutput, ProcessRunnerListener listener)
private static ListenableFuture<List<String>> readLinesFromStream(
InputStream inputStream, boolean errorStream, Marker logMarker, boolean bufferOutput, ProcessOutputListener listener)
throws InterruptedException {
BufferedReader processOutput = new BufferedReader(new InputStreamReader(inputStream));
List<String> output = new CopyOnWriteArrayList<>();
Expand All @@ -108,10 +61,12 @@ private static ListenableFuture<List<String>> readLinesFromStream(InputStream in
if (bufferOutput) {
output.add(line);
}

if (listener != null) {
listener.notify(line);
listener.output(line, errorStream);
}
logger.debug(line);

logger.debug(logMarker, line);
}
} while (line != null && !Thread.interrupted());
} catch (IOException e) {
Expand All @@ -120,6 +75,82 @@ private static ListenableFuture<List<String>> readLinesFromStream(InputStream in
});
}

public static class ExecutionBuilder {
private final ProcessBuilder builder;
private boolean bufferOutput;
private ProcessOutputListener outputListener;
private Marker logMarker;

public ExecutionBuilder(ProcessBuilder builder) {
this.builder = builder;
}

public ExecutionBuilder workingDirectory(File workingDirectory) {
builder.directory(workingDirectory);
return this;
}

public ExecutionBuilder bufferOutput(boolean bufferOutput) {
this.bufferOutput = bufferOutput;
return this;
}

public ExecutionBuilder logMarker(Marker logMarker) {
this.logMarker = logMarker;
return this;
}

public ExecutionBuilder outputListener(ProcessOutputListener outputListener) {
this.outputListener = outputListener;
return this;
}

public ExecutionResult runAndWait() throws IOException {
logger.debug(logMarker, "{}", Joiner.on(' ').join(builder.command()));

Process process = builder.start();

try {
ListenableFuture<List<String>> standardOutputFuture, standardErrorFuture;

standardOutputFuture = readLinesFromStream(process.getInputStream(), false, logMarker, bufferOutput, outputListener);
standardErrorFuture = readLinesFromStream(process.getErrorStream(), true, logMarker, bufferOutput, outputListener);

int exitCode = process.waitFor();

if (bufferOutput) {
List<List<String>> results = Futures.allAsList(standardOutputFuture, standardErrorFuture)
.get(1, TimeUnit.SECONDS);
if (exitCode != 0 && results.size() == 2) {
logger.debug(logMarker, "Exit code was {} with standard error: {}", exitCode, results.get(1));
}

return new ExecutionResult(
exitCode,
results.size() > 0 ? results.get(0) : null,
results.size() > 1 ? results.get(1) : null);
}

return new ExecutionResult(exitCode);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
throw new IOException(e);
}
}

public void run() throws IOException {
logger.debug(logMarker, "{}", Joiner.on(' ').join(builder.command()));

Process process = builder.start();

try {
readLinesFromStream(process.getInputStream(), false, logMarker, bufferOutput, outputListener);
readLinesFromStream(process.getErrorStream(), true, logMarker, bufferOutput, outputListener);
} catch (InterruptedException e) {
throw new IOException(e);
}
}
}

public static class ExecutionResult {
private final int exitCode;
private final List<String> bufferedStandardOutput;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,8 +216,14 @@ private void sendResponse(ChannelHandlerContext context, HttpRequest originalReq
sendContent(context, buffer);
}
})
.finallyDo(() -> {
sendResponseEnd(context, originalRequest);
.doAfterTerminate(() -> {
if (first[0]) {
sendFullResponse(context, originalRequest,
serverResponse.status().getCode(),
serverResponse.headers());
} else {
sendResponseEnd(context, originalRequest);
}
}).subscribe();
}
});
Expand Down
12 changes: 12 additions & 0 deletions core/src/main/java/foundation/stack/datamill/json/JsonArray.java
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,18 @@ public Object asObject(Class<?> type) {
return this;
}

public List<JsonObject> asJsonObjects() {
int length = array.length();
ArrayList<JsonObject> objects = new ArrayList<>(length);

for (int i = 0; i < length; i++) {
JSONObject object = array.getJSONObject(i);
objects.add(new JsonObject(object));
}

return objects;
}

@Override
public boolean isBoolean() {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ protected Object defaultCase(JSONObject value1, String value2, JsonProperty valu

final JSONObject object;

private JsonObject(JSONObject object) {
JsonObject(JSONObject object) {
this.object = object;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public class ProcessRunnerTest {
@Test
public void runProcessAndWait_ReturnsExpectedResults_OnSuccessfulCommandExecution() throws IOException {
String[] command = new String[] {"java", "-version"};
ProcessRunner.ExecutionResult executionResult = ProcessRunner.runProcessAndWait(null, true, command);
ProcessRunner.ExecutionResult executionResult = ProcessRunner.run(command).bufferOutput(true).runAndWait();
assertEquals(executionResult.getExitCode(), 0);
// It is counter intuitive, but jdk uses standard error instead of standard output for showing version
assertEquals(3, executionResult.getBufferedStandardError().size());
Expand All @@ -31,7 +31,7 @@ public void runProcessAndWait_ReturnsExpectedResults_OnSuccessfulCommandExecutio
@Test
public void runProcessAndWait_NoOutput_OnSuccessfulCommandExecution() throws IOException {
String[] command = new String[] {"java", "-version"};
ProcessRunner.ExecutionResult executionResult = ProcessRunner.runProcessAndWait(null, false, command);
ProcessRunner.ExecutionResult executionResult = ProcessRunner.run(command).runAndWait();
assertEquals(executionResult.getExitCode(), 0);
assertNull(executionResult.getBufferedStandardError());
assertNull(executionResult.getBufferedStandardOutput());
Expand All @@ -41,7 +41,8 @@ public void runProcessAndWait_NoOutput_OnSuccessfulCommandExecution() throws IOE
public void runProcessAndWait_ReturnsExpectedResults_OnFailingCommandExecution() throws IOException {
Path tmpDir = Paths.get(System.getProperty("java.io.tmpdir"));
String[] command = new String[] {"git", "status"};
ProcessRunner.ExecutionResult executionResult = ProcessRunner.runProcessAndWait(tmpDir.toFile(), true, command);
ProcessRunner.ExecutionResult executionResult = ProcessRunner.run(command)
.bufferOutput(true).workingDirectory(tmpDir.toFile()).runAndWait();
assertTrue(executionResult.getExitCode() != 0);
assertTrue(executionResult.getBufferedStandardOutput().isEmpty());
assertEquals("fatal: Not a git repository (or any of the parent directories): .git", executionResult.getBufferedStandardError().get(0));
Expand All @@ -50,12 +51,12 @@ public void runProcessAndWait_ReturnsExpectedResults_OnFailingCommandExecution()
@Test
public void runProcess_PerformsAsExpected_OnSuccessfulCommandExecution() throws IOException {
Path tempDir = Paths.get(System.getProperty("java.io.tmpdir"), File.separator, "test");
ProcessRunner.runProcess(null, "mkdir", tempDir.toString());
ProcessRunner.run("mkdir", tempDir.toString()).run();

boolean runningOnWindows = runningOnWindows();
String[] command = runningOnWindows ? new String[] {"dir", tempDir.toString()} : new String[] {"ls", "-la", tempDir.toString()};

ProcessRunner.ExecutionResult executionResult = ProcessRunner.runProcessAndWait(null, true, command);
ProcessRunner.ExecutionResult executionResult = ProcessRunner.run(command).bufferOutput(true).runAndWait();
assertEquals(executionResult.getExitCode(), 0);
assertTrue(executionResult.getBufferedStandardError().isEmpty());
if (runningOnWindows) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,8 @@ public void executeCommandFromRelativeLocation(String command, String relativePa
try {
File temporaryDirectory = getOrCreateTemporaryDirectory();
File workingDirectory = resolvedRelativePath != null ? new File(temporaryDirectory, resolvedRelativePath) : temporaryDirectory;
ProcessRunner.ExecutionResult executionResult = ProcessRunner.runProcessAndWait(workingDirectory, resolvedCommand.split(" "));
ProcessRunner.ExecutionResult executionResult = ProcessRunner.run(resolvedCommand.split(" "))
.workingDirectory(workingDirectory).runAndWait();
if (executionResult.getExitCode() != 0) {
logger.error("Error while executing {} with message {}", resolvedCommand, executionResult.getBufferedStandardError());
fail("Received result code " + executionResult.getExitCode() + " after executing " + resolvedCommand);
Expand Down

0 comments on commit 60930df

Please sign in to comment.