Skip to content

Commit

Permalink
feat(disable): add an optional waitForDisabledServerGroup task (#3943)
Browse files Browse the repository at this point in the history
* feat(disable): add an optional waitForDisabledServerGroup task

When we disable a server group, we both disable it and take its instances down, but we only wait for the instances to go down. This means a DisableServerGroupStage can finish successfully with its instances down while the server group itself does not yet appear disabled. This interacts poorly with whatever follows, for instance if the next stage is a resize.
  • Loading branch information
dreynaud committed Oct 14, 2020
1 parent fe13853 commit 661094a
Show file tree
Hide file tree
Showing 12 changed files with 180 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import com.google.common.base.CaseFormat;
import com.netflix.spinnaker.kork.dynamicconfig.DynamicConfigService;
import org.springframework.core.env.Environment;

/**
* Provides convenience methods for stages that are aware of force cache refresh operations.
Expand All @@ -27,19 +28,28 @@
*/
public interface ForceCacheRefreshAware {

default boolean isForceCacheRefreshEnabled(DynamicConfigService dynamicConfigService) {
String className = getClass().getSimpleName();
default boolean isForceCacheRefreshEnabled(Environment environment) {
try {
return environment.getProperty(propertyName(), Boolean.class, true);
} catch (Exception e) {
return true;
}
}

default boolean isForceCacheRefreshEnabled(DynamicConfigService dynamicConfigService) {
try {
return dynamicConfigService.isEnabled(
String.format(
"stages.%s.force-cache-refresh",
CaseFormat.LOWER_CAMEL.to(
CaseFormat.LOWER_HYPHEN,
Character.toLowerCase(className.charAt(0)) + className.substring(1))),
true);
return dynamicConfigService.isEnabled(propertyName(), true);
} catch (Exception e) {
return true;
}
}

private String propertyName() {
String className = getClass().getSimpleName();
return String.format(
"stages.%s.force-cache-refresh.enabled",
CaseFormat.LOWER_CAMEL.to(
CaseFormat.LOWER_HYPHEN,
Character.toLowerCase(className.charAt(0)) + className.substring(1)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ class ModifyAwsScalingProcessStage extends TargetServerGroupLinearStageSupport i
throw new IllegalStateException("No server group found (serverGroupName: ${stageData.region}:${stageData.serverGroupName})")
}

def suspendedProcesses = getSuspendedProcesses(targetServerGroup.get())
def suspendedProcesses = targetServerGroup.get().getSuspendedProcesses()

def isComplete
if (stageData.isResume()) {
Expand All @@ -113,12 +113,6 @@ class ModifyAwsScalingProcessStage extends TargetServerGroupLinearStageSupport i
return isComplete ? TaskResult.ofStatus(ExecutionStatus.SUCCEEDED) : TaskResult.ofStatus(ExecutionStatus.RUNNING)
}

@CompileDynamic
static Collection<String> getSuspendedProcesses(TargetServerGroup targetServerGroup) {
def asgDetails = targetServerGroup.asg as Map
return asgDetails.suspendedProcesses*.processName
}

static class StageData {
String credentials
String serverGroupName
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,22 +24,25 @@ import com.netflix.spinnaker.orca.clouddriver.tasks.DetermineHealthProvidersTask
import com.netflix.spinnaker.orca.clouddriver.tasks.MonitorKatoTask
import com.netflix.spinnaker.orca.clouddriver.tasks.servergroup.DisableServerGroupTask
import com.netflix.spinnaker.orca.clouddriver.tasks.servergroup.ServerGroupCacheForceRefreshTask
import com.netflix.spinnaker.orca.clouddriver.tasks.servergroup.WaitForDisabledServerGroupTask
import com.netflix.spinnaker.orca.clouddriver.tasks.servergroup.WaitForRequiredInstancesDownTask
import com.netflix.spinnaker.orca.api.pipeline.graph.TaskNode
import groovy.transform.CompileStatic
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.core.env.Environment
import org.springframework.stereotype.Component

@Component
@CompileStatic
class DisableServerGroupStage extends TargetServerGroupLinearStageSupport implements ForceCacheRefreshAware {
public static final String TOGGLE = "stages.disable-server-group.wait-for-disabled.enabled"
static final String PIPELINE_CONFIG_TYPE = "disableServerGroup"

private final DynamicConfigService dynamicConfigService;
private final Environment environment

@Autowired
DisableServerGroupStage(DynamicConfigService dynamicConfigService) {
this.dynamicConfigService = dynamicConfigService;
DisableServerGroupStage(Environment environment) {
this.environment = environment
}

@Override
Expand All @@ -50,7 +53,11 @@ class DisableServerGroupStage extends TargetServerGroupLinearStageSupport implem
.withTask("monitorServerGroup", MonitorKatoTask)
.withTask("waitForDownInstances", WaitForRequiredInstancesDownTask)

if (isForceCacheRefreshEnabled(dynamicConfigService)) {
if (environment.getProperty(TOGGLE, Boolean, false)) {
builder.withTask("waitForServerGroupDisabled", WaitForDisabledServerGroupTask)
}

if (isForceCacheRefreshEnabled(environment)) {
builder.withTask("forceCacheRefresh", ServerGroupCacheForceRefreshTask)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ class TargetServerGroup {
serverGroup = new HashMap(serverGroupData).asImmutable()
}

/**
 * Names of the scaling processes listed under the server group's {@code asg.suspendedProcesses}.
 *
 * @return the {@code processName} of each suspended process, or {@code null} when the server
 *         group data carries no {@code asg} entry or no {@code suspendedProcesses} list
 */
Collection<String> getSuspendedProcesses() {
  def asgDetails = serverGroup.asg as Map
  // Safe navigation: this accessor lives on the general server group type, so the `asg` entry
  // may be missing; the spread operator already tolerates a null list.
  return asgDetails?.suspendedProcesses*.processName
}

/**
* All invocations of this method should use the full 'getLocation()' signature, instead of the shorthand dot way
* (i.e. "serverGroup.location"). Otherwise, the property 'location' is looked for in the serverGroup map, which is
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,14 @@
package com.netflix.spinnaker.orca.clouddriver.tasks.servergroup

import com.fasterxml.jackson.databind.ObjectMapper
import com.netflix.frigga.Names
import com.netflix.spinnaker.orca.api.pipeline.models.ExecutionStatus
import com.netflix.spinnaker.orca.api.pipeline.RetryableTask
import com.netflix.spinnaker.orca.api.pipeline.models.StageExecution
import com.netflix.spinnaker.orca.api.pipeline.TaskResult
import com.netflix.spinnaker.orca.clouddriver.OortService
import com.netflix.spinnaker.orca.clouddriver.tasks.AbstractCloudProviderAwareTask
import com.netflix.spinnaker.orca.clouddriver.utils.ClusterDescriptor
import com.netflix.spinnaker.orca.clouddriver.utils.ServerGroupDescriptor
import com.netflix.spinnaker.orca.retrofit.exceptions.RetrofitExceptionHandler
import groovy.util.logging.Slf4j
import org.springframework.beans.factory.annotation.Autowired
Expand All @@ -44,15 +45,13 @@ class WaitForDestroyedServerGroupTask extends AbstractCloudProviderAwareTask imp

@Override
TaskResult execute(StageExecution stage) {
String cloudProvider = getCloudProvider(stage)
String account = getCredentials(stage)
String serverGroupRegion = (stage.context.regions as Collection)?.getAt(0) ?: stage.context.region
String serverGroupName = (stage.context.serverGroupName ?: stage.context.asgName) as String // TODO: Retire asgName
Names names = Names.parseName(serverGroupName)
String appName = stage.context.moniker?.app ?: names.app
String clusterName = stage.context.moniker?.cluster ?: names.cluster
ClusterDescriptor clusterDescriptor = getClusterDescriptor(stage)
try {
def response = oortService.getCluster(appName, account, clusterName, cloudProvider)
def response = oortService.getCluster(
clusterDescriptor.app,
clusterDescriptor.account,
clusterDescriptor.name,
clusterDescriptor.cloudProvider)

if (response.status != 200) {
return TaskResult.ofStatus(ExecutionStatus.RUNNING)
Expand All @@ -62,12 +61,17 @@ class WaitForDestroyedServerGroupTask extends AbstractCloudProviderAwareTask imp
if (!cluster || !cluster.serverGroups) {
return TaskResult.builder(ExecutionStatus.SUCCEEDED).context([remainingInstances: []]).build()
}
def serverGroup = cluster.serverGroups.find { it.name == serverGroupName && it.region == serverGroupRegion }

ServerGroupDescriptor serverGroupDescriptor = getServerGroupDescriptor(stage)
def serverGroup = cluster.serverGroups.find {
it.name == serverGroupDescriptor.name && it.region == serverGroupDescriptor.region
}
if (!serverGroup) {
return TaskResult.builder(ExecutionStatus.SUCCEEDED).context([remainingInstances: []]).build()
}

def instances = serverGroup.instances ?: []
log.info("${serverGroupName}: not yet destroyed, found instances: ${instances?.join(', ') ?: 'none'}")
log.info("${serverGroupDescriptor.name}: not yet destroyed, found instances: ${instances?.join(', ') ?: 'none'}")
return TaskResult.builder(ExecutionStatus.RUNNING).context([remainingInstances: instances.findResults { it.name }]).build()
} catch (RetrofitError e) {
def retrofitErrorResponse = new RetrofitExceptionHandler().handle(stage.name, e)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package com.netflix.spinnaker.orca.clouddriver.tasks.servergroup;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.netflix.spinnaker.orca.api.pipeline.RetryableTask;
import com.netflix.spinnaker.orca.api.pipeline.TaskResult;
import com.netflix.spinnaker.orca.api.pipeline.models.ExecutionStatus;
import com.netflix.spinnaker.orca.api.pipeline.models.StageExecution;
import com.netflix.spinnaker.orca.clouddriver.OortService;
import com.netflix.spinnaker.orca.clouddriver.pipeline.servergroup.support.TargetServerGroup;
import com.netflix.spinnaker.orca.clouddriver.tasks.AbstractCloudProviderAwareTask;
import com.netflix.spinnaker.orca.clouddriver.utils.ServerGroupDescriptor;
import com.netflix.spinnaker.orca.retrofit.exceptions.RetrofitExceptionHandler;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import lombok.val;
import org.jetbrains.annotations.NotNull;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import retrofit.RetrofitError;

/**
 * Task that succeeds once the server group described by the stage reports itself as disabled.
 *
 * <p>Each execution fetches the server group through {@link OortService} and returns {@code
 * RUNNING} until {@code isDisabled()} is true. Lookup failures are logged and recorded in the
 * stage context, and the task stays {@code RUNNING} rather than failing.
 */
@Component
@Slf4j
public class WaitForDisabledServerGroupTask extends AbstractCloudProviderAwareTask
    implements RetryableTask {
  private final OortService oortService;
  private final ObjectMapper objectMapper;

  @Autowired
  WaitForDisabledServerGroupTask(OortService oortService, ObjectMapper objectMapper) {
    this.oortService = oortService;
    this.objectMapper = objectMapper;
  }

  /** Delay between attempts; presumably milliseconds (10 seconds) - confirm with RetryableTask. */
  @Override
  public long getBackoffPeriod() {
    return 10000;
  }

  /** Overall time budget; presumably milliseconds (30 minutes) - confirm with RetryableTask. */
  @Override
  public long getTimeout() {
    return 1800000;
  }

  /**
   * Checks whether the targeted server group is disabled yet.
   *
   * @param stage stage whose context identifies the account, region and server group
   * @return {@code SUCCEEDED} when the server group is disabled, otherwise {@code RUNNING}; on a
   *     lookup failure the result is {@code RUNNING} with the failure stored in the context
   */
  @NotNull
  @Override
  public TaskResult execute(@NotNull StageExecution stage) {
    val serverGroupDescriptor = getServerGroupDescriptor(stage);
    try {
      var serverGroup = fetchServerGroup(serverGroupDescriptor);
      return serverGroup.isDisabled() ? TaskResult.SUCCEEDED : TaskResult.RUNNING;
    } catch (RetrofitError e) {
      val retrofitErrorResponse = new RetrofitExceptionHandler().handle(stage.getName(), e);
      log.error("Unexpected retrofit error {}", retrofitErrorResponse, e);
      return TaskResult.builder(ExecutionStatus.RUNNING)
          .context(Collections.singletonMap("lastRetrofitException", retrofitErrorResponse))
          .build();
    } catch (IOException e) {
      log.error("Unexpected exception", e);
      // NOTE(review): this stores the raw exception object in the stage context, unlike the
      // structured response used for retrofit errors - confirm the context can serialize it.
      return TaskResult.builder(ExecutionStatus.RUNNING)
          .context(Collections.singletonMap("lastException", e))
          .build();
    }
  }

  /**
   * Fetches the server group and wraps the decoded response body in a {@link TargetServerGroup}.
   *
   * @throws IOException when the response body cannot be read or parsed
   */
  private TargetServerGroup fetchServerGroup(ServerGroupDescriptor serverGroupDescriptor)
      throws IOException {
    val response =
        oortService.getServerGroup(
            serverGroupDescriptor.getAccount(),
            serverGroupDescriptor.getRegion(),
            serverGroupDescriptor.getName());
    // Close the body stream explicitly instead of relying on the mapper's configuration.
    try (var body = response.getBody().in()) {
      @SuppressWarnings("unchecked") // readValue(Map.class) can only return a raw Map
      var serverGroupData = (Map<String, Object>) objectMapper.readValue(body, Map.class);
      return new TargetServerGroup(serverGroupData);
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ class ModifyAwsScalingProcessStageSpec extends Specification {
)

expect:
ModifyAwsScalingProcessStage.WaitForScalingProcess.getSuspendedProcesses(targetServerGroup) == suspendedProcesses
targetServerGroup.getSuspendedProcesses() == suspendedProcesses

where:
suspendedProcesses || _
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,16 @@ import com.netflix.spinnaker.orca.clouddriver.pipeline.servergroup.support.Deter
import com.netflix.spinnaker.orca.front50.pipeline.PipelineStage
import com.netflix.spinnaker.orca.kato.pipeline.support.ResizeStrategy
import com.netflix.spinnaker.orca.pipeline.WaitStage
import org.springframework.core.env.Environment
import spock.lang.Specification
import static com.netflix.spinnaker.orca.test.model.ExecutionBuilder.pipeline
import static com.netflix.spinnaker.orca.test.model.ExecutionBuilder.stage


class RollingRedBlackStrategySpec extends Specification {
def dynamicConfigService = Mock(DynamicConfigService)
def disableServerGroupStage = new DisableServerGroupStage(dynamicConfigService)
def environment = Mock(Environment)
def disableServerGroupStage = new DisableServerGroupStage(environment)
def scaleDownClusterStage = new ScaleDownClusterStage(dynamicConfigService)
def resizeServerGroupStage = new ResizeServerGroupStage()
def waitStage = new WaitStage()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,9 @@ class WaitForDestroyedAsgTaskSpec extends Specification {

and:
def stage = new StageExecutionImpl(PipelineExecutionImpl.newPipeline("orca"), "", [
"regions": [region],
"region": region,
"asgName": asgName,
"account": "test"
])

def result = task.execute(stage)
Expand Down Expand Up @@ -101,9 +102,10 @@ class WaitForDestroyedAsgTaskSpec extends Specification {

and:
def stage = new StageExecutionImpl(PipelineExecutionImpl.newPipeline("orca"), "", [
"regions": ["us-east-1"],
"region": ["us-east-1"],
"asgName": "app-test-v000",
"remainingInstances": ["i-123"]
"remainingInstances": ["i-123"],
"account": "test"
])

def result = task.execute(stage)
Expand All @@ -128,9 +130,10 @@ class WaitForDestroyedAsgTaskSpec extends Specification {

and:
def stage = new StageExecutionImpl(PipelineExecutionImpl.newPipeline("orca"), "", [
"regions": ["us-east-1"],
"region": ["us-east-1"],
"asgName": "app-test-v000",
"remainingInstances": ['i-123', 'i-234', 'i-345']
"remainingInstances": ['i-123', 'i-234', 'i-345'],
"account": "test"
])

def result = task.execute(stage)
Expand Down
2 changes: 2 additions & 0 deletions orca-core/orca-core.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ dependencies {
api("com.netflix.spinnaker.kork:kork-core")
api("com.netflix.spinnaker.kork:kork-exceptions")
api("com.netflix.spinnaker.kork:kork-expressions")
api("com.netflix.spinnaker.kork:kork-moniker")
api("com.netflix.spinnaker.kork:kork-plugins")
api("com.netflix.spinnaker.kork:kork-security")
api("com.netflix.spinnaker.kork:kork-telemetry")
Expand All @@ -42,6 +43,7 @@ dependencies {
implementation("com.fasterxml.jackson.core:jackson-databind")
implementation("cglib:cglib-nodep")
implementation("com.fasterxml.jackson.datatype:jackson-datatype-guava")
implementation("com.netflix.frigga:frigga")
implementation("com.netflix.spectator:spectator-api")
implementation("com.netflix.spinnaker.kork:kork-core")
implementation("net.logstash.logback:logstash-logback-encoder")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,16 @@
package com.netflix.spinnaker.orca.clouddriver.utils;

import com.google.common.collect.ImmutableList;
import com.netflix.frigga.Names;
import com.netflix.spinnaker.moniker.Moniker;
import com.netflix.spinnaker.orca.api.pipeline.models.StageExecution;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import lombok.val;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -93,4 +97,33 @@ default boolean hasCloudProvider(@Nonnull StageExecution stage) {
default boolean hasCredentials(@Nonnull StageExecution stage) {
return getCredentials(stage) != null;
}

/**
 * Resolves the server group name from the stage context.
 *
 * @param stage stage whose context is inspected
 * @return the {@code serverGroupName} entry, else the {@code asgName} entry, else {@code null}
 */
default @Nullable String getServerGroupName(StageExecution stage) {
  String preferred = (String) stage.getContext().get("serverGroupName");
  if (preferred != null) {
    return preferred;
  }
  // Fall back to the alternate "asgName" key when "serverGroupName" is not set.
  return (String) stage.getContext().get("asgName");
}

/**
 * Reads the {@code moniker} entry of the stage context.
 *
 * <p>The entry is accepted either as a {@link Moniker} instance or as a plain map of moniker
 * fields ({@code app}, {@code cluster}, {@code detail}, {@code stack}, {@code sequence}); a map
 * is converted field by field instead of failing on a cast.
 *
 * @param stage stage whose context is inspected
 * @return the moniker, or {@code null} when the context has no {@code moniker} entry
 * @throws ClassCastException when the entry is neither a {@link Moniker} nor a map
 */
default @Nullable Moniker getMoniker(StageExecution stage) {
  Object raw = stage.getContext().get("moniker");
  if (raw instanceof Map) {
    Map<?, ?> fields = (Map<?, ?>) raw;
    Object sequence = fields.get("sequence");
    return Moniker.builder()
        .app((String) fields.get("app"))
        .cluster((String) fields.get("cluster"))
        .detail((String) fields.get("detail"))
        .stack((String) fields.get("stack"))
        .sequence(sequence instanceof Number ? Integer.valueOf(((Number) sequence).intValue()) : null)
        .build();
  }
  // Null passes through; any other unexpected type still fails with ClassCastException.
  return (Moniker) raw;
}

/**
 * Builds the coordinates of the cluster targeted by the stage.
 *
 * <p>Application and cluster names come from the stage's moniker when it provides them; any
 * value the moniker does not provide (no moniker, or a null field) falls back to the value
 * parsed from the server group name.
 *
 * @param stage stage whose context is inspected
 * @return descriptor holding app, account, cluster name and cloud provider
 */
default ClusterDescriptor getClusterDescriptor(StageExecution stage) {
  String cloudProvider = getCloudProvider(stage);
  String account = getCredentials(stage);

  // Names accept a null String, and all its fields will be null which is fine in this context
  Names namesFromServerGroup = Names.parseName(getServerGroupName(stage));
  Optional<Moniker> moniker = Optional.ofNullable(getMoniker(stage));

  // Optional.map yields empty for a null field, so a partially filled moniker still falls back.
  String appName = moniker.map(Moniker::getApp).orElseGet(namesFromServerGroup::getApp);
  String clusterName =
      moniker.map(Moniker::getCluster).orElseGet(namesFromServerGroup::getCluster);
  return new ClusterDescriptor(appName, account, clusterName, cloudProvider);
}

/**
 * Builds the coordinates of the server group targeted by the stage.
 *
 * @param stage stage whose context is inspected
 * @return descriptor holding the credentials, the server group name and the first region
 *     returned by {@code getRegions} ({@code null} region when that list is empty)
 */
default ServerGroupDescriptor getServerGroupDescriptor(StageExecution stage) {
  List<String> candidateRegions = getRegions(stage.getContext());
  String firstRegion = null;
  if (!candidateRegions.isEmpty()) {
    firstRegion = candidateRegions.get(0);
  }
  return new ServerGroupDescriptor(
      getCredentials(stage), getServerGroupName(stage), firstRegion);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
package com.netflix.spinnaker.orca.clouddriver.utils

/**
 * Coordinates of a cluster: application, account, cluster name and cloud provider.
 *
 * NOTE(review): every property is declared non-null, so a Java/Groovy caller passing `null`
 * is presumably rejected at construction time - confirm that callers never pass null.
 *
 * @property app name of the application
 * @property account account the cluster belongs to
 * @property name name of the cluster
 * @property cloudProvider cloud provider identifier
 */
data class ClusterDescriptor(val app: String, val account: String, val name: String, val cloudProvider: String)
/**
 * Coordinates of a server group: account, server group name and region.
 *
 * NOTE(review): every property is declared non-null, so a Java/Groovy caller passing `null`
 * (for example a missing region) is presumably rejected at construction time - confirm that
 * callers never pass null.
 *
 * @property account account the server group belongs to
 * @property name name of the server group
 * @property region region of the server group
 */
data class ServerGroupDescriptor(val account: String, val name: String, val region: String)

0 comments on commit 661094a

Please sign in to comment.