feat: add telegraf setup for per-container stat reporting
Creates an InfluxDB database for the run and sets up all the steps needed to
have telegraf running when InfluxDB is enabled.

Closes #33
bbangert committed Mar 25, 2017
1 parent 95267fb commit 7749e2e
Showing 8 changed files with 257 additions and 13 deletions.
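
For orientation, a minimal sketch of the flow this commit wires up, written here as standalone Python rather than ardere's own code: create a per-run InfluxDB database, then register an ECS task definition that runs a telegraf sidecar next to the load-test container. The telegraf image tag, the host network mode, and the "run-<plan uuid>" database naming come from the diff below; the IP address, UUID, and load-test image are placeholders.

import boto3
from influxdb import InfluxDBClient

metrics_ip = "203.0.113.10"        # public IP of the metrics (InfluxDB) service (placeholder)
db_name = "run-1234-abcd"          # ardere names the database "run-<plan uuid>"

# Create the database the telegraf output plugin will write into
InfluxDBClient(host=metrics_ip).create_database(db_name)

ecs = boto3.client("ecs", region_name="us-east-1")
ecs.register_task_definition(
    family="loadtest-1234-abcd",
    networkMode="host",            # app and telegraf share localhost
    containerDefinitions=[
        {"name": "loadtest", "image": "example/loadtester:latest",
         "cpu": 1024, "memoryReservation": 1024},
        {"name": "telegraf", "image": "telegraf:1.2-alpine",
         "cpu": 512, "memoryReservation": 256,
         "portMappings": [{"containerPort": 8125}],
         "environment": [{"name": "__ARDERE_INFLUX_DB__", "value": db_name}]},
    ],
)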
68 changes: 60 additions & 8 deletions ardere/aws.py
@@ -18,11 +18,17 @@
parent_dir_path = os.path.dirname(dir_path)
shell_path = os.path.join(parent_dir_path, "src", "shell",
"waitforcluster.sh")
telegraf_path = os.path.join(parent_dir_path, "src", "shell",
"telegraf.toml")

# Load the shell script
# Load the shell scripts
with open(shell_path, 'r') as f:
shell_script = f.read()

with open(telegraf_path, 'r') as f:
telegraf_script = f.read()


# List tracking vCPUs of all instance types for cpu unit reservations
# We are intentionally leaving out the following instance types as they're
# considered overkill for load-testing purposes or any instance req's we have
@@ -70,13 +76,14 @@ class ECSManager(object):

# ECS optimized AMI id's
ecs_ami_ids = {
"us-east-1": "ami-b2df2ca4",
"us-east-2": "ami-832b0ee6",
"us-west-1": "ami-dd104dbd",
"us-west-2": "ami-022b9262"
"us-east-1": "ami-275ffe31",
"us-east-2": "ami-62745007",
"us-west-1": "ami-689bc208",
"us-west-2": "ami-62d35c02"
}

influxdb_container = "influxdb:1.1-alpine"
telegraf_container = "telegraf:1.2-alpine"

def __init__(self, plan):
# type: (Dict[str, Any]) -> None
@@ -107,6 +114,10 @@ def s3_ready_file(self):
key="{}.ready".format(self._plan_uuid)
)

@property
def influx_db_name(self):
return "run-{}".format(self.plan_uuid)

def family_name(self, step):
"""Generate a consistent family name for a given step"""
return step["name"] + "-" + self._plan_uuid
@@ -205,7 +216,8 @@ def locate_metrics_service(self):
cluster=self._ecs_name,
services=["metrics"]
)
if response["services"]:
if response["services"] and response["services"][0]["status"] == \
"ACTIVE":
return response["services"][0]
else:
return None
@@ -320,15 +332,55 @@ def create_service(self, step):
}
}
}

if "port_mapping" in step:
ports = [{"containerPort": port} for port in step["port_mapping"]]
container_def["portMappings"] = ports

# Setup the telegraf container definition; the entrypoint below writes the
# telegraf config (passed in via env var) to /etc/telegraf/telegraf.conf,
# resolves the EC2 instance id for use as the telegraf hostname, then starts
# telegraf
cmd = """\
echo "${__ARDERE_TELEGRAF_CONF__}" > /etc/telegraf/telegraf.conf && \
export __ARDERE_TELEGRAF_HOST__=`wget -qO- http://169.254.169.254/latest/meta-data/instance-id` && \
telegraf \
"""
cmd = ['sh', '-c', '{}'.format(cmd)]
telegraf_def = {
"name": "telegraf",
"image": self.telegraf_container,
"cpu": 512,
"memoryReservation": 256,
"entryPoint": cmd,
"portMappings": [
{"containerPort": 8125}
],
"environment": [
{"name": "__ARDERE_TELEGRAF_CONF__",
"value": telegraf_script},
{"name": "__ARDERE_TELEGRAF_STEP__",
"value": step["name"]},
{"name": "__ARDERE_INFLUX_ADDR__",
"value": "{}:8086".format(self._plan["influxdb_public_ip"])},
{"name": "__ARDERE_INFLUX_DB__",
"value": self.influx_db_name},
{"name": "__ARDERE_TELEGRAF_TYPE__",
"value": step["docker_series"]}
],
"logConfiguration": {
"logDriver": "awslogs",
"options": {
"awslogs-group": self.container_log_group,
"awslogs-region": "us-east-1",
"awslogs-stream-prefix": "ardere-{}".format(
self.plan_uuid
)
}
}
}

task_response = self._ecs_client.register_task_definition(
family=family_name,
containerDefinitions=[
container_def
container_def,
telegraf_def
],
# use host network mode for optimal performance
networkMode="host",
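
Because the task definition uses host network mode and the telegraf sidecar's statsd input listens on UDP port 8125, any container in the step can report metrics simply by sending plain statsd lines to localhost (which is why the test fixture below adds --statsd_host=localhost --statsd_port=8125 to the load-test command). A hypothetical illustration, with made-up metric names:

import socket

sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# counter: one request served
sock.sendto(b"requests_served:1|c", ("127.0.0.1", 8125))
# timer: a 42 ms operation, folded into the percentiles telegraf reports
sock.sendto(b"notification_latency:42|ms", ("127.0.0.1", 8125))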
7 changes: 7 additions & 0 deletions ardere/step_functions.py
@@ -6,6 +6,7 @@
import boto3
import botocore
import toml
from influxdb import InfluxDBClient
from marshmallow import (
Schema,
decorators,
@@ -42,6 +43,7 @@ class StepValidator(Schema):
cmd = fields.String(required=True)
port_mapping = fields.List(fields.Int())
env = fields.Dict()
docker_series = fields.String(missing="default")


class InfluxOptions(Schema):
@@ -162,6 +164,7 @@ def ensure_metrics_available(self):

# Is the service already running?
metrics = self.ecs.locate_metrics_service()
logger.info("Metrics info: %s", metrics)

if not metrics:
# Start the metrics service, throw a retry
@@ -170,6 +173,7 @@ def ensure_metrics_available(self):

deploy = metrics["deployments"][0]
ready = deploy["desiredCount"] == deploy["runningCount"]
logger.info("Deploy info: %s", deploy)
if not ready:
raise ServicesStartingException("Waiting for metrics")

@@ -180,6 +184,9 @@ def ensure_metrics_available(self):
raise Exception("Unable to locate metrics IP even though it's "
"running")

# Create an influxdb for this run
influx_client = InfluxDBClient(host=metric_ip)
influx_client.create_database(self.ecs.influx_db_name)
self.event["influxdb_public_ip"] = metric_ip
return self.event

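
Since ensure_metrics_available now creates a dedicated InfluxDB database per run, the collected series can be inspected after a run with the same client library. A hypothetical check, with a placeholder host IP and database name; the "cpu" measurement comes from the inputs.cpu plugin enabled in telegraf.toml:

from influxdb import InfluxDBClient

client = InfluxDBClient(host="203.0.113.10")   # metrics service public IP (placeholder)
print(client.get_list_database())              # should include the run-<plan uuid> database
result = client.query("SELECT * FROM cpu LIMIT 5", database="run-1234-abcd")
print(list(result.get_points()))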
3 changes: 2 additions & 1 deletion requirements.txt
@@ -1,4 +1,5 @@
futures==3.0.5
typing==3.5.3.0
toml==0.9.2
marshmallow==2.13.4
marshmallow==2.13.4
influxdb==4.0.0
2 changes: 1 addition & 1 deletion serverless.yml
@@ -182,7 +182,7 @@ resources:
S3ReadyBucket:
Type: "AWS::S3::Bucket"
Properties:
AccessControl: "AuthenticatedRead"
AccessControl: "PublicRead"
MetricsBucket:
Type: "AWS::S3::Bucket"
Properties:
177 changes: 177 additions & 0 deletions src/shell/telegraf.toml
@@ -0,0 +1,177 @@
# Telegraf Configuration
#
# Telegraf is entirely plugin driven. All metrics are gathered from the
# declared inputs, and sent to the declared outputs.
#
# Plugins must be declared in here to be active.
# To deactivate a plugin, comment out the name and any variables.
#
# Use 'telegraf -config telegraf.conf -test' to see what metrics a config
# file would generate.
#
# Environment variables can be used anywhere in this config file, simply prepend
# them with $. For strings the variable must be within quotes (ie, "$STR_VAR"),
# for numbers and booleans they should be plain (ie, $INT_VAR, $BOOL_VAR)


# Global tags can be specified here in key="value" format.
[global_tags]
# dc = "us-east-1" # will tag all metrics with dc=us-east-1
# rack = "1a"
## Environment variables can be used as tags, and throughout the config file
# user = "$USER"
step = "$__ARDERE_TELEGRAF_STEP__"
## type is the old "docker_series"
type = "$__ARDERE_TELEGRAF_TYPE__"


# Configuration for telegraf agent
[agent]
## Default data collection interval for all inputs
interval = "10s"
## Rounds collection interval to 'interval'
## ie, if interval="10s" then always collect on :00, :10, :20, etc.
round_interval = true

## Telegraf will send metrics to outputs in batches of at most
## metric_batch_size metrics.
## This controls the size of writes that Telegraf sends to output plugins.
metric_batch_size = 1000

## For failed writes, telegraf will cache metric_buffer_limit metrics for each
## output, and will flush this buffer on a successful write. Oldest metrics
## are dropped first when this buffer fills.
## This buffer only fills when writes fail to output plugin(s).
metric_buffer_limit = 10000

## Collection jitter is used to jitter the collection by a random amount.
## Each plugin will sleep for a random time within jitter before collecting.
## This can be used to avoid many plugins querying things like sysfs at the
## same time, which can have a measurable effect on the system.
collection_jitter = "0s"

## Default flushing interval for all outputs. You shouldn't set this below
## interval. Maximum flush_interval will be flush_interval + flush_jitter
flush_interval = "10s"
## Jitter the flush interval by a random amount. This is primarily to avoid
## large write spikes for users running a large number of telegraf instances.
## ie, a jitter of 5s and interval 10s means flushes will happen every 10-15s
flush_jitter = "0s"
## By default, precision will be set to the same timestamp order as the
## collection interval, with the maximum being 1s.
## Precision will NOT be used for service inputs, such as logparser and statsd.
## Valid values are "ns", "us" (or "µs"), "ms", "s".
precision = ""
## Logging configuration:
## Run telegraf with debug log messages.
debug = false
## Run telegraf in quiet mode (error log messages only).
quiet = false
## Specify the log file name. The empty string means to log to stderr.
logfile = ""
## Override default hostname, if empty use os.Hostname()
hostname = "$__ARDERE_TELEGRAF_HOST__"
## If set to true, do not set the "host" tag in the telegraf agent.
omit_hostname = false
###############################################################################
# OUTPUT PLUGINS #
###############################################################################
# Configuration for influxdb server to send metrics to
[[outputs.influxdb]]
## The full HTTP or UDP endpoint URL for your InfluxDB instance.
## Multiple urls can be specified as part of the same cluster,
## this means that only ONE of the urls will be written to each interval.
# urls = ["udp://localhost:8089"] # UDP endpoint example
urls = ["http://$__ARDERE_INFLUX_ADDR__"] # required
## The target database for metrics (telegraf will create it if not exists).
database = "$__ARDERE_INFLUX_DB__" # required
## Retention policy to write to. Empty string writes to the default rp.
retention_policy = ""
## Write consistency (clusters only), can be: "any", "one", "quorum", "all"
write_consistency = "any"
## Write timeout (for the InfluxDB client), formatted as a string.
## If not provided, will default to 5s. 0s means no timeout (not recommended).
timeout = "5s"
# username = "telegraf"
# password = "metricsmetricsmetricsmetrics"
## Set the user agent for HTTP POSTs (can be useful for log differentiation)
# user_agent = "telegraf"
## Set UDP payload size, defaults to InfluxDB UDP Client default (512 bytes)
# udp_payload = 512
## Optional SSL Config
# ssl_ca = "/etc/telegraf/ca.pem"
# ssl_cert = "/etc/telegraf/cert.pem"
# ssl_key = "/etc/telegraf/key.pem"
## Use SSL but skip chain & host verification
# insecure_skip_verify = false
###############################################################################
# PROCESSOR PLUGINS #
###############################################################################
# # Print all metrics that pass through this filter.
# [[processors.printer]]
###############################################################################
# AGGREGATOR PLUGINS #
###############################################################################
# # Keep the aggregate min/max of each metric passing through.
# [[aggregators.minmax]]
# ## General Aggregator Arguments:
# ## The period on which to flush & clear the aggregator.
# period = "30s"
# ## If true, the original metric will be dropped by the
# ## aggregator and will not get sent to the output plugins.
# drop_original = false
###############################################################################
# INPUT PLUGINS #
###############################################################################
# Read metrics about cpu usage
[[inputs.cpu]]
## Whether to report per-cpu stats or not
percpu = true
## Whether to report total system cpu stats or not
totalcpu = true
## If true, collect raw CPU time metrics.
collect_cpu_time = false
# Read metrics about memory usage
[[inputs.mem]]
# no configuration
# Read TCP metrics such as established, time wait and sockets counts.
[[inputs.netstat]]
# no configuration
###############################################################################
# SERVICE INPUT PLUGINS #
###############################################################################
# Statsd Server
[[inputs.statsd]]
## Address and port to host UDP listener on
service_address = ":8125"
## The following configuration options control when telegraf clears its cache
## of previous values. If set to false, then telegraf will only clear its
## cache when the daemon is restarted.
## Reset gauges every interval (default=true)
delete_gauges = true
## Reset counters every interval (default=true)
delete_counters = true
## Reset sets every interval (default=true)
delete_sets = true
## Reset timings & histograms every interval (default=true)
delete_timings = true
## Percentiles to calculate for timing & histogram stats
percentiles = [90]
## separator to use between elements of a statsd metric
metric_separator = "_"
## Parses tags in the datadog statsd format
## http://docs.datadoghq.com/guides/dogstatsd/
parse_data_dog_tags = false
## Statsd data translation templates, more info can be read here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md#graphite
# templates = [
# "cpu.* measurement*"
# ]
## Number of UDP messages allowed to queue up, once filled,
## the statsd server will start dropping packets
allowed_pending_messages = 10000
## Number of timing/histogram values to track per-measurement in the
## calculation of percentiles. Raising this limit increases the accuracy
## of percentiles but also increases the memory usage and cpu time.
#percentile_limit = 1000
percentile_limit = 10
2 changes: 1 addition & 1 deletion tests/fixtures.py
@@ -14,7 +14,7 @@
},
"port_mapping": [8000, 4000],
"container_name": "bbangert/ap-loadtester:latest",
"cmd": "./apenv/bin/aplt_testplan wss://autopush.stage.mozaws.net 'aplt.scenarios:notification_forever,1000,1,0'"
"cmd": "./apenv/bin/aplt_testplan wss://autopush.stage.mozaws.net 'aplt.scenarios:notification_forever,1000,1,0' --statsd_host=localhost --statsd_port=8125"
}
]
}
6 changes: 4 additions & 2 deletions tests/test_aws.py
@@ -107,11 +107,11 @@ def test_locate_metrics_service(self):
ecs = self._make_FUT()
ecs._ecs_client.describe_services.return_value = {
"services": [
{"stuff": 1}
{"stuff": 1, "status": "ACTIVE"}
]
}
result = ecs.locate_metrics_service()
eq_(result, {"stuff": 1})
eq_(result, {"stuff": 1, "status": "ACTIVE"})

def test_locate_metrics_service_not_found(self):
ecs = self._make_FUT()
@@ -141,6 +141,8 @@ def test_create_service(self):
ecs = self._make_FUT()

step = ecs._plan["steps"][0]
ecs._plan["influxdb_public_ip"] = "1.1.1.1"
step["docker_series"] = "default"

# Setup mocks
ecs._ecs_client.register_task_definition.return_value = {