Skip to content

Commit

Permalink
Merge pull request #640 from scottkmaxwell/extended-stats-hook
Browse files Browse the repository at this point in the history
Add extended_stats hook
  • Loading branch information
tarekziade committed Nov 5, 2013
2 parents 01a75f5 + ee519bc commit a2d2181
Show file tree
Hide file tree
Showing 10 changed files with 135 additions and 26 deletions.
3 changes: 2 additions & 1 deletion circus/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ def __call__(self, watchers, controller=None,
callable or the callabled itself and a boolean flag indicating if
an exception occuring in the hook should not be ignored.
Possible values for the hook name: *before_start*, *after_start*,
*before_stop*, *after_stop*, *before_signal*, *after_signal*
*before_stop*, *after_stop*, *before_signal*, *after_signal*,
*extended_stats*
- **controller** -- the zmq entry point
(default: 'tcp://127.0.0.1:5555')
Expand Down
31 changes: 24 additions & 7 deletions circus/commands/stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ class Stats(Command):
To get stats for a process::
{
"command": "stats",
"properties": {
Expand All @@ -43,6 +42,18 @@ class Stats(Command):
}
}
Stats can be extended with the extended_stats hook but extended stats
need to be requested::
{
"command": "stats",
"properties": {
"name": <name>,
"process": <processid>,
"extended": True
}
}
The response retun an object per process with the property "info"
containing some process informations::
Expand All @@ -69,22 +80,26 @@ class Stats(Command):
::
$ circusctl stats [<watchername>] [<processid>]
$ circusctl stats [--extended] [<watchername>] [<processid>]
"""

name = "stats"
options = [('', 'extended', False,
"Include info from extended_stats hook")]

def message(self, *args, **opts):
if len(args) > 2:
raise ArgumentError("message invalid")

extended = opts.get("extended", False)
if len(args) == 2:
return self.make_message(name=args[0], process=int(args[1]))
return self.make_message(name=args[0], process=int(args[1]),
extended=extended)
elif len(args) == 1:
return self.make_message(name=args[0])
return self.make_message(name=args[0], extended=extended)
else:
return self.make_message()
return self.make_message(extended=extended)

def execute(self, arbiter, props):
if 'name' in props:
Expand All @@ -93,13 +108,15 @@ def execute(self, arbiter, props):
try:
return {
"process": props['process'],
"info": watcher.process_info(props['process'])
"info": watcher.process_info(props['process'],
props.get('extended')),
}
except KeyError:
raise MessageError("process %r not found in %r" % (
props['process'], props['name']))
else:
return {"name": props['name'], "info": watcher.info()}
return {"name": props['name'],
"info": watcher.info(props.get('extended'))}
else:
infos = {}
for watcher in arbiter.watchers:
Expand Down
3 changes: 2 additions & 1 deletion circus/commands/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@


_HOOKS = ('before_start', 'after_start', 'before_stop', 'after_stop',
'before_spawn', 'before_signal', 'after_signal')
'before_spawn', 'before_signal', 'after_signal',
'extended_stats')


def convert_option(key, val):
Expand Down
6 changes: 3 additions & 3 deletions circus/py3compat.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,13 @@ def get_next(c):

MAXSIZE = sys.maxsize # NOQA
else:
string_types = basestring
string_types = basestring # NOQA
integer_types = (int, long)
text_type = unicode
text_type = unicode # NOQA
long = long

def bytestring(s): # NOQA
if isinstance(s, unicode):
if isinstance(s, unicode): # NOQA
return s.encode('utf-8')
return s

Expand Down
42 changes: 42 additions & 0 deletions circus/tests/test_watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,14 @@ def run_with_hooks(self, hooks):
def _stop(self):
yield self.call("stop", name="test", waiting=True)

@tornado.gen.coroutine
def _stats(self):
yield self.call("stats", name="test")

@tornado.gen.coroutine
def _extended_stats(self):
yield self.call("stats", name="test", extended=True)

@tornado.gen.coroutine
def get_status(self):
resp = yield self.call("status", name="test")
Expand All @@ -358,6 +366,9 @@ def hook(watcher, arbiter, hook_name, **kwargs):
if hook_kwargs_test_function is not None:
hook_kwargs_test_function(kwargs)

if hook_name == 'extended_stats':
kwargs['stats']['tx'] = 1000
return
if behavior == SUCCESS:
return True
elif behavior == FAILURE:
Expand All @@ -383,6 +394,32 @@ def hook(watcher, arbiter, hook_name, **kwargs):
self.assertTrue(events['before_start_called'])
self.assertEqual(events['arbiter_in_hook'], arbiter)

@tornado.gen.coroutine
def _test_extended_stats(self, extended=False):
events = {'extended_stats_called': False}

def hook(watcher, arbiter, hook_name, **kwargs):
events['extended_stats_called'] = True

old = logger.exception
logger.exception = lambda x: x

hooks = {'extended_stats': (hook, False)}
testfile, arbiter = self.run_with_hooks(hooks)
yield arbiter.start()
try:
if extended:
yield self._extended_stats()
else:
yield self._stats()
resp_status = yield self.get_status()
self.assertEqual(resp_status, 'active')
finally:
yield arbiter.stop()
logger.exception = old

self.assertEqual(events['extended_stats_called'], extended)

@tornado.testing.gen_test
def test_before_start(self):
yield self._test_hooks()
Expand Down Expand Up @@ -482,6 +519,11 @@ def test_before_spawn_false(self):
yield self._test_hooks(behavior=FAILURE, status='stopped',
hook_name='before_spawn', call=self._stop)

@tornado.testing.gen_test
def test_extended_stats(self):
yield self._test_extended_stats()
yield self._test_extended_stats(extended=True)


def oneshot_process(test_file):
pass
Expand Down
28 changes: 20 additions & 8 deletions circus/watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,8 @@ class Watcher(object):
or the callabled itself and a boolean flag indicating if an
exception occuring in the hook should not be ignored.
Possible values for the hook name: *before_start*, *after_start*,
*before_spawn*, *before_stop*, *after_stop*., *before_signal* or
*after_signal*.
*before_spawn*, *before_stop*, *after_stop*., *before_signal*,
*after_signal* or *extended_stats*.
- **options** -- extra options for the worker. All options
found in the configuration file for instance, are passed
Expand Down Expand Up @@ -213,7 +213,8 @@ def __init__(self, name, cmd, args=None, numprocesses=1, warmup_delay=0.,
self.max_age = int(max_age)
self.max_age_variance = int(max_age_variance)
self.ignore_hook_failure = ['before_stop', 'after_stop',
'before_signal', 'after_signal']
'before_signal', 'after_signal',
'extended_stats']

self.respawn = respawn
self.autostart = autostart
Expand Down Expand Up @@ -676,14 +677,25 @@ def status(self):
return self._status

@util.debuglog
def process_info(self, pid):
def process_info(self, pid, extended=False):
process = self.processes[int(pid)]
return process.info()
result = process.info()
if extended and 'extended_stats' in self.hooks:
self.hooks['extended_stats'](self, self.arbiter,
'extended_stats',
pid=pid, stats=result)
return result

@util.debuglog
def info(self):
return dict([(proc.pid, proc.info())
for proc in self.processes.values()])
def info(self, extended=False):
result = dict([(proc.pid, proc.info())
for proc in self.processes.values()])
if extended and 'extended_stats' in self.hooks:
for pid, stats in result.items():
self.hooks['extended_stats'](self, self.arbiter,
'extended_stats',
pid=pid, stats=stats)
return result

@util.synchronized("watcher_stop")
@gen.coroutine
Expand Down
14 changes: 13 additions & 1 deletion docs/source/for-devs/writing-hooks.rst
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ events. Available hooks are:

- **after_signal**: called after a signal is sent to a watcher's process.

- **extended_stats**: called when stats are requested with extended=True.
Used for adding process-specific stats to the regular stats output.

Example
=======

Expand Down Expand Up @@ -99,7 +102,7 @@ Where **watcher** is the **Watcher** class instance, **arbiter** the
**Arbiter** one, **hook_name** the hook name and **kwargs** some additional
optional parameters (depending on the hook type).

For the moment, only **before_signal** and **after_signal** hooks offer some
The **before_signal** and **after_signal** hooks offer some
additional parameters in **kwargs**::

def before_signal_hook(watcher, arbiter, hook_name, pid, signum, **kwargs):
Expand All @@ -117,6 +120,15 @@ data and methods can be useful in some hooks.
Note that hooks are called with named arguments. So use the hook signature without
changing argument names.

The **extended_stats** hook has its own additional parameters in **kwargs**::

def extended_stats_hook(watcher, arbiter, hook_name, pid, stats, **kwargs):
...

Where **pid** is the PID of the corresponding process and **stats** the
regular stats to be returned. Add your own stats into **stats**. An example
is in examples/uwsgi_lossless_reload.py.

As a last example, here is a super hook which can deal with all kind of signals::

def super_hook(watcher, arbiter, hook_name, **kwargs):
Expand Down
3 changes: 2 additions & 1 deletion docs/source/for-ops/configuration.rst
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,8 @@ watcher:NAME - as many sections as you want

**hooks.***
Available hooks: **before_start**, **before_spawn**, **after_start**,
**before_stop**, **after_stop**, **before_signal**, **after_signal**
**before_stop**, **after_stop**, **before_signal**, **after_signal**,
**extended_stats**

Define callback functions that hook into the watcher startup/shutdown process.

Expand Down
3 changes: 2 additions & 1 deletion examples/uwsgi_lossless_reload.ini
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
cmd = uwsgi --ini uwsgi.ini --socket fd://$(circus.sockets.web) --stats --stats 127.0.0.1:809$(circus.wid)
stop_signal = QUIT
use_sockets = True
hooks.before_signal = uwsgi_clean_stop
hooks.before_signal = examples.uwsgi_lossless_reload.uwsgi_clean_stop
hooks.extended_stats = examples.uwsgi_lossless_reload.extended_stats

[socket:web]
host = 127.0.0.1
Expand Down
28 changes: 25 additions & 3 deletions examples/uwsgi_lossless_reload.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
}


def get_worker_status(name, wid):
def get_uwsgi_stats(name, wid):
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
try:
sock = socket.create_connection(('127.0.0.1', 8090 + wid))
Expand All @@ -48,7 +48,11 @@ def get_worker_status(name, wid):
"Error: No stats seem available for WID {0} of {1}"
.format(wid, name))
return
stats = loads(data.decode())
return loads(data.decode())


def get_worker_states(name, wid):
stats = get_uwsgi_stats(name, wid)
if 'workers' not in stats:
log.error(
"Error: No workers found for WID {0} of {1}"
Expand All @@ -62,12 +66,30 @@ def get_worker_status(name, wid):
def wait_for_workers(name, wid, state, timeout_seconds=60):
timeout = time() + timeout_seconds
while not all(worker.lower() in worker_states[state]
for worker in get_worker_status(name, wid)):
for worker in get_worker_states(name, wid)):
if timeout_seconds and time() > timeout:
raise Exception('timeout')
sleep(0.25)


def extended_stats(watcher, arbiter, hook_name, pid, stats, **kwargs):
name = watcher.name
wid = watcher.processes[pid].wid
uwsgi_stats = get_uwsgi_stats(name, wid)
for k in ('load', 'version'):
if k in uwsgi_stats:
stats[k] = uwsgi_stats[k]
if 'children' in stats and 'workers' in uwsgi_stats:
workers = dict((worker['pid'], worker) for worker in uwsgi_stats['workers'])
for worker in stats['children']:
uwsgi_worker = workers.get(worker['pid'])
if uwsgi_worker:
for k in ('exceptions', 'harakiri_count', 'requests', 'respawn_count', 'status', 'tx'):
if k in uwsgi_worker:
worker[k] = uwsgi_worker[k]
return True


def uwsgi_clean_stop(watcher, arbiter, hook_name, pid, signum, **kwargs):
if len(watcher.processes) > 1 and signum == signal.SIGQUIT:
wid = watcher.processes[pid].wid
Expand Down

0 comments on commit a2d2181

Please sign in to comment.