Skip to content

Commit

Permalink
Merge pull request #4 from sprockets/sampling
Browse files Browse the repository at this point in the history
Add batch submission probability
  • Loading branch information
amberheilman committed Apr 5, 2017
2 parents cb705b8 + b94f14b commit 22cf737
Show file tree
Hide file tree
Showing 4 changed files with 126 additions and 32 deletions.
62 changes: 33 additions & 29 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -27,35 +27,39 @@ documentation.

The following table details the environment variable configuration options.

+------------------------------+--------------------------------------------------+---------------+
| Variable | Definition | Default |
+==============================+==================================================+===============+
| ``INFLUXDB_SCHEME`` | The URL request scheme for making HTTP requests | ``https`` |
+------------------------------+--------------------------------------------------+---------------+
| ``INFLUXDB_HOST`` | The InfluxDB server hostname | ``localhost`` |
+------------------------------+--------------------------------------------------+---------------+
| ``INFLUXDB_PORT`` | The InfluxDB server port | ``8086`` |
+------------------------------+--------------------------------------------------+---------------+
| ``INFLUXDB_USER`` | The InfluxDB server username | |
+------------------------------+--------------------------------------------------+---------------+
| ``INFLUXDB_PASSWORD`` | The InfluxDB server password | |
+------------------------------+--------------------------------------------------+---------------+
| ``INFLUXDB_ENABLED`` | Set to ``false`` to disable InfluxDB support | ``true`` |
+------------------------------+--------------------------------------------------+---------------+
| ``INFLUXDB_INTERVAL`` | How many milliseconds to wait before submitting | ``60000`` |
| | measurements when the buffer has fewer than | |
| | ``INFLUXDB_TRIGGER_SIZE`` measurements. | |
+------------------------------+--------------------------------------------------+---------------+
| ``INFLUXDB_MAX_BATCH_SIZE`` | Max # of measurements to submit in a batch | ``10000`` |
+------------------------------+--------------------------------------------------+---------------+
| ``INFLUXDB_MAX_BUFFER_SIZE`` | Limit of measurements in a buffer before new | ``25000`` |
| | measurements are discarded. | |
+------------------------------+--------------------------------------------------+---------------+
| ``INFLUXDB_TRIGGER_SIZE`` | The number of metrics in the buffer to trigger | ``60000`` |
| | the submission of a batch. | |
+------------------------------+--------------------------------------------------+---------------+
| ``INFLUXDB_TAG_HOSTNAME`` | Include the hostname as a tag in the measurement | ``true`` |
+------------------------------+--------------------------------------------------+---------------+
+---------------------------------+--------------------------------------------------+---------------+
| Variable | Definition | Default |
+=================================+==================================================+===============+
| ``INFLUXDB_SCHEME`` | The URL request scheme for making HTTP requests | ``https`` |
+---------------------------------+--------------------------------------------------+---------------+
| ``INFLUXDB_HOST`` | The InfluxDB server hostname | ``localhost`` |
+---------------------------------+--------------------------------------------------+---------------+
| ``INFLUXDB_PORT`` | The InfluxDB server port | ``8086`` |
+---------------------------------+--------------------------------------------------+---------------+
| ``INFLUXDB_USER`` | The InfluxDB server username | |
+---------------------------------+--------------------------------------------------+---------------+
| ``INFLUXDB_PASSWORD`` | The InfluxDB server password | |
+---------------------------------+--------------------------------------------------+---------------+
| ``INFLUXDB_ENABLED`` | Set to ``false`` to disable InfluxDB support | ``true`` |
+---------------------------------+--------------------------------------------------+---------------+
| ``INFLUXDB_INTERVAL`` | How many milliseconds to wait before submitting | ``60000`` |
| | measurements when the buffer has fewer than | |
| | ``INFLUXDB_TRIGGER_SIZE`` measurements. | |
+---------------------------------+--------------------------------------------------+---------------+
| ``INFLUXDB_MAX_BATCH_SIZE`` | Max # of measurements to submit in a batch | ``10000`` |
+---------------------------------+--------------------------------------------------+---------------+
| ``INFLUXDB_MAX_BUFFER_SIZE`` | Limit of measurements in a buffer before new | ``25000`` |
| | measurements are discarded. | |
+------------------------------+-----------------------------------------------------+---------------+
| ``INFLUXDB_SAMPLE_PROBABILITY`` | A value that is >= 0 and <= 1.0 that specifies | ``1.0`` |
| | the probability that a batch will be submitted | |
| | to InfluxDB or dropped. | |
+---------------------------------+--------------------------------------------------+---------------+
| ``INFLUXDB_TRIGGER_SIZE`` | The number of metrics in the buffer to trigger | ``60000`` |
| | the submission of a batch. | |
+---------------------------------+--------------------------------------------------+---------------+
| ``INFLUXDB_TAG_HOSTNAME`` | Include the hostname as a tag in the measurement | ``true`` |
+---------------------------------+--------------------------------------------------+---------------+

Mixin Configuration
^^^^^^^^^^^^^^^^^^^
Expand Down
55 changes: 52 additions & 3 deletions sprockets_influxdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import contextlib
import logging
import os
import random
import select
import socket
import ssl
Expand All @@ -29,7 +30,7 @@
logging.critical('Could not import Tornado')
concurrent, httpclient, ioloop = None, None, None

version_info = (2, 0, 0)
version_info = (2, 1, 0)
__version__ = '.'.join(str(v) for v in version_info)
__all__ = ['__version__', 'version_info', 'add_measurement', 'flush',
'install', 'shutdown', 'Measurement']
Expand Down Expand Up @@ -61,6 +62,7 @@ class TimeoutError(Exception):
_max_batch_size = 10000
_max_buffer_size = 25000
_max_clients = 10
_sample_probability = 1.0
_stopping = False
_timeout_interval = 60000
_timeout = None
Expand Down Expand Up @@ -196,7 +198,8 @@ def flush():

def install(url=None, auth_username=None, auth_password=None, io_loop=None,
submission_interval=None, max_batch_size=None, max_clients=10,
base_tags=None, max_buffer_size=None, trigger_size=None):
base_tags=None, max_buffer_size=None, trigger_size=None,
sample_probability=1.0):
"""Call this to install/setup the InfluxDB client collector. All arguments
are optional.
Expand Down Expand Up @@ -227,6 +230,8 @@ def install(url=None, auth_username=None, auth_password=None, io_loop=None,
Default: ``25000``
:param int trigger_size: The minimum number of measurements that
are in the buffer before a batch can be submitted. Default: ``5000``
:param float sample_probability: Value between 0 and 1.0 specifying the
probability that a batch will be submitted (0.25 == 25%)
:returns: :data:`True` if the client was installed by this call
and :data:`False` otherwise.
Expand All @@ -236,7 +241,7 @@ def install(url=None, auth_username=None, auth_password=None, io_loop=None,
"""
global _base_tags, _base_url, _credentials, _enabled, _installed, \
_io_loop, _max_batch_size, _max_buffer_size, _max_clients, \
_timeout, _timeout_interval, _trigger_size
_sample_probability, _timeout, _timeout_interval, _trigger_size

_enabled = os.environ.get('INFLUXDB_ENABLED', 'true') == 'true'
if not _enabled:
Expand Down Expand Up @@ -269,6 +274,9 @@ def install(url=None, auth_username=None, auth_password=None, io_loop=None,
_max_clients = max_clients
_max_buffer_size = max_buffer_size or \
int(os.environ.get('INFLUXDB_MAX_BUFFER_SIZE', _max_buffer_size))
_sample_probability = sample_probability or \
float(os.environ.get('INFLUXDB_SAMPLE_PROBABILITY',
_sample_probability))
_trigger_size = trigger_size or \
int(os.environ.get('INFLUXDB_TRIGGER_SIZE', _trigger_size))

Expand All @@ -279,6 +287,9 @@ def install(url=None, auth_username=None, auth_password=None, io_loop=None,
_base_tags.setdefault('environment', os.environ['ENVIRONMENT'])
_base_tags.update(base_tags or {})

# Seed the random number generator for batch sampling
random.seed()

# Don't let this run multiple times
_installed = True

Expand Down Expand Up @@ -375,6 +386,25 @@ def set_max_clients(limit):
_max_clients = limit


def set_sample_probability(probability):
"""Set the probability that a batch will be submitted to the InfluxDB
server. This should be a value that is greater than or equal to ``0`` and
less than or equal to ``1.0``. A value of ``0.25`` would represent a
probability of 25% that a batch would be written to InfluxDB.
:param float probability: The value between 0 and 1.0 that represents the
probability that a batch will be submitted to the InfluxDB server.
"""
global _sample_probability

if not 0.0 <= probability <= 1.0:
raise ValueError('Invalid probability value')

LOGGER.debug('Setting sample probability to %.2f', probability)
_sample_probability = float(probability)


def set_timeout(milliseconds):
"""Override the maximum duration to wait for submitting measurements to
InfluxDB.
Expand Down Expand Up @@ -581,6 +611,22 @@ def _pending_measurements():
return sum([len(_measurements[dbname]) for dbname in _measurements])


def _sample_batch():
"""Determine if a batch should be processed and if not, pop off all of
the pending metrics for that batch.
:rtype: bool
"""
if _sample_probability == 1.0 or random.random() < _sample_probability:
return True

# Pop off all the metrics for the batch
for database in _measurements:
_measurements[database] = _measurements[database][_max_batch_size:]
return False


def _start_timeout():
"""Stop a running timeout if it's there, then create a new one."""
global _timeout
Expand Down Expand Up @@ -620,6 +666,9 @@ def _write_measurements():
future.set_result(False)
elif not _pending_measurements():
future.set_result(True)
elif not _sample_batch():
LOGGER.debug('Skipping batch submission due to sampling')
future.set_result(True)

# Exit early if there's an error condition
if future.done():
Expand Down
29 changes: 29 additions & 0 deletions tests/client_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,3 +193,32 @@ def test_timer(self):
self.flush()
value = self.get_measurement()
self.assertAlmostEqual(float(value.fields['duration-test']), 0.1, 1)


class SampleProbabilityTestCase(base.AsyncServerTestCase):

@staticmethod
def setup_batch():
influxdb.set_max_batch_size(100)
database = str(uuid.uuid4())
name = str(uuid.uuid4())
for iteration in range(0, 1000):
measurement = influxdb.Measurement(database, name)
measurement.set_field('test', random.randint(1000, 2000))
influxdb.add_measurement(measurement)

def test_sample_batch_false(self):
influxdb.set_sample_probability(0.0)
self.setup_batch()
self.assertEqual(influxdb._pending_measurements(), 1000)
result = influxdb._sample_batch()
self.assertFalse(result)
self.assertEqual(influxdb._pending_measurements(), 900)

def test_sample_batch_true(self):
influxdb.set_sample_probability(1.0)
self.setup_batch()
self.assertEqual(influxdb._pending_measurements(), 1000)
result = influxdb._sample_batch()
self.assertTrue(result)
self.assertEqual(influxdb._pending_measurements(), 1000)
12 changes: 12 additions & 0 deletions tests/install_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,18 @@ def test_set_timeout(self):
influxdb.set_timeout(expectation)
self.assertEqual(influxdb._timeout_interval, expectation)

def test_set_sample_probability(self):
influxdb.install()
expectation = random.random()
influxdb.set_sample_probability(expectation)
self.assertEqual(influxdb._sample_probability, expectation)

def test_set_invalid_sample_probability(self):
influxdb.install()
with self.assertRaises(ValueError):
influxdb.set_sample_probability(2.0)
influxdb.set_sample_probability(-1.0)


class MeasurementTests(unittest.TestCase):

Expand Down

0 comments on commit 22cf737

Please sign in to comment.