forked from holoviz/panel
-
Notifications
You must be signed in to change notification settings - Fork 0
/
pipeline.py
209 lines (185 loc) · 7.87 KB
/
pipeline.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
from __future__ import absolute_import, division, unicode_literals
import os
import param
import numpy as np
from .layout import Row, Column, HSpacer, VSpacer
from .pane import HoloViews, Markdown, Pane
from .param import Param
from .util import param_reprs
class Pipeline(param.Parameterized):
"""
Allows connecting a linear series of panels to define a workflow.
Each stage in a pipeline should declare a panel method which
returns a panel object that can be displayed and annotate its
outputs using the param.output decorator.
"""
debug = param.Boolean(default=False, precedence=-1, doc="""
Whether to raise errors, useful for debugging while building an application.""")
inherit_params = param.Boolean(default=True, precedence=-1, doc="""
Whether parameters should be inherited between pipeline stages""")
next = param.Action(default=lambda x: x.param.trigger('next'))
previous = param.Action(default=lambda x: x.param.trigger('previous'))
def __init__(self, stages=[], **params):
try:
import holoviews as hv
except:
raise ImportError('Pipeline requires holoviews to be installed')
self._stages = list(stages)
self._stage = 0
super(Pipeline, self).__init__(**params)
self._error = Markdown('')
self._states = []
self._state = None
self._progress_sel = hv.streams.Selection1D()
self._progress_sel.add_subscriber(self._set_stage)
prev_button = Param(self.param.previous, width=100)
next_button = Param(self.param.next, width=100)
prev_button.layout[0].disabled = True
self._progress_bar = Row(self._make_progress, prev_button, next_button)
spinner = Pane(os.path.join(os.path.dirname(__file__), 'assets', 'spinner.gif'))
self._spinner_layout = Row(HSpacer(), Column(VSpacer(), spinner, VSpacer()), HSpacer())
stage_layout = Row()
if len(stages):
stage_layout.append(self._init_stage())
self._layout = Column(self._progress_bar, self._error, stage_layout)
def add_stage(self, name, stage):
self._validate(stage)
self._stages.append((name, stage))
if len(self._stages) == 1:
self._layout[2].append(self._init_stage())
def _validate(self, stage):
if any(stage is s for n, s in self._stages):
raise ValueError('Stage %s is already in pipeline' % stage)
elif not ((isinstance(stage, type) and issubclass(stage, param.Parameterized))
or isinstance(stage, param.Parameterized)):
raise ValueError('Pipeline stages must be Parameterized classes or instances.')
def __repr__(self):
repr_str = 'Pipeline:'
for i, (name, stage) in enumerate(self._stages):
if isinstance(stage, param.Parameterized):
cls_name = type(stage).__name__
else:
cls_name = stage.__name__
params = ', '.join(param_reprs(stage))
repr_str += '\n [%d] %s: %s(%s)' % (i, name, cls_name, params)
return repr_str
def __getitem__(self, index):
return self._stages[index][1]
@property
def layout(self):
self._progress_bar[0] = self._make_progress
return self._layout
def _init_stage(self):
name, stage = self._stages[self._stage]
kwargs = {}
if self._state:
results = {}
for name, (_, method, index) in self._state.param.outputs().items():
if name not in stage.param:
continue
if method not in results:
results[method] = method()
result = results[method]
if index is not None:
result = result[index]
kwargs[name] = result
if self.inherit_params:
params = [k for k, v in self._state.param.objects('existing').items()
if v.precedence is None or v.precedence >= 0]
kwargs.update({k: v for k, v in self._state.param.get_param_values()
if k in stage.param and k != 'name' and k in params})
if isinstance(stage, param.Parameterized):
stage.set_param(**kwargs)
self._state = stage
else:
self._state = stage(**kwargs)
if len(self._states) <= self._stage:
self._states.append(self._state)
else:
self._states[self._stage] = self._state
return self._state.panel()
def _set_stage(self, index):
idx = index[0]
steps = idx-self._stage
if steps < 0:
for i in range(abs(steps)):
self.param.trigger('previous')
if self._error.object:
break
else:
for i in range(steps):
self.param.trigger('next')
if self._error.object:
break
def _update_button(self):
# Disable previous button
if self._stage == 0:
self._progress_bar[1].layout[0].disabled = True
else:
self._progress_bar[1].layout[0].disabled = False
# Disable next button
if self._stage == len(self._stages)-1:
self._progress_bar[2].layout[0].disabled = True
else:
self._progress_bar[2].layout[0].disabled = False
@param.depends('next', watch=True)
def _next(self):
self._stage += 1
prev_state = self._layout[2][0]
self._layout[2][0] = self._spinner_layout
try:
new_stage = self._init_stage()
self._layout[2][0] = new_stage
self._update_button()
except Exception as e:
self._stage -= 1
self._state = prev_state
self._error.object = ('Next stage raised following error:\n\n\t%s: %s'
% (type(e).__name__, str(e)))
self._layout[2][0] = prev_state
if self.debug:
raise e
return e
else:
self._error.object = ''
@param.depends('previous', watch=True)
def _previous(self):
self._stage -= 1
old_stage = self._layout[2][0]
try:
self._state = self._states[self._stage]
self._layout[2][0] = self._state.panel()
self._update_button()
except Exception as e:
self._stage += 1
self._state = old_stage
self._error.object = ('Previous stage raised following error:\n\n\t%s: %s'
% (type(e).__name__, str(e)))
if self.debug:
raise e
else:
self._error.object = ''
@param.depends('previous', 'next')
def _make_progress(self):
import holoviews as hv
import holoviews.plotting.bokeh # noqa
stages = len(self._stages)
line = hv.Path([[(0, 0), (stages-1, 0)]]).options(
line_width=6, color='black', backend='bokeh'
)
vals = np.arange(stages)
active = [1 if v == self._stage else 0 for v in vals]
points = hv.Points((vals, np.zeros(stages), active), vdims=['active']).options(
color_index='active', line_color='black', cmap={0: 'white', 1: '#5cb85c'},
show_legend=False, size=20, default_tools=[], tools=['tap'],
nonselection_alpha=1, backend='bokeh'
)
point_labels = points.add_dimension('text', 0, [n for n, _ in self._stages], vdim=True)
labels = hv.Labels(point_labels).options(yoffset=-2.5, backend='bokeh')
self._progress_sel.source = points
hv_plot = (line * points * labels).options(
xaxis=None, yaxis=None, width=800, show_frame=False, toolbar=None,
height=80, xlim=(-0.5, stages-0.5), ylim=(-4, 1.5),
clone=False, backend='bokeh'
)
return HoloViews(hv_plot, backend='bokeh')