diff --git a/ardere/aws.py b/ardere/aws.py index 3817221..8915ff2 100644 --- a/ardere/aws.py +++ b/ardere/aws.py @@ -8,7 +8,7 @@ import boto3 import botocore from concurrent.futures import ThreadPoolExecutor -from typing import Any, Dict, List # noqa +from typing import Any, Dict, List, Optional # noqa logger = logging.getLogger() logger.setLevel(logging.INFO) @@ -181,21 +181,20 @@ def family_name(self, step): def metrics_family_name(self): return "{}-metrics".format(self._ecs_name) - def query_active_instances(self): - # type: () -> Dict[str, int] + def query_active_instances(self, additional_tags=None): + # type: (Optional[Dict[str, str]]) -> Dict[str, int] """Query EC2 for all the instances owned by ardere for this cluster.""" instance_dict = defaultdict(int) paginator = self._ec2_client.get_paginator('describe_instances') + filters = {"Owner": "ardere", "ECSCluster": self._ecs_name} + if additional_tags: + filters.update(additional_tags) response_iterator = paginator.paginate( Filters=[ { - "Name": "tag:Owner", - "Values": ["ardere"] - }, - { - "Name": "tag:ECSCluster", - "Values": [self._ecs_name] - } + "Name": "tag:{}".format(tag_name), + "Values": [tag_value] + } for tag_name, tag_value in filters.items() ] ) for page in response_iterator: @@ -218,10 +217,23 @@ def calculate_missing_instances(self, desired, current): needed[instance_type] = instance_count - cur return needed - def request_instances(self, instances): - # type: (Dict[str, int]) -> None + def has_metrics_node(self, instance_type): + # type: (str) -> bool + """Return whether a metrics node with this instance type exists""" + instances = self.query_active_instances( + additional_tags=dict(Role="metrics") + ) + return instance_type in instances + + def request_instances(self, instances, security_group_ids, + additional_tags=None): + # type: (Dict[str, int], List[str], Optional[Dict[str, str]]) -> None """Create requested types/quantities of instances for this cluster""" ami_id = self.ecs_ami_ids["us-east-1"] + tags = dict(Name=self._ecs_name, Owner="ardere", + ECSCluster=self._ecs_name) + if additional_tags: + tags.update(additional_tags) for instance_type, instance_count in instances.items(): self._ec2_client.run_instances( ImageId=ami_id, @@ -230,13 +242,13 @@ def request_instances(self, instances): InstanceType=instance_type, UserData=EC2_USER_DATA.format(ecs_name=self._ecs_name), IamInstanceProfile={"Arn": self.ecs_profile}, + SecurityGroupIds=security_group_ids, TagSpecifications=[ { "ResourceType": "instance", "Tags": [ - dict(Key="Name", Value=self._ecs_name), - dict(Key="Owner", Value="ardere"), - dict(Key="ECSCluster", Value=self._ecs_name), + dict(Key=tag_name, Value=tag_value) + for tag_name, tag_value in tags.items() ] } ] @@ -282,9 +294,9 @@ def create_metrics_service(self, options): cmd = """\ export GF_DEFAULT_INSTANCE_NAME=`wget -qO- http://169.254.169.254/latest/meta-data/instance-id` && \ - export GF_SECURITY_ADMIN_USER="%s" && \ - export GF_SECURITY_ADMIN_PASSWORD="%s" && \ - export GF_USERS_ALLOW_SIGN_UP="false" && \ + export GF_SECURITY_ADMIN_USER=%s && \ + export GF_SECURITY_ADMIN_PASSWORD=%s && \ + export GF_USERS_ALLOW_SIGN_UP=false && \ mkdir "${GF_DASHBOARDS_JSON_PATH}" && \ ./run.sh """ % (self.grafana_admin_user, self.grafana_admin_password) # noqa diff --git a/ardere/step_functions.py b/ardere/step_functions.py index 24fd2d3..f2c3d59 100644 --- a/ardere/step_functions.py +++ b/ardere/step_functions.py @@ -212,7 +212,22 @@ def populate_missing_instances(self): # Ensure we have the metrics instance if self.event["metrics_options"]["enabled"]: - needed[self.event["metrics_options"]["instance_type"]] += 1 + # Query to see if we need to add a metrics node + metric_inst_type = self.event["metrics_options"]["instance_type"] + + # We add the instance type to needed to ensure we don't leave out + # more nodes since this will turn up in the query_active results + needed[metric_inst_type] += 1 + + # We create it here up-front if needed since we have different + # tags + if not self.ecs.has_metrics_node(metric_inst_type): + self.ecs.request_instances( + instances={metric_inst_type: 1}, + security_group_ids=[os.environ["metric_sg"], + os.environ["ec2_sg"]], + additional_tags={"Role": "metrics"} + ) logger.info("Plan instances needed: {}".format(needed)) current_instances = self.ecs.query_active_instances() @@ -221,7 +236,10 @@ def populate_missing_instances(self): ) if missing_instances: logger.info("Requesting instances: {}".format(missing_instances)) - self.ecs.request_instances(missing_instances) + self.ecs.request_instances( + instances=missing_instances, + security_group_ids=[os.environ["ec2_sg"]] + ) return self.event def ensure_metrics_available(self): diff --git a/serverless.yml b/serverless.yml index 772528f..397eae3 100644 --- a/serverless.yml +++ b/serverless.yml @@ -25,6 +25,14 @@ provider: Ref: "S3ReadyBucket" metrics_bucket: Ref: "MetricsBucket" + ec2_sg: + Fn::GetAtt: + - EC2SecurityGroup + - GroupId + metric_sg: + Fn::GetAtt: + - MetricSecurityGroup + - GroupId container_log_group: Ref: "ContainerLogs" @@ -85,9 +93,10 @@ provider: functions: populate_missing_instances: handler: handler.populate_missing_instances + timeout: 300 ensure_metrics_available: handler: handler.ensure_metrics_available - timeout: 120 + timeout: 300 create_ecs_services: handler: handler.create_ecs_services wait_for_cluster_ready: @@ -199,6 +208,33 @@ resources: Type: "AWS::S3::Bucket" Properties: AccessControl: "AuthenticatedRead" + MetricSecurityGroup: + Type: "AWS::EC2::SecurityGroup" + Properties: + GroupDescription: "ardere metrics" + SecurityGroupIngress: + - + IpProtocol: tcp + FromPort: 3000 + ToPort: 3000 + CidrIp: 0.0.0.0/0 + - + IpProtocol: tcp + FromPort: 8086 + ToPort: 8086 + CidrIp: 0.0.0.0/0 + - + IpProtocol: udp + FromPort: 8125 + ToPort: 8125 + SourceSecurityGroupId: + Fn::GetAtt: + - EC2SecurityGroup + - GroupId + EC2SecurityGroup: + Type: "AWS::EC2::SecurityGroup" + Properties: + GroupDescription: "ardere load-testers" EC2ContainerRole: Type: "AWS::IAM::Role" Properties: diff --git a/tests/test_aws.py b/tests/test_aws.py index 7fcf32d..57d0133 100644 --- a/tests/test_aws.py +++ b/tests/test_aws.py @@ -68,6 +68,28 @@ def test_calculate_missing_instances(self): ) eq_(result, {"t2.medium": 1}) + def test_has_metrics_node(self): + mock_paginator = mock.Mock() + mock_paginator.paginate.return_value = [ + {"Reservations": [ + { + "Instances": [ + { + "State": { + "Code": 16 + }, + "InstanceType": "t2.medium" + } + ] + } + ]} + ] + + ecs = self._make_FUT() + ecs._ec2_client.get_paginator.return_value = mock_paginator + resp = ecs.has_metrics_node("t2.medium") + eq_(resp, True) + def test_request_instances(self): instances = { "t2.medium": 10 @@ -76,7 +98,7 @@ def test_request_instances(self): ecs._ec2_client.run_instances.return_value = { "Instances": [{"InstanceId": 12345}] } - ecs.request_instances(instances) + ecs.request_instances(instances, ["i-382842"], {"Role": "metrics"}) ecs._ec2_client.run_instances.assert_called() def test_locate_metrics_container_ip(self): diff --git a/tests/test_step_functions.py b/tests/test_step_functions.py index 3abf180..f59fc2b 100644 --- a/tests/test_step_functions.py +++ b/tests/test_step_functions.py @@ -50,6 +50,9 @@ def test_load_toml(self): eq_(self.runner.event["ecs_name"], "ardere-test") def test_populate_missing_instances(self): + os.environ["ec2_sg"] = "i-23232" + os.environ["metric_sg"] = "i-84828" + self.mock_ecs.has_metrics_node.return_value = False self.runner.populate_missing_instances() self.mock_ecs.query_active_instances.assert_called() self.mock_ecs.request_instances.assert_called()