Skip to content
This repository has been archived by the owner. It is now read-only.
Permalink
Browse files
[FIXED JENKINS-30055] The listener which closes flow graph log files …
…must receive events immediately, and unregister itself.
  • Loading branch information
jglick committed Sep 25, 2015
1 parent f50138a commit 1084a9bd2495989bb78e013c08c8348542e7955f
@@ -3,6 +3,7 @@
Only noting significant user changes, not internal code cleanups and minor bug fixes.

## 1.11 (not yet released)
* [JENKINS-30055](https://issues.jenkins-ci.org/browse/JENKINS-30055): poor performance and file handle leaks when running a script with an enormous number of steps in quick succession
* [JENKINS-30346](https://issues.jenkins-ci.org/browse/JENKINS-30346): added a cross platform `deleteDir` step to recursively delete a directory and its contents.

## 1.10 (Aug 31 2015)
@@ -27,7 +27,6 @@
import org.jenkinsci.plugins.workflow.cps.CpsFlowDefinition;
import org.jenkinsci.plugins.workflow.job.WorkflowJob;
import org.jenkinsci.plugins.workflow.job.WorkflowRun;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.Rule;
import org.jvnet.hudson.test.Issue;
@@ -37,7 +36,6 @@

@Rule public JenkinsRule r = new JenkinsRule();

@Ignore("TODO fails w/ various errors incl. IOException: Too many open files")
@Issue("JENKINS-30055")
@Test public void manySteps() throws Exception {
WorkflowJob p = r.jenkins.createProject(WorkflowJob.class, "demo");
@@ -158,7 +158,25 @@ public String getUrl() throws IOException {
*/
public abstract void interrupt(Result r, CauseOfInterruption... causes) throws IOException, InterruptedException;

public abstract void addListener(GraphListener listener);
@Deprecated
public void addListener(GraphListener listener) {
addListener(listener, false);
}

/**
* Add a listener to changes in the flow graph structure.
* @param listener a listener to add
* @param synchronous true to receive events immediately as they occur (you must be very careful not to acquire locks or block); false to receive notification as soon as possible
*/
public /*abstract*/ void addListener(GraphListener listener, boolean synchronous) {
if (Util.isOverridden(FlowExecution.class, getClass(), "addListener", GraphListener.class) && !synchronous) {
addListener(listener);
} else {
throw new AbstractMethodError("you must override the new overload of addListener");
}
}

public /*abstract*/ void removeListener(GraphListener listener) {}

/**
* Checks whether this flow execution has finished executing completely.
@@ -237,7 +237,7 @@

private final AtomicInteger iota = new AtomicInteger();

private transient List<GraphListener> listeners;
private transient List<GraphListener> listeners, synchronousListeners;

/**
* Result set from {@link StepContext}. Start by success and progressively gets worse.
@@ -681,11 +681,27 @@ void subsumeHead(FlowNode n) {


@Override
public void addListener(GraphListener listener) {
if (listeners == null) {
listeners = new CopyOnWriteArrayList<GraphListener>();
public void addListener(GraphListener listener, boolean synchronous) {
if (synchronous) {
if (synchronousListeners == null) {
synchronousListeners = new CopyOnWriteArrayList<GraphListener>();
}
synchronousListeners.add(listener);
} else {
if (listeners == null) {
listeners = new CopyOnWriteArrayList<GraphListener>();
}
listeners.add(listener);
}
}

@Override public void removeListener(GraphListener listener) {
if (listeners != null) {
listeners.remove(listener);
}
if (synchronousListeners != null) {
synchronousListeners.remove(listener);
}
listeners.add(listener);
}

@Override
@@ -767,9 +783,10 @@ synchronized FlowHead getFirstHead() {
return heads.firstEntry().getValue();
}

void notifyListeners(FlowNode node) {
if (listeners != null) {
for (GraphListener listener : listeners) {
void notifyListeners(FlowNode node, boolean synchronous) {
List<GraphListener> _listeners = synchronous ? synchronousListeners : listeners;
if (_listeners != null) {
for (GraphListener listener : _listeners) {
listener.onNewHead(node);
}
}
@@ -315,9 +315,10 @@ private void run() throws IOException {
@CpsVmThreadOnly
/*package*/ void notifyNewHead(final FlowNode head) {
assertVmThread();
execution.notifyListeners(head, true);
runner.execute(new Runnable() {
public void run() {
execution.notifyListeners(head);
execution.notifyListeners(head, false);
}
});
}
@@ -119,7 +119,9 @@ void setNewHead(FlowNode v) {
} else {
// in recovering from error and such situation, we sometimes need to grow the graph
// without running the program.
execution.notifyListeners(v);
// TODO can CpsThreadGroup.notifyNewHead be used instead?
execution.notifyListeners(v, true);
execution.notifyListeners(v, false);
}
} catch (IOException e) {
LOGGER.log(Level.FINE, "Failed to record new head: " + v, e);
@@ -182,7 +182,7 @@ public WorkflowRun(WorkflowJob job, File dir) throws IOException {

FlowExecution newExecution = definition.create(owner, listener, getAllActions());
FlowExecutionList.get().register(owner);
newExecution.addListener(new GraphL());
newExecution.addListener(new GraphL(), false);
completed = new AtomicBoolean();
logsToCopy = new LinkedHashMap<String,Long>();
execution = newExecution;
@@ -385,7 +385,7 @@ private String key() {
}
}
if (execution != null) {
execution.addListener(new GraphL());
execution.addListener(new GraphL(), false);
executionPromise.set(execution);
if (!execution.isComplete()) {
// we've been restarted while we were running. let's get the execution going again.
@@ -38,6 +38,7 @@
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.Charset;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.CheckForNull;
@@ -90,17 +91,20 @@
os = filter.decorateLogger(null, os);
}
listener = new StreamTaskListener(os);
getExecution().addListener(new GraphListener() {
final AtomicReference<GraphListener> graphListener = new AtomicReference<GraphListener>();
graphListener.set(new GraphListener() {
@Override public void onNewHead(FlowNode node) {
try {
if (!getNode().isRunning()) {
getExecution().removeListener(graphListener.get());
listener.getLogger().close();
}
} catch (IOException x) {
Logger.getLogger(DefaultStepContext.class.getName()).log(Level.FINE, null, x);
}
}
});
getExecution().addListener(graphListener.get(), true);
}
return key.cast(listener);
} else if (Node.class.isAssignableFrom(key)) {

0 comments on commit 1084a9b

Please sign in to comment.