Skip to content
This repository has been archived by the owner on Jun 26, 2020. It is now read-only.

Commit

Permalink
carbonara: optimize resampling
Browse files Browse the repository at this point in the history
This changes the resampling method we used to have by no longer doing any real
resampling the way Pandas used to. The `resample' method from Pandas inserts a
lot of empty points filled with NaN as value if your timeseries is sparse –
which is a typical case in Carbonara/Gnocchi. This ends up creating timeseries
with millions of empty points, consuming hundreds of MB of memory for nothing.

This method, inspired by Jeff on pandas-dev/pandas#11217,
implements a simpler version of what `resample` does: it groups the samples by
timestamp, and then computes an aggregation method on them. This avoids creating
thousands of useless points and ends up being much faster and consuming a *LOT*
less memory.

Benchmarked: for a new timeseries with 10k measures and 10–80k points per
archive, this reduces the memory usage of metricd from 2 GB to 100 MB, and the
compute time of the most complicated aggregations, like percentile, from 15min
to 20s (a 45× speed improvement).

Change-Id: I1b8718508bdd4633e7324949b76184efc3718ede
  • Loading branch information
jd committed Oct 6, 2015
1 parent 7c02791 commit 5a174ab
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 48 deletions.
50 changes: 33 additions & 17 deletions gnocchi/carbonara.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import re

import msgpack
import numpy
import pandas
import six

Expand Down Expand Up @@ -51,6 +50,14 @@ def __init__(self, reason):
super(UnAggregableTimeseries, self).__init__(reason)


class UnknownAggregationMethod(Exception):
    """Raised when a requested aggregation method is not recognized."""

    def __init__(self, agg):
        # Keep the offending method name around so callers can inspect it.
        self.aggregation_method = agg
        message = "Unknown aggregation method `%s'" % agg
        super(UnknownAggregationMethod, self).__init__(message)


class SerializableMixin(object):

@classmethod
Expand Down Expand Up @@ -227,13 +234,6 @@ class AggregatedTimeSerie(TimeSerie):

_AGG_METHOD_PCT_RE = re.compile(r"([1-9][0-9]?)pct")

@staticmethod
def _percentile(a, q):
# TODO(jd) Find a way to compute all the percentile in one pass as
# numpy can do numpy.percentile(a, q=[75, 90, 95])
if len(a) > 0:
return numpy.percentile(a, q)

def __init__(self, timestamps=None, values=None,
max_size=None,
sampling=None, aggregation_method='mean'):
Expand All @@ -245,18 +245,20 @@ def __init__(self, timestamps=None, values=None,
"""
super(AggregatedTimeSerie, self).__init__(timestamps, values)

self.aggregation_method = aggregation_method

m = self._AGG_METHOD_PCT_RE.match(aggregation_method)

if m:
self.aggregation_method_func = functools.partial(
self._percentile, q=float(m.group(1)))
self.q = float(m.group(1)) / 100
self.aggregation_method_func_name = 'quantile'
else:
self.aggregation_method_func = aggregation_method
if not hasattr(pandas.core.groupby.SeriesGroupBy,
aggregation_method):
raise UnknownAggregationMethod(aggregation_method)
self.aggregation_method_func_name = aggregation_method

self.sampling = pandas.tseries.frequencies.to_offset(sampling)
self.max_size = max_size
self.aggregation_method = aggregation_method

def __eq__(self, other):
return (isinstance(other, AggregatedTimeSerie)
Expand Down Expand Up @@ -295,12 +297,26 @@ def _truncate(self):
# Remove empty points if any that could be added by aggregation
self.ts = self.ts.dropna()[-self.max_size:]

@staticmethod
def _round_timestamp(ts, freq):
return pandas.Timestamp(
(ts.value // freq.delta.value) * freq.delta.value)

def _resample(self, after):
if self.sampling:
self.ts = self.ts[after:].resample(
self.sampling,
how=self.aggregation_method_func).dropna().combine_first(
self.ts[:after][:-1])
# Group by the sampling, and then apply the aggregation method on
# the points after `after'
groupedby = self.ts[after:].groupby(
functools.partial(self._round_timestamp,
freq=self.sampling))
agg_func = getattr(groupedby, self.aggregation_method_func_name)
if self.aggregation_method_func_name == 'quantile':
aggregated = agg_func(self.q)
else:
aggregated = agg_func()
# Now combine the result with the rest of the point – everything
# that is before `after'
self.ts = aggregated.combine_first(self.ts[:after][:-1])

def update(self, ts):
index = ts.ts.index
Expand Down
50 changes: 19 additions & 31 deletions gnocchi/tests/test_carbonara.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,37 +105,14 @@ def test_base():
datetime.datetime(2014, 1, 1, 12, 0, 9)],
[3, 5, 6])

def test_0_percentile(self):
ts = carbonara.AggregatedTimeSerie(sampling='1Min',
aggregation_method='0pct')
self.assertRaises(AttributeError,
ts.update,
carbonara.TimeSerie.from_tuples(
[(datetime.datetime(2014, 1, 1, 12, 0, 0), 3),
(datetime.datetime(2014, 1, 1, 12, 0, 4), 5),
(datetime.datetime(2014, 1, 1, 12, 0, 9), 6)]))

def test_100_percentile(self):
ts = carbonara.AggregatedTimeSerie(sampling='1Min',
aggregation_method='100pct')
self.assertRaises(AttributeError,
ts.update,
carbonara.TimeSerie.from_tuples(
[(datetime.datetime(2014, 1, 1, 12, 0, 0), 3),
(datetime.datetime(2014, 1, 1, 12, 0, 4), 5),
(datetime.datetime(2014, 1, 1, 12, 0, 9), 6)]))

def test_123_percentile(self):
ts = carbonara.AggregatedTimeSerie(sampling='1Min',
aggregation_method='123pct')
self.assertRaises(AttributeError,
ts.update,
carbonara.TimeSerie.from_tuples(
[(datetime.datetime(2014, 1, 1, 12, 0, 0), 3),
(datetime.datetime(2014, 1, 1, 12, 0, 4), 5),
(datetime.datetime(2014, 1, 1, 12, 0, 9), 6)]))

def test_74_percentile(self):
def test_bad_percentile(self):
for bad_percentile in ('0pct', '100pct', '-1pct', '123pct'):
self.assertRaises(carbonara.UnknownAggregationMethod,
carbonara.AggregatedTimeSerie,
sampling='1Min',
aggregation_method=bad_percentile)

def test_74_percentile_serialized(self):
ts = carbonara.AggregatedTimeSerie(sampling='1Min',
aggregation_method='74pct')
ts.update(carbonara.TimeSerie.from_tuples(
Expand All @@ -146,6 +123,17 @@ def test_74_percentile(self):
self.assertEqual(1, len(ts))
self.assertEqual(5.48, ts[datetime.datetime(2014, 1, 1, 12, 0, 0)])

# Serialize and unserialize
ts = carbonara.AggregatedTimeSerie.unserialize(ts.serialize())

ts.update(carbonara.TimeSerie.from_tuples(
[(datetime.datetime(2014, 1, 1, 12, 0, 0), 3),
(datetime.datetime(2014, 1, 1, 12, 0, 4), 5),
(datetime.datetime(2014, 1, 1, 12, 0, 9), 6)]))

self.assertEqual(1, len(ts))
self.assertEqual(5.48, ts[datetime.datetime(2014, 1, 1, 12, 0, 0)])

def test_95_percentile(self):
ts = carbonara.AggregatedTimeSerie(sampling='1Min',
aggregation_method='95pct')
Expand Down

0 comments on commit 5a174ab

Please sign in to comment.