-
Notifications
You must be signed in to change notification settings - Fork 14
/
widgets.py
227 lines (185 loc) · 8.44 KB
/
widgets.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
import sys, math, time
from unittest import SkipTest
try:
import IPython
from IPython.core.display import clear_output
except:
clear_output = None
raise SkipTest("IPython extension requires IPython >= 0.12")
# IPython 0.13 does not have version_info
ipython2 = hasattr(IPython, 'version_info') and (IPython.version_info[0] == 2)
import param
from ..core.util import ProgressIndicator
class ProgressBar(ProgressIndicator):
    """
    A simple text progress bar suitable for both the IPython notebook
    and the IPython interactive prompt.

    ProgressBars are automatically nested if a previously instantiated
    progress bar has not achieved 100% completion.
    """

    display = param.ObjectSelector(default='stdout',
                  objects=['stdout', 'disabled', 'broadcast'], doc="""
        Parameter to control display of the progress bar. By default,
        progress is shown on stdout but this may be disabled e.g. for
        jobs that log standard output to file.

        If the output mode is set to 'broadcast', a socket is opened on
        a stated port to broadcast the completion percentage. The
        RemoteProgress class may then be used to view the progress from
        a different process.""")

    width = param.Integer(default=70, doc="""
        The width of the progress bar as the number of characters""")

    fill_char = param.String(default='#', doc="""
        The character used to fill the progress bar.""")

    blank_char = param.String(default=' ', doc="""
        The character for the blank portion of the progress bar.""")

    elapsed_time = param.Boolean(default=True, doc="""
        If enabled, the progress bar will disappear and display the
        total elapsed time once 100% completion is reached.""")

    # Class-level (shared by every instance): stores the lazily created
    # broadcast socket under the 'socket' key.
    cache = {}

    # Class-level (shared by every instance): the progress bars whose
    # text is concatenated on each stdout refresh. A bar is appended on
    # construction and the last entry is popped when a bar reaches 100%.
    current_progress = []

    def __init__(self, **params):
        self.start_time = None
        # Build the initial 0% text without writing it, so self.out is
        # defined before this bar is joined with the other active bars.
        self._stdout_display(0, False)
        ProgressBar.current_progress.append(self)
        super(ProgressBar, self).__init__(**params)

    def __call__(self, percentage):
        """
        Update the progress bar to the given percentage (0-100), which
        is first rescaled into the interval given by percent_range.

        Depending on the display parameter the update is ignored
        ('disabled'), written to stdout ('stdout'), or sent over the
        shared broadcast socket as '<percentage>|<label>'.
        """
        if self.start_time is None:
            self.start_time = time.time()
        lower, upper = self.percent_range[0], self.percent_range[1]
        percentage = lower + ((percentage / 100.0) * (upper - lower))

        if self.display == 'disabled':
            return

        if self.display == 'stdout':
            if percentage == 100 and self.elapsed_time:
                # Replace the bar with a one-line summary of the total
                # elapsed time, split into hours, minutes within the
                # hour, and seconds within the minute.
                elapsed = int(time.time() - self.start_time)
                minutes, seconds = divmod(elapsed, 60)
                hours, minutes = divmod(minutes, 60)
                self._clear_previous_output()
                self.out = '\r' + ('100%% %s %02d:%02d:%02d'
                                   % (self.label.lower(), hours,
                                      minutes, seconds))
                sys.stdout.write(
                    ''.join([pg.out for pg in self.current_progress]))
                sys.stdout.flush()
            else:
                self._stdout_display(percentage)
            if percentage == 100 and ProgressBar.current_progress:
                ProgressBar.current_progress.pop()
            return

        # Remaining mode is 'broadcast': create the socket on first use.
        if 'socket' not in self.cache:
            self.cache['socket'] = self._get_socket()

        if self.cache['socket'] is not None:
            payload = '%s|%s' % (percentage, self.label)
            # Sockets transmit bytes; encode when the formatted message
            # is a text (unicode) string rather than a byte string.
            if not isinstance(payload, bytes):
                payload = payload.encode('utf-8')
            self.cache['socket'].send(payload)

    def _clear_previous_output(self):
        "Clear the previous output when clear_output is available."
        if not clear_output:
            return
        if ipython2:
            clear_output(wait=True)
        else:
            clear_output()

    def _stdout_display(self, percentage, display=True):
        """
        Render the bar text for the given percentage into self.out and,
        if display is True, write the text of all active progress bars
        to stdout.
        """
        self._clear_previous_output()
        percent_per_char = 100.0 / self.width
        char_count = int(math.floor(percentage / percent_per_char)
                         if percentage < 100.0 else self.width)
        blank_count = self.width - char_count
        # Each unfilled slot is as wide as fill_char so the closing
        # bracket stays in place. blank_char is repeated and trimmed to
        # that width; an empty blank_char falls back to spaces.
        blank_width = len(self.fill_char) * blank_count
        blanks = ((self.blank_char or ' ') * blank_width)[:max(blank_width, 0)]
        prefix = '\n' if len(self.current_progress) > 1 else ''
        self.out = prefix + ("%s[%s%s] %0.1f%%" %
                             (self.label + ':\n' if self.label else '',
                              self.fill_char * char_count,
                              blanks,
                              percentage))
        if display:
            sys.stdout.write(''.join([pg.out for pg in self.current_progress]))
            sys.stdout.flush()
            # Brief pause after flushing, kept from the original code.
            time.sleep(0.0001)

    def _get_socket(self, min_port=8080, max_port=8100, max_tries=20):
        """
        Open a zmq PUB socket bound to a random port between min_port
        and max_port, trying at most max_tries times.

        Returns the bound socket, or None if binding failed.
        """
        import zmq
        context = zmq.Context()
        sock = context.socket(zmq.PUB)
        try:
            port = sock.bind_to_random_port('tcp://*',
                                            min_port=min_port,
                                            max_port=max_port,
                                            max_tries=max_tries)
        except Exception:
            # Release the unused socket instead of leaking it. Only
            # ordinary errors are handled so that KeyboardInterrupt and
            # SystemExit still propagate.
            sock.close()
            self.message("No suitable port found for progress broadcast.")
            return None
        self.message("Progress broadcast bound to port %d" % port)
        return sock
class RemoteProgress(ProgressBar):
    """
    Connect to a progress bar in a separate process that has its
    display parameter set to 'broadcast' in order to display the
    results (to stdout).
    """

    hostname = param.String(default='localhost', doc="""
        Hostname where progress is being broadcast.""")

    port = param.Integer(default=8080,
                         doc="""Target port on hostname.""")

    def __init__(self, port, **params):
        super(RemoteProgress, self).__init__(port=port, **params)

    def __call__(self):
        """
        Subscribe to the broadcast at hostname:port and display each
        received '<percentage>|<label>' message as a progress update.

        Loops until interrupted with KeyboardInterrupt, at which point
        the last received completion percentage (if any) is reported.
        """
        import zmq
        context = zmq.Context()
        sock = context.socket(zmq.SUB)
        # An empty byte-string prefix subscribes to every message.
        sock.setsockopt(zmq.SUBSCRIBE, b'')
        sock.connect('tcp://' + self.hostname + ':' + str(self.port))
        # Get progress via socket
        percent = None
        try:
            while True:
                # Defined up front so the error report below never hits
                # an unbound name when recv() itself fails.
                message = None
                try:
                    message = sock.recv()
                    # recv() yields bytes; decode when that is not
                    # already the native str type.
                    if not isinstance(message, str):
                        message = message.decode('utf-8')
                    # Split on the first '|' only, so a label that
                    # itself contains '|' is kept intact.
                    percent_str, label = message.split('|', 1)
                    percent = float(percent_str)
                    self.label = label
                    super(RemoteProgress, self).__call__(percent)
                except KeyboardInterrupt:
                    if percent is not None:
                        self.message("Exited at %.3f%% completion" % percent)
                    break
                except Exception:
                    self.message("Could not process socket message: %r"
                                 % message)
        finally:
            sock.close()
class RunProgress(ProgressBar):
    """
    RunProgress breaks up the execution of a slow running command so
    that the level of completion can be displayed during execution.

    This class is designed to run commands that take a single numeric
    argument that acts additively. Namely, it is expected that a slow
    running command 'run_hook(X+Y)' can be arbitrarily broken up into
    multiple, faster executing calls 'run_hook(X)' and 'run_hook(Y)'
    without affecting the overall result.

    For instance, this is suitable for simulations where the numeric
    argument is the simulated time - typically, advancing 10 simulated
    seconds takes about twice as long as advancing by 5 seconds.
    """

    interval = param.Number(default=100, doc="""
        The run interval used to break up updates to the progress bar.""")

    run_hook = param.Callable(default=param.Dynamic.time_fn.advance, doc="""
        By default updates time in param which is very fast and does
        not need a progress bar. Should be set to some slower running
        callable where display of progress level is desired.""")

    def __init__(self, **params):
        super(RunProgress, self).__init__(**params)

    def __call__(self, value):
        """
        Execute the run_hook to a total of value, breaking up progress
        updates by the value specified by interval.

        Raises ValueError if interval is zero or negative in a way that
        would prevent the stepping loop from ever finishing.
        """
        # With a non-positive interval the amount left never drops below
        # the interval once the loop is entered, so it would spin
        # forever. Fail loudly in exactly that case.
        if self.interval <= 0 and value >= self.interval:
            raise ValueError("RunProgress interval must be positive, got %r"
                             % (self.interval,))

        completed = 0
        while (value - completed) >= self.interval:
            self.run_hook(self.interval)
            completed += self.interval
            super(RunProgress, self).__call__(100 * (completed / float(value)))

        # Run whatever is left over after the whole intervals.
        remaining = value - completed
        if remaining != 0:
            self.run_hook(remaining)
            super(RunProgress, self).__call__(100)
def progress(iterator, enum=False, length=None):
    """
    A helper utility to display a progress bar when iterating over a
    collection of a fixed length or a generator (with a declared
    length).

    If enum=True, then equivalent to enumerate with a progress bar.

    The progress bar is updated before each item is yielded. If length
    is None it is taken from len(iterator).
    """
    bar = ProgressBar()
    length = len(iterator) if length is None else length
    # A for loop ends the generator cleanly on exhaustion. Calling
    # next() by hand would let StopIteration escape the generator body,
    # which Python 3.7+ converts into a RuntimeError.
    for i, val in enumerate(iterator):
        bar((i + 1.0) / length * 100)
        if enum:
            yield i, val
        else:
            yield val