Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

scheduler progress

  • Loading branch information...
commit 7f42766d5cb46d1d28a837add0fddcaaec306c36 1 parent 3ab94ec
@minrk minrk authored
View
32 IPython/zmq/parallel/client.py
@@ -1,15 +1,24 @@
-#!/usr/bin/env python
"""A semi-synchronous Client for the ZMQ controller"""
+#-----------------------------------------------------------------------------
+# Copyright (C) 2010 The IPython Development Team
+#
+# Distributed under the terms of the BSD License. The full license is in
+# the file COPYING, distributed as part of this software.
+#-----------------------------------------------------------------------------
-import time
+#-----------------------------------------------------------------------------
+# Imports
+#-----------------------------------------------------------------------------
+import time
from pprint import pprint
+import zmq
+from zmq.eventloop import ioloop, zmqstream
+
from IPython.external.decorator import decorator
import streamsession as ss
-import zmq
-from zmq.eventloop import ioloop, zmqstream
from remotenamespace import RemoteNamespace
from view import DirectView
from dependency import Dependency, depend, require
@@ -357,7 +366,6 @@ def queue_status(self, targets=None, verbose=False):
def clear(self, targets=None, block=None):
"""clear the namespace in target(s)"""
targets = self._build_targets(targets)[0]
- print targets
for t in targets:
self.session.send(self.control_socket, 'clear_request', content={},ident=t)
error = False
@@ -377,7 +385,6 @@ def clear(self, targets=None, block=None):
def abort(self, msg_ids = None, targets=None, block=None):
"""abort the Queues of target(s)"""
targets = self._build_targets(targets)[0]
- print targets
if isinstance(msg_ids, basestring):
msg_ids = [msg_ids]
content = dict(msg_ids=msg_ids)
@@ -400,7 +407,6 @@ def abort(self, msg_ids = None, targets=None, block=None):
def kill(self, targets=None, block=None):
"""Terminates one or more engine processes."""
targets = self._build_targets(targets)[0]
- print targets
for t in targets:
self.session.send(self.control_socket, 'kill_request', content={},ident=t)
error = False
@@ -456,11 +462,20 @@ def _apply_balanced(self, f, args, kwargs, bound=True, block=None,
"""the underlying method for applying functions in a load balanced
manner."""
block = block if block is not None else self.block
+ if isinstance(after, Dependency):
+ after = after.as_dict()
+ elif after is None:
+ after = []
+ if isinstance(follow, Dependency):
+ follow = follow.as_dict()
+ elif follow is None:
+ follow = []
+ subheader = dict(after=after, follow=follow)
bufs = ss.pack_apply_message(f,args,kwargs)
content = dict(bound=bound)
msg = self.session.send(self.task_socket, "apply_request",
- content=content, buffers=bufs)
+ content=content, buffers=bufs, subheader=subheader)
msg_id = msg['msg_id']
self.outstanding.add(msg_id)
self.history.append(msg_id)
@@ -477,7 +492,6 @@ def _apply_direct(self, f, args, kwargs, bound=True, block=None, targets=None,
block = block if block is not None else self.block
queues,targets = self._build_targets(targets)
- print queues
bufs = ss.pack_apply_message(f,args,kwargs)
if isinstance(after, Dependency):
after = after.as_dict()
View
12 IPython/zmq/parallel/controller.py
@@ -1,11 +1,9 @@
#!/usr/bin/env python
-# encoding: utf-8
-
"""The IPython Controller with 0MQ
This is the master object that handles connections from engines, clients, and
"""
#-----------------------------------------------------------------------------
-# Copyright (C) 2008-2009 The IPython Development Team
+# Copyright (C) 2010 The IPython Development Team
#
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
@@ -216,10 +214,10 @@ def _new_id(self):
newid += 1
return newid
-
#-----------------------------------------------------------------------------
# message validation
#-----------------------------------------------------------------------------
+
def _validate_targets(self, targets):
"""turn any valid targets argument into a list of integer ids"""
if targets is None:
@@ -262,7 +260,7 @@ def _validate_client_msg(self, msg):
#-----------------------------------------------------------------------------
- # dispatch methods (1 per socket)
+ # dispatch methods (1 per stream)
#-----------------------------------------------------------------------------
def dispatch_register_request(self, msg):
@@ -463,7 +461,7 @@ def save_task_result(self, idents, msg):
if msg_id in self.mia:
self.mia.remove(msg_id)
else:
- logger.debug("task:: unknown task %s finished"%msg_id)
+ logger.debug("task::unknown task %s finished"%msg_id)
def save_task_destination(self, idents, msg):
try:
@@ -479,7 +477,7 @@ def save_task_destination(self, idents, msg):
if queue_id == engine_uuid:
break
- logger.info("task:: task %s arrived on %s"%(msg_id, eid))
+ logger.info("task::task %s arrived on %s"%(msg_id, eid))
if msg_id in self.mia:
self.mia.remove(msg_id)
else:
View
19 IPython/zmq/parallel/scheduler.py
@@ -24,6 +24,7 @@
def logged(f,self,*args,**kwargs):
print ("#--------------------")
print ("%s(*%s,**%s)"%(f.func_name, args, kwargs))
+ print ("#--")
return f(self,*args, **kwargs)
#----------------------------------------------------------------------
@@ -91,7 +92,7 @@ def leastload(loads):
# Classes
#---------------------------------------------------------------------
class TaskScheduler(object):
- """Simple Python TaskScheduler object.
+ """Python TaskScheduler object.
This is the simplest object that supports msg_id based
DAG dependencies. *Only* task msg_ids are checked, not
@@ -136,6 +137,7 @@ def __init__(self, client_stream, engine_stream, mon_stream,
self.completed = {}
self.pending = {}
self.all_done = set()
+ self.blacklist = {}
self.targets = []
self.loads = []
@@ -221,7 +223,6 @@ def dispatch_submission(self, raw_msg):
return
msg = self.session.unpack_message(msg, content=False, copy=False)
- print idents,msg
header = msg['header']
msg_id = header['msg_id']
after = Dependency(header.get('after', []))
@@ -233,7 +234,6 @@ def dispatch_submission(self, raw_msg):
after = Dependency([])
follow = Dependency(header.get('follow', []))
- print raw_msg
if len(after) == 0:
# time deps already met, try to run
if not self.maybe_run(msg_id, raw_msg, follow):
@@ -268,9 +268,11 @@ def save_unmet(self, msg_id, msg, after, follow):
self.depending[msg_id] = (msg_id,msg,after,follow)
# track the ids in both follow/after, but not those already completed
for dep_id in after.union(follow).difference(self.all_done):
+ print dep_id
if dep_id not in self.dependencies:
self.dependencies[dep_id] = set()
self.dependencies[dep_id].add(msg_id)
+
@logged
def submit_task(self, msg_id, msg, follow=None, indices=None):
"""submit a task to any of a subset of our targets"""
@@ -283,8 +285,8 @@ def submit_task(self, msg_id, msg, follow=None, indices=None):
idx = indices[idx]
target = self.targets[idx]
print target, map(str, msg[:3])
- self.engine_stream.socket.send(target, flags=zmq.SNDMORE, copy=False)
- self.engine_stream.socket.send_multipart(msg, copy=False)
+ self.engine_stream.send(target, flags=zmq.SNDMORE, copy=False)
+ self.engine_stream.send_multipart(msg, copy=False)
self.add_job(idx)
self.pending[target][msg_id] = (msg, follow)
@@ -305,7 +307,7 @@ def dispatch_result(self, raw_msg):
# send to monitor
self.mon_stream.send_multipart(['outtask']+raw_msg, copy=False)
else:
- self.handle_unmet_dependency(self, idents, msg['parent_header'])
+ self.handle_unmet_dependency(idents, msg['parent_header'])
@logged
def handle_result_success(self, idents, parent, raw_msg):
@@ -331,7 +333,7 @@ def handle_unmet_dependency(self, idents, parent):
self.blacklist[msg_id] = set()
self.blacklist[msg_id].add(engine)
raw_msg,follow = self.pending[engine].pop(msg_id)
- if not self.maybe_run(raw_msg, follow):
+ if not self.maybe_run(msg_id, raw_msg, follow):
# resubmit failed, put it back in our dependency tree
self.save_unmet(msg_id, raw_msg, Dependency(), follow)
pass
@@ -350,7 +352,8 @@ def update_dependencies(self, dep_id):
if self.maybe_run(msg_id, raw_msg, follow):
self.depending.pop(job)
for mid in follow:
- self.dependencies[mid].remove(msg_id)
+ if mid in self.dependencies:
+ self.dependencies[mid].remove(msg_id)
#----------------------------------------------------------------------
# methods to be overridden by subclasses
View
5 IPython/zmq/parallel/streamsession.py
@@ -438,10 +438,11 @@ def feed_identities(self, msg, copy=True):
return idents, msg
def unpack_message(self, msg, content=True, copy=True):
- """return a message object from the format
+ """Return a message object from the format
sent by self.send.
- parameters:
+ Parameters:
+ -----------
content : bool (True)
whether to unpack the content dict (True),
Please sign in to comment.
Something went wrong with that request. Please try again.