Permalink
Browse files

Merge pull request #28 from vvuksan/master

Clean up old server-gmond.py -- thanks @vvuksan
  • Loading branch information...
2 parents f638f06 + 0098ac0 commit eda9e80773a66fd9e8c9ec880a119133dd006a3d @sivy committed Sep 13, 2011
Showing with 6 additions and 185 deletions.
  1. +0 −180 pystatsd/server-gmond.py
  2. +6 −5 pystatsd/server.py
View
@@ -1,180 +0,0 @@
-import re
-from socket import AF_INET, SOCK_DGRAM, socket
-import threading
-import time
-import types
-import gmetric
-from xdrlib import Packer, Unpacker
-
-try:
- from setproctitle import setproctitle
-except ImportError:
- setproctitle = None
-
-from daemon import Daemon
-
-
-__all__ = ['Server']
-
-def _clean_key(k):
- return re.sub(
- '[^a-zA-Z_\-0-9\.]',
- '',
- k.replace('/','-').replace(' ','_')
- )
-
-class Server(object):
-
- def __init__(self, pct_threshold=90, debug=False, ganglia_host='localhost', ganglia_port=8649):
- self.buf = 1024
- # How often to flush metrics in milliseconds (default 15 seconds)
- self.flush_interval = 15000
- # Set DMAX to the flush interval plus 20%. That should keep metrics from expiring prematurely if there is
- # some kind of delay when flushing
- self.dmax = int ( self.flush_interval * 1.2 )
- self.pct_threshold = pct_threshold
- self.ganglia_host = ganglia_host
- self.ganglia_port = ganglia_port
- self.ganglia_protocol = "udp"
- self.debug = debug
- # What hostname should these metrics be attached to. Here we'll just create a fake host called
- # statsd
- self.ganglia_spoof_host = "statsd:statsd"
-
- self.counters = {}
- self.timers = {}
- self.flusher = 0
-
-
- def process(self, data):
- key, val = data.split(':')
- key = _clean_key(key)
-
- sample_rate = 1;
- fields = val.split('|')
-
- if (fields[1] == 'ms'):
- if key not in self.timers:
- self.timers[key] = []
- self.timers[key].append(float(fields[0] or 0))
- else:
- if len(fields) == 3:
- sample_rate = float(re.match('^@([\d\.]+)', fields[2]).groups()[0])
- if key not in self.counters:
- self.counters[key] = 0;
- self.counters[key] += int(fields[0] or 1) * (1 / sample_rate)
-
- def flush(self):
- stats = 0
- g = gmetric.Gmetric(self.ganglia_host, self.ganglia_port, self.ganglia_protocol)
-
- for k, v in self.counters.items():
- v = float(v) / (self.flush_interval / 1000)
- if self.debug:
- print "Sending %s => count=%s" % ( k, v )
- # We put counters in _counters group. Underscore is to make sure counters show up
- # first in the GUI. Change below if you disagree
- g.send(k, v, "double", "count", "both", 60, self.dmax, "_counters", self.ganglia_spoof_host)
-
- self.counters[k] = 0
- stats += 1
-
- for k, v in self.timers.items():
- if len(v) > 0:
- v.sort()
- count = len(v)
- min = v[0]
- max = v[-1]
-
- mean = min
- max_threshold = max
-
- if count > 1:
- thresh_index = int((self.pct_threshold / 100.0) * count)
- max_threshold = v[thresh_index - 1]
- total = sum(v)
- mean = total / count
-
- self.timers[k] = []
-
- if self.debug:
- print "Sending %s ====> lower=%s, mean=%s, upper=%s, %dpct=%s, count=%s" % ( k, min, mean, max, self.pct_threshold, max_threshold, count )
- # What group should these metrics be in. For the time being we'll set it to the name of the key
- group = k
- g.send(k + "_lower", min, "double", "time", "both", 60, self.dmax, group, self.ganglia_spoof_host)
- g.send(k + "_mean", mean, "double", "time", "both", 60, self.dmax, group, self.ganglia_spoof_host)
- g.send(k + "_upper", max, "double", "time", "both", 60, self.dmax, group, self.ganglia_spoof_host)
- g.send(k + "_count", count, "double", "count", "both", 60, self.dmax, group, self.ganglia_spoof_host)
- g.send(k + "_" + str(self.pct_threshold) +"pct", max_threshold, "double", "time", "both", 60, self.dmax, group, self.ganglia_spoof_host)
-
- stats += 1
-
- if self.debug:
- print "\n================== Flush completed. Waiting until next flush. Sent out %d metrics =======" % ( stats )
-
-
- self._set_timer()
-
- def _set_timer(self):
- self._timer = threading.Timer(self.flush_interval/1000, self.flush)
- self._timer.start()
-
- def serve(self, hostname='', port=8125, ganglia_host='localhost', ganglia_port=8649):
- assert type(port) is types.IntType, 'port is not an integer: %s' % (port)
- addr = (hostname, port)
- self._sock = socket(AF_INET, SOCK_DGRAM)
- self._sock.bind(addr)
- self.ganglia_host = ganglia_host
- self.ganglia_port = ganglia_port
-
- import signal
- import sys
- def signal_handler(signal, frame):
- self.stop()
- signal.signal(signal.SIGINT, signal_handler)
-
- self._set_timer()
- while 1:
- data, addr = self._sock.recvfrom(self.buf)
- self.process(data)
-
- def stop(self):
- self._timer.cancel()
- self._sock.close()
-
-class ServerDaemon(Daemon):
- def run(self, options):
- if setproctitle:
- setproctitle('pystatsd')
- server = Server(pct_threshold=options.pct, debug=options.debug)
- server.serve(options.name, options.port, options.ganglia_host,
- options.ganglia_port)
-
-def run_server():
- import sys
- import argparse
- parser = argparse.ArgumentParser()
- parser.add_argument('-d', '--debug', dest='debug', action='store_true', help='debug mode', default=False)
- parser.add_argument('-n', '--name', dest='name', help='hostname to run on', default='')
- parser.add_argument('-p', '--port', dest='port', help='port to run on', type=int, default=8125)
- parser.add_argument('--ganglia-port', dest='ganglia_port', help='port to connect to ganglia on', type=int, default=8649)
- parser.add_argument('--ganglia-host', dest='ganglia_host', help='host to connect to ganglia on', type=str, default='localhost')
- parser.add_argument('-t', '--pct', dest='pct', help='stats pct threshold', type=int, default=90)
- parser.add_argument('-D', '--daemon', dest='daemonize', action='store_true', help='daemonize', default=False)
- parser.add_argument('--pidfile', dest='pidfile', action='store', help='pid file', default='/tmp/pystatsd.pid')
- parser.add_argument('--restart', dest='restart', action='store_true', help='restart a running daemon', default=False)
- parser.add_argument('--stop', dest='stop', action='store_true', help='stop a running daemon', default=False)
- options = parser.parse_args(sys.argv[1:])
-
- daemon = ServerDaemon(options.pidfile)
- if options.daemonize:
- daemon.start(options)
- elif options.restart:
- daemon.restart(options)
- elif options.stop:
- daemon.stop()
- else:
- daemon.run(options)
-
-if __name__ == '__main__':
- run_server()
View
@@ -40,7 +40,7 @@ def _clean_key(k):
class Server(object):
def __init__(self, pct_threshold=90, debug=False, transport = 'graphite',
- ganglia_host='localhost', ganglia_port=8649,
+ ganglia_host='localhost', ganglia_port=8649, ganglia_spoof_host='statd:statd',
graphite_host='localhost', graphite_port=2003,
flush_interval=10000, no_aggregate_counters = False, counters_prefix = 'stats',
timers_prefix = 'stats.timers'):
@@ -55,9 +55,8 @@ def __init__(self, pct_threshold=90, debug=False, transport = 'graphite',
# Set DMAX to the flush interval plus 20%. That should keep metrics from expiring prematurely if there is
# some kind of delay when flushing
self.dmax = int ( self.flush_interval * 1.2 )
- # What hostname should these metrics be attached to. Here we'll just create a fake host called
- # statsd
- self.ganglia_spoof_host = "statsd:statsd"
+ # What hostname should these metrics be attached to.
+ self.ganglia_spoof_host = ganglia_spoof_host
# Graphite specific settings
self.graphite_host = graphite_host
@@ -220,6 +219,7 @@ def run(self, options):
graphite_host = options.graphite_host,
graphite_port = options.graphite_port,
ganglia_host = options.ganglia_host,
+ ganglia_spoof_host = options.ganglia_spoof_host,
ganglia_port = options.ganglia_port,
flush_interval = options.flush_interval,
no_aggregate_counters = options.no_aggregate_counters,
@@ -240,6 +240,7 @@ def run_server():
parser.add_argument('--graphite-host', dest='graphite_host', help='host to connect to graphite on (default: localhost)', type=str, default='localhost')
parser.add_argument('--ganglia-port', dest='ganglia_port', help='port to connect to ganglia on', type=int, default=8649)
parser.add_argument('--ganglia-host', dest='ganglia_host', help='host to connect to ganglia on', type=str, default='localhost')
+ parser.add_argument('--ganglia-spoof-host', dest='ganglia_spoof_host', help='host to report metrics as to ganglia', type=str, default='statd:statd')
parser.add_argument('--flush-interval', dest='flush_interval', help='how often to send data to graphite in millis (default: 10000)', type=int, default=10000)
parser.add_argument('--no-aggregate-counters', dest='no_aggregate_counters', help='should statsd report counters as absolute instead of count/sec', action='store_true')
parser.add_argument('--counters-prefix', dest='counters_prefix', help='prefix to append before sending counter data to graphite (default: statsd)', type=str, default='statsd')
@@ -262,4 +263,4 @@ def run_server():
daemon.run(options)
if __name__ == '__main__':
- run_server()
+ run_server()

0 comments on commit eda9e80

Please sign in to comment.