-
Notifications
You must be signed in to change notification settings - Fork 5
/
steering_object.py
114 lines (96 loc) · 3.62 KB
/
steering_object.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
import logging
import os
import copy
import adage
import shutil
from .serialize import snapshot
from .wflowstate import load_model_fromstring
from .controllers import setup_controller
from .utils import setupbackend_fromstring, prepare_meta
from .creators import handlers as creators
log = logging.getLogger(__name__)
class YadageSteering(object):
    """
    high level steering object to manage workflow execution

    Bundles a meta directory with a workflow controller and offers
    convenience methods to run, serialize and visualize the workflow.
    """

    def __init__(self, metadir, controller):
        """
        :param metadir: path of the meta directory used for bookkeeping output
        :param controller: controller object giving access to the workflow
        """
        self.metadir = metadir
        self.controller = controller
        # adage keeps its own working files in a sub-directory of the metadir
        self.adage_kwargs = dict(workdir=os.path.join(metadir, "adage"))

    @classmethod
    def connect(
        cls,
        metadir,
        ctrlstring,
        ctrlopts=None,
        modelsetup=None,
        modelopts=None,
        accept_metadir=False,
    ):
        """
        build a steering object by setting up a controller from a string

        :param metadir: path of the meta directory (passed to ``prepare_meta``)
        :param ctrlstring: controller specification passed to ``setup_controller``
        :param ctrlopts: options passed to ``setup_controller``
        :param modelsetup: optional model specification; when given, the model
            is loaded via ``load_model_fromstring``
        :param modelopts: options passed to ``load_model_fromstring``
        :param accept_metadir: forwarded to ``prepare_meta``
            (presumably allows an already existing metadir -- TODO confirm)
        :return: a new steering object bound to ``metadir`` and the controller
        """
        prepare_meta(metadir, accept_metadir)
        # a model is only loaded when a setup string was supplied; otherwise
        # the controller is set up with model=None
        model = None
        if modelsetup:
            model = load_model_fromstring(modelsetup, modelopts)
        ctrl = setup_controller(model=model, controller=ctrlstring, ctrlopts=ctrlopts)
        log.info("connected to model")
        return cls(metadir, ctrl)

    @classmethod
    def create(cls, **kwargs):
        """
        build a steering object by creating a new controller

        Recognized keyword arguments (all keyword arguments except
        ``accept_metadir`` are also forwarded to the creator):

        :param dataarg: required data specification string; for values of the
            form ``local:<path>`` the metadir defaults to ``<path>/_yadage/``
        :param metadir: meta directory; required unless ``dataarg`` starts
            with ``local:``
        :param dataopts: optional dict; a truthy ``overwrite`` entry removes an
            existing metadir (only for ``local:`` data arguments)
        :param accept_metadir: forwarded to ``prepare_meta``, defaults to False
        :return: a new steering object bound to the metadir and the controller
        :raises KeyError: if ``dataarg`` is missing, or if ``metadir`` is
            missing for a non ``local:`` data argument
        """
        dataopts = kwargs.get("dataopts") or {}
        if kwargs["dataarg"].startswith("local:"):
            dataarg = kwargs["dataarg"].split(":", 1)[1]
            metadir = kwargs.get("metadir")
            metadir = metadir or "{}/_yadage/".format(dataarg)
            if dataopts.get("overwrite") and os.path.exists(metadir):
                shutil.rmtree(metadir)
        else:
            metadir = kwargs["metadir"]

        # accept_metadir is consumed here and must not reach the creator
        accept_metadir = kwargs.pop("accept_metadir", False)
        kw = copy.deepcopy(kwargs)
        kw["metadir"] = metadir

        prepare_meta(
            metadir, accept_metadir
        )  # meta must be here because data model might store stuff here
        # NOTE(review): the "local" creator is used for every dataarg scheme --
        # confirm this is intended
        ctrl = creators["local"](**kw)
        prepare_meta(metadir, accept=True)  # Hack in case creator deletes meta
        return cls(metadir, ctrl)

    @property
    def workflow(self):
        """
        :return: the workflow object (from the controller)
        """
        return self.controller.adageobj

    def adage_argument(self, **kwargs):
        """
        add keyword arguments for workflow execution (adage)

        :param kwargs: adage keyword arguments (see adage documentation for options)
        """
        self.adage_kwargs.update(**kwargs)

    def run_adage(self, backend="auto", **adage_kwargs):
        """
        execute the workflow with adage against the given backend

        :param backend: backend to use for packtivity processing. With the
            default ``"auto"`` a backend already set on the controller is kept
            and otherwise a ``multiproc:auto`` backend is set up. Any other
            truthy value replaces the controller backend; a falsy value leaves
            the controller backend untouched.
        :param adage_kwargs: additional adage keyword arguments; merged into
            the stored arguments before execution
        :raises AssertionError: if no backend is available on the controller
        """
        if backend == "auto":
            # respect if the controller already has a backend wired up
            self.controller.backend = (
                self.controller.backend or setupbackend_fromstring("multiproc:auto")
            )
            # report the backend that is actually used, not the "auto" keyword
            log.info("backend automatically set to %s", self.controller.backend)
        elif backend:
            self.controller.backend = backend

        # explicit check instead of a bare ``assert`` so that it is not
        # stripped when running with ``python -O``; the exception type is kept
        if not self.controller.backend:
            raise AssertionError("no backend available on the controller")

        self.adage_argument(**adage_kwargs)
        adage.rundag(controller=self.controller, **self.adage_kwargs)

    def serialize(self):
        """
        serialize the workflow state into a snapshot file
        (``yadage_snapshot_workflow.json``, stored in meta directory)
        """
        snapshot(self.workflow, "{}/yadage_snapshot_workflow.json".format(self.metadir))

    def visualize(self):
        """
        generate workflow visualization as png and pdf (stored in meta directory)
        """
        # imported locally -- presumably to keep the visualization
        # dependencies optional (TODO confirm)
        import yadage.visualize as visualize

        visualize.write_prov_graph(self.metadir, self.workflow, vizformat="png")
        visualize.write_prov_graph(self.metadir, self.workflow, vizformat="pdf")