Skip to content

Commit

Permalink
Handle local vol aliases for receive, issue #185
Browse files Browse the repository at this point in the history
  • Loading branch information
tasket committed Apr 15, 2024
1 parent 81a8f14 commit 6938f6f
Showing 1 changed file with 42 additions and 39 deletions.
81 changes: 42 additions & 39 deletions src/wyng
Original file line number Diff line number Diff line change
Expand Up @@ -369,15 +369,15 @@ class ArchiveVolume:
max_volinfosz = (ArchiveSet.mhash_sz + sesname_sz + 10) * max_sessions \
+ volname_len + ArchiveSet.comment_len + 10

__slots__ = ("vid","name","hashval","archive","path","alias","sessions","sesnames",
__slots__ = ("vid","name","hashval","archive","path","alias","aliastype","sessions","sesnames",
"_seslist","last","meta_checked","tags","desc","changed_bytes")

def __init__(self, archive, vid, hashval, path, name=None, children=2):
self.vid = vid ; self.tags = {}
self.archive = archive ; self.path = path
self.hashval = hashval ; self.changed_bytes = 0
self.last = "None" ; self.meta_checked = False
self.alias = None
self.alias = name ; self.aliastype = None
# persisted here:
self.name = name ; self.desc = ""
self.sessions = {} ; self.sesnames= []
Expand Down Expand Up @@ -405,6 +405,7 @@ class ArchiveVolume:
else:
setattr(self, vname, value)
if not self.name: raise ValueError("Vol name missing")
self.alias = self.name

# session name list sorted by sequence field
self._seslist = seslist = list(self.sessions.values()) if children > 1 else []
Expand Down Expand Up @@ -1213,6 +1214,7 @@ class LocalStorage:
self.path = self.pooltype = self.fstype = self.lvpool = None; self.online = False

loctype, locvol, locpool, pathxvg = LocalStorage.parse_local_path(localpath)
ol_reason = ""

if loctype is None:
self.path = None
Expand All @@ -1236,7 +1238,7 @@ class LocalStorage:
if os.stat(self.path).st_ino == 256:
self.snappath = self.path+"wyng_snapshot_tmp/" ; self.online = True
else:
print("Offline:", self.path, "is not a subvolume.")
ol_reason = " is not a subvolume."
elif fs == "xfs":
self.snappath = "" ; self.online = True
#self.snappath = self.path+("wyng_snapshot_tmp/" if fs == "btrfs" else "")
Expand All @@ -1252,7 +1254,7 @@ class LocalStorage:
(ln.split(":") for ln in open("/etc/group","r") if ln.strip())}

if require_online and not self.online:
err_out("Local storage is offline: "+repr(localpath))
err_out("Local storage is offline: "+repr(localpath)+ol_reason)

if self.online: #Fix: make conditional on send/monitor/receive
self.LVolClass = self.stypes[self.pooltype]
Expand Down Expand Up @@ -2065,8 +2067,10 @@ def get_configs_remote(dest, base_dir, opts):


# Returns either a single LocalStorage obj from --local, or a dict of them
# with associated vol lists defined in --local-from json file:
# s = {"vg1/pool1": ["vol1","vol-xyz"], "/mnt/btrfs/mysubvol": ["vol8","vol-abc"]}
# with associated list of vol name, alias pairs defined in --local-from json file:
# s = {"vg1/pool1": [("vol1",""), ("vol-xyz","xyz-alias")], ...}

# alias is used for receiving to a local vol name different from the archive vol name

def get_configs_storage(aset, require_local=True):
storage, storagesets = None, {}
Expand All @@ -2082,15 +2086,21 @@ def get_configs_storage(aset, require_local=True):

for url, vols in locallists.items():
if not vols: x_it(1, "Empty vol list for "+url)

for vname, alias in vols:
if debug: print("alias", vname, "=", alias)
if alias and vname in aset.vols:
aset.vols[vname].alias = alias ; aset.vols[vname].aliastype = "rlnk-lvm"

storagesets[LocalStorage(url, require_online=require_local,
arch_vols=arch_vols[:], auuid=aset.uuid)] = vols
arch_vols=arch_vols[:], auuid=aset.uuid)] \
= [x[0] for x in vols]

else:
storage = LocalStorage(local_url or None, require_online=require_local,
arch_vols=arch_vols[:], auuid=aset.uuid)
storagesets[storage] = []
# # if require_local and not storagesets:
# # x_it(7, f"Please specify local storage for {opts.action}.")

return storage, storagesets


Expand Down Expand Up @@ -2633,10 +2643,11 @@ def prepare_snapshots_lvm(storage, aset, datavols, monitor_only, import_others=[
add_volume(aset, datavol, aset.opts.voldesc)

vol = aset.vols[datavol]
if vol.alias: continue
if vol.aliastype == "import_other": continue
if datavol in other_vols:
print(" Queuing full scan of import '%s'" % datavol)
vol.alias = other_vols[datavol] ; continue
vol.alias, vol.aliastype = other_vols[datavol], "import_other"
continue

# 'mapfile' is the deltamap file, snap1 holds vol state between send/monitor ops
l_vol = storage.new_vol_entry(datavol, vol.vid) ; snap1, snap2 = l_vol.snap1, l_vol.snap2
Expand Down Expand Up @@ -2728,10 +2739,11 @@ def prepare_snapshots_reflink(storage, aset, datavols, monitor_only, import_othe
add_volume(aset, datavol, aset.opts.voldesc)

vol = aset.vols[datavol]
if vol.alias: continue
if vol.aliastype == "import_other": continue
if datavol in other_vols:
print(" Queuing full scan of import from '%s'" % datavol)
vol.alias = other_vols[datavol] ; continue
print(" Queuing full scan of import '%s'" % datavol)
vol.alias, vol.aliastype = other_vols[datavol], "import_other"
continue

# 'mapfile' is the deltamap file, snap1 holds vol state between send/monitor ops
l_vol = storage.new_vol_entry(datavol, vol.vid) ; snap1, snap2 = l_vol.snap1, l_vol.snap2
Expand Down Expand Up @@ -3075,8 +3087,8 @@ def manifest_to_deltamap(volume, manifest, mapsize):
# send_volume() has two main modes which are full (send_all) and incremental. After send
# finishes a full session, the volume will have a new deltamap and snapshot pair to track
# future changes. After an incremental send, snapshots are rotated and the deltamap is reset.
# An exception is when vol.alias is set and no deltamap or snapshot is used so the send will
# perform more slowly like a traditional incremental backup.
# An exception is when vol.aliastype is set to "import_other" and no deltamap or snapshot is
# used so the send will perform more slowly like a traditional incremental backup.
#
# Returns (bool, int, int, int) representing whether a session was sent, the current volume size,
# the number of bytes sent and the runtime in seconds.
Expand All @@ -3094,7 +3106,7 @@ def send_volume(storage, vol, curtime, ses_tags, send_all=False, benchmark=False
tar_info.size = fileobj.write(etag) + fileobj.write(buf) ; fileobj.seek(0)
tarf_addfile(tarinfo=tar_info, fileobj=fileobj)

if vol.alias:
if vol.aliastype == "import_other":
# Use a plain file or blockdev as a source volume
volpath = vol.alias ; send_all = True
vol_ts = os.stat(volpath).st_mtime_ns ; volperms = storage.getperms(volpath)
Expand Down Expand Up @@ -3461,7 +3473,7 @@ def monitor_send(storage, aset, datavols, monitor_only, use_sesid=None):

storage.check_support()
if not storage.online:
err_out("\nStorage offline: " + storage.path); err_out(" Skipping " + ", ".join(datavols))
err_out("\nOffline volumes: " + ", ".join(datavols))
error_cache.extend(datavols)
return curtime

Expand Down Expand Up @@ -3966,7 +3978,7 @@ def receive_volume(storage, vol, select_ses="", save_path="", diff=False, verify
select_ses = "S_"+select_ses

if select_ses not in sessions:
err_out(f"No volume '{vol.name}' in session {select_ses}.")
err_out(f"No volume '{vol.alias}' in session {select_ses}.")
return None

elif len(sessions) > 0:
Expand Down Expand Up @@ -3998,7 +4010,7 @@ def receive_volume(storage, vol, select_ses="", save_path="", diff=False, verify
if not storage.online: x_it(1, "Local storage is offline!")
save_storage = storage ; returned_home = storage.online
save_type = "tlvm pool" if storage.pooltype=="tlvm" else "file"
l_vol = save_storage.new_vol_entry(vol.name, vol.vid) ; save_path = l_vol.path
l_vol = save_storage.new_vol_entry(vol.alias, vol.vid) ; save_path = l_vol.path

if exists(save_path) and attended:
print("\n!! This will", "overwrite" if sparse_write else "erase all",
Expand Down Expand Up @@ -4043,23 +4055,12 @@ def receive_volume(storage, vol, select_ses="", save_path="", diff=False, verify
volf = open(save_path, "r+b") ; volf.truncate(volsize)

elif diff:
l_vol = storage.lvols[vol.name] ; snap1vol = l_vol.snap1
l_vol = storage.lvols[vol.alias] ; snap1vol = l_vol.snap1
if not l_vol.exists():
err_out("Local volume must exist for diff.")
return None
if remap:
raise NotImplementedError()
#lv_remove(vgname, snap1vol)
#do_exec([[CP.lvm,"lvcreate", "-pr", "-kn", "-ay", "--addtag=wyng",
#"--addtag="+vol.last, "--addtag=delta",
#"--addtag=arch-"+aset.uuid, "-s", vgname+"/"+datavol, "-n", snap1vol]])
#print(" Initial snapshot created for", datavol)
#get_lvm_vgs(aset.vgname)
#if not exists(vol.mapfile()):
#vol.init_deltamap()
#bmapf = open(vol.mapfile(), "r+b")
#bmapf.truncate(vol.mapsize()) ; bmapf.flush()
#bmap_mm = mmap.mmap(bmapf.fileno(), 0)
else:
if storage.lvols[snap1vol].exists():
l_vol = storage.lvols[snap1vol]
Expand All @@ -4077,7 +4078,7 @@ def receive_volume(storage, vol, select_ses="", save_path="", diff=False, verify
volfno = volf.fileno() ; fcntl.lockf(volf, fcntl.LOCK_EX|fcntl.LOCK_NB)

if verify_only != 2:
print("\nReceiving" if save_path else "\nVerifying", f"volume '{vol.name}'",select_ses[2:])
print("\nReceiving" if save_path else "\nVerifying", f"volume '{vol.alias}'",select_ses[2:])
if save_path: print("Saving to %s '%s'" % (save_type, save_path))

# Collect session manifests
Expand Down Expand Up @@ -4244,10 +4245,10 @@ def receive_volume(storage, vol, select_ses="", save_path="", diff=False, verify
vol.init_deltamap()
tags = ["--addtag=wyng", "--addtag=arch-"+aset.uuid, "--addtag="+sessions[-1]]
save_storage.lvols[l_vol.snap2].delete()
save_storage.lvols[l_vol.snap2].create(snapshotfrom=vol.name, addtags=tags)
save_storage.lvols[l_vol.snap2].create(snapshotfrom=vol.alias, addtags=tags)
l_vol = l_vol.update() ; save_storage.lvols[l_vol.snap2].update()
l_vol.rotate_snapshots(timestamp_path=vol.mapfile())
print(f" Snapshot paired for '{vol.name}'.")
print(f" Snapshot paired for '{vol.alias}'.")
if remap:
bmapf.close()
if diff_count > 0 and options.action != "send":
Expand Down Expand Up @@ -4535,7 +4536,7 @@ def cleanup():
except:
pass
if error_cache:
err_out("Error on volume(s): " + ", ".join(error_cache))
err_out("Error on volumes: " + ", ".join(error_cache))
os._exit(2)


Expand All @@ -4545,7 +4546,7 @@ def cleanup():

# Constants / Globals
prog_name = "wyng"
prog_version = "0.8wip" ; prog_date = "20240412"
prog_version = "0.8wip" ; prog_date = "20240414"
format_version = 3 ; debug = False ; tmpdir = None
admin_permission = os.getuid() == 0

Expand Down Expand Up @@ -4660,6 +4661,8 @@ if options.save_to and options.action != "receive":
if not (options.volumes or options.all or options.import_other_from or options.local_from) \
and options.action not in ("arch-init","arch-check","list"):
x_it(1, f"Volume name(s) or --all required for {options.action}.")
if options.import_other_from and options.action != "send":
x_it(1, "Option --import-other-from may only be used with send.")
if options.action not in ("send", "arch-deduplicate"): options.dedup = False
if options.action == "arch-deduplicate": options.dedup = True
options.unattended = options.unattended or not sys.stdin.isatty()
Expand Down Expand Up @@ -4790,7 +4793,7 @@ elif options.action == "receive":
for storage, vols in storagesets.items():
dvlist = vols or selected_vols
if not storage.online:
err_out("\nStorage offline: " + storage.path + "\n Skipping " + ", ".join(dvlist))
err_out("\n Offline volumes: " + ", ".join(dvlist))
error_cache.extend(dvlist) ; continue

for dv in dvlist:
Expand All @@ -4811,7 +4814,7 @@ elif options.action == "diff":
for storage, vols in storagesets.items():
dvlist = vols or selected_vols
if not storage.online:
err_out("\nStorage offline: " + storage.path + "\n Skipping " + ", ".join(dvlist))
err_out("\n Offline volumes: " + ", ".join(dvlist))
error_cache.extend(dvlist) ; continue

for dv in dvlist:
Expand Down

0 comments on commit 6938f6f

Please sign in to comment.