Skip to content

Commit

Permalink
Porting the current process launching approach to ClientCommandRunner
Browse files Browse the repository at this point in the history
This PR ports the current process launching approach to ClientCommandRunner
and use ClientCommandRunner in OcAction and OcWatch commands.

ClientCommandRunner uses piped streams (FastPipedOutputStream and FastPipedInputputStream) to transfer output (stdout/stderr) of the remote process
to Jenkins master, and leverages Apache Common IO's IOUtils.lineIterator to read the output line by line without reading fully to memory or disk files.

Like the current implementation of OcWatch, this PR's implementation also relaunches the oc watch command when it returns 0.
  • Loading branch information
vfreex authored and gabemontero committed Mar 13, 2019
1 parent 394ba97 commit ddac238
Show file tree
Hide file tree
Showing 3 changed files with 163 additions and 286 deletions.
134 changes: 48 additions & 86 deletions src/main/java/com/openshift/jenkins/plugins/pipeline/OcAction.java
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
package com.openshift.jenkins.plugins.pipeline;

import com.openshift.jenkins.plugins.util.ClientCommandBuilder;
import com.openshift.jenkins.plugins.util.ClientCommandRunner;
import hudson.*;
import hudson.model.Computer;
import hudson.model.Executor;
import hudson.model.Run;
import hudson.model.TaskListener;
import hudson.util.QuotedStringTokenizer;

import org.jenkinsci.plugins.scriptsecurity.sandbox.whitelists.Whitelisted;
import org.jenkinsci.plugins.workflow.steps.AbstractStepDescriptorImpl;
import org.jenkinsci.plugins.workflow.steps.AbstractStepImpl;
Expand All @@ -16,9 +16,8 @@
import org.kohsuke.stapler.DataBoundConstructor;

import javax.inject.Inject;
import java.io.*;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.io.IOException;
import java.io.Serializable;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.ExecutionException;
Expand All @@ -37,9 +36,9 @@ public class OcAction extends AbstractStepImpl {

@DataBoundConstructor
public OcAction(String server, String project, boolean skipTLSVerify, String caPath,
String verb, List advArgs, List verbArgs, List userArgs, List options, String token,
String streamStdOutToConsolePrefix,
HashMap<String, String> reference, int logLevel) {
String verb, List advArgs, List verbArgs, List userArgs, List options, String token,
String streamStdOutToConsolePrefix,
HashMap<String, String> reference, int logLevel) {
this.cmdBuilder = new ClientCommandBuilder(server, project, skipTLSVerify, caPath,
verb, advArgs, verbArgs, userArgs, options, token, logLevel);
this.verbose = (logLevel > 0);
Expand Down Expand Up @@ -146,6 +145,10 @@ public static class Execution extends
private transient Computer computer;

private void printToConsole(String line) {
if (step.streamStdOutToConsolePrefix == null
|| step.streamStdOutToConsolePrefix.trim().isEmpty()) {
return;
}
final String prefix = "[" + step.streamStdOutToConsolePrefix + "] ";
listener.getLogger().println(prefix + line);
listener.getLogger().flush();
Expand All @@ -156,100 +159,59 @@ protected OcActionResult run() throws IOException, InterruptedException, Executi
if (filePath != null && !filePath.exists()) {
filePath.mkdirs();
}

if (step.streamStdOutToConsolePrefix != null
&& step.streamStdOutToConsolePrefix
.startsWith("start-build")) {
.startsWith("start-build")) {
listener.getLogger()
.println(
"NOTE: the selector returned when -F/--follow is supplied to startBuild() will be inoperative for the various selector operations.");
listener.getLogger()
.println(
"Consider removing those options from startBuild and using the logs() command to follow the build output.");
}

ByteArrayOutputStream stdErr = new ByteArrayOutputStream();
BufferedOutputStream errBuf = new BufferedOutputStream(stdErr);
ByteArrayOutputStream stdOut = new ByteArrayOutputStream();
BufferedOutputStream outBuf = new BufferedOutputStream(stdOut);

String commandString = step.cmdBuilder.asString(false);
String[] command = QuotedStringTokenizer.tokenize(commandString);
command = ClientCommandBuilder.fixPathInCommandArray(command, envVars, listener, filePath, launcher, step.verbose);

Proc proc = null;
final StringBuffer stdout = new StringBuffer();
final StringBuffer stderr = new StringBuffer();
ClientCommandRunner runner = new ClientCommandRunner(command, filePath, envVars,
line -> { // got a line from stdout
stdout.append(line).append('\n');
printToConsole(line);
return false; // don't interrupt `oc`
},
line -> { // got a line from stderr
stderr.append(line).append('\n');
printToConsole(line);
return false; // don't interrupt `oc`
});

int exitStatus = -1;
try {
Launcher.ProcStarter ps = launcher.launch().cmds(Arrays.asList(command)).envs(envVars).pwd(filePath).quiet(true).stdout(outBuf).stderr(errBuf);
proc = ps.start();
long reCheckSleep = 250;
int outputSize = 0;
while (proc.isAlive()) {
Thread.sleep(reCheckSleep);

if (step.streamStdOutToConsolePrefix != null &&
step.streamStdOutToConsolePrefix.trim().length() > 0) {
byte[] newOutput = stdOut.toByteArray();
if (newOutput.length > 0) {
if (newOutput.length > outputSize) {
printToConsole(new String(Arrays.copyOfRange(newOutput, outputSize, newOutput.length), StandardCharsets.UTF_8));
}
outputSize = newOutput.length;
// If we are streaming to console and getting output,
// keep sleep duration small.
reCheckSleep = 1000;
continue;
}
}

if (reCheckSleep < 10000) { // Gradually check less
// frequently for slow execution
// tasks
reCheckSleep *= 1.2f;
}
}
int rc = proc.join();
outBuf.flush();
errBuf.flush();

if (step.streamStdOutToConsolePrefix != null
&& step.streamStdOutToConsolePrefix.trim().length() > 0) {
byte[] newOutput = stdOut.toByteArray();
if (newOutput.length > outputSize) {
printToConsole(new String(Arrays.copyOfRange(newOutput, outputSize, newOutput.length), StandardCharsets.UTF_8));
}
listener.getLogger().println(); // final newline if output does not contain it.
}


OcActionResult result = new OcActionResult();
result.verb = step.cmdBuilder.verb;
result.cmd = step.cmdBuilder.asString(true);
result.reference = step.reference;
result.status = rc;
result.out = stdOut.toString("UTF-8").trim();
result.err = stdErr.toString("UTF-8").trim();

if (step.verbose) {
listener.getLogger().println("Verbose sub-step output:");
listener.getLogger().println("\tCommand> " + result.cmd);
listener.getLogger().println("\tStatus> " + result.status);
listener.getLogger().println("\tStdOut>" + result.out);
listener.getLogger().println("\tStdErr> " + result.err);
listener.getLogger().println(
"\tReference> " + result.reference);
}
return result;
} finally {
if (proc != null && proc.isAlive()) {
proc.kill();
}
stdOut.close();
outBuf.close();
stdErr.close();
errBuf.close();
exitStatus = runner.run(launcher);
} catch (Throwable ex) {
getContext().onFailure(ex);
}
OcActionResult result = new OcActionResult();
result.status = exitStatus;
result.verb = step.cmdBuilder.verb;
result.cmd = step.cmdBuilder.asString(true);
result.reference = step.reference;
result.out = stdout.toString();
result.err = stderr.toString();

if (step.verbose) {
listener.getLogger().println("Verbose sub-step output:");
listener.getLogger().println("\tCommand> " + result.cmd);
listener.getLogger().println("\tStatus> " + result.status);
listener.getLogger().println("\tStdOut>" + result.out);
listener.getLogger().println("\tStdErr> " + result.err);
listener.getLogger().println(
"\tReference> " + result.reference);
}
return result;
}

}

}

0 comments on commit ddac238

Please sign in to comment.