forked from yahoo/graphkit
/
netop.py
430 lines (357 loc) · 14.5 KB
/
netop.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
# Copyright 2016, Yahoo Inc.
# Licensed under the terms of the Apache License, Version 2.0. See the LICENSE file associated with the project for terms.
"""About :term:`network operation`\\s (those based on graphs)"""
import logging
import re
from collections import abc
from typing import Any, Callable, Mapping
import networkx as nx
from boltons.setutils import IndexedSet as iset
from .base import Items, Plotter, UNSET, aslist, astuple, jetsam
from .config import reset_abort
from .modifiers import optional, sideffect
from .network import ExecutionPlan, Network, NodePredicate, Solution, yield_ops
from .op import FunctionalOperation, Operation, reparse_operation_data
log = logging.getLogger(__name__)
def _id_bool(b):
return hash(bool(b)) + 1
def _id_tristate_bool(b):
return 3 if b is None else (hash(bool(b)) + 1)
def _make_network(
operations,
rescheduled=None,
endured=None,
parallel=None,
marshalled=None,
merge=None,
node_props=None,
):
def proc_op(op, parent=None):
"""clone FuncOperation with certain props changed"""
assert isinstance(op, FunctionalOperation), op
## Convey any node-props specified in the netop here
# to all sub-operations.
#
if (
node_props
or (not merge and parent)
or rescheduled is not None
or endured is not None
or parallel is not None
or marshalled is not None
):
kw = {
k: v
for k, v in [
("rescheduled", rescheduled),
("endured", endured),
("parallel", parallel),
("marshalled", marshalled),
]
if v is not None
}
if node_props:
op_node_props = op.node_props.copy()
op_node_props.update(node_props)
kw["node_props"] = op_node_props
## If `merge` asked, leave original `name` to deduplicate operations,
# otherwise rename the op by prefixing them with their parent netop.
#
if not merge and parent:
kw["parents"] = (parent,) + (op.parents or ())
op = op.withset(**kw)
return op
merge_set = iset() # Preseve given node order.
for op in operations:
if isinstance(op, NetworkOperation):
merge_set.update(
proc_op(s, op.name) for s in op.net.graph if isinstance(s, Operation)
)
else:
merge_set.add(proc_op(op))
assert all(bool(n) for n in merge_set)
net = Network(*merge_set)
return net
class NetworkOperation(Operation, Plotter):
"""
An operation that can :term:`compute` a network-graph of operations.
.. Tip::
Use :func:`compose()` factory to prepare the `net` and build
instances of this class.
"""
#: The name for the new netop, used when nesting them.
name = None
#: Will prune `net` against these possible outputs when :meth:`compute()` called.
outputs = None
#: The :term:`node predicate` is a 2-argument callable(op, node-data)
#: that should return true for nodes to include; if None, all nodes included.
predicate = None
#: The execution_plan of the last call to compute(), stored as debugging aid.
last_plan = None
#: The outputs names (possibly `None`) used to compile the :attr:`plan`.
outputs = None
def __init__(
self,
operations,
name,
*,
outputs=None,
predicate: NodePredicate = None,
rescheduled=None,
endured=None,
parallel=None,
marshalled=None,
merge=None,
node_props=None,
):
"""
For arguments, ee :meth:`withset()` & class attributes.
:raises ValueError:
if dupe operation, with msg:
*Operations may only be added once, ...*
"""
## Set data asap, for debugging, although `net.withset()` will reset them.
self.name = name
# Remember Outputs for future `compute()`?
self.outputs = outputs
self.predicate = predicate
# Prune network
self.net = _make_network(
operations, rescheduled, endured, parallel, marshalled, merge, node_props
)
self.name, self.needs, self.provides = reparse_operation_data(
self.name, self.net.needs, self.net.provides
)
def __repr__(self):
"""
Display more informative names for the Operation class
"""
clsname = type(self).__name__
needs = aslist(self.needs, "needs")
provides = aslist(self.provides, "provides")
ops = list(yield_ops(self.net.graph))
steps = "".join(f"\n +--{s}" for s in ops)
return f"{clsname}({self.name!r}, needs={needs}, provides={provides}, x{len(ops)} ops:{steps})"
def withset(
self,
outputs: Items = UNSET,
predicate: NodePredicate = UNSET,
*,
name=None,
rescheduled=None,
endured=None,
parallel=None,
marshalled=None,
) -> "NetworkOperation":
"""
Return a copy with a network pruned for the given `needs` & `provides`.
:param outputs:
Will be stored and applied on the next :meth:`compute()` or :meth:`compile()`.
If not given, the value of this instance is conveyed to the clone.
:param predicate:
Will be stored and applied on the next :meth:`compute()` or :meth:`compile()`.
If not given, the value of this instance is conveyed to the clone.
:param name:
the name for the new netop:
- if `None`, the same name is kept;
- if True, a distinct name is devised::
<old-name>-<uid>
- otherwise, the given `name` is applied.
:param rescheduled:
applies :term:`reschedule`\\d to all contained `operations`
:param endured:
applies :term:`endurance` to all contained `operations`
:param parallel:
mark all contained `operations` to be executed in :term:`parallel`
:param marshalled:
mark all contained `operations` to be :term:`marshalled <marshalling>`
(usefull when run in `parallel` with a :term:`process pool`).
:return:
A narrowed netop clone, which **MIGHT be empty!***
:raises ValueError:
- If `outputs` asked do not exist in network, with msg:
*Unknown output nodes: ...*
"""
outputs = self.outputs if outputs == UNSET else outputs
predicate = self.predicate if predicate == UNSET else predicate
if name is None:
name = self.name
elif name is True:
name = self.name
## Devise a stable UID based args.
#
uid = str(
abs(
hash(str(outputs))
^ hash(predicate)
^ (1 * _id_bool(rescheduled))
^ (2 * _id_bool(endured))
^ (4 * _id_tristate_bool(parallel))
^ (8 * _id_tristate_bool(marshalled))
)
)[:7]
m = re.match(r"^(.*)-(\d+)$", name)
if m:
name = m.group(1)
name = f"{name}-{uid}"
return NetworkOperation(
list(yield_ops(self.net.graph)),
name,
outputs=outputs,
predicate=predicate,
rescheduled=rescheduled,
endured=endured,
parallel=parallel,
marshalled=marshalled,
)
def _build_pydot(self, **kws):
"""delegate to network"""
# FIXME: demarcate netop inputs/outputs in plots.
kws.setdefault("title", self.name)
plotter = self.last_plan or self.net
return plotter._build_pydot(**kws)
def compile(
self, inputs=None, outputs=UNSET, predicate: NodePredicate = UNSET
) -> ExecutionPlan:
"""
Produce a :term:`plan` for the given args or `outputs`/`predicate` narrowed earlier.
:param named_inputs:
a string or a list of strings that should be fed to the `needs` of all operations.
:param outputs:
A string or a list of strings with all data asked to compute.
If ``None``, all possible intermediate outputs will be kept.
If not given, those set by a previous call to :meth:`withset()` or cstor are used.
:param predicate:
Will be stored and applied on the next :meth:`compute()` or :meth:`compile()`.
If not given, those set by a previous call to :meth:`withset()` or cstor are used.
:return:
the :term:`execution plan` satisfying the given `inputs`, `outputs` & `predicate`
:raises ValueError:
- If `outputs` asked do not exist in network, with msg:
*Unknown output nodes: ...*
- If solution does not contain any operations, with msg:
*Unsolvable graph: ...*
- If given `inputs` mismatched plan's :attr:`needs`, with msg:
*Plan needs more inputs...*
- If `outputs` asked cannot be produced by the :attr:`dag`, with msg:
*Impossible outputs...*
"""
outputs = self.outputs if outputs == UNSET else outputs
predicate = self.predicate if predicate == UNSET else predicate
return self.net.compile(inputs, outputs, predicate)
def compute(
self,
named_inputs: Mapping,
outputs: Items = UNSET,
predicate: NodePredicate = UNSET,
) -> Solution:
"""
Compile a plan & :term:`execute` the graph, sequentially or parallel.
.. Attention::
If intermediate :term:`compilation` is successful, the "global
:term:`abort run` flag is reset before the :term:`execution` starts.
:param named_inputs:
A maping of names --> values that will be fed to the `needs` of all operations.
Cloned, not modified.
:param outputs:
A string or a list of strings with all data asked to compute.
If ``None``, all intermediate data will be kept.
:return:
The :term:`solution` which contains the results of each operation executed
+1 for inputs in separate dictionaries.
:raises ValueError:
- If `outputs` asked do not exist in network, with msg:
*Unknown output nodes: ...*
- If plan does not contain any operations, with msg:
*Unsolvable graph: ...*
- If given `inputs` mismatched plan's :attr:`needs`, with msg:
*Plan needs more inputs...*
- If `outputs` asked cannot be produced by the :attr:`dag`, with msg:
*Impossible outputs...*
See also :meth:`.Operation.compute()`.
"""
try:
net = self.net # jetsam
outputs = self.outputs if outputs == UNSET else outputs
predicate = self.predicate if predicate == UNSET else predicate
# Build the execution plan.
log.debug("=== Compiling netop(%s)...", self.name)
self.last_plan = plan = net.compile(named_inputs.keys(), outputs, predicate)
# Restore `abort` flag for next run.
reset_abort()
solution = plan.execute(named_inputs, outputs, name=self.name)
return solution
except Exception as ex:
jetsam(ex, locals(), "plan", "solution", "outputs", network="net")
raise
def __call__(self, **input_kwargs) -> Solution:
"""
Delegates to :meth:`compute()`, respecting any narrowed `outputs`.
"""
# To respect narrowed `outputs` must send them due to recompilation.
return self.compute(input_kwargs, outputs=self.outputs)
def compose(
name,
op1,
*operations,
outputs: Items = None,
rescheduled=None,
endured=None,
parallel=None,
marshalled=None,
merge=False,
node_props=None,
) -> NetworkOperation:
"""
Composes a collection of operations into a single computation graph,
obeying the ``merge`` property, if set in the constructor.
:param str name:
A optional name for the graph being composed by this object.
:param op1:
syntactically force at least 1 operation
:param operations:
Each argument should be an operation instance created using
``operation``.
:param bool merge:
If ``True``, this compose object will attempt to merge together
``operation`` instances that represent entire computation graphs.
Specifically, if one of the ``operation`` instances passed to this
``compose`` object is itself a graph operation created by an
earlier use of ``compose`` the sub-operations in that graph are
compared against other operations passed to this ``compose``
instance (as well as the sub-operations of other graphs passed to
this ``compose`` instance). If any two operations are the same
(based on name), then that operation is computed only once, instead
of multiple times (one for each time the operation appears).
:param rescheduled:
applies :term:`reschedule`\\d to all contained `operations`
:param endured:
applies :term:`endurance` to all contained `operations`
:param parallel:
mark all contained `operations` to be executed in :term:`parallel`
:param marshalled:
mark all contained `operations` to be :term:`marshalled <marshalling>`
(usefull when run in `parallel` with a :term:`process pool`).
:param node_props:
added as-is into NetworkX graph, to provide for filtering
by :meth:`.NetworkOperation.withset()`.
:return:
Returns a special type of operation class, which represents an
entire computation graph as a single operation.
:raises ValueError:
If the `net`` cannot produce the asked `outputs` from the given `inputs`.
"""
operations = (op1,) + operations
if not all(isinstance(op, Operation) for op in operations):
raise ValueError(f"Non-Operation instances given: {operations}")
return NetworkOperation(
operations,
name,
outputs=outputs,
rescheduled=rescheduled,
endured=endured,
parallel=parallel,
marshalled=marshalled,
merge=merge,
node_props=node_props,
)