Skip to content
This repository has been archived by the owner on Jun 26, 2019. It is now read-only.

Commit

Permalink
2016-05 code push
Browse files Browse the repository at this point in the history
Add initial MySQL 5.7 support for testing
Use RAM based on availability for xtrabackup restores
Lots of pep8/80 char fixes
Direct pt-kill errors to the filesystem
MySQL CSV backup changes:
1. Add caching of backup status
2. Improve error messages for certain backup errors
3. Move pathing for backups into a function in environment_specifics
4. Multiprocess the backup checker for some situations
5. Recheck missing backups as the list API for s3 will not always return all data
  • Loading branch information
Rob Wultsch committed May 9, 2016
1 parent b985f84 commit 7095eac
Show file tree
Hide file tree
Showing 14 changed files with 465 additions and 261 deletions.
6 changes: 3 additions & 3 deletions launch_amazon_mysql_server.py
Expand Up @@ -30,11 +30,11 @@ def main():
choices=environment_specific.SUPPORTED_AZ,
required=True)
parser.add_argument('--mysql_major_version',
choices=launch_replacement_db_host.SUPPORTED_MYSQL_MAJOR_VERSIONS.keys(),
choices=environment_specific.SUPPORTED_MYSQL_MAJOR_VERSIONS,
default=launch_replacement_db_host.DEFAULT_MYSQL_MAJOR_VERSION,
help='Default: {default}'.format(default=launch_replacement_db_host.DEFAULT_MYSQL_MAJOR_VERSION))
parser.add_argument('--mysql_minor_version',
choices=launch_replacement_db_host.SUPPORTED_MYSQL_MINOR_VERSIONS,
choices=environment_specific.SUPPORTED_MYSQL_MINOR_VERSIONS,
default=launch_replacement_db_host.DEFAULT_MYSQL_MINOR_VERSION,
help='Default: {default}'.format(default=launch_replacement_db_host.DEFAULT_MYSQL_MINOR_VERSION))
parser.add_argument('--dry_run',
Expand Down Expand Up @@ -111,7 +111,7 @@ def launch_amazon_mysql_server(hostname, instance_type, vpc_security_group, clas

hiera_config = environment_specific.HIERA_FORMAT.format(
ssh_security=ssh_security,
mysql_major_version=launch_replacement_db_host.SUPPORTED_MYSQL_MAJOR_VERSIONS[mysql_major_version],
mysql_major_version=mysql_major_version.replace('.', ''),
mysql_minor_version=mysql_minor_version)
if hiera_config not in environment_specific.SUPPORTED_HIERA_CONFIGS:
raise Exception('Hiera config {hiera_config} is not supported.'
Expand Down
12 changes: 5 additions & 7 deletions launch_replacement_db_host.py
Expand Up @@ -10,9 +10,7 @@
from lib import host_utils
from lib import mysql_lib

SUPPORTED_MYSQL_MAJOR_VERSIONS = {'5.5': '55', '5.6': '56'}
DEFAULT_MYSQL_MAJOR_VERSION = '5.6'
SUPPORTED_MYSQL_MINOR_VERSIONS = set(('stable', 'staging', 'latest'))
DEFAULT_MYSQL_MINOR_VERSION = 'stable'
# After SERVER_BUILD_TIMEOUT we can assume that the build failed
# and automatically go into --replace_again mode
Expand Down Expand Up @@ -71,12 +69,12 @@ def main():
help=('Do not replace with an instance of the same '
'version as the master db, instead use the '
'supplied version.'),
choices=SUPPORTED_MYSQL_MAJOR_VERSIONS.keys(),
choices=environment_specific.SUPPORTED_MYSQL_MAJOR_VERSIONS,
default=None)
parser.add_argument('--override_mysql_minor_version',
help=('Which "branch" of the MySQL major version'
'to be used. Default is "stable".'),
choices=SUPPORTED_MYSQL_MINOR_VERSIONS,
choices=environment_specific.SUPPORTED_MYSQL_MINOR_VERSIONS,
# default is set in the underlying function
default=None)
parser.add_argument('--override_classic_security',
Expand Down Expand Up @@ -219,11 +217,11 @@ def launch_replacement_db_host(original_server,
''.format(key=key,
old=replacement_config[key],
new=overrides[key]))
replacement_config[key] = overrides[key]
reasons.add('changing {key} from {old} to '
'{old}'.format(key=key,
'{new}'.format(key=key,
old=replacement_config[key],
new=overrides[key]))
replacement_config[key] = overrides[key]
config_overridden = True

if config_overridden:
Expand Down Expand Up @@ -255,7 +253,7 @@ def launch_replacement_db_host(original_server,
# If we get to here and there is no reason, bail out
if not reasons and not replacement_config['dry_run']:
raise Exception(('MySQL appears to be up and no reason for '
'replacement is supplied. You can specify a reason'
'replacement is supplied. You can specify a reason '
'with the --reason argument'))
reason = ', '.join(reasons)
log.info('Reason for launch: {reason}'.format(reason=reason))
Expand Down
65 changes: 36 additions & 29 deletions lib/backup.py
Expand Up @@ -19,9 +19,10 @@
BACKUP_TYPE_LOGICAL = 'sql.gz'
BACKUP_TYPE_CSV = 'csv'
BACKUP_TYPE_XBSTREAM = 'xbstream'
BACKUP_TYPES = set([BACKUP_TYPE_LOGICAL, BACKUP_TYPE_XBSTREAM, BACKUP_TYPE_CSV])
BACKUP_TYPES = set([BACKUP_TYPE_LOGICAL, BACKUP_TYPE_XBSTREAM,
BACKUP_TYPE_CSV])
INNOBACKUPEX = '/usr/bin/innobackupex'
INNOBACKUP_OK = 'innobackupex: completed OK!'
INNOBACKUP_OK = 'completed OK!'
MYSQLDUMP = '/usr/bin/mysqldump'
MYSQLDUMP_CMD = ' '.join((MYSQLDUMP,
'--master-data',
Expand All @@ -41,21 +42,26 @@
XB_RESTORE_STATUS = ("CREATE TABLE IF NOT EXISTS test.xb_restore_status ("
"id INT UNSIGNED NOT NULL AUTO_INCREMENT, "
"restore_source VARCHAR(64), "
"restore_type ENUM('s3', 'remote_server', 'local_file') NOT NULL, "
"restore_type ENUM('s3', 'remote_server', "
" 'local_file') NOT NULL, "
"test_restore ENUM('normal', 'test') NOT NULL, "
"restore_destination VARCHAR(64), "
"restore_date DATE, "
"restore_port SMALLINT UNSIGNED NOT NULL DEFAULT 3306, "
"restore_port SMALLINT UNSIGNED NOT NULL "
" DEFAULT 3306, "
"restore_file VARCHAR(255), "
"replication ENUM('SKIP', 'REQ', 'OK', 'FAIL'), "
"zookeeper ENUM('SKIP', 'REQ', 'OK', 'FAIL'), "
"started_at DATETIME NOT NULL, "
"finished_at DATETIME, "
"restore_status ENUM('OK', 'IPR', 'BAD') DEFAULT 'IPR', "
"restore_status ENUM('OK', 'IPR', 'BAD') "
" DEFAULT 'IPR', "
"status_message TEXT, "
"PRIMARY KEY(id), "
"INDEX (restore_type, started_at), "
"INDEX (restore_type, restore_status, started_at) )")
"INDEX (restore_type, restore_status, "
" started_at) )")

XTRABACKUP_CMD = ' '.join((INNOBACKUPEX,
'{datadir}',
'--slave-info',
Expand Down Expand Up @@ -153,7 +159,8 @@ def get_metadata_from_backup_file(full_path):
pattern = 'mysql-([a-z0-9-]+)-(330[0-9])-(\d{4})-(\d{2})-(\d{2}).*\.(.+)'
res = re.match(pattern, filename)
host = host_utils.HostAddr(':'.join((res.group(1), res.group(2))))
creation = datetime.date(int(res.group(3)), int(res.group(4)), int(res.group(5)))
creation = datetime.date(int(res.group(3)), int(res.group(4)),
int(res.group(5)))
extension = res.group(6)
return host, creation, extension

Expand All @@ -173,7 +180,8 @@ def logical_backup_instance(instance, timestamp):
timestamp=time.strftime('%Y-%m-%d-%H:%M:%S',
timestamp),
backup_type=BACKUP_TYPE_LOGICAL)
dump_user, dump_pass = mysql_lib.get_mysql_user_for_role(USER_ROLE_MYSQLDUMP)
(dump_user,
dump_pass) = mysql_lib.get_mysql_user_for_role(USER_ROLE_MYSQLDUMP)
dump_cmd = MYSQLDUMP_CMD.format(dump_user=dump_user,
dump_pass=dump_pass,
host=instance.hostname,
Expand Down Expand Up @@ -210,22 +218,21 @@ def xtrabackup_instance(instance, timestamp):
"""
# Prevent issues with too many open files
resource.setrlimit(resource.RLIMIT_NOFILE, (131072, 131072))
backup_file = BACKUP_FILE.format(hostname=instance.hostname,
port=instance.port,
timestamp=time.strftime('%Y-%m-%d-%H:%M:%S', timestamp),
backup_type=BACKUP_TYPE_XBSTREAM)
backup_file = BACKUP_FILE.format(
hostname=instance.hostname,
port=instance.port,
timestamp=time.strftime('%Y-%m-%d-%H:%M:%S', timestamp),
backup_type=BACKUP_TYPE_XBSTREAM)

tmp_log = os.path.join(environment_specific.RAID_MOUNT,
'log',
''.join(['xtrabackup_',
time.strftime('%Y-%m-%d-%H:%M:%S', timestamp),
'.log']))
'log', 'xtrabackup_{ts}.log'.format(
ts=time.strftime('%Y-%m-%d-%H:%M:%S', timestamp)))
tmp_log_handle = open(tmp_log, "w")
procs = dict()
try:
procs['xtrabackup'] = subprocess.Popen(create_xtrabackup_command(instance, timestamp, tmp_log),
stdout=subprocess.PIPE,
stderr=tmp_log_handle)
procs['xtrabackup'] = subprocess.Popen(
create_xtrabackup_command(instance, timestamp, tmp_log),
stdout=subprocess.PIPE, stderr=tmp_log_handle)
log.info('Uploading backup to {buk}/{loc}'
''.format(buk=environment_specific.S3_BUCKET,
loc=backup_file))
Expand Down Expand Up @@ -272,7 +279,8 @@ def create_xtrabackup_command(instance, timestamp, tmp_log):
cnf = host_utils.MYSQL_CNF_FILE
cnf_group = 'mysqld{port}'.format(port=instance.port)
datadir = host_utils.get_cnf_setting('datadir', instance.port)
xtra_user, xtra_pass = mysql_lib.get_mysql_user_for_role(USER_ROLE_XTRABACKUP)
(xtra_user,
xtra_pass) = mysql_lib.get_mysql_user_for_role(USER_ROLE_XTRABACKUP)
return XTRABACKUP_CMD.format(datadir=datadir,
xtra_user=xtra_user,
xtra_pass=xtra_pass,
Expand Down Expand Up @@ -301,7 +309,7 @@ def xbstream_unpack(xbstream, port, restore_source, size=None):
cmd = ' | '.join((cmd, '{pv} -s {size}'.format(pv=PV,
size=str(size))))
# And finally pipe everything into xbstream to unpack it
cmd = ' | '.join((cmd, '/usr/bin/xbstream -x -C {datadir}'.format(datadir=datadir)))
cmd = ' | '.join((cmd, '/usr/bin/xbstream -x -C {}'.format(datadir)))
log.info(cmd)

extract = subprocess.Popen(cmd, shell=True)
Expand Down Expand Up @@ -341,9 +349,9 @@ def innobackup_decompress(port, threads=8):

err_handle.seek(0)
log_data = err_handle.readlines()
if 'innobackupex: completed OK!' not in log_data[-1]:
msg = ('Fatal error: innobackupex decompress did not end with ',
'"innobackupex: completed OK"')
if INNOBACKUP_OK not in log_data[-1]:
msg = ('Fatal error: innobackupex decompress did not end with '
'"{}"'.format(INNOBACKUP_OK))
raise Exception(msg)


Expand Down Expand Up @@ -374,9 +382,9 @@ def apply_log(port, memory='10G'):

log_handle.seek(0)
log_data = log_handle.readlines()
if 'innobackupex: completed OK!' not in log_data[-1]:
msg = ('Fatal error: innobackupex apply-log did not end with ',
'"innobackupex: completed OK"')
if INNOBACKUP_OK not in log_data[-1]:
msg = ('Fatal error: innobackupex apply-log did not end with '
'"{}"'.format(INNOBACKUP_OK))
raise Exception(msg)


Expand Down Expand Up @@ -503,8 +511,7 @@ def update_restore_log(instance, row_id, params):
updates_fields.append('finished_at=NOW()')

sql = ("UPDATE test.xb_restore_status SET "
+ ', '.join(updates_fields) +
" WHERE id = %(row_id)s")
"{} WHERE id=%(row_id)s".format(', '.join(updates_fields)))
params['row_id'] = row_id
cursor = conn.cursor()
cursor.execute(sql, params)
Expand Down
19 changes: 11 additions & 8 deletions lib/host_utils.py
Expand Up @@ -502,18 +502,22 @@ def get_host_shard_map(self, repl_type=REPLICA_ROLE_MASTER):
A dict with a key of the MySQL master instance and the value a set
of shards
"""
shard_map = dict()
global_shard_map = dict()
for sharded_db in environment_specific.SHARDED_DBS_PREFIX_MAP.values():
sharddb_map = self.compute_shard_map(sharded_db['mappings'],
sharded_db['prefix'],
sharded_db['zpad'])
shard_map.update(sharddb_map)
shard_map = self.compute_shard_map(sharded_db['mappings'],
sharded_db['prefix'],
sharded_db['zpad'])
for entry in shard_map:
if entry in global_shard_map:
global_shard_map[entry].update(shard_map[entry])
else:
global_shard_map[entry] = shard_map[entry]

host_shard_map = dict()
for replica_set in shard_map:
for replica_set in global_shard_map:
instance = self.get_mysql_instance_from_replica_set(replica_set,
repl_type)
host_shard_map[instance.__str__()] = shard_map[replica_set]
host_shard_map[instance.__str__()] = global_shard_map[replica_set]

return host_shard_map

Expand Down Expand Up @@ -556,7 +560,6 @@ def shard_to_instance(self, shard, repl_type=REPLICA_ROLE_MASTER):
A hostaddr object for an instance of the replica set
"""
shard_map = self.get_host_shard_map(repl_type)

for instance in shard_map:
if shard in shard_map[instance]:
return HostAddr(instance)
Expand Down
3 changes: 1 addition & 2 deletions lib/mysql_lib.py
Expand Up @@ -55,7 +55,6 @@
REPLICATION_TOLERANCE_NORMAL = 'Normal'
REPLICATION_TOLERANCE_LOOSE = 'Loose'


class ReplicationError(Exception):
pass

Expand Down Expand Up @@ -563,7 +562,7 @@ def setup_response_time_metrics(instance):
cursor = conn.cursor()

version = get_global_variables(instance)['version']
if version[0:3] != '5.6':
if version[0:3] < '5.6':
return

try:
Expand Down

0 comments on commit 7095eac

Please sign in to comment.