Skip to content

Commit

Permalink
fix(aws): Tightening direct SQS IAM perms in termination events (#1470)
Browse files Browse the repository at this point in the history
Restricts direct SQS lifecycle event messages so that they are only accepted from the
configured lifecycle hook source ARNs, which is more correct
behavior than accepting anything from autoscaling.
  • Loading branch information
robzienert committed Mar 3, 2017
1 parent 6f6e9c7 commit ccc5c02
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 22 deletions.
Expand Up @@ -52,9 +52,10 @@
import javax.inject.Provider;
import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
Expand Down Expand Up @@ -124,13 +125,10 @@ public void run() {
AmazonSQS amazonSQS = amazonClientProvider.getAmazonSQS(queueARN.account, queueARN.region);
AmazonSNS amazonSNS = amazonClientProvider.getAmazonSNS(topicARN.account, topicARN.region);

List<String> allAccountIds = accountCredentialsProvider.getAll()
.stream()
.map(AccountCredentials::getAccountId)
.filter(a -> a != null)
.collect(Collectors.toList());
Set<? extends AccountCredentials> accountCredentials = accountCredentialsProvider.getAll();
List<String> allAccountIds = getAllAccountIds(accountCredentials);

this.queueId = ensureQueueExists(amazonSQS, queueARN, topicARN, allAccountIds);
this.queueId = ensureQueueExists(amazonSQS, queueARN, topicARN, allAccountIds, getSourceRoleArns(accountCredentials));
ensureTopicExists(amazonSNS, topicARN, allAccountIds, queueARN);

AtomicInteger messagesProcessed = new AtomicInteger(0);
Expand Down Expand Up @@ -222,7 +220,7 @@ private void handleMessage(LifecycleMessage message, Task task) {
description, task, "handleLifecycleMessage", DiscoveryStatus.Disable, instanceIds
);

recordLag(message.time, queueARN.region, message.ec2InstanceId);
recordLag(message.time, queueARN.region, message.accountId, message.autoScalingGroupName, message.ec2InstanceId);
}

private static void deleteMessage(AmazonSQS amazonSQS, String queueUrl, Message message) {
Expand Down Expand Up @@ -268,10 +266,10 @@ private static Policy buildSNSPolicy(ARN topicARN, List<String> allAccountIds) {
return new Policy("allow-remote-account-send", Collections.singletonList(statement));
}

private static String ensureQueueExists(AmazonSQS amazonSQS, ARN queueARN, ARN topicARN, List<String> allAccounts) {
private static String ensureQueueExists(AmazonSQS amazonSQS, ARN queueARN, ARN topicARN, List<String> allAccounts, Set<String> terminatingRoleArns) {
String queueUrl = amazonSQS.createQueue(queueARN.name).getQueueUrl();
amazonSQS.setQueueAttributes(
queueUrl, Collections.singletonMap("Policy", buildSQSPolicy(queueARN, topicARN, allAccounts).toJson())
queueUrl, Collections.singletonMap("Policy", buildSQSPolicy(queueARN, topicARN, allAccounts, terminatingRoleArns).toJson())
);

return queueUrl;
Expand All @@ -281,34 +279,69 @@ private static String ensureQueueExists(AmazonSQS amazonSQS, ARN queueARN, ARN t
* This policy allows operators to choose whether lifecycle hook notifications are sent via SNS for fanout or
* sent directly to an SQS queue from the autoscaling group.
*/
// NOTE(review): this listing shows two signatures, a standalone sqsStatement section and two return
// statements back to back; it looks like the removed and added lines of a diff rendered together without
// +/- markers. Confirm which lines are current before relying on it.
private static Policy buildSQSPolicy(ARN queue, ARN topic, List<String> allAccounts) {
private static Policy buildSQSPolicy(ARN queue, ARN topic, List<String> allAccounts, Set<String> terminatingRoleArns) {
// SNS path: any principal may SendMessage to the queue, but only when aws:SourceArn equals the topic ARN.
Statement snsStatement = new Statement(Effect.Allow).withActions(SQSActions.SendMessage);
snsStatement.setPrincipals(Principal.All);
snsStatement.setResources(Collections.singletonList(new Resource(queue.arn)));
snsStatement.setConditions(Collections.singletonList(
new Condition().withType("ArnEquals").withConditionKey("aws:SourceArn").withValues(topic.arn)
));

// Direct path with a fixed matcher: the listed accounts may SendMessage/GetQueueUrl on the queue when
// aws:SourceArn is like the autoscaling group ARN pattern below.
Statement sqsStatement = new Statement(Effect.Allow).withActions(SQSActions.SendMessage, SQSActions.GetQueueUrl);
sqsStatement.setPrincipals(allAccounts.stream().map(Principal::new).collect(Collectors.toSet()));
sqsStatement.setResources(Collections.singletonList(new Resource(queue.arn)));
sqsStatement.setConditions(Collections.singletonList(
new Condition().withType("ArnLike").withConditionKey("aws:SourceArn").withValues("arn:aws:autoscaling:*:*:autoscalingGroup:*:*")
));
// One principal per account id, shared by every lifecycle statement built in the loop below.
Set<Principal> allAccountPrincipals = allAccounts.stream().map(Principal::new).collect(Collectors.toSet());

// Start from the SNS statement and append one direct-send statement per entry of terminatingRoleArns.
List<Statement> statements = new ArrayList<>(Collections.singletonList(snsStatement));
for (String arnMatcher : terminatingRoleArns) {
// Same actions, principals and resource for every matcher; only the ArnLike value differs.
Statement lifecycleStatement = new Statement(Effect.Allow).withActions(SQSActions.SendMessage, SQSActions.GetQueueUrl);
lifecycleStatement.setPrincipals(allAccountPrincipals);
lifecycleStatement.setResources(Collections.singletonList(new Resource(queue.arn)));
lifecycleStatement.setConditions(Collections.singletonList(
new Condition().withType("ArnLike").withConditionKey("aws:SourceArn").withValues(arnMatcher)
));
statements.add(lifecycleStatement);
}

return new Policy("allow-sns-or-sqs-send", Arrays.asList(snsStatement, sqsStatement));
return new Policy("allow-sns-or-sqs-send", statements);
}

/**
 * Creates the id of the termination lifecycle lag metric, tagged with the given region.
 *
 * @param region value stored under the "region" tag
 * @return the id produced by the registry
 */
Id getLagMetricId(String region) {
  final String metricName = "terminationLifecycle.lag";
  final String regionTag = "region";
  return registry.createId(metricName, regionTag, region);
}

// Records how long a lifecycle message took to be processed: logs the lag and publishes it as a gauge
// under the id returned by getLagMetricId(region). Does nothing when start is null.
// NOTE(review): two signatures and two log statements appear back to back below; this looks like the
// before/after lines of a diff rendered without +/- markers. Confirm which pair is current.
void recordLag(Date start, String region, String instanceId) {
void recordLag(Date start, String region, String account, String serverGroup, String instanceId) {
if (start != null) {
// Difference between the registry clock's wall time and the message time; treated as milliseconds
// by the Duration.ofMillis calls below.
Long lag = registry.clock().wallTime() - start.getTime();
log.info("Lifecycle message processed (instance: {}, lagSeconds: {})", instanceId, Duration.ofMillis(lag).getSeconds());
log.info("Lifecycle message processed (account: {}, serverGroup: {}, instance: {}, lagSeconds: {})", account, serverGroup, instanceId, Duration.ofMillis(lag).getSeconds());
// The gauge receives the raw lag value, not the seconds figure that is logged.
registry.gauge(getLagMetricId(region), lag);
}
}

/**
 * Collects the account id of every supplied credential, leaving out credentials whose id is null.
 *
 * @param accountCredentials credentials to read account ids from
 * @return the non-null account ids, in the iteration order of the input set
 */
private static List<String> getAllAccountIds(Set<? extends AccountCredentials> accountCredentials) {
  List<String> accountIds = new ArrayList<>();
  for (AccountCredentials credentials : accountCredentials) {
    String accountId = credentials.getAccountId();
    if (accountId != null) {
      accountIds.add(accountId);
    }
  }
  return accountIds;
}

/**
 * Gathers the IAM condition matchers derived from the role ARNs of instance-terminating lifecycle hooks.
 *
 * <p>Only credentials that are {@code NetflixAmazonCredentials} and expose a non-null lifecycle hook
 * collection contribute. Each hook whose transition is {@code autoscaling:EC2_INSTANCE_TERMINATING}
 * has its role ARN passed through {@code convertRoleArnToIamConditionalMatcher}.
 *
 * @param allCredentials credentials to inspect
 * @return the distinct matchers; empty when no credential has a terminating hook
 */
private static <T extends AccountCredentials> Set<String> getSourceRoleArns(Set<T> allCredentials) {
  final String terminatingTransition = "autoscaling:EC2_INSTANCE_TERMINATING";
  Set<String> matchers = new HashSet<>();
  for (T candidate : allCredentials) {
    if (!(candidate instanceof NetflixAmazonCredentials)) {
      continue;
    }
    NetflixAmazonCredentials amazonCredentials = (NetflixAmazonCredentials) candidate;
    if (amazonCredentials.getLifecycleHooks() == null) {
      continue;
    }
    matchers.addAll(
        amazonCredentials.getLifecycleHooks()
            .stream()
            .filter(hook -> terminatingTransition.equals(hook.getLifecycleTransition()))
            .map(hook -> convertRoleArnToIamConditionalMatcher(hook.getRoleARN()))
            .collect(Collectors.toSet()));
  }
  return matchers;
}

/**
 * Turns a role ARN into an account-agnostic matcher by keeping only its last colon-separated
 * segment and prefixing it with {@code arn:aws:iam::*:}.
 *
 * @param roleArn colon-separated role ARN; must not be null
 * @return the matcher string, e.g. {@code arn:aws:iam::*:role/name}
 */
private static String convertRoleArnToIamConditionalMatcher(String roleArn) {
  final String[] segments = roleArn.split(":");
  final String resource = segments[segments.length - 1];
  return "arn:aws:iam::*:" + resource;
}
}
Expand Up @@ -101,15 +101,15 @@ class InstanceTerminationLifecycleAgentSpec extends Specification {

// Checks that ensureQueueExists creates the queue by name, applies the generated policy to it and
// returns the queue URL reported by SQS.
// NOTE(review): the two `def queueId` lines and the two "Policy" entries below look like the before/after
// lines of a diff rendered without +/- markers. Confirm which pair is current.
def 'should create queue if it does not exist'() {
when:
def queueId = InstanceTerminationLifecycleAgent.ensureQueueExists(amazonSQS, queueARN, topicARN, ['1234'])
def queueId = InstanceTerminationLifecycleAgent.ensureQueueExists(amazonSQS, queueARN, topicARN, ['1234'], [] as Set<String>)

then:
queueId == "my-queue-url"

// The stubbed createQueue result supplies the URL that the method is expected to return.
1 * amazonSQS.createQueue(queueARN.name) >> { new CreateQueueResult().withQueueUrl("my-queue-url") }

// The policy attribute must equal the JSON that buildSQSPolicy produces for the same arguments.
1 * amazonSQS.setQueueAttributes("my-queue-url", [
"Policy": InstanceTerminationLifecycleAgent.buildSQSPolicy(queueARN, topicARN, ['1234']).toJson()
"Policy": InstanceTerminationLifecycleAgent.buildSQSPolicy(queueARN, topicARN, ['1234'], [] as Set<String>).toJson()
])
// No other interaction with any mock is allowed.
0 * _
}
Expand Down Expand Up @@ -176,4 +176,35 @@ class InstanceTerminationLifecycleAgentSpec extends Specification {
result.lifecycleTransition == lifecycleMessage.lifecycleTransition
}

// Verifies that buildSQSPolicy emits the SNS fanout statement first, followed by one direct-send
// statement per terminating role matcher.
def 'should build sqs policy supporting both sns and direct notifications'() {
  given:
  def allAccountIds = ['100', '200']
  Set<String> terminatingRoleArns = ['arn:aws:iam::*:role/terminatingRole']

  when:
  def result = subject.buildSQSPolicy(queueARN, topicARN, allAccountIds, terminatingRoleArns)

  then:
  result.statements.size() == 2

  // sns fanout
  // Spock's with(target) { } turns every line into a condition; Groovy's target.with { } would only
  // hand the closure's last expression back to Spock, leaving the other comparisons unchecked.
  with(result.statements[0]) {
    it.principals*.id == ['*']
    it.actions*.actionName == ['sqs:SendMessage']
    it.resources*.id == ['arn:aws:sqs:us-west-2:100:queueName']
    it.conditions*.type == ['ArnEquals']
    it.conditions*.conditionKey == ['aws:SourceArn']
    it.conditions*.values == [['arn:aws:sns:us-west-2:100:topicName']]
  }

  // direct sqs
  with(result.statements[1]) {
    // Principals are collected into a Set by buildSQSPolicy, so compare without relying on order.
    (it.principals*.id as Set) == (['100', '200'] as Set)
    // Two separate actions, reported with the service prefix.
    it.actions*.actionName == ['sqs:SendMessage', 'sqs:GetQueueUrl']
    it.resources*.id == ['arn:aws:sqs:us-west-2:100:queueName']
    it.conditions*.type == ['ArnLike']
    it.conditions*.conditionKey == ['aws:SourceArn']
    it.conditions*.values == [['arn:aws:iam::*:role/terminatingRole']]
  }
}
}

0 comments on commit ccc5c02

Please sign in to comment.