Skip to content

Commit

Permalink
Merge pull request #25 from uber/stack_trace
Browse files Browse the repository at this point in the history
Enabled Tornado IOLoop stack tracing
  • Loading branch information
thepaulm committed Jul 22, 2016
2 parents dfcddf8 + fcc1d43 commit 5cf8a62
Show file tree
Hide file tree
Showing 5 changed files with 189 additions and 10 deletions.
13 changes: 12 additions & 1 deletion mutornadomon/collectors/ioloop_util.py
@@ -1,11 +1,11 @@
import time


class UtilizationCollector(object):
"""Collects stats for overall callback durations"""

def __init__(self, monitor):
self.monitor = monitor
self.monitor.stats_init = False

def start(self):
self.original_run_callback = self.monitor.io_loop._run_callback
Expand All @@ -21,8 +21,19 @@ def run_timed_callback(callback):

def add_timed_handler(fd, handler, events):
def timed_handler(*args, **kwargs):
profiler_enabled = False
start_time = time.time()

if self.monitor.stats_init == True:
self.monitor.profiler.enable()
profiler_enabled = True

result = handler(*args, **kwargs)

if profiler_enabled == True:
self.monitor.profiler.disable()
profiler_enabled = False

duration = (time.time() - start_time)
self.monitor.count('callback_duration', duration)

Expand Down
9 changes: 6 additions & 3 deletions mutornadomon/config.py
Expand Up @@ -3,13 +3,12 @@

from mutornadomon import MuTornadoMon
from mutornadomon.external_interfaces.publish import PublishExternalInterface
from mutornadomon.external_interfaces.http_endpoints import HTTPEndpointExternalInterface
from mutornadomon.external_interfaces.http_endpoints import HTTPEndpointExternalInterface, HTTPEndpointMuTornadoMonTracer
from mutornadomon.collectors.web import WebCollector
from mutornadomon.collectors.ioloop_util import UtilizationCollector


def initialize_mutornadomon(tornado_app=None, publisher=None, publish_interval=None,
host_limit=None, request_filter=None, **monitor_config):
host_limit=None, request_filter=None, tracer_port=None, **monitor_config):
"""Register mutornadomon to get Tornado request metrics"""
if not publisher and not tornado_app:
raise ValueError('Must pass at least one of `publisher` and `tornado_app`')
Expand All @@ -26,6 +25,10 @@ def initialize_mutornadomon(tornado_app=None, publisher=None, publish_interval=N
monitor = MuTornadoMon(external_interface, **monitor_config)
monitor.start()

# If tracer_port is not provided then don't start the tracer
if tracer_port != None:
HTTPEndpointMuTornadoMonTracer(monitor, request_filter, tracer_port)

if tornado_app:
web_collector = WebCollector(monitor, tornado_app)
web_collector.start()
Expand Down
98 changes: 96 additions & 2 deletions mutornadomon/external_interfaces/http_endpoints.py
@@ -1,7 +1,16 @@
import tornado

import cProfile, pstats
import time
from tornado import gen
import logging
from mutornadomon import net

try:
from StringIO import StringIO
except ImportError:
from io import StringIO

logger = logging.getLogger('mutornadomon')

def LOCALHOST(request):
if not net.is_local_address(request.remote_ip):
Expand All @@ -11,6 +20,92 @@ def LOCALHOST(request):
return True
return False

class HTTPEndpointMuTornadoMonTracer(object):
"""Handles external HTTP requests for tracer"""

def __init__(self, monitor, request_filter, server_port):

if request_filter is None:
self.request_filter = LOCALHOST
else:
self.request_filter = request_filter

self.tracer_app = tornado.web.Application([
(r'/', TornadoStatsHandler, {
'monitor': monitor,
'request_filter': self.request_filter
}),
])

self.tracer_app.listen(server_port)
logger.info('MuTornadoMon Tracer Listening on port %s', server_port)

def start(self):
pass

def stop(self):
pass


class TornadoStatsHandler(tornado.web.RequestHandler):
"""
Start & return the Tornado IOLoop stack trace.
Tracing will be started when the url end point is hit & stopped after
waittime or default trace collection time expires
Params for the url are
:param sortby: specifies how the traces will be sorted
(ex: tottime or cumtime)
:param waittime: specifies how long the traces will be collected (msec)
ex: curl "localhost:5951/?sortby=cumtime&&waittime=4000"
"""

def initialize(self, monitor, request_filter):
self.monitor = monitor
self.request_filter = request_filter

def prepare(self):
if not self.request_filter(self.request):
self.send_error(403)

@gen.coroutine
def get(self):
# Dictates how the stack trace is sorted
if 'sortby' in self.request.arguments:
sortby = self.request.arguments['sortby'][0]
else:
sortby = 'tottime'

# Wait time(msec) indicates for how long the trace is collected
if 'waittime' in self.request.arguments:
wait_time = float(self.request.arguments['waittime'][0])/1000
else:
wait_time = 3.0

# If collecting trace is not started, start it
if self.monitor.stats_init == False:
self.write("Trace collected for " + str(wait_time * 1000) + " msec\n")
self.monitor.stats_init = True
self.monitor.profiler = cProfile.Profile()
yield gen.Task(self.monitor.io_loop.add_timeout, time.time() + wait_time)

# disable collecting trace
self.monitor.stats_init = False

# Stats fails if there is no trace collected
try:
strm = StringIO()
ps = pstats.Stats(self.monitor.profiler,
stream=strm)
except TypeError:
self.write("No trace collected")
return

if ps is not None:
ps.sort_stats(sortby)
ps.print_stats()
self.write(strm.getvalue())


class StatusHandler(tornado.web.RequestHandler):

Expand Down Expand Up @@ -48,6 +143,5 @@ def start(self, monitor):
'request_filter': self.request_filter
})
])

def stop(self):
pass
36 changes: 35 additions & 1 deletion tests/test_basic.py
Expand Up @@ -5,9 +5,14 @@
from tornado.ioloop import IOLoop
import tornado.testing
from mutornadomon.config import initialize_mutornadomon
from tornado.httpclient import AsyncHTTPClient
import pstats, cProfile

from six import b

try:
from StringIO import StringIO
except ImportError:
from io import StringIO

class HeloHandler(tornado.web.RequestHandler):
def get(self):
Expand Down Expand Up @@ -69,3 +74,32 @@ def test_publisher_called(self, mock_num_threads):
)
self.assertEqual(metrics['process']['cpu']['num_threads'], 5)
assert metrics['process']['cpu']['system_time'] < 1.0

class TestTracer(tornado.testing.AsyncTestCase):

def setUp(self):
super(TestTracer, self).setUp()
self.publisher = mock.Mock(return_value=None)
self.io_loop = IOLoop.current()
self.monitor = initialize_mutornadomon(io_loop=self.io_loop, publisher=self.publisher, tracer_port=5989)

def tearDown(self):
super(TestTracer, self).tearDown()
self.monitor.stop()

@tornado.testing.gen_test
@mock.patch.object(psutil.Process, 'num_threads', autospec=True, return_value=5)
def test_tracer_endpoint(self, mock_num_threads):
client = AsyncHTTPClient(self.io_loop)
resp = yield client.fetch("http://localhost:5989")

trace_str = "Trace collected for"
self.assertTrue(trace_str in str(resp.body))

@tornado.testing.gen_test
def test_tracer_endpoint_params(self):
client = AsyncHTTPClient(self.io_loop)
trace_str = "Trace collected for"

resp = yield client.fetch("http://localhost:5989/?sortby=cumtime&&waittime=2000")
self.assertTrue(trace_str in str(resp.body), msg='{0}'.format(str(resp.body)))
43 changes: 40 additions & 3 deletions tests/test_config.py
Expand Up @@ -120,12 +120,49 @@ def test_initialize_mutornadomon_raises_when_no_publisher_and_no_app(self, perio
self.assertRaises(ValueError, initialize_mutornadomon,
io_loop=tornado.ioloop.IOLoop.current())

@mock.patch('psutil.Process')
@mock.patch('os.getpid')
@mock.patch('mutornadomon.external_interfaces.PublishExternalInterface')
def test_initialize_MuTornadoMon(self, pub_ext_iface_mock):
def test_MuTornadoMon(self, pub_ext_iface_mock, mock_os, mock_ps):
external_interface = pub_ext_iface_mock.return_value
monitor = MuTornadoMon(external_interface)

#check if no exceptions are raised
monitor.start()

stat = 'test'
val = 2

# __COUNTERS[stat] = 2
monitor.count(stat, val)

# __COUNTERS[stat] = 3
monitor.count(stat)
self.assertEqual(monitor.metrics['counters'][stat], 3)


monitor.kv(stat, 1)
self.assertEqual(monitor.metrics['gauges'][stat], 1)

# To make sure no exceptions are thrown
monitor._cb()
monitor.stop()
monitor._monkey_patch_ioloop_exceptions()

monitor.__del__()

@mock.patch('tornado.ioloop')
def test_initialize_PublishExternalInterface(self, mock_ioloop):

def publisher(monitor):
pass

pub_inst = PublishExternalInterface(publisher=publisher)
monitor = mock.MagicMock()
pub_inst.start(monitor)
self.assertTrue(pub_inst.publish_callback != None)

self.assertRaises(ValueError, pub_inst.start, monitor=monitor)

pub_inst._publish(monitor)

pub_inst.stop()
self.assertTrue(pub_inst.publish_callback == None)

0 comments on commit 5cf8a62

Please sign in to comment.