Skip to content
This repository
Browse code

improved logging + Hub,Engine,Scheduler are Configurable

  • Loading branch information...
commit 600411fc4546d469bae7c722f144b968d8a08959 1 parent 8554e33
Min RK minrk authored
4 IPython/zmq/log.py
@@ -21,7 +21,3 @@ def root_topic(self):
21 21 else:
22 22 return "engine"
23 23
24   -
25   -logger = logging.getLogger('ipzmq')
26   -logger.setLevel(logging.DEBUG)
27   -
9 IPython/zmq/parallel/client.py
@@ -90,11 +90,6 @@ def defaultblock(f, self, *args, **kwargs):
90 90 # Classes
91 91 #--------------------------------------------------------------------------
92 92
93   -class AbortedTask(object):
94   - """A basic wrapper object describing an aborted task."""
95   - def __init__(self, msg_id):
96   - self.msg_id = msg_id
97   -
98 93 class ResultDict(dict):
99 94 """A subclass of dict that raises errors if it has them."""
100 95 def __getitem__(self, key):
@@ -332,10 +327,10 @@ def connect_socket(s, addr):
332 327 msg = ss.Message(msg)
333 328 content = msg.content
334 329 if content.status == 'ok':
335   - if content.queue:
  330 + if content.mux:
336 331 self._mux_socket = self.context.socket(zmq.PAIR)
337 332 self._mux_socket.setsockopt(zmq.IDENTITY, self.session.session)
338   - connect_socket(self._mux_socket, content.queue)
  333 + connect_socket(self._mux_socket, content.mux)
339 334 if content.task:
340 335 self._task_socket = self.context.socket(zmq.PAIR)
341 336 self._task_socket.setsockopt(zmq.IDENTITY, self.session.session)
25 IPython/zmq/parallel/controller.py
@@ -17,6 +17,7 @@
17 17
18 18 import os
19 19 import time
  20 +import logging
20 21 from multiprocessing import Process
21 22
22 23 import zmq
@@ -29,7 +30,8 @@
29 30
30 31 from hub import Hub
31 32 from entry_point import (make_base_argument_parser, select_random_ports, split_ports,
32   - connect_logger, parse_url, signal_children, generate_exec_key)
  33 + connect_logger, parse_url, signal_children, generate_exec_key,
  34 + local_logger)
33 35
34 36
35 37 import streamsession as session
@@ -118,8 +120,6 @@ def main(argv=None):
118 120 ctx = zmq.Context()
119 121 loop = ioloop.IOLoop.instance()
120 122
121   - # setup logging
122   - connect_logger(ctx, iface%args.logport, root="controller", loglevel=args.loglevel)
123 123
124 124 # Registrar socket
125 125 reg = ZMQStream(ctx.socket(zmq.XREP), loop)
@@ -207,7 +207,9 @@ def main(argv=None):
207 207 q.start()
208 208 children.append(q.launcher)
209 209 else:
210   - sargs = (iface%task[0],iface%task[1],iface%monport,iface%nport,args.scheduler)
  210 + log_addr = iface%args.logport if args.logport else None
  211 + sargs = (iface%task[0], iface%task[1], iface%monport, iface%nport,
  212 + log_addr, args.loglevel, args.scheduler)
211 213 print (sargs)
212 214 q = Process(target=launch_scheduler, args=sargs)
213 215 q.daemon=True
@@ -224,7 +226,7 @@ def main(argv=None):
224 226 # build connection dicts
225 227 engine_addrs = {
226 228 'control' : iface%control[1],
227   - 'queue': iface%mux[1],
  229 + 'mux': iface%mux[1],
228 230 'heartbeat': (iface%hb[0], iface%hb[1]),
229 231 'task' : iface%task[1],
230 232 'iopub' : iface%iopub[1],
@@ -234,15 +236,24 @@ def main(argv=None):
234 236 client_addrs = {
235 237 'control' : iface%control[0],
236 238 'query': iface%cport,
237   - 'queue': iface%mux[0],
  239 + 'mux': iface%mux[0],
238 240 'task' : iface%task[0],
239 241 'iopub' : iface%iopub[0],
240 242 'notification': iface%nport
241 243 }
242 244
  245 + # setup logging
  246 + if args.logport:
  247 + connect_logger(ctx, iface%args.logport, root="controller", loglevel=args.loglevel)
  248 + else:
  249 + local_logger(args.loglevel)
  250 +
243 251 # register relay of signals to the children
244 252 signal_children(children)
245   - hub = Hub(loop, thesession, sub, reg, hmon, c, n, db, engine_addrs, client_addrs)
  253 + hub = Hub(loop=loop, session=thesession, monitor=sub, heartmonitor=hmon,
  254 + registrar=reg, clientele=c, notifier=n, db=db,
  255 + engine_addrs=engine_addrs, client_addrs=client_addrs)
  256 +
246 257 dc = ioloop.DelayedCallback(lambda : print("Controller started..."), 100, loop)
247 258 dc.start()
248 259 loop.start()
79 IPython/zmq/parallel/engine.py
@@ -8,49 +8,48 @@
8 8 import time
9 9 import traceback
10 10 import uuid
  11 +import logging
11 12 from pprint import pprint
12 13
13 14 import zmq
14 15 from zmq.eventloop import ioloop, zmqstream
15 16
16   -from IPython.utils.traitlets import HasTraits
17   -from IPython.utils.localinterfaces import LOCALHOST
  17 +# internal
  18 +from IPython.config.configurable import Configurable
  19 +from IPython.utils.traitlets import Instance, Str, Dict
  20 +# from IPython.utils.localinterfaces import LOCALHOST
18 21
19 22 from streamsession import Message, StreamSession
20   -from client import Client
21 23 from streamkernel import Kernel, make_kernel
22 24 import heartmonitor
23   -from entry_point import make_base_argument_parser, connect_logger, parse_url
  25 +from entry_point import (make_base_argument_parser, connect_engine_logger, parse_url,
  26 + local_logger)
24 27 # import taskthread
25   -# from log import logger
26   -
  28 +logger = logging.getLogger()
27 29
28 30 def printer(*msg):
29   - pprint(msg, stream=sys.__stdout__)
  31 + # print (logger.handlers, file=sys.__stdout__)
  32 + logger.info(str(msg))
30 33
31   -class Engine(object):
  34 +class Engine(Configurable):
32 35 """IPython engine"""
33 36
34   - id=None
35   - context=None
36   - loop=None
37   - session=None
38   - ident=None
39   - registrar=None
40   - heart=None
41 37 kernel=None
42   - user_ns=None
  38 + id=None
43 39
44   - def __init__(self, context, loop, session, registrar, client=None, ident=None,
45   - heart_id=None, user_ns=None):
46   - self.context = context
47   - self.loop = loop
48   - self.session = session
49   - self.registrar = registrar
50   - self.client = client
51   - self.ident = ident if ident else str(uuid.uuid4())
  40 + # configurables:
  41 + context=Instance(zmq.Context)
  42 + loop=Instance(ioloop.IOLoop)
  43 + session=Instance(StreamSession)
  44 + ident=Str()
  45 + registrar=Instance(zmqstream.ZMQStream)
  46 + user_ns=Dict()
  47 +
  48 + def __init__(self, **kwargs):
  49 + super(Engine, self).__init__(**kwargs)
  50 + if not self.ident:
  51 + self.ident = str(uuid.uuid4())
52 52 self.registrar.on_send(printer)
53   - self.user_ns = user_ns
54 53
55 54 def register(self):
56 55
@@ -64,9 +63,10 @@ def complete_registration(self, msg):
64 63 idents,msg = self.session.feed_identities(msg)
65 64 msg = Message(self.session.unpack_message(msg))
66 65 if msg.content.status == 'ok':
67   - self.session.username = str(msg.content.id)
68   - queue_addr = msg.content.queue
69   - shell_addrs = [str(queue_addr)]
  66 + self.id = int(msg.content.id)
  67 + self.session.username = 'engine-%i'%self.id
  68 + queue_addr = msg.content.mux
  69 + shell_addrs = [ str(queue_addr) ]
70 70 control_addr = str(msg.content.control)
71 71 task_addr = msg.content.task
72 72 iopub_addr = msg.content.iopub
@@ -75,7 +75,7 @@ def complete_registration(self, msg):
75 75
76 76 hb_addrs = msg.content.heartbeat
77 77 # ioloop.DelayedCallback(self.heart.start, 1000, self.loop).start()
78   - k = make_kernel(self.ident, control_addr, shell_addrs, iopub_addr,
  78 + k = make_kernel(self.id, self.ident, control_addr, shell_addrs, iopub_addr,
79 79 hb_addrs, client_addr=None, loop=self.loop,
80 80 context=self.context, key=self.session.key)[-1]
81 81 self.kernel = k
@@ -84,12 +84,12 @@ def complete_registration(self, msg):
84 84 self.kernel.user_ns = self.user_ns
85 85
86 86 else:
87   - # logger.error("Registration Failed: %s"%msg)
  87 + logger.error("Registration Failed: %s"%msg)
88 88 raise Exception("Registration Failed: %s"%msg)
89 89
90   - # logger.info("engine::completed registration with id %s"%self.session.username)
  90 + logger.info("completed registration with id %i"%self.id)
91 91
92   - print (msg,file=sys.__stdout__)
  92 + # logger.info(str(msg))
93 93
94 94 def unregister(self):
95 95 self.session.send(self.registrar, "unregistration_request", content=dict(id=int(self.session.username)))
@@ -97,7 +97,7 @@ def unregister(self):
97 97 sys.exit(0)
98 98
99 99 def start(self):
100   - print ("registering",file=sys.__stdout__)
  100 + logger.info("registering")
101 101 self.register()
102 102
103 103
@@ -118,7 +118,6 @@ def main(argv=None, user_ns=None):
118 118 ctx = zmq.Context()
119 119
120 120 # setup logging
121   - connect_logger(ctx, iface%args.logport, root="engine", loglevel=args.loglevel)
122 121
123 122 reg_conn = iface % args.regport
124 123 print (reg_conn, file=sys.__stdout__)
@@ -127,10 +126,16 @@ def main(argv=None, user_ns=None):
127 126 reg = ctx.socket(zmq.PAIR)
128 127 reg.connect(reg_conn)
129 128 reg = zmqstream.ZMQStream(reg, loop)
130   - client = None
131 129
132   - e = Engine(ctx, loop, session, reg, client, args.ident, user_ns=user_ns)
133   - dc = ioloop.DelayedCallback(e.start, 100, loop)
  130 + e = Engine(context=ctx, loop=loop, session=session, registrar=reg,
  131 + ident=args.ident or '', user_ns=user_ns)
  132 + if args.logport:
  133 + print ("connecting logger to %s"%(iface%args.logport), file=sys.__stdout__)
  134 + connect_engine_logger(ctx, iface%args.logport, e, loglevel=args.loglevel)
  135 + else:
  136 + local_logger(args.loglevel)
  137 +
  138 + dc = ioloop.DelayedCallback(e.start, 0, loop)
134 139 dc.start()
135 140 loop.start()
136 141
42 IPython/zmq/parallel/entry_point.py
@@ -22,7 +22,7 @@
22 22 # Local imports.
23 23 from IPython.core.ultratb import FormattedTB
24 24 from IPython.external.argparse import ArgumentParser
25   -from IPython.zmq.log import logger
  25 +from IPython.zmq.log import EnginePUBHandler
26 26
27 27 def split_ports(s, n):
28 28 """Parser helper for multiport strings"""
@@ -82,6 +82,7 @@ def make_base_argument_parser():
82 82 """ Creates an ArgumentParser for the generic arguments supported by all
83 83 ipcluster entry points.
84 84 """
  85 +
85 86 parser = ArgumentParser()
86 87 parser.add_argument('--ip', type=str, default='127.0.0.1',
87 88 help='set the controller\'s IP address [default: local]')
@@ -89,10 +90,10 @@ def make_base_argument_parser():
89 90 help='set the transport to use [default: tcp]')
90 91 parser.add_argument('--regport', type=int, metavar='PORT', default=10101,
91 92 help='set the XREP port for registration [default: 10101]')
92   - parser.add_argument('--logport', type=int, metavar='PORT', default=20202,
93   - help='set the PUB port for logging [default: 10201]')
94   - parser.add_argument('--loglevel', type=str, metavar='LEVEL', default=logging.DEBUG,
95   - help='set the log level [default: DEBUG]')
  93 + parser.add_argument('--logport', type=int, metavar='PORT', default=0,
  94 + help='set the PUB port for remote logging [default: log to stdout]')
  95 + parser.add_argument('--loglevel', type=str, metavar='LEVEL', default=logging.INFO,
  96 + help='set the log level [default: INFO]')
96 97 parser.add_argument('--ident', type=str,
97 98 help='set the ZMQ identity [default: random]')
98 99 parser.add_argument('--packer', type=str, default='json',
@@ -105,17 +106,42 @@ def make_base_argument_parser():
105 106
106 107 return parser
107 108
108   -
109   -def connect_logger(context, iface, root="ip", loglevel=logging.DEBUG):
  109 +def integer_loglevel(loglevel):
110 110 try:
111 111 loglevel = int(loglevel)
112 112 except ValueError:
113 113 if isinstance(loglevel, str):
114 114 loglevel = getattr(logging, loglevel)
  115 + return loglevel
  116 +
  117 +def connect_logger(context, iface, root="ip", loglevel=logging.DEBUG):
  118 + loglevel = integer_loglevel(loglevel)
115 119 lsock = context.socket(zmq.PUB)
116 120 lsock.connect(iface)
117 121 handler = handlers.PUBHandler(lsock)
118 122 handler.setLevel(loglevel)
119 123 handler.root_topic = root
  124 + logger = logging.getLogger()
  125 + logger.addHandler(handler)
  126 + logger.setLevel(loglevel)
  127 +
  128 +def connect_engine_logger(context, iface, engine, loglevel=logging.DEBUG):
  129 + logger = logging.getLogger()
  130 + loglevel = integer_loglevel(loglevel)
  131 + lsock = context.socket(zmq.PUB)
  132 + lsock.connect(iface)
  133 + handler = EnginePUBHandler(engine, lsock)
  134 + handler.setLevel(loglevel)
  135 + logger.addHandler(handler)
  136 + logger.setLevel(loglevel)
  137 +
  138 +def local_logger(loglevel=logging.DEBUG):
  139 + loglevel = integer_loglevel(loglevel)
  140 + logger = logging.getLogger()
  141 + if logger.handlers:
  142 + # if there are any handlers, skip the hookup
  143 + return
  144 + handler = logging.StreamHandler()
  145 + handler.setLevel(loglevel)
120 146 logger.addHandler(handler)
121   -
  147 + logger.setLevel(loglevel)
22 IPython/zmq/parallel/heartmonitor.py
@@ -7,13 +7,13 @@
7 7 from __future__ import print_function
8 8 import time
9 9 import uuid
  10 +import logging
10 11
11 12 import zmq
12 13 from zmq.devices import ProcessDevice,ThreadDevice
13 14 from zmq.eventloop import ioloop, zmqstream
14 15
15   -#internal
16   -from IPython.zmq.log import logger
  16 +logger = logging.getLogger()
17 17
18 18 class Heart(object):
19 19 """A basic heart object for responding to a HeartMonitor.
@@ -53,6 +53,7 @@ class HeartMonitor(object):
53 53 hearts=None
54 54 on_probation=None
55 55 last_ping=None
  56 + # debug=False
56 57
57 58 def __init__(self, loop, pingstream, pongstream, period=1000):
58 59 self.loop = loop
@@ -84,19 +85,6 @@ def add_heart_failure_handler(self, handler):
84 85 """add a new handler for heart failure"""
85 86 logger.debug("heartbeat::new heart failure handler: %s"%handler)
86 87 self._failure_handlers.add(handler)
87   -
88   - # def _flush(self):
89   - # """override IOLoop triggers"""
90   - # while True:
91   - # try:
92   - # msg = self.pongstream.socket.recv_multipart(zmq.NOBLOCK)
93   - # logger.warn("IOLoop triggered beat with incoming heartbeat waiting to be handled")
94   - # except zmq.ZMQError:
95   - # return
96   - # else:
97   - # self.handle_pong(msg)
98   - # # print '.'
99   - #
100 88
101 89 def beat(self):
102 90 self.pongstream.flush()
@@ -105,7 +93,7 @@ def beat(self):
105 93 toc = time.time()
106 94 self.lifetime += toc-self.tic
107 95 self.tic = toc
108   - logger.debug("heartbeat::%s"%self.lifetime)
  96 + # logger.debug("heartbeat::%s"%self.lifetime)
109 97 goodhearts = self.hearts.intersection(self.responses)
110 98 missed_beats = self.hearts.difference(goodhearts)
111 99 heartfailures = self.on_probation.intersection(missed_beats)
@@ -144,7 +132,7 @@ def handle_pong(self, msg):
144 132 "a heart just beat"
145 133 if msg[1] == str(self.lifetime):
146 134 delta = time.time()-self.tic
147   - logger.debug("heartbeat::heart %r took %.2f ms to respond"%(msg[0], 1000*delta))
  135 + # logger.debug("heartbeat::heart %r took %.2f ms to respond"%(msg[0], 1000*delta))
148 136 self.responses.add(msg[0])
149 137 elif msg[1] == str(self.last_ping):
150 138 delta = time.time()-self.tic + (self.lifetime-self.last_ping)
161 IPython/zmq/parallel/hub.py
@@ -18,14 +18,19 @@
18 18 import sys
19 19 from datetime import datetime
20 20 import time
  21 +import logging
21 22
22 23 import zmq
23   -from zmq.eventloop import ioloop
  24 +from zmq.eventloop import ioloop, zmqstream
24 25
25 26 # internal:
26   -from IPython.zmq.log import logger # a Logger object
  27 +from IPython.config.configurable import Configurable
  28 +from IPython.utils.traitlets import HasTraits, Instance, Int, Str, Dict
  29 +# from IPython.zmq.log import logger # a Logger object
27 30
28 31 from streamsession import Message, wrap_exception, ISO8601
  32 +from heartmonitor import HeartMonitor
  33 +from util import validate_url_container
29 34
30 35 try:
31 36 from pymongo.binary import Binary
@@ -38,6 +43,8 @@
38 43 # Code
39 44 #-----------------------------------------------------------------------------
40 45
  46 +logger = logging.getLogger()
  47 +
41 48 def _passer(*args, **kwargs):
42 49 return
43 50
@@ -46,7 +53,7 @@ def _printer(*args, **kwargs):
46 53 print (kwargs)
47 54
48 55 def init_record(msg):
49   - """return an empty TaskRecord dict, with all keys initialized with None."""
  56 + """Initialize a TaskRecord based on a request."""
50 57 header = msg['header']
51 58 return {
52 59 'msg_id' : header['msg_id'],
@@ -71,7 +78,7 @@ def init_record(msg):
71 78 }
72 79
73 80
74   -class EngineConnector(object):
  81 +class EngineConnector(HasTraits):
75 82 """A simple object for accessing the various zmq connections of an object.
76 83 Attributes are:
77 84 id (int): engine ID
@@ -80,22 +87,18 @@ class EngineConnector(object):
80 87 registration (str): identity of registration XREQ socket
81 88 heartbeat (str): identity of heartbeat XREQ socket
82 89 """
83   - id=0
84   - queue=None
85   - control=None
86   - registration=None
87   - heartbeat=None
88   - pending=None
89   -
90   - def __init__(self, id, queue, registration, control, heartbeat=None):
91   - logger.info("engine::Engine Connected: %i"%id)
92   - self.id = id
93   - self.queue = queue
94   - self.registration = registration
95   - self.control = control
96   - self.heartbeat = heartbeat
97   -
98   -class Hub(object):
  90 + id=Int(0)
  91 + queue=Str()
  92 + control=Str()
  93 + registration=Str()
  94 + heartbeat=Str()
  95 + pending=Instance(set)
  96 +
  97 + def __init__(self, **kwargs):
  98 + super(EngineConnector, self).__init__(**kwargs)
  99 + logger.info("engine::Engine Connected: %i"%self.id)
  100 +
  101 +class Hub(Configurable):
99 102 """The IPython Controller Hub with 0MQ connections
100 103
101 104 Parameters
@@ -123,26 +126,25 @@ class Hub(object):
123 126 clients=None
124 127 hearts=None
125 128 pending=None
126   - results=None
127 129 tasks=None
128 130 completed=None
129   - mia=None
  131 + # mia=None
130 132 incoming_registrations=None
131 133 registration_timeout=None
132 134
133   - #objects from constructor:
134   - loop=None
135   - registrar=None
136   - clientelle=None
137   - queue=None
138   - heartbeat=None
139   - notifier=None
140   - db=None
141   - client_addr=None
142   - engine_addrs=None
  135 + # objects from constructor:
  136 + loop=Instance(ioloop.IOLoop)
  137 + registrar=Instance(zmqstream.ZMQStream)
  138 + clientele=Instance(zmqstream.ZMQStream)
  139 + monitor=Instance(zmqstream.ZMQStream)
  140 + heartmonitor=Instance(HeartMonitor)
  141 + notifier=Instance(zmqstream.ZMQStream)
  142 + db=Instance(object)
  143 + client_addrs=Dict()
  144 + engine_addrs=Dict()
143 145
144 146
145   - def __init__(self, loop, session, queue, registrar, heartbeat, clientele, notifier, db, engine_addrs, client_addrs):
  147 + def __init__(self, **kwargs):
146 148 """
147 149 # universal:
148 150 loop: IOLoop for creating future connections
@@ -158,6 +160,8 @@ def __init__(self, loop, session, queue, registrar, heartbeat, clientele, notifi
158 160 engine_addrs: zmq address/protocol dict for engine connections
159 161 client_addrs: zmq address/protocol dict for client connections
160 162 """
  163 +
  164 + super(Hub, self).__init__(**kwargs)
161 165 self.ids = set()
162 166 self.keytable={}
163 167 self.incoming_registrations={}
@@ -166,35 +170,44 @@ def __init__(self, loop, session, queue, registrar, heartbeat, clientele, notifi
166 170 self.clients = {}
167 171 self.hearts = {}
168 172 # self.mia = set()
169   -
  173 + self.registration_timeout = max(5000, 2*self.heartmonitor.period)
  174 + # this is the stuff that will move to DB:
  175 + self.pending = set() # pending messages, keyed by msg_id
  176 + self.queues = {} # pending msg_ids keyed by engine_id
  177 + self.tasks = {} # pending msg_ids submitted as tasks, keyed by client_id
  178 + self.completed = {} # completed msg_ids keyed by engine_id
  179 + self.all_completed = set()
  180 + self._idcounter = 0
170 181 # self.sockets = {}
171   - self.loop = loop
172   - self.session = session
173   - self.registrar = registrar
174   - self.clientele = clientele
175   - self.queue = queue
176   - self.heartbeat = heartbeat
177   - self.notifier = notifier
178   - self.db = db
  182 + # self.loop = loop
  183 + # self.session = session
  184 + # self.registrar = registrar
  185 + # self.clientele = clientele
  186 + # self.queue = queue
  187 + # self.heartmonitor = heartbeat
  188 + # self.notifier = notifier
  189 + # self.db = db
179 190
180 191 # validate connection dicts:
181   - self.client_addrs = client_addrs
182   - assert isinstance(client_addrs['queue'], str)
183   - assert isinstance(client_addrs['control'], str)
  192 + # self.client_addrs = client_addrs
  193 + validate_url_container(self.client_addrs)
  194 +
  195 + # assert isinstance(self.client_addrs['queue'], str)
  196 + # assert isinstance(self.client_addrs['control'], str)
184 197 # self.hb_addrs = hb_addrs
185   - self.engine_addrs = engine_addrs
186   - assert isinstance(engine_addrs['queue'], str)
187   - assert isinstance(client_addrs['control'], str)
188   - assert len(engine_addrs['heartbeat']) == 2
  198 + validate_url_container(self.engine_addrs)
  199 + # self.engine_addrs = engine_addrs
  200 + # assert isinstance(self.engine_addrs['queue'], str)
  201 + # assert isinstance(self.engine_addrs['control'], str)
  202 + # assert len(engine_addrs['heartbeat']) == 2
189 203
190 204 # register our callbacks
191 205 self.registrar.on_recv(self.dispatch_register_request)
192 206 self.clientele.on_recv(self.dispatch_client_msg)
193   - self.queue.on_recv(self.dispatch_monitor_traffic)
  207 + self.monitor.on_recv(self.dispatch_monitor_traffic)
194 208
195   - if heartbeat is not None:
196   - heartbeat.add_heart_failure_handler(self.handle_heart_failure)
197   - heartbeat.add_new_heart_handler(self.handle_new_heart)
  209 + self.heartmonitor.add_heart_failure_handler(self.handle_heart_failure)
  210 + self.heartmonitor.add_new_heart_handler(self.handle_new_heart)
198 211
199 212 self.monitor_handlers = { 'in' : self.save_queue_request,
200 213 'out': self.save_queue_result,
@@ -218,25 +231,21 @@ def __init__(self, loop, session, queue, registrar, heartbeat, clientele, notifi
218 231 'unregistration_request' : self.unregister_engine,
219 232 'connection_request': self.connection_request,
220 233 }
221   - self.registration_timeout = max(5000, 2*self.heartbeat.period)
222   - # this is the stuff that will move to DB:
223   - # self.results = {} # completed results
224   - self.pending = set() # pending messages, keyed by msg_id
225   - self.queues = {} # pending msg_ids keyed by engine_id
226   - self.tasks = {} # pending msg_ids submitted as tasks, keyed by client_id
227   - self.completed = {} # completed msg_ids keyed by engine_id
228   - self.all_completed = set()
229 234
230 235 logger.info("controller::created controller")
231 236
232   - def _new_id(self):
  237 + @property
  238 + def _next_id(self):
233 239 """generate a new ID"""
234   - newid = 0
235   - incoming = [id[0] for id in self.incoming_registrations.itervalues()]
236   - # print newid, self.ids, self.incoming_registrations
237   - while newid in self.ids or newid in incoming:
238   - newid += 1
  240 + newid = self._idcounter
  241 + self._idcounter += 1
239 242 return newid
  243 + # newid = 0
  244 + # incoming = [id[0] for id in self.incoming_registrations.itervalues()]
  245 + # # print newid, self.ids, self.incoming_registrations
  246 + # while newid in self.ids or newid in incoming:
  247 + # newid += 1
  248 + # return newid
240 249
241 250 #-----------------------------------------------------------------------------
242 251 # message validation
@@ -580,6 +589,9 @@ def save_iopub_message(self, topics, msg):
580 589 return
581 590
582 591 parent = msg['parent_header']
  592 + if not parent:
  593 + logger.error("iopub::invalid IOPub message: %s"%msg)
  594 + return
583 595 msg_id = parent['msg_id']
584 596 msg_type = msg['msg_type']
585 597 content = msg['content']
@@ -631,7 +643,7 @@ def register_engine(self, reg, msg):
631 643 return
632 644 heart = content.get('heartbeat', None)
633 645 """register a new engine, and create the socket(s) necessary"""
634   - eid = self._new_id()
  646 + eid = self._next_id
635 647 # print (eid, queue, reg, heart)
636 648
637 649 logger.debug("registration::register_engine(%i, %r, %r, %r)"%(eid, queue, reg, heart))
@@ -644,10 +656,12 @@ def register_engine(self, reg, msg):
644 656 raise KeyError("queue_id %r in use"%queue)
645 657 except:
646 658 content = wrap_exception()
  659 + logger.error("queue_id %r in use"%queue, exc_info=True)
647 660 elif heart in self.hearts: # need to check unique hearts?
648 661 try:
649 662 raise KeyError("heart_id %r in use"%heart)
650 663 except:
  664 + logger.error("heart_id %r in use"%heart, exc_info=True)
651 665 content = wrap_exception()
652 666 else:
653 667 for h, pack in self.incoming_registrations.iteritems():
@@ -655,12 +669,14 @@ def register_engine(self, reg, msg):
655 669 try:
656 670 raise KeyError("heart_id %r in use"%heart)
657 671 except:
  672 + logger.error("heart_id %r in use"%heart, exc_info=True)
658 673 content = wrap_exception()
659 674 break
660 675 elif queue == pack[1]:
661 676 try:
662 677 raise KeyError("queue_id %r in use"%queue)
663 678 except:
  679 + logger.error("queue_id %r in use"%queue, exc_info=True)
664 680 content = wrap_exception()
665 681 break
666 682
@@ -669,15 +685,15 @@ def register_engine(self, reg, msg):
669 685 ident=reg)
670 686
671 687 if content['status'] == 'ok':
672   - if heart in self.heartbeat.hearts:
  688 + if heart in self.heartmonitor.hearts:
673 689 # already beating
674   - self.incoming_registrations[heart] = (eid,queue,reg,None)
  690 + self.incoming_registrations[heart] = (eid,queue,reg[0],None)
675 691 self.finish_registration(heart)
676 692 else:
677 693 purge = lambda : self._purge_stalled_registration(heart)
678 694 dc = ioloop.DelayedCallback(purge, self.registration_timeout, self.loop)
679 695 dc.start()
680   - self.incoming_registrations[heart] = (eid,queue,reg,dc)
  696 + self.incoming_registrations[heart] = (eid,queue,reg[0],dc)
681 697 else:
682 698 logger.error("registration::registration %i failed: %s"%(eid, content['evalue']))
683 699 return eid
@@ -718,7 +734,8 @@ def finish_registration(self, heart):
718 734 control = queue
719 735 self.ids.add(eid)
720 736 self.keytable[eid] = queue
721   - self.engines[eid] = EngineConnector(eid, queue, reg, control, heart)
  737 + self.engines[eid] = EngineConnector(id=eid, queue=queue, registration=reg,
  738 + control=control, heartbeat=heart)
722 739 self.by_ident[queue] = eid
723 740 self.queues[eid] = list()
724 741 self.tasks[eid] = list()
64 IPython/zmq/parallel/scheduler.py
@@ -11,6 +11,8 @@
11 11
12 12 from __future__ import print_function
13 13 from random import randint,random
  14 +import logging
  15 +from types import FunctionType
14 16
15 17 try:
16 18 import numpy
@@ -21,17 +23,22 @@
21 23 from zmq.eventloop import ioloop, zmqstream
22 24
23 25 # local imports
24   -from IPython.zmq.log import logger # a Logger object
  26 +from IPython.external.decorator import decorator
  27 +from IPython.config.configurable import Configurable
  28 +from IPython.utils.traitlets import Instance
  29 +
25 30 from client import Client
26 31 from dependency import Dependency
27 32 import streamsession as ss
  33 +from entry_point import connect_logger, local_logger
28 34
29   -from IPython.external.decorator import decorator
  35 +
  36 +logger = logging.getLogger()
30 37
31 38 @decorator
32 39 def logged(f,self,*args,**kwargs):
33 40 # print ("#--------------------")
34   - # print ("%s(*%s,**%s)"%(f.func_name, args, kwargs))
  41 + logger.debug("scheduler::%s(*%s,**%s)"%(f.func_name, args, kwargs))
35 42 # print ("#--")
36 43 return f(self,*args, **kwargs)
37 44
@@ -99,7 +106,7 @@ def leastload(loads):
99 106 #---------------------------------------------------------------------
100 107 # Classes
101 108 #---------------------------------------------------------------------
102   -class TaskScheduler(object):
  109 +class TaskScheduler(Configurable):
103 110 """Python TaskScheduler object.
104 111
105 112 This is the simplest object that supports msg_id based
@@ -108,10 +115,15 @@ class TaskScheduler(object):
108 115
109 116 """
110 117
111   - scheme = leastload # function for determining the destination
112   - client_stream = None # client-facing stream
113   - engine_stream = None # engine-facing stream
114   - mon_stream = None # controller-facing stream
  118 + # configurables:
  119 + scheme = Instance(FunctionType, default=leastload) # function for determining the destination
  120 + client_stream = Instance(zmqstream.ZMQStream) # client-facing stream
  121 + engine_stream = Instance(zmqstream.ZMQStream) # engine-facing stream
  122 + notifier_stream = Instance(zmqstream.ZMQStream) # hub-facing sub stream
  123 + mon_stream = Instance(zmqstream.ZMQStream) # hub-facing pub stream
  124 + io_loop = Instance(ioloop.IOLoop)
  125 +
  126 + # internals:
115 127 dependencies = None # dict by msg_id of [ msg_ids that depend on key ]
116 128 depending = None # dict by msg_id of (msg_id, raw_msg, after, follow)
117 129 pending = None # dict by engine_uuid of submitted tasks
@@ -123,23 +135,10 @@ class TaskScheduler(object):
123 135 blacklist = None # dict by msg_id of locations where a job has encountered UnmetDependency
124 136
125 137
126   - def __init__(self, client_stream, engine_stream, mon_stream,
127   - notifier_stream, scheme=None, io_loop=None):
128   - if io_loop is None:
129   - io_loop = ioloop.IOLoop.instance()
130   - self.io_loop = io_loop
131   - self.client_stream = client_stream
132   - self.engine_stream = engine_stream
133   - self.mon_stream = mon_stream
134   - self.notifier_stream = notifier_stream
135   -
136   - if scheme is not None:
137   - self.scheme = scheme
138   - else:
139   - self.scheme = TaskScheduler.scheme
  138 + def __init__(self, **kwargs):
  139 + super(TaskScheduler, self).__init__(**kwargs)
140 140
141 141 self.session = ss.StreamSession(username="TaskScheduler")
142   -
143 142 self.dependencies = {}
144 143 self.depending = {}
145 144 self.completed = {}
@@ -150,12 +149,13 @@ def __init__(self, client_stream, engine_stream, mon_stream,
150 149 self.targets = []
151 150 self.loads = []
152 151
153   - engine_stream.on_recv(self.dispatch_result, copy=False)
  152 + self.engine_stream.on_recv(self.dispatch_result, copy=False)
154 153 self._notification_handlers = dict(
155 154 registration_notification = self._register_engine,
156 155 unregistration_notification = self._unregister_engine
157 156 )
158 157 self.notifier_stream.on_recv(self.dispatch_notification)
  158 + logger.info("Scheduler started...%r"%self)
159 159
160 160 def resume_receiving(self):
161 161 """Resume accepting jobs."""
@@ -183,6 +183,7 @@ def dispatch_notification(self, msg):
183 183 handler(str(msg['content']['queue']))
184 184 except KeyError:
185 185 logger.error("task::Invalid notification msg: %s"%msg)
  186 +
186 187 @logged
187 188 def _register_engine(self, uid):
188 189 """New engine with ident `uid` became available."""
@@ -306,7 +307,8 @@ def submit_task(self, msg_id, msg, follow=None, indices=None):
306 307 self.add_job(idx)
307 308 self.pending[target][msg_id] = (msg, follow)
308 309 content = dict(msg_id=msg_id, engine_id=target)
309   - self.session.send(self.mon_stream, 'task_destination', content=content, ident='tracktask')
  310 + self.session.send(self.mon_stream, 'task_destination', content=content,
  311 + ident=['tracktask',self.session.session])
310 312
311 313 #-----------------------------------------------------------------------
312 314 # Result Handling
@@ -395,7 +397,7 @@ def finish_job(self, idx):
395 397
396 398
397 399
398   -def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, scheme='weighted'):
  400 +def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, log_addr=None, loglevel=logging.DEBUG, scheme='weighted'):
399 401 from zmq.eventloop import ioloop
400 402 from zmq.eventloop.zmqstream import ZMQStream
401 403
@@ -414,7 +416,15 @@ def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, scheme='weighted'):
414 416 nots.setsockopt(zmq.SUBSCRIBE, '')
415 417 nots.connect(not_addr)
416 418
417   - scheduler = TaskScheduler(ins,outs,mons,nots,scheme,loop)
  419 + # setup logging
  420 + if log_addr:
  421 + connect_logger(ctx, log_addr, root="scheduler", loglevel=loglevel)
  422 + else:
  423 + local_logger(loglevel)
  424 +
  425 + scheduler = TaskScheduler(client_stream=ins, engine_stream=outs,
  426 + mon_stream=mons,notifier_stream=nots,
  427 + scheme=scheme,io_loop=loop)
418 428
419 429 loop.start()
420 430
50 IPython/zmq/parallel/streamkernel.py
@@ -15,6 +15,7 @@
15 15 import sys
16 16 import time
17 17 import traceback
  18 +import logging
18 19 from datetime import datetime
19 20 from signal import SIGTERM, SIGKILL
20 21 from pprint import pprint
@@ -25,9 +26,8 @@
25 26
26 27 # Local imports.
27 28 from IPython.core import ultratb
28   -from IPython.utils.traitlets import HasTraits, Instance, List
  29 +from IPython.utils.traitlets import HasTraits, Instance, List, Int
29 30 from IPython.zmq.completer import KernelCompleter
30   -from IPython.zmq.log import logger # a Logger object
31 31 from IPython.zmq.iostream import OutStream
32 32 from IPython.zmq.displayhook import DisplayHook
33 33
@@ -38,6 +38,8 @@
38 38 import heartmonitor
39 39 from client import Client
40 40
  41 +logger = logging.getLogger()
  42 +
41 43 def printer(*args):
42 44 pprint(args, stream=sys.__stdout__)
43 45
@@ -51,8 +53,9 @@ class Kernel(HasTraits):
51 53 # Kernel interface
52 54 #---------------------------------------------------------------------------
53 55
  56 + id = Int(-1)
54 57 session = Instance(StreamSession)
55   - shell_streams = Instance(list)
  58 + shell_streams = List()
56 59 control_stream = Instance(zmqstream.ZMQStream)
57 60 task_stream = Instance(zmqstream.ZMQStream)
58 61 iopub_stream = Instance(zmqstream.ZMQStream)
@@ -62,7 +65,8 @@ class Kernel(HasTraits):
62 65 def __init__(self, **kwargs):
63 66 super(Kernel, self).__init__(**kwargs)
64 67 self.identity = self.shell_streams[0].getsockopt(zmq.IDENTITY)
65   - self.prefix = 'engine.%s'%self.identity
  68 + self.prefix = 'engine.%s'%self.id
  69 + logger.root_topic = self.prefix
66 70 self.user_ns = {}
67 71 self.history = []
68 72 self.compiler = CommandCompiler()
@@ -108,8 +112,8 @@ def abort_queue(self, stream):
108 112
109 113 # assert self.reply_socketly_socket.rcvmore(), "Unexpected missing message part."
110 114 # msg = self.reply_socket.recv_json()
111   - print ("Aborting:", file=sys.__stdout__)
112   - print (Message(msg), file=sys.__stdout__)
  115 + logger.info("Aborting:")
  116 + logger.info(str(msg))
113 117 msg_type = msg['msg_type']
114 118 reply_type = msg_type.split('_')[0] + '_reply'
115 119 # reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg)
@@ -117,7 +121,7 @@ def abort_queue(self, stream):
117 121 # self.reply_socket.send_json(reply_msg)
118 122 reply_msg = self.session.send(stream, reply_type,
119 123 content={'status' : 'aborted'}, parent=msg, ident=idents)[0]
120   - print(Message(reply_msg), file=sys.__stdout__)
  124 + logger.debug(str(reply_msg))
121 125 # We need to wait a bit for requests to come in. This can probably
122 126 # be set shorter for true asynchronous clients.
123 127 time.sleep(0.05)
@@ -135,7 +139,7 @@ def abort_request(self, stream, ident, parent):
135 139 content = dict(status='ok')
136 140 reply_msg = self.session.send(stream, 'abort_reply', content=content,
137 141 parent=parent, ident=ident)[0]
138   - print(Message(reply_msg), file=sys.__stdout__)
  142 + logger.debug(str(reply_msg))
139 143
140 144 def shutdown_request(self, stream, ident, parent):
141 145 """kill ourself. This should really be handled in an external process"""
@@ -168,7 +172,7 @@ def dispatch_control(self, msg):
168 172
169 173 handler = self.control_handlers.get(msg['msg_type'], None)
170 174 if handler is None:
171   - print ("UNKNOWN CONTROL MESSAGE TYPE:", msg, file=sys.__stderr__)
  175 + logger.error("UNKNOWN CONTROL MESSAGE TYPE: %r"%msg['msg_type'])
172 176 else:
173 177 handler(self.control_stream, idents, msg)
174 178
@@ -211,13 +215,12 @@ def execute_request(self, stream, ident, parent):
211 215 try:
212 216 code = parent[u'content'][u'code']
213 217 except:
214   - print("Got bad msg: ", file=sys.__stderr__)
215   - print(Message(parent), file=sys.__stderr__)
  218 + logger.error("Got bad msg: %s"%parent, exc_info=True)
216 219 return
217 220 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
218 221 # self.iopub_stream.send(pyin_msg)
219 222 self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent,
220   - ident=self.identity+'.pyin')
  223 + ident='%s.pyin'%self.prefix)
221 224 started = datetime.now().strftime(ISO8601)
222 225 try:
223 226 comp_code = self.compiler(code, '<zmq-kernel>')
@@ -231,7 +234,7 @@ def execute_request(self, stream, ident, parent):
231 234 exc_content = self._wrap_exception('execute')
232 235 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
233 236 self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent,
234   - ident=self.identity+'.pyerr')
  237 + ident='%s.pyerr'%self.prefix)
235 238 reply_content = exc_content
236 239 else:
237 240 reply_content = {'status' : 'ok'}
@@ -240,7 +243,7 @@ def execute_request(self, stream, ident, parent):
240 243 # self.reply_socket.send_json(reply_msg)
241 244 reply_msg = self.session.send(stream, u'execute_reply', reply_content, parent=parent,
242 245 ident=ident, subheader = dict(started=started))
243   - print(Message(reply_msg), file=sys.__stdout__)
  246 + logger.debug(str(reply_msg))
244 247 if reply_msg['content']['status'] == u'error':
245 248 self.abort_queues()
246 249
@@ -262,8 +265,7 @@ def apply_request(self, stream, ident, parent):
262 265 msg_id = parent['header']['msg_id']
263 266 bound = content.get('bound', False)
264 267 except:
265   - print("Got bad msg: ", file=sys.__stderr__)
266   - print(Message(parent), file=sys.__stderr__)
  268 + logger.error("Got bad msg: %s"%parent, exc_info=True)
267 269 return
268 270 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
269 271 # self.iopub_stream.send(pyin_msg)
@@ -316,7 +318,7 @@ def apply_request(self, stream, ident, parent):
316 318 exc_content = self._wrap_exception('apply')
317 319 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
318 320 self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent,
319   - ident=self.identity+'.pyerr')
  321 + ident='%s.pyerr'%self.prefix)
320 322 reply_content = exc_content
321 323 result_buf = []
322 324
@@ -354,7 +356,7 @@ def dispatch_queue(self, stream, msg):
354 356 return
355 357 handler = self.shell_handlers.get(msg['msg_type'], None)
356 358 if handler is None:
357   - print ("UNKNOWN MESSAGE TYPE:", msg, file=sys.__stderr__)
  359 + logger.error("UNKNOWN MESSAGE TYPE: %r"%msg['msg_type'])
358 360 else:
359 361 handler(stream, idents, msg)
360 362
@@ -398,7 +400,7 @@ def dispatcher(msg):
398 400 # # don't busywait
399 401 # time.sleep(1e-3)
400 402
401   -def make_kernel(identity, control_addr, shell_addrs, iopub_addr, hb_addrs,
  403 +def make_kernel(int_id, identity, control_addr, shell_addrs, iopub_addr, hb_addrs,
402 404 client_addr=None, loop=None, context=None, key=None,
403 405 out_stream_factory=OutStream, display_hook_factory=DisplayHook):
404 406
@@ -410,7 +412,7 @@ def make_kernel(identity, control_addr, shell_addrs, iopub_addr, hb_addrs,
410 412 c = context
411 413 session = StreamSession(key=key)
412 414 # print (session.key)
413   - print (control_addr, shell_addrs, iopub_addr, hb_addrs)
  415 + # print (control_addr, shell_addrs, iopub_addr, hb_addrs)
414 416
415 417 # create Control Stream
416 418 control_stream = zmqstream.ZMQStream(c.socket(zmq.PAIR), loop)
@@ -433,12 +435,12 @@ def make_kernel(identity, control_addr, shell_addrs, iopub_addr, hb_addrs,
433 435 # Redirect input streams and set a display hook.
434 436 if out_stream_factory:
435 437 sys.stdout = out_stream_factory(session, iopub_stream, u'stdout')
436   - sys.stdout.topic = identity+'.stdout'
  438 + sys.stdout.topic = 'engine.%i.stdout'%int_id
437 439 sys.stderr = out_stream_factory(session, iopub_stream, u'stderr')
438   - sys.stderr.topic = identity+'.stderr'
  440 + sys.stderr.topic = 'engine.%i.stderr'%int_id
439 441 if display_hook_factory:
440 442 sys.displayhook = display_hook_factory(session, iopub_stream)
441   - sys.displayhook.topic = identity+'.pyout'
  443 + sys.displayhook.topic = 'engine.%i.pyout'%int_id
442 444
443 445
444 446 # launch heartbeat
@@ -451,7 +453,7 @@ def make_kernel(identity, control_addr, shell_addrs, iopub_addr, hb_addrs,
451 453 else:
452 454 client = None
453 455
454   - kernel = Kernel(session=session, control_stream=control_stream,
  456 + kernel = Kernel(id=int_id, session=session, control_stream=control_stream,
455 457 shell_streams=shell_streams, iopub_stream=iopub_stream,
456 458 client=client, loop=loop)
457 459 kernel.start()
44 IPython/zmq/parallel/util.py
... ... @@ -1,4 +1,5 @@
1 1 """some generic utilities"""
  2 +import re
2 3
3 4 class ReverseDict(dict):
4 5 """simple double-keyed subset of dict methods."""
@@ -33,3 +34,46 @@ def get(self, key, default=None):
33 34 return default
34 35
35 36
def validate_url(url):
    """Validate a url string for zeromq.

    Accepts urls of the form ``protocol://address`` where protocol is one
    of zeromq's transports (tcp, pgm, epgm, ipc, inproc).  For tcp urls the
    address must be ``host:port`` with a hostname matching the domain
    pattern below and an integer port.  Other transports are currently
    accepted without address validation.

    Raises TypeError if url is not a string, AssertionError if it is
    malformed; returns True otherwise.
    """
    # compat shim: `basestring` on Python 2 (accepts str and unicode),
    # plain `str` on Python 3 — preserves the original Python 2 behavior.
    try:
        string_types = basestring
    except NameError:
        string_types = str
    if not isinstance(url, string_types):
        raise TypeError("url must be a string, not %r"%type(url))
    url = url.lower()

    proto_addr = url.split('://')
    assert len(proto_addr) == 2, 'Invalid url: %r'%url
    proto, addr = proto_addr
    assert proto in ['tcp','pgm','epgm','ipc','inproc'], "Invalid protocol: %r"%proto

    # domain pattern adapted from http://www.regexlib.com/REDetails.aspx?regexp_id=391
    # author: Remi Sabourin
    pat = re.compile(r'^([\w\d]([\w\d\-]{0,61}[\w\d])?\.)*[\w\d]([\w\d\-]{0,61}[\w\d])?$')

    if proto == 'tcp':
        lis = addr.split(':')
        assert len(lis) == 2, 'Invalid url: %r'%url
        addr,s_port = lis
        try:
            int(s_port)
        except ValueError:
            # BUGFIX: the original formatted `port`, which is unbound when
            # int() raises — report the offending string `s_port` instead.
            raise AssertionError("Invalid port %r in url: %r"%(s_port, url))

        assert pat.match(addr) is not None, 'Invalid url: %r'%url

    else:
        # only validate tcp urls currently
        pass

    return True
  68 +
  69 +
def validate_url_container(container):
    """Recursively validate every url in a (possibly nested) collection.

    A bare string is validated directly; a dict is validated over its
    values; any other iterable is walked element by element, recursing
    into nested containers.
    """
    if isinstance(container, basestring):
        # a single url: defer straight to the scalar validator
        return validate_url(container)
    if isinstance(container, dict):
        # only the values of a dict hold urls; keys are ignored
        container = container.itervalues()
    for element in container:
        validate_url_container(element)
16 examples/zmqontroller/logwatcher.py
@@ -19,6 +19,7 @@
19 19 # You should have received a copy of the Lesser GNU General Public License
20 20 # along with this program. If not, see <http://www.gnu.org/licenses/>.
21 21
  22 +import sys
22 23 import zmq
23 24 logport = 20202
24 25 def main(topics, addrs):
@@ -26,20 +27,27 @@ def main(topics, addrs):
26 27 context = zmq.Context()
27 28 socket = context.socket(zmq.SUB)
28 29 for topic in topics:
  30 + print "Subscribing to: %r"%topic
29 31 socket.setsockopt(zmq.SUBSCRIBE, topic)
30 32 if addrs:
31 33 for addr in addrs:
32 34 print "Connecting to: ", addr
33 35 socket.connect(addr)
34 36 else:
35   - socket.bind('tcp://127.0.0.1:%i'%logport)
  37 + socket.bind('tcp://*:%i'%logport)
36 38
37 39 while True:
38 40 # topic = socket.recv()
39 41 # print topic
40   - topic, msg = socket.recv_multipart()
41   - # msg = socket.recv_pyobj()
42   - print "%s | %s " % (topic, msg),
  42 + # print 'tic'