Zap databases before each add run. Fixes #42
jamadden committed May 30, 2019
1 parent 72ec443 commit 24e5348
Showing 6 changed files with 173 additions and 80 deletions.
6 changes: 6 additions & 0 deletions CHANGES.rst
@@ -50,6 +50,12 @@
- Add an option to test the performance of blob storage. See
:issue:`29`.
- Add support for zapping file storages. See :issue:`43`.
- When zapping, do so right before running the 'add' benchmark. This
ensures that the databases are all the same size even when the same
underlying storage (e.g., MySQL database) is used multiple times in a
configuration. Previously, the second and further uses of the same
storage would not be zapped and so would grow with the data from the
previous contender tests. See :issue:`42`.

0.6.0 (2016-12-13)
==================
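The changelog entry above describes the behavioral change: instead of zapping every configured database once before any benchmarks run, each contender zaps its storage immediately before its own 'add' run. A minimal sketch of that ordering, using a hypothetical `Contender` class and a plain dict in place of a real storage (this is an illustration, not zodbshootout's actual API):

```python
# Rough sketch of the ordering change in this commit: each contender zaps
# its own storage right before its 'add' run, so two contenders sharing one
# backing database both start from an empty store.
class Contender(object):
    def __init__(self, name, storage, can_zap):
        self.name = name
        self.storage = storage      # a dict standing in for a real storage
        self.can_zap = can_zap

    def zap(self):
        if self.can_zap:
            self.storage.clear()    # start the 'add' run from an empty store

    def run_add(self, objects_per_txn):
        for i in range(objects_per_txn):
            self.storage[i] = 'object-%d' % i
        return len(self.storage)

shared = {}  # the same MySQL database used by two contenders in one config
contenders = [Contender('mysql_hf', shared, True),
              Contender('mysql_hist', shared, True)]
for contender in contenders:
    contender.zap()   # previously only the first contender started empty
    print(contender.name, contender.run_add(10))
```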
5 changes: 4 additions & 1 deletion src/zodbshootout/_dbsupport.py
@@ -91,6 +91,7 @@ def _log_cache_stats(self, db, msg=''):

def _zap_all(self):
if not self.can_zap:
logger.debug("Not asked to zap %s", self.name)
return

db = self.factory.open()
@@ -101,8 +102,10 @@ def _zap_all(self):
zap = db.storage.cleanup
db.close()
if zap is not None:
logger.info("Zapping database %s using %s", db, zap)
logger.debug("Zapping database %s using %s", db, zap)
zap()
else:
logger.debug("No way to zap database %s", self.name)
db.close()

def __repr__(self):
80 changes: 35 additions & 45 deletions src/zodbshootout/_runner.py
@@ -31,23 +31,16 @@

from .interfaces import IDBBenchmark

from ._dbsupport import get_databases_from_conf_file
from ._dbsupport import BenchmarkDBFactory
from ._dbsupport import MappingFactory

from .speedtest import SpeedTestData
from .speedtest import SpeedTestWorker
from .speedtest import ForkedSpeedTestWorker
from .speedtest import pobject_base_size

from six import PY3


if PY3:
ask = input
else:
ask = raw_input # pylint:disable=undefined-variable


logger = __import__('logging').getLogger(__name__)

def _make_leak_check(options):
@@ -77,13 +70,7 @@ def show_leaks():

return prep_leaks, show_leaks

def _can_zap(zodb_factory, force=False):
if force:
return True

prompt = "Really destroy all data in %s? [yN] " % zodb_factory.name
resp = ask(prompt)
return resp in 'yY'

def run_with_options(runner, options):
# Do the gevent stuff ASAP
@@ -113,11 +100,9 @@ def will_run_add():
if options.zap and not will_run_add():
raise Exception("Cannot zap if you're not adding")

print(options.config_file)
config_databases = get_databases_from_conf_file(options.config_file)
contenders = []
for db_factory in config_databases:
can_zap = options.zap and _can_zap(db_factory, force=options.zap == 'force')
for db_factory in options.databases:
can_zap = db_factory.name in options.zap
factory = BenchmarkDBFactory(db_factory, objects_per_txn, concurrency,
can_zap=can_zap)
contenders.append((db_factory.name, factory))
@@ -139,14 +124,6 @@ def will_run_add():
objects_per_txn, concurrency)))


# TODO: Move this to the first run of each contender so we always
# zap even if we share the backing database between contenders.
for _, factory in contenders:
db = factory.open()
db.close()
db.speedtest_zap_all()


# For concurrency of 1, or if we're using forked concurrency, we
# want to take the times as reported by the benchmark functions as
# accurate: There is no other timer running that could interfere.
@@ -183,11 +160,6 @@ def will_run_add():
}
# TODO: Include the gevent loop implementation in the metadata.

if not options.worker and will_run_add():
# I'm the master process. Only do this (which resets everything)
# if we're going to run the add benchmark.
data.populate(db_factory)

db_benchmarks = {}
# TODO: Where to include leak prints?
for bench_descr, bench_func, bench_opt_name in (
@@ -383,17 +355,39 @@ def __getattr__(self, name):
return getattr(self.inner, name)

def __call__(self, loops, db_factory):
orig_db_factory = db_factory
db = db_factory()
db.setPoolSize(self.inner.concurrency)
close = db.close
db.close = lambda: None
db_factory = lambda: db
db_factory.name = orig_db_factory.name
class DbAndClose(object):
factory = db_factory
def __init__(self):
self.name = self.factory.name
self.db = None
self.reset()

def reset(self):
self.db = db = self.factory()
db.close = lambda: None
speedtest_zap_all = db.speedtest_zap_all
def shared_zap():
self.close()
speedtest_zap_all()
self.reset()
db.speedtest_zap_all = shared_zap

def close(self):
if self.db is not None:
db = self.db
self.db = None

del db.close
db.close()

def __call__(self):
return self.db

db_and_close = DbAndClose()
try:
return self.inner(loops, db_factory)
return self.inner(loops, db_and_close)
finally:
close()
db_and_close.close()


class ThreadedRunner(AbstractWrappingRunner):
@@ -470,11 +464,7 @@ def __call__(self, loops, db_factory):
st.print_stats()


class ForkedWorker(SpeedTestWorker):

def should_clear_all_caches(self):
return True

class ForkedRunner(ThreadedRunner):
mp_strategy = 'mp'
WorkerClass = ForkedWorker
WorkerClass = ForkedSpeedTestWorker
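The commit replaces the old `db_factory = lambda: db` shim with the `DbAndClose` wrapper shown above: every benchmark gets the same open database, `close()` is stubbed out so benchmarks cannot close the shared instance, and `speedtest_zap_all` is rebound so a zap really closes the database, zaps the underlying storage, and reopens it for the next benchmark. A stripped-down sketch of that pattern, with a hypothetical `FakeDB` standing in for a real ZODB database (the real wrapper also carries the factory name and pool size):

```python
# Stripped-down sketch of the "shared database, deferred close" pattern
# used by DbAndClose. FakeDB is a stand-in for a real ZODB database.
class FakeDB(object):
    def close(self):
        print('really closed')
    def speedtest_zap_all(self):
        print('zapped')

class SharedDB(object):
    def __init__(self, factory):
        self.factory = factory
        self.db = None
        self.reset()

    def reset(self):
        self.db = db = self.factory()
        db.close = lambda: None            # benchmarks cannot close the shared DB
        real_zap = db.speedtest_zap_all
        def shared_zap():
            self.close()                   # really close before zapping...
            real_zap()
            self.reset()                   # ...then reopen for later benchmarks
        db.speedtest_zap_all = shared_zap

    def close(self):
        if self.db is not None:
            db, self.db = self.db, None
            del db.close                   # drop the no-op, restore the real close
            db.close()

    def __call__(self):
        return self.db

shared = SharedDB(FakeDB)
shared().close()                # no-op: the shared instance stays open
shared().speedtest_zap_all()    # really closes, zaps, and reopens
shared.close()                  # final cleanup really closes the database
```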
33 changes: 11 additions & 22 deletions src/zodbshootout/fork.py
@@ -145,23 +145,17 @@ def run(self):

def sync(self, name):
self.parent_queue.put((self.child_num, 'sync', name))
resume_time = self.child_queue.get()
now = time.time()
if now > resume_time:
raise AssertionError(
"Resume time has already passed (%fs too late). Consider "
"increasing 'MESSAGE_DELAY', which is currently set to %f."
% (now - resume_time, MESSAGE_DELAY))
# sleep until the resume time is near
delay = resume_time - time.time() - 0.1
if delay > 0:
time.sleep(delay)
# get as close as we can to the exact resume time
while time.time() < resume_time:
# On CPython, this uses a system call (select() on unix),
# and does so while allowing threads and interrupts. In
# gevent, it lets the loop cycle.
time.sleep(0.0001)
self.child_queue.get()
# Previously, we tried to wait to resume until a specific
# time. The idea being to try to maximize the actual
# concurrency. But now that the benchmarks are shorter and
# under the control of pyperf, we just wind up spending lots
# of time sleeping. And we don't really use sync in the same
# way anymore (to control concurrency; now we just use it to
# prevent errors for things like zapping the database and
# otherwise making sure something only gets done once), so
# it's fine to let children resume as soon as they get the
# message. That's what the threaded implementation does.

def __str__(self):
return "%s(%s)" % (self.__class__.__name__,
@@ -330,11 +324,6 @@ def distribute(func, param_iter, strategy='mp',
child_factory = Child
if strategy == 'threads':
child_factory = ThreadedChild
else:
# MP is no longer needing to sync because
# we don't do phases at the child level anymore, it's
# higher.
child_factory = SynclessChild

for child_num, param in enumerate(param_iter):
child = child_factory(child_num, parent_queue, func, param, Process, Queue)
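As the new comment explains, `sync` no longer sleeps until a coordinated wall-clock resume time; the child simply announces that it reached a named barrier and then blocks until the parent says go. A bare-bones sketch of that handshake with ordinary queues (the queue names and `child` function here are hypothetical, not the module's `Child` class):

```python
# Bare-bones sketch of the simplified sync handshake: the child reports the
# barrier it reached, then just blocks until the parent tells it to resume.
import threading
try:
    import queue            # Python 3
except ImportError:
    import Queue as queue   # Python 2

parent_queue = queue.Queue()
child_queue = queue.Queue()

def child(child_num):
    parent_queue.put((child_num, 'sync', 'zap'))  # "I'm ready to zap"
    child_queue.get()                             # block until the parent says go
    print('child %d resumed' % child_num)

t = threading.Thread(target=child, args=(0,))
t.start()
print(parent_queue.get())   # parent sees (0, 'sync', 'zap')
child_queue.put(None)       # resume immediately; no wall-clock coordination
t.join()
```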
53 changes: 48 additions & 5 deletions src/zodbshootout/main.py
@@ -21,13 +21,37 @@

from ._pobject import pobject_base_size

class ZapAction(argparse.Action):

if str is not bytes:
ask = input
else:
ask = raw_input # pylint:disable=undefined-variable

class ContainsAll(object):

def __contains__(self, name):
return True

class ContainsIfPrompted(object):

def __contains__(self, name):
prompt = "Really destroy all data in %s? [yN] " % name
resp = ask(prompt)
return resp in 'yY'


class ZapAction(argparse.Action):
# Store an object that responds to "in" for the database name.
# This will later be replaced with a list of database names
def __call__(self, parser, namespace, values, option_string=None):
if values == 'force':
setattr(namespace, self.dest, 'force')
setattr(namespace, self.dest, ContainsAll())
elif not values:
# No argument given
setattr(namespace, self.dest, ContainsIfPrompted())
else:
setattr(namespace, self.dest, True)
setattr(namespace, self.dest, values.split(','))


def main(argv=None): # pylint:disable=too-many-statements
if argv is None:
@@ -73,7 +97,9 @@ def worker_args(cmd, args):
if not args.include_mapping:
cmd.extend(('--include-mapping', "false"))
if args.log:
cmd.extend(('--log', options.log))
cmd.extend(('--log', args.log))
if args.zap:
cmd.extend(('--zap', ','.join(args.zap)))
cmd.extend(env_options)
cmd.append(args.config_file.name)
cmd.extend(args.benchmarks)
@@ -107,11 +133,17 @@ def worker_args(cmd, args):
help="Use BTrees. An argument, if given, is the family name to use, either IO or OO."
" Specifying --btrees by itself will use an IO BTree; not specifying it will use PersistentMapping."
)
# This becomes a list of database names to zap. Empty means zap nothing,
# something besides a list means we need to prompt the user and replace the object
# with the list they agreed to.
obj_group.add_argument(
"--zap", action=ZapAction,
default=False,
default=[],
nargs='?',
help="Zap the entire RelStorage before running tests. This will destroy all data. "
"An argument of 'force' does this without prompting for all databases. "
"An argument that is a comma-separated list of databases will zap those database "
"without prompting."
)
obj_group.add_argument(
"--min-objects", dest="min_object_count",
@@ -206,6 +238,17 @@ def worker_args(cmd, args):
logging.basicConfig(level=lvl_map.get(options.log, logging.INFO),
format='%(asctime)s %(levelname)-5.5s [%(name)s][%(thread)d:%(process)d][%(threadName)s] %(message)s')

from ._dbsupport import get_databases_from_conf_file
databases = get_databases_from_conf_file(options.config_file)
# TODO: Allow filtering the list of databases on the command line.
options.databases = databases
if not isinstance(options.zap, list):
zappable = [db_factory.name
for db_factory in databases
if db_factory.name in options.zap]
options.zap = zappable


from ._runner import run_with_options
run_with_options(runner, options)

Expand Down
