#!/usr/bin/env python
# -*- coding: utf-8 -*-
# vim: sw=4:ts=4:expandtab
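"""Benchmark riko's synchronous, parallel, and asynchronous fetch pipelines
against simple sleep-based baselines over a set of sample feeds.
"""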
from __future__ import (
    absolute_import, division, print_function, unicode_literals)

import sys

from os import path as p
from functools import partial
from multiprocessing.dummy import Pool as ThreadPool
from multiprocessing import Pool
from time import time, sleep
from itertools import chain

from builtins import *  # noqa # pylint: disable=unused-import

sys.path.append('../riko')

from riko import get_path
from riko.bado import coroutine, return_value, react
from riko.bado.util import async_sleep
from riko.bado.itertools import async_imap
# "async_pipe" is aliased so that the benchmark function of the same name
# defined below does not shadow it
from riko.modules.fetch import pipe, async_pipe as async_fetch_pipe
from riko.collections import (
    SyncPipe, SyncCollection, AsyncPipe, AsyncCollection, get_chunksize,
    get_worker_cnt)
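
# Benchmark knobs: timeit-style NUMBER of executions per loop and LOOPS of
# loops per test; DELAY is the artificial per-feed delay in seconds used by
# both the baselines and the riko fetches.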
NUMBER = 1
LOOPS = 1
DELAY = 0.1

parent = p.join(p.abspath(p.dirname(p.dirname(__file__))), 'data')

files = [
    'ouseful.xml',
    'feed.xml',
    'delicious.xml',
    'psychemedia_delicious.xml',
    'ouseful_feedburner.xml',
    'TheEdTechie.xml',
    'yodel.xml',
    'gawker.xml',
    'health.xml',
    'topstories.xml',
    'autoblog.xml',
    'fourtitude.xml',
    'greenhughes.xml',
    'psychemedia_slideshare.xml']

urls = [get_path(f) for f in files]
confs = [{'url': url, 'sleep': DELAY} for url in urls]
sources = [{'url': url} for url in urls]
length = len(files)
iterable = [DELAY for x in files]
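

# Baselines: sleep DELAY once per feed, sequentially, in a thread pool, and
# in a process pool, for comparison with the riko pipelines below.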
def baseline_sync():
    return list(map(sleep, iterable))


def baseline_threads():
    workers = get_worker_cnt(length)
    chunksize = get_chunksize(length, workers)
    pool = ThreadPool(workers)
    return list(pool.imap_unordered(sleep, iterable, chunksize=chunksize))


def baseline_procs():
    workers = get_worker_cnt(length, False)
    chunksize = get_chunksize(length, workers)
    pool = Pool(workers)
    return list(pool.imap_unordered(sleep, iterable, chunksize=chunksize))
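

# Synchronous riko variants: the bare fetch pipe, SyncPipe, and
# SyncCollection (serial and parallel), each fetching every feed.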
def sync_pipeline():
    pipes = (pipe(conf=conf) for conf in confs)
    return list(chain.from_iterable(pipes))


def sync_pipe():
    streams = (SyncPipe('fetch', conf=conf).list for conf in confs)
    return list(chain.from_iterable(streams))


def sync_collection():
    return SyncCollection(sources, sleep=DELAY).list


def par_sync_collection():
    return SyncCollection(sources, parallel=True, sleep=DELAY).list
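

# Asynchronous (Twisted via riko.bado) counterparts of the tests above.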
def baseline_async():
    return async_imap(async_sleep, iterable)


def async_pipeline():
    d = async_imap(lambda conf: async_fetch_pipe(conf=conf), confs)
    d.addCallbacks(list, print)

    # return the Deferred so run_async can wait on (and time) the fetches
    return d


def async_pipe():
    d = async_imap(lambda conf: AsyncPipe('fetch', conf=conf).list, confs)
    d.addCallbacks(list, print)
    return d


def async_collection():
    return AsyncCollection(sources, sleep=DELAY).list
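

# Reporting helpers: scale the best measured time to a readable unit and
# print one aligned line per test.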
def parse_results(results):
    switch = {0: 'secs', 3: 'msecs', 6: 'usecs'}
    best = min(results)

    for places in [0, 3, 6]:
        factor = pow(10, places)

        if 1 / best // factor == 0:
            break

    return round(best * factor, 2), switch[places]


def print_time(test, max_chars, run_time, units):
    padded = test.zfill(max_chars).replace('0', ' ')
    msg = '%s - %i repetitions/loop, best of %i loops: %s %s'
    print(msg % (padded, NUMBER, LOOPS, run_time, units))
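

# Time each async test over LOOPS loops of NUMBER runs under the Twisted
# reactor, mirroring the timeit setup used for the sync tests.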
@coroutine
def run_async(reactor, tests, max_chars):
    for test in tests:
        results = []

        for i in range(LOOPS):
            loop = 0

            for j in range(NUMBER):
                start = time()
                yield test()
                loop += time() - start

            results.append(loop)

        run_time, units = parse_results(results)
        print_time(test.__name__, max_chars, run_time, units)

    return_value(None)
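

# Run the synchronous tests with timeit.repeat, then hand the asynchronous
# tests to run_async via react().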
if __name__ == '__main__':
    from timeit import repeat

    run = partial(repeat, repeat=LOOPS, number=NUMBER)

    sync_tests = [
        'baseline_sync', 'baseline_threads', 'baseline_procs',
        'sync_pipeline', 'sync_pipe', 'sync_collection', 'par_sync_collection']

    async_tests = [
        baseline_async, async_pipeline, async_pipe, async_collection]

    combined_tests = sync_tests + [f.__name__ for f in async_tests]
    max_chars = max(map(len, combined_tests))

    for test in sync_tests:
        results = run('%s()' % test, setup='from __main__ import %s' % test)
        run_time, units = parse_results(results)
        print_time(test, max_chars, run_time, units)

    react(run_async, [async_tests, max_chars])