
cleanup pass

1 parent 7c24f2e · commit f36800d2f5fdddae511388e2d6f35c053cdf4f4a · minrk committed Mar 8, 2011
@@ -0,0 +1,18 @@
+"""The IPython ZMQ-based parallel computing interface."""
+#-----------------------------------------------------------------------------
+# Copyright (C) 2011 The IPython Development Team
+#
+# Distributed under the terms of the BSD License. The full license is in
+# the file COPYING, distributed as part of this software.
+#-----------------------------------------------------------------------------
+
+#-----------------------------------------------------------------------------
+# Imports
+#-----------------------------------------------------------------------------
+
+from .asyncresult import *
+from .client import Client
+from .dependency import *
+from .remotefunction import *
+from .view import *
+
@@ -30,7 +30,7 @@ def check_ready(f, self, *args, **kwargs):
class AsyncResult(object):
"""Class for representing results of non-blocking calls.
- Provides the same interface as :py:class:`multiprocessing.AsyncResult`.
+ Provides the same interface as :py:class:`multiprocessing.pool.AsyncResult`.
"""
msg_ids = None
@@ -53,7 +53,8 @@ def __repr__(self):
def _reconstruct_result(self, res):
- """
+ """Reconstruct our result from actual result list (always a list)
+
Override me in subclasses for turning a list of results
into the expected form.
"""
@@ -68,7 +69,7 @@ def get(self, timeout=-1):
If `timeout` is not ``None`` and the result does not arrive within
`timeout` seconds then ``TimeoutError`` is raised. If the
remote call raised an exception then that exception will be reraised
- by get().
+ by get() inside a `RemoteError`.
"""
if not self.ready():
self.wait(timeout)
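
A minimal usage sketch of the get()/timeout contract documented in this hunk, assuming a controller and engines are already running and reachable with the default profile; the module path and the `Client.apply(..., block=False)` spelling are inferred from signatures elsewhere in this commit, not guaranteed API::

    from __future__ import print_function

    from IPython.zmq.parallel.client import Client   # import path assumed for this era
    from IPython.zmq.parallel import error

    def inverse(x):
        return 1.0 / x

    client = Client()                                  # requires a running cluster
    ar = client.apply(inverse, (0,), {}, block=False)  # returns an AsyncResult

    try:
        result = ar.get(timeout=5)   # TimeoutError if no reply within 5 seconds
    except error.RemoteError as e:
        # the engine's ZeroDivisionError is re-raised here, wrapped in a RemoteError
        print(e.ename, e.evalue)
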
@@ -89,6 +90,8 @@ def ready(self):
def wait(self, timeout=-1):
"""Wait until the result is available or until `timeout` seconds pass.
+
+ This method always returns None.
"""
if self._ready:
return
@@ -118,15 +121,19 @@ def successful(self):
Will raise ``AssertionError`` if the result is not ready.
"""
- assert self._ready
+ assert self.ready()
return self._success
#----------------------------------------------------------------
# Extra methods not in mp.pool.AsyncResult
#----------------------------------------------------------------
def get_dict(self, timeout=-1):
- """Get the results as a dict, keyed by engine_id."""
+ """Get the results as a dict, keyed by engine_id.
+
+ timeout behavior is described in `get()`.
+ """
+
results = self.get(timeout)
engine_ids = [ md['engine_id'] for md in self._metadata ]
bycount = sorted(engine_ids, key=lambda k: engine_ids.count(k))
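
Continuing the hedged sketch from the earlier hunk (reusing its `client` and `print_function` import), `get_dict()` regroups one result per engine, keyed by `engine_id`, with the same timeout behaviour as `get()`; `targets='all'` is an assumption borrowed from `queue_status` below::

    import socket

    ar = client.apply(socket.gethostname, (), {}, block=False, targets='all')
    by_engine = ar.get_dict(timeout=10)   # e.g. {0: 'node1', 1: 'node2', ...}
    for eid in sorted(by_engine):
        print(eid, by_engine[eid])
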
@@ -140,7 +147,7 @@ def get_dict(self, timeout=-1):
@property
@check_ready
def result(self):
- """result property."""
+ """result property wrapper for `get(timeout=0)`."""
return self._result
# abbreviated alias:
@@ -149,7 +156,7 @@ def result(self):
@property
@check_ready
def metadata(self):
- """metadata property."""
+ """property for accessing execution metadata."""
if self._single_result:
return self._metadata[0]
else:
@@ -186,7 +193,7 @@ def __getitem__(self, key):
@check_ready
def __getattr__(self, key):
- """getattr maps to getitem for convenient access to metadata."""
+ """getattr maps to getitem for convenient attr access to metadata."""
if key not in self._metadata[0].keys():
raise AttributeError("%r object has no attribute %r"%(
self.__class__.__name__, key))
@@ -249,7 +256,11 @@ def __iter__(self):
class AsyncHubResult(AsyncResult):
- """Class to wrap pending results that must be requested from the Hub"""
+ """Class to wrap pending results that must be requested from the Hub.
+
+ Note that waiting/polling on these objects requires polling the Hub over the network,
+ so use `AsyncHubResult.wait()` sparingly.
+ """
def wait(self, timeout=-1):
"""wait for result to complete."""
@@ -32,12 +32,13 @@
from . import error
from . import map as Map
+from . import util
from . import streamsession as ss
from .asyncresult import AsyncResult, AsyncMapResult, AsyncHubResult
from .clusterdir import ClusterDir, ClusterDirError
from .dependency import Dependency, depend, require, dependent
-from .remotefunction import remote,parallel,ParallelFunction,RemoteFunction
-from .util import ReverseDict, disambiguate_url, validate_url
+from .remotefunction import remote, parallel, ParallelFunction, RemoteFunction
+from .util import ReverseDict, validate_url, disambiguate_url
from .view import DirectView, LoadBalancedView
#--------------------------------------------------------------------------
@@ -489,7 +490,7 @@ def connect_socket(s, url):
def _unwrap_exception(self, content):
"""unwrap exception, and remap engineid to int."""
- e = ss.unwrap_exception(content)
+ e = error.unwrap_exception(content)
if e.engine_info:
e_uuid = e.engine_info['engine_uuid']
eid = self._engines[e_uuid]
@@ -526,11 +527,11 @@ def _extract_metadata(self, header, parent, content):
md['engine_id'] = self._engines.get(md['engine_uuid'], None)
if 'date' in parent:
- md['submitted'] = datetime.strptime(parent['date'], ss.ISO8601)
+ md['submitted'] = datetime.strptime(parent['date'], util.ISO8601)
if 'started' in header:
- md['started'] = datetime.strptime(header['started'], ss.ISO8601)
+ md['started'] = datetime.strptime(header['started'], util.ISO8601)
if 'date' in header:
- md['completed'] = datetime.strptime(header['date'], ss.ISO8601)
+ md['completed'] = datetime.strptime(header['date'], util.ISO8601)
return md
def _handle_execute_reply(self, msg):
@@ -573,7 +574,7 @@ def _handle_apply_reply(self, msg):
# construct result:
if content['status'] == 'ok':
- self.results[msg_id] = ss.unserialize_object(msg['buffers'])[0]
+ self.results[msg_id] = util.unserialize_object(msg['buffers'])[0]
elif content['status'] == 'aborted':
self.results[msg_id] = error.AbortedTask(msg_id)
elif content['status'] == 'resubmitted':
@@ -1055,7 +1056,7 @@ def _apply_balanced(self, f, args, kwargs, bound=None, block=None, targets=None,
after = self._build_dependency(after)
follow = self._build_dependency(follow)
subheader = dict(after=after, follow=follow, timeout=timeout, targets=idents)
- bufs = ss.pack_apply_message(f,args,kwargs)
+ bufs = util.pack_apply_message(f,args,kwargs)
content = dict(bound=bound)
msg = self.session.send(self._task_socket, "apply_request",
@@ -1087,7 +1088,7 @@ def _apply_direct(self, f, args, kwargs, bound=None, block=None, targets=None):
subheader = {}
content = dict(bound=bound)
- bufs = ss.pack_apply_message(f,args,kwargs)
+ bufs = util.pack_apply_message(f,args,kwargs)
msg_ids = []
for ident in idents:
@@ -1399,7 +1400,7 @@ def result_status(self, msg_ids, status_only=True):
md.update(iodict)
if rcontent['status'] == 'ok':
- res,buffers = ss.unserialize_object(buffers)
+ res,buffers = util.unserialize_object(buffers)
else:
print rcontent
res = self._unwrap_exception(rcontent)
@@ -1437,7 +1438,7 @@ def queue_status(self, targets='all', verbose=False):
status = content.pop('status')
if status != 'ok':
raise self._unwrap_exception(content)
- return ss.rekey(content)
+ return util.rekey(content)
@spinfirst
def purge_results(self, jobs=[], targets=[]):
@@ -1495,5 +1496,6 @@ def purge_results(self, jobs=[], targets=[]):
'DirectView',
'LoadBalancedView',
'AsyncResult',
- 'AsyncMapResult'
+ 'AsyncMapResult',
+ 'Reference'
]
@@ -22,7 +22,6 @@
import re
import shutil
import sys
-import warnings
from IPython.config.loader import PyFileConfigLoader
from IPython.config.configurable import Configurable
@@ -21,7 +21,7 @@
from zmq.devices import ProcessMonitoredQueue
# internal:
from IPython.utils.importstring import import_item
-from IPython.utils.traitlets import Int, Str, Instance, List, Bool
+from IPython.utils.traitlets import Int, CStr, Instance, List, Bool
from .entry_point import signal_children
from .hub import Hub, HubFactory
@@ -41,7 +41,7 @@ class ControllerFactory(HubFactory):
# internal
children = List()
- mq_class = Str('zmq.devices.ProcessMonitoredQueue')
+ mq_class = CStr('zmq.devices.ProcessMonitoredQueue')
def _usethreads_changed(self, name, old, new):
self.mq_class = 'zmq.devices.%sMonitoredQueue'%('Thread' if new else 'Process')
@@ -7,7 +7,25 @@
class depend(object):
- """Dependency decorator, for use with tasks."""
+ """Dependency decorator, for use with tasks.
+
+ `@depend` lets you define a function for engine dependencies
+ just like you use `apply` for tasks.
+
+
+ Examples
+ --------
+ ::
+
+ @depend(df, a,b, c=5)
+ def f(m,n,p):
+
+ view.apply(f, 1,2,3)
+
+ will call df(a,b,c=5) on the engine, and if it returns False or
+ raises an UnmetDependency error, then the task will not be run
+ and another engine will be tried.
+ """
def __init__(self, f, *args, **kwargs):
self.f = f
self.args = args
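
A hedged, self-contained variant of the docstring example: the dependency function runs on each candidate engine, and the decorated task is only scheduled where it returns True (the names and module path below are illustrative, not taken from this commit)::

    from IPython.zmq.parallel.dependency import depend   # import path assumed

    def minimum_cores(n):
        """dependency check, evaluated on the candidate engine"""
        import multiprocessing
        return multiprocessing.cpu_count() >= n

    @depend(minimum_cores, 4)
    def crunch(data):
        # only runs on engines where minimum_cores(4) returned True
        return sum(data)

    # then submitted like any other task, e.g. (spelling assumed):
    # client.apply(crunch, (range(10),), {}, balanced=True)
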
@@ -39,6 +57,7 @@ def __name__(self):
return self.func_name
def _require(*names):
+ """Helper for @require decorator."""
for name in names:
try:
__import__(name)
@@ -47,12 +66,35 @@ def _require(*names):
return True
def require(*names):
+ """Simple decorator for requiring names to be importable.
+
+ Examples
+ --------
+
+ In [1]: @require('numpy')
+ ...: def norm(a):
+ ...: import numpy
+ ...: return numpy.linalg.norm(a,2)
+ """
return depend(_require, *names)
class Dependency(set):
"""An object for representing a set of msg_id dependencies.
- Subclassed from set()."""
+ Subclassed from set().
+
+ Parameters
+ ----------
+ dependencies: list/set of msg_ids or AsyncResult objects or output of Dependency.as_dict()
+ The msg_ids to depend on
+ all : bool [default True]
+ Whether the dependency should be considered met only when *all* the depended-upon tasks have completed,
+ or as soon as *any* of them has completed.
+ success_only : bool [default True]
+ Whether to consider only successes for Dependencies, or consider failures as well.
+ If `all=success_only=True`, then this task will fail with an ImpossibleDependency
+ as soon as the first depended-upon task fails.
+ """
all=True
success_only=True
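
A hedged sketch of how the `all`/`success_only` switches combine, using placeholder msg_ids; the `after=` keyword is the one visible in `_apply_balanced` elsewhere in this commit::

    from IPython.zmq.parallel.dependency import Dependency   # import path assumed

    setup_ids = ['msg-id-1', 'msg-id-2']   # placeholder msg_ids from earlier submissions

    # met only once *all* of the depended-upon tasks have completed successfully
    dep_all = Dependency(setup_ids, all=True, success_only=True)

    # met as soon as *any* of them has completed, success or failure
    dep_any = Dependency(setup_ids, all=False, success_only=False)

    # then passed as a scheduling constraint on a later task, e.g. (spelling assumed):
    # client.apply(consume, (), {}, after=dep_all, block=False)
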
@@ -45,15 +45,15 @@
from datetime import datetime
filters = {
- '$eq' : lambda a,b: a==b,
'$lt' : lambda a,b: a < b,
'$gt' : lambda a,b: a > b,
+ '$eq' : lambda a,b: a == b,
+ '$ne' : lambda a,b: a != b,
'$lte': lambda a,b: a <= b,
'$gte': lambda a,b: a >= b,
- '$ne' : lambda a,b: not a==b,
'$in' : lambda a,b: a in b,
'$nin': lambda a,b: a not in b,
- '$all' : lambda a,b: all([ a in bb for bb in b ]),
+ '$all': lambda a,b: all([ a in bb for bb in b ]),
'$mod': lambda a,b: a%b[0] == b[1],
'$exists' : lambda a,b: (b and a is not None) or (a is None and not b)
}
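
The table reads as operator -> two-argument test, with the record's value on the left and the query's value on the right; a standalone check of that reading (the argument order is an inference from entries such as `$in` and `$mod`)::

    filters = {
        '$lt' : lambda a,b: a < b,
        '$gt' : lambda a,b: a > b,
        '$eq' : lambda a,b: a == b,
        '$ne' : lambda a,b: a != b,
        '$in' : lambda a,b: a in b,
        '$nin': lambda a,b: a not in b,
        '$mod': lambda a,b: a % b[0] == b[1],
    }

    record_value = 3
    query = {'$in': [1, 3, 5], '$mod': (2, 1)}

    # True: 3 is in [1, 3, 5] and 3 % 2 == 1
    print(all(filters[op](record_value, spec) for op, spec in query.items()))
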
@@ -1,21 +1,17 @@
#!/usr/bin/env python
"""A simple engine that talks to a controller over 0MQ.
it handles registration, etc. and launches a kernel
-connected to the Controller's queue(s).
+connected to the Controller's Schedulers.
"""
from __future__ import print_function
-import logging
import sys
import time
-import uuid
-from pprint import pprint
import zmq
from zmq.eventloop import ioloop, zmqstream
# internal
-from IPython.config.configurable import Configurable
from IPython.utils.traitlets import Instance, Str, Dict, Int, Type, CFloat
# from IPython.utils.localinterfaces import LOCALHOST
@@ -25,10 +21,6 @@
from .streamsession import Message
from .util import disambiguate_url
-def printer(*msg):
- # print (self.log.handlers, file=sys.__stdout__)
- self.log.info(str(msg))
-
class EngineFactory(RegistrationFactory):
"""IPython engine"""
@@ -3,6 +3,9 @@
"""Classes and functions for kernel related errors and exceptions."""
from __future__ import print_function
+import sys
+import traceback
+
__docformat__ = "restructuredtext en"
# Tell nose to skip this module
@@ -290,3 +293,21 @@ def collect_exceptions(rdict_or_list, method='unspecified'):
except CompositeError as e:
raise e
+def wrap_exception(engine_info={}):
+ etype, evalue, tb = sys.exc_info()
+ stb = traceback.format_exception(etype, evalue, tb)
+ exc_content = {
+ 'status' : 'error',
+ 'traceback' : stb,
+ 'ename' : unicode(etype.__name__),
+ 'evalue' : unicode(evalue),
+ 'engine_info' : engine_info
+ }
+ return exc_content
+
+def unwrap_exception(content):
+ err = RemoteError(content['ename'], content['evalue'],
+ ''.join(content['traceback']),
+ content.get('engine_info', {}))
+ return err
+
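
A standalone round trip through the two helpers above, showing how a remote traceback becomes message content and is later rebuilt as a `RemoteError` on the client side (import path assumed for this commit)::

    from __future__ import print_function

    from IPython.zmq.parallel import error   # import path assumed

    try:
        1 / 0
    except ZeroDivisionError:
        content = error.wrap_exception(engine_info={'engine_id': 0})

    print(content['ename'], content['evalue'])     # ZeroDivisionError ...

    remote_err = error.unwrap_exception(content)   # a RemoteError, ready to re-raise
    print(remote_err.ename, remote_err.engine_info)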