Skip to content

Commit

Permalink
Allow excluding particular benchmarks on the command line.
Browse files Browse the repository at this point in the history
  • Loading branch information
jamadden committed Oct 25, 2019
1 parent cbdf0b4 commit 23f67d4
Show file tree
Hide file tree
Showing 8 changed files with 199 additions and 68 deletions.
3 changes: 3 additions & 0 deletions CHANGES.rst
Expand Up @@ -45,6 +45,9 @@

- Add support for Python 3.8.

- Allow excluding particular benchmarks on the command line. For
example, ``-cold``.

0.7.0 (2019-05-31)
==================

Expand Down
13 changes: 10 additions & 3 deletions scripts/zs_matrix_graph.py
Expand Up @@ -12,6 +12,8 @@
from __future__ import division
from __future__ import print_function

# pylint:disable=too-many-locals

import argparse
import json
import os
Expand All @@ -32,13 +34,13 @@ def _fix_database(n, version=''):
result = 'PostgreSQL'
if 'zeo' in n.lower():
result = 'ZEO'
if 'fs' in n.lower() or 'filestorage' in n.lower():
elif 'fs' in n.lower() or 'filestorage' in n.lower():
result = 'FileStorage'
if version:
result = f'{result} ({version})'
return result

def suite_to_benchmark_data(args, benchmark_suite, version=''):
def suite_to_benchmark_data(_args, benchmark_suite, version=''):
"""
Return a DataFrame containing every observation.
"""
Expand All @@ -47,12 +49,17 @@ def suite_to_benchmark_data(args, benchmark_suite, version=''):

# {c=1 processes, o=100} mysqlclient_hf: read 100 hot objects'
name = benchmark.get_name()
if '(disabled)' in name:
continue

# '{c=1 processes, o=100', ' mysqlclient_hf: read 100 hot objects'
prefix, suffix = name.rsplit('}', 1)
ConcurrencyKind = 'processes' if 'processes' in prefix else 'threads'

prefix = prefix.replace(' processes', '').replace(' threads', '')
prefix = prefix.replace(' greenlets', '')
prefix += '}'

d = json.loads(prefix.replace('c', '"c"').replace('o', '"o"').replace('=', ':'))
Concurrency = d['c']
Objects = d['o']
Expand Down Expand Up @@ -158,7 +165,7 @@ def main():
matplotlib.rcParams["figure.figsize"] = 10, 5
seaborn.set(style="white")

save_all(df, outdir, args.prefixes)
save_all(df, outdir, args.versions)


if __name__ == "__main__":
Expand Down
128 changes: 99 additions & 29 deletions scripts/zs_matrix_runner.py
Expand Up @@ -18,108 +18,155 @@

import os
import subprocess
import sys
import time
import traceback
import tempfile

# The type of runner to enable, and the arguments needed
# to use it.
procs = {
'process': (),
'gevent': ('--threads', 'shared', '--gevent'),
'threads': ('--threads', 'shared'),
# 'process': (),
# 'threads': ('--threads', 'shared'),
}

# The concurrency levels.
# TODO: thread=1 and process=1 are pretty much redundant.
concs = [
1,
5,
10
20,
]

# How many objects
counts = [
1,
100,
1000
10,
100
]

# The virtual environment to use.
# Relies on virtualenvwrapper.
envs = [
'relstorage38',
#'relstorage38',
'relstorage27',
#'relstorage27-rs2',
]

if 'ZS_MATRIX_ENV' in os.environ:
envs = [os.environ['ZS_MATRIX_ENV']]

workon_home = os.environ['WORKON_HOME']
results_home = os.environ.get(
'ZS_MATRIX_RESULTS',
'~/Projects/GithubSources/zodbshootout-results'
)

# General configuration for the zodbshootout runs.
values = 3
warmups = 0
min_time_ms = 50.0 # Default is 100ms


branch = os.environ['GIT_BRANCH']

# Set this if you restart a run after fixing a problem
# and edit out entries in the matrix that already completed.
now = os.environ.get('ZS_NOW', '') or int(time.time())

def run_one(env, proc, conc, count):
child_env = os.environ.copy()
child_env['PYTHONHASHSEED'] = '6587'
child_env['PYTHONFAULTHANDLER'] = '1'
child_env['ZS_COLLECTOR_FUNC'] = 'avg'
# We don't have the logs enabled anyway, and this shows up in
# profiling.
child_env['RS_PERF_LOG_ENABLE'] = 'off'

child_env.pop('PYTHONDEVMODE', None)
child_env.pop('ZS_NO_SMOOTH', None)

smooth_results_in_process_concurrency = True
if not smooth_results_in_process_concurrency:
child_env['ZS_NO_SMOOTH'] = '1'


def run_one(
env, proc, conc, count, conf,
excluded=(),
processes=1, # How many times the whole thing is repeated.
# How many times does the function get to run its loops. If
# processes * values = 1, then it can't report a standard deviation
# or print stability warnings.
values=3,
warmups=0,
min_time_ms=20.0, # Default is 100ms
loops=3 # How many loops (* its inner loops)
): # pylint:disable=too-many-locals
if 'pypy' in env:
values = 10 # Need to JIT

if conc == 1 and count == 1:
processes += 2
min_time_ms = max(min_time_ms, 100.0)

smooth = 'smoothed' if smooth_results_in_process_concurrency else 'unsmoothed'
out_dir = os.path.expanduser(
f"{results_home}/{env}/{branch}/"
f"{now}/{proc[0]}-c{conc}-o{count}/"
f"{results_home}/{env}/{branch}/{child_env['ZS_COLLECTOR_FUNC']}/"
f"{smooth}/"
f"{now}/{proc[0]}-c{conc}-o{count}-p{processes}-v{values}-l{loops}/"
)

os.makedirs(out_dir, exist_ok=True)

# Each process (-p) runs --loops for --values times.
# Plus the initial calibration, which is always at least two
# values (begin at 1 loop and go up until you get two consecutive
# runs with the same loop count > --min-time)
# runs with the same loop count > --min-time). For small counts, it can take a substantial
# amount of time to calibrate the loop.

print("***", env, proc, conc, count)
output_path = os.path.join(out_dir, "output.json")
if os.path.exists(output_path):
print("\t", output_path, "Already exists, skipping")
return
cmd = [
os.path.expanduser(f"{workon_home}/{env}/bin/zodbshootout"),
'-q',
'--include-mapping', "no",
'--zap', 'force',
'--min-time', str(min_time_ms / 1000.0),
# '--loops', '5', # let it auto-calibrate using min-time
'--values', str(values),
'--warmups', str(warmups),
'-p', '5',
'-p', str(processes),
'-o', output_path,
'-c', str(conc),
'--object-counts', str(count)
'--object-counts', str(count),
]

if loops and conc > 1 and count > 1:
cmd.extend(('--loops', str(loops)))
else:
cmd.extend(('--min-time', str(min_time_ms / 1000.0)))

cmd.extend(proc[1])
cmd.append(os.path.expanduser("~/Projects/GithubSources/zodbshootout-results/zodb3.conf"))
cmd.append(conf)

# Set these to only run a subset of the benchmarks.

# cmd.extend([
# "add",
# "store",
# "update",
# "cold",
# "conflicts",
# "tpc",
# "im_commit",
# 'warm',
# 'new_oid',
# ])

if excluded:
cmd.append('--')
for exc in excluded:
cmd.append('-' + exc)

print("\t", ' '.join(cmd))

try:
subprocess.check_call(cmd)
subprocess.check_call(cmd, env=child_env)
except subprocess.CalledProcessError:
traceback.print_exc()
if os.path.exists(output_path):
Expand All @@ -130,16 +177,39 @@ def run_one(env, proc, conc, count):
print()

def main():
if 1 in concs and 'process' and 'threads' in procs:
blacklist = set() # {(proc_name, conc)}
if 1 in concs and 'process' in procs and 'threads' in procs:
# This is redundant.
del procs['process']
blacklist.add(('process', 1))

if len(sys.argv) > 1:
conf = sys.argv[1]
else:
conf = "~/Projects/GithubSources/zodbshootout-results/zodb3.conf"
conf = os.path.abspath(os.path.expanduser(conf))

for env in envs:
for count in counts:
for conc in concs:
for proc in sorted(procs.items()):
run_one(env, proc, conc, count)
excluded_bmarks = set()
for count in sorted(counts):
for conc in sorted(concs):
if conc == 1 and len(procs) == 1 and 'gevent' in procs:
# If we're only testing one concurrent connection,
# and we're only testing gevent by itself, then
# the test is unlikely to be interesting. (It might be interesting
# to compare gevent to thread or process to see what overhead
# the driver adds, but otherwise we want to see how it does
# concurrently).
continue

for proc in sorted(procs.items()):
if (proc[0], conc) in blacklist:
continue
run_one(env, proc, conc, count, conf, excluded=excluded_bmarks)
# Once we've done these once, they don't really change.
# They're independent of count, they don't really even
# touch the storage or DB.
excluded_bmarks.add('ex_commit')
excluded_bmarks.add('im_commit')

if __name__ == '__main__':
main()
40 changes: 33 additions & 7 deletions src/zodbshootout/_concurrent.py
Expand Up @@ -37,6 +37,9 @@

logger = __import__('logging').getLogger(__name__)

def avg(times):
    """
    Return the arithmetic mean of *times*.

    *times* must be a non-empty sequence of numbers; an empty
    sequence raises ``ZeroDivisionError``, just as dividing the sum
    by ``len(times)`` would.
    """
    total = 0
    for t in times:
        total += t
    return total / len(times)

@implementer(IDBBenchmark)
class AbstractConcurrentFunction(AbstractWrapper):
"""
Expand All @@ -49,7 +52,7 @@ class AbstractConcurrentFunction(AbstractWrapper):
'max': max,
'sum': sum,
'min': min,
'avg': lambda times: sum(times) / len(times)
'avg': avg,
}
collector_strategy = max

Expand Down Expand Up @@ -142,6 +145,7 @@ def _result_collector(self, times, total_duration, db_factory): # pylint:disable
class ThreadedConcurrentFunction(AbstractConcurrentFunction):
mp_strategy = 'threads'
uses_gevent = False
smooth = not os.environ.get('ZS_NO_SMOOTH')

def _result_collector(self, times, total_duration, db_factory):
# We used in-process concurrency. There may be contention to
Expand Down Expand Up @@ -172,11 +176,11 @@ def _result_collector(self, times, total_duration, db_factory):
recorded_duration = sum(times)
actual_average = actual_duration / concurrency
recorded_average = recorded_duration / concurrency
if recorded_duration > actual_duration and not os.environ.get('ZS_NO_SMOOTH'):
if recorded_duration > actual_duration and not self.smooth:
# We think we took longer than we actually did. This means
# that there was a high level of actual concurrent operations
# going on, a lot of GIL switching, or gevent switching. That's a good thing!
# it means you have a cooperative database
# it means you have a cooperative database driver.
#
# In that case, we want to treat the concurrency as essentially an extra
# 'inner_loop' for the benchmark. To do this, we find the biggest one
Expand All @@ -187,11 +191,11 @@ def _result_collector(self, times, total_duration, db_factory):
# with one type of database (fully cooperative, fully
# non-cooperative), you may not want to do this normalization.
if self.uses_gevent:
logger.info('(gevent-cooperative driver %s)', db_factory.name)
logger.debug('(gevent-cooperative driver %s)', db_factory.name)
result = max(times) / concurrency
else:
if self.uses_gevent:
logger.info('(gevent NON-cooperative driver %s)', db_factory.name)
logger.debug('(gevent NON-cooperative driver %s)', db_factory.name)
result = self.collector_strategy(times)
logger.debug(
"Actual duration of %s is %s. Recorded duration is %s. "
Expand Down Expand Up @@ -228,6 +232,15 @@ def __init__(self, data, options):
def delegate(self):
return self.workers[0]

@property
def inner_loops(self):
    # Report the shared inner-loop count. The workers are kept in
    # sync (see the setter below), so the first worker's value is
    # representative of all of them.
    return self.workers[0].inner_loops

@inner_loops.setter
def inner_loops(self, nv):
    # Propagate the new inner-loop count (*nv*) to every worker so
    # the whole collection runs the same number of inner loops.
    for w in self.workers:
        w.inner_loops = nv


class ThreadedConcurrentBenchmarkCollection(AbstractConcurrentBenchmarkCollection):
"""
Expand Down Expand Up @@ -260,9 +273,17 @@ def make_function_wrapper(self, func_name):
class _NonConcurrentFunction(AbstractConcurrentFunction):

def _distribute(self, func, arg_iter):
import gc
args = list(arg_iter)
assert len(args) == 1
return func(args[0], lambda msg: None)
# Do account for GC as part of the single-threaded function.
# This shows up better on profiling.
c = gc.collect
gc.collect = lambda *args: []
try:
return func(args[0], lambda msg: None)
finally:
gc.collect = c

def _result_collector(self, times, total_duration, db_factory):
    # There is no concurrency in this function, so no smoothing or
    # aggregation across workers is needed: return the recorded
    # per-loop times unchanged. (*total_duration* and *db_factory*
    # are accepted only to satisfy the shared collector interface.)
    return times
Expand All @@ -281,7 +302,12 @@ def __init__(self, data, options):

def make_function_wrapper(self, func_name):
# pylint:disable=no-value-for-parameter
return _NonConcurrentFunction(self.workers, func_name)
# Some storages are very expensive to open, and some benchmarks
# open and close storages (when they really should probably be explicitly
# clearing caches)...that makes interpreting profiles hard. So
# only open them once.

return SharedDBFunction(_NonConcurrentFunction(self.workers, func_name))


class ForkedConcurrentBenchmarkCollection(ThreadedConcurrentBenchmarkCollection):
Expand Down

0 comments on commit 23f67d4

Please sign in to comment.