Resiliency changes/fixes, and clear-timers fix

- Fixes a bug introduced in 7f502e8 that failed to clear timer
  counters when metrics were reported via the plain graphite protocol.
- Makes sure eventlet timeouts during reporting to graphite are
  actually caught and retried, and that the socket is cleaned up
  (see the sketch after this list).
- Wraps decode_recvd and stats_flush in bare excepts so both can
  continue in case of unexpected errors that don't inherit from
  Exception.

Should help track down or fix issue #6.
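
A minimal, standalone sketch of the catch/retry/cleanup pattern the
statsd.py diff below applies (names simplified and hypothetical, not the
exact statsdpy code; assumes eventlet green sockets or monkey patching so
the Timeout can actually fire during blocking socket calls):

    import socket

    import eventlet

    def send_with_retry(addr, payload, timeout, is_retry=False):
        """Send payload to addr, retrying once on an eventlet timeout."""
        sock = socket.socket()
        try:
            # Raises eventlet.Timeout if connect/sendall block too long.
            with eventlet.Timeout(timeout, True):
                sock.connect(addr)
                sock.sendall(payload)
        except eventlet.Timeout:
            if not is_retry:
                send_with_retry(addr, payload, timeout, is_retry=True)
        finally:
            # Always release the socket, even after a timeout mid-connect.
            try:
                sock.close()
            except Exception:
                pass

Creating the socket outside the Timeout block matters: if the timeout
fires, the finally clause still holds a handle to close, so a half-open
socket isn't leaked.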
commit c87134e6e1ce077bda066ed8a64de14089a3e259 (1 parent: 2f2c55a)
@pandemicsyn authored
Showing with 42 additions and 18 deletions.
  1. +1 −1  statsdpy/__init__.py
  2. +41 −17 statsdpy/statsd.py
statsdpy/__init__.py (+1 −1)
@@ -2,7 +2,7 @@
 #: Version information (major, minor, revision[, 'dev']).
-version_info = (0, 0, 11)
+version_info = (0, 0, 12)
 #: Version string 'major.minor.revision'.
 version = __version__ = ".".join(map(str, version_info))
 gettext.install('statsdpy')
statsdpy/statsd.py (+41 −17)
@@ -15,6 +15,7 @@
 class StatsdServer(object):
     TRUE_VALUES = set(('true', '1', 'yes', 'on', 't', 'y'))
+
     def __init__(self, conf):
         self.logger = logging.getLogger('statsdpy')
         self.logger.setLevel(logging.INFO)
@@ -26,6 +27,7 @@ def __init__(self, conf):
         self.graphite_host = conf.get('graphite_host', '127.0.0.1')
         self.graphite_port = int(conf.get('graphite_port', '2003'))
         self.graphite_pport = int(conf.get('graphite_pickle_port', '2004'))
+        self.graphite_timeout = int(conf.get('graphite_timeout', '5'))
         self.pickle_proto = conf.get('pickle_protocol') in self.TRUE_VALUES
         self.max_batch_size = int(conf.get('pickle_batch_size', '300'))
         self.listen_addr = conf.get('listen_addr', '127.0.0.1')
@@ -82,7 +84,7 @@ def _get_batches(self, items):
         for i in xrange(0, len(items), self.max_batch_size):
             yield items[i:i + self.max_batch_size]

-    def report_stats(self, payload):
+    def report_stats(self, payload, is_retry=False):
         """
         Send data to graphite host
@@ -94,34 +96,51 @@ def report_stats(self, payload):
         else:
             print "reporting stats -> {\n%s}" % payload
         try:
-            with eventlet.Timeout(5, True):
-                graphite = socket.socket()
+            graphite = socket.socket()
+            with eventlet.Timeout(self.graphite_timeout, True):
                 graphite.connect(self.graphite_addr)
                 graphite.sendall(payload)
                 graphite.close()
+        except eventlet.timeout.Timeout:
+            self.logger.critical("Timeout sending to graphite")
+            if self.debug:
+                print "Timeout talking to graphite"
+            if not is_retry:
+                self.logger.critical('Attempting 1 retry!')
+                self.report_stats(payload, is_retry=True)
+            else:
+                self.logger.critical('Already retried once, giving up')
         except Exception as err:
             self.logger.critical("error connecting to graphite: %s" % err)
             if self.debug:
                 print "error connecting to graphite: %s" % err
+        finally:  # always release the socket, even after a timeout
+            try:
+                graphite.close()
+            except Exception:
+                pass

     def stats_flush(self):
         """
         Periodically flush stats to graphite
         """
         while True:
-            eventlet.sleep(self.flush_interval)
-            if self.debug:
-                print "seen %d stats so far." % self.stats_seen
-                print "current counters: %s" % self.counters
-            if self.pickle_proto:
-                payload = self.pickle_payload()
-                if payload:
-                    for batch in payload:
-                        self.report_stats(batch)
-            else:
-                payload = self.plain_payload()
-                if payload:
-                    self.report_stats(payload)
+            try:
+                eventlet.sleep(self.flush_interval)
+                if self.debug:
+                    print "seen %d stats so far." % self.stats_seen
+                    print "current counters: %s" % self.counters
+                if self.pickle_proto:
+                    payload = self.pickle_payload()
+                    if payload:
+                        for batch in payload:
+                            self.report_stats(batch)
+                else:
+                    payload = self.plain_payload()
+                    if payload:
+                        self.report_stats(payload)
+            except:  # safety net: also catches errors not inheriting Exception
+                self.logger.critical('Encountered error in stats_flush loop')

     def pickle_payload(self):
         """obtain stats payload in batches of pickle format"""
@@ -180,6 +199,7 @@ def plain_payload(self):
         for key in self.timers:
             if len(self.timers[key]) > 0:
                 self.process_timer_key(key, tstamp, payload)
+                self.timers[key] = []  # clear reported timers (7f502e8 regression)
         for key in self.gauges:
             payload.append("%s.%s %d %d\n" % (self.gauge_prefix, key,
@@ -333,7 +353,11 @@ def run(self):
             else:
                 for metric in data.splitlines():
                     if metric:
-                        self.decode_recvd(metric)
+                        try:
+                            self.decode_recvd(metric)
+                        except:  # safety net: one bad metric can't kill the loop
+                            self.logger.critical("exception in decode_recvd")
+                            pass


 class Statsd(Daemon):
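
For reference, a hypothetical usage sketch of the new graphite_timeout
knob. Values are strings because the conf.get() calls above parse them
with int(); only the graphite_* keys are confirmed by this diff, the rest
are assumptions:

    from statsdpy.statsd import StatsdServer

    conf = {
        'graphite_host': '10.0.0.5',   # example host, assumed
        'graphite_port': '2003',
        'graphite_timeout': '10',      # new in this commit; seconds, default '5'
        'flush_interval': '10',        # key name assumed from self.flush_interval
    }
    StatsdServer(conf).run()           # starts the listen loop; stats_flush
                                       # reports every flush_interval seconds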