Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

quiet down scheduler printing, fix dep_id check in update_dependencies

  • Loading branch information...
commit f1b3eb2310cf565e5ee871081d7bd9e260bafa97 1 parent e198829
@minrk minrk authored
Showing with 12 additions and 11 deletions.
  1. +12 −11 IPython/zmq/parallel/scheduler.py
View
23 IPython/zmq/parallel/scheduler.py
@@ -30,9 +30,9 @@
@decorator
def logged(f,self,*args,**kwargs):
- print ("#--------------------")
- print ("%s(*%s,**%s)"%(f.func_name, args, kwargs))
- print ("#--")
+ # print ("#--------------------")
+ # print ("%s(*%s,**%s)"%(f.func_name, args, kwargs))
+ # print ("#--")
return f(self,*args, **kwargs)
#----------------------------------------------------------------------
@@ -234,6 +234,9 @@ def dispatch_submission(self, raw_msg):
logger.error("task::Invaid msg: %s"%msg)
return
+ # send to monitor
+ self.mon_stream.send_multipart(['intask']+raw_msg, copy=False)
+
msg = self.session.unpack_message(msg, content=False, copy=False)
header = msg['header']
msg_id = header['msg_id']
@@ -256,8 +259,6 @@ def dispatch_submission(self, raw_msg):
self.save_unmet(msg_id, raw_msg, after, follow)
else:
self.save_unmet(msg_id, raw_msg, after, follow)
- # send to monitor
- self.mon_stream.send_multipart(['intask']+raw_msg, copy=False)
@logged
def maybe_run(self, msg_id, raw_msg, follow=None):
@@ -284,7 +285,6 @@ def save_unmet(self, msg_id, msg, after, follow):
self.depending[msg_id] = (msg_id,msg,after,follow)
# track the ids in both follow/after, but not those already completed
for dep_id in after.union(follow).difference(self.all_done):
- print (dep_id)
if dep_id not in self.dependencies:
self.dependencies[dep_id] = set()
self.dependencies[dep_id].add(msg_id)
@@ -300,7 +300,7 @@ def submit_task(self, msg_id, msg, follow=None, indices=None):
if indices:
idx = indices[idx]
target = self.targets[idx]
- print (target, map(str, msg[:3]))
+ # print (target, map(str, msg[:3]))
self.engine_stream.send(target, flags=zmq.SNDMORE, copy=False)
self.engine_stream.send_multipart(msg, copy=False)
self.add_job(idx)
@@ -334,7 +334,7 @@ def handle_result_success(self, idents, parent, raw_msg):
client = idents[1]
# swap_ids for XREP-XREP mirror
raw_msg[:2] = [client,engine]
- print (map(str, raw_msg[:4]))
+ # print (map(str, raw_msg[:4]))
self.client_stream.send_multipart(raw_msg, copy=False)
# now, update our data structures
msg_id = parent['msg_id']
@@ -359,14 +359,15 @@ def handle_unmet_dependency(self, idents, parent):
def update_dependencies(self, dep_id):
"""dep_id just finished. Update our dependency
table and submit any jobs that just became runable."""
+
if dep_id not in self.dependencies:
return
jobs = self.dependencies.pop(dep_id)
for job in jobs:
msg_id, raw_msg, after, follow = self.depending[job]
- if msg_id in after:
- after.remove(msg_id)
- if not after: # time deps met
+ if dep_id in after:
+ after.remove(dep_id)
+ if not after: # time deps met, maybe run
if self.maybe_run(msg_id, raw_msg, follow):
self.depending.pop(job)
for mid in follow:
Please sign in to comment.
Something went wrong with that request. Please try again.