/
dependency.py
159 lines (128 loc) · 4.84 KB
/
dependency.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
"""Dependency utilities"""
#-----------------------------------------------------------------------------
# Copyright (C) 2010-2011 The IPython Development Team
#
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-----------------------------------------------------------------------------
from IPython.external.decorator import decorator
from .asyncresult import AsyncResult
from .error import UnmetDependency
class depend(object):
"""Dependency decorator, for use with tasks.
`@depend` lets you define a function for engine dependencies
just like you use `apply` for tasks.
Examples
--------
::
@depend(df, a,b, c=5)
def f(m,n,p)
view.apply(f, 1,2,3)
will call df(a,b,c=5) on the engine, and if it returns False or
raises an UnmetDependency error, then the task will not be run
and another engine will be tried.
"""
def __init__(self, f, *args, **kwargs):
self.f = f
self.args = args
self.kwargs = kwargs
def __call__(self, f):
return dependent(f, self.f, *self.args, **self.kwargs)
class dependent(object):
"""A function that depends on another function.
This is an object to prevent the closure used
in traditional decorators, which are not picklable.
"""
def __init__(self, f, df, *dargs, **dkwargs):
self.f = f
self.func_name = getattr(f, '__name__', 'f')
self.df = df
self.dargs = dargs
self.dkwargs = dkwargs
def __call__(self, *args, **kwargs):
if self.df(*self.dargs, **self.dkwargs) is False:
raise UnmetDependency()
return self.f(*args, **kwargs)
@property
def __name__(self):
return self.func_name
def _require(*names):
"""Helper for @require decorator."""
for name in names:
try:
__import__(name)
except ImportError:
return False
return True
def require(*names):
"""Simple decorator for requiring names to be importable.
Examples
--------
In [1]: @require('numpy')
...: def norm(a):
...: import numpy
...: return numpy.linalg.norm(a,2)
"""
return depend(_require, *names)
class Dependency(set):
"""An object for representing a set of msg_id dependencies.
Subclassed from set().
Parameters
----------
dependencies: list/set of msg_ids or AsyncResult objects or output of Dependency.as_dict()
The msg_ids to depend on
all : bool [default True]
Whether the dependency should be considered met when *all* depending tasks have completed
or only when *any* have been completed.
success_only : bool [default True]
Whether to consider only successes for Dependencies, or consider failures as well.
If `all=success_only=True`, then this task will fail with an ImpossibleDependency
as soon as the first depended-upon task fails.
"""
all=True
success_only=True
def __init__(self, dependencies=[], all=True, success_only=True):
if isinstance(dependencies, dict):
# load from dict
all = dependencies.get('all', True)
success_only = dependencies.get('success_only', success_only)
dependencies = dependencies.get('dependencies', [])
ids = []
if isinstance(dependencies, AsyncResult):
ids.extend(AsyncResult.msg_ids)
else:
for d in dependencies:
if isinstance(d, basestring):
ids.append(d)
elif isinstance(d, AsyncResult):
ids.extend(d.msg_ids)
else:
raise TypeError("invalid dependency type: %r"%type(d))
set.__init__(self, ids)
self.all = all
self.success_only=success_only
def check(self, completed, failed=None):
if failed is not None and not self.success_only:
completed = completed.union(failed)
if len(self) == 0:
return True
if self.all:
return self.issubset(completed)
else:
return not self.isdisjoint(completed)
def unreachable(self, failed):
if len(self) == 0 or len(failed) == 0 or not self.success_only:
return False
# print self, self.success_only, self.all, failed
if self.all:
return not self.isdisjoint(failed)
else:
return self.issubset(failed)
def as_dict(self):
"""Represent this dependency as a dict. For json compatibility."""
return dict(
dependencies=list(self),
all=self.all,
success_only=self.success_only,
)
__all__ = ['depend', 'require', 'dependent', 'Dependency']