Skip to content

Commit

Permalink
Merge pull request #18 from nicholaskuechler/replace_time-20160617-1326
Browse files Browse the repository at this point in the history
Adds option to replace time to deal with delayed metrics.
  • Loading branch information
VinnyQ committed Jun 28, 2016
2 parents cedbeae + dd4f183 commit b1f309f
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 16 deletions.
3 changes: 2 additions & 1 deletion README.md
Expand Up @@ -38,6 +38,7 @@ It does very primitive _'caching'_: aggregates all metrics and flushes them in r
| -k | Keystone key | |
| --auth-url | Keystone token URL | |
| --config | Set options from a config file | |
| --overwrite_collection_timestamp | Replace metric collection timestamp with ingest timestamp | False |

In case you need no authentication leave `-u`/`--user` command line argument empty (default value).
It is recommended not to set the `key` option from the command line, as that can compromise api keys. Instead, set the key in a config file and set the `--config` option to the name of the file.'
Expand All @@ -59,7 +60,7 @@ Configuration is done with command line arguments passed to twistd daemon when r
```

#Logging
# Logging

Logging can be controlled using LogObserver provided along or you can use your own LogObserver

Expand Down
33 changes: 22 additions & 11 deletions bluefloodserver/blueflood.py
Expand Up @@ -4,6 +4,7 @@
import urlparse
import json
import logging
import time

from StringIO import StringIO

Expand All @@ -14,21 +15,26 @@
from twisted.web.http_headers import Headers
from twisted.python import log


def _get_metrics_url(url, tenantId):
return url + '/v2.0/'\
+ tenantId + '/ingest'


def _get_metrics_query_url(url, tenantId,
metricName, start, end, points):
return url + '/v2.0/' + tenantId\
+ '/views/' + metricName\
+ '?from=' + str(start) + '&to=' + str(end) + '&points=' + str(points)


def _get_metrics_query_url_resolution(url, tenantId,
metricName, start, end, resolution='FULL'):
metricName, start, end,
resolution='FULL'):
return url + '/v2.0/' + tenantId\
+ '/views/' + metricName\
+ '?from=' + str(start) + '&to=' + str(end) + '&resolution=' + resolution
+ '?from=' + str(start) + '&to=' + str(end)\
+ '&resolution=' + resolution


class LimitExceededException(Exception):
Expand All @@ -37,7 +43,9 @@ class LimitExceededException(Exception):

class BluefloodEndpoint():

def __init__(self, ingest_url='http://localhost:19000', retrieve_url='http://localhost:20000', tenant='tenant-id', agent=None, limit=None):
def __init__(self, ingest_url='http://localhost:19000',
retrieve_url='http://localhost:20000', tenant='tenant-id',
agent=None, limit=None, overwrite_collection_timestamp=False):
self.agent = agent
self.ingest_url = ingest_url
self.retrieve_url = retrieve_url
Expand All @@ -46,22 +54,26 @@ def __init__(self, ingest_url='http://localhost:19000', retrieve_url='http://loc
self.headers = {}
self.limit = limit
self._buffer_size = 0
self.overwrite_collection_timestamp = overwrite_collection_timestamp

def ingest(self, metric_name, metric_time, value, ttl):
if self.overwrite_collection_timestamp:
metric_time = [time.time()]
elif not isinstance(metric_time, list):
metric_time = [metric_time]

def ingest(self, metric_name, time, value, ttl):
if not isinstance(time, list):
time = [time]
if not isinstance(value, list):
value = [value]
if len(time) != len(value):
raise Exception('time and value list lengths differ')

if len(metric_time) != len(value):
raise Exception('time and value list lengths differ')

data = [{
"collectionTime": t*1000,
"ttlInSeconds": ttl,
"metricValue": v,
"metricName": metric_name
} for t,v in zip(time, value)]
} for t, v in zip(metric_time, value)]
row_size = len(json.dumps(data))
# len('[]'') = 2, len(',' * total) = len(data) - 1
if self.limit and row_size + len(self._json_buffer) - 1 + self._buffer_size > self.limit:
Expand All @@ -81,12 +93,11 @@ def commit(self):

resp = yield d
log.msg('POST {}, resp_code={}'.format(url, resp.code), level=logging.DEBUG)
if resp.code in [200,201,202,204,207]:
if resp.code in [200, 201, 202, 204, 207]:
self._json_buffer = []
self._buffer_size = 0
returnValue(None)


@inlineCallbacks
def retrieve_points(self, metric_name, start, to, points):
d = self.agent.request(
Expand Down
1 change: 1 addition & 0 deletions example.conf
Expand Up @@ -10,3 +10,4 @@ user=
key=
auth_url=https://identity.api.rackspacecloud.com/v2.0/tokens
limit=0
overwrite_collection_timestamp=False
2 changes: 1 addition & 1 deletion setup.py
Expand Up @@ -35,7 +35,7 @@ def run_tests(self):

setup(
name='blueflood-carbon-forwarder',
version="0.4.0",
version="0.4.1",
url='https://github.com/rackerlabs/blueflood-carbon-forwarder',
license='Apache Software License',
author='Rackspace Metrics',
Expand Down
13 changes: 10 additions & 3 deletions twisted/plugins/graphite_blueflood_plugin.py
Expand Up @@ -32,6 +32,7 @@ class Options(usage.Options):
['key', 'k', '', 'Rackspace authentication password. It is recommended not to set this option from the command line, as that can compromise api keys. Instead, set the key in a config file and use the \'--config\' option below.'],
['auth_url', '', AUTH_URL, 'Auth URL'],
['limit', '', 0, 'Blueflood json payload limit, bytes. 0 means no limit'],
['overwrite_collection_timestamp', '', False, 'Replace metric time with current blueflood carbon forwarder node time'],
['config', 'c', None,
'Path to a configuration file. The file must be in INI format, with '
'[bracketed] sections. All sections other than '
Expand Down Expand Up @@ -74,6 +75,8 @@ def __init__(self, **kwargs):
self.auth_url = kwargs.get('auth_url')
self.limit = kwargs.get('limit', 0)
self.metric_prefix = kwargs.get('metric_prefix', None)
self.overwrite_collection_timestamp = \
kwargs.get('overwrite_collection_timestamp', False)
self.port = None

def startService(self):
Expand Down Expand Up @@ -110,10 +113,13 @@ def _setup_blueflood(self, factory, agent):
ingest_url=self.blueflood_url,
tenant=self.tenant,
agent=agent,
limit=int(self.limit))
flusher = BluefloodFlush(client=client, ttl=self.ttl, metric_prefix=self.metric_prefix)
limit=int(self.limit),
overwrite_collection_timestamp=self.overwrite_collection_timestamp)
flusher = BluefloodFlush(client=client, ttl=self.ttl,
metric_prefix=self.metric_prefix)
factory._metric_collection.flusher = flusher


@implementer(IServiceMaker, IPlugin)
class MetricServiceMaker(object):
tapname = 'blueflood-forward'
Expand Down Expand Up @@ -142,7 +148,8 @@ def makeService(self, options):
key=options['key'],
auth_url=options['auth_url'],
limit=options['limit'],
metric_prefix=options['metric_prefix']
metric_prefix=options['metric_prefix'],
overwrite_collection_timestamp=options['overwrite_collection_timestamp']
)


Expand Down

0 comments on commit b1f309f

Please sign in to comment.