Skip to content
Permalink
Browse files

[JENKINS-25623]

previously, CpsThreadGroup.run() was greedy. It was running as much as
it can before it returns. This means if the Pipeline script in question
has an infinite loop, this method never returns. This prevents other
activities to take place on CPS VM thread, most notably the attempt to
kill the execution.

In this change, CpsThreadGroup.run() is modified to execute just a
little bit, not as much as it can, by using the new safepoint capability
in groovy-cps. CpsThreadGroup.scheduleRun() is modified so that if the
Pipeline Script is still runnable, the next chunk of execution is
scheduled.

Future implementation from scheduleRun() was simplified a bit. This code
is really only used for testing, so it's not that important that the
cancel() and other methods work correctly.
  • Loading branch information...
kohsuke committed Aug 19, 2016
1 parent d253c81 commit 95e73725ec0479685916ace0e1b2d80801519e43
@@ -172,7 +172,7 @@
<dependency>
<groupId>com.cloudbees</groupId>
<artifactId>groovy-cps</artifactId>
<version>1.9</version>
<version>1.10-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.jenkins-ci.ui</groupId>
@@ -77,7 +77,9 @@ public CpsGroovyShellFactory withParent(ClassLoader parent) {

private CpsTransformer makeCpsTransformer() {
CpsTransformer t = sandbox ? new SandboxCpsTransformer() : new CpsTransformer();
t.setConfiguration(new TransformerConfiguration().withClosureType(CpsClosure2.class));
t.setConfiguration(new TransformerConfiguration()
.withClosureType(CpsClosure2.class)
.withSafepoint(Safepoint.class,"safepoint"));
return t;
}

@@ -151,62 +151,59 @@ public StepExecution getStep() {
@Nonnull Outcome runNextChunk() throws IOException {
assert program!=null;

while (true) {
Outcome outcome;

final CpsThread old = CURRENT.get();
CURRENT.set(this);

try {
LOGGER.log(FINE, "runNextChunk on {0}", resumeValue);
Outcome o = resumeValue;
resumeValue = null;
outcome = program.run0(o);
if (outcome.getAbnormal() != null) {
LOGGER.log(FINE, "ran and produced error", outcome.getAbnormal());
} else {
LOGGER.log(FINE, "ran and produced {0}", outcome);
}
} finally {
CURRENT.set(old);
Outcome outcome;

final CpsThread old = CURRENT.get();
CURRENT.set(this);

try {
LOGGER.log(FINE, "runNextChunk on {0}", resumeValue);
Outcome o = resumeValue;
resumeValue = null;
outcome = program.run0(o);
if (outcome.getAbnormal() != null) {
LOGGER.log(FINE, "ran and produced error", outcome.getAbnormal());
} else {
LOGGER.log(FINE, "ran and produced {0}", outcome);
}
} finally {
CURRENT.set(old);
}

if (outcome.getNormal() instanceof ThreadTask) {
// if an execution in the thread safepoint is requested, deliver that
ThreadTask sc = (ThreadTask) outcome.getNormal();
ThreadTaskResult r = sc.eval(this);
if (r.resume!=null) {
// keep evaluating the CPS code
resumeValue = r.resume;
continue;
} else {
// break but with a different value
outcome = r.suspend;
}
if (outcome.getNormal() instanceof ThreadTask) {
// if an execution in the thread safepoint is requested, deliver that
ThreadTask sc = (ThreadTask) outcome.getNormal();
ThreadTaskResult r = sc.eval(this);
if (r.resume!=null) {
// yield, then keep evaluating the CPS code
resumeValue = r.resume;
} else {
// break but with a different value
outcome = r.suspend;
}
}


if (promise!=null) {
if (outcome.isSuccess()) promise.set(outcome.getNormal());
else {
try {
promise.setException(outcome.getAbnormal());
} catch (Error e) {
if (e==outcome.getAbnormal()) {
// SettableFuture tries to rethrow an Error, which we don't want.
// so prevent that from happening. I need to see if this behaviour
// affects other places that use SettableFuture
;
} else {
throw e;
}
if (promise!=null) {
if (outcome.isSuccess()) promise.set(outcome.getNormal());
else {
try {
promise.setException(outcome.getAbnormal());
} catch (Error e) {
if (e==outcome.getAbnormal()) {
// SettableFuture tries to rethrow an Error, which we don't want.
// so prevent that from happening. I need to see if this behaviour
// affects other places that use SettableFuture
;
} else {
throw e;
}
}
promise = null;
}

return outcome;
promise = null;
}

return outcome;
}

/**
@@ -27,6 +27,8 @@
import com.cloudbees.groovy.cps.Continuable;
import com.cloudbees.groovy.cps.Outcome;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.SettableFuture;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import groovy.lang.Closure;
import groovy.lang.GroovyShell;
import groovy.lang.Script;
@@ -221,96 +223,51 @@ public void unexport(BodyReference ref) {
* {@link Future} object that represents when the CPS VM is executed.
*/
public Future<?> scheduleRun() {
final Future<Future<?>> f;
final SettableFuture<Void> f = SettableFuture.create();
try {
f = runner.submit(new Callable<Future<?>>() {
public Future<?> call() throws Exception {
Jenkins j = Jenkins.getInstance();
if (paused.get() || j == null || j.isQuietingDown()) {
// by doing the pause check inside, we make sure that scheduleRun() returns a
// future that waits for any previously scheduled tasks to be completed.
saveProgram();
return Futures.immediateFuture(null);
}
runner.submit(new Callable<Void>() {
public Void call() throws Exception {
Jenkins j = Jenkins.getInstance();
if (paused.get() || j == null || j.isQuietingDown()) {
// by doing the pause check inside, we make sure that scheduleRun() returns a
// future that waits for any previously scheduled tasks to be completed.
saveProgram();
f.set(null);
return null;
}

run();
// we ensure any tasks submitted during run() will complete before we declare us complete
// those include things like notifying listeners or updating various other states
// runner is a single-threaded queue, so running a no-op and waiting for its completion
// ensures that everything submitted in front of us has finished.
try {
return runner.submit(new Runnable() {
@Override public void run() {
if (threads.isEmpty()) {
runner.shutdown();
}
boolean stillRunnable = run();
try {
if (stillRunnable) {
// we can run more.
runner.submit(this);
} else {
// we ensure any tasks submitted during run() will complete before we declare us complete
// those include things like notifying listeners or updating various other states
// runner is a single-threaded queue, so running a no-op and waiting for its completion
// ensures that everything submitted in front of us has finished.
runner.submit(new Runnable() {
public void run() {
if (threads.isEmpty()) {
runner.shutdown();
}
// the original promise of scheduleRun() is now complete
f.set(null);
}
});
}
});
} catch (RejectedExecutionException x) {
// Was shut down by a prior task?
return Futures.immediateFuture(null);
} catch (RejectedExecutionException x) {
// Was shut down by a prior task?
f.setException(x);
}
return null;
}
}
});
});
} catch (RejectedExecutionException x) {
return Futures.immediateFuture(null);
}

// unfortunately that means we have to wait for Future of Future,
// so we need a rather unusual implementation of Future to hide that behind the scene.
return new Future<Object>() {
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
if (!f.isDone())
return f.cancel(mayInterruptIfRunning);

try {
return f.get().cancel(mayInterruptIfRunning);
} catch (InterruptedException e) {
throw new AssertionError(e);
} catch (ExecutionException e) {
return false;
}
}

@Override
public boolean isCancelled() {
if (f.isCancelled()) return true;
if (!f.isDone()) return false;

try {
return f.get().isCancelled();
} catch (InterruptedException e) {
throw new AssertionError(e);
} catch (ExecutionException e) {
return false;
}
}

@Override
public boolean isDone() {
if (!f.isDone()) return false;

try {
return f.get().isDone();
} catch (InterruptedException e) {
throw new AssertionError(e);
} catch (ExecutionException e) {
return false;
}
}

@Override
public Object get() throws InterruptedException, ExecutionException {
return f.get().get();
}

@Override
public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
// FIXME: this ends up waiting up to 2x
return f.get(timeout,unit).get(timeout,unit);
}
};
return f;
}

/**
@@ -328,7 +285,7 @@ public Object get(long timeout, TimeUnit unit) throws InterruptedException, Exec
}

/**
* If the execution is {@link isPaused}, cancel the pause state.
* If the execution is {@link #isPaused()}, cancel the pause state.
*/
public void unpause() {
if (paused.getAndSet(false)) {
@@ -347,56 +304,66 @@ public boolean isPaused() {
}

/**
* Run all runnable threads as much as possible.
* Run the CPS program a little bit.
*
* <p>
* The amount of execution needs to be small enough so as not to hog CPS VM thread
* In particular, time sensitive activities like the interruption wants to run on CPS VM thread.
*
* @return
* true if this program can still execute further. false if the program is suspended
* and requires some external event to become resumable again. The false return value
* is akin to a Unix thread waiting for I/O completion.
*/
@CpsVmThreadOnly("root")
private void run() throws IOException {
boolean doneSomeWork = false;
boolean changed; // used to see if we need to loop over
private boolean run() throws IOException {
boolean changed = false;
boolean ending = false;
do {
changed = false;
for (CpsThread t : threads.values().toArray(new CpsThread[threads.size()])) {
if (t.isRunnable()) {
Outcome o = t.runNextChunk();
if (o.isFailure()) {
assert !t.isAlive(); // failed thread is non-resumable

// workflow produced an exception
Result result = Result.FAILURE;
Throwable error = o.getAbnormal();
if (error instanceof FlowInterruptedException) {
result = ((FlowInterruptedException) error).getResult();
}
execution.setResult(result);
t.head.get().addAction(new ErrorAction(error));
boolean stillRunnable = false;

// TODO: maybe instead of running all the thread, run just one thread in round robin
for (CpsThread t : threads.values().toArray(new CpsThread[threads.size()])) {
if (t.isRunnable()) {
Outcome o = t.runNextChunk();
if (o.isFailure()) {
assert !t.isAlive(); // failed thread is non-resumable

// workflow produced an exception
Result result = Result.FAILURE;
Throwable error = o.getAbnormal();
if (error instanceof FlowInterruptedException) {
result = ((FlowInterruptedException) error).getResult();
}
execution.setResult(result);
t.head.get().addAction(new ErrorAction(error));
}

if (!t.isAlive()) {
LOGGER.fine("completed " + t);
t.fireCompletionHandlers(o); // do this after ErrorAction is set above
if (!t.isAlive()) {
LOGGER.fine("completed " + t);
t.fireCompletionHandlers(o); // do this after ErrorAction is set above

threads.remove(t.id);
if (threads.isEmpty()) {
execution.onProgramEnd(o);
ending = true;
}
threads.remove(t.id);
if (threads.isEmpty()) {
execution.onProgramEnd(o);
ending = true;
}

changed = true;
} else {
stillRunnable |= t.isRunnable();
}
}

doneSomeWork |= changed;
} while (changed);
changed = true;
}
}

if (doneSomeWork) {
if (changed) {
saveProgram();
}
if (ending) {
execution.cleanUpHeap();
scripts.clear();
}

return stillRunnable;
}

private transient List<FlowNode> nodesToNotify;

0 comments on commit 95e7372

Please sign in to comment.
You can’t perform that action at this time.