-
Notifications
You must be signed in to change notification settings - Fork 192
/
timing.py
256 lines (209 loc) · 9.36 KB
/
timing.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
from __future__ import absolute_import, print_function, division
import abc
import logging
import sys
import time
from petl.util.base import Table
from petl.util.statistics import onlinestats
def progress(table, batchsize=1000, prefix="", out=None):
    """
    Report progress on rows passing through to a file or file-like object.

    Returns a view over `table` that, while being iterated, writes a
    throughput report every `batchsize` rows, plus a final summary line, to
    `out` (``sys.stderr`` when `out` is ``None``).  Each report line is
    prepended with `prefix`.  The wrapped rows pass through unchanged, so
    the view can be dropped anywhere into a pipeline, e.g.::

        >>> import petl as etl
        >>> table = etl.dummytable(100000)
        >>> table.progress(10000).tocsv('example.csv')  # doctest: +SKIP

    See also :func:`petl.util.timing.clock`.

    """
    view = ProgressView(table, batchsize, prefix, out)
    return view
def log_progress(table, batchsize=1000, prefix="", logger=None, level=logging.INFO):
    """
    Report progress on rows passing through to a python logger.

    Returns a view over `table` that, while being iterated, emits a
    throughput report every `batchsize` rows, plus a final summary line,
    via `logger` at the given `level`.  When `logger` is ``None``, a new
    logger is created which, by default, streams to stdout.  Each report
    line is prepended with `prefix`.  The wrapped rows pass through
    unchanged, e.g.::

        >>> import petl as etl
        >>> table = etl.dummytable(100000)
        >>> table.log_progress(10000).tocsv('example.csv')  # doctest: +SKIP

    See also :func:`petl.util.timing.clock`.

    """
    view = LoggingProgressView(table, batchsize, prefix, logger, level=level)
    return view
# attach as Table methods so they can be used fluently,
# e.g. table.progress(10000).tocsv(...)
Table.progress = progress
Table.log_progress = log_progress
class ProgressViewBase(Table):
    """
    Abstract base class for reporting on processing status.

    Wraps an `inner` table and, while rows are iterated, emits a report
    every `batchsize` rows plus a final summary, each line prepended with
    `prefix`.  Subclasses decide where a report goes by implementing
    :meth:`print_message`.
    """

    def __init__(self, inner, batchsize, prefix):
        self.inner = inner
        self.batchsize = batchsize
        self.prefix = prefix

    @abc.abstractmethod
    def print_message(self, message):
        """Emit a single progress report line (implemented by subclasses)."""
        pass

    def __iter__(self):
        start = time.time()
        batchstart = start
        batchn = 0
        batchtimemin, batchtimemax = None, None
        batchtimemean, batchtimevar = 0, 0
        batchratemean, batchratevar = 0, 0
        # Initialise n up front so the final summary below does not raise
        # NameError when the wrapped table yields no rows at all.
        n = 0
        for n, r in enumerate(self.inner):
            if n % self.batchsize == 0 and n > 0:
                batchn += 1
                batchend = time.time()
                batchtime = batchend - batchstart
                # track extremes of per-batch duration
                if batchtimemin is None or batchtime < batchtimemin:
                    batchtimemin = batchtime
                if batchtimemax is None or batchtime > batchtimemax:
                    batchtimemax = batchtime
                elapsedtime = batchend - start
                try:
                    rate = int(n / elapsedtime)
                except ZeroDivisionError:
                    rate = 0
                try:
                    batchrate = int(self.batchsize / batchtime)
                except ZeroDivisionError:
                    # batch completed within clock resolution
                    batchrate = 0
                v = (n, elapsedtime, rate, batchtime, batchrate)
                message = self.prefix + \
                          '%s rows in %.2fs (%s row/s); ' \
                          'batch in %.2fs (%s row/s)' % v
                self.print_message(message)
                batchstart = batchend
                # fold this batch into the running mean/variance accumulators
                batchtimemean, batchtimevar = \
                    onlinestats(batchtime, batchn, mean=batchtimemean,
                                variance=batchtimevar)
                batchratemean, batchratevar = \
                    onlinestats(batchrate, batchn, mean=batchratemean,
                                variance=batchratevar)
            yield r
        # compute total elapsed time and rate
        end = time.time()
        elapsedtime = end - start
        try:
            rate = int(n / elapsedtime)
        except ZeroDivisionError:
            rate = 0
        # construct the final message
        if batchn > 1:
            # enough batches to report spread statistics as well
            if batchtimemin is None:
                batchtimemin = 0
            if batchtimemax is None:
                batchtimemax = 0
            try:
                batchratemin = int(self.batchsize / batchtimemax)
            except ZeroDivisionError:
                batchratemin = 0
            try:
                batchratemax = int(self.batchsize / batchtimemin)
            except ZeroDivisionError:
                batchratemax = 0
            v = (n, elapsedtime, rate, batchtimemean, batchtimevar**.5,
                 batchtimemin, batchtimemax, int(batchratemean),
                 int(batchratevar**.5), int(batchratemin), int(batchratemax))
            message = self.prefix + '%s rows in %.2fs (%s row/s); batches in ' \
                                    '%.2f +/- %.2fs [%.2f-%.2f] ' \
                                    '(%s +/- %s rows/s [%s-%s])' % v
        else:
            v = (n, elapsedtime, rate)
            message = self.prefix + '%s rows in %.2fs (%s row/s)' % v
        self.print_message(message)
class ProgressView(ProgressViewBase):
    """
    Progress reporter that writes each message to a file-like object
    (``sys.stderr`` when `out` is ``None``), flushing after every line so
    reports appear promptly.
    """

    def __init__(self, inner, batchsize, prefix, out):
        self.file_object = sys.stderr if out is None else out
        super(ProgressView, self).__init__(inner, batchsize, prefix)

    def print_message(self, message):
        print(message, file=self.file_object)
        flush = getattr(self.file_object, 'flush', None)
        if flush is not None:
            flush()
class LoggingProgressView(ProgressViewBase):
    """
    Progress reporter that routes each message through a logger, log
    handler, or log adapter at the configured level.
    """

    def __init__(self, inner, batchsize, prefix, logger, level=logging.INFO):
        if logger is not None:
            self.logger = logger
        else:
            # no logger supplied: use a module-level logger at the
            # requested level
            self.logger = logging.getLogger(__name__)
            self.logger.setLevel(level)
        self.level = level
        super(LoggingProgressView, self).__init__(inner, batchsize, prefix)

    def print_message(self, message):
        self.logger.log(self.level, message)
def clock(table):
    """
    Time how long is spent retrieving rows from the wrapped container.

    Returns a view over `table` that passes rows through unchanged while
    accumulating, in its ``time`` attribute, the total seconds spent
    pulling rows from the wrapped container.  Wrapping successive steps of
    a pipeline and differencing their ``time`` attributes reveals which
    step is the bottleneck, e.g.::

        >>> import petl as etl
        >>> t1 = etl.dummytable(100000)
        >>> c1 = etl.clock(t1)
        >>> t2 = etl.convert(c1, 'foo', lambda v: v**2)
        >>> c2 = etl.clock(t2)
        >>> etl.tocsv(c2, 'example.csv')  # doctest: +SKIP
        >>> c2.time - c1.time  # time consumed by the convert step  # doctest: +SKIP
        0.4461120000000274

    See also :func:`petl.util.timing.progress`.

    """
    view = ClockView(table)
    return view
# attach as a Table method so it can be used fluently, e.g. table.clock()
Table.clock = clock
class ClockView(Table):
    """
    Pass-through table view that accumulates, in the ``time`` attribute,
    the total seconds spent retrieving rows from the wrapped container.

    The counter is reset to zero at the start of each iteration, so after
    a full pass ``self.time`` reflects the cost of that pass only.
    """

    def __init__(self, wrapped):
        self.wrapped = wrapped
        # expose a zeroed counter even before the first iteration, so
        # reading .time never raises AttributeError
        self.time = 0

    def __iter__(self):
        # time.clock() was removed in Python 3.8; use perf_counter where
        # available (3.3+), falling back only on interpreters lacking it.
        # The `or` keeps time.clock unevaluated on modern Pythons.
        timer = getattr(time, 'perf_counter', None) or time.clock
        self.time = 0
        it = iter(self.wrapped)
        while True:
            before = timer()
            try:
                row = next(it)
            except StopIteration:
                return
            after = timer()
            # only the time spent inside next(it) is charged to this view
            self.time += (after - before)
            yield row