Skip to content

Commit

Permalink
Swap termination on scheduled termination
Browse files Browse the repository at this point in the history
  • Loading branch information
Artem Stasiuk committed Sep 8, 2019
1 parent 934ef8c commit 7ead55f
Show file tree
Hide file tree
Showing 7 changed files with 114 additions and 84 deletions.
2 changes: 1 addition & 1 deletion src/main/java/com/amazon/jenkins/ec2fleet/CloudNanny.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ protected void doRun() {
info.add(Queue.withLock(new Callable<EC2FleetStatusInfo>() {
@Override
public EC2FleetStatusInfo call() {
final FleetStateStats stats = fleetCloud.updateStatus();
final FleetStateStats stats = fleetCloud.update();
return new EC2FleetStatusInfo(
fleetCloud.getFleet(), stats.getState(), fleetCloud.getLabelString(),
stats.getNumActive(), stats.getNumDesired());
Expand Down
147 changes: 89 additions & 58 deletions src/main/java/com/amazon/jenkins/ec2fleet/EC2FleetCloud.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,8 @@
import hudson.model.TaskListener;
import hudson.slaves.Cloud;
import hudson.slaves.ComputerConnector;
import hudson.slaves.Messages;
import hudson.slaves.NodeProperty;
import hudson.slaves.NodeProvisioner;
import hudson.slaves.OfflineCause.SimpleOfflineCause;
import hudson.util.FormValidation;
import hudson.util.ListBoxModel;
import jenkins.model.Jenkins;
Expand All @@ -40,6 +38,7 @@
import org.kohsuke.stapler.StaplerRequest;
import org.springframework.util.ObjectUtils;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
Expand Down Expand Up @@ -132,13 +131,19 @@ public class EC2FleetCloud extends Cloud {
private final boolean disableTaskResubmit;

/**
* {@link EC2FleetCloud#updateStatus()} updating this field, this is one thread
* {@link EC2FleetCloud#update()} updating this field, this is one thread
* related to {@link CloudNanny}. At the same time {@link IdleRetentionStrategy}
* call {@link EC2FleetCloud#terminateInstance(String)} to stop instance when it free
* and use this field to know what capacity is current one
* calls {@link EC2FleetCloud#scheduleToTerminate(String)} to stop an instance when it is free,
* and this field is used to determine the current capacity.
* <p>
* It is possible for <code>stats</code> to be outdated, in which case the plugin may make a wrong decision;
* however, the refresh interval is short, so the probability of this is low. We prefer to reduce the number
* of calls to the EC2 API and improve plugin performance over being perfectly precise. In any case, outdated
* data is corrected by the next update.
*/
private transient FleetStateStats stats;

private transient Set<String> instanceIdsToTerminate;

private transient Set<NodeProvisioner.PlannedNode> plannedNodesCache;

@DataBoundConstructor
Expand All @@ -165,7 +170,7 @@ public EC2FleetCloud(final String name,
final Integer initOnlineCheckIntervalSec,
final boolean scaleExecutorsByWeight) {
super(StringUtils.isBlank(name) ? FLEET_CLOUD_ID : name);
initCaches();
init();
this.credentialsId = credentialsId;
this.awsCredentialsId = awsCredentialsId;
this.region = region;
Expand Down Expand Up @@ -294,6 +299,7 @@ public boolean isRestrictUsage() {
public Collection<NodeProvisioner.PlannedNode> provision(
final Label label, final int excessWorkload) {
try {
// todo this lock is redundant, NodeProvisioner line 305 as example why
return Queue.withLock(new Callable<Collection<NodeProvisioner.PlannedNode>>() {
@Override
public Collection<NodeProvisioner.PlannedNode> call() {
Expand All @@ -310,10 +316,9 @@ public synchronized Collection<NodeProvisioner.PlannedNode> provisionInternal(
final Label label, int excessWorkload) {
info("excessWorkload = %s", excessWorkload);

final FleetStateStats stats = updateStatus();
final int maxAllowed = this.getMaxSize();
update();

if (stats.getNumDesired() >= maxAllowed || !"active".equals(stats.getState()))
if (stats.getNumDesired() >= getMaxSize() || !"active".equals(stats.getState()))
return Collections.emptyList();

// if the planned node has 0 executors configured force it to 1 so we end up doing an unweighted check
Expand All @@ -322,15 +327,12 @@ public synchronized Collection<NodeProvisioner.PlannedNode> provisionInternal(
// Calculate the ceiling, without having to work with doubles from Math.ceil
// https://stackoverflow.com/a/21830188/877024
final int weightedExcessWorkload = (excessWorkload + numExecutors - 1) / numExecutors;
int targetCapacity = stats.getNumDesired() + weightedExcessWorkload;

if (targetCapacity > maxAllowed) targetCapacity = maxAllowed;
int targetCapacity = Math.min(stats.getNumDesired() + weightedExcessWorkload, getMaxSize());

int toProvision = targetCapacity - stats.getNumDesired();
info("to provision = %s", toProvision);

if (toProvision < 1)
return Collections.emptyList();
if (toProvision < 1) return Collections.emptyList();

final ModifySpotFleetRequestRequest request = new ModifySpotFleetRequestRequest();
request.setSpotFleetRequestId(fleet);
Expand All @@ -345,16 +347,26 @@ public synchronized Collection<NodeProvisioner.PlannedNode> provisionInternal(
final NodeProvisioner.PlannedNode plannedNode = new NodeProvisioner.PlannedNode(
"FleetNode-" + f, SettableFuture.<Node>create(), this.numExecutors);
resultList.add(plannedNode);
this.plannedNodesCache.add(plannedNode);
plannedNodesCache.add(plannedNode);
}
return resultList;
}

public synchronized FleetStateStats updateStatus() {
/**
* Perform sync of plugin data with EC2 Spot Fleet state.
*
* @return current state
*/
public synchronized FleetStateStats update() {
info("start");

final Jenkins jenkins = Jenkins.getInstance();

final AmazonEC2 ec2 = Registry.getEc2Api().connect(getAwsCredentialsId(), region, endpoint);
final FleetStateStats stats = FleetStateStats.readClusterState(ec2, getFleet(), labelString);

updatePerformDelete(ec2, jenkins);

stats = FleetStateStats.readClusterState(ec2, getFleet(), labelString);
info("fleet instances: %s", stats.getInstances());

// Set up the lists of Jenkins nodes and fleet instances
Expand All @@ -363,7 +375,7 @@ public synchronized FleetStateStats updateStatus() {

// currentJenkinsNodes contains all registered Jenkins nodes related to this cloud
final Set<String> jenkinsInstances = new HashSet<>();
for (final Node node : Jenkins.getInstance().getNodes()) {
for (final Node node : jenkins.getNodes()) {
if (node instanceof EC2FleetNode && ((EC2FleetNode) node).getCloud() == this) {
jenkinsInstances.add(node.getNodeName());
}
Expand Down Expand Up @@ -401,7 +413,7 @@ public synchronized FleetStateStats updateStatus() {

// Update the label for all Jenkins nodes in the fleet instance cache
for (final String instanceId : jenkinsInstances) {
Node node = Jenkins.getInstance().getNode(instanceId);
Node node = jenkins.getNode(instanceId);
if (node == null)
continue;

Expand All @@ -427,51 +439,69 @@ public synchronized FleetStateStats updateStatus() {
return stats;
}

public synchronized boolean terminateInstance(final String instanceId) {
info("Attempting to terminate instance: %s", instanceId);

final Jenkins jenkins = Jenkins.getInstance();

int currentNodes = 0;
for (final Node node : jenkins.getNodes()) {
if (node instanceof EC2FleetNode && ((EC2FleetNode) node).getCloud() == this) {
currentNodes++;
/**
 * Terminates, as one batch, every instance queued in <code>instanceIdsToTerminate</code>.
 * <p>
 * Steps, in order: lower the Spot Fleet target capacity by the batch size, remove the
 * matching nodes from Jenkins, then terminate the EC2 instances. Does nothing when
 * no instance is queued.
 * <p>
 * The capacity calculation relies on the current value of the <code>stats</code> field.
 *
 * @param ec2     EC2 client used to modify the fleet request and terminate instances
 * @param jenkins Jenkins instance from which the terminated nodes are removed
 */
private void updatePerformDelete(final AmazonEC2 ec2, final Jenkins jenkins) {
    if (instanceIdsToTerminate.isEmpty()) {
        return;
    }

    // Swap the pending set for a fresh one; the batch being processed lives in temp from here on
    final Set<String> temp = instanceIdsToTerminate;
    instanceIdsToTerminate = new HashSet<>();

    info("Start termination for %s", temp);

    // These operations aren't idempotent so only do them once
    final ModifySpotFleetRequestRequest request = new ModifySpotFleetRequestRequest();
    request.setSpotFleetRequestId(fleet);
    // Must use temp.size(): the field was just replaced by an empty set, so its size is always 0 here
    final int targetCapacity = stats.getNumDesired() - temp.size();
    request.setTargetCapacity(targetCapacity);
    request.setExcessCapacityTerminationPolicy("NoTermination");
    ec2.modifySpotFleetRequest(request);
    info("Reduce fleet target capacity to %s", targetCapacity);

    // remove nodes from Jenkins
    synchronized (jenkins) {
        for (final String instanceId : temp) {
            final Node node = jenkins.getNode(instanceId);
            if (node != null) {
                try {
                    jenkins.removeNode(node);
                } catch (IOException e) {
                    // Best effort: the EC2 instance is still terminated below
                    warning("unable remove node %s from Jenkins, skip, just terminate EC2 instance", instanceId);
                }
            }
        }
    }
    info("Delete terminating nodes from Jenkins %s", temp);

    // terminateInstances is idempotent so it can be called until it's successful
    final TerminateInstancesResult result = ec2.terminateInstances(
            new TerminateInstancesRequest(new ArrayList<>(temp)));
    // Two placeholders for two arguments; previously the result was silently dropped from the message
    info("Instances %s were terminated with result %s", temp, result.toString());
}

/**
* Schedules a Jenkins node and its EC2 instance for termination. First checks that the target
* capacity is greater than <code>minSize</code>; otherwise the termination is rejected.
* <p>
* The actual termination happens in {@link EC2FleetCloud#update()}, which is called periodically by
* {@link CloudNanny}. There can therefore be some lag between the decision to terminate a node
* and its actual termination; the maximum lag is given by {@link CloudNanny#getRecurrencePeriod()}.
* <p>
* This method does not terminate anything itself, in order to reduce load on Jenkins when multiple nodes
* have to be terminated within a short time. Without scheduling and batch termination, each node would need
* its own calls to the AWS EC2 API, which take time and block the cloud class.
*
* @param instanceId node name or instance ID
* @return <code>true</code> if the node was scheduled for deletion, otherwise <code>false</code>
*/
public synchronized boolean scheduleToTerminate(final String instanceId) {
info("Attempting to terminate instance: %s", instanceId);

// We can't remove instances beyond minSize
if (currentNodes <= minSize) {
if (stats.getNumDesired() <= minSize) {
info("Not terminating %s because we need a minimum of %s instances running.", instanceId, minSize);
return false;
}

final AmazonEC2 ec2 = Registry.getEc2Api().connect(getAwsCredentialsId(), region, endpoint);

// These operations aren't idempotent so only do them once
final ModifySpotFleetRequestRequest request = new ModifySpotFleetRequestRequest();
request.setSpotFleetRequestId(fleet);
request.setTargetCapacity(stats.getNumDesired() - 1);
request.setExcessCapacityTerminationPolicy("NoTermination");
ec2.modifySpotFleetRequest(request);

// disconnect the node before terminating the instance
synchronized (jenkins) {
final Computer c = jenkins.getNode(instanceId).toComputer();
if (c.isOnline()) {
c.disconnect(SimpleOfflineCause.create(
Messages._SlaveComputer_DisconnectedBy(this.name, this.fleet)));
}
}
final Computer c = jenkins.getNode(instanceId).toComputer();
try {
c.waitUntilOffline();
} catch (InterruptedException e) {
warning("Interrupted while disconnecting %s", c.getDisplayName());
}
// terminateInstances is idempotent so it can be called until it's successful
final TerminateInstancesResult result = ec2.terminateInstances(new TerminateInstancesRequest(Collections.singletonList(instanceId)));
info("Instance %s termination result: %s", instanceId, result.toString());

instanceIdsToTerminate.add(instanceId);
return true;
}

Expand All @@ -483,14 +513,15 @@ public boolean canProvision(final Label label) {
}

private Object readResolve() {
initCaches();
init();
return this;
}

private void initCaches() {
private void init() {
id = new LazyUuid();

plannedNodesCache = new HashSet<>();
instanceIdsToTerminate = new HashSet<>();
}

private void removeNode(final String instanceId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ public class EC2FleetNode extends Slave implements EphemeralNode, EC2FleetCloudA

private volatile EC2FleetCloud cloud;

@SuppressWarnings("WeakerAccess")
public EC2FleetNode(final String name, final String nodeDescription, final String remoteFS, final int numExecutors, final Mode mode, final String label,
final List<? extends NodeProperty<?>> nodeProperties, final EC2FleetCloud cloud, ComputerLauncher launcher) throws IOException, Descriptor.FormException {
super(name, nodeDescription, remoteFS, numExecutors, mode, label,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ public long check(final SlaveComputer computer) {
return 0;
}

final String nodeId = compNode.getNodeName();
if (cloud.terminateInstance(nodeId)) {
final String instanceId = compNode.getNodeName();
if (cloud.scheduleToTerminate(instanceId)) {
// Instance successfully scheduled for termination, so no longer accept tasks
shouldAcceptTasks = false;
justTerminated = true;
Expand Down
8 changes: 4 additions & 4 deletions src/test/java/com/amazon/jenkins/ec2fleet/CloudNannyTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ public void before() throws Exception {
when(cloud1.getFleet()).thenReturn("f1");
when(cloud2.getFleet()).thenReturn("f2");

when(cloud1.updateStatus()).thenReturn(stats1);
when(cloud2.updateStatus()).thenReturn(stats2);
when(cloud1.update()).thenReturn(stats1);
when(cloud2.update()).thenReturn(stats2);
}

@Test
Expand Down Expand Up @@ -115,7 +115,7 @@ public void shouldIgnoreNonEC2FleetClouds() throws Exception {

Whitebox.newInstance(CloudNanny.class).doRun();

verify(cloud1).updateStatus();
verify(cloud1).update();
verifyZeroInteractions(nonEc2FleetCloud);
}

Expand All @@ -139,7 +139,7 @@ public void shouldIgnoreExceptionsFromUpdateForOneofCloudAndUpdateOther() throws
clouds.add(cloud1);
clouds.add(cloud2);

when(cloud1.updateStatus()).thenThrow(new IllegalArgumentException("test"));
when(cloud1.update()).thenThrow(new IllegalArgumentException("test"));

widgets.add(widget1);

Expand Down

0 comments on commit 7ead55f

Please sign in to comment.