Skip to content

Commit

Permalink
[BACKPORT 2.12] [#8892, #11465, #11369] docdb: yb_backup performance …
Browse files Browse the repository at this point in the history
…improvements for large number of tablets

Summary:
This diff adds a variety of speedups that help in all cases, but especially in cases where there is a large number of tablets.

- Add ssh multiplexing to run_ssh_cmd
  - This allows us to reuse ssh connections, so that we don't incur the ssh startup cost on every command
- Combine chain of ssh commands to single command
  - This reduces the number of requests we're sending, and also allows us to do retries on the entire command chain
- Add parallelism to find_tablet_replicas
  - Previously we made `yb-admin list_tablet_servers` calls sequentially for each tablet, which could take a long time. Changing this to use half of the `--parallelism` flag value (with a max of 16, to avoid overloading the master)

This also fixes the issue of not retrying on checksum failures, as we now will retry the entire command chain

Original diff: https://phabricator.dev.yugabyte.com/D15306
Original commit: 29d2c2c

Test Plan:
Tested on a large setup with 10 nodes rf3, 100 tables with 100 tablets each. Previously, doing a restore took 11.5 hours; with these improvements, it took under 2 hours.

Also saw a similar improvement at a different scale: a single table with 100 tablets went from 11 minutes to 2 minutes.

Generic backup tests:
ybd --cxx-test tools_yb-backup-test_ent
ybd --java-test org.yb.pgsql.TestYbBackup --tp 1
ybd --java-test org.yb.cql.TestYbBackup --tp 1
ybd --java-test org.yb.cql.ParameterizedTestYbBackup --tp 1

Reviewers: oleg

Reviewed By: oleg

Subscribers: oleg, bogdan, jenkins-bot

Differential Revision: https://phabricator.dev.yugabyte.com/D15494
  • Loading branch information
hulien22 committed Feb 22, 2022
1 parent 05c934a commit 59ee68d
Showing 1 changed file with 82 additions and 58 deletions.
140 changes: 82 additions & 58 deletions managed/devops/bin/yb_backup.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
FS_DATA_DIRS_ARG_PREFIX = FS_DATA_DIRS_ARG_NAME + '='
RPC_BIND_ADDRESSES_ARG_NAME = '--rpc_bind_addresses'
RPC_BIND_ADDRESSES_ARG_PREFIX = RPC_BIND_ADDRESSES_ARG_NAME + '='
LIST_TABLET_SERVERS_RE = re.compile('.*list_tablet_servers.*(' + UUID_RE_STR + ').*')

IMPORTED_TABLE_RE = re.compile(r'(?:Colocated t|T)able being imported: ([^\.]*)\.(.*)')
RESTORATION_RE = re.compile('^Restoration id: (' + UUID_RE_STR + r')\b')
Expand Down Expand Up @@ -232,22 +233,10 @@ class SequencedParallelCmd(SingleArgParallelCmd):
-> run in parallel Thread-1: -> fn(a1, a2); fn(b1, b2)
Thread-2: -> fn(c1, c2); fn(d1, d2)
"""
def __init__(self, fn, handle_errors=False):
def __init__(self, fn, preprocess_args_fn=None, handle_errors=False):
self.fn = fn
self.args = []
"""
The index is used to return a function call result as the whole command result.
For example:
SequencedParallelCmd p(fn)
p.start_command()
p.add_args(a1, a2)
p.add_args(b1, b2)
p.use_last_fn_result_as_command_result()
p.add_args(c1, c2)
p.run(pool)
-> run -> fn(a1, a2); result = fn(b1, b2); fn(c1, c2); return result
"""
self.result_fn_call_index = None
self.preprocess_args_fn = preprocess_args_fn
# Whether or not we will throw an error on a cmd failure, or handle it and return a
# tuple: ('failed-cmd', handle).
self.handle_errors = handle_errors
Expand All @@ -259,14 +248,6 @@ def start_command(self, handle):
# Place handle at the front.
self.args.append([handle])

def use_last_fn_result_as_command_result(self):
# Let's remember the last fn call index to return its' result as the command result.
last_fn_call_index = len(self.args[-1]) - 1
# All commands in the set must have the same index of the result function call.
assert (self.result_fn_call_index is None or
self.result_fn_call_index == last_fn_call_index)
self.result_fn_call_index = last_fn_call_index

def add_args(self, *args_tuple):
assert isinstance(args_tuple, tuple)
assert len(self.args) > 0, 'Call start_command() before'
Expand All @@ -277,28 +258,25 @@ def internal_fn(list_of_arg_tuples):
assert isinstance(list_of_arg_tuples, list)
# First entry is the handle.
handle = list_of_arg_tuples[0]
# Add empty string at beginning to keep len(results) = len(list_of_arg_tuples).
results = ['']
# A list of commands: do it one by one.
for args_tuple in list_of_arg_tuples[1:]:
# Pre-process the list of arguments.
processed_arg_tuples = (list_of_arg_tuples[1:] if self.preprocess_args_fn is None
else self.preprocess_args_fn(list_of_arg_tuples[1:], handle))

results = []
for args_tuple in processed_arg_tuples:
assert isinstance(args_tuple, tuple)
try:
results.append(self.fn(*args_tuple))
except Exception as ex:
logging.warning(
"Encountered error for handle '{}' while running "
"command '{}'. Error: {}".
"Encountered error for handle '{}' while running command '{}'. Error: {}".
format(handle, args_tuple, ex))
if (self.handle_errors):
# If we handle errors, then return 'failed-cmd' with the handle.
return ('failed-cmd', handle)
raise ex

if self.result_fn_call_index is None:
return results
else:
assert self.result_fn_call_index < len(results)
return results[self.result_fn_call_index]
return results

fn_args = [str(list_of_arg_tuples) for list_of_arg_tuples in self.args]
return self._run_internal(internal_fn, self.args, fn_args, pool)
Expand Down Expand Up @@ -353,10 +331,15 @@ def key_and_file_filter(checksum_file):
return "\" $( sed 's| .*/| |' {} ) \"".format(pipes.quote(checksum_file))


# error_on_failure: If set to true, then the test command will return an error (errno != 0) if the
# check fails. This is useful if we're taking advantage of larger retry mechanisms (eg retrying an
# entire command chain).
# TODO: get rid of this sed / test program generation in favor of a more maintainable solution.
def compare_checksums_cmd(checksum_file1, checksum_file2):
return "test {} = {} && echo correct || echo invalid".format(
key_and_file_filter(checksum_file1), key_and_file_filter(checksum_file2))
def compare_checksums_cmd(checksum_file1, checksum_file2, error_on_failure=False):
    """Build a shell snippet that compares the entries of two checksum files.

    error_on_failure: when True, the generated `test` command exits non-zero on
    a mismatch so that a surrounding retry mechanism (e.g. retrying an entire
    ssh command chain) kicks in; when False, the snippet instead echoes
    'correct' or 'invalid' and always exits zero.
    """
    if error_on_failure:
        tail = ''
    else:
        tail = ' && echo correct || echo invalid'
    lhs = key_and_file_filter(checksum_file1)
    rhs = key_and_file_filter(checksum_file2)
    return "test {} = {}{}".format(lhs, rhs, tail)


def get_db_name_cmd(dump_file):
Expand Down Expand Up @@ -1370,6 +1353,8 @@ def run_yb_admin(self, cmd_line_args, run_ip=None):
:return: the standard output of yb-admin
"""

# Convert to list, since some callers like SequencedParallelCmd will send in tuples.
cmd_line_args = list(cmd_line_args)
# Specify cert file in case TLS is enabled.
cert_flag = []
if self.args.certs_dir:
Expand Down Expand Up @@ -1788,6 +1773,10 @@ def run_ssh_cmd(self, cmd, server_ip, upload_cloud_cfg=True, num_ssh_retry=3, en
'ssh',
'-o', 'StrictHostKeyChecking=no',
'-o', 'UserKnownHostsFile=/dev/null',
# Control flags here are for ssh multiplexing (reuse the same ssh connections).
'-o', 'ControlMaster=auto',
'-o', 'ControlPath=~/.ssh/ssh-%r@%h:%p',
'-o', 'ControlPersist=1m',
'-i', self.args.ssh_key_path,
'-p', self.args.ssh_port,
'-q',
Expand All @@ -1797,6 +1786,23 @@ def run_ssh_cmd(self, cmd, server_ip, upload_cloud_cfg=True, num_ssh_retry=3, en
else:
return self.run_program(['bash', '-c', cmd])

def join_ssh_cmds(self, list_of_arg_tuples, handle):
    """Collapse a sequence of per-step command tuples into one ssh command.

    Each entry of list_of_arg_tuples is a tuple of command parts (each part is
    either a tuple of argv words or a plain string snippet). All steps are
    joined into a single 'set -ex;'-prefixed shell string so they run over one
    ssh control connection and abort on the first failing step, letting the
    caller retry the whole chain as a unit.

    Returns a single-element list holding one (command, tserver_ip) arg tuple.
    """
    (tablet_id, tserver_ip) = handle
    # 'set -ex': echo each command and exit as soon as one fails.
    pieces = ['set -ex;']
    for cmd_tuple in list_of_arg_tuples:
        assert isinstance(cmd_tuple, tuple)
        for part in cmd_tuple:
            # Tuples are argv lists to be space-joined; strings are appended
            # verbatim with a separating space.
            pieces.append(' '.join(part) if isinstance(part, tuple) else ' ' + part)
        pieces.append(';')

    # str() guards against python2 producing 'unicode' objects by default.
    return [(str(''.join(pieces)), tserver_ip)]

def find_data_dirs(self, tserver_ip):
"""
Finds the data directories on the given tserver. This queries the /varz endpoint of tserver
Expand Down Expand Up @@ -1974,7 +1980,8 @@ def upload_snapshot_directories(self, tablet_leaders, snapshot_id, snapshot_buck
leader_ip_to_tablet_id_to_snapshot_dirs = self.rearrange_snapshot_dirs(
find_snapshot_dir_results, snapshot_id, tablets_by_leader_ip)

parallel_uploads = SequencedParallelCmd(self.run_ssh_cmd)
parallel_uploads = SequencedParallelCmd(
self.run_ssh_cmd, preprocess_args_fn=self.join_ssh_cmds)
self.prepare_cloud_ssh_cmds(
parallel_uploads, leader_ip_to_tablet_id_to_snapshot_dirs, location_by_tablet,
snapshot_id, tablets_by_leader_ip, upload=True, snapshot_metadata=None)
Expand Down Expand Up @@ -2077,11 +2084,11 @@ def prepare_upload_command(self, parallel_commands, snapshot_filepath, tablet_id
# Commands to be run on TSes over ssh for uploading the tablet backup.
if not self.args.disable_checksums:
# 1. Create check-sum file (via sha256sum tool).
parallel_commands.add_args(create_checksum_cmd, tserver_ip)
parallel_commands.add_args(create_checksum_cmd)
# 2. Upload check-sum file.
parallel_commands.add_args(tuple(upload_checksum_cmd), tserver_ip)
parallel_commands.add_args(tuple(upload_checksum_cmd))
# 3. Upload tablet folder.
parallel_commands.add_args(tuple(upload_tablet_cmd), tserver_ip)
parallel_commands.add_args(tuple(upload_tablet_cmd))

def prepare_download_command(self, parallel_commands, tablet_id,
tserver_ip, snapshot_dir, snapshot_metadata):
Expand Down Expand Up @@ -2113,30 +2120,33 @@ def prepare_download_command(self, parallel_commands, tablet_id,
source_checksum_filepath, snapshot_dir_checksum)

create_checksum_cmd = self.create_checksum_cmd_for_dir(snapshot_dir_tmp)
# Throw an error on failed checksum comparison, this will trigger this entire command
# chain to be retried.
check_checksum_cmd = compare_checksums_cmd(
snapshot_dir_checksum, checksum_path(strip_dir(snapshot_dir_tmp)))
snapshot_dir_checksum,
checksum_path(strip_dir(snapshot_dir_tmp)),
error_on_failure=True)

rmcmd = ['rm', '-rf', snapshot_dir]
mkdircmd = ['mkdir', '-p', snapshot_dir_tmp]
mvcmd = ['mv', snapshot_dir_tmp, snapshot_dir]

# Commands to be run over ssh for downloading the tablet backup.
# 1. Clean-up: delete target tablet folder.
parallel_commands.add_args(tuple(rmcmd), tserver_ip)
parallel_commands.add_args(tuple(rmcmd))
# 2. Create temporary snapshot dir.
parallel_commands.add_args(tuple(mkdircmd), tserver_ip)
parallel_commands.add_args(tuple(mkdircmd))
# 3. Download tablet folder.
parallel_commands.add_args(tuple(cmd), tserver_ip)
parallel_commands.add_args(tuple(cmd))
if not self.args.disable_checksums:
# 4. Download check-sum file.
parallel_commands.add_args(tuple(cmd_checksum), tserver_ip)
parallel_commands.add_args(tuple(cmd_checksum))
# 5. Create new check-sum file.
parallel_commands.add_args(create_checksum_cmd, tserver_ip)
parallel_commands.add_args(create_checksum_cmd)
# 6. Compare check-sum files.
parallel_commands.add_args(check_checksum_cmd, tserver_ip)
parallel_commands.use_last_fn_result_as_command_result()
parallel_commands.add_args(check_checksum_cmd)
# 7. Move the backup in place.
parallel_commands.add_args(tuple(mvcmd), tserver_ip)
parallel_commands.add_args(tuple(mvcmd))

def prepare_cloud_ssh_cmds(
self, parallel_commands, tserver_ip_to_tablet_id_to_snapshot_dirs, location_by_tablet,
Expand Down Expand Up @@ -2604,8 +2614,7 @@ def download_file(self, src_path, target_path):
self.run_program(
self.create_checksum_cmd(target_path, checksum_path(target_path)))
check_checksum_res = self.run_program(
compare_checksums_cmd(checksum_downloaded,
checksum_path(target_path))).strip()
compare_checksums_cmd(checksum_downloaded, checksum_path(target_path))).strip()
else:
server_ip = self.get_main_host_ip()

Expand Down Expand Up @@ -2828,10 +2837,28 @@ def find_tablet_replicas(self, snapshot_metadata):
tservers that need to be processed.
"""

# Parallize this using half of the parallelism setting to not overload master with yb-admin.
parallelism = min(16, (self.args.parallelism + 1) // 2)
pool = ThreadPool(parallelism)
self.pools.append(pool)
tablets_by_tserver_ip = {}
parallel_find_tservers = MultiArgParallelCmd(self.run_yb_admin)

# First construct all the yb-admin commands to send.
for new_id in snapshot_metadata['tablet']:
output = self.run_yb_admin(['list_tablet_servers', new_id])
for line in output.splitlines():
parallel_find_tservers.add_args(('list_tablet_servers', new_id))

# Run all the list_tablet_servers in parallel.
output = parallel_find_tservers.run(pool)

# Process the output.
for cmd in output:
# Pull the new_id value out from the command string.
matches = LIST_TABLET_SERVERS_RE.match(str(cmd))
new_id = matches.group(1)

# For each output line, get the tablet servers ips for this tablet id.
for line in output[cmd].splitlines():
if LEADING_UUID_RE.match(line):
(ts_uuid, ts_ip_port, role) = split_by_tab(line)
(ts_ip, ts_port) = ts_ip_port.split(':')
Expand Down Expand Up @@ -2881,7 +2908,8 @@ def download_snapshot_directories(self, snapshot_meta, tablets_by_tserver_to_dow
tablets_by_tserver_to_download[tserver_ip] -= deleted_tablets

self.timer.log_new_phase("Download data")
parallel_downloads = SequencedParallelCmd(self.run_ssh_cmd, handle_errors=True)
parallel_downloads = SequencedParallelCmd(
self.run_ssh_cmd, preprocess_args_fn=self.join_ssh_cmds, handle_errors=True)
self.prepare_cloud_ssh_cmds(
parallel_downloads, tserver_to_tablet_to_snapshot_dirs,
None, snapshot_id, tablets_by_tserver_to_download,
Expand All @@ -2898,10 +2926,6 @@ def download_snapshot_directories(self, snapshot_meta, tablets_by_tserver_to_dow
# In case we fail a cmd, don't mark this tablet-tserver pair as succeeded, instead
# we will retry in the next round of downloads.
tserver_to_deleted_tablets.setdefault(tserver_ip, set()).add(tablet_id)
elif not self.args.disable_checksums:
v = v.strip()
if v != 'correct':
raise BackupException('Check-sum for "{}" is {}'.format(k, v))

return tserver_to_deleted_tablets

Expand Down

0 comments on commit 59ee68d

Please sign in to comment.