feat(aws): Support for pinning min=desired capacity on source server group (#1997)

This PR attempts to avoid unnecessary and problematic autoscale events
that occur as the total number of instances increases while a deploy is
happening.

As the instance count increases, there is natural downward pressure
on the cluster, resulting in the source server group being scaled down.

This is particularly bad for rolling red/black, where the old server group
is not immediately disabled.

The expectation is that the source group will be returned to its
original unpinned capacity when the deploy completes (or fails!).
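
Concretely, pinning sets the source server group's minimum capacity equal to its desired capacity, so scale-in policies cannot shrink it mid-deploy; unpinning restores the original minimum. A minimal Groovy sketch of that arithmetic (illustrative only; the capacity maps are hypothetical and the real resize is performed via clouddriver):

  // Illustrative sketch only; a server group's capacity is its [min, desired, max] triple.
  Map pin(Map capacity) {
    // min == desired: the autoscaler can still scale out, but can no longer scale in
    return [min: capacity.desired, desired: capacity.desired, max: capacity.max]
  }

  Map unpin(Map capacity, int originalMin) {
    // restore the pre-deploy minimum once the deploy completes (or fails!)
    return [min: originalMin, desired: capacity.desired, max: capacity.max]
  }

  def source   = [min: 3, desired: 10, max: 20]
  def pinned   = pin(source)       // => [min: 10, desired: 10, max: 20]
  def restored = unpin(pinned, 3)  // => [min: 3, desired: 10, max: 20]
  assert pinned.min == 10 && restored.min == 3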
ajordens committed Feb 20, 2018
1 parent ac478d0 commit 07c9765
Showing 20 changed files with 391 additions and 149 deletions.
AwsDeployStagePreProcessor.groovy
@@ -17,41 +17,141 @@
 package com.netflix.spinnaker.orca.clouddriver.pipeline.providers.aws
 
+import com.netflix.spinnaker.orca.clouddriver.pipeline.servergroup.ResizeServerGroupStage
+import com.netflix.spinnaker.orca.clouddriver.pipeline.servergroup.strategies.AbstractDeployStrategyStage
 import com.netflix.spinnaker.orca.clouddriver.pipeline.servergroup.strategies.DeployStagePreProcessor
+import com.netflix.spinnaker.orca.clouddriver.pipeline.servergroup.support.TargetServerGroupResolver
+import com.netflix.spinnaker.orca.kato.pipeline.support.ResizeStrategy
 import com.netflix.spinnaker.orca.kato.pipeline.support.StageData
 import com.netflix.spinnaker.orca.pipeline.model.Stage
 import org.springframework.beans.factory.annotation.Autowired
 import org.springframework.stereotype.Component
 
+import static com.netflix.spinnaker.orca.kato.pipeline.support.ResizeStrategySupport.getSource
+
 @Component
 class AwsDeployStagePreProcessor implements DeployStagePreProcessor {
   @Autowired
   ApplySourceServerGroupCapacityStage applySourceServerGroupSnapshotStage
 
+  @Autowired
+  ResizeServerGroupStage resizeServerGroupStage
+
+  @Autowired
+  TargetServerGroupResolver targetServerGroupResolver
+
   @Override
-  List<DeployStagePreProcessor.StepDefinition> additionalSteps() {
+  List<StepDefinition> additionalSteps(Stage stage) {
+    def stageData = stage.mapTo(StageData)
+    if (stageData.strategy == "rollingredblack") {
+      // rolling red/black has no need to snapshot capacities
+      return []
+    }
+
     return [
-      new DeployStagePreProcessor.StepDefinition(
-        name: "snapshotSourceServerGroup",
-        taskClass: CaptureSourceServerGroupCapacityTask
-      )
+      new StepDefinition(
+        name: "snapshotSourceServerGroup",
+        taskClass: CaptureSourceServerGroupCapacityTask
+      )
     ]
   }
 
   @Override
-  List<DeployStagePreProcessor.StageDefinition> afterStageDefinitions() {
-    return [
-      new DeployStagePreProcessor.StageDefinition(
-        name: "restoreMinCapacityFromSnapshot",
-        stageDefinitionBuilder: applySourceServerGroupSnapshotStage,
-        context: [:]
-      )
-    ]
-  }
+  List<StageDefinition> beforeStageDefinitions(Stage stage) {
+    def stageData = stage.mapTo(StageData)
+    if (shouldPinSourceServerGroup(stageData.strategy)) {
+      def resizeContext = getResizeContext(stageData)
+      resizeContext.pinMinimumCapacity = true
+
+      return [
+        new StageDefinition(
+          name: "Pin ${resizeContext.serverGroupName}".toString(),
+          stageDefinitionBuilder: resizeServerGroupStage,
+          context: resizeContext
+        )
+      ]
+    }
+
+    return []
+  }
+
+  @Override
+  List<StageDefinition> afterStageDefinitions(Stage stage) {
+    def stageData = stage.mapTo(StageData)
+    def stageDefinitions = []
+    if (stageData.strategy != "rollingredblack") {
+      // rolling red/black has no need to apply a snapshotted capacity (on the newly created server group)
+      stageDefinitions << new StageDefinition(
+        name: "restoreMinCapacityFromSnapshot",
+        stageDefinitionBuilder: applySourceServerGroupSnapshotStage,
+        context: [:]
+      )
+    }
+
+    def unpinServerGroupStage = buildUnpinServerGroupStage(stageData)
+    if (unpinServerGroupStage) {
+      stageDefinitions << unpinServerGroupStage
+    }
+
+    return stageDefinitions
+  }
+
+  @Override
+  List<StageDefinition> onFailureStageDefinitions(Stage stage) {
+    def stageData = stage.mapTo(StageData)
+    def stageDefinitions = []
+
+    def unpinServerGroupStage = buildUnpinServerGroupStage(stageData)
+    if (unpinServerGroupStage) {
+      stageDefinitions << unpinServerGroupStage
+    }
+
+    return stageDefinitions
+  }
 
   @Override
   boolean supports(Stage stage) {
     def stageData = stage.mapTo(StageData)
-    return stageData.useSourceCapacity && stageData.cloudProvider == "aws" && stageData.strategy != "rollingredblack"
+    return stageData.cloudProvider == "aws" // && stageData.useSourceCapacity
   }
+
+  private static boolean shouldPinSourceServerGroup(String strategy) {
+    // TODO-AJ consciously only enabling for rolling red/black -- will add support for other strategies after it's working
+    return strategy == "rollingredblack"
+  }
+
+  private Map<String, Object> getResizeContext(StageData stageData) {
+    def cleanupConfig = AbstractDeployStrategyStage.CleanupConfig.fromStage(stageData)
+    def baseContext = [
+      (cleanupConfig.location.singularType()): cleanupConfig.location.value,
+      cluster      : cleanupConfig.cluster,
+      moniker      : cleanupConfig.moniker,
+      credentials  : cleanupConfig.account,
+      cloudProvider: cleanupConfig.cloudProvider,
+    ]
+
+    def source = getSource(targetServerGroupResolver, stageData, baseContext)
+    baseContext.putAll([
+      serverGroupName: source.serverGroupName,
+      action         : ResizeStrategy.ResizeAction.scale_to_server_group,
+      source         : source,
+      useNameAsLabel : true // hint to deck that it should _not_ override the name
+    ])
+    return baseContext
+  }
+
+  private StageDefinition buildUnpinServerGroupStage(StageData stageData) {
+    if (!shouldPinSourceServerGroup(stageData.strategy)) {
+      return null;
+    }
+
+    def resizeContext = getResizeContext(stageData)
+    resizeContext.unpinMinimumCapacity = true
+
+    return new StageDefinition(
+      name: "Unpin ${resizeContext.serverGroupName}".toString(),
+      stageDefinitionBuilder: resizeServerGroupStage,
+      context: resizeContext
+    )
+  }
 }
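
Net effect for a rolling red/black deploy: the deploy is bracketed by a "Pin <serverGroup>" resize stage before and an "Unpin <serverGroup>" resize stage after, with the unpin also registered as an on-failure stage. For illustration, the context handed to ResizeServerGroupStage for the pin stage would look roughly like this (all values hypothetical; the unpin context differs only in carrying unpinMinimumCapacity: true):

  // Hypothetical values for illustration; built by getResizeContext() above.
  def source = [region: "us-west-2", serverGroupName: "app-main-v001",
                credentials: "prod", cloudProvider: "aws"]
  def pinContext = [
    region            : "us-west-2",
    cluster           : "app-main",
    credentials       : "prod",
    cloudProvider     : "aws",
    serverGroupName   : source.serverGroupName,
    action            : "scale_to_server_group", // ResizeStrategy.ResizeAction.scale_to_server_group
    source            : source,
    useNameAsLabel    : true,  // hint to deck that it should not override the name
    pinMinimumCapacity: true   // ask the resize to set min = desired on the source group
  ]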
CreateServerGroupStage.groovy
@@ -74,36 +74,36 @@ class CreateServerGroupStage extends AbstractDeployStrategyStage
 
   @Override
   List<Stage> onFailureStages(@Nonnull Stage stage) {
-    def stageData = stage.mapTo(StageData)
+    def onFailureStages = super.onFailureStages(stage)
 
+    def stageData = stage.mapTo(StageData)
     if (!stageData.rollback?.onFailure) {
       // rollback on failure is not enabled
-      return []
+      return onFailureStages
     }
 
     if (!stageData.getServerGroup()) {
       // did not get far enough to create a new server group
       log.warn("No server group was created, skipping rollback! (executionId: ${stage.execution.id}, stageId: ${stage.id})")
 
-      return []
+      return onFailureStages
     }
 
-    return [
-      newStage(
-        stage.execution,
-        rollbackClusterStage.type,
-        "Rollback ${stageData.getCluster()}",
-        [
-          "credentials"   : stageData.getCredentials(),
-          "cloudProvider" : stageData.getCloudProvider(),
-          "regions"       : [stageData.getRegion()],
-          "serverGroup"   : stageData.getServerGroup(),
-          "stageTimeoutMs": TimeUnit.MINUTES.toMillis(30) // timebox a rollback to 30 minutes
-        ],
-        stage,
-        SyntheticStageOwner.STAGE_AFTER
-      )
-    ]
+    onFailureStages << newStage(
+      stage.execution,
+      rollbackClusterStage.type,
+      "Rollback ${stageData.getCluster()}",
+      [
+        "credentials"   : stageData.getCredentials(),
+        "cloudProvider" : stageData.getCloudProvider(),
+        "regions"       : [stageData.getRegion()],
+        "serverGroup"   : stageData.getServerGroup(),
+        "stageTimeoutMs": TimeUnit.MINUTES.toMillis(30) // timebox a rollback to 30 minutes
+      ],
+      stage,
+      SyntheticStageOwner.STAGE_AFTER
+    )
+
+    return onFailureStages
   }
 
   private static class StageData {
AbstractDeployStrategyStage.groovy
@@ -66,7 +66,7 @@ abstract class AbstractDeployStrategyStage extends AbstractCloudProviderAwareStage
 
     correctContext(stage)
     deployStagePreProcessors.findAll { it.supports(stage) }.each {
-      it.additionalSteps().each {
+      it.additionalSteps(stage).each {
         builder.withTask(it.name, it.taskClass)
       }
     }

@@ -97,15 +97,17 @@ abstract class AbstractDeployStrategyStage extends AbstractCloudProviderAwareStage
     Strategy strategy = (Strategy) strategies.findResult(noStrategy, {
       it.name.equalsIgnoreCase(stage.context.strategy) ? it : null
     })
+
+    def preProcessors = deployStagePreProcessors.findAll { it.supports(stage) }
     def stages = strategy.composeFlow(stage)
 
     def stageData = stage.mapTo(StageData)
-    deployStagePreProcessors.findAll { it.supports(stage) }.each {
+    preProcessors.each {
       def defaultContext = [
        credentials : stageData.account,
        cloudProvider: stageData.cloudProvider
       ]
-      it.beforeStageDefinitions().each {
+      it.beforeStageDefinitions(stage).each {
         stages << newStage(
           stage.execution,
           it.stageDefinitionBuilder.type,

@@ -115,7 +117,7 @@
           SyntheticStageOwner.STAGE_BEFORE
         )
       }
-      it.afterStageDefinitions().each {
+      it.afterStageDefinitions(stage).each {
         stages << newStage(
           stage.execution,
           it.stageDefinitionBuilder.type,

@@ -130,6 +132,23 @@
     return stages
   }
 
+  @Override
+  List<Stage> onFailureStages(Stage stage) {
+    return deployStagePreProcessors.findAll { it.supports(stage) }
+      .collect { it.onFailureStageDefinitions(stage) }
+      .flatten()
+      .collect { DeployStagePreProcessor.StageDefinition stageDefinition ->
+        newStage(
+          stage.execution,
+          stageDefinition.stageDefinitionBuilder.type,
+          stageDefinition.name,
+          stageDefinition.context,
+          stage,
+          SyntheticStageOwner.STAGE_AFTER
+        )
+      }
+  }
+
   /**
    * This nasty method is here because of an unfortunate misstep in pipeline configuration that introduced a nested
    * "cluster" key, when in reality we want all of the parameters to be derived from the top level. To preserve

@@ -160,7 +179,10 @@ abstract class AbstractDeployStrategyStage extends AbstractCloudProviderAwareStage
     Location location
 
     static CleanupConfig fromStage(Stage stage) {
-      def stageData = stage.mapTo(StageData)
+      return fromStage(stage.mapTo(StageData))
+    }
+
+    static CleanupConfig fromStage(StageData stageData) {
       def loc = TargetServerGroup.Support.locationFromStageData(stageData)
       new CleanupConfig(
         account: stageData.account,
DeployStagePreProcessor.java
@@ -33,15 +33,19 @@
 public interface DeployStagePreProcessor {
   boolean supports(Stage stage);
 
-  default List<StepDefinition> additionalSteps() {
+  default List<StepDefinition> additionalSteps(Stage stage) {
     return Collections.emptyList();
   }
 
-  default List<StageDefinition> beforeStageDefinitions() {
+  default List<StageDefinition> beforeStageDefinitions(Stage stage) {
     return Collections.emptyList();
   }
 
-  default List<StageDefinition> afterStageDefinitions() {
+  default List<StageDefinition> afterStageDefinitions(Stage stage) {
     return Collections.emptyList();
   }
+
+  default List<StageDefinition> onFailureStageDefinitions(Stage stage) {
+    return Collections.emptyList();
+  }
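
A hypothetical implementation of the new Stage-aware hook, sketched for illustration (ExampleDeployStagePreProcessor and ExampleCleanupStage are invented names, not part of this commit; imports elided):

  // Hypothetical pre-processor using the new onFailureStageDefinitions(Stage) hook.
  @Component
  class ExampleDeployStagePreProcessor implements DeployStagePreProcessor {
    @Autowired
    ExampleCleanupStage cleanupStage // placeholder StageDefinitionBuilder

    @Override
    boolean supports(Stage stage) {
      return stage.mapTo(StageData).cloudProvider == "aws"
    }

    @Override
    List<StageDefinition> onFailureStageDefinitions(Stage stage) {
      // runs only if the deploy fails; afterStageDefinitions covers the happy path
      return [
        new StageDefinition(
          name: "exampleCleanup",
          stageDefinitionBuilder: cleanupStage,
          context: [:]
        )
      ]
    }
  }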
RollingRedBlackStrategy.groovy
@@ -32,6 +32,8 @@ import org.springframework.context.ApplicationContextAware
 import org.springframework.stereotype.Component
 import static com.netflix.spinnaker.orca.pipeline.StageDefinitionBuilder.newStage
 
+import static com.netflix.spinnaker.orca.kato.pipeline.support.ResizeStrategySupport.*
+
 @Slf4j
 @Component
 class RollingRedBlackStrategy implements Strategy, ApplicationContextAware {

@@ -89,7 +91,6 @@ class RollingRedBlackStrategy implements Strategy, ApplicationContextAware {
 
     def targetPercentages = stageData.getTargetPercentages()
     if (targetPercentages.size() == 0 || targetPercentages[-1] != 100) {
-      log.info("Inserting implicit 100% final target percentage...")
       targetPercentages.add(100)
     }

@@ -107,17 +108,19 @@
       SyntheticStageOwner.STAGE_AFTER
     )
 
+    def source = getSource(targetServerGroupResolver, stageData, baseContext)
+
     // java .forEach rather than groovy .each, since the nested .each closure sometimes omits parent context
     targetPercentages.forEach({ p ->
-      def source = getSource(targetServerGroupResolver, stageData, baseContext)
       def resizeContext = baseContext + [
-        target        : TargetServerGroup.Params.Target.current_asg_dynamic,
-        action        : ResizeStrategy.ResizeAction.scale_to_server_group,
-        source        : source,
-        targetLocation: cleanupConfig.location,
-        scalePct      : p,
-        pinCapacity   : p < 100, // if p = 100, capacity should be unpinned,
-        useNameAsLabel: true // hint to deck that it should _not_ override the name
+        target              : TargetServerGroup.Params.Target.current_asg_dynamic,
+        action              : ResizeStrategy.ResizeAction.scale_to_server_group,
+        source              : source,
+        targetLocation      : cleanupConfig.location,
+        scalePct            : p,
+        pinCapacity         : p < 100, // if p < 100, capacity should be pinned (min == max == desired)
+        unpinMinimumCapacity: p == 100, // if p == 100, min capacity should be restored to the original unpinned value from source
+        useNameAsLabel      : true // hint to deck that it should _not_ override the name
       ]
 
       def resizeStage = newStage(

@@ -208,34 +211,5 @@
     return stages
   }
 
-  static ResizeStrategy.Source getSource(TargetServerGroupResolver targetServerGroupResolver,
-                                         RollingRedBlackStageData stageData,
-                                         Map baseContext) {
-    if (stageData.source) {
-      return new ResizeStrategy.Source(
-        region: stageData.source.region,
-        serverGroupName: stageData.source.serverGroupName ?: stageData.source.asgName,
-        credentials: stageData.credentials ?: stageData.account,
-        cloudProvider: stageData.cloudProvider
-      )
-    }
-
-    // no source server group specified, lookup current server group
-    TargetServerGroup target = targetServerGroupResolver.resolve(
-      new Stage(null, null, null, baseContext + [target: TargetServerGroup.Params.Target.current_asg_dynamic])
-    )?.get(0)
-
-    if (!target) {
-      throw new IllegalStateException("No target server groups found (${baseContext})")
-    }
-
-    return new ResizeStrategy.Source(
-      region: target.getLocation().value,
-      serverGroupName: target.getName(),
-      credentials: stageData.credentials ?: stageData.account,
-      cloudProvider: stageData.cloudProvider
-    )
-  }
-
   ApplicationContext applicationContext
 }
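
To make the percentage loop concrete: with targetPercentages = [25, 50], an implicit 100 is appended, every intermediate resize pins capacity, and only the final 100% step restores the original minimum. A short illustrative sketch of the flag arithmetic:

  // Illustrative only: how each scalePct maps to the pin/unpin flags above.
  def targetPercentages = [25, 50]
  if (targetPercentages.size() == 0 || targetPercentages[-1] != 100) {
    targetPercentages.add(100) // implicit final 100% step, as in composeFlow
  }

  targetPercentages.each { p ->
    def pinCapacity = p < 100            // ramping: min == max == desired
    def unpinMinimumCapacity = p == 100  // final step: restore the original min
    println "scalePct=${p} pinCapacity=${pinCapacity} unpinMinimumCapacity=${unpinMinimumCapacity}"
  }
  // scalePct=25 pinCapacity=true unpinMinimumCapacity=false
  // scalePct=50 pinCapacity=true unpinMinimumCapacity=false
  // scalePct=100 pinCapacity=false unpinMinimumCapacity=true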
