diff --git a/opensvc/drivers/resource/sync/btrfs/__init__.py b/opensvc/drivers/resource/sync/btrfs/__init__.py index ec201fbae..ee6aa587b 100644 --- a/opensvc/drivers/resource/sync/btrfs/__init__.py +++ b/opensvc/drivers/resource/sync/btrfs/__init__.py @@ -1,14 +1,18 @@ import os +import time +import json +import subprocess import datetime -from subprocess import * - import core.status import utilities.subsystems.btrfs import core.exceptions as ex from .. import Sync, notify from env import Env +from utilities.chunker import chunker from utilities.converters import print_duration +from utilities.string import bdecode +from utilities.files import makedirs, protected_dir from core.objects.svcdict import KEYS DRIVER_GROUP = "sync" @@ -95,6 +99,9 @@ def __init__(self, self.dst_btrfs = {} self.src_btrfs = None + def on_add(self): + self.statefile = os.path.join(self.var_d, "btrfs_state") + def __str__(self): return "%s target=%s src=%s" % ( super(SyncBtrfs, self).__str__(),\ @@ -107,7 +114,7 @@ def sort_rset(self, rset): def _info(self): data = [ ["src", self.src], - ["target", " ".join(self.target) if self.target else ""], + ["target", subprocess.list2cmdline(self.target) if self.target else ""], ] data += self.stats_keys() return data @@ -133,7 +140,7 @@ def pre_action(self, action): if len(resources) == 0: return - if not action.startswith('sync'): + if not action.startswith("sync"): return self.pre_sync_check_svc_not_up() @@ -141,7 +148,7 @@ def pre_action(self, action): self.init_src_btrfs() for i, r in enumerate(resources): - if 'delay_snap' in r.tags: + if "delay_snap" in r.tags: continue r.get_targets(action) tgts = r.targets.copy() @@ -149,26 +156,84 @@ def pre_action(self, action): continue r.get_src_info() + r.remove_src_snap_next() + + for i, r in enumerate(resources): + tosends = [] - if not r.src_btrfs.has_subvol(r.src_snap_tosend): - r.create_snap(r.src, r.src_snap_tosend) - def create_snap(self, snap_orig, snap): + for subvol in r.subvols(): + src = r.src_btrfs.rootdir + "/" + subvol["path"] + dst = r.src_snap_next(subvol) + tosends.append((src, dst)) + + r.recreate_snaps(tosends) + + def all_subvols(self): + """ + sort by path so the subvols are sorted by path depth + """ + if not self.src: + return [] + if not self.recursive: + sub = self.src_btrfs.get_subvol(self.src) + if not sub: + return [] + return [sub] + subvols = self.src_btrfs.get_subvols().values() + subvols = sorted(subvols, key=lambda x: x["path"]) + return subvols + + def subvols(self): + """ + sort by path so the subvols are sorted by path depth + """ + if not self.src: + return [] + if not self.recursive: + sub = self.src_btrfs.get_subvol(self.src) + if not sub: + return [] + return [sub] + subvols = [] + for subvol in self.src_btrfs.get_subvols().values(): + if subvol["path"].endswith("@tosend"): + continue + if subvol["path"].endswith("@sent"): + continue + if subvol["path"] == self.src_subvol: + subvols.append(subvol) + elif subvol["path"].startswith(self.src_subvol + "/"): + subvols.append(subvol) + subvols = sorted(subvols, key=lambda x: x["path"]) + return subvols + + def remote_subvols(self, node): + if not self.dst: + return [] + if not self.recursive: + sub = self.dst_btrfs[node].get_subvol(self.dst) + if not sub: + return [] + return [sub] + subvols = self.dst_btrfs[node].get_subvols().values() + subvols = sorted(subvols, key=lambda x: x["path"]) + return subvols + + def recreate_snaps(self, snaps): + self.make_src_workdirs() self.init_src_btrfs() + self.src_btrfs.subvol_delete([snap[1] for snap in snaps if os.path.exists(snap[1])]) try: - self.src_btrfs.snapshot(snap_orig, snap, readonly=True, recursive=self.recursive) + self.src_btrfs.snapshots(snaps, readonly=True) except utilities.subsystems.btrfs.ExistError: - self.log.error('%s should not exist'%snap) + self.log.error("%s should not exist"%snaps) raise ex.Error except utilities.subsystems.btrfs.ExecError: raise ex.Error def get_src_info(self): self.init_src_btrfs() - subvol = self.src_subvol.replace('/','_') - base = self.src_btrfs.snapdir + '/' + subvol - self.src_snap_sent = base + '@sent' - self.src_snap_tosend = base + '@tosend' self.src = os.path.join(self.src_btrfs.rootdir, self.src_subvol) def get_dst_info(self, node): @@ -177,211 +242,262 @@ def get_dst_info(self, node): self.dst_btrfs[node] = utilities.subsystems.btrfs.Btrfs(label=self.dst_label, resource=self, node=node) except utilities.subsystems.btrfs.ExecError as e: raise ex.Error(str(e)) - #self.dst_btrfs[node].setup_snap() - subvol = self.src_subvol.replace('/','_') - base = self.dst_btrfs[node].snapdir + '/' + subvol - self.dst_snap_sent = base + '@sent' - self.dst_snap_tosend = base + '@tosend' self.dst = os.path.join(self.dst_btrfs[node].rootdir, self.dst_subvol) + def src_temp_dir(self): + return os.path.join(self.src_btrfs.rootdir, ".osync", self.svc.fullname, self.rid, "temp") + + def dst_temp_dir(self, node): + return os.path.join(self.dst_btrfs[node].rootdir, ".osync", self.svc.fullname, self.rid, "temp") + + def src_next_dir(self): + return os.path.join(self.src_btrfs.rootdir, ".osync", self.svc.fullname, self.rid, "next") + + def dst_next_dir(self, node): + return os.path.join(self.dst_btrfs[node].rootdir, ".osync", self.svc.fullname, self.rid, "next") + + def src_last_dir(self): + return os.path.join(self.src_btrfs.rootdir, ".osync", self.svc.fullname, self.rid, "last") + + def dst_last_dir(self, node): + return os.path.join(self.dst_btrfs[node].rootdir, ".osync", self.svc.fullname, self.rid, "last") + + def rel_snap_last(self, subvol): + p = subvol["path"].replace("/","_") + return os.path.join(".osync", self.svc.fullname, self.rid, "last", p) + + def rel_snap_next(self, subvol): + p = subvol["path"].replace("/","_") + return os.path.join(".osync", self.svc.fullname, self.rid, "next", p) + + def rel_tmp(self, subvol): + p = self.dst_subvol + subvol["path"][len(self.src_subvol):] + return os.path.join(".osync", self.svc.fullname, self.rid, "temp", p) + + def dst_tmp(self, subvol, node): + p = self.rel_tmp(subvol) + return os.path.join(self.dst_btrfs[node].rootdir, p) + + def src_snap_last(self, subvol): + p = self.rel_snap_last(subvol) + return os.path.join(self.src_btrfs.rootdir, p) + + def src_snap_next(self, subvol): + p = self.rel_snap_next(subvol) + return os.path.join(self.src_btrfs.rootdir, p) + + def dst_snap_next(self, subvol, node): + p = self.rel_snap_next(subvol) + return os.path.join(self.dst_btrfs[node].rootdir, p) + + def dst_snap_last(self, subvol, node): + p = self.rel_snap_last(subvol) + return os.path.join(self.dst_btrfs[node].rootdir, p) + def get_peersenders(self): self.peersenders = set() - if 'nodes' == self.sender: + if "nodes" == self.sender: self.peersenders |= self.svc.nodes self.peersenders -= set([Env.nodename]) def get_targets(self, action=None): self.targets = set() - if 'nodes' in self.target and action in (None, 'sync_nodes'): + if "nodes" in self.target and action in (None, "sync_nodes", "sync_full", "sync_all"): self.targets |= self.svc.nodes - if 'drpnodes' in self.target and action in (None, 'sync_drp'): + if "drpnodes" in self.target and action in (None, "sync_drp", "sync_full", "sync_all"): self.targets |= self.svc.drpnodes self.targets -= set([Env.nodename]) def sync_nodes(self): - self._sync_update('sync_nodes') + self._sync_update("sync_nodes") def sync_drp(self): - self._sync_update('sync_drp') + self._sync_update("sync_drp") def sanity_checks(self): self.pre_sync_check_svc_not_up() self.pre_sync_check_flex_primary() - def sync_full(self): - self.init_src_btrfs() - try: - self.sanity_checks() - except ex.Error: + def btrfs_send(self, subvols, node, incremental=True): + if len(subvols) == 0: return - self.get_src_info() - if not self.src_btrfs.has_subvol(self.src_snap_tosend): - self.create_snap(self.src, self.src_snap_tosend) - self.get_targets() - for n in self.targets: - self.get_dst_info(n) - self.btrfs_send_initial(n) - self.rotate_snaps(n) - self.install_snaps(n) - self.rotate_snaps() - self.write_statefile() - for n in self.targets: - self.push_statefile(n) - def btrfs_send_incremental(self, node): - if self.recursive: - send_cmd = ['btrfs', 'send', '-R', - '-c', self.src_snap_sent, - '-p', self.src_snap_sent, - self.src_snap_tosend] - else: - send_cmd = ['btrfs', 'send', - '-c', self.src_snap_sent, - '-p', self.src_snap_sent, - self.src_snap_tosend] + send_cmd = ["btrfs", "send"] + for subvol in subvols: + if incremental: + send_cmd += ["-c", self.src_snap_last(subvol)] + send_cmd += [self.src_snap_next(subvol)] - receive_cmd = ['btrfs', 'receive', self.dst_btrfs[node].snapdir] if node is not None: - receive_cmd = Env.rsh.strip(' -n').split() + [node] + receive_cmd - - self.log.info(' '.join(send_cmd + ["|"] + receive_cmd)) - p1 = Popen(send_cmd, stdout=PIPE) - pi = Popen(["dd", "bs=4096"], stdin=p1.stdout, stdout=PIPE, stderr=PIPE) - p2 = Popen(receive_cmd, stdin=pi.stdout, stdout=PIPE) - buff = p2.communicate() - if p2.returncode == 0: - stats_buff = pi.communicate()[1] - stats = self.parse_dd(stats_buff) - self.update_stats(stats, target=node) - else: - if buff[1] is not None and len(buff[1]) > 0: - self.log.error(buff[1]) - self.log.error("sync update failed") - raise ex.Error - if buff[0] is not None and len(buff[0]) > 0: - self.log.info(buff[0]) - - def btrfs_send_initial(self, node=None): - if self.recursive: - send_cmd = ['btrfs', 'send', '-R', self.src_snap_tosend] + receive_cmd = Env.rsh.strip(" -n").split() + [node] + receive_cmd += self.make_dst_workdirs(node) + [";"] + ["btrfs", "receive", self.dst_next_dir(node)] else: - send_cmd = ['btrfs', 'send', self.src_snap_tosend] + receive_cmd = ["btrfs", "receive", self.dst_next_dir(node)] - receive_cmd = ['btrfs', 'receive', self.dst_btrfs[node].snapdir] - if node is not None: - receive_cmd = Env.rsh.strip(' -n').split() + [node] + receive_cmd - - self.log.info(' '.join(send_cmd + ["|"] + receive_cmd)) - p1 = Popen(send_cmd, stdout=PIPE) - pi = Popen(["dd", "bs=4096"], stdin=p1.stdout, stdout=PIPE, stderr=PIPE) - p2 = Popen(receive_cmd, stdin=pi.stdout, stdout=PIPE) + self.log.info(subprocess.list2cmdline(send_cmd) + " | " + subprocess.list2cmdline(receive_cmd)) + p1 = subprocess.Popen(send_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + pi = subprocess.Popen(["dd", "bs=4096"], stdin=p1.stdout, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + p2 = subprocess.Popen(receive_cmd, stdin=pi.stdout, stdout=subprocess.PIPE, stderr=subprocess.PIPE) buff = p2.communicate() + send_buff = p1.stderr.read() + if send_buff is not None and len(send_buff) > 0: + for line in bdecode(send_buff).split("\n"): + if line: + self.log.info("| " + line) if p2.returncode == 0: stats_buff = pi.communicate()[1] stats = self.parse_dd(stats_buff) self.update_stats(stats, target=node) + if buff[1] is not None and len(buff[1]) > 0: + for line in bdecode(buff[1]).split("\n"): + if line: + self.log.info("| " + line) else: if buff[1] is not None and len(buff[1]) > 0: - self.log.error(buff[1]) - self.log.error("full sync failed") + for line in bdecode(buff[1]).split("\n"): + if line: + self.log.error("| " + line) + self.log.error("sync failed") raise ex.Error if buff[0] is not None and len(buff[0]) > 0: - self.log.info(buff[0]) + for line in bdecode(buff[0]).split("\n"): + if line: + self.log.info("| " + line) - def remove_snap_tosend(self, node=None): + def cleanup_remote(self, node, remote_subvols): self.init_src_btrfs() - if node is not None: - o = self.dst_btrfs[node] - subvol = self.dst_snap_tosend - else: - o = self.src_btrfs - subvol = self.src_snap_tosend + o = self.get_btrfs(node) + l = [] + candidates = [] + for subvol in self.subvols(): + candidates.append(self.rel_snap_next(subvol)) + candidates.append(self.rel_tmp(subvol)) - if not o.has_subvol(subvol): - return + for subvol in remote_subvols: + if subvol["path"] in candidates: + l.append(subvol["path"]) + + l = [o.rootdir+"/"+p for p in l] try: - o.subvol_delete(subvol, recursive=self.recursive) + o.subvol_delete(l) except utilities.subsystems.btrfs.ExecError: raise ex.Error() - def remove_snap(self, node=None): - self.init_src_btrfs() - if node is not None: - o = self.dst_btrfs[node] - subvol = self.dst_snap_sent - else: - o = self.src_btrfs - subvol = self.src_snap_sent - - if not o.has_subvol(subvol): - return + def remove_dst_snap_temp(self, node): + return [subprocess.list2cmdline(["rm", "-rf", self.dst_temp_dir(node)])] + + def remove_dst_snap_next(self, node): + o = self.get_btrfs(node) + p = self.dst_next_dir(node) + subvols = o.get_subvols_in_path(p) + cmd = o.subvol_delete_cmd(subvols) or [] + if not cmd: + return [] + return [subprocess.list2cmdline(cmd)] + + def remove_dst_snap_last(self, node): + o = self.get_btrfs(node) + p = self.dst_last_dir(node) + subvols = o.get_subvols_in_path(p) + cmd = o.subvol_delete_cmd(subvols) or [] + if not cmd: + return [] + return [subprocess.list2cmdline(cmd)] + + def remove_src_snap_next(self): + o = self.get_btrfs() + p = self.src_next_dir() + subvols = o.get_subvols_in_path(p) + cmds = [] + for bunch in chunker(subvols, 20): + cmd = o.subvol_delete_cmd(bunch) or [] + if not cmd: + continue + cmds = [subprocess.list2cmdline(cmd)] + self.do_cmds(cmds) + + def remove_src_snap_last(self): + o = self.get_btrfs() + p = self.src_last_dir() + subvols = o.get_subvols_in_path(p) + cmd = o.subvol_delete_cmd(subvols) or [] + if not cmd: + return [] + return [subprocess.list2cmdline(cmd)] + + def remove_dst(self, node): + o = self.get_btrfs(node) + p = self.dst + subvols = o.get_subvols_in_path(p) + cmd = o.subvol_delete_cmd(subvols) or [] + if not cmd: + return [] + return [subprocess.list2cmdline(cmd)] + + def rename_src_snap_next(self): + src = self.src_next_dir() + dst = self.src_last_dir() + return [ + subprocess.list2cmdline(["rm", "-rf", dst]), + subprocess.list2cmdline(["mv", "-v", src, dst]), + ] - try: - o.subvol_delete(subvol, recursive=self.recursive) - except utilities.subsystems.btrfs.ExecError: - raise ex.Error() + def rename_dst_snap_next(self, node): + src = self.dst_next_dir(node) + dst = self.dst_last_dir(node) + cmds = [ + subprocess.list2cmdline(["rm", "-rf", dst]), + subprocess.list2cmdline(["mv", "-v", src, dst]), + ] + return cmds + + def install_final(self, node): + head_subvol = self.subvols()[0] + src = os.path.join(self.dst_temp_dir(node), head_subvol["path"]) + if protected_dir(self.dst): + raise ex.Error("%s is a protected dir. refuse to remove" % self.dst) + cmds = self.remove_dst(node) + cmds += [ + subprocess.list2cmdline(["rm", "-rf", self.dst]), + subprocess.list2cmdline(["mv", "-v", src, self.dst]), + ] + return cmds + + def install_dst(self, subvols, node): + cmds = [] + for subvol in subvols: + src = self.dst_snap_last(subvol, node) + dst = self.dst_tmp(subvol, node) + cmd = self.dst_btrfs[node].snapshot_cmd(src, dst, readonly=False) + if not cmd: + continue + cmds.append(subprocess.list2cmdline(["mkdir", "-p", os.path.dirname(dst)])) + cmds.append(subprocess.list2cmdline(cmd)) + return cmds - def rename_snap(self, node=None): - self.init_src_btrfs() - if node is None: - o = self.src_btrfs - src = self.src_snap_tosend - dst = self.src_snap_sent - else: - o = self.dst_btrfs[node] - src = self.dst_snap_tosend - dst = self.dst_snap_sent + def make_src_workdirs(self): + makedirs(self.src_last_dir()) + makedirs(self.src_next_dir()) + makedirs(self.src_temp_dir()) - if o.has_subvol(dst): - self.log.error("%s should not exist"%self.dst_snap_sent) - raise ex.Error + def make_dst_workdirs(self, node): + return [ "mkdir -p %s %s %s" % (self.dst_last_dir(node), self.dst_next_dir(node), self.dst_temp_dir(node)) ] - if self.recursive : - # ?? - cmd = ['mv', src, dst] + def get_btrfs(self, node=None): + if node: + o = self.dst_btrfs[node] else: - cmd = ['mv', src, dst] - - if node is not None: - cmd = Env.rsh.split() + [node] + cmd - ret, out, err = self.vcall(cmd) + o = self.src_btrfs + return o + def do_cmds(self, cmds, node=None): + o = self.get_btrfs(node) + ret, out, err = o.vcall(" && ".join(cmds), shell=True) if ret != 0: raise ex.Error - def remove_dst(self, node=None): - if node is None: - return - - subvols = self.dst_btrfs[node].get_subvols_in_path(self.dst) - - try: - self.dst_btrfs[node].subvol_delete(subvols) - except utilities.subsystems.btrfs.ExecError: - raise ex.Error() - - def install_dst(self, node=None): - if node is None: - return - try: - self.dst_btrfs[node].snapshot(self.dst_snap_sent, self.dst, readonly=False) - except utilities.subsystems.btrfs.ExistError: - self.log.error("%s should not exist on node %s", self.dst_snap_sent, node) - raise ex.Error() - except utilities.subsystems.btrfs.ExecError: - self.log.error("failed to install snapshot %s on node %s"%(self.dst, node)) - raise ex.Error() - - def install_snaps(self, node=None): - self.remove_dst(node) - self.install_dst(node) - - def rotate_snaps(self, node=None): - self.remove_snap(node) - self.rename_snap(node) - - def _sync_update(self, action): + def _sync_update(self, action, full=False): self.init_src_btrfs() try: self.sanity_checks() @@ -391,36 +507,69 @@ def _sync_update(self, action): if len(self.targets) == 0: return self.get_src_info() - - if not self.src_btrfs.has_subvol(self.src_snap_tosend): - self.create_snap(self.src, self.src_snap_tosend) + self.make_src_workdirs() + subvols = self.subvols() + src_cmds = [] for n in self.targets: self.get_dst_info(n) - self.remove_snap_tosend(n) - if self.src_btrfs.has_subvol(self.src_snap_sent) and self.dst_btrfs[n].has_subvol(self.dst_snap_sent): - self.btrfs_send_incremental(n) - else: - self.btrfs_send_initial(n) - self.rotate_snaps(n) - self.install_snaps(n) - - self.rotate_snaps() + remote_btrfs = self.get_btrfs(n) + remote_subvols = self.remote_subvols(n) + self.cleanup_remote(n, remote_subvols) + dst_cmds = [] + incrs = [] + fulls = [] + for subvol in subvols: + src_snap_path = self.src_snap_last(subvol) + src_snap = self.src_btrfs.get_subvol(src_snap_path) + dst_snap_path = self.dst_snap_last(subvol, n) + dst_snap = remote_btrfs.get_subvol(dst_snap_path) + if not src_snap: + self.log.info("upgrade %s to full copy because %s was not found", subvol["path"], src_snap_path) + fulls.append(subvol) + elif not dst_snap: + self.log.info("upgrade %s to full copy because %s on %s was not found", subvol["path"], dst_snap_path, n) + fulls.append(subvol) + elif dst_snap["received_uuid"] == "-": + self.log.info("upgrade %s to full copy because %s on %s has been turned rw", subvol["path"], dst_snap_path, n) + fulls.append(subvol) + elif src_snap["uuid"] != dst_snap["received_uuid"]: + self.log.info("upgrade %s to full copy because %s on %s has been received from a different subset %s", subvol["path"], dst_snap_path, n, dst_snap["received_uuid"]) + fulls.append(subvol) + else: + incrs.append(subvol) + dst_cmds += self.remove_dst_snap_last(n) + dst_cmds += self.remove_dst_snap_temp(n) + dst_cmds += self.rename_dst_snap_next(n) + dst_cmds += self.install_dst(subvols, n) + dst_cmds += self.install_final(n) + self.btrfs_send(incrs, n, incremental=True) + self.btrfs_send(fulls, n, incremental=False) + self.do_cmds(dst_cmds, n) + + src_cmds += self.remove_src_snap_last() + src_cmds += self.rename_src_snap_next() + self.do_cmds(src_cmds) self.write_statefile() for n in self.targets: self.push_statefile(n) self.write_stats() + def sync_full(self): + self._sync_update(None, full=True) + def can_sync(self, target=None): return True def sync_status(self, verbose=False): self.init_src_btrfs() try: - ls = self.get_local_state() - now = datetime.datetime.now() - last = datetime.datetime.strptime(ls['date'], "%Y-%m-%d %H:%M:%S.%f") - delay = datetime.timedelta(seconds=self.sync_max_delay) + last = os.path.getmtime(self.statefile) + now = time.time() + delay = self.sync_max_delay + except (KeyError, TypeError): + self.status_log("btrfs state file is corrupt") + return core.status.WARN except IOError: self.status_log("btrfs state file not found") return core.status.WARN @@ -431,73 +580,41 @@ def sync_status(self, verbose=False): print(e[0], e[1], traceback.print_tb(e[2])) return core.status.WARN if last < now - delay: - self.status_log("Last sync on %s older than %s"%(last, print_duration(self.sync_max_delay))) + self.status_log("last sync on %s older than %s"%(datetime.datetime.fromtimestamp(last), print_duration(self.sync_max_delay))) return core.status.WARN return core.status.UP - def check_remote(self, node): - rs = self.get_remote_state(node) - if self.snap_uuid != rs['uuid']: - self.log.error("%s last update uuid doesn't match snap uuid"%(node)) - raise ex.Error - def get_remote_state(self, node): - self.set_statefile() - cmd1 = ['cat', self.statefile] + cmd1 = ["cat", self.statefile] cmd = Env.rsh.split() + [node] + cmd1 (ret, out, err) = self.call(cmd) if ret != 0: self.log.error("could not fetch %s last update uuid"%node) raise ex.Error - return self.parse_statefile(out, node=node) + return json.loads(out) def get_local_state(self): - self.set_statefile() - with open(self.statefile, 'r') as f: + with open(self.statefile, "r") as f: out = f.read() - return self.parse_statefile(out) - - def get_snap_uuid(self, snap): - self.init_src_btrfs() - self.snap_uuid = self.src_btrfs.get_transid(snap) - - def set_statefile(self): - self.statefile = os.path.join(self.var_d, 'btrfs_state') + return json.loads(out) def write_statefile(self): - self.set_statefile() - self.get_snap_uuid(self.src_snap_sent) - self.log.info("update state file with snap uuid %s"%self.snap_uuid) - with open(self.statefile, 'w') as f: - f.write(str(datetime.datetime.now())+';'+self.snap_uuid+'\n') + data = self.all_subvols(), + with open(self.statefile, "w") as f: + json.dump(data, f) def _push_statefile(self, node): - cmd = Env.rcp.split() + [self.statefile, node+':'+self.statefile.replace('#', r'\#')] + cmd = Env.rcp.split() + [self.statefile, node+":"+self.statefile.replace("#", r"\#")] ret, out, err = self.vcall(cmd) if ret != 0: raise ex.Error def push_statefile(self, node): - self.set_statefile() self._push_statefile(node) self.get_peersenders() for s in self.peersenders: self._push_statefile(s) - def parse_statefile(self, out, node=None): - self.set_statefile() - if node is None: - node = Env.nodename - lines = out.strip().split('\n') - if len(lines) != 1: - self.log.error("%s:%s is corrupted"%(node, self.statefile)) - raise ex.Error - fields = lines[0].split(';') - if len(fields) != 2: - self.log.error("%s:%s is corrupted"%(node, self.statefile)) - raise ex.Error - return dict(date=fields[0], uuid=fields[1]) - @notify def sync_all(self): self.sync_nodes() diff --git a/opensvc/drivers/resource/sync/btrfssnap/__init__.py b/opensvc/drivers/resource/sync/btrfssnap/__init__.py index 62af7262f..45dd0727c 100644 --- a/opensvc/drivers/resource/sync/btrfssnap/__init__.py +++ b/opensvc/drivers/resource/sync/btrfssnap/__init__.py @@ -1,5 +1,6 @@ import datetime import os +import subprocess import core.status import utilities.subsystems.btrfs @@ -8,6 +9,7 @@ from env import Env from core.objects.svcdict import KEYS from utilities.proc import justcall +from utilities.files import makedirs DRIVER_GROUP = "sync" DRIVER_BASENAME = "btrfssnap" @@ -34,6 +36,14 @@ "example": "3", "text": "The maximum number of snapshots to retain." }, + { + "keyword": "recursive", + "at": True, + "default": False, + "convert": "boolean", + "candidates": [True, False], + "text": "Also replicate subvolumes in the src tree." + }, ] KEYS.register_driver( @@ -43,6 +53,8 @@ keywords=KEYWORDS, ) +TIMEFMT = "%Y-%m-%dT%H:%M:%S.%fZ" + def driver_capabilities(node=None): from utilities.proc import which if which("btrfs"): @@ -55,6 +67,7 @@ def __init__(self, name=None, subvol=None, keep=1, + recursive=False, **kwargs): super(SyncBtrfssnap, self).__init__(type="sync.btrfssnap", **kwargs) @@ -67,6 +80,7 @@ def __init__(self, self.subvol = subvol self.keep = keep self.name = name + self.recursive = recursive self.btrfs = {} def on_add(self): @@ -118,52 +132,103 @@ def get_btrfs(self, label): raise ex.Error(str(e)) return self.btrfs[label] + def src(self, label, path): + return os.path.join(self.btrfs[label].rootdir, path) + + def subvols(self, label, path): + """ + sort by path so the subvols are sorted by path depth + """ + btr = self.get_btrfs(label) + src = self.src(label, path) + if not src: + return [] + if not self.recursive: + sub = btr.get_subvol(src) + if not sub: + return [] + return [sub] + subvols = [] + for subvol in btr.get_subvols().values(): + if subvol["path"] == path: + subvols.append(subvol) + elif subvol["path"].startswith(path + "/"): + subvols.append(subvol) + subvols = sorted(subvols, key=lambda x: x["path"]) + return subvols + + def create_snaps(self, label, subvol): + cmds = [] + for sv in self.subvols(label, subvol): + if "/.snap/" in sv["path"]: + continue + cmds += self.create_snap(label, sv["path"]) + return cmds + def create_snap(self, label, subvol): btrfs = self.get_btrfs(label) orig = os.path.join(btrfs.rootdir, subvol) snap = os.path.join(btrfs.rootdir, subvol) + snap += "/.snap/" + snap += datetime.datetime.utcnow().isoformat("T")+"Z" if self.name: - suffix = "."+self.name - else: - suffix = "" - suffix += ".snap.%Y-%m-%d.%H:%M:%S" - snap += datetime.datetime.now().strftime(suffix) + snap += "," + self.name try: - btrfs.snapshot(orig, snap, readonly=True, recursive=False) - except utilities.subsystems.btrfs.ExistError: - raise ex.Error('%s should not exist'%snap) - except utilities.subsystems.btrfs.ExecError: - raise ex.Error + makedirs(os.path.dirname(snap)) + except OSError as e: + self.log.debug("skip %s snap: readonly", subvol) + return [] + cmd = btrfs.snapshot_cmd(orig, snap, readonly=True) + if not cmd: + return [] + return [subprocess.list2cmdline(cmd)] - def remove_snap(self, label, subvol): - btrfs = self.get_btrfs(label) - btrfs.get_subvols(refresh=True) - snaps = {} - for sv in btrfs.subvols.values(): - if not sv["path"].startswith(subvol): + def remove_snaps(self, label, subvol): + cmds = [] + for sv in self.subvols(label, subvol): + if "/.snap/" in sv["path"]: continue - s = sv["path"].replace(subvol, "") - l = s.split('.') - if len(l) < 2: + cmds += self._remove_snaps(label, sv["path"]) + return cmds + + def _remove_snaps(self, label, subvol): + btrfs = self.get_btrfs(label) + snaps = { + datetime.datetime.utcnow().strftime(TIMEFMT): {"path": ""}, + } + # does not contain the one we created due to cache + for sv in btrfs.get_subvols().values(): + if not sv["path"].startswith(subvol+"/.snap/"): continue - if l[1] not in ("snap", self.name): + if not self.match_snap_name(sv["path"]): continue + ds = sv["path"].replace(subvol+"/.snap/", "") + ds = ds.split(",")[0] # discard optional name try: - ds = sv["path"].split(".snap.")[-1] - d = datetime.datetime.strptime(ds, "%Y-%m-%d.%H:%M:%S") + d = datetime.datetime.strptime(ds, TIMEFMT) snaps[ds] = sv["path"] except Exception as e: pass if len(snaps) <= self.keep: - return + return [] sorted_snaps = [] for ds in sorted(snaps.keys(), reverse=True): sorted_snaps.append(snaps[ds]) + cmds = [] for path in sorted_snaps[self.keep:]: - try: - btrfs.subvol_delete(os.path.join(btrfs.rootdir, path), recursive=False) - except utilities.subsystems.btrfs.ExecError: - raise ex.Error + cmd = btrfs.subvol_delete_cmd(os.path.join(btrfs.rootdir, path)) + if cmd: + cmds.append(subprocess.list2cmdline(cmd)) + return cmds + + def match_snap_name(self, path): + if self.name: + if not path.endswith(","+self.name): + return False + else: + if not path.endswith("Z"): + return False + return True def _status_one(self, label, subvol): if self.test_btrfs(label) != 0: @@ -174,23 +239,16 @@ def _status_one(self, label, subvol): except Exception as e: self.status_log("%s:%s %s" % (label, subvol, str(e))) return - try: - btrfs.get_subvols() - except: - return snaps = [] - for sv in btrfs.subvols.values(): - if not sv["path"].startswith(subvol + '.'): - continue - s = sv["path"].replace(subvol, "") - l = s.split('.') - if len(l) < 2: + for sv in btrfs.get_subvols().values(): + if not sv["path"].startswith(subvol+"/.snap/"): continue - if l[1] not in ("snap", self.name): + if not self.match_snap_name(sv["path"]): continue + ds = sv["path"].replace(subvol+"/.snap/", "") + ds = ds.split(",")[0] # discard optional name try: - ds = sv["path"].split(".snap.")[-1] - d = datetime.datetime.strptime(ds, "%Y-%m-%d.%H:%M:%S") + d = datetime.datetime.strptime(ds, TIMEFMT) snaps.append(d) except Exception as e: pass @@ -198,26 +256,44 @@ def _status_one(self, label, subvol): self.status_log("%s:%s has no snap" % (label, subvol)) return if len(snaps) > self.keep: - self.status_log("%s:%s has %d too many snaps" % (label, subvol, len(snaps)-self.keep)) + self.status_log("%s:%s has %d/%d snaps" % (label, subvol, len(snaps), self.keep)) last = sorted(snaps, reverse=True)[0] limit = datetime.datetime.now() - datetime.timedelta(seconds=self.sync_max_delay) if last < limit: - self.status_log("%s:%s last snap is too old (%s)" % (label, subvol, last.strftime("%Y-%m-%d %H:%M:%S"))) + self.status_log("%s:%s last snap is too old (%s)" % (label, subvol, last.strftime(TIMEFMT))) def _status(self, verbose=False): + not_found = [] for s in self.subvol: try: label, subvol = s.split(":") except: self.status_log("misformatted subvol entry %s (expected