diff --git a/ardere/aws.py b/ardere/aws.py index 7123a78..25bd494 100644 --- a/ardere/aws.py +++ b/ardere/aws.py @@ -38,22 +38,31 @@ class ECSManager(object): } def __init__(self, plan): - # type: (str) -> None + # type: (Dict[str, Any]) -> None """Create and return a ECSManager for a cluster of the given name.""" self._ecs_client = self.boto.client('ecs') self._ec2_client = self.boto.client('ec2') self._ecs_name = plan["ecs_name"] self._plan = plan + # Pull out the env vars + self.s3_ready_bucket = os.environ["s3_ready_bucket"] + self.container_log_group = os.environ["container_log_group"] + self.ecs_profile = os.environ["ecs_profile"] + if "plan_run_uuid" not in plan: plan["plan_run_uuid"] = str(uuid.uuid4()) self._plan_uuid = plan["plan_run_uuid"] + @property + def plan_uuid(self): + return self._plan_uuid + @property def s3_ready_file(self): return "https://s3.amazonaws.com/{bucket}/{key}".format( - bucket=os.environ["s3_ready_bucket"], + bucket=self.s3_ready_bucket, key="{}.ready".format(self._plan_uuid) ) @@ -112,7 +121,7 @@ def request_instances(self, instances): InstanceType=instance_type, UserData="#!/bin/bash \necho ECS_CLUSTER='" + self._ecs_name + "' >> /etc/ecs/ecs.config", - IamInstanceProfile={"Arn": os.environ["ecs_profile"]} + IamInstanceProfile={"Arn": self.ecs_profile} ) # Track returned instances for tagging step @@ -133,17 +142,17 @@ def create_service(self, step): logger.info("CreateService called with: {}".format(step)) # Prep the shell command - shell_command = [ - 'sh', '-c', '"$WAITFORCLUSTER"', - 'waitforcluster.sh', self.s3_ready_file, - str(step.get("run_delay", 0)) - ] - shell_command2 = ' '.join(shell_command) + ' && ' + step[ - "additional_command_args"] - shell_command3 = ['sh', '-c', '{}'.format(shell_command2)] + wfc_var = '__ARDERE_WAITFORCLUSTER_SH__' + wfc_cmd = 'sh -c "${}" waitforcluster.sh {} {}'.format( + wfc_var, + self.s3_ready_file, + step.get("run_delay", 0) + ) + service_cmd = step["additional_command_args"] + cmd = ['sh', '-c', '{} && {}'.format(wfc_cmd, service_cmd)] # Prep the env vars - env_vars = [{"name": "WAITFORCLUSTER", "value": shell_script}] + env_vars = [{"name": wfc_var, "value": shell_script}] for env_var in step.get("environment_data", []): name, value = env_var.split("=", 1) env_vars.append({"name": name, "value": value}) @@ -160,7 +169,17 @@ def create_service(self, step): # using only memoryReservation sets no hard limit "memoryReservation": 256, "environment": env_vars, - "entryPoint": shell_command3 + "entryPoint": cmd, + "logConfiguration": { + "logDriver": "awslogs", + "options": { + "awslogs-group": self.container_log_group, + "awslogs-region": "us-east-1", + "awslogs-stream-prefix": "ardere-{}".format( + self.plan_uuid + ) + } + } } ], placementConstraints=[ diff --git a/ardere/handler.py b/ardere/handler.py deleted file mode 100644 index 60911dd..0000000 --- a/ardere/handler.py +++ /dev/null @@ -1,129 +0,0 @@ -import logging -import os -import time -from collections import defaultdict - -import boto3 -import botocore -from typing import Any, Dict, List # noqa - -from aws import ECSManager -from exceptions import ( - ServicesStartingException, - ShutdownPlanException -) - -logger = logging.getLogger() -logger.setLevel(logging.INFO) - - -def _build_instance_map(test_plan): - """Given a JSON test-plan, build and return a dict of instance types - and how many should exist for each type.""" - instances = defaultdict(int) - for step in test_plan["steps"]: - instances[step["instance_type"]] += step["instance_count"] - return instances - - -def _find_test_plan_duration(plan): - # type: (Dict[str, Any]) -> int - """Locates and calculates the longest test plan duration from its - delay through its duration of the plan.""" - return max( - [x.get("run_delay", 0) + x["run_max_time"] for x in plan["steps"]] - ) - - -def populate_missing_instances(event, context): - logger.info("Called with {}".format(event)) - logger.info("Environ: {}".format(os.environ)) - ecs_manager = ECSManager(plan=event) - needed = _build_instance_map(event) - logger.info("Plan instances needed: {}".format(needed)) - current_instances = ecs_manager.query_active_instances() - missing_instances = ecs_manager.calculate_missing_instances( - desired=needed, current=current_instances - ) - if missing_instances: - logger.info("Requesting instances: {}".format(missing_instances)) - ecs_manager.request_instances(missing_instances) - return event - - -def create_ecs_services(event, context): - logger.info("Called with {}".format(event)) - ecs_manager = ECSManager(plan=event) - ecs_manager.create_services(event["steps"]) - return event - - -def wait_for_cluster_ready(event, context): - logger.info("Called with {}".format(event)) - ecs_manager = ECSManager(plan=event) - if not ecs_manager.all_services_ready(event["steps"]): - raise ServicesStartingException() - return event - - -def signal_cluster_start(event, context): - logger.info("Called with {}".format(event)) - logger.info("Bucket: {}".format(os.environ["s3_ready_bucket"])) - logger.info("Key: {}.ready".format(event["plan_run_uuid"])) - s3_client = boto3.client('s3') - s3_client.put_object( - ACL="public-read", - Body=b'{}'.format(int(time.time())), - Bucket=os.environ["s3_ready_bucket"], - Key="{}.ready".format(event["plan_run_uuid"]), - Metadata={ - "ECSCluster": event["ecs_name"] - } - ) - return event - - -def check_for_cluster_done(event, context): - logger.info("Called with {}".format(event)) - ecs_manager = ECSManager(plan=event) - - # Check to see if the S3 file is still around - s3 = boto3.resource('s3') - try: - ready_file = s3.Object( - os.environ["s3_ready_bucket"], - "{}.ready".format(event["plan_run_uuid"]) - ) - except botocore.exceptions.ClientError: - # Error getting to the bucket/key, abort test run - raise ShutdownPlanException("Error accessing ready file") - - file_contents = ready_file.get()['Body'].read().decode('utf-8') - start_time = int(file_contents) - - # Update to running count 0 any services that should halt by now - ecs_manager.stop_finished_services(start_time, event["steps"]) - - # If we're totally done, exit. - now = time.time() - plan_duration = _find_test_plan_duration(event) - if now > (start_time + plan_duration): - raise ShutdownPlanException("Test Plan has completed") - return event - -def cleanup_cluster(event, context): - logger.info("Called with {}".format(event)) - ecs_manager = ECSManager(plan=event) - ecs_manager.shutdown_plan(event["steps"]) - - # Attempt to remove the S3 object - s3 = boto3.resource('s3') - try: - ready_file = s3.Object( - os.environ["s3_ready_bucket"], - "{}.ready".format(event["plan_run_uuid"]) - ) - ready_file.delete() - except botocore.exceptions.ClientError: - pass - return event diff --git a/ardere/step_functions.py b/ardere/step_functions.py new file mode 100644 index 0000000..809917c --- /dev/null +++ b/ardere/step_functions.py @@ -0,0 +1,159 @@ +import os +import logging +import time +from collections import defaultdict + +import boto3 +import botocore +from typing import Any, Dict, List # noqa + +from ardere.aws import ECSManager +from ardere.exceptions import ( + ServicesStartingException, + ShutdownPlanException +) + +logger = logging.getLogger() +logger.setLevel(logging.INFO) + + +class AsynchronousPlanRunner(object): + """Asynchronous Test Plan Runner + + This step function based runner handles running a test plan in an + asynchronous manner, where each step will wait for its run_delay if + present before running. + + """ + # For testing purposes + boto = boto3 + + def __init__(self, event, context): + logger.info("Called with {}".format(event)) + logger.info("Environ: {}".format(os.environ)) + self.ecs = ECSManager(plan=event) + self.event = event + self.context = context + + def _build_instance_map(self): + """Given a JSON test-plan, build and return a dict of instance types + and how many should exist for each type.""" + instances = defaultdict(int) + for step in self.event["steps"]: + instances[step["instance_type"]] += step["instance_count"] + return instances + + def _find_test_plan_duration(self): + # type: (Dict[str, Any]) -> int + """Locates and calculates the longest test plan duration from its + delay through its duration of the plan.""" + return max( + [x.get("run_delay", 0) + x["run_max_time"] for x in + self.event["steps"]] + ) + + def populate_missing_instances(self): + """Populate any missing EC2 instances needed for the test plan in the + cluster + + Step 1 + + """ + needed = self._build_instance_map() + logger.info("Plan instances needed: {}".format(needed)) + current_instances = self.ecs.query_active_instances() + missing_instances = self.ecs.calculate_missing_instances( + desired=needed, current=current_instances + ) + if missing_instances: + logger.info("Requesting instances: {}".format(missing_instances)) + self.ecs.request_instances(missing_instances) + return self.event + + def create_ecs_services(self): + """Create all the ECS services needed + + Step 2 + + """ + self.ecs.create_services(self.event["steps"]) + return self.event + + def wait_for_cluster_ready(self): + """Check all the ECS services to see if they're ready + + Step 3 + + """ + if not self.ecs.all_services_ready(self.event["steps"]): + raise ServicesStartingException() + return self.event + + def signal_cluster_start(self): + """Drop a ready file in S3 to trigger the test plan to being + + Step 4 + + """ + s3_client = self.boto.client('s3') + s3_client.put_object( + ACL="public-read", + Body=b'{}'.format(int(time.time())), + Bucket=os.environ["s3_ready_bucket"], + Key="{}.ready".format(self.ecs.plan_uuid), + Metadata={ + "ECSCluster": self.event["ecs_name"] + } + ) + return self.event + + def check_for_cluster_done(self): + """Check all the ECS services to see if they've run for their + specified duration + + Step 5 + + """ + # Check to see if the S3 file is still around + s3 = self.boto.resource('s3') + try: + ready_file = s3.Object( + os.environ["s3_ready_bucket"], + "{}.ready".format(self.ecs.plan_uuid) + ) + except botocore.exceptions.ClientError: + # Error getting to the bucket/key, abort test run + raise ShutdownPlanException("Error accessing ready file") + + file_contents = ready_file.get()['Body'].read().decode('utf-8') + start_time = int(file_contents) + + # Update to running count 0 any services that should halt by now + self.ecs.stop_finished_services(start_time, self.event["steps"]) + + # If we're totally done, exit. + now = time.time() + plan_duration = self._find_test_plan_duration() + if now > (start_time + plan_duration): + raise ShutdownPlanException("Test Plan has completed") + return self.event + + def cleanup_cluster(self): + """Shutdown all ECS services and deregister all task definitions + + Step 6 + + """ + self.ecs.shutdown_plan(self.event["steps"]) + + # Attempt to remove the S3 object + s3 = self.boto.resource('s3') + try: + ready_file = s3.Object( + os.environ["s3_ready_bucket"], + "{}.ready".format(self.ecs.plan_uuid) + ) + ready_file.delete() + except botocore.exceptions.ClientError: + pass + return self.event diff --git a/handler.py b/handler.py new file mode 100644 index 0000000..9bfb267 --- /dev/null +++ b/handler.py @@ -0,0 +1,38 @@ +# First some funky path manipulation so that we can work properly in +# the AWS environment +import sys +import os +dir_path = os.path.dirname(os.path.realpath(__file__)) +sys.path.append(dir_path) + +from ardere.step_functions import AsynchronousPlanRunner + + +def populate_missing_instances(event, context): + runner = AsynchronousPlanRunner(event, context) + return runner.populate_missing_instances() + + +def create_ecs_services(event, context): + runner = AsynchronousPlanRunner(event, context) + return runner.create_ecs_services() + + +def wait_for_cluster_ready(event, context): + runner = AsynchronousPlanRunner(event, context) + return runner.wait_for_cluster_ready() + + +def signal_cluster_start(event, context): + runner = AsynchronousPlanRunner(event, context) + return runner.signal_cluster_start() + + +def check_for_cluster_done(event, context): + runner = AsynchronousPlanRunner(event, context) + return runner.check_for_cluster_done() + + +def cleanup_cluster(event, context): + runner = AsynchronousPlanRunner(event, context) + return runner.cleanup_cluster() diff --git a/serverless.yml b/serverless.yml index 868224a..8fff437 100644 --- a/serverless.yml +++ b/serverless.yml @@ -23,6 +23,8 @@ provider: - Arn s3_ready_bucket: Ref: "S3ReadyBucket" + container_log_group: + Ref: "ContainerLogs" iamRoleStatements: - Effect: "Allow" @@ -73,17 +75,17 @@ provider: functions: populate_missing_instances: - handler: ardere.handler.populate_missing_instances + handler: handler.populate_missing_instances create_ecs_services: - handler: ardere.handler.create_ecs_services + handler: handler.create_ecs_services wait_for_cluster_ready: - handler: ardere.handler.wait_for_cluster_ready + handler: handler.wait_for_cluster_ready signal_cluster_start: - handler: ardere.handler.signal_cluster_start + handler: handler.signal_cluster_start check_for_cluster_done: - handler: ardere.handler.check_for_cluster_done + handler: handler.check_for_cluster_done cleanup_cluster: - handler: ardere.handler.cleanup_cluster + handler: handler.cleanup_cluster stepFunctions: stateMachines: @@ -137,6 +139,13 @@ stepFunctions: Type: Task Resource: check_for_cluster_done Next: "Wait for new Cluster Check" + Retry: + - + ErrorEquals: + - NoSuchKey + IntervalSeconds: 10 + MaxAttempts: 4 + BackoffRate: 1 Catch: - ErrorEquals: @@ -193,6 +202,10 @@ resources: - "logs:CreateLogStream" - "logs:PutLogEvents" Resource: "*" + ContainerLogs: + Type: "AWS::Logs::LogGroup" + Properties: + RetentionInDays: 1 EC2ContainerProfile: Type: "AWS::IAM::InstanceProfile" Properties: diff --git a/tests/test_aws.py b/tests/test_aws.py index 510a032..ed00d36 100644 --- a/tests/test_aws.py +++ b/tests/test_aws.py @@ -12,6 +12,9 @@ class TestECSManager(unittest.TestCase): def _make_FUT(self, plan=None): from ardere.aws import ECSManager + os.environ["s3_ready_bucket"] = "test_bucket" + os.environ["ecs_profile"] = "arn:something:fantastic:::" + os.environ["container_log_group"] = "ardere" self.boto_mock = mock.Mock() ECSManager.boto = self.boto_mock if not plan: @@ -21,10 +24,10 @@ def _make_FUT(self, plan=None): def test_init(self): ecs = self._make_FUT() eq_(ecs._plan["plan_run_uuid"], ecs._plan_uuid) + eq_(ecs.plan_uuid, ecs._plan_uuid) def test_ready_file(self): ecs = self._make_FUT() - os.environ["s3_ready_bucket"] = "test_bucket" ready_filename = ecs.s3_ready_file ok_("test_bucket" in ready_filename) ok_(ecs._plan_uuid in ready_filename) @@ -59,7 +62,6 @@ def test_calculate_missing_instances(self): eq_(result, {"t2.medium": 1}) def test_request_instances(self): - os.environ["ecs_profile"] = "arn:something:fantastic:::" instances = { "t2.medium": 10 } @@ -76,7 +78,6 @@ def test_request_instances(self): ) def test_create_service(self): - os.environ["s3_ready_bucket"] = "test_bucket" ecs = self._make_FUT() step = ecs._plan["steps"][0] diff --git a/tests/test_handler.py b/tests/test_step_functions.py similarity index 54% rename from tests/test_handler.py rename to tests/test_step_functions.py index f0d4f44..6173252 100644 --- a/tests/test_handler.py +++ b/tests/test_step_functions.py @@ -10,72 +10,60 @@ from tests import fixtures -class TestHandler(unittest.TestCase): +class TestAsyncPlanRunner(unittest.TestCase): def setUp(self): - self.plan = json.loads(fixtures.sample_basic_test_plan) self.mock_ecs = mock.Mock() - self._patcher = mock.patch("ardere.handler.ECSManager") + self._patcher = mock.patch("ardere.step_functions.ECSManager") mock_manager = self._patcher.start() mock_manager.return_value = self.mock_ecs + from ardere.step_functions import AsynchronousPlanRunner + + self.plan = json.loads(fixtures.sample_basic_test_plan) + self.runner = AsynchronousPlanRunner(self.plan, {}) + self.runner.boto = self.mock_boto = mock.Mock() + def tearDown(self): self._patcher.stop() def test_build_instance_map(self): - from ardere.handler import _build_instance_map - - result = _build_instance_map(self.plan) + result = self.runner._build_instance_map() eq_(len(result), 1) eq_(result, {"t2.medium": 1}) def test_find_test_plan_duration(self): - from ardere.handler import _find_test_plan_duration - - result = _find_test_plan_duration(self.plan) + result = self.runner._find_test_plan_duration() eq_(result, 140) def test_populate_missing_instances(self): - from ardere.handler import populate_missing_instances - - populate_missing_instances(self.plan, {}) + self.runner.populate_missing_instances() self.mock_ecs.query_active_instances.assert_called() self.mock_ecs.request_instances.assert_called() def test_create_ecs_services(self): - from ardere.handler import create_ecs_services - - create_ecs_services(self.plan, {}) + self.runner.create_ecs_services() self.mock_ecs.create_services.assert_called_with(self.plan["steps"]) def test_wait_for_cluster_ready_not_ready(self): - from ardere.handler import wait_for_cluster_ready from ardere.exceptions import ServicesStartingException self.mock_ecs.all_services_ready.return_value = False - assert_raises(ServicesStartingException, wait_for_cluster_ready, - self.plan, {}) + assert_raises(ServicesStartingException, + self.runner.wait_for_cluster_ready) def test_wait_for_cluster_ready_all_ready(self): - from ardere.handler import wait_for_cluster_ready - from ardere.exceptions import ServicesStartingException - self.mock_ecs.all_services_ready.return_value = True - wait_for_cluster_ready(self.plan, {}) + self.runner.wait_for_cluster_ready() self.mock_ecs.all_services_ready.assert_called() - @mock.patch("ardere.handler.boto3") - def test_signal_cluster_start(self, mock_boto): - from ardere.handler import signal_cluster_start - + def test_signal_cluster_start(self): self.plan["plan_run_uuid"] = str(uuid.uuid4()) - signal_cluster_start(self.plan, {}) - mock_boto.client.assert_called() + self.runner.signal_cluster_start() + self.mock_boto.client.assert_called() - @mock.patch("ardere.handler.boto3") - def test_check_for_cluster_done_not_done(self, mock_boto): - from ardere.handler import check_for_cluster_done + def test_check_for_cluster_done_not_done(self): os.environ["s3_ready_bucket"] = "test_bucket" mock_file = mock.Mock() mock_file.get.return_value = {"Body": mock_file} @@ -83,15 +71,14 @@ def test_check_for_cluster_done_not_done(self, mock_boto): 'utf-8') mock_s3_obj = mock.Mock() mock_s3_obj.Object.return_value = mock_file - mock_boto.resource.return_value = mock_s3_obj + self.mock_boto.resource.return_value = mock_s3_obj self.plan["plan_run_uuid"] = str(uuid.uuid4()) - check_for_cluster_done(self.plan, {}) + self.runner.check_for_cluster_done() - @mock.patch("ardere.handler.boto3") - def test_check_for_cluster_done_shutdown(self, mock_boto): - from ardere.handler import check_for_cluster_done + def test_check_for_cluster_done_shutdown(self): from ardere.exceptions import ShutdownPlanException + os.environ["s3_ready_bucket"] = "test_bucket" mock_file = mock.Mock() mock_file.get.return_value = {"Body": mock_file} @@ -99,16 +86,14 @@ def test_check_for_cluster_done_shutdown(self, mock_boto): 'utf-8') mock_s3_obj = mock.Mock() mock_s3_obj.Object.return_value = mock_file - mock_boto.resource.return_value = mock_s3_obj + self.mock_boto.resource.return_value = mock_s3_obj self.plan["plan_run_uuid"] = str(uuid.uuid4()) - assert_raises(ShutdownPlanException, check_for_cluster_done, - self.plan, {}) + assert_raises(ShutdownPlanException, self.runner.check_for_cluster_done) - @mock.patch("ardere.handler.boto3") - def test_check_for_cluster_done_object_error(self, mock_boto): - from ardere.handler import check_for_cluster_done + def test_check_for_cluster_done_object_error(self): from ardere.exceptions import ShutdownPlanException + os.environ["s3_ready_bucket"] = "test_bucket" mock_file = mock.Mock() mock_file.get.return_value = {"Body": mock_file} @@ -118,29 +103,24 @@ def test_check_for_cluster_done_object_error(self, mock_boto): mock_s3_obj.Object.side_effect = ClientError( {"Error": {}}, None ) - mock_boto.resource.return_value = mock_s3_obj + self.mock_boto.resource.return_value = mock_s3_obj self.plan["plan_run_uuid"] = str(uuid.uuid4()) - assert_raises(ShutdownPlanException, check_for_cluster_done, - self.plan, {}) + assert_raises(ShutdownPlanException, self.runner.check_for_cluster_done) - @mock.patch("ardere.handler.boto3") - def test_cleanup_cluster(self, mock_boto): - from ardere.handler import cleanup_cluster + def test_cleanup_cluster(self): self.plan["plan_run_uuid"] = str(uuid.uuid4()) - cleanup_cluster(self.plan, {}) - mock_boto.resource.assert_called() + self.runner.cleanup_cluster() + self.mock_boto.resource.assert_called() - @mock.patch("ardere.handler.boto3") - def test_cleanup_cluster_error(self, mock_boto): - from ardere.handler import cleanup_cluster + def test_cleanup_cluster_error(self): self.plan["plan_run_uuid"] = str(uuid.uuid4()) mock_s3 = mock.Mock() - mock_boto.resource.return_value = mock_s3 + self.mock_boto.resource.return_value = mock_s3 mock_s3.Object.side_effect = ClientError( {"Error": {}}, None ) - cleanup_cluster(self.plan, {}) + self.runner.cleanup_cluster() mock_s3.Object.assert_called()