diff --git a/mutornadomon/collectors/ioloop_util.py b/mutornadomon/collectors/ioloop_util.py index 14a13ed..6f8149e 100644 --- a/mutornadomon/collectors/ioloop_util.py +++ b/mutornadomon/collectors/ioloop_util.py @@ -1,11 +1,11 @@ import time - class UtilizationCollector(object): """Collects stats for overall callback durations""" def __init__(self, monitor): self.monitor = monitor + self.monitor.stats_init = False def start(self): self.original_run_callback = self.monitor.io_loop._run_callback @@ -21,8 +21,19 @@ def run_timed_callback(callback): def add_timed_handler(fd, handler, events): def timed_handler(*args, **kwargs): + profiler_enabled = False start_time = time.time() + + if self.monitor.stats_init == True: + self.monitor.profiler.enable() + profiler_enabled = True + result = handler(*args, **kwargs) + + if profiler_enabled == True: + self.monitor.profiler.disable() + profiler_enabled = False + duration = (time.time() - start_time) self.monitor.count('callback_duration', duration) diff --git a/mutornadomon/config.py b/mutornadomon/config.py index 2899fee..ae7427c 100644 --- a/mutornadomon/config.py +++ b/mutornadomon/config.py @@ -3,13 +3,12 @@ from mutornadomon import MuTornadoMon from mutornadomon.external_interfaces.publish import PublishExternalInterface -from mutornadomon.external_interfaces.http_endpoints import HTTPEndpointExternalInterface +from mutornadomon.external_interfaces.http_endpoints import HTTPEndpointExternalInterface, HTTPEndpointMuTornadoMonTracer from mutornadomon.collectors.web import WebCollector from mutornadomon.collectors.ioloop_util import UtilizationCollector - def initialize_mutornadomon(tornado_app=None, publisher=None, publish_interval=None, - host_limit=None, request_filter=None, **monitor_config): + host_limit=None, request_filter=None, tracer_port=None, **monitor_config): """Register mutornadomon to get Tornado request metrics""" if not publisher and not tornado_app: raise ValueError('Must pass at least one of `publisher` and `tornado_app`') @@ -26,6 +25,10 @@ def initialize_mutornadomon(tornado_app=None, publisher=None, publish_interval=N monitor = MuTornadoMon(external_interface, **monitor_config) monitor.start() + # If tracer_port is not provided then don't start the tracer + if tracer_port != None: + HTTPEndpointMuTornadoMonTracer(monitor, request_filter, tracer_port) + if tornado_app: web_collector = WebCollector(monitor, tornado_app) web_collector.start() diff --git a/mutornadomon/external_interfaces/http_endpoints.py b/mutornadomon/external_interfaces/http_endpoints.py index 1bb6515..ab684aa 100644 --- a/mutornadomon/external_interfaces/http_endpoints.py +++ b/mutornadomon/external_interfaces/http_endpoints.py @@ -1,7 +1,16 @@ import tornado - +import cProfile, pstats +import time +from tornado import gen +import logging from mutornadomon import net +try: + from StringIO import StringIO +except ImportError: + from io import StringIO + +logger = logging.getLogger('mutornadomon') def LOCALHOST(request): if not net.is_local_address(request.remote_ip): @@ -11,6 +20,92 @@ def LOCALHOST(request): return True return False +class HTTPEndpointMuTornadoMonTracer(object): + """Handles external HTTP requests for tracer""" + + def __init__(self, monitor, request_filter, server_port): + + if request_filter is None: + self.request_filter = LOCALHOST + else: + self.request_filter = request_filter + + self.tracer_app = tornado.web.Application([ + (r'/', TornadoStatsHandler, { + 'monitor': monitor, + 'request_filter': self.request_filter + }), + ]) + + self.tracer_app.listen(server_port) + logger.info('MuTornadoMon Tracer Listening on port %s', server_port) + + def start(self): + pass + + def stop(self): + pass + + +class TornadoStatsHandler(tornado.web.RequestHandler): + """ + Start & return the Tornado IOLoop stack trace. + Tracing will be started when the url end point is hit & stopped after + waittime or default trace collection time expires + Params for the url are + :param sortby: specifies how the traces will be sorted + (ex: tottime or cumtime) + :param waittime: specifies how long the traces will be collected (msec) + + ex: curl "localhost:5951/?sortby=cumtime&&waittime=4000" + """ + + def initialize(self, monitor, request_filter): + self.monitor = monitor + self.request_filter = request_filter + + def prepare(self): + if not self.request_filter(self.request): + self.send_error(403) + + @gen.coroutine + def get(self): + # Dictates how the stack trace is sorted + if 'sortby' in self.request.arguments: + sortby = self.request.arguments['sortby'][0] + else: + sortby = 'tottime' + + # Wait time(msec) indicates for how long the trace is collected + if 'waittime' in self.request.arguments: + wait_time = float(self.request.arguments['waittime'][0])/1000 + else: + wait_time = 3.0 + + # If collecting trace is not started, start it + if self.monitor.stats_init == False: + self.write("Trace collected for " + str(wait_time * 1000) + " msec\n") + self.monitor.stats_init = True + self.monitor.profiler = cProfile.Profile() + yield gen.Task(self.monitor.io_loop.add_timeout, time.time() + wait_time) + + # disable collecting trace + self.monitor.stats_init = False + + # Stats fails if there is no trace collected + try: + strm = StringIO() + ps = pstats.Stats(self.monitor.profiler, + stream=strm) + except TypeError: + self.write("No trace collected") + return + + if ps is not None: + ps.sort_stats(sortby) + ps.print_stats() + self.write(strm.getvalue()) + class StatusHandler(tornado.web.RequestHandler): @@ -48,6 +143,5 @@ def start(self, monitor): 'request_filter': self.request_filter }) ]) - def stop(self): pass diff --git a/tests/test_basic.py b/tests/test_basic.py index 8e39f25..2040aab 100644 --- a/tests/test_basic.py +++ b/tests/test_basic.py @@ -5,9 +5,14 @@ from tornado.ioloop import IOLoop import tornado.testing from mutornadomon.config import initialize_mutornadomon +from tornado.httpclient import AsyncHTTPClient +import pstats, cProfile from six import b - +try: + from StringIO import StringIO +except ImportError: + from io import StringIO class HeloHandler(tornado.web.RequestHandler): def get(self): @@ -69,3 +74,32 @@ def test_publisher_called(self, mock_num_threads): ) self.assertEqual(metrics['process']['cpu']['num_threads'], 5) assert metrics['process']['cpu']['system_time'] < 1.0 + +class TestTracer(tornado.testing.AsyncTestCase): + + def setUp(self): + super(TestTracer, self).setUp() + self.publisher = mock.Mock(return_value=None) + self.io_loop = IOLoop.current() + self.monitor = initialize_mutornadomon(io_loop=self.io_loop, publisher=self.publisher, tracer_port=5989) + + def tearDown(self): + super(TestTracer, self).tearDown() + self.monitor.stop() + + @tornado.testing.gen_test + @mock.patch.object(psutil.Process, 'num_threads', autospec=True, return_value=5) + def test_tracer_endpoint(self, mock_num_threads): + client = AsyncHTTPClient(self.io_loop) + resp = yield client.fetch("http://localhost:5989") + + trace_str = "Trace collected for" + self.assertTrue(trace_str in str(resp.body)) + + @tornado.testing.gen_test + def test_tracer_endpoint_params(self): + client = AsyncHTTPClient(self.io_loop) + trace_str = "Trace collected for" + + resp = yield client.fetch("http://localhost:5989/?sortby=cumtime&&waittime=2000") + self.assertTrue(trace_str in str(resp.body), msg='{0}'.format(str(resp.body))) diff --git a/tests/test_config.py b/tests/test_config.py index 9118892..72640c3 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -120,12 +120,49 @@ def test_initialize_mutornadomon_raises_when_no_publisher_and_no_app(self, perio self.assertRaises(ValueError, initialize_mutornadomon, io_loop=tornado.ioloop.IOLoop.current()) + @mock.patch('psutil.Process') + @mock.patch('os.getpid') @mock.patch('mutornadomon.external_interfaces.PublishExternalInterface') - def test_initialize_MuTornadoMon(self, pub_ext_iface_mock): + def test_MuTornadoMon(self, pub_ext_iface_mock, mock_os, mock_ps): external_interface = pub_ext_iface_mock.return_value monitor = MuTornadoMon(external_interface) - #check if no exceptions are raised monitor.start() + + stat = 'test' + val = 2 + + # __COUNTERS[stat] = 2 + monitor.count(stat, val) + + # __COUNTERS[stat] = 3 + monitor.count(stat) + self.assertEqual(monitor.metrics['counters'][stat], 3) + + + monitor.kv(stat, 1) + self.assertEqual(monitor.metrics['gauges'][stat], 1) + + # To make sure no exceptions are thrown monitor._cb() - monitor.stop() + monitor._monkey_patch_ioloop_exceptions() + + monitor.__del__() + + @mock.patch('tornado.ioloop') + def test_initialize_PublishExternalInterface(self, mock_ioloop): + + def publisher(monitor): + pass + + pub_inst = PublishExternalInterface(publisher=publisher) + monitor = mock.MagicMock() + pub_inst.start(monitor) + self.assertTrue(pub_inst.publish_callback != None) + + self.assertRaises(ValueError, pub_inst.start, monitor=monitor) + + pub_inst._publish(monitor) + + pub_inst.stop() + self.assertTrue(pub_inst.publish_callback == None)