/
procset.py
316 lines (276 loc) · 10.8 KB
/
procset.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
"""
The procset for a set of procs
"""
import inspect
from types import GeneratorType
from fnmatch import fnmatch, filter as fnfilter
from diot import Diot, OrderedDiot
from varname import varname
class Proxy(list):
"""
A proxy class extended from list to enable dot access
to all members and set attributes for all members.
"""
def __getattr__(self, item):
try:
# Get the attributes of list
return getattr(super(Proxy, self), item)
except AttributeError:
return self.__class__(getattr(proxy, item) for proxy in self)
def __setattr__(self, name, value):
# We are unable to setattr of existing attribute of list
#if hasattr(super(Proxy, self), name):
# super(Proxy, self).__setattr__(name, value)
if isinstance(value, Values):
for i, val in enumerate(value):
setattr(self[i], name, val)
else:
for proxy in self:
setattr(proxy, name, value)
def __getitem__(self, item):
if isinstance(item, int):
return super(Proxy, self).__getitem__(item)
if isinstance(item, slice):
return self.__class__(super(Proxy, self).__getitem__(item))
return self.__getattr__(item)
def __setitem__(self, item, value):
if isinstance(item, (int, slice)):
return super(Proxy, self).__setitem__(item, value)
return self.__setattr__(item, value)
def add(self, anything):
"""
Add elements to the list.
@params:
`anything`: anything that is to be added.
If it is a Proxy, element will be added individually
Otherwise the whole `anything` will be added as one element.
"""
if not anything:
return
if isinstance(anything, Proxy):
for thing in anything:
self.add(thing)
elif anything not in self:
self.append(anything)
class Values(Proxy):
"""A Proxy class but element can be passed one by one"""
def __init__(self, *args, **kwargs):
super().__init__(args, **kwargs)
class PSProxy:
"""A Proxy for procset"""
def __init__(self, procset, path=None):
self.__dict__['procset'] = procset
self.__dict__['path'] = path or []
def _delegated_attrs(self, attr_to_set):
path_to_check = '.'.join(self.path + [attr_to_set])
for dele_name in self.procset.delegates.keys():
if fnmatch(path_to_check, dele_name):
procs = self.procset.delegated(dele_name)
break
else:
procs = Proxy(self.procset.procs.values())
for pat in self.path:
procs = getattr(procs, pat)
return procs
def __getattr__(self, item):
self.path.append(item)
return self
def __setattr__(self, name, value):
attrs = self._delegated_attrs(name)
if isinstance(value, Values):
for i, val in enumerate(value):
setattr(attrs[i], name, val)
else:
for attr in attrs:
setattr(attr, name, value)
class ProcSet:
"""@API
The ProcSet for a set of processes
"""
# pylint: disable=redefined-builtin,invalid-name
def __init__(self, *procs, id=None, tag=None, copy=True, depends=True):
"""@API
Constructor
@params:
*procs (Proc) : the set of processes
**kwargs: Other arguments to instantiate a `ProcSet`
depends (bool): Whether auto deduce depends.
Default: `True`
id (str): The id of the procset.
Default: `None` (the variable name)
tag (str): The tag of the processes. Default: `None`
copy (bool): Whether copy the processes or just use them.
Default: `True`
"""
self.__dict__['id'] = id or varname()
self.__dict__['tag'] = tag
self.__dict__['starts'] = Proxy()
self.__dict__['ends'] = Proxy()
self.__dict__['delegates'] = OrderedDiot(diot_nest=False)
self.__dict__['procs'] = OrderedDiot(diot_nest=False)
self.__dict__['modules'] = Diot(diot_nest=False)
# save initial states before a module is called
# states will be resumed before each module is called
self.__dict__['initials'] = Diot(diot_nest=False)
prevproc = None
for proc in procs:
assert hasattr(proc, 'id') and hasattr(proc, 'tag'), (
'Argument has to be a Proc object: %r.' % proc
)
if copy:
self.procs[proc.id] = proc.copy(
proc.id,
tag=(self.tag or proc.tag.split('@', 1)[0]) + '@' +
self.id)
else:
self.procs[proc.id] = proc
proc.tag = (self.tag
or proc.tag.split('@', 1)[0]) + '@' + self.id
if depends and prevproc is None:
self.starts.add(self[proc.id])
if depends and prevproc:
self.procs[proc.id].depends = prevproc
prevproc = self.procs[proc.id]
if depends and prevproc:
self.ends.add(prevproc)
self.delegate('input', 'starts')
self.delegate('depends', 'starts')
# export functions moved to plugins
# self.delegate('ex*', 'ends')
def delegate(self, attr, *procs):
"""@API
Delegate process attributes to procset.
@params:
*procs (str|Proc): The first argument is the name of the attributes.
- The rest of them should be `Proc`s or `Proc` selectors.
"""
procs = list(procs)
self.delegates[attr] = procs
def delegated(self, name):
"""@API
Get the detegated processes by specific attribute name
@params:
name (str): the attribute name to query
@returns:
(Proxy): The set of processes
"""
if name not in self.delegates:
return None
return self[self.delegates[name]]
def restore_states(self):
"""@API
Restore the initial state of a procset
"""
if not self.initials: # extract the inital states
self.initials.starts = self.starts[:]
self.initials.ends = self.ends[:]
self.initials.depends = {
pid: proc.depends
for pid, proc in self.procs.items()
}
else:
self.__dict__['starts'] = self.initials.starts[:]
self.__dict__['ends'] = self.initials.ends[:]
for pid, depends in self.initials.items():
self.procs[pid] = depends
def module(self, name):
"""@API
A decorator used to define a module.
@params:
name (callable|str): The function to be decorated or
the name of the module.
@returns:
(callable): The decorator
"""
if callable(name):
funcname = name.__name__
if funcname.startswith(self.id + '_'):
funcname = funcname[(len(self.id) + 1):]
return self.module(funcname)(name)
def decorator(func):
signature = inspect.signature(func)
defaults = {
key: val.default
for key, val in signature.parameters.items()
if val.default is not inspect.Parameter.empty
}
def modfun(*args, **kwargs):
if kwargs.get('restore', defaults.get('restore', True)):
self.restore_states()
func(self, *args, **kwargs)
self.modules[name] = modfun
return self.modules[name]
return decorator
# pylint: disable=arguments-differ,redefined-builtin,unused-argument,invalid-name
def copy(self, id=None, tag=None, depends=True):
"""@API
Like `proc`'s `copy` function, copy a procset.
Each processes will be copied.
@params:
id (str): Use a different id if you don't
want to use the variant name
tag (str): The new tag of all copied processes
depends (bool): Whether to copy the dependencies or not.
Default: True
- dependences for processes in starts will not be copied
@returns:
(ProcSet): The new procset
"""
id = id or varname()
ret = self.__class__(*self.procs.values(),
id=id,
tag=tag,
copy=True,
depends=False)
if depends:
for proc in ret.procs.values():
proc.depends = [
ret.procs[dep.id] if dep is self.procs[dep.id] else dep
for dep in self.procs[proc.id].depends
]
ret.starts.add(Proxy(ret.procs[proc.id] for proc in self.starts))
ret.ends.add(Proxy(ret.procs[proc.id] for proc in self.ends))
return ret
def __repr__(self):
return 'ProcSet(name="%s%s")' % (self.id,
'.' + self.tag if self.tag else '')
def __setattr__(self, item, value):
if item in ('starts', 'ends'):
self.__dict__[item] = self[value]
elif item in ('id', 'tag'):
self.__dict__[item] = value
else:
PSProxy(procset=self).__setattr__(item, value)
def __getattr__(self, item):
if item in self.__dict__:
return self.__dict__[item]
if item in self.procs:
return self.procs[item]
return PSProxy(procset=self, path=[item])
def __getitem__(self, item, _ignore_default=True): # pylint:disable=too-many-return-statements
"""@API:
Process selector, always return Proxy object
@params:
item (any): The process selector.
@returns:
(Proxy): The processes match the item."""
if item in ('starts', 'ends'):
return self.__getattr__(item)
if hasattr(item, 'id') and hasattr(
item, 'tag') and not isinstance(item, ProcSet):
return Proxy([self.procs[item.id]])
if isinstance(item, slice):
return Proxy(
self.__getattr__(procid)
for procid in list(self.procs.keys())[item])
if isinstance(item, int):
return self[list(self.procs.keys())[item]]
if isinstance(item, (tuple, list, GeneratorType)):
ret = Proxy()
ret.add(Proxy(it for itm in item for it in self[itm]))
return ret
if item in self.procs:
return Proxy([self.procs[item]])
if ',' in item:
return self[(it.strip() for it in item.split(','))]
return self[fnfilter(self.procs.keys(), item)]