-
Notifications
You must be signed in to change notification settings - Fork 161
/
FlowNodeUtil.java
312 lines (277 loc) · 12.5 KB
/
FlowNodeUtil.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
/*
* The MIT License
*
* Copyright (c) 2013-2016, CloudBees, Inc.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
package com.cloudbees.workflow.flownode;
import com.cloudbees.workflow.rest.external.RunExt;
import com.cloudbees.workflow.rest.external.StageNodeExt;
import com.cloudbees.workflow.rest.external.StatusExt;
import com.google.common.base.Predicate;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import hudson.Extension;
import hudson.ExtensionPoint;
import hudson.model.Item;
import hudson.model.Queue;
import hudson.model.listeners.ItemListener;
import jenkins.model.Jenkins;
import org.jenkinsci.plugins.workflow.actions.ErrorAction;
import org.jenkinsci.plugins.workflow.actions.NotExecutedNodeAction;
import org.jenkinsci.plugins.workflow.actions.TimingAction;
import org.jenkinsci.plugins.workflow.flow.FlowExecution;
import org.jenkinsci.plugins.workflow.graph.FlowEndNode;
import org.jenkinsci.plugins.workflow.graph.FlowNode;
import org.jenkinsci.plugins.workflow.graphanalysis.ForkScanner;
import org.jenkinsci.plugins.workflow.job.WorkflowJob;
import org.jenkinsci.plugins.workflow.job.WorkflowRun;
import org.jenkinsci.plugins.workflow.support.actions.PauseAction;
import org.kohsuke.accmod.Restricted;
import org.kohsuke.accmod.restrictions.NoExternalUse;
import javax.annotation.CheckForNull;
import javax.annotation.Nonnull;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* @author <a href="mailto:tom.fennelly@gmail.com">tom.fennelly@gmail.com</a>
*/
public class FlowNodeUtil {
/** Logger named after this class. It is not referenced in the portion of the file reviewed here. */
private static final Logger LOGGER = Logger.getLogger(FlowNodeUtil.class.getName());
/** Private constructor: the class only exposes static members, so it is never instantiated. */
private FlowNodeUtil() {
}
/**
 * Extension point for components that supply a cache of {@link RunExt} objects keyed by a string.
 */
public abstract static class CacheExtensionPoint implements ExtensionPoint {
/**
 * @return the cache of {@link RunExt} values; what the string key represents is decided by
 *         the code that populates the cache
 */
public abstract Cache<String, RunExt> getRunCache();
}
// Fallback list holding one CacheExtension that is constructed directly instead of being
// looked up through Jenkins. CacheExtension.all() returns it when Jenkins.getInstance() is null,
// which is used in testing where Jenkins is not running yet.
private static final List<CacheExtension> FALLBACK_CACHES = Arrays.asList(new CacheExtension());
@Extension
@Restricted(NoExternalUse.class)
public static class CacheExtension extends CacheExtensionPoint {
// Larger cache of run data, for completed runs, keyed by flowexecution url, useful for serving info
// Actually can be used to serve Stage data too
// Because the RunExt caps the total elements returned, and this is fully realized, this is the fastest way
protected final Cache<String, RunExt> runData = CacheBuilder.newBuilder().maximumSize(1000).build();
public Cache<String, RunExt> getRunCache() {
return this.runData;
}
public static List<CacheExtension> all() {
Jenkins myJenkins = Jenkins.getInstance();
if ( myJenkins == null) {
return FALLBACK_CACHES;
} else {
return myJenkins.getExtensionList(CacheExtension.class);
}
}
}
/**
 * Looks up cached run data for the given run in the first available cache extension.
 *
 * @param run the run whose externalizable ID is used as the cache key
 * @return the cached {@link RunExt}, or null when the cache holds no entry for that key
 */
@CheckForNull
public static RunExt getCachedRun(@Nonnull WorkflowRun run) {
    Cache<String, RunExt> runCache = CacheExtension.all().get(0).getRunCache();
    // A cache miss already comes back as null, so the lookup result is returned as-is
    return runCache.getIfPresent(run.getExternalizableId());
}
/**
 * Stores run data in the first available cache extension, keyed by the run's externalizable ID.
 * Nothing is stored while the run is still building.
 *
 * @param run    the run the data belongs to
 * @param runExt the data to cache
 */
public static void cacheRun(WorkflowRun run, RunExt runExt) {
    if (run.isBuilding()) {
        return; // runs that are still building are not cached
    }
    Cache<String, RunExt> runCache = CacheExtension.all().get(0).getRunCache();
    runCache.put(run.getExternalizableId(), runExt);
}
/**
 * Tells whether the execution is present and reports itself complete.
 *
 * @param execution the execution to check, may be null
 * @return true only for a non-null execution whose {@code isComplete()} is true
 */
public static boolean isNotPartOfRunningBuild(FlowExecution execution) {
    if (execution == null) {
        return false;
    }
    return execution.isComplete();
}
/**
 * Finds a node that lists the given node among its parents. Cheap lookups by numeric ID are
 * tried first; if they do not produce a match, the flow graph is scanned from the current heads.
 *
 * @param node the node whose successor is wanted
 * @return a node that has {@code node} as a parent, or null when {@code node} is still running,
 *         is a {@link FlowEndNode}, or no such node is found
 */
@CheckForNull
public static FlowNode getNodeAfter(@Nonnull final FlowNode node) {
    if (node.isRunning() || node instanceof FlowEndNode) {
        return null;
    }

    final FlowExecution execution = node.getExecution();
    int numericId = 0;

    // Fast path: assumes node IDs are consecutive integers, so the successor is likely id+1.
    // Any failure here simply drops through to the full scan below.
    try {
        numericId = Integer.parseInt(node.getId());
        FlowNode candidate = execution.getNode(Integer.toString(numericId + 1));
        if (candidate != null && candidate.getParents().contains(node)) {
            return candidate;
        }
    } catch (IOException firstLookupFailed) {
        // NOTE(review): id+2 is probed only when the id+1 lookup throws, not when it returns
        // a node that is not a child of this one - confirm that this is the intended behavior.
        try {
            FlowNode candidate = execution.getNode(Integer.toString(numericId + 2));
            if (candidate != null && candidate.getParents().contains(node)) {
                return candidate;
            }
        } catch (IOException ignored) {
            // Thrown when node with that ID does not exist
        }
    } catch (NumberFormatException ignored) {
        // ID is not a plain integer, so only the scan below can find the successor
    }

    // Slow path: scan from the current heads back to this node for the first node that
    // names this node as one of its parents.
    Predicate<FlowNode> hasNodeAsParent = new Predicate<FlowNode>() {
        public boolean apply(@Nonnull FlowNode f) {
            return f.getParents().contains(node);
        }
    };
    return new ForkScanner().findFirstMatch(
            node.getExecution().getCurrentHeads(),
            Collections.singletonList(node),
            hasNodeAsParent);
}
/**
 * Reports whether the supplied node is pausing the workflow, as determined by
 * {@link PauseAction#isPaused(FlowNode)}.
 *
 * @param flowNode The node to inspect.
 * @return True if the node is causing the workflow to pause, otherwise false.
 */
public static boolean isPauseNode(FlowNode flowNode) {
    final boolean paused = PauseAction.isPaused(flowNode);
    return paused;
}
/**
 * Computes the status of a node directly, without creating a bunch of intermediate objects.
 *
 * @param node the node to inspect
 * @return {@link StatusExt#NOT_EXECUTED} when the node was not executed, otherwise the status
 *         derived from the node's error action (which may be null)
 */
public static StatusExt getStatus(FlowNode node) {
    if (!NotExecutedNodeAction.isExecuted(node)) {
        return StatusExt.NOT_EXECUTED;
    }
    ErrorAction error = node.getError();
    return StatusExt.valueOf(error);
}
/**
 * Resolves the {@link WorkflowRun} that owns the given execution.
 *
 * @param exec the execution, may be null
 * @return the owning run, or null when {@code exec} is null or its executable is not a
 *         {@link WorkflowRun}
 * @throws RuntimeException wrapping the {@link IOException} raised while fetching the executable
 */
@CheckForNull
public static WorkflowRun getWorkflowRunForExecution(@CheckForNull FlowExecution exec) {
    if (exec == null) {
        return null;
    }
    final Queue.Executable owner;
    try {
        owner = exec.getOwner().getExecutable();
    } catch (IOException ioe) {
        throw new RuntimeException("Execution probably has not begun, or invalid pipeline data!", ioe);
    }
    return (owner instanceof WorkflowRun) ? (WorkflowRun) owner : null;
}
/**
 * Lists the stage nodes of a FlowExecution -- kept for backwards compatibility.
 * The stages come from {@code RunExt.create(run)}; each stage ID is then resolved to a
 * {@link FlowNode} through the execution.
 *
 * @param execution the execution to inspect, may be null
 * @return the resolved stage nodes, or an empty list when no run can be found for the
 *         execution or the run data has no stage list
 * @throws RuntimeException if a stage's node cannot be loaded from the execution
 */
@Nonnull
public static List<FlowNode> getStageNodes(@CheckForNull FlowExecution execution) throws RuntimeException {
    WorkflowRun owningRun = getWorkflowRunForExecution(execution);
    if (owningRun == null) {
        return Collections.emptyList();
    }

    RunExt runData = RunExt.create(owningRun);
    if (runData.getStages() == null) {
        return Collections.emptyList();
    }

    List<FlowNode> stageNodes = new ArrayList<FlowNode>(runData.getStages().size());
    try {
        for (StageNodeExt stage : runData.getStages()) {
            stageNodes.add(execution.getNode(stage.getId()));
        }
    } catch (IOException ioe) {
        throw new RuntimeException("Unable to load flownode for valid run ", ioe);
    }
    return stageNodes;
}
/**
 * Kept for backwards compatibility: resolves the nodes belonging to a stage.
 * The stage whose ID equals that of {@code stageNode} is looked up in the data from
 * {@code RunExt.create(run)}, and the IDs from its {@code getAllChildNodeIds()} are resolved
 * to {@link FlowNode}s through the execution.
 *
 * @param stageNode the node that starts the stage, may be null
 * @return the resolved child nodes, or an empty list when the argument is null, no run is
 *         found, the run has no stages, or no stage matches the node's ID
 * @throws RuntimeException if a child node cannot be loaded from the execution
 */
@Nonnull
public static List<FlowNode> getStageNodes(@CheckForNull FlowNode stageNode) {
    if (stageNode == null) {
        return Collections.emptyList();
    }

    FlowExecution execution = stageNode.getExecution();
    WorkflowRun owningRun = getWorkflowRunForExecution(execution);
    if (owningRun == null) {
        return Collections.emptyList();
    }

    RunExt runData = RunExt.create(owningRun);
    if (runData.getStages() == null || runData.getStages().isEmpty()) {
        return Collections.emptyList();
    }

    // Find the stage entry that corresponds to the supplied node
    List<String> memberIds = null;
    for (StageNodeExt stage : runData.getStages()) {
        if (stage.getId().equals(stageNode.getId())) {
            memberIds = stage.getAllChildNodeIds();
            break;
        }
    }
    if (memberIds == null) {
        return Collections.emptyList();
    }

    List<FlowNode> members = new ArrayList<FlowNode>(memberIds.size());
    try {
        for (String memberId : memberIds) {
            members.add(execution.getNode(memberId));
        }
    } catch (IOException ioe) {
        throw new RuntimeException("Failed to load a FlowNode, even though run exists! ", ioe);
    }
    return members;
}
/**
 * Evicts cached run data when an item changes location or a WorkflowJob is deleted.
 * This covers the obscure case where a WorkflowJob takes over a name under which an earlier
 * WorkflowJob already had cached execution data; without eviction the earlier job's cached
 * data would be returned.
 */
@Extension
public static class RenameHandler extends ItemListener {

    /**
     * Drops every cache entry whose key starts with {@code pipelineFullName + "#"}.
     * Scanning the cache keys avoids touching build records, and the cache is capped at
     * 1000 entries.
     */
    private void removeCachedRuns(String pipelineFullName) {
        final String keyPrefix = pipelineFullName + "#"; // See Run#getExternalizableId - this is hardcoded
        ConcurrentMap<String, RunExt> cachedRuns = CacheExtension.all().get(0).getRunCache().asMap();
        // Removing while iterating is fine here because the view is a ConcurrentMap
        for (String key : cachedRuns.keySet()) {
            // Null-check may not be needed, but just in case of mutation by another thread
            if (key == null || !key.startsWith(keyPrefix)) {
                continue;
            }
            cachedRuns.remove(key); // the map view writes the removal through to the cache
        }
    }

    /** Evicts entries stored under the old name, since URLs inside the run data (e.g. logs) have changed. */
    @Override
    public void onLocationChanged(Item item, String oldFullName, String newFullName) {
        removeCachedRuns(oldFullName);
    }

    /** Evicts entries of a deleted item, but only when that item is a {@link WorkflowJob}. */
    @Override
    public void onDeleted(Item item) {
        if (!(item instanceof WorkflowJob)) {
            return;
        }
        removeCachedRuns(item.getFullName());
    }
}
/**
 * Debug helper that prints a node list to standard output, one node per line as
 * {@code [id][startTime] displayName}, framed by two separator lines.
 *
 * @param nodeList The list to dump.
 */
public static void dumpNodes(List<FlowNode> nodeList) {
    final String separator = "------------------------------------------------------------------------------------------";
    System.out.println(separator);
    for (FlowNode current : nodeList) {
        String line = "[" + current.getId() + "][" + TimingAction.getStartTime(current) + "] " + current.getDisplayName();
        System.out.println(line);
    }
    System.out.println(separator);
}
}