Permalink
Browse files

added dependencies & Python scheduler

  • Loading branch information...
1 parent 48c8946 commit 48fdd06d6c3165d212df13048b88d2a120735961 @minrk minrk committed Oct 20, 2010
Showing with 501 additions and 13 deletions.
  1. +50 −12 IPython/zmq/parallel/client.py
  2. +50 −1 IPython/zmq/parallel/dependency.py
  3. +401 −0 IPython/zmq/parallel/scheduler.py
@@ -2,19 +2,17 @@
"""A semi-synchronous Client for the ZMQ controller"""
import time
-import threading
from pprint import pprint
-from functools import wraps
-
from IPython.external.decorator import decorator
import streamsession as ss
import zmq
-
+from zmq.eventloop import ioloop, zmqstream
from remotenamespace import RemoteNamespace
from view import DirectView
+from dependency import Dependency, depend, require
def _push(ns):
globals().update(ns)
@@ -147,13 +145,13 @@ def __init__(self, addr, context=None, username=None, debug=False):
self.history = []
self.debug = debug
self.session.debug = debug
- self._connect()
self._notification_handlers = {'registration_notification' : self._register_engine,
'unregistration_notification' : self._unregister_engine,
}
self._queue_handlers = {'execute_reply' : self._handle_execute_reply,
'apply_reply' : self._handle_apply_reply}
+ self._connect()
@property
@@ -453,7 +451,8 @@ def run(self, code, block=None):
result = self.apply(execute, (code,), targets=None, block=block, bound=False)
return result
- def _apply_balanced(self, f, args, kwargs, bound=True, block=None):
+ def _apply_balanced(self, f, args, kwargs, bound=True, block=None,
+ after=None, follow=None):
"""the underlying method for applying functions in a load balanced
manner."""
block = block if block is not None else self.block
@@ -471,17 +470,29 @@ def _apply_balanced(self, f, args, kwargs, bound=True, block=None):
else:
return msg_id
- def _apply_direct(self, f, args, kwargs, bound=True, block=None, targets=None):
+ def _apply_direct(self, f, args, kwargs, bound=True, block=None, targets=None,
+ after=None, follow=None):
"""Then underlying method for applying functions to specific engines."""
+
block = block if block is not None else self.block
+
queues,targets = self._build_targets(targets)
print queues
bufs = ss.pack_apply_message(f,args,kwargs)
+ if isinstance(after, Dependency):
+ after = after.as_dict()
+ elif after is None:
+ after = []
+ if isinstance(follow, Dependency):
+ follow = follow.as_dict()
+ elif follow is None:
+ follow = []
+ subheader = dict(after=after, follow=follow)
content = dict(bound=bound)
msg_ids = []
for queue in queues:
msg = self.session.send(self.queue_socket, "apply_request",
- content=content, buffers=bufs,ident=queue)
+ content=content, buffers=bufs,ident=queue, subheader=subheader)
msg_id = msg['msg_id']
self.outstanding.add(msg_id)
self.history.append(msg_id)
@@ -501,25 +512,31 @@ def _apply_direct(self, f, args, kwargs, bound=True, block=None, targets=None):
result[target] = self.results[mid]
return result
- def apply(self, f, args=None, kwargs=None, bound=True, block=None, targets=None):
+ def apply(self, f, args=None, kwargs=None, bound=True, block=None, targets=None,
+ after=None, follow=None):
"""calls f(*args, **kwargs) on a remote engine(s), returning the result.
if self.block is False:
returns msg_id or list of msg_ids
else:
returns actual result of f(*args, **kwargs)
"""
+ # enforce types of f,args,kwrags
args = args if args is not None else []
kwargs = kwargs if kwargs is not None else {}
+ if not callable(f):
+ raise TypeError("f must be callable, not %s"%type(f))
if not isinstance(args, (tuple, list)):
raise TypeError("args must be tuple or list, not %s"%type(args))
if not isinstance(kwargs, dict):
raise TypeError("kwargs must be dict, not %s"%type(kwargs))
+
+ options = dict(bound=bound, block=block, after=after, follow=follow)
+
if targets is None:
- return self._apply_balanced(f,args,kwargs,bound=bound, block=block)
+ return self._apply_balanced(f, args, kwargs, **options)
else:
- return self._apply_direct(f, args, kwargs,
- bound=bound,block=block, targets=targets)
+ return self._apply_direct(f, args, kwargs, targets=targets, **options)
def push(self, ns, targets=None, block=None):
"""push the contents of `ns` into the namespace on `target`"""
@@ -598,5 +615,26 @@ def get_results(self, msg_ids,status_only=False):
# else:
# break
return msg['content']
+
+class AsynClient(Client):
+ """An Asynchronous client, using the Tornado Event Loop"""
+ io_loop = None
+ queue_stream = None
+ notifier_stream = None
+
+ def __init__(self, addr, context=None, username=None, debug=False, io_loop=None):
+ Client.__init__(self, addr, context, username, debug)
+ if io_loop is None:
+ io_loop = ioloop.IOLoop.instance()
+ self.io_loop = io_loop
+
+ self.queue_stream = zmqstream.ZMQStream(self.queue_socket, io_loop)
+ self.control_stream = zmqstream.ZMQStream(self.control_socket, io_loop)
+ self.task_stream = zmqstream.ZMQStream(self.task_socket, io_loop)
+ self.notification_stream = zmqstream.ZMQStream(self.notification_socket, io_loop)
+ def spin(self):
+ for stream in (self.queue_stream, self.notifier_stream,
+ self.task_stream, self.control_stream):
+ stream.flush()
@@ -50,6 +50,55 @@ def __call__(self, *args, **kwargs):
raise UnmetDependency()
return self.f(*args, **kwargs)
+def _require(*names):
+ for name in names:
+ try:
+ __import__(name)
+ except ImportError:
+ return False
+ return True
-__all__ = ['UnmetDependency', 'depend', 'evaluate_dependencies']
+def require(*names):
+ return depend(_require, *names)
+
+class Dependency(set):
+ """An object for representing a set of dependencies.
+
+ Subclassed from set()."""
+
+ mode='all'
+
+ def __init__(self, dependencies=[], mode='all'):
+ if isinstance(dependencies, dict):
+ # load from dict
+ dependencies = dependencies.get('dependencies', [])
+ mode = dependencies.get('mode', mode)
+ set.__init__(self, dependencies)
+ self.mode = mode.lower()
+ if self.mode not in ('any', 'all'):
+ raise NotImplementedError("Only any|all supported, not %r"%mode)
+
+ def check(self, completed):
+ if len(self) == 0:
+ return True
+ if self.mode == 'all':
+ for dep in self:
+ if dep not in completed:
+ return False
+ return True
+ elif self.mode == 'any':
+ for com in completed:
+ if com in self.dependencies:
+ return True
+ return False
+
+ def as_dict(self):
+ """Represent this dependency as a dict. For json compatibility."""
+ return dict(
+ dependencies=list(self),
+ mode=self.mode
+ )
+
+
+__all__ = ['UnmetDependency', 'depend', 'require', 'Dependency']
Oops, something went wrong.

0 comments on commit 48fdd06

Please sign in to comment.