Skip to content

Commit

Permalink
feat(clouddriver): favor configured capacity for certain operations (#…
Browse files Browse the repository at this point in the history
…2923)

- We generally calculate targetDesired for resize operations by trusting
that clouddriver's cache accurately reflects the new desired size when
waiting for a capacity match. This is often provided via an onDemand
cache object (force cache refresh). When the new configured capacity
doesn't allow for autoscaling, we can derive targetDesired directly from
the stage context. This assists the TitusStreamingUpdateAgent which
doesn't currently implement onDemand caching.
  • Loading branch information
asher committed May 20, 2019
1 parent 6cdb446 commit 5fe370d
Show file tree
Hide file tree
Showing 4 changed files with 177 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,16 @@ class WaitForUpInstancesTask extends AbstractWaitingForInstancesTask {
@Override
Map getAdditionalRunningStageContext(Stage stage, Map serverGroup) {
def additionalRunningStageContext = [
targetDesiredSize: calculateTargetDesiredSize(stage, serverGroup),
lastCapacityCheck: getHealthCountSnapshot(stage, serverGroup)
targetDesiredSize: calculateTargetDesiredSize(stage, serverGroup),
lastCapacityCheck: getHealthCountSnapshot(stage, serverGroup)
]

if (!stage.context.capacitySnapshot) {
def initialTargetCapacity = getServerGroupCapacity(stage, serverGroup)
additionalRunningStageContext.capacitySnapshot = [
minSize : initialTargetCapacity.min,
desiredCapacity: initialTargetCapacity.desired,
maxSize : initialTargetCapacity.max
minSize : initialTargetCapacity.min,
desiredCapacity: initialTargetCapacity.desired,
maxSize : initialTargetCapacity.max
]
}

Expand Down Expand Up @@ -116,9 +116,16 @@ class WaitForUpInstancesTask extends AbstractWaitingForInstancesTask {
return 0
}

Map<String, Integer> capacity = getServerGroupCapacity(stage, serverGroup)
Integer targetDesiredSize = capacity.desired as Integer
splainer.add("setting targetDesiredSize=${targetDesiredSize} from the desired size in capacity=${capacity}")
Map<String, Integer> currentCapacity = getServerGroupCapacity(stage, serverGroup)
Integer targetDesiredSize

if (useConfiguredCapacity(stage, currentCapacity)) {
targetDesiredSize = ((Map<String, Integer>) stage.context.capacity).desired
splainer.add("setting targetDesiredSize=${targetDesiredSize} from the configured stage context.capacity=${stage.context.capacity}")
} else {
targetDesiredSize = currentCapacity.desired as Integer
splainer.add("setting targetDesiredSize=${targetDesiredSize} from the desired size in current serverGroup capacity=${currentCapacity}")
}

if (stage.context.capacitySnapshot) {
Integer snapshotCapacity = ((Map) stage.context.capacitySnapshot).desiredCapacity as Integer
Expand All @@ -140,13 +147,31 @@ class WaitForUpInstancesTask extends AbstractWaitingForInstancesTask {
targetDesiredSize = newTargetDesiredSize
} else if (stage.context.desiredPercentage != null) {
Integer percentage = (Integer) stage.context.desiredPercentage
targetDesiredSize = getDesiredInstanceCount(capacity, percentage)
splainer.add("setting targetDesiredSize=${targetDesiredSize} based on desiredPercentage=${percentage}% of capacity=${capacity}")
targetDesiredSize = getDesiredInstanceCount(currentCapacity, percentage)
splainer.add("setting targetDesiredSize=${targetDesiredSize} based on desiredPercentage=${percentage}% of capacity=${currentCapacity}")
}

return targetDesiredSize
}

/**
 * Decides whether targetDesired should be taken from the capacity configured on the stage
 * context ("capacity" key) instead of the server group's current capacity.
 *
 * If either the configured capacity or the current serverGroup has autoscaling disabled
 * (min == max == desired), targetDesired is calculated from the configured capacity. This
 * relaxes the need for clouddriver onDemand cache updates while resizing serverGroups.
 *
 * Decision order:
 *   1. no configured desired value        -> false (nothing to prefer)
 *   2. current capacity has no desired    -> true
 *   3. configured or current is pinned    -> true, otherwise false
 *
 * @param stage   stage whose context may contain a "capacity" map with min / max / desired
 * @param current the server group's current capacity with min / max / desired
 * @return true when the configured capacity should be used to derive targetDesired
 */
static boolean useConfiguredCapacity(Stage stage, Map<String, Integer> current) {
  // getOrDefault only substitutes the default when the key is absent; a present-but-null
  // "capacity" entry would otherwise cause a NullPointerException on the reads below.
  Map<String, Integer> configured = (stage.context.getOrDefault("capacity", [:]) ?: [:]) as Map<String, Integer>

  if (configured.desired == null) {
    return false
  }

  if (current.desired == null) {
    return true
  }

  return (configured.min == configured.max && configured.min == configured.desired) ||
    (current.min == current.max && current.min == current.desired)
}

@Override
protected boolean hasSucceeded(Stage stage, Map serverGroup, List<Map> instances, Collection<String> interestingHealthProviderNames) {
allInstancesMatch(stage, serverGroup, instances, interestingHealthProviderNames)
Expand All @@ -165,13 +190,13 @@ class WaitForUpInstancesTask extends AbstractWaitingForInstancesTask {
snapshot.up++
} else if (someAreDown(instance, interestingHealthProviderNames)) {
snapshot.down++
} else if (healths.any { it.state == 'OutOfService' } ) {
} else if (healths.any { it.state == 'OutOfService' }) {
snapshot.outOfService++
} else if (healths.any { it.state == 'Starting' } ) {
} else if (healths.any { it.state == 'Starting' }) {
snapshot.starting++
} else if (healths.every { it.state == 'Succeeded' } ) {
} else if (healths.every { it.state == 'Succeeded' }) {
snapshot.succeeded++
} else if (healths.any { it.state == 'Failed' } ) {
} else if (healths.any { it.state == 'Failed' }) {
snapshot.failed++
} else {
snapshot.unknown++
Expand Down Expand Up @@ -212,9 +237,9 @@ class WaitForUpInstancesTask extends AbstractWaitingForInstancesTask {
// expectation is reconciliation has happened within 10 minutes and that the
// current server group capacity should be preferred
log.error(
"Short circuiting initial target capacity determination after 10 minutes (serverGroup: {}, executionId: {})",
"${cloudProvider}:${serverGroup.region}:${serverGroup.name}",
stage.execution.id
"Short circuiting initial target capacity determination after 10 minutes (serverGroup: {}, executionId: {})",
"${cloudProvider}:${serverGroup.region}:${serverGroup.name}",
stage.execution.id
)
return serverGroupCapacity
}
Expand All @@ -223,30 +248,30 @@ class WaitForUpInstancesTask extends AbstractWaitingForInstancesTask {
def initialTargetCapacity = getInitialTargetCapacity(stage, serverGroup)
if (!initialTargetCapacity) {
log.debug(
"Unable to determine initial target capacity (serverGroup: {}, executionId: {})",
"${cloudProvider}:${serverGroup.region}:${serverGroup.name}",
stage.execution.id
"Unable to determine initial target capacity (serverGroup: {}, executionId: {})",
"${cloudProvider}:${serverGroup.region}:${serverGroup.name}",
stage.execution.id
)
return serverGroupCapacity
}

if ((serverGroup.capacity.max == 0 && initialTargetCapacity.max != 0) ||
(serverGroup.capacity.desired == 0 && initialTargetCapacity.desired > 0)) {
(serverGroup.capacity.desired == 0 && initialTargetCapacity.desired > 0)) {
log.info(
"Overriding server group capacity (serverGroup: {}, initialTargetCapacity: {}, executionId: {})",
"${cloudProvider}:${serverGroup.region}:${serverGroup.name}",
initialTargetCapacity,
stage.execution.id
"Overriding server group capacity (serverGroup: {}, initialTargetCapacity: {}, executionId: {})",
"${cloudProvider}:${serverGroup.region}:${serverGroup.name}",
initialTargetCapacity,
stage.execution.id
)
serverGroupCapacity = initialTargetCapacity
}

log.debug(
"Determined server group capacity (serverGroup: {}, serverGroupCapacity: {}, initialTargetCapacity: {}, executionId: {}",
"${cloudProvider}:${serverGroup.region}:${serverGroup.name}",
serverGroupCapacity,
initialTargetCapacity,
stage.execution.id
"Determined server group capacity (serverGroup: {}, serverGroupCapacity: {}, initialTargetCapacity: {}, executionId: {}",
"${cloudProvider}:${serverGroup.region}:${serverGroup.name}",
serverGroupCapacity,
initialTargetCapacity,
stage.execution.id
)

return serverGroupCapacity
Expand Down Expand Up @@ -289,6 +314,7 @@ class WaitForUpInstancesTask extends AbstractWaitingForInstancesTask {

// Splainer subclass whose methods have empty bodies: messages handed to add() are
// dropped and splain() produces nothing.
// NOTE(review): presumably used where explanation output is not wanted — confirm at call sites.
private static class NoopSplainer extends Splainer {
// Empty body: the message is discarded.
def add(String message) {}

// Empty body: nothing is reported.
def splain() {}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,17 @@ class WaitForCapacityMatchTask extends AbstractInstancesCheckTask {
return false
}

splainer.add("checking if capacity matches (capacity.desired=${serverGroup.capacity.desired}, instances.size()=${instances.size()}) ")
if (serverGroup.capacity.desired != instances.size()) {
Integer desired

if (WaitForUpInstancesTask.useConfiguredCapacity(stage, serverGroup.capacity as Map<String, Integer>)) {
desired = ((Map<String, Integer>) stage.context.capacity).desired
splainer.add("using desired from stage.context.capacity ($desired)")
} else {
desired = ((Map<String, Integer>)serverGroup.capacity).desired
}

splainer.add("checking if capacity matches (desired=${desired}, instances.size()=${instances.size()}) ")
if (desired != instances.size()) {
splainer.add("short-circuiting out of WaitForCapacityMatchTask because expected and current capacity don't match}")
return false
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,65 @@ class WaitForUpInstancesTaskSpec extends Specification {
true || false | 3 | 2 | null | 3 | 'not using source, using configured size of 2, ignoring source size of 3'
}

// Data-driven check of task.hasSucceeded for combinations of the server group capacity (asg),
// an optional capacity configured on the stage context, and an optional capacitySnapshot.
// Each row supplies `healthy` instances in state 'Up' and the expected result.
@Unroll
void 'calculates targetDesired based on configured capacity or servergroup depending on value'() {
when:
def serverGroup = [
asg : [
desiredCapacity: asg.desired
],
capacity: [
min : asg.min,
max : asg.max,
desired: asg.desired
]
]

// useSourceCapacity is switched on only for rows that provide a snapshot value
def context = [
source: [useSourceCapacity: (snapshot != null)],
]

if (snapshot) {
context.capacitySnapshot = [desiredCapacity: snapshot]
}

// rows with configured == null leave context.capacity unset
if (configured) {
context.capacity = [
min : configured.min,
max : configured.max,
desired: configured.desired
]
}

// build `healthy` instances, each with a single 'Up' health entry
def instances = []
(1..healthy).each {
instances << [health: [[state: 'Up']]]
}

then:
result == task.hasSucceeded(
new Stage(Execution.newPipeline("orca"), "", "", context),
serverGroup, instances, null
)

where:
result || snapshot | healthy | asg | configured
// no configured capacity: the server group's desired (3) is the target, 2 healthy is not enough
false || null | 2 | [min: 3, max: 3, desired: 3] | null
// configured is used if present and min == max == desired
true || null | 2 | [min: 3, max: 3, desired: 3] | [min: 2, max: 2, desired: 2]
// configured is used if configured allows autoscaling (min != max) but current is pinned (min == max == desired)
true || null | 2 | [min: 3, max: 3, desired: 3] | [min: 2, max: 500, desired: 2]
true || null | 2 | [min: 5, max: 5, desired: 5] | [min: 1, max: 5, desired: 2]
// useSourceCapacity with a snapshot is used over configured and current
true || 3 | 3 | [min: 5, max: 5, desired: 5] | [min: 5, max: 5, desired: 5]
true || 3 | 3 | [min: 5, max: 5, desired: 5] | null
false || 4 | 3 | [min: 5, max: 5, desired: 5] | null
// sourceCapacity is ignored if > than the calculated target due to a scale down corner case
true || 4 | 3 | [min: 3, max: 3, desired: 3] | null
false || 4 | 3 | [min: 3, max: 3, desired: 3] | [min: 4, max: 4, desired: 4]
true || 5 | 4 | [min: 3, max: 3, desired: 3] | [min: 4, max: 4, desired: 4]
}

@Unroll
void 'should throw an exception if targetHealthyDeployPercentage is not between 0 and 100'() {
when:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,57 @@ class WaitForCapacityMatchTaskSpec extends Specification {
]
}

// Data-driven check of task.hasSucceeded with useSourceCapacity disabled: each row pairs a
// server group capacity (asg) with an optional configured capacity on the stage context and
// `healthy` instances in state 'Up', and states the expected result.
@Unroll
void 'should wait based on configured capacity when autoscaling is disabled'() {
when:
def serverGroup = [
asg : [
desiredCapacity: asg.desired
],
capacity: [
min : asg.min,
max : asg.max,
desired: asg.desired
]
]

def context = [
source: [useSourceCapacity: false],
]

// rows with configured == null leave context.capacity unset
if (configured) {
context.capacity = [
min : configured.min,
max : configured.max,
desired: configured.desired
]
}

// build `healthy` instances, each with a single 'Up' health entry
def instances = []
(1..healthy).each {
instances << [health: [[state: 'Up']]]
}

then:
result == task.hasSucceeded(
new Stage(Execution.newPipeline("orca"), "", "", context),
serverGroup, instances, null
)

where:
result || healthy | asg | configured
// scale down
false || 5 | [min: 3, max: 3, desired: 3] | null
true || 3 | [min: 3, max: 3, desired: 3] | null
false || 5 | [min: 10, max: 10, desired: 10] | [min: 3, max: 3, desired: 3]
true || 5 | [min: 10, max: 10, desired: 10] | [min: 5, max: 5, desired: 5]
// scale up
false || 5 | [min: 5, max: 5, desired: 5] | [min: 10, max: 10, desired: 10]
true || 3 | [min: 1, max: 1, desired: 1] | [min: 3, max: 3, desired: 3]
// asg value is used when autoscaling
true || 4 | [min: 3, max: 10, desired: 4] | [min: 1, max: 50, desired: 5]
}

private static Map makeInstance(id, healthState = 'Up') {
[instanceId: id, health: [ [ state: healthState ] ]]
}
Expand Down

0 comments on commit 5fe370d

Please sign in to comment.