From 7749e2eb373a6f6afc49b2a7d03fcf5c4f9a18fb Mon Sep 17 00:00:00 2001 From: Ben Bangert Date: Fri, 24 Mar 2017 10:32:07 -0700 Subject: [PATCH] feat: add telegraf setup for per-container stat reporting Creates InfluxDB database and sets up all the steps to have telegraf running when influxdb is enabled. Closes #33 --- ardere/aws.py | 68 ++++++++++++-- ardere/step_functions.py | 7 ++ requirements.txt | 3 +- serverless.yml | 2 +- src/shell/telegraf.toml | 177 +++++++++++++++++++++++++++++++++++ tests/fixtures.py | 2 +- tests/test_aws.py | 6 +- tests/test_step_functions.py | 5 + 8 files changed, 257 insertions(+), 13 deletions(-) create mode 100644 src/shell/telegraf.toml diff --git a/ardere/aws.py b/ardere/aws.py index 64c4461..1ac7974 100644 --- a/ardere/aws.py +++ b/ardere/aws.py @@ -18,11 +18,17 @@ parent_dir_path = os.path.dirname(dir_path) shell_path = os.path.join(parent_dir_path, "src", "shell", "waitforcluster.sh") +telegraf_path = os.path.join(parent_dir_path, "src", "shell", + "telegraf.toml") -# Load the shell script +# Load the shell scripts with open(shell_path, 'r') as f: shell_script = f.read() +with open(telegraf_path, 'r') as f: + telegraf_script = f.read() + + # List tracking vcpu's of all instance types for cpu unit reservations # We are intentionally leaving out the following instance types as they're # considered overkill for load-testing purposes or any instance req's we have @@ -70,13 +76,14 @@ class ECSManager(object): # ECS optimized AMI id's ecs_ami_ids = { - "us-east-1": "ami-b2df2ca4", - "us-east-2": "ami-832b0ee6", - "us-west-1": "ami-dd104dbd", - "us-west-2": "ami-022b9262" + "us-east-1": "ami-275ffe31", + "us-east-2": "ami-62745007", + "us-west-1": "ami-689bc208", + "us-west-2": "ami-62d35c02" } influxdb_container = "influxdb:1.1-alpine" + telegraf_container = "telegraf:1.2-alpine" def __init__(self, plan): # type: (Dict[str, Any]) -> None @@ -107,6 +114,10 @@ def s3_ready_file(self): key="{}.ready".format(self._plan_uuid) ) + @property + def influx_db_name(self): + return "run-{}".format(self.plan_uuid) + def family_name(self, step): """Generate a consistent family name for a given step""" return step["name"] + "-" + self._plan_uuid @@ -205,7 +216,8 @@ def locate_metrics_service(self): cluster=self._ecs_name, services=["metrics"] ) - if response["services"]: + if response["services"] and response["services"][0]["status"] == \ + "ACTIVE": return response["services"][0] else: return None @@ -320,15 +332,55 @@ def create_service(self, step): } } } - if "port_mapping" in step: ports = [{"containerPort": port} for port in step["port_mapping"]] container_def["portMappings"] = ports + # Setup the telegraf container definition + cmd = """\ + echo "${__ARDERE_TELEGRAF_CONF__}" > /etc/telegraf/telegraf.conf && \ + export __ARDERE_TELEGRAF_HOST__=`wget -qO- http://169.254.169.254/latest/meta-data/instance-id` && \ + telegraf \ + """ + cmd = ['sh', '-c', '{}'.format(cmd)] + telegraf_def = { + "name": "telegraf", + "image": self.telegraf_container, + "cpu": 512, + "memoryReservation": 256, + "entryPoint": cmd, + "portMappings": [ + {"containerPort": 8125} + ], + "environment": [ + {"name": "__ARDERE_TELEGRAF_CONF__", + "value": telegraf_script}, + {"name": "__ARDERE_TELEGRAF_STEP__", + "value": step["name"]}, + {"name": "__ARDERE_INFLUX_ADDR__", + "value": "{}:8086".format(self._plan["influxdb_public_ip"])}, + {"name": "__ARDERE_INFLUX_DB__", + "value": self.influx_db_name}, + {"name": "__ARDERE_TELEGRAF_TYPE__", + "value": step["docker_series"]} + ], + "logConfiguration": { + "logDriver": "awslogs", + "options": { + "awslogs-group": self.container_log_group, + "awslogs-region": "us-east-1", + "awslogs-stream-prefix": "ardere-{}".format( + self.plan_uuid + ) + } + } + } + task_response = self._ecs_client.register_task_definition( family=family_name, containerDefinitions=[ - container_def + container_def, + telegraf_def ], # use host network mode for optimal performance networkMode="host", diff --git a/ardere/step_functions.py b/ardere/step_functions.py index ef611f2..dc45f63 100644 --- a/ardere/step_functions.py +++ b/ardere/step_functions.py @@ -6,6 +6,7 @@ import boto3 import botocore import toml +from influxdb import InfluxDBClient from marshmallow import ( Schema, decorators, @@ -42,6 +43,7 @@ class StepValidator(Schema): cmd = fields.String(required=True) port_mapping = fields.List(fields.Int()) env = fields.Dict() + docker_series = fields.String(missing="default") class InfluxOptions(Schema): @@ -162,6 +164,7 @@ def ensure_metrics_available(self): # Is the service already running? metrics = self.ecs.locate_metrics_service() + logger.info("Metrics info: %s", metrics) if not metrics: # Start the metrics service, throw a retry @@ -170,6 +173,7 @@ def ensure_metrics_available(self): deploy = metrics["deployments"][0] ready = deploy["desiredCount"] == deploy["runningCount"] + logger.info("Deploy info: %s", deploy) if not ready: raise ServicesStartingException("Waiting for metrics") @@ -180,6 +184,9 @@ def ensure_metrics_available(self): raise Exception("Unable to locate metrics IP even though its " "running") + # Create an influxdb for this run + influx_client = InfluxDBClient(host=metric_ip) + influx_client.create_database(self.ecs.influx_db_name) self.event["influxdb_public_ip"] = metric_ip return self.event diff --git a/requirements.txt b/requirements.txt index 3ce18e2..e40927a 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,5 @@ futures==3.0.5 typing==3.5.3.0 toml==0.9.2 -marshmallow==2.13.4 \ No newline at end of file +marshmallow==2.13.4 +influxdb==4.0.0 \ No newline at end of file diff --git a/serverless.yml b/serverless.yml index 9e12aff..246c4ab 100644 --- a/serverless.yml +++ b/serverless.yml @@ -182,7 +182,7 @@ resources: S3ReadyBucket: Type: "AWS::S3::Bucket" Properties: - AccessControl: "AuthenticatedRead" + AccessControl: "PublicRead" MetricsBucket: Type: "AWS::S3::Bucket" Properties: diff --git a/src/shell/telegraf.toml b/src/shell/telegraf.toml new file mode 100644 index 0000000..66f3f89 --- /dev/null +++ b/src/shell/telegraf.toml @@ -0,0 +1,177 @@ +# Telegraf Configuration +# +# Telegraf is entirely plugin driven. All metrics are gathered from the +# declared inputs, and sent to the declared outputs. +# +# Plugins must be declared in here to be active. +# To deactivate a plugin, comment out the name and any variables. +# +# Use 'telegraf -config telegraf.conf -test' to see what metrics a config +# file would generate. +# +# Environment variables can be used anywhere in this config file, simply prepend +# them with $. For strings the variable must be within quotes (ie, "$STR_VAR"), +# for numbers and booleans they should be plain (ie, $INT_VAR, $BOOL_VAR) + + +# Global tags can be specified here in key="value" format. +[global_tags] +# dc = "us-east-1" # will tag all metrics with dc=us-east-1 +# rack = "1a" +## Environment variables can be used as tags, and throughout the config file +# user = "$USER" +step = "$__ARDERE_TELEGRAF_STEP__" +## type is the old "docker_series" +type = "$__ARDERE_TELEGRAF_TYPE__" + + +# Configuration for telegraf agent +[agent] +## Default data collection interval for all inputs +interval = "10s" +## Rounds collection interval to 'interval' +## ie, if interval="10s" then always collect on :00, :10, :20, etc. +round_interval = true + +## Telegraf will send metrics to outputs in batches of at most +## metric_batch_size metrics. +## This controls the size of writes that Telegraf sends to output plugins. +metric_batch_size = 1000 + +## For failed writes, telegraf will cache metric_buffer_limit metrics for each +## output, and will flush this buffer on a successful write. Oldest metrics +## are dropped first when this buffer fills. +## This buffer only fills when writes fail to output plugin(s). +metric_buffer_limit = 10000 + +## Collection jitter is used to jitter the collection by a random amount. +## Each plugin will sleep for a random time within jitter before collecting. +## This can be used to avoid many plugins querying things like sysfs at the +## same time, which can have a measurable effect on the system. +collection_jitter = "0s" + +## Default flushing interval for all outputs. You shouldn't set this below +## interval. Maximum flush_interval will be flush_interval + flush_jitter +flush_interval = "10s" +## Jitter the flush interval by a random amount. This is primarily to avoid +## large write spikes for users running a large number of telegraf instances. +## ie, a jitter of 5s and interval 10s means flushes will happen every 10-15s +flush_jitter = "0s" +## By default, precision will be set to the same timestamp order as the +## collection interval, with the maximum being 1s. +## Precision will NOT be used for service inputs, such as logparser and statsd. +## Valid values are "ns", "us" (or "µs"), "ms", "s". +precision = "" +## Logging configuration: +## Run telegraf with debug log messages. +debug = false +## Run telegraf in quiet mode (error log messages only). +quiet = false +## Specify the log file name. The empty string means to log to stderr. +logfile = "" +## Override default hostname, if empty use os.Hostname() +hostname = "$__ARDERE_TELEGRAF_HOST__" +## If set to true, do no set the "host" tag in the telegraf agent. +omit_hostname = false +############################################################################### +# OUTPUT PLUGINS # +############################################################################### +# Configuration for influxdb server to send metrics to +[[outputs.influxdb]] +## The full HTTP or UDP endpoint URL for your InfluxDB instance. +## Multiple urls can be specified as part of the same cluster, +## this means that only ONE of the urls will be written to each interval. +# urls = ["udp://localhost:8089"] # UDP endpoint example +urls = ["http://$__ARDERE_INFLUX_ADDR__"] # required +## The target database for metrics (telegraf will create it if not exists). +database = "$__ARDERE_INFLUX_DB__" # required +## Retention policy to write to. Empty string writes to the default rp. +retention_policy = "" +## Write consistency (clusters only), can be: "any", "one", "quorum", "all" +write_consistency = "any" +## Write timeout (for the InfluxDB client), formatted as a string. +## If not provided, will default to 5s. 0s means no timeout (not recommended). +timeout = "5s" +# username = "telegraf" +# password = "metricsmetricsmetricsmetrics" +## Set the user agent for HTTP POSTs (can be useful for log differentiation) +# user_agent = "telegraf" +## Set UDP payload size, defaults to InfluxDB UDP Client default (512 bytes) +# udp_payload = 512 +## Optional SSL Config +# ssl_ca = "/etc/telegraf/ca.pem" +# ssl_cert = "/etc/telegraf/cert.pem" +# ssl_key = "/etc/telegraf/key.pem" +## Use SSL but skip chain & host verification +# insecure_skip_verify = false +############################################################################### +# PROCESSOR PLUGINS # +############################################################################### +# # Print all metrics that pass through this filter. +# [[processors.printer]] +############################################################################### +# AGGREGATOR PLUGINS # +############################################################################### +# # Keep the aggregate min/max of each metric passing through. +# [[aggregators.minmax]] +# ## General Aggregator Arguments: +# ## The period on which to flush & clear the aggregator. +# period = "30s" +# ## If true, the original metric will be dropped by the +# ## aggregator and will not get sent to the output plugins. +# drop_original = false +############################################################################### +# INPUT PLUGINS # +############################################################################### +# Read metrics about cpu usage +[[inputs.cpu]] +## Whether to report per-cpu stats or not +percpu = true +## Whether to report total system cpu stats or not +totalcpu = true +## If true, collect raw CPU time metrics. +collect_cpu_time = false +# Read metrics about memory usage +[[inputs.mem]] +# no configuration +# Read TCP metrics such as established, time wait and sockets counts. +[[inputs.netstat]] +# no configuration +############################################################################### +# SERVICE INPUT PLUGINS # +############################################################################### +# Statsd Server +[[inputs.statsd]] +## Address and port to host UDP listener on +service_address = ":8125" +## The following configuration options control when telegraf clears it's cache +## of previous values. If set to false, then telegraf will only clear it's +## cache when the daemon is restarted. +## Reset gauges every interval (default=true) +delete_gauges = true +## Reset counters every interval (default=true) +delete_counters = true +## Reset sets every interval (default=true) +delete_sets = true +## Reset timings & histograms every interval (default=true) +delete_timings = true +## Percentiles to calculate for timing & histogram stats +percentiles = [90] +## separator to use between elements of a statsd metric +metric_separator = "_" +## Parses tags in the datadog statsd format +## http://docs.datadoghq.com/guides/dogstatsd/ +parse_data_dog_tags = false +## Statsd data translation templates, more info can be read here: +## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md#graphite +# templates = [ +# "cpu.* measurement*" +# ] +## Number of UDP messages allowed to queue up, once filled, +## the statsd server will start dropping packets +allowed_pending_messages = 10000 +## Number of timing/histogram values to track per-measurement in the +## calculation of percentiles. Raising this limit increases the accuracy +## of percentiles but also increases the memory usage and cpu time. +#percentile_limit = 1000 +percentile_limit = 10 \ No newline at end of file diff --git a/tests/fixtures.py b/tests/fixtures.py index 0ed891a..1591992 100644 --- a/tests/fixtures.py +++ b/tests/fixtures.py @@ -14,7 +14,7 @@ }, "port_mapping": [8000, 4000], "container_name": "bbangert/ap-loadtester:latest", - "cmd": "./apenv/bin/aplt_testplan wss://autopush.stage.mozaws.net 'aplt.scenarios:notification_forever,1000,1,0'" + "cmd": "./apenv/bin/aplt_testplan wss://autopush.stage.mozaws.net 'aplt.scenarios:notification_forever,1000,1,0' --statsd_host=localhost --statsd_port=8125" } ] } diff --git a/tests/test_aws.py b/tests/test_aws.py index 092e3fc..7de2af4 100644 --- a/tests/test_aws.py +++ b/tests/test_aws.py @@ -107,11 +107,11 @@ def test_locate_metrics_service(self): ecs = self._make_FUT() ecs._ecs_client.describe_services.return_value = { "services": [ - {"stuff": 1} + {"stuff": 1, "status": "ACTIVE"} ] } result = ecs.locate_metrics_service() - eq_(result, {"stuff": 1}) + eq_(result, {"stuff": 1, "status": "ACTIVE"}) def test_locate_metrics_service_not_found(self): ecs = self._make_FUT() @@ -141,6 +141,8 @@ def test_create_service(self): ecs = self._make_FUT() step = ecs._plan["steps"][0] + ecs._plan["influxdb_public_ip"] = "1.1.1.1" + step["docker_series"] = "default" # Setup mocks ecs._ecs_client.register_task_definition.return_value = { diff --git a/tests/test_step_functions.py b/tests/test_step_functions.py index fc59450..c9060d2 100644 --- a/tests/test_step_functions.py +++ b/tests/test_step_functions.py @@ -15,6 +15,9 @@ class TestAsyncPlanRunner(unittest.TestCase): def setUp(self): self.mock_ecs = mock.Mock() self._patcher = mock.patch("ardere.step_functions.ECSManager") + self._influx_patcher = mock.patch( + "ardere.step_functions.InfluxDBClient") + self.mock_influx = self._influx_patcher.start() mock_manager = self._patcher.start() mock_manager.return_value = self.mock_ecs @@ -25,6 +28,7 @@ def setUp(self): self.runner.boto = self.mock_boto = mock.Mock() def tearDown(self): + self._influx_patcher.stop() self._patcher.stop() def test_build_instance_map(self): @@ -105,6 +109,7 @@ def test_ensure_metrics_available_running(self): self.mock_ecs.locate_metrics_container_ip.return_value = "1.1.1.1" self.runner.ensure_metrics_available() self.mock_ecs.locate_metrics_container_ip.assert_called() + self.mock_influx.assert_called() def test_ensure_metrics_available_disabled(self): self.plan["influx_options"] = dict(enabled=False)