forked from rax-maas/blueflood-carbon-forwarder
/
graphite_blueflood_plugin.py
164 lines (137 loc) · 7.06 KB
/
graphite_blueflood_plugin.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
import logging
import ConfigParser
from twisted.application.service import IServiceMaker, Service, MultiService
from twisted.internet.endpoints import serverFromString
from twisted.internet.protocol import Factory
from twisted.internet.task import LoopingCall
from twisted.application.internet import TCPServer
from twisted.plugin import IPlugin
from twisted.python import usage, log
from twisted.web.client import Agent
from zope.interface import implementer
from bluefloodserver.protocols import MetricLineReceiver, MetricPickleReceiver
from bluefloodserver.collect import MetricCollection, ConsumeFlush, BluefloodFlush
from bluefloodserver.blueflood import BluefloodEndpoint
from txKeystone import KeystoneAgent
class Options(usage.Options):
AUTH_URL = 'https://identity.api.rackspacecloud.com/v2.0/tokens'
DEFAULT_TTL = 60 * 60 * 24
optParameters = [
['endpoint', 'e', 'tcp:2004', 'Twisted formatted endpoint to listen to pickle protocol metrics'],
['interval', 'i', 30, 'Amount of time to wait before sending the next batch of metrics to blueflood, in seconds.'],
['blueflood', 'b', 'http://localhost:19000', 'Blueflood server ingest URL (schema, host, port)'],
['tenant', 't', '', 'Blueflood tenant ID'],
['metric_prefix', 'p', '', 'prefix metric name with this string before sending them to Blueflood'],
['ttl', '', DEFAULT_TTL, 'TTL value for metrics, sec'],
['user', 'u', '', 'Rackspace authentication username. Leave empty if no auth is required'],
['key', 'k', '', 'Rackspace authentication password. It is recommended not to set this option from the command line, as that can compromise api keys. Instead, set the key in a config file and use the \'--config\' option below.'],
['auth_url', '', AUTH_URL, 'Auth URL'],
['limit', '', 0, 'Blueflood json payload limit, bytes. 0 means no limit'],
['protocol', '', 'MetricPickleReceiver', 'Listening protocol class. MetricPickleReceiver for receiving metrics from graphite, or MetricLineReceiver to act as a graphite replacement.'],
['overwrite_collection_timestamp', '', False, 'Replace metric time with current blueflood carbon forwarder node time'],
['config', 'c', None,
'Path to a configuration file. The file must be in INI format, with '
'[bracketed] sections. All sections other than '
'[blueflood_carbon_forwarder] will be ignored. If this option is not '
'specified, no config file will be processed. Any of the above options'
' can be set in the config file. Options specified in the config file'
' override values on the command line.']
]
class GraphiteMetricFactory(Factory):
def __init__(self):
flusher = ConsumeFlush()
self._metric_collection = MetricCollection(flusher)
def processMetric(self, metric, datapoint):
log.msg('Receive metric {} {}:{}'.format(metric, datapoint[0], datapoint[1]), level=logging.DEBUG)
self._metric_collection.collect(metric, datapoint)
def flushMetric(self):
try:
log.msg('Sending {} metrics'.format(self._metric_collection.count()), level=logging.DEBUG)
self._metric_collection.flush()
except Exception, e:
log.err(e)
class MetricService(Service):
def __init__(self, **kwargs):
self.protocol_cls = kwargs.get('protocol_cls')
self.endpoint = kwargs.get('endpoint')
self.flush_interval = kwargs.get('interval')
self.blueflood_url = kwargs.get('blueflood_url')
self.tenant = kwargs.get('tenant')
self.ttl = kwargs.get('ttl')
self.user = kwargs.get('user')
self.key = kwargs.get('key')
self.auth_url = kwargs.get('auth_url')
self.limit = kwargs.get('limit', 0)
self.metric_prefix = kwargs.get('metric_prefix', None)
self.overwrite_collection_timestamp = \
kwargs.get('overwrite_collection_timestamp', False)
self.port = None
def startService(self):
from twisted.internet import reactor
server = serverFromString(reactor, self.endpoint)
log.msg('Start listening at {}'.format(self.endpoint))
factory = GraphiteMetricFactory.forProtocol(self.protocol_cls)
agent = Agent(reactor)
if self.user:
agent = KeystoneAgent(agent, self.auth_url, (self.user, self.key))
log.msg('Auth URL: {}, user: {}'.format(self.auth_url, self.user))
self._setup_blueflood(factory, agent)
self.timer = LoopingCall(factory.flushMetric)
self.timer.start(self.flush_interval)
d = server.listen(factory)
d.addCallback(self._store_listening_port)
return d
def _store_listening_port(self, port):
self.port = port
def stopService(self):
if self.port:
self.port.stopListening()
self.timer.stop()
def _setup_blueflood(self, factory, agent):
log.msg('Send metrics to {} as tenant {} with {} sec interval'
.format(self.blueflood_url, self.tenant, self.flush_interval))
log.msg('Limit is {} bytes'.format(self.limit))
client = BluefloodEndpoint(
ingest_url=self.blueflood_url,
tenant=self.tenant,
agent=agent,
limit=int(self.limit),
overwrite_collection_timestamp=self.overwrite_collection_timestamp)
flusher = BluefloodFlush(client=client, ttl=self.ttl,
metric_prefix=self.metric_prefix)
factory._metric_collection.flusher = flusher
@implementer(IServiceMaker, IPlugin)
class MetricServiceMaker(object):
tapname = 'blueflood-forward'
description = 'Forwarding metrics from graphite sources to Blueflood'
options = Options
def makeService(self, options):
if 'config' in options and options['config'] is not None:
filename = options['config']
config = ConfigParser.RawConfigParser()
config.read(filename)
for key, value in config.items('blueflood_carbon_forwarder'):
if key in ['interval', 'ttl', 'limit']:
value = int(value)
options[key] = value
if options['protocol'] == "MetricPickleReceiver":
protocol_cls = MetricPickleReceiver
elif options['protocol'] == "MetricLineReceiver":
protocol_cls = MetricLineReceiver
else:
protocol_cls = MetricPickleReceiver
return MetricService(
protocol_cls=protocol_cls,
endpoint=options['endpoint'],
interval=float(options['interval']),
blueflood_url=options['blueflood'],
tenant=options['tenant'],
ttl=int(options['ttl']),
user=options['user'],
key=options['key'],
auth_url=options['auth_url'],
limit=options['limit'],
metric_prefix=options['metric_prefix'],
overwrite_collection_timestamp=options['overwrite_collection_timestamp']
)
serviceMaker = MetricServiceMaker()