diff --git a/opensvc/drivers/resource/sync/btrfs/__init__.py b/opensvc/drivers/resource/sync/btrfs/__init__.py index 53e0ce3f9..2d75cf741 100644 --- a/opensvc/drivers/resource/sync/btrfs/__init__.py +++ b/opensvc/drivers/resource/sync/btrfs/__init__.py @@ -11,6 +11,7 @@ from env import Env from utilities.converters import print_duration from utilities.string import bdecode +from utilities.files import makedirs, protected_dir from core.objects.svcdict import KEYS DRIVER_GROUP = "sync" @@ -158,7 +159,7 @@ def pre_action(self, action): for subvol in r.subvols(): src = r.src_btrfs.rootdir + "/" + subvol["path"] - dst = self.src_snap_tosend(subvol) + dst = self.src_snap_next(subvol) tosends.append((src, dst)) r.recreate_snaps(tosends) @@ -215,6 +216,7 @@ def remote_subvols(self, node): return subvols def recreate_snaps(self, snaps): + self.make_src_workdirs() self.init_src_btrfs() self.src_btrfs.subvol_delete([snap[1] for snap in snaps]) try: @@ -229,49 +231,63 @@ def get_src_info(self): self.init_src_btrfs() self.src = os.path.join(self.src_btrfs.rootdir, self.src_subvol) - def rel_src_snap_sent(self, subvol): - p = subvol["path"].replace("/","_") - base = self.src_btrfs.snapvol + "/" + p - return base + "@sent" - - def src_snap_sent(self, subvol): - p = subvol["path"].replace("/","_") - base = self.src_btrfs.snapdir + "/" + p - return base + "@sent" - - def src_snap_tosend(self, subvol): - p = subvol["path"].replace("/","_") - base = self.src_btrfs.snapdir + "/" + p - return base + "@tosend" - def get_dst_info(self, node): if node not in self.dst_btrfs: try: self.dst_btrfs[node] = utilities.subsystems.btrfs.Btrfs(label=self.dst_label, resource=self, node=node) except utilities.subsystems.btrfs.ExecError as e: raise ex.Error(str(e)) - #self.dst_btrfs[node].setup_snap() self.dst = os.path.join(self.dst_btrfs[node].rootdir, self.dst_subvol) - def rel_dst_snap_tosend(self, subvol, node): - p = subvol["path"].replace("/","_") - base = self.dst_btrfs[node].snapvol + "/" + p - return base + "@tosend" + def src_temp_dir(self): + return os.path.join(self.src_btrfs.rootdir, ".osync", self.svc.fullname, self.rid, "temp") - def dst_snap_tosend(self, subvol, node): - p = subvol["path"].replace("/","_") - base = self.dst_btrfs[node].snapdir + "/" + p - return base + "@tosend" + def dst_temp_dir(self, node): + return os.path.join(self.dst_btrfs[node].rootdir, ".osync", self.svc.fullname, self.rid, "temp") + + def src_next_dir(self): + return os.path.join(self.src_btrfs.rootdir, ".osync", self.svc.fullname, self.rid, "next") + + def dst_next_dir(self, node): + return os.path.join(self.dst_btrfs[node].rootdir, ".osync", self.svc.fullname, self.rid, "next") + + def src_last_dir(self): + return os.path.join(self.src_btrfs.rootdir, ".osync", self.svc.fullname, self.rid, "last") - def rel_dst_snap_sent(self, subvol, node): + def dst_last_dir(self, node): + return os.path.join(self.dst_btrfs[node].rootdir, ".osync", self.svc.fullname, self.rid, "last") + + def rel_snap_last(self, subvol): p = subvol["path"].replace("/","_") - base = self.dst_btrfs[node].snapvol + "/" + p - return base + "@sent" + return os.path.join(".osync", self.svc.fullname, self.rid, "last", p) - def dst_snap_sent(self, subvol, node): + def rel_snap_next(self, subvol): p = subvol["path"].replace("/","_") - base = self.dst_btrfs[node].snapdir + "/" + p - return base + "@sent" + return os.path.join(".osync", self.svc.fullname, self.rid, "next", p) + + def rel_tmp(self, subvol): + p = self.dst_subvol + subvol["path"][len(self.src_subvol):] + return os.path.join(".osync", self.svc.fullname, self.rid, "temp", p) + + def dst_tmp(self, subvol, node): + p = self.rel_tmp(subvol) + return os.path.join(self.dst_btrfs[node].rootdir, p) + + def src_snap_last(self, subvol): + p = self.rel_snap_last(subvol) + return os.path.join(self.src_btrfs.rootdir, p) + + def src_snap_next(self, subvol): + p = self.rel_snap_next(subvol) + return os.path.join(self.src_btrfs.rootdir, p) + + def dst_snap_next(self, subvol, node): + p = self.rel_snap_next(subvol) + return os.path.join(self.dst_btrfs[node].rootdir, p) + + def dst_snap_last(self, subvol, node): + p = self.rel_snap_last(subvol) + return os.path.join(self.dst_btrfs[node].rootdir, p) def get_peersenders(self): self.peersenders = set() @@ -297,24 +313,26 @@ def sanity_checks(self): self.pre_sync_check_svc_not_up() self.pre_sync_check_flex_primary() - def btrfs_send_incremental(self, subvols, node): + def btrfs_send(self, subvols, node, incremental=True): if len(subvols) == 0: return + send_cmd = ["btrfs", "send"] for subvol in subvols: - send_cmd += [ - "-c", self.src_snap_sent(subvol), - self.src_snap_tosend(subvol), - ] + if incremental: + send_cmd += ["-c", self.src_snap_last(subvol)] + send_cmd += [self.src_snap_next(subvol)] - receive_cmd = ["btrfs", "receive", self.dst_btrfs[node].snapdir] if node is not None: - receive_cmd = Env.rsh.strip(" -n").split() + [node] + receive_cmd + receive_cmd = Env.rsh.strip(" -n").split() + [node] + receive_cmd += self.make_dst_workdirs(node) + [";"] + ["btrfs", "receive", self.dst_next_dir(node)] + else: + receive_cmd = ["btrfs", "receive", self.dst_next_dir(node)] self.log.info(" ".join(send_cmd + ["|"] + receive_cmd)) p1 = Popen(send_cmd, stdout=PIPE, stderr=PIPE) pi = Popen(["dd", "bs=4096"], stdin=p1.stdout, stdout=PIPE, stderr=PIPE) - p2 = Popen(receive_cmd, stdin=pi.stdout, stdout=PIPE) + p2 = Popen(receive_cmd, stdin=pi.stdout, stdout=PIPE, stderr=PIPE) buff = p2.communicate() send_buff = p1.stderr.read() if send_buff is not None and len(send_buff) > 0: @@ -325,56 +343,30 @@ def btrfs_send_incremental(self, subvols, node): stats_buff = pi.communicate()[1] stats = self.parse_dd(stats_buff) self.update_stats(stats, target=node) + if buff[1] is not None and len(buff[1]) > 0: + for line in bdecode(buff[1]).split("\n"): + if line: + self.log.info("| " + line) else: if buff[1] is not None and len(buff[1]) > 0: - self.log.error(buff[1]) - self.log.error("sync update failed") + for line in bdecode(buff[1]).split("\n"): + if line: + self.log.error("| " + line) + self.log.error("sync failed") raise ex.Error if buff[0] is not None and len(buff[0]) > 0: for line in bdecode(buff[0]).split("\n"): if line: self.log.info("| " + line) - def btrfs_send_initial(self, subvols, node): - if len(subvols) == 0: - return - send_cmd = ["btrfs", "send"] - for subvol in subvols: - send_cmd += [self.src_snap_tosend(subvol)] - receive_cmd = ["btrfs", "receive", self.dst_btrfs[node].snapdir] - if node is not None: - receive_cmd = Env.rsh.strip(" -n").split() + [node] + receive_cmd - - self.log.info(" ".join(send_cmd + ["|"] + receive_cmd)) - p1 = Popen(send_cmd, stdout=PIPE) - pi = Popen(["dd", "bs=4096"], stdin=p1.stdout, stdout=PIPE, stderr=PIPE) - p2 = Popen(receive_cmd, stdin=pi.stdout, stdout=PIPE) - buff = p2.communicate() - if p2.returncode == 0: - stats_buff = pi.communicate()[1] - stats = self.parse_dd(stats_buff) - self.update_stats(stats, target=node) - else: - if buff[1] is not None and len(buff[1]) > 0: - self.log.error(buff[1]) - self.log.error("full sync failed") - raise ex.Error - if buff[0] is not None and len(buff[0]) > 0: - self.log.info(buff[0]) - - def to_snap_path(self, p, node=None): - btr = self.get_btrfs(node) - snapdir = btr.snapdir - return os.path.join(snapdir, p.replace("/", "_")) - def cleanup_remote(self, node, remote_subvols): self.init_src_btrfs() o = self.get_btrfs(node) l = [] candidates = [] for subvol in self.subvols(): - candidates.append(self.rel_dst_snap_tosend(subvol, node)) - candidates.append(self.rel_dst_tmp(subvol, node)) + candidates.append(self.rel_snap_next(subvol)) + candidates.append(self.rel_tmp(subvol)) for subvol in remote_subvols: if subvol["path"] in candidates: @@ -387,33 +379,51 @@ def cleanup_remote(self, node, remote_subvols): except utilities.subsystems.btrfs.ExecError: raise ex.Error() - def remove_snap(self, subvol, node=None): - #self.init_src_btrfs() - o = self.get_btrfs(node) - if node is not None: - p = self.dst_snap_sent(subvol, node) - else: - p = self.src_snap_sent(subvol) + def remove_dst_snap_temp(self, node): + return ["rm -rf %s" % self.dst_temp_dir(node)] - if not o.has_subvol(p): + def remove_dst_snap_next(self, node): + o = self.get_btrfs(node) + p = self.dst_next_dir(node) + subvols = o.get_subvols_in_path(p) + cmd = o.subvol_delete_cmd(subvols) or [] + if not cmd: return [] - - cmd = o.subvol_delete_cmd(p) or [] return [" ".join(cmd)] - def rename_snap(self, subvol, node=None): - self.init_src_btrfs() + def remove_dst_snap_last(self, node): o = self.get_btrfs(node) - if node is None: - src = self.src_snap_tosend(subvol) - dst = self.src_snap_sent(subvol) - else: - src = self.dst_snap_tosend(subvol, node) - dst = self.dst_snap_sent(subvol, node) + p = self.dst_last_dir(node) + subvols = o.get_subvols_in_path(p) + cmd = o.subvol_delete_cmd(subvols) or [] + if not cmd: + return [] + return [" ".join(cmd)] + + def remove_src_snap_last(self): + o = self.get_btrfs() + p = self.src_last_dir() + subvols = o.get_subvols_in_path(p) + cmd = o.subvol_delete_cmd(subvols) or [] + if not cmd: + return [] + return [" ".join(cmd)] + def rename_src_snap_next(self): + src = self.src_next_dir() + dst = self.src_last_dir() cmd = ["mv", "-v", src, dst] return [" ".join(cmd)] + def rename_dst_snap_next(self, node): + src = self.dst_next_dir(node) + dst = self.dst_last_dir(node) + cmds = [ + "rm -rf %s" % dst, + "mv -v %s %s" % (src, dst), + ] + return cmds + def remove_dst(self, subvol, node): dst = os.path.join(self.dst, subvol["path"]) cmd = self.dst_btrfs[node].subvol_delete_cmd(dst) @@ -421,42 +431,36 @@ def remove_dst(self, subvol, node): return [] return [" ".join(cmd)] - def rel_dst_tmp(self, subvol, node): - return os.path.join( - ".opensvc/tmp", - self.dst_subvol, - subvol["path"][len(self.src_subvol):].lstrip("/"), - ).rstrip("/") - - - def dst_tmp(self, subvol, node): - return os.path.join( - self.dst_btrfs[node].rootdir, - self.rel_dst_tmp(subvol, node), - ) - def install_final(self, node): - cmds = [] - - subvols = self.dst_btrfs[node].get_subvols() - paths = [os.path.join(self.dst_btrfs[node].rootdir, s["path"]) for s in subvols.values() if s["path"].startswith(self.dst_subvol+"/") or s["path"] == self.dst_subvol] - head_subvol = self.subvols()[0] - src = self.dst_tmp(head_subvol, node) - cmd = self.dst_btrfs[node].subvol_delete_cmd(paths) or [] - if cmd: + src = os.path.join(self.dst_temp_dir(node), head_subvol["path"]) + if protected_dir(self.dst): + raise ex.Error("%s is a protected dir. refuse to remove" % self.dst) + cmds = [ + "rm -rf %s" % self.dst, + "mv -v %s %s" % (src, self.dst), + ] + return cmds + + def install_dst(self, subvols, node): + cmds = [] + for subvol in subvols: + src = self.dst_snap_last(subvol, node) + dst = self.dst_tmp(subvol, node) + cmd = self.dst_btrfs[node].snapshot_cmd(src, dst, readonly=False) + if not cmd: + continue + cmds.append("mkdir -p %s" % os.path.dirname(dst)) cmds.append(" ".join(cmd)) - cmd = ["mv", "-v", src, self.dst] - cmds.append(" ".join(cmd)) return cmds - def install_dst(self, subvol, node): - src = self.dst_snap_sent(subvol, node) - dst = self.dst_tmp(subvol, node) - cmd = self.dst_btrfs[node].snapshot_cmd(src, dst, readonly=False) - if not cmd: - return [] - return [" ".join(cmd)] + def make_src_workdirs(self): + makedirs(self.src_last_dir()) + makedirs(self.src_next_dir()) + makedirs(self.src_temp_dir()) + + def make_dst_workdirs(self, node): + return [ "mkdir -p %s %s %s" % (self.dst_last_dir(node), self.dst_next_dir(node), self.dst_temp_dir(node)) ] def get_btrfs(self, node=None): if node: @@ -471,7 +475,7 @@ def do_cmds(self, cmds, node=None): if ret != 0: raise ex.Error - def _sync_update(self, action): + def _sync_update(self, action, full=False): self.init_src_btrfs() try: self.sanity_checks() @@ -481,38 +485,48 @@ def _sync_update(self, action): if len(self.targets) == 0: return self.get_src_info() - subvols_paths = [s["path"] for s in self.all_subvols()] - src_cmds = [] + self.make_src_workdirs() subvols = self.subvols() + src_cmds = [] for n in self.targets: self.get_dst_info(n) + remote_btrfs = self.get_btrfs(n) remote_subvols = self.remote_subvols(n) self.cleanup_remote(n, remote_subvols) - rsubvols_paths = [s["path"] for s in remote_subvols] - cmds = [] + dst_cmds = [] incrs = [] fulls = [] for subvol in subvols: - if self.rel_dst_snap_sent(subvol, n) in rsubvols_paths and self.rel_src_snap_sent(subvol) in subvols_paths: - incrs.append(subvol) - else: + src_snap_path = self.src_snap_last(subvol) + src_snap = self.src_btrfs.get_subvol(src_snap_path) + dst_snap_path = self.dst_snap_last(subvol, n) + dst_snap = remote_btrfs.get_subvol(dst_snap_path) + if not src_snap: + self.log.info("upgrade %s to full copy because %s was not found", subvol["path"], src_snap_path) fulls.append(subvol) - for subvol in subvols: - cmds += self.remove_snap(subvol, n) - src_cmds += self.remove_snap(subvol) - for subvol in subvols: - cmds += self.rename_snap(subvol, n) - src_cmds += self.rename_snap(subvol) - for subvol in subvols: - cmds += self.remove_dst(subvol, n) - for subvol in subvols: - cmds += self.install_dst(subvol, n) - cmds += self.install_final(n) - self.btrfs_send_incremental(incrs, n) - self.btrfs_send_initial(fulls, n) - self.do_cmds(cmds, n) - + elif not dst_snap: + self.log.info("upgrade %s to full copy because %s on %s was not found", subvol["path"], dst_snap_path, n) + fulls.append(subvol) + elif dst_snap["received_uuid"] == "-": + self.log.info("upgrade %s to full copy because %s on %s has been turned rw", subvol["path"], dst_snap_path, n) + fulls.append(subvol) + elif src_snap["uuid"] != dst_snap["received_uuid"]: + self.log.info("upgrade %s to full copy because %s on %s has been received from a different subset %s", subvol["path"], dst_snap_path, n, dst_snap["received_uuid"]) + fulls.append(subvol) + else: + incrs.append(subvol) + dst_cmds += self.remove_dst_snap_last(n) + dst_cmds += self.remove_dst_snap_temp(n) + dst_cmds += self.rename_dst_snap_next(n) + dst_cmds += self.install_dst(subvols, n) + dst_cmds += self.install_final(n) + self.btrfs_send(incrs, n, incremental=True) + self.btrfs_send(fulls, n, incremental=False) + self.do_cmds(dst_cmds, n) + + src_cmds += self.remove_src_snap_last() + src_cmds += self.rename_src_snap_next() self.do_cmds(src_cmds) self.write_statefile() for n in self.targets: @@ -520,37 +534,7 @@ def _sync_update(self, action): self.write_stats() def sync_full(self): - self.init_src_btrfs() - try: - self.sanity_checks() - except ex.Error: - return - self.get_src_info() - self.get_targets() - src_cmds = [] - for n in self.targets: - self.get_dst_info(n) - remote_subvols = self.remote_subvols(n) - self.cleanup_remote(n, remote_subvols) - cmds = [] - for subvol in self.subvols(): - cmds += self.remove_snap(subvol, n) - src_cmds += self.remove_snap(subvol, n) - for subvol in self.subvols(): - cmds += self.rename_snap(subvol, n) - src_cmds += self.rename_snap(subvol, n) - for subvol in self.subvols(): - cmds += self.remove_dst(subvol, n) - for subvol in self.subvols(): - cmds += self.install_dst(subvol, n) - - cmds += self.install_final(n) - self.btrfs_send_initial(self.subvols(), n) - self.do_cmds(cmds, n) - self.do_cmds(src_cmds) - self.write_statefile() - for n in self.targets: - self.push_statefile(n) + self._sync_update(None, full=True) def can_sync(self, target=None): return True diff --git a/opensvc/utilities/subsystems/btrfs.py b/opensvc/utilities/subsystems/btrfs.py index 2add8f7fe..c02e5dd48 100644 --- a/opensvc/utilities/subsystems/btrfs.py +++ b/opensvc/utilities/subsystems/btrfs.py @@ -32,8 +32,6 @@ def btrfs_devs(mnt): class Btrfs(object): log = None - snapvol = ".opensvc/snapshots" - tempvol = ".opensvc/tmp" def __init__(self, path=None, label=None, node=None, resource=None): self.path = path @@ -60,10 +58,6 @@ def __init__(self, path=None, label=None, node=None, resource=None): raise InitError("failed to determine btrfs label") self.rootdir = os.path.join(Env.paths.pathvar, 'btrfs', self.label) - self.snapdir = os.path.join(self.rootdir, self.snapvol) - self.snapdir = os.path.normpath(self.snapdir) - self.tempdir = os.path.join(self.rootdir, self.tempvol) - self.tempdir = os.path.normpath(self.tempdir) self.path = self.rootdir self.setup_rootvol() @@ -127,13 +121,14 @@ def get_snaps_of(self, path, refresh=False): def get_subvols(self, refresh=False): """ - ID 1070 gen 1071 top level 5 parent_uuid 4a48e4ca-634f-7f40-8c9a-5df931ddfa3e uuid 1f92bd2b-f032-834c-ba70-481f2bb04ae0 path bt1_child1@sent - 1 3 6 8 10 12 + ID 9203 gen 19446 parent 5 top level 5 parent_uuid 9a272ac8-b089-d540-8777-87535ee8f4be received_uuid d77d16ad-a1ad-f84f-8392-5d2a66fc6cbf uuid 23a3e8aa-8996-1e41-b810-017d4afc4c8a path .opensvc/snapshots/bt1@sent + + 1 3 5 8 10 12 13 14 16 """ if not refresh and self.subvols is not None: return self.subvols self.subvols = {} - cmd = ['btrfs', 'subvol', 'list', '-qu', self.path] + cmd = ['btrfs', 'subvol', 'list', '-qupR', self.path] out, err, ret = self.justcall(cmd) if ret != 0: cmd_string = " ".join(cmd) @@ -148,9 +143,11 @@ def get_subvols(self, refresh=False): subvol = {} subvol['id'] = int(l[1]) subvol['gen'] = int(l[3]) - subvol['top'] = int(l[6]) - subvol['parent_uuid'] = l[8] - subvol['uuid'] = l[10] + subvol['parent'] = int(l[5]) + subvol['top'] = int(l[8]) + subvol['parent_uuid'] = l[10] + subvol['received_uuid'] = l[12] + subvol['uuid'] = l[14] subvol['path'] = line[line.index(" path ")+6:] self.subvols[subvol['id']] = subvol return self.subvols @@ -196,8 +193,8 @@ def fsunfreeze(self): if ret != 0: raise ExecError() - def get_subvols_in_path(self, path): - self.get_subvols(refresh=True) + def get_subvols_in_path(self, path, refresh=False): + self.get_subvols(refresh=refresh) head = self.path_to_subvol(path) subvols = [path] for subvol in self.subvols.values(): @@ -299,21 +296,8 @@ def setup_rootvol(self): ret, out, err = self.vcall(cmd) if ret != 0: raise ExecError("error creating dir %s:\n"%self.rootdir+err) - self.mount_rootvol() - if not self.dir_exists(self.snapdir): - cmd = ['mkdir', '-p', self.snapdir] - ret, out, err = self.vcall(cmd) - if ret != 0: - raise ExecError("error creating dir %s:\n"%self.snapdir+err) - - if not self.dir_exists(self.tempdir): - cmd = ['mkdir', '-p', self.tempdir] - ret, out, err = self.vcall(cmd) - if ret != 0: - raise ExecError("error creating dir %s:\n"%self.tempdir+err) - def get_mounts(self): """ /dev/vdb on /data type btrfs (rw) [data]