Comparing changes

Choose two branches to see what's changed or to start a new pull request. If you need to, you can also compare across forks.

base: da34e0a3f1 ... compare: 0173778dd3
  • 6 commits
  • 7 files changed
  • 0 commit comments
  • 2 contributors
txstatsd/server/configurableprocessor.py (20 changed lines)
@@ -23,19 +23,16 @@ class ConfigurableMessageProcessor(MessageProcessor):
     duration statistics, plus throughput statistics.
     """

-    METRICS_SUMMARY = "statsd.numStats"
-
-    def __init__(self, time_function=time.time, message_prefix="", plugins=None):
+    def __init__(self, time_function=time.time, message_prefix="",
+                 internal_metrics_prefix="", plugins=None):
         super(ConfigurableMessageProcessor, self).__init__(
             time_function=time_function, plugins=plugins)

-        if message_prefix:
-            self.metrics_summary = (
-                message_prefix + "." +
-                ConfigurableMessageProcessor.METRICS_SUMMARY)
-        else:
-            self.metrics_summary = ConfigurableMessageProcessor.METRICS_SUMMARY
-
+        if not internal_metrics_prefix and not message_prefix:
+            internal_metrics_prefix = "statsd."
+        elif message_prefix and not internal_metrics_prefix:
+            internal_metrics_prefix = message_prefix + "." + "statsd."
+        self.internal_metrics_prefix = internal_metrics_prefix
         self.message_prefix = message_prefix
         self.gauge_metrics = {}
@@ -112,9 +109,6 @@ def flush_timer_metrics(self, percent, timestamp):
         return (metrics, events)

-    def flush_metrics_summary(self, messages, num_stats, timestamp):
-        messages.append((self.metrics_summary, num_stats, timestamp))
-
     def update_metrics(self):
         super(ConfigurableMessageProcessor, self).update_metrics()
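The net effect of the new constructor is that internal metrics get their own prefix, falling back to the message prefix when none is given. A minimal sketch of that fallback behaviour (the helper name is mine, not the library's; the expected values follow the tests further down this page):

    def resolve_internal_prefix(message_prefix="", internal_metrics_prefix=""):
        # Same fallback logic as the constructor above.
        if not internal_metrics_prefix and not message_prefix:
            internal_metrics_prefix = "statsd."
        elif message_prefix and not internal_metrics_prefix:
            internal_metrics_prefix = message_prefix + "." + "statsd."
        return internal_metrics_prefix

    assert resolve_internal_prefix() == "statsd."
    assert resolve_internal_prefix("test.metric") == "test.metric.statsd."
    assert resolve_internal_prefix("test.metric", "statsd.foo.") == "statsd.foo."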
txstatsd/server/processor.py (56 changed lines)
@@ -66,11 +66,13 @@ def __init__(self, time_function=time.time, plugins=None):
         self.time_function = time_function

         self.stats_prefix = "stats."
-        self.metrics_summary = "statsd.numStats"
+        self.internal_metrics_prefix = "statsd."
         self.count_prefix = "stats_counts."
         self.timer_prefix = self.stats_prefix + "timers."
         self.gauge_prefix = self.stats_prefix + "gauge."

+        self.process_timings = {}
+        self.by_type = {}
         self.timer_metrics = {}
         self.counter_metrics = {}
         self.gauge_metrics = deque()
@@ -89,6 +91,7 @@ def process_message(self, message, metric_type, key, fields):
         Process a single entry, adding it to either C{counters}, C{timers},
         or C{gauge_metrics} depending on which kind of message it is.
         """
+        start = self.time_function()
         if metric_type == "c":
             self.process_counter_metric(key, fields, message)
         elif metric_type == "ms":
@@ -101,6 +104,10 @@ def process_message(self, message, metric_type, key, fields):
             self.process_plugin_metric(metric_type, key, fields, message)
         else:
             return self.fail(message)
+        self.process_timings.setdefault(metric_type, 0)
+        self.process_timings[metric_type] += self.time_function() - start
+        self.by_type.setdefault(metric_type, 0)
+        self.by_type[metric_type] += 1

     def get_message_prefix(self, kind):
         return "stats." + kind
@@ -187,37 +194,53 @@ def flush(self, interval=10000, percent=90):
         C{interval} and mean timings based on C{threshold}.
         """
         messages = []
+        per_metric = {}
         num_stats = 0
         interval = interval / 1000
         timestamp = int(self.time_function())

+        start = self.time_function()
         counter_metrics, events = self.flush_counter_metrics(interval,
                                                              timestamp)
+        duration = self.time_function() - start
         if events > 0:
             messages.extend(sorted(counter_metrics))
             num_stats += events
+        per_metric["counter"] = (events, duration)

+        start = self.time_function()
         timer_metrics, events = self.flush_timer_metrics(percent, timestamp)
+        duration = self.time_function() - start
         if events > 0:
             messages.extend(sorted(timer_metrics))
             num_stats += events
+        per_metric["timer"] = (events, duration)

+        start = self.time_function()
         gauge_metrics, events = self.flush_gauge_metrics(timestamp)
+        duration = self.time_function() - start
         if events > 0:
             messages.extend(sorted(gauge_metrics))
             num_stats += events
+        per_metric["gauge"] = (events, duration)

+        start = self.time_function()
         meter_metrics, events = self.flush_meter_metrics(timestamp)
+        duration = self.time_function() - start
         if events > 0:
             messages.extend(sorted(meter_metrics))
             num_stats += events
+        per_metric["meter"] = (events, duration)

+        start = self.time_function()
         plugin_metrics, events = self.flush_plugin_metrics(interval, timestamp)
+        duration = self.time_function() - start
         if events > 0:
             messages.extend(sorted(plugin_metrics))
             num_stats += events
+        per_metric["plugin"] = (events, duration)

-        self.flush_metrics_summary(messages, num_stats, timestamp)
+        self.flush_metrics_summary(messages, num_stats, per_metric, timestamp)

         return messages

     def flush_counter_metrics(self, interval, timestamp):
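Each metric kind is timed with the same start/duration bracket; the patch inlines it five times. A hypothetical helper (not in the patch) showing the shape of that bracket:

    import time

    def timed_flush(flush_callable, *args, **kwargs):
        # Time one flush step and return the metrics, the event count,
        # and the elapsed seconds, mirroring the inlined bracket above.
        start = time.time()
        metrics, events = flush_callable(*args, **kwargs)
        return metrics, events, time.time() - start

Note that the per_metric entry is recorded even when events is zero, so the summary always reports a count and duration for every metric kind.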
@@ -305,8 +328,33 @@ def flush_plugin_metrics(self, interval, timestamp):
         return (metrics, events)

-    def flush_metrics_summary(self, messages, num_stats, timestamp):
-        messages.append((self.metrics_summary, num_stats, timestamp))
+    def flush_metrics_summary(self, messages, num_stats,
+                              per_metric, timestamp):
+        messages.append((self.internal_metrics_prefix + "numStats",
+                         num_stats, timestamp))
+        for name, (value, duration) in per_metric.iteritems():
+            messages.extend([
+                (self.internal_metrics_prefix + "flush.%s.count" % name,
+                 value, timestamp),
+                (self.internal_metrics_prefix + "flush.%s.duration" % name,
+                 duration * 1000, timestamp)])
+            log.msg("Flushed %d %s metrics in %.6f" %
+                    (value, name, duration))
+        for metric_type, duration in self.process_timings.iteritems():
+            messages.extend([
+                (self.internal_metrics_prefix + "receive.%s.count" % metric_type,
+                 self.by_type[metric_type], timestamp),
+                (self.internal_metrics_prefix + "receive.%s.duration" % metric_type,
+                 duration * 1000, timestamp)])
+            log.msg("Processing %d %s metrics took %.6f" %
+                    (self.by_type[metric_type], metric_type, duration))
+        self.process_timings.clear()
+        self.by_type.clear()

     def update_metrics(self):
         for metric in self.meter_metrics.itervalues():
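Concretely, the summary turns the bookkeeping into ordinary datapoints under the internal prefix, with durations converted to milliseconds. With the default "statsd." prefix, one counter type flushed (10 events in 1 s) and 42 "c" messages processed in 1 s, the appended messages are exactly those asserted in test_flush_metrics_summary below:

    ('statsd.numStats', 1, 42)
    ('statsd.flush.counter.count', 10, 42)
    ('statsd.flush.counter.duration', 1000, 42)
    ('statsd.receive.c.count', 42, 42)
    ('statsd.receive.c.duration', 1000, 42)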
txstatsd/service.py (28 changed lines)
@@ -1,12 +1,13 @@
 import getopt
-import socket
 import sys
+import time
 import ConfigParser
+import platform

 from twisted.application.internet import UDPServer, TCPServer
 from twisted.application.service import MultiService
-from twisted.python import usage
+from twisted.python import usage, log
 from twisted.plugin import getPlugins

 from txstatsd.client import InternalClient
@@ -152,9 +153,9 @@ class StatsDOptions(OptionsGlue):
          "Routing rules", str],
         ["listen-tcp-port", "t", None,
          "The TCP port where we will listen.", int],
-        ["max-queue-size", "Q", 1000,
+        ["max-queue-size", "Q", 20000,
          "Maximum send queue size per destination.", int],
-        ["max-datapoints-per-message", "M", 500,
+        ["max-datapoints-per-message", "M", 1000,
          "Maximum datapoints per message to carbon-cache.", int],
         ]
@@ -187,9 +188,13 @@ def __init__(self, carbon_client, processor, flush_interval, clock=None):
     def flushProcessor(self):
         """Flush messages queued in the processor to Graphite."""
+        flushed = 0
+        start = time.time()
         for metric, value, timestamp in self.processor.flush(
                 interval=self.flush_interval):
             self.carbon_client.sendDatapoint(metric, (timestamp, value))
+            flushed += 1
+        log.msg("Flushed total %d metrics in %.6f" % (flushed, time.time() - start))

     def startService(self):
         self.flush_task.start(self.flush_interval / 1000, False)
@@ -226,6 +231,10 @@ def createService(options):
     if prefix is None:
         prefix = "statsd"

+    instance_name = options["instance-name"]
+    if not instance_name:
+        instance_name = platform.node()
+
     # initialize plugins
     plugin_metrics = []
     for plugin in getPlugins(IMetricFactory):
@@ -238,8 +247,10 @@ def createService(options):
         connection = InternalClient(input_router)
         metrics = Metrics(connection)
     else:
-        processor = ConfigurableMessageProcessor(message_prefix=prefix,
-                                                 plugins=plugin_metrics)
+        processor = ConfigurableMessageProcessor(
+            message_prefix=prefix,
+            internal_metrics_prefix=prefix + "." + instance_name + ".",
+            plugins=plugin_metrics)
         input_router = Router(processor, options['routing'], root_service)
         connection = InternalClient(input_router)
         metrics = ExtendedMetrics(connection)
@@ -251,10 +262,6 @@ def createService(options):
     if not options["carbon-cache-name"]:
         options["carbon-cache-name"].append(None)

-    instance_name = options["instance-name"]
-    if not instance_name:
-        instance_name = socket.gethostname().replace('.','_')
-
     reporting = ReportingService(instance_name)
     reporting.setServiceParent(root_service)
@@ -277,7 +284,6 @@ def createService(options):
                 report_name.upper(), ()):
             reporting.schedule(reporter, 60, metrics.gauge)

-
     # XXX Make this configurable.
     router = ConsistentHashingRouter()
     carbon_client = CarbonClientManager(router)
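The instance name now comes from platform.node() (the machine's network name) rather than socket.gethostname(), and is resolved earlier so it can feed the internal metrics prefix. A small sketch of the resulting prefix (the hostname is illustrative):

    import platform

    prefix = "statsd"
    instance_name = platform.node()  # e.g. "stats-host-01"
    internal_metrics_prefix = prefix + "." + instance_name + "."
    # -> "statsd.stats-host-01.", so the summary metric becomes
    #    "statsd.stats-host-01.numStats" for this instance.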
txstatsd/stats/exponentiallydecayingsample.py (48 changed lines)
@@ -1,4 +1,4 @@
-
+import bisect
 import math
 import random
 import time
@@ -19,10 +19,10 @@ class ExponentiallyDecayingSample(object):
        library/publications/CormodeShkapenyukSrivastavaXu09.pdf>}
     """

-    # 1 hour (in seconds)
-    RESCALE_THRESHOLD = 60 * 60
+    # 10 minutes (in seconds)
+    RESCALE_THRESHOLD = 60 * 10

-    def __init__(self, reservoir_size, alpha):
+    def __init__(self, reservoir_size, alpha, wall_time=None):
         """Creates a new C{ExponentiallyDecayingSample}.

         @param reservoir_size: The number of samples to keep in the sampling
@@ -30,7 +30,7 @@ def __init__(self, reservoir_size, alpha):
         @parama alpha: The exponential decay factor; the higher this is,
             the more biased the sample will be towards newer values.
         """
-        self._values = dict()
+        self._values = []
         self.alpha = alpha
         self.reservoir_size = reservoir_size
@@ -38,14 +38,17 @@ def __init__(self, reservoir_size, alpha):
         self.start_time = 0
         self.next_scale_time = 0

+        if wall_time is None:
+            wall_time = time.time
+        self._wall_time = wall_time
         self.clear()

     def clear(self):
-        self._values.clear()
+        self._values = []
         self.count = 0
         self.start_time = self.tick()
         self.next_scale_time = (
-            time.time() + ExponentiallyDecayingSample.RESCALE_THRESHOLD)
+            self._wall_time() + self.RESCALE_THRESHOLD)

     def size(self):
         return min(self.reservoir_size, self.count)
@@ -64,27 +67,24 @@ def update(self, value, timestamp=None):
         self.count += 1
         new_count = self.count
         if new_count <= self.reservoir_size:
-            self._values[priority] = value
+            bisect.insort(self._values, (priority, value))
         else:
-            keys = sorted(self._values.keys())
-            first = keys[0]
+            first = self._values[0][0]

             if first < priority:
-                if priority not in self._values:
-                    self._values[priority] = value
-                    del self._values[first]
+                bisect.insort(self._values, (priority, value))
+                self._values = self._values[1:]

-        now = time.time()
+        now = self._wall_time()
         next = self.next_scale_time
         if now >= next:
             self.rescale(now, next)

     def get_values(self):
-        keys = sorted(self._values.keys())
-        return [self._values[k] for k in keys]
+        return [v for (k, v) in self._values]

     def tick(self):
-        return time.time()
+        return self._wall_time()

     def weight(self, t):
         return math.exp(self.alpha * t)
@@ -113,12 +113,12 @@ def rescale(self, now, next):
         """
         self.next_scale_time = (
-            now + ExponentiallyDecayingSample.RESCALE_THRESHOLD)
+            now + self.RESCALE_THRESHOLD)
         old_start_time = self.start_time
         self.start_time = self.tick()
-        keys = sorted(self._values.keys())
-        for k in keys:
-            v = self._values[k]
-            del self._values[k]
-            self._values[k * math.exp(-self.alpha *
-                (self.start_time - old_start_time))] = v
+
+        new_values = []
+        for k, v in self._values:
+            nk = k * math.exp(-self.alpha * (self.start_time - old_start_time))
+            new_values.append((nk, v))
+        self._values = new_values
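The switch from a dict keyed by priority to a list kept sorted with bisect.insort removes the sorted(self._values.keys()) call from every update, flush, and rescale. A minimal self-contained sketch of the same reservoir idea (the class name and simplifications are mine, not the library's):

    import bisect
    import math
    import random
    import time

    class TinySample(object):
        # Reservoir of (priority, value) pairs kept in ascending
        # priority order, as in the patched class above.
        def __init__(self, size, alpha):
            self.size, self.alpha = size, alpha
            self.start_time = time.time()
            self._values = []

        def update(self, value):
            # Newer samples get exponentially larger priorities.
            weight = math.exp(self.alpha * (time.time() - self.start_time))
            priority = weight / random.random()
            if len(self._values) < self.size:
                bisect.insort(self._values, (priority, value))
            elif self._values[0][0] < priority:
                bisect.insort(self._values, (priority, value))
                del self._values[0]  # drop the lowest-priority sample

        def get_values(self):
            return [v for (_, v) in self._values]

insort is still O(n) per insert, but the list stays ordered, so eviction reads index 0 directly and get_values needs no sort at all.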
txstatsd/tests/stats/test_exponentiallydecayingsample.py (26 changed lines)
@@ -1,3 +1,4 @@
+import random

 from unittest import TestCase

@@ -26,7 +27,7 @@ def test_100_out_of_10_elements(self):
         for i in population:
             sample.update(i)

-        self.assertEqual(sample.size(), 10, 'Should have 10 elements')
+        self.assertEqual(sample.size(), 10)
         self.assertEqual(len(sample.get_values()), 10,
                          'Should have 10 elements')
         self.assertEqual(
@@ -42,6 +43,27 @@ def test_heavily_biased_100_out_of_1000_elements(self):
         self.assertEqual(sample.size(), 100, 'Should have 100 elements')
         self.assertEqual(len(sample.get_values()), 100,
                          'Should have 100 elements')
+
         self.assertEqual(
             len(set(sample.get_values()).difference(set(population))), 0,
-                'Should only have elements from the population')
+            'Should only have elements from the population')
+
+    def test_ewma_sample_load(self):
+
+        _time = [10000]
+
+        def wtime():
+            return _time[0]
+
+        sample = ExponentiallyDecayingSample(100, 0.99, wall_time=wtime)
+        sample.RESCALE_THRESHOLD = 100
+        sample.clear()
+        for i in xrange(10000000):
+            sample.update(random.normalvariate(0, 10))
+            _time[0] += 1
+
+        self.assertEqual(sample.size(), 100)
+        self.assertEqual(len(sample.get_values()), 100,
+                         'Should have 100 elements')
+    test_ewma_sample_load.skip = "takes too long to run"
txstatsd/tests/test_configurableprocessor.py (24 changed lines)
@@ -19,7 +19,6 @@ def test_flush_counter_with_empty_prefix(self):
             time_function=lambda: 42)
         configurable_processor.process("gorets:17|c")
         messages = configurable_processor.flush()
-        self.assertEqual(2, len(messages))
         self.assertEqual(("gorets.count", 17, 42), messages[0])
         self.assertEqual(("statsd.numStats", 1, 42), messages[1])

@@ -31,10 +30,22 @@ def test_flush_counter_with_prefix(self):
             time_function=lambda: 42, message_prefix="test.metric")
         configurable_processor.process("gorets:17|c")
         messages = configurable_processor.flush()
-        self.assertEqual(2, len(messages))
         self.assertEqual(("test.metric.gorets.count", 17, 42), messages[0])
         self.assertEqual(("test.metric.statsd.numStats", 1, 42),
                          messages[1])
+
+    def test_flush_counter_with_internal_prefix(self):
+        """
+        Ensure the internal metrics prefix is used when one is supplied.
+        """
+        configurable_processor = ConfigurableMessageProcessor(
+            time_function=lambda: 42, message_prefix="test.metric",
+            internal_metrics_prefix="statsd.foo.")
+        configurable_processor.process("gorets:17|c")
+        messages = configurable_processor.flush()
+        self.assertEqual(("test.metric.gorets.count", 17, 42), messages[0])
+        self.assertEqual(("statsd.foo.numStats", 1, 42),
+                         messages[1])

     def test_flush_plugin(self):
         """
@@ -45,10 +56,8 @@ def test_flush_plugin(self):
             plugins=[distinct_metric_factory])
         configurable_processor.process("gorets:17|pd")
         messages = configurable_processor.flush()
-        self.assertEqual(5, len(messages))
         self.assertEquals(("test.metric.gorets.count", 1, 42), messages[0])
-
     def test_flush_single_timer_single_time(self):
         """
         If a single timer with a single data point is present, all
@@ -60,7 +69,6 @@ def test_flush_single_timer_single_time(self):
         configurable_processor.process("glork:24|ms")
         messages = configurable_processor.flush()
-        self.assertEqual(11, len(messages))
         self.assertEqual(("glork.75percentile", 24.0, 42), messages[0])
         self.assertEqual(("glork.95percentile", 24.0, 42), messages[1])
         self.assertEqual(("glork.98percentile", 24.0, 42), messages[2])
@@ -71,7 +79,6 @@ def test_flush_single_timer_single_time(self):
         self.assertEqual(("glork.median", 24.0, 42), messages[7])
         self.assertEqual(("glork.min", 24.0, 42), messages[8])
         self.assertEqual(("glork.stddev", 0.0, 42), messages[9])
-        self.assertEqual(("statsd.numStats", 1, 42), messages[10])

     def test_flush_single_timer_multiple_times(self):
         """
@@ -94,7 +101,6 @@ def test_flush_single_timer_multiple_times(self):
         configurable_processor.update_metrics()
         messages = configurable_processor.flush()
-        self.assertEqual(11, len(messages))
         self.assertEqual(("glork.75percentile", 27.75, 42), messages[0])
         self.assertEqual(("glork.95percentile", 42.0, 42), messages[1])
         self.assertEqual(("glork.98percentile", 42.0, 42), messages[2])
@@ -105,7 +111,6 @@ def test_flush_single_timer_multiple_times(self):
         self.assertEqual(("glork.median", 15.5, 42), messages[7])
         self.assertEqual(("glork.min", 4.0, 42), messages[8])
         self.assertEqual(("glork.stddev", 13.490738, 42), messages[9])
-        self.assertEqual(("statsd.numStats", 1, 42), messages[10])


 class FlushMeterMetricMessagesTest(TestCase):

@@ -131,7 +136,6 @@ def test_flush_meter_metric_with_prefix(self):
         self.time_now += 1
         messages = self.configurable_processor.flush()
-        self.assertEqual(6, len(messages))
         self.assertEqual(("test.metric.gorets.15min_rate", 0.0, self.time_now),
                          messages[0])
         self.assertEqual(("test.metric.gorets.1min_rate", 0.0, self.time_now),
@@ -142,5 +146,3 @@ def test_flush_meter_metric_with_prefix(self):
                          messages[3])
         self.assertEqual(("test.metric.gorets.mean_rate", 3.0, self.time_now),
                          messages[4])
-        self.assertEqual(("test.metric.statsd.numStats", 1, self.time_now),
-                         messages[5])
txstatsd/tests/test_processor.py (87 changed lines)
@@ -1,12 +1,26 @@
 import time

-from unittest import TestCase
 from twisted.plugin import getPlugins
+from twisted.trial.unittest import TestCase

 from txstatsd.server.processor import MessageProcessor
 from txstatsd.itxstatsd import IMetricFactory


+class Timer(object):
+
+    def __init__(self, times=None):
+        if times is None:
+            times = []
+        self.times = times
+
+    def set(self, times):
+        self.times = times
+
+    def __call__(self):
+        return self.times.pop(0)
+
+
 class TestMessageProcessor(MessageProcessor):

     def __init__(self):
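Timer stands in for time.time in the tests: each call pops the next pre-loaded instant, so measured durations are deterministic. For example:

    timer = Timer()
    timer.set([0, 5])            # the two instants process_message will observe
    start = timer()              # -> 0
    duration = timer() - start   # -> 5, the value the tests assert on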
@@ -125,6 +139,66 @@ def test_receive_too_many_fields(self):
         self.assertEqual(["gorets:1|c|@0.1|yay"], self.processor.failures)


+class ProcessorStatsTest(TestCase):
+
+    def setUp(self):
+        self.timer = Timer()
+        self.processor = MessageProcessor(time_function=self.timer)
+
+    def test_process_keeps_processing_time(self):
+        """
+        When a message is processed, we keep the time it took to process it
+        for later reporting.
+        """
+        self.timer.set([0, 5])
+        self.processor.process("gorets:1|c")
+        self.assertEqual(5, self.processor.process_timings["c"])
+        self.assertEquals(1, self.processor.by_type["c"])
+
+    def test_flush_tracks_flushing_time(self):
+        """
+        When flushing metrics, we track the time each metric type took to be
+        flushed.
+        """
+        self.timer.set([0,
+                        0, 1,    # counter
+                        1, 3,    # timer
+                        3, 6,    # gauge
+                        6, 10,   # meter
+                        10, 15,  # plugin
+                        ])
+
+        def flush_metrics_summary(messages, num_stats, per_metric, timestamp):
+            self.assertEqual((0, 1), per_metric["counter"])
+            self.assertEqual((0, 2), per_metric["timer"])
+            self.assertEqual((0, 3), per_metric["gauge"])
+            self.assertEqual((0, 4), per_metric["meter"])
+            self.assertEqual((0, 5), per_metric["plugin"])
+
+        self.addCleanup(setattr, self.processor, "flush_metrics_summary",
+                        self.processor.flush_metrics_summary)
+        self.processor.flush_metrics_summary = flush_metrics_summary
+        self.processor.flush()
+
+    def test_flush_metrics_summary(self):
+        """
+        When flushing the metrics summary, we report duration and count of
+        flushing each different type of metric as well as processing time.
+        """
+        per_metric = {"counter": (10, 1)}
+        self.processor.process_timings = {"c": 1}
+        self.processor.by_type = {"c": 42}
+        messages = []
+        self.processor.flush_metrics_summary(messages, 1, per_metric, 42)
+        self.assertEqual(5, len(messages))
+        self.assertEqual([('statsd.numStats', 1, 42),
+                          ('statsd.flush.counter.count', 10, 42),
+                          ('statsd.flush.counter.duration', 1000, 42),
+                          ('statsd.receive.c.count', 42, 42),
+                          ('statsd.receive.c.duration', 1000, 42)],
+                         messages)
+        self.assertEquals({}, self.processor.process_timings)
+        self.assertEquals({}, self.processor.by_type)
+
+
 class FlushMessagesTest(TestCase):

     def setUp(self):
@@ -136,7 +210,7 @@ def test_flush_no_stats(self):
         Flushing the message processor when there are no stats available should
         still produce one message where C{statsd.numStats} is set to zero.
         """
-        self.assertEqual([("statsd.numStats", 0, 42)], self.processor.flush())
+        self.assertEqual(("statsd.numStats", 0, 42), self.processor.flush()[0])

     def test_flush_counter(self):
         """
@@ -145,7 +219,6 @@ def test_flush_counter(self):
         """
         self.processor.counter_metrics["gorets"] = 42
         messages = self.processor.flush()
-        self.assertEqual(3, len(messages))
         self.assertEqual(("stats.gorets", 4, 42), messages[0])
         self.assertEqual(("stats_counts.gorets", 42, 42), messages[1])
         self.assertEqual(("statsd.numStats", 1, 42), messages[2])
@@ -158,7 +231,6 @@ def test_flush_counter_one_second_interval(self):
         """
         self.processor.counter_metrics["gorets"] = 42
         messages = self.processor.flush(interval=1000)
-        self.assertEqual(3, len(messages))
         self.assertEqual(("stats.gorets", 42, 42), messages[0])
         self.assertEqual(("stats_counts.gorets", 42, 42), messages[1])
         self.assertEqual(("statsd.numStats", 1, 42), messages[2])
@@ -172,7 +244,6 @@ def test_flush_single_timer_single_time(self):
         """
         self.processor.timer_metrics["glork"] = [24]
         messages = self.processor.flush()
-        self.assertEqual(6, len(messages))
         self.assertEqual(("stats.timers.glork.count", 1, 42), messages[0])
         self.assertEqual(("stats.timers.glork.lower", 24, 42), messages[1])
         self.assertEqual(("stats.timers.glork.mean", 24, 42), messages[2])
@@ -192,7 +263,6 @@ def test_flush_single_timer_multiple_times(self):
         """
         self.processor.timer_metrics["glork"] = [4, 8, 15, 16, 23, 42]
         messages = self.processor.flush()
-        self.assertEqual(6, len(messages))
         self.assertEqual(("stats.timers.glork.count", 6, 42), messages[0])
         self.assertEqual(("stats.timers.glork.lower", 4, 42), messages[1])
         self.assertEqual(("stats.timers.glork.mean", 13, 42), messages[2])
@@ -215,7 +285,6 @@ def test_flush_single_timer_50th_percentile(self):
         """
         self.processor.timer_metrics["glork"] = [4, 8, 15, 16, 23, 42]
         messages = self.processor.flush(percent=50)
-        self.assertEqual(6, len(messages))
         self.assertEqual(("stats.timers.glork.count", 6, 42), messages[0])
         self.assertEqual(("stats.timers.glork.lower", 4, 42), messages[1])
         self.assertEqual(("stats.timers.glork.mean", 9, 42), messages[2])
@@ -233,7 +302,6 @@ def test_flush_gauge_metric(self):
         self.processor.process("gorets:9.6|g")

         messages = self.processor.flush()
-        self.assertEqual(2, len(messages))
         self.assertEqual(
             ("stats.gauge.gorets.value", 9.6, 42), messages[0])
         self.assertEqual(
@@ -249,7 +317,6 @@ def test_flush_distinct_metric(self):
         self.processor.process("gorets:item|pd")

         messages = self.processor.flush()
-        self.assertEqual(5, len(messages))
         self.assertEqual(("stats.pdistinct.gorets.count", 1, 42), messages[0])
         self.assertEqual(("stats.pdistinct.gorets.count_1day",
                           5552568545, 42), messages[1])
@@ -294,7 +361,6 @@ def test_flush_meter_metric(self):
         self.time_now += 1

         messages = self.processor.flush()
-        self.assertEqual(6, len(messages))
         self.assertEqual(
             ("stats.meter.gorets.15min_rate", 0.0, self.time_now),
             messages[0])
@@ -321,7 +387,6 @@ def test_flush_meter_metric(self):
         self.mark_minutes(1)
         self.time_now += 60
         messages = self.processor.flush()
-        self.assertEqual(6, len(messages))
         self.assertEqual(
             ("stats.meter.gorets.15min_rate", 0.561304, self.time_now),
             messages[0])
