Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Attempt to better handle incomplete transfers #13

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 115 additions & 24 deletions snapbtrex.py
Original file line number Diff line number Diff line change
Expand Up @@ -255,11 +255,12 @@ def freespace(path):


class Operations:
def __init__(self, path, trace=None):
def __init__(self, path, trace=None, handle_incomplete=False):
self.tracef = trace
self.path = path
self.handle_incomplete = handle_incomplete

def check_call(self, args, shell=False):
def check_call(self, args, shell=False, dry_safe=False):
cmd_str = " ".join(args)
self.trace(LOG_EXEC + cmd_str)
import subprocess
Expand Down Expand Up @@ -300,10 +301,28 @@ def listdir(self):
def listdir_path(self, target_path):
return [d for d in os.listdir(target_path) if timef(d)]

def remote_check_incomplete(self, receiver, receiver_path, ssh_port):
self.trace(LOG_REMOTE + "check for incomplete received snap host=%s, dir=%s", receiver, receiver_path)
args = ["ssh", "-p", ssh_port, receiver, "cat", os.path.join(receiver_path, ".snapbtrex_incomplete")]
try:
incomplete = self.check_call(args, dry_safe=True).strip()
return incomplete
except RuntimeError:
return None

def remote_get_received_uuid(self, receiver, snapshot_path, ssh_port):
self.trace(LOG_REMOTE + "get received uuid host=%s, dir=%s", receiver, snapshot_path)
args = ["ssh", "-p", ssh_port, receiver, "sudo", "btrfs", "subvolume", "show", snapshot_path]
for line in self.check_call(args, dry_safe=True).splitlines():
parts = line.strip().split(':')
if parts[0] == "Received UUID":
return parts[1].strip()
return None

def listremote_dir(self, receiver, receiver_path, ssh_port):
self.trace(LOG_REMOTE + "list remote files host=%s, dir=%s", receiver, receiver_path)
args = ["ssh", "-p", ssh_port, receiver, "ls -1 " + receiver_path]
return [d for d in self.check_call(args).splitlines() if timef(d)]
return [d for d in self.check_call(args, dry_safe=True).splitlines() if timef(d)]

def snap(self, path):
# yt: changed to readonly snapshots
Expand All @@ -327,6 +346,12 @@ def trace(self, *args, **kwargs):

def send_single(self, snap, receiver, receiver_path, ssh_port, rate_limit):
self.trace(LOG_REMOTE + "send single snapshot from %s to host %s path=%s", snap, receiver, receiver_path)

if self.handle_incomplete:
# Mark as incomplete
args = ["ssh -p " + ssh_port + " " + receiver + " 'echo " + snap + " > " + os.path.join(receiver_path, ".snapbtrex_incomplete") + "'"]
self.check_call(args, shell=True)

args = ["sudo btrfs send -v " +
os.path.join(self.path, snap) +
" | pv -brtfL " + rate_limit + " | " +
Expand All @@ -335,9 +360,20 @@ def send_single(self, snap, receiver, receiver_path, ssh_port, rate_limit):
# TODO: breakup the pipe stuff and do it without shell=True, currently it has problems with pipes :(
self.check_call(args, shell=True)

if self.handle_incomplete:
# Remove incomplete marker
args = ["ssh -p " + ssh_port + " " + receiver + " 'rm " + os.path.join(receiver_path, ".snapbtrex_incomplete") + "'"]
self.check_call(args, shell=True)

def send_withparent(self, parent_snap, snap, receiver, receiver_path, ssh_port, rate_limit):
self.trace(LOG_REMOTE + "send snapshot from %s with parent %s to host %s path=%s", snap, parent_snap, receiver,
receiver_path)

if self.handle_incomplete:
# Mark as incomplete
args = ["ssh -p " + ssh_port + " " + receiver + " 'echo " + snap + " > " + os.path.join(receiver_path, ".snapbtrex_incomplete") + "'"]
self.check_call(args, shell=True)

args = ["sudo btrfs send -v -p " +
os.path.join(self.path, parent_snap) + " " +
os.path.join(self.path, snap) +
Expand All @@ -346,7 +382,12 @@ def send_withparent(self, parent_snap, snap, receiver, receiver_path, ssh_port,
" \'sudo btrfs receive -v " + receiver_path + " \'"
]
self.check_call(args, shell=True)
self.trace(LOG_REMOTE + "finished sending snapshot")

if self.handle_incomplete:
# Remove incomplete marker
args = ["ssh -p " + ssh_port + " " + receiver + " 'rm " + os.path.join(receiver_path, ".snapbtrex_incomplete") + "'"]
self.check_call(args, shell=True)
self.trace(LOG_REMOTE + "finished sending snapshot")
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be un-indented.


def link_current(self, receiver, receiver_path, snap, link_target, ssh_port):
self.trace(LOG_REMOTE + "linking current snapshot host=%s path=%s snap=%s link=%s", receiver, receiver_path,
Expand Down Expand Up @@ -382,13 +423,18 @@ def sync_withparent(self, parent_snap, snap, target_path):

# Allows to Simulate operations
class DryOperations(Operations):
def __init__(self, path, trace=None):
Operations.__init__(self, path=path, trace=trace)
def __init__(self, path, trace=None, handle_incomplete=False):
Operations.__init__(self, path=path, trace=trace, handle_incomplete=handle_incomplete)
self.dirs = None

def check_call(self, args, shell=False):
def check_call(self, args, shell=False, dry_safe=False):
cmd_str = " ".join(args)
self.trace(LOG_EXEC + cmd_str)
if dry_safe:
self.trace(LOG_EXEC + "executing dry-safe command: " + cmd_str)
return Operations.check_call(self, args, shell, dry_safe)
else:
self.trace(LOG_EXEC + cmd_str)


# added to simulate also the deletion of snapshots
def listdir(self):
Expand All @@ -407,8 +453,10 @@ def __init__(self,
trace=None,
dirs=None,
space=None,
snap_space=None):
Operations.__init__(self, path=path, trace=trace)
snap_space=None,
handle_incomplete=False
):
Operations.__init__(self, path=path, trace=trace, handle_incomplete=handle_incomplete)
if dirs is None:
dirs = {}
if space is None:
Expand Down Expand Up @@ -466,6 +514,16 @@ def cleandir(operations, targets):
while True:
do_del = None
dirs = sorted(operations.listdir())

if operations.handle_incomplete:
args = ["cat", os.path.join(targets.path, ".snapbtrex_incomplete")]
try:
incomplete = operations.check_call(args, dry_safe=True).strip()
if incomplete in dirs:
raise Exception("Refusing to cleandir {} with potential incomplete snapshot={}. Delete {} if you know what you're doing".format(operations.path, incomplete, os.path.join(operations.path, ".snapbtrex_incomplete")))
except RuntimeError:
pass

dirs_len = len(dirs)
if dirs_len <= 0:
raise Exception("No more directories to clean")
Expand Down Expand Up @@ -543,19 +601,44 @@ def transfer(operations, target_host, target_dir, link_dir, ssh_port, rate_limit

parents = targetsnaps.intersection(localsnaps)

# no parent exists so
if len(parents) == 0:
# start transferring the oldest snapshot
# by that snapbtrex will transfer all snapshots that have been created
operations.send_single(min(localsnaps), target_host, target_dir, ssh_port, rate_limit)
parents.add(min(localsnaps))
max_parent = None
while max_parent == None:
# no parent exists so
if len(parents) == 0:
# start transferring the oldest snapshot
# by that snapbtrex will transfer all snapshots that have been created
operations.send_single(min(localsnaps), target_host, target_dir, ssh_port, rate_limit)
parents.add(min(localsnaps))

# parent existing, use the latest as parent
max_parent = max(parents)

trace(LOG_REMOTE + "last possible parent = %s", max_parent)

if operations.handle_incomplete:
# Try and check that the parent we've chosen is valid
# If a transfer is interrupted, then the received_uuid will be '-', and we can't use
# that snapshot
ruuid = operations.remote_get_received_uuid(target_host, os.path.join(target_dir, max_parent), ssh_port)
if ruuid == "-":
trace(LOG_REMOTE + "no received UUID for possible parent = %s", max_parent)
# Remove this from the parents list - if it has no received_uuid, we can't use it as a parent
parents.remove(max_parent)

# For extra confidence, try and use the marker file left during transfers
incomplete = operations.remote_check_incomplete(target_host, target_dir, ssh_port)
if incomplete == max_parent:
trace(LOG_REMOTE + "possible parent = %s is likely incomplete. Deleting", max_parent)
operations.remote_unsnap(target_host, target_dir, max_parent, ssh_port)
else:
# If we can't be sure, just entirely ignore that snapshot.
# Remove it from the local list because any attempt to send it will fail (already exists)
trace(LOG_REMOTE + "can't be confident that possible parent = %s is incomplete, ignoring it.", max_parent)
localsnaps.remove(max_parent)
max_parent = None

# parent existing, use the latest as parent
max_parent = max(parents)
parent = max_parent

trace(LOG_REMOTE + "last possible parent = %s", max_parent)

for s in sorted(localsnaps):
if s > max_parent:
trace(LOG_REMOTE + "transfer: parent=%s snap=%s", parent, s)
Expand Down Expand Up @@ -802,6 +885,12 @@ def parse_ageoffset_to_timestamp(age_str):
help='Verbose output',
action='store_true')

parser.add_argument(
'--handle-incomplete', '-i',
help='Attempt to handle incomplete transfers',
dest='handle_incomplete',
action='store_true')

transfer_group = parser.add_argument_group(
title='Transfer',
description='Transfer snapshots to other hosts via ssh. ' +
Expand Down Expand Up @@ -907,14 +996,16 @@ def parse_ageoffset_to_timestamp(age_str):
'20101201-070000': 7,
'20101201-080000': 8,
},
space=5)
space=5,
handle_incomplete=pa.handle_incomplete,
)
elif pa.dry_run:
trace(" ## DRY RUN ##")
trace(" ## DRY RUN ## Dry Run mode: all operations are only displayed without execution")
trace(" ## DRY RUN ## Dry Run mode: disk-modifying operations are only displayed without execution")
trace(" ## DRY RUN ##")
operations = DryOperations(path=pa.path, trace=trace)
operations = DryOperations(path=pa.path, trace=trace, handle_incomplete=pa.handle_incomplete)
else:
operations = Operations(path=pa.path, trace=trace)
operations = Operations(path=pa.path, trace=trace, handle_incomplete=pa.handle_incomplete)

if pa.snap:
operations.snap(path=pa.snap)
Expand Down