Permalink
Browse files

rework logging connections

  • Loading branch information...
1 parent 8d078bc commit c221efd01bf710aa2ca29a421ce458be377dc8f7 @minrk minrk committed Feb 9, 2011
@@ -480,12 +480,11 @@ def start_logging(self):
open_log_file = None
else:
open_log_file = sys.stdout
- logger = logging.getLogger()
- level = self.log_level
- self.log = logger
- # since we've reconnected the logger, we need to reconnect the log-level
- self.log_level = level
- if open_log_file is not None and self._log_handler not in self.log.handlers:
+ if open_log_file is not None:
+ self.log.removeHandler(self._log_handler)
+ self._log_handler = logging.StreamHandler(open_log_file)
+ self._log_formatter = logging.Formatter("[%(name)s] %(message)s")
+ self._log_handler.setFormatter(self._log_formatter)
self.log.addHandler(self._log_handler)
# log.startLogging(open_log_file)
@@ -44,14 +44,12 @@ class ControllerFactory(HubFactory):
children = List()
mq_class = Str('zmq.devices.ProcessMonitoredQueue')
- def _update_mq(self):
- self.mq_class = 'zmq.devices.%sMonitoredQueue'%('Thread' if self.usethreads else 'Process')
+ def _usethreads_changed(self, name, old, new):
+ self.mq_class = 'zmq.devices.%sMonitoredQueue'%('Thread' if new else 'Process')
def __init__(self, **kwargs):
super(ControllerFactory, self).__init__(**kwargs)
self.subconstructors.append(self.construct_schedulers)
- self._update_mq()
- self.on_trait_change(self._update_mq, 'usethreads')
def start(self):
super(ControllerFactory, self).start()
@@ -91,18 +89,18 @@ def construct_schedulers(self):
children.append(q)
# Task Queue (in a Process)
if self.scheme == 'pure':
- logging.warn("task::using pure XREQ Task scheduler")
+ self.log.warn("task::using pure XREQ Task scheduler")
q = mq(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask')
q.bind_in(self.client_addrs['task'])
q.bind_out(self.engine_addrs['task'])
q.connect_mon(self.monitor_url)
q.daemon=True
children.append(q)
elif self.scheme == 'none':
- logging.warn("task::using no Task scheduler")
+ self.log.warn("task::using no Task scheduler")
else:
- logging.warn("task::using Python %s Task scheduler"%self.scheme)
+ self.log.warn("task::using Python %s Task scheduler"%self.scheme)
sargs = (self.client_addrs['task'], self.engine_addrs['task'], self.monitor_url, self.client_addrs['notification'])
q = Process(target=launch_scheduler, args=sargs, kwargs = dict(scheme=self.scheme))
q.daemon=True
@@ -25,8 +25,8 @@
import heartmonitor
def printer(*msg):
- # print (logging.handlers, file=sys.__stdout__)
- logging.info(str(msg))
+ # print (self.log.handlers, file=sys.__stdout__)
+ self.log.info(str(msg))
class EngineFactory(RegistrationFactory):
"""IPython engine"""
@@ -54,7 +54,7 @@ def __init__(self, **kwargs):
def register(self):
"""send the registration_request"""
- logging.info("registering")
+ self.log.info("registering")
content = dict(queue=self.ident, heartbeat=self.ident, control=self.ident)
self.registrar.on_recv(self.complete_registration)
# print (self.session.key)
@@ -112,10 +112,9 @@ def complete_registration(self, msg):
sys.displayhook = self.display_hook_factory(self.session, iopub_stream)
sys.displayhook.topic = 'engine.%i.pyout'%self.id
- self.kernel = Kernel(int_id=self.id, ident=self.ident, session=self.session,
- control_stream=control_stream,
- shell_streams=shell_streams, iopub_stream=iopub_stream, loop=loop,
- user_ns = self.user_ns, config=self.config)
+ self.kernel = Kernel(config=self.config, int_id=self.id, ident=self.ident, session=self.session,
+ control_stream=control_stream, shell_streams=shell_streams, iopub_stream=iopub_stream,
+ loop=loop, user_ns = self.user_ns, logname=self.log.name)
self.kernel.start()
heart = heartmonitor.Heart(*map(str, hb_addrs), heart_id=identity)
@@ -124,10 +123,10 @@ def complete_registration(self, msg):
else:
- logging.error("Registration Failed: %s"%msg)
+ self.log.error("Registration Failed: %s"%msg)
raise Exception("Registration Failed: %s"%msg)
- logging.info("Completed registration with id %i"%self.id)
+ self.log.info("Completed registration with id %i"%self.id)
def unregister(self):
@@ -79,8 +79,8 @@ def integer_loglevel(loglevel):
loglevel = getattr(logging, loglevel)
return loglevel
-def connect_logger(context, iface, root="ip", loglevel=logging.DEBUG):
- logger = logging.getLogger()
+def connect_logger(logname, context, iface, root="ip", loglevel=logging.DEBUG):
+ logger = logging.getLogger(logname)
if any([isinstance(h, handlers.PUBHandler) for h in logger.handlers]):
# don't add a second PUBHandler
return
@@ -106,9 +106,9 @@ def connect_engine_logger(context, iface, engine, loglevel=logging.DEBUG):
logger.addHandler(handler)
logger.setLevel(loglevel)
-def local_logger(loglevel=logging.DEBUG):
+def local_logger(logname, loglevel=logging.DEBUG):
loglevel = integer_loglevel(loglevel)
- logger = logging.getLogger()
+ logger = logging.getLogger(logname)
if any([isinstance(h, logging.StreamHandler) for h in logger.handlers]):
# don't add a second StreamHandler
return
@@ -29,9 +29,15 @@
#-----------------------------------------------------------------------------
# Classes
#-----------------------------------------------------------------------------
+class LoggingFactory(Configurable):
+ """A most basic class, that has a `log` (type:`Logger`) attribute, set via a `logname` Trait."""
+ log = Instance('logging.Logger', ('ZMQ', logging.WARN))
+ logname = CStr('ZMQ')
+ def _logname_changed(self, name, old, new):
+ self.log = logging.getLogger(new)
+
-
-class SessionFactory(Configurable):
+class SessionFactory(LoggingFactory):
"""The Base factory from which every factory in IPython.zmq.parallel inherits"""
packer = Str('',config=True)
@@ -41,14 +47,14 @@ def _ident_default(self):
return str(uuid.uuid4())
username = Str(os.environ.get('USER','username'),config=True)
exec_key = CUnicode('',config=True)
-
# not configurable:
context = Instance('zmq.Context', (), {})
session = Instance('IPython.zmq.parallel.streamsession.StreamSession')
- loop = Instance('zmq.eventloop.ioloop.IOLoop')
+ loop = Instance('zmq.eventloop.ioloop.IOLoop', allow_none=False)
def _loop_default(self):
return IOLoop.instance()
+
def __init__(self, **kwargs):
super(SessionFactory, self).__init__(**kwargs)
@@ -13,6 +13,9 @@
from zmq.devices import ProcessDevice,ThreadDevice
from zmq.eventloop import ioloop, zmqstream
+from IPython.utils.traitlets import Set, Instance, CFloat, Bool
+from factory import LoggingFactory
+
class Heart(object):
"""A basic heart object for responding to a HeartMonitor.
This is a simple wrapper with defaults for the most common
@@ -39,49 +42,48 @@ def __init__(self, in_addr, out_addr, in_type=zmq.SUB, out_type=zmq.XREQ, heart_
def start(self):
return self.device.start()
-class HeartMonitor(object):
+class HeartMonitor(LoggingFactory):
"""A basic HeartMonitor class
pingstream: a PUB stream
pongstream: an XREP stream
period: the period of the heartbeat in milliseconds"""
- loop=None
- pingstream=None
- pongstream=None
- period=None
- hearts=None
- on_probation=None
- last_ping=None
- # debug=False
- def __init__(self, loop, pingstream, pongstream, period=1000):
- self.loop = loop
- self.period = period
+ period=CFloat(1000, config=True) # in milliseconds
+
+ pingstream=Instance('zmq.eventloop.zmqstream.ZMQStream')
+ pongstream=Instance('zmq.eventloop.zmqstream.ZMQStream')
+ loop = Instance('zmq.eventloop.ioloop.IOLoop')
+ def _loop_default(self):
+ return ioloop.IOLoop.instance()
+ debug=Bool(False)
+
+ # not settable:
+ hearts=Set()
+ responses=Set()
+ on_probation=Set()
+ last_ping=CFloat(0)
+ _new_handlers = Set()
+ _failure_handlers = Set()
+ lifetime = CFloat(0)
+ tic = CFloat(0)
+
+ def __init__(self, **kwargs):
+ super(HeartMonitor, self).__init__(**kwargs)
- self.pingstream = pingstream
- self.pongstream = pongstream
self.pongstream.on_recv(self.handle_pong)
-
- self.hearts = set()
- self.responses = set()
- self.on_probation = set()
- self.lifetime = 0
- self.tic = time.time()
-
- self._new_handlers = set()
- self._failure_handlers = set()
def start(self):
self.caller = ioloop.PeriodicCallback(self.beat, self.period, self.loop)
self.caller.start()
def add_new_heart_handler(self, handler):
"""add a new handler for new hearts"""
- logging.debug("heartbeat::new_heart_handler: %s"%handler)
+ self.log.debug("heartbeat::new_heart_handler: %s"%handler)
self._new_handlers.add(handler)
def add_heart_failure_handler(self, handler):
"""add a new handler for heart failure"""
- logging.debug("heartbeat::new heart failure handler: %s"%handler)
+ self.log.debug("heartbeat::new heart failure handler: %s"%handler)
self._failure_handlers.add(handler)
def beat(self):
@@ -91,7 +93,7 @@ def beat(self):
toc = time.time()
self.lifetime += toc-self.tic
self.tic = toc
- # logging.debug("heartbeat::%s"%self.lifetime)
+ # self.log.debug("heartbeat::%s"%self.lifetime)
goodhearts = self.hearts.intersection(self.responses)
missed_beats = self.hearts.difference(goodhearts)
heartfailures = self.on_probation.intersection(missed_beats)
@@ -101,15 +103,15 @@ def beat(self):
self.on_probation = missed_beats.intersection(self.hearts)
self.responses = set()
# print self.on_probation, self.hearts
- # logging.debug("heartbeat::beat %.3f, %i beating hearts"%(self.lifetime, len(self.hearts)))
+ # self.log.debug("heartbeat::beat %.3f, %i beating hearts"%(self.lifetime, len(self.hearts)))
self.pingstream.send(str(self.lifetime))
def handle_new_heart(self, heart):
if self._new_handlers:
for handler in self._new_handlers:
handler(heart)
else:
- logging.info("heartbeat::yay, got new heart %s!"%heart)
+ self.log.info("heartbeat::yay, got new heart %s!"%heart)
self.hearts.add(heart)
def handle_heart_failure(self, heart):
@@ -118,25 +120,25 @@ def handle_heart_failure(self, heart):
try:
handler(heart)
except Exception as e:
- logging.error("heartbeat::Bad Handler! %s"%handler, exc_info=True)
+ self.log.error("heartbeat::Bad Handler! %s"%handler, exc_info=True)
pass
else:
- logging.info("heartbeat::Heart %s failed :("%heart)
+ self.log.info("heartbeat::Heart %s failed :("%heart)
self.hearts.remove(heart)
def handle_pong(self, msg):
"a heart just beat"
if msg[1] == str(self.lifetime):
delta = time.time()-self.tic
- # logging.debug("heartbeat::heart %r took %.2f ms to respond"%(msg[0], 1000*delta))
+ # self.log.debug("heartbeat::heart %r took %.2f ms to respond"%(msg[0], 1000*delta))
self.responses.add(msg[0])
elif msg[1] == str(self.last_ping):
delta = time.time()-self.tic + (self.lifetime-self.last_ping)
- logging.warn("heartbeat::heart %r missed a beat, and took %.2f ms to respond"%(msg[0], 1000*delta))
+ self.log.warn("heartbeat::heart %r missed a beat, and took %.2f ms to respond"%(msg[0], 1000*delta))
self.responses.add(msg[0])
else:
- logging.warn("heartbeat::got bad heartbeat (possibly old?): %s (current=%.3f)"%
+ self.log.warn("heartbeat::got bad heartbeat (possibly old?): %s (current=%.3f)"%
(msg[1],self.lifetime))
Oops, something went wrong.

0 comments on commit c221efd

Please sign in to comment.