Skip to content

Commit

Permalink
Split running contenders into their own functions.
Browse files Browse the repository at this point in the history
  • Loading branch information
jamadden committed Oct 3, 2016
1 parent 9f5f93b commit adec9ee
Showing 1 changed file with 78 additions and 61 deletions.
139 changes: 78 additions & 61 deletions src/zodbshootout/_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ def _print_results(options, contenders, object_counts, results):
row.append("?")
continue

time = getattr(times[phase[1]], phase[2])
time = mean(getattr(t[phase[1]], phase[2]) for t in times)
count = (concurrency * objects_per_txn / time)
row.append('%d' % count)

Expand All @@ -168,6 +168,77 @@ def _print_results(options, contenders, object_counts, results):
for line in _align_columns(rows):
print(line)

def _run_one_repetition(options, rep, speedtest, contender_name, db_factory, db_close):
"""
Runs a single repetition of a contender.
Returns a (write_times, read_times) tuple.
"""
repetitions = options.repetitions
for attempt in range(DEFAULT_MAX_ATTEMPTS):
msg = ' Running %d/%d...' % (rep + 1, repetitions)
if attempt > 0:
msg += ' (attempt %d)' % (attempt + 1)
print(msg, end=' ', file=sys.stderr)
try:
try:
write_times, read_times = speedtest.run(
db_factory, contender_name, rep)
return write_times, read_times
finally:
db_close()
except ChildProcessError:
if attempt >= DEFAULT_MAX_ATTEMPTS - 1:
raise
continue


def _run_one_contender(options, speedtest, contender_name, db_conf):
"""
Runs the speed test *repetition* number of times.
Return a list of (write_times, read_times) tuples.
"""

def make_factory():
_db = db_conf.open()
return _db, _db.close, lambda: _db

results = []
prep_leaks, show_leaks = _make_leak_check(options)

for rep in range(options.repetitions):
if options.threads == 'shared':
_db, db_close, db_factory = make_factory()
_db.close = lambda: None
_db.pack = lambda: None
else:
db_factory = db_conf.open
db_close = lambda: None
# After the DB is opened, so modules, etc, are imported.
prep_leaks()
write_times, read_times = _run_one_repetition(options, rep, speedtest, contender_name,
db_factory, db_close)
msg = (
'add %6.4fs, update %6.4fs, '
'warm %6.4fs, cold %6.4fs, '
'hot %6.4fs, steamin %6.4fs'
% (write_times.add_time, write_times.update_time,
read_times.warm_time, read_times.cold_time,
read_times.hot_time, read_times.steamin_time))
print(msg, file=sys.stderr)
results.append((write_times, read_times))

# Clear the things we created before checking for leaks
del db_factory
del db_close
# in case it wasn't defined
_db = None
__db_close = None
show_leaks()

return results

def _zap(contenders):
for db_name, db in contenders:
db = db.open()
Expand Down Expand Up @@ -198,10 +269,8 @@ def run_with_options(options):
object_size = max(options.object_size, pobject_base_size)
concurrency_levels = options.concurrency or DEFAULT_CONCURRENCIES
profile_dir = options.profile_dir
repetitions = options.repetitions
if profile_dir and not os.path.exists(profile_dir):
os.makedirs(profile_dir)
prep_leaks, show_leaks = _make_leak_check(options)

schema = ZConfig.loadSchemaFile(StringIO(schema_xml))
config, _handler = ZConfig.loadConfigFile(schema, conf_fn)
Expand All @@ -210,16 +279,12 @@ def run_with_options(options):
if options.zap:
_zap(contenders)

# results: {(objects_per_txn, concurrency, contender): [write_times, read_times]}}
# results: {(objects_per_txn, concurrency, contender): [(write_times, read_times)]}}
results = {}


def make_factory(db_conf):
_db = db_conf.open()
return _db, lambda: _db
try:
for objects_per_txn in object_counts:
for concurrency in concurrency_levels:
for objects_per_txn in options.counts or DEFAULT_OBJ_COUNTS:
for concurrency in options.concurrency or DEFAULT_CONCURRENCIES:
speedtest = SpeedTest(
concurrency, objects_per_txn, object_size, profile_dir,
'threads' if options.threads else 'mp',
Expand All @@ -239,58 +304,10 @@ def make_factory(db_conf):
speedtest.MappingType,
concurrency, options.threads)), file=sys.stderr)

key = (objects_per_txn, concurrency, contender_name)

for rep in range(repetitions):
if options.threads == 'shared':
_db, db_factory = make_factory(db)
__db_close = _db.close
def db_close():
#import pprint; pprint.pprint(_db._storage._cache.clients_local_first[0].stats())
__db_close()

_db.close = lambda: None
_db.pack = lambda: None
else:
db_factory = db.open
db_close = lambda: None
# After the DB is opened, so modules, etc, are imported.
prep_leaks()
for attempt in range(DEFAULT_MAX_ATTEMPTS):
msg = ' Running %d/%d...' % (rep + 1, repetitions)
if attempt > 0:
msg += ' (attempt %d)' % (attempt + 1)
print(msg, end=' ', file=sys.stderr)
try:
try:
write_times, read_times = speedtest.run(
db_factory, contender_name, rep)
finally:
db_close()
except ChildProcessError:
if attempt >= DEFAULT_MAX_ATTEMPTS - 1:
raise
continue
else:
break

msg = (
'add %6.4fs, update %6.4fs, '
'warm %6.4fs, cold %6.4fs, '
'hot %6.4fs, steamin %6.4fs'
% (write_times.add_time, write_times.update_time,
read_times.warm_time, read_times.cold_time,
read_times.hot_time, read_times.steamin_time))
print(msg, file=sys.stderr)
results[key] = write_times, read_times

# Clear the things we created before checking for leaks
del db_factory
del db_close
# in case it wasn't defined
_db = None
__db_close = None
show_leaks()
key = (objects_per_txn, concurrency, contender_name)
all_times = _run_one_contender(options, speedtest, contender_name, db)
results[key] = all_times

# The finally clause causes test results to print even if the tests
# stop early.
Expand Down

0 comments on commit adec9ee

Please sign in to comment.