Permalink
Browse files

First implementation of metricsd

  • Loading branch information...
1 parent 90590b0 commit e9e97c07a93c639665b4661ce330bdc5497ab444 @davisp davisp committed Nov 17, 2011
View
@@ -1,2 +1,3 @@
*.pyc
bucky.egg-info/
+dist/
View
@@ -22,25 +22,22 @@
log = logging.getLogger(__name__)
-class CarbonException(Exception):
- def __init__(self, mesg):
- self.mesg = mesg
- def __str__(self):
- return self.mesg
-
-
-class ConnectError(CarbonException):
- pass
+class DebugSocket(object):
+ def sendall(self, data):
+ sys.stdout.write(data)
class CarbonClient(object):
def __init__(self, cfg):
- ip = cfg["graphite_ip"]
- port = cfg["graphite_port"]
+ ip, port = cfg.graphite_ip, cfg.graphite_port
+ if cfg.debug:
+ log.debug("Connected the debug socket.")
+ self.sock = DebugSocket()
+ return
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
self.sock.connect((ip, port))
- log.info("Connect to Carbon at %s:%s" % (ip, port))
+ log.info("Connected to Carbon at %s:%s" % (ip, port))
except Exception:
log.error("Failed to connect to %s:%s" % (ip, port))
sys.exit(2)
View
@@ -0,0 +1,32 @@
+
+debug = False
+
+metricsd_ip = "127.0.0.1"
+metricsd_port = 23632
+metricsd_enabled = True
+metricsd_default_interval = 10.0
+metricsd_handlers = []
+
+collectd_ip = "127.0.0.1"
+collectd_port = 25826
+collectd_enabled = True
+collectd_types = None
+collectd_converters = []
+collectd_use_entry_points = True
+
+statsd_ip = "127.0.0.1"
+statsd_port = 8125
+statsd_enabled = True
+statsd_flush_time = 10.0
+
+graphite_ip = "127.0.0.1"
+graphite_port = 2003
+
+full_trace = False
+
+name_prefix = None
+name_postfix = None
+name_replace_char = '_'
+name_strip_duplicates = True
+name_host_trim = []
+
View
@@ -15,38 +15,15 @@
import copy
import logging
import os
-import socket
import struct
-import sys
-import threading
+from bucky.errors import ConfigError, ProtocolError
+from bucky.names import statname
+from bucky.udpserver import UDPServer
log = logging.getLogger(__name__)
-class CollectDError(Exception):
- def __init__(self, mesg):
- self.mesg = mesg
- def __str__(self):
- return self.mesg
-
-
-class ConfigError(CollectDError):
- pass
-
-
-class ProtocolError(CollectDError):
- pass
-
-
-class BindError(CollectDError):
- pass
-
-
-class ServerErrror(CollectDError):
- pass
-
-
class CPUConverter(object):
PRIORITY = 0
def __call__(self, sample):
@@ -231,51 +208,13 @@ def _parser(sample, data):
class CollectDConverter(object):
def __init__(self, cfg):
- self.prefix = cfg.get("collectd_prefix")
- self.postfix = cfg.get("collectd_postfix")
- self.replace = cfg.get("collectd_replace", "_")
- self.strip_dupes = cfg.get("collectd_strip_duplicates", True)
- self.use_entry_points = cfg.get("collectd_use_entry_points", True)
- host_trim = cfg.get("collectd_host_trim", [])
- self.host_trim = []
- for s in host_trim:
- s = list(reversed([p.strip() for p in s.split(".")]))
- self.host_trim.append(s)
self.converters = dict(DEFAULT_CONVERTERS)
self._load_converters(cfg)
def convert(self, sample):
- stat = self.stat(sample)
- return stat, sample["value_type"], sample["value"], int(sample["time"])
-
- def stat(self, sample):
- parts = []
- if self.prefix:
- parts.append(self.prefix)
- parts.extend(self.hostname(sample.get("host", "")))
handler = self.converters.get(sample["plugin"], self.default)
- parts.extend(handler(sample))
- if self.postfix:
- parts.append(self.postfix)
- if self.replace is not None:
- parts = [p.replace(".", self.replace) for p in parts]
- if self.strip_duplicates:
- parts = self.strip_duplicates(parts)
- return ".".join(parts)
-
- def hostname(self, host):
- parts = host.split(".")
- parts = list(reversed([p.strip() for p in parts]))
- for s in self.host_trim:
- same = True
- for i, p in enumerate(s):
- if p != parts[i]:
- same = False
- break
- if same:
- parts = parts[len(s):]
- return parts
- return parts
+ stat = statname(sample.get("host", ""), handle(sample))
+ return stat, sample["value_type"], sample["value"], int(sample["time"])
def default(self, sample):
parts = []
@@ -293,26 +232,19 @@ def default(self, sample):
parts.append(vname)
return parts
- def strip_duplicates(self, parts):
- ret = []
- for p in parts:
- if len(ret) == 0 or p != ret[-1]:
- ret.append(p)
- return ret
-
def _load_converters(self, cfg):
- cfg_conv = cfg.get("collectd_converters", {})
+ cfg_conv = cfg.collectd_converters
for conv in cfg_conv:
self._add_converter(conv, cfg_conv[conv])
- if not cfg["collectd_use_entry_points"]:
+ if not cfg.collectd_use_entry_points:
return
import pkg_resources
group = 'bucky.collectd.converters'
for ep in pkg_resources.iter_entry_points(group):
name, klass = ep.name, ep.load()
- self._add_converter(name, klass())
+ self._add_converter(ep, name, klass)
- def _add_converter(self, name, inst):
+ def _add_converter(self, ep, name, klass):
if name not in self.converters:
log.info("Converter: %s from %s" % (name, ep.module_name))
self.converters[name] = klass()
@@ -327,43 +259,28 @@ def _add_converter(self, name, inst):
log.info("Ignoring: %s from %s" % (name, ep.module_name))
-class CollectDServer(threading.Thread):
+class CollectDServer(UDPServer):
def __init__(self, queue, cfg):
- super(CollectDServer, self).__init__()
- self.setDaemon(True)
-
+ super(CollectDServer, self).__init__(cfg.collectd_ip, cfg.collectd_port)
self.queue = queue
- self.parser = CollectDParser(cfg["collectd_types"])
+ self.parser = CollectDParser(cfg.collectd_types)
self.converter = CollectDConverter(cfg)
- self.sock = self.init_socket(cfg["collectd_ip"], cfg["collectd_port"])
self.prev_samples = {}
- def init_socket(self, ip, port):
- sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
- sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ def handle(self, data, addr):
try:
- sock.bind((ip, port))
- log.info("Opened collectd socket %s:%s" % (ip, port))
- return sock
- except Exception:
- log.error("Error opening collectd socket %s:%s." % (ip, port))
- sys.exit(1)
-
- def run(self):
- while True:
- data, addr = self.sock.recvfrom(65535)
- try:
- for sample in self.parser.parse(data):
- self.last_sample = sample
- name, vtype, val, time = self.converter.convert(sample)
- if not name.strip():
- continue
- val = self.calculate(name, vtype, val, time)
- if val is not None:
- self.queue.put((name, val, time))
- except ProtocolError, e:
- log.error("Protocol error: %s" % e)
- log.info("Last sample: %s" % self.last_sample)
+ for sample in self.parser.parse(data):
+ self.last_sample = sample
+ name, vtype, val, time = self.converter.convert(sample)
+ if not name.strip():
+ continue
+ val = self.calculate(name, vtype, val, time)
+ if val is not None:
+ self.queue.put((name, val, time))
+ except ProtocolError, e:
+ log.error("Protocol error: %s" % e)
+ log.info("Last sample: %s" % self.last_sample)
+ return True
def calculate(self, name, vtype, val, time):
handlers = {
View
@@ -0,0 +1,18 @@
+
+class BuckyError(Exception):
+ def __init__(self, mesg):
+ self.mesg = mesg
+ def __str__(self):
+ return self.mesg
+
+
+class ConnectError(BuckyError):
+ pass
+
+
+class ConfigError(BuckyError):
+ pass
+
+
+class ProtocolError(BuckyError):
+ pass
Oops, something went wrong.

0 comments on commit e9e97c0

Please sign in to comment.