Skip to content
Browse files

cleanup Hub/Scheduler to prevent '%s'%<nonascii> errors

  • Loading branch information...
1 parent ffe0399 commit 2676dc30cecc24166346e03ea52ed8716bb64c81 @minrk committed
Showing with 30 additions and 53 deletions.
  1. +27 −50 IPython/parallel/controller/hub.py
  2. +3 −3 IPython/parallel/controller/scheduler.py
View
77 IPython/parallel/controller/hub.py
@@ -198,9 +198,6 @@ def _transport_changed(self, name, old, new):
def __init__(self, **kwargs):
super(HubFactory, self).__init__(**kwargs)
self._update_monitor_url()
- # self.on_trait_change(self._sync_ips, 'ip')
- # self.on_trait_change(self._sync_transports, 'transport')
- # self.subconstructors.append(self.construct_hub)
def construct(self):
@@ -449,34 +446,16 @@ def _validate_targets(self, targets):
# dispatch methods (1 per stream)
#-----------------------------------------------------------------------------
- # def dispatch_registration_request(self, msg):
- # """"""
- # self.log.debug("registration::dispatch_register_request(%s)"%msg)
- # idents,msg = self.session.feed_identities(msg)
- # if not idents:
- # self.log.error("Bad Query Message: %s"%msg, exc_info=True)
- # return
- # try:
- # msg = self.session.unpack_message(msg,content=True)
- # except:
- # self.log.error("registration::got bad registration message: %s"%msg, exc_info=True)
- # return
- #
- # msg_type = msg['msg_type']
- # content = msg['content']
- #
- # handler = self.query_handlers.get(msg_type, None)
- # if handler is None:
- # self.log.error("registration::got bad registration message: %s"%msg)
- # else:
- # handler(idents, msg)
def dispatch_monitor_traffic(self, msg):
"""all ME and Task queue messages come through here, as well as
IOPub traffic."""
self.log.debug("monitor traffic: %r"%msg[:2])
switch = msg[0]
- idents, msg = self.session.feed_identities(msg[1:])
+ try:
+ idents, msg = self.session.feed_identities(msg[1:])
+ except ValueError:
+ idents=[]
if not idents:
self.log.error("Bad Monitor Message: %r"%msg)
return
@@ -557,19 +536,19 @@ def handle_heart_failure(self, heart):
def save_queue_request(self, idents, msg):
if len(idents) < 2:
- self.log.error("invalid identity prefix: %s"%idents)
+ self.log.error("invalid identity prefix: %r"%idents)
return
queue_id, client_id = idents[:2]
try:
msg = self.session.unpack_message(msg, content=False)
- except:
- self.log.error("queue::client %r sent invalid message to %r: %s"%(client_id, queue_id, msg), exc_info=True)
+ except Exception:
+ self.log.error("queue::client %r sent invalid message to %r: %r"%(client_id, queue_id, msg), exc_info=True)
return
eid = self.by_ident.get(queue_id, None)
if eid is None:
self.log.error("queue::target %r not registered"%queue_id)
- self.log.debug("queue:: valid are: %s"%(self.by_ident.keys()))
+ self.log.debug("queue:: valid are: %r"%(self.by_ident.keys()))
return
header = msg['header']
@@ -597,21 +576,20 @@ def save_queue_request(self, idents, msg):
def save_queue_result(self, idents, msg):
if len(idents) < 2:
- self.log.error("invalid identity prefix: %s"%idents)
+ self.log.error("invalid identity prefix: %r"%idents)
return
client_id, queue_id = idents[:2]
try:
msg = self.session.unpack_message(msg, content=False)
- except:
- self.log.error("queue::engine %r sent invalid message to %r: %s"%(
+ except Exception:
+ self.log.error("queue::engine %r sent invalid message to %r: %r"%(
queue_id,client_id, msg), exc_info=True)
return
eid = self.by_ident.get(queue_id, None)
if eid is None:
self.log.error("queue::unknown engine %r is sending a reply: "%queue_id)
- # self.log.debug("queue:: %s"%msg[2:])
return
parent = msg['parent_header']
@@ -626,7 +604,7 @@ def save_queue_result(self, idents, msg):
elif msg_id not in self.all_completed:
# it could be a result from a dead engine that died before delivering the
# result
- self.log.warn("queue:: unknown msg finished %s"%msg_id)
+ self.log.warn("queue:: unknown msg finished %r"%msg_id)
return
# update record anyway, because the unregistration could have been premature
rheader = msg['header']
@@ -656,8 +634,8 @@ def save_task_request(self, idents, msg):
try:
msg = self.session.unpack_message(msg, content=False)
- except:
- self.log.error("task::client %r sent invalid task message: %s"%(
+ except Exception:
+ self.log.error("task::client %r sent invalid task message: %r"%(
client_id, msg), exc_info=True)
return
record = init_record(msg)
@@ -700,10 +678,9 @@ def save_task_result(self, idents, msg):
client_id = idents[0]
try:
msg = self.session.unpack_message(msg, content=False)
- except:
- self.log.error("task::invalid task result message send to %r: %s"%(
+ except Exception:
+ self.log.error("task::invalid task result message send to %r: %r"%(
client_id, msg), exc_info=True)
- raise
return
parent = msg['parent_header']
@@ -745,12 +722,12 @@ def save_task_result(self, idents, msg):
self.log.error("DB Error saving task request %r"%msg_id, exc_info=True)
else:
- self.log.debug("task::unknown task %s finished"%msg_id)
+ self.log.debug("task::unknown task %r finished"%msg_id)
def save_task_destination(self, idents, msg):
try:
msg = self.session.unpack_message(msg, content=True)
- except:
+ except Exception:
self.log.error("task::invalid task tracking message", exc_info=True)
return
content = msg['content']
@@ -759,11 +736,11 @@ def save_task_destination(self, idents, msg):
engine_uuid = content['engine_id']
eid = self.by_ident[engine_uuid]
- self.log.info("task::task %s arrived on %s"%(msg_id, eid))
+ self.log.info("task::task %r arrived on %r"%(msg_id, eid))
if msg_id in self.unassigned:
self.unassigned.remove(msg_id)
# else:
- # self.log.debug("task::task %s not listed as MIA?!"%(msg_id))
+ # self.log.debug("task::task %r not listed as MIA?!"%(msg_id))
self.tasks[eid].append(msg_id)
# self.pending[msg_id][1].update(received=datetime.now(),engine=(eid,engine_uuid))
@@ -787,13 +764,13 @@ def save_iopub_message(self, topics, msg):
# print (topics)
try:
msg = self.session.unpack_message(msg, content=True)
- except:
+ except Exception:
self.log.error("iopub::invalid IOPub message", exc_info=True)
return
parent = msg['parent_header']
if not parent:
- self.log.error("iopub::invalid IOPub message: %s"%msg)
+ self.log.error("iopub::invalid IOPub message: %r"%msg)
return
msg_id = parent['msg_id']
msg_type = msg['msg_type']
@@ -833,7 +810,7 @@ def save_iopub_message(self, topics, msg):
def connection_request(self, client_id, msg):
"""Reply with connection addresses for clients."""
- self.log.info("client::client %s connected"%client_id)
+ self.log.info("client::client %r connected"%client_id)
content = dict(status='ok')
content.update(self.client_info)
jsonable = {}
@@ -905,7 +882,7 @@ def register_engine(self, reg, msg):
dc.start()
self.incoming_registrations[heart] = (eid,queue,reg[0],dc)
else:
- self.log.error("registration::registration %i failed: %s"%(eid, content['evalue']))
+ self.log.error("registration::registration %i failed: %r"%(eid, content['evalue']))
return eid
def unregister_engine(self, ident, msg):
@@ -913,9 +890,9 @@ def unregister_engine(self, ident, msg):
try:
eid = msg['content']['id']
except:
- self.log.error("registration::bad engine id for unregistration: %s"%ident, exc_info=True)
+ self.log.error("registration::bad engine id for unregistration: %r"%ident, exc_info=True)
return
- self.log.info("registration::unregister_engine(%s)"%eid)
+ self.log.info("registration::unregister_engine(%r)"%eid)
# print (eid)
uuid = self.keytable[eid]
content=dict(id=eid, queue=uuid)
@@ -1135,7 +1112,7 @@ def finish(reply):
elif len(records) < len(msg_ids):
missing = [ m for m in msg_ids if m not in found_ids ]
try:
- raise KeyError("No such msg(s): %s"%missing)
+ raise KeyError("No such msg(s): %r"%missing)
except KeyError:
return finish(error.wrap_exception())
elif invalid_ids:
View
6 IPython/parallel/controller/scheduler.py
@@ -261,7 +261,6 @@ def handle_stranded_tasks(self, engine):
continue
raw_msg = lost[msg_id][0]
-
idents,msg = self.session.feed_identities(raw_msg, copy=False)
msg = self.session.unpack_message(msg, copy=False, content=False)
parent = msg['header']
@@ -294,9 +293,10 @@ def dispatch_submission(self, raw_msg):
idents, msg = self.session.feed_identities(raw_msg, copy=False)
msg = self.session.unpack_message(msg, content=False, copy=False)
except Exception:
- self.log.error("task::Invaid task: %s"%raw_msg, exc_info=True)
+ self.log.error("task::Invaid task msg: %r"%raw_msg, exc_info=True)
return
+
# send to monitor
self.mon_stream.send_multipart(['intask']+raw_msg, copy=False)
@@ -497,7 +497,7 @@ def dispatch_result(self, raw_msg):
else:
self.finish_job(idx)
except Exception:
- self.log.error("task::Invaid result: %s"%raw_msg, exc_info=True)
+ self.log.error("task::Invaid result: %r"%raw_msg, exc_info=True)
return
header = msg['header']

0 comments on commit 2676dc3

Please sign in to comment.
Something went wrong with that request. Please try again.