Make test/worker.py do batch insert at setup, so that worker copy and initial insert take similar amounts of time
aaijazi committed May 22, 2015
1 parent 08e0d27 commit b575f2f
Showing 2 changed files with 25 additions and 10 deletions.
test/utils.py (11 changes: 9 additions & 2 deletions)
@@ -328,7 +328,7 @@ def wait_for_vars(name, port, var=None):
       break
     timeout = wait_step('waiting for /debug/vars of %s' % name, timeout)

-def poll_for_vars(name, port, condition_msg, timeout=60.0, condition_fn=None):
+def poll_for_vars(name, port, condition_msg, timeout=60.0, condition_fn=None, require_vars=False):
   """Polls for debug variables to exist, or match specific conditions, within a timeout.

   This function polls in a tight loop, with no sleeps. This is useful for
@@ -343,9 +343,14 @@ def poll_for_vars(name, port, condition_msg, timeout=60.0, condition_fn=None):
     timeout - number of seconds that we should attempt to poll for.
     condition_fn - a function that takes the debug vars dict as input, and
       returns a truthy value if it matches the success conditions.
+    require_vars - True iff we expect the vars to always exist. If True, and the
+      vars don't exist, we'll raise a TestError. This can be used to differentiate
+      between a timeout waiting for a particular condition vs if the process that
+      you're polling has already exited.

   Raises:
     TestError, if the conditions aren't met within the given timeout
+    TestError, if vars are required and don't exist

   Returns:
     dict of debug variables
@@ -356,6 +361,8 @@ def poll_for_vars(name, port, condition_msg, timeout=60.0, condition_fn=None):
       raise TestError('Timed out polling for vars from %s; condition "%s" not met' % (name, condition_msg))
     _vars = get_vars(port)
     if _vars is None:
+      if require_vars:
+        raise TestError('Expected vars to exist on %s, but they do not; process probably exited earlier than expected.' % (name,))
       continue
     if condition_fn is None:
       return _vars
@@ -396,7 +403,7 @@ def wait_for_replication_pos(tablet_a, tablet_b, timeout=60.0):
     timeout = wait_step(
       "%s's replication position to catch up %s's; currently at: %s, waiting to catch up to: %s" % (
         tablet_b.tablet_alias, tablet_a.tablet_alias, replication_pos_b, replication_pos_a),
-      timeout
+      timeout, sleep_time=0.1
     )

 # vtgate helpers, assuming it always restarts on the same port
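As a usage sketch of the new require_vars flag (the worker name and port here are illustrative, and the call mirrors the one added in test/worker.py below), the two failure modes now raise TestError with distinguishable messages:

import logging

import utils  # test/utils.py

worker_port = 15032  # illustrative value, not taken from this commit

try:
  worker_vars = utils.poll_for_vars(
      'vtworker', worker_port,
      'WorkerState == cleaning up',
      condition_fn=lambda v: v.get('WorkerState') == 'cleaning up',
      # Fail immediately if /debug/vars is gone, i.e. the worker already exited.
      require_vars=True)
except utils.TestError as e:
  # 'Expected vars to exist ...'     -> the process exited before we polled.
  # 'Timed out polling for vars ...' -> the process is up, but the condition
  #                                     never became true within the timeout.
  logging.info('poll failed: %s', e)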
test/worker.py (24 changes: 16 additions & 8 deletions)
@@ -192,8 +192,8 @@ def run_shard_tablets(self, shard_name, shard_tablets, create_db=True, create_ta
                      'test_keyspace'],
                     auto_log=True)

-  def _insert_value(self, tablet, id, msg, keyspace_id):
-    """Inserts a value in the MySQL database along with the required routing comments.
+  def _insert_values(self, tablet, id_offset, msg, keyspace_id, num_values):
+    """Inserts values in the MySQL database along with the required routing comments.

     Args:
       tablet - the Tablet instance to insert into
@@ -202,9 +202,14 @@ def _insert_value(self, tablet, id, msg, keyspace_id):
       keyspace_id - the value of `keyspace_id` column
     """
     k = "%u" % keyspace_id
+    values_str = ''
+    for i in xrange(num_values):
+      if i != 0:
+        values_str += ','
+      values_str += '(%u, "%s", 0x%x)' % (id_offset + i, msg, keyspace_id)
     tablet.mquery('vt_test_keyspace', [
         'begin',
-        'insert into worker_test(id, msg, keyspace_id) values(%u, "%s", 0x%x) /* EMD keyspace_id:%s user_id:%u */' % (id, msg, keyspace_id, k, id),
+        'insert into worker_test(id, msg, keyspace_id) values%s /* EMD keyspace_id:%s*/' % (values_str, k),
         'commit'
         ], write=True)
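To make the batching concrete, here is a standalone sketch (illustrative values, not part of the commit) of the multi-row statement that _insert_values() now sends in a single transaction, instead of one single-row INSERT per value:

# Editor's illustrative sketch; mirrors the VALUES-string construction above.
id_offset = 0
msg = 'msg-shard-0'
keyspace_id = 0x0      # illustrative keyspace_id for the first shard
num_values = 3

values_str = ''
for i in xrange(num_values):
  if i != 0:
    values_str += ','
  values_str += '(%u, "%s", 0x%x)' % (id_offset + i, msg, keyspace_id)

k = '%u' % keyspace_id
print('insert into worker_test(id, msg, keyspace_id) values%s /* EMD keyspace_id:%s*/'
      % (values_str, k))
# -> insert into worker_test(id, msg, keyspace_id)
#      values(0, "msg-shard-0", 0x0),(1, "msg-shard-0", 0x0),(2, "msg-shard-0", 0x0)
#      /* EMD keyspace_id:0*/   (printed as one line; wrapped here for readability)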

@@ -224,11 +229,12 @@ def insert_values(self, tablet, num_values, num_shards, offset=0, keyspace_id_ra
     """
     shard_width = keyspace_id_range / num_shards
     shard_offsets = [i * shard_width for i in xrange(num_shards)]
-    for i in xrange(num_values):
-      for shard_num in xrange(num_shards):
-        self._insert_value(tablet, shard_offsets[shard_num] + offset + i,
+    for shard_num in xrange(num_shards):
+      self._insert_values(tablet,
+                          shard_offsets[shard_num] + offset,
                           'msg-shard-%u' % shard_num,
-                          shard_offsets[shard_num] + i)
+                          shard_offsets[shard_num],
+                          num_values)

   def assert_shard_data_equal(self, shard_num, source_tablet, destination_tablet):
     """Asserts that a shard's data is identical on source and destination tablets.
@@ -369,7 +375,9 @@ def verify_successful_worker_copy_with_reparent(self, mysql_down=False):
     # for your environment (trial-and-error...)
     worker_vars = utils.poll_for_vars('vtworker', worker_port,
       'WorkerState == cleaning up',
-      condition_fn=lambda v: v.get('WorkerState') == 'cleaning up')
+      condition_fn=lambda v: v.get('WorkerState') == 'cleaning up',
+      # We know that vars should already be ready, since we read them earlier
+      require_vars=True)

     # Verify that we were forced to reresolve and retry.
     self.assertGreater(worker_vars['WorkerDestinationActualResolves'], 1)
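For the reworked insert_values() shown earlier in this file, the per-shard arithmetic means one batched statement per shard instead of num_shards * num_values single-row statements. A rough sketch with illustrative arguments (the keyspace_id_range value below is an assumption, since the default is truncated in the hunk header above):

# Editor's illustrative sketch of the id/keyspace_id layout per shard.
num_shards = 2
num_values = 3
offset = 0
keyspace_id_range = 2**64   # assumed value, chosen only for illustration

shard_width = keyspace_id_range / num_shards                    # 2**63
shard_offsets = [i * shard_width for i in xrange(num_shards)]   # [0, 2**63]

for shard_num in xrange(num_shards):
  # One batched insert per shard: ids start at the shard's offset, and every
  # row in that batch carries the shard's base keyspace_id.
  print('shard %u: ids %u..%u, keyspace_id 0x%x' % (
      shard_num,
      shard_offsets[shard_num] + offset,
      shard_offsets[shard_num] + offset + num_values - 1,
      shard_offsets[shard_num]))
# -> shard 0: ids 0..2, keyspace_id 0x0
# -> shard 1: ids 9223372036854775808..9223372036854775810, keyspace_id 0x8000000000000000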