Skip to content

Commit

Permalink
perf(aws): Simplifying termination lifecycle to not look up app info fi…
Browse files Browse the repository at this point in the history
…rst (#1517)

After talking to cloudperf, we're mostly interested in processing messages
as quickly as possible. We don't really care whether discovery knows about the
application or instance, so we'll only retry on network errors; otherwise,
we'll just move on to the next message.
  • Loading branch information
robzienert committed Mar 16, 2017
1 parent 36f1091 commit cce19b8
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 62 deletions.
Expand Up @@ -27,7 +27,6 @@ class InstanceTerminationConfigurationProperties {
int visibilityTimeout = 30
int waitTimeSeconds = 5
int sqsMessageRetentionPeriodSeconds = 120
int eurekaFindApplicationRetryMax = 3
int eurekaUpdateStatusRetryMax = 3

InstanceTerminationConfigurationProperties() {
Expand All @@ -40,15 +39,13 @@ class InstanceTerminationConfigurationProperties {
int visibilityTimeout,
int waitTimeSeconds,
int sqsMessageRetentionPeriodSeconds,
int eurekaFindApplicationRetryMax,
int eurekaUpdateStatusRetryMax) {
this.accountName = accountName
this.queueARN = queueARN
this.topicARN = topicARN
this.visibilityTimeout = visibilityTimeout
this.waitTimeSeconds = waitTimeSeconds
this.sqsMessageRetentionPeriodSeconds = sqsMessageRetentionPeriodSeconds
this.eurekaFindApplicationRetryMax = eurekaFindApplicationRetryMax
this.eurekaUpdateStatusRetryMax = eurekaUpdateStatusRetryMax
}
}
Expand Up @@ -31,21 +31,21 @@
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.amazonaws.services.sqs.model.ReceiveMessageResult;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.netflix.frigga.Names;
import com.netflix.spectator.api.Id;
import com.netflix.spectator.api.Registry;
import com.netflix.spinnaker.clouddriver.aws.deploy.description.EnableDisableInstanceDiscoveryDescription;
import com.netflix.spinnaker.clouddriver.aws.deploy.ops.discovery.AwsEurekaSupport;
import com.netflix.spinnaker.clouddriver.aws.security.AmazonClientProvider;
import com.netflix.spinnaker.clouddriver.aws.security.AmazonCredentials.LifecycleHook;
import com.netflix.spinnaker.clouddriver.aws.security.NetflixAmazonCredentials;
import com.netflix.spinnaker.clouddriver.data.task.DefaultTask;
import com.netflix.spinnaker.clouddriver.data.task.Task;
import com.netflix.spinnaker.clouddriver.data.task.TaskRepository;
import com.netflix.spinnaker.clouddriver.eureka.api.Eureka;
import com.netflix.spinnaker.clouddriver.eureka.deploy.ops.AbstractEurekaSupport.DiscoveryStatus;
import com.netflix.spinnaker.clouddriver.security.AccountCredentials;
import com.netflix.spinnaker.clouddriver.security.AccountCredentialsProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import retrofit.RetrofitError;
import retrofit.RetrofitError.Kind;

import javax.inject.Provider;
import java.io.IOException;
Expand All @@ -56,9 +56,7 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

public class InstanceTerminationLifecycleWorker implements Runnable {
Expand Down Expand Up @@ -140,36 +138,21 @@ amazonSQS, queueARN, topicARN, getSourceRoleArns(accountCredentials), properties
continue;
}

AtomicInteger messagesProcessed = new AtomicInteger(0);
AtomicInteger messagesSkipped = new AtomicInteger(0);
receiveMessageResult.getMessages().forEach(message -> {
LifecycleMessage lifecycleMessage = unmarshalLifecycleMessage(message.getBody());

if (lifecycleMessage != null) {
if (!SUPPORTED_LIFECYCLE_TRANSITION.equalsIgnoreCase(lifecycleMessage.lifecycleTransition)) {
log.info("Ignoring unsupported lifecycle transition: " + lifecycleMessage.lifecycleTransition);
deleteMessage(amazonSQS, queueId, message);
messagesSkipped.incrementAndGet();
return;
}

Task originalTask = TaskRepository.threadLocalTask.get();
try {
TaskRepository.threadLocalTask.set(
Optional.ofNullable(originalTask).orElse(new DefaultTask(InstanceTerminationLifecycleWorker.class.getSimpleName()))
);
handleMessage(lifecycleMessage, TaskRepository.threadLocalTask.get());
} finally {
TaskRepository.threadLocalTask.set(originalTask);
}
} else {
messagesSkipped.incrementAndGet();
handleMessage(lifecycleMessage);
}

deleteMessage(amazonSQS, queueId, message);
messagesProcessed.incrementAndGet();
registry.counter(getProcessedMetricId(queueARN.region)).increment();
});
log.info("Processed {} messages, {} skipped (queueARN: {})", messagesProcessed.get(), messagesSkipped.get(), queueARN.arn);
}
}

Expand All @@ -196,29 +179,39 @@ private LifecycleMessage unmarshalLifecycleMessage(String messageBody) {
return lifecycleMessage;
}

private void handleMessage(LifecycleMessage message, Task task) {
List<String> instanceIds = Collections.singletonList(message.ec2InstanceId);

private void handleMessage(LifecycleMessage message) {
NetflixAmazonCredentials credentials = getAccountCredentialsById(message.accountId);
if (credentials == null) {
log.error("Unable to find credentials for account id: {}", message.accountId);
return;
}

EnableDisableInstanceDiscoveryDescription description = new EnableDisableInstanceDiscoveryDescription();
description.setCredentials(credentials);
description.setRegion(queueARN.region);
description.setAsgName(message.autoScalingGroupName);
description.setInstanceIds(instanceIds);

discoverySupport.get().updateDiscoveryStatusForInstances(
description, task, "handleLifecycleMessage", DiscoveryStatus.Disable, instanceIds,
properties.getEurekaFindApplicationRetryMax(), properties.getEurekaUpdateStatusRetryMax()
);
Names names = Names.parseName(message.autoScalingGroupName);
Eureka eureka = discoverySupport.get().getEureka(credentials, queueARN.region);

if (!updateInstanceStatus(eureka, names.getApp(), message.ec2InstanceId)) {
registry.counter(getFailedMetricId(queueARN.region)).increment();
}
recordLag(message.time, queueARN.region, message.accountId, message.autoScalingGroupName, message.ec2InstanceId);
}

/**
 * Asks Eureka to mark an instance with {@code DiscoveryStatus.Disable}, retrying only on
 * network errors.
 *
 * <p>At most {@code properties.getEurekaUpdateStatusRetryMax()} attempts are made. A
 * {@link RetrofitError} of any kind other than {@code Kind.NETWORK} stops the attempts
 * immediately.
 *
 * @param eureka     Eureka client used to issue the status update
 * @param app        application name passed to Eureka
 * @param instanceId id of the instance to mark out of service
 * @return {@code true} if an update call completed without a {@link RetrofitError};
 *         {@code false} if a non-network error occurred or all attempts were exhausted
 */
private boolean updateInstanceStatus(Eureka eureka, String app, String instanceId) {
  int retry = 0;
  while (retry < properties.getEurekaUpdateStatusRetryMax()) {
    retry++;
    try {
      eureka.updateInstanceStatus(app, instanceId, DiscoveryStatus.Disable.getValue());
      return true;
    } catch (RetrofitError e) {
      // A RetrofitError may carry no HTTP response (typically the case for network errors,
      // which are the ones retried here). Dereferencing it unguarded would throw a
      // NullPointerException out of this catch block and abort the retry loop.
      String status = e.getResponse() == null ? "n/a" : String.valueOf(e.getResponse().getStatus());
      log.warn(String.format("Failed marking app out of service (status: %s, app: %s, instance: %s, retry: %d)", status, app, instanceId, retry), e);
      if (e.getKind() != Kind.NETWORK) {
        return false;
      }
    }
  }
  return false;
}

private static void deleteMessage(AmazonSQS amazonSQS, String queueUrl, Message message) {
try {
amazonSQS.deleteMessage(queueUrl, message.getReceiptHandle());
Expand Down Expand Up @@ -311,6 +304,14 @@ void recordLag(Date start, String region, String account, String serverGroup, St
}
}

/**
 * Builds the metric id {@code terminationLifecycle.totalProcessed}, tagged with the region.
 *
 * @param region value for the {@code region} tag
 * @return the id created by the registry
 */
Id getProcessedMetricId(String region) {
  final Id processedMetricId = registry.createId("terminationLifecycle.totalProcessed", "region", region);
  return processedMetricId;
}

/**
 * Builds the metric id {@code terminationLifecycle.totalFailed}, tagged with the region.
 *
 * @param region value for the {@code region} tag
 * @return the id created by the registry
 */
Id getFailedMetricId(String region) {
  final Id failedMetricId = registry.createId("terminationLifecycle.totalFailed", "region", region);
  return failedMetricId;
}

private static List<String> getAllAccountIds(Set<? extends AccountCredentials> accountCredentials) {
return accountCredentials
.stream()
Expand Down
Expand Up @@ -88,7 +88,6 @@ public void start() {
properties.getVisibilityTimeout(),
properties.getWaitTimeSeconds(),
properties.getSqsMessageRetentionPeriodSeconds(),
properties.getEurekaFindApplicationRetryMax(),
properties.getEurekaUpdateStatusRetryMax()
),
discoverySupport,
Expand Down
@@ -1,8 +1,6 @@
package com.netflix.spinnaker.clouddriver.aws.lifecycle

import groovy.transform.Canonical
import org.springframework.boot.context.properties.ConfigurationProperties

/*
* Copyright 2017 Netflix, Inc.
*
Expand Down
Expand Up @@ -21,15 +21,18 @@ import com.amazonaws.services.sns.model.SetTopicAttributesRequest
import com.amazonaws.services.sqs.AmazonSQS
import com.amazonaws.services.sqs.model.CreateQueueResult
import com.fasterxml.jackson.databind.ObjectMapper
import com.netflix.spectator.api.Counter
import com.netflix.spectator.api.Registry
import com.netflix.spinnaker.clouddriver.aws.deploy.description.EnableDisableInstanceDiscoveryDescription
import com.netflix.spinnaker.clouddriver.aws.deploy.ops.discovery.AwsEurekaSupport
import com.netflix.spinnaker.clouddriver.aws.security.AmazonClientProvider
import com.netflix.spinnaker.clouddriver.aws.security.NetflixAmazonCredentials
import com.netflix.spinnaker.clouddriver.data.task.DefaultTask
import com.netflix.spinnaker.clouddriver.data.task.Task
import com.netflix.spinnaker.clouddriver.eureka.api.Eureka
import com.netflix.spinnaker.clouddriver.eureka.deploy.ops.AbstractEurekaSupport.DiscoveryStatus
import com.netflix.spinnaker.clouddriver.security.AccountCredentialsProvider
import retrofit.RetrofitError
import retrofit.RetrofitError.Kind
import retrofit.client.Response
import retrofit.converter.Converter
import spock.lang.Shared
import spock.lang.Specification
import spock.lang.Subject
Expand All @@ -54,6 +57,7 @@ class InstanceTerminationLifecycleWorkerSpec extends Specification {
}
Provider<AwsEurekaSupport> awsEurekaSupportProvider = Mock()
AwsEurekaSupport awsEurekaSupport = Mock()
Eureka eureka = Mock()
Registry registry = Mock()

def queueARN = new ARN([mgmtCredentials, testCredentials], "arn:aws:sqs:us-west-2:100:queueName")
Expand All @@ -74,8 +78,7 @@ class InstanceTerminationLifecycleWorkerSpec extends Specification {
-1,
-1,
-1,
-1,
-1
3
),
awsEurekaSupportProvider,
registry
Expand Down Expand Up @@ -127,25 +130,13 @@ class InstanceTerminationLifecycleWorkerSpec extends Specification {
)

when:
subject.handleMessage(message, Mock(DefaultTask))
subject.handleMessage(message)

then:
1 * accountCredentialsProvider.getAll() >> [mgmtCredentials, testCredentials]
1 * awsEurekaSupportProvider.get() >> awsEurekaSupport
1 * awsEurekaSupport.updateDiscoveryStatusForInstances(
{ EnableDisableInstanceDiscoveryDescription arg ->
arg.credentials == testCredentials
arg.region == 'us-west-2'
arg.asgName == 'clouddriver-main-v000'
arg.instanceIds == ['i-1234']
},
_ as Task,
'handleLifecycleMessage',
DiscoveryStatus.Disable,
['i-1234'],
-1,
-1
)
1 * awsEurekaSupport.getEureka(_, 'us-west-2') >> eureka
1 * eureka.updateInstanceStatus('clouddriver', 'i-1234', DiscoveryStatus.Disable.value)
}

def 'should process both sns and sqs messages'() {
Expand Down Expand Up @@ -211,4 +202,20 @@ class InstanceTerminationLifecycleWorkerSpec extends Specification {
it.resources*.id == ['arn:aws:sqs:us-west-2:100:queueName']
}
}

// Verifies that updateInstanceStatus calls Eureka again after a NETWORK-kind RetrofitError
// and makes no further calls once a call completes without throwing.
def 'should retry on network errors'() {
given:
subject.queueARN >> Mock(ARN)
subject.registry.counter(_) >> Mock(Counter)

when:
subject.updateInstanceStatus(eureka, 'foo', 'i-1234')

then:
// One call throws a RetrofitError of Kind.NETWORK. The error is built with a non-null
// Response, so the status lookup in the worker's log statement has something to read.
1 * eureka.updateInstanceStatus(_, _, _) >> {
throw new RetrofitError("cannot connect", "http://discovery", new Response("http://discovery", 400, "reason", [], null), Mock(Converter), String, Kind.NETWORK, Mock(Throwable))
}
// One further call is expected with no stubbed response, so it does not throw.
1 * eureka.updateInstanceStatus(_, _, _)
// No calls beyond those two are allowed.
0 * eureka.updateInstanceStatus(_, _, _)
}
}

0 comments on commit cce19b8

Please sign in to comment.