Skip to content

Commit

Permalink
Merge pull request #2 from sprockets/add-influxdb
Browse files Browse the repository at this point in the history
Add Influx DB support
  • Loading branch information
gmr committed Jan 26, 2016
2 parents 89cce19 + 005ad9d commit e04905b
Show file tree
Hide file tree
Showing 9 changed files with 441 additions and 9 deletions.
11 changes: 11 additions & 0 deletions docs/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,14 @@ Statsd Implementation
.. autoclass:: sprockets.mixins.metrics.StatsdMixin
:members:

InfluxDB Implementation
-----------------------
.. autoclass:: sprockets.mixins.metrics.InfluxDBMixin
:members:

.. autoclass:: sprockets.mixins.metrics.influxdb.InfluxDBConnection
:members:

Testing Helpers
---------------
*So who actually tests that their metrics are emitted as they expect?*
Expand All @@ -56,3 +64,6 @@ contains some helper that make testing a little easier.

.. autoclass:: sprockets.mixins.metrics.testing.FakeStatsdServer
:members:

.. autoclass:: sprockets.mixins.metrics.testing.FakeInfluxHandler
:members:
19 changes: 19 additions & 0 deletions docs/examples.rst
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,22 @@ as a base class.

.. literalinclude:: ../examples/statsd.py
:pyobject: SimpleHandler

Sending measurements to InfluxDB
--------------------------------
This simple application sends per-request measurements to an InfluxDB
server listening on ``localhost``. The mix-in is configured by passing
a ``sprockets.mixins.metrics.influxdb`` key into the application settings
as shown below.

.. literalinclude:: ../examples/influxdb.py
:pyobject: make_application

The InfluxDB database and measurement name are also configured in the
application settings object. The request handler is responsible for
providing the tag and value portions of the measurement. The standard
:class:`Metric Mixin API<sprockets.mixins.metrics.Mixin>` is used to set
tagged values.

.. literalinclude:: ../examples/influxdb.py
:pyobject: SimpleHandler
3 changes: 3 additions & 0 deletions docs/history.rst
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,8 @@ Release History
---------------
- Add :class:`sprockets.mixins.metrics.StatsdMixin`
- Add :class:`sprockets.mixins.metrics.testing.FakeStatsdServer`
- Add :class:`sprockets.mixins.metrics.testing.FakeInfluxHandler`
- Add :class:`sprockets.mixins.metrics.InfluxDBMixin`
- Add :class:`sprockets.mixins.metrics.influxdb.InfluxDBConnection`

.. _Next Release: https://github.com/sprockets/sprockets.mixins.metrics/compare/0.0.0...master
76 changes: 76 additions & 0 deletions examples/influxdb.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
import os
import signal

from sprockets.mixins import metrics
from tornado import gen, ioloop, web


class SimpleHandler(metrics.InfluxDBMixin, web.RequestHandler):
"""
Simply emits a few metrics around the GET method.
The ``InfluxDBMixin`` sends all of the metrics gathered during
the processing of a request as a single measurement when the
request is finished. Each request of this sample will result
in a single measurement using the service name as the key.
The following tag keys are defined by default:
handler="examples.influxdb.SimpleHandler"
host="$HOSTNAME"
method="GET"
and the following values are written:
duration=0.2573668956756592
sleepytime=0.255108118057251
slept=42
status_code=204
The duration and status_code values are handled by the mix-in
and the slept and sleepytime values are added in the method.
"""

@gen.coroutine
def get(self):
with self.execution_timer('sleepytime'):
yield gen.sleep(0.25)
self.increase_counter('slept', amount=42)
self.set_status(204)
self.finish()


def _sig_handler(*args_):
iol = ioloop.IOLoop.instance()
iol.add_callback_from_signal(iol.stop)


def make_application():
"""
Create a application configured to send metrics.
Measurements will be sent to the ``testing`` database on the
configured InfluxDB instance. The measurement name is set
by the ``service`` setting.
"""
influx_url = 'http://{}:{}/write'.format(
os.environ.get('INFLUX_HOST', '127.0.0.1'),
os.environ.get('INFLUX_PORT', 8086))
settings = {
metrics.InfluxDBMixin.SETTINGS_KEY: {
'measurement': 'cli',
'database': 'testing',
'write_url': influx_url,
}
}
return web.Application([web.url('/', SimpleHandler)], **settings)


if __name__ == '__main__':
app = make_application()
app.listen(8000)
signal.signal(signal.SIGINT, _sig_handler)
signal.signal(signal.SIGTERM, _sig_handler)
ioloop.IOLoop.instance().start()
1 change: 1 addition & 0 deletions requires/testing.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
mock>=1.0.1,<2
nose>=1.3,<2
tornado>=4.2,<4.3
3 changes: 2 additions & 1 deletion sprockets/mixins/metrics/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from .influxdb import InfluxDBMixin
from .statsd import StatsdMixin

version_info = (0, 0, 0)
__version__ = '.'.join(str(v) for v in version_info)
__all__ = ['StatsdMixin']
__all__ = ['InfluxDBMixin', 'StatsdMixin']
142 changes: 142 additions & 0 deletions sprockets/mixins/metrics/influxdb.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
import contextlib
import socket
import time

from tornado import httpclient, ioloop


class InfluxDBConnection(object):
"""
Connection to an InfluxDB instance.
:param str write_url: the URL to send HTTP requests to
:param str database: the database to write measurements into
:param tornado.ioloop.IOLoop: the IOLoop to spawn callbacks on.
If this parameter is :data:`None`, then the active IOLoop,
as determined by :meth:`tornado.ioloop.IOLoop.instance`,
is used.
An instance of this class is stored in the application settings
and used to asynchronously send measurements to InfluxDB instance.
Each measurement is sent by spawning a context-free callback on
the IOloop.
"""

def __init__(self, write_url, database, io_loop=None):
self.io_loop = ioloop.IOLoop.instance() if io_loop is None else io_loop
self.client = httpclient.AsyncHTTPClient()
self.write_url = '{}?db={}'.format(write_url, database)

def submit(self, measurement, tags, values):
body = '{},{} {} {:d}'.format(measurement, ','.join(tags),
','.join(values),
int(time.time() * 1000000000))
request = httpclient.HTTPRequest(self.write_url, method='POST',
body=body.encode('utf-8'))
ioloop.IOLoop.current().spawn_callback(self.client.fetch, request)


class InfluxDBMixin(object):
"""
Mix this class in to record measurements to a InfluxDB server.
**Configuration**
:database:
InfluxDB database to write measurements to. This is passed
as the ``db`` query parameter when writing to Influx.
https://docs.influxdata.com/influxdb/v0.9/guides/writing_data/
:write_url:
The URL that the InfluxDB write endpoint is available on.
This is used as-is to write data into Influx.
"""

SETTINGS_KEY = 'sprockets.mixins.metrics.influxdb'
"""``self.settings`` key that configures this mix-in."""

def initialize(self):
self.__tags = {
'host': socket.gethostname(),
'handler': '{}.{}'.format(self.__module__,
self.__class__.__name__),
'method': self.request.method,
}

super(InfluxDBMixin, self).initialize()
settings = self.settings.setdefault(self.SETTINGS_KEY, {})
if 'db_connection' not in settings:
settings['db_connection'] = InfluxDBConnection(
settings['write_url'], settings['database'])
self.__metrics = []

def set_metric_tag(self, tag, value):
"""
Add a tag to the measurement key.
:param str tag: name of the tag to set
:param str value: value to assign
This will overwrite the current value assigned to a tag
if one exists.
"""
self.__tags[tag] = value

def record_timing(self, duration, *path):
"""
Record a timing.
:param float duration: timing to record in seconds
:param path: elements of the metric path to record
A timing is a named duration value.
"""
self.__metrics.append('{}={}'.format('.'.join(path), duration))

def increase_counter(self, *path, **kwargs):
"""
Increase a counter.
:param path: elements of the path to record
:keyword int amount: value to record. If omitted, the counter
value is one.
Counters are simply values that are summed in a query.
"""
self.__metrics.append('{}={}'.format('.'.join(path),
kwargs.get('amount', 1)))

@contextlib.contextmanager
def execution_timer(self, *path):
"""
Record the time it takes to run an arbitrary code block.
:param path: elements of the metric path to record
This method returns a context manager that records the amount
of time spent inside of the context and records a value
named `path` using (:meth:`record_timing`).
"""
start = time.time()
try:
yield
finally:
fini = max(time.time(), start)
self.record_timing(fini - start, *path)

def on_finish(self):
super(InfluxDBMixin, self).on_finish()
self.__metrics.append('status_code={}'.format(self._status_code))
self.record_timing(self.request.request_time(), 'duration')
self.settings[self.SETTINGS_KEY]['db_connection'].submit(
self.settings[self.SETTINGS_KEY]['measurement'],
('{}={}'.format(k, v) for k, v in self.__tags.items()),
self.__metrics,
)
89 changes: 89 additions & 0 deletions sprockets/mixins/metrics/testing.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
import collections
import logging
import re
import socket

from tornado import gen, web


LOGGER = logging.getLogger(__name__)
STATS_PATTERN = re.compile(r'(?P<path>[^:]*):(?P<value>[^|]*)\|(?P<type>.*)$')


Expand Down Expand Up @@ -81,3 +86,87 @@ def find_metrics(self, prefix, metric_type):
raise AssertionError(
'Expected metric starting with "{}" in {!r}'.format(
prefix, self.datagrams))


class FakeInfluxHandler(web.RequestHandler):
"""
Request handler that mimics the InfluxDB write endpoint.
Install this handler into your testing application and configure
the metrics plugin to write to it. After running a test, you can
examine the received measurements by iterating over the
``influx_db`` list in the application object.
.. code-block:: python
class TestThatMyStuffWorks(testing.AsyncHTTPTestCase):
def get_app(self):
self.app = web.Application([
web.url('/', HandlerUnderTest),
web.url('/write', metrics.testing.FakeInfluxHandler),
])
return self.app
def setUp(self):
super(TestThatMyStuffWorks, self).setUp()
self.app.settings[metrics.InfluxDBMixin.SETTINGS_KEY] = {
'measurement': 'stuff',
'write_url': self.get_url('/write'),
'database': 'requests',
}
def test_that_measurements_are_emitted(self):
self.fetch('/') # invokes handler under test
measurements = metrics.testing.FakeInfluxHandler(
self.app, 'requests', self)
for key, fields, timestamp in measurements:
# inspect measurements
"""

def initialize(self):
super(FakeInfluxHandler, self).initialize()
self.logger = LOGGER.getChild(__name__)
if not hasattr(self.application, 'influx_db'):
self.application.influx_db = collections.defaultdict(list)

def post(self):
db = self.get_query_argument('db')
payload = self.request.body.decode('utf-8')
for line in payload.splitlines():
self.logger.debug('received "%s"', line)
key, fields, timestamp = line.split()
self.application.influx_db[db].append((key, fields, timestamp))
self.set_status(204)

@staticmethod
def get_messages(application, database, test_case):
"""
Wait for measurements to show up and return them.
:param tornado.web.Application application: application that
:class:`.FakeInfluxHandler` is writing to
:param str database: database to retrieve
:param tornado.testing.AsyncTestCase test_case: test case
that is being executed
:return: measurements received as a :class:`list` of
(key, fields, timestamp) tuples
Since measurements are sent asynchronously from within the
``on_finish`` handler they are usually not sent by the time
that the test case has stopped the IOloop. This method accounts
for this by running the IOloop until measurements have been
received. It will raise an assertion error if measurements
are not received in a reasonable number of runs.
"""
for _ in range(0, 10):
if hasattr(application, 'influx_db'):
if application.influx_db[database]:
return application.influx_db[database]
test_case.io_loop.add_future(gen.sleep(0.1),
lambda _: test_case.stop())
test_case.wait()
else:
test_case.fail('Message not published to InfluxDB before timeout')

0 comments on commit e04905b

Please sign in to comment.