diff --git a/pom.xml b/pom.xml index 7c1b984f..a0d59635 100644 --- a/pom.xml +++ b/pom.xml @@ -26,13 +26,13 @@ THE SOFTWARE. org.jenkins-ci.plugins plugin - 2.6 + 2.22 throttle-concurrents hpi Jenkins Throttle Concurrent Builds Plug-in - 1.9.1-SNAPSHOT + 2.0-beta-SNAPSHOT http://wiki.jenkins-ci.org/display/JENKINS/Throttle+Concurrent+Builds+Plugin Plugin to throttle the number of concurrent builds of a single job per node. @@ -44,12 +44,11 @@ THE SOFTWARE. - 1.609.3 + 1.642.3 UTF-8 - 1.6 - 1.6 false + 7 @@ -105,29 +104,87 @@ THE SOFTWARE. org.jenkins-ci.plugins matrix-project - 1.4.1 + 1.8 - + + org.jenkins-ci.plugins.workflow + workflow-api + 2.12 + + + org.jenkins-ci.plugins.workflow + workflow-step-api + 2.7 + + + org.jenkins-ci.plugins.workflow + workflow-durable-task-step + 2.8 + + org.jenkins-ci.plugins cloudbees-folder - 4.0 + 6.0.2 test org.jenkins-ci.plugins matrix-auth - 1.2 + 1.4 test org.jenkins-ci.plugins credentials - 1.9.4 + 2.1.13 test - + + org.jenkins-ci.plugins.workflow + workflow-cps + 2.28 + test + + + org.jenkins-ci.plugins.workflow + workflow-cps + 2.28 + tests + test + + + org.jenkins-ci.plugins.workflow + workflow-job + 2.9 + test + + + org.jenkins-ci.plugins.workflow + workflow-basic-steps + 2.3 + test + + + org.jenkins-ci.plugins + junit + 1.15 + test + + + org.jenkins-ci.plugins.workflow + workflow-support + 2.13 + test + + + org.jenkins-ci.plugins.workflow + workflow-support + 2.13 + tests + test + org.assertj assertj-core @@ -140,12 +197,6 @@ THE SOFTWARE. 0.7.2 test - - com.google.guava - guava - 18.0 - test - diff --git a/src/main/java/hudson/plugins/throttleconcurrents/ThrottleJobProperty.java b/src/main/java/hudson/plugins/throttleconcurrents/ThrottleJobProperty.java index 1f26ada6..733d6eec 100644 --- a/src/main/java/hudson/plugins/throttleconcurrents/ThrottleJobProperty.java +++ b/src/main/java/hudson/plugins/throttleconcurrents/ThrottleJobProperty.java @@ -10,13 +10,15 @@ import hudson.model.JobProperty; import hudson.model.JobPropertyDescriptor; import hudson.model.Queue; +import hudson.model.Run; +import hudson.model.TaskListener; import hudson.util.FormValidation; import hudson.util.ListBoxModel; import hudson.Util; -import hudson.matrix.MatrixBuild; import hudson.matrix.MatrixProject; import hudson.matrix.MatrixRun; +import java.io.IOException; import java.util.Arrays; import java.util.ArrayList; import java.util.Collection; @@ -24,15 +26,23 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.TreeMap; import java.util.WeakHashMap; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.logging.Level; +import java.util.logging.Logger; import javax.annotation.CheckForNull; +import javax.annotation.Nonnull; + import jenkins.model.Jenkins; import net.sf.json.JSONObject; import org.apache.commons.lang.ArrayUtils; import org.apache.commons.lang.StringUtils; +import org.jenkinsci.plugins.workflow.flow.FlowExecution; +import org.jenkinsci.plugins.workflow.flow.FlowExecutionOwner; +import org.jenkinsci.plugins.workflow.graph.FlowNode; import org.kohsuke.stapler.DataBoundConstructor; import org.kohsuke.stapler.QueryParameter; import org.kohsuke.stapler.StaplerRequest; @@ -69,8 +79,8 @@ public ThrottleJobProperty(Integer maxConcurrentPerNode, String paramsToUseForLimit, @CheckForNull ThrottleMatrixProjectOptions matrixOptions ) { - this.maxConcurrentPerNode = maxConcurrentPerNode == null ? 0 : maxConcurrentPerNode; - this.maxConcurrentTotal = maxConcurrentTotal == null ? 0 : maxConcurrentTotal; + this.maxConcurrentPerNode = maxConcurrentPerNode; + this.maxConcurrentTotal = maxConcurrentTotal; this.categories = categories == null ? new CopyOnWriteArrayList() : new CopyOnWriteArrayList(categories); @@ -225,24 +235,55 @@ public List getParamsToCompare() { return paramsToCompare; } - static List getCategoryTasks(String category) { - assert category != null && !category.equals(""); + /** + * Get the list of categories for a given run ID (from {@link Run#getExternalizableId()}) and flow node ID (from + * {@link FlowNode#getId()}, if that run/flow node combination is recorded for one or more categories. + * + * @param runId The run ID + * @param flowNodeId The flow node ID + * @return A list of category names. May be empty. + */ + @Nonnull + static List getCategoriesForRunAndFlowNode(@Nonnull String runId, @Nonnull String flowNodeId) { + List categories = new ArrayList<>(); + + final DescriptorImpl descriptor = fetchDescriptor(); + + for (ThrottleCategory cat : descriptor.getCategories()) { + Map> runs = descriptor.getThrottledPipelinesForCategory(cat.getCategoryName()); + + if (!runs.isEmpty() && runs.containsKey(runId) && runs.get(runId).contains(flowNodeId)) { + categories.add(cat.getCategoryName()); + } + } + + return categories; + } + + /** + * Get all {@link Queue.Task}s with {@link ThrottleJobProperty}s attached to them. + * + * @param category a non-null string, the category name. + * @return A list of {@link Queue.Task}s with {@link ThrottleJobProperty} attached. + */ + static List getCategoryTasks(@Nonnull String category) { + assert !StringUtils.isEmpty(category); List categoryTasks = new ArrayList(); Collection properties; - DescriptorImpl descriptor = Jenkins.getActiveInstance().getDescriptorByType(DescriptorImpl.class); + DescriptorImpl descriptor = fetchDescriptor(); synchronized (descriptor.propertiesByCategoryLock) { - Map _properties = descriptor.propertiesByCategory.get(category); + Map _properties = descriptor.propertiesByCategory.get(category); properties = _properties != null ? new ArrayList(_properties.keySet()) : Collections.emptySet(); } for (ThrottleJobProperty t : properties) { if (t.getThrottleEnabled()) { if (t.getCategories() != null && t.getCategories().contains(category)) { - Job p = t.owner; + Job p = t.owner; if (/*is a task*/ p instanceof Queue.Task && /* not deleted */getItem(p.getParent(), p.getName()) == p && /* has not since been reconfigured */ p.getProperty(ThrottleJobProperty.class) == t) { categoryTasks.add((Queue.Task) p); if (p instanceof MatrixProject && t.isThrottleMatrixConfigurations()) { - for (MatrixConfiguration mc : ((MatrixProject)p).getActiveConfigurations()) { + for (MatrixConfiguration mc : ((MatrixProject) p).getActiveConfigurations()) { categoryTasks.add(mc); } } @@ -250,8 +291,61 @@ static List getCategoryTasks(String category) { } } } + return categoryTasks; } + + /** + * Gets a map of IDs for {@link Run}s to a list of {@link FlowNode}s currently running for a given category. Removes any + * no longer valid run/flow node combinations from the internal tracking for that category, due to the run not being + * found, the run not being a {@link FlowExecutionOwner.Executable}, the run no longer building, etc + * + * @param category The category name to look for. + * @return a map of IDs for {@link Run}s to lists of {@link FlowNode}s for this category, if any. May be empty. + */ + @Nonnull + static Map> getThrottledPipelineRunsForCategory(@Nonnull String category) { + Map> throttledPipelines = new TreeMap<>(); + + final DescriptorImpl descriptor = fetchDescriptor(); + for (Map.Entry> currentPipeline : descriptor.getThrottledPipelinesForCategory(category).entrySet()) { + Run flowNodeRun = Run.fromExternalizableId(currentPipeline.getKey()); + List flowNodes = new ArrayList<>(); + + if (flowNodeRun == null || + !(flowNodeRun instanceof FlowExecutionOwner.Executable) || + !flowNodeRun.isBuilding()) { + descriptor.removeAllFromPipelineRunForCategory(currentPipeline.getKey(), category, null); + } else { + FlowExecutionOwner executionOwner = ((FlowExecutionOwner.Executable) flowNodeRun).asFlowExecutionOwner(); + if (executionOwner != null) { + FlowExecution execution = executionOwner.getOrNull(); + if (execution == null) { + descriptor.removeAllFromPipelineRunForCategory(currentPipeline.getKey(), category, null); + } else { + for (String flowNodeId : currentPipeline.getValue()) { + try { + FlowNode node = execution.getNode(flowNodeId); + if (node != null) { + flowNodes.add(node); + } else { + descriptor.removeThrottledPipelineForCategory(currentPipeline.getKey(), flowNodeId, category, null); + } + } catch (IOException e) { + // do nothing + } + } + } + } + } + if (!flowNodes.isEmpty()) { + throttledPipelines.put(currentPipeline.getKey(), flowNodes); + } + } + + return throttledPipelines; + } + private static Item getItem(ItemGroup group, String name) { if (group instanceof Jenkins) { return ((Jenkins) group).getItemMap().get(name); @@ -259,10 +353,18 @@ private static Item getItem(ItemGroup group, String name) { return group.getItem(name); } } + + public static DescriptorImpl fetchDescriptor() { + return Jenkins.getActiveInstance().getDescriptorByType(DescriptorImpl.class); + } @Extension public static final class DescriptorImpl extends JobPropertyDescriptor { + private static final Logger LOGGER = Logger.getLogger(DescriptorImpl.class.getName()); + private List categories; + + private Map>> throttledPipelinesByCategory; /** Map from category names, to properties including that category. */ private transient Map> propertiesByCategory @@ -369,7 +471,115 @@ public ListBoxModel doFillCategoryItems() { return m; } - + + @Override + public void load() { + super.load(); + initThrottledPipelines(); + LOGGER.log(Level.FINE, "load: {0}", throttledPipelinesByCategory); + } + + private synchronized void initThrottledPipelines() { + if (throttledPipelinesByCategory == null) { + throttledPipelinesByCategory = new TreeMap<>(); + } + } + + @Override + public void save() { + super.save(); + LOGGER.log(Level.FINE, "save: {0}", throttledPipelinesByCategory); + } + + @Nonnull + public synchronized Map> getThrottledPipelinesForCategory(@Nonnull String category) { + return internalGetThrottledPipelinesForCategory(category); + } + + @Nonnull + private Map> internalGetThrottledPipelinesForCategory(@Nonnull String category) { + if (getCategoryByName(category) != null) { + if (throttledPipelinesByCategory.containsKey(category)) { + return throttledPipelinesByCategory.get(category); + } + } + return new TreeMap<>(); + } + + public synchronized void addThrottledPipelineForCategory(@Nonnull String runId, + @Nonnull String flowNodeId, + @Nonnull String category, + TaskListener listener) { + if (getCategoryByName(category) == null) { + if (listener != null) { + listener.getLogger().println(Messages.ThrottleJobProperty_DescriptorImpl_NoSuchCategory(category)); + } + } else { + Map> currentPipelines = internalGetThrottledPipelinesForCategory(category); + + List flowNodes = currentPipelines.get(runId); + if (flowNodes == null) { + flowNodes = new ArrayList<>(); + } + flowNodes.add(flowNodeId); + currentPipelines.put(runId, flowNodes); + throttledPipelinesByCategory.put(category, currentPipelines); + } + } + + public synchronized void removeThrottledPipelineForCategory(@Nonnull String runId, + @Nonnull String flowNodeId, + @Nonnull String category, + TaskListener listener) { + if (getCategoryByName(category) == null) { + if (listener != null) { + listener.getLogger().println(Messages.ThrottleJobProperty_DescriptorImpl_NoSuchCategory(category)); + } + } else { + Map> currentPipelines = internalGetThrottledPipelinesForCategory(category); + + if (!currentPipelines.isEmpty()) { + List flowNodes = currentPipelines.get(runId); + if (flowNodes != null && flowNodes.contains(flowNodeId)) { + flowNodes.remove(flowNodeId); + } + if (flowNodes != null && !flowNodes.isEmpty()) { + currentPipelines.put(runId, flowNodes); + } else { + currentPipelines.remove(runId); + } + } + + if (currentPipelines.isEmpty()) { + throttledPipelinesByCategory.remove(category); + } else { + throttledPipelinesByCategory.put(category, currentPipelines); + } + } + } + + public synchronized void removeAllFromPipelineRunForCategory(@Nonnull String runId, + @Nonnull String category, + TaskListener listener) { + if (getCategoryByName(category) == null) { + if (listener != null) { + listener.getLogger().println(Messages.ThrottleJobProperty_DescriptorImpl_NoSuchCategory(category)); + } + } else { + Map> currentPipelines = internalGetThrottledPipelinesForCategory(category); + + if (!currentPipelines.isEmpty()) { + if (currentPipelines.containsKey(runId)) { + currentPipelines.remove(runId); + } + } + if (currentPipelines.isEmpty()) { + throttledPipelinesByCategory.remove(category); + } else { + throttledPipelinesByCategory.put(category, currentPipelines); + } + } + } } public static final class ThrottleCategory extends AbstractDescribableImpl { @@ -383,8 +593,8 @@ public ThrottleCategory(String categoryName, Integer maxConcurrentPerNode, Integer maxConcurrentTotal, List nodeLabeledPairs) { - this.maxConcurrentPerNode = maxConcurrentPerNode == null ? 0 : maxConcurrentPerNode; - this.maxConcurrentTotal = maxConcurrentTotal == null ? 0 : maxConcurrentTotal; + this.maxConcurrentPerNode = maxConcurrentPerNode; + this.maxConcurrentTotal = maxConcurrentTotal; this.categoryName = categoryName; this.nodeLabeledPairs = nodeLabeledPairs == null ? new ArrayList() : nodeLabeledPairs; @@ -434,21 +644,20 @@ public static final class NodeLabeledPair extends AbstractDescribableImpl pipelineCategories = categoriesForPipeline(task); + // Handle multi-configuration filters - if (!shouldBeThrottled(task, tjp)) { + if (!shouldBeThrottled(task, tjp) && pipelineCategories.isEmpty()) { return null; } - if (tjp!=null && tjp.getThrottleEnabled()) { - CauseOfBlockage cause = canRunImpl(task, tjp); + if (!pipelineCategories.isEmpty() || (tjp!=null && tjp.getThrottleEnabled())) { + CauseOfBlockage cause = canRunImpl(task, tjp, pipelineCategories); if (cause != null) { return cause; } - - if (tjp.getThrottleOption().equals("project")) { - if (tjp.getMaxConcurrentPerNode().intValue() > 0) { - int maxConcurrentPerNode = tjp.getMaxConcurrentPerNode().intValue(); - int runCount = buildsOfProjectOnNode(node, task); - - // This would mean that there are as many or more builds currently running than are allowed. - if (runCount >= maxConcurrentPerNode) { - return CauseOfBlockage.fromMessage(Messages._ThrottleQueueTaskDispatcher_MaxCapacityOnNode(runCount)); + if (tjp != null) { + if (tjp.getThrottleOption().equals("project")) { + if (tjp.getMaxConcurrentPerNode().intValue() > 0) { + int maxConcurrentPerNode = tjp.getMaxConcurrentPerNode().intValue(); + int runCount = buildsOfProjectOnNode(node, task); + + // This would mean that there are as many or more builds currently running than are allowed. + if (runCount >= maxConcurrentPerNode) { + return CauseOfBlockage.fromMessage(Messages._ThrottleQueueTaskDispatcher_MaxCapacityOnNode(runCount)); + } } + } else if (tjp.getThrottleOption().equals("category")) { + return throttleCheckForCategoriesOnNode(node, jenkins, tjp.getCategories()); } + } else if (!pipelineCategories.isEmpty()) { + return throttleCheckForCategoriesOnNode(node, jenkins, pipelineCategories); } - else if (tjp.getThrottleOption().equals("category")) { - // If the project is in one or more categories... - if (tjp.getCategories() != null && !tjp.getCategories().isEmpty()) { - for (String catNm : tjp.getCategories()) { - // Quick check that catNm itself is a real string. - if (catNm != null && !catNm.equals("")) { - List categoryTasks = ThrottleJobProperty.getCategoryTasks(catNm); - - ThrottleJobProperty.ThrottleCategory category = - ((ThrottleJobProperty.DescriptorImpl)tjp.getDescriptor()).getCategoryByName(catNm); - - // Double check category itself isn't null - if (category != null) { - // Max concurrent per node for category - int maxConcurrentPerNode = getMaxConcurrentPerNodeBasedOnMatchingLabels( - node, category, category.getMaxConcurrentPerNode().intValue()); - if (maxConcurrentPerNode > 0) { - int runCount = 0; - for (Task catTask : categoryTasks) { - if (jenkins.getQueue().isPending(catTask)) { - return CauseOfBlockage.fromMessage(Messages._ThrottleQueueTaskDispatcher_BuildPending()); - } - runCount += buildsOfProjectOnNode(node, catTask); - } - // This would mean that there are as many or more builds currently running than are allowed. - if (runCount >= maxConcurrentPerNode) { - return CauseOfBlockage.fromMessage(Messages._ThrottleQueueTaskDispatcher_MaxCapacityOnNode(runCount)); + } + + return null; + } + + private CauseOfBlockage throttleCheckForCategoriesOnNode(Node node, Jenkins jenkins, List categories) { + // If the project is in one or more categories... + if (!categories.isEmpty()) { + for (String catNm : categories) { + // Quick check that catNm itself is a real string. + if (catNm != null && !catNm.equals("")) { + List categoryTasks = ThrottleJobProperty.getCategoryTasks(catNm); + + ThrottleJobProperty.ThrottleCategory category = + ThrottleJobProperty.fetchDescriptor().getCategoryByName(catNm); + + // Double check category itself isn't null + if (category != null) { + int runCount = 0; + // Max concurrent per node for category + int maxConcurrentPerNode = getMaxConcurrentPerNodeBasedOnMatchingLabels( + node, category, category.getMaxConcurrentPerNode().intValue()); + if (maxConcurrentPerNode > 0) { + for (Task catTask : categoryTasks) { + if (jenkins.getQueue().isPending(catTask)) { + return CauseOfBlockage.fromMessage(Messages._ThrottleQueueTaskDispatcher_BuildPending()); + } + runCount += buildsOfProjectOnNode(node, catTask); + } + Map> throttledPipelines = ThrottleJobProperty.getThrottledPipelineRunsForCategory(catNm); + for (Map.Entry> entry : throttledPipelines.entrySet()) { + if (hasPendingPipelineForCategory(entry.getValue())) { + return CauseOfBlockage.fromMessage(Messages._ThrottleQueueTaskDispatcher_BuildPending()); + } + Run r = Run.fromExternalizableId(entry.getKey()); + if (r != null) { + List flowNodes = entry.getValue(); + if (r.isBuilding()) { + runCount += pipelinesOnNode(node, r, flowNodes); } } } + // This would mean that there are as many or more builds currently running than are allowed. + if (runCount >= maxConcurrentPerNode) { + return CauseOfBlockage.fromMessage(Messages._ThrottleQueueTaskDispatcher_MaxCapacityOnNode(runCount)); + } } } } } } - return null; } + private boolean hasPendingPipelineForCategory(List flowNodes) { + for (Queue.BuildableItem pending : Jenkins.getActiveInstance().getQueue().getPendingItems()) { + if (isTaskThrottledPipeline(pending.task, flowNodes)) { + return true; + } + } + + return false; + } + // @Override on jenkins 4.127+ , but still compatible with 1.399 public CauseOfBlockage canRun(Queue.Item item) { ThrottleJobProperty tjp = getThrottleJobProperty(item.task); - if (tjp!=null && tjp.getThrottleEnabled()) { - if (tjp.isLimitOneJobWithMatchingParams() && isAnotherBuildWithSameParametersRunningOnAnyNode(item)) { + List pipelineCategories = categoriesForPipeline(item.task); + + if (!pipelineCategories.isEmpty() || (tjp!=null && tjp.getThrottleEnabled())) { + if (tjp != null && tjp.isLimitOneJobWithMatchingParams() && isAnotherBuildWithSameParametersRunningOnAnyNode(item)) { return CauseOfBlockage.fromMessage(Messages._ThrottleQueueTaskDispatcher_OnlyOneWithMatchingParameters()); } - return canRun(item.task, tjp); + return canRun(item.task, tjp, pipelineCategories); } return null; } @@ -138,6 +179,7 @@ public CauseOfBlockage canRun(Queue.Item item) { @Nonnull private ThrottleMatrixProjectOptions getMatrixOptions(Task task) { ThrottleJobProperty tjp = getThrottleJobProperty(task); + if (tjp == null){ return ThrottleMatrixProjectOptions.DEFAULT; } @@ -169,9 +211,9 @@ private boolean shouldBeThrottled(@Nonnull Task task, @CheckForNull ThrottleJobP return true; } - public CauseOfBlockage canRun(Task task, ThrottleJobProperty tjp) { + public CauseOfBlockage canRun(Task task, ThrottleJobProperty tjp, List pipelineCategories) { if (Jenkins.getAuthentication() == ACL.SYSTEM) { - return canRunImpl(task, tjp); + return canRunImpl(task, tjp, pipelineCategories); } // Throttle-concurrent-builds requires READ permissions for all projects. @@ -181,65 +223,83 @@ public CauseOfBlockage canRun(Task task, ThrottleJobProperty tjp) { SecurityContextHolder.setContext(auth); try { - return canRunImpl(task, tjp); + return canRunImpl(task, tjp, pipelineCategories); } finally { SecurityContextHolder.setContext(orig); } } - private CauseOfBlockage canRunImpl(Task task, ThrottleJobProperty tjp) { + private CauseOfBlockage canRunImpl(Task task, ThrottleJobProperty tjp, List pipelineCategories) { final Jenkins jenkins = Jenkins.getActiveInstance(); - if (!shouldBeThrottled(task, tjp)) { + if (!shouldBeThrottled(task, tjp) && pipelineCategories.isEmpty()) { return null; } if (jenkins.getQueue().isPending(task)) { return CauseOfBlockage.fromMessage(Messages._ThrottleQueueTaskDispatcher_BuildPending()); } - if (tjp.getThrottleOption().equals("project")) { - if (tjp.getMaxConcurrentTotal().intValue() > 0) { - int maxConcurrentTotal = tjp.getMaxConcurrentTotal().intValue(); - int totalRunCount = buildsOfProjectOnAllNodes(task); + if (tjp != null) { + if (tjp.getThrottleOption().equals("project")) { + if (tjp.getMaxConcurrentTotal().intValue() > 0) { + int maxConcurrentTotal = tjp.getMaxConcurrentTotal().intValue(); + int totalRunCount = buildsOfProjectOnAllNodes(task); - if (totalRunCount >= maxConcurrentTotal) { - return CauseOfBlockage.fromMessage(Messages._ThrottleQueueTaskDispatcher_MaxCapacityTotal(totalRunCount)); + if (totalRunCount >= maxConcurrentTotal) { + return CauseOfBlockage.fromMessage(Messages._ThrottleQueueTaskDispatcher_MaxCapacityTotal(totalRunCount)); + } } + } else if (tjp.getThrottleOption().equals("category")) { + return throttleCheckForCategoriesAllNodes(jenkins, tjp.getCategories()); } + } else if (!pipelineCategories.isEmpty()) { + return throttleCheckForCategoriesAllNodes(jenkins, pipelineCategories); } - // If the project is in one or more categories... - else if (tjp.getThrottleOption().equals("category")) { - if (tjp.getCategories() != null && !tjp.getCategories().isEmpty()) { - for (String catNm : tjp.getCategories()) { - // Quick check that catNm itself is a real string. - if (catNm != null && !catNm.equals("")) { - List categoryTasks = ThrottleJobProperty.getCategoryTasks(catNm); - - ThrottleJobProperty.ThrottleCategory category = - ((ThrottleJobProperty.DescriptorImpl)tjp.getDescriptor()).getCategoryByName(catNm); - - // Double check category itself isn't null - if (category != null) { - if (category.getMaxConcurrentTotal().intValue() > 0) { - int maxConcurrentTotal = category.getMaxConcurrentTotal().intValue(); - int totalRunCount = 0; - - for (Task catTask : categoryTasks) { - if (jenkins.getQueue().isPending(catTask)) { - return CauseOfBlockage.fromMessage(Messages._ThrottleQueueTaskDispatcher_BuildPending()); - } - totalRunCount += buildsOfProjectOnAllNodes(catTask); - } - if (totalRunCount >= maxConcurrentTotal) { - return CauseOfBlockage.fromMessage(Messages._ThrottleQueueTaskDispatcher_MaxCapacityTotal(totalRunCount)); + return null; + } + + private CauseOfBlockage throttleCheckForCategoriesAllNodes(Jenkins jenkins, @Nonnull List categories) { + for (String catNm : categories) { + // Quick check that catNm itself is a real string. + if (catNm != null && !catNm.equals("")) { + List categoryTasks = ThrottleJobProperty.getCategoryTasks(catNm); + + ThrottleJobProperty.ThrottleCategory category = + ThrottleJobProperty.fetchDescriptor().getCategoryByName(catNm); + + // Double check category itself isn't null + if (category != null) { + if (category.getMaxConcurrentTotal().intValue() > 0) { + int maxConcurrentTotal = category.getMaxConcurrentTotal().intValue(); + int totalRunCount = 0; + + for (Task catTask : categoryTasks) { + if (jenkins.getQueue().isPending(catTask)) { + return CauseOfBlockage.fromMessage(Messages._ThrottleQueueTaskDispatcher_BuildPending()); + } + totalRunCount += buildsOfProjectOnAllNodes(catTask); + } + Map> throttledPipelines = ThrottleJobProperty.getThrottledPipelineRunsForCategory(catNm); + for (Map.Entry> entry : throttledPipelines.entrySet()) { + if (hasPendingPipelineForCategory(entry.getValue())) { + return CauseOfBlockage.fromMessage(Messages._ThrottleQueueTaskDispatcher_BuildPending()); + } + Run r = Run.fromExternalizableId(entry.getKey()); + if (r != null) { + List flowNodes = entry.getValue(); + if (r.isBuilding()) { + totalRunCount += pipelinesOnAllNodes(r, flowNodes); } } + } + if (totalRunCount >= maxConcurrentTotal) { + return CauseOfBlockage.fromMessage(Messages._ThrottleQueueTaskDispatcher_MaxCapacityTotal(totalRunCount)); } } + } } } - return null; } @@ -274,26 +334,24 @@ private boolean isAnotherBuildWithSameParametersRunningOnNode(Node node, Queue.I if (computer != null) { for (Executor exec : computer.getExecutors()) { - if (item != null && item.task != null) { - // TODO: refactor into a nameEquals helper method - final Queue.Executable currentExecutable = exec.getCurrentExecutable(); - final SubTask parentTask = currentExecutable != null ? currentExecutable.getParent() : null; - if (currentExecutable != null && parentTask != null && - parentTask.getOwnerTask() != null && - parentTask.getOwnerTask().getName().equals(item.task.getName())) { - List executingUnitParams = getParametersFromWorkUnit(exec.getCurrentWorkUnit()); - executingUnitParams = doFilterParams(paramsToCompare, executingUnitParams); - - if (executingUnitParams.containsAll(itemParams)) { - LOGGER.log(Level.FINE, "build (" + exec.getCurrentWorkUnit() + - ") with identical parameters (" + - executingUnitParams + ") is already running."); - return true; - } + // TODO: refactor into a nameEquals helper method + final Queue.Executable currentExecutable = exec.getCurrentExecutable(); + final SubTask parentTask = currentExecutable != null ? currentExecutable.getParent() : null; + if (currentExecutable != null && + parentTask.getOwnerTask().getName().equals(item.task.getName())) { + List executingUnitParams = getParametersFromWorkUnit(exec.getCurrentWorkUnit()); + executingUnitParams = doFilterParams(paramsToCompare, executingUnitParams); + + if (executingUnitParams.containsAll(itemParams)) { + LOGGER.log(Level.FINE, "build (" + exec.getCurrentWorkUnit() + + ") with identical parameters (" + + executingUnitParams + ") is already running."); + return true; } } } } + return false; } @@ -325,10 +383,7 @@ public List getParametersFromWorkUnit(WorkUnit unit) { List actions = unit.context.actions; for (Action action : actions) { if (action instanceof ParametersAction) { - ParametersAction params = (ParametersAction) action; - if (params != null) { - paramsList = params.getParameters(); - } + paramsList = ((ParametersAction)action).getParameters(); } } } @@ -349,6 +404,24 @@ public List getParametersFromQueueItem(Queue.Item item) { return paramsList; } + private List categoriesForPipeline(Task task) { + if (task instanceof PlaceholderTask) { + PlaceholderTask placeholderTask = (PlaceholderTask)task; + try { + FlowNode firstThrottle = firstThrottleStartNode(placeholderTask.getNode()); + Run r = placeholderTask.run(); + if (firstThrottle != null && r != null) { + return ThrottleJobProperty.getCategoriesForRunAndFlowNode(r.getExternalizableId(), + firstThrottle.getId()); + } + } catch (IOException | InterruptedException e) { + LOGGER.log(Level.WARNING, "Error getting categories for pipeline {0}: {1}", + new Object[] {task.getDisplayName(), e}); + return new ArrayList<>(); + } + } + return new ArrayList<>(); + } @CheckForNull private ThrottleJobProperty getThrottleJobProperty(Task task) { @@ -363,6 +436,31 @@ private ThrottleJobProperty getThrottleJobProperty(Task task) { return null; } + private int pipelinesOnNode(@Nonnull Node node, @Nonnull Run run, @Nonnull List flowNodes) { + int runCount = 0; + LOGGER.log(Level.FINE, "Checking for pipelines of {0} on node {1}", new Object[] {run.getDisplayName(), node.getDisplayName()}); + + Computer computer = node.toComputer(); + if (computer != null) { //Not all nodes are certain to become computers, like nodes with 0 executors. + // Don't count flyweight tasks that might not consume an actual executor, unlike with builds. + for (Executor e : computer.getExecutors()) { + runCount += pipelinesOnExecutor(run, e, flowNodes); + } + } + + return runCount; + } + + private int pipelinesOnAllNodes(@Nonnull Run run, @Nonnull List flowNodes) { + final Jenkins jenkins = Jenkins.getActiveInstance(); + int totalRunCount = pipelinesOnNode(jenkins, run, flowNodes); + + for (Node node : jenkins.getNodes()) { + totalRunCount += pipelinesOnNode(node, run, flowNodes); + } + return totalRunCount; + } + private int buildsOfProjectOnNode(Node node, Task task) { if (!shouldBeThrottled(task, getThrottleJobProperty(task))) { return 0; @@ -408,6 +506,77 @@ private int buildsOnExecutor(Task task, Executor exec) { return runCount; } + /** + * Get the count of currently executing {@link PlaceholderTask}s on a given {@link Executor} for a given {@link Run} + * and list of {@link FlowNode}s in that run that have been throttled. + * + * @param run The {@link Run} we care about. + * @param exec The {@link Executor} we're checking on. + * @param flowNodes The list of {@link FlowNode}s associated with that run that have been throttled with a particular + * category. + * @return 1 if there's something currently executing on that executor and it's of that run and one of the provided + * flow nodes, 0 otherwise. + */ + private int pipelinesOnExecutor(@Nonnull Run run, @Nonnull Executor exec, @Nonnull List flowNodes) { + final Queue.Executable currentExecutable = exec.getCurrentExecutable(); + if (currentExecutable != null) { + SubTask parent = currentExecutable.getParent(); + if (parent instanceof PlaceholderTask) { + PlaceholderTask task = (PlaceholderTask)parent; + if (run.equals(task.run())) { + if (isTaskThrottledPipeline(task, flowNodes)) { + return 1; + } + } + } + } + + return 0; + } + + private boolean isTaskThrottledPipeline(Task origTask, List flowNodes) { + if (origTask instanceof PlaceholderTask) { + PlaceholderTask task = (PlaceholderTask)origTask; + try { + FlowNode firstThrottle = firstThrottleStartNode(task.getNode()); + return firstThrottle != null && flowNodes.contains(firstThrottle); + } catch (IOException | InterruptedException e) { + // TODO: do something? + } + } + + return false; + } + + /** + * Given a {@link FlowNode}, find the {@link FlowNode} most directly enclosing this one that comes from a {@link ThrottleStep}. + * + * @param inner The inner {@link FlowNode} + * @return The most immediate enclosing {@link FlowNode} of the inner one that is associated with {@link ThrottleStep}. May be null. + */ + @CheckForNull + private FlowNode firstThrottleStartNode(@CheckForNull FlowNode inner) { + if (inner != null) { + LinearBlockHoppingScanner scanner = new LinearBlockHoppingScanner(); + scanner.setup(inner); + for (FlowNode enclosing : scanner) { + if (enclosing != null && + enclosing instanceof BlockStartNode && + enclosing instanceof StepNode && + // There are two BlockStartNodes (aka StepStartNodes) for ThrottleStep, so make sure we get the + // first one of those two, which will not have BodyInvocationAction.class on it. + enclosing.getAction(BodyInvocationAction.class) == null) { + // Check if this is a *different* throttling node. + StepDescriptor desc = ((StepNode) enclosing).getDescriptor(); + if (desc != null && desc.getClass().equals(ThrottleStep.DescriptorImpl.class)) { + return enclosing; + } + } + } + } + return null; + } + /** * @param node to compare labels with. * @param category to compare labels with. diff --git a/src/main/java/hudson/plugins/throttleconcurrents/pipeline/ThrottleStep.java b/src/main/java/hudson/plugins/throttleconcurrents/pipeline/ThrottleStep.java new file mode 100644 index 00000000..a7b817bb --- /dev/null +++ b/src/main/java/hudson/plugins/throttleconcurrents/pipeline/ThrottleStep.java @@ -0,0 +1,76 @@ +package hudson.plugins.throttleconcurrents.pipeline; + +import hudson.Extension; +import hudson.model.TaskListener; +import hudson.plugins.throttleconcurrents.ThrottleJobProperty; +import hudson.util.FormValidation; +import hudson.util.ListBoxModel; +import org.jenkinsci.plugins.workflow.steps.Step; +import org.jenkinsci.plugins.workflow.steps.StepContext; +import org.jenkinsci.plugins.workflow.steps.StepDescriptor; +import org.jenkinsci.plugins.workflow.steps.StepExecution; +import org.kohsuke.stapler.DataBoundConstructor; +import org.kohsuke.stapler.QueryParameter; + +import javax.annotation.Nonnull; +import java.io.Serializable; +import java.util.Collections; +import java.util.List; +import java.util.Set; + +public class ThrottleStep extends Step implements Serializable { + private List categories; + + @DataBoundConstructor + public ThrottleStep(@Nonnull List categories) { + this.categories = categories; + } + + @Nonnull + public List getCategories() { + return categories; + } + + @Override + public StepExecution start(StepContext context) throws Exception { + return new ThrottleStepExecution(this, context); + } + + private static final long serialVersionUID = 1L; + + @Extension + public static final class DescriptorImpl extends StepDescriptor { + @Override + public String getFunctionName() { + return "throttle"; + } + + @Override + public String getDisplayName() { + return Messages.ThrottleStep_DisplayName(); + } + + @Override + public boolean takesImplicitBlockArgument() { + return true; + } + + @Override + public Set> getRequiredContext() { + return Collections.singleton(TaskListener.class); + } + + public FormValidation doCheckCategoryName(@QueryParameter String value) { + return ThrottleJobProperty.fetchDescriptor().doCheckCategoryName(value); + } + + public List getCategories() { + return ThrottleJobProperty.fetchDescriptor().getCategories(); + } + + public ListBoxModel doFillCategoryItems() { + return ThrottleJobProperty.fetchDescriptor().doFillCategoryItems(); + } + } + +} diff --git a/src/main/java/hudson/plugins/throttleconcurrents/pipeline/ThrottleStepExecution.java b/src/main/java/hudson/plugins/throttleconcurrents/pipeline/ThrottleStepExecution.java new file mode 100644 index 00000000..45037c3c --- /dev/null +++ b/src/main/java/hudson/plugins/throttleconcurrents/pipeline/ThrottleStepExecution.java @@ -0,0 +1,122 @@ +package hudson.plugins.throttleconcurrents.pipeline; + +import hudson.model.Run; +import hudson.model.TaskListener; +import hudson.plugins.throttleconcurrents.ThrottleJobProperty; +import org.apache.commons.lang.StringUtils; +import org.jenkinsci.plugins.workflow.graph.FlowNode; +import org.jenkinsci.plugins.workflow.steps.BodyExecutionCallback; +import org.jenkinsci.plugins.workflow.steps.StepContext; +import org.jenkinsci.plugins.workflow.steps.StepExecution; + +import javax.annotation.CheckForNull; +import javax.annotation.Nonnull; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +public class ThrottleStepExecution extends StepExecution { + private final ThrottleStep step; + + public ThrottleStepExecution(@Nonnull ThrottleStep step, StepContext context) { + super(context); + this.step = step; + } + + @Nonnull + public List getCategories() { + return Collections.unmodifiableList(step.getCategories()); + } + + private List validateCategories(ThrottleJobProperty.DescriptorImpl descriptor, TaskListener listener) { + List undefinedCategories = new ArrayList<>(); + Set duplicates = new HashSet<>(); + List unique = new ArrayList<>(); + + if (descriptor.getCategories().isEmpty()) { + undefinedCategories.addAll(getCategories()); + } else { + for (String c : getCategories()) { + if (!unique.contains(c)) { + unique.add(c); + } else { + duplicates.add(c); + } + if (descriptor.getCategoryByName(c) == null) { + undefinedCategories.add(c); + } + } + } + + if (!duplicates.isEmpty()) { + listener.getLogger().println("One or more duplicate categories (" + StringUtils.join(duplicates, ", ") + + ") specified. Duplicates will be ignored."); + } + + if (!undefinedCategories.isEmpty()) { + throw new IllegalArgumentException("One or more specified categories do not exist: " + StringUtils.join(undefinedCategories, ", ")); + } + + return unique; + } + + @Override + public boolean start() throws Exception { + Run r = getContext().get(Run.class); + TaskListener listener = getContext().get(TaskListener.class); + FlowNode flowNode = getContext().get(FlowNode.class); + + ThrottleJobProperty.DescriptorImpl descriptor = ThrottleJobProperty.fetchDescriptor(); + + String runId = null; + String flowNodeId = null; + + if (r != null && flowNode != null) { + runId = r.getExternalizableId(); + flowNodeId = flowNode.getId(); + for (String category : validateCategories(descriptor, listener)) { + descriptor.addThrottledPipelineForCategory(runId, flowNodeId, category, listener); + } + } + + getContext().newBodyInvoker() + .withCallback(new Callback(runId, flowNodeId, getCategories())) + .start(); + return false; + } + + @Override + public void stop(Throwable cause) throws Exception { + + } + + private static final class Callback extends BodyExecutionCallback.TailCall { + @CheckForNull + private String runId; + @CheckForNull + private String flowNodeId; + private List categories = new ArrayList<>(); + + + private static final long serialVersionUID = 1; + + Callback(@CheckForNull String runId, @CheckForNull String flowNodeId, @Nonnull List categories) { + this.runId = runId; + this.flowNodeId = flowNodeId; + this.categories.addAll(categories); + } + + @Override protected void finished(StepContext context) throws Exception { + if (runId != null && flowNodeId != null) { + for (String category : categories) { + ThrottleJobProperty.fetchDescriptor().removeThrottledPipelineForCategory(runId, + flowNodeId, + category, + context.get(TaskListener.class)); + } + } + } + } +} diff --git a/src/main/resources/hudson/plugins/throttleconcurrents/Messages.properties b/src/main/resources/hudson/plugins/throttleconcurrents/Messages.properties index 98caeb7d..50ba33d2 100644 --- a/src/main/resources/hudson/plugins/throttleconcurrents/Messages.properties +++ b/src/main/resources/hudson/plugins/throttleconcurrents/Messages.properties @@ -3,4 +3,6 @@ ThrottleQueueTaskDispatcher.MaxCapacityTotal=Already running {0} builds across a ThrottleQueueTaskDispatcher.BuildPending=A build is pending launch ThrottleQueueTaskDispatcher.OnlyOneWithMatchingParameters=A build with matching parameters is already running -ThrottleMatrixProjectOptions.DisplayName=Additional options for Matrix projects \ No newline at end of file +ThrottleMatrixProjectOptions.DisplayName=Additional options for Matrix projects + +ThrottleJobProperty.DescriptorImpl.NoSuchCategory=Requested category "{0}" does not exist, so cannot throttle. diff --git a/src/main/resources/hudson/plugins/throttleconcurrents/ThrottleJobProperty/help.html b/src/main/resources/hudson/plugins/throttleconcurrents/ThrottleJobProperty/help.html new file mode 100644 index 00000000..25dc588d --- /dev/null +++ b/src/main/resources/hudson/plugins/throttleconcurrents/ThrottleJobProperty/help.html @@ -0,0 +1,4 @@ +
+

Note that the Throttle Concurrent Builds configuration here does not work for Pipeline jobs.

+

For that, use the throttle step.

+
\ No newline at end of file diff --git a/src/main/resources/hudson/plugins/throttleconcurrents/pipeline/Messages.properties b/src/main/resources/hudson/plugins/throttleconcurrents/pipeline/Messages.properties new file mode 100644 index 00000000..f3b21f5b --- /dev/null +++ b/src/main/resources/hudson/plugins/throttleconcurrents/pipeline/Messages.properties @@ -0,0 +1 @@ +ThrottleStep.DisplayName=Throttle execution of node blocks within this body \ No newline at end of file diff --git a/src/main/resources/hudson/plugins/throttleconcurrents/pipeline/ThrottleStep/config.jelly b/src/main/resources/hudson/plugins/throttleconcurrents/pipeline/ThrottleStep/config.jelly new file mode 100644 index 00000000..c452ea95 --- /dev/null +++ b/src/main/resources/hudson/plugins/throttleconcurrents/pipeline/ThrottleStep/config.jelly @@ -0,0 +1,18 @@ + + + + + + + + + + + + + + No categories configured + + + diff --git a/src/main/resources/hudson/plugins/throttleconcurrents/pipeline/ThrottleStep/help-categories.html b/src/main/resources/hudson/plugins/throttleconcurrents/pipeline/ThrottleStep/help-categories.html new file mode 100644 index 00000000..c43611be --- /dev/null +++ b/src/main/resources/hudson/plugins/throttleconcurrents/pipeline/ThrottleStep/help-categories.html @@ -0,0 +1,3 @@ +
+

One or more throttle categories in a list.

+
\ No newline at end of file diff --git a/src/test/java/hudson/plugins/throttleconcurrents/ThrottleConcurrentTest.java b/src/test/java/hudson/plugins/throttleconcurrents/ThrottleConcurrentTest.java index 655f68db..5b204391 100644 --- a/src/test/java/hudson/plugins/throttleconcurrents/ThrottleConcurrentTest.java +++ b/src/test/java/hudson/plugins/throttleconcurrents/ThrottleConcurrentTest.java @@ -15,6 +15,7 @@ import hudson.slaves.RetentionStrategy; import hudson.tasks.Builder; import jenkins.model.Jenkins; +import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.jvnet.hudson.test.JenkinsRule; @@ -25,6 +26,7 @@ import static org.assertj.core.api.Assertions.assertThat; +@Ignore("Depends on a newer version of Guava than can be used with Pipeline") public class ThrottleConcurrentTest extends ScenarioTest { @Rule @ScenarioState diff --git a/src/test/java/hudson/plugins/throttleconcurrents/ThrottleStepTest.java b/src/test/java/hudson/plugins/throttleconcurrents/ThrottleStepTest.java new file mode 100644 index 00000000..63147d5d --- /dev/null +++ b/src/test/java/hudson/plugins/throttleconcurrents/ThrottleStepTest.java @@ -0,0 +1,452 @@ +package hudson.plugins.throttleconcurrents; + +import hudson.Launcher; +import hudson.model.AbstractBuild; +import hudson.model.BuildListener; +import hudson.model.Computer; +import hudson.model.Executor; +import hudson.model.FreeStyleBuild; +import hudson.model.FreeStyleProject; +import hudson.model.Label; +import hudson.model.Node; +import hudson.model.Queue; +import hudson.model.Result; +import hudson.model.queue.QueueTaskFuture; +import hudson.plugins.throttleconcurrents.pipeline.ThrottleStep; +import hudson.slaves.DumbSlave; +import hudson.slaves.NodeProperty; +import hudson.slaves.RetentionStrategy; +import org.apache.commons.lang.StringUtils; +import org.jenkinsci.plugins.workflow.cps.CpsFlowDefinition; +import org.jenkinsci.plugins.workflow.cps.SnippetizerTester; +import org.jenkinsci.plugins.workflow.job.WorkflowJob; +import org.jenkinsci.plugins.workflow.job.WorkflowRun; +import org.jenkinsci.plugins.workflow.support.steps.ExecutorStepExecution; +import org.jenkinsci.plugins.workflow.test.steps.SemaphoreStep; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runners.model.Statement; +import org.jvnet.hudson.test.BuildWatcher; +import org.jvnet.hudson.test.RestartableJenkinsRule; +import org.jvnet.hudson.test.TestBuilder; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.Semaphore; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +public class ThrottleStepTest { + private static final String ONE_PER_NODE = "one_per_node"; + private static final String OTHER_ONE_PER_NODE = "other_one_per_node"; + private static final String TWO_TOTAL = "two_total"; + + @Rule + public RestartableJenkinsRule story = new RestartableJenkinsRule(); + + @ClassRule + public static BuildWatcher buildWatcher = new BuildWatcher(); + + @Rule + public TemporaryFolder firstAgentTmp = new TemporaryFolder(); + @Rule + public TemporaryFolder secondAgentTmp = new TemporaryFolder(); + + public void setupAgentsAndCategories() throws Exception { + DumbSlave firstAgent = new DumbSlave("first-agent", "dummy agent", firstAgentTmp.getRoot().getAbsolutePath(), + "4", Node.Mode.NORMAL, "on-agent", story.j.createComputerLauncher(null), + RetentionStrategy.NOOP, Collections.>emptyList()); + + DumbSlave secondAgent = new DumbSlave("second-agent", "dummy agent", secondAgentTmp.getRoot().getAbsolutePath(), + "4", Node.Mode.NORMAL, "on-agent", story.j.createComputerLauncher(null), + RetentionStrategy.NOOP, Collections.>emptyList()); + + story.j.jenkins.addNode(firstAgent); + story.j.jenkins.addNode(secondAgent); + + ThrottleJobProperty.ThrottleCategory firstCat = new ThrottleJobProperty.ThrottleCategory(ONE_PER_NODE, 1, 0, null); + ThrottleJobProperty.ThrottleCategory secondCat = new ThrottleJobProperty.ThrottleCategory(TWO_TOTAL, 0, 2, null); + ThrottleJobProperty.ThrottleCategory thirdCat = new ThrottleJobProperty.ThrottleCategory(OTHER_ONE_PER_NODE, 1, 0, null); + + ThrottleJobProperty.DescriptorImpl descriptor = story.j.jenkins.getDescriptorByType(ThrottleJobProperty.DescriptorImpl.class); + assertNotNull(descriptor); + descriptor.setCategories(Arrays.asList(firstCat, secondCat, thirdCat)); + } + + @Test + public void onePerNode() throws Exception { + story.addStep(new Statement() { + @Override + public void evaluate() throws Throwable { + setupAgentsAndCategories(); + WorkflowJob firstJob = story.j.jenkins.createProject(WorkflowJob.class, "first-job"); + firstJob.setDefinition(getJobFlow("first", ONE_PER_NODE, "first-agent")); + + WorkflowRun firstJobFirstRun = firstJob.scheduleBuild2(0).waitForStart(); + SemaphoreStep.waitForStart("wait-first-job/1", firstJobFirstRun); + + WorkflowJob secondJob = story.j.jenkins.createProject(WorkflowJob.class, "second-job"); + secondJob.setDefinition(getJobFlow("second", ONE_PER_NODE, "first-agent")); + + WorkflowRun secondJobFirstRun = secondJob.scheduleBuild2(0).waitForStart(); + story.j.waitForMessage("Still waiting to schedule task", secondJobFirstRun); + assertFalse(story.j.jenkins.getQueue().isEmpty()); + Node n = story.j.jenkins.getNode("first-agent"); + assertNotNull(n); + assertEquals(1, n.toComputer().countBusy()); + hasPlaceholderTaskForRun(n, firstJobFirstRun); + + SemaphoreStep.success("wait-first-job/1", null); + story.j.assertBuildStatusSuccess(story.j.waitForCompletion(firstJobFirstRun)); + SemaphoreStep.waitForStart("wait-second-job/1", secondJobFirstRun); + assertTrue(story.j.jenkins.getQueue().isEmpty()); + assertEquals(1, n.toComputer().countBusy()); + hasPlaceholderTaskForRun(n, secondJobFirstRun); + SemaphoreStep.success("wait-second-job/1", null); + story.j.assertBuildStatusSuccess(story.j.waitForCompletion(secondJobFirstRun)); + + } + }); + } + + @Test + public void duplicateCategories() throws Exception { + story.addStep(new Statement() { + @Override + public void evaluate() throws Throwable { + setupAgentsAndCategories(); + + WorkflowJob j = story.j.jenkins.createProject(WorkflowJob.class, "first-job"); + j.setDefinition(new CpsFlowDefinition("throttle(['" + ONE_PER_NODE + "', '" + ONE_PER_NODE +"']) { echo 'Hello' }", false)); + + WorkflowRun b = j.scheduleBuild2(0).waitForStart(); + + story.j.assertBuildStatusSuccess(story.j.waitForCompletion(b)); + + story.j.assertLogContains("One or more duplicate categories (" + ONE_PER_NODE + ") specified. Duplicates will be ignored.", b); + story.j.assertLogContains("Hello", b); + } + }); + } + + @Test + public void undefinedCategories() throws Exception { + story.addStep(new Statement() { + @Override + public void evaluate() throws Throwable { + WorkflowJob j = story.j.jenkins.createProject(WorkflowJob.class, "first-job"); + j.setDefinition(new CpsFlowDefinition("throttle(['undefined', 'also-undefined']) { echo 'Hello' }", false)); + + WorkflowRun b = j.scheduleBuild2(0).waitForStart(); + + story.j.assertBuildStatus(Result.FAILURE, story.j.waitForCompletion(b)); + story.j.assertLogContains("One or more specified categories do not exist: undefined, also-undefined", b); + story.j.assertLogNotContains("Hello", b); + } + }); + } + + @Test + public void multipleCategories() throws Exception { + story.addStep(new Statement() { + @Override + public void evaluate() throws Throwable { + setupAgentsAndCategories(); + WorkflowJob firstJob = story.j.jenkins.createProject(WorkflowJob.class, "first-job"); + firstJob.setDefinition(getJobFlow("first", ONE_PER_NODE, "first-agent")); + + WorkflowRun firstJobFirstRun = firstJob.scheduleBuild2(0).waitForStart(); + SemaphoreStep.waitForStart("wait-first-job/1", firstJobFirstRun); + + WorkflowJob secondJob = story.j.jenkins.createProject(WorkflowJob.class, "second-job"); + secondJob.setDefinition(getJobFlow("second", OTHER_ONE_PER_NODE, "second-agent")); + + WorkflowRun secondJobFirstRun = secondJob.scheduleBuild2(0).waitForStart(); + SemaphoreStep.waitForStart("wait-second-job/1", secondJobFirstRun); + + WorkflowJob thirdJob = story.j.jenkins.createProject(WorkflowJob.class, "third-job"); + thirdJob.setDefinition(getJobFlow("third", + Arrays.asList(ONE_PER_NODE, OTHER_ONE_PER_NODE), + "on-agent")); + + WorkflowRun thirdJobFirstRun = thirdJob.scheduleBuild2(0).waitForStart(); + story.j.waitForMessage("Still waiting to schedule task", thirdJobFirstRun); + assertFalse(story.j.jenkins.getQueue().isEmpty()); + Node n = story.j.jenkins.getNode("first-agent"); + assertNotNull(n); + assertEquals(1, n.toComputer().countBusy()); + hasPlaceholderTaskForRun(n, firstJobFirstRun); + + Node n2 = story.j.jenkins.getNode("second-agent"); + assertNotNull(n2); + assertEquals(1, n2.toComputer().countBusy()); + hasPlaceholderTaskForRun(n2, secondJobFirstRun); + + SemaphoreStep.success("wait-first-job/1", null); + story.j.assertBuildStatusSuccess(story.j.waitForCompletion(firstJobFirstRun)); + + SemaphoreStep.waitForStart("wait-third-job/1", thirdJobFirstRun); + assertTrue(story.j.jenkins.getQueue().isEmpty()); + assertEquals(1, n.toComputer().countBusy()); + hasPlaceholderTaskForRun(n, thirdJobFirstRun); + + SemaphoreStep.success("wait-second-job/1", null); + story.j.assertBuildStatusSuccess(story.j.waitForCompletion(secondJobFirstRun)); + + SemaphoreStep.success("wait-third-job/1", null); + story.j.assertBuildStatusSuccess(story.j.waitForCompletion(thirdJobFirstRun)); + } + }); + } + + @Test + public void onePerNodeParallel() throws Exception { + story.addStep(new Statement() { + @Override + public void evaluate() throws Throwable { + setupAgentsAndCategories(); + WorkflowJob firstJob = story.j.jenkins.createProject(WorkflowJob.class, "first-job"); + firstJob.setDefinition(new CpsFlowDefinition("parallel(\n" + + " a: { " + getThrottleScript("first-branch-a", ONE_PER_NODE, "on-agent") + " },\n" + + " b: { " + getThrottleScript("first-branch-b", ONE_PER_NODE, "on-agent") + " },\n" + + " c: { " + getThrottleScript("first-branch-c", ONE_PER_NODE, "on-agent") + " }\n" + + ")\n", false)); + + WorkflowRun run1 = firstJob.scheduleBuild2(0).waitForStart(); + SemaphoreStep.waitForStart("wait-first-branch-a-job/1", run1); + SemaphoreStep.waitForStart("wait-first-branch-b-job/1", run1); + + WorkflowJob secondJob = story.j.jenkins.createProject(WorkflowJob.class, "second-job"); + secondJob.setDefinition(new CpsFlowDefinition("parallel(\n" + + " a: { " + getThrottleScript("second-branch-a", ONE_PER_NODE, "on-agent") + " },\n" + + " b: { " + getThrottleScript("second-branch-b", ONE_PER_NODE, "on-agent") + " },\n" + + " c: { " + getThrottleScript("second-branch-c", ONE_PER_NODE, "on-agent") + " }\n" + + ")\n", false)); + + WorkflowRun run2 = secondJob.scheduleBuild2(0).waitForStart(); + + Computer first = story.j.jenkins.getNode("first-agent").toComputer(); + Computer second = story.j.jenkins.getNode("second-agent").toComputer(); + assertEquals(1, first.countBusy()); + assertEquals(1, second.countBusy()); + + story.j.waitForMessage("Still waiting to schedule task", run1); + story.j.waitForMessage("Still waiting to schedule task", run2); + + SemaphoreStep.success("wait-first-branch-a-job/1", null); + SemaphoreStep.waitForStart("wait-first-branch-c-job/1", run1); + assertEquals(1, first.countBusy()); + assertEquals(1, second.countBusy()); + SemaphoreStep.success("wait-first-branch-b-job/1", null); + SemaphoreStep.waitForStart("wait-second-branch-a-job/1", run1); + assertEquals(1, first.countBusy()); + assertEquals(1, second.countBusy()); + SemaphoreStep.success("wait-first-branch-c-job/1", null); + SemaphoreStep.waitForStart("wait-second-branch-b-job/1", run1); + assertEquals(1, first.countBusy()); + assertEquals(1, second.countBusy()); + SemaphoreStep.success("wait-second-branch-a-job/1", null); + SemaphoreStep.waitForStart("wait-second-branch-c-job/1", run1); + assertEquals(1, first.countBusy()); + assertEquals(1, second.countBusy()); + SemaphoreStep.success("wait-second-branch-b-job/1", null); + SemaphoreStep.success("wait-second-branch-c-job/1", null); + + story.j.assertBuildStatusSuccess(story.j.waitForCompletion(run1)); + story.j.assertBuildStatusSuccess(story.j.waitForCompletion(run2)); + } + }); + } + + @Test + public void twoTotal() throws Exception { + story.addStep(new Statement() { + @Override + public void evaluate() throws Throwable { + setupAgentsAndCategories(); + WorkflowJob firstJob = story.j.jenkins.createProject(WorkflowJob.class, "first-job"); + firstJob.setDefinition(getJobFlow("first", TWO_TOTAL, "first-agent")); + + WorkflowRun firstJobFirstRun = firstJob.scheduleBuild2(0).waitForStart(); + SemaphoreStep.waitForStart("wait-first-job/1", firstJobFirstRun); + + WorkflowJob secondJob = story.j.jenkins.createProject(WorkflowJob.class, "second-job"); + secondJob.setDefinition(getJobFlow("second", TWO_TOTAL, "second-agent")); + + WorkflowRun secondJobFirstRun = secondJob.scheduleBuild2(0).waitForStart(); + SemaphoreStep.waitForStart("wait-second-job/1", secondJobFirstRun); + + WorkflowJob thirdJob = story.j.jenkins.createProject(WorkflowJob.class, "third-job"); + thirdJob.setDefinition(getJobFlow("third", TWO_TOTAL, "on-agent")); + + WorkflowRun thirdJobFirstRun = thirdJob.scheduleBuild2(0).waitForStart(); + story.j.waitForMessage("Still waiting to schedule task", thirdJobFirstRun); + assertFalse(story.j.jenkins.getQueue().isEmpty()); + Node n = story.j.jenkins.getNode("first-agent"); + assertNotNull(n); + assertEquals(1, n.toComputer().countBusy()); + hasPlaceholderTaskForRun(n, firstJobFirstRun); + + Node n2 = story.j.jenkins.getNode("second-agent"); + assertNotNull(n2); + assertEquals(1, n2.toComputer().countBusy()); + hasPlaceholderTaskForRun(n2, secondJobFirstRun); + + SemaphoreStep.success("wait-first-job/1", null); + story.j.assertBuildStatusSuccess(story.j.waitForCompletion(firstJobFirstRun)); + SemaphoreStep.waitForStart("wait-third-job/1", thirdJobFirstRun); + assertTrue(story.j.jenkins.getQueue().isEmpty()); + assertEquals(1, n.toComputer().countBusy()); + hasPlaceholderTaskForRun(n, thirdJobFirstRun); + + SemaphoreStep.success("wait-second-job/1", null); + story.j.assertBuildStatusSuccess(story.j.waitForCompletion(secondJobFirstRun)); + + SemaphoreStep.success("wait-third-job/1", null); + story.j.assertBuildStatusSuccess(story.j.waitForCompletion(thirdJobFirstRun)); + } + }); + } + + @Test + public void interopWithFreestyle() throws Exception { + final Semaphore semaphore = new Semaphore(1); + + story.addStep(new Statement() { + @Override + public void evaluate() throws Throwable { + setupAgentsAndCategories(); + WorkflowJob firstJob = story.j.jenkins.createProject(WorkflowJob.class, "first-job"); + firstJob.setDefinition(getJobFlow("first", ONE_PER_NODE, "first-agent")); + + WorkflowRun firstJobFirstRun = firstJob.scheduleBuild2(0).waitForStart(); + SemaphoreStep.waitForStart("wait-first-job/1", firstJobFirstRun); + + FreeStyleProject freeStyleProject = story.j.createFreeStyleProject("f"); + freeStyleProject.addProperty(new ThrottleJobProperty( + null, // maxConcurrentPerNode + null, // maxConcurrentTotal + Arrays.asList(ONE_PER_NODE), // categories + true, // throttleEnabled + "category", // throttleOption + false, + null, + ThrottleMatrixProjectOptions.DEFAULT + )); + freeStyleProject.setAssignedLabel(Label.get("first-agent")); + freeStyleProject.getBuildersList().add(new TestBuilder() { + @Override + public boolean perform(AbstractBuild build, Launcher launcher, BuildListener listener) throws InterruptedException, IOException { + semaphore.acquire(); + return true; + } + }); + + semaphore.acquire(); + + QueueTaskFuture futureBuild = freeStyleProject.scheduleBuild2(0); + assertFalse(story.j.jenkins.getQueue().isEmpty()); + assertEquals(1, story.j.jenkins.getQueue().getItems().length); + Queue.Item i = story.j.jenkins.getQueue().getItems()[0]; + assertTrue(i.task instanceof FreeStyleProject); + + Node n = story.j.jenkins.getNode("first-agent"); + assertNotNull(n); + assertEquals(1, n.toComputer().countBusy()); + hasPlaceholderTaskForRun(n, firstJobFirstRun); + SemaphoreStep.success("wait-first-job/1", null); + story.j.assertBuildStatusSuccess(story.j.waitForCompletion(firstJobFirstRun)); + + FreeStyleBuild freeStyleBuild = futureBuild.waitForStart(); + assertEquals(1, n.toComputer().countBusy()); + for (Executor e : n.toComputer().getExecutors()) { + if (e.isBusy()) { + assertEquals(freeStyleBuild, e.getCurrentExecutable()); + } + } + + WorkflowJob secondJob = story.j.jenkins.createProject(WorkflowJob.class, "second-job"); + secondJob.setDefinition(getJobFlow("second", ONE_PER_NODE, "first-agent")); + + WorkflowRun secondJobFirstRun = secondJob.scheduleBuild2(0).waitForStart(); + story.j.waitForMessage("Still waiting to schedule task", secondJobFirstRun); + assertFalse(story.j.jenkins.getQueue().isEmpty()); + + assertEquals(1, n.toComputer().countBusy()); + for (Executor e : n.toComputer().getExecutors()) { + if (e.isBusy()) { + assertEquals(freeStyleBuild, e.getCurrentExecutable()); + } + } + semaphore.release(); + + story.j.assertBuildStatusSuccess(story.j.waitForCompletion(freeStyleBuild)); + SemaphoreStep.waitForStart("wait-second-job/1", secondJobFirstRun); + assertTrue(story.j.jenkins.getQueue().isEmpty()); + assertEquals(1, n.toComputer().countBusy()); + hasPlaceholderTaskForRun(n, secondJobFirstRun); + SemaphoreStep.success("wait-second-job/1", null); + story.j.assertBuildStatusSuccess(story.j.waitForCompletion(secondJobFirstRun)); + } + }); + } + + private CpsFlowDefinition getJobFlow(String jobName, String category, String label) { + return getJobFlow(jobName, Collections.singletonList(category), label); + } + + private CpsFlowDefinition getJobFlow(String jobName, List categories, String label) { + // This should be sandbox:true, but when I do that, I get org.jenkinsci.plugins.scriptsecurity.sandbox.RejectedAccessException: Scripts not permitted to use method groovy.lang.GroovyObject invokeMethod java.lang.String java.lang.Object + // And I cannot figure out why. So for now... + return new CpsFlowDefinition(getThrottleScript(jobName, categories, label), false); + } + + private String getThrottleScript(String jobName, String category, String label) { + return getThrottleScript(jobName, Collections.singletonList(category), label); + } + + private String getThrottleScript(String jobName, List categories, String label) { + List quoted = new ArrayList<>(); + for (String c : categories) { + quoted.add("'" + c + "'"); + } + + return "throttle([" + StringUtils.join(quoted, ", ") + "]) {\n" + + " echo 'hi there'\n" + + " node('" + label + "') {\n" + + " semaphore 'wait-" + jobName + "-job'\n" + + " }\n" + + "}\n"; + } + + @Test + public void snippetizer() throws Exception { + story.addStep(new Statement() { + @Override + public void evaluate() throws Throwable { + setupAgentsAndCategories(); + SnippetizerTester st = new SnippetizerTester(story.j); + st.assertRoundTrip(new ThrottleStep(Collections.singletonList(ONE_PER_NODE)), + "throttle(['" + ONE_PER_NODE + "']) {\n // some block\n}"); + } + }); + } + + private void hasPlaceholderTaskForRun(Node n, WorkflowRun r) throws Exception { + for (Executor exec : n.toComputer().getExecutors()) { + if (exec.getCurrentExecutable() != null) { + assertTrue(exec.getCurrentExecutable().getParent() instanceof ExecutorStepExecution.PlaceholderTask); + ExecutorStepExecution.PlaceholderTask task = (ExecutorStepExecution.PlaceholderTask)exec.getCurrentExecutable().getParent(); + assertEquals(r, task.run()); + } + } + } +}